Browse Source

Merge pull request #628 from majestrate/merge_udp_tunnel

Merge Recent Changes
pull/633/head
orignal 8 years ago committed by GitHub
parent
commit
9ecbbf09cc
  1. 7
      AddressBook.cpp
  2. 192
      ClientContext.cpp
  3. 26
      ClientContext.h
  4. 2
      DaemonLinux.cpp
  5. 412
      Datagram.cpp
  6. 117
      Datagram.h
  7. 74
      Destination.cpp
  8. 23
      Destination.h
  9. 9
      FS.cpp
  10. 4
      FS.h
  11. 2
      Garlic.cpp
  12. 2
      Garlic.h
  13. 3
      HTTPServer.cpp
  14. 7
      I2PService.cpp
  15. 258
      I2PTunnel.cpp
  16. 116
      I2PTunnel.h
  17. 15
      Identity.cpp
  18. 6
      Identity.h
  19. 19
      LeaseSet.cpp
  20. 13
      LeaseSet.h
  21. 22
      Log.h
  22. 11
      Makefile.linux
  23. 2
      NTCPSession.cpp
  24. 67
      NetDb.cpp
  25. 20
      NetDb.h
  26. 96
      Streaming.cpp
  27. 42
      Streaming.h
  28. 10
      Tag.h
  29. 1
      Transports.cpp
  30. 35
      TunnelPool.cpp
  31. 21
      TunnelPool.h

7
AddressBook.cpp

@ -546,8 +546,8 @@ namespace client @@ -546,8 +546,8 @@ namespace client
auto datagram = dest->GetDatagramDestination ();
if (!datagram)
datagram = dest->CreateDatagramDestination ();
datagram->SetReceiver (std::bind (&AddressBook::HandleLookupResponse, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5),
datagram->SetReceiver (std::bind (&AddressBook::HandleLookupResponse, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5),
ADDRESS_RESPONSE_DATAGRAM_PORT);
}
}
@ -558,8 +558,7 @@ namespace client @@ -558,8 +558,7 @@ namespace client
if (dest)
{
auto datagram = dest->GetDatagramDestination ();
if (datagram)
datagram->ResetReceiver (ADDRESS_RESPONSE_DATAGRAM_PORT);
if (datagram) datagram->ResetReceiver (ADDRESS_RESPONSE_DATAGRAM_PORT);
}
}

192
ClientContext.cpp

@ -17,7 +17,8 @@ namespace client @@ -17,7 +17,8 @@ namespace client
ClientContext::ClientContext (): m_SharedLocalDestination (nullptr),
m_HttpProxy (nullptr), m_SocksProxy (nullptr), m_SamBridge (nullptr),
m_BOBCommandChannel (nullptr), m_I2CPServer (nullptr)
m_BOBCommandChannel (nullptr), m_I2CPServer (nullptr),
m_CleanupUDPTimer(m_Service, boost::posix_time::seconds(1))
{
}
@ -39,8 +40,9 @@ namespace client @@ -39,8 +40,9 @@ namespace client
m_SharedLocalDestination->Start ();
}
m_AddressBook.Start ();
std::shared_ptr<ClientDestination> localDestination;
bool httproxy; i2p::config::GetOption("httpproxy.enabled", httproxy);
if (httproxy) {
@ -51,8 +53,10 @@ namespace client @@ -51,8 +53,10 @@ namespace client
if (httpProxyKeys.length () > 0)
{
i2p::data::PrivateKeys keys;
LoadPrivateKeys (keys, httpProxyKeys);
localDestination = CreateNewLocalDestination (keys, false);
if(LoadPrivateKeys (keys, httpProxyKeys))
localDestination = CreateNewLocalDestination (keys, false);
else
LogPrint(eLogError, "Clients: failed to load HTTP Proxy key");
}
try {
m_HttpProxy = new i2p::proxy::HTTPProxy(httpProxyAddr, httpProxyPort, localDestination);
@ -61,7 +65,7 @@ namespace client @@ -61,7 +65,7 @@ namespace client
LogPrint(eLogError, "Clients: Exception in HTTP Proxy: ", e.what());
}
}
bool socksproxy; i2p::config::GetOption("socksproxy.enabled", socksproxy);
if (socksproxy) {
std::string socksProxyKeys; i2p::config::GetOption("socksproxy.keys", socksProxyKeys);
@ -83,10 +87,21 @@ namespace client @@ -83,10 +87,21 @@ namespace client
LogPrint(eLogError, "Clients: Exception in SOCKS Proxy: ", e.what());
}
}
if ( m_ServiceThread == nullptr ) {
m_ServiceThread = new std::thread([&] () {
LogPrint(eLogInfo, "ClientContext: starting service");
m_Service.run();
LogPrint(eLogError, "ClientContext: service died");
});
ScheduleCleanupUDP();
}
// I2P tunnels
ReadTunnels ();
// SAM
bool sam; i2p::config::GetOption("sam.enabled", sam);
if (sam) {
@ -193,20 +208,40 @@ namespace client @@ -193,20 +208,40 @@ namespace client
}
LogPrint(eLogInfo, "Clients: stopping AddressBook");
m_AddressBook.Stop ();
m_AddressBook.Stop ();
{
std::lock_guard<std::mutex> lock(m_ForwardsMutex);
m_ServerForwards.clear();
m_ClientForwards.clear();
}
for (auto& it: m_Destinations)
it.second->Stop ();
m_Destinations.clear ();
m_SharedLocalDestination = nullptr;
m_SharedLocalDestination = nullptr;
// stop io service thread
if(m_ServiceThread)
{
m_Service.stop();
m_ServiceThread->join();
delete m_ServiceThread;
m_ServiceThread = nullptr;
}
}
void ClientContext::ReloadConfig ()
{
ReadTunnels (); // TODO: it reads new tunnels only, should be implemented better
std::string config; i2p::config::GetOption("conf", config);
i2p::config::ParseConfig(config);
Stop();
Start();
}
void ClientContext::LoadPrivateKeys (i2p::data::PrivateKeys& keys, const std::string& filename, i2p::data::SigningKeyType sigType)
bool ClientContext::LoadPrivateKeys (i2p::data::PrivateKeys& keys, const std::string& filename, i2p::data::SigningKeyType sigType)
{
bool success = true;
std::string fullPath = i2p::fs::DataDirPath (filename);
std::ifstream s(fullPath, std::ifstream::binary);
if (s.is_open ())
@ -216,9 +251,14 @@ namespace client @@ -216,9 +251,14 @@ namespace client
s.seekg (0, std::ios::beg);
uint8_t * buf = new uint8_t[len];
s.read ((char *)buf, len);
keys.FromBuffer (buf, len);
if(!keys.FromBuffer (buf, len))
{
LogPrint (eLogError, "Clients: failed to load keyfile ", filename);
success = false;
}
else
LogPrint (eLogInfo, "Clients: Local address ", m_AddressBook.ToAddress(keys.GetPublic ()->GetIdentHash ()), " loaded");
delete[] buf;
LogPrint (eLogInfo, "Clients: Local address ", m_AddressBook.ToAddress(keys.GetPublic ()->GetIdentHash ()), " loaded");
}
else
{
@ -232,7 +272,31 @@ namespace client @@ -232,7 +272,31 @@ namespace client
delete[] buf;
LogPrint (eLogInfo, "Clients: New private keys file ", fullPath, " for ", m_AddressBook.ToAddress(keys.GetPublic ()->GetIdentHash ()), " created");
}
}
return success;
}
std::vector<std::shared_ptr<DatagramSessionInfo> > ClientContext::GetForwardInfosFor(const i2p::data::IdentHash & destination)
{
std::vector<std::shared_ptr<DatagramSessionInfo> > infos;
std::lock_guard<std::mutex> lock(m_ForwardsMutex);
for(const auto & c : m_ClientForwards)
{
if (c.second->IsLocalDestination(destination))
{
for (auto & i : c.second->GetSessions()) infos.push_back(i);
break;
}
}
for(const auto & s : m_ServerForwards)
{
if(std::get<0>(s.first) == destination)
{
for( auto & i : s.second->GetSessions()) infos.push_back(i);
break;
}
}
return infos;
}
std::shared_ptr<ClientDestination> ClientContext::CreateNewLocalDestination (bool isPublic, i2p::data::SigningKeyType sigType,
@ -337,7 +401,7 @@ namespace client @@ -337,7 +401,7 @@ namespace client
try
{
std::string type = section.second.get<std::string> (I2P_TUNNELS_SECTION_TYPE);
if (type == I2P_TUNNELS_SECTION_TYPE_CLIENT)
if (type == I2P_TUNNELS_SECTION_TYPE_CLIENT || type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT)
{
// mandatory params
std::string dest = section.second.get<std::string> (I2P_CLIENT_TUNNEL_DESTINATION);
@ -355,22 +419,43 @@ namespace client @@ -355,22 +419,43 @@ namespace client
if (keys.length () > 0)
{
i2p::data::PrivateKeys k;
LoadPrivateKeys (k, keys, sigType);
localDestination = FindLocalDestination (k.GetPublic ()->GetIdentHash ());
if (!localDestination)
localDestination = CreateNewLocalDestination (k, false, &options);
if(LoadPrivateKeys (k, keys, sigType))
{
localDestination = FindLocalDestination (k.GetPublic ()->GetIdentHash ());
if (!localDestination)
localDestination = CreateNewLocalDestination (k, type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT, &options);
}
}
auto clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort);
if (m_ClientTunnels.insert (std::make_pair (clientTunnel->GetAcceptor ().local_endpoint (),
std::unique_ptr<I2PClientTunnel>(clientTunnel))).second)
{
clientTunnel->Start ();
numClientTunnels++;
if (type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) {
// udp client
// TODO: hostnames
boost::asio::ip::udp::endpoint end(boost::asio::ip::address::from_string(address), port);
if (!localDestination)
{
localDestination = m_SharedLocalDestination;
}
auto clientTunnel = new I2PUDPClientTunnel(name, dest, end, localDestination, destinationPort);
if(m_ClientForwards.insert(std::make_pair(end, std::unique_ptr<I2PUDPClientTunnel>(clientTunnel))).second)
{
clientTunnel->Start();
}
else
LogPrint(eLogError, "Clients: I2P Client forward for endpoint ", end, " already exists");
} else {
// tcp client
auto clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort);
if (m_ClientTunnels.insert (std::make_pair (clientTunnel->GetAcceptor ().local_endpoint (),
std::unique_ptr<I2PClientTunnel>(clientTunnel))).second)
{
clientTunnel->Start ();
numClientTunnels++;
}
else
LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientTunnel->GetAcceptor ().local_endpoint (), " already exists");
}
else
LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientTunnel->GetAcceptor ().local_endpoint (), " already exists");
}
else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER || type == I2P_TUNNELS_SECTION_TYPE_HTTP || type == I2P_TUNNELS_SECTION_TYPE_IRC)
else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER || type == I2P_TUNNELS_SECTION_TYPE_HTTP || type == I2P_TUNNELS_SECTION_TYPE_IRC || type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER)
{
// mandatory params
std::string host = section.second.get<std::string> (I2P_SERVER_TUNNEL_HOST);
@ -383,17 +468,43 @@ namespace client @@ -383,17 +468,43 @@ namespace client
std::string webircpass = section.second.get<std::string> (I2P_SERVER_TUNNEL_WEBIRC_PASSWORD, "");
bool gzip = section.second.get (I2P_SERVER_TUNNEL_GZIP, true);
i2p::data::SigningKeyType sigType = section.second.get (I2P_SERVER_TUNNEL_SIGNATURE_TYPE, i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256);
uint32_t maxConns = section.second.get(i2p::stream::I2CP_PARAM_STREAMING_MAX_CONNS_PER_MIN, i2p::stream::DEFAULT_MAX_CONNS_PER_MIN);
std::string address = section.second.get<std::string> (I2P_SERVER_TUNNEL_ADDRESS, "127.0.0.1");
// I2CP
std::map<std::string, std::string> options;
ReadI2CPOptions (section, options);
std::shared_ptr<ClientDestination> localDestination = nullptr;
i2p::data::PrivateKeys k;
LoadPrivateKeys (k, keys, sigType);
if(!LoadPrivateKeys (k, keys, sigType))
continue;
localDestination = FindLocalDestination (k.GetPublic ()->GetIdentHash ());
if (!localDestination)
localDestination = CreateNewLocalDestination (k, true, &options);
if (type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER)
{
// udp server tunnel
// TODO: hostnames
auto localAddress = boost::asio::ip::address::from_string(address);
boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address::from_string(host), port);
I2PUDPServerTunnel * serverTunnel = new I2PUDPServerTunnel(name, localDestination, localAddress, endpoint, port);
std::lock_guard<std::mutex> lock(m_ForwardsMutex);
if(m_ServerForwards.insert(
std::make_pair(
std::make_pair(
localDestination->GetIdentHash(), port),
std::unique_ptr<I2PUDPServerTunnel>(serverTunnel))).second)
{
serverTunnel->Start();
LogPrint(eLogInfo, "Clients: I2P Server Forward created for UDP Endpoint ", host, ":", port, " bound on ", address, " for ",localDestination->GetIdentHash().ToBase32());
}
else
LogPrint(eLogError, "Clients: I2P Server Forward for destination/port ", m_AddressBook.ToAddress(localDestination->GetIdentHash()), "/", port, "already exists");
continue;
}
I2PServerTunnel * serverTunnel;
if (type == I2P_TUNNELS_SECTION_TYPE_HTTP)
serverTunnel = new I2PServerTunnelHTTP (name, host, port, localDestination, hostOverride, inPort, gzip);
@ -402,6 +513,10 @@ namespace client @@ -402,6 +513,10 @@ namespace client
else // regular server tunnel by default
serverTunnel = new I2PServerTunnel (name, host, port, localDestination, inPort, gzip);
LogPrint(eLogInfo, "Clients: Set Max Conns To ", maxConns);
serverTunnel->SetMaxConnsPerMinute(maxConns);
if (accessList.length () > 0)
{
std::set<i2p::data::IdentHash> idents;
@ -439,6 +554,23 @@ namespace client @@ -439,6 +554,23 @@ namespace client
}
LogPrint (eLogInfo, "Clients: ", numClientTunnels, " I2P client tunnels created");
LogPrint (eLogInfo, "Clients: ", numServerTunnels, " I2P server tunnels created");
}
}
void ClientContext::ScheduleCleanupUDP()
{
// schedule cleanup in 1 second
m_CleanupUDPTimer.expires_at(m_CleanupUDPTimer.expires_at() + boost::posix_time::seconds(1));
m_CleanupUDPTimer.async_wait(std::bind(&ClientContext::CleanupUDP, this, std::placeholders::_1));
}
void ClientContext::CleanupUDP(const boost::system::error_code & ecode)
{
if(!ecode)
{
std::lock_guard<std::mutex> lock(m_ForwardsMutex);
for ( auto & s : m_ServerForwards ) s.second->ExpireStale();
ScheduleCleanupUDP();
}
}
}
}

