diff --git a/include/crucible/task.h b/include/crucible/task.h index e747ac0..9279490 100644 --- a/include/crucible/task.h +++ b/include/crucible/task.h @@ -13,6 +13,7 @@ namespace crucible { using TaskId = uint64_t; + /// A unit of work to be scheduled by TaskMaster. class Task { shared_ptr m_task_state; @@ -20,34 +21,45 @@ namespace crucible { public: - // create empty Task object + /// Create empty Task object. Task() = default; - // create Task object containing closure and description + /// Create Task object containing closure and description. Task(string title, function exec_fn); - // schedule Task at end of queue. - // May run Task in current thread or in other thread. - // May run Task before or after returning. + /// Insert at tail of queue (default). + void queue_at_tail() const; + + /// Insert at head of queue instead of tail. + /// May insert onto current thread/CPU core's queue. + void queue_at_head() const; + + // Add other insertion points here (same CPU, time delay, etc). + + /// Schedule Task at designated queue position. + /// May run Task in current thread or in other thread. + /// May run Task before or after returning. + /// + /// Only one instance of a Task may execute at a time. + /// If a Task is already scheduled, run() does nothing. + /// If a Task is already running, run() reschedules the + /// task after the currently running instance returns. void run() const; - // schedule Task before other queued tasks - void run_earlier() const; - - // describe Task as text + /// Describe Task as text. string title() const; - // Returns currently executing task if called from exec_fn. - // Usually used to reschedule the currently executing Task. + /// Returns currently executing task if called from exec_fn. + /// Usually used to reschedule the currently executing Task. static Task current_task(); - // Ordering for containers + /// Ordering operator for containers bool operator<(const Task &that) const; - // Null test + /// Null test operator bool() const; - // Unique non-repeating(ish) ID for task + /// Unique non-repeating(ish) ID for task TaskId id() const; }; @@ -76,7 +88,10 @@ namespace crucible { /// Gets the current number of queued Tasks static size_t get_queue_count(); - /// Forcibly drop the queue and stop accepting new entries + /// Drop the current queue and discard new Tasks without + /// running them. Currently executing tasks are not + /// affected (use set_thread_count(0) to wait for those + /// to complete). static void cancel(); }; @@ -153,9 +168,9 @@ namespace crucible { // objects it holds, and exit its Task function. ExclusionLock try_lock(); - // Execute Task when Exclusion is unlocked (possibly immediately). - // First Task is scheduled with run_earlier(), all others are - // scheduled with run(). + // Execute Task when Exclusion is unlocked (possibly + // immediately). First Task is scheduled at head, + // all others are scheduled at tail. void insert_task(Task t); }; diff --git a/lib/task.cc b/lib/task.cc index 0caef9c..cd37d4f 100644 --- a/lib/task.cc +++ b/lib/task.cc @@ -1,6 +1,5 @@ #include "crucible/task.h" -#include "crucible/cleanup.h" #include "crucible/error.h" #include "crucible/process.h" #include "crucible/time.h" @@ -19,22 +18,79 @@ namespace crucible { static thread_local weak_ptr tl_current_task_wp; + class TaskStateLock; + class TaskState : public enable_shared_from_this { const function m_exec_fn; const string m_title; TaskId m_id; + function)> m_queue_fn; + + mutex m_mutex; + + /// Set when task is on some queue and does not need to be queued again. + /// Cleared when exec() begins. + bool m_is_queued = false; + + /// Set when task starts execution by exec(). + /// Cleared when exec() completes. + bool m_is_running = false; + + /// Set when task is being executed by exec(), but is + /// already running an earlier call to exec() in a second thread. + /// The earlier exec() shall requeue the task according to its + /// queue function when the earlier exec() completes. + bool m_run_again = false; static atomic s_next_id; + + friend class TaskStateLock; + public: TaskState(string title, function exec_fn); + /// Queue task for execution according to previously stored queue policy. + void run(); + + /// Execute task immediately in current thread if it is not already + /// executing in another thread. If m_run_again is set while m_is_running + /// is true, the thread that set m_is_running will requeue the task. void exec(); + + /// Return title of task. string title() const; + + /// Return ID of task. TaskId id() const; + + /// Set queue policy to queue at head (before existing tasks on queue). + void queue_at_head(); + + /// Set queue policy to queue at tail (after existing tasks on queue). + void queue_at_tail(); + + /// Lock the task's queueing state so a queue can decide whether to + /// accept the task queue request or discard it. The decision + /// is communicated through TaskStateLock::set_queued(bool). + TaskStateLock lock_queue(); }; atomic TaskState::s_next_id; + class TaskStateLock { + TaskState &m_state; + unique_lock m_lock; + TaskStateLock(TaskState &state); + friend class TaskState; + public: + /// Returns true if TaskState is currently queued. + bool is_queued() const; + + /// Sets TaskState::m_queued to a different queued state. + /// Throws an exception on requests to transition to the current state. + void set_queued(bool is_queued_now); + }; + class TaskConsumer; class TaskMasterState; @@ -90,10 +146,30 @@ namespace crucible { static shared_ptr s_tms = make_shared(); + TaskStateLock::TaskStateLock(TaskState &state) : + m_state(state), + m_lock(state.m_mutex) + { + } + + bool + TaskStateLock::is_queued() const + { + return m_state.m_is_queued; + } + + void + TaskStateLock::set_queued(bool is_queued_now) + { + THROW_CHECK2(runtime_error, m_state.m_is_queued, is_queued_now, m_state.m_is_queued != is_queued_now); + m_state.m_is_queued = is_queued_now; + } + TaskState::TaskState(string title, function exec_fn) : m_exec_fn(exec_fn), m_title(title), - m_id(++s_next_id) + m_id(++s_next_id), + m_queue_fn(TaskMasterState::push_back) { THROW_CHECK0(invalid_argument, !m_title.empty()); } @@ -104,21 +180,38 @@ namespace crucible { THROW_CHECK0(invalid_argument, m_exec_fn); THROW_CHECK0(invalid_argument, !m_title.empty()); - char buf[24]; - memset(buf, '\0', sizeof(buf)); + unique_lock lock(m_mutex); + m_is_queued = false; + if (m_is_running) { + m_run_again = true; + return; + } else { + m_is_running = true; + } + lock.unlock(); + + char buf[24] = { 0 }; DIE_IF_MINUS_ERRNO(pthread_getname_np(pthread_self(), buf, sizeof(buf))); - Cleanup pthread_name_cleaner([&]() { - pthread_setname_np(pthread_self(), buf); - }); DIE_IF_MINUS_ERRNO(pthread_setname_np(pthread_self(), m_title.c_str())); weak_ptr this_task_wp = shared_from_this(); - Cleanup current_task_cleaner([&]() { - swap(this_task_wp, tl_current_task_wp); - }); swap(this_task_wp, tl_current_task_wp); - m_exec_fn(); + catch_all([&]() { + m_exec_fn(); + }); + + swap(this_task_wp, tl_current_task_wp); + pthread_setname_np(pthread_self(), buf); + + lock.lock(); + m_is_running = false; + bool run_again = m_run_again; + m_run_again = false; + lock.unlock(); + if (run_again) { + run(); + } } string @@ -134,6 +227,37 @@ namespace crucible { return m_id; } + void + TaskState::queue_at_head() + { + unique_lock lock(m_mutex); + m_queue_fn = TaskMasterState::push_front; + } + + void + TaskState::queue_at_tail() + { + unique_lock lock(m_mutex); + m_queue_fn = TaskMasterState::push_back; + } + + TaskStateLock + TaskState::lock_queue() + { + return TaskStateLock(*this); + } + + void + TaskState::run() + { + unique_lock lock(m_mutex); + THROW_CHECK0(runtime_error, m_queue_fn); + if (!m_is_queued) { + m_queue_fn(shared_from_this()); + m_is_queued = true; + } + } + TaskMasterState::TaskMasterState(size_t thread_max) : m_thread_max(thread_max), m_configured_thread_max(thread_max), @@ -330,12 +454,10 @@ namespace crucible { { unique_lock lock(m_mutex); m_cancelled = true; - m_configured_thread_max = m_thread_min = m_thread_max = 0; decltype(m_queue) empty_queue; m_queue.swap(empty_queue); m_condvar.notify_all(); lock.unlock(); - start_stop_threads(); } void @@ -418,14 +540,21 @@ namespace crucible { Task::run() const { THROW_CHECK0(runtime_error, m_task_state); - TaskMasterState::push_back(m_task_state); + m_task_state->run(); } void - Task::run_earlier() const + Task::queue_at_head() const { THROW_CHECK0(runtime_error, m_task_state); - TaskMasterState::push_front(m_task_state); + m_task_state->queue_at_head(); + } + + void + Task::queue_at_tail() const + { + THROW_CHECK0(runtime_error, m_task_state); + m_task_state->queue_at_tail(); } Task @@ -483,6 +612,7 @@ namespace crucible { TaskConsumer::consumer_thread() { auto master_locked = m_master.lock(); + unique_lock lock(master_locked->m_mutex); while (!master_locked->m_cancelled) { if (master_locked->m_thread_max < master_locked->m_threads.size()) { @@ -495,15 +625,28 @@ namespace crucible { } m_current_task = *master_locked->m_queue.begin(); + auto hold_task = m_current_task; master_locked->m_queue.pop_front(); + + // Execute task without lock lock.unlock(); catch_all([&]() { m_current_task->exec(); }); + + // Update m_current_task with lock lock.lock(); m_current_task.reset(); + + // Destroy hold_task without lock + lock.unlock(); + hold_task.reset(); + + // Invariant: lock held + lock.lock(); } + // Still holding lock m_thread.detach(); master_locked->m_threads.erase(shared_from_this()); master_locked->m_condvar.notify_all(); @@ -610,7 +753,8 @@ namespace crucible { bool first = true; for (auto i : m_tasks) { if (first) { - i.run_earlier(); + i.queue_at_head(); + i.run(); first = false; } else { i.run();