Browse Source

common RuunableBase with private inheritance

pull/1474/head
orignal 5 years ago
parent
commit
969f9aa436
  1. 6
      libi2pd/Destination.cpp
  2. 3
      libi2pd/Destination.h
  3. 4
      libi2pd/NTCP2.cpp
  4. 3
      libi2pd/NTCP2.h
  5. 4
      libi2pd/util.cpp
  6. 12
      libi2pd/util.h
  7. 34
      libi2pd_client/BOB.cpp
  8. 9
      libi2pd_client/BOB.h
  9. 3
      libi2pd_client/ClientContext.cpp
  10. 6
      libi2pd_client/I2CP.cpp
  11. 2
      libi2pd_client/I2CP.h
  12. 36
      libi2pd_client/SAM.cpp
  13. 10
      libi2pd_client/SAM.h

6
libi2pd/Destination.cpp

@ -823,7 +823,7 @@ namespace client
} }
ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map<std::string, std::string> * params): ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map<std::string, std::string> * params):
RunnableService ("Destination"), LeaseSetDestination (GetService (), isPublic, params), RunnableService ("Destination"), LeaseSetDestination (GetIOService (), isPublic, params),
m_Keys (keys), m_StreamingAckDelay (DEFAULT_INITIAL_ACK_DELAY), m_Keys (keys), m_StreamingAckDelay (DEFAULT_INITIAL_ACK_DELAY),
m_DatagramDestination (nullptr), m_RefCounter (0), m_DatagramDestination (nullptr), m_RefCounter (0),
m_ReadyChecker(GetService()) m_ReadyChecker(GetService())
@ -905,7 +905,7 @@ namespace client
m_StreamingDestination->Start (); m_StreamingDestination->Start ();
for (auto& it: m_StreamingDestinationsByPorts) for (auto& it: m_StreamingDestinationsByPorts)
it.second->Start (); it.second->Start ();
StartService (); StartIOService ();
} }
} }
@ -929,7 +929,7 @@ namespace client
delete m_DatagramDestination; delete m_DatagramDestination;
m_DatagramDestination = nullptr; m_DatagramDestination = nullptr;
} }
StopService (); StopIOService ();
} }
} }

3
libi2pd/Destination.h

