diff --git a/include/crucible/task.h b/include/crucible/task.h index 64add93..70dc3f2 100644 --- a/include/crucible/task.h +++ b/include/crucible/task.h @@ -40,6 +40,9 @@ namespace crucible { /// after the current instance exits. void run() const; + /// Schedule task to run when no other Task is available. + void idle() const; + /// Schedule Task to run after this Task has run or /// been destroyed. void append(const Task &task) const; diff --git a/lib/task.cc b/lib/task.cc index 9604ff2..3f2a7e8 100644 --- a/lib/task.cc +++ b/lib/task.cc @@ -124,6 +124,9 @@ namespace crucible { /// instance at the end of TaskMaster's global queue. void run(); + /// Run the task when there are no more Tasks on the main queue. + void idle(); + /// Execute task immediately in current thread if it is not already /// executing in another thread; otherwise, append the current task /// to itself to be executed immediately in the other thread. @@ -150,6 +153,7 @@ namespace crucible { mutex m_mutex; condition_variable m_condvar; TaskQueue m_queue; + TaskQueue m_idle_queue; size_t m_thread_max; size_t m_thread_min = 0; set m_threads; @@ -184,6 +188,7 @@ namespace crucible { TaskMasterState(size_t thread_max = thread::hardware_concurrency()); static void push_back(const TaskStatePtr &task); + static void push_back_idle(const TaskStatePtr &task); static void push_front(TaskQueue &queue); size_t get_queue_count(); size_t get_thread_count(); @@ -367,6 +372,17 @@ namespace crucible { TaskMasterState::push_back(shared_from_this()); } + void + TaskState::idle() + { + unique_lock lock(m_mutex); + if (m_run_now) { + return; + } + m_run_now = true; + TaskMasterState::push_back_idle(shared_from_this()); + } + TaskMasterState::TaskMasterState(size_t thread_max) : m_thread_max(thread_max), m_configured_thread_max(thread_max), @@ -410,6 +426,20 @@ namespace crucible { s_tms->start_threads_nolock(); } + void + TaskMasterState::push_back_idle(const TaskStatePtr &task) + { + THROW_CHECK0(runtime_error, task); + unique_lock lock(s_tms->m_mutex); + if (s_tms->m_cancelled) { + task->clear(); + return; + } + s_tms->m_idle_queue.push_back(task); + s_tms->m_condvar.notify_all(); + s_tms->start_threads_nolock(); + } + void TaskMasterState::push_front(TaskQueue &queue) { @@ -456,12 +486,26 @@ namespace crucible { TaskMaster::print_queue(ostream &os) { unique_lock lock(s_tms->m_mutex); - os << "Queue (size " << s_tms->m_queue.size() << "):" << endl; + auto queue_copy = s_tms->m_queue; + lock.unlock(); + os << "Queue (size " << queue_copy.size() << "):" << endl; size_t counter = 0; - for (auto i : s_tms->m_queue) { + for (auto i : queue_copy) { os << "Queue #" << ++counter << " Task ID " << i->id() << " " << i->title() << endl; } - return os << "Queue End" << endl; + os << "Queue End" << endl; + + lock.lock(); + queue_copy = s_tms->m_idle_queue; + lock.unlock(); + os << "Idle (size " << queue_copy.size() << "):" << endl; + counter = 0; + for (const auto &i : queue_copy) { + os << "Idle #" << ++counter << " Task ID " << i->id() << " " << i->title() << endl; + } + os << "Idle End" << endl; + + return os; } ostream & @@ -583,6 +627,7 @@ namespace crucible { m_cancelled = true; decltype(m_queue) empty_queue; m_queue.swap(empty_queue); + empty_queue.splice(empty_queue.end(), m_idle_queue); m_condvar.notify_all(); lock.unlock(); TaskState::clear_queue(empty_queue); @@ -682,6 +727,13 @@ namespace crucible { m_task_state->run(); } + void + Task::idle() const + { + THROW_CHECK0(runtime_error, m_task_state); + m_task_state->idle(); + } + void Task::append(const Task &that) const { @@ -772,6 +824,9 @@ namespace crucible { } else if (!master_copy->m_queue.empty()) { m_current_task = *master_copy->m_queue.begin(); master_copy->m_queue.pop_front(); + } else if (!master_copy->m_idle_queue.empty()) { + m_current_task = *master_copy->m_idle_queue.begin(); + master_copy->m_idle_queue.pop_front(); } else { master_copy->m_condvar.wait(lock); continue;