Browse Source

use share_ptr to store peers

pull/1941/merge
orignal 8 months ago
parent
commit
8fe989050e
  1. 148
      libi2pd/Transports.cpp
  2. 6
      libi2pd/Transports.h

148
libi2pd/Transports.cpp

@ -457,6 +457,7 @@ namespace transport @@ -457,6 +457,7 @@ namespace transport
return;
}
if(RoutesRestricted() && !IsRestrictedPeer(ident)) return;
std::shared_ptr<Peer> peer;
auto it = m_Peers.find (ident);
if (it == m_Peers.end ())
{
@ -470,10 +471,12 @@ namespace transport @@ -470,10 +471,12 @@ namespace transport
if (r && (r->IsUnreachable () || !r->IsReachableFrom (i2p::context.GetRouterInfo ()))) return; // router found but non-reachable
{
auto ts = i2p::util::GetSecondsSinceEpoch ();
peer = std::make_shared<Peer>(r, ts);
std::unique_lock<std::mutex> l(m_PeersMutex);
it = m_Peers.insert (std::pair<i2p::data::IdentHash, Peer>(ident, {r, ts})).first;
peer = m_Peers.emplace (ident, peer).first->second;
}
connected = ConnectToPeer (ident, it->second);
if (peer)
connected = ConnectToPeer (ident, peer);
}
catch (std::exception& ex)
{
@ -481,11 +484,15 @@ namespace transport @@ -481,11 +484,15 @@ namespace transport
}
if (!connected) return;
}
if (it->second.IsConnected ())
it->second.sessions.front ()->SendI2NPMessages (msgs);
else
peer = it->second;
if (!peer) return;
if (peer->IsConnected ())
peer->sessions.front ()->SendI2NPMessages (msgs);
else
{
auto sz = it->second.delayedMessages.size ();
auto sz = peer->delayedMessages.size ();
if (sz < MAX_NUM_DELAYED_MESSAGES)
{
if (sz < CHECK_PROFILE_NUM_DELAYED_MESSAGES && sz + msgs.size () >= CHECK_PROFILE_NUM_DELAYED_MESSAGES)
@ -494,7 +501,7 @@ namespace transport @@ -494,7 +501,7 @@ namespace transport
{
LogPrint (eLogWarning, "Transports: Router ", ident.ToBase64 (), " is banned. Peer dropped");
std::unique_lock<std::mutex> l(m_PeersMutex);
m_Peers.erase (it);
m_Peers.erase (ident);
return;
}
}
@ -502,30 +509,30 @@ namespace transport @@ -502,30 +509,30 @@ namespace transport
if (sz > MAX_NUM_DELAYED_MESSAGES/2 && it1->onDrop)
it1->Drop (); // drop earlier because we can handle it
else
it->second.delayedMessages.push_back (it1);
peer->delayedMessages.push_back (it1);
}
else
{
LogPrint (eLogWarning, "Transports: Delayed messages queue size to ",
ident.ToBase64 (), " exceeds ", MAX_NUM_DELAYED_MESSAGES);
std::unique_lock<std::mutex> l(m_PeersMutex);
m_Peers.erase (it);
m_Peers.erase (ident);
}
}
}
bool Transports::ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer)
bool Transports::ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr<Peer> peer)
{
if (!peer.router) // reconnect
peer.SetRouter (netdb.FindRouter (ident)); // try to get new one from netdb
if (peer.router) // we have RI already
if (!peer->router) // reconnect
peer->SetRouter (netdb.FindRouter (ident)); // try to get new one from netdb
if (peer->router) // we have RI already
{
if (peer.priority.empty ())
if (peer->priority.empty ())
SetPriority (peer);
while (peer.numAttempts < (int)peer.priority.size ())
while (peer->numAttempts < (int)peer->priority.size ())
{
auto tr = peer.priority[peer.numAttempts];
peer.numAttempts++;
auto tr = peer->priority[peer->numAttempts];
peer->numAttempts++;
switch (tr)
{
case i2p::data::RouterInfo::eNTCP2V4:
@ -533,12 +540,12 @@ namespace transport @@ -533,12 +540,12 @@ namespace transport
{
if (!m_NTCP2Server) continue;
std::shared_ptr<const RouterInfo::Address> address = (tr == i2p::data::RouterInfo::eNTCP2V6) ?
peer.router->GetPublishedNTCP2V6Address () : peer.router->GetPublishedNTCP2V4Address ();
peer->router->GetPublishedNTCP2V6Address () : peer->router->GetPublishedNTCP2V4Address ();
if (address && IsInReservedRange(address->host))
address = nullptr;
if (address)
{
auto s = std::make_shared<NTCP2Session> (*m_NTCP2Server, peer.router, address);
auto s = std::make_shared<NTCP2Session> (*m_NTCP2Server, peer->router, address);
if( m_NTCP2Server->UsingProxy())
m_NTCP2Server->ConnectWithProxy(s);
else
@ -552,12 +559,12 @@ namespace transport @@ -552,12 +559,12 @@ namespace transport
{
if (!m_SSU2Server) continue;
std::shared_ptr<const RouterInfo::Address> address = (tr == i2p::data::RouterInfo::eSSU2V6) ?
peer.router->GetSSU2V6Address () : peer.router->GetSSU2V4Address ();
peer->router->GetSSU2V6Address () : peer->router->GetSSU2V4Address ();
if (address && IsInReservedRange(address->host))
address = nullptr;
if (address && address->IsReachableSSU ())
{
if (m_SSU2Server->CreateSession (peer.router, address))
if (m_SSU2Server->CreateSession (peer->router, address))
return true;
}
break;
@ -565,10 +572,10 @@ namespace transport @@ -565,10 +572,10 @@ namespace transport
case i2p::data::RouterInfo::eNTCP2V6Mesh:
{
if (!m_NTCP2Server) continue;
auto address = peer.router->GetYggdrasilAddress ();
auto address = peer->router->GetYggdrasilAddress ();
if (address)
{
auto s = std::make_shared<NTCP2Session> (*m_NTCP2Server, peer.router, address);
auto s = std::make_shared<NTCP2Session> (*m_NTCP2Server, peer->router, address);
m_NTCP2Server->Connect (s);
return true;
}
@ -580,9 +587,9 @@ namespace transport @@ -580,9 +587,9 @@ namespace transport
}
LogPrint (eLogInfo, "Transports: No compatible addresses available");
if (peer.router->IsReachableFrom (i2p::context.GetRouterInfo ()))
if (peer->router->IsReachableFrom (i2p::context.GetRouterInfo ()))
i2p::data::netdb.SetUnreachable (ident, true); // we are here because all connection attempts failed but router claimed them
peer.Done ();
peer->Done ();
std::unique_lock<std::mutex> l(m_PeersMutex);
m_Peers.erase (ident);
return false;
@ -590,7 +597,7 @@ namespace transport @@ -590,7 +597,7 @@ namespace transport
else if (i2p::data::IsRouterBanned (ident))
{
LogPrint (eLogWarning, "Transports: Router ", ident.ToBase64 (), " is banned. Peer dropped");
peer.Done ();
peer->Done ();
std::unique_lock<std::mutex> l(m_PeersMutex);
m_Peers.erase (ident);
return false;
@ -604,7 +611,7 @@ namespace transport @@ -604,7 +611,7 @@ namespace transport
return true;
}
void Transports::SetPriority (Peer& peer) const
void Transports::SetPriority (std::shared_ptr<Peer> peer) const
{
static const std::vector<i2p::data::RouterInfo::SupportedTransports>
ntcp2Priority =
@ -623,13 +630,13 @@ namespace transport @@ -623,13 +630,13 @@ namespace transport
i2p::data::RouterInfo::eNTCP2V4,
i2p::data::RouterInfo::eNTCP2V6Mesh
};
if (!peer.router) return;
if (!peer || !peer->router) return;
auto compatibleTransports = context.GetRouterInfo ().GetCompatibleTransports (false) &
peer.router->GetCompatibleTransports (true);
auto directTransports = compatibleTransports & peer.router->GetPublishedTransports ();
peer.numAttempts = 0;
peer.priority.clear ();
bool isReal = peer.router->GetProfile ()->IsReal ();
peer->router->GetCompatibleTransports (true);
auto directTransports = compatibleTransports & peer->router->GetPublishedTransports ();
peer->numAttempts = 0;
peer->priority.clear ();
bool isReal = peer->router->GetProfile ()->IsReal ();
bool ssu2 = isReal ? (rand () & 1) : false; // try NTCP2 if router is not confirmed real
const auto& priority = ssu2 ? ssu2Priority : ntcp2Priority;
if (directTransports)
@ -643,7 +650,7 @@ namespace transport @@ -643,7 +650,7 @@ namespace transport
}
for (auto transport: priority)
if (transport & directTransports)
peer.priority.push_back (transport);
peer->priority.push_back (transport);
compatibleTransports &= ~directTransports;
}
if (compatibleTransports)
@ -651,7 +658,7 @@ namespace transport @@ -651,7 +658,7 @@ namespace transport
// then remaining
for (auto transport: priority)
if (transport & compatibleTransports)
peer.priority.push_back (transport);
peer->priority.push_back (transport);
}
}
@ -668,8 +675,8 @@ namespace transport @@ -668,8 +675,8 @@ namespace transport
if (r)
{
LogPrint (eLogDebug, "Transports: RouterInfo for ", ident.ToBase64 (), " found, trying to connect");
it->second.SetRouter (r);
if (!it->second.IsConnected ())
it->second->SetRouter (r);
if (!it->second->IsConnected ())
ConnectToPeer (ident, it->second);
}
else
@ -796,31 +803,32 @@ namespace transport @@ -796,31 +803,32 @@ namespace transport
auto it = m_Peers.find (ident);
if (it != m_Peers.end ())
{
if (it->second.numAttempts > 1)
auto peer = it->second;
if (peer->numAttempts > 1)
{
// exclude failed transports
i2p::data::RouterInfo::CompatibleTransports transports = 0;
int numExcluded = it->second.numAttempts - 1;
if (numExcluded > (int)it->second.priority.size ()) numExcluded = it->second.priority.size ();
int numExcluded = peer->numAttempts - 1;
if (numExcluded > (int)peer->priority.size ()) numExcluded = peer->priority.size ();
for (int i = 0; i < numExcluded; i++)
transports |= it->second.priority[i];
transports |= peer->priority[i];
i2p::data::netdb.ExcludeReachableTransports (ident, transports);
}
if (it->second.router && it->second.numAttempts)
if (peer->router && peer->numAttempts)
{
auto transport = it->second.priority[it->second.numAttempts-1];
auto transport = peer->priority[peer->numAttempts-1];
if (transport == i2p::data::RouterInfo::eNTCP2V4 ||
transport == i2p::data::RouterInfo::eNTCP2V6 || transport == i2p::data::RouterInfo::eNTCP2V6Mesh)
it->second.router->GetProfile ()->Connected (); // outgoing NTCP2 connection if always real
peer->router->GetProfile ()->Connected (); // outgoing NTCP2 connection if always real
i2p::data::netdb.SetUnreachable (ident, false); // clear unreachable
}
it->second.numAttempts = 0;
it->second.router = nullptr; // we don't need RouterInfo after successive connect
peer->numAttempts = 0;
peer->router = nullptr; // we don't need RouterInfo after successive connect
bool sendDatabaseStore = true;
if (it->second.delayedMessages.size () > 0)
if (it->second->delayedMessages.size () > 0)
{
// check if first message is our DatabaseStore (publishing)
auto firstMsg = it->second.delayedMessages[0];
auto firstMsg = peer->delayedMessages[0];
if (firstMsg && firstMsg->GetTypeID () == eI2NPDatabaseStore &&
i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ())
sendDatabaseStore = false; // we have it in the list already
@ -829,9 +837,9 @@ namespace transport @@ -829,9 +837,9 @@ namespace transport
session->SendLocalRouterInfo ();
else
session->SetTerminationTimeout (10); // most likely it's publishing, no follow-up messages expected, set timeout to 10 seconds
it->second.sessions.push_back (session);
session->SendI2NPMessages (it->second.delayedMessages);
it->second.delayedMessages.clear ();
peer->sessions.push_back (session);
session->SendI2NPMessages (peer->delayedMessages);
peer->delayedMessages.clear ();
}
else // incoming connection or peer test
{
@ -846,10 +854,11 @@ namespace transport @@ -846,10 +854,11 @@ namespace transport
auto r = i2p::data::netdb.FindRouter (ident); // router should be in netdb after SessionConfirmed
if (r) r->GetProfile ()->Connected ();
auto ts = i2p::util::GetSecondsSinceEpoch ();
auto peer = std::make_shared<Peer>(r, ts);
peer->sessions.push_back (session);
peer->router = nullptr;
std::unique_lock<std::mutex> l(m_PeersMutex);
auto it = m_Peers.insert (std::make_pair (ident, Peer{ r, ts })).first;
it->second.sessions.push_back (session);
it->second.router = nullptr;
m_Peers.emplace (ident, peer);
}
});
}
@ -864,15 +873,16 @@ namespace transport @@ -864,15 +873,16 @@ namespace transport
auto it = m_Peers.find (ident);
if (it != m_Peers.end ())
{
bool wasConnected = it->second.IsConnected ();
it->second.sessions.remove (session);
if (!it->second.IsConnected ())
auto peer = it->second;
bool wasConnected = peer->IsConnected ();
peer->sessions.remove (session);
if (!peer->IsConnected ())
{
if (it->second.delayedMessages.size () > 0)
if (peer->delayedMessages.size () > 0)
{
if (wasConnected) // we had an active session before
it->second.numAttempts = 0; // start over
ConnectToPeer (ident, it->second);
peer->numAttempts = 0; // start over
ConnectToPeer (ident, peer);
}
else
{
@ -898,12 +908,12 @@ namespace transport @@ -898,12 +908,12 @@ namespace transport
auto ts = i2p::util::GetSecondsSinceEpoch ();
for (auto it = m_Peers.begin (); it != m_Peers.end (); )
{
it->second.sessions.remove_if (
it->second->sessions.remove_if (
[](std::shared_ptr<TransportSession> session)->bool
{
return !session || !session->IsEstablished ();
});
if (!it->second.IsConnected () && ts > it->second.creationTime + SESSION_CREATION_TIMEOUT)
if (!it->second->IsConnected () && ts > it->second->creationTime + SESSION_CREATION_TIMEOUT)
{
LogPrint (eLogWarning, "Transports: Session to peer ", it->first.ToBase64 (), " has not been created in ", SESSION_CREATION_TIMEOUT, " seconds");
/* if (!it->second.router)
@ -917,12 +927,12 @@ namespace transport @@ -917,12 +927,12 @@ namespace transport
}
else
{
if (ts > it->second.nextRouterInfoUpdateTime)
if (ts > it->second->nextRouterInfoUpdateTime)
{
auto session = it->second.sessions.front ();
auto session = it->second->sessions.front ();
if (session)
session->SendLocalRouterInfo (true);
it->second.nextRouterInfoUpdateTime = ts + PEER_ROUTER_INFO_UPDATE_INTERVAL +
it->second->nextRouterInfoUpdateTime = ts + PEER_ROUTER_INFO_UPDATE_INTERVAL +
rand () % PEER_ROUTER_INFO_UPDATE_INTERVAL_VARIANCE;
}
++it;
@ -1042,13 +1052,13 @@ namespace transport @@ -1042,13 +1052,13 @@ namespace transport
std::shared_ptr<const i2p::data::RouterInfo> Transports::GetRandomPeer (bool isHighBandwidth) const
{
return GetRandomPeer (
[isHighBandwidth](const Peer& peer)->bool
[isHighBandwidth](std::shared_ptr<const Peer> peer)->bool
{
// connected, not overloaded and not slow
return !peer.router && peer.IsConnected () && peer.isReachable &&
peer.sessions.front ()->GetSendQueueSize () <= PEER_ROUTER_INFO_OVERLOAD_QUEUE_SIZE &&
!peer.sessions.front ()->IsSlow () && !peer.sessions.front ()->IsBandwidthExceeded (peer.isHighBandwidth) &&
(!isHighBandwidth || peer.isHighBandwidth);
return !peer->router && peer->IsConnected () && peer->isReachable &&
peer->sessions.front ()->GetSendQueueSize () <= PEER_ROUTER_INFO_OVERLOAD_QUEUE_SIZE &&
!peer->sessions.front ()->IsSlow () && !peer->sessions.front ()->IsBandwidthExceeded (peer->isHighBandwidth) &&
(!isHighBandwidth || peer->isHighBandwidth);
});
}

6
libi2pd/Transports.h

@ -191,8 +191,8 @@ namespace transport @@ -191,8 +191,8 @@ namespace transport
void RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident);
void HandleRequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, i2p::data::IdentHash ident);
void PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs);
bool ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer);
void SetPriority (Peer& peer) const;
bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr<Peer> peer);
void SetPriority (std::shared_ptr<Peer> peer) const;
void HandlePeerCleanupTimer (const boost::system::error_code& ecode);
void HandlePeerTestTimer (const boost::system::error_code& ecode);
void HandleUpdateBandwidthTimer (const boost::system::error_code& ecode);
@ -215,7 +215,7 @@ namespace transport @@ -215,7 +215,7 @@ namespace transport
SSU2Server * m_SSU2Server;
NTCP2Server * m_NTCP2Server;
mutable std::mutex m_PeersMutex;
std::unordered_map<i2p::data::IdentHash, Peer> m_Peers;
std::unordered_map<i2p::data::IdentHash, std::shared_ptr<Peer> > m_Peers;
X25519KeysPairSupplier m_X25519KeysPairSupplier;

Loading…
Cancel
Save