Browse Source

Merge pull request #1122 from majestrate/ntcp-threadpool

NTCP threadpool
pull/1128/head
orignal 7 years ago committed by GitHub
parent
commit
c7accd4a5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      libi2pd/Config.cpp
  2. 81
      libi2pd/CryptoWorker.h
  3. 52
      libi2pd/NTCPSession.cpp
  4. 12
      libi2pd/NTCPSession.h
  5. 7
      libi2pd/Transports.cpp

1
libi2pd/Config.cpp

@ -73,6 +73,7 @@ namespace config {
("limits.transittunnels", value<uint16_t>()->default_value(2500), "Maximum active transit sessions (default:2500)") ("limits.transittunnels", value<uint16_t>()->default_value(2500), "Maximum active transit sessions (default:2500)")
("limits.ntcpsoft", value<uint16_t>()->default_value(0), "Threshold to start probabalistic backoff with ntcp sessions (default: use system limit)") ("limits.ntcpsoft", value<uint16_t>()->default_value(0), "Threshold to start probabalistic backoff with ntcp sessions (default: use system limit)")
("limits.ntcphard", value<uint16_t>()->default_value(0), "Maximum number of ntcp sessions (default: use system limit)") ("limits.ntcphard", value<uint16_t>()->default_value(0), "Maximum number of ntcp sessions (default: use system limit)")
("limits.ntcpthreads", value<uint16_t>()->default_value(1), "Maximum number of threads used by NTCP DH worker (default: 1)")
; ;
options_description httpserver("HTTP Server options"); options_description httpserver("HTTP Server options");

81
libi2pd/CryptoWorker.h

@ -0,0 +1,81 @@
#ifndef CRYPTO_WORKER_H_
#define CRYPTO_WORKER_H_
#include <condition_variable>
#include <mutex>
#include <deque>
#include <thread>
#include <vector>
#include <memory>
namespace i2p
{
namespace worker
{
template<typename Caller>
struct ThreadPool
{
typedef std::function<void(void)> ResultFunc;
typedef std::function<ResultFunc(void)> WorkFunc;
typedef std::pair<std::shared_ptr<Caller>, WorkFunc> Job;
typedef std::mutex mtx_t;
typedef std::unique_lock<mtx_t> lock_t;
typedef std::condition_variable cond_t;
ThreadPool(int workers)
{
stop = false;
if(workers > 0)
{
while(workers--)
{
threads.emplace_back([this] {
for (;;)
{
Job job;
{
lock_t lock(this->queue_mutex);
this->condition.wait(
lock, [this] { return this->stop || !this->jobs.empty(); });
if (this->stop && this->jobs.empty()) return;
job = std::move(this->jobs.front());
this->jobs.pop_front();
}
ResultFunc result = job.second();
job.first->GetService().post(result);
}
});
}
}
};
void Offer(const Job & job)
{
{
lock_t lock(queue_mutex);
if (stop) return;
jobs.emplace_back(job);
}
condition.notify_one();
}
~ThreadPool()
{
{
lock_t lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(auto &t: threads) t.join();
}
std::vector<std::thread> threads;
std::deque<Job> jobs;
mtx_t queue_mutex;
cond_t condition;
bool stop;
};
}
}
#endif

52
libi2pd/NTCPSession.cpp

@ -105,6 +105,11 @@ namespace transport
transports.PeerConnected (shared_from_this ()); transports.PeerConnected (shared_from_this ());
} }
boost::asio::io_service & NTCPSession::GetService()
{
return m_Server.GetService();
}
void NTCPSession::ClientLogin () void NTCPSession::ClientLogin ()
{ {
if (!m_DHKeysPair) if (!m_DHKeysPair)
@ -171,27 +176,14 @@ namespace transport
return; return;
} }
} }
#if ((__GNUC__ == 4) && (__GNUC_MINOR__ <= 7)) || defined(__NetBSD__)
// due the bug in gcc 4.7. std::shared_future.get() is not const
if (!m_DHKeysPair)
m_DHKeysPair = transports.GetNextDHKeysPair ();
CreateAESKey (m_Establisher->phase1.pubKey);
SendPhase2 ();
#else
// TODO: check for number of pending keys // TODO: check for number of pending keys
auto s = shared_from_this (); auto s = shared_from_this ();
auto keyCreated = std::async (std::launch::async, [s] () m_Server.Work(s, [s]() -> std::function<void(void)> {
{
if (!s->m_DHKeysPair) if (!s->m_DHKeysPair)
s->m_DHKeysPair = transports.GetNextDHKeysPair (); s->m_DHKeysPair = transports.GetNextDHKeysPair ();
s->CreateAESKey (s->m_Establisher->phase1.pubKey); s->CreateAESKey (s->m_Establisher->phase1.pubKey);
}).share (); return std::bind(&NTCPSession::SendPhase2, s);
m_Server.GetService ().post ([s, keyCreated]()
{
keyCreated.get ();
s->SendPhase2 ();
}); });
#endif
} }
} }
@ -211,9 +203,8 @@ namespace transport
m_Decryption.SetIV (m_Establisher->phase1.HXxorHI + 16); m_Decryption.SetIV (m_Establisher->phase1.HXxorHI + 16);
m_Encryption.Encrypt ((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted); m_Encryption.Encrypt ((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted);
boost::asio::async_write (m_Socket, boost::asio::buffer (&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all (), boost::asio::async_write(m_Socket, boost::asio::buffer (&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all(),
std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsB)); std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this(), std::placeholders::_1, std::placeholders::_2, tsB));
} }
void NTCPSession::HandlePhase2Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB) void NTCPSession::HandlePhase2Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB)
@ -250,24 +241,11 @@ namespace transport
} }
else else
{ {
#if ((__GNUC__ == 4) && (__GNUC_MINOR__ <= 7)) || defined(__NetBSD__)
// due the bug in gcc 4.7. std::shared_future.get() is not const
CreateAESKey (m_Establisher->phase2.pubKey);
HandlePhase2 ();
#else
auto s = shared_from_this (); auto s = shared_from_this ();
// create AES key in separate thread m_Server.Work(s, [s]() -> std::function<void(void)> {
auto keyCreated = std::async (std::launch::async, [s] () s->CreateAESKey (s->m_Establisher->phase2.pubKey);
{ return std::bind(&NTCPSession::HandlePhase2, s);
s->CreateAESKey (s->m_Establisher->phase2.pubKey); });
}).share (); // TODO: use move capture in C++ 14 instead shared_future
// let other operations execute while a key gets created
m_Server.GetService ().post ([s, keyCreated]()
{
keyCreated.get (); // we might wait if no more pending operations
s->HandlePhase2 ();
});
#endif
} }
} }
@ -788,12 +766,14 @@ namespace transport
} }
//----------------------------------------- //-----------------------------------------
NTCPServer::NTCPServer (): NTCPServer::NTCPServer (int workers):
m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service),
m_TerminationTimer (m_Service), m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr), m_TerminationTimer (m_Service), m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr),
m_ProxyType(eNoProxy), m_Resolver(m_Service), m_ProxyEndpoint(nullptr), m_ProxyType(eNoProxy), m_Resolver(m_Service), m_ProxyEndpoint(nullptr),
m_SoftLimit(0), m_HardLimit(0) m_SoftLimit(0), m_HardLimit(0)
{ {
if(workers <= 0) workers = 1;
m_CryptoPool = std::make_shared<Pool>(workers);
} }
NTCPServer::~NTCPServer () NTCPServer::~NTCPServer ()

12
libi2pd/NTCPSession.h

@ -12,6 +12,7 @@
#include "RouterInfo.h" #include "RouterInfo.h"
#include "I2NPProtocol.h" #include "I2NPProtocol.h"
#include "TransportSession.h" #include "TransportSession.h"
#include "CryptoWorker.h"
namespace i2p namespace i2p
{ {
@ -55,6 +56,7 @@ namespace transport
void Done (); void Done ();
boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; }; boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; };
boost::asio::io_service & GetService();
bool IsEstablished () const { return m_IsEstablished; }; bool IsEstablished () const { return m_IsEstablished; };
bool IsTerminated () const { return m_IsTerminated; }; bool IsTerminated () const { return m_IsTerminated; };
@ -131,6 +133,8 @@ namespace transport
{ {
public: public:
typedef i2p::worker::ThreadPool<NTCPSession> Pool;
enum RemoteAddressType enum RemoteAddressType
{ {
eIP4Address, eIP4Address,
@ -146,7 +150,7 @@ namespace transport
}; };
NTCPServer (); NTCPServer (int workers=4);
~NTCPServer (); ~NTCPServer ();
void Start (); void Start ();
@ -169,6 +173,10 @@ namespace transport
void SetSessionLimits(uint16_t softLimit, uint16_t hardLimit) { m_SoftLimit = softLimit; m_HardLimit = hardLimit; } void SetSessionLimits(uint16_t softLimit, uint16_t hardLimit) { m_SoftLimit = softLimit; m_HardLimit = hardLimit; }
bool ShouldLimit() const { return ShouldHardLimit() || ShouldSoftLimit(); } bool ShouldLimit() const { return ShouldHardLimit() || ShouldSoftLimit(); }
void Work(std::shared_ptr<NTCPSession> conn, Pool::WorkFunc work)
{
m_CryptoPool->Offer({conn, work});
}
private: private:
/** @brief return true for hard limit */ /** @brief return true for hard limit */
@ -210,6 +218,8 @@ namespace transport
boost::asio::ip::tcp::resolver m_Resolver; boost::asio::ip::tcp::resolver m_Resolver;
boost::asio::ip::tcp::endpoint * m_ProxyEndpoint; boost::asio::ip::tcp::endpoint * m_ProxyEndpoint;
std::shared_ptr<Pool> m_CryptoPool;
uint16_t m_SoftLimit, m_HardLimit; uint16_t m_SoftLimit, m_HardLimit;
public: public:

7
libi2pd/Transports.cpp

@ -153,9 +153,10 @@ namespace transport
m_Thread = new std::thread (std::bind (&Transports::Run, this)); m_Thread = new std::thread (std::bind (&Transports::Run, this));
std::string ntcpproxy; i2p::config::GetOption("ntcpproxy", ntcpproxy); std::string ntcpproxy; i2p::config::GetOption("ntcpproxy", ntcpproxy);
i2p::http::URL proxyurl; i2p::http::URL proxyurl;
uint16_t softLimit, hardLimit; uint16_t softLimit, hardLimit, threads;
i2p::config::GetOption("limits.ntcpsoft", softLimit); i2p::config::GetOption("limits.ntcpsoft", softLimit);
i2p::config::GetOption("limits.ntcphard", hardLimit); i2p::config::GetOption("limits.ntcphard", hardLimit);
i2p::config::GetOption("limits.ntcpthreads", threads);
if(softLimit > 0 && hardLimit > 0 && softLimit >= hardLimit) if(softLimit > 0 && hardLimit > 0 && softLimit >= hardLimit)
{ {
LogPrint(eLogError, "ntcp soft limit must be less than ntcp hard limit"); LogPrint(eLogError, "ntcp soft limit must be less than ntcp hard limit");
@ -167,7 +168,7 @@ namespace transport
{ {
if(proxyurl.schema == "socks" || proxyurl.schema == "http") if(proxyurl.schema == "socks" || proxyurl.schema == "http")
{ {
m_NTCPServer = new NTCPServer(); m_NTCPServer = new NTCPServer(threads);
m_NTCPServer->SetSessionLimits(softLimit, hardLimit); m_NTCPServer->SetSessionLimits(softLimit, hardLimit);
NTCPServer::ProxyType proxytype = NTCPServer::eSocksProxy; NTCPServer::ProxyType proxytype = NTCPServer::eSocksProxy;
@ -198,7 +199,7 @@ namespace transport
if (!address) continue; if (!address) continue;
if (m_NTCPServer == nullptr && enableNTCP) if (m_NTCPServer == nullptr && enableNTCP)
{ {
m_NTCPServer = new NTCPServer (); m_NTCPServer = new NTCPServer (threads);
m_NTCPServer->SetSessionLimits(softLimit, hardLimit); m_NTCPServer->SetSessionLimits(softLimit, hardLimit);
m_NTCPServer->Start (); m_NTCPServer->Start ();
if (!(m_NTCPServer->IsBoundV6() || m_NTCPServer->IsBoundV4())) { if (!(m_NTCPServer->IsBoundV6() || m_NTCPServer->IsBoundV4())) {

Loading…
Cancel
Save