26
ClientContext.h

@ -24,6 +24,8 @@ namespace client @@ -24,6 +24,8 @@ namespace client
const char I2P_TUNNELS_SECTION_TYPE_SERVER[] = "server";
const char I2P_TUNNELS_SECTION_TYPE_HTTP[] = "http";
const char I2P_TUNNELS_SECTION_TYPE_IRC[] = "irc";
const char I2P_TUNNELS_SECTION_TYPE_UDPCLIENT[] = "udpclient";
const char I2P_TUNNELS_SECTION_TYPE_UDPSERVER[] = "udpserver";
const char I2P_CLIENT_TUNNEL_PORT[] = "port";
const char I2P_CLIENT_TUNNEL_ADDRESS[] = "address";
const char I2P_CLIENT_TUNNEL_DESTINATION[] = "destination";
@ -39,7 +41,8 @@ namespace client @@ -39,7 +41,8 @@ namespace client
const char I2P_SERVER_TUNNEL_ACCESS_LIST[] = "accesslist";
const char I2P_SERVER_TUNNEL_GZIP[] = "gzip";
const char I2P_SERVER_TUNNEL_WEBIRC_PASSWORD[] = "webircpassword";
const char I2P_SERVER_TUNNEL_ADDRESS[] = "address";
class ClientContext
{
public:
@ -59,11 +62,13 @@ namespace client @@ -59,11 +62,13 @@ namespace client
const std::map<std::string, std::string> * params = nullptr);
void DeleteLocalDestination (std::shared_ptr<ClientDestination> destination);
std::shared_ptr<ClientDestination> FindLocalDestination (const i2p::data::IdentHash& destination) const;
void LoadPrivateKeys (i2p::data::PrivateKeys& keys, const std::string& filename, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256);
bool LoadPrivateKeys (i2p::data::PrivateKeys& keys, const std::string& filename, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256);
AddressBook& GetAddressBook () { return m_AddressBook; };
const SAMBridge * GetSAMBridge () const { return m_SamBridge; };
std::vector<std::shared_ptr<DatagramSessionInfo> > GetForwardInfosFor(const i2p::data::IdentHash & destination);
private:
void ReadTunnels ();
@ -72,6 +77,9 @@ namespace client @@ -72,6 +77,9 @@ namespace client
template<typename Section>
void ReadI2CPOptions (const Section& section, std::map<std::string, std::string>& options) const;
void CleanupUDP(const boost::system::error_code & ecode);
void ScheduleCleanupUDP();
private:
std::mutex m_DestinationsMutex;
@ -84,15 +92,27 @@ namespace client @@ -84,15 +92,27 @@ namespace client
i2p::proxy::SOCKSProxy * m_SocksProxy;
std::map<boost::asio::ip::tcp::endpoint, std::unique_ptr<I2PClientTunnel> > m_ClientTunnels; // local endpoint->tunnel
std::map<std::pair<i2p::data::IdentHash, int>, std::unique_ptr<I2PServerTunnel> > m_ServerTunnels; // <destination,port>->tunnel
std::mutex m_ForwardsMutex;
std::map<boost::asio::ip::udp::endpoint, std::unique_ptr<I2PUDPClientTunnel> > m_ClientForwards; // local endpoint -> udp tunnel
std::map<std::pair<i2p::data::IdentHash, int>, std::unique_ptr<I2PUDPServerTunnel> > m_ServerForwards; // <destination,port> -> udp tunnel
SAMBridge * m_SamBridge;
BOBCommandChannel * m_BOBCommandChannel;
I2CPServer * m_I2CPServer;
boost::asio::io_service m_Service;
std::thread * m_ServiceThread;
boost::asio::deadline_timer m_CleanupUDPTimer;
public:
// for HTTP
const decltype(m_Destinations)& GetDestinations () const { return m_Destinations; };
const decltype(m_ClientTunnels)& GetClientTunnels () const { return m_ClientTunnels; };
const decltype(m_ServerTunnels)& GetServerTunnels () const { return m_ServerTunnels; };
const decltype(m_ClientForwards)& GetClientForwards () const { return m_ClientForwards; }
const decltype(m_ServerForwards)& GetServerForwards () const { return m_ServerForwards; }
};
extern ClientContext context;

2
DaemonLinux.cpp

