
task: fixes for priority and idle Tasks

Tasks are not allowed to be queued more than once, but it is allowed
to queue a Task while it's already running, which means a Task can be
executed on two threads in parallel.  Tasks detect this and handle it
by queueing the Task on its own post-exec queue.  That in turn leads
to Workers which continually execute the same Task if that Task doesn't
create any new Tasks, while other Tasks sit on the Master queue waiting
for a Worker to dequeue them.
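
A minimal sketch of that failure mode, using simplified stand-in types (MiniTask, old_worker_loop and their members are illustrative names, not the real TaskState/TaskConsumer API):

#include <functional>
#include <list>
#include <memory>

// Simplified model of the old behavior: a Task that was queued again
// while running ends up on its own post-exec queue, which the worker
// splices to the front of its local queue, so the same worker keeps
// re-executing the same Task while the master queue waits.
struct MiniTask {
	std::function<void()> fn;
	std::list<std::shared_ptr<MiniTask>> post_exec_queue;
};

void old_worker_loop(std::list<std::shared_ptr<MiniTask>> &local_queue)
{
	while (!local_queue.empty()) {
		auto t = local_queue.front();
		local_queue.pop_front();
		t->fn();	// may requeue t, landing it on t->post_exec_queue
		// post-exec work jumps ahead of everything else on this worker
		local_queue.splice(local_queue.begin(), t->post_exec_queue);
	}
}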

For idle Tasks, we don't want the Task to be rescheduled immediately.
We want the idle Task to execute again after every available Task on
both the main and idle queues has been executed.

Fix these by having each Task reschedule itself on the appropriate
queue when it finishes executing.
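
A condensed sketch of that rescheduling decision, matching the exec() hunk below but with illustrative names (SketchTask, finish(), idle_queue are not the library's API):

#include <list>
#include <memory>

// A run()/idle() call that arrives while the Task is executing only
// sets a flag; when execution finishes, the Task puts itself back on
// the appropriate queue instead of being looped on by the same worker.
struct SketchTask : std::enable_shared_from_this<SketchTask> {
	bool run_again = false;	// run()/idle() arrived while executing
	bool idle = false;	// the deferred request was idle()
	std::list<std::shared_ptr<SketchTask>> post_exec_queue;

	// call at the end of execution; the object must be owned by a shared_ptr
	void finish(std::list<std::shared_ptr<SketchTask>> &idle_queue)
	{
		if (!run_again) return;
		run_again = false;
		if (idle) {
			// to the very end of the idle queue, after every
			// other Task currently queued
			idle_queue.push_back(shared_from_this());
		} else {
			// after any dependents waiting on this Task
			post_exec_queue.push_back(shared_from_this());
		}
	}
};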

Priority-queued Tasks should be executed in priority order across not
just one Task's post-exec queue, but the entire local queue of the
TaskConsumer.

Fix this by moving the sort into either the TaskConsumer that receives
a post-exec queue, if there is one, or into the Task that is created
to insert the post-exec queue into a TaskConsumer when one becomes
available in the future.
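
A minimal sketch of that ordering fix (IdTask, Queue and splice_and_sort are illustrative names): the post-exec queue is spliced in first, then the entire local queue is sorted by each Task's sequential id, as the diff below does with m_id:

#include <cstdint>
#include <list>
#include <memory>

using TaskId = std::uint64_t;

struct IdTask {
	TaskId id;	// stands in for TaskState::m_id
};

using Queue = std::list<std::shared_ptr<IdTask>>;

// Splice the finished Task's post-exec queue to the front, then sort
// the whole local queue so execution order no longer depends on which
// post-exec queue a Task arrived on.
void splice_and_sort(Queue &local_queue, Queue &post_exec_queue, bool sort_queue)
{
	local_queue.splice(local_queue.begin(), post_exec_queue);
	if (sort_queue) {
		local_queue.sort([](const std::shared_ptr<IdTask> &a,
				    const std::shared_ptr<IdTask> &b) {
			return a->id < b->id;
		});
	}
}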

Signed-off-by: Zygo Blaxell <bees@furryterror.org>
Zygo Blaxell 2025-01-12 18:40:14 -05:00
parent d4a681c8a2
commit c53fa04a2f


@@ -86,6 +86,14 @@ namespace crucible {
/// Cleared when exec() ends.
bool m_is_running = false;
/// Set when task is queued while already running.
/// Cleared when task is requeued.
bool m_run_again = false;
/// Set when task is queued as idle task while already running.
/// Cleared when task is queued as non-idle task.
bool m_idle = false;
/// Sequential identifier for next task
static atomic<TaskId> s_next_id;
@@ -231,16 +239,16 @@ namespace crucible {
if (queue.empty()) {
return;
}
if (sort_queue) {
queue.sort([&](const TaskStatePtr &a, const TaskStatePtr &b) {
return a->m_id < b->m_id;
});
}
const auto tlcc = tl_current_consumer;
const auto &tlcc = tl_current_consumer;
if (tlcc) {
// We are executing under a TaskConsumer, splice our post-exec queue at front.
// No locks needed because we are using only thread-local objects.
tlcc->m_local_queue.splice(tlcc->m_local_queue.begin(), queue);
if (sort_queue) {
tlcc->m_local_queue.sort([&](const TaskStatePtr &a, const TaskStatePtr &b) {
return a->m_id < b->m_id;
});
}
} else {
// We are not executing under a TaskConsumer.
// If there is only one task, then just insert it at the front of the queue.
@@ -251,6 +259,8 @@ namespace crucible {
// then push it to the front of the global queue using normal locking methods.
TaskStatePtr rescue_task(make_shared<TaskState>("rescue_task", [](){}));
swap(rescue_task->m_post_exec_queue, queue);
// Do the sort--once--when a new Consumer has picked up the Task
rescue_task->m_sort_queue = sort_queue;
TaskQueue tq_one { rescue_task };
TaskMasterState::push_front(tq_one);
}
@@ -264,6 +274,7 @@ namespace crucible {
unique_lock<mutex> lock(m_mutex);
// If any dependent Tasks were appended since the last exec, run them now
TaskState::rescue_queue(m_post_exec_queue, m_sort_queue);
// No need to clear m_sort_queue here, it won't exist soon
}
TaskState::TaskState(string title, function<void()> exec_fn) :
@@ -322,6 +333,7 @@ namespace crucible {
task->m_run_now = true;
append_nolock(task);
}
task->m_idle = false;
}
void
@@ -338,6 +350,7 @@ namespace crucible {
m_post_exec_queue.push_back(task);
m_post_exec_queue.splice(m_post_exec_queue.end(), task->m_post_exec_queue);
}
task->m_idle = false;
}
void
@@ -348,7 +361,7 @@ namespace crucible {
unique_lock<mutex> lock(m_mutex);
if (m_is_running) {
append_nolock(shared_from_this());
m_run_again = true;
return;
} else {
m_run_now = false;
@@ -372,6 +385,17 @@ namespace crucible {
swap(this_task, tl_current_task);
m_is_running = false;
if (m_run_again) {
m_run_again = false;
if (m_idle) {
// All the way back to the end of the line
TaskMasterState::push_back_idle(shared_from_this());
} else {
// Insert after any dependents waiting for this Task
m_post_exec_queue.push_back(shared_from_this());
}
}
// Splice task post_exec queue at front of local queue
TaskState::rescue_queue(m_post_exec_queue, m_sort_queue);
m_sort_queue = false;
@@ -394,22 +418,32 @@ namespace crucible {
TaskState::run()
{
unique_lock<mutex> lock(m_mutex);
m_idle = false;
if (m_run_now) {
return;
}
m_run_now = true;
TaskMasterState::push_back(shared_from_this());
if (m_is_running) {
m_run_again = true;
} else {
TaskMasterState::push_back(shared_from_this());
}
}
void
TaskState::idle()
{
unique_lock<mutex> lock(m_mutex);
m_idle = true;
if (m_run_now) {
return;
}
m_run_now = true;
TaskMasterState::push_back_idle(shared_from_this());
if (m_is_running) {
m_run_again = true;
} else {
TaskMasterState::push_back_idle(shared_from_this());
}
}
TaskMasterState::TaskMasterState(size_t thread_max) :