Browse Source

separate inbound and outbound pending tunnels

pull/151/head
orignal 10 years ago
parent
commit
8f562215b0
  1. 8
      I2NPProtocol.cpp
  2. 51
      Tunnel.cpp
  3. 13
      Tunnel.h

8
I2NPProtocol.cpp

@ -312,7 +312,7 @@ namespace i2p
int num = buf[0]; int num = buf[0];
LogPrint ("VariableTunnelBuild ", num, " records"); LogPrint ("VariableTunnelBuild ", num, " records");
i2p::tunnel::Tunnel * tunnel = i2p::tunnel::tunnels.GetPendingTunnel (replyMsgID); auto tunnel = i2p::tunnel::tunnels.GetPendingInboundTunnel (replyMsgID);
if (tunnel) if (tunnel)
{ {
// endpoint of inbound tunnel // endpoint of inbound tunnel
@ -321,7 +321,7 @@ namespace i2p
{ {
LogPrint ("Inbound tunnel ", tunnel->GetTunnelID (), " has been created"); LogPrint ("Inbound tunnel ", tunnel->GetTunnelID (), " has been created");
tunnel->SetState (i2p::tunnel::eTunnelStateEstablished); tunnel->SetState (i2p::tunnel::eTunnelStateEstablished);
i2p::tunnel::tunnels.AddInboundTunnel (static_cast<i2p::tunnel::InboundTunnel *>(tunnel)); i2p::tunnel::tunnels.AddInboundTunnel (tunnel);
} }
else else
{ {
@ -373,7 +373,7 @@ namespace i2p
void HandleVariableTunnelBuildReplyMsg (uint32_t replyMsgID, uint8_t * buf, size_t len) void HandleVariableTunnelBuildReplyMsg (uint32_t replyMsgID, uint8_t * buf, size_t len)
{ {
LogPrint ("VariableTunnelBuildReplyMsg replyMsgID=", replyMsgID); LogPrint ("VariableTunnelBuildReplyMsg replyMsgID=", replyMsgID);
i2p::tunnel::Tunnel * tunnel = i2p::tunnel::tunnels.GetPendingTunnel (replyMsgID); auto tunnel = i2p::tunnel::tunnels.GetPendingOutboundTunnel (replyMsgID);
if (tunnel) if (tunnel)
{ {
// reply for outbound tunnel // reply for outbound tunnel
@ -381,7 +381,7 @@ namespace i2p
{ {
LogPrint ("Outbound tunnel ", tunnel->GetTunnelID (), " has been created"); LogPrint ("Outbound tunnel ", tunnel->GetTunnelID (), " has been created");
tunnel->SetState (i2p::tunnel::eTunnelStateEstablished); tunnel->SetState (i2p::tunnel::eTunnelStateEstablished);
i2p::tunnel::tunnels.AddOutboundTunnel (static_cast<i2p::tunnel::OutboundTunnel *>(tunnel)); i2p::tunnel::tunnels.AddOutboundTunnel (tunnel);
} }
else else
{ {

51
Tunnel.cpp

@ -222,10 +222,13 @@ namespace tunnel
m_TransitTunnels.clear (); m_TransitTunnels.clear ();
ManagePendingTunnels (); ManagePendingTunnels ();
for (auto& it : m_PendingTunnels) for (auto& it : m_PendingInboundTunnels)
delete it.second; delete it.second;
m_PendingTunnels.clear (); m_PendingInboundTunnels.clear ();
for (auto& it : m_PendingOutboundTunnels)
delete it.second;
m_PendingOutboundTunnels.clear ();
} }
InboundTunnel * Tunnels::GetInboundTunnel (uint32_t tunnelID) InboundTunnel * Tunnels::GetInboundTunnel (uint32_t tunnelID)
@ -244,10 +247,21 @@ namespace tunnel
return nullptr; return nullptr;
} }
Tunnel * Tunnels::GetPendingTunnel (uint32_t replyMsgID) InboundTunnel * Tunnels::GetPendingInboundTunnel (uint32_t replyMsgID)
{ {
auto it = m_PendingTunnels.find(replyMsgID); return GetPendingTunnel (replyMsgID, m_PendingInboundTunnels);
if (it != m_PendingTunnels.end () && it->second->GetState () == eTunnelStatePending) }
OutboundTunnel * Tunnels::GetPendingOutboundTunnel (uint32_t replyMsgID)
{
return GetPendingTunnel (replyMsgID, m_PendingOutboundTunnels);
}
template<class TTunnel>
TTunnel * Tunnels::GetPendingTunnel (uint32_t replyMsgID, const std::map<uint32_t, TTunnel *>& pendingTunnels)
{
auto it = pendingTunnels.find(replyMsgID);
if (it != pendingTunnels.end () && it->second->GetState () == eTunnelStatePending)
{ {
it->second->SetState (eTunnelStateBuildReplyReceived); it->second->SetState (eTunnelStateBuildReplyReceived);
return it->second; return it->second;
@ -447,10 +461,17 @@ namespace tunnel
} }
void Tunnels::ManagePendingTunnels () void Tunnels::ManagePendingTunnels ()
{
ManagePendingTunnels (m_PendingInboundTunnels);
ManagePendingTunnels (m_PendingOutboundTunnels);
}
template<class PendingTunnels>
void Tunnels::ManagePendingTunnels (PendingTunnels& pendingTunnels)
{ {
// check pending tunnel. delete failed or timeout // check pending tunnel. delete failed or timeout
uint64_t ts = i2p::util::GetSecondsSinceEpoch (); uint64_t ts = i2p::util::GetSecondsSinceEpoch ();
for (auto it = m_PendingTunnels.begin (); it != m_PendingTunnels.end ();) for (auto it = pendingTunnels.begin (); it != pendingTunnels.end ();)
{ {
auto tunnel = it->second; auto tunnel = it->second;
switch (tunnel->GetState ()) switch (tunnel->GetState ())
@ -460,7 +481,7 @@ namespace tunnel
{ {
LogPrint ("Pending tunnel build request ", it->first, " timeout. Deleted"); LogPrint ("Pending tunnel build request ", it->first, " timeout. Deleted");
delete tunnel; delete tunnel;
it = m_PendingTunnels.erase (it); it = pendingTunnels.erase (it);
} }
else else
it++; it++;
@ -468,14 +489,14 @@ namespace tunnel
case eTunnelStateBuildFailed: case eTunnelStateBuildFailed:
LogPrint ("Pending tunnel build request ", it->first, " failed. Deleted"); LogPrint ("Pending tunnel build request ", it->first, " failed. Deleted");
delete tunnel; delete tunnel;
it = m_PendingTunnels.erase (it); it = pendingTunnels.erase (it);
break; break;
case eTunnelStateBuildReplyReceived: case eTunnelStateBuildReplyReceived:
// intermidiate state, will be either established of build failed // intermidiate state, will be either established of build failed
it++; it++;
break; break;
default: default:
it = m_PendingTunnels.erase (it); it = pendingTunnels.erase (it);
} }
} }
} }
@ -625,11 +646,21 @@ namespace tunnel
{ {
TTunnel * newTunnel = new TTunnel (config); TTunnel * newTunnel = new TTunnel (config);
uint32_t replyMsgID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); uint32_t replyMsgID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
m_PendingTunnels[replyMsgID] = newTunnel; AddPendingTunnel (replyMsgID, newTunnel);
newTunnel->Build (replyMsgID, outboundTunnel); newTunnel->Build (replyMsgID, outboundTunnel);
return newTunnel; return newTunnel;
} }
void Tunnels::AddPendingTunnel (uint32_t replyMsgID, InboundTunnel * tunnel)
{
m_PendingInboundTunnels[replyMsgID] = tunnel;
}
void Tunnels::AddPendingTunnel (uint32_t replyMsgID, OutboundTunnel * tunnel)
{
m_PendingOutboundTunnels[replyMsgID] = tunnel;
}
void Tunnels::AddOutboundTunnel (OutboundTunnel * newTunnel) void Tunnels::AddOutboundTunnel (OutboundTunnel * newTunnel)
{ {
std::unique_lock<std::mutex> l(m_OutboundTunnelsMutex); std::unique_lock<std::mutex> l(m_OutboundTunnelsMutex);

13
Tunnel.h

@ -121,7 +121,8 @@ namespace tunnel
void Stop (); void Stop ();
InboundTunnel * GetInboundTunnel (uint32_t tunnelID); InboundTunnel * GetInboundTunnel (uint32_t tunnelID);
Tunnel * GetPendingTunnel (uint32_t replyMsgID); InboundTunnel * GetPendingInboundTunnel (uint32_t replyMsgID);
OutboundTunnel * GetPendingOutboundTunnel (uint32_t replyMsgID);
InboundTunnel * GetNextInboundTunnel (); InboundTunnel * GetNextInboundTunnel ();
OutboundTunnel * GetNextOutboundTunnel (); OutboundTunnel * GetNextOutboundTunnel ();
std::shared_ptr<TunnelPool> GetExploratoryPool () const { return m_ExploratoryPool; }; std::shared_ptr<TunnelPool> GetExploratoryPool () const { return m_ExploratoryPool; };
@ -134,12 +135,17 @@ namespace tunnel
void PostTunnelData (const std::vector<I2NPMessage *>& msgs); void PostTunnelData (const std::vector<I2NPMessage *>& msgs);
template<class TTunnel> template<class TTunnel>
TTunnel * CreateTunnel (TunnelConfig * config, OutboundTunnel * outboundTunnel = 0); TTunnel * CreateTunnel (TunnelConfig * config, OutboundTunnel * outboundTunnel = 0);
void AddPendingTunnel (uint32_t replyMsgID, InboundTunnel * tunnel);
void AddPendingTunnel (uint32_t replyMsgID, OutboundTunnel * tunnel);
std::shared_ptr<TunnelPool> CreateTunnelPool (i2p::garlic::GarlicDestination * localDestination, int numInboundHops, int numOuboundHops); std::shared_ptr<TunnelPool> CreateTunnelPool (i2p::garlic::GarlicDestination * localDestination, int numInboundHops, int numOuboundHops);
void DeleteTunnelPool (std::shared_ptr<TunnelPool> pool); void DeleteTunnelPool (std::shared_ptr<TunnelPool> pool);
void StopTunnelPool (std::shared_ptr<TunnelPool> pool); void StopTunnelPool (std::shared_ptr<TunnelPool> pool);
private: private:
template<class TTunnel>
TTunnel * GetPendingTunnel (uint32_t replyMsgID, const std::map<uint32_t, TTunnel *>& pendingTunnels);
void HandleTunnelGatewayMsg (TunnelBase * tunnel, I2NPMessage * msg); void HandleTunnelGatewayMsg (TunnelBase * tunnel, I2NPMessage * msg);
void Run (); void Run ();
@ -148,6 +154,8 @@ namespace tunnel
void ManageInboundTunnels (); void ManageInboundTunnels ();
void ManageTransitTunnels (); void ManageTransitTunnels ();
void ManagePendingTunnels (); void ManagePendingTunnels ();
template<class PendingTunnels>
void ManagePendingTunnels (PendingTunnels& pendingTunnels);
void ManageTunnelPools (); void ManageTunnelPools ();
void CreateZeroHopsInboundTunnel (); void CreateZeroHopsInboundTunnel ();
@ -156,7 +164,8 @@ namespace tunnel
bool m_IsRunning; bool m_IsRunning;
std::thread * m_Thread; std::thread * m_Thread;
std::map<uint32_t, Tunnel *> m_PendingTunnels; // by replyMsgID std::map<uint32_t, InboundTunnel *> m_PendingInboundTunnels; // by replyMsgID
std::map<uint32_t, OutboundTunnel *> m_PendingOutboundTunnels; // by replyMsgID
std::mutex m_InboundTunnelsMutex; std::mutex m_InboundTunnelsMutex;
std::map<uint32_t, InboundTunnel *> m_InboundTunnels; std::map<uint32_t, InboundTunnel *> m_InboundTunnels;
std::mutex m_OutboundTunnelsMutex; std::mutex m_OutboundTunnelsMutex;

Loading…
Cancel
Save