From d0c5732e16d28f0be86c4026ef23e0b70518f8ea Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 26 Jun 2021 07:18:42 -0400 Subject: [PATCH] eliminate extra lookups for sequential fragments --- libi2pd/TunnelEndpoint.cpp | 141 +++++++++++++++++++++++++++---------- libi2pd/TunnelEndpoint.h | 12 ++-- 2 files changed, 112 insertions(+), 41 deletions(-) diff --git a/libi2pd/TunnelEndpoint.cpp b/libi2pd/TunnelEndpoint.cpp index eb70bdca..a07230cb 100644 --- a/libi2pd/TunnelEndpoint.cpp +++ b/libi2pd/TunnelEndpoint.cpp @@ -52,11 +52,13 @@ namespace tunnel bool isFollowOnFragment = flag & 0x80, isLastFragment = true; uint32_t msgID = 0; int fragmentNum = 0; - TunnelMessageBlockEx m; + TunnelMessageBlockEx& m = m_CurrentMessage; if (!isFollowOnFragment) { // first fragment - + if (m_CurrentMsgID) + AddIncompleteCurrentMessage (); // we have got a new message while previous is not complete + m.deliveryType = (TunnelDeliveryType)((flag >> 5) & 0x03); switch (m.deliveryType) { @@ -81,6 +83,7 @@ namespace tunnel // Message ID msgID = bufbe32toh (fragment); fragment += 4; + m_CurrentMsgID = msgID; isLastFragment = false; } } @@ -96,11 +99,20 @@ namespace tunnel uint16_t size = bufbe16toh (fragment); fragment += 2; + if (isFollowOnFragment && m_CurrentMsgID && m_CurrentMsgID == msgID && + m_CurrentMessage.nextFragmentNum == fragmentNum) + { + HandleCurrenMessageFollowOnFragment (fragment, size, isLastFragment); + fragment += size; + continue; + } + msg->offset = fragment - msg->buf; msg->len = msg->offset + size; if (msg->len > msg->maxLen) { LogPrint (eLogError, "TunnelMessage: fragment is too long ", (int)size); + m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr; return; } if (fragment + size < decrypted + TUNNEL_DATA_ENCRYPTED_SIZE) @@ -114,31 +126,31 @@ namespace tunnel else m.data = msg; - if (!isFollowOnFragment && isLastFragment) - HandleNextMessage (m); - else - { - if (msgID) // msgID is presented, assume message is fragmented + if (!isFollowOnFragment) + { + if (isLastFragment) + { + HandleNextMessage (m); + m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr; + } + else if (msgID) { - if (!isFollowOnFragment) // create new incomlete message - { - m.nextFragmentNum = 1; - m.receiveTime = i2p::util::GetMillisecondsSinceEpoch (); - auto ret = m_IncompleteMessages.insert (std::pair(msgID, m)); - if (ret.second) - HandleOutOfSequenceFragments (msgID, ret.first->second); - else - LogPrint (eLogError, "TunnelMessage: Incomplete message ", msgID, " already exists"); - } - else - { - m.nextFragmentNum = fragmentNum; - HandleFollowOnFragment (msgID, isLastFragment, m); - } - } + m_CurrentMessage.nextFragmentNum = 1; + m_CurrentMessage.receiveTime = i2p::util::GetMillisecondsSinceEpoch (); + HandleOutOfSequenceFragments (msgID, m_CurrentMessage); + } else + { LogPrint (eLogError, "TunnelMessage: Message is fragmented, but msgID is not presented"); - } + m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr; + } + } + else + { + m.nextFragmentNum = fragmentNum; + HandleFollowOnFragment (msgID, isLastFragment, m); + m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr; + } fragment += size; } @@ -157,17 +169,8 @@ namespace tunnel auto& msg = it->second; if (m.nextFragmentNum == msg.nextFragmentNum) { - if (msg.data->len + size < I2NP_MAX_MESSAGE_SIZE) // check if message is not too long + if (ConcatFollowOnFragment (msg, fragment, size)) { - if (msg.data->len + size > msg.data->maxLen) - { - // LogPrint (eLogWarning, "TunnelMessage: I2NP message size ", msg.data->maxLen, " is not enough"); - auto newMsg = NewI2NPMessage (); - *newMsg = *(msg.data); - msg.data = newMsg; - } - if (msg.data->Concat (fragment, size) < size) // concatenate fragment - LogPrint (eLogError, "TunnelMessage: I2NP buffer overflow ", msg.data->maxLen); if (isLastFragment) { // message complete @@ -199,9 +202,67 @@ namespace tunnel } } + bool TunnelEndpoint::ConcatFollowOnFragment (TunnelMessageBlockEx& msg, const uint8_t * fragment, size_t size) const + { + if (msg.data->len + size < I2NP_MAX_MESSAGE_SIZE) // check if message is not too long + { + if (msg.data->len + size > msg.data->maxLen) + { + // LogPrint (eLogWarning, "TunnelMessage: I2NP message size ", msg.data->maxLen, " is not enough"); + auto newMsg = NewI2NPMessage (); + *newMsg = *(msg.data); + msg.data = newMsg; + } + if (msg.data->Concat (fragment, size) < size) // concatenate fragment + { + LogPrint (eLogError, "TunnelMessage: I2NP buffer overflow ", msg.data->maxLen); + return false; + } + } + else + return false; + return true; + } + + void TunnelEndpoint::HandleCurrenMessageFollowOnFragment (const uint8_t * fragment, size_t size, bool isLastFragment) + { + if (ConcatFollowOnFragment (m_CurrentMessage, fragment, size)) + { + if (isLastFragment) + { + // message complete + HandleNextMessage (m_CurrentMessage); + m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr; + } + else + { + m_CurrentMessage.nextFragmentNum++; + HandleOutOfSequenceFragments (m_CurrentMsgID, m_CurrentMessage); + } + } + else + { + LogPrint (eLogError, "TunnelMessage: Fragment ", m_CurrentMessage.nextFragmentNum, " of message ", m_CurrentMsgID, " exceeds max I2NP message size, message dropped"); + m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr; + } + } + + void TunnelEndpoint::AddIncompleteCurrentMessage () + { + if (m_CurrentMsgID) + { + auto ret = m_IncompleteMessages.emplace (m_CurrentMsgID, m_CurrentMessage); + if (!ret.second) + LogPrint (eLogError, "TunnelMessage: Incomplete message ", m_CurrentMsgID, " already exists"); + m_CurrentMessage.data = nullptr; + m_CurrentMsgID = 0; + } + } + void TunnelEndpoint::AddOutOfSequenceFragment (uint32_t msgID, uint8_t fragmentNum, bool isLastFragment, std::shared_ptr data) { - if (!m_OutOfSequenceFragments.insert ({{msgID, fragmentNum}, {isLastFragment, data, i2p::util::GetMillisecondsSinceEpoch () }}).second) + if (!m_OutOfSequenceFragments.insert ({(uint64_t)msgID << 32 | fragmentNum, + {isLastFragment, data, i2p::util::GetMillisecondsSinceEpoch () }}).second) LogPrint (eLogInfo, "TunnelMessage: duplicate out-of-sequence fragment ", fragmentNum, " of message ", msgID); } @@ -212,7 +273,13 @@ namespace tunnel if (!msg.nextFragmentNum) // message complete { HandleNextMessage (msg); - m_IncompleteMessages.erase (msgID); + if (&msg == &m_CurrentMessage) + { + m_CurrentMsgID = 0; + m_CurrentMessage.data = nullptr; + } + else + m_IncompleteMessages.erase (msgID); break; } } @@ -220,7 +287,7 @@ namespace tunnel bool TunnelEndpoint::ConcatNextOutOfSequenceFragment (uint32_t msgID, TunnelMessageBlockEx& msg) { - auto it = m_OutOfSequenceFragments.find ({msgID, msg.nextFragmentNum}); + auto it = m_OutOfSequenceFragments.find ((uint64_t)msgID << 32 | msg.nextFragmentNum); if (it != m_OutOfSequenceFragments.end ()) { LogPrint (eLogDebug, "TunnelMessage: Out-of-sequence fragment ", (int)msg.nextFragmentNum, " of message ", msgID, " found"); diff --git a/libi2pd/TunnelEndpoint.h b/libi2pd/TunnelEndpoint.h index 6466df3a..70514e35 100644 --- a/libi2pd/TunnelEndpoint.h +++ b/libi2pd/TunnelEndpoint.h @@ -10,7 +10,6 @@ #define TUNNEL_ENDPOINT_H__ #include -#include #include #include #include "I2NPProtocol.h" @@ -37,7 +36,7 @@ namespace tunnel public: - TunnelEndpoint (bool isInbound): m_IsInbound (isInbound), m_NumReceivedBytes (0) {}; + TunnelEndpoint (bool isInbound): m_IsInbound (isInbound), m_NumReceivedBytes (0), m_CurrentMsgID (0) {}; ~TunnelEndpoint (); size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; }; void Cleanup (); @@ -47,18 +46,23 @@ namespace tunnel private: void HandleFollowOnFragment (uint32_t msgID, bool isLastFragment, const TunnelMessageBlockEx& m); + bool ConcatFollowOnFragment (TunnelMessageBlockEx& msg, const uint8_t * fragment, size_t size) const; // true if success + void HandleCurrenMessageFollowOnFragment (const uint8_t * frgament, size_t size, bool isLastFragment); void HandleNextMessage (const TunnelMessageBlock& msg); void AddOutOfSequenceFragment (uint32_t msgID, uint8_t fragmentNum, bool isLastFragment, std::shared_ptr data); bool ConcatNextOutOfSequenceFragment (uint32_t msgID, TunnelMessageBlockEx& msg); // true if something added void HandleOutOfSequenceFragments (uint32_t msgID, TunnelMessageBlockEx& msg); - + void AddIncompleteCurrentMessage (); + private: std::unordered_map m_IncompleteMessages; - std::map, Fragment> m_OutOfSequenceFragments; // (msgID, fragment#)->fragment + std::unordered_map m_OutOfSequenceFragments; // ((msgID << 8) + fragment#)->fragment bool m_IsInbound; size_t m_NumReceivedBytes; + TunnelMessageBlockEx m_CurrentMessage; + uint32_t m_CurrentMsgID; }; } }