diff --git a/SSUData.cpp b/SSUData.cpp index ab0ed45d..f2236cfd 100644 --- a/SSUData.cpp +++ b/SSUData.cpp @@ -1,5 +1,7 @@ #include +#include #include "Log.h" +#include "Timestamp.h" #include "SSU.h" #include "SSUData.h" @@ -8,7 +10,7 @@ namespace i2p namespace ssu { SSUData::SSUData (SSUSession& session): - m_Session (session) + m_Session (session), m_ResendTimer (session.m_Server.GetService ()) { } @@ -21,10 +23,7 @@ namespace ssu delete it.second; } for (auto it: m_SentMessages) - { - for (auto f: it.second.fragments) - delete[] f; - } + delete it.second; } void SSUData::ProcessSentMessageAck (uint32_t msgID) @@ -32,10 +31,10 @@ namespace ssu auto it = m_SentMessages.find (msgID); if (it != m_SentMessages.end ()) { - // delete all ack-ed message's fragments - for (auto f: it->second.fragments) - delete[] f; + delete it->second; m_SentMessages.erase (it); + if (m_SentMessages.empty ()) + m_ResendTimer.cancel (); } } @@ -70,7 +69,7 @@ namespace ssu bitfield &= 0x7F; // clear MSB if (bitfield && it != m_SentMessages.end ()) { - int numSentFragments = it->second.fragments.size (); + int numSentFragments = it->second->fragments.size (); // process bits uint8_t mask = 0x40; for (int j = 0; j < 7; j++) @@ -79,8 +78,8 @@ namespace ssu { if (fragment < numSentFragments) { - delete[] it->second.fragments[fragment]; - it->second.fragments[fragment] = nullptr; + delete it->second->fragments[fragment]; + it->second->fragments[fragment] = nullptr; } } fragment++; @@ -237,9 +236,14 @@ namespace ssu LogPrint ("SSU message ", msgID, " already sent"); DeleteI2NPMessage (msg); return; - } - SentMessage& sentMessage = m_SentMessages[msgID]; - auto& fragments = sentMessage.fragments; + } + if (m_SentMessages.empty ()) // schedule resend at first message only + ScheduleResend (); + SentMessage * sentMessage = new SentMessage; + m_SentMessages[msgID] = sentMessage; + sentMessage->nextResendTime = i2p::util::GetSecondsSinceEpoch () + RESEND_INTERVAL; + sentMessage->numResends = 0; + auto& fragments = sentMessage->fragments; msgID = htobe32 (msgID); size_t payloadSize = SSU_MTU - sizeof (SSUHeader) - 9; // 9 = flag + #frg(1) + messageID(4) + frag info (3) size_t len = msg->GetLength (); @@ -248,8 +252,9 @@ namespace ssu uint32_t fragmentNum = 0; while (len > 0) { - uint8_t * buf = new uint8_t[SSU_MTU + 18]; - fragments.push_back (buf); + Fragment * fragment = new Fragment; + uint8_t * buf = fragment->buf; + fragments.push_back (fragment); uint8_t * payload = buf + sizeof (SSUHeader); *payload = DATA_FLAG_WANT_REPLY; // for compatibility payload++; @@ -272,6 +277,7 @@ namespace ssu size += payload - buf; if (size & 0x0F) // make sure 16 bytes boundary size = ((size >> 4) + 1) << 4; // (/16 + 1)*16 + fragment->len = size; // encrypt message with session key m_Session.FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, buf, size); @@ -335,6 +341,47 @@ namespace ssu m_Session.Send (buf, len); } + void SSUData::ScheduleResend() + { + m_ResendTimer.cancel (); + m_ResendTimer.expires_from_now (boost::posix_time::seconds(RESEND_INTERVAL)); + m_ResendTimer.async_wait (boost::bind (&SSUData::HandleResendTimer, + this, boost::asio::placeholders::error)); + } + + void SSUData::HandleResendTimer (const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + uint32_t ts = i2p::util::GetSecondsSinceEpoch (); + for (auto it = m_SentMessages.begin (); it != m_SentMessages.end ();) + { + if (ts >= it->second->nextResendTime) + { + bool isEmpty = true; + for (auto f: it->second->fragments) + if (f) + { + isEmpty = false; + m_Session.Send (f->buf, f->len); // resend + } + + it->second->numResends++; + if (isEmpty || it->second->numResends >= MAX_NUM_RESENDS) + { + delete it->second; + it = m_SentMessages.erase (it); + } + else + it++; + } + else + it++; + } + if (!m_SentMessages.empty ()) + ScheduleResend (); + } + } } } diff --git a/SSUData.h b/SSUData.h index 1cf18eff..45ba31c1 100644 --- a/SSUData.h +++ b/SSUData.h @@ -6,6 +6,7 @@ #include #include #include +#include #include "I2NPProtocol.h" namespace i2p @@ -14,6 +15,8 @@ namespace ssu { const size_t SSU_MTU = 1484; + const int RESEND_INTERVAL = 3; // in seconds + const int MAX_NUM_RESENDS = 5; // data flags const uint8_t DATA_FLAG_EXTENDED_DATA_INCLUDED = 0x02; const uint8_t DATA_FLAG_WANT_REPLY = 0x04; @@ -24,10 +27,12 @@ namespace ssu struct Fragment { - int fragmentNum, len; + int fragmentNum; + size_t len; bool isLast; - uint8_t buf[SSU_MTU]; + uint8_t buf[SSU_MTU + 18]; + Fragment () = default; Fragment (int n, const uint8_t * b, int l, bool last): fragmentNum (n), len (l), isLast (last) { memcpy (buf, b, len); }; }; @@ -52,9 +57,11 @@ namespace ssu struct SentMessage { - std::vector fragments; + std::vector fragments; uint32_t nextResendTime; // in seconds int numResends; + + ~SentMessage () { for (auto it: fragments) { delete it; }; }; }; class SSUSession; @@ -76,13 +83,15 @@ namespace ssu void ProcessFragments (uint8_t * buf); void ProcessSentMessageAck (uint32_t msgID); - private: - + void ScheduleResend (); + void HandleResendTimer (const boost::system::error_code& ecode); + private: SSUSession& m_Session; std::map m_IncomleteMessages; - std::map m_SentMessages; // msgID -> fragments + std::map m_SentMessages; + boost::asio::deadline_timer m_ResendTimer; }; } }