Browse Source

Merge pull request #709 from majestrate/low-latency-merge

implement latency control option
pull/712/head
orignal 8 years ago committed by GitHub
parent
commit
b1333b7d99
  1. 6
      ClientContext.cpp
  2. 4
      Config.cpp
  3. 16
      Destination.cpp
  4. 6
      Destination.h
  5. 8
      HTTPServer.cpp
  6. 2
      NetDb.h
  7. 31
      Tunnel.cpp
  8. 24
      Tunnel.h
  9. 55
      TunnelPool.cpp
  10. 15
      TunnelPool.h
  11. 114
      docs/hacking.md

6
ClientContext.cpp

@ -372,6 +372,8 @@ namespace client @@ -372,6 +372,8 @@ namespace client
options[I2CP_PARAM_INBOUND_TUNNELS_QUANTITY] = GetI2CPOption (section, I2CP_PARAM_INBOUND_TUNNELS_QUANTITY, DEFAULT_INBOUND_TUNNELS_QUANTITY);
options[I2CP_PARAM_OUTBOUND_TUNNELS_QUANTITY] = GetI2CPOption (section, I2CP_PARAM_OUTBOUND_TUNNELS_QUANTITY, DEFAULT_OUTBOUND_TUNNELS_QUANTITY);
options[I2CP_PARAM_TAGS_TO_SEND] = GetI2CPOption (section, I2CP_PARAM_TAGS_TO_SEND, DEFAULT_TAGS_TO_SEND);
options[I2CP_PARAM_MIN_TUNNEL_LATENCY] = GetI2CPOption(section, I2CP_PARAM_MIN_TUNNEL_LATENCY, DEFAULT_MIN_TUNNEL_LATENCY);
options[I2CP_PARAM_MAX_TUNNEL_LATENCY] = GetI2CPOption(section, I2CP_PARAM_MAX_TUNNEL_LATENCY, DEFAULT_MAX_TUNNEL_LATENCY);
}
void ClientContext::ReadI2CPOptionsFromConfig (const std::string& prefix, std::map<std::string, std::string>& options) const
@ -385,6 +387,10 @@ namespace client @@ -385,6 +387,10 @@ namespace client
options[I2CP_PARAM_OUTBOUND_TUNNEL_LENGTH] = value;
if (i2p::config::GetOption(prefix + I2CP_PARAM_OUTBOUND_TUNNELS_QUANTITY, value))
options[I2CP_PARAM_OUTBOUND_TUNNELS_QUANTITY] = value;
if (i2p::config::GetOption(prefix + I2CP_PARAM_MIN_TUNNEL_LATENCY, value))
options[I2CP_PARAM_MIN_TUNNEL_LATENCY] = value;
if (i2p::config::GetOption(prefix + I2CP_PARAM_MAX_TUNNEL_LATENCY, value))
options[I2CP_PARAM_MAX_TUNNEL_LATENCY] = value;
}
void ClientContext::ReadTunnels ()

4
Config.cpp

