diff --git a/SAM.cpp b/SAM.cpp index ce42753b..36c25b91 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -9,7 +9,8 @@ namespace i2p namespace stream { SAMSocket::SAMSocket (SAMBridge& owner): - m_Owner (owner), m_Socket (m_Owner.GetService ()), m_Stream (nullptr) + m_Owner (owner), m_Socket (m_Owner.GetService ()), m_SocketType (eSAMSocketTypeUnknown), + m_Stream (nullptr) { } @@ -30,6 +31,8 @@ namespace stream DeleteStream (m_Stream); m_Stream = nullptr; } + if (m_SocketType == eSAMSocketTypeSession) + m_Owner.CloseSession (m_ID); delete this; } @@ -76,9 +79,112 @@ namespace stream Terminate (); } else - Receive (); + { + m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), + boost::bind(&SAMSocket::HandleMessage, this, + boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + } + } + + void SAMSocket::SendMessageReply (const char * msg, size_t len, bool close) + { + boost::asio::async_write (m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (), + boost::bind(&SAMSocket::HandleMessageReplySent, this, + boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, close)); + } + + void SAMSocket::HandleMessageReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred, bool close) + { + if (ecode) + { + LogPrint ("SAM reply send error: ", ecode.message ()); + if (ecode != boost::asio::error::operation_aborted) + Terminate (); + } + else + { + if (close) + Terminate (); + else + Receive (); + } } + void SAMSocket::HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred) + { + if (ecode) + { + LogPrint ("SAM read error: ", ecode.message ()); + if (ecode != boost::asio::error::operation_aborted) + Terminate (); + } + else + { + m_Buffer[bytes_transferred] = 0; + char * eol = strchr (m_Buffer, '\n'); + if (eol) + { + *eol = 0; + if (!strcmp (m_Buffer, SAM_SESSION_CREATE)) + ProcessSessionCreate (eol + 1, bytes_transferred - (eol - m_Buffer) - 1); + else if (!strcmp (m_Buffer, SAM_STREAM_CONNECT)) + ProcessStreamConnect (eol + 1, bytes_transferred - (eol - m_Buffer) - 1); + else + { + LogPrint ("SAM unexpected message ", m_Buffer); + Terminate (); + } + } + else + { + LogPrint ("SAM malformed message"); + Terminate (); + } + } + } + + void SAMSocket::ProcessSessionCreate (char * buf, size_t len) + { + std::map params; + ExtractParams (buf, len, params); + std::string& id = params[SAM_PARAM_ID]; + std::string& destination = params[SAM_PARAM_DESTINATION]; + m_ID = id; + auto session = m_Owner.CreateSession (id, destination == SAM_VALUE_TRANSIENT ? "" : destination); + if (session) + { + memcpy (m_Buffer, SAM_SESSION_CREATE_REPLY_OK, sizeof (SAM_SESSION_CREATE_REPLY_OK)); + uint8_t ident[1024]; + size_t l = session->localDestination->GetPrivateKeys ().ToBuffer (ident, 1024); + size_t l1 = i2p::data::ByteStreamToBase64 (ident, l, m_Buffer + sizeof (SAM_SESSION_CREATE_REPLY_OK), + SAM_SOCKET_BUFFER_SIZE - sizeof (SAM_SESSION_CREATE_REPLY_OK)); + SendMessageReply (m_Buffer, sizeof (SAM_SESSION_CREATE_REPLY_OK) + l1, false); + } + else + SendMessageReply (SAM_SESSION_CREATE_DUPLICATED_ID, sizeof(SAM_SESSION_CREATE_DUPLICATED_ID), true); + } + + void SAMSocket::ProcessStreamConnect (char * buf, size_t len) + { + Receive (); + } + + void SAMSocket::ExtractParams (char * buf, size_t len, std::map& params) + { + while (char * eol = strchr (buf, '\n')) + { + *eol = 0; + char * value = strchr (buf, '='); + if (value) + { + *value = 0; + value++; + params[buf] = value; + } + buf = eol + 1; + } + } + void SAMSocket::Receive () { m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), @@ -207,16 +313,16 @@ namespace stream Accept (); } - bool SAMBridge::CreateSession (const std::string& id, const char * destination, size_t len) + SAMSession * SAMBridge::CreateSession (const std::string& id, const std::string& destination) { if (m_Sessions.find (id) != m_Sessions.end ()) // session exists - return false; + return nullptr; StreamingDestination * localDestination = nullptr; - if (destination) + if (destination != "") { - uint8_t * buf = new uint8_t[len]; - size_t l = i2p::data::Base64ToByteStream (destination, len, buf, len); + uint8_t * buf = new uint8_t[destination.length ()]; + size_t l = i2p::data::Base64ToByteStream (destination.c_str (), destination.length (), buf, destination.length ()); i2p::data::PrivateKeys keys; keys.FromBuffer (buf, l); delete[] buf; @@ -228,11 +334,11 @@ namespace stream { SAMSession session; session.localDestination = localDestination; - session.isTransient = !destination; + session.isTransient = destination == ""; m_Sessions[id] = session; - return true; + return &m_Sessions[id]; } - return false; + return nullptr; } void SAMBridge::CloseSession (const std::string& id) @@ -248,5 +354,13 @@ namespace stream m_Sessions.erase (it); } } + + SAMSession * SAMBridge::FindSession (const std::string& id) + { + auto it = m_Sessions.find (id); + if (it != m_Sessions.end ()) + return &it->second; + return nullptr; + } } } diff --git a/SAM.h b/SAM.h index bf59425f..28b26c30 100644 --- a/SAM.h +++ b/SAM.h @@ -17,6 +17,21 @@ namespace stream const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds const char SAM_HANDSHAKE[] = "HELLO VERSION"; const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=3.1"; + const char SAM_SESSION_CREATE[] = "SESSION CREATE"; + const char SAM_SESSION_CREATE_REPLY_OK[] = "SESSION STATUS RESULT=OK DESTINATION="; + const char SAM_SESSION_CREATE_DUPLICATED_ID[] = "SESSION STATUS RESULT=DUPLICATED_ID"; + const char SAM_STREAM_CONNECT[] = "STREAM CONNECT"; + const char SAM_PARAM_STYLE[] = "STYLE"; + const char SAM_PARAM_ID[] = "ID"; + const char SAM_PARAM_DESTINATION[] = "DESTINATION"; + const char SAM_VALUE_TRANSIENT[] = "TRANSIENT"; + + enum SAMSocketType + { + eSAMSocketTypeUnknown, + eSAMSocketTypeSession, + eSAMSocketTypeStream + }; class SAMBridge; class SAMSocket @@ -34,6 +49,9 @@ namespace stream void Terminate (); void HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred); + void HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred); + void SendMessageReply (const char * msg, size_t len, bool close); + void HandleMessageReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred, bool close); void Receive (); void HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); @@ -41,12 +59,18 @@ namespace stream void HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleWriteStreamData (const boost::system::error_code& ecode); + void ProcessSessionCreate (char * buf, size_t len); + void ProcessStreamConnect (char * buf, size_t len); + void ExtractParams (char * buf, size_t len, std::map& params); + private: SAMBridge& m_Owner; boost::asio::ip::tcp::socket m_Socket; char m_Buffer[SAM_SOCKET_BUFFER_SIZE + 1]; uint8_t m_StreamBuffer[SAM_SOCKET_BUFFER_SIZE]; + SAMSocketType m_SocketType; + std::string m_ID; // nickname Stream * m_Stream; }; @@ -68,8 +92,9 @@ namespace stream void Stop (); boost::asio::io_service& GetService () { return m_Service; }; - bool CreateSession (const std::string& id, const char * destination = nullptr, size_t len = 0); // null means transient + SAMSession * CreateSession (const std::string& id, const std::string& destination = ""); // empty string means transient void CloseSession (const std::string& id); + SAMSession * FindSession (const std::string& id); private: