From 3d19e920599fb15145e1281fdc300f2a6e174553 Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 8 Feb 2016 14:42:20 -0500 Subject: [PATCH 1/3] queue up out of sequence packets --- Streaming.cpp | 48 ++++++++++++++++++++++++++++++++++++++++++++++-- Streaming.h | 5 +++-- 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index ea90c1f0..7c27c5ce 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -787,6 +787,12 @@ namespace stream StreamingDestination::~StreamingDestination () { + for (auto it: m_SavedPackets) + { + for (auto it1: it.second) delete it1; + it.second.clear (); + } + m_SavedPackets.clear (); } void StreamingDestination::Start () @@ -822,7 +828,20 @@ namespace stream if (packet->IsSYN () && !packet->GetSeqn ()) // new incoming stream { auto incomingStream = CreateNewIncomingStream (); - incomingStream->HandleNextPacket (packet); + uint32_t receiveStreamID = packet->GetReceiveStreamID (); + incomingStream->HandleNextPacket (packet); // SYN + // handle saved packets if any + { + auto it = m_SavedPackets.find (receiveStreamID); + if (it != m_SavedPackets.end ()) + { + LogPrint (eLogDebug, "Streaming: Processing ", it->second.size (), " saved packets for receiveStreamID=", receiveStreamID); + for (auto it1: it->second) + incomingStream->HandleNextPacket (it1); + m_SavedPackets.erase (it); + } + } + // accept if (m_Acceptor != nullptr) m_Acceptor (incomingStream); else @@ -834,7 +853,7 @@ namespace stream m_PendingIncomingTimer.cancel (); m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT)); m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer, - this, std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); LogPrint (eLogDebug, "Streaming: Pending incoming stream added"); } else @@ -854,6 +873,31 @@ namespace stream it.second->HandleNextPacket (packet); return; } + // save follow on packet + auto it = m_SavedPackets.find (receiveStreamID); + if (it != m_SavedPackets.end ()) + it->second.push_back (packet); + else + { + m_SavedPackets.emplace (receiveStreamID, std::list{ packet }); + auto timer = std::make_shared (m_Owner->GetService ()); + timer->expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT)); + auto s = shared_from_this (); + timer->async_wait ([s,timer,receiveStreamID](const boost::system::error_code& ecode) + { + if (ecode == boost::asio::error::operation_aborted) + { + auto it = s->m_SavedPackets.find (receiveStreamID); + if (it != s->m_SavedPackets.end ()) + { + for (auto it1: it->second) delete it1; + it->second.clear (); + s->m_SavedPackets.erase (it); + } + } + }); + } + // TODO: should queue it up LogPrint (eLogError, "Streaming: Unknown stream receiveStreamID=", receiveStreamID); delete packet; diff --git a/Streaming.h b/Streaming.h index 519bda31..0b240c0b 100644 --- a/Streaming.h +++ b/Streaming.h @@ -189,7 +189,7 @@ namespace stream SendHandler m_SendHandler; }; - class StreamingDestination + class StreamingDestination: public std::enable_shared_from_this { public: @@ -222,10 +222,11 @@ namespace stream std::shared_ptr m_Owner; uint16_t m_LocalPort; std::mutex m_StreamsMutex; - std::map > m_Streams; + std::map > m_Streams; // sendStreamID->stream Acceptor m_Acceptor; std::list > m_PendingIncomingStreams; boost::asio::deadline_timer m_PendingIncomingTimer; + std::map > m_SavedPackets; // receiveStreamID->packets, arrived before SYN public: From 74f03202b787a4405ce76f994c9afa02711b7376 Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 8 Feb 2016 15:02:17 -0500 Subject: [PATCH 2/3] queue up out of sequence packets --- Streaming.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index 7c27c5ce..ee5576c7 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -897,10 +897,6 @@ namespace stream } }); } - - // TODO: should queue it up - LogPrint (eLogError, "Streaming: Unknown stream receiveStreamID=", receiveStreamID); - delete packet; } } } From e2e101e4fb74d6c5ed54e06cedce5ca578b9180b Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 8 Feb 2016 15:47:39 -0500 Subject: [PATCH 3/3] queue up out of sequence packets --- Streaming.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Streaming.cpp b/Streaming.cpp index ee5576c7..8e0b223d 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -885,7 +885,7 @@ namespace stream auto s = shared_from_this (); timer->async_wait ([s,timer,receiveStreamID](const boost::system::error_code& ecode) { - if (ecode == boost::asio::error::operation_aborted) + if (ecode != boost::asio::error::operation_aborted) { auto it = s->m_SavedPackets.find (receiveStreamID); if (it != s->m_SavedPackets.end ())