mirror of
https://github.com/kvazar-network/kevacoin.git
synced 2025-01-24 05:44:30 +00:00
net: rework the way that the messagehandler sleeps
In order to sleep accurately, the message handler needs to know if _any_ node has more processing that it should do before the entire thread sleeps. Rather than returning a value that represents whether ProcessMessages encountered a message that should trigger a disconnnect, interpret the return value as whether or not that node has more work to do. Also, use a global fProcessWake value that can be set by other threads, which takes precedence (for one cycle) over the messagehandler's decision. Note that the previous behavior was to only process one message per loop (except in the case of a bad checksum or invalid header). That was changed in PR #3180. The only change here in that regard is that the current node now falls to the back of the processing queue for the bad checksum/invalid header cases.
This commit is contained in:
parent
c72cc88ed3
commit
c5a8b1b946
30
src/net.cpp
30
src/net.cpp
@ -1317,6 +1317,10 @@ void CConnman::ThreadSocketHandler()
|
|||||||
|
|
||||||
void CConnman::WakeMessageHandler()
|
void CConnman::WakeMessageHandler()
|
||||||
{
|
{
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(mutexMsgProc);
|
||||||
|
fMsgProcWake = true;
|
||||||
|
}
|
||||||
condMsgProc.notify_one();
|
condMsgProc.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1839,7 +1843,7 @@ void CConnman::ThreadMessageHandler()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool fSleep = true;
|
bool fMoreWork = false;
|
||||||
|
|
||||||
BOOST_FOREACH(CNode* pnode, vNodesCopy)
|
BOOST_FOREACH(CNode* pnode, vNodesCopy)
|
||||||
{
|
{
|
||||||
@ -1851,16 +1855,8 @@ void CConnman::ThreadMessageHandler()
|
|||||||
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
|
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
|
||||||
if (lockRecv)
|
if (lockRecv)
|
||||||
{
|
{
|
||||||
if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
|
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
|
||||||
pnode->CloseSocketDisconnect();
|
fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize());
|
||||||
|
|
||||||
if (pnode->nSendSize < GetSendBufferSize())
|
|
||||||
{
|
|
||||||
if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg.front().complete()))
|
|
||||||
{
|
|
||||||
fSleep = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (flagInterruptMsgProc)
|
if (flagInterruptMsgProc)
|
||||||
@ -1882,10 +1878,11 @@ void CConnman::ThreadMessageHandler()
|
|||||||
pnode->Release();
|
pnode->Release();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fSleep) {
|
std::unique_lock<std::mutex> lock(mutexMsgProc);
|
||||||
std::unique_lock<std::mutex> lock(mutexMsgProc);
|
if (!fMoreWork) {
|
||||||
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
|
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; });
|
||||||
}
|
}
|
||||||
|
fMsgProcWake = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2156,6 +2153,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
|
|||||||
interruptNet.reset();
|
interruptNet.reset();
|
||||||
flagInterruptMsgProc = false;
|
flagInterruptMsgProc = false;
|
||||||
|
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(mutexMsgProc);
|
||||||
|
fMsgProcWake = false;
|
||||||
|
}
|
||||||
|
|
||||||
// Send and receive from sockets, accept connections
|
// Send and receive from sockets, accept connections
|
||||||
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
|
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
|
||||||
|
|
||||||
|
@ -424,6 +424,9 @@ private:
|
|||||||
/** SipHasher seeds for deterministic randomness */
|
/** SipHasher seeds for deterministic randomness */
|
||||||
const uint64_t nSeed0, nSeed1;
|
const uint64_t nSeed0, nSeed1;
|
||||||
|
|
||||||
|
/** flag for waking the message processor. */
|
||||||
|
bool fMsgProcWake;
|
||||||
|
|
||||||
std::condition_variable condMsgProc;
|
std::condition_variable condMsgProc;
|
||||||
std::mutex mutexMsgProc;
|
std::mutex mutexMsgProc;
|
||||||
std::atomic<bool> flagInterruptMsgProc;
|
std::atomic<bool> flagInterruptMsgProc;
|
||||||
|
@ -2453,36 +2453,43 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
|
|||||||
// (4) checksum
|
// (4) checksum
|
||||||
// (x) data
|
// (x) data
|
||||||
//
|
//
|
||||||
bool fOk = true;
|
bool fMoreWork = false;
|
||||||
|
|
||||||
if (!pfrom->vRecvGetData.empty())
|
if (!pfrom->vRecvGetData.empty())
|
||||||
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
|
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
|
||||||
|
|
||||||
// this maintains the order of responses
|
if (pfrom->fDisconnect)
|
||||||
if (!pfrom->vRecvGetData.empty()) return fOk;
|
return false;
|
||||||
|
|
||||||
|
// this maintains the order of responses
|
||||||
|
if (!pfrom->vRecvGetData.empty()) return true;
|
||||||
|
|
||||||
auto it = pfrom->vRecvMsg.begin();
|
|
||||||
while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
|
|
||||||
// Don't bother if send buffer is too full to respond anyway
|
// Don't bother if send buffer is too full to respond anyway
|
||||||
if (pfrom->nSendSize >= nMaxSendBufferSize)
|
if (pfrom->nSendSize >= nMaxSendBufferSize)
|
||||||
break;
|
return false;
|
||||||
|
|
||||||
// get next message
|
auto it = pfrom->vRecvMsg.begin();
|
||||||
CNetMessage& msg = *it;
|
if (it == pfrom->vRecvMsg.end())
|
||||||
|
return false;
|
||||||
|
|
||||||
// end, if an incomplete message is found
|
// end, if an incomplete message is found
|
||||||
if (!msg.complete())
|
if (!it->complete())
|
||||||
break;
|
return false;
|
||||||
|
|
||||||
|
// get next message
|
||||||
|
CNetMessage msg = std::move(*it);
|
||||||
|
|
||||||
// at this point, any failure means we can delete the current message
|
// at this point, any failure means we can delete the current message
|
||||||
it++;
|
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin());
|
||||||
|
|
||||||
|
fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete();
|
||||||
|
|
||||||
msg.SetVersion(pfrom->GetRecvVersion());
|
msg.SetVersion(pfrom->GetRecvVersion());
|
||||||
// Scan for message start
|
// Scan for message start
|
||||||
if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) {
|
if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) {
|
||||||
LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id);
|
LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id);
|
||||||
fOk = false;
|
pfrom->fDisconnect = true;
|
||||||
break;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read header
|
// Read header
|
||||||
@ -2490,7 +2497,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
|
|||||||
if (!hdr.IsValid(chainparams.MessageStart()))
|
if (!hdr.IsValid(chainparams.MessageStart()))
|
||||||
{
|
{
|
||||||
LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id);
|
LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id);
|
||||||
continue;
|
return fMoreWork;
|
||||||
}
|
}
|
||||||
string strCommand = hdr.GetCommand();
|
string strCommand = hdr.GetCommand();
|
||||||
|
|
||||||
@ -2506,7 +2513,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
|
|||||||
SanitizeString(strCommand), nMessageSize,
|
SanitizeString(strCommand), nMessageSize,
|
||||||
HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE),
|
HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE),
|
||||||
HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE));
|
HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE));
|
||||||
continue;
|
return fMoreWork;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process message
|
// Process message
|
||||||
@ -2515,7 +2522,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
|
|||||||
{
|
{
|
||||||
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc);
|
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc);
|
||||||
if (interruptMsgProc)
|
if (interruptMsgProc)
|
||||||
return true;
|
return false;
|
||||||
|
if (!pfrom->vRecvGetData.empty())
|
||||||
|
fMoreWork = true;
|
||||||
}
|
}
|
||||||
catch (const std::ios_base::failure& e)
|
catch (const std::ios_base::failure& e)
|
||||||
{
|
{
|
||||||
@ -2549,14 +2558,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
|
|||||||
if (!fRet)
|
if (!fRet)
|
||||||
LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id);
|
LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id);
|
||||||
|
|
||||||
break;
|
return fMoreWork;
|
||||||
}
|
|
||||||
|
|
||||||
// In case the connection got shut down, its receive buffer was wiped
|
|
||||||
if (!pfrom->fDisconnect)
|
|
||||||
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
|
|
||||||
|
|
||||||
return fOk;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class CompareInvMempoolOrder
|
class CompareInvMempoolOrder
|
||||||
|
@ -46,6 +46,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
|
|||||||
* @param[in] pto The node which we are sending messages to.
|
* @param[in] pto The node which we are sending messages to.
|
||||||
* @param[in] connman The connection manager for that node.
|
* @param[in] connman The connection manager for that node.
|
||||||
* @param[in] interrupt Interrupt condition for processing threads
|
* @param[in] interrupt Interrupt condition for processing threads
|
||||||
|
* @return True if there is more work to be done
|
||||||
*/
|
*/
|
||||||
bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interrupt);
|
bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interrupt);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user