Browse Source

Merge pull request #1036 from gavinandresen/pubsubcleanup

Remove half-implemented publish/subscribe system
miguelfreitas
Gavin Andresen 13 years ago
parent
commit
c7fa55bb87
  1. 106
      src/net.cpp
  2. 61
      src/net.h

106
src/net.cpp

@ -289,105 +289,6 @@ void AddressCurrentlyConnected(const CService& addr)
void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1)
{
// If the dialog might get closed before the reply comes back,
// call this in the destructor so it doesn't get called after it's deleted.
CRITICAL_BLOCK(cs_vNodes)
{
BOOST_FOREACH(CNode* pnode, vNodes)
{
CRITICAL_BLOCK(pnode->cs_mapRequests)
{
for (map<uint256, CRequestTracker>::iterator mi = pnode->mapRequests.begin(); mi != pnode->mapRequests.end();)
{
CRequestTracker& tracker = (*mi).second;
if (tracker.fn == fn && tracker.param1 == param1)
pnode->mapRequests.erase(mi++);
else
mi++;
}
}
}
}
}
//
// Subscription methods for the broadcast and subscription system.
// Channel numbers are message numbers, i.e. MSG_TABLE and MSG_PRODUCT.
//
// The subscription system uses a meet-in-the-middle strategy.
// With 100,000 nodes, if senders broadcast to 1000 random nodes and receivers
// subscribe to 1000 random nodes, 99.995% (1 - 0.99^1000) of messages will get through.
//
bool AnySubscribed(unsigned int nChannel)
{
if (pnodeLocalHost->IsSubscribed(nChannel))
return true;
CRITICAL_BLOCK(cs_vNodes)
BOOST_FOREACH(CNode* pnode, vNodes)
if (pnode->IsSubscribed(nChannel))
return true;
return false;
}
bool CNode::IsSubscribed(unsigned int nChannel)
{
if (nChannel >= vfSubscribe.size())
return false;
return vfSubscribe[nChannel];
}
void CNode::Subscribe(unsigned int nChannel, unsigned int nHops)
{
if (nChannel >= vfSubscribe.size())
return;
if (!AnySubscribed(nChannel))
{
// Relay subscribe
CRITICAL_BLOCK(cs_vNodes)
BOOST_FOREACH(CNode* pnode, vNodes)
if (pnode != this)
pnode->PushMessage("subscribe", nChannel, nHops);
}
vfSubscribe[nChannel] = true;
}
void CNode::CancelSubscribe(unsigned int nChannel)
{
if (nChannel >= vfSubscribe.size())
return;
// Prevent from relaying cancel if wasn't subscribed
if (!vfSubscribe[nChannel])
return;
vfSubscribe[nChannel] = false;
if (!AnySubscribed(nChannel))
{
// Relay subscription cancel
CRITICAL_BLOCK(cs_vNodes)
BOOST_FOREACH(CNode* pnode, vNodes)
if (pnode != this)
pnode->PushMessage("sub-cancel", nChannel);
}
}
CNode* FindNode(const CNetAddr& ip) CNode* FindNode(const CNetAddr& ip)
@ -486,13 +387,6 @@ void CNode::CloseSocketDisconnect()
void CNode::Cleanup() void CNode::Cleanup()
{ {
// All of a nodes broadcasts and subscriptions are automatically torn down
// when it goes down, so a node has to stay up to keep its broadcast going.
// Cancel subscriptions
for (unsigned int nChannel = 0; nChannel < vfSubscribe.size(); nChannel++)
if (vfSubscribe[nChannel])
CancelSubscribe(nChannel);
} }

61
src/net.h

@ -29,7 +29,6 @@ extern int nBestHeight;
inline unsigned int ReceiveBufferSize() { return 1000*GetArg("-maxreceivebuffer", 10*1000); } inline unsigned int ReceiveBufferSize() { return 1000*GetArg("-maxreceivebuffer", 10*1000); }
inline unsigned int SendBufferSize() { return 1000*GetArg("-maxsendbuffer", 10*1000); } inline unsigned int SendBufferSize() { return 1000*GetArg("-maxsendbuffer", 10*1000); }
static const unsigned int PUBLISH_HOPS = 5;
bool RecvLine(SOCKET hSocket, std::string& strLine); bool RecvLine(SOCKET hSocket, std::string& strLine);
bool GetMyExternalIP(CNetAddr& ipRet); bool GetMyExternalIP(CNetAddr& ipRet);
@ -37,8 +36,6 @@ void AddressCurrentlyConnected(const CService& addr);
CNode* FindNode(const CNetAddr& ip); CNode* FindNode(const CNetAddr& ip);
CNode* FindNode(const CService& ip); CNode* FindNode(const CService& ip);
CNode* ConnectNode(CAddress addrConnect, int64 nTimeout=0); CNode* ConnectNode(CAddress addrConnect, int64 nTimeout=0);
void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1);
bool AnySubscribed(unsigned int nChannel);
void MapPort(bool fMapPort); void MapPort(bool fMapPort);
bool BindListenPort(std::string& strError=REF(std::string())); bool BindListenPort(std::string& strError=REF(std::string()));
void StartNode(void* parg); void StartNode(void* parg);
@ -160,9 +157,6 @@ public:
CCriticalSection cs_inventory; CCriticalSection cs_inventory;
std::multimap<int64, CInv> mapAskFor; std::multimap<int64, CInv> mapAskFor;
// publish and subscription
std::vector<char> vfSubscribe;
CNode(SOCKET hSocketIn, CAddress addrIn, bool fInboundIn=false) CNode(SOCKET hSocketIn, CAddress addrIn, bool fInboundIn=false)
{ {
nServices = 0; nServices = 0;
@ -192,7 +186,6 @@ public:
hashLastGetBlocksEnd = 0; hashLastGetBlocksEnd = 0;
nStartingHeight = -1; nStartingHeight = -1;
fGetAddr = false; fGetAddr = false;
vfSubscribe.assign(256, false);
nMisbehavior = 0; nMisbehavior = 0;
setInventoryKnown.max_size(SendBufferSize() / 1000); setInventoryKnown.max_size(SendBufferSize() / 1000);
@ -634,58 +627,4 @@ inline void RelayMessage<>(const CInv& inv, const CDataStream& ss)
} }
//
// Templates for the publish and subscription system.
// The object being published as T& obj needs to have:
// a set<unsigned int> setSources member
// specializations of AdvertInsert and AdvertErase
// Currently implemented for CTable and CProduct.
//
template<typename T>
void AdvertStartPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
{
// Add to sources
obj.setSources.insert(pfrom->addr.ip);
if (!AdvertInsert(obj))
return;
// Relay
CRITICAL_BLOCK(cs_vNodes)
BOOST_FOREACH(CNode* pnode, vNodes)
if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel)))
pnode->PushMessage("publish", nChannel, nHops, obj);
}
template<typename T>
void AdvertStopPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
{
uint256 hash = obj.GetHash();
CRITICAL_BLOCK(cs_vNodes)
BOOST_FOREACH(CNode* pnode, vNodes)
if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel)))
pnode->PushMessage("pub-cancel", nChannel, nHops, hash);
AdvertErase(obj);
}
template<typename T>
void AdvertRemoveSource(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
{
// Remove a source
obj.setSources.erase(pfrom->addr.ip);
// If no longer supported by any sources, cancel it
if (obj.setSources.empty())
AdvertStopPublish(pfrom, nChannel, nHops, obj);
}
#endif #endif

Loading…
Cancel
Save