From fdebbc4498948435075d427abfa99b18baf7c8c0 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 10 Jul 2022 17:13:25 -0400 Subject: [PATCH] select sessions for introducers --- libi2pd/SSU2.cpp | 61 ++++++++++++++++++++++++++++++++++++++++- libi2pd/SSU2.h | 11 ++++++-- libi2pd/SSU2Session.cpp | 12 +++++++- libi2pd/SSU2Session.h | 1 + 4 files changed, 81 insertions(+), 4 deletions(-) diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index f99c67b6..b48274c4 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -84,11 +84,17 @@ namespace transport void SSU2Server::Stop () { for (auto& it: m_Sessions) + { + it.second->RequestTermination (eSSU2TerminationReasonRouterShutdown); it.second->Done (); + } m_Sessions.clear (); m_SessionsByRouterHash.clear (); m_PendingOutgoingSessions.clear (); - + m_Relays.clear (); + m_Introducers.clear (); + m_IntroducersV6.clear (); + if (context.SupportsV4 () || context.SupportsV6 ()) m_ReceiveService.Stop (); @@ -715,5 +721,58 @@ namespace transport m_IncomingTokens.emplace (ep, ret); return ret; } + + std::list > SSU2Server::FindIntroducers (int maxNumIntroducers, + bool v4, const std::set& excluded) const + { + std::list > ret; + for (const auto& s : m_Sessions) + { + if (s.second->IsEstablished () && (s.second->GetRelayTag () && !s.second->IsOutgoing ()) && + !excluded.count (s.second->GetRemoteIdentity ()->GetIdentHash ()) && + ((v4 && (s.second->GetRemoteTransports () | i2p::data::RouterInfo::eSSU2V4)) || + (!v4 && (s.second->GetRemoteTransports () | i2p::data::RouterInfo::eSSU2V6)))) + ret.push_back (s.second); + } + if ((int)ret.size () > maxNumIntroducers) + { + // shink ret randomly + int sz = ret.size () - maxNumIntroducers; + for (int i = 0; i < sz; i++) + { + auto ind = rand () % ret.size (); + auto it = ret.begin (); + std::advance (it, ind); + ret.erase (it); + } + } + return ret; + } + + void SSU2Server::UpdateIntroducers (bool v4) + { + std::list> newList; + auto& introducers = v4 ? m_Introducers : m_IntroducersV6; + for (const auto& it : introducers) + { + if (it->IsEstablished ()) + { + it->SendKeepAlive (); + newList.push_back (it); + } + } + if (newList.size () < SSU2_MAX_NUM_INTRODUCERS) + { + std::set excluded; + for (auto& it1: newList) + excluded.insert (it1->GetRemoteIdentity ()->GetIdentHash ()); + auto sessions = FindIntroducers (SSU_MAX_NUM_INTRODUCERS - newList.size (), v4, excluded); + for (const auto& it : sessions) + { + newList.push_back (it); + } + } + introducers = newList; + } } } diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 99862947..b9d59d68 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -20,6 +20,9 @@ namespace transport const int SSU2_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds const size_t SSU2_SOCKET_RECEIVE_BUFFER_SIZE = 0x1FFFF; // 128K const size_t SSU2_SOCKET_SEND_BUFFER_SIZE = 0x1FFFF; // 128K + const size_t SSU2_MAX_NUM_INTRODUCERS = 3; + const int SSU2_TO_INTRODUCER_SESSION_DURATION = 3600; // 1 hour + const int SSU2_TO_INTRODUCER_SESSION_EXPIRATION = 4800; // 80 minutes class SSU2Server: private i2p::util::RunnableServiceWithWork { @@ -97,7 +100,10 @@ namespace transport void HandleResendTimer (const boost::system::error_code& ecode); void ConnectThroughIntroducer (std::shared_ptr session); - + std::list > FindIntroducers (int maxNumIntroducers, + bool v4, const std::set& excluded) const; + void UpdateIntroducers (bool v4); + private: ReceiveService m_ReceiveService; @@ -108,10 +114,11 @@ namespace transport std::map > m_PendingOutgoingSessions; std::map > m_IncomingTokens, m_OutgoingTokens; // remote endpoint -> (token, expires in seconds) std::map > m_Relays; // we are introducer, relay tag -> session + std::list > m_Introducers, m_IntroducersV6; // introducers we are connected to i2p::util::MemoryPoolMt m_PacketsPool; boost::asio::deadline_timer m_TerminationTimer, m_ResendTimer; std::shared_ptr m_LastSession; - + public: // for HTTP/I2PControl diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index 4e1d63ca..aaa7ebff 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -150,6 +150,16 @@ namespace transport payloadSize += CreatePaddingBlock (payload + payloadSize, SSU2_MAX_PAYLOAD_SIZE - payloadSize); SendData (payload, payloadSize); } + + void SSU2Session::SendKeepAlive () + { + if (IsEstablished ()) + { + uint8_t payload[20]; + size_t payloadSize = CreatePaddingBlock (payload, 20, 5); + SendData (payload, payloadSize); + } + } void SSU2Session::Terminate () { @@ -416,7 +426,7 @@ namespace transport htobe16buf (payload + 1, 4); htobe32buf (payload + 3, ts); size_t payloadSize = 7; - if (GetRouterStatus () == eRouterStatusFirewalled) + if (GetRouterStatus () == eRouterStatusFirewalled && m_Address->IsIntroducer ()) { // relay tag request payload[payloadSize] = eSSU2BlkRelayTagRequest; diff --git a/libi2pd/SSU2Session.h b/libi2pd/SSU2Session.h index a287a6de..f5964b8f 100644 --- a/libi2pd/SSU2Session.h +++ b/libi2pd/SSU2Session.h @@ -215,6 +215,7 @@ namespace transport bool Introduce (std::shared_ptr session, uint32_t relayTag); void WaitForIntroduction (); void SendPeerTest (); // Alice, Data message + void SendKeepAlive (); void Terminate (); void RequestTermination (SSU2TerminationReason reason); void CleanUp (uint64_t ts);