Browse Source

net: remove cs_vRecvMsg

vRecvMsg is now only touched by the socket handler thread.

The accounting vars (nRecvBytes/nLastRecv/mapRecvBytesPerMsgCmd) are also
only used by the socket handler thread, with the exception of queries from
rpc/gui. These accesses are not threadsafe, but they never were. This needs to
be addressed separately.

Also, update comment describing data flow
0.14
Cory Fields 8 years ago
parent
commit
e60360e139
  1. 33
      src/net.cpp
  2. 4
      src/net.h
  3. 1
      src/net_processing.cpp

33
src/net.cpp

@ -644,7 +644,6 @@ void CNode::copyStats(CNodeStats &stats)
} }
#undef X #undef X
// requires LOCK(cs_vRecvMsg)
bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete) bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
{ {
complete = false; complete = false;
@ -1080,13 +1079,9 @@ void CConnman::ThreadSocketHandler()
TRY_LOCK(pnode->cs_vSend, lockSend); TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) if (lockSend)
{ {
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
TRY_LOCK(pnode->cs_inventory, lockInv); TRY_LOCK(pnode->cs_inventory, lockInv);
if (lockInv) if (lockInv)
fDelete = true; fDelete = true;
}
} }
} }
if (fDelete) if (fDelete)
@ -1146,15 +1141,10 @@ void CConnman::ThreadSocketHandler()
// write buffer in this case before receiving more. This avoids // write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves // needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling. // receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is no (complete) message in the receive buffer, // * Otherwise, if there is space left in the receive buffer, select() for
// or there is space left in the buffer, select() for receiving data. // receiving data.
// * (if neither of the above applies, there is certainly one message // * Hand off all complete messages to the processor, to be handled without
// in the receiver buffer ready to be processed). // blocking here.
// Together, that means that at least one of the following is always possible,
// so we don't deadlock:
// * We send some data.
// * We wait for data to be received (and disconnect after timeout).
// * We process a message in the buffer (message handler thread).
{ {
TRY_LOCK(pnode->cs_vSend, lockSend); TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) { if (lockSend) {
@ -1165,8 +1155,7 @@ void CConnman::ThreadSocketHandler()
} }
} }
{ {
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (!pnode->fPauseRecv)
if (lockRecv && !pnode->fPauseRecv)
FD_SET(pnode->hSocket, &fdsetRecv); FD_SET(pnode->hSocket, &fdsetRecv);
} }
} }
@ -1225,8 +1214,6 @@ void CConnman::ThreadSocketHandler()
continue; continue;
if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
{ {
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{ {
{ {
// typical socket buffer is 8K-64K // typical socket buffer is 8K-64K
@ -1865,14 +1852,8 @@ void CConnman::ThreadMessageHandler()
continue; continue;
// Receive messages // Receive messages
{ bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (lockRecv)
{
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
}
}
if (flagInterruptMsgProc) if (flagInterruptMsgProc)
return; return;

4
src/net.h

@ -613,8 +613,6 @@ public:
size_t nProcessQueueSize; size_t nProcessQueueSize;
std::deque<CInv> vRecvGetData; std::deque<CInv> vRecvGetData;
std::list<CNetMessage> vRecvMsg;
CCriticalSection cs_vRecvMsg;
uint64_t nRecvBytes; uint64_t nRecvBytes;
std::atomic<int> nRecvVersion; std::atomic<int> nRecvVersion;
@ -726,6 +724,7 @@ private:
const ServiceFlags nLocalServices; const ServiceFlags nLocalServices;
const int nMyStartingHeight; const int nMyStartingHeight;
int nSendVersion; int nSendVersion;
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
public: public:
NodeId GetId() const { NodeId GetId() const {
@ -746,7 +745,6 @@ public:
return nRefCount; return nRefCount;
} }
// requires LOCK(cs_vRecvMsg)
bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete); bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete);
void SetRecvVersion(int nVersionIn) void SetRecvVersion(int nVersionIn)

1
src/net_processing.cpp

@ -2439,7 +2439,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
return true; return true;
} }
// requires LOCK(cs_vRecvMsg)
bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc) bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{ {
const CChainParams& chainparams = Params(); const CChainParams& chainparams = Params();

Loading…
Cancel
Save