Browse Source

use weak_ptr for Bob's peer tests and relay tags

gha
orignal 2 months ago
parent
commit
c3a1631319
  1. 13
      libi2pd/SSU2.cpp
  2. 3
      libi2pd/SSU2.h
  3. 16
      libi2pd/SSU2Session.cpp
  4. 2
      libi2pd/SSU2Session.h

13
libi2pd/SSU2.cpp

@ -370,7 +370,7 @@ namespace transport
{ {
std::vector<Packet *> packets; std::vector<Packet *> packets;
packets.push_back (packet); packets.push_back (packet);
while (moreBytes && packets.size () < 32) while (moreBytes && packets.size () < SSU2_MAX_NUM_PACKETS_PER_BATCH)
{ {
packet = m_PacketsPool.AcquireMt (); packet = m_PacketsPool.AcquireMt ();
packet->len = socket.receive_from (boost::asio::buffer (packet->buf, SSU2_MAX_PACKET_SIZE), packet->from, 0, ec); packet->len = socket.receive_from (boost::asio::buffer (packet->buf, SSU2_MAX_PACKET_SIZE), packet->from, 0, ec);
@ -599,9 +599,12 @@ namespace transport
auto it = m_Relays.find (tag); auto it = m_Relays.find (tag);
if (it != m_Relays.end ()) if (it != m_Relays.end ())
{ {
if (it->second->IsEstablished ()) if (!it->second.expired ())
return it->second; {
else auto s = it->second.lock ();
if (s && s->IsEstablished ())
return s;
}
m_Relays.erase (it); m_Relays.erase (it);
} }
return nullptr; return nullptr;
@ -1045,7 +1048,7 @@ namespace transport
auto ts = i2p::util::GetSecondsSinceEpoch (); auto ts = i2p::util::GetSecondsSinceEpoch ();
for (auto it = m_Relays.begin (); it != m_Relays.begin ();) for (auto it = m_Relays.begin (); it != m_Relays.begin ();)
{ {
if (it->second && it->second->GetState () == eSSU2SessionStateTerminated) if (it->second.expired ())
it = m_Relays.erase (it); it = m_Relays.erase (it);
else else
it++; it++;

3
libi2pd/SSU2.h

@ -40,6 +40,7 @@ namespace transport
const int SSU2_KEEP_ALIVE_INTERVAL_VARIANCE = 4; // in seconds const int SSU2_KEEP_ALIVE_INTERVAL_VARIANCE = 4; // in seconds
const int SSU2_PROXY_CONNECT_RETRY_TIMEOUT = 30; // in seconds const int SSU2_PROXY_CONNECT_RETRY_TIMEOUT = 30; // in seconds
const int SSU2_HOLE_PUNCH_EXPIRATION = 150; // in seconds const int SSU2_HOLE_PUNCH_EXPIRATION = 150; // in seconds
const size_t SSU2_MAX_NUM_PACKETS_PER_BATCH = 32;
class SSU2Server: private i2p::util::RunnableServiceWithWork class SSU2Server: private i2p::util::RunnableServiceWithWork
{ {
@ -168,7 +169,7 @@ namespace transport
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSU2Session> > m_PendingOutgoingSessions; std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSU2Session> > m_PendingOutgoingSessions;
mutable std::mutex m_PendingOutgoingSessionsMutex; mutable std::mutex m_PendingOutgoingSessionsMutex;
std::map<boost::asio::ip::udp::endpoint, std::pair<uint64_t, uint32_t> > m_IncomingTokens, m_OutgoingTokens; // remote endpoint -> (token, expires in seconds) std::map<boost::asio::ip::udp::endpoint, std::pair<uint64_t, uint32_t> > m_IncomingTokens, m_OutgoingTokens; // remote endpoint -> (token, expires in seconds)
std::unordered_map<uint32_t, std::shared_ptr<SSU2Session> > m_Relays; // we are introducer, relay tag -> session std::unordered_map<uint32_t, std::weak_ptr<SSU2Session> > m_Relays; // we are introducer, relay tag -> session
std::list<i2p::data::IdentHash> m_Introducers, m_IntroducersV6; // introducers we are connected to std::list<i2p::data::IdentHash> m_Introducers, m_IntroducersV6; // introducers we are connected to
i2p::util::MemoryPoolMt<Packet> m_PacketsPool; i2p::util::MemoryPoolMt<Packet> m_PacketsPool;
i2p::util::MemoryPool<SSU2SentPacket> m_SentPacketsPool; i2p::util::MemoryPool<SSU2SentPacket> m_SentPacketsPool;

16
libi2pd/SSU2Session.cpp

@ -2252,7 +2252,10 @@ namespace transport
case 3: // Bob from Charlie case 3: // Bob from Charlie
{ {
auto it = m_PeerTests.find (nonce); auto it = m_PeerTests.find (nonce);
if (it != m_PeerTests.end () && it->second.first) if (it != m_PeerTests.end ())
{
auto aliceSession = it->second.first.lock ();
if (aliceSession && aliceSession->IsEstablished ())
{ {
uint8_t payload[SSU2_MAX_PACKET_SIZE]; uint8_t payload[SSU2_MAX_PACKET_SIZE];
// Charlie's RouterInfo // Charlie's RouterInfo
@ -2260,11 +2263,11 @@ namespace transport
if (r && (r->IsUnreachable () || !i2p::data::netdb.PopulateRouterInfoBuffer (r))) r = nullptr; if (r && (r->IsUnreachable () || !i2p::data::netdb.PopulateRouterInfoBuffer (r))) r = nullptr;
size_t payloadSize = r ? CreateRouterInfoBlock (payload, m_MaxPayloadSize - len - 32, r) : 0; size_t payloadSize = r ? CreateRouterInfoBlock (payload, m_MaxPayloadSize - len - 32, r) : 0;
if (!payloadSize && r) if (!payloadSize && r)
it->second.first->SendFragmentedMessage (CreateDatabaseStoreMsg (r)); aliceSession->SendFragmentedMessage (CreateDatabaseStoreMsg (r));
if (payloadSize + len + 16 > m_MaxPayloadSize) if (payloadSize + len + 16 > m_MaxPayloadSize)
{ {
// doesn't fit one message, send RouterInfo in separate message // doesn't fit one message, send RouterInfo in separate message
it->second.first->SendData (payload, payloadSize); aliceSession->SendData (payload, payloadSize);
payloadSize = 0; payloadSize = 0;
} }
// PeerTest to Alice // PeerTest to Alice
@ -2272,7 +2275,8 @@ namespace transport
(SSU2PeerTestCode)buf[1], GetRemoteIdentity ()->GetIdentHash (), buf + offset, len - offset); (SSU2PeerTestCode)buf[1], GetRemoteIdentity ()->GetIdentHash (), buf + offset, len - offset);
if (payloadSize < m_MaxPayloadSize) if (payloadSize < m_MaxPayloadSize)
payloadSize += CreatePaddingBlock (payload + payloadSize, m_MaxPayloadSize - payloadSize); payloadSize += CreatePaddingBlock (payload + payloadSize, m_MaxPayloadSize - payloadSize);
it->second.first->SendData (payload, payloadSize); aliceSession->SendData (payload, payloadSize);
}
m_PeerTests.erase (it); m_PeerTests.erase (it);
} }
else else
@ -3028,9 +3032,9 @@ namespace transport
} }
for (auto it = m_PeerTests.begin (); it != m_PeerTests.end ();) for (auto it = m_PeerTests.begin (); it != m_PeerTests.end ();)
{ {
if (ts > it->second.second + SSU2_PEER_TEST_EXPIRATION_TIMEOUT) if (ts > it->second.second + SSU2_PEER_TEST_EXPIRATION_TIMEOUT || it->second.first.expired ())
{ {
LogPrint (eLogWarning, "SSU2: Peer test nonce ", it->first, " was not responded in ", SSU2_PEER_TEST_EXPIRATION_TIMEOUT, " seconds, deleted"); LogPrint (eLogWarning, "SSU2: Peer test nonce ", it->first, " was not responded in ", SSU2_PEER_TEST_EXPIRATION_TIMEOUT, " seconds or session invalid. Deleted");
it = m_PeerTests.erase (it); it = m_PeerTests.erase (it);
} }
else else

2
libi2pd/SSU2Session.h

@ -373,7 +373,7 @@ namespace transport
std::map<uint32_t, std::shared_ptr<SSU2SentPacket> > m_SentPackets; // packetNum -> packet std::map<uint32_t, std::shared_ptr<SSU2SentPacket> > m_SentPackets; // packetNum -> packet
std::unordered_map<uint32_t, std::shared_ptr<SSU2IncompleteMessage> > m_IncompleteMessages; // msgID -> I2NP std::unordered_map<uint32_t, std::shared_ptr<SSU2IncompleteMessage> > m_IncompleteMessages; // msgID -> I2NP
std::unordered_map<uint32_t, std::pair <std::shared_ptr<SSU2Session>, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice std::unordered_map<uint32_t, std::pair <std::shared_ptr<SSU2Session>, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice
std::unordered_map<uint32_t, std::pair <std::shared_ptr<SSU2Session>, uint64_t > > m_PeerTests; // same as for relay sessions std::unordered_map<uint32_t, std::pair <std::weak_ptr<SSU2Session>, uint64_t > > m_PeerTests; // nonce->(Alice, timestamp). We are Bob
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue; std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
i2p::I2NPMessagesHandler m_Handler; i2p::I2NPMessagesHandler m_Handler;
bool m_IsDataReceived; bool m_IsDataReceived;

Loading…
Cancel
Save