diff --git a/libi2pd/Config.cpp b/libi2pd/Config.cpp index 558d5e0a..d9cc6dec 100644 --- a/libi2pd/Config.cpp +++ b/libi2pd/Config.cpp @@ -73,6 +73,7 @@ namespace config { ("limits.transittunnels", value()->default_value(2500), "Maximum active transit sessions (default:2500)") ("limits.ntcpsoft", value()->default_value(0), "Threshold to start probabalistic backoff with ntcp sessions (default: use system limit)") ("limits.ntcphard", value()->default_value(0), "Maximum number of ntcp sessions (default: use system limit)") + ("limits.ntcpthreads", value()->default_value(1), "Maximum number of threads used by NTCP DH worker (default: 1)") ; options_description httpserver("HTTP Server options"); diff --git a/libi2pd/CryptoWorker.h b/libi2pd/CryptoWorker.h new file mode 100644 index 00000000..d43e356c --- /dev/null +++ b/libi2pd/CryptoWorker.h @@ -0,0 +1,81 @@ +#ifndef CRYPTO_WORKER_H_ +#define CRYPTO_WORKER_H_ + +#include +#include +#include +#include +#include +#include + +namespace i2p +{ +namespace worker +{ + template + struct ThreadPool + { + typedef std::function ResultFunc; + typedef std::function WorkFunc; + typedef std::pair, WorkFunc> Job; + typedef std::mutex mtx_t; + typedef std::unique_lock lock_t; + typedef std::condition_variable cond_t; + ThreadPool(int workers) + { + stop = false; + if(workers > 0) + { + while(workers--) + { + threads.emplace_back([this] { + for (;;) + { + Job job; + { + lock_t lock(this->queue_mutex); + this->condition.wait( + lock, [this] { return this->stop || !this->jobs.empty(); }); + if (this->stop && this->jobs.empty()) return; + job = std::move(this->jobs.front()); + this->jobs.pop_front(); + } + ResultFunc result = job.second(); + job.first->GetService().post(result); + } + }); + } + } + }; + + void Offer(const Job & job) + { + { + lock_t lock(queue_mutex); + if (stop) return; + jobs.emplace_back(job); + } + condition.notify_one(); + } + + ~ThreadPool() + { + { + lock_t lock(queue_mutex); + stop = true; + } + condition.notify_all(); + for(auto &t: threads) t.join(); + } + + std::vector threads; + std::deque jobs; + mtx_t queue_mutex; + cond_t condition; + bool stop; + }; +} +} + + +#endif diff --git a/libi2pd/NTCPSession.cpp b/libi2pd/NTCPSession.cpp index 85c32743..e3d6c004 100644 --- a/libi2pd/NTCPSession.cpp +++ b/libi2pd/NTCPSession.cpp @@ -105,6 +105,11 @@ namespace transport transports.PeerConnected (shared_from_this ()); } + boost::asio::io_service & NTCPSession::GetService() + { + return m_Server.GetService(); + } + void NTCPSession::ClientLogin () { if (!m_DHKeysPair) @@ -171,27 +176,14 @@ namespace transport return; } } -#if ((__GNUC__ == 4) && (__GNUC_MINOR__ <= 7)) || defined(__NetBSD__) -// due the bug in gcc 4.7. std::shared_future.get() is not const - if (!m_DHKeysPair) - m_DHKeysPair = transports.GetNextDHKeysPair (); - CreateAESKey (m_Establisher->phase1.pubKey); - SendPhase2 (); -#else // TODO: check for number of pending keys auto s = shared_from_this (); - auto keyCreated = std::async (std::launch::async, [s] () - { + m_Server.Work(s, [s]() -> std::function { if (!s->m_DHKeysPair) s->m_DHKeysPair = transports.GetNextDHKeysPair (); s->CreateAESKey (s->m_Establisher->phase1.pubKey); - }).share (); - m_Server.GetService ().post ([s, keyCreated]() - { - keyCreated.get (); - s->SendPhase2 (); + return std::bind(&NTCPSession::SendPhase2, s); }); -#endif } } @@ -211,9 +203,8 @@ namespace transport m_Decryption.SetIV (m_Establisher->phase1.HXxorHI + 16); m_Encryption.Encrypt ((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted); - boost::asio::async_write (m_Socket, boost::asio::buffer (&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsB)); - + boost::asio::async_write(m_Socket, boost::asio::buffer (&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all(), + std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this(), std::placeholders::_1, std::placeholders::_2, tsB)); } void NTCPSession::HandlePhase2Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB) @@ -250,24 +241,11 @@ namespace transport } else { -#if ((__GNUC__ == 4) && (__GNUC_MINOR__ <= 7)) || defined(__NetBSD__) -// due the bug in gcc 4.7. std::shared_future.get() is not const - CreateAESKey (m_Establisher->phase2.pubKey); - HandlePhase2 (); -#else auto s = shared_from_this (); - // create AES key in separate thread - auto keyCreated = std::async (std::launch::async, [s] () - { - s->CreateAESKey (s->m_Establisher->phase2.pubKey); - }).share (); // TODO: use move capture in C++ 14 instead shared_future - // let other operations execute while a key gets created - m_Server.GetService ().post ([s, keyCreated]() - { - keyCreated.get (); // we might wait if no more pending operations - s->HandlePhase2 (); - }); -#endif + m_Server.Work(s, [s]() -> std::function { + s->CreateAESKey (s->m_Establisher->phase2.pubKey); + return std::bind(&NTCPSession::HandlePhase2, s); + }); } } @@ -788,12 +766,14 @@ namespace transport } //----------------------------------------- - NTCPServer::NTCPServer (): + NTCPServer::NTCPServer (int workers): m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), m_TerminationTimer (m_Service), m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr), m_ProxyType(eNoProxy), m_Resolver(m_Service), m_ProxyEndpoint(nullptr), m_SoftLimit(0), m_HardLimit(0) { + if(workers <= 0) workers = 1; + m_CryptoPool = std::make_shared(workers); } NTCPServer::~NTCPServer () diff --git a/libi2pd/NTCPSession.h b/libi2pd/NTCPSession.h index b64f63aa..5335fbdd 100644 --- a/libi2pd/NTCPSession.h +++ b/libi2pd/NTCPSession.h @@ -12,6 +12,7 @@ #include "RouterInfo.h" #include "I2NPProtocol.h" #include "TransportSession.h" +#include "CryptoWorker.h" namespace i2p { @@ -55,6 +56,7 @@ namespace transport void Done (); boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; }; + boost::asio::io_service & GetService(); bool IsEstablished () const { return m_IsEstablished; }; bool IsTerminated () const { return m_IsTerminated; }; @@ -131,6 +133,8 @@ namespace transport { public: + typedef i2p::worker::ThreadPool Pool; + enum RemoteAddressType { eIP4Address, @@ -146,7 +150,7 @@ namespace transport }; - NTCPServer (); + NTCPServer (int workers=4); ~NTCPServer (); void Start (); @@ -169,6 +173,10 @@ namespace transport void SetSessionLimits(uint16_t softLimit, uint16_t hardLimit) { m_SoftLimit = softLimit; m_HardLimit = hardLimit; } bool ShouldLimit() const { return ShouldHardLimit() || ShouldSoftLimit(); } + void Work(std::shared_ptr conn, Pool::WorkFunc work) + { + m_CryptoPool->Offer({conn, work}); + } private: /** @brief return true for hard limit */ @@ -210,6 +218,8 @@ namespace transport boost::asio::ip::tcp::resolver m_Resolver; boost::asio::ip::tcp::endpoint * m_ProxyEndpoint; + std::shared_ptr m_CryptoPool; + uint16_t m_SoftLimit, m_HardLimit; public: diff --git a/libi2pd/Transports.cpp b/libi2pd/Transports.cpp index 6dcbe56d..9914c75e 100644 --- a/libi2pd/Transports.cpp +++ b/libi2pd/Transports.cpp @@ -153,9 +153,10 @@ namespace transport m_Thread = new std::thread (std::bind (&Transports::Run, this)); std::string ntcpproxy; i2p::config::GetOption("ntcpproxy", ntcpproxy); i2p::http::URL proxyurl; - uint16_t softLimit, hardLimit; + uint16_t softLimit, hardLimit, threads; i2p::config::GetOption("limits.ntcpsoft", softLimit); i2p::config::GetOption("limits.ntcphard", hardLimit); + i2p::config::GetOption("limits.ntcpthreads", threads); if(softLimit > 0 && hardLimit > 0 && softLimit >= hardLimit) { LogPrint(eLogError, "ntcp soft limit must be less than ntcp hard limit"); @@ -167,7 +168,7 @@ namespace transport { if(proxyurl.schema == "socks" || proxyurl.schema == "http") { - m_NTCPServer = new NTCPServer(); + m_NTCPServer = new NTCPServer(threads); m_NTCPServer->SetSessionLimits(softLimit, hardLimit); NTCPServer::ProxyType proxytype = NTCPServer::eSocksProxy; @@ -198,7 +199,7 @@ namespace transport if (!address) continue; if (m_NTCPServer == nullptr && enableNTCP) { - m_NTCPServer = new NTCPServer (); + m_NTCPServer = new NTCPServer (threads); m_NTCPServer->SetSessionLimits(softLimit, hardLimit); m_NTCPServer->Start (); if (!(m_NTCPServer->IsBoundV6() || m_NTCPServer->IsBoundV4())) {