Browse Source

slow start and congestion avoidance

pull/151/head
orignal 10 years ago
parent
commit
5887e8c8c4
  1. 27
      Streaming.cpp
  2. 5
      Streaming.h

27
Streaming.cpp

@ -17,7 +17,8 @@ namespace stream
m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service),
m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service),
m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port) m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port),
m_WindowSize (MIN_WINDOW_SIZE), m_LastWindowSizeIncreaseTime (0)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
UpdateCurrentRemoteLease (); UpdateCurrentRemoteLease ();
@ -27,7 +28,8 @@ namespace stream
m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1),
m_IsOpen (false), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), m_IsOpen (false), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service),
m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0) m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0),
m_WindowSize (MIN_WINDOW_SIZE), m_LastWindowSizeIncreaseTime (0)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
} }
@ -239,10 +241,29 @@ namespace stream
m_SentPackets.erase (it++); m_SentPackets.erase (it++);
delete sentPacket; delete sentPacket;
acknowledged = true; acknowledged = true;
if (m_WindowSize < WINDOW_SIZE)
m_WindowSize++; // slow start
else
{
// linear growth
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
if (ts > m_LastWindowSizeIncreaseTime + INITIAL_RTT)
{
m_WindowSize++;
if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE;
m_LastWindowSizeIncreaseTime = ts;
}
}
} }
else else
break; break;
} }
if (nackCount > 0)
{
// congesion avoidance
m_WindowSize -= nackCount;
if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE;
}
if (m_SentPackets.empty ()) if (m_SentPackets.empty ())
m_ResendTimer.cancel (); m_ResendTimer.cancel ();
if (acknowledged) if (acknowledged)
@ -263,7 +284,7 @@ namespace stream
void Stream::SendBuffer () void Stream::SendBuffer ()
{ {
int numMsgs = WINDOW_SIZE - m_SentPackets.size (); int numMsgs = m_WindowSize - m_SentPackets.size ();
if (numMsgs <= 0) return; // window is full if (numMsgs <= 0) return; // window is full
bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet

5
Streaming.h

@ -45,6 +45,9 @@ namespace stream
const int ACK_SEND_TIMEOUT = 200; // in milliseconds const int ACK_SEND_TIMEOUT = 200; // in milliseconds
const int MAX_NUM_RESEND_ATTEMPTS = 5; const int MAX_NUM_RESEND_ATTEMPTS = 5;
const int WINDOW_SIZE = 6; // in messages const int WINDOW_SIZE = 6; // in messages
const int MIN_WINDOW_SIZE = 1;
const int MAX_WINDOW_SIZE = 128;
const int INITIAL_RTT = 8000; // in milliseconds
struct Packet struct Packet
{ {
@ -157,6 +160,8 @@ namespace stream
std::mutex m_SendBufferMutex; std::mutex m_SendBufferMutex;
std::stringstream m_SendBuffer; std::stringstream m_SendBuffer;
int m_WindowSize;
uint64_t m_LastWindowSizeIncreaseTime;
}; };
class StreamingDestination class StreamingDestination

Loading…
Cancel
Save