Browse Source

Refactor ProcessGetData in anticipation of avoiding cs_main for ABC

0.16
Matt Corallo 7 years ago
parent
commit
66aa1d58a1
  1. 321
      src/net_processing.cpp

321
src/net_processing.cpp

@ -1038,182 +1038,193 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman* connma
connman->ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); connman->ForEachNodeThen(std::move(sortfunc), std::move(pushfunc));
} }
void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman* connman, const std::atomic<bool>& interruptMsgProc) void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensusParams, const CInv& inv, CConnman* connman, const std::atomic<bool>& interruptMsgProc)
{ {
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
std::vector<CInv> vNotFound;
const CNetMsgMaker msgMaker(pfrom->GetSendVersion());
LOCK(cs_main); LOCK(cs_main);
while (it != pfrom->vRecvGetData.end()) { bool send = false;
// Don't bother if send buffer is too full to respond anyway std::shared_ptr<const CBlock> a_recent_block;
if (pfrom->fPauseSend) std::shared_ptr<const CBlockHeaderAndShortTxIDs> a_recent_compact_block;
break; bool fWitnessesPresentInARecentCompactBlock;
{
LOCK(cs_most_recent_block);
a_recent_block = most_recent_block;
a_recent_compact_block = most_recent_compact_block;
fWitnessesPresentInARecentCompactBlock = fWitnessesPresentInMostRecentCompactBlock;
}
const CInv &inv = *it; {
BlockMap::iterator mi = mapBlockIndex.find(inv.hash);
if (mi != mapBlockIndex.end())
{ {
if (interruptMsgProc) if (mi->second->nChainTx && !mi->second->IsValid(BLOCK_VALID_SCRIPTS) &&
return; mi->second->IsValid(BLOCK_VALID_TREE)) {
// If we have the block and all of its parents, but have not yet validated it,
// we might be in the middle of connecting it (ie in the unlock of cs_main
// before ActivateBestChain but after AcceptBlock).
// In this case, we need to run ActivateBestChain prior to checking the relay
// conditions below.
CValidationState dummy;
ActivateBestChain(dummy, Params(), a_recent_block);
}
}
}
BlockMap::iterator mi = mapBlockIndex.find(inv.hash);
if (mi != mapBlockIndex.end()) {
send = BlockRequestAllowed(mi->second, consensusParams);
if (!send) {
LogPrint(BCLog::NET, "%s: ignoring request from peer=%i for old block that isn't in the main chain\n", __func__, pfrom->GetId());
}
}
const CNetMsgMaker msgMaker(pfrom->GetSendVersion());
// disconnect node in case we have reached the outbound limit for serving historical blocks
// never disconnect whitelisted nodes
if (send && connman->OutboundTargetReached(true) && ( ((pindexBestHeader != nullptr) && (pindexBestHeader->GetBlockTime() - mi->second->GetBlockTime() > HISTORICAL_BLOCK_AGE)) || inv.type == MSG_FILTERED_BLOCK) && !pfrom->fWhitelisted)
{
LogPrint(BCLog::NET, "historical block serving limit reached, disconnect peer=%d\n", pfrom->GetId());
it++; //disconnect node
pfrom->fDisconnect = true;
send = false;
}
// Avoid leaking prune-height by never sending blocks below the NODE_NETWORK_LIMITED threshold
if (send && !pfrom->fWhitelisted && (
(((pfrom->GetLocalServices() & NODE_NETWORK_LIMITED) == NODE_NETWORK_LIMITED) && ((pfrom->GetLocalServices() & NODE_NETWORK) != NODE_NETWORK) && (chainActive.Tip()->nHeight - mi->second->nHeight > (int)NODE_NETWORK_LIMITED_MIN_BLOCKS + 2 /* add two blocks buffer extension for possible races */) )
)) {
LogPrint(BCLog::NET, "Ignore block request below NODE_NETWORK_LIMITED threshold from peer=%d\n", pfrom->GetId());
if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) //disconnect node and prevent it from stalling (would otherwise wait for the missing block)
pfrom->fDisconnect = true;
send = false;
}
// Pruned nodes may have deleted the block, so check whether
// it's available before trying to send.
if (send && (mi->second->nStatus & BLOCK_HAVE_DATA))
{
std::shared_ptr<const CBlock> pblock;
if (a_recent_block && a_recent_block->GetHash() == (*mi).second->GetBlockHash()) {
pblock = a_recent_block;
} else {
// Send block from disk
std::shared_ptr<CBlock> pblockRead = std::make_shared<CBlock>();
if (!ReadBlockFromDisk(*pblockRead, (*mi).second, consensusParams))
assert(!"cannot load block from disk");
pblock = pblockRead;
}
if (inv.type == MSG_BLOCK)
connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, *pblock));
else if (inv.type == MSG_WITNESS_BLOCK)
connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock));
else if (inv.type == MSG_FILTERED_BLOCK)
{
bool sendMerkleBlock = false;
CMerkleBlock merkleBlock;
{ {
bool send = false; LOCK(pfrom->cs_filter);
BlockMap::iterator mi = mapBlockIndex.find(inv.hash); if (pfrom->pfilter) {
std::shared_ptr<const CBlock> a_recent_block; sendMerkleBlock = true;
std::shared_ptr<const CBlockHeaderAndShortTxIDs> a_recent_compact_block; merkleBlock = CMerkleBlock(*pblock, *pfrom->pfilter);
bool fWitnessesPresentInARecentCompactBlock;
{
LOCK(cs_most_recent_block);
a_recent_block = most_recent_block;
a_recent_compact_block = most_recent_compact_block;
fWitnessesPresentInARecentCompactBlock = fWitnessesPresentInMostRecentCompactBlock;
} }
if (mi != mapBlockIndex.end()) }
{ if (sendMerkleBlock) {
if (mi->second->nChainTx && !mi->second->IsValid(BLOCK_VALID_SCRIPTS) && connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::MERKLEBLOCK, merkleBlock));
mi->second->IsValid(BLOCK_VALID_TREE)) { // CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see
// If we have the block and all of its parents, but have not yet validated it, // This avoids hurting performance by pointlessly requiring a round-trip
// we might be in the middle of connecting it (ie in the unlock of cs_main // Note that there is currently no way for a node to request any single transactions we didn't send here -
// before ActivateBestChain but after AcceptBlock). // they must either disconnect and retry or request the full block.
// In this case, we need to run ActivateBestChain prior to checking the relay // Thus, the protocol spec specified allows for us to provide duplicate txn here,
// conditions below. // however we MUST always provide at least what the remote peer needs
CValidationState dummy; typedef std::pair<unsigned int, uint256> PairType;
ActivateBestChain(dummy, Params(), a_recent_block); for (PairType& pair : merkleBlock.vMatchedTxn)
} connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, *pblock->vtx[pair.first]));
send = BlockRequestAllowed(mi->second, consensusParams); }
if (!send) { // else
LogPrint(BCLog::NET, "%s: ignoring request from peer=%i for old block that isn't in the main chain\n", __func__, pfrom->GetId()); // no response
} }
else if (inv.type == MSG_CMPCT_BLOCK)
{
// If a peer is asking for old blocks, we're almost guaranteed
// they won't have a useful mempool to match against a compact block,
// and we don't feel like constructing the object for them, so
// instead we respond with the full, non-compact block.
bool fPeerWantsWitness = State(pfrom->GetId())->fWantsCmpctWitness;
int nSendFlags = fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
if (CanDirectFetch(consensusParams) && mi->second->nHeight >= chainActive.Height() - MAX_CMPCTBLOCK_DEPTH) {
if ((fPeerWantsWitness || !fWitnessesPresentInARecentCompactBlock) && a_recent_compact_block && a_recent_compact_block->header.GetHash() == mi->second->GetBlockHash()) {
connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *a_recent_compact_block));
} else {
CBlockHeaderAndShortTxIDs cmpctblock(*pblock, fPeerWantsWitness);
connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
} }
// disconnect node in case we have reached the outbound limit for serving historical blocks } else {
// never disconnect whitelisted nodes connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCK, *pblock));
if (send && connman->OutboundTargetReached(true) && ( ((pindexBestHeader != nullptr) && (pindexBestHeader->GetBlockTime() - mi->second->GetBlockTime() > HISTORICAL_BLOCK_AGE)) || inv.type == MSG_FILTERED_BLOCK) && !pfrom->fWhitelisted) }
{ }
LogPrint(BCLog::NET, "historical block serving limit reached, disconnect peer=%d\n", pfrom->GetId());
//disconnect node // Trigger the peer node to send a getblocks request for the next batch of inventory
pfrom->fDisconnect = true; if (inv.hash == pfrom->hashContinue)
send = false; {
} // Bypass PushInventory, this must send even if redundant,
// Avoid leaking prune-height by never sending blocks below the NODE_NETWORK_LIMITED threshold // and we want it right after the last block so they don't
if (send && !pfrom->fWhitelisted && ( // wait for other stuff first.
(((pfrom->GetLocalServices() & NODE_NETWORK_LIMITED) == NODE_NETWORK_LIMITED) && ((pfrom->GetLocalServices() & NODE_NETWORK) != NODE_NETWORK) && (chainActive.Tip()->nHeight - mi->second->nHeight > (int)NODE_NETWORK_LIMITED_MIN_BLOCKS + 2 /* add two blocks buffer extension for possible races */) ) std::vector<CInv> vInv;
)) { vInv.push_back(CInv(MSG_BLOCK, chainActive.Tip()->GetBlockHash()));
LogPrint(BCLog::NET, "Ignore block request below NODE_NETWORK_LIMITED threshold from peer=%d\n", pfrom->GetId()); connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::INV, vInv));
pfrom->hashContinue.SetNull();
}
}
}
//disconnect node and prevent it from stalling (would otherwise wait for the missing block) void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman* connman, const std::atomic<bool>& interruptMsgProc)
pfrom->fDisconnect = true; {
send = false; std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
} std::vector<CInv> vNotFound;
// Pruned nodes may have deleted the block, so check whether const CNetMsgMaker msgMaker(pfrom->GetSendVersion());
// it's available before trying to send. {
if (send && (mi->second->nStatus & BLOCK_HAVE_DATA)) LOCK(cs_main);
{
std::shared_ptr<const CBlock> pblock;
if (a_recent_block && a_recent_block->GetHash() == (*mi).second->GetBlockHash()) {
pblock = a_recent_block;
} else {
// Send block from disk
std::shared_ptr<CBlock> pblockRead = std::make_shared<CBlock>();
if (!ReadBlockFromDisk(*pblockRead, (*mi).second, consensusParams))
assert(!"cannot load block from disk");
pblock = pblockRead;
}
if (inv.type == MSG_BLOCK)
connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, *pblock));
else if (inv.type == MSG_WITNESS_BLOCK)
connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock));
else if (inv.type == MSG_FILTERED_BLOCK)
{
bool sendMerkleBlock = false;
CMerkleBlock merkleBlock;
{
LOCK(pfrom->cs_filter);
if (pfrom->pfilter) {
sendMerkleBlock = true;
merkleBlock = CMerkleBlock(*pblock, *pfrom->pfilter);
}
}
if (sendMerkleBlock) {
connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::MERKLEBLOCK, merkleBlock));
// CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see
// This avoids hurting performance by pointlessly requiring a round-trip
// Note that there is currently no way for a node to request any single transactions we didn't send here -
// they must either disconnect and retry or request the full block.
// Thus, the protocol spec specified allows for us to provide duplicate txn here,
// however we MUST always provide at least what the remote peer needs
typedef std::pair<unsigned int, uint256> PairType;
for (PairType& pair : merkleBlock.vMatchedTxn)
connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, *pblock->vtx[pair.first]));
}
// else
// no response
}
else if (inv.type == MSG_CMPCT_BLOCK)
{
// If a peer is asking for old blocks, we're almost guaranteed
// they won't have a useful mempool to match against a compact block,
// and we don't feel like constructing the object for them, so
// instead we respond with the full, non-compact block.
bool fPeerWantsWitness = State(pfrom->GetId())->fWantsCmpctWitness;
int nSendFlags = fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
if (CanDirectFetch(consensusParams) && mi->second->nHeight >= chainActive.Height() - MAX_CMPCTBLOCK_DEPTH) {
if ((fPeerWantsWitness || !fWitnessesPresentInARecentCompactBlock) && a_recent_compact_block && a_recent_compact_block->header.GetHash() == mi->second->GetBlockHash()) {
connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *a_recent_compact_block));
} else {
CBlockHeaderAndShortTxIDs cmpctblock(*pblock, fPeerWantsWitness);
connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
}
} else {
connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCK, *pblock));
}
}
// Trigger the peer node to send a getblocks request for the next batch of inventory while (it != pfrom->vRecvGetData.end() && (it->type == MSG_TX || it->type == MSG_WITNESS_TX)) {
if (inv.hash == pfrom->hashContinue) if (interruptMsgProc)
{ return;
// Bypass PushInventory, this must send even if redundant, // Don't bother if send buffer is too full to respond anyway
// and we want it right after the last block so they don't if (pfrom->fPauseSend)
// wait for other stuff first. break;
std::vector<CInv> vInv;
vInv.push_back(CInv(MSG_BLOCK, chainActive.Tip()->GetBlockHash())); const CInv &inv = *it;
connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::INV, vInv)); it++;
pfrom->hashContinue.SetNull();
} // Send stream from relay memory
} bool push = false;
} auto mi = mapRelay.find(inv.hash);
else if (inv.type == MSG_TX || inv.type == MSG_WITNESS_TX) int nSendFlags = (inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0);
{ if (mi != mapRelay.end()) {
// Send stream from relay memory connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *mi->second));
bool push = false; push = true;
auto mi = mapRelay.find(inv.hash); } else if (pfrom->timeLastMempoolReq) {
int nSendFlags = (inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0); auto txinfo = mempool.info(inv.hash);
if (mi != mapRelay.end()) { // To protect privacy, do not answer getdata using the mempool when
connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *mi->second)); // that TX couldn't have been INVed in reply to a MEMPOOL request.
if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq) {
connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *txinfo.tx));
push = true; push = true;
} else if (pfrom->timeLastMempoolReq) {
auto txinfo = mempool.info(inv.hash);
// To protect privacy, do not answer getdata using the mempool when
// that TX couldn't have been INVed in reply to a MEMPOOL request.
if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq) {
connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *txinfo.tx));
push = true;
}
}
if (!push) {
vNotFound.push_back(inv);
} }
} }
if (!push) {
vNotFound.push_back(inv);
}
// Track requests for our stuff. // Track requests for our stuff.
GetMainSignals().Inventory(inv.hash); GetMainSignals().Inventory(inv.hash);
}
if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) if (it != pfrom->vRecvGetData.end()) {
break; const CInv &inv = *it;
it++;
if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) {
ProcessGetBlockData(pfrom, consensusParams, inv, connman, interruptMsgProc);
}
} }
} } // release cs_main
pfrom->vRecvGetData.erase(pfrom->vRecvGetData.begin(), it); pfrom->vRecvGetData.erase(pfrom->vRecvGetData.begin(), it);

Loading…
Cancel
Save