From cd1af85e39e0e683af0ea74980d6bb3874bead71 Mon Sep 17 00:00:00 2001 From: orignal Date: Wed, 29 Mar 2023 15:54:53 -0400 Subject: [PATCH] bypass slow transport sessions --- daemon/HTTPServer.cpp | 1 + libi2pd/NTCP2.cpp | 4 ++++ libi2pd/SSU2Session.cpp | 4 ++++ libi2pd/TransportSession.h | 10 ++++++++-- libi2pd/Transports.cpp | 5 +++-- libi2pd/Transports.h | 10 ++++++++-- 6 files changed, 28 insertions(+), 6 deletions(-) diff --git a/daemon/HTTPServer.cpp b/daemon/HTTPServer.cpp index 3d5ff1a0..29f3f569 100644 --- a/daemon/HTTPServer.cpp +++ b/daemon/HTTPServer.cpp @@ -838,6 +838,7 @@ namespace http { tmp_s << " [itag:" << it->GetRelayTag () << "]"; if (it->GetSendQueueSize () > 0) tmp_s << " [queue:" << it->GetSendQueueSize () << "]"; + if (it->IsSlow ()) tmp_s << " [slow]"; tmp_s << "\r\n" << std::endl; cnt++; } diff --git a/libi2pd/NTCP2.cpp b/libi2pd/NTCP2.cpp index eaa186f8..48c905ca 100644 --- a/libi2pd/NTCP2.cpp +++ b/libi2pd/NTCP2.cpp @@ -452,6 +452,7 @@ namespace transport { m_Establisher->CreateSessionRequestMessage (); // send message + m_HandshakeInterval = i2p::util::GetMillisecondsSinceEpoch (); boost::asio::async_write (m_Socket, boost::asio::buffer (m_Establisher->m_SessionRequestBuffer, m_Establisher->m_SessionRequestBufferLen), boost::asio::transfer_all (), std::bind(&NTCP2Session::HandleSessionRequestSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } @@ -529,6 +530,7 @@ namespace transport { m_Establisher->CreateSessionCreatedMessage (); // send message + m_HandshakeInterval = i2p::util::GetMillisecondsSinceEpoch (); boost::asio::async_write (m_Socket, boost::asio::buffer (m_Establisher->m_SessionCreatedBuffer, m_Establisher->m_SessionCreatedBufferLen), boost::asio::transfer_all (), std::bind(&NTCP2Session::HandleSessionCreatedSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } @@ -542,6 +544,7 @@ namespace transport } else { + m_HandshakeInterval = i2p::util::GetMillisecondsSinceEpoch () - m_HandshakeInterval; LogPrint (eLogDebug, "NTCP2: SessionCreated received ", bytes_transferred); uint16_t paddingLen = 0; if (m_Establisher->ProcessSessionCreatedMessage (paddingLen)) @@ -646,6 +649,7 @@ namespace transport } else { + m_HandshakeInterval = i2p::util::GetMillisecondsSinceEpoch () - m_HandshakeInterval; LogPrint (eLogDebug, "NTCP2: SessionConfirmed received"); // part 1 uint8_t nonce[12]; diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index c492279d..c618d136 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -648,6 +648,7 @@ namespace transport if (m_State == eSSU2SessionStateTokenReceived || m_Server.AddPendingOutgoingSession (shared_from_this ())) { m_State = eSSU2SessionStateSessionRequestSent; + m_HandshakeInterval = ts; m_Server.Send (header.buf, 16, headerX, 48, payload, payloadSize, m_RemoteEndpoint); } else @@ -770,6 +771,7 @@ namespace transport m_State = eSSU2SessionStateSessionCreatedSent; m_SentHandshakePacket->payloadSize = payloadSize; // send + m_HandshakeInterval = ts; m_Server.Send (header.buf, 16, headerX, 48, payload, payloadSize, m_RemoteEndpoint); } @@ -790,6 +792,7 @@ namespace transport LogPrint (eLogWarning, "SSU2: SessionCreated message too short ", len); return false; } + m_HandshakeInterval = i2p::util::GetMillisecondsSinceEpoch () - m_HandshakeInterval; const uint8_t nonce[12] = {0}; uint8_t headerX[48]; i2p::crypto::ChaCha20 (buf + 16, 48, kh2, nonce, headerX); @@ -995,6 +998,7 @@ namespace transport if (m_SessionConfirmedFragment) m_SessionConfirmedFragment.reset (nullptr); return false; } + m_HandshakeInterval = i2p::util::GetMillisecondsSinceEpoch () - m_HandshakeInterval; // KDF for Session Confirmed part 1 m_NoiseState->MixHash (header.buf, 16); // h = SHA256(h || header) // decrypt part1 diff --git a/libi2pd/TransportSession.h b/libi2pd/TransportSession.h index 83b6cacd..7d2f2653 100644 --- a/libi2pd/TransportSession.h +++ b/libi2pd/TransportSession.h @@ -69,6 +69,8 @@ namespace transport std::stringstream m_Stream; }; + const int64_t TRANSPORT_SESSION_SLOWNESS_THRESHOLD = 500; // in milliseconds + const int64_t TRANSPORT_SESSION_MAX_HANDSHAKE_INTERVAL = 10000; // in milliseconds class TransportSession { public: @@ -76,7 +78,8 @@ namespace transport TransportSession (std::shared_ptr router, int terminationTimeout): m_NumSentBytes (0), m_NumReceivedBytes (0), m_SendQueueSize (0), m_IsOutgoing (router), m_TerminationTimeout (terminationTimeout), - m_LastActivityTimestamp (i2p::util::GetSecondsSinceEpoch ()) + m_LastActivityTimestamp (i2p::util::GetSecondsSinceEpoch ()), + m_HandshakeInterval (0) { if (router) m_RemoteIdentity = router->GetRouterIdentity (); @@ -103,7 +106,9 @@ namespace transport size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; }; size_t GetSendQueueSize () const { return m_SendQueueSize; }; bool IsOutgoing () const { return m_IsOutgoing; }; - + bool IsSlow () const { return m_HandshakeInterval > TRANSPORT_SESSION_SLOWNESS_THRESHOLD && + m_HandshakeInterval < TRANSPORT_SESSION_MAX_HANDSHAKE_INTERVAL; }; + int GetTerminationTimeout () const { return m_TerminationTimeout; }; void SetTerminationTimeout (int terminationTimeout) { m_TerminationTimeout = terminationTimeout; }; bool IsTerminationTimeoutExpired (uint64_t ts) const @@ -129,6 +134,7 @@ namespace transport int m_TerminationTimeout; uint64_t m_LastActivityTimestamp; uint32_t m_CreationTime; // seconds since epoch + int64_t m_HandshakeInterval; // in milliseconds between SessionRequest->SessionCreated or SessionCreated->SessionConfirmed }; // SOCKS5 proxy diff --git a/libi2pd/Transports.cpp b/libi2pd/Transports.cpp index aa326a92..4e97bb26 100644 --- a/libi2pd/Transports.cpp +++ b/libi2pd/Transports.cpp @@ -906,9 +906,10 @@ namespace transport return GetRandomPeer ( [isHighBandwidth](const Peer& peer)->bool { - // connected and not overloaded - return !peer.router && !peer.sessions.empty () && + // connected, not overloaded and not slow + return !peer.router && !peer.sessions.empty () && peer.isReachable && peer.sessions.front ()->GetSendQueueSize () <= PEER_ROUTER_INFO_OVERLOAD_QUEUE_SIZE && + !peer.sessions.front ()->IsSlow () && (!isHighBandwidth || peer.isHighBandwidth); }); } diff --git a/libi2pd/Transports.h b/libi2pd/Transports.h index dc97a952..1058d31b 100644 --- a/libi2pd/Transports.h +++ b/libi2pd/Transports.h @@ -72,15 +72,18 @@ namespace transport uint64_t creationTime, nextRouterInfoUpdateTime; std::vector > delayedMessages; std::vector priority; - bool isHighBandwidth; + bool isHighBandwidth, isReachable; Peer (std::shared_ptr r, uint64_t ts): numAttempts (0), router (r), creationTime (ts), nextRouterInfoUpdateTime (ts + PEER_ROUTER_INFO_UPDATE_INTERVAL), - isHighBandwidth (false) + isHighBandwidth (false), isReachable (false) { if (router) + { isHighBandwidth = router->IsHighBandwidth (); + isReachable = (bool)router->GetCompatibleTransports (true); + } } void Done () @@ -93,7 +96,10 @@ namespace transport { router = r; if (router) + { isHighBandwidth = router->IsHighBandwidth (); + isReachable = (bool)router->GetCompatibleTransports (true); + } } };