Browse Source

Merge #9289: net: drop boost::thread_group

67ee4ec net: misc header cleanups (Cory Fields)
8b3159e net: make proxy receives interruptible (Cory Fields)
5cb0fce net: remove thread_interrupted catch (Cory Fields)
d3d7056 net: make net processing interruptible (Cory Fields)
0985052 net: make net interruptible (Cory Fields)
799df91 net: add CThreadInterrupt and InterruptibleSleep (Cory Fields)
7325b15 net: a few small cleanups before replacing boost threads (Cory Fields)
0.14
Wladimir J. van der Laan 8 years ago
parent
commit
d9ae1cefa0
No known key found for this signature in database
GPG Key ID: 74810B012346C9A6
  1. 2
      src/Makefile.am
  2. 4
      src/init.cpp
  3. 118
      src/net.cpp
  4. 23
      src/net.h
  5. 36
      src/net_processing.cpp
  6. 5
      src/net_processing.h
  7. 19
      src/netbase.cpp
  8. 1
      src/netbase.h
  9. 20
      src/test/DoS_tests.cpp
  10. 41
      src/threadinterrupt.cpp
  11. 34
      src/threadinterrupt.h

2
src/Makefile.am

@ -138,6 +138,7 @@ BITCOIN_CORE_H = \
support/lockedpool.h \ support/lockedpool.h \
sync.h \ sync.h \
threadsafety.h \ threadsafety.h \
threadinterrupt.h \
timedata.h \ timedata.h \
torcontrol.h \ torcontrol.h \
txdb.h \ txdb.h \
@ -327,6 +328,7 @@ libbitcoin_util_a_SOURCES = \
rpc/protocol.cpp \ rpc/protocol.cpp \
support/cleanse.cpp \ support/cleanse.cpp \
sync.cpp \ sync.cpp \
threadinterrupt.cpp \
util.cpp \ util.cpp \
utilmoneystr.cpp \ utilmoneystr.cpp \
utilstrencodings.cpp \ utilstrencodings.cpp \

4
src/init.cpp

@ -176,6 +176,8 @@ void Interrupt(boost::thread_group& threadGroup)
InterruptRPC(); InterruptRPC();
InterruptREST(); InterruptREST();
InterruptTorControl(); InterruptTorControl();
if (g_connman)
g_connman->Interrupt();
threadGroup.interrupt_all(); threadGroup.interrupt_all();
} }
@ -1572,7 +1574,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
connOptions.nMaxOutboundTimeframe = nMaxOutboundTimeframe; connOptions.nMaxOutboundTimeframe = nMaxOutboundTimeframe;
connOptions.nMaxOutboundLimit = nMaxOutboundLimit; connOptions.nMaxOutboundLimit = nMaxOutboundLimit;
if(!connman.Start(threadGroup, scheduler, strNodeError, connOptions)) if (!connman.Start(scheduler, strNodeError, connOptions))
return InitError(strNodeError); return InitError(strNodeError);
// ********************************************************* Step 12: finished // ********************************************************* Step 12: finished

118
src/net.cpp

