diff --git a/libi2pd/TransitTunnel.cpp b/libi2pd/TransitTunnel.cpp index edf96c31..8284dc14 100644 --- a/libi2pd/TransitTunnel.cpp +++ b/libi2pd/TransitTunnel.cpp @@ -122,14 +122,86 @@ namespace tunnel } } + TransitTunnels::TransitTunnels (): + m_IsRunning (false) + { + } + + TransitTunnels::~TransitTunnels () + { + Stop (); + } + void TransitTunnels::Start () { + m_IsRunning = true; + m_Thread.reset (new std::thread (std::bind (&TransitTunnels::Run, this))); } void TransitTunnels::Stop () { + m_IsRunning = false; + m_TunnelBuildMsgQueue.WakeUp (); + if (m_Thread) + { + m_Thread->join (); + m_Thread = nullptr; + } m_TransitTunnels.clear (); } + + void TransitTunnels::Run () + { + i2p::util::SetThreadName("TBM"); + uint64_t lastTs = 0; + std::list > msgs; + while (m_IsRunning) + { + try + { + if (m_TunnelBuildMsgQueue.Wait (TRANSIT_TUNNELS_QUEUE_WAIT_INTERVAL, 0)) + { + m_TunnelBuildMsgQueue.GetWholeQueue (msgs); + while (!msgs.empty ()) + { + auto msg = msgs.front (); msgs.pop_front (); + if (!msg) continue; + uint8_t typeID = msg->GetTypeID (); + switch (typeID) + { + case eI2NPShortTunnelBuild: + HandleShortTransitTunnelBuildMsg (std::move (msg)); + break; + case eI2NPVariableTunnelBuild: + HandleVariableTransitTunnelBuildMsg (std::move (msg)); + break; + default: + LogPrint (eLogWarning, "TransitTunnel: Unexpected message type ", (int) typeID); + } + if (!m_IsRunning) break; + } + } + if (m_IsRunning) + { + uint64_t ts = i2p::util::GetSecondsSinceEpoch (); + if (ts >= lastTs + TUNNEL_MANAGE_INTERVAL || ts + TUNNEL_MANAGE_INTERVAL < lastTs) + { + ManageTransitTunnels (ts); + lastTs = ts; + } + } + } + catch (std::exception& ex) + { + LogPrint (eLogError, "TransitTunnel: Runtime exception: ", ex.what ()); + } + } + } + + void TransitTunnels::PostTransitTunnelBuildMsg (std::shared_ptr&& msg) + { + if (msg) m_TunnelBuildMsgQueue.Put (msg); + } void TransitTunnels::HandleShortTransitTunnelBuildMsg (std::shared_ptr&& msg) { @@ -234,6 +306,7 @@ namespace tunnel { if (transitTunnel) { + LogPrint (eLogDebug, "TransitTunnel: Failed to send reply for transit tunnel ", transitTunnel->GetTunnelID ()); auto t = transitTunnel->GetCreationTime (); if (t > i2p::tunnel::TUNNEL_EXPIRATION_TIMEOUT) // make transit tunnel expired diff --git a/libi2pd/TransitTunnel.h b/libi2pd/TransitTunnel.h index fc8676d0..bff1a8dd 100644 --- a/libi2pd/TransitTunnel.h +++ b/libi2pd/TransitTunnel.h @@ -14,6 +14,7 @@ #include #include #include "Crypto.h" +#include "Queue.h" #include "I2NPProtocol.h" #include "TunnelEndpoint.h" #include "TunnelGateway.h" @@ -109,33 +110,45 @@ namespace tunnel const i2p::crypto::AESKey& layerKey, const i2p::crypto::AESKey& ivKey, bool isGateway, bool isEndpoint); + + const int TRANSIT_TUNNELS_QUEUE_WAIT_INTERVAL = 10; // in seconds + class TransitTunnels { public: + TransitTunnels (); + ~TransitTunnels (); + void Start (); void Stop (); - void ManageTransitTunnels (uint64_t ts); - + void PostTransitTunnelBuildMsg (std::shared_ptr&& msg); + size_t GetNumTransitTunnels () const { return m_TransitTunnels.size (); } int GetTransitTunnelsExpirationTimeout (); - - void HandleShortTransitTunnelBuildMsg (std::shared_ptr&& msg); - void HandleVariableTransitTunnelBuildMsg (std::shared_ptr&& msg); private: bool AddTransitTunnel (std::shared_ptr tunnel); + void ManageTransitTunnels (uint64_t ts); + + void HandleShortTransitTunnelBuildMsg (std::shared_ptr&& msg); + void HandleVariableTransitTunnelBuildMsg (std::shared_ptr&& msg); bool HandleBuildRequestRecords (int num, uint8_t * records, uint8_t * clearText); + void Run (); + private: + volatile bool m_IsRunning; + std::unique_ptr m_Thread; std::list > m_TransitTunnels; - + i2p::util::Queue > m_TunnelBuildMsgQueue; + public: // for HTTP only - auto& GetTransitTunnels () const { return m_TransitTunnels; }; + const auto& GetTransitTunnels () const { return m_TransitTunnels; }; }; } } diff --git a/libi2pd/Tunnel.cpp b/libi2pd/Tunnel.cpp index d809e48a..e7443b6c 100644 --- a/libi2pd/Tunnel.cpp +++ b/libi2pd/Tunnel.cpp @@ -373,6 +373,7 @@ namespace tunnel std::shared_ptr Tunnels::GetTunnel (uint32_t tunnelID) { + std::lock_guard l(m_TunnelsMutex); auto it = m_Tunnels.find(tunnelID); if (it != m_Tunnels.end ()) return it->second; @@ -382,11 +383,13 @@ namespace tunnel bool Tunnels::AddTunnel (std::shared_ptr tunnel) { if (!tunnel) return false; + std::lock_guard l(m_TunnelsMutex); return m_Tunnels.emplace (tunnel->GetTunnelID (), tunnel).second; } void Tunnels::RemoveTunnel (uint32_t tunnelID) { + std::lock_guard l(m_TunnelsMutex); m_Tunnels.erase (tunnelID); } @@ -655,7 +658,7 @@ namespace tunnel return; } else - m_TransitTunnels.HandleShortTransitTunnelBuildMsg (std::move (msg)); + m_TransitTunnels.PostTransitTunnelBuildMsg (std::move (msg)); } void Tunnels::HandleVariableTunnelBuildMsg (std::shared_ptr msg) @@ -678,7 +681,7 @@ namespace tunnel } } else - m_TransitTunnels.HandleVariableTransitTunnelBuildMsg (std::move (msg)); + m_TransitTunnels.PostTransitTunnelBuildMsg (std::move (msg)); } void Tunnels::HandleTunnelBuildReplyMsg (std::shared_ptr msg, bool isShort) @@ -710,7 +713,6 @@ namespace tunnel ManagePendingTunnels (ts); ManageInboundTunnels (ts); ManageOutboundTunnels (ts); - m_TransitTunnels.ManageTransitTunnels (ts); } void Tunnels::ManagePendingTunnels (uint64_t ts) diff --git a/libi2pd/Tunnel.h b/libi2pd/Tunnel.h index fcccd236..a25012a4 100644 --- a/libi2pd/Tunnel.h +++ b/libi2pd/Tunnel.h @@ -300,8 +300,9 @@ namespace tunnel std::map > m_PendingOutboundTunnels; // by replyMsgID std::list > m_InboundTunnels; std::list > m_OutboundTunnels; + mutable std::mutex m_TunnelsMutex; std::unordered_map > m_Tunnels; // tunnelID->tunnel known by this id - std::mutex m_PoolsMutex; + mutable std::mutex m_PoolsMutex; std::list> m_Pools; std::shared_ptr m_ExploratoryPool; i2p::util::Queue > m_Queue; @@ -320,7 +321,7 @@ namespace tunnel // for HTTP only const decltype(m_OutboundTunnels)& GetOutboundTunnels () const { return m_OutboundTunnels; }; const decltype(m_InboundTunnels)& GetInboundTunnels () const { return m_InboundTunnels; }; - auto& GetTransitTunnels () const { return m_TransitTunnels.GetTransitTunnels (); }; + const auto& GetTransitTunnels () const { return m_TransitTunnels.GetTransitTunnels (); }; size_t CountTransitTunnels() const; size_t CountInboundTunnels() const;