Browse Source

save and check last stream

pull/1677/head
orignal 3 years ago
parent
commit
5781335814
  1. 21
      libi2pd/Streaming.cpp
  2. 11
      libi2pd/Streaming.h

21
libi2pd/Streaming.cpp

@ -1050,6 +1050,7 @@ namespace stream
it.second->Terminate (false); // we delete here it.second->Terminate (false); // we delete here
m_Streams.clear (); m_Streams.clear ();
m_IncomingStreams.clear (); m_IncomingStreams.clear ();
m_LastStream = nullptr;
} }
} }
@ -1058,9 +1059,16 @@ namespace stream
uint32_t sendStreamID = packet->GetSendStreamID (); uint32_t sendStreamID = packet->GetSendStreamID ();
if (sendStreamID) if (sendStreamID)
{ {
auto it = m_Streams.find (sendStreamID); if (!m_LastStream || sendStreamID != m_LastStream->GetRecvStreamID ())
if (it != m_Streams.end ()) {
it->second->HandleNextPacket (packet); auto it = m_Streams.find (sendStreamID);
if (it != m_Streams.end ())
m_LastStream = it->second;
else
m_LastStream = nullptr;
}
if (m_LastStream)
m_LastStream->HandleNextPacket (packet);
else if (packet->IsEcho () && m_Owner->IsStreamingAnswerPings ()) else if (packet->IsEcho () && m_Owner->IsStreamingAnswerPings ())
{ {
// ping // ping
@ -1166,7 +1174,7 @@ namespace stream
{ {
auto s = std::make_shared<Stream> (m_Owner->GetService (), *this, remote, port); auto s = std::make_shared<Stream> (m_Owner->GetService (), *this, remote, port);
std::unique_lock<std::mutex> l(m_StreamsMutex); std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s; m_Streams.emplace (s->GetRecvStreamID (), s);
return s; return s;
} }
@ -1174,8 +1182,8 @@ namespace stream
{ {
auto s = std::make_shared<Stream> (m_Owner->GetService (), *this); auto s = std::make_shared<Stream> (m_Owner->GetService (), *this);
std::unique_lock<std::mutex> l(m_StreamsMutex); std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s; m_Streams.emplace (s->GetRecvStreamID (), s);
m_IncomingStreams[receiveStreamID] = s; m_IncomingStreams.emplace (receiveStreamID, s);
return s; return s;
} }
@ -1186,6 +1194,7 @@ namespace stream
std::unique_lock<std::mutex> l(m_StreamsMutex); std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams.erase (stream->GetRecvStreamID ()); m_Streams.erase (stream->GetRecvStreamID ());
m_IncomingStreams.erase (stream->GetSendStreamID ()); m_IncomingStreams.erase (stream->GetSendStreamID ());
if (m_LastStream == stream) m_LastStream = nullptr;
} }
} }

11
libi2pd/Streaming.h

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2020, The PurpleI2P Project * Copyright (c) 2013-2021, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -11,7 +11,7 @@
#include <inttypes.h> #include <inttypes.h>
#include <string> #include <string>
#include <map> #include <unordered_map>
#include <set> #include <set>
#include <queue> #include <queue>
#include <functional> #include <functional>
@ -297,12 +297,13 @@ namespace stream
uint16_t m_LocalPort; uint16_t m_LocalPort;
bool m_Gzip; // gzip compression of data messages bool m_Gzip; // gzip compression of data messages
std::mutex m_StreamsMutex; std::mutex m_StreamsMutex;
std::map<uint32_t, std::shared_ptr<Stream> > m_Streams; // sendStreamID->stream std::unordered_map<uint32_t, std::shared_ptr<Stream> > m_Streams; // sendStreamID->stream
std::map<uint32_t, std::shared_ptr<Stream> > m_IncomingStreams; // receiveStreamID->stream std::unordered_map<uint32_t, std::shared_ptr<Stream> > m_IncomingStreams; // receiveStreamID->stream
std::shared_ptr<Stream> m_LastStream;
Acceptor m_Acceptor; Acceptor m_Acceptor;
std::list<std::shared_ptr<Stream> > m_PendingIncomingStreams; std::list<std::shared_ptr<Stream> > m_PendingIncomingStreams;
boost::asio::deadline_timer m_PendingIncomingTimer; boost::asio::deadline_timer m_PendingIncomingTimer;
std::map<uint32_t, std::list<Packet *> > m_SavedPackets; // receiveStreamID->packets, arrived before SYN std::unordered_map<uint32_t, std::list<Packet *> > m_SavedPackets; // receiveStreamID->packets, arrived before SYN
i2p::util::MemoryPool<Packet> m_PacketsPool; i2p::util::MemoryPool<Packet> m_PacketsPool;
i2p::util::MemoryPool<I2NPMessageBuffer<I2NP_MAX_MESSAGE_SIZE> > m_I2NPMsgsPool; i2p::util::MemoryPool<I2NPMessageBuffer<I2NP_MAX_MESSAGE_SIZE> > m_I2NPMsgsPool;

Loading…
Cancel
Save