Browse Source

Merge pull request #5964

9a1dcea Use CScheduler for net's DumpAddresses (Gavin Andresen)
ddd0acd Create a scheduler thread for lightweight tasks (Gavin Andresen)
68d370b CScheduler unit test (Gavin Andresen)
cfefe5b scheduler: fix with boost <= 1.50 (Cory Fields)
ca66717 build: make libboost_chrono mandatory (Cory Fields)
928b950 CScheduler class for lightweight task scheduling (Gavin Andresen)
e656560 [Qt] add defaultConfirmTarget constant to sendcoinsdialog (Philip Kaufmann)
0.13
Gavin Andresen 10 years ago
parent
commit
b4c219b622
No known key found for this signature in database
GPG Key ID: 7588242FBE38D3A8
  1. 9
      configure.ac
  2. 2
      src/Makefile.am
  3. 1
      src/Makefile.test.include
  4. 4
      src/bitcoind.cpp
  5. 9
      src/init.cpp
  6. 3
      src/init.h
  7. 5
      src/net.cpp
  8. 3
      src/net.h
  9. 4
      src/qt/bitcoin.cpp
  10. 98
      src/scheduler.cpp
  11. 70
      src/scheduler.h
  12. 110
      src/test/scheduler_tests.cpp
  13. 37
      src/util.h

9
configure.ac

