Browse Source

More robust CScheduler unit test

On a busy or slow system, the CScheduler unit test could fail because it
assumed all threads would be done after a couple of milliseconds.

Replace the hard-coded sleep with CScheduler stop() method that
will cleanly exit the servicing threads when all tasks are completely
finished.
0.13
Gavin Andresen 10 years ago
parent
commit
f50105486f
No known key found for this signature in database
GPG Key ID: 7588242FBE38D3A8
  1. 46
      src/scheduler.cpp
  2. 15
      src/scheduler.h
  3. 27
      src/test/scheduler_tests.cpp

46
src/scheduler.cpp

@ -8,7 +8,7 @@
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <utility> #include <utility>
CScheduler::CScheduler() : nThreadsServicingQueue(0) CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false)
{ {
} }
@ -29,32 +29,37 @@ void CScheduler::serviceQueue()
{ {
boost::unique_lock<boost::mutex> lock(newTaskMutex); boost::unique_lock<boost::mutex> lock(newTaskMutex);
++nThreadsServicingQueue; ++nThreadsServicingQueue;
stopRequested = false;
stopWhenEmpty = false;
// newTaskMutex is locked throughout this loop EXCEPT // newTaskMutex is locked throughout this loop EXCEPT
// when the thread is waiting or when the user's function // when the thread is waiting or when the user's function
// is called. // is called.
while (1) { while (!shouldStop()) {
try { try {
while (taskQueue.empty()) { while (!shouldStop() && taskQueue.empty()) {
// Wait until there is something to do. // Wait until there is something to do.
newTaskScheduled.wait(lock); newTaskScheduled.wait(lock);
} }
// Wait until either there is a new task, or until
// the time of the first item on the queue: // Wait until either there is a new task, or until
// the time of the first item on the queue:
// wait_until needs boost 1.50 or later; older versions have timed_wait: // wait_until needs boost 1.50 or later; older versions have timed_wait:
#if BOOST_VERSION < 105000 #if BOOST_VERSION < 105000
while (!taskQueue.empty() && newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) { while (!shouldStop() && !taskQueue.empty() &&
newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
// Keep waiting until timeout // Keep waiting until timeout
} }
#else #else
while (!taskQueue.empty() && newTaskScheduled.wait_until(lock, taskQueue.begin()->first) != boost::cv_status::timeout) { while (!shouldStop() && !taskQueue.empty() &&
newTaskScheduled.wait_until(lock, taskQueue.begin()->first) != boost::cv_status::timeout) {
// Keep waiting until timeout // Keep waiting until timeout
} }
#endif #endif
// If there are multiple threads, the queue can empty while we're waiting (another // If there are multiple threads, the queue can empty while we're waiting (another
// thread may service the task we were waiting on). // thread may service the task we were waiting on).
if (taskQueue.empty()) if (shouldStop() || taskQueue.empty())
continue; continue;
Function f = taskQueue.begin()->second; Function f = taskQueue.begin()->second;
@ -70,6 +75,19 @@ void CScheduler::serviceQueue()
throw; throw;
} }
} }
--nThreadsServicingQueue;
}
void CScheduler::stop(bool drain)
{
{
boost::unique_lock<boost::mutex> lock(newTaskMutex);
if (drain)
stopWhenEmpty = true;
else
stopRequested = true;
}
newTaskScheduled.notify_all();
} }
void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t) void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
@ -96,3 +114,15 @@ void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaSeconds)
{ {
scheduleFromNow(boost::bind(&Repeat, this, f, deltaSeconds), deltaSeconds); scheduleFromNow(boost::bind(&Repeat, this, f, deltaSeconds), deltaSeconds);
} }
size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
boost::chrono::system_clock::time_point &last) const
{
boost::unique_lock<boost::mutex> lock(newTaskMutex);
size_t result = taskQueue.size();
if (!taskQueue.empty()) {
first = taskQueue.begin()->first;
last = taskQueue.rbegin()->first;
}
return result;
}

