From 3e889ee06c4b6a55265c9769c5b47456d37b2f5d Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 10 Mar 2015 11:11:42 -0400 Subject: [PATCH] resend packing with RTO interval --- Streaming.cpp | 14 +++++++------- Streaming.h | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index 63d3320c..0826a110 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -17,7 +17,8 @@ namespace stream m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), - m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_LastWindowSizeIncreaseTime (0) + m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), + m_LastWindowSizeIncreaseTime (0) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); UpdateCurrentRemoteLease (); @@ -28,7 +29,7 @@ namespace stream m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE), - m_RTT (INITIAL_RTT), m_LastWindowSizeIncreaseTime (0) + m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_LastWindowSizeIncreaseTime (0) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); } @@ -235,9 +236,6 @@ namespace stream if (nacked) { LogPrint (eLogDebug, "Packet ", seqn, " NACK"); - (*it)->numResendAttempts++; - (*it)->sendTime = ts; - SendPackets (std::vector { *it }); it++; continue; } @@ -245,6 +243,7 @@ namespace stream auto sentPacket = *it; uint64_t rtt = ts - sentPacket->sendTime; m_RTT = (m_RTT*seqn + rtt)/(seqn + 1); + m_RTO = m_RTT*1.5; // TODO: implement it better LogPrint (eLogDebug, "Packet ", seqn, " acknowledged rtt=", rtt); m_SentPackets.erase (it++); delete sentPacket; @@ -313,7 +312,7 @@ namespace stream size += 4; // ack Through packet[size] = 0; size++; // NACK count - packet[size] = RESEND_TIMEOUT; + packet[size] = m_RTO/1000; size++; // resend delay if (m_Status == eStreamStatusNew) { @@ -596,7 +595,7 @@ namespace stream void Stream::ScheduleResend () { m_ResendTimer.cancel (); - m_ResendTimer.expires_from_now (boost::posix_time::seconds(RESEND_TIMEOUT)); + m_ResendTimer.expires_from_now (boost::posix_time::milliseconds(m_RTO)); m_ResendTimer.async_wait (std::bind (&Stream::HandleResendTimer, shared_from_this (), std::placeholders::_1)); } @@ -610,6 +609,7 @@ namespace stream std::vector packets; for (auto it : m_SentPackets) { + if (ts < it->sendTime + m_RTO) continue; // don't resend too early it->numResendAttempts++; if (first && it->numResendAttempts == 1) // detect congesion at first attempt of first packet only congesion = true; diff --git a/Streaming.h b/Streaming.h index 541a931a..2f73ad92 100644 --- a/Streaming.h +++ b/Streaming.h @@ -41,13 +41,13 @@ namespace stream const size_t STREAMING_MTU = 1730; const size_t MAX_PACKET_SIZE = 4096; const size_t COMPRESSION_THRESHOLD_SIZE = 66; - const int RESEND_TIMEOUT = 10; // in seconds const int ACK_SEND_TIMEOUT = 200; // in milliseconds const int MAX_NUM_RESEND_ATTEMPTS = 5; const int WINDOW_SIZE = 6; // in messages const int MIN_WINDOW_SIZE = 1; const int MAX_WINDOW_SIZE = 128; const int INITIAL_RTT = 8000; // in milliseconds + const int INITIAL_RTO = 9000; // in milliseconds struct Packet { @@ -177,7 +177,7 @@ namespace stream std::mutex m_SendBufferMutex; std::stringstream m_SendBuffer; - int m_WindowSize, m_RTT; + int m_WindowSize, m_RTT, m_RTO; uint64_t m_LastWindowSizeIncreaseTime; };