Browse Source

net: Create CConnman to encapsulate p2p connections

0.14
Cory Fields 9 years ago
parent
commit
cd16f48028
  1. 1
      src/addrdb.h
  2. 14
      src/init.cpp
  3. 83
      src/net.cpp
  4. 30
      src/net.h
  5. 6
      src/test/test_bitcoin.cpp
  6. 2
      src/test/test_bitcoin.h

1
src/addrdb.h

@ -14,6 +14,7 @@
class CSubNet; class CSubNet;
class CAddrMan; class CAddrMan;
class CDataStream;
typedef enum BanReason typedef enum BanReason
{ {

14
src/init.cpp

@ -42,6 +42,7 @@
#endif #endif
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <memory>
#ifndef WIN32 #ifndef WIN32
#include <signal.h> #include <signal.h>
@ -70,6 +71,7 @@ static const bool DEFAULT_REST_ENABLE = false;
static const bool DEFAULT_DISABLE_SAFEMODE = false; static const bool DEFAULT_DISABLE_SAFEMODE = false;
static const bool DEFAULT_STOPAFTERBLOCKIMPORT = false; static const bool DEFAULT_STOPAFTERBLOCKIMPORT = false;
std::unique_ptr<CConnman> g_connman;
#if ENABLE_ZMQ #if ENABLE_ZMQ
static CZMQNotificationInterface* pzmqNotificationInterface = NULL; static CZMQNotificationInterface* pzmqNotificationInterface = NULL;
@ -197,7 +199,9 @@ void Shutdown()
if (pwalletMain) if (pwalletMain)
pwalletMain->Flush(false); pwalletMain->Flush(false);
#endif #endif
StopNode(); StopNode(*g_connman);
g_connman.reset();
StopTorControl(); StopTorControl();
UnregisterNodeSignals(GetNodeSignals()); UnregisterNodeSignals(GetNodeSignals());
@ -1101,6 +1105,10 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
#endif // ENABLE_WALLET #endif // ENABLE_WALLET
// ********************************************************* Step 6: network initialization // ********************************************************* Step 6: network initialization
assert(!g_connman);
g_connman = std::unique_ptr<CConnman>(new CConnman());
CConnman& connman = *g_connman;
RegisterNodeSignals(GetNodeSignals()); RegisterNodeSignals(GetNodeSignals());
// sanitize comments per BIP-0014, format user agent and check total size // sanitize comments per BIP-0014, format user agent and check total size
@ -1497,7 +1505,9 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
if (GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION)) if (GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION))
StartTorControl(threadGroup, scheduler); StartTorControl(threadGroup, scheduler);
StartNode(threadGroup, scheduler); std::string strNodeError;
if(!StartNode(connman, threadGroup, scheduler, strNodeError))
return InitError(strNodeError);
// ********************************************************* Step 12: finished // ********************************************************* Step 12: finished

83
src/net.cpp

@ -65,13 +65,6 @@
namespace { namespace {
const int MAX_OUTBOUND_CONNECTIONS = 8; const int MAX_OUTBOUND_CONNECTIONS = 8;
const int MAX_FEELER_CONNECTIONS = 1; const int MAX_FEELER_CONNECTIONS = 1;
struct ListenSocket {
SOCKET socket;
bool whitelisted;
ListenSocket(SOCKET _socket, bool _whitelisted) : socket(_socket), whitelisted(_whitelisted) {}
};
} }
const static std::string NET_MESSAGE_COMMAND_OTHER = "*other*"; const static std::string NET_MESSAGE_COMMAND_OTHER = "*other*";
@ -1015,7 +1008,7 @@ static bool AttemptToEvictConnection() {
return false; return false;
} }
static void AcceptConnection(const ListenSocket& hListenSocket) { void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
struct sockaddr_storage sockaddr; struct sockaddr_storage sockaddr;
socklen_t len = sizeof(sockaddr); socklen_t len = sizeof(sockaddr);
SOCKET hSocket = accept(hListenSocket.socket, (struct sockaddr*)&sockaddr, &len); SOCKET hSocket = accept(hListenSocket.socket, (struct sockaddr*)&sockaddr, &len);
@ -1089,7 +1082,7 @@ static void AcceptConnection(const ListenSocket& hListenSocket) {
} }
} }
void ThreadSocketHandler() void CConnman::ThreadSocketHandler()
{ {
unsigned int nPrevNodeCount = 0; unsigned int nPrevNodeCount = 0;
while (true) while (true)
@ -1497,7 +1490,7 @@ static std::string GetDNSHost(const CDNSSeedData& data, ServiceFlags* requiredSe
} }
void ThreadDNSAddressSeed() void CConnman::ThreadDNSAddressSeed()
{ {
// goal: only query DNS seeds if address need is acute // goal: only query DNS seeds if address need is acute
if ((addrman.size() > 0) && if ((addrman.size() > 0) &&
@ -1577,7 +1570,7 @@ void DumpData()
DumpBanlist(); DumpBanlist();
} }
void static ProcessOneShot() void CConnman::ProcessOneShot()
{ {
std::string strDest; std::string strDest;
{ {
@ -1595,7 +1588,7 @@ void static ProcessOneShot()
} }
} }
void ThreadOpenConnections() void CConnman::ThreadOpenConnections()
{ {
// Connect to specific addresses // Connect to specific addresses
if (mapArgs.count("-connect") && mapMultiArgs["-connect"].size() > 0) if (mapArgs.count("-connect") && mapMultiArgs["-connect"].size() > 0)
@ -1791,7 +1784,7 @@ std::vector<AddedNodeInfo> GetAddedNodeInfo()
return ret; return ret;
} }
void ThreadOpenAddedConnections() void CConnman::ThreadOpenAddedConnections()
{ {
{ {
LOCK(cs_vAddedNodes); LOCK(cs_vAddedNodes);
@ -1848,7 +1841,7 @@ bool OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSem
} }
void ThreadMessageHandler() void CConnman::ThreadMessageHandler()
{ {
boost::mutex condition_mutex; boost::mutex condition_mutex;
boost::unique_lock<boost::mutex> lock(condition_mutex); boost::unique_lock<boost::mutex> lock(condition_mutex);
@ -2064,7 +2057,11 @@ void static Discover(boost::thread_group& threadGroup)
#endif #endif
} }
void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler) CConnman::CConnman()
{
}
bool StartNode(CConnman& connman, boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError)
{ {
uiInterface.InitMessage(_("Loading addresses...")); uiInterface.InitMessage(_("Loading addresses..."));
// Load addresses from peers.dat // Load addresses from peers.dat
@ -2102,6 +2099,17 @@ void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler)
fAddressesInitialized = true; fAddressesInitialized = true;
Discover(threadGroup);
bool ret = connman.Start(threadGroup, strNodeError);
// Dump network addresses
scheduler.scheduleEvery(DumpData, DUMP_ADDRESSES_INTERVAL);
return ret;
}
bool CConnman::Start(boost::thread_group& threadGroup, std::string& strNodeError)
{
if (semOutbound == NULL) { if (semOutbound == NULL) {
// initialize semaphore // initialize semaphore
int nMaxOutbound = std::min((MAX_OUTBOUND_CONNECTIONS + MAX_FEELER_CONNECTIONS), nMaxConnections); int nMaxOutbound = std::min((MAX_OUTBOUND_CONNECTIONS + MAX_FEELER_CONNECTIONS), nMaxConnections);
@ -2114,8 +2122,6 @@ void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler)
pnodeLocalHost = new CNode(INVALID_SOCKET, CAddress(CService(local, 0), nLocalServices)); pnodeLocalHost = new CNode(INVALID_SOCKET, CAddress(CService(local, 0), nLocalServices));
} }
Discover(threadGroup);
// //
// Start threads // Start threads
// //
@ -2123,34 +2129,30 @@ void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler)
if (!GetBoolArg("-dnsseed", true)) if (!GetBoolArg("-dnsseed", true))
LogPrintf("DNS seeding disabled\n"); LogPrintf("DNS seeding disabled\n");
else else
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "dnsseed", &ThreadDNSAddressSeed)); threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", boost::function<void()>(boost::bind(&CConnman::ThreadDNSAddressSeed, this))));
// Map ports with UPnP // Map ports with UPnP
MapPort(GetBoolArg("-upnp", DEFAULT_UPNP)); MapPort(GetBoolArg("-upnp", DEFAULT_UPNP));
// Send and receive from sockets, accept connections // Send and receive from sockets, accept connections
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "net", &ThreadSocketHandler)); threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "net", boost::function<void()>(boost::bind(&CConnman::ThreadSocketHandler, this))));
// Initiate outbound connections from -addnode // Initiate outbound connections from -addnode
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "addcon", &ThreadOpenAddedConnections)); threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "addcon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenAddedConnections, this))));
// Initiate outbound connections // Initiate outbound connections
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "opencon", &ThreadOpenConnections)); threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "opencon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenConnections, this))));
// Process messages // Process messages
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "msghand", &ThreadMessageHandler)); threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "msghand", boost::function<void()>(boost::bind(&CConnman::ThreadMessageHandler, this))));
// Dump network addresses return true;
scheduler.scheduleEvery(&DumpData, DUMP_ADDRESSES_INTERVAL);
} }
bool StopNode() bool StopNode(CConnman& connman)
{ {
LogPrintf("StopNode()\n"); LogPrintf("StopNode()\n");
MapPort(false); MapPort(false);
if (semOutbound)
for (int i=0; i<(MAX_OUTBOUND_CONNECTIONS + MAX_FEELER_CONNECTIONS); i++)
semOutbound->post();
if (fAddressesInitialized) if (fAddressesInitialized)
{ {
@ -2158,6 +2160,7 @@ bool StopNode()
fAddressesInitialized = false; fAddressesInitialized = false;
} }
connman.Stop();
return true; return true;
} }
@ -2168,6 +2171,20 @@ public:
~CNetCleanup() ~CNetCleanup()
{ {
#ifdef WIN32
// Shutdown Windows Sockets
WSACleanup();
#endif
}
}
instance_of_cnetcleanup;
void CConnman::Stop()
{
if (semOutbound)
for (int i=0; i<(MAX_OUTBOUND_CONNECTIONS + MAX_FEELER_CONNECTIONS); i++)
semOutbound->post();
// Close sockets // Close sockets
BOOST_FOREACH(CNode* pnode, vNodes) BOOST_FOREACH(CNode* pnode, vNodes)
if (pnode->hSocket != INVALID_SOCKET) if (pnode->hSocket != INVALID_SOCKET)
@ -2189,15 +2206,11 @@ public:
semOutbound = NULL; semOutbound = NULL;
delete pnodeLocalHost; delete pnodeLocalHost;
pnodeLocalHost = NULL; pnodeLocalHost = NULL;
#ifdef WIN32
// Shutdown Windows Sockets
WSACleanup();
#endif
}
} }
instance_of_cnetcleanup;
CConnman::~CConnman()
{
}
void RelayTransaction(const CTransaction& tx) void RelayTransaction(const CTransaction& tx)
{ {

30
src/net.h

@ -21,6 +21,7 @@
#include <atomic> #include <atomic>
#include <deque> #include <deque>
#include <stdint.h> #include <stdint.h>
#include <memory>
#ifndef WIN32 #ifndef WIN32
#include <arpa/inet.h> #include <arpa/inet.h>
@ -93,11 +94,36 @@ CNode* FindNode(const std::string& addrName);
CNode* FindNode(const CService& ip); CNode* FindNode(const CService& ip);
CNode* FindNode(const NodeId id); //TODO: Remove this CNode* FindNode(const NodeId id); //TODO: Remove this
bool OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound = NULL, const char *strDest = NULL, bool fOneShot = false, bool fFeeler = false); bool OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound = NULL, const char *strDest = NULL, bool fOneShot = false, bool fFeeler = false);
struct ListenSocket {
SOCKET socket;
bool whitelisted;
ListenSocket(SOCKET socket_, bool whitelisted_) : socket(socket_), whitelisted(whitelisted_) {}
};
class CConnman
{
public:
CConnman();
~CConnman();
bool Start(boost::thread_group& threadGroup, std::string& strNodeError);
void Stop();
private:
void ThreadOpenAddedConnections();
void ProcessOneShot();
void ThreadOpenConnections();
void ThreadMessageHandler();
void AcceptConnection(const ListenSocket& hListenSocket);
void ThreadSocketHandler();
void ThreadDNSAddressSeed();
};
extern std::unique_ptr<CConnman> g_connman;
void MapPort(bool fUseUPnP); void MapPort(bool fUseUPnP);
unsigned short GetListenPort(); unsigned short GetListenPort();
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler); bool StartNode(CConnman& connman, boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError);
bool StopNode(); bool StopNode(CConnman& connman);
void SocketSendData(CNode *pnode); void SocketSendData(CNode *pnode);
struct CombinerAll struct CombinerAll

