Browse Source

resend packing with RTO interval

pull/163/head
orignal 10 years ago
parent
commit
3e889ee06c
  1. 14
      Streaming.cpp
  2. 4
      Streaming.h

14
Streaming.cpp

@ -17,7 +17,8 @@ namespace stream @@ -17,7 +17,8 @@ namespace stream
m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service),
m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port),
m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_LastWindowSizeIncreaseTime (0)
m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO),
m_LastWindowSizeIncreaseTime (0)
{
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
UpdateCurrentRemoteLease ();
@ -28,7 +29,7 @@ namespace stream @@ -28,7 +29,7 @@ namespace stream
m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service),
m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE),
m_RTT (INITIAL_RTT), m_LastWindowSizeIncreaseTime (0)
m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_LastWindowSizeIncreaseTime (0)
{
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
}
@ -235,9 +236,6 @@ namespace stream @@ -235,9 +236,6 @@ namespace stream
if (nacked)
{
LogPrint (eLogDebug, "Packet ", seqn, " NACK");
(*it)->numResendAttempts++;
(*it)->sendTime = ts;
SendPackets (std::vector<Packet *> { *it });
it++;
continue;
}
@ -245,6 +243,7 @@ namespace stream @@ -245,6 +243,7 @@ namespace stream
auto sentPacket = *it;
uint64_t rtt = ts - sentPacket->sendTime;
m_RTT = (m_RTT*seqn + rtt)/(seqn + 1);
m_RTO = m_RTT*1.5; // TODO: implement it better
LogPrint (eLogDebug, "Packet ", seqn, " acknowledged rtt=", rtt);
m_SentPackets.erase (it++);
delete sentPacket;
@ -313,7 +312,7 @@ namespace stream @@ -313,7 +312,7 @@ namespace stream
size += 4; // ack Through
packet[size] = 0;
size++; // NACK count
packet[size] = RESEND_TIMEOUT;
packet[size] = m_RTO/1000;
size++; // resend delay
if (m_Status == eStreamStatusNew)
{
@ -596,7 +595,7 @@ namespace stream @@ -596,7 +595,7 @@ namespace stream
void Stream::ScheduleResend ()
{
m_ResendTimer.cancel ();
m_ResendTimer.expires_from_now (boost::posix_time::seconds(RESEND_TIMEOUT));
m_ResendTimer.expires_from_now (boost::posix_time::milliseconds(m_RTO));
m_ResendTimer.async_wait (std::bind (&Stream::HandleResendTimer,
shared_from_this (), std::placeholders::_1));
}
@ -610,6 +609,7 @@ namespace stream @@ -610,6 +609,7 @@ namespace stream
std::vector<Packet *> packets;
for (auto it : m_SentPackets)
{
if (ts < it->sendTime + m_RTO) continue; // don't resend too early
it->numResendAttempts++;
if (first && it->numResendAttempts == 1) // detect congesion at first attempt of first packet only
congesion = true;

4
Streaming.h

@ -41,13 +41,13 @@ namespace stream @@ -41,13 +41,13 @@ namespace stream
const size_t STREAMING_MTU = 1730;
const size_t MAX_PACKET_SIZE = 4096;
const size_t COMPRESSION_THRESHOLD_SIZE = 66;
const int RESEND_TIMEOUT = 10; // in seconds
const int ACK_SEND_TIMEOUT = 200; // in milliseconds
const int MAX_NUM_RESEND_ATTEMPTS = 5;
const int WINDOW_SIZE = 6; // in messages
const int MIN_WINDOW_SIZE = 1;
const int MAX_WINDOW_SIZE = 128;
const int INITIAL_RTT = 8000; // in milliseconds
const int INITIAL_RTO = 9000; // in milliseconds
struct Packet
{
@ -177,7 +177,7 @@ namespace stream @@ -177,7 +177,7 @@ namespace stream
std::mutex m_SendBufferMutex;
std::stringstream m_SendBuffer;
int m_WindowSize, m_RTT;
int m_WindowSize, m_RTT, m_RTO;
uint64_t m_LastWindowSizeIncreaseTime;
};

Loading…
Cancel
Save