|
|
@ -841,19 +841,39 @@ void ThreadSocketHandler() |
|
|
|
{ |
|
|
|
{ |
|
|
|
if (pnode->hSocket == INVALID_SOCKET) |
|
|
|
if (pnode->hSocket == INVALID_SOCKET) |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
|
|
|
|
FD_SET(pnode->hSocket, &fdsetError); |
|
|
|
|
|
|
|
hSocketMax = max(hSocketMax, pnode->hSocket); |
|
|
|
|
|
|
|
have_fds = true; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Implement the following logic:
|
|
|
|
|
|
|
|
// * If there is data to send, select() for sending data. As this only
|
|
|
|
|
|
|
|
// happens when optimistic write failed, we choose to first drain the
|
|
|
|
|
|
|
|
// write buffer in this case before receiving more. This avoids
|
|
|
|
|
|
|
|
// needlessly queueing received data, if the remote peer is not themselves
|
|
|
|
|
|
|
|
// receiving data. This means properly utilizing TCP flow control signalling.
|
|
|
|
|
|
|
|
// * Otherwise, if there is no (complete) message in the receive buffer,
|
|
|
|
|
|
|
|
// or there is space left in the buffer, select() for receiving data.
|
|
|
|
|
|
|
|
// * (if neither of the above applies, there is certainly one message
|
|
|
|
|
|
|
|
// in the receiver buffer ready to be processed).
|
|
|
|
|
|
|
|
// Together, that means that at least one of the following is always possible,
|
|
|
|
|
|
|
|
// so we don't deadlock:
|
|
|
|
|
|
|
|
// * We send some data.
|
|
|
|
|
|
|
|
// * We wait for data to be received (and disconnect after timeout).
|
|
|
|
|
|
|
|
// * We process a message in the buffer (message handler thread).
|
|
|
|
{ |
|
|
|
{ |
|
|
|
TRY_LOCK(pnode->cs_vSend, lockSend); |
|
|
|
TRY_LOCK(pnode->cs_vSend, lockSend); |
|
|
|
if (lockSend) { |
|
|
|
if (lockSend && !pnode->vSendMsg.empty()) { |
|
|
|
// do not read, if draining write queue
|
|
|
|
FD_SET(pnode->hSocket, &fdsetSend); |
|
|
|
if (!pnode->vSendMsg.empty()) |
|
|
|
continue; |
|
|
|
FD_SET(pnode->hSocket, &fdsetSend); |
|
|
|
|
|
|
|
else |
|
|
|
|
|
|
|
FD_SET(pnode->hSocket, &fdsetRecv); |
|
|
|
|
|
|
|
FD_SET(pnode->hSocket, &fdsetError); |
|
|
|
|
|
|
|
hSocketMax = max(hSocketMax, pnode->hSocket); |
|
|
|
|
|
|
|
have_fds = true; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); |
|
|
|
|
|
|
|
if (lockRecv && ( |
|
|
|
|
|
|
|
pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() || |
|
|
|
|
|
|
|
pnode->GetTotalRecvSize() <= ReceiveFloodSize())) |
|
|
|
|
|
|
|
FD_SET(pnode->hSocket, &fdsetRecv); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -959,12 +979,7 @@ void ThreadSocketHandler() |
|
|
|
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); |
|
|
|
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); |
|
|
|
if (lockRecv) |
|
|
|
if (lockRecv) |
|
|
|
{ |
|
|
|
{ |
|
|
|
if (pnode->GetTotalRecvSize() > ReceiveFloodSize()) { |
|
|
|
{ |
|
|
|
if (!pnode->fDisconnect) |
|
|
|
|
|
|
|
printf("socket recv flood control disconnect (%u bytes)\n", pnode->GetTotalRecvSize()); |
|
|
|
|
|
|
|
pnode->CloseSocketDisconnect(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
else { |
|
|
|
|
|
|
|
// typical socket buffer is 8K-64K
|
|
|
|
// typical socket buffer is 8K-64K
|
|
|
|
char pchBuf[0x10000]; |
|
|
|
char pchBuf[0x10000]; |
|
|
|
int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); |
|
|
|
int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); |
|
|
|