diff --git a/src/net.cpp b/src/net.cpp index 36db77abb..947f01679 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1317,6 +1317,10 @@ void CConnman::ThreadSocketHandler() void CConnman::WakeMessageHandler() { + { + std::lock_guard lock(mutexMsgProc); + fMsgProcWake = true; + } condMsgProc.notify_one(); } @@ -1839,7 +1843,7 @@ void CConnman::ThreadMessageHandler() } } - bool fSleep = true; + bool fMoreWork = false; BOOST_FOREACH(CNode* pnode, vNodesCopy) { @@ -1851,16 +1855,8 @@ void CConnman::ThreadMessageHandler() TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { - if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc)) - pnode->CloseSocketDisconnect(); - - if (pnode->nSendSize < GetSendBufferSize()) - { - if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg.front().complete())) - { - fSleep = false; - } - } + bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); + fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize()); } } if (flagInterruptMsgProc) @@ -1882,10 +1878,11 @@ void CConnman::ThreadMessageHandler() pnode->Release(); } - if (fSleep) { - std::unique_lock lock(mutexMsgProc); - condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100)); + std::unique_lock lock(mutexMsgProc); + if (!fMoreWork) { + condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; }); } + fMsgProcWake = false; } } @@ -2156,6 +2153,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c interruptNet.reset(); flagInterruptMsgProc = false; + { + std::unique_lock lock(mutexMsgProc); + fMsgProcWake = false; + } + // Send and receive from sockets, accept connections threadSocketHandler = std::thread(&TraceThread >, "net", std::function(std::bind(&CConnman::ThreadSocketHandler, this))); diff --git a/src/net.h b/src/net.h index cc79cce85..4fc41bdda 100644 --- a/src/net.h +++ b/src/net.h @@ -424,6 +424,9 @@ private: /** SipHasher seeds for deterministic randomness */ const uint64_t nSeed0, nSeed1; + /** flag for waking the message processor. */ + bool fMsgProcWake; + std::condition_variable condMsgProc; std::mutex mutexMsgProc; std::atomic flagInterruptMsgProc; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 43a97ff71..605e142e8 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2453,36 +2453,43 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru // (4) checksum // (x) data // - bool fOk = true; + bool fMoreWork = false; if (!pfrom->vRecvGetData.empty()) ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); + if (pfrom->fDisconnect) + return false; + // this maintains the order of responses - if (!pfrom->vRecvGetData.empty()) return fOk; + if (!pfrom->vRecvGetData.empty()) return true; - auto it = pfrom->vRecvMsg.begin(); - while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway if (pfrom->nSendSize >= nMaxSendBufferSize) - break; + return false; - // get next message - CNetMessage& msg = *it; + auto it = pfrom->vRecvMsg.begin(); + if (it == pfrom->vRecvMsg.end()) + return false; // end, if an incomplete message is found - if (!msg.complete()) - break; + if (!it->complete()) + return false; + + // get next message + CNetMessage msg = std::move(*it); // at this point, any failure means we can delete the current message - it++; + pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin()); + + fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete(); msg.SetVersion(pfrom->GetRecvVersion()); // Scan for message start if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) { LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id); - fOk = false; - break; + pfrom->fDisconnect = true; + return false; } // Read header @@ -2490,7 +2497,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru if (!hdr.IsValid(chainparams.MessageStart())) { LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id); - continue; + return fMoreWork; } string strCommand = hdr.GetCommand(); @@ -2506,7 +2513,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru SanitizeString(strCommand), nMessageSize, HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE), HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE)); - continue; + return fMoreWork; } // Process message @@ -2515,7 +2522,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru { fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc); if (interruptMsgProc) - return true; + return false; + if (!pfrom->vRecvGetData.empty()) + fMoreWork = true; } catch (const std::ios_base::failure& e) { @@ -2549,14 +2558,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru if (!fRet) LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id); - break; - } - - // In case the connection got shut down, its receive buffer was wiped - if (!pfrom->fDisconnect) - pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); - - return fOk; + return fMoreWork; } class CompareInvMempoolOrder diff --git a/src/net_processing.h b/src/net_processing.h index 230d805bd..1f33def1f 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -46,6 +46,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru * @param[in] pto The node which we are sending messages to. * @param[in] connman The connection manager for that node. * @param[in] interrupt Interrupt condition for processing threads + * @return True if there is more work to be done */ bool SendMessages(CNode* pto, CConnman& connman, std::atomic& interrupt);