diff --git a/daemon/HTTPServer.cpp b/daemon/HTTPServer.cpp index 6f884a9b..554ba45d 100644 --- a/daemon/HTTPServer.cpp +++ b/daemon/HTTPServer.cpp @@ -649,7 +649,7 @@ namespace http { s << i2p::client::context.GetAddressBook ().ToAddress(ident) << "
\r\n"; s << "
\r\n"; s << "Streams:
\r\n"; - for (const auto& it: session->ListSockets()) + for (const auto& it: sam->ListSockets(id)) { switch (it->GetSocketType ()) { diff --git a/daemon/I2PControl.cpp b/daemon/I2PControl.cpp index fcff78cd..6ac87cbb 100644 --- a/daemon/I2PControl.cpp +++ b/daemon/I2PControl.cpp @@ -727,7 +727,7 @@ namespace client sam_session.put("name", name); sam_session.put("address", i2p::client::context.GetAddressBook ().ToAddress(ident)); - for (const auto& socket: it.second->ListSockets()) + for (const auto& socket: sam->ListSockets(it.first)) { boost::property_tree::ptree stream; stream.put("type", socket->GetSocketType ()); diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index dd8e3634..e8386b61 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -578,9 +578,7 @@ namespace stream if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) // nothing to send { m_Status = eStreamStatusClosed; - // close could be called from another thread so do SendClose from the destination thread - // this is so m_LocalDestination.NewPacket () does not trigger a race condition - m_Service.post(std::bind(&Stream::SendClose, shared_from_this())); + SendClose(); } break; case eStreamStatusClosed: diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 47f99833..7f2598c0 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -165,6 +165,9 @@ namespace stream void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0); size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); }; + void AsyncClose() { m_Service.post(std::bind(&Stream::Close, shared_from_this())); }; + + /** only call close from destination thread, use Stream::AsyncClose for other threads */ void Close (); void Cancel () { m_ReceiveTimer.cancel (); }; diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index 05943981..b8a72f0c 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -15,8 +15,8 @@ namespace i2p { namespace client { - SAMSocket::SAMSocket (SAMBridge& owner, std::shared_ptr socket): - m_Owner (owner), m_Socket(socket), m_Timer (m_Owner.GetService ()), + SAMSocket::SAMSocket (SAMBridge& owner): + m_Owner (owner), m_Socket(owner.GetService()), m_Timer (m_Owner.GetService ()), m_BufferOffset (0), m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false), m_IsAccepting (false), m_Stream (nullptr) @@ -25,51 +25,18 @@ namespace client SAMSocket::~SAMSocket () { - if(m_Stream) - { - m_Stream->Close (); - m_Stream.reset (); - } - auto Session = m_Owner.FindSession(m_ID); - - switch (m_SocketType) - { - case eSAMSocketTypeSession: - m_Owner.CloseSession (m_ID); - break; - case eSAMSocketTypeStream: - { - if (Session) - Session->DelSocket (this); - break; - } - case eSAMSocketTypeAcceptor: - { - if (Session) - { - Session->DelSocket (this); - if (m_IsAccepting && Session->localDestination) - Session->localDestination->StopAcceptingStreams (); - } - break; - } - default: - ; - } - m_SocketType = eSAMSocketTypeTerminated; - if (m_Socket && m_Socket->is_open()) m_Socket->close (); - m_Socket.reset (); + m_Stream.reset (); + if (m_Socket.is_open()) m_Socket.close (); } void SAMSocket::Terminate (const char* reason) { if(m_Stream) { - m_Stream->Close (); + m_Stream->AsyncClose (); m_Stream.reset (); } auto Session = m_Owner.FindSession(m_ID); - switch (m_SocketType) { case eSAMSocketTypeSession: @@ -77,15 +44,12 @@ namespace client break; case eSAMSocketTypeStream: { - if (Session) - Session->DelSocket (this); break; } case eSAMSocketTypeAcceptor: { if (Session) { - Session->DelSocket (this); if (m_IsAccepting && Session->localDestination) Session->localDestination->StopAcceptingStreams (); } @@ -95,16 +59,15 @@ namespace client ; } m_SocketType = eSAMSocketTypeTerminated; - if (m_Socket && m_Socket->is_open()) m_Socket->close (); - m_Socket.reset (); + if (m_Socket.is_open()) m_Socket.close (); + m_Owner.RemoveSocket(this); } void SAMSocket::ReceiveHandshake () - { - if(m_Socket) - m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), - std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (), - std::placeholders::_1, std::placeholders::_2)); + { + m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), + std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (), + std::placeholders::_1, std::placeholders::_2)); } static bool SAMVersionAcceptable(const std::string & ver) @@ -125,7 +88,7 @@ namespace client void SAMSocket::HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) { if (ecode) - { + { LogPrint (eLogError, "SAM: handshake read error: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) Terminate ("SAM: handshake read error"); @@ -184,7 +147,7 @@ namespace client #else size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, version.c_str ()); #endif - boost::asio::async_write (*m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (), + boost::asio::async_write (m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (), std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } @@ -199,17 +162,22 @@ namespace client } } + bool SAMSocket::IsSession(const std::string & id) const + { + return id == m_ID; + } + void SAMSocket::HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred) { if (ecode) - { + { LogPrint (eLogError, "SAM: handshake reply send error: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) Terminate ("SAM: handshake reply send error"); } - else if(m_Socket) + else { - m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), + m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), std::bind(&SAMSocket::HandleMessage, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } @@ -220,7 +188,7 @@ namespace client LogPrint (eLogDebug, "SAMSocket::SendMessageReply, close=",close?"true":"false", " reason: ", msg); if (!m_IsSilent) - boost::asio::async_write (*m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (), + boost::asio::async_write (m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (), std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, close)); else @@ -501,7 +469,6 @@ namespace client if(session) { m_SocketType = eSAMSocketTypeStream; - session->AddSocket (shared_from_this ()); m_Stream = session->localDestination->CreateStream (remote); m_Stream->Send ((uint8_t *)m_Buffer, m_BufferOffset); // connect and send m_BufferOffset = 0; @@ -534,7 +501,6 @@ namespace client if (session) { m_SocketType = eSAMSocketTypeAcceptor; - session->AddSocket (shared_from_this ()); if (!session->localDestination->IsAcceptingStreams ()) { m_IsAccepting = true; @@ -704,17 +670,9 @@ namespace client void SAMSocket::Receive () { - if (m_BufferOffset >= SAM_SOCKET_BUFFER_SIZE) - { - LogPrint (eLogError, "SAM: Buffer is full, terminate"); - Terminate ("Buffer is full"); - return; - } else if (m_Socket) - m_Socket->async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset), - std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage, - shared_from_this (), std::placeholders::_1, std::placeholders::_2)); - else - LogPrint(eLogError, "SAM: receive with no native socket"); + m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset), + std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage, + shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } void SAMSocket::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) @@ -731,15 +689,12 @@ namespace client { bytes_transferred += m_BufferOffset; m_BufferOffset = 0; - auto s = shared_from_this (); m_Stream->AsyncSend ((uint8_t *)m_Buffer, bytes_transferred, - [s](const boost::system::error_code& ecode) - { - if (!ecode) - s->m_Owner.GetService ().post ([s] { s->Receive (); }); - else - s->m_Owner.GetService ().post ([s] { s->Terminate ("AsyncSend failed"); }); - }); + std::bind(&SAMSocket::HandleStreamSend, shared_from_this(), std::placeholders::_1)); + } + else + { + Terminate("No Stream Remaining"); } } } @@ -773,14 +728,11 @@ namespace client void SAMSocket::WriteI2PDataImmediate(uint8_t * buff, size_t sz) { - if(m_Socket) - boost::asio::async_write ( - *m_Socket, - boost::asio::buffer (buff, sz), - boost::asio::transfer_all(), - std::bind (&SAMSocket::HandleWriteI2PDataImmediate, shared_from_this (), std::placeholders::_1, buff)); // postpone termination - else - LogPrint(eLogError, "SAM: no native socket"); + boost::asio::async_write ( + m_Socket, + boost::asio::buffer (buff, sz), + boost::asio::transfer_all(), + std::bind (&SAMSocket::HandleWriteI2PDataImmediate, shared_from_this (), std::placeholders::_1, buff)); // postpone termination } void SAMSocket::HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff) @@ -858,7 +810,7 @@ namespace client if (session) { // find more pending acceptors - for (auto it: session->ListSockets ()) + for (auto it: m_Owner.ListSockets (m_ID)) if (it->m_SocketType == eSAMSocketTypeAcceptor) { it->m_IsAccepting = true; @@ -930,12 +882,19 @@ namespace client } } - SAMSession::SAMSession (std::shared_ptr dest): + void SAMSocket::HandleStreamSend(const boost::system::error_code & ec) + { + m_Owner.GetService ().post (std::bind( !ec ? &SAMSocket::Receive : &SAMSocket::TerminateClose, shared_from_this())); + } + + SAMSession::SAMSession (SAMBridge & parent, const std::string & id, std::shared_ptr dest): + m_Bridge(parent), localDestination (dest), - UDPEndpoint(nullptr) + UDPEndpoint(nullptr), + Name(id) { } - + SAMSession::~SAMSession () { CloseStreams(); @@ -944,15 +903,10 @@ namespace client void SAMSession::CloseStreams () { - std::vector > socks; + for(const auto & itr : m_Bridge.ListSockets(Name)) { - std::lock_guard lock(m_SocketsMutex); - for (const auto& sock : m_Sockets) { - socks.push_back(sock); - } + itr->Terminate(nullptr); } - for (auto & sock : socks ) sock->Terminate("SAMSession::CloseStreams()"); - m_Sockets.clear(); } SAMBridge::SAMBridge (const std::string& address, int port): @@ -1009,12 +963,16 @@ namespace client void SAMBridge::Accept () { - auto native = std::make_shared(m_Service); - auto newSocket = std::make_shared (*this, native); - m_Acceptor.async_accept (*native, std::bind (&SAMBridge::HandleAccept, this, + auto newSocket = std::make_shared(*this); + m_Acceptor.async_accept (newSocket->GetSocket(), std::bind (&SAMBridge::HandleAccept, this, std::placeholders::_1, newSocket)); } + void SAMBridge::RemoveSocket(const SAMSocket * socket) + { + m_OpenSockets.remove_if([socket](const std::shared_ptr & item) -> bool { return item.get() == socket; }); + } + void SAMBridge::HandleAccept(const boost::system::error_code& ecode, std::shared_ptr socket) { if (!ecode) @@ -1024,6 +982,7 @@ namespace client if (!ec) { LogPrint (eLogDebug, "SAM: new connection from ", ep); + m_OpenSockets.push_back(socket); socket->ReceiveHandshake (); } else @@ -1066,7 +1025,7 @@ namespace client if (localDestination) { localDestination->Acquire (); - auto session = std::make_shared(localDestination); + auto session = std::make_shared(*this, id, localDestination); std::unique_lock l(m_SessionsMutex); auto ret = m_Sessions.insert (std::make_pair(id, session)); if (!ret.second) @@ -1105,6 +1064,18 @@ namespace client return nullptr; } + std::list > SAMBridge::ListSockets(const std::string & id) const + { + std::list > list; + { + std::unique_lock l(m_SessionsMutex); + for (const auto & itr : m_OpenSockets) + if (itr->IsSession(id)) + list.push_back(itr); + } + return list; + } + void SAMBridge::SendTo(const uint8_t * buf, size_t len, std::shared_ptr remote) { if(remote) @@ -1127,33 +1098,38 @@ namespace client { m_DatagramReceiveBuffer[bytes_transferred] = 0; char * eol = strchr ((char *)m_DatagramReceiveBuffer, '\n'); - *eol = 0; eol++; - size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer); - LogPrint (eLogDebug, "SAM: datagram received ", m_DatagramReceiveBuffer," size=", payloadLen); - char * sessionID = strchr ((char *)m_DatagramReceiveBuffer, ' '); - if (sessionID) + if(eol) { - sessionID++; - char * destination = strchr (sessionID, ' '); - if (destination) + *eol = 0; eol++; + size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer); + LogPrint (eLogDebug, "SAM: datagram received ", m_DatagramReceiveBuffer," size=", payloadLen); + char * sessionID = strchr ((char *)m_DatagramReceiveBuffer, ' '); + if (sessionID) { - *destination = 0; destination++; - auto session = FindSession (sessionID); - if (session) + sessionID++; + char * destination = strchr (sessionID, ' '); + if (destination) { - i2p::data::IdentityEx dest; - dest.FromBase64 (destination); - session->localDestination->GetDatagramDestination ()-> - SendDatagramTo ((uint8_t *)eol, payloadLen, dest.GetIdentHash ()); + *destination = 0; destination++; + auto session = FindSession (sessionID); + if (session) + { + i2p::data::IdentityEx dest; + dest.FromBase64 (destination); + session->localDestination->GetDatagramDestination ()-> + SendDatagramTo ((uint8_t *)eol, payloadLen, dest.GetIdentHash ()); + } + else + LogPrint (eLogError, "SAM: Session ", sessionID, " not found"); } else - LogPrint (eLogError, "SAM: Session ", sessionID, " not found"); + LogPrint (eLogError, "SAM: Missing destination key"); } else - LogPrint (eLogError, "SAM: Missing destination key"); + LogPrint (eLogError, "SAM: Missing sessionID"); } else - LogPrint (eLogError, "SAM: Missing sessionID"); + LogPrint(eLogError, "SAM: invalid datagram"); ReceiveDatagram (); } else diff --git a/libi2pd_client/SAM.h b/libi2pd_client/SAM.h index 6ecd14a4..d14e5e39 100644 --- a/libi2pd_client/SAM.h +++ b/libi2pd_client/SAM.h @@ -77,24 +77,27 @@ namespace client class SAMBridge; struct SAMSession; - class SAMSocket: public std::enable_shared_from_this + class SAMSocket :public std::enable_shared_from_this { public: typedef boost::asio::ip::tcp::socket Socket_t; - SAMSocket (SAMBridge& owner, std::shared_ptr socket); + SAMSocket (SAMBridge& owner); ~SAMSocket (); - boost::asio::ip::tcp::socket& GetSocket () { return *m_Socket; }; + Socket_t& GetSocket () { return m_Socket; }; void ReceiveHandshake (); void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; }; SAMSocketType GetSocketType () const { return m_SocketType; }; void Terminate (const char* reason); + bool IsSession(const std::string & id) const; + private: - - void HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); + void TerminateClose() { Terminate(nullptr); } + + void HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred); void SendMessageReply (const char * msg, size_t len, bool close); @@ -128,10 +131,12 @@ namespace client void WriteI2PDataImmediate(uint8_t * ptr, size_t sz); void HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff); + void HandleStreamSend(const boost::system::error_code & ec); + private: SAMBridge& m_Owner; - std::shared_ptr m_Socket; + Socket_t m_Socket; boost::asio::deadline_timer m_Timer; char m_Buffer[SAM_SOCKET_BUFFER_SIZE + 1]; size_t m_BufferOffset; @@ -145,34 +150,12 @@ namespace client struct SAMSession { + SAMBridge & m_Bridge; std::shared_ptr localDestination; - std::list > m_Sockets; std::shared_ptr UDPEndpoint; - std::mutex m_SocketsMutex; - - /** safely add a socket to this session */ - void AddSocket(std::shared_ptr sock) { - std::lock_guard lock(m_SocketsMutex); - m_Sockets.push_back(sock); - } - - /** safely remove a socket from this session */ - void DelSocket(SAMSocket * sock) { - std::lock_guard lock(m_SocketsMutex); - m_Sockets.remove_if([sock](const std::shared_ptr s) -> bool { return s.get() == sock; }); - } - - /** get a list holding a copy of all sam sockets from this session */ - std::list > ListSockets() { - std::list > l; - { - std::lock_guard lock(m_SocketsMutex); - for(const auto& sock : m_Sockets ) l.push_back(sock); - } - return l; - } - - SAMSession (std::shared_ptr dest); + std::string Name; + + SAMSession (SAMBridge & parent, const std::string & name, std::shared_ptr dest); ~SAMSession (); void CloseStreams (); @@ -194,15 +177,19 @@ namespace client void CloseSession (const std::string& id); std::shared_ptr FindSession (const std::string& id) const; + std::list > ListSockets(const std::string & id) const; + /** send raw data to remote endpoint from our UDP Socket */ void SendTo(const uint8_t * buf, size_t len, std::shared_ptr remote); + void RemoveSocket(const SAMSocket * socket); + private: void Run (); void Accept (); - void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr socket); + void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr socket); void ReceiveDatagram (); void HandleReceivedDatagram (const boost::system::error_code& ecode, std::size_t bytes_transferred); @@ -217,6 +204,7 @@ namespace client boost::asio::ip::udp::socket m_DatagramSocket; mutable std::mutex m_SessionsMutex; std::map > m_Sessions; + std::list > m_OpenSockets; uint8_t m_DatagramReceiveBuffer[i2p::datagram::MAX_DATAGRAM_SIZE+1]; public: