@ -15,8 +15,8 @@ namespace i2p
@@ -15,8 +15,8 @@ namespace i2p
{
namespace client
{
SAMSocket : : SAMSocket ( SAMBridge & owner , std : : shared_ptr < Socket_t > socket ) :
m_Owner ( owner ) , m_Socket ( socket ) , m_Timer ( m_Owner . GetService ( ) ) ,
SAMSocket : : SAMSocket ( SAMBridge & owner ) :
m_Owner ( owner ) , m_Socket ( owner . GetService ( ) ) , m_Timer ( m_Owner . GetService ( ) ) ,
m_BufferOffset ( 0 ) ,
m_SocketType ( eSAMSocketTypeUnknown ) , m_IsSilent ( false ) ,
m_IsAccepting ( false ) , m_Stream ( nullptr )
@ -25,51 +25,17 @@ namespace client
@@ -25,51 +25,17 @@ namespace client
SAMSocket : : ~ SAMSocket ( )
{
if ( m_Stream )
{
m_Stream - > Close ( ) ;
m_Stream . reset ( ) ;
}
auto Session = m_Owner . FindSession ( m_ID ) ;
switch ( m_SocketType )
{
case eSAMSocketTypeSession :
m_Owner . CloseSession ( m_ID ) ;
break ;
case eSAMSocketTypeStream :
{
if ( Session )
Session - > DelSocket ( this ) ;
break ;
}
case eSAMSocketTypeAcceptor :
{
if ( Session )
{
Session - > DelSocket ( this ) ;
if ( m_IsAccepting & & Session - > localDestination )
Session - > localDestination - > StopAcceptingStreams ( ) ;
}
break ;
}
default :
;
}
m_SocketType = eSAMSocketTypeTerminated ;
if ( m_Socket & & m_Socket - > is_open ( ) ) m_Socket - > close ( ) ;
m_Socket . reset ( ) ;
m_Stream = nullptr ;
}
void SAMSocket : : Terminate ( const char * reason )
{
if ( m_Stream )
{
m_Stream - > Close ( ) ;
m_Stream . reset ( ) ;
m_Stream - > AsyncClose ( ) ;
m_Stream = nullptr ;
}
auto Session = m_Owner . FindSession ( m_ID ) ;
switch ( m_SocketType )
{
case eSAMSocketTypeSession :
@ -77,15 +43,12 @@ namespace client
@@ -77,15 +43,12 @@ namespace client
break ;
case eSAMSocketTypeStream :
{
if ( Session )
Session - > DelSocket ( this ) ;
break ;
}
case eSAMSocketTypeAcceptor :
{
if ( Session )
{
Session - > DelSocket ( this ) ;
if ( m_IsAccepting & & Session - > localDestination )
Session - > localDestination - > StopAcceptingStreams ( ) ;
}
@ -95,14 +58,18 @@ namespace client
@@ -95,14 +58,18 @@ namespace client
;
}
m_SocketType = eSAMSocketTypeTerminated ;
if ( m_Socket & & m_Socket - > is_open ( ) ) m_Socket - > close ( ) ;
m_Socket . reset ( ) ;
if ( m_Socket . is_open ( ) )
{
boost : : system : : error_code ec ;
m_Socket . shutdown ( boost : : asio : : ip : : tcp : : socket : : shutdown_both , ec ) ;
m_Socket . close ( ) ;
}
m_Owner . RemoveSocket ( shared_from_this ( ) ) ;
}
void SAMSocket : : ReceiveHandshake ( )
{
if ( m_Socket )
m_Socket - > async_read_some ( boost : : asio : : buffer ( m_Buffer , SAM_SOCKET_BUFFER_SIZE ) ,
m_Socket . async_read_some ( boost : : asio : : buffer ( m_Buffer , SAM_SOCKET_BUFFER_SIZE ) ,
std : : bind ( & SAMSocket : : HandleHandshakeReceived , shared_from_this ( ) ,
std : : placeholders : : _1 , std : : placeholders : : _2 ) ) ;
}
@ -184,7 +151,7 @@ namespace client
@@ -184,7 +151,7 @@ namespace client
# else
size_t l = snprintf ( m_Buffer , SAM_SOCKET_BUFFER_SIZE , SAM_HANDSHAKE_REPLY , version . c_str ( ) ) ;
# endif
boost : : asio : : async_write ( * m_Socket , boost : : asio : : buffer ( m_Buffer , l ) , boost : : asio : : transfer_all ( ) ,
boost : : asio : : async_write ( m_Socket , boost : : asio : : buffer ( m_Buffer , l ) , boost : : asio : : transfer_all ( ) ,
std : : bind ( & SAMSocket : : HandleHandshakeReplySent , shared_from_this ( ) ,
std : : placeholders : : _1 , std : : placeholders : : _2 ) ) ;
}
@ -199,6 +166,11 @@ namespace client
@@ -199,6 +166,11 @@ namespace client
}
}
bool SAMSocket : : IsSession ( const std : : string & id ) const
{
return id = = m_ID ;
}
void SAMSocket : : HandleHandshakeReplySent ( const boost : : system : : error_code & ecode , std : : size_t bytes_transferred )
{
if ( ecode )
@ -207,9 +179,9 @@ namespace client
@@ -207,9 +179,9 @@ namespace client
if ( ecode ! = boost : : asio : : error : : operation_aborted )
Terminate ( " SAM: handshake reply send error " ) ;
}
else if ( m_Socket )
else
{
m_Socket - > async_read_some ( boost : : asio : : buffer ( m_Buffer , SAM_SOCKET_BUFFER_SIZE ) ,
m_Socket . async_read_some ( boost : : asio : : buffer ( m_Buffer , SAM_SOCKET_BUFFER_SIZE ) ,
std : : bind ( & SAMSocket : : HandleMessage , shared_from_this ( ) ,
std : : placeholders : : _1 , std : : placeholders : : _2 ) ) ;
}
@ -220,7 +192,7 @@ namespace client
@@ -220,7 +192,7 @@ namespace client
LogPrint ( eLogDebug , " SAMSocket::SendMessageReply, close= " , close ? " true " : " false " , " reason: " , msg ) ;
if ( ! m_IsSilent )
boost : : asio : : async_write ( * m_Socket , boost : : asio : : buffer ( msg , len ) , boost : : asio : : transfer_all ( ) ,
boost : : asio : : async_write ( m_Socket , boost : : asio : : buffer ( msg , len ) , boost : : asio : : transfer_all ( ) ,
std : : bind ( & SAMSocket : : HandleMessageReplySent , shared_from_this ( ) ,
std : : placeholders : : _1 , std : : placeholders : : _2 , close ) ) ;
else
@ -501,7 +473,6 @@ namespace client
@@ -501,7 +473,6 @@ namespace client
if ( session )
{
m_SocketType = eSAMSocketTypeStream ;
session - > AddSocket ( shared_from_this ( ) ) ;
m_Stream = session - > localDestination - > CreateStream ( remote ) ;
m_Stream - > Send ( ( uint8_t * ) m_Buffer , m_BufferOffset ) ; // connect and send
m_BufferOffset = 0 ;
@ -534,7 +505,6 @@ namespace client
@@ -534,7 +505,6 @@ namespace client
if ( session )
{
m_SocketType = eSAMSocketTypeAcceptor ;
session - > AddSocket ( shared_from_this ( ) ) ;
if ( ! session - > localDestination - > IsAcceptingStreams ( ) )
{
m_IsAccepting = true ;
@ -704,17 +674,9 @@ namespace client
@@ -704,17 +674,9 @@ namespace client
void SAMSocket : : Receive ( )
{
if ( m_BufferOffset > = SAM_SOCKET_BUFFER_SIZE )
{
LogPrint ( eLogError , " SAM: Buffer is full, terminate " ) ;
Terminate ( " Buffer is full " ) ;
return ;
} else if ( m_Socket )
m_Socket - > async_read_some ( boost : : asio : : buffer ( m_Buffer + m_BufferOffset , SAM_SOCKET_BUFFER_SIZE - m_BufferOffset ) ,
m_Socket . async_read_some ( boost : : asio : : buffer ( m_Buffer + m_BufferOffset , SAM_SOCKET_BUFFER_SIZE - m_BufferOffset ) ,
std : : bind ( ( m_SocketType = = eSAMSocketTypeStream ) ? & SAMSocket : : HandleReceived : & SAMSocket : : HandleMessage ,
shared_from_this ( ) , std : : placeholders : : _1 , std : : placeholders : : _2 ) ) ;
else
LogPrint ( eLogError , " SAM: receive with no native socket " ) ;
}
void SAMSocket : : HandleReceived ( const boost : : system : : error_code & ecode , std : : size_t bytes_transferred )
@ -731,15 +693,12 @@ namespace client
@@ -731,15 +693,12 @@ namespace client
{
bytes_transferred + = m_BufferOffset ;
m_BufferOffset = 0 ;
auto s = shared_from_this ( ) ;
m_Stream - > AsyncSend ( ( uint8_t * ) m_Buffer , bytes_transferred ,
[ s ] ( const boost : : system : : error_code & ecode )
{
if ( ! ecode )
s - > m_Owner . GetService ( ) . post ( [ s ] { s - > Receive ( ) ; } ) ;
std : : bind ( & SAMSocket : : HandleStreamSend , shared_from_this ( ) , std : : placeholders : : _1 ) ) ;
}
else
s - > m_Owner . GetService ( ) . post ( [ s ] { s - > Terminate ( " AsyncSend failed " ) ; } ) ;
} ) ;
{
Terminate ( " No Stream Remaining " ) ;
}
}
}
@ -766,21 +725,21 @@ namespace client
@@ -766,21 +725,21 @@ namespace client
WriteI2PDataImmediate ( buff , len ) ;
}
else // no more data
{
delete [ ] buff ;
Terminate ( " no more data " ) ;
}
}
}
}
void SAMSocket : : WriteI2PDataImmediate ( uint8_t * buff , size_t sz )
{
if ( m_Socket )
boost : : asio : : async_write (
* m_Socket ,
m_Socket ,
boost : : asio : : buffer ( buff , sz ) ,
boost : : asio : : transfer_all ( ) ,
std : : bind ( & SAMSocket : : HandleWriteI2PDataImmediate , shared_from_this ( ) , std : : placeholders : : _1 , buff ) ) ; // postpone termination
else
LogPrint ( eLogError , " SAM: no native socket " ) ;
}
void SAMSocket : : HandleWriteI2PDataImmediate ( const boost : : system : : error_code & ec , uint8_t * buff )
@ -790,9 +749,11 @@ namespace client
@@ -790,9 +749,11 @@ namespace client
void SAMSocket : : WriteI2PData ( size_t sz )
{
uint8_t * sendbuff = new uint8_t [ sz ] ;
memcpy ( sendbuff , m_StreamBuffer , sz ) ;
WriteI2PDataImmediate ( sendbuff , sz ) ;
boost : : asio : : async_write (
m_Socket ,
boost : : asio : : buffer ( m_StreamBuffer , sz ) ,
boost : : asio : : transfer_all ( ) ,
std : : bind ( & SAMSocket : : HandleWriteI2PData , shared_from_this ( ) , std : : placeholders : : _1 , std : : placeholders : : _2 ) ) ;
}
void SAMSocket : : HandleI2PReceive ( const boost : : system : : error_code & ecode , std : : size_t bytes_transferred )
@ -826,6 +787,7 @@ namespace client
@@ -826,6 +787,7 @@ namespace client
{
WriteI2PData ( bytes_transferred ) ;
}
else
I2PReceive ( ) ;
}
}
@ -858,7 +820,7 @@ namespace client
@@ -858,7 +820,7 @@ namespace client
if ( session )
{
// find more pending acceptors
for ( auto it : session - > ListSockets ( ) )
for ( auto & it : m_Owner . ListSockets ( m_ID ) )
if ( it - > m_SocketType = = eSAMSocketTypeAcceptor )
{
it - > m_IsAccepting = true ;
@ -930,29 +892,30 @@ namespace client
@@ -930,29 +892,30 @@ namespace client
}
}
SAMSession : : SAMSession ( std : : shared_ptr < ClientDestination > dest ) :
void SAMSocket : : HandleStreamSend ( const boost : : system : : error_code & ec )
{
m_Owner . GetService ( ) . post ( std : : bind ( ! ec ? & SAMSocket : : Receive : & SAMSocket : : TerminateClose , shared_from_this ( ) ) ) ;
}
SAMSession : : SAMSession ( SAMBridge & parent , const std : : string & id , std : : shared_ptr < ClientDestination > dest ) :
m_Bridge ( parent ) ,
localDestination ( dest ) ,
UDPEndpoint ( nullptr )
UDPEndpoint ( nullptr ) ,
Name ( id )
{
}
SAMSession : : ~ SAMSession ( )
{
CloseStreams ( ) ;
i2p : : client : : context . DeleteLocalDestination ( localDestination ) ;
}
void SAMSession : : CloseStreams ( )
{
std : : vector < std : : shared_ptr < SAMSocket > > socks ;
for ( const auto & itr : m_Bridge . ListSockets ( Name ) )
{
std : : lock_guard < std : : mutex > lock ( m_SocketsMutex ) ;
for ( const auto & sock : m_Sockets ) {
socks . push_back ( sock ) ;
}
itr - > Terminate ( nullptr ) ;
}
for ( auto & sock : socks ) sock - > Terminate ( " SAMSession::CloseStreams() " ) ;
m_Sockets . clear ( ) ;
}
SAMBridge : : SAMBridge ( const std : : string & address , int port ) :
@ -1009,12 +972,17 @@ namespace client
@@ -1009,12 +972,17 @@ namespace client
void SAMBridge : : Accept ( )
{
auto native = std : : make_shared < boost : : asio : : ip : : tcp : : socket > ( m_Service ) ;
auto newSocket = std : : make_shared < SAMSocket > ( * this , native ) ;
m_Acceptor . async_accept ( * native , std : : bind ( & SAMBridge : : HandleAccept , this ,
auto newSocket = std : : make_shared < SAMSocket > ( * this ) ;
m_Acceptor . async_accept ( newSocket - > GetSocket ( ) , std : : bind ( & SAMBridge : : HandleAccept , this ,
std : : placeholders : : _1 , newSocket ) ) ;
}
void SAMBridge : : RemoveSocket ( const std : : shared_ptr < SAMSocket > & socket )
{
std : : unique_lock < std : : mutex > lock ( m_OpenSocketsMutex ) ;
m_OpenSockets . remove_if ( [ socket ] ( const std : : shared_ptr < SAMSocket > & item ) - > bool { return item = = socket ; } ) ;
}
void SAMBridge : : HandleAccept ( const boost : : system : : error_code & ecode , std : : shared_ptr < SAMSocket > socket )
{
if ( ! ecode )
@ -1024,6 +992,10 @@ namespace client
@@ -1024,6 +992,10 @@ namespace client
if ( ! ec )
{
LogPrint ( eLogDebug , " SAM: new connection from " , ep ) ;
{
std : : unique_lock < std : : mutex > l ( m_OpenSocketsMutex ) ;
m_OpenSockets . push_back ( socket ) ;
}
socket - > ReceiveHandshake ( ) ;
}
else
@ -1066,7 +1038,7 @@ namespace client
@@ -1066,7 +1038,7 @@ namespace client
if ( localDestination )
{
localDestination - > Acquire ( ) ;
auto session = std : : make_shared < SAMSession > ( localDestination ) ;
auto session = std : : make_shared < SAMSession > ( * this , id , localDestination ) ;
std : : unique_lock < std : : mutex > l ( m_SessionsMutex ) ;
auto ret = m_Sessions . insert ( std : : make_pair ( id , session ) ) ;
if ( ! ret . second )
@ -1105,6 +1077,18 @@ namespace client
@@ -1105,6 +1077,18 @@ namespace client
return nullptr ;
}
std : : list < std : : shared_ptr < SAMSocket > > SAMBridge : : ListSockets ( const std : : string & id ) const
{
std : : list < std : : shared_ptr < SAMSocket > > list ;
{
std : : unique_lock < std : : mutex > l ( m_OpenSocketsMutex ) ;
for ( const auto & itr : m_OpenSockets )
if ( itr - > IsSession ( id ) )
list . push_back ( itr ) ;
}
return list ;
}
void SAMBridge : : SendTo ( const uint8_t * buf , size_t len , std : : shared_ptr < boost : : asio : : ip : : udp : : endpoint > remote )
{
if ( remote )
@ -1127,6 +1111,8 @@ namespace client
@@ -1127,6 +1111,8 @@ namespace client
{
m_DatagramReceiveBuffer [ bytes_transferred ] = 0 ;
char * eol = strchr ( ( char * ) m_DatagramReceiveBuffer , ' \n ' ) ;
if ( eol )
{
* eol = 0 ; eol + + ;
size_t payloadLen = bytes_transferred - ( ( uint8_t * ) eol - m_DatagramReceiveBuffer ) ;
LogPrint ( eLogDebug , " SAM: datagram received " , m_DatagramReceiveBuffer , " size= " , payloadLen ) ;
@ -1154,6 +1140,9 @@ namespace client
@@ -1154,6 +1140,9 @@ namespace client
}
else
LogPrint ( eLogError , " SAM: Missing sessionID " ) ;
}
else
LogPrint ( eLogError , " SAM: invalid datagram " ) ;
ReceiveDatagram ( ) ;
}
else