diff --git a/include/crucible/task.h b/include/crucible/task.h index 00ce6d3..142a0fc 100644 --- a/include/crucible/task.h +++ b/include/crucible/task.h @@ -98,6 +98,11 @@ namespace crucible { /// affected (use set_thread_count(0) to wait for those /// to complete). static void cancel(); + + /// Stop running any new Tasks. All existing + /// Consumer threads will exit. Does not affect queue. + /// Does not wait for threads to exit. Reversible. + static void pause(bool paused = true); }; class BarrierState; diff --git a/lib/task.cc b/lib/task.cc index a67f1db..2bb2ff1 100644 --- a/lib/task.cc +++ b/lib/task.cc @@ -137,6 +137,7 @@ namespace crucible { size_t m_configured_thread_max; double m_thread_target; bool m_cancelled = false; + bool m_paused = false; friend class TaskConsumer; friend class TaskMaster; @@ -150,6 +151,7 @@ namespace crucible { void set_loadavg_target(double target); void loadavg_thread_fn(); void cancel(); + void pause(bool paused = true); TaskMasterState &operator=(const TaskMasterState &) = delete; TaskMasterState(const TaskMasterState &) = delete; @@ -348,7 +350,7 @@ namespace crucible { void TaskMasterState::start_threads_nolock() { - while (m_threads.size() < m_thread_max) { + while (m_threads.size() < m_thread_max && !m_paused) { m_threads.insert(make_shared(shared_from_this())); } } @@ -449,8 +451,8 @@ namespace crucible { size_t TaskMasterState::calculate_thread_count_nolock() { - if (m_cancelled) { - // No threads running while cancelled + if (m_paused) { + // No threads running while paused or cancelled return 0; } @@ -524,12 +526,6 @@ namespace crucible { TaskMasterState::set_thread_count(size_t thread_max) { unique_lock lock(m_mutex); - // XXX: someday we might want to uncancel, and this would be the place to do it; - // however, when we cancel we destroy the entire Task queue, and that might be - // non-trivial to recover from - if (m_cancelled) { - return; - } m_configured_thread_max = thread_max; lock.unlock(); adjust_thread_count(); @@ -546,6 +542,7 @@ namespace crucible { TaskMasterState::cancel() { unique_lock lock(m_mutex); + m_paused = true; m_cancelled = true; decltype(m_queue) empty_queue; m_queue.swap(empty_queue); @@ -560,14 +557,25 @@ namespace crucible { s_tms->cancel(); } + void + TaskMasterState::pause(const bool paused) + { + unique_lock lock(m_mutex); + m_paused = paused; + m_condvar.notify_all(); + lock.unlock(); + } + + void + TaskMaster::pause(const bool paused) + { + s_tms->pause(paused); + } + void TaskMasterState::set_thread_min_count(size_t thread_min) { unique_lock lock(m_mutex); - // XXX: someday we might want to uncancel, and this would be the place to do it - if (m_cancelled) { - return; - } m_thread_min = thread_min; lock.unlock(); adjust_thread_count(); @@ -699,7 +707,7 @@ namespace crucible { TaskConsumer::consumer_thread() { // Keep a copy because we will be destroying *this later - auto master_copy = m_master; + const auto master_copy = m_master; // Constructor is running with master locked. // Wait until that is done before trying to do anything. @@ -715,7 +723,7 @@ namespace crucible { TaskConsumerPtr this_consumer = shared_from_this(); swap(this_consumer, tl_current_consumer); - while (!master_copy->m_cancelled) { + while (!master_copy->m_paused) { if (master_copy->m_thread_max < master_copy->m_threads.size()) { // We are one of too many threads, exit now break;