diff --git a/lib/task.cc b/lib/task.cc index 679a714..fb90dcc 100644 --- a/lib/task.cc +++ b/lib/task.cc @@ -86,6 +86,14 @@ namespace crucible { /// Cleared when exec() ends. bool m_is_running = false; + /// Set when task is queued while already running. + /// Cleared when task is requeued. + bool m_run_again = false; + + /// Set when task is queued as idle task while already running. + /// Cleared when task is queued as non-idle task. + bool m_idle = false; + /// Sequential identifier for next task static atomic s_next_id; @@ -231,16 +239,16 @@ namespace crucible { if (queue.empty()) { return; } - if (sort_queue) { - queue.sort([&](const TaskStatePtr &a, const TaskStatePtr &b) { - return a->m_id < b->m_id; - }); - } - const auto tlcc = tl_current_consumer; + const auto &tlcc = tl_current_consumer; if (tlcc) { // We are executing under a TaskConsumer, splice our post-exec queue at front. // No locks needed because we are using only thread-local objects. tlcc->m_local_queue.splice(tlcc->m_local_queue.begin(), queue); + if (sort_queue) { + tlcc->m_local_queue.sort([&](const TaskStatePtr &a, const TaskStatePtr &b) { + return a->m_id < b->m_id; + }); + } } else { // We are not executing under a TaskConsumer. // If there is only one task, then just insert it at the front of the queue. @@ -251,6 +259,8 @@ namespace crucible { // then push it to the front of the global queue using normal locking methods. TaskStatePtr rescue_task(make_shared("rescue_task", [](){})); swap(rescue_task->m_post_exec_queue, queue); + // Do the sort--once--when a new Consumer has picked up the Task + rescue_task->m_sort_queue = sort_queue; TaskQueue tq_one { rescue_task }; TaskMasterState::push_front(tq_one); } @@ -264,6 +274,7 @@ namespace crucible { unique_lock lock(m_mutex); // If any dependent Tasks were appended since the last exec, run them now TaskState::rescue_queue(m_post_exec_queue, m_sort_queue); + // No need to clear m_sort_queue here, it won't exist soon } TaskState::TaskState(string title, function exec_fn) : @@ -322,6 +333,7 @@ namespace crucible { task->m_run_now = true; append_nolock(task); } + task->m_idle = false; } void @@ -338,6 +350,7 @@ namespace crucible { m_post_exec_queue.push_back(task); m_post_exec_queue.splice(m_post_exec_queue.end(), task->m_post_exec_queue); } + task->m_idle = false; } void @@ -348,7 +361,7 @@ namespace crucible { unique_lock lock(m_mutex); if (m_is_running) { - append_nolock(shared_from_this()); + m_run_again = true; return; } else { m_run_now = false; @@ -372,6 +385,17 @@ namespace crucible { swap(this_task, tl_current_task); m_is_running = false; + if (m_run_again) { + m_run_again = false; + if (m_idle) { + // All the way back to the end of the line + TaskMasterState::push_back_idle(shared_from_this()); + } else { + // Insert after any dependents waiting for this Task + m_post_exec_queue.push_back(shared_from_this()); + } + } + // Splice task post_exec queue at front of local queue TaskState::rescue_queue(m_post_exec_queue, m_sort_queue); m_sort_queue = false; @@ -394,22 +418,32 @@ namespace crucible { TaskState::run() { unique_lock lock(m_mutex); + m_idle = false; if (m_run_now) { return; } m_run_now = true; - TaskMasterState::push_back(shared_from_this()); + if (m_is_running) { + m_run_again = true; + } else { + TaskMasterState::push_back(shared_from_this()); + } } void TaskState::idle() { unique_lock lock(m_mutex); + m_idle = true; if (m_run_now) { return; } m_run_now = true; - TaskMasterState::push_back_idle(shared_from_this()); + if (m_is_running) { + m_run_again = true; + } else { + TaskMasterState::push_back_idle(shared_from_this()); + } } TaskMasterState::TaskMasterState(size_t thread_max) :