Browse Source

net: move send/recv statistics to CConnman

0.14
Cory Fields 9 years ago
parent
commit
63cafa6329
  1. 2
      src/init.cpp
  2. 10
      src/main.cpp
  3. 63
      src/net.cpp
  4. 82
      src/net.h
  5. 8
      src/qt/clientmodel.cpp
  6. 18
      src/rpc/net.cpp

2
src/init.cpp

@ -1240,7 +1240,7 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
} }
#endif #endif
if (mapArgs.count("-maxuploadtarget")) { if (mapArgs.count("-maxuploadtarget")) {
CNode::SetMaxOutboundTarget(GetArg("-maxuploadtarget", DEFAULT_MAX_UPLOAD_TARGET)*1024*1024); connman.SetMaxOutboundTarget(GetArg("-maxuploadtarget", DEFAULT_MAX_UPLOAD_TARGET)*1024*1024);
} }
// ********************************************************* Step 7: load block chain // ********************************************************* Step 7: load block chain

10
src/main.cpp

@ -4766,7 +4766,7 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma
connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc));
} }
void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams) void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman)
{ {
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin(); std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
@ -4808,7 +4808,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
// disconnect node in case we have reached the outbound limit for serving historical blocks // disconnect node in case we have reached the outbound limit for serving historical blocks
// never disconnect whitelisted nodes // never disconnect whitelisted nodes
static const int nOneWeek = 7 * 24 * 60 * 60; // assume > 1 week = historical static const int nOneWeek = 7 * 24 * 60 * 60; // assume > 1 week = historical
if (send && CNode::OutboundTargetReached(true) && ( ((pindexBestHeader != NULL) && (pindexBestHeader->GetBlockTime() - mi->second->GetBlockTime() > nOneWeek)) || inv.type == MSG_FILTERED_BLOCK) && !pfrom->fWhitelisted) if (send && connman.OutboundTargetReached(true) && ( ((pindexBestHeader != NULL) && (pindexBestHeader->GetBlockTime() - mi->second->GetBlockTime() > nOneWeek)) || inv.type == MSG_FILTERED_BLOCK) && !pfrom->fWhitelisted)
{ {
LogPrint("net", "historical block serving limit reached, disconnect peer=%d\n", pfrom->GetId()); LogPrint("net", "historical block serving limit reached, disconnect peer=%d\n", pfrom->GetId());
@ -5312,7 +5312,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
LogPrint("net", "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom->id); LogPrint("net", "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom->id);
pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end()); pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end());
ProcessGetData(pfrom, chainparams.GetConsensus()); ProcessGetData(pfrom, chainparams.GetConsensus(), connman);
} }
@ -5986,7 +5986,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
return true; return true;
} }
if (CNode::OutboundTargetReached(false) && !pfrom->fWhitelisted) if (connman.OutboundTargetReached(false) && !pfrom->fWhitelisted)
{ {
LogPrint("net", "mempool request with bandwidth limit reached, disconnect peer=%d\n", pfrom->GetId()); LogPrint("net", "mempool request with bandwidth limit reached, disconnect peer=%d\n", pfrom->GetId());
pfrom->fDisconnect = true; pfrom->fDisconnect = true;
@ -6202,7 +6202,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman)
bool fOk = true; bool fOk = true;
if (!pfrom->vRecvGetData.empty()) if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams.GetConsensus()); ProcessGetData(pfrom, chainparams.GetConsensus(), connman);
// this maintains the order of responses // this maintains the order of responses
if (!pfrom->vRecvGetData.empty()) return fOk; if (!pfrom->vRecvGetData.empty()) return fOk;

63
src/net.cpp

