diff --git a/Streaming.cpp b/Streaming.cpp index 0b20c0e3..58335731 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -17,7 +17,8 @@ namespace stream m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), - m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port) + m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), + m_WindowSize (MIN_WINDOW_SIZE), m_LastWindowSizeIncreaseTime (0) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); UpdateCurrentRemoteLease (); @@ -27,7 +28,8 @@ namespace stream m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), - m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0) + m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), + m_WindowSize (MIN_WINDOW_SIZE), m_LastWindowSizeIncreaseTime (0) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); } @@ -239,10 +241,29 @@ namespace stream m_SentPackets.erase (it++); delete sentPacket; acknowledged = true; + if (m_WindowSize < WINDOW_SIZE) + m_WindowSize++; // slow start + else + { + // linear growth + auto ts = i2p::util::GetMillisecondsSinceEpoch (); + if (ts > m_LastWindowSizeIncreaseTime + INITIAL_RTT) + { + m_WindowSize++; + if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE; + m_LastWindowSizeIncreaseTime = ts; + } + } } else break; } + if (nackCount > 0) + { + // congesion avoidance + m_WindowSize -= nackCount; + if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; + } if (m_SentPackets.empty ()) m_ResendTimer.cancel (); if (acknowledged) @@ -263,7 +284,7 @@ namespace stream void Stream::SendBuffer () { - int numMsgs = WINDOW_SIZE - m_SentPackets.size (); + int numMsgs = m_WindowSize - m_SentPackets.size (); if (numMsgs <= 0) return; // window is full bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet diff --git a/Streaming.h b/Streaming.h index 62452e6b..3d31f3e6 100644 --- a/Streaming.h +++ b/Streaming.h @@ -45,6 +45,9 @@ namespace stream const int ACK_SEND_TIMEOUT = 200; // in milliseconds const int MAX_NUM_RESEND_ATTEMPTS = 5; const int WINDOW_SIZE = 6; // in messages + const int MIN_WINDOW_SIZE = 1; + const int MAX_WINDOW_SIZE = 128; + const int INITIAL_RTT = 8000; // in milliseconds struct Packet { @@ -157,6 +160,8 @@ namespace stream std::mutex m_SendBufferMutex; std::stringstream m_SendBuffer; + int m_WindowSize; + uint64_t m_LastWindowSizeIncreaseTime; }; class StreamingDestination