diff --git a/Tunnel.cpp b/Tunnel.cpp index 6a6accd6..7d2e6735 100644 --- a/Tunnel.cpp +++ b/Tunnel.cpp @@ -13,19 +13,19 @@ #include "Tunnel.h" namespace i2p -{ +{ namespace tunnel -{ - +{ + Tunnel::Tunnel (std::shared_ptr config): TunnelBase (config->GetTunnelID (), config->GetNextTunnelID (), config->GetNextIdentHash ()), m_Config (config), m_Pool (nullptr), m_State (eTunnelStatePending), m_IsRecreated (false) { - } + } Tunnel::~Tunnel () { - } + } void Tunnel::Build (uint32_t replyMsgID, std::shared_ptr outboundTunnel) { @@ -33,7 +33,7 @@ namespace tunnel int numRecords = numHops <= STANDARD_NUM_RECORDS ? STANDARD_NUM_RECORDS : numHops; auto msg = NewI2NPShortMessage (); *msg->GetPayload () = numRecords; - msg->len += numRecords*TUNNEL_BUILD_RECORD_SIZE + 1; + msg->len += numRecords*TUNNEL_BUILD_RECORD_SIZE + 1; // shuffle records std::vector recordIndicies; @@ -56,13 +56,13 @@ namespace tunnel hop->recordIndex = idx; i++; hop = hop->next; - } - // fill up fake records with random data + } + // fill up fake records with random data for (int i = numHops; i < numRecords; i++) { int idx = recordIndicies[i]; RAND_bytes (records + idx*TUNNEL_BUILD_RECORD_SIZE, TUNNEL_BUILD_RECORD_SIZE); - } + } // decrypt real records i2p::crypto::CBCDecryption decryption; @@ -73,31 +73,31 @@ namespace tunnel // decrypt records after current hop TunnelHopConfig * hop1 = hop->next; while (hop1) - { + { decryption.SetIV (hop->replyIV); uint8_t * record = records + hop1->recordIndex*TUNNEL_BUILD_RECORD_SIZE; decryption.Decrypt(record, TUNNEL_BUILD_RECORD_SIZE, record); hop1 = hop1->next; - } + } hop = hop->prev; - } + } msg->FillI2NPMessageHeader (eI2NPVariableTunnelBuild); // send message if (outboundTunnel) - outboundTunnel->SendTunnelDataMsg (GetNextIdentHash (), 0, msg); + outboundTunnel->SendTunnelDataMsg (GetNextIdentHash (), 0, msg); else i2p::transport::transports.SendMessage (GetNextIdentHash (), msg); - } - + } + bool Tunnel::HandleTunnelBuildResponse (uint8_t * msg, size_t len) { LogPrint (eLogDebug, "Tunnel: TunnelBuildResponse ", (int)msg[0], " records."); - + i2p::crypto::CBCDecryption decryption; TunnelHopConfig * hop = m_Config->GetLastHop (); while (hop) - { + { decryption.SetKey (hop->replyKey); // decrypt records before and including current hop TunnelHopConfig * hop1 = hop; @@ -105,22 +105,22 @@ namespace tunnel { auto idx = hop1->recordIndex; if (idx >= 0 && idx < msg[0]) - { + { uint8_t * record = msg + 1 + idx*TUNNEL_BUILD_RECORD_SIZE; decryption.SetIV (hop->replyIV); decryption.Decrypt(record, TUNNEL_BUILD_RECORD_SIZE, record); - } + } else LogPrint (eLogWarning, "Tunnel: hop index ", idx, " is out of range"); hop1 = hop1->prev; - } + } hop = hop->prev; } bool established = true; hop = m_Config->GetFirstHop (); while (hop) - { + { const uint8_t * record = msg + 1 + hop->recordIndex*TUNNEL_BUILD_RECORD_SIZE; uint8_t ret = record[BUILD_RESPONSE_RECORD_RET_OFFSET]; LogPrint (eLogDebug, "Tunnel: Build response ret code=", (int)ret); @@ -143,12 +143,12 @@ namespace tunnel tunnelHop->decryption.SetKeys (hop->layerKey, hop->ivKey); m_Hops.push_back (std::unique_ptr(tunnelHop)); hop = hop->prev; - } + } m_Config = nullptr; - } + } if (established) m_State = eTunnelStateEstablished; return established; - } + } void Tunnel::EncryptTunnelMsg (std::shared_ptr in, std::shared_ptr out) { @@ -158,20 +158,20 @@ namespace tunnel { it->decryption.Decrypt (inPayload, outPayload); inPayload = outPayload; - } - } + } + } void Tunnel::SendTunnelDataMsg (std::shared_ptr msg) { LogPrint (eLogWarning, "Tunnel: Can't send I2NP messages without delivery instructions"); - } + } std::vector > Tunnel::GetPeers () const { auto peers = GetInvertedPeers (); - std::reverse (peers.begin (), peers.end ()); + std::reverse (peers.begin (), peers.end ()); return peers; - } + } std::vector > Tunnel::GetInvertedPeers () const { @@ -179,54 +179,54 @@ namespace tunnel std::vector > ret; for (auto& it: m_Hops) ret.push_back (it->ident); - return ret; - } + return ret; + } void Tunnel::PrintHops (std::stringstream& s) const { for (auto& it: m_Hops) - { - s << " ⇒ "; + { + s << " ⇒ "; s << i2p::data::GetIdentHashAbbreviation (it->ident->GetIdentHash ()); - } - } - + } + } + void InboundTunnel::HandleTunnelDataMsg (std::shared_ptr msg) { - if (IsFailed ()) SetState (eTunnelStateEstablished); // incoming messages means a tunnel is alive + if (IsFailed ()) SetState (eTunnelStateEstablished); // incoming messages means a tunnel is alive auto newMsg = CreateEmptyTunnelDataMsg (); EncryptTunnelMsg (msg, newMsg); newMsg->from = shared_from_this (); - m_Endpoint.HandleDecryptedTunnelDataMsg (newMsg); - } + m_Endpoint.HandleDecryptedTunnelDataMsg (newMsg); + } void InboundTunnel::Print (std::stringstream& s) const { PrintHops (s); - s << " ⇒ " << GetTunnelID () << ":me"; - } + s << " ⇒ " << GetTunnelID () << ":me"; + } ZeroHopsInboundTunnel::ZeroHopsInboundTunnel (): InboundTunnel (std::make_shared ()), m_NumReceivedBytes (0) { - } - + } + void ZeroHopsInboundTunnel::SendTunnelDataMsg (std::shared_ptr msg) { if (msg) - { + { m_NumReceivedBytes += msg->GetLength (); msg->from = shared_from_this (); HandleI2NPMessage (msg); - } - } + } + } void ZeroHopsInboundTunnel::Print (std::stringstream& s) const { - s << " ⇒ " << GetTunnelID () << ":me"; - } - + s << " ⇒ " << GetTunnelID () << ":me"; + } + void OutboundTunnel::SendTunnelDataMsg (const uint8_t * gwHash, uint32_t gwTunnel, std::shared_ptr msg) { TunnelMessageBlock block; @@ -234,28 +234,28 @@ namespace tunnel { block.hash = gwHash; if (gwTunnel) - { + { block.deliveryType = eDeliveryTypeTunnel; block.tunnelID = gwTunnel; - } + } else block.deliveryType = eDeliveryTypeRouter; - } - else + } + else block.deliveryType = eDeliveryTypeLocal; block.data = msg; - + SendTunnelDataMsg ({block}); } - + void OutboundTunnel::SendTunnelDataMsg (const std::vector& msgs) { std::unique_lock l(m_SendMutex); for (auto& it : msgs) m_Gateway.PutTunnelDataMsg (it); m_Gateway.SendBuffer (); - } - + } + void OutboundTunnel::HandleTunnelDataMsg (std::shared_ptr tunnelMsg) { LogPrint (eLogError, "Tunnel: incoming message for outbound tunnel ", GetTunnelID ()); @@ -265,9 +265,9 @@ namespace tunnel { s << GetTunnelID () << ":me"; PrintHops (s); - s << " ⇒ "; - } - + s << " ⇒ "; + } + ZeroHopsOutboundTunnel::ZeroHopsOutboundTunnel (): OutboundTunnel (std::make_shared ()), m_NumSentBytes (0) @@ -286,30 +286,30 @@ namespace tunnel case eDeliveryTypeTunnel: i2p::transport::transports.SendMessage (msg.hash, i2p::CreateTunnelGatewayMsg (msg.tunnelID, msg.data)); break; - case eDeliveryTypeRouter: + case eDeliveryTypeRouter: i2p::transport::transports.SendMessage (msg.hash, msg.data); break; default: LogPrint (eLogError, "Tunnel: Unknown delivery type ", (int)msg.deliveryType); - } + } } } void ZeroHopsOutboundTunnel::Print (std::stringstream& s) const { - s << GetTunnelID () << ":me ⇒ "; + s << GetTunnelID () << ":me ⇒ "; } Tunnels tunnels; - + Tunnels::Tunnels (): m_IsRunning (false), m_Thread (nullptr), m_NumSuccesiveTunnelCreations (0), m_NumFailedTunnelCreations (0) { } - - Tunnels::~Tunnels () + + Tunnels::~Tunnels () { - } + } std::shared_ptr Tunnels::GetTunnel (uint32_t tunnelID) { @@ -317,29 +317,29 @@ namespace tunnel if (it != m_Tunnels.end ()) return it->second; return nullptr; - } - + } + std::shared_ptr Tunnels::GetPendingInboundTunnel (uint32_t replyMsgID) { - return GetPendingTunnel (replyMsgID, m_PendingInboundTunnels); + return GetPendingTunnel (replyMsgID, m_PendingInboundTunnels); } std::shared_ptr Tunnels::GetPendingOutboundTunnel (uint32_t replyMsgID) { - return GetPendingTunnel (replyMsgID, m_PendingOutboundTunnels); - } + return GetPendingTunnel (replyMsgID, m_PendingOutboundTunnels); + } - template + template std::shared_ptr Tunnels::GetPendingTunnel (uint32_t replyMsgID, const std::map >& pendingTunnels) { auto it = pendingTunnels.find(replyMsgID); if (it != pendingTunnels.end () && it->second->GetState () == eTunnelStatePending) - { - it->second->SetState (eTunnelStateBuildReplyReceived); + { + it->second->SetState (eTunnelStateBuildReplyReceived); return it->second; } return nullptr; - } + } std::shared_ptr Tunnels::GetNextInboundTunnel () { @@ -353,26 +353,26 @@ namespace tunnel tunnel = it; minReceived = it->GetNumReceivedBytes (); } - } + } return tunnel; } - + std::shared_ptr Tunnels::GetNextOutboundTunnel () { if (m_OutboundTunnels.empty ()) return nullptr; uint32_t ind = rand () % m_OutboundTunnels.size (), i = 0; std::shared_ptr tunnel; for (const auto& it: m_OutboundTunnels) - { + { if (it->IsEstablished ()) { tunnel = it; i++; } if (i > ind && tunnel) break; - } + } return tunnel; - } + } std::shared_ptr Tunnels::CreateTunnelPool (int numInboundHops, int numOutboundHops, int numInboundTunnels, int numOutboundTunnels) @@ -381,19 +381,19 @@ namespace tunnel std::unique_lock l(m_PoolsMutex); m_Pools.push_back (pool); return pool; - } + } void Tunnels::DeleteTunnelPool (std::shared_ptr pool) { if (pool) - { + { StopTunnelPool (pool); { std::unique_lock l(m_PoolsMutex); m_Pools.remove (pool); - } - } - } + } + } + } void Tunnels::StopTunnelPool (std::shared_ptr pool) { @@ -401,47 +401,47 @@ namespace tunnel { pool->SetActive (false); pool->DetachTunnels (); - } - } - + } + } + void Tunnels::AddTransitTunnel (std::shared_ptr tunnel) { if (m_Tunnels.emplace (tunnel->GetTunnelID (), tunnel).second) m_TransitTunnels.push_back (tunnel); else LogPrint (eLogError, "Tunnel: tunnel with id ", tunnel->GetTunnelID (), " already exists"); - } + } void Tunnels::Start () { m_IsRunning = true; m_Thread = new std::thread (std::bind (&Tunnels::Run, this)); } - + void Tunnels::Stop () { m_IsRunning = false; m_Queue.WakeUp (); if (m_Thread) - { + { m_Thread->join (); delete m_Thread; m_Thread = 0; - } - } + } + } void Tunnels::Run () { std::this_thread::sleep_for (std::chrono::seconds(1)); // wait for other parts are ready - + uint64_t lastTs = 0; while (m_IsRunning) { try - { + { auto msg = m_Queue.GetNextWithTimeout (1000); // 1 sec if (msg) - { + { uint32_t prevTunnelID = 0, tunnelID = 0; std::shared_ptr prevTunnel; do @@ -449,16 +449,16 @@ namespace tunnel std::shared_ptr tunnel; uint8_t typeID = msg->GetTypeID (); switch (typeID) - { + { case eI2NPTunnelData: case eI2NPTunnelGateway: - { + { tunnelID = bufbe32toh (msg->GetPayload ()); if (tunnelID == prevTunnelID) tunnel = prevTunnel; else if (prevTunnel) prevTunnel->FlushTunnelDataMsgs (); - + if (!tunnel) tunnel = GetTunnel (tunnelID); if (tunnel) @@ -472,17 +472,17 @@ namespace tunnel LogPrint (eLogWarning, "Tunnel: tunnel not found, tunnelID=", tunnelID, " previousTunnelID=", prevTunnelID, " type=", (int)typeID); break; - } - case eI2NPVariableTunnelBuild: + } + case eI2NPVariableTunnelBuild: case eI2NPVariableTunnelBuildReply: case eI2NPTunnelBuild: - case eI2NPTunnelBuildReply: + case eI2NPTunnelBuildReply: HandleI2NPMessage (msg->GetBuffer (), msg->GetLength ()); - break; + break; default: LogPrint (eLogWarning, "Tunnel: unexpected messsage type ", (int) typeID); } - + msg = m_Queue.Get (); if (msg) { @@ -493,8 +493,8 @@ namespace tunnel tunnel->FlushTunnelDataMsgs (); } while (msg); - } - + } + uint64_t ts = i2p::util::GetSecondsSinceEpoch (); if (ts - lastTs >= 15) // manage tunnels every 15 seconds { @@ -505,9 +505,9 @@ namespace tunnel catch (std::exception& ex) { LogPrint (eLogError, "Tunnel: runtime exception: ", ex.what ()); - } - } - } + } + } + } void Tunnels::HandleTunnelGatewayMsg (std::shared_ptr tunnel, std::shared_ptr msg) { @@ -528,7 +528,7 @@ namespace tunnel msg->len = msg->offset + len; auto typeID = msg->GetTypeID (); LogPrint (eLogDebug, "Tunnel: gateway of ", (int) len, " bytes for tunnel ", tunnel->GetTunnelID (), ", msg type ", (int)typeID); - + if (IsRouterInfoMsg (msg) || typeID == eI2NPDatabaseSearchReply) // transit DatabaseStore my contain new/updated RI // or DatabaseSearchReply with new routers @@ -543,7 +543,7 @@ namespace tunnel ManageOutboundTunnels (); ManageTransitTunnels (); ManageTunnelPools (); - } + } void Tunnels::ManagePendingTunnels () { @@ -557,7 +557,7 @@ namespace tunnel // check pending tunnel. delete failed or timeout uint64_t ts = i2p::util::GetSecondsSinceEpoch (); for (auto it = pendingTunnels.begin (); it != pendingTunnels.end ();) - { + { auto tunnel = it->second; switch (tunnel->GetState ()) { @@ -579,8 +579,8 @@ namespace tunnel profile->TunnelNonReplied (); } hop = hop->next; - } - } + } + } // delete it = pendingTunnels.erase (it); m_NumFailedTunnelCreations++; @@ -596,13 +596,13 @@ namespace tunnel case eTunnelStateBuildReplyReceived: // intermediate state, will be either established of build failed ++it; - break; + break; default: // success it = pendingTunnels.erase (it); m_NumSuccesiveTunnelCreations++; - } - } + } + } } void Tunnels::ManageOutboundTunnels () @@ -620,26 +620,26 @@ namespace tunnel pool->TunnelExpired (tunnel); // we don't have outbound tunnels in m_Tunnels it = m_OutboundTunnels.erase (it); - } + } else { if (tunnel->IsEstablished ()) - { + { if (!tunnel->IsRecreated () && ts + TUNNEL_RECREATION_THRESHOLD > tunnel->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT) { - tunnel->SetIsRecreated (); + tunnel->SetIsRecreated (); auto pool = tunnel->GetTunnelPool (); if (pool) pool->RecreateOutboundTunnel (tunnel); } if (ts + TUNNEL_EXPIRATION_THRESHOLD > tunnel->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT) tunnel->SetState (eTunnelStateExpiring); - } + } ++it; } } - } - + } + if (m_OutboundTunnels.size () < 5) { // trying to create one more oubound tunnel @@ -648,12 +648,12 @@ namespace tunnel if (!inboundTunnel || !router) return; LogPrint (eLogDebug, "Tunnel: creating one hop outbound tunnel"); CreateTunnel ( - std::make_shared (std::vector > { router->GetRouterIdentity () }, - inboundTunnel->GetNextTunnelID (), inboundTunnel->GetNextIdentHash ()) - ); + std::make_shared (std::vector > { router->GetRouterIdentity () }, + inboundTunnel->GetNextTunnelID (), inboundTunnel->GetNextIdentHash ()) + ); } } - + void Tunnels::ManageInboundTunnels () { uint64_t ts = i2p::util::GetSecondsSinceEpoch (); @@ -669,26 +669,26 @@ namespace tunnel pool->TunnelExpired (tunnel); m_Tunnels.erase (tunnel->GetTunnelID ()); it = m_InboundTunnels.erase (it); - } + } else { if (tunnel->IsEstablished ()) - { + { if (!tunnel->IsRecreated () && ts + TUNNEL_RECREATION_THRESHOLD > tunnel->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT) { - tunnel->SetIsRecreated (); + tunnel->SetIsRecreated (); auto pool = tunnel->GetTunnelPool (); if (pool) pool->RecreateInboundTunnel (tunnel); } - + if (ts + TUNNEL_EXPIRATION_THRESHOLD > tunnel->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT) tunnel->SetState (eTunnelStateExpiring); - } + } it++; } } - } + } if (m_InboundTunnels.empty ()) { @@ -702,10 +702,10 @@ namespace tunnel } return; } - + if (m_OutboundTunnels.empty () || m_InboundTunnels.size () < 5) { - // trying to create one more inbound tunnel + // trying to create one more inbound tunnel auto router = i2p::data::netdb.GetRandomRouter (); if (!router) { LogPrint (eLogWarning, "Tunnel: can't find any router, skip creating tunnel"); @@ -714,9 +714,9 @@ namespace tunnel LogPrint (eLogDebug, "Tunnel: creating one hop inbound tunnel"); CreateTunnel ( std::make_shared (std::vector > { router->GetRouterIdentity () }) - ); + ); } - } + } void Tunnels::ManageTransitTunnels () { @@ -729,35 +729,35 @@ namespace tunnel LogPrint (eLogDebug, "Tunnel: Transit tunnel with id ", tunnel->GetTunnelID (), " expired"); m_Tunnels.erase (tunnel->GetTunnelID ()); it = m_TransitTunnels.erase (it); - } + } else it++; } - } + } void Tunnels::ManageTunnelPools () { std::unique_lock l(m_PoolsMutex); for (auto& pool : m_Pools) - { + { if (pool && pool->IsActive ()) - { + { pool->CreateTunnels (); pool->TestTunnels (); - } + } } - } - + } + void Tunnels::PostTunnelData (std::shared_ptr msg) { - if (msg) m_Queue.Put (msg); - } + if (msg) m_Queue.Put (msg); + } void Tunnels::PostTunnelData (const std::vector >& msgs) { m_Queue.Put (msgs); - } - + } + template std::shared_ptr Tunnels::CreateTunnel (std::shared_ptr config, std::shared_ptr outboundTunnel) { @@ -767,7 +767,7 @@ namespace tunnel AddPendingTunnel (replyMsgID, newTunnel); newTunnel->Build (replyMsgID, outboundTunnel); return newTunnel; - } + } std::shared_ptr Tunnels::CreateInboundTunnel (std::shared_ptr config, std::shared_ptr outboundTunnel) { @@ -804,20 +804,20 @@ namespace tunnel pool->TunnelCreated (newTunnel); else newTunnel->SetTunnelPool (nullptr); - } + } void Tunnels::AddInboundTunnel (std::shared_ptr newTunnel) { if (m_Tunnels.emplace (newTunnel->GetTunnelID (), newTunnel).second) - { + { m_InboundTunnels.push_back (newTunnel); auto pool = newTunnel->GetTunnelPool (); if (!pool) - { + { // build symmetric outbound tunnel CreateTunnel (std::make_shared(newTunnel->GetInvertedPeers (), newTunnel->GetNextTunnelID (), newTunnel->GetNextIdentHash ()), - GetNextOutboundTunnel ()); + GetNextOutboundTunnel ()); } else { @@ -829,9 +829,9 @@ namespace tunnel } else LogPrint (eLogError, "Tunnel: tunnel with id ", newTunnel->GetTunnelID (), " already exists"); - } + } + - std::shared_ptr Tunnels::CreateZeroHopsInboundTunnel () { auto inboundTunnel = std::make_shared (); @@ -839,7 +839,7 @@ namespace tunnel m_InboundTunnels.push_back (inboundTunnel); m_Tunnels[inboundTunnel->GetTunnelID ()] = inboundTunnel; return inboundTunnel; - } + } std::shared_ptr Tunnels::CreateZeroHopsOutboundTunnel () { @@ -859,7 +859,7 @@ namespace tunnel { int t = it->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT - ts; if (t > timeout) timeout = t; - } + } return timeout; } @@ -882,4 +882,3 @@ namespace tunnel } } } -