@ -35,8 +35,6 @@
#include <miniupnpc/upnperrors.h> #include <miniupnpc/upnperrors.h>
#endif #endif
#include <boost/filesystem.hpp>
#include <boost/thread.hpp>
#include <math.h> #include <math.h>
@ -1042,7 +1040,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
void CConnman::ThreadSocketHandler() void CConnman::ThreadSocketHandler()
{ {
unsigned int nPrevNodeCount = 0; unsigned int nPrevNodeCount = 0;
while (true) while (!interruptNet)
{ {
// //
// Disconnect nodes // Disconnect nodes
@ -1180,7 +1178,8 @@ void CConnman::ThreadSocketHandler()
int nSelect = select(have_fds ? hSocketMax + 1 : 0, int nSelect = select(have_fds ? hSocketMax + 1 : 0,
&fdsetRecv, &fdsetSend, &fdsetError, &timeout); &fdsetRecv, &fdsetSend, &fdsetError, &timeout);
boost::this_thread::interruption_point(); if (interruptNet)
return;
if (nSelect == SOCKET_ERROR) if (nSelect == SOCKET_ERROR)
{ {
@ -1193,7 +1192,8 @@ void CConnman::ThreadSocketHandler()
} }
FD_ZERO(&fdsetSend); FD_ZERO(&fdsetSend);
FD_ZERO(&fdsetError); FD_ZERO(&fdsetError);
MilliSleep(timeout.tv_usec/1000); if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
return;
} }
// //
@ -1219,7 +1219,8 @@ void CConnman::ThreadSocketHandler()
} }
BOOST_FOREACH(CNode* pnode, vNodesCopy) BOOST_FOREACH(CNode* pnode, vNodesCopy)
{ {
boost::this_thread::interruption_point(); if (interruptNet)
return;
// //
// Receive // Receive
@ -1241,7 +1242,7 @@ void CConnman::ThreadSocketHandler()
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect(); pnode->CloseSocketDisconnect();
if(notify) if(notify)
messageHandlerCondition.notify_one(); condMsgProc.notify_one();
pnode->nLastRecv = GetTime(); pnode->nLastRecv = GetTime();
pnode->nRecvBytes += nBytes; pnode->nRecvBytes += nBytes;
RecordBytesRecv(nBytes); RecordBytesRecv(nBytes);
@ -1469,7 +1470,8 @@ void CConnman::ThreadDNSAddressSeed()
// less influence on the network topology, and reduces traffic to the seeds. // less influence on the network topology, and reduces traffic to the seeds.
if ((addrman.size() > 0) && if ((addrman.size() > 0) &&
(!GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) { (!GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) {
MilliSleep(11 * 1000); if (!interruptNet.sleep_for(std::chrono::seconds(11)))
return;
LOCK(cs_vNodes); LOCK(cs_vNodes);
int nRelevant = 0; int nRelevant = 0;
@ -1580,10 +1582,12 @@ void CConnman::ThreadOpenConnections()
OpenNetworkConnection(addr, false, NULL, strAddr.c_str()); OpenNetworkConnection(addr, false, NULL, strAddr.c_str());
for (int i = 0; i < 10 && i < nLoop; i++) for (int i = 0; i < 10 && i < nLoop; i++)
{ {
MilliSleep(500); if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
} }
} }
MilliSleep(500); if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
} }
} }
@ -1592,14 +1596,16 @@ void CConnman::ThreadOpenConnections()
// Minimum time before next feeler connection (in microseconds). // Minimum time before next feeler connection (in microseconds).
int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL); int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL);
while (true) while (!interruptNet)
{ {
ProcessOneShot(); ProcessOneShot();
MilliSleep(500); if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
CSemaphoreGrant grant(*semOutbound); CSemaphoreGrant grant(*semOutbound);
boost::this_thread::interruption_point(); if (interruptNet)
return;
// Add seed nodes if DNS seeds are all down (an infrastructure attack?). // Add seed nodes if DNS seeds are all down (an infrastructure attack?).
if (addrman.size() == 0 && (GetTime() - nStart > 60)) { if (addrman.size() == 0 && (GetTime() - nStart > 60)) {
@ -1657,7 +1663,7 @@ void CConnman::ThreadOpenConnections()
int64_t nANow = GetAdjustedTime(); int64_t nANow = GetAdjustedTime();
int nTries = 0; int nTries = 0;
while (true) while (!interruptNet)
{ {
CAddrInfo addr = addrman.Select(fFeeler); CAddrInfo addr = addrman.Select(fFeeler);
@ -1700,7 +1706,8 @@ void CConnman::ThreadOpenConnections()
if (fFeeler) { if (fFeeler) {
// Add small amount of random noise before connection to avoid synchronization. // Add small amount of random noise before connection to avoid synchronization.
int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000); int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000);
MilliSleep(randsleep); if (!interruptNet.sleep_for(std::chrono::milliseconds(randsleep)))
return;
LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString()); LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString());
} }
@ -1779,11 +1786,12 @@ void CConnman::ThreadOpenAddedConnections()
// OpenNetworkConnection can detect existing connections to that IP/port. // OpenNetworkConnection can detect existing connections to that IP/port.
CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort())); CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort()));
OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false); OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false);
MilliSleep(500); if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
} }
} }
if (!interruptNet.sleep_for(std::chrono::minutes(2)))
MilliSleep(120000); // Retry every 2 minutes return;
} }
} }
@ -1793,7 +1801,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
// //
// Initiate outbound network connection // Initiate outbound network connection
// //
boost::this_thread::interruption_point(); if (interruptNet) {
return false;
}
if (!fNetworkActive) { if (!fNetworkActive) {
return false; return false;
} }
@ -1806,7 +1816,6 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
return false; return false;
CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure); CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure);
boost::this_thread::interruption_point();
if (!pnode) if (!pnode)
return false; return false;
@ -1820,13 +1829,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
return true; return true;
} }
void CConnman::ThreadMessageHandler() void CConnman::ThreadMessageHandler()
{ {
boost::mutex condition_mutex; while (!flagInterruptMsgProc)
boost::unique_lock<boost::mutex> lock(condition_mutex);
while (true)
{ {
std::vector<CNode*> vNodesCopy; std::vector<CNode*> vNodesCopy;
{ {
@ -1849,7 +1854,7 @@ void CConnman::ThreadMessageHandler()
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv) if (lockRecv)
{ {
if (!GetNodeSignals().ProcessMessages(pnode, *this)) if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
pnode->CloseSocketDisconnect(); pnode->CloseSocketDisconnect();
if (pnode->nSendSize < GetSendBufferSize()) if (pnode->nSendSize < GetSendBufferSize())
@ -1861,15 +1866,17 @@ void CConnman::ThreadMessageHandler()
} }
} }
} }
boost::this_thread::interruption_point(); if (flagInterruptMsgProc)
return;
// Send messages // Send messages
{ {
TRY_LOCK(pnode->cs_vSend, lockSend); TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) if (lockSend)
GetNodeSignals().SendMessages(pnode, *this); GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc);
} }
boost::this_thread::interruption_point(); if (flagInterruptMsgProc)
return;
} }
{ {
@ -1878,8 +1885,10 @@ void CConnman::ThreadMessageHandler()
pnode->Release(); pnode->Release();
} }
if (fSleep) if (fSleep) {
messageHandlerCondition.timed_wait(lock, boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100)); std::unique_lock<std::mutex> lock(mutexMsgProc);
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
}
} }
} }
@ -2071,6 +2080,7 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe
nMaxOutbound = 0; nMaxOutbound = 0;
nBestHeight = 0; nBestHeight = 0;
clientInterface = NULL; clientInterface = NULL;
flagInterruptMsgProc = false;
} }
NodeId CConnman::GetNewNodeId() NodeId CConnman::GetNewNodeId()
@ -2078,7 +2088,7 @@ NodeId CConnman::GetNewNodeId()
return nLastNodeId.fetch_add(1, std::memory_order_relaxed); return nLastNodeId.fetch_add(1, std::memory_order_relaxed);
} }
bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options connOptions) bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options connOptions)
{ {
nTotalBytesRecv = 0; nTotalBytesRecv = 0;
nTotalBytesSent = 0; nTotalBytesSent = 0;
@ -2145,24 +2155,27 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st
// //
// Start threads // Start threads
// //
InterruptSocks5(false);
interruptNet.reset();
flagInterruptMsgProc = false;
// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
if (!GetBoolArg("-dnsseed", true)) if (!GetBoolArg("-dnsseed", true))
LogPrintf("DNS seeding disabled\n"); LogPrintf("DNS seeding disabled\n");
else else
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", boost::function<void()>(boost::bind(&CConnman::ThreadDNSAddressSeed, this)))); threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this)));
// Send and receive from sockets, accept connections
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "net", boost::function<void()>(boost::bind(&CConnman::ThreadSocketHandler, this))));
// Initiate outbound connections from -addnode // Initiate outbound connections from -addnode
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "addcon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenAddedConnections, this)))); threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this)));
// Initiate outbound connections unless connect=0 // Initiate outbound connections unless connect=0
if (!mapMultiArgs.count("-connect") || mapMultiArgs.at("-connect").size() != 1 || mapMultiArgs.at("-connect")[0] != "0") if (!mapMultiArgs.count("-connect") || mapMultiArgs.at("-connect").size() != 1 || mapMultiArgs.at("-connect")[0] != "0")
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "opencon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenConnections, this)))); threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this)));
// Process messages // Process messages
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "msghand", boost::function<void()>(boost::bind(&CConnman::ThreadMessageHandler, this)))); threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
// Dump network addresses // Dump network addresses
scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL); scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL);
@ -2185,12 +2198,34 @@ public:
} }
instance_of_cnetcleanup; instance_of_cnetcleanup;
void CConnman::Stop() void CConnman::Interrupt()
{ {
LogPrintf("%s\n",__func__); {
std::lock_guard<std::mutex> lock(mutexMsgProc);
flagInterruptMsgProc = true;
}
condMsgProc.notify_all();
interruptNet();
InterruptSocks5(true);
if (semOutbound) if (semOutbound)
for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++) for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++)
semOutbound->post(); semOutbound->post();
}
void CConnman::Stop()
{
if (threadMessageHandler.joinable())
threadMessageHandler.join();
if (threadOpenConnections.joinable())
threadOpenConnections.join();
if (threadOpenAddedConnections.joinable())
threadOpenAddedConnections.join();
if (threadDNSAddressSeed.joinable())
threadDNSAddressSeed.join();
if (threadSocketHandler.joinable())
threadSocketHandler.join();
if (fAddressesInitialized) if (fAddressesInitialized)
{ {
@ -2233,6 +2268,7 @@ void CConnman::DeleteNode(CNode* pnode)
CConnman::~CConnman() CConnman::~CConnman()
{ {
Interrupt();
Stop(); Stop();
} }

23
src/net.h

@ -19,11 +19,14 @@
#include "streams.h" #include "streams.h"
#include "sync.h" #include "sync.h"
#include "uint256.h" #include "uint256.h"
#include "threadinterrupt.h"
#include <atomic> #include <atomic>
#include <deque> #include <deque>
#include <stdint.h> #include <stdint.h>
#include <thread>
#include <memory> #include <memory>
#include <condition_variable>
#ifndef WIN32 #ifndef WIN32
#include <arpa/inet.h> #include <arpa/inet.h>
@ -142,8 +145,9 @@ public:
}; };
CConnman(uint64_t seed0, uint64_t seed1); CConnman(uint64_t seed0, uint64_t seed1);
~CConnman(); ~CConnman();
bool Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options options); bool Start(CScheduler& scheduler, std::string& strNodeError, Options options);
void Stop(); void Stop();
void Interrupt();
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
bool GetNetworkActive() const { return fNetworkActive; }; bool GetNetworkActive() const { return fNetworkActive; };
void SetNetworkActive(bool active); void SetNetworkActive(bool active);
@ -402,7 +406,6 @@ private:
std::list<CNode*> vNodesDisconnected; std::list<CNode*> vNodesDisconnected;
mutable CCriticalSection cs_vNodes; mutable CCriticalSection cs_vNodes;
std::atomic<NodeId> nLastNodeId; std::atomic<NodeId> nLastNodeId;
boost::condition_variable messageHandlerCondition;
/** Services this instance offers */ /** Services this instance offers */
ServiceFlags nLocalServices; ServiceFlags nLocalServices;
@ -419,6 +422,18 @@ private:
/** SipHasher seeds for deterministic randomness */ /** SipHasher seeds for deterministic randomness */
const uint64_t nSeed0, nSeed1; const uint64_t nSeed0, nSeed1;
std::condition_variable condMsgProc;
std::mutex mutexMsgProc;
std::atomic<bool> flagInterruptMsgProc;
CThreadInterrupt interruptNet;
std::thread threadDNSAddressSeed;
std::thread threadSocketHandler;
std::thread threadOpenAddedConnections;
std::thread threadOpenConnections;
std::thread threadMessageHandler;
}; };
extern std::unique_ptr<CConnman> g_connman; extern std::unique_ptr<CConnman> g_connman;
void Discover(boost::thread_group& threadGroup); void Discover(boost::thread_group& threadGroup);
@ -445,8 +460,8 @@ struct CombinerAll
// Signals for message handling // Signals for message handling
struct CNodeSignals struct CNodeSignals
{ {
boost::signals2::signal<bool (CNode*, CConnman&), CombinerAll> ProcessMessages; boost::signals2::signal<bool (CNode*, CConnman&, std::atomic<bool>&), CombinerAll> ProcessMessages;
boost::signals2::signal<bool (CNode*, CConnman&), CombinerAll> SendMessages; boost::signals2::signal<bool (CNode*, CConnman&, std::atomic<bool>&), CombinerAll> SendMessages;
boost::signals2::signal<void (CNode*, CConnman&)> InitializeNode; boost::signals2::signal<void (CNode*, CConnman&)> InitializeNode;
boost::signals2::signal<void (NodeId, bool&)> FinalizeNode; boost::signals2::signal<void (NodeId, bool&)> FinalizeNode;
}; };

