Browse Source

RTT

pull/157/head
orignal 10 years ago
parent
commit
4a6847da8d
  1. 3
      HTTPServer.cpp
  2. 27
      Streaming.cpp
  3. 7
      Streaming.h

3
HTTPServer.cpp

@ -840,7 +840,8 @@ namespace util
s << it.first << "->" << i2p::client::context.GetAddressBook ().ToAddress(it.second->GetRemoteIdentity ()) << " "; s << it.first << "->" << i2p::client::context.GetAddressBook ().ToAddress(it.second->GetRemoteIdentity ()) << " ";
s << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]"; s << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]";
s << " [out:" << it.second->GetSendQueueSize () << "][in:" << it.second->GetReceiveQueueSize () << "]"; s << " [out:" << it.second->GetSendQueueSize () << "][in:" << it.second->GetReceiveQueueSize () << "]";
s << " [buf:" << it.second->GetSendBufferSize () << "]"; s << "[buf:" << it.second->GetSendBufferSize () << "]";
s << "[RTT:" << it.second->GetRTT () << "]";
s << "<br>"<< std::endl; s << "<br>"<< std::endl;
} }
} }

27
Streaming.cpp

@ -15,10 +15,9 @@ namespace stream
std::shared_ptr<const i2p::data::LeaseSet> remote, int port): m_Service (service), m_SendStreamID (0), std::shared_ptr<const i2p::data::LeaseSet> remote, int port): m_Service (service), m_SendStreamID (0),
m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false),
m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service),
m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port),
m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_LastWindowSizeIncreaseTime (0)
m_WindowSize (MIN_WINDOW_SIZE), m_LastWindowSizeIncreaseTime (0)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
UpdateCurrentRemoteLease (); UpdateCurrentRemoteLease ();
@ -28,8 +27,8 @@ namespace stream
m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1),
m_IsOpen (false), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), m_IsOpen (false), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service),
m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE),
m_WindowSize (MIN_WINDOW_SIZE), m_LastWindowSizeIncreaseTime (0) m_RTT (INITIAL_RTT), m_LastWindowSizeIncreaseTime (0)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
} }
@ -211,6 +210,7 @@ namespace stream
void Stream::ProcessAck (Packet * packet) void Stream::ProcessAck (Packet * packet)
{ {
bool acknowledged = false; bool acknowledged = false;
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
uint32_t ackThrough = packet->GetAckThrough (); uint32_t ackThrough = packet->GetAckThrough ();
int nackCount = packet->GetNACKCount (); int nackCount = packet->GetNACKCount ();
for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();) for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();)
@ -235,7 +235,9 @@ namespace stream
} }
} }
auto sentPacket = *it; auto sentPacket = *it;
LogPrint (eLogDebug, "Packet ", seqn, " acknowledged"); uint64_t rtt = ts - sentPacket->sendTime;
m_RTT = (m_RTT*seqn + rtt)/(seqn + 1);
LogPrint (eLogDebug, "Packet ", seqn, " acknowledged rtt=", rtt);
m_SentPackets.erase (it++); m_SentPackets.erase (it++);
delete sentPacket; delete sentPacket;
acknowledged = true; acknowledged = true;
@ -244,8 +246,7 @@ namespace stream
else else
{ {
// linear growth // linear growth
auto ts = i2p::util::GetMillisecondsSinceEpoch (); if (ts > m_LastWindowSizeIncreaseTime + m_RTT)
if (ts > m_LastWindowSizeIncreaseTime + INITIAL_RTT)
{ {
m_WindowSize++; m_WindowSize++;
if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE; if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE;
@ -348,8 +349,12 @@ namespace stream
m_IsAckSendScheduled = false; m_IsAckSendScheduled = false;
m_AckSendTimer.cancel (); m_AckSendTimer.cancel ();
bool isEmpty = m_SentPackets.empty (); bool isEmpty = m_SentPackets.empty ();
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
for (auto it: packets) for (auto it: packets)
{
it->sendTime = ts;
m_SentPackets.insert (it); m_SentPackets.insert (it);
}
SendPackets (packets); SendPackets (packets);
if (isEmpty) if (isEmpty)
ScheduleResend (); ScheduleResend ();
@ -560,6 +565,7 @@ namespace stream
{ {
if (ecode != boost::asio::error::operation_aborted) if (ecode != boost::asio::error::operation_aborted)
{ {
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
bool congesion = false, first = true; bool congesion = false, first = true;
std::vector<Packet *> packets; std::vector<Packet *> packets;
for (auto it : m_SentPackets) for (auto it : m_SentPackets)
@ -569,7 +575,10 @@ namespace stream
congesion = true; congesion = true;
first = false; first = false;
if (it->numResendAttempts <= MAX_NUM_RESEND_ATTEMPTS) if (it->numResendAttempts <= MAX_NUM_RESEND_ATTEMPTS)
{
it->sendTime = ts;
packets.push_back (it); packets.push_back (it);
}
else else
{ {
LogPrint (eLogWarning, "Packet ", it->GetSeqn (), " was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts. Terminate"); LogPrint (eLogWarning, "Packet ", it->GetSeqn (), " was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts. Terminate");

7
Streaming.h

@ -54,8 +54,9 @@ namespace stream
size_t len, offset; size_t len, offset;
uint8_t buf[MAX_PACKET_SIZE]; uint8_t buf[MAX_PACKET_SIZE];
int numResendAttempts; int numResendAttempts;
uint64_t sendTime;
Packet (): len (0), offset (0), numResendAttempts (0) {}; Packet (): len (0), offset (0), numResendAttempts (0), sendTime (0) {};
uint8_t * GetBuffer () { return buf + offset; }; uint8_t * GetBuffer () { return buf + offset; };
size_t GetLength () const { return len - offset; }; size_t GetLength () const { return len - offset; };
@ -116,6 +117,8 @@ namespace stream
size_t GetSendQueueSize () const { return m_SentPackets.size (); }; size_t GetSendQueueSize () const { return m_SentPackets.size (); };
size_t GetReceiveQueueSize () const { return m_ReceiveQueue.size (); }; size_t GetReceiveQueueSize () const { return m_ReceiveQueue.size (); };
size_t GetSendBufferSize () const { return m_SendBuffer.rdbuf ()->in_avail (); }; size_t GetSendBufferSize () const { return m_SendBuffer.rdbuf ()->in_avail (); };
int GetWindowSize () const { return m_WindowSize; };
int GetRTT () const { return m_RTT; };
private: private:
@ -161,7 +164,7 @@ namespace stream
std::mutex m_SendBufferMutex; std::mutex m_SendBufferMutex;
std::stringstream m_SendBuffer; std::stringstream m_SendBuffer;
int m_WindowSize; int m_WindowSize, m_RTT;
uint64_t m_LastWindowSizeIncreaseTime; uint64_t m_LastWindowSizeIncreaseTime;
}; };

Loading…
Cancel
Save