diff --git a/Destination.cpp b/Destination.cpp index a7dfcf3c..9930c72e 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -10,8 +10,9 @@ namespace i2p { namespace stream { - StreamingDestination::StreamingDestination (boost::asio::io_service& service, bool isPublic): - m_Service (service), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) + StreamingDestination::StreamingDestination (bool isPublic): + m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), + m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) { m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/); // uncomment for ECDSA CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); @@ -21,8 +22,9 @@ namespace stream LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created"); } - StreamingDestination::StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic): - m_Service (service), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) + StreamingDestination::StreamingDestination (const std::string& fullPath, bool isPublic): + m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), + m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) { std::ifstream s(fullPath.c_str (), std::ifstream::binary); if (s.is_open ()) @@ -55,8 +57,9 @@ namespace stream m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel } - StreamingDestination::StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic): - m_Service (service), m_Keys (keys), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) + StreamingDestination::StreamingDestination (const i2p::data::PrivateKeys& keys, bool isPublic): + m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), + m_Keys (keys), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) { CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey); @@ -73,11 +76,35 @@ namespace stream delete it.second; m_Streams.clear (); } + Stop (); if (m_Pool) i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); delete m_LeaseSet; } + void StreamingDestination::Run () + { + m_Service.run (); + } + + void StreamingDestination::Start () + { + m_IsRunning = true; + m_Thread = new std::thread (std::bind (&StreamingDestination::Run, this)); + } + + void StreamingDestination::Stop () + { + m_IsRunning = false; + m_Service.stop (); + if (m_Thread) + { + m_Thread->join (); + delete m_Thread; + m_Thread = 0; + } + } + void StreamingDestination::SendTunnelDataMsgs (const std::vector& msgs) { m_CurrentOutboundTunnel = m_Pool->GetNextOutboundTunnel (m_CurrentOutboundTunnel); @@ -246,35 +273,21 @@ namespace stream { if (!m_SharedLocalDestination) { - m_SharedLocalDestination = new StreamingDestination (m_Service, false); // non-public + m_SharedLocalDestination = new StreamingDestination (false); // non-public m_Destinations[m_SharedLocalDestination->GetIdentity ().GetIdentHash ()] = m_SharedLocalDestination; + m_SharedLocalDestination->Start (); } - // LoadLocalDestinations (); - - m_IsRunning = true; - m_Thread = new std::thread (std::bind (&StreamingDestinations::Run, this)); } void StreamingDestinations::Stop () { for (auto it: m_Destinations) - delete it.second; + { + it.second->Stop (); + delete it.second; + } m_Destinations.clear (); m_SharedLocalDestination = 0; // deleted through m_Destination - - m_IsRunning = false; - m_Service.stop (); - if (m_Thread) - { - m_Thread->join (); - delete m_Thread; - m_Thread = 0; - } - } - - void StreamingDestinations::Run () - { - m_Service.run (); } void StreamingDestinations::LoadLocalDestinations () @@ -292,7 +305,7 @@ namespace stream #else it->path(); #endif - auto localDestination = new StreamingDestination (m_Service, fullPath, true); + auto localDestination = new StreamingDestination (fullPath, true); m_Destinations[localDestination->GetIdentHash ()] = localDestination; numDestinations++; } @@ -303,17 +316,19 @@ namespace stream StreamingDestination * StreamingDestinations::LoadLocalDestination (const std::string& filename, bool isPublic) { - auto localDestination = new StreamingDestination (m_Service, i2p::util::filesystem::GetFullPath (filename), isPublic); + auto localDestination = new StreamingDestination (i2p::util::filesystem::GetFullPath (filename), isPublic); std::unique_lock l(m_DestinationsMutex); m_Destinations[localDestination->GetIdentHash ()] = localDestination; + localDestination->Start (); return localDestination; } StreamingDestination * StreamingDestinations::CreateNewLocalDestination (bool isPublic) { - auto localDestination = new StreamingDestination (m_Service, isPublic); + auto localDestination = new StreamingDestination (isPublic); std::unique_lock l(m_DestinationsMutex); m_Destinations[localDestination->GetIdentHash ()] = localDestination; + localDestination->Start (); return localDestination; } @@ -328,6 +343,7 @@ namespace stream std::unique_lock l(m_DestinationsMutex); m_Destinations.erase (it); } + d->Stop (); delete d; } } @@ -340,9 +356,10 @@ namespace stream LogPrint ("Local destination ", keys.GetPublic ().GetIdentHash ().ToBase32 (), ".b32.i2p exists"); return nullptr; } - auto localDestination = new StreamingDestination (m_Service, keys, isPublic); + auto localDestination = new StreamingDestination (keys, isPublic); std::unique_lock l(m_DestinationsMutex); m_Destinations[keys.GetPublic ().GetIdentHash ()] = localDestination; + localDestination->Start (); return localDestination; } diff --git a/Destination.h b/Destination.h index 9de28b4b..682cfb83 100644 --- a/Destination.h +++ b/Destination.h @@ -18,11 +18,14 @@ namespace stream { public: - StreamingDestination (boost::asio::io_service& service, bool isPublic); - StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic); - StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic); + StreamingDestination (bool isPublic); + StreamingDestination (const std::string& fullPath, bool isPublic); + StreamingDestination (const i2p::data::PrivateKeys& keys, bool isPublic); ~StreamingDestination (); + void Start (); + void Stop (); + i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; }; Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote); @@ -52,12 +55,17 @@ namespace stream private: + void Run (); Stream * CreateNewIncomingStream (); void UpdateLeaseSet (); private: - boost::asio::io_service& m_Service; + bool m_IsRunning; + std::thread * m_Thread; + boost::asio::io_service m_Service; + boost::asio::io_service::work m_Work; + std::mutex m_StreamsMutex; std::map m_Streams; i2p::data::PrivateKeys m_Keys; @@ -75,8 +83,7 @@ namespace stream { public: - StreamingDestinations (): m_IsRunning (false), m_Thread (nullptr), - m_Work (m_Service), m_SharedLocalDestination (nullptr) {}; + StreamingDestinations (): m_SharedLocalDestination (nullptr) {}; ~StreamingDestinations () {}; void Start (); @@ -93,16 +100,10 @@ namespace stream private: - void Run (); void LoadLocalDestinations (); private: - bool m_IsRunning; - std::thread * m_Thread; - boost::asio::io_service m_Service; - boost::asio::io_service::work m_Work; - std::mutex m_DestinationsMutex; std::map m_Destinations; StreamingDestination * m_SharedLocalDestination;