diff --git a/Streaming.cpp b/Streaming.cpp index 3f32e9a0..e11df406 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -18,7 +18,7 @@ namespace stream m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), - m_LastWindowSizeIncreaseTime (0) + m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); UpdateCurrentRemoteLease (); @@ -29,7 +29,7 @@ namespace stream m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE), - m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_LastWindowSizeIncreaseTime (0) + m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); } @@ -267,7 +267,10 @@ namespace stream if (m_SentPackets.empty ()) m_ResendTimer.cancel (); if (acknowledged) + { + m_NumResendAttempts = 0; SendBuffer (); + } if (m_Status == eStreamStatusClosing) Close (); // all outgoing messages have been sent } @@ -602,45 +605,51 @@ namespace stream void Stream::HandleResendTimer (const boost::system::error_code& ecode) { - if (ecode != boost::asio::error::operation_aborted) + if (ecode != boost::asio::error::operation_aborted) { + // check for resend attempts + if (m_NumResendAttempts >= MAX_NUM_RESEND_ATTEMPTS) + { + LogPrint (eLogWarning, "Stream packet was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts. Terminate"); + m_Status = eStreamStatusReset; + Close (); + return; + } + + // collect packets to resend auto ts = i2p::util::GetMillisecondsSinceEpoch (); - bool congesion = false, first = true; std::vector packets; for (auto it : m_SentPackets) { - if (ts < it->sendTime + m_RTO) continue; // don't resend too early - it->numResendAttempts++; - if (first && it->numResendAttempts == 1) // detect congesion at first attempt of first packet only - congesion = true; - first = false; - if (it->numResendAttempts <= MAX_NUM_RESEND_ATTEMPTS) + if (ts >= it->sendTime + m_RTO) { it->sendTime = ts; packets.push_back (it); - } - else - { - LogPrint (eLogWarning, "Packet ", it->GetSeqn (), " was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts. Terminate"); - m_Status = eStreamStatusReset; - Close (); - return; - } + } } + + // select tunnels if necessary and send if (packets.size () > 0) { - if (congesion) - { - // congesion avoidance - m_WindowSize /= 2; - if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; - } - else + m_NumResendAttempts++; + switch (m_NumResendAttempts) { - // congesion avoidance didn't help - m_CurrentOutboundTunnel = nullptr; // pick another outbound tunnel - UpdateCurrentRemoteLease (); // pick another lease - m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change + case 1: // congesion avoidance + m_WindowSize /= 2; + if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; + break; + case 2: + case 4: + UpdateCurrentRemoteLease (); // pick another lease + m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change + LogPrint (eLogWarning, "Another remote lease has been selected stream"); + break; + case 3: + // pick another outbound tunnel + m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ().GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); + LogPrint (eLogWarning, "Another outbound tunnel has been selected for stream"); + break; + default: ; } SendPackets (packets); } diff --git a/Streaming.h b/Streaming.h index 9cccd19d..1ba01ea0 100644 --- a/Streaming.h +++ b/Streaming.h @@ -42,7 +42,7 @@ namespace stream const size_t MAX_PACKET_SIZE = 4096; const size_t COMPRESSION_THRESHOLD_SIZE = 66; const int ACK_SEND_TIMEOUT = 200; // in milliseconds - const int MAX_NUM_RESEND_ATTEMPTS = 5; + const int MAX_NUM_RESEND_ATTEMPTS = 6; const int WINDOW_SIZE = 6; // in messages const int MIN_WINDOW_SIZE = 1; const int MAX_WINDOW_SIZE = 128; @@ -53,10 +53,9 @@ namespace stream { size_t len, offset; uint8_t buf[MAX_PACKET_SIZE]; - int numResendAttempts; uint64_t sendTime; - Packet (): len (0), offset (0), numResendAttempts (0), sendTime (0) {}; + Packet (): len (0), offset (0), sendTime (0) {}; uint8_t * GetBuffer () { return buf + offset; }; size_t GetLength () const { return len - offset; }; @@ -179,6 +178,7 @@ namespace stream std::stringstream m_SendBuffer; int m_WindowSize, m_RTT, m_RTO; uint64_t m_LastWindowSizeIncreaseTime; + int m_NumResendAttempts; }; class StreamingDestination diff --git a/TunnelPool.cpp b/TunnelPool.cpp index 7a98621c..82de6267 100644 --- a/TunnelPool.cpp +++ b/TunnelPool.cpp @@ -101,20 +101,20 @@ namespace tunnel return v; } - std::shared_ptr TunnelPool::GetNextOutboundTunnel () const + std::shared_ptr TunnelPool::GetNextOutboundTunnel (std::shared_ptr excluded) const { std::unique_lock l(m_OutboundTunnelsMutex); - return GetNextTunnel (m_OutboundTunnels); + return GetNextTunnel (m_OutboundTunnels, excluded); } - std::shared_ptr TunnelPool::GetNextInboundTunnel () const + std::shared_ptr TunnelPool::GetNextInboundTunnel (std::shared_ptr excluded) const { std::unique_lock l(m_InboundTunnelsMutex); - return GetNextTunnel (m_InboundTunnels); + return GetNextTunnel (m_InboundTunnels, excluded); } template - typename TTunnels::value_type TunnelPool::GetNextTunnel (TTunnels& tunnels) const + typename TTunnels::value_type TunnelPool::GetNextTunnel (TTunnels& tunnels, typename TTunnels::value_type excluded) const { if (tunnels.empty ()) return nullptr; CryptoPP::RandomNumberGenerator& rnd = i2p::context.GetRandomNumberGenerator (); @@ -122,13 +122,14 @@ namespace tunnel typename TTunnels::value_type tunnel = nullptr; for (auto it: tunnels) { - if (it->IsEstablished ()) + if (it->IsEstablished () && it != excluded) { tunnel = it; i++; } if (i > ind && tunnel) break; - } + } + if (!tunnel && excluded && excluded->IsEstablished ()) tunnel = excluded; return tunnel; } diff --git a/TunnelPool.h b/TunnelPool.h index 14f70841..ee0687cc 100644 --- a/TunnelPool.h +++ b/TunnelPool.h @@ -39,8 +39,8 @@ namespace tunnel void TunnelCreated (std::shared_ptr createdTunnel); void TunnelExpired (std::shared_ptr expiredTunnel); std::vector > GetInboundTunnels (int num) const; - std::shared_ptr GetNextOutboundTunnel () const; - std::shared_ptr GetNextInboundTunnel () const; + std::shared_ptr GetNextOutboundTunnel (std::shared_ptr excluded = nullptr) const; + std::shared_ptr GetNextInboundTunnel (std::shared_ptr excluded = nullptr) const; void TestTunnels (); void ProcessGarlicMessage (I2NPMessage * msg); @@ -57,7 +57,7 @@ namespace tunnel void RecreateInboundTunnel (std::shared_ptr tunnel); void RecreateOutboundTunnel (std::shared_ptr tunnel); template - typename TTunnels::value_type GetNextTunnel (TTunnels& tunnels) const; + typename TTunnels::value_type GetNextTunnel (TTunnels& tunnels, typename TTunnels::value_type excluded) const; std::shared_ptr SelectNextHop (std::shared_ptr prevHop) const; private: