From 7073a6bf384e763b46dc891d0091f69d295bca58 Mon Sep 17 00:00:00 2001 From: Simon Vetter Date: Thu, 28 Oct 2021 21:59:27 +0200 Subject: [PATCH 1/3] libi2pd: make Tunnel and TunnelConfig destructors virtual --- libi2pd/Tunnel.h | 2 +- libi2pd/TunnelConfig.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libi2pd/Tunnel.h b/libi2pd/Tunnel.h index 56166e0b..80d29a15 100644 --- a/libi2pd/Tunnel.h +++ b/libi2pd/Tunnel.h @@ -68,7 +68,7 @@ namespace tunnel public: Tunnel (std::shared_ptr config); - ~Tunnel (); + virtual ~Tunnel (); void Build (uint32_t replyMsgID, std::shared_ptr outboundTunnel = nullptr); diff --git a/libi2pd/TunnelConfig.h b/libi2pd/TunnelConfig.h index f198dffd..ae924ba6 100644 --- a/libi2pd/TunnelConfig.h +++ b/libi2pd/TunnelConfig.h @@ -99,7 +99,7 @@ namespace tunnel m_LastHop->SetReplyHop (replyTunnelID, replyIdent); } - ~TunnelConfig () + virtual ~TunnelConfig () { TunnelHopConfig * hop = m_FirstHop; From 1de1c79d4f72a1c7aa3ad51e6a582df5c8302c39 Mon Sep 17 00:00:00 2001 From: Simon Vetter Date: Sun, 31 Oct 2021 14:51:41 +0100 Subject: [PATCH 2/3] libi2pd: add missing locks to i2p::tunnel::Tunnels m_InboundTunnelsMutex, m_OutboundTunnelsMutex and m_PoolsMutex have been changed to recursive_mutexes since they can be acquired multiple times by the same thread. --- libi2pd/Tunnel.cpp | 33 ++++++++++++++++++++++++++++----- libi2pd/Tunnel.h | 8 +++++++- 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/libi2pd/Tunnel.cpp b/libi2pd/Tunnel.cpp index a315ff49..f3cdd432 100644 --- a/libi2pd/Tunnel.cpp +++ b/libi2pd/Tunnel.cpp @@ -366,6 +366,7 @@ namespace tunnel std::shared_ptr Tunnels::GetTunnel (uint32_t tunnelID) { + std::unique_lock l(m_TunnelsMutex); auto it = m_Tunnels.find(tunnelID); if (it != m_Tunnels.end ()) return it->second; @@ -374,11 +375,13 @@ namespace tunnel std::shared_ptr Tunnels::GetPendingInboundTunnel (uint32_t replyMsgID) { + std::unique_lock l(m_PendingInboundTunnelsMutex); return GetPendingTunnel (replyMsgID, m_PendingInboundTunnels); } std::shared_ptr Tunnels::GetPendingOutboundTunnel (uint32_t replyMsgID) { + std::unique_lock l(m_PendingOutboundTunnelsMutex); return GetPendingTunnel (replyMsgID, m_PendingOutboundTunnels); } @@ -459,9 +462,11 @@ namespace tunnel void Tunnels::AddTransitTunnel (std::shared_ptr tunnel) { - if (m_Tunnels.emplace (tunnel->GetTunnelID (), tunnel).second) + std::unique_lock l(m_TunnelsMutex); + if (m_Tunnels.emplace (tunnel->GetTunnelID (), tunnel).second) { + std::unique_lock l(m_TransitTunnelsMutex); m_TransitTunnels.push_back (tunnel); - else + } else LogPrint (eLogError, "Tunnel: tunnel with id ", tunnel->GetTunnelID (), " already exists"); } @@ -616,7 +621,10 @@ namespace tunnel void Tunnels::ManagePendingTunnels () { + std::unique_lock li(m_PendingInboundTunnelsMutex); ManagePendingTunnels (m_PendingInboundTunnels); + li.unlock(); + std::unique_lock lo(m_PendingOutboundTunnelsMutex); ManagePendingTunnels (m_PendingOutboundTunnels); } @@ -676,6 +684,7 @@ namespace tunnel void Tunnels::ManageOutboundTunnels () { + std::unique_lock l(m_OutboundTunnelsMutex); uint64_t ts = i2p::util::GetSecondsSinceEpoch (); { for (auto it = m_OutboundTunnels.begin (); it != m_OutboundTunnels.end ();) @@ -730,6 +739,8 @@ namespace tunnel void Tunnels::ManageInboundTunnels () { + std::unique_lock lt(m_TunnelsMutex); + std::unique_lock li(m_InboundTunnelsMutex); uint64_t ts = i2p::util::GetSecondsSinceEpoch (); { for (auto it = m_InboundTunnels.begin (); it != m_InboundTunnels.end ();) @@ -786,6 +797,7 @@ namespace tunnel return; } + std::unique_lock lo(m_OutboundTunnelsMutex); if (m_OutboundTunnels.empty () || m_InboundTunnels.size () < 3) { // trying to create one more inbound tunnel @@ -806,6 +818,8 @@ namespace tunnel void Tunnels::ManageTransitTunnels () { + std::unique_lock lt(m_TunnelsMutex); + std::unique_lock l(m_TransitTunnelsMutex); uint32_t ts = i2p::util::GetSecondsSinceEpoch (); for (auto it = m_TransitTunnels.begin (); it != m_TransitTunnels.end ();) { @@ -876,17 +890,20 @@ namespace tunnel void Tunnels::AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr tunnel) { + std::unique_lock l(m_PendingInboundTunnelsMutex); m_PendingInboundTunnels[replyMsgID] = tunnel; } void Tunnels::AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr tunnel) { + std::unique_lock l(m_PendingOutboundTunnelsMutex); m_PendingOutboundTunnels[replyMsgID] = tunnel; } void Tunnels::AddOutboundTunnel (std::shared_ptr newTunnel) { // we don't need to insert it to m_Tunnels + std::unique_lock l(m_OutboundTunnelsMutex); m_OutboundTunnels.push_back (newTunnel); auto pool = newTunnel->GetTunnelPool (); if (pool && pool->IsActive ()) @@ -897,8 +914,10 @@ namespace tunnel void Tunnels::AddInboundTunnel (std::shared_ptr newTunnel) { + std::unique_lock l(m_TunnelsMutex); if (m_Tunnels.emplace (newTunnel->GetTunnelID (), newTunnel).second) { + std::unique_lock l(m_InboundTunnelsMutex); m_InboundTunnels.push_back (newTunnel); auto pool = newTunnel->GetTunnelPool (); if (!pool) @@ -926,6 +945,8 @@ namespace tunnel auto inboundTunnel = std::make_shared (); inboundTunnel->SetTunnelPool (pool); inboundTunnel->SetState (eTunnelStateEstablished); + std::unique_lock lt(m_TunnelsMutex); + std::unique_lock li(m_InboundTunnelsMutex); m_InboundTunnels.push_back (inboundTunnel); m_Tunnels[inboundTunnel->GetTunnelID ()] = inboundTunnel; return inboundTunnel; @@ -936,6 +957,7 @@ namespace tunnel auto outboundTunnel = std::make_shared (); outboundTunnel->SetTunnelPool (pool); outboundTunnel->SetState (eTunnelStateEstablished); + std::unique_lock l(m_OutboundTunnelsMutex); m_OutboundTunnels.push_back (outboundTunnel); // we don't insert into m_Tunnels return outboundTunnel; @@ -964,6 +986,7 @@ namespace tunnel int timeout = 0; uint32_t ts = i2p::util::GetSecondsSinceEpoch (); // TODO: possible race condition with I2PControl + std::unique_lock l(m_TransitTunnelsMutex); for (const auto& it : m_TransitTunnels) { int t = it->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT - ts; @@ -974,19 +997,19 @@ namespace tunnel size_t Tunnels::CountTransitTunnels() const { - // TODO: locking + std::unique_lock l(m_TransitTunnelsMutex); return m_TransitTunnels.size(); } size_t Tunnels::CountInboundTunnels() const { - // TODO: locking + std::unique_lock l(m_InboundTunnelsMutex); return m_InboundTunnels.size(); } size_t Tunnels::CountOutboundTunnels() const { - // TODO: locking + std::unique_lock l(m_OutboundTunnelsMutex); return m_OutboundTunnels.size(); } } diff --git a/libi2pd/Tunnel.h b/libi2pd/Tunnel.h index 80d29a15..640a9ca7 100644 --- a/libi2pd/Tunnel.h +++ b/libi2pd/Tunnel.h @@ -254,12 +254,18 @@ namespace tunnel bool m_IsRunning; std::thread * m_Thread; std::map > m_PendingInboundTunnels; // by replyMsgID + mutable std::mutex m_PendingInboundTunnelsMutex; std::map > m_PendingOutboundTunnels; // by replyMsgID + mutable std::mutex m_PendingOutboundTunnelsMutex; std::list > m_InboundTunnels; + mutable std::recursive_mutex m_InboundTunnelsMutex; std::list > m_OutboundTunnels; + mutable std::recursive_mutex m_OutboundTunnelsMutex; std::list > m_TransitTunnels; + mutable std::mutex m_TransitTunnelsMutex; std::unordered_map > m_Tunnels; // tunnelID->tunnel known by this id - std::mutex m_PoolsMutex; + mutable std::recursive_mutex m_TunnelsMutex; + mutable std::mutex m_PoolsMutex; std::list> m_Pools; std::shared_ptr m_ExploratoryPool; i2p::util::Queue > m_Queue; From 58b7b7d7317df28a40a6bc620761eb218d551a02 Mon Sep 17 00:00:00 2001 From: Simon Vetter Date: Sat, 30 Oct 2021 23:14:48 +0200 Subject: [PATCH 3/3] libi2pd: add missing locks to i2p::tunnel::TunnelPool --- libi2pd/TunnelPool.cpp | 46 +++++++++++++++++++++++++++++++++--------- libi2pd/TunnelPool.h | 6 ++++-- 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/libi2pd/TunnelPool.cpp b/libi2pd/TunnelPool.cpp index 5ed330c8..4d00ad6f 100644 --- a/libi2pd/TunnelPool.cpp +++ b/libi2pd/TunnelPool.cpp @@ -59,6 +59,7 @@ namespace tunnel void TunnelPool::SetExplicitPeers (std::shared_ptr > explicitPeers) { + std::unique_lock l(m_ExplicitPeersMutex); m_ExplicitPeers = explicitPeers; if (m_ExplicitPeers) { @@ -92,6 +93,7 @@ namespace tunnel it->SetTunnelPool (nullptr); m_OutboundTunnels.clear (); } + std::unique_lock l(m_TestsMutex); m_Tests.clear (); } @@ -126,6 +128,7 @@ namespace tunnel } m_InboundTunnels.insert (createdTunnel); } + std::unique_lock l(m_LocalDestinationMutex); if (m_LocalDestination) m_LocalDestination->SetLeaseSetUpdated (); } @@ -135,10 +138,11 @@ namespace tunnel if (expiredTunnel) { expiredTunnel->SetTunnelPool (nullptr); + std::unique_lock lt(m_TestsMutex); for (auto& it: m_Tests) if (it.second.second == expiredTunnel) it.second.second = nullptr; - std::unique_lock l(m_InboundTunnelsMutex); + std::unique_lock li(m_InboundTunnelsMutex); m_InboundTunnels.erase (expiredTunnel); } } @@ -157,10 +161,11 @@ namespace tunnel if (expiredTunnel) { expiredTunnel->SetTunnelPool (nullptr); + std::unique_lock lt(m_TestsMutex); for (auto& it: m_Tests) if (it.second.first == expiredTunnel) it.second.first = nullptr; - std::unique_lock l(m_OutboundTunnelsMutex); + std::unique_lock lo(m_OutboundTunnelsMutex); m_OutboundTunnels.erase (expiredTunnel); } } @@ -261,11 +266,21 @@ namespace tunnel return tunnel; } + std::shared_ptr TunnelPool::GetLocalDestination () const { + std::unique_lock l(m_LocalDestinationMutex); + return m_LocalDestination; + } + + void TunnelPool::SetLocalDestination (std::shared_ptr destination) { + std::unique_lock l(m_LocalDestinationMutex); + m_LocalDestination = destination; + } + void TunnelPool::CreateTunnels () { int num = 0; { - std::unique_lock l(m_OutboundTunnelsMutex); + std::unique_lock lo(m_OutboundTunnelsMutex); for (const auto& it : m_OutboundTunnels) if (it->IsEstablished ()) num++; } @@ -274,10 +289,11 @@ namespace tunnel num = 0; { - std::unique_lock l(m_InboundTunnelsMutex); + std::unique_lock li(m_InboundTunnelsMutex); for (const auto& it : m_InboundTunnels) if (it->IsEstablished ()) num++; } + std::unique_lock lo(m_OutboundTunnelsMutex); if (!num && !m_OutboundTunnels.empty () && m_NumOutboundHops > 0) { for (auto it: m_OutboundTunnels) @@ -287,9 +303,11 @@ namespace tunnel if (num >= m_NumInboundTunnels) break; } } + lo.unlock(); for (int i = num; i < m_NumInboundTunnels; i++) CreateInboundTunnel (); + std::unique_lock l(m_LocalDestinationMutex); if (num < m_NumInboundTunnels && m_NumInboundHops <= 0 && m_LocalDestination) // zero hops IB m_LocalDestination->SetLeaseSetUpdated (); // update LeaseSet immediately } @@ -311,7 +329,7 @@ namespace tunnel if (it.second.first->GetState () == eTunnelStateTestFailed) { it.second.first->SetState (eTunnelStateFailed); - std::unique_lock l(m_OutboundTunnelsMutex); + std::unique_lock lo(m_OutboundTunnelsMutex); m_OutboundTunnels.erase (it.second.first); } else @@ -323,9 +341,10 @@ namespace tunnel { it.second.second->SetState (eTunnelStateFailed); { - std::unique_lock l(m_InboundTunnelsMutex); + std::unique_lock li(m_InboundTunnelsMutex); m_InboundTunnels.erase (it.second.second); } + std::unique_lock ld(m_LocalDestinationMutex); if (m_LocalDestination) m_LocalDestination->SetLeaseSetUpdated (); } @@ -335,7 +354,10 @@ namespace tunnel } // new tests + std::unique_lock lt(m_TestsMutex); + std::unique_lock lo(m_OutboundTunnelsMutex); auto it1 = m_OutboundTunnels.begin (); + std::unique_lock li(m_InboundTunnelsMutex); auto it2 = m_InboundTunnels.begin (); while (it1 != m_OutboundTunnels.end () && it2 != m_InboundTunnels.end ()) { @@ -355,7 +377,6 @@ namespace tunnel uint32_t msgID; RAND_bytes ((uint8_t *)&msgID, 4); { - std::unique_lock l(m_TestsMutex); m_Tests[msgID] = std::make_pair (*it1, *it2); } (*it1)->SendTunnelDataMsg ((*it2)->GetNextIdentHash (), (*it2)->GetNextTunnelID (), @@ -377,10 +398,11 @@ namespace tunnel void TunnelPool::ProcessGarlicMessage (std::shared_ptr msg) { + std::unique_lock l(m_LocalDestinationMutex); if (m_LocalDestination) m_LocalDestination->ProcessGarlicMessage (msg); else - LogPrint (eLogWarning, "Tunnels: local destination doesn't exist, dropped"); + LogPrint (eLogWarning, "Tunnels: local destination doesn't exist, garlic message dropped"); } void TunnelPool::ProcessDeliveryStatus (std::shared_ptr msg) @@ -425,10 +447,11 @@ namespace tunnel } else { + std::unique_lock l(m_LocalDestinationMutex); if (m_LocalDestination) m_LocalDestination->ProcessDeliveryStatusMessage (msg); else - LogPrint (eLogWarning, "Tunnels: Local destination doesn't exist, dropped"); + LogPrint (eLogWarning, "Tunnels: Local destination doesn't exist, delivery status message dropped"); } } @@ -512,13 +535,16 @@ namespace tunnel if (m_CustomPeerSelector) return m_CustomPeerSelector->SelectPeers(path, numHops, isInbound); } + // explicit peers in use + std::lock_guard lock(m_ExplicitPeersMutex); if (m_ExplicitPeers) return SelectExplicitPeers (path, isInbound); return StandardSelectPeers(path, numHops, isInbound, std::bind(&TunnelPool::SelectNextHop, this, std::placeholders::_1, std::placeholders::_2)); } bool TunnelPool::SelectExplicitPeers (Path& path, bool isInbound) { + std::unique_lock l(m_ExplicitPeersMutex); int numHops = isInbound ? m_NumInboundHops : m_NumOutboundHops; if (numHops > (int)m_ExplicitPeers->size ()) numHops = m_ExplicitPeers->size (); if (!numHops) return false; @@ -610,8 +636,10 @@ namespace tunnel Path path; if (SelectPeers (path, false)) { + std::unique_lock ld(m_LocalDestinationMutex); if (m_LocalDestination && !m_LocalDestination->SupportsEncryptionType (i2p::data::CRYPTO_KEY_TYPE_ECIES_X25519_AEAD)) path.isShort = false; // because can't handle ECIES encrypted reply + ld.unlock(); std::shared_ptr config; if (m_NumOutboundHops > 0) diff --git a/libi2pd/TunnelPool.h b/libi2pd/TunnelPool.h index 875a9955..1069e1b9 100644 --- a/libi2pd/TunnelPool.h +++ b/libi2pd/TunnelPool.h @@ -64,8 +64,8 @@ namespace tunnel TunnelPool (int numInboundHops, int numOutboundHops, int numInboundTunnels, int numOutboundTunnels); ~TunnelPool (); - std::shared_ptr GetLocalDestination () const { return m_LocalDestination; }; - void SetLocalDestination (std::shared_ptr destination) { m_LocalDestination = destination; }; + std::shared_ptr GetLocalDestination () const; + void SetLocalDestination (std::shared_ptr destination); void SetExplicitPeers (std::shared_ptr > explicitPeers); void CreateTunnels (); @@ -126,8 +126,10 @@ namespace tunnel private: + mutable std::mutex m_LocalDestinationMutex; std::shared_ptr m_LocalDestination; int m_NumInboundHops, m_NumOutboundHops, m_NumInboundTunnels, m_NumOutboundTunnels; + mutable std::mutex m_ExplicitPeersMutex; std::shared_ptr > m_ExplicitPeers; mutable std::mutex m_InboundTunnelsMutex; std::set, TunnelCreationTimeCmp> m_InboundTunnels; // recent tunnel appears first