Browse Source

NTCP2 idle timeout

pull/1221/head
orignal 6 years ago
parent
commit
50cd321818
  1. 78
      libi2pd/NTCP2.cpp
  2. 16
      libi2pd/NTCP2.h

78
libi2pd/NTCP2.cpp

@ -139,7 +139,7 @@ namespace transport
} }
NTCP2Session::NTCP2Session (NTCP2Server& server, std::shared_ptr<const i2p::data::RouterInfo> in_RemoteRouter): NTCP2Session::NTCP2Session (NTCP2Server& server, std::shared_ptr<const i2p::data::RouterInfo> in_RemoteRouter):
TransportSession (in_RemoteRouter, 30), TransportSession (in_RemoteRouter, NTCP2_ESTABLISH_TIMEOUT),
m_Server (server), m_Socket (m_Server.GetService ()), m_Server (server), m_Socket (m_Server.GetService ()),
m_IsEstablished (false), m_IsTerminated (false), m_IsEstablished (false), m_IsTerminated (false),
m_SessionRequestBuffer (nullptr), m_SessionCreatedBuffer (nullptr), m_SessionConfirmedBuffer (nullptr), m_SessionRequestBuffer (nullptr), m_SessionCreatedBuffer (nullptr), m_SessionConfirmedBuffer (nullptr),
@ -183,6 +183,11 @@ namespace transport
} }
} }
void NTCP2Session::TerminateByTimeout ()
{
SendTerminationAndTerminate (eNTCP2IdleTimeout);
}
void NTCP2Session::Done () void NTCP2Session::Done ()
{ {
m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ())); m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ()));
@ -192,6 +197,7 @@ namespace transport
{ {
m_IsEstablished = true; m_IsEstablished = true;
m_Establisher.reset (nullptr); m_Establisher.reset (nullptr);
SetTerminationTimeout (NTCP2_TERMINATION_TIMEOUT);
transports.PeerConnected (shared_from_this ()); transports.PeerConnected (shared_from_this ());
} }
@ -666,6 +672,7 @@ namespace transport
} }
else else
{ {
m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch ();
m_NumReceivedBytes += bytes_transferred + 2; // + length m_NumReceivedBytes += bytes_transferred + 2; // + length
i2p::transport::transports.UpdateReceivedBytes (bytes_transferred); i2p::transport::transports.UpdateReceivedBytes (bytes_transferred);
uint8_t nonce[12]; uint8_t nonce[12];
@ -679,8 +686,8 @@ namespace transport
} }
else else
{ {
LogPrint (eLogWarning, "NTCP2: Received MAC verification failed "); LogPrint (eLogWarning, "NTCP2: Received AEAD verification failed ");
Terminate (); SendTerminationAndTerminate (eNTCP2DataPhaseAEADFailure);
} }
delete[] decrypted; delete[] decrypted;
} }
@ -764,6 +771,7 @@ namespace transport
void NTCP2Session::HandleNextFrameSent (const boost::system::error_code& ecode, std::size_t bytes_transferred) void NTCP2Session::HandleNextFrameSent (const boost::system::error_code& ecode, std::size_t bytes_transferred)
{ {
m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch ();
m_NumSentBytes += bytes_transferred; m_NumSentBytes += bytes_transferred;
i2p::transport::transports.UpdateSentBytes (bytes_transferred); i2p::transport::transports.UpdateSentBytes (bytes_transferred);
delete[] m_NextSendBuffer; m_NextSendBuffer = nullptr; delete[] m_NextSendBuffer; m_NextSendBuffer = nullptr;
@ -857,7 +865,8 @@ namespace transport
} }
NTCP2Server::NTCP2Server (): NTCP2Server::NTCP2Server ():
m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service) m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service),
m_TerminationTimer (m_Service)
{ {
} }
@ -914,6 +923,7 @@ namespace transport
} }
} }
} }
ScheduleTermination ();
} }
} }
@ -993,13 +1003,27 @@ namespace transport
{ {
if (this->AddNTCP2Session (conn)) if (this->AddNTCP2Session (conn))
{ {
conn->GetSocket ().async_connect (boost::asio::ip::tcp::endpoint (address, port), std::bind (&NTCP2Server::HandleConnect, this, std::placeholders::_1, conn)); auto timer = std::make_shared<boost::asio::deadline_timer>(m_Service);
auto timeout = NTCP2_CONNECT_TIMEOUT * 5;
conn->SetTerminationTimeout(timeout * 2);
timer->expires_from_now (boost::posix_time::seconds(timeout));
timer->async_wait ([conn, timeout](const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
LogPrint (eLogInfo, "NTCP2: Not connected in ", timeout, " seconds");
//i2p::data::netdb.SetUnreachable (conn->GetRemoteIdentity ()->GetIdentHash (), true);
conn->Terminate ();
}
});
conn->GetSocket ().async_connect (boost::asio::ip::tcp::endpoint (address, port), std::bind (&NTCP2Server::HandleConnect, this, std::placeholders::_1, conn, timer));
} }
}); });
} }
void NTCP2Server::HandleConnect (const boost::system::error_code& ecode, std::shared_ptr<NTCP2Session> conn) void NTCP2Server::HandleConnect (const boost::system::error_code& ecode, std::shared_ptr<NTCP2Session> conn, std::shared_ptr<boost::asio::deadline_timer> timer)
{ {
timer->cancel ();
if (ecode) if (ecode)
{ {
LogPrint (eLogInfo, "NTCP2: Connect error ", ecode.message ()); LogPrint (eLogInfo, "NTCP2: Connect error ", ecode.message ());
@ -1024,7 +1048,7 @@ namespace transport
if (conn) if (conn)
{ {
conn->ServerLogin (); conn->ServerLogin ();
// m_PendingIncomingSessions.push_back (conn); m_PendingIncomingSessions.push_back (conn);
} }
} }
else else
@ -1051,7 +1075,7 @@ namespace transport
if (conn) if (conn)
{ {
conn->ServerLogin (); conn->ServerLogin ();
// m_PendingIncomingSessions.push_back (conn); m_PendingIncomingSessions.push_back (conn);
} }
} }
else else
@ -1065,6 +1089,44 @@ namespace transport
conn, std::placeholders::_1)); conn, std::placeholders::_1));
} }
} }
void NTCP2Server::ScheduleTermination ()
{
m_TerminationTimer.expires_from_now (boost::posix_time::seconds(NTCP2_TERMINATION_CHECK_TIMEOUT));
m_TerminationTimer.async_wait (std::bind (&NTCP2Server::HandleTerminationTimer,
this, std::placeholders::_1));
}
void NTCP2Server::HandleTerminationTimer (const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
auto ts = i2p::util::GetSecondsSinceEpoch ();
// established
for (auto& it: m_NTCP2Sessions)
if (it.second->IsTerminationTimeoutExpired (ts))
{
auto session = it.second;
LogPrint (eLogDebug, "NTCP2: No activity for ", session->GetTerminationTimeout (), " seconds");
session->TerminateByTimeout (); // it doesn't change m_NTCP2Session right a way
}
// pending
for (auto it = m_PendingIncomingSessions.begin (); it != m_PendingIncomingSessions.end ();)
{
if ((*it)->IsEstablished () || (*it)->IsTerminated ())
it = m_PendingIncomingSessions.erase (it); // established or terminated
else if ((*it)->IsTerminationTimeoutExpired (ts))
{
(*it)->Terminate ();
it = m_PendingIncomingSessions.erase (it); // expired
}
else
it++;
}
ScheduleTermination ();
}
}
} }
} }

