Browse Source

handle received packets in right order

pull/27/head
orignal 11 years ago
parent
commit
04cf2031ef
  1. 88
      Streaming.cpp
  2. 1
      Streaming.h

88
Streaming.cpp

@ -34,55 +34,22 @@ namespace stream
if (!m_SendStreamID) if (!m_SendStreamID)
m_SendStreamID = packet->GetReceiveStreamID (); m_SendStreamID = packet->GetReceiveStreamID ();
// process flags
uint16_t flags = packet->GetFlags ();
const uint8_t * optionData = packet->GetOptionData ();
if (flags & PACKET_FLAG_SYNCHRONIZE)
{
LogPrint ("Synchronize");
}
if (flags & PACKET_FLAG_SIGNATURE_INCLUDED)
{
LogPrint ("Signature");
optionData += 40;
}
if (flags & PACKET_FLAG_FROM_INCLUDED)
{
LogPrint ("From identity");
optionData += sizeof (i2p::data::Identity);
}
uint32_t receivedSeqn = packet->GetSeqn (); uint32_t receivedSeqn = packet->GetSeqn ();
LogPrint ("seqn=", receivedSeqn, ", flags=", flags); LogPrint ("Received seqn=", receivedSeqn);
if (!receivedSeqn || receivedSeqn == m_LastReceivedSequenceNumber + 1) if (!receivedSeqn || receivedSeqn == m_LastReceivedSequenceNumber + 1)
{ {
// we have received next message // we have received next in sequence message
packet->offset = packet->GetPayload () - packet->buf; ProcessPacket (packet);
if (packet->GetLength () > 0)
m_ReceiveQueue.Put (packet);
else
delete packet;
m_LastReceivedSequenceNumber = receivedSeqn;
SendQuickAck ();
// we should also try stored messages if any // we should also try stored messages if any
for (auto it = m_SavedPackets.begin (); it != m_SavedPackets.end ();) for (auto it = m_SavedPackets.begin (); it != m_SavedPackets.end ();)
{ {
if ((*it)->GetSeqn () == m_LastReceivedSequenceNumber + 1) if ((*it)->GetSeqn () == m_LastReceivedSequenceNumber + 1)
{ {
Packet * packet = *it; Packet * savedPacket = *it;
m_SavedPackets.erase (it++); m_SavedPackets.erase (it++);
LogPrint ("Process saved packet seqn=", packet->GetSeqn ()); ProcessPacket (savedPacket);
if (packet->GetLength () > 0)
m_ReceiveQueue.Put (packet);
else
delete packet;
m_LastReceivedSequenceNumber++;
SendQuickAck ();
} }
else else
break; break;
@ -106,6 +73,46 @@ namespace stream
SavePacket (packet); SavePacket (packet);
} }
} }
}
void Stream::SavePacket (Packet * packet)
{
m_SavedPackets.insert (packet);
}
void Stream::ProcessPacket (Packet * packet)
{
// process flags
uint32_t receivedSeqn = packet->GetSeqn ();
uint16_t flags = packet->GetFlags ();
LogPrint ("Process seqn=", receivedSeqn, ", flags=", flags);
const uint8_t * optionData = packet->GetOptionData ();
if (flags & PACKET_FLAG_SYNCHRONIZE)
{
LogPrint ("Synchronize");
}
if (flags & PACKET_FLAG_SIGNATURE_INCLUDED)
{
LogPrint ("Signature");
optionData += 40;
}
if (flags & PACKET_FLAG_FROM_INCLUDED)
{
LogPrint ("From identity");
optionData += sizeof (i2p::data::Identity);
}
packet->offset = packet->GetPayload () - packet->buf;
if (packet->GetLength () > 0)
m_ReceiveQueue.Put (packet);
else
delete packet;
m_LastReceivedSequenceNumber = receivedSeqn;
SendQuickAck ();
if (flags & PACKET_FLAG_CLOSE) if (flags & PACKET_FLAG_CLOSE)
{ {
@ -115,11 +122,6 @@ namespace stream
} }
} }
void Stream::SavePacket (Packet * packet)
{
m_SavedPackets.insert (packet);
}
size_t Stream::Send (uint8_t * buf, size_t len, int timeout) size_t Stream::Send (uint8_t * buf, size_t len, int timeout)
{ {
if (!m_IsOpen) if (!m_IsOpen)

1
Streaming.h

@ -84,6 +84,7 @@ namespace stream
void SendQuickAck (); void SendQuickAck ();
void SavePacket (Packet * packet); void SavePacket (Packet * packet);
void ProcessPacket (Packet * packet);
private: private:

Loading…
Cancel
Save