Browse Source

Merge #10286: Call wallet notify callbacks in scheduler thread (without cs_main)

89f0312 Remove redundant pwallet nullptr check (Matt Corallo)
c4784b5 Add a dev notes document describing the new wallet RPC blocking (Matt Corallo)
3ea8b75 Give ZMQ consistent order with UpdatedBlockTip on scheduler thread (Matt Corallo)
cb06edf Fix wallet RPC race by waiting for callbacks in sendrawtransaction (Matt Corallo)
e545ded Also call other wallet notify callbacks in scheduler thread (Matt Corallo)
17220d6 Use callbacks to cache whether wallet transactions are in mempool (Matt Corallo)
5d67a78 Add calls to CWallet::BlockUntilSyncedToCurrentChain() in RPCs (Matt Corallo)
5ee3172 Add CWallet::BlockUntilSyncedToCurrentChain() (Matt Corallo)
0b2f42d Add CallFunctionInQueue to wait on validation interface queue drain (Matt Corallo)
2b4b345 Add ability to assert a lock is not held in DEBUG_LOCKORDER (Matt Corallo)
0343676 Call TransactionRemovedFromMempool in the CScheduler thread (Matt Corallo)
a7d3936 Add a CValidationInterface::TransactionRemovedFromMempool (Matt Corallo)

Pull request description:

  Based on #10179, this effectively reverts #9583, regaining most of the original speedups of #7946.

  This concludes the work of #9725, #10178, and #10179.

  See individual commit messages for more information.

Tree-SHA512: eead4809b0a75d1fb33b0765174ff52c972e45040635e38cf3686cef310859c1e6b3c00e7186cbd17374c6ae547bfbd6c1718fe36f26c76ba8a8b052d6ed7bc9
0.16
Wladimir J. van der Laan 7 years ago
parent
commit
927a1d7d08
No known key found for this signature in database
GPG Key ID: 1E4AED62986CD25D
  1. 13
      doc/developer-notes.md
  2. 2
      src/init.cpp
  3. 23
      src/rpc/rawtransaction.cpp
  4. 10
      src/sync.cpp
  5. 3
      src/sync.h
  6. 3
      src/txmempool.h
  7. 2
      src/validation.cpp
  8. 51
      src/validationinterface.cpp
  9. 70
      src/validationinterface.h
  10. 101
      src/wallet/rpcwallet.cpp
  11. 76
      src/wallet/wallet.cpp
  12. 28
      src/wallet/wallet.h

13
doc/developer-notes.md

@ -675,3 +675,16 @@ A few guidelines for introducing and reviewing new RPC interfaces: @@ -675,3 +675,16 @@ A few guidelines for introducing and reviewing new RPC interfaces:
- *Rationale*: If a RPC response is not a JSON object then it is harder to avoid API breakage if
new data in the response is needed.
- Wallet RPCs call BlockUntilSyncedToCurrentChain to maintain consistency with
`getblockchaininfo`'s state immediately prior to the call's execution. Wallet
RPCs whose behavior does *not* depend on the current chainstate may omit this
call.
- *Rationale*: In previous versions of Bitcoin Core, the wallet was always
in-sync with the chainstate (by virtue of them all being updated in the
same cs_main lock). In order to maintain the behavior that wallet RPCs
return results as of at least the highest best-known block an RPC
client may be aware of prior to entering a wallet RPC call, we must block
until the wallet is caught up to the chainstate as of the RPC call's entry.
This also makes the API much easier for RPC clients to reason about.

2
src/init.cpp

