Browse Source

Merge pull request #2050 from PurpleI2P/openssl

2.51.0
pull/1939/merge
orignal 8 months ago committed by GitHub
parent
commit
46c72a7137
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 61
      ChangeLog
  2. 7
      Makefile.bsd
  3. 24
      Win32/Win32App.cpp
  4. 5
      contrib/rpm/i2pd-git.spec
  5. 5
      contrib/rpm/i2pd.spec
  6. 7
      daemon/I2PControl.cpp
  7. 10
      debian/changelog
  8. 2
      libi2pd/ECIESX25519AEADRatchetSession.cpp
  9. 23
      libi2pd/FS.cpp
  10. 4
      libi2pd/Garlic.cpp
  11. 2
      libi2pd/Garlic.h
  12. 10
      libi2pd/I2NPProtocol.h
  13. 6
      libi2pd/SSU2.cpp
  14. 4
      libi2pd/SSU2.h
  15. 57
      libi2pd/SSU2Session.cpp
  16. 10
      libi2pd/SSU2Session.h
  17. 160
      libi2pd/Streaming.cpp
  18. 13
      libi2pd/Streaming.h
  19. 6
      libi2pd/version.h

61
ChangeLog

@ -1,6 +1,67 @@
# for this file format description, # for this file format description,
# see https://github.com/olivierlacan/keep-a-changelog # see https://github.com/olivierlacan/keep-a-changelog
## [2.51.0] - 2024-04-06
### Added
- Non-blocking mode for UDP sockets
- Set SSU2 socket buffer size based on bandwidth limit
- Encrypted tunnel tests
- Support for multiple UDP server tunnels on one destination
- Publish medium congestion indication
- Local domain sockets for SOCKS proxy upstream
- Tunnel status "declined" in web console
- SAM error reply "Incompatible crypto" if remote destination has incompatible crypto
- Reduce amount of traffic by handling local message drops
- Keep SSU2 socket open even if it fails to bind
- Lower SSU2 resend traffic spikes
- Expiration for messages in SSU2 send queue
- Use EWMA for stream RTT estimation
- Request choking delay if too many NACKs in stream
- Allow 0ms latency for tunnel
- Randomize tunnels selection for tests
### Changed
- Upstream SOCKS proxy from SOCKS4 to SOCKS5
- Transit tunnels limit to 4 bytes. Default value to 10K
- Reply CANT_REACH_PEER if connect to ourselves in SAM
- Don't send already expired I2NP messages
- Use monotonic timer to measure tunnel test latency
- Standard NTCP2 frame doesn't exceed 16K
- Always send request through tunnels in case of restricted routes
- Don't delete connected routers from NetDb
- Send lookup reply directly to reply tunnel gateway if possible
- Reduce unreachable router ban interval to 8 minutes
- Don't request banned routers / don't try to connect to unreachable router
- Consider 'M' routers as low bandwidth
- Limit minimal received SSU2 packet size to 40 bytes
- Bob picks peer test session only if Charlie's address supports peer testing
- Reject peer test msg 2 if peer testing is not supported
- Don't request termination if SSU2 session was not established
- Set maximum SSU2 queue size depending on RTT value
- New streaming RTT calculation algorithm
- Don't double initial RTO for streams when changing tunnels
- Restore failed tunnel if test or data for inbound tunnel received
- Don't fail last remaining tunnel in pool
- Publish LeasetSet again if local destination was not ready or no tunnels
- Make more attempts to pick high bandwidth hop for client tunnel
- Reduced SSU2 session termination timeout to 165 seconds
- Reseeds list
### Fixed
- ECIESx25519 symmetric key tagset early expiration
- Encrypted LeaseSet lookup
- Outbound tunnel build fails if it's endpoint is the same as reply tunnel gateway
- I2PControl RouterManager returns invalid JSON when unknown params are passed
- Mix of data between different UDP sessions on the same server
- TARGET_OS_SIMULATOR check
- Handling of "reservedrange" param
- New NTCP2 session gets teminated upon termination of old one
- New SSU2 session gets teminated upon termination of old one
- Peer test to non-supporting router
- Streaming ackThrough off 1 if number of NACKs exceeds 255
- Race condition in ECIESx25519 tags table
- Good tunnel becomes failed
- Crash when packet comes to terminated stream
- Stream hangs during LeaseSet update
## [2.50.2] - 2024-01-06 ## [2.50.2] - 2024-01-06
###Fixed ###Fixed
- Crash with OpenSSL 3.2.0 - Crash with OpenSSL 3.2.0

7
Makefile.bsd

@ -6,7 +6,12 @@ CXXFLAGS ?= ${CXX_DEBUG} -Wall -Wextra -Wno-unused-parameter -pedantic -Wno-misl
## (e.g. -fstack-protector-strong -Wformat -Werror=format-security), we do not want to remove ## (e.g. -fstack-protector-strong -Wformat -Werror=format-security), we do not want to remove
## -std=c++11. If you want to remove this variable please do so in a way that allows setting ## -std=c++11. If you want to remove this variable please do so in a way that allows setting
## custom FLAGS to work at build-time. ## custom FLAGS to work at build-time.
NEEDED_CXXFLAGS = -std=c++11 CXXVER := $(shell $(CXX) -dumpversion)
ifeq (${CXXVER}, "4.2.1") # older clang always returned 4.2.1
NEEDED_CXXFLAGS = -std=c++11
else # newer versions support C++17
NEEDED_CXXFLAGS = -std=c++17
endif
DEFINES = -D_GLIBCXX_USE_NANOSLEEP=1 DEFINES = -D_GLIBCXX_USE_NANOSLEEP=1
INCFLAGS = -I/usr/include/ -I/usr/local/include/ INCFLAGS = -I/usr/include/ -I/usr/local/include/
LDFLAGS = ${LD_DEBUG} -Wl,-rpath,/usr/local/lib -L/usr/local/lib LDFLAGS = ${LD_DEBUG} -Wl,-rpath,/usr/local/lib -L/usr/local/lib

