diff --git a/src/main.cpp b/src/main.cpp index 22baf0f3e..b29091b4f 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3029,6 +3029,115 @@ bool static AlreadyHave(const CInv& inv) unsigned char pchMessageStart[4] = { 0xf9, 0xbe, 0xb4, 0xd9 }; +void static ProcessGetData(CNode* pfrom) +{ + std::deque::iterator it = pfrom->vRecvGetData.begin(); + + vector vNotFound; + + while (it != pfrom->vRecvGetData.end()) { + // Don't bother if send buffer is too full to respond anyway + if (pfrom->nSendSize >= SendBufferSize()) + break; + + const CInv &inv = *it; + { + if (fShutdown) + break; + it++; + + if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK) + { + // Send block from disk + map::iterator mi = mapBlockIndex.find(inv.hash); + if (mi != mapBlockIndex.end()) + { + CBlock block; + block.ReadFromDisk((*mi).second); + if (inv.type == MSG_BLOCK) + pfrom->PushMessage("block", block); + else // MSG_FILTERED_BLOCK) + { + LOCK(pfrom->cs_filter); + if (pfrom->pfilter) + { + CMerkleBlock merkleBlock(block, *pfrom->pfilter); + pfrom->PushMessage("merkleblock", merkleBlock); + // CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see + // This avoids hurting performance by pointlessly requiring a round-trip + // Note that there is currently no way for a node to request any single transactions we didnt send here - + // they must either disconnect and retry or request the full block. + // Thus, the protocol spec specified allows for us to provide duplicate txn here, + // however we MUST always provide at least what the remote peer needs + typedef std::pair PairType; + BOOST_FOREACH(PairType& pair, merkleBlock.vMatchedTxn) + if (!pfrom->setInventoryKnown.count(CInv(MSG_TX, pair.second))) + pfrom->PushMessage("tx", block.vtx[pair.first]); + } + // else + // no response + } + + // Trigger them to send a getblocks request for the next batch of inventory + if (inv.hash == pfrom->hashContinue) + { + // Bypass PushInventory, this must send even if redundant, + // and we want it right after the last block so they don't + // wait for other stuff first. + vector vInv; + vInv.push_back(CInv(MSG_BLOCK, hashBestChain)); + pfrom->PushMessage("inv", vInv); + pfrom->hashContinue = 0; + } + } + } + else if (inv.IsKnownType()) + { + // Send stream from relay memory + bool pushed = false; + { + LOCK(cs_mapRelay); + map::iterator mi = mapRelay.find(inv); + if (mi != mapRelay.end()) { + pfrom->PushMessage(inv.GetCommand(), (*mi).second); + pushed = true; + } + } + if (!pushed && inv.type == MSG_TX) { + LOCK(mempool.cs); + if (mempool.exists(inv.hash)) { + CTransaction tx = mempool.lookup(inv.hash); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); + ss.reserve(1000); + ss << tx; + pfrom->PushMessage("tx", ss); + pushed = true; + } + } + if (!pushed) { + vNotFound.push_back(inv); + } + } + + // Track requests for our stuff. + Inventory(inv.hash); + } + } + + pfrom->vRecvGetData.erase(pfrom->vRecvGetData.begin(), it); + + if (!vNotFound.empty()) { + // Let the peer know that we didn't find what it asked for, so it doesn't + // have to wait around forever. Currently only SPV clients actually care + // about this message: it's needed when they are recursively walking the + // dependencies of relevant unconfirmed transactions. SPV clients want to + // do that because they want to know about (and store and rebroadcast and + // risk analyze) the dependencies of transactions relevant to them, without + // having to download the entire memory pool. + pfrom->PushMessage("notfound", vNotFound); + } +} + bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) { RandAddSeedPerfmon(); @@ -3104,7 +3213,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) // Change version pfrom->PushMessage("verack"); - pfrom->vSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); + pfrom->ssSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); if (!pfrom->fInbound) { @@ -3168,7 +3277,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) else if (strCommand == "verack") { - pfrom->vRecv.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); + pfrom->SetRecvVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); } @@ -3302,101 +3411,11 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) if (fDebugNet || (vInv.size() != 1)) printf("received getdata (%"PRIszu" invsz)\n", vInv.size()); - vector vNotFound; - BOOST_FOREACH(const CInv& inv, vInv) - { - if (fShutdown) - return true; - if (fDebugNet || (vInv.size() == 1)) - printf("received getdata for: %s\n", inv.ToString().c_str()); + if ((fDebugNet && vInv.size() > 0) || (vInv.size() == 1)) + printf("received getdata for: %s\n", vInv[0].ToString().c_str()); - if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK) - { - // Send block from disk - map::iterator mi = mapBlockIndex.find(inv.hash); - if (mi != mapBlockIndex.end()) - { - CBlock block; - block.ReadFromDisk((*mi).second); - if (inv.type == MSG_BLOCK) - pfrom->PushMessage("block", block); - else // MSG_FILTERED_BLOCK) - { - LOCK(pfrom->cs_filter); - if (pfrom->pfilter) - { - CMerkleBlock merkleBlock(block, *pfrom->pfilter); - pfrom->PushMessage("merkleblock", merkleBlock); - // CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see - // This avoids hurting performance by pointlessly requiring a round-trip - // Note that there is currently no way for a node to request any single transactions we didnt send here - - // they must either disconnect and retry or request the full block. - // Thus, the protocol spec specified allows for us to provide duplicate txn here, - // however we MUST always provide at least what the remote peer needs - typedef std::pair PairType; - BOOST_FOREACH(PairType& pair, merkleBlock.vMatchedTxn) - if (!pfrom->setInventoryKnown.count(CInv(MSG_TX, pair.second))) - pfrom->PushMessage("tx", block.vtx[pair.first]); - } - // else - // no response - } - - // Trigger them to send a getblocks request for the next batch of inventory - if (inv.hash == pfrom->hashContinue) - { - // Bypass PushInventory, this must send even if redundant, - // and we want it right after the last block so they don't - // wait for other stuff first. - vector vInv; - vInv.push_back(CInv(MSG_BLOCK, hashBestChain)); - pfrom->PushMessage("inv", vInv); - pfrom->hashContinue = 0; - } - } - } - else if (inv.IsKnownType()) - { - // Send stream from relay memory - bool pushed = false; - { - LOCK(cs_mapRelay); - map::iterator mi = mapRelay.find(inv); - if (mi != mapRelay.end()) { - pfrom->PushMessage(inv.GetCommand(), (*mi).second); - pushed = true; - } - } - if (!pushed && inv.type == MSG_TX) { - LOCK(mempool.cs); - if (mempool.exists(inv.hash)) { - CTransaction tx = mempool.lookup(inv.hash); - CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); - ss.reserve(1000); - ss << tx; - pfrom->PushMessage("tx", ss); - pushed = true; - } - } - if (!pushed) { - vNotFound.push_back(inv); - } - } - - // Track requests for our stuff. - Inventory(inv.hash); - - if (!vNotFound.empty()) { - // Let the peer know that we didn't find what it asked for, so it doesn't - // have to wait around forever. Currently only SPV clients actually care - // about this message: it's needed when they are recursively walking the - // dependencies of relevant unconfirmed transactions. SPV clients want to - // do that because they want to know about (and store and rebroadcast and - // risk analyze) the dependencies of transactions relevant to them, without - // having to download the entire memory pool. - pfrom->PushMessage("notfound", vNotFound); - } - } + pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end()); + ProcessGetData(pfrom); } @@ -3705,13 +3724,11 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) return true; } +// requires LOCK(cs_vRecvMsg) bool ProcessMessages(CNode* pfrom) { - CDataStream& vRecv = pfrom->vRecv; - if (vRecv.empty()) - return true; //if (fDebug) - // printf("ProcessMessages(%u bytes)\n", vRecv.size()); + // printf("ProcessMessages(%zu messages)\n", pfrom->vRecvMsg.size()); // // Message format @@ -3721,33 +3738,41 @@ bool ProcessMessages(CNode* pfrom) // (4) checksum // (x) data // + bool fOk = true; - loop - { + if (!pfrom->vRecvGetData.empty()) + ProcessGetData(pfrom); + + std::deque::iterator it = pfrom->vRecvMsg.begin(); + while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway - if (pfrom->vSend.size() >= SendBufferSize()) + if (pfrom->nSendSize >= SendBufferSize()) + break; + + // get next message + CNetMessage& msg = *it; + + //if (fDebug) + // printf("ProcessMessages(message %u msgsz, %zu bytes, complete:%s)\n", + // msg.hdr.nMessageSize, msg.vRecv.size(), + // msg.complete() ? "Y" : "N"); + + // end, if an incomplete message is found + if (!msg.complete()) break; + // at this point, any failure means we can delete the current message + it++; + // Scan for message start - CDataStream::iterator pstart = search(vRecv.begin(), vRecv.end(), BEGIN(pchMessageStart), END(pchMessageStart)); - int nHeaderSize = vRecv.GetSerializeSize(CMessageHeader()); - if (vRecv.end() - pstart < nHeaderSize) - { - if ((int)vRecv.size() > nHeaderSize) - { - printf("\n\nPROCESSMESSAGE MESSAGESTART NOT FOUND\n\n"); - vRecv.erase(vRecv.begin(), vRecv.end() - nHeaderSize); - } + if (memcmp(msg.hdr.pchMessageStart, pchMessageStart, sizeof(pchMessageStart)) != 0) { + printf("\n\nPROCESSMESSAGE: INVALID MESSAGESTART\n\n"); + fOk = false; break; } - if (pstart - vRecv.begin() > 0) - printf("\n\nPROCESSMESSAGE SKIPPED %"PRIpdd" BYTES\n\n", pstart - vRecv.begin()); - vRecv.erase(vRecv.begin(), pstart); // Read header - vector vHeaderSave(vRecv.begin(), vRecv.begin() + nHeaderSize); - CMessageHeader hdr; - vRecv >> hdr; + CMessageHeader& hdr = msg.hdr; if (!hdr.IsValid()) { printf("\n\nPROCESSMESSAGE: ERRORS IN HEADER %s\n\n\n", hdr.GetCommand().c_str()); @@ -3757,19 +3782,9 @@ bool ProcessMessages(CNode* pfrom) // Message size unsigned int nMessageSize = hdr.nMessageSize; - if (nMessageSize > MAX_SIZE) - { - printf("ProcessMessages(%s, %u bytes) : nMessageSize > MAX_SIZE\n", strCommand.c_str(), nMessageSize); - continue; - } - if (nMessageSize > vRecv.size()) - { - // Rewind and wait for rest of message - vRecv.insert(vRecv.begin(), vHeaderSave.begin(), vHeaderSave.end()); - break; - } // Checksum + CDataStream& vRecv = msg.vRecv; uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize); unsigned int nChecksum = 0; memcpy(&nChecksum, &hash, sizeof(nChecksum)); @@ -3780,20 +3795,16 @@ bool ProcessMessages(CNode* pfrom) continue; } - // Copy message to its own buffer - CDataStream vMsg(vRecv.begin(), vRecv.begin() + nMessageSize, vRecv.nType, vRecv.nVersion); - vRecv.ignore(nMessageSize); - // Process message bool fRet = false; try { { LOCK(cs_main); - fRet = ProcessMessage(pfrom, strCommand, vMsg); + fRet = ProcessMessage(pfrom, strCommand, vRecv); } if (fShutdown) - return true; + break; } catch (std::ios_base::failure& e) { @@ -3822,8 +3833,11 @@ bool ProcessMessages(CNode* pfrom) printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize); } - vRecv.Compact(); - return true; + // In case the connection got shut down, its receive buffer was wiped + if (!pfrom->fDisconnect) + pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); + + return fOk; } @@ -3837,7 +3851,7 @@ bool SendMessages(CNode* pto, bool fSendTrickle) // Keep-alive ping. We send a nonce of zero because we don't use it anywhere // right now. - if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSend.empty()) { + if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSendMsg.empty()) { uint64 nonce = 0; if (pto->nVersion > BIP0031_VERSION) pto->PushMessage("ping", nonce); diff --git a/src/net.cpp b/src/net.cpp index 6c8fe3ffc..9ee6cb423 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -536,7 +536,11 @@ void CNode::CloseSocketDisconnect() printf("disconnecting node %s\n", addrName.c_str()); closesocket(hSocket); hSocket = INVALID_SOCKET; - vRecv.clear(); + + // in case this fails, we'll empty the recv buffer when the CNode is deleted + TRY_LOCK(cs_vRecvMsg, lockRecv); + if (lockRecv) + vRecvMsg.clear(); } } @@ -628,15 +632,128 @@ void CNode::copyStats(CNodeStats &stats) } #undef X +// requires LOCK(cs_vRecvMsg) +bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes) +{ + while (nBytes > 0) { + + // get current incomplete message, or create a new one + if (vRecvMsg.empty() || + vRecvMsg.back().complete()) + vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion)); + + CNetMessage& msg = vRecvMsg.back(); + + // absorb network data + int handled; + if (!msg.in_data) + handled = msg.readHeader(pch, nBytes); + else + handled = msg.readData(pch, nBytes); + + if (handled < 0) + return false; + + pch += handled; + nBytes -= handled; + } + + return true; +} + +int CNetMessage::readHeader(const char *pch, unsigned int nBytes) +{ + // copy data to temporary parsing buffer + unsigned int nRemaining = 24 - nHdrPos; + unsigned int nCopy = std::min(nRemaining, nBytes); + + memcpy(&hdrbuf[nHdrPos], pch, nCopy); + nHdrPos += nCopy; + + // if header incomplete, exit + if (nHdrPos < 24) + return nCopy; + // deserialize to CMessageHeader + try { + hdrbuf >> hdr; + } + catch (std::exception &e) { + return -1; + } + + // reject messages larger than MAX_SIZE + if (hdr.nMessageSize > MAX_SIZE) + return -1; + // switch state to reading message data + in_data = true; + vRecv.resize(hdr.nMessageSize); + + return nCopy; +} +int CNetMessage::readData(const char *pch, unsigned int nBytes) +{ + unsigned int nRemaining = hdr.nMessageSize - nDataPos; + unsigned int nCopy = std::min(nRemaining, nBytes); + + memcpy(&vRecv[nDataPos], pch, nCopy); + nDataPos += nCopy; + + return nCopy; +} + + + +// requires LOCK(cs_vSend) +void SocketSendData(CNode *pnode) +{ + std::deque::iterator it = pnode->vSendMsg.begin(); + + while (it != pnode->vSendMsg.end()) { + const CSerializeData &data = *it; + assert(data.size() > pnode->nSendOffset); + int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); + if (nBytes > 0) { + pnode->nLastSend = GetTime(); + pnode->nSendOffset += nBytes; + if (pnode->nSendOffset == data.size()) { + pnode->nSendOffset = 0; + pnode->nSendSize -= data.size(); + it++; + } else { + // could not send full message; stop sending more + break; + } + } else { + if (nBytes < 0) { + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + { + printf("socket send error %d\n", nErr); + pnode->CloseSocketDisconnect(); + } + } + // couldn't send anything at all + break; + } + } + + if (it == pnode->vSendMsg.end()) { + assert(pnode->nSendOffset == 0); + assert(pnode->nSendSize == 0); + } + pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); +} + void ThreadSocketHandler(void* parg) { // Make this thread recognisable as the networking thread @@ -676,7 +793,7 @@ void ThreadSocketHandler2(void* parg) BOOST_FOREACH(CNode* pnode, vNodesCopy) { if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecv.empty() && pnode->vSend.empty())) + (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty())) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -708,7 +825,7 @@ void ThreadSocketHandler2(void* parg) TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { TRY_LOCK(pnode->cs_inventory, lockInv); @@ -759,14 +876,18 @@ void ThreadSocketHandler2(void* parg) { if (pnode->hSocket == INVALID_SOCKET) continue; - FD_SET(pnode->hSocket, &fdsetRecv); - FD_SET(pnode->hSocket, &fdsetError); - hSocketMax = max(hSocketMax, pnode->hSocket); - have_fds = true; { TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend && !pnode->vSend.empty()) - FD_SET(pnode->hSocket, &fdsetSend); + if (lockSend) { + // do not read, if draining write queue + if (!pnode->vSendMsg.empty()) + FD_SET(pnode->hSocket, &fdsetSend); + else + FD_SET(pnode->hSocket, &fdsetRecv); + FD_SET(pnode->hSocket, &fdsetError); + hSocketMax = max(hSocketMax, pnode->hSocket); + have_fds = true; + } } } } @@ -873,15 +994,12 @@ void ThreadSocketHandler2(void* parg) continue; if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { - CDataStream& vRecv = pnode->vRecv; - unsigned int nPos = vRecv.size(); - - if (nPos > ReceiveBufferSize()) { + if (pnode->GetTotalRecvSize() > ReceiveFloodSize()) { if (!pnode->fDisconnect) - printf("socket recv flood control disconnect (%"PRIszu" bytes)\n", vRecv.size()); + printf("socket recv flood control disconnect (%u bytes)\n", pnode->GetTotalRecvSize()); pnode->CloseSocketDisconnect(); } else { @@ -890,8 +1008,8 @@ void ThreadSocketHandler2(void* parg) int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); if (nBytes > 0) { - vRecv.resize(nPos + nBytes); - memcpy(&vRecv[nPos], pchBuf, nBytes); + if (!pnode->ReceiveMsgBytes(pchBuf, nBytes)) + pnode->CloseSocketDisconnect(); pnode->nLastRecv = GetTime(); } else if (nBytes == 0) @@ -925,34 +1043,13 @@ void ThreadSocketHandler2(void* parg) { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) - { - CDataStream& vSend = pnode->vSend; - if (!vSend.empty()) - { - int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT); - if (nBytes > 0) - { - vSend.erase(vSend.begin(), vSend.begin() + nBytes); - pnode->nLastSend = GetTime(); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) - { - printf("socket send error %d\n", nErr); - pnode->CloseSocketDisconnect(); - } - } - } - } + SocketSendData(pnode); } // // Inactivity checking // - if (pnode->vSend.empty()) + if (pnode->vSendMsg.empty()) pnode->nLastSendEmpty = GetTime(); if (GetTime() - pnode->nTimeConnected > 60) { @@ -1691,11 +1788,15 @@ void ThreadMessageHandler2(void* parg) pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())]; BOOST_FOREACH(CNode* pnode, vNodesCopy) { + if (pnode->fDisconnect) + continue; + // Receive messages { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) - ProcessMessages(pnode); + if (!ProcessMessages(pnode)) + pnode->CloseSocketDisconnect(); } if (fShutdown) return; diff --git a/src/net.h b/src/net.h index 3b46523cd..368e4cd4b 100644 --- a/src/net.h +++ b/src/net.h @@ -27,7 +27,7 @@ extern int nBestHeight; -inline unsigned int ReceiveBufferSize() { return 1000*GetArg("-maxreceivebuffer", 5*1000); } +inline unsigned int ReceiveFloodSize() { return 1000*GetArg("-maxreceivebuffer", 5*1000); } inline unsigned int SendBufferSize() { return 1000*GetArg("-maxsendbuffer", 1*1000); } void AddOneShot(std::string strDest); @@ -42,6 +42,7 @@ unsigned short GetListenPort(); bool BindListenPort(const CService &bindAddr, std::string& strError=REF(std::string())); void StartNode(void* parg); bool StopNode(); +void SocketSendData(CNode *pnode); enum { @@ -126,6 +127,44 @@ public: +class CNetMessage { +public: + bool in_data; // parsing header (false) or data (true) + + CDataStream hdrbuf; // partially received header + CMessageHeader hdr; // complete header + unsigned int nHdrPos; + + CDataStream vRecv; // received message data + unsigned int nDataPos; + + CNetMessage(int nTypeIn, int nVersionIn) : hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn) { + hdrbuf.resize(24); + in_data = false; + nHdrPos = 0; + nDataPos = 0; + } + + bool complete() const + { + if (!in_data) + return false; + return (hdr.nMessageSize == nDataPos); + } + + void SetVersion(int nVersionIn) + { + hdrbuf.SetVersion(nVersionIn); + vRecv.SetVersion(nVersionIn); + } + + int readHeader(const char *pch, unsigned int nBytes); + int readData(const char *pch, unsigned int nBytes); +}; + + + + /** Information about a peer */ class CNode @@ -134,16 +173,21 @@ public: // socket uint64 nServices; SOCKET hSocket; - CDataStream vSend; - CDataStream vRecv; + CDataStream ssSend; + size_t nSendSize; // total size of all vSendMsg entries + size_t nSendOffset; // offset inside the first vSendMsg already sent + std::deque vSendMsg; CCriticalSection cs_vSend; - CCriticalSection cs_vRecv; + + std::deque vRecvGetData; + std::deque vRecvMsg; + CCriticalSection cs_vRecvMsg; + int nRecvVersion; + int64 nLastSend; int64 nLastRecv; int64 nLastSendEmpty; int64 nTimeConnected; - int nHeaderStart; - unsigned int nMessageStart; CAddress addr; std::string addrName; CService addrLocal; @@ -191,16 +235,15 @@ public: CCriticalSection cs_inventory; std::multimap mapAskFor; - CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : vSend(SER_NETWORK, MIN_PROTO_VERSION), vRecv(SER_NETWORK, MIN_PROTO_VERSION) + CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : ssSend(SER_NETWORK, MIN_PROTO_VERSION) { nServices = 0; hSocket = hSocketIn; + nRecvVersion = MIN_PROTO_VERSION; nLastSend = 0; nLastRecv = 0; nLastSendEmpty = GetTime(); nTimeConnected = GetTime(); - nHeaderStart = -1; - nMessageStart = -1; addr = addrIn; addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn; nVersion = 0; @@ -213,6 +256,8 @@ public: fDisconnect = false; nRefCount = 0; nReleaseTime = 0; + nSendSize = 0; + nSendOffset = 0; hashContinue = 0; pindexLastGetBlocksBegin = 0; hashLastGetBlocksEnd = 0; @@ -250,6 +295,26 @@ public: return std::max(nRefCount, 0) + (GetTime() < nReleaseTime ? 1 : 0); } + // requires LOCK(cs_vRecvMsg) + unsigned int GetTotalRecvSize() + { + unsigned int total = 0; + BOOST_FOREACH(const CNetMessage &msg, vRecvMsg) + total += msg.vRecv.size() + 24; + return total; + } + + // requires LOCK(cs_vRecvMsg) + bool ReceiveMsgBytes(const char *pch, unsigned int nBytes); + + // requires LOCK(cs_vRecvMsg) + void SetRecvVersion(int nVersionIn) + { + nRecvVersion = nVersionIn; + BOOST_FOREACH(CNetMessage &msg, vRecvMsg) + msg.SetVersion(nVersionIn); + } + CNode* AddRef(int64 nTimeout=0) { if (nTimeout != 0) @@ -324,11 +389,8 @@ public: void BeginMessage(const char* pszCommand) EXCLUSIVE_LOCK_FUNCTION(cs_vSend) { ENTER_CRITICAL_SECTION(cs_vSend); - if (nHeaderStart != -1) - AbortMessage(); - nHeaderStart = vSend.size(); - vSend << CMessageHeader(pszCommand, 0); - nMessageStart = vSend.size(); + assert(ssSend.size() == 0); + ssSend << CMessageHeader(pszCommand, 0); if (fDebug) printf("sending: %s ", pszCommand); } @@ -336,11 +398,8 @@ public: // TODO: Document the precondition of this function. Is cs_vSend locked? void AbortMessage() UNLOCK_FUNCTION(cs_vSend) { - if (nHeaderStart < 0) - return; - vSend.resize(nHeaderStart); - nHeaderStart = -1; - nMessageStart = -1; + ssSend.clear(); + LEAVE_CRITICAL_SECTION(cs_vSend); if (fDebug) @@ -357,26 +416,32 @@ public: return; } - if (nHeaderStart < 0) + if (ssSend.size() == 0) return; // Set the size - unsigned int nSize = vSend.size() - nMessageStart; - memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::MESSAGE_SIZE_OFFSET, &nSize, sizeof(nSize)); + unsigned int nSize = ssSend.size() - CMessageHeader::HEADER_SIZE; + memcpy((char*)&ssSend[CMessageHeader::MESSAGE_SIZE_OFFSET], &nSize, sizeof(nSize)); // Set the checksum - uint256 hash = Hash(vSend.begin() + nMessageStart, vSend.end()); + uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end()); unsigned int nChecksum = 0; memcpy(&nChecksum, &hash, sizeof(nChecksum)); - assert(nMessageStart - nHeaderStart >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum)); - memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::CHECKSUM_OFFSET, &nChecksum, sizeof(nChecksum)); + assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum)); + memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], &nChecksum, sizeof(nChecksum)); if (fDebug) { printf("(%d bytes)\n", nSize); } - nHeaderStart = -1; - nMessageStart = -1; + std::deque::iterator it = vSendMsg.insert(vSendMsg.end(), CSerializeData()); + ssSend.GetAndClear(*it); + nSendSize += (*it).size(); + + // If write queue empty, attempt "optimistic write" + if (it == vSendMsg.begin()) + SocketSendData(this); + LEAVE_CRITICAL_SECTION(cs_vSend); } @@ -403,7 +468,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1; + ssSend << a1; EndMessage(); } catch (...) @@ -419,7 +484,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2; + ssSend << a1 << a2; EndMessage(); } catch (...) @@ -435,7 +500,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3; + ssSend << a1 << a2 << a3; EndMessage(); } catch (...) @@ -451,7 +516,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4; + ssSend << a1 << a2 << a3 << a4; EndMessage(); } catch (...) @@ -467,7 +532,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5; + ssSend << a1 << a2 << a3 << a4 << a5; EndMessage(); } catch (...) @@ -483,7 +548,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6; + ssSend << a1 << a2 << a3 << a4 << a5 << a6; EndMessage(); } catch (...) @@ -499,7 +564,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7; EndMessage(); } catch (...) @@ -515,7 +580,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8; EndMessage(); } catch (...) @@ -531,7 +596,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9; EndMessage(); } catch (...) diff --git a/src/protocol.h b/src/protocol.h index f5c162054..499842507 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -56,7 +56,8 @@ class CMessageHeader CHECKSUM_SIZE=sizeof(int), MESSAGE_SIZE_OFFSET=MESSAGE_START_SIZE+COMMAND_SIZE, - CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE + CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE, + HEADER_SIZE=MESSAGE_START_SIZE+COMMAND_SIZE+MESSAGE_SIZE_SIZE+CHECKSUM_SIZE }; char pchMessageStart[MESSAGE_START_SIZE]; char pchCommand[COMMAND_SIZE]; diff --git a/src/serialize.h b/src/serialize.h index f2626281c..e3d9939bc 100644 --- a/src/serialize.h +++ b/src/serialize.h @@ -789,6 +789,7 @@ struct ser_streamplaceholder +typedef std::vector > CSerializeData; /** Double ended buffer combining vector and stream-like interfaces. * @@ -798,7 +799,7 @@ struct ser_streamplaceholder class CDataStream { protected: - typedef std::vector > vector_type; + typedef CSerializeData vector_type; vector_type vch; unsigned int nReadPos; short state; @@ -1095,6 +1096,11 @@ public: ::Unserialize(*this, obj, nType, nVersion); return (*this); } + + void GetAndClear(CSerializeData &data) { + vch.swap(data); + CSerializeData().swap(vch); + } };