diff --git a/SAM.cpp b/SAM.cpp index 6645b392..0f7334e3 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -33,13 +33,30 @@ namespace stream DeleteStream (m_Stream); m_Stream = nullptr; } - if (m_SocketType == eSAMSocketTypeSession) - m_Owner.CloseSession (m_ID); - else if (m_SocketType == eSAMSocketTypeStream) + switch (m_SocketType) { - auto session = m_Owner.FindSession (m_ID); - if (session) - session->sockets.remove (this); + case eSAMSocketTypeSession: + m_Owner.CloseSession (m_ID); + break; + case eSAMSocketTypeStream: + { + auto session = m_Owner.FindSession (m_ID); + if (session) + session->sockets.remove (this); + break; + } + case eSAMSocketTypeAcceptor: + { + auto session = m_Owner.FindSession (m_ID); + if (session) + { + session->sockets.remove (this); + session->localDestination->ResetAcceptor (); + } + break; + } + default: + ; } delete this; } @@ -137,6 +154,8 @@ namespace stream ProcessSessionCreate (eol + 1, bytes_transferred - (eol - m_Buffer) - 1); else if (!strcmp (m_Buffer, SAM_STREAM_CONNECT)) ProcessStreamConnect (eol + 1, bytes_transferred - (eol - m_Buffer) - 1); + else if (!strcmp (m_Buffer, SAM_STREAM_ACCEPT)) + ProcessStreamAccept (eol + 1, bytes_transferred - (eol - m_Buffer) - 1); else { LogPrint ("SAM unexpected message ", m_Buffer); @@ -191,20 +210,43 @@ namespace stream if (leaseSet) { m_SocketType = eSAMSocketTypeStream; - m_Stream = i2p::stream::CreateStream (*leaseSet); - m_Stream->Send ((uint8_t *)m_Buffer, 0, 0); // connect - StreamReceive (); session->sockets.push_back (this); - SendMessageReply (SAM_STREAM_CONNECT_REPLY_OK, sizeof(SAM_STREAM_CONNECT_REPLY_OK), false); + m_Stream = session->localDestination->CreateNewOutgoingStream (*leaseSet); + m_Stream->Send ((uint8_t *)m_Buffer, 0, 0); // connect + I2PReceive (); + SendMessageReply (SAM_STREAM_STATUS_OK, sizeof(SAM_STREAM_STATUS_OK), false); } else { i2p::data::netdb.Subscribe (dest.GetIdentHash ()); - SendMessageReply (SAM_STREAM_CONNECT_CANT_REACH_PEER, sizeof(SAM_STREAM_CONNECT_CANT_REACH_PEER), true); + SendMessageReply (SAM_STREAM_STATUS_CANT_REACH_PEER, sizeof(SAM_STREAM_STATUS_CANT_REACH_PEER), true); } } else - SendMessageReply (SAM_STREAM_CONNECT_INVALID_ID, sizeof(SAM_STREAM_CONNECT_INVALID_ID), true); + SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, sizeof(SAM_STREAM_STATUS_INVALID_ID), true); + } + + void SAMSocket::ProcessStreamAccept (char * buf, size_t len) + { + std::map params; + ExtractParams (buf, len, params); + std::string& id = params[SAM_PARAM_ID]; + m_ID = id; + auto session = m_Owner.FindSession (id); + if (session) + { + if (!session->localDestination->IsAcceptorSet ()) + { + m_SocketType = eSAMSocketTypeAcceptor; + session->sockets.push_back (this); + session->localDestination->SetAcceptor (std::bind (&SAMSocket::HandleI2PAccept, this, std::placeholders::_1)); + SendMessageReply (SAM_STREAM_STATUS_OK, sizeof(SAM_STREAM_STATUS_OK), false); + } + else + SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, sizeof(SAM_STREAM_STATUS_I2P_ERROR), true); + } + else + SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, sizeof(SAM_STREAM_STATUS_INVALID_ID), true); } void SAMSocket::ExtractParams (char * buf, size_t len, std::map& params) @@ -246,16 +288,16 @@ namespace stream } } - void SAMSocket::StreamReceive () + void SAMSocket::I2PReceive () { if (m_Stream) m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE), - boost::bind (&SAMSocket::HandleStreamReceive, this, + boost::bind (&SAMSocket::HandleI2PReceive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred), SAM_SOCKET_CONNECTION_MAX_IDLE); } - void SAMSocket::HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) + void SAMSocket::HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) { if (ecode) { @@ -265,11 +307,11 @@ namespace stream else { boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, bytes_transferred), - boost::bind (&SAMSocket::HandleWriteStreamData, this, boost::asio::placeholders::error)); + boost::bind (&SAMSocket::HandleWriteI2PData, this, boost::asio::placeholders::error)); } } - void SAMSocket::HandleWriteStreamData (const boost::system::error_code& ecode) + void SAMSocket::HandleWriteI2PData (const boost::system::error_code& ecode) { if (ecode) { @@ -278,9 +320,18 @@ namespace stream Terminate (); } else - StreamReceive (); + I2PReceive (); } + void SAMSocket::HandleI2PAccept (i2p::stream::Stream * stream) + { + m_Stream = stream; + auto session = m_Owner.FindSession (m_ID); + if (session) + session->localDestination->ResetAcceptor (); + I2PReceive (); + } + SAMBridge::SAMBridge (int port): m_IsRunning (false), m_Thread (nullptr), m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)), diff --git a/SAM.h b/SAM.h index 9c1a57df..87ba3fd7 100644 --- a/SAM.h +++ b/SAM.h @@ -21,9 +21,11 @@ namespace stream const char SAM_SESSION_CREATE_REPLY_OK[] = "SESSION STATUS RESULT=OK DESTINATION="; const char SAM_SESSION_CREATE_DUPLICATED_ID[] = "SESSION STATUS RESULT=DUPLICATED_ID"; const char SAM_STREAM_CONNECT[] = "STREAM CONNECT"; - const char SAM_STREAM_CONNECT_REPLY_OK[] = "STREAM STATUS RESULT=OK"; - const char SAM_STREAM_CONNECT_INVALID_ID[] = "STREAM STATUS RESULT=INVALID_ID"; - const char SAM_STREAM_CONNECT_CANT_REACH_PEER[] = "STREAM STATUS RESULT=CANT_REACH_PEER"; + const char SAM_STREAM_STATUS_OK[] = "STREAM STATUS RESULT=OK"; + const char SAM_STREAM_STATUS_INVALID_ID[] = "STREAM STATUS RESULT=INVALID_ID"; + const char SAM_STREAM_STATUS_CANT_REACH_PEER[] = "STREAM STATUS RESULT=CANT_REACH_PEER"; + const char SAM_STREAM_STATUS_I2P_ERROR[] = "STREAM STATUS RESULT=I2P_ERROR"; + const char SAM_STREAM_ACCEPT[] = "STREAM ACCEPT"; const char SAM_PARAM_STYLE[] = "STYLE"; const char SAM_PARAM_ID[] = "ID"; const char SAM_PARAM_DESTINATION[] = "DESTINATION"; @@ -33,7 +35,8 @@ namespace stream { eSAMSocketTypeUnknown, eSAMSocketTypeSession, - eSAMSocketTypeStream + eSAMSocketTypeStream, + eSAMSocketTypeAcceptor }; class SAMBridge; @@ -58,12 +61,14 @@ namespace stream void Receive (); void HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); - void StreamReceive (); - void HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); - void HandleWriteStreamData (const boost::system::error_code& ecode); + void I2PReceive (); + void HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); + void HandleI2PAccept (i2p::stream::Stream * stream); + void HandleWriteI2PData (const boost::system::error_code& ecode); void ProcessSessionCreate (char * buf, size_t len); void ProcessStreamConnect (char * buf, size_t len); + void ProcessStreamAccept (char * buf, size_t len); void ExtractParams (char * buf, size_t len, std::map& params); private: diff --git a/Streaming.cpp b/Streaming.cpp index 9f65d48e..63cb4f0c 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -568,7 +568,7 @@ namespace stream i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); delete m_LeaseSet; } - + void StreamingDestination::HandleNextPacket (Packet * packet) { uint32_t sendStreamID = packet->GetSendStreamID (); @@ -589,6 +589,11 @@ namespace stream incomingStream->HandleNextPacket (packet); if (m_Acceptor != nullptr) m_Acceptor (incomingStream); + else + { + LogPrint ("Acceptor for incoming stream is not set"); + DeleteStream (incomingStream); + } } } diff --git a/Streaming.h b/Streaming.h index 977246d2..17ce55d9 100644 --- a/Streaming.h +++ b/Streaming.h @@ -152,6 +152,8 @@ namespace stream Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote); void DeleteStream (Stream * stream); void SetAcceptor (const std::function& acceptor) { m_Acceptor = acceptor; }; + void ResetAcceptor () { m_Acceptor = nullptr; }; + bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; void HandleNextPacket (Packet * packet); // implements LocalDestination