Browse Source

net: make net interruptible

Also now that net threads are interruptible, switch them to use std
threads/binds/mutexes/condvars.
0.14
Cory Fields 8 years ago
parent
commit
0985052319
  1. 4
      src/init.cpp
  2. 105
      src/net.cpp
  3. 19
      src/net.h

4
src/init.cpp

@ -176,6 +176,8 @@ void Interrupt(boost::thread_group& threadGroup)
InterruptRPC(); InterruptRPC();
InterruptREST(); InterruptREST();
InterruptTorControl(); InterruptTorControl();
if (g_connman)
g_connman->Interrupt();
threadGroup.interrupt_all(); threadGroup.interrupt_all();
} }
@ -1572,7 +1574,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
connOptions.nMaxOutboundTimeframe = nMaxOutboundTimeframe; connOptions.nMaxOutboundTimeframe = nMaxOutboundTimeframe;
connOptions.nMaxOutboundLimit = nMaxOutboundLimit; connOptions.nMaxOutboundLimit = nMaxOutboundLimit;
if(!connman.Start(threadGroup, scheduler, strNodeError, connOptions)) if (!connman.Start(scheduler, strNodeError, connOptions))
return InitError(strNodeError); return InitError(strNodeError);
// ********************************************************* Step 12: finished // ********************************************************* Step 12: finished

105
src/net.cpp