@ -261,6 +261,7 @@ void Shutdown() @@ -261,6 +261,7 @@ void Shutdown()
#endif
UnregisterAllValidationInterfaces();
GetMainSignals().UnregisterBackgroundSignalScheduler();
GetMainSignals().UnregisterWithMempoolSignals(mempool);
#ifdef ENABLE_WALLET
CloseWallets();
#endif
@ -1236,6 +1237,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler) @@ -1236,6 +1237,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
threadGroup.create_thread(boost::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));
GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
GetMainSignals().RegisterWithMempoolSignals(mempool);
/* Start the RPC server already. It will be started in "warmup" mode
* and not really process calls already (but it will signify connections

23
src/rpc/rawtransaction.cpp

@ -11,6 +11,7 @@ @@ -11,6 +11,7 @@
#include "init.h"
#include "keystore.h"
#include "validation.h"
#include "validationinterface.h"
#include "merkleblock.h"
#include "net.h"
#include "policy/policy.h"
@ -30,6 +31,7 @@ @@ -30,6 +31,7 @@
#include "wallet/wallet.h"
#endif
#include <future>
#include <stdint.h>
#include <univalue.h>
@ -917,7 +919,9 @@ UniValue sendrawtransaction(const JSONRPCRequest& request) @@ -917,7 +919,9 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
);
ObserveSafeMode();
LOCK(cs_main);
std::promise<void> promise;
RPCTypeCheck(request.params, {UniValue::VSTR, UniValue::VBOOL});
// parse hex string from parameter
@ -931,6 +935,8 @@ UniValue sendrawtransaction(const JSONRPCRequest& request) @@ -931,6 +935,8 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
if (!request.params[1].isNull() && request.params[1].get_bool())
nMaxRawTxFee = 0;
{ // cs_main scope
LOCK(cs_main);
CCoinsViewCache &view = *pcoinsTip;
bool fHaveChain = false;
for (size_t o = 0; !fHaveChain && o < tx->vout.size(); o++) {
@ -952,10 +958,24 @@ UniValue sendrawtransaction(const JSONRPCRequest& request) @@ -952,10 +958,24 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
}
throw JSONRPCError(RPC_TRANSACTION_ERROR, state.GetRejectReason());
}
} else {
// If wallet is enabled, ensure that the wallet has been made aware
// of the new transaction prior to returning. This prevents a race
// where a user might call sendrawtransaction with a transaction
// to/from their wallet, immediately call some wallet RPC, and get
// a stale result because callbacks have not yet been processed.
CallFunctionInValidationInterfaceQueue([&promise] {
promise.set_value();
});
}
} else if (fHaveChain) {
throw JSONRPCError(RPC_TRANSACTION_ALREADY_IN_CHAIN, "transaction already in block chain");
}
} // cs_main
promise.get_future().wait();
if(!g_connman)
throw JSONRPCError(RPC_CLIENT_P2P_DISABLED, "Error: Peer-to-peer functionality missing or disabled");
@ -964,6 +984,7 @@ UniValue sendrawtransaction(const JSONRPCRequest& request) @@ -964,6 +984,7 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
{
pnode->PushInventory(inv);
});
return hashTx.GetHex();
}

10
src/sync.cpp

@ -155,6 +155,16 @@ void AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, @@ -155,6 +155,16 @@ void AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine,
abort();
}
void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs)
{
for (const std::pair<void*, CLockLocation>& i : *lockstack) {
if (i.first == cs) {
fprintf(stderr, "Assertion failed: lock %s held in %s:%i; locks held:\n%s", pszName, pszFile, nLine, LocksHeld().c_str());
abort();
}
}
}
void DeleteLock(void* cs)
{
if (!lockdata.available) {

3
src/sync.h

@ -77,14 +77,17 @@ void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs @@ -77,14 +77,17 @@ void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs
void LeaveCritical();
std::string LocksHeld();
void AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs);
void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs);
void DeleteLock(void* cs);
#else
void static inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs, bool fTry = false) {}
void static inline LeaveCritical() {}
void static inline AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) {}
void static inline AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) {}
void static inline DeleteLock(void* cs) {}
#endif
#define AssertLockHeld(cs) AssertLockHeldInternal(#cs, __FILE__, __LINE__, &cs)
#define AssertLockNotHeld(cs) AssertLockNotHeldInternal(#cs, __FILE__, __LINE__, &cs)
/**
* Wrapped mutex: supports recursive locking, but no waiting

3
src/txmempool.h

@ -513,6 +513,9 @@ public: @@ -513,6 +513,9 @@ public:
// to track size/count of descendant transactions. First version of
// addUnchecked can be used to have it call CalculateMemPoolAncestors(), and
// then invoke the second version.
// Note that addUnchecked is ONLY called from ATMP outside of tests
// and any other callers may break wallet's in-mempool tracking (due to
// lack of CValidationInterface::TransactionAddedToMempool callbacks).
bool addUnchecked(const uint256& hash, const CTxMemPoolEntry &entry, bool validFeeEstimate = true);
bool addUnchecked(const uint256& hash, const CTxMemPoolEntry &entry, setEntries &setAncestors, bool validFeeEstimate = true);

2
src/validation.cpp

@ -2494,7 +2494,7 @@ bool ActivateBestChain(CValidationState &state, const CChainParams& chainparams, @@ -2494,7 +2494,7 @@ bool ActivateBestChain(CValidationState &state, const CChainParams& chainparams,
for (const PerBlockConnectTrace& trace : connectTrace.GetBlocksConnected()) {
assert(trace.pblock && trace.pindex);
GetMainSignals().BlockConnected(trace.pblock, trace.pindex, *trace.conflictedTxs);
GetMainSignals().BlockConnected(trace.pblock, trace.pindex, trace.conflictedTxs);
}
}
// When we reach this point, we switched to a new tip (stored in pindexNewTip).

51
src/validationinterface.cpp

@ -9,6 +9,7 @@ @@ -9,6 +9,7 @@
#include "primitives/block.h"
#include "scheduler.h"
#include "sync.h"
#include "txmempool.h"
#include "util.h"
#include <list>
@ -21,6 +22,7 @@ struct MainSignalsInstance { @@ -21,6 +22,7 @@ struct MainSignalsInstance {
boost::signals2::signal<void (const CTransactionRef &)> TransactionAddedToMempool;
boost::signals2::signal<void (const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::vector<CTransactionRef>&)> BlockConnected;
boost::signals2::signal<void (const std::shared_ptr<const CBlock> &)> BlockDisconnected;
boost::signals2::signal<void (const CTransactionRef &)> TransactionRemovedFromMempool;
boost::signals2::signal<void (const CBlockLocator &)> SetBestChain;
boost::signals2::signal<void (const uint256 &)> Inventory;
boost::signals2::signal<void (int64_t nBestBlockTime, CConnman* connman)> Broadcast;
@ -50,6 +52,14 @@ void CMainSignals::FlushBackgroundCallbacks() { @@ -50,6 +52,14 @@ void CMainSignals::FlushBackgroundCallbacks() {
m_internals->m_schedulerClient.EmptyQueue();
}
void CMainSignals::RegisterWithMempoolSignals(CTxMemPool& pool) {
pool.NotifyEntryRemoved.connect(boost::bind(&CMainSignals::MempoolEntryRemoved, this, _1, _2));
}
void CMainSignals::UnregisterWithMempoolSignals(CTxMemPool& pool) {
pool.NotifyEntryRemoved.disconnect(boost::bind(&CMainSignals::MempoolEntryRemoved, this, _1, _2));
}
CMainSignals& GetMainSignals()
{
return g_signals;
@ -60,6 +70,7 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn) { @@ -60,6 +70,7 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.m_internals->TransactionAddedToMempool.connect(boost::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, _1));
g_signals.m_internals->BlockConnected.connect(boost::bind(&CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3));
g_signals.m_internals->BlockDisconnected.connect(boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1));
g_signals.m_internals->TransactionRemovedFromMempool.connect(boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1));
g_signals.m_internals->SetBestChain.connect(boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1));
g_signals.m_internals->Inventory.connect(boost::bind(&CValidationInterface::Inventory, pwalletIn, _1));
g_signals.m_internals->Broadcast.connect(boost::bind(&CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2));
@ -75,6 +86,7 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) { @@ -75,6 +86,7 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.m_internals->TransactionAddedToMempool.disconnect(boost::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, _1));
g_signals.m_internals->BlockConnected.disconnect(boost::bind(&CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3));
g_signals.m_internals->BlockDisconnected.disconnect(boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1));
g_signals.m_internals->TransactionRemovedFromMempool.disconnect(boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1));
g_signals.m_internals->UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3));
g_signals.m_internals->NewPoWValidBlock.disconnect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2));
}
@ -87,32 +99,57 @@ void UnregisterAllValidationInterfaces() { @@ -87,32 +99,57 @@ void UnregisterAllValidationInterfaces() {
g_signals.m_internals->TransactionAddedToMempool.disconnect_all_slots();
g_signals.m_internals->BlockConnected.disconnect_all_slots();
g_signals.m_internals->BlockDisconnected.disconnect_all_slots();
g_signals.m_internals->TransactionRemovedFromMempool.disconnect_all_slots();
g_signals.m_internals->UpdatedBlockTip.disconnect_all_slots();
g_signals.m_internals->NewPoWValidBlock.disconnect_all_slots();
}
void CallFunctionInValidationInterfaceQueue(std::function<void ()> func) {
g_signals.m_internals->m_schedulerClient.AddToProcessQueue(std::move(func));
}
void CMainSignals::MempoolEntryRemoved(CTransactionRef ptx, MemPoolRemovalReason reason) {
if (reason != MemPoolRemovalReason::BLOCK && reason != MemPoolRemovalReason::CONFLICT) {
m_internals->m_schedulerClient.AddToProcessQueue([ptx, this] {
m_internals->TransactionRemovedFromMempool(ptx);
});
}
}
void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
m_internals->UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload);
m_internals->m_schedulerClient.AddToProcessQueue([pindexNew, pindexFork, fInitialDownload, this] {
m_internals->UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload);
});
}
void CMainSignals::TransactionAddedToMempool(const CTransactionRef &ptx) {
m_internals->TransactionAddedToMempool(ptx);
m_internals->m_schedulerClient.AddToProcessQueue([ptx, this] {
m_internals->TransactionAddedToMempool(ptx);
});
}
void CMainSignals::BlockConnected(const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex, const std::vector<CTransactionRef>& vtxConflicted) {
m_internals->BlockConnected(pblock, pindex, vtxConflicted);
void CMainSignals::BlockConnected(const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex, const std::shared_ptr<const std::vector<CTransactionRef>>& pvtxConflicted) {
m_internals->m_schedulerClient.AddToProcessQueue([pblock, pindex, pvtxConflicted, this] {
m_internals->BlockConnected(pblock, pindex, *pvtxConflicted);
});
}
void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock> &pblock) {
m_internals->BlockDisconnected(pblock);
m_internals->m_schedulerClient.AddToProcessQueue([pblock, this] {
m_internals->BlockDisconnected(pblock);
});
}
void CMainSignals::SetBestChain(const CBlockLocator &locator) {
m_internals->SetBestChain(locator);
m_internals->m_schedulerClient.AddToProcessQueue([locator, this] {
m_internals->SetBestChain(locator);
});
}
void CMainSignals::Inventory(const uint256 &hash) {
m_internals->Inventory(hash);
m_internals->m_schedulerClient.AddToProcessQueue([hash, this] {
m_internals->Inventory(hash);
});
}
void CMainSignals::Broadcast(int64_t nBestBlockTime, CConnman* connman) {

70
src/validationinterface.h

@ -6,10 +6,11 @@ @@ -6,10 +6,11 @@
#ifndef BITCOIN_VALIDATIONINTERFACE_H
#define BITCOIN_VALIDATIONINTERFACE_H
#include <memory>
#include "primitives/transaction.h" // CTransaction(Ref)
#include <functional>
#include <memory>
class CBlock;
class CBlockIndex;
struct CBlockLocator;
@ -20,6 +21,8 @@ class CValidationInterface; @@ -20,6 +21,8 @@ class CValidationInterface;
class CValidationState;
class uint256;
class CScheduler;
class CTxMemPool;
enum class MemPoolRemovalReason;
// These functions dispatch to one or all registered wallets
@ -29,23 +32,66 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn); @@ -29,23 +32,66 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn);
void UnregisterValidationInterface(CValidationInterface* pwalletIn);
/** Unregister all wallets from core */
void UnregisterAllValidationInterfaces();
/**
* Pushes a function to callback onto the notification queue, guaranteeing any
* callbacks generated prior to now are finished when the function is called.
*
* Be very careful blocking on func to be called if any locks are held -
* validation interface clients may not be able to make progress as they often
* wait for things like cs_main, so blocking until func is called with cs_main
* will result in a deadlock (that DEBUG_LOCKORDER will miss).
*/
void CallFunctionInValidationInterfaceQueue(std::function<void ()> func);
class CValidationInterface {
protected:
/** Notifies listeners of updated block chain tip */
/**
* Notifies listeners of updated block chain tip
*
* Called on a background thread.
*/
virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {}
/** Notifies listeners of a transaction having been added to mempool. */
/**
* Notifies listeners of a transaction having been added to mempool.
*
* Called on a background thread.
*/
virtual void TransactionAddedToMempool(const CTransactionRef &ptxn) {}
/**
* Notifies listeners of a transaction leaving mempool.
*
* This only fires for transactions which leave mempool because of expiry,
* size limiting, reorg (changes in lock times/coinbase maturity), or
* replacement. This does not include any transactions which are included
* in BlockConnectedDisconnected either in block->vtx or in txnConflicted.
*
* Called on a background thread.
*/
virtual void TransactionRemovedFromMempool(const CTransactionRef &ptx) {}
/**
* Notifies listeners of a block being connected.
* Provides a vector of transactions evicted from the mempool as a result.
*
* Called on a background thread.
*/
virtual void BlockConnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex *pindex, const std::vector<CTransactionRef> &txnConflicted) {}
/** Notifies listeners of a block being disconnected */
/**
* Notifies listeners of a block being disconnected
*
* Called on a background thread.
*/
virtual void BlockDisconnected(const std::shared_ptr<const CBlock> &block) {}
/** Notifies listeners of the new active block chain on-disk. */
/**
* Notifies listeners of the new active block chain on-disk.
*
* Called on a background thread.
*/
virtual void SetBestChain(const CBlockLocator &locator) {}
/** Notifies listeners about an inventory item being seen on the network. */
/**
* Notifies listeners about an inventory item being seen on the network.
*
* Called on a background thread.
*/
virtual void Inventory(const uint256 &hash) {}
/** Tells listeners to broadcast their data. */
virtual void ResendWalletTransactions(int64_t nBestBlockTime, CConnman* connman) {}
@ -73,6 +119,9 @@ private: @@ -73,6 +119,9 @@ private:
friend void ::RegisterValidationInterface(CValidationInterface*);
friend void ::UnregisterValidationInterface(CValidationInterface*);
friend void ::UnregisterAllValidationInterfaces();
friend void ::CallFunctionInValidationInterfaceQueue(std::function<void ()> func);
void MempoolEntryRemoved(CTransactionRef tx, MemPoolRemovalReason reason);
public:
/** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
@ -82,9 +131,14 @@ public: @@ -82,9 +131,14 @@ public:
/** Call any remaining callbacks on the calling thread */
void FlushBackgroundCallbacks();
/** Register with mempool to call TransactionRemovedFromMempool callbacks */
void RegisterWithMempoolSignals(CTxMemPool& pool);
/** Unregister with mempool */
void UnregisterWithMempoolSignals(CTxMemPool& pool);
void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
void TransactionAddedToMempool(const CTransactionRef &);
void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::vector<CTransactionRef> &);
void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::shared_ptr<const std::vector<CTransactionRef>> &);
void BlockDisconnected(const std::shared_ptr<const CBlock> &);
void SetBestChain(const CBlockLocator &);
void Inventory(const uint256 &);

