diff --git a/Streaming.cpp b/Streaming.cpp index 1a848c28..86f2a55c 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -31,26 +31,12 @@ namespace stream void Stream::HandleNextPacket (Packet * packet) { - const uint8_t * buf = packet->buf; - buf += 4; // sendStreamID if (!m_SendStreamID) - m_SendStreamID = be32toh (*(uint32_t *)buf); - buf += 4; // receiveStreamID - uint32_t receivedSeqn = be32toh (*(uint32_t *)buf); - buf += 4; // sequenceNum - buf += 4; // ackThrough - int nackCount = buf[0]; - buf++; // NACK count - buf += 4*nackCount; // NACKs - buf++; // resendDelay - uint16_t flags = be16toh (*(uint16_t *)buf); - buf += 2; // flags - uint16_t optionalSize = be16toh (*(uint16_t *)buf); - buf += 2; // optional size - const uint8_t * optionalData = buf; - buf += optionalSize; - + m_SendStreamID = packet->GetReceiveStreamID (); + // process flags + uint16_t flags = packet->GetFlags (); + const uint8_t * optionData = packet->GetOptionData (); if (flags & PACKET_FLAG_SYNCHRONIZE) { LogPrint ("Synchronize"); @@ -59,21 +45,21 @@ namespace stream if (flags & PACKET_FLAG_SIGNATURE_INCLUDED) { LogPrint ("Signature"); - optionalData += 40; + optionData += 40; } if (flags & PACKET_FLAG_FROM_INCLUDED) { LogPrint ("From identity"); - optionalData += sizeof (i2p::data::Identity); + optionData += sizeof (i2p::data::Identity); } - // we have reached payload section + uint32_t receivedSeqn = packet->GetSeqn (); LogPrint ("seqn=", receivedSeqn, ", flags=", flags); if (!receivedSeqn || receivedSeqn == m_LastReceivedSequenceNumber + 1) { // we have received next message - packet->offset = buf - packet->buf; + packet->offset = packet->GetPayload () - packet->buf; if (packet->GetLength () > 0) m_ReceiveQueue.Put (packet); else @@ -85,12 +71,12 @@ namespace stream // we should also try stored messages if any for (auto it = m_SavedPackets.begin (); it != m_SavedPackets.end ();) { - if ((*it)->GetReceivedSeqn () == m_LastReceivedSequenceNumber + 1) + if ((*it)->GetSeqn () == m_LastReceivedSequenceNumber + 1) { Packet * packet = *it; m_SavedPackets.erase (it++); - LogPrint ("Process saved packet seqn=", packet->GetReceivedSeqn ()); + LogPrint ("Process saved packet seqn=", packet->GetSeqn ()); if (packet->GetLength () > 0) m_ReceiveQueue.Put (packet); else diff --git a/Streaming.h b/Streaming.h index 248d8ee2..5939d058 100644 --- a/Streaming.h +++ b/Streaming.h @@ -40,14 +40,23 @@ namespace stream uint8_t * GetBuffer () { return buf + offset; }; size_t GetLength () const { return len - offset; }; - uint32_t GetReceivedSeqn () const { return be32toh (*(uint32_t *)(buf + 8)); }; + uint32_t GetSendStreamID () const { return be32toh (*(uint32_t *)buf); }; + uint32_t GetReceiveStreamID () const { return be32toh (*(uint32_t *)(buf + 4)); }; + uint32_t GetSeqn () const { return be32toh (*(uint32_t *)(buf + 8)); }; + uint32_t GetAckThrough () const { return be32toh (*(uint32_t *)(buf + 12)); }; + uint8_t GetNACKCount () const { return buf[16]; }; + const uint8_t * GetOption () const { return buf + 17 + GetNACKCount ()*4 + 3; }; // 3 = resendDelay + flags + uint16_t GetFlags () const { return be16toh (*(uint16_t *)(GetOption () - 2)); }; + uint16_t GetOptionSize () const { return be16toh (*(uint16_t *)GetOption ()); }; + const uint8_t * GetOptionData () const { return GetOption () + 2; }; + const uint8_t * GetPayload () const { return GetOptionData () + GetOptionSize (); }; }; struct PacketCmp { bool operator() (const Packet * p1, const Packet * p2) const { - return p1->GetReceivedSeqn () < p2->GetReceivedSeqn (); + return p1->GetSeqn () < p2->GetSeqn (); }; };