Browse Source

save out-of-seq messages and process them later

pull/14/head
orignal 11 years ago
parent
commit
27426a6023
  1. 33
      Streaming.cpp
  2. 15
      Streaming.h

33
Streaming.cpp

@ -1,4 +1,3 @@
#include "I2PEndian.h"
#include <string> #include <string>
#include <algorithm> #include <algorithm>
#include <cryptopp/gzip.h> #include <cryptopp/gzip.h>
@ -26,6 +25,8 @@ namespace stream
{ {
while (auto packet = m_ReceiveQueue.Get ()) while (auto packet = m_ReceiveQueue.Get ())
delete packet; delete packet;
for (auto it: m_SavedPackets)
delete it;
} }
void Stream::HandleNextPacket (Packet * packet) void Stream::HandleNextPacket (Packet * packet)
@ -80,6 +81,26 @@ namespace stream
m_LastReceivedSequenceNumber = receivedSeqn; m_LastReceivedSequenceNumber = receivedSeqn;
SendQuickAck (); SendQuickAck ();
// we should also try stored messages if any
for (auto it = m_SavedPackets.begin (); it != m_SavedPackets.end ();)
{
if ((*it)->GetReceivedSeqn () == m_LastReceivedSequenceNumber + 1)
{
Packet * packet = *it;
m_SavedPackets.erase (it++);
LogPrint ("Process saved packet seqn=", packet->GetReceivedSeqn ());
if (packet->GetLength () > 0)
m_ReceiveQueue.Put (packet);
else
delete packet;
m_LastReceivedSequenceNumber++;
SendQuickAck ();
}
else
break;
}
} }
else else
{ {
@ -90,13 +111,14 @@ namespace stream
m_OutboundTunnel = i2p::tunnel::tunnels.GetNextOutboundTunnel (); // pick another tunnel m_OutboundTunnel = i2p::tunnel::tunnels.GetNextOutboundTunnel (); // pick another tunnel
if (m_OutboundTunnel) if (m_OutboundTunnel)
SendQuickAck (); // resend ack for previous message again SendQuickAck (); // resend ack for previous message again
delete packet; // packet dropped
} }
else else
{ {
LogPrint ("Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); LogPrint ("Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1);
// actually do nothing. just wait for missing message again // save message and wait for missing message again
SavePacket (packet);
} }
delete packet; // packet dropped
} }
if (flags & PACKET_FLAG_CLOSE) if (flags & PACKET_FLAG_CLOSE)
@ -107,6 +129,11 @@ namespace stream
} }
} }
void Stream::SavePacket (Packet * packet)
{
m_SavedPackets.insert (packet);
}
size_t Stream::Send (uint8_t * buf, size_t len, int timeout) size_t Stream::Send (uint8_t * buf, size_t len, int timeout)
{ {
if (!m_IsOpen) if (!m_IsOpen)

15
Streaming.h

@ -3,7 +3,9 @@
#include <inttypes.h> #include <inttypes.h>
#include <map> #include <map>
#include <set>
#include <cryptopp/dsa.h> #include <cryptopp/dsa.h>
#include "I2PEndian.h"
#include "Queue.h" #include "Queue.h"
#include "Identity.h" #include "Identity.h"
#include "LeaseSet.h" #include "LeaseSet.h"
@ -37,6 +39,16 @@ namespace stream
Packet (): len (0), offset (0) {}; Packet (): len (0), offset (0) {};
uint8_t * GetBuffer () { return buf + offset; }; uint8_t * GetBuffer () { return buf + offset; };
size_t GetLength () const { return len - offset; }; size_t GetLength () const { return len - offset; };
uint32_t GetReceivedSeqn () const { return be32toh (*(uint32_t *)(buf + 8)); };
};
struct PacketCmp
{
bool operator() (const Packet * p1, const Packet * p2) const
{
return p1->GetReceivedSeqn () < p2->GetReceivedSeqn ();
};
}; };
class StreamingDestination; class StreamingDestination;
@ -61,6 +73,8 @@ namespace stream
void ConnectAndSend (uint8_t * buf, size_t len); void ConnectAndSend (uint8_t * buf, size_t len);
void SendQuickAck (); void SendQuickAck ();
void SavePacket (Packet * packet);
private: private:
@ -69,6 +83,7 @@ namespace stream
StreamingDestination * m_LocalDestination; StreamingDestination * m_LocalDestination;
const i2p::data::LeaseSet * m_RemoteLeaseSet; const i2p::data::LeaseSet * m_RemoteLeaseSet;
i2p::util::Queue<Packet> m_ReceiveQueue; i2p::util::Queue<Packet> m_ReceiveQueue;
std::set<Packet *, PacketCmp> m_SavedPackets;
i2p::tunnel::OutboundTunnel * m_OutboundTunnel; i2p::tunnel::OutboundTunnel * m_OutboundTunnel;
}; };

Loading…
Cancel
Save