Browse Source

CScheduler class for lightweight task scheduling

Simple class to manage a task queue that is serviced by one or
more threads.
0.13
Gavin Andresen 10 years ago
parent
commit
928b950e3b
No known key found for this signature in database
GPG Key ID: 7588242FBE38D3A8
  1. 2
      src/Makefile.am
  2. 102
      src/scheduler.cpp
  3. 70
      src/scheduler.h

2
src/Makefile.am

@ -115,6 +115,7 @@ BITCOIN_CORE_H = \
rpcclient.h \ rpcclient.h \
rpcprotocol.h \ rpcprotocol.h \
rpcserver.h \ rpcserver.h \
scheduler.h \
script/interpreter.h \ script/interpreter.h \
script/script_error.h \ script/script_error.h \
script/script.h \ script/script.h \
@ -257,6 +258,7 @@ libbitcoin_common_a_SOURCES = \
netbase.cpp \ netbase.cpp \
protocol.cpp \ protocol.cpp \
pubkey.cpp \ pubkey.cpp \
scheduler.cpp \
script/interpreter.cpp \ script/interpreter.cpp \
script/script.cpp \ script/script.cpp \
script/sign.cpp \ script/sign.cpp \

102
src/scheduler.cpp

@ -0,0 +1,102 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "scheduler.h"
#include <assert.h>
#include <boost/bind.hpp>
#include <utility>
CScheduler::CScheduler() : nThreadsServicingQueue(0)
{
}
CScheduler::~CScheduler()
{
assert(nThreadsServicingQueue == 0);
}
#if BOOST_VERSION < 105000
static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t)
{
boost::chrono::system_clock::duration d = t.time_since_epoch();
boost::chrono::microseconds usecs = boost::chrono::duration_cast<boost::chrono::microseconds>(d);
boost::system_time result = boost::posix_time::from_time_t(0) +
boost::posix_time::microseconds(usecs.count());
return result;
}
#endif
void CScheduler::serviceQueue()
{
boost::unique_lock<boost::mutex> lock(newTaskMutex);
++nThreadsServicingQueue;
// newTaskMutex is locked throughout this loop EXCEPT
// when the thread is waiting or when the user's function
// is called.
while (1) {
try {
while (taskQueue.empty()) {
// Wait until there is something to do.
newTaskScheduled.wait(lock);
}
// Wait until either there is a new task, or until
// the time of the first item on the queue:
// wait_until needs boost 1.50 or later; older versions have timed_wait:
#if BOOST_VERSION < 105000
while (!taskQueue.empty() && newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
// Keep waiting until timeout
}
#else
while (!taskQueue.empty() && newTaskScheduled.wait_until(lock, taskQueue.begin()->first) != boost::cv_status::timeout) {
// Keep waiting until timeout
}
#endif
// If there are multiple threads, the queue can empty while we're waiting (another
// thread may service the task we were waiting on).
if (taskQueue.empty())
continue;
Function f = taskQueue.begin()->second;
taskQueue.erase(taskQueue.begin());
// Unlock before calling f, so it can reschedule itself or another task
// without deadlocking:
lock.unlock();
f();
lock.lock();
} catch (...) {
--nThreadsServicingQueue;
throw;
}
}
}
void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
{
{
boost::unique_lock<boost::mutex> lock(newTaskMutex);
taskQueue.insert(std::make_pair(t, f));
}
newTaskScheduled.notify_one();
}
void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaSeconds)
{
schedule(f, boost::chrono::system_clock::now() + boost::chrono::seconds(deltaSeconds));
}
static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaSeconds)
{
f();
s->scheduleFromNow(boost::bind(&Repeat, s, f, deltaSeconds), deltaSeconds);
}
void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaSeconds)
{
scheduleFromNow(boost::bind(&Repeat, this, f, deltaSeconds), deltaSeconds);
}

70
src/scheduler.h

@ -0,0 +1,70 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_SCHEDULER_H
#define BITCOIN_SCHEDULER_H
//
// NOTE:
// boost::thread / boost::function / boost::chrono should be ported to
// std::thread / std::function / std::chrono when we support C++11.
//
#include <boost/function.hpp>
#include <boost/chrono/chrono.hpp>
#include <boost/thread.hpp>
#include <map>
//
// Simple class for background tasks that should be run
// periodically or once "after a while"
//
// Usage:
//
// CScheduler* s = new CScheduler();
// s->scheduleFromNow(doSomething, 11); // Assuming a: void doSomething() { }
// s->scheduleFromNow(boost::bind(Class::func, this, argument), 3);
// boost::thread* t = new boost::thread(boost::bind(CScheduler::serviceQueue, s));
//
// ... then at program shutdown, clean up the thread running serviceQueue:
// t->interrupt();
// t->join();
// delete t;
// delete s; // Must be done after thread is interrupted/joined.
//
class CScheduler
{
public:
CScheduler();
~CScheduler();
typedef boost::function<void(void)> Function;
// Call func at/after time t
void schedule(Function f, boost::chrono::system_clock::time_point t);
// Convenience method: call f once deltaSeconds from now
void scheduleFromNow(Function f, int64_t deltaSeconds);
// Another convenience method: call f approximately
// every deltaSeconds forever, starting deltaSeconds from now.
// To be more precise: every time f is finished, it
// is rescheduled to run deltaSeconds later. If you
// need more accurate scheduling, don't use this method.
void scheduleEvery(Function f, int64_t deltaSeconds);
// To keep things as simple as possible, there is no unschedule.
// Services the queue 'forever'. Should be run in a thread,
// and interrupted using boost::interrupt_thread
void serviceQueue();
private:
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
boost::condition_variable newTaskScheduled;
boost::mutex newTaskMutex;
int nThreadsServicingQueue;
};
#endif
Loading…
Cancel
Save