From 74d8bdd60fab1d8d5149f66f73fb5bc3438fcc82 Mon Sep 17 00:00:00 2001 From: Zygo Blaxell Date: Sat, 11 Jan 2025 23:21:31 -0500 Subject: [PATCH] task: add an `insert` method for priority-queueing Tasks by age Task started out as a self-organizing parallel-make algorithm, but ended up becoming a half-broken wait-die algorithm. When a contended object is already locked, Tasks enter a FIFO queue to restart and acquire the lock. This is the "die" part of wait-die (all locks on an Exclusion are non-blocking, so no Task ever does "wait"). The lock queue is FIFO wrt _lock acquisition order_, not _Task age_ as required by the wait-die algorithm. Make it a 25%-broken wait-die algorithm by sorting the Tasks on lock queues in order of Task ID, i.e. oldest-first, or FIFO wrt Task age. This ensures the oldest Task waiting for an object is the one to get it when it becomes available, as expected from the wait-die algorithm. This should reduce the amount of time Tasks spend on the execution queue, and reduce memory usage by avoiding the accumulation of Tasks that cannot make forward progress. Note that turning `TaskQueue` into an ordered container would have undesirable side-effects: * `std::list` has some useful properties wrt stability of object location and cost of splicing. Other containers may not have these, and `std::list` does have a `sort` method. * Some Task objects are created at the beginning and reused continually, but we really do want those Tasks to be executed in FIFO order wrt submission, not Task ID. We can exclude these tasks by only doing the sorting when a Task is queued for an Exclusin object. Signed-off-by: Zygo Blaxell --- include/crucible/task.h | 4 +++ lib/task.cc | 55 +++++++++++++++++++++++++++++++++-------- 2 files changed, 49 insertions(+), 10 deletions(-) diff --git a/include/crucible/task.h b/include/crucible/task.h index 81c8550..caabada 100644 --- a/include/crucible/task.h +++ b/include/crucible/task.h @@ -47,6 +47,10 @@ namespace crucible { /// been destroyed. void append(const Task &task) const; + /// Schedule Task to run after this Task has run or + /// been destroyed, in Task ID order. + void insert(const Task &task) const; + /// Describe Task as text. string title() const; diff --git a/lib/task.cc b/lib/task.cc index 35faaf5..6bbf165 100644 --- a/lib/task.cc +++ b/lib/task.cc @@ -76,9 +76,12 @@ namespace crucible { /// Tasks to be executed after the current task is executed list m_post_exec_queue; - /// Set by run() and append(). Cleared by exec(). + /// Set by run(), append(), and insert(). Cleared by exec(). bool m_run_now = false; + /// Set by insert(). Cleared by exec() and destructor. + bool m_sort_queue = false; + /// Set when task starts execution by exec(). /// Cleared when exec() ends. bool m_is_running = false; @@ -107,7 +110,7 @@ namespace crucible { static void clear_queue(TaskQueue &tq); /// Rescue any TaskQueue, not just this one. - static void rescue_queue(TaskQueue &tq); + static void rescue_queue(TaskQueue &tq, const bool sort_queue); TaskState &operator=(const TaskState &) = delete; TaskState(const TaskState &) = delete; @@ -142,6 +145,10 @@ namespace crucible { /// or is destroyed. void append(const TaskStatePtr &task); + /// Queue task to execute after current task finishes executing + /// or is destroyed, in task ID order. + void insert(const TaskStatePtr &task); + /// How masy Tasks are there? Good for catching leaks static size_t instance_count(); }; @@ -219,11 +226,16 @@ namespace crucible { static auto s_tms = make_shared(); void - TaskState::rescue_queue(TaskQueue &queue) + TaskState::rescue_queue(TaskQueue &queue, const bool sort_queue) { if (queue.empty()) { return; } + if (sort_queue) { + queue.sort([&](const TaskStatePtr &a, const TaskStatePtr &b) { + return a->m_id < b->m_id; + }); + } const auto tlcc = tl_current_consumer; if (tlcc) { // We are executing under a TaskConsumer, splice our post-exec queue at front. @@ -251,7 +263,7 @@ namespace crucible { --s_instance_count; unique_lock lock(m_mutex); // If any dependent Tasks were appended since the last exec, run them now - TaskState::rescue_queue(m_post_exec_queue); + TaskState::rescue_queue(m_post_exec_queue, m_sort_queue); } TaskState::TaskState(string title, function exec_fn) : @@ -312,6 +324,18 @@ namespace crucible { } } + void + TaskState::insert(const TaskStatePtr &task) + { + THROW_CHECK0(invalid_argument, task); + PairLock lock(m_mutex, task->m_mutex); + if (!task->m_run_now) { + task->m_run_now = true; + m_sort_queue = true; + append_nolock(task); + } + } + void TaskState::exec() { @@ -345,7 +369,8 @@ namespace crucible { m_is_running = false; // Splice task post_exec queue at front of local queue - TaskState::rescue_queue(m_post_exec_queue); + TaskState::rescue_queue(m_post_exec_queue, m_sort_queue); + m_sort_queue = false; } string @@ -740,6 +765,14 @@ namespace crucible { m_task_state->append(that.m_task_state); } + void + Task::insert(const Task &that) const + { + THROW_CHECK0(runtime_error, m_task_state); + THROW_CHECK0(runtime_error, that); + m_task_state->insert(that.m_task_state); + } + Task Task::current_task() { @@ -854,11 +887,13 @@ namespace crucible { swap(this_consumer, tl_current_consumer); assert(!tl_current_consumer); - // Release lock to rescue queue (may attempt to queue a new task at TaskMaster). - // rescue_queue normally sends tasks to the local queue of the current TaskConsumer thread, - // but we just disconnected ourselves from that. + // Release lock to rescue queue (may attempt to queue a + // new task at TaskMaster). rescue_queue normally sends + // tasks to the local queue of the current TaskConsumer + // thread, but we just disconnected ourselves from that. + // No sorting here because this is not a TaskState. lock.unlock(); - TaskState::rescue_queue(m_local_queue); + TaskState::rescue_queue(m_local_queue, false); // Hold lock so we can erase ourselves lock.lock(); @@ -943,7 +978,7 @@ namespace crucible { const auto sp = m_owner.lock(); if (sp) { if (task) { - sp->append(task); + sp->insert(task); } return ExclusionLock(); } else {