@ -298,15 +298,6 @@ bool IsReachable(const CNetAddr& addr)
return IsReachable(net); return IsReachable(net);
} }
uint64_t CNode::nTotalBytesRecv = 0;
uint64_t CNode::nTotalBytesSent = 0;
CCriticalSection CNode::cs_totalBytesRecv;
CCriticalSection CNode::cs_totalBytesSent;
uint64_t CNode::nMaxOutboundLimit = 0;
uint64_t CNode::nMaxOutboundTotalBytesSentInCycle = 0;
uint64_t CNode::nMaxOutboundTimeframe = 60*60*24; //1 day
uint64_t CNode::nMaxOutboundCycleStartTime = 0;
CNode* CConnman::FindNode(const CNetAddr& ip) CNode* CConnman::FindNode(const CNetAddr& ip)
{ {
@ -804,7 +795,6 @@ size_t SocketSendData(CNode *pnode)
pnode->nLastSend = GetTime(); pnode->nLastSend = GetTime();
pnode->nSendBytes += nBytes; pnode->nSendBytes += nBytes;
pnode->nSendOffset += nBytes; pnode->nSendOffset += nBytes;
pnode->RecordBytesSent(nBytes);
nSentSize += nBytes; nSentSize += nBytes;
if (pnode->nSendOffset == data.size()) { if (pnode->nSendOffset == data.size()) {
pnode->nSendOffset = 0; pnode->nSendOffset = 0;
@ -1176,9 +1166,15 @@ void CConnman::ThreadSocketHandler()
// * We process a message in the buffer (message handler thread). // * We process a message in the buffer (message handler thread).
{ {
TRY_LOCK(pnode->cs_vSend, lockSend); TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend && !pnode->vSendMsg.empty()) { if (lockSend) {
FD_SET(pnode->hSocket, &fdsetSend); if (pnode->nOptimisticBytesWritten) {
continue; RecordBytesSent(pnode->nOptimisticBytesWritten);
pnode->nOptimisticBytesWritten = 0;
}
if (!pnode->vSendMsg.empty()) {
FD_SET(pnode->hSocket, &fdsetSend);
continue;
}
} }
} }
{ {
@ -1257,7 +1253,7 @@ void CConnman::ThreadSocketHandler()
messageHandlerCondition.notify_one(); messageHandlerCondition.notify_one();
pnode->nLastRecv = GetTime(); pnode->nLastRecv = GetTime();
pnode->nRecvBytes += nBytes; pnode->nRecvBytes += nBytes;
pnode->RecordBytesRecv(nBytes); RecordBytesRecv(nBytes);
} }
else if (nBytes == 0) else if (nBytes == 0)
{ {
@ -1289,8 +1285,11 @@ void CConnman::ThreadSocketHandler()
if (FD_ISSET(pnode->hSocket, &fdsetSend)) if (FD_ISSET(pnode->hSocket, &fdsetSend))
{ {
TRY_LOCK(pnode->cs_vSend, lockSend); TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) if (lockSend) {
SocketSendData(pnode); size_t nBytes = SocketSendData(pnode);
if (nBytes)
RecordBytesSent(nBytes);
}
} }
// //
@ -2060,6 +2059,13 @@ NodeId CConnman::GetNewNodeId()
bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError) bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError)
{ {
nTotalBytesRecv = 0;
nTotalBytesSent = 0;
nMaxOutboundLimit = 0;
nMaxOutboundTotalBytesSentInCycle = 0;
nMaxOutboundTimeframe = 60*60*24; //1 day
nMaxOutboundCycleStartTime = 0;
uiInterface.InitMessage(_("Loading addresses...")); uiInterface.InitMessage(_("Loading addresses..."));
// Load addresses from peers.dat // Load addresses from peers.dat
int64_t nStart = GetTimeMillis(); int64_t nStart = GetTimeMillis();
@ -2344,13 +2350,13 @@ void CConnman::RelayTransaction(const CTransaction& tx)
} }
} }
void CNode::RecordBytesRecv(uint64_t bytes) void CConnman::RecordBytesRecv(uint64_t bytes)
{ {
LOCK(cs_totalBytesRecv); LOCK(cs_totalBytesRecv);
nTotalBytesRecv += bytes; nTotalBytesRecv += bytes;
} }
void CNode::RecordBytesSent(uint64_t bytes) void CConnman::RecordBytesSent(uint64_t bytes)
{ {
LOCK(cs_totalBytesSent); LOCK(cs_totalBytesSent);
nTotalBytesSent += bytes; nTotalBytesSent += bytes;
@ -2367,7 +2373,7 @@ void CNode::RecordBytesSent(uint64_t bytes)
nMaxOutboundTotalBytesSentInCycle += bytes; nMaxOutboundTotalBytesSentInCycle += bytes;
} }
void CNode::SetMaxOutboundTarget(uint64_t limit) void CConnman::SetMaxOutboundTarget(uint64_t limit)
{ {
LOCK(cs_totalBytesSent); LOCK(cs_totalBytesSent);
uint64_t recommendedMinimum = (nMaxOutboundTimeframe / 600) * MAX_BLOCK_SERIALIZED_SIZE; uint64_t recommendedMinimum = (nMaxOutboundTimeframe / 600) * MAX_BLOCK_SERIALIZED_SIZE;
@ -2377,19 +2383,19 @@ void CNode::SetMaxOutboundTarget(uint64_t limit)
LogPrintf("Max outbound target is very small (%s bytes) and will be overshot. Recommended minimum is %s bytes.\n", nMaxOutboundLimit, recommendedMinimum); LogPrintf("Max outbound target is very small (%s bytes) and will be overshot. Recommended minimum is %s bytes.\n", nMaxOutboundLimit, recommendedMinimum);
} }
uint64_t CNode::GetMaxOutboundTarget() uint64_t CConnman::GetMaxOutboundTarget()
{ {
LOCK(cs_totalBytesSent); LOCK(cs_totalBytesSent);
return nMaxOutboundLimit; return nMaxOutboundLimit;
} }
uint64_t CNode::GetMaxOutboundTimeframe() uint64_t CConnman::GetMaxOutboundTimeframe()
{ {
LOCK(cs_totalBytesSent); LOCK(cs_totalBytesSent);
return nMaxOutboundTimeframe; return nMaxOutboundTimeframe;
} }
uint64_t CNode::GetMaxOutboundTimeLeftInCycle() uint64_t CConnman::GetMaxOutboundTimeLeftInCycle()
{ {
LOCK(cs_totalBytesSent); LOCK(cs_totalBytesSent);
if (nMaxOutboundLimit == 0) if (nMaxOutboundLimit == 0)
@ -2403,7 +2409,7 @@ uint64_t CNode::GetMaxOutboundTimeLeftInCycle()
return (cycleEndTime < now) ? 0 : cycleEndTime - GetTime(); return (cycleEndTime < now) ? 0 : cycleEndTime - GetTime();
} }
void CNode::SetMaxOutboundTimeframe(uint64_t timeframe) void CConnman::SetMaxOutboundTimeframe(uint64_t timeframe)
{ {
LOCK(cs_totalBytesSent); LOCK(cs_totalBytesSent);
if (nMaxOutboundTimeframe != timeframe) if (nMaxOutboundTimeframe != timeframe)
@ -2415,7 +2421,7 @@ void CNode::SetMaxOutboundTimeframe(uint64_t timeframe)
nMaxOutboundTimeframe = timeframe; nMaxOutboundTimeframe = timeframe;
} }
bool CNode::OutboundTargetReached(bool historicalBlockServingLimit) bool CConnman::OutboundTargetReached(bool historicalBlockServingLimit)
{ {
LOCK(cs_totalBytesSent); LOCK(cs_totalBytesSent);
if (nMaxOutboundLimit == 0) if (nMaxOutboundLimit == 0)
@ -2435,7 +2441,7 @@ bool CNode::OutboundTargetReached(bool historicalBlockServingLimit)
return false; return false;
} }
uint64_t CNode::GetOutboundTargetBytesLeft() uint64_t CConnman::GetOutboundTargetBytesLeft()
{ {
LOCK(cs_totalBytesSent); LOCK(cs_totalBytesSent);
if (nMaxOutboundLimit == 0) if (nMaxOutboundLimit == 0)
@ -2444,13 +2450,13 @@ uint64_t CNode::GetOutboundTargetBytesLeft()
return (nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit) ? 0 : nMaxOutboundLimit - nMaxOutboundTotalBytesSentInCycle; return (nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit) ? 0 : nMaxOutboundLimit - nMaxOutboundTotalBytesSentInCycle;
} }
uint64_t CNode::GetTotalBytesRecv() uint64_t CConnman::GetTotalBytesRecv()
{ {
LOCK(cs_totalBytesRecv); LOCK(cs_totalBytesRecv);
return nTotalBytesRecv; return nTotalBytesRecv;
} }
uint64_t CNode::GetTotalBytesSent() uint64_t CConnman::GetTotalBytesSent()
{ {
LOCK(cs_totalBytesSent); LOCK(cs_totalBytesSent);
return nTotalBytesSent; return nTotalBytesSent;
@ -2548,6 +2554,7 @@ CNode::CNode(NodeId idIn, SOCKET hSocketIn, const CAddress& addrIn, const std::s
lastSentFeeFilter = 0; lastSentFeeFilter = 0;
nextSendTimeFeeFilter = 0; nextSendTimeFeeFilter = 0;
id = idIn; id = idIn;
nOptimisticBytesWritten = 0;
GetRandBytes((unsigned char*)&nLocalHostNonce, sizeof(nLocalHostNonce)); GetRandBytes((unsigned char*)&nLocalHostNonce, sizeof(nLocalHostNonce));
@ -2665,7 +2672,7 @@ void CNode::EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend)
// If write queue empty, attempt "optimistic write" // If write queue empty, attempt "optimistic write"
if (it == vSendMsg.begin()) if (it == vSendMsg.begin())
SocketSendData(this); nOptimisticBytesWritten += SocketSendData(this);
LEAVE_CRITICAL_SECTION(cs_vSend); LEAVE_CRITICAL_SECTION(cs_vSend);
} }

