From c53fa04a2fdfe39e2bf0d0c70dde84a2211b39eb Mon Sep 17 00:00:00 2001 From: Zygo Blaxell Date: Sun, 12 Jan 2025 18:40:14 -0500 Subject: [PATCH] task: fixes for priority and idle Tasks Tasks are not allowed to be queued more than once, but it is allowed to queue a Task while it's already running, which means a Task can be executed on two threads in parallel. Tasks detect this and handle it by queueing the Task on its own post-exec queue. That in turn leads to Workers which continually execute the same Task if that Task doesn't create any new Tasks, while other Tasks sit on the Master queue waiting for a Worker to dequeue them. For idle Tasks, we don't want the Task to be rescheduled immediately. We want the idle Task to execute again after every available Task on both the main and idle queues has been executed. Fix these by having each Task reschedule itself on the appropriate queue when it finishes executing. Priority queued Tasks should executed in priority order not just one Task's post-exec queue, but the entire local queue of the TaskConsumer. Fix this by moving the sort into either the TaskConsumer that receives a post-exec queue, if there is one, or into the Task that is created to insert the post-exec queue into a TaskConsumer when one becomes available in the future. Signed-off-by: Zygo Blaxell --- lib/task.cc | 52 +++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 9 deletions(-) diff --git a/lib/task.cc b/lib/task.cc index 679a714..fb90dcc 100644 --- a/lib/task.cc +++ b/lib/task.cc @@ -86,6 +86,14 @@ namespace crucible { /// Cleared when exec() ends. bool m_is_running = false; + /// Set when task is queued while already running. + /// Cleared when task is requeued. + bool m_run_again = false; + + /// Set when task is queued as idle task while already running. + /// Cleared when task is queued as non-idle task. + bool m_idle = false; + /// Sequential identifier for next task static atomic s_next_id; @@ -231,16 +239,16 @@ namespace crucible { if (queue.empty()) { return; } - if (sort_queue) { - queue.sort([&](const TaskStatePtr &a, const TaskStatePtr &b) { - return a->m_id < b->m_id; - }); - } - const auto tlcc = tl_current_consumer; + const auto &tlcc = tl_current_consumer; if (tlcc) { // We are executing under a TaskConsumer, splice our post-exec queue at front. // No locks needed because we are using only thread-local objects. tlcc->m_local_queue.splice(tlcc->m_local_queue.begin(), queue); + if (sort_queue) { + tlcc->m_local_queue.sort([&](const TaskStatePtr &a, const TaskStatePtr &b) { + return a->m_id < b->m_id; + }); + } } else { // We are not executing under a TaskConsumer. // If there is only one task, then just insert it at the front of the queue. @@ -251,6 +259,8 @@ namespace crucible { // then push it to the front of the global queue using normal locking methods. TaskStatePtr rescue_task(make_shared("rescue_task", [](){})); swap(rescue_task->m_post_exec_queue, queue); + // Do the sort--once--when a new Consumer has picked up the Task + rescue_task->m_sort_queue = sort_queue; TaskQueue tq_one { rescue_task }; TaskMasterState::push_front(tq_one); } @@ -264,6 +274,7 @@ namespace crucible { unique_lock lock(m_mutex); // If any dependent Tasks were appended since the last exec, run them now TaskState::rescue_queue(m_post_exec_queue, m_sort_queue); + // No need to clear m_sort_queue here, it won't exist soon } TaskState::TaskState(string title, function exec_fn) : @@ -322,6 +333,7 @@ namespace crucible { task->m_run_now = true; append_nolock(task); } + task->m_idle = false; } void @@ -338,6 +350,7 @@ namespace crucible { m_post_exec_queue.push_back(task); m_post_exec_queue.splice(m_post_exec_queue.end(), task->m_post_exec_queue); } + task->m_idle = false; } void @@ -348,7 +361,7 @@ namespace crucible { unique_lock lock(m_mutex); if (m_is_running) { - append_nolock(shared_from_this()); + m_run_again = true; return; } else { m_run_now = false; @@ -372,6 +385,17 @@ namespace crucible { swap(this_task, tl_current_task); m_is_running = false; + if (m_run_again) { + m_run_again = false; + if (m_idle) { + // All the way back to the end of the line + TaskMasterState::push_back_idle(shared_from_this()); + } else { + // Insert after any dependents waiting for this Task + m_post_exec_queue.push_back(shared_from_this()); + } + } + // Splice task post_exec queue at front of local queue TaskState::rescue_queue(m_post_exec_queue, m_sort_queue); m_sort_queue = false; @@ -394,22 +418,32 @@ namespace crucible { TaskState::run() { unique_lock lock(m_mutex); + m_idle = false; if (m_run_now) { return; } m_run_now = true; - TaskMasterState::push_back(shared_from_this()); + if (m_is_running) { + m_run_again = true; + } else { + TaskMasterState::push_back(shared_from_this()); + } } void TaskState::idle() { unique_lock lock(m_mutex); + m_idle = true; if (m_run_now) { return; } m_run_now = true; - TaskMasterState::push_back_idle(shared_from_this()); + if (m_is_running) { + m_run_again = true; + } else { + TaskMasterState::push_back_idle(shared_from_this()); + } } TaskMasterState::TaskMasterState(size_t thread_max) :