Browse Source

Merge #8126: std::shared_ptr based CTransaction storage in mempool

288d85d Get rid of CTxMempool::lookup() entirely (Pieter Wuille)
c2a4724 Optimization: use usec in expiration and reuse nNow (Pieter Wuille)
e9b4780 Optimization: don't check the mempool at all if no mempool req ever (Pieter Wuille)
dbfb426 Optimize the relay map to use shared_ptr's (Pieter Wuille)
8d39d7a Switch CTransaction storage in mempool to std::shared_ptr (Pieter Wuille)
1b9e6d3 Add support for unique_ptr and shared_ptr to memusage (Pieter Wuille)
0.13
Wladimir J. van der Laan 9 years ago
parent
commit
a7c41f2de0
No known key found for this signature in database
GPG Key ID: 74810B012346C9A6
  1. 67
      src/main.cpp
  2. 24
      src/memusage.h
  3. 18
      src/test/policyestimator_tests.cpp
  4. 86
      src/txmempool.cpp
  5. 29
      src/txmempool.h

67
src/main.cpp

@ -81,9 +81,6 @@ uint64_t nPruneTarget = 0; @@ -81,9 +81,6 @@ uint64_t nPruneTarget = 0;
int64_t nMaxTipAge = DEFAULT_MAX_TIP_AGE;
bool fEnableReplacement = DEFAULT_ENABLE_REPLACEMENT;
std::map<uint256, CTransaction> mapRelay;
std::deque<std::pair<int64_t, uint256> > vRelayExpiration;
CCriticalSection cs_mapRelay;
CFeeRate minRelayTxFee = CFeeRate(DEFAULT_MIN_RELAY_TX_FEE);
CAmount maxTxFee = DEFAULT_TRANSACTION_MAXFEE;
@ -216,6 +213,12 @@ namespace { @@ -216,6 +213,12 @@ namespace {
/** Number of peers from which we're downloading blocks. */
int nPeersWithValidatedDownloads = 0;
/** Relay map, protected by cs_main. */
typedef std::map<uint256, std::shared_ptr<const CTransaction>> MapRelay;
MapRelay mapRelay;
/** Expiration-time ordered list of (expire time, relay map entry) pairs, protected by cs_main). */
std::deque<std::pair<int64_t, MapRelay::iterator>> vRelayExpiration;
} // anon namespace
//////////////////////////////////////////////////////////////////////////////
@ -1443,8 +1446,10 @@ bool GetTransaction(const uint256 &hash, CTransaction &txOut, const Consensus::P @@ -1443,8 +1446,10 @@ bool GetTransaction(const uint256 &hash, CTransaction &txOut, const Consensus::P
LOCK(cs_main);
if (mempool.lookup(hash, txOut))
std::shared_ptr<const CTransaction> ptx = mempool.get(hash);
if (ptx)
{
txOut = *ptx;
return true;
}
@ -4521,30 +4526,24 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam @@ -4521,30 +4526,24 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
}
}
}
else if (inv.IsKnownType())
else if (inv.type == MSG_TX)
{
CTransaction tx;
// Send stream from relay memory
bool push = false;
{
LOCK(cs_mapRelay);
map<uint256, CTransaction>::iterator mi = mapRelay.find(inv.hash);
auto mi = mapRelay.find(inv.hash);
if (mi != mapRelay.end()) {
tx = (*mi).second;
pfrom->PushMessage(NetMsgType::TX, *mi->second);
push = true;
}
}
if (!push && inv.type == MSG_TX) {
int64_t txtime;
} else if (pfrom->timeLastMempoolReq) {
auto txinfo = mempool.info(inv.hash);
// To protect privacy, do not answer getdata using the mempool when
// that TX couldn't have been INVed in reply to a MEMPOOL request.
if (mempool.lookup(inv.hash, tx, txtime) && txtime <= pfrom->timeLastMempoolReq) {
if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq) {
pfrom->PushMessage(NetMsgType::TX, *txinfo.tx);
push = true;
}
}
if (push) {
pfrom->PushMessage(inv.GetCommand(), tx);
} else {
if (!push) {
vNotFound.push_back(inv);
}
}
@ -5923,8 +5922,7 @@ bool SendMessages(CNode* pto) @@ -5923,8 +5922,7 @@ bool SendMessages(CNode* pto)
// Respond to BIP35 mempool requests
if (fSendTrickle && pto->fSendMempool) {
std::vector<uint256> vtxid;
mempool.queryHashes(vtxid);
auto vtxinfo = mempool.infoAll();
pto->fSendMempool = false;
CAmount filterrate = 0;
{
@ -5934,20 +5932,16 @@ bool SendMessages(CNode* pto) @@ -5934,20 +5932,16 @@ bool SendMessages(CNode* pto)
LOCK(pto->cs_filter);
BOOST_FOREACH(const uint256& hash, vtxid) {
for (const auto& txinfo : vtxinfo) {
const uint256& hash = txinfo.tx->GetHash();
CInv inv(MSG_TX, hash);
pto->setInventoryTxToSend.erase(hash);
if (filterrate) {
CFeeRate feeRate;
mempool.lookupFeeRate(hash, feeRate);
if (feeRate.GetFeePerK() < filterrate)
if (txinfo.feeRate.GetFeePerK() < filterrate)
continue;
}
if (pto->pfilter) {
CTransaction tx;
bool fInMemPool = mempool.lookup(hash, tx);
if (!fInMemPool) continue; // another thread removed since queryHashes, maybe...
if (!pto->pfilter->IsRelevantAndUpdate(tx)) continue;
if (!pto->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue;
}
pto->filterInventoryKnown.insert(hash);
vInv.push_back(inv);
@ -5993,31 +5987,28 @@ bool SendMessages(CNode* pto) @@ -5993,31 +5987,28 @@ bool SendMessages(CNode* pto)
continue;
}
// Not in the mempool anymore? don't bother sending it.
CFeeRate feeRate;
if (!mempool.lookupFeeRate(hash, feeRate)) {
auto txinfo = mempool.info(hash);
if (!txinfo.tx) {
continue;
}
if (filterrate && feeRate.GetFeePerK() < filterrate) {
if (filterrate && txinfo.feeRate.GetFeePerK() < filterrate) {
continue;
}
CTransaction tx;
if (!mempool.lookup(hash, tx)) continue;
if (pto->pfilter && !pto->pfilter->IsRelevantAndUpdate(tx)) continue;
if (pto->pfilter && !pto->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue;
// Send
vInv.push_back(CInv(MSG_TX, hash));
nRelayedTransactions++;
{
LOCK(cs_mapRelay);
// Expire old relay messages
while (!vRelayExpiration.empty() && vRelayExpiration.front().first < GetTime())
while (!vRelayExpiration.empty() && vRelayExpiration.front().first < nNow)
{
mapRelay.erase(vRelayExpiration.front().second);
vRelayExpiration.pop_front();
}
auto ret = mapRelay.insert(std::make_pair(hash, tx));
auto ret = mapRelay.insert(std::make_pair(hash, std::move(txinfo.tx)));
if (ret.second) {
vRelayExpiration.push_back(std::make_pair(GetTime() + 15 * 60, hash));
vRelayExpiration.push_back(std::make_pair(nNow + 15 * 60 * 1000000, ret.first));
}
}
if (vInv.size() == MAX_INV_SZ) {

24
src/memusage.h

@ -72,6 +72,15 @@ private: @@ -72,6 +72,15 @@ private:
X x;
};
struct stl_shared_counter
{
/* Various platforms use different sized counters here.
* Conservatively assume that they won't be larger than size_t. */
void* class_type;
size_t use_count;
size_t weak_count;
};
template<typename X>
static inline size_t DynamicUsage(const std::vector<X>& v)
{
@ -122,6 +131,21 @@ static inline size_t IncrementalDynamicUsage(const indirectmap<X, Y>& m) @@ -122,6 +131,21 @@ static inline size_t IncrementalDynamicUsage(const indirectmap<X, Y>& m)
return MallocUsage(sizeof(stl_tree_node<std::pair<const X*, Y> >));
}
template<typename X>
static inline size_t DynamicUsage(const std::unique_ptr<X>& p)
{
return p ? MallocUsage(sizeof(X)) : 0;
}
template<typename X>
static inline size_t DynamicUsage(const std::shared_ptr<X>& p)
{
// A shared_ptr can either use a single continuous memory block for both
// the counter and the storage (when using std::make_shared), or separate.
// We can't observe the difference, however, so assume the worst.
return p ? MallocUsage(sizeof(X)) + MallocUsage(sizeof(stl_shared_counter)) : 0;
}
// Boost data structures
template<typename X>

18
src/test/policyestimator_tests.cpp

@ -74,9 +74,9 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) @@ -74,9 +74,9 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
// 9/10 blocks add 2nd highest and so on until ...
// 1/10 blocks add lowest fee/pri transactions
while (txHashes[9-h].size()) {
CTransaction btx;
if (mpool.lookup(txHashes[9-h].back(), btx))
block.push_back(btx);
std::shared_ptr<const CTransaction> ptx = mpool.get(txHashes[9-h].back());
if (ptx)
block.push_back(*ptx);
txHashes[9-h].pop_back();
}
}
@ -160,9 +160,9 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) @@ -160,9 +160,9 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
// Estimates should still not be below original
for (int j = 0; j < 10; j++) {
while(txHashes[j].size()) {
CTransaction btx;
if (mpool.lookup(txHashes[j].back(), btx))
block.push_back(btx);
std::shared_ptr<const CTransaction> ptx = mpool.get(txHashes[j].back());
if (ptx)
block.push_back(*ptx);
txHashes[j].pop_back();
}
}
@ -181,9 +181,9 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) @@ -181,9 +181,9 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
tx.vin[0].prevout.n = 10000*blocknum+100*j+k;
uint256 hash = tx.GetHash();
mpool.addUnchecked(hash, entry.Fee(feeV[k/4][j]).Time(GetTime()).Priority(priV[k/4][j]).Height(blocknum).FromTx(tx, &mpool));
CTransaction btx;
if (mpool.lookup(hash, btx))
block.push_back(btx);
std::shared_ptr<const CTransaction> ptx = mpool.get(hash);
if (ptx)
block.push_back(*ptx);
}
}
mpool.removeForBlock(block, ++blocknum, dummyConflicted);

86
src/txmempool.cpp

@ -23,18 +23,18 @@ CTxMemPoolEntry::CTxMemPoolEntry(const CTransaction& _tx, const CAmount& _nFee, @@ -23,18 +23,18 @@ CTxMemPoolEntry::CTxMemPoolEntry(const CTransaction& _tx, const CAmount& _nFee,
int64_t _nTime, double _entryPriority, unsigned int _entryHeight,
bool poolHasNoInputsOf, CAmount _inChainInputValue,
bool _spendsCoinbase, unsigned int _sigOps, LockPoints lp):
tx(_tx), nFee(_nFee), nTime(_nTime), entryPriority(_entryPriority), entryHeight(_entryHeight),
tx(std::make_shared<CTransaction>(_tx)), nFee(_nFee), nTime(_nTime), entryPriority(_entryPriority), entryHeight(_entryHeight),
hadNoDependencies(poolHasNoInputsOf), inChainInputValue(_inChainInputValue),
spendsCoinbase(_spendsCoinbase), sigOpCount(_sigOps), lockPoints(lp)
{
nTxSize = ::GetSerializeSize(tx, SER_NETWORK, PROTOCOL_VERSION);
nModSize = tx.CalculateModifiedSize(nTxSize);
nUsageSize = RecursiveDynamicUsage(tx);
nTxSize = ::GetSerializeSize(_tx, SER_NETWORK, PROTOCOL_VERSION);
nModSize = _tx.CalculateModifiedSize(nTxSize);
nUsageSize = RecursiveDynamicUsage(*tx) + memusage::DynamicUsage(tx);
nCountWithDescendants = 1;
nSizeWithDescendants = nTxSize;
nModFeesWithDescendants = nFee;
CAmount nValueIn = tx.GetValueOut()+nFee;
CAmount nValueIn = _tx.GetValueOut()+nFee;
assert(inChainInputValue <= nValueIn);
feeDelta = 0;
@ -768,50 +768,76 @@ bool CTxMemPool::CompareDepthAndScore(const uint256& hasha, const uint256& hashb @@ -768,50 +768,76 @@ bool CTxMemPool::CompareDepthAndScore(const uint256& hasha, const uint256& hashb
namespace {
class DepthAndScoreComparator
{
CTxMemPool *mp;
public:
DepthAndScoreComparator(CTxMemPool *mempool) : mp(mempool) {}
bool operator()(const uint256& a, const uint256& b) { return mp->CompareDepthAndScore(a, b); }
bool operator()(const CTxMemPool::indexed_transaction_set::const_iterator& a, const CTxMemPool::indexed_transaction_set::const_iterator& b)
{
uint64_t counta = a->GetCountWithAncestors();
uint64_t countb = b->GetCountWithAncestors();
if (counta == countb) {
return CompareTxMemPoolEntryByScore()(*a, *b);
}
return counta < countb;
}
};
}
void CTxMemPool::queryHashes(vector<uint256>& vtxid)
std::vector<CTxMemPool::indexed_transaction_set::const_iterator> CTxMemPool::GetSortedDepthAndScore() const
{
vtxid.clear();
std::vector<indexed_transaction_set::const_iterator> iters;
AssertLockHeld(cs);
iters.reserve(mapTx.size());
for (indexed_transaction_set::iterator mi = mapTx.begin(); mi != mapTx.end(); ++mi) {
iters.push_back(mi);
}
std::sort(iters.begin(), iters.end(), DepthAndScoreComparator());
return iters;
}
void CTxMemPool::queryHashes(vector<uint256>& vtxid)
{
LOCK(cs);
auto iters = GetSortedDepthAndScore();
vtxid.clear();
vtxid.reserve(mapTx.size());
for (indexed_transaction_set::iterator mi = mapTx.begin(); mi != mapTx.end(); ++mi)
vtxid.push_back(mi->GetTx().GetHash());
std::sort(vtxid.begin(), vtxid.end(), DepthAndScoreComparator(this));
for (auto it : iters) {
vtxid.push_back(it->GetTx().GetHash());
}
}
bool CTxMemPool::lookup(uint256 hash, CTransaction& result, int64_t& time) const
std::vector<TxMempoolInfo> CTxMemPool::infoAll() const
{
LOCK(cs);
indexed_transaction_set::const_iterator i = mapTx.find(hash);
if (i == mapTx.end()) return false;
result = i->GetTx();
time = i->GetTime();
return true;
auto iters = GetSortedDepthAndScore();
std::vector<TxMempoolInfo> ret;
ret.reserve(mapTx.size());
for (auto it : iters) {
ret.push_back(TxMempoolInfo{it->GetSharedTx(), it->GetTime(), CFeeRate(it->GetFee(), it->GetTxSize())});
}
return ret;
}
bool CTxMemPool::lookup(uint256 hash, CTransaction& result) const
std::shared_ptr<const CTransaction> CTxMemPool::get(const uint256& hash) const
{
int64_t time;
return CTxMemPool::lookup(hash, result, time);
LOCK(cs);
indexed_transaction_set::const_iterator i = mapTx.find(hash);
if (i == mapTx.end())
return nullptr;
return i->GetSharedTx();
}
bool CTxMemPool::lookupFeeRate(const uint256& hash, CFeeRate& feeRate) const
TxMempoolInfo CTxMemPool::info(const uint256& hash) const
{
LOCK(cs);
indexed_transaction_set::const_iterator i = mapTx.find(hash);
if (i == mapTx.end())
return false;
feeRate = CFeeRate(i->GetFee(), i->GetTxSize());
return true;
return TxMempoolInfo();
return TxMempoolInfo{i->GetSharedTx(), i->GetTime(), CFeeRate(i->GetFee(), i->GetTxSize())};
}
CFeeRate CTxMemPool::estimateFee(int nBlocks) const
@ -924,9 +950,9 @@ bool CCoinsViewMemPool::GetCoins(const uint256 &txid, CCoins &coins) const { @@ -924,9 +950,9 @@ bool CCoinsViewMemPool::GetCoins(const uint256 &txid, CCoins &coins) const {
// If an entry in the mempool exists, always return that one, as it's guaranteed to never
// conflict with the underlying cache, and it cannot have pruned entries (as it contains full)
// transactions. First checking the underlying cache risks returning a pruned entry instead.
CTransaction tx;
if (mempool.lookup(txid, tx)) {
coins = CCoins(tx, MEMPOOL_HEIGHT);
shared_ptr<const CTransaction> ptx = mempool.get(txid);
if (ptx) {
coins = CCoins(*ptx, MEMPOOL_HEIGHT);
return true;
}
return (base->GetCoins(txid, coins) && !coins.IsPruned());

29
src/txmempool.h

@ -7,6 +7,7 @@ @@ -7,6 +7,7 @@
#define BITCOIN_TXMEMPOOL_H
#include <list>
#include <memory>
#include <set>
#include "amount.h"
@ -75,7 +76,7 @@ class CTxMemPool; @@ -75,7 +76,7 @@ class CTxMemPool;
class CTxMemPoolEntry
{
private:
CTransaction tx;
std::shared_ptr<const CTransaction> tx;
CAmount nFee; //!< Cached to avoid expensive parent-transaction lookups
size_t nTxSize; //!< ... and avoid recomputing tx size
size_t nModSize; //!< ... and modified size for priority
@ -112,7 +113,8 @@ public: @@ -112,7 +113,8 @@ public:
unsigned int nSigOps, LockPoints lp);
CTxMemPoolEntry(const CTxMemPoolEntry& other);
const CTransaction& GetTx() const { return this->tx; }
const CTransaction& GetTx() const { return *this->tx; }
std::shared_ptr<const CTransaction> GetSharedTx() const { return this->tx; }
/**
* Fast calculation of lower bound of current priority as update
* from entry priority. Only inputs that were originally in-chain will age.
@ -307,6 +309,21 @@ struct ancestor_score {}; @@ -307,6 +309,21 @@ struct ancestor_score {};
class CBlockPolicyEstimator;
/**
* Information about a mempool transaction.
*/
struct TxMempoolInfo
{
/** The transaction itself */
std::shared_ptr<const CTransaction> tx;
/** Time the transaction entered the mempool. */
int64_t nTime;
/** Feerate of the transaction. */
CFeeRate feeRate;
};
/**
* CTxMemPool stores valid-according-to-the-current-best-chain
* transactions that may be included in the next block.
@ -464,6 +481,8 @@ private: @@ -464,6 +481,8 @@ private:
void UpdateParent(txiter entry, txiter parent, bool add);
void UpdateChild(txiter entry, txiter child, bool add);
std::vector<indexed_transaction_set::const_iterator> GetSortedDepthAndScore() const;
public:
indirectmap<COutPoint, const CTransaction*> mapNextTx;
std::map<uint256, std::pair<double, CAmount> > mapDeltas;
@ -588,9 +607,9 @@ public: @@ -588,9 +607,9 @@ public:
return (mapTx.count(hash) != 0);
}
bool lookup(uint256 hash, CTransaction& result) const;
bool lookup(uint256 hash, CTransaction& result, int64_t& time) const;
bool lookupFeeRate(const uint256& hash, CFeeRate& feeRate) const;
std::shared_ptr<const CTransaction> get(const uint256& hash) const;
TxMempoolInfo info(const uint256& hash) const;
std::vector<TxMempoolInfo> infoAll() const;
/** Estimate fee rate needed to get into the next nBlocks
* If no answer can be given at nBlocks, return an estimate

Loading…
Cancel
Save