Browse Source

Merge pull request #2040 from PurpleI2P/openssl

Recent changes
pull/1939/merge
orignal 9 months ago committed by GitHub
parent
commit
0141489d34
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 64
      libi2pd/Destination.cpp
  2. 43
      libi2pd/Garlic.cpp
  3. 3
      libi2pd/Garlic.h
  4. 15
      libi2pd/I2NPProtocol.cpp
  5. 9
      libi2pd/I2NPProtocol.h
  6. 41
      libi2pd/NetDb.cpp
  7. 11
      libi2pd/Profiling.cpp
  8. 1
      libi2pd/Profiling.h
  9. 8
      libi2pd/RouterContext.cpp
  10. 3
      libi2pd/RouterContext.h
  11. 50
      libi2pd/SSU2.cpp
  12. 6
      libi2pd/SSU2.h
  13. 15
      libi2pd/SSU2Session.cpp
  14. 3
      libi2pd/SSU2Session.h
  15. 9
      libi2pd/Streaming.cpp
  16. 2
      libi2pd/Streaming.h
  17. 20
      libi2pd/Timestamp.cpp
  18. 6
      libi2pd/Timestamp.h
  19. 18
      libi2pd/Transports.cpp
  20. 6
      libi2pd/Tunnel.cpp
  21. 15
      libi2pd/Tunnel.h
  22. 69
      libi2pd/TunnelPool.cpp
  23. 9
      libi2pd/TunnelPool.h

64
libi2pd/Destination.cpp

@ -367,10 +367,12 @@ namespace client @@ -367,10 +367,12 @@ namespace client
HandleDataMessage (payload, len);
break;
case eI2NPDeliveryStatus:
// try tunnel test first
if (!m_Pool || !m_Pool->ProcessDeliveryStatus (bufbe32toh (payload + DELIVERY_STATUS_MSGID_OFFSET), bufbe64toh (payload + DELIVERY_STATUS_TIMESTAMP_OFFSET)))
HandleDeliveryStatusMessage (bufbe32toh (payload + DELIVERY_STATUS_MSGID_OFFSET));
break;
case eI2NPTunnelTest:
if (m_Pool)
m_Pool->ProcessTunnelTest (bufbe32toh (payload + TUNNEL_TEST_MSGID_OFFSET), bufbe64toh (payload + TUNNEL_TEST_TIMESTAMP_OFFSET));
break;
case eI2NPDatabaseStore:
HandleDatabaseStoreMessage (payload, len);
break;
@ -408,6 +410,7 @@ namespace client @@ -408,6 +410,7 @@ namespace client
}
i2p::data::IdentHash key (buf + DATABASE_STORE_KEY_OFFSET);
std::shared_ptr<i2p::data::LeaseSet> leaseSet;
std::shared_ptr<LeaseSetRequest> request;
switch (buf[DATABASE_STORE_TYPE_OFFSET])
{
case i2p::data::NETDB_STORE_TYPE_LEASESET: // 1
@ -463,10 +466,14 @@ namespace client @@ -463,10 +466,14 @@ namespace client
case i2p::data::NETDB_STORE_TYPE_ENCRYPTED_LEASESET2: // 5
{
auto it2 = m_LeaseSetRequests.find (key);
if (it2 != m_LeaseSetRequests.end () && it2->second->requestedBlindedKey)
if (it2 != m_LeaseSetRequests.end ())
{
request = it2->second;
m_LeaseSetRequests.erase (it2);
if (request->requestedBlindedKey)
{
auto ls2 = std::make_shared<i2p::data::LeaseSet2> (buf + offset, len - offset,
it2->second->requestedBlindedKey, m_LeaseSetPrivKey ? ((const uint8_t *)*m_LeaseSetPrivKey) : nullptr , GetPreferredCryptoType ());
request->requestedBlindedKey, m_LeaseSetPrivKey ? ((const uint8_t *)*m_LeaseSetPrivKey) : nullptr , GetPreferredCryptoType ());
if (ls2->IsValid () && !ls2->IsExpired ())
{
leaseSet = ls2;
@ -478,21 +485,42 @@ namespace client @@ -478,21 +485,42 @@ namespace client
LogPrint (eLogError, "Destination: New remote encrypted LeaseSet2 failed");
}
else
LogPrint (eLogInfo, "Destination: Couldn't find request for encrypted LeaseSet2");
{
// publishing verification doesn't have requestedBlindedKey
auto localLeaseSet = GetLeaseSetMt ();
if (localLeaseSet->GetStoreHash () == key)
{
auto ls = std::make_shared<i2p::data::LeaseSet2> (i2p::data::NETDB_STORE_TYPE_ENCRYPTED_LEASESET2,
localLeaseSet->GetBuffer (), localLeaseSet->GetBufferLen (), false);
leaseSet = ls;
}
else
LogPrint (eLogWarning, "Destination: Encrypted LeaseSet2 received for request without blinded key");
}
}
else
LogPrint (eLogWarning, "Destination: Couldn't find request for encrypted LeaseSet2");
break;
}
default:
LogPrint (eLogError, "Destination: Unexpected client's DatabaseStore type ", buf[DATABASE_STORE_TYPE_OFFSET], ", dropped");
}
if (!request)
{
auto it1 = m_LeaseSetRequests.find (key);
if (it1 != m_LeaseSetRequests.end ())
{
it1->second->requestTimeoutTimer.cancel ();
if (it1->second) it1->second->Complete (leaseSet);
request = it1->second;
m_LeaseSetRequests.erase (it1);
}
}
if (request)
{
request->requestTimeoutTimer.cancel ();
request->Complete (leaseSet);
}
}
void LeaseSetDestination::HandleDatabaseSearchReplyMessage (const uint8_t * buf, size_t len)
{
@ -584,12 +612,7 @@ namespace client @@ -584,12 +612,7 @@ namespace client
shared_from_this (), std::placeholders::_1));
return;
}
if (!m_Pool->GetInboundTunnels ().size () || !m_Pool->GetOutboundTunnels ().size ())
{
LogPrint (eLogError, "Destination: Can't publish LeaseSet. Destination is not ready");
return;
}
auto floodfill = i2p::data::netdb.GetClosestFloodfill (leaseSet->GetIdentHash (), m_ExcludedFloodfills);
auto floodfill = i2p::data::netdb.GetClosestFloodfill (leaseSet->GetStoreHash (), m_ExcludedFloodfills);
if (!floodfill)
{
LogPrint (eLogError, "Destination: Can't publish LeaseSet, no more floodfills found");
@ -599,10 +622,12 @@ namespace client @@ -599,10 +622,12 @@ namespace client
auto outbound = m_Pool->GetNextOutboundTunnel (nullptr, floodfill->GetCompatibleTransports (false));
auto inbound = m_Pool->GetNextInboundTunnel (nullptr, floodfill->GetCompatibleTransports (true));
if (!outbound || !inbound)
{
if (!m_Pool->GetInboundTunnels ().empty () && !m_Pool->GetOutboundTunnels ().empty ())
{
LogPrint (eLogInfo, "Destination: No compatible tunnels with ", floodfill->GetIdentHash ().ToBase64 (), ". Trying another floodfill");
m_ExcludedFloodfills.insert (floodfill->GetIdentHash ());
floodfill = i2p::data::netdb.GetClosestFloodfill (leaseSet->GetIdentHash (), m_ExcludedFloodfills);
floodfill = i2p::data::netdb.GetClosestFloodfill (leaseSet->GetStoreHash (), m_ExcludedFloodfills);
if (floodfill)
{
outbound = m_Pool->GetNextOutboundTunnel (nullptr, floodfill->GetCompatibleTransports (false));
@ -617,9 +642,20 @@ namespace client @@ -617,9 +642,20 @@ namespace client
}
else
LogPrint (eLogError, "Destination: Can't publish LeaseSet, no more floodfills found");
}
else
LogPrint (eLogDebug, "Destination: No tunnels in pool");
if (!floodfill || !outbound || !inbound)
{
// we can't publish now
m_ExcludedFloodfills.clear ();
m_PublishReplyToken = 1; // dummy non-zero value
// try again after a while
LogPrint (eLogInfo, "Destination: Can't publish LeasetSet because destination is not ready. Try publishing again after ", PUBLISH_CONFIRMATION_TIMEOUT, " seconds");
m_PublishConfirmationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_CONFIRMATION_TIMEOUT));
m_PublishConfirmationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishConfirmationTimer,
shared_from_this (), std::placeholders::_1));
return;
}
}