16
libi2pd/NTCP2.h

@ -18,6 +18,12 @@ namespace transport
const size_t NTCP2_UNENCRYPTED_FRAME_MAX_SIZE = 65519; const size_t NTCP2_UNENCRYPTED_FRAME_MAX_SIZE = 65519;
const int NTCP2_MAX_PADDING_RATIO = 6; // in % const int NTCP2_MAX_PADDING_RATIO = 6; // in %
const int NTCP2_CONNECT_TIMEOUT = 5; // 5 seconds
const int NTCP2_ESTABLISH_TIMEOUT = 10; // 10 seconds
const int NTCP2_TERMINATION_TIMEOUT = 120; // 2 minutes
const int NTCP2_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds
enum NTCP2BlockType enum NTCP2BlockType
{ {
eNTCP2BlkDateTime = 0, eNTCP2BlkDateTime = 0,
@ -39,7 +45,7 @@ namespace transport
eNTCP2IncompatibleSignatureType, // 6 eNTCP2IncompatibleSignatureType, // 6
eNTCP2ClockSkew, // 7 eNTCP2ClockSkew, // 7
eNTCP2PaddingViolation, // 8 eNTCP2PaddingViolation, // 8
eNTCP2AEADFraminError, // 9 eNTCP2AEADFramingError, // 9
eNTCP2PayloadFormatError, // 10 eNTCP2PayloadFormatError, // 10
eNTCP2Message1Error, // 11 eNTCP2Message1Error, // 11
eNTCP2Message2Error, // 12 eNTCP2Message2Error, // 12
@ -92,6 +98,7 @@ namespace transport
NTCP2Session (NTCP2Server& server, std::shared_ptr<const i2p::data::RouterInfo> in_RemoteRouter = nullptr); NTCP2Session (NTCP2Server& server, std::shared_ptr<const i2p::data::RouterInfo> in_RemoteRouter = nullptr);
~NTCP2Session (); ~NTCP2Session ();
void Terminate (); void Terminate ();
void TerminateByTimeout ();
void Done (); void Done ();
boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; }; boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; };
@ -186,7 +193,11 @@ namespace transport
void HandleAccept (std::shared_ptr<NTCP2Session> conn, const boost::system::error_code& error); void HandleAccept (std::shared_ptr<NTCP2Session> conn, const boost::system::error_code& error);
void HandleAcceptV6 (std::shared_ptr<NTCP2Session> conn, const boost::system::error_code& error); void HandleAcceptV6 (std::shared_ptr<NTCP2Session> conn, const boost::system::error_code& error);
void HandleConnect (const boost::system::error_code& ecode, std::shared_ptr<NTCP2Session> conn); void HandleConnect (const boost::system::error_code& ecode, std::shared_ptr<NTCP2Session> conn, std::shared_ptr<boost::asio::deadline_timer> timer);
// timer
void ScheduleTermination ();
void HandleTerminationTimer (const boost::system::error_code& ecode);
private: private:
@ -194,6 +205,7 @@ namespace transport
std::thread * m_Thread; std::thread * m_Thread;
boost::asio::io_service m_Service; boost::asio::io_service m_Service;
boost::asio::io_service::work m_Work; boost::asio::io_service::work m_Work;
boost::asio::deadline_timer m_TerminationTimer;
std::unique_ptr<boost::asio::ip::tcp::acceptor> m_NTCP2Acceptor, m_NTCP2V6Acceptor; std::unique_ptr<boost::asio::ip::tcp::acceptor> m_NTCP2Acceptor, m_NTCP2V6Acceptor;
std::map<i2p::data::IdentHash, std::shared_ptr<NTCP2Session> > m_NTCP2Sessions; std::map<i2p::data::IdentHash, std::shared_ptr<NTCP2Session> > m_NTCP2Sessions;
std::list<std::shared_ptr<NTCP2Session> > m_PendingIncomingSessions; std::list<std::shared_ptr<NTCP2Session> > m_PendingIncomingSessions;

Loading…
Cancel
Save