diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index 3d71e8d5..70c59067 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -78,9 +78,9 @@ namespace stream m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), m_RTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_SlowRTT2 (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE), m_LastWindowDropSize (0), m_WindowDropTargetSize (0), m_WindowIncCounter (0), m_RTO (INITIAL_RTO), - m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_PrevRTTSample (INITIAL_RTT), m_WindowSizeTail (0), + m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_PrevRTTSample (INITIAL_RTT), m_Jitter (0), m_MinPacingTime (0), - m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0), m_LastACKRecieveTime (0), m_ACKRecieveInterval (local.GetOwner ()->GetStreamingAckDelay ()), m_RemoteLeaseChangeTime (0), + m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0), m_RemoteLeaseChangeTime (0), m_LastACKSendTime (0), m_PacketACKInterval (1), m_PacketACKIntervalRem (0), // for limit inbound speed m_NumResendAttempts (0), m_NumPacketsToSend (0), m_MTU (STREAMING_MTU) { @@ -106,8 +106,8 @@ namespace stream m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_RTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_SlowRTT2 (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE), m_LastWindowDropSize (0), m_WindowDropTargetSize (0), m_WindowIncCounter (0), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), - m_PrevRTTSample (INITIAL_RTT), m_WindowSizeTail (0), m_Jitter (0), m_MinPacingTime (0), - m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0), m_LastACKRecieveTime (0), m_ACKRecieveInterval (local.GetOwner ()->GetStreamingAckDelay ()), m_RemoteLeaseChangeTime (0), + m_PrevRTTSample (INITIAL_RTT), m_Jitter (0), m_MinPacingTime (0), + m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0), m_RemoteLeaseChangeTime (0), m_LastACKSendTime (0), m_PacketACKInterval (1), m_PacketACKIntervalRem (0), // for limit inbound speed m_NumResendAttempts (0), m_NumPacketsToSend (0), m_MTU (STREAMING_MTU) { @@ -366,7 +366,6 @@ namespace stream { if (!m_IsWinDropped) { - LogPrint (eLogDebug, "Streaming: Client choked, set min. window size"); m_WindowDropTargetSize = MIN_WINDOW_SIZE; m_LastWindowDropSize = 0; m_WindowIncCounter = 0; @@ -538,21 +537,12 @@ namespace stream m_SentPackets.erase (it++); m_LocalDestination.DeletePacket (sentPacket); acknowledged = true; - if (m_WindowIncCounter < MAX_WINDOW_SIZE && !m_IsFirstACK && !m_IsWinDropped) + if (m_WindowSize < MAX_WINDOW_SIZE && !m_IsFirstACK) incCounter++; } else break; } - if (m_LastACKRecieveTime) - { - uint64_t interval = ts - m_LastACKRecieveTime; - if (m_ACKRecieveInterval) - m_ACKRecieveInterval = (m_ACKRecieveInterval + interval) / 2; - else - m_ACKRecieveInterval = interval; - } - m_LastACKRecieveTime = ts; if (rttSample != INT_MAX) { if (m_IsFirstRttSample && !m_IsFirstACK) @@ -599,15 +589,12 @@ namespace stream // // delay-based CC if ((m_SlowRTT2 > m_SlowRTT + m_Jitter && rttSample > m_SlowRTT2 && rttSample > m_PrevRTTSample) && !m_IsWinDropped) // Drop window if RTT grows too fast, late detection - { - LogPrint (eLogDebug, "Streaming: Congestion detected, reduce window size"); ProcessWindowDrop (); - } UpdatePacingTime (); m_PrevRTTSample = rttSample; bool wasInitial = m_RTO == INITIAL_RTO; - m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.3 + m_Jitter + m_ACKRecieveInterval)); // TODO: implement it better + m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.3 + m_Jitter)); // TODO: implement it better if (wasInitial) ScheduleResend (); @@ -617,10 +604,20 @@ namespace stream m_IsFirstRttSample = true; m_IsWinDropped = false; } - if (m_WindowDropTargetSize && int(m_SentPackets.size ()) <= m_WindowDropTargetSize) + if (m_WindowDropTargetSize && m_WindowSize <= m_WindowDropTargetSize) { - m_WindowSize = m_WindowDropTargetSize; m_WindowDropTargetSize = 0; + m_DropWindowDelaySequenceNumber = m_SequenceNumber; + } + if (acknowledged && m_WindowDropTargetSize && m_WindowSize > m_WindowDropTargetSize) + { + m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5 + m_Jitter)); // we assume that the next rtt sample may be much larger than the current + m_IsResendNeeded = true; + m_WindowSize = m_SentPackets.size () + 1; // if there are no packets to resend, just send one regular packet + if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; + if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE; + m_WindowIncCounter = 0; + UpdatePacingTime (); } if (acknowledged || m_IsNAcked) { @@ -629,14 +626,12 @@ namespace stream if (m_SendBuffer.IsEmpty () && m_SentPackets.size () > 0) // tail loss { m_IsResendNeeded = true; - m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5 + m_Jitter + m_ACKRecieveInterval)); // to prevent spurious retransmit + m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5 + m_Jitter)); // to prevent spurious retransmit } if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) { m_ResendTimer.cancel (); m_SendTimer.cancel (); - m_LastACKRecieveTime = 0; - m_ACKRecieveInterval = m_AckDelay; } if (acknowledged && m_IsFirstACK) { @@ -1163,7 +1158,6 @@ namespace stream } if (m_RemoteLeaseChangeTime && m_IsRemoteLeaseChangeInProgress && ts > m_RemoteLeaseChangeTime + INITIAL_RTO) { - LogPrint (eLogDebug, "Streaming: RemoteLease changed, set initial window size"); CancelRemoteLeaseChange (); m_CurrentRemoteLease = m_NextRemoteLease; HalveWindowSize (); @@ -1311,11 +1305,6 @@ namespace stream } UpdatePacingTime (); } - else if (m_WindowIncCounter && m_WindowSize == MAX_WINDOW_SIZE && !m_SendBuffer.IsEmpty () && m_PacingTime > m_MinPacingTime) - { - m_WindowSizeTail = m_WindowSizeTail + m_WindowIncCounter; - if (m_WindowSizeTail > MAX_WINDOW_SIZE) m_WindowSizeTail = MAX_WINDOW_SIZE; - } if (m_IsNAcked) ResendPacket (); else if (m_IsResendNeeded) // resend packets @@ -1420,10 +1409,7 @@ namespace stream { // loss-based CC if (!m_IsWinDropped && LOSS_BASED_CONTROL_ENABLED) - { - LogPrint (eLogDebug, "Streaming: Packet loss, reduce window size"); ProcessWindowDrop (); - } } else if (m_IsTimeOutResend) { @@ -1436,8 +1422,6 @@ namespace stream m_IsFirstRttSample = true; m_DropWindowDelaySequenceNumber = 0; m_IsFirstACK = true; - m_LastACKRecieveTime = 0; - m_ACKRecieveInterval = m_AckDelay; UpdatePacingTime (); if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); if (m_NumResendAttempts & 1) @@ -1601,7 +1585,6 @@ namespace stream } if (isLeaseChanged && !m_IsRemoteLeaseChangeInProgress) { - LogPrint (eLogDebug, "Streaming: RemoteLease changed, set initial window size"); HalveWindowSize (); } } @@ -1618,10 +1601,7 @@ namespace stream void Stream::UpdatePacingTime () { - if (m_WindowDropTargetSize) - m_PacingTime = std::round (m_RTT*1000/m_WindowDropTargetSize); - else - m_PacingTime = std::round (m_RTT*1000/m_WindowSize); + m_PacingTime = std::round (m_RTT*1000/m_WindowSize); if (m_MinPacingTime && m_PacingTime < m_MinPacingTime) m_PacingTime = m_MinPacingTime; } @@ -1629,20 +1609,18 @@ namespace stream void Stream::ProcessWindowDrop () { if (m_WindowSize > m_LastWindowDropSize) - { - m_LastWindowDropSize = (m_LastWindowDropSize + m_WindowSize + m_WindowSizeTail) / 2; - if (m_LastWindowDropSize > MAX_WINDOW_SIZE) m_LastWindowDropSize = MAX_WINDOW_SIZE; - } + m_LastWindowDropSize = (m_LastWindowDropSize + m_WindowSize) / 2; else m_LastWindowDropSize = m_WindowSize; m_WindowDropTargetSize = m_LastWindowDropSize - (m_LastWindowDropSize / 4); // -25%; - if (m_WindowDropTargetSize < MIN_WINDOW_SIZE) - m_WindowDropTargetSize = MIN_WINDOW_SIZE; + if (m_WindowDropTargetSize < MIN_WINDOW_SIZE + 1) + m_WindowDropTargetSize = MIN_WINDOW_SIZE + 1; + m_WindowSize = m_SentPackets.size (); // stop sending now + if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; m_WindowIncCounter = 0; // disable window growth - m_DropWindowDelaySequenceNumber = m_SequenceNumber + int(m_WindowDropTargetSize); + m_DropWindowDelaySequenceNumber = m_SequenceNumber; m_IsFirstACK = true; // ignore first RTT sample m_IsWinDropped = true; // don't drop window twice - m_WindowSizeTail = 0; UpdatePacingTime (); } @@ -1660,7 +1638,6 @@ namespace stream m_WindowIncCounter = 0; m_IsFirstRttSample = true; m_IsFirstACK = true; - m_WindowSizeTail = 0; UpdatePacingTime (); } diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index a9e8a404..71450b2b 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -295,10 +295,10 @@ namespace stream SendBufferQueue m_SendBuffer; double m_RTT, m_SlowRTT, m_SlowRTT2; float m_WindowSize, m_LastWindowDropSize, m_WindowDropTargetSize; - int m_WindowIncCounter, m_RTO, m_AckDelay, m_PrevRTTSample, m_WindowSizeTail; + int m_WindowIncCounter, m_RTO, m_AckDelay, m_PrevRTTSample; double m_Jitter; uint64_t m_MinPacingTime, m_PacingTime, m_PacingTimeRem, // microseconds - m_LastSendTime, m_LastACKRecieveTime, m_ACKRecieveInterval, m_RemoteLeaseChangeTime; // milliseconds + m_LastSendTime, m_RemoteLeaseChangeTime; // milliseconds uint64_t m_LastACKSendTime, m_PacketACKInterval, m_PacketACKIntervalRem; // for limit inbound speed int m_NumResendAttempts, m_NumPacketsToSend; size_t m_MTU;