43
libi2pd/Garlic.cpp

@ -431,8 +431,7 @@ namespace garlic @@ -431,8 +431,7 @@ namespace garlic
}
GarlicDestination::GarlicDestination (): m_NumTags (32), // 32 tags by default
m_PayloadBuffer (nullptr), m_NumRatchetInboundTags (0), // 0 means standard
m_NumUsedECIESx25519Tags (0)
m_PayloadBuffer (nullptr), m_NumRatchetInboundTags (0) // 0 means standard
{
}
@ -589,18 +588,11 @@ namespace garlic @@ -589,18 +588,11 @@ namespace garlic
auto it = m_ECIESx25519Tags.find (tag);
if (it != m_ECIESx25519Tags.end ())
{
if (!it->second.tagset) return true; // duplicate
if (it->second.tagset->HandleNextMessage (buf, len, it->second.index))
{
if (it->second.tagset && it->second.tagset->HandleNextMessage (buf, len, it->second.index))
m_LastTagset = it->second.tagset;
it->second.tagset = nullptr; // mark as used
}
else
{
LogPrint (eLogError, "Garlic: Can't handle ECIES-X25519-AEAD-Ratchet message");
m_ECIESx25519Tags.erase (it);
}
m_NumUsedECIESx25519Tags++;
return true;
}
return false;
@ -885,41 +877,17 @@ namespace garlic @@ -885,41 +877,17 @@ namespace garlic
}
numExpiredTags = 0;
if (m_NumUsedECIESx25519Tags > ECIESX25519_TAGSET_MAX_NUM_TAGS) // too many used tags
{
std::unordered_map<uint64_t, ECIESX25519AEADRatchetIndexTagset> oldTags;
std::swap (m_ECIESx25519Tags, oldTags); // re-create
for (auto& it: oldTags)
if (it.second.tagset)
{
if (it.second.tagset->IsExpired (ts) || it.second.tagset->IsIndexExpired (it.second.index))
{
it.second.tagset->DeleteSymmKey (it.second.index);
numExpiredTags++;
}
else if (it.second.tagset->IsSessionTerminated())
numExpiredTags++;
else
m_ECIESx25519Tags.emplace (it);
}
}
else
{
for (auto it = m_ECIESx25519Tags.begin (); it != m_ECIESx25519Tags.end ();)
{
if (!it->second.tagset)
{
// delete used tag
it = m_ECIESx25519Tags.erase (it);
continue;
}
if (it->second.tagset->IsExpired (ts) || it->second.tagset->IsIndexExpired (it->second.index))
{
it->second.tagset->DeleteSymmKey (it->second.index);
it = m_ECIESx25519Tags.erase (it);
numExpiredTags++;
}
else if (it->second.tagset->IsSessionTerminated())
else
{
if (it->second.tagset->IsSessionTerminated ())
{
it = m_ECIESx25519Tags.erase (it);
numExpiredTags++;
@ -928,7 +896,6 @@ namespace garlic @@ -928,7 +896,6 @@ namespace garlic
++it;
}
}
m_NumUsedECIESx25519Tags = 0;
if (numExpiredTags > 0)
LogPrint (eLogDebug, "Garlic: ", numExpiredTags, " ECIESx25519 tags expired for ", GetIdentHash().ToBase64 ());
if (m_LastTagset && m_LastTagset->IsExpired (ts))

3
libi2pd/Garlic.h

@ -291,7 +291,6 @@ namespace garlic @@ -291,7 +291,6 @@ namespace garlic
std::unordered_map<SessionTag, std::shared_ptr<AESDecryption>, std::hash<i2p::data::Tag<32> > > m_Tags;
std::unordered_map<uint64_t, ECIESX25519AEADRatchetIndexTagset> m_ECIESx25519Tags; // session tag -> session
ReceiveRatchetTagSetPtr m_LastTagset; // tagset last message came for
int m_NumUsedECIESx25519Tags;
// DeliveryStatus
std::mutex m_DeliveryStatusSessionsMutex;
std::unordered_map<uint32_t, GarlicRoutingSessionPtr> m_DeliveryStatusSessions; // msgID -> session
@ -300,7 +299,7 @@ namespace garlic @@ -300,7 +299,7 @@ namespace garlic
// for HTTP only
size_t GetNumIncomingTags () const { return m_Tags.size (); }
size_t GetNumIncomingECIESx25519Tags () const { return m_ECIESx25519Tags.size () - m_NumUsedECIESx25519Tags; }
size_t GetNumIncomingECIESx25519Tags () const { return m_ECIESx25519Tags.size (); }
const decltype(m_Sessions)& GetSessions () const { return m_Sessions; };
const decltype(m_ECIESx25519Sessions)& GetECIESx25519Sessions () const { return m_ECIESx25519Sessions; }
};

15
libi2pd/I2NPProtocol.cpp

@ -115,6 +115,17 @@ namespace i2p @@ -115,6 +115,17 @@ namespace i2p
return newMsg;
}
std::shared_ptr<I2NPMessage> CreateTunnelTestMsg (uint32_t msgID)
{
auto m = NewI2NPShortMessage ();
uint8_t * buf = m->GetPayload ();
htobe32buf (buf + TUNNEL_TEST_MSGID_OFFSET, msgID);
htobe64buf (buf + TUNNEL_TEST_TIMESTAMP_OFFSET, i2p::util::GetMonotonicMicroseconds ());
m->len += TUNNEL_TEST_SIZE;
m->FillI2NPMessageHeader (eI2NPTunnelTest);
return m;
}
std::shared_ptr<I2NPMessage> CreateDeliveryStatusMsg (uint32_t msgID)
{
auto m = NewI2NPShortMessage ();
@ -870,6 +881,10 @@ namespace i2p @@ -870,6 +881,10 @@ namespace i2p
i2p::context.ProcessDeliveryStatusMessage (msg);
break;
}
case eI2NPTunnelTest:
if (msg->from && msg->from->GetTunnelPool ())
msg->from->GetTunnelPool ()->ProcessTunnelTest (msg);
break;
case eI2NPVariableTunnelBuild:
case eI2NPTunnelBuild:
case eI2NPShortTunnelBuild:

