From b7f9ce3f080ea3c8529b36a87ba5f52dfb563eaf Mon Sep 17 00:00:00 2001 From: Zygo Blaxell Date: Sat, 29 May 2021 23:58:28 -0400 Subject: [PATCH] task: serialize Task execution when Tasks block due to mutex contention Quite often we want to execute task B after task A finishes executing, especially if tasks A and B attempt to acquire locks on the same objects. Implement that capability in Task directly: each Task holds a queue of Tasks which will be executed strictly after this Task has finished executing, or if the Task is destroyed. Add a local queue to each TaskConsumer. This queue contains a list of Tasks which are to be executed by a single thread in sequential order. These tasks are executed before fetching any tasks from TaskMaster. Each time a Task finishes executing, the list of tasks appended to the recently executed Task are spliced at the beginning of the thread's TaskConsumer local queue. These tasks will be executed in the same thread in the same order they were appended to the recently executed Task. If a Task is destroyed with a post-execution queue, that queue is also inserted at the front of the current TaskConsumer's local queue. If a Task is destroyed or somehow executed outside of a TaskConsumer thread, or a TaskConsumer thread is destroyed, the local queue of Tasks is wrapped in a "rescue_task" Task, and spliced before the head of the global queue. This preserves the sequential ordering of tasks. In all cases the order of sequential execution of Tasks that are appended to another Task is preserved. The unused queue insertion functions are removed. Exclusion is now simply a mutex, a bool, and a Task with an empty function. Tasks that queue up waiting for the mutex are stored in Exclusion's Task, and Exclusion simply runs that task when the ExclusionState is released. Signed-off-by: Zygo Blaxell --- include/crucible/task.h | 21 +-- lib/task.cc | 365 ++++++++++++++++++++++++---------------- test/task.cc | 72 +++++--- 3 files changed, 273 insertions(+), 185 deletions(-) diff --git a/include/crucible/task.h b/include/crucible/task.h index 3f806a1..c97339c 100644 --- a/include/crucible/task.h +++ b/include/crucible/task.h @@ -27,16 +27,7 @@ namespace crucible { /// Create Task object containing closure and description. Task(string title, function exec_fn); - /// Insert at tail of queue (default). - void queue_at_tail() const; - - /// Insert at head of queue instead of tail. - /// May insert onto current thread/CPU core's queue. - void queue_at_head() const; - - // Add other insertion points here (same CPU, time delay, etc). - - /// Schedule Task at designated queue position. + /// Schedule Task for at least one future execution. /// May run Task in current thread or in other thread. /// May run Task before or after returning. /// @@ -46,6 +37,9 @@ namespace crucible { /// task after the currently running instance returns. void run() const; + /// Schedule Task to run after this task has run at least once. + void append(const Task &task) const; + /// Describe Task as text. string title() const; @@ -159,7 +153,7 @@ namespace crucible { Exclusion(shared_ptr pes); public: - Exclusion(); + Exclusion(const string &title); // Attempt to obtain a Lock. If successful, current Task // owns the Lock until the ExclusionLock is released @@ -172,9 +166,8 @@ namespace crucible { ExclusionLock try_lock(); // Execute Task when Exclusion is unlocked (possibly - // immediately). First Task is scheduled at head, - // all others are scheduled at tail. 
- void insert_task(Task t); + // immediately). + void insert_task(Task t = Task::current_task()); }; diff --git a/lib/task.cc b/lib/task.cc index 5c033de..91a28be 100644 --- a/lib/task.cc +++ b/lib/task.cc @@ -5,7 +5,6 @@ #include "crucible/time.h" #include -#include #include #include #include @@ -13,43 +12,89 @@ #include #include +#include +#include + namespace crucible { using namespace std; - static thread_local weak_ptr tl_current_task_wp; + class TaskState; + using TaskStatePtr = shared_ptr; + using TaskStateWeak = weak_ptr; - class TaskStateLock; + class TaskConsumer; + using TaskConsumerPtr = shared_ptr; + using TaskConsumerWeak = weak_ptr; + + using TaskQueue = list; + + static thread_local TaskStatePtr tl_current_task; + + /// because we don't want to bump -std=c++-17 just to get scoped_lock + class PairLock { + unique_lock m_lock1, m_lock2; + public: + PairLock(mutex &m1, mutex &m2) : + m_lock1(m1, defer_lock), + m_lock2(m2, defer_lock) + { + if (&m1 == &m2) { + m_lock1.lock(); + } else { + lock(m_lock1, m_lock2); + } + } + }; class TaskState : public enable_shared_from_this { + mutex m_mutex; const function m_exec_fn; const string m_title; - TaskId m_id; - function)> m_queue_fn; - mutex m_mutex; + /// Tasks to be executed after the current task is executed + list m_post_exec_queue; - /// Set when task is on some queue and does not need to be queued again. + /// Set when task is waiting to execute. /// Cleared when exec() begins. - bool m_is_queued = false; + bool m_is_waiting = false; /// Set when task starts execution by exec(). - /// Cleared when exec() completes. + /// Cleared when exec() ends. bool m_is_running = false; - /// Set when task is being executed by exec(), but is - /// already running an earlier call to exec() in a second thread. - /// The earlier exec() shall requeue the task according to its - /// queue function when the earlier exec() completes. - bool m_run_again = false; - + /// Sequential identifier for next task static atomic s_next_id; - friend class TaskStateLock; + /// Identifier for this task + TaskId m_id; + + /// Backend for append() + void append_nolock(const TaskStatePtr &task); + + /// Clear the post-execution queue. Recursively destroys post-exec + /// queues of all tasks in post-exec queue. Useful only when + /// cancelling the entire task queue. + void clear(); + + friend class TaskMasterState; + friend class TaskConsumer; + + /// Clear any TaskQueue, not just this one. + static void clear_queue(TaskQueue &tq); + + /// Rescue any TaskQueue, not just this one. + static void rescue_queue(TaskQueue &tq); + + TaskState &operator=(const TaskState &) = delete; + TaskState(const TaskState &) = delete; public: TaskState(string title, function exec_fn); - /// Queue task for execution according to previously stored queue policy. + /// Run the task at least once. If task is already running, appends + /// a self-reference to its after queue. If task is not running + /// and task is not waiting, adds task to a master queue and marks + /// the task waiting. void run(); /// Execute task immediately in current thread if it is not already @@ -63,44 +108,21 @@ namespace crucible { /// Return ID of task. TaskId id() const; - /// Set queue policy to queue at head (before existing tasks on queue). - void queue_at_head(); - - /// Set queue policy to queue at tail (after existing tasks on queue). - void queue_at_tail(); - - /// Lock the task's queueing state so a queue can decide whether to - /// accept the task queue request or discard it. 
The decision - /// is communicated through TaskStateLock::set_queued(bool). - TaskStateLock lock_queue(); + /// Queue task to execute after current task finishes executing. + /// If current task is neither running nor waiting, this + /// places the argument task on a worker queue immediately. + void append(const TaskStatePtr &task); }; atomic TaskState::s_next_id; - class TaskStateLock { - TaskState &m_state; - unique_lock m_lock; - TaskStateLock(TaskState &state); - friend class TaskState; - public: - /// Returns true if TaskState is currently queued. - bool is_queued() const; - - /// Sets TaskState::m_queued to a different queued state. - /// Throws an exception on requests to transition to the current state. - void set_queued(bool is_queued_now); - }; - - class TaskConsumer; - class TaskMasterState; - class TaskMasterState : public enable_shared_from_this { mutex m_mutex; condition_variable m_condvar; - list> m_queue; + TaskQueue m_queue; size_t m_thread_max; size_t m_thread_min = 0; - set> m_threads; + set m_threads; shared_ptr m_load_tracking_thread; double m_load_target = 0; double m_prev_loadavg; @@ -121,60 +143,110 @@ namespace crucible { void loadavg_thread_fn(); void cancel(); + TaskMasterState &operator=(const TaskMasterState &) = delete; + TaskMasterState(const TaskMasterState &) = delete; + public: ~TaskMasterState(); TaskMasterState(size_t thread_max = thread::hardware_concurrency()); - static void push_back(shared_ptr task); - static void push_front(shared_ptr task); + static void push_back(const TaskStatePtr &task); + static void push_front(TaskQueue &queue); size_t get_queue_count(); size_t get_thread_count(); }; class TaskConsumer : public enable_shared_from_this { weak_ptr m_master; - thread m_thread; - shared_ptr m_current_task; + TaskStatePtr m_current_task; + + friend class TaskState; + TaskQueue m_local_queue; void consumer_thread(); shared_ptr current_task_locked(); + friend class TaskMaster; + friend class TaskMasterState; public: TaskConsumer(weak_ptr tms); shared_ptr current_task(); - friend class TaskMaster; - friend class TaskMasterState; + private: + // Make sure this gets constructed _last_ + thread m_thread; }; - static shared_ptr s_tms = make_shared(); + static thread_local TaskConsumerPtr tl_current_consumer; - TaskStateLock::TaskStateLock(TaskState &state) : - m_state(state), - m_lock(state.m_mutex) - { - } - - bool - TaskStateLock::is_queued() const - { - return m_state.m_is_queued; - } + static auto s_tms = make_shared(); void - TaskStateLock::set_queued(bool is_queued_now) + TaskState::rescue_queue(TaskQueue &queue) { - THROW_CHECK2(runtime_error, m_state.m_is_queued, is_queued_now, m_state.m_is_queued != is_queued_now); - m_state.m_is_queued = is_queued_now; + if (queue.empty()) { + return; + } + auto tlcc = tl_current_consumer; + if (tlcc) { + // We are executing under a TaskConsumer, splice our post-exec queue at front. + // No locks needed because we are using only thread-local objects. + tlcc->m_local_queue.splice(tlcc->m_local_queue.begin(), queue); + } else { + // We are not executing under a TaskConsumer. + // Create a new task to wrap our post-exec queue, + // then push it to the front of the global queue using normal locking methods. 
+ TaskStatePtr rescue_task(make_shared("rescue_task", [](){})); + swap(rescue_task->m_post_exec_queue, queue); + rescue_task->m_is_waiting; + TaskQueue tq_one { rescue_task }; + TaskMasterState::push_front(tq_one); + } } TaskState::TaskState(string title, function exec_fn) : m_exec_fn(exec_fn), m_title(title), - m_id(++s_next_id), - m_queue_fn(TaskMasterState::push_back) + m_id(++s_next_id) { THROW_CHECK0(invalid_argument, !m_title.empty()); } + void + TaskState::clear() + { + TaskQueue post_exec_queue; + unique_lock lock(m_mutex); + swap(post_exec_queue, m_post_exec_queue); + lock.unlock(); + clear_queue(post_exec_queue); + } + + void + TaskState::clear_queue(TaskQueue &tq) + { + while (!tq.empty()) { + auto i = *tq.begin(); + tq.pop_front(); + i->clear(); + } + } + + void + TaskState::append_nolock(const TaskStatePtr &task) + { + task->m_is_waiting = true; + m_post_exec_queue.push_back(task); + } + + void + TaskState::append(const TaskStatePtr &task) + { + if (!task) { + return; + } + PairLock lock(m_mutex, task->m_mutex); + append_nolock(task); + } + void TaskState::exec() { @@ -182,9 +254,9 @@ namespace crucible { THROW_CHECK0(invalid_argument, !m_title.empty()); unique_lock lock(m_mutex); - m_is_queued = false; + m_is_waiting = false; if (m_is_running) { - m_run_again = true; + append_nolock(shared_from_this()); return; } else { m_is_running = true; @@ -195,24 +267,21 @@ namespace crucible { DIE_IF_MINUS_ERRNO(pthread_getname_np(pthread_self(), buf, sizeof(buf))); DIE_IF_MINUS_ERRNO(pthread_setname_np(pthread_self(), m_title.c_str())); - weak_ptr this_task_wp = shared_from_this(); - swap(this_task_wp, tl_current_task_wp); + TaskStatePtr this_task = shared_from_this(); + swap(this_task, tl_current_task); catch_all([&]() { m_exec_fn(); }); - swap(this_task_wp, tl_current_task_wp); + swap(this_task, tl_current_task); pthread_setname_np(pthread_self(), buf); lock.lock(); m_is_running = false; - bool run_again = m_run_again; - m_run_again = false; - lock.unlock(); - if (run_again) { - run(); - } + + // Splice task post_exec queue at front of local queue + TaskState::rescue_queue(m_post_exec_queue); } string @@ -228,34 +297,13 @@ namespace crucible { return m_id; } - void - TaskState::queue_at_head() - { - unique_lock lock(m_mutex); - m_queue_fn = TaskMasterState::push_front; - } - - void - TaskState::queue_at_tail() - { - unique_lock lock(m_mutex); - m_queue_fn = TaskMasterState::push_back; - } - - TaskStateLock - TaskState::lock_queue() - { - return TaskStateLock(*this); - } - void TaskState::run() { unique_lock lock(m_mutex); - THROW_CHECK0(runtime_error, m_queue_fn); - if (!m_is_queued) { - m_queue_fn(shared_from_this()); - m_is_queued = true; + if (!m_is_waiting) { + TaskMasterState::push_back(shared_from_this()); + m_is_waiting = true; } } @@ -288,11 +336,12 @@ namespace crucible { } void - TaskMasterState::push_back(shared_ptr task) + TaskMasterState::push_back(const TaskStatePtr &task) { THROW_CHECK0(runtime_error, task); unique_lock lock(s_tms->m_mutex); if (s_tms->m_cancelled) { + task->clear(); return; } s_tms->m_queue.push_back(task); @@ -301,14 +350,17 @@ namespace crucible { } void - TaskMasterState::push_front(shared_ptr task) + TaskMasterState::push_front(TaskQueue &queue) { - THROW_CHECK0(runtime_error, task); - unique_lock lock(s_tms->m_mutex); - if (s_tms->m_cancelled) { + if (queue.empty()) { return; } - s_tms->m_queue.push_front(task); + unique_lock lock(s_tms->m_mutex); + if (s_tms->m_cancelled) { + TaskState::clear_queue(queue); + return; + } + 
s_tms->m_queue.splice(s_tms->m_queue.begin(), queue); s_tms->m_condvar.notify_all(); s_tms->start_threads_nolock(); } @@ -441,7 +493,9 @@ namespace crucible { TaskMasterState::set_thread_count(size_t thread_max) { unique_lock lock(m_mutex); - // XXX: someday we might want to uncancel, and this would be the place to do it + // XXX: someday we might want to uncancel, and this would be the place to do it; + // however, when we cancel we destroy the entire Task queue, and that might be + // non-trivial to recover from if (m_cancelled) { return; } @@ -466,6 +520,7 @@ namespace crucible { m_queue.swap(empty_queue); m_condvar.notify_all(); lock.unlock(); + TaskState::clear_queue(empty_queue); } void @@ -552,23 +607,19 @@ namespace crucible { } void - Task::queue_at_head() const + Task::append(const Task &that) const { THROW_CHECK0(runtime_error, m_task_state); - m_task_state->queue_at_head(); - } - - void - Task::queue_at_tail() const - { - THROW_CHECK0(runtime_error, m_task_state); - m_task_state->queue_at_tail(); + if (!that) { + return; + } + m_task_state->append(that.m_task_state); } Task Task::current_task() { - return Task(tl_current_task_wp.lock()); + return Task(tl_current_task); } string @@ -619,23 +670,32 @@ namespace crucible { void TaskConsumer::consumer_thread() { - auto master_locked = m_master.lock(); + m_thread.detach(); + auto master_locked = m_master.lock(); unique_lock lock(master_locked->m_mutex); + + // It is now safe to access our own shared_ptr + TaskConsumerPtr this_consumer = shared_from_this(); + swap(this_consumer, tl_current_consumer); + while (!master_locked->m_cancelled) { if (master_locked->m_thread_max < master_locked->m_threads.size()) { + // We are one of too many threads, exit now break; } - if (master_locked->m_queue.empty()) { + if (!m_local_queue.empty()) { + m_current_task = *m_local_queue.begin(); + m_local_queue.pop_front(); + } else if (!master_locked->m_queue.empty()) { + m_current_task = *master_locked->m_queue.begin(); + master_locked->m_queue.pop_front(); + } else { master_locked->m_condvar.wait(lock); continue; } - m_current_task = *master_locked->m_queue.begin(); - auto hold_task = m_current_task; - master_locked->m_queue.pop_front(); - // Execute task without lock lock.unlock(); catch_all([&]() { @@ -643,8 +703,9 @@ namespace crucible { }); // Update m_current_task with lock + decltype(m_current_task) hold_task; lock.lock(); - m_current_task.reset(); + swap(hold_task, m_current_task); // Destroy hold_task without lock lock.unlock(); @@ -654,8 +715,20 @@ namespace crucible { lock.lock(); } - // Still holding lock - m_thread.detach(); + // There is no longer a current consumer, but hold our own shared + // state so it's still there in the destructor + swap(this_consumer, tl_current_consumer); + + // Release lock to rescue queue (may attempt to queue a new task at TaskMaster). + // rescue_queue normally sends tasks to the local queue of the current TaskConsumer thread, + // but we just disconnected ourselves from that. + lock.unlock(); + TaskState::rescue_queue(m_local_queue); + + // Hold lock so we can erase ourselves + lock.lock(); + + // Fun fact: shared_from_this() isn't usable until the constructor returns... 
master_locked->m_threads.erase(shared_from_this()); master_locked->m_condvar.notify_all(); } @@ -734,9 +807,10 @@ namespace crucible { class ExclusionState { mutex m_mutex; bool m_locked = false; - set m_tasks; + Task m_task; public: + ExclusionState(const string &title); ~ExclusionState(); void release(); bool try_lock(); @@ -748,8 +822,13 @@ namespace crucible { { } - Exclusion::Exclusion() : - m_exclusion_state(make_shared()) + Exclusion::Exclusion(const string &title) : + m_exclusion_state(make_shared(title)) + { + } + + ExclusionState::ExclusionState(const string &title) : + m_task(title, [](){}) { } @@ -758,17 +837,7 @@ namespace crucible { { unique_lock lock(m_mutex); m_locked = false; - bool first = true; - for (auto i : m_tasks) { - if (first) { - i.queue_at_head(); - i.run(); - first = false; - } else { - i.run(); - } - } - m_tasks.clear(); + m_task.run(); } ExclusionState::~ExclusionState() @@ -799,7 +868,13 @@ namespace crucible { ExclusionState::insert_task(Task task) { unique_lock lock(m_mutex); - m_tasks.insert(task); + if (m_locked) { + // If Exclusion is locked then queue task for release; + m_task.append(task); + } else { + // otherwise, run the inserted task immediately + task.run(); + } } bool diff --git a/test/task.cc b/test/task.cc index 4011ea1..e688a49 100644 --- a/test/task.cc +++ b/test/task.cc @@ -3,6 +3,8 @@ #include "crucible/task.h" #include "crucible/time.h" +#include +#include #include #include #include @@ -70,13 +72,14 @@ test_finish() TaskMaster::print_queue(oss); TaskMaster::print_workers(oss); TaskMaster::set_thread_count(0); - // cerr << "finish done" << endl; + cerr << "finish done..."; } void test_unfinish() { TaskMaster::set_thread_count(); + cerr << "unfinish done..."; } @@ -150,61 +153,78 @@ void test_exclusion(size_t count) { mutex only_one; - Exclusion excl; + auto excl = make_shared("test_excl"); mutex mtx; condition_variable cv; - unique_lock lock(mtx); + size_t tasks_running(0); + atomic lock_success_count(0); + atomic lock_failure_count(0); - auto b = make_shared(); + vector pings; + pings.resize(count); // Run several tasks in parallel for (size_t c = 0; c < count; ++c) { - auto bl = b->lock(); ostringstream oss; oss << "task #" << c; Task t( oss.str(), - [c, &only_one, &excl, bl]() mutable { + [c, &only_one, excl, &lock_success_count, &lock_failure_count, &pings, &tasks_running, &cv, &mtx]() mutable { // cerr << "Task #" << c << endl; (void)c; - auto lock = excl.try_lock(); + auto lock = excl->try_lock(); if (!lock) { - excl.insert_task(Task::current_task()); + excl->insert_task(Task::current_task()); + ++lock_failure_count; return; } + ++lock_success_count; bool locked = only_one.try_lock(); assert(locked); nanosleep(0.0001); only_one.unlock(); - bl.release(); + unique_lock mtx_lock(mtx); + --tasks_running; + ++pings[c]; + cv.notify_all(); } ); + unique_lock mtx_lock(mtx); + ++tasks_running; t.run(); } - bool done_flag = false; + // excl.reset(); - Task completed( - "Waiting for Barrier", - [&mtx, &cv, &done_flag]() { - unique_lock lock(mtx); - // cerr << "Running cv notify" << endl; - done_flag = true; - cv.notify_all(); + unique_lock lock(mtx); + while (tasks_running) { + auto cv_rv = cv.wait_for(lock, chrono::duration(1)); + if (cv_rv == cv_status::timeout) { + // TaskMaster::print_tasks(cerr); + for (auto i : pings) { + cerr << i << " "; + } + cerr << endl << "tasks_running = " << tasks_running << endl; + cerr << "lock_success_count " << lock_success_count << endl; + cerr << "lock_failure_count " << lock_failure_count << endl; } - ); - 
b->insert_task(completed); + } + cerr << "lock_success_count " << lock_success_count << endl; + cerr << "lock_failure_count " << lock_failure_count << endl; - b.reset(); - - while (true) { - if (done_flag) { - break; + bool oops = false; + for (size_t c = 0; c < pings.size(); ++c) { + if (pings[c] != 1) { + cerr << "pings[" << c << "] = " << pings[c] << endl; + oops = true; } - - cv.wait(lock); + } + if (oops) { + assert(!"Pings not OK"); + } else { + cerr << "Pings OK" << endl; } }
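
Usage note (not part of the patch): a minimal sketch of the new API, assuming the
modified include/crucible/task.h above. The Task and Exclusion titles here
("step A", "shared_object", "worker") and the example() wrapper are illustrative only.

#include "crucible/task.h"

#include <memory>

using namespace crucible;
using namespace std;

void example()
{
	// Serialize B strictly after A with the new Task::append():
	// B is held in A's post-exec queue and runs in the same thread,
	// after A has executed (or when A is destroyed).
	Task task_a("step A", [](){ /* work that must happen first */ });
	Task task_b("step B", [](){ /* runs only after A has executed */ });
	task_a.append(task_b);
	task_a.run();

	// Retry-on-contention with the reworked Exclusion: a Task that loses
	// the race reinserts itself, and Exclusion reruns it on release.
	auto excl = make_shared<Exclusion>("shared_object");
	Task worker("worker", [excl]() {
		auto lock = excl->try_lock();
		if (!lock) {
			excl->insert_task();	// defaults to Task::current_task()
			return;
		}
		// exclusive work here; lock released when ExclusionLock goes out of scope
	});
	worker.run();
}

The Exclusion is held by shared_ptr and captured by value so it outlives the
enclosing scope, matching the pattern used in test_exclusion() in test/task.cc.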