mirror of
https://github.com/PurpleI2P/i2pd.git
synced 2025-01-11 13:27:52 +00:00
handle transit tunnel build messages in separate thread
This commit is contained in:
parent
b80278421d
commit
d241e5d5cb
@ -122,14 +122,86 @@ namespace tunnel
|
||||
}
|
||||
}
|
||||
|
||||
TransitTunnels::TransitTunnels ():
|
||||
m_IsRunning (false)
|
||||
{
|
||||
}
|
||||
|
||||
TransitTunnels::~TransitTunnels ()
|
||||
{
|
||||
Stop ();
|
||||
}
|
||||
|
||||
void TransitTunnels::Start ()
|
||||
{
|
||||
m_IsRunning = true;
|
||||
m_Thread.reset (new std::thread (std::bind (&TransitTunnels::Run, this)));
|
||||
}
|
||||
|
||||
void TransitTunnels::Stop ()
|
||||
{
|
||||
m_IsRunning = false;
|
||||
m_TunnelBuildMsgQueue.WakeUp ();
|
||||
if (m_Thread)
|
||||
{
|
||||
m_Thread->join ();
|
||||
m_Thread = nullptr;
|
||||
}
|
||||
m_TransitTunnels.clear ();
|
||||
}
|
||||
|
||||
void TransitTunnels::Run ()
|
||||
{
|
||||
i2p::util::SetThreadName("TBM");
|
||||
uint64_t lastTs = 0;
|
||||
std::list<std::shared_ptr<I2NPMessage> > msgs;
|
||||
while (m_IsRunning)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (m_TunnelBuildMsgQueue.Wait (TRANSIT_TUNNELS_QUEUE_WAIT_INTERVAL, 0))
|
||||
{
|
||||
m_TunnelBuildMsgQueue.GetWholeQueue (msgs);
|
||||
while (!msgs.empty ())
|
||||
{
|
||||
auto msg = msgs.front (); msgs.pop_front ();
|
||||
if (!msg) continue;
|
||||
uint8_t typeID = msg->GetTypeID ();
|
||||
switch (typeID)
|
||||
{
|
||||
case eI2NPShortTunnelBuild:
|
||||
HandleShortTransitTunnelBuildMsg (std::move (msg));
|
||||
break;
|
||||
case eI2NPVariableTunnelBuild:
|
||||
HandleVariableTransitTunnelBuildMsg (std::move (msg));
|
||||
break;
|
||||
default:
|
||||
LogPrint (eLogWarning, "TransitTunnel: Unexpected message type ", (int) typeID);
|
||||
}
|
||||
if (!m_IsRunning) break;
|
||||
}
|
||||
}
|
||||
if (m_IsRunning)
|
||||
{
|
||||
uint64_t ts = i2p::util::GetSecondsSinceEpoch ();
|
||||
if (ts >= lastTs + TUNNEL_MANAGE_INTERVAL || ts + TUNNEL_MANAGE_INTERVAL < lastTs)
|
||||
{
|
||||
ManageTransitTunnels (ts);
|
||||
lastTs = ts;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
LogPrint (eLogError, "TransitTunnel: Runtime exception: ", ex.what ());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void TransitTunnels::PostTransitTunnelBuildMsg (std::shared_ptr<I2NPMessage>&& msg)
|
||||
{
|
||||
if (msg) m_TunnelBuildMsgQueue.Put (msg);
|
||||
}
|
||||
|
||||
void TransitTunnels::HandleShortTransitTunnelBuildMsg (std::shared_ptr<I2NPMessage>&& msg)
|
||||
{
|
||||
@ -234,6 +306,7 @@ namespace tunnel
|
||||
{
|
||||
if (transitTunnel)
|
||||
{
|
||||
LogPrint (eLogDebug, "TransitTunnel: Failed to send reply for transit tunnel ", transitTunnel->GetTunnelID ());
|
||||
auto t = transitTunnel->GetCreationTime ();
|
||||
if (t > i2p::tunnel::TUNNEL_EXPIRATION_TIMEOUT)
|
||||
// make transit tunnel expired
|
||||
|
@ -14,6 +14,7 @@
|
||||
#include <mutex>
|
||||
#include <memory>
|
||||
#include "Crypto.h"
|
||||
#include "Queue.h"
|
||||
#include "I2NPProtocol.h"
|
||||
#include "TunnelEndpoint.h"
|
||||
#include "TunnelGateway.h"
|
||||
@ -109,33 +110,45 @@ namespace tunnel
|
||||
const i2p::crypto::AESKey& layerKey, const i2p::crypto::AESKey& ivKey,
|
||||
bool isGateway, bool isEndpoint);
|
||||
|
||||
|
||||
const int TRANSIT_TUNNELS_QUEUE_WAIT_INTERVAL = 10; // in seconds
|
||||
|
||||
class TransitTunnels
|
||||
{
|
||||
public:
|
||||
|
||||
TransitTunnels ();
|
||||
~TransitTunnels ();
|
||||
|
||||
void Start ();
|
||||
void Stop ();
|
||||
void ManageTransitTunnels (uint64_t ts);
|
||||
|
||||
void PostTransitTunnelBuildMsg (std::shared_ptr<I2NPMessage>&& msg);
|
||||
|
||||
size_t GetNumTransitTunnels () const { return m_TransitTunnels.size (); }
|
||||
int GetTransitTunnelsExpirationTimeout ();
|
||||
|
||||
void HandleShortTransitTunnelBuildMsg (std::shared_ptr<I2NPMessage>&& msg);
|
||||
void HandleVariableTransitTunnelBuildMsg (std::shared_ptr<I2NPMessage>&& msg);
|
||||
|
||||
private:
|
||||
|
||||
bool AddTransitTunnel (std::shared_ptr<TransitTunnel> tunnel);
|
||||
void ManageTransitTunnels (uint64_t ts);
|
||||
|
||||
void HandleShortTransitTunnelBuildMsg (std::shared_ptr<I2NPMessage>&& msg);
|
||||
void HandleVariableTransitTunnelBuildMsg (std::shared_ptr<I2NPMessage>&& msg);
|
||||
bool HandleBuildRequestRecords (int num, uint8_t * records, uint8_t * clearText);
|
||||
|
||||
void Run ();
|
||||
|
||||
private:
|
||||
|
||||
volatile bool m_IsRunning;
|
||||
std::unique_ptr<std::thread> m_Thread;
|
||||
std::list<std::shared_ptr<TransitTunnel> > m_TransitTunnels;
|
||||
|
||||
i2p::util::Queue<std::shared_ptr<I2NPMessage> > m_TunnelBuildMsgQueue;
|
||||
|
||||
public:
|
||||
|
||||
// for HTTP only
|
||||
auto& GetTransitTunnels () const { return m_TransitTunnels; };
|
||||
const auto& GetTransitTunnels () const { return m_TransitTunnels; };
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -373,6 +373,7 @@ namespace tunnel
|
||||
|
||||
std::shared_ptr<TunnelBase> Tunnels::GetTunnel (uint32_t tunnelID)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_TunnelsMutex);
|
||||
auto it = m_Tunnels.find(tunnelID);
|
||||
if (it != m_Tunnels.end ())
|
||||
return it->second;
|
||||
@ -382,11 +383,13 @@ namespace tunnel
|
||||
bool Tunnels::AddTunnel (std::shared_ptr<TunnelBase> tunnel)
|
||||
{
|
||||
if (!tunnel) return false;
|
||||
std::lock_guard<std::mutex> l(m_TunnelsMutex);
|
||||
return m_Tunnels.emplace (tunnel->GetTunnelID (), tunnel).second;
|
||||
}
|
||||
|
||||
void Tunnels::RemoveTunnel (uint32_t tunnelID)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_TunnelsMutex);
|
||||
m_Tunnels.erase (tunnelID);
|
||||
}
|
||||
|
||||
@ -655,7 +658,7 @@ namespace tunnel
|
||||
return;
|
||||
}
|
||||
else
|
||||
m_TransitTunnels.HandleShortTransitTunnelBuildMsg (std::move (msg));
|
||||
m_TransitTunnels.PostTransitTunnelBuildMsg (std::move (msg));
|
||||
}
|
||||
|
||||
void Tunnels::HandleVariableTunnelBuildMsg (std::shared_ptr<I2NPMessage> msg)
|
||||
@ -678,7 +681,7 @@ namespace tunnel
|
||||
}
|
||||
}
|
||||
else
|
||||
m_TransitTunnels.HandleVariableTransitTunnelBuildMsg (std::move (msg));
|
||||
m_TransitTunnels.PostTransitTunnelBuildMsg (std::move (msg));
|
||||
}
|
||||
|
||||
void Tunnels::HandleTunnelBuildReplyMsg (std::shared_ptr<I2NPMessage> msg, bool isShort)
|
||||
@ -710,7 +713,6 @@ namespace tunnel
|
||||
ManagePendingTunnels (ts);
|
||||
ManageInboundTunnels (ts);
|
||||
ManageOutboundTunnels (ts);
|
||||
m_TransitTunnels.ManageTransitTunnels (ts);
|
||||
}
|
||||
|
||||
void Tunnels::ManagePendingTunnels (uint64_t ts)
|
||||
|
@ -300,8 +300,9 @@ namespace tunnel
|
||||
std::map<uint32_t, std::shared_ptr<OutboundTunnel> > m_PendingOutboundTunnels; // by replyMsgID
|
||||
std::list<std::shared_ptr<InboundTunnel> > m_InboundTunnels;
|
||||
std::list<std::shared_ptr<OutboundTunnel> > m_OutboundTunnels;
|
||||
mutable std::mutex m_TunnelsMutex;
|
||||
std::unordered_map<uint32_t, std::shared_ptr<TunnelBase> > m_Tunnels; // tunnelID->tunnel known by this id
|
||||
std::mutex m_PoolsMutex;
|
||||
mutable std::mutex m_PoolsMutex;
|
||||
std::list<std::shared_ptr<TunnelPool>> m_Pools;
|
||||
std::shared_ptr<TunnelPool> m_ExploratoryPool;
|
||||
i2p::util::Queue<std::shared_ptr<I2NPMessage> > m_Queue;
|
||||
@ -320,7 +321,7 @@ namespace tunnel
|
||||
// for HTTP only
|
||||
const decltype(m_OutboundTunnels)& GetOutboundTunnels () const { return m_OutboundTunnels; };
|
||||
const decltype(m_InboundTunnels)& GetInboundTunnels () const { return m_InboundTunnels; };
|
||||
auto& GetTransitTunnels () const { return m_TransitTunnels.GetTransitTunnels (); };
|
||||
const auto& GetTransitTunnels () const { return m_TransitTunnels.GetTransitTunnels (); };
|
||||
|
||||
size_t CountTransitTunnels() const;
|
||||
size_t CountInboundTunnels() const;
|
||||
|
Loading…
Reference in New Issue
Block a user