@ -102,6 +102,7 @@ namespace client
LeaseSetDestination (boost::asio::io_service& service, bool isPublic, const std::map<std::string, std::string> * params = nullptr); LeaseSetDestination (boost::asio::io_service& service, bool isPublic, const std::map<std::string, std::string> * params = nullptr);
~LeaseSetDestination (); ~LeaseSetDestination ();
const std::string& GetNickname () const { return m_Nickname; }; const std::string& GetNickname () const { return m_Nickname; };
boost::asio::io_service& GetService () { return m_Service; };
virtual void Start (); virtual void Start ();
virtual void Stop (); virtual void Stop ();
@ -191,7 +192,7 @@ namespace client
bool IsPerClientAuth () const { return m_AuthType > 0; }; bool IsPerClientAuth () const { return m_AuthType > 0; };
}; };
class ClientDestination: public i2p::util::RunnableService, public LeaseSetDestination class ClientDestination: private i2p::util::RunnableService, public LeaseSetDestination
{ {
public: public:
#ifdef I2LUA #ifdef I2LUA

4
libi2pd/NTCP2.cpp

@ -1155,7 +1155,7 @@ namespace transport
{ {
if (!IsRunning ()) if (!IsRunning ())
{ {
StartService (); StartIOService ();
auto& addresses = context.GetRouterInfo ().GetAddresses (); auto& addresses = context.GetRouterInfo ().GetAddresses ();
for (const auto& address: addresses) for (const auto& address: addresses)
{ {
@ -1217,7 +1217,7 @@ namespace transport
if (IsRunning ()) if (IsRunning ())
m_TerminationTimer.cancel (); m_TerminationTimer.cancel ();
StopService (); StopIOService ();
} }
bool NTCP2Server::AddNTCP2Session (std::shared_ptr<NTCP2Session> session, bool incoming) bool NTCP2Server::AddNTCP2Session (std::shared_ptr<NTCP2Session> session, bool incoming)

3
libi2pd/NTCP2.h

@ -216,7 +216,7 @@ namespace transport
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue; std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
}; };
class NTCP2Server: public i2p::util::RunnableServiceWithWork class NTCP2Server: private i2p::util::RunnableServiceWithWork
{ {
public: public:
@ -225,6 +225,7 @@ namespace transport
void Start (); void Start ();
void Stop (); void Stop ();
boost::asio::io_service& GetService () { return GetIOService (); };
bool AddNTCP2Session (std::shared_ptr<NTCP2Session> session, bool incoming = false); bool AddNTCP2Session (std::shared_ptr<NTCP2Session> session, bool incoming = false);
void RemoveNTCP2Session (std::shared_ptr<NTCP2Session> session); void RemoveNTCP2Session (std::shared_ptr<NTCP2Session> session);

4
libi2pd/util.cpp

@ -58,7 +58,7 @@ namespace i2p
namespace util namespace util
{ {
void RunnableService::StartService () void RunnableService::StartIOService ()
{ {
if (!m_IsRunning) if (!m_IsRunning)
{ {
@ -67,7 +67,7 @@ namespace util
} }
} }
void RunnableService::StopService () void RunnableService::StopIOService ()
{ {
if (m_IsRunning) if (m_IsRunning)
{ {

12
libi2pd/util.h

@ -125,16 +125,16 @@ namespace util
class RunnableService class RunnableService
{ {
public: protected:
RunnableService (const std::string& name): m_Name (name), m_IsRunning (false) {} RunnableService (const std::string& name): m_Name (name), m_IsRunning (false) {}
virtual ~RunnableService () {} virtual ~RunnableService () {}
boost::asio::io_service& GetService () { return m_Service; } boost::asio::io_service& GetIOService () { return m_Service; }
bool IsRunning () const { return m_IsRunning; }; bool IsRunning () const { return m_IsRunning; };
void StartService (); void StartIOService ();
void StopService (); void StopIOService ();
private: private:
@ -150,10 +150,10 @@ namespace util
class RunnableServiceWithWork: public RunnableService class RunnableServiceWithWork: public RunnableService
{ {
public: protected:
RunnableServiceWithWork (const std::string& name): RunnableServiceWithWork (const std::string& name):
RunnableService (name), m_Work (GetService ()) {} RunnableService (name), m_Work (GetIOService ()) {}
private: private:

34
libi2pd_client/BOB.cpp

@ -743,8 +743,8 @@ namespace client
} }
BOBCommandChannel::BOBCommandChannel (const std::string& address, int port): BOBCommandChannel::BOBCommandChannel (const std::string& address, int port):
m_IsRunning (false), m_Thread (nullptr), RunnableService ("BOB"),
m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(address), port)) m_Acceptor (GetIOService (), boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(address), port))
{ {
// command -> handler // command -> handler
m_CommandHandlers[BOB_COMMAND_ZAP] = &BOBCommandSession::ZapCommandHandler; m_CommandHandlers[BOB_COMMAND_ZAP] = &BOBCommandSession::ZapCommandHandler;
@ -794,7 +794,8 @@ namespace client
BOBCommandChannel::~BOBCommandChannel () BOBCommandChannel::~BOBCommandChannel ()
{ {
Stop (); if (IsRunning ())
Stop ();
for (const auto& it: m_Destinations) for (const auto& it: m_Destinations)
delete it.second; delete it.second;
} }
@ -802,38 +803,15 @@ namespace client
void BOBCommandChannel::Start () void BOBCommandChannel::Start ()
{ {
Accept (); Accept ();
m_IsRunning = true; StartIOService ();
m_Thread = new std::thread (std::bind (&BOBCommandChannel::Run, this));
} }
void BOBCommandChannel::Stop () void BOBCommandChannel::Stop ()
{ {
m_IsRunning = false;
for (auto& it: m_Destinations) for (auto& it: m_Destinations)
it.second->Stop (); it.second->Stop ();
m_Acceptor.cancel (); m_Acceptor.cancel ();
m_Service.stop (); StopIOService ();
if (m_Thread)
{
m_Thread->join ();
delete m_Thread;
m_Thread = nullptr;
}
}
void BOBCommandChannel::Run ()
{
while (m_IsRunning)
{
try
{
m_Service.run ();
}
catch (std::exception& ex)
{
LogPrint (eLogError, "BOB: runtime exception: ", ex.what ());
}
}
} }
void BOBCommandChannel::AddDestination (const std::string& name, BOBDestination * dest) void BOBCommandChannel::AddDestination (const std::string& name, BOBDestination * dest)

9
libi2pd_client/BOB.h

@ -7,6 +7,7 @@
#include <map> #include <map>
#include <string> #include <string>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include "util.h"
#include "I2PTunnel.h" #include "I2PTunnel.h"
#include "I2PService.h" #include "I2PService.h"
#include "Identity.h" #include "Identity.h"
@ -231,7 +232,7 @@ namespace client
}; };
typedef void (BOBCommandSession::*BOBCommandHandler)(const char * operand, size_t len); typedef void (BOBCommandSession::*BOBCommandHandler)(const char * operand, size_t len);
class BOBCommandChannel class BOBCommandChannel: private i2p::util::RunnableService
{ {
public: public:
@ -241,22 +242,18 @@ namespace client
void Start (); void Start ();
void Stop (); void Stop ();
boost::asio::io_service& GetService () { return m_Service; }; boost::asio::io_service& GetService () { return GetIOService (); };
void AddDestination (const std::string& name, BOBDestination * dest); void AddDestination (const std::string& name, BOBDestination * dest);
void DeleteDestination (const std::string& name); void DeleteDestination (const std::string& name);
BOBDestination * FindDestination (const std::string& name); BOBDestination * FindDestination (const std::string& name);
private: private:
void Run ();
void Accept (); void Accept ();
void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr<BOBCommandSession> session); void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr<BOBCommandSession> session);
private: private:
bool m_IsRunning;
std::thread * m_Thread;
boost::asio::io_service m_Service;
boost::asio::ip::tcp::acceptor m_Acceptor; boost::asio::ip::tcp::acceptor m_Acceptor;
std::map<std::string, BOBDestination *> m_Destinations; std::map<std::string, BOBDestination *> m_Destinations;
std::map<std::string, BOBCommandHandler> m_CommandHandlers; std::map<std::string, BOBCommandHandler> m_CommandHandlers;

3
libi2pd_client/ClientContext.cpp

@ -344,8 +344,7 @@ namespace client
if (it != m_Destinations.end ()) if (it != m_Destinations.end ())
{ {
LogPrint (eLogWarning, "Clients: Local destination ", m_AddressBook.ToAddress(keys.GetPublic ()->GetIdentHash ()), " exists"); LogPrint (eLogWarning, "Clients: Local destination ", m_AddressBook.ToAddress(keys.GetPublic ()->GetIdentHash ()), " exists");
if (!it->second->IsRunning ()) it->second->Start (); // make sure to start
it->second->Start ();
return it->second; return it->second;
} }
auto localDestination = std::make_shared<ClientDestination> (keys, isPublic, params); auto localDestination = std::make_shared<ClientDestination> (keys, isPublic, params);

6
libi2pd_client/I2CP.cpp

@ -24,7 +24,7 @@ namespace client
{ {
I2CPDestination::I2CPDestination (std::shared_ptr<I2CPSession> owner, std::shared_ptr<const i2p::data::IdentityEx> identity, bool isPublic, const std::map<std::string, std::string>& params): I2CPDestination::I2CPDestination (std::shared_ptr<I2CPSession> owner, std::shared_ptr<const i2p::data::IdentityEx> identity, bool isPublic, const std::map<std::string, std::string>& params):
RunnableService ("I2CP"), LeaseSetDestination (GetService (), isPublic, &params), RunnableService ("I2CP"), LeaseSetDestination (GetIOService (), isPublic, &params),
m_Owner (owner), m_Identity (identity) m_Owner (owner), m_Identity (identity)
{ {
} }
@ -40,7 +40,7 @@ namespace client
if (!IsRunning ()) if (!IsRunning ())
{ {
LeaseSetDestination::Start (); LeaseSetDestination::Start ();
StartService (); StartIOService ();
} }
} }
@ -49,7 +49,7 @@ namespace client
if (IsRunning ()) if (IsRunning ())
{ {
LeaseSetDestination::Stop (); LeaseSetDestination::Stop ();
StopService (); StopIOService ();
} }
} }

2
libi2pd_client/I2CP.h

@ -62,7 +62,7 @@ namespace client
const char I2CP_PARAM_MESSAGE_RELIABILITY[] = "i2cp.messageReliability"; const char I2CP_PARAM_MESSAGE_RELIABILITY[] = "i2cp.messageReliability";
class I2CPSession; class I2CPSession;
class I2CPDestination: public i2p::util::RunnableService, public LeaseSetDestination class I2CPDestination: private i2p::util::RunnableService, public LeaseSetDestination
{ {
public: public:

36
libi2pd_client/SAM.cpp

@ -1001,9 +1001,9 @@ namespace client
} }
SAMBridge::SAMBridge (const std::string& address, int port): SAMBridge::SAMBridge (const std::string& address, int port):
m_IsRunning (false), m_Thread (nullptr), RunnableService ("SAM"),
m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(address), port)), m_Acceptor (GetIOService (), boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(address), port)),
m_DatagramEndpoint (boost::asio::ip::address::from_string(address), port-1), m_DatagramSocket (m_Service, m_DatagramEndpoint), m_DatagramEndpoint (boost::asio::ip::address::from_string(address), port-1), m_DatagramSocket (GetIOService (), m_DatagramEndpoint),
m_SignatureTypes m_SignatureTypes
{ {
{"DSA_SHA1", i2p::data::SIGNING_KEY_TYPE_DSA_SHA1}, {"DSA_SHA1", i2p::data::SIGNING_KEY_TYPE_DSA_SHA1},
@ -1020,7 +1020,7 @@ namespace client
SAMBridge::~SAMBridge () SAMBridge::~SAMBridge ()
{ {
if (m_IsRunning) if (IsRunning ())
Stop (); Stop ();
} }
@ -1028,14 +1028,11 @@ namespace client
{ {
Accept (); Accept ();
ReceiveDatagram (); ReceiveDatagram ();
m_IsRunning = true; StartIOService ();
m_Thread = new std::thread (std::bind (&SAMBridge::Run, this));
} }
void SAMBridge::Stop () void SAMBridge::Stop ()
{ {
m_IsRunning = false;
try try
{ {
m_Acceptor.cancel (); m_Acceptor.cancel ();
@ -1048,28 +1045,7 @@ namespace client
for (auto& it: m_Sessions) for (auto& it: m_Sessions)
it.second->CloseStreams (); it.second->CloseStreams ();
m_Sessions.clear (); m_Sessions.clear ();
m_Service.stop (); StopIOService ();
if (m_Thread)
{
m_Thread->join ();
delete m_Thread;
m_Thread = nullptr;
}
}
void SAMBridge::Run ()
{
while (m_IsRunning)
{
try
{
m_Service.run ();
}
catch (std::exception& ex)
{
LogPrint (eLogError, "SAM: runtime exception: ", ex.what ());
}
}
} }
void SAMBridge::Accept () void SAMBridge::Accept ()

10
libi2pd_client/SAM.h

@ -9,6 +9,7 @@
#include <mutex> #include <mutex>
#include <memory> #include <memory>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include "util.h"
#include "Identity.h" #include "Identity.h"
#include "LeaseSet.h" #include "LeaseSet.h"
#include "Streaming.h" #include "Streaming.h"
@ -174,7 +175,7 @@ namespace client
void CloseStreams (); void CloseStreams ();
}; };
class SAMBridge class SAMBridge: private i2p::util::RunnableService
{ {
public: public:
@ -184,7 +185,7 @@ namespace client
void Start (); void Start ();
void Stop (); void Stop ();
boost::asio::io_service& GetService () { return m_Service; }; boost::asio::io_service& GetService () { return GetIOService (); };
std::shared_ptr<SAMSession> CreateSession (const std::string& id, SAMSessionType type, const std::string& destination, // empty string means transient std::shared_ptr<SAMSession> CreateSession (const std::string& id, SAMSessionType type, const std::string& destination, // empty string means transient
const std::map<std::string, std::string> * params); const std::map<std::string, std::string> * params);
void CloseSession (const std::string& id); void CloseSession (const std::string& id);
@ -201,8 +202,6 @@ namespace client
private: private:
void Run ();
void Accept (); void Accept ();
void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr<SAMSocket> socket); void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr<SAMSocket> socket);
@ -211,9 +210,6 @@ namespace client
private: private:
bool m_IsRunning;
std::thread * m_Thread;
boost::asio::io_service m_Service;
boost::asio::ip::tcp::acceptor m_Acceptor; boost::asio::ip::tcp::acceptor m_Acceptor;
boost::asio::ip::udp::endpoint m_DatagramEndpoint, m_SenderEndpoint; boost::asio::ip::udp::endpoint m_DatagramEndpoint, m_SenderEndpoint;
boost::asio::ip::udp::socket m_DatagramSocket; boost::asio::ip::udp::socket m_DatagramSocket;

Loading…
Cancel
Save