Browse Source

Merge #8708: net: have CConnman handle message sending

9027680 net: handle version push in InitializeNode (Cory Fields)
7588b85 net: construct CNodeStates in place (Cory Fields)
440f1d3 net: remove now-unused ssSend and Fuzz (Cory Fields)
5c2169c drop the optimistic write counter hack (Cory Fields)
ea33268 net: switch all callers to connman for pushing messages (Cory Fields)
3e32cd0 connman is in charge of pushing messages (Cory Fields)
b98c14c serialization: teach serializers variadics (Cory Fields)
0.14
Wladimir J. van der Laan 8 years ago
parent
commit
c8c572f8f1
No known key found for this signature in database
GPG Key ID: 74810B012346C9A6
  1. 154
      src/main.cpp
  2. 157
      src/net.cpp
  3. 250
      src/net.h
  4. 49
      src/serialize.h
  5. 7
      src/streams.h
  6. 12
      src/test/DoS_tests.cpp
  7. 71
      src/test/serialize_tests.cpp

154
src/main.cpp

@ -18,6 +18,7 @@ @@ -18,6 +18,7 @@
#include "init.h"
#include "merkleblock.h"
#include "net.h"
#include "netbase.h"
#include "policy/fees.h"
#include "policy/policy.h"
#include "pow.h"
@ -257,7 +258,7 @@ struct CBlockReject { @@ -257,7 +258,7 @@ struct CBlockReject {
*/
struct CNodeState {
//! The peer's address
CService address;
const CService address;
//! Whether we have a fully established connection.
bool fCurrentlyConnected;
//! Accumulated misbehaviour score for this peer.
@ -265,7 +266,7 @@ struct CNodeState { @@ -265,7 +266,7 @@ struct CNodeState {
//! Whether this peer should be disconnected and banned (unless whitelisted).
bool fShouldBan;
//! String name of this peer (debugging/logging purposes).
std::string name;
const std::string name;
//! List of asynchronously-determined block rejections to notify this peer about.
std::vector<CBlockReject> rejects;
//! The best known block we know this peer has announced.
@ -309,7 +310,7 @@ struct CNodeState { @@ -309,7 +310,7 @@ struct CNodeState {
*/
bool fSupportsDesiredCmpctVersion;
CNodeState() {
CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) {
fCurrentlyConnected = false;
nMisbehavior = 0;
fShouldBan = false;
@ -354,11 +355,36 @@ void UpdatePreferredDownload(CNode* node, CNodeState* state) @@ -354,11 +355,36 @@ void UpdatePreferredDownload(CNode* node, CNodeState* state)
nPreferredDownload += state->fPreferredDownload;
}
void InitializeNode(NodeId nodeid, const CNode *pnode) {
LOCK(cs_main);
CNodeState &state = mapNodeState.insert(std::make_pair(nodeid, CNodeState())).first->second;
state.name = pnode->addrName;
state.address = pnode->addr;
void PushNodeVersion(CNode *pnode, CConnman& connman, int64_t nTime)
{
ServiceFlags nLocalNodeServices = pnode->GetLocalServices();
uint64_t nonce = pnode->GetLocalNonce();
int nNodeStartingHeight = pnode->GetMyStartingHeight();
NodeId nodeid = pnode->GetId();
CAddress addr = pnode->addr;
CAddress addrYou = (addr.IsRoutable() && !IsProxy(addr) ? addr : CAddress(CService(), addr.nServices));
CAddress addrMe = CAddress(CService(), nLocalNodeServices);
connman.PushMessageWithVersion(pnode, INIT_PROTO_VERSION, NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalNodeServices, nTime, addrYou, addrMe,
nonce, strSubVersion, nNodeStartingHeight, ::fRelayTxes);
if (fLogIPs)
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), addrYou.ToString(), nodeid);
else
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), nodeid);
}
void InitializeNode(CNode *pnode, CConnman& connman) {
CAddress addr = pnode->addr;
std::string addrName = pnode->addrName;
NodeId nodeid = pnode->GetId();
{
LOCK(cs_main);
mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName)));
}
if(!pnode->fInbound)
PushNodeVersion(pnode, connman, GetTime());
}
void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
@ -501,15 +527,15 @@ void MaybeSetPeerAsAnnouncingHeaderAndIDs(const CNodeState* nodestate, CNode* pf @@ -501,15 +527,15 @@ void MaybeSetPeerAsAnnouncingHeaderAndIDs(const CNodeState* nodestate, CNode* pf
if (lNodesAnnouncingHeaderAndIDs.size() >= 3) {
// As per BIP152, we only get 3 of our peers to announce
// blocks using compact encodings.
bool found = connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion](CNode* pnodeStop){
pnodeStop->PushMessage(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
bool found = connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [&connman, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion](CNode* pnodeStop){
connman.PushMessage(pnodeStop, NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
return true;
});
if(found)
lNodesAnnouncingHeaderAndIDs.pop_front();
}
fAnnounceUsingCMPCTBLOCK = true;
pfrom->PushMessage(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
connman.PushMessage(pfrom, NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
lNodesAnnouncingHeaderAndIDs.push_back(pfrom->GetId());
}
}
@ -4900,9 +4926,9 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam @@ -4900,9 +4926,9 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
if (!ReadBlockFromDisk(block, (*mi).second, consensusParams))
assert(!"cannot load block from disk");
if (inv.type == MSG_BLOCK)
pfrom->PushMessageWithFlag(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, block);
connman.PushMessageWithFlag(pfrom, SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, block);
else if (inv.type == MSG_WITNESS_BLOCK)
pfrom->PushMessage(NetMsgType::BLOCK, block);
connman.PushMessage(pfrom, NetMsgType::BLOCK, block);
else if (inv.type == MSG_FILTERED_BLOCK)
{
bool sendMerkleBlock = false;
@ -4915,7 +4941,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam @@ -4915,7 +4941,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
}
}
if (sendMerkleBlock) {
pfrom->PushMessage(NetMsgType::MERKLEBLOCK, merkleBlock);
connman.PushMessage(pfrom, NetMsgType::MERKLEBLOCK, merkleBlock);
// CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see
// This avoids hurting performance by pointlessly requiring a round-trip
// Note that there is currently no way for a node to request any single transactions we didn't send here -
@ -4924,7 +4950,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam @@ -4924,7 +4950,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
// however we MUST always provide at least what the remote peer needs
typedef std::pair<unsigned int, uint256> PairType;
BOOST_FOREACH(PairType& pair, merkleBlock.vMatchedTxn)
pfrom->PushMessageWithFlag(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, block.vtx[pair.first]);
connman.PushMessageWithFlag(pfrom, SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, block.vtx[pair.first]);
}
// else
// no response
@ -4938,9 +4964,9 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam @@ -4938,9 +4964,9 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
bool fPeerWantsWitness = State(pfrom->GetId())->fWantsCmpctWitness;
if (CanDirectFetch(consensusParams) && mi->second->nHeight >= chainActive.Height() - MAX_CMPCTBLOCK_DEPTH) {
CBlockHeaderAndShortTxIDs cmpctblock(block, fPeerWantsWitness);
pfrom->PushMessageWithFlag(fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::CMPCTBLOCK, cmpctblock);
connman.PushMessageWithFlag(pfrom, fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::CMPCTBLOCK, cmpctblock);
} else
pfrom->PushMessageWithFlag(fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, block);
connman.PushMessageWithFlag(pfrom, fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, block);
}
// Trigger the peer node to send a getblocks request for the next batch of inventory
@ -4951,7 +4977,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam @@ -4951,7 +4977,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
// wait for other stuff first.
vector<CInv> vInv;
vInv.push_back(CInv(MSG_BLOCK, chainActive.Tip()->GetBlockHash()));
pfrom->PushMessage(NetMsgType::INV, vInv);
connman.PushMessage(pfrom, NetMsgType::INV, vInv);
pfrom->hashContinue.SetNull();
}
}
@ -4962,14 +4988,14 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam @@ -4962,14 +4988,14 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
bool push = false;
auto mi = mapRelay.find(inv.hash);
if (mi != mapRelay.end()) {
pfrom->PushMessageWithFlag(inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0, NetMsgType::TX, *mi->second);
connman.PushMessageWithFlag(pfrom, inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0, NetMsgType::TX, *mi->second);
push = true;
} else if (pfrom->timeLastMempoolReq) {
auto txinfo = mempool.info(inv.hash);
// To protect privacy, do not answer getdata using the mempool when
// that TX couldn't have been INVed in reply to a MEMPOOL request.
if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq) {
pfrom->PushMessageWithFlag(inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0, NetMsgType::TX, *txinfo.tx);
connman.PushMessageWithFlag(pfrom, inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0, NetMsgType::TX, *txinfo.tx);
push = true;
}
}
@ -4996,7 +5022,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam @@ -4996,7 +5022,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
// do that because they want to know about (and store and rebroadcast and
// risk analyze) the dependencies of transactions relevant to them, without
// having to download the entire memory pool.
pfrom->PushMessage(NetMsgType::NOTFOUND, vNotFound);
connman.PushMessage(pfrom, NetMsgType::NOTFOUND, vNotFound);
}
}
@ -5047,7 +5073,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5047,7 +5073,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Each connection can only send one version message
if (pfrom->nVersion != 0)
{
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_DUPLICATE, string("Duplicate version message"));
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_DUPLICATE, string("Duplicate version message"));
LOCK(cs_main);
Misbehaving(pfrom->GetId(), 1);
return false;
@ -5067,7 +5093,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5067,7 +5093,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
if (pfrom->nServicesExpected & ~pfrom->nServices)
{
LogPrint("net", "peer=%d does not offer the expected services (%08x offered, %08x expected); disconnecting\n", pfrom->id, pfrom->nServices, pfrom->nServicesExpected);
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_NONSTANDARD,
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_NONSTANDARD,
strprintf("Expected to offer services %08x", pfrom->nServicesExpected));
pfrom->fDisconnect = true;
return false;
@ -5077,7 +5103,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5077,7 +5103,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
{
// disconnect from peers older than this proto version
LogPrintf("peer=%d using obsolete version %i; disconnecting\n", pfrom->id, pfrom->nVersion);
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_OBSOLETE,
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_OBSOLETE,
strprintf("Version must be %d or greater", MIN_PEER_PROTO_VERSION));
pfrom->fDisconnect = true;
return false;
@ -5118,7 +5144,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5118,7 +5144,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Be shy and don't send version until we hear
if (pfrom->fInbound)
pfrom->PushVersion();
PushNodeVersion(pfrom, connman, GetAdjustedTime());
pfrom->fClient = !(pfrom->nServices & NODE_NETWORK);
@ -5135,8 +5161,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5135,8 +5161,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
}
// Change version
pfrom->PushMessage(NetMsgType::VERACK);
pfrom->ssSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::VERACK);
pfrom->SetSendVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
if (!pfrom->fInbound)
{
@ -5159,7 +5185,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5159,7 +5185,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Get recent addresses
if (pfrom->fOneShot || pfrom->nVersion >= CADDR_TIME_VERSION || connman.GetAddressCount() < 1000)
{
pfrom->PushMessage(NetMsgType::GETADDR);
connman.PushMessage(pfrom, NetMsgType::GETADDR);
pfrom->fGetAddr = true;
}
connman.MarkAddressGood(pfrom->addr);
@ -5206,7 +5232,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5206,7 +5232,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// We send this to non-NODE NETWORK peers as well, because even
// non-NODE NETWORK peers can announce blocks (such as pruning
// nodes)
pfrom->PushMessage(NetMsgType::SENDHEADERS);
connman.PushMessage(pfrom, NetMsgType::SENDHEADERS);
}
if (pfrom->nVersion >= SHORT_IDS_BLOCKS_VERSION) {
// Tell our peer we are willing to provide version 1 or 2 cmpctblocks
@ -5217,9 +5243,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5217,9 +5243,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
bool fAnnounceUsingCMPCTBLOCK = false;
uint64_t nCMPCTBLOCKVersion = 2;
if (pfrom->GetLocalServices() & NODE_WITNESS)
pfrom->PushMessage(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
connman.PushMessage(pfrom, NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
nCMPCTBLOCKVersion = 1;
pfrom->PushMessage(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
connman.PushMessage(pfrom, NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
}
}
@ -5347,7 +5373,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5347,7 +5373,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// time the block arrives, the header chain leading up to it is already validated. Not
// doing this will result in the received block being rejected as an orphan in case it is
// not a direct successor.
pfrom->PushMessage(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), inv.hash);
connman.PushMessage(pfrom, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), inv.hash);
CNodeState *nodestate = State(pfrom->GetId());
if (CanDirectFetch(chainparams.GetConsensus()) &&
nodestate->nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER &&
@ -5383,7 +5409,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5383,7 +5409,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
}
if (!vToFetch.empty())
pfrom->PushMessage(NetMsgType::GETDATA, vToFetch);
connman.PushMessage(pfrom, NetMsgType::GETDATA, vToFetch);
}
@ -5483,7 +5509,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5483,7 +5509,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
}
resp.txn[i] = block.vtx[req.indexes[i]];
}
pfrom->PushMessageWithFlag(State(pfrom->GetId())->fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCKTXN, resp);
connman.PushMessageWithFlag(pfrom, State(pfrom->GetId())->fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCKTXN, resp);
}
@ -5532,7 +5558,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5532,7 +5558,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// headers message). In both cases it's safe to update
// pindexBestHeaderSent to be our tip.
nodestate->pindexBestHeaderSent = pindex ? pindex : chainActive.Tip();
pfrom->PushMessage(NetMsgType::HEADERS, vHeaders);
connman.PushMessage(pfrom, NetMsgType::HEADERS, vHeaders);
}
@ -5695,7 +5721,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5695,7 +5721,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
pfrom->id,
FormatStateMessage(state));
if (state.GetRejectCode() < REJECT_INTERNAL) // Never send AcceptToMemoryPool's internal codes over P2P
pfrom->PushMessage(NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(),
connman.PushMessage(pfrom, NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(),
state.GetRejectReason().substr(0, MAX_REJECT_MESSAGE_LENGTH), inv.hash);
if (nDoS > 0) {
Misbehaving(pfrom->GetId(), nDoS);
@ -5714,7 +5740,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5714,7 +5740,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
if (mapBlockIndex.find(cmpctblock.header.hashPrevBlock) == mapBlockIndex.end()) {
// Doesn't connect (or is genesis), instead of DoSing in AcceptBlockHeader, request deeper headers
if (!IsInitialBlockDownload())
pfrom->PushMessage(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256());
connman.PushMessage(pfrom, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256());
return true;
}
@ -5747,7 +5773,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5747,7 +5773,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// so we just grab the block via normal getdata
std::vector<CInv> vInv(1);
vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(pfrom, pindex->pprev, chainparams.GetConsensus()), cmpctblock.header.GetHash());
pfrom->PushMessage(NetMsgType::GETDATA, vInv);
connman.PushMessage(pfrom, NetMsgType::GETDATA, vInv);
}
return true;
}
@ -5791,7 +5817,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5791,7 +5817,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Duplicate txindexes, the block is now in-flight, so just request it
std::vector<CInv> vInv(1);
vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(pfrom, pindex->pprev, chainparams.GetConsensus()), cmpctblock.header.GetHash());
pfrom->PushMessage(NetMsgType::GETDATA, vInv);
connman.PushMessage(pfrom, NetMsgType::GETDATA, vInv);
return true;
}
@ -5815,7 +5841,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5815,7 +5841,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman);
} else {
req.blockhash = pindex->GetBlockHash();
pfrom->PushMessage(NetMsgType::GETBLOCKTXN, req);
connman.PushMessage(pfrom, NetMsgType::GETBLOCKTXN, req);
}
}
} else {
@ -5824,7 +5850,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5824,7 +5850,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// mempool will probably be useless - request the block normally
std::vector<CInv> vInv(1);
vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(pfrom, pindex->pprev, chainparams.GetConsensus()), cmpctblock.header.GetHash());
pfrom->PushMessage(NetMsgType::GETDATA, vInv);
connman.PushMessage(pfrom, NetMsgType::GETDATA, vInv);
return true;
} else {
// If this was an announce-cmpctblock, we want the same treatment as a header message
@ -5866,7 +5892,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5866,7 +5892,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Might have collided, fall back to getdata now :(
std::vector<CInv> invs;
invs.push_back(CInv(MSG_BLOCK | GetFetchFlags(pfrom, chainActive.Tip(), chainparams.GetConsensus()), resp.blockhash));
pfrom->PushMessage(NetMsgType::GETDATA, invs);
connman.PushMessage(pfrom, NetMsgType::GETDATA, invs);
} else {
MarkBlockAsReceived(resp.blockhash); // it is now an empty pointer
fBlockRead = true;
@ -5880,7 +5906,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5880,7 +5906,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
int nDoS;
if (state.IsInvalid(nDoS)) {
assert (state.GetRejectCode() < REJECT_INTERNAL); // Blocks are never rejected with internal reject codes
pfrom->PushMessage(NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(),
connman.PushMessage(pfrom, NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(),
state.GetRejectReason().substr(0, MAX_REJECT_MESSAGE_LENGTH), block.GetHash());
if (nDoS > 0) {
LOCK(cs_main);
@ -5928,7 +5954,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5928,7 +5954,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// nUnconnectingHeaders gets reset back to 0.
if (mapBlockIndex.find(headers[0].hashPrevBlock) == mapBlockIndex.end() && nCount < MAX_BLOCKS_TO_ANNOUNCE) {
nodestate->nUnconnectingHeaders++;
pfrom->PushMessage(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256());
connman.PushMessage(pfrom, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256());
LogPrint("net", "received header %s: missing prev block %s, sending getheaders (%d) to end (peer=%d, nUnconnectingHeaders=%d)\n",
headers[0].GetHash().ToString(),
headers[0].hashPrevBlock.ToString(),
@ -5975,7 +6001,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -5975,7 +6001,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// TODO: optimize: if pindexLast is an ancestor of chainActive.Tip or pindexBestHeader, continue
// from there instead.
LogPrint("net", "more getheaders (%d) to end to peer=%d (startheight:%d)\n", pindexLast->nHeight, pfrom->id, pfrom->nStartingHeight);
pfrom->PushMessage(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexLast), uint256());
connman.PushMessage(pfrom, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexLast), uint256());
}
bool fCanDirectFetch = CanDirectFetch(chainparams.GetConsensus());
@ -6028,7 +6054,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -6028,7 +6054,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// In any case, we want to download using a compact block, not a regular one
vGetData[0] = CInv(MSG_CMPCT_BLOCK, vGetData[0].hash);
}
pfrom->PushMessage(NetMsgType::GETDATA, vGetData);
connman.PushMessage(pfrom, NetMsgType::GETDATA, vGetData);
}
}
}
@ -6060,7 +6086,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -6060,7 +6086,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
int nDoS;
if (state.IsInvalid(nDoS)) {
assert (state.GetRejectCode() < REJECT_INTERNAL); // Blocks are never rejected with internal reject codes
pfrom->PushMessage(NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(),
connman.PushMessage(pfrom, NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(),
state.GetRejectReason().substr(0, MAX_REJECT_MESSAGE_LENGTH), block.GetHash());
if (nDoS > 0) {
LOCK(cs_main);
@ -6137,7 +6163,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, @@ -6137,7 +6163,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// it, if the remote node sends a ping once per second and this node takes 5
// seconds to respond to each, the 5th ping the remote sends would appear to
// return very quickly.
pfrom->PushMessage(NetMsgType::PONG, nonce);
connman.PushMessage(pfrom, NetMsgType::PONG, nonce);
}
}
@ -6391,7 +6417,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman) @@ -6391,7 +6417,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman)
}
catch (const std::ios_base::failure& e)
{
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_MALFORMED, string("error parsing message"));
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_MALFORMED, string("error parsing message"));
if (strstr(e.what(), "end of data"))
{
// Allow exceptions from under-length message on vRecv
@ -6480,11 +6506,11 @@ bool SendMessages(CNode* pto, CConnman& connman) @@ -6480,11 +6506,11 @@ bool SendMessages(CNode* pto, CConnman& connman)
pto->nPingUsecStart = GetTimeMicros();
if (pto->nVersion > BIP0031_VERSION) {
pto->nPingNonceSent = nonce;
pto->PushMessage(NetMsgType::PING, nonce);
connman.PushMessage(pto, NetMsgType::PING, nonce);
} else {
// Peer is too old to support ping command with nonce, pong will never arrive.
pto->nPingNonceSent = 0;
pto->PushMessage(NetMsgType::PING);
connman.PushMessage(pto, NetMsgType::PING);
}
}
@ -6515,14 +6541,14 @@ bool SendMessages(CNode* pto, CConnman& connman) @@ -6515,14 +6541,14 @@ bool SendMessages(CNode* pto, CConnman& connman)
// receiver rejects addr messages larger than 1000
if (vAddr.size() >= 1000)
{
pto->PushMessage(NetMsgType::ADDR, vAddr);
connman.PushMessage(pto, NetMsgType::ADDR, vAddr);
vAddr.clear();
}
}
}
pto->vAddrToSend.clear();
if (!vAddr.empty())
pto->PushMessage(NetMsgType::ADDR, vAddr);
connman.PushMessage(pto, NetMsgType::ADDR, vAddr);
// we only send the big addr message once
if (pto->vAddrToSend.capacity() > 40)
pto->vAddrToSend.shrink_to_fit();
@ -6545,7 +6571,7 @@ bool SendMessages(CNode* pto, CConnman& connman) @@ -6545,7 +6571,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
}
BOOST_FOREACH(const CBlockReject& reject, state.rejects)
pto->PushMessage(NetMsgType::REJECT, (string)NetMsgType::BLOCK, reject.chRejectCode, reject.strRejectReason, reject.hashBlock);
connman.PushMessage(pto, NetMsgType::REJECT, (string)NetMsgType::BLOCK, reject.chRejectCode, reject.strRejectReason, reject.hashBlock);
state.rejects.clear();
// Start block sync
@ -6568,7 +6594,7 @@ bool SendMessages(CNode* pto, CConnman& connman) @@ -6568,7 +6594,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
if (pindexStart->pprev)
pindexStart = pindexStart->pprev;
LogPrint("net", "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->id, pto->nStartingHeight);
pto->PushMessage(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexStart), uint256());
connman.PushMessage(pto, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexStart), uint256());
}
}
@ -6657,7 +6683,7 @@ bool SendMessages(CNode* pto, CConnman& connman) @@ -6657,7 +6683,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
CBlock block;
assert(ReadBlockFromDisk(block, pBestIndex, consensusParams));
CBlockHeaderAndShortTxIDs cmpctblock(block, state.fWantsCmpctWitness);
pto->PushMessageWithFlag(state.fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::CMPCTBLOCK, cmpctblock);
connman.PushMessageWithFlag(pto, state.fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::CMPCTBLOCK, cmpctblock);
state.pindexBestHeaderSent = pBestIndex;
} else if (state.fPreferHeaders) {
if (vHeaders.size() > 1) {
@ -6669,7 +6695,7 @@ bool SendMessages(CNode* pto, CConnman& connman) @@ -6669,7 +6695,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
LogPrint("net", "%s: sending header %s to peer=%d\n", __func__,
vHeaders.front().GetHash().ToString(), pto->id);
}
pto->PushMessage(NetMsgType::HEADERS, vHeaders);
connman.PushMessage(pto, NetMsgType::HEADERS, vHeaders);
state.pindexBestHeaderSent = pBestIndex;
} else
fRevertToInv = true;
@ -6715,7 +6741,7 @@ bool SendMessages(CNode* pto, CConnman& connman) @@ -6715,7 +6741,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
BOOST_FOREACH(const uint256& hash, pto->vInventoryBlockToSend) {
vInv.push_back(CInv(MSG_BLOCK, hash));
if (vInv.size() == MAX_INV_SZ) {
pto->PushMessage(NetMsgType::INV, vInv);
connman.PushMessage(pto, NetMsgType::INV, vInv);
vInv.clear();
}
}
@ -6761,7 +6787,7 @@ bool SendMessages(CNode* pto, CConnman& connman) @@ -6761,7 +6787,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
pto->filterInventoryKnown.insert(hash);
vInv.push_back(inv);
if (vInv.size() == MAX_INV_SZ) {
pto->PushMessage(NetMsgType::INV, vInv);
connman.PushMessage(pto, NetMsgType::INV, vInv);
vInv.clear();
}
}
@ -6827,7 +6853,7 @@ bool SendMessages(CNode* pto, CConnman& connman) @@ -6827,7 +6853,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
}
}
if (vInv.size() == MAX_INV_SZ) {
pto->PushMessage(NetMsgType::INV, vInv);
connman.PushMessage(pto, NetMsgType::INV, vInv);
vInv.clear();
}
pto->filterInventoryKnown.insert(hash);
@ -6835,7 +6861,7 @@ bool SendMessages(CNode* pto, CConnman& connman) @@ -6835,7 +6861,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
}
}
if (!vInv.empty())
pto->PushMessage(NetMsgType::INV, vInv);
connman.PushMessage(pto, NetMsgType::INV, vInv);
// Detect whether we're stalling
nNow = GetTimeMicros();
@ -6896,7 +6922,7 @@ bool SendMessages(CNode* pto, CConnman& connman) @@ -6896,7 +6922,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
vGetData.push_back(inv);
if (vGetData.size() >= 1000)
{
pto->PushMessage(NetMsgType::GETDATA, vGetData);
connman.PushMessage(pto, NetMsgType::GETDATA, vGetData);
vGetData.clear();
}
} else {
@ -6906,7 +6932,7 @@ bool SendMessages(CNode* pto, CConnman& connman) @@ -6906,7 +6932,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
pto->mapAskFor.erase(pto->mapAskFor.begin());
}
if (!vGetData.empty())
pto->PushMessage(NetMsgType::GETDATA, vGetData);
connman.PushMessage(pto, NetMsgType::GETDATA, vGetData);
//
// Message: feefilter
@ -6919,7 +6945,7 @@ bool SendMessages(CNode* pto, CConnman& connman) @@ -6919,7 +6945,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
if (timeNow > pto->nextSendTimeFeeFilter) {
CAmount filterToSend = filterRounder.round(currentFilter);
if (filterToSend != pto->lastSentFeeFilter) {
pto->PushMessage(NetMsgType::FEEFILTER, filterToSend);
connman.PushMessage(pto, NetMsgType::FEEFILTER, filterToSend);
pto->lastSentFeeFilter = filterToSend;
}
pto->nextSendTimeFeeFilter = PoissonNextSend(timeNow, AVG_FEEFILTER_BROADCAST_INTERVAL);

157
src/net.cpp

@ -393,18 +393,15 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo @@ -393,18 +393,15 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
NodeId id = GetNewNodeId();
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false);
GetNodeSignals().InitializeNode(pnode->GetId(), pnode);
pnode->nServicesExpected = ServiceFlags(addrConnect.nServices & nRelevantServices);
pnode->nTimeConnected = GetTime();
pnode->AddRef();
GetNodeSignals().InitializeNode(pnode, *this);
{
LOCK(cs_vNodes);
vNodes.push_back(pnode);
}
pnode->nServicesExpected = ServiceFlags(addrConnect.nServices & nRelevantServices);
pnode->nTimeConnected = GetTime();
return pnode;
} else if (!proxyConnectionFailed) {
// If connecting to the node failed, and failure is not caused by a problem connecting to
@ -450,23 +447,6 @@ void CNode::CloseSocketDisconnect() @@ -450,23 +447,6 @@ void CNode::CloseSocketDisconnect()
vRecvMsg.clear();
}
void CNode::PushVersion()
{
int64_t nTime = (fInbound ? GetAdjustedTime() : GetTime());
CAddress addrYou = (addr.IsRoutable() && !IsProxy(addr) ? addr : CAddress(CService(), addr.nServices));
CAddress addrMe = CAddress(CService(), nLocalServices);
if (fLogIPs)
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), addrYou.ToString(), id);
else
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), id);
PushMessage(NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalServices, nTime, addrYou, addrMe,
nLocalHostNonce, strSubVersion, nMyStartingHeight, ::fRelayTxes);
}
void CConnman::ClearBanned()
{
{
@ -1032,9 +1012,9 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { @@ -1032,9 +1012,9 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, "", true);
GetNodeSignals().InitializeNode(pnode->GetId(), pnode);
pnode->AddRef();
pnode->fWhitelisted = whitelisted;
GetNodeSignals().InitializeNode(pnode, *this);
LogPrint("net", "connection from %s accepted\n", addr.ToString());
@ -1059,7 +1039,7 @@ void CConnman::ThreadSocketHandler() @@ -1059,7 +1039,7 @@ void CConnman::ThreadSocketHandler()
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
if (pnode->fDisconnect ||
(pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty()))
(pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0))
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
@ -1163,10 +1143,6 @@ void CConnman::ThreadSocketHandler() @@ -1163,10 +1143,6 @@ void CConnman::ThreadSocketHandler()
{
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
if (pnode->nOptimisticBytesWritten) {
RecordBytesSent(pnode->nOptimisticBytesWritten);
pnode->nOptimisticBytesWritten = 0;
}
if (!pnode->vSendMsg.empty()) {
FD_SET(pnode->hSocket, &fdsetSend);
continue;
@ -2130,7 +2106,7 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st @@ -2130,7 +2106,7 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
pnodeLocalHost = new CNode(id, nLocalServices, GetBestHeight(), INVALID_SOCKET, CAddress(CService(local, 0), nLocalServices), 0, nonce);
GetNodeSignals().InitializeNode(pnodeLocalHost->GetId(), pnodeLocalHost);
GetNodeSignals().InitializeNode(pnodeLocalHost, *this);
}
//
@ -2482,46 +2458,10 @@ int CConnman::GetBestHeight() const @@ -2482,46 +2458,10 @@ int CConnman::GetBestHeight() const
return nBestHeight.load(std::memory_order_acquire);
}
void CNode::Fuzz(int nChance)
{
if (!fSuccessfullyConnected) return; // Don't fuzz initial handshake
if (GetRand(nChance) != 0) return; // Fuzz 1 of every nChance messages
switch (GetRand(3))
{
case 0:
// xor a random byte with a random value:
if (!ssSend.empty()) {
CDataStream::size_type pos = GetRand(ssSend.size());
ssSend[pos] ^= (unsigned char)(GetRand(256));
}
break;
case 1:
// delete a random byte:
if (!ssSend.empty()) {
CDataStream::size_type pos = GetRand(ssSend.size());
ssSend.erase(ssSend.begin()+pos);
}
break;
case 2:
// insert a random byte at a random position
{
CDataStream::size_type pos = GetRand(ssSend.size());
char ch = (char)GetRand(256);
ssSend.insert(ssSend.begin()+pos, ch);
}
break;
}
// Chance of more than one change half the time:
// (more changes exponentially less likely):
Fuzz(2);
}
unsigned int CConnman::GetReceiveFloodSize() const { return nReceiveFloodSize; }
unsigned int CConnman::GetSendBufferSize() const{ return nSendBufferMaxSize; }
CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress& addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const std::string& addrNameIn, bool fInboundIn) :
ssSend(SER_NETWORK, INIT_PROTO_VERSION),
addr(addrIn),
fInbound(fInboundIn),
id(idIn),
@ -2530,7 +2470,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn @@ -2530,7 +2470,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
filterInventoryKnown(50000, 0.000001),
nLocalHostNonce(nLocalHostNonceIn),
nLocalServices(nLocalServicesIn),
nMyStartingHeight(nMyStartingHeightIn)
nMyStartingHeight(nMyStartingHeightIn),
nSendVersion(0)
{
nServices = NODE_NONE;
nServicesExpected = NODE_NONE;
@ -2577,7 +2518,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn @@ -2577,7 +2518,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
minFeeFilter = 0;
lastSentFeeFilter = 0;
nextSendTimeFeeFilter = 0;
nOptimisticBytesWritten = 0;
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
mapRecvBytesPerMsgCmd[msg] = 0;
@ -2587,10 +2527,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn @@ -2587,10 +2527,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
LogPrint("net", "Added connection to %s peer=%d\n", addrName, id);
else
LogPrint("net", "Added connection peer=%d\n", id);
// Be shy and don't send version until we hear
if (hSocket != INVALID_SOCKET && !fInbound)
PushVersion();
}
CNode::~CNode()
@ -2635,65 +2571,50 @@ void CNode::AskFor(const CInv& inv) @@ -2635,65 +2571,50 @@ void CNode::AskFor(const CInv& inv)
mapAskFor.insert(std::make_pair(nRequestTime, inv));
}
void CNode::BeginMessage(const char* pszCommand) EXCLUSIVE_LOCK_FUNCTION(cs_vSend)
CDataStream CConnman::BeginMessage(CNode* pnode, int nVersion, int flags, const std::string& sCommand)
{
ENTER_CRITICAL_SECTION(cs_vSend);
assert(ssSend.size() == 0);
ssSend << CMessageHeader(Params().MessageStart(), pszCommand, 0);
LogPrint("net", "sending: %s ", SanitizeString(pszCommand));
return {SER_NETWORK, (nVersion ? nVersion : pnode->GetSendVersion()) | flags, CMessageHeader(Params().MessageStart(), sCommand.c_str(), 0) };
}
void CNode::AbortMessage() UNLOCK_FUNCTION(cs_vSend)
void CConnman::EndMessage(CDataStream& strm)
{
ssSend.clear();
LEAVE_CRITICAL_SECTION(cs_vSend);
// Set the size
assert(strm.size () >= CMessageHeader::HEADER_SIZE);
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
WriteLE32((uint8_t*)&strm[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize);
// Set the checksum
uint256 hash = Hash(strm.begin() + CMessageHeader::HEADER_SIZE, strm.end());
memcpy((char*)&strm[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE);
LogPrint("net", "(aborted)\n");
}
void CNode::EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend)
void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand)
{
// The -*messagestest options are intentionally not documented in the help message,
// since they are only used during development to debug the networking code and are
// not intended for end-users.
if (mapArgs.count("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 2)) == 0)
{
LogPrint("net", "dropmessages DROPPING SEND MESSAGE\n");
AbortMessage();
return;
}
if (mapArgs.count("-fuzzmessagestest"))
Fuzz(GetArg("-fuzzmessagestest", 10));
if (ssSend.size() == 0)
{
LEAVE_CRITICAL_SECTION(cs_vSend);
if(strm.empty())
return;
}
// Set the size
unsigned int nSize = ssSend.size() - CMessageHeader::HEADER_SIZE;
WriteLE32((uint8_t*)&ssSend[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize);
//log total amount of bytes per command
mapSendBytesPerMsgCmd[std::string(pszCommand)] += nSize + CMessageHeader::HEADER_SIZE;
// Set the checksum
uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end());
assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + CMessageHeader::CHECKSUM_SIZE);
memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE);
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(sCommand.c_str()), nSize, pnode->id);
LogPrint("net", "(%d bytes) peer=%d\n", nSize, id);
std::deque<CSerializeData>::iterator it = vSendMsg.insert(vSendMsg.end(), CSerializeData());
ssSend.GetAndClear(*it);
nSendSize += (*it).size();
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
if(pnode->hSocket == INVALID_SOCKET) {
return;
}
bool optimisticSend(pnode->vSendMsg.empty());
pnode->vSendMsg.emplace_back(strm.begin(), strm.end());
// If write queue empty, attempt "optimistic write"
if (it == vSendMsg.begin())
nOptimisticBytesWritten += SocketSendData(this);
//log total amount of bytes per command
pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size();
pnode->nSendSize += strm.size();
LEAVE_CRITICAL_SECTION(cs_vSend);
// If write queue empty, attempt "optimistic write"
if (optimisticSend == true)
nBytesSent = SocketSendData(pnode);
}
if (nBytesSent)
RecordBytesSent(nBytesSent);
}
bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)

250
src/net.h

@ -136,6 +136,33 @@ public: @@ -136,6 +136,33 @@ public:
bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
template <typename... Args>
void PushMessageWithVersionAndFlag(CNode* pnode, int nVersion, int flag, const std::string& sCommand, Args&&... args)
{
auto msg(BeginMessage(pnode, nVersion, flag, sCommand));
::SerializeMany(msg, msg.nType, msg.nVersion, std::forward<Args>(args)...);
EndMessage(msg);
PushMessage(pnode, msg, sCommand);
}
template <typename... Args>
void PushMessageWithFlag(CNode* pnode, int flag, const std::string& sCommand, Args&&... args)
{
PushMessageWithVersionAndFlag(pnode, 0, flag, sCommand, std::forward<Args>(args)...);
}
template <typename... Args>
void PushMessageWithVersion(CNode* pnode, int nVersion, const std::string& sCommand, Args&&... args)
{
PushMessageWithVersionAndFlag(pnode, nVersion, 0, sCommand, std::forward<Args>(args)...);
}
template <typename... Args>
void PushMessage(CNode* pnode, const std::string& sCommand, Args&&... args)
{
PushMessageWithVersionAndFlag(pnode, 0, 0, sCommand, std::forward<Args>(args)...);
}
template<typename Callable>
bool ForEachNodeContinueIf(Callable&& func)
{
@ -345,6 +372,10 @@ private: @@ -345,6 +372,10 @@ private:
unsigned int GetReceiveFloodSize() const;
CDataStream BeginMessage(CNode* node, int nVersion, int flags, const std::string& sCommand);
void PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand);
void EndMessage(CDataStream& strm);
// Network stats
void RecordBytesRecv(uint64_t bytes);
void RecordBytesSent(uint64_t bytes);
@ -428,7 +459,7 @@ struct CNodeSignals @@ -428,7 +459,7 @@ struct CNodeSignals
{
boost::signals2::signal<bool (CNode*, CConnman&), CombinerAll> ProcessMessages;
boost::signals2::signal<bool (CNode*, CConnman&), CombinerAll> SendMessages;
boost::signals2::signal<void (NodeId, const CNode*)> InitializeNode;
boost::signals2::signal<void (CNode*, CConnman&)> InitializeNode;
boost::signals2::signal<void (NodeId, bool&)> FinalizeNode;
};
@ -553,15 +584,14 @@ public: @@ -553,15 +584,14 @@ public:
/** Information about a peer */
class CNode
{
friend class CConnman;
public:
// socket
ServiceFlags nServices;
ServiceFlags nServicesExpected;
SOCKET hSocket;
CDataStream ssSend;
size_t nSendSize; // total size of all vSendMsg entries
size_t nSendOffset; // offset inside the first vSendMsg already sent
uint64_t nOptimisticBytesWritten;
uint64_t nSendBytes;
std::deque<CSerializeData> vSendMsg;
CCriticalSection cs_vSend;
@ -611,9 +641,6 @@ protected: @@ -611,9 +641,6 @@ protected:
mapMsgCmdSize mapSendBytesPerMsgCmd;
mapMsgCmdSize mapRecvBytesPerMsgCmd;
// Basic fuzz-testing
void Fuzz(int nChance); // modifies ssSend
public:
uint256 hashContinue;
int nStartingHeight;
@ -681,6 +708,7 @@ private: @@ -681,6 +708,7 @@ private:
// Services offered to this peer
const ServiceFlags nLocalServices;
const int nMyStartingHeight;
int nSendVersion;
public:
NodeId GetId() const {
@ -691,6 +719,10 @@ public: @@ -691,6 +719,10 @@ public:
return nLocalHostNonce;
}
int GetMyStartingHeight() const {
return nMyStartingHeight;
}
int GetRefCount()
{
assert(nRefCount >= 0);
@ -716,6 +748,25 @@ public: @@ -716,6 +748,25 @@ public:
BOOST_FOREACH(CNetMessage &msg, vRecvMsg)
msg.SetVersion(nVersionIn);
}
void SetSendVersion(int nVersionIn)
{
// Send version may only be changed in the version message, and
// only one version message is allowed per session. We can therefore
// treat this value as const and even atomic as long as it's only used
// once the handshake is complete. Any attempt to set this twice is an
// error.
assert(nSendVersion == 0);
nSendVersion = nVersionIn;
}
int GetSendVersion() const
{
// The send version should always be explicitly set to
// INIT_PROTO_VERSION rather than using this value until the handshake
// is complete. See PushMessageWithVersion().
assert(nSendVersion != 0);
return nSendVersion;
}
CNode* AddRef()
{
@ -778,193 +829,6 @@ public: @@ -778,193 +829,6 @@ public:
void AskFor(const CInv& inv);
// TODO: Document the postcondition of this function. Is cs_vSend locked?
void BeginMessage(const char* pszCommand) EXCLUSIVE_LOCK_FUNCTION(cs_vSend);
// TODO: Document the precondition of this function. Is cs_vSend locked?
void AbortMessage() UNLOCK_FUNCTION(cs_vSend);
// TODO: Document the precondition of this function. Is cs_vSend locked?
void EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend);
void PushVersion();
void PushMessage(const char* pszCommand)
{
try
{
BeginMessage(pszCommand);
EndMessage(pszCommand);
}
catch (...)
{
AbortMessage();
throw;
}
}
template<typename T1>
void PushMessage(const char* pszCommand, const T1& a1)
{
try
{
BeginMessage(pszCommand);
ssSend << a1;
EndMessage(pszCommand);
}
catch (...)
{
AbortMessage();
throw;
}
}
/** Send a message containing a1, serialized with flag flag. */
template<typename T1>
void PushMessageWithFlag(int flag, const char* pszCommand, const T1& a1)
{
try
{
BeginMessage(pszCommand);
WithOrVersion(&ssSend, flag) << a1;
EndMessage(pszCommand);
}
catch (...)
{
AbortMessage();
throw;
}
}
template<typename T1, typename T2>
void PushMessage(const char* pszCommand, const T1& a1, const T2& a2)
{
try
{
BeginMessage(pszCommand);
ssSend << a1 << a2;
EndMessage(pszCommand);
}
catch (...)
{
AbortMessage();
throw;
}
}
template<typename T1, typename T2, typename T3>
void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3)
{
try
{
BeginMessage(pszCommand);
ssSend << a1 << a2 << a3;
EndMessage(pszCommand);
}
catch (...)
{
AbortMessage();
throw;
}
}
template<typename T1, typename T2, typename T3, typename T4>
void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4)
{
try
{
BeginMessage(pszCommand);
ssSend << a1 << a2 << a3 << a4;
EndMessage(pszCommand);
}
catch (...)
{
AbortMessage();
throw;
}
}
template<typename T1, typename T2, typename T3, typename T4, typename T5>
void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5)
{
try
{
BeginMessage(pszCommand);
ssSend << a1 << a2 << a3 << a4 << a5;
EndMessage(pszCommand);
}
catch (...)
{
AbortMessage();
throw;
}
}
template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6>
void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6)
{
try
{
BeginMessage(pszCommand);
ssSend << a1 << a2 << a3 << a4 << a5 << a6;
EndMessage(pszCommand);
}
catch (...)
{
AbortMessage();
throw;
}
}
template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7>
void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7)
{
try
{
BeginMessage(pszCommand);
ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7;
EndMessage(pszCommand);
}
catch (...)
{
AbortMessage();
throw;
}
}
template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8>
void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7, const T8& a8)
{
try
{
BeginMessage(pszCommand);
ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8;
EndMessage(pszCommand);
}
catch (...)
{
AbortMessage();
throw;
}
}
template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8, typename T9>
void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7, const T8& a8, const T9& a9)
{
try
{
BeginMessage(pszCommand);
ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9;
EndMessage(pszCommand);
}
catch (...)
{
AbortMessage();
throw;
}
}
void CloseSocketDisconnect();
void copyStats(CNodeStats &stats);

