@ -903,11 +903,7 @@ namespace stream
@@ -903,11 +903,7 @@ namespace stream
StreamingDestination : : StreamingDestination ( std : : shared_ptr < i2p : : client : : ClientDestination > owner , uint16_t localPort , bool gzip ) :
m_Owner ( owner ) , m_LocalPort ( localPort ) , m_Gzip ( gzip ) ,
m_LastIncomingReceiveStreamID ( 0 ) ,
m_PendingIncomingTimer ( m_Owner - > GetService ( ) ) ,
m_ConnTrackTimer ( m_Owner - > GetService ( ) ) ,
m_ConnsPerMinute ( DEFAULT_MAX_CONNS_PER_MIN ) ,
m_LastBanClear ( i2p : : util : : GetMillisecondsSinceEpoch ( ) ) ,
m_EnableDrop ( false )
m_PendingIncomingTimer ( m_Owner - > GetService ( ) )
{
}
@ -923,7 +919,6 @@ namespace stream
@@ -923,7 +919,6 @@ namespace stream
void StreamingDestination : : Start ( )
{
ScheduleConnTrack ( ) ;
}
void StreamingDestination : : Stop ( )
@ -931,15 +926,10 @@ namespace stream
@@ -931,15 +926,10 @@ namespace stream
ResetAcceptor ( ) ;
m_PendingIncomingTimer . cancel ( ) ;
m_PendingIncomingStreams . clear ( ) ;
m_ConnTrackTimer . cancel ( ) ;
{
std : : unique_lock < std : : mutex > l ( m_StreamsMutex ) ;
m_Streams . clear ( ) ;
}
{
std : : unique_lock < std : : mutex > l ( m_ConnsMutex ) ;
m_Conns . clear ( ) ;
}
}
void StreamingDestination : : HandleNextPacket ( Packet * packet )
@ -971,17 +961,7 @@ namespace stream
@@ -971,17 +961,7 @@ namespace stream
auto incomingStream = CreateNewIncomingStream ( ) ;
incomingStream - > HandleNextPacket ( packet ) ; // SYN
auto ident = incomingStream - > GetRemoteIdentity ( ) ;
if ( ident & & m_EnableDrop )
{
auto ih = ident - > GetIdentHash ( ) ;
if ( DropNewStream ( ih ) )
{
// drop
LogPrint ( eLogWarning , " Streaming: Dropping connection, too many inbound streams from " , ih . ToBase32 ( ) ) ;
incomingStream - > Terminate ( ) ;
return ;
}
}
m_LastIncomingReceiveStreamID = receiveStreamID ;
// handle saved packets if any
@ -1176,63 +1156,5 @@ namespace stream
@@ -1176,63 +1156,5 @@ namespace stream
return msg ;
}
void StreamingDestination : : SetMaxConnsPerMinute ( const uint32_t conns )
{
m_EnableDrop = conns > 0 ;
m_ConnsPerMinute = conns ;
LogPrint ( eLogDebug , " Streaming: Set max conns per minute per destination to " , conns ) ;
}
bool StreamingDestination : : DropNewStream ( const i2p : : data : : IdentHash & ih )
{
std : : lock_guard < std : : mutex > lock ( m_ConnsMutex ) ;
if ( m_Banned . size ( ) > MAX_BANNED_CONNS ) return true ; // overload
auto end = std : : end ( m_Banned ) ;
if ( std : : find ( std : : begin ( m_Banned ) , end , ih ) ! = end ) return true ; // already banned
auto itr = m_Conns . find ( ih ) ;
if ( itr = = m_Conns . end ( ) )
m_Conns [ ih ] = 0 ;
m_Conns [ ih ] + = 1 ;
bool ban = m_Conns [ ih ] > = m_ConnsPerMinute ;
if ( ban )
{
m_Banned . push_back ( ih ) ;
m_Conns . erase ( ih ) ;
LogPrint ( eLogWarning , " Streaming: ban " , ih . ToBase32 ( ) ) ;
}
return ban ;
}
void StreamingDestination : : HandleConnTrack ( const boost : : system : : error_code & ecode )
{
if ( ecode ! = boost : : asio : : error : : operation_aborted )
{
{ // acquire lock
std : : lock_guard < std : : mutex > lock ( m_ConnsMutex ) ;
// clear conn tracking
m_Conns . clear ( ) ;
// check for ban clear
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
if ( ts - m_LastBanClear > = DEFAULT_BAN_INTERVAL )
{
// clear bans
m_Banned . clear ( ) ;
m_LastBanClear = ts ;
}
}
// reschedule timer
ScheduleConnTrack ( ) ;
}
}
void StreamingDestination : : ScheduleConnTrack ( )
{
m_ConnTrackTimer . expires_from_now ( boost : : posix_time : : seconds ( 60 ) ) ;
m_ConnTrackTimer . async_wait (
std : : bind ( & StreamingDestination : : HandleConnTrack ,
shared_from_this ( ) , std : : placeholders : : _1 ) ) ;
}
}
}