24
Win32/Win32App.cpp

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2022, The PurpleI2P Project * Copyright (c) 2013-2024, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -145,7 +145,7 @@ namespace win32
s << bytes << " Bytes\n"; s << bytes << " Bytes\n";
} }
static void ShowNetworkStatus (std::stringstream& s, RouterStatus status, bool testing) static void ShowNetworkStatus (std::stringstream& s, RouterStatus status, bool testing, RouterError error)
{ {
switch (status) switch (status)
{ {
@ -158,18 +158,24 @@ namespace win32
}; };
if (testing) if (testing)
s << " (Test)"; s << " (Test)";
if (i2p::context.GetError () != eRouterErrorNone) if (error != eRouterErrorNone)
{ {
switch (i2p::context.GetError ()) switch (error)
{ {
case eRouterErrorClockSkew: case eRouterErrorClockSkew:
s << " - Clock skew"; s << " - " << tr("Clock skew");
break; break;
case eRouterErrorOffline: case eRouterErrorOffline:
s << " - Offline"; s << " - " << tr("Offline");
break; break;
case eRouterErrorSymmetricNAT: case eRouterErrorSymmetricNAT:
s << " - Symmetric NAT"; s << " - " << tr("Symmetric NAT");
break;
case eRouterErrorFullConeNAT:
s << " - " << tr("Full cone NAT");
break;
case eRouterErrorNoDescriptors:
s << " - " << tr("No Descriptors");
break; break;
default: ; default: ;
} }
@ -180,11 +186,11 @@ namespace win32
{ {
s << "\n"; s << "\n";
s << "Status: "; s << "Status: ";
ShowNetworkStatus (s, i2p::context.GetStatus (), i2p::context.GetTesting ()); ShowNetworkStatus (s, i2p::context.GetStatus (), i2p::context.GetTesting(), i2p::context.GetError ());
if (i2p::context.SupportsV6 ()) if (i2p::context.SupportsV6 ())
{ {
s << " / "; s << " / ";
ShowNetworkStatus (s, i2p::context.GetStatusV6 (), i2p::context.GetTestingV6 ()); ShowNetworkStatus (s, i2p::context.GetStatusV6 (), i2p::context.GetTestingV6(), i2p::context.GetErrorV6 ());
} }
s << "; "; s << "; ";
s << "Success Rate: " << i2p::tunnel::tunnels.GetTunnelCreationSuccessRate() << "%\n"; s << "Success Rate: " << i2p::tunnel::tunnels.GetTunnelCreationSuccessRate() << "%\n";

5
contrib/rpm/i2pd-git.spec

@ -1,7 +1,7 @@
%define git_hash %(git rev-parse HEAD | cut -c -7) %define git_hash %(git rev-parse HEAD | cut -c -7)
Name: i2pd-git Name: i2pd-git
Version: 2.50.2 Version: 2.51.0
Release: git%{git_hash}%{?dist} Release: git%{git_hash}%{?dist}
Summary: I2P router written in C++ Summary: I2P router written in C++
Conflicts: i2pd Conflicts: i2pd
@ -144,6 +144,9 @@ getent passwd i2pd >/dev/null || \
%changelog %changelog
* Sat Apr 06 2024 orignal <orignal@i2pmail.org> - 2.51.0
- update to 2.51.0
* Sat Jan 06 2024 orignal <orignal@i2pmail.org> - 2.50.2 * Sat Jan 06 2024 orignal <orignal@i2pmail.org> - 2.50.2
- update to 2.50.2 - update to 2.50.2

5
contrib/rpm/i2pd.spec

@ -1,5 +1,5 @@
Name: i2pd Name: i2pd
Version: 2.50.2 Version: 2.51.0
Release: 1%{?dist} Release: 1%{?dist}
Summary: I2P router written in C++ Summary: I2P router written in C++
Conflicts: i2pd-git Conflicts: i2pd-git
@ -142,6 +142,9 @@ getent passwd i2pd >/dev/null || \
%changelog %changelog
* Sat Apr 06 2024 orignal <orignal@i2pmail.org> - 2.51.0
- update to 2.51.0
* Sat Jan 06 2024 orignal <orignal@i2pmail.org> - 2.50.2 * Sat Jan 06 2024 orignal <orignal@i2pmail.org> - 2.50.2
- update to 2.50.2 - update to 2.50.2

7
daemon/I2PControl.cpp

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2022, The PurpleI2P Project * Copyright (c) 2013-2024, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -338,10 +338,11 @@ namespace client
{ {
for (auto it = params.begin (); it != params.end (); it++) for (auto it = params.begin (); it != params.end (); it++)
{ {
if (it != params.begin ()) results << ",";
LogPrint (eLogDebug, "I2PControl: RouterManager request: ", it->first); LogPrint (eLogDebug, "I2PControl: RouterManager request: ", it->first);
auto it1 = m_RouterManagerHandlers.find (it->first); auto it1 = m_RouterManagerHandlers.find (it->first);
if (it1 != m_RouterManagerHandlers.end ()) { if (it1 != m_RouterManagerHandlers.end ())
{
if (it != params.begin ()) results << ",";
(this->*(it1->second))(results); (this->*(it1->second))(results);
} else } else
LogPrint (eLogError, "I2PControl: RouterManager unknown request: ", it->first); LogPrint (eLogError, "I2PControl: RouterManager unknown request: ", it->first);

10
debian/changelog vendored

@ -1,8 +1,14 @@
i2pd (2.50.2) unstable; urgency=medium i2pd (2.51.0-1) unstable; urgency=medium
* updated to version 2.51.0/0.9.62
-- orignal <orignal@i2pmail.org> Sat, 06 Apr 2024 16:00:00 +0000
i2pd (2.50.2-1) unstable; urgency=medium
* updated to version 2.50.2/0.9.61 * updated to version 2.50.2/0.9.61
-- orignal <orignal@i2pmail.org> Sat, 06 Jan 2024 16:00:00 +0000 -- orignal <orignal@i2pmail.org> Sat, 06 Jan 2024 16:00:00 +0000
i2pd (2.50.1-1) unstable; urgency=medium i2pd (2.50.1-1) unstable; urgency=medium

2
libi2pd/ECIESX25519AEADRatchetSession.cpp

@ -863,7 +863,7 @@ namespace garlic
payloadLen += msg->GetPayloadLength () + 13; payloadLen += msg->GetPayloadLength () + 13;
if (m_Destination) payloadLen += 32; if (m_Destination) payloadLen += 32;
} }
if (GetLeaseSetUpdateStatus () == eLeaseSetSubmitted && ts > GetLeaseSetSubmissionTime () + LEASET_CONFIRMATION_TIMEOUT) if (GetLeaseSetUpdateStatus () == eLeaseSetSubmitted && ts > GetLeaseSetSubmissionTime () + LEASESET_CONFIRMATION_TIMEOUT)
{ {
// resubmit non-confirmed LeaseSet // resubmit non-confirmed LeaseSet
SetLeaseSetUpdateStatus (eLeaseSetUpdated); SetLeaseSetUpdateStatus (eLeaseSetUpdated);

23
libi2pd/FS.cpp

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2022, The PurpleI2P Project * Copyright (c) 2013-2024, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -9,6 +9,11 @@
#include <algorithm> #include <algorithm>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#if defined(MAC_OSX)
#include <boost/system/system_error.hpp>
#include <TargetConditionals.h>
#endif
#ifdef _WIN32 #ifdef _WIN32
#include <shlobj.h> #include <shlobj.h>
#include <windows.h> #include <windows.h>
@ -251,8 +256,22 @@ namespace fs {
auto p = root + i2p::fs::dirSep + prefix1 + chars[i]; auto p = root + i2p::fs::dirSep + prefix1 + chars[i];
if (boost::filesystem::exists(p)) if (boost::filesystem::exists(p))
continue; continue;
if (boost::filesystem::create_directory(p)) #if TARGET_OS_SIMULATOR
// ios simulator fs says it is case sensitive, but it is not
boost::system::error_code ec;
if (boost::filesystem::create_directory(p, ec))
continue;
switch (ec.value()) {
case boost::system::errc::file_exists:
case boost::system::errc::success:
continue;
default:
throw boost::system::system_error( ec, __func__ );
}
#else
if (boost::filesystem::create_directory(p))
continue; /* ^ throws exception on failure */ continue; /* ^ throws exception on failure */
#endif
return false; return false;
} }
return true; return true;

4
libi2pd/Garlic.cpp

@ -80,7 +80,7 @@ namespace garlic
void GarlicRoutingSession::CleanupUnconfirmedLeaseSet (uint64_t ts) void GarlicRoutingSession::CleanupUnconfirmedLeaseSet (uint64_t ts)
{ {
if (m_LeaseSetUpdateMsgID && ts*1000LL > m_LeaseSetSubmissionTime + LEASET_CONFIRMATION_TIMEOUT) if (m_LeaseSetUpdateMsgID && ts*1000LL > m_LeaseSetSubmissionTime + LEASESET_CONFIRMATION_TIMEOUT)
{ {
if (GetOwner ()) if (GetOwner ())
GetOwner ()->RemoveDeliveryStatusSession (m_LeaseSetUpdateMsgID); GetOwner ()->RemoveDeliveryStatusSession (m_LeaseSetUpdateMsgID);
@ -232,7 +232,7 @@ namespace garlic
if (GetOwner ()) if (GetOwner ())
{ {
// resubmit non-confirmed LeaseSet // resubmit non-confirmed LeaseSet
if (GetLeaseSetUpdateStatus () == eLeaseSetSubmitted && ts > GetLeaseSetSubmissionTime () + LEASET_CONFIRMATION_TIMEOUT) if (GetLeaseSetUpdateStatus () == eLeaseSetSubmitted && ts > GetLeaseSetSubmissionTime () + LEASESET_CONFIRMATION_TIMEOUT)
{ {
SetLeaseSetUpdateStatus (eLeaseSetUpdated); SetLeaseSetUpdateStatus (eLeaseSetUpdated);
SetSharedRoutingPath (nullptr); // invalidate path since leaseset was not confirmed SetSharedRoutingPath (nullptr); // invalidate path since leaseset was not confirmed

2
libi2pd/Garlic.h

@ -50,7 +50,7 @@ namespace garlic
const int INCOMING_TAGS_EXPIRATION_TIMEOUT = 960; // 16 minutes const int INCOMING_TAGS_EXPIRATION_TIMEOUT = 960; // 16 minutes
const int OUTGOING_TAGS_EXPIRATION_TIMEOUT = 720; // 12 minutes const int OUTGOING_TAGS_EXPIRATION_TIMEOUT = 720; // 12 minutes
const int OUTGOING_TAGS_CONFIRMATION_TIMEOUT = 10; // 10 seconds const int OUTGOING_TAGS_CONFIRMATION_TIMEOUT = 10; // 10 seconds
const int LEASET_CONFIRMATION_TIMEOUT = 4000; // in milliseconds const int LEASESET_CONFIRMATION_TIMEOUT = 4000; // in milliseconds
const int ROUTING_PATH_EXPIRATION_TIMEOUT = 30; // 30 seconds const int ROUTING_PATH_EXPIRATION_TIMEOUT = 30; // 30 seconds
const int ROUTING_PATH_MAX_NUM_TIMES_USED = 100; // how many times might be used const int ROUTING_PATH_MAX_NUM_TIMES_USED = 100; // how many times might be used

10
libi2pd/I2NPProtocol.h

@ -152,6 +152,9 @@ namespace tunnel
const size_t I2NP_MAX_MESSAGE_SIZE = 62708; const size_t I2NP_MAX_MESSAGE_SIZE = 62708;
const size_t I2NP_MAX_SHORT_MESSAGE_SIZE = 4096; const size_t I2NP_MAX_SHORT_MESSAGE_SIZE = 4096;
const size_t I2NP_MAX_MEDIUM_MESSAGE_SIZE = 16384; const size_t I2NP_MAX_MEDIUM_MESSAGE_SIZE = 16384;
const unsigned int I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_FACTOR = 3; // multiples of RTT
const unsigned int I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_MIN = 200000; // in microseconds
const unsigned int I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_MAX = 2000000; // in microseconds
const unsigned int I2NP_MESSAGE_EXPIRATION_TIMEOUT = 8000; // in milliseconds (as initial RTT) const unsigned int I2NP_MESSAGE_EXPIRATION_TIMEOUT = 8000; // in milliseconds (as initial RTT)
const unsigned int I2NP_MESSAGE_CLOCK_SKEW = 60*1000; // 1 minute in milliseconds const unsigned int I2NP_MESSAGE_CLOCK_SKEW = 60*1000; // 1 minute in milliseconds
@ -161,9 +164,10 @@ namespace tunnel
size_t len, offset, maxLen; size_t len, offset, maxLen;
std::shared_ptr<i2p::tunnel::InboundTunnel> from; std::shared_ptr<i2p::tunnel::InboundTunnel> from;
std::function<void ()> onDrop; std::function<void ()> onDrop;
uint64_t enqueueTime; // monotonic microseconds
I2NPMessage (): buf (nullptr),len (I2NP_HEADER_SIZE + 2), I2NPMessage (): buf (nullptr), len (I2NP_HEADER_SIZE + 2),
offset(2), maxLen (0), from (nullptr) {}; // reserve 2 bytes for NTCP header offset(2), maxLen (0), from (nullptr), enqueueTime (0) {}; // reserve 2 bytes for NTCP header
// header accessors // header accessors
uint8_t * GetHeader () { return GetBuffer (); }; uint8_t * GetHeader () { return GetBuffer (); };
@ -173,7 +177,9 @@ namespace tunnel
void SetMsgID (uint32_t msgID) { htobe32buf (GetHeader () + I2NP_HEADER_MSGID_OFFSET, msgID); }; void SetMsgID (uint32_t msgID) { htobe32buf (GetHeader () + I2NP_HEADER_MSGID_OFFSET, msgID); };
uint32_t GetMsgID () const { return bufbe32toh (GetHeader () + I2NP_HEADER_MSGID_OFFSET); }; uint32_t GetMsgID () const { return bufbe32toh (GetHeader () + I2NP_HEADER_MSGID_OFFSET); };
void SetExpiration (uint64_t expiration) { htobe64buf (GetHeader () + I2NP_HEADER_EXPIRATION_OFFSET, expiration); }; void SetExpiration (uint64_t expiration) { htobe64buf (GetHeader () + I2NP_HEADER_EXPIRATION_OFFSET, expiration); };
void SetEnqueueTime (uint64_t mts) { enqueueTime = mts; };
uint64_t GetExpiration () const { return bufbe64toh (GetHeader () + I2NP_HEADER_EXPIRATION_OFFSET); }; uint64_t GetExpiration () const { return bufbe64toh (GetHeader () + I2NP_HEADER_EXPIRATION_OFFSET); };
uint64_t GetEnqueueTime () const { return enqueueTime; };
void SetSize (uint16_t size) { htobe16buf (GetHeader () + I2NP_HEADER_SIZE_OFFSET, size); }; void SetSize (uint16_t size) { htobe16buf (GetHeader () + I2NP_HEADER_SIZE_OFFSET, size); };
uint16_t GetSize () const { return bufbe16toh (GetHeader () + I2NP_HEADER_SIZE_OFFSET); }; uint16_t GetSize () const { return bufbe16toh (GetHeader () + I2NP_HEADER_SIZE_OFFSET); };
void UpdateSize () { SetSize (GetPayloadLength ()); }; void UpdateSize () { SetSize (GetPayloadLength ()); };

6
libi2pd/SSU2.cpp

@ -436,7 +436,11 @@ namespace transport
{ {
auto ident = it->second->GetRemoteIdentity (); auto ident = it->second->GetRemoteIdentity ();
if (ident) if (ident)
m_SessionsByRouterHash.erase (ident->GetIdentHash ()); {
auto it1 = m_SessionsByRouterHash.find (ident->GetIdentHash ());
if (it1 != m_SessionsByRouterHash.end () && it->second == it1->second)
m_SessionsByRouterHash.erase (it1);
}
if (m_LastSession == it->second) if (m_LastSession == it->second)
m_LastSession = nullptr; m_LastSession = nullptr;
m_Sessions.erase (it); m_Sessions.erase (it);

4
libi2pd/SSU2.h

@ -21,8 +21,8 @@ namespace transport
{ {
const int SSU2_TERMINATION_CHECK_TIMEOUT = 25; // in seconds const int SSU2_TERMINATION_CHECK_TIMEOUT = 25; // in seconds
const int SSU2_CLEANUP_INTERVAL = 72; // in seconds const int SSU2_CLEANUP_INTERVAL = 72; // in seconds
const int SSU2_RESEND_CHECK_TIMEOUT = 400; // in milliseconds const int SSU2_RESEND_CHECK_TIMEOUT = 40; // in milliseconds
const int SSU2_RESEND_CHECK_TIMEOUT_VARIANCE = 100; // in milliseconds const int SSU2_RESEND_CHECK_TIMEOUT_VARIANCE = 10; // in milliseconds
const int SSU2_RESEND_CHECK_MORE_TIMEOUT = 10; // in milliseconds const int SSU2_RESEND_CHECK_MORE_TIMEOUT = 10; // in milliseconds
const size_t SSU2_MAX_RESEND_PACKETS = 128; // packets to resend at the time const size_t SSU2_MAX_RESEND_PACKETS = 128; // packets to resend at the time
const uint64_t SSU2_SOCKET_MIN_BUFFER_SIZE = 128 * 1024; const uint64_t SSU2_SOCKET_MIN_BUFFER_SIZE = 128 * 1024;

57
libi2pd/SSU2Session.cpp

@ -84,9 +84,12 @@ namespace transport
m_Server (server), m_Address (addr), m_RemoteTransports (0), m_RemotePeerTestTransports (0), m_Server (server), m_Address (addr), m_RemoteTransports (0), m_RemotePeerTestTransports (0),
m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown), m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown),
m_SendPacketNum (0), m_ReceivePacketNum (0), m_LastDatetimeSentPacketNum (0), m_SendPacketNum (0), m_ReceivePacketNum (0), m_LastDatetimeSentPacketNum (0),
m_IsDataReceived (false), m_WindowSize (SSU2_MIN_WINDOW_SIZE), m_IsDataReceived (false), m_RTT (SSU2_UNKNOWN_RTT),
m_RTT (SSU2_RESEND_INTERVAL), m_RTO (SSU2_RESEND_INTERVAL*SSU2_kAPPA), m_RelayTag (0), m_MsgLocalExpirationTimeout (I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_MAX),
m_ConnectTimer (server.GetService ()), m_TerminationReason (eSSU2TerminationReasonNormalClose), m_MsgLocalSemiExpirationTimeout (I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_MAX / 2),
m_WindowSize (SSU2_MIN_WINDOW_SIZE),
m_RTO (SSU2_INITIAL_RTO), m_RelayTag (0),m_ConnectTimer (server.GetService ()),
m_TerminationReason (eSSU2TerminationReasonNormalClose),
m_MaxPayloadSize (SSU2_MIN_PACKET_SIZE - IPV6_HEADER_SIZE - UDP_HEADER_SIZE - 32) // min size m_MaxPayloadSize (SSU2_MIN_PACKET_SIZE - IPV6_HEADER_SIZE - UDP_HEADER_SIZE - 32) // min size
{ {
m_NoiseState.reset (new i2p::crypto::NoiseSymmetricState); m_NoiseState.reset (new i2p::crypto::NoiseSymmetricState);
@ -294,8 +297,10 @@ namespace transport
{ {
m_TerminationReason = reason; m_TerminationReason = reason;
SendTermination (); SendTermination ();
m_State = eSSU2SessionStateClosing;
} }
m_State = eSSU2SessionStateClosing; else
Done ();
} }
void SSU2Session::Established () void SSU2Session::Established ()
@ -353,25 +358,33 @@ namespace transport
void SSU2Session::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs) void SSU2Session::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs)
{ {
if (m_State == eSSU2SessionStateTerminated) return; if (m_State == eSSU2SessionStateTerminated) return;
bool isSemiFull = m_SendQueue.size () > SSU2_MAX_OUTGOING_QUEUE_SIZE/2; uint64_t mts = i2p::util::GetMonotonicMicroseconds ();
bool isSemiFull = false;
if (m_SendQueue.size ())
{
int64_t queueLag = (int64_t)mts - (int64_t)m_SendQueue.front ()->GetEnqueueTime ();
isSemiFull = queueLag > m_MsgLocalSemiExpirationTimeout;
if (isSemiFull)
{
LogPrint (eLogWarning, "SSU2: Outgoing messages queue to ",
i2p::data::GetIdentHashAbbreviation (GetRemoteIdentity ()->GetIdentHash ()),
" is semi-full (size = ", m_SendQueue.size (), ", lag = ", queueLag / 1000, ", rtt = ", (int)m_RTT, ")");
}
}
for (auto it: msgs) for (auto it: msgs)
{
if (isSemiFull && it->onDrop) if (isSemiFull && it->onDrop)
it->Drop (); // drop earlier because we can handle it it->Drop (); // drop earlier because we can handle it
else else
{
it->SetEnqueueTime (mts);
m_SendQueue.push_back (std::move (it)); m_SendQueue.push_back (std::move (it));
}
}
SendQueue (); SendQueue ();
if (m_SendQueue.size () > 0) // windows is full if (m_SendQueue.size () > 0) // windows is full
{ Resend (i2p::util::GetMillisecondsSinceEpoch ());
if (m_SendQueue.size () <= SSU2_MAX_OUTGOING_QUEUE_SIZE)
Resend (i2p::util::GetMillisecondsSinceEpoch ());
else
{
LogPrint (eLogWarning, "SSU2: Outgoing messages queue size to ",
GetIdentHashBase64(), " exceeds ", SSU2_MAX_OUTGOING_QUEUE_SIZE);
RequestTermination (eSSU2TerminationReasonTimeout);
}
}
SetSendQueueSize (m_SendQueue.size ()); SetSendQueueSize (m_SendQueue.size ());
} }
@ -380,6 +393,7 @@ namespace transport
if (!m_SendQueue.empty () && m_SentPackets.size () <= m_WindowSize) if (!m_SendQueue.empty () && m_SentPackets.size () <= m_WindowSize)
{ {
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
uint64_t mts = i2p::util::GetMonotonicMicroseconds ();
auto packet = m_Server.GetSentPacketsPool ().AcquireShared (); auto packet = m_Server.GetSentPacketsPool ().AcquireShared ();
size_t ackBlockSize = CreateAckBlock (packet->payload, m_MaxPayloadSize); size_t ackBlockSize = CreateAckBlock (packet->payload, m_MaxPayloadSize);
bool ackBlockSent = false; bool ackBlockSent = false;
@ -387,7 +401,7 @@ namespace transport
while (!m_SendQueue.empty () && m_SentPackets.size () <= m_WindowSize) while (!m_SendQueue.empty () && m_SentPackets.size () <= m_WindowSize)
{ {
auto msg = m_SendQueue.front (); auto msg = m_SendQueue.front ();
if (!msg || msg->IsExpired (ts)) if (!msg || msg->IsExpired (ts) || msg->GetEnqueueTime() + m_MsgLocalExpirationTimeout < mts)
{ {
// drop null or expired message // drop null or expired message
if (msg) msg->Drop (); if (msg) msg->Drop ();
@ -527,7 +541,7 @@ namespace transport
if (m_SentPackets.empty ()) return 0; if (m_SentPackets.empty ()) return 0;
std::map<uint32_t, std::shared_ptr<SSU2SentPacket> > resentPackets; std::map<uint32_t, std::shared_ptr<SSU2SentPacket> > resentPackets;
for (auto it = m_SentPackets.begin (); it != m_SentPackets.end (); ) for (auto it = m_SentPackets.begin (); it != m_SentPackets.end (); )
if (ts >= it->second->sendTime + it->second->numResends*m_RTO) if (ts >= it->second->sendTime + (it->second->numResends + 1) * m_RTO)
{ {
if (it->second->numResends > SSU2_MAX_NUM_RESENDS) if (it->second->numResends > SSU2_MAX_NUM_RESENDS)
{ {
@ -1743,8 +1757,15 @@ namespace transport
if (ts > it1->second->sendTime) if (ts > it1->second->sendTime)
{ {
auto rtt = ts - it1->second->sendTime; auto rtt = ts - it1->second->sendTime;
m_RTT = std::round ((m_RTT*m_SendPacketNum + rtt)/(m_SendPacketNum + 1.0)); if (m_RTT != SSU2_UNKNOWN_RTT)
m_RTT = SSU2_RTT_EWMA_ALPHA * rtt + (1.0 - SSU2_RTT_EWMA_ALPHA) * m_RTT;
else
m_RTT = rtt;
m_RTO = m_RTT*SSU2_kAPPA; m_RTO = m_RTT*SSU2_kAPPA;
m_MsgLocalExpirationTimeout = std::max (I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_MIN,
std::min (I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_MAX,
(unsigned int)(m_RTT * 1000 * I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_FACTOR)));
m_MsgLocalSemiExpirationTimeout = m_MsgLocalExpirationTimeout / 2;
if (m_RTO < SSU2_MIN_RTO) m_RTO = SSU2_MIN_RTO; if (m_RTO < SSU2_MIN_RTO) m_RTO = SSU2_MIN_RTO;
if (m_RTO > SSU2_MAX_RTO) m_RTO = SSU2_MAX_RTO; if (m_RTO > SSU2_MAX_RTO) m_RTO = SSU2_MAX_RTO;
} }

10
libi2pd/SSU2Session.h

@ -36,7 +36,6 @@ namespace transport
const size_t SSU2_MAX_PACKET_SIZE = 1500; const size_t SSU2_MAX_PACKET_SIZE = 1500;
const size_t SSU2_MIN_PACKET_SIZE = 1280; const size_t SSU2_MIN_PACKET_SIZE = 1280;
const int SSU2_HANDSHAKE_RESEND_INTERVAL = 1000; // in milliseconds const int SSU2_HANDSHAKE_RESEND_INTERVAL = 1000; // in milliseconds
const int SSU2_RESEND_INTERVAL = 300; // in milliseconds
const int SSU2_MAX_NUM_RESENDS = 5; const int SSU2_MAX_NUM_RESENDS = 5;
const int SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds const int SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds
const int SSU2_MAX_NUM_RECEIVED_I2NP_MSGIDS = 5000; // how many msgID we store for duplicates check const int SSU2_MAX_NUM_RECEIVED_I2NP_MSGIDS = 5000; // how many msgID we store for duplicates check
@ -45,9 +44,11 @@ namespace transport
const size_t SSU2_MIN_WINDOW_SIZE = 16; // in packets const size_t SSU2_MIN_WINDOW_SIZE = 16; // in packets
const size_t SSU2_MAX_WINDOW_SIZE = 256; // in packets const size_t SSU2_MAX_WINDOW_SIZE = 256; // in packets
const size_t SSU2_MIN_RTO = 100; // in milliseconds const size_t SSU2_MIN_RTO = 100; // in milliseconds
const size_t SSU2_INITIAL_RTO = 540; // in milliseconds
const size_t SSU2_MAX_RTO = 2500; // in milliseconds const size_t SSU2_MAX_RTO = 2500; // in milliseconds
const double SSU2_UNKNOWN_RTT = -1;
const double SSU2_RTT_EWMA_ALPHA = 0.125;
const float SSU2_kAPPA = 1.8; const float SSU2_kAPPA = 1.8;
const size_t SSU2_MAX_OUTGOING_QUEUE_SIZE = 500; // in messages
const int SSU2_MAX_NUM_ACNT = 255; // acnt, acks or nacks const int SSU2_MAX_NUM_ACNT = 255; // acnt, acks or nacks
const int SSU2_MAX_NUM_ACK_PACKETS = 511; // ackthrough + acnt + 1 range const int SSU2_MAX_NUM_ACK_PACKETS = 511; // ackthrough + acnt + 1 range
const int SSU2_MAX_NUM_ACK_RANGES = 32; // to send const int SSU2_MAX_NUM_ACK_RANGES = 32; // to send
@ -357,7 +358,10 @@ namespace transport
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue; std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
i2p::I2NPMessagesHandler m_Handler; i2p::I2NPMessagesHandler m_Handler;
bool m_IsDataReceived; bool m_IsDataReceived;
size_t m_WindowSize, m_RTT, m_RTO; double m_RTT;
int m_MsgLocalExpirationTimeout;
int m_MsgLocalSemiExpirationTimeout;
size_t m_WindowSize, m_RTO;
uint32_t m_RelayTag; // between Bob and Charlie uint32_t m_RelayTag; // between Bob and Charlie
OnEstablished m_OnEstablished; // callback from Established OnEstablished m_OnEstablished; // callback from Established
boost::asio::deadline_timer m_ConnectTimer; boost::asio::deadline_timer m_ConnectTimer;

160
libi2pd/Streaming.cpp

@ -68,11 +68,12 @@ namespace stream
Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, Stream::Stream (boost::asio::io_service& service, StreamingDestination& local,
std::shared_ptr<const i2p::data::LeaseSet> remote, int port): m_Service (service), std::shared_ptr<const i2p::data::LeaseSet> remote, int port): m_Service (service),
m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_SendStreamID (0), m_SequenceNumber (0),
m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1),
m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service),
m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port),
m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_RTT (INITIAL_RTT), m_WindowSize (MIN_WINDOW_SIZE), m_RTO (INITIAL_RTO),
m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()),
m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0), m_MTU (STREAMING_MTU) m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0), m_MTU (STREAMING_MTU)
{ {
@ -81,11 +82,12 @@ namespace stream
} }
Stream::Stream (boost::asio::io_service& service, StreamingDestination& local): Stream::Stream (boost::asio::io_service& service, StreamingDestination& local):
m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Service (service), m_SendStreamID (0), m_SequenceNumber (0),
m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1),
m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service),
m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_RTT (INITIAL_RTT),
m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_WindowSize (MIN_WINDOW_SIZE), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()),
m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0), m_MTU (STREAMING_MTU) m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0), m_MTU (STREAMING_MTU)
{ {
RAND_bytes ((uint8_t *)&m_RecvStreamID, 4); RAND_bytes ((uint8_t *)&m_RecvStreamID, 4);
@ -287,6 +289,8 @@ namespace stream
m_AckSendTimer.async_wait (std::bind (&Stream::HandleAckSendTimer, m_AckSendTimer.async_wait (std::bind (&Stream::HandleAckSendTimer,
shared_from_this (), std::placeholders::_1)); shared_from_this (), std::placeholders::_1));
} }
if (delayRequested >= DELAY_CHOKING)
m_WindowSize = 1;
} }
optionData += 2; optionData += 2;
} }
@ -404,6 +408,8 @@ namespace stream
LogPrint (eLogError, "Streaming: Unexpected ackThrough=", ackThrough, " > seqn=", m_SequenceNumber); LogPrint (eLogError, "Streaming: Unexpected ackThrough=", ackThrough, " > seqn=", m_SequenceNumber);
return; return;
} }
int rttSample = INT_MAX;
bool firstRttSample = false;
int nackCount = packet->GetNACKCount (); int nackCount = packet->GetNACKCount ();
for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();) for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();)
{ {
@ -427,41 +433,51 @@ namespace stream
} }
} }
auto sentPacket = *it; auto sentPacket = *it;
uint64_t rtt = ts - sentPacket->sendTime; int64_t rtt = (int64_t)ts - (int64_t)sentPacket->sendTime;
if(ts < sentPacket->sendTime) if (rtt < 0)
LogPrint (eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime);
if (!seqn)
{ {
LogPrint(eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime); firstRttSample = true;
rtt = 1; rttSample = rtt < 0 ? 1 : rtt;
} }
if (seqn) else if (!sentPacket->resent && seqn > m_TunnelsChangeSequenceNumber && rtt >= 0)
m_RTT = std::round (RTT_EWMA_ALPHA * m_RTT + (1.0 - RTT_EWMA_ALPHA) * rtt); rttSample = std::min (rttSample, (int)rtt);
else
m_RTT = rtt;
m_RTO = m_RTT*1.5; // TODO: implement it better
LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime);
m_SentPackets.erase (it++); m_SentPackets.erase (it++);
m_LocalDestination.DeletePacket (sentPacket); m_LocalDestination.DeletePacket (sentPacket);
acknowledged = true; acknowledged = true;
if (m_WindowSize < WINDOW_SIZE) if (m_WindowSize < WINDOW_SIZE)
m_WindowSize++; // slow start m_WindowSize++; // slow start
else
{
// linear growth
if (ts > m_LastWindowSizeIncreaseTime + m_RTT)
{
m_WindowSize++;
if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE;
m_LastWindowSizeIncreaseTime = ts;
}
}
if (!seqn && m_RoutingSession) // first message confirmed
m_RoutingSession->SetSharedRoutingPath (
std::make_shared<i2p::garlic::GarlicRoutingPath> (
i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, m_RTT, 0, 0}));
} }
else else
break; break;
} }
if (rttSample != INT_MAX)
{
if (firstRttSample)
m_RTT = rttSample;
else
m_RTT = RTT_EWMA_ALPHA * rttSample + (1.0 - RTT_EWMA_ALPHA) * m_RTT;
bool wasInitial = m_RTO == INITIAL_RTO;
m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5)); // TODO: implement it better
if (wasInitial)
ScheduleResend ();
}
if (acknowledged && m_WindowSize >= WINDOW_SIZE)
{
// linear growth
if (ts > m_LastWindowSizeIncreaseTime + m_RTT)
{
m_WindowSize++;
if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE;
m_LastWindowSizeIncreaseTime = ts;
}
}
if (firstRttSample && m_RoutingSession)
m_RoutingSession->SetSharedRoutingPath (
std::make_shared<i2p::garlic::GarlicRoutingPath> (
i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, (int)m_RTT, 0, 0}));
if (m_SentPackets.empty ()) if (m_SentPackets.empty ())
m_ResendTimer.cancel (); m_ResendTimer.cancel ();
if (acknowledged) if (acknowledged)
@ -677,6 +693,7 @@ namespace stream
htobe32buf (packet + size, lastReceivedSeqn); htobe32buf (packet + size, lastReceivedSeqn);
size += 4; // ack Through size += 4; // ack Through
uint8_t numNacks = 0; uint8_t numNacks = 0;
bool choking = false;
if (lastReceivedSeqn > m_LastReceivedSequenceNumber) if (lastReceivedSeqn > m_LastReceivedSequenceNumber)
{ {
// fill NACKs // fill NACKs
@ -688,7 +705,8 @@ namespace stream
if (numNacks + (seqn - nextSeqn) >= 256) if (numNacks + (seqn - nextSeqn) >= 256)
{ {
LogPrint (eLogError, "Streaming: Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn); LogPrint (eLogError, "Streaming: Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn);
htobe32buf (packet + 12, nextSeqn); // change ack Through htobe32buf (packet + 12, nextSeqn - 1); // change ack Through back
choking = true;
break; break;
} }
for (uint32_t i = nextSeqn; i < seqn; i++) for (uint32_t i = nextSeqn; i < seqn; i++)
@ -711,9 +729,16 @@ namespace stream
} }
packet[size] = 0; packet[size] = 0;
size++; // resend delay size++; // resend delay
htobuf16 (packet + size, 0); // no flags set htobuf16 (packet + size, choking ? PACKET_FLAG_DELAY_REQUESTED : 0); // no flags set or delay
size += 2; // flags size += 2; // flags
htobuf16 (packet + size, 0); // no options if (choking)
{
htobuf16 (packet + size, 2); // 2 bytes delay interval
htobuf16 (packet + size + 2, DELAY_CHOKING); // set choking interval
size += 2;
}
else
htobuf16 (packet + size, 0); // no options
size += 2; // options size size += 2; // options size
p.len = size; p.len = size;
@ -881,7 +906,7 @@ namespace stream
m_CurrentOutboundTunnel = routingPath->outboundTunnel; m_CurrentOutboundTunnel = routingPath->outboundTunnel;
m_CurrentRemoteLease = routingPath->remoteLease; m_CurrentRemoteLease = routingPath->remoteLease;
m_RTT = routingPath->rtt; m_RTT = routingPath->rtt;
m_RTO = m_RTT*1.5; // TODO: implement it better m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5)); // TODO: implement it better
} }
} }
@ -891,20 +916,32 @@ namespace stream
UpdateCurrentRemoteLease (true); UpdateCurrentRemoteLease (true);
if (m_CurrentRemoteLease && ts < m_CurrentRemoteLease->endDate + i2p::data::LEASE_ENDDATE_THRESHOLD) if (m_CurrentRemoteLease && ts < m_CurrentRemoteLease->endDate + i2p::data::LEASE_ENDDATE_THRESHOLD)
{ {
bool freshTunnel = false;
if (!m_CurrentOutboundTunnel) if (!m_CurrentOutboundTunnel)
{ {
auto leaseRouter = i2p::data::netdb.FindRouter (m_CurrentRemoteLease->tunnelGateway); auto leaseRouter = i2p::data::netdb.FindRouter (m_CurrentRemoteLease->tunnelGateway);
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (nullptr, m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (nullptr,
leaseRouter ? leaseRouter->GetCompatibleTransports (false) : (i2p::data::RouterInfo::CompatibleTransports)i2p::data::RouterInfo::eAllTransports); leaseRouter ? leaseRouter->GetCompatibleTransports (false) : (i2p::data::RouterInfo::CompatibleTransports)i2p::data::RouterInfo::eAllTransports);
freshTunnel = true;
} }
else if (!m_CurrentOutboundTunnel->IsEstablished ()) else if (!m_CurrentOutboundTunnel->IsEstablished ())
{
auto oldOutboundTunnel = m_CurrentOutboundTunnel;
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel); m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel);
if (m_CurrentOutboundTunnel && oldOutboundTunnel->GetEndpointIdentHash() != m_CurrentOutboundTunnel->GetEndpointIdentHash())
freshTunnel = true;
}
if (!m_CurrentOutboundTunnel) if (!m_CurrentOutboundTunnel)
{ {
LogPrint (eLogError, "Streaming: No outbound tunnels in the pool, sSID=", m_SendStreamID); LogPrint (eLogError, "Streaming: No outbound tunnels in the pool, sSID=", m_SendStreamID);
m_CurrentRemoteLease = nullptr; m_CurrentRemoteLease = nullptr;
return; return;
} }
if (freshTunnel)
{
m_RTO = INITIAL_RTO;
m_TunnelsChangeSequenceNumber = m_SequenceNumber; // should be determined more precisely
}
std::vector<i2p::tunnel::TunnelMessageBlock> msgs; std::vector<i2p::tunnel::TunnelMessageBlock> msgs;
for (const auto& it: packets) for (const auto& it: packets)
@ -936,10 +973,10 @@ namespace stream
if (m_RoutingSession->IsLeaseSetNonConfirmed ()) if (m_RoutingSession->IsLeaseSetNonConfirmed ())
{ {
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
if (ts > m_RoutingSession->GetLeaseSetSubmissionTime () + i2p::garlic::LEASET_CONFIRMATION_TIMEOUT) if (ts > m_RoutingSession->GetLeaseSetSubmissionTime () + i2p::garlic::LEASESET_CONFIRMATION_TIMEOUT)
{ {
// LeaseSet was not confirmed, should try other tunnels // LeaseSet was not confirmed, should try other tunnels
LogPrint (eLogWarning, "Streaming: LeaseSet was not confirmed in ", i2p::garlic::LEASET_CONFIRMATION_TIMEOUT, " milliseconds. Trying to resubmit"); LogPrint (eLogWarning, "Streaming: LeaseSet was not confirmed in ", i2p::garlic::LEASESET_CONFIRMATION_TIMEOUT, " milliseconds. Trying to resubmit");
m_RoutingSession->SetSharedRoutingPath (nullptr); m_RoutingSession->SetSharedRoutingPath (nullptr);
m_CurrentOutboundTunnel = nullptr; m_CurrentOutboundTunnel = nullptr;
m_CurrentRemoteLease = nullptr; m_CurrentRemoteLease = nullptr;
@ -989,6 +1026,7 @@ namespace stream
{ {
if (ts >= it->sendTime + m_RTO) if (ts >= it->sendTime + m_RTO)
{ {
it->resent = true;
it->sendTime = ts; it->sendTime = ts;
packets.push_back (it); packets.push_back (it);
} }
@ -998,31 +1036,31 @@ namespace stream
if (packets.size () > 0) if (packets.size () > 0)
{ {
m_NumResendAttempts++; m_NumResendAttempts++;
m_RTO *= 2; if (m_NumResendAttempts == 1 && m_RTO != INITIAL_RTO)
switch (m_NumResendAttempts)
{ {
case 1: // congestion avoidance // congestion avoidance
m_WindowSize -= (m_WindowSize + WINDOW_SIZE_DROP_FRACTION) / WINDOW_SIZE_DROP_FRACTION; // adjustment >= 1 m_RTO *= 2;
if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; m_WindowSize -= (m_WindowSize + WINDOW_SIZE_DROP_FRACTION) / WINDOW_SIZE_DROP_FRACTION; // adjustment >= 1
break; if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE;
case 2: }
m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change first time else
#if (__cplusplus >= 201703L) // C++ 17 or higher {
[[fallthrough]]; m_TunnelsChangeSequenceNumber = m_SequenceNumber;
#endif m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change
// no break here if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr);
case 4: if (m_NumResendAttempts & 1)
if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); {
UpdateCurrentRemoteLease (); // pick another lease
LogPrint (eLogWarning, "Streaming: Another remote lease has been selected for stream with rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
break;
case 3:
// pick another outbound tunnel // pick another outbound tunnel
if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr);
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream with sSID=", m_SendStreamID); LogPrint (eLogWarning, "Streaming: Resend #", m_NumResendAttempts,
break; ", another outbound tunnel has been selected for stream with sSID=", m_SendStreamID);
default: ; }
else
{
UpdateCurrentRemoteLease (); // pick another lease
LogPrint (eLogWarning, "Streaming: Resend #", m_NumResendAttempts,
", another remote lease has been selected for stream with rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
}
} }
SendPackets (packets); SendPackets (packets);
} }
@ -1056,9 +1094,13 @@ namespace stream
{ {
if (m_RoutingSession && m_RoutingSession->IsLeaseSetNonConfirmed ()) if (m_RoutingSession && m_RoutingSession->IsLeaseSetNonConfirmed ())
{ {
// seems something went wrong and we should re-select tunnels auto ts = i2p::util::GetMillisecondsSinceEpoch ();
m_CurrentOutboundTunnel = nullptr; if (ts > m_RoutingSession->GetLeaseSetSubmissionTime () + i2p::garlic::LEASESET_CONFIRMATION_TIMEOUT)
m_CurrentRemoteLease = nullptr; {
// seems something went wrong and we should re-select tunnels
m_CurrentOutboundTunnel = nullptr;
m_CurrentRemoteLease = nullptr;
}
} }
SendQuickAck (); SendQuickAck ();
} }

13
libi2pd/Streaming.h

@ -52,12 +52,13 @@ namespace stream
const size_t STREAMING_MTU_RATCHETS = 1812; const size_t STREAMING_MTU_RATCHETS = 1812;
const size_t MAX_PACKET_SIZE = 4096; const size_t MAX_PACKET_SIZE = 4096;
const size_t COMPRESSION_THRESHOLD_SIZE = 66; const size_t COMPRESSION_THRESHOLD_SIZE = 66;
const int MAX_NUM_RESEND_ATTEMPTS = 6; const int MAX_NUM_RESEND_ATTEMPTS = 9;
const int WINDOW_SIZE = 6; // in messages const int WINDOW_SIZE = 6; // in messages
const int MIN_WINDOW_SIZE = 1; const int MIN_WINDOW_SIZE = 1;
const int MAX_WINDOW_SIZE = 128; const int MAX_WINDOW_SIZE = 128;
const int WINDOW_SIZE_DROP_FRACTION = 10; // 1/10 const int WINDOW_SIZE_DROP_FRACTION = 10; // 1/10
const double RTT_EWMA_ALPHA = 0.5; const double RTT_EWMA_ALPHA = 0.125;
const int MIN_RTO = 20; // in milliseconds
const int INITIAL_RTT = 8000; // in milliseconds const int INITIAL_RTT = 8000; // in milliseconds
const int INITIAL_RTO = 9000; // in milliseconds const int INITIAL_RTO = 9000; // in milliseconds
const int MIN_SEND_ACK_TIMEOUT = 2; // in milliseconds const int MIN_SEND_ACK_TIMEOUT = 2; // in milliseconds
@ -65,14 +66,16 @@ namespace stream
const size_t MAX_PENDING_INCOMING_BACKLOG = 128; const size_t MAX_PENDING_INCOMING_BACKLOG = 128;
const int PENDING_INCOMING_TIMEOUT = 10; // in seconds const int PENDING_INCOMING_TIMEOUT = 10; // in seconds
const int MAX_RECEIVE_TIMEOUT = 20; // in seconds const int MAX_RECEIVE_TIMEOUT = 20; // in seconds
const uint16_t DELAY_CHOKING = 60000; // in milliseconds
struct Packet struct Packet
{ {
size_t len, offset; size_t len, offset;
uint8_t buf[MAX_PACKET_SIZE]; uint8_t buf[MAX_PACKET_SIZE];
uint64_t sendTime; uint64_t sendTime;
bool resent;
Packet (): len (0), offset (0), sendTime (0) {}; Packet (): len (0), offset (0), sendTime (0), resent (false) {};
uint8_t * GetBuffer () { return buf + offset; }; uint8_t * GetBuffer () { return buf + offset; };
size_t GetLength () const { return len - offset; }; size_t GetLength () const { return len - offset; };
@ -236,6 +239,7 @@ namespace stream
boost::asio::io_service& m_Service; boost::asio::io_service& m_Service;
uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber;
uint32_t m_TunnelsChangeSequenceNumber;
int32_t m_LastReceivedSequenceNumber; int32_t m_LastReceivedSequenceNumber;
StreamStatus m_Status; StreamStatus m_Status;
bool m_IsAckSendScheduled; bool m_IsAckSendScheduled;
@ -254,7 +258,8 @@ namespace stream
uint16_t m_Port; uint16_t m_Port;
SendBufferQueue m_SendBuffer; SendBufferQueue m_SendBuffer;
int m_WindowSize, m_RTT, m_RTO, m_AckDelay; double m_RTT;
int m_WindowSize, m_RTO, m_AckDelay;
uint64_t m_LastWindowSizeIncreaseTime; uint64_t m_LastWindowSizeIncreaseTime;
int m_NumResendAttempts; int m_NumResendAttempts;
size_t m_MTU; size_t m_MTU;

6
libi2pd/version.h

@ -18,8 +18,8 @@
#define MAKE_VERSION_NUMBER(a,b,c) ((a*100+b)*100+c) #define MAKE_VERSION_NUMBER(a,b,c) ((a*100+b)*100+c)
#define I2PD_VERSION_MAJOR 2 #define I2PD_VERSION_MAJOR 2
#define I2PD_VERSION_MINOR 50 #define I2PD_VERSION_MINOR 51
#define I2PD_VERSION_MICRO 2 #define I2PD_VERSION_MICRO 0
#define I2PD_VERSION_PATCH 0 #define I2PD_VERSION_PATCH 0
#ifdef GITVER #ifdef GITVER
#define I2PD_VERSION XSTRINGIZE(GITVER) #define I2PD_VERSION XSTRINGIZE(GITVER)
@ -33,7 +33,7 @@
#define I2P_VERSION_MAJOR 0 #define I2P_VERSION_MAJOR 0
#define I2P_VERSION_MINOR 9 #define I2P_VERSION_MINOR 9
#define I2P_VERSION_MICRO 61 #define I2P_VERSION_MICRO 62
#define I2P_VERSION_PATCH 0 #define I2P_VERSION_PATCH 0
#define I2P_VERSION MAKE_VERSION(I2P_VERSION_MAJOR, I2P_VERSION_MINOR, I2P_VERSION_MICRO) #define I2P_VERSION MAKE_VERSION(I2P_VERSION_MAJOR, I2P_VERSION_MINOR, I2P_VERSION_MICRO)
#define I2P_VERSION_NUMBER MAKE_VERSION_NUMBER(I2P_VERSION_MAJOR, I2P_VERSION_MINOR, I2P_VERSION_MICRO) #define I2P_VERSION_NUMBER MAKE_VERSION_NUMBER(I2P_VERSION_MAJOR, I2P_VERSION_MINOR, I2P_VERSION_MICRO)

Loading…
Cancel
Save