From 27426a60237223d187003db711a810e3bc6d9b3f Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 26 Jan 2014 18:22:30 -0500 Subject: [PATCH] save out-of-seq messages and process them later --- Streaming.cpp | 33 ++++++++++++++++++++++++++++++--- Streaming.h | 15 +++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index 99d51618..1a848c28 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -1,4 +1,3 @@ -#include "I2PEndian.h" #include #include #include @@ -26,6 +25,8 @@ namespace stream { while (auto packet = m_ReceiveQueue.Get ()) delete packet; + for (auto it: m_SavedPackets) + delete it; } void Stream::HandleNextPacket (Packet * packet) @@ -80,6 +81,26 @@ namespace stream m_LastReceivedSequenceNumber = receivedSeqn; SendQuickAck (); + + // we should also try stored messages if any + for (auto it = m_SavedPackets.begin (); it != m_SavedPackets.end ();) + { + if ((*it)->GetReceivedSeqn () == m_LastReceivedSequenceNumber + 1) + { + Packet * packet = *it; + m_SavedPackets.erase (it++); + + LogPrint ("Process saved packet seqn=", packet->GetReceivedSeqn ()); + if (packet->GetLength () > 0) + m_ReceiveQueue.Put (packet); + else + delete packet; + m_LastReceivedSequenceNumber++; + SendQuickAck (); + } + else + break; + } } else { @@ -90,13 +111,14 @@ namespace stream m_OutboundTunnel = i2p::tunnel::tunnels.GetNextOutboundTunnel (); // pick another tunnel if (m_OutboundTunnel) SendQuickAck (); // resend ack for previous message again + delete packet; // packet dropped } else { LogPrint ("Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); - // actually do nothing. just wait for missing message again + // save message and wait for missing message again + SavePacket (packet); } - delete packet; // packet dropped } if (flags & PACKET_FLAG_CLOSE) @@ -107,6 +129,11 @@ namespace stream } } + void Stream::SavePacket (Packet * packet) + { + m_SavedPackets.insert (packet); + } + size_t Stream::Send (uint8_t * buf, size_t len, int timeout) { if (!m_IsOpen) diff --git a/Streaming.h b/Streaming.h index cc772190..248d8ee2 100644 --- a/Streaming.h +++ b/Streaming.h @@ -3,7 +3,9 @@ #include #include +#include #include +#include "I2PEndian.h" #include "Queue.h" #include "Identity.h" #include "LeaseSet.h" @@ -37,6 +39,16 @@ namespace stream Packet (): len (0), offset (0) {}; uint8_t * GetBuffer () { return buf + offset; }; size_t GetLength () const { return len - offset; }; + + uint32_t GetReceivedSeqn () const { return be32toh (*(uint32_t *)(buf + 8)); }; + }; + + struct PacketCmp + { + bool operator() (const Packet * p1, const Packet * p2) const + { + return p1->GetReceivedSeqn () < p2->GetReceivedSeqn (); + }; }; class StreamingDestination; @@ -61,6 +73,8 @@ namespace stream void ConnectAndSend (uint8_t * buf, size_t len); void SendQuickAck (); + + void SavePacket (Packet * packet); private: @@ -69,6 +83,7 @@ namespace stream StreamingDestination * m_LocalDestination; const i2p::data::LeaseSet * m_RemoteLeaseSet; i2p::util::Queue m_ReceiveQueue; + std::set m_SavedPackets; i2p::tunnel::OutboundTunnel * m_OutboundTunnel; };