From 41b052ad87633d5a8a989c512c8710b875f2ba88 Mon Sep 17 00:00:00 2001 From: Pieter Wuille Date: Sun, 24 Mar 2013 16:52:24 +0100 Subject: [PATCH] Use per-message send buffer, rather than per connection --- src/main.cpp | 13 ++++++---- src/net.cpp | 59 ++++++++++++++++++++++++++++---------------- src/net.h | 65 +++++++++++++++++++++++-------------------------- src/protocol.h | 3 ++- src/serialize.h | 8 +++++- 5 files changed, 86 insertions(+), 62 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 0c80f23d..77061fab 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3104,7 +3104,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) // Change version pfrom->PushMessage("verack"); - pfrom->vSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); + pfrom->ssSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); if (!pfrom->fInbound) { @@ -3722,9 +3722,9 @@ bool ProcessMessages(CNode* pfrom) bool fOk = true; std::deque::iterator it = pfrom->vRecvMsg.begin(); - while (it != pfrom->vRecvMsg.end()) { + while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway - if (pfrom->vSend.size() >= SendBufferSize()) + if (pfrom->nSendSize >= SendBufferSize()) break; // get next message @@ -3811,7 +3811,10 @@ bool ProcessMessages(CNode* pfrom) printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize); } - pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); + // In case the connection got shut down, its receive buffer was wiped + if (!pfrom->fDisconnect) + pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); + return fOk; } @@ -3826,7 +3829,7 @@ bool SendMessages(CNode* pto, bool fSendTrickle) // Keep-alive ping. We send a nonce of zero because we don't use it anywhere // right now. - if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSend.empty()) { + if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSendMsg.empty()) { uint64 nonce = 0; if (pto->nVersion > BIP0031_VERSION) pto->PushMessage("ping", nonce); diff --git a/src/net.cpp b/src/net.cpp index 1016d5d9..9ee6cb42 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -715,26 +715,43 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes) // requires LOCK(cs_vSend) void SocketSendData(CNode *pnode) { - CDataStream& vSend = pnode->vSend; - if (vSend.empty()) - return; - - int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT); - if (nBytes > 0) - { - vSend.erase(vSend.begin(), vSend.begin() + nBytes); - pnode->nLastSend = GetTime(); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) - { - printf("socket send error %d\n", nErr); - pnode->CloseSocketDisconnect(); + std::deque::iterator it = pnode->vSendMsg.begin(); + + while (it != pnode->vSendMsg.end()) { + const CSerializeData &data = *it; + assert(data.size() > pnode->nSendOffset); + int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); + if (nBytes > 0) { + pnode->nLastSend = GetTime(); + pnode->nSendOffset += nBytes; + if (pnode->nSendOffset == data.size()) { + pnode->nSendOffset = 0; + pnode->nSendSize -= data.size(); + it++; + } else { + // could not send full message; stop sending more + break; + } + } else { + if (nBytes < 0) { + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + { + printf("socket send error %d\n", nErr); + pnode->CloseSocketDisconnect(); + } + } + // couldn't send anything at all + break; } } + + if (it == pnode->vSendMsg.end()) { + assert(pnode->nSendOffset == 0); + assert(pnode->nSendSize == 0); + } + pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); } void ThreadSocketHandler(void* parg) @@ -776,7 +793,7 @@ void ThreadSocketHandler2(void* parg) BOOST_FOREACH(CNode* pnode, vNodesCopy) { if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->vSend.empty())) + (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty())) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -863,7 +880,7 @@ void ThreadSocketHandler2(void* parg) TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { // do not read, if draining write queue - if (!pnode->vSend.empty()) + if (!pnode->vSendMsg.empty()) FD_SET(pnode->hSocket, &fdsetSend); else FD_SET(pnode->hSocket, &fdsetRecv); @@ -1032,7 +1049,7 @@ void ThreadSocketHandler2(void* parg) // // Inactivity checking // - if (pnode->vSend.empty()) + if (pnode->vSendMsg.empty()) pnode->nLastSendEmpty = GetTime(); if (GetTime() - pnode->nTimeConnected > 60) { diff --git a/src/net.h b/src/net.h index d779265b..9805e39f 100644 --- a/src/net.h +++ b/src/net.h @@ -173,7 +173,10 @@ public: // socket uint64 nServices; SOCKET hSocket; - CDataStream vSend; + CDataStream ssSend; + size_t nSendSize; // total size of all vSendMsg entries + size_t nSendOffset; // offset inside the first vSendMsg already sent + std::deque vSendMsg; CCriticalSection cs_vSend; std::deque vRecvMsg; @@ -184,8 +187,6 @@ public: int64 nLastRecv; int64 nLastSendEmpty; int64 nTimeConnected; - int nHeaderStart; - unsigned int nMessageStart; CAddress addr; std::string addrName; CService addrLocal; @@ -233,7 +234,7 @@ public: CCriticalSection cs_inventory; std::multimap mapAskFor; - CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : vSend(SER_NETWORK, MIN_PROTO_VERSION) + CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : ssSend(SER_NETWORK, MIN_PROTO_VERSION) { nServices = 0; hSocket = hSocketIn; @@ -242,8 +243,6 @@ public: nLastRecv = 0; nLastSendEmpty = GetTime(); nTimeConnected = GetTime(); - nHeaderStart = -1; - nMessageStart = -1; addr = addrIn; addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn; nVersion = 0; @@ -256,6 +255,8 @@ public: fDisconnect = false; nRefCount = 0; nReleaseTime = 0; + nSendSize = 0; + nSendOffset = 0; hashContinue = 0; pindexLastGetBlocksBegin = 0; hashLastGetBlocksEnd = 0; @@ -387,11 +388,8 @@ public: void BeginMessage(const char* pszCommand) EXCLUSIVE_LOCK_FUNCTION(cs_vSend) { ENTER_CRITICAL_SECTION(cs_vSend); - if (nHeaderStart != -1) - AbortMessage(); - nHeaderStart = vSend.size(); - vSend << CMessageHeader(pszCommand, 0); - nMessageStart = vSend.size(); + assert(ssSend.size() == 0); + ssSend << CMessageHeader(pszCommand, 0); if (fDebug) printf("sending: %s ", pszCommand); } @@ -399,11 +397,8 @@ public: // TODO: Document the precondition of this function. Is cs_vSend locked? void AbortMessage() UNLOCK_FUNCTION(cs_vSend) { - if (nHeaderStart < 0) - return; - vSend.resize(nHeaderStart); - nHeaderStart = -1; - nMessageStart = -1; + ssSend.clear(); + LEAVE_CRITICAL_SECTION(cs_vSend); if (fDebug) @@ -420,30 +415,32 @@ public: return; } - if (nHeaderStart < 0) + if (ssSend.size() == 0) return; // Set the size - unsigned int nSize = vSend.size() - nMessageStart; - memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::MESSAGE_SIZE_OFFSET, &nSize, sizeof(nSize)); + unsigned int nSize = ssSend.size() - CMessageHeader::HEADER_SIZE; + memcpy((char*)&ssSend[CMessageHeader::MESSAGE_SIZE_OFFSET], &nSize, sizeof(nSize)); // Set the checksum - uint256 hash = Hash(vSend.begin() + nMessageStart, vSend.end()); + uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end()); unsigned int nChecksum = 0; memcpy(&nChecksum, &hash, sizeof(nChecksum)); - assert(nMessageStart - nHeaderStart >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum)); - memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::CHECKSUM_OFFSET, &nChecksum, sizeof(nChecksum)); + assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum)); + memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], &nChecksum, sizeof(nChecksum)); if (fDebug) { printf("(%d bytes)\n", nSize); } + std::deque::iterator it = vSendMsg.insert(vSendMsg.end(), CSerializeData()); + ssSend.GetAndClear(*it); + nSendSize += (*it).size(); + // If write queue empty, attempt "optimistic write" - if (nHeaderStart == 0) + if (it == vSendMsg.begin()) SocketSendData(this); - nHeaderStart = -1; - nMessageStart = -1; LEAVE_CRITICAL_SECTION(cs_vSend); } @@ -470,7 +467,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1; + ssSend << a1; EndMessage(); } catch (...) @@ -486,7 +483,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2; + ssSend << a1 << a2; EndMessage(); } catch (...) @@ -502,7 +499,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3; + ssSend << a1 << a2 << a3; EndMessage(); } catch (...) @@ -518,7 +515,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4; + ssSend << a1 << a2 << a3 << a4; EndMessage(); } catch (...) @@ -534,7 +531,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5; + ssSend << a1 << a2 << a3 << a4 << a5; EndMessage(); } catch (...) @@ -550,7 +547,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6; + ssSend << a1 << a2 << a3 << a4 << a5 << a6; EndMessage(); } catch (...) @@ -566,7 +563,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7; EndMessage(); } catch (...) @@ -582,7 +579,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8; EndMessage(); } catch (...) @@ -598,7 +595,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9; EndMessage(); } catch (...) diff --git a/src/protocol.h b/src/protocol.h index f5c16205..49984250 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -56,7 +56,8 @@ class CMessageHeader CHECKSUM_SIZE=sizeof(int), MESSAGE_SIZE_OFFSET=MESSAGE_START_SIZE+COMMAND_SIZE, - CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE + CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE, + HEADER_SIZE=MESSAGE_START_SIZE+COMMAND_SIZE+MESSAGE_SIZE_SIZE+CHECKSUM_SIZE }; char pchMessageStart[MESSAGE_START_SIZE]; char pchCommand[COMMAND_SIZE]; diff --git a/src/serialize.h b/src/serialize.h index f2626281..e3d9939b 100644 --- a/src/serialize.h +++ b/src/serialize.h @@ -789,6 +789,7 @@ struct ser_streamplaceholder +typedef std::vector > CSerializeData; /** Double ended buffer combining vector and stream-like interfaces. * @@ -798,7 +799,7 @@ struct ser_streamplaceholder class CDataStream { protected: - typedef std::vector > vector_type; + typedef CSerializeData vector_type; vector_type vch; unsigned int nReadPos; short state; @@ -1095,6 +1096,11 @@ public: ::Unserialize(*this, obj, nType, nVersion); return (*this); } + + void GetAndClear(CSerializeData &data) { + vch.swap(data); + CSerializeData().swap(vch); + } };