Browse Source

Use per-message send buffer, rather than per connection

0.8
Pieter Wuille 12 years ago committed by Pieter Wuille
parent
commit
41b052ad87
  1. 13
      src/main.cpp
  2. 59
      src/net.cpp
  3. 65
      src/net.h
  4. 3
      src/protocol.h
  5. 8
      src/serialize.h

13
src/main.cpp

@ -3104,7 +3104,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
// Change version // Change version
pfrom->PushMessage("verack"); pfrom->PushMessage("verack");
pfrom->vSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); pfrom->ssSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
if (!pfrom->fInbound) if (!pfrom->fInbound)
{ {
@ -3722,9 +3722,9 @@ bool ProcessMessages(CNode* pfrom)
bool fOk = true; bool fOk = true;
std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin(); std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
while (it != pfrom->vRecvMsg.end()) { while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
// Don't bother if send buffer is too full to respond anyway // Don't bother if send buffer is too full to respond anyway
if (pfrom->vSend.size() >= SendBufferSize()) if (pfrom->nSendSize >= SendBufferSize())
break; break;
// get next message // get next message
@ -3811,7 +3811,10 @@ bool ProcessMessages(CNode* pfrom)
printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize); printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize);
} }
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); // In case the connection got shut down, its receive buffer was wiped
if (!pfrom->fDisconnect)
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
return fOk; return fOk;
} }
@ -3826,7 +3829,7 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
// Keep-alive ping. We send a nonce of zero because we don't use it anywhere // Keep-alive ping. We send a nonce of zero because we don't use it anywhere
// right now. // right now.
if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSend.empty()) { if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSendMsg.empty()) {
uint64 nonce = 0; uint64 nonce = 0;
if (pto->nVersion > BIP0031_VERSION) if (pto->nVersion > BIP0031_VERSION)
pto->PushMessage("ping", nonce); pto->PushMessage("ping", nonce);

59
src/net.cpp

@ -715,26 +715,43 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes)
// requires LOCK(cs_vSend) // requires LOCK(cs_vSend)
void SocketSendData(CNode *pnode) void SocketSendData(CNode *pnode)
{ {
CDataStream& vSend = pnode->vSend; std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin();
if (vSend.empty())
return; while (it != pnode->vSendMsg.end()) {
const CSerializeData &data = *it;
int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT); assert(data.size() > pnode->nSendOffset);
if (nBytes > 0) int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
{ if (nBytes > 0) {
vSend.erase(vSend.begin(), vSend.begin() + nBytes); pnode->nLastSend = GetTime();
pnode->nLastSend = GetTime(); pnode->nSendOffset += nBytes;
} if (pnode->nSendOffset == data.size()) {
else if (nBytes < 0) pnode->nSendOffset = 0;
{ pnode->nSendSize -= data.size();
// error it++;
int nErr = WSAGetLastError(); } else {
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) // could not send full message; stop sending more
{ break;
printf("socket send error %d\n", nErr); }
pnode->CloseSocketDisconnect(); } else {
if (nBytes < 0) {
// error
int nErr = WSAGetLastError();
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
{
printf("socket send error %d\n", nErr);
pnode->CloseSocketDisconnect();
}
}
// couldn't send anything at all
break;
} }
} }
if (it == pnode->vSendMsg.end()) {
assert(pnode->nSendOffset == 0);
assert(pnode->nSendSize == 0);
}
pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it);
} }
void ThreadSocketHandler(void* parg) void ThreadSocketHandler(void* parg)
@ -776,7 +793,7 @@ void ThreadSocketHandler2(void* parg)
BOOST_FOREACH(CNode* pnode, vNodesCopy) BOOST_FOREACH(CNode* pnode, vNodesCopy)
{ {
if (pnode->fDisconnect || if (pnode->fDisconnect ||
(pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->vSend.empty())) (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty()))
{ {
// remove from vNodes // remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
@ -863,7 +880,7 @@ void ThreadSocketHandler2(void* parg)
TRY_LOCK(pnode->cs_vSend, lockSend); TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) { if (lockSend) {
// do not read, if draining write queue // do not read, if draining write queue
if (!pnode->vSend.empty()) if (!pnode->vSendMsg.empty())
FD_SET(pnode->hSocket, &fdsetSend); FD_SET(pnode->hSocket, &fdsetSend);
else else
FD_SET(pnode->hSocket, &fdsetRecv); FD_SET(pnode->hSocket, &fdsetRecv);
@ -1032,7 +1049,7 @@ void ThreadSocketHandler2(void* parg)
// //
// Inactivity checking // Inactivity checking
// //
if (pnode->vSend.empty()) if (pnode->vSendMsg.empty())
pnode->nLastSendEmpty = GetTime(); pnode->nLastSendEmpty = GetTime();
if (GetTime() - pnode->nTimeConnected > 60) if (GetTime() - pnode->nTimeConnected > 60)
{ {

65
src/net.h

@ -173,7 +173,10 @@ public:
// socket // socket
uint64 nServices; uint64 nServices;
SOCKET hSocket; SOCKET hSocket;
CDataStream vSend; CDataStream ssSend;
size_t nSendSize; // total size of all vSendMsg entries
size_t nSendOffset; // offset inside the first vSendMsg already sent
std::deque<CSerializeData> vSendMsg;
CCriticalSection cs_vSend; CCriticalSection cs_vSend;
std::deque<CNetMessage> vRecvMsg; std::deque<CNetMessage> vRecvMsg;
@ -184,8 +187,6 @@ public:
int64 nLastRecv; int64 nLastRecv;
int64 nLastSendEmpty; int64 nLastSendEmpty;
int64 nTimeConnected; int64 nTimeConnected;
int nHeaderStart;
unsigned int nMessageStart;
CAddress addr; CAddress addr;
std::string addrName; std::string addrName;
CService addrLocal; CService addrLocal;
@ -233,7 +234,7 @@ public:
CCriticalSection cs_inventory; CCriticalSection cs_inventory;
std::multimap<int64, CInv> mapAskFor; std::multimap<int64, CInv> mapAskFor;
CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : vSend(SER_NETWORK, MIN_PROTO_VERSION) CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : ssSend(SER_NETWORK, MIN_PROTO_VERSION)
{ {
nServices = 0; nServices = 0;
hSocket = hSocketIn; hSocket = hSocketIn;
@ -242,8 +243,6 @@ public:
nLastRecv = 0; nLastRecv = 0;
nLastSendEmpty = GetTime(); nLastSendEmpty = GetTime();
nTimeConnected = GetTime(); nTimeConnected = GetTime();
nHeaderStart = -1;
nMessageStart = -1;
addr = addrIn; addr = addrIn;
addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn; addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn;
nVersion = 0; nVersion = 0;
@ -256,6 +255,8 @@ public:
fDisconnect = false; fDisconnect = false;
nRefCount = 0; nRefCount = 0;
nReleaseTime = 0; nReleaseTime = 0;
nSendSize = 0;
nSendOffset = 0;
hashContinue = 0; hashContinue = 0;
pindexLastGetBlocksBegin = 0; pindexLastGetBlocksBegin = 0;
hashLastGetBlocksEnd = 0; hashLastGetBlocksEnd = 0;
@ -387,11 +388,8 @@ public:
void BeginMessage(const char* pszCommand) EXCLUSIVE_LOCK_FUNCTION(cs_vSend) void BeginMessage(const char* pszCommand) EXCLUSIVE_LOCK_FUNCTION(cs_vSend)
{ {
ENTER_CRITICAL_SECTION(cs_vSend); ENTER_CRITICAL_SECTION(cs_vSend);
if (nHeaderStart != -1) assert(ssSend.size() == 0);
AbortMessage(); ssSend << CMessageHeader(pszCommand, 0);
nHeaderStart = vSend.size();
vSend << CMessageHeader(pszCommand, 0);
nMessageStart = vSend.size();
if (fDebug) if (fDebug)
printf("sending: %s ", pszCommand); printf("sending: %s ", pszCommand);
} }
@ -399,11 +397,8 @@ public:
// TODO: Document the precondition of this function. Is cs_vSend locked? // TODO: Document the precondition of this function. Is cs_vSend locked?
void AbortMessage() UNLOCK_FUNCTION(cs_vSend) void AbortMessage() UNLOCK_FUNCTION(cs_vSend)
{ {
if (nHeaderStart < 0) ssSend.clear();
return;
vSend.resize(nHeaderStart);
nHeaderStart = -1;
nMessageStart = -1;
LEAVE_CRITICAL_SECTION(cs_vSend); LEAVE_CRITICAL_SECTION(cs_vSend);
if (fDebug) if (fDebug)
@ -420,30 +415,32 @@ public:
return; return;
} }
if (nHeaderStart < 0) if (ssSend.size() == 0)
return; return;
// Set the size // Set the size
unsigned int nSize = vSend.size() - nMessageStart; unsigned int nSize = ssSend.size() - CMessageHeader::HEADER_SIZE;
memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::MESSAGE_SIZE_OFFSET, &nSize, sizeof(nSize)); memcpy((char*)&ssSend[CMessageHeader::MESSAGE_SIZE_OFFSET], &nSize, sizeof(nSize));
// Set the checksum // Set the checksum
uint256 hash = Hash(vSend.begin() + nMessageStart, vSend.end()); uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end());
unsigned int nChecksum = 0; unsigned int nChecksum = 0;
memcpy(&nChecksum, &hash, sizeof(nChecksum)); memcpy(&nChecksum, &hash, sizeof(nChecksum));
assert(nMessageStart - nHeaderStart >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum)); assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum));
memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::CHECKSUM_OFFSET, &nChecksum, sizeof(nChecksum)); memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], &nChecksum, sizeof(nChecksum));
if (fDebug) { if (fDebug) {
printf("(%d bytes)\n", nSize); printf("(%d bytes)\n", nSize);
} }
std::deque<CSerializeData>::iterator it = vSendMsg.insert(vSendMsg.end(), CSerializeData());
ssSend.GetAndClear(*it);
nSendSize += (*it).size();
// If write queue empty, attempt "optimistic write" // If write queue empty, attempt "optimistic write"
if (nHeaderStart == 0) if (it == vSendMsg.begin())
SocketSendData(this); SocketSendData(this);
nHeaderStart = -1;
nMessageStart = -1;
LEAVE_CRITICAL_SECTION(cs_vSend); LEAVE_CRITICAL_SECTION(cs_vSend);
} }
@ -470,7 +467,7 @@ public:
try try
{ {
BeginMessage(pszCommand); BeginMessage(pszCommand);
vSend << a1; ssSend << a1;
EndMessage(); EndMessage();
} }
catch (...) catch (...)
@ -486,7 +483,7 @@ public:
try try
{ {
BeginMessage(pszCommand); BeginMessage(pszCommand);
vSend << a1 << a2; ssSend << a1 << a2;
EndMessage(); EndMessage();
} }
catch (...) catch (...)
@ -502,7 +499,7 @@ public:
try try
{ {
BeginMessage(pszCommand); BeginMessage(pszCommand);
vSend << a1 << a2 << a3; ssSend << a1 << a2 << a3;
EndMessage(); EndMessage();
} }
catch (...) catch (...)
@ -518,7 +515,7 @@ public:
try try
{ {
BeginMessage(pszCommand); BeginMessage(pszCommand);
vSend << a1 << a2 << a3 << a4; ssSend << a1 << a2 << a3 << a4;
EndMessage(); EndMessage();
} }
catch (...) catch (...)
@ -534,7 +531,7 @@ public:
try try
{ {
BeginMessage(pszCommand); BeginMessage(pszCommand);
vSend << a1 << a2 << a3 << a4 << a5; ssSend << a1 << a2 << a3 << a4 << a5;
EndMessage(); EndMessage();
} }
catch (...) catch (...)
@ -550,7 +547,7 @@ public:
try try
{ {
BeginMessage(pszCommand); BeginMessage(pszCommand);
vSend << a1 << a2 << a3 << a4 << a5 << a6; ssSend << a1 << a2 << a3 << a4 << a5 << a6;
EndMessage(); EndMessage();
} }
catch (...) catch (...)
@ -566,7 +563,7 @@ public:
try try
{ {
BeginMessage(pszCommand); BeginMessage(pszCommand);
vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7; ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7;
EndMessage(); EndMessage();
} }
catch (...) catch (...)
@ -582,7 +579,7 @@ public:
try try
{ {
BeginMessage(pszCommand); BeginMessage(pszCommand);
vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8; ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8;
EndMessage(); EndMessage();
} }
catch (...) catch (...)
@ -598,7 +595,7 @@ public:
try try
{ {
BeginMessage(pszCommand); BeginMessage(pszCommand);
vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9; ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9;
EndMessage(); EndMessage();
} }
catch (...) catch (...)

