From 3ea1eca350ea85871d5df29a3367e391e64de315 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 29 May 2017 01:28:16 -0400 Subject: [PATCH] ntcp socks proxy (initial) --- daemon/Daemon.cpp | 41 +-- libi2pd/Config.cpp | 87 ++--- libi2pd/NTCPSession.cpp | 714 +++++++++++++++++++++++----------------- libi2pd/NTCPSession.h | 78 +++-- libi2pd/Transports.cpp | 308 +++++++++-------- libi2pd/Transports.h | 44 +-- 6 files changed, 716 insertions(+), 556 deletions(-) diff --git a/daemon/Daemon.cpp b/daemon/Daemon.cpp index 7b1cc361..bd3f01e6 100644 --- a/daemon/Daemon.cpp +++ b/daemon/Daemon.cpp @@ -140,7 +140,7 @@ namespace i2p } i2p::context.SetSupportsV6 (ipv6); i2p::context.SetSupportsV4 (ipv4); - + bool transit; i2p::config::GetOption("notransit", transit); i2p::context.SetAcceptsTunnels (!transit); uint16_t transitTunnels; i2p::config::GetOption("limits.transittunnels", transitTunnels); @@ -157,17 +157,17 @@ namespace i2p /* this section also honors 'floodfill' flag, if set above */ std::string bandwidth; i2p::config::GetOption("bandwidth", bandwidth); if (bandwidth.length () > 0) - { - if (bandwidth[0] >= 'K' && bandwidth[0] <= 'X') + { + if (bandwidth[0] >= 'K' && bandwidth[0] <= 'X') { i2p::context.SetBandwidth (bandwidth[0]); LogPrint(eLogInfo, "Daemon: bandwidth set to ", i2p::context.GetBandwidthLimit (), "KBps"); - } - else + } + else { auto value = std::atoi(bandwidth.c_str()); - if (value > 0) - { + if (value > 0) + { i2p::context.SetBandwidth (value); LogPrint(eLogInfo, "Daemon: bandwidth set to ", i2p::context.GetBandwidthLimit (), " KBps"); } @@ -175,19 +175,19 @@ namespace i2p { LogPrint(eLogInfo, "Daemon: unexpected bandwidth ", bandwidth, ". Set to 'low'"); i2p::context.SetBandwidth (i2p::data::CAPS_FLAG_LOW_BANDWIDTH2); - } - } - } - else if (isFloodfill) + } + } + } + else if (isFloodfill) { LogPrint(eLogInfo, "Daemon: floodfill bandwidth set to 'extra'"); i2p::context.SetBandwidth (i2p::data::CAPS_FLAG_EXTRA_BANDWIDTH1); - } + } else { LogPrint(eLogInfo, "Daemon: bandwidth set to 'low'"); i2p::context.SetBandwidth (i2p::data::CAPS_FLAG_LOW_BANDWIDTH2); - } + } int shareRatio; i2p::config::GetOption("share", shareRatio); i2p::context.SetShareRatio (shareRatio); @@ -195,7 +195,7 @@ namespace i2p std::string family; i2p::config::GetOption("family", family); i2p::context.SetFamily (family); if (family.length () > 0) - LogPrint(eLogInfo, "Daemon: family set to ", family); + LogPrint(eLogInfo, "Daemon: family set to ", family); bool trust; i2p::config::GetOption("trust.enabled", trust); if (trust) @@ -214,7 +214,7 @@ namespace i2p fams.insert (fam.substr (pos, comma != std::string::npos ? comma - pos : std::string::npos)); pos = comma + 1; } - while (comma != std::string::npos); + while (comma != std::string::npos); i2p::transport::transports.RestrictRoutesToFamilies(fams); restricted = fams.size() > 0; } @@ -225,11 +225,11 @@ namespace i2p { comma = routers.find (',', pos); i2p::data::IdentHash ident; - ident.FromBase64 (routers.substr (pos, comma != std::string::npos ? comma - pos : std::string::npos)); + ident.FromBase64 (routers.substr (pos, comma != std::string::npos ? comma - pos : std::string::npos)); idents.insert (ident); pos = comma + 1; } - while (comma != std::string::npos); + while (comma != std::string::npos); LogPrint(eLogInfo, "Daemon: setting restricted routes to use ", idents.size(), " trusted routesrs"); i2p::transport::transports.RestrictRoutesToRouters(idents); restricted = idents.size() > 0; @@ -245,7 +245,7 @@ namespace i2p } return true; } - + bool Daemon_Singleton::start() { i2p::log::Logger().Start(); @@ -263,6 +263,7 @@ namespace i2p LogPrint(eLogInfo, "Daemon: starting Transports"); if(!ssu) LogPrint(eLogInfo, "Daemon: ssu disabled"); if(!ntcp) LogPrint(eLogInfo, "Daemon: ntcp disabled"); + i2p::transport::transports.Start(ntcp, ssu); if (i2p::transport::transports.IsBoundNTCP() || i2p::transport::transports.IsBoundSSU()) { LogPrint(eLogInfo, "Daemon: Transports started"); @@ -273,7 +274,7 @@ namespace i2p i2p::data::netdb.Stop(); return false; } - + bool http; i2p::config::GetOption("http.enabled", http); if (http) { std::string httpAddr; i2p::config::GetOption("http.address", httpAddr); @@ -283,7 +284,7 @@ namespace i2p d.httpServer->Start(); } - + LogPrint(eLogInfo, "Daemon: starting Tunnels"); i2p::tunnel::tunnels.Start(); diff --git a/libi2pd/Config.cpp b/libi2pd/Config.cpp index 796d4871..309c0e9c 100644 --- a/libi2pd/Config.cpp +++ b/libi2pd/Config.cpp @@ -48,7 +48,7 @@ namespace config { ("port", value()->default_value(0), "Port to listen for incoming connections (default: auto)") ("ipv4", value()->zero_tokens()->default_value(true), "Enable communication through ipv4") ("ipv6", value()->zero_tokens()->default_value(false), "Enable communication through ipv6") - ("netid", value()->default_value(I2PD_NET_ID), "Specify NetID. Main I2P is 2") + ("netid", value()->default_value(I2PD_NET_ID), "Specify NetID. Main I2P is 2") ("daemon", value()->zero_tokens()->default_value(false), "Router will go to background after start") ("service", value()->zero_tokens()->default_value(false), "Router will use system folders like '/var/lib/i2pd'") ("notransit", value()->zero_tokens()->default_value(false), "Router will not accept transit tunnels at startup") @@ -57,13 +57,14 @@ namespace config { ("share", value()->default_value(100), "Limit of transit traffic from max bandwidth in percents. (default: 100") ("ntcp", value()->zero_tokens()->default_value(true), "Enable NTCP transport") ("ssu", value()->zero_tokens()->default_value(true), "Enable SSU transport") + ("ntcpproxy", value()->default_value(""), "proxy url for ntcp transport") #ifdef _WIN32 ("svcctl", value()->default_value(""), "Windows service management ('install' or 'remove')") ("insomnia", value()->zero_tokens()->default_value(false), "Prevent system from sleeping") ("close", value()->default_value("ask"), "Action on close: minimize, exit, ask") // TODO: add custom validator or something #endif ; - + options_description limits("Limits options"); limits.add_options() ("limits.coresize", value()->default_value(0), "Maximum size of corefile in Kb (0 - use system limit)") @@ -87,12 +88,12 @@ namespace config { ("httpproxy.address", value()->default_value("127.0.0.1"), "HTTP Proxy listen address") ("httpproxy.port", value()->default_value(4444), "HTTP Proxy listen port") ("httpproxy.keys", value()->default_value(""), "File to persist HTTP Proxy keys") - ("httpproxy.signaturetype", value()->default_value(i2p::data::SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519), "Signature type for new keys. 7 (EdDSA) by default") - ("httpproxy.inbound.length", value()->default_value("3"), "HTTP proxy inbound tunnel length") + ("httpproxy.signaturetype", value()->default_value(i2p::data::SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519), "Signature type for new keys. 7 (EdDSA) by default") + ("httpproxy.inbound.length", value()->default_value("3"), "HTTP proxy inbound tunnel length") ("httpproxy.outbound.length", value()->default_value("3"), "HTTP proxy outbound tunnel length") - ("httpproxy.inbound.quantity", value()->default_value("5"), "HTTP proxy inbound tunnels quantity") + ("httpproxy.inbound.quantity", value()->default_value("5"), "HTTP proxy inbound tunnels quantity") ("httpproxy.outbound.quantity", value()->default_value("5"), "HTTP proxy outbound tunnels quantity") - ("httpproxy.latency.min", value()->default_value("0"), "HTTP proxy min latency for tunnels") + ("httpproxy.latency.min", value()->default_value("0"), "HTTP proxy min latency for tunnels") ("httpproxy.latency.max", value()->default_value("0"), "HTTP proxy max latency for tunnels") ("httpproxy.outproxy", value()->default_value(""), "HTTP proxy upstream out proxy url") ("httpproxy.addresshelper", value()->default_value(true), "Enable or disable addresshelper") @@ -104,13 +105,13 @@ namespace config { ("socksproxy.address", value()->default_value("127.0.0.1"), "SOCKS Proxy listen address") ("socksproxy.port", value()->default_value(4447), "SOCKS Proxy listen port") ("socksproxy.keys", value()->default_value(""), "File to persist SOCKS Proxy keys") - ("socksproxy.signaturetype", value()->default_value(i2p::data::SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519), "Signature type for new keys. 7 (EdDSA) by default") - ("socksproxy.inbound.length", value()->default_value("3"), "SOCKS proxy inbound tunnel length") + ("socksproxy.signaturetype", value()->default_value(i2p::data::SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519), "Signature type for new keys. 7 (EdDSA) by default") + ("socksproxy.inbound.length", value()->default_value("3"), "SOCKS proxy inbound tunnel length") ("socksproxy.outbound.length", value()->default_value("3"), "SOCKS proxy outbound tunnel length") - ("socksproxy.inbound.quantity", value()->default_value("5"), "SOCKS proxy inbound tunnels quantity") + ("socksproxy.inbound.quantity", value()->default_value("5"), "SOCKS proxy inbound tunnels quantity") ("socksproxy.outbound.quantity", value()->default_value("5"), "SOCKS proxy outbound tunnels quantity") - ("socksproxy.latency.min", value()->default_value("0"), "SOCKS proxy min latency for tunnels") - ("socksproxy.latency.max", value()->default_value("0"), "SOCKS proxy max latency for tunnels") + ("socksproxy.latency.min", value()->default_value("0"), "SOCKS proxy min latency for tunnels") + ("socksproxy.latency.max", value()->default_value("0"), "SOCKS proxy max latency for tunnels") ("socksproxy.outproxy", value()->default_value("127.0.0.1"), "Upstream outproxy address for SOCKS Proxy") ("socksproxy.outproxyport", value()->default_value(9050), "Upstream outproxy port for SOCKS Proxy") ; @@ -148,26 +149,26 @@ namespace config { bool upnp_default = false; #if (defined(USE_UPNP) && (defined(WIN32_APP) || defined(ANDROID))) - upnp_default = true; // enable UPNP for windows GUI and android by default + upnp_default = true; // enable UPNP for windows GUI and android by default #endif options_description upnp("UPnP options"); upnp.add_options() ("upnp.enabled", value()->default_value(upnp_default), "Enable or disable UPnP: automatic port forwarding") - ("upnp.name", value()->default_value("I2Pd"), "Name i2pd appears in UPnP forwardings list") + ("upnp.name", value()->default_value("I2Pd"), "Name i2pd appears in UPnP forwardings list") ; options_description precomputation("Precomputation options"); - precomputation.add_options() - ("precomputation.elgamal", -#if defined(__x86_64__) - value()->default_value(false), + precomputation.add_options() + ("precomputation.elgamal", +#if defined(__x86_64__) + value()->default_value(false), #else - value()->default_value(true), -#endif + value()->default_value(true), +#endif "Enable or disable elgamal precomputation table") ; - - options_description reseed("Reseed options"); + + options_description reseed("Reseed options"); reseed.add_options() ("reseed.verify", value()->default_value(false), "Verify .su3 signature") ("reseed.threshold", value()->default_value(25), "Minimum number of known routers before requesting reseed") @@ -188,16 +189,16 @@ namespace config { "https://reseed.memcpy.io/," "https://reseed.onion.im/," "https://itoopie.atomike.ninja/" -// "https://randomrng.ddns.net/" // dead +// "https://randomrng.ddns.net/" // dead ), "Reseed URLs, separated by comma") - ; + ; options_description addressbook("AddressBook options"); addressbook.add_options() ("addressbook.defaulturl", value()->default_value( "http://joajgazyztfssty4w2on5oaqksz6tqoxbduy553y34mf4byv6gpq.b32.i2p/export/alive-hosts.txt" ), "AddressBook subscription URL for initial setup") - ("addressbook.subscriptions", value()->default_value(""), + ("addressbook.subscriptions", value()->default_value(""), "AddressBook subscriptions URLs, separated by comma"); options_description trust("Trust options"); @@ -206,7 +207,7 @@ namespace config { ("trust.family", value()->default_value(""), "Router Familiy to trust for first hops") ("trust.routers", value()->default_value(""), "Only Connect to these routers") ("trust.hidden", value()->default_value(false), "Should we hide our router from other routers?"); - + options_description websocket("Websocket Options"); websocket.add_options() ("websockets.enabled", value()->default_value(false), "enable websocket server") @@ -215,50 +216,50 @@ namespace config { options_description exploratory("Exploratory Options"); exploratory.add_options() - ("exploratory.inbound.length", value()->default_value(2), "Exploratory inbound tunnel length") + ("exploratory.inbound.length", value()->default_value(2), "Exploratory inbound tunnel length") ("exploratory.outbound.length", value()->default_value(2), "Exploratory outbound tunnel length") - ("exploratory.inbound.quantity", value()->default_value(3), "Exploratory inbound tunnels quantity") - ("exploratory.outbound.quantity", value()->default_value(3), "Exploratory outbound tunnels quantity"); + ("exploratory.inbound.quantity", value()->default_value(3), "Exploratory inbound tunnels quantity") + ("exploratory.outbound.quantity", value()->default_value(3), "Exploratory outbound tunnels quantity"); m_OptionsDesc .add(general) - .add(limits) + .add(limits) .add(httpserver) .add(httpproxy) .add(socksproxy) .add(sam) .add(bob) - .add(i2cp) + .add(i2cp) .add(i2pcontrol) .add(upnp) .add(precomputation) - .add(reseed) - .add(addressbook) + .add(reseed) + .add(addressbook) .add(trust) .add(websocket) .add(exploratory) ; } - void ParseCmdline(int argc, char* argv[], bool ignoreUnknown) + void ParseCmdline(int argc, char* argv[], bool ignoreUnknown) { - try + try { auto style = boost::program_options::command_line_style::unix_style | boost::program_options::command_line_style::allow_long_disguise; style &= ~ boost::program_options::command_line_style::allow_guessing; if (ignoreUnknown) - store(command_line_parser(argc, argv).options(m_OptionsDesc).style (style).allow_unregistered().run(), m_Options); - else + store(command_line_parser(argc, argv).options(m_OptionsDesc).style (style).allow_unregistered().run(), m_Options); + else store(parse_command_line(argc, argv, m_OptionsDesc, style), m_Options); - } - catch (boost::program_options::error& e) + } + catch (boost::program_options::error& e) { std::cerr << "args: " << e.what() << std::endl; exit(EXIT_FAILURE); } - if (!ignoreUnknown && (m_Options.count("help") || m_Options.count("h"))) + if (!ignoreUnknown && (m_Options.count("help") || m_Options.count("h"))) { std::cout << "i2pd version " << I2PD_VERSION << " (" << I2P_VERSION << ")" << std::endl; std::cout << m_OptionsDesc; @@ -271,17 +272,17 @@ namespace config { std::ifstream config(path, std::ios::in); - if (!config.is_open()) + if (!config.is_open()) { std::cerr << "missing/unreadable config file: " << path << std::endl; exit(EXIT_FAILURE); } - try + try { store(boost::program_options::parse_config_file(config, m_OptionsDesc), m_Options); - } - catch (boost::program_options::error& e) + } + catch (boost::program_options::error& e) { std::cerr << e.what() << std::endl; exit(EXIT_FAILURE); diff --git a/libi2pd/NTCPSession.cpp b/libi2pd/NTCPSession.cpp index 4402a71e..d06d2de4 100644 --- a/libi2pd/NTCPSession.cpp +++ b/libi2pd/NTCPSession.cpp @@ -22,15 +22,15 @@ namespace i2p { namespace transport { - NTCPSession::NTCPSession (NTCPServer& server, std::shared_ptr in_RemoteRouter): - TransportSession (in_RemoteRouter, NTCP_ESTABLISH_TIMEOUT), - m_Server (server), m_Socket (m_Server.GetService ()), + NTCPSession::NTCPSession (NTCPServer& server, std::shared_ptr in_RemoteRouter): + TransportSession (in_RemoteRouter, NTCP_ESTABLISH_TIMEOUT), + m_Server (server), m_Socket (m_Server.GetService ()), m_IsEstablished (false), m_IsTerminated (false), m_ReceiveBufferOffset (0), m_NextMessage (nullptr), m_IsSending (false) - { + { m_Establisher = new Establisher; } - + NTCPSession::~NTCPSession () { delete m_Establisher; @@ -40,14 +40,14 @@ namespace transport { uint8_t sharedKey[256]; m_DHKeysPair->Agree (pubKey, sharedKey); // time consuming operation - + i2p::crypto::AESKey aesKey; if (sharedKey[0] & 0x80) { aesKey[0] = 0; memcpy (aesKey + 1, sharedKey, 31); - } - else if (sharedKey[0]) + } + else if (sharedKey[0]) memcpy (aesKey, sharedKey, 32); else { @@ -60,24 +60,24 @@ namespace transport { LogPrint (eLogWarning, "NTCP: First 32 bytes of shared key is all zeros, ignored"); return; - } + } } memcpy (aesKey, nonZero, 32); } m_Decryption.SetKey (aesKey); m_Encryption.SetKey (aesKey); - } + } void NTCPSession::Done () { - m_Server.GetService ().post (std::bind (&NTCPSession::Terminate, shared_from_this ())); - } - + m_Server.GetService ().post (std::bind (&NTCPSession::Terminate, shared_from_this ())); + } + void NTCPSession::Terminate () { if (!m_IsTerminated) - { + { m_IsTerminated = true; m_IsEstablished = false; m_Socket.close (); @@ -86,8 +86,8 @@ namespace transport m_SendQueue.clear (); m_NextMessage = nullptr; LogPrint (eLogDebug, "NTCP: session terminated"); - } - } + } + } void NTCPSession::Connected () { @@ -95,14 +95,14 @@ namespace transport delete m_Establisher; m_Establisher = nullptr; - - m_DHKeysPair = nullptr; + + m_DHKeysPair = nullptr; SetTerminationTimeout (NTCP_TERMINATION_TIMEOUT); - SendTimeSyncMessage (); + SendTimeSyncMessage (); transports.PeerConnected (shared_from_this ()); - } - + } + void NTCPSession::ClientLogin () { if (!m_DHKeysPair) @@ -114,61 +114,61 @@ namespace transport const uint8_t * ident = m_RemoteIdentity->GetIdentHash (); for (int i = 0; i < 32; i++) m_Establisher->phase1.HXxorHI[i] ^= ident[i]; - + boost::asio::async_write (m_Socket, boost::asio::buffer (&m_Establisher->phase1, sizeof (NTCPPhase1)), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandlePhase1Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); - } + std::bind(&NTCPSession::HandlePhase1Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); + } void NTCPSession::ServerLogin () { m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); // receive Phase1 - boost::asio::async_read (m_Socket, boost::asio::buffer(&m_Establisher->phase1, sizeof (NTCPPhase1)), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandlePhase1Received, shared_from_this (), + boost::asio::async_read (m_Socket, boost::asio::buffer(&m_Establisher->phase1, sizeof (NTCPPhase1)), boost::asio::transfer_all (), + std::bind(&NTCPSession::HandlePhase1Received, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); - } - + } + void NTCPSession::HandlePhase1Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred) { (void) bytes_transferred; if (ecode) - { + { LogPrint (eLogInfo, "NTCP: couldn't send Phase 1 message: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) Terminate (); } else - { - boost::asio::async_read (m_Socket, boost::asio::buffer(&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandlePhase2Received, shared_from_this (), + { + boost::asio::async_read (m_Socket, boost::asio::buffer(&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all (), + std::bind(&NTCPSession::HandlePhase2Received, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); - } - } + } + } void NTCPSession::HandlePhase1Received (const boost::system::error_code& ecode, std::size_t bytes_transferred) { (void) bytes_transferred; if (ecode) - { + { LogPrint (eLogInfo, "NTCP: phase 1 read error: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) Terminate (); } else - { + { // verify ident uint8_t digest[32]; SHA256(m_Establisher->phase1.pubKey, 256, digest); const uint8_t * ident = i2p::context.GetIdentHash (); for (int i = 0; i < 32; i++) - { + { if ((m_Establisher->phase1.HXxorHI[i] ^ ident[i]) != digest[i]) { LogPrint (eLogError, "NTCP: phase 1 error: ident mismatch"); Terminate (); return; - } - } + } + } #if (__GNUC__ == 4) && (__GNUC_MINOR__ <= 7) // due the bug in gcc 4.7. std::shared_future.get() is not const if (!m_DHKeysPair) @@ -183,15 +183,15 @@ namespace transport if (!s->m_DHKeysPair) s->m_DHKeysPair = transports.GetNextDHKeysPair (); s->CreateAESKey (s->m_Establisher->phase1.pubKey); - }).share (); + }).share (); m_Server.GetService ().post ([s, keyCreated]() - { - keyCreated.get (); + { + keyCreated.get (); s->SendPhase2 (); }); -#endif - } - } +#endif + } + } void NTCPSession::SendPhase2 () { @@ -200,42 +200,42 @@ namespace transport uint8_t xy[512]; memcpy (xy, m_Establisher->phase1.pubKey, 256); memcpy (xy + 256, y, 256); - SHA256(xy, 512, m_Establisher->phase2.encrypted.hxy); + SHA256(xy, 512, m_Establisher->phase2.encrypted.hxy); uint32_t tsB = htobe32 (i2p::util::GetSecondsSinceEpoch ()); memcpy (m_Establisher->phase2.encrypted.timestamp, &tsB, 4); RAND_bytes (m_Establisher->phase2.encrypted.filler, 12); m_Encryption.SetIV (y + 240); m_Decryption.SetIV (m_Establisher->phase1.HXxorHI + 16); - + m_Encryption.Encrypt ((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted); boost::asio::async_write (m_Socket, boost::asio::buffer (&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsB)); + std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsB)); + + } - } - void NTCPSession::HandlePhase2Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB) { (void) bytes_transferred; if (ecode) - { + { LogPrint (eLogInfo, "NTCP: Couldn't send Phase 2 message: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) Terminate (); } else - { - boost::asio::async_read (m_Socket, boost::asio::buffer(m_ReceiveBuffer, NTCP_DEFAULT_PHASE3_SIZE), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandlePhase3Received, shared_from_this (), + { + boost::asio::async_read (m_Socket, boost::asio::buffer(m_ReceiveBuffer, NTCP_DEFAULT_PHASE3_SIZE), boost::asio::transfer_all (), + std::bind(&NTCPSession::HandlePhase3Received, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsB)); - } - } - + } + } + void NTCPSession::HandlePhase2Received (const boost::system::error_code& ecode, std::size_t bytes_transferred) { (void) bytes_transferred; if (ecode) - { + { LogPrint (eLogInfo, "NTCP: Phase 2 read error: ", ecode.message (), ". Wrong ident assumed"); if (ecode != boost::asio::error::operation_aborted) { @@ -247,7 +247,7 @@ namespace transport } } else - { + { #if (__GNUC__ == 4) && (__GNUC_MINOR__ <= 7) // due the bug in gcc 4.7. std::shared_future.get() is not const CreateAESKey (m_Establisher->phase2.pubKey); @@ -258,22 +258,22 @@ namespace transport auto keyCreated = std::async (std::launch::async, [s] () { s->CreateAESKey (s->m_Establisher->phase2.pubKey); - }).share (); // TODO: use move capture in C++ 14 instead shared_future + }).share (); // TODO: use move capture in C++ 14 instead shared_future // let other operations execute while a key gets created m_Server.GetService ().post ([s, keyCreated]() - { + { keyCreated.get (); // we might wait if no more pending operations s->HandlePhase2 (); - }); + }); #endif - } - } + } + } void NTCPSession::HandlePhase2 () { m_Decryption.SetIV (m_Establisher->phase2.pubKey + 240); m_Encryption.SetIV (m_Establisher->phase1.HXxorHI + 16); - + m_Decryption.Decrypt((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted); // verify uint8_t xy[512]; @@ -281,31 +281,31 @@ namespace transport memcpy (xy + 256, m_Establisher->phase2.pubKey, 256); uint8_t digest[32]; SHA256 (xy, 512, digest); - if (memcmp(m_Establisher->phase2.encrypted.hxy, digest, 32)) + if (memcmp(m_Establisher->phase2.encrypted.hxy, digest, 32)) { LogPrint (eLogError, "NTCP: Phase 2 process error: incorrect hash"); transports.ReuseDHKeysPair (m_DHKeysPair); m_DHKeysPair = nullptr; Terminate (); return ; - } + } SendPhase3 (); - } - + } + void NTCPSession::SendPhase3 () { auto& keys = i2p::context.GetPrivateKeys (); - uint8_t * buf = m_ReceiveBuffer; + uint8_t * buf = m_ReceiveBuffer; htobe16buf (buf, keys.GetPublic ()->GetFullLen ()); buf += 2; buf += i2p::context.GetIdentity ()->ToBuffer (buf, NTCP_BUFFER_SIZE); uint32_t tsA = htobe32 (i2p::util::GetSecondsSinceEpoch ()); htobuf32(buf,tsA); - buf += 4; + buf += 4; size_t signatureLen = keys.GetPublic ()->GetSignatureLen (); size_t len = (buf - m_ReceiveBuffer) + signatureLen; size_t paddingSize = len & 0x0F; // %16 - if (paddingSize > 0) + if (paddingSize > 0) { paddingSize = 16 - paddingSize; // fill padding with random data @@ -318,46 +318,46 @@ namespace transport s.Insert (m_Establisher->phase1.pubKey, 256); // x s.Insert (m_Establisher->phase2.pubKey, 256); // y s.Insert (m_RemoteIdentity->GetIdentHash (), 32); // ident - s.Insert (tsA); // tsA + s.Insert (tsA); // tsA s.Insert (m_Establisher->phase2.encrypted.timestamp, 4); // tsB s.Sign (keys, buf); - m_Encryption.Encrypt(m_ReceiveBuffer, len, m_ReceiveBuffer); + m_Encryption.Encrypt(m_ReceiveBuffer, len, m_ReceiveBuffer); boost::asio::async_write (m_Socket, boost::asio::buffer (m_ReceiveBuffer, len), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandlePhase3Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsA)); - } - + std::bind(&NTCPSession::HandlePhase3Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsA)); + } + void NTCPSession::HandlePhase3Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsA) { - (void) bytes_transferred; + (void) bytes_transferred; if (ecode) - { + { LogPrint (eLogInfo, "NTCP: Couldn't send Phase 3 message: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) Terminate (); } else - { - // wait for phase4 + { + // wait for phase4 auto signatureLen = m_RemoteIdentity->GetSignatureLen (); size_t paddingSize = signatureLen & 0x0F; // %16 - if (paddingSize > 0) signatureLen += (16 - paddingSize); - boost::asio::async_read (m_Socket, boost::asio::buffer(m_ReceiveBuffer, signatureLen), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandlePhase4Received, shared_from_this (), + if (paddingSize > 0) signatureLen += (16 - paddingSize); + boost::asio::async_read (m_Socket, boost::asio::buffer(m_ReceiveBuffer, signatureLen), boost::asio::transfer_all (), + std::bind(&NTCPSession::HandlePhase4Received, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsA)); - } - } + } + } void NTCPSession::HandlePhase3Received (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB) - { + { if (ecode) - { + { LogPrint (eLogInfo, "NTCP: Phase 3 read error: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) Terminate (); } else - { + { m_Decryption.Decrypt (m_ReceiveBuffer, bytes_transferred, m_ReceiveBuffer); uint8_t * buf = m_ReceiveBuffer; uint16_t size = bufbe16toh (buf); @@ -366,30 +366,30 @@ namespace transport { LogPrint (eLogInfo, "NTCP: session already exists"); Terminate (); - } + } auto existing = i2p::data::netdb.FindRouter (identity->GetIdentHash ()); // check if exists already SetRemoteIdentity (existing ? existing->GetRouterIdentity () : identity); - + size_t expectedSize = size + 2/*size*/ + 4/*timestamp*/ + m_RemoteIdentity->GetSignatureLen (); size_t paddingLen = expectedSize & 0x0F; - if (paddingLen) paddingLen = (16 - paddingLen); + if (paddingLen) paddingLen = (16 - paddingLen); if (expectedSize > NTCP_DEFAULT_PHASE3_SIZE) { // we need more bytes for Phase3 - expectedSize += paddingLen; - boost::asio::async_read (m_Socket, boost::asio::buffer(m_ReceiveBuffer + NTCP_DEFAULT_PHASE3_SIZE, expectedSize - NTCP_DEFAULT_PHASE3_SIZE), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandlePhase3ExtraReceived, shared_from_this (), + expectedSize += paddingLen; + boost::asio::async_read (m_Socket, boost::asio::buffer(m_ReceiveBuffer + NTCP_DEFAULT_PHASE3_SIZE, expectedSize - NTCP_DEFAULT_PHASE3_SIZE), boost::asio::transfer_all (), + std::bind(&NTCPSession::HandlePhase3ExtraReceived, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsB, paddingLen)); } else HandlePhase3 (tsB, paddingLen); - } + } } void NTCPSession::HandlePhase3ExtraReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB, size_t paddingLen) { if (ecode) - { + { LogPrint (eLogInfo, "NTCP: Phase 3 extra read error: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) Terminate (); @@ -398,25 +398,25 @@ namespace transport { m_Decryption.Decrypt (m_ReceiveBuffer + NTCP_DEFAULT_PHASE3_SIZE, bytes_transferred, m_ReceiveBuffer+ NTCP_DEFAULT_PHASE3_SIZE); HandlePhase3 (tsB, paddingLen); - } - } + } + } void NTCPSession::HandlePhase3 (uint32_t tsB, size_t paddingLen) { uint8_t * buf = m_ReceiveBuffer + m_RemoteIdentity->GetFullLen () + 2 /*size*/; - uint32_t tsA = buf32toh(buf); + uint32_t tsA = buf32toh(buf); buf += 4; - buf += paddingLen; + buf += paddingLen; // check timestamp auto ts = i2p::util::GetSecondsSinceEpoch (); uint32_t tsA1 = be32toh (tsA); if (tsA1 < ts - NTCP_CLOCK_SKEW || tsA1 > ts + NTCP_CLOCK_SKEW) { - LogPrint (eLogError, "NTCP: Phase3 time difference ", ts - tsA1, " exceeds clock skew"); + LogPrint (eLogError, "NTCP: Phase3 time difference ", ts - tsA1, " exceeds clock skew"); Terminate (); return; - } + } // check signature SignedData s; @@ -424,13 +424,13 @@ namespace transport s.Insert (m_Establisher->phase2.pubKey, 256); // y s.Insert (i2p::context.GetRouterInfo ().GetIdentHash (), 32); // ident s.Insert (tsA); // tsA - s.Insert (tsB); // tsB + s.Insert (tsB); // tsB if (!s.Verify (m_RemoteIdentity, buf)) - { + { LogPrint (eLogError, "NTCP: signature verification failed"); Terminate (); return; - } + } SendPhase4 (tsA, tsB); } @@ -444,27 +444,27 @@ namespace transport s.Insert (tsA); // tsA s.Insert (tsB); // tsB auto& keys = i2p::context.GetPrivateKeys (); - auto signatureLen = keys.GetPublic ()->GetSignatureLen (); + auto signatureLen = keys.GetPublic ()->GetSignatureLen (); s.Sign (keys, m_ReceiveBuffer); size_t paddingSize = signatureLen & 0x0F; // %16 - if (paddingSize > 0) signatureLen += (16 - paddingSize); + if (paddingSize > 0) signatureLen += (16 - paddingSize); m_Encryption.Encrypt (m_ReceiveBuffer, signatureLen, m_ReceiveBuffer); boost::asio::async_write (m_Socket, boost::asio::buffer (m_ReceiveBuffer, signatureLen), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandlePhase4Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); - } + std::bind(&NTCPSession::HandlePhase4Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); + } void NTCPSession::HandlePhase4Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred) { (void) bytes_transferred; if (ecode) - { + { LogPrint (eLogWarning, "NTCP: Couldn't send Phase 4 message: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) Terminate (); } else - { + { LogPrint (eLogDebug, "NTCP: Server session from ", m_Socket.remote_endpoint (), " connected"); m_Server.AddNTCPSession (shared_from_this ()); @@ -472,23 +472,23 @@ namespace transport m_ReceiveBufferOffset = 0; m_NextMessage = nullptr; Receive (); - } - } - + } + } + void NTCPSession::HandlePhase4Received (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsA) { if (ecode) - { + { LogPrint (eLogError, "NTCP: Phase 4 read error: ", ecode.message (), ". Check your clock"); if (ecode != boost::asio::error::operation_aborted) { - // this router doesn't like us + // this router doesn't like us i2p::data::netdb.SetUnreachable (GetRemoteIdentity ()->GetIdentHash (), true); Terminate (); - } + } } else - { + { m_Decryption.Decrypt(m_ReceiveBuffer, bytes_transferred, m_ReceiveBuffer); // check timestamp @@ -496,11 +496,11 @@ namespace transport auto ts = i2p::util::GetSecondsSinceEpoch (); if (tsB < ts - NTCP_CLOCK_SKEW || tsB > ts + NTCP_CLOCK_SKEW) { - LogPrint (eLogError, "NTCP: Phase4 time difference ", ts - tsB, " exceeds clock skew"); + LogPrint (eLogError, "NTCP: Phase4 time difference ", ts - tsB, " exceeds clock skew"); Terminate (); return; - } - + } + // verify signature SignedData s; s.Insert (m_Establisher->phase1.pubKey, 256); // x @@ -510,14 +510,14 @@ namespace transport s.Insert (m_Establisher->phase2.encrypted.timestamp, 4); // tsB if (!s.Verify (m_RemoteIdentity, m_ReceiveBuffer)) - { + { LogPrint (eLogError, "NTCP: Phase 4 process error: signature verification failed"); Terminate (); return; - } + } LogPrint (eLogDebug, "NTCP: session to ", m_Socket.remote_endpoint (), " connected"); Connected (); - + m_ReceiveBufferOffset = 0; m_NextMessage = nullptr; Receive (); @@ -526,14 +526,14 @@ namespace transport void NTCPSession::Receive () { - m_Socket.async_read_some (boost::asio::buffer(m_ReceiveBuffer + m_ReceiveBufferOffset, NTCP_BUFFER_SIZE - m_ReceiveBufferOffset), - std::bind(&NTCPSession::HandleReceived, shared_from_this (), + m_Socket.async_read_some (boost::asio::buffer(m_ReceiveBuffer + m_ReceiveBufferOffset, NTCP_BUFFER_SIZE - m_ReceiveBufferOffset), + std::bind(&NTCPSession::HandleReceived, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); - } - + } + void NTCPSession::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) { - if (ecode) + if (ecode) { if (ecode != boost::asio::error::operation_aborted) LogPrint (eLogDebug, "NTCP: Read error: ", ecode.message ()); @@ -547,7 +547,7 @@ namespace transport m_ReceiveBufferOffset += bytes_transferred; if (m_ReceiveBufferOffset >= 16) - { + { // process received data uint8_t * nextBlock = m_ReceiveBuffer; while (m_ReceiveBufferOffset >= 16) @@ -555,14 +555,14 @@ namespace transport if (!DecryptNextBlock (nextBlock)) // 16 bytes { Terminate (); - return; - } + return; + } nextBlock += 16; m_ReceiveBufferOffset -= 16; - } - if (m_ReceiveBufferOffset > 0) - memcpy (m_ReceiveBuffer, nextBlock, m_ReceiveBufferOffset); - } + } + if (m_ReceiveBufferOffset > 0) + memcpy (m_ReceiveBuffer, nextBlock, m_ReceiveBufferOffset); + } // read and process more is available boost::system::error_code ec; @@ -586,11 +586,11 @@ namespace transport delete[] buf; Terminate (); return; - } - m_ReceiveBufferOffset += moreBytes; + } + m_ReceiveBufferOffset += moreBytes; m_NumReceivedBytes += moreBytes; i2p::transport::transports.UpdateReceivedBytes (moreBytes); - // process more data + // process more data uint8_t * nextBlock = moreBuf; while (m_ReceiveBufferOffset >= 16) { @@ -598,26 +598,26 @@ namespace transport { delete[] buf; Terminate (); - return; - } + return; + } nextBlock += 16; m_ReceiveBufferOffset -= 16; - } - if (m_ReceiveBufferOffset > 0) + } + if (m_ReceiveBufferOffset > 0) memcpy (m_ReceiveBuffer, nextBlock, m_ReceiveBufferOffset); // nextBlock points to memory inside buf delete[] buf; - } - m_Handler.Flush (); - + } + m_Handler.Flush (); + m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); Receive (); - } - } + } + } bool NTCPSession::DecryptNextBlock (const uint8_t * encrypted) // 16 bytes { if (!m_NextMessage) // new message, header expected - { + { // decrypt header and extract length uint8_t buf[16]; m_Decryption.Decrypt (encrypted, buf); @@ -633,26 +633,26 @@ namespace transport m_NextMessage = (dataSize + 16U + 15U) <= I2NP_MAX_SHORT_MESSAGE_SIZE - 2 ? NewI2NPShortMessage () : NewI2NPMessage (); m_NextMessage->Align (16); m_NextMessage->offset += 2; // size field - m_NextMessage->len = m_NextMessage->offset + dataSize; + m_NextMessage->len = m_NextMessage->offset + dataSize; memcpy (m_NextMessage->GetBuffer () - 2, buf, 16); m_NextMessageOffset = 16; - } + } else - { + { // timestamp int diff = (int)bufbe32toh (buf + 2) - (int)i2p::util::GetSecondsSinceEpoch (); LogPrint (eLogInfo, "NTCP: Timestamp. Time difference ", diff, " seconds"); return true; - } - } + } + } else // message continues - { + { m_Decryption.Decrypt (encrypted, m_NextMessage->GetBuffer () - 2 + m_NextMessageOffset); m_NextMessageOffset += 16; - } - + } + if (m_NextMessageOffset >= m_NextMessage->GetLength () + 2 + 4) // +checksum - { + { // we have a complete I2NP message uint8_t checksum[4]; htobe32buf (checksum, adler32 (adler32 (0, Z_NULL, 0), m_NextMessage->GetBuffer () - 2, m_NextMessageOffset - 4)); @@ -667,19 +667,19 @@ namespace transport } else LogPrint (eLogInfo, "NTCP: message expired"); - } + } else LogPrint (eLogWarning, "NTCP: Incorrect adler checksum of message, dropped"); m_NextMessage = nullptr; } - return true; - } + return true; + } void NTCPSession::Send (std::shared_ptr msg) { m_IsSending = true; - boost::asio::async_write (m_Socket, CreateMsgBuffer (msg), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::vector >{ msg })); + boost::asio::async_write (m_Socket, CreateMsgBuffer (msg), boost::asio::transfer_all (), + std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::vector >{ msg })); } boost::asio::const_buffers_1 NTCPSession::CreateMsgBuffer (std::shared_ptr msg) @@ -688,14 +688,14 @@ namespace transport int len; if (msg) - { + { // regular I2NP if (msg->offset < 2) LogPrint (eLogError, "NTCP: Malformed I2NP message"); // TODO: - sendBuffer = msg->GetBuffer () - 2; + sendBuffer = msg->GetBuffer () - 2; len = msg->GetLength (); htobe16buf (sendBuffer, len); - } + } else { // prepare timestamp @@ -703,7 +703,7 @@ namespace transport len = 4; htobuf16(sendBuffer, 0); htobe32buf (sendBuffer + 2, i2p::util::GetSecondsSinceEpoch ()); - } + } int rem = (len + 6) & 0x0F; // %16 int padding = 0; if (rem > 0) { @@ -714,9 +714,9 @@ namespace transport htobe32buf (sendBuffer + len + 2 + padding, adler32 (adler32 (0, Z_NULL, 0), sendBuffer, len + 2+ padding)); int l = len + padding + 6; - m_Encryption.Encrypt(sendBuffer, l, sendBuffer); + m_Encryption.Encrypt(sendBuffer, l, sendBuffer); return boost::asio::buffer ((const uint8_t *)sendBuffer, l); - } + } void NTCPSession::Send (const std::vector >& msgs) @@ -725,23 +725,23 @@ namespace transport std::vector bufs; for (const auto& it: msgs) bufs.push_back (CreateMsgBuffer (it)); - boost::asio::async_write (m_Socket, bufs, boost::asio::transfer_all (), - std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, msgs)); + boost::asio::async_write (m_Socket, bufs, boost::asio::transfer_all (), + std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, msgs)); } - + void NTCPSession::HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector > msgs) { (void) msgs; m_IsSending = false; if (ecode) - { + { LogPrint (eLogWarning, "NTCP: Couldn't send msgs: ", ecode.message ()); // we shouldn't call Terminate () here, because HandleReceive takes care // TODO: 'delete this' statement in Terminate () must be eliminated later // Terminate (); } else - { + { m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); m_NumSentBytes += bytes_transferred; i2p::transport::transports.UpdateSentBytes (bytes_transferred); @@ -749,28 +749,28 @@ namespace transport { Send (m_SendQueue); m_SendQueue.clear (); - } - } - } + } + } + } + - void NTCPSession::SendTimeSyncMessage () { Send (nullptr); - } + } void NTCPSession::SendI2NPMessages (const std::vector >& msgs) { - m_Server.GetService ().post (std::bind (&NTCPSession::PostI2NPMessages, shared_from_this (), msgs)); - } + m_Server.GetService ().post (std::bind (&NTCPSession::PostI2NPMessages, shared_from_this (), msgs)); + } void NTCPSession::PostI2NPMessages (std::vector > msgs) { if (m_IsTerminated) return; if (m_IsSending) { - if (m_SendQueue.size () < NTCP_MAX_OUTGOING_QUEUE_SIZE) + if (m_SendQueue.size () < NTCP_MAX_OUTGOING_QUEUE_SIZE) { for (const auto& it: msgs) m_SendQueue.push_back (it); @@ -779,131 +779,151 @@ namespace transport { LogPrint (eLogWarning, "NTCP: outgoing messages queue size exceeds ", NTCP_MAX_OUTGOING_QUEUE_SIZE); Terminate (); - } - } - else + } + } + else Send (msgs); - } + } //----------------------------------------- NTCPServer::NTCPServer (): - m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), - m_TerminationTimer (m_Service), m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr) + m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), + m_TerminationTimer (m_Service), m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr), + m_UseSocks(false), m_Resolver(m_Service), m_SocksEndpoint(nullptr) { } - + NTCPServer::~NTCPServer () { Stop (); - } + } void NTCPServer::Start () { if (!m_IsRunning) - { + { m_IsRunning = true; m_Thread = new std::thread (std::bind (&NTCPServer::Run, this)); - // create acceptors - auto& addresses = context.GetRouterInfo ().GetAddresses (); - for (const auto& address: addresses) + // we are using a socks proxy, don't create any acceptors + if(m_UseSocks) + { + boost::asio::ip::tcp::resolver::query q(m_SocksAddress, std::to_string(m_SocksPort)); + boost::system::error_code e; + auto itr = m_Resolver.resolve(q, e); + if(e) + { + LogPrint(eLogError, "NTCP: Failed to resolve proxy ", e.message()); + } + else + { + m_SocksEndpoint = new boost::asio::ip::tcp::endpoint(*itr); + } + } + else { - if (!address) continue; - if (address->transportStyle == i2p::data::RouterInfo::eTransportNTCP) + // create acceptors + auto& addresses = context.GetRouterInfo ().GetAddresses (); + for (const auto& address: addresses) { - if (address->host.is_v4()) + if (!address) continue; + if (address->transportStyle == i2p::data::RouterInfo::eTransportNTCP) { - try + if (address->host.is_v4()) { - m_NTCPAcceptor = new boost::asio::ip::tcp::acceptor (m_Service, - boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), address->port)); - } catch ( std::exception & ex ) { - /** fail to bind ip4 */ - LogPrint(eLogError, "NTCP: Failed to bind to ip4 port ",address->port, ex.what()); - continue; + try + { + m_NTCPAcceptor = new boost::asio::ip::tcp::acceptor (m_Service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), address->port)); + } catch ( std::exception & ex ) { + /** fail to bind ip4 */ + LogPrint(eLogError, "NTCP: Failed to bind to ip4 port ",address->port, ex.what()); + continue; + } + + LogPrint (eLogInfo, "NTCP: Start listening TCP port ", address->port); + auto conn = std::make_shared(*this); + m_NTCPAcceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAccept, this, conn, std::placeholders::_1)); } - - LogPrint (eLogInfo, "NTCP: Start listening TCP port ", address->port); - auto conn = std::make_shared(*this); - m_NTCPAcceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAccept, this, - conn, std::placeholders::_1)); - } - else if (address->host.is_v6() && context.SupportsV6 ()) - { - m_NTCPV6Acceptor = new boost::asio::ip::tcp::acceptor (m_Service); - try + else if (address->host.is_v6() && context.SupportsV6 ()) { - m_NTCPV6Acceptor->open (boost::asio::ip::tcp::v6()); - m_NTCPV6Acceptor->set_option (boost::asio::ip::v6_only (true)); - - m_NTCPV6Acceptor->bind (boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v6(), address->port)); - m_NTCPV6Acceptor->listen (); - - LogPrint (eLogInfo, "NTCP: Start listening V6 TCP port ", address->port); - auto conn = std::make_shared (*this); - m_NTCPV6Acceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAcceptV6, - this, conn, std::placeholders::_1)); - } catch ( std::exception & ex ) { - LogPrint(eLogError, "NTCP: failed to bind to ip6 port ", address->port); - continue; + m_NTCPV6Acceptor = new boost::asio::ip::tcp::acceptor (m_Service); + try + { + m_NTCPV6Acceptor->open (boost::asio::ip::tcp::v6()); + m_NTCPV6Acceptor->set_option (boost::asio::ip::v6_only (true)); + m_NTCPV6Acceptor->bind (boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v6(), address->port)); + m_NTCPV6Acceptor->listen (); + + LogPrint (eLogInfo, "NTCP: Start listening V6 TCP port ", address->port); + auto conn = std::make_shared (*this); + m_NTCPV6Acceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAcceptV6, this, conn, std::placeholders::_1)); + } catch ( std::exception & ex ) { + LogPrint(eLogError, "NTCP: failed to bind to ip6 port ", address->port); + continue; + } } - } + } } } - ScheduleTermination (); - } + ScheduleTermination (); + } } - + void NTCPServer::Stop () - { + { { // we have to copy it because Terminate changes m_NTCPSessions - auto ntcpSessions = m_NTCPSessions; + auto ntcpSessions = m_NTCPSessions; for (auto& it: ntcpSessions) it.second->Terminate (); for (auto& it: m_PendingIncomingSessions) it->Terminate (); - } + } m_NTCPSessions.clear (); if (m_IsRunning) - { + { m_IsRunning = false; - m_TerminationTimer.cancel (); - if (m_NTCPAcceptor) - { - delete m_NTCPAcceptor; + m_TerminationTimer.cancel (); + if (m_NTCPAcceptor) + { + delete m_NTCPAcceptor; m_NTCPAcceptor = nullptr; } - if (m_NTCPV6Acceptor) + if (m_NTCPV6Acceptor) { - delete m_NTCPV6Acceptor; + delete m_NTCPV6Acceptor; m_NTCPV6Acceptor = nullptr; } m_Service.stop (); if (m_Thread) - { - m_Thread->join (); + { + m_Thread->join (); delete m_Thread; m_Thread = nullptr; - } - } - } + } + if(m_SocksEndpoint) + { + delete m_SocksEndpoint; + m_SocksEndpoint = nullptr; + } + } + } - - void NTCPServer::Run () - { + + void NTCPServer::Run () + { while (m_IsRunning) { try - { + { m_Service.run (); } catch (std::exception& ex) { LogPrint (eLogError, "NTCP: runtime exception: ", ex.what ()); - } - } - } + } + } + } bool NTCPServer::AddNTCPSession (std::shared_ptr session) { @@ -918,13 +938,13 @@ namespace transport } m_NTCPSessions.insert (std::pair >(ident, session)); return true; - } + } void NTCPServer::RemoveNTCPSession (std::shared_ptr session) { if (session && session->GetRemoteIdentity ()) m_NTCPSessions.erase (session->GetRemoteIdentity ()->GetIdentHash ()); - } + } std::shared_ptr NTCPServer::FindNTCPSession (const i2p::data::IdentHash& ident) { @@ -932,14 +952,14 @@ namespace transport if (it != m_NTCPSessions.end ()) return it->second; return nullptr; - } - + } + void NTCPServer::HandleAccept (std::shared_ptr conn, const boost::system::error_code& error) - { + { if (!error) { boost::system::error_code ec; - auto ep = conn->GetSocket ().remote_endpoint(ec); + auto ep = conn->GetSocket ().remote_endpoint(ec); if (!ec) { LogPrint (eLogDebug, "NTCP: Connected from ", ep); @@ -947,35 +967,35 @@ namespace transport { conn->ServerLogin (); m_PendingIncomingSessions.push_back (conn); - } + } } else LogPrint (eLogError, "NTCP: Connected from error ", ec.message ()); } - + if (error != boost::asio::error::operation_aborted) { - conn = std::make_shared (*this); - m_NTCPAcceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAccept, this, + conn = std::make_shared (*this); + m_NTCPAcceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAccept, this, conn, std::placeholders::_1)); - } + } } void NTCPServer::HandleAcceptV6 (std::shared_ptr conn, const boost::system::error_code& error) - { + { if (!error) { boost::system::error_code ec; - auto ep = conn->GetSocket ().remote_endpoint(ec); + auto ep = conn->GetSocket ().remote_endpoint(ec); if (!ec) { LogPrint (eLogDebug, "NTCP: Connected from ", ep); if (conn) - { + { conn->ServerLogin (); m_PendingIncomingSessions.push_back (conn); - } + } } else LogPrint (eLogError, "NTCP: Connected from error ", ec.message ()); @@ -983,40 +1003,65 @@ namespace transport if (error != boost::asio::error::operation_aborted) { - conn = std::make_shared (*this); - m_NTCPV6Acceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAcceptV6, this, + conn = std::make_shared (*this); + m_NTCPV6Acceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAcceptV6, this, conn, std::placeholders::_1)); - } - } + } + } - void NTCPServer::Connect (const boost::asio::ip::address& address, int port, std::shared_ptr conn) + void NTCPServer::Connect(const boost::asio::ip::address & address, uint16_t port, std::shared_ptr conn) { LogPrint (eLogDebug, "NTCP: Connecting to ", address ,":", port); - m_Service.post([=]() - { + m_Service.post([&]() { if (this->AddNTCPSession (conn)) { + auto timer = std::make_shared(m_Service); - timer->expires_from_now (boost::posix_time::seconds(NTCP_CONNECT_TIMEOUT)); - timer->async_wait ([conn](const boost::system::error_code& ecode) + timer->expires_from_now (boost::posix_time::seconds(NTCP_CONNECT_TIMEOUT)); + timer->async_wait ([conn](const boost::system::error_code& ecode) { + if (ecode != boost::asio::error::operation_aborted) { - if (ecode != boost::asio::error::operation_aborted) - { - LogPrint (eLogInfo, "NTCP: Not connected in ", NTCP_CONNECT_TIMEOUT, " seconds"); - conn->Terminate (); - } - }); - conn->GetSocket ().async_connect (boost::asio::ip::tcp::endpoint (address, port), - std::bind (&NTCPServer::HandleConnect, this, std::placeholders::_1, conn, timer)); - } - }); + LogPrint (eLogInfo, "NTCP: Not connected in ", NTCP_CONNECT_TIMEOUT, " seconds"); + conn->Terminate (); + } + }); + conn->GetSocket ().async_connect (boost::asio::ip::tcp::endpoint (address, port), std::bind (&NTCPServer::HandleConnect, this, std::placeholders::_1, conn, timer)); + } + }); + } + + void NTCPServer::ConnectSocks (const std::string& host, uint16_t port, std::shared_ptr conn) + { + if(m_SocksEndpoint == nullptr) + { + return; + } + LogPrint (eLogDebug, "NTCP: Connecting to ", host ,":", port, " Via socks proxy"); + m_Service.post([=]() { + if (this->AddNTCPSession (conn)) + { + + auto timer = std::make_shared(m_Service); + auto timeout = NTCP_CONNECT_TIMEOUT * 2; + timer->expires_from_now (boost::posix_time::seconds(timeout)); + timer->async_wait ([conn, timeout](const boost::system::error_code& ecode) { + if (ecode != boost::asio::error::operation_aborted) + { + LogPrint (eLogInfo, "NTCP: Not connected in ", timeout, " seconds"); + i2p::data::netdb.SetUnreachable (conn->GetRemoteIdentity ()->GetIdentHash (), true); + conn->Terminate (); + } + }); + conn->GetSocket ().async_connect (*m_SocksEndpoint, std::bind (&NTCPServer::HandleSocksConnect, this, std::placeholders::_1, conn, timer, host, port)); + } + }); } void NTCPServer::HandleConnect (const boost::system::error_code& ecode, std::shared_ptr conn, std::shared_ptr timer) { - timer->cancel (); + timer->cancel (); if (ecode) - { + { LogPrint (eLogInfo, "NTCP: Connect error ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) i2p::data::netdb.SetUnreachable (conn->GetRemoteIdentity ()->GetIdentHash (), true); @@ -1028,8 +1073,68 @@ namespace transport if (conn->GetSocket ().local_endpoint ().protocol () == boost::asio::ip::tcp::v6()) // ipv6 context.UpdateNTCPV6Address (conn->GetSocket ().local_endpoint ().address ()); conn->ClientLogin (); - } - } + } + } + + void NTCPServer::UseSocksProxy(const std::string & addr, uint16_t port) + { + m_UseSocks = true; + m_SocksAddress = addr; + m_SocksPort = port; + } + + void NTCPServer::HandleSocksConnect(const boost::system::error_code& ecode, std::shared_ptr conn, std::shared_ptr timer, const std::string & host, uint16_t port) + { + if(ecode) + { + LogPrint(eLogInfo, "NTCP: Socks Proxy connect error ", ecode.message()); + return; + } + LogPrint(eLogDebug, "NTCP: connecting via socks proxy to ",host, ":", port); + uint8_t readbuff[8]; + // build socks4a request + size_t addrsz = host.size(); + size_t sz = 8 + 1 + 4 + addrsz + 1; + uint8_t buff[256]; + if(sz > 256) + { + // hostname too big + return; + } + buff[0] = 4; + buff[1] = 1; + htobe16buf(buff+2, port); + buff[4] = 0; + buff[5] = 0; + buff[6] = 0; + buff[7] = 1; + buff[8] = 105; // i + buff[9] = 50; // 2 + buff[10] = 112; // p + buff[11] = 100; // d + buff[12] = 0; + memcpy(buff+12, host.c_str(), addrsz); + buff[12+addrsz] = 0; + + boost::asio::async_write(conn->GetSocket(), boost::asio::buffer(buff, sz), boost::asio::transfer_all(), [&](const boost::system::error_code & ec, std::size_t written) { + if(ec) + { + LogPrint(eLogError, "NTCP: failed to write handshake to socks proxy ", ec.message()); + return; + } + }); + + boost::asio::async_read(conn->GetSocket(), boost::asio::buffer(readbuff, 8), [&](const boost::system::error_code & e, std::size_t transferred) { + if(transferred == 8 && readbuff[1] == 0x5a) + { + timer->cancel(); + conn->ClientLogin(); + LogPrint(eLogDebug, "NTCP: connected via socks"); + } + else + LogPrint(eLogDebug, "NTCP: connection via socks failed"); + }); + } void NTCPServer::ScheduleTermination () { @@ -1041,19 +1146,18 @@ namespace transport void NTCPServer::HandleTerminationTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) - { + { auto ts = i2p::util::GetSecondsSinceEpoch (); // established for (auto& it: m_NTCPSessions) - if (it.second->IsTerminationTimeoutExpired (ts)) + if (it.second->IsTerminationTimeoutExpired (ts)) { auto session = it.second; // Termniate modifies m_NTCPSession, so we postpone it - m_Service.post ([session] - { + m_Service.post ([session] { LogPrint (eLogDebug, "NTCP: No activity for ", session->GetTerminationTimeout (), " seconds"); session->Terminate (); - }); + }); } // pending for (auto it = m_PendingIncomingSessions.begin (); it != m_PendingIncomingSessions.end ();) @@ -1062,15 +1166,15 @@ namespace transport it = m_PendingIncomingSessions.erase (it); // established or terminated else if ((*it)->IsTerminationTimeoutExpired (ts)) { - (*it)->Terminate (); + (*it)->Terminate (); it = m_PendingIncomingSessions.erase (it); // expired - } + } else it++; } - - ScheduleTermination (); - } - } -} -} + + ScheduleTermination (); + } + } +} +} diff --git a/libi2pd/NTCPSession.h b/libi2pd/NTCPSession.h index 9de75d02..e7bd53fb 100644 --- a/libi2pd/NTCPSession.h +++ b/libi2pd/NTCPSession.h @@ -21,8 +21,8 @@ namespace transport { uint8_t pubKey[256]; uint8_t HXxorHI[32]; - }; - + }; + struct NTCPPhase2 { uint8_t pubKey[256]; @@ -31,17 +31,17 @@ namespace transport uint8_t hxy[32]; uint8_t timestamp[4]; uint8_t filler[12]; - } encrypted; - }; + } encrypted; + }; - const size_t NTCP_MAX_MESSAGE_SIZE = 16384; + const size_t NTCP_MAX_MESSAGE_SIZE = 16384; const size_t NTCP_BUFFER_SIZE = 1028; // fits 1 tunnel data message const int NTCP_CONNECT_TIMEOUT = 5; // 5 seconds const int NTCP_ESTABLISH_TIMEOUT = 10; // 10 seconds const int NTCP_TERMINATION_TIMEOUT = 120; // 2 minutes - const int NTCP_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds - const size_t NTCP_DEFAULT_PHASE3_SIZE = 2/*size*/ + i2p::data::DEFAULT_IDENTITY_SIZE/*387*/ + 4/*ts*/ + 15/*padding*/ + 40/*signature*/; // 448 - const int NTCP_CLOCK_SKEW = 60; // in seconds + const int NTCP_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds + const size_t NTCP_DEFAULT_PHASE3_SIZE = 2/*size*/ + i2p::data::DEFAULT_IDENTITY_SIZE/*387*/ + 4/*ts*/ + 15/*padding*/ + 40/*signature*/; // 448 + const int NTCP_CLOCK_SKEW = 60; // in seconds const int NTCP_MAX_OUTGOING_QUEUE_SIZE = 200; // how many messages we can queue up class NTCPServer; @@ -55,13 +55,13 @@ namespace transport void Done (); boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; }; - bool IsEstablished () const { return m_IsEstablished; }; + bool IsEstablished () const { return m_IsEstablished; }; bool IsTerminated () const { return m_IsTerminated; }; - + void ClientLogin (); void ServerLogin (); void SendI2NPMessages (const std::vector >& msgs); - + private: void PostI2NPMessages (std::vector > msgs); @@ -70,7 +70,7 @@ namespace transport void SetIsEstablished (bool isEstablished) { m_IsEstablished = isEstablished; } void CreateAESKey (uint8_t * pubKey); - + // client void SendPhase3 (); void HandlePhase1Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred); @@ -88,35 +88,35 @@ namespace transport void HandlePhase3ExtraReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB, size_t paddingLen); void HandlePhase3 (uint32_t tsB, size_t paddingLen); void HandlePhase4Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred); - + // common void Receive (); void HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); - bool DecryptNextBlock (const uint8_t * encrypted); - + bool DecryptNextBlock (const uint8_t * encrypted); + void Send (std::shared_ptr msg); boost::asio::const_buffers_1 CreateMsgBuffer (std::shared_ptr msg); void Send (const std::vector >& msgs); void HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector > msgs); - + private: NTCPServer& m_Server; boost::asio::ip::tcp::socket m_Socket; bool m_IsEstablished, m_IsTerminated; - + i2p::crypto::CBCDecryption m_Decryption; i2p::crypto::CBCEncryption m_Encryption; struct Establisher - { + { NTCPPhase1 phase1; NTCPPhase2 phase2; - } * m_Establisher; - + } * m_Establisher; + i2p::crypto::AESAlignedBuffer m_ReceiveBuffer; i2p::crypto::AESAlignedBuffer<16> m_TimeSyncBuffer; - int m_ReceiveBufferOffset; + int m_ReceiveBufferOffset; std::shared_ptr m_NextMessage; size_t m_NextMessageOffset; @@ -124,7 +124,7 @@ namespace transport bool m_IsSending; std::vector > m_SendQueue; - }; + }; // TODO: move to NTCP.h/.cpp class NTCPServer @@ -140,12 +140,17 @@ namespace transport bool AddNTCPSession (std::shared_ptr session); void RemoveNTCPSession (std::shared_ptr session); std::shared_ptr FindNTCPSession (const i2p::data::IdentHash& ident); - void Connect (const boost::asio::ip::address& address, int port, std::shared_ptr conn); + void ConnectSocks (const std::string& addr, uint16_t port, std::shared_ptr conn); + void Connect(const boost::asio::ip::address & address, uint16_t port, std::shared_ptr conn); + + bool IsBoundV4() const { return m_NTCPAcceptor != nullptr; }; + bool IsBoundV6() const { return m_NTCPV6Acceptor != nullptr; }; + bool NetworkIsReady() const { return IsBoundV4() || IsBoundV6() || m_UseSocks; }; + bool UsingSocksProxy() const { return m_UseSocks; }; + + void UseSocksProxy(const std::string & address, uint16_t port); - bool IsBoundV4() const { return m_NTCPAcceptor != nullptr; }; - bool IsBoundV6() const { return m_NTCPV6Acceptor != nullptr; }; - - boost::asio::io_service& GetService () { return m_Service; }; + boost::asio::io_service& GetService () { return m_Service; }; private: @@ -155,14 +160,16 @@ namespace transport void HandleConnect (const boost::system::error_code& ecode, std::shared_ptr conn, std::shared_ptr timer); + void HandleSocksConnect(const boost::system::error_code& ecode, std::shared_ptr conn, std::shared_ptr timer, const std::string & host, uint16_t port); + // timer void ScheduleTermination (); void HandleTerminationTimer (const boost::system::error_code& ecode); - - private: + + private: bool m_IsRunning; - std::thread * m_Thread; + std::thread * m_Thread; boost::asio::io_service m_Service; boost::asio::io_service::work m_Work; boost::asio::deadline_timer m_TerminationTimer; @@ -170,12 +177,17 @@ namespace transport std::map > m_NTCPSessions; // access from m_Thread only std::list > m_PendingIncomingSessions; + bool m_UseSocks; + std::string m_SocksAddress; + uint16_t m_SocksPort; + boost::asio::ip::tcp::resolver m_Resolver; + boost::asio::ip::tcp::endpoint * m_SocksEndpoint; public: // for HTTP/I2PControl const decltype(m_NTCPSessions)& GetNTCPSessions () const { return m_NTCPSessions; }; - }; -} -} + }; +} +} #endif diff --git a/libi2pd/Transports.cpp b/libi2pd/Transports.cpp index ef885e46..4091c413 100644 --- a/libi2pd/Transports.cpp +++ b/libi2pd/Transports.cpp @@ -5,6 +5,7 @@ #include "NetDb.hpp" #include "Transports.h" #include "Config.h" +#include "HTTP.h" #ifdef WITH_EVENTS #include "Event.h" #include "util.h" @@ -19,7 +20,7 @@ namespace transport DHKeysPairSupplier::DHKeysPairSupplier (int size): m_QueueSize (size), m_IsRunning (false), m_Thread (nullptr) { - } + } DHKeysPairSupplier::~DHKeysPairSupplier () { @@ -35,13 +36,13 @@ namespace transport void DHKeysPairSupplier::Stop () { m_IsRunning = false; - m_Acquired.notify_one (); + m_Acquired.notify_one (); if (m_Thread) - { - m_Thread->join (); + { + m_Thread->join (); delete m_Thread; m_Thread = 0; - } + } } void DHKeysPairSupplier::Run () @@ -50,7 +51,7 @@ namespace transport { int num, total = 0; while ((num = m_QueueSize - (int)m_Queue.size ()) > 0 && total < 20) - { + { CreateDHKeysPairs (num); total += num; } @@ -63,9 +64,9 @@ namespace transport { std::unique_lock l(m_AcquiredMutex); m_Acquired.wait (l); // wait for element gets aquired - } + } } - } + } void DHKeysPairSupplier::CreateDHKeysPairs (int num) { @@ -91,8 +92,8 @@ namespace transport m_Queue.pop (); m_Acquired.notify_one (); return pair; - } - } + } + } // queue is empty, create new auto pair = std::make_shared (); pair->GenerateKeys (); @@ -106,21 +107,21 @@ namespace transport m_Queue.push (pair); } - Transports transports; - - Transports::Transports (): + Transports transports; + + Transports::Transports (): m_IsOnline (true), m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr), m_PeerCleanupTimer (nullptr), m_PeerTestTimer (nullptr), m_NTCPServer (nullptr), m_SSUServer (nullptr), m_DHKeysPairSupplier (5), // 5 pre-generated keys m_TotalSentBytes(0), m_TotalReceivedBytes(0), m_TotalTransitTransmittedBytes (0), - m_InBandwidth (0), m_OutBandwidth (0), m_TransitBandwidth(0), + m_InBandwidth (0), m_OutBandwidth (0), m_TransitBandwidth(0), m_LastInBandwidthUpdateBytes (0), m_LastOutBandwidthUpdateBytes (0), - m_LastTransitBandwidthUpdateBytes (0), m_LastBandwidthUpdateTime (0) - { + m_LastTransitBandwidthUpdateBytes (0), m_LastBandwidthUpdateTime (0) + { } - - Transports::~Transports () - { + + Transports::~Transports () + { Stop (); if (m_Service) { @@ -128,8 +129,8 @@ namespace transport delete m_PeerTestTimer; m_PeerTestTimer = nullptr; delete m_Work; m_Work = nullptr; delete m_Service; m_Service = nullptr; - } - } + } + } void Transports::Start (bool enableNTCP, bool enableSSU) { @@ -144,6 +145,34 @@ namespace transport m_DHKeysPairSupplier.Start (); m_IsRunning = true; m_Thread = new std::thread (std::bind (&Transports::Run, this)); + std::string ntcpproxy; i2p::config::GetOption("ntcpproxy", ntcpproxy); + i2p::http::URL proxyurl; + if(ntcpproxy.size() && enableNTCP) + { + if(proxyurl.parse(ntcpproxy)) + { + if(proxyurl.schema == "socks") + { + m_NTCPServer = new NTCPServer(); + m_NTCPServer->UseSocksProxy(proxyurl.host, proxyurl.port) ; + m_NTCPServer->Start(); + if(!m_NTCPServer->NetworkIsReady()) + { + LogPrint(eLogError, "Transports: NTCP failed to start with socks proxy"); + m_NTCPServer->Stop(); + delete m_NTCPServer; + m_NTCPServer = nullptr; + } + } + else + LogPrint(eLogError, "Transports: unsupported NTCP proxy URL ", ntcpproxy); + } + else + LogPrint(eLogError, "Transports: invalid NTCP proxy url ", ntcpproxy); + + return; + } + // create acceptors auto& addresses = context.GetRouterInfo ().GetAddresses (); for (const auto& address : addresses) @@ -160,8 +189,8 @@ namespace transport delete m_NTCPServer; m_NTCPServer = nullptr; } - } - + } + if (address->transportStyle == RouterInfo::eTransportSSU) { if (m_SSUServer == nullptr && enableSSU) @@ -184,16 +213,16 @@ namespace transport else LogPrint (eLogError, "Transports: SSU server already exists"); } - } + } m_PeerCleanupTimer->expires_from_now (boost::posix_time::seconds(5*SESSION_CREATION_TIMEOUT)); m_PeerCleanupTimer->async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1)); m_PeerTestTimer->expires_from_now (boost::posix_time::minutes(PEER_TEST_INTERVAL)); m_PeerTestTimer->async_wait (std::bind (&Transports::HandlePeerTestTimer, this, std::placeholders::_1)); } - + void Transports::Stop () - { - if (m_PeerCleanupTimer) m_PeerCleanupTimer->cancel (); + { + if (m_PeerCleanupTimer) m_PeerCleanupTimer->cancel (); if (m_PeerTestTimer) m_PeerTestTimer->cancel (); m_Peers.clear (); if (m_SSUServer) @@ -201,40 +230,40 @@ namespace transport m_SSUServer->Stop (); delete m_SSUServer; m_SSUServer = nullptr; - } + } if (m_NTCPServer) { m_NTCPServer->Stop (); delete m_NTCPServer; m_NTCPServer = nullptr; - } + } m_DHKeysPairSupplier.Stop (); m_IsRunning = false; if (m_Service) m_Service->stop (); if (m_Thread) - { - m_Thread->join (); + { + m_Thread->join (); delete m_Thread; m_Thread = nullptr; - } - } + } + } - void Transports::Run () - { + void Transports::Run () + { while (m_IsRunning && m_Service) { try - { + { m_Service->run (); } catch (std::exception& ex) { LogPrint (eLogError, "Transports: runtime exception: ", ex.what ()); - } - } + } + } } - + void Transports::UpdateBandwidth () { uint64_t ts = i2p::util::GetMillisecondsSinceEpoch (); @@ -243,15 +272,15 @@ namespace transport auto delta = ts - m_LastBandwidthUpdateTime; if (delta > 0) { - m_InBandwidth = (m_TotalReceivedBytes - m_LastInBandwidthUpdateBytes)*1000/delta; // per second - m_OutBandwidth = (m_TotalSentBytes - m_LastOutBandwidthUpdateBytes)*1000/delta; // per second + m_InBandwidth = (m_TotalReceivedBytes - m_LastInBandwidthUpdateBytes)*1000/delta; // per second + m_OutBandwidth = (m_TotalSentBytes - m_LastOutBandwidthUpdateBytes)*1000/delta; // per second m_TransitBandwidth = (m_TotalTransitTransmittedBytes - m_LastTransitBandwidthUpdateBytes)*1000/delta; - } + } } m_LastBandwidthUpdateTime = ts; - m_LastInBandwidthUpdateBytes = m_TotalReceivedBytes; - m_LastOutBandwidthUpdateBytes = m_TotalSentBytes; - m_LastTransitBandwidthUpdateBytes = m_TotalTransitTransmittedBytes; + m_LastInBandwidthUpdateBytes = m_TotalReceivedBytes; + m_LastOutBandwidthUpdateBytes = m_TotalSentBytes; + m_LastTransitBandwidthUpdateBytes = m_TotalTransitTransmittedBytes; } bool Transports::IsBandwidthExceeded () const @@ -265,12 +294,12 @@ namespace transport { auto limit = i2p::context.GetTransitBandwidthLimit() * 1024; // convert to bytes return m_TransitBandwidth > limit; - } + } void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg) { - SendMessages (ident, std::vector > {msg }); - } + SendMessages (ident, std::vector > {msg }); + } void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector >& msgs) { @@ -278,12 +307,12 @@ namespace transport QueueIntEvent("transport.send", ident.ToBase64(), msgs.size()); #endif m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs)); - } + } void Transports::PostMessages (i2p::data::IdentHash ident, std::vector > msgs) { if (ident == i2p::context.GetRouterInfo ().GetIdentHash ()) - { + { // we send it to ourself for (auto& it: msgs) m_LoopbackHandler.PutNextMessage (it); @@ -294,12 +323,12 @@ namespace transport auto it = m_Peers.find (ident); if (it == m_Peers.end ()) { - bool connected = false; + bool connected = false; try { auto r = netdb.FindRouter (ident); { - std::unique_lock l(m_PeersMutex); + std::unique_lock l(m_PeersMutex); it = m_Peers.insert (std::pair(ident, { 0, r, {}, i2p::util::GetSecondsSinceEpoch (), {} })).first; } @@ -310,29 +339,29 @@ namespace transport LogPrint (eLogError, "Transports: PostMessages exception:", ex.what ()); } if (!connected) return; - } + } if (!it->second.sessions.empty ()) it->second.sessions.front ()->SendI2NPMessages (msgs); else - { + { if (it->second.delayedMessages.size () < MAX_NUM_DELAYED_MESSAGES) - { + { for (auto& it1: msgs) it->second.delayedMessages.push_back (it1); } else { LogPrint (eLogWarning, "Transports: delayed messages queue size exceeds ", MAX_NUM_DELAYED_MESSAGES); - std::unique_lock l(m_PeersMutex); + std::unique_lock l(m_PeersMutex); m_Peers.erase (it); - } - } - } - + } + } + } + bool Transports::ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer) { if (peer.router) // we have RI already - { + { if (!peer.numAttempts) // NTCP { peer.numAttempts++; @@ -343,14 +372,20 @@ namespace transport if (!address->host.is_unspecified ()) // we have address now #else boost::system::error_code ecode; - address->host.to_string (ecode); + address->host.to_string (ecode); if (!ecode) #endif { if (!peer.router->UsesIntroducer () && !peer.router->IsUnreachable ()) - { + { auto s = std::make_shared (*m_NTCPServer, peer.router); - m_NTCPServer->Connect (address->host, address->port, s); + if(m_NTCPServer->UsingSocksProxy()) + { + std::string addr = address->host.to_string(); + m_NTCPServer->ConnectSocks(addr, address->port, s); + } + else + m_NTCPServer->Connect (address->host, address->port, s); return true; } } @@ -358,12 +393,20 @@ namespace transport { if (address->addressString.length () > 0) // trying to resolve { - LogPrint (eLogDebug, "Transports: Resolving NTCP ", address->addressString); - NTCPResolve (address->addressString, ident); + if(m_NTCPServer->UsingSocksProxy()) + { + auto s = std::make_shared (*m_NTCPServer, peer.router); + m_NTCPServer->ConnectSocks(address->addressString, address->port, s); + } + else + { + LogPrint (eLogDebug, "Transports: Resolving NTCP ", address->addressString); + NTCPResolve (address->addressString, ident); + } return true; } } - } + } else LogPrint (eLogDebug, "Transports: NTCP address is not present for ", i2p::data::GetIdentHashAbbreviation (ident), ", trying SSU"); } @@ -394,56 +437,56 @@ namespace transport } } } - } + } LogPrint (eLogInfo, "Transports: No NTCP or SSU addresses available"); peer.Done (); - std::unique_lock l(m_PeersMutex); + std::unique_lock l(m_PeersMutex); m_Peers.erase (ident); return false; - } + } else // otherwise request RI { LogPrint (eLogInfo, "Transports: RouterInfo for ", ident.ToBase64 (), " not found, requested"); i2p::data::netdb.RequestDestination (ident, std::bind ( &Transports::RequestComplete, this, std::placeholders::_1, ident)); - } + } return true; - } - + } + void Transports::RequestComplete (std::shared_ptr r, const i2p::data::IdentHash& ident) { m_Service->post (std::bind (&Transports::HandleRequestComplete, this, r, ident)); - } - + } + void Transports::HandleRequestComplete (std::shared_ptr r, i2p::data::IdentHash ident) { auto it = m_Peers.find (ident); if (it != m_Peers.end ()) - { + { if (r) { LogPrint (eLogDebug, "Transports: RouterInfo for ", ident.ToBase64 (), " found, Trying to connect"); it->second.router = r; ConnectToPeer (ident, it->second); - } + } else { LogPrint (eLogWarning, "Transports: RouterInfo not found, Failed to send messages"); - std::unique_lock l(m_PeersMutex); + std::unique_lock l(m_PeersMutex); m_Peers.erase (it); - } - } - } + } + } + } void Transports::NTCPResolve (const std::string& addr, const i2p::data::IdentHash& ident) { auto resolver = std::make_shared(*m_Service); - resolver->async_resolve (boost::asio::ip::tcp::resolver::query (addr, ""), - std::bind (&Transports::HandleNTCPResolve, this, + resolver->async_resolve (boost::asio::ip::tcp::resolver::query (addr, ""), + std::bind (&Transports::HandleNTCPResolve, this, std::placeholders::_1, std::placeholders::_2, ident, resolver)); } - void Transports::HandleNTCPResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it, + void Transports::HandleNTCPResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it, i2p::data::IdentHash ident, std::shared_ptr resolver) { auto it1 = m_Peers.find (ident); @@ -453,7 +496,7 @@ namespace transport if (!ecode && peer.router) { while (it != boost::asio::ip::tcp::resolver::iterator()) - { + { auto address = (*it).endpoint ().address (); LogPrint (eLogDebug, "Transports: ", (*it).host_name (), " has been resolved to ", address); if (address.is_v4 () || context.SupportsV6 ()) @@ -466,14 +509,14 @@ namespace transport return; } break; - } + } else LogPrint (eLogInfo, "Transports: NTCP ", address, " is not supported"); it++; - } + } } LogPrint (eLogError, "Transports: Unable to resolve NTCP address: ", ecode.message ()); - std::unique_lock l(m_PeersMutex); + std::unique_lock l(m_PeersMutex); m_Peers.erase (it1); } } @@ -481,12 +524,12 @@ namespace transport void Transports::SSUResolve (const std::string& addr, const i2p::data::IdentHash& ident) { auto resolver = std::make_shared(*m_Service); - resolver->async_resolve (boost::asio::ip::tcp::resolver::query (addr, ""), - std::bind (&Transports::HandleSSUResolve, this, + resolver->async_resolve (boost::asio::ip::tcp::resolver::query (addr, ""), + std::bind (&Transports::HandleSSUResolve, this, std::placeholders::_1, std::placeholders::_2, ident, resolver)); } - void Transports::HandleSSUResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it, + void Transports::HandleSSUResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it, i2p::data::IdentHash ident, std::shared_ptr resolver) { auto it1 = m_Peers.find (ident); @@ -496,7 +539,7 @@ namespace transport if (!ecode && peer.router) { while (it != boost::asio::ip::tcp::resolver::iterator()) - { + { auto address = (*it).endpoint ().address (); LogPrint (eLogDebug, "Transports: ", (*it).host_name (), " has been resolved to ", address); if (address.is_v4 () || context.SupportsV6 ()) @@ -512,10 +555,10 @@ namespace transport else LogPrint (eLogInfo, "Transports: SSU ", address, " is not supported"); it++; - } + } } LogPrint (eLogError, "Transports: Unable to resolve SSU address: ", ecode.message ()); - std::unique_lock l(m_PeersMutex); + std::unique_lock l(m_PeersMutex); m_Peers.erase (it1); } } @@ -523,14 +566,14 @@ namespace transport void Transports::CloseSession (std::shared_ptr router) { if (!router) return; - m_Service->post (std::bind (&Transports::PostCloseSession, this, router)); - } + m_Service->post (std::bind (&Transports::PostCloseSession, this, router)); + } void Transports::PostCloseSession (std::shared_ptr router) { auto ssuSession = m_SSUServer ? m_SSUServer->FindSession (router) : nullptr; if (ssuSession) // try SSU first - { + { m_SSUServer->DeleteSession (ssuSession); LogPrint (eLogDebug, "Transports: SSU session closed"); } @@ -540,8 +583,8 @@ namespace transport ntcpSession->Terminate (); LogPrint(eLogDebug, "Transports: NTCP session closed"); } - } - + } + void Transports::DetectExternalIP () { if (RoutesRestricted()) @@ -560,7 +603,7 @@ namespace transport { auto router = i2p::data::netdb.GetRandomPeerTestRouter (isv4); // v4 only if v4 if (router) - m_SSUServer->CreateSession (router, true, isv4); // peer test + m_SSUServer->CreateSession (router, true, isv4); // peer test else { // if not peer test capable routers found pick any @@ -568,7 +611,7 @@ namespace transport if (router && router->IsSSU ()) m_SSUServer->CreateSession (router); // no peer test } - } + } } else LogPrint (eLogError, "Transports: Can't detect external IP. SSU is not available"); @@ -578,26 +621,26 @@ namespace transport { if (RoutesRestricted() || !i2p::context.SupportsV4 ()) return; if (m_SSUServer) - { + { bool statusChanged = false; for (int i = 0; i < 5; i++) { auto router = i2p::data::netdb.GetRandomPeerTestRouter (true); // v4 only if (router) - { + { if (!statusChanged) - { + { statusChanged = true; i2p::context.SetStatus (eRouterStatusTesting); // first time only - } - m_SSUServer->CreateSession (router, true, true); // peer test v4 - } + } + m_SSUServer->CreateSession (router, true, true); // peer test v4 + } } if (!statusChanged) - LogPrint (eLogWarning, "Can't find routers for peer test"); + LogPrint (eLogWarning, "Can't find routers for peer test"); } - } - + } + std::shared_ptr Transports::GetNextDHKeysPair () { return m_DHKeysPairSupplier.Acquire (); @@ -611,8 +654,8 @@ namespace transport void Transports::PeerConnected (std::shared_ptr session) { m_Service->post([session, this]() - { - auto remoteIdentity = session->GetRemoteIdentity (); + { + auto remoteIdentity = session->GetRemoteIdentity (); if (!remoteIdentity) return; auto ident = remoteIdentity->GetIdentHash (); auto it = m_Peers.find (ident); @@ -629,7 +672,7 @@ namespace transport if (firstMsg && firstMsg->GetTypeID () == eI2NPDatabaseStore && i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ()) sendDatabaseStore = false; // we have it in the list already - } + } if (sendDatabaseStore) session->SendI2NPMessages ({ CreateDatabaseStoreMsg () }); else @@ -650,17 +693,17 @@ namespace transport EmitEvent({{"type" , "transport.connected"}, {"ident", ident.ToBase64()}, {"inbound", "true"}}); #endif session->SendI2NPMessages ({ CreateDatabaseStoreMsg () }); // send DatabaseStore - std::unique_lock l(m_PeersMutex); + std::unique_lock l(m_PeersMutex); m_Peers.insert (std::make_pair (ident, Peer{ 0, nullptr, { session }, i2p::util::GetSecondsSinceEpoch (), {} })); } }); } - + void Transports::PeerDisconnected (std::shared_ptr session) { m_Service->post([session, this]() { - auto remoteIdentity = session->GetRemoteIdentity (); + auto remoteIdentity = session->GetRemoteIdentity (); if (!remoteIdentity) return; auto ident = remoteIdentity->GetIdentHash (); #ifdef WITH_EVENTS @@ -671,26 +714,26 @@ namespace transport { it->second.sessions.remove (session); if (it->second.sessions.empty ()) // TODO: why? - { + { if (it->second.delayedMessages.size () > 0) ConnectToPeer (ident, it->second); else { - std::unique_lock l(m_PeersMutex); + std::unique_lock l(m_PeersMutex); m_Peers.erase (it); } } } - }); - } + }); + } bool Transports::IsConnected (const i2p::data::IdentHash& ident) const - { - std::unique_lock l(m_PeersMutex); + { + std::unique_lock l(m_PeersMutex); auto it = m_Peers.find (ident); return it != m_Peers.end (); - } - + } + void Transports::HandlePeerCleanupTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) @@ -707,7 +750,7 @@ namespace transport profile->TunnelNonReplied(); profile->Save(it->first); } - std::unique_lock l(m_PeersMutex); + std::unique_lock l(m_PeersMutex); it = m_Peers.erase (it); } else @@ -718,7 +761,7 @@ namespace transport DetectExternalIP (); m_PeerCleanupTimer->expires_from_now (boost::posix_time::seconds(5*SESSION_CREATION_TIMEOUT)); m_PeerCleanupTimer->async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1)); - } + } } void Transports::HandlePeerTestTimer (const boost::system::error_code& ecode) @@ -728,15 +771,15 @@ namespace transport PeerTest (); m_PeerTestTimer->expires_from_now (boost::posix_time::minutes(PEER_TEST_INTERVAL)); m_PeerTestTimer->async_wait (std::bind (&Transports::HandlePeerTestTimer, this, std::placeholders::_1)); - } - } - + } + } + std::shared_ptr Transports::GetRandomPeer () const { if (m_Peers.empty ()) return nullptr; - std::unique_lock l(m_PeersMutex); + std::unique_lock l(m_PeersMutex); auto it = m_Peers.begin (); - std::advance (it, rand () % m_Peers.size ()); + std::advance (it, rand () % m_Peers.size ()); return it != m_Peers.end () ? it->second.router : nullptr; } void Transports::RestrictRoutesToFamilies(std::set families) @@ -754,7 +797,7 @@ namespace transport for (const auto & ri : routers ) m_TrustedRouters.push_back(ri); } - + bool Transports::RoutesRestricted() const { std::unique_lock famlock(m_FamilyMutex); std::unique_lock routerslock(m_TrustedRoutersMutex); @@ -814,4 +857,3 @@ namespace transport } } } - diff --git a/libi2pd/Transports.h b/libi2pd/Transports.h index b05f1eab..0a7a9310 100644 --- a/libi2pd/Transports.h +++ b/libi2pd/Transports.h @@ -45,7 +45,7 @@ namespace transport std::queue > m_Queue; bool m_IsRunning; - std::thread * m_Thread; + std::thread * m_Thread; std::condition_variable m_Acquired; std::mutex m_AcquiredMutex; }; @@ -62,12 +62,12 @@ namespace transport { for (auto& it: sessions) it->Done (); - } - }; - + } + }; + const size_t SESSION_CREATION_TIMEOUT = 10; // in seconds const int PEER_TEST_INTERVAL = 71; // in minutes - const int MAX_NUM_DELAYED_MESSAGES = 50; + const int MAX_NUM_DELAYED_MESSAGES = 50; class Transports { public: @@ -80,12 +80,12 @@ namespace transport bool IsBoundNTCP() const { return m_NTCPServer != nullptr; } bool IsBoundSSU() const { return m_SSUServer != nullptr; } - + bool IsOnline() const { return m_IsOnline; }; void SetOnline (bool online) { m_IsOnline = online; }; boost::asio::io_service& GetService () { return *m_Service; }; - std::shared_ptr GetNextDHKeysPair (); + std::shared_ptr GetNextDHKeysPair (); void ReuseDHKeysPair (std::shared_ptr pair); void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg); @@ -95,7 +95,7 @@ namespace transport void PeerConnected (std::shared_ptr session); void PeerDisconnected (std::shared_ptr session); bool IsConnected (const i2p::data::IdentHash& ident) const; - + void UpdateSentBytes (uint64_t numBytes) { m_TotalSentBytes += numBytes; }; void UpdateReceivedBytes (uint64_t numBytes) { m_TotalReceivedBytes += numBytes; }; uint64_t GetTotalSentBytes () const { return m_TotalSentBytes; }; @@ -113,16 +113,16 @@ namespace transport /** get a trusted first hop for restricted routes */ std::shared_ptr GetRestrictedPeer() const; /** do we want to use restricted routes? */ - bool RoutesRestricted() const; + bool RoutesRestricted() const; /** restrict routes to use only these router families for first hops */ void RestrictRoutesToFamilies(std::set families); /** restrict routes to use only these routers for first hops */ void RestrictRoutesToRouters(std::set routers); bool IsRestrictedPeer(const i2p::data::IdentHash & ident) const; - + void PeerTest (); - + private: void Run (); @@ -131,9 +131,9 @@ namespace transport void PostMessages (i2p::data::IdentHash ident, std::vector > msgs); void PostCloseSession (std::shared_ptr router); bool ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer); - void HandlePeerCleanupTimer (const boost::system::error_code& ecode); + void HandlePeerCleanupTimer (const boost::system::error_code& ecode); void HandlePeerTestTimer (const boost::system::error_code& ecode); - + void NTCPResolve (const std::string& addr, const i2p::data::IdentHash& ident); void HandleNTCPResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it, i2p::data::IdentHash ident, std::shared_ptr resolver); @@ -143,11 +143,11 @@ namespace transport void UpdateBandwidth (); void DetectExternalIP (); - + private: bool m_IsOnline, m_IsRunning; - std::thread * m_Thread; + std::thread * m_Thread; boost::asio::io_service * m_Service; boost::asio::io_service::work * m_Work; boost::asio::deadline_timer * m_PeerCleanupTimer, * m_PeerTestTimer; @@ -156,13 +156,13 @@ namespace transport SSUServer * m_SSUServer; mutable std::mutex m_PeersMutex; std::map m_Peers; - + DHKeysPairSupplier m_DHKeysPairSupplier; std::atomic m_TotalSentBytes, m_TotalReceivedBytes, m_TotalTransitTransmittedBytes; uint32_t m_InBandwidth, m_OutBandwidth, m_TransitBandwidth; // bytes per second - uint64_t m_LastInBandwidthUpdateBytes, m_LastOutBandwidthUpdateBytes, m_LastTransitBandwidthUpdateBytes; - uint64_t m_LastBandwidthUpdateTime; + uint64_t m_LastInBandwidthUpdateBytes, m_LastOutBandwidthUpdateBytes, m_LastTransitBandwidthUpdateBytes; + uint64_t m_LastBandwidthUpdateTime; /** which router families to trust for first hops */ std::vector m_TrustedFamilies; @@ -172,18 +172,18 @@ namespace transport std::vector m_TrustedRouters; mutable std::mutex m_TrustedRoutersMutex; - i2p::I2NPMessagesHandler m_LoopbackHandler; - + i2p::I2NPMessagesHandler m_LoopbackHandler; + public: // for HTTP only const NTCPServer * GetNTCPServer () const { return m_NTCPServer; }; const SSUServer * GetSSUServer () const { return m_SSUServer; }; const decltype(m_Peers)& GetPeers () const { return m_Peers; }; - }; + }; extern Transports transports; -} +} } #endif