From 4a6847da8d2342222df3f60fb48e2322040ae083 Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 3 Feb 2015 13:46:44 -0500 Subject: [PATCH] RTT --- HTTPServer.cpp | 3 ++- Streaming.cpp | 27 ++++++++++++++++++--------- Streaming.h | 7 +++++-- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 569ebe51..2e50e0fd 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -840,7 +840,8 @@ namespace util s << it.first << "->" << i2p::client::context.GetAddressBook ().ToAddress(it.second->GetRemoteIdentity ()) << " "; s << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]"; s << " [out:" << it.second->GetSendQueueSize () << "][in:" << it.second->GetReceiveQueueSize () << "]"; - s << " [buf:" << it.second->GetSendBufferSize () << "]"; + s << "[buf:" << it.second->GetSendBufferSize () << "]"; + s << "[RTT:" << it.second->GetRTT () << "]"; s << "
"<< std::endl; } } diff --git a/Streaming.cpp b/Streaming.cpp index b69b2d48..d4fbc208 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -15,10 +15,9 @@ namespace stream std::shared_ptr remote, int port): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), - m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), - m_ResendTimer (m_Service), m_AckSendTimer (m_Service), - m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), - m_WindowSize (MIN_WINDOW_SIZE), m_LastWindowSizeIncreaseTime (0) + m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), + m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), + m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_LastWindowSizeIncreaseTime (0) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); UpdateCurrentRemoteLease (); @@ -28,8 +27,8 @@ namespace stream m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), - m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), - m_WindowSize (MIN_WINDOW_SIZE), m_LastWindowSizeIncreaseTime (0) + m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE), + m_RTT (INITIAL_RTT), m_LastWindowSizeIncreaseTime (0) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); } @@ -211,6 +210,7 @@ namespace stream void Stream::ProcessAck (Packet * packet) { bool acknowledged = false; + auto ts = i2p::util::GetMillisecondsSinceEpoch (); uint32_t ackThrough = packet->GetAckThrough (); int nackCount = packet->GetNACKCount (); for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();) @@ -235,7 +235,9 @@ namespace stream } } auto sentPacket = *it; - LogPrint (eLogDebug, "Packet ", seqn, " acknowledged"); + uint64_t rtt = ts - sentPacket->sendTime; + m_RTT = (m_RTT*seqn + rtt)/(seqn + 1); + LogPrint (eLogDebug, "Packet ", seqn, " acknowledged rtt=", rtt); m_SentPackets.erase (it++); delete sentPacket; acknowledged = true; @@ -244,8 +246,7 @@ namespace stream else { // linear growth - auto ts = i2p::util::GetMillisecondsSinceEpoch (); - if (ts > m_LastWindowSizeIncreaseTime + INITIAL_RTT) + if (ts > m_LastWindowSizeIncreaseTime + m_RTT) { m_WindowSize++; if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE; @@ -348,8 +349,12 @@ namespace stream m_IsAckSendScheduled = false; m_AckSendTimer.cancel (); bool isEmpty = m_SentPackets.empty (); + auto ts = i2p::util::GetMillisecondsSinceEpoch (); for (auto it: packets) + { + it->sendTime = ts; m_SentPackets.insert (it); + } SendPackets (packets); if (isEmpty) ScheduleResend (); @@ -560,6 +565,7 @@ namespace stream { if (ecode != boost::asio::error::operation_aborted) { + auto ts = i2p::util::GetMillisecondsSinceEpoch (); bool congesion = false, first = true; std::vector packets; for (auto it : m_SentPackets) @@ -569,7 +575,10 @@ namespace stream congesion = true; first = false; if (it->numResendAttempts <= MAX_NUM_RESEND_ATTEMPTS) + { + it->sendTime = ts; packets.push_back (it); + } else { LogPrint (eLogWarning, "Packet ", it->GetSeqn (), " was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts. Terminate"); diff --git a/Streaming.h b/Streaming.h index 7b68dba7..e91b11c7 100644 --- a/Streaming.h +++ b/Streaming.h @@ -54,8 +54,9 @@ namespace stream size_t len, offset; uint8_t buf[MAX_PACKET_SIZE]; int numResendAttempts; + uint64_t sendTime; - Packet (): len (0), offset (0), numResendAttempts (0) {}; + Packet (): len (0), offset (0), numResendAttempts (0), sendTime (0) {}; uint8_t * GetBuffer () { return buf + offset; }; size_t GetLength () const { return len - offset; }; @@ -116,6 +117,8 @@ namespace stream size_t GetSendQueueSize () const { return m_SentPackets.size (); }; size_t GetReceiveQueueSize () const { return m_ReceiveQueue.size (); }; size_t GetSendBufferSize () const { return m_SendBuffer.rdbuf ()->in_avail (); }; + int GetWindowSize () const { return m_WindowSize; }; + int GetRTT () const { return m_RTT; }; private: @@ -161,7 +164,7 @@ namespace stream std::mutex m_SendBufferMutex; std::stringstream m_SendBuffer; - int m_WindowSize; + int m_WindowSize, m_RTT; uint64_t m_LastWindowSizeIncreaseTime; };