From f9731a76ec6dbf6e448780b55fdae438513a18b4 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 8 Feb 2015 20:28:18 -0500 Subject: [PATCH] separate receivers thread --- SSU.cpp | 79 ++++++++++++++++++++++++++++++++++++++++++--------------- SSU.h | 23 ++++++++++------- 2 files changed, 72 insertions(+), 30 deletions(-) diff --git a/SSU.cpp b/SSU.cpp index 91856345..54c4fb6c 100644 --- a/SSU.cpp +++ b/SSU.cpp @@ -9,10 +9,11 @@ namespace i2p { namespace transport { - SSUServer::SSUServer (int port): m_Thread (nullptr), m_ThreadV6 (nullptr), m_Work (m_Service), - m_WorkV6 (m_ServiceV6),m_Endpoint (boost::asio::ip::udp::v4 (), port), - m_EndpointV6 (boost::asio::ip::udp::v6 (), port), m_Socket (m_Service, m_Endpoint), - m_SocketV6 (m_ServiceV6), m_IntroducersUpdateTimer (m_Service) + SSUServer::SSUServer (int port): m_Thread (nullptr), m_ThreadV6 (nullptr), m_ReceiversThread (nullptr), + m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService), + m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port), + m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService), + m_IntroducersUpdateTimer (m_Service) { m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (65535)); m_Socket.set_option (boost::asio::socket_base::send_buffer_size (65535)); @@ -33,12 +34,13 @@ namespace transport void SSUServer::Start () { m_IsRunning = true; + m_ReceiversThread = new std::thread (std::bind (&SSUServer::RunReceivers, this)); m_Thread = new std::thread (std::bind (&SSUServer::Run, this)); - m_Service.post (std::bind (&SSUServer::Receive, this)); + m_ReceiversService.post (std::bind (&SSUServer::Receive, this)); if (context.SupportsV6 ()) { m_ThreadV6 = new std::thread (std::bind (&SSUServer::RunV6, this)); - m_ServiceV6.post (std::bind (&SSUServer::ReceiveV6, this)); + m_ReceiversService.post (std::bind (&SSUServer::ReceiveV6, this)); } if (i2p::context.IsUnreachable ()) ScheduleIntroducersUpdateTimer (); @@ -52,6 +54,12 @@ namespace transport m_Socket.close (); m_ServiceV6.stop (); m_SocketV6.close (); + if (m_ReceiversThread) + { + m_ReceiversThread->join (); + delete m_ReceiversThread; + m_ReceiversThread = nullptr; + } if (m_Thread) { m_Thread->join (); @@ -95,6 +103,21 @@ namespace transport } } } + + void SSUServer::RunReceivers () + { + while (m_IsRunning) + { + try + { + m_ReceiversService.run (); + } + catch (std::exception& ex) + { + LogPrint (eLogError, "SSU receivers: ", ex.what ()); + } + } + } void SSUServer::AddRelay (uint32_t tag, const boost::asio::ip::udp::endpoint& relay) { @@ -119,55 +142,66 @@ namespace transport void SSUServer::Receive () { - m_Socket.async_receive_from (boost::asio::buffer (m_ReceiveBuffer, SSU_MTU_V4), m_SenderEndpoint, - boost::bind (&SSUServer::HandleReceivedFrom, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + SSUPacket * packet = new SSUPacket (); + m_Socket.async_receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from, + std::bind (&SSUServer::HandleReceivedFrom, this, std::placeholders::_1, std::placeholders::_2, packet)); } void SSUServer::ReceiveV6 () { - m_SocketV6.async_receive_from (boost::asio::buffer (m_ReceiveBufferV6, SSU_MTU_V6), m_SenderEndpointV6, - boost::bind (&SSUServer::HandleReceivedFromV6, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + SSUPacket * packet = new SSUPacket (); + m_SocketV6.async_receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from, + std::bind (&SSUServer::HandleReceivedFromV6, this, std::placeholders::_1, std::placeholders::_2, packet)); } - void SSUServer::HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred) + void SSUServer::HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet) { if (!ecode) { - HandleReceivedBuffer (m_SenderEndpoint, m_ReceiveBuffer, bytes_transferred); + packet->len = bytes_transferred; + m_Service.post (std::bind (&SSUServer::HandleReceivedBuffer, this, packet)); Receive (); } else + { LogPrint ("SSU receive error: ", ecode.message ()); + delete packet; + } } - void SSUServer::HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred) + void SSUServer::HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet) { if (!ecode) { - HandleReceivedBuffer (m_SenderEndpointV6, m_ReceiveBufferV6, bytes_transferred); + packet->len = bytes_transferred; + m_ServiceV6.post (std::bind (&SSUServer::HandleReceivedBuffer, this, packet)); ReceiveV6 (); } else + { LogPrint ("SSU V6 receive error: ", ecode.message ()); + delete packet; + } } - void SSUServer::HandleReceivedBuffer (boost::asio::ip::udp::endpoint& from, uint8_t * buf, std::size_t bytes_transferred) + void SSUServer::HandleReceivedBuffer (SSUPacket * packet) { std::shared_ptr session; - auto it = m_Sessions.find (from); + auto it = m_Sessions.find (packet->from); if (it != m_Sessions.end ()) session = it->second; if (!session) { - session = std::make_shared (*this, from); + session = std::make_shared (*this, packet->from); session->WaitForConnect (); { std::unique_lock l(m_SessionsMutex); - m_Sessions[from] = session; + m_Sessions[packet->from] = session; } - LogPrint ("New SSU session from ", from.address ().to_string (), ":", from.port (), " created"); + LogPrint ("New SSU session from ", packet->from.address ().to_string (), ":", packet->from.port (), " created"); } - session->ProcessNextMessage (buf, bytes_transferred, from); + session->ProcessNextMessage (packet->buf, packet->len, packet->from); + delete packet; } std::shared_ptr SSUServer::FindSession (std::shared_ptr router) const @@ -256,7 +290,10 @@ namespace transport "] through introducer ", introducer->iHost, ":", introducer->iPort); session->WaitForIntroduction (); if (i2p::context.GetRouterInfo ().UsesIntroducer ()) // if we are unreachable - Send (m_ReceiveBuffer, 0, remoteEndpoint); // send HolePunch + { + uint8_t buf[1]; + Send (buf, 0, remoteEndpoint); // send HolePunch + } introducerSession->Introduce (introducer->iTag, introducer->iKey); } else diff --git a/SSU.h b/SSU.h index a1511f51..01ca2607 100644 --- a/SSU.h +++ b/SSU.h @@ -23,6 +23,13 @@ namespace transport const int SSU_KEEP_ALIVE_INTERVAL = 30; // 30 seconds const int SSU_TO_INTRODUCER_SESSION_DURATION = 3600; // 1 hour const size_t SSU_MAX_NUM_INTRODUCERS = 3; + + struct SSUPacket + { + i2p::crypto::AESAlignedBuffer<1500> buf; + boost::asio::ip::udp::endpoint from; + size_t len; + }; class SSUServer { @@ -50,11 +57,12 @@ namespace transport void Run (); void RunV6 (); + void RunReceivers (); void Receive (); void ReceiveV6 (); - void HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred); - void HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred); - void HandleReceivedBuffer (boost::asio::ip::udp::endpoint& from, uint8_t * buf, std::size_t bytes_transferred); + void HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet); + void HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet); + void HandleReceivedBuffer (SSUPacket * packet); template std::shared_ptr GetRandomSession (Filter filter); @@ -66,16 +74,13 @@ namespace transport private: bool m_IsRunning; - std::thread * m_Thread, * m_ThreadV6; - boost::asio::io_service m_Service, m_ServiceV6; - boost::asio::io_service::work m_Work, m_WorkV6; + std::thread * m_Thread, * m_ThreadV6, * m_ReceiversThread; + boost::asio::io_service m_Service, m_ServiceV6, m_ReceiversService; + boost::asio::io_service::work m_Work, m_WorkV6, m_ReceiversWork; boost::asio::ip::udp::endpoint m_Endpoint, m_EndpointV6; boost::asio::ip::udp::socket m_Socket, m_SocketV6; - boost::asio::ip::udp::endpoint m_SenderEndpoint, m_SenderEndpointV6; boost::asio::deadline_timer m_IntroducersUpdateTimer; std::list m_Introducers; // introducers we are connected to - i2p::crypto::AESAlignedBuffer<2*SSU_MTU_V4> m_ReceiveBuffer; - i2p::crypto::AESAlignedBuffer<2*SSU_MTU_V6> m_ReceiveBufferV6; std::mutex m_SessionsMutex; std::map > m_Sessions; std::map m_Relays; // we are introducer