1
0
mirror of https://github.com/Zygo/bees.git synced 2025-05-17 21:35:45 +02:00

task: queue and run exactly once per instance

Enable much simpler Task management:  each time a Task needs to be done
at least once in the future, simply invoke the run() method on the Task.
The Task will ensure that it only runs once, only appears in a queue
once, and will run again if a run request is made while the Task is
already running.

Make the queue policy a member of the Task rather than a method.  This
enables Tasks to reschedule themselves, possibly on the appropriate queue
if we have more than one of those some day.

This happens to make Tasks more similar to Linux kernel workers.
This similarity is coincidental, but not undesirable.

Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
Zygo Blaxell 2018-12-15 12:56:10 -05:00
parent e1de933f93
commit 4021dd42ca
2 changed files with 194 additions and 35 deletions

View File

@ -13,6 +13,7 @@ namespace crucible {
using TaskId = uint64_t; using TaskId = uint64_t;
/// A unit of work to be scheduled by TaskMaster.
class Task { class Task {
shared_ptr<TaskState> m_task_state; shared_ptr<TaskState> m_task_state;
@ -20,34 +21,45 @@ namespace crucible {
public: public:
// create empty Task object /// Create empty Task object.
Task() = default; Task() = default;
// create Task object containing closure and description /// Create Task object containing closure and description.
Task(string title, function<void()> exec_fn); Task(string title, function<void()> exec_fn);
// schedule Task at end of queue. /// Insert at tail of queue (default).
// May run Task in current thread or in other thread. void queue_at_tail() const;
// May run Task before or after returning.
/// Insert at head of queue instead of tail.
/// May insert onto current thread/CPU core's queue.
void queue_at_head() const;
// Add other insertion points here (same CPU, time delay, etc).
/// Schedule Task at designated queue position.
/// May run Task in current thread or in other thread.
/// May run Task before or after returning.
///
/// Only one instance of a Task may execute at a time.
/// If a Task is already scheduled, run() does nothing.
/// If a Task is already running, run() reschedules the
/// task after the currently running instance returns.
void run() const; void run() const;
// schedule Task before other queued tasks /// Describe Task as text.
void run_earlier() const;
// describe Task as text
string title() const; string title() const;
// Returns currently executing task if called from exec_fn. /// Returns currently executing task if called from exec_fn.
// Usually used to reschedule the currently executing Task. /// Usually used to reschedule the currently executing Task.
static Task current_task(); static Task current_task();
// Ordering for containers /// Ordering operator for containers
bool operator<(const Task &that) const; bool operator<(const Task &that) const;
// Null test /// Null test
operator bool() const; operator bool() const;
// Unique non-repeating(ish) ID for task /// Unique non-repeating(ish) ID for task
TaskId id() const; TaskId id() const;
}; };
@ -76,7 +88,10 @@ namespace crucible {
/// Gets the current number of queued Tasks /// Gets the current number of queued Tasks
static size_t get_queue_count(); static size_t get_queue_count();
/// Forcibly drop the queue and stop accepting new entries /// Drop the current queue and discard new Tasks without
/// running them. Currently executing tasks are not
/// affected (use set_thread_count(0) to wait for those
/// to complete).
static void cancel(); static void cancel();
}; };
@ -153,9 +168,9 @@ namespace crucible {
// objects it holds, and exit its Task function. // objects it holds, and exit its Task function.
ExclusionLock try_lock(); ExclusionLock try_lock();
// Execute Task when Exclusion is unlocked (possibly immediately). // Execute Task when Exclusion is unlocked (possibly
// First Task is scheduled with run_earlier(), all others are // immediately). First Task is scheduled at head,
// scheduled with run(). // all others are scheduled at tail.
void insert_task(Task t); void insert_task(Task t);
}; };

View File

