Browse Source

keep pending incoming streams if acceptor is not set

pull/308/head
orignal 9 years ago
parent
commit
5930e2d221
  1. 55
      Streaming.cpp
  2. 9
      Streaming.h

55
Streaming.cpp

@ -787,7 +787,7 @@ namespace stream
} }
StreamingDestination::StreamingDestination (std::shared_ptr<i2p::client::ClientDestination> owner, uint16_t localPort): StreamingDestination::StreamingDestination (std::shared_ptr<i2p::client::ClientDestination> owner, uint16_t localPort):
m_Owner (owner), m_LocalPort (localPort) m_Owner (owner), m_LocalPort (localPort), m_PendingIncomingTimer (m_Owner->GetService ())
{ {
} }
@ -802,6 +802,7 @@ namespace stream
void StreamingDestination::Stop () void StreamingDestination::Stop ()
{ {
ResetAcceptor (); ResetAcceptor ();
m_PendingIncomingTimer.cancel ();
{ {
std::unique_lock<std::mutex> l(m_StreamsMutex); std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams.clear (); m_Streams.clear ();
@ -832,8 +833,21 @@ namespace stream
m_Acceptor (incomingStream); m_Acceptor (incomingStream);
else else
{ {
LogPrint ("Acceptor for incoming stream is not set"); LogPrint (eLogInfo, "Acceptor for incoming stream is not set");
DeleteStream (incomingStream); if (m_PendingIncomingStreams.size () < MAX_PENDING_INCOMING_BACKLOG)
{
m_PendingIncomingStreams.push_back (incomingStream);
m_PendingIncomingTimer.cancel ();
m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT));
m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer,
this, std::placeholders::_1));
LogPrint (eLogInfo, "Pending incoming stream added");
}
else
{
LogPrint (eLogError, "Pending incoming streams backlog exceeds ", MAX_PENDING_INCOMING_BACKLOG);
incomingStream->Close ();
}
} }
} }
else // follow on packet without SYN else // follow on packet without SYN
@ -852,7 +866,7 @@ namespace stream
} }
} }
} }
std::shared_ptr<Stream> StreamingDestination::CreateNewOutgoingStream (std::shared_ptr<const i2p::data::LeaseSet> remote, int port) std::shared_ptr<Stream> StreamingDestination::CreateNewOutgoingStream (std::shared_ptr<const i2p::data::LeaseSet> remote, int port)
{ {
auto s = std::make_shared<Stream> (m_Owner->GetService (), *this, remote, port); auto s = std::make_shared<Stream> (m_Owner->GetService (), *this, remote, port);
@ -880,6 +894,39 @@ namespace stream
} }
} }
void StreamingDestination::SetAcceptor (const Acceptor& acceptor)
{
m_Owner->GetService ().post([acceptor, this](void)
{
m_Acceptor = acceptor;
for (auto it: m_PendingIncomingStreams)
if (it->GetStatus () == eStreamStatusOpen) // still open?
m_Acceptor (it);
m_PendingIncomingStreams.clear ();
m_PendingIncomingTimer.cancel ();
});
}
void StreamingDestination::ResetAcceptor ()
{
m_Owner->GetService ().post([this](void)
{
if (m_Acceptor) m_Acceptor (nullptr);
m_Acceptor = nullptr;
});
}
void StreamingDestination::HandlePendingIncomingTimer (const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
LogPrint (eLogInfo, "Pending incoming timeout expired");
for (auto it: m_PendingIncomingStreams)
it->Close ();
m_PendingIncomingStreams.clear ();
}
}
void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len) void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len)
{ {
// unzip it // unzip it

9
Streaming.h

@ -49,6 +49,8 @@ namespace stream
const int MAX_WINDOW_SIZE = 128; const int MAX_WINDOW_SIZE = 128;
const int INITIAL_RTT = 8000; // in milliseconds const int INITIAL_RTT = 8000; // in milliseconds
const int INITIAL_RTO = 9000; // in milliseconds const int INITIAL_RTO = 9000; // in milliseconds
const size_t MAX_PENDING_INCOMING_BACKLOG = 128;
const int PENDING_INCOMING_TIMEOUT = 10; // in seconds
struct Packet struct Packet
{ {
@ -201,8 +203,8 @@ namespace stream
std::shared_ptr<Stream> CreateNewOutgoingStream (std::shared_ptr<const i2p::data::LeaseSet> remote, int port = 0); std::shared_ptr<Stream> CreateNewOutgoingStream (std::shared_ptr<const i2p::data::LeaseSet> remote, int port = 0);
void DeleteStream (std::shared_ptr<Stream> stream); void DeleteStream (std::shared_ptr<Stream> stream);
void SetAcceptor (const Acceptor& acceptor) { m_Acceptor = acceptor; }; void SetAcceptor (const Acceptor& acceptor);
void ResetAcceptor () { if (m_Acceptor) m_Acceptor (nullptr); m_Acceptor = nullptr; }; void ResetAcceptor ();
bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; bool IsAcceptorSet () const { return m_Acceptor != nullptr; };
std::shared_ptr<i2p::client::ClientDestination> GetOwner () const { return m_Owner; }; std::shared_ptr<i2p::client::ClientDestination> GetOwner () const { return m_Owner; };
uint16_t GetLocalPort () const { return m_LocalPort; }; uint16_t GetLocalPort () const { return m_LocalPort; };
@ -213,6 +215,7 @@ namespace stream
void HandleNextPacket (Packet * packet); void HandleNextPacket (Packet * packet);
std::shared_ptr<Stream> CreateNewIncomingStream (); std::shared_ptr<Stream> CreateNewIncomingStream ();
void HandlePendingIncomingTimer (const boost::system::error_code& ecode);
private: private:
@ -221,6 +224,8 @@ namespace stream
std::mutex m_StreamsMutex; std::mutex m_StreamsMutex;
std::map<uint32_t, std::shared_ptr<Stream> > m_Streams; std::map<uint32_t, std::shared_ptr<Stream> > m_Streams;
Acceptor m_Acceptor; Acceptor m_Acceptor;
std::list<std::shared_ptr<Stream> > m_PendingIncomingStreams;
boost::asio::deadline_timer m_PendingIncomingTimer;
public: public:

Loading…
Cancel
Save