From 2fafca1571e29a78dfe0aefd32ce556e7a245662 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 20 Jul 2025 08:42:39 -0400 Subject: [PATCH] limit simultinously sent streaming packets --- libi2pd/Garlic.cpp | 2 +- libi2pd/Garlic.h | 4 ++++ libi2pd/Streaming.cpp | 45 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/libi2pd/Garlic.cpp b/libi2pd/Garlic.cpp index b8794753..a61359f1 100644 --- a/libi2pd/Garlic.cpp +++ b/libi2pd/Garlic.cpp @@ -28,7 +28,7 @@ namespace garlic { GarlicRoutingSession::GarlicRoutingSession (GarlicDestination * owner, bool attachLeaseSet): m_Owner (owner), m_LeaseSetUpdateStatus (attachLeaseSet ? eLeaseSetUpdated : eLeaseSetDoNotSend), - m_LeaseSetUpdateMsgID (0), m_IsWithJava (false) + m_LeaseSetUpdateMsgID (0), m_IsWithJava (false), m_NumSentPackets (0) { } diff --git a/libi2pd/Garlic.h b/libi2pd/Garlic.h index 6479710e..1fdffdae 100644 --- a/libi2pd/Garlic.h +++ b/libi2pd/Garlic.h @@ -133,6 +133,9 @@ namespace garlic bool IsWithJava () const { return m_IsWithJava; } void SetIsWithJava (bool isWithJava) { m_IsWithJava = isWithJava; } + + int NumSentPackets () const { return m_NumSentPackets; } + void SetNumSentPackets (int numSentPackets) { m_NumSentPackets = numSentPackets; } GarlicDestination * GetOwner () const { return m_Owner; } void SetOwner (GarlicDestination * owner) { m_Owner = owner; } @@ -157,6 +160,7 @@ namespace garlic std::shared_ptr m_SharedRoutingPath; bool m_IsWithJava; // based on choked value from streaming + int m_NumSentPackets; // for limit number of sent messages in streaming public: diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index c9d97a2e..2a28f955 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -156,7 +156,16 @@ namespace stream } void Stream::CleanUp () - { + { + if (m_RoutingSession && !m_SentPackets.empty ()) // free up space in shared window + { + int numPackets = m_SentPackets.size (); + int numSentPackets = m_RoutingSession->NumSentPackets (); + numSentPackets -= numPackets; + if (numSentPackets < 0) numSentPackets = 0; + m_RoutingSession->SetNumSentPackets (numSentPackets); + } + m_SendBuffer.CleanUp (); while (!m_ReceiveQueue.empty ()) { @@ -612,6 +621,7 @@ namespace stream } int rttSample = INT_MAX; int incCounter = 0; + int ackPacketsCounter = 0; m_IsNAcked = false; m_IsResendNeeded = false; int nackCount = packet->GetNACKCount (); @@ -653,6 +663,7 @@ namespace stream m_SentPackets.erase (it++); m_LocalDestination.DeletePacket (sentPacket); acknowledged = true; + ackPacketsCounter++; if (m_WindowIncCounter < m_MaxWindowSize && !m_IsFirstACK && !m_IsWinDropped) incCounter++; } @@ -773,6 +784,13 @@ namespace stream } if (acknowledged) { + if (m_RoutingSession) + { + int numSentPackets = m_RoutingSession->NumSentPackets (); + numSentPackets -= ackPacketsCounter; + if (numSentPackets < 0) numSentPackets = 0; + m_RoutingSession->SetNumSentPackets (numSentPackets); + } m_NumResendAttempts = 0; m_IsTimeOutResend = false; SendBuffer (); @@ -860,6 +878,18 @@ namespace stream } else if (numMsgs > m_NumPacketsToSend) numMsgs = m_NumPacketsToSend; + if (m_RoutingSession) + { + int numSentPackets = m_RoutingSession->NumSentPackets (); + int numPacketsToSend = m_MaxWindowSize - numSentPackets; + if (numPacketsToSend <= 0) // shared window is full + { + m_LastSendTime = ts; + return; + } + else if (numMsgs > numPacketsToSend) + numMsgs = numPacketsToSend; + } bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet std::vector packets; while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.IsEmpty () && numMsgs > 0)) @@ -947,6 +977,7 @@ namespace stream } if (m_SendBuffer.GetSize() == 0) m_IsBufferEmpty = true; else m_IsBufferEmpty = false; + int numPackets = packets.size (); if (packets.size () > 0) { if (m_SavedPackets.empty ()) // no NACKS @@ -963,7 +994,12 @@ namespace stream } SendPackets (packets); m_LastSendTime = ts; - m_IsSendTime = false; + m_IsSendTime = false; + if (m_RoutingSession) + { + int numSentPackets = m_RoutingSession->NumSentPackets (); + m_RoutingSession->SetNumSentPackets (numSentPackets + numPackets); + } if (m_Status == eStreamStatusClosing && m_SendBuffer.IsEmpty ()) SendClose (); if (isEmpty) @@ -1209,6 +1245,11 @@ namespace stream p->len = size; boost::asio::post (m_Service, std::bind (&Stream::SendPacket, shared_from_this (), p)); + if (m_RoutingSession) + { + int numSentPackets = m_RoutingSession->NumSentPackets (); + m_RoutingSession->SetNumSentPackets (numSentPackets + 1); + } LogPrint (eLogDebug, "Streaming: FIN sent, sSID=", m_SendStreamID); }