diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 966bd267..40d06b73 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -27,7 +27,7 @@ namespace transport SSU2Session::SSU2Session (SSU2Server& server, std::shared_ptr in_RemoteRouter, std::shared_ptr addr, bool peerTest): - TransportSession (in_RemoteRouter, SSU2_TERMINATION_TIMEOUT), + TransportSession (in_RemoteRouter, SSU2_CONNECT_TIMEOUT), m_Server (server), m_Address (addr), m_DestConnID (0), m_SourceConnID (0) { m_NoiseState.reset (new i2p::crypto::NoiseSymmetricState); @@ -96,7 +96,7 @@ namespace transport i2p::crypto::ChaCha20 (headerX, 48, m_Address->i, nonce, headerX); m_NoiseState->MixHash (payload, 24); // h = SHA256(h || 24 byte encrypted payload from Session Request) for SessionCreated // send - m_Server.AddPendingOutgoingSession (boost::asio::ip::udp::endpoint (m_Address->host, m_Address->port), shared_from_this ()); + m_Server.AddPendingOutgoingSession (m_RemoteEndpoint, shared_from_this ()); m_Server.Send (header.buf, 16, headerX, 48, payload, payloadSize, m_RemoteEndpoint); } @@ -354,7 +354,8 @@ namespace transport } SSU2Server::SSU2Server (): - RunnableServiceWithWork ("SSU2"), m_Socket (GetService ()), m_SocketV6 (GetService ()) + RunnableServiceWithWork ("SSU2"), m_Socket (GetService ()), m_SocketV6 (GetService ()), + m_TerminationTimer (GetService ()) { } @@ -390,12 +391,16 @@ namespace transport else LogPrint (eLogError, "SSU2: Can't start server because port not specified"); } - } + } + ScheduleTermination (); } } void SSU2Server::Stop () { + if (IsRunning ()) + m_TerminationTimer.cancel (); + StopIOService (); } @@ -519,6 +524,43 @@ namespace transport return false; return true; } - + + void SSU2Server::ScheduleTermination () + { + m_TerminationTimer.expires_from_now (boost::posix_time::seconds(SSU2_TERMINATION_CHECK_TIMEOUT)); + m_TerminationTimer.async_wait (std::bind (&SSU2Server::HandleTerminationTimer, + this, std::placeholders::_1)); + } + + void SSU2Server::HandleTerminationTimer (const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + auto ts = i2p::util::GetSecondsSinceEpoch (); + for (auto it = m_PendingOutgoingSessions.begin (); it != m_PendingOutgoingSessions.end ();) + { + if (it->second->IsTerminationTimeoutExpired (ts)) + { + //it->second->Terminate (); + it = m_PendingOutgoingSessions.erase (it); + } + else + it++; + } + + for (auto it = m_Sessions.begin (); it != m_Sessions.end ();) + { + if (it->second->IsTerminationTimeoutExpired (ts)) + { + //it->second->Terminate (); + it = m_Sessions.erase (it); + } + else + it++; + } + + ScheduleTermination (); + } + } } } diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 14714388..5dd1d14b 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -21,7 +21,9 @@ namespace i2p { namespace transport { + const int SSU2_CONNECT_TIMEOUT = 5; // 5 seconds const int SSU2_TERMINATION_TIMEOUT = 330; // 5.5 minutes + const int SSU2_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds const size_t SSU2_SOCKET_RECEIVE_BUFFER_SIZE = 0x1FFFF; // 128K const size_t SSU2_SOCKET_SEND_BUFFER_SIZE = 0x1FFFF; // 128K const size_t SSU2_MTU = 1488; @@ -142,6 +144,9 @@ namespace transport void HandleReceivedFrom (const boost::system::error_code& ecode, size_t bytes_transferred, Packet * packet, boost::asio::ip::udp::socket& socket); void ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); + + void ScheduleTermination (); + void HandleTerminationTimer (const boost::system::error_code& ecode); private: @@ -149,6 +154,7 @@ namespace transport std::unordered_map > m_Sessions; std::map > m_PendingOutgoingSessions; i2p::util::MemoryPoolMt m_PacketsPool; + boost::asio::deadline_timer m_TerminationTimer; }; } }