1
0
mirror of https://github.com/Zygo/bees.git synced 2025-05-18 05:45:45 +02:00

task: add a pause() method as an alternative to cancel()

pause(true) stops the TaskMaster from processing any more Tasks,
but does not destroy any queued Tasks.

pause(false) re-enables Task processing.

Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
Zygo Blaxell 2022-11-23 22:09:33 -05:00
parent 3f740d6b2d
commit 7873988dac
2 changed files with 28 additions and 15 deletions

View File

@ -98,6 +98,11 @@ namespace crucible {
/// affected (use set_thread_count(0) to wait for those /// affected (use set_thread_count(0) to wait for those
/// to complete). /// to complete).
static void cancel(); static void cancel();
/// Stop running any new Tasks. All existing
/// Consumer threads will exit. Does not affect queue.
/// Does not wait for threads to exit. Reversible.
static void pause(bool paused = true);
}; };
class BarrierState; class BarrierState;

View File

@ -137,6 +137,7 @@ namespace crucible {
size_t m_configured_thread_max; size_t m_configured_thread_max;
double m_thread_target; double m_thread_target;
bool m_cancelled = false; bool m_cancelled = false;
bool m_paused = false;
friend class TaskConsumer; friend class TaskConsumer;
friend class TaskMaster; friend class TaskMaster;
@ -150,6 +151,7 @@ namespace crucible {
void set_loadavg_target(double target); void set_loadavg_target(double target);
void loadavg_thread_fn(); void loadavg_thread_fn();
void cancel(); void cancel();
void pause(bool paused = true);
TaskMasterState &operator=(const TaskMasterState &) = delete; TaskMasterState &operator=(const TaskMasterState &) = delete;
TaskMasterState(const TaskMasterState &) = delete; TaskMasterState(const TaskMasterState &) = delete;
@ -348,7 +350,7 @@ namespace crucible {
void void
TaskMasterState::start_threads_nolock() TaskMasterState::start_threads_nolock()
{ {
while (m_threads.size() < m_thread_max) { while (m_threads.size() < m_thread_max && !m_paused) {
m_threads.insert(make_shared<TaskConsumer>(shared_from_this())); m_threads.insert(make_shared<TaskConsumer>(shared_from_this()));
} }
} }
@ -449,8 +451,8 @@ namespace crucible {
size_t size_t
TaskMasterState::calculate_thread_count_nolock() TaskMasterState::calculate_thread_count_nolock()
{ {
if (m_cancelled) { if (m_paused) {
// No threads running while cancelled // No threads running while paused or cancelled
return 0; return 0;
} }
@ -524,12 +526,6 @@ namespace crucible {
TaskMasterState::set_thread_count(size_t thread_max) TaskMasterState::set_thread_count(size_t thread_max)
{ {
unique_lock<mutex> lock(m_mutex); unique_lock<mutex> lock(m_mutex);
// XXX: someday we might want to uncancel, and this would be the place to do it;
// however, when we cancel we destroy the entire Task queue, and that might be
// non-trivial to recover from
if (m_cancelled) {
return;
}
m_configured_thread_max = thread_max; m_configured_thread_max = thread_max;
lock.unlock(); lock.unlock();
adjust_thread_count(); adjust_thread_count();
@ -546,6 +542,7 @@ namespace crucible {
TaskMasterState::cancel() TaskMasterState::cancel()
{ {
unique_lock<mutex> lock(m_mutex); unique_lock<mutex> lock(m_mutex);
m_paused = true;
m_cancelled = true; m_cancelled = true;
decltype(m_queue) empty_queue; decltype(m_queue) empty_queue;
m_queue.swap(empty_queue); m_queue.swap(empty_queue);
@ -560,14 +557,25 @@ namespace crucible {
s_tms->cancel(); s_tms->cancel();
} }
void
TaskMasterState::pause(const bool paused)
{
unique_lock<mutex> lock(m_mutex);
m_paused = paused;
m_condvar.notify_all();
lock.unlock();
}
void
TaskMaster::pause(const bool paused)
{
s_tms->pause(paused);
}
void void
TaskMasterState::set_thread_min_count(size_t thread_min) TaskMasterState::set_thread_min_count(size_t thread_min)
{ {
unique_lock<mutex> lock(m_mutex); unique_lock<mutex> lock(m_mutex);
// XXX: someday we might want to uncancel, and this would be the place to do it
if (m_cancelled) {
return;
}
m_thread_min = thread_min; m_thread_min = thread_min;
lock.unlock(); lock.unlock();
adjust_thread_count(); adjust_thread_count();
@ -699,7 +707,7 @@ namespace crucible {
TaskConsumer::consumer_thread() TaskConsumer::consumer_thread()
{ {
// Keep a copy because we will be destroying *this later // Keep a copy because we will be destroying *this later
auto master_copy = m_master; const auto master_copy = m_master;
// Constructor is running with master locked. // Constructor is running with master locked.
// Wait until that is done before trying to do anything. // Wait until that is done before trying to do anything.
@ -715,7 +723,7 @@ namespace crucible {
TaskConsumerPtr this_consumer = shared_from_this(); TaskConsumerPtr this_consumer = shared_from_this();
swap(this_consumer, tl_current_consumer); swap(this_consumer, tl_current_consumer);
while (!master_copy->m_cancelled) { while (!master_copy->m_paused) {
if (master_copy->m_thread_max < master_copy->m_threads.size()) { if (master_copy->m_thread_max < master_copy->m_threads.size()) {
// We are one of too many threads, exit now // We are one of too many threads, exit now
break; break;