From 254d2b82b3e9613514c21eaaac97a3c39f550620 Mon Sep 17 00:00:00 2001 From: orignal Date: Fri, 26 Jul 2019 14:23:21 -0400 Subject: [PATCH] fixed #1393. store streams by recvStreamID --- libi2pd/Streaming.cpp | 30 ++++++++++++++---------------- libi2pd/Streaming.h | 6 ++++-- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index 7b1f187b..b8ce8167 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -964,7 +964,6 @@ namespace stream StreamingDestination::StreamingDestination (std::shared_ptr owner, uint16_t localPort, bool gzip): m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip), - m_LastIncomingReceiveStreamID (0), m_PendingIncomingTimer (m_Owner->GetService ()) { } @@ -1013,18 +1012,17 @@ namespace stream if (packet->IsSYN () && !packet->GetSeqn ()) // new incoming stream { uint32_t receiveStreamID = packet->GetReceiveStreamID (); - if (receiveStreamID == m_LastIncomingReceiveStreamID) + auto it1 = m_IncomingStreams.find (receiveStreamID); + if (it1 != m_IncomingStreams.end ()) { // already pending LogPrint(eLogWarning, "Streaming: Incoming streaming with rSID=", receiveStreamID, " already exists"); DeletePacket (packet); // drop it, because previous should be connected return; } - auto incomingStream = CreateNewIncomingStream (); + auto incomingStream = CreateNewIncomingStream (receiveStreamID); incomingStream->HandleNextPacket (packet); // SYN auto ident = incomingStream->GetRemoteIdentity(); - - m_LastIncomingReceiveStreamID = receiveStreamID; // handle saved packets if any { @@ -1062,13 +1060,13 @@ namespace stream else // follow on packet without SYN { uint32_t receiveStreamID = packet->GetReceiveStreamID (); - for (auto& it: m_Streams) - if (it.second->GetSendStreamID () == receiveStreamID) - { - // found - it.second->HandleNextPacket (packet); - return; - } + auto it1 = m_IncomingStreams.find (receiveStreamID); + if (it1 != m_IncomingStreams.end ()) + { + // found + it1->second->HandleNextPacket (packet); + return; + } // save follow on packet auto it = m_SavedPackets.find (receiveStreamID); if (it != m_SavedPackets.end ()) @@ -1105,11 +1103,12 @@ namespace stream return s; } - std::shared_ptr StreamingDestination::CreateNewIncomingStream () + std::shared_ptr StreamingDestination::CreateNewIncomingStream (uint32_t receiveStreamID) { auto s = std::make_shared (m_Owner->GetService (), *this); std::unique_lock l(m_StreamsMutex); m_Streams[s->GetRecvStreamID ()] = s; + m_IncomingStreams[receiveStreamID] = s; return s; } @@ -1118,9 +1117,8 @@ namespace stream if (stream) { std::unique_lock l(m_StreamsMutex); - auto it = m_Streams.find (stream->GetRecvStreamID ()); - if (it != m_Streams.end ()) - m_Streams.erase (it); + m_Streams.erase (stream->GetRecvStreamID ()); + m_IncomingStreams.erase (stream->GetSendStreamID ()); } } diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index ba52464f..49962507 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -269,8 +269,10 @@ namespace stream void AcceptOnceAcceptor (std::shared_ptr stream, Acceptor acceptor, Acceptor prev); + private: + void HandleNextPacket (Packet * packet); - std::shared_ptr CreateNewIncomingStream (); + std::shared_ptr CreateNewIncomingStream (uint32_t receiveStreamID); void HandlePendingIncomingTimer (const boost::system::error_code& ecode); private: @@ -280,8 +282,8 @@ namespace stream bool m_Gzip; // gzip compression of data messages std::mutex m_StreamsMutex; std::map > m_Streams; // sendStreamID->stream + std::map > m_IncomingStreams; // receiveStreamID->stream Acceptor m_Acceptor; - uint32_t m_LastIncomingReceiveStreamID; std::list > m_PendingIncomingStreams; boost::asio::deadline_timer m_PendingIncomingTimer; std::map > m_SavedPackets; // receiveStreamID->packets, arrived before SYN