diff --git a/libi2pd/CryptoWorker.h b/libi2pd/CryptoWorker.h index 3188fa03..d43e356c 100644 --- a/libi2pd/CryptoWorker.h +++ b/libi2pd/CryptoWorker.h @@ -12,69 +12,68 @@ namespace i2p { namespace worker { - template - struct ThreadPool - { - typedef std::function ResultFunc; - typedef std::function WorkFunc; - typedef std::pair, WorkFunc> Job; - typedef std::mutex mtx_t; - typedef std::unique_lock lock_t; - typedef std::condition_variable cond_t; - ThreadPool(int workers) - { - stop = false; - if(workers > 0) - { - while(workers--) - { - threads.emplace_back([this] { - for (;;) - { - Job job; - { - lock_t lock(this->queue_mutex); - this->condition.wait( - lock, [this] { return this->stop || !this->jobs.empty(); }); - if (this->stop && this->jobs.empty()) return; - job = std::move(this->jobs.front()); - this->jobs.pop_front(); - } - ResultFunc result = job.second(); - job.first->GetService().post(result); - } - }); - } - } - }; + template + struct ThreadPool + { + typedef std::function ResultFunc; + typedef std::function WorkFunc; + typedef std::pair, WorkFunc> Job; + typedef std::mutex mtx_t; + typedef std::unique_lock lock_t; + typedef std::condition_variable cond_t; + ThreadPool(int workers) + { + stop = false; + if(workers > 0) + { + while(workers--) + { + threads.emplace_back([this] { + for (;;) + { + Job job; + { + lock_t lock(this->queue_mutex); + this->condition.wait( + lock, [this] { return this->stop || !this->jobs.empty(); }); + if (this->stop && this->jobs.empty()) return; + job = std::move(this->jobs.front()); + this->jobs.pop_front(); + } + ResultFunc result = job.second(); + job.first->GetService().post(result); + } + }); + } + } + }; - void Offer(const Job & job) - { - { - lock_t lock(queue_mutex); - if (stop) return; - jobs.emplace_back(job); - } - condition.notify_one(); - } + void Offer(const Job & job) + { + { + lock_t lock(queue_mutex); + if (stop) return; + jobs.emplace_back(job); + } + condition.notify_one(); + } - ~ThreadPool() - { - { - lock_t lock(queue_mutex); - stop = true; - } - condition.notify_all(); - for(auto &t: threads) t.join(); - } + ~ThreadPool() + { + { + lock_t lock(queue_mutex); + stop = true; + } + condition.notify_all(); + for(auto &t: threads) t.join(); + } - std::vector threads; - std::deque jobs; - mtx_t queue_mutex; - cond_t condition; - bool stop; - - }; + std::vector threads; + std::deque jobs; + mtx_t queue_mutex; + cond_t condition; + bool stop; + }; } } diff --git a/libi2pd/NTCPSession.cpp b/libi2pd/NTCPSession.cpp index ba657bed..b7268818 100644 --- a/libi2pd/NTCPSession.cpp +++ b/libi2pd/NTCPSession.cpp @@ -109,7 +109,7 @@ namespace transport { return m_Server.GetService(); } - + void NTCPSession::ClientLogin () { if (!m_DHKeysPair) @@ -202,7 +202,8 @@ namespace transport m_Encryption.SetIV (y + 240); m_Decryption.SetIV (m_Establisher->phase1.HXxorHI + 16); m_Encryption.Encrypt ((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted); - boost::asio::async_write(m_Socket, boost::asio::buffer (&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all(), std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this(), std::placeholders::_1, std::placeholders::_2, tsB)); + boost::asio::async_write(m_Socket, boost::asio::buffer (&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all(), + std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this(), std::placeholders::_1, std::placeholders::_2, tsB)); } void NTCPSession::HandlePhase2Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB) @@ -299,7 +300,7 @@ namespace transport s.Sign (keys, buf); m_Encryption.Encrypt(m_ReceiveBuffer, len, m_ReceiveBuffer); boost::asio::async_write (m_Socket, boost::asio::buffer (m_ReceiveBuffer, len), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandlePhase3Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsA)); + std::bind(&NTCPSession::HandlePhase3Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsA)); } void NTCPSession::HandlePhase3Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsA) diff --git a/libi2pd/NTCPSession.h b/libi2pd/NTCPSession.h index fae9874e..5335fbdd 100644 --- a/libi2pd/NTCPSession.h +++ b/libi2pd/NTCPSession.h @@ -102,7 +102,7 @@ namespace transport void HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector > msgs); private: - + NTCPServer& m_Server; boost::asio::ip::tcp::socket m_Socket; bool m_IsEstablished, m_IsTerminated; @@ -133,7 +133,7 @@ namespace transport { public: - typedef i2p::worker::ThreadPool Pool; + typedef i2p::worker::ThreadPool Pool; enum RemoteAddressType { @@ -177,7 +177,6 @@ namespace transport { m_CryptoPool->Offer({conn, work}); } - private: /** @brief return true for hard limit */ @@ -203,7 +202,7 @@ namespace transport void HandleTerminationTimer (const boost::system::error_code& ecode); private: - + bool m_IsRunning; std::thread * m_Thread; boost::asio::io_service m_Service; @@ -219,8 +218,8 @@ namespace transport boost::asio::ip::tcp::resolver m_Resolver; boost::asio::ip::tcp::endpoint * m_ProxyEndpoint; - std::shared_ptr m_CryptoPool; - + std::shared_ptr m_CryptoPool; + uint16_t m_SoftLimit, m_HardLimit; public: