From aa3723d2bdeb637554c4f7cb9376c5c0a1141a7c Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sun, 21 Aug 2016 15:02:17 -0400 Subject: [PATCH] udp tunnels --- ClientContext.cpp | 105 +++++++++++++++++++++++++++----- ClientContext.h | 20 ++++++- I2PTunnel.cpp | 148 ++++++++++++++++++++++++++++++++++++++++++++++ I2PTunnel.h | 78 +++++++++++++++++++++++- Identity.h | 2 + Tag.h | 5 ++ 6 files changed, 338 insertions(+), 20 deletions(-) diff --git a/ClientContext.cpp b/ClientContext.cpp index 2c917025..68115402 100644 --- a/ClientContext.cpp +++ b/ClientContext.cpp @@ -17,7 +17,8 @@ namespace client ClientContext::ClientContext (): m_SharedLocalDestination (nullptr), m_HttpProxy (nullptr), m_SocksProxy (nullptr), m_SamBridge (nullptr), - m_BOBCommandChannel (nullptr), m_I2CPServer (nullptr) + m_BOBCommandChannel (nullptr), m_I2CPServer (nullptr), + m_CleanupUDPTimer(m_Service, boost::posix_time::seconds(1)) { } @@ -39,6 +40,13 @@ namespace client m_SharedLocalDestination->Start (); } + if ( m_ServiceThread == nullptr ) { + m_ServiceThread = new std::thread([&] () { + m_Service.run(); + }); + ScheduleCleanupUDP(); + } + m_AddressBook.Start (); std::shared_ptr localDestination; @@ -195,11 +203,26 @@ namespace client } LogPrint(eLogInfo, "Clients: stopping AddressBook"); - m_AddressBook.Stop (); + m_AddressBook.Stop (); + + { + std::lock_guard lock(m_ForwardsMutex); + m_ServerForwards.clear(); + m_ClientForwards.clear(); + } + + for (auto& it: m_Destinations) it.second->Stop (); m_Destinations.clear (); - m_SharedLocalDestination = nullptr; + m_SharedLocalDestination = nullptr; + // stop io service thread + if(m_ServiceThread) { + m_Service.stop(); + m_ServiceThread->join(); + delete m_ServiceThread; + m_ServiceThread = nullptr; + } } void ClientContext::ReloadConfig () @@ -349,7 +372,7 @@ namespace client try { std::string type = section.second.get (I2P_TUNNELS_SECTION_TYPE); - if (type == I2P_TUNNELS_SECTION_TYPE_CLIENT) + if (type == I2P_TUNNELS_SECTION_TYPE_CLIENT || type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) { // mandatory params std::string dest = section.second.get (I2P_CLIENT_TUNNEL_DESTINATION); @@ -374,17 +397,34 @@ namespace client localDestination = CreateNewLocalDestination (k, false, &options); } } - auto clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort); - if (m_ClientTunnels.insert (std::make_pair (clientTunnel->GetAcceptor ().local_endpoint (), - std::unique_ptr(clientTunnel))).second) - { - clientTunnel->Start (); - numClientTunnels++; - } - else - LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientTunnel->GetAcceptor ().local_endpoint (), " already exists"); + if (type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) { + // udp client + // TODO: ip6 and hostnames + boost::asio::ip::udp::endpoint end(boost::asio::ip::address::from_string(address), port); + if(destinationPort == 0) { + destinationPort = port; + } + auto clientTunnel = new I2PUDPClientTunnel(name, dest, end, localDestination, destinationPort, m_Service); + if(m_ClientForwards.insert(std::make_pair(end, std::unique_ptr(clientTunnel))).second) { + clientTunnel->Start(); + } else { + LogPrint(eLogError, "Clients: I2P Client forward for endpoint ", end, " already exists"); + delete clientTunnel; + } + } else { + // tcp client + auto clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort); + if (m_ClientTunnels.insert (std::make_pair (clientTunnel->GetAcceptor ().local_endpoint (), + std::unique_ptr(clientTunnel))).second) + { + clientTunnel->Start (); + numClientTunnels++; + } + else + LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientTunnel->GetAcceptor ().local_endpoint (), " already exists"); + } } - else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER || type == I2P_TUNNELS_SECTION_TYPE_HTTP || type == I2P_TUNNELS_SECTION_TYPE_IRC) + else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER || type == I2P_TUNNELS_SECTION_TYPE_HTTP || type == I2P_TUNNELS_SECTION_TYPE_IRC || type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER) { // mandatory params std::string host = section.second.get (I2P_SERVER_TUNNEL_HOST); @@ -411,6 +451,25 @@ namespace client if (!localDestination) localDestination = CreateNewLocalDestination (k, true, &options); + if (type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER) { + // udp server tunnel + // TODO: ipv6 and hostnames + boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address::from_string(host), port); + I2PUDPServerTunnel * serverTunnel = new I2PUDPServerTunnel(name, localDestination, endpoint, port, m_Service); + std::lock_guard lock(m_ForwardsMutex); + if(m_ServerForwards.insert( + std::make_pair( + std::make_pair( + localDestination->GetIdentHash(), port), + std::unique_ptr(serverTunnel))).second) { + LogPrint(eLogInfo, "Cleints: I2P Server Forward created for UDP Endpoint ", host, ":", port); + } else { + LogPrint(eLogError, "Clients: I2P Server Forward for destination/port ", m_AddressBook.ToAddress(localDestination->GetIdentHash()), "/", port, "already exists"); + delete serverTunnel; + } + continue; + } + I2PServerTunnel * serverTunnel; if (type == I2P_TUNNELS_SECTION_TYPE_HTTP) serverTunnel = new I2PServerTunnelHTTP (name, host, port, localDestination, hostOverride, inPort, gzip); @@ -460,6 +519,22 @@ namespace client } LogPrint (eLogInfo, "Clients: ", numClientTunnels, " I2P client tunnels created"); LogPrint (eLogInfo, "Clients: ", numServerTunnels, " I2P server tunnels created"); - } + } + void ClientContext::ScheduleCleanupUDP() + { + // schedule cleanup in 1 second + m_CleanupUDPTimer.expires_at(m_CleanupUDPTimer.expires_at() + boost::posix_time::seconds(1)); + m_CleanupUDPTimer.async_wait(std::bind(&ClientContext::CleanupUDP, this, std::placeholders::_1)); + } + + void ClientContext::CleanupUDP(const boost::system::error_code & ecode) + { + if(!ecode) { + std::lock_guard lock(m_ForwardsMutex); + for ( auto & s : m_ServerForwards ) { + s.second->ExpireStale(); + } + } + } } } diff --git a/ClientContext.h b/ClientContext.h index 55176b91..5a054f29 100644 --- a/ClientContext.h +++ b/ClientContext.h @@ -24,6 +24,8 @@ namespace client const char I2P_TUNNELS_SECTION_TYPE_SERVER[] = "server"; const char I2P_TUNNELS_SECTION_TYPE_HTTP[] = "http"; const char I2P_TUNNELS_SECTION_TYPE_IRC[] = "irc"; + const char I2P_TUNNELS_SECTION_TYPE_UDPCLIENT[] = "udpclient"; + const char I2P_TUNNELS_SECTION_TYPE_UDPSERVER[] = "udpserver"; const char I2P_CLIENT_TUNNEL_PORT[] = "port"; const char I2P_CLIENT_TUNNEL_ADDRESS[] = "address"; const char I2P_CLIENT_TUNNEL_DESTINATION[] = "destination"; @@ -72,7 +74,10 @@ namespace client template void ReadI2CPOptions (const Section& section, std::map& options) const; - private: + void CleanupUDP(const boost::system::error_code & ecode); + void ScheduleCleanupUDP(); + + private: std::mutex m_DestinationsMutex; std::map > m_Destinations; @@ -84,10 +89,21 @@ namespace client i2p::proxy::SOCKSProxy * m_SocksProxy; std::map > m_ClientTunnels; // local endpoint->tunnel std::map, std::unique_ptr > m_ServerTunnels; // ->tunnel - SAMBridge * m_SamBridge; + + std::mutex m_ForwardsMutex; + + std::map > m_ClientForwards; // local endpoint -> udp tunnel + std::map, std::unique_ptr > m_ServerForwards; // -> udp tunnel + + SAMBridge * m_SamBridge; BOBCommandChannel * m_BOBCommandChannel; I2CPServer * m_I2CPServer; + boost::asio::io_service m_Service; + std::thread * m_ServiceThread; + + boost::asio::deadline_timer m_CleanupUDPTimer; + public: // for HTTP const decltype(m_Destinations)& GetDestinations () const { return m_Destinations; }; diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index 87764a84..e639c12a 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -511,5 +511,153 @@ namespace client conn->Connect (); } + void I2PUDPServerTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) + { + std::lock_guard lock(m_SessionsMutex); + auto & session = ObtainUDPSession(from, toPort, fromPort); + + session.IPSocket.send_to(boost::asio::buffer(buf, len), m_Endpoint); + session.LastActivity = i2p::util::GetMillisecondsSinceEpoch(); + + } + + void I2PUDPServerTunnel::ExpireStale(const uint64_t delta) { + std::lock_guard lock(m_SessionsMutex); + uint64_t now = i2p::util::GetMillisecondsSinceEpoch(); + std::remove_if(m_Sessions.begin(), m_Sessions.end(), [now, delta](const UDPSession & u) -> bool { + return now - u.LastActivity >= delta; + }); + } + + UDPSession & I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort) + { + auto ih = from.GetIdentHash(); + for ( UDPSession & s : m_Sessions ) { + if ( s.Identity == ih) { + /** found existing */ + return s; + } + } + /** create new */ + m_Sessions.push_back(UDPSession(m_Service, m_Destination, m_Endpoint, ih, localPort, remotePort)); + return m_Sessions.back(); + } + + UDPSession::UDPSession(boost::asio::io_service & ios, std::shared_ptr localDestination, boost::asio::ip::udp::endpoint endpoint, const i2p::data::IdentHash from, uint16_t ourPort, uint16_t theirPort) : + Destination(localDestination), + IPSocket(ios, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0)), + Identity(from), + ExpectedEndpoint(endpoint), + LocalPort(ourPort), + RemotePort(theirPort) + { + Receive(); + } + + + void UDPSession::Receive() { + IPSocket.async_receive_from(boost::asio::buffer(m_Buffer, I2P_UDP_MAX_MTU), FromEndpoint, std::bind(&UDPSession::HandleReceived, this, std::placeholders::_1, std::placeholders::_2)); + } + + void UDPSession::HandleReceived(const boost::system::error_code & ecode, std::size_t len) + { + if(!ecode) { + i2p::datagram::DatagramDestination * dgram = Destination->GetDatagramDestination(); + if(dgram) { + LastActivity = i2p::util::GetMillisecondsSinceEpoch(); + dgram->SendDatagramTo(m_Buffer, len, Identity, LocalPort, RemotePort); + } + Receive(); + } + } + + I2PUDPServerTunnel::I2PUDPServerTunnel(const std::string & name, std::shared_ptr localDestination, boost::asio::ip::udp::endpoint forwardTo, uint16_t port, boost::asio::io_service & service) : + LocalPort(port), + m_Endpoint(forwardTo), + m_Service(service), + m_Destination(localDestination) + { + i2p::datagram::DatagramDestination * dgram = m_Destination->CreateDatagramDestination(); + if(dgram) + dgram->SetReceiver(std::bind(&I2PUDPServerTunnel::HandleRecvFromI2P, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5), LocalPort); + } + + I2PUDPServerTunnel::~I2PUDPServerTunnel() + { + i2p::datagram::DatagramDestination * dgram = m_Destination->GetDatagramDestination(); + if (dgram) { + dgram->ResetReceiver(LocalPort); + } + } + + I2PUDPClientTunnel::I2PUDPClientTunnel(const std::string & name, const std::string &remoteDest, boost::asio::ip::udp::endpoint localEndpoint, std::shared_ptr localDestination, uint16_t remotePort, boost::asio::io_service & service) : + m_Session(nullptr), + m_RemoteDest(remoteDest), + m_RemoteIdent(nullptr), + m_LocalDest(localDestination), + m_LocalEndpoint(localEndpoint), + m_ResolveThread(nullptr), + m_Service(service), + LocalPort(localEndpoint.port()), + RemotePort(remotePort), + m_cancel_resolve(false) + { + + } + + + + void I2PUDPClientTunnel::Start() { + if (m_ResolveThread == nullptr) + m_ResolveThread = new std::thread(std::bind(&I2PUDPClientTunnel::TryResolving, this)); + } + + void I2PUDPClientTunnel::TryResolving() { + LogPrint(eLogInfo, "UDP Tunnel: Trying to resolve ", m_RemoteDest); + m_RemoteIdent = new i2p::data::IdentHash; + m_RemoteIdent->Fill(0); + while(!context.GetAddressBook().GetIdentHash(m_RemoteDest, *m_RemoteIdent) && !m_cancel_resolve) { + LogPrint(eLogWarning, "UDP Tunnel: failed to lookup ", m_RemoteDest); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + if(m_cancel_resolve) { + LogPrint(eLogError, "UDP Tunnel: lookup of ", m_RemoteDest, " was cancelled"); + return; + } + // delete existing session + if(m_Session) delete m_Session; + m_Session = new UDPSession(m_Service, m_LocalDest, m_LocalEndpoint, *m_RemoteIdent, LocalPort, RemotePort); + auto dgram = m_LocalDest->CreateDatagramDestination(); + dgram->SetReceiver(std::bind(&I2PUDPClientTunnel::HandleRecvFromI2P, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4, + std::placeholders::_5), LocalPort); + } + + void I2PUDPClientTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) + { + if(m_RemoteIdent && from.GetIdentHash() == *m_RemoteIdent) { + // address match + if(m_Session) { + // tell session + m_Session->IPSocket.send_to(boost::asio::buffer(buf, len), m_LocalEndpoint); + } + } + } + + I2PUDPClientTunnel::~I2PUDPClientTunnel() { + auto dgram = m_LocalDest->GetDatagramDestination(); + if (dgram) { + dgram->ResetReceiver(LocalPort); + } + if (m_Session) delete m_Session; + m_cancel_resolve = true; + if(m_ResolveThread) { + m_ResolveThread->join(); + delete m_ResolveThread; + m_ResolveThread = nullptr; + } + if (m_RemoteIdent) delete m_RemoteIdent; + } } } diff --git a/I2PTunnel.h b/I2PTunnel.h index f0332722..232ed2ef 100644 --- a/I2PTunnel.h +++ b/I2PTunnel.h @@ -9,6 +9,7 @@ #include #include "Identity.h" #include "Destination.h" +#include "Datagram.h" #include "Streaming.h" #include "I2PService.h" @@ -128,8 +129,79 @@ namespace client std::string m_Name, m_Destination; const i2p::data::IdentHash * m_DestinationIdentHash; int m_DestinationPort; - }; + }; + + /** 2 minute timeout for udp sessions */ + const uint64_t I2P_UDP_SESSION_TIMEOUT = 1000 * 60 * 2; + + /** max size for i2p udp */ + const size_t I2P_UDP_MAX_MTU = i2p::datagram::MAX_DATAGRAM_SIZE; + + struct UDPSession + { + std::shared_ptr Destination; + boost::asio::ip::udp::socket IPSocket; + i2p::data::IdentHash Identity; + boost::asio::ip::udp::endpoint ExpectedEndpoint; + boost::asio::ip::udp::endpoint FromEndpoint; + uint64_t LastActivity; + + uint16_t LocalPort; + uint16_t RemotePort; + + uint8_t m_Buffer[I2P_UDP_MAX_MTU]; + + UDPSession(boost::asio::io_service & ios, std::shared_ptr localDestination, boost::asio::ip::udp::endpoint remote, const i2p::data::IdentHash ident, uint16_t ourPort, uint16_t theirPort); + + void HandleReceived(const boost::system::error_code & ecode, std::size_t len); + void Receive(); + + }; + + /** server side udp tunnel, many i2p inbound to 1 ip outbound */ + class I2PUDPServerTunnel + { + public: + I2PUDPServerTunnel(const std::string & name, std::shared_ptr localDestination, boost::asio::ip::udp::endpoint forwardTo, uint16_t port, boost::asio::io_service & service); + ~I2PUDPServerTunnel(); + /** expire stale udp conversations */ + void ExpireStale(const uint64_t delta=I2P_UDP_SESSION_TIMEOUT); + + private: + void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); + UDPSession & ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort); + private: + const uint16_t LocalPort; + boost::asio::ip::udp::endpoint m_Endpoint; + std::mutex m_SessionsMutex; + std::vector m_Sessions; + boost::asio::io_service & m_Service; + std::shared_ptr m_Destination; + uint8_t m_Buffer[I2P_UDP_MAX_MTU]; + }; + + class I2PUDPClientTunnel + { + public: + I2PUDPClientTunnel(const std::string & name, const std::string &remoteDest, boost::asio::ip::udp::endpoint localEndpoint, std::shared_ptr localDestination, uint16_t remotePort, boost::asio::io_service & service); + ~I2PUDPClientTunnel(); + void Start(); + private: + void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); + void TryResolving(); + UDPSession * m_Session; + const std::string m_RemoteDest; + std::shared_ptr m_LocalDest; + i2p::data::IdentHash * m_RemoteIdent; + const boost::asio::ip::udp::endpoint m_LocalEndpoint; + std::thread * m_ResolveThread; + boost::asio::io_service & m_Service; + uint16_t LocalPort; + uint16_t RemotePort; + bool m_cancel_resolve; + }; + class I2PServerTunnel: public I2PService { public: @@ -150,7 +222,7 @@ namespace client const char* GetName() { return m_Name.c_str (); } void SetMaxConnsPerMinute(const uint32_t conns) { m_PortDestination->SetMaxConnsPerMinute(conns); } - + private: void HandleResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it, @@ -167,7 +239,7 @@ namespace client boost::asio::ip::tcp::endpoint m_Endpoint; std::shared_ptr m_PortDestination; std::set m_AccessList; - bool m_IsAccessList; + bool m_IsAccessList; }; class I2PServerTunnelHTTP: public I2PServerTunnel diff --git a/Identity.h b/Identity.h index d9fb7761..8f3e9e9d 100644 --- a/Identity.h +++ b/Identity.h @@ -92,6 +92,8 @@ namespace data CryptoKeyType GetCryptoKeyType () const; void DropVerifier () const; // to save memory + bool operator == (const IdentityEx & other) const { return GetIdentHash() == other.GetIdentHash(); } + private: void CreateVerifier () const; diff --git a/Tag.h b/Tag.h index b6f94de7..6e5638a4 100644 --- a/Tag.h +++ b/Tag.h @@ -50,6 +50,11 @@ namespace data { return true; } + /** fill with a value */ + void Fill(uint8_t c) { + memset(m_Buf, c, sz); + } + std::string ToBase64 () const { char str[sz*2];