Browse Source

net: add a new message queue for the message processor

This separates the storage of messages from the net and queued messages for
processing, allowing the locks to be split.
0.14
Cory Fields 8 years ago
parent
commit
4d712e366c
  1. 12
      src/net.cpp
  2. 3
      src/net.h
  3. 23
      src/net_processing.cpp

12
src/net.cpp

@ -1239,9 +1239,19 @@ void CConnman::ThreadSocketHandler()
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect(); pnode->CloseSocketDisconnect();
RecordBytesRecv(nBytes); RecordBytesRecv(nBytes);
if (notify) if (notify) {
auto it(pnode->vRecvMsg.begin());
for (; it != pnode->vRecvMsg.end(); ++it) {
if (!it->complete())
break;
}
{
LOCK(pnode->cs_vProcessMsg);
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
}
WakeMessageHandler(); WakeMessageHandler();
} }
}
else if (nBytes == 0) else if (nBytes == 0)
{ {
// socket closed gracefully // socket closed gracefully

3
src/net.h

@ -608,6 +608,9 @@ public:
std::deque<std::vector<unsigned char>> vSendMsg; std::deque<std::vector<unsigned char>> vSendMsg;
CCriticalSection cs_vSend; CCriticalSection cs_vSend;
CCriticalSection cs_vProcessMsg;
std::list<CNetMessage> vProcessMsg;
std::deque<CInv> vRecvGetData; std::deque<CInv> vRecvGetData;
std::list<CNetMessage> vRecvMsg; std::list<CNetMessage> vRecvMsg;
CCriticalSection cs_vRecvMsg; CCriticalSection cs_vRecvMsg;

23
src/net_processing.cpp

@ -2468,21 +2468,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (pfrom->nSendSize >= nMaxSendBufferSize) if (pfrom->nSendSize >= nMaxSendBufferSize)
return false; return false;
auto it = pfrom->vRecvMsg.begin(); std::list<CNetMessage> msgs;
if (it == pfrom->vRecvMsg.end()) {
return false; LOCK(pfrom->cs_vProcessMsg);
if (pfrom->vProcessMsg.empty())
// end, if an incomplete message is found
if (!it->complete())
return false; return false;
// Just take one message
// get next message msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
CNetMessage msg = std::move(*it); fMoreWork = !pfrom->vProcessMsg.empty();
}
// at this point, any failure means we can delete the current message CNetMessage& msg(msgs.front());
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin());
fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete();
msg.SetVersion(pfrom->GetRecvVersion()); msg.SetVersion(pfrom->GetRecvVersion());
// Scan for message start // Scan for message start

Loading…
Cancel
Save