diff --git a/libi2pd/CryptoWorker.h b/libi2pd/CryptoWorker.h new file mode 100644 index 00000000..f310032f --- /dev/null +++ b/libi2pd/CryptoWorker.h @@ -0,0 +1,82 @@ +#ifndef CRYPTO_WORKER_H_ +#define CRYPTO_WORKER_H_ + +#include +#include +#include +#include +#include +#include +#include + +namespace i2p +{ +namespace worker +{ + template + struct ThreadPool + { + typedef std::function ResultFunc; + typedef std::function WorkFunc; + typedef std::pair, WorkFunc> Job; + typedef std::mutex mtx_t; + typedef std::unique_lock lock_t; + typedef std::condition_variable cond_t; + ThreadPool(int workers) + { + stop = false; + if(workers > 0) + { + while(workers--) + { + threads.emplace_back([this] { + Job job; + for (;;) + { + { + lock_t lock(this->queue_mutex); + this->condition.wait( + lock, [this] { return this->stop || !this->jobs.empty(); }); + if (this->stop && this->jobs.empty()) return; + job = std::move(this->jobs.front()); + this->jobs.pop_front(); + } + } + job.first->GetService().post(job.second()); + }); + } + } + }; + + void Offer(const Job & job) + { + { + lock_t lock(queue_mutex); + if (stop) return; + jobs.emplace_back(job); + } + condition.notify_one(); + } + + ~ThreadPool() + { + { + lock_t lock(queue_mutex); + stop = true; + } + condition.notify_all(); + for(auto &t: threads) t.join(); + } + + std::vector threads; + std::deque jobs; + mtx_t queue_mutex; + cond_t condition; + bool stop; + + }; +} +} + + +#endif diff --git a/libi2pd/NTCPSession.cpp b/libi2pd/NTCPSession.cpp index 85c32743..653c24e5 100644 --- a/libi2pd/NTCPSession.cpp +++ b/libi2pd/NTCPSession.cpp @@ -171,27 +171,14 @@ namespace transport return; } } -#if ((__GNUC__ == 4) && (__GNUC_MINOR__ <= 7)) || defined(__NetBSD__) -// due the bug in gcc 4.7. std::shared_future.get() is not const - if (!m_DHKeysPair) - m_DHKeysPair = transports.GetNextDHKeysPair (); - CreateAESKey (m_Establisher->phase1.pubKey); - SendPhase2 (); -#else // TODO: check for number of pending keys auto s = shared_from_this (); - auto keyCreated = std::async (std::launch::async, [s] () - { + m_Server.Work(s, [s]() -> std::function { if (!s->m_DHKeysPair) s->m_DHKeysPair = transports.GetNextDHKeysPair (); s->CreateAESKey (s->m_Establisher->phase1.pubKey); - }).share (); - m_Server.GetService ().post ([s, keyCreated]() - { - keyCreated.get (); - s->SendPhase2 (); + return std::bind(&NTCPSession::SendPhase2, s); }); -#endif } } @@ -788,12 +775,14 @@ namespace transport } //----------------------------------------- - NTCPServer::NTCPServer (): + NTCPServer::NTCPServer (int workers): m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), m_TerminationTimer (m_Service), m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr), m_ProxyType(eNoProxy), m_Resolver(m_Service), m_ProxyEndpoint(nullptr), m_SoftLimit(0), m_HardLimit(0) { + if(workers <= 0) workers = 1; + m_CryptoPool = std::make_shared(workers); } NTCPServer::~NTCPServer () diff --git a/libi2pd/NTCPSession.h b/libi2pd/NTCPSession.h index b64f63aa..12ca92fc 100644 --- a/libi2pd/NTCPSession.h +++ b/libi2pd/NTCPSession.h @@ -12,6 +12,7 @@ #include "RouterInfo.h" #include "I2NPProtocol.h" #include "TransportSession.h" +#include "CryptoWorker.h" namespace i2p { @@ -131,6 +132,8 @@ namespace transport { public: + typedef i2p::worker::ThreadPool Pool; + enum RemoteAddressType { eIP4Address, @@ -146,7 +149,7 @@ namespace transport }; - NTCPServer (); + NTCPServer (int workers=4); ~NTCPServer (); void Start (); @@ -193,6 +196,11 @@ namespace transport void ScheduleTermination (); void HandleTerminationTimer (const boost::system::error_code& ecode); + void Work(std::shared_ptr conn, Pool::WorkFunc work) + { + m_CryptoPool->Offer({conn, work}); + } + private: bool m_IsRunning; @@ -210,6 +218,8 @@ namespace transport boost::asio::ip::tcp::resolver m_Resolver; boost::asio::ip::tcp::endpoint * m_ProxyEndpoint; + std::shared_ptr m_CryptoPool; + uint16_t m_SoftLimit, m_HardLimit; public: