Browse Source

common RunnableService

pull/1474/head
orignal 5 years ago
parent
commit
49810eb153
  1. 49
      libi2pd/NTCP2.cpp
  2. 13
      libi2pd/NTCP2.h
  3. 39
      libi2pd/util.cpp
  4. 38
      libi2pd/util.h

49
libi2pd/NTCP2.cpp

@ -1,12 +1,10 @@
/* /*
* Copyright (c) 2013-2018, The PurpleI2P Project * Copyright (c) 2013-2020, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
* See full license text in LICENSE file at top of project tree * See full license text in LICENSE file at top of project tree
* *
* Kovri go write your own code
*
*/ */
#include <openssl/rand.h> #include <openssl/rand.h>
@ -1143,8 +1141,8 @@ namespace transport
} }
NTCP2Server::NTCP2Server (): NTCP2Server::NTCP2Server ():
m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), RunnableServiceWithWork ("NTCP2"),
m_TerminationTimer (m_Service) m_TerminationTimer (GetService ())
{ {
} }
@ -1155,10 +1153,9 @@ namespace transport
void NTCP2Server::Start () void NTCP2Server::Start ()
{ {
if (!m_IsRunning) if (!IsRunning ())
{ {
m_IsRunning = true; StartService ();
m_Thread = new std::thread (std::bind (&NTCP2Server::Run, this));
auto& addresses = context.GetRouterInfo ().GetAddresses (); auto& addresses = context.GetRouterInfo ().GetAddresses ();
for (const auto& address: addresses) for (const auto& address: addresses)
{ {
@ -1169,7 +1166,7 @@ namespace transport
{ {
try try
{ {
m_NTCP2Acceptor.reset (new boost::asio::ip::tcp::acceptor (m_Service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), address->port))); m_NTCP2Acceptor.reset (new boost::asio::ip::tcp::acceptor (GetService (), boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), address->port)));
} }
catch ( std::exception & ex ) catch ( std::exception & ex )
{ {
@ -1183,7 +1180,7 @@ namespace transport
} }
else if (address->host.is_v6() && context.SupportsV6 ()) else if (address->host.is_v6() && context.SupportsV6 ())
{ {
m_NTCP2V6Acceptor.reset (new boost::asio::ip::tcp::acceptor (m_Service)); m_NTCP2V6Acceptor.reset (new boost::asio::ip::tcp::acceptor (GetService ()));
try try
{ {
m_NTCP2V6Acceptor->open (boost::asio::ip::tcp::v6()); m_NTCP2V6Acceptor->open (boost::asio::ip::tcp::v6());
@ -1218,33 +1215,9 @@ namespace transport
} }
m_NTCP2Sessions.clear (); m_NTCP2Sessions.clear ();
if (m_IsRunning) if (IsRunning ())
{
m_IsRunning = false;
m_TerminationTimer.cancel (); m_TerminationTimer.cancel ();
m_Service.stop (); StopService ();
if (m_Thread)
{
m_Thread->join ();
delete m_Thread;
m_Thread = nullptr;
}
}
}
void NTCP2Server::Run ()
{
while (m_IsRunning)
{
try
{
m_Service.run ();
}
catch (std::exception& ex)
{
LogPrint (eLogError, "NTCP2: runtime exception: ", ex.what ());
}
}
} }
bool NTCP2Server::AddNTCP2Session (std::shared_ptr<NTCP2Session> session, bool incoming) bool NTCP2Server::AddNTCP2Session (std::shared_ptr<NTCP2Session> session, bool incoming)
@ -1282,11 +1255,11 @@ namespace transport
void NTCP2Server::Connect(const boost::asio::ip::address & address, uint16_t port, std::shared_ptr<NTCP2Session> conn) void NTCP2Server::Connect(const boost::asio::ip::address & address, uint16_t port, std::shared_ptr<NTCP2Session> conn)
{ {
LogPrint (eLogDebug, "NTCP2: Connecting to ", address ,":", port); LogPrint (eLogDebug, "NTCP2: Connecting to ", address ,":", port);
m_Service.post([this, address, port, conn]() GetService ().post([this, address, port, conn]()
{ {
if (this->AddNTCP2Session (conn)) if (this->AddNTCP2Session (conn))
{ {
auto timer = std::make_shared<boost::asio::deadline_timer>(m_Service); auto timer = std::make_shared<boost::asio::deadline_timer>(GetService ());
auto timeout = NTCP2_CONNECT_TIMEOUT * 5; auto timeout = NTCP2_CONNECT_TIMEOUT * 5;
conn->SetTerminationTimeout(timeout * 2); conn->SetTerminationTimeout(timeout * 2);
timer->expires_from_now (boost::posix_time::seconds(timeout)); timer->expires_from_now (boost::posix_time::seconds(timeout));

13
libi2pd/NTCP2.h

@ -1,12 +1,10 @@
/* /*
* Copyright (c) 2013-2018, The PurpleI2P Project * Copyright (c) 2013-2020, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
* See full license text in LICENSE file at top of project tree * See full license text in LICENSE file at top of project tree
* *
* Kovri go write your own code
*
*/ */
#ifndef NTCP2_H__ #ifndef NTCP2_H__
#define NTCP2_H__ #define NTCP2_H__
@ -218,7 +216,7 @@ namespace transport
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue; std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
}; };
class NTCP2Server class NTCP2Server: public i2p::util::RunnableServiceWithWork
{ {
public: public:
@ -231,14 +229,11 @@ namespace transport
bool AddNTCP2Session (std::shared_ptr<NTCP2Session> session, bool incoming = false); bool AddNTCP2Session (std::shared_ptr<NTCP2Session> session, bool incoming = false);
void RemoveNTCP2Session (std::shared_ptr<NTCP2Session> session); void RemoveNTCP2Session (std::shared_ptr<NTCP2Session> session);
std::shared_ptr<NTCP2Session> FindNTCP2Session (const i2p::data::IdentHash& ident); std::shared_ptr<NTCP2Session> FindNTCP2Session (const i2p::data::IdentHash& ident);
boost::asio::io_service& GetService () { return m_Service; };
void Connect(const boost::asio::ip::address & address, uint16_t port, std::shared_ptr<NTCP2Session> conn); void Connect(const boost::asio::ip::address & address, uint16_t port, std::shared_ptr<NTCP2Session> conn);
private: private:
void Run ();
void HandleAccept (std::shared_ptr<NTCP2Session> conn, const boost::system::error_code& error); void HandleAccept (std::shared_ptr<NTCP2Session> conn, const boost::system::error_code& error);
void HandleAcceptV6 (std::shared_ptr<NTCP2Session> conn, const boost::system::error_code& error); void HandleAcceptV6 (std::shared_ptr<NTCP2Session> conn, const boost::system::error_code& error);
@ -250,10 +245,6 @@ namespace transport
private: private:
bool m_IsRunning;
std::thread * m_Thread;
boost::asio::io_service m_Service;
boost::asio::io_service::work m_Work;
boost::asio::deadline_timer m_TerminationTimer; boost::asio::deadline_timer m_TerminationTimer;
std::unique_ptr<boost::asio::ip::tcp::acceptor> m_NTCP2Acceptor, m_NTCP2V6Acceptor; std::unique_ptr<boost::asio::ip::tcp::acceptor> m_NTCP2Acceptor, m_NTCP2V6Acceptor;
std::map<i2p::data::IdentHash, std::shared_ptr<NTCP2Session> > m_NTCP2Sessions; std::map<i2p::data::IdentHash, std::shared_ptr<NTCP2Session> > m_NTCP2Sessions;

39
libi2pd/util.cpp

@ -57,6 +57,45 @@ namespace i2p
{ {
namespace util namespace util
{ {
void RunnableService::StartService ()
{
if (!m_IsRunning)
{
m_IsRunning = true;
m_Thread.reset (new std::thread (std::bind (& RunnableService::Run, this)));
}
}
void RunnableService::StopService ()
{
if (m_IsRunning)
{
m_IsRunning = false;
m_Service.stop ();
if (m_Thread)
{
m_Thread->join ();
m_Thread = nullptr;
}
}
}
void RunnableService::Run ()
{
while (m_IsRunning)
{
try
{
m_Service.run ();
}
catch (std::exception& ex)
{
LogPrint (eLogError, m_Name, ": runtime exception: ", ex.what ());
}
}
}
namespace net namespace net
{ {
#ifdef WIN32 #ifdef WIN32

38
libi2pd/util.h

@ -5,6 +5,7 @@
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <thread>
#include <utility> #include <utility>
#include <boost/asio.hpp> #include <boost/asio.hpp>
@ -122,6 +123,43 @@ namespace util
std::mutex m_Mutex; std::mutex m_Mutex;
}; };
class RunnableService
{
public:
RunnableService (const std::string& name): m_Name (name), m_IsRunning (false) {}
virtual ~RunnableService () {}
boost::asio::io_service& GetService () { return m_Service; }
bool IsRunning () const { return m_IsRunning; };
void StartService ();
void StopService ();
private:
void Run ();
private:
std::string m_Name;
bool m_IsRunning;
std::unique_ptr<std::thread> m_Thread;
boost::asio::io_service m_Service;
};
class RunnableServiceWithWork: public RunnableService
{
public:
RunnableServiceWithWork (const std::string& name):
RunnableService (name), m_Work (GetService ()) {}
private:
boost::asio::io_service::work m_Work;
};
namespace net namespace net
{ {
int GetMTU (const boost::asio::ip::address& localAddress); int GetMTU (const boost::asio::ip::address& localAddress);

Loading…
Cancel
Save