From 2fef595b83a4fdee89a02c35ed4bf1e235b66bb3 Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 31 Mar 2022 15:35:55 -0400 Subject: [PATCH] resend packets --- libi2pd/SSU2.cpp | 65 +++++++++++++++++++++++++++++++++++++----------- libi2pd/SSU2.h | 6 ++++- 2 files changed, 55 insertions(+), 16 deletions(-) diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 54783f9b..67cf9658 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -104,8 +104,8 @@ namespace transport for (auto it: msgs) m_SendQueue.push_back (it); SendQueue (); - } - + } + void SSU2Session::SendQueue () { if (!m_SendQueue.empty ()) @@ -146,6 +146,28 @@ namespace transport } } } + + void SSU2Session::Resend (uint64_t ts) + { + if (m_SendQueue.empty ()) return; + for (auto it = m_SentPackets.begin (); it != m_SentPackets.end (); ) + if (ts > it->second->nextResendTime) + { + if (it->second->numResends > SSU2_MAX_NUM_RESENDS) + it = m_SentPackets.erase (it); + else + { + m_Server.Send (it->second->header.buf, 16, it->second->payload, it->second->payloadLen, m_RemoteEndpoint); + it->second->numResends++; + it->second->nextResendTime = ts + it->second->numResends*SSU2_RESEND_INTERVAL; + m_LastActivityTimestamp = ts; + m_NumSentBytes += it->second->payloadLen + 16; + it++; + } + } + else + it++; + } void SSU2Session::ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len) { @@ -765,19 +787,14 @@ namespace transport { if (m_SentPackets.empty ()) return; uint32_t ackThrough = bufbe32toh (buf); - auto it = m_SentPackets.rbegin (); - while (it != m_SentPackets.rend () && it->first > ackThrough) ++it; // find first less pack <= ackThrough - if (it == m_SentPackets.rend ()) return; - int32_t firstPacketNum = ackThrough - buf[4]; // acnt - if (firstPacketNum < 0) firstPacketNum = 0; + uint32_t firstPacketNum = ackThrough > buf[4] ? ackThrough - buf[4] : 0; // acnt + auto it = m_SentPackets.begin (); + while (it != m_SentPackets.end () && it->first < firstPacketNum) it++; // find first acked packet + if (it == m_SentPackets.end ()) return; // not found auto it1 = it; - while (it1 != m_SentPackets.rend () && it1->first >= (uint32_t)firstPacketNum) it1++; - if (it1 == m_SentPackets.rend ()) - { - m_SentPackets.erase (m_SentPackets.begin (), it.base ()); - return; - } - m_SentPackets.erase (it1.base (), it.base ()); + while (it1 != m_SentPackets.end () && it1->first <= ackThrough) it1++; + it1--; + m_SentPackets.erase (it, it1); // TODO: handle ranges } @@ -944,7 +961,7 @@ namespace transport SSU2Server::SSU2Server (): RunnableServiceWithWork ("SSU2"), m_Socket (GetService ()), m_SocketV6 (GetService ()), - m_TerminationTimer (GetService ()) + m_TerminationTimer (GetService ()), m_ResendTimer (GetService ()) { } @@ -1200,6 +1217,24 @@ namespace transport } } + void SSU2Server::ScheduleResend () + { + m_ResendTimer.expires_from_now (boost::posix_time::seconds(SSU2_RESEND_INTERVAL)); + m_ResendTimer.async_wait (std::bind (&SSU2Server::HandleResendTimer, + this, std::placeholders::_1)); + } + + void SSU2Server::HandleResendTimer (const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + auto ts = i2p::util::GetSecondsSinceEpoch (); + for (auto it: m_Sessions) + it.second->Resend (ts); + ScheduleResend (); + } + } + void SSU2Server::UpdateOutgoingToken (const boost::asio::ip::udp::endpoint& ep, uint64_t token, uint32_t exp) { m_OutgoingTokens[ep] = {token, exp}; diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 8b04bbc4..f50aa7b4 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -122,6 +122,7 @@ namespace transport void TerminateByTimeout (); void Done () override; void SendI2NPMessages (const std::vector >& msgs) override; + void Resend (uint64_t ts); bool IsEstablished () const { return m_State == eSSU2SessionStateEstablished; }; uint64_t GetConnID () const { return m_SourceConnID; }; @@ -223,6 +224,9 @@ namespace transport void ScheduleTermination (); void HandleTerminationTimer (const boost::system::error_code& ecode); + + void ScheduleResend (); + void HandleResendTimer (const boost::system::error_code& ecode); private: @@ -231,7 +235,7 @@ namespace transport std::map > m_PendingOutgoingSessions; std::map > m_IncomingTokens, m_OutgoingTokens; // remote endpoint -> (token, expires in seconds) i2p::util::MemoryPoolMt m_PacketsPool; - boost::asio::deadline_timer m_TerminationTimer; + boost::asio::deadline_timer m_TerminationTimer, m_ResendTimer; public: