diff --git a/Streaming.cpp b/Streaming.cpp index 86f2a55c..f73c5125 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -34,55 +34,22 @@ namespace stream if (!m_SendStreamID) m_SendStreamID = packet->GetReceiveStreamID (); - // process flags - uint16_t flags = packet->GetFlags (); - const uint8_t * optionData = packet->GetOptionData (); - if (flags & PACKET_FLAG_SYNCHRONIZE) - { - LogPrint ("Synchronize"); - } - - if (flags & PACKET_FLAG_SIGNATURE_INCLUDED) - { - LogPrint ("Signature"); - optionData += 40; - } - - if (flags & PACKET_FLAG_FROM_INCLUDED) - { - LogPrint ("From identity"); - optionData += sizeof (i2p::data::Identity); - } - uint32_t receivedSeqn = packet->GetSeqn (); - LogPrint ("seqn=", receivedSeqn, ", flags=", flags); + LogPrint ("Received seqn=", receivedSeqn); if (!receivedSeqn || receivedSeqn == m_LastReceivedSequenceNumber + 1) { - // we have received next message - packet->offset = packet->GetPayload () - packet->buf; - if (packet->GetLength () > 0) - m_ReceiveQueue.Put (packet); - else - delete packet; - - m_LastReceivedSequenceNumber = receivedSeqn; - SendQuickAck (); + // we have received next in sequence message + ProcessPacket (packet); // we should also try stored messages if any for (auto it = m_SavedPackets.begin (); it != m_SavedPackets.end ();) { if ((*it)->GetSeqn () == m_LastReceivedSequenceNumber + 1) { - Packet * packet = *it; + Packet * savedPacket = *it; m_SavedPackets.erase (it++); - LogPrint ("Process saved packet seqn=", packet->GetSeqn ()); - if (packet->GetLength () > 0) - m_ReceiveQueue.Put (packet); - else - delete packet; - m_LastReceivedSequenceNumber++; - SendQuickAck (); + ProcessPacket (savedPacket); } else break; @@ -106,7 +73,47 @@ namespace stream SavePacket (packet); } } - + } + + void Stream::SavePacket (Packet * packet) + { + m_SavedPackets.insert (packet); + } + + void Stream::ProcessPacket (Packet * packet) + { + // process flags + uint32_t receivedSeqn = packet->GetSeqn (); + uint16_t flags = packet->GetFlags (); + LogPrint ("Process seqn=", receivedSeqn, ", flags=", flags); + + const uint8_t * optionData = packet->GetOptionData (); + if (flags & PACKET_FLAG_SYNCHRONIZE) + { + LogPrint ("Synchronize"); + } + + if (flags & PACKET_FLAG_SIGNATURE_INCLUDED) + { + LogPrint ("Signature"); + optionData += 40; + } + + if (flags & PACKET_FLAG_FROM_INCLUDED) + { + LogPrint ("From identity"); + optionData += sizeof (i2p::data::Identity); + } + + packet->offset = packet->GetPayload () - packet->buf; + if (packet->GetLength () > 0) + m_ReceiveQueue.Put (packet); + else + delete packet; + + m_LastReceivedSequenceNumber = receivedSeqn; + SendQuickAck (); + if (flags & PACKET_FLAG_CLOSE) { LogPrint ("Closed"); @@ -114,11 +121,6 @@ namespace stream m_ReceiveQueue.WakeUp (); } } - - void Stream::SavePacket (Packet * packet) - { - m_SavedPackets.insert (packet); - } size_t Stream::Send (uint8_t * buf, size_t len, int timeout) { diff --git a/Streaming.h b/Streaming.h index 5939d058..18d09b5d 100644 --- a/Streaming.h +++ b/Streaming.h @@ -84,6 +84,7 @@ namespace stream void SendQuickAck (); void SavePacket (Packet * packet); + void ProcessPacket (Packet * packet); private: