Browse Source

common termination timer for all SSU sessions

pull/623/head
orignal 8 years ago
parent
commit
b02677ee21
  1. 1
      NTCPSession.cpp
  2. 3
      NTCPSession.h
  3. 60
      SSU.cpp
  4. 10
      SSU.h
  5. 44
      SSUSession.cpp
  6. 7
      SSUSession.h
  7. 7
      TransportSession.h

1
NTCPSession.cpp

@ -22,7 +22,6 @@ namespace transport
TransportSession (in_RemoteRouter, NTCP_TERMINATION_TIMEOUT), TransportSession (in_RemoteRouter, NTCP_TERMINATION_TIMEOUT),
m_Server (server), m_Socket (m_Server.GetService ()), m_Server (server), m_Socket (m_Server.GetService ()),
m_IsEstablished (false), m_IsTerminated (false), m_IsEstablished (false), m_IsTerminated (false),
m_LastActivityTimestamp (i2p::util::GetSecondsSinceEpoch ()),
m_ReceiveBufferOffset (0), m_NextMessage (nullptr), m_IsSending (false) m_ReceiveBufferOffset (0), m_NextMessage (nullptr), m_IsSending (false)
{ {
m_Establisher = new Establisher; m_Establisher = new Establisher;

3
NTCPSession.h

@ -55,8 +55,6 @@ namespace transport
boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; }; boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; };
bool IsEstablished () const { return m_IsEstablished; }; bool IsEstablished () const { return m_IsEstablished; };
bool IsTerminationTimeoutExpired (uint64_t ts) const
{ return ts >= m_LastActivityTimestamp + GetTerminationTimeout (); };
void ClientLogin (); void ClientLogin ();
void ServerLogin (); void ServerLogin ();
@ -103,7 +101,6 @@ namespace transport
NTCPServer& m_Server; NTCPServer& m_Server;
boost::asio::ip::tcp::socket m_Socket; boost::asio::ip::tcp::socket m_Socket;
bool m_IsEstablished, m_IsTerminated; bool m_IsEstablished, m_IsTerminated;
uint64_t m_LastActivityTimestamp;
i2p::crypto::CBCDecryption m_Decryption; i2p::crypto::CBCDecryption m_Decryption;
i2p::crypto::CBCEncryption m_Encryption; i2p::crypto::CBCEncryption m_Encryption;

60
SSU.cpp

@ -17,7 +17,8 @@ namespace transport
m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService), m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService),
m_EndpointV6 (addr, port), m_EndpointV6 (addr, port),
m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService), m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService),
m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service) m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service),
m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6)
{ {
m_SocketV6.open (boost::asio::ip::udp::v6()); m_SocketV6.open (boost::asio::ip::udp::v6());
m_SocketV6.set_option (boost::asio::ip::v6_only (true)); m_SocketV6.set_option (boost::asio::ip::v6_only (true));
@ -32,7 +33,8 @@ namespace transport
m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService), m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService),
m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port), m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port),
m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService), m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService),
m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service) m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service),
m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6)
{ {
m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (65535)); m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (65535));
@ -59,11 +61,13 @@ namespace transport
{ {
m_Thread = new std::thread (std::bind (&SSUServer::Run, this)); m_Thread = new std::thread (std::bind (&SSUServer::Run, this));
m_ReceiversService.post (std::bind (&SSUServer::Receive, this)); m_ReceiversService.post (std::bind (&SSUServer::Receive, this));
ScheduleTermination ();
} }
if (context.SupportsV6 ()) if (context.SupportsV6 ())
{ {
m_ThreadV6 = new std::thread (std::bind (&SSUServer::RunV6, this)); m_ThreadV6 = new std::thread (std::bind (&SSUServer::RunV6, this));
m_ReceiversService.post (std::bind (&SSUServer::ReceiveV6, this)); m_ReceiversService.post (std::bind (&SSUServer::ReceiveV6, this));
ScheduleTerminationV6 ();
} }
SchedulePeerTestsCleanupTimer (); SchedulePeerTestsCleanupTimer ();
ScheduleIntroducersUpdateTimer (); // wait for 30 seconds and decide if we need introducers ScheduleIntroducersUpdateTimer (); // wait for 30 seconds and decide if we need introducers
@ -640,6 +644,58 @@ namespace transport
SchedulePeerTestsCleanupTimer (); SchedulePeerTestsCleanupTimer ();
} }
} }
void SSUServer::ScheduleTermination ()
{
m_TerminationTimer.expires_from_now (boost::posix_time::seconds(SSU_TERMINATION_CHECK_TIMEOUT));
m_TerminationTimer.async_wait (std::bind (&SSUServer::HandleTerminationTimer,
this, std::placeholders::_1));
}
void SSUServer::HandleTerminationTimer (const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
auto ts = i2p::util::GetSecondsSinceEpoch ();
for (auto& it: m_Sessions)
if (it.second->IsTerminationTimeoutExpired (ts))
{
auto session = it.second;
m_Service.post ([session]
{
LogPrint (eLogWarning, "SSU: no activity with ", session->GetRemoteEndpoint (), " for ", session->GetTerminationTimeout (), " seconds");
session->Failed ();
});
}
ScheduleTermination ();
}
}
void SSUServer::ScheduleTerminationV6 ()
{
m_TerminationTimerV6.expires_from_now (boost::posix_time::seconds(SSU_TERMINATION_CHECK_TIMEOUT));
m_TerminationTimerV6.async_wait (std::bind (&SSUServer::HandleTerminationTimerV6,
this, std::placeholders::_1));
}
void SSUServer::HandleTerminationTimerV6 (const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
auto ts = i2p::util::GetSecondsSinceEpoch ();
for (auto& it: m_SessionsV6)
if (it.second->IsTerminationTimeoutExpired (ts))
{
auto session = it.second;
m_ServiceV6.post ([session]
{
LogPrint (eLogWarning, "SSU: no activity with ", session->GetRemoteEndpoint (), " for ", session->GetTerminationTimeout (), " seconds");
session->Failed ();
});
}
ScheduleTerminationV6 ();
}
}
} }
} }

10
SSU.h

@ -23,6 +23,7 @@ namespace transport
const int SSU_KEEP_ALIVE_INTERVAL = 30; // 30 seconds const int SSU_KEEP_ALIVE_INTERVAL = 30; // 30 seconds
const int SSU_PEER_TEST_TIMEOUT = 60; // 60 seconds const int SSU_PEER_TEST_TIMEOUT = 60; // 60 seconds
const int SSU_TO_INTRODUCER_SESSION_DURATION = 3600; // 1 hour const int SSU_TO_INTRODUCER_SESSION_DURATION = 3600; // 1 hour
const int SSU_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds
const size_t SSU_MAX_NUM_INTRODUCERS = 3; const size_t SSU_MAX_NUM_INTRODUCERS = 3;
struct SSUPacket struct SSUPacket
@ -90,6 +91,12 @@ namespace transport
void SchedulePeerTestsCleanupTimer (); void SchedulePeerTestsCleanupTimer ();
void HandlePeerTestsCleanupTimer (const boost::system::error_code& ecode); void HandlePeerTestsCleanupTimer (const boost::system::error_code& ecode);
// timer
void ScheduleTermination ();
void HandleTerminationTimer (const boost::system::error_code& ecode);
void ScheduleTerminationV6 ();
void HandleTerminationTimerV6 (const boost::system::error_code& ecode);
private: private:
struct PeerTest struct PeerTest
@ -106,7 +113,8 @@ namespace transport
boost::asio::io_service::work m_Work, m_WorkV6, m_ReceiversWork; boost::asio::io_service::work m_Work, m_WorkV6, m_ReceiversWork;
boost::asio::ip::udp::endpoint m_Endpoint, m_EndpointV6; boost::asio::ip::udp::endpoint m_Endpoint, m_EndpointV6;
boost::asio::ip::udp::socket m_Socket, m_SocketV6; boost::asio::ip::udp::socket m_Socket, m_SocketV6;
boost::asio::deadline_timer m_IntroducersUpdateTimer, m_PeerTestsCleanupTimer; boost::asio::deadline_timer m_IntroducersUpdateTimer, m_PeerTestsCleanupTimer,
m_TerminationTimer, m_TerminationTimerV6;
std::list<boost::asio::ip::udp::endpoint> m_Introducers; // introducers we are connected to std::list<boost::asio::ip::udp::endpoint> m_Introducers; // introducers we are connected to
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> > m_Sessions, m_SessionsV6; std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> > m_Sessions, m_SessionsV6;
std::map<uint32_t, boost::asio::ip::udp::endpoint> m_Relays; // we are introducer std::map<uint32_t, boost::asio::ip::udp::endpoint> m_Relays; // we are introducer

44
SSUSession.cpp

@ -14,7 +14,7 @@ namespace transport
SSUSession::SSUSession (SSUServer& server, boost::asio::ip::udp::endpoint& remoteEndpoint, SSUSession::SSUSession (SSUServer& server, boost::asio::ip::udp::endpoint& remoteEndpoint,
std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest ): std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest ):
TransportSession (router, SSU_TERMINATION_TIMEOUT), TransportSession (router, SSU_TERMINATION_TIMEOUT),
m_Server (server), m_RemoteEndpoint (remoteEndpoint), m_Timer (GetService ()), m_Server (server), m_RemoteEndpoint (remoteEndpoint), m_ConnectTimer (GetService ()),
m_IsPeerTest (peerTest),m_State (eSessionStateUnknown), m_IsSessionKey (false), m_IsPeerTest (peerTest),m_State (eSessionStateUnknown), m_IsSessionKey (false),
m_RelayTag (0),m_Data (*this), m_IsDataReceived (false) m_RelayTag (0),m_Data (*this), m_IsDataReceived (false)
{ {
@ -97,7 +97,7 @@ namespace transport
{ {
if (!len) return; // ignore zero-length packets if (!len) return; // ignore zero-length packets
if (m_State == eSessionStateEstablished) if (m_State == eSessionStateEstablished)
ScheduleTermination (); m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch ();
if (m_IsSessionKey && Validate (buf, len, m_MacKey)) // try session key first if (m_IsSessionKey && Validate (buf, len, m_MacKey)) // try session key first
DecryptSessionKey (buf, len); DecryptSessionKey (buf, len);
@ -229,7 +229,7 @@ namespace transport
} }
LogPrint (eLogDebug, "SSU message: session created"); LogPrint (eLogDebug, "SSU message: session created");
m_Timer.cancel (); // connect timer m_ConnectTimer.cancel (); // connect timer
SignedData s; // x,y, our IP, our port, remote IP, remote port, relayTag, signed on time SignedData s; // x,y, our IP, our port, remote IP, remote port, relayTag, signed on time
auto headerSize = GetSSUHeaderSize (buf); auto headerSize = GetSSUHeaderSize (buf);
if (headerSize >= len) if (headerSize >= len)
@ -804,9 +804,9 @@ namespace transport
void SSUSession::ScheduleConnectTimer () void SSUSession::ScheduleConnectTimer ()
{ {
m_Timer.cancel (); m_ConnectTimer.cancel ();
m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); m_ConnectTimer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT));
m_Timer.async_wait (std::bind (&SSUSession::HandleConnectTimer, m_ConnectTimer.async_wait (std::bind (&SSUSession::HandleConnectTimer,
shared_from_this (), std::placeholders::_1)); shared_from_this (), std::placeholders::_1));
} }
@ -826,8 +826,8 @@ namespace transport
if (m_State == eSessionStateUnknown) if (m_State == eSessionStateUnknown)
{ {
// set connect timer // set connect timer
m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); m_ConnectTimer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT));
m_Timer.async_wait (std::bind (&SSUSession::HandleConnectTimer, m_ConnectTimer.async_wait (std::bind (&SSUSession::HandleConnectTimer,
shared_from_this (), std::placeholders::_1)); shared_from_this (), std::placeholders::_1));
} }
uint32_t nonce; uint32_t nonce;
@ -840,8 +840,8 @@ namespace transport
{ {
m_State = eSessionStateIntroduced; m_State = eSessionStateIntroduced;
// set connect timer // set connect timer
m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); m_ConnectTimer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT));
m_Timer.async_wait (std::bind (&SSUSession::HandleConnectTimer, m_ConnectTimer.async_wait (std::bind (&SSUSession::HandleConnectTimer,
shared_from_this (), std::placeholders::_1)); shared_from_this (), std::placeholders::_1));
} }
@ -851,7 +851,7 @@ namespace transport
SendSesionDestroyed (); SendSesionDestroyed ();
transports.PeerDisconnected (shared_from_this ()); transports.PeerDisconnected (shared_from_this ());
m_Data.Stop (); m_Data.Stop ();
m_Timer.cancel (); m_ConnectTimer.cancel ();
} }
void SSUSession::Done () void SSUSession::Done ()
@ -868,7 +868,7 @@ namespace transport
transports.PeerConnected (shared_from_this ()); transports.PeerConnected (shared_from_this ());
if (m_IsPeerTest) if (m_IsPeerTest)
SendPeerTest (); SendPeerTest ();
ScheduleTermination (); m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch ();
} }
void SSUSession::Failed () void SSUSession::Failed ()
@ -880,24 +880,6 @@ namespace transport
} }
} }
void SSUSession::ScheduleTermination ()
{
m_Timer.cancel ();
m_Timer.expires_from_now (boost::posix_time::seconds(GetTerminationTimeout ()));
m_Timer.async_wait (std::bind (&SSUSession::HandleTerminationTimer,
shared_from_this (), std::placeholders::_1));
}
void SSUSession::HandleTerminationTimer (const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
LogPrint (eLogWarning, "SSU: no activity with ", m_RemoteEndpoint, " for ", GetTerminationTimeout (), " seconds");
Failed ();
}
}
void SSUSession::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) void SSUSession::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs)
{ {
GetService ().post (std::bind (&SSUSession::PostI2NPMessages, shared_from_this (), msgs)); GetService ().post (std::bind (&SSUSession::PostI2NPMessages, shared_from_this (), msgs));
@ -1126,7 +1108,7 @@ namespace transport
FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, buf, 48); FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, buf, 48);
Send (buf, 48); Send (buf, 48);
LogPrint (eLogDebug, "SSU: keep-alive sent"); LogPrint (eLogDebug, "SSU: keep-alive sent");
ScheduleTermination (); m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch ();
} }
} }

7
SSUSession.h

@ -77,6 +77,7 @@ namespace transport
void WaitForIntroduction (); void WaitForIntroduction ();
void Close (); void Close ();
void Done (); void Done ();
void Failed ();
boost::asio::ip::udp::endpoint& GetRemoteEndpoint () { return m_RemoteEndpoint; }; boost::asio::ip::udp::endpoint& GetRemoteEndpoint () { return m_RemoteEndpoint; };
bool IsV6 () const { return m_RemoteEndpoint.address ().is_v6 (); }; bool IsV6 () const { return m_RemoteEndpoint.address ().is_v6 (); };
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs); void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs);
@ -114,7 +115,6 @@ namespace transport
void ProcessRelayResponse (const uint8_t * buf, size_t len); void ProcessRelayResponse (const uint8_t * buf, size_t len);
void ProcessRelayIntro (const uint8_t * buf, size_t len); void ProcessRelayIntro (const uint8_t * buf, size_t len);
void Established (); void Established ();
void Failed ();
void ScheduleConnectTimer (); void ScheduleConnectTimer ();
void HandleConnectTimer (const boost::system::error_code& ecode); void HandleConnectTimer (const boost::system::error_code& ecode);
void ProcessPeerTest (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); void ProcessPeerTest (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint);
@ -131,15 +131,12 @@ namespace transport
void DecryptSessionKey (uint8_t * buf, size_t len); void DecryptSessionKey (uint8_t * buf, size_t len);
bool Validate (uint8_t * buf, size_t len, const i2p::crypto::MACKey& macKey); bool Validate (uint8_t * buf, size_t len, const i2p::crypto::MACKey& macKey);
void ScheduleTermination ();
void HandleTerminationTimer (const boost::system::error_code& ecode);
private: private:
friend class SSUData; // TODO: change in later friend class SSUData; // TODO: change in later
SSUServer& m_Server; SSUServer& m_Server;
boost::asio::ip::udp::endpoint m_RemoteEndpoint; boost::asio::ip::udp::endpoint m_RemoteEndpoint;
boost::asio::deadline_timer m_Timer; boost::asio::deadline_timer m_ConnectTimer;
bool m_IsPeerTest; bool m_IsPeerTest;
SessionState m_State; SessionState m_State;
bool m_IsSessionKey; bool m_IsSessionKey;

7
TransportSession.h

@ -9,6 +9,7 @@
#include "Crypto.h" #include "Crypto.h"
#include "RouterInfo.h" #include "RouterInfo.h"
#include "I2NPProtocol.h" #include "I2NPProtocol.h"
#include "Timestamp.h"
namespace i2p namespace i2p
{ {
@ -54,7 +55,8 @@ namespace transport
public: public:
TransportSession (std::shared_ptr<const i2p::data::RouterInfo> router, int terminationTimeout): TransportSession (std::shared_ptr<const i2p::data::RouterInfo> router, int terminationTimeout):
m_DHKeysPair (nullptr), m_NumSentBytes (0), m_NumReceivedBytes (0), m_IsOutgoing (router), m_TerminationTimeout (terminationTimeout) m_DHKeysPair (nullptr), m_NumSentBytes (0), m_NumReceivedBytes (0), m_IsOutgoing (router), m_TerminationTimeout (terminationTimeout),
m_LastActivityTimestamp (i2p::util::GetSecondsSinceEpoch ())
{ {
if (router) if (router)
m_RemoteIdentity = router->GetRouterIdentity (); m_RemoteIdentity = router->GetRouterIdentity ();
@ -72,6 +74,8 @@ namespace transport
int GetTerminationTimeout () const { return m_TerminationTimeout; }; int GetTerminationTimeout () const { return m_TerminationTimeout; };
void SetTerminationTimeout (int terminationTimeout) { m_TerminationTimeout = terminationTimeout; }; void SetTerminationTimeout (int terminationTimeout) { m_TerminationTimeout = terminationTimeout; };
bool IsTerminationTimeoutExpired (uint64_t ts) const
{ return ts >= m_LastActivityTimestamp + GetTerminationTimeout (); };
virtual void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) = 0; virtual void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) = 0;
@ -82,6 +86,7 @@ namespace transport
size_t m_NumSentBytes, m_NumReceivedBytes; size_t m_NumSentBytes, m_NumReceivedBytes;
bool m_IsOutgoing; bool m_IsOutgoing;
int m_TerminationTimeout; int m_TerminationTimeout;
uint64_t m_LastActivityTimestamp;
}; };
} }
} }

Loading…
Cancel
Save