@ -1042,7 +1042,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
void CConnman::ThreadSocketHandler() void CConnman::ThreadSocketHandler()
{ {
unsigned int nPrevNodeCount = 0; unsigned int nPrevNodeCount = 0;
while (true) while (!interruptNet)
{ {
// //
// Disconnect nodes // Disconnect nodes
@ -1180,7 +1180,8 @@ void CConnman::ThreadSocketHandler()
int nSelect = select(have_fds ? hSocketMax + 1 : 0, int nSelect = select(have_fds ? hSocketMax + 1 : 0,
&fdsetRecv, &fdsetSend, &fdsetError, &timeout); &fdsetRecv, &fdsetSend, &fdsetError, &timeout);
boost::this_thread::interruption_point(); if (interruptNet)
return;
if (nSelect == SOCKET_ERROR) if (nSelect == SOCKET_ERROR)
{ {
@ -1193,7 +1194,8 @@ void CConnman::ThreadSocketHandler()
} }
FD_ZERO(&fdsetSend); FD_ZERO(&fdsetSend);
FD_ZERO(&fdsetError); FD_ZERO(&fdsetError);
MilliSleep(timeout.tv_usec/1000); if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
return;
} }
// //
@ -1219,7 +1221,8 @@ void CConnman::ThreadSocketHandler()
} }
BOOST_FOREACH(CNode* pnode, vNodesCopy) BOOST_FOREACH(CNode* pnode, vNodesCopy)
{ {
boost::this_thread::interruption_point(); if (interruptNet)
return;
// //
// Receive // Receive
@ -1241,7 +1244,7 @@ void CConnman::ThreadSocketHandler()
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect(); pnode->CloseSocketDisconnect();
if(notify) if(notify)
messageHandlerCondition.notify_one(); condMsgProc.notify_one();
pnode->nLastRecv = GetTime(); pnode->nLastRecv = GetTime();
pnode->nRecvBytes += nBytes; pnode->nRecvBytes += nBytes;
RecordBytesRecv(nBytes); RecordBytesRecv(nBytes);
@ -1469,7 +1472,8 @@ void CConnman::ThreadDNSAddressSeed()
// less influence on the network topology, and reduces traffic to the seeds. // less influence on the network topology, and reduces traffic to the seeds.
if ((addrman.size() > 0) && if ((addrman.size() > 0) &&
(!GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) { (!GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) {
MilliSleep(11 * 1000); if (!interruptNet.sleep_for(std::chrono::seconds(11)))
return;
LOCK(cs_vNodes); LOCK(cs_vNodes);
int nRelevant = 0; int nRelevant = 0;
@ -1580,10 +1584,12 @@ void CConnman::ThreadOpenConnections()
OpenNetworkConnection(addr, false, NULL, strAddr.c_str()); OpenNetworkConnection(addr, false, NULL, strAddr.c_str());
for (int i = 0; i < 10 && i < nLoop; i++) for (int i = 0; i < 10 && i < nLoop; i++)
{ {
MilliSleep(500); if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
} }
} }
MilliSleep(500); if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
} }
} }
@ -1592,14 +1598,16 @@ void CConnman::ThreadOpenConnections()
// Minimum time before next feeler connection (in microseconds). // Minimum time before next feeler connection (in microseconds).
int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL); int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL);
while (true) while (!interruptNet)
{ {
ProcessOneShot(); ProcessOneShot();
MilliSleep(500); if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
CSemaphoreGrant grant(*semOutbound); CSemaphoreGrant grant(*semOutbound);
boost::this_thread::interruption_point(); if (interruptNet)
return;
// Add seed nodes if DNS seeds are all down (an infrastructure attack?). // Add seed nodes if DNS seeds are all down (an infrastructure attack?).
if (addrman.size() == 0 && (GetTime() - nStart > 60)) { if (addrman.size() == 0 && (GetTime() - nStart > 60)) {
@ -1657,7 +1665,7 @@ void CConnman::ThreadOpenConnections()
int64_t nANow = GetAdjustedTime(); int64_t nANow = GetAdjustedTime();
int nTries = 0; int nTries = 0;
while (true) while (!interruptNet)
{ {
CAddrInfo addr = addrman.Select(fFeeler); CAddrInfo addr = addrman.Select(fFeeler);
@ -1700,7 +1708,8 @@ void CConnman::ThreadOpenConnections()
if (fFeeler) { if (fFeeler) {
// Add small amount of random noise before connection to avoid synchronization. // Add small amount of random noise before connection to avoid synchronization.
int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000); int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000);
MilliSleep(randsleep); if (!interruptNet.sleep_for(std::chrono::milliseconds(randsleep)))
return;
LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString()); LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString());
} }
@ -1779,11 +1788,12 @@ void CConnman::ThreadOpenAddedConnections()
// OpenNetworkConnection can detect existing connections to that IP/port. // OpenNetworkConnection can detect existing connections to that IP/port.
CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort())); CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort()));
OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false); OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false);
MilliSleep(500); if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
} }
} }
if (!interruptNet.sleep_for(std::chrono::minutes(2)))
MilliSleep(120000); // Retry every 2 minutes return;
} }
} }
@ -1793,7 +1803,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
// //
// Initiate outbound network connection // Initiate outbound network connection
// //
boost::this_thread::interruption_point(); if (interruptNet) {
return false;
}
if (!fNetworkActive) { if (!fNetworkActive) {
return false; return false;
} }
@ -1819,13 +1831,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
return true; return true;
} }
void CConnman::ThreadMessageHandler() void CConnman::ThreadMessageHandler()
{ {
boost::mutex condition_mutex; while (!flagInterruptMsgProc)
boost::unique_lock<boost::mutex> lock(condition_mutex);
while (true)
{ {
std::vector<CNode*> vNodesCopy; std::vector<CNode*> vNodesCopy;
{ {
@ -1860,7 +1868,8 @@ void CConnman::ThreadMessageHandler()
} }
} }
} }
boost::this_thread::interruption_point(); if (flagInterruptMsgProc)
return;
// Send messages // Send messages
{ {
@ -1868,7 +1877,8 @@ void CConnman::ThreadMessageHandler()
if (lockSend) if (lockSend)
GetNodeSignals().SendMessages(pnode, *this); GetNodeSignals().SendMessages(pnode, *this);
} }
boost::this_thread::interruption_point(); if (flagInterruptMsgProc)
return;
} }
{ {
@ -1877,8 +1887,10 @@ void CConnman::ThreadMessageHandler()
pnode->Release(); pnode->Release();
} }
if (fSleep) if (fSleep) {
messageHandlerCondition.timed_wait(lock, boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100)); std::unique_lock<std::mutex> lock(mutexMsgProc);
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
}
} }
} }
@ -2070,6 +2082,7 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe
nMaxOutbound = 0; nMaxOutbound = 0;
nBestHeight = 0; nBestHeight = 0;
clientInterface = NULL; clientInterface = NULL;
flagInterruptMsgProc = false;
} }
NodeId CConnman::GetNewNodeId() NodeId CConnman::GetNewNodeId()
@ -2077,7 +2090,7 @@ NodeId CConnman::GetNewNodeId()
return nLastNodeId.fetch_add(1, std::memory_order_relaxed); return nLastNodeId.fetch_add(1, std::memory_order_relaxed);
} }
bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options connOptions) bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options connOptions)
{ {
nTotalBytesRecv = 0; nTotalBytesRecv = 0;
nTotalBytesSent = 0; nTotalBytesSent = 0;
@ -2144,24 +2157,26 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st
// //
// Start threads // Start threads
// //
interruptNet.reset();
flagInterruptMsgProc = false;
// Send and receive from sockets, accept connections // Send and receive from sockets, accept connections
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "net", boost::function<void()>(boost::bind(&CConnman::ThreadSocketHandler, this)))); threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
if (!GetBoolArg("-dnsseed", true)) if (!GetBoolArg("-dnsseed", true))
LogPrintf("DNS seeding disabled\n"); LogPrintf("DNS seeding disabled\n");
else else
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", boost::function<void()>(boost::bind(&CConnman::ThreadDNSAddressSeed, this)))); threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this)));
// Initiate outbound connections from -addnode // Initiate outbound connections from -addnode
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "addcon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenAddedConnections, this)))); threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this)));
// Initiate outbound connections unless connect=0 // Initiate outbound connections unless connect=0
if (!mapMultiArgs.count("-connect") || mapMultiArgs.at("-connect").size() != 1 || mapMultiArgs.at("-connect")[0] != "0") if (!mapMultiArgs.count("-connect") || mapMultiArgs.at("-connect").size() != 1 || mapMultiArgs.at("-connect")[0] != "0")
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "opencon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenConnections, this)))); threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this)));
// Process messages // Process messages
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "msghand", boost::function<void()>(boost::bind(&CConnman::ThreadMessageHandler, this)))); threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
// Dump network addresses // Dump network addresses
scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL); scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL);
@ -2184,12 +2199,33 @@ public:
} }
instance_of_cnetcleanup; instance_of_cnetcleanup;
void CConnman::Stop() void CConnman::Interrupt()
{ {
LogPrintf("%s\n",__func__); {
std::lock_guard<std::mutex> lock(mutexMsgProc);
flagInterruptMsgProc = true;
}
condMsgProc.notify_all();
interruptNet();
if (semOutbound) if (semOutbound)
for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++) for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++)
semOutbound->post(); semOutbound->post();
}
void CConnman::Stop()
{
if (threadMessageHandler.joinable())
threadMessageHandler.join();
if (threadOpenConnections.joinable())
threadOpenConnections.join();
if (threadOpenAddedConnections.joinable())
threadOpenAddedConnections.join();
if (threadDNSAddressSeed.joinable())
threadDNSAddressSeed.join();
if (threadSocketHandler.joinable())
threadSocketHandler.join();
if (fAddressesInitialized) if (fAddressesInitialized)
{ {
@ -2232,6 +2268,7 @@ void CConnman::DeleteNode(CNode* pnode)
CConnman::~CConnman() CConnman::~CConnman()
{ {
Interrupt();
Stop(); Stop();
} }

19
src/net.h

@ -19,11 +19,14 @@
#include "streams.h" #include "streams.h"
#include "sync.h" #include "sync.h"
#include "uint256.h" #include "uint256.h"
#include "threadinterrupt.h"
#include <atomic> #include <atomic>
#include <deque> #include <deque>
#include <stdint.h> #include <stdint.h>
#include <thread>
#include <memory> #include <memory>
#include <condition_variable>
#ifndef WIN32 #ifndef WIN32
#include <arpa/inet.h> #include <arpa/inet.h>
@ -142,8 +145,9 @@ public:
}; };
CConnman(uint64_t seed0, uint64_t seed1); CConnman(uint64_t seed0, uint64_t seed1);
~CConnman(); ~CConnman();
bool Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options options); bool Start(CScheduler& scheduler, std::string& strNodeError, Options options);
void Stop(); void Stop();
void Interrupt();
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
bool GetNetworkActive() const { return fNetworkActive; }; bool GetNetworkActive() const { return fNetworkActive; };
void SetNetworkActive(bool active); void SetNetworkActive(bool active);
@ -402,7 +406,6 @@ private:
std::list<CNode*> vNodesDisconnected; std::list<CNode*> vNodesDisconnected;
mutable CCriticalSection cs_vNodes; mutable CCriticalSection cs_vNodes;
std::atomic<NodeId> nLastNodeId; std::atomic<NodeId> nLastNodeId;
boost::condition_variable messageHandlerCondition;
/** Services this instance offers */ /** Services this instance offers */
ServiceFlags nLocalServices; ServiceFlags nLocalServices;
@ -419,6 +422,18 @@ private:
/** SipHasher seeds for deterministic randomness */ /** SipHasher seeds for deterministic randomness */
const uint64_t nSeed0, nSeed1; const uint64_t nSeed0, nSeed1;
std::condition_variable condMsgProc;
std::mutex mutexMsgProc;
std::atomic<bool> flagInterruptMsgProc;
CThreadInterrupt interruptNet;
std::thread threadDNSAddressSeed;
std::thread threadSocketHandler;
std::thread threadOpenAddedConnections;
std::thread threadOpenConnections;
std::thread threadMessageHandler;
}; };
extern std::unique_ptr<CConnman> g_connman; extern std::unique_ptr<CConnman> g_connman;
void Discover(boost::thread_group& threadGroup); void Discover(boost::thread_group& threadGroup);

Loading…
Cancel
Save