diff --git a/src/net.cpp b/src/net.cpp index 8603514f9..67427a3e8 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -35,7 +35,7 @@ void ThreadOpenAddedConnections2(void* parg); void ThreadMapPort2(void* parg); #endif void ThreadDNSAddressSeed2(void* parg); -bool OpenNetworkConnection(const CAddress& addrConnect, const char *strDest = NULL, bool fOneShot = false); +bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound = NULL, const char *strDest = NULL, bool fOneShot = false); @@ -66,10 +66,7 @@ CCriticalSection cs_vOneShots; set setservAddNodeAddresses; CCriticalSection cs_setservAddNodeAddresses; -static CWaitableCriticalSection csOutbound; -static int nOutbound = 0; -static CConditionVariable condOutbound; - +static CSemaphore *semOutbound = NULL; void AddOneShot(string strDest) { @@ -463,10 +460,6 @@ CNode* ConnectNode(CAddress addrConnect, const char *pszDest, int64 nTimeout) LOCK(cs_vNodes); vNodes.push_back(pnode); } - { - WAITABLE_LOCK(csOutbound); - nOutbound++; - } pnode->nTimeConnected = GetTime(); return pnode; @@ -612,14 +605,8 @@ void ThreadSocketHandler2(void* parg) // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); - if (!pnode->fInbound) - { - WAITABLE_LOCK(csOutbound); - nOutbound--; - - // Connection slot(s) were removed, notify connection creator(s) - NOTIFY(condOutbound); - } + // release outbound grant (if any) + pnode->grantOutbound.Release(); // close socket and cleanup pnode->CloseSocketDisconnect(); @@ -1295,8 +1282,11 @@ void static ProcessOneShot() vOneShots.pop_front(); } CAddress addr; - if (!OpenNetworkConnection(addr, strDest.c_str(), true)) - AddOneShot(strDest); + CSemaphoreGrant grant(*semOutbound, true); + if (grant) { + if (!OpenNetworkConnection(addr, &grant, strDest.c_str(), true)) + AddOneShot(strDest); + } } void ThreadOpenConnections2(void* parg) @@ -1312,7 +1302,7 @@ void ThreadOpenConnections2(void* parg) BOOST_FOREACH(string strAddr, mapMultiArgs["-connect"]) { CAddress addr; - OpenNetworkConnection(addr, strAddr.c_str()); + OpenNetworkConnection(addr, NULL, strAddr.c_str()); for (int i = 0; i < 10 && i < nLoop; i++) { Sleep(500); @@ -1335,13 +1325,9 @@ void ThreadOpenConnections2(void* parg) if (fShutdown) return; - // Limit outbound connections - int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125)); + vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - { - WAITABLE_LOCK(csOutbound); - WAIT(condOutbound, fShutdown || nOutbound < nMaxOutbound); - } + CSemaphoreGrant grant(*semOutbound); vnThreadsRunning[THREAD_OPENCONNECTIONS]++; if (fShutdown) return; @@ -1374,11 +1360,15 @@ void ThreadOpenConnections2(void* parg) // Only connect to one address per a.b.?.? range. // Do this here so we don't have to critsect vNodes inside mapAddresses critsect. + int nOutbound = 0; set > setConnected; { LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) + BOOST_FOREACH(CNode* pnode, vNodes) { setConnected.insert(pnode->addr.GetGroup()); + if (!pnode->fInbound) + nOutbound++; + } } int64 nANow = GetAdjustedTime(); @@ -1408,7 +1398,7 @@ void ThreadOpenConnections2(void* parg) } if (addrConnect.IsValid()) - OpenNetworkConnection(addrConnect); + OpenNetworkConnection(addrConnect, &grant); } } @@ -1442,7 +1432,8 @@ void ThreadOpenAddedConnections2(void* parg) while(!fShutdown) { BOOST_FOREACH(string& strAddNode, mapMultiArgs["-addnode"]) { CAddress addr; - OpenNetworkConnection(addr, strAddNode.c_str()); + CSemaphoreGrant grant(*semOutbound); + OpenNetworkConnection(addr, &grant, strAddNode.c_str()); Sleep(500); } vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; @@ -1485,7 +1476,8 @@ void ThreadOpenAddedConnections2(void* parg) } BOOST_FOREACH(vector& vserv, vservConnectAddresses) { - OpenNetworkConnection(CAddress(*(vserv.begin()))); + CSemaphoreGrant grant(*semOutbound); + OpenNetworkConnection(CAddress(*(vserv.begin())), &grant); Sleep(500); if (fShutdown) return; @@ -1500,7 +1492,8 @@ void ThreadOpenAddedConnections2(void* parg) } } -bool OpenNetworkConnection(const CAddress& addrConnect, const char *strDest, bool fOneShot) +// if succesful, this moves the passed grant to the constructed node +bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound, const char *strDest, bool fOneShot) { // // Initiate outbound network connection @@ -1522,6 +1515,8 @@ bool OpenNetworkConnection(const CAddress& addrConnect, const char *strDest, boo return false; if (!pnode) return false; + if (grantOutbound) + grantOutbound->MoveTo(pnode->grantOutbound); pnode->fNetworkNode = true; if (fOneShot) pnode->fOneShot = true; @@ -1770,6 +1765,12 @@ void StartNode(void* parg) #endif #endif + if (semOutbound == NULL) { + // initialize semaphore + int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125)); + semOutbound = new CSemaphore(nMaxOutbound); + } + if (pnodeLocalHost == NULL) pnodeLocalHost = new CNode(INVALID_SOCKET, CAddress(CService("127.0.0.1", 0), nLocalServices)); @@ -1823,7 +1824,8 @@ bool StopNode() fShutdown = true; nTransactionsUpdated++; int64 nStart = GetTime(); - NOTIFY_ALL(condOutbound); + for (int i=0; ipost(); do { int nThreadsRunning = 0; diff --git a/src/net.h b/src/net.h index a00dd1b8c..4e4ea31ea 100644 --- a/src/net.h +++ b/src/net.h @@ -147,6 +147,7 @@ public: bool fNetworkNode; bool fSuccessfullyConnected; bool fDisconnect; + CSemaphoreGrant grantOutbound; protected: int nRefCount; diff --git a/src/util.h b/src/util.h index ebd574f89..61ff55353 100644 --- a/src/util.h +++ b/src/util.h @@ -23,7 +23,7 @@ typedef int pid_t; /* define for windows compatiblity */ #include #include #include -#include +#include #include #include #include @@ -275,24 +275,10 @@ public: }; typedef CMutexLock CCriticalBlock; -typedef CMutexLock CWaitableCriticalBlock; -typedef boost::interprocess::interprocess_condition CConditionVariable; - -/** Wait for a given condition inside a WAITABLE_CRITICAL_BLOCK */ -#define WAIT(name,condition) \ - do { while(!(condition)) { (name).wait(waitablecriticalblock.GetLock()); } } while(0) - -/** Notify waiting threads that a condition may hold now */ -#define NOTIFY(name) \ - do { (name).notify_one(); } while(0) - -#define NOTIFY_ALL(name) \ - do { (name).notify_all(); } while(0) #define LOCK(cs) CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__) #define LOCK2(cs1,cs2) CCriticalBlock criticalblock1(cs1, #cs1, __FILE__, __LINE__),criticalblock2(cs2, #cs2, __FILE__, __LINE__) #define TRY_LOCK(cs,name) CCriticalBlock name(cs, #cs, __FILE__, __LINE__, true) -#define WAITABLE_LOCK(cs) CWaitableCriticalBlock waitablecriticalblock(cs, #cs, __FILE__, __LINE__) #define ENTER_CRITICAL_SECTION(cs) \ { \ @@ -306,6 +292,61 @@ typedef boost::interprocess::interprocess_condition CConditionVariable; LeaveCritical(); \ } +typedef boost::interprocess::interprocess_semaphore CSemaphore; + +/** RAII-style semaphore lock */ +class CSemaphoreGrant +{ +private: + CSemaphore *sem; + bool fHaveGrant; + +public: + void Acquire() { + if (fHaveGrant) + return; + sem->wait(); + fHaveGrant = true; + } + + void Release() { + if (!fHaveGrant) + return; + sem->post(); + fHaveGrant = false; + } + + bool TryAcquire() { + if (!fHaveGrant && sem->try_wait()) + fHaveGrant = true; + return fHaveGrant; + } + + void MoveTo(CSemaphoreGrant &grant) { + grant.Release(); + grant.sem = sem; + grant.fHaveGrant = fHaveGrant; + sem = NULL; + fHaveGrant = false; + } + + CSemaphoreGrant() : sem(NULL), fHaveGrant(false) {} + + CSemaphoreGrant(CSemaphore &sema, bool fTry = false) : sem(&sema), fHaveGrant(false) { + if (fTry) + TryAcquire(); + else + Acquire(); + } + + ~CSemaphoreGrant() { + Release(); + } + + operator bool() { + return fHaveGrant; + } +}; inline std::string i64tostr(int64 n) {