diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index 6c08086f..43cd21f0 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -180,7 +180,6 @@ namespace client i2p::tunnel::tunnels.StopTunnelPool (m_Pool); } SaveTags (); - m_Service.stop (); // make sure we don't process more messages after this point. TODO: implement it better CleanUp (); // GarlicDestination } @@ -980,6 +979,7 @@ namespace client bool isPublic, const std::map * params): LeaseSetDestination (service, isPublic, params), m_Keys (keys), m_StreamingAckDelay (DEFAULT_INITIAL_ACK_DELAY), + m_StreamingOutboundSpeed (DEFAULT_MAX_OUTBOUND_SPEED), m_IsStreamingAnswerPings (DEFAULT_ANSWER_PINGS), m_LastPort (0), m_DatagramDestination (nullptr), m_RefCounter (0), m_ReadyChecker(service) @@ -1048,6 +1048,9 @@ namespace client auto it = params->find (I2CP_PARAM_STREAMING_INITIAL_ACK_DELAY); if (it != params->end ()) m_StreamingAckDelay = std::stoi(it->second); + it = params->find (I2CP_PARAM_STREAMING_MAX_OUTBOUND_SPEED); + if (it != params->end ()) + m_StreamingOutboundSpeed = std::stoi(it->second); it = params->find (I2CP_PARAM_STREAMING_ANSWER_PINGS); if (it != params->end ()) m_IsStreamingAnswerPings = std::stoi (it->second); // 1 for true diff --git a/libi2pd/Destination.h b/libi2pd/Destination.h index 9600594d..40c97ee8 100644 --- a/libi2pd/Destination.h +++ b/libi2pd/Destination.h @@ -84,6 +84,8 @@ namespace client // streaming const char I2CP_PARAM_STREAMING_INITIAL_ACK_DELAY[] = "i2p.streaming.initialAckDelay"; const int DEFAULT_INITIAL_ACK_DELAY = 200; // milliseconds + const char I2CP_PARAM_STREAMING_MAX_OUTBOUND_SPEED[] = "i2p.streaming.maxOutboundSpeed"; // bytes/sec + const int DEFAULT_MAX_OUTBOUND_SPEED = 1730000000; // no more than 1.73 Gbytes/s const char I2CP_PARAM_STREAMING_ANSWER_PINGS[] = "i2p.streaming.answerPings"; const int DEFAULT_ANSWER_PINGS = true; @@ -259,6 +261,7 @@ namespace client bool IsAcceptingStreams () const; void AcceptOnce (const i2p::stream::StreamingDestination::Acceptor& acceptor); int GetStreamingAckDelay () const { return m_StreamingAckDelay; } + int GetStreamingOutboundSpeed () const { return m_StreamingOutboundSpeed; } bool IsStreamingAnswerPings () const { return m_IsStreamingAnswerPings; } // datagram @@ -296,6 +299,7 @@ namespace client std::unique_ptr m_ECIESx25519EncryptionKey; int m_StreamingAckDelay; + int m_StreamingOutboundSpeed; bool m_IsStreamingAnswerPings; std::shared_ptr m_StreamingDestination; // default std::map > m_StreamingDestinationsByPorts; diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index d9d2b10d..f1187c26 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -70,12 +70,14 @@ namespace stream std::shared_ptr remote, int port): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1), - m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsWinDropped (true), + m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsSendTime (true), m_IsWinDropped (true), m_IsTimeOutResend (false), m_LocalDestination (local), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), m_RTT (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE), m_RTO (INITIAL_RTO), - m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_PrevRTTSample (INITIAL_RTT), m_PrevRTT (INITIAL_RTT), m_Jitter (0), + m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), + m_OutboundSpeed (local.GetOwner ()->GetStreamingOutboundSpeed ()), m_PrevRTTSample (INITIAL_RTT), + m_PrevRTT (INITIAL_RTT), m_Jitter (0), m_PacingTime (INITIAL_PACING_TIME), m_NumResendAttempts (0), m_MTU (STREAMING_MTU) { RAND_bytes ((uint8_t *)&m_RecvStreamID, 4); @@ -85,11 +87,12 @@ namespace stream Stream::Stream (boost::asio::io_service& service, StreamingDestination& local): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1), - m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsWinDropped (true), + m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsSendTime (true), m_IsWinDropped (true), m_IsTimeOutResend (false), m_LocalDestination (local), m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_RTT (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), + m_OutboundSpeed (local.GetOwner ()->GetStreamingOutboundSpeed ()), m_PrevRTTSample (INITIAL_RTT), m_PrevRTT (INITIAL_RTT), m_Jitter (0), m_PacingTime (INITIAL_PACING_TIME), m_NumResendAttempts (0), m_MTU (STREAMING_MTU) { @@ -492,10 +495,9 @@ namespace stream m_IsWinDropped = true; // don't drop window twice } if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; - int pacTime = std::round (m_RTT*1000/m_WindowSize); - m_PacingTime = std::max (MIN_PACING_TIME, pacTime); + UpdatePacingTime (); m_PrevRTT = m_RTT * 1.1 + m_Jitter; - // + bool wasInitial = m_RTO == INITIAL_RTO; m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.3 + m_Jitter)); // TODO: implement it better @@ -515,7 +517,7 @@ namespace stream m_RoutingSession->SetSharedRoutingPath ( std::make_shared ( i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, (int)m_RTT, 0, 0})); - if (m_SentPackets.empty ()) + if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) { m_ResendTimer.cancel (); m_SendTimer.cancel (); @@ -599,7 +601,7 @@ namespace stream { ScheduleSend (); int numMsgs = m_WindowSize - m_SentPackets.size (); - if (numMsgs <= 0) return; // window is full + if (numMsgs <= 0 || !m_IsSendTime) return; // window is full else numMsgs = 1; bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet std::vector packets; @@ -701,6 +703,7 @@ namespace stream m_SentPackets.insert (it); } SendPackets (packets); + m_IsSendTime = false; if (m_Status == eStreamStatusClosing && m_SendBuffer.IsEmpty ()) SendClose (); if (isEmpty) @@ -1045,6 +1048,7 @@ namespace stream { if (ecode != boost::asio::error::operation_aborted) { + m_IsSendTime = true; if (m_IsNAcked) // || m_WindowSize < int(m_SentPackets.size ())) // resend one packet ResendPacket (); // delay-based CC @@ -1053,10 +1057,8 @@ namespace stream m_WindowSize >>= 1; // /2 m_IsWinDropped = true; // don't drop window twice if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; - int pacTime = std::round (m_RTT*1000/m_WindowSize); - m_PacingTime = std::max (MIN_PACING_TIME, pacTime); + UpdatePacingTime (); } - // else if (m_WindowSize > int(m_SentPackets.size ())) // send one packet SendBuffer (); else // pass @@ -1081,6 +1083,7 @@ namespace stream { if (ecode != boost::asio::error::operation_aborted) { + m_IsSendTime = true; if (m_RTO > INITIAL_RTO) m_RTO = INITIAL_RTO; m_SendTimer.cancel (); // if no ack's in RTO, disable fast retransmit m_IsTimeOutResend = true; @@ -1118,7 +1121,7 @@ namespace stream } // select tunnels if necessary and send - if (packets.size () > 0) + if (packets.size () > 0 && m_IsSendTime) { if (m_IsNAcked) m_NumResendAttempts = 1; else if (m_IsTimeOutResend) m_NumResendAttempts++; @@ -1130,10 +1133,8 @@ namespace stream m_WindowSize >>= 1; // /2 m_IsWinDropped = true; // don't drop window twice if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; - int pacTime = std::round (m_RTT*1000/m_WindowSize); - m_PacingTime = std::max (MIN_PACING_TIME, pacTime); + UpdatePacingTime (); } - // } else if (m_IsTimeOutResend) { @@ -1141,8 +1142,7 @@ namespace stream m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change m_WindowSize = INITIAL_WINDOW_SIZE; m_IsWinDropped = true; - int pacTime = std::round (m_RTT*1000/m_WindowSize); - m_PacingTime = std::max (MIN_PACING_TIME, pacTime); + UpdatePacingTime (); if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); if (m_NumResendAttempts & 1) { @@ -1159,6 +1159,7 @@ namespace stream } } SendPackets (packets); + m_IsSendTime = false; if (m_IsNAcked) ScheduleSend (); } else @@ -1296,6 +1297,16 @@ namespace stream if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); // TODO: count failures } + + void Stream::UpdatePacingTime () + { + m_PacingTime = std::round (m_RTT*1000/m_WindowSize); + if (m_OutboundSpeed) + { + auto minTime = (1000000LL*STREAMING_MTU)/m_OutboundSpeed; + if (m_PacingTime < minTime) m_PacingTime = minTime; + } + } StreamingDestination::StreamingDestination (std::shared_ptr owner, uint16_t localPort, bool gzip): m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip), diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 3b59adc2..6400fc02 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -53,8 +53,6 @@ namespace stream const size_t MAX_PACKET_SIZE = 4096; const size_t COMPRESSION_THRESHOLD_SIZE = 66; const int MAX_NUM_RESEND_ATTEMPTS = 10; - const int MAX_STREAM_SPEED = 1730000000; // 1 - 1730000000 // in bytes/sec // no more than 1.73 Gbytes/s - const int MIN_PACING_TIME = 1000000 * STREAMING_MTU / MAX_STREAM_SPEED; // in microseconds const int INITIAL_WINDOW_SIZE = 10; const int MIN_WINDOW_SIZE = 1; const int MAX_WINDOW_SIZE = 128; @@ -241,6 +239,8 @@ namespace stream void ScheduleAck (int timeout); void HandleAckSendTimer (const boost::system::error_code& ecode); + void UpdatePacingTime (); + private: boost::asio::io_service& m_Service; @@ -251,6 +251,7 @@ namespace stream StreamStatus m_Status; bool m_IsAckSendScheduled; bool m_IsNAcked; + bool m_IsSendTime; bool m_IsWinDropped; bool m_IsTimeOutResend; StreamingDestination& m_LocalDestination; @@ -269,7 +270,7 @@ namespace stream SendBufferQueue m_SendBuffer; double m_RTT; - int m_WindowSize, m_RTO, m_AckDelay, m_PrevRTTSample, m_PrevRTT, m_Jitter; + int m_WindowSize, m_RTO, m_AckDelay, m_OutboundSpeed, m_PrevRTTSample, m_PrevRTT, m_Jitter; uint64_t m_PacingTime; int m_NumResendAttempts; size_t m_MTU; diff --git a/libi2pd_client/ClientContext.cpp b/libi2pd_client/ClientContext.cpp index 62295842..3dfc6040 100644 --- a/libi2pd_client/ClientContext.cpp +++ b/libi2pd_client/ClientContext.cpp @@ -471,6 +471,7 @@ namespace client options[I2CP_PARAM_MIN_TUNNEL_LATENCY] = GetI2CPOption(section, I2CP_PARAM_MIN_TUNNEL_LATENCY, DEFAULT_MIN_TUNNEL_LATENCY); options[I2CP_PARAM_MAX_TUNNEL_LATENCY] = GetI2CPOption(section, I2CP_PARAM_MAX_TUNNEL_LATENCY, DEFAULT_MAX_TUNNEL_LATENCY); options[I2CP_PARAM_STREAMING_INITIAL_ACK_DELAY] = GetI2CPOption(section, I2CP_PARAM_STREAMING_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY); + options[I2CP_PARAM_STREAMING_MAX_OUTBOUND_SPEED] = GetI2CPOption(section, I2CP_PARAM_STREAMING_MAX_OUTBOUND_SPEED, DEFAULT_MAX_OUTBOUND_SPEED); options[I2CP_PARAM_STREAMING_ANSWER_PINGS] = GetI2CPOption(section, I2CP_PARAM_STREAMING_ANSWER_PINGS, isServer ? DEFAULT_ANSWER_PINGS : false); options[I2CP_PARAM_LEASESET_TYPE] = GetI2CPOption(section, I2CP_PARAM_LEASESET_TYPE, DEFAULT_LEASESET_TYPE); std::string encType = GetI2CPStringOption(section, I2CP_PARAM_LEASESET_ENCRYPTION_TYPE, "0,4");