From 5930e2d221ef3eb09e848ed7a92fb2fa3b692b8b Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 14 Dec 2015 22:23:28 -0500 Subject: [PATCH] keep pending incoming streams if acceptor is not set --- Streaming.cpp | 55 +++++++++++++++++++++++++++++++++++++++++++++++---- Streaming.h | 9 +++++++-- 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index 0b4e514d..deed2adb 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -787,7 +787,7 @@ namespace stream } StreamingDestination::StreamingDestination (std::shared_ptr owner, uint16_t localPort): - m_Owner (owner), m_LocalPort (localPort) + m_Owner (owner), m_LocalPort (localPort), m_PendingIncomingTimer (m_Owner->GetService ()) { } @@ -802,6 +802,7 @@ namespace stream void StreamingDestination::Stop () { ResetAcceptor (); + m_PendingIncomingTimer.cancel (); { std::unique_lock l(m_StreamsMutex); m_Streams.clear (); @@ -832,8 +833,21 @@ namespace stream m_Acceptor (incomingStream); else { - LogPrint ("Acceptor for incoming stream is not set"); - DeleteStream (incomingStream); + LogPrint (eLogInfo, "Acceptor for incoming stream is not set"); + if (m_PendingIncomingStreams.size () < MAX_PENDING_INCOMING_BACKLOG) + { + m_PendingIncomingStreams.push_back (incomingStream); + m_PendingIncomingTimer.cancel (); + m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT)); + m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer, + this, std::placeholders::_1)); + LogPrint (eLogInfo, "Pending incoming stream added"); + } + else + { + LogPrint (eLogError, "Pending incoming streams backlog exceeds ", MAX_PENDING_INCOMING_BACKLOG); + incomingStream->Close (); + } } } else // follow on packet without SYN @@ -852,7 +866,7 @@ namespace stream } } } - + std::shared_ptr StreamingDestination::CreateNewOutgoingStream (std::shared_ptr remote, int port) { auto s = std::make_shared (m_Owner->GetService (), *this, remote, port); @@ -880,6 +894,39 @@ namespace stream } } + void StreamingDestination::SetAcceptor (const Acceptor& acceptor) + { + m_Owner->GetService ().post([acceptor, this](void) + { + m_Acceptor = acceptor; + for (auto it: m_PendingIncomingStreams) + if (it->GetStatus () == eStreamStatusOpen) // still open? + m_Acceptor (it); + m_PendingIncomingStreams.clear (); + m_PendingIncomingTimer.cancel (); + }); + } + + void StreamingDestination::ResetAcceptor () + { + m_Owner->GetService ().post([this](void) + { + if (m_Acceptor) m_Acceptor (nullptr); + m_Acceptor = nullptr; + }); + } + + void StreamingDestination::HandlePendingIncomingTimer (const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + LogPrint (eLogInfo, "Pending incoming timeout expired"); + for (auto it: m_PendingIncomingStreams) + it->Close (); + m_PendingIncomingStreams.clear (); + } + } + void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len) { // unzip it diff --git a/Streaming.h b/Streaming.h index 580ddf7c..519bda31 100644 --- a/Streaming.h +++ b/Streaming.h @@ -49,6 +49,8 @@ namespace stream const int MAX_WINDOW_SIZE = 128; const int INITIAL_RTT = 8000; // in milliseconds const int INITIAL_RTO = 9000; // in milliseconds + const size_t MAX_PENDING_INCOMING_BACKLOG = 128; + const int PENDING_INCOMING_TIMEOUT = 10; // in seconds struct Packet { @@ -201,8 +203,8 @@ namespace stream std::shared_ptr CreateNewOutgoingStream (std::shared_ptr remote, int port = 0); void DeleteStream (std::shared_ptr stream); - void SetAcceptor (const Acceptor& acceptor) { m_Acceptor = acceptor; }; - void ResetAcceptor () { if (m_Acceptor) m_Acceptor (nullptr); m_Acceptor = nullptr; }; + void SetAcceptor (const Acceptor& acceptor); + void ResetAcceptor (); bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; std::shared_ptr GetOwner () const { return m_Owner; }; uint16_t GetLocalPort () const { return m_LocalPort; }; @@ -213,6 +215,7 @@ namespace stream void HandleNextPacket (Packet * packet); std::shared_ptr CreateNewIncomingStream (); + void HandlePendingIncomingTimer (const boost::system::error_code& ecode); private: @@ -221,6 +224,8 @@ namespace stream std::mutex m_StreamsMutex; std::map > m_Streams; Acceptor m_Acceptor; + std::list > m_PendingIncomingStreams; + boost::asio::deadline_timer m_PendingIncomingTimer; public: