From 53347f0cb99e514815e44a56439a4a10012238f8 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 16 Apr 2016 19:13:12 -0400 Subject: [PATCH] net: create generic functor accessors and move vNodes to CConnman --- src/main.cpp | 85 ++++++++++++++++++++++++-------------- src/net.cpp | 83 ++++++++++++++++++++++++++++--------- src/net.h | 27 +++++++----- src/rpc/misc.cpp | 9 ++-- src/rpc/net.cpp | 13 +++--- src/rpc/rawtransaction.cpp | 9 +++- src/wallet/wallet.cpp | 11 ++++- 7 files changed, 164 insertions(+), 73 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index f3babb71d..af598d487 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -470,7 +470,7 @@ void UpdateBlockAvailability(NodeId nodeid, const uint256 &hash) { } } -void MaybeSetPeerAsAnnouncingHeaderAndIDs(const CNodeState* nodestate, CNode* pfrom) { +void MaybeSetPeerAsAnnouncingHeaderAndIDs(const CNodeState* nodestate, CNode* pfrom, CConnman& connman) { if (nLocalServices & NODE_WITNESS) { // Don't ever request compact blocks when segwit is enabled. return; @@ -484,11 +484,12 @@ void MaybeSetPeerAsAnnouncingHeaderAndIDs(const CNodeState* nodestate, CNode* pf if (lNodesAnnouncingHeaderAndIDs.size() >= 3) { // As per BIP152, we only get 3 of our peers to announce // blocks using compact encodings. - CNode* pnodeStop = FindNode(lNodesAnnouncingHeaderAndIDs.front()); - if (pnodeStop) { + bool found = connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion](CNode* pnodeStop){ pnodeStop->PushMessage(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion); + return true; + }); + if(found) lNodesAnnouncingHeaderAndIDs.pop_front(); - } } fAnnounceUsingCMPCTBLOCK = true; pfrom->PushMessage(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion); @@ -3089,15 +3090,15 @@ bool ActivateBestChain(CValidationState &state, const CChainParams& chainparams, int nBlockEstimate = 0; if (fCheckpointsEnabled) nBlockEstimate = Checkpoints::GetTotalBlocksEstimate(chainparams.Checkpoints()); - { - LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) { + if(connman) { + connman->ForEachNode([nNewHeight, nBlockEstimate, &vHashes](CNode* pnode) { if (nNewHeight > (pnode->nStartingHeight != -1 ? pnode->nStartingHeight - 2000 : nBlockEstimate)) { BOOST_REVERSE_FOREACH(const uint256& hash, vHashes) { pnode->PushBlockHash(hash); } } - } + return true; + }); } // Notify external listeners about the new tip. if (!vHashes.empty()) { @@ -4726,6 +4727,45 @@ bool static AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) return true; } +static void RelayTransaction(const CTransaction& tx, CConnman& connman) +{ + CInv inv(MSG_TX, tx.GetHash()); + connman.ForEachNode([&inv](CNode* pnode) + { + pnode->PushInventory(inv); + return true; + }); +} + +static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connman) +{ + int nRelayNodes = fReachable ? 2 : 1; // limited relaying of addresses outside our network(s) + + // Relay to a limited number of other nodes + // Use deterministic randomness to send to the same nodes for 24 hours + // at a time so the addrKnowns of the chosen nodes prevent repeats + static const uint64_t salt0 = GetRand(std::numeric_limits::max()); + static const uint64_t salt1 = GetRand(std::numeric_limits::max()); + uint64_t hashAddr = addr.GetHash(); + std::multimap mapMix; + const CSipHasher hasher = CSipHasher(salt0, salt1).Write(hashAddr << 32).Write((GetTime() + hashAddr) / (24*60*60)); + + auto sortfunc = [&mapMix, &hasher](CNode* pnode) { + if (pnode->nVersion >= CADDR_TIME_VERSION) { + uint64_t hashKey = CSipHasher(hasher).Write(pnode->id).Finalize(); + mapMix.emplace(hashKey, pnode); + } + return true; + }; + + auto pushfunc = [&addr, &mapMix, &nRelayNodes] { + for (auto mi = mapMix.begin(); mi != mapMix.end() && nRelayNodes-- > 0; ++mi) + mi->second->PushAddress(addr); + }; + + connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); +} + void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams) { std::deque::iterator it = pfrom->vRecvGetData.begin(); @@ -5135,26 +5175,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, if (addr.nTime > nSince && !pfrom->fGetAddr && vAddr.size() <= 10 && addr.IsRoutable()) { // Relay to a limited number of other nodes - { - LOCK(cs_vNodes); - // Use deterministic randomness to send to the same nodes for 24 hours - // at a time so the addrKnowns of the chosen nodes prevent repeats - static const uint64_t salt0 = GetRand(std::numeric_limits::max()); - static const uint64_t salt1 = GetRand(std::numeric_limits::max()); - uint64_t hashAddr = addr.GetHash(); - multimap mapMix; - const CSipHasher hasher = CSipHasher(salt0, salt1).Write(hashAddr << 32).Write((GetTime() + hashAddr) / (24*60*60)); - BOOST_FOREACH(CNode* pnode, vNodes) - { - if (pnode->nVersion < CADDR_TIME_VERSION) - continue; - uint64_t hashKey = CSipHasher(hasher).Write(pnode->id).Finalize(); - mapMix.insert(make_pair(hashKey, pnode)); - } - int nRelayNodes = fReachable ? 2 : 1; // limited relaying of addresses outside our network(s) - for (multimap::iterator mi = mapMix.begin(); mi != mapMix.end() && nRelayNodes-- > 0; ++mi) - ((*mi).second)->PushAddress(addr); - } + RelayAddress(addr, fReachable, connman); } // Do not store addresses outside our network if (fReachable) @@ -5448,7 +5469,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, tx, true, &fMissingInputs)) { mempool.check(pcoinsTip); - RelayTransaction(tx); + RelayTransaction(tx, connman); for (unsigned int i = 0; i < tx.vout.size(); i++) { vWorkQueue.emplace_back(inv.hash, i); } @@ -5485,7 +5506,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, continue; if (AcceptToMemoryPool(mempool, stateDummy, orphanTx, true, &fMissingInputs2)) { LogPrint("mempool", " accepted orphan tx %s\n", orphanHash.ToString()); - RelayTransaction(orphanTx); + RelayTransaction(orphanTx, connman); for (unsigned int i = 0; i < orphanTx.vout.size(); i++) { vWorkQueue.emplace_back(orphanHash, i); } @@ -5560,7 +5581,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int nDoS = 0; if (!state.IsInvalid(nDoS) || nDoS == 0) { LogPrintf("Force relaying tx %s from whitelisted peer=%d\n", tx.GetHash().ToString(), pfrom->id); - RelayTransaction(tx); + RelayTransaction(tx, connman); } else { LogPrintf("Not relaying invalid transaction %s from whitelisted peer=%d (%s)\n", tx.GetHash().ToString(), pfrom->id, FormatStateMessage(state)); } @@ -5886,7 +5907,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, if (nodestate->fProvidesHeaderAndIDs && vGetData.size() == 1 && mapBlocksInFlight.size() == 1 && pindexLast->pprev->IsValid(BLOCK_VALID_CHAIN) && !(nLocalServices & NODE_WITNESS)) { // We seem to be rather well-synced, so it appears pfrom was the first to provide us // with this block! Let's get them to announce using compact blocks in the future. - MaybeSetPeerAsAnnouncingHeaderAndIDs(nodestate, pfrom); + MaybeSetPeerAsAnnouncingHeaderAndIDs(nodestate, pfrom, connman); // In any case, we want to download using a compact block, not a regular one vGetData[0] = CInv(MSG_CMPCT_BLOCK, vGetData[0].hash); } diff --git a/src/net.cpp b/src/net.cpp index e1cfc565d..f20f63e04 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -87,8 +87,6 @@ uint64_t nLocalHostNonce = 0; int nMaxConnections = DEFAULT_MAX_PEER_CONNECTIONS; std::string strSubVersion; -std::vector vNodes; -CCriticalSection cs_vNodes; limitedmap mapAlreadyAskedFor(MAX_INV_SZ); NodeId nLastNodeId = 0; @@ -315,7 +313,7 @@ uint64_t CNode::nMaxOutboundTotalBytesSentInCycle = 0; uint64_t CNode::nMaxOutboundTimeframe = 60*60*24; //1 day uint64_t CNode::nMaxOutboundCycleStartTime = 0; -CNode* FindNode(const CNetAddr& ip) +CNode* CConnman::FindNode(const CNetAddr& ip) { LOCK(cs_vNodes); BOOST_FOREACH(CNode* pnode, vNodes) @@ -324,7 +322,7 @@ CNode* FindNode(const CNetAddr& ip) return NULL; } -CNode* FindNode(const CSubNet& subNet) +CNode* CConnman::FindNode(const CSubNet& subNet) { LOCK(cs_vNodes); BOOST_FOREACH(CNode* pnode, vNodes) @@ -333,7 +331,7 @@ CNode* FindNode(const CSubNet& subNet) return NULL; } -CNode* FindNode(const std::string& addrName) +CNode* CConnman::FindNode(const std::string& addrName) { LOCK(cs_vNodes); BOOST_FOREACH(CNode* pnode, vNodes) @@ -342,7 +340,7 @@ CNode* FindNode(const std::string& addrName) return NULL; } -CNode* FindNode(const CService& addr) +CNode* CConnman::FindNode(const CService& addr) { LOCK(cs_vNodes); BOOST_FOREACH(CNode* pnode, vNodes) @@ -351,16 +349,6 @@ CNode* FindNode(const CService& addr) return NULL; } -//TODO: This is used in only one place in main, and should be removed -CNode* FindNode(const NodeId nodeid) -{ - LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode->GetId() == nodeid) - return (pnode); - return NULL; -} - CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure) { if (pszDest == NULL) { @@ -899,7 +887,8 @@ static bool CompareNodeTXTime(const NodeEvictionCandidate &a, const NodeEviction * to forge. In order to partition a node the attacker must be * simultaneously better at all of them than honest peers. */ -static bool AttemptToEvictConnection() { +bool CConnman::AttemptToEvictConnection() +{ std::vector vEvictionCandidates; { LOCK(cs_vNodes); @@ -2320,7 +2309,6 @@ bool CConnman::DisconnectNode(const std::string& strNode) } return false; } - bool CConnman::DisconnectNode(NodeId id) { LOCK(cs_vNodes); @@ -2333,7 +2321,7 @@ bool CConnman::DisconnectNode(NodeId id) return false; } -void RelayTransaction(const CTransaction& tx) +void CConnman::RelayTransaction(const CTransaction& tx) { CInv inv(MSG_TX, tx.GetHash()); LOCK(cs_vNodes); @@ -2671,6 +2659,63 @@ void CNode::EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend) LEAVE_CRITICAL_SECTION(cs_vSend); } +bool CConnman::ForNode(NodeId id, std::function func) +{ + CNode* found = nullptr; + LOCK(cs_vNodes); + for (auto&& pnode : vNodes) { + if(pnode->id == id) { + found = pnode; + break; + } + } + return found != nullptr && func(found); +} + +bool CConnman::ForEachNode(std::function func) +{ + LOCK(cs_vNodes); + for (auto&& node : vNodes) + if(!func(node)) + return false; + return true; +} + +bool CConnman::ForEachNode(std::function func) const +{ + LOCK(cs_vNodes); + for (const auto& node : vNodes) + if(!func(node)) + return false; + return true; +} + +bool CConnman::ForEachNodeThen(std::function pre, std::function post) +{ + bool ret = true; + LOCK(cs_vNodes); + for (auto&& node : vNodes) + if(!pre(node)) { + ret = false; + break; + } + post(); + return ret; +} + +bool CConnman::ForEachNodeThen(std::function pre, std::function post) const +{ + bool ret = true; + LOCK(cs_vNodes); + for (const auto& node : vNodes) + if(!pre(node)) { + ret = false; + break; + } + post(); + return ret; +} + int64_t PoissonNextSend(int64_t nNow, int average_interval_seconds) { return nNow + (int64_t)(log1p(GetRand(1ULL << 48) * -0.0000000000000035527136788 /* -1/2^48 */) * average_interval_seconds * -1000000.0 + 0.5); } diff --git a/src/net.h b/src/net.h index 9e2408cd7..5b1e80bfb 100644 --- a/src/net.h +++ b/src/net.h @@ -95,12 +95,7 @@ struct AddedNodeInfo bool fInbound; }; -CNode* FindNode(const CNetAddr& ip); -CNode* FindNode(const CSubNet& subNet); -CNode* FindNode(const std::string& addrName); -CNode* FindNode(const CService& ip); -CNode* FindNode(const NodeId id); //TODO: Remove this - +class CTransaction; class CNodeStats; class CConnman { @@ -120,6 +115,14 @@ public: bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); bool OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound = NULL, const char *strDest = NULL, bool fOneShot = false, bool fFeeler = false); + bool ForNode(NodeId id, std::function func); + bool ForEachNode(std::function func); + bool ForEachNode(std::function func) const; + bool ForEachNodeThen(std::function pre, std::function post); + bool ForEachNodeThen(std::function pre, std::function post) const; + + void RelayTransaction(const CTransaction& tx); + // Addrman functions size_t GetAddressCount() const; void SetServices(const CService &addr, ServiceFlags nServices); @@ -182,6 +185,12 @@ private: void ThreadSocketHandler(); void ThreadDNSAddressSeed(); + CNode* FindNode(const CNetAddr& ip); + CNode* FindNode(const CSubNet& subNet); + CNode* FindNode(const std::string& addrName); + CNode* FindNode(const CService& addr); + + bool AttemptToEvictConnection(); CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure); void DeleteNode(CNode* pnode); //!check is the banlist has unwritten changes @@ -204,6 +213,8 @@ private: CCriticalSection cs_vOneShots; std::vector vAddedNodes; CCriticalSection cs_vAddedNodes; + std::vector vNodes; + mutable CCriticalSection cs_vNodes; }; extern std::unique_ptr g_connman; void MapPort(bool fUseUPnP); @@ -279,8 +290,6 @@ extern uint64_t nLocalHostNonce; /** Maximum number of connections to simultaneously allow (aka connection slots) */ extern int nMaxConnections; -extern std::vector vNodes; -extern CCriticalSection cs_vNodes; extern limitedmap mapAlreadyAskedFor; extern NodeId nLastNodeId; @@ -828,8 +837,6 @@ public: -class CTransaction; -void RelayTransaction(const CTransaction& tx); /** Return a timestamp in the future (in microseconds) for exponentially distributed events. */ diff --git a/src/rpc/misc.cpp b/src/rpc/misc.cpp index f9f161561..ffd377b48 100644 --- a/src/rpc/misc.cpp +++ b/src/rpc/misc.cpp @@ -472,14 +472,17 @@ UniValue setmocktime(const UniValue& params, bool fHelp) // atomically with the time change to prevent peers from being // disconnected because we think we haven't communicated with them // in a long time. - LOCK2(cs_main, cs_vNodes); + LOCK(cs_main); RPCTypeCheck(params, boost::assign::list_of(UniValue::VNUM)); SetMockTime(params[0].get_int64()); uint64_t t = GetTime(); - BOOST_FOREACH(CNode* pnode, vNodes) { - pnode->nLastSend = pnode->nLastRecv = t; + if(g_connman) { + g_connman->ForEachNode([t](CNode* pnode) { + pnode->nLastSend = pnode->nLastRecv = t; + return true; + }); } return NullUniValue; diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index 0d494a2e7..509b57aa7 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -55,13 +55,14 @@ UniValue ping(const UniValue& params, bool fHelp) + HelpExampleRpc("ping", "") ); - // Request that each node send a ping during next message processing pass - LOCK2(cs_main, cs_vNodes); - - BOOST_FOREACH(CNode* pNode, vNodes) { - pNode->fPingQueued = true; - } + if(!g_connman) + throw JSONRPCError(RPC_CLIENT_P2P_DISABLED, "Error: Peer-to-peer functionality missing or disabled"); + // Request that each node send a ping during next message processing pass + g_connman->ForEachNode([](CNode* pnode) { + pnode->fPingQueued = true; + return true; + }); return NullUniValue; } diff --git a/src/rpc/rawtransaction.cpp b/src/rpc/rawtransaction.cpp index 9461a7280..3daf1681f 100644 --- a/src/rpc/rawtransaction.cpp +++ b/src/rpc/rawtransaction.cpp @@ -891,8 +891,15 @@ UniValue sendrawtransaction(const UniValue& params, bool fHelp) } else if (fHaveChain) { throw JSONRPCError(RPC_TRANSACTION_ALREADY_IN_CHAIN, "transaction already in block chain"); } - RelayTransaction(tx); + if(!g_connman) + throw JSONRPCError(RPC_CLIENT_P2P_DISABLED, "Error: Peer-to-peer functionality missing or disabled"); + CInv inv(MSG_TX, hashTx); + g_connman->ForEachNode([&inv](CNode* pnode) + { + pnode->PushInventory(inv); + return true; + }); return hashTx.GetHex(); } diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index 4396c2a2b..fd8b056bf 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1460,8 +1460,15 @@ bool CWalletTx::RelayWalletTransaction(CConnman* connman) { if (GetDepthInMainChain() == 0 && !isAbandoned() && InMempool()) { LogPrintf("Relaying wtx %s\n", GetHash().ToString()); - RelayTransaction((CTransaction)*this); - return true; + if (connman) { + CInv inv(MSG_TX, GetHash()); + connman->ForEachNode([&inv](CNode* pnode) + { + pnode->PushInventory(inv); + return true; + }); + return true; + } } } return false;