diff --git a/Destination.cpp b/Destination.cpp index 4712ac09..c5a3beff 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -14,9 +14,8 @@ namespace i2p namespace client { LeaseSetDestination::LeaseSetDestination (bool isPublic, const std::map * params): - m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), - m_IsPublic (isPublic), m_PublishReplyToken (0), - m_DatagramDestination (nullptr), m_PublishConfirmationTimer (m_Service), + m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), m_IsPublic (isPublic), + m_PublishReplyToken (0), m_PublishConfirmationTimer (m_Service), m_PublishVerificationTimer (m_Service), m_CleanupTimer (m_Service) { int inboundTunnelLen = DEFAULT_INBOUND_TUNNEL_LENGTH; @@ -109,8 +108,6 @@ namespace client m_LeaseSetRequests.clear (); if (m_Pool) i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); - if (m_DatagramDestination) - delete m_DatagramDestination; } void LeaseSetDestination::Run () @@ -128,7 +125,7 @@ namespace client } } - void LeaseSetDestination::Start () + bool LeaseSetDestination::Start () { if (!m_IsRunning) { @@ -136,18 +133,17 @@ namespace client m_Pool->SetLocalDestination (shared_from_this ()); m_Pool->SetActive (true); m_Thread = new std::thread (std::bind (&LeaseSetDestination::Run, shared_from_this ())); - m_StreamingDestination = std::make_shared (shared_from_this ()); // TODO: - m_StreamingDestination->Start (); - for (auto it: m_StreamingDestinationsByPorts) - it.second->Start (); m_CleanupTimer.expires_from_now (boost::posix_time::minutes (DESTINATION_CLEANUP_TIMEOUT)); m_CleanupTimer.async_wait (std::bind (&LeaseSetDestination::HandleCleanupTimer, shared_from_this (), std::placeholders::_1)); + return true; } + else + return false; } - void LeaseSetDestination::Stop () + bool LeaseSetDestination::Stop () { if (m_IsRunning) { @@ -155,16 +151,6 @@ namespace client m_PublishConfirmationTimer.cancel (); m_PublishVerificationTimer.cancel (); m_IsRunning = false; - m_StreamingDestination->Stop (); - m_StreamingDestination = nullptr; - for (auto it: m_StreamingDestinationsByPorts) - it.second->Stop (); - if (m_DatagramDestination) - { - auto d = m_DatagramDestination; - m_DatagramDestination = nullptr; - delete d; - } if (m_Pool) { m_Pool->SetLocalDestination (nullptr); @@ -177,7 +163,10 @@ namespace client delete m_Thread; m_Thread = 0; } + return true; } + else + return false; } std::shared_ptr LeaseSetDestination::FindLeaseSet (const i2p::data::IdentHash& ident) @@ -476,117 +465,6 @@ namespace client } } - void LeaseSetDestination::HandleDataMessage (const uint8_t * buf, size_t len) - { - uint32_t length = bufbe32toh (buf); - buf += 4; - // we assume I2CP payload - uint16_t fromPort = bufbe16toh (buf + 4), // source - toPort = bufbe16toh (buf + 6); // destination - switch (buf[9]) - { - case PROTOCOL_TYPE_STREAMING: - { - // streaming protocol - auto dest = GetStreamingDestination (toPort); - if (dest) - dest->HandleDataMessagePayload (buf, length); - else - LogPrint (eLogError, "Destination: Missing streaming destination"); - } - break; - case PROTOCOL_TYPE_DATAGRAM: - // datagram protocol - if (m_DatagramDestination) - m_DatagramDestination->HandleDataMessagePayload (fromPort, toPort, buf, length); - else - LogPrint (eLogError, "Destination: Missing datagram destination"); - break; - default: - LogPrint (eLogError, "Destination: Data: unexpected protocol ", buf[9]); - } - } - - void LeaseSetDestination::CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port) - { - if (!streamRequestComplete) - { - LogPrint (eLogError, "Destination: request callback is not specified in CreateStream"); - return; - } - auto leaseSet = FindLeaseSet (dest); - if (leaseSet) - streamRequestComplete(CreateStream (leaseSet, port)); - else - { - auto s = shared_from_this (); - RequestDestination (dest, - [s, streamRequestComplete, port](std::shared_ptr ls) - { - if (ls) - streamRequestComplete(s->CreateStream (ls, port)); - else - streamRequestComplete (nullptr); - }); - } - } - - std::shared_ptr LeaseSetDestination::CreateStream (std::shared_ptr remote, int port) - { - if (m_StreamingDestination) - return m_StreamingDestination->CreateNewOutgoingStream (remote, port); - else - return nullptr; - } - - std::shared_ptr LeaseSetDestination::GetStreamingDestination (int port) const - { - if (port) - { - auto it = m_StreamingDestinationsByPorts.find (port); - if (it != m_StreamingDestinationsByPorts.end ()) - return it->second; - } - // if port is zero or not found, use default destination - return m_StreamingDestination; - } - - void LeaseSetDestination::AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor) - { - if (m_StreamingDestination) - m_StreamingDestination->SetAcceptor (acceptor); - } - - void LeaseSetDestination::StopAcceptingStreams () - { - if (m_StreamingDestination) - m_StreamingDestination->ResetAcceptor (); - } - - bool LeaseSetDestination::IsAcceptingStreams () const - { - if (m_StreamingDestination) - return m_StreamingDestination->IsAcceptorSet (); - return false; - } - - std::shared_ptr LeaseSetDestination::CreateStreamingDestination (int port, bool gzip) - { - auto dest = std::make_shared (shared_from_this (), port, gzip); - if (port) - m_StreamingDestinationsByPorts[port] = dest; - else // update default - m_StreamingDestination = dest; - return dest; - } - - i2p::datagram::DatagramDestination * LeaseSetDestination::CreateDatagramDestination () - { - if (!m_DatagramDestination) - m_DatagramDestination = new i2p::datagram::DatagramDestination (shared_from_this ()); - return m_DatagramDestination; - } - bool LeaseSetDestination::RequestDestination (const i2p::data::IdentHash& dest, RequestComplete requestComplete) { if (!m_Pool || !IsReady ()) @@ -751,23 +629,9 @@ namespace client } } - std::vector > LeaseSetDestination::GetAllStreams () const - { - std::vector > ret; - if (m_StreamingDestination) - { - for (auto& it: m_StreamingDestination->GetStreams ()) - ret.push_back (it.second); - } - for (auto& it: m_StreamingDestinationsByPorts) - for (auto& it1: it.second->GetStreams ()) - ret.push_back (it1.second); - return ret; - } - ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map * params): LeaseSetDestination (isPublic, params), - m_Keys (keys) + m_Keys (keys), m_DatagramDestination (nullptr) { if (isPublic) PersistTemporaryKeys (); @@ -777,6 +641,157 @@ namespace client LogPrint (eLogInfo, "Destination: Local address ", GetIdentHash().ToBase32 (), " created"); } + ClientDestination::~ClientDestination () + { + if (m_DatagramDestination) + delete m_DatagramDestination; + } + + bool ClientDestination::Start () + { + if (LeaseSetDestination::Start ()) + { + m_StreamingDestination = std::make_shared (shared_from_this ()); // TODO: + m_StreamingDestination->Start (); + for (auto it: m_StreamingDestinationsByPorts) + it.second->Start (); + return true; + } + else + return false; + } + + bool ClientDestination::Stop () + { + if (LeaseSetDestination::Stop ()) + { + m_StreamingDestination->Stop (); + m_StreamingDestination = nullptr; + for (auto it: m_StreamingDestinationsByPorts) + it.second->Stop (); + if (m_DatagramDestination) + { + auto d = m_DatagramDestination; + m_DatagramDestination = nullptr; + delete d; + } + return true; + } + else + return false; + } + + void ClientDestination::HandleDataMessage (const uint8_t * buf, size_t len) + { + uint32_t length = bufbe32toh (buf); + buf += 4; + // we assume I2CP payload + uint16_t fromPort = bufbe16toh (buf + 4), // source + toPort = bufbe16toh (buf + 6); // destination + switch (buf[9]) + { + case PROTOCOL_TYPE_STREAMING: + { + // streaming protocol + auto dest = GetStreamingDestination (toPort); + if (dest) + dest->HandleDataMessagePayload (buf, length); + else + LogPrint (eLogError, "Destination: Missing streaming destination"); + } + break; + case PROTOCOL_TYPE_DATAGRAM: + // datagram protocol + if (m_DatagramDestination) + m_DatagramDestination->HandleDataMessagePayload (fromPort, toPort, buf, length); + else + LogPrint (eLogError, "Destination: Missing datagram destination"); + break; + default: + LogPrint (eLogError, "Destination: Data: unexpected protocol ", buf[9]); + } + } + + void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port) + { + if (!streamRequestComplete) + { + LogPrint (eLogError, "Destination: request callback is not specified in CreateStream"); + return; + } + auto leaseSet = FindLeaseSet (dest); + if (leaseSet) + streamRequestComplete(CreateStream (leaseSet, port)); + else + { + auto s = std::static_pointer_cast(shared_from_this ()); + RequestDestination (dest, + [s, streamRequestComplete, port](std::shared_ptr ls) + { + if (ls) + streamRequestComplete(s->CreateStream (ls, port)); + else + streamRequestComplete (nullptr); + }); + } + } + + std::shared_ptr ClientDestination::CreateStream (std::shared_ptr remote, int port) + { + if (m_StreamingDestination) + return m_StreamingDestination->CreateNewOutgoingStream (remote, port); + else + return nullptr; + } + + std::shared_ptr ClientDestination::GetStreamingDestination (int port) const + { + if (port) + { + auto it = m_StreamingDestinationsByPorts.find (port); + if (it != m_StreamingDestinationsByPorts.end ()) + return it->second; + } + // if port is zero or not found, use default destination + return m_StreamingDestination; + } + + void ClientDestination::AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor) + { + if (m_StreamingDestination) + m_StreamingDestination->SetAcceptor (acceptor); + } + + void ClientDestination::StopAcceptingStreams () + { + if (m_StreamingDestination) + m_StreamingDestination->ResetAcceptor (); + } + + bool ClientDestination::IsAcceptingStreams () const + { + if (m_StreamingDestination) + return m_StreamingDestination->IsAcceptorSet (); + return false; + } + + std::shared_ptr ClientDestination::CreateStreamingDestination (int port, bool gzip) + { + auto dest = std::make_shared (shared_from_this (), port, gzip); + if (port) + m_StreamingDestinationsByPorts[port] = dest; + else // update default + m_StreamingDestination = dest; + return dest; + } + + i2p::datagram::DatagramDestination * ClientDestination::CreateDatagramDestination () + { + if (!m_DatagramDestination) + m_DatagramDestination = new i2p::datagram::DatagramDestination (shared_from_this ()); + return m_DatagramDestination; + } + void ClientDestination::PersistTemporaryKeys () { std::string ident = GetIdentHash().ToBase32(); @@ -800,5 +815,19 @@ namespace client } LogPrint(eLogError, "Destinations: Can't save keys to ", path); } + + std::vector > ClientDestination::GetAllStreams () const + { + std::vector > ret; + if (m_StreamingDestination) + { + for (auto& it: m_StreamingDestination->GetStreams ()) + ret.push_back (it.second); + } + for (auto& it: m_StreamingDestinationsByPorts) + for (auto& it1: it.second->GetStreams ()) + ret.push_back (it1.second); + return ret; + } } } diff --git a/Destination.h b/Destination.h index 3cf4ff48..c2292cfe 100644 --- a/Destination.h +++ b/Destination.h @@ -71,8 +71,8 @@ namespace client LeaseSetDestination (bool isPublic, const std::map * params = nullptr); ~LeaseSetDestination (); - virtual void Start (); - virtual void Stop (); + virtual bool Start (); + virtual bool Stop (); bool IsRunning () const { return m_IsRunning; }; boost::asio::io_service& GetService () { return m_Service; }; std::shared_ptr GetTunnelPool () { return m_Pool; }; @@ -81,20 +81,6 @@ namespace client bool RequestDestination (const i2p::data::IdentHash& dest, RequestComplete requestComplete = nullptr); void CancelDestinationRequest (const i2p::data::IdentHash& dest); - // streaming - std::shared_ptr CreateStreamingDestination (int port, bool gzip = true); // additional - std::shared_ptr GetStreamingDestination (int port = 0) const; - // following methods operate with default streaming destination - void CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port = 0); - std::shared_ptr CreateStream (std::shared_ptr remote, int port = 0); - void AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor); - void StopAcceptingStreams (); - bool IsAcceptingStreams () const; - - // datagram - i2p::datagram::DatagramDestination * GetDatagramDestination () const { return m_DatagramDestination; }; - i2p::datagram::DatagramDestination * CreateDatagramDestination (); - // implements GarlicDestination std::shared_ptr GetLeaseSet (); std::shared_ptr GetTunnelPool () const { return m_Pool; } @@ -106,9 +92,11 @@ namespace client void ProcessDeliveryStatusMessage (std::shared_ptr msg); void SetLeaseSetUpdated (); - // I2CP - void HandleDataMessage (const uint8_t * buf, size_t len); + protected: + // I2CP + virtual void HandleDataMessage (const uint8_t * buf, size_t len) = 0; + private: void Run (); @@ -125,7 +113,7 @@ namespace client void HandleRequestTimoutTimer (const boost::system::error_code& ecode, const i2p::data::IdentHash& dest); void HandleCleanupTimer (const boost::system::error_code& ecode); void CleanupRemoteLeaseSets (); - + private: volatile bool m_IsRunning; @@ -140,10 +128,6 @@ namespace client bool m_IsPublic; uint32_t m_PublishReplyToken; std::set m_ExcludedFloodfills; // for publishing - - std::shared_ptr m_StreamingDestination; // default - std::map > m_StreamingDestinationsByPorts; - i2p::datagram::DatagramDestination * m_DatagramDestination; boost::asio::deadline_timer m_PublishConfirmationTimer, m_PublishVerificationTimer, m_CleanupTimer; @@ -151,7 +135,6 @@ namespace client // for HTTP only int GetNumRemoteLeaseSets () const { return m_RemoteLeaseSets.size (); }; - std::vector > GetAllStreams () const; }; class ClientDestination: public LeaseSetDestination @@ -159,12 +142,35 @@ namespace client public: ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map * params = nullptr); + ~ClientDestination (); + + bool Start (); + bool Stop (); + // streaming + std::shared_ptr CreateStreamingDestination (int port, bool gzip = true); // additional + std::shared_ptr GetStreamingDestination (int port = 0) const; + // following methods operate with default streaming destination + void CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port = 0); + std::shared_ptr CreateStream (std::shared_ptr remote, int port = 0); + void AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor); + void StopAcceptingStreams (); + bool IsAcceptingStreams () const; + + // datagram + i2p::datagram::DatagramDestination * GetDatagramDestination () const { return m_DatagramDestination; }; + i2p::datagram::DatagramDestination * CreateDatagramDestination (); + // implements LocalDestination const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; }; const uint8_t * GetEncryptionPrivateKey () const { return m_EncryptionPrivateKey; }; const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionPublicKey; }; + protected: + + // I2CP + void HandleDataMessage (const uint8_t * buf, size_t len); + private: void PersistTemporaryKeys (); @@ -173,6 +179,15 @@ namespace client i2p::data::PrivateKeys m_Keys; uint8_t m_EncryptionPublicKey[256], m_EncryptionPrivateKey[256]; + + std::shared_ptr m_StreamingDestination; // default + std::map > m_StreamingDestinationsByPorts; + i2p::datagram::DatagramDestination * m_DatagramDestination; + + public: + + // for HTTP only + std::vector > GetAllStreams () const; }; } }