diff --git a/Streaming.cpp b/Streaming.cpp index 315f513f..f2985c3b 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -21,7 +21,8 @@ namespace stream const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), - m_RemoteLeaseSet (&remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service) + m_RemoteLeaseSet (&remote), m_CurrentOutboundTunnel (nullptr), + m_ReceiveTimer (m_Service), m_ResendTimer (m_Service) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); UpdateCurrentRemoteLease (); @@ -30,7 +31,8 @@ namespace stream Stream::Stream (boost::asio::io_service& service, StreamingDestination * local): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), - m_RemoteLeaseSet (nullptr), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service) + m_RemoteLeaseSet (nullptr), m_CurrentOutboundTunnel (nullptr), + m_ReceiveTimer (m_Service), m_ResendTimer (m_Service) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); } @@ -102,6 +104,7 @@ namespace stream { // we have received duplicate. Most likely our outbound tunnel is dead LogPrint ("Duplicate message ", receivedSeqn, " received"); + m_CurrentOutboundTunnel = nullptr; // pick another outbound tunnel UpdateCurrentRemoteLease (); // pick another lease SendQuickAck (); // resend ack for previous message again delete packet; // packet dropped @@ -401,8 +404,8 @@ namespace stream m_LeaseSetUpdated = false; } - auto outboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (); - if (outboundTunnel) + m_CurrentOutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); + if (m_CurrentOutboundTunnel) { auto ts = i2p::util::GetMillisecondsSinceEpoch (); if (ts >= m_CurrentRemoteLease.endDate) @@ -422,7 +425,7 @@ namespace stream }); leaseSet = nullptr; // send leaseSet only one time } - outboundTunnel->SendTunnelDataMsg (msgs); + m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs); } else LogPrint ("All leases are expired"); @@ -458,6 +461,7 @@ namespace stream } if (packets.size () > 0) { + m_CurrentOutboundTunnel = nullptr; // pick another outbound tunnel UpdateCurrentRemoteLease (); // pick another lease SendPackets (packets); } diff --git a/Streaming.h b/Streaming.h index 94f71e2e..e3425e57 100644 --- a/Streaming.h +++ b/Streaming.h @@ -128,6 +128,7 @@ namespace stream i2p::data::Identity m_RemoteIdentity; const i2p::data::LeaseSet * m_RemoteLeaseSet; i2p::data::Lease m_CurrentRemoteLease; + i2p::tunnel::OutboundTunnel * m_CurrentOutboundTunnel; std::queue m_ReceiveQueue; std::set m_SavedPackets; std::set m_SentPackets; diff --git a/TunnelPool.cpp b/TunnelPool.cpp index 46859c53..2cfcac99 100644 --- a/TunnelPool.cpp +++ b/TunnelPool.cpp @@ -74,23 +74,29 @@ namespace tunnel return v; } - OutboundTunnel * TunnelPool::GetNextOutboundTunnel () + OutboundTunnel * TunnelPool::GetNextOutboundTunnel (OutboundTunnel * suggested) { - return GetNextTunnel (m_OutboundTunnels); + return GetNextTunnel (m_OutboundTunnels, suggested); } - InboundTunnel * TunnelPool::GetNextInboundTunnel () + InboundTunnel * TunnelPool::GetNextInboundTunnel (InboundTunnel * suggested) { - return GetNextTunnel (m_InboundTunnels); + return GetNextTunnel (m_InboundTunnels, suggested); } template - typename TTunnels::value_type TunnelPool::GetNextTunnel (TTunnels& tunnels) + typename TTunnels::value_type TunnelPool::GetNextTunnel (TTunnels& tunnels, + typename TTunnels::value_type suggested) { if (tunnels.empty ()) return nullptr; + uint64_t ts = i2p::util::GetSecondsSinceEpoch (); + if (suggested && tunnels.count (suggested) > 0 && + ts + TUNNEL_EXPIRATION_THRESHOLD <= suggested->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT) + return suggested; for (auto it: tunnels) - if (!it->IsFailed ()) - return it; + if (!it->IsFailed () && ts + TUNNEL_EXPIRATION_THRESHOLD <= + it->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT) + return it; return nullptr; } diff --git a/TunnelPool.h b/TunnelPool.h index 15682ace..043a872e 100644 --- a/TunnelPool.h +++ b/TunnelPool.h @@ -19,6 +19,7 @@ namespace tunnel class InboundTunnel; class OutboundTunnel; + const int TUNNEL_EXPIRATION_THRESHOLD = 60; // 1 minute class TunnelPool // per local destination { public: @@ -37,8 +38,8 @@ namespace tunnel void TunnelCreated (OutboundTunnel * createdTunnel); void TunnelExpired (OutboundTunnel * expiredTunnel); std::vector GetInboundTunnels (int num) const; - OutboundTunnel * GetNextOutboundTunnel (); - InboundTunnel * GetNextInboundTunnel (); + OutboundTunnel * GetNextOutboundTunnel (OutboundTunnel * suggested = nullptr); + InboundTunnel * GetNextInboundTunnel (InboundTunnel * suggested = nullptr); const i2p::data::IdentHash& GetIdentHash () { return m_LocalDestination.GetIdentHash (); }; void TestTunnels (); @@ -51,7 +52,8 @@ namespace tunnel void RecreateInboundTunnel (InboundTunnel * tunnel); void RecreateOutboundTunnel (OutboundTunnel * tunnel); template - typename TTunnels::value_type GetNextTunnel (TTunnels& tunnels); + typename TTunnels::value_type GetNextTunnel (TTunnels& tunnels, + typename TTunnels::value_type suggested = nullptr); private: