diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 1c6aeb3b..2d0bf19a 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -580,18 +580,16 @@ namespace transport LogPrint (eLogWarning, "SSU2: Data AEAD verification failed "); return; } - HandlePayload (payload, payloadSize); m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); m_NumReceivedBytes += len; - if (packetNum > m_ReceivePacketNum) - { - m_ReceivePacketNum = packetNum; + UpdateReceivePacketNum (packetNum); + if (HandlePayload (payload, payloadSize)) SendQuickAck (); // TODO: don't send too requently - } } - void SSU2Session::HandlePayload (const uint8_t * buf, size_t len) + bool SSU2Session::HandlePayload (const uint8_t * buf, size_t len) { + bool isData = false; size_t offset = 0; while (offset < len) { @@ -630,11 +628,14 @@ namespace transport memcpy (nextMsg->GetNTCP2Header (), buf + offset, size); nextMsg->FromNTCP2 (); // SSU2 has the same format as NTCP2 m_Handler.PutNextMessage (std::move (nextMsg)); + isData = true; break; } case eSSU2BlkFirstFragment: + isData = true; break; case eSSU2BlkFollowOnFragment: + isData = true; break; case eSSU2BlkTermination: LogPrint (eLogDebug, "SSU2: Termination"); @@ -688,6 +689,7 @@ namespace transport offset += size; } m_Handler.Flush (); + return isData; } bool SSU2Session::ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep) @@ -744,9 +746,14 @@ namespace transport { if (len < 8) return 0; buf[0] = eSSU2BlkAck; + uint32_t ackThrough = m_OutOfSequencePackets.empty () ? m_ReceivePacketNum : *m_OutOfSequencePackets.rbegin (); htobe16buf (buf + 1, 5); - htobe32buf (buf + 3, m_ReceivePacketNum); // Ack Through - buf[7] = 0; // acnt + htobe32buf (buf + 3, ackThrough); // Ack Through + uint8_t acnt = 0; + if (ackThrough) + acnt = std::min ((int)ackThrough - 1, 255); + buf[7] = acnt; // acnt + // TODO: ranges return 8; } @@ -793,6 +800,27 @@ namespace transport htole64buf (nonce + 4, seqn); } + void SSU2Session::UpdateReceivePacketNum (uint32_t packetNum) + { + if (packetNum <= m_ReceivePacketNum) return; // duplicate + if (packetNum == m_ReceivePacketNum + 1) + { + for (auto it = m_OutOfSequencePackets.begin (); it != m_OutOfSequencePackets.end ();) + { + if (*it == packetNum + 1) + { + packetNum++; + it = m_OutOfSequencePackets.erase (it); + } + else + break; + } + m_ReceivePacketNum = packetNum; + } + else + m_OutOfSequencePackets.insert (packetNum); + } + void SSU2Session::SendQuickAck () { uint8_t payload[SSU2_MTU]; diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index df5f8be9..3da75e85 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -11,6 +11,7 @@ #include #include +#include #include #include #include "Crypto.h" @@ -134,10 +135,11 @@ namespace transport void SendQuickAck (); void SendTermination (); - void HandlePayload (const uint8_t * buf, size_t len); + bool HandlePayload (const uint8_t * buf, size_t len); // returns true is contains data bool ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep); std::shared_ptr ExtractRouterInfo (const uint8_t * buf, size_t size); void CreateNonce (uint64_t seqn, uint8_t * nonce); + void UpdateReceivePacketNum (uint32_t packetNum); // for Ack size_t CreateAddressBlock (const boost::asio::ip::udp::endpoint& ep, uint8_t * buf, size_t len); size_t CreateAckBlock (uint8_t * buf, size_t len); @@ -154,6 +156,7 @@ namespace transport SSU2SessionState m_State; uint8_t m_KeyDataSend[64], m_KeyDataReceive[64]; uint32_t m_SendPacketNum, m_ReceivePacketNum; + std::set m_OutOfSequencePackets; // packet nums > receive packet num i2p::I2NPMessagesHandler m_Handler; };