Browse Source

Merge #10148: Use non-atomic flushing with block replay

176c021 [qa] Test non-atomic chainstate writes (Suhas Daftuar)
d6af06d Dont create pcoinsTip until after ReplayBlocks. (Matt Corallo)
eaca1b7 Random db flush crash simulator (Pieter Wuille)
0580ee0 Adapt memory usage estimation for flushing (Pieter Wuille)
013a56a Non-atomic flushing using the blockchain as replay journal (Pieter Wuille)
b3a279c [MOVEONLY] Move LastCommonAncestor to chain (Pieter Wuille)

Tree-SHA512: 47ccc62303f9075c44d2a914be75bd6969ff881a857a2ff1227f05ec7def6f4c71c46680c5a28cb150c814999526797dc05cf2701fde1369c06169f46eccddee
0.15
Wladimir J. van der Laan 8 years ago
parent
commit
d4e551adfe
No known key found for this signature in database
GPG Key ID: 1E4AED62986CD25D
  1. 2
      contrib/devtools/check-doc.py
  2. 19
      src/chain.cpp
  3. 3
      src/chain.h
  4. 9
      src/coins.cpp
  5. 13
      src/coins.h
  6. 11
      src/init.cpp
  7. 19
      src/net_processing.cpp
  8. 49
      src/txdb.cpp
  9. 7
      src/txdb.h
  10. 117
      src/validation.cpp
  11. 5
      src/validation.h
  12. 268
      test/functional/dbcrash.py
  13. 2
      test/functional/test_framework/util.py
  14. 1
      test/functional/test_runner.py

2
contrib/devtools/check-doc.py

