Merge pull request #2840 from sipa/nosendlock

Allow SendMessages to run partially without cs_main
This commit is contained in:
Gavin Andresen 2013-10-20 19:06:53 -07:00
commit 749230d05c
2 changed files with 38 additions and 22 deletions

View File

@ -3234,6 +3234,8 @@ void static ProcessGetData(CNode* pfrom)
vector<CInv> vNotFound; vector<CInv> vNotFound;
LOCK(cs_main);
while (it != pfrom->vRecvGetData.end()) { while (it != pfrom->vRecvGetData.end()) {
// Don't bother if send buffer is too full to respond anyway // Don't bother if send buffer is too full to respond anyway
if (pfrom->nSendSize >= SendBufferSize()) if (pfrom->nSendSize >= SendBufferSize())
@ -3406,7 +3408,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
pfrom->fClient = !(pfrom->nServices & NODE_NETWORK); pfrom->fClient = !(pfrom->nServices & NODE_NETWORK);
AddTimeData(pfrom->addr, nTime);
// Change version // Change version
pfrom->PushMessage("verack"); pfrom->PushMessage("verack");
@ -3448,6 +3449,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
LogPrintf("receive version message: version %d, blocks=%d, us=%s, them=%s, peer=%s\n", pfrom->nVersion, pfrom->nStartingHeight, addrMe.ToString().c_str(), addrFrom.ToString().c_str(), pfrom->addr.ToString().c_str()); LogPrintf("receive version message: version %d, blocks=%d, us=%s, them=%s, peer=%s\n", pfrom->nVersion, pfrom->nStartingHeight, addrMe.ToString().c_str(), addrFrom.ToString().c_str(), pfrom->addr.ToString().c_str());
LOCK(cs_main);
AddTimeData(pfrom->addr, nTime);
cPeerBlockCounts.input(pfrom->nStartingHeight); cPeerBlockCounts.input(pfrom->nStartingHeight);
} }
@ -3551,6 +3554,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
break; break;
} }
} }
LOCK(cs_main);
for (unsigned int nInv = 0; nInv < vInv.size(); nInv++) for (unsigned int nInv = 0; nInv < vInv.size(); nInv++)
{ {
const CInv &inv = vInv[nInv]; const CInv &inv = vInv[nInv];
@ -3608,6 +3614,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
uint256 hashStop; uint256 hashStop;
vRecv >> locator >> hashStop; vRecv >> locator >> hashStop;
LOCK(cs_main);
// Find the last block the caller has in the main chain // Find the last block the caller has in the main chain
CBlockIndex* pindex = chainActive.FindFork(locator); CBlockIndex* pindex = chainActive.FindFork(locator);
@ -3642,6 +3650,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
uint256 hashStop; uint256 hashStop;
vRecv >> locator >> hashStop; vRecv >> locator >> hashStop;
LOCK(cs_main);
CBlockIndex* pindex = NULL; CBlockIndex* pindex = NULL;
if (locator.IsNull()) if (locator.IsNull())
{ {
@ -3683,6 +3693,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
CInv inv(MSG_TX, tx.GetHash()); CInv inv(MSG_TX, tx.GetHash());
pfrom->AddInventoryKnown(inv); pfrom->AddInventoryKnown(inv);
LOCK(cs_main);
bool fMissingInputs = false; bool fMissingInputs = false;
CValidationState state; CValidationState state;
if (mempool.accept(state, tx, true, &fMissingInputs)) if (mempool.accept(state, tx, true, &fMissingInputs))
@ -3757,6 +3769,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
CInv inv(MSG_BLOCK, block.GetHash()); CInv inv(MSG_BLOCK, block.GetHash());
pfrom->AddInventoryKnown(inv); pfrom->AddInventoryKnown(inv);
LOCK(cs_main);
CValidationState state; CValidationState state;
if (ProcessBlock(state, pfrom, &block)) if (ProcessBlock(state, pfrom, &block))
mapAlreadyAskedFor.erase(inv); mapAlreadyAskedFor.erase(inv);
@ -3778,6 +3792,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
else if (strCommand == "mempool") else if (strCommand == "mempool")
{ {
LOCK(cs_main);
std::vector<uint256> vtxid; std::vector<uint256> vtxid;
LOCK2(mempool.cs, pfrom->cs_filter); LOCK2(mempool.cs, pfrom->cs_filter);
mempool.queryHashes(vtxid); mempool.queryHashes(vtxid);
@ -4043,10 +4059,7 @@ bool ProcessMessages(CNode* pfrom)
bool fRet = false; bool fRet = false;
try try
{ {
{ fRet = ProcessMessage(pfrom, strCommand, vRecv);
LOCK(cs_main);
fRet = ProcessMessage(pfrom, strCommand, vRecv);
}
boost::this_thread::interruption_point(); boost::this_thread::interruption_point();
} }
catch (std::ios_base::failure& e) catch (std::ios_base::failure& e)
@ -4089,8 +4102,7 @@ bool ProcessMessages(CNode* pfrom)
bool SendMessages(CNode* pto, bool fSendTrickle) bool SendMessages(CNode* pto, bool fSendTrickle)
{ {
TRY_LOCK(cs_main, lockMain); {
if (lockMain) {
// Don't send anything until we get their version message // Don't send anything until we get their version message
if (pto->nVersion == 0) if (pto->nVersion == 0)
return true; return true;
@ -4125,20 +4137,6 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
} }
} }
// Start block sync
if (pto->fStartSync && !fImporting && !fReindex) {
pto->fStartSync = false;
PushGetBlocks(pto, chainActive.Tip(), uint256(0));
}
// Resend wallet transactions that haven't gotten in a block yet
// Except during reindex, importing and IBD, when old wallet
// transactions become unconfirmed and spams other nodes.
if (!fReindex && !fImporting && !IsInitialBlockDownload())
{
ResendWalletTransactions();
}
// Address refresh broadcast // Address refresh broadcast
static int64 nLastRebroadcast; static int64 nLastRebroadcast;
if (!IsInitialBlockDownload() && (GetTime() - nLastRebroadcast > 24 * 60 * 60)) if (!IsInitialBlockDownload() && (GetTime() - nLastRebroadcast > 24 * 60 * 60))
@ -4189,6 +4187,23 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
pto->PushMessage("addr", vAddr); pto->PushMessage("addr", vAddr);
} }
TRY_LOCK(cs_main, lockMain);
if (!lockMain)
return true;
// Start block sync
if (pto->fStartSync && !fImporting && !fReindex) {
pto->fStartSync = false;
PushGetBlocks(pto, chainActive.Tip(), uint256(0));
}
// Resend wallet transactions that haven't gotten in a block yet
// Except during reindex, importing and IBD, when old wallet
// transactions become unconfirmed and spams other nodes.
if (!fReindex && !fImporting && !IsInitialBlockDownload())
{
ResendWalletTransactions();
}
// //
// Message: inventory // Message: inventory

View File

@ -801,7 +801,8 @@ void ThreadSocketHandler()
vNodesDisconnected.push_back(pnode); vNodesDisconnected.push_back(pnode);
} }
} }
}
{
// Delete disconnected nodes // Delete disconnected nodes
list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected; list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
BOOST_FOREACH(CNode* pnode, vNodesDisconnectedCopy) BOOST_FOREACH(CNode* pnode, vNodesDisconnectedCopy)