1
0
mirror of https://github.com/Zygo/bees.git synced 2025-05-17 21:35:45 +02:00

task: add an insert method for priority-queueing Tasks by age

Task started out as a self-organizing parallel-make algorithm, but ended
up becoming a half-broken wait-die algorithm.  When a contended object
is already locked, Tasks enter a FIFO queue to restart and acquire the
lock.  This is the "die" part of wait-die (all locks on an Exclusion are
non-blocking, so no Task ever does "wait").  The lock queue is FIFO wrt
_lock acquisition order_, not _Task age_ as required by the wait-die
algorithm.

Make it a 25%-broken wait-die algorithm by sorting the Tasks on lock
queues in order of Task ID, i.e. oldest-first, or FIFO wrt Task age.
This ensures the oldest Task waiting for an object is the one to get
it when it becomes available, as expected from the wait-die algorithm.

This should reduce the amount of time Tasks spend on the execution queue,
and reduce memory usage by avoiding the accumulation of Tasks that cannot
make forward progress.

Note that turning `TaskQueue` into an ordered container would have
undesirable side-effects:

 * `std::list` has some useful properties wrt stability of object
 location and cost of splicing.  Other containers may not have these,
 and `std::list` does have a `sort` method.

 * Some Task objects are created at the beginning and reused continually,
 but we really do want those Tasks to be executed in FIFO order wrt
 submission, not Task ID.  We can exclude these tasks by only doing the
 sorting when a Task is queued for an Exclusin object.

Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
Zygo Blaxell 2025-01-11 23:21:31 -05:00
parent a5d078d48b
commit 74d8bdd60f
2 changed files with 49 additions and 10 deletions

View File

@ -47,6 +47,10 @@ namespace crucible {
/// been destroyed. /// been destroyed.
void append(const Task &task) const; void append(const Task &task) const;
/// Schedule Task to run after this Task has run or
/// been destroyed, in Task ID order.
void insert(const Task &task) const;
/// Describe Task as text. /// Describe Task as text.
string title() const; string title() const;

View File

@ -76,9 +76,12 @@ namespace crucible {
/// Tasks to be executed after the current task is executed /// Tasks to be executed after the current task is executed
list<TaskStatePtr> m_post_exec_queue; list<TaskStatePtr> m_post_exec_queue;
/// Set by run() and append(). Cleared by exec(). /// Set by run(), append(), and insert(). Cleared by exec().
bool m_run_now = false; bool m_run_now = false;
/// Set by insert(). Cleared by exec() and destructor.
bool m_sort_queue = false;
/// Set when task starts execution by exec(). /// Set when task starts execution by exec().
/// Cleared when exec() ends. /// Cleared when exec() ends.
bool m_is_running = false; bool m_is_running = false;
@ -107,7 +110,7 @@ namespace crucible {
static void clear_queue(TaskQueue &tq); static void clear_queue(TaskQueue &tq);
/// Rescue any TaskQueue, not just this one. /// Rescue any TaskQueue, not just this one.
static void rescue_queue(TaskQueue &tq); static void rescue_queue(TaskQueue &tq, const bool sort_queue);
TaskState &operator=(const TaskState &) = delete; TaskState &operator=(const TaskState &) = delete;
TaskState(const TaskState &) = delete; TaskState(const TaskState &) = delete;
@ -142,6 +145,10 @@ namespace crucible {
/// or is destroyed. /// or is destroyed.
void append(const TaskStatePtr &task); void append(const TaskStatePtr &task);
/// Queue task to execute after current task finishes executing
/// or is destroyed, in task ID order.
void insert(const TaskStatePtr &task);
/// How masy Tasks are there? Good for catching leaks /// How masy Tasks are there? Good for catching leaks
static size_t instance_count(); static size_t instance_count();
}; };
@ -219,11 +226,16 @@ namespace crucible {
static auto s_tms = make_shared<TaskMasterState>(); static auto s_tms = make_shared<TaskMasterState>();
void void
TaskState::rescue_queue(TaskQueue &queue) TaskState::rescue_queue(TaskQueue &queue, const bool sort_queue)
{ {
if (queue.empty()) { if (queue.empty()) {
return; return;
} }
if (sort_queue) {
queue.sort([&](const TaskStatePtr &a, const TaskStatePtr &b) {
return a->m_id < b->m_id;
});
}
const auto tlcc = tl_current_consumer; const auto tlcc = tl_current_consumer;
if (tlcc) { if (tlcc) {
// We are executing under a TaskConsumer, splice our post-exec queue at front. // We are executing under a TaskConsumer, splice our post-exec queue at front.
@ -251,7 +263,7 @@ namespace crucible {
--s_instance_count; --s_instance_count;
unique_lock<mutex> lock(m_mutex); unique_lock<mutex> lock(m_mutex);
// If any dependent Tasks were appended since the last exec, run them now // If any dependent Tasks were appended since the last exec, run them now
TaskState::rescue_queue(m_post_exec_queue); TaskState::rescue_queue(m_post_exec_queue, m_sort_queue);
} }
TaskState::TaskState(string title, function<void()> exec_fn) : TaskState::TaskState(string title, function<void()> exec_fn) :
@ -312,6 +324,18 @@ namespace crucible {
} }
} }
void
TaskState::insert(const TaskStatePtr &task)
{
THROW_CHECK0(invalid_argument, task);
PairLock lock(m_mutex, task->m_mutex);
if (!task->m_run_now) {
task->m_run_now = true;
m_sort_queue = true;
append_nolock(task);
}
}
void void
TaskState::exec() TaskState::exec()
{ {
@ -345,7 +369,8 @@ namespace crucible {
m_is_running = false; m_is_running = false;
// Splice task post_exec queue at front of local queue // Splice task post_exec queue at front of local queue
TaskState::rescue_queue(m_post_exec_queue); TaskState::rescue_queue(m_post_exec_queue, m_sort_queue);
m_sort_queue = false;
} }
string string
@ -740,6 +765,14 @@ namespace crucible {
m_task_state->append(that.m_task_state); m_task_state->append(that.m_task_state);
} }
void
Task::insert(const Task &that) const
{
THROW_CHECK0(runtime_error, m_task_state);
THROW_CHECK0(runtime_error, that);
m_task_state->insert(that.m_task_state);
}
Task Task
Task::current_task() Task::current_task()
{ {
@ -854,11 +887,13 @@ namespace crucible {
swap(this_consumer, tl_current_consumer); swap(this_consumer, tl_current_consumer);
assert(!tl_current_consumer); assert(!tl_current_consumer);
// Release lock to rescue queue (may attempt to queue a new task at TaskMaster). // Release lock to rescue queue (may attempt to queue a
// rescue_queue normally sends tasks to the local queue of the current TaskConsumer thread, // new task at TaskMaster). rescue_queue normally sends
// but we just disconnected ourselves from that. // tasks to the local queue of the current TaskConsumer
// thread, but we just disconnected ourselves from that.
// No sorting here because this is not a TaskState.
lock.unlock(); lock.unlock();
TaskState::rescue_queue(m_local_queue); TaskState::rescue_queue(m_local_queue, false);
// Hold lock so we can erase ourselves // Hold lock so we can erase ourselves
lock.lock(); lock.lock();
@ -943,7 +978,7 @@ namespace crucible {
const auto sp = m_owner.lock(); const auto sp = m_owner.lock();
if (sp) { if (sp) {
if (task) { if (task) {
sp->append(task); sp->insert(task);
} }
return ExclusionLock(); return ExclusionLock();
} else { } else {