#include "crucible/task.h" #include "crucible/error.h" #include "crucible/process.h" #include "crucible/time.h" #include #include #include #include #include #include #include #include #include namespace crucible { using namespace std; class TaskState; using TaskStatePtr = shared_ptr; using TaskStateWeak = weak_ptr; class TaskConsumer; using TaskConsumerPtr = shared_ptr; using TaskConsumerWeak = weak_ptr; using TaskQueue = list; static thread_local TaskStatePtr tl_current_task; /// because we don't want to bump -std=c++-17 just to get scoped_lock class PairLock { unique_lock m_lock1, m_lock2; public: PairLock(mutex &m1, mutex &m2) : m_lock1(m1, defer_lock), m_lock2(m2, defer_lock) { if (&m1 == &m2) { m_lock1.lock(); } else { lock(m_lock1, m_lock2); } } }; class TaskState : public enable_shared_from_this { mutex m_mutex; const function m_exec_fn; const string m_title; /// Tasks to be executed after the current task is executed list m_post_exec_queue; /// Incremented by run() and append(). Decremented by exec(). size_t m_run_count = 0; /// Set when task starts execution by exec(). /// Cleared when exec() ends. bool m_is_running = false; /// Sequential identifier for next task static atomic s_next_id; /// Sequential identifier for next task static atomic s_instance_count; /// Identifier for this task TaskId m_id; /// Backend for append() void append_nolock(const TaskStatePtr &task); /// Clear the post-execution queue. Recursively destroys post-exec /// queues of all tasks in post-exec queue. Useful only when /// cancelling the entire task queue. void clear(); friend class TaskMasterState; friend class TaskConsumer; /// Clear any TaskQueue, not just this one. static void clear_queue(TaskQueue &tq); /// Rescue any TaskQueue, not just this one. static void rescue_queue(TaskQueue &tq); TaskState &operator=(const TaskState &) = delete; TaskState(const TaskState &) = delete; public: ~TaskState(); TaskState(string title, function exec_fn); /// Run the task at most one more time. If task has /// already started running, a new instance is scheduled. /// If an instance is already scheduled by run() or /// append(), does nothing. Otherwise, schedules a new /// instance at the end of TaskMaster's global queue. void run(); /// Execute task immediately in current thread if it is not already /// executing in another thread; otherwise, append the current task /// to itself to be executed immediately in the other thread. void exec(); /// Return title of task. string title() const; /// Return ID of task. TaskId id() const; /// Queue task to execute after current task finishes executing /// or is destroyed. void append(const TaskStatePtr &task); /// How masy Tasks are there? 
	atomic<TaskId> TaskState::s_next_id;
	atomic<size_t> TaskState::s_instance_count;

	class TaskMasterState : public enable_shared_from_this<TaskMasterState> {
		mutex m_mutex;
		condition_variable m_condvar;
		TaskQueue m_queue;
		size_t m_thread_max;
		size_t m_thread_min = 0;
		set<TaskConsumerPtr> m_threads;
		shared_ptr<thread> m_load_tracking_thread;
		double m_load_target = 0;
		double m_prev_loadavg;
		size_t m_configured_thread_max;
		double m_thread_target;
		bool m_cancelled = false;

	friend class TaskConsumer;
	friend class TaskMaster;

		void start_threads_nolock();
		void start_stop_threads();
		void set_thread_count(size_t thread_max);
		void set_thread_min_count(size_t thread_min);
		void adjust_thread_count();
		size_t calculate_thread_count_nolock();
		void set_loadavg_target(double target);
		void loadavg_thread_fn();
		void cancel();

		TaskMasterState &operator=(const TaskMasterState &) = delete;
		TaskMasterState(const TaskMasterState &) = delete;

	public:
		~TaskMasterState();
		TaskMasterState(size_t thread_max = thread::hardware_concurrency());

		static void push_back(const TaskStatePtr &task);
		static void push_front(TaskQueue &queue);
		size_t get_queue_count();
		size_t get_thread_count();
	};

	class TaskConsumer : public enable_shared_from_this<TaskConsumer> {
		weak_ptr<TaskMasterState> m_master;
		TaskStatePtr m_current_task;

	friend class TaskState;
		TaskQueue m_local_queue;

		void consumer_thread();
		shared_ptr<TaskState> current_task_locked();
	friend class TaskMaster;
	friend class TaskMasterState;
	public:
		TaskConsumer(weak_ptr<TaskMasterState> tms);
		shared_ptr<TaskState> current_task();
	private:
		// Make sure this gets constructed _last_
		thread m_thread;
	};

	static thread_local TaskConsumerPtr tl_current_consumer;

	static auto s_tms = make_shared<TaskMasterState>();
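	// Note on queue layout: each TaskConsumer drains its own local queue
	// before taking work from the shared global queue, so post-exec chains
	// built by append() tend to stay on the worker that ran their parent
	// task.  rescue_queue() below re-queues a post-exec queue: it is spliced
	// into the current consumer's local queue when one exists, or wrapped in
	// a task and pushed to the front of the global queue otherwise.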
	void
	TaskState::rescue_queue(TaskQueue &queue)
	{
		if (queue.empty()) {
			return;
		}
		auto tlcc = tl_current_consumer;
		if (tlcc) {
			// We are executing under a TaskConsumer, splice our post-exec queue at front.
			// No locks needed because we are using only thread-local objects.
			tlcc->m_local_queue.splice(tlcc->m_local_queue.begin(), queue);
		} else {
			// We are not executing under a TaskConsumer.
			// Create a new task to wrap our post-exec queue,
			// then push it to the front of the global queue using normal locking methods.
			TaskStatePtr rescue_task(make_shared<TaskState>("rescue_task", [](){}));
			swap(rescue_task->m_post_exec_queue, queue);
			TaskQueue tq_one { rescue_task };
			TaskMasterState::push_front(tq_one);
		}
	}

	TaskState::~TaskState()
	{
		--s_instance_count;
	}

	TaskState::TaskState(string title, function<void()> exec_fn) :
		m_exec_fn(exec_fn),
		m_title(title),
		m_id(++s_next_id)
	{
		THROW_CHECK0(invalid_argument, !m_title.empty());
		++s_instance_count;
	}

	size_t
	TaskState::instance_count()
	{
		return s_instance_count;
	}

	size_t
	Task::instance_count()
	{
		return TaskState::instance_count();
	}

	void
	TaskState::clear()
	{
		TaskQueue post_exec_queue;
		unique_lock<mutex> lock(m_mutex);
		swap(post_exec_queue, m_post_exec_queue);
		lock.unlock();
		clear_queue(post_exec_queue);
	}

	void
	TaskState::clear_queue(TaskQueue &tq)
	{
		while (!tq.empty()) {
			auto i = *tq.begin();
			tq.pop_front();
			i->clear();
		}
	}

	void
	TaskState::append_nolock(const TaskStatePtr &task)
	{
		THROW_CHECK0(invalid_argument, task);
		m_post_exec_queue.push_back(task);
	}

	void
	TaskState::append(const TaskStatePtr &task)
	{
		THROW_CHECK0(invalid_argument, task);
		PairLock lock(m_mutex, task->m_mutex);
		if (!task->m_run_count) {
			++task->m_run_count;
			append_nolock(task);
		}
	}

	void
	TaskState::exec()
	{
		THROW_CHECK0(invalid_argument, m_exec_fn);
		THROW_CHECK0(invalid_argument, !m_title.empty());

		unique_lock<mutex> lock(m_mutex);
		if (m_is_running) {
			append_nolock(shared_from_this());
			return;
		} else {
			--m_run_count;
			m_is_running = true;
		}
		lock.unlock();

		char buf[24] = { 0 };
		DIE_IF_MINUS_ERRNO(pthread_getname_np(pthread_self(), buf, sizeof(buf)));
		DIE_IF_MINUS_ERRNO(pthread_setname_np(pthread_self(), m_title.c_str()));

		TaskStatePtr this_task = shared_from_this();
		swap(this_task, tl_current_task);
		catch_all([&]() {
			m_exec_fn();
		});
		swap(this_task, tl_current_task);
		pthread_setname_np(pthread_self(), buf);

		lock.lock();
		m_is_running = false;

		// Splice task post_exec queue at front of local queue
		TaskState::rescue_queue(m_post_exec_queue);
	}

	string
	TaskState::title() const
	{
		THROW_CHECK0(runtime_error, !m_title.empty());
		return m_title;
	}

	TaskId
	TaskState::id() const
	{
		return m_id;
	}

	void
	TaskState::run()
	{
		unique_lock<mutex> lock(m_mutex);
		if (m_run_count) {
			return;
		}
		++m_run_count;
		TaskMasterState::push_back(shared_from_this());
	}

	TaskMasterState::TaskMasterState(size_t thread_max) :
		m_thread_max(thread_max),
		m_configured_thread_max(thread_max),
		m_thread_target(thread_max)
	{
	}

	void
	TaskMasterState::start_threads_nolock()
	{
		while (m_threads.size() < m_thread_max) {
			m_threads.insert(make_shared<TaskConsumer>(shared_from_this()));
		}
	}

	void
	TaskMasterState::start_stop_threads()
	{
		unique_lock<mutex> lock(m_mutex);
		while (m_threads.size() != m_thread_max) {
			if (m_threads.size() < m_thread_max) {
				m_threads.insert(make_shared<TaskConsumer>(shared_from_this()));
			} else if (m_threads.size() > m_thread_max) {
				m_condvar.wait(lock);
			}
		}
	}

	void
	TaskMasterState::push_back(const TaskStatePtr &task)
	{
		THROW_CHECK0(runtime_error, task);
		unique_lock<mutex> lock(s_tms->m_mutex);
		if (s_tms->m_cancelled) {
			task->clear();
			return;
		}
		s_tms->m_queue.push_back(task);
		s_tms->m_condvar.notify_all();
		s_tms->start_threads_nolock();
	}

	void
	TaskMasterState::push_front(TaskQueue &queue)
	{
		if (queue.empty()) {
			return;
		}
		unique_lock<mutex> lock(s_tms->m_mutex);
		if (s_tms->m_cancelled) {
			TaskState::clear_queue(queue);
			return;
		}
		s_tms->m_queue.splice(s_tms->m_queue.begin(), queue);
		s_tms->m_condvar.notify_all();
		s_tms->start_threads_nolock();
	}

	TaskMasterState::~TaskMasterState()
	{
		set_thread_count(0);
	}

	size_t
	TaskMaster::get_queue_count()
	{
		unique_lock<mutex> lock(s_tms->m_mutex);
		return s_tms->m_queue.size();
	}
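	// Illustrative dump from the two print helpers below (hypothetical IDs
	// and titles; the line format comes from the code itself):
	//
	//	Queue (size 2):
	//	Queue #1 Task ID 5 fetch
	//	Queue #2 Task ID 7 parse
	//	Queue End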
	size_t
	TaskMaster::get_thread_count()
	{
		unique_lock<mutex> lock(s_tms->m_mutex);
		return s_tms->m_threads.size();
	}

	ostream &
	TaskMaster::print_queue(ostream &os)
	{
		unique_lock<mutex> lock(s_tms->m_mutex);
		os << "Queue (size " << s_tms->m_queue.size() << "):" << endl;
		size_t counter = 0;
		for (auto i : s_tms->m_queue) {
			os << "Queue #" << ++counter << " Task ID " << i->id() << " " << i->title() << endl;
		}
		return os << "Queue End" << endl;
	}

	ostream &
	TaskMaster::print_workers(ostream &os)
	{
		unique_lock<mutex> lock(s_tms->m_mutex);
		os << "Workers (size " << s_tms->m_threads.size() << "):" << endl;
		size_t counter = 0;
		for (auto i : s_tms->m_threads) {
			os << "Worker #" << ++counter << " ";
			auto task = i->current_task_locked();
			if (task) {
				os << "Task ID " << task->id() << " " << task->title();
			} else {
				os << "(idle)";
			}
			os << endl;
		}
		return os << "Workers End" << endl;
	}

	size_t
	TaskMasterState::calculate_thread_count_nolock()
	{
		if (m_cancelled) {
			// No threads running while cancelled
			return 0;
		}

		if (m_load_target == 0) {
			// No limits, no stats, use configured thread count
			return m_configured_thread_max;
		}

		if (m_configured_thread_max == 0) {
			// Not a lot of choice here, and zeros break the algorithm
			return 0;
		}

		const double loadavg = getloadavg1();

		static const double load_exp = exp(-5.0 / 60.0);

		// Averages are fun, but we want to know the load from just the
		// last 5 seconds.  Invert the load average update function:
		// LA2 = LA1 * load_exp + N * (1 - load_exp)
		// LA2 - LA1 * load_exp = N * (1 - load_exp)
		// (LA2 - LA1 * load_exp) / (1 - load_exp) = N
		// except for rounding error which might make this just a bit below zero.
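		// Worked example (illustrative numbers, not from the source): with
		// the 5-second sample interval above, load_exp = exp(-5.0/60.0) ~= 0.92.
		// If the 1-minute load average rose from LA1 = 2.0 to LA2 = 2.4 over
		// one sample, then N = (2.4 - 2.0 * 0.92) / (1 - 0.92) ~= 7.0, i.e.
		// roughly seven threads' worth of load during those 5 seconds.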
		const double current_load = max(0.0, (loadavg - m_prev_loadavg * load_exp) / (1 - load_exp));

		m_prev_loadavg = loadavg;

		// Change the thread target based on the difference between current
		// and desired load, but don't close the whole gap at once due to
		// rounding and sample error.  If m_load_target <= 1.0 then we are
		// just doing PWM with one thread.
		if (m_load_target <= 1.0) {
			m_thread_target = 1.0;
		} else if (m_load_target - current_load >= 1.0) {
			m_thread_target += (m_load_target - current_load - 1.0) / 2.0;
		} else if (m_load_target < current_load) {
			m_thread_target += m_load_target - current_load;
		}

		// Cannot exceed the configured maximum thread count or drop below zero
		m_thread_target = min(max(0.0, m_thread_target), double(m_configured_thread_max));

		// Convert to integer but keep within range
		const size_t rv = max(m_thread_min, min(size_t(ceil(m_thread_target)), m_configured_thread_max));

		return rv;
	}

	void
	TaskMasterState::adjust_thread_count()
	{
		unique_lock<mutex> lock(m_mutex);
		size_t new_thread_max = calculate_thread_count_nolock();
		size_t old_thread_max = m_thread_max;
		m_thread_max = new_thread_max;

		// If we are reducing the number of threads, we have to wake them up
		// so they can exit their loops.  If we are increasing the number of
		// threads, we have to notify start_stop_threads so it can stop
		// waiting for threads to stop.
		if (new_thread_max != old_thread_max) {
			m_condvar.notify_all();
			start_threads_nolock();
		}
	}

	void
	TaskMasterState::set_thread_count(size_t thread_max)
	{
		unique_lock<mutex> lock(m_mutex);
		// XXX: someday we might want to uncancel, and this would be the place to do it;
		// however, when we cancel we destroy the entire Task queue, and that might be
		// non-trivial to recover from
		if (m_cancelled) {
			return;
		}
		m_configured_thread_max = thread_max;
		lock.unlock();
		adjust_thread_count();
		start_stop_threads();
	}

	void
	TaskMaster::set_thread_count(size_t thread_max)
	{
		s_tms->set_thread_count(thread_max);
	}

	void
	TaskMasterState::cancel()
	{
		unique_lock<mutex> lock(m_mutex);
		m_cancelled = true;
		decltype(m_queue) empty_queue;
		m_queue.swap(empty_queue);
		m_condvar.notify_all();
		lock.unlock();
		TaskState::clear_queue(empty_queue);
	}

	void
	TaskMaster::cancel()
	{
		s_tms->cancel();
	}

	void
	TaskMasterState::set_thread_min_count(size_t thread_min)
	{
		unique_lock<mutex> lock(m_mutex);
		// XXX: someday we might want to uncancel, and this would be the place to do it
		if (m_cancelled) {
			return;
		}
		m_thread_min = thread_min;
		lock.unlock();
		adjust_thread_count();
		start_stop_threads();
	}

	void
	TaskMaster::set_thread_min_count(size_t thread_min)
	{
		s_tms->set_thread_min_count(thread_min);
	}

	void
	TaskMasterState::loadavg_thread_fn()
	{
		pthread_setname_np(pthread_self(), "load_tracker");
		while (!m_cancelled) {
			adjust_thread_count();
			nanosleep(5.0);
		}
	}

	void
	TaskMasterState::set_loadavg_target(double target)
	{
		THROW_CHECK1(out_of_range, target, target >= 0);
		unique_lock<mutex> lock(m_mutex);
		if (m_cancelled) {
			return;
		}
		m_load_target = target;
		m_prev_loadavg = getloadavg1();

		if (target && !m_load_tracking_thread) {
			m_load_tracking_thread = make_shared<thread>([=] () { loadavg_thread_fn(); });
			m_load_tracking_thread->detach();
		}
	}

	void
	TaskMaster::set_loadavg_target(double target)
	{
		s_tms->set_loadavg_target(target);
	}

	void
	TaskMaster::set_thread_count()
	{
		set_thread_count(thread::hardware_concurrency());
	}
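	// Configuration sketch (hypothetical caller code): either pin the worker
	// pool to a fixed size, or give the load tracker a target and let it
	// grow and shrink the pool every 5 seconds:
	//
	//	TaskMaster::set_thread_count(4);	// fixed pool of 4 workers
	//	TaskMaster::set_loadavg_target(2.0);	// or chase a loadavg of 2.0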
	Task::Task(shared_ptr<TaskState> pts) :
		m_task_state(pts)
	{
	}

	Task::Task(string title, function<void()> exec_fn) :
		m_task_state(make_shared<TaskState>(title, exec_fn))
	{
	}

	void
	Task::run() const
	{
		THROW_CHECK0(runtime_error, m_task_state);
		m_task_state->run();
	}

	void
	Task::append(const Task &that) const
	{
		THROW_CHECK0(runtime_error, m_task_state);
		THROW_CHECK0(runtime_error, that);
		m_task_state->append(that.m_task_state);
	}

	Task
	Task::current_task()
	{
		return Task(tl_current_task);
	}

	string
	Task::title() const
	{
		THROW_CHECK0(runtime_error, m_task_state);
		return m_task_state->title();
	}

	ostream &
	operator<<(ostream &os, const Task &task)
	{
		return os << task.title();
	}

	TaskId
	Task::id() const
	{
		THROW_CHECK0(runtime_error, m_task_state);
		return m_task_state->id();
	}

	bool
	Task::operator<(const Task &that) const
	{
		return id() < that.id();
	}

	Task::operator bool() const
	{
		return !!m_task_state;
	}

	shared_ptr<TaskState>
	TaskConsumer::current_task_locked()
	{
		return m_current_task;
	}

	shared_ptr<TaskState>
	TaskConsumer::current_task()
	{
		auto master_locked = m_master.lock();
		unique_lock<mutex> lock(master_locked->m_mutex);
		return current_task_locked();
	}

	void
	TaskConsumer::consumer_thread()
	{
		m_thread.detach();

		auto master_locked = m_master.lock();
		unique_lock<mutex> lock(master_locked->m_mutex);

		// It is now safe to access our own shared_ptr
		TaskConsumerPtr this_consumer = shared_from_this();
		swap(this_consumer, tl_current_consumer);

		while (!master_locked->m_cancelled) {
			if (master_locked->m_thread_max < master_locked->m_threads.size()) {
				// We are one of too many threads, exit now
				break;
			}

			if (!m_local_queue.empty()) {
				m_current_task = *m_local_queue.begin();
				m_local_queue.pop_front();
			} else if (!master_locked->m_queue.empty()) {
				m_current_task = *master_locked->m_queue.begin();
				master_locked->m_queue.pop_front();
			} else {
				master_locked->m_condvar.wait(lock);
				continue;
			}

			// Execute task without lock
			lock.unlock();
			catch_all([&]() {
				m_current_task->exec();
			});

			// Update m_current_task with lock
			decltype(m_current_task) hold_task;
			lock.lock();
			swap(hold_task, m_current_task);

			// Destroy hold_task without lock
			lock.unlock();
			hold_task.reset();

			// Invariant: lock held
			lock.lock();
		}

		// There is no longer a current consumer, but hold our own shared
		// state so it's still there in the destructor
		swap(this_consumer, tl_current_consumer);

		// Release the lock to rescue the queue (this may attempt to queue a
		// new task at TaskMaster).  rescue_queue normally sends tasks to the
		// local queue of the current TaskConsumer thread, but we just
		// disconnected ourselves from that.
		lock.unlock();
		TaskState::rescue_queue(m_local_queue);

		// Hold the lock so we can erase ourselves
		lock.lock();
		// Fun fact: shared_from_this() isn't usable until the constructor returns...
		master_locked->m_threads.erase(shared_from_this());
		master_locked->m_condvar.notify_all();
	}

	TaskConsumer::TaskConsumer(weak_ptr<TaskMasterState> tms) :
		m_master(tms),
		m_thread([=](){ consumer_thread(); })
	{
	}
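	// Usage sketch for the synchronization helpers below (hypothetical
	// caller code).  A Barrier runs its inserted tasks when the last
	// reference to its shared state (the Barrier itself or any BarrierLock)
	// goes away; an Exclusion admits one lock holder at a time and
	// reschedules blocked tasks when the holder releases:
	//
	//	Exclusion excl("resource");
	//	auto el = excl.try_lock();
	//	if (el) {
	//		// ... exclusive access ...
	//		el.release();
	//	} else {
	//		// run this task again when the current holder releases
	//		excl.insert_task(Task::current_task());
	//	}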
	class BarrierState {
		mutex m_mutex;
		set<Task> m_tasks;

		void release();
	public:
		~BarrierState();
		void insert_task(Task t);
	};

	Barrier::Barrier(shared_ptr<BarrierState> pbs) :
		m_barrier_state(pbs)
	{
	}

	Barrier::Barrier() :
		m_barrier_state(make_shared<BarrierState>())
	{
	}

	void
	BarrierState::release()
	{
		unique_lock<mutex> lock(m_mutex);
		for (auto i : m_tasks) {
			i.run();
		}
		m_tasks.clear();
	}

	BarrierState::~BarrierState()
	{
		release();
	}

	BarrierLock::BarrierLock(shared_ptr<BarrierState> pbs) :
		m_barrier_state(pbs)
	{
	}

	void
	BarrierLock::release()
	{
		m_barrier_state.reset();
	}

	void
	BarrierState::insert_task(Task t)
	{
		unique_lock<mutex> lock(m_mutex);
		m_tasks.insert(t);
	}

	void
	Barrier::insert_task(Task t)
	{
		m_barrier_state->insert_task(t);
	}

	BarrierLock
	Barrier::lock()
	{
		return BarrierLock(m_barrier_state);
	}

	class ExclusionState {
		mutex m_mutex;
		bool m_locked = false;
		Task m_task;

	public:
		ExclusionState(const string &title);
		~ExclusionState();
		void release();
		bool try_lock();
		void insert_task(Task t);
	};

	Exclusion::Exclusion(shared_ptr<ExclusionState> pbs) :
		m_exclusion_state(pbs)
	{
	}

	Exclusion::Exclusion(const string &title) :
		m_exclusion_state(make_shared<ExclusionState>(title))
	{
	}

	ExclusionState::ExclusionState(const string &title) :
		m_task(title, [](){})
	{
	}

	void
	ExclusionState::release()
	{
		unique_lock<mutex> lock(m_mutex);
		m_locked = false;
		m_task.run();
	}

	ExclusionState::~ExclusionState()
	{
		release();
	}

	ExclusionLock::ExclusionLock(shared_ptr<ExclusionState> pbs) :
		m_exclusion_state(pbs)
	{
	}

	void
	ExclusionLock::release()
	{
		if (m_exclusion_state) {
			m_exclusion_state->release();
			m_exclusion_state.reset();
		}
	}

	ExclusionLock::~ExclusionLock()
	{
		release();
	}

	void
	ExclusionState::insert_task(Task task)
	{
		unique_lock<mutex> lock(m_mutex);
		if (m_locked) {
			// If the Exclusion is locked, queue the task to run on release;
			m_task.append(task);
		} else {
			// otherwise, run the inserted task immediately
			task.run();
		}
	}

	bool
	ExclusionState::try_lock()
	{
		unique_lock<mutex> lock(m_mutex);
		if (m_locked) {
			return false;
		} else {
			m_locked = true;
			return true;
		}
	}

	void
	Exclusion::insert_task(Task t)
	{
		m_exclusion_state->insert_task(t);
	}

	ExclusionLock::operator bool() const
	{
		return !!m_exclusion_state;
	}

	ExclusionLock
	Exclusion::try_lock()
	{
		THROW_CHECK0(runtime_error, m_exclusion_state);
		if (m_exclusion_state->try_lock()) {
			return ExclusionLock(m_exclusion_state);
		} else {
			return ExclusionLock();
		}
	}
}