@ -20,7 +20,7 @@ void handle_signal(int sig) @@ -20,7 +20,7 @@ void handle_signal(int sig)
switch (sig)
{
case SIGHUP:
LogPrint(eLogInfo, "Daemon: Got SIGHUP, reopening log...");
LogPrint(eLogInfo, "Daemon: Got SIGHUP, reopening logs and tunnel configuration...");
i2p::log::Logger().Reopen ();
i2p::client::context.ReloadConfig();
break;

412
Datagram.cpp

@ -12,72 +12,45 @@ namespace i2p @@ -12,72 +12,45 @@ namespace i2p
namespace datagram
{
DatagramDestination::DatagramDestination (std::shared_ptr<i2p::client::ClientDestination> owner):
m_Owner (owner), m_Receiver (nullptr)
m_Owner (owner.get()),
m_CleanupTimer(owner->GetService()),
m_Receiver (nullptr)
{
ScheduleCleanup();
}
DatagramDestination::~DatagramDestination ()
{
m_CleanupTimer.cancel();
m_Sessions.clear();
}
void DatagramDestination::SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash& ident, uint16_t fromPort, uint16_t toPort)
{
auto owner = m_Owner;
auto i = owner->GetIdentity();
uint8_t buf[MAX_DATAGRAM_SIZE];
auto identityLen = m_Owner->GetIdentity ()->ToBuffer (buf, MAX_DATAGRAM_SIZE);
auto identityLen = i->ToBuffer (buf, MAX_DATAGRAM_SIZE);
uint8_t * signature = buf + identityLen;
auto signatureLen = m_Owner->GetIdentity ()->GetSignatureLen ();
auto signatureLen = i->GetSignatureLen ();
uint8_t * buf1 = signature + signatureLen;
size_t headerLen = identityLen + signatureLen;
memcpy (buf1, payload, len);
if (m_Owner->GetIdentity ()->GetSigningKeyType () == i2p::data::SIGNING_KEY_TYPE_DSA_SHA1)
if (i->GetSigningKeyType () == i2p::data::SIGNING_KEY_TYPE_DSA_SHA1)
{
uint8_t hash[32];
SHA256(buf1, len, hash);
m_Owner->Sign (hash, 32, signature);
owner->Sign (hash, 32, signature);
}
else
m_Owner->Sign (buf1, len, signature);
owner->Sign (buf1, len, signature);
auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort);
auto remote = m_Owner->FindLeaseSet (ident);
if (remote)
m_Owner->GetService ().post (std::bind (&DatagramDestination::SendMsg, this, msg, remote));
else
m_Owner->RequestDestination (ident, std::bind (&DatagramDestination::HandleLeaseSetRequestComplete, this, std::placeholders::_1, msg));
auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort);
auto session = ObtainSession(ident);
session->SendMsg(msg);
}
void DatagramDestination::HandleLeaseSetRequestComplete (std::shared_ptr<i2p::data::LeaseSet> remote, std::shared_ptr<I2NPMessage> msg)
{
if (remote)
SendMsg (msg, remote);
}
void DatagramDestination::SendMsg (std::shared_ptr<I2NPMessage> msg, std::shared_ptr<const i2p::data::LeaseSet> remote)
{
auto outboundTunnel = m_Owner->GetTunnelPool ()->GetNextOutboundTunnel ();
auto leases = remote->GetNonExpiredLeases ();
if (!leases.empty () && outboundTunnel)
{
std::vector<i2p::tunnel::TunnelMessageBlock> msgs;
uint32_t i = rand () % leases.size ();
auto garlic = m_Owner->WrapMessage (remote, msg, true);
msgs.push_back (i2p::tunnel::TunnelMessageBlock
{
i2p::tunnel::eDeliveryTypeTunnel,
leases[i]->tunnelGateway, leases[i]->tunnelID,
garlic
});
outboundTunnel->SendTunnelDataMsg (msgs);
}
else
{
if (outboundTunnel)
LogPrint (eLogWarning, "Failed to send datagram. All leases expired");
else
LogPrint (eLogWarning, "Failed to send datagram. No outbound tunnels");
}
}
void DatagramDestination::HandleDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)
{
@ -98,18 +71,26 @@ namespace datagram @@ -98,18 +71,26 @@ namespace datagram
if (verified)
{
auto it = m_ReceiversByPorts.find (toPort);
if (it != m_ReceiversByPorts.end ())
it->second (identity, fromPort, toPort, buf + headerLen, len -headerLen);
else if (m_Receiver != nullptr)
m_Receiver (identity, fromPort, toPort, buf + headerLen, len -headerLen);
auto r = FindReceiver(toPort);
if(r)
r(identity, fromPort, toPort, buf + headerLen, len -headerLen);
else
LogPrint (eLogWarning, "Receiver for datagram is not set");
LogPrint (eLogWarning, "DatagramDestination: no receiver for port ", toPort);
}
else
LogPrint (eLogWarning, "Datagram signature verification failed");
}
DatagramDestination::Receiver DatagramDestination::FindReceiver(uint16_t port)
{
std::lock_guard<std::mutex> lock(m_ReceiversMutex);
Receiver r = m_Receiver;
auto itr = m_ReceiversByPorts.find(port);
if (itr != m_ReceiversByPorts.end())
r = itr->second;
return r;
}
void DatagramDestination::HandleDataMessagePayload (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)
{
// unzip it
@ -137,7 +118,338 @@ namespace datagram @@ -137,7 +118,338 @@ namespace datagram
else
msg = nullptr;
return msg;
}
}
void DatagramDestination::ScheduleCleanup()
{
m_CleanupTimer.expires_from_now(boost::posix_time::seconds(DATAGRAM_SESSION_CLEANUP_INTERVAL));
m_CleanupTimer.async_wait(std::bind(&DatagramDestination::HandleCleanUp, this, std::placeholders::_1));
}
void DatagramDestination::HandleCleanUp(const boost::system::error_code & ecode)
{
if(ecode)
return;
std::lock_guard<std::mutex> lock(m_SessionsMutex);
auto now = i2p::util::GetMillisecondsSinceEpoch();
LogPrint(eLogDebug, "DatagramDestination: clean up sessions");
std::vector<i2p::data::IdentHash> expiredSessions;
// for each session ...
for (auto & e : m_Sessions) {
// check if expired
if(now - e.second->LastActivity() >= DATAGRAM_SESSION_MAX_IDLE)
expiredSessions.push_back(e.first); // we are expired
}
// for each expired session ...
for (auto & ident : expiredSessions) {
// remove the expired session
LogPrint(eLogInfo, "DatagramDestination: expiring idle session with ", ident.ToBase32());
m_Sessions.erase(ident);
}
m_Owner->CleanupExpiredTags();
ScheduleCleanup();
}
std::shared_ptr<DatagramSession> DatagramDestination::ObtainSession(const i2p::data::IdentHash & ident)
{
std::shared_ptr<DatagramSession> session = nullptr;
std::lock_guard<std::mutex> lock(m_SessionsMutex);
auto itr = m_Sessions.find(ident);
if (itr == m_Sessions.end()) {
// not found, create new session
session = std::make_shared<DatagramSession>(m_Owner, ident);
m_Sessions[ident] = session;
} else {
session = itr->second;
}
return session;
}
std::shared_ptr<DatagramSession::Info> DatagramDestination::GetInfoForRemote(const i2p::data::IdentHash & remote)
{
std::lock_guard<std::mutex> lock(m_SessionsMutex);
for ( auto & item : m_Sessions)
{
if(item.first == remote) return std::make_shared<DatagramSession::Info>(item.second->GetSessionInfo());
}
return nullptr;
}
DatagramSession::DatagramSession(i2p::client::ClientDestination * localDestination,
const i2p::data::IdentHash & remoteIdent) :
m_LocalDestination(localDestination),
m_RemoteIdentity(remoteIdent),
m_LastUse(i2p::util::GetMillisecondsSinceEpoch ()),
m_LastPathChange(0),
m_LastSuccess(0)
{
}
void DatagramSession::SendMsg(std::shared_ptr<I2NPMessage> msg)
{
// we used this session
m_LastUse = i2p::util::GetMillisecondsSinceEpoch();
// schedule send
m_LocalDestination->GetService().post(std::bind(&DatagramSession::HandleSend, this, msg));
}
DatagramSession::Info DatagramSession::GetSessionInfo() const
{
if(!m_RoutingSession)
return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess);
auto routingPath = m_RoutingSession->GetSharedRoutingPath();
if (!routingPath)
return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess);
auto lease = routingPath->remoteLease;
auto tunnel = routingPath->outboundTunnel;
if(lease)
{
if(tunnel)
return DatagramSession::Info(lease->tunnelGateway, tunnel->GetEndpointIdentHash(), m_LastUse, m_LastSuccess);
else
return DatagramSession::Info(lease->tunnelGateway, nullptr, m_LastUse, m_LastSuccess);
}
else if(tunnel)
return DatagramSession::Info(nullptr, tunnel->GetEndpointIdentHash(), m_LastUse, m_LastSuccess);
else
return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess);
}
void DatagramSession::HandleSend(std::shared_ptr<I2NPMessage> msg)
{
if(!m_RoutingSession)
{
// try to get one
if(m_RemoteLeaseSet) m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true);
else
{
UpdateLeaseSet(msg);
return;
}
}
// do we have a routing session?
if(m_RoutingSession)
{
// should we switch paths?
if(ShouldUpdateRoutingPath ())
{
LogPrint(eLogDebug, "DatagramSession: try getting new routing path");
// try switching paths
auto path = GetNextRoutingPath();
if(path)
UpdateRoutingPath (path);
else
ResetRoutingPath();
}
auto routingPath = m_RoutingSession->GetSharedRoutingPath ();
// make sure we have a routing path
if (routingPath)
{
auto outboundTunnel = routingPath->outboundTunnel;
if (outboundTunnel)
{
if(outboundTunnel->IsEstablished())
{
m_LastSuccess = i2p::util::GetMillisecondsSinceEpoch ();
// we have a routing path and routing session and the outbound tunnel we are using is good
// wrap message with routing session and send down routing path's outbound tunnel wrapped for the IBGW
auto m = m_RoutingSession->WrapSingleMessage(msg);
routingPath->outboundTunnel->SendTunnelDataMsg({i2p::tunnel::TunnelMessageBlock{
i2p::tunnel::eDeliveryTypeTunnel,
routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID,
m
}});
return;
}
}
}
}
auto now = i2p::util::GetMillisecondsSinceEpoch ();
// if this path looks dead reset the routing path since we didn't seem to be able to get a path in time
if (m_LastPathChange && now - m_LastPathChange >= DATAGRAM_SESSION_PATH_TIMEOUT ) ResetRoutingPath();
UpdateLeaseSet(msg);
}
void DatagramSession::UpdateRoutingPath(const std::shared_ptr<i2p::garlic::GarlicRoutingPath> & path)
{
if(m_RoutingSession == nullptr && m_RemoteLeaseSet)
m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true);
if(!m_RoutingSession) return;
// set routing path and update time we last updated the routing path
m_RoutingSession->SetSharedRoutingPath (path);
m_LastPathChange = i2p::util::GetMillisecondsSinceEpoch ();
}
bool DatagramSession::ShouldUpdateRoutingPath() const
{
auto now = i2p::util::GetMillisecondsSinceEpoch ();
// we need to rotate paths becuase the routing path is too old
if (now - m_LastPathChange >= DATAGRAM_SESSION_PATH_SWITCH_INTERVAL) return true;
// our path looks dead so we need to rotate paths
if (now - m_LastSuccess >= DATAGRAM_SESSION_PATH_TIMEOUT) return true;
// if we have a routing session and routing path we don't need to switch paths
return m_RoutingSession != nullptr && m_RoutingSession->GetSharedRoutingPath () != nullptr;
}
bool DatagramSession::ShouldSwitchLease() const
{
std::shared_ptr<i2p::garlic::GarlicRoutingPath> routingPath = nullptr;
std::shared_ptr<const i2p::data::Lease> currentLease = nullptr;
if(m_RoutingSession)
routingPath = m_RoutingSession->GetSharedRoutingPath ();
if(routingPath)
currentLease = routingPath->remoteLease;
if(currentLease) // if we have a lease return true if it's about to expire otherwise return false
return currentLease->ExpiresWithin( DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE );
// we have no current lease, we should switch
return true;
}
std::shared_ptr<i2p::garlic::GarlicRoutingPath> DatagramSession::GetNextRoutingPath()
{
std::shared_ptr<i2p::tunnel::OutboundTunnel> outboundTunnel = nullptr;
std::shared_ptr<i2p::garlic::GarlicRoutingPath> routingPath = nullptr;
// get existing routing path if we have one
if(m_RoutingSession)
routingPath = m_RoutingSession->GetSharedRoutingPath();
// do we have an existing outbound tunnel and routing path?
if(routingPath && routingPath->outboundTunnel)
{
// is the outbound tunnel we are using good?
if (routingPath->outboundTunnel->IsEstablished())
{
// ya so let's stick with it
outboundTunnel = routingPath->outboundTunnel;
}
else
outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(routingPath->outboundTunnel); // no so we'll switch outbound tunnels
}
// do we have an outbound tunnel that works already ?
if(!outboundTunnel)
outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(); // no, let's get a new outbound tunnel as we probably just started
if(outboundTunnel)
{
std::shared_ptr<const i2p::data::Lease> lease = nullptr;
// should we switch leases ?
if (ShouldSwitchLease ())
{
// yes, get next available lease
lease = GetNextLease();
}
else if (routingPath)
{
if(routingPath->remoteLease)
{
if(routingPath->remoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE))
lease = GetNextLease();
else
lease = routingPath->remoteLease;
}
}
else
lease = GetNextLease();
if(lease)
{
// we have a valid lease to use and an outbound tunnel
// create new routing path
uint32_t now = i2p::util::GetSecondsSinceEpoch();
routingPath = std::make_shared<i2p::garlic::GarlicRoutingPath>(i2p::garlic::GarlicRoutingPath{
outboundTunnel,
lease,
0,
now,
0
});
}
else // we don't have a new routing path to give
routingPath = nullptr;
}
return routingPath;
}
void DatagramSession::ResetRoutingPath()
{
if(m_RoutingSession)
{
auto routingPath = m_RoutingSession->GetSharedRoutingPath();
if(routingPath && routingPath->remoteLease) // we have a remote lease already specified and a routing path
{
// get outbound tunnel on this path
auto outboundTunnel = routingPath->outboundTunnel;
// is this outbound tunnel there and established
if (outboundTunnel && outboundTunnel->IsEstablished())
m_InvalidIBGW.push_back(routingPath->remoteLease->tunnelGateway); // yes, let's mark remote lease as dead because the outbound tunnel seems fine
}
// reset the routing path
UpdateRoutingPath(nullptr);
}
}
std::shared_ptr<const i2p::data::Lease> DatagramSession::GetNextLease()
{
auto now = i2p::util::GetMillisecondsSinceEpoch ();
std::shared_ptr<const i2p::data::Lease> next = nullptr;
if(m_RemoteLeaseSet)
{
std::vector<i2p::data::IdentHash> exclude;
for(const auto & ident : m_InvalidIBGW)
exclude.push_back(ident);
// find get all leases that are not in our ban list and are not going to expire within our lease set handover window + fudge
auto leases = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding( [&exclude, now] (const i2p::data::Lease & l) -> bool {
if(exclude.size())
{
auto end = std::end(exclude);
return std::find_if(exclude.begin(), end, [l, now] ( const i2p::data::IdentHash & ident) -> bool {
return ident == l.tunnelGateway;
}) != end;
}
else
return false;
});
if(leases.size())
{
// pick random valid next lease
uint32_t idx = rand() % leases.size();
next = leases[idx];
}
else
LogPrint(eLogWarning, "DatagramDestination: no leases to use");
}
return next;
}
void DatagramSession::UpdateLeaseSet(std::shared_ptr<I2NPMessage> msg)
{
LogPrint(eLogInfo, "DatagramSession: updating lease set");
m_LocalDestination->RequestDestination(m_RemoteIdentity, std::bind(&DatagramSession::HandleGotLeaseSet, this, std::placeholders::_1, msg));
}
void DatagramSession::HandleGotLeaseSet(std::shared_ptr<const i2p::data::LeaseSet> remoteIdent, std::shared_ptr<I2NPMessage> msg)
{
if(remoteIdent)
{
// update routing session
if(m_RoutingSession)
m_RoutingSession = nullptr;
m_RoutingSession = m_LocalDestination->GetRoutingSession(remoteIdent, true);
// clear invalid IBGW as we have a new lease set
m_InvalidIBGW.clear();
m_RemoteLeaseSet = remoteIdent;
// update routing path
auto path = GetNextRoutingPath();
if (path)
UpdateRoutingPath(path);
else
ResetRoutingPath();
// send the message that was queued if it was provided
if(msg)
HandleSend(msg);
}
}
}
}

117
Datagram.h

