diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 98ab2f1b..ebc6d675 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -729,9 +729,13 @@ namespace transport break; } case eSSU2BlkFirstFragment: + LogPrint (eLogDebug, "SSU2: First fragment"); + HandleFirstFragment (buf + offset, size); isData = true; break; case eSSU2BlkFollowOnFragment: + LogPrint (eLogDebug, "SSU2: Follow-on fragment"); + HandleFollowOnFragment (buf + offset, size); isData = true; break; case eSSU2BlkTermination: @@ -824,6 +828,103 @@ namespace transport it1--; m_SentPackets.erase (it, it1); } + + void SSU2Session::HandleFirstFragment (const uint8_t * buf, size_t len) + { + uint32_t msgID; memcpy (&msgID, buf + 1, 4); + auto msg = NewI2NPMessage (); + // same format as I2NP message block + msg->len = msg->offset + len + 7; + memcpy (msg->GetNTCP2Header (), buf, len); + msg->FromNTCP2 (); + std::shared_ptr m; + bool found = false; + auto it = m_IncompleteMessages.find (msgID); + if (it != m_IncompleteMessages.end ()) + { + found = true; + m = it->second; + } + else + { + m = std::make_shared(); + m_IncompleteMessages.emplace (msgID, m); + } + m->msg = msg; + m->nextFragmentNum = 1; + m->lastFragmentInsertTime = i2p::util::GetSecondsSinceEpoch (); + if (found && ConcatOutOfSequenceFragments (m)) + { + // we have all follow-on fragments already + m_Handler.PutNextMessage (std::move (m->msg)); + m_IncompleteMessages.erase (it); + } + } + + void SSU2Session::HandleFollowOnFragment (const uint8_t * buf, size_t len) + { + if (len < 5) return; + uint8_t fragmentNum = buf[0] >> 1; + bool isLast = buf[0] & 0x01; + uint32_t msgID; memcpy (&msgID, buf + 1, 4); + auto it = m_IncompleteMessages.find (msgID); + if (it != m_IncompleteMessages.end ()) + { + if (it->second->nextFragmentNum == fragmentNum && it->second->msg) + { + // in sequence + it->second->msg->Concat (buf + 5, len - 5); + if (isLast) + { + m_Handler.PutNextMessage (std::move (it->second->msg)); + m_IncompleteMessages.erase (it); + } + else + { + it->second->nextFragmentNum++; + if (ConcatOutOfSequenceFragments (it->second)) + { + m_Handler.PutNextMessage (std::move (it->second->msg)); + m_IncompleteMessages.erase (it); + } + else + it->second->lastFragmentInsertTime = i2p::util::GetSecondsSinceEpoch (); + } + return; + } + } + else + { + // follow-on fragment before first fragment + auto msg = std::make_shared (); + msg->nextFragmentNum = 0; + it = m_IncompleteMessages.emplace (msgID, msg).first; + } + // insert out of sequence fragment + auto fragment = std::make_shared (); + memcpy (fragment->buf, buf + 5, len -5); + fragment->len = len - 5; + fragment->isLast = isLast; + it->second->outOfSequenceFragments.emplace (fragmentNum, fragment); + it->second->lastFragmentInsertTime = i2p::util::GetSecondsSinceEpoch (); + } + + bool SSU2Session::ConcatOutOfSequenceFragments (std::shared_ptr m) + { + if (!m) return false; + bool isLast = false; + for (auto it = m->outOfSequenceFragments.begin (); it != m->outOfSequenceFragments.end ();) + if (it->first == m->nextFragmentNum) + { + m->msg->Concat (it->second->buf, it->second->len); + isLast = it->second->isLast; + it = m->outOfSequenceFragments.erase (it); + m->nextFragmentNum++; + } + else + break; + return isLast; + } bool SSU2Session::ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep) { @@ -884,7 +985,7 @@ namespace transport htobe32buf (buf + 3, ackThrough); // Ack Through uint8_t acnt = 0; if (ackThrough) - acnt = std::min ((int)ackThrough - 1, 255); + acnt = std::min ((int)ackThrough, 255); buf[7] = acnt; // acnt // TODO: ranges return 8; @@ -985,6 +1086,20 @@ namespace transport payloadSize += CreatePaddingBlock (payload + payloadSize, 32 - payloadSize); SendData (payload, payloadSize); } + + void SSU2Session::CleanUp (uint64_t ts) + { + for (auto it = m_IncompleteMessages.begin (); it != m_IncompleteMessages.end ();) + { + if (ts > it->second->lastFragmentInsertTime + SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT) + { + LogPrint (eLogWarning, "SSU2: message ", it->first, " was not completed in ", SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT, " seconds, deleted"); + it = m_IncompleteMessages.erase (it); + } + else + ++it; + } + } SSU2Server::SSU2Server (): RunnableServiceWithWork ("SSU2"), m_Socket (GetService ()), m_SocketV6 (GetService ()), @@ -1221,7 +1336,10 @@ namespace transport it = m_Sessions.erase (it); } else + { + it->second->CleanUp (ts); it++; + } } for (auto it = m_IncomingTokens.begin (); it != m_IncomingTokens.end (); ) diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index f7f73ec2..abf6a18c 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -32,6 +32,7 @@ namespace transport const size_t SSU2_MAX_PAYLOAD_SIZE = SSU2_MTU - 32; const int SSU2_RESEND_INTERVAL = 3; // in seconds const int SSU2_MAX_NUM_RESENDS = 5; + const int SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds enum SSU2MessageType { @@ -77,6 +78,20 @@ namespace transport eSSU2SessionStateFailed }; + struct SSU2IncompleteMessage + { + struct Fragment + { + uint8_t buf[SSU2_MTU]; + size_t len; + bool isLast; + }; + + std::shared_ptr msg; + int nextFragmentNum; + uint32_t lastFragmentInsertTime; // in seconds + std::map > outOfSequenceFragments; + }; // RouterInfo flags const uint8_t SSU2_ROUTER_INFO_FLAG_REQUEST_FLOOD = 0x01; @@ -120,6 +135,7 @@ namespace transport void Connect (); void Terminate (); void TerminateByTimeout (); + void CleanUp (uint64_t ts); void Done () override; void SendI2NPMessages (const std::vector >& msgs) override; void Resend (uint64_t ts); @@ -158,7 +174,10 @@ namespace transport std::shared_ptr ExtractRouterInfo (const uint8_t * buf, size_t size); void CreateNonce (uint64_t seqn, uint8_t * nonce); bool UpdateReceivePacketNum (uint32_t packetNum); // for Ack, returns false if duplicate - + void HandleFirstFragment (const uint8_t * buf, size_t len); + void HandleFollowOnFragment (const uint8_t * buf, size_t len); + bool ConcatOutOfSequenceFragments (std::shared_ptr m); // true if message complete + size_t CreateAddressBlock (const boost::asio::ip::udp::endpoint& ep, uint8_t * buf, size_t len); size_t CreateAckBlock (uint8_t * buf, size_t len); size_t CreatePaddingBlock (uint8_t * buf, size_t len, size_t minSize = 0); @@ -177,6 +196,7 @@ namespace transport uint32_t m_SendPacketNum, m_ReceivePacketNum; std::set m_OutOfSequencePackets; // packet nums > receive packet num std::map > m_SentPackets; // packetNum -> packet + std::map > m_IncompleteMessages; // I2NP std::list > m_SendQueue; i2p::I2NPMessagesHandler m_Handler; };