3
src/protocol.h

@ -56,7 +56,8 @@ class CMessageHeader
CHECKSUM_SIZE=sizeof(int), CHECKSUM_SIZE=sizeof(int),
MESSAGE_SIZE_OFFSET=MESSAGE_START_SIZE+COMMAND_SIZE, MESSAGE_SIZE_OFFSET=MESSAGE_START_SIZE+COMMAND_SIZE,
CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE,
HEADER_SIZE=MESSAGE_START_SIZE+COMMAND_SIZE+MESSAGE_SIZE_SIZE+CHECKSUM_SIZE
}; };
char pchMessageStart[MESSAGE_START_SIZE]; char pchMessageStart[MESSAGE_START_SIZE];
char pchCommand[COMMAND_SIZE]; char pchCommand[COMMAND_SIZE];

8
src/serialize.h

@ -789,6 +789,7 @@ struct ser_streamplaceholder
typedef std::vector<char, zero_after_free_allocator<char> > CSerializeData;
/** Double ended buffer combining vector and stream-like interfaces. /** Double ended buffer combining vector and stream-like interfaces.
* *
@ -798,7 +799,7 @@ struct ser_streamplaceholder
class CDataStream class CDataStream
{ {
protected: protected:
typedef std::vector<char, zero_after_free_allocator<char> > vector_type; typedef CSerializeData vector_type;
vector_type vch; vector_type vch;
unsigned int nReadPos; unsigned int nReadPos;
short state; short state;
@ -1095,6 +1096,11 @@ public:
::Unserialize(*this, obj, nType, nVersion); ::Unserialize(*this, obj, nType, nVersion);
return (*this); return (*this);
} }
void GetAndClear(CSerializeData &data) {
vch.swap(data);
CSerializeData().swap(vch);
}
}; };

Loading…
Cancel
Save