Browse Source

handle tunnel build messages in tunnels thread

pull/151/head
orignal 10 years ago
parent
commit
4b094b2156
  1. 7
      I2NPProtocol.cpp
  2. 86
      Tunnel.cpp
  3. 2
      Tunnel.h

7
I2NPProtocol.cpp

@ -544,6 +544,13 @@ namespace i2p
else else
i2p::context.ProcessDeliveryStatusMessage (msg); i2p::context.ProcessDeliveryStatusMessage (msg);
break; break;
case eI2NPVariableTunnelBuild:
case eI2NPVariableTunnelBuildReply:
case eI2NPTunnelBuild:
case eI2NPTunnelBuildReply:
// forward to tunnel thread
i2p::tunnel::tunnels.PostTunnelData (msg);
break;
default: default:
HandleI2NPMessage (msg->GetBuffer (), msg->GetLength ()); HandleI2NPMessage (msg->GetBuffer (), msg->GetLength ());
DeleteI2NPMessage (msg); DeleteI2NPMessage (msg);

86
Tunnel.cpp

@ -273,7 +273,6 @@ namespace tunnel
{ {
InboundTunnel * tunnel = nullptr; InboundTunnel * tunnel = nullptr;
size_t minReceived = 0; size_t minReceived = 0;
std::unique_lock<std::mutex> l(m_InboundTunnelsMutex);
for (auto it : m_InboundTunnels) for (auto it : m_InboundTunnels)
{ {
if (!it.second->IsEstablished ()) continue; if (!it.second->IsEstablished ()) continue;
@ -291,7 +290,6 @@ namespace tunnel
CryptoPP::RandomNumberGenerator& rnd = i2p::context.GetRandomNumberGenerator (); CryptoPP::RandomNumberGenerator& rnd = i2p::context.GetRandomNumberGenerator ();
uint32_t ind = rnd.GenerateWord32 (0, m_OutboundTunnels.size () - 1), i = 0; uint32_t ind = rnd.GenerateWord32 (0, m_OutboundTunnels.size () - 1), i = 0;
OutboundTunnel * tunnel = nullptr; OutboundTunnel * tunnel = nullptr;
std::unique_lock<std::mutex> l(m_OutboundTunnelsMutex);
for (auto it: m_OutboundTunnels) for (auto it: m_OutboundTunnels)
{ {
if (it->IsEstablished ()) if (it->IsEstablished ())
@ -369,39 +367,59 @@ namespace tunnel
I2NPMessage * msg = m_Queue.GetNextWithTimeout (1000); // 1 sec I2NPMessage * msg = m_Queue.GetNextWithTimeout (1000); // 1 sec
if (msg) if (msg)
{ {
uint8_t typeID = msg->GetTypeID (); uint32_t prevTunnelID = 0, tunnelID = 0;
uint32_t prevTunnelID = 0; TunnelBase * prevTunnel = nullptr, * tunnel = nullptr;
TunnelBase * prevTunnel = nullptr;
do do
{ {
uint32_t tunnelID = bufbe32toh (msg->GetPayload ()); uint8_t typeID = msg->GetTypeID ();
TunnelBase * tunnel = nullptr; switch (typeID)
if (tunnelID == prevTunnelID) {
tunnel = prevTunnel; case eI2NPTunnelData:
else if (prevTunnel) case eI2NPTunnelGateway:
prevTunnel->FlushTunnelDataMsgs (); {
tunnelID = bufbe32toh (msg->GetPayload ());
if (tunnelID == prevTunnelID)
tunnel = prevTunnel;
else if (prevTunnel)
prevTunnel->FlushTunnelDataMsgs ();
if (!tunnel && typeID == eI2NPTunnelData) if (!tunnel && typeID == eI2NPTunnelData)
tunnel = GetInboundTunnel (tunnelID); tunnel = GetInboundTunnel (tunnelID);
if (!tunnel) if (!tunnel)
tunnel = GetTransitTunnel (tunnelID); tunnel = GetTransitTunnel (tunnelID);
if (tunnel) if (tunnel)
{ {
if (typeID == eI2NPTunnelData) if (typeID == eI2NPTunnelData)
tunnel->HandleTunnelDataMsg (msg); tunnel->HandleTunnelDataMsg (msg);
else // tunnel gateway assumed else // tunnel gateway assumed
HandleTunnelGatewayMsg (tunnel, msg); HandleTunnelGatewayMsg (tunnel, msg);
}
else
{
LogPrint (eLogWarning, "Tunnel ", tunnelID, " not found");
DeleteI2NPMessage (msg);
}
break;
}
case eI2NPVariableTunnelBuild:
case eI2NPVariableTunnelBuildReply:
case eI2NPTunnelBuild:
case eI2NPTunnelBuildReply:
{
HandleI2NPMessage (msg->GetBuffer (), msg->GetLength ());
DeleteI2NPMessage (msg);
break;
}
default:
{
LogPrint (eLogError, "Unexpected messsage type ", (int)typeID);
DeleteI2NPMessage (msg);
}
} }
else
{
LogPrint ("Tunnel ", tunnelID, " not found");
DeleteI2NPMessage (msg);
}
msg = m_Queue.Get (); msg = m_Queue.Get ();
if (msg) if (msg)
{ {
typeID = msg->GetTypeID ();
prevTunnelID = tunnelID; prevTunnelID = tunnelID;
prevTunnel = tunnel; prevTunnel = tunnel;
} }
@ -517,10 +535,7 @@ namespace tunnel
if (pool) if (pool)
pool->TunnelExpired (tunnel); pool->TunnelExpired (tunnel);
} }
{ it = m_OutboundTunnels.erase (it);
std::unique_lock<std::mutex> l(m_OutboundTunnelsMutex);
it = m_OutboundTunnels.erase (it);
}
delete tunnel; delete tunnel;
} }
else else
@ -562,10 +577,7 @@ namespace tunnel
if (pool) if (pool)
pool->TunnelExpired (tunnel); pool->TunnelExpired (tunnel);
} }
{ it = m_InboundTunnels.erase (it);
std::unique_lock<std::mutex> l(m_InboundTunnelsMutex);
it = m_InboundTunnels.erase (it);
}
delete tunnel; delete tunnel;
} }
else else
@ -664,7 +676,6 @@ namespace tunnel
void Tunnels::AddOutboundTunnel (OutboundTunnel * newTunnel) void Tunnels::AddOutboundTunnel (OutboundTunnel * newTunnel)
{ {
std::unique_lock<std::mutex> l(m_OutboundTunnelsMutex);
m_OutboundTunnels.push_back (newTunnel); m_OutboundTunnels.push_back (newTunnel);
auto pool = newTunnel->GetTunnelPool (); auto pool = newTunnel->GetTunnelPool ();
if (pool && pool->IsActive ()) if (pool && pool->IsActive ())
@ -675,7 +686,6 @@ namespace tunnel
void Tunnels::AddInboundTunnel (InboundTunnel * newTunnel) void Tunnels::AddInboundTunnel (InboundTunnel * newTunnel)
{ {
std::unique_lock<std::mutex> l(m_InboundTunnelsMutex);
m_InboundTunnels[newTunnel->GetTunnelID ()] = newTunnel; m_InboundTunnels[newTunnel->GetTunnelID ()] = newTunnel;
auto pool = newTunnel->GetTunnelPool (); auto pool = newTunnel->GetTunnelPool ();
if (!pool) if (!pool)

2
Tunnel.h

@ -166,9 +166,7 @@ namespace tunnel
std::thread * m_Thread; std::thread * m_Thread;
std::map<uint32_t, InboundTunnel *> m_PendingInboundTunnels; // by replyMsgID std::map<uint32_t, InboundTunnel *> m_PendingInboundTunnels; // by replyMsgID
std::map<uint32_t, OutboundTunnel *> m_PendingOutboundTunnels; // by replyMsgID std::map<uint32_t, OutboundTunnel *> m_PendingOutboundTunnels; // by replyMsgID
std::mutex m_InboundTunnelsMutex;
std::map<uint32_t, InboundTunnel *> m_InboundTunnels; std::map<uint32_t, InboundTunnel *> m_InboundTunnels;
std::mutex m_OutboundTunnelsMutex;
std::list<OutboundTunnel *> m_OutboundTunnels; std::list<OutboundTunnel *> m_OutboundTunnels;
std::mutex m_TransitTunnelsMutex; std::mutex m_TransitTunnelsMutex;
std::map<uint32_t, TransitTunnel *> m_TransitTunnels; std::map<uint32_t, TransitTunnel *> m_TransitTunnels;

Loading…
Cancel
Save