Browse Source

Merge pull request #5241

a206950 Introduce separate flushing modes (Pieter Wuille)
51ce901 Improve chainstate/blockindex disk writing policy (Pieter Wuille)
0.10
Wladimir J. van der Laan 10 years ago
parent
commit
397b9011c9
No known key found for this signature in database
GPG Key ID: 74810B012346C9A6
  1. 11
      src/init.cpp
  2. 113
      src/main.cpp
  3. 4
      src/main.h
  4. 2
      src/rpcblockchain.cpp

11
src/init.cpp

@ -150,14 +150,9 @@ void Shutdown()
{ {
LOCK(cs_main); LOCK(cs_main);
#ifdef ENABLE_WALLET if (pcoinsTip != NULL) {
if (pwalletMain) FlushStateToDisk();
pwalletMain->SetBestChain(chainActive.GetLocator()); }
#endif
if (pblocktree)
pblocktree->Flush();
if (pcoinsTip)
pcoinsTip->Flush();
delete pcoinsTip; delete pcoinsTip;
pcoinsTip = NULL; pcoinsTip = NULL;
delete pcoinsdbview; delete pcoinsdbview;

113
src/main.cpp

@ -130,6 +130,12 @@ namespace {
// Number of preferrable block download peers. // Number of preferrable block download peers.
int nPreferredDownload = 0; int nPreferredDownload = 0;
// Dirty block index entries.
set<CBlockIndex*> setDirtyBlockIndex;
// Dirty block file entries.
set<int> setDirtyFileInfo;
} // anon namespace } // anon namespace
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -1137,11 +1143,6 @@ bool WriteBlockToDisk(CBlock& block, CDiskBlockPos& pos)
pos.nPos = (unsigned int)fileOutPos; pos.nPos = (unsigned int)fileOutPos;
fileout << block; fileout << block;
// Flush stdio buffers and commit to disk before returning
fflush(fileout.Get());
if (!IsInitialBlockDownload())
FileCommit(fileout.Get());
return true; return true;
} }
@ -1335,7 +1336,7 @@ void static InvalidBlockFound(CBlockIndex *pindex, const CValidationState &state
} }
if (!state.CorruptionPossible()) { if (!state.CorruptionPossible()) {
pindex->nStatus |= BLOCK_FAILED_VALID; pindex->nStatus |= BLOCK_FAILED_VALID;
pblocktree->WriteBlockIndex(CDiskBlockIndex(pindex)); setDirtyBlockIndex.insert(pindex);
setBlockIndexCandidates.erase(pindex); setBlockIndexCandidates.erase(pindex);
InvalidChainFound(pindex); InvalidChainFound(pindex);
} }
@ -1732,10 +1733,7 @@ bool ConnectBlock(const CBlock& block, CValidationState& state, CBlockIndex* pin
} }
pindex->RaiseValidity(BLOCK_VALID_SCRIPTS); pindex->RaiseValidity(BLOCK_VALID_SCRIPTS);
setDirtyBlockIndex.insert(pindex);
CDiskBlockIndex blockindex(pindex);
if (!pblocktree->WriteBlockIndex(blockindex))
return state.Abort("Failed to write block index");
} }
if (fTxIndex) if (fTxIndex)
@ -1759,10 +1757,23 @@ bool ConnectBlock(const CBlock& block, CValidationState& state, CBlockIndex* pin
return true; return true;
} }
// Update the on-disk chain state. enum FlushStateMode {
bool static WriteChainState(CValidationState &state, bool forceWrite=false) { FLUSH_STATE_IF_NEEDED,
FLUSH_STATE_PERIODIC,
FLUSH_STATE_ALWAYS
};
/**
* Update the on-disk chain state.
* The caches and indexes are flushed if either they're too large, forceWrite is set, or
* fast is not set and it's been a while since the last write.
*/
bool static FlushStateToDisk(CValidationState &state, FlushStateMode mode) {
LOCK(cs_main);
static int64_t nLastWrite = 0; static int64_t nLastWrite = 0;
if (forceWrite || pcoinsTip->GetCacheSize() > nCoinCacheSize || (!IsInitialBlockDownload() && GetTimeMicros() > nLastWrite + 600*1000000)) { if ((mode == FLUSH_STATE_ALWAYS) ||
((mode == FLUSH_STATE_PERIODIC || mode == FLUSH_STATE_IF_NEEDED) && pcoinsTip->GetCacheSize() > nCoinCacheSize) ||
(mode == FLUSH_STATE_PERIODIC && GetTimeMicros() > nLastWrite + DATABASE_WRITE_INTERVAL * 1000000)) {
// Typical CCoins structures on disk are around 100 bytes in size. // Typical CCoins structures on disk are around 100 bytes in size.
// Pushing a new one to the database can cause it to be written // Pushing a new one to the database can cause it to be written
// twice (once in the log, and once in the tables). This is already // twice (once in the log, and once in the tables). This is already
@ -1770,15 +1781,44 @@ bool static WriteChainState(CValidationState &state, bool forceWrite=false) {
// overwrite one. Still, use a conservative safety factor of 2. // overwrite one. Still, use a conservative safety factor of 2.
if (!CheckDiskSpace(100 * 2 * 2 * pcoinsTip->GetCacheSize())) if (!CheckDiskSpace(100 * 2 * 2 * pcoinsTip->GetCacheSize()))
return state.Error("out of disk space"); return state.Error("out of disk space");
// First make sure all block and undo data is flushed to disk.
FlushBlockFile(); FlushBlockFile();
// Then update all block file information (which may refer to block and undo files).
bool fileschanged = false;
for (set<int>::iterator it = setDirtyFileInfo.begin(); it != setDirtyFileInfo.end(); ) {
if (!pblocktree->WriteBlockFileInfo(*it, vinfoBlockFile[*it])) {
return state.Abort("Failed to write to block index");
}
fileschanged = true;
setDirtyFileInfo.erase(it++);
}
if (fileschanged && !pblocktree->WriteLastBlockFile(nLastBlockFile)) {
return state.Abort("Failed to write to block index");
}
for (set<CBlockIndex*>::iterator it = setDirtyBlockIndex.begin(); it != setDirtyBlockIndex.end(); ) {
if (!pblocktree->WriteBlockIndex(CDiskBlockIndex(*it))) {
return state.Abort("Failed to write to block index");
}
setDirtyBlockIndex.erase(it++);
}
pblocktree->Sync(); pblocktree->Sync();
// Finally flush the chainstate (which may refer to block index entries).
if (!pcoinsTip->Flush()) if (!pcoinsTip->Flush())
return state.Abort("Failed to write to coin database"); return state.Abort("Failed to write to coin database");
// Update best block in wallet (so we can detect restored wallets).
if (mode != FLUSH_STATE_IF_NEEDED) {
g_signals.SetBestChain(chainActive.GetLocator());
}
nLastWrite = GetTimeMicros(); nLastWrite = GetTimeMicros();
} }
return true; return true;
} }
void FlushStateToDisk() {
CValidationState state;
FlushStateToDisk(state, FLUSH_STATE_ALWAYS);
}
// Update chainActive and related internal data structures. // Update chainActive and related internal data structures.
void static UpdateTip(CBlockIndex *pindexNew) { void static UpdateTip(CBlockIndex *pindexNew) {
chainActive.SetTip(pindexNew); chainActive.SetTip(pindexNew);
@ -1837,7 +1877,7 @@ bool static DisconnectTip(CValidationState &state) {
} }
LogPrint("bench", "- Disconnect block: %.2fms\n", (GetTimeMicros() - nStart) * 0.001); LogPrint("bench", "- Disconnect block: %.2fms\n", (GetTimeMicros() - nStart) * 0.001);
// Write the chain state to disk, if necessary. // Write the chain state to disk, if necessary.
if (!WriteChainState(state)) if (!FlushStateToDisk(state, FLUSH_STATE_IF_NEEDED))
return false; return false;
// Resurrect mempool transactions from the disconnected block. // Resurrect mempool transactions from the disconnected block.
BOOST_FOREACH(const CTransaction &tx, block.vtx) { BOOST_FOREACH(const CTransaction &tx, block.vtx) {
@ -1900,7 +1940,7 @@ bool static ConnectTip(CValidationState &state, CBlockIndex *pindexNew, CBlock *
int64_t nTime4 = GetTimeMicros(); nTimeFlush += nTime4 - nTime3; int64_t nTime4 = GetTimeMicros(); nTimeFlush += nTime4 - nTime3;
LogPrint("bench", " - Flush: %.2fms [%.2fs]\n", (nTime4 - nTime3) * 0.001, nTimeFlush * 0.000001); LogPrint("bench", " - Flush: %.2fms [%.2fs]\n", (nTime4 - nTime3) * 0.001, nTimeFlush * 0.000001);
// Write the chain state to disk, if necessary. // Write the chain state to disk, if necessary.
if (!WriteChainState(state)) if (!FlushStateToDisk(state, FLUSH_STATE_IF_NEEDED))
return false; return false;
int64_t nTime5 = GetTimeMicros(); nTimeChainState += nTime5 - nTime4; int64_t nTime5 = GetTimeMicros(); nTimeChainState += nTime5 - nTime4;
LogPrint("bench", " - Writing chainstate: %.2fms [%.2fs]\n", (nTime5 - nTime4) * 0.001, nTimeChainState * 0.000001); LogPrint("bench", " - Writing chainstate: %.2fms [%.2fs]\n", (nTime5 - nTime4) * 0.001, nTimeChainState * 0.000001);
@ -1919,10 +1959,6 @@ bool static ConnectTip(CValidationState &state, CBlockIndex *pindexNew, CBlock *
BOOST_FOREACH(const CTransaction &tx, pblock->vtx) { BOOST_FOREACH(const CTransaction &tx, pblock->vtx) {
SyncWithWallets(tx, pblock); SyncWithWallets(tx, pblock);
} }
// Update best block in wallet (so we can detect restored wallets)
// Emit this signal after the SyncWithWallets signals as the wallet relies on that everything up to this point has been synced
if ((chainActive.Height() % 20160) == 0 || ((chainActive.Height() % 144) == 0 && !IsInitialBlockDownload()))
g_signals.SetBestChain(chainActive.GetLocator());
int64_t nTime6 = GetTimeMicros(); nTimePostConnect += nTime6 - nTime5; nTimeTotal += nTime6 - nTime1; int64_t nTime6 = GetTimeMicros(); nTimePostConnect += nTime6 - nTime5; nTimeTotal += nTime6 - nTime1;
LogPrint("bench", " - Connect postprocess: %.2fms [%.2fs]\n", (nTime6 - nTime5) * 0.001, nTimePostConnect * 0.000001); LogPrint("bench", " - Connect postprocess: %.2fms [%.2fs]\n", (nTime6 - nTime5) * 0.001, nTimePostConnect * 0.000001);
@ -2043,9 +2079,6 @@ static bool ActivateBestChainStep(CValidationState &state, CBlockIndex *pindexMo
else else
CheckForkWarningConditions(); CheckForkWarningConditions();
if (!pblocktree->Flush())
return state.Abort("Failed to sync block index");
return true; return true;
} }
@ -2086,11 +2119,16 @@ bool ActivateBestChain(CValidationState &state, CBlock *pblock) {
if (chainActive.Height() > (pnode->nStartingHeight != -1 ? pnode->nStartingHeight - 2000 : nBlockEstimate)) if (chainActive.Height() > (pnode->nStartingHeight != -1 ? pnode->nStartingHeight - 2000 : nBlockEstimate))
pnode->PushInventory(CInv(MSG_BLOCK, hashNewTip)); pnode->PushInventory(CInv(MSG_BLOCK, hashNewTip));
} }
// Notify external listeners about the new tip.
uiInterface.NotifyBlockTip(hashNewTip); uiInterface.NotifyBlockTip(hashNewTip);
} }
} while(pindexMostWork != chainActive.Tip()); } while(pindexMostWork != chainActive.Tip());
// Write changes periodically to disk, after relay.
if (!FlushStateToDisk(state, FLUSH_STATE_PERIODIC)) {
return false;
}
return true; return true;
} }
@ -2123,8 +2161,7 @@ CBlockIndex* AddToBlockIndex(const CBlockHeader& block)
if (pindexBestHeader == NULL || pindexBestHeader->nChainWork < pindexNew->nChainWork) if (pindexBestHeader == NULL || pindexBestHeader->nChainWork < pindexNew->nChainWork)
pindexBestHeader = pindexNew; pindexBestHeader = pindexNew;
// Ok if it fails, we'll download the header again next time. setDirtyBlockIndex.insert(pindexNew);
pblocktree->WriteBlockIndex(CDiskBlockIndex(pindexNew));
return pindexNew; return pindexNew;
} }
@ -2143,6 +2180,7 @@ bool ReceivedBlockTransactions(const CBlock &block, CValidationState& state, CBl
LOCK(cs_nBlockSequenceId); LOCK(cs_nBlockSequenceId);
pindexNew->nSequenceId = nBlockSequenceId++; pindexNew->nSequenceId = nBlockSequenceId++;
} }
setDirtyBlockIndex.insert(pindexNew);
if (pindexNew->pprev == NULL || pindexNew->pprev->nChainTx) { if (pindexNew->pprev == NULL || pindexNew->pprev->nChainTx) {
// If pindexNew is the genesis block or all parents are BLOCK_VALID_TRANSACTIONS. // If pindexNew is the genesis block or all parents are BLOCK_VALID_TRANSACTIONS.
@ -2162,15 +2200,11 @@ bool ReceivedBlockTransactions(const CBlock &block, CValidationState& state, CBl
range.first++; range.first++;
mapBlocksUnlinked.erase(it); mapBlocksUnlinked.erase(it);
} }
if (!pblocktree->WriteBlockIndex(CDiskBlockIndex(pindex)))
return state.Abort("Failed to write block index");
} }
} else { } else {
if (pindexNew->pprev && pindexNew->pprev->IsValid(BLOCK_VALID_TREE)) { if (pindexNew->pprev && pindexNew->pprev->IsValid(BLOCK_VALID_TREE)) {
mapBlocksUnlinked.insert(std::make_pair(pindexNew->pprev, pindexNew)); mapBlocksUnlinked.insert(std::make_pair(pindexNew->pprev, pindexNew));
} }
if (!pblocktree->WriteBlockIndex(CDiskBlockIndex(pindexNew)))
return state.Abort("Failed to write block index");
} }
return true; return true;
@ -2178,8 +2212,6 @@ bool ReceivedBlockTransactions(const CBlock &block, CValidationState& state, CBl
bool FindBlockPos(CValidationState &state, CDiskBlockPos &pos, unsigned int nAddSize, unsigned int nHeight, uint64_t nTime, bool fKnown = false) bool FindBlockPos(CValidationState &state, CDiskBlockPos &pos, unsigned int nAddSize, unsigned int nHeight, uint64_t nTime, bool fKnown = false)
{ {
bool fUpdatedLast = false;
LOCK(cs_LastBlockFile); LOCK(cs_LastBlockFile);
unsigned int nFile = fKnown ? pos.nFile : nLastBlockFile; unsigned int nFile = fKnown ? pos.nFile : nLastBlockFile;
@ -2195,7 +2227,6 @@ bool FindBlockPos(CValidationState &state, CDiskBlockPos &pos, unsigned int nAdd
if (vinfoBlockFile.size() <= nFile) { if (vinfoBlockFile.size() <= nFile) {
vinfoBlockFile.resize(nFile + 1); vinfoBlockFile.resize(nFile + 1);
} }
fUpdatedLast = true;
} }
pos.nFile = nFile; pos.nFile = nFile;
pos.nPos = vinfoBlockFile[nFile].nSize; pos.nPos = vinfoBlockFile[nFile].nSize;
@ -2222,11 +2253,7 @@ bool FindBlockPos(CValidationState &state, CDiskBlockPos &pos, unsigned int nAdd
} }
} }
if (!pblocktree->WriteBlockFileInfo(nLastBlockFile, vinfoBlockFile[nFile])) setDirtyFileInfo.insert(nFile);
return state.Abort("Failed to write file info");
if (fUpdatedLast)
pblocktree->WriteLastBlockFile(nLastBlockFile);
return true; return true;
} }
@ -2239,9 +2266,7 @@ bool FindUndoPos(CValidationState &state, int nFile, CDiskBlockPos &pos, unsigne
unsigned int nNewSize; unsigned int nNewSize;
pos.nPos = vinfoBlockFile[nFile].nUndoSize; pos.nPos = vinfoBlockFile[nFile].nUndoSize;
nNewSize = vinfoBlockFile[nFile].nUndoSize += nAddSize; nNewSize = vinfoBlockFile[nFile].nUndoSize += nAddSize;
if (!pblocktree->WriteBlockFileInfo(nLastBlockFile, vinfoBlockFile[nLastBlockFile])) { setDirtyFileInfo.insert(nFile);
return state.Abort("Failed to write block info");
}
unsigned int nOldChunks = (pos.nPos + UNDOFILE_CHUNK_SIZE - 1) / UNDOFILE_CHUNK_SIZE; unsigned int nOldChunks = (pos.nPos + UNDOFILE_CHUNK_SIZE - 1) / UNDOFILE_CHUNK_SIZE;
unsigned int nNewChunks = (nNewSize + UNDOFILE_CHUNK_SIZE - 1) / UNDOFILE_CHUNK_SIZE; unsigned int nNewChunks = (nNewSize + UNDOFILE_CHUNK_SIZE - 1) / UNDOFILE_CHUNK_SIZE;
@ -2462,6 +2487,7 @@ bool AcceptBlock(CBlock& block, CValidationState& state, CBlockIndex** ppindex,
if ((!CheckBlock(block, state)) || !ContextualCheckBlock(block, state, pindex->pprev)) { if ((!CheckBlock(block, state)) || !ContextualCheckBlock(block, state, pindex->pprev)) {
if (state.IsInvalid() && !state.CorruptionPossible()) { if (state.IsInvalid() && !state.CorruptionPossible()) {
pindex->nStatus |= BLOCK_FAILED_VALID; pindex->nStatus |= BLOCK_FAILED_VALID;
setDirtyBlockIndex.insert(pindex);
} }
return false; return false;
} }
@ -3070,7 +3096,7 @@ bool InitBlockIndex() {
if (!ActivateBestChain(state, &block)) if (!ActivateBestChain(state, &block))
return error("LoadBlockIndex() : genesis block cannot be activated"); return error("LoadBlockIndex() : genesis block cannot be activated");
// Force a chainstate write so that when we VerifyDB in a moment, it doesnt check stale data // Force a chainstate write so that when we VerifyDB in a moment, it doesnt check stale data
return WriteChainState(state, true); return FlushStateToDisk(state, FLUSH_STATE_ALWAYS);
} catch(std::runtime_error &e) { } catch(std::runtime_error &e) {
return error("LoadBlockIndex() : failed to initialize block database: %s", e.what()); return error("LoadBlockIndex() : failed to initialize block database: %s", e.what());
} }
@ -4641,11 +4667,6 @@ bool CBlockUndo::WriteToDisk(CDiskBlockPos &pos, const uint256 &hashBlock)
hasher << *this; hasher << *this;
fileout << hasher.GetHash(); fileout << hasher.GetHash();
// Flush stdio buffers and commit to disk before returning
fflush(fileout.Get());
if (!IsInitialBlockDownload())
FileCommit(fileout.Get());
return true; return true;
} }

4
src/main.h

@ -94,6 +94,8 @@ static const unsigned int MAX_HEADERS_RESULTS = 2000;
* degree of disordering of blocks on disk (which make reindexing and in the future perhaps pruning * degree of disordering of blocks on disk (which make reindexing and in the future perhaps pruning
* harder). We'll probably want to make this a per-peer adaptive value at some point. */ * harder). We'll probably want to make this a per-peer adaptive value at some point. */
static const unsigned int BLOCK_DOWNLOAD_WINDOW = 1024; static const unsigned int BLOCK_DOWNLOAD_WINDOW = 1024;
/** Time to wait (in seconds) between writing blockchain state to disk. */
static const unsigned int DATABASE_WRITE_INTERVAL = 3600;
/** "reject" message codes **/ /** "reject" message codes **/
static const unsigned char REJECT_MALFORMED = 0x01; static const unsigned char REJECT_MALFORMED = 0x01;
@ -201,6 +203,8 @@ bool AbortNode(const std::string &msg, const std::string &userMessage="");
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats); bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats);
/** Increase a node's misbehavior score. */ /** Increase a node's misbehavior score. */
void Misbehaving(NodeId nodeid, int howmuch); void Misbehaving(NodeId nodeid, int howmuch);
/** Flush all state, indexes and buffers to disk. */
void FlushStateToDisk();
/** (try to) add transaction to memory pool **/ /** (try to) add transaction to memory pool **/

2
src/rpcblockchain.cpp

@ -319,7 +319,7 @@ Value gettxoutsetinfo(const Array& params, bool fHelp)
Object ret; Object ret;
CCoinsStats stats; CCoinsStats stats;
pcoinsTip->Flush(); FlushStateToDisk();
if (pcoinsTip->GetStats(stats)) { if (pcoinsTip->GetStats(stats)) {
ret.push_back(Pair("height", (int64_t)stats.nHeight)); ret.push_back(Pair("height", (int64_t)stats.nHeight));
ret.push_back(Pair("bestblock", stats.hashBlock.GetHex())); ret.push_back(Pair("bestblock", stats.hashBlock.GetHex()));

Loading…
Cancel
Save