@ -590,17 +590,15 @@ fi @@ -590,17 +590,15 @@ fi
if test x$use_boost = xyes; then
BOOST_LIBS="$BOOST_LDFLAGS $BOOST_SYSTEM_LIB $BOOST_FILESYSTEM_LIB $BOOST_PROGRAM_OPTIONS_LIB $BOOST_THREAD_LIB"
BOOST_LIBS="$BOOST_LDFLAGS $BOOST_SYSTEM_LIB $BOOST_FILESYSTEM_LIB $BOOST_PROGRAM_OPTIONS_LIB $BOOST_THREAD_LIB $BOOST_CHRONO_LIB"
dnl Boost >= 1.50 uses sleep_for rather than the now-deprecated sleep, however
dnl it was broken from 1.50 to 1.52 when backed by nanosleep. Use sleep_for if
dnl a working version is available, else fall back to sleep. sleep was removed
dnl after 1.56.
dnl If neither is available, abort.
dnl If sleep_for is used, boost_chrono becomes a requirement.
if test x$ax_cv_boost_chrono = xyes; then
TEMP_LIBS="$LIBS"
LIBS="$BOOST_LIBS $BOOST_CHRONO_LIB $LIBS"
LIBS="$BOOST_LIBS $LIBS"
TEMP_CPPFLAGS="$CPPFLAGS"
CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS"
AC_LINK_IFELSE([AC_LANG_PROGRAM([[
@ -613,12 +611,11 @@ AC_LINK_IFELSE([AC_LANG_PROGRAM([[ @@ -613,12 +611,11 @@ AC_LINK_IFELSE([AC_LANG_PROGRAM([[
choke me
#endif
]])],
[boost_sleep=yes; BOOST_LIBS="$BOOST_LIBS $BOOST_CHRONO_LIB";
[boost_sleep=yes;
AC_DEFINE(HAVE_WORKING_BOOST_SLEEP_FOR, 1, [Define this symbol if boost sleep_for works])],
[boost_sleep=no])
LIBS="$TEMP_LIBS"
CPPFLAGS="$TEMP_CPPFLAGS"
fi
if test x$boost_sleep != xyes; then
TEMP_LIBS="$LIBS"

2
src/Makefile.am

@ -116,6 +116,7 @@ BITCOIN_CORE_H = \ @@ -116,6 +116,7 @@ BITCOIN_CORE_H = \
rpcclient.h \
rpcprotocol.h \
rpcserver.h \
scheduler.h \
script/interpreter.h \
script/script_error.h \
script/script.h \
@ -259,6 +260,7 @@ libbitcoin_common_a_SOURCES = \ @@ -259,6 +260,7 @@ libbitcoin_common_a_SOURCES = \
netbase.cpp \
protocol.cpp \
pubkey.cpp \
scheduler.cpp \
script/interpreter.cpp \
script/script.cpp \
script/sign.cpp \

1
src/Makefile.test.include

@ -61,6 +61,7 @@ BITCOIN_TESTS =\ @@ -61,6 +61,7 @@ BITCOIN_TESTS =\
test/pow_tests.cpp \
test/rpc_tests.cpp \
test/sanity_tests.cpp \
test/scheduler_tests.cpp \
test/script_P2SH_tests.cpp \
test/script_tests.cpp \
test/scriptnum_tests.cpp \

4
src/bitcoind.cpp

@ -8,6 +8,7 @@ @@ -8,6 +8,7 @@
#include "init.h"
#include "main.h"
#include "noui.h"
#include "scheduler.h"
#include "util.h"
#include <boost/algorithm/string/predicate.hpp>
@ -55,6 +56,7 @@ void WaitForShutdown(boost::thread_group* threadGroup) @@ -55,6 +56,7 @@ void WaitForShutdown(boost::thread_group* threadGroup)
bool AppInit(int argc, char* argv[])
{
boost::thread_group threadGroup;
CScheduler scheduler;
bool fRet = false;
@ -142,7 +144,7 @@ bool AppInit(int argc, char* argv[]) @@ -142,7 +144,7 @@ bool AppInit(int argc, char* argv[])
#endif
SoftSetBoolArg("-server", true);
fRet = AppInit2(threadGroup);
fRet = AppInit2(threadGroup, scheduler);
}
catch (const std::exception& e) {
PrintExceptionContinue(&e, "AppInit()");

9
src/init.cpp

@ -19,6 +19,7 @@ @@ -19,6 +19,7 @@
#include "net.h"
#include "rpcserver.h"
#include "script/standard.h"
#include "scheduler.h"
#include "txdb.h"
#include "ui_interface.h"
#include "util.h"
@ -564,7 +565,7 @@ bool InitSanityCheck(void) @@ -564,7 +565,7 @@ bool InitSanityCheck(void)
/** Initialize bitcoin.
* @pre Parameters should be parsed and config file should be read.
*/
bool AppInit2(boost::thread_group& threadGroup)
bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
{
// ********************************************************* Step 1: setup
#ifdef _MSC_VER
@ -890,6 +891,10 @@ bool AppInit2(boost::thread_group& threadGroup) @@ -890,6 +891,10 @@ bool AppInit2(boost::thread_group& threadGroup)
threadGroup.create_thread(&ThreadScriptCheck);
}
// Start the lightweight task scheduler thread
CScheduler::Function serviceLoop = boost::bind(&CScheduler::serviceQueue, &scheduler);
threadGroup.create_thread(boost::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));
/* Start the RPC server already. It will be started in "warmup" mode
* and not really process calls already (but it will signify connections
* that the server is there and will be ready later). Warmup mode will
@ -1373,7 +1378,7 @@ bool AppInit2(boost::thread_group& threadGroup) @@ -1373,7 +1378,7 @@ bool AppInit2(boost::thread_group& threadGroup)
LogPrintf("mapAddressBook.size() = %u\n", pwalletMain ? pwalletMain->mapAddressBook.size() : 0);
#endif
StartNode(threadGroup);
StartNode(threadGroup, scheduler);
#ifdef ENABLE_WALLET
// Generate coins in the background

3
src/init.h

@ -8,6 +8,7 @@ @@ -8,6 +8,7 @@
#include <string>
class CScheduler;
class CWallet;
namespace boost
@ -20,7 +21,7 @@ extern CWallet* pwalletMain; @@ -20,7 +21,7 @@ extern CWallet* pwalletMain;
void StartShutdown();
bool ShutdownRequested();
void Shutdown();
bool AppInit2(boost::thread_group& threadGroup);
bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler);
/** The help message mode determines what help message to show */
enum HelpMessageMode {

5
src/net.cpp

@ -13,6 +13,7 @@ @@ -13,6 +13,7 @@
#include "chainparams.h"
#include "clientversion.h"
#include "primitives/transaction.h"
#include "scheduler.h"
#include "ui_interface.h"
#include "crypto/common.h"
@ -1590,7 +1591,7 @@ void static Discover(boost::thread_group& threadGroup) @@ -1590,7 +1591,7 @@ void static Discover(boost::thread_group& threadGroup)
#endif
}
void StartNode(boost::thread_group& threadGroup)
void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler)
{
uiInterface.InitMessage(_("Loading addresses..."));
// Load addresses for peers.dat
@ -1640,7 +1641,7 @@ void StartNode(boost::thread_group& threadGroup) @@ -1640,7 +1641,7 @@ void StartNode(boost::thread_group& threadGroup)
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "msghand", &ThreadMessageHandler));
// Dump network addresses
threadGroup.create_thread(boost::bind(&LoopForever<void (*)()>, "dumpaddr", &DumpAddresses, DUMP_ADDRESSES_INTERVAL * 1000));
scheduler.scheduleEvery(&DumpAddresses, DUMP_ADDRESSES_INTERVAL);
}
bool StopNode()

3
src/net.h

@ -32,6 +32,7 @@ @@ -32,6 +32,7 @@
class CAddrMan;
class CBlockIndex;
class CScheduler;
class CNode;
namespace boost {
@ -72,7 +73,7 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu @@ -72,7 +73,7 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu
void MapPort(bool fUseUPnP);
unsigned short GetListenPort();
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
void StartNode(boost::thread_group& threadGroup);
void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler);
bool StopNode();
void SocketSendData(CNode *pnode);

4
src/qt/bitcoin.cpp

@ -26,6 +26,7 @@ @@ -26,6 +26,7 @@
#include "init.h"
#include "main.h"
#include "rpcserver.h"
#include "scheduler.h"
#include "ui_interface.h"
#include "util.h"
@ -178,6 +179,7 @@ signals: @@ -178,6 +179,7 @@ signals:
private:
boost::thread_group threadGroup;
CScheduler scheduler;
/// Pass fatal exception message to UI thread
void handleRunawayException(const std::exception *e);
@ -258,7 +260,7 @@ void BitcoinCore::initialize() @@ -258,7 +260,7 @@ void BitcoinCore::initialize()
try
{
qDebug() << __func__ << ": Running AppInit2 in thread";
int rv = AppInit2(threadGroup);
int rv = AppInit2(threadGroup, scheduler);
if(rv)
{
/* Start a dummy RPC thread if no RPC thread is active yet

98
src/scheduler.cpp

@ -0,0 +1,98 @@ @@ -0,0 +1,98 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "scheduler.h"
#include <assert.h>
#include <boost/bind.hpp>
#include <utility>
CScheduler::CScheduler() : nThreadsServicingQueue(0)
{
}
CScheduler::~CScheduler()
{
assert(nThreadsServicingQueue == 0);
}
#if BOOST_VERSION < 105000
static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t)
{
return boost::posix_time::from_time_t(boost::chrono::system_clock::to_time_t(t));
}
#endif
void CScheduler::serviceQueue()
{
boost::unique_lock<boost::mutex> lock(newTaskMutex);
++nThreadsServicingQueue;
// newTaskMutex is locked throughout this loop EXCEPT
// when the thread is waiting or when the user's function
// is called.
while (1) {
try {
while (taskQueue.empty()) {
// Wait until there is something to do.
newTaskScheduled.wait(lock);
}
// Wait until either there is a new task, or until
// the time of the first item on the queue:
// wait_until needs boost 1.50 or later; older versions have timed_wait:
#if BOOST_VERSION < 105000
while (!taskQueue.empty() && newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
// Keep waiting until timeout
}
#else
while (!taskQueue.empty() && newTaskScheduled.wait_until(lock, taskQueue.begin()->first) != boost::cv_status::timeout) {
// Keep waiting until timeout
}
#endif
// If there are multiple threads, the queue can empty while we're waiting (another
// thread may service the task we were waiting on).
if (taskQueue.empty())
continue;
Function f = taskQueue.begin()->second;
taskQueue.erase(taskQueue.begin());
// Unlock before calling f, so it can reschedule itself or another task
// without deadlocking:
lock.unlock();
f();
lock.lock();
} catch (...) {
--nThreadsServicingQueue;
throw;
}
}
}
void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
{
{
boost::unique_lock<boost::mutex> lock(newTaskMutex);
taskQueue.insert(std::make_pair(t, f));
}
newTaskScheduled.notify_one();
}
void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaSeconds)
{
schedule(f, boost::chrono::system_clock::now() + boost::chrono::seconds(deltaSeconds));
}
static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaSeconds)
{
f();
s->scheduleFromNow(boost::bind(&Repeat, s, f, deltaSeconds), deltaSeconds);
}
void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaSeconds)
{
scheduleFromNow(boost::bind(&Repeat, this, f, deltaSeconds), deltaSeconds);
}

70
src/scheduler.h

@ -0,0 +1,70 @@ @@ -0,0 +1,70 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_SCHEDULER_H
#define BITCOIN_SCHEDULER_H
//
// NOTE:
// boost::thread / boost::function / boost::chrono should be ported to
// std::thread / std::function / std::chrono when we support C++11.
//
#include <boost/function.hpp>
#include <boost/chrono/chrono.hpp>
#include <boost/thread.hpp>
#include <map>
//
// Simple class for background tasks that should be run
// periodically or once "after a while"
//
// Usage:
//
// CScheduler* s = new CScheduler();
// s->scheduleFromNow(doSomething, 11); // Assuming a: void doSomething() { }
// s->scheduleFromNow(boost::bind(Class::func, this, argument), 3);
// boost::thread* t = new boost::thread(boost::bind(CScheduler::serviceQueue, s));
//
// ... then at program shutdown, clean up the thread running serviceQueue:
// t->interrupt();
// t->join();
// delete t;
// delete s; // Must be done after thread is interrupted/joined.
//
class CScheduler
{
public:
CScheduler();
~CScheduler();
typedef boost::function<void(void)> Function;
// Call func at/after time t
void schedule(Function f, boost::chrono::system_clock::time_point t);
// Convenience method: call f once deltaSeconds from now
void scheduleFromNow(Function f, int64_t deltaSeconds);
// Another convenience method: call f approximately
// every deltaSeconds forever, starting deltaSeconds from now.
// To be more precise: every time f is finished, it
// is rescheduled to run deltaSeconds later. If you
// need more accurate scheduling, don't use this method.
void scheduleEvery(Function f, int64_t deltaSeconds);
// To keep things as simple as possible, there is no unschedule.
// Services the queue 'forever'. Should be run in a thread,
// and interrupted using boost::interrupt_thread
void serviceQueue();
private:
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
boost::condition_variable newTaskScheduled;
boost::mutex newTaskMutex;
int nThreadsServicingQueue;
};
#endif

110
src/test/scheduler_tests.cpp

@ -0,0 +1,110 @@ @@ -0,0 +1,110 @@
// Copyright (c) 2012-2013 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "random.h"
#include "scheduler.h"
#include "test/test_bitcoin.h"
#include <boost/bind.hpp>
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_int_distribution.hpp>
#include <boost/thread.hpp>
#include <boost/test/unit_test.hpp>
BOOST_AUTO_TEST_SUITE(scheduler_tests)
static void microTask(CScheduler& s, boost::mutex& mutex, int& counter, int delta, boost::chrono::system_clock::time_point rescheduleTime)
{
{
boost::unique_lock<boost::mutex> lock(mutex);
counter += delta;
}
boost::chrono::system_clock::time_point noTime = boost::chrono::system_clock::time_point::min();
if (rescheduleTime != noTime) {
CScheduler::Function f = boost::bind(&microTask, boost::ref(s), boost::ref(mutex), boost::ref(counter), -delta + 1, noTime);
s.schedule(f, rescheduleTime);
}
}
static void MicroSleep(uint64_t n)
{
#if defined(HAVE_WORKING_BOOST_SLEEP_FOR)
boost::this_thread::sleep_for(boost::chrono::microseconds(n));
#elif defined(HAVE_WORKING_BOOST_SLEEP)
boost::this_thread::sleep(boost::posix_time::microseconds(n));
#else
//should never get here
#error missing boost sleep implementation
#endif
}
BOOST_AUTO_TEST_CASE(manythreads)
{
// Stress test: hundreds of microsecond-scheduled tasks,
// serviced by 10 threads.
//
// So... ten shared counters, which if all the tasks execute
// properly will sum to the number of tasks done.
// Each task adds or subtracts from one of the counters a
// random amount, and then schedules another task 0-1000
// microseconds in the future to subtract or add from
// the counter -random_amount+1, so in the end the shared
// counters should sum to the number of initial tasks performed.
CScheduler microTasks;
boost::thread_group microThreads;
for (int i = 0; i < 5; i++)
microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, &microTasks));
boost::mutex counterMutex[10];
int counter[10] = { 0 };
boost::random::mt19937 rng(insecure_rand());
boost::random::uniform_int_distribution<> zeroToNine(0, 9);
boost::random::uniform_int_distribution<> randomMsec(-11, 1000);
boost::random::uniform_int_distribution<> randomDelta(-1000, 1000);
boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now();
boost::chrono::system_clock::time_point now = start;
for (int i = 0; i < 100; i++) {
boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng));
boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng));
int whichCounter = zeroToNine(rng);
CScheduler::Function f = boost::bind(&microTask, boost::ref(microTasks),
boost::ref(counterMutex[whichCounter]), boost::ref(counter[whichCounter]),
randomDelta(rng), tReschedule);
microTasks.schedule(f, t);
}
MicroSleep(600);
now = boost::chrono::system_clock::now();
// More threads and more tasks:
for (int i = 0; i < 5; i++)
microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, &microTasks));
for (int i = 0; i < 100; i++) {
boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng));
boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng));
int whichCounter = zeroToNine(rng);
CScheduler::Function f = boost::bind(&microTask, boost::ref(microTasks),
boost::ref(counterMutex[whichCounter]), boost::ref(counter[whichCounter]),
randomDelta(rng), tReschedule);
microTasks.schedule(f, t);
}
// All 2,000 tasks should be finished within 2 milliseconds. Sleep a bit longer.
MicroSleep(2100);
microThreads.interrupt_all();
microThreads.join_all();
int counterSum = 0;
for (int i = 0; i < 10; i++) {
BOOST_CHECK(counter[i] != 0);
counterSum += counter[i];
}
BOOST_CHECK_EQUAL(counterSum, 200);
}
BOOST_AUTO_TEST_SUITE_END()

37
src/util.h

@ -202,43 +202,6 @@ std::string HelpMessageOpt(const std::string& option, const std::string& message @@ -202,43 +202,6 @@ std::string HelpMessageOpt(const std::string& option, const std::string& message
void SetThreadPriority(int nPriority);
void RenameThread(const char* name);
/**
* Standard wrapper for do-something-forever thread functions.
* "Forever" really means until the thread is interrupted.
* Use it like:
* new boost::thread(boost::bind(&LoopForever<void (*)()>, "dumpaddr", &DumpAddresses, 900000));
* or maybe:
* boost::function<void()> f = boost::bind(&FunctionWithArg, argument);
* threadGroup.create_thread(boost::bind(&LoopForever<boost::function<void()> >, "nothing", f, milliseconds));
*/
template <typename Callable> void LoopForever(const char* name, Callable func, int64_t msecs)
{
std::string s = strprintf("bitcoin-%s", name);
RenameThread(s.c_str());
LogPrintf("%s thread start\n", name);
try
{
while (1)
{
MilliSleep(msecs);
func();
}
}
catch (const boost::thread_interrupted&)
{
LogPrintf("%s thread stop\n", name);
throw;
}
catch (const std::exception& e) {
PrintExceptionContinue(&e, name);
throw;
}
catch (...) {
PrintExceptionContinue(NULL, name);
throw;
}
}
/**
* .. and a wrapper that just calls func once
*/

Loading…
Cancel
Save