Browse Source

Support more than one CScheduler thread for serial clients

This will be used by CValidationInterface soon.

This requires a bit of work as we need to ensure that most of our
callbacks happen in-order (to avoid synchronization issues in
wallet) - we keep our own internal queue and push things onto it,
scheduling a queue-draining function immediately upon new
callbacks.
0.15
Matt Corallo 8 years ago
parent
commit
08096bbbc6
  1. 52
      src/scheduler.cpp
  2. 24
      src/scheduler.h
  3. 22
      src/validationinterface.cpp
  4. 2
      src/validationinterface.h

52
src/scheduler.cpp

@ -139,3 +139,55 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
} }
return result; return result;
} }
void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
{
LOCK(m_cs_callbacks_pending);
// Try to avoid scheduling too many copies here, but if we
// accidentally have two ProcessQueue's scheduled at once its
// not a big deal.
if (m_are_callbacks_running) return;
if (m_callbacks_pending.empty()) return;
}
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
}
void SingleThreadedSchedulerClient::ProcessQueue() {
std::function<void (void)> callback;
{
LOCK(m_cs_callbacks_pending);
if (m_are_callbacks_running) return;
if (m_callbacks_pending.empty()) return;
m_are_callbacks_running = true;
callback = std::move(m_callbacks_pending.front());
m_callbacks_pending.pop_front();
}
// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
// to ensure both happen safely even if callback() throws.
struct RAIICallbacksRunning {
SingleThreadedSchedulerClient* instance;
RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
~RAIICallbacksRunning() {
{
LOCK(instance->m_cs_callbacks_pending);
instance->m_are_callbacks_running = false;
}
instance->MaybeScheduleProcessQueue();
}
} raiicallbacksrunning(this);
callback();
}
void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
assert(m_pscheduler);
{
LOCK(m_cs_callbacks_pending);
m_callbacks_pending.emplace_back(std::move(func));
}
MaybeScheduleProcessQueue();
}

24
src/scheduler.h

@ -14,6 +14,8 @@
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <map> #include <map>
#include "sync.h"
// //
// Simple class for background tasks that should be run // Simple class for background tasks that should be run
// periodically or once "after a while" // periodically or once "after a while"
@ -79,4 +81,26 @@ private:
bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
}; };
/**
* Class used by CScheduler clients which may schedule multiple jobs
* which are required to be run serially. Does not require such jobs
* to be executed on the same thread, but no two jobs will be executed
* at the same time.
*/
class SingleThreadedSchedulerClient {
private:
CScheduler *m_pscheduler;
CCriticalSection m_cs_callbacks_pending;
std::list<std::function<void (void)>> m_callbacks_pending;
bool m_are_callbacks_running = false;
void MaybeScheduleProcessQueue();
void ProcessQueue();
public:
SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
void AddToProcessQueue(std::function<void (void)> func);
};
#endif #endif

22
src/validationinterface.cpp

@ -6,6 +6,11 @@
#include "validationinterface.h" #include "validationinterface.h"
#include "init.h" #include "init.h"
#include "scheduler.h" #include "scheduler.h"
#include "sync.h"
#include "util.h"
#include <list>
#include <atomic>
#include <boost/signals2/signal.hpp> #include <boost/signals2/signal.hpp>
@ -20,22 +25,23 @@ struct MainSignalsInstance {
boost::signals2::signal<void (const CBlock&, const CValidationState&)> BlockChecked; boost::signals2::signal<void (const CBlock&, const CValidationState&)> BlockChecked;
boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock; boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock;
CScheduler *m_scheduler = NULL; // We are not allowed to assume the scheduler only runs in one thread,
// but must ensure all callbacks happen in-order, so we end up creating
// our own queue here :(
SingleThreadedSchedulerClient m_schedulerClient;
MainSignalsInstance(CScheduler *pscheduler) : m_schedulerClient(pscheduler) {}
}; };
static CMainSignals g_signals; static CMainSignals g_signals;
CMainSignals::CMainSignals() {
m_internals.reset(new MainSignalsInstance());
}
void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler) { void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler) {
assert(!m_internals->m_scheduler); assert(!m_internals);
m_internals->m_scheduler = &scheduler; m_internals.reset(new MainSignalsInstance(&scheduler));
} }
void CMainSignals::UnregisterBackgroundSignalScheduler() { void CMainSignals::UnregisterBackgroundSignalScheduler() {
m_internals->m_scheduler = NULL; m_internals.reset(nullptr);
} }
CMainSignals& GetMainSignals() CMainSignals& GetMainSignals()

2
src/validationinterface.h

@ -75,8 +75,6 @@ private:
friend void ::UnregisterAllValidationInterfaces(); friend void ::UnregisterAllValidationInterfaces();
public: public:
CMainSignals();
/** Register a CScheduler to give callbacks which should run in the background (may only be called once) */ /** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
void RegisterBackgroundSignalScheduler(CScheduler& scheduler); void RegisterBackgroundSignalScheduler(CScheduler& scheduler);
/** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */ /** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */

Loading…
Cancel
Save