Browse Source

net: add a new message queue for the message processor

This separates the storage of messages from the net and queued messages for
processing, allowing the locks to be split.
0.14
Cory Fields 8 years ago
parent
commit
4d712e366c
  1. 12
      src/net.cpp
  2. 3
      src/net.h
  3. 25
      src/net_processing.cpp

12
src/net.cpp

@ -1239,8 +1239,18 @@ void CConnman::ThreadSocketHandler() @@ -1239,8 +1239,18 @@ void CConnman::ThreadSocketHandler()
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
RecordBytesRecv(nBytes);
if (notify)
if (notify) {
auto it(pnode->vRecvMsg.begin());
for (; it != pnode->vRecvMsg.end(); ++it) {
if (!it->complete())
break;
}
{
LOCK(pnode->cs_vProcessMsg);
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
}
WakeMessageHandler();
}
}
else if (nBytes == 0)
{

3
src/net.h

@ -608,6 +608,9 @@ public: @@ -608,6 +608,9 @@ public:
std::deque<std::vector<unsigned char>> vSendMsg;
CCriticalSection cs_vSend;
CCriticalSection cs_vProcessMsg;
std::list<CNetMessage> vProcessMsg;
std::deque<CInv> vRecvGetData;
std::list<CNetMessage> vRecvMsg;
CCriticalSection cs_vRecvMsg;

25
src/net_processing.cpp

@ -2468,21 +2468,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru @@ -2468,21 +2468,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (pfrom->nSendSize >= nMaxSendBufferSize)
return false;
auto it = pfrom->vRecvMsg.begin();
if (it == pfrom->vRecvMsg.end())
return false;
// end, if an incomplete message is found
if (!it->complete())
return false;
// get next message
CNetMessage msg = std::move(*it);
// at this point, any failure means we can delete the current message
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin());
fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete();
std::list<CNetMessage> msgs;
{
LOCK(pfrom->cs_vProcessMsg);
if (pfrom->vProcessMsg.empty())
return false;
// Just take one message
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
fMoreWork = !pfrom->vProcessMsg.empty();
}
CNetMessage& msg(msgs.front());
msg.SetVersion(pfrom->GetRecvVersion());
// Scan for message start

Loading…
Cancel
Save