From 4bd8b44ab2fe34085908029624c1438be69cf4e9 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 23 Nov 2014 11:33:58 -0500 Subject: [PATCH] shared pointers for streams --- Destination.cpp | 2 +- Destination.h | 3 ++- HTTPServer.cpp | 2 +- HTTPServer.h | 3 ++- I2PTunnel.cpp | 6 +++--- I2PTunnel.h | 7 ++++--- SAM.cpp | 4 ++-- SAM.h | 4 ++-- SOCKS.cpp | 3 +-- SOCKS.h | 4 ++-- Streaming.cpp | 24 ++++++++---------------- Streaming.h | 21 ++++++++++++--------- api/api.cpp | 4 ++-- api/api.h | 7 ++++--- 14 files changed, 46 insertions(+), 48 deletions(-) diff --git a/Destination.cpp b/Destination.cpp index 4b5f6af4..e90d214e 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -274,7 +274,7 @@ namespace client } } - i2p::stream::Stream * ClientDestination::CreateStream (const i2p::data::LeaseSet& remote, int port) + std::shared_ptr ClientDestination::CreateStream (const i2p::data::LeaseSet& remote, int port) { if (m_StreamingDestination) return m_StreamingDestination->CreateNewOutgoingStream (remote, port); diff --git a/Destination.h b/Destination.h index c804dea8..cab024ac 100644 --- a/Destination.h +++ b/Destination.h @@ -3,6 +3,7 @@ #include #include +#include #include "Identity.h" #include "TunnelPool.h" #include "CryptoConst.h" @@ -41,7 +42,7 @@ namespace client // streaming i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; }; - i2p::stream::Stream * CreateStream (const i2p::data::LeaseSet& remote, int port = 0); + std::shared_ptr CreateStream (const i2p::data::LeaseSet& remote, int port = 0); void AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor); void StopAcceptingStreams (); bool IsAcceptingStreams () const; diff --git a/HTTPServer.cpp b/HTTPServer.cpp index e5f74184..e80d3d19 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -518,7 +518,7 @@ namespace util { m_Stream->Close (); i2p::stream::DeleteStream (m_Stream); - m_Stream = nullptr; + m_Stream.reset (); } m_Socket->close (); //delete this; diff --git a/HTTPServer.h b/HTTPServer.h index c7d6b0e4..51bbc98b 100644 --- a/HTTPServer.h +++ b/HTTPServer.h @@ -3,6 +3,7 @@ #include #include +#include #include #include #include "LeaseSet.h" @@ -79,7 +80,7 @@ namespace util boost::asio::ip::tcp::socket * m_Socket; boost::asio::deadline_timer m_Timer; - i2p::stream::Stream * m_Stream; + std::shared_ptr m_Stream; char m_Buffer[HTTP_CONNECTION_BUFFER_SIZE + 1], m_StreamBuffer[HTTP_CONNECTION_BUFFER_SIZE + 1]; size_t m_BufferLen; request m_Request; diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index 8dadabae..4c73a3c7 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -20,7 +20,7 @@ namespace client Receive (); } - I2PTunnelConnection::I2PTunnelConnection (I2PTunnel * owner, i2p::stream::Stream * stream, + I2PTunnelConnection::I2PTunnelConnection (I2PTunnel * owner, std::shared_ptr stream, boost::asio::ip::tcp::socket * socket, const boost::asio::ip::tcp::endpoint& target): m_Socket (socket), m_Stream (stream), m_Owner (owner) { @@ -40,7 +40,7 @@ namespace client { m_Stream->Close (); i2p::stream::DeleteStream (m_Stream); - m_Stream = nullptr; + m_Stream.reset (); } m_Socket->close (); if (m_Owner) @@ -275,7 +275,7 @@ namespace client LogPrint ("Local destination not set for server tunnel"); } - void I2PServerTunnel::HandleAccept (i2p::stream::Stream * stream) + void I2PServerTunnel::HandleAccept (std::shared_ptr stream) { if (stream) new I2PTunnelConnection (this, stream, new boost::asio::ip::tcp::socket (GetService ()), m_Endpoint); diff --git a/I2PTunnel.h b/I2PTunnel.h index 4f99bc79..72eb8c46 100644 --- a/I2PTunnel.h +++ b/I2PTunnel.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "Identity.h" #include "Destination.h" @@ -24,7 +25,7 @@ namespace client I2PTunnelConnection (I2PTunnel * owner, boost::asio::ip::tcp::socket * socket, const i2p::data::LeaseSet * leaseSet); - I2PTunnelConnection (I2PTunnel * owner, i2p::stream::Stream * stream, boost::asio::ip::tcp::socket * socket, + I2PTunnelConnection (I2PTunnel * owner, std::shared_ptr stream, boost::asio::ip::tcp::socket * socket, const boost::asio::ip::tcp::endpoint& target); ~I2PTunnelConnection (); @@ -44,7 +45,7 @@ namespace client uint8_t m_Buffer[I2P_TUNNEL_CONNECTION_BUFFER_SIZE], m_StreamBuffer[I2P_TUNNEL_CONNECTION_BUFFER_SIZE]; boost::asio::ip::tcp::socket * m_Socket; - i2p::stream::Stream * m_Stream; + std::shared_ptr m_Stream; I2PTunnel * m_Owner; }; @@ -111,7 +112,7 @@ namespace client private: void Accept (); - void HandleAccept (i2p::stream::Stream * stream); + void HandleAccept (std::shared_ptr stream); private: diff --git a/SAM.cpp b/SAM.cpp index d2b90292..c02d2583 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -34,7 +34,7 @@ namespace client { m_Stream->Close (); i2p::stream::DeleteStream (m_Stream); - m_Stream = nullptr; + m_Stream.reset (); } } @@ -531,7 +531,7 @@ namespace client I2PReceive (); } - void SAMSocket::HandleI2PAccept (i2p::stream::Stream * stream) + void SAMSocket::HandleI2PAccept (std::shared_ptr stream) { if (stream) { diff --git a/SAM.h b/SAM.h index e51a86ba..48d05615 100644 --- a/SAM.h +++ b/SAM.h @@ -90,7 +90,7 @@ namespace client void I2PReceive (); void HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); - void HandleI2PAccept (i2p::stream::Stream * stream); + void HandleI2PAccept (std::shared_ptr stream); void HandleWriteI2PData (const boost::system::error_code& ecode); void HandleI2PDatagramReceive (const i2p::data::IdentityEx& ident, const uint8_t * buf, size_t len); @@ -118,7 +118,7 @@ namespace client SAMSocketType m_SocketType; std::string m_ID; // nickname bool m_IsSilent; - i2p::stream::Stream * m_Stream; + std::shared_ptr m_Stream; SAMSession * m_Session; }; diff --git a/SOCKS.cpp b/SOCKS.cpp index f2386321..11c124e4 100644 --- a/SOCKS.cpp +++ b/SOCKS.cpp @@ -77,8 +77,7 @@ namespace proxy { if (m_stream) { LogPrint("--- socks4a close stream"); - delete m_stream; - m_stream = nullptr; + m_stream.reset (); } } diff --git a/SOCKS.h b/SOCKS.h index 6b9dc9d4..f08910db 100644 --- a/SOCKS.h +++ b/SOCKS.h @@ -5,7 +5,7 @@ #include #include #include - +#include #include "Identity.h" #include "Streaming.h" @@ -47,7 +47,7 @@ namespace proxy boost::asio::io_service * m_ios; boost::asio::ip::tcp::socket * m_sock; boost::asio::deadline_timer m_ls_timer; - i2p::stream::Stream * m_stream; + std::shared_ptr m_stream; i2p::data::LeaseSet * m_ls; i2p::data::IdentHash m_dest; state m_state; diff --git a/Streaming.cpp b/Streaming.cpp index dba42d29..61aa03af 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -103,7 +103,7 @@ namespace stream m_IsAckSendScheduled = true; m_AckSendTimer.expires_from_now (boost::posix_time::milliseconds(ACK_SEND_TIMEOUT)); m_AckSendTimer.async_wait (boost::bind (&Stream::HandleAckSendTimer, - this, boost::asio::placeholders::error)); + shared_from_this (), boost::asio::placeholders::error)); } } else if (isSyn) @@ -461,7 +461,7 @@ namespace stream m_ResendTimer.cancel (); m_ResendTimer.expires_from_now (boost::posix_time::seconds(RESEND_TIMEOUT)); m_ResendTimer.async_wait (boost::bind (&Stream::HandleResendTimer, - this, boost::asio::placeholders::error)); + shared_from_this (), boost::asio::placeholders::error)); } void Stream::HandleResendTimer (const boost::system::error_code& ecode) @@ -563,8 +563,6 @@ namespace stream ResetAcceptor (); { std::unique_lock l(m_StreamsMutex); - for (auto it: m_Streams) - delete it.second; m_Streams.clear (); } } @@ -597,36 +595,30 @@ namespace stream } } - Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote, int port) + std::shared_ptr StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote, int port) { - Stream * s = new Stream (*m_Owner.GetService (), *this, remote, port); + auto s = std::make_shared (*m_Owner.GetService (), *this, remote, port); std::unique_lock l(m_StreamsMutex); m_Streams[s->GetRecvStreamID ()] = s; return s; } - Stream * StreamingDestination::CreateNewIncomingStream () + std::shared_ptr StreamingDestination::CreateNewIncomingStream () { - Stream * s = new Stream (*m_Owner.GetService (), *this); + auto s = std::make_shared (*m_Owner.GetService (), *this); std::unique_lock l(m_StreamsMutex); m_Streams[s->GetRecvStreamID ()] = s; return s; } - void StreamingDestination::DeleteStream (Stream * stream) + void StreamingDestination::DeleteStream (std::shared_ptr stream) { if (stream) { std::unique_lock l(m_StreamsMutex); auto it = m_Streams.find (stream->GetRecvStreamID ()); if (it != m_Streams.end ()) - { m_Streams.erase (it); - if (m_Owner.GetService ()) - m_Owner.GetService ()->post ([stream](void) { delete stream; }); - else - delete stream; - } } } @@ -651,7 +643,7 @@ namespace stream } } - void DeleteStream (Stream * stream) + void DeleteStream (std::shared_ptr stream) { if (stream) stream->GetLocalDestination ().DeleteStream (stream); diff --git a/Streaming.h b/Streaming.h index 82a01b30..005f00d0 100644 --- a/Streaming.h +++ b/Streaming.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include "I2PEndian.h" @@ -78,7 +79,7 @@ namespace stream }; class StreamingDestination; - class Stream + class Stream: public std::enable_shared_from_this { public: @@ -153,7 +154,7 @@ namespace stream { public: - typedef std::function Acceptor; + typedef std::function)> Acceptor; StreamingDestination (i2p::client::ClientDestination& owner): m_Owner (owner) {}; ~StreamingDestination () {}; @@ -161,8 +162,8 @@ namespace stream void Start (); void Stop (); - Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote, int port = 0); - void DeleteStream (Stream * stream); + std::shared_ptr CreateNewOutgoingStream (const i2p::data::LeaseSet& remote, int port = 0); + void DeleteStream (std::shared_ptr stream); void SetAcceptor (const Acceptor& acceptor) { m_Acceptor = acceptor; }; void ResetAcceptor () { m_Acceptor = nullptr; }; bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; @@ -173,13 +174,13 @@ namespace stream private: void HandleNextPacket (Packet * packet); - Stream * CreateNewIncomingStream (); + std::shared_ptr CreateNewIncomingStream (); private: i2p::client::ClientDestination& m_Owner; std::mutex m_StreamsMutex; - std::map m_Streams; + std::map > m_Streams; Acceptor m_Acceptor; public: @@ -188,7 +189,7 @@ namespace stream const decltype(m_Streams)& GetStreams () const { return m_Streams; }; }; - void DeleteStream (Stream * stream); + void DeleteStream (std::shared_ptr stream); //------------------------------------------------- @@ -197,15 +198,17 @@ namespace stream { if (!m_ReceiveQueue.empty ()) { - m_Service.post ([=](void) { this->HandleReceiveTimer ( + auto s = shared_from_this(); + m_Service.post ([=](void) { s->HandleReceiveTimer ( boost::asio::error::make_error_code (boost::asio::error::operation_aborted), buffer, handler); }); } else { m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(timeout)); + auto s = shared_from_this(); m_ReceiveTimer.async_wait ([=](const boost::system::error_code& ecode) - { this->HandleReceiveTimer (ecode, buffer, handler); }); + { s->HandleReceiveTimer (ecode, buffer, handler); }); } } diff --git a/api/api.cpp b/api/api.cpp index f4766722..82611c41 100644 --- a/api/api.cpp +++ b/api/api.cpp @@ -73,7 +73,7 @@ namespace api i2p::data::netdb.RequestDestination (remote, true, dest->GetTunnelPool ()); } - i2p::stream::Stream * CreateStream (i2p::client::ClientDestination * dest, const i2p::data::IdentHash& remote) + std::shared_ptr CreateStream (i2p::client::ClientDestination * dest, const i2p::data::IdentHash& remote) { auto leaseSet = i2p::data::netdb.FindLeaseSet (remote); if (leaseSet) @@ -95,7 +95,7 @@ namespace api dest->AcceptStreams (acceptor); } - void DestroyStream (i2p::stream::Stream * stream) + void DestroyStream (std::shared_ptr stream) { if (stream) { diff --git a/api/api.h b/api/api.h index 2fea8627..5b4273af 100644 --- a/api/api.h +++ b/api/api.h @@ -1,6 +1,7 @@ #ifndef API_H__ #define API_H__ +#include #include "Identity.h" #include "Destination.h" #include "Streaming.h" @@ -16,14 +17,14 @@ namespace api // destinations i2p::client::ClientDestination * CreateLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic = true); - i2p::client::ClientDestination * CreateLocalDestination (bool isPublic = false, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_DSA_SHA1); // transient destinations usually not published + i2p::client::ClientDestination * CreateLocalDestination (bool isPublic = false, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256); // transient destinations usually not published void DestoroyLocalDestination (i2p::client::ClientDestination * dest); // streams void RequestLeaseSet (i2p::client::ClientDestination * dest, const i2p::data::IdentHash& remote); - i2p::stream::Stream * CreateStream (i2p::client::ClientDestination * dest, const i2p::data::IdentHash& remote); + std::shared_ptr CreateStream (i2p::client::ClientDestination * dest, const i2p::data::IdentHash& remote); void AcceptStream (i2p::client::ClientDestination * dest, const i2p::stream::StreamingDestination::Acceptor& acceptor); - void DestroyStream (i2p::stream::Stream * stream); + void DestroyStream (std::shared_ptr stream); } }