From 81c63b0c3017cfd84b7235a224400bd51a9c68f9 Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 22 Nov 2014 16:35:58 -0500 Subject: [PATCH] shared pointers for SAM sockets --- SAM.cpp | 78 ++++++++++++++++++++++++--------------------------------- SAM.h | 10 ++++---- 2 files changed, 38 insertions(+), 50 deletions(-) diff --git a/SAM.cpp b/SAM.cpp index dc1aa3ac..557487b3 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -26,21 +26,18 @@ namespace client SAMSocket::~SAMSocket () { if (m_Stream) - { - m_Stream->Close (); i2p::stream::DeleteStream (m_Stream); - m_Stream = nullptr; - } } void SAMSocket::Terminate () { if (m_Stream) - { m_Stream->Close (); - i2p::stream::DeleteStream (m_Stream); - m_Stream = nullptr; - } + + // TODO: make this swap atomic + auto session = m_Session; + m_Session = nullptr; + switch (m_SocketType) { case eSAMSocketTypeSession: @@ -48,16 +45,16 @@ namespace client break; case eSAMSocketTypeStream: { - if (m_Session) - m_Session->sockets.remove (this); + if (session) + session->sockets.remove (shared_from_this ()); break; } case eSAMSocketTypeAcceptor: { - if (m_Session) + if (session) { - m_Session->sockets.remove (this); - m_Session->localDestination->StopAcceptingStreams (); + session->sockets.remove (shared_from_this ()); + session->localDestination->StopAcceptingStreams (); } break; } @@ -65,13 +62,12 @@ namespace client ; } m_Socket.close (); - // delete this; } void SAMSocket::ReceiveHandshake () { m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), - boost::bind(&SAMSocket::HandleHandshakeReceived, this, + boost::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } @@ -91,7 +87,7 @@ namespace client { // TODO: check version boost::asio::async_write (m_Socket, boost::asio::buffer (SAM_HANDSHAKE_REPLY, strlen (SAM_HANDSHAKE_REPLY)), boost::asio::transfer_all (), - boost::bind(&SAMSocket::HandleHandshakeReplySent, this, + boost::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } else @@ -113,7 +109,7 @@ namespace client else { m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), - boost::bind(&SAMSocket::HandleMessage, this, + boost::bind(&SAMSocket::HandleMessage, shared_from_this (), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } } @@ -122,7 +118,7 @@ namespace client { if (!m_IsSilent || m_SocketType == eSAMSocketTypeAcceptor) boost::asio::async_write (m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (), - boost::bind(&SAMSocket::HandleMessageReplySent, this, + boost::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, close)); else { @@ -228,7 +224,7 @@ namespace client if (style == SAM_VALUE_DATAGRAM) { auto dest = m_Session->localDestination->CreateDatagramDestination (); - dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, this, + dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); } SendSessionCreateReplyOk (); @@ -237,7 +233,7 @@ namespace client { m_Timer.expires_from_now (boost::posix_time::seconds(SAM_SESSION_READINESS_CHECK_INTERVAL)); m_Timer.async_wait (boost::bind (&SAMSocket::HandleSessionReadinessCheckTimer, - this, boost::asio::placeholders::error)); + shared_from_this (), boost::asio::placeholders::error)); } } else @@ -254,7 +250,7 @@ namespace client { m_Timer.expires_from_now (boost::posix_time::seconds(SAM_SESSION_READINESS_CHECK_INTERVAL)); m_Timer.async_wait (boost::bind (&SAMSocket::HandleSessionReadinessCheckTimer, - this, boost::asio::placeholders::error)); + shared_from_this (), boost::asio::placeholders::error)); } } } @@ -299,7 +295,7 @@ namespace client i2p::data::netdb.RequestDestination (dest.GetIdentHash (), true, m_Session->localDestination->GetTunnelPool ()); m_Timer.expires_from_now (boost::posix_time::seconds(SAM_CONNECT_TIMEOUT)); m_Timer.async_wait (boost::bind (&SAMSocket::HandleStreamDestinationRequestTimer, - this, boost::asio::placeholders::error, dest.GetIdentHash ())); + shared_from_this (), boost::asio::placeholders::error, dest.GetIdentHash ())); } } else @@ -309,7 +305,7 @@ namespace client void SAMSocket::Connect (const i2p::data::LeaseSet& remote) { m_SocketType = eSAMSocketTypeStream; - m_Session->sockets.push_back (this); + m_Session->sockets.push_back (shared_from_this ()); m_Stream = m_Session->localDestination->CreateStream (remote); m_Stream->Send ((uint8_t *)m_Buffer, 0); // connect I2PReceive (); @@ -366,8 +362,8 @@ namespace client if (!m_Session->localDestination->IsAcceptingStreams ()) { m_SocketType = eSAMSocketTypeAcceptor; - m_Session->sockets.push_back (this); - m_Session->localDestination->AcceptStreams (std::bind (&SAMSocket::HandleI2PAccept, this, std::placeholders::_1)); + m_Session->sockets.push_back (shared_from_this ()); + m_Session->localDestination->AcceptStreams (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); } else @@ -422,7 +418,7 @@ namespace client i2p::data::netdb.RequestDestination (ident, true, m_Session->localDestination->GetTunnelPool ()); m_Timer.expires_from_now (boost::posix_time::seconds(SAM_NAMING_LOOKUP_TIMEOUT)); m_Timer.async_wait (boost::bind (&SAMSocket::HandleNamingLookupDestinationRequestTimer, - this, boost::asio::placeholders::error, ident)); + shared_from_this (), boost::asio::placeholders::error, ident)); } } else @@ -475,7 +471,7 @@ namespace client { m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), boost::bind((m_SocketType == eSAMSocketTypeSession) ? &SAMSocket::HandleMessage : &SAMSocket::HandleReceived, - this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + shared_from_this (), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } void SAMSocket::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) @@ -498,7 +494,7 @@ namespace client { if (m_Stream) m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE), - boost::bind (&SAMSocket::HandleI2PReceive, this, + boost::bind (&SAMSocket::HandleI2PReceive, shared_from_this (), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred), SAM_SOCKET_CONNECTION_MAX_IDLE); } @@ -514,7 +510,7 @@ namespace client else { boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, bytes_transferred), - boost::bind (&SAMSocket::HandleWriteI2PData, this, boost::asio::placeholders::error)); + boost::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), boost::asio::placeholders::error)); } } @@ -567,7 +563,7 @@ namespace client { memcpy (m_StreamBuffer + l2, buf, len); boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, len + l2), - boost::bind (&SAMSocket::HandleWriteI2PData, this, boost::asio::placeholders::error)); + boost::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), boost::asio::placeholders::error)); } else LogPrint (eLogWarning, "Datagram size ", len," exceeds buffer"); @@ -576,15 +572,13 @@ namespace client SAMBridge::SAMBridge (int port): m_IsRunning (false), m_Thread (nullptr), m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)), - m_DatagramEndpoint (boost::asio::ip::udp::v4 (), port-1), m_DatagramSocket (m_Service, m_DatagramEndpoint), - m_NewSocket (nullptr) + m_DatagramEndpoint (boost::asio::ip::udp::v4 (), port-1), m_DatagramSocket (m_Service, m_DatagramEndpoint) { } SAMBridge::~SAMBridge () { Stop (); - delete m_NewSocket; } void SAMBridge::Start () @@ -624,24 +618,20 @@ namespace client void SAMBridge::Accept () { - m_NewSocket = new SAMSocket (*this); - m_Acceptor.async_accept (m_NewSocket->GetSocket (), boost::bind (&SAMBridge::HandleAccept, this, - boost::asio::placeholders::error)); + auto newSocket = std::make_shared (*this); + m_Acceptor.async_accept (newSocket->GetSocket (), boost::bind (&SAMBridge::HandleAccept, this, + boost::asio::placeholders::error, newSocket)); } - void SAMBridge::HandleAccept(const boost::system::error_code& ecode) + void SAMBridge::HandleAccept(const boost::system::error_code& ecode, std::shared_ptr socket) { if (!ecode) { - LogPrint ("New SAM connection from ", m_NewSocket->GetSocket ().remote_endpoint ()); - m_NewSocket->ReceiveHandshake (); + LogPrint ("New SAM connection from ", socket->GetSocket ().remote_endpoint ()); + socket->ReceiveHandshake (); } else - { LogPrint ("SAM accept error: ", ecode.message ()); - delete m_NewSocket; - m_NewSocket = nullptr; - } if (ecode != boost::asio::error::operation_aborted) Accept (); @@ -680,8 +670,6 @@ namespace client auto it = m_Sessions.find (id); if (it != m_Sessions.end ()) { - for (auto it1 : it->second.sockets) - delete it1; it->second.sockets.clear (); it->second.localDestination->Stop (); m_Sessions.erase (it); diff --git a/SAM.h b/SAM.h index 069b3109..8f22cad7 100644 --- a/SAM.h +++ b/SAM.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include "Identity.h" #include "LeaseSet.h" @@ -39,7 +40,7 @@ namespace client const char SAM_DEST_REPLY_I2P_ERROR[] = "DEST REPLY RESULT=I2P_ERROR\n"; const char SAM_NAMING_LOOKUP[] = "NAMING LOOKUP"; const char SAM_NAMING_REPLY[] = "NAMING REPLY RESULT=OK NAME=ME VALUE=%s\n"; - const char SAM_DATAGRAM_RECEIVED[] = "DATAGRAM_RECEIVED DESTINATION=%s SIZE=%i\n"; + const char SAM_DATAGRAM_RECEIVED[] = "DATAGRAM_RECEIVED DESTINATION=%s SIZE=%lu\n"; const char SAM_NAMING_REPLY_INVALID_KEY[] = "NAMING REPLY RESULT=INVALID_KEY NAME=%s\n"; const char SAM_NAMING_REPLY_KEY_NOT_FOUND[] = "NAMING REPLY RESULT=INVALID_KEY_NOT_FOUND NAME=%s\n"; const char SAM_PARAM_STYLE[] = "STYLE"; @@ -64,7 +65,7 @@ namespace client class SAMBridge; class SAMSession; - class SAMSocket + class SAMSocket: public std::enable_shared_from_this { public: @@ -122,7 +123,7 @@ namespace client struct SAMSession { ClientDestination * localDestination; - std::list sockets; + std::list > sockets; }; class SAMBridge @@ -145,7 +146,7 @@ namespace client void Run (); void Accept (); - void HandleAccept(const boost::system::error_code& ecode); + void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr socket); void ReceiveDatagram (); void HandleReceivedDatagram (const boost::system::error_code& ecode, std::size_t bytes_transferred); @@ -158,7 +159,6 @@ namespace client boost::asio::ip::tcp::acceptor m_Acceptor; boost::asio::ip::udp::endpoint m_DatagramEndpoint, m_SenderEndpoint; boost::asio::ip::udp::socket m_DatagramSocket; - SAMSocket * m_NewSocket; std::mutex m_SessionsMutex; std::map m_Sessions; uint8_t m_DatagramReceiveBuffer[i2p::datagram::MAX_DATAGRAM_SIZE+1];