diff --git a/include/crucible/task.h b/include/crucible/task.h index 3f806a1..c97339c 100644 --- a/include/crucible/task.h +++ b/include/crucible/task.h @@ -27,16 +27,7 @@ namespace crucible { /// Create Task object containing closure and description. Task(string title, function exec_fn); - /// Insert at tail of queue (default). - void queue_at_tail() const; - - /// Insert at head of queue instead of tail. - /// May insert onto current thread/CPU core's queue. - void queue_at_head() const; - - // Add other insertion points here (same CPU, time delay, etc). - - /// Schedule Task at designated queue position. + /// Schedule Task for at least one future execution. /// May run Task in current thread or in other thread. /// May run Task before or after returning. /// @@ -46,6 +37,9 @@ namespace crucible { /// task after the currently running instance returns. void run() const; + /// Schedule Task to run after this task has run at least once. + void append(const Task &task) const; + /// Describe Task as text. string title() const; @@ -159,7 +153,7 @@ namespace crucible { Exclusion(shared_ptr pes); public: - Exclusion(); + Exclusion(const string &title); // Attempt to obtain a Lock. If successful, current Task // owns the Lock until the ExclusionLock is released @@ -172,9 +166,8 @@ namespace crucible { ExclusionLock try_lock(); // Execute Task when Exclusion is unlocked (possibly - // immediately). First Task is scheduled at head, - // all others are scheduled at tail. - void insert_task(Task t); + // immediately). + void insert_task(Task t = Task::current_task()); }; diff --git a/lib/task.cc b/lib/task.cc index 5c033de..91a28be 100644 --- a/lib/task.cc +++ b/lib/task.cc @@ -5,7 +5,6 @@ #include "crucible/time.h" #include -#include #include #include #include @@ -13,43 +12,89 @@ #include #include +#include +#include + namespace crucible { using namespace std; - static thread_local weak_ptr tl_current_task_wp; + class TaskState; + using TaskStatePtr = shared_ptr; + using TaskStateWeak = weak_ptr; - class TaskStateLock; + class TaskConsumer; + using TaskConsumerPtr = shared_ptr; + using TaskConsumerWeak = weak_ptr; + + using TaskQueue = list; + + static thread_local TaskStatePtr tl_current_task; + + /// because we don't want to bump -std=c++-17 just to get scoped_lock + class PairLock { + unique_lock m_lock1, m_lock2; + public: + PairLock(mutex &m1, mutex &m2) : + m_lock1(m1, defer_lock), + m_lock2(m2, defer_lock) + { + if (&m1 == &m2) { + m_lock1.lock(); + } else { + lock(m_lock1, m_lock2); + } + } + }; class TaskState : public enable_shared_from_this { + mutex m_mutex; const function m_exec_fn; const string m_title; - TaskId m_id; - function)> m_queue_fn; - mutex m_mutex; + /// Tasks to be executed after the current task is executed + list m_post_exec_queue; - /// Set when task is on some queue and does not need to be queued again. + /// Set when task is waiting to execute. /// Cleared when exec() begins. - bool m_is_queued = false; + bool m_is_waiting = false; /// Set when task starts execution by exec(). - /// Cleared when exec() completes. + /// Cleared when exec() ends. bool m_is_running = false; - /// Set when task is being executed by exec(), but is - /// already running an earlier call to exec() in a second thread. - /// The earlier exec() shall requeue the task according to its - /// queue function when the earlier exec() completes. - bool m_run_again = false; - + /// Sequential identifier for next task static atomic s_next_id; - friend class TaskStateLock; + /// Identifier for this task + TaskId m_id; + + /// Backend for append() + void append_nolock(const TaskStatePtr &task); + + /// Clear the post-execution queue. Recursively destroys post-exec + /// queues of all tasks in post-exec queue. Useful only when + /// cancelling the entire task queue. + void clear(); + + friend class TaskMasterState; + friend class TaskConsumer; + + /// Clear any TaskQueue, not just this one. + static void clear_queue(TaskQueue &tq); + + /// Rescue any TaskQueue, not just this one. + static void rescue_queue(TaskQueue &tq); + + TaskState &operator=(const TaskState &) = delete; + TaskState(const TaskState &) = delete; public: TaskState(string title, function exec_fn); - /// Queue task for execution according to previously stored queue policy. + /// Run the task at least once. If task is already running, appends + /// a self-reference to its after queue. If task is not running + /// and task is not waiting, adds task to a master queue and marks + /// the task waiting. void run(); /// Execute task immediately in current thread if it is not already @@ -63,44 +108,21 @@ namespace crucible { /// Return ID of task. TaskId id() const; - /// Set queue policy to queue at head (before existing tasks on queue). - void queue_at_head(); - - /// Set queue policy to queue at tail (after existing tasks on queue). - void queue_at_tail(); - - /// Lock the task's queueing state so a queue can decide whether to - /// accept the task queue request or discard it. The decision - /// is communicated through TaskStateLock::set_queued(bool). - TaskStateLock lock_queue(); + /// Queue task to execute after current task finishes executing. + /// If current task is neither running nor waiting, this + /// places the argument task on a worker queue immediately. + void append(const TaskStatePtr &task); }; atomic TaskState::s_next_id; - class TaskStateLock { - TaskState &m_state; - unique_lock m_lock; - TaskStateLock(TaskState &state); - friend class TaskState; - public: - /// Returns true if TaskState is currently queued. - bool is_queued() const; - - /// Sets TaskState::m_queued to a different queued state. - /// Throws an exception on requests to transition to the current state. - void set_queued(bool is_queued_now); - }; - - class TaskConsumer; - class TaskMasterState; - class TaskMasterState : public enable_shared_from_this { mutex m_mutex; condition_variable m_condvar; - list> m_queue; + TaskQueue m_queue; size_t m_thread_max; size_t m_thread_min = 0; - set> m_threads; + set m_threads; shared_ptr m_load_tracking_thread; double m_load_target = 0; double m_prev_loadavg; @@ -121,60 +143,110 @@ namespace crucible { void loadavg_thread_fn(); void cancel(); + TaskMasterState &operator=(const TaskMasterState &) = delete; + TaskMasterState(const TaskMasterState &) = delete; + public: ~TaskMasterState(); TaskMasterState(size_t thread_max = thread::hardware_concurrency()); - static void push_back(shared_ptr task); - static void push_front(shared_ptr task); + static void push_back(const TaskStatePtr &task); + static void push_front(TaskQueue &queue); size_t get_queue_count(); size_t get_thread_count(); }; class TaskConsumer : public enable_shared_from_this { weak_ptr m_master; - thread m_thread; - shared_ptr m_current_task; + TaskStatePtr m_current_task; + + friend class TaskState; + TaskQueue m_local_queue; void consumer_thread(); shared_ptr current_task_locked(); + friend class TaskMaster; + friend class TaskMasterState; public: TaskConsumer(weak_ptr tms); shared_ptr current_task(); - friend class TaskMaster; - friend class TaskMasterState; + private: + // Make sure this gets constructed _last_ + thread m_thread; }; - static shared_ptr s_tms = make_shared(); + static thread_local TaskConsumerPtr tl_current_consumer; - TaskStateLock::TaskStateLock(TaskState &state) : - m_state(state), - m_lock(state.m_mutex) - { - } - - bool - TaskStateLock::is_queued() const - { - return m_state.m_is_queued; - } + static auto s_tms = make_shared(); void - TaskStateLock::set_queued(bool is_queued_now) + TaskState::rescue_queue(TaskQueue &queue) { - THROW_CHECK2(runtime_error, m_state.m_is_queued, is_queued_now, m_state.m_is_queued != is_queued_now); - m_state.m_is_queued = is_queued_now; + if (queue.empty()) { + return; + } + auto tlcc = tl_current_consumer; + if (tlcc) { + // We are executing under a TaskConsumer, splice our post-exec queue at front. + // No locks needed because we are using only thread-local objects. + tlcc->m_local_queue.splice(tlcc->m_local_queue.begin(), queue); + } else { + // We are not executing under a TaskConsumer. + // Create a new task to wrap our post-exec queue, + // then push it to the front of the global queue using normal locking methods. + TaskStatePtr rescue_task(make_shared("rescue_task", [](){})); + swap(rescue_task->m_post_exec_queue, queue); + rescue_task->m_is_waiting; + TaskQueue tq_one { rescue_task }; + TaskMasterState::push_front(tq_one); + } } TaskState::TaskState(string title, function exec_fn) : m_exec_fn(exec_fn), m_title(title), - m_id(++s_next_id), - m_queue_fn(TaskMasterState::push_back) + m_id(++s_next_id) { THROW_CHECK0(invalid_argument, !m_title.empty()); } + void + TaskState::clear() + { + TaskQueue post_exec_queue; + unique_lock lock(m_mutex); + swap(post_exec_queue, m_post_exec_queue); + lock.unlock(); + clear_queue(post_exec_queue); + } + + void + TaskState::clear_queue(TaskQueue &tq) + { + while (!tq.empty()) { + auto i = *tq.begin(); + tq.pop_front(); + i->clear(); + } + } + + void + TaskState::append_nolock(const TaskStatePtr &task) + { + task->m_is_waiting = true; + m_post_exec_queue.push_back(task); + } + + void + TaskState::append(const TaskStatePtr &task) + { + if (!task) { + return; + } + PairLock lock(m_mutex, task->m_mutex); + append_nolock(task); + } + void TaskState::exec() { @@ -182,9 +254,9 @@ namespace crucible { THROW_CHECK0(invalid_argument, !m_title.empty()); unique_lock lock(m_mutex); - m_is_queued = false; + m_is_waiting = false; if (m_is_running) { - m_run_again = true; + append_nolock(shared_from_this()); return; } else { m_is_running = true; @@ -195,24 +267,21 @@ namespace crucible { DIE_IF_MINUS_ERRNO(pthread_getname_np(pthread_self(), buf, sizeof(buf))); DIE_IF_MINUS_ERRNO(pthread_setname_np(pthread_self(), m_title.c_str())); - weak_ptr this_task_wp = shared_from_this(); - swap(this_task_wp, tl_current_task_wp); + TaskStatePtr this_task = shared_from_this(); + swap(this_task, tl_current_task); catch_all([&]() { m_exec_fn(); }); - swap(this_task_wp, tl_current_task_wp); + swap(this_task, tl_current_task); pthread_setname_np(pthread_self(), buf); lock.lock(); m_is_running = false; - bool run_again = m_run_again; - m_run_again = false; - lock.unlock(); - if (run_again) { - run(); - } + + // Splice task post_exec queue at front of local queue + TaskState::rescue_queue(m_post_exec_queue); } string @@ -228,34 +297,13 @@ namespace crucible { return m_id; } - void - TaskState::queue_at_head() - { - unique_lock lock(m_mutex); - m_queue_fn = TaskMasterState::push_front; - } - - void - TaskState::queue_at_tail() - { - unique_lock lock(m_mutex); - m_queue_fn = TaskMasterState::push_back; - } - - TaskStateLock - TaskState::lock_queue() - { - return TaskStateLock(*this); - } - void TaskState::run() { unique_lock lock(m_mutex); - THROW_CHECK0(runtime_error, m_queue_fn); - if (!m_is_queued) { - m_queue_fn(shared_from_this()); - m_is_queued = true; + if (!m_is_waiting) { + TaskMasterState::push_back(shared_from_this()); + m_is_waiting = true; } } @@ -288,11 +336,12 @@ namespace crucible { } void - TaskMasterState::push_back(shared_ptr task) + TaskMasterState::push_back(const TaskStatePtr &task) { THROW_CHECK0(runtime_error, task); unique_lock lock(s_tms->m_mutex); if (s_tms->m_cancelled) { + task->clear(); return; } s_tms->m_queue.push_back(task); @@ -301,14 +350,17 @@ namespace crucible { } void - TaskMasterState::push_front(shared_ptr task) + TaskMasterState::push_front(TaskQueue &queue) { - THROW_CHECK0(runtime_error, task); - unique_lock lock(s_tms->m_mutex); - if (s_tms->m_cancelled) { + if (queue.empty()) { return; } - s_tms->m_queue.push_front(task); + unique_lock lock(s_tms->m_mutex); + if (s_tms->m_cancelled) { + TaskState::clear_queue(queue); + return; + } + s_tms->m_queue.splice(s_tms->m_queue.begin(), queue); s_tms->m_condvar.notify_all(); s_tms->start_threads_nolock(); } @@ -441,7 +493,9 @@ namespace crucible { TaskMasterState::set_thread_count(size_t thread_max) { unique_lock lock(m_mutex); - // XXX: someday we might want to uncancel, and this would be the place to do it + // XXX: someday we might want to uncancel, and this would be the place to do it; + // however, when we cancel we destroy the entire Task queue, and that might be + // non-trivial to recover from if (m_cancelled) { return; } @@ -466,6 +520,7 @@ namespace crucible { m_queue.swap(empty_queue); m_condvar.notify_all(); lock.unlock(); + TaskState::clear_queue(empty_queue); } void @@ -552,23 +607,19 @@ namespace crucible { } void - Task::queue_at_head() const + Task::append(const Task &that) const { THROW_CHECK0(runtime_error, m_task_state); - m_task_state->queue_at_head(); - } - - void - Task::queue_at_tail() const - { - THROW_CHECK0(runtime_error, m_task_state); - m_task_state->queue_at_tail(); + if (!that) { + return; + } + m_task_state->append(that.m_task_state); } Task Task::current_task() { - return Task(tl_current_task_wp.lock()); + return Task(tl_current_task); } string @@ -619,23 +670,32 @@ namespace crucible { void TaskConsumer::consumer_thread() { - auto master_locked = m_master.lock(); + m_thread.detach(); + auto master_locked = m_master.lock(); unique_lock lock(master_locked->m_mutex); + + // It is now safe to access our own shared_ptr + TaskConsumerPtr this_consumer = shared_from_this(); + swap(this_consumer, tl_current_consumer); + while (!master_locked->m_cancelled) { if (master_locked->m_thread_max < master_locked->m_threads.size()) { + // We are one of too many threads, exit now break; } - if (master_locked->m_queue.empty()) { + if (!m_local_queue.empty()) { + m_current_task = *m_local_queue.begin(); + m_local_queue.pop_front(); + } else if (!master_locked->m_queue.empty()) { + m_current_task = *master_locked->m_queue.begin(); + master_locked->m_queue.pop_front(); + } else { master_locked->m_condvar.wait(lock); continue; } - m_current_task = *master_locked->m_queue.begin(); - auto hold_task = m_current_task; - master_locked->m_queue.pop_front(); - // Execute task without lock lock.unlock(); catch_all([&]() { @@ -643,8 +703,9 @@ namespace crucible { }); // Update m_current_task with lock + decltype(m_current_task) hold_task; lock.lock(); - m_current_task.reset(); + swap(hold_task, m_current_task); // Destroy hold_task without lock lock.unlock(); @@ -654,8 +715,20 @@ namespace crucible { lock.lock(); } - // Still holding lock - m_thread.detach(); + // There is no longer a current consumer, but hold our own shared + // state so it's still there in the destructor + swap(this_consumer, tl_current_consumer); + + // Release lock to rescue queue (may attempt to queue a new task at TaskMaster). + // rescue_queue normally sends tasks to the local queue of the current TaskConsumer thread, + // but we just disconnected ourselves from that. + lock.unlock(); + TaskState::rescue_queue(m_local_queue); + + // Hold lock so we can erase ourselves + lock.lock(); + + // Fun fact: shared_from_this() isn't usable until the constructor returns... master_locked->m_threads.erase(shared_from_this()); master_locked->m_condvar.notify_all(); } @@ -734,9 +807,10 @@ namespace crucible { class ExclusionState { mutex m_mutex; bool m_locked = false; - set m_tasks; + Task m_task; public: + ExclusionState(const string &title); ~ExclusionState(); void release(); bool try_lock(); @@ -748,8 +822,13 @@ namespace crucible { { } - Exclusion::Exclusion() : - m_exclusion_state(make_shared()) + Exclusion::Exclusion(const string &title) : + m_exclusion_state(make_shared(title)) + { + } + + ExclusionState::ExclusionState(const string &title) : + m_task(title, [](){}) { } @@ -758,17 +837,7 @@ namespace crucible { { unique_lock lock(m_mutex); m_locked = false; - bool first = true; - for (auto i : m_tasks) { - if (first) { - i.queue_at_head(); - i.run(); - first = false; - } else { - i.run(); - } - } - m_tasks.clear(); + m_task.run(); } ExclusionState::~ExclusionState() @@ -799,7 +868,13 @@ namespace crucible { ExclusionState::insert_task(Task task) { unique_lock lock(m_mutex); - m_tasks.insert(task); + if (m_locked) { + // If Exclusion is locked then queue task for release; + m_task.append(task); + } else { + // otherwise, run the inserted task immediately + task.run(); + } } bool diff --git a/test/task.cc b/test/task.cc index 4011ea1..e688a49 100644 --- a/test/task.cc +++ b/test/task.cc @@ -3,6 +3,8 @@ #include "crucible/task.h" #include "crucible/time.h" +#include +#include #include #include #include @@ -70,13 +72,14 @@ test_finish() TaskMaster::print_queue(oss); TaskMaster::print_workers(oss); TaskMaster::set_thread_count(0); - // cerr << "finish done" << endl; + cerr << "finish done..."; } void test_unfinish() { TaskMaster::set_thread_count(); + cerr << "unfinish done..."; } @@ -150,61 +153,78 @@ void test_exclusion(size_t count) { mutex only_one; - Exclusion excl; + auto excl = make_shared("test_excl"); mutex mtx; condition_variable cv; - unique_lock lock(mtx); + size_t tasks_running(0); + atomic lock_success_count(0); + atomic lock_failure_count(0); - auto b = make_shared(); + vector pings; + pings.resize(count); // Run several tasks in parallel for (size_t c = 0; c < count; ++c) { - auto bl = b->lock(); ostringstream oss; oss << "task #" << c; Task t( oss.str(), - [c, &only_one, &excl, bl]() mutable { + [c, &only_one, excl, &lock_success_count, &lock_failure_count, &pings, &tasks_running, &cv, &mtx]() mutable { // cerr << "Task #" << c << endl; (void)c; - auto lock = excl.try_lock(); + auto lock = excl->try_lock(); if (!lock) { - excl.insert_task(Task::current_task()); + excl->insert_task(Task::current_task()); + ++lock_failure_count; return; } + ++lock_success_count; bool locked = only_one.try_lock(); assert(locked); nanosleep(0.0001); only_one.unlock(); - bl.release(); + unique_lock mtx_lock(mtx); + --tasks_running; + ++pings[c]; + cv.notify_all(); } ); + unique_lock mtx_lock(mtx); + ++tasks_running; t.run(); } - bool done_flag = false; + // excl.reset(); - Task completed( - "Waiting for Barrier", - [&mtx, &cv, &done_flag]() { - unique_lock lock(mtx); - // cerr << "Running cv notify" << endl; - done_flag = true; - cv.notify_all(); + unique_lock lock(mtx); + while (tasks_running) { + auto cv_rv = cv.wait_for(lock, chrono::duration(1)); + if (cv_rv == cv_status::timeout) { + // TaskMaster::print_tasks(cerr); + for (auto i : pings) { + cerr << i << " "; + } + cerr << endl << "tasks_running = " << tasks_running << endl; + cerr << "lock_success_count " << lock_success_count << endl; + cerr << "lock_failure_count " << lock_failure_count << endl; } - ); - b->insert_task(completed); + } + cerr << "lock_success_count " << lock_success_count << endl; + cerr << "lock_failure_count " << lock_failure_count << endl; - b.reset(); - - while (true) { - if (done_flag) { - break; + bool oops = false; + for (size_t c = 0; c < pings.size(); ++c) { + if (pings[c] != 1) { + cerr << "pings[" << c << "] = " << pings[c] << endl; + oops = true; } - - cv.wait(lock); + } + if (oops) { + assert(!"Pings not OK"); + } else { + cerr << "Pings OK" << endl; } }