@ -87,6 +87,8 @@ namespace config { @@ -87,6 +87,8 @@ namespace config {
("httpproxy.outbound.length", value<std::string>()->default_value("3"), "HTTP proxy outbound tunnel length")
("httpproxy.inbound.quantity", value<std::string>()->default_value("5"), "HTTP proxy inbound tunnels quantity")
("httpproxy.outbound.quantity", value<std::string>()->default_value("5"), "HTTP proxy outbound tunnels quantity")
("httpproxy.latency.min", value<int>()->default_value(0), "HTTP proxy min latency for tunnels")
("httpproxy.latency.max", value<int>()->default_value(0), "HTTP proxy max latency for tunnels")
;
options_description socksproxy("SOCKS Proxy options");
@ -99,6 +101,8 @@ namespace config { @@ -99,6 +101,8 @@ namespace config {
("socksproxy.outbound.length", value<std::string>()->default_value("3"), "SOCKS proxy outbound tunnel length")
("socksproxy.inbound.quantity", value<std::string>()->default_value("5"), "SOCKS proxy inbound tunnels quantity")
("socksproxy.outbound.quantity", value<std::string>()->default_value("5"), "SOCKS proxy outbound tunnels quantity")
("socksproxy.latency.min", value<int>()->default_value(0), "SOCKS proxy min latency for tunnels")
("socksproxy.latency.max", value<int>()->default_value(0), "SOCKS proxy max latency for tunnels")
("socksproxy.outproxy", value<std::string>()->default_value("127.0.0.1"), "Upstream outproxy address for SOCKS Proxy")
("socksproxy.outproxyport", value<uint16_t>()->default_value(9050), "Upstream outproxy port for SOCKS Proxy")
;

16
Destination.cpp

@ -63,6 +63,22 @@ namespace client @@ -63,6 +63,22 @@ namespace client
m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (inLen, outLen, inQty, outQty);
if (explicitPeers)
m_Pool->SetExplicitPeers (explicitPeers);
if(params)
{
auto itr = params->find(I2CP_PARAM_MAX_TUNNEL_LATENCY);
if (itr != params->end()) {
auto maxlatency = std::stoi(itr->second);
itr = params->find(I2CP_PARAM_MIN_TUNNEL_LATENCY);
if (itr != params->end()) {
auto minlatency = std::stoi(itr->second);
if ( minlatency > 0 && maxlatency > 0 ) {
// set tunnel pool latency
LogPrint(eLogInfo, "Destination: requiring tunnel latency [", minlatency, "ms, ", maxlatency, "ms]");
m_Pool->RequireLatency(minlatency, maxlatency);
}
}
}
}
}
LeaseSetDestination::~LeaseSetDestination ()

6
Destination.h

@ -50,6 +50,12 @@ namespace client @@ -50,6 +50,12 @@ namespace client
const char I2CP_PARAM_TAGS_TO_SEND[] = "crypto.tagsToSend";
const int DEFAULT_TAGS_TO_SEND = 40;
// latency
const char I2CP_PARAM_MIN_TUNNEL_LATENCY[] = "latency.min";
const int DEFAULT_MIN_TUNNEL_LATENCY = 0;
const char I2CP_PARAM_MAX_TUNNEL_LATENCY[] = "latency.max";
const int DEFAULT_MAX_TUNNEL_LATENCY = 0;
typedef std::function<void (std::shared_ptr<i2p::stream::Stream> stream)> StreamRequestComplete;
class LeaseSetDestination: public i2p::garlic::GarlicDestination,

8
HTTPServer.cpp

@ -300,12 +300,16 @@ namespace http { @@ -300,12 +300,16 @@ namespace http {
s << "<b>Inbound tunnels:</b><br>\r\n";
for (auto & it : pool->GetInboundTunnels ()) {
it->Print(s);
if(it->LatencyIsKnown())
s << " ( " << it->GetMeanLatency() << "ms )";
ShowTunnelDetails(s, it->GetState (), it->GetNumReceivedBytes ());
}
s << "<br>\r\n";
s << "<b>Outbound tunnels:</b><br>\r\n";
for (auto & it : pool->GetOutboundTunnels ()) {
it->Print(s);
if(it->LatencyIsKnown())
s << " ( " << it->GetMeanLatency() << "ms )";
ShowTunnelDetails(s, it->GetState (), it->GetNumSentBytes ());
}
}
@ -401,12 +405,16 @@ namespace http { @@ -401,12 +405,16 @@ namespace http {
s << "<b>Inbound tunnels:</b><br>\r\n";
for (auto & it : i2p::tunnel::tunnels.GetInboundTunnels ()) {
it->Print(s);
if(it->LatencyIsKnown())
s << " ( " << it->GetMeanLatency() << "ms )";
ShowTunnelDetails(s, it->GetState (), it->GetNumReceivedBytes ());
}
s << "<br>\r\n";
s << "<b>Outbound tunnels:</b><br>\r\n";
for (auto & it : i2p::tunnel::tunnels.GetOutboundTunnels ()) {
it->Print(s);
if(it->LatencyIsKnown())
s << " ( " << it->GetMeanLatency() << "ms )";
ShowTunnelDetails(s, it->GetState (), it->GetNumSentBytes ());
}
s << "<br>\r\n";

2
NetDb.h

@ -99,8 +99,8 @@ namespace data @@ -99,8 +99,8 @@ namespace data
void VisitRouterInfos(RouterInfoVisitor v);
/** visit N random router that match using filter, then visit them with a visitor, return number of RouterInfos that were visited */
size_t VisitRandomRouterInfos(RouterInfoFilter f, RouterInfoVisitor v, size_t n);
void ClearRouterInfos () { m_RouterInfos.clear (); };
void ClearRouterInfos () { m_RouterInfos.clear (); };
private:

31
Tunnel.cpp

@ -21,6 +21,31 @@ namespace i2p @@ -21,6 +21,31 @@ namespace i2p
namespace tunnel
{
void TunnelLatency::AddSample(Sample s)
{
std::unique_lock<std::mutex> l(m_access);
m_samples.push_back(s);
}
bool TunnelLatency::HasSamples() const
{
std::unique_lock<std::mutex> l(m_access);
return m_samples.size() > 0;
}
TunnelLatency::Latency TunnelLatency::GetMeanLatency() const
{
std::unique_lock<std::mutex> lock(m_access);
if (m_samples.size() > 0) {
Latency l = 0;
for(auto s : m_samples)
l += s;
return l / m_samples.size();
}
return 0;
}
Tunnel::Tunnel (std::shared_ptr<const TunnelConfig> config):
TunnelBase (config->GetTunnelID (), config->GetNextTunnelID (), config->GetNextIdentHash ()),
m_Config (config), m_Pool (nullptr), m_State (eTunnelStatePending), m_IsRecreated (false)
@ -162,6 +187,12 @@ namespace tunnel @@ -162,6 +187,12 @@ namespace tunnel
return established;
}
bool Tunnel::LatencyFitsRange(uint64_t lower, uint64_t upper) const
{
auto latency = GetMeanLatency();
return latency >= lower && latency <= upper;
}
void Tunnel::EncryptTunnelMsg (std::shared_ptr<const I2NPMessage> in, std::shared_ptr<I2NPMessage> out)
{
const uint8_t * inPayload = in->GetPayload () + 4;

24
Tunnel.h

@ -79,6 +79,21 @@ namespace tunnel @@ -79,6 +79,21 @@ namespace tunnel
eTunnelStateExpiring
};
/** @brief for storing latency history */
struct TunnelLatency
{
typedef uint64_t Sample;
typedef uint64_t Latency;
void AddSample(Sample s);
bool HasSamples() const;
Latency GetMeanLatency() const;
std::vector<Sample> m_samples;
mutable std::mutex m_access;
};
class OutboundTunnel;
class InboundTunnel;
class Tunnel: public TunnelBase
@ -118,6 +133,14 @@ namespace tunnel @@ -118,6 +133,14 @@ namespace tunnel
void SendTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> msg);
void EncryptTunnelMsg (std::shared_ptr<const I2NPMessage> in, std::shared_ptr<I2NPMessage> out);
/** @brief add latency sample */
void AddLatencySample(const uint64_t ms) { m_Latency.AddSample(ms); }
/** @brief get this tunnel's estimated latency */
uint64_t GetMeanLatency() const { return m_Latency.GetMeanLatency(); }
/** @breif return true if this tunnel's latency fits in range [lowerbound, upperbound] */
bool LatencyFitsRange(uint64_t lowerbound, uint64_t upperbound) const;
bool LatencyIsKnown() const { return m_Latency.HasSamples(); }
protected:
void PrintHops (std::stringstream& s) const;
@ -129,6 +152,7 @@ namespace tunnel @@ -129,6 +152,7 @@ namespace tunnel
std::shared_ptr<TunnelPool> m_Pool; // pool, tunnel belongs to, or null
TunnelState m_State;
bool m_IsRecreated;
TunnelLatency m_Latency;
};
class OutboundTunnel: public Tunnel

55
TunnelPool.cpp

@ -164,6 +164,21 @@ namespace tunnel @@ -164,6 +164,21 @@ namespace tunnel
uint32_t ind = rand () % (tunnels.size ()/2 + 1), i = 0;
typename TTunnels::value_type tunnel = nullptr;
for (const auto& it: tunnels)
{
if (it->IsEstablished () && it != excluded)
{
if(HasLatencyRequirement() && it->LatencyIsKnown() && !it->LatencyFitsRange(m_MinLatency, m_MaxLatency)) {
i ++;
continue;
}
tunnel = it;
i++;
}
if (i > ind && tunnel) break;
}
if(HasLatencyRequirement() && !tunnel) {
ind = rand () % (tunnels.size ()/2 + 1), i = 0;
for (const auto& it: tunnels)
{
if (it->IsEstablished () && it != excluded)
{
@ -172,6 +187,7 @@ namespace tunnel @@ -172,6 +187,7 @@ namespace tunnel
}
if (i > ind && tunnel) break;
}
}
if (!tunnel && excluded && excluded->IsEstablished ()) tunnel = excluded;
return tunnel;
}
@ -322,7 +338,12 @@ namespace tunnel @@ -322,7 +338,12 @@ namespace tunnel
test.first->SetState (eTunnelStateEstablished);
if (test.second->GetState () == eTunnelStateTestFailed)
test.second->SetState (eTunnelStateEstablished);
LogPrint (eLogDebug, "Tunnels: test of ", msgID, " successful. ", i2p::util::GetMillisecondsSinceEpoch () - timestamp, " milliseconds");
uint64_t dlt = i2p::util::GetMillisecondsSinceEpoch () - timestamp;
LogPrint (eLogDebug, "Tunnels: test of ", msgID, " successful. ", dlt, " milliseconds");
// update latency
uint64_t latency = dlt / 2;
test.first->AddLatencySample(latency);
test.second->AddLatencySample(latency);
}
else
{
@ -523,5 +544,37 @@ namespace tunnel @@ -523,5 +544,37 @@ namespace tunnel
std::lock_guard<std::mutex> lock(m_CustomPeerSelectorMutex);
return m_CustomPeerSelector != nullptr;
}
std::shared_ptr<InboundTunnel> TunnelPool::GetLowestLatencyInboundTunnel(std::shared_ptr<InboundTunnel> exclude) const
{
std::shared_ptr<InboundTunnel> tun = nullptr;
std::unique_lock<std::mutex> lock(m_InboundTunnelsMutex);
uint64_t min = 1000000;
for (const auto & itr : m_InboundTunnels) {
if(!itr->LatencyIsKnown()) continue;
auto l = itr->GetMeanLatency();
if (l >= min) continue;
tun = itr;
if(tun == exclude) continue;
min = l;
}
return tun;
}
std::shared_ptr<OutboundTunnel> TunnelPool::GetLowestLatencyOutboundTunnel(std::shared_ptr<OutboundTunnel> exclude) const
{
std::shared_ptr<OutboundTunnel> tun = nullptr;
std::unique_lock<std::mutex> lock(m_OutboundTunnelsMutex);
uint64_t min = 1000000;
for (const auto & itr : m_OutboundTunnels) {
if(!itr->LatencyIsKnown()) continue;
auto l = itr->GetMeanLatency();
if (l >= min) continue;
tun = itr;
if(tun == exclude) continue;
min = l;
}
return tun;
}
}
}

