diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index 4846446f..5f1b4bb5 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -24,7 +24,8 @@ namespace transport m_Server (server), m_Address (addr), m_RemoteTransports (0), m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown), m_SendPacketNum (0), m_ReceivePacketNum (0), m_IsDataReceived (false), - m_WindowSize (SSU2_MIN_WINDOW_SIZE), m_RelayTag (0), + m_WindowSize (SSU2_MIN_WINDOW_SIZE), m_RTT (SSU2_RESEND_INTERVAL), + m_RTO (SSU2_RESEND_INTERVAL*1.5), m_RelayTag (0), m_ConnectTimer (server.GetService ()), m_TerminationReason (eSSU2TerminationReasonNormalClose), m_MaxPayloadSize (SSU2_MIN_PACKET_SIZE - IPV6_HEADER_SIZE - UDP_HEADER_SIZE - 32) // min size { @@ -265,7 +266,7 @@ namespace transport { if (!m_SendQueue.empty () && m_SentPackets.size () <= m_WindowSize) { - auto nextResend = i2p::util::GetMillisecondsSinceEpoch () + SSU2_RESEND_INTERVAL; + auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto packet = std::make_shared(); size_t ackBlockSize = CreateAckBlock (packet->payload, m_MaxPayloadSize); bool ackBlockSent = false; @@ -318,7 +319,7 @@ namespace transport } // send right a way uint32_t packetNum = SendData (packet->payload, packet->payloadSize); - packet->nextResendTime = nextResend; + packet->sendTime = ts; m_SentPackets.emplace (packetNum, packet); packet = newPacket; // just ack block } @@ -329,7 +330,7 @@ namespace transport if (packet->payloadSize + 16 < m_MaxPayloadSize) packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize); uint32_t packetNum = SendData (packet->payload, packet->payloadSize); - packet->nextResendTime = nextResend; + packet->sendTime = ts; m_SentPackets.emplace (packetNum, packet); } return ackBlockSent; @@ -344,7 +345,7 @@ namespace transport bool ackBlockSent = false; uint32_t msgID; memcpy (&msgID, msg->GetHeader () + I2NP_HEADER_MSGID_OFFSET, 4); - auto nextResend = i2p::util::GetMillisecondsSinceEpoch () + SSU2_RESEND_INTERVAL; + auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto packet = std::make_shared(); if (extraSize >= 8) { @@ -353,7 +354,7 @@ namespace transport if (packet->payloadSize + 12 < m_MaxPayloadSize) { uint32_t packetNum = SendData (packet->payload, packet->payloadSize); - packet->nextResendTime = nextResend; + packet->sendTime = ts; m_SentPackets.emplace (packetNum, packet); packet = std::make_shared(); } @@ -367,7 +368,7 @@ namespace transport extraSize -= offset; packet->payloadSize += size; uint32_t firstPacketNum = SendData (packet->payload, packet->payloadSize); - packet->nextResendTime = nextResend; + packet->sendTime = ts; m_SentPackets.emplace (firstPacketNum, packet); uint8_t fragmentNum = 0; while (msg->offset < msg->len) @@ -379,7 +380,7 @@ namespace transport if (msg->offset >= msg->len && packet->payloadSize + 16 < m_MaxPayloadSize) // last fragment packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize); uint32_t followonPacketNum = SendData (packet->payload, packet->payloadSize); - packet->nextResendTime = nextResend; + packet->sendTime = ts; m_SentPackets.emplace (followonPacketNum, packet); } return ackBlockSent; @@ -388,18 +389,18 @@ namespace transport void SSU2Session::Resend (uint64_t ts) { // resend handshake packet - if (m_SentHandshakePacket && ts >= m_SentHandshakePacket->nextResendTime) + if (m_SentHandshakePacket && ts >= m_SentHandshakePacket->sendTime + SSU2_HANDSHAKE_RESEND_INTERVAL) { LogPrint (eLogDebug, "SSU2: Resending ", (int)m_State); ResendHandshakePacket (); - m_SentHandshakePacket->nextResendTime = ts + SSU2_HANDSHAKE_RESEND_INTERVAL; + m_SentHandshakePacket->sendTime = ts; return; } // resend data packets if (m_SentPackets.empty ()) return; std::map > resentPackets; for (auto it = m_SentPackets.begin (); it != m_SentPackets.end (); ) - if (ts >= it->second->nextResendTime) + if (ts >= it->second->sendTime + it->second->numResends*m_RTO) { if (it->second->numResends > SSU2_MAX_NUM_RESENDS) { @@ -413,7 +414,7 @@ namespace transport { uint32_t packetNum = SendData (it->second->payload, it->second->payloadSize); it->second->numResends++; - it->second->nextResendTime = ts + it->second->numResends*SSU2_RESEND_INTERVAL; + it->second->sendTime = ts; resentPackets.emplace (packetNum, it->second); it = m_SentPackets.erase (it); } @@ -486,7 +487,7 @@ namespace transport m_EphemeralKeys = i2p::transport::transports.GetNextX25519KeysPair (); m_SentHandshakePacket.reset (new HandshakePacket); auto ts = i2p::util::GetMillisecondsSinceEpoch (); - m_SentHandshakePacket->nextResendTime = ts + SSU2_HANDSHAKE_RESEND_INTERVAL; + m_SentHandshakePacket->sendTime = ts; Header& header = m_SentHandshakePacket->header; uint8_t * headerX = m_SentHandshakePacket->headerX, @@ -587,7 +588,7 @@ namespace transport m_EphemeralKeys = i2p::transport::transports.GetNextX25519KeysPair (); m_SentHandshakePacket.reset (new HandshakePacket); auto ts = i2p::util::GetMillisecondsSinceEpoch (); - m_SentHandshakePacket->nextResendTime = ts + SSU2_HANDSHAKE_RESEND_INTERVAL; + m_SentHandshakePacket->sendTime = ts; uint8_t kh2[32]; i2p::crypto::HKDF (m_NoiseState->m_CK, nullptr, 0, "SessCreateHeader", kh2, 32); // k_header_2 = HKDF(chainKey, ZEROLEN, "SessCreateHeader", 32) @@ -699,7 +700,7 @@ namespace transport { // we are Alice m_SentHandshakePacket.reset (new HandshakePacket); - m_SentHandshakePacket->nextResendTime = i2p::util::GetMillisecondsSinceEpoch () + SSU2_HANDSHAKE_RESEND_INTERVAL; + m_SentHandshakePacket->sendTime = i2p::util::GetMillisecondsSinceEpoch (); uint8_t kh2[32]; i2p::crypto::HKDF (m_NoiseState->m_CK, nullptr, 0, "SessionConfirmed", kh2, 32); // k_header_2 = HKDF(chainKey, ZEROLEN, "SessionConfirmed", 32) @@ -1428,7 +1429,7 @@ namespace transport // acnt uint32_t ackThrough = bufbe32toh (buf); uint32_t firstPacketNum = ackThrough > buf[4] ? ackThrough - buf[4] : 0; - HandleAckRange (firstPacketNum, ackThrough); // acnt + HandleAckRange (firstPacketNum, ackThrough, i2p::util::GetMillisecondsSinceEpoch ()); // acnt // ranges len -= 5; const uint8_t * ranges = buf + 5; @@ -1440,11 +1441,11 @@ namespace transport if (*ranges > lastPacketNum + 1) break; firstPacketNum = lastPacketNum - *ranges + 1; ranges++; // acks len -= 2; - HandleAckRange (firstPacketNum, lastPacketNum); + HandleAckRange (firstPacketNum, lastPacketNum, 0); } } - void SSU2Session::HandleAckRange (uint32_t firstPacketNum, uint32_t lastPacketNum) + void SSU2Session::HandleAckRange (uint32_t firstPacketNum, uint32_t lastPacketNum, uint64_t ts) { if (firstPacketNum > lastPacketNum) return; auto it = m_SentPackets.begin (); @@ -1454,6 +1455,18 @@ namespace transport int numPackets = 0; while (it1 != m_SentPackets.end () && it1->first <= lastPacketNum) { + if (ts && !it1->second->numResends) + { + if (ts > it1->second->sendTime) + { + auto rtt = ts - it1->second->sendTime; + m_RTT = (m_RTT*m_SendPacketNum + rtt)/(m_SendPacketNum + 1); + m_RTO = m_RTT*1.5; + if (m_RTO < SSU2_MIN_RTO) m_RTO = SSU2_MIN_RTO; + if (m_RTO > SSU2_MAX_RTO) m_RTO = SSU2_MAX_RTO; + } + ts = 0; // update RTT one time per range + } it1++; numPackets++; } diff --git a/libi2pd/SSU2Session.h b/libi2pd/SSU2Session.h index d8c7ce4f..14ec04a2 100644 --- a/libi2pd/SSU2Session.h +++ b/libi2pd/SSU2Session.h @@ -40,6 +40,8 @@ namespace transport const int SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds const size_t SSU2_MIN_WINDOW_SIZE = 8; // in packets const size_t SSU2_MAX_WINDOW_SIZE = 128; // in packets + const size_t SSU2_MIN_RTO = 10; // in milliseconds + const size_t SSU2_MAX_RTO = 2000; // in milliseconds const size_t SSU2_MAX_OUTGOING_QUEUE_SIZE = 300; // in messages const int SSU2_MAX_NUM_ACK_RANGES = 32; // to send @@ -189,7 +191,7 @@ namespace transport { uint8_t payload[SSU2_MAX_PACKET_SIZE]; size_t payloadSize = 0; - uint64_t nextResendTime; // in milliseconds + uint64_t sendTime; // in milliseconds int numResends = 0; }; @@ -199,7 +201,7 @@ namespace transport uint8_t headerX[48]; // part1 for SessionConfirmed uint8_t payload[SSU2_MAX_PACKET_SIZE*2]; size_t payloadSize = 0; - uint64_t nextResendTime = 0; // in milliseconds + uint64_t sendTime = 0; // in milliseconds bool isSecondFragment = false; // for SessionConfirmed }; @@ -272,7 +274,7 @@ namespace transport void HandlePayload (const uint8_t * buf, size_t len); void HandleAck (const uint8_t * buf, size_t len); - void HandleAckRange (uint32_t firstPacketNum, uint32_t lastPacketNum); + void HandleAckRange (uint32_t firstPacketNum, uint32_t lastPacketNum, uint64_t ts); bool ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep); size_t CreateEndpoint (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& ep); std::shared_ptr FindLocalAddress () const; @@ -325,7 +327,7 @@ namespace transport std::list > m_SendQueue; i2p::I2NPMessagesHandler m_Handler; bool m_IsDataReceived; - size_t m_WindowSize; + size_t m_WindowSize, m_RTT, m_RTO; uint32_t m_RelayTag; // between Bob and Charlie OnEstablished m_OnEstablished; // callback from Established boost::asio::deadline_timer m_ConnectTimer;