@ -9,6 +9,7 @@ @@ -9,6 +9,7 @@
#include "Identity.h"
#include "LeaseSet.h"
#include "I2NPProtocol.h"
#include "Garlic.h"
namespace i2p
{
@ -18,13 +19,100 @@ namespace client @@ -18,13 +19,100 @@ namespace client
}
namespace datagram
{
const size_t MAX_DATAGRAM_SIZE = 32768;
// seconds interval for cleanup timer
const int DATAGRAM_SESSION_CLEANUP_INTERVAL = 3;
// milliseconds for max session idle time
const uint64_t DATAGRAM_SESSION_MAX_IDLE = 10 * 60 * 1000;
// milliseconds for how long we try sticking to a dead routing path before trying to switch
const uint64_t DATAGRAM_SESSION_PATH_TIMEOUT = 5000;
// milliseconds interval a routing path is used before switching
const uint64_t DATAGRAM_SESSION_PATH_SWITCH_INTERVAL = 60 * 1000;
// milliseconds before lease expire should we try switching leases
const uint64_t DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW = 10 * 1000;
// milliseconds fudge factor for leases handover
const uint64_t DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE = 1000;
class DatagramSession
{
public:
DatagramSession(i2p::client::ClientDestination * localDestination,
const i2p::data::IdentHash & remoteIdent);
/** send an i2np message to remote endpoint for this session */
void SendMsg(std::shared_ptr<I2NPMessage> msg);
/** get the last time in milliseconds for when we used this datagram session */
uint64_t LastActivity() const { return m_LastUse; }
/** get the last time in milliseconds when we successfully sent data */
uint64_t LastSuccess() const { return m_LastSuccess; }
struct Info
{
std::shared_ptr<const i2p::data::IdentHash> IBGW;
std::shared_ptr<const i2p::data::IdentHash> OBEP;
const uint64_t activity;
const uint64_t success;
Info() : IBGW(nullptr), OBEP(nullptr), activity(0), success(0) {}
Info(const uint8_t * ibgw, const uint8_t * obep, const uint64_t a, const uint64_t s) :
activity(a),
success(s) {
if(ibgw) IBGW = std::make_shared<i2p::data::IdentHash>(ibgw);
else IBGW = nullptr;
if(obep) OBEP = std::make_shared<i2p::data::IdentHash>(obep);
else OBEP = nullptr;
}
};
Info GetSessionInfo() const;
private:
/** update our routing path we are using, mark that we have changed paths */
void UpdateRoutingPath(const std::shared_ptr<i2p::garlic::GarlicRoutingPath> & path);
/** return true if we should switch routing paths because of path lifetime or timeout otherwise false */
bool ShouldUpdateRoutingPath() const;
/** return true if we should switch the lease for out routing path otherwise return false */
bool ShouldSwitchLease() const;
/** get next usable routing path, try reusing outbound tunnels */
std::shared_ptr<i2p::garlic::GarlicRoutingPath> GetNextRoutingPath();
/**
* mark current routing path as invalid and clear it
* if the outbound tunnel we were using was okay don't use the IBGW in the routing path's lease next time
*/
void ResetRoutingPath();
/** get next usable lease, does not fetch or update if expired or have no lease set */
std::shared_ptr<const i2p::data::Lease> GetNextLease();
void HandleSend(std::shared_ptr<I2NPMessage> msg);
void HandleGotLeaseSet(std::shared_ptr<const i2p::data::LeaseSet> remoteIdent,
std::shared_ptr<I2NPMessage> msg);
void UpdateLeaseSet(std::shared_ptr<I2NPMessage> msg=nullptr);
private:
i2p::client::ClientDestination * m_LocalDestination;
i2p::data::IdentHash m_RemoteIdentity;
std::shared_ptr<i2p::garlic::GarlicRoutingSession> m_RoutingSession;
// Ident hash of IBGW that are invalid
std::vector<i2p::data::IdentHash> m_InvalidIBGW;
std::shared_ptr<const i2p::data::LeaseSet> m_RemoteLeaseSet;
uint64_t m_LastUse;
uint64_t m_LastPathChange;
uint64_t m_LastSuccess;
};
const size_t MAX_DATAGRAM_SIZE = 32768;
class DatagramDestination
{
typedef std::function<void (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)> Receiver;
public:
DatagramDestination (std::shared_ptr<i2p::client::ClientDestination> owner);
~DatagramDestination ();
@ -34,21 +122,34 @@ namespace datagram @@ -34,21 +122,34 @@ namespace datagram
void SetReceiver (const Receiver& receiver) { m_Receiver = receiver; };
void ResetReceiver () { m_Receiver = nullptr; };
void SetReceiver (const Receiver& receiver, uint16_t port) { m_ReceiversByPorts[port] = receiver; };
void ResetReceiver (uint16_t port) { m_ReceiversByPorts.erase (port); };
void SetReceiver (const Receiver& receiver, uint16_t port) { std::lock_guard<std::mutex> lock(m_ReceiversMutex); m_ReceiversByPorts[port] = receiver; };
void ResetReceiver (uint16_t port) { std::lock_guard<std::mutex> lock(m_ReceiversMutex); m_ReceiversByPorts.erase (port); };
std::shared_ptr<DatagramSession::Info> GetInfoForRemote(const i2p::data::IdentHash & remote);
private:
void HandleLeaseSetRequestComplete (std::shared_ptr<i2p::data::LeaseSet> leaseSet, std::shared_ptr<I2NPMessage> msg);
// clean up after next tick
void ScheduleCleanup();
// clean up stale sessions and expire tags
void HandleCleanUp(const boost::system::error_code & ecode);
std::shared_ptr<DatagramSession> ObtainSession(const i2p::data::IdentHash & ident);
std::shared_ptr<I2NPMessage> CreateDataMessage (const uint8_t * payload, size_t len, uint16_t fromPort, uint16_t toPort);
void SendMsg (std::shared_ptr<I2NPMessage> msg, std::shared_ptr<const i2p::data::LeaseSet> remote);
void HandleDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len);
/** find a receiver by port, if none by port is found try default receiever, otherwise returns nullptr */
Receiver FindReceiver(uint16_t port);
private:
std::shared_ptr<i2p::client::ClientDestination> m_Owner;
i2p::client::ClientDestination * m_Owner;
boost::asio::deadline_timer m_CleanupTimer;
Receiver m_Receiver; // default
std::mutex m_SessionsMutex;
std::map<i2p::data::IdentHash, std::shared_ptr<DatagramSession> > m_Sessions;
std::mutex m_ReceiversMutex;
std::map<uint16_t, Receiver> m_ReceiversByPorts;
i2p::data::GzipInflator m_Inflator;

74
Destination.cpp

@ -168,14 +168,32 @@ namespace client @@ -168,14 +168,32 @@ namespace client
else
return false;
}
std::shared_ptr<const i2p::data::LeaseSet> LeaseSetDestination::FindLeaseSet (const i2p::data::IdentHash& ident)
{
std::lock_guard<std::mutex> lock(m_RemoteLeaseSetsMutex);
auto it = m_RemoteLeaseSets.find (ident);
if (it != m_RemoteLeaseSets.end ())
{
{
if (!it->second->IsExpired ())
{
if (it->second->ExpiresSoon())
{
LogPrint(eLogDebug, "Destination: Lease Set expires soon, updating before expire");
// update now before expiration for smooth handover
RequestDestination(ident, [this, ident] (std::shared_ptr<i2p::data::LeaseSet> ls) {
if(ls && !ls->IsExpired())
{
ls->PopulateLeases();
{
std::lock_guard<std::mutex> _lock(m_RemoteLeaseSetsMutex);
m_RemoteLeaseSets[ident] = ls;
}
}
});
}
return it->second;
}
else
LogPrint (eLogWarning, "Destination: remote LeaseSet expired");
}
@ -185,7 +203,10 @@ namespace client @@ -185,7 +203,10 @@ namespace client
if (ls && !ls->IsExpired ())
{
ls->PopulateLeases (); // since we don't store them in netdb
m_RemoteLeaseSets[ident] = ls;
{
std::lock_guard<std::mutex> lock(m_RemoteLeaseSetsMutex);
m_RemoteLeaseSets[ident] = ls;
}
return ls;
}
}
@ -280,6 +301,7 @@ namespace client @@ -280,6 +301,7 @@ namespace client
if (buf[DATABASE_STORE_TYPE_OFFSET] == 1) // LeaseSet
{
LogPrint (eLogDebug, "Remote LeaseSet");
std::lock_guard<std::mutex> lock(m_RemoteLeaseSetsMutex);
auto it = m_RemoteLeaseSets.find (buf + DATABASE_STORE_KEY_OFFSET);
if (it != m_RemoteLeaseSets.end ())
{
@ -628,6 +650,7 @@ namespace client @@ -628,6 +650,7 @@ namespace client
void LeaseSetDestination::CleanupRemoteLeaseSets ()
{
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
std::lock_guard<std::mutex> lock(m_RemoteLeaseSetsMutex);
for (auto it = m_RemoteLeaseSets.begin (); it != m_RemoteLeaseSets.end ();)
{
if (it->second->IsEmpty () || ts > it->second->GetExpirationTime ()) // leaseset expired
@ -642,7 +665,8 @@ namespace client @@ -642,7 +665,8 @@ namespace client
ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map<std::string, std::string> * params):
LeaseSetDestination (isPublic, params),
m_Keys (keys), m_DatagramDestination (nullptr)
m_Keys (keys), m_DatagramDestination (nullptr),
m_ReadyChecker(GetService())
{
if (isPublic)
PersistTemporaryKeys ();
@ -654,8 +678,6 @@ namespace client @@ -654,8 +678,6 @@ namespace client
ClientDestination::~ClientDestination ()
{
if (m_DatagramDestination)
delete m_DatagramDestination;
}
bool ClientDestination::Start ()
@ -676,22 +698,44 @@ namespace client @@ -676,22 +698,44 @@ namespace client
{
if (LeaseSetDestination::Stop ())
{
m_ReadyChecker.cancel();
m_StreamingDestination->Stop ();
m_StreamingDestination = nullptr;
for (auto& it: m_StreamingDestinationsByPorts)
it.second->Stop ();
if (m_DatagramDestination)
{
auto d = m_DatagramDestination;
m_DatagramDestination = nullptr;
delete d;
}
return true;
if(m_DatagramDestination)
delete m_DatagramDestination;
m_DatagramDestination = nullptr;
return true;
}
else
return false;
}
void ClientDestination::Ready(ReadyPromise & p)
{
ScheduleCheckForReady(&p);
}
void ClientDestination::ScheduleCheckForReady(ReadyPromise * p)
{
// tick every 100ms
m_ReadyChecker.expires_from_now(boost::posix_time::milliseconds(100));
m_ReadyChecker.async_wait([&, p] (const boost::system::error_code & ecode) {
HandleCheckForReady(ecode, p);
});
}
void ClientDestination::HandleCheckForReady(const boost::system::error_code & ecode, ReadyPromise * p)
{
if(ecode) // error happened
p->set_value(nullptr);
else if(IsReady()) // we are ready
p->set_value(std::shared_ptr<ClientDestination>(this));
else // we are not ready
ScheduleCheckForReady(p);
}
void ClientDestination::HandleDataMessage (const uint8_t * buf, size_t len)
{
uint32_t length = bufbe32toh (buf);
@ -796,9 +840,9 @@ namespace client @@ -796,9 +840,9 @@ namespace client
return dest;
}
i2p::datagram::DatagramDestination * ClientDestination::CreateDatagramDestination ()
i2p::datagram::DatagramDestination * ClientDestination::CreateDatagramDestination ()
{
if (!m_DatagramDestination)
if (m_DatagramDestination == nullptr)
m_DatagramDestination = new i2p::datagram::DatagramDestination (GetSharedFromThis ());
return m_DatagramDestination;
}

23
Destination.h

@ -8,6 +8,7 @@ @@ -8,6 +8,7 @@
#include <set>
#include <string>
#include <functional>
#include <future>
#include <boost/asio.hpp>
#include "Identity.h"
#include "TunnelPool.h"
@ -122,6 +123,7 @@ namespace client @@ -122,6 +123,7 @@ namespace client
std::thread * m_Thread;
boost::asio::io_service m_Service;
boost::asio::io_service::work m_Work;
mutable std::mutex m_RemoteLeaseSetsMutex;
std::map<i2p::data::IdentHash, std::shared_ptr<i2p::data::LeaseSet> > m_RemoteLeaseSets;
std::map<i2p::data::IdentHash, std::shared_ptr<LeaseSetRequest> > m_LeaseSetRequests;
@ -142,13 +144,19 @@ namespace client @@ -142,13 +144,19 @@ namespace client
class ClientDestination: public LeaseSetDestination
{
public:
// type for informing that a client destination is ready
typedef std::promise<std::shared_ptr<ClientDestination> > ReadyPromise;
ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map<std::string, std::string> * params = nullptr);
~ClientDestination ();
bool Start ();
bool Stop ();
// informs promise with shared_from_this() when this destination is ready to use
// if cancelled before ready, informs promise with nullptr
void Ready(ReadyPromise & p);
const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; };
void Sign (const uint8_t * buf, int len, uint8_t * signature) const { m_Keys.Sign (buf, len, signature); };
@ -163,8 +171,8 @@ namespace client @@ -163,8 +171,8 @@ namespace client
bool IsAcceptingStreams () const;
// datagram
i2p::datagram::DatagramDestination * GetDatagramDestination () const { return m_DatagramDestination; };
i2p::datagram::DatagramDestination * CreateDatagramDestination ();
i2p::datagram::DatagramDestination * GetDatagramDestination () const { return m_DatagramDestination; };
i2p::datagram::DatagramDestination * CreateDatagramDestination ();
// implements LocalDestination
const uint8_t * GetEncryptionPrivateKey () const { return m_EncryptionPrivateKey; };
@ -182,6 +190,9 @@ namespace client @@ -182,6 +190,9 @@ namespace client
{ return std::static_pointer_cast<ClientDestination>(shared_from_this ()); }
void PersistTemporaryKeys ();
void ScheduleCheckForReady(ReadyPromise * p);
void HandleCheckForReady(const boost::system::error_code & ecode, ReadyPromise * p);
private:
i2p::data::PrivateKeys m_Keys;
@ -189,8 +200,10 @@ namespace client @@ -189,8 +200,10 @@ namespace client
std::shared_ptr<i2p::stream::StreamingDestination> m_StreamingDestination; // default
std::map<uint16_t, std::shared_ptr<i2p::stream::StreamingDestination> > m_StreamingDestinationsByPorts;
i2p::datagram::DatagramDestination * m_DatagramDestination;
i2p::datagram::DatagramDestination * m_DatagramDestination;
boost::asio::deadline_timer m_ReadyChecker;
public:
// for HTTP only

9
FS.cpp

@ -158,6 +158,13 @@ namespace fs { @@ -158,6 +158,13 @@ namespace fs {
}
void HashedStorage::Traverse(std::vector<std::string> & files) {
Iterate([&files] (const std::string & fname) {
files.push_back(fname);
});
}
void HashedStorage::Iterate(FilenameVisitor v)
{
boost::filesystem::path p(root);
boost::filesystem::recursive_directory_iterator it(p);
boost::filesystem::recursive_directory_iterator end;
@ -166,7 +173,7 @@ namespace fs { @@ -166,7 +173,7 @@ namespace fs {
if (!boost::filesystem::is_regular_file( it->status() ))
continue;
const std::string & t = it->path().string();
files.push_back(t);
v(t);
}
}
} // fs

4
FS.h

@ -13,6 +13,7 @@ @@ -13,6 +13,7 @@
#include <string>
#include <iostream>
#include <sstream>
#include <functional>
namespace i2p {
namespace fs {
@ -43,6 +44,7 @@ namespace fs { @@ -43,6 +44,7 @@ namespace fs {
std::string suffix; /**< suffix of file in storage (extension) */
public:
typedef std::function<void(const std::string &)> FilenameVisitor;
HashedStorage(const char *n, const char *p1, const char *p2, const char *s):
name(n), prefix1(p1), prefix2(p2), suffix(s) {};
@ -58,6 +60,8 @@ namespace fs { @@ -58,6 +60,8 @@ namespace fs {
void Remove(const std::string & ident);
/** find all files in storage and store list in provided vector */
void Traverse(std::vector<std::string> & files);
/** visit every file in this storage with a visitor */
void Iterate(FilenameVisitor v);
};
/** @brief Returns current application name, default 'i2pd' */

2
Garlic.cpp

@ -178,7 +178,7 @@ namespace garlic @@ -178,7 +178,7 @@ namespace garlic
// create message
if (!tagFound) // new session
{
LogPrint (eLogWarning, "Garlic: No tags available, will use ElGamal");
LogPrint (eLogInfo, "Garlic: No tags available, will use ElGamal");
if (!m_Destination)
{
LogPrint (eLogError, "Garlic: Can't use ElGamal for unknown destination");

2
Garlic.h

@ -108,7 +108,7 @@ namespace garlic @@ -108,7 +108,7 @@ namespace garlic
std::shared_ptr<GarlicRoutingPath> GetSharedRoutingPath ();
void SetSharedRoutingPath (std::shared_ptr<GarlicRoutingPath> path);
private:
size_t CreateAESBlock (uint8_t * buf, std::shared_ptr<const I2NPMessage> msg);

3
HTTPServer.cpp

@ -337,7 +337,8 @@ namespace http { @@ -337,7 +337,8 @@ namespace http {
s << "<td>" << it->GetWindowSize () << "</td>";
s << "<td>" << (int)it->GetStatus () << "</td>";
s << "</tr><br>\r\n" << std::endl;
}
}
s << "</table>";
}
}

7
I2PService.cpp

@ -32,7 +32,12 @@ namespace client @@ -32,7 +32,12 @@ namespace client
}
}
TCPIPPipe::TCPIPPipe(I2PService * owner, std::shared_ptr<boost::asio::ip::tcp::socket> upstream, std::shared_ptr<boost::asio::ip::tcp::socket> downstream) : I2PServiceHandler(owner), m_up(upstream), m_down(downstream) {}
TCPIPPipe::TCPIPPipe(I2PService * owner, std::shared_ptr<boost::asio::ip::tcp::socket> upstream, std::shared_ptr<boost::asio::ip::tcp::socket> downstream) : I2PServiceHandler(owner), m_up(upstream), m_down(downstream)
{
boost::asio::socket_base::receive_buffer_size option(TCP_IP_PIPE_BUFFER_SIZE);
upstream->set_option(option);
downstream->set_option(option);
}
TCPIPPipe::~TCPIPPipe()
{

258
I2PTunnel.cpp

@ -9,6 +9,17 @@ namespace i2p @@ -9,6 +9,17 @@ namespace i2p
{
namespace client
{
/** set standard socket options */
static void I2PTunnelSetSocketOptions(std::shared_ptr<boost::asio::ip::tcp::socket> socket)
{
if (socket && socket->is_open())
{
boost::asio::socket_base::receive_buffer_size option(I2P_TUNNEL_CONNECTION_BUFFER_SIZE);
socket->set_option(option);
}
}
I2PTunnelConnection::I2PTunnelConnection (I2PService * owner, std::shared_ptr<boost::asio::ip::tcp::socket> socket,
std::shared_ptr<const i2p::data::LeaseSet> leaseSet, int port):
I2PServiceHandler(owner), m_Socket (socket), m_RemoteEndpoint (socket->remote_endpoint ()),
@ -34,7 +45,7 @@ namespace client @@ -34,7 +45,7 @@ namespace client
I2PTunnelConnection::~I2PTunnelConnection ()
{
}
void I2PTunnelConnection::I2PConnect (const uint8_t * msg, size_t len)
{
if (m_Stream)
@ -50,6 +61,7 @@ namespace client @@ -50,6 +61,7 @@ namespace client
void I2PTunnelConnection::Connect ()
{
I2PTunnelSetSocketOptions(m_Socket);
if (m_Socket) {
#ifdef __linux__
// bind to 127.x.x.x address
@ -67,7 +79,7 @@ namespace client @@ -67,7 +79,7 @@ namespace client
m_Socket->bind (boost::asio::ip::tcp::endpoint (ourIP, 0));
}
#endif
m_Socket->async_connect (m_RemoteEndpoint, std::bind (&I2PTunnelConnection::HandleConnect,
m_Socket->async_connect (m_RemoteEndpoint, std::bind (&I2PTunnelConnection::HandleConnect,
shared_from_this (), std::placeholders::_1));
}
}
@ -401,7 +413,7 @@ namespace client @@ -401,7 +413,7 @@ namespace client
{
m_PortDestination = localDestination->CreateStreamingDestination (inport > 0 ? inport : port, gzip);
}
void I2PServerTunnel::Start ()
{
m_Endpoint.port (m_Port);
@ -516,5 +528,241 @@ namespace client @@ -516,5 +528,241 @@ namespace client
conn->Connect ();
}
}
}
void I2PUDPServerTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)
{
std::lock_guard<std::mutex> lock(m_SessionsMutex);
auto session = ObtainUDPSession(from, toPort, fromPort);
session->IPSocket.send_to(boost::asio::buffer(buf, len), m_RemoteEndpoint);
session->LastActivity = i2p::util::GetMillisecondsSinceEpoch();
}
void I2PUDPServerTunnel::ExpireStale(const uint64_t delta) {
std::lock_guard<std::mutex> lock(m_SessionsMutex);
uint64_t now = i2p::util::GetMillisecondsSinceEpoch();
std::remove_if(m_Sessions.begin(), m_Sessions.end(), [now, delta](const UDPSession * u) -> bool {
return now - u->LastActivity >= delta;
});
}
UDPSession * I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort)
{
auto ih = from.GetIdentHash();
for ( UDPSession * s : m_Sessions )
{
if ( s->Identity == ih)
{
/** found existing session */
LogPrint(eLogDebug, "UDPServer: found session ", s->IPSocket.local_endpoint(), " ", ih.ToBase32());
return s;
}
}
/** create new udp session */
boost::asio::ip::udp::endpoint ep(m_LocalAddress, 0);
m_Sessions.push_back(new UDPSession(ep, m_LocalDest, m_RemoteEndpoint, ih, localPort, remotePort));
return m_Sessions.back();
}
UDPSession::UDPSession(boost::asio::ip::udp::endpoint localEndpoint,
const std::shared_ptr<i2p::client::ClientDestination> & localDestination,
boost::asio::ip::udp::endpoint endpoint, const i2p::data::IdentHash to,
uint16_t ourPort, uint16_t theirPort) :
m_Destination(localDestination->GetDatagramDestination()),
m_Service(localDestination->GetService()),
IPSocket(localDestination->GetService(), localEndpoint),
Identity(to),
SendEndpoint(endpoint),
LastActivity(i2p::util::GetMillisecondsSinceEpoch()),
LocalPort(ourPort),
RemotePort(theirPort)
{
Receive();
}
void UDPSession::Receive() {
LogPrint(eLogDebug, "UDPSession: Receive");
IPSocket.async_receive_from(boost::asio::buffer(m_Buffer, I2P_UDP_MAX_MTU),
FromEndpoint, std::bind(&UDPSession::HandleReceived, this, std::placeholders::_1, std::placeholders::_2));
}
void UDPSession::HandleReceived(const boost::system::error_code & ecode, std::size_t len)
{
if(!ecode)
{
LogPrint(eLogDebug, "UDPSession: forward ", len, "B from ", FromEndpoint);
LastActivity = i2p::util::GetMillisecondsSinceEpoch();
uint8_t * data = new uint8_t[len];
memcpy(data, m_Buffer, len);
m_Service.post([&,len, data] () {
m_Destination->SendDatagramTo(data, len, Identity, 0, 0);
delete [] data;
});
Receive();
} else {
LogPrint(eLogError, "UDPSession: ", ecode.message());
}
}
I2PUDPServerTunnel::I2PUDPServerTunnel(const std::string & name, std::shared_ptr<i2p::client::ClientDestination> localDestination,
const boost::asio::ip::address& localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port) :
m_Name(name),
LocalPort(port),
m_LocalAddress(localAddress),
m_RemoteEndpoint(forwardTo)
{
m_LocalDest = localDestination;
m_LocalDest->Start();
auto dgram = m_LocalDest->CreateDatagramDestination();
dgram->SetReceiver(std::bind(&I2PUDPServerTunnel::HandleRecvFromI2P, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5));
}
I2PUDPServerTunnel::~I2PUDPServerTunnel()
{
auto dgram = m_LocalDest->GetDatagramDestination();
if (dgram) dgram->ResetReceiver();
LogPrint(eLogInfo, "UDPServer: done");
}
void I2PUDPServerTunnel::Start() {
m_LocalDest->Start();
}
std::vector<std::shared_ptr<DatagramSessionInfo> > I2PUDPServerTunnel::GetSessions()
{
std::vector<std::shared_ptr<DatagramSessionInfo> > sessions;
std::lock_guard<std::mutex> lock(m_SessionsMutex);
for ( UDPSession * s : m_Sessions )
{
if (!s->m_Destination) continue;
auto info = s->m_Destination->GetInfoForRemote(s->Identity);
if(!info) continue;
auto sinfo = std::make_shared<DatagramSessionInfo>();
sinfo->Name = m_Name;
sinfo->LocalIdent = std::make_shared<i2p::data::IdentHash>(m_LocalDest->GetIdentHash().data());
sinfo->RemoteIdent = std::make_shared<i2p::data::IdentHash>(s->Identity.data());
sinfo->CurrentIBGW = info->IBGW;
sinfo->CurrentOBEP = info->OBEP;
sessions.push_back(sinfo);
}
return sessions;
}
I2PUDPClientTunnel::I2PUDPClientTunnel(const std::string & name, const std::string &remoteDest,
boost::asio::ip::udp::endpoint localEndpoint,
std::shared_ptr<i2p::client::ClientDestination> localDestination,
uint16_t remotePort) :
m_Name(name),
m_Session(nullptr),
m_RemoteDest(remoteDest),
m_LocalDest(localDestination),
m_LocalEndpoint(localEndpoint),
m_RemoteIdent(nullptr),
m_ResolveThread(nullptr),
LocalPort(localEndpoint.port()),
RemotePort(remotePort),
m_cancel_resolve(false)
{
auto dgram = m_LocalDest->CreateDatagramDestination();
dgram->SetReceiver(std::bind(&I2PUDPClientTunnel::HandleRecvFromI2P, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5));
}
void I2PUDPClientTunnel::Start() {
m_LocalDest->Start();
if (m_ResolveThread == nullptr)
m_ResolveThread = new std::thread(std::bind(&I2PUDPClientTunnel::TryResolving, this));
}
std::vector<std::shared_ptr<DatagramSessionInfo> > I2PUDPClientTunnel::GetSessions()
{
std::vector<std::shared_ptr<DatagramSessionInfo> > infos;
if(m_Session && m_LocalDest)
{
auto s = m_Session;
if (s->m_Destination)
{
auto info = m_Session->m_Destination->GetInfoForRemote(s->Identity);
if(info)
{
auto sinfo = std::make_shared<DatagramSessionInfo>();
sinfo->Name = m_Name;
sinfo->LocalIdent = std::make_shared<i2p::data::IdentHash>(m_LocalDest->GetIdentHash().data());
sinfo->RemoteIdent = std::make_shared<i2p::data::IdentHash>(s->Identity.data());
sinfo->CurrentIBGW = info->IBGW;
sinfo->CurrentOBEP = info->OBEP;
infos.push_back(sinfo);
}
}
}
return infos;
}
void I2PUDPClientTunnel::TryResolving() {
LogPrint(eLogInfo, "UDP Tunnel: Trying to resolve ", m_RemoteDest);
m_RemoteIdent = new i2p::data::IdentHash;
m_RemoteIdent->Fill(0);
while(!context.GetAddressBook().GetIdentHash(m_RemoteDest, *m_RemoteIdent) && !m_cancel_resolve)
{
LogPrint(eLogWarning, "UDP Tunnel: failed to lookup ", m_RemoteDest);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
if(m_cancel_resolve)
{
LogPrint(eLogError, "UDP Tunnel: lookup of ", m_RemoteDest, " was cancelled");
return;
}
LogPrint(eLogInfo, "UDP Tunnel: resolved ", m_RemoteDest, " to ", m_RemoteIdent->ToBase32());
// delete existing session
if(m_Session) delete m_Session;
boost::asio::ip::udp::endpoint ep(boost::asio::ip::address::from_string("127.0.0.1"), 0);
m_Session = new UDPSession(m_LocalEndpoint, m_LocalDest, ep, *m_RemoteIdent, LocalPort, RemotePort);
}
void I2PUDPClientTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)
{
if(m_RemoteIdent && from.GetIdentHash() == *m_RemoteIdent)
{
// address match
if(m_Session)
{
// tell session
LogPrint(eLogDebug, "UDP Client: got ", len, "B from ", from.GetIdentHash().ToBase32());
m_Session->IPSocket.send_to(boost::asio::buffer(buf, len), m_Session->FromEndpoint);
}
else
LogPrint(eLogWarning, "UDP Client: no session");
}
else
LogPrint(eLogWarning, "UDP Client: unwarrented traffic from ", from.GetIdentHash().ToBase32());
}
I2PUDPClientTunnel::~I2PUDPClientTunnel() {
auto dgram = m_LocalDest->GetDatagramDestination();
if (dgram) dgram->ResetReceiver();
if (m_Session) delete m_Session;
m_cancel_resolve = true;
if(m_ResolveThread)
{
m_ResolveThread->join();
delete m_ResolveThread;
m_ResolveThread = nullptr;
}
if (m_RemoteIdent) delete m_RemoteIdent;
}
}
}

116
I2PTunnel.h

@ -9,6 +9,7 @@ @@ -9,6 +9,7 @@
#include <boost/asio.hpp>
#include "Identity.h"
#include "Destination.h"
#include "Datagram.h"
#include "Streaming.h"
#include "I2PService.h"
@ -128,8 +129,113 @@ namespace client @@ -128,8 +129,113 @@ namespace client
std::string m_Name, m_Destination;
const i2p::data::IdentHash * m_DestinationIdentHash;
int m_DestinationPort;
};
};
/** 2 minute timeout for udp sessions */
const uint64_t I2P_UDP_SESSION_TIMEOUT = 1000 * 60 * 2;
/** max size for i2p udp */
const size_t I2P_UDP_MAX_MTU = i2p::datagram::MAX_DATAGRAM_SIZE;
struct UDPSession
{
i2p::datagram::DatagramDestination * m_Destination;
boost::asio::io_service & m_Service;
boost::asio::ip::udp::socket IPSocket;
i2p::data::IdentHash Identity;
boost::asio::ip::udp::endpoint FromEndpoint;
boost::asio::ip::udp::endpoint SendEndpoint;
uint64_t LastActivity;
uint16_t LocalPort;
uint16_t RemotePort;
uint8_t m_Buffer[I2P_UDP_MAX_MTU];
UDPSession(boost::asio::ip::udp::endpoint localEndpoint,
const std::shared_ptr<i2p::client::ClientDestination> & localDestination,
boost::asio::ip::udp::endpoint remote, const i2p::data::IdentHash ident,
uint16_t ourPort, uint16_t theirPort);
void HandleReceived(const boost::system::error_code & ecode, std::size_t len);
void Receive();
};
/** read only info about a datagram session */
struct DatagramSessionInfo
{
/** the name of this forward */
std::string Name;
/** ident hash of local destination */
std::shared_ptr<const i2p::data::IdentHash> LocalIdent;
/** ident hash of remote destination */
std::shared_ptr<const i2p::data::IdentHash> RemoteIdent;
/** ident hash of IBGW in use currently in this session or nullptr if none is set */
std::shared_ptr<const i2p::data::IdentHash> CurrentIBGW;
/** ident hash of OBEP in use for this session or nullptr if none is set */
std::shared_ptr<const i2p::data::IdentHash> CurrentOBEP;
/** i2p router's udp endpoint */
boost::asio::ip::udp::endpoint LocalEndpoint;
/** client's udp endpoint */
boost::asio::ip::udp::endpoint RemoteEndpoint;
/** how long has this converstation been idle in ms */
uint64_t idle;
};
/** server side udp tunnel, many i2p inbound to 1 ip outbound */
class I2PUDPServerTunnel
{
public:
I2PUDPServerTunnel(const std::string & name,
std::shared_ptr<i2p::client::ClientDestination> localDestination,
const boost::asio::ip::address & localAddress,
boost::asio::ip::udp::endpoint forwardTo, uint16_t port);
~I2PUDPServerTunnel();
/** expire stale udp conversations */
void ExpireStale(const uint64_t delta=I2P_UDP_SESSION_TIMEOUT);
void Start();
const char * GetName() const { return m_Name.c_str(); }
std::vector<std::shared_ptr<DatagramSessionInfo> > GetSessions();
private:
void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len);
UDPSession * ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort);
private:
const std::string m_Name;
const uint16_t LocalPort;
boost::asio::ip::address m_LocalAddress;
boost::asio::ip::udp::endpoint m_RemoteEndpoint;
std::mutex m_SessionsMutex;
std::vector<UDPSession*> m_Sessions;
std::shared_ptr<i2p::client::ClientDestination> m_LocalDest;
};
class I2PUDPClientTunnel
{
public:
I2PUDPClientTunnel(const std::string & name, const std::string &remoteDest,
boost::asio::ip::udp::endpoint localEndpoint, std::shared_ptr<i2p::client::ClientDestination> localDestination,
uint16_t remotePort);
~I2PUDPClientTunnel();
void Start();
const char * GetName() const { return m_Name.c_str(); }
std::vector<std::shared_ptr<DatagramSessionInfo> > GetSessions();
bool IsLocalDestination(const i2p::data::IdentHash & destination) const { return destination == m_LocalDest->GetIdentHash(); }
private:
void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len);
void TryResolving();
const std::string m_Name;
UDPSession * m_Session;
const std::string m_RemoteDest;
std::shared_ptr<i2p::client::ClientDestination> m_LocalDest;
const boost::asio::ip::udp::endpoint m_LocalEndpoint;
i2p::data::IdentHash * m_RemoteIdent;
std::thread * m_ResolveThread;
uint16_t LocalPort;
uint16_t RemotePort;
bool m_cancel_resolve;
};
class I2PServerTunnel: public I2PService
{
public:
@ -148,8 +254,10 @@ namespace client @@ -148,8 +254,10 @@ namespace client
const boost::asio::ip::tcp::endpoint& GetEndpoint () const { return m_Endpoint; }
const char* GetName() { return m_Name.c_str (); }
private:
void SetMaxConnsPerMinute(const uint32_t conns) { m_PortDestination->SetMaxConnsPerMinute(conns); }
private:
void HandleResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it,
std::shared_ptr<boost::asio::ip::tcp::resolver> resolver);
@ -165,7 +273,7 @@ namespace client @@ -165,7 +273,7 @@ namespace client
boost::asio::ip::tcp::endpoint m_Endpoint;
std::shared_ptr<i2p::stream::StreamingDestination> m_PortDestination;
std::set<i2p::data::IdentHash> m_AccessList;
bool m_IsAccessList;
bool m_IsAccessList;
};
class I2PServerTunnelHTTP: public I2PServerTunnel

15
Identity.cpp

@ -200,7 +200,9 @@ namespace data @@ -200,7 +200,9 @@ namespace data
}
memcpy (&m_StandardIdentity, buf, DEFAULT_IDENTITY_SIZE);
delete[] m_ExtendedBuffer; m_ExtendedBuffer = nullptr;
if(m_ExtendedBuffer) delete[] m_ExtendedBuffer;
m_ExtendedBuffer = nullptr;
m_ExtendedLen = bufbe16toh (m_StandardIdentity.certificate + 1);
if (m_ExtendedLen)
{
@ -410,6 +412,7 @@ namespace data @@ -410,6 +412,7 @@ namespace data
memcpy (m_PrivateKey, buf + ret, 256); // private key always 256
ret += 256;
size_t signingPrivateKeySize = m_Public->GetSigningPrivateKeyLen ();
if(signingPrivateKeySize + ret > len) return 0; // overflow
memcpy (m_SigningPrivateKey, buf + ret, signingPrivateKeySize);
ret += signingPrivateKeySize;
m_Signer = nullptr;
@ -422,7 +425,8 @@ namespace data @@ -422,7 +425,8 @@ namespace data
size_t ret = m_Public->ToBuffer (buf, len);
memcpy (buf + ret, m_PrivateKey, 256); // private key always 256
ret += 256;
size_t signingPrivateKeySize = m_Public->GetSigningPrivateKeyLen ();
size_t signingPrivateKeySize = m_Public->GetSigningPrivateKeyLen ();
if(ret + signingPrivateKeySize > len) return 0; // overflow
memcpy (buf + ret, m_SigningPrivateKey, signingPrivateKeySize);
ret += signingPrivateKeySize;
return ret;
@ -452,11 +456,12 @@ namespace data @@ -452,11 +456,12 @@ namespace data
void PrivateKeys::Sign (const uint8_t * buf, int len, uint8_t * signature) const
{
if (m_Signer)
m_Signer->Sign (buf, len, signature);
if (!m_Signer)
CreateSigner();
m_Signer->Sign (buf, len, signature);
}
void PrivateKeys::CreateSigner ()
void PrivateKeys::CreateSigner () const
{
switch (m_Public->GetSigningKeyType ())
{

6
Identity.h

@ -92,6 +92,8 @@ namespace data @@ -92,6 +92,8 @@ namespace data
CryptoKeyType GetCryptoKeyType () const;
void DropVerifier () const; // to save memory
bool operator == (const IdentityEx & other) const { return GetIdentHash() == other.GetIdentHash(); }
private:
void CreateVerifier () const;
@ -133,14 +135,14 @@ namespace data @@ -133,14 +135,14 @@ namespace data
private:
void CreateSigner ();
void CreateSigner () const;
private:
std::shared_ptr<IdentityEx> m_Public;
uint8_t m_PrivateKey[256];
uint8_t m_SigningPrivateKey[1024]; // assume private key doesn't exceed 1024 bytes
std::unique_ptr<i2p::crypto::Signer> m_Signer;
mutable std::unique_ptr<i2p::crypto::Signer> m_Signer;
};
// kademlia

19
LeaseSet.cpp

@ -162,8 +162,21 @@ namespace data @@ -162,8 +162,21 @@ namespace data
{
return ExtractTimestamp (buf, len) > ExtractTimestamp (m_Buffer, m_BufferLen);
}
const std::vector<std::shared_ptr<const Lease> > LeaseSet::GetNonExpiredLeases (bool withThreshold) const
bool LeaseSet::ExpiresSoon(const uint64_t dlt, const uint64_t fudge) const
{
auto now = i2p::util::GetMillisecondsSinceEpoch ();
if (fudge) now += rand() % fudge;
if (now >= m_ExpirationTime) return true;
return m_ExpirationTime - now <= dlt;
}
const std::vector<std::shared_ptr<const Lease> > LeaseSet::GetNonExpiredLeases (bool withThreshold) const
{
return GetNonExpiredLeasesExcluding( [] (const Lease & l) -> bool { return false; }, withThreshold);
}
const std::vector<std::shared_ptr<const Lease> > LeaseSet::GetNonExpiredLeasesExcluding (LeaseInspectFunc exclude, bool withThreshold) const
{
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
std::vector<std::shared_ptr<const Lease> > leases;
@ -174,7 +187,7 @@ namespace data @@ -174,7 +187,7 @@ namespace data
endDate += LEASE_ENDDATE_THRESHOLD;
else
endDate -= LEASE_ENDDATE_THRESHOLD;
if (ts < endDate)
if (ts < endDate && !exclude(*it))
leases.push_back (it);
}
return leases;

13
LeaseSet.h

@ -7,6 +7,7 @@ @@ -7,6 +7,7 @@
#include <set>
#include <memory>
#include "Identity.h"
#include "Timestamp.h"
namespace i2p
{
@ -24,7 +25,13 @@ namespace data @@ -24,7 +25,13 @@ namespace data
IdentHash tunnelGateway;
uint32_t tunnelID;
uint64_t endDate; // 0 means invalid
bool isUpdated; // trasient
bool isUpdated; // trasient
/* return true if this lease expires within t millisecond + fudge factor */
bool ExpiresWithin( const uint64_t t, const uint64_t fudge = 1000 ) const {
auto expire = i2p::util::GetMillisecondsSinceEpoch ();
if(fudge) expire += rand() % fudge;
return endDate - expire >= t;
}
};
struct LeaseCmp
@ -38,6 +45,8 @@ namespace data @@ -38,6 +45,8 @@ namespace data
};
};
typedef std::function<bool(const Lease & l)> LeaseInspectFunc;
const size_t MAX_LS_BUFFER_SIZE = 3072;
const size_t LEASE_SIZE = 44; // 32 + 4 + 8
const uint8_t MAX_NUM_LEASES = 16;
@ -56,10 +65,12 @@ namespace data @@ -56,10 +65,12 @@ namespace data
size_t GetBufferLen () const { return m_BufferLen; };
bool IsValid () const { return m_IsValid; };
const std::vector<std::shared_ptr<const Lease> > GetNonExpiredLeases (bool withThreshold = true) const;
const std::vector<std::shared_ptr<const Lease> > GetNonExpiredLeasesExcluding (LeaseInspectFunc exclude, bool withThreshold = true) const;
bool HasExpiredLeases () const;
bool IsExpired () const;
bool IsEmpty () const { return m_Leases.empty (); };
uint64_t GetExpirationTime () const { return m_ExpirationTime; };
bool ExpiresSoon(const uint64_t dlt=1000 * 5, const uint64_t fudge = 0) const ;
bool operator== (const LeaseSet& other) const
{ return m_BufferLen == other.m_BufferLen && !memcmp (m_Buffer, other.m_Buffer, m_BufferLen); };

