From c7f039b674b43b741f20bf7521eb8a68426f4275 Mon Sep 17 00:00:00 2001 From: Pieter Wuille Date: Fri, 29 Mar 2013 23:49:38 +0100 Subject: [PATCH] Process getdata invs separately until send buffer overflows There exists a per-message-processed send buffer overflow protection, where processing is halted when the send buffer is larger than the allowed maximum. This protection does not apply to individual items, however, and getdata has the potential for causing large amounts of data to be sent. In case several hundreds of blocks are requested in one getdata, the send buffer can easily grow 50 megabytes above the send buffer limit. This commit breaks up the processing of getdata requests, remembering them inside a CNode when too many are requested at once. --- src/main.cpp | 210 ++++++++++++++++++++++++++++----------------------- src/net.h | 1 + 2 files changed, 117 insertions(+), 94 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 77061fab..b29091b4 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3029,6 +3029,115 @@ bool static AlreadyHave(const CInv& inv) unsigned char pchMessageStart[4] = { 0xf9, 0xbe, 0xb4, 0xd9 }; +void static ProcessGetData(CNode* pfrom) +{ + std::deque::iterator it = pfrom->vRecvGetData.begin(); + + vector vNotFound; + + while (it != pfrom->vRecvGetData.end()) { + // Don't bother if send buffer is too full to respond anyway + if (pfrom->nSendSize >= SendBufferSize()) + break; + + const CInv &inv = *it; + { + if (fShutdown) + break; + it++; + + if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK) + { + // Send block from disk + map::iterator mi = mapBlockIndex.find(inv.hash); + if (mi != mapBlockIndex.end()) + { + CBlock block; + block.ReadFromDisk((*mi).second); + if (inv.type == MSG_BLOCK) + pfrom->PushMessage("block", block); + else // MSG_FILTERED_BLOCK) + { + LOCK(pfrom->cs_filter); + if (pfrom->pfilter) + { + CMerkleBlock merkleBlock(block, *pfrom->pfilter); + pfrom->PushMessage("merkleblock", merkleBlock); + // CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see + // This avoids hurting performance by pointlessly requiring a round-trip + // Note that there is currently no way for a node to request any single transactions we didnt send here - + // they must either disconnect and retry or request the full block. + // Thus, the protocol spec specified allows for us to provide duplicate txn here, + // however we MUST always provide at least what the remote peer needs + typedef std::pair PairType; + BOOST_FOREACH(PairType& pair, merkleBlock.vMatchedTxn) + if (!pfrom->setInventoryKnown.count(CInv(MSG_TX, pair.second))) + pfrom->PushMessage("tx", block.vtx[pair.first]); + } + // else + // no response + } + + // Trigger them to send a getblocks request for the next batch of inventory + if (inv.hash == pfrom->hashContinue) + { + // Bypass PushInventory, this must send even if redundant, + // and we want it right after the last block so they don't + // wait for other stuff first. + vector vInv; + vInv.push_back(CInv(MSG_BLOCK, hashBestChain)); + pfrom->PushMessage("inv", vInv); + pfrom->hashContinue = 0; + } + } + } + else if (inv.IsKnownType()) + { + // Send stream from relay memory + bool pushed = false; + { + LOCK(cs_mapRelay); + map::iterator mi = mapRelay.find(inv); + if (mi != mapRelay.end()) { + pfrom->PushMessage(inv.GetCommand(), (*mi).second); + pushed = true; + } + } + if (!pushed && inv.type == MSG_TX) { + LOCK(mempool.cs); + if (mempool.exists(inv.hash)) { + CTransaction tx = mempool.lookup(inv.hash); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); + ss.reserve(1000); + ss << tx; + pfrom->PushMessage("tx", ss); + pushed = true; + } + } + if (!pushed) { + vNotFound.push_back(inv); + } + } + + // Track requests for our stuff. + Inventory(inv.hash); + } + } + + pfrom->vRecvGetData.erase(pfrom->vRecvGetData.begin(), it); + + if (!vNotFound.empty()) { + // Let the peer know that we didn't find what it asked for, so it doesn't + // have to wait around forever. Currently only SPV clients actually care + // about this message: it's needed when they are recursively walking the + // dependencies of relevant unconfirmed transactions. SPV clients want to + // do that because they want to know about (and store and rebroadcast and + // risk analyze) the dependencies of transactions relevant to them, without + // having to download the entire memory pool. + pfrom->PushMessage("notfound", vNotFound); + } +} + bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) { RandAddSeedPerfmon(); @@ -3302,101 +3411,11 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) if (fDebugNet || (vInv.size() != 1)) printf("received getdata (%"PRIszu" invsz)\n", vInv.size()); - vector vNotFound; - BOOST_FOREACH(const CInv& inv, vInv) - { - if (fShutdown) - return true; - if (fDebugNet || (vInv.size() == 1)) - printf("received getdata for: %s\n", inv.ToString().c_str()); - - if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK) - { - // Send block from disk - map::iterator mi = mapBlockIndex.find(inv.hash); - if (mi != mapBlockIndex.end()) - { - CBlock block; - block.ReadFromDisk((*mi).second); - if (inv.type == MSG_BLOCK) - pfrom->PushMessage("block", block); - else // MSG_FILTERED_BLOCK) - { - LOCK(pfrom->cs_filter); - if (pfrom->pfilter) - { - CMerkleBlock merkleBlock(block, *pfrom->pfilter); - pfrom->PushMessage("merkleblock", merkleBlock); - // CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see - // This avoids hurting performance by pointlessly requiring a round-trip - // Note that there is currently no way for a node to request any single transactions we didnt send here - - // they must either disconnect and retry or request the full block. - // Thus, the protocol spec specified allows for us to provide duplicate txn here, - // however we MUST always provide at least what the remote peer needs - typedef std::pair PairType; - BOOST_FOREACH(PairType& pair, merkleBlock.vMatchedTxn) - if (!pfrom->setInventoryKnown.count(CInv(MSG_TX, pair.second))) - pfrom->PushMessage("tx", block.vtx[pair.first]); - } - // else - // no response - } + if ((fDebugNet && vInv.size() > 0) || (vInv.size() == 1)) + printf("received getdata for: %s\n", vInv[0].ToString().c_str()); - // Trigger them to send a getblocks request for the next batch of inventory - if (inv.hash == pfrom->hashContinue) - { - // Bypass PushInventory, this must send even if redundant, - // and we want it right after the last block so they don't - // wait for other stuff first. - vector vInv; - vInv.push_back(CInv(MSG_BLOCK, hashBestChain)); - pfrom->PushMessage("inv", vInv); - pfrom->hashContinue = 0; - } - } - } - else if (inv.IsKnownType()) - { - // Send stream from relay memory - bool pushed = false; - { - LOCK(cs_mapRelay); - map::iterator mi = mapRelay.find(inv); - if (mi != mapRelay.end()) { - pfrom->PushMessage(inv.GetCommand(), (*mi).second); - pushed = true; - } - } - if (!pushed && inv.type == MSG_TX) { - LOCK(mempool.cs); - if (mempool.exists(inv.hash)) { - CTransaction tx = mempool.lookup(inv.hash); - CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); - ss.reserve(1000); - ss << tx; - pfrom->PushMessage("tx", ss); - pushed = true; - } - } - if (!pushed) { - vNotFound.push_back(inv); - } - } - - // Track requests for our stuff. - Inventory(inv.hash); - - if (!vNotFound.empty()) { - // Let the peer know that we didn't find what it asked for, so it doesn't - // have to wait around forever. Currently only SPV clients actually care - // about this message: it's needed when they are recursively walking the - // dependencies of relevant unconfirmed transactions. SPV clients want to - // do that because they want to know about (and store and rebroadcast and - // risk analyze) the dependencies of transactions relevant to them, without - // having to download the entire memory pool. - pfrom->PushMessage("notfound", vNotFound); - } - } + pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end()); + ProcessGetData(pfrom); } @@ -3721,6 +3740,9 @@ bool ProcessMessages(CNode* pfrom) // bool fOk = true; + if (!pfrom->vRecvGetData.empty()) + ProcessGetData(pfrom); + std::deque::iterator it = pfrom->vRecvMsg.begin(); while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway diff --git a/src/net.h b/src/net.h index 9805e39f..368e4cd4 100644 --- a/src/net.h +++ b/src/net.h @@ -179,6 +179,7 @@ public: std::deque vSendMsg; CCriticalSection cs_vSend; + std::deque vRecvGetData; std::deque vRecvMsg; CCriticalSection cs_vRecvMsg; int nRecvVersion;