diff --git a/Streaming.cpp b/Streaming.cpp index ea90c1f0..8e0b223d 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -787,6 +787,12 @@ namespace stream StreamingDestination::~StreamingDestination () { + for (auto it: m_SavedPackets) + { + for (auto it1: it.second) delete it1; + it.second.clear (); + } + m_SavedPackets.clear (); } void StreamingDestination::Start () @@ -822,7 +828,20 @@ namespace stream if (packet->IsSYN () && !packet->GetSeqn ()) // new incoming stream { auto incomingStream = CreateNewIncomingStream (); - incomingStream->HandleNextPacket (packet); + uint32_t receiveStreamID = packet->GetReceiveStreamID (); + incomingStream->HandleNextPacket (packet); // SYN + // handle saved packets if any + { + auto it = m_SavedPackets.find (receiveStreamID); + if (it != m_SavedPackets.end ()) + { + LogPrint (eLogDebug, "Streaming: Processing ", it->second.size (), " saved packets for receiveStreamID=", receiveStreamID); + for (auto it1: it->second) + incomingStream->HandleNextPacket (it1); + m_SavedPackets.erase (it); + } + } + // accept if (m_Acceptor != nullptr) m_Acceptor (incomingStream); else @@ -834,7 +853,7 @@ namespace stream m_PendingIncomingTimer.cancel (); m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT)); m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer, - this, std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); LogPrint (eLogDebug, "Streaming: Pending incoming stream added"); } else @@ -854,9 +873,30 @@ namespace stream it.second->HandleNextPacket (packet); return; } - // TODO: should queue it up - LogPrint (eLogError, "Streaming: Unknown stream receiveStreamID=", receiveStreamID); - delete packet; + // save follow on packet + auto it = m_SavedPackets.find (receiveStreamID); + if (it != m_SavedPackets.end ()) + it->second.push_back (packet); + else + { + m_SavedPackets.emplace (receiveStreamID, std::list{ packet }); + auto timer = std::make_shared (m_Owner->GetService ()); + timer->expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT)); + auto s = shared_from_this (); + timer->async_wait ([s,timer,receiveStreamID](const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + auto it = s->m_SavedPackets.find (receiveStreamID); + if (it != s->m_SavedPackets.end ()) + { + for (auto it1: it->second) delete it1; + it->second.clear (); + s->m_SavedPackets.erase (it); + } + } + }); + } } } } diff --git a/Streaming.h b/Streaming.h index 519bda31..0b240c0b 100644 --- a/Streaming.h +++ b/Streaming.h @@ -189,7 +189,7 @@ namespace stream SendHandler m_SendHandler; }; - class StreamingDestination + class StreamingDestination: public std::enable_shared_from_this { public: @@ -222,10 +222,11 @@ namespace stream std::shared_ptr m_Owner; uint16_t m_LocalPort; std::mutex m_StreamsMutex; - std::map > m_Streams; + std::map > m_Streams; // sendStreamID->stream Acceptor m_Acceptor; std::list > m_PendingIncomingStreams; boost::asio::deadline_timer m_PendingIncomingTimer; + std::map > m_SavedPackets; // receiveStreamID->packets, arrived before SYN public: