Browse Source

handle fragments

pull/1752/head
orignal 3 years ago
parent
commit
82f9585b7a
  1. 120
      libi2pd/SSU2.cpp
  2. 20
      libi2pd/SSU2.h

120
libi2pd/SSU2.cpp

@ -729,9 +729,13 @@ namespace transport
break; break;
} }
case eSSU2BlkFirstFragment: case eSSU2BlkFirstFragment:
LogPrint (eLogDebug, "SSU2: First fragment");
HandleFirstFragment (buf + offset, size);
isData = true; isData = true;
break; break;
case eSSU2BlkFollowOnFragment: case eSSU2BlkFollowOnFragment:
LogPrint (eLogDebug, "SSU2: Follow-on fragment");
HandleFollowOnFragment (buf + offset, size);
isData = true; isData = true;
break; break;
case eSSU2BlkTermination: case eSSU2BlkTermination:
@ -825,6 +829,103 @@ namespace transport
m_SentPackets.erase (it, it1); m_SentPackets.erase (it, it1);
} }
void SSU2Session::HandleFirstFragment (const uint8_t * buf, size_t len)
{
uint32_t msgID; memcpy (&msgID, buf + 1, 4);
auto msg = NewI2NPMessage ();
// same format as I2NP message block
msg->len = msg->offset + len + 7;
memcpy (msg->GetNTCP2Header (), buf, len);
msg->FromNTCP2 ();
std::shared_ptr<SSU2IncompleteMessage> m;
bool found = false;
auto it = m_IncompleteMessages.find (msgID);
if (it != m_IncompleteMessages.end ())
{
found = true;
m = it->second;
}
else
{
m = std::make_shared<SSU2IncompleteMessage>();
m_IncompleteMessages.emplace (msgID, m);
}
m->msg = msg;
m->nextFragmentNum = 1;
m->lastFragmentInsertTime = i2p::util::GetSecondsSinceEpoch ();
if (found && ConcatOutOfSequenceFragments (m))
{
// we have all follow-on fragments already
m_Handler.PutNextMessage (std::move (m->msg));
m_IncompleteMessages.erase (it);
}
}
void SSU2Session::HandleFollowOnFragment (const uint8_t * buf, size_t len)
{
if (len < 5) return;
uint8_t fragmentNum = buf[0] >> 1;
bool isLast = buf[0] & 0x01;
uint32_t msgID; memcpy (&msgID, buf + 1, 4);
auto it = m_IncompleteMessages.find (msgID);
if (it != m_IncompleteMessages.end ())
{
if (it->second->nextFragmentNum == fragmentNum && it->second->msg)
{
// in sequence
it->second->msg->Concat (buf + 5, len - 5);
if (isLast)
{
m_Handler.PutNextMessage (std::move (it->second->msg));
m_IncompleteMessages.erase (it);
}
else
{
it->second->nextFragmentNum++;
if (ConcatOutOfSequenceFragments (it->second))
{
m_Handler.PutNextMessage (std::move (it->second->msg));
m_IncompleteMessages.erase (it);
}
else
it->second->lastFragmentInsertTime = i2p::util::GetSecondsSinceEpoch ();
}
return;
}
}
else
{
// follow-on fragment before first fragment
auto msg = std::make_shared<SSU2IncompleteMessage> ();
msg->nextFragmentNum = 0;
it = m_IncompleteMessages.emplace (msgID, msg).first;
}
// insert out of sequence fragment
auto fragment = std::make_shared<SSU2IncompleteMessage::Fragment> ();
memcpy (fragment->buf, buf + 5, len -5);
fragment->len = len - 5;
fragment->isLast = isLast;
it->second->outOfSequenceFragments.emplace (fragmentNum, fragment);
it->second->lastFragmentInsertTime = i2p::util::GetSecondsSinceEpoch ();
}
bool SSU2Session::ConcatOutOfSequenceFragments (std::shared_ptr<SSU2IncompleteMessage> m)
{
if (!m) return false;
bool isLast = false;
for (auto it = m->outOfSequenceFragments.begin (); it != m->outOfSequenceFragments.end ();)
if (it->first == m->nextFragmentNum)
{
m->msg->Concat (it->second->buf, it->second->len);
isLast = it->second->isLast;
it = m->outOfSequenceFragments.erase (it);
m->nextFragmentNum++;
}
else
break;
return isLast;
}
bool SSU2Session::ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep) bool SSU2Session::ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep)
{ {
if (size < 2) return false; if (size < 2) return false;
@ -884,7 +985,7 @@ namespace transport
htobe32buf (buf + 3, ackThrough); // Ack Through htobe32buf (buf + 3, ackThrough); // Ack Through
uint8_t acnt = 0; uint8_t acnt = 0;
if (ackThrough) if (ackThrough)
acnt = std::min ((int)ackThrough - 1, 255); acnt = std::min ((int)ackThrough, 255);
buf[7] = acnt; // acnt buf[7] = acnt; // acnt
// TODO: ranges // TODO: ranges
return 8; return 8;
@ -986,6 +1087,20 @@ namespace transport
SendData (payload, payloadSize); SendData (payload, payloadSize);
} }
void SSU2Session::CleanUp (uint64_t ts)
{
for (auto it = m_IncompleteMessages.begin (); it != m_IncompleteMessages.end ();)
{
if (ts > it->second->lastFragmentInsertTime + SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT)
{
LogPrint (eLogWarning, "SSU2: message ", it->first, " was not completed in ", SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT, " seconds, deleted");
it = m_IncompleteMessages.erase (it);
}
else
++it;
}
}
SSU2Server::SSU2Server (): SSU2Server::SSU2Server ():
RunnableServiceWithWork ("SSU2"), m_Socket (GetService ()), m_SocketV6 (GetService ()), RunnableServiceWithWork ("SSU2"), m_Socket (GetService ()), m_SocketV6 (GetService ()),
m_TerminationTimer (GetService ()), m_ResendTimer (GetService ()) m_TerminationTimer (GetService ()), m_ResendTimer (GetService ())
@ -1221,7 +1336,10 @@ namespace transport
it = m_Sessions.erase (it); it = m_Sessions.erase (it);
} }
else else
{
it->second->CleanUp (ts);
it++; it++;
}
} }
for (auto it = m_IncomingTokens.begin (); it != m_IncomingTokens.end (); ) for (auto it = m_IncomingTokens.begin (); it != m_IncomingTokens.end (); )

20
libi2pd/SSU2.h

@ -32,6 +32,7 @@ namespace transport
const size_t SSU2_MAX_PAYLOAD_SIZE = SSU2_MTU - 32; const size_t SSU2_MAX_PAYLOAD_SIZE = SSU2_MTU - 32;
const int SSU2_RESEND_INTERVAL = 3; // in seconds const int SSU2_RESEND_INTERVAL = 3; // in seconds
const int SSU2_MAX_NUM_RESENDS = 5; const int SSU2_MAX_NUM_RESENDS = 5;
const int SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds
enum SSU2MessageType enum SSU2MessageType
{ {
@ -77,6 +78,20 @@ namespace transport
eSSU2SessionStateFailed eSSU2SessionStateFailed
}; };
struct SSU2IncompleteMessage
{
struct Fragment
{
uint8_t buf[SSU2_MTU];
size_t len;
bool isLast;
};
std::shared_ptr<I2NPMessage> msg;
int nextFragmentNum;
uint32_t lastFragmentInsertTime; // in seconds
std::map<int, std::shared_ptr<Fragment> > outOfSequenceFragments;
};
// RouterInfo flags // RouterInfo flags
const uint8_t SSU2_ROUTER_INFO_FLAG_REQUEST_FLOOD = 0x01; const uint8_t SSU2_ROUTER_INFO_FLAG_REQUEST_FLOOD = 0x01;
@ -120,6 +135,7 @@ namespace transport
void Connect (); void Connect ();
void Terminate (); void Terminate ();
void TerminateByTimeout (); void TerminateByTimeout ();
void CleanUp (uint64_t ts);
void Done () override; void Done () override;
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override; void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override;
void Resend (uint64_t ts); void Resend (uint64_t ts);
@ -158,6 +174,9 @@ namespace transport
std::shared_ptr<const i2p::data::RouterInfo> ExtractRouterInfo (const uint8_t * buf, size_t size); std::shared_ptr<const i2p::data::RouterInfo> ExtractRouterInfo (const uint8_t * buf, size_t size);
void CreateNonce (uint64_t seqn, uint8_t * nonce); void CreateNonce (uint64_t seqn, uint8_t * nonce);
bool UpdateReceivePacketNum (uint32_t packetNum); // for Ack, returns false if duplicate bool UpdateReceivePacketNum (uint32_t packetNum); // for Ack, returns false if duplicate
void HandleFirstFragment (const uint8_t * buf, size_t len);
void HandleFollowOnFragment (const uint8_t * buf, size_t len);
bool ConcatOutOfSequenceFragments (std::shared_ptr<SSU2IncompleteMessage> m); // true if message complete
size_t CreateAddressBlock (const boost::asio::ip::udp::endpoint& ep, uint8_t * buf, size_t len); size_t CreateAddressBlock (const boost::asio::ip::udp::endpoint& ep, uint8_t * buf, size_t len);
size_t CreateAckBlock (uint8_t * buf, size_t len); size_t CreateAckBlock (uint8_t * buf, size_t len);
@ -177,6 +196,7 @@ namespace transport
uint32_t m_SendPacketNum, m_ReceivePacketNum; uint32_t m_SendPacketNum, m_ReceivePacketNum;
std::set<uint32_t> m_OutOfSequencePackets; // packet nums > receive packet num std::set<uint32_t> m_OutOfSequencePackets; // packet nums > receive packet num
std::map<uint32_t, std::shared_ptr<SentPacket> > m_SentPackets; // packetNum -> packet std::map<uint32_t, std::shared_ptr<SentPacket> > m_SentPackets; // packetNum -> packet
std::map<uint32_t, std::shared_ptr<SSU2IncompleteMessage> > m_IncompleteMessages; // I2NP
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue; std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
i2p::I2NPMessagesHandler m_Handler; i2p::I2NPMessagesHandler m_Handler;
}; };

Loading…
Cancel
Save