101
src/wallet/rpcwallet.cpp

@ -455,6 +455,11 @@ UniValue sendtoaddress(const JSONRPCRequest& request) @@ -455,6 +455,11 @@ UniValue sendtoaddress(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
CTxDestination dest = DecodeDestination(request.params[0].get_str());
@ -533,6 +538,11 @@ UniValue listaddressgroupings(const JSONRPCRequest& request) @@ -533,6 +538,11 @@ UniValue listaddressgroupings(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
UniValue jsonGroupings(UniValue::VARR);
@ -645,6 +655,11 @@ UniValue getreceivedbyaddress(const JSONRPCRequest& request) @@ -645,6 +655,11 @@ UniValue getreceivedbyaddress(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
// Bitcoin address
@ -707,6 +722,11 @@ UniValue getreceivedbyaccount(const JSONRPCRequest& request) @@ -707,6 +722,11 @@ UniValue getreceivedbyaccount(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
// Minimum confirmations
@ -780,6 +800,11 @@ UniValue getbalance(const JSONRPCRequest& request) @@ -780,6 +800,11 @@ UniValue getbalance(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
const UniValue& account_value = request.params[0];
@ -825,6 +850,11 @@ UniValue getunconfirmedbalance(const JSONRPCRequest &request) @@ -825,6 +850,11 @@ UniValue getunconfirmedbalance(const JSONRPCRequest &request)
"Returns the server's total unconfirmed balance\n");
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
return ValueFromAmount(pwallet->GetUnconfirmedBalance());
@ -919,6 +949,11 @@ UniValue sendfrom(const JSONRPCRequest& request) @@ -919,6 +949,11 @@ UniValue sendfrom(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
std::string strAccount = AccountFromValue(request.params[0]);
@ -1004,6 +1039,11 @@ UniValue sendmany(const JSONRPCRequest& request) @@ -1004,6 +1039,11 @@ UniValue sendmany(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
if (pwallet->GetBroadcastTransactions() && !g_connman) {
@ -1455,6 +1495,11 @@ UniValue listreceivedbyaddress(const JSONRPCRequest& request) @@ -1455,6 +1495,11 @@ UniValue listreceivedbyaddress(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
return ListReceived(pwallet, request.params, false);
@ -1495,6 +1540,11 @@ UniValue listreceivedbyaccount(const JSONRPCRequest& request) @@ -1495,6 +1540,11 @@ UniValue listreceivedbyaccount(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
return ListReceived(pwallet, request.params, true);
@ -1683,6 +1733,11 @@ UniValue listtransactions(const JSONRPCRequest& request) @@ -1683,6 +1733,11 @@ UniValue listtransactions(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
std::string strAccount = "*";
@ -1777,6 +1832,11 @@ UniValue listaccounts(const JSONRPCRequest& request) @@ -1777,6 +1832,11 @@ UniValue listaccounts(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
int nMinDepth = 1;
@ -1886,6 +1946,11 @@ UniValue listsinceblock(const JSONRPCRequest& request) @@ -1886,6 +1946,11 @@ UniValue listsinceblock(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
const CBlockIndex* pindex = nullptr; // Block index of the specified block or the common ancestor, if the block provided was in a deactivated chain.
@ -2019,6 +2084,11 @@ UniValue gettransaction(const JSONRPCRequest& request) @@ -2019,6 +2084,11 @@ UniValue gettransaction(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
uint256 hash;
@ -2081,6 +2151,11 @@ UniValue abandontransaction(const JSONRPCRequest& request) @@ -2081,6 +2151,11 @@ UniValue abandontransaction(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
uint256 hash;
@ -2115,6 +2190,10 @@ UniValue backupwallet(const JSONRPCRequest& request) @@ -2115,6 +2190,10 @@ UniValue backupwallet(const JSONRPCRequest& request)
+ HelpExampleRpc("backupwallet", "\"backup.dat\"")
);
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
std::string strDest = request.params[0].get_str();
@ -2434,6 +2513,10 @@ UniValue lockunspent(const JSONRPCRequest& request) @@ -2434,6 +2513,10 @@ UniValue lockunspent(const JSONRPCRequest& request)
+ HelpExampleRpc("lockunspent", "false, \"[{\\\"txid\\\":\\\"a08e6907dbbd3d809776dbfc5d82e371b764ed838b5655e72f463568df1aadf0\\\",\\\"vout\\\":1}]\"")
);
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
RPCTypeCheckArgument(request.params[0], UniValue::VBOOL);
@ -2593,6 +2676,11 @@ UniValue getwalletinfo(const JSONRPCRequest& request) @@ -2593,6 +2676,11 @@ UniValue getwalletinfo(const JSONRPCRequest& request)
);
ObserveSafeMode();
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
UniValue obj(UniValue::VOBJ);
@ -2802,9 +2890,12 @@ UniValue listunspent(const JSONRPCRequest& request) @@ -2802,9 +2890,12 @@ UniValue listunspent(const JSONRPCRequest& request)
nMaximumCount = options["maximumCount"].get_int64();
}
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
UniValue results(UniValue::VARR);
std::vector<COutput> vecOutputs;
assert(pwallet != nullptr);
LOCK2(cs_main, pwallet->cs_wallet);
pwallet->AvailableCoins(vecOutputs, !include_unsafe, nullptr, nMinimumAmount, nMaximumAmount, nMinimumSumAmount, nMaximumCount, nMinDepth, nMaxDepth);
@ -2912,6 +3003,10 @@ UniValue fundrawtransaction(const JSONRPCRequest& request) @@ -2912,6 +3003,10 @@ UniValue fundrawtransaction(const JSONRPCRequest& request)
ObserveSafeMode();
RPCTypeCheck(request.params, {UniValue::VSTR});
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
CCoinControl coinControl;
int changePosition = -1;
bool lockUnspents = false;
@ -3122,6 +3217,10 @@ UniValue bumpfee(const JSONRPCRequest& request) @@ -3122,6 +3217,10 @@ UniValue bumpfee(const JSONRPCRequest& request)
}
}
// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwallet->BlockUntilSyncedToCurrentChain();
LOCK2(cs_main, pwallet->cs_wallet);
EnsureWalletIsUnlocked(pwallet);

76
src/wallet/wallet.cpp

@ -33,6 +33,7 @@ @@ -33,6 +33,7 @@
#include "wallet/fees.h"
#include <assert.h>
#include <future>
#include <boost/algorithm/string/replace.hpp>
#include <boost/thread.hpp>
@ -1217,6 +1218,19 @@ void CWallet::SyncTransaction(const CTransactionRef& ptx, const CBlockIndex *pin @@ -1217,6 +1218,19 @@ void CWallet::SyncTransaction(const CTransactionRef& ptx, const CBlockIndex *pin
void CWallet::TransactionAddedToMempool(const CTransactionRef& ptx) {
LOCK2(cs_main, cs_wallet);
SyncTransaction(ptx);
auto it = mapWallet.find(ptx->GetHash());
if (it != mapWallet.end()) {
it->second.fInMempool = true;
}
}
void CWallet::TransactionRemovedFromMempool(const CTransactionRef &ptx) {
LOCK(cs_wallet);
auto it = mapWallet.find(ptx->GetHash());
if (it != mapWallet.end()) {
it->second.fInMempool = false;
}
}
void CWallet::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex *pindex, const std::vector<CTransactionRef>& vtxConflicted) {
@ -1231,10 +1245,14 @@ void CWallet::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const @@ -1231,10 +1245,14 @@ void CWallet::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const
for (const CTransactionRef& ptx : vtxConflicted) {
SyncTransaction(ptx);
TransactionRemovedFromMempool(ptx);
}
for (size_t i = 0; i < pblock->vtx.size(); i++) {
SyncTransaction(pblock->vtx[i], pindex, i);
TransactionRemovedFromMempool(pblock->vtx[i]);
}
m_last_block_processed = pindex;
}
void CWallet::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock) {
@ -1247,6 +1265,36 @@ void CWallet::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock) { @@ -1247,6 +1265,36 @@ void CWallet::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock) {
void CWallet::BlockUntilSyncedToCurrentChain() {
AssertLockNotHeld(cs_main);
AssertLockNotHeld(cs_wallet);
{
// Skip the queue-draining stuff if we know we're caught up with
// chainActive.Tip()...
// We could also take cs_wallet here, and call m_last_block_processed
// protected by cs_wallet instead of cs_main, but as long as we need
// cs_main here anyway, its easier to just call it cs_main-protected.
LOCK(cs_main);
const CBlockIndex* initialChainTip = chainActive.Tip();
if (m_last_block_processed->GetAncestor(initialChainTip->nHeight) == initialChainTip) {
return;
}
}
// ...otherwise put a callback in the validation interface queue and wait
// for the queue to drain enough to execute it (indicating we are caught up
// at least with the time we entered this function).
std::promise<void> promise;
CallFunctionInValidationInterfaceQueue([&promise] {
promise.set_value();
});
promise.get_future().wait();
}
isminetype CWallet::IsMine(const CTxIn &txin) const
{
{
@ -1871,8 +1919,7 @@ CAmount CWalletTx::GetChange() const @@ -1871,8 +1919,7 @@ CAmount CWalletTx::GetChange() const
bool CWalletTx::InMempool() const
{
LOCK(mempool.cs);
return mempool.exists(GetHash());
return fInMempool;
}
bool CWalletTx::IsTrusted() const
@ -2980,14 +3027,18 @@ bool CWallet::CommitTransaction(CWalletTx& wtxNew, CReserveKey& reservekey, CCon @@ -2980,14 +3027,18 @@ bool CWallet::CommitTransaction(CWalletTx& wtxNew, CReserveKey& reservekey, CCon
// Track how many getdata requests our transaction gets
mapRequestCount[wtxNew.GetHash()] = 0;
// Get the inserted-CWalletTx from mapWallet so that the
// fInMempool flag is cached properly
CWalletTx& wtx = mapWallet[wtxNew.GetHash()];
if (fBroadcastTransactions)
{
// Broadcast
if (!wtxNew.AcceptToMemoryPool(maxTxFee, state)) {
if (!wtx.AcceptToMemoryPool(maxTxFee, state)) {
LogPrintf("CommitTransaction(): Transaction cannot be broadcast immediately, %s\n", state.GetRejectReason());
// TODO: if we expect the failure to be long term or permanent, instead delete wtx from the wallet and return failure.
} else {
wtxNew.RelayWalletTransaction(connman);
wtx.RelayWalletTransaction(connman);
}
}
}
@ -3903,8 +3954,6 @@ CWallet* CWallet::CreateWalletFromFile(const std::string walletFile) @@ -3903,8 +3954,6 @@ CWallet* CWallet::CreateWalletFromFile(const std::string walletFile)
LogPrintf(" wallet %15dms\n", GetTimeMillis() - nStart);
RegisterValidationInterface(walletInstance);
// Try to top up keypool. No-op if the wallet is locked.
walletInstance->TopUpKeyPool();
@ -3916,6 +3965,10 @@ CWallet* CWallet::CreateWalletFromFile(const std::string walletFile) @@ -3916,6 +3965,10 @@ CWallet* CWallet::CreateWalletFromFile(const std::string walletFile)
if (walletdb.ReadBestBlock(locator))
pindexRescan = FindForkInGlobalIndex(chainActive, locator);
}
walletInstance->m_last_block_processed = chainActive.Tip();
RegisterValidationInterface(walletInstance);
if (chainActive.Tip() && chainActive.Tip() != pindexRescan)
{
//We can't rescan beyond non-pruned blocks, stop and throw an error
@ -4059,8 +4112,15 @@ int CMerkleTx::GetBlocksToMaturity() const @@ -4059,8 +4112,15 @@ int CMerkleTx::GetBlocksToMaturity() const
}
bool CMerkleTx::AcceptToMemoryPool(const CAmount& nAbsurdFee, CValidationState& state)
bool CWalletTx::AcceptToMemoryPool(const CAmount& nAbsurdFee, CValidationState& state)
{
return ::AcceptToMemoryPool(mempool, state, tx, nullptr /* pfMissingInputs */,
// We must set fInMempool here - while it will be re-set to true by the
// entered-mempool callback, if we did not there would be a race where a
// user could call sendmoney in a loop and hit spurious out of funds errors
// because we think that the transaction they just generated's change is
// unavailable as we're not yet aware its in mempool.
bool ret = ::AcceptToMemoryPool(mempool, state, tx, nullptr /* pfMissingInputs */,
nullptr /* plTxnReplaced */, false /* bypass_limits */, nAbsurdFee);
fInMempool = ret;
return ret;
}

28
src/wallet/wallet.h

@ -248,8 +248,6 @@ public: @@ -248,8 +248,6 @@ public:
int GetDepthInMainChain() const { const CBlockIndex *pindexRet; return GetDepthInMainChain(pindexRet); }
bool IsInMainChain() const { const CBlockIndex *pindexRet; return GetDepthInMainChain(pindexRet) > 0; }
int GetBlocksToMaturity() const;
/** Pass this transaction to the mempool. Fails if absolute fee exceeds absurd fee. */
bool AcceptToMemoryPool(const CAmount& nAbsurdFee, CValidationState& state);
bool hashUnset() const { return (hashBlock.IsNull() || hashBlock == ABANDON_HASH); }
bool isAbandoned() const { return (hashBlock == ABANDON_HASH); }
void setAbandoned() { hashBlock = ABANDON_HASH; }
@ -326,6 +324,7 @@ public: @@ -326,6 +324,7 @@ public:
mutable bool fImmatureWatchCreditCached;
mutable bool fAvailableWatchCreditCached;
mutable bool fChangeCached;
mutable bool fInMempool;
mutable CAmount nDebitCached;
mutable CAmount nCreditCached;
mutable CAmount nImmatureCreditCached;
@ -365,6 +364,7 @@ public: @@ -365,6 +364,7 @@ public:
fImmatureWatchCreditCached = false;
fAvailableWatchCreditCached = false;
fChangeCached = false;
fInMempool = false;
nDebitCached = 0;
nCreditCached = 0;
nImmatureCreditCached = 0;
@ -469,6 +469,9 @@ public: @@ -469,6 +469,9 @@ public:
// RelayWalletTransaction may only be called if fBroadcastTransactions!
bool RelayWalletTransaction(CConnman* connman);
/** Pass this transaction to the mempool. Fails if absolute fee exceeds absurd fee. */
bool AcceptToMemoryPool(const CAmount& nAbsurdFee, CValidationState& state);
std::set<uint256> GetConflicts() const;
};
@ -718,6 +721,18 @@ private: @@ -718,6 +721,18 @@ private:
std::unique_ptr<CWalletDBWrapper> dbw;
/**
* The following is used to keep track of how far behind the wallet is
* from the chain sync, and to allow clients to block on us being caught up.
*
* Note that this is *not* how far we've processed, we may need some rescan
* to have seen all transactions in the chain, but is only used to track
* live BlockConnected callbacks.
*
* Protected by cs_main (see BlockUntilSyncedToCurrentChain)
*/
const CBlockIndex* m_last_block_processed;
public:
/*
* Main wallet lock.
@ -916,6 +931,7 @@ public: @@ -916,6 +931,7 @@ public:
bool AddToWalletIfInvolvingMe(const CTransactionRef& tx, const CBlockIndex* pIndex, int posInBlock, bool fUpdate);
int64_t RescanFromTime(int64_t startTime, bool update);
CBlockIndex* ScanForWalletTransactions(CBlockIndex* pindexStart, CBlockIndex* pindexStop, bool fUpdate = false);
void TransactionRemovedFromMempool(const CTransactionRef &ptx) override;
void ReacceptWalletTransactions();
void ResendWalletTransactions(int64_t nBestBlockTime, CConnman* connman) override;
// ResendWalletTransactionsBefore may only be called if fBroadcastTransactions!
@ -1102,6 +1118,14 @@ public: @@ -1102,6 +1118,14 @@ public:
caller must ensure the current wallet version is correct before calling
this function). */
bool SetHDMasterKey(const CPubKey& key);
/**
* Blocks until the wallet state is up-to-date to /at least/ the current
* chain at the time this function is entered
* Obviously holding cs_main/cs_wallet when going into this call may cause
* deadlock
*/
void BlockUntilSyncedToCurrentChain();
};
/** A key allocated from the key pool. */

Loading…
Cancel
Save