diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index 64e5d085..3d5893c7 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -367,9 +367,11 @@ namespace client HandleDataMessage (payload, len); break; case eI2NPDeliveryStatus: - // try tunnel test first - if (!m_Pool || !m_Pool->ProcessDeliveryStatus (bufbe32toh (payload + DELIVERY_STATUS_MSGID_OFFSET), bufbe64toh (payload + DELIVERY_STATUS_TIMESTAMP_OFFSET))) - HandleDeliveryStatusMessage (bufbe32toh (payload + DELIVERY_STATUS_MSGID_OFFSET)); + HandleDeliveryStatusMessage (bufbe32toh (payload + DELIVERY_STATUS_MSGID_OFFSET)); + break; + case eI2NPTunnelTest: + if (m_Pool) + m_Pool->ProcessTunnelTest (bufbe32toh (payload + TUNNEL_TEST_MSGID_OFFSET), bufbe64toh (payload + TUNNEL_TEST_TIMESTAMP_OFFSET)); break; case eI2NPDatabaseStore: HandleDatabaseStoreMessage (payload, len); @@ -408,6 +410,7 @@ namespace client } i2p::data::IdentHash key (buf + DATABASE_STORE_KEY_OFFSET); std::shared_ptr leaseSet; + std::shared_ptr request; switch (buf[DATABASE_STORE_TYPE_OFFSET]) { case i2p::data::NETDB_STORE_TYPE_LEASESET: // 1 @@ -463,34 +466,59 @@ namespace client case i2p::data::NETDB_STORE_TYPE_ENCRYPTED_LEASESET2: // 5 { auto it2 = m_LeaseSetRequests.find (key); - if (it2 != m_LeaseSetRequests.end () && it2->second->requestedBlindedKey) - { - auto ls2 = std::make_shared (buf + offset, len - offset, - it2->second->requestedBlindedKey, m_LeaseSetPrivKey ? ((const uint8_t *)*m_LeaseSetPrivKey) : nullptr , GetPreferredCryptoType ()); - if (ls2->IsValid () && !ls2->IsExpired ()) + if (it2 != m_LeaseSetRequests.end ()) + { + request = it2->second; + m_LeaseSetRequests.erase (it2); + if (request->requestedBlindedKey) { - leaseSet = ls2; - std::lock_guard lock(m_RemoteLeaseSetsMutex); - m_RemoteLeaseSets[ls2->GetIdentHash ()] = ls2; // ident is not key - m_RemoteLeaseSets[key] = ls2; // also store as key for next lookup + auto ls2 = std::make_shared (buf + offset, len - offset, + request->requestedBlindedKey, m_LeaseSetPrivKey ? ((const uint8_t *)*m_LeaseSetPrivKey) : nullptr , GetPreferredCryptoType ()); + if (ls2->IsValid () && !ls2->IsExpired ()) + { + leaseSet = ls2; + std::lock_guard lock(m_RemoteLeaseSetsMutex); + m_RemoteLeaseSets[ls2->GetIdentHash ()] = ls2; // ident is not key + m_RemoteLeaseSets[key] = ls2; // also store as key for next lookup + } + else + LogPrint (eLogError, "Destination: New remote encrypted LeaseSet2 failed"); } else - LogPrint (eLogError, "Destination: New remote encrypted LeaseSet2 failed"); + { + // publishing verification doesn't have requestedBlindedKey + auto localLeaseSet = GetLeaseSetMt (); + if (localLeaseSet->GetStoreHash () == key) + { + auto ls = std::make_shared (i2p::data::NETDB_STORE_TYPE_ENCRYPTED_LEASESET2, + localLeaseSet->GetBuffer (), localLeaseSet->GetBufferLen (), false); + leaseSet = ls; + } + else + LogPrint (eLogWarning, "Destination: Encrypted LeaseSet2 received for request without blinded key"); + } } else - LogPrint (eLogInfo, "Destination: Couldn't find request for encrypted LeaseSet2"); + LogPrint (eLogWarning, "Destination: Couldn't find request for encrypted LeaseSet2"); break; } default: LogPrint (eLogError, "Destination: Unexpected client's DatabaseStore type ", buf[DATABASE_STORE_TYPE_OFFSET], ", dropped"); } - auto it1 = m_LeaseSetRequests.find (key); - if (it1 != m_LeaseSetRequests.end ()) + if (!request) + { + auto it1 = m_LeaseSetRequests.find (key); + if (it1 != m_LeaseSetRequests.end ()) + { + request = it1->second; + m_LeaseSetRequests.erase (it1); + } + } + if (request) { - it1->second->requestTimeoutTimer.cancel (); - if (it1->second) it1->second->Complete (leaseSet); - m_LeaseSetRequests.erase (it1); + request->requestTimeoutTimer.cancel (); + request->Complete (leaseSet); } } @@ -584,12 +612,7 @@ namespace client shared_from_this (), std::placeholders::_1)); return; } - if (!m_Pool->GetInboundTunnels ().size () || !m_Pool->GetOutboundTunnels ().size ()) - { - LogPrint (eLogError, "Destination: Can't publish LeaseSet. Destination is not ready"); - return; - } - auto floodfill = i2p::data::netdb.GetClosestFloodfill (leaseSet->GetIdentHash (), m_ExcludedFloodfills); + auto floodfill = i2p::data::netdb.GetClosestFloodfill (leaseSet->GetStoreHash (), m_ExcludedFloodfills); if (!floodfill) { LogPrint (eLogError, "Destination: Can't publish LeaseSet, no more floodfills found"); @@ -600,26 +623,39 @@ namespace client auto inbound = m_Pool->GetNextInboundTunnel (nullptr, floodfill->GetCompatibleTransports (true)); if (!outbound || !inbound) { - LogPrint (eLogInfo, "Destination: No compatible tunnels with ", floodfill->GetIdentHash ().ToBase64 (), ". Trying another floodfill"); - m_ExcludedFloodfills.insert (floodfill->GetIdentHash ()); - floodfill = i2p::data::netdb.GetClosestFloodfill (leaseSet->GetIdentHash (), m_ExcludedFloodfills); - if (floodfill) - { - outbound = m_Pool->GetNextOutboundTunnel (nullptr, floodfill->GetCompatibleTransports (false)); - if (outbound) + if (!m_Pool->GetInboundTunnels ().empty () && !m_Pool->GetOutboundTunnels ().empty ()) + { + LogPrint (eLogInfo, "Destination: No compatible tunnels with ", floodfill->GetIdentHash ().ToBase64 (), ". Trying another floodfill"); + m_ExcludedFloodfills.insert (floodfill->GetIdentHash ()); + floodfill = i2p::data::netdb.GetClosestFloodfill (leaseSet->GetStoreHash (), m_ExcludedFloodfills); + if (floodfill) { - inbound = m_Pool->GetNextInboundTunnel (nullptr, floodfill->GetCompatibleTransports (true)); - if (!inbound) - LogPrint (eLogError, "Destination: Can't publish LeaseSet. No inbound tunnels"); + outbound = m_Pool->GetNextOutboundTunnel (nullptr, floodfill->GetCompatibleTransports (false)); + if (outbound) + { + inbound = m_Pool->GetNextInboundTunnel (nullptr, floodfill->GetCompatibleTransports (true)); + if (!inbound) + LogPrint (eLogError, "Destination: Can't publish LeaseSet. No inbound tunnels"); + } + else + LogPrint (eLogError, "Destination: Can't publish LeaseSet. No outbound tunnels"); } else - LogPrint (eLogError, "Destination: Can't publish LeaseSet. No outbound tunnels"); - } + LogPrint (eLogError, "Destination: Can't publish LeaseSet, no more floodfills found"); + } else - LogPrint (eLogError, "Destination: Can't publish LeaseSet, no more floodfills found"); + LogPrint (eLogDebug, "Destination: No tunnels in pool"); + if (!floodfill || !outbound || !inbound) { + // we can't publish now m_ExcludedFloodfills.clear (); + m_PublishReplyToken = 1; // dummy non-zero value + // try again after a while + LogPrint (eLogInfo, "Destination: Can't publish LeasetSet because destination is not ready. Try publishing again after ", PUBLISH_CONFIRMATION_TIMEOUT, " seconds"); + m_PublishConfirmationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_CONFIRMATION_TIMEOUT)); + m_PublishConfirmationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishConfirmationTimer, + shared_from_this (), std::placeholders::_1)); return; } } diff --git a/libi2pd/Garlic.cpp b/libi2pd/Garlic.cpp index ba35995f..dd66af60 100644 --- a/libi2pd/Garlic.cpp +++ b/libi2pd/Garlic.cpp @@ -431,8 +431,7 @@ namespace garlic } GarlicDestination::GarlicDestination (): m_NumTags (32), // 32 tags by default - m_PayloadBuffer (nullptr), m_NumRatchetInboundTags (0), // 0 means standard - m_NumUsedECIESx25519Tags (0) + m_PayloadBuffer (nullptr), m_NumRatchetInboundTags (0) // 0 means standard { } @@ -589,18 +588,11 @@ namespace garlic auto it = m_ECIESx25519Tags.find (tag); if (it != m_ECIESx25519Tags.end ()) { - if (!it->second.tagset) return true; // duplicate - if (it->second.tagset->HandleNextMessage (buf, len, it->second.index)) - { + if (it->second.tagset && it->second.tagset->HandleNextMessage (buf, len, it->second.index)) m_LastTagset = it->second.tagset; - it->second.tagset = nullptr; // mark as used - } else - { LogPrint (eLogError, "Garlic: Can't handle ECIES-X25519-AEAD-Ratchet message"); - m_ECIESx25519Tags.erase (it); - } - m_NumUsedECIESx25519Tags++; + m_ECIESx25519Tags.erase (it); return true; } return false; @@ -885,41 +877,17 @@ namespace garlic } numExpiredTags = 0; - if (m_NumUsedECIESx25519Tags > ECIESX25519_TAGSET_MAX_NUM_TAGS) // too many used tags + for (auto it = m_ECIESx25519Tags.begin (); it != m_ECIESx25519Tags.end ();) { - std::unordered_map oldTags; - std::swap (m_ECIESx25519Tags, oldTags); // re-create - for (auto& it: oldTags) - if (it.second.tagset) - { - if (it.second.tagset->IsExpired (ts) || it.second.tagset->IsIndexExpired (it.second.index)) - { - it.second.tagset->DeleteSymmKey (it.second.index); - numExpiredTags++; - } - else if (it.second.tagset->IsSessionTerminated()) - numExpiredTags++; - else - m_ECIESx25519Tags.emplace (it); - } - } - else - { - for (auto it = m_ECIESx25519Tags.begin (); it != m_ECIESx25519Tags.end ();) + if (it->second.tagset->IsExpired (ts) || it->second.tagset->IsIndexExpired (it->second.index)) { - if (!it->second.tagset) - { - // delete used tag - it = m_ECIESx25519Tags.erase (it); - continue; - } - if (it->second.tagset->IsExpired (ts) || it->second.tagset->IsIndexExpired (it->second.index)) - { - it->second.tagset->DeleteSymmKey (it->second.index); - it = m_ECIESx25519Tags.erase (it); - numExpiredTags++; - } - else if (it->second.tagset->IsSessionTerminated()) + it->second.tagset->DeleteSymmKey (it->second.index); + it = m_ECIESx25519Tags.erase (it); + numExpiredTags++; + } + else + { + if (it->second.tagset->IsSessionTerminated ()) { it = m_ECIESx25519Tags.erase (it); numExpiredTags++; @@ -927,8 +895,7 @@ namespace garlic else ++it; } - } - m_NumUsedECIESx25519Tags = 0; + } if (numExpiredTags > 0) LogPrint (eLogDebug, "Garlic: ", numExpiredTags, " ECIESx25519 tags expired for ", GetIdentHash().ToBase64 ()); if (m_LastTagset && m_LastTagset->IsExpired (ts)) diff --git a/libi2pd/Garlic.h b/libi2pd/Garlic.h index 0386752b..83e3b050 100644 --- a/libi2pd/Garlic.h +++ b/libi2pd/Garlic.h @@ -291,7 +291,6 @@ namespace garlic std::unordered_map, std::hash > > m_Tags; std::unordered_map m_ECIESx25519Tags; // session tag -> session ReceiveRatchetTagSetPtr m_LastTagset; // tagset last message came for - int m_NumUsedECIESx25519Tags; // DeliveryStatus std::mutex m_DeliveryStatusSessionsMutex; std::unordered_map m_DeliveryStatusSessions; // msgID -> session @@ -300,7 +299,7 @@ namespace garlic // for HTTP only size_t GetNumIncomingTags () const { return m_Tags.size (); } - size_t GetNumIncomingECIESx25519Tags () const { return m_ECIESx25519Tags.size () - m_NumUsedECIESx25519Tags; } + size_t GetNumIncomingECIESx25519Tags () const { return m_ECIESx25519Tags.size (); } const decltype(m_Sessions)& GetSessions () const { return m_Sessions; }; const decltype(m_ECIESx25519Sessions)& GetECIESx25519Sessions () const { return m_ECIESx25519Sessions; } }; diff --git a/libi2pd/I2NPProtocol.cpp b/libi2pd/I2NPProtocol.cpp index da6da638..bea3f93f 100644 --- a/libi2pd/I2NPProtocol.cpp +++ b/libi2pd/I2NPProtocol.cpp @@ -115,6 +115,17 @@ namespace i2p return newMsg; } + std::shared_ptr CreateTunnelTestMsg (uint32_t msgID) + { + auto m = NewI2NPShortMessage (); + uint8_t * buf = m->GetPayload (); + htobe32buf (buf + TUNNEL_TEST_MSGID_OFFSET, msgID); + htobe64buf (buf + TUNNEL_TEST_TIMESTAMP_OFFSET, i2p::util::GetMonotonicMicroseconds ()); + m->len += TUNNEL_TEST_SIZE; + m->FillI2NPMessageHeader (eI2NPTunnelTest); + return m; + } + std::shared_ptr CreateDeliveryStatusMsg (uint32_t msgID) { auto m = NewI2NPShortMessage (); @@ -870,6 +881,10 @@ namespace i2p i2p::context.ProcessDeliveryStatusMessage (msg); break; } + case eI2NPTunnelTest: + if (msg->from && msg->from->GetTunnelPool ()) + msg->from->GetTunnelPool ()->ProcessTunnelTest (msg); + break; case eI2NPVariableTunnelBuild: case eI2NPTunnelBuild: case eI2NPShortTunnelBuild: diff --git a/libi2pd/I2NPProtocol.h b/libi2pd/I2NPProtocol.h index 36facbe3..b1e2d170 100644 --- a/libi2pd/I2NPProtocol.h +++ b/libi2pd/I2NPProtocol.h @@ -48,6 +48,11 @@ namespace i2p const size_t DELIVERY_STATUS_TIMESTAMP_OFFSET = DELIVERY_STATUS_MSGID_OFFSET + 4; const size_t DELIVERY_STATUS_SIZE = DELIVERY_STATUS_TIMESTAMP_OFFSET + 8; + // TunnelTest + const size_t TUNNEL_TEST_MSGID_OFFSET = 0; + const size_t TUNNEL_TEST_TIMESTAMP_OFFSET = TUNNEL_TEST_MSGID_OFFSET + 4; + const size_t TUNNEL_TEST_SIZE = TUNNEL_TEST_TIMESTAMP_OFFSET + 8; + // DatabaseStore const size_t DATABASE_STORE_KEY_OFFSET = 0; const size_t DATABASE_STORE_TYPE_OFFSET = DATABASE_STORE_KEY_OFFSET + 32; @@ -116,7 +121,8 @@ namespace i2p eI2NPVariableTunnelBuild = 23, eI2NPVariableTunnelBuildReply = 24, eI2NPShortTunnelBuild = 25, - eI2NPShortTunnelBuildReply = 26 + eI2NPShortTunnelBuildReply = 26, + eI2NPTunnelTest = 231 }; const uint8_t TUNNEL_BUILD_RECORD_GATEWAY_FLAG = 0x80; @@ -279,6 +285,7 @@ namespace tunnel std::shared_ptr CreateI2NPMessage (const uint8_t * buf, size_t len, std::shared_ptr from = nullptr); std::shared_ptr CopyI2NPMessage (std::shared_ptr msg); + std::shared_ptr CreateTunnelTestMsg (uint32_t msgID); std::shared_ptr CreateDeliveryStatusMsg (uint32_t msgID); std::shared_ptr CreateRouterInfoDatabaseLookupMsg (const uint8_t * key, const uint8_t * from, uint32_t replyTunnelID, bool exploratory = false, std::set * excludedPeers = nullptr); diff --git a/libi2pd/NetDb.cpp b/libi2pd/NetDb.cpp index bfd07df3..5922c740 100644 --- a/libi2pd/NetDb.cpp +++ b/libi2pd/NetDb.cpp @@ -672,10 +672,11 @@ namespace data (CreateRoutingKey (it.second->GetIdentHash ()) ^ i2p::context.GetIdentHash ()).metric[0] >= 0x02)) // different first 7 bits it.second->SetUnreachable (true); } - if (it.second->IsUnreachable () && i2p::transport::transports.IsConnected (it.second->GetIdentHash ())) - it.second->SetUnreachable (false); // don't expire connected router } - + // make router reachable back if connected now + if (it.second->IsUnreachable () && i2p::transport::transports.IsConnected (it.second->GetIdentHash ())) + it.second->SetUnreachable (false); + if (it.second->IsUnreachable ()) { if (it.second->IsFloodfill ()) deletedFloodfillsCount++; @@ -823,17 +824,31 @@ namespace data offset += 4; if (replyToken != 0xFFFFFFFFU) // if not caught on OBEP or IBGW { + IdentHash replyIdent(buf + offset); auto deliveryStatus = CreateDeliveryStatusMsg (replyToken); if (!tunnelID) // send response directly - transports.SendMessage (buf + offset, deliveryStatus); + transports.SendMessage (replyIdent, deliveryStatus); else { - auto pool = i2p::tunnel::tunnels.GetExploratoryPool (); - auto outbound = pool ? pool->GetNextOutboundTunnel () : nullptr; - if (outbound) - outbound->SendTunnelDataMsgTo (buf + offset, tunnelID, deliveryStatus); + bool direct = true; + if (!i2p::transport::transports.IsConnected (replyIdent)) + { + auto r = FindRouter (replyIdent); + if (r && !r->IsReachableFrom (i2p::context.GetRouterInfo ())) + direct = false; + } + if (direct) // send response directly to IBGW + transports.SendMessage (replyIdent, i2p::CreateTunnelGatewayMsg (tunnelID, deliveryStatus)); else - LogPrint (eLogWarning, "NetDb: No outbound tunnels for DatabaseStore reply found"); + { + // send response through exploratory tunnel + auto pool = i2p::tunnel::tunnels.GetExploratoryPool (); + auto outbound = pool ? pool->GetNextOutboundTunnel () : nullptr; + if (outbound) + outbound->SendTunnelDataMsgTo (replyIdent, tunnelID, deliveryStatus); + else + LogPrint (eLogWarning, "NetDb: No outbound tunnels for DatabaseStore reply found"); + } } } offset += 32; @@ -956,8 +971,10 @@ namespace data LogPrint (eLogDebug, "NetDb: Found new/outdated router. Requesting RouterInfo..."); if(m_FloodfillBootstrap) RequestDestinationFrom(router, m_FloodfillBootstrap->GetIdentHash(), true); - else + else if (!IsRouterBanned (router)) RequestDestination (router); + else + LogPrint (eLogDebug, "NetDb: Router ", peerHash, " is banned. Skipped"); } else LogPrint (eLogDebug, "NetDb: [:|||:]"); @@ -1111,12 +1128,24 @@ namespace data else LogPrint(eLogWarning, "NetDb: Encrypted reply requested but no tags provided"); } - auto exploratoryPool = i2p::tunnel::tunnels.GetExploratoryPool (); - auto outbound = exploratoryPool ? exploratoryPool->GetNextOutboundTunnel () : nullptr; - if (outbound) - outbound->SendTunnelDataMsgTo (replyIdent, replyTunnelID, replyMsg); - else + bool direct = true; + if (!i2p::transport::transports.IsConnected (replyIdent)) + { + auto r = FindRouter (replyIdent); + if (r && !r->IsReachableFrom (i2p::context.GetRouterInfo ())) + direct = false; + } + if (direct) transports.SendMessage (replyIdent, i2p::CreateTunnelGatewayMsg (replyTunnelID, replyMsg)); + else + { + auto exploratoryPool = i2p::tunnel::tunnels.GetExploratoryPool (); + auto outbound = exploratoryPool ? exploratoryPool->GetNextOutboundTunnel () : nullptr; + if (outbound) + outbound->SendTunnelDataMsgTo (replyIdent, replyTunnelID, replyMsg); + else + LogPrint (eLogWarning, "NetDb: Can't send lookup reply to ", replyIdent.ToBase64 (), ". Non reachable and no outbound tunnels"); + } } else transports.SendMessage (replyIdent, replyMsg); diff --git a/libi2pd/Profiling.cpp b/libi2pd/Profiling.cpp index 2031fa39..3a2db9a5 100644 --- a/libi2pd/Profiling.cpp +++ b/libi2pd/Profiling.cpp @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2023, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -245,6 +245,15 @@ namespace data return profile; } + bool IsRouterBanned (const IdentHash& identHash) + { + std::unique_lock l(g_ProfilesMutex); + auto it = g_Profiles.find (identHash); + if (it != g_Profiles.end ()) + return it->second->IsUnreachable (); + return false; + } + void InitProfilesStorage () { g_ProfilesStorage.SetPlace(i2p::fs::GetDataDir()); diff --git a/libi2pd/Profiling.h b/libi2pd/Profiling.h index ed23fb12..c4aa6a9a 100644 --- a/libi2pd/Profiling.h +++ b/libi2pd/Profiling.h @@ -87,6 +87,7 @@ namespace data }; std::shared_ptr GetRouterProfile (const IdentHash& identHash); + bool IsRouterBanned (const IdentHash& identHash); // check only existing profiles void InitProfilesStorage (); void DeleteObsoleteProfiles (); void SaveProfiles (); diff --git a/libi2pd/RouterContext.cpp b/libi2pd/RouterContext.cpp index 2c1dc979..3eb6ffbf 100644 --- a/libi2pd/RouterContext.cpp +++ b/libi2pd/RouterContext.cpp @@ -40,7 +40,7 @@ namespace i2p void RouterContext::Init () { srand (i2p::util::GetMillisecondsSinceEpoch () % 1000); - m_StartupTime = std::chrono::steady_clock::now(); + m_StartupTime = i2p::util::GetMonotonicSeconds (); if (!Load ()) CreateNewRouter (); @@ -1152,13 +1152,13 @@ namespace i2p bool RouterContext::HandleCloveI2NPMessage (I2NPMessageType typeID, const uint8_t * payload, size_t len, uint32_t msgID) { - if (typeID == eI2NPDeliveryStatus) + if (typeID == eI2NPTunnelTest) { // try tunnel test auto pool = GetTunnelPool (); - if (pool && pool->ProcessDeliveryStatus (bufbe32toh (payload + DELIVERY_STATUS_MSGID_OFFSET), bufbe64toh (payload + DELIVERY_STATUS_TIMESTAMP_OFFSET))) + if (pool && pool->ProcessTunnelTest (bufbe32toh (payload + TUNNEL_TEST_MSGID_OFFSET), bufbe64toh (payload + TUNNEL_TEST_TIMESTAMP_OFFSET))) return true; - } + } auto msg = CreateI2NPMessage (typeID, payload, len, msgID); if (!msg) return false; i2p::HandleI2NPMessage (msg); @@ -1236,7 +1236,7 @@ namespace i2p uint32_t RouterContext::GetUptime () const { - return std::chrono::duration_cast (std::chrono::steady_clock::now() - m_StartupTime).count (); + return i2p::util::GetMonotonicSeconds () - m_StartupTime; } bool RouterContext::Decrypt (const uint8_t * encrypted, uint8_t * data, i2p::data::CryptoKeyType preferredCrypto) const diff --git a/libi2pd/RouterContext.h b/libi2pd/RouterContext.h index 062c187d..941974b2 100644 --- a/libi2pd/RouterContext.h +++ b/libi2pd/RouterContext.h @@ -12,7 +12,6 @@ #include #include #include -#include #include #include #include "Identity.h" @@ -241,7 +240,7 @@ namespace garlic std::shared_ptr m_ECIESSession; uint64_t m_LastUpdateTime; // in seconds bool m_AcceptsTunnels, m_IsFloodfill; - std::chrono::time_point m_StartupTime; + uint64_t m_StartupTime; // monotonic seconds uint64_t m_BandwidthLimit; // allowed bandwidth int m_ShareRatio; RouterStatus m_Status, m_StatusV6; diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 3e0583ea..812f49d3 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -256,8 +256,33 @@ namespace transport socket.open (localEndpoint.protocol ()); if (localEndpoint.address ().is_v6 ()) socket.set_option (boost::asio::ip::v6_only (true)); - socket.set_option (boost::asio::socket_base::receive_buffer_size (SSU2_SOCKET_RECEIVE_BUFFER_SIZE)); - socket.set_option (boost::asio::socket_base::send_buffer_size (SSU2_SOCKET_SEND_BUFFER_SIZE)); + + uint64_t bufferSize = i2p::context.GetBandwidthLimit() * 1024 / 5; // max lag = 200ms + bufferSize = std::max(SSU2_SOCKET_MIN_BUFFER_SIZE, std::min(bufferSize, SSU2_SOCKET_MAX_BUFFER_SIZE)); + + boost::asio::socket_base::receive_buffer_size receiveBufferSizeSet (bufferSize); + boost::asio::socket_base::send_buffer_size sendBufferSizeSet (bufferSize); + socket.set_option (receiveBufferSizeSet); + socket.set_option (sendBufferSizeSet); + boost::asio::socket_base::receive_buffer_size receiveBufferSizeGet; + boost::asio::socket_base::send_buffer_size sendBufferSizeGet; + socket.get_option (receiveBufferSizeGet); + socket.get_option (sendBufferSizeGet); + if (receiveBufferSizeGet.value () != receiveBufferSizeSet.value () || + sendBufferSizeGet.value () != sendBufferSizeSet.value ()) + { + LogPrint (eLogWarning, "SSU2: Socket receive buffer size: requested = ", + receiveBufferSizeSet.value (), ", got = ", receiveBufferSizeGet.value ()); + LogPrint (eLogWarning, "SSU2: Socket send buffer size: requested = ", + sendBufferSizeSet.value (), ", got = ", sendBufferSizeGet.value ()); + } + else + { + LogPrint (eLogInfo, "SSU2: Socket receive buffer size: ", receiveBufferSizeGet.value ()); + LogPrint (eLogInfo, "SSU2: Socket send buffer size: ", sendBufferSizeGet.value ()); + } + + socket.non_blocking (true); } catch (std::exception& ex ) { @@ -469,7 +494,7 @@ namespace transport m_PendingOutgoingSessions.erase (ep); } - std::shared_ptr SSU2Server::GetRandomSession ( + std::shared_ptr SSU2Server::GetRandomPeerTestSession ( i2p::data::RouterInfo::CompatibleTransports remoteTransports, const i2p::data::IdentHash& excluded) const { if (m_Sessions.empty ()) return nullptr; @@ -480,7 +505,7 @@ namespace transport std::advance (it, ind); while (it != m_Sessions.end ()) { - if ((it->second->GetRemoteTransports () & remoteTransports) && + if ((it->second->GetRemotePeerTestTransports () & remoteTransports) && it->second->GetRemoteIdentity ()->GetIdentHash () != excluded) return it->second; it++; @@ -489,7 +514,7 @@ namespace transport it = m_Sessions.begin (); while (it != m_Sessions.end () && ind) { - if ((it->second->GetRemoteTransports () & remoteTransports) && + if ((it->second->GetRemotePeerTestTransports () & remoteTransports) && it->second->GetRemoteIdentity ()->GetIdentHash () != excluded) return it->second; it++; ind--; @@ -637,7 +662,10 @@ namespace transport if (!ec) i2p::transport::transports.UpdateSentBytes (headerLen + payloadLen); else - LogPrint (eLogError, "SSU2: Send exception: ", ec.message (), " to ", to); + { + LogPrint (ec == boost::asio::error::would_block ? eLogInfo : eLogError, + "SSU2: Send exception: ", ec.message (), " to ", to); + } } void SSU2Server::Send (const uint8_t * header, size_t headerLen, const uint8_t * headerX, size_t headerXLen, @@ -671,7 +699,10 @@ namespace transport if (!ec) i2p::transport::transports.UpdateSentBytes (headerLen + headerXLen + payloadLen); else - LogPrint (eLogError, "SSU2: Send exception: ", ec.message (), " to ", to); + { + LogPrint (ec == boost::asio::error::would_block ? eLogInfo : eLogError, + "SSU2: Send exception: ", ec.message (), " to ", to); + } } bool SSU2Server::CreateSession (std::shared_ptr router, @@ -825,8 +856,11 @@ namespace transport auto it = m_SessionsByRouterHash.find (router->GetIdentHash ()); if (it != m_SessionsByRouterHash.end ()) { - auto s = it->second; - if (it->second->IsEstablished ()) + auto remoteAddr = it->second->GetAddress (); + if (!remoteAddr || !remoteAddr->IsPeerTesting () || + (v4 && !remoteAddr->IsV4 ()) || (!v4 && !remoteAddr->IsV6 ())) return false; + auto s = it->second; + if (s->IsEstablished ()) GetService ().post ([s]() { s->SendPeerTest (); }); else s->SetOnEstablished ([s]() { s->SendPeerTest (); }); diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index da5ca317..60e70e7a 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -25,8 +25,8 @@ namespace transport const int SSU2_RESEND_CHECK_TIMEOUT_VARIANCE = 100; // in milliseconds const int SSU2_RESEND_CHECK_MORE_TIMEOUT = 10; // in milliseconds const size_t SSU2_MAX_RESEND_PACKETS = 128; // packets to resend at the time - const size_t SSU2_SOCKET_RECEIVE_BUFFER_SIZE = 0x1FFFF; // 128K - const size_t SSU2_SOCKET_SEND_BUFFER_SIZE = 0x1FFFF; // 128K + const uint64_t SSU2_SOCKET_MIN_BUFFER_SIZE = 128 * 1024; + const uint64_t SSU2_SOCKET_MAX_BUFFER_SIZE = 4 * 1024 * 1024; const size_t SSU2_MAX_NUM_INTRODUCERS = 3; const size_t SSU2_MIN_RECEIVED_PACKET_SIZE = 40; // 16 byte short header + 8 byte minimum payload + 16 byte MAC const int SSU2_TO_INTRODUCER_SESSION_DURATION = 3600; // 1 hour @@ -77,7 +77,7 @@ namespace transport void RemovePendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep); std::shared_ptr FindSession (const i2p::data::IdentHash& ident) const; std::shared_ptr FindPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep) const; - std::shared_ptr GetRandomSession (i2p::data::RouterInfo::CompatibleTransports remoteTransports, + std::shared_ptr GetRandomPeerTestSession (i2p::data::RouterInfo::CompatibleTransports remoteTransports, const i2p::data::IdentHash& excluded) const; void AddRelay (uint32_t tag, std::shared_ptr relay); diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index 4015dcac..2c499753 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -81,7 +81,7 @@ namespace transport SSU2Session::SSU2Session (SSU2Server& server, std::shared_ptr in_RemoteRouter, std::shared_ptr addr): TransportSession (in_RemoteRouter, SSU2_CONNECT_TIMEOUT), - m_Server (server), m_Address (addr), m_RemoteTransports (0), + m_Server (server), m_Address (addr), m_RemoteTransports (0), m_RemotePeerTestTransports (0), m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown), m_SendPacketNum (0), m_ReceivePacketNum (0), m_LastDatetimeSentPacketNum (0), m_IsDataReceived (false), m_WindowSize (SSU2_MIN_WINDOW_SIZE), @@ -96,6 +96,8 @@ namespace transport InitNoiseXKState1 (*m_NoiseState, m_Address->s); m_RemoteEndpoint = boost::asio::ip::udp::endpoint (m_Address->host, m_Address->port); m_RemoteTransports = in_RemoteRouter->GetCompatibleTransports (false); + if (in_RemoteRouter->IsSSU2PeerTesting (true)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V4; + if (in_RemoteRouter->IsSSU2PeerTesting (false)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V6; RAND_bytes ((uint8_t *)&m_DestConnID, 8); RAND_bytes ((uint8_t *)&m_SourceConnID, 8); } @@ -1110,6 +1112,10 @@ namespace transport AdjustMaxPayloadSize (); m_Server.AddSessionByRouterHash (shared_from_this ()); // we know remote router now m_RemoteTransports = ri->GetCompatibleTransports (false); + m_RemotePeerTestTransports = 0; + if (ri->IsSSU2PeerTesting (true)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V4; + if (ri->IsSSU2PeerTesting (false)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V6; + // handle other blocks HandlePayload (decryptedPayload.data () + riSize + 3, decryptedPayload.size () - riSize - 3); Established (); @@ -2109,7 +2115,7 @@ namespace transport { case 1: // Bob from Alice { - auto session = m_Server.GetRandomSession ((buf[12] == 6) ? i2p::data::RouterInfo::eSSU2V4 : i2p::data::RouterInfo::eSSU2V6, + auto session = m_Server.GetRandomPeerTestSession ((buf[12] == 6) ? i2p::data::RouterInfo::eSSU2V4 : i2p::data::RouterInfo::eSSU2V6, GetRemoteIdentity ()->GetIdentHash ()); if (session) // session with Charlie { @@ -2180,7 +2186,8 @@ namespace transport std::shared_ptr addr; if (ExtractEndpoint (buf + offset + 10, asz, ep)) addr = r->GetSSU2Address (ep.address ().is_v4 ()); - if (addr && m_Server.IsSupported (ep.address ())) + if (addr && m_Server.IsSupported (ep.address ()) && + i2p::context.GetRouterInfo ().IsSSU2PeerTesting (ep.address ().is_v4 ())) { // send msg 5 to Alice auto session = std::make_shared (m_Server, r, addr); @@ -2280,7 +2287,7 @@ namespace transport if (GetTestingState ()) { SetTestingState (false); - if (GetRouterStatus () != eRouterStatusFirewalled) + if (GetRouterStatus () != eRouterStatusFirewalled && addr->IsPeerTesting ()) { SetRouterStatus (eRouterStatusFirewalled); if (m_Address->IsV4 ()) diff --git a/libi2pd/SSU2Session.h b/libi2pd/SSU2Session.h index 5dd45fd4..ed17a6f4 100644 --- a/libi2pd/SSU2Session.h +++ b/libi2pd/SSU2Session.h @@ -238,6 +238,7 @@ namespace transport void SetRemoteEndpoint (const boost::asio::ip::udp::endpoint& ep) { m_RemoteEndpoint = ep; }; const boost::asio::ip::udp::endpoint& GetRemoteEndpoint () const { return m_RemoteEndpoint; }; i2p::data::RouterInfo::CompatibleTransports GetRemoteTransports () const { return m_RemoteTransports; }; + i2p::data::RouterInfo::CompatibleTransports GetRemotePeerTestTransports () const { return m_RemotePeerTestTransports; }; std::shared_ptr GetAddress () const { return m_Address; }; void SetOnEstablished (OnEstablished e) { m_OnEstablished = e; }; OnEstablished GetOnEstablished () const { return m_OnEstablished; }; @@ -343,7 +344,7 @@ namespace transport std::unique_ptr m_SentHandshakePacket; // SessionRequest, SessionCreated or SessionConfirmed std::shared_ptr m_Address; boost::asio::ip::udp::endpoint m_RemoteEndpoint; - i2p::data::RouterInfo::CompatibleTransports m_RemoteTransports; // for peer tests + i2p::data::RouterInfo::CompatibleTransports m_RemoteTransports, m_RemotePeerTestTransports; uint64_t m_DestConnID, m_SourceConnID; SSU2SessionState m_State; uint8_t m_KeyDataSend[64], m_KeyDataReceive[64]; diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index fa809c19..f6518163 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -433,7 +433,10 @@ namespace stream LogPrint(eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime); rtt = 1; } - m_RTT = std::round ((m_RTT*seqn + rtt)/(seqn + 1.0)); + if (seqn) + m_RTT = std::round (RTT_EWMA_ALPHA * m_RTT + (1.0 - RTT_EWMA_ALPHA) * rtt); + else + m_RTT = rtt; m_RTO = m_RTT*1.5; // TODO: implement it better LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); m_SentPackets.erase (it++); @@ -998,8 +1001,8 @@ namespace stream m_RTO *= 2; switch (m_NumResendAttempts) { - case 1: // congesion avoidance - m_WindowSize >>= 1; // /2 + case 1: // congestion avoidance + m_WindowSize -= (m_WindowSize + WINDOW_SIZE_DROP_FRACTION) / WINDOW_SIZE_DROP_FRACTION; // adjustment >= 1 if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; break; case 2: diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index ed79edc9..1980b6fd 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -56,6 +56,8 @@ namespace stream const int WINDOW_SIZE = 6; // in messages const int MIN_WINDOW_SIZE = 1; const int MAX_WINDOW_SIZE = 128; + const int WINDOW_SIZE_DROP_FRACTION = 10; // 1/10 + const double RTT_EWMA_ALPHA = 0.5; const int INITIAL_RTT = 8000; // in milliseconds const int INITIAL_RTO = 9000; // in milliseconds const int MIN_SEND_ACK_TIMEOUT = 2; // in milliseconds diff --git a/libi2pd/Timestamp.cpp b/libi2pd/Timestamp.cpp index 99507398..f13efe68 100644 --- a/libi2pd/Timestamp.cpp +++ b/libi2pd/Timestamp.cpp @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2022, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -232,6 +232,24 @@ namespace util return GetLocalHoursSinceEpoch () + g_TimeOffset/3600; } + uint64_t GetMonotonicMicroseconds() + { + return std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + } + + uint64_t GetMonotonicMilliseconds() + { + return std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + } + + uint64_t GetMonotonicSeconds () + { + return std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + } + void GetCurrentDate (char * date) { GetDateString (GetSecondsSinceEpoch (), date); diff --git a/libi2pd/Timestamp.h b/libi2pd/Timestamp.h index ff777257..025908af 100644 --- a/libi2pd/Timestamp.h +++ b/libi2pd/Timestamp.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2022, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -24,6 +24,10 @@ namespace util uint32_t GetMinutesSinceEpoch (); uint32_t GetHoursSinceEpoch (); + uint64_t GetMonotonicMicroseconds (); + uint64_t GetMonotonicMilliseconds (); + uint64_t GetMonotonicSeconds (); + void GetCurrentDate (char * date); // returns date as YYYYMMDD string, 9 bytes void GetDateString (uint64_t timestamp, char * date); // timestamp is seconds since epoch, returns date as YYYYMMDD string, 9 bytes void AdjustTimeOffset (int64_t offset); // in seconds from current diff --git a/libi2pd/Transports.cpp b/libi2pd/Transports.cpp index 04c58cf3..8531e820 100644 --- a/libi2pd/Transports.cpp +++ b/libi2pd/Transports.cpp @@ -460,9 +460,8 @@ namespace transport auto it = m_Peers.find (ident); if (it == m_Peers.end ()) { - // check if not known as unreachable - auto profile = i2p::data::GetRouterProfile (ident); - if (profile && profile->IsUnreachable ()) return; // don't create peer to unreachable router + // check if not banned + if (i2p::data::IsRouterBanned (ident)) return; // don't create peer to unreachable router // try to connect bool connected = false; try @@ -491,10 +490,9 @@ namespace transport { if (sz < CHECK_PROFILE_NUM_DELAYED_MESSAGES && sz + msgs.size () >= CHECK_PROFILE_NUM_DELAYED_MESSAGES) { - auto profile = i2p::data::GetRouterProfile (ident); - if (profile && profile->IsUnreachable ()) + if (i2p::data::IsRouterBanned (ident)) { - LogPrint (eLogWarning, "Transports: Peer profile for ", ident.ToBase64 (), " reports unreachable. Dropped"); + LogPrint (eLogWarning, "Transports: Router ", ident.ToBase64 (), " is banned. Peer dropped"); std::unique_lock l(m_PeersMutex); m_Peers.erase (it); return; @@ -589,6 +587,14 @@ namespace transport m_Peers.erase (ident); return false; } + else if (i2p::data::IsRouterBanned (ident)) + { + LogPrint (eLogWarning, "Transports: Router ", ident.ToBase64 (), " is banned. Peer dropped"); + peer.Done (); + std::unique_lock l(m_PeersMutex); + m_Peers.erase (ident); + return false; + } else // otherwise request RI { LogPrint (eLogInfo, "Transports: RouterInfo for ", ident.ToBase64 (), " not found, requested"); diff --git a/libi2pd/Tunnel.cpp b/libi2pd/Tunnel.cpp index 246f68e4..57feff45 100644 --- a/libi2pd/Tunnel.cpp +++ b/libi2pd/Tunnel.cpp @@ -33,7 +33,7 @@ namespace tunnel TunnelBase (config->GetTunnelID (), config->GetNextTunnelID (), config->GetNextIdentHash ()), m_Config (config), m_IsShortBuildMessage (false), m_Pool (nullptr), m_State (eTunnelStatePending), m_FarEndTransports (i2p::data::RouterInfo::eAllTransports), - m_IsRecreated (false), m_Latency (0) + m_IsRecreated (false), m_Latency (UNKNOWN_LATENCY) { } @@ -198,10 +198,10 @@ namespace tunnel return established; } - bool Tunnel::LatencyFitsRange(uint64_t lower, uint64_t upper) const + bool Tunnel::LatencyFitsRange(int lowerbound, int upperbound) const { auto latency = GetMeanLatency(); - return latency >= lower && latency <= upper; + return latency >= lowerbound && latency <= upperbound; } void Tunnel::EncryptTunnelMsg (std::shared_ptr in, std::shared_ptr out) diff --git a/libi2pd/Tunnel.h b/libi2pd/Tunnel.h index 4d9ea830..d3de272d 100644 --- a/libi2pd/Tunnel.h +++ b/libi2pd/Tunnel.h @@ -39,7 +39,8 @@ namespace tunnel const int TUNNEL_CREATION_TIMEOUT = 30; // 30 seconds const int STANDARD_NUM_RECORDS = 4; // in VariableTunnelBuild message const int MAX_NUM_RECORDS = 8; - const int HIGH_LATENCY_PER_HOP = 250; // in milliseconds + const int UNKNOWN_LATENCY = -1; + const int HIGH_LATENCY_PER_HOP = 250000; // in microseconds const int MAX_TUNNEL_MSGS_BATCH_SIZE = 100; // handle messages without interrupt const uint16_t DEFAULT_MAX_NUM_TRANSIT_TUNNELS = 5000; const int TUNNEL_MANAGE_INTERVAL = 15; // in seconds @@ -108,14 +109,14 @@ namespace tunnel void EncryptTunnelMsg (std::shared_ptr in, std::shared_ptr out) override; /** @brief add latency sample */ - void AddLatencySample(const uint64_t ms) { m_Latency = (m_Latency + ms) >> 1; } + void AddLatencySample(const int us) { m_Latency = LatencyIsKnown() ? (m_Latency + us) >> 1 : us; } /** @brief get this tunnel's estimated latency */ - uint64_t GetMeanLatency() const { return m_Latency; } + int GetMeanLatency() const { return (m_Latency + 500) / 1000; } /** @brief return true if this tunnel's latency fits in range [lowerbound, upperbound] */ - bool LatencyFitsRange(uint64_t lowerbound, uint64_t upperbound) const; + bool LatencyFitsRange(int lowerbound, int upperbound) const; - bool LatencyIsKnown() const { return m_Latency > 0; } - bool IsSlow () const { return LatencyIsKnown() && (int)m_Latency > HIGH_LATENCY_PER_HOP*GetNumHops (); } + bool LatencyIsKnown() const { return m_Latency != UNKNOWN_LATENCY; } + bool IsSlow () const { return LatencyIsKnown() && m_Latency > HIGH_LATENCY_PER_HOP*GetNumHops (); } /** visit all hops we currently store */ void VisitTunnelHops(TunnelHopVisitor v); @@ -129,7 +130,7 @@ namespace tunnel TunnelState m_State; i2p::data::RouterInfo::CompatibleTransports m_FarEndTransports; bool m_IsRecreated; // if tunnel is replaced by new, or new tunnel requested to replace - uint64_t m_Latency; // in milliseconds + int m_Latency; // in microseconds }; class OutboundTunnel: public Tunnel diff --git a/libi2pd/TunnelPool.cpp b/libi2pd/TunnelPool.cpp index 2a5e9dd1..5fdf963c 100644 --- a/libi2pd/TunnelPool.cpp +++ b/libi2pd/TunnelPool.cpp @@ -102,7 +102,10 @@ namespace tunnel it->SetTunnelPool (nullptr); m_OutboundTunnels.clear (); } - m_Tests.clear (); + { + std::unique_lock l(m_TestsMutex); + m_Tests.clear (); + } } bool TunnelPool::Reconfigure(int inHops, int outHops, int inQuant, int outQuant) @@ -145,8 +148,11 @@ namespace tunnel if (expiredTunnel) { expiredTunnel->SetTunnelPool (nullptr); - for (auto& it: m_Tests) - if (it.second.second == expiredTunnel) it.second.second = nullptr; + { + std::unique_lock l(m_TestsMutex); + for (auto& it: m_Tests) + if (it.second.second == expiredTunnel) it.second.second = nullptr; + } std::unique_lock l(m_InboundTunnelsMutex); m_InboundTunnels.erase (expiredTunnel); @@ -167,8 +173,11 @@ namespace tunnel if (expiredTunnel) { expiredTunnel->SetTunnelPool (nullptr); - for (auto& it: m_Tests) - if (it.second.first == expiredTunnel) it.second.first = nullptr; + { + std::unique_lock l(m_TestsMutex); + for (auto& it: m_Tests) + if (it.second.first == expiredTunnel) it.second.first = nullptr; + } std::unique_lock l(m_OutboundTunnelsMutex); m_OutboundTunnels.erase (expiredTunnel); @@ -342,7 +351,7 @@ namespace tunnel else it.second.first->SetState (eTunnelStateTestFailed); } - else + else if (it.second.first->GetState () != eTunnelStateExpiring) it.second.first->SetState (eTunnelStateTestFailed); } if (it.second.second) @@ -360,18 +369,19 @@ namespace tunnel if (m_LocalDestination) m_LocalDestination->SetLeaseSetUpdated (); } - else + else if (it.second.second->GetState () != eTunnelStateExpiring) it.second.second->SetState (eTunnelStateTestFailed); } } // new tests + if (!m_LocalDestination) return; std::vector, std::shared_ptr > > newTests; std::vector > outboundTunnels; { std::unique_lock l(m_OutboundTunnelsMutex); for (auto& it: m_OutboundTunnels) - if (it->IsEstablished () || it->GetState () == eTunnelStateTestFailed) + if (it->IsEstablished ()) outboundTunnels.push_back (it); } std::shuffle (outboundTunnels.begin(), outboundTunnels.end(), m_Rng); @@ -379,7 +389,7 @@ namespace tunnel { std::unique_lock l(m_InboundTunnelsMutex); for (auto& it: m_InboundTunnels) - if (it->IsEstablished () || it->GetState () == eTunnelStateTestFailed) + if (it->IsEstablished ()) inboundTunnels.push_back (it); } std::shuffle (inboundTunnels.begin(), inboundTunnels.end(), m_Rng); @@ -390,7 +400,7 @@ namespace tunnel newTests.push_back(std::make_pair (*it1, *it2)); ++it1; ++it2; } - bool encrypt = m_LocalDestination ? m_LocalDestination->SupportsEncryptionType (i2p::data::CRYPTO_KEY_TYPE_ECIES_X25519_AEAD) : false; + bool isECIES = m_LocalDestination->SupportsEncryptionType (i2p::data::CRYPTO_KEY_TYPE_ECIES_X25519_AEAD); for (auto& it: newTests) { uint32_t msgID; @@ -399,7 +409,7 @@ namespace tunnel std::unique_lock l(m_TestsMutex); m_Tests[msgID] = it; } - auto msg = CreateDeliveryStatusMsg (msgID); + auto msg = CreateTunnelTestMsg (msgID); auto outbound = it.first; auto s = shared_from_this (); msg->onDrop = [msgID, outbound, s]() @@ -414,14 +424,22 @@ namespace tunnel std::unique_lock l(s->m_OutboundTunnelsMutex); s->m_OutboundTunnels.erase (outbound); } - }; - if (encrypt) + }; + // encrypt + if (isECIES) { - // encrypt uint8_t key[32]; RAND_bytes (key, 32); uint64_t tag; RAND_bytes ((uint8_t *)&tag, 8); m_LocalDestination->SubmitECIESx25519Key (key, tag); msg = i2p::garlic::WrapECIESX25519Message (msg, key, tag); + } + else + { + uint8_t key[32], tag[32]; + RAND_bytes (key, 32); RAND_bytes (tag, 32); + m_LocalDestination->SubmitSessionKey (key, tag); + i2p::garlic::ElGamalAESSession garlic (key, tag); + msg = garlic.WrapSingleMessage (msg); } outbound->SendTunnelDataMsgTo (it.second->GetNextIdentHash (), it.second->GetNextTunnelID (), msg); } @@ -446,22 +464,24 @@ namespace tunnel } void TunnelPool::ProcessDeliveryStatus (std::shared_ptr msg) + { + if (m_LocalDestination) + m_LocalDestination->ProcessDeliveryStatusMessage (msg); + else + LogPrint (eLogWarning, "Tunnels: Local destination doesn't exist, dropped"); + } + + void TunnelPool::ProcessTunnelTest (std::shared_ptr msg) { const uint8_t * buf = msg->GetPayload (); uint32_t msgID = bufbe32toh (buf); buf += 4; uint64_t timestamp = bufbe64toh (buf); - if (!ProcessDeliveryStatus (msgID, timestamp)) - { - if (m_LocalDestination) - m_LocalDestination->ProcessDeliveryStatusMessage (msg); - else - LogPrint (eLogWarning, "Tunnels: Local destination doesn't exist, dropped"); - } + ProcessTunnelTest (msgID, timestamp); } - bool TunnelPool::ProcessDeliveryStatus (uint32_t msgID, uint64_t timestamp) + bool TunnelPool::ProcessTunnelTest (uint32_t msgID, uint64_t timestamp) { decltype(m_Tests)::mapped_type test; bool found = false; @@ -477,8 +497,9 @@ namespace tunnel } if (found) { - uint64_t dlt = i2p::util::GetMillisecondsSinceEpoch () - timestamp; - LogPrint (eLogDebug, "Tunnels: Test of ", msgID, " successful. ", dlt, " milliseconds"); + int dlt = (uint64_t)i2p::util::GetMonotonicMicroseconds () - (int64_t)timestamp; + LogPrint (eLogDebug, "Tunnels: Test of ", msgID, " successful. ", dlt, " microseconds"); + if (dlt < 0) dlt = 0; // should not happen int numHops = 0; if (test.first) numHops += test.first->GetNumHops (); if (test.second) numHops += test.second->GetNumHops (); @@ -488,20 +509,20 @@ namespace tunnel if (test.first->GetState () != eTunnelStateExpiring) test.first->SetState (eTunnelStateEstablished); // update latency - uint64_t latency = 0; + int latency = 0; if (numHops) latency = dlt*test.first->GetNumHops ()/numHops; if (!latency) latency = dlt/2; - test.first->AddLatencySample(latency); + test.first->AddLatencySample (latency); } if (test.second) { if (test.second->GetState () != eTunnelStateExpiring) test.second->SetState (eTunnelStateEstablished); // update latency - uint64_t latency = 0; + int latency = 0; if (numHops) latency = dlt*test.second->GetNumHops ()/numHops; if (!latency) latency = dlt/2; - test.second->AddLatencySample(latency); + test.second->AddLatencySample (latency); } } return found; @@ -823,7 +844,7 @@ namespace tunnel { std::shared_ptr tun = nullptr; std::unique_lock lock(m_InboundTunnelsMutex); - uint64_t min = 1000000; + int min = 1000000; for (const auto & itr : m_InboundTunnels) { if(!itr->LatencyIsKnown()) continue; auto l = itr->GetMeanLatency(); @@ -839,7 +860,7 @@ namespace tunnel { std::shared_ptr tun = nullptr; std::unique_lock lock(m_OutboundTunnelsMutex); - uint64_t min = 1000000; + int min = 1000000; for (const auto & itr : m_OutboundTunnels) { if(!itr->LatencyIsKnown()) continue; auto l = itr->GetMeanLatency(); diff --git a/libi2pd/TunnelPool.h b/libi2pd/TunnelPool.h index 3e845c0e..e76453be 100644 --- a/libi2pd/TunnelPool.h +++ b/libi2pd/TunnelPool.h @@ -85,7 +85,8 @@ namespace tunnel void ManageTunnels (uint64_t ts); void ProcessGarlicMessage (std::shared_ptr msg); void ProcessDeliveryStatus (std::shared_ptr msg); - bool ProcessDeliveryStatus (uint32_t msgID, uint64_t timestamp); + void ProcessTunnelTest (std::shared_ptr msg); + bool ProcessTunnelTest (uint32_t msgID, uint64_t timestamp); bool IsExploratory () const; bool IsActive () const { return m_IsActive; }; @@ -105,7 +106,7 @@ namespace tunnel bool HasCustomPeerSelector(); /** @brief make this tunnel pool yield tunnels that fit latency range [min, max] */ - void RequireLatency(uint64_t min, uint64_t max) { m_MinLatency = min; m_MaxLatency = max; } + void RequireLatency(int min, int max) { m_MinLatency = min; m_MaxLatency = max; } /** @brief return true if this tunnel pool has a latency requirement */ bool HasLatencyRequirement() const { return m_MinLatency > 0 && m_MaxLatency > 0; } @@ -150,8 +151,8 @@ namespace tunnel std::mutex m_CustomPeerSelectorMutex; ITunnelPeerSelector * m_CustomPeerSelector; - uint64_t m_MinLatency = 0; // if > 0 this tunnel pool will try building tunnels with minimum latency by ms - uint64_t m_MaxLatency = 0; // if > 0 this tunnel pool will try building tunnels with maximum latency by ms + int m_MinLatency = 0; // if > 0 this tunnel pool will try building tunnels with minimum latency by ms + int m_MaxLatency = 0; // if > 0 this tunnel pool will try building tunnels with maximum latency by ms std::random_device m_Rd; std::mt19937 m_Rng;