36
src/net_processing.cpp

@ -886,7 +886,7 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma
connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc));
} }
void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman) void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{ {
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin(); std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
@ -901,7 +901,9 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
const CInv &inv = *it; const CInv &inv = *it;
{ {
boost::this_thread::interruption_point(); if (interruptMsgProc)
return;
it++; it++;
if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK)
@ -1055,7 +1057,7 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params
return nFetchFlags; return nFetchFlags;
} }
bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman) bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{ {
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
@ -1295,7 +1297,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
int64_t nSince = nNow - 10 * 60; int64_t nSince = nNow - 10 * 60;
BOOST_FOREACH(CAddress& addr, vAddr) BOOST_FOREACH(CAddress& addr, vAddr)
{ {
boost::this_thread::interruption_point(); if (interruptMsgProc)
return true;
if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES) if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES)
continue; continue;
@ -1377,7 +1380,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
{ {
CInv &inv = vInv[nInv]; CInv &inv = vInv[nInv];
boost::this_thread::interruption_point(); if (interruptMsgProc)
return true;
bool fAlreadyHave = AlreadyHave(inv); bool fAlreadyHave = AlreadyHave(inv);
LogPrint("net", "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom->id); LogPrint("net", "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom->id);
@ -1439,7 +1443,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
LogPrint("net", "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom->id); LogPrint("net", "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom->id);
pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end()); pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end());
ProcessGetData(pfrom, chainparams.GetConsensus(), connman); ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
} }
@ -1513,7 +1517,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
inv.type = State(pfrom->GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK; inv.type = State(pfrom->GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK;
inv.hash = req.blockhash; inv.hash = req.blockhash;
pfrom->vRecvGetData.push_back(inv); pfrom->vRecvGetData.push_back(inv);
ProcessGetData(pfrom, chainparams.GetConsensus(), connman); ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
return true; return true;
} }
@ -1925,10 +1929,10 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
} // cs_main } // cs_main
if (fProcessBLOCKTXN) if (fProcessBLOCKTXN)
return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman); return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman, interruptMsgProc);
if (fRevertToHeaderProcessing) if (fRevertToHeaderProcessing)
return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman); return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman, interruptMsgProc);
if (fBlockReconstructed) { if (fBlockReconstructed) {
// If we got here, we were able to optimistically reconstruct a // If we got here, we were able to optimistically reconstruct a
@ -2441,7 +2445,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
} }
// requires LOCK(cs_vRecvMsg) // requires LOCK(cs_vRecvMsg)
bool ProcessMessages(CNode* pfrom, CConnman& connman) bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{ {
const CChainParams& chainparams = Params(); const CChainParams& chainparams = Params();
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
@ -2459,7 +2463,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman)
bool fOk = true; bool fOk = true;
if (!pfrom->vRecvGetData.empty()) if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams.GetConsensus(), connman); ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
// this maintains the order of responses // this maintains the order of responses
if (!pfrom->vRecvGetData.empty()) return fOk; if (!pfrom->vRecvGetData.empty()) return fOk;
@ -2520,8 +2524,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman)
bool fRet = false; bool fRet = false;
try try
{ {
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman); fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc);
boost::this_thread::interruption_point(); if (interruptMsgProc)
return true;
} }
catch (const std::ios_base::failure& e) catch (const std::ios_base::failure& e)
{ {
@ -2546,9 +2551,6 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman)
PrintExceptionContinue(&e, "ProcessMessages()"); PrintExceptionContinue(&e, "ProcessMessages()");
} }
} }
catch (const boost::thread_interrupted&) {
throw;
}
catch (const std::exception& e) { catch (const std::exception& e) {
PrintExceptionContinue(&e, "ProcessMessages()"); PrintExceptionContinue(&e, "ProcessMessages()");
} catch (...) { } catch (...) {
@ -2585,7 +2587,7 @@ public:
} }
}; };
bool SendMessages(CNode* pto, CConnman& connman) bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{ {
const Consensus::Params& consensusParams = Params().GetConsensus(); const Consensus::Params& consensusParams = Params().GetConsensus();
{ {

5
src/net_processing.h

@ -39,13 +39,14 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats);
void Misbehaving(NodeId nodeid, int howmuch); void Misbehaving(NodeId nodeid, int howmuch);
/** Process protocol messages received from a given node */ /** Process protocol messages received from a given node */
bool ProcessMessages(CNode* pfrom, CConnman& connman); bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interrupt);
/** /**
* Send queued protocol messages to be sent to a give node. * Send queued protocol messages to be sent to a give node.
* *
* @param[in] pto The node which we are sending messages to. * @param[in] pto The node which we are sending messages to.
* @param[in] connman The connection manager for that node. * @param[in] connman The connection manager for that node.
* @param[in] interrupt Interrupt condition for processing threads
*/ */
bool SendMessages(CNode* pto, CConnman& connman); bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interrupt);
#endif // BITCOIN_NET_PROCESSING_H #endif // BITCOIN_NET_PROCESSING_H

19
src/netbase.cpp

@ -16,20 +16,14 @@
#include "util.h" #include "util.h"
#include "utilstrencodings.h" #include "utilstrencodings.h"
#ifdef HAVE_GETADDRINFO_A #include <atomic>
#include <netdb.h>
#endif
#ifndef WIN32 #ifndef WIN32
#if HAVE_INET_PTON
#include <arpa/inet.h>
#endif
#include <fcntl.h> #include <fcntl.h>
#endif #endif
#include <boost/algorithm/string/case_conv.hpp> // for to_lower() #include <boost/algorithm/string/case_conv.hpp> // for to_lower()
#include <boost/algorithm/string/predicate.hpp> // for startswith() and endswith() #include <boost/algorithm/string/predicate.hpp> // for startswith() and endswith()
#include <boost/thread.hpp>
#if !defined(HAVE_MSG_NOSIGNAL) && !defined(MSG_NOSIGNAL) #if !defined(HAVE_MSG_NOSIGNAL) && !defined(MSG_NOSIGNAL)
#define MSG_NOSIGNAL 0 #define MSG_NOSIGNAL 0
@ -44,6 +38,7 @@ bool fNameLookup = DEFAULT_NAME_LOOKUP;
// Need ample time for negotiation for very slow proxies such as Tor (milliseconds) // Need ample time for negotiation for very slow proxies such as Tor (milliseconds)
static const int SOCKS5_RECV_TIMEOUT = 20 * 1000; static const int SOCKS5_RECV_TIMEOUT = 20 * 1000;
static std::atomic<bool> interruptSocks5Recv(false);
enum Network ParseNetwork(std::string net) { enum Network ParseNetwork(std::string net) {
boost::to_lower(net); boost::to_lower(net);
@ -206,7 +201,7 @@ struct timeval MillisToTimeval(int64_t nTimeout)
/** /**
* Read bytes from socket. This will either read the full number of bytes requested * Read bytes from socket. This will either read the full number of bytes requested
* or return False on error or timeout. * or return False on error or timeout.
* This function can be interrupted by boost thread interrupt. * This function can be interrupted by calling InterruptSocks5()
* *
* @param data Buffer to receive into * @param data Buffer to receive into
* @param len Length of data to receive * @param len Length of data to receive
@ -246,7 +241,8 @@ bool static InterruptibleRecv(char* data, size_t len, int timeout, SOCKET& hSock
return false; return false;
} }
} }
boost::this_thread::interruption_point(); if (interruptSocks5Recv)
return false;
curTime = GetTimeMillis(); curTime = GetTimeMillis();
} }
return len == 0; return len == 0;
@ -715,3 +711,8 @@ bool SetSocketNonBlocking(SOCKET& hSocket, bool fNonBlocking)
return true; return true;
} }
void InterruptSocks5(bool interrupt)
{
interruptSocks5Recv = interrupt;
}

1
src/netbase.h

@ -63,5 +63,6 @@ bool SetSocketNonBlocking(SOCKET& hSocket, bool fNonBlocking);
* Convert milliseconds to a struct timeval for e.g. select. * Convert milliseconds to a struct timeval for e.g. select.
*/ */
struct timeval MillisToTimeval(int64_t nTimeout); struct timeval MillisToTimeval(int64_t nTimeout);
void InterruptSocks5(bool interrupt);
#endif // BITCOIN_NETBASE_H #endif // BITCOIN_NETBASE_H

20
src/test/DoS_tests.cpp

@ -47,6 +47,8 @@ BOOST_FIXTURE_TEST_SUITE(DoS_tests, TestingSetup)
BOOST_AUTO_TEST_CASE(DoS_banning) BOOST_AUTO_TEST_CASE(DoS_banning)
{ {
std::atomic<bool> interruptDummy(false);
connman->ClearBanned(); connman->ClearBanned();
CAddress addr1(ip(0xa0b0c001), NODE_NONE); CAddress addr1(ip(0xa0b0c001), NODE_NONE);
CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 0, 0, "", true); CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 0, 0, "", true);
@ -54,7 +56,7 @@ BOOST_AUTO_TEST_CASE(DoS_banning)
GetNodeSignals().InitializeNode(&dummyNode1, *connman); GetNodeSignals().InitializeNode(&dummyNode1, *connman);
dummyNode1.nVersion = 1; dummyNode1.nVersion = 1;
Misbehaving(dummyNode1.GetId(), 100); // Should get banned Misbehaving(dummyNode1.GetId(), 100); // Should get banned
SendMessages(&dummyNode1, *connman); SendMessages(&dummyNode1, *connman, interruptDummy);
BOOST_CHECK(connman->IsBanned(addr1)); BOOST_CHECK(connman->IsBanned(addr1));
BOOST_CHECK(!connman->IsBanned(ip(0xa0b0c001|0x0000ff00))); // Different IP, not banned BOOST_CHECK(!connman->IsBanned(ip(0xa0b0c001|0x0000ff00))); // Different IP, not banned
@ -64,16 +66,18 @@ BOOST_AUTO_TEST_CASE(DoS_banning)
GetNodeSignals().InitializeNode(&dummyNode2, *connman); GetNodeSignals().InitializeNode(&dummyNode2, *connman);
dummyNode2.nVersion = 1; dummyNode2.nVersion = 1;
Misbehaving(dummyNode2.GetId(), 50); Misbehaving(dummyNode2.GetId(), 50);
SendMessages(&dummyNode2, *connman); SendMessages(&dummyNode2, *connman, interruptDummy);
BOOST_CHECK(!connman->IsBanned(addr2)); // 2 not banned yet... BOOST_CHECK(!connman->IsBanned(addr2)); // 2 not banned yet...
BOOST_CHECK(connman->IsBanned(addr1)); // ... but 1 still should be BOOST_CHECK(connman->IsBanned(addr1)); // ... but 1 still should be
Misbehaving(dummyNode2.GetId(), 50); Misbehaving(dummyNode2.GetId(), 50);
SendMessages(&dummyNode2, *connman); SendMessages(&dummyNode2, *connman, interruptDummy);
BOOST_CHECK(connman->IsBanned(addr2)); BOOST_CHECK(connman->IsBanned(addr2));
} }
BOOST_AUTO_TEST_CASE(DoS_banscore) BOOST_AUTO_TEST_CASE(DoS_banscore)
{ {
std::atomic<bool> interruptDummy(false);
connman->ClearBanned(); connman->ClearBanned();
ForceSetArg("-banscore", "111"); // because 11 is my favorite number ForceSetArg("-banscore", "111"); // because 11 is my favorite number
CAddress addr1(ip(0xa0b0c001), NODE_NONE); CAddress addr1(ip(0xa0b0c001), NODE_NONE);
@ -82,19 +86,21 @@ BOOST_AUTO_TEST_CASE(DoS_banscore)
GetNodeSignals().InitializeNode(&dummyNode1, *connman); GetNodeSignals().InitializeNode(&dummyNode1, *connman);
dummyNode1.nVersion = 1; dummyNode1.nVersion = 1;
Misbehaving(dummyNode1.GetId(), 100); Misbehaving(dummyNode1.GetId(), 100);
SendMessages(&dummyNode1, *connman); SendMessages(&dummyNode1, *connman, interruptDummy);
BOOST_CHECK(!connman->IsBanned(addr1)); BOOST_CHECK(!connman->IsBanned(addr1));
Misbehaving(dummyNode1.GetId(), 10); Misbehaving(dummyNode1.GetId(), 10);
SendMessages(&dummyNode1, *connman); SendMessages(&dummyNode1, *connman, interruptDummy);
BOOST_CHECK(!connman->IsBanned(addr1)); BOOST_CHECK(!connman->IsBanned(addr1));
Misbehaving(dummyNode1.GetId(), 1); Misbehaving(dummyNode1.GetId(), 1);
SendMessages(&dummyNode1, *connman); SendMessages(&dummyNode1, *connman, interruptDummy);
BOOST_CHECK(connman->IsBanned(addr1)); BOOST_CHECK(connman->IsBanned(addr1));
ForceSetArg("-banscore", std::to_string(DEFAULT_BANSCORE_THRESHOLD)); ForceSetArg("-banscore", std::to_string(DEFAULT_BANSCORE_THRESHOLD));
} }
BOOST_AUTO_TEST_CASE(DoS_bantime) BOOST_AUTO_TEST_CASE(DoS_bantime)
{ {
std::atomic<bool> interruptDummy(false);
connman->ClearBanned(); connman->ClearBanned();
int64_t nStartTime = GetTime(); int64_t nStartTime = GetTime();
SetMockTime(nStartTime); // Overrides future calls to GetTime() SetMockTime(nStartTime); // Overrides future calls to GetTime()
@ -106,7 +112,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime)
dummyNode.nVersion = 1; dummyNode.nVersion = 1;
Misbehaving(dummyNode.GetId(), 100); Misbehaving(dummyNode.GetId(), 100);
SendMessages(&dummyNode, *connman); SendMessages(&dummyNode, *connman, interruptDummy);
BOOST_CHECK(connman->IsBanned(addr)); BOOST_CHECK(connman->IsBanned(addr));
SetMockTime(nStartTime+60*60); SetMockTime(nStartTime+60*60);

41
src/threadinterrupt.cpp

@ -0,0 +1,41 @@
// Copyright (c) 2009-2010 Satoshi Nakamoto
// Copyright (c) 2009-2016 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "threadinterrupt.h"
CThreadInterrupt::operator bool() const
{
return flag.load(std::memory_order_acquire);
}
void CThreadInterrupt::reset()
{
flag.store(false, std::memory_order_release);
}
void CThreadInterrupt::operator()()
{
{
std::unique_lock<std::mutex> lock(mut);
flag.store(true, std::memory_order_release);
}
cond.notify_all();
}
bool CThreadInterrupt::sleep_for(std::chrono::milliseconds rel_time)
{
std::unique_lock<std::mutex> lock(mut);
return !cond.wait_for(lock, rel_time, [this]() { return flag.load(std::memory_order_acquire); });
}
bool CThreadInterrupt::sleep_for(std::chrono::seconds rel_time)
{
return sleep_for(std::chrono::duration_cast<std::chrono::milliseconds>(rel_time));
}
bool CThreadInterrupt::sleep_for(std::chrono::minutes rel_time)
{
return sleep_for(std::chrono::duration_cast<std::chrono::milliseconds>(rel_time));
}

34
src/threadinterrupt.h

@ -0,0 +1,34 @@
// Copyright (c) 2016 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_THREADINTERRUPT_H
#define BITCOIN_THREADINTERRUPT_H
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
/*
A helper class for interruptible sleeps. Calling operator() will interrupt
any current sleep, and after that point operator bool() will return true
until reset.
*/
class CThreadInterrupt
{
public:
explicit operator bool() const;
void operator()();
void reset();
bool sleep_for(std::chrono::milliseconds rel_time);
bool sleep_for(std::chrono::seconds rel_time);
bool sleep_for(std::chrono::minutes rel_time);
private:
std::condition_variable cond;
std::mutex mut;
std::atomic<bool> flag;
};
#endif //BITCOIN_THREADINTERRUPT_H
Loading…
Cancel
Save