diff --git a/NTCPSession.cpp b/NTCPSession.cpp index b22b0f3c..a1b26aca 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -21,7 +21,8 @@ namespace transport NTCPSession::NTCPSession (NTCPServer& server, std::shared_ptr in_RemoteRouter): TransportSession (in_RemoteRouter, NTCP_TERMINATION_TIMEOUT), m_Server (server), m_Socket (m_Server.GetService ()), - m_TerminationTimer (m_Server.GetService ()), m_IsEstablished (false), m_IsTerminated (false), + m_IsEstablished (false), m_IsTerminated (false), + m_LastActivityTimestamp (i2p::util::GetSecondsSinceEpoch ()), m_ReceiveBufferOffset (0), m_NextMessage (nullptr), m_IsSending (false) { m_Establisher = new Establisher; @@ -78,7 +79,6 @@ namespace transport m_Server.RemoveNTCPSession (shared_from_this ()); m_SendQueue.clear (); m_NextMessage = nullptr; - m_TerminationTimer.cancel (); LogPrint (eLogDebug, "NTCP: session terminated"); } } @@ -110,7 +110,6 @@ namespace transport boost::asio::async_write (m_Socket, boost::asio::buffer (&m_Establisher->phase1, sizeof (NTCPPhase1)), boost::asio::transfer_all (), std::bind(&NTCPSession::HandlePhase1Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); - ScheduleTermination (); } void NTCPSession::ServerLogin () @@ -124,7 +123,6 @@ namespace transport boost::asio::async_read (m_Socket, boost::asio::buffer(&m_Establisher->phase1, sizeof (NTCPPhase1)), boost::asio::transfer_all (), std::bind(&NTCPSession::HandlePhase1Received, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); - ScheduleTermination (); } } @@ -558,7 +556,7 @@ namespace transport m_Handler.Flush (); } - ScheduleTermination (); // reset termination timer + m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); Receive (); } } @@ -685,6 +683,7 @@ namespace transport } else { + m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); m_NumSentBytes += bytes_transferred; i2p::transport::transports.UpdateSentBytes (bytes_transferred); if (!m_SendQueue.empty()) @@ -692,8 +691,6 @@ namespace transport Send (m_SendQueue); m_SendQueue.clear (); } - else - ScheduleTermination (); // reset termination timer } } @@ -728,29 +725,11 @@ namespace transport else Send (msgs); } - - void NTCPSession::ScheduleTermination () - { - m_TerminationTimer.cancel (); - m_TerminationTimer.expires_from_now (boost::posix_time::seconds(GetTerminationTimeout ())); - m_TerminationTimer.async_wait (std::bind (&NTCPSession::HandleTerminationTimer, - shared_from_this (), std::placeholders::_1)); - } - - void NTCPSession::HandleTerminationTimer (const boost::system::error_code& ecode) - { - if (ecode != boost::asio::error::operation_aborted) - { - LogPrint (eLogDebug, "NTCP: No activity for ", GetTerminationTimeout (), " seconds"); - //Terminate (); - m_Socket.close ();// invoke Terminate () from HandleReceive - } - } //----------------------------------------- NTCPServer::NTCPServer (): m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), - m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr) + m_TerminationTimer (m_Service), m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr) { } @@ -809,7 +788,8 @@ namespace transport } } } - } + } + ScheduleTermination (); } } @@ -820,13 +800,17 @@ namespace transport if (m_IsRunning) { m_IsRunning = false; - if (m_NTCPAcceptor) - delete m_NTCPAcceptor; - m_NTCPAcceptor = nullptr; - if (m_NTCPV6Acceptor) - delete m_NTCPV6Acceptor; - m_NTCPV6Acceptor = nullptr; - + m_TerminationTimer.cancel (); + if (m_NTCPAcceptor) + { + delete m_NTCPAcceptor; + m_NTCPAcceptor = nullptr; + } + if (m_NTCPV6Acceptor) + { + delete m_NTCPV6Acceptor; + m_NTCPV6Acceptor = nullptr; + } m_Service.stop (); if (m_Thread) { @@ -989,5 +973,31 @@ namespace transport m_BanList[addr] = ts + NTCP_BAN_EXPIRATION_TIMEOUT; LogPrint (eLogWarning, "NTCP: ", addr, " has been banned for ", NTCP_BAN_EXPIRATION_TIMEOUT, " seconds"); } + + void NTCPServer::ScheduleTermination () + { + m_TerminationTimer.expires_from_now (boost::posix_time::seconds(NTCP_TERMINATION_CHECK_TIMEOUT)); + m_TerminationTimer.async_wait (std::bind (&NTCPServer::HandleTerminationTimer, + this, std::placeholders::_1)); + } + + void NTCPServer::HandleTerminationTimer (const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + auto ts = i2p::util::GetSecondsSinceEpoch (); + for (auto& it: m_NTCPSessions) + if (it.second->IsTerminationTimeoutExpired (ts)) + { + auto session = it.second; + m_Service.post ([session] + { + LogPrint (eLogDebug, "NTCP: No activity for ", session->GetTerminationTimeout (), " seconds"); + session->Terminate (); + }); + } + ScheduleTermination (); + } + } } } diff --git a/NTCPSession.h b/NTCPSession.h index 59b6bb58..adf33553 100644 --- a/NTCPSession.h +++ b/NTCPSession.h @@ -37,6 +37,7 @@ namespace transport const size_t NTCP_MAX_MESSAGE_SIZE = 16384; const size_t NTCP_BUFFER_SIZE = 4160; // fits 4 tunnel messages (4*1028) const int NTCP_TERMINATION_TIMEOUT = 120; // 2 minutes + const int NTCP_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds const size_t NTCP_DEFAULT_PHASE3_SIZE = 2/*size*/ + i2p::data::DEFAULT_IDENTITY_SIZE/*387*/ + 4/*ts*/ + 15/*padding*/ + 40/*signature*/; // 448 const int NTCP_BAN_EXPIRATION_TIMEOUT = 70; // in second const int NTCP_CLOCK_SKEW = 60; // in seconds @@ -54,7 +55,9 @@ namespace transport boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; }; bool IsEstablished () const { return m_IsEstablished; }; - + bool IsTerminationTimeoutExpired (uint64_t ts) const + { return ts >= m_LastActivityTimestamp + GetTerminationTimeout (); }; + void ClientLogin (); void ServerLogin (); void SendI2NPMessages (const std::vector >& msgs); @@ -95,17 +98,12 @@ namespace transport void Send (const std::vector >& msgs); void HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector > msgs); - - // timer - void ScheduleTermination (); - void HandleTerminationTimer (const boost::system::error_code& ecode); - private: NTCPServer& m_Server; boost::asio::ip::tcp::socket m_Socket; - boost::asio::deadline_timer m_TerminationTimer; bool m_IsEstablished, m_IsTerminated; + uint64_t m_LastActivityTimestamp; i2p::crypto::CBCDecryption m_Decryption; i2p::crypto::CBCEncryption m_Encryption; @@ -146,8 +144,8 @@ namespace transport std::shared_ptr FindNTCPSession (const i2p::data::IdentHash& ident); void Connect (const boost::asio::ip::address& address, int port, std::shared_ptr conn); - bool IsBoundV4() const { return m_NTCPAcceptor != nullptr; }; - bool IsBoundV6() const { return m_NTCPV6Acceptor != nullptr; }; + bool IsBoundV4() const { return m_NTCPAcceptor != nullptr; }; + bool IsBoundV6() const { return m_NTCPV6Acceptor != nullptr; }; boost::asio::io_service& GetService () { return m_Service; }; void Ban (const boost::asio::ip::address& addr); @@ -159,6 +157,10 @@ namespace transport void HandleAcceptV6 (std::shared_ptr conn, const boost::system::error_code& error); void HandleConnect (const boost::system::error_code& ecode, std::shared_ptr conn); + + // timer + void ScheduleTermination (); + void HandleTerminationTimer (const boost::system::error_code& ecode); private: @@ -166,6 +168,7 @@ namespace transport std::thread * m_Thread; boost::asio::io_service m_Service; boost::asio::io_service::work m_Work; + boost::asio::deadline_timer m_TerminationTimer; boost::asio::ip::tcp::acceptor * m_NTCPAcceptor, * m_NTCPV6Acceptor; std::map > m_NTCPSessions; // access from m_Thread only std::map m_BanList; // IP -> ban expiration time in seconds