82
src/net.h

@ -171,6 +171,31 @@ public:
bool DisconnectSubnet(const CSubNet& subnet); bool DisconnectSubnet(const CSubNet& subnet);
void AddWhitelistedRange(const CSubNet &subnet); void AddWhitelistedRange(const CSubNet &subnet);
//!set the max outbound target in bytes
void SetMaxOutboundTarget(uint64_t limit);
uint64_t GetMaxOutboundTarget();
//!set the timeframe for the max outbound target
void SetMaxOutboundTimeframe(uint64_t timeframe);
uint64_t GetMaxOutboundTimeframe();
//!check if the outbound target is reached
// if param historicalBlockServingLimit is set true, the function will
// response true if the limit for serving historical blocks has been reached
bool OutboundTargetReached(bool historicalBlockServingLimit);
//!response the bytes left in the current max outbound cycle
// in case of no limit, it will always response 0
uint64_t GetOutboundTargetBytesLeft();
//!response the time in second left in the current max outbound cycle
// in case of no limit, it will always response 0
uint64_t GetMaxOutboundTimeLeftInCycle();
uint64_t GetTotalBytesRecv();
uint64_t GetTotalBytesSent();
private: private:
struct ListenSocket { struct ListenSocket {
SOCKET socket; SOCKET socket;
@ -210,6 +235,22 @@ private:
void DumpData(); void DumpData();
void DumpBanlist(); void DumpBanlist();
// Network stats
void RecordBytesRecv(uint64_t bytes);
void RecordBytesSent(uint64_t bytes);
// Network usage totals
CCriticalSection cs_totalBytesRecv;
CCriticalSection cs_totalBytesSent;
uint64_t nTotalBytesRecv;
uint64_t nTotalBytesSent;
// outbound limit & stats
uint64_t nMaxOutboundTotalBytesSentInCycle;
uint64_t nMaxOutboundCycleStartTime;
uint64_t nMaxOutboundLimit;
uint64_t nMaxOutboundTimeframe;
// Whitelisted ranges. Any node connecting from these is automatically // Whitelisted ranges. Any node connecting from these is automatically
// whitelisted (as well as those connecting to whitelisted binds). // whitelisted (as well as those connecting to whitelisted binds).
std::vector<CSubNet> vWhitelistedRange; std::vector<CSubNet> vWhitelistedRange;
@ -396,6 +437,7 @@ public:
CDataStream ssSend; CDataStream ssSend;
size_t nSendSize; // total size of all vSendMsg entries size_t nSendSize; // total size of all vSendMsg entries
size_t nSendOffset; // offset inside the first vSendMsg already sent size_t nSendOffset; // offset inside the first vSendMsg already sent
uint64_t nOptimisticBytesWritten;
uint64_t nSendBytes; uint64_t nSendBytes;
std::deque<CSerializeData> vSendMsg; std::deque<CSerializeData> vSendMsg;
CCriticalSection cs_vSend; CCriticalSection cs_vSend;
@ -507,18 +549,6 @@ public:
~CNode(); ~CNode();
private: private:
// Network usage totals
static CCriticalSection cs_totalBytesRecv;
static CCriticalSection cs_totalBytesSent;
static uint64_t nTotalBytesRecv;
static uint64_t nTotalBytesSent;
// outbound limit & stats
static uint64_t nMaxOutboundTotalBytesSentInCycle;
static uint64_t nMaxOutboundCycleStartTime;
static uint64_t nMaxOutboundLimit;
static uint64_t nMaxOutboundTimeframe;
CNode(const CNode&); CNode(const CNode&);
void operator=(const CNode&); void operator=(const CNode&);
@ -812,34 +842,6 @@ public:
void CloseSocketDisconnect(); void CloseSocketDisconnect();
void copyStats(CNodeStats &stats); void copyStats(CNodeStats &stats);
// Network stats
static void RecordBytesRecv(uint64_t bytes);
static void RecordBytesSent(uint64_t bytes);
static uint64_t GetTotalBytesRecv();
static uint64_t GetTotalBytesSent();
//!set the max outbound target in bytes
static void SetMaxOutboundTarget(uint64_t limit);
static uint64_t GetMaxOutboundTarget();
//!set the timeframe for the max outbound target
static void SetMaxOutboundTimeframe(uint64_t timeframe);
static uint64_t GetMaxOutboundTimeframe();
//!check if the outbound target is reached
// if param historicalBlockServingLimit is set true, the function will
// response true if the limit for serving historical blocks has been reached
static bool OutboundTargetReached(bool historicalBlockServingLimit);
//!response the bytes left in the current max outbound cycle
// in case of no limit, it will always response 0
static uint64_t GetOutboundTargetBytesLeft();
//!response the time in second left in the current max outbound cycle
// in case of no limit, it will always response 0
static uint64_t GetMaxOutboundTimeLeftInCycle();
}; };

