diff --git a/SSU.h b/SSU.h index f938a580..a64b555c 100644 --- a/SSU.h +++ b/SSU.h @@ -31,7 +31,6 @@ namespace ssu }; #pragma pack() - const size_t SSU_MTU = 1484; const int SSU_CONNECT_TIMEOUT = 5; // 5 seconds const int SSU_TERMINATION_TIMEOUT = 330; // 5.5 minutes diff --git a/SSUData.cpp b/SSUData.cpp index 46569c5d..fb887ea7 100644 --- a/SSUData.cpp +++ b/SSUData.cpp @@ -82,73 +82,90 @@ namespace ssu bool isLast = fragmentInfo & 0x010000; // bit 16 uint8_t fragmentNum = fragmentInfo >> 17; // bits 23 - 17 LogPrint ("SSU data fragment ", (int)fragmentNum, " of message ", msgID, " size=", (int)fragmentSize, isLast ? " last" : " non-last"); + + // find message with msgID I2NPMessage * msg = nullptr; - if (fragmentNum > 0) // follow-up fragment - { - auto it = m_IncomleteMessages.find (msgID); - if (it != m_IncomleteMessages.end ()) - { - if (fragmentNum == it->second->nextFragmentNum) - { - // expected fragment - msg = it->second->msg; - memcpy (msg->buf + msg->len, buf, fragmentSize); - msg->len += fragmentSize; - it->second->nextFragmentNum++; - } - else if (fragmentNum < it->second->nextFragmentNum) - // duplicate fragment - LogPrint ("Duplicate fragment ", (int)fragmentNum, " of message ", msgID, ". Ignored"); - else - { - // missing fragment - LogPrint ("Missing fragments from ", it->second->nextFragmentNum, " to ", fragmentNum - 1, " of message ", msgID); - //TODO - } - - if (isLast) - { - if (!msg) - DeleteI2NPMessage (it->second->msg); - delete it->second; - m_IncomleteMessages.erase (it); - } - } - else - // TODO: - LogPrint ("Unexpected follow-on fragment ", (int)fragmentNum, " of message ", msgID); - } - else // first fragment + IncompleteMessage * incompleteMessage = nullptr; + auto it = m_IncomleteMessages.find (msgID); + if (it != m_IncomleteMessages.end ()) + { + // message exists + incompleteMessage = it->second; + msg = incompleteMessage->msg; + } + else { + // create new message msg = NewI2NPMessage (); - memcpy (msg->GetSSUHeader (), buf, fragmentSize); - msg->len += fragmentSize - sizeof (I2NPHeaderShort); - } + msg->len -= sizeof (I2NPHeaderShort); + incompleteMessage = new IncompleteMessage (msg); + m_IncomleteMessages[msgID] = incompleteMessage; + } - if (msg) - { - if (!fragmentNum && !isLast) - m_IncomleteMessages[msgID] = new IncompleteMessage (msg); - if (isLast) + // handle current fragment + if (fragmentNum == incompleteMessage->nextFragmentNum) + { + // expected fragment + memcpy (msg->buf + msg->len, buf, fragmentSize); + msg->len += fragmentSize; + incompleteMessage->nextFragmentNum++; + if (!isLast && !incompleteMessage->savedFragments.empty ()) { - SendMsgAck (msgID); - msg->FromSSU (msgID); - if (m_Session.GetState () == eSessionStateEstablished) - i2p::HandleI2NPMessage (msg); - else + // try saved fragments + for (auto it1 = incompleteMessage->savedFragments.begin (); it1 != incompleteMessage->savedFragments.end ();) { - // we expect DeliveryStatus - if (msg->GetHeader ()->typeID == eI2NPDeliveryStatus) + auto savedFragment = *it1; + if (savedFragment->fragmentNum == incompleteMessage->nextFragmentNum) { - LogPrint ("SSU session established"); - m_Session.Established (); - } + memcpy (msg->buf + msg->len, savedFragment->buf, savedFragment->len); + msg->len += savedFragment->len; + isLast = savedFragment->isLast; + incompleteMessage->nextFragmentNum++; + incompleteMessage->savedFragments.erase (it1++); + delete savedFragment; + } else - LogPrint ("SSU unexpected message ", (int)msg->GetHeader ()->typeID); - DeleteI2NPMessage (msg); + break; } + } + } + else + { + if (fragmentNum < incompleteMessage->nextFragmentNum) + // duplicate fragment + LogPrint ("Duplicate fragment ", (int)fragmentNum, " of message ", msgID, ". Ignored"); + else + { + // missing fragment + LogPrint ("Missing fragments from ", (int)incompleteMessage->nextFragmentNum, " to ", fragmentNum - 1, " of message ", msgID); + incompleteMessage->savedFragments.insert (new Fragment (fragmentNum, buf, fragmentSize, isLast)); } - } + isLast = false; + } + + if (isLast) + { + // delete incomplete message + delete incompleteMessage; + m_IncomleteMessages.erase (msgID); + // process message + SendMsgAck (msgID); + msg->FromSSU (msgID); + if (m_Session.GetState () == eSessionStateEstablished) + i2p::HandleI2NPMessage (msg); + else + { + // we expect DeliveryStatus + if (msg->GetHeader ()->typeID == eI2NPDeliveryStatus) + { + LogPrint ("SSU session established"); + m_Session.Established (); + } + else + LogPrint ("SSU unexpected message ", (int)msg->GetHeader ()->typeID); + DeleteI2NPMessage (msg); + } + } buf += fragmentSize; } } diff --git a/SSUData.h b/SSUData.h index 4829aa56..fb04879d 100644 --- a/SSUData.h +++ b/SSUData.h @@ -2,8 +2,10 @@ #define SSU_DATA_H__ #include +#include #include #include +#include #include "I2NPProtocol.h" namespace i2p @@ -11,6 +13,7 @@ namespace i2p namespace ssu { + const size_t SSU_MTU = 1484; // data flags const uint8_t DATA_FLAG_EXTENDED_DATA_INCLUDED = 0x02; const uint8_t DATA_FLAG_WANT_REPLY = 0x04; @@ -19,6 +22,34 @@ namespace ssu const uint8_t DATA_FLAG_ACK_BITFIELDS_INCLUDED = 0x40; const uint8_t DATA_FLAG_EXPLICIT_ACKS_INCLUDED = 0x80; + struct Fragment + { + int fragmentNum, len; + bool isLast; + uint8_t buf[SSU_MTU]; + + Fragment (int n, const uint8_t * b, int l, bool last): + fragmentNum (n), len (l), isLast (last) { memcpy (buf, b, len); }; + }; + + struct FragmentCmp + { + bool operator() (const Fragment * f1, const Fragment * f2) const + { + return f1->fragmentNum < f2->fragmentNum; + }; + }; + + struct IncompleteMessage + { + I2NPMessage * msg; + int nextFragmentNum; + std::set savedFragments; + + IncompleteMessage (I2NPMessage * m): msg (m), nextFragmentNum (0) {}; + ~IncompleteMessage () { for (auto it: savedFragments) { delete it; }; }; + }; + class SSUSession; class SSUData { @@ -37,13 +68,7 @@ namespace ssu private: - struct IncompleteMessage - { - I2NPMessage * msg; - uint8_t nextFragmentNum; - - IncompleteMessage (I2NPMessage * m): msg (m), nextFragmentNum (1) {}; - }; + SSUSession& m_Session; std::map m_IncomleteMessages;