diff --git a/libi2pd/Datagram.h b/libi2pd/Datagram.h index a55c8edf..8f3d5ceb 100644 --- a/libi2pd/Datagram.h +++ b/libi2pd/Datagram.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2021, The PurpleI2P Project +* Copyright (c) 2013-2022, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -15,6 +15,7 @@ #include #include #include "Base.h" +#include "Gzip.h" #include "Identity.h" #include "LeaseSet.h" #include "I2NPProtocol.h" diff --git a/libi2pd_client/ClientContext.h b/libi2pd_client/ClientContext.h index ded8ea75..4e969a6b 100644 --- a/libi2pd_client/ClientContext.h +++ b/libi2pd_client/ClientContext.h @@ -18,6 +18,7 @@ #include "HTTPProxy.h" #include "SOCKS.h" #include "I2PTunnel.h" +#include "UDPTunnel.h" #include "SAM.h" #include "BOB.h" #include "I2CP.h" diff --git a/libi2pd_client/I2PTunnel.cpp b/libi2pd_client/I2PTunnel.cpp index 12fa08e2..5558cd8d 100644 --- a/libi2pd_client/I2PTunnel.cpp +++ b/libi2pd_client/I2PTunnel.cpp @@ -72,7 +72,7 @@ namespace client Receive (); } - static boost::asio::ip::address GetLoopbackAddressFor(const i2p::data::IdentHash & addr) + boost::asio::ip::address GetLoopbackAddressFor(const i2p::data::IdentHash & addr) { boost::asio::ip::address_v4::bytes_type bytes; const uint8_t * ident = addr; @@ -874,370 +874,6 @@ namespace client { return std::make_shared (this, stream, GetEndpoint (), m_WebircPass, GetSSLCtx ()); } - - void I2PUDPServerTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) - { - if (!m_LastSession || m_LastSession->Identity.GetLL()[0] != from.GetIdentHash ().GetLL()[0] || fromPort != m_LastSession->RemotePort) - { - std::lock_guard lock(m_SessionsMutex); - m_LastSession = ObtainUDPSession(from, toPort, fromPort); - } - m_LastSession->IPSocket.send_to(boost::asio::buffer(buf, len), m_RemoteEndpoint); - m_LastSession->LastActivity = i2p::util::GetMillisecondsSinceEpoch(); - } - - void I2PUDPServerTunnel::HandleRecvFromI2PRaw (uint16_t, uint16_t, const uint8_t * buf, size_t len) - { - if (m_LastSession) - { - m_LastSession->IPSocket.send_to(boost::asio::buffer(buf, len), m_RemoteEndpoint); - m_LastSession->LastActivity = i2p::util::GetMillisecondsSinceEpoch(); - } - } - - void I2PUDPServerTunnel::ExpireStale(const uint64_t delta) - { - std::lock_guard lock(m_SessionsMutex); - uint64_t now = i2p::util::GetMillisecondsSinceEpoch(); - auto itr = m_Sessions.begin(); - while(itr != m_Sessions.end()) { - if(now - (*itr)->LastActivity >= delta ) - itr = m_Sessions.erase(itr); - else - ++itr; - } - } - - void I2PUDPClientTunnel::ExpireStale(const uint64_t delta) - { - std::lock_guard lock(m_SessionsMutex); - uint64_t now = i2p::util::GetMillisecondsSinceEpoch(); - std::vector removePorts; - for (const auto & s : m_Sessions) { - if (now - s.second->second >= delta) - removePorts.push_back(s.first); - } - for(auto port : removePorts) { - m_Sessions.erase(port); - } - } - - UDPSessionPtr I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort) - { - auto ih = from.GetIdentHash(); - for (auto & s : m_Sessions ) - { - if (s->Identity.GetLL()[0] == ih.GetLL()[0] && remotePort == s->RemotePort) - { - /** found existing session */ - LogPrint(eLogDebug, "UDPServer: Found session ", s->IPSocket.local_endpoint(), " ", ih.ToBase32()); - return s; - } - } - boost::asio::ip::address addr; - /** create new udp session */ - if(m_IsUniqueLocal && m_LocalAddress.is_loopback()) - { - auto ident = from.GetIdentHash(); - addr = GetLoopbackAddressFor(ident); - } - else - addr = m_LocalAddress; - boost::asio::ip::udp::endpoint ep(addr, 0); - m_Sessions.push_back(std::make_shared(ep, m_LocalDest, m_RemoteEndpoint, &ih, localPort, remotePort)); - auto & back = m_Sessions.back(); - return back; - } - - UDPSession::UDPSession(boost::asio::ip::udp::endpoint localEndpoint, - const std::shared_ptr & localDestination, - boost::asio::ip::udp::endpoint endpoint, const i2p::data::IdentHash * to, - uint16_t ourPort, uint16_t theirPort) : - m_Destination(localDestination->GetDatagramDestination()), - IPSocket(localDestination->GetService(), localEndpoint), - SendEndpoint(endpoint), - LastActivity(i2p::util::GetMillisecondsSinceEpoch()), - LocalPort(ourPort), - RemotePort(theirPort) - { - IPSocket.set_option (boost::asio::socket_base::receive_buffer_size (I2P_UDP_MAX_MTU )); - memcpy(Identity, to->data(), 32); - Receive(); - } - - void UDPSession::Receive() - { - LogPrint(eLogDebug, "UDPSession: Receive"); - IPSocket.async_receive_from(boost::asio::buffer(m_Buffer, I2P_UDP_MAX_MTU), - FromEndpoint, std::bind(&UDPSession::HandleReceived, this, std::placeholders::_1, std::placeholders::_2)); - } - - void UDPSession::HandleReceived(const boost::system::error_code & ecode, std::size_t len) - { - if(!ecode) - { - LogPrint(eLogDebug, "UDPSession: Forward ", len, "B from ", FromEndpoint); - auto ts = i2p::util::GetMillisecondsSinceEpoch(); - auto session = m_Destination->GetSession (Identity); - if (ts > LastActivity + I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL) - m_Destination->SendDatagram(session, m_Buffer, len, LocalPort, RemotePort); - else - m_Destination->SendRawDatagram(session, m_Buffer, len, LocalPort, RemotePort); - size_t numPackets = 0; - while (numPackets < i2p::datagram::DATAGRAM_SEND_QUEUE_MAX_SIZE) - { - boost::system::error_code ec; - size_t moreBytes = IPSocket.available(ec); - if (ec || !moreBytes) break; - len = IPSocket.receive_from (boost::asio::buffer (m_Buffer, I2P_UDP_MAX_MTU), FromEndpoint, 0, ec); - m_Destination->SendRawDatagram (session, m_Buffer, len, LocalPort, RemotePort); - numPackets++; - } - if (numPackets > 0) - LogPrint(eLogDebug, "UDPSession: Forward more ", numPackets, "packets B from ", FromEndpoint); - m_Destination->FlushSendQueue (session); - LastActivity = ts; - Receive(); - } - else - LogPrint(eLogError, "UDPSession: ", ecode.message()); - } - - I2PUDPServerTunnel::I2PUDPServerTunnel (const std::string & name, std::shared_ptr localDestination, - boost::asio::ip::address localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port, bool gzip) : - m_IsUniqueLocal (true), m_Name (name), m_LocalAddress (localAddress), - m_RemoteEndpoint (forwardTo), m_LocalDest (localDestination), m_Gzip (gzip) - { - } - - I2PUDPServerTunnel::~I2PUDPServerTunnel () - { - Stop (); - } - - void I2PUDPServerTunnel::Start () - { - m_LocalDest->Start (); - - auto dgram = m_LocalDest->CreateDatagramDestination (m_Gzip); - dgram->SetReceiver (std::bind (&I2PUDPServerTunnel::HandleRecvFromI2P, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); - dgram->SetRawReceiver (std::bind (&I2PUDPServerTunnel::HandleRecvFromI2PRaw, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); - } - - void I2PUDPServerTunnel::Stop () - { - auto dgram = m_LocalDest->GetDatagramDestination (); - if (dgram) dgram->ResetReceiver (); - } - - std::vector > I2PUDPServerTunnel::GetSessions () - { - std::vector > sessions; - std::lock_guard lock (m_SessionsMutex); - - for (UDPSessionPtr s: m_Sessions) - { - if (!s->m_Destination) continue; - auto info = s->m_Destination->GetInfoForRemote (s->Identity); - if (!info) continue; - - auto sinfo = std::make_shared (); - sinfo->Name = m_Name; - sinfo->LocalIdent = std::make_shared (m_LocalDest->GetIdentHash ().data ()); - sinfo->RemoteIdent = std::make_shared (s->Identity.data ()); - sinfo->CurrentIBGW = info->IBGW; - sinfo->CurrentOBEP = info->OBEP; - sessions.push_back (sinfo); - } - return sessions; - } - - I2PUDPClientTunnel::I2PUDPClientTunnel (const std::string & name, const std::string &remoteDest, - boost::asio::ip::udp::endpoint localEndpoint, - std::shared_ptr localDestination, - uint16_t remotePort, bool gzip) : - m_Name (name), m_RemoteDest (remoteDest), m_LocalDest (localDestination), m_LocalEndpoint (localEndpoint), - m_RemoteIdent (nullptr), m_ResolveThread (nullptr), m_LocalSocket (nullptr), RemotePort (remotePort), - m_LastPort (0), m_cancel_resolve (false), m_Gzip (gzip) - { - } - - I2PUDPClientTunnel::~I2PUDPClientTunnel () - { - Stop (); - } - - void I2PUDPClientTunnel::Start () - { - // Reset flag in case of tunnel reload - if (m_cancel_resolve) m_cancel_resolve = false; - - m_LocalSocket.reset (new boost::asio::ip::udp::socket (m_LocalDest->GetService (), m_LocalEndpoint)); - m_LocalSocket->set_option (boost::asio::socket_base::receive_buffer_size (I2P_UDP_MAX_MTU)); - m_LocalSocket->set_option (boost::asio::socket_base::reuse_address (true)); - - auto dgram = m_LocalDest->CreateDatagramDestination (m_Gzip); - dgram->SetReceiver (std::bind (&I2PUDPClientTunnel::HandleRecvFromI2P, this, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3, std::placeholders::_4, - std::placeholders::_5)); - dgram->SetRawReceiver (std::bind (&I2PUDPClientTunnel::HandleRecvFromI2PRaw, this, - std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); - - m_LocalDest->Start (); - if (m_ResolveThread == nullptr) - m_ResolveThread = new std::thread (std::bind (&I2PUDPClientTunnel::TryResolving, this)); - RecvFromLocal (); - } - - void I2PUDPClientTunnel::Stop () - { - auto dgram = m_LocalDest->GetDatagramDestination (); - if (dgram) dgram->ResetReceiver (); - m_cancel_resolve = true; - - m_Sessions.clear(); - - if(m_LocalSocket && m_LocalSocket->is_open ()) - m_LocalSocket->close (); - - if(m_ResolveThread) - { - m_ResolveThread->join (); - delete m_ResolveThread; - m_ResolveThread = nullptr; - } - if (m_RemoteIdent) - { - delete m_RemoteIdent; - m_RemoteIdent = nullptr; - } - } - - void I2PUDPClientTunnel::RecvFromLocal () - { - m_LocalSocket->async_receive_from (boost::asio::buffer (m_RecvBuff, I2P_UDP_MAX_MTU), - m_RecvEndpoint, std::bind (&I2PUDPClientTunnel::HandleRecvFromLocal, this, std::placeholders::_1, std::placeholders::_2)); - } - - void I2PUDPClientTunnel::HandleRecvFromLocal (const boost::system::error_code & ec, std::size_t transferred) - { - if (m_cancel_resolve) { - LogPrint (eLogDebug, "UDP Client: Ignoring incomming data: stopping"); - return; - } - if (ec) { - LogPrint (eLogError, "UDP Client: Reading from socket error: ", ec.message (), ". Restarting listener..."); - RecvFromLocal (); // Restart listener and continue work - return; - } - if (!m_RemoteIdent) { - LogPrint (eLogWarning, "UDP Client: Remote endpoint not resolved yet"); - RecvFromLocal (); - return; // drop, remote not resolved - } - auto remotePort = m_RecvEndpoint.port (); - if (!m_LastPort || m_LastPort != remotePort) - { - auto itr = m_Sessions.find (remotePort); - if (itr != m_Sessions.end ()) - m_LastSession = itr->second; - else - { - m_LastSession = std::make_shared (boost::asio::ip::udp::endpoint (m_RecvEndpoint), 0); - m_Sessions.emplace (remotePort, m_LastSession); - } - m_LastPort = remotePort; - } - // send off to remote i2p destination - auto ts = i2p::util::GetMillisecondsSinceEpoch (); - LogPrint (eLogDebug, "UDP Client: Send ", transferred, " to ", m_RemoteIdent->ToBase32 (), ":", RemotePort); - auto session = m_LocalDest->GetDatagramDestination ()->GetSession (*m_RemoteIdent); - if (ts > m_LastSession->second + I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL) - m_LocalDest->GetDatagramDestination ()->SendDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort); - else - m_LocalDest->GetDatagramDestination ()->SendRawDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort); - size_t numPackets = 0; - while (numPackets < i2p::datagram::DATAGRAM_SEND_QUEUE_MAX_SIZE) - { - boost::system::error_code ec; - size_t moreBytes = m_LocalSocket->available (ec); - if (ec || !moreBytes) break; - transferred = m_LocalSocket->receive_from (boost::asio::buffer (m_RecvBuff, I2P_UDP_MAX_MTU), m_RecvEndpoint, 0, ec); - remotePort = m_RecvEndpoint.port (); - // TODO: check remotePort - m_LocalDest->GetDatagramDestination ()->SendRawDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort); - numPackets++; - } - if (numPackets) - LogPrint (eLogDebug, "UDP Client: Sent ", numPackets, " more packets to ", m_RemoteIdent->ToBase32 ()); - m_LocalDest->GetDatagramDestination ()->FlushSendQueue (session); - - // mark convo as active - if (m_LastSession) - m_LastSession->second = ts; - RecvFromLocal (); - } - - std::vector > I2PUDPClientTunnel::GetSessions () - { - // TODO: implement - std::vector > infos; - return infos; - } - - void I2PUDPClientTunnel::TryResolving () - { - i2p::util::SetThreadName ("UDP Resolver"); - LogPrint (eLogInfo, "UDP Tunnel: Trying to resolve ", m_RemoteDest); - - std::shared_ptr addr; - while (!(addr = context.GetAddressBook().GetAddress(m_RemoteDest)) && !m_cancel_resolve) - { - LogPrint (eLogWarning, "UDP Tunnel: Failed to lookup ", m_RemoteDest); - std::this_thread::sleep_for (std::chrono::seconds (1)); - } - if (m_cancel_resolve) - { - LogPrint(eLogError, "UDP Tunnel: Lookup of ", m_RemoteDest, " was cancelled"); - return; - } - if (!addr || !addr->IsIdentHash ()) - { - LogPrint (eLogError, "UDP Tunnel: ", m_RemoteDest, " not found"); - return; - } - m_RemoteIdent = new i2p::data::IdentHash; - *m_RemoteIdent = addr->identHash; - LogPrint(eLogInfo, "UDP Tunnel: Resolved ", m_RemoteDest, " to ", m_RemoteIdent->ToBase32 ()); - } - - void I2PUDPClientTunnel::HandleRecvFromI2P (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) - { - if (m_RemoteIdent && from.GetIdentHash() == *m_RemoteIdent) - HandleRecvFromI2PRaw (fromPort, toPort, buf, len); - else - LogPrint(eLogWarning, "UDP Client: Unwarranted traffic from ", from.GetIdentHash().ToBase32 ()); - } - - void I2PUDPClientTunnel::HandleRecvFromI2PRaw (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) - { - auto itr = m_Sessions.find (toPort); - // found convo ? - if (itr != m_Sessions.end ()) - { - // found convo - if (len > 0) - { - LogPrint (eLogDebug, "UDP Client: Got ", len, "B from ", m_RemoteIdent ? m_RemoteIdent->ToBase32 () : ""); - m_LocalSocket->send_to (boost::asio::buffer (buf, len), itr->second->first); - // mark convo as active - itr->second->second = i2p::util::GetMillisecondsSinceEpoch (); - } - } - else - LogPrint (eLogWarning, "UDP Client: Not tracking udp session using port ", (int) toPort); - } } } diff --git a/libi2pd_client/I2PTunnel.h b/libi2pd_client/I2PTunnel.h index 0da11a46..4c7b2002 100644 --- a/libi2pd_client/I2PTunnel.h +++ b/libi2pd_client/I2PTunnel.h @@ -19,7 +19,6 @@ #include #include "Identity.h" #include "Destination.h" -#include "Datagram.h" #include "Streaming.h" #include "I2PService.h" #include "AddressBook.h" @@ -180,162 +179,6 @@ namespace client std::unique_ptr m_KeepAliveTimer; }; - - /** 2 minute timeout for udp sessions */ - const uint64_t I2P_UDP_SESSION_TIMEOUT = 1000 * 60 * 2; - const uint64_t I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL = 100; // in milliseconds - - /** max size for i2p udp */ - const size_t I2P_UDP_MAX_MTU = 64*1024; - - struct UDPSession - { - i2p::datagram::DatagramDestination * m_Destination; - boost::asio::ip::udp::socket IPSocket; - i2p::data::IdentHash Identity; - boost::asio::ip::udp::endpoint FromEndpoint; - boost::asio::ip::udp::endpoint SendEndpoint; - uint64_t LastActivity; - - uint16_t LocalPort; - uint16_t RemotePort; - - uint8_t m_Buffer[I2P_UDP_MAX_MTU]; - - UDPSession(boost::asio::ip::udp::endpoint localEndpoint, - const std::shared_ptr & localDestination, - boost::asio::ip::udp::endpoint remote, const i2p::data::IdentHash * ident, - uint16_t ourPort, uint16_t theirPort); - void HandleReceived(const boost::system::error_code & ecode, std::size_t len); - void Receive(); - }; - - - /** read only info about a datagram session */ - struct DatagramSessionInfo - { - /** the name of this forward */ - std::string Name; - /** ident hash of local destination */ - std::shared_ptr LocalIdent; - /** ident hash of remote destination */ - std::shared_ptr RemoteIdent; - /** ident hash of IBGW in use currently in this session or nullptr if none is set */ - std::shared_ptr CurrentIBGW; - /** ident hash of OBEP in use for this session or nullptr if none is set */ - std::shared_ptr CurrentOBEP; - /** i2p router's udp endpoint */ - boost::asio::ip::udp::endpoint LocalEndpoint; - /** client's udp endpoint */ - boost::asio::ip::udp::endpoint RemoteEndpoint; - /** how long has this converstation been idle in ms */ - uint64_t idle; - }; - - typedef std::shared_ptr UDPSessionPtr; - - /** server side udp tunnel, many i2p inbound to 1 ip outbound */ - class I2PUDPServerTunnel - { - public: - - I2PUDPServerTunnel (const std::string & name, - std::shared_ptr localDestination, - boost::asio::ip::address localAddress, - boost::asio::ip::udp::endpoint forwardTo, uint16_t port, bool gzip); - ~I2PUDPServerTunnel (); - - /** expire stale udp conversations */ - void ExpireStale (const uint64_t delta=I2P_UDP_SESSION_TIMEOUT); - void Start (); - void Stop (); - const char * GetName () const { return m_Name.c_str(); } - std::vector > GetSessions (); - std::shared_ptr GetLocalDestination () const { return m_LocalDest; } - - void SetUniqueLocal (bool isUniqueLocal = true) { m_IsUniqueLocal = isUniqueLocal; } - - private: - - void HandleRecvFromI2P (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); - void HandleRecvFromI2PRaw (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); - UDPSessionPtr ObtainUDPSession (const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort); - - private: - - bool m_IsUniqueLocal; - const std::string m_Name; - boost::asio::ip::address m_LocalAddress; - boost::asio::ip::udp::endpoint m_RemoteEndpoint; - std::mutex m_SessionsMutex; - std::vector m_Sessions; - std::shared_ptr m_LocalDest; - UDPSessionPtr m_LastSession; - bool m_Gzip; - - public: - - bool isUpdated; // transient, used during reload only - }; - - class I2PUDPClientTunnel - { - public: - - I2PUDPClientTunnel (const std::string & name, const std::string &remoteDest, - boost::asio::ip::udp::endpoint localEndpoint, std::shared_ptr localDestination, - uint16_t remotePort, bool gzip); - ~I2PUDPClientTunnel (); - - void Start (); - void Stop (); - const char * GetName () const { return m_Name.c_str(); } - std::vector > GetSessions (); - - bool IsLocalDestination (const i2p::data::IdentHash & destination) const { return destination == m_LocalDest->GetIdentHash(); } - - std::shared_ptr GetLocalDestination () const { return m_LocalDest; } - inline void SetLocalDestination (std::shared_ptr dest) - { - if (m_LocalDest) m_LocalDest->Release (); - if (dest) dest->Acquire (); - m_LocalDest = dest; - } - - void ExpireStale (const uint64_t delta=I2P_UDP_SESSION_TIMEOUT); - - private: - - typedef std::pair UDPConvo; - void RecvFromLocal (); - void HandleRecvFromLocal (const boost::system::error_code & e, std::size_t transferred); - void HandleRecvFromI2P (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); - void HandleRecvFromI2PRaw (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); - void TryResolving (); - - private: - - const std::string m_Name; - std::mutex m_SessionsMutex; - std::unordered_map > m_Sessions; // maps i2p port -> local udp convo - const std::string m_RemoteDest; - std::shared_ptr m_LocalDest; - const boost::asio::ip::udp::endpoint m_LocalEndpoint; - i2p::data::IdentHash * m_RemoteIdent; - std::thread * m_ResolveThread; - std::unique_ptr m_LocalSocket; - boost::asio::ip::udp::endpoint m_RecvEndpoint; - uint8_t m_RecvBuff[I2P_UDP_MAX_MTU]; - uint16_t RemotePort, m_LastPort; - bool m_cancel_resolve; - bool m_Gzip; - std::shared_ptr m_LastSession; - - public: - - bool isUpdated; // transient, used during reload only - }; - class I2PServerTunnel: public I2PService { public: @@ -418,6 +261,8 @@ namespace client std::string m_WebircPass; }; + + boost::asio::ip::address GetLoopbackAddressFor(const i2p::data::IdentHash & addr); } } diff --git a/libi2pd_client/UDPTunnel.cpp b/libi2pd_client/UDPTunnel.cpp new file mode 100644 index 00000000..a2c9061c --- /dev/null +++ b/libi2pd_client/UDPTunnel.cpp @@ -0,0 +1,384 @@ +/* +* Copyright (c) 2013-2022, The PurpleI2P Project +* +* This file is part of Purple i2pd project and licensed under BSD3 +* +* See full license text in LICENSE file at top of project tree +*/ + +#include "Log.h" +#include "util.h" +#include "ClientContext.h" +#include "I2PTunnel.h" // for GetLoopbackAddressFor +#include "UDPTunnel.h" + +namespace i2p +{ +namespace client +{ + void I2PUDPServerTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) + { + if (!m_LastSession || m_LastSession->Identity.GetLL()[0] != from.GetIdentHash ().GetLL()[0] || fromPort != m_LastSession->RemotePort) + { + std::lock_guard lock(m_SessionsMutex); + m_LastSession = ObtainUDPSession(from, toPort, fromPort); + } + m_LastSession->IPSocket.send_to(boost::asio::buffer(buf, len), m_RemoteEndpoint); + m_LastSession->LastActivity = i2p::util::GetMillisecondsSinceEpoch(); + } + + void I2PUDPServerTunnel::HandleRecvFromI2PRaw (uint16_t, uint16_t, const uint8_t * buf, size_t len) + { + if (m_LastSession) + { + m_LastSession->IPSocket.send_to(boost::asio::buffer(buf, len), m_RemoteEndpoint); + m_LastSession->LastActivity = i2p::util::GetMillisecondsSinceEpoch(); + } + } + + void I2PUDPServerTunnel::ExpireStale(const uint64_t delta) + { + std::lock_guard lock(m_SessionsMutex); + uint64_t now = i2p::util::GetMillisecondsSinceEpoch(); + auto itr = m_Sessions.begin(); + while(itr != m_Sessions.end()) { + if(now - (*itr)->LastActivity >= delta ) + itr = m_Sessions.erase(itr); + else + ++itr; + } + } + + void I2PUDPClientTunnel::ExpireStale(const uint64_t delta) + { + std::lock_guard lock(m_SessionsMutex); + uint64_t now = i2p::util::GetMillisecondsSinceEpoch(); + std::vector removePorts; + for (const auto & s : m_Sessions) { + if (now - s.second->second >= delta) + removePorts.push_back(s.first); + } + for(auto port : removePorts) { + m_Sessions.erase(port); + } + } + + UDPSessionPtr I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort) + { + auto ih = from.GetIdentHash(); + for (auto & s : m_Sessions ) + { + if (s->Identity.GetLL()[0] == ih.GetLL()[0] && remotePort == s->RemotePort) + { + /** found existing session */ + LogPrint(eLogDebug, "UDPServer: Found session ", s->IPSocket.local_endpoint(), " ", ih.ToBase32()); + return s; + } + } + boost::asio::ip::address addr; + /** create new udp session */ + if(m_IsUniqueLocal && m_LocalAddress.is_loopback()) + { + auto ident = from.GetIdentHash(); + addr = GetLoopbackAddressFor(ident); + } + else + addr = m_LocalAddress; + boost::asio::ip::udp::endpoint ep(addr, 0); + m_Sessions.push_back(std::make_shared(ep, m_LocalDest, m_RemoteEndpoint, &ih, localPort, remotePort)); + auto & back = m_Sessions.back(); + return back; + } + + UDPSession::UDPSession(boost::asio::ip::udp::endpoint localEndpoint, + const std::shared_ptr & localDestination, + boost::asio::ip::udp::endpoint endpoint, const i2p::data::IdentHash * to, + uint16_t ourPort, uint16_t theirPort) : + m_Destination(localDestination->GetDatagramDestination()), + IPSocket(localDestination->GetService(), localEndpoint), + SendEndpoint(endpoint), + LastActivity(i2p::util::GetMillisecondsSinceEpoch()), + LocalPort(ourPort), + RemotePort(theirPort) + { + IPSocket.set_option (boost::asio::socket_base::receive_buffer_size (I2P_UDP_MAX_MTU )); + memcpy(Identity, to->data(), 32); + Receive(); + } + + void UDPSession::Receive() + { + LogPrint(eLogDebug, "UDPSession: Receive"); + IPSocket.async_receive_from(boost::asio::buffer(m_Buffer, I2P_UDP_MAX_MTU), + FromEndpoint, std::bind(&UDPSession::HandleReceived, this, std::placeholders::_1, std::placeholders::_2)); + } + + void UDPSession::HandleReceived(const boost::system::error_code & ecode, std::size_t len) + { + if(!ecode) + { + LogPrint(eLogDebug, "UDPSession: Forward ", len, "B from ", FromEndpoint); + auto ts = i2p::util::GetMillisecondsSinceEpoch(); + auto session = m_Destination->GetSession (Identity); + if (ts > LastActivity + I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL) + m_Destination->SendDatagram(session, m_Buffer, len, LocalPort, RemotePort); + else + m_Destination->SendRawDatagram(session, m_Buffer, len, LocalPort, RemotePort); + size_t numPackets = 0; + while (numPackets < i2p::datagram::DATAGRAM_SEND_QUEUE_MAX_SIZE) + { + boost::system::error_code ec; + size_t moreBytes = IPSocket.available(ec); + if (ec || !moreBytes) break; + len = IPSocket.receive_from (boost::asio::buffer (m_Buffer, I2P_UDP_MAX_MTU), FromEndpoint, 0, ec); + m_Destination->SendRawDatagram (session, m_Buffer, len, LocalPort, RemotePort); + numPackets++; + } + if (numPackets > 0) + LogPrint(eLogDebug, "UDPSession: Forward more ", numPackets, "packets B from ", FromEndpoint); + m_Destination->FlushSendQueue (session); + LastActivity = ts; + Receive(); + } + else + LogPrint(eLogError, "UDPSession: ", ecode.message()); + } + + I2PUDPServerTunnel::I2PUDPServerTunnel (const std::string & name, std::shared_ptr localDestination, + boost::asio::ip::address localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port, bool gzip) : + m_IsUniqueLocal (true), m_Name (name), m_LocalAddress (localAddress), + m_RemoteEndpoint (forwardTo), m_LocalDest (localDestination), m_Gzip (gzip) + { + } + + I2PUDPServerTunnel::~I2PUDPServerTunnel () + { + Stop (); + } + + void I2PUDPServerTunnel::Start () + { + m_LocalDest->Start (); + + auto dgram = m_LocalDest->CreateDatagramDestination (m_Gzip); + dgram->SetReceiver (std::bind (&I2PUDPServerTunnel::HandleRecvFromI2P, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); + dgram->SetRawReceiver (std::bind (&I2PUDPServerTunnel::HandleRecvFromI2PRaw, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); + } + + void I2PUDPServerTunnel::Stop () + { + auto dgram = m_LocalDest->GetDatagramDestination (); + if (dgram) dgram->ResetReceiver (); + } + + std::vector > I2PUDPServerTunnel::GetSessions () + { + std::vector > sessions; + std::lock_guard lock (m_SessionsMutex); + + for (UDPSessionPtr s: m_Sessions) + { + if (!s->m_Destination) continue; + auto info = s->m_Destination->GetInfoForRemote (s->Identity); + if (!info) continue; + + auto sinfo = std::make_shared (); + sinfo->Name = m_Name; + sinfo->LocalIdent = std::make_shared (m_LocalDest->GetIdentHash ().data ()); + sinfo->RemoteIdent = std::make_shared (s->Identity.data ()); + sinfo->CurrentIBGW = info->IBGW; + sinfo->CurrentOBEP = info->OBEP; + sessions.push_back (sinfo); + } + return sessions; + } + + I2PUDPClientTunnel::I2PUDPClientTunnel (const std::string & name, const std::string &remoteDest, + boost::asio::ip::udp::endpoint localEndpoint, + std::shared_ptr localDestination, + uint16_t remotePort, bool gzip) : + m_Name (name), m_RemoteDest (remoteDest), m_LocalDest (localDestination), m_LocalEndpoint (localEndpoint), + m_RemoteIdent (nullptr), m_ResolveThread (nullptr), m_LocalSocket (nullptr), RemotePort (remotePort), + m_LastPort (0), m_cancel_resolve (false), m_Gzip (gzip) + { + } + + I2PUDPClientTunnel::~I2PUDPClientTunnel () + { + Stop (); + } + + void I2PUDPClientTunnel::Start () + { + // Reset flag in case of tunnel reload + if (m_cancel_resolve) m_cancel_resolve = false; + + m_LocalSocket.reset (new boost::asio::ip::udp::socket (m_LocalDest->GetService (), m_LocalEndpoint)); + m_LocalSocket->set_option (boost::asio::socket_base::receive_buffer_size (I2P_UDP_MAX_MTU)); + m_LocalSocket->set_option (boost::asio::socket_base::reuse_address (true)); + + auto dgram = m_LocalDest->CreateDatagramDestination (m_Gzip); + dgram->SetReceiver (std::bind (&I2PUDPClientTunnel::HandleRecvFromI2P, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4, + std::placeholders::_5)); + dgram->SetRawReceiver (std::bind (&I2PUDPClientTunnel::HandleRecvFromI2PRaw, this, + std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); + + m_LocalDest->Start (); + if (m_ResolveThread == nullptr) + m_ResolveThread = new std::thread (std::bind (&I2PUDPClientTunnel::TryResolving, this)); + RecvFromLocal (); + } + + void I2PUDPClientTunnel::Stop () + { + auto dgram = m_LocalDest->GetDatagramDestination (); + if (dgram) dgram->ResetReceiver (); + m_cancel_resolve = true; + + m_Sessions.clear(); + + if(m_LocalSocket && m_LocalSocket->is_open ()) + m_LocalSocket->close (); + + if(m_ResolveThread) + { + m_ResolveThread->join (); + delete m_ResolveThread; + m_ResolveThread = nullptr; + } + if (m_RemoteIdent) + { + delete m_RemoteIdent; + m_RemoteIdent = nullptr; + } + } + + void I2PUDPClientTunnel::RecvFromLocal () + { + m_LocalSocket->async_receive_from (boost::asio::buffer (m_RecvBuff, I2P_UDP_MAX_MTU), + m_RecvEndpoint, std::bind (&I2PUDPClientTunnel::HandleRecvFromLocal, this, std::placeholders::_1, std::placeholders::_2)); + } + + void I2PUDPClientTunnel::HandleRecvFromLocal (const boost::system::error_code & ec, std::size_t transferred) + { + if (m_cancel_resolve) { + LogPrint (eLogDebug, "UDP Client: Ignoring incomming data: stopping"); + return; + } + if (ec) { + LogPrint (eLogError, "UDP Client: Reading from socket error: ", ec.message (), ". Restarting listener..."); + RecvFromLocal (); // Restart listener and continue work + return; + } + if (!m_RemoteIdent) { + LogPrint (eLogWarning, "UDP Client: Remote endpoint not resolved yet"); + RecvFromLocal (); + return; // drop, remote not resolved + } + auto remotePort = m_RecvEndpoint.port (); + if (!m_LastPort || m_LastPort != remotePort) + { + auto itr = m_Sessions.find (remotePort); + if (itr != m_Sessions.end ()) + m_LastSession = itr->second; + else + { + m_LastSession = std::make_shared (boost::asio::ip::udp::endpoint (m_RecvEndpoint), 0); + m_Sessions.emplace (remotePort, m_LastSession); + } + m_LastPort = remotePort; + } + // send off to remote i2p destination + auto ts = i2p::util::GetMillisecondsSinceEpoch (); + LogPrint (eLogDebug, "UDP Client: Send ", transferred, " to ", m_RemoteIdent->ToBase32 (), ":", RemotePort); + auto session = m_LocalDest->GetDatagramDestination ()->GetSession (*m_RemoteIdent); + if (ts > m_LastSession->second + I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL) + m_LocalDest->GetDatagramDestination ()->SendDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort); + else + m_LocalDest->GetDatagramDestination ()->SendRawDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort); + size_t numPackets = 0; + while (numPackets < i2p::datagram::DATAGRAM_SEND_QUEUE_MAX_SIZE) + { + boost::system::error_code ec; + size_t moreBytes = m_LocalSocket->available (ec); + if (ec || !moreBytes) break; + transferred = m_LocalSocket->receive_from (boost::asio::buffer (m_RecvBuff, I2P_UDP_MAX_MTU), m_RecvEndpoint, 0, ec); + remotePort = m_RecvEndpoint.port (); + // TODO: check remotePort + m_LocalDest->GetDatagramDestination ()->SendRawDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort); + numPackets++; + } + if (numPackets) + LogPrint (eLogDebug, "UDP Client: Sent ", numPackets, " more packets to ", m_RemoteIdent->ToBase32 ()); + m_LocalDest->GetDatagramDestination ()->FlushSendQueue (session); + + // mark convo as active + if (m_LastSession) + m_LastSession->second = ts; + RecvFromLocal (); + } + + std::vector > I2PUDPClientTunnel::GetSessions () + { + // TODO: implement + std::vector > infos; + return infos; + } + + void I2PUDPClientTunnel::TryResolving () + { + i2p::util::SetThreadName ("UDP Resolver"); + LogPrint (eLogInfo, "UDP Tunnel: Trying to resolve ", m_RemoteDest); + + std::shared_ptr addr; + while (!(addr = context.GetAddressBook().GetAddress(m_RemoteDest)) && !m_cancel_resolve) + { + LogPrint (eLogWarning, "UDP Tunnel: Failed to lookup ", m_RemoteDest); + std::this_thread::sleep_for (std::chrono::seconds (1)); + } + if (m_cancel_resolve) + { + LogPrint(eLogError, "UDP Tunnel: Lookup of ", m_RemoteDest, " was cancelled"); + return; + } + if (!addr || !addr->IsIdentHash ()) + { + LogPrint (eLogError, "UDP Tunnel: ", m_RemoteDest, " not found"); + return; + } + m_RemoteIdent = new i2p::data::IdentHash; + *m_RemoteIdent = addr->identHash; + LogPrint(eLogInfo, "UDP Tunnel: Resolved ", m_RemoteDest, " to ", m_RemoteIdent->ToBase32 ()); + } + + void I2PUDPClientTunnel::HandleRecvFromI2P (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) + { + if (m_RemoteIdent && from.GetIdentHash() == *m_RemoteIdent) + HandleRecvFromI2PRaw (fromPort, toPort, buf, len); + else + LogPrint(eLogWarning, "UDP Client: Unwarranted traffic from ", from.GetIdentHash().ToBase32 ()); + } + + void I2PUDPClientTunnel::HandleRecvFromI2PRaw (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) + { + auto itr = m_Sessions.find (toPort); + // found convo ? + if (itr != m_Sessions.end ()) + { + // found convo + if (len > 0) + { + LogPrint (eLogDebug, "UDP Client: Got ", len, "B from ", m_RemoteIdent ? m_RemoteIdent->ToBase32 () : ""); + m_LocalSocket->send_to (boost::asio::buffer (buf, len), itr->second->first); + // mark convo as active + itr->second->second = i2p::util::GetMillisecondsSinceEpoch (); + } + } + else + LogPrint (eLogWarning, "UDP Client: Not tracking udp session using port ", (int) toPort); + } + +} +} diff --git a/libi2pd_client/UDPTunnel.h b/libi2pd_client/UDPTunnel.h new file mode 100644 index 00000000..4bba8768 --- /dev/null +++ b/libi2pd_client/UDPTunnel.h @@ -0,0 +1,186 @@ +/* +* Copyright (c) 2013-2022, The PurpleI2P Project +* +* This file is part of Purple i2pd project and licensed under BSD3 +* +* See full license text in LICENSE file at top of project tree +*/ + +#ifndef UDPTUNNEL_H__ +#define UDPTUNNEL_H__ + +#include +#include +#include +#include +#include +#include +#include +#include "Identity.h" +#include "Destination.h" +#include "Datagram.h" + +namespace i2p +{ +namespace client +{ + /** 2 minute timeout for udp sessions */ + const uint64_t I2P_UDP_SESSION_TIMEOUT = 1000 * 60 * 2; + const uint64_t I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL = 100; // in milliseconds + + /** max size for i2p udp */ + const size_t I2P_UDP_MAX_MTU = 64*1024; + + struct UDPSession + { + i2p::datagram::DatagramDestination * m_Destination; + boost::asio::ip::udp::socket IPSocket; + i2p::data::IdentHash Identity; + boost::asio::ip::udp::endpoint FromEndpoint; + boost::asio::ip::udp::endpoint SendEndpoint; + uint64_t LastActivity; + + uint16_t LocalPort; + uint16_t RemotePort; + + uint8_t m_Buffer[I2P_UDP_MAX_MTU]; + + UDPSession(boost::asio::ip::udp::endpoint localEndpoint, + const std::shared_ptr & localDestination, + boost::asio::ip::udp::endpoint remote, const i2p::data::IdentHash * ident, + uint16_t ourPort, uint16_t theirPort); + void HandleReceived(const boost::system::error_code & ecode, std::size_t len); + void Receive(); + }; + + + /** read only info about a datagram session */ + struct DatagramSessionInfo + { + /** the name of this forward */ + std::string Name; + /** ident hash of local destination */ + std::shared_ptr LocalIdent; + /** ident hash of remote destination */ + std::shared_ptr RemoteIdent; + /** ident hash of IBGW in use currently in this session or nullptr if none is set */ + std::shared_ptr CurrentIBGW; + /** ident hash of OBEP in use for this session or nullptr if none is set */ + std::shared_ptr CurrentOBEP; + /** i2p router's udp endpoint */ + boost::asio::ip::udp::endpoint LocalEndpoint; + /** client's udp endpoint */ + boost::asio::ip::udp::endpoint RemoteEndpoint; + /** how long has this converstation been idle in ms */ + uint64_t idle; + }; + + typedef std::shared_ptr UDPSessionPtr; + + /** server side udp tunnel, many i2p inbound to 1 ip outbound */ + class I2PUDPServerTunnel + { + public: + + I2PUDPServerTunnel (const std::string & name, + std::shared_ptr localDestination, + boost::asio::ip::address localAddress, + boost::asio::ip::udp::endpoint forwardTo, uint16_t port, bool gzip); + ~I2PUDPServerTunnel (); + + /** expire stale udp conversations */ + void ExpireStale (const uint64_t delta=I2P_UDP_SESSION_TIMEOUT); + void Start (); + void Stop (); + const char * GetName () const { return m_Name.c_str(); } + std::vector > GetSessions (); + std::shared_ptr GetLocalDestination () const { return m_LocalDest; } + + void SetUniqueLocal (bool isUniqueLocal = true) { m_IsUniqueLocal = isUniqueLocal; } + + private: + + void HandleRecvFromI2P (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); + void HandleRecvFromI2PRaw (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); + UDPSessionPtr ObtainUDPSession (const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort); + + private: + + bool m_IsUniqueLocal; + const std::string m_Name; + boost::asio::ip::address m_LocalAddress; + boost::asio::ip::udp::endpoint m_RemoteEndpoint; + std::mutex m_SessionsMutex; + std::vector m_Sessions; + std::shared_ptr m_LocalDest; + UDPSessionPtr m_LastSession; + bool m_Gzip; + + public: + + bool isUpdated; // transient, used during reload only + }; + + class I2PUDPClientTunnel + { + public: + + I2PUDPClientTunnel (const std::string & name, const std::string &remoteDest, + boost::asio::ip::udp::endpoint localEndpoint, std::shared_ptr localDestination, + uint16_t remotePort, bool gzip); + ~I2PUDPClientTunnel (); + + void Start (); + void Stop (); + const char * GetName () const { return m_Name.c_str(); } + std::vector > GetSessions (); + + bool IsLocalDestination (const i2p::data::IdentHash & destination) const { return destination == m_LocalDest->GetIdentHash(); } + + std::shared_ptr GetLocalDestination () const { return m_LocalDest; } + inline void SetLocalDestination (std::shared_ptr dest) + { + if (m_LocalDest) m_LocalDest->Release (); + if (dest) dest->Acquire (); + m_LocalDest = dest; + } + + void ExpireStale (const uint64_t delta=I2P_UDP_SESSION_TIMEOUT); + + private: + + typedef std::pair UDPConvo; + void RecvFromLocal (); + void HandleRecvFromLocal (const boost::system::error_code & e, std::size_t transferred); + void HandleRecvFromI2P (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); + void HandleRecvFromI2PRaw (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); + void TryResolving (); + + private: + + const std::string m_Name; + std::mutex m_SessionsMutex; + std::unordered_map > m_Sessions; // maps i2p port -> local udp convo + const std::string m_RemoteDest; + std::shared_ptr m_LocalDest; + const boost::asio::ip::udp::endpoint m_LocalEndpoint; + i2p::data::IdentHash * m_RemoteIdent; + std::thread * m_ResolveThread; + std::unique_ptr m_LocalSocket; + boost::asio::ip::udp::endpoint m_RecvEndpoint; + uint8_t m_RecvBuff[I2P_UDP_MAX_MTU]; + uint16_t RemotePort, m_LastPort; + bool m_cancel_resolve; + bool m_Gzip; + std::shared_ptr m_LastSession; + + public: + + bool isUpdated; // transient, used during reload only + }; + + +} +} + +#endif