From b02677ee2156797d5e23943e04f78b8e4b1d1125 Mon Sep 17 00:00:00 2001 From: orignal Date: Wed, 24 Aug 2016 11:21:49 -0400 Subject: [PATCH] common termination timer for all SSU sessions --- NTCPSession.cpp | 1 - NTCPSession.h | 5 +--- SSU.cpp | 60 ++++++++++++++++++++++++++++++++++++++++++++-- SSU.h | 10 +++++++- SSUSession.cpp | 44 ++++++++++------------------------ SSUSession.h | 7 ++---- TransportSession.h | 7 +++++- 7 files changed, 89 insertions(+), 45 deletions(-) diff --git a/NTCPSession.cpp b/NTCPSession.cpp index a1b26aca..41252f97 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -22,7 +22,6 @@ namespace transport TransportSession (in_RemoteRouter, NTCP_TERMINATION_TIMEOUT), m_Server (server), m_Socket (m_Server.GetService ()), m_IsEstablished (false), m_IsTerminated (false), - m_LastActivityTimestamp (i2p::util::GetSecondsSinceEpoch ()), m_ReceiveBufferOffset (0), m_NextMessage (nullptr), m_IsSending (false) { m_Establisher = new Establisher; diff --git a/NTCPSession.h b/NTCPSession.h index adf33553..a3d95c62 100644 --- a/NTCPSession.h +++ b/NTCPSession.h @@ -54,9 +54,7 @@ namespace transport void Done (); boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; }; - bool IsEstablished () const { return m_IsEstablished; }; - bool IsTerminationTimeoutExpired (uint64_t ts) const - { return ts >= m_LastActivityTimestamp + GetTerminationTimeout (); }; + bool IsEstablished () const { return m_IsEstablished; }; void ClientLogin (); void ServerLogin (); @@ -103,7 +101,6 @@ namespace transport NTCPServer& m_Server; boost::asio::ip::tcp::socket m_Socket; bool m_IsEstablished, m_IsTerminated; - uint64_t m_LastActivityTimestamp; i2p::crypto::CBCDecryption m_Decryption; i2p::crypto::CBCEncryption m_Encryption; diff --git a/SSU.cpp b/SSU.cpp index 0c3662d3..3d3fefdd 100644 --- a/SSU.cpp +++ b/SSU.cpp @@ -17,7 +17,8 @@ namespace transport m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService), m_EndpointV6 (addr, port), m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService), - m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service) + m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service), + m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6) { m_SocketV6.open (boost::asio::ip::udp::v6()); m_SocketV6.set_option (boost::asio::ip::v6_only (true)); @@ -32,7 +33,8 @@ namespace transport m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService), m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port), m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService), - m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service) + m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service), + m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6) { m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (65535)); @@ -59,11 +61,13 @@ namespace transport { m_Thread = new std::thread (std::bind (&SSUServer::Run, this)); m_ReceiversService.post (std::bind (&SSUServer::Receive, this)); + ScheduleTermination (); } if (context.SupportsV6 ()) { m_ThreadV6 = new std::thread (std::bind (&SSUServer::RunV6, this)); m_ReceiversService.post (std::bind (&SSUServer::ReceiveV6, this)); + ScheduleTerminationV6 (); } SchedulePeerTestsCleanupTimer (); ScheduleIntroducersUpdateTimer (); // wait for 30 seconds and decide if we need introducers @@ -640,6 +644,58 @@ namespace transport SchedulePeerTestsCleanupTimer (); } } + + void SSUServer::ScheduleTermination () + { + m_TerminationTimer.expires_from_now (boost::posix_time::seconds(SSU_TERMINATION_CHECK_TIMEOUT)); + m_TerminationTimer.async_wait (std::bind (&SSUServer::HandleTerminationTimer, + this, std::placeholders::_1)); + } + + void SSUServer::HandleTerminationTimer (const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + auto ts = i2p::util::GetSecondsSinceEpoch (); + for (auto& it: m_Sessions) + if (it.second->IsTerminationTimeoutExpired (ts)) + { + auto session = it.second; + m_Service.post ([session] + { + LogPrint (eLogWarning, "SSU: no activity with ", session->GetRemoteEndpoint (), " for ", session->GetTerminationTimeout (), " seconds"); + session->Failed (); + }); + } + ScheduleTermination (); + } + } + + void SSUServer::ScheduleTerminationV6 () + { + m_TerminationTimerV6.expires_from_now (boost::posix_time::seconds(SSU_TERMINATION_CHECK_TIMEOUT)); + m_TerminationTimerV6.async_wait (std::bind (&SSUServer::HandleTerminationTimerV6, + this, std::placeholders::_1)); + } + + void SSUServer::HandleTerminationTimerV6 (const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + auto ts = i2p::util::GetSecondsSinceEpoch (); + for (auto& it: m_SessionsV6) + if (it.second->IsTerminationTimeoutExpired (ts)) + { + auto session = it.second; + m_ServiceV6.post ([session] + { + LogPrint (eLogWarning, "SSU: no activity with ", session->GetRemoteEndpoint (), " for ", session->GetTerminationTimeout (), " seconds"); + session->Failed (); + }); + } + ScheduleTerminationV6 (); + } + } } } diff --git a/SSU.h b/SSU.h index d779c025..182fa0eb 100644 --- a/SSU.h +++ b/SSU.h @@ -23,6 +23,7 @@ namespace transport const int SSU_KEEP_ALIVE_INTERVAL = 30; // 30 seconds const int SSU_PEER_TEST_TIMEOUT = 60; // 60 seconds const int SSU_TO_INTRODUCER_SESSION_DURATION = 3600; // 1 hour + const int SSU_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds const size_t SSU_MAX_NUM_INTRODUCERS = 3; struct SSUPacket @@ -90,6 +91,12 @@ namespace transport void SchedulePeerTestsCleanupTimer (); void HandlePeerTestsCleanupTimer (const boost::system::error_code& ecode); + // timer + void ScheduleTermination (); + void HandleTerminationTimer (const boost::system::error_code& ecode); + void ScheduleTerminationV6 (); + void HandleTerminationTimerV6 (const boost::system::error_code& ecode); + private: struct PeerTest @@ -106,7 +113,8 @@ namespace transport boost::asio::io_service::work m_Work, m_WorkV6, m_ReceiversWork; boost::asio::ip::udp::endpoint m_Endpoint, m_EndpointV6; boost::asio::ip::udp::socket m_Socket, m_SocketV6; - boost::asio::deadline_timer m_IntroducersUpdateTimer, m_PeerTestsCleanupTimer; + boost::asio::deadline_timer m_IntroducersUpdateTimer, m_PeerTestsCleanupTimer, + m_TerminationTimer, m_TerminationTimerV6; std::list m_Introducers; // introducers we are connected to std::map > m_Sessions, m_SessionsV6; std::map m_Relays; // we are introducer diff --git a/SSUSession.cpp b/SSUSession.cpp index bf4058a5..9245f652 100644 --- a/SSUSession.cpp +++ b/SSUSession.cpp @@ -14,7 +14,7 @@ namespace transport SSUSession::SSUSession (SSUServer& server, boost::asio::ip::udp::endpoint& remoteEndpoint, std::shared_ptr router, bool peerTest ): TransportSession (router, SSU_TERMINATION_TIMEOUT), - m_Server (server), m_RemoteEndpoint (remoteEndpoint), m_Timer (GetService ()), + m_Server (server), m_RemoteEndpoint (remoteEndpoint), m_ConnectTimer (GetService ()), m_IsPeerTest (peerTest),m_State (eSessionStateUnknown), m_IsSessionKey (false), m_RelayTag (0),m_Data (*this), m_IsDataReceived (false) { @@ -97,7 +97,7 @@ namespace transport { if (!len) return; // ignore zero-length packets if (m_State == eSessionStateEstablished) - ScheduleTermination (); + m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); if (m_IsSessionKey && Validate (buf, len, m_MacKey)) // try session key first DecryptSessionKey (buf, len); @@ -229,7 +229,7 @@ namespace transport } LogPrint (eLogDebug, "SSU message: session created"); - m_Timer.cancel (); // connect timer + m_ConnectTimer.cancel (); // connect timer SignedData s; // x,y, our IP, our port, remote IP, remote port, relayTag, signed on time auto headerSize = GetSSUHeaderSize (buf); if (headerSize >= len) @@ -804,9 +804,9 @@ namespace transport void SSUSession::ScheduleConnectTimer () { - m_Timer.cancel (); - m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); - m_Timer.async_wait (std::bind (&SSUSession::HandleConnectTimer, + m_ConnectTimer.cancel (); + m_ConnectTimer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); + m_ConnectTimer.async_wait (std::bind (&SSUSession::HandleConnectTimer, shared_from_this (), std::placeholders::_1)); } @@ -826,8 +826,8 @@ namespace transport if (m_State == eSessionStateUnknown) { // set connect timer - m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); - m_Timer.async_wait (std::bind (&SSUSession::HandleConnectTimer, + m_ConnectTimer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); + m_ConnectTimer.async_wait (std::bind (&SSUSession::HandleConnectTimer, shared_from_this (), std::placeholders::_1)); } uint32_t nonce; @@ -840,8 +840,8 @@ namespace transport { m_State = eSessionStateIntroduced; // set connect timer - m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); - m_Timer.async_wait (std::bind (&SSUSession::HandleConnectTimer, + m_ConnectTimer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); + m_ConnectTimer.async_wait (std::bind (&SSUSession::HandleConnectTimer, shared_from_this (), std::placeholders::_1)); } @@ -851,7 +851,7 @@ namespace transport SendSesionDestroyed (); transports.PeerDisconnected (shared_from_this ()); m_Data.Stop (); - m_Timer.cancel (); + m_ConnectTimer.cancel (); } void SSUSession::Done () @@ -868,7 +868,7 @@ namespace transport transports.PeerConnected (shared_from_this ()); if (m_IsPeerTest) SendPeerTest (); - ScheduleTermination (); + m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); } void SSUSession::Failed () @@ -880,24 +880,6 @@ namespace transport } } - void SSUSession::ScheduleTermination () - { - m_Timer.cancel (); - m_Timer.expires_from_now (boost::posix_time::seconds(GetTerminationTimeout ())); - m_Timer.async_wait (std::bind (&SSUSession::HandleTerminationTimer, - shared_from_this (), std::placeholders::_1)); - } - - void SSUSession::HandleTerminationTimer (const boost::system::error_code& ecode) - { - if (ecode != boost::asio::error::operation_aborted) - { - LogPrint (eLogWarning, "SSU: no activity with ", m_RemoteEndpoint, " for ", GetTerminationTimeout (), " seconds"); - Failed (); - } - } - - void SSUSession::SendI2NPMessages (const std::vector >& msgs) { GetService ().post (std::bind (&SSUSession::PostI2NPMessages, shared_from_this (), msgs)); @@ -1126,7 +1108,7 @@ namespace transport FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, buf, 48); Send (buf, 48); LogPrint (eLogDebug, "SSU: keep-alive sent"); - ScheduleTermination (); + m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); } } diff --git a/SSUSession.h b/SSUSession.h index c2f24ce3..69669187 100644 --- a/SSUSession.h +++ b/SSUSession.h @@ -77,6 +77,7 @@ namespace transport void WaitForIntroduction (); void Close (); void Done (); + void Failed (); boost::asio::ip::udp::endpoint& GetRemoteEndpoint () { return m_RemoteEndpoint; }; bool IsV6 () const { return m_RemoteEndpoint.address ().is_v6 (); }; void SendI2NPMessages (const std::vector >& msgs); @@ -114,7 +115,6 @@ namespace transport void ProcessRelayResponse (const uint8_t * buf, size_t len); void ProcessRelayIntro (const uint8_t * buf, size_t len); void Established (); - void Failed (); void ScheduleConnectTimer (); void HandleConnectTimer (const boost::system::error_code& ecode); void ProcessPeerTest (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); @@ -131,15 +131,12 @@ namespace transport void DecryptSessionKey (uint8_t * buf, size_t len); bool Validate (uint8_t * buf, size_t len, const i2p::crypto::MACKey& macKey); - void ScheduleTermination (); - void HandleTerminationTimer (const boost::system::error_code& ecode); - private: friend class SSUData; // TODO: change in later SSUServer& m_Server; boost::asio::ip::udp::endpoint m_RemoteEndpoint; - boost::asio::deadline_timer m_Timer; + boost::asio::deadline_timer m_ConnectTimer; bool m_IsPeerTest; SessionState m_State; bool m_IsSessionKey; diff --git a/TransportSession.h b/TransportSession.h index 1b057bc7..9c97d02e 100644 --- a/TransportSession.h +++ b/TransportSession.h @@ -9,6 +9,7 @@ #include "Crypto.h" #include "RouterInfo.h" #include "I2NPProtocol.h" +#include "Timestamp.h" namespace i2p { @@ -54,7 +55,8 @@ namespace transport public: TransportSession (std::shared_ptr router, int terminationTimeout): - m_DHKeysPair (nullptr), m_NumSentBytes (0), m_NumReceivedBytes (0), m_IsOutgoing (router), m_TerminationTimeout (terminationTimeout) + m_DHKeysPair (nullptr), m_NumSentBytes (0), m_NumReceivedBytes (0), m_IsOutgoing (router), m_TerminationTimeout (terminationTimeout), + m_LastActivityTimestamp (i2p::util::GetSecondsSinceEpoch ()) { if (router) m_RemoteIdentity = router->GetRouterIdentity (); @@ -72,6 +74,8 @@ namespace transport int GetTerminationTimeout () const { return m_TerminationTimeout; }; void SetTerminationTimeout (int terminationTimeout) { m_TerminationTimeout = terminationTimeout; }; + bool IsTerminationTimeoutExpired (uint64_t ts) const + { return ts >= m_LastActivityTimestamp + GetTerminationTimeout (); }; virtual void SendI2NPMessages (const std::vector >& msgs) = 0; @@ -82,6 +86,7 @@ namespace transport size_t m_NumSentBytes, m_NumReceivedBytes; bool m_IsOutgoing; int m_TerminationTimeout; + uint64_t m_LastActivityTimestamp; }; } }