diff --git a/libi2pd/Config.cpp b/libi2pd/Config.cpp index 7668d6a7..b8016a7c 100644 --- a/libi2pd/Config.cpp +++ b/libi2pd/Config.cpp @@ -152,6 +152,7 @@ namespace config { ("i2cp.enabled", value()->default_value(false), "Enable or disable I2CP") ("i2cp.address", value()->default_value("127.0.0.1"), "I2CP listen address") ("i2cp.port", value()->default_value(7654), "I2CP listen port") + ("i2cp.singlethread", value()->default_value(true), "Destinations run in the I2CP server's thread") ; options_description i2pcontrol("I2PControl options"); diff --git a/libi2pd_client/ClientContext.cpp b/libi2pd_client/ClientContext.cpp index a70add7d..e83e293a 100644 --- a/libi2pd_client/ClientContext.cpp +++ b/libi2pd_client/ClientContext.cpp @@ -102,10 +102,11 @@ namespace client { std::string i2cpAddr; i2p::config::GetOption("i2cp.address", i2cpAddr); uint16_t i2cpPort; i2p::config::GetOption("i2cp.port", i2cpPort); + bool singleThread; i2p::config::GetOption("i2cp.singlethread", singleThread); LogPrint(eLogInfo, "Clients: starting I2CP at ", i2cpAddr, ":", i2cpPort); try { - m_I2CPServer = new I2CPServer (i2cpAddr, i2cpPort); + m_I2CPServer = new I2CPServer (i2cpAddr, i2cpPort, singleThread); m_I2CPServer->Start (); } catch (std::exception& e) diff --git a/libi2pd_client/I2CP.cpp b/libi2pd_client/I2CP.cpp index b0f334f5..24c2496b 100644 --- a/libi2pd_client/I2CP.cpp +++ b/libi2pd_client/I2CP.cpp @@ -23,36 +23,13 @@ namespace i2p namespace client { - I2CPDestination::I2CPDestination (std::shared_ptr owner, std::shared_ptr identity, bool isPublic, const std::map& params): - RunnableService ("I2CP"), LeaseSetDestination (GetIOService (), isPublic, ¶ms), + I2CPDestination::I2CPDestination (boost::asio::io_service& service, std::shared_ptr owner, + std::shared_ptr identity, bool isPublic, const std::map& params): + LeaseSetDestination (service, isPublic, ¶ms), m_Owner (owner), m_Identity (identity), m_EncryptionKeyType (m_Identity->GetCryptoKeyType ()) { } - I2CPDestination::~I2CPDestination () - { - if (IsRunning ()) - Stop (); - } - - void I2CPDestination::Start () - { - if (!IsRunning ()) - { - LeaseSetDestination::Start (); - StartIOService (); - } - } - - void I2CPDestination::Stop () - { - if (IsRunning ()) - { - LeaseSetDestination::Stop (); - StopIOService (); - } - } - void I2CPDestination::SetEncryptionPrivateKey (const uint8_t * key) { m_Decryptor = i2p::data::PrivateKeys::CreateDecryptor (m_Identity->GetCryptoKeyType (), key); @@ -217,6 +194,37 @@ namespace client } } + RunnableI2CPDestination::RunnableI2CPDestination (std::shared_ptr owner, + std::shared_ptr identity, bool isPublic, const std::map& params): + RunnableService ("I2CP"), + I2CPDestination (GetIOService (), owner, identity, isPublic, params) + { + } + + RunnableI2CPDestination::~RunnableI2CPDestination () + { + if (IsRunning ()) + Stop (); + } + + void RunnableI2CPDestination::Start () + { + if (!IsRunning ()) + { + I2CPDestination::Start (); + StartIOService (); + } + } + + void RunnableI2CPDestination::Stop () + { + if (IsRunning ()) + { + I2CPDestination::Stop (); + StopIOService (); + } + } + I2CPSession::I2CPSession (I2CPServer& owner, std::shared_ptr socket): m_Owner (owner), m_Socket (socket), m_Payload (nullptr), m_SessionID (0xFFFF), m_MessageID (0), m_IsSendAccepted (true) @@ -451,7 +459,9 @@ namespace client if (params[I2CP_PARAM_DONT_PUBLISH_LEASESET] == "true") isPublic = false; if (!m_Destination) { - m_Destination = std::make_shared(shared_from_this (), identity, isPublic, params); + m_Destination = m_Owner.IsSingleThread () ? + std::make_shared(m_Owner.GetService (), shared_from_this (), identity, isPublic, params): + std::make_shared(shared_from_this (), identity, isPublic, params); SendSessionStatusMessage (1); // created LogPrint (eLogDebug, "I2CP: session ", m_SessionID, " created"); m_Destination->Start (); @@ -800,9 +810,9 @@ namespace client std::placeholders::_1, std::placeholders::_2, buf)); } - I2CPServer::I2CPServer (const std::string& interface, int port): - m_IsRunning (false), m_Thread (nullptr), - m_Acceptor (m_Service, + I2CPServer::I2CPServer (const std::string& interface, int port, bool isSingleThread): + RunnableService ("I2CP"), m_IsSingleThread (isSingleThread), + m_Acceptor (GetIOService (), #ifdef ANDROID I2CPSession::proto::endpoint(std::string (1, '\0') + interface)) // leading 0 for abstract address #else @@ -825,20 +835,18 @@ namespace client I2CPServer::~I2CPServer () { - if (m_IsRunning) + if (IsRunning ()) Stop (); } void I2CPServer::Start () { Accept (); - m_IsRunning = true; - m_Thread = new std::thread (std::bind (&I2CPServer::Run, this)); + StartIOService (); } void I2CPServer::Stop () { - m_IsRunning = false; m_Acceptor.cancel (); { auto sessions = m_Sessions; @@ -846,33 +854,12 @@ namespace client it.second->Stop (); } m_Sessions.clear (); - m_Service.stop (); - if (m_Thread) - { - m_Thread->join (); - delete m_Thread; - m_Thread = nullptr; - } - } - - void I2CPServer::Run () - { - while (m_IsRunning) - { - try - { - m_Service.run (); - } - catch (std::exception& ex) - { - LogPrint (eLogError, "I2CP: runtime exception: ", ex.what ()); - } - } + StopIOService (); } void I2CPServer::Accept () { - auto newSocket = std::make_shared (m_Service); + auto newSocket = std::make_shared (GetIOService ()); m_Acceptor.async_accept (*newSocket, std::bind (&I2CPServer::HandleAccept, this, std::placeholders::_1, newSocket)); } diff --git a/libi2pd_client/I2CP.h b/libi2pd_client/I2CP.h index adb648f4..4c6b7531 100644 --- a/libi2pd_client/I2CP.h +++ b/libi2pd_client/I2CP.h @@ -63,16 +63,14 @@ namespace client const char I2CP_PARAM_MESSAGE_RELIABILITY[] = "i2cp.messageReliability"; class I2CPSession; - class I2CPDestination: private i2p::util::RunnableService, public LeaseSetDestination + class I2CPDestination: public LeaseSetDestination { public: - I2CPDestination (std::shared_ptr owner, std::shared_ptr identity, bool isPublic, const std::map& params); - ~I2CPDestination (); - - void Start (); - void Stop (); - + I2CPDestination (boost::asio::io_service& service, std::shared_ptr owner, + std::shared_ptr identity, bool isPublic, const std::map& params); + ~I2CPDestination () {}; + void SetEncryptionPrivateKey (const uint8_t * key); void SetEncryptionType (i2p::data::CryptoKeyType keyType) { m_EncryptionKeyType = keyType; }; void SetECIESx25519EncryptionPrivateKey (const uint8_t * key); @@ -109,6 +107,18 @@ namespace client uint64_t m_LeaseSetExpirationTime; }; + class RunnableI2CPDestination: private i2p::util::RunnableService, public I2CPDestination + { + public: + + RunnableI2CPDestination (std::shared_ptr owner, std::shared_ptr identity, + bool isPublic, const std::map& params); + ~RunnableI2CPDestination (); + + void Start (); + void Stop (); + }; + class I2CPServer; class I2CPSession: public std::enable_shared_from_this { @@ -179,17 +189,18 @@ namespace client }; typedef void (I2CPSession::*I2CPMessageHandler)(const uint8_t * buf, size_t len); - class I2CPServer + class I2CPServer: private i2p::util::RunnableService { public: - I2CPServer (const std::string& interface, int port); + I2CPServer (const std::string& interface, int port, bool isSingleThread); ~I2CPServer (); void Start (); void Stop (); - boost::asio::io_service& GetService () { return m_Service; }; - + boost::asio::io_service& GetService () { return GetIOService (); }; + bool IsSingleThread () const { return m_IsSingleThread; }; + bool InsertSession (std::shared_ptr session); void RemoveSession (uint16_t sessionID); @@ -203,12 +214,10 @@ namespace client private: + bool m_IsSingleThread; I2CPMessageHandler m_MessagesHandlers[256]; std::map > m_Sessions; - bool m_IsRunning; - std::thread * m_Thread; - boost::asio::io_service m_Service; I2CPSession::proto::acceptor m_Acceptor; public: