@ -25,7 +25,7 @@ namespace transport
{
{
template < typename Keys >
template < typename Keys >
EphemeralKeysSupplier < Keys > : : EphemeralKeysSupplier ( int size ) :
EphemeralKeysSupplier < Keys > : : EphemeralKeysSupplier ( int size ) :
m_QueueSize ( size ) , m_IsRunning ( false ) , m_Thread ( nullptr )
m_QueueSize ( size ) , m_IsRunning ( false )
{
{
}
}
@ -39,7 +39,7 @@ namespace transport
void EphemeralKeysSupplier < Keys > : : Start ( )
void EphemeralKeysSupplier < Keys > : : Start ( )
{
{
m_IsRunning = true ;
m_IsRunning = true ;
m_Thread = new std : : thread ( std : : bind ( & EphemeralKeysSupplier < Keys > : : Run , this ) ) ;
m_Thread . reset ( new std : : thread ( std : : bind ( & EphemeralKeysSupplier < Keys > : : Run , this ) ) ) ;
}
}
template < typename Keys >
template < typename Keys >
@ -53,8 +53,7 @@ namespace transport
if ( m_Thread )
if ( m_Thread )
{
{
m_Thread - > join ( ) ;
m_Thread - > join ( ) ;
delete m_Thread ;
m_Thread = nullptr ;
m_Thread = 0 ;
}
}
}
}
@ -66,18 +65,19 @@ namespace transport
while ( m_IsRunning )
while ( m_IsRunning )
{
{
int num , total = 0 ;
int num , total = 0 ;
while ( ( num = m_QueueSize - ( int ) m_Queue . size ( ) ) > 0 & & total < 10 )
while ( ( num = m_QueueSize - ( int ) m_Queue . size ( ) ) > 0 & & total < m_QueueSize )
{
{
CreateEphemeralKeys ( num ) ;
CreateEphemeralKeys ( num ) ;
total + = num ;
total + = num ;
}
}
if ( total > = 10 )
if ( total > m_QueueSize )
{
{
LogPrint ( eLogWarning , " Transports: " , total , " ephemeral keys generated at the time " ) ;
LogPrint ( eLogWarning , " Transports: " , total , " ephemeral keys generated at the time " ) ;
std : : this_thread : : sleep_for ( std : : chrono : : seconds ( 1 ) ) ; // take a break
std : : this_thread : : sleep_for ( std : : chrono : : seconds ( 1 ) ) ; // take a break
}
}
else
else
{
{
m_KeysPool . CleanUpMt ( ) ;
std : : unique_lock < std : : mutex > l ( m_AcquiredMutex ) ;
std : : unique_lock < std : : mutex > l ( m_AcquiredMutex ) ;
if ( ! m_IsRunning ) break ;
if ( ! m_IsRunning ) break ;
m_Acquired . wait ( l ) ; // wait for element gets acquired
m_Acquired . wait ( l ) ; // wait for element gets acquired
@ -92,7 +92,7 @@ namespace transport
{
{
for ( int i = 0 ; i < num ; i + + )
for ( int i = 0 ; i < num ; i + + )
{
{
auto pair = std : : make_shared < Keys > ( ) ;
auto pair = m_KeysPool . AcquireSharedMt ( ) ;
pair - > GenerateKeys ( ) ;
pair - > GenerateKeys ( ) ;
std : : unique_lock < std : : mutex > l ( m_AcquiredMutex ) ;
std : : unique_lock < std : : mutex > l ( m_AcquiredMutex ) ;
m_Queue . push ( pair ) ;
m_Queue . push ( pair ) ;
@ -114,7 +114,7 @@ namespace transport
}
}
}
}
// queue is empty, create new
// queue is empty, create new
auto pair = std : : make_shared < Keys > ( ) ;
auto pair = m_KeysPool . AcquireSharedMt ( ) ;
pair - > GenerateKeys ( ) ;
pair - > GenerateKeys ( ) ;
return pair ;
return pair ;
}
}
@ -124,12 +124,12 @@ namespace transport
{
{
if ( pair )
if ( pair )
{
{
std : : unique_lock < std : : mutex > l ( m_AcquiredMutex ) ;
std : : unique_lock < std : : mutex > l ( m_AcquiredMutex ) ;
if ( ( int ) m_Queue . size ( ) < 2 * m_QueueSize )
if ( ( int ) m_Queue . size ( ) < 2 * m_QueueSize )
m_Queue . push ( pair ) ;
m_Queue . push ( pair ) ;
}
}
else
else
LogPrint ( eLogError , " Transports: Return null DHK eys " ) ;
LogPrint ( eLogError , " Transports: Return null k eys " ) ;
}
}
void Peer : : UpdateParams ( std : : shared_ptr < const i2p : : data : : RouterInfo > router )
void Peer : : UpdateParams ( std : : shared_ptr < const i2p : : data : : RouterInfo > router )
@ -149,7 +149,7 @@ namespace transport
m_IsOnline ( true ) , m_IsRunning ( false ) , m_IsNAT ( true ) , m_CheckReserved ( true ) , m_Thread ( nullptr ) ,
m_IsOnline ( true ) , m_IsRunning ( false ) , m_IsNAT ( true ) , m_CheckReserved ( true ) , m_Thread ( nullptr ) ,
m_Service ( nullptr ) , m_Work ( nullptr ) , m_PeerCleanupTimer ( nullptr ) , m_PeerTestTimer ( nullptr ) ,
m_Service ( nullptr ) , m_Work ( nullptr ) , m_PeerCleanupTimer ( nullptr ) , m_PeerTestTimer ( nullptr ) ,
m_UpdateBandwidthTimer ( nullptr ) , m_SSU2Server ( nullptr ) , m_NTCP2Server ( nullptr ) ,
m_UpdateBandwidthTimer ( nullptr ) , m_SSU2Server ( nullptr ) , m_NTCP2Server ( nullptr ) ,
m_X25519KeysPairSupplier ( 15 ) , // 15 pre-generated keys
m_X25519KeysPairSupplier ( NUM_X25519_PRE_GENERATED_KEYS ) ,
m_TotalSentBytes ( 0 ) , m_TotalReceivedBytes ( 0 ) , m_TotalTransitTransmittedBytes ( 0 ) ,
m_TotalSentBytes ( 0 ) , m_TotalReceivedBytes ( 0 ) , m_TotalTransitTransmittedBytes ( 0 ) ,
m_InBandwidth ( 0 ) , m_OutBandwidth ( 0 ) , m_TransitBandwidth ( 0 ) ,
m_InBandwidth ( 0 ) , m_OutBandwidth ( 0 ) , m_TransitBandwidth ( 0 ) ,
m_InBandwidth15s ( 0 ) , m_OutBandwidth15s ( 0 ) , m_TransitBandwidth15s ( 0 ) ,
m_InBandwidth15s ( 0 ) , m_OutBandwidth15s ( 0 ) , m_TransitBandwidth15s ( 0 ) ,
@ -450,15 +450,25 @@ namespace transport
void Transports : : SendMessage ( const i2p : : data : : IdentHash & ident , std : : shared_ptr < i2p : : I2NPMessage > msg )
void Transports : : SendMessage ( const i2p : : data : : IdentHash & ident , std : : shared_ptr < i2p : : I2NPMessage > msg )
{
{
if ( m_IsOnline )
if ( m_IsOnline )
SendMessages ( ident , std : : vector < std : : shared_ptr < i2p : : I2NPMessage > > { msg } ) ;
SendMessages ( ident , { msg } ) ;
}
}
void Transports : : SendMessages ( const i2p : : data : : IdentHash & ident , const std : : vector < std : : shared_ptr < i2p : : I2NPMessage > > & msgs )
void Transports : : SendMessages ( const i2p : : data : : IdentHash & ident , std : : list < std : : shared_ptr < i2p : : I2NPMessage > > & msgs )
{
{
m_Service - > post ( std : : bind ( & Transports : : PostMessages , this , ident , msgs ) ) ;
std : : list < std : : shared_ptr < i2p : : I2NPMessage > > msgs1 ;
msgs . swap ( msgs1 ) ;
SendMessages ( ident , std : : move ( msgs1 ) ) ;
}
}
void Transports : : PostMessages ( i2p : : data : : IdentHash ident , std : : vector < std : : shared_ptr < i2p : : I2NPMessage > > msgs )
void Transports : : SendMessages ( const i2p : : data : : IdentHash & ident , std : : list < std : : shared_ptr < i2p : : I2NPMessage > > & & msgs )
{
m_Service - > post ( [ this , ident , msgs = std : : move ( msgs ) ] ( ) mutable
{
PostMessages ( ident , msgs ) ;
} ) ;
}
void Transports : : PostMessages ( const i2p : : data : : IdentHash & ident , std : : list < std : : shared_ptr < i2p : : I2NPMessage > > & msgs )
{
{
if ( ident = = i2p : : context . GetRouterInfo ( ) . GetIdentHash ( ) )
if ( ident = = i2p : : context . GetRouterInfo ( ) . GetIdentHash ( ) )
{
{
@ -470,8 +480,13 @@ namespace transport
}
}
if ( RoutesRestricted ( ) & & ! IsRestrictedPeer ( ident ) ) return ;
if ( RoutesRestricted ( ) & & ! IsRestrictedPeer ( ident ) ) return ;
std : : shared_ptr < Peer > peer ;
std : : shared_ptr < Peer > peer ;
{
std : : lock_guard < std : : mutex > l ( m_PeersMutex ) ;
auto it = m_Peers . find ( ident ) ;
auto it = m_Peers . find ( ident ) ;
if ( it = = m_Peers . end ( ) )
if ( it ! = m_Peers . end ( ) )
peer = it - > second ;
}
if ( ! peer )
{
{
// check if not banned
// check if not banned
if ( i2p : : data : : IsRouterBanned ( ident ) ) return ; // don't create peer to unreachable router
if ( i2p : : data : : IsRouterBanned ( ident ) ) return ; // don't create peer to unreachable router
@ -481,10 +496,10 @@ namespace transport
{
{
auto r = netdb . FindRouter ( ident ) ;
auto r = netdb . FindRouter ( ident ) ;
if ( r & & ( r - > IsUnreachable ( ) | | ! r - > IsReachableFrom ( i2p : : context . GetRouterInfo ( ) ) ) ) return ; // router found but non-reachable
if ( r & & ( r - > IsUnreachable ( ) | | ! r - > IsReachableFrom ( i2p : : context . GetRouterInfo ( ) ) ) ) return ; // router found but non-reachable
peer = std : : make_shared < Peer > ( r , i2p : : util : : GetSecondsSinceEpoch ( ) ) ;
{
{
auto ts = i2p : : util : : GetSecondsSinceEpoch ( ) ;
std : : lock_guard < std : : mutex > l ( m_PeersMutex ) ;
peer = std : : make_shared < Peer > ( r , ts ) ;
std : : unique_lock < std : : mutex > l ( m_PeersMutex ) ;
peer = m_Peers . emplace ( ident , peer ) . first - > second ;
peer = m_Peers . emplace ( ident , peer ) . first - > second ;
}
}
if ( peer )
if ( peer )
@ -496,8 +511,6 @@ namespace transport
}
}
if ( ! connected ) return ;
if ( ! connected ) return ;
}
}
else
peer = it - > second ;
if ( ! peer ) return ;
if ( ! peer ) return ;
if ( peer - > IsConnected ( ) )
if ( peer - > IsConnected ( ) )
@ -512,22 +525,27 @@ namespace transport
if ( i2p : : data : : IsRouterBanned ( ident ) )
if ( i2p : : data : : IsRouterBanned ( ident ) )
{
{
LogPrint ( eLogWarning , " Transports: Router " , ident . ToBase64 ( ) , " is banned. Peer dropped " ) ;
LogPrint ( eLogWarning , " Transports: Router " , ident . ToBase64 ( ) , " is banned. Peer dropped " ) ;
std : : unique_ lock< std : : mutex > l ( m_PeersMutex ) ;
std : : lock_guard < std : : mutex > l ( m_PeersMutex ) ;
m_Peers . erase ( ident ) ;
m_Peers . erase ( ident ) ;
return ;
return ;
}
}
}
}
if ( sz > MAX_NUM_DELAYED_MESSAGES / 2 )
{
for ( auto & it1 : msgs )
for ( auto & it1 : msgs )
if ( sz > MAX_NUM_DELAYED_MESSAGES / 2 & & it1 - > onDrop )
if ( it1 - > onDrop )
it1 - > Drop ( ) ; // drop earlier because we can handle it
it1 - > Drop ( ) ; // drop earlier because we can handle it
else
else
peer - > delayedMessages . push_back ( it1 ) ;
peer - > delayedMessages . push_back ( it1 ) ;
}
}
else
peer - > delayedMessages . splice ( peer - > delayedMessages . end ( ) , msgs ) ;
}
else
else
{
{
LogPrint ( eLogWarning , " Transports: Delayed messages queue size to " ,
LogPrint ( eLogWarning , " Transports: Delayed messages queue size to " ,
ident . ToBase64 ( ) , " exceeds " , MAX_NUM_DELAYED_MESSAGES ) ;
ident . ToBase64 ( ) , " exceeds " , MAX_NUM_DELAYED_MESSAGES ) ;
std : : unique_ lock< std : : mutex > l ( m_PeersMutex ) ;
std : : lock_guard < std : : mutex > l ( m_PeersMutex ) ;
m_Peers . erase ( ident ) ;
m_Peers . erase ( ident ) ;
}
}
}
}
@ -602,7 +620,7 @@ namespace transport
if ( ! i2p : : context . IsLimitedConnectivity ( ) & & peer - > router - > IsReachableFrom ( i2p : : context . GetRouterInfo ( ) ) )
if ( ! i2p : : context . IsLimitedConnectivity ( ) & & peer - > router - > IsReachableFrom ( i2p : : context . GetRouterInfo ( ) ) )
i2p : : data : : netdb . SetUnreachable ( ident , true ) ; // we are here because all connection attempts failed but router claimed them
i2p : : data : : netdb . SetUnreachable ( ident , true ) ; // we are here because all connection attempts failed but router claimed them
peer - > Done ( ) ;
peer - > Done ( ) ;
std : : unique_ lock< std : : mutex > l ( m_PeersMutex ) ;
std : : lock_guard < std : : mutex > l ( m_PeersMutex ) ;
m_Peers . erase ( ident ) ;
m_Peers . erase ( ident ) ;
return false ;
return false ;
}
}
@ -610,7 +628,7 @@ namespace transport
{
{
LogPrint ( eLogWarning , " Transports: Router " , ident . ToBase64 ( ) , " is banned. Peer dropped " ) ;
LogPrint ( eLogWarning , " Transports: Router " , ident . ToBase64 ( ) , " is banned. Peer dropped " ) ;
peer - > Done ( ) ;
peer - > Done ( ) ;
std : : unique_ lock< std : : mutex > l ( m_PeersMutex ) ;
std : : lock_guard < std : : mutex > l ( m_PeersMutex ) ;
m_Peers . erase ( ident ) ;
m_Peers . erase ( ident ) ;
return false ;
return false ;
}
}
@ -706,23 +724,29 @@ namespace transport
void Transports : : HandleRequestComplete ( std : : shared_ptr < const i2p : : data : : RouterInfo > r , i2p : : data : : IdentHash ident )
void Transports : : HandleRequestComplete ( std : : shared_ptr < const i2p : : data : : RouterInfo > r , i2p : : data : : IdentHash ident )
{
{
std : : shared_ptr < Peer > peer ;
{
std : : lock_guard < std : : mutex > l ( m_PeersMutex ) ;
auto it = m_Peers . find ( ident ) ;
auto it = m_Peers . find ( ident ) ;
if ( it ! = m_Peers . end ( ) )
if ( it ! = m_Peers . end ( ) )
{
{
if ( r )
if ( r )
{
peer = it - > second ;
LogPrint ( eLogDebug , " Transports: RouterInfo for " , ident . ToBase64 ( ) , " found, trying to connect " ) ;
it - > second - > SetRouter ( r ) ;
if ( ! it - > second - > IsConnected ( ) )
ConnectToPeer ( ident , it - > second ) ;
}
else
else
{
LogPrint ( eLogWarning , " Transports: RouterInfo not found, failed to send messages " ) ;
std : : unique_lock < std : : mutex > l ( m_PeersMutex ) ;
m_Peers . erase ( it ) ;
m_Peers . erase ( it ) ;
}
}
}
}
if ( peer & & ! peer - > router & & r )
{
LogPrint ( eLogDebug , " Transports: RouterInfo for " , ident . ToBase64 ( ) , " found, trying to connect " ) ;
peer - > SetRouter ( r ) ;
if ( ! peer - > IsConnected ( ) )
ConnectToPeer ( ident , peer ) ;
}
else if ( ! r )
LogPrint ( eLogInfo , " Transports: RouterInfo not found, failed to send messages " ) ;
}
}
void Transports : : DetectExternalIP ( )
void Transports : : DetectExternalIP ( )
@ -865,7 +889,7 @@ namespace transport
if ( it - > second - > delayedMessages . size ( ) > 0 )
if ( it - > second - > delayedMessages . size ( ) > 0 )
{
{
// check if first message is our DatabaseStore (publishing)
// check if first message is our DatabaseStore (publishing)
auto firstMsg = peer - > delayedMessages [ 0 ] ;
auto firstMsg = peer - > delayedMessages . front ( ) ;
if ( firstMsg & & firstMsg - > GetTypeID ( ) = = eI2NPDatabaseStore & &
if ( firstMsg & & firstMsg - > GetTypeID ( ) = = eI2NPDatabaseStore & &
i2p : : data : : IdentHash ( firstMsg - > GetPayload ( ) + DATABASE_STORE_KEY_OFFSET ) = = i2p : : context . GetIdentHash ( ) )
i2p : : data : : IdentHash ( firstMsg - > GetPayload ( ) + DATABASE_STORE_KEY_OFFSET ) = = i2p : : context . GetIdentHash ( ) )
sendDatabaseStore = false ; // we have it in the list already
sendDatabaseStore = false ; // we have it in the list already
@ -875,8 +899,7 @@ namespace transport
else
else
session - > SetTerminationTimeout ( 10 ) ; // most likely it's publishing, no follow-up messages expected, set timeout to 10 seconds
session - > SetTerminationTimeout ( 10 ) ; // most likely it's publishing, no follow-up messages expected, set timeout to 10 seconds
peer - > sessions . push_back ( session ) ;
peer - > sessions . push_back ( session ) ;
session - > SendI2NPMessages ( peer - > delayedMessages ) ;
session - > SendI2NPMessages ( peer - > delayedMessages ) ; // send and clear
peer - > delayedMessages . clear ( ) ;
}
}
else // incoming connection or peer test
else // incoming connection or peer test
{
{
@ -887,14 +910,17 @@ namespace transport
return ;
return ;
}
}
if ( ! session - > IsOutgoing ( ) ) // incoming
if ( ! session - > IsOutgoing ( ) ) // incoming
session - > SendI2NPMessages ( { CreateDatabaseStoreMsg ( ) } ) ; // send DatabaseStore
{
std : : list < std : : shared_ptr < I2NPMessage > > msgs { CreateDatabaseStoreMsg ( ) } ;
session - > SendI2NPMessages ( msgs ) ; // send DatabaseStore
}
auto r = i2p : : data : : netdb . FindRouter ( ident ) ; // router should be in netdb after SessionConfirmed
auto r = i2p : : data : : netdb . FindRouter ( ident ) ; // router should be in netdb after SessionConfirmed
if ( r ) r - > GetProfile ( ) - > Connected ( ) ;
if ( r ) r - > GetProfile ( ) - > Connected ( ) ;
auto ts = i2p : : util : : GetSecondsSinceEpoch ( ) ;
auto ts = i2p : : util : : GetSecondsSinceEpoch ( ) ;
auto peer = std : : make_shared < Peer > ( r , ts ) ;
auto peer = std : : make_shared < Peer > ( r , ts ) ;
peer - > sessions . push_back ( session ) ;
peer - > sessions . push_back ( session ) ;
peer - > router = nullptr ;
peer - > router = nullptr ;
std : : unique_ lock< std : : mutex > l ( m_PeersMutex ) ;
std : : lock_guard < std : : mutex > l ( m_PeersMutex ) ;
m_Peers . emplace ( ident , peer ) ;
m_Peers . emplace ( ident , peer ) ;
}
}
} ) ;
} ) ;
@ -923,7 +949,7 @@ namespace transport
}
}
else
else
{
{
std : : unique_ lock< std : : mutex > l ( m_PeersMutex ) ;
std : : lock_guard < std : : mutex > l ( m_PeersMutex ) ;
m_Peers . erase ( it ) ;
m_Peers . erase ( it ) ;
}
}
}
}
@ -933,9 +959,13 @@ namespace transport
bool Transports : : IsConnected ( const i2p : : data : : IdentHash & ident ) const
bool Transports : : IsConnected ( const i2p : : data : : IdentHash & ident ) const
{
{
std : : unique_lock < std : : mutex > l ( m_PeersMutex ) ;
std : : lock_guard < std : : mutex > l ( m_PeersMutex ) ;
# if __cplusplus >= 202002L // C++20
return m_Peers . contains ( ident ) ;
# else
auto it = m_Peers . find ( ident ) ;
auto it = m_Peers . find ( ident ) ;
return it ! = m_Peers . end ( ) ;
return it ! = m_Peers . end ( ) ;
# endif
}
}
void Transports : : HandlePeerCleanupTimer ( const boost : : system : : error_code & ecode )
void Transports : : HandlePeerCleanupTimer ( const boost : : system : : error_code & ecode )
@ -959,7 +989,7 @@ namespace transport
auto profile = i2p : : data : : GetRouterProfile ( it - > first ) ;
auto profile = i2p : : data : : GetRouterProfile ( it - > first ) ;
if ( profile ) profile - > Unreachable ( ) ;
if ( profile ) profile - > Unreachable ( ) ;
} */
} */
std : : unique_ lock< std : : mutex > l ( m_PeersMutex ) ;
std : : lock_guard < std : : mutex > l ( m_PeersMutex ) ;
it = m_Peers . erase ( it ) ;
it = m_Peers . erase ( it ) ;
}
}
else
else
@ -1009,7 +1039,7 @@ namespace transport
{
{
uint16_t inds [ 3 ] ;
uint16_t inds [ 3 ] ;
RAND_bytes ( ( uint8_t * ) inds , sizeof ( inds ) ) ;
RAND_bytes ( ( uint8_t * ) inds , sizeof ( inds ) ) ;
std : : unique_ lock< std : : mutex > l ( m_PeersMutex ) ;
std : : lock_guard < std : : mutex > l ( m_PeersMutex ) ;
auto count = m_Peers . size ( ) ;
auto count = m_Peers . size ( ) ;
if ( count = = 0 ) return nullptr ;
if ( count = = 0 ) return nullptr ;
inds [ 0 ] % = count ;
inds [ 0 ] % = count ;