Browse Source

net: rework the way that the messagehandler sleeps

In order to sleep accurately, the message handler needs to know if _any_ node
has more processing that it should do before the entire thread sleeps.

Rather than returning a value that represents whether ProcessMessages
encountered a message that should trigger a disconnnect, interpret the return
value as whether or not that node has more work to do.

Also, use a global fProcessWake value that can be set by other threads,
which takes precedence (for one cycle) over the messagehandler's decision.

Note that the previous behavior was to only process one message per loop
(except in the case of a bad checksum or invalid header). That was changed in
PR #3180.

The only change here in that regard is that the current node now falls to the
back of the processing queue for the bad checksum/invalid header cases.
0.14
Cory Fields 8 years ago
parent
commit
c5a8b1b946
  1. 30
      src/net.cpp
  2. 3
      src/net.h
  3. 48
      src/net_processing.cpp
  4. 1
      src/net_processing.h

30
src/net.cpp

@ -1317,6 +1317,10 @@ void CConnman::ThreadSocketHandler() @@ -1317,6 +1317,10 @@ void CConnman::ThreadSocketHandler()
void CConnman::WakeMessageHandler()
{
{
std::lock_guard<std::mutex> lock(mutexMsgProc);
fMsgProcWake = true;
}
condMsgProc.notify_one();
}
@ -1839,7 +1843,7 @@ void CConnman::ThreadMessageHandler() @@ -1839,7 +1843,7 @@ void CConnman::ThreadMessageHandler()
}
}
bool fSleep = true;
bool fMoreWork = false;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
@ -1851,16 +1855,8 @@ void CConnman::ThreadMessageHandler() @@ -1851,16 +1855,8 @@ void CConnman::ThreadMessageHandler()
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
pnode->CloseSocketDisconnect();
if (pnode->nSendSize < GetSendBufferSize())
{
if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg.front().complete()))
{
fSleep = false;
}
}
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize());
}
}
if (flagInterruptMsgProc)
@ -1882,10 +1878,11 @@ void CConnman::ThreadMessageHandler() @@ -1882,10 +1878,11 @@ void CConnman::ThreadMessageHandler()
pnode->Release();
}
if (fSleep) {
std::unique_lock<std::mutex> lock(mutexMsgProc);
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock(mutexMsgProc);
if (!fMoreWork) {
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; });
}
fMsgProcWake = false;
}
}
@ -2156,6 +2153,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c @@ -2156,6 +2153,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
interruptNet.reset();
flagInterruptMsgProc = false;
{
std::unique_lock<std::mutex> lock(mutexMsgProc);
fMsgProcWake = false;
}
// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));

3
src/net.h

@ -424,6 +424,9 @@ private: @@ -424,6 +424,9 @@ private:
/** SipHasher seeds for deterministic randomness */
const uint64_t nSeed0, nSeed1;
/** flag for waking the message processor. */
bool fMsgProcWake;
std::condition_variable condMsgProc;
std::mutex mutexMsgProc;
std::atomic<bool> flagInterruptMsgProc;

48
src/net_processing.cpp

@ -2453,36 +2453,43 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru @@ -2453,36 +2453,43 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
// (4) checksum
// (x) data
//
bool fOk = true;
bool fMoreWork = false;
if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
if (pfrom->fDisconnect)
return false;
// this maintains the order of responses
if (!pfrom->vRecvGetData.empty()) return fOk;
if (!pfrom->vRecvGetData.empty()) return true;
auto it = pfrom->vRecvMsg.begin();
while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
// Don't bother if send buffer is too full to respond anyway
if (pfrom->nSendSize >= nMaxSendBufferSize)
break;
return false;
// get next message
CNetMessage& msg = *it;
auto it = pfrom->vRecvMsg.begin();
if (it == pfrom->vRecvMsg.end())
return false;
// end, if an incomplete message is found
if (!msg.complete())
break;
if (!it->complete())
return false;
// get next message
CNetMessage msg = std::move(*it);
// at this point, any failure means we can delete the current message
it++;
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin());
fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete();
msg.SetVersion(pfrom->GetRecvVersion());
// Scan for message start
if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) {
LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id);
fOk = false;
break;
pfrom->fDisconnect = true;
return false;
}
// Read header
@ -2490,7 +2497,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru @@ -2490,7 +2497,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (!hdr.IsValid(chainparams.MessageStart()))
{
LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id);
continue;
return fMoreWork;
}
string strCommand = hdr.GetCommand();
@ -2506,7 +2513,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru @@ -2506,7 +2513,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
SanitizeString(strCommand), nMessageSize,
HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE),
HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE));
continue;
return fMoreWork;
}
// Process message
@ -2515,7 +2522,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru @@ -2515,7 +2522,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
{
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc);
if (interruptMsgProc)
return true;
return false;
if (!pfrom->vRecvGetData.empty())
fMoreWork = true;
}
catch (const std::ios_base::failure& e)
{
@ -2549,14 +2558,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru @@ -2549,14 +2558,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (!fRet)
LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id);
break;
}
// In case the connection got shut down, its receive buffer was wiped
if (!pfrom->fDisconnect)
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
return fOk;
return fMoreWork;
}
class CompareInvMempoolOrder

1
src/net_processing.h

@ -46,6 +46,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru @@ -46,6 +46,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
* @param[in] pto The node which we are sending messages to.
* @param[in] connman The connection manager for that node.
* @param[in] interrupt Interrupt condition for processing threads
* @return True if there is more work to be done
*/
bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interrupt);

Loading…
Cancel
Save