@ -21,7 +21,7 @@ CMD_GREP_DOCS = r"egrep -r -I 'HelpMessageOpt\(\"\-[^\"=]+?(=|\")' %s" % (CMD_RO @@ -21,7 +21,7 @@ CMD_GREP_DOCS = r"egrep -r -I 'HelpMessageOpt\(\"\-[^\"=]+?(=|\")' %s" % (CMD_RO
REGEX_ARG = re.compile(r'(?:map(?:Multi)?Args(?:\.count\(|\[)|Get(?:Bool)?Arg\()\"(\-[^\"]+?)\"')
REGEX_DOC = re.compile(r'HelpMessageOpt\(\"(\-[^\"=]+?)(?:=|\")')
# list unsupported, deprecated and duplicate args as they need no documentation
SET_DOC_OPTIONAL = set(['-rpcssl', '-benchmark', '-h', '-help', '-socks', '-tor', '-debugnet', '-whitelistalwaysrelay', '-prematurewitness', '-walletprematurewitness', '-promiscuousmempoolflags', '-blockminsize'])
SET_DOC_OPTIONAL = set(['-rpcssl', '-benchmark', '-h', '-help', '-socks', '-tor', '-debugnet', '-whitelistalwaysrelay', '-prematurewitness', '-walletprematurewitness', '-promiscuousmempoolflags', '-blockminsize', '-dbcrashratio'])
def main():
used = check_output(CMD_GREP_ARGS, shell=True)

19
src/chain.cpp

@ -148,3 +148,22 @@ int64_t GetBlockProofEquivalentTime(const CBlockIndex& to, const CBlockIndex& fr @@ -148,3 +148,22 @@ int64_t GetBlockProofEquivalentTime(const CBlockIndex& to, const CBlockIndex& fr
}
return sign * r.GetLow64();
}
/** Find the last common ancestor two blocks have.
* Both pa and pb must be non-NULL. */
const CBlockIndex* LastCommonAncestor(const CBlockIndex* pa, const CBlockIndex* pb) {
if (pa->nHeight > pb->nHeight) {
pa = pa->GetAncestor(pb->nHeight);
} else if (pb->nHeight > pa->nHeight) {
pb = pb->GetAncestor(pa->nHeight);
}
while (pa != pb && pa && pb) {
pa = pa->pprev;
pb = pb->pprev;
}
// Eventually all chain branches meet at the genesis block.
assert(pa == pb);
return pa;
}

3
src/chain.h

@ -362,6 +362,9 @@ public: @@ -362,6 +362,9 @@ public:
arith_uint256 GetBlockProof(const CBlockIndex& block);
/** Return the time it would take to redo the work difference between from and to, assuming the current hashrate corresponds to the difficulty at tip, in seconds. */
int64_t GetBlockProofEquivalentTime(const CBlockIndex& to, const CBlockIndex& from, const CBlockIndex& tip, const Consensus::Params&);
/** Find the forking point between two chain tips. */
const CBlockIndex* LastCommonAncestor(const CBlockIndex* pa, const CBlockIndex* pb);
/** Used to marshal pointers into hashes for db storage. */
class CDiskBlockIndex : public CBlockIndex

9
src/coins.cpp

@ -12,6 +12,7 @@ @@ -12,6 +12,7 @@
bool CCoinsView::GetCoin(const COutPoint &outpoint, Coin &coin) const { return false; }
uint256 CCoinsView::GetBestBlock() const { return uint256(); }
std::vector<uint256> CCoinsView::GetHeadBlocks() const { return std::vector<uint256>(); }
bool CCoinsView::BatchWrite(CCoinsMap &mapCoins, const uint256 &hashBlock) { return false; }
CCoinsViewCursor *CCoinsView::Cursor() const { return 0; }
@ -25,6 +26,7 @@ CCoinsViewBacked::CCoinsViewBacked(CCoinsView *viewIn) : base(viewIn) { } @@ -25,6 +26,7 @@ CCoinsViewBacked::CCoinsViewBacked(CCoinsView *viewIn) : base(viewIn) { }
bool CCoinsViewBacked::GetCoin(const COutPoint &outpoint, Coin &coin) const { return base->GetCoin(outpoint, coin); }
bool CCoinsViewBacked::HaveCoin(const COutPoint &outpoint) const { return base->HaveCoin(outpoint); }
uint256 CCoinsViewBacked::GetBestBlock() const { return base->GetBestBlock(); }
std::vector<uint256> CCoinsViewBacked::GetHeadBlocks() const { return base->GetHeadBlocks(); }
void CCoinsViewBacked::SetBackend(CCoinsView &viewIn) { base = &viewIn; }
bool CCoinsViewBacked::BatchWrite(CCoinsMap &mapCoins, const uint256 &hashBlock) { return base->BatchWrite(mapCoins, hashBlock); }
CCoinsViewCursor *CCoinsViewBacked::Cursor() const { return base->Cursor(); }
@ -85,13 +87,14 @@ void CCoinsViewCache::AddCoin(const COutPoint &outpoint, Coin&& coin, bool possi @@ -85,13 +87,14 @@ void CCoinsViewCache::AddCoin(const COutPoint &outpoint, Coin&& coin, bool possi
cachedCoinsUsage += it->second.coin.DynamicMemoryUsage();
}
void AddCoins(CCoinsViewCache& cache, const CTransaction &tx, int nHeight) {
void AddCoins(CCoinsViewCache& cache, const CTransaction &tx, int nHeight, bool check) {
bool fCoinbase = tx.IsCoinBase();
const uint256& txid = tx.GetHash();
for (size_t i = 0; i < tx.vout.size(); ++i) {
// Pass fCoinbase as the possible_overwrite flag to AddCoin, in order to correctly
bool overwrite = check ? cache.HaveCoin(COutPoint(txid, i)) : fCoinbase;
// Always set the possible_overwrite flag to AddCoin for coinbase txn, in order to correctly
// deal with the pre-BIP30 occurrences of duplicate coinbase transactions.
cache.AddCoin(COutPoint(txid, i), Coin(tx.vout[i], nHeight, fCoinbase), fCoinbase);
cache.AddCoin(COutPoint(txid, i), Coin(tx.vout[i], nHeight, fCoinbase), overwrite);
}
}

13
src/coins.h

@ -157,6 +157,12 @@ public: @@ -157,6 +157,12 @@ public:
//! Retrieve the block hash whose state this CCoinsView currently represents
virtual uint256 GetBestBlock() const;
//! Retrieve the range of blocks that may have been only partially written.
//! If the database is in a consistent state, the result is the empty vector.
//! Otherwise, a two-element vector is returned consisting of the new and
//! the old block hash, in that order.
virtual std::vector<uint256> GetHeadBlocks() const;
//! Do a bulk modification (multiple Coin changes + BestBlock change).
//! The passed mapCoins can be modified.
virtual bool BatchWrite(CCoinsMap &mapCoins, const uint256 &hashBlock);
@ -183,6 +189,7 @@ public: @@ -183,6 +189,7 @@ public:
bool GetCoin(const COutPoint &outpoint, Coin &coin) const override;
bool HaveCoin(const COutPoint &outpoint) const override;
uint256 GetBestBlock() const override;
std::vector<uint256> GetHeadBlocks() const override;
void SetBackend(CCoinsView &viewIn);
bool BatchWrite(CCoinsMap &mapCoins, const uint256 &hashBlock) override;
CCoinsViewCursor *Cursor() const override;
@ -291,10 +298,12 @@ private: @@ -291,10 +298,12 @@ private:
};
//! Utility function to add all of a transaction's outputs to a cache.
// It assumes that overwrites are only possible for coinbase transactions,
// When check is false, this assumes that overwrites are only possible for coinbase transactions.
// When check is true, the underlying view may be queried to determine whether an addition is
// an overwrite.
// TODO: pass in a boolean to limit these possible overwrites to known
// (pre-BIP34) cases.
void AddCoins(CCoinsViewCache& cache, const CTransaction& tx, int nHeight);
void AddCoins(CCoinsViewCache& cache, const CTransaction& tx, int nHeight, bool check = false);
//! Utility function to find any unspent output with a given txid.
const Coin& AccessByTxid(const CCoinsViewCache& cache, const uint256& txid);

11
src/init.cpp

@ -336,6 +336,9 @@ std::string HelpMessage(HelpMessageMode mode) @@ -336,6 +336,9 @@ std::string HelpMessage(HelpMessageMode mode)
#endif
}
strUsage += HelpMessageOpt("-datadir=<dir>", _("Specify data directory"));
if (showDebug) {
strUsage += HelpMessageOpt("-dbbatchsize", strprintf("Maximum database write batch size in bytes (default: %u)", nDefaultDbBatchSize));
}
strUsage += HelpMessageOpt("-dbcache=<n>", strprintf(_("Set database cache size in megabytes (%d to %d, default: %d)"), nMinDbCache, nMaxDbCache, nDefaultDbCache));
if (showDebug)
strUsage += HelpMessageOpt("-feefilter", strprintf("Tell other nodes to filter invs to us by our mempool min fee (default: %u)", DEFAULT_FEEFILTER));
@ -1373,7 +1376,6 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler) @@ -1373,7 +1376,6 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
pblocktree = new CBlockTreeDB(nBlockTreeDBCache, false, fReindex);
pcoinsdbview = new CCoinsViewDB(nCoinDBCache, false, fReindex || fReindexChainState);
pcoinscatcher = new CCoinsViewErrorCatcher(pcoinsdbview);
pcoinsTip = new CCoinsViewCache(pcoinscatcher);
if (fReindex) {
pblocktree->WriteReindexing(true);
@ -1417,6 +1419,13 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler) @@ -1417,6 +1419,13 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
break;
}
if (!ReplayBlocks(chainparams, pcoinsdbview)) {
strLoadError = _("Unable to replay blocks. You will need to rebuild the database using -reindex-chainstate.");
break;
}
pcoinsTip = new CCoinsViewCache(pcoinscatcher);
LoadChainTip(chainparams);
if (!fReindex && chainActive.Tip() != NULL) {
uiInterface.InitMessage(_("Rewinding blocks..."));
if (!RewindBlockIndex(chainparams)) {

19
src/net_processing.cpp

@ -452,25 +452,6 @@ bool PeerHasHeader(CNodeState *state, const CBlockIndex *pindex) @@ -452,25 +452,6 @@ bool PeerHasHeader(CNodeState *state, const CBlockIndex *pindex)
return false;
}
/** Find the last common ancestor two blocks have.
* Both pa and pb must be non-NULL. */
const CBlockIndex* LastCommonAncestor(const CBlockIndex* pa, const CBlockIndex* pb) {
if (pa->nHeight > pb->nHeight) {
pa = pa->GetAncestor(pb->nHeight);
} else if (pb->nHeight > pa->nHeight) {
pb = pb->GetAncestor(pa->nHeight);
}
while (pa != pb && pa && pb) {
pa = pa->pprev;
pb = pb->pprev;
}
// Eventually all chain branches meet at the genesis block.
assert(pa == pb);
return pa;
}
/** Update pindexLastCommonBlock and add not-in-flight missing successors to vBlocks, until it has
* at most count entries. */
void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<const CBlockIndex*>& vBlocks, NodeId& nodeStaller, const Consensus::Params& consensusParams) {

49
src/txdb.cpp

@ -7,8 +7,10 @@ @@ -7,8 +7,10 @@
#include "chainparams.h"
#include "hash.h"
#include "random.h"
#include "pow.h"
#include "uint256.h"
#include "util.h"
#include <stdint.h>
@ -21,6 +23,7 @@ static const char DB_TXINDEX = 't'; @@ -21,6 +23,7 @@ static const char DB_TXINDEX = 't';
static const char DB_BLOCK_INDEX = 'b';
static const char DB_BEST_BLOCK = 'B';
static const char DB_HEAD_BLOCKS = 'H';
static const char DB_FLAG = 'F';
static const char DB_REINDEX_FLAG = 'R';
static const char DB_LAST_BLOCK = 'l';
@ -68,10 +71,39 @@ uint256 CCoinsViewDB::GetBestBlock() const { @@ -68,10 +71,39 @@ uint256 CCoinsViewDB::GetBestBlock() const {
return hashBestChain;
}
std::vector<uint256> CCoinsViewDB::GetHeadBlocks() const {
std::vector<uint256> vhashHeadBlocks;
if (!db.Read(DB_HEAD_BLOCKS, vhashHeadBlocks)) {
return std::vector<uint256>();
}
return vhashHeadBlocks;
}
bool CCoinsViewDB::BatchWrite(CCoinsMap &mapCoins, const uint256 &hashBlock) {
CDBBatch batch(db);
size_t count = 0;
size_t changed = 0;
size_t batch_size = (size_t)GetArg("-dbbatchsize", nDefaultDbBatchSize);
int crash_simulate = GetArg("-dbcrashratio", 0);
assert(!hashBlock.IsNull());
uint256 old_tip = GetBestBlock();
if (old_tip.IsNull()) {
// We may be in the middle of replaying.
std::vector<uint256> old_heads = GetHeadBlocks();
if (old_heads.size() == 2) {
assert(old_heads[0] == hashBlock);
old_tip = old_heads[1];
}
}
// In the first batch, mark the database as being in the middle of a
// transition from old_tip to hashBlock.
// A vector is used for future extensibility, as we may want to support
// interrupting after partial writes from multiple independent reorgs.
batch.Erase(DB_BEST_BLOCK);
batch.Write(DB_HEAD_BLOCKS, std::vector<uint256>{hashBlock, old_tip});
for (CCoinsMap::iterator it = mapCoins.begin(); it != mapCoins.end();) {
if (it->second.flags & CCoinsCacheEntry::DIRTY) {
CoinEntry entry(&it->first);
@ -84,10 +116,25 @@ bool CCoinsViewDB::BatchWrite(CCoinsMap &mapCoins, const uint256 &hashBlock) { @@ -84,10 +116,25 @@ bool CCoinsViewDB::BatchWrite(CCoinsMap &mapCoins, const uint256 &hashBlock) {
count++;
CCoinsMap::iterator itOld = it++;
mapCoins.erase(itOld);
if (batch.SizeEstimate() > batch_size) {
LogPrint(BCLog::COINDB, "Writing partial batch of %.2f MiB\n", batch.SizeEstimate() * (1.0 / 1048576.0));
db.WriteBatch(batch);
batch.Clear();
if (crash_simulate) {
static FastRandomContext rng;
if (rng.randrange(crash_simulate) == 0) {
LogPrintf("Simulating a crash. Goodbye.\n");
_Exit(0);
}
}
if (!hashBlock.IsNull())
}
}
// In the last batch, mark the database as consistent with hashBlock again.
batch.Erase(DB_HEAD_BLOCKS);
batch.Write(DB_BEST_BLOCK, hashBlock);
LogPrint(BCLog::COINDB, "Writing final batch of %.2f MiB\n", batch.SizeEstimate() * (1.0 / 1048576.0));
bool ret = db.WriteBatch(batch);
LogPrint(BCLog::COINDB, "Committed %u changed transaction outputs (out of %u) to coin database...\n", (unsigned int)changed, (unsigned int)count);
return ret;

7
src/txdb.h

@ -19,12 +19,12 @@ class CBlockIndex; @@ -19,12 +19,12 @@ class CBlockIndex;
class CCoinsViewDBCursor;
class uint256;
//! Compensate for extra memory peak (x1.5-x1.9) at flush time.
static constexpr int DB_PEAK_USAGE_FACTOR = 2;
//! No need to periodic flush if at least this much space still available.
static constexpr int MAX_BLOCK_COINSDB_USAGE = 10 * DB_PEAK_USAGE_FACTOR;
static constexpr int MAX_BLOCK_COINSDB_USAGE = 10;
//! -dbcache default (MiB)
static const int64_t nDefaultDbCache = 450;
//! -dbbatchsize default (bytes)
static const int64_t nDefaultDbBatchSize = 16 << 20;
//! max. -dbcache (MiB)
static const int64_t nMaxDbCache = sizeof(void*) > 4 ? 16384 : 1024;
//! min. -dbcache (MiB)
@ -74,6 +74,7 @@ public: @@ -74,6 +74,7 @@ public:
bool GetCoin(const COutPoint &outpoint, Coin &coin) const override;
bool HaveCoin(const COutPoint &outpoint) const override;
uint256 GetBestBlock() const override;
std::vector<uint256> GetHeadBlocks() const override;
bool BatchWrite(CCoinsMap &mapCoins, const uint256 &hashBlock) override;
CCoinsViewCursor *Cursor() const override;

117
src/validation.cpp

@ -96,7 +96,7 @@ namespace { @@ -96,7 +96,7 @@ namespace {
struct CBlockIndexWorkComparator
{
bool operator()(CBlockIndex *pa, CBlockIndex *pb) const {
bool operator()(const CBlockIndex *pa, const CBlockIndex *pb) const {
// First sort by most total work, ...
if (pa->nChainWork > pb->nChainWork) return false;
if (pa->nChainWork < pb->nChainWork) return true;
@ -1331,17 +1331,19 @@ int ApplyTxInUndo(Coin&& undo, CCoinsViewCache& view, const COutPoint& out) @@ -1331,17 +1331,19 @@ int ApplyTxInUndo(Coin&& undo, CCoinsViewCache& view, const COutPoint& out)
return DISCONNECT_FAILED; // adding output for transaction without known metadata
}
}
view.AddCoin(out, std::move(undo), undo.fCoinBase);
// The potential_overwrite parameter to AddCoin is only allowed to be false if we know for
// sure that the coin did not already exist in the cache. As we have queried for that above
// using HaveCoin, we don't need to guess. When fClean is false, a coin already existed and
// it is an overwrite.
view.AddCoin(out, std::move(undo), !fClean);
return fClean ? DISCONNECT_OK : DISCONNECT_UNCLEAN;
}
/** Undo the effects of this block (with given index) on the UTXO set represented by coins.
* When UNCLEAN or FAILED is returned, view is left in an indeterminate state. */
* When FAILED is returned, view is left in an indeterminate state. */
static DisconnectResult DisconnectBlock(const CBlock& block, const CBlockIndex* pindex, CCoinsViewCache& view)
{
assert(pindex->GetBlockHash() == view.GetBestBlock());
bool fClean = true;
CBlockUndo blockUndo;
@ -1779,7 +1781,7 @@ bool static FlushStateToDisk(const CChainParams& chainparams, CValidationState & @@ -1779,7 +1781,7 @@ bool static FlushStateToDisk(const CChainParams& chainparams, CValidationState &
nLastSetChain = nNow;
}
int64_t nMempoolSizeMax = GetArg("-maxmempool", DEFAULT_MAX_MEMPOOL_SIZE) * 1000000;
int64_t cacheSize = pcoinsTip->DynamicMemoryUsage() * DB_PEAK_USAGE_FACTOR;
int64_t cacheSize = pcoinsTip->DynamicMemoryUsage();
int64_t nTotalSpace = nCoinCacheUsage + std::max<int64_t>(nMempoolSizeMax - nMempoolUsage, 0);
// The cache is large and we're within 10% and 10 MiB of the limit, but we have time now (not in the middle of a block processing).
bool fCacheLarge = mode == FLUSH_STATE_PERIODIC && cacheSize > std::max((9 * nTotalSpace) / 10, nTotalSpace - MAX_BLOCK_COINSDB_USAGE * 1024 * 1024);
@ -1946,6 +1948,7 @@ bool static DisconnectTip(CValidationState& state, const CChainParams& chainpara @@ -1946,6 +1948,7 @@ bool static DisconnectTip(CValidationState& state, const CChainParams& chainpara
int64_t nStart = GetTimeMicros();
{
CCoinsViewCache view(pcoinsTip);
assert(view.GetBestBlock() == pindexDelete->GetBlockHash());
if (DisconnectBlock(block, pindexDelete, view) != DISCONNECT_OK)
return error("DisconnectTip(): DisconnectBlock %s failed", pindexDelete->GetBlockHash().ToString());
bool flushed = view.Flush();
@ -3417,20 +3420,25 @@ bool static LoadBlockIndexDB(const CChainParams& chainparams) @@ -3417,20 +3420,25 @@ bool static LoadBlockIndexDB(const CChainParams& chainparams)
pblocktree->ReadFlag("txindex", fTxIndex);
LogPrintf("%s: transaction index %s\n", __func__, fTxIndex ? "enabled" : "disabled");
return true;
}
void LoadChainTip(const CChainParams& chainparams)
{
if (chainActive.Tip() && chainActive.Tip()->GetBlockHash() == pcoinsTip->GetBestBlock()) return;
// Load pointer to end of best chain
BlockMap::iterator it = mapBlockIndex.find(pcoinsTip->GetBestBlock());
if (it == mapBlockIndex.end())
return true;
return;
chainActive.SetTip(it->second);
PruneBlockIndexCandidates();
LogPrintf("%s: hashBestChain=%s height=%d date=%s progress=%f\n", __func__,
LogPrintf("Loaded best chain: hashBestChain=%s height=%d date=%s progress=%f\n",
chainActive.Tip()->GetBlockHash().ToString(), chainActive.Height(),
DateTimeStrFormat("%Y-%m-%d %H:%M:%S", chainActive.Tip()->GetBlockTime()),
GuessVerificationProgress(chainparams.TxData(), chainActive.Tip()));
return true;
}
CVerifyDB::CVerifyDB()
@ -3499,6 +3507,7 @@ bool CVerifyDB::VerifyDB(const CChainParams& chainparams, CCoinsView *coinsview, @@ -3499,6 +3507,7 @@ bool CVerifyDB::VerifyDB(const CChainParams& chainparams, CCoinsView *coinsview,
}
// check level 3: check for inconsistencies during memory-only disconnect of tip blocks
if (nCheckLevel >= 3 && pindex == pindexState && (coins.DynamicMemoryUsage() + pcoinsTip->DynamicMemoryUsage()) <= nCoinCacheUsage) {
assert(coins.GetBestBlock() == pindex->GetBlockHash());
DisconnectResult res = DisconnectBlock(block, pindex, coins);
if (res == DISCONNECT_FAILED) {
return error("VerifyDB(): *** irrecoverable inconsistency in block data at %d, hash=%s", pindex->nHeight, pindex->GetBlockHash().ToString());
@ -3538,6 +3547,92 @@ bool CVerifyDB::VerifyDB(const CChainParams& chainparams, CCoinsView *coinsview, @@ -3538,6 +3547,92 @@ bool CVerifyDB::VerifyDB(const CChainParams& chainparams, CCoinsView *coinsview,
return true;
}
/** Apply the effects of a block on the utxo cache, ignoring that it may already have been applied. */
static bool RollforwardBlock(const CBlockIndex* pindex, CCoinsViewCache& inputs, const CChainParams& params)
{
// TODO: merge with ConnectBlock
CBlock block;
if (!ReadBlockFromDisk(block, pindex, params.GetConsensus())) {
return error("ReplayBlock(): ReadBlockFromDisk failed at %d, hash=%s", pindex->nHeight, pindex->GetBlockHash().ToString());
}
for (const CTransactionRef& tx : block.vtx) {
if (!tx->IsCoinBase()) {
for (const CTxIn &txin : tx->vin) {
inputs.SpendCoin(txin.prevout);
}
}
// Pass check = true as every addition may be an overwrite.
AddCoins(inputs, *tx, pindex->nHeight, true);
}
return true;
}
bool ReplayBlocks(const CChainParams& params, CCoinsView* view)
{
LOCK(cs_main);
CCoinsViewCache cache(view);
std::vector<uint256> hashHeads = view->GetHeadBlocks();
if (hashHeads.empty()) return true; // We're already in a consistent state.
if (hashHeads.size() != 2) return error("ReplayBlocks(): unknown inconsistent state");
uiInterface.ShowProgress(_("Replaying blocks..."), 0);
LogPrintf("Replaying blocks\n");
const CBlockIndex* pindexOld = nullptr; // Old tip during the interrupted flush.
const CBlockIndex* pindexNew; // New tip during the interrupted flush.
const CBlockIndex* pindexFork = nullptr; // Latest block common to both the old and the new tip.
if (mapBlockIndex.count(hashHeads[0]) == 0) {
return error("ReplayBlocks(): reorganization to unknown block requested");
}
pindexNew = mapBlockIndex[hashHeads[0]];
if (!hashHeads[1].IsNull()) { // The old tip is allowed to be 0, indicating it's the first flush.
if (mapBlockIndex.count(hashHeads[1]) == 0) {
return error("ReplayBlocks(): reorganization from unknown block requested");
}
pindexOld = mapBlockIndex[hashHeads[1]];
pindexFork = LastCommonAncestor(pindexOld, pindexNew);
assert(pindexFork != nullptr);
}
// Rollback along the old branch.
while (pindexOld != pindexFork) {
if (pindexOld->nHeight > 0) { // Never disconnect the genesis block.
CBlock block;
if (!ReadBlockFromDisk(block, pindexOld, params.GetConsensus())) {
return error("RollbackBlock(): ReadBlockFromDisk() failed at %d, hash=%s", pindexOld->nHeight, pindexOld->GetBlockHash().ToString());
}
LogPrintf("Rolling back %s (%i)\n", pindexOld->GetBlockHash().ToString(), pindexOld->nHeight);
DisconnectResult res = DisconnectBlock(block, pindexOld, cache);
if (res == DISCONNECT_FAILED) {
return error("RollbackBlock(): DisconnectBlock failed at %d, hash=%s", pindexOld->nHeight, pindexOld->GetBlockHash().ToString());
}
// If DISCONNECT_UNCLEAN is returned, it means a non-existing UTXO was deleted, or an existing UTXO was
// overwritten. It corresponds to cases where the block-to-be-disconnect never had all its operations
// applied to the UTXO set. However, as both writing a UTXO and deleting a UTXO are idempotent operations,
// the result is still a version of the UTXO set with the effects of that block undone.
}
pindexOld = pindexOld->pprev;
}
// Roll forward from the forking point to the new tip.
int nForkHeight = pindexFork ? pindexFork->nHeight : 0;
for (int nHeight = nForkHeight + 1; nHeight <= pindexNew->nHeight; ++nHeight) {
const CBlockIndex* pindex = pindexNew->GetAncestor(nHeight);
LogPrintf("Rolling forward %s (%i)\n", pindex->GetBlockHash().ToString(), nHeight);
if (!RollforwardBlock(pindex, cache, params)) return false;
}
cache.SetBestBlock(pindexNew->GetBlockHash());
cache.Flush();
uiInterface.ShowProgress("", 100);
return true;
}
bool RewindBlockIndex(const CChainParams& params)
{
LOCK(cs_main);
@ -3687,8 +3782,6 @@ bool InitBlockIndex(const CChainParams& chainparams) @@ -3687,8 +3782,6 @@ bool InitBlockIndex(const CChainParams& chainparams)
CBlockIndex *pindex = AddToBlockIndex(block);
if (!ReceivedBlockTransactions(block, state, pindex, blockPos, chainparams.GetConsensus()))
return error("LoadBlockIndex(): genesis block not accepted");
// Force a chainstate write so that when we VerifyDB in a moment, it doesn't check stale data
return FlushStateToDisk(chainparams, state, FLUSH_STATE_ALWAYS);
} catch (const std::runtime_error& e) {
return error("LoadBlockIndex(): failed to initialize block database: %s", e.what());
}

5
src/validation.h

@ -260,6 +260,8 @@ bool LoadExternalBlockFile(const CChainParams& chainparams, FILE* fileIn, CDiskB @@ -260,6 +260,8 @@ bool LoadExternalBlockFile(const CChainParams& chainparams, FILE* fileIn, CDiskB
bool InitBlockIndex(const CChainParams& chainparams);
/** Load the block tree and coins database from disk */
bool LoadBlockIndex(const CChainParams& chainparams);
/** Update the chain tip based on database information. */
void LoadChainTip(const CChainParams& chainparams);
/** Unload database information */
void UnloadBlockIndex();
/** Run an instance of the script checking thread */
@ -424,6 +426,9 @@ public: @@ -424,6 +426,9 @@ public:
bool VerifyDB(const CChainParams& chainparams, CCoinsView *coinsview, int nCheckLevel, int nCheckDepth);
};
/** Replay blocks that aren't fully applied to the database. */
bool ReplayBlocks(const CChainParams& params, CCoinsView* view);
/** Find the last common block between the parameter chain and a locator. */
CBlockIndex* FindForkInGlobalIndex(const CChain& chain, const CBlockLocator& locator);

268
test/functional/dbcrash.py

@ -0,0 +1,268 @@ @@ -0,0 +1,268 @@
#!/usr/bin/env python3
# Copyright (c) 2017 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test recovery from a crash during chainstate writing."""
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import *
from test_framework.script import *
from test_framework.mininode import *
import random
try:
import http.client as httplib
except ImportError:
import httplib
import errno
'''
Test structure:
- 4 nodes
* node0, node1, and node2 will have different dbcrash ratios, and different
dbcache sizes
* node3 will be a regular node, with no crashing.
* The nodes will not connect to each other.
- use default test framework starting chain. initialize starting_tip_height to
tip height.
- Main loop:
* generate lots of transactions on node3, enough to fill up a block.
* uniformly randomly pick a tip height from starting_tip_height to
tip_height; with probability 1/(height_difference+4), invalidate this block.
* mine enough blocks to overtake tip_height at start of loop.
* for each node in [node0,node1,node2]:
- for each mined block:
* submit block to node
* if node crashed on/after submitting:
- restart until recovery succeeds
- check that utxo matches node3 using gettxoutsetinfo
'''
class ChainstateWriteCrashTest(BitcoinTestFramework):
def __init__(self):
super().__init__()
self.num_nodes = 4
self.setup_clean_chain = False
# Set -maxmempool=0 to turn off mempool memory sharing with dbcache
# Set -rpcservertimeout=900 to reduce socket disconnects in this
# long-running test
self.base_args = ["-limitdescendantsize=0", "-maxmempool=0", "-rpcservertimeout=900"]
# Set different crash ratios and cache sizes. Note that not all of
# -dbcache goes to pcoinsTip.
self.node0_args = ["-dbcrashratio=8", "-dbcache=4", "-dbbatchsize=200000"] + self.base_args
self.node1_args = ["-dbcrashratio=16", "-dbcache=8", "-dbbatchsize=200000"] + self.base_args
self.node2_args = ["-dbcrashratio=24", "-dbcache=16", "-dbbatchsize=200000"] + self.base_args
# Node3 is a normal node with default args, except will mine full blocks
self.node3_args = ["-blockmaxweight=4000000"]
self.extra_args = [self.node0_args, self.node1_args, self.node2_args, self.node3_args]
# We'll track some test coverage statistics
self.restart_counts = [0, 0, 0] # Track the restarts for nodes 0-2
self.crashed_on_restart = 0 # Track count of crashes during recovery
def setup_network(self):
self.setup_nodes()
# Leave them unconnected, we'll use submitblock directly in this test
# Starts up a given node id, waits for the tip to reach the given block
# hash, and calculates the utxo hash. Exceptions on startup should
# indicate node crash (due to -dbcrashratio), in which case we try again.
# Give up after 60 seconds.
# Returns the utxo hash of the given node.
def restart_node(self, node_index, expected_tip):
time_start = time.time()
while time.time() - time_start < 60:
try:
# Any of these RPC calls could throw due to node crash
self.nodes[node_index] = self.start_node(node_index, self.options.tmpdir, self.extra_args[node_index])
self.nodes[node_index].waitforblock(expected_tip)
utxo_hash = self.nodes[node_index].gettxoutsetinfo()['hash_serialized_2']
return utxo_hash
except:
# An exception here should mean the node is about to crash.
# If bitcoind exits, then try again. wait_for_node_exit()
# should raise an exception if bitcoind doesn't exit.
wait_for_node_exit(node_index, timeout=10)
self.crashed_on_restart += 1
time.sleep(1)
# If we got here, bitcoind isn't coming back up on restart. Could be a
# bug in bitcoind, or we've gotten unlucky with our dbcrash ratio --
# perhaps we generated a test case that blew up our cache?
# TODO: If this happens a lot, we should try to restart without -dbcrashratio
# and make sure that recovery happens.
raise AssertionError("Unable to successfully restart node %d in allotted time", node_index)
# Try submitting a block to the given node.
# Catch any exceptions that indicate the node has crashed.
# Returns true if the block was submitted successfully; false otherwise.
def submit_block_catch_error(self, node_index, block):
try:
self.nodes[node_index].submitblock(block)
return True
except (httplib.CannotSendRequest, httplib.RemoteDisconnected) as e:
self.log.debug("node %d submitblock raised exception: %s", node_index, e)
return False
except OSError as e:
self.log.debug("node %d submitblock raised OSError exception: errno=%s", node_index, e.errno)
if e.errno in [errno.EPIPE, errno.ECONNREFUSED, errno.ECONNRESET]:
# The node has likely crashed
return False
else:
# Unexpected exception, raise
raise
# Use submitblock to sync node3's chain with the other nodes
# If submitblock fails, restart the node and get the new utxo hash.
def sync_node3blocks(self, block_hashes):
# If any nodes crash while updating, we'll compare utxo hashes to
# ensure recovery was successful.
node3_utxo_hash = self.nodes[3].gettxoutsetinfo()['hash_serialized_2']
# Retrieve all the blocks from node3
blocks = []
for block_hash in block_hashes:
blocks.append([block_hash, self.nodes[3].getblock(block_hash, 0)])
# Deliver each block to each other node
for i in range(3):
nodei_utxo_hash = None
self.log.debug("Syncing blocks to node %d", i)
for (block_hash, block) in blocks:
# Get the block from node3, and submit to node_i
self.log.debug("submitting block %s", block_hash)
if not self.submit_block_catch_error(i, block):
# TODO: more carefully check that the crash is due to -dbcrashratio
# (change the exit code perhaps, and check that here?)
wait_for_node_exit(i, timeout=30)
self.log.debug("Restarting node %d after block hash %s", i, block_hash)
nodei_utxo_hash = self.restart_node(i, block_hash)
assert nodei_utxo_hash is not None
self.restart_counts[i] += 1
else:
# Clear it out after successful submitblock calls -- the cached
# utxo hash will no longer be correct
nodei_utxo_hash = None
# Check that the utxo hash matches node3's utxo set
# NOTE: we only check the utxo set if we had to restart the node
# after the last block submitted:
# - checking the utxo hash causes a cache flush, which we don't
# want to do every time; so
# - we only update the utxo cache after a node restart, since flushing
# the cache is a no-op at that point
if nodei_utxo_hash is not None:
self.log.debug("Checking txoutsetinfo matches for node %d", i)
assert_equal(nodei_utxo_hash, node3_utxo_hash)
# Verify that the utxo hash of each node matches node3.
# Restart any nodes that crash while querying.
def verify_utxo_hash(self):
node3_utxo_hash = self.nodes[3].gettxoutsetinfo()['hash_serialized_2']
self.log.info("Verifying utxo hash matches for all nodes")
for i in range(3):
try:
nodei_utxo_hash = self.nodes[i].gettxoutsetinfo()['hash_serialized_2']
except OSError:
# probably a crash on db flushing
nodei_utxo_hash = self.restart_node(i, self.nodes[3].getbestblockhash())
assert_equal(nodei_utxo_hash, node3_utxo_hash)
def generate_small_transactions(self, node, count, utxo_list):
FEE = 1000 # TODO: replace this with node relay fee based calculation
num_transactions = 0
random.shuffle(utxo_list)
while len(utxo_list) >= 2 and num_transactions < count:
tx = CTransaction()
input_amount = 0
for i in range(2):
utxo = utxo_list.pop()
tx.vin.append(CTxIn(COutPoint(int(utxo['txid'], 16), utxo['vout'])))
input_amount += int(utxo['amount']*COIN)
output_amount = (input_amount - FEE)//3
if output_amount <= 0:
# Sanity check -- if we chose inputs that are too small, skip
continue
for i in range(3):
tx.vout.append(CTxOut(output_amount, hex_str_to_bytes(utxo['scriptPubKey'])))
# Sign and send the transaction to get into the mempool
tx_signed_hex = node.signrawtransaction(ToHex(tx))['hex']
node.sendrawtransaction(tx_signed_hex)
num_transactions += 1
def run_test(self):
# Start by creating a lot of utxos on node3
initial_height = self.nodes[3].getblockcount()
utxo_list = create_confirmed_utxos(self.nodes[3].getnetworkinfo()['relayfee'], self.nodes[3], 5000)
self.log.info("Prepped %d utxo entries", len(utxo_list))
# Sync these blocks with the other nodes
block_hashes_to_sync = []
for height in range(initial_height+1, self.nodes[3].getblockcount()+1):
block_hashes_to_sync.append(self.nodes[3].getblockhash(height))
self.log.debug("Syncing %d blocks with other nodes", len(block_hashes_to_sync))
# Syncing the blocks could cause nodes to crash, so the test begins here.
self.sync_node3blocks(block_hashes_to_sync)
starting_tip_height = self.nodes[3].getblockcount()
# Main test loop:
# each time through the loop, generate a bunch of transactions,
# and then either mine a single new block on the tip, or some-sized reorg.
for i in range(40):
self.log.info("Iteration %d, generating 2500 transactions %s", i, self.restart_counts)
# Generate a bunch of small-ish transactions
self.generate_small_transactions(self.nodes[3], 2500, utxo_list)
# Pick a random block between current tip, and starting tip
current_height = self.nodes[3].getblockcount()
random_height = random.randint(starting_tip_height, current_height)
self.log.debug("At height %d, considering height %d", current_height, random_height)
if random_height > starting_tip_height:
# Randomly reorg from this point with some probability (1/4 for
# tip, 1/5 for tip-1, ...)
if random.random() < 1.0/(current_height + 4 - random_height):
self.log.debug("Invalidating block at height %d", random_height)
self.nodes[3].invalidateblock(self.nodes[3].getblockhash(random_height))
# Now generate new blocks until we pass the old tip height
self.log.debug("Mining longer tip")
block_hashes = self.nodes[3].generate(current_height+1-self.nodes[3].getblockcount())
self.log.debug("Syncing %d new blocks...", len(block_hashes))
self.sync_node3blocks(block_hashes)
utxo_list = self.nodes[3].listunspent()
self.log.debug("Node3 utxo count: %d", len(utxo_list))
# Check that the utxo hashes agree with node3
# Useful side effect: each utxo cache gets flushed here, so that we
# won't get crashes on shutdown at the end of the test.
self.verify_utxo_hash()
# Check the test coverage
self.log.info("Restarted nodes: %s; crashes on restart: %d", self.restart_counts, self.crashed_on_restart)
# If no nodes were restarted, we didn't test anything.
assert self.restart_counts != [0, 0, 0]
# Make sure we tested the case of crash-during-recovery.
assert self.crashed_on_restart > 0
# Warn if any of the nodes escaped restart.
for i in range(3):
if self.restart_counts[i] == 0:
self.log.warn("Node %d never crashed during utxo flush!", i)
if __name__ == "__main__":
ChainstateWriteCrashTest().main()

2
test/functional/test_framework/util.py

@ -249,6 +249,8 @@ def wait_for_bitcoind_start(process, datadir, i, rpchost=None): @@ -249,6 +249,8 @@ def wait_for_bitcoind_start(process, datadir, i, rpchost=None):
raise
time.sleep(0.25)
def wait_for_node_exit(node_index, timeout):
bitcoind_processes[node_index].wait(timeout)
def _start_node(i, dirname, extra_args=None, rpchost=None, timewait=None, binary=None, stderr=None):
"""Start a bitcoind and return RPC connection to it

1
test/functional/test_runner.py

@ -125,6 +125,7 @@ EXTENDED_SCRIPTS = [ @@ -125,6 +125,7 @@ EXTENDED_SCRIPTS = [
# vv Tests less than 5m vv
'maxuploadtarget.py',
'mempool_packages.py',
'dbcrash.py',
# vv Tests less than 2m vv
'bip68-sequence.py',
'getblocktemplate_longpoll.py',

Loading…
Cancel
Save