1
0
mirror of https://github.com/Zygo/bees.git synced 2025-05-17 21:35:45 +02:00

task: add an idle queue

Add a second level queue which is only serviced when the local and global
queues are empty.

At some point there might be a need to implement a full priority queue,
but for now two classes are sufficient.

Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
Zygo Blaxell 2024-11-28 00:01:37 -05:00
parent 099ad2ce7c
commit b99d80b40f
2 changed files with 61 additions and 3 deletions

View File

@ -40,6 +40,9 @@ namespace crucible {
/// after the current instance exits.
void run() const;
/// Schedule task to run when no other Task is available.
void idle() const;
/// Schedule Task to run after this Task has run or
/// been destroyed.
void append(const Task &task) const;

View File

@ -124,6 +124,9 @@ namespace crucible {
/// instance at the end of TaskMaster's global queue.
void run();
/// Run the task when there are no more Tasks on the main queue.
void idle();
/// Execute task immediately in current thread if it is not already
/// executing in another thread; otherwise, append the current task
/// to itself to be executed immediately in the other thread.
@ -150,6 +153,7 @@ namespace crucible {
mutex m_mutex;
condition_variable m_condvar;
TaskQueue m_queue;
TaskQueue m_idle_queue;
size_t m_thread_max;
size_t m_thread_min = 0;
set<TaskConsumerPtr> m_threads;
@ -184,6 +188,7 @@ namespace crucible {
TaskMasterState(size_t thread_max = thread::hardware_concurrency());
static void push_back(const TaskStatePtr &task);
static void push_back_idle(const TaskStatePtr &task);
static void push_front(TaskQueue &queue);
size_t get_queue_count();
size_t get_thread_count();
@ -367,6 +372,17 @@ namespace crucible {
TaskMasterState::push_back(shared_from_this());
}
void
TaskState::idle()
{
unique_lock<mutex> lock(m_mutex);
if (m_run_now) {
return;
}
m_run_now = true;
TaskMasterState::push_back_idle(shared_from_this());
}
TaskMasterState::TaskMasterState(size_t thread_max) :
m_thread_max(thread_max),
m_configured_thread_max(thread_max),
@ -410,6 +426,20 @@ namespace crucible {
s_tms->start_threads_nolock();
}
void
TaskMasterState::push_back_idle(const TaskStatePtr &task)
{
THROW_CHECK0(runtime_error, task);
unique_lock<mutex> lock(s_tms->m_mutex);
if (s_tms->m_cancelled) {
task->clear();
return;
}
s_tms->m_idle_queue.push_back(task);
s_tms->m_condvar.notify_all();
s_tms->start_threads_nolock();
}
void
TaskMasterState::push_front(TaskQueue &queue)
{
@ -456,12 +486,26 @@ namespace crucible {
TaskMaster::print_queue(ostream &os)
{
unique_lock<mutex> lock(s_tms->m_mutex);
os << "Queue (size " << s_tms->m_queue.size() << "):" << endl;
auto queue_copy = s_tms->m_queue;
lock.unlock();
os << "Queue (size " << queue_copy.size() << "):" << endl;
size_t counter = 0;
for (auto i : s_tms->m_queue) {
for (auto i : queue_copy) {
os << "Queue #" << ++counter << " Task ID " << i->id() << " " << i->title() << endl;
}
return os << "Queue End" << endl;
os << "Queue End" << endl;
lock.lock();
queue_copy = s_tms->m_idle_queue;
lock.unlock();
os << "Idle (size " << queue_copy.size() << "):" << endl;
counter = 0;
for (const auto &i : queue_copy) {
os << "Idle #" << ++counter << " Task ID " << i->id() << " " << i->title() << endl;
}
os << "Idle End" << endl;
return os;
}
ostream &
@ -583,6 +627,7 @@ namespace crucible {
m_cancelled = true;
decltype(m_queue) empty_queue;
m_queue.swap(empty_queue);
empty_queue.splice(empty_queue.end(), m_idle_queue);
m_condvar.notify_all();
lock.unlock();
TaskState::clear_queue(empty_queue);
@ -682,6 +727,13 @@ namespace crucible {
m_task_state->run();
}
void
Task::idle() const
{
THROW_CHECK0(runtime_error, m_task_state);
m_task_state->idle();
}
void
Task::append(const Task &that) const
{
@ -772,6 +824,9 @@ namespace crucible {
} else if (!master_copy->m_queue.empty()) {
m_current_task = *master_copy->m_queue.begin();
master_copy->m_queue.pop_front();
} else if (!master_copy->m_idle_queue.empty()) {
m_current_task = *master_copy->m_idle_queue.begin();
master_copy->m_idle_queue.pop_front();
} else {
master_copy->m_condvar.wait(lock);
continue;