15
TunnelPool.h

@ -69,6 +69,17 @@ namespace tunnel @@ -69,6 +69,17 @@ namespace tunnel
void SetCustomPeerSelector(TunnelPeerSelector selector);
void UnsetCustomPeerSelector();
bool HasCustomPeerSelector();
/** @brief make this tunnel pool yield tunnels that fit latency range [min, max] */
void RequireLatency(uint64_t min, uint64_t max) { m_MinLatency = min; m_MaxLatency = max; }
/** @brief return true if this tunnel pool has a latency requirement */
bool HasLatencyRequirement() const { return m_MinLatency > 0 && m_MaxLatency > 0; }
/** @brief get the lowest latency tunnel in this tunnel pool regardless of latency requirements */
std::shared_ptr<InboundTunnel> GetLowestLatencyInboundTunnel(std::shared_ptr<InboundTunnel> exclude=nullptr) const;
std::shared_ptr<OutboundTunnel> GetLowestLatencyOutboundTunnel(std::shared_ptr<OutboundTunnel> exclude=nullptr) const;
private:
void CreateInboundTunnel ();
@ -94,6 +105,10 @@ namespace tunnel @@ -94,6 +105,10 @@ namespace tunnel
bool m_IsActive;
std::mutex m_CustomPeerSelectorMutex;
TunnelPeerSelector m_CustomPeerSelector;
uint64_t m_MinLatency=0; // if > 0 this tunnel pool will try building tunnels with minimum latency by ms
uint64_t m_MaxLatency=0; // if > 0 this tunnel pool will try building tunnels with maximum latency by ms
public:
// for HTTP only

