1
0
mirror of https://github.com/PurpleI2P/i2pd.git synced 2025-08-26 12:51:54 +00:00

limit simultinously sent streaming packets

This commit is contained in:
orignal 2025-07-20 08:42:39 -04:00
parent 27b005a9b2
commit 2fafca1571
3 changed files with 48 additions and 3 deletions

View File

@ -28,7 +28,7 @@ namespace garlic
{ {
GarlicRoutingSession::GarlicRoutingSession (GarlicDestination * owner, bool attachLeaseSet): GarlicRoutingSession::GarlicRoutingSession (GarlicDestination * owner, bool attachLeaseSet):
m_Owner (owner), m_LeaseSetUpdateStatus (attachLeaseSet ? eLeaseSetUpdated : eLeaseSetDoNotSend), m_Owner (owner), m_LeaseSetUpdateStatus (attachLeaseSet ? eLeaseSetUpdated : eLeaseSetDoNotSend),
m_LeaseSetUpdateMsgID (0), m_IsWithJava (false) m_LeaseSetUpdateMsgID (0), m_IsWithJava (false), m_NumSentPackets (0)
{ {
} }

View File

@ -134,6 +134,9 @@ namespace garlic
bool IsWithJava () const { return m_IsWithJava; } bool IsWithJava () const { return m_IsWithJava; }
void SetIsWithJava (bool isWithJava) { m_IsWithJava = isWithJava; } void SetIsWithJava (bool isWithJava) { m_IsWithJava = isWithJava; }
int NumSentPackets () const { return m_NumSentPackets; }
void SetNumSentPackets (int numSentPackets) { m_NumSentPackets = numSentPackets; }
GarlicDestination * GetOwner () const { return m_Owner; } GarlicDestination * GetOwner () const { return m_Owner; }
void SetOwner (GarlicDestination * owner) { m_Owner = owner; } void SetOwner (GarlicDestination * owner) { m_Owner = owner; }
@ -157,6 +160,7 @@ namespace garlic
std::shared_ptr<GarlicRoutingPath> m_SharedRoutingPath; std::shared_ptr<GarlicRoutingPath> m_SharedRoutingPath;
bool m_IsWithJava; // based on choked value from streaming bool m_IsWithJava; // based on choked value from streaming
int m_NumSentPackets; // for limit number of sent messages in streaming
public: public:

View File

@ -157,6 +157,15 @@ namespace stream
void Stream::CleanUp () void Stream::CleanUp ()
{ {
if (m_RoutingSession && !m_SentPackets.empty ()) // free up space in shared window
{
int numPackets = m_SentPackets.size ();
int numSentPackets = m_RoutingSession->NumSentPackets ();
numSentPackets -= numPackets;
if (numSentPackets < 0) numSentPackets = 0;
m_RoutingSession->SetNumSentPackets (numSentPackets);
}
m_SendBuffer.CleanUp (); m_SendBuffer.CleanUp ();
while (!m_ReceiveQueue.empty ()) while (!m_ReceiveQueue.empty ())
{ {
@ -612,6 +621,7 @@ namespace stream
} }
int rttSample = INT_MAX; int rttSample = INT_MAX;
int incCounter = 0; int incCounter = 0;
int ackPacketsCounter = 0;
m_IsNAcked = false; m_IsNAcked = false;
m_IsResendNeeded = false; m_IsResendNeeded = false;
int nackCount = packet->GetNACKCount (); int nackCount = packet->GetNACKCount ();
@ -653,6 +663,7 @@ namespace stream
m_SentPackets.erase (it++); m_SentPackets.erase (it++);
m_LocalDestination.DeletePacket (sentPacket); m_LocalDestination.DeletePacket (sentPacket);
acknowledged = true; acknowledged = true;
ackPacketsCounter++;
if (m_WindowIncCounter < m_MaxWindowSize && !m_IsFirstACK && !m_IsWinDropped) if (m_WindowIncCounter < m_MaxWindowSize && !m_IsFirstACK && !m_IsWinDropped)
incCounter++; incCounter++;
} }
@ -773,6 +784,13 @@ namespace stream
} }
if (acknowledged) if (acknowledged)
{ {
if (m_RoutingSession)
{
int numSentPackets = m_RoutingSession->NumSentPackets ();
numSentPackets -= ackPacketsCounter;
if (numSentPackets < 0) numSentPackets = 0;
m_RoutingSession->SetNumSentPackets (numSentPackets);
}
m_NumResendAttempts = 0; m_NumResendAttempts = 0;
m_IsTimeOutResend = false; m_IsTimeOutResend = false;
SendBuffer (); SendBuffer ();
@ -860,6 +878,18 @@ namespace stream
} }
else if (numMsgs > m_NumPacketsToSend) else if (numMsgs > m_NumPacketsToSend)
numMsgs = m_NumPacketsToSend; numMsgs = m_NumPacketsToSend;
if (m_RoutingSession)
{
int numSentPackets = m_RoutingSession->NumSentPackets ();
int numPacketsToSend = m_MaxWindowSize - numSentPackets;
if (numPacketsToSend <= 0) // shared window is full
{
m_LastSendTime = ts;
return;
}
else if (numMsgs > numPacketsToSend)
numMsgs = numPacketsToSend;
}
bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet
std::vector<Packet *> packets; std::vector<Packet *> packets;
while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.IsEmpty () && numMsgs > 0)) while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.IsEmpty () && numMsgs > 0))
@ -947,6 +977,7 @@ namespace stream
} }
if (m_SendBuffer.GetSize() == 0) m_IsBufferEmpty = true; if (m_SendBuffer.GetSize() == 0) m_IsBufferEmpty = true;
else m_IsBufferEmpty = false; else m_IsBufferEmpty = false;
int numPackets = packets.size ();
if (packets.size () > 0) if (packets.size () > 0)
{ {
if (m_SavedPackets.empty ()) // no NACKS if (m_SavedPackets.empty ()) // no NACKS
@ -964,6 +995,11 @@ namespace stream
SendPackets (packets); SendPackets (packets);
m_LastSendTime = ts; m_LastSendTime = ts;
m_IsSendTime = false; m_IsSendTime = false;
if (m_RoutingSession)
{
int numSentPackets = m_RoutingSession->NumSentPackets ();
m_RoutingSession->SetNumSentPackets (numSentPackets + numPackets);
}
if (m_Status == eStreamStatusClosing && m_SendBuffer.IsEmpty ()) if (m_Status == eStreamStatusClosing && m_SendBuffer.IsEmpty ())
SendClose (); SendClose ();
if (isEmpty) if (isEmpty)
@ -1209,6 +1245,11 @@ namespace stream
p->len = size; p->len = size;
boost::asio::post (m_Service, std::bind (&Stream::SendPacket, shared_from_this (), p)); boost::asio::post (m_Service, std::bind (&Stream::SendPacket, shared_from_this (), p));
if (m_RoutingSession)
{
int numSentPackets = m_RoutingSession->NumSentPackets ();
m_RoutingSession->SetNumSentPackets (numSentPackets + 1);
}
LogPrint (eLogDebug, "Streaming: FIN sent, sSID=", m_SendStreamID); LogPrint (eLogDebug, "Streaming: FIN sent, sSID=", m_SendStreamID);
} }