49
src/serialize.h

@ -160,6 +160,7 @@ enum @@ -160,6 +160,7 @@ enum
};
#define READWRITE(obj) (::SerReadWrite(s, (obj), nType, nVersion, ser_action))
#define READWRITEMANY(...) (::SerReadWriteMany(s, nType, nVersion, ser_action, __VA_ARGS__))
/**
* Implement three methods for serializable objects. These are actually wrappers over
@ -960,4 +961,52 @@ public: @@ -960,4 +961,52 @@ public:
}
};
template<typename Stream>
void SerializeMany(Stream& s, int nType, int nVersion)
{
}
template<typename Stream, typename Arg>
void SerializeMany(Stream& s, int nType, int nVersion, Arg&& arg)
{
::Serialize(s, std::forward<Arg>(arg), nType, nVersion);
}
template<typename Stream, typename Arg, typename... Args>
void SerializeMany(Stream& s, int nType, int nVersion, Arg&& arg, Args&&... args)
{
::Serialize(s, std::forward<Arg>(arg), nType, nVersion);
::SerializeMany(s, nType, nVersion, std::forward<Args>(args)...);
}
template<typename Stream>
inline void UnserializeMany(Stream& s, int nType, int nVersion)
{
}
template<typename Stream, typename Arg>
inline void UnserializeMany(Stream& s, int nType, int nVersion, Arg& arg)
{
::Unserialize(s, arg, nType, nVersion);
}
template<typename Stream, typename Arg, typename... Args>
inline void UnserializeMany(Stream& s, int nType, int nVersion, Arg& arg, Args&... args)
{
::Unserialize(s, arg, nType, nVersion);
::UnserializeMany(s, nType, nVersion, args...);
}
template<typename Stream, typename... Args>
inline void SerReadWriteMany(Stream& s, int nType, int nVersion, CSerActionSerialize ser_action, Args&&... args)
{
::SerializeMany(s, nType, nVersion, std::forward<Args>(args)...);
}
template<typename Stream, typename... Args>
inline void SerReadWriteMany(Stream& s, int nType, int nVersion, CSerActionUnserialize ser_action, Args&... args)
{
::UnserializeMany(s, nType, nVersion, args...);
}
#endif // BITCOIN_SERIALIZE_H

7
src/streams.h

@ -112,6 +112,13 @@ public: @@ -112,6 +112,13 @@ public:
Init(nTypeIn, nVersionIn);
}
template <typename... Args>
CDataStream(int nTypeIn, int nVersionIn, Args&&... args)
{
Init(nTypeIn, nVersionIn);
::SerializeMany(*this, nType, nVersion, std::forward<Args>(args)...);
}
void Init(int nTypeIn, int nVersionIn)
{
nReadPos = 0;

12
src/test/DoS_tests.cpp

@ -49,7 +49,8 @@ BOOST_AUTO_TEST_CASE(DoS_banning) @@ -49,7 +49,8 @@ BOOST_AUTO_TEST_CASE(DoS_banning)
connman->ClearBanned();
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 0, 0, "", true);
GetNodeSignals().InitializeNode(dummyNode1.GetId(), &dummyNode1);
dummyNode1.SetSendVersion(PROTOCOL_VERSION);
GetNodeSignals().InitializeNode(&dummyNode1, *connman);
dummyNode1.nVersion = 1;
Misbehaving(dummyNode1.GetId(), 100); // Should get banned
SendMessages(&dummyNode1, *connman);
@ -58,7 +59,8 @@ BOOST_AUTO_TEST_CASE(DoS_banning) @@ -58,7 +59,8 @@ BOOST_AUTO_TEST_CASE(DoS_banning)
CAddress addr2(ip(0xa0b0c002), NODE_NONE);
CNode dummyNode2(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr2, 1, 1, "", true);
GetNodeSignals().InitializeNode(dummyNode2.GetId(), &dummyNode2);
dummyNode2.SetSendVersion(PROTOCOL_VERSION);
GetNodeSignals().InitializeNode(&dummyNode2, *connman);
dummyNode2.nVersion = 1;
Misbehaving(dummyNode2.GetId(), 50);
SendMessages(&dummyNode2, *connman);
@ -75,7 +77,8 @@ BOOST_AUTO_TEST_CASE(DoS_banscore) @@ -75,7 +77,8 @@ BOOST_AUTO_TEST_CASE(DoS_banscore)
mapArgs["-banscore"] = "111"; // because 11 is my favorite number
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 3, 1, "", true);
GetNodeSignals().InitializeNode(dummyNode1.GetId(), &dummyNode1);
dummyNode1.SetSendVersion(PROTOCOL_VERSION);
GetNodeSignals().InitializeNode(&dummyNode1, *connman);
dummyNode1.nVersion = 1;
Misbehaving(dummyNode1.GetId(), 100);
SendMessages(&dummyNode1, *connman);
@ -97,7 +100,8 @@ BOOST_AUTO_TEST_CASE(DoS_bantime) @@ -97,7 +100,8 @@ BOOST_AUTO_TEST_CASE(DoS_bantime)
CAddress addr(ip(0xa0b0c001), NODE_NONE);
CNode dummyNode(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr, 4, 4, "", true);
GetNodeSignals().InitializeNode(dummyNode.GetId(), &dummyNode);
dummyNode.SetSendVersion(PROTOCOL_VERSION);
GetNodeSignals().InitializeNode(&dummyNode, *connman);
dummyNode.nVersion = 1;
Misbehaving(dummyNode.GetId(), 100);

71
src/test/serialize_tests.cpp

@ -10,11 +10,54 @@ @@ -10,11 +10,54 @@
#include <stdint.h>
#include <boost/test/unit_test.hpp>
using namespace std;
BOOST_FIXTURE_TEST_SUITE(serialize_tests, BasicTestingSetup)
class CSerializeMethodsTestSingle
{
protected:
int intval;
bool boolval;
std::string stringval;
const char* charstrval;
CTransaction txval;
public:
CSerializeMethodsTestSingle() = default;
CSerializeMethodsTestSingle(int intvalin, bool boolvalin, std::string stringvalin, const char* charstrvalin, CTransaction txvalin) : intval(intvalin), boolval(boolvalin), stringval(std::move(stringvalin)), charstrval(charstrvalin), txval(txvalin){}
ADD_SERIALIZE_METHODS;
template <typename Stream, typename Operation>
inline void SerializationOp(Stream& s, Operation ser_action, int nType, int nVersion) {
READWRITE(intval);
READWRITE(boolval);
READWRITE(stringval);
READWRITE(FLATDATA(charstrval));
READWRITE(txval);
}
bool operator==(const CSerializeMethodsTestSingle& rhs)
{
return intval == rhs.intval && \
boolval == rhs.boolval && \
stringval == rhs.stringval && \
strcmp(charstrval, rhs.charstrval) == 0 && \
txval == rhs.txval;
}
};
class CSerializeMethodsTestMany : public CSerializeMethodsTestSingle
{
public:
using CSerializeMethodsTestSingle::CSerializeMethodsTestSingle;
ADD_SERIALIZE_METHODS;
template <typename Stream, typename Operation>
inline void SerializationOp(Stream& s, Operation ser_action, int nType, int nVersion) {
READWRITEMANY(intval, boolval, stringval, FLATDATA(charstrval), txval);
}
};
BOOST_AUTO_TEST_CASE(sizes)
{
BOOST_CHECK_EQUAL(sizeof(char), GetSerializeSize(char(0), 0));
@ -297,4 +340,30 @@ BOOST_AUTO_TEST_CASE(insert_delete) @@ -297,4 +340,30 @@ BOOST_AUTO_TEST_CASE(insert_delete)
BOOST_CHECK_EQUAL(ss.size(), 0);
}
BOOST_AUTO_TEST_CASE(class_methods)
{
int intval(100);
bool boolval(true);
std::string stringval("testing");
const char* charstrval("testing charstr");
CMutableTransaction txval;
CSerializeMethodsTestSingle methodtest1(intval, boolval, stringval, charstrval, txval);
CSerializeMethodsTestMany methodtest2(intval, boolval, stringval, charstrval, txval);
CSerializeMethodsTestSingle methodtest3;
CSerializeMethodsTestMany methodtest4;
CDataStream ss(SER_DISK, PROTOCOL_VERSION);
BOOST_CHECK(methodtest1 == methodtest2);
ss << methodtest1;
ss >> methodtest4;
ss << methodtest2;
ss >> methodtest3;
BOOST_CHECK(methodtest1 == methodtest2);
BOOST_CHECK(methodtest2 == methodtest3);
BOOST_CHECK(methodtest3 == methodtest4);
CDataStream ss2(SER_DISK, PROTOCOL_VERSION, intval, boolval, stringval, FLATDATA(charstrval), txval);
ss2 >> methodtest3;
BOOST_CHECK(methodtest3 == methodtest4);
}
BOOST_AUTO_TEST_SUITE_END()

Loading…
Cancel
Save