8
src/qt/clientmodel.cpp

@ -72,12 +72,16 @@ int ClientModel::getNumBlocks() const
quint64 ClientModel::getTotalBytesRecv() const quint64 ClientModel::getTotalBytesRecv() const
{ {
return CNode::GetTotalBytesRecv(); if(!g_connman)
return 0;
return g_connman->GetTotalBytesRecv();
} }
quint64 ClientModel::getTotalBytesSent() const quint64 ClientModel::getTotalBytesSent() const
{ {
return CNode::GetTotalBytesSent(); if(!g_connman)
return 0;
return g_connman->GetTotalBytesSent();
} }
QDateTime ClientModel::getLastBlockDate() const QDateTime ClientModel::getLastBlockDate() const

18
src/rpc/net.cpp

@ -347,19 +347,21 @@ UniValue getnettotals(const UniValue& params, bool fHelp)
+ HelpExampleCli("getnettotals", "") + HelpExampleCli("getnettotals", "")
+ HelpExampleRpc("getnettotals", "") + HelpExampleRpc("getnettotals", "")
); );
if(!g_connman)
throw JSONRPCError(RPC_CLIENT_P2P_DISABLED, "Error: Peer-to-peer functionality missing or disabled");
UniValue obj(UniValue::VOBJ); UniValue obj(UniValue::VOBJ);
obj.push_back(Pair("totalbytesrecv", CNode::GetTotalBytesRecv())); obj.push_back(Pair("totalbytesrecv", g_connman->GetTotalBytesRecv()));
obj.push_back(Pair("totalbytessent", CNode::GetTotalBytesSent())); obj.push_back(Pair("totalbytessent", g_connman->GetTotalBytesSent()));
obj.push_back(Pair("timemillis", GetTimeMillis())); obj.push_back(Pair("timemillis", GetTimeMillis()));
UniValue outboundLimit(UniValue::VOBJ); UniValue outboundLimit(UniValue::VOBJ);
outboundLimit.push_back(Pair("timeframe", CNode::GetMaxOutboundTimeframe())); outboundLimit.push_back(Pair("timeframe", g_connman->GetMaxOutboundTimeframe()));
outboundLimit.push_back(Pair("target", CNode::GetMaxOutboundTarget())); outboundLimit.push_back(Pair("target", g_connman->GetMaxOutboundTarget()));
outboundLimit.push_back(Pair("target_reached", CNode::OutboundTargetReached(false))); outboundLimit.push_back(Pair("target_reached", g_connman->OutboundTargetReached(false)));
outboundLimit.push_back(Pair("serve_historical_blocks", !CNode::OutboundTargetReached(true))); outboundLimit.push_back(Pair("serve_historical_blocks", !g_connman->OutboundTargetReached(true)));
outboundLimit.push_back(Pair("bytes_left_in_cycle", CNode::GetOutboundTargetBytesLeft())); outboundLimit.push_back(Pair("bytes_left_in_cycle", g_connman->GetOutboundTargetBytesLeft()));
outboundLimit.push_back(Pair("time_left_in_cycle", CNode::GetMaxOutboundTimeLeftInCycle())); outboundLimit.push_back(Pair("time_left_in_cycle", g_connman->GetMaxOutboundTimeLeftInCycle()));
obj.push_back(Pair("uploadtarget", outboundLimit)); obj.push_back(Pair("uploadtarget", outboundLimit));
return obj; return obj;
} }

Loading…
Cancel
Save