From 53ad9a133a53feb35e31698720cec69c14f56dc1 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:05 -0500 Subject: [PATCH 01/16] net: fix typo causing the wrong receive buffer size Surprisingly this hasn't been causing me any issues while testing, probably because it requires lots of large blocks to be flying around. Send/Recv corks need tests! --- src/net.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net.cpp b/src/net.cpp index bf2beb774..e7c7cf011 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2102,7 +2102,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c nMaxFeeler = connOptions.nMaxFeeler; nSendBufferMaxSize = connOptions.nSendBufferMaxSize; - nReceiveFloodSize = connOptions.nSendBufferMaxSize; + nReceiveFloodSize = connOptions.nReceiveFloodSize; nMaxOutboundLimit = connOptions.nMaxOutboundLimit; nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe; From e5bcd9c84fd3107321ff6dbdef067ba03f2b43cb Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:07 -0500 Subject: [PATCH 02/16] net: make vRecvMsg a list so that we can use splice() --- src/net.cpp | 2 +- src/net.h | 2 +- src/net_processing.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index e7c7cf011..3478d04a7 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1859,7 +1859,7 @@ void CConnman::ThreadMessageHandler() if (pnode->nSendSize < GetSendBufferSize()) { - if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete())) + if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg.front().complete())) { fSleep = false; } diff --git a/src/net.h b/src/net.h index 6ca402f71..f346b0735 100644 --- a/src/net.h +++ b/src/net.h @@ -605,7 +605,7 @@ public: CCriticalSection cs_vSend; std::deque vRecvGetData; - std::deque vRecvMsg; + std::list vRecvMsg; CCriticalSection cs_vRecvMsg; uint64_t nRecvBytes; int nRecvVersion; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index ccfbb77fc..f53a4b263 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2471,7 +2471,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru // this maintains the order of responses if (!pfrom->vRecvGetData.empty()) return fOk; - std::deque::iterator it = pfrom->vRecvMsg.begin(); + auto it = pfrom->vRecvMsg.begin(); while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway if (pfrom->nSendSize >= nMaxSendBufferSize) From 5b4a8ac6d6b4d8e17b448f2b4927025fb352f7af Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:09 -0500 Subject: [PATCH 03/16] net: make GetReceiveFloodSize public This will be needed so that the message processor can cork incoming messages --- src/net.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/net.h b/src/net.h index f346b0735..a11165c7e 100644 --- a/src/net.h +++ b/src/net.h @@ -324,6 +324,7 @@ public: /** Get a unique deterministic randomizer. */ CSipHasher GetDeterministicRandomizer(uint64_t id); + unsigned int GetReceiveFloodSize() const; private: struct ListenSocket { SOCKET socket; @@ -365,8 +366,6 @@ private: void DumpData(); void DumpBanlist(); - unsigned int GetReceiveFloodSize() const; - // Network stats void RecordBytesRecv(uint64_t bytes); void RecordBytesSent(uint64_t bytes); From f6315e07f9383f3f43e37ada0d6a835810d610b9 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:11 -0500 Subject: [PATCH 04/16] net: only disconnect if fDisconnect has been set These conditions are problematic to check without locking, and we shouldn't be relying on the refcount to disconnect. --- src/net.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 3478d04a7..65f1c62ad 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1051,8 +1051,7 @@ void CConnman::ThreadSocketHandler() std::vector vNodesCopy = vNodes; BOOST_FOREACH(CNode* pnode, vNodesCopy) { - if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0)) + if (pnode->fDisconnect) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); From 60425870d78cf2bef1fce926ad53f51166c6a3f0 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:13 -0500 Subject: [PATCH 05/16] net: wait until the node is destroyed to delete its recv buffer when vRecvMsg becomes a private buffer, it won't make sense to allow other threads to mess with it anymore. --- src/net.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 65f1c62ad..9ce7475bc 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -437,11 +437,6 @@ void CNode::CloseSocketDisconnect() LogPrint("net", "disconnecting peer=%d\n", id); CloseSocket(hSocket); } - - // in case this fails, we'll empty the recv buffer when the CNode is deleted - TRY_LOCK(cs_vRecvMsg, lockRecv); - if (lockRecv) - vRecvMsg.clear(); } void CConnman::ClearBanned() From 0e973d970a2114c11f4a95f09721d977da7f0a94 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Wed, 4 Jan 2017 09:32:29 -0500 Subject: [PATCH 06/16] net: remove redundant max sendbuffer size check This is left-over from before there was proper accounting. Hitting 2x the sendbuffer size should not be possible. --- src/net_processing.cpp | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index f53a4b263..3d38995c5 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -1059,8 +1059,6 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic& interruptMsgProc) { - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); - LogPrint("net", "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->id); if (IsArgSet("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 0)) == 0) { @@ -1413,11 +1411,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, // Track requests for our stuff GetMainSignals().Inventory(inv.hash); - - if (pfrom->nSendSize > (nMaxSendBufferSize * 2)) { - Misbehaving(pfrom->GetId(), 50); - return error("send buffer size() = %u", pfrom->nSendSize); - } } if (!vToFetch.empty()) From 56212e20acf1534d443cb910c9bf3a30f84d0f02 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:15 -0500 Subject: [PATCH 07/16] net: set message deserialization version when it's actually time to deserialize We'll soon no longer have access to vRecvMsg, and this is more intuitive anyway. --- src/net.cpp | 2 +- src/net.h | 9 +++++---- src/net_processing.cpp | 1 + 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 9ce7475bc..6d9971a0f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -653,7 +653,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete // get current incomplete message, or create a new one if (vRecvMsg.empty() || vRecvMsg.back().complete()) - vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion)); + vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION)); CNetMessage& msg = vRecvMsg.back(); diff --git a/src/net.h b/src/net.h index a11165c7e..6c12171bf 100644 --- a/src/net.h +++ b/src/net.h @@ -607,7 +607,7 @@ public: std::list vRecvMsg; CCriticalSection cs_vRecvMsg; uint64_t nRecvBytes; - int nRecvVersion; + std::atomic nRecvVersion; int64_t nLastSend; int64_t nLastRecv; @@ -747,12 +747,13 @@ public: // requires LOCK(cs_vRecvMsg) bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete); - // requires LOCK(cs_vRecvMsg) void SetRecvVersion(int nVersionIn) { nRecvVersion = nVersionIn; - BOOST_FOREACH(CNetMessage &msg, vRecvMsg) - msg.SetVersion(nVersionIn); + } + int GetRecvVersion() + { + return nRecvVersion; } void SetSendVersion(int nVersionIn) { diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 3d38995c5..556975d14 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2485,6 +2485,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru // at this point, any failure means we can delete the current message it++; + msg.SetVersion(pfrom->GetRecvVersion()); // Scan for message start if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) { LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id); From 60befa3997b98d3f913010f2621bec734f643526 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:17 -0500 Subject: [PATCH 08/16] net: handle message accounting in ReceiveMsgBytes This allows locking to be pushed down to only where it's needed Also reuse the current time rather than checking multiple times. --- src/net.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 6d9971a0f..776047171 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -648,6 +648,9 @@ void CNode::copyStats(CNodeStats &stats) bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete) { complete = false; + int64_t nTimeMicros = GetTimeMicros(); + nLastRecv = nTimeMicros / 1000000; + nRecvBytes += nBytes; while (nBytes > 0) { // get current incomplete message, or create a new one @@ -685,7 +688,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete assert(i != mapRecvBytesPerMsgCmd.end()); i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE; - msg.nTime = GetTimeMicros(); + msg.nTime = nTimeMicros; complete = true; } } @@ -1237,8 +1240,6 @@ void CConnman::ThreadSocketHandler() pnode->CloseSocketDisconnect(); if(notify) condMsgProc.notify_one(); - pnode->nLastRecv = GetTime(); - pnode->nRecvBytes += nBytes; RecordBytesRecv(nBytes); } else if (nBytes == 0) From f5c36d19b636f01cc24417bc2b2f5b226ff3dd2c Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:19 -0500 Subject: [PATCH 09/16] net: record bytes written before notifying the message processor --- src/net.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net.cpp b/src/net.cpp index 776047171..312a6e094 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1238,9 +1238,9 @@ void CConnman::ThreadSocketHandler() bool notify = false; if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); + RecordBytesRecv(nBytes); if(notify) condMsgProc.notify_one(); - RecordBytesRecv(nBytes); } else if (nBytes == 0) { From ef7b5ecbb73e9fd3670494c99cfc13ccf3574170 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:21 -0500 Subject: [PATCH 10/16] net: Add a simple function for waking the message handler This may be used publicly in the future --- src/net.cpp | 10 ++++++---- src/net.h | 2 ++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 312a6e094..36db77abb 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1239,8 +1239,8 @@ void CConnman::ThreadSocketHandler() if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); RecordBytesRecv(nBytes); - if(notify) - condMsgProc.notify_one(); + if (notify) + WakeMessageHandler(); } else if (nBytes == 0) { @@ -1315,8 +1315,10 @@ void CConnman::ThreadSocketHandler() } } - - +void CConnman::WakeMessageHandler() +{ + condMsgProc.notify_one(); +} diff --git a/src/net.h b/src/net.h index 6c12171bf..cc79cce85 100644 --- a/src/net.h +++ b/src/net.h @@ -341,6 +341,8 @@ private: void ThreadSocketHandler(); void ThreadDNSAddressSeed(); + void WakeMessageHandler(); + uint64_t CalculateKeyedNetGroup(const CAddress& ad); CNode* FindNode(const CNetAddr& ip); From c72cc88ed39652e44b5be48e9455f6f395bd7e83 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:23 -0500 Subject: [PATCH 11/16] net: remove useless comments --- src/net_processing.cpp | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 556975d14..43a97ff71 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2445,9 +2445,6 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru { const CChainParams& chainparams = Params(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); - //if (fDebug) - // LogPrintf("%s(%u messages)\n", __func__, pfrom->vRecvMsg.size()); - // // Message format // (4) message start @@ -2473,11 +2470,6 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru // get next message CNetMessage& msg = *it; - //if (fDebug) - // LogPrintf("%s(message %u msgsz, %u bytes, complete:%s)\n", __func__, - // msg.hdr.nMessageSize, msg.vRecv.size(), - // msg.complete() ? "Y" : "N"); - // end, if an incomplete message is found if (!msg.complete()) break; From c5a8b1b946b1ab0bb82bd4270b2a40f5731abcff Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:26 -0500 Subject: [PATCH 12/16] net: rework the way that the messagehandler sleeps In order to sleep accurately, the message handler needs to know if _any_ node has more processing that it should do before the entire thread sleeps. Rather than returning a value that represents whether ProcessMessages encountered a message that should trigger a disconnnect, interpret the return value as whether or not that node has more work to do. Also, use a global fProcessWake value that can be set by other threads, which takes precedence (for one cycle) over the messagehandler's decision. Note that the previous behavior was to only process one message per loop (except in the case of a bad checksum or invalid header). That was changed in PR #3180. The only change here in that regard is that the current node now falls to the back of the processing queue for the bad checksum/invalid header cases. --- src/net.cpp | 30 ++++++++++++++------------ src/net.h | 3 +++ src/net_processing.cpp | 48 ++++++++++++++++++++++-------------------- src/net_processing.h | 1 + 4 files changed, 45 insertions(+), 37 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 36db77abb..947f01679 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1317,6 +1317,10 @@ void CConnman::ThreadSocketHandler() void CConnman::WakeMessageHandler() { + { + std::lock_guard lock(mutexMsgProc); + fMsgProcWake = true; + } condMsgProc.notify_one(); } @@ -1839,7 +1843,7 @@ void CConnman::ThreadMessageHandler() } } - bool fSleep = true; + bool fMoreWork = false; BOOST_FOREACH(CNode* pnode, vNodesCopy) { @@ -1851,16 +1855,8 @@ void CConnman::ThreadMessageHandler() TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { - if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc)) - pnode->CloseSocketDisconnect(); - - if (pnode->nSendSize < GetSendBufferSize()) - { - if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg.front().complete())) - { - fSleep = false; - } - } + bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); + fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize()); } } if (flagInterruptMsgProc) @@ -1882,10 +1878,11 @@ void CConnman::ThreadMessageHandler() pnode->Release(); } - if (fSleep) { - std::unique_lock lock(mutexMsgProc); - condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100)); + std::unique_lock lock(mutexMsgProc); + if (!fMoreWork) { + condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; }); } + fMsgProcWake = false; } } @@ -2156,6 +2153,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c interruptNet.reset(); flagInterruptMsgProc = false; + { + std::unique_lock lock(mutexMsgProc); + fMsgProcWake = false; + } + // Send and receive from sockets, accept connections threadSocketHandler = std::thread(&TraceThread >, "net", std::function(std::bind(&CConnman::ThreadSocketHandler, this))); diff --git a/src/net.h b/src/net.h index cc79cce85..4fc41bdda 100644 --- a/src/net.h +++ b/src/net.h @@ -424,6 +424,9 @@ private: /** SipHasher seeds for deterministic randomness */ const uint64_t nSeed0, nSeed1; + /** flag for waking the message processor. */ + bool fMsgProcWake; + std::condition_variable condMsgProc; std::mutex mutexMsgProc; std::atomic flagInterruptMsgProc; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 43a97ff71..605e142e8 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2453,36 +2453,43 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru // (4) checksum // (x) data // - bool fOk = true; + bool fMoreWork = false; if (!pfrom->vRecvGetData.empty()) ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); + if (pfrom->fDisconnect) + return false; + // this maintains the order of responses - if (!pfrom->vRecvGetData.empty()) return fOk; + if (!pfrom->vRecvGetData.empty()) return true; - auto it = pfrom->vRecvMsg.begin(); - while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway if (pfrom->nSendSize >= nMaxSendBufferSize) - break; + return false; - // get next message - CNetMessage& msg = *it; + auto it = pfrom->vRecvMsg.begin(); + if (it == pfrom->vRecvMsg.end()) + return false; // end, if an incomplete message is found - if (!msg.complete()) - break; + if (!it->complete()) + return false; + + // get next message + CNetMessage msg = std::move(*it); // at this point, any failure means we can delete the current message - it++; + pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin()); + + fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete(); msg.SetVersion(pfrom->GetRecvVersion()); // Scan for message start if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) { LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id); - fOk = false; - break; + pfrom->fDisconnect = true; + return false; } // Read header @@ -2490,7 +2497,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru if (!hdr.IsValid(chainparams.MessageStart())) { LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id); - continue; + return fMoreWork; } string strCommand = hdr.GetCommand(); @@ -2506,7 +2513,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru SanitizeString(strCommand), nMessageSize, HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE), HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE)); - continue; + return fMoreWork; } // Process message @@ -2515,7 +2522,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru { fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc); if (interruptMsgProc) - return true; + return false; + if (!pfrom->vRecvGetData.empty()) + fMoreWork = true; } catch (const std::ios_base::failure& e) { @@ -2549,14 +2558,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru if (!fRet) LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id); - break; - } - - // In case the connection got shut down, its receive buffer was wiped - if (!pfrom->fDisconnect) - pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); - - return fOk; + return fMoreWork; } class CompareInvMempoolOrder diff --git a/src/net_processing.h b/src/net_processing.h index 230d805bd..1f33def1f 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -46,6 +46,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru * @param[in] pto The node which we are sending messages to. * @param[in] connman The connection manager for that node. * @param[in] interrupt Interrupt condition for processing threads + * @return True if there is more work to be done */ bool SendMessages(CNode* pto, CConnman& connman, std::atomic& interrupt); From 4d712e366ca7fffaf96394ef01c9246482c0d92e Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:28 -0500 Subject: [PATCH 13/16] net: add a new message queue for the message processor This separates the storage of messages from the net and queued messages for processing, allowing the locks to be split. --- src/net.cpp | 12 +++++++++++- src/net.h | 3 +++ src/net_processing.cpp | 25 ++++++++++--------------- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 947f01679..df2109e3f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1239,8 +1239,18 @@ void CConnman::ThreadSocketHandler() if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); RecordBytesRecv(nBytes); - if (notify) + if (notify) { + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) + break; + } + { + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + } WakeMessageHandler(); + } } else if (nBytes == 0) { diff --git a/src/net.h b/src/net.h index 4fc41bdda..21864e73d 100644 --- a/src/net.h +++ b/src/net.h @@ -608,6 +608,9 @@ public: std::deque> vSendMsg; CCriticalSection cs_vSend; + CCriticalSection cs_vProcessMsg; + std::list vProcessMsg; + std::deque vRecvGetData; std::list vRecvMsg; CCriticalSection cs_vRecvMsg; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 605e142e8..9963a872e 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2468,21 +2468,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru if (pfrom->nSendSize >= nMaxSendBufferSize) return false; - auto it = pfrom->vRecvMsg.begin(); - if (it == pfrom->vRecvMsg.end()) - return false; - - // end, if an incomplete message is found - if (!it->complete()) - return false; - - // get next message - CNetMessage msg = std::move(*it); - - // at this point, any failure means we can delete the current message - pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin()); - - fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete(); + std::list msgs; + { + LOCK(pfrom->cs_vProcessMsg); + if (pfrom->vProcessMsg.empty()) + return false; + // Just take one message + msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); + fMoreWork = !pfrom->vProcessMsg.empty(); + } + CNetMessage& msg(msgs.front()); msg.SetVersion(pfrom->GetRecvVersion()); // Scan for message start From c6e8a9bcffe4c0f236e27c663f08785d1a0a783b Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:30 -0500 Subject: [PATCH 14/16] net: add a flag to indicate when a node's process queue is full Messages are dumped very quickly from the socket handler to the processor, so it's the depth of the processing queue that's interesting. The socket handler checks the process queue's size during the brief message hand-off and pauses if necessary, and the processor possibly unpauses each time a message is popped off of its queue. --- src/net.cpp | 10 +++++++--- src/net.h | 11 ++--------- src/net_processing.cpp | 2 ++ 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index df2109e3f..70c04d7a0 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1165,9 +1165,7 @@ void CConnman::ThreadSocketHandler() } { TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv && ( - pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() || - pnode->GetTotalRecvSize() <= GetReceiveFloodSize())) + if (lockRecv && !pnode->fPauseRecv) FD_SET(pnode->hSocket, &fdsetRecv); } } @@ -1240,14 +1238,18 @@ void CConnman::ThreadSocketHandler() pnode->CloseSocketDisconnect(); RecordBytesRecv(nBytes); if (notify) { + size_t nSizeAdded = 0; auto it(pnode->vRecvMsg.begin()); for (; it != pnode->vRecvMsg.end(); ++it) { if (!it->complete()) break; + nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; } { LOCK(pnode->cs_vProcessMsg); pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; } WakeMessageHandler(); } @@ -2592,6 +2594,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn minFeeFilter = 0; lastSentFeeFilter = 0; nextSendTimeFeeFilter = 0; + fPauseRecv = false; + nProcessQueueSize = 0; BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes()) mapRecvBytesPerMsgCmd[msg] = 0; diff --git a/src/net.h b/src/net.h index 21864e73d..0eb430a8b 100644 --- a/src/net.h +++ b/src/net.h @@ -610,6 +610,7 @@ public: CCriticalSection cs_vProcessMsg; std::list vProcessMsg; + size_t nProcessQueueSize; std::deque vRecvGetData; std::list vRecvMsg; @@ -650,6 +651,7 @@ public: const NodeId id; const uint64_t nKeyedNetGroup; + std::atomic_bool fPauseRecv; protected: mapMsgCmdSize mapSendBytesPerMsgCmd; @@ -743,15 +745,6 @@ public: return nRefCount; } - // requires LOCK(cs_vRecvMsg) - unsigned int GetTotalRecvSize() - { - unsigned int total = 0; - BOOST_FOREACH(const CNetMessage &msg, vRecvMsg) - total += msg.vRecv.size() + 24; - return total; - } - // requires LOCK(cs_vRecvMsg) bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 9963a872e..93b6e2ec0 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2475,6 +2475,8 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru return false; // Just take one message msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); + pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE; + pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize(); fMoreWork = !pfrom->vProcessMsg.empty(); } CNetMessage& msg(msgs.front()); From 991955ee81034dc3fbc1c2a8e60c04fc9e0b538c Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:32 -0500 Subject: [PATCH 15/16] net: add a flag to indicate when a node's send buffer is full Similar to the recv flag, but this one indicates whether or not the net's send buffer is full. The socket handler checks the send queue when a new message is added and pauses if necessary, and possibly unpauses after each message is drained from its buffer. --- src/net.cpp | 11 ++++++++--- src/net.h | 3 ++- src/net_processing.cpp | 6 ++---- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 70c04d7a0..8ae2bebd3 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -761,7 +761,7 @@ const uint256& CNetMessage::GetMessageHash() const // requires LOCK(cs_vSend) -size_t SocketSendData(CNode *pnode) +size_t CConnman::SocketSendData(CNode *pnode) { auto it = pnode->vSendMsg.begin(); size_t nSentSize = 0; @@ -778,6 +778,7 @@ size_t SocketSendData(CNode *pnode) if (pnode->nSendOffset == data.size()) { pnode->nSendOffset = 0; pnode->nSendSize -= data.size(); + pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize; it++; } else { // could not send full message; stop sending more @@ -1286,8 +1287,9 @@ void CConnman::ThreadSocketHandler() TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { size_t nBytes = SocketSendData(pnode); - if (nBytes) + if (nBytes) { RecordBytesSent(nBytes); + } } } @@ -1868,7 +1870,7 @@ void CConnman::ThreadMessageHandler() if (lockRecv) { bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); - fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize()); + fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); } } if (flagInterruptMsgProc) @@ -2595,6 +2597,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn lastSentFeeFilter = 0; nextSendTimeFeeFilter = 0; fPauseRecv = false; + fPauseSend = false; nProcessQueueSize = 0; BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes()) @@ -2675,6 +2678,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; pnode->nSendSize += nTotalSize; + if (pnode->nSendSize > nSendBufferMaxSize) + pnode->fPauseSend = true; pnode->vSendMsg.push_back(std::move(serializedHeader)); if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data)); diff --git a/src/net.h b/src/net.h index 0eb430a8b..db73be477 100644 --- a/src/net.h +++ b/src/net.h @@ -358,6 +358,7 @@ private: NodeId GetNewNodeId(); + size_t SocketSendData(CNode *pnode); //!check is the banlist has unwritten changes bool BannedSetIsDirty(); //!set the "dirty" flag for the banlist @@ -444,7 +445,6 @@ void Discover(boost::thread_group& threadGroup); void MapPort(bool fUseUPnP); unsigned short GetListenPort(); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); -size_t SocketSendData(CNode *pnode); struct CombinerAll { @@ -652,6 +652,7 @@ public: const uint64_t nKeyedNetGroup; std::atomic_bool fPauseRecv; + std::atomic_bool fPauseSend; protected: mapMsgCmdSize mapSendBytesPerMsgCmd; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 93b6e2ec0..185ab980f 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -889,14 +889,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic& interruptMsgProc) { std::deque::iterator it = pfrom->vRecvGetData.begin(); - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); vector vNotFound; CNetMsgMaker msgMaker(pfrom->GetSendVersion()); LOCK(cs_main); while (it != pfrom->vRecvGetData.end()) { // Don't bother if send buffer is too full to respond anyway - if (pfrom->nSendSize >= nMaxSendBufferSize) + if (pfrom->fPauseSend) break; const CInv &inv = *it; @@ -2444,7 +2443,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interruptMsgProc) { const CChainParams& chainparams = Params(); - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); // // Message format // (4) message start @@ -2465,7 +2463,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru if (!pfrom->vRecvGetData.empty()) return true; // Don't bother if send buffer is too full to respond anyway - if (pfrom->nSendSize >= nMaxSendBufferSize) + if (pfrom->fPauseSend) return false; std::list msgs; From e60360e139852c655930e99d4bb4db554cd8385e Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:36 -0500 Subject: [PATCH 16/16] net: remove cs_vRecvMsg vRecvMsg is now only touched by the socket handler thread. The accounting vars (nRecvBytes/nLastRecv/mapRecvBytesPerMsgCmd) are also only used by the socket handler thread, with the exception of queries from rpc/gui. These accesses are not threadsafe, but they never were. This needs to be addressed separately. Also, update comment describing data flow --- src/net.cpp | 33 +++++++-------------------------- src/net.h | 4 +--- src/net_processing.cpp | 1 - 3 files changed, 8 insertions(+), 30 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 8ae2bebd3..11638fcc0 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -644,7 +644,6 @@ void CNode::copyStats(CNodeStats &stats) } #undef X -// requires LOCK(cs_vRecvMsg) bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete) { complete = false; @@ -1080,13 +1079,9 @@ void CConnman::ThreadSocketHandler() TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - { TRY_LOCK(pnode->cs_inventory, lockInv); if (lockInv) fDelete = true; - } } } if (fDelete) @@ -1146,15 +1141,10 @@ void CConnman::ThreadSocketHandler() // write buffer in this case before receiving more. This avoids // needlessly queueing received data, if the remote peer is not themselves // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is no (complete) message in the receive buffer, - // or there is space left in the buffer, select() for receiving data. - // * (if neither of the above applies, there is certainly one message - // in the receiver buffer ready to be processed). - // Together, that means that at least one of the following is always possible, - // so we don't deadlock: - // * We send some data. - // * We wait for data to be received (and disconnect after timeout). - // * We process a message in the buffer (message handler thread). + // * Otherwise, if there is space left in the receive buffer, select() for + // receiving data. + // * Hand off all complete messages to the processor, to be handled without + // blocking here. { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { @@ -1165,8 +1155,7 @@ void CConnman::ThreadSocketHandler() } } { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv && !pnode->fPauseRecv) + if (!pnode->fPauseRecv) FD_SET(pnode->hSocket, &fdsetRecv); } } @@ -1225,8 +1214,6 @@ void CConnman::ThreadSocketHandler() continue; if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) { { // typical socket buffer is 8K-64K @@ -1865,14 +1852,8 @@ void CConnman::ThreadMessageHandler() continue; // Receive messages - { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - { - bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); - fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); - } - } + bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); + fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); if (flagInterruptMsgProc) return; diff --git a/src/net.h b/src/net.h index db73be477..982a08f03 100644 --- a/src/net.h +++ b/src/net.h @@ -613,8 +613,6 @@ public: size_t nProcessQueueSize; std::deque vRecvGetData; - std::list vRecvMsg; - CCriticalSection cs_vRecvMsg; uint64_t nRecvBytes; std::atomic nRecvVersion; @@ -726,6 +724,7 @@ private: const ServiceFlags nLocalServices; const int nMyStartingHeight; int nSendVersion; + std::list vRecvMsg; // Used only by SocketHandler thread public: NodeId GetId() const { @@ -746,7 +745,6 @@ public: return nRefCount; } - // requires LOCK(cs_vRecvMsg) bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete); void SetRecvVersion(int nVersionIn) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 185ab980f..32a5862f2 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2439,7 +2439,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, return true; } -// requires LOCK(cs_vRecvMsg) bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interruptMsgProc) { const CChainParams& chainparams = Params();