Browse Source

eliminated extra I2NP messages for fragments

pull/1677/head
orignal 3 years ago
parent
commit
b9476791f4
  1. 85
      libi2pd/TunnelEndpoint.cpp
  2. 10
      libi2pd/TunnelEndpoint.h

85
libi2pd/TunnelEndpoint.cpp

@ -52,26 +52,25 @@ namespace tunnel
bool isFollowOnFragment = flag & 0x80, isLastFragment = true; bool isFollowOnFragment = flag & 0x80, isLastFragment = true;
uint32_t msgID = 0; uint32_t msgID = 0;
int fragmentNum = 0; int fragmentNum = 0;
TunnelMessageBlockEx& m = m_CurrentMessage;
if (!isFollowOnFragment) if (!isFollowOnFragment)
{ {
// first fragment // first fragment
if (m_CurrentMsgID) if (m_CurrentMsgID)
AddIncompleteCurrentMessage (); // we have got a new message while previous is not complete AddIncompleteCurrentMessage (); // we have got a new message while previous is not complete
m.deliveryType = (TunnelDeliveryType)((flag >> 5) & 0x03); m_CurrentMessage.deliveryType = (TunnelDeliveryType)((flag >> 5) & 0x03);
switch (m.deliveryType) switch (m_CurrentMessage.deliveryType)
{ {
case eDeliveryTypeLocal: // 0 case eDeliveryTypeLocal: // 0
break; break;
case eDeliveryTypeTunnel: // 1 case eDeliveryTypeTunnel: // 1
m.tunnelID = bufbe32toh (fragment); m_CurrentMessage.tunnelID = bufbe32toh (fragment);
fragment += 4; // tunnelID fragment += 4; // tunnelID
m.hash = i2p::data::IdentHash (fragment); m_CurrentMessage.hash = i2p::data::IdentHash (fragment);
fragment += 32; // hash fragment += 32; // hash
break; break;
case eDeliveryTypeRouter: // 2 case eDeliveryTypeRouter: // 2
m.hash = i2p::data::IdentHash (fragment); m_CurrentMessage.hash = i2p::data::IdentHash (fragment);
fragment += 32; // to hash fragment += 32; // to hash
break; break;
default: ; default: ;
@ -99,42 +98,51 @@ namespace tunnel
uint16_t size = bufbe16toh (fragment); uint16_t size = bufbe16toh (fragment);
fragment += 2; fragment += 2;
if (isFollowOnFragment && m_CurrentMsgID && m_CurrentMsgID == msgID && // handle fragment
m_CurrentMessage.nextFragmentNum == fragmentNum) if (isFollowOnFragment)
{ {
HandleCurrenMessageFollowOnFragment (fragment, size, isLastFragment); // existing message
fragment += size; if (m_CurrentMsgID && m_CurrentMsgID == msgID && m_CurrentMessage.nextFragmentNum == fragmentNum)
continue; HandleCurrenMessageFollowOnFragment (fragment, size, isLastFragment); // previous
else
{
HandleFollowOnFragment (msgID, isLastFragment, fragmentNum, fragment, size); // another
m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr;
} }
}
else
{
// new message
msg->offset = fragment - msg->buf; msg->offset = fragment - msg->buf;
msg->len = msg->offset + size; msg->len = msg->offset + size;
// check message size
if (msg->len > msg->maxLen) if (msg->len > msg->maxLen)
{ {
LogPrint (eLogError, "TunnelMessage: fragment is too long ", (int)size); LogPrint (eLogError, "TunnelMessage: fragment is too long ", (int)size);
m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr; m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr;
return; return;
} }
// create new or assign I2NP message
if (fragment + size < decrypted + TUNNEL_DATA_ENCRYPTED_SIZE) if (fragment + size < decrypted + TUNNEL_DATA_ENCRYPTED_SIZE)
{ {
// this is not last message. we have to copy it // this is not last message. we have to copy it
m.data = NewI2NPTunnelMessage (); m_CurrentMessage.data = NewI2NPTunnelMessage ();
m.data->offset += TUNNEL_GATEWAY_HEADER_SIZE; // reserve room for TunnelGateway header m_CurrentMessage.data->offset += TUNNEL_GATEWAY_HEADER_SIZE; // reserve room for TunnelGateway header
m.data->len += TUNNEL_GATEWAY_HEADER_SIZE; m_CurrentMessage.data->len += TUNNEL_GATEWAY_HEADER_SIZE;
*(m.data) = *msg; *(m_CurrentMessage.data) = *msg;
} }
else else
m.data = msg; m_CurrentMessage.data = msg;
if (!isFollowOnFragment)
{
if (isLastFragment) if (isLastFragment)
{ {
HandleNextMessage (m); // single message
HandleNextMessage (m_CurrentMessage);
m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr; m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr;
} }
else if (msgID) else if (msgID)
{ {
// first fragment of a new message
m_CurrentMessage.nextFragmentNum = 1; m_CurrentMessage.nextFragmentNum = 1;
m_CurrentMessage.receiveTime = i2p::util::GetMillisecondsSinceEpoch (); m_CurrentMessage.receiveTime = i2p::util::GetMillisecondsSinceEpoch ();
HandleOutOfSequenceFragments (msgID, m_CurrentMessage); HandleOutOfSequenceFragments (msgID, m_CurrentMessage);
@ -145,12 +153,6 @@ namespace tunnel
m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr; m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr;
} }
} }
else
{
m.nextFragmentNum = fragmentNum;
HandleFollowOnFragment (msgID, isLastFragment, m);
m_CurrentMsgID = 0; m_CurrentMessage.data = nullptr;
}
fragment += size; fragment += size;
} }
@ -159,15 +161,14 @@ namespace tunnel
LogPrint (eLogError, "TunnelMessage: zero not found"); LogPrint (eLogError, "TunnelMessage: zero not found");
} }
void TunnelEndpoint::HandleFollowOnFragment (uint32_t msgID, bool isLastFragment, const TunnelMessageBlockEx& m) void TunnelEndpoint::HandleFollowOnFragment (uint32_t msgID, bool isLastFragment,
uint8_t fragmentNum, const uint8_t * fragment, size_t size)
{ {
auto fragment = m.data->GetBuffer ();
auto size = m.data->GetLength ();
auto it = m_IncompleteMessages.find (msgID); auto it = m_IncompleteMessages.find (msgID);
if (it != m_IncompleteMessages.end()) if (it != m_IncompleteMessages.end())
{ {
auto& msg = it->second; auto& msg = it->second;
if (m.nextFragmentNum == msg.nextFragmentNum) if (fragmentNum == msg.nextFragmentNum)
{ {
if (ConcatFollowOnFragment (msg, fragment, size)) if (ConcatFollowOnFragment (msg, fragment, size))
{ {
@ -185,20 +186,20 @@ namespace tunnel
} }
else else
{ {
LogPrint (eLogError, "TunnelMessage: Fragment ", m.nextFragmentNum, " of message ", msgID, "exceeds max I2NP message size, message dropped"); LogPrint (eLogError, "TunnelMessage: Fragment ", fragmentNum, " of message ", msgID, "exceeds max I2NP message size, message dropped");
m_IncompleteMessages.erase (it); m_IncompleteMessages.erase (it);
} }
} }
else else
{ {
LogPrint (eLogWarning, "TunnelMessage: Unexpected fragment ", (int)m.nextFragmentNum, " instead ", (int)msg.nextFragmentNum, " of message ", msgID, ", saved"); LogPrint (eLogWarning, "TunnelMessage: Unexpected fragment ", (int)fragmentNum, " instead ", (int)msg.nextFragmentNum, " of message ", msgID, ", saved");
AddOutOfSequenceFragment (msgID, m.nextFragmentNum, isLastFragment, m.data); AddOutOfSequenceFragment (msgID, fragmentNum, isLastFragment, fragment, size);
} }
} }
else else
{ {
LogPrint (eLogWarning, "TunnelMessage: First fragment of message ", msgID, " not found, saved"); LogPrint (eLogWarning, "TunnelMessage: First fragment of message ", msgID, " not found, saved");
AddOutOfSequenceFragment (msgID, m.nextFragmentNum, isLastFragment, m.data); AddOutOfSequenceFragment (msgID, fragmentNum, isLastFragment, fragment, size);
} }
} }
@ -259,10 +260,12 @@ namespace tunnel
} }
} }
void TunnelEndpoint::AddOutOfSequenceFragment (uint32_t msgID, uint8_t fragmentNum, bool isLastFragment, std::shared_ptr<I2NPMessage> data) void TunnelEndpoint::AddOutOfSequenceFragment (uint32_t msgID, uint8_t fragmentNum,
bool isLastFragment, const uint8_t * fragment, size_t size)
{ {
if (!m_OutOfSequenceFragments.insert ({(uint64_t)msgID << 32 | fragmentNum, std::unique_ptr<Fragment> f(new Fragment (isLastFragment, i2p::util::GetMillisecondsSinceEpoch (), size));
{isLastFragment, data, i2p::util::GetMillisecondsSinceEpoch () }}).second) memcpy (f->data.data (), fragment, size);
if (!m_OutOfSequenceFragments.emplace ((uint64_t)msgID << 32 | fragmentNum, std::move (f)).second)
LogPrint (eLogInfo, "TunnelMessage: duplicate out-of-sequence fragment ", fragmentNum, " of message ", msgID); LogPrint (eLogInfo, "TunnelMessage: duplicate out-of-sequence fragment ", fragmentNum, " of message ", msgID);
} }
@ -291,7 +294,7 @@ namespace tunnel
if (it != m_OutOfSequenceFragments.end ()) if (it != m_OutOfSequenceFragments.end ())
{ {
LogPrint (eLogDebug, "TunnelMessage: Out-of-sequence fragment ", (int)msg.nextFragmentNum, " of message ", msgID, " found"); LogPrint (eLogDebug, "TunnelMessage: Out-of-sequence fragment ", (int)msg.nextFragmentNum, " of message ", msgID, " found");
size_t size = it->second.data->GetLength (); size_t size = it->second->data.size ();
if (msg.data->len + size > msg.data->maxLen) if (msg.data->len + size > msg.data->maxLen)
{ {
LogPrint (eLogWarning, "TunnelMessage: Tunnel endpoint I2NP message size ", msg.data->maxLen, " is not enough"); LogPrint (eLogWarning, "TunnelMessage: Tunnel endpoint I2NP message size ", msg.data->maxLen, " is not enough");
@ -299,9 +302,9 @@ namespace tunnel
*newMsg = *(msg.data); *newMsg = *(msg.data);
msg.data = newMsg; msg.data = newMsg;
} }
if (msg.data->Concat (it->second.data->GetBuffer (), size) < size) // concatenate out-of-sync fragment if (msg.data->Concat (it->second->data.data (), size) < size) // concatenate out-of-sync fragment
LogPrint (eLogError, "TunnelMessage: Tunnel endpoint I2NP buffer overflow ", msg.data->maxLen); LogPrint (eLogError, "TunnelMessage: Tunnel endpoint I2NP buffer overflow ", msg.data->maxLen);
if (it->second.isLastFragment) if (it->second->isLastFragment)
// message complete // message complete
msg.nextFragmentNum = 0; msg.nextFragmentNum = 0;
else else
@ -354,7 +357,7 @@ namespace tunnel
// out-of-sequence fragments // out-of-sequence fragments
for (auto it = m_OutOfSequenceFragments.begin (); it != m_OutOfSequenceFragments.end ();) for (auto it = m_OutOfSequenceFragments.begin (); it != m_OutOfSequenceFragments.end ();)
{ {
if (ts > it->second.receiveTime + i2p::I2NP_MESSAGE_EXPIRATION_TIMEOUT) if (ts > it->second->receiveTime + i2p::I2NP_MESSAGE_EXPIRATION_TIMEOUT)
it = m_OutOfSequenceFragments.erase (it); it = m_OutOfSequenceFragments.erase (it);
else else
++it; ++it;

10
libi2pd/TunnelEndpoint.h

@ -11,6 +11,7 @@
#include <inttypes.h> #include <inttypes.h>
#include <unordered_map> #include <unordered_map>
#include <vector>
#include <string> #include <string>
#include "I2NPProtocol.h" #include "I2NPProtocol.h"
#include "TunnelBase.h" #include "TunnelBase.h"
@ -29,9 +30,10 @@ namespace tunnel
struct Fragment struct Fragment
{ {
Fragment (bool last, uint64_t t, size_t size): isLastFragment (last), receiveTime (t), data (size) {};
bool isLastFragment; bool isLastFragment;
std::shared_ptr<I2NPMessage> data;
uint64_t receiveTime; // milliseconds since epoch uint64_t receiveTime; // milliseconds since epoch
std::vector<uint8_t> data;
}; };
public: public:
@ -45,12 +47,12 @@ namespace tunnel
private: private:
void HandleFollowOnFragment (uint32_t msgID, bool isLastFragment, const TunnelMessageBlockEx& m); void HandleFollowOnFragment (uint32_t msgID, bool isLastFragment, uint8_t fragmentNum, const uint8_t * fragment, size_t size);
bool ConcatFollowOnFragment (TunnelMessageBlockEx& msg, const uint8_t * fragment, size_t size) const; // true if success bool ConcatFollowOnFragment (TunnelMessageBlockEx& msg, const uint8_t * fragment, size_t size) const; // true if success
void HandleCurrenMessageFollowOnFragment (const uint8_t * frgament, size_t size, bool isLastFragment); void HandleCurrenMessageFollowOnFragment (const uint8_t * frgament, size_t size, bool isLastFragment);
void HandleNextMessage (const TunnelMessageBlock& msg); void HandleNextMessage (const TunnelMessageBlock& msg);
void AddOutOfSequenceFragment (uint32_t msgID, uint8_t fragmentNum, bool isLastFragment, std::shared_ptr<I2NPMessage> data); void AddOutOfSequenceFragment (uint32_t msgID, uint8_t fragmentNum, bool isLastFragment, const uint8_t * fragment, size_t size);
bool ConcatNextOutOfSequenceFragment (uint32_t msgID, TunnelMessageBlockEx& msg); // true if something added bool ConcatNextOutOfSequenceFragment (uint32_t msgID, TunnelMessageBlockEx& msg); // true if something added
void HandleOutOfSequenceFragments (uint32_t msgID, TunnelMessageBlockEx& msg); void HandleOutOfSequenceFragments (uint32_t msgID, TunnelMessageBlockEx& msg);
void AddIncompleteCurrentMessage (); void AddIncompleteCurrentMessage ();
@ -58,7 +60,7 @@ namespace tunnel
private: private:
std::unordered_map<uint32_t, TunnelMessageBlockEx> m_IncompleteMessages; std::unordered_map<uint32_t, TunnelMessageBlockEx> m_IncompleteMessages;
std::unordered_map<uint64_t, Fragment> m_OutOfSequenceFragments; // ((msgID << 8) + fragment#)->fragment std::unordered_map<uint64_t, std::unique_ptr<Fragment> > m_OutOfSequenceFragments; // ((msgID << 8) + fragment#)->fragment
bool m_IsInbound; bool m_IsInbound;
size_t m_NumReceivedBytes; size_t m_NumReceivedBytes;
TunnelMessageBlockEx m_CurrentMessage; TunnelMessageBlockEx m_CurrentMessage;

Loading…
Cancel
Save