@ -1,6 +1,5 @@
#include "crucible/task.h" #include "crucible/task.h"
#include "crucible/cleanup.h"
#include "crucible/error.h" #include "crucible/error.h"
#include "crucible/process.h" #include "crucible/process.h"
#include "crucible/time.h" #include "crucible/time.h"
@ -19,22 +18,79 @@ namespace crucible {
static thread_local weak_ptr<TaskState> tl_current_task_wp; static thread_local weak_ptr<TaskState> tl_current_task_wp;
class TaskStateLock;
class TaskState : public enable_shared_from_this<TaskState> { class TaskState : public enable_shared_from_this<TaskState> {
const function<void()> m_exec_fn; const function<void()> m_exec_fn;
const string m_title; const string m_title;
TaskId m_id; TaskId m_id;
function<void(shared_ptr<TaskState>)> m_queue_fn;
mutex m_mutex;
/// Set when task is on some queue and does not need to be queued again.
/// Cleared when exec() begins.
bool m_is_queued = false;
/// Set when task starts execution by exec().
/// Cleared when exec() completes.
bool m_is_running = false;
/// Set when task is being executed by exec(), but is
/// already running an earlier call to exec() in a second thread.
/// The earlier exec() shall requeue the task according to its
/// queue function when the earlier exec() completes.
bool m_run_again = false;
static atomic<TaskId> s_next_id; static atomic<TaskId> s_next_id;
friend class TaskStateLock;
public: public:
TaskState(string title, function<void()> exec_fn); TaskState(string title, function<void()> exec_fn);
/// Queue task for execution according to previously stored queue policy.
void run();
/// Execute task immediately in current thread if it is not already
/// executing in another thread. If m_run_again is set while m_is_running
/// is true, the thread that set m_is_running will requeue the task.
void exec(); void exec();
/// Return title of task.
string title() const; string title() const;
/// Return ID of task.
TaskId id() const; TaskId id() const;
/// Set queue policy to queue at head (before existing tasks on queue).
void queue_at_head();
/// Set queue policy to queue at tail (after existing tasks on queue).
void queue_at_tail();
/// Lock the task's queueing state so a queue can decide whether to
/// accept the task queue request or discard it. The decision
/// is communicated through TaskStateLock::set_queued(bool).
TaskStateLock lock_queue();
}; };
atomic<TaskId> TaskState::s_next_id; atomic<TaskId> TaskState::s_next_id;
class TaskStateLock {
TaskState &m_state;
unique_lock<mutex> m_lock;
TaskStateLock(TaskState &state);
friend class TaskState;
public:
/// Returns true if TaskState is currently queued.
bool is_queued() const;
/// Sets TaskState::m_queued to a different queued state.
/// Throws an exception on requests to transition to the current state.
void set_queued(bool is_queued_now);
};
class TaskConsumer; class TaskConsumer;
class TaskMasterState; class TaskMasterState;
@ -90,10 +146,30 @@ namespace crucible {
static shared_ptr<TaskMasterState> s_tms = make_shared<TaskMasterState>(); static shared_ptr<TaskMasterState> s_tms = make_shared<TaskMasterState>();
TaskStateLock::TaskStateLock(TaskState &state) :
m_state(state),
m_lock(state.m_mutex)
{
}
bool
TaskStateLock::is_queued() const
{
return m_state.m_is_queued;
}
void
TaskStateLock::set_queued(bool is_queued_now)
{
THROW_CHECK2(runtime_error, m_state.m_is_queued, is_queued_now, m_state.m_is_queued != is_queued_now);
m_state.m_is_queued = is_queued_now;
}
TaskState::TaskState(string title, function<void()> exec_fn) : TaskState::TaskState(string title, function<void()> exec_fn) :
m_exec_fn(exec_fn), m_exec_fn(exec_fn),
m_title(title), m_title(title),
m_id(++s_next_id) m_id(++s_next_id),
m_queue_fn(TaskMasterState::push_back)
{ {
THROW_CHECK0(invalid_argument, !m_title.empty()); THROW_CHECK0(invalid_argument, !m_title.empty());
} }
@ -104,21 +180,38 @@ namespace crucible {
THROW_CHECK0(invalid_argument, m_exec_fn); THROW_CHECK0(invalid_argument, m_exec_fn);
THROW_CHECK0(invalid_argument, !m_title.empty()); THROW_CHECK0(invalid_argument, !m_title.empty());
char buf[24]; unique_lock<mutex> lock(m_mutex);
memset(buf, '\0', sizeof(buf)); m_is_queued = false;
if (m_is_running) {
m_run_again = true;
return;
} else {
m_is_running = true;
}
lock.unlock();
char buf[24] = { 0 };
DIE_IF_MINUS_ERRNO(pthread_getname_np(pthread_self(), buf, sizeof(buf))); DIE_IF_MINUS_ERRNO(pthread_getname_np(pthread_self(), buf, sizeof(buf)));
Cleanup pthread_name_cleaner([&]() {
pthread_setname_np(pthread_self(), buf);
});
DIE_IF_MINUS_ERRNO(pthread_setname_np(pthread_self(), m_title.c_str())); DIE_IF_MINUS_ERRNO(pthread_setname_np(pthread_self(), m_title.c_str()));
weak_ptr<TaskState> this_task_wp = shared_from_this(); weak_ptr<TaskState> this_task_wp = shared_from_this();
Cleanup current_task_cleaner([&]() {
swap(this_task_wp, tl_current_task_wp);
});
swap(this_task_wp, tl_current_task_wp); swap(this_task_wp, tl_current_task_wp);
m_exec_fn(); catch_all([&]() {
m_exec_fn();
});
swap(this_task_wp, tl_current_task_wp);
pthread_setname_np(pthread_self(), buf);
lock.lock();
m_is_running = false;
bool run_again = m_run_again;
m_run_again = false;
lock.unlock();
if (run_again) {
run();
}
} }
string string
@ -134,6 +227,37 @@ namespace crucible {
return m_id; return m_id;
} }
void
TaskState::queue_at_head()
{
unique_lock<mutex> lock(m_mutex);
m_queue_fn = TaskMasterState::push_front;
}
void
TaskState::queue_at_tail()
{
unique_lock<mutex> lock(m_mutex);
m_queue_fn = TaskMasterState::push_back;
}
TaskStateLock
TaskState::lock_queue()
{
return TaskStateLock(*this);
}
void
TaskState::run()
{
unique_lock<mutex> lock(m_mutex);
THROW_CHECK0(runtime_error, m_queue_fn);
if (!m_is_queued) {
m_queue_fn(shared_from_this());
m_is_queued = true;
}
}
TaskMasterState::TaskMasterState(size_t thread_max) : TaskMasterState::TaskMasterState(size_t thread_max) :
m_thread_max(thread_max), m_thread_max(thread_max),
m_configured_thread_max(thread_max), m_configured_thread_max(thread_max),
@ -330,12 +454,10 @@ namespace crucible {
{ {
unique_lock<mutex> lock(m_mutex); unique_lock<mutex> lock(m_mutex);
m_cancelled = true; m_cancelled = true;
m_configured_thread_max = m_thread_min = m_thread_max = 0;
decltype(m_queue) empty_queue; decltype(m_queue) empty_queue;
m_queue.swap(empty_queue); m_queue.swap(empty_queue);
m_condvar.notify_all(); m_condvar.notify_all();
lock.unlock(); lock.unlock();
start_stop_threads();
} }
void void
@ -418,14 +540,21 @@ namespace crucible {
Task::run() const Task::run() const
{ {
THROW_CHECK0(runtime_error, m_task_state); THROW_CHECK0(runtime_error, m_task_state);
TaskMasterState::push_back(m_task_state); m_task_state->run();
} }
void void
Task::run_earlier() const Task::queue_at_head() const
{ {
THROW_CHECK0(runtime_error, m_task_state); THROW_CHECK0(runtime_error, m_task_state);
TaskMasterState::push_front(m_task_state); m_task_state->queue_at_head();
}
void
Task::queue_at_tail() const
{
THROW_CHECK0(runtime_error, m_task_state);
m_task_state->queue_at_tail();
} }
Task Task
@ -483,6 +612,7 @@ namespace crucible {
TaskConsumer::consumer_thread() TaskConsumer::consumer_thread()
{ {
auto master_locked = m_master.lock(); auto master_locked = m_master.lock();
unique_lock<mutex> lock(master_locked->m_mutex); unique_lock<mutex> lock(master_locked->m_mutex);
while (!master_locked->m_cancelled) { while (!master_locked->m_cancelled) {
if (master_locked->m_thread_max < master_locked->m_threads.size()) { if (master_locked->m_thread_max < master_locked->m_threads.size()) {
@ -495,15 +625,28 @@ namespace crucible {
} }
m_current_task = *master_locked->m_queue.begin(); m_current_task = *master_locked->m_queue.begin();
auto hold_task = m_current_task;
master_locked->m_queue.pop_front(); master_locked->m_queue.pop_front();
// Execute task without lock
lock.unlock(); lock.unlock();
catch_all([&]() { catch_all([&]() {
m_current_task->exec(); m_current_task->exec();
}); });
// Update m_current_task with lock
lock.lock(); lock.lock();
m_current_task.reset(); m_current_task.reset();
// Destroy hold_task without lock
lock.unlock();
hold_task.reset();
// Invariant: lock held
lock.lock();
} }
// Still holding lock
m_thread.detach(); m_thread.detach();
master_locked->m_threads.erase(shared_from_this()); master_locked->m_threads.erase(shared_from_this());
master_locked->m_condvar.notify_all(); master_locked->m_condvar.notify_all();
@ -610,7 +753,8 @@ namespace crucible {
bool first = true; bool first = true;
for (auto i : m_tasks) { for (auto i : m_tasks) {
if (first) { if (first) {
i.run_earlier(); i.queue_at_head();
i.run();
first = false; first = false;
} else { } else {
i.run(); i.run();