Browse Source

fixed #1393. store streams by recvStreamID

pull/1405/head
orignal 5 years ago
parent
commit
254d2b82b3
  1. 30
      libi2pd/Streaming.cpp
  2. 6
      libi2pd/Streaming.h

30
libi2pd/Streaming.cpp

@ -964,7 +964,6 @@ namespace stream
StreamingDestination::StreamingDestination (std::shared_ptr<i2p::client::ClientDestination> owner, uint16_t localPort, bool gzip): StreamingDestination::StreamingDestination (std::shared_ptr<i2p::client::ClientDestination> owner, uint16_t localPort, bool gzip):
m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip), m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip),
m_LastIncomingReceiveStreamID (0),
m_PendingIncomingTimer (m_Owner->GetService ()) m_PendingIncomingTimer (m_Owner->GetService ())
{ {
} }
@ -1013,19 +1012,18 @@ namespace stream
if (packet->IsSYN () && !packet->GetSeqn ()) // new incoming stream if (packet->IsSYN () && !packet->GetSeqn ()) // new incoming stream
{ {
uint32_t receiveStreamID = packet->GetReceiveStreamID (); uint32_t receiveStreamID = packet->GetReceiveStreamID ();
if (receiveStreamID == m_LastIncomingReceiveStreamID) auto it1 = m_IncomingStreams.find (receiveStreamID);
if (it1 != m_IncomingStreams.end ())
{ {
// already pending // already pending
LogPrint(eLogWarning, "Streaming: Incoming streaming with rSID=", receiveStreamID, " already exists"); LogPrint(eLogWarning, "Streaming: Incoming streaming with rSID=", receiveStreamID, " already exists");
DeletePacket (packet); // drop it, because previous should be connected DeletePacket (packet); // drop it, because previous should be connected
return; return;
} }
auto incomingStream = CreateNewIncomingStream (); auto incomingStream = CreateNewIncomingStream (receiveStreamID);
incomingStream->HandleNextPacket (packet); // SYN incomingStream->HandleNextPacket (packet); // SYN
auto ident = incomingStream->GetRemoteIdentity(); auto ident = incomingStream->GetRemoteIdentity();
m_LastIncomingReceiveStreamID = receiveStreamID;
// handle saved packets if any // handle saved packets if any
{ {
auto it = m_SavedPackets.find (receiveStreamID); auto it = m_SavedPackets.find (receiveStreamID);
@ -1062,13 +1060,13 @@ namespace stream
else // follow on packet without SYN else // follow on packet without SYN
{ {
uint32_t receiveStreamID = packet->GetReceiveStreamID (); uint32_t receiveStreamID = packet->GetReceiveStreamID ();
for (auto& it: m_Streams) auto it1 = m_IncomingStreams.find (receiveStreamID);
if (it.second->GetSendStreamID () == receiveStreamID) if (it1 != m_IncomingStreams.end ())
{ {
// found // found
it.second->HandleNextPacket (packet); it1->second->HandleNextPacket (packet);
return; return;
} }
// save follow on packet // save follow on packet
auto it = m_SavedPackets.find (receiveStreamID); auto it = m_SavedPackets.find (receiveStreamID);
if (it != m_SavedPackets.end ()) if (it != m_SavedPackets.end ())
@ -1105,11 +1103,12 @@ namespace stream
return s; return s;
} }
std::shared_ptr<Stream> StreamingDestination::CreateNewIncomingStream () std::shared_ptr<Stream> StreamingDestination::CreateNewIncomingStream (uint32_t receiveStreamID)
{ {
auto s = std::make_shared<Stream> (m_Owner->GetService (), *this); auto s = std::make_shared<Stream> (m_Owner->GetService (), *this);
std::unique_lock<std::mutex> l(m_StreamsMutex); std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s; m_Streams[s->GetRecvStreamID ()] = s;
m_IncomingStreams[receiveStreamID] = s;
return s; return s;
} }
@ -1118,9 +1117,8 @@ namespace stream
if (stream) if (stream)
{ {
std::unique_lock<std::mutex> l(m_StreamsMutex); std::unique_lock<std::mutex> l(m_StreamsMutex);
auto it = m_Streams.find (stream->GetRecvStreamID ()); m_Streams.erase (stream->GetRecvStreamID ());
if (it != m_Streams.end ()) m_IncomingStreams.erase (stream->GetSendStreamID ());
m_Streams.erase (it);
} }
} }

6
libi2pd/Streaming.h

@ -269,8 +269,10 @@ namespace stream
void AcceptOnceAcceptor (std::shared_ptr<Stream> stream, Acceptor acceptor, Acceptor prev); void AcceptOnceAcceptor (std::shared_ptr<Stream> stream, Acceptor acceptor, Acceptor prev);
private:
void HandleNextPacket (Packet * packet); void HandleNextPacket (Packet * packet);
std::shared_ptr<Stream> CreateNewIncomingStream (); std::shared_ptr<Stream> CreateNewIncomingStream (uint32_t receiveStreamID);
void HandlePendingIncomingTimer (const boost::system::error_code& ecode); void HandlePendingIncomingTimer (const boost::system::error_code& ecode);
private: private:
@ -280,8 +282,8 @@ namespace stream
bool m_Gzip; // gzip compression of data messages bool m_Gzip; // gzip compression of data messages
std::mutex m_StreamsMutex; std::mutex m_StreamsMutex;
std::map<uint32_t, std::shared_ptr<Stream> > m_Streams; // sendStreamID->stream std::map<uint32_t, std::shared_ptr<Stream> > m_Streams; // sendStreamID->stream
std::map<uint32_t, std::shared_ptr<Stream> > m_IncomingStreams; // receiveStreamID->stream
Acceptor m_Acceptor; Acceptor m_Acceptor;
uint32_t m_LastIncomingReceiveStreamID;
std::list<std::shared_ptr<Stream> > m_PendingIncomingStreams; std::list<std::shared_ptr<Stream> > m_PendingIncomingStreams;
boost::asio::deadline_timer m_PendingIncomingTimer; boost::asio::deadline_timer m_PendingIncomingTimer;
std::map<uint32_t, std::list<Packet *> > m_SavedPackets; // receiveStreamID->packets, arrived before SYN std::map<uint32_t, std::list<Packet *> > m_SavedPackets; // receiveStreamID->packets, arrived before SYN

Loading…
Cancel
Save