2020-05-22 16:18:41 +03:00
/*
2024-01-27 12:17:59 -05:00
* Copyright ( c ) 2013 - 2024 , The PurpleI2P Project
2020-05-22 16:18:41 +03:00
*
* This file is part of Purple i2pd project and licensed under BSD3
*
* See full license text in LICENSE file at top of project tree
*/
2016-05-11 15:12:38 -04:00
# include "Crypto.h"
2013-12-12 21:36:24 -05:00
# include "Log.h"
# include "RouterInfo.h"
2013-12-19 21:19:44 -05:00
# include "RouterContext.h"
2013-12-30 20:46:33 -05:00
# include "Tunnel.h"
# include "Timestamp.h"
2014-10-05 08:54:59 -04:00
# include "Destination.h"
2013-12-12 21:36:24 -05:00
# include "Streaming.h"
namespace i2p
{
namespace stream
{
2020-11-29 14:59:34 -05:00
void SendBufferQueue : : Add ( std : : shared_ptr < SendBuffer > buf )
{
if ( buf )
2021-11-27 23:30:35 +03:00
{
2020-11-29 14:59:34 -05:00
m_Buffers . push_back ( buf ) ;
m_Size + = buf - > len ;
2021-11-27 23:30:35 +03:00
}
}
2017-02-26 15:05:14 -05:00
size_t SendBufferQueue : : Get ( uint8_t * buf , size_t len )
{
size_t offset = 0 ;
while ( ! m_Buffers . empty ( ) & & offset < len )
{
auto nextBuffer = m_Buffers . front ( ) ;
auto rem = nextBuffer - > GetRemainingSize ( ) ;
if ( offset + rem < = len )
{
// whole buffer
memcpy ( buf + offset , nextBuffer - > GetRemaningBuffer ( ) , rem ) ;
offset + = rem ;
m_Buffers . pop_front ( ) ; // delete it
2018-01-06 11:48:51 +08:00
}
2017-02-26 15:05:14 -05:00
else
{
// partially
2020-03-01 13:25:50 +03:00
rem = len - offset ;
2023-03-02 12:14:49 +02:00
memcpy ( buf + offset , nextBuffer - > GetRemaningBuffer ( ) , rem ) ;
nextBuffer - > offset + = rem ;
2017-02-26 15:05:14 -05:00
offset = len ; // break
2018-01-06 11:48:51 +08:00
}
}
2017-02-26 15:05:14 -05:00
m_Size - = offset ;
return offset ;
2018-01-06 11:48:51 +08:00
}
2017-02-26 15:05:14 -05:00
2018-01-06 11:48:51 +08:00
void SendBufferQueue : : CleanUp ( )
{
2017-02-26 15:05:14 -05:00
if ( ! m_Buffers . empty ( ) )
2018-01-06 11:48:51 +08:00
{
2017-02-26 15:05:14 -05:00
for ( auto it : m_Buffers )
it - > Cancel ( ) ;
2018-01-06 11:48:51 +08:00
m_Buffers . clear ( ) ;
2017-02-26 15:05:14 -05:00
m_Size = 0 ;
2018-01-06 11:48:51 +08:00
}
2017-02-26 15:05:14 -05:00
}
2018-01-06 11:48:51 +08:00
2024-11-25 10:08:27 -05:00
/// Constructs an outgoing stream (m_IsIncoming is false) towards the given remote LeaseSet.
/// @param service io_context all timers of the stream are bound to
/// @param local   owning streaming destination; also the source of ack delay and speed limits
/// @param remote  LeaseSet of the peer; its identity becomes m_RemoteIdentity
/// @param port    stored in m_Port
Stream::Stream (boost::asio::io_context& service, StreamingDestination& local,
	std::shared_ptr<const i2p::data::LeaseSet> remote, int port):
	m_Service (service),
	m_SendStreamID (0),
	m_SequenceNumber (0),
	m_DropWindowDelaySequenceNumber (0),
	m_TunnelsChangeSequenceNumber (0),
	m_LastReceivedSequenceNumber (-1),
	m_PreviousReceivedSequenceNumber (-1),
	m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed
	m_Status (eStreamStatusNew),
	m_IsIncoming (false),
	m_IsAckSendScheduled (false),
	m_IsNAcked (false),
	m_IsFirstACK (false),
	m_IsResendNeeded (false),
	m_IsFirstRttSample (false),
	m_IsSendTime (true),
	m_IsWinDropped (false),
	m_IsTimeOutResend (false),
	m_IsImmediateAckRequested (false),
	m_IsRemoteLeaseChangeInProgress (false),
	m_DoubleWinIncCounter (false),
	m_LocalDestination (local),
	m_RemoteLeaseSet (remote),
	m_ReceiveTimer (m_Service),
	m_SendTimer (m_Service),
	m_ResendTimer (m_Service),
	m_AckSendTimer (m_Service),
	m_NumSentBytes (0),
	m_NumReceivedBytes (0),
	m_Port (port),
	m_RTT (INITIAL_RTT),
	m_SlowRTT (INITIAL_RTT),
	m_SlowRTT2 (INITIAL_RTT),
	m_WindowSize (INITIAL_WINDOW_SIZE),
	m_LastWindowDropSize (0),
	m_WindowDropTargetSize (0),
	m_WindowIncCounter (0),
	m_RTO (INITIAL_RTO),
	m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()),
	m_PrevRTTSample (INITIAL_RTT),
	m_Jitter (0),
	m_MinPacingTime (0),
	m_PacingTime (INITIAL_PACING_TIME),
	m_PacingTimeRem (0),
	m_LastSendTime (0),
	m_RemoteLeaseChangeTime (0),
	m_LastACKSendTime (0),
	m_PacketACKInterval (1),
	m_PacketACKIntervalRem (0), // for limit inbound speed
	m_NumResendAttempts (0),
	m_NumPacketsToSend (0),
	m_MTU (STREAMING_MTU)
{
	// random receive stream ID
	RAND_bytes (reinterpret_cast<uint8_t *>(&m_RecvStreamID), 4);
	m_RemoteIdentity = remote->GetIdentity ();

	// A zero speed leaves the corresponding default from the initializer list in place.
	// NOTE(review): the 1000000 factor suggests microseconds per MTU-sized packet - confirm units
	const auto outboundSpeed = local.GetOwner ()->GetStreamingOutboundSpeed ();
	if (outboundSpeed)
	{
		m_MinPacingTime = (1000000LL*STREAMING_MTU)/outboundSpeed;
	}
	const auto inboundSpeed = local.GetOwner ()->GetStreamingInboundSpeed (); // for limit inbound speed
	if (inboundSpeed)
	{
		m_PacketACKInterval = (1000000LL*STREAMING_MTU)/inboundSpeed;
	}
}
2013-12-19 21:19:44 -05:00
2024-11-25 10:08:27 -05:00
/// Constructs an incoming stream (m_IsIncoming is true). No remote LeaseSet or
/// identity is set here and m_Port starts at 0.
/// @param service io_context all timers of the stream are bound to
/// @param local   owning streaming destination; also the source of ack delay and speed limits
Stream::Stream (boost::asio::io_context& service, StreamingDestination& local):
	m_Service (service),
	m_SendStreamID (0),
	m_SequenceNumber (0),
	m_DropWindowDelaySequenceNumber (0),
	m_TunnelsChangeSequenceNumber (0),
	m_LastReceivedSequenceNumber (-1),
	m_PreviousReceivedSequenceNumber (-1),
	m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed
	m_Status (eStreamStatusNew),
	m_IsIncoming (true),
	m_IsAckSendScheduled (false),
	m_IsNAcked (false),
	m_IsFirstACK (false),
	m_IsResendNeeded (false),
	m_IsFirstRttSample (false),
	m_IsSendTime (true),
	m_IsWinDropped (false),
	m_IsTimeOutResend (false),
	m_IsImmediateAckRequested (false),
	m_IsRemoteLeaseChangeInProgress (false),
	m_DoubleWinIncCounter (false),
	m_LocalDestination (local),
	m_ReceiveTimer (m_Service),
	m_SendTimer (m_Service),
	m_ResendTimer (m_Service),
	m_AckSendTimer (m_Service),
	m_NumSentBytes (0),
	m_NumReceivedBytes (0),
	m_Port (0),
	m_RTT (INITIAL_RTT),
	m_SlowRTT (INITIAL_RTT),
	m_SlowRTT2 (INITIAL_RTT),
	m_WindowSize (INITIAL_WINDOW_SIZE),
	m_LastWindowDropSize (0),
	m_WindowDropTargetSize (0),
	m_WindowIncCounter (0),
	m_RTO (INITIAL_RTO),
	m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()),
	m_PrevRTTSample (INITIAL_RTT),
	m_Jitter (0),
	m_MinPacingTime (0),
	m_PacingTime (INITIAL_PACING_TIME),
	m_PacingTimeRem (0),
	m_LastSendTime (0),
	m_RemoteLeaseChangeTime (0),
	m_LastACKSendTime (0),
	m_PacketACKInterval (1),
	m_PacketACKIntervalRem (0), // for limit inbound speed
	m_NumResendAttempts (0),
	m_NumPacketsToSend (0),
	m_MTU (STREAMING_MTU)
{
	// random receive stream ID
	RAND_bytes (reinterpret_cast<uint8_t *>(&m_RecvStreamID), 4);

	// A zero speed leaves the corresponding default from the initializer list in place.
	// NOTE(review): the 1000000 factor suggests microseconds per MTU-sized packet - confirm units
	const auto outboundSpeed = local.GetOwner ()->GetStreamingOutboundSpeed ();
	if (outboundSpeed)
	{
		m_MinPacingTime = (1000000LL*STREAMING_MTU)/outboundSpeed;
	}
	const auto inboundSpeed = local.GetOwner ()->GetStreamingInboundSpeed (); // for limit inbound speed
	if (inboundSpeed)
	{
		m_PacketACKInterval = (1000000LL*STREAMING_MTU)/inboundSpeed;
	}
}
2014-01-10 20:21:38 -05:00
/// Releases all queued buffers and packets (see CleanUp) and logs the deletion.
Stream::~Stream ()
{
	CleanUp (); // hand packets and buffers back before the object goes away
	LogPrint (eLogDebug, " Streaming: Stream deleted ");
}
2015-03-09 22:05:26 -04:00
2021-11-12 18:33:51 +02:00
void Stream : : Terminate ( bool deleteFromDestination ) // should be called from StreamingDestination::Stop only
2015-03-09 22:05:26 -04:00
{
2021-08-11 12:23:43 -04:00
m_Status = eStreamStatusTerminated ;
2015-03-09 22:05:26 -04:00
m_AckSendTimer . cancel ( ) ;
m_ReceiveTimer . cancel ( ) ;
m_ResendTimer . cancel ( ) ;
2024-06-30 08:11:12 -04:00
m_SendTimer . cancel ( ) ;
2017-01-28 19:23:14 +03:00
//CleanUp (); /* Need to recheck - broke working on windows */
2020-03-04 18:31:22 -05:00
if ( deleteFromDestination )
m_LocalDestination . DeleteStream ( shared_from_this ( ) ) ;
2017-01-28 19:23:14 +03:00
}
2017-01-22 21:22:12 -05:00
void Stream : : CleanUp ( )
{
2023-11-14 09:39:36 -05:00
m_SendBuffer . CleanUp ( ) ;
2017-01-22 21:22:12 -05:00
while ( ! m_ReceiveQueue . empty ( ) )
{
auto packet = m_ReceiveQueue . front ( ) ;
m_ReceiveQueue . pop ( ) ;
m_LocalDestination . DeletePacket ( packet ) ;
}
2024-08-27 15:33:59 -04:00
m_NACKedPackets . clear ( ) ;
2017-01-28 19:23:14 +03:00
2017-01-22 21:22:12 -05:00
for ( auto it : m_SentPackets )
m_LocalDestination . DeletePacket ( it ) ;
m_SentPackets . clear ( ) ;
2017-01-28 19:23:14 +03:00
2017-01-22 21:22:12 -05:00
for ( auto it : m_SavedPackets )
m_LocalDestination . DeletePacket ( it ) ;
m_SavedPackets . clear ( ) ;
2017-01-28 19:23:14 +03:00
}
2014-01-10 20:21:38 -05:00
void Stream : : HandleNextPacket ( Packet * packet )
{
2024-02-18 21:28:06 -05:00
if ( m_Status = = eStreamStatusTerminated )
{
m_LocalDestination . DeletePacket ( packet ) ;
return ;
}
2014-10-13 17:03:27 -04:00
m_NumReceivedBytes + = packet - > GetLength ( ) ;
2017-01-28 19:23:14 +03:00
if ( ! m_SendStreamID )
2023-03-11 08:44:07 -05:00
{
2017-01-28 19:23:14 +03:00
m_SendStreamID = packet - > GetReceiveStreamID ( ) ;
2023-03-11 08:44:07 -05:00
if ( ! m_RemoteIdentity & & packet - > GetNACKCount ( ) = = 8 & & // first incoming packet
memcmp ( packet - > GetNACKs ( ) , m_LocalDestination . GetOwner ( ) - > GetIdentHash ( ) , 32 ) )
{
LogPrint ( eLogWarning , " Streaming: Destination mismatch for " , m_LocalDestination . GetOwner ( ) - > GetIdentHash ( ) . ToBase32 ( ) ) ;
m_LocalDestination . DeletePacket ( packet ) ;
return ;
}
}
2014-08-01 14:54:14 -04:00
2014-08-10 18:27:23 -04:00
if ( ! packet - > IsNoAck ( ) ) // ack received
ProcessAck ( packet ) ;
2017-01-28 19:23:14 +03:00
2014-08-06 19:19:59 -04:00
int32_t receivedSeqn = packet - > GetSeqn ( ) ;
2024-09-09 22:37:35 -04:00
if ( ! receivedSeqn & & m_LastReceivedSequenceNumber > = 0 )
2014-08-06 15:44:00 -04:00
{
2024-09-09 22:26:03 -04:00
uint16_t flags = packet - > GetFlags ( ) ;
if ( flags )
// plain ack with options
ProcessOptions ( flags , packet ) ;
else
// plain ack
2024-09-10 12:22:42 -04:00
{
LogPrint ( eLogDebug , " Streaming: Plain ACK received " ) ;
if ( m_IsImmediateAckRequested )
{
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
2024-09-11 11:24:51 -04:00
if ( m_IsFirstRttSample )
{
m_RTT = ts - m_LastSendTime ;
m_IsFirstRttSample = false ;
}
else
m_RTT = ( m_RTT + ( ts - m_LastSendTime ) ) / 2 ;
2024-09-10 12:22:42 -04:00
m_IsImmediateAckRequested = false ;
}
}
2017-01-10 21:31:52 -05:00
m_LocalDestination . DeletePacket ( packet ) ;
2014-08-06 15:44:00 -04:00
return ;
}
2016-06-29 01:00:00 +00:00
LogPrint ( eLogDebug , " Streaming: Received seqn= " , receivedSeqn , " on sSID= " , m_SendStreamID ) ;
2016-02-15 15:16:53 -05:00
if ( receivedSeqn = = m_LastReceivedSequenceNumber + 1 )
2017-01-28 19:23:14 +03:00
{
2014-02-01 22:20:41 -05:00
// we have received next in sequence message
ProcessPacket ( packet ) ;
2024-02-18 21:28:06 -05:00
if ( m_Status = = eStreamStatusTerminated ) return ;
2014-01-26 18:22:30 -05:00
// we should also try stored messages if any
for ( auto it = m_SavedPackets . begin ( ) ; it ! = m_SavedPackets . end ( ) ; )
2017-01-28 19:23:14 +03:00
{
2014-08-06 19:19:59 -04:00
if ( ( * it ) - > GetSeqn ( ) = = ( uint32_t ) ( m_LastReceivedSequenceNumber + 1 ) )
2014-01-26 18:22:30 -05:00
{
2014-02-01 22:20:41 -05:00
Packet * savedPacket = * it ;
2014-01-26 18:22:30 -05:00
m_SavedPackets . erase ( it + + ) ;
2014-02-01 22:20:41 -05:00
ProcessPacket ( savedPacket ) ;
2024-02-18 21:28:06 -05:00
if ( m_Status = = eStreamStatusTerminated ) return ;
2014-01-26 18:22:30 -05:00
}
else
break ;
2014-03-25 17:43:36 -04:00
}
2014-10-10 11:53:27 -04:00
// schedule ack for last message
2015-03-08 19:36:33 -04:00
if ( m_Status = = eStreamStatusOpen )
2014-10-10 11:53:27 -04:00
{
if ( ! m_IsAckSendScheduled )
{
2016-02-13 23:10:51 -05:00
auto ackTimeout = m_RTT / 10 ;
2017-11-24 15:37:17 -05:00
if ( ackTimeout > m_AckDelay ) ackTimeout = m_AckDelay ;
2024-01-27 12:17:59 -05:00
ScheduleAck ( ackTimeout ) ;
2014-10-10 11:53:27 -04:00
}
2017-01-28 19:23:14 +03:00
}
2023-03-23 19:52:39 -04:00
else if ( packet - > IsSYN ( ) )
2014-08-06 22:08:57 -04:00
// we have to send SYN back to incoming connection
2017-01-28 19:23:14 +03:00
SendBuffer ( ) ; // also sets m_IsOpen
}
else
{
2014-01-19 12:01:12 -05:00
if ( receivedSeqn < = m_LastReceivedSequenceNumber )
{
2015-01-29 19:17:44 -05:00
// we have received duplicate
2016-06-29 01:00:00 +00:00
LogPrint ( eLogWarning , " Streaming: Duplicate message " , receivedSeqn , " on sSID= " , m_SendStreamID ) ;
2024-06-29 09:17:11 -04:00
if ( receivedSeqn < = m_PreviousReceivedSequenceNumber | | receivedSeqn = = m_LastReceivedSequenceNumber )
{
m_CurrentOutboundTunnel = m_LocalDestination . GetOwner ( ) - > GetTunnelPool ( ) - > GetNextOutboundTunnel ( m_CurrentOutboundTunnel ) ;
2024-10-18 15:59:37 -04:00
CancelRemoteLeaseChange ( ) ;
2024-06-29 09:17:11 -04:00
UpdateCurrentRemoteLease ( ) ;
}
m_PreviousReceivedSequenceNumber = receivedSeqn ;
2017-01-10 21:31:52 -05:00
m_LocalDestination . DeletePacket ( packet ) ; // packet dropped
2024-08-18 13:33:16 -04:00
if ( ! m_IsAckSendScheduled )
{
SendQuickAck ( ) ; // resend ack for previous message again
auto ackTimeout = m_RTT / 10 ;
if ( ackTimeout > m_AckDelay ) ackTimeout = m_AckDelay ;
ScheduleAck ( ackTimeout ) ;
}
2017-01-28 19:23:14 +03:00
}
2014-01-19 12:01:12 -05:00
else
{
2016-06-29 01:00:00 +00:00
LogPrint ( eLogWarning , " Streaming: Missing messages on sSID= " , m_SendStreamID , " : from " , m_LastReceivedSequenceNumber + 1 , " to " , receivedSeqn - 1 ) ;
2014-01-26 18:22:30 -05:00
// save message and wait for missing message again
SavePacket ( packet ) ;
2015-03-23 18:07:43 -04:00
if ( m_LastReceivedSequenceNumber > = 0 )
2024-01-27 17:57:50 -05:00
{
if ( ! m_IsAckSendScheduled )
2024-01-28 18:12:40 -05:00
{
2024-01-27 17:57:50 -05:00
// send NACKs for missing messages
2024-08-17 08:30:16 -04:00
SendQuickAck ( ) ;
auto ackTimeout = m_RTT / 10 ;
2024-01-28 18:12:40 -05:00
if ( ackTimeout > m_AckDelay ) ackTimeout = m_AckDelay ;
ScheduleAck ( ackTimeout ) ;
}
2024-01-27 17:57:50 -05:00
}
2015-03-23 18:07:43 -04:00
else
// wait for SYN
2024-01-27 12:17:59 -05:00
ScheduleAck ( SYN_TIMEOUT ) ;
2017-01-28 19:23:14 +03:00
}
}
}
2014-02-01 22:20:41 -05:00
void Stream : : SavePacket ( Packet * packet )
{
2016-03-27 12:06:00 -04:00
if ( ! m_SavedPackets . insert ( packet ) . second )
2017-01-10 21:31:52 -05:00
m_LocalDestination . DeletePacket ( packet ) ;
2017-01-28 19:23:14 +03:00
}
2014-02-01 22:20:41 -05:00
void Stream : : ProcessPacket ( Packet * packet )
{
uint32_t receivedSeqn = packet - > GetSeqn ( ) ;
uint16_t flags = packet - > GetFlags ( ) ;
2015-12-18 13:33:58 +00:00
LogPrint ( eLogDebug , " Streaming: Process seqn= " , receivedSeqn , " , flags= " , flags ) ;
2017-01-28 19:23:14 +03:00
2019-02-05 15:32:18 -05:00
if ( ! ProcessOptions ( flags , packet ) )
{
m_LocalDestination . DeletePacket ( packet ) ;
Terminate ( ) ;
return ;
}
2020-03-01 13:25:50 +03:00
2019-02-05 15:32:18 -05:00
packet - > offset = packet - > GetPayload ( ) - packet - > buf ;
if ( packet - > GetLength ( ) > 0 )
{
m_ReceiveQueue . push ( packet ) ;
m_ReceiveTimer . cancel ( ) ;
}
else
m_LocalDestination . DeletePacket ( packet ) ;
2014-02-01 22:20:41 -05:00
2019-02-05 15:32:18 -05:00
m_LastReceivedSequenceNumber = receivedSeqn ;
if ( flags & PACKET_FLAG_RESET )
{
LogPrint ( eLogDebug , " Streaming: closing stream sSID= " , m_SendStreamID , " , rSID= " , m_RecvStreamID , " : reset flag received in packet # " , receivedSeqn ) ;
m_Status = eStreamStatusReset ;
Close ( ) ;
}
else if ( flags & PACKET_FLAG_CLOSE )
{
if ( m_Status ! = eStreamStatusClosed )
SendClose ( ) ;
m_Status = eStreamStatusClosed ;
Terminate ( ) ;
}
}
bool Stream : : ProcessOptions ( uint16_t flags , Packet * packet )
{
const uint8_t * optionData = packet - > GetOptionData ( ) ;
2020-03-01 13:25:50 +03:00
size_t optionSize = packet - > GetOptionSize ( ) ;
2024-08-14 13:43:24 -04:00
if ( optionSize > packet - > len )
{
LogPrint ( eLogInfo , " Streaming: Invalid option size " , optionSize , " Discarded " ) ;
return false ;
}
2024-09-03 13:00:04 -04:00
if ( ! flags ) return true ;
bool immediateAckRequested = false ;
2014-08-06 19:19:59 -04:00
if ( flags & PACKET_FLAG_DELAY_REQUESTED )
2021-11-27 23:30:35 +03:00
{
2024-09-03 13:00:04 -04:00
uint16_t delayRequested = bufbe16toh ( optionData ) ;
if ( ! delayRequested ) // 0 requests an immediate ack
immediateAckRequested = true ;
else if ( ! m_IsAckSendScheduled )
2021-07-08 22:22:00 -04:00
{
2024-09-03 13:00:04 -04:00
if ( delayRequested < m_RTT )
2021-11-27 23:30:35 +03:00
{
2021-07-08 22:22:00 -04:00
m_IsAckSendScheduled = true ;
m_AckSendTimer . expires_from_now ( boost : : posix_time : : milliseconds ( delayRequested ) ) ;
m_AckSendTimer . async_wait ( std : : bind ( & Stream : : HandleAckSendTimer ,
shared_from_this ( ) , std : : placeholders : : _1 ) ) ;
2021-11-27 23:30:35 +03:00
}
2024-04-02 18:49:16 -04:00
if ( delayRequested > = DELAY_CHOKING )
2024-08-01 13:49:32 -04:00
{
2024-08-29 18:57:14 -04:00
if ( ! m_IsWinDropped )
{
m_WindowDropTargetSize = MIN_WINDOW_SIZE ;
m_LastWindowDropSize = 0 ;
m_WindowIncCounter = 0 ;
m_IsWinDropped = true ; // don't drop window twice
m_DropWindowDelaySequenceNumber = m_SequenceNumber ;
UpdatePacingTime ( ) ;
}
2024-08-01 13:49:32 -04:00
}
2021-07-08 22:22:00 -04:00
}
2014-08-06 19:19:59 -04:00
optionData + = 2 ;
2021-11-27 23:30:35 +03:00
}
2017-01-28 19:23:14 +03:00
2014-02-01 22:20:41 -05:00
if ( flags & PACKET_FLAG_FROM_INCLUDED )
{
2019-02-06 21:19:44 -05:00
if ( m_RemoteLeaseSet ) m_RemoteIdentity = m_RemoteLeaseSet - > GetIdentity ( ) ;
if ( ! m_RemoteIdentity )
m_RemoteIdentity = std : : make_shared < i2p : : data : : IdentityEx > ( optionData , optionSize ) ;
2017-11-28 11:33:51 -05:00
if ( m_RemoteIdentity - > IsRSA ( ) )
{
2020-03-01 13:25:50 +03:00
LogPrint ( eLogInfo , " Streaming: Incoming stream from RSA destination " , m_RemoteIdentity - > GetIdentHash ( ) . ToBase64 ( ) , " Discarded " ) ;
2019-02-05 15:32:18 -05:00
return false ;
2017-11-28 11:33:51 -05:00
}
2015-11-03 09:15:49 -05:00
optionData + = m_RemoteIdentity - > GetFullLen ( ) ;
2014-08-06 19:19:59 -04:00
if ( ! m_RemoteLeaseSet )
2016-06-29 01:00:00 +00:00
LogPrint ( eLogDebug , " Streaming: Incoming stream from " , m_RemoteIdentity - > GetIdentHash ( ) . ToBase64 ( ) , " , sSID= " , m_SendStreamID , " , rSID= " , m_RecvStreamID ) ;
2017-01-28 19:23:14 +03:00
}
2014-02-01 22:20:41 -05:00
2014-08-04 18:22:54 -04:00
if ( flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED )
{
2014-12-29 23:04:02 +01:00
uint16_t maxPacketSize = bufbe16toh ( optionData ) ;
2015-12-23 20:47:44 -05:00
LogPrint ( eLogDebug , " Streaming: Max packet size " , maxPacketSize ) ;
2014-08-04 18:22:54 -04:00
optionData + = 2 ;
2017-01-28 19:23:14 +03:00
}
2019-02-05 15:32:18 -05:00
if ( flags & PACKET_FLAG_OFFLINE_SIGNATURE )
{
if ( ! m_RemoteIdentity )
{
LogPrint ( eLogInfo , " Streaming: offline signature without identity " ) ;
return false ;
}
2019-04-08 22:22:42 +03:00
// if we have it in LeaseSet already we don't need to parse it again
2019-02-06 13:36:03 -05:00
if ( m_RemoteLeaseSet ) m_TransientVerifier = m_RemoteLeaseSet - > GetTransientVerifier ( ) ;
if ( m_TransientVerifier )
2019-02-05 15:32:18 -05:00
{
2019-02-06 13:36:03 -05:00
// skip option data
optionData + = 6 ; // timestamp and key type
optionData + = m_TransientVerifier - > GetPublicKeyLen ( ) ; // public key
optionData + = m_RemoteIdentity - > GetSignatureLen ( ) ; // signature
2019-02-05 15:32:18 -05:00
}
2019-02-06 13:36:03 -05:00
else
2019-02-05 15:32:18 -05:00
{
2019-02-06 13:36:03 -05:00
// transient key
size_t offset = 0 ;
m_TransientVerifier = i2p : : data : : ProcessOfflineSignature ( m_RemoteIdentity , optionData , optionSize - ( optionData - packet - > GetOptionData ( ) ) , offset ) ;
optionData + = offset ;
2020-03-01 13:25:50 +03:00
if ( ! m_TransientVerifier )
2019-02-06 13:36:03 -05:00
{
LogPrint ( eLogError , " Streaming: offline signature failed " ) ;
return false ;
2020-03-01 13:25:50 +03:00
}
2019-02-05 15:32:18 -05:00
}
}
2014-08-04 18:22:54 -04:00
if ( flags & PACKET_FLAG_SIGNATURE_INCLUDED )
{
2017-11-23 13:45:46 -05:00
uint8_t signature [ 256 ] ;
2020-11-03 09:20:14 -05:00
auto signatureLen = m_TransientVerifier ? m_TransientVerifier - > GetSignatureLen ( ) : m_RemoteIdentity - > GetSignatureLen ( ) ;
2017-11-23 12:26:42 -05:00
if ( signatureLen < = sizeof ( signature ) )
2017-01-28 19:23:14 +03:00
{
2017-11-23 12:26:42 -05:00
memcpy ( signature , optionData , signatureLen ) ;
memset ( const_cast < uint8_t * > ( optionData ) , 0 , signatureLen ) ;
2020-03-01 13:25:50 +03:00
bool verified = m_TransientVerifier ?
2019-02-05 15:32:18 -05:00
m_TransientVerifier - > Verify ( packet - > GetBuffer ( ) , packet - > GetLength ( ) , signature ) :
m_RemoteIdentity - > Verify ( packet - > GetBuffer ( ) , packet - > GetLength ( ) , signature ) ;
if ( ! verified )
2017-11-23 12:26:42 -05:00
{
LogPrint ( eLogError , " Streaming: Signature verification failed, sSID= " , m_SendStreamID , " , rSID= " , m_RecvStreamID ) ;
Close ( ) ;
flags | = PACKET_FLAG_CLOSE ;
}
memcpy ( const_cast < uint8_t * > ( optionData ) , signature , signatureLen ) ;
optionData + = signatureLen ;
}
else
{
2019-02-05 15:32:18 -05:00
LogPrint ( eLogError , " Streaming: Signature too big, " , signatureLen , " bytes " ) ;
return false ;
2017-01-28 19:23:14 +03:00
}
}
2024-09-03 13:00:04 -04:00
if ( immediateAckRequested )
SendQuickAck ( ) ;
2020-03-01 13:25:50 +03:00
return true ;
2017-01-28 19:23:14 +03:00
}
2014-08-10 18:27:23 -04:00
2020-09-30 17:12:28 -04:00
void Stream : : HandlePing ( Packet * packet )
{
uint16_t flags = packet - > GetFlags ( ) ;
if ( ProcessOptions ( flags , packet ) & & m_RemoteIdentity )
{
// send pong
Packet p ;
memset ( p . buf , 0 , 22 ) ; // minimal header all zeroes
memcpy ( p . buf + 4 , packet - > buf , 4 ) ; // but receiveStreamID is the sendStreamID from the ping
htobe16buf ( p . buf + 18 , PACKET_FLAG_ECHO ) ; // and echo flag
2023-02-28 19:41:17 +02:00
auto payloadLen = int ( packet - > len ) - ( packet - > GetPayload ( ) - packet - > buf ) ;
2020-09-30 17:12:28 -04:00
if ( payloadLen > 0 )
memcpy ( p . buf + 22 , packet - > GetPayload ( ) , payloadLen ) ;
else
payloadLen = 0 ;
p . len = payloadLen + 22 ;
SendPackets ( std : : vector < Packet * > { & p } ) ;
LogPrint ( eLogDebug , " Streaming: Pong of " , p . len , " bytes sent " ) ;
2021-11-27 23:30:35 +03:00
}
2020-09-30 17:12:28 -04:00
m_LocalDestination . DeletePacket ( packet ) ;
2021-11-27 23:30:35 +03:00
}
2014-08-10 18:27:23 -04:00
void Stream : : ProcessAck ( Packet * packet )
{
2015-01-25 17:43:34 -05:00
bool acknowledged = false ;
2015-02-03 13:46:44 -05:00
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
2014-08-10 18:27:23 -04:00
uint32_t ackThrough = packet - > GetAckThrough ( ) ;
2024-08-27 15:33:59 -04:00
m_NACKedPackets . clear ( ) ;
2016-06-09 14:34:38 -04:00
if ( ackThrough > m_SequenceNumber )
{
LogPrint ( eLogError , " Streaming: Unexpected ackThrough= " , ackThrough , " > seqn= " , m_SequenceNumber ) ;
return ;
2017-01-28 19:23:14 +03:00
}
2024-03-19 08:28:34 +02:00
int rttSample = INT_MAX ;
2024-12-05 22:15:11 -05:00
int incCounter = 0 ;
2024-06-29 09:17:11 -04:00
m_IsNAcked = false ;
2024-08-01 13:49:32 -04:00
m_IsResendNeeded = false ;
2014-08-12 15:57:23 -04:00
int nackCount = packet - > GetNACKCount ( ) ;
2014-08-10 18:27:23 -04:00
for ( auto it = m_SentPackets . begin ( ) ; it ! = m_SentPackets . end ( ) ; )
2016-08-09 01:53:37 +03:00
{
2014-08-12 15:57:23 -04:00
auto seqn = ( * it ) - > GetSeqn ( ) ;
if ( seqn < = ackThrough )
2014-08-10 18:27:23 -04:00
{
2014-08-12 15:57:23 -04:00
if ( nackCount > 0 )
{
bool nacked = false ;
for ( int i = 0 ; i < nackCount ; i + + )
if ( seqn = = packet - > GetNACK ( i ) )
{
2024-08-27 15:33:59 -04:00
m_NACKedPackets . insert ( * it ) ;
2024-06-29 09:17:11 -04:00
m_IsNAcked = true ;
2014-08-12 15:57:23 -04:00
nacked = true ;
break ;
}
if ( nacked )
{
2015-12-18 13:33:58 +00:00
LogPrint ( eLogDebug , " Streaming: Packet " , seqn , " NACK " ) ;
2016-08-09 01:53:37 +03:00
+ + it ;
2014-08-12 15:57:23 -04:00
continue ;
2017-01-28 19:23:14 +03:00
}
2014-08-12 15:57:23 -04:00
}
2014-08-10 18:27:23 -04:00
auto sentPacket = * it ;
2024-03-16 19:29:47 +02:00
int64_t rtt = ( int64_t ) ts - ( int64_t ) sentPacket - > sendTime ;
if ( rtt < 0 )
LogPrint ( eLogError , " Streaming: Packet " , seqn , " sent from the future, sendTime= " , sentPacket - > sendTime ) ;
if ( ! seqn )
2024-03-19 08:28:34 +02:00
{
2024-08-01 13:49:32 -04:00
m_IsFirstRttSample = true ;
2024-03-19 08:28:34 +02:00
rttSample = rtt < 0 ? 1 : rtt ;
}
2024-03-19 08:43:49 +02:00
else if ( ! sentPacket - > resent & & seqn > m_TunnelsChangeSequenceNumber & & rtt > = 0 )
2024-03-19 08:28:34 +02:00
rttSample = std : : min ( rttSample , ( int ) rtt ) ;
2016-07-28 10:01:20 -04:00
LogPrint ( eLogDebug , " Streaming: Packet " , seqn , " acknowledged rtt= " , rtt , " sentTime= " , sentPacket - > sendTime ) ;
2014-08-10 18:27:23 -04:00
m_SentPackets . erase ( it + + ) ;
2017-01-28 19:23:14 +03:00
m_LocalDestination . DeletePacket ( sentPacket ) ;
2015-01-25 17:43:34 -05:00
acknowledged = true ;
2024-08-01 13:49:32 -04:00
if ( m_WindowSize < MAX_WINDOW_SIZE & & ! m_IsFirstACK )
2024-12-05 22:15:11 -05:00
incCounter + + ;
2014-08-10 18:27:23 -04:00
}
else
break ;
}
2024-03-19 08:28:34 +02:00
if ( rttSample ! = INT_MAX )
{
2024-08-29 18:57:14 -04:00
if ( m_IsFirstRttSample & & ! m_IsFirstACK )
2024-06-29 09:17:11 -04:00
{
2024-03-19 08:28:34 +02:00
m_RTT = rttSample ;
2024-08-01 13:49:32 -04:00
m_SlowRTT = rttSample ;
2024-09-11 20:54:22 -04:00
m_SlowRTT2 = rttSample ;
2024-06-30 08:11:12 -04:00
m_PrevRTTSample = rttSample ;
2024-09-11 20:54:22 -04:00
m_Jitter = rttSample / 10 ; // 10%
2024-12-05 22:15:11 -05:00
m_Jitter + = 15 ; // for low-latency connections
2024-08-01 13:49:32 -04:00
m_IsFirstRttSample = false ;
2024-06-29 09:17:11 -04:00
}
2024-03-19 08:28:34 +02:00
else
2024-09-11 20:54:22 -04:00
m_RTT = ( m_PrevRTTSample + rttSample ) / 2 ;
if ( ! m_IsWinDropped )
{
m_SlowRTT = SLOWRTT_EWMA_ALPHA * m_RTT + ( 1.0 - SLOWRTT_EWMA_ALPHA ) * m_SlowRTT ;
m_SlowRTT2 = RTT_EWMA_ALPHA * m_RTT + ( 1.0 - RTT_EWMA_ALPHA ) * m_SlowRTT2 ;
// calculate jitter
double jitter = 0 ;
if ( rttSample > m_PrevRTTSample )
jitter = rttSample - m_PrevRTTSample ;
else if ( rttSample < m_PrevRTTSample )
jitter = m_PrevRTTSample - rttSample ;
else
jitter = rttSample / 10 ; // 10%
2024-12-05 22:15:11 -05:00
jitter + = 15 ; // for low-latency connections
2024-09-11 20:54:22 -04:00
m_Jitter = ( 0.05 * jitter ) + ( 1.0 - 0.05 ) * m_Jitter ;
}
2024-12-05 22:15:11 -05:00
if ( rttSample > m_SlowRTT )
{
incCounter = 0 ;
m_DoubleWinIncCounter = 1 ;
}
else if ( rttSample < m_SlowRTT )
{
if ( m_DoubleWinIncCounter )
{
incCounter = incCounter * 2 ;
m_DoubleWinIncCounter = 0 ;
}
}
m_WindowIncCounter = m_WindowIncCounter + incCounter ;
2024-06-29 09:17:11 -04:00
//
// delay-based CC
2024-09-11 20:54:22 -04:00
if ( ( m_SlowRTT2 > m_SlowRTT + m_Jitter & & rttSample > m_SlowRTT2 & & rttSample > m_PrevRTTSample ) & & ! m_IsWinDropped ) // Drop window if RTT grows too fast, late detection
2024-08-29 18:57:14 -04:00
ProcessWindowDrop ( ) ;
2024-07-04 13:07:57 -04:00
UpdatePacingTime ( ) ;
2024-08-29 18:57:14 -04:00
m_PrevRTTSample = rttSample ;
2024-07-04 13:07:57 -04:00
2024-03-19 08:43:49 +02:00
bool wasInitial = m_RTO = = INITIAL_RTO ;
2024-06-29 09:17:11 -04:00
m_RTO = std : : max ( MIN_RTO , ( int ) ( m_RTT * 1.3 + m_Jitter ) ) ; // TODO: implement it better
2024-03-19 08:43:49 +02:00
if ( wasInitial )
ScheduleResend ( ) ;
2024-03-19 08:28:34 +02:00
}
2024-09-11 20:54:22 -04:00
if ( m_IsWinDropped & & ackThrough > m_DropWindowDelaySequenceNumber )
{
m_IsFirstRttSample = true ;
2024-06-29 09:17:11 -04:00
m_IsWinDropped = false ;
2024-09-11 20:54:22 -04:00
}
if ( m_WindowDropTargetSize & & m_WindowSize < = m_WindowDropTargetSize )
2024-08-29 18:57:14 -04:00
{
2024-09-11 20:54:22 -04:00
m_WindowDropTargetSize = 0 ;
m_DropWindowDelaySequenceNumber = m_SequenceNumber ;
}
if ( acknowledged & & m_WindowDropTargetSize & & m_WindowSize > m_WindowDropTargetSize )
{
m_RTO = std : : max ( MIN_RTO , ( int ) ( m_RTT * 1.5 + m_Jitter ) ) ; // we assume that the next rtt sample may be much larger than the current
m_IsResendNeeded = true ;
m_WindowSize = m_SentPackets . size ( ) + 1 ; // if there are no packets to resend, just send one regular packet
2024-08-29 18:57:14 -04:00
if ( m_WindowSize < MIN_WINDOW_SIZE ) m_WindowSize = MIN_WINDOW_SIZE ;
2024-09-11 20:54:22 -04:00
if ( m_WindowSize > MAX_WINDOW_SIZE ) m_WindowSize = MAX_WINDOW_SIZE ;
2024-08-29 18:57:14 -04:00
m_WindowIncCounter = 0 ;
UpdatePacingTime ( ) ;
}
2024-06-29 09:17:11 -04:00
if ( acknowledged | | m_IsNAcked )
2024-03-19 08:28:34 +02:00
{
2024-06-29 09:17:11 -04:00
ScheduleResend ( ) ;
2024-03-19 08:28:34 +02:00
}
2024-09-11 20:54:22 -04:00
if ( m_SendBuffer . IsEmpty ( ) & & m_SentPackets . size ( ) > 0 ) // tail loss
2024-08-01 13:49:32 -04:00
{
m_IsResendNeeded = true ;
2024-09-11 20:54:22 -04:00
m_RTO = std : : max ( MIN_RTO , ( int ) ( m_RTT * 1.5 + m_Jitter ) ) ; // to prevent spurious retransmit
2024-08-01 13:49:32 -04:00
}
2024-07-04 13:07:57 -04:00
if ( m_SentPackets . empty ( ) & & m_SendBuffer . IsEmpty ( ) )
2024-06-29 09:17:11 -04:00
{
2020-05-21 19:38:25 -04:00
m_ResendTimer . cancel ( ) ;
2024-06-29 09:17:11 -04:00
m_SendTimer . cancel ( ) ;
}
2024-08-29 18:57:14 -04:00
if ( acknowledged & & m_IsFirstACK )
{
if ( m_RoutingSession )
m_RoutingSession - > SetSharedRoutingPath (
std : : make_shared < i2p : : garlic : : GarlicRoutingPath > (
i2p : : garlic : : GarlicRoutingPath { m_CurrentOutboundTunnel , m_CurrentRemoteLease , ( int ) m_RTT , 0 } ) ) ;
m_IsFirstACK = false ;
}
2015-01-25 17:43:34 -05:00
if ( acknowledged )
2015-03-21 16:26:14 -04:00
{
m_NumResendAttempts = 0 ;
2024-08-29 18:57:14 -04:00
m_IsTimeOutResend = false ;
2015-01-25 17:43:34 -05:00
SendBuffer ( ) ;
2017-01-28 19:23:14 +03:00
}
2016-07-05 17:52:11 -04:00
if ( m_Status = = eStreamStatusClosed )
Terminate ( ) ;
else if ( m_Status = = eStreamStatusClosing )
2016-07-05 09:52:18 -04:00
Close ( ) ; // check is all outgoing messages have been sent and we can send close
2017-01-28 19:23:14 +03:00
}
2022-11-08 18:34:59 -05:00
size_t Stream : : Receive ( uint8_t * buf , size_t len , int timeout )
{
if ( ! len ) return 0 ;
size_t ret = 0 ;
2022-12-25 15:35:00 -05:00
volatile bool done = false ;
2022-11-08 18:34:59 -05:00
std : : condition_variable newDataReceived ;
std : : mutex newDataReceivedMutex ;
AsyncReceive ( boost : : asio : : buffer ( buf , len ) ,
2022-12-25 15:35:00 -05:00
[ & ret , & done , & newDataReceived , & newDataReceivedMutex ] ( const boost : : system : : error_code & ecode , std : : size_t bytes_transferred )
2022-11-08 18:34:59 -05:00
{
if ( ecode = = boost : : asio : : error : : timed_out )
ret = 0 ;
else
ret = bytes_transferred ;
std : : unique_lock < std : : mutex > l ( newDataReceivedMutex ) ;
newDataReceived . notify_all ( ) ;
2022-12-30 22:12:49 -05:00
done = true ;
2022-11-08 18:34:59 -05:00
} ,
timeout ) ;
2022-12-30 22:12:49 -05:00
if ( ! done )
{ std : : unique_lock < std : : mutex > l ( newDataReceivedMutex ) ;
2023-01-03 21:25:19 +03:00
if ( ! done & & newDataReceived . wait_for ( l , std : : chrono : : seconds ( timeout ) ) = = std : : cv_status : : timeout )
2022-12-30 22:12:49 -05:00
ret = 0 ;
2023-01-03 21:25:19 +03:00
}
2022-12-25 15:35:00 -05:00
if ( ! done )
{
// make sure that AsycReceive complete
auto s = shared_from_this ( ) ;
2024-11-25 16:00:06 -05:00
boost : : asio : : post ( m_Service , [ s ] ( )
2023-01-03 21:25:19 +03:00
{
2022-12-25 15:35:00 -05:00
s - > m_ReceiveTimer . cancel ( ) ;
} ) ;
int i = 0 ;
while ( ! done & & i < 100 ) // 1 sec
2023-01-03 21:25:19 +03:00
{
2022-12-25 15:35:00 -05:00
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 10 ) ) ;
i + + ;
2023-01-03 21:25:19 +03:00
}
}
2022-11-08 18:34:59 -05:00
return ret ;
2023-01-03 21:25:19 +03:00
}
2014-10-01 21:18:41 -04:00
size_t Stream : : Send ( const uint8_t * buf , size_t len )
2014-01-01 18:19:03 -05:00
{
2017-02-26 15:05:14 -05:00
AsyncSend ( buf , len , nullptr ) ;
2020-05-18 17:51:45 -04:00
return len ;
2017-01-28 19:23:14 +03:00
}
2014-04-18 19:27:39 -04:00
2015-04-09 15:07:25 -04:00
void Stream : : AsyncSend ( const uint8_t * buf , size_t len , SendHandler handler )
{
2023-11-13 19:12:07 -05:00
std : : shared_ptr < i2p : : stream : : SendBuffer > buffer ;
2017-02-26 15:05:14 -05:00
if ( len > 0 & & buf )
2023-11-13 19:12:07 -05:00
buffer = std : : make_shared < i2p : : stream : : SendBuffer > ( buf , len , handler ) ;
2017-02-26 15:05:14 -05:00
else if ( handler )
2020-03-01 13:25:50 +03:00
handler ( boost : : system : : error_code ( ) ) ;
2023-11-13 19:12:07 -05:00
auto s = shared_from_this ( ) ;
2024-11-25 16:00:06 -05:00
boost : : asio : : post ( m_Service , [ s , buffer ] ( )
2023-11-13 19:12:07 -05:00
{
if ( buffer )
s - > m_SendBuffer . Add ( buffer ) ;
s - > SendBuffer ( ) ;
} ) ;
2015-04-09 15:07:25 -04:00
}
2021-11-27 23:30:35 +03:00
2015-01-25 16:18:26 -05:00
void Stream : : SendBuffer ( )
2017-01-28 19:23:14 +03:00
{
2024-11-17 17:29:04 -05:00
if ( m_RemoteLeaseSet ) // don't scheudle send for first SYN for incoming stream
ScheduleSend ( ) ;
2024-08-01 13:49:32 -04:00
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
2015-01-29 15:34:43 -05:00
int numMsgs = m_WindowSize - m_SentPackets . size ( ) ;
2024-08-01 13:49:32 -04:00
if ( numMsgs < = 0 | | ! m_IsSendTime ) // window is full
{
m_LastSendTime = ts ;
return ;
}
else if ( numMsgs > m_NumPacketsToSend )
numMsgs = m_NumPacketsToSend ;
2015-01-25 16:18:26 -05:00
bool isNoAck = m_LastReceivedSequenceNumber < 0 ; // first packet
std : : vector < Packet * > packets ;
2023-11-14 09:39:36 -05:00
while ( ( m_Status = = eStreamStatusNew ) | | ( IsEstablished ( ) & & ! m_SendBuffer . IsEmpty ( ) & & numMsgs > 0 ) )
2015-01-25 16:18:26 -05:00
{
2023-11-14 09:39:36 -05:00
Packet * p = m_LocalDestination . NewPacket ( ) ;
uint8_t * packet = p - > GetBuffer ( ) ;
// TODO: implement setters
size_t size = 0 ;
htobe32buf ( packet + size , m_SendStreamID ) ;
size + = 4 ; // sendStreamID
htobe32buf ( packet + size , m_RecvStreamID ) ;
size + = 4 ; // receiveStreamID
htobe32buf ( packet + size , m_SequenceNumber + + ) ;
size + = 4 ; // sequenceNum
if ( isNoAck )
htobuf32 ( packet + size , 0 ) ;
else
htobe32buf ( packet + size , m_LastReceivedSequenceNumber ) ;
size + = 4 ; // ack Through
if ( m_Status = = eStreamStatusNew & & ! m_SendStreamID & & m_RemoteIdentity )
2015-01-25 16:18:26 -05:00
{
2023-11-14 09:39:36 -05:00
// first SYN packet
packet [ size ] = 8 ;
size + + ; // NACK count
memcpy ( packet + size , m_RemoteIdentity - > GetIdentHash ( ) , 32 ) ;
size + = 32 ;
}
else
{
packet [ size ] = 0 ;
size + + ; // NACK count
}
packet [ size ] = m_RTO / 1000 ;
size + + ; // resend delay
if ( m_Status = = eStreamStatusNew )
{
// initial packet
m_Status = eStreamStatusOpen ;
if ( ! m_RemoteLeaseSet ) m_RemoteLeaseSet = m_LocalDestination . GetOwner ( ) - > FindLeaseSet ( m_RemoteIdentity - > GetIdentHash ( ) ) ; ;
if ( m_RemoteLeaseSet )
2017-01-28 19:23:14 +03:00
{
2024-11-17 18:53:21 -05:00
m_RoutingSession = m_LocalDestination . GetOwner ( ) - > GetRoutingSession ( m_RemoteLeaseSet , true , ! m_IsIncoming ) ;
m_MTU = ( m_RoutingSession & & m_RoutingSession - > IsRatchets ( ) ) ? STREAMING_MTU_RATCHETS : STREAMING_MTU ;
2017-01-28 19:23:14 +03:00
}
2023-11-14 09:39:36 -05:00
uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED |
PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED ;
if ( isNoAck ) flags | = PACKET_FLAG_NO_ACK ;
bool isOfflineSignature = m_LocalDestination . GetOwner ( ) - > GetPrivateKeys ( ) . IsOfflineSignature ( ) ;
if ( isOfflineSignature ) flags | = PACKET_FLAG_OFFLINE_SIGNATURE ;
htobe16buf ( packet + size , flags ) ;
size + = 2 ; // flags
size_t identityLen = m_LocalDestination . GetOwner ( ) - > GetIdentity ( ) - > GetFullLen ( ) ;
size_t signatureLen = m_LocalDestination . GetOwner ( ) - > GetPrivateKeys ( ) . GetSignatureLen ( ) ;
uint8_t * optionsSize = packet + size ; // set options size later
size + = 2 ; // options size
m_LocalDestination . GetOwner ( ) - > GetIdentity ( ) - > ToBuffer ( packet + size , identityLen ) ;
size + = identityLen ; // from
htobe16buf ( packet + size , m_MTU ) ;
size + = 2 ; // max packet size
if ( isOfflineSignature )
2015-01-25 16:18:26 -05:00
{
2023-11-14 09:39:36 -05:00
const auto & offlineSignature = m_LocalDestination . GetOwner ( ) - > GetPrivateKeys ( ) . GetOfflineSignature ( ) ;
memcpy ( packet + size , offlineSignature . data ( ) , offlineSignature . size ( ) ) ;
size + = offlineSignature . size ( ) ; // offline signature
2017-01-28 19:23:14 +03:00
}
2023-11-14 09:39:36 -05:00
uint8_t * signature = packet + size ; // set it later
memset ( signature , 0 , signatureLen ) ; // zeroes for now
size + = signatureLen ; // signature
htobe16buf ( optionsSize , packet + size - 2 - optionsSize ) ; // actual options size
size + = m_SendBuffer . Get ( packet + size , m_MTU ) ; // payload
m_LocalDestination . GetOwner ( ) - > Sign ( packet , size , signature ) ;
}
else
{
// follow on packet
htobuf16 ( packet + size , 0 ) ;
size + = 2 ; // flags
htobuf16 ( packet + size , 0 ) ; // no options
size + = 2 ; // options size
size + = m_SendBuffer . Get ( packet + size , m_MTU ) ; // payload
2015-01-25 16:18:26 -05:00
}
2023-11-14 09:39:36 -05:00
p - > len = size ;
packets . push_back ( p ) ;
numMsgs - - ;
2017-01-28 19:23:14 +03:00
}
2015-01-25 16:18:26 -05:00
if ( packets . size ( ) > 0 )
{
2016-06-24 21:54:58 -04:00
if ( m_SavedPackets . empty ( ) ) // no NACKS
2017-01-28 19:23:14 +03:00
{
m_IsAckSendScheduled = false ;
2016-06-24 21:54:58 -04:00
m_AckSendTimer . cancel ( ) ;
2017-01-28 19:23:14 +03:00
}
2015-01-25 16:18:26 -05:00
bool isEmpty = m_SentPackets . empty ( ) ;
2024-08-01 13:49:32 -04:00
// auto ts = i2p::util::GetMillisecondsSinceEpoch ();
2016-08-09 01:53:37 +03:00
for ( auto & it : packets )
2015-02-03 13:46:44 -05:00
{
it - > sendTime = ts ;
2015-01-25 16:18:26 -05:00
m_SentPackets . insert ( it ) ;
2015-02-03 13:46:44 -05:00
}
2015-01-25 16:18:26 -05:00
SendPackets ( packets ) ;
2024-08-01 13:49:32 -04:00
m_LastSendTime = ts ;
2024-07-04 13:07:57 -04:00
m_IsSendTime = false ;
2017-02-26 15:05:14 -05:00
if ( m_Status = = eStreamStatusClosing & & m_SendBuffer . IsEmpty ( ) )
2015-03-06 21:39:05 -05:00
SendClose ( ) ;
2015-01-25 16:18:26 -05:00
if ( isEmpty )
ScheduleResend ( ) ;
2017-01-28 19:23:14 +03:00
}
2015-01-25 16:18:26 -05:00
}
2017-01-28 19:23:14 +03:00
2014-08-07 22:03:25 -04:00
void Stream : : SendQuickAck ( )
2014-01-10 22:23:17 -05:00
{
2014-11-29 16:21:19 -05:00
int32_t lastReceivedSeqn = m_LastReceivedSequenceNumber ;
2024-08-17 17:11:28 -04:00
// for limit inbound speed
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
int numPackets = 0 ;
2024-09-18 15:19:18 -04:00
bool lostPackets = false ;
2024-08-17 17:11:28 -04:00
int64_t passedTime = m_PacketACKInterval * INITIAL_WINDOW_SIZE ; // in microseconds // while m_LastACKSendTime == 0
if ( m_LastACKSendTime )
passedTime = ( ts - m_LastACKSendTime ) * 1000 ; // in microseconds
numPackets = ( passedTime + m_PacketACKIntervalRem ) / m_PacketACKInterval ;
m_PacketACKIntervalRem = ( passedTime + m_PacketACKIntervalRem ) - ( numPackets * m_PacketACKInterval ) ;
if ( m_LastConfirmedReceivedSequenceNumber + numPackets < m_LastReceivedSequenceNumber )
2024-09-16 19:09:18 -04:00
{
2024-08-17 17:11:28 -04:00
lastReceivedSeqn = m_LastConfirmedReceivedSequenceNumber + numPackets ;
2024-09-16 19:09:18 -04:00
if ( ! m_IsAckSendScheduled )
{
auto ackTimeout = m_RTT / 10 ;
if ( ackTimeout > m_AckDelay ) ackTimeout = m_AckDelay ;
ScheduleAck ( ackTimeout ) ;
}
}
2024-08-17 17:11:28 -04:00
if ( numPackets = = 0 ) return ;
// for limit inbound speed
2014-11-29 16:21:19 -05:00
if ( ! m_SavedPackets . empty ( ) )
{
2024-08-17 17:11:28 -04:00
for ( auto it : m_SavedPackets )
{
auto seqn = it - > GetSeqn ( ) ;
2024-09-18 15:19:18 -04:00
// for limit inbound speed
if ( m_LastConfirmedReceivedSequenceNumber + numPackets < int ( seqn ) )
{
if ( ! m_IsAckSendScheduled )
{
auto ackTimeout = m_RTT / 10 ;
if ( ackTimeout > m_AckDelay ) ackTimeout = m_AckDelay ;
ScheduleAck ( ackTimeout ) ;
}
if ( lostPackets )
break ;
else
return ;
}
// for limit inbound speed
if ( ( int ) seqn > lastReceivedSeqn )
{
lastReceivedSeqn = seqn ;
lostPackets = true ; // for limit inbound speed
}
2024-08-17 17:11:28 -04:00
}
2017-01-28 19:23:14 +03:00
}
if ( lastReceivedSeqn < 0 )
{
2015-12-18 13:33:58 +00:00
LogPrint ( eLogError , " Streaming: No packets have been received yet " ) ;
2014-11-29 16:21:19 -05:00
return ;
}
2017-01-28 19:23:14 +03:00
2014-08-12 16:52:04 -04:00
Packet p ;
2017-01-28 19:23:14 +03:00
uint8_t * packet = p . GetBuffer ( ) ;
2014-01-10 22:23:17 -05:00
size_t size = 0 ;
2014-12-30 15:37:24 +01:00
htobe32buf ( packet + size , m_SendStreamID ) ;
2014-01-10 22:23:17 -05:00
size + = 4 ; // sendStreamID
2014-12-30 15:37:24 +01:00
htobe32buf ( packet + size , m_RecvStreamID ) ;
2014-01-10 22:23:17 -05:00
size + = 4 ; // receiveStreamID
2014-12-30 15:37:24 +01:00
htobuf32 ( packet + size , 0 ) ; // this is plain Ack message
2014-01-10 22:23:17 -05:00
size + = 4 ; // sequenceNum
2014-12-30 15:37:24 +01:00
htobe32buf ( packet + size , lastReceivedSeqn ) ;
2014-01-10 22:23:17 -05:00
size + = 4 ; // ack Through
2015-01-01 09:53:30 -05:00
uint8_t numNacks = 0 ;
2024-04-02 18:49:16 -04:00
bool choking = false ;
2017-01-28 19:23:14 +03:00
if ( lastReceivedSeqn > m_LastReceivedSequenceNumber )
{
2014-11-29 16:21:19 -05:00
// fill NACKs
uint8_t * nacks = packet + size + 1 ;
auto nextSeqn = m_LastReceivedSequenceNumber + 1 ;
for ( auto it : m_SavedPackets )
{
auto seqn = it - > GetSeqn ( ) ;
2024-08-17 17:11:28 -04:00
if ( m_LastConfirmedReceivedSequenceNumber + numPackets < int ( seqn ) ) // for limit inbound speed
{
htobe32buf ( packet + 12 , nextSeqn - 1 ) ;
break ;
}
2015-01-25 22:01:09 -05:00
if ( numNacks + ( seqn - nextSeqn ) > = 256 )
{
2016-06-29 01:00:00 +00:00
LogPrint ( eLogError , " Streaming: Number of NACKs exceeds 256. seqn= " , seqn , " nextSeqn= " , nextSeqn ) ;
2024-04-02 11:30:38 -04:00
htobe32buf ( packet + 12 , nextSeqn - 1 ) ; // change ack Through back
2024-04-02 18:49:16 -04:00
choking = true ;
2015-01-25 22:01:09 -05:00
break ;
2017-01-28 19:23:14 +03:00
}
2014-11-29 16:21:19 -05:00
for ( uint32_t i = nextSeqn ; i < seqn ; i + + )
{
2014-12-30 15:37:24 +01:00
htobe32buf ( nacks , i ) ;
2014-11-29 16:21:19 -05:00
nacks + = 4 ;
numNacks + + ;
2017-01-28 19:23:14 +03:00
}
2014-11-29 16:21:19 -05:00
nextSeqn = seqn + 1 ;
}
2017-01-28 19:23:14 +03:00
packet [ size ] = numNacks ;
size + + ; // NACK count
2014-11-29 19:15:41 -05:00
size + = numNacks * 4 ; // NACKs
2017-01-28 19:23:14 +03:00
}
2014-11-29 16:21:19 -05:00
else
{
// No NACKs
2017-01-28 19:23:14 +03:00
packet [ size ] = 0 ;
size + + ; // NACK count
}
2020-09-30 17:12:28 -04:00
packet [ size ] = 0 ;
2024-04-02 18:49:16 -04:00
size + + ; // resend delay
2024-09-09 18:40:06 -04:00
bool requestImmediateAck = false ;
if ( ! choking )
requestImmediateAck = m_LastSendTime & & ts > m_LastSendTime + REQUEST_IMMEDIATE_ACK_INTERVAL & &
ts > m_LastSendTime + REQUEST_IMMEDIATE_ACK_INTERVAL + m_LocalDestination . GetRandom ( ) % REQUEST_IMMEDIATE_ACK_INTERVAL_VARIANCE ;
2024-09-10 10:27:26 -04:00
htobe16buf ( packet + size , ( choking | | requestImmediateAck ) ? PACKET_FLAG_DELAY_REQUESTED : 0 ) ; // no flags set or delay requested
2014-01-10 22:23:17 -05:00
size + = 2 ; // flags
2024-09-09 18:40:06 -04:00
if ( choking | | requestImmediateAck )
2024-04-02 18:49:16 -04:00
{
2024-09-10 10:27:26 -04:00
htobe16buf ( packet + size , 2 ) ; // 2 bytes delay interval
2024-12-06 17:11:31 +02:00
htobe16buf ( packet + size + 2 , choking ? DELAY_CHOKING : 0 ) ; // set choking or immediate ack interval
2024-04-02 18:49:16 -04:00
size + = 2 ;
2024-09-10 12:22:42 -04:00
if ( requestImmediateAck ) // ack request sent
{
m_LastSendTime = ts ;
m_IsImmediateAckRequested = true ;
}
2024-04-02 18:49:16 -04:00
}
else
htobuf16 ( packet + size , 0 ) ; // no options
2014-01-10 22:23:17 -05:00
size + = 2 ; // options size
2017-01-28 19:23:14 +03:00
p . len = size ;
2014-08-12 16:52:04 -04:00
2014-08-12 18:42:53 -04:00
SendPackets ( std : : vector < Packet * > { & p } ) ;
2024-08-17 17:11:28 -04:00
m_LastACKSendTime = ts ; // for limit inbound speed
m_LastConfirmedReceivedSequenceNumber = lastReceivedSeqn ; // for limit inbound speed
2015-12-18 13:33:58 +00:00
LogPrint ( eLogDebug , " Streaming: Quick Ack sent. " , ( int ) numNacks , " NACKs " ) ;
2017-01-28 19:23:14 +03:00
}
2014-01-12 15:57:10 -05:00
2021-09-26 11:20:20 -04:00
void Stream : : SendPing ( )
{
Packet p ;
uint8_t * packet = p . GetBuffer ( ) ;
size_t size = 0 ;
htobe32buf ( packet , m_RecvStreamID ) ;
size + = 4 ; // sendStreamID
2021-11-27 23:30:35 +03:00
memset ( packet + size , 0 , 14 ) ;
size + = 14 ; // all zeroes
2021-09-26 11:20:20 -04:00
uint16_t flags = PACKET_FLAG_ECHO | PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_FROM_INCLUDED ;
bool isOfflineSignature = m_LocalDestination . GetOwner ( ) - > GetPrivateKeys ( ) . IsOfflineSignature ( ) ;
if ( isOfflineSignature ) flags | = PACKET_FLAG_OFFLINE_SIGNATURE ;
htobe16buf ( packet + size , flags ) ;
size + = 2 ; // flags
size_t identityLen = m_LocalDestination . GetOwner ( ) - > GetIdentity ( ) - > GetFullLen ( ) ;
size_t signatureLen = m_LocalDestination . GetOwner ( ) - > GetPrivateKeys ( ) . GetSignatureLen ( ) ;
uint8_t * optionsSize = packet + size ; // set options size later
size + = 2 ; // options size
m_LocalDestination . GetOwner ( ) - > GetIdentity ( ) - > ToBuffer ( packet + size , identityLen ) ;
size + = identityLen ; // from
if ( isOfflineSignature )
{
const auto & offlineSignature = m_LocalDestination . GetOwner ( ) - > GetPrivateKeys ( ) . GetOfflineSignature ( ) ;
memcpy ( packet + size , offlineSignature . data ( ) , offlineSignature . size ( ) ) ;
size + = offlineSignature . size ( ) ; // offline signature
}
uint8_t * signature = packet + size ; // set it later
memset ( signature , 0 , signatureLen ) ; // zeroes for now
size + = signatureLen ; // signature
htobe16buf ( optionsSize , packet + size - 2 - optionsSize ) ; // actual options size
2021-11-27 23:30:35 +03:00
m_LocalDestination . GetOwner ( ) - > Sign ( packet , size , signature ) ;
2021-09-26 11:20:20 -04:00
p . len = size ;
SendPackets ( std : : vector < Packet * > { & p } ) ;
LogPrint ( eLogDebug , " Streaming: Ping of " , p . len , " bytes sent " ) ;
2021-11-27 23:30:35 +03:00
}
2014-01-12 15:57:10 -05:00
void Stream : : Close ( )
{
2016-07-12 00:00:00 +00:00
LogPrint ( eLogDebug , " Streaming: closing stream with sSID= " , m_SendStreamID , " , rSID= " , m_RecvStreamID , " , status= " , m_Status ) ;
2015-03-09 12:06:35 -04:00
switch ( m_Status )
{
case eStreamStatusOpen :
m_Status = eStreamStatusClosing ;
Close ( ) ; // recursion
if ( m_Status = = eStreamStatusClosing ) //still closing
2016-06-29 01:00:00 +00:00
LogPrint ( eLogDebug , " Streaming: Trying to send stream data before closing, sSID= " , m_SendStreamID ) ;
2015-03-09 12:06:35 -04:00
break ;
case eStreamStatusReset :
2016-07-05 09:52:18 -04:00
// TODO: send reset
2017-01-28 19:23:14 +03:00
Terminate ( ) ;
2015-03-09 12:06:35 -04:00
break ;
case eStreamStatusClosing :
2017-02-26 15:05:14 -05:00
if ( m_SentPackets . empty ( ) & & m_SendBuffer . IsEmpty ( ) ) // nothing to send
2015-03-09 12:06:35 -04:00
{
2015-03-09 21:37:51 -04:00
m_Status = eStreamStatusClosed ;
2018-04-24 09:45:16 -04:00
SendClose ( ) ;
2015-03-09 12:06:35 -04:00
}
break ;
case eStreamStatusClosed :
// already closed
Terminate ( ) ;
2017-01-28 19:23:14 +03:00
break ;
2015-03-09 12:06:35 -04:00
default :
2022-08-23 22:19:23 +03:00
LogPrint ( eLogWarning , " Streaming: Unexpected stream status= " , ( int ) m_Status , " for sSID= " , m_SendStreamID ) ;
2017-01-28 19:23:14 +03:00
} ;
2014-01-12 15:57:10 -05:00
}
2014-03-26 15:06:27 -04:00
2015-04-13 11:38:23 -04:00
void Stream : : SendClose ( )
2015-03-06 21:39:05 -05:00
{
2017-01-10 21:31:52 -05:00
Packet * p = m_LocalDestination . NewPacket ( ) ;
2015-03-06 21:39:05 -05:00
uint8_t * packet = p - > GetBuffer ( ) ;
size_t size = 0 ;
htobe32buf ( packet + size , m_SendStreamID ) ;
size + = 4 ; // sendStreamID
htobe32buf ( packet + size , m_RecvStreamID ) ;
size + = 4 ; // receiveStreamID
htobe32buf ( packet + size , m_SequenceNumber + + ) ;
size + = 4 ; // sequenceNum
2020-03-01 13:25:50 +03:00
htobe32buf ( packet + size , m_LastReceivedSequenceNumber > = 0 ? m_LastReceivedSequenceNumber : 0 ) ;
2015-03-06 21:39:05 -05:00
size + = 4 ; // ack Through
2017-01-28 19:23:14 +03:00
packet [ size ] = 0 ;
2015-03-06 21:39:05 -05:00
size + + ; // NACK count
2020-09-30 17:12:28 -04:00
packet [ size ] = 0 ;
2015-03-06 21:39:05 -05:00
size + + ; // resend delay
2015-04-13 11:38:23 -04:00
htobe16buf ( packet + size , PACKET_FLAG_CLOSE | PACKET_FLAG_SIGNATURE_INCLUDED ) ;
2015-03-06 21:39:05 -05:00
size + = 2 ; // flags
2020-12-13 21:55:51 -05:00
size_t signatureLen = m_LocalDestination . GetOwner ( ) - > GetPrivateKeys ( ) . GetSignatureLen ( ) ;
2015-03-06 21:39:05 -05:00
htobe16buf ( packet + size , signatureLen ) ; // signature only
size + = 2 ; // options size
uint8_t * signature = packet + size ;
memset ( packet + size , 0 , signatureLen ) ;
size + = signatureLen ; // signature
2015-11-03 09:15:49 -05:00
m_LocalDestination . GetOwner ( ) - > Sign ( packet , size , signature ) ;
2017-01-28 19:23:14 +03:00
2015-03-06 21:39:05 -05:00
p - > len = size ;
2024-11-25 16:00:06 -05:00
boost : : asio : : post ( m_Service , std : : bind ( & Stream : : SendPacket , shared_from_this ( ) , p ) ) ;
2016-06-29 01:00:00 +00:00
LogPrint ( eLogDebug , " Streaming: FIN sent, sSID= " , m_SendStreamID ) ;
2017-01-28 19:23:14 +03:00
}
2014-03-26 15:06:27 -04:00
size_t Stream : : ConcatenatePackets ( uint8_t * buf , size_t len )
{
2014-01-10 20:21:38 -05:00
size_t pos = 0 ;
2014-04-11 21:13:52 -04:00
while ( pos < len & & ! m_ReceiveQueue . empty ( ) )
2014-01-10 20:21:38 -05:00
{
2014-04-11 21:13:52 -04:00
Packet * packet = m_ReceiveQueue . front ( ) ;
size_t l = std : : min ( packet - > GetLength ( ) , len - pos ) ;
memcpy ( buf + pos , packet - > GetBuffer ( ) , l ) ;
pos + = l ;
packet - > offset + = l ;
if ( ! packet - > GetLength ( ) )
2014-01-10 20:21:38 -05:00
{
2014-04-11 21:13:52 -04:00
m_ReceiveQueue . pop ( ) ;
2017-01-10 21:31:52 -05:00
m_LocalDestination . DeletePacket ( packet ) ;
2017-01-28 19:23:14 +03:00
}
}
return pos ;
2014-03-26 15:06:27 -04:00
}
2014-03-17 22:55:02 -04:00
2014-03-24 19:27:20 -04:00
bool Stream : : SendPacket ( Packet * packet )
{
if ( packet )
2017-01-28 19:23:14 +03:00
{
2014-10-10 15:58:17 -04:00
if ( m_IsAckSendScheduled )
{
2017-01-28 19:23:14 +03:00
m_IsAckSendScheduled = false ;
2014-10-10 15:58:17 -04:00
m_AckSendTimer . cancel ( ) ;
}
2024-06-29 09:17:11 -04:00
if ( ! packet - > sendTime ) packet - > sendTime = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
2014-08-12 16:52:04 -04:00
SendPackets ( std : : vector < Packet * > { packet } ) ;
2016-07-05 09:52:18 -04:00
bool isEmpty = m_SentPackets . empty ( ) ;
m_SentPackets . insert ( packet ) ;
if ( isEmpty )
ScheduleResend ( ) ;
2017-01-28 19:23:14 +03:00
return true ;
}
2014-03-24 19:27:20 -04:00
else
return false ;
2017-01-28 19:23:14 +03:00
}
2014-08-12 16:35:35 -04:00
void Stream : : SendPackets ( const std : : vector < Packet * > & packets )
{
if ( ! m_RemoteLeaseSet )
{
2024-10-18 15:59:37 -04:00
CancelRemoteLeaseChange ( ) ;
2017-01-28 19:23:14 +03:00
UpdateCurrentRemoteLease ( ) ;
2014-08-12 16:35:35 -04:00
if ( ! m_RemoteLeaseSet )
{
2016-06-29 01:00:00 +00:00
LogPrint ( eLogError , " Streaming: Can't send packets, missing remote LeaseSet, sSID= " , m_SendStreamID ) ;
2014-08-12 16:35:35 -04:00
return ;
}
}
2021-01-05 15:56:48 -05:00
if ( ! m_RoutingSession | | m_RoutingSession - > IsTerminated ( ) | | ! m_RoutingSession - > IsReadyToSend ( ) ) // expired and detached or new session sent
2024-11-17 18:53:21 -05:00
{
2024-11-19 13:00:13 -05:00
m_RoutingSession = m_LocalDestination . GetOwner ( ) - > GetRoutingSession ( m_RemoteLeaseSet , true , ! m_IsIncoming | | m_SequenceNumber > 1 ) ;
2024-11-17 18:53:21 -05:00
if ( ! m_RoutingSession )
{
LogPrint ( eLogError , " Streaming: Can't obtain routing session, sSID= " , m_SendStreamID ) ;
Terminate ( ) ;
return ;
}
}
2016-11-22 15:20:48 -05:00
if ( ! m_CurrentOutboundTunnel & & m_RoutingSession ) // first message to send
2016-02-10 22:51:08 -05:00
{
// try to get shared path first
2016-11-22 15:20:48 -05:00
auto routingPath = m_RoutingSession - > GetSharedRoutingPath ( ) ;
if ( routingPath )
{
m_CurrentOutboundTunnel = routingPath - > outboundTunnel ;
m_CurrentRemoteLease = routingPath - > remoteLease ;
m_RTT = routingPath - > rtt ;
}
2017-01-28 19:23:14 +03:00
}
2014-08-12 16:35:35 -04:00
2017-01-28 19:23:14 +03:00
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
2024-10-18 15:59:37 -04:00
if ( ! m_CurrentRemoteLease | | ! m_CurrentRemoteLease - > endDate ) // excluded from LeaseSet
{
CancelRemoteLeaseChange ( ) ;
2015-04-16 11:38:36 -04:00
UpdateCurrentRemoteLease ( true ) ;
2024-10-18 15:59:37 -04:00
}
2024-11-25 08:12:40 -05:00
if ( m_RemoteLeaseChangeTime & & m_IsRemoteLeaseChangeInProgress & & ts > m_RemoteLeaseChangeTime + INITIAL_RTO )
2024-10-18 15:59:37 -04:00
{
CancelRemoteLeaseChange ( ) ;
m_CurrentRemoteLease = m_NextRemoteLease ;
HalveWindowSize ( ) ;
}
auto currentRemoteLease = m_CurrentRemoteLease ;
if ( ! m_IsRemoteLeaseChangeInProgress & & m_RemoteLeaseSet & & m_CurrentRemoteLease & & ts > = m_CurrentRemoteLease - > endDate - i2p : : data : : LEASE_ENDDATE_THRESHOLD )
{
auto leases = m_RemoteLeaseSet - > GetNonExpiredLeases ( false ) ;
2024-10-19 08:45:25 -04:00
if ( leases . size ( ) )
2024-10-18 15:59:37 -04:00
{
m_IsRemoteLeaseChangeInProgress = true ;
UpdateCurrentRemoteLease ( true ) ;
m_NextRemoteLease = m_CurrentRemoteLease ;
}
2024-10-19 08:45:25 -04:00
else
UpdateCurrentRemoteLease ( true ) ;
2024-10-18 15:59:37 -04:00
}
2016-02-12 20:56:29 -05:00
if ( m_CurrentRemoteLease & & ts < m_CurrentRemoteLease - > endDate + i2p : : data : : LEASE_ENDDATE_THRESHOLD )
2017-01-28 19:23:14 +03:00
{
2024-04-07 21:24:56 +03:00
bool freshTunnel = false ;
2021-11-07 17:18:31 -05:00
if ( ! m_CurrentOutboundTunnel )
{
auto leaseRouter = i2p : : data : : netdb . FindRouter ( m_CurrentRemoteLease - > tunnelGateway ) ;
m_CurrentOutboundTunnel = m_LocalDestination . GetOwner ( ) - > GetTunnelPool ( ) - > GetNextOutboundTunnel ( nullptr ,
leaseRouter ? leaseRouter - > GetCompatibleTransports ( false ) : ( i2p : : data : : RouterInfo : : CompatibleTransports ) i2p : : data : : RouterInfo : : eAllTransports ) ;
2024-04-07 21:24:56 +03:00
freshTunnel = true ;
2021-11-27 23:30:35 +03:00
}
2021-11-07 17:18:31 -05:00
else if ( ! m_CurrentOutboundTunnel - > IsEstablished ( ) )
2024-04-08 19:00:02 -04:00
std : : tie ( m_CurrentOutboundTunnel , freshTunnel ) = m_LocalDestination . GetOwner ( ) - > GetTunnelPool ( ) - > GetNewOutboundTunnel ( m_CurrentOutboundTunnel ) ;
2021-11-07 17:18:31 -05:00
if ( ! m_CurrentOutboundTunnel )
{
LogPrint ( eLogError , " Streaming: No outbound tunnels in the pool, sSID= " , m_SendStreamID ) ;
m_CurrentRemoteLease = nullptr ;
return ;
}
2024-04-07 21:24:56 +03:00
if ( freshTunnel )
{
m_RTO = INITIAL_RTO ;
2024-06-29 09:17:11 -04:00
// m_TunnelsChangeSequenceNumber = m_SequenceNumber; // should be determined more precisely
2024-04-07 21:24:56 +03:00
}
2021-11-27 23:30:35 +03:00
2014-10-07 10:33:17 -04:00
std : : vector < i2p : : tunnel : : TunnelMessageBlock > msgs ;
2021-07-08 22:22:00 -04:00
for ( const auto & it : packets )
2017-01-28 19:23:14 +03:00
{
2020-05-02 11:13:40 -04:00
auto msg = m_RoutingSession - > WrapSingleMessage ( m_LocalDestination . CreateDataMessage (
2022-10-24 19:21:58 -04:00
it - > GetBuffer ( ) , it - > GetLength ( ) , m_Port , ! m_RoutingSession - > IsRatchets ( ) , it - > IsSYN ( ) ) ) ;
2017-01-28 19:23:14 +03:00
msgs . push_back ( i2p : : tunnel : : TunnelMessageBlock
{
2015-03-22 18:34:39 -04:00
i2p : : tunnel : : eDeliveryTypeTunnel ,
2016-02-09 10:46:27 -05:00
m_CurrentRemoteLease - > tunnelGateway , m_CurrentRemoteLease - > tunnelID ,
2015-06-21 22:29:50 -04:00
msg
2017-01-28 19:23:14 +03:00
} ) ;
2014-10-13 17:03:27 -04:00
m_NumSentBytes + = it - > GetLength ( ) ;
2024-10-18 15:59:37 -04:00
if ( m_IsRemoteLeaseChangeInProgress & & ! m_RemoteLeaseChangeTime )
{
m_RemoteLeaseChangeTime = ts ;
m_CurrentRemoteLease = currentRemoteLease ; // change it back before new lease is confirmed
}
2014-10-07 10:33:17 -04:00
}
2023-04-04 13:19:08 -04:00
m_CurrentOutboundTunnel - > SendTunnelDataMsgs ( msgs ) ;
2017-01-28 19:23:14 +03:00
}
2014-10-07 10:33:17 -04:00
else
2017-01-28 19:23:14 +03:00
{
2016-10-26 13:02:19 -04:00
LogPrint ( eLogWarning , " Streaming: Remote lease is not available, sSID= " , m_SendStreamID ) ;
2017-01-28 19:23:14 +03:00
if ( m_RoutingSession )
2016-10-26 13:02:19 -04:00
m_RoutingSession - > SetSharedRoutingPath ( nullptr ) ; // invalidate routing path
}
2014-08-12 16:35:35 -04:00
}
2016-10-22 20:08:15 -04:00
void Stream : : SendUpdatedLeaseSet ( )
{
2021-01-18 12:58:27 -05:00
if ( m_RoutingSession & & ! m_RoutingSession - > IsTerminated ( ) )
2017-01-28 19:23:14 +03:00
{
if ( m_RoutingSession - > IsLeaseSetNonConfirmed ( ) )
2016-10-24 20:58:25 -04:00
{
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
2024-04-01 08:41:58 -04:00
if ( ts > m_RoutingSession - > GetLeaseSetSubmissionTime ( ) + i2p : : garlic : : LEASESET_CONFIRMATION_TIMEOUT )
2016-10-24 20:58:25 -04:00
{
// LeaseSet was not confirmed, should try other tunnels
2024-04-01 08:41:58 -04:00
LogPrint ( eLogWarning , " Streaming: LeaseSet was not confirmed in " , i2p : : garlic : : LEASESET_CONFIRMATION_TIMEOUT , " milliseconds. Trying to resubmit " ) ;
2016-10-24 20:58:25 -04:00
m_RoutingSession - > SetSharedRoutingPath ( nullptr ) ;
2017-01-28 19:23:14 +03:00
m_CurrentOutboundTunnel = nullptr ;
2016-10-24 20:58:25 -04:00
m_CurrentRemoteLease = nullptr ;
SendQuickAck ( ) ;
2017-01-28 19:23:14 +03:00
}
}
2016-10-24 20:58:25 -04:00
else if ( m_RoutingSession - > IsLeaseSetUpdated ( ) )
2017-01-28 19:23:14 +03:00
{
2016-10-24 20:58:25 -04:00
LogPrint ( eLogDebug , " Streaming: sending updated LeaseSet " ) ;
SendQuickAck ( ) ;
2017-01-28 19:23:14 +03:00
}
}
2021-01-18 12:58:27 -05:00
else
SendQuickAck ( ) ;
2017-01-28 19:23:14 +03:00
}
2024-06-29 09:17:11 -04:00
void Stream : : ScheduleSend ( )
{
if ( m_Status ! = eStreamStatusTerminated )
{
m_SendTimer . cancel ( ) ;
2024-10-26 19:26:25 -04:00
m_SendTimer . expires_from_now ( boost : : posix_time : : microseconds (
SEND_INTERVAL + m_LocalDestination . GetRandom ( ) % SEND_INTERVAL_VARIANCE ) ) ;
2024-06-29 09:17:11 -04:00
m_SendTimer . async_wait ( std : : bind ( & Stream : : HandleSendTimer ,
shared_from_this ( ) , std : : placeholders : : _1 ) ) ;
}
}
void Stream : : HandleSendTimer ( const boost : : system : : error_code & ecode )
{
if ( ecode ! = boost : : asio : : error : : operation_aborted )
{
2024-08-01 13:49:32 -04:00
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
if ( m_LastSendTime & & ts * 1000 > m_LastSendTime * 1000 + m_PacingTime )
{
2024-10-26 19:26:25 -04:00
if ( m_PacingTime )
{
auto numPackets = std : : lldiv ( m_PacingTimeRem + ts * 1000 - m_LastSendTime * 1000 , m_PacingTime ) ;
m_NumPacketsToSend = numPackets . quot ;
m_PacingTimeRem = numPackets . rem ;
}
else
{
LogPrint ( eLogError , " Streaming: pacing time is zero " ) ;
m_NumPacketsToSend = 1 ; m_PacingTimeRem = 0 ;
}
2024-08-01 13:49:32 -04:00
m_IsSendTime = true ;
2024-10-14 22:29:55 -04:00
if ( m_WindowIncCounter & & m_WindowSize < MAX_WINDOW_SIZE & & ! m_SendBuffer . IsEmpty ( ) & & m_PacingTime > m_MinPacingTime )
2024-08-29 18:57:14 -04:00
{
for ( int i = 0 ; i < m_NumPacketsToSend ; i + + )
{
if ( m_WindowIncCounter )
{
if ( m_LastWindowDropSize & & ( m_LastWindowDropSize > = m_WindowSize ) )
m_WindowSize + = 1 - ( 1 / ( ( m_LastWindowDropSize + PREV_SPEED_KEEP_TIME_COEFF ) / m_WindowSize ) ) ; // some magic here
else if ( m_LastWindowDropSize & & ( m_LastWindowDropSize < m_WindowSize ) )
m_WindowSize + = ( m_WindowSize - ( m_LastWindowDropSize - PREV_SPEED_KEEP_TIME_COEFF ) ) / m_WindowSize ; // some magic here
else
m_WindowSize + = ( m_WindowSize - ( 1 - PREV_SPEED_KEEP_TIME_COEFF ) ) / m_WindowSize ;
if ( m_WindowSize > MAX_WINDOW_SIZE ) m_WindowSize = MAX_WINDOW_SIZE ;
2024-10-28 08:38:04 -04:00
m_WindowIncCounter - - ;
2024-08-29 18:57:14 -04:00
}
2024-10-28 08:38:04 -04:00
else
break ;
2024-08-29 18:57:14 -04:00
}
2024-10-28 08:38:04 -04:00
UpdatePacingTime ( ) ;
2024-08-29 18:57:14 -04:00
}
if ( m_IsNAcked )
ResendPacket ( ) ;
else if ( m_IsResendNeeded ) // resend packets
2024-08-01 13:49:32 -04:00
ResendPacket ( ) ;
// delay-based CC
else if ( m_WindowSize > int ( m_SentPackets . size ( ) ) ) // send packets
SendBuffer ( ) ;
}
2024-06-29 09:17:11 -04:00
else // pass
ScheduleSend ( ) ;
}
}
2014-08-10 18:27:23 -04:00
void Stream : : ScheduleResend ( )
{
2021-08-11 12:23:43 -04:00
if ( m_Status ! = eStreamStatusTerminated )
2021-11-27 23:30:35 +03:00
{
2021-08-11 12:23:43 -04:00
m_ResendTimer . cancel ( ) ;
// check for invalid value
if ( m_RTO < = 0 ) m_RTO = INITIAL_RTO ;
m_ResendTimer . expires_from_now ( boost : : posix_time : : milliseconds ( m_RTO ) ) ;
m_ResendTimer . async_wait ( std : : bind ( & Stream : : HandleResendTimer ,
shared_from_this ( ) , std : : placeholders : : _1 ) ) ;
2021-11-27 23:30:35 +03:00
}
2014-08-10 18:27:23 -04:00
}
2017-01-28 19:23:14 +03:00
2014-08-10 18:27:23 -04:00
void Stream : : HandleResendTimer ( const boost : : system : : error_code & ecode )
{
2017-01-28 19:23:14 +03:00
if ( ecode ! = boost : : asio : : error : : operation_aborted )
{
2024-07-04 13:07:57 -04:00
m_IsSendTime = true ;
2024-06-29 09:17:11 -04:00
if ( m_RTO > INITIAL_RTO ) m_RTO = INITIAL_RTO ;
m_SendTimer . cancel ( ) ; // if no ack's in RTO, disable fast retransmit
m_IsTimeOutResend = true ;
m_IsNAcked = false ;
2024-08-01 13:49:32 -04:00
m_IsResendNeeded = false ;
2024-08-19 18:30:49 -04:00
m_NumPacketsToSend = 1 ;
2024-06-29 09:17:11 -04:00
ResendPacket ( ) ; // send one packet per RTO, waiting for ack
}
}
void Stream : : ResendPacket ( )
{
2024-11-17 20:51:59 -05:00
// check for resend attempts
2024-11-18 07:59:39 -05:00
if ( m_IsIncoming & & m_SequenceNumber = = 1 & & m_NumResendAttempts > 0 )
2024-11-17 20:51:59 -05:00
{
LogPrint ( eLogWarning , " Streaming: SYNACK packet was not ACKed after " , m_NumResendAttempts , " attempts, terminate, rSID= " , m_RecvStreamID , " , sSID= " , m_SendStreamID ) ;
m_Status = eStreamStatusReset ;
Close ( ) ;
return ;
}
if ( m_NumResendAttempts > = MAX_NUM_RESEND_ATTEMPTS )
{
LogPrint ( eLogWarning , " Streaming: packet was not ACKed after " , MAX_NUM_RESEND_ATTEMPTS , " attempts, terminate, rSID= " , m_RecvStreamID , " , sSID= " , m_SendStreamID ) ;
m_Status = eStreamStatusReset ;
Close ( ) ;
return ;
}
2015-03-21 16:26:14 -04:00
2024-11-17 20:51:59 -05:00
// collect packets to resend
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
std : : vector < Packet * > packets ;
if ( m_IsNAcked )
{
for ( auto it : m_NACKedPackets )
2014-08-11 19:32:07 -04:00
{
2024-11-17 20:51:59 -05:00
if ( ts > = it - > sendTime + m_RTO )
2015-02-03 13:46:44 -05:00
{
2024-11-17 20:51:59 -05:00
if ( ts < it - > sendTime + m_RTO * 2 )
it - > resent = true ;
else
it - > resent = false ;
it - > sendTime = ts ;
packets . push_back ( it ) ;
if ( ( int ) packets . size ( ) > = m_NumPacketsToSend ) break ;
2024-08-27 15:33:59 -04:00
}
}
2024-11-17 20:51:59 -05:00
}
else
{
for ( auto it : m_SentPackets )
2024-08-27 15:33:59 -04:00
{
2024-11-17 20:51:59 -05:00
if ( ts > = it - > sendTime + m_RTO )
2024-08-27 15:33:59 -04:00
{
2024-11-17 20:51:59 -05:00
if ( ts < it - > sendTime + m_RTO * 2 )
it - > resent = true ;
else
it - > resent = false ;
it - > sendTime = ts ;
packets . push_back ( it ) ;
if ( ( int ) packets . size ( ) > = m_NumPacketsToSend ) break ;
2017-01-28 19:23:14 +03:00
}
}
2024-11-17 20:51:59 -05:00
}
// select tunnels if necessary and send
if ( packets . size ( ) > 0 & & m_IsSendTime )
{
if ( m_IsNAcked ) m_NumResendAttempts = 1 ;
else if ( m_IsTimeOutResend ) m_NumResendAttempts + + ;
if ( m_NumResendAttempts = = 1 & & m_RTO ! = INITIAL_RTO )
{
// loss-based CC
if ( ! m_IsWinDropped & & LOSS_BASED_CONTROL_ENABLED )
ProcessWindowDrop ( ) ;
}
else if ( m_IsTimeOutResend )
2014-08-14 21:38:46 -04:00
{
2024-11-17 20:51:59 -05:00
m_IsTimeOutResend = false ;
m_RTO = INITIAL_RTO ; // drop RTO to initial upon tunnels pair change
m_WindowDropTargetSize = INITIAL_WINDOW_SIZE ;
m_LastWindowDropSize = 0 ;
m_WindowIncCounter = 0 ;
m_IsWinDropped = true ;
m_IsFirstRttSample = true ;
m_DropWindowDelaySequenceNumber = 0 ;
m_IsFirstACK = true ;
UpdatePacingTime ( ) ;
if ( m_RoutingSession ) m_RoutingSession - > SetSharedRoutingPath ( nullptr ) ;
if ( m_NumResendAttempts & 1 )
2024-04-07 16:47:06 +03:00
{
2024-11-17 20:51:59 -05:00
// pick another outbound tunnel
m_CurrentOutboundTunnel = m_LocalDestination . GetOwner ( ) - > GetTunnelPool ( ) - > GetNextOutboundTunnel ( m_CurrentOutboundTunnel ) ;
LogPrint ( eLogWarning , " Streaming: Resend # " , m_NumResendAttempts ,
" , another outbound tunnel has been selected for stream with sSID= " , m_SendStreamID ) ;
2024-04-07 16:47:06 +03:00
}
2024-11-17 20:51:59 -05:00
else
2017-01-28 19:23:14 +03:00
{
2024-11-17 20:51:59 -05:00
CancelRemoteLeaseChange ( ) ;
UpdateCurrentRemoteLease ( ) ; // pick another lease
LogPrint ( eLogWarning , " Streaming: Resend # " , m_NumResendAttempts ,
" , another remote lease has been selected for stream with rSID= " , m_RecvStreamID , " , sSID= " , m_SendStreamID ) ;
2017-01-28 19:23:14 +03:00
}
}
2024-11-17 20:51:59 -05:00
SendPackets ( packets ) ;
m_LastSendTime = ts ;
m_IsSendTime = false ;
if ( m_IsNAcked | | m_IsResendNeeded ) ScheduleSend ( ) ;
}
else
SendBuffer ( ) ;
if ( ! m_IsNAcked & & ! m_IsResendNeeded ) ScheduleResend ( ) ;
2017-01-28 19:23:14 +03:00
}
2024-01-27 12:17:59 -05:00
void Stream : : ScheduleAck ( int timeout )
{
if ( m_IsAckSendScheduled )
m_AckSendTimer . cancel ( ) ;
m_IsAckSendScheduled = true ;
if ( timeout < MIN_SEND_ACK_TIMEOUT ) timeout = MIN_SEND_ACK_TIMEOUT ;
m_AckSendTimer . expires_from_now ( boost : : posix_time : : milliseconds ( timeout ) ) ;
m_AckSendTimer . async_wait ( std : : bind ( & Stream : : HandleAckSendTimer ,
shared_from_this ( ) , std : : placeholders : : _1 ) ) ;
}
2014-10-10 11:53:27 -04:00
void Stream : : HandleAckSendTimer ( const boost : : system : : error_code & ecode )
{
2014-10-10 15:58:17 -04:00
if ( m_IsAckSendScheduled )
2014-10-10 11:53:27 -04:00
{
2015-03-23 18:07:43 -04:00
if ( m_LastReceivedSequenceNumber < 0 )
{
2017-11-24 15:37:17 -05:00
LogPrint ( eLogWarning , " Streaming: SYN has not been received after " , SYN_TIMEOUT , " milliseconds after follow on, terminate rSID= " , m_RecvStreamID , " , sSID= " , m_SendStreamID ) ;
2015-03-23 18:07:43 -04:00
m_Status = eStreamStatusReset ;
Close ( ) ;
return ;
2017-01-28 19:23:14 +03:00
}
2015-03-08 19:36:33 -04:00
if ( m_Status = = eStreamStatusOpen )
2016-09-07 13:25:11 -04:00
{
2017-01-28 19:23:14 +03:00
if ( m_RoutingSession & & m_RoutingSession - > IsLeaseSetNonConfirmed ( ) )
2016-09-07 13:25:11 -04:00
{
2024-04-01 08:41:58 -04:00
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
if ( ts > m_RoutingSession - > GetLeaseSetSubmissionTime ( ) + i2p : : garlic : : LEASESET_CONFIRMATION_TIMEOUT )
{
// seems something went wrong and we should re-select tunnels
m_CurrentOutboundTunnel = nullptr ;
m_CurrentRemoteLease = nullptr ;
}
2016-09-07 13:25:11 -04:00
}
2014-10-10 11:53:27 -04:00
SendQuickAck ( ) ;
2016-09-07 13:25:11 -04:00
}
2014-10-10 15:58:17 -04:00
m_IsAckSendScheduled = false ;
2017-01-28 19:23:14 +03:00
}
2014-10-10 11:53:27 -04:00
}
2015-04-16 11:38:36 -04:00
void Stream : : UpdateCurrentRemoteLease ( bool expired )
2014-03-23 09:25:16 -04:00
{
2024-09-16 19:09:18 -04:00
bool isLeaseChanged = true ;
2017-01-28 19:23:14 +03:00
if ( ! m_RemoteLeaseSet | | m_RemoteLeaseSet - > IsExpired ( ) )
2014-08-06 11:09:06 -04:00
{
2021-05-07 22:15:12 -04:00
auto remoteLeaseSet = m_LocalDestination . GetOwner ( ) - > FindLeaseSet ( m_RemoteIdentity - > GetIdentHash ( ) ) ;
if ( ! remoteLeaseSet )
2017-01-28 19:23:14 +03:00
{
2021-05-07 22:15:12 -04:00
LogPrint ( eLogWarning , " Streaming: LeaseSet " , m_RemoteIdentity - > GetIdentHash ( ) . ToBase64 ( ) , m_RemoteLeaseSet ? " expired " : " not found " ) ;
2024-11-10 16:49:44 -05:00
if ( ! m_IsIncoming ) // outgoing
{
if ( m_RemoteLeaseSet & & m_RemoteLeaseSet - > IsPublishedEncrypted ( ) )
{
m_LocalDestination . GetOwner ( ) - > RequestDestinationWithEncryptedLeaseSet (
std : : make_shared < i2p : : data : : BlindedPublicKey > ( m_RemoteIdentity ) ) ;
return ; // we keep m_RemoteLeaseSet for possible next request
}
else
{
m_RemoteLeaseSet = nullptr ;
m_LocalDestination . GetOwner ( ) - > RequestDestination ( m_RemoteIdentity - > GetIdentHash ( ) ) ; // try to request for a next attempt
}
2021-11-27 23:30:35 +03:00
}
2024-11-10 16:49:44 -05:00
else // incoming
2021-11-27 23:30:35 +03:00
{
2024-11-10 16:49:44 -05:00
// just close the socket without sending FIN or RST
m_Status = eStreamStatusClosed ;
AsyncClose ( ) ;
}
2017-01-28 19:23:14 +03:00
}
2019-02-06 21:19:44 -05:00
else
{
// LeaseSet updated
2021-05-07 22:15:12 -04:00
m_RemoteLeaseSet = remoteLeaseSet ;
2019-02-06 21:19:44 -05:00
m_RemoteIdentity = m_RemoteLeaseSet - > GetIdentity ( ) ;
m_TransientVerifier = m_RemoteLeaseSet - > GetTransientVerifier ( ) ;
2020-03-01 13:25:50 +03:00
}
2014-08-06 11:09:06 -04:00
}
2014-08-01 14:54:14 -04:00
if ( m_RemoteLeaseSet )
{
2014-08-29 22:10:00 -04:00
if ( ! m_RoutingSession )
2016-01-24 22:24:39 -05:00
m_RoutingSession = m_LocalDestination . GetOwner ( ) - > GetRoutingSession ( m_RemoteLeaseSet , true ) ;
2015-03-26 10:30:29 -04:00
auto leases = m_RemoteLeaseSet - > GetNonExpiredLeases ( false ) ; // try without threshold first
if ( leases . empty ( ) )
{
2015-04-16 11:38:36 -04:00
expired = false ;
2019-04-17 15:53:07 -04:00
// time to request
2019-08-07 16:18:00 -04:00
if ( m_RemoteLeaseSet - > IsPublishedEncrypted ( ) )
2019-04-17 15:53:07 -04:00
m_LocalDestination . GetOwner ( ) - > RequestDestinationWithEncryptedLeaseSet (
std : : make_shared < i2p : : data : : BlindedPublicKey > ( m_RemoteIdentity ) ) ;
else
2020-03-01 13:25:50 +03:00
m_LocalDestination . GetOwner ( ) - > RequestDestination ( m_RemoteIdentity - > GetIdentHash ( ) ) ;
2017-01-28 19:23:14 +03:00
leases = m_RemoteLeaseSet - > GetNonExpiredLeases ( true ) ; // then with threshold
2015-03-26 10:30:29 -04:00
}
2014-08-01 14:54:14 -04:00
if ( ! leases . empty ( ) )
2017-01-28 19:23:14 +03:00
{
bool updated = false ;
2016-02-09 10:46:27 -05:00
if ( expired & & m_CurrentRemoteLease )
2015-04-16 11:38:36 -04:00
{
2016-08-09 01:53:37 +03:00
for ( const auto & it : leases )
2016-02-09 10:46:27 -05:00
if ( ( it - > tunnelGateway = = m_CurrentRemoteLease - > tunnelGateway ) & & ( it - > tunnelID ! = m_CurrentRemoteLease - > tunnelID ) )
2015-04-16 11:38:36 -04:00
{
m_CurrentRemoteLease = it ;
updated = true ;
break ;
2017-01-28 19:23:14 +03:00
}
2015-04-16 11:38:36 -04:00
}
2024-09-17 20:56:00 -04:00
if ( ! updated )
2015-04-16 11:38:36 -04:00
{
2024-08-31 08:02:56 -04:00
uint32_t i = m_LocalDestination . GetRandom ( ) % leases . size ( ) ;
2016-02-09 10:46:27 -05:00
if ( m_CurrentRemoteLease & & leases [ i ] - > tunnelID = = m_CurrentRemoteLease - > tunnelID )
2024-09-17 20:56:00 -04:00
{
2017-01-28 19:23:14 +03:00
// make sure we don't select previous
2024-09-17 20:56:00 -04:00
if ( leases . size ( ) > 1 )
i = ( i + 1 ) % leases . size ( ) ; // if so, pick next
else
isLeaseChanged = false ;
}
2017-01-28 19:23:14 +03:00
m_CurrentRemoteLease = leases [ i ] ;
2015-04-16 11:38:36 -04:00
}
2017-01-28 19:23:14 +03:00
}
2014-08-01 14:54:14 -04:00
else
2017-01-28 19:23:14 +03:00
{
2016-10-26 13:02:19 -04:00
LogPrint ( eLogWarning , " Streaming: All remote leases are expired " ) ;
2015-03-26 10:30:29 -04:00
m_RemoteLeaseSet = nullptr ;
2016-02-09 10:46:27 -05:00
m_CurrentRemoteLease = nullptr ;
2016-02-26 16:16:59 -05:00
// we have requested expired before, no need to do it twice
2017-01-28 19:23:14 +03:00
}
2014-08-01 14:54:14 -04:00
}
2014-03-23 09:25:16 -04:00
else
2016-10-26 13:02:19 -04:00
{
LogPrint ( eLogWarning , " Streaming: Remote LeaseSet not found " ) ;
2016-02-09 10:46:27 -05:00
m_CurrentRemoteLease = nullptr ;
2017-01-28 19:23:14 +03:00
}
2024-10-18 15:59:37 -04:00
if ( isLeaseChanged & & ! m_IsRemoteLeaseChangeInProgress )
2024-08-29 18:57:14 -04:00
{
2024-10-18 15:59:37 -04:00
HalveWindowSize ( ) ;
2024-08-29 18:57:14 -04:00
}
2017-01-28 19:23:14 +03:00
}
2014-10-22 11:46:54 -04:00
2024-05-17 19:19:17 -04:00
void Stream : : ResetRoutingPath ( )
{
m_CurrentOutboundTunnel = nullptr ;
m_CurrentRemoteLease = nullptr ;
m_RTT = INITIAL_RTT ;
m_RTO = INITIAL_RTO ;
if ( m_RoutingSession )
m_RoutingSession - > SetSharedRoutingPath ( nullptr ) ; // TODO: count failures
}
2024-07-04 13:07:57 -04:00
void Stream : : UpdatePacingTime ( )
{
m_PacingTime = std : : round ( m_RTT * 1000 / m_WindowSize ) ;
2024-07-04 13:28:18 -04:00
if ( m_MinPacingTime & & m_PacingTime < m_MinPacingTime )
m_PacingTime = m_MinPacingTime ;
2024-07-04 13:07:57 -04:00
}
2024-08-29 18:57:14 -04:00
void Stream : : ProcessWindowDrop ( )
{
if ( m_WindowSize > m_LastWindowDropSize )
m_LastWindowDropSize = ( m_LastWindowDropSize + m_WindowSize ) / 2 ;
else
m_LastWindowDropSize = m_WindowSize ;
m_WindowDropTargetSize = m_LastWindowDropSize - ( m_LastWindowDropSize / 4 ) ; // -25%;
if ( m_WindowDropTargetSize < MIN_WINDOW_SIZE + 1 )
m_WindowDropTargetSize = MIN_WINDOW_SIZE + 1 ;
m_WindowSize = m_SentPackets . size ( ) ; // stop sending now
if ( m_WindowSize < MIN_WINDOW_SIZE ) m_WindowSize = MIN_WINDOW_SIZE ;
m_WindowIncCounter = 0 ; // disable window growth
m_DropWindowDelaySequenceNumber = m_SequenceNumber ;
m_IsFirstACK = true ; // ignore first RTT sample
m_IsWinDropped = true ; // don't drop window twice
UpdatePacingTime ( ) ;
}
2024-10-18 15:59:37 -04:00
void Stream : : HalveWindowSize ( )
{
m_RTO = INITIAL_RTO ;
if ( m_WindowSize > INITIAL_WINDOW_SIZE )
{
m_WindowDropTargetSize = std : : max ( m_WindowSize / 2 , ( float ) INITIAL_WINDOW_SIZE ) ;
m_IsWinDropped = true ;
}
else
m_WindowSize = INITIAL_WINDOW_SIZE ;
m_LastWindowDropSize = 0 ;
m_WindowIncCounter = 0 ;
m_IsFirstRttSample = true ;
m_IsFirstACK = true ;
UpdatePacingTime ( ) ;
}
void Stream : : CancelRemoteLeaseChange ( )
{
m_RemoteLeaseChangeTime = 0 ;
m_IsRemoteLeaseChangeInProgress = false ;
}
2017-01-28 19:23:14 +03:00
// owner:     client destination this streaming destination belongs to
// localPort: stored in m_LocalPort
// gzip:      stored in m_Gzip
// The pending-incoming timer is bound to the owner's service and the cleanup
// timestamp starts at the current time in seconds.
StreamingDestination::StreamingDestination (std::shared_ptr<i2p::client::ClientDestination> owner, uint16_t localPort, bool gzip):
	m_Owner (owner),
	m_LocalPort (localPort),
	m_Gzip (gzip),
	m_PendingIncomingTimer (m_Owner->GetService ()),
	m_LastCleanupTime (i2p::util::GetSecondsSinceEpoch ())
{
}
2017-01-28 19:23:14 +03:00
2015-11-03 09:15:49 -05:00
// Releases every packet still held in m_SavedPackets.
StreamingDestination::~StreamingDestination ()
{
	for (auto& saved: m_SavedPackets)
	{
		auto& packets = saved.second;
		for (auto packet: packets)
			DeletePacket (packet);
		packets.clear ();
	}
	m_SavedPackets.clear ();
}
2014-10-22 11:46:54 -04:00
void StreamingDestination : : Start ( )
2016-07-28 11:16:29 -04:00
{
2014-10-22 11:46:54 -04:00
}
2017-01-28 19:23:14 +03:00
2014-10-22 11:46:54 -04:00
void StreamingDestination : : Stop ( )
2017-01-28 19:23:14 +03:00
{
2014-10-22 11:46:54 -04:00
ResetAcceptor ( ) ;
2015-12-14 22:23:28 -05:00
m_PendingIncomingTimer . cancel ( ) ;
2017-01-12 14:19:57 -05:00
m_PendingIncomingStreams . clear ( ) ;
2014-10-22 11:46:54 -04:00
{
std : : unique_lock < std : : mutex > l ( m_StreamsMutex ) ;
2020-02-15 16:30:10 -05:00
for ( auto it : m_Streams )
2020-03-01 13:25:50 +03:00
it . second - > Terminate ( false ) ; // we delete here
2014-10-22 11:46:54 -04:00
m_Streams . clear ( ) ;
2019-08-23 10:00:49 -04:00
m_IncomingStreams . clear ( ) ;
2021-11-27 23:30:35 +03:00
m_LastStream = nullptr ;
2016-07-28 11:16:29 -04:00
}
2017-01-28 19:23:14 +03:00
}
2014-10-22 11:46:54 -04:00
void StreamingDestination : : HandleNextPacket ( Packet * packet )
{
uint32_t sendStreamID = packet - > GetSendStreamID ( ) ;
if ( sendStreamID )
2017-01-28 19:23:14 +03:00
{
2021-06-29 19:08:11 -04:00
if ( ! m_LastStream | | sendStreamID ! = m_LastStream - > GetRecvStreamID ( ) )
2021-11-27 23:30:35 +03:00
{
2021-06-29 19:08:11 -04:00
auto it = m_Streams . find ( sendStreamID ) ;
if ( it ! = m_Streams . end ( ) )
m_LastStream = it - > second ;
else
m_LastStream = nullptr ;
}
if ( m_LastStream )
m_LastStream - > HandleNextPacket ( packet ) ;
2020-09-30 17:12:28 -04:00
else if ( packet - > IsEcho ( ) & & m_Owner - > IsStreamingAnswerPings ( ) )
{
// ping
LogPrint ( eLogInfo , " Streaming: Ping received sSID= " , sendStreamID ) ;
auto s = std : : make_shared < Stream > ( m_Owner - > GetService ( ) , * this ) ;
s - > HandlePing ( packet ) ;
2021-11-27 23:30:35 +03:00
}
2014-10-22 11:46:54 -04:00
else
2017-01-28 19:23:14 +03:00
{
2017-01-30 20:36:35 -05:00
LogPrint ( eLogInfo , " Streaming: Unknown stream sSID= " , sendStreamID ) ;
2017-01-10 21:31:52 -05:00
DeletePacket ( packet ) ;
2014-10-22 11:46:54 -04:00
}
2017-01-28 19:23:14 +03:00
}
else
2014-10-22 11:46:54 -04:00
{
2021-09-26 11:20:20 -04:00
if ( packet - > IsEcho ( ) )
{
// pong
LogPrint ( eLogInfo , " Streaming: Pong received rSID= " , packet - > GetReceiveStreamID ( ) ) ;
DeletePacket ( packet ) ;
return ;
2021-11-27 23:30:35 +03:00
}
2015-01-02 10:04:57 -05:00
if ( packet - > IsSYN ( ) & & ! packet - > GetSeqn ( ) ) // new incoming stream
2017-01-28 19:23:14 +03:00
{
2016-07-28 11:42:31 -04:00
uint32_t receiveStreamID = packet - > GetReceiveStreamID ( ) ;
2019-07-26 14:23:21 -04:00
auto it1 = m_IncomingStreams . find ( receiveStreamID ) ;
if ( it1 ! = m_IncomingStreams . end ( ) )
2016-10-27 20:46:05 -04:00
{
// already pending
LogPrint ( eLogWarning , " Streaming: Incoming streaming with rSID= " , receiveStreamID , " already exists " ) ;
2024-05-17 19:19:17 -04:00
it1 - > second - > ResetRoutingPath ( ) ; // Ack was not delivered, changing path
2017-01-10 21:31:52 -05:00
DeletePacket ( packet ) ; // drop it, because previous should be connected
2016-10-27 20:46:05 -04:00
return ;
2017-01-28 19:23:14 +03:00
}
2024-11-11 14:43:20 -05:00
if ( m_Owner - > GetStreamingMaxConcurrentStreams ( ) > 0 & & ( int ) m_Streams . size ( ) > m_Owner - > GetStreamingMaxConcurrentStreams ( ) )
2024-11-10 16:49:44 -05:00
{
2024-11-11 13:41:27 -05:00
LogPrint ( eLogWarning , " Streaming: Number of streams exceeds " , m_Owner - > GetStreamingMaxConcurrentStreams ( ) ) ;
2024-11-10 16:49:44 -05:00
DeletePacket ( packet ) ;
return ;
}
2019-07-26 14:23:21 -04:00
auto incomingStream = CreateNewIncomingStream ( receiveStreamID ) ;
2016-07-28 11:42:31 -04:00
incomingStream - > HandleNextPacket ( packet ) ; // SYN
2024-11-10 20:15:50 -05:00
if ( ! incomingStream - > GetRemoteLeaseSet ( ) )
{
LogPrint ( eLogWarning , " Streaming: No remote LeaseSet for incoming stream. Terminated " ) ;
incomingStream - > Terminate ( ) ; // can't send FIN anyway
return ;
}
2017-01-28 19:23:14 +03:00
2016-07-28 11:37:33 -04:00
// handle saved packets if any
{
auto it = m_SavedPackets . find ( receiveStreamID ) ;
if ( it ! = m_SavedPackets . end ( ) )
{
LogPrint ( eLogDebug , " Streaming: Processing " , it - > second . size ( ) , " saved packets for rSID= " , receiveStreamID ) ;
for ( auto it1 : it - > second )
incomingStream - > HandleNextPacket ( it1 ) ;
m_SavedPackets . erase ( it ) ;
2017-01-28 19:23:14 +03:00
}
2016-07-28 11:37:33 -04:00
}
2016-02-08 14:42:20 -05:00
// accept
2015-01-02 10:04:57 -05:00
if ( m_Acceptor ! = nullptr )
m_Acceptor ( incomingStream ) ;
else
{
2015-12-18 13:33:58 +00:00
LogPrint ( eLogWarning , " Streaming: Acceptor for incoming stream is not set " ) ;
2015-12-14 22:23:28 -05:00
if ( m_PendingIncomingStreams . size ( ) < MAX_PENDING_INCOMING_BACKLOG )
{
m_PendingIncomingStreams . push_back ( incomingStream ) ;
m_PendingIncomingTimer . cancel ( ) ;
m_PendingIncomingTimer . expires_from_now ( boost : : posix_time : : seconds ( PENDING_INCOMING_TIMEOUT ) ) ;
2017-01-28 19:23:14 +03:00
m_PendingIncomingTimer . async_wait ( std : : bind ( & StreamingDestination : : HandlePendingIncomingTimer ,
2016-02-08 14:42:20 -05:00
shared_from_this ( ) , std : : placeholders : : _1 ) ) ;
2016-06-29 01:00:00 +00:00
LogPrint ( eLogDebug , " Streaming: Pending incoming stream added, rSID= " , receiveStreamID ) ;
2015-12-14 22:23:28 -05:00
}
else
2017-01-28 19:23:14 +03:00
{
2015-12-18 13:33:58 +00:00
LogPrint ( eLogWarning , " Streaming: Pending incoming streams backlog exceeds " , MAX_PENDING_INCOMING_BACKLOG ) ;
2015-12-14 22:23:28 -05:00
incomingStream - > Close ( ) ;
2017-01-28 19:23:14 +03:00
}
}
}
2015-01-02 10:04:57 -05:00
else // follow on packet without SYN
{
uint32_t receiveStreamID = packet - > GetReceiveStreamID ( ) ;
2019-07-26 14:23:21 -04:00
auto it1 = m_IncomingStreams . find ( receiveStreamID ) ;
if ( it1 ! = m_IncomingStreams . end ( ) )
{
// found
it1 - > second - > HandleNextPacket ( packet ) ;
return ;
}
2016-02-08 14:42:20 -05:00
// save follow on packet
auto it = m_SavedPackets . find ( receiveStreamID ) ;
if ( it ! = m_SavedPackets . end ( ) )
it - > second . push_back ( packet ) ;
else
{
2016-02-10 00:00:00 +00:00
m_SavedPackets [ receiveStreamID ] = std : : list < Packet * > { packet } ;
2016-02-08 14:42:20 -05:00
auto timer = std : : make_shared < boost : : asio : : deadline_timer > ( m_Owner - > GetService ( ) ) ;
timer - > expires_from_now ( boost : : posix_time : : seconds ( PENDING_INCOMING_TIMEOUT ) ) ;
auto s = shared_from_this ( ) ;
timer - > async_wait ( [ s , timer , receiveStreamID ] ( const boost : : system : : error_code & ecode )
{
2016-02-08 15:47:39 -05:00
if ( ecode ! = boost : : asio : : error : : operation_aborted )
2016-02-08 14:42:20 -05:00
{
auto it = s - > m_SavedPackets . find ( receiveStreamID ) ;
if ( it ! = s - > m_SavedPackets . end ( ) )
{
2017-01-10 21:31:52 -05:00
for ( auto it1 : it - > second ) s - > DeletePacket ( it1 ) ;
2016-02-08 14:42:20 -05:00
it - > second . clear ( ) ;
s - > m_SavedPackets . erase ( it ) ;
}
}
} ) ;
}
2017-01-28 19:23:14 +03:00
}
}
}
2015-01-27 11:27:58 -05:00
std : : shared_ptr < Stream > StreamingDestination : : CreateNewOutgoingStream ( std : : shared_ptr < const i2p : : data : : LeaseSet > remote , int port )
2014-10-22 11:46:54 -04:00
{
2015-11-03 09:15:49 -05:00
auto s = std : : make_shared < Stream > ( m_Owner - > GetService ( ) , * this , remote , port ) ;
2014-10-22 11:46:54 -04:00
std : : unique_lock < std : : mutex > l ( m_StreamsMutex ) ;
2021-06-29 19:08:11 -04:00
m_Streams . emplace ( s - > GetRecvStreamID ( ) , s ) ;
2014-10-22 11:46:54 -04:00
return s ;
2017-01-28 19:23:14 +03:00
}
2014-10-22 11:46:54 -04:00
2022-05-20 19:56:05 +03:00
void StreamingDestination : : SendPing ( std : : shared_ptr < const i2p : : data : : LeaseSet > remote )
2021-09-26 11:20:20 -04:00
{
auto s = std : : make_shared < Stream > ( m_Owner - > GetService ( ) , * this , remote , 0 ) ;
s - > SendPing ( ) ;
2021-11-27 23:30:35 +03:00
}
2019-07-26 14:23:21 -04:00
std : : shared_ptr < Stream > StreamingDestination : : CreateNewIncomingStream ( uint32_t receiveStreamID )
2014-10-22 11:46:54 -04:00
{
2015-11-03 09:15:49 -05:00
auto s = std : : make_shared < Stream > ( m_Owner - > GetService ( ) , * this ) ;
2014-10-22 11:46:54 -04:00
std : : unique_lock < std : : mutex > l ( m_StreamsMutex ) ;
2021-06-29 19:08:11 -04:00
m_Streams . emplace ( s - > GetRecvStreamID ( ) , s ) ;
m_IncomingStreams . emplace ( receiveStreamID , s ) ;
2014-10-22 11:46:54 -04:00
return s ;
}
2014-11-23 11:33:58 -05:00
void StreamingDestination : : DeleteStream ( std : : shared_ptr < Stream > stream )
2014-10-22 11:46:54 -04:00
{
if ( stream )
2017-01-28 19:23:14 +03:00
{
2014-10-22 11:46:54 -04:00
std : : unique_lock < std : : mutex > l ( m_StreamsMutex ) ;
2019-07-26 14:23:21 -04:00
m_Streams . erase ( stream - > GetRecvStreamID ( ) ) ;
2024-11-10 20:15:50 -05:00
if ( stream - > IsIncoming ( ) )
m_IncomingStreams . erase ( stream - > GetSendStreamID ( ) ) ;
2021-06-29 19:08:11 -04:00
if ( m_LastStream = = stream ) m_LastStream = nullptr ;
2017-01-28 19:23:14 +03:00
}
2024-09-16 13:39:11 -04:00
auto ts = i2p : : util : : GetSecondsSinceEpoch ( ) ;
if ( m_Streams . empty ( ) | | ts > m_LastCleanupTime + STREAMING_DESTINATION_POOLS_CLEANUP_INTERVAL )
2022-01-21 21:34:50 -05:00
{
m_PacketsPool . CleanUp ( ) ;
m_I2NPMsgsPool . CleanUp ( ) ;
2024-09-16 13:39:11 -04:00
m_LastCleanupTime = ts ;
2022-05-20 19:56:05 +03:00
}
2017-01-28 19:23:14 +03:00
}
2014-10-22 14:01:23 -04:00
2020-03-04 15:54:09 -05:00
bool StreamingDestination : : DeleteStream ( uint32_t recvStreamID )
{
auto it = m_Streams . find ( recvStreamID ) ;
if ( it = = m_Streams . end ( ) )
return false ;
2022-05-11 11:44:27 -04:00
auto s = it - > second ;
2024-11-25 16:00:06 -05:00
boost : : asio : : post ( m_Owner - > GetService ( ) , [ this , s ] ( )
2022-05-20 19:56:05 +03:00
{
2022-05-08 11:49:11 -04:00
s - > Close ( ) ; // try to send FIN
s - > Terminate ( false ) ;
DeleteStream ( s ) ;
} ) ;
2020-03-04 15:54:09 -05:00
return true ;
2020-03-01 13:25:50 +03:00
}
2017-01-28 19:23:14 +03:00
void StreamingDestination : : SetAcceptor ( const Acceptor & acceptor )
{
2017-01-12 14:19:57 -05:00
m_Acceptor = acceptor ; // we must set it immediately for IsAcceptorSet
auto s = shared_from_this ( ) ;
2024-11-25 16:00:06 -05:00
boost : : asio : : post ( m_Owner - > GetService ( ) , [ s ] ( void )
2017-01-28 19:23:14 +03:00
{
2017-01-12 14:19:57 -05:00
// take care about incoming queue
for ( auto & it : s - > m_PendingIncomingStreams )
2015-12-14 22:23:28 -05:00
if ( it - > GetStatus ( ) = = eStreamStatusOpen ) // still open?
2017-01-12 14:19:57 -05:00
s - > m_Acceptor ( it ) ;
s - > m_PendingIncomingStreams . clear ( ) ;
s - > m_PendingIncomingTimer . cancel ( ) ;
2017-01-28 19:23:14 +03:00
} ) ;
2015-12-14 22:23:28 -05:00
}
2017-01-28 19:23:14 +03:00
void StreamingDestination : : ResetAcceptor ( )
{
if ( m_Acceptor ) m_Acceptor ( nullptr ) ;
m_Acceptor = nullptr ;
2015-12-14 22:23:28 -05:00
}
2016-12-23 10:09:40 -05:00
void StreamingDestination : : AcceptOnce ( const Acceptor & acceptor )
{
2024-11-25 16:00:06 -05:00
boost : : asio : : post ( m_Owner - > GetService ( ) , [ acceptor , this ] ( void )
2017-01-28 19:23:14 +03:00
{
2016-12-23 10:09:40 -05:00
if ( ! m_PendingIncomingStreams . empty ( ) )
2020-03-01 13:25:50 +03:00
{
2016-12-23 10:09:40 -05:00
acceptor ( m_PendingIncomingStreams . front ( ) ) ;
m_PendingIncomingStreams . pop_front ( ) ;
if ( m_PendingIncomingStreams . empty ( ) )
m_PendingIncomingTimer . cancel ( ) ;
}
else // we must save old acceptor and set it back
{
2018-01-06 11:48:51 +08:00
m_Acceptor = std : : bind ( & StreamingDestination : : AcceptOnceAcceptor , this ,
2017-03-30 20:27:31 -04:00
std : : placeholders : : _1 , acceptor , m_Acceptor ) ;
2016-12-23 10:09:40 -05:00
}
2017-01-28 19:23:14 +03:00
} ) ;
2016-12-23 10:09:40 -05:00
}
2017-03-30 20:27:31 -04:00
void StreamingDestination : : AcceptOnceAcceptor ( std : : shared_ptr < Stream > stream , Acceptor acceptor , Acceptor prev )
{
m_Acceptor = prev ;
acceptor ( stream ) ;
2018-01-06 11:48:51 +08:00
}
2022-11-11 13:31:54 -05:00
std : : shared_ptr < Stream > StreamingDestination : : AcceptStream ( int timeout )
{
std : : shared_ptr < i2p : : stream : : Stream > stream ;
std : : condition_variable streamAccept ;
std : : mutex streamAcceptMutex ;
std : : unique_lock < std : : mutex > l ( streamAcceptMutex ) ;
AcceptOnce (
[ & streamAccept , & streamAcceptMutex , & stream ] ( std : : shared_ptr < i2p : : stream : : Stream > s )
{
stream = s ;
std : : unique_lock < std : : mutex > l ( streamAcceptMutex ) ;
streamAccept . notify_all ( ) ;
} ) ;
if ( timeout )
streamAccept . wait_for ( l , std : : chrono : : seconds ( timeout ) ) ;
2023-01-03 21:25:19 +03:00
else
2022-11-11 13:31:54 -05:00
streamAccept . wait ( l ) ;
return stream ;
2023-01-03 21:25:19 +03:00
}
2015-12-14 22:23:28 -05:00
void StreamingDestination : : HandlePendingIncomingTimer ( const boost : : system : : error_code & ecode )
{
if ( ecode ! = boost : : asio : : error : : operation_aborted )
{
2015-12-18 13:33:58 +00:00
LogPrint ( eLogWarning , " Streaming: Pending incoming timeout expired " ) ;
2016-08-09 01:53:37 +03:00
for ( auto & it : m_PendingIncomingStreams )
2015-12-14 22:23:28 -05:00
it - > Close ( ) ;
m_PendingIncomingStreams . clear ( ) ;
2017-01-28 19:23:14 +03:00
}
}
2014-10-22 15:01:30 -04:00
void StreamingDestination : : HandleDataMessagePayload ( const uint8_t * buf , size_t len )
{
// unzip it
2017-01-10 21:31:52 -05:00
Packet * uncompressed = NewPacket ( ) ;
2014-10-22 15:01:30 -04:00
uncompressed - > offset = 0 ;
2015-11-03 09:15:49 -05:00
uncompressed - > len = m_Inflator . Inflate ( buf , len , uncompressed - > buf , MAX_PACKET_SIZE ) ;
if ( uncompressed - > len )
2017-01-28 19:23:14 +03:00
HandleNextPacket ( uncompressed ) ;
2014-10-22 15:01:30 -04:00
else
2017-01-10 21:31:52 -05:00
DeletePacket ( uncompressed ) ;
2014-10-22 15:01:30 -04:00
}
2016-02-29 14:44:15 -05:00
2020-05-02 11:13:40 -04:00
std : : shared_ptr < I2NPMessage > StreamingDestination : : CreateDataMessage (
2022-10-24 19:21:58 -04:00
const uint8_t * payload , size_t len , uint16_t toPort , bool checksum , bool gzip )
2016-02-29 14:44:15 -05:00
{
2021-09-14 14:48:21 +03:00
size_t size ;
2023-03-26 22:12:43 -04:00
auto msg = ( len < = STREAMING_MTU_RATCHETS ) ? m_I2NPMsgsPool . AcquireShared ( ) : NewI2NPMessage ( ) ;
2016-02-29 14:44:15 -05:00
uint8_t * buf = msg - > GetPayload ( ) ;
buf + = 4 ; // reserve for lengthlength
msg - > len + = 4 ;
2021-09-14 14:48:21 +03:00
2022-10-24 19:21:58 -04:00
if ( m_Gzip | | gzip )
size = m_Deflator . Deflate ( payload , len , buf , msg - > maxLen - msg - > len ) ;
2021-09-14 14:48:21 +03:00
else
size = i2p : : data : : GzipNoCompression ( payload , len , buf , msg - > maxLen - msg - > len ) ;
2016-02-29 14:44:15 -05:00
if ( size )
{
htobe32buf ( msg - > GetPayload ( ) , size ) ; // length
htobe16buf ( buf + 4 , m_LocalPort ) ; // source port
2017-01-28 19:23:14 +03:00
htobe16buf ( buf + 6 , toPort ) ; // destination port
2016-02-29 14:44:15 -05:00
buf [ 9 ] = i2p : : client : : PROTOCOL_TYPE_STREAMING ; // streaming protocol
2017-01-28 19:23:14 +03:00
msg - > len + = size ;
2020-05-02 11:13:40 -04:00
msg - > FillI2NPMessageHeader ( eI2NPData , 0 , checksum ) ;
2017-01-28 19:23:14 +03:00
}
2016-02-29 14:44:15 -05:00
else
msg = nullptr ;
return msg ;
2016-07-28 11:16:29 -04:00
}
2024-08-31 08:02:56 -04:00
uint32_t StreamingDestination : : GetRandom ( )
{
if ( m_Owner )
{
auto pool = m_Owner - > GetTunnelPool ( ) ;
if ( pool )
return pool - > GetRng ( ) ( ) ;
}
return rand ( ) ;
}
2017-01-28 19:23:14 +03:00
}
}