114
docs/hacking.md

@ -0,0 +1,114 @@ @@ -0,0 +1,114 @@
# Hacking on I2PD
This document contains notes compiled from hacking on i2pd
## prerequisites
This guide assumes:
* a decent understanding of c++
* basic understanding of how i2p works at i2np level and up
## general structure
Notes on multithreading
* every compontent runs in its own thread
* each component (usually) has a public function `GetService()` which can be used to obtain the `boost::asio::io_service` that it uses.
* when talking between components/threads, **always** use `GetService().post()` and be mindfull of stack allocated memory.
### NetDb
#### NetDb.h
The `i2p::data::netdb` is a `i2p::data::NetDb` instance processes and dispatches *inbound* i2np messages passed in from transports.
global singleton at `i2p::data::netdb` as of 2.10.1
#### NetDbRequests.h
For Pending RouterInfo/LeaseSet lookup and store requests
### ClientContext
#### ClientContext.h
`i2p::client::ClientContext` spawns all destinations used by the i2p router including the shared local destination.
global singleton at `i2p::client::context` as of 2.10.1
### Daemon
File: Daemon.cpp
`i2p::util::Daemon_Singleton_Private` subclasses implement the daemon start-up and tear-down, creates Http Webui and i2p control server.
### Destinations
#### Destination.h
each destination runs in its own thread
##### i2p::client::LeaseSetDestination
Base for `i2p::client::ClientDestination`
##### i2p::client::ClientDestination
Destination capable of creating (tcp/i2p) streams and datagram sessions.
#### Streaming.h
##### i2p::stream::StreamingDestination
Does not implement any destination related members, the name is a bit misleading.
Owns a `i2p::client::ClientDestination` and runs in the destination thread.
Anyone creating or using streams outside of the destination thread **MUST** be aware of the consequences of multithreaded c++ :^)
If you use streaming please consider running all code within the destination thread using `ClientDestination::GetService().post()`
#### Garlic.h
Provides Inter-Destination routing primatives.
##### i2p::garlic::GarlicDestination
sublcass of `i2p::client::LeaseSetDestination` for sending messages down shared routing paths.
##### i2p::garlic::GarlicRoutingSession
a point to point conversation between us and 1 other destination.
##### i2p::garlic::GarlicRoutingPath
A routing path currently used by a routing session. specifies which outbound tunnel to use and which remote lease set to use for `OBEP` to `IBGW` inter tunnel communication.
members:
* outboundTunnel (OBEP)
* remoteLease (IBGW)
* rtt (round trip time)
* updatedTime (last time this path's IBGW/OBEP was updated)
* numTimesUsesd (number of times this path was used)
### Transports
each transport runs in its own thread
#### Transports.h
`i2p::transport::Transports` contains NTCP and SSU transport instances
Loading…
Cancel
Save