6
src/test/test_bitcoin.cpp

@ -26,6 +26,8 @@
#include <boost/test/unit_test.hpp> #include <boost/test/unit_test.hpp>
#include <boost/thread.hpp> #include <boost/thread.hpp>
std::unique_ptr<CConnman> g_connman;
extern bool fPrintToConsole; extern bool fPrintToConsole;
extern void noui_connect(); extern void noui_connect();
@ -43,6 +45,7 @@ BasicTestingSetup::BasicTestingSetup(const std::string& chainName)
BasicTestingSetup::~BasicTestingSetup() BasicTestingSetup::~BasicTestingSetup()
{ {
ECC_Stop(); ECC_Stop();
g_connman.reset();
} }
TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(chainName) TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(chainName)
@ -50,6 +53,7 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha
const CChainParams& chainparams = Params(); const CChainParams& chainparams = Params();
// Ideally we'd move all the RPC tests to the functional testing framework // Ideally we'd move all the RPC tests to the functional testing framework
// instead of unit tests, but for now we need these here. // instead of unit tests, but for now we need these here.
RegisterAllCoreRPCCommands(tableRPC); RegisterAllCoreRPCCommands(tableRPC);
ClearDatadirCache(); ClearDatadirCache();
pathTemp = GetTempPath() / strprintf("test_bitcoin_%lu_%i", (unsigned long)GetTime(), (int)(GetRand(100000))); pathTemp = GetTempPath() / strprintf("test_bitcoin_%lu_%i", (unsigned long)GetTime(), (int)(GetRand(100000)));
@ -68,6 +72,8 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha
nScriptCheckThreads = 3; nScriptCheckThreads = 3;
for (int i=0; i < nScriptCheckThreads-1; i++) for (int i=0; i < nScriptCheckThreads-1; i++)
threadGroup.create_thread(&ThreadScriptCheck); threadGroup.create_thread(&ThreadScriptCheck);
g_connman = std::unique_ptr<CConnman>(new CConnman());
connman = g_connman.get();
RegisterNodeSignals(GetNodeSignals()); RegisterNodeSignals(GetNodeSignals());
} }

2
src/test/test_bitcoin.h

@ -27,10 +27,12 @@ struct BasicTestingSetup {
/** Testing setup that configures a complete environment. /** Testing setup that configures a complete environment.
* Included are data directory, coins database, script check threads setup. * Included are data directory, coins database, script check threads setup.
*/ */
class CConnman;
struct TestingSetup: public BasicTestingSetup { struct TestingSetup: public BasicTestingSetup {
CCoinsViewDB *pcoinsdbview; CCoinsViewDB *pcoinsdbview;
boost::filesystem::path pathTemp; boost::filesystem::path pathTemp;
boost::thread_group threadGroup; boost::thread_group threadGroup;
CConnman* connman;
TestingSetup(const std::string& chainName = CBaseChainParams::MAIN); TestingSetup(const std::string& chainName = CBaseChainParams::MAIN);
~TestingSetup(); ~TestingSetup();

Loading…
Cancel
Save