22
Log.h

@ -40,8 +40,20 @@ enum LogType { @@ -40,8 +40,20 @@ enum LogType {
#endif
};
#ifdef _WIN32
const char LOG_COLOR_ERROR[] = "";
const char LOG_COLOR_WARNING[] = "";
const char LOG_COLOR_RESET[] = "";
#else
const char LOG_COLOR_ERROR[] = "\033[1;31m";
const char LOG_COLOR_WARNING[] = "\033[1;33m";
const char LOG_COLOR_RESET[] = "\033[0m";
#endif
namespace i2p {
namespace log {
struct LogMsg; /* forward declaration */
class Log
@ -177,8 +189,16 @@ void LogPrint (LogLevel level, TArgs... args) @@ -177,8 +189,16 @@ void LogPrint (LogLevel level, TArgs... args)
// fold message to single string
std::stringstream ss("");
LogPrint (ss, args ...);
if(level == eLogError) // if log level is ERROR color log message red
ss << LOG_COLOR_ERROR;
else if (level == eLogWarning) // if log level is WARN color log message yellow
ss << LOG_COLOR_WARNING;
LogPrint (ss, args ...);
// reset color
ss << LOG_COLOR_RESET;
auto msg = std::make_shared<i2p::log::LogMsg>(level, std::time(nullptr), ss.str());
msg->tid = std::this_thread::get_id();
log.Append(msg);

11
Makefile.linux

@ -32,9 +32,14 @@ ifeq ($(USE_STATIC),yes) @@ -32,9 +32,14 @@ ifeq ($(USE_STATIC),yes)
# Using 'getaddrinfo' in statically linked applications requires at runtime
# the shared libraries from the glibc version used for linking
LIBDIR := /usr/lib
LDLIBS = -lboost_system -lboost_date_time -lboost_filesystem -lboost_program_options
LDLIBS += -lssl -lcrypto -lz -ldl -lpthread -lrt
LDLIBS += -static-libstdc++ -static-libgcc -static
LDLIBS = $(LIBDIR)/libboost_system.a
LDLIBS += $(LIBDIR)/libboost_date_time.a
LDLIBS += $(LIBDIR)/libboost_filesystem.a
LDLIBS += $(LIBDIR)/libboost_program_options.a
LDLIBS += $(LIBDIR)/libssl.a
LDLIBS += $(LIBDIR)/libcrypto.a
LDLIBS += $(LIBDIR)/libz.a
LDLIBS += -lpthread -static-libstdc++ -static-libgcc -lrt
USE_AESNI := no
else
LDLIBS = -lcrypto -lssl -lz -lboost_system -lboost_date_time -lboost_filesystem -lboost_program_options -lpthread

2
NTCPSession.cpp

@ -747,6 +747,7 @@ namespace transport @@ -747,6 +747,7 @@ namespace transport
auto& addresses = context.GetRouterInfo ().GetAddresses ();
for (const auto& address: addresses)
{
if (!address) continue;
if (address->transportStyle == i2p::data::RouterInfo::eTransportNTCP)
{
if (address->host.is_v4())
@ -844,6 +845,7 @@ namespace transport @@ -844,6 +845,7 @@ namespace transport
if (it != m_NTCPSessions.end ())
{
LogPrint (eLogWarning, "NTCP: session to ", ident.ToBase64 (), " already exists");
session->Terminate();
return false;
}
m_NTCPSessions.insert (std::pair<i2p::data::IdentHash, std::shared_ptr<NTCPSession> >(ident, session));

67
NetDb.cpp

@ -34,11 +34,11 @@ namespace data @@ -34,11 +34,11 @@ namespace data
}
void NetDb::Start ()
{
{
m_Storage.SetPlace(i2p::fs::GetDataDir());
m_Storage.Init(i2p::data::GetBase64SubstitutionTable(), 64);
InitProfilesStorage ();
m_Families.LoadCertificates ();
m_Families.LoadCertificates ();
Load ();
if (m_RouterInfos.size () < 25) // reseed if # of router less than 50
Reseed ();
@ -68,7 +68,7 @@ namespace data @@ -68,7 +68,7 @@ namespace data
m_Requests.Stop ();
}
}
void NetDb::Run ()
{
uint32_t lastSave = 0, lastPublish = 0, lastExploratory = 0, lastManageRequest = 0, lastDestinationCleanup = 0;
@ -329,6 +329,67 @@ namespace data @@ -329,6 +329,67 @@ namespace data
for ( auto & entry : m_LeaseSets)
v(entry.first, entry.second);
}
void NetDb::VisitStoredRouterInfos(RouterInfoVisitor v)
{
m_Storage.Iterate([v] (const std::string & filename) {
auto ri = std::make_shared<i2p::data::RouterInfo>(filename);
v(ri);
});
}
void NetDb::VisitRouterInfos(RouterInfoVisitor v)
{
std::unique_lock<std::mutex> lock(m_RouterInfosMutex);
for ( const auto & item : m_RouterInfos )
v(item.second);
}
size_t NetDb::VisitRandomRouterInfos(RouterInfoFilter filter, RouterInfoVisitor v, size_t n)
{
std::vector<std::shared_ptr<const RouterInfo> > found;
const size_t max_iters_per_cyle = 3;
size_t iters = max_iters_per_cyle;
while(n > 0)
{
std::unique_lock<std::mutex> lock(m_RouterInfosMutex);
uint32_t idx = rand () % m_RouterInfos.size ();
uint32_t i = 0;
for (const auto & it : m_RouterInfos) {
if(i >= idx) // are we at the random start point?
{
// yes, check if we want this one
if(filter(it.second))
{
// we have a match
--n;
found.push_back(it.second);
// reset max iterations per cycle
iters = max_iters_per_cyle;
break;
}
}
else // not there yet
++i;
}
// we have enough
if(n == 0) break;
--iters;
// have we tried enough this cycle ?
if(!iters) {
// yes let's try the next cycle
--n;
iters = max_iters_per_cyle;
}
}
// visit the ones we found
size_t visited = 0;
for(const auto & ri : found ) {
v(ri);
++visited;
}
return visited;
}
void NetDb::Load ()
{

20
NetDb.h

@ -8,6 +8,7 @@ @@ -8,6 +8,7 @@
#include <string>
#include <thread>
#include <mutex>
#include <future>
#include "Base.h"
#include "Gzip.h"
@ -35,7 +36,13 @@ namespace data @@ -35,7 +36,13 @@ namespace data
/** function for visiting a leaseset stored in a floodfill */
typedef std::function<void(const IdentHash, std::shared_ptr<LeaseSet>)> LeaseSetVisitor;
/** function for visiting a router info we have locally */
typedef std::function<void(std::shared_ptr<const i2p::data::RouterInfo>)> RouterInfoVisitor;
/** function for visiting a router info and determining if we want to use it */
typedef std::function<bool(std::shared_ptr<const i2p::data::RouterInfo>)> RouterInfoFilter;
class NetDb
{
public:
@ -45,7 +52,7 @@ namespace data @@ -45,7 +52,7 @@ namespace data
void Start ();
void Stop ();
bool AddRouterInfo (const uint8_t * buf, int len);
bool AddRouterInfo (const IdentHash& ident, const uint8_t * buf, int len);
bool AddLeaseSet (const IdentHash& ident, const uint8_t * buf, int len, std::shared_ptr<i2p::tunnel::InboundTunnel> from);
@ -86,7 +93,12 @@ namespace data @@ -86,7 +93,12 @@ namespace data
/** visit all lease sets we currently store */
void VisitLeaseSets(LeaseSetVisitor v);
/** visit all router infos we have currently on disk, usually insanely expensive, does not access in memory RI */
void VisitStoredRouterInfos(RouterInfoVisitor v);
/** visit all router infos we have loaded in memory, cheaper than VisitLocalRouterInfos but locks access while visiting */
void VisitRouterInfos(RouterInfoVisitor v);
/** visit N random router that match using filter, then visit them with a visitor, return number of RouterInfos that were visited */
size_t VisitRandomRouterInfos(RouterInfoFilter f, RouterInfoVisitor v, size_t n);
private:
void Load ();
@ -103,7 +115,7 @@ namespace data @@ -103,7 +115,7 @@ namespace data
std::shared_ptr<const RouterInfo> GetRandomRouter (Filter filter) const;
private:
mutable std::mutex m_LeaseSetsMutex;
std::map<IdentHash, std::shared_ptr<LeaseSet> > m_LeaseSets;
mutable std::mutex m_RouterInfosMutex;

96
Streaming.cpp

@ -269,9 +269,14 @@ namespace stream @@ -269,9 +269,14 @@ namespace stream
}
auto sentPacket = *it;
uint64_t rtt = ts - sentPacket->sendTime;
if(ts < sentPacket->sendTime)
{
LogPrint(eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime);
rtt = 1;
}
m_RTT = (m_RTT*seqn + rtt)/(seqn + 1);
m_RTO = m_RTT*1.5; // TODO: implement it better
LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt);
LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime);
m_SentPackets.erase (it++);
delete sentPacket;
acknowledged = true;
@ -803,7 +808,10 @@ namespace stream @@ -803,7 +808,10 @@ namespace stream
StreamingDestination::StreamingDestination (std::shared_ptr<i2p::client::ClientDestination> owner, uint16_t localPort, bool gzip):
m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip),
m_PendingIncomingTimer (m_Owner->GetService ())
m_PendingIncomingTimer (m_Owner->GetService ()),
m_ConnTrackTimer(m_Owner->GetService()),
m_ConnsPerMinute(DEFAULT_MAX_CONNS_PER_MIN),
m_LastBanClear(i2p::util::GetMillisecondsSinceEpoch())
{
}
@ -818,17 +826,23 @@ namespace stream @@ -818,17 +826,23 @@ namespace stream
}
void StreamingDestination::Start ()
{
{
ScheduleConnTrack();
}
void StreamingDestination::Stop ()
{
ResetAcceptor ();
m_PendingIncomingTimer.cancel ();
m_ConnTrackTimer.cancel();
{
std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams.clear ();
}
}
{
std::unique_lock<std::mutex> l(m_ConnsMutex);
m_Conns.clear ();
}
}
void StreamingDestination::HandleNextPacket (Packet * packet)
@ -852,6 +866,18 @@ namespace stream @@ -852,6 +866,18 @@ namespace stream
auto incomingStream = CreateNewIncomingStream ();
uint32_t receiveStreamID = packet->GetReceiveStreamID ();
incomingStream->HandleNextPacket (packet); // SYN
auto ident = incomingStream->GetRemoteIdentity();
if(ident)
{
auto ih = ident->GetIdentHash();
if(DropNewStream(ih))
{
// drop
LogPrint(eLogWarning, "Streaming: Dropping connection, too many inbound streams from ", ih.ToBase32());
incomingStream->Terminate();
return;
}
}
// handle saved packets if any
{
auto it = m_SavedPackets.find (receiveStreamID);
@ -862,7 +888,7 @@ namespace stream @@ -862,7 +888,7 @@ namespace stream
incomingStream->HandleNextPacket (it1);
m_SavedPackets.erase (it);
}
}
}
// accept
if (m_Acceptor != nullptr)
m_Acceptor (incomingStream);
@ -1015,6 +1041,64 @@ namespace stream @@ -1015,6 +1041,64 @@ namespace stream
else
msg = nullptr;
return msg;
}
}
void StreamingDestination::SetMaxConnsPerMinute(const uint32_t conns)
{
m_ConnsPerMinute = conns;
LogPrint(eLogDebug, "Streaming: Set max conns per minute per destination to ", conns);
}
bool StreamingDestination::DropNewStream(const i2p::data::IdentHash & ih)
{
std::lock_guard<std::mutex> lock(m_ConnsMutex);
if (m_Banned.size() > MAX_BANNED_CONNS) return true; // overload
auto end = std::end(m_Banned);
if ( std::find(std::begin(m_Banned), end, ih) != end) return true; // already banned
auto itr = m_Conns.find(ih);
if (itr == m_Conns.end())
m_Conns[ih] = 0;
m_Conns[ih] += 1;
bool ban = m_Conns[ih] >= m_ConnsPerMinute;
if (ban)
{
m_Banned.push_back(ih);
m_Conns.erase(ih);
LogPrint(eLogWarning, "Streaming: ban ", ih.ToBase32());
}
return ban;
}
void StreamingDestination::HandleConnTrack(const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
{ // acquire lock
std::lock_guard<std::mutex> lock(m_ConnsMutex);
// clear conn tracking
m_Conns.clear();
// check for ban clear
auto ts = i2p::util::GetMillisecondsSinceEpoch();
if (ts - m_LastBanClear >= DEFAULT_BAN_INTERVAL)
{
// clear bans
m_Banned.clear();
m_LastBanClear = ts;
}
}
// reschedule timer
ScheduleConnTrack();
}
}
void StreamingDestination::ScheduleConnTrack()
{
m_ConnTrackTimer.expires_from_now (boost::posix_time::seconds(60));
m_ConnTrackTimer.async_wait (
std::bind (&StreamingDestination::HandleConnTrack,
shared_from_this (), std::placeholders::_1));
}
}
}

42
Streaming.h

@ -51,6 +51,22 @@ namespace stream @@ -51,6 +51,22 @@ namespace stream
const int INITIAL_RTO = 9000; // in milliseconds
const size_t MAX_PENDING_INCOMING_BACKLOG = 128;
const int PENDING_INCOMING_TIMEOUT = 10; // in seconds
/** i2cp option for limiting inbound stremaing connections */
const char I2CP_PARAM_STREAMING_MAX_CONNS_PER_MIN[] = "maxconns";
/** default maximum connections attempts per minute per destination */
const uint32_t DEFAULT_MAX_CONNS_PER_MIN = 600;
/**
* max banned destinations per local destination
* TODO: make configurable
*/
const uint16_t MAX_BANNED_CONNS = 9999;
/**
* length of a ban in ms
* TODO: make configurable
*/
const uint64_t DEFAULT_BAN_INTERVAL = 60 * 60 * 1000;
struct Packet
{
@ -134,10 +150,11 @@ namespace stream @@ -134,10 +150,11 @@ namespace stream
size_t GetSendBufferSize () const { return m_SendBuffer.rdbuf ()->in_avail (); };
int GetWindowSize () const { return m_WindowSize; };
int GetRTT () const { return m_RTT; };
private:
/** don't call me */
void Terminate ();
private:
void SendBuffer ();
void SendQuickAck ();
@ -210,12 +227,22 @@ namespace stream @@ -210,12 +227,22 @@ namespace stream
void HandleDataMessagePayload (const uint8_t * buf, size_t len);
std::shared_ptr<I2NPMessage> CreateDataMessage (const uint8_t * payload, size_t len, uint16_t toPort);
/** set max connections per minute per destination */
void SetMaxConnsPerMinute(const uint32_t conns);
private:
void HandleNextPacket (Packet * packet);
std::shared_ptr<Stream> CreateNewIncomingStream ();
void HandlePendingIncomingTimer (const boost::system::error_code& ecode);
/** handle cleaning up connection tracking for ratelimits */
void HandleConnTrack(const boost::system::error_code& ecode);
bool DropNewStream(const i2p::data::IdentHash & ident);
void ScheduleConnTrack();
private:
std::shared_ptr<i2p::client::ClientDestination> m_Owner;
@ -227,7 +254,16 @@ namespace stream @@ -227,7 +254,16 @@ namespace stream
std::list<std::shared_ptr<Stream> > m_PendingIncomingStreams;
boost::asio::deadline_timer m_PendingIncomingTimer;
std::map<uint32_t, std::list<Packet *> > m_SavedPackets; // receiveStreamID->packets, arrived before SYN
std::mutex m_ConnsMutex;
/** how many connections per minute did each identity have */
std::map<i2p::data::IdentHash, uint32_t> m_Conns;
boost::asio::deadline_timer m_ConnTrackTimer;
uint32_t m_ConnsPerMinute;
/** banned identities */
std::vector<i2p::data::IdentHash> m_Banned;
uint64_t m_LastBanClear;
public:
i2p::data::GzipInflator m_Inflator;

10
Tag.h

@ -20,7 +20,7 @@ namespace data { @@ -20,7 +20,7 @@ namespace data {
{
public:
Tag (const uint8_t * buf) { memcpy (m_Buf, buf, sz); };
Tag (const uint8_t * buf) { memcpy (m_Buf, buf, sz); };
Tag (const Tag<sz>& ) = default;
#ifndef _WIN32 // FIXME!!! msvs 2013 can't compile it
Tag (Tag<sz>&& ) = default;
@ -50,6 +50,14 @@ namespace data { @@ -50,6 +50,14 @@ namespace data {
return true;
}
const uint8_t * data() const { return m_Buf; }
/** fill with a value */
void Fill(uint8_t c)
{
memset(m_Buf, c, sz);
}
std::string ToBase64 () const
{
char str[sz*2];

1
Transports.cpp

@ -114,6 +114,7 @@ namespace transport @@ -114,6 +114,7 @@ namespace transport
auto& addresses = context.GetRouterInfo ().GetAddresses ();
for (const auto& address : addresses)
{
if (!address) continue;
if (m_NTCPServer == nullptr && enableNTCP)
{
m_NTCPServer = new NTCPServer ();

35
TunnelPool.cpp

@ -15,7 +15,8 @@ namespace tunnel @@ -15,7 +15,8 @@ namespace tunnel
{
TunnelPool::TunnelPool (int numInboundHops, int numOutboundHops, int numInboundTunnels, int numOutboundTunnels):
m_NumInboundHops (numInboundHops), m_NumOutboundHops (numOutboundHops),
m_NumInboundTunnels (numInboundTunnels), m_NumOutboundTunnels (numOutboundTunnels), m_IsActive (true)
m_NumInboundTunnels (numInboundTunnels), m_NumOutboundTunnels (numOutboundTunnels), m_IsActive (true),
m_CustomPeerSelector(nullptr)
{
}
@ -327,9 +328,18 @@ namespace tunnel @@ -327,9 +328,18 @@ namespace tunnel
bool TunnelPool::SelectPeers (std::vector<std::shared_ptr<const i2p::data::IdentityEx> >& peers, bool isInbound)
{
if (m_ExplicitPeers) return SelectExplicitPeers (peers, isInbound);
int numHops = isInbound ? m_NumInboundHops : m_NumOutboundHops;
if (numHops <= 0) return true; // peers is empty
// peers is empty
if (numHops <= 0) return true;
// custom peer selector in use ?
{
std::lock_guard<std::mutex> lock(m_CustomPeerSelectorMutex);
if (m_CustomPeerSelector)
return m_CustomPeerSelector->SelectPeers(peers, numHops, isInbound);
}
// explicit peers in use
if (m_ExplicitPeers) return SelectExplicitPeers (peers, isInbound);
auto prevHop = i2p::context.GetSharedRouterInfo ();
if(i2p::transport::transports.RoutesRestricted())
{
@ -477,6 +487,23 @@ namespace tunnel @@ -477,6 +487,23 @@ namespace tunnel
LogPrint (eLogDebug, "Tunnels: Creating paired inbound tunnel...");
auto tunnel = tunnels.CreateInboundTunnel (std::make_shared<TunnelConfig>(outboundTunnel->GetInvertedPeers ()), outboundTunnel);
tunnel->SetTunnelPool (shared_from_this ());
}
}
void TunnelPool::SetCustomPeerSelector(TunnelPeerSelector selector)
{
std::lock_guard<std::mutex> lock(m_CustomPeerSelectorMutex);
m_CustomPeerSelector = selector;
}
void TunnelPool::UnsetCustomPeerSelector()
{
SetCustomPeerSelector(nullptr);
}
bool TunnelPool::HasCustomPeerSelector()
{
std::lock_guard<std::mutex> lock(m_CustomPeerSelectorMutex);
return m_CustomPeerSelector != nullptr;
}
}
}

21
TunnelPool.h

@ -23,6 +23,16 @@ namespace tunnel @@ -23,6 +23,16 @@ namespace tunnel
class InboundTunnel;
class OutboundTunnel;
/** interface for custom tunnel peer selection algorithm */
struct ITunnelPeerSelector
{
typedef std::shared_ptr<const i2p::data::IdentityEx> Peer;
typedef std::vector<Peer> TunnelPath;
virtual bool SelectPeers(TunnelPath & peers, int hops, bool isInbound) = 0;
};
typedef std::shared_ptr<ITunnelPeerSelector> TunnelPeerSelector;
class TunnelPool: public std::enable_shared_from_this<TunnelPool> // per local destination
{
public:
@ -45,7 +55,6 @@ namespace tunnel @@ -45,7 +55,6 @@ namespace tunnel
std::shared_ptr<OutboundTunnel> GetNextOutboundTunnel (std::shared_ptr<OutboundTunnel> excluded = nullptr) const;
std::shared_ptr<InboundTunnel> GetNextInboundTunnel (std::shared_ptr<InboundTunnel> excluded = nullptr) const;
std::shared_ptr<OutboundTunnel> GetNewOutboundTunnel (std::shared_ptr<OutboundTunnel> old) const;
void TestTunnels ();
void ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg);
void ProcessDeliveryStatus (std::shared_ptr<I2NPMessage> msg);
@ -56,9 +65,12 @@ namespace tunnel @@ -56,9 +65,12 @@ namespace tunnel
int GetNumInboundTunnels () const { return m_NumInboundTunnels; };
int GetNumOutboundTunnels () const { return m_NumOutboundTunnels; };
private:
void SetCustomPeerSelector(TunnelPeerSelector selector);
void UnsetCustomPeerSelector();
bool HasCustomPeerSelector();
private:
void CreateInboundTunnel ();
void CreateOutboundTunnel ();
void CreatePairedInboundTunnel (std::shared_ptr<OutboundTunnel> outboundTunnel);
@ -80,7 +92,8 @@ namespace tunnel @@ -80,7 +92,8 @@ namespace tunnel
mutable std::mutex m_TestsMutex;
std::map<uint32_t, std::pair<std::shared_ptr<OutboundTunnel>, std::shared_ptr<InboundTunnel> > > m_Tests;
bool m_IsActive;
std::mutex m_CustomPeerSelectorMutex;
TunnelPeerSelector m_CustomPeerSelector;
public:
// for HTTP only

Loading…
Cancel
Save