Browse Source

calculate RTT and RTO

pull/1786/head
orignal 2 years ago
parent
commit
3bd40fc8b3
  1. 49
      libi2pd/SSU2Session.cpp
  2. 10
      libi2pd/SSU2Session.h

49
libi2pd/SSU2Session.cpp

@ -24,7 +24,8 @@ namespace transport
m_Server (server), m_Address (addr), m_RemoteTransports (0), m_Server (server), m_Address (addr), m_RemoteTransports (0),
m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown), m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown),
m_SendPacketNum (0), m_ReceivePacketNum (0), m_IsDataReceived (false), m_SendPacketNum (0), m_ReceivePacketNum (0), m_IsDataReceived (false),
m_WindowSize (SSU2_MIN_WINDOW_SIZE), m_RelayTag (0), m_WindowSize (SSU2_MIN_WINDOW_SIZE), m_RTT (SSU2_RESEND_INTERVAL),
m_RTO (SSU2_RESEND_INTERVAL*1.5), m_RelayTag (0),
m_ConnectTimer (server.GetService ()), m_TerminationReason (eSSU2TerminationReasonNormalClose), m_ConnectTimer (server.GetService ()), m_TerminationReason (eSSU2TerminationReasonNormalClose),
m_MaxPayloadSize (SSU2_MIN_PACKET_SIZE - IPV6_HEADER_SIZE - UDP_HEADER_SIZE - 32) // min size m_MaxPayloadSize (SSU2_MIN_PACKET_SIZE - IPV6_HEADER_SIZE - UDP_HEADER_SIZE - 32) // min size
{ {
@ -265,7 +266,7 @@ namespace transport
{ {
if (!m_SendQueue.empty () && m_SentPackets.size () <= m_WindowSize) if (!m_SendQueue.empty () && m_SentPackets.size () <= m_WindowSize)
{ {
auto nextResend = i2p::util::GetMillisecondsSinceEpoch () + SSU2_RESEND_INTERVAL; auto ts = i2p::util::GetMillisecondsSinceEpoch ();
auto packet = std::make_shared<SentPacket>(); auto packet = std::make_shared<SentPacket>();
size_t ackBlockSize = CreateAckBlock (packet->payload, m_MaxPayloadSize); size_t ackBlockSize = CreateAckBlock (packet->payload, m_MaxPayloadSize);
bool ackBlockSent = false; bool ackBlockSent = false;
@ -318,7 +319,7 @@ namespace transport
} }
// send right a way // send right a way
uint32_t packetNum = SendData (packet->payload, packet->payloadSize); uint32_t packetNum = SendData (packet->payload, packet->payloadSize);
packet->nextResendTime = nextResend; packet->sendTime = ts;
m_SentPackets.emplace (packetNum, packet); m_SentPackets.emplace (packetNum, packet);
packet = newPacket; // just ack block packet = newPacket; // just ack block
} }
@ -329,7 +330,7 @@ namespace transport
if (packet->payloadSize + 16 < m_MaxPayloadSize) if (packet->payloadSize + 16 < m_MaxPayloadSize)
packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize); packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize);
uint32_t packetNum = SendData (packet->payload, packet->payloadSize); uint32_t packetNum = SendData (packet->payload, packet->payloadSize);
packet->nextResendTime = nextResend; packet->sendTime = ts;
m_SentPackets.emplace (packetNum, packet); m_SentPackets.emplace (packetNum, packet);
} }
return ackBlockSent; return ackBlockSent;
@ -344,7 +345,7 @@ namespace transport
bool ackBlockSent = false; bool ackBlockSent = false;
uint32_t msgID; uint32_t msgID;
memcpy (&msgID, msg->GetHeader () + I2NP_HEADER_MSGID_OFFSET, 4); memcpy (&msgID, msg->GetHeader () + I2NP_HEADER_MSGID_OFFSET, 4);
auto nextResend = i2p::util::GetMillisecondsSinceEpoch () + SSU2_RESEND_INTERVAL; auto ts = i2p::util::GetMillisecondsSinceEpoch ();
auto packet = std::make_shared<SentPacket>(); auto packet = std::make_shared<SentPacket>();
if (extraSize >= 8) if (extraSize >= 8)
{ {
@ -353,7 +354,7 @@ namespace transport
if (packet->payloadSize + 12 < m_MaxPayloadSize) if (packet->payloadSize + 12 < m_MaxPayloadSize)
{ {
uint32_t packetNum = SendData (packet->payload, packet->payloadSize); uint32_t packetNum = SendData (packet->payload, packet->payloadSize);
packet->nextResendTime = nextResend; packet->sendTime = ts;
m_SentPackets.emplace (packetNum, packet); m_SentPackets.emplace (packetNum, packet);
packet = std::make_shared<SentPacket>(); packet = std::make_shared<SentPacket>();
} }
@ -367,7 +368,7 @@ namespace transport
extraSize -= offset; extraSize -= offset;
packet->payloadSize += size; packet->payloadSize += size;
uint32_t firstPacketNum = SendData (packet->payload, packet->payloadSize); uint32_t firstPacketNum = SendData (packet->payload, packet->payloadSize);
packet->nextResendTime = nextResend; packet->sendTime = ts;
m_SentPackets.emplace (firstPacketNum, packet); m_SentPackets.emplace (firstPacketNum, packet);
uint8_t fragmentNum = 0; uint8_t fragmentNum = 0;
while (msg->offset < msg->len) while (msg->offset < msg->len)
@ -379,7 +380,7 @@ namespace transport
if (msg->offset >= msg->len && packet->payloadSize + 16 < m_MaxPayloadSize) // last fragment if (msg->offset >= msg->len && packet->payloadSize + 16 < m_MaxPayloadSize) // last fragment
packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize); packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize);
uint32_t followonPacketNum = SendData (packet->payload, packet->payloadSize); uint32_t followonPacketNum = SendData (packet->payload, packet->payloadSize);
packet->nextResendTime = nextResend; packet->sendTime = ts;
m_SentPackets.emplace (followonPacketNum, packet); m_SentPackets.emplace (followonPacketNum, packet);
} }
return ackBlockSent; return ackBlockSent;
@ -388,18 +389,18 @@ namespace transport
void SSU2Session::Resend (uint64_t ts) void SSU2Session::Resend (uint64_t ts)
{ {
// resend handshake packet // resend handshake packet
if (m_SentHandshakePacket && ts >= m_SentHandshakePacket->nextResendTime) if (m_SentHandshakePacket && ts >= m_SentHandshakePacket->sendTime + SSU2_HANDSHAKE_RESEND_INTERVAL)
{ {
LogPrint (eLogDebug, "SSU2: Resending ", (int)m_State); LogPrint (eLogDebug, "SSU2: Resending ", (int)m_State);
ResendHandshakePacket (); ResendHandshakePacket ();
m_SentHandshakePacket->nextResendTime = ts + SSU2_HANDSHAKE_RESEND_INTERVAL; m_SentHandshakePacket->sendTime = ts;
return; return;
} }
// resend data packets // resend data packets
if (m_SentPackets.empty ()) return; if (m_SentPackets.empty ()) return;
std::map<uint32_t, std::shared_ptr<SentPacket> > resentPackets; std::map<uint32_t, std::shared_ptr<SentPacket> > resentPackets;
for (auto it = m_SentPackets.begin (); it != m_SentPackets.end (); ) for (auto it = m_SentPackets.begin (); it != m_SentPackets.end (); )
if (ts >= it->second->nextResendTime) if (ts >= it->second->sendTime + it->second->numResends*m_RTO)
{ {
if (it->second->numResends > SSU2_MAX_NUM_RESENDS) if (it->second->numResends > SSU2_MAX_NUM_RESENDS)
{ {
@ -413,7 +414,7 @@ namespace transport
{ {
uint32_t packetNum = SendData (it->second->payload, it->second->payloadSize); uint32_t packetNum = SendData (it->second->payload, it->second->payloadSize);
it->second->numResends++; it->second->numResends++;
it->second->nextResendTime = ts + it->second->numResends*SSU2_RESEND_INTERVAL; it->second->sendTime = ts;
resentPackets.emplace (packetNum, it->second); resentPackets.emplace (packetNum, it->second);
it = m_SentPackets.erase (it); it = m_SentPackets.erase (it);
} }
@ -486,7 +487,7 @@ namespace transport
m_EphemeralKeys = i2p::transport::transports.GetNextX25519KeysPair (); m_EphemeralKeys = i2p::transport::transports.GetNextX25519KeysPair ();
m_SentHandshakePacket.reset (new HandshakePacket); m_SentHandshakePacket.reset (new HandshakePacket);
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
m_SentHandshakePacket->nextResendTime = ts + SSU2_HANDSHAKE_RESEND_INTERVAL; m_SentHandshakePacket->sendTime = ts;
Header& header = m_SentHandshakePacket->header; Header& header = m_SentHandshakePacket->header;
uint8_t * headerX = m_SentHandshakePacket->headerX, uint8_t * headerX = m_SentHandshakePacket->headerX,
@ -587,7 +588,7 @@ namespace transport
m_EphemeralKeys = i2p::transport::transports.GetNextX25519KeysPair (); m_EphemeralKeys = i2p::transport::transports.GetNextX25519KeysPair ();
m_SentHandshakePacket.reset (new HandshakePacket); m_SentHandshakePacket.reset (new HandshakePacket);
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
m_SentHandshakePacket->nextResendTime = ts + SSU2_HANDSHAKE_RESEND_INTERVAL; m_SentHandshakePacket->sendTime = ts;
uint8_t kh2[32]; uint8_t kh2[32];
i2p::crypto::HKDF (m_NoiseState->m_CK, nullptr, 0, "SessCreateHeader", kh2, 32); // k_header_2 = HKDF(chainKey, ZEROLEN, "SessCreateHeader", 32) i2p::crypto::HKDF (m_NoiseState->m_CK, nullptr, 0, "SessCreateHeader", kh2, 32); // k_header_2 = HKDF(chainKey, ZEROLEN, "SessCreateHeader", 32)
@ -699,7 +700,7 @@ namespace transport
{ {
// we are Alice // we are Alice
m_SentHandshakePacket.reset (new HandshakePacket); m_SentHandshakePacket.reset (new HandshakePacket);
m_SentHandshakePacket->nextResendTime = i2p::util::GetMillisecondsSinceEpoch () + SSU2_HANDSHAKE_RESEND_INTERVAL; m_SentHandshakePacket->sendTime = i2p::util::GetMillisecondsSinceEpoch ();
uint8_t kh2[32]; uint8_t kh2[32];
i2p::crypto::HKDF (m_NoiseState->m_CK, nullptr, 0, "SessionConfirmed", kh2, 32); // k_header_2 = HKDF(chainKey, ZEROLEN, "SessionConfirmed", 32) i2p::crypto::HKDF (m_NoiseState->m_CK, nullptr, 0, "SessionConfirmed", kh2, 32); // k_header_2 = HKDF(chainKey, ZEROLEN, "SessionConfirmed", 32)
@ -1428,7 +1429,7 @@ namespace transport
// acnt // acnt
uint32_t ackThrough = bufbe32toh (buf); uint32_t ackThrough = bufbe32toh (buf);
uint32_t firstPacketNum = ackThrough > buf[4] ? ackThrough - buf[4] : 0; uint32_t firstPacketNum = ackThrough > buf[4] ? ackThrough - buf[4] : 0;
HandleAckRange (firstPacketNum, ackThrough); // acnt HandleAckRange (firstPacketNum, ackThrough, i2p::util::GetMillisecondsSinceEpoch ()); // acnt
// ranges // ranges
len -= 5; len -= 5;
const uint8_t * ranges = buf + 5; const uint8_t * ranges = buf + 5;
@ -1440,11 +1441,11 @@ namespace transport
if (*ranges > lastPacketNum + 1) break; if (*ranges > lastPacketNum + 1) break;
firstPacketNum = lastPacketNum - *ranges + 1; ranges++; // acks firstPacketNum = lastPacketNum - *ranges + 1; ranges++; // acks
len -= 2; len -= 2;
HandleAckRange (firstPacketNum, lastPacketNum); HandleAckRange (firstPacketNum, lastPacketNum, 0);
} }
} }
void SSU2Session::HandleAckRange (uint32_t firstPacketNum, uint32_t lastPacketNum) void SSU2Session::HandleAckRange (uint32_t firstPacketNum, uint32_t lastPacketNum, uint64_t ts)
{ {
if (firstPacketNum > lastPacketNum) return; if (firstPacketNum > lastPacketNum) return;
auto it = m_SentPackets.begin (); auto it = m_SentPackets.begin ();
@ -1454,6 +1455,18 @@ namespace transport
int numPackets = 0; int numPackets = 0;
while (it1 != m_SentPackets.end () && it1->first <= lastPacketNum) while (it1 != m_SentPackets.end () && it1->first <= lastPacketNum)
{ {
if (ts && !it1->second->numResends)
{
if (ts > it1->second->sendTime)
{
auto rtt = ts - it1->second->sendTime;
m_RTT = (m_RTT*m_SendPacketNum + rtt)/(m_SendPacketNum + 1);
m_RTO = m_RTT*1.5;
if (m_RTO < SSU2_MIN_RTO) m_RTO = SSU2_MIN_RTO;
if (m_RTO > SSU2_MAX_RTO) m_RTO = SSU2_MAX_RTO;
}
ts = 0; // update RTT one time per range
}
it1++; it1++;
numPackets++; numPackets++;
} }

10
libi2pd/SSU2Session.h

@ -40,6 +40,8 @@ namespace transport
const int SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds const int SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds
const size_t SSU2_MIN_WINDOW_SIZE = 8; // in packets const size_t SSU2_MIN_WINDOW_SIZE = 8; // in packets
const size_t SSU2_MAX_WINDOW_SIZE = 128; // in packets const size_t SSU2_MAX_WINDOW_SIZE = 128; // in packets
const size_t SSU2_MIN_RTO = 10; // in milliseconds
const size_t SSU2_MAX_RTO = 2000; // in milliseconds
const size_t SSU2_MAX_OUTGOING_QUEUE_SIZE = 300; // in messages const size_t SSU2_MAX_OUTGOING_QUEUE_SIZE = 300; // in messages
const int SSU2_MAX_NUM_ACK_RANGES = 32; // to send const int SSU2_MAX_NUM_ACK_RANGES = 32; // to send
@ -189,7 +191,7 @@ namespace transport
{ {
uint8_t payload[SSU2_MAX_PACKET_SIZE]; uint8_t payload[SSU2_MAX_PACKET_SIZE];
size_t payloadSize = 0; size_t payloadSize = 0;
uint64_t nextResendTime; // in milliseconds uint64_t sendTime; // in milliseconds
int numResends = 0; int numResends = 0;
}; };
@ -199,7 +201,7 @@ namespace transport
uint8_t headerX[48]; // part1 for SessionConfirmed uint8_t headerX[48]; // part1 for SessionConfirmed
uint8_t payload[SSU2_MAX_PACKET_SIZE*2]; uint8_t payload[SSU2_MAX_PACKET_SIZE*2];
size_t payloadSize = 0; size_t payloadSize = 0;
uint64_t nextResendTime = 0; // in milliseconds uint64_t sendTime = 0; // in milliseconds
bool isSecondFragment = false; // for SessionConfirmed bool isSecondFragment = false; // for SessionConfirmed
}; };
@ -272,7 +274,7 @@ namespace transport
void HandlePayload (const uint8_t * buf, size_t len); void HandlePayload (const uint8_t * buf, size_t len);
void HandleAck (const uint8_t * buf, size_t len); void HandleAck (const uint8_t * buf, size_t len);
void HandleAckRange (uint32_t firstPacketNum, uint32_t lastPacketNum); void HandleAckRange (uint32_t firstPacketNum, uint32_t lastPacketNum, uint64_t ts);
bool ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep); bool ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep);
size_t CreateEndpoint (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& ep); size_t CreateEndpoint (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& ep);
std::shared_ptr<const i2p::data::RouterInfo::Address> FindLocalAddress () const; std::shared_ptr<const i2p::data::RouterInfo::Address> FindLocalAddress () const;
@ -325,7 +327,7 @@ namespace transport
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue; std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
i2p::I2NPMessagesHandler m_Handler; i2p::I2NPMessagesHandler m_Handler;
bool m_IsDataReceived; bool m_IsDataReceived;
size_t m_WindowSize; size_t m_WindowSize, m_RTT, m_RTO;
uint32_t m_RelayTag; // between Bob and Charlie uint32_t m_RelayTag; // between Bob and Charlie
OnEstablished m_OnEstablished; // callback from Established OnEstablished m_OnEstablished; // callback from Established
boost::asio::deadline_timer m_ConnectTimer; boost::asio::deadline_timer m_ConnectTimer;

Loading…
Cancel
Save