Browse Source

cleanup pending sessions

pull/1743/head
orignal 3 years ago
parent
commit
87bf5c2418
  1. 48
      libi2pd/SSU2.cpp
  2. 6
      libi2pd/SSU2.h

48
libi2pd/SSU2.cpp

@ -27,7 +27,7 @@ namespace transport
SSU2Session::SSU2Session (SSU2Server& server, std::shared_ptr<const i2p::data::RouterInfo> in_RemoteRouter, SSU2Session::SSU2Session (SSU2Server& server, std::shared_ptr<const i2p::data::RouterInfo> in_RemoteRouter,
std::shared_ptr<const i2p::data::RouterInfo::Address> addr, bool peerTest): std::shared_ptr<const i2p::data::RouterInfo::Address> addr, bool peerTest):
TransportSession (in_RemoteRouter, SSU2_TERMINATION_TIMEOUT), TransportSession (in_RemoteRouter, SSU2_CONNECT_TIMEOUT),
m_Server (server), m_Address (addr), m_DestConnID (0), m_SourceConnID (0) m_Server (server), m_Address (addr), m_DestConnID (0), m_SourceConnID (0)
{ {
m_NoiseState.reset (new i2p::crypto::NoiseSymmetricState); m_NoiseState.reset (new i2p::crypto::NoiseSymmetricState);
@ -96,7 +96,7 @@ namespace transport
i2p::crypto::ChaCha20 (headerX, 48, m_Address->i, nonce, headerX); i2p::crypto::ChaCha20 (headerX, 48, m_Address->i, nonce, headerX);
m_NoiseState->MixHash (payload, 24); // h = SHA256(h || 24 byte encrypted payload from Session Request) for SessionCreated m_NoiseState->MixHash (payload, 24); // h = SHA256(h || 24 byte encrypted payload from Session Request) for SessionCreated
// send // send
m_Server.AddPendingOutgoingSession (boost::asio::ip::udp::endpoint (m_Address->host, m_Address->port), shared_from_this ()); m_Server.AddPendingOutgoingSession (m_RemoteEndpoint, shared_from_this ());
m_Server.Send (header.buf, 16, headerX, 48, payload, payloadSize, m_RemoteEndpoint); m_Server.Send (header.buf, 16, headerX, 48, payload, payloadSize, m_RemoteEndpoint);
} }
@ -354,7 +354,8 @@ namespace transport
} }
SSU2Server::SSU2Server (): SSU2Server::SSU2Server ():
RunnableServiceWithWork ("SSU2"), m_Socket (GetService ()), m_SocketV6 (GetService ()) RunnableServiceWithWork ("SSU2"), m_Socket (GetService ()), m_SocketV6 (GetService ()),
m_TerminationTimer (GetService ())
{ {
} }
@ -391,11 +392,15 @@ namespace transport
LogPrint (eLogError, "SSU2: Can't start server because port not specified"); LogPrint (eLogError, "SSU2: Can't start server because port not specified");
} }
} }
ScheduleTermination ();
} }
} }
void SSU2Server::Stop () void SSU2Server::Stop ()
{ {
if (IsRunning ())
m_TerminationTimer.cancel ();
StopIOService (); StopIOService ();
} }
@ -520,5 +525,42 @@ namespace transport
return true; return true;
} }
void SSU2Server::ScheduleTermination ()
{
m_TerminationTimer.expires_from_now (boost::posix_time::seconds(SSU2_TERMINATION_CHECK_TIMEOUT));
m_TerminationTimer.async_wait (std::bind (&SSU2Server::HandleTerminationTimer,
this, std::placeholders::_1));
}
void SSU2Server::HandleTerminationTimer (const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
auto ts = i2p::util::GetSecondsSinceEpoch ();
for (auto it = m_PendingOutgoingSessions.begin (); it != m_PendingOutgoingSessions.end ();)
{
if (it->second->IsTerminationTimeoutExpired (ts))
{
//it->second->Terminate ();
it = m_PendingOutgoingSessions.erase (it);
}
else
it++;
}
for (auto it = m_Sessions.begin (); it != m_Sessions.end ();)
{
if (it->second->IsTerminationTimeoutExpired (ts))
{
//it->second->Terminate ();
it = m_Sessions.erase (it);
}
else
it++;
}
ScheduleTermination ();
}
}
} }
} }

6
libi2pd/SSU2.h

@ -21,7 +21,9 @@ namespace i2p
{ {
namespace transport namespace transport
{ {
const int SSU2_CONNECT_TIMEOUT = 5; // 5 seconds
const int SSU2_TERMINATION_TIMEOUT = 330; // 5.5 minutes const int SSU2_TERMINATION_TIMEOUT = 330; // 5.5 minutes
const int SSU2_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds
const size_t SSU2_SOCKET_RECEIVE_BUFFER_SIZE = 0x1FFFF; // 128K const size_t SSU2_SOCKET_RECEIVE_BUFFER_SIZE = 0x1FFFF; // 128K
const size_t SSU2_SOCKET_SEND_BUFFER_SIZE = 0x1FFFF; // 128K const size_t SSU2_SOCKET_SEND_BUFFER_SIZE = 0x1FFFF; // 128K
const size_t SSU2_MTU = 1488; const size_t SSU2_MTU = 1488;
@ -143,12 +145,16 @@ namespace transport
Packet * packet, boost::asio::ip::udp::socket& socket); Packet * packet, boost::asio::ip::udp::socket& socket);
void ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); void ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint);
void ScheduleTermination ();
void HandleTerminationTimer (const boost::system::error_code& ecode);
private: private:
boost::asio::ip::udp::socket m_Socket, m_SocketV6; boost::asio::ip::udp::socket m_Socket, m_SocketV6;
std::unordered_map<uint64_t, std::shared_ptr<SSU2Session> > m_Sessions; std::unordered_map<uint64_t, std::shared_ptr<SSU2Session> > m_Sessions;
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSU2Session> > m_PendingOutgoingSessions; std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSU2Session> > m_PendingOutgoingSessions;
i2p::util::MemoryPoolMt<Packet> m_PacketsPool; i2p::util::MemoryPoolMt<Packet> m_PacketsPool;
boost::asio::deadline_timer m_TerminationTimer;
}; };
} }
} }

Loading…
Cancel
Save