15
src/scheduler.h

@ -60,11 +60,24 @@ public:
// and interrupted using boost::interrupt_thread // and interrupted using boost::interrupt_thread
void serviceQueue(); void serviceQueue();
// Tell any threads running serviceQueue to stop as soon as they're
// done servicing whatever task they're currently servicing (drain=false)
// or when there is no work left to be done (drain=true)
void stop(bool drain=false);
// Returns number of tasks waiting to be serviced,
// and first and last task times
size_t getQueueInfo(boost::chrono::system_clock::time_point &first,
boost::chrono::system_clock::time_point &last) const;
private: private:
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue; std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
boost::condition_variable newTaskScheduled; boost::condition_variable newTaskScheduled;
boost::mutex newTaskMutex; mutable boost::mutex newTaskMutex;
int nThreadsServicingQueue; int nThreadsServicingQueue;
bool stopRequested;
bool stopWhenEmpty;
bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
}; };
#endif #endif

27
src/test/scheduler_tests.cpp

@ -42,6 +42,8 @@ static void MicroSleep(uint64_t n)
BOOST_AUTO_TEST_CASE(manythreads) BOOST_AUTO_TEST_CASE(manythreads)
{ {
seed_insecure_rand(false);
// Stress test: hundreds of microsecond-scheduled tasks, // Stress test: hundreds of microsecond-scheduled tasks,
// serviced by 10 threads. // serviced by 10 threads.
// //
@ -54,10 +56,6 @@ BOOST_AUTO_TEST_CASE(manythreads)
// counters should sum to the number of initial tasks performed. // counters should sum to the number of initial tasks performed.
CScheduler microTasks; CScheduler microTasks;
boost::thread_group microThreads;
for (int i = 0; i < 5; i++)
microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, &microTasks));
boost::mutex counterMutex[10]; boost::mutex counterMutex[10];
int counter[10] = { 0 }; int counter[10] = { 0 };
boost::random::mt19937 rng(insecure_rand()); boost::random::mt19937 rng(insecure_rand());
@ -67,6 +65,9 @@ BOOST_AUTO_TEST_CASE(manythreads)
boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now(); boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now();
boost::chrono::system_clock::time_point now = start; boost::chrono::system_clock::time_point now = start;
boost::chrono::system_clock::time_point first, last;
size_t nTasks = microTasks.getQueueInfo(first, last);
BOOST_CHECK(nTasks == 0);
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng)); boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng));
@ -77,9 +78,19 @@ BOOST_AUTO_TEST_CASE(manythreads)
randomDelta(rng), tReschedule); randomDelta(rng), tReschedule);
microTasks.schedule(f, t); microTasks.schedule(f, t);
} }
nTasks = microTasks.getQueueInfo(first, last);
BOOST_CHECK(nTasks == 100);
BOOST_CHECK(first < last);
BOOST_CHECK(last > now);
// As soon as these are created they will start running and servicing the queue
boost::thread_group microThreads;
for (int i = 0; i < 5; i++)
microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, &microTasks));
MicroSleep(600); MicroSleep(600);
now = boost::chrono::system_clock::now(); now = boost::chrono::system_clock::now();
// More threads and more tasks: // More threads and more tasks:
for (int i = 0; i < 5; i++) for (int i = 0; i < 5; i++)
microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, &microTasks)); microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, &microTasks));
@ -93,11 +104,9 @@ BOOST_AUTO_TEST_CASE(manythreads)
microTasks.schedule(f, t); microTasks.schedule(f, t);
} }
// All 2,000 tasks should be finished within 2 milliseconds. Sleep a bit longer. // Drain the task queue then exit threads
MicroSleep(2100); microTasks.stop(true);
microThreads.join_all(); // ... wait until all the threads are done
microThreads.interrupt_all();
microThreads.join_all();
int counterSum = 0; int counterSum = 0;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {

Loading…
Cancel
Save