9
libi2pd/I2NPProtocol.h

@ -48,6 +48,11 @@ namespace i2p @@ -48,6 +48,11 @@ namespace i2p
const size_t DELIVERY_STATUS_TIMESTAMP_OFFSET = DELIVERY_STATUS_MSGID_OFFSET + 4;
const size_t DELIVERY_STATUS_SIZE = DELIVERY_STATUS_TIMESTAMP_OFFSET + 8;
// TunnelTest
const size_t TUNNEL_TEST_MSGID_OFFSET = 0;
const size_t TUNNEL_TEST_TIMESTAMP_OFFSET = TUNNEL_TEST_MSGID_OFFSET + 4;
const size_t TUNNEL_TEST_SIZE = TUNNEL_TEST_TIMESTAMP_OFFSET + 8;
// DatabaseStore
const size_t DATABASE_STORE_KEY_OFFSET = 0;
const size_t DATABASE_STORE_TYPE_OFFSET = DATABASE_STORE_KEY_OFFSET + 32;
@ -116,7 +121,8 @@ namespace i2p @@ -116,7 +121,8 @@ namespace i2p
eI2NPVariableTunnelBuild = 23,
eI2NPVariableTunnelBuildReply = 24,
eI2NPShortTunnelBuild = 25,
eI2NPShortTunnelBuildReply = 26
eI2NPShortTunnelBuildReply = 26,
eI2NPTunnelTest = 231
};
const uint8_t TUNNEL_BUILD_RECORD_GATEWAY_FLAG = 0x80;
@ -279,6 +285,7 @@ namespace tunnel @@ -279,6 +285,7 @@ namespace tunnel
std::shared_ptr<I2NPMessage> CreateI2NPMessage (const uint8_t * buf, size_t len, std::shared_ptr<i2p::tunnel::InboundTunnel> from = nullptr);
std::shared_ptr<I2NPMessage> CopyI2NPMessage (std::shared_ptr<I2NPMessage> msg);
std::shared_ptr<I2NPMessage> CreateTunnelTestMsg (uint32_t msgID);
std::shared_ptr<I2NPMessage> CreateDeliveryStatusMsg (uint32_t msgID);
std::shared_ptr<I2NPMessage> CreateRouterInfoDatabaseLookupMsg (const uint8_t * key, const uint8_t * from,
uint32_t replyTunnelID, bool exploratory = false, std::set<i2p::data::IdentHash> * excludedPeers = nullptr);

41
libi2pd/NetDb.cpp

@ -672,9 +672,10 @@ namespace data @@ -672,9 +672,10 @@ namespace data
(CreateRoutingKey (it.second->GetIdentHash ()) ^ i2p::context.GetIdentHash ()).metric[0] >= 0x02)) // different first 7 bits
it.second->SetUnreachable (true);
}
if (it.second->IsUnreachable () && i2p::transport::transports.IsConnected (it.second->GetIdentHash ()))
it.second->SetUnreachable (false); // don't expire connected router
}
// make router reachable back if connected now
if (it.second->IsUnreachable () && i2p::transport::transports.IsConnected (it.second->GetIdentHash ()))
it.second->SetUnreachable (false);
if (it.second->IsUnreachable ())
{
@ -823,19 +824,33 @@ namespace data @@ -823,19 +824,33 @@ namespace data
offset += 4;
if (replyToken != 0xFFFFFFFFU) // if not caught on OBEP or IBGW
{
IdentHash replyIdent(buf + offset);
auto deliveryStatus = CreateDeliveryStatusMsg (replyToken);
if (!tunnelID) // send response directly
transports.SendMessage (buf + offset, deliveryStatus);
transports.SendMessage (replyIdent, deliveryStatus);
else
{
bool direct = true;
if (!i2p::transport::transports.IsConnected (replyIdent))
{
auto r = FindRouter (replyIdent);
if (r && !r->IsReachableFrom (i2p::context.GetRouterInfo ()))
direct = false;
}
if (direct) // send response directly to IBGW
transports.SendMessage (replyIdent, i2p::CreateTunnelGatewayMsg (tunnelID, deliveryStatus));
else
{
// send response through exploratory tunnel
auto pool = i2p::tunnel::tunnels.GetExploratoryPool ();
auto outbound = pool ? pool->GetNextOutboundTunnel () : nullptr;
if (outbound)
outbound->SendTunnelDataMsgTo (buf + offset, tunnelID, deliveryStatus);
outbound->SendTunnelDataMsgTo (replyIdent, tunnelID, deliveryStatus);
else
LogPrint (eLogWarning, "NetDb: No outbound tunnels for DatabaseStore reply found");
}
}
}
offset += 32;
}
// we must send reply back before this check
@ -956,8 +971,10 @@ namespace data @@ -956,8 +971,10 @@ namespace data
LogPrint (eLogDebug, "NetDb: Found new/outdated router. Requesting RouterInfo...");
if(m_FloodfillBootstrap)
RequestDestinationFrom(router, m_FloodfillBootstrap->GetIdentHash(), true);
else
else if (!IsRouterBanned (router))
RequestDestination (router);
else
LogPrint (eLogDebug, "NetDb: Router ", peerHash, " is banned. Skipped");
}
else
LogPrint (eLogDebug, "NetDb: [:|||:]");
@ -1111,12 +1128,24 @@ namespace data @@ -1111,12 +1128,24 @@ namespace data
else
LogPrint(eLogWarning, "NetDb: Encrypted reply requested but no tags provided");
}
bool direct = true;
if (!i2p::transport::transports.IsConnected (replyIdent))
{
auto r = FindRouter (replyIdent);
if (r && !r->IsReachableFrom (i2p::context.GetRouterInfo ()))
direct = false;
}
if (direct)
transports.SendMessage (replyIdent, i2p::CreateTunnelGatewayMsg (replyTunnelID, replyMsg));
else
{
auto exploratoryPool = i2p::tunnel::tunnels.GetExploratoryPool ();
auto outbound = exploratoryPool ? exploratoryPool->GetNextOutboundTunnel () : nullptr;
if (outbound)
outbound->SendTunnelDataMsgTo (replyIdent, replyTunnelID, replyMsg);
else
transports.SendMessage (replyIdent, i2p::CreateTunnelGatewayMsg (replyTunnelID, replyMsg));
LogPrint (eLogWarning, "NetDb: Can't send lookup reply to ", replyIdent.ToBase64 (), ". Non reachable and no outbound tunnels");
}
}
else
transports.SendMessage (replyIdent, replyMsg);

11
libi2pd/Profiling.cpp

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2023, The PurpleI2P Project
* Copyright (c) 2013-2024, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
@ -245,6 +245,15 @@ namespace data @@ -245,6 +245,15 @@ namespace data
return profile;
}
bool IsRouterBanned (const IdentHash& identHash)
{
std::unique_lock<std::mutex> l(g_ProfilesMutex);
auto it = g_Profiles.find (identHash);
if (it != g_Profiles.end ())
return it->second->IsUnreachable ();
return false;
}
void InitProfilesStorage ()
{
g_ProfilesStorage.SetPlace(i2p::fs::GetDataDir());

1
libi2pd/Profiling.h

@ -87,6 +87,7 @@ namespace data @@ -87,6 +87,7 @@ namespace data
};
std::shared_ptr<RouterProfile> GetRouterProfile (const IdentHash& identHash);
bool IsRouterBanned (const IdentHash& identHash); // check only existing profiles
void InitProfilesStorage ();
void DeleteObsoleteProfiles ();
void SaveProfiles ();

