From 6683a9cf76cd141b5b613ec9279912a03f21273d Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 11 Jan 2015 17:41:56 -0500 Subject: [PATCH] moved NTCP to separate thread --- HTTPServer.cpp | 32 +++++---- NTCPSession.cpp | 182 ++++++++++++++++++++++++++++++++++++++++++++++-- NTCPSession.h | 48 ++++++++++++- Transports.cpp | 139 +++++------------------------------- Transports.h | 16 +---- 5 files changed, 261 insertions(+), 156 deletions(-) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 9da6f206..042a03c4 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -709,22 +709,26 @@ namespace util void HTTPConnection::ShowTransports (std::stringstream& s) { - s << "NTCP
"; - for (auto it: i2p::transport::transports.GetNTCPSessions ()) - { - if (it.second && it.second->IsEstablished ()) + auto ntcpServer = i2p::transport::transports.GetNTCPServer (); + if (ntcpServer) + { + s << "NTCP
"; + for (auto it: ntcpServer->GetNTCPSessions ()) { - // incoming connection doesn't have remote RI - auto outgoing = it.second->GetRemoteRouter (); - if (outgoing) s << "-->"; - s << it.second->GetRemoteIdentity ().GetIdentHash ().ToBase64 ().substr (0, 4) << ": " - << it.second->GetSocket ().remote_endpoint().address ().to_string (); - if (!outgoing) s << "-->"; - s << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]"; - s << "
"; + if (it.second && it.second->IsEstablished ()) + { + // incoming connection doesn't have remote RI + auto outgoing = it.second->GetRemoteRouter (); + if (outgoing) s << "-->"; + s << it.second->GetRemoteIdentity ().GetIdentHash ().ToBase64 ().substr (0, 4) << ": " + << it.second->GetSocket ().remote_endpoint().address ().to_string (); + if (!outgoing) s << "-->"; + s << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]"; + s << "
"; + } + s << std::endl; } - s << std::endl; - } + } auto ssuServer = i2p::transport::transports.GetSSUServer (); if (ssuServer) { diff --git a/NTCPSession.cpp b/NTCPSession.cpp index 3f326098..753d1950 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -18,9 +18,9 @@ namespace i2p { namespace transport { - NTCPSession::NTCPSession (boost::asio::io_service& service, std::shared_ptr in_RemoteRouter): - TransportSession (in_RemoteRouter), m_Socket (service), - m_TerminationTimer (service), m_IsEstablished (false), m_ReceiveBufferOffset (0), + NTCPSession::NTCPSession (NTCPServer& server, std::shared_ptr in_RemoteRouter): + TransportSession (in_RemoteRouter), m_Server (server), m_Socket (m_Server.GetService ()), + m_TerminationTimer (m_Server.GetService ()), m_IsEstablished (false), m_ReceiveBufferOffset (0), m_NextMessage (nullptr), m_NumSentBytes (0), m_NumReceivedBytes (0) { m_DHKeysPair = transports.GetNextDHKeysPair (); @@ -89,7 +89,7 @@ namespace transport if (numDelayed > 0) LogPrint (eLogWarning, "NTCP session ", numDelayed, " not sent"); // TODO: notify tunnels - transports.RemoveNTCPSession (shared_from_this ()); + m_Server.RemoveNTCPSession (shared_from_this ()); LogPrint ("NTCP session terminated"); } @@ -427,7 +427,7 @@ namespace transport { LogPrint (eLogDebug, "Phase 4 sent: ", bytes_transferred); LogPrint ("NTCP server session connected"); - transports.AddNTCPSession (shared_from_this ()); + m_Server.AddNTCPSession (shared_from_this ()); Connected (); m_ReceiveBufferOffset = 0; @@ -654,5 +654,177 @@ namespace transport m_Socket.close ();// invoke Terminate () from HandleReceive } } + +//----------------------------------------- + NTCPServer::NTCPServer (int port): + m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), + m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr) + { + } + + NTCPServer::~NTCPServer () + { + Stop (); + } + + void NTCPServer::Start () + { + if (!m_IsRunning) + { + m_IsRunning = true; + m_Thread = new std::thread (std::bind (&NTCPServer::Run, this)); + // create acceptors + auto addresses = context.GetRouterInfo ().GetAddresses (); + for (auto& address : addresses) + { + if (address.transportStyle == i2p::data::RouterInfo::eTransportNTCP && address.host.is_v4 ()) + { + m_NTCPAcceptor = new boost::asio::ip::tcp::acceptor (m_Service, + boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), address.port)); + + LogPrint ("Start listening TCP port ", address.port); + auto conn = std::make_shared(*this); + m_NTCPAcceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAccept, this, + conn, std::placeholders::_1)); + + if (context.SupportsV6 ()) + { + m_NTCPV6Acceptor = new boost::asio::ip::tcp::acceptor (m_Service); + m_NTCPV6Acceptor->open (boost::asio::ip::tcp::v6()); + m_NTCPV6Acceptor->set_option (boost::asio::ip::v6_only (true)); + m_NTCPV6Acceptor->bind (boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v6(), address.port)); + m_NTCPV6Acceptor->listen (); + + LogPrint ("Start listening V6 TCP port ", address.port); + auto conn = std::make_shared (*this); + m_NTCPV6Acceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAcceptV6, + this, conn, std::placeholders::_1)); + } + } + } + } + } + + void NTCPServer::Stop () + { + m_NTCPSessions.clear (); + + if (m_IsRunning) + { + m_IsRunning = false; + delete m_NTCPAcceptor; + m_NTCPAcceptor = nullptr; + delete m_NTCPV6Acceptor; + m_NTCPV6Acceptor = nullptr; + + m_Service.stop (); + if (m_Thread) + { + m_Thread->join (); + delete m_Thread; + m_Thread = nullptr; + } + } + } + + + void NTCPServer::Run () + { + while (m_IsRunning) + { + try + { + m_Service.run (); + } + catch (std::exception& ex) + { + LogPrint ("NTCP server: ", ex.what ()); + } + } + } + + void NTCPServer::AddNTCPSession (std::shared_ptr session) + { + if (session) + m_NTCPSessions[session->GetRemoteIdentity ().GetIdentHash ()] = session; + } + + void NTCPServer::RemoveNTCPSession (std::shared_ptr session) + { + if (session) + m_NTCPSessions.erase (session->GetRemoteIdentity ().GetIdentHash ()); + } + + std::shared_ptr NTCPServer::FindNTCPSession (const i2p::data::IdentHash& ident) + { + auto it = m_NTCPSessions.find (ident); + if (it != m_NTCPSessions.end ()) + return it->second; + return nullptr; + } + + void NTCPServer::HandleAccept (std::shared_ptr conn, const boost::system::error_code& error) + { + if (!error) + { + LogPrint ("Connected from ", conn->GetSocket ().remote_endpoint().address ().to_string ()); + conn->ServerLogin (); + } + + + if (error != boost::asio::error::operation_aborted) + { + conn = std::make_shared (*this); + m_NTCPAcceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAccept, this, + conn, std::placeholders::_1)); + } + } + + void NTCPServer::HandleAcceptV6 (std::shared_ptr conn, const boost::system::error_code& error) + { + if (!error) + { + LogPrint ("Connected from ", conn->GetSocket ().remote_endpoint().address ().to_string ()); + conn->ServerLogin (); + } + + if (error != boost::asio::error::operation_aborted) + { + conn = std::make_shared (*this); + m_NTCPV6Acceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAcceptV6, this, + conn, std::placeholders::_1)); + } + } + + void NTCPServer::Connect (const boost::asio::ip::address& address, int port, std::shared_ptr conn) + { + LogPrint ("Connecting to ", address ,":", port); + m_Service.post([conn, this]() + { + this->AddNTCPSession (conn); + }); + conn->GetSocket ().async_connect (boost::asio::ip::tcp::endpoint (address, port), + std::bind (&NTCPServer::HandleConnect, this, std::placeholders::_1, conn)); + } + + void NTCPServer::HandleConnect (const boost::system::error_code& ecode, std::shared_ptr conn) + { + if (ecode) + { + LogPrint ("Connect error: ", ecode.message ()); + if (ecode != boost::asio::error::operation_aborted) + { + i2p::data::netdb.SetUnreachable (conn->GetRemoteIdentity ().GetIdentHash (), true); + conn->Terminate (); + } + } + else + { + LogPrint ("Connected"); + if (conn->GetSocket ().local_endpoint ().protocol () == boost::asio::ip::tcp::v6()) // ipv6 + context.UpdateNTCPV6Address (conn->GetSocket ().local_endpoint ().address ()); + conn->ClientLogin (); + } + } } } diff --git a/NTCPSession.h b/NTCPSession.h index 1d94475d..b08f6e44 100644 --- a/NTCPSession.h +++ b/NTCPSession.h @@ -3,7 +3,10 @@ #include #include +#include #include +#include +#include #include #include #include @@ -43,11 +46,12 @@ namespace transport const int NTCP_TERMINATION_TIMEOUT = 120; // 2 minutes const size_t NTCP_DEFAULT_PHASE3_SIZE = 2/*size*/ + i2p::data::DEFAULT_IDENTITY_SIZE/*387*/ + 4/*ts*/ + 15/*padding*/ + 40/*signature*/; // 448 + class NTCPServer; class NTCPSession: public TransportSession, public std::enable_shared_from_this { public: - NTCPSession (boost::asio::io_service& service, std::shared_ptr in_RemoteRouter = nullptr); + NTCPSession (NTCPServer& server, std::shared_ptr in_RemoteRouter = nullptr); ~NTCPSession (); void Terminate (); @@ -103,6 +107,7 @@ namespace transport private: + NTCPServer& m_Server; boost::asio::ip::tcp::socket m_Socket; boost::asio::deadline_timer m_TerminationTimer; bool m_IsEstablished; @@ -127,6 +132,47 @@ namespace transport size_t m_NumSentBytes, m_NumReceivedBytes; }; + + // TODO: move to NTCP.h/.cpp + class NTCPServer + { + public: + + NTCPServer (int port); + ~NTCPServer (); + + void Start (); + void Stop (); + + void AddNTCPSession (std::shared_ptr session); + void RemoveNTCPSession (std::shared_ptr session); + std::shared_ptr FindNTCPSession (const i2p::data::IdentHash& ident); + void Connect (const boost::asio::ip::address& address, int port, std::shared_ptr conn); + + boost::asio::io_service& GetService () { return m_Service; }; + + private: + + void Run (); + void HandleAccept (std::shared_ptr conn, const boost::system::error_code& error); + void HandleAcceptV6 (std::shared_ptr conn, const boost::system::error_code& error); + + void HandleConnect (const boost::system::error_code& ecode, std::shared_ptr conn); + + private: + + bool m_IsRunning; + std::thread * m_Thread; + boost::asio::io_service m_Service; + boost::asio::io_service::work m_Work; + boost::asio::ip::tcp::acceptor * m_NTCPAcceptor, * m_NTCPV6Acceptor; + std::map > m_NTCPSessions; + + public: + + // for HTTP/I2PControl + const decltype(m_NTCPSessions)& GetNTCPSessions () const { return m_NTCPSessions; }; + }; } } diff --git a/Transports.cpp b/Transports.cpp index 680389e7..f36cfd2a 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -96,8 +96,9 @@ namespace transport Transports transports; Transports::Transports (): - m_Thread (nullptr), m_Work (m_Service), m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr), - m_SSUServer (nullptr), m_DHKeysPairSupplier (5) // 5 pre-generated keys + m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), + m_NTCPServer (nullptr), m_SSUServer (nullptr), + m_DHKeysPairSupplier (5) // 5 pre-generated keys { } @@ -115,31 +116,13 @@ namespace transport auto addresses = context.GetRouterInfo ().GetAddresses (); for (auto& address : addresses) { - if (address.transportStyle == RouterInfo::eTransportNTCP && address.host.is_v4 ()) + if (!m_NTCPServer) { - m_NTCPAcceptor = new boost::asio::ip::tcp::acceptor (m_Service, - boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), address.port)); - - LogPrint ("Start listening TCP port ", address.port); - auto conn = std::make_shared(m_Service); - m_NTCPAcceptor->async_accept(conn->GetSocket (), boost::bind (&Transports::HandleAccept, this, - conn, boost::asio::placeholders::error)); - - if (context.SupportsV6 ()) - { - m_NTCPV6Acceptor = new boost::asio::ip::tcp::acceptor (m_Service); - m_NTCPV6Acceptor->open (boost::asio::ip::tcp::v6()); - m_NTCPV6Acceptor->set_option (boost::asio::ip::v6_only (true)); - m_NTCPV6Acceptor->bind (boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v6(), address.port)); - m_NTCPV6Acceptor->listen (); - - LogPrint ("Start listening V6 TCP port ", address.port); - auto conn = std::make_shared (m_Service); - m_NTCPV6Acceptor->async_accept(conn->GetSocket (), boost::bind (&Transports::HandleAcceptV6, - this, conn, boost::asio::placeholders::error)); - } + m_NTCPServer = new NTCPServer (address.port); + m_NTCPServer->Start (); } - else if (address.transportStyle == RouterInfo::eTransportSSU && address.host.is_v4 ()) + + if (address.transportStyle == RouterInfo::eTransportSSU && address.host.is_v4 ()) { if (!m_SSUServer) { @@ -162,12 +145,12 @@ namespace transport delete m_SSUServer; m_SSUServer = nullptr; } - m_NTCPSessions.clear (); - - delete m_NTCPAcceptor; - m_NTCPAcceptor = nullptr; - delete m_NTCPV6Acceptor; - m_NTCPV6Acceptor = nullptr; + if (m_NTCPServer) + { + m_NTCPServer->Stop (); + delete m_NTCPServer; + m_NTCPServer = nullptr; + } m_DHKeysPairSupplier.Stop (); m_IsRunning = false; @@ -195,93 +178,6 @@ namespace transport } } - void Transports::AddNTCPSession (std::shared_ptr session) - { - if (session) - m_NTCPSessions[session->GetRemoteIdentity ().GetIdentHash ()] = session; - } - - void Transports::RemoveNTCPSession (std::shared_ptr session) - { - if (session) - m_NTCPSessions.erase (session->GetRemoteIdentity ().GetIdentHash ()); - } - - void Transports::HandleAccept (std::shared_ptr conn, const boost::system::error_code& error) - { - if (!error) - { - LogPrint ("Connected from ", conn->GetSocket ().remote_endpoint().address ().to_string ()); - conn->ServerLogin (); - } - - - if (error != boost::asio::error::operation_aborted) - { - conn = std::make_shared (m_Service); - m_NTCPAcceptor->async_accept(conn->GetSocket (), boost::bind (&Transports::HandleAccept, this, - conn, boost::asio::placeholders::error)); - } - } - - void Transports::HandleAcceptV6 (std::shared_ptr conn, const boost::system::error_code& error) - { - if (!error) - { - LogPrint ("Connected from ", conn->GetSocket ().remote_endpoint().address ().to_string ()); - conn->ServerLogin (); - } - - if (error != boost::asio::error::operation_aborted) - { - conn = std::make_shared (m_Service); - m_NTCPV6Acceptor->async_accept(conn->GetSocket (), boost::bind (&Transports::HandleAcceptV6, this, - conn, boost::asio::placeholders::error)); - } - } - - void Transports::Connect (const boost::asio::ip::address& address, int port, std::shared_ptr conn) - { - LogPrint ("Connecting to ", address ,":", port); - conn->GetSocket ().async_connect (boost::asio::ip::tcp::endpoint (address, port), - boost::bind (&Transports::HandleConnect, this, boost::asio::placeholders::error, conn)); - } - - void Transports::HandleConnect (const boost::system::error_code& ecode, std::shared_ptr conn) - { - if (ecode) - { - LogPrint ("Connect error: ", ecode.message ()); - if (ecode != boost::asio::error::operation_aborted) - { - i2p::data::netdb.SetUnreachable (conn->GetRemoteIdentity ().GetIdentHash (), true); - conn->Terminate (); - } - } - else - { - LogPrint ("Connected"); - if (conn->GetSocket ().local_endpoint ().protocol () == boost::asio::ip::tcp::v6()) // ipv6 - context.UpdateNTCPV6Address (conn->GetSocket ().local_endpoint ().address ()); - conn->ClientLogin (); - } - } - - std::shared_ptr Transports::GetNextNTCPSession () - { - for (auto session: m_NTCPSessions) - if (session.second->IsEstablished ()) - return session.second; - return 0; - } - - std::shared_ptr Transports::FindNTCPSession (const i2p::data::IdentHash& ident) - { - auto it = m_NTCPSessions.find (ident); - if (it != m_NTCPSessions.end ()) - return it->second; - return 0; - } void Transports::SendMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg) { @@ -296,7 +192,7 @@ namespace transport i2p::HandleI2NPMessage (msg); return; } - std::shared_ptr session = FindNTCPSession (ident); + std::shared_ptr session = m_NTCPServer->FindNTCPSession (ident); if (!session) { auto r = netdb.FindRouter (ident); @@ -311,10 +207,9 @@ namespace transport auto address = r->GetNTCPAddress (!context.SupportsV6 ()); if (address && !r->UsesIntroducer () && !r->IsUnreachable () && msg->GetLength () < NTCP_MAX_MESSAGE_SIZE) { - auto s = std::make_shared (m_Service, r); - AddNTCPSession (s); + auto s = std::make_shared (*m_NTCPServer, r); session = s; - Connect (address->host, address->port, s); + m_NTCPServer->Connect (address->host, address->port, s); } else { diff --git a/Transports.h b/Transports.h index 3b5b2d4e..07f2afd3 100644 --- a/Transports.h +++ b/Transports.h @@ -64,27 +64,16 @@ namespace transport i2p::transport::DHKeysPair * GetNextDHKeysPair (); void ReuseDHKeysPair (DHKeysPair * pair); - void AddNTCPSession (std::shared_ptr session); - void RemoveNTCPSession (std::shared_ptr session); - - std::shared_ptr GetNextNTCPSession (); - std::shared_ptr FindNTCPSession (const i2p::data::IdentHash& ident); - void SendMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg); void CloseSession (std::shared_ptr router); private: void Run (); - void HandleAccept (std::shared_ptr conn, const boost::system::error_code& error); - void HandleAcceptV6 (std::shared_ptr conn, const boost::system::error_code& error); void HandleResendTimer (const boost::system::error_code& ecode, boost::asio::deadline_timer * timer, const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg); void PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg); void PostCloseSession (std::shared_ptr router); - - void Connect (const boost::asio::ip::address& address, int port, std::shared_ptr conn); - void HandleConnect (const boost::system::error_code& ecode, std::shared_ptr conn); void DetectExternalIP (); @@ -94,9 +83,8 @@ namespace transport std::thread * m_Thread; boost::asio::io_service m_Service; boost::asio::io_service::work m_Work; - boost::asio::ip::tcp::acceptor * m_NTCPAcceptor, * m_NTCPV6Acceptor; - std::map > m_NTCPSessions; + NTCPServer * m_NTCPServer; SSUServer * m_SSUServer; DHKeysPairSupplier m_DHKeysPairSupplier; @@ -104,7 +92,7 @@ namespace transport public: // for HTTP only - const decltype(m_NTCPSessions)& GetNTCPSessions () const { return m_NTCPSessions; }; + const NTCPServer * GetNTCPServer () const { return m_NTCPServer; }; const SSUServer * GetSSUServer () const { return m_SSUServer; }; };