diff --git a/src/main.cpp b/src/main.cpp index 277724565..d2b7c6bc4 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -4556,12 +4556,16 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, vRecv >> LIMITED_STRING(pfrom->strSubVer, MAX_SUBVERSION_LENGTH); pfrom->cleanSubVer = SanitizeString(pfrom->strSubVer); } - if (!vRecv.empty()) + if (!vRecv.empty()) { vRecv >> pfrom->nStartingHeight; - if (!vRecv.empty()) - vRecv >> pfrom->fRelayTxes; // set to true after we get the first filter* message - else - pfrom->fRelayTxes = true; + } + { + LOCK(pfrom->cs_filter); + if (!vRecv.empty()) + vRecv >> pfrom->fRelayTxes; // set to true after we get the first filter* message + else + pfrom->fRelayTxes = true; + } // Disconnect if we connected to ourself if (nNonce == nLocalHostNonce && nNonce > 1) @@ -5234,34 +5238,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, pfrom->fDisconnect = true; return true; } - LOCK2(cs_main, pfrom->cs_filter); - std::vector vtxid; - mempool.queryHashes(vtxid); - vector vInv; - BOOST_FOREACH(uint256& hash, vtxid) { - CInv inv(MSG_TX, hash); - if (pfrom->pfilter) { - CTransaction tx; - bool fInMemPool = mempool.lookup(hash, tx); - if (!fInMemPool) continue; // another thread removed since queryHashes, maybe... - if (!pfrom->pfilter->IsRelevantAndUpdate(tx)) continue; - } - if (pfrom->minFeeFilter) { - CFeeRate feeRate; - mempool.lookupFeeRate(hash, feeRate); - LOCK(pfrom->cs_feeFilter); - if (feeRate.GetFeePerK() < pfrom->minFeeFilter) - continue; - } - vInv.push_back(inv); - if (vInv.size() == MAX_INV_SZ) { - pfrom->PushMessage(NetMsgType::INV, vInv); - vInv.clear(); - } - } - if (vInv.size() > 0) - pfrom->PushMessage(NetMsgType::INV, vInv); + LOCK(pfrom->cs_inventory); + pfrom->fSendMempool = true; } @@ -5349,12 +5328,13 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, CBloomFilter filter; vRecv >> filter; + LOCK(pfrom->cs_filter); + if (!filter.IsWithinSizeConstraints()) // There is no excuse for sending a too-large filter Misbehaving(pfrom->GetId(), 100); else { - LOCK(pfrom->cs_filter); delete pfrom->pfilter; pfrom->pfilter = new CBloomFilter(filter); pfrom->pfilter->UpdateEmptyFull(); @@ -5559,6 +5539,22 @@ bool ProcessMessages(CNode* pfrom) return fOk; } +class CompareInvMempoolOrder +{ + CTxMemPool *mp; +public: + CompareInvMempoolOrder(CTxMemPool *mempool) + { + mp = mempool; + } + + bool operator()(std::set::iterator a, std::set::iterator b) + { + /* As std::make_heap produces a max-heap, we want the entries with the + * fewest ancestors/highest fee to sort later. */ + return mp->CompareDepthAndScore(*b, *a); + } +}; bool SendMessages(CNode* pto) { @@ -5798,49 +5794,127 @@ bool SendMessages(CNode* pto) // Message: inventory // vector vInv; - vector vInvWait; { + LOCK(pto->cs_inventory); + vInv.reserve(std::max(pto->vInventoryBlockToSend.size(), INVENTORY_BROADCAST_MAX)); + + // Add blocks + BOOST_FOREACH(const uint256& hash, pto->vInventoryBlockToSend) { + vInv.push_back(CInv(MSG_BLOCK, hash)); + if (vInv.size() == MAX_INV_SZ) { + pto->PushMessage(NetMsgType::INV, vInv); + vInv.clear(); + } + } + pto->vInventoryBlockToSend.clear(); + + // Check whether periodic sends should happen bool fSendTrickle = pto->fWhitelisted; if (pto->nNextInvSend < nNow) { fSendTrickle = true; - pto->nNextInvSend = PoissonNextSend(nNow, AVG_INVENTORY_BROADCAST_INTERVAL); + // Use half the delay for outbound peers, as there is less privacy concern for them. + pto->nNextInvSend = PoissonNextSend(nNow, INVENTORY_BROADCAST_INTERVAL >> !pto->fInbound); + } + + // Time to send but the peer has requested we not relay transactions. + if (fSendTrickle) { + LOCK(pto->cs_filter); + if (!pto->fRelayTxes) pto->setInventoryTxToSend.clear(); } - LOCK(pto->cs_inventory); - vInv.reserve(std::min(1000, pto->vInventoryToSend.size())); - vInvWait.reserve(pto->vInventoryToSend.size()); - BOOST_FOREACH(const CInv& inv, pto->vInventoryToSend) - { - if (inv.type == MSG_TX && pto->filterInventoryKnown.contains(inv.hash)) - continue; - // trickle out tx inv to protect privacy - if (inv.type == MSG_TX && !fSendTrickle) + // Respond to BIP35 mempool requests + if (fSendTrickle && pto->fSendMempool) { + std::vector vtxid; + mempool.queryHashes(vtxid); + pto->fSendMempool = false; + CAmount filterrate = 0; { - // 1/4 of tx invs blast to all immediately - static uint256 hashSalt; - if (hashSalt.IsNull()) - hashSalt = GetRandHash(); - uint256 hashRand = ArithToUint256(UintToArith256(inv.hash) ^ UintToArith256(hashSalt)); - hashRand = Hash(BEGIN(hashRand), END(hashRand)); - bool fTrickleWait = ((UintToArith256(hashRand) & 3) != 0); + LOCK(pto->cs_feeFilter); + filterrate = pto->minFeeFilter; + } - if (fTrickleWait) - { - vInvWait.push_back(inv); - continue; + LOCK(pto->cs_filter); + + BOOST_FOREACH(const uint256& hash, vtxid) { + CInv inv(MSG_TX, hash); + pto->setInventoryTxToSend.erase(hash); + if (filterrate) { + CFeeRate feeRate; + mempool.lookupFeeRate(hash, feeRate); + if (feeRate.GetFeePerK() < filterrate) + continue; + } + if (pto->pfilter) { + CTransaction tx; + bool fInMemPool = mempool.lookup(hash, tx); + if (!fInMemPool) continue; // another thread removed since queryHashes, maybe... + if (!pto->pfilter->IsRelevantAndUpdate(tx)) continue; + } + pto->filterInventoryKnown.insert(hash); + vInv.push_back(inv); + if (vInv.size() == MAX_INV_SZ) { + pto->PushMessage(NetMsgType::INV, vInv); + vInv.clear(); } } + } - pto->filterInventoryKnown.insert(inv.hash); - - vInv.push_back(inv); - if (vInv.size() >= 1000) + // Determine transactions to relay + if (fSendTrickle) { + // Produce a vector with all candidates for sending + vector::iterator> vInvTx; + vInvTx.reserve(pto->setInventoryTxToSend.size()); + for (std::set::iterator it = pto->setInventoryTxToSend.begin(); it != pto->setInventoryTxToSend.end(); it++) { + vInvTx.push_back(it); + } + CAmount filterrate = 0; { - pto->PushMessage(NetMsgType::INV, vInv); - vInv.clear(); + LOCK(pto->cs_feeFilter); + filterrate = pto->minFeeFilter; + } + // Topologically and fee-rate sort the inventory we send for privacy and priority reasons. + // A heap is used so that not all items need sorting if only a few are being sent. + CompareInvMempoolOrder compareInvMempoolOrder(&mempool); + std::make_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder); + // No reason to drain out at many times the network's capacity, + // especially since we have many peers and some will draw much shorter delays. + unsigned int nRelayedTransactions = 0; + LOCK(pto->cs_filter); + while (!vInvTx.empty() && nRelayedTransactions < INVENTORY_BROADCAST_MAX) { + // Fetch the top element from the heap + std::pop_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder); + std::set::iterator it = vInvTx.back(); + vInvTx.pop_back(); + uint256 hash = *it; + // Remove it from the to-be-sent set + pto->setInventoryTxToSend.erase(it); + // Check if not in the filter already + if (pto->filterInventoryKnown.contains(hash)) { + continue; + } + // Not in the mempool anymore? don't bother sending it. + CFeeRate feeRate; + if (!mempool.lookupFeeRate(hash, feeRate)) { + continue; + } + if (filterrate && feeRate.GetFeePerK() < filterrate) { + continue; + } + if (pto->pfilter) { + CTransaction tx; + if (!mempool.lookup(hash, tx)) continue; + if (!pto->pfilter->IsRelevantAndUpdate(tx)) continue; + } + // Send + vInv.push_back(CInv(MSG_TX, hash)); + nRelayedTransactions++; + if (vInv.size() == MAX_INV_SZ) { + pto->PushMessage(NetMsgType::INV, vInv); + vInv.clear(); + } + pto->filterInventoryKnown.insert(hash); } } - pto->vInventoryToSend = vInvWait; } if (!vInv.empty()) pto->PushMessage(NetMsgType::INV, vInv); diff --git a/src/main.h b/src/main.h index 2c9635bcf..71d44979a 100644 --- a/src/main.h +++ b/src/main.h @@ -99,9 +99,12 @@ static const unsigned int MAX_REJECT_MESSAGE_LENGTH = 111; static const unsigned int AVG_LOCAL_ADDRESS_BROADCAST_INTERVAL = 24 * 24 * 60; /** Average delay between peer address broadcasts in seconds. */ static const unsigned int AVG_ADDRESS_BROADCAST_INTERVAL = 30; -/** Average delay between trickled inventory broadcasts in seconds. - * Blocks, whitelisted receivers, and a random 25% of transactions bypass this. */ -static const unsigned int AVG_INVENTORY_BROADCAST_INTERVAL = 5; +/** Average delay between trickled inventory transmissions in seconds. + * Blocks and whitelisted receivers bypass this, outbound peers get half this delay. */ +static const unsigned int INVENTORY_BROADCAST_INTERVAL = 5; +/** Maximum number of inventory items to send per transmission. + * Limits the impact of low-fee transaction floods. */ +static const unsigned int INVENTORY_BROADCAST_MAX = 7 * INVENTORY_BROADCAST_INTERVAL; /** Average delay between feefilter broadcasts in seconds. */ static const unsigned int AVG_FEEFILTER_BROADCAST_INTERVAL = 10 * 60; /** Maximum feefilter broadcast delay after significant change. */ diff --git a/src/net.cpp b/src/net.cpp index 1afe087be..6642ef651 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2088,20 +2088,7 @@ void RelayTransaction(const CTransaction& tx, CFeeRate feerate) LOCK(cs_vNodes); BOOST_FOREACH(CNode* pnode, vNodes) { - if(!pnode->fRelayTxes) - continue; - { - LOCK(pnode->cs_feeFilter); - if (feerate.GetFeePerK() < pnode->minFeeFilter) - continue; - } - LOCK(pnode->cs_filter); - if (pnode->pfilter) - { - if (pnode->pfilter->IsRelevantAndUpdate(tx)) - pnode->PushInventory(inv); - } else - pnode->PushInventory(inv); + pnode->PushInventory(inv); } } @@ -2387,6 +2374,7 @@ CNode::CNode(SOCKET hSocketIn, const CAddress& addrIn, const std::string& addrNa hashContinue = uint256(); nStartingHeight = -1; filterInventoryKnown.reset(); + fSendMempool = false; fGetAddr = false; nNextLocalAddrSend = 0; nNextAddrSend = 0; diff --git a/src/net.h b/src/net.h index bf367684f..b6ec7bf3e 100644 --- a/src/net.h +++ b/src/net.h @@ -357,7 +357,7 @@ public: // a) it allows us to not relay tx invs before receiving the peer's version message // b) the peer may tell us in its version message that we should not relay tx invs // unless it loads a bloom filter. - bool fRelayTxes; + bool fRelayTxes; //protected by cs_filter bool fSentAddr; CSemaphoreGrant grantOutbound; CCriticalSection cs_filter; @@ -397,7 +397,13 @@ public: // inventory based relay CRollingBloomFilter filterInventoryKnown; - std::vector vInventoryToSend; + // Set of transaction ids we still have to announce. + // They are sorted by the mempool before relay, so the order is not important. + std::set setInventoryTxToSend; + // List of block ids we still have announce. + // There is no final sorting before sending, as they are always sent immediately + // and in the order requested. + std::vector vInventoryBlockToSend; CCriticalSection cs_inventory; std::set setAskFor; std::multimap mapAskFor; @@ -405,6 +411,8 @@ public: // Used for headers announcements - unfiltered blocks to relay // Also protected by cs_inventory std::vector vBlockHashesToAnnounce; + // Used for BIP35 mempool sending, also protected by cs_inventory + bool fSendMempool; // Ping time measurement: // The pong reply we're expecting, or 0 if no pong expected. @@ -517,11 +525,13 @@ public: void PushInventory(const CInv& inv) { - { - LOCK(cs_inventory); - if (inv.type == MSG_TX && filterInventoryKnown.contains(inv.hash)) - return; - vInventoryToSend.push_back(inv); + LOCK(cs_inventory); + if (inv.type == MSG_TX) { + if (!filterInventoryKnown.contains(inv.hash)) { + setInventoryTxToSend.insert(inv.hash); + } + } else if (inv.type == MSG_BLOCK) { + vInventoryBlockToSend.push_back(inv.hash); } } diff --git a/src/txmempool.cpp b/src/txmempool.cpp index 420f6896b..a6070f526 100644 --- a/src/txmempool.cpp +++ b/src/txmempool.cpp @@ -752,6 +752,31 @@ void CTxMemPool::check(const CCoinsViewCache *pcoins) const assert(innerUsage == cachedInnerUsage); } +bool CTxMemPool::CompareDepthAndScore(const uint256& hasha, const uint256& hashb) +{ + LOCK(cs); + indexed_transaction_set::const_iterator i = mapTx.find(hasha); + if (i == mapTx.end()) return false; + indexed_transaction_set::const_iterator j = mapTx.find(hashb); + if (j == mapTx.end()) return true; + uint64_t counta = i->GetCountWithAncestors(); + uint64_t countb = j->GetCountWithAncestors(); + if (counta == countb) { + return CompareTxMemPoolEntryByScore()(*i, *j); + } + return counta < countb; +} + +namespace { +class DepthAndScoreComparator +{ + CTxMemPool *mp; +public: + DepthAndScoreComparator(CTxMemPool *mempool) : mp(mempool) {} + bool operator()(const uint256& a, const uint256& b) { return mp->CompareDepthAndScore(a, b); } +}; +} + void CTxMemPool::queryHashes(vector& vtxid) { vtxid.clear(); @@ -760,6 +785,8 @@ void CTxMemPool::queryHashes(vector& vtxid) vtxid.reserve(mapTx.size()); for (indexed_transaction_set::iterator mi = mapTx.begin(); mi != mapTx.end(); ++mi) vtxid.push_back(mi->GetTx().GetHash()); + + std::sort(vtxid.begin(), vtxid.end(), DepthAndScoreComparator(this)); } bool CTxMemPool::lookup(uint256 hash, CTransaction& result) const diff --git a/src/txmempool.h b/src/txmempool.h index d17e3322d..bca8dd979 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -511,6 +511,7 @@ public: std::list& conflicts, bool fCurrentEstimate = true); void clear(); void _clear(); //lock free + bool CompareDepthAndScore(const uint256& hasha, const uint256& hashb); void queryHashes(std::vector& vtxid); void pruneSpent(const uint256& hash, CCoins &coins); unsigned int GetTransactionsUpdated() const;