diff --git a/include/crucible/task.h b/include/crucible/task.h index ed60181..e747ac0 100644 --- a/include/crucible/task.h +++ b/include/crucible/task.h @@ -55,27 +55,29 @@ namespace crucible { class TaskMaster { public: - // Blocks until the running thread count reaches this number + /// Blocks until the running thread count reaches this number static void set_thread_count(size_t threads); - // Sets minimum thread count when load average tracking enabled + /// Sets minimum thread count when load average tracking enabled static void set_thread_min_count(size_t min_threads); - // Calls set_thread_count with default + /// Calls set_thread_count with default static void set_thread_count(); - // Creates thread to track load average and adjust thread count dynamically + /// Creates thread to track load average and adjust thread count dynamically static void set_loadavg_target(double target); - // Writes the current non-executing Task queue + /// Writes the current non-executing Task queue static ostream & print_queue(ostream &); - // Writes the current executing Task for each worker + /// Writes the current executing Task for each worker static ostream & print_workers(ostream &); - // Gets the current number of queued Tasks + /// Gets the current number of queued Tasks static size_t get_queue_count(); + /// Forcibly drop the queue and stop accepting new entries + static void cancel(); }; // Barrier executes waiting Tasks once the last BarrierLock diff --git a/lib/task.cc b/lib/task.cc index 4bdaeec..0caef9c 100644 --- a/lib/task.cc +++ b/lib/task.cc @@ -50,6 +50,7 @@ namespace crucible { double m_prev_loadavg; size_t m_configured_thread_max; double m_thread_target; + bool m_cancelled = false; friend class TaskConsumer; friend class TaskMaster; @@ -62,6 +63,7 @@ namespace crucible { size_t calculate_thread_count_nolock(); void set_loadavg_target(double target); void loadavg_thread_fn(); + void cancel(); public: ~TaskMasterState(); @@ -165,6 +167,9 @@ namespace crucible { { THROW_CHECK0(runtime_error, task); unique_lock lock(s_tms->m_mutex); + if (s_tms->m_cancelled) { + return; + } s_tms->m_queue.push_back(task); s_tms->m_condvar.notify_all(); s_tms->start_threads_nolock(); @@ -175,6 +180,9 @@ namespace crucible { { THROW_CHECK0(runtime_error, task); unique_lock lock(s_tms->m_mutex); + if (s_tms->m_cancelled) { + return; + } s_tms->m_queue.push_front(task); s_tms->m_condvar.notify_all(); s_tms->start_threads_nolock(); @@ -226,6 +234,11 @@ namespace crucible { size_t TaskMasterState::calculate_thread_count_nolock() { + if (m_cancelled) { + // No threads running while cancelled + return 0; + } + if (m_load_target == 0) { // No limits, no stats, use configured thread count return m_configured_thread_max; @@ -296,6 +309,10 @@ namespace crucible { TaskMasterState::set_thread_count(size_t thread_max) { unique_lock lock(m_mutex); + // XXX: someday we might want to uncancel, and this would be the place to do it + if (m_cancelled) { + return; + } m_configured_thread_max = thread_max; lock.unlock(); adjust_thread_count(); @@ -308,10 +325,33 @@ namespace crucible { s_tms->set_thread_count(thread_max); } + void + TaskMasterState::cancel() + { + unique_lock lock(m_mutex); + m_cancelled = true; + m_configured_thread_max = m_thread_min = m_thread_max = 0; + decltype(m_queue) empty_queue; + m_queue.swap(empty_queue); + m_condvar.notify_all(); + lock.unlock(); + start_stop_threads(); + } + + void + TaskMaster::cancel() + { + s_tms->cancel(); + } + void TaskMasterState::set_thread_min_count(size_t thread_min) { unique_lock lock(m_mutex); + // XXX: someday we might want to uncancel, and this would be the place to do it + if (m_cancelled) { + return; + } m_thread_min = thread_min; lock.unlock(); adjust_thread_count(); @@ -328,7 +368,7 @@ namespace crucible { TaskMasterState::loadavg_thread_fn() { pthread_setname_np(pthread_self(), "load_tracker"); - while (true) { + while (!m_cancelled) { adjust_thread_count(); nanosleep(5.0); } @@ -340,6 +380,9 @@ namespace crucible { THROW_CHECK1(out_of_range, target, target >= 0); unique_lock lock(m_mutex); + if (m_cancelled) { + return; + } m_load_target = target; m_prev_loadavg = getloadavg1(); @@ -440,8 +483,8 @@ namespace crucible { TaskConsumer::consumer_thread() { auto master_locked = m_master.lock(); - while (true) { - unique_lock lock(master_locked->m_mutex); + unique_lock lock(master_locked->m_mutex); + while (!master_locked->m_cancelled) { if (master_locked->m_thread_max < master_locked->m_threads.size()) { break; } @@ -461,7 +504,6 @@ namespace crucible { m_current_task.reset(); } - unique_lock lock(master_locked->m_mutex); m_thread.detach(); master_locked->m_threads.erase(shared_from_this()); master_locked->m_condvar.notify_all();