8
libi2pd/RouterContext.cpp

@ -40,7 +40,7 @@ namespace i2p @@ -40,7 +40,7 @@ namespace i2p
void RouterContext::Init ()
{
srand (i2p::util::GetMillisecondsSinceEpoch () % 1000);
m_StartupTime = std::chrono::steady_clock::now();
m_StartupTime = i2p::util::GetMonotonicSeconds ();
if (!Load ())
CreateNewRouter ();
@ -1152,11 +1152,11 @@ namespace i2p @@ -1152,11 +1152,11 @@ namespace i2p
bool RouterContext::HandleCloveI2NPMessage (I2NPMessageType typeID, const uint8_t * payload, size_t len, uint32_t msgID)
{
if (typeID == eI2NPDeliveryStatus)
if (typeID == eI2NPTunnelTest)
{
// try tunnel test
auto pool = GetTunnelPool ();
if (pool && pool->ProcessDeliveryStatus (bufbe32toh (payload + DELIVERY_STATUS_MSGID_OFFSET), bufbe64toh (payload + DELIVERY_STATUS_TIMESTAMP_OFFSET)))
if (pool && pool->ProcessTunnelTest (bufbe32toh (payload + TUNNEL_TEST_MSGID_OFFSET), bufbe64toh (payload + TUNNEL_TEST_TIMESTAMP_OFFSET)))
return true;
}
auto msg = CreateI2NPMessage (typeID, payload, len, msgID);
@ -1236,7 +1236,7 @@ namespace i2p @@ -1236,7 +1236,7 @@ namespace i2p
uint32_t RouterContext::GetUptime () const
{
return std::chrono::duration_cast<std::chrono::seconds> (std::chrono::steady_clock::now() - m_StartupTime).count ();
return i2p::util::GetMonotonicSeconds () - m_StartupTime;
}
bool RouterContext::Decrypt (const uint8_t * encrypted, uint8_t * data, i2p::data::CryptoKeyType preferredCrypto) const

3
libi2pd/RouterContext.h

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
#include <inttypes.h>
#include <string>
#include <memory>
#include <chrono>
#include <set>
#include <boost/asio.hpp>
#include "Identity.h"
@ -241,7 +240,7 @@ namespace garlic @@ -241,7 +240,7 @@ namespace garlic
std::shared_ptr<i2p::garlic::RouterIncomingRatchetSession> m_ECIESSession;
uint64_t m_LastUpdateTime; // in seconds
bool m_AcceptsTunnels, m_IsFloodfill;
std::chrono::time_point<std::chrono::steady_clock> m_StartupTime;
uint64_t m_StartupTime; // monotonic seconds
uint64_t m_BandwidthLimit; // allowed bandwidth
int m_ShareRatio;
RouterStatus m_Status, m_StatusV6;

50
libi2pd/SSU2.cpp

@ -256,8 +256,33 @@ namespace transport @@ -256,8 +256,33 @@ namespace transport
socket.open (localEndpoint.protocol ());
if (localEndpoint.address ().is_v6 ())
socket.set_option (boost::asio::ip::v6_only (true));
socket.set_option (boost::asio::socket_base::receive_buffer_size (SSU2_SOCKET_RECEIVE_BUFFER_SIZE));
socket.set_option (boost::asio::socket_base::send_buffer_size (SSU2_SOCKET_SEND_BUFFER_SIZE));
uint64_t bufferSize = i2p::context.GetBandwidthLimit() * 1024 / 5; // max lag = 200ms
bufferSize = std::max(SSU2_SOCKET_MIN_BUFFER_SIZE, std::min(bufferSize, SSU2_SOCKET_MAX_BUFFER_SIZE));
boost::asio::socket_base::receive_buffer_size receiveBufferSizeSet (bufferSize);
boost::asio::socket_base::send_buffer_size sendBufferSizeSet (bufferSize);
socket.set_option (receiveBufferSizeSet);
socket.set_option (sendBufferSizeSet);
boost::asio::socket_base::receive_buffer_size receiveBufferSizeGet;
boost::asio::socket_base::send_buffer_size sendBufferSizeGet;
socket.get_option (receiveBufferSizeGet);
socket.get_option (sendBufferSizeGet);
if (receiveBufferSizeGet.value () != receiveBufferSizeSet.value () ||
sendBufferSizeGet.value () != sendBufferSizeSet.value ())
{
LogPrint (eLogWarning, "SSU2: Socket receive buffer size: requested = ",
receiveBufferSizeSet.value (), ", got = ", receiveBufferSizeGet.value ());
LogPrint (eLogWarning, "SSU2: Socket send buffer size: requested = ",
sendBufferSizeSet.value (), ", got = ", sendBufferSizeGet.value ());
}
else
{
LogPrint (eLogInfo, "SSU2: Socket receive buffer size: ", receiveBufferSizeGet.value ());
LogPrint (eLogInfo, "SSU2: Socket send buffer size: ", sendBufferSizeGet.value ());
}
socket.non_blocking (true);
}
catch (std::exception& ex )
{
@ -469,7 +494,7 @@ namespace transport @@ -469,7 +494,7 @@ namespace transport
m_PendingOutgoingSessions.erase (ep);
}
std::shared_ptr<SSU2Session> SSU2Server::GetRandomSession (
std::shared_ptr<SSU2Session> SSU2Server::GetRandomPeerTestSession (
i2p::data::RouterInfo::CompatibleTransports remoteTransports, const i2p::data::IdentHash& excluded) const
{
if (m_Sessions.empty ()) return nullptr;
@ -480,7 +505,7 @@ namespace transport @@ -480,7 +505,7 @@ namespace transport
std::advance (it, ind);
while (it != m_Sessions.end ())
{
if ((it->second->GetRemoteTransports () & remoteTransports) &&
if ((it->second->GetRemotePeerTestTransports () & remoteTransports) &&
it->second->GetRemoteIdentity ()->GetIdentHash () != excluded)
return it->second;
it++;
@ -489,7 +514,7 @@ namespace transport @@ -489,7 +514,7 @@ namespace transport
it = m_Sessions.begin ();
while (it != m_Sessions.end () && ind)
{
if ((it->second->GetRemoteTransports () & remoteTransports) &&
if ((it->second->GetRemotePeerTestTransports () & remoteTransports) &&
it->second->GetRemoteIdentity ()->GetIdentHash () != excluded)
return it->second;
it++; ind--;
@ -637,7 +662,10 @@ namespace transport @@ -637,7 +662,10 @@ namespace transport
if (!ec)
i2p::transport::transports.UpdateSentBytes (headerLen + payloadLen);
else
LogPrint (eLogError, "SSU2: Send exception: ", ec.message (), " to ", to);
{
LogPrint (ec == boost::asio::error::would_block ? eLogInfo : eLogError,
"SSU2: Send exception: ", ec.message (), " to ", to);
}
}
void SSU2Server::Send (const uint8_t * header, size_t headerLen, const uint8_t * headerX, size_t headerXLen,
@ -671,7 +699,10 @@ namespace transport @@ -671,7 +699,10 @@ namespace transport
if (!ec)
i2p::transport::transports.UpdateSentBytes (headerLen + headerXLen + payloadLen);
else
LogPrint (eLogError, "SSU2: Send exception: ", ec.message (), " to ", to);
{
LogPrint (ec == boost::asio::error::would_block ? eLogInfo : eLogError,
"SSU2: Send exception: ", ec.message (), " to ", to);
}
}
bool SSU2Server::CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router,
@ -825,8 +856,11 @@ namespace transport @@ -825,8 +856,11 @@ namespace transport
auto it = m_SessionsByRouterHash.find (router->GetIdentHash ());
if (it != m_SessionsByRouterHash.end ())
{
auto remoteAddr = it->second->GetAddress ();
if (!remoteAddr || !remoteAddr->IsPeerTesting () ||
(v4 && !remoteAddr->IsV4 ()) || (!v4 && !remoteAddr->IsV6 ())) return false;
auto s = it->second;
if (it->second->IsEstablished ())
if (s->IsEstablished ())
GetService ().post ([s]() { s->SendPeerTest (); });
else
s->SetOnEstablished ([s]() { s->SendPeerTest (); });

6
libi2pd/SSU2.h

@ -25,8 +25,8 @@ namespace transport @@ -25,8 +25,8 @@ namespace transport
const int SSU2_RESEND_CHECK_TIMEOUT_VARIANCE = 100; // in milliseconds
const int SSU2_RESEND_CHECK_MORE_TIMEOUT = 10; // in milliseconds
const size_t SSU2_MAX_RESEND_PACKETS = 128; // packets to resend at the time
const size_t SSU2_SOCKET_RECEIVE_BUFFER_SIZE = 0x1FFFF; // 128K
const size_t SSU2_SOCKET_SEND_BUFFER_SIZE = 0x1FFFF; // 128K
const uint64_t SSU2_SOCKET_MIN_BUFFER_SIZE = 128 * 1024;
const uint64_t SSU2_SOCKET_MAX_BUFFER_SIZE = 4 * 1024 * 1024;
const size_t SSU2_MAX_NUM_INTRODUCERS = 3;
const size_t SSU2_MIN_RECEIVED_PACKET_SIZE = 40; // 16 byte short header + 8 byte minimum payload + 16 byte MAC
const int SSU2_TO_INTRODUCER_SESSION_DURATION = 3600; // 1 hour
@ -77,7 +77,7 @@ namespace transport @@ -77,7 +77,7 @@ namespace transport
void RemovePendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep);
std::shared_ptr<SSU2Session> FindSession (const i2p::data::IdentHash& ident) const;
std::shared_ptr<SSU2Session> FindPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep) const;
std::shared_ptr<SSU2Session> GetRandomSession (i2p::data::RouterInfo::CompatibleTransports remoteTransports,
std::shared_ptr<SSU2Session> GetRandomPeerTestSession (i2p::data::RouterInfo::CompatibleTransports remoteTransports,
const i2p::data::IdentHash& excluded) const;
void AddRelay (uint32_t tag, std::shared_ptr<SSU2Session> relay);

15
libi2pd/SSU2Session.cpp

@ -81,7 +81,7 @@ namespace transport @@ -81,7 +81,7 @@ namespace transport
SSU2Session::SSU2Session (SSU2Server& server, std::shared_ptr<const i2p::data::RouterInfo> in_RemoteRouter,
std::shared_ptr<const i2p::data::RouterInfo::Address> addr):
TransportSession (in_RemoteRouter, SSU2_CONNECT_TIMEOUT),
m_Server (server), m_Address (addr), m_RemoteTransports (0),
m_Server (server), m_Address (addr), m_RemoteTransports (0), m_RemotePeerTestTransports (0),
m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown),
m_SendPacketNum (0), m_ReceivePacketNum (0), m_LastDatetimeSentPacketNum (0),
m_IsDataReceived (false), m_WindowSize (SSU2_MIN_WINDOW_SIZE),
@ -96,6 +96,8 @@ namespace transport @@ -96,6 +96,8 @@ namespace transport
InitNoiseXKState1 (*m_NoiseState, m_Address->s);
m_RemoteEndpoint = boost::asio::ip::udp::endpoint (m_Address->host, m_Address->port);
m_RemoteTransports = in_RemoteRouter->GetCompatibleTransports (false);
if (in_RemoteRouter->IsSSU2PeerTesting (true)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V4;
if (in_RemoteRouter->IsSSU2PeerTesting (false)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V6;
RAND_bytes ((uint8_t *)&m_DestConnID, 8);
RAND_bytes ((uint8_t *)&m_SourceConnID, 8);
}
@ -1110,6 +1112,10 @@ namespace transport @@ -1110,6 +1112,10 @@ namespace transport
AdjustMaxPayloadSize ();
m_Server.AddSessionByRouterHash (shared_from_this ()); // we know remote router now
m_RemoteTransports = ri->GetCompatibleTransports (false);
m_RemotePeerTestTransports = 0;
if (ri->IsSSU2PeerTesting (true)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V4;
if (ri->IsSSU2PeerTesting (false)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V6;
// handle other blocks
HandlePayload (decryptedPayload.data () + riSize + 3, decryptedPayload.size () - riSize - 3);
Established ();
@ -2109,7 +2115,7 @@ namespace transport @@ -2109,7 +2115,7 @@ namespace transport
{
case 1: // Bob from Alice
{
auto session = m_Server.GetRandomSession ((buf[12] == 6) ? i2p::data::RouterInfo::eSSU2V4 : i2p::data::RouterInfo::eSSU2V6,
auto session = m_Server.GetRandomPeerTestSession ((buf[12] == 6) ? i2p::data::RouterInfo::eSSU2V4 : i2p::data::RouterInfo::eSSU2V6,
GetRemoteIdentity ()->GetIdentHash ());
if (session) // session with Charlie
{
@ -2180,7 +2186,8 @@ namespace transport @@ -2180,7 +2186,8 @@ namespace transport
std::shared_ptr<const i2p::data::RouterInfo::Address> addr;
if (ExtractEndpoint (buf + offset + 10, asz, ep))
addr = r->GetSSU2Address (ep.address ().is_v4 ());
if (addr && m_Server.IsSupported (ep.address ()))
if (addr && m_Server.IsSupported (ep.address ()) &&
i2p::context.GetRouterInfo ().IsSSU2PeerTesting (ep.address ().is_v4 ()))
{
// send msg 5 to Alice
auto session = std::make_shared<SSU2Session> (m_Server, r, addr);
@ -2280,7 +2287,7 @@ namespace transport @@ -2280,7 +2287,7 @@ namespace transport
if (GetTestingState ())
{
SetTestingState (false);
if (GetRouterStatus () != eRouterStatusFirewalled)
if (GetRouterStatus () != eRouterStatusFirewalled && addr->IsPeerTesting ())
{
SetRouterStatus (eRouterStatusFirewalled);
if (m_Address->IsV4 ())

3
libi2pd/SSU2Session.h

@ -238,6 +238,7 @@ namespace transport @@ -238,6 +238,7 @@ namespace transport
void SetRemoteEndpoint (const boost::asio::ip::udp::endpoint& ep) { m_RemoteEndpoint = ep; };
const boost::asio::ip::udp::endpoint& GetRemoteEndpoint () const { return m_RemoteEndpoint; };
i2p::data::RouterInfo::CompatibleTransports GetRemoteTransports () const { return m_RemoteTransports; };
i2p::data::RouterInfo::CompatibleTransports GetRemotePeerTestTransports () const { return m_RemotePeerTestTransports; };
std::shared_ptr<const i2p::data::RouterInfo::Address> GetAddress () const { return m_Address; };
void SetOnEstablished (OnEstablished e) { m_OnEstablished = e; };
OnEstablished GetOnEstablished () const { return m_OnEstablished; };
@ -343,7 +344,7 @@ namespace transport @@ -343,7 +344,7 @@ namespace transport
std::unique_ptr<HandshakePacket> m_SentHandshakePacket; // SessionRequest, SessionCreated or SessionConfirmed
std::shared_ptr<const i2p::data::RouterInfo::Address> m_Address;
boost::asio::ip::udp::endpoint m_RemoteEndpoint;
i2p::data::RouterInfo::CompatibleTransports m_RemoteTransports; // for peer tests
i2p::data::RouterInfo::CompatibleTransports m_RemoteTransports, m_RemotePeerTestTransports;
uint64_t m_DestConnID, m_SourceConnID;
SSU2SessionState m_State;
uint8_t m_KeyDataSend[64], m_KeyDataReceive[64];

9
libi2pd/Streaming.cpp

@ -433,7 +433,10 @@ namespace stream @@ -433,7 +433,10 @@ namespace stream
LogPrint(eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime);
rtt = 1;
}
m_RTT = std::round ((m_RTT*seqn + rtt)/(seqn + 1.0));
if (seqn)
m_RTT = std::round (RTT_EWMA_ALPHA * m_RTT + (1.0 - RTT_EWMA_ALPHA) * rtt);
else
m_RTT = rtt;
m_RTO = m_RTT*1.5; // TODO: implement it better
LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime);
m_SentPackets.erase (it++);
@ -998,8 +1001,8 @@ namespace stream @@ -998,8 +1001,8 @@ namespace stream
m_RTO *= 2;
switch (m_NumResendAttempts)
{
case 1: // congesion avoidance
m_WindowSize >>= 1; // /2
case 1: // congestion avoidance
m_WindowSize -= (m_WindowSize + WINDOW_SIZE_DROP_FRACTION) / WINDOW_SIZE_DROP_FRACTION; // adjustment >= 1
if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE;
break;
case 2:

2
libi2pd/Streaming.h

@ -56,6 +56,8 @@ namespace stream @@ -56,6 +56,8 @@ namespace stream
const int WINDOW_SIZE = 6; // in messages
const int MIN_WINDOW_SIZE = 1;
const int MAX_WINDOW_SIZE = 128;
const int WINDOW_SIZE_DROP_FRACTION = 10; // 1/10
const double RTT_EWMA_ALPHA = 0.5;
const int INITIAL_RTT = 8000; // in milliseconds
const int INITIAL_RTO = 9000; // in milliseconds
const int MIN_SEND_ACK_TIMEOUT = 2; // in milliseconds

20
libi2pd/Timestamp.cpp

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2022, The PurpleI2P Project
* Copyright (c) 2013-2024, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
@ -232,6 +232,24 @@ namespace util @@ -232,6 +232,24 @@ namespace util
return GetLocalHoursSinceEpoch () + g_TimeOffset/3600;
}
uint64_t GetMonotonicMicroseconds()
{
return std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
}
uint64_t GetMonotonicMilliseconds()
{
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
}
uint64_t GetMonotonicSeconds ()
{
return std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
}
void GetCurrentDate (char * date)
{
GetDateString (GetSecondsSinceEpoch (), date);

6
libi2pd/Timestamp.h

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2022, The PurpleI2P Project
* Copyright (c) 2013-2024, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
@ -24,6 +24,10 @@ namespace util @@ -24,6 +24,10 @@ namespace util
uint32_t GetMinutesSinceEpoch ();
uint32_t GetHoursSinceEpoch ();
uint64_t GetMonotonicMicroseconds ();
uint64_t GetMonotonicMilliseconds ();
uint64_t GetMonotonicSeconds ();
void GetCurrentDate (char * date); // returns date as YYYYMMDD string, 9 bytes
void GetDateString (uint64_t timestamp, char * date); // timestamp is seconds since epoch, returns date as YYYYMMDD string, 9 bytes
void AdjustTimeOffset (int64_t offset); // in seconds from current

18
libi2pd/Transports.cpp

@ -460,9 +460,8 @@ namespace transport @@ -460,9 +460,8 @@ namespace transport
auto it = m_Peers.find (ident);
if (it == m_Peers.end ())
{
// check if not known as unreachable
auto profile = i2p::data::GetRouterProfile (ident);
if (profile && profile->IsUnreachable ()) return; // don't create peer to unreachable router
// check if not banned
if (i2p::data::IsRouterBanned (ident)) return; // don't create peer to unreachable router
// try to connect
bool connected = false;
try
@ -491,10 +490,9 @@ namespace transport @@ -491,10 +490,9 @@ namespace transport
{
if (sz < CHECK_PROFILE_NUM_DELAYED_MESSAGES && sz + msgs.size () >= CHECK_PROFILE_NUM_DELAYED_MESSAGES)
{
auto profile = i2p::data::GetRouterProfile (ident);
if (profile && profile->IsUnreachable ())
if (i2p::data::IsRouterBanned (ident))
{
LogPrint (eLogWarning, "Transports: Peer profile for ", ident.ToBase64 (), " reports unreachable. Dropped");
LogPrint (eLogWarning, "Transports: Router ", ident.ToBase64 (), " is banned. Peer dropped");
std::unique_lock<std::mutex> l(m_PeersMutex);
m_Peers.erase (it);
return;
@ -589,6 +587,14 @@ namespace transport @@ -589,6 +587,14 @@ namespace transport
m_Peers.erase (ident);
return false;
}
else if (i2p::data::IsRouterBanned (ident))
{
LogPrint (eLogWarning, "Transports: Router ", ident.ToBase64 (), " is banned. Peer dropped");
peer.Done ();
std::unique_lock<std::mutex> l(m_PeersMutex);
m_Peers.erase (ident);
return false;
}
else // otherwise request RI
{
LogPrint (eLogInfo, "Transports: RouterInfo for ", ident.ToBase64 (), " not found, requested");

6
libi2pd/Tunnel.cpp

@ -33,7 +33,7 @@ namespace tunnel @@ -33,7 +33,7 @@ namespace tunnel
TunnelBase (config->GetTunnelID (), config->GetNextTunnelID (), config->GetNextIdentHash ()),
m_Config (config), m_IsShortBuildMessage (false), m_Pool (nullptr),
m_State (eTunnelStatePending), m_FarEndTransports (i2p::data::RouterInfo::eAllTransports),
m_IsRecreated (false), m_Latency (0)
m_IsRecreated (false), m_Latency (UNKNOWN_LATENCY)
{
}
@ -198,10 +198,10 @@ namespace tunnel @@ -198,10 +198,10 @@ namespace tunnel
return established;
}
bool Tunnel::LatencyFitsRange(uint64_t lower, uint64_t upper) const
bool Tunnel::LatencyFitsRange(int lowerbound, int upperbound) const
{
auto latency = GetMeanLatency();
return latency >= lower && latency <= upper;
return latency >= lowerbound && latency <= upperbound;
}
void Tunnel::EncryptTunnelMsg (std::shared_ptr<const I2NPMessage> in, std::shared_ptr<I2NPMessage> out)

15
libi2pd/Tunnel.h

@ -39,7 +39,8 @@ namespace tunnel @@ -39,7 +39,8 @@ namespace tunnel
const int TUNNEL_CREATION_TIMEOUT = 30; // 30 seconds
const int STANDARD_NUM_RECORDS = 4; // in VariableTunnelBuild message
const int MAX_NUM_RECORDS = 8;
const int HIGH_LATENCY_PER_HOP = 250; // in milliseconds
const int UNKNOWN_LATENCY = -1;
const int HIGH_LATENCY_PER_HOP = 250000; // in microseconds
const int MAX_TUNNEL_MSGS_BATCH_SIZE = 100; // handle messages without interrupt
const uint16_t DEFAULT_MAX_NUM_TRANSIT_TUNNELS = 5000;
const int TUNNEL_MANAGE_INTERVAL = 15; // in seconds
@ -108,14 +109,14 @@ namespace tunnel @@ -108,14 +109,14 @@ namespace tunnel
void EncryptTunnelMsg (std::shared_ptr<const I2NPMessage> in, std::shared_ptr<I2NPMessage> out) override;
/** @brief add latency sample */
void AddLatencySample(const uint64_t ms) { m_Latency = (m_Latency + ms) >> 1; }
void AddLatencySample(const int us) { m_Latency = LatencyIsKnown() ? (m_Latency + us) >> 1 : us; }
/** @brief get this tunnel's estimated latency */
uint64_t GetMeanLatency() const { return m_Latency; }
int GetMeanLatency() const { return (m_Latency + 500) / 1000; }
/** @brief return true if this tunnel's latency fits in range [lowerbound, upperbound] */
bool LatencyFitsRange(uint64_t lowerbound, uint64_t upperbound) const;
bool LatencyFitsRange(int lowerbound, int upperbound) const;
bool LatencyIsKnown() const { return m_Latency > 0; }
bool IsSlow () const { return LatencyIsKnown() && (int)m_Latency > HIGH_LATENCY_PER_HOP*GetNumHops (); }
bool LatencyIsKnown() const { return m_Latency != UNKNOWN_LATENCY; }
bool IsSlow () const { return LatencyIsKnown() && m_Latency > HIGH_LATENCY_PER_HOP*GetNumHops (); }
/** visit all hops we currently store */
void VisitTunnelHops(TunnelHopVisitor v);
@ -129,7 +130,7 @@ namespace tunnel @@ -129,7 +130,7 @@ namespace tunnel
TunnelState m_State;
i2p::data::RouterInfo::CompatibleTransports m_FarEndTransports;
bool m_IsRecreated; // if tunnel is replaced by new, or new tunnel requested to replace
uint64_t m_Latency; // in milliseconds
int m_Latency; // in microseconds
};
class OutboundTunnel: public Tunnel

69
libi2pd/TunnelPool.cpp

@ -102,8 +102,11 @@ namespace tunnel @@ -102,8 +102,11 @@ namespace tunnel
it->SetTunnelPool (nullptr);
m_OutboundTunnels.clear ();
}
{
std::unique_lock<std::mutex> l(m_TestsMutex);
m_Tests.clear ();
}
}
bool TunnelPool::Reconfigure(int inHops, int outHops, int inQuant, int outQuant)
{
@ -145,8 +148,11 @@ namespace tunnel @@ -145,8 +148,11 @@ namespace tunnel
if (expiredTunnel)
{
expiredTunnel->SetTunnelPool (nullptr);
{
std::unique_lock<std::mutex> l(m_TestsMutex);
for (auto& it: m_Tests)
if (it.second.second == expiredTunnel) it.second.second = nullptr;
}
std::unique_lock<std::mutex> l(m_InboundTunnelsMutex);
m_InboundTunnels.erase (expiredTunnel);
@ -167,8 +173,11 @@ namespace tunnel @@ -167,8 +173,11 @@ namespace tunnel
if (expiredTunnel)
{
expiredTunnel->SetTunnelPool (nullptr);
{
std::unique_lock<std::mutex> l(m_TestsMutex);
for (auto& it: m_Tests)
if (it.second.first == expiredTunnel) it.second.first = nullptr;
}
std::unique_lock<std::mutex> l(m_OutboundTunnelsMutex);
m_OutboundTunnels.erase (expiredTunnel);
@ -342,7 +351,7 @@ namespace tunnel @@ -342,7 +351,7 @@ namespace tunnel
else
it.second.first->SetState (eTunnelStateTestFailed);
}
else
else if (it.second.first->GetState () != eTunnelStateExpiring)
it.second.first->SetState (eTunnelStateTestFailed);
}
if (it.second.second)
@ -360,18 +369,19 @@ namespace tunnel @@ -360,18 +369,19 @@ namespace tunnel
if (m_LocalDestination)
m_LocalDestination->SetLeaseSetUpdated ();
}
else
else if (it.second.second->GetState () != eTunnelStateExpiring)
it.second.second->SetState (eTunnelStateTestFailed);
}
}
// new tests
if (!m_LocalDestination) return;
std::vector<std::pair<std::shared_ptr<OutboundTunnel>, std::shared_ptr<InboundTunnel> > > newTests;
std::vector<std::shared_ptr<OutboundTunnel> > outboundTunnels;
{
std::unique_lock<std::mutex> l(m_OutboundTunnelsMutex);
for (auto& it: m_OutboundTunnels)
if (it->IsEstablished () || it->GetState () == eTunnelStateTestFailed)
if (it->IsEstablished ())
outboundTunnels.push_back (it);
}
std::shuffle (outboundTunnels.begin(), outboundTunnels.end(), m_Rng);
@ -379,7 +389,7 @@ namespace tunnel @@ -379,7 +389,7 @@ namespace tunnel
{
std::unique_lock<std::mutex> l(m_InboundTunnelsMutex);
for (auto& it: m_InboundTunnels)
if (it->IsEstablished () || it->GetState () == eTunnelStateTestFailed)
if (it->IsEstablished ())
inboundTunnels.push_back (it);
}
std::shuffle (inboundTunnels.begin(), inboundTunnels.end(), m_Rng);
@ -390,7 +400,7 @@ namespace tunnel @@ -390,7 +400,7 @@ namespace tunnel
newTests.push_back(std::make_pair (*it1, *it2));
++it1; ++it2;
}
bool encrypt = m_LocalDestination ? m_LocalDestination->SupportsEncryptionType (i2p::data::CRYPTO_KEY_TYPE_ECIES_X25519_AEAD) : false;
bool isECIES = m_LocalDestination->SupportsEncryptionType (i2p::data::CRYPTO_KEY_TYPE_ECIES_X25519_AEAD);
for (auto& it: newTests)
{
uint32_t msgID;
@ -399,7 +409,7 @@ namespace tunnel @@ -399,7 +409,7 @@ namespace tunnel
std::unique_lock<std::mutex> l(m_TestsMutex);
m_Tests[msgID] = it;
}
auto msg = CreateDeliveryStatusMsg (msgID);
auto msg = CreateTunnelTestMsg (msgID);
auto outbound = it.first;
auto s = shared_from_this ();
msg->onDrop = [msgID, outbound, s]()
@ -415,14 +425,22 @@ namespace tunnel @@ -415,14 +425,22 @@ namespace tunnel
s->m_OutboundTunnels.erase (outbound);
}
};
if (encrypt)
{
// encrypt
if (isECIES)
{
uint8_t key[32]; RAND_bytes (key, 32);
uint64_t tag; RAND_bytes ((uint8_t *)&tag, 8);
m_LocalDestination->SubmitECIESx25519Key (key, tag);
msg = i2p::garlic::WrapECIESX25519Message (msg, key, tag);
}
else
{
uint8_t key[32], tag[32];
RAND_bytes (key, 32); RAND_bytes (tag, 32);
m_LocalDestination->SubmitSessionKey (key, tag);
i2p::garlic::ElGamalAESSession garlic (key, tag);
msg = garlic.WrapSingleMessage (msg);
}
outbound->SendTunnelDataMsgTo (it.second->GetNextIdentHash (), it.second->GetNextTunnelID (), msg);
}
}
@ -446,22 +464,24 @@ namespace tunnel @@ -446,22 +464,24 @@ namespace tunnel
}
void TunnelPool::ProcessDeliveryStatus (std::shared_ptr<I2NPMessage> msg)
{
const uint8_t * buf = msg->GetPayload ();
uint32_t msgID = bufbe32toh (buf);
buf += 4;
uint64_t timestamp = bufbe64toh (buf);
if (!ProcessDeliveryStatus (msgID, timestamp))
{
if (m_LocalDestination)
m_LocalDestination->ProcessDeliveryStatusMessage (msg);
else
LogPrint (eLogWarning, "Tunnels: Local destination doesn't exist, dropped");
}
void TunnelPool::ProcessTunnelTest (std::shared_ptr<I2NPMessage> msg)
{
const uint8_t * buf = msg->GetPayload ();
uint32_t msgID = bufbe32toh (buf);
buf += 4;
uint64_t timestamp = bufbe64toh (buf);
ProcessTunnelTest (msgID, timestamp);
}
bool TunnelPool::ProcessDeliveryStatus (uint32_t msgID, uint64_t timestamp)
bool TunnelPool::ProcessTunnelTest (uint32_t msgID, uint64_t timestamp)
{
decltype(m_Tests)::mapped_type test;
bool found = false;
@ -477,8 +497,9 @@ namespace tunnel @@ -477,8 +497,9 @@ namespace tunnel
}
if (found)
{
uint64_t dlt = i2p::util::GetMillisecondsSinceEpoch () - timestamp;
LogPrint (eLogDebug, "Tunnels: Test of ", msgID, " successful. ", dlt, " milliseconds");
int dlt = (uint64_t)i2p::util::GetMonotonicMicroseconds () - (int64_t)timestamp;
LogPrint (eLogDebug, "Tunnels: Test of ", msgID, " successful. ", dlt, " microseconds");
if (dlt < 0) dlt = 0; // should not happen
int numHops = 0;
if (test.first) numHops += test.first->GetNumHops ();
if (test.second) numHops += test.second->GetNumHops ();
@ -488,20 +509,20 @@ namespace tunnel @@ -488,20 +509,20 @@ namespace tunnel
if (test.first->GetState () != eTunnelStateExpiring)
test.first->SetState (eTunnelStateEstablished);
// update latency
uint64_t latency = 0;
int latency = 0;
if (numHops) latency = dlt*test.first->GetNumHops ()/numHops;
if (!latency) latency = dlt/2;
test.first->AddLatencySample(latency);
test.first->AddLatencySample (latency);
}
if (test.second)
{
if (test.second->GetState () != eTunnelStateExpiring)
test.second->SetState (eTunnelStateEstablished);
// update latency
uint64_t latency = 0;
int latency = 0;
if (numHops) latency = dlt*test.second->GetNumHops ()/numHops;
if (!latency) latency = dlt/2;
test.second->AddLatencySample(latency);
test.second->AddLatencySample (latency);
}
}
return found;
@ -823,7 +844,7 @@ namespace tunnel @@ -823,7 +844,7 @@ namespace tunnel
{
std::shared_ptr<InboundTunnel> tun = nullptr;
std::unique_lock<std::mutex> lock(m_InboundTunnelsMutex);
uint64_t min = 1000000;
int min = 1000000;
for (const auto & itr : m_InboundTunnels) {
if(!itr->LatencyIsKnown()) continue;
auto l = itr->GetMeanLatency();
@ -839,7 +860,7 @@ namespace tunnel @@ -839,7 +860,7 @@ namespace tunnel
{
std::shared_ptr<OutboundTunnel> tun = nullptr;
std::unique_lock<std::mutex> lock(m_OutboundTunnelsMutex);
uint64_t min = 1000000;
int min = 1000000;
for (const auto & itr : m_OutboundTunnels) {
if(!itr->LatencyIsKnown()) continue;
auto l = itr->GetMeanLatency();

9
libi2pd/TunnelPool.h

@ -85,7 +85,8 @@ namespace tunnel @@ -85,7 +85,8 @@ namespace tunnel
void ManageTunnels (uint64_t ts);
void ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg);
void ProcessDeliveryStatus (std::shared_ptr<I2NPMessage> msg);
bool ProcessDeliveryStatus (uint32_t msgID, uint64_t timestamp);
void ProcessTunnelTest (std::shared_ptr<I2NPMessage> msg);
bool ProcessTunnelTest (uint32_t msgID, uint64_t timestamp);
bool IsExploratory () const;
bool IsActive () const { return m_IsActive; };
@ -105,7 +106,7 @@ namespace tunnel @@ -105,7 +106,7 @@ namespace tunnel
bool HasCustomPeerSelector();
/** @brief make this tunnel pool yield tunnels that fit latency range [min, max] */
void RequireLatency(uint64_t min, uint64_t max) { m_MinLatency = min; m_MaxLatency = max; }
void RequireLatency(int min, int max) { m_MinLatency = min; m_MaxLatency = max; }
/** @brief return true if this tunnel pool has a latency requirement */
bool HasLatencyRequirement() const { return m_MinLatency > 0 && m_MaxLatency > 0; }
@ -150,8 +151,8 @@ namespace tunnel @@ -150,8 +151,8 @@ namespace tunnel
std::mutex m_CustomPeerSelectorMutex;
ITunnelPeerSelector * m_CustomPeerSelector;
uint64_t m_MinLatency = 0; // if > 0 this tunnel pool will try building tunnels with minimum latency by ms
uint64_t m_MaxLatency = 0; // if > 0 this tunnel pool will try building tunnels with maximum latency by ms
int m_MinLatency = 0; // if > 0 this tunnel pool will try building tunnels with minimum latency by ms
int m_MaxLatency = 0; // if > 0 this tunnel pool will try building tunnels with maximum latency by ms
std::random_device m_Rd;
std::mt19937 m_Rng;

Loading…
Cancel
Save