diff --git a/SSU.cpp b/SSU.cpp index 8d53fcb4..93eedb23 100644 --- a/SSU.cpp +++ b/SSU.cpp @@ -13,12 +13,13 @@ namespace transport SSUServer::SSUServer (const boost::asio::ip::address & addr, int port): m_OnlyV6(true), m_IsRunning(false), - m_Thread (nullptr), m_ThreadV6 (nullptr), m_ReceiversThread (nullptr), - m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService), - m_EndpointV6 (addr, port), - m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService), - m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service), - m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6) + m_Thread (nullptr), m_ThreadV6 (nullptr), m_ReceiversThread (nullptr), + m_ReceiversThreadV6 (nullptr), m_Work (m_Service), m_WorkV6 (m_ServiceV6), + m_ReceiversWork (m_ReceiversService), m_ReceiversWorkV6 (m_ReceiversServiceV6), + m_EndpointV6 (addr, port), m_Socket (m_ReceiversService, m_Endpoint), + m_SocketV6 (m_ReceiversServiceV6), m_IntroducersUpdateTimer (m_Service), + m_PeerTestsCleanupTimer (m_Service), m_TerminationTimer (m_Service), + m_TerminationTimerV6 (m_ServiceV6) { OpenSocketV6 (); } @@ -26,9 +27,10 @@ namespace transport SSUServer::SSUServer (int port): m_OnlyV6(false), m_IsRunning(false), m_Thread (nullptr), m_ThreadV6 (nullptr), m_ReceiversThread (nullptr), - m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService), + m_ReceiversThreadV6 (nullptr), m_Work (m_Service), m_WorkV6 (m_ServiceV6), + m_ReceiversWork (m_ReceiversService), m_ReceiversWorkV6 (m_ReceiversServiceV6), m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port), - m_Socket (m_ReceiversService), m_SocketV6 (m_ReceiversService), + m_Socket (m_ReceiversService), m_SocketV6 (m_ReceiversServiceV6), m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service), m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6) { @@ -61,17 +63,18 @@ namespace transport void SSUServer::Start () { m_IsRunning = true; - m_ReceiversThread = new std::thread (std::bind (&SSUServer::RunReceivers, this)); if (!m_OnlyV6) { + m_ReceiversThread = new std::thread (std::bind (&SSUServer::RunReceivers, this)); m_Thread = new std::thread (std::bind (&SSUServer::Run, this)); m_ReceiversService.post (std::bind (&SSUServer::Receive, this)); ScheduleTermination (); } if (context.SupportsV6 ()) { + m_ReceiversThreadV6 = new std::thread (std::bind (&SSUServer::RunReceiversV6, this)); m_ThreadV6 = new std::thread (std::bind (&SSUServer::RunV6, this)); - m_ReceiversService.post (std::bind (&SSUServer::ReceiveV6, this)); + m_ReceiversServiceV6.post (std::bind (&SSUServer::ReceiveV6, this)); ScheduleTerminationV6 (); } SchedulePeerTestsCleanupTimer (); @@ -89,6 +92,7 @@ namespace transport m_ServiceV6.stop (); m_SocketV6.close (); m_ReceiversService.stop (); + m_ReceiversServiceV6.stop (); if (m_ReceiversThread) { m_ReceiversThread->join (); @@ -101,6 +105,12 @@ namespace transport delete m_Thread; m_Thread = nullptr; } + if (m_ReceiversThreadV6) + { + m_ReceiversThreadV6->join (); + delete m_ReceiversThreadV6; + m_ReceiversThreadV6 = nullptr; + } if (m_ThreadV6) { m_ThreadV6->join (); @@ -154,6 +164,21 @@ namespace transport } } + void SSUServer::RunReceiversV6 () + { + while (m_IsRunning) + { + try + { + m_ReceiversServiceV6.run (); + } + catch (std::exception& ex) + { + LogPrint (eLogError, "SSU: v6 receivers runtime exception: ", ex.what ()); + } + } + } + void SSUServer::AddRelay (uint32_t tag, std::shared_ptr relay) { m_Relays[tag] = relay; diff --git a/SSU.h b/SSU.h index 44dab63b..c9881fd0 100644 --- a/SSU.h +++ b/SSU.h @@ -26,7 +26,7 @@ namespace transport const int SSU_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds const size_t SSU_MAX_NUM_INTRODUCERS = 3; const size_t SSU_SOCKET_RECEIVE_BUFFER_SIZE = 0x1FFFF; // 128K - const size_t SSU_SOCKET_SEND_BUFFER_SIZE = 0xFFFF; // 64K + const size_t SSU_SOCKET_SEND_BUFFER_SIZE = 0x1FFFF; // 128K struct SSUPacket { @@ -76,6 +76,7 @@ namespace transport void Run (); void RunV6 (); void RunReceivers (); + void RunReceiversV6 (); void Receive (); void ReceiveV6 (); void HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet); @@ -113,9 +114,9 @@ namespace transport bool m_OnlyV6; bool m_IsRunning; - std::thread * m_Thread, * m_ThreadV6, * m_ReceiversThread; - boost::asio::io_service m_Service, m_ServiceV6, m_ReceiversService; - boost::asio::io_service::work m_Work, m_WorkV6, m_ReceiversWork; + std::thread * m_Thread, * m_ThreadV6, * m_ReceiversThread, * m_ReceiversThreadV6; + boost::asio::io_service m_Service, m_ServiceV6, m_ReceiversService, m_ReceiversServiceV6; + boost::asio::io_service::work m_Work, m_WorkV6, m_ReceiversWork, m_ReceiversWorkV6; boost::asio::ip::udp::endpoint m_Endpoint, m_EndpointV6; boost::asio::ip::udp::socket m_Socket, m_SocketV6; boost::asio::deadline_timer m_IntroducersUpdateTimer, m_PeerTestsCleanupTimer,