#include "crucible/task.h" #include "crucible/cleanup.h" #include "crucible/error.h" #include "crucible/process.h" #include #include #include #include #include #include #include namespace crucible { using namespace std; static thread_local weak_ptr tl_current_task_wp; class TaskState : public enable_shared_from_this { const function m_exec_fn; const function m_print_fn; TaskId m_id; static atomic s_next_id; public: TaskState(function exec_fn, function print_fn); void exec(); ostream &print(ostream &os); TaskId id() const; }; atomic TaskState::s_next_id; class TaskConsumer; class TaskMasterState; class TaskMasterState : public enable_shared_from_this { mutex m_mutex; condition_variable m_condvar; list> m_queue; size_t m_thread_max; set> m_threads; friend class TaskConsumer; friend class TaskMaster; void start_stop_threads(); void set_thread_count(size_t thread_max); public: ~TaskMasterState(); TaskMasterState(size_t thread_max = thread::hardware_concurrency()); static void push_back(shared_ptr task); static void push_front(shared_ptr task); size_t get_queue_count(); }; class TaskConsumer : public enable_shared_from_this { weak_ptr m_master; thread m_thread; shared_ptr m_current_task; void consumer_thread(); shared_ptr current_task_locked(); public: TaskConsumer(weak_ptr tms); shared_ptr current_task(); friend class TaskMaster; }; static shared_ptr s_tms = make_shared(); TaskState::TaskState(function exec_fn, function print_fn) : m_exec_fn(exec_fn), m_print_fn(print_fn), m_id(++s_next_id) { } void TaskState::exec() { THROW_CHECK0(invalid_argument, m_exec_fn); THROW_CHECK0(invalid_argument, m_print_fn); char buf[24]; memset(buf, '\0', sizeof(buf)); DIE_IF_MINUS_ERRNO(pthread_getname_np(pthread_self(), buf, sizeof(buf))); Cleanup pthread_name_cleaner([&]() { pthread_setname_np(pthread_self(), buf); }); ostringstream oss; m_print_fn(oss); auto thread_name = oss.str(); DIE_IF_MINUS_ERRNO(pthread_setname_np(pthread_self(), thread_name.c_str())); weak_ptr this_task_wp = shared_from_this(); Cleanup current_task_cleaner([&]() { swap(this_task_wp, tl_current_task_wp); }); swap(this_task_wp, tl_current_task_wp); m_exec_fn(); } ostream & TaskState::print(ostream &os) { THROW_CHECK0(invalid_argument, m_print_fn); return m_print_fn(os); } TaskId TaskState::id() const { return m_id; } TaskMasterState::TaskMasterState(size_t thread_max) : m_thread_max(thread_max) { } void TaskMasterState::start_stop_threads() { unique_lock lock(m_mutex); while (m_threads.size() < m_thread_max) { m_threads.insert(make_shared(shared_from_this())); } while (m_threads.size() > m_thread_max) { m_condvar.wait(lock); } } void TaskMasterState::push_back(shared_ptr task) { THROW_CHECK0(runtime_error, task); s_tms->start_stop_threads(); unique_lock lock(s_tms->m_mutex); s_tms->m_queue.push_back(task); s_tms->m_condvar.notify_all(); } void TaskMasterState::push_front(shared_ptr task) { THROW_CHECK0(runtime_error, task); s_tms->start_stop_threads(); unique_lock lock(s_tms->m_mutex); s_tms->m_queue.push_front(task); s_tms->m_condvar.notify_all(); } TaskMasterState::~TaskMasterState() { set_thread_count(0); } size_t TaskMaster::get_queue_count() { unique_lock lock(s_tms->m_mutex); return s_tms->m_queue.size(); } ostream & TaskMaster::print_queue(ostream &os) { unique_lock lock(s_tms->m_mutex); os << "Queue (size " << s_tms->m_queue.size() << "):" << endl; size_t counter = 0; for (auto i : s_tms->m_queue) { os << "Queue #" << ++counter << " Task ID " << i->id() << " "; i->print(os); os << endl; } return os << "Queue End" << endl; } ostream & TaskMaster::print_workers(ostream &os) { unique_lock lock(s_tms->m_mutex); os << "Workers (size " << s_tms->m_threads.size() << "):" << endl; size_t counter = 0; for (auto i : s_tms->m_threads) { os << "Worker #" << ++counter << " "; auto task = i->current_task_locked(); if (task) { os << "Task ID " << task->id() << " "; task->print(os); } else { os << "(idle)"; } os << endl; } return os << "Workers End" << endl; } void TaskMasterState::set_thread_count(size_t thread_max) { unique_lock lock(m_mutex); // If we are reducing the number of threads we have to wake them up so they can exit their loops if (thread_max < m_thread_max) { m_condvar.notify_all(); } // Lower maximum then release lock m_thread_max = thread_max; lock.unlock(); // Wait for threads to be stopped or go start them now start_stop_threads(); } void TaskMaster::set_thread_count(size_t thread_max) { s_tms->set_thread_count(thread_max); } void TaskMaster::set_thread_count() { set_thread_count(thread::hardware_concurrency()); } Task::Task(shared_ptr pts) : m_task_state(pts) { } Task::Task(function exec_fn, function print_fn) : m_task_state(make_shared(exec_fn, print_fn)) { } void Task::run() const { THROW_CHECK0(runtime_error, m_task_state); TaskMasterState::push_back(m_task_state); } void Task::run_earlier() const { THROW_CHECK0(runtime_error, m_task_state); TaskMasterState::push_front(m_task_state); } Task Task::current_task() { return Task(tl_current_task_wp.lock()); } ostream & Task::print(ostream &os) const { THROW_CHECK0(runtime_error, m_task_state); return m_task_state->print(os); } ostream & operator<<(ostream &os, const Task &task) { return task.print(os); }; TaskId Task::id() const { THROW_CHECK0(runtime_error, m_task_state); return m_task_state->id(); } bool Task::operator<(const Task &that) const { return id() < that.id(); } Task::operator bool() const { return !!m_task_state; } shared_ptr TaskConsumer::current_task_locked() { return m_current_task; } shared_ptr TaskConsumer::current_task() { auto master_locked = m_master.lock(); unique_lock lock(master_locked->m_mutex); return current_task_locked(); } void TaskConsumer::consumer_thread() { auto master_locked = m_master.lock(); while (true) { unique_lock lock(master_locked->m_mutex); if (master_locked->m_thread_max < master_locked->m_threads.size()) { break; } if (master_locked->m_queue.empty()) { master_locked->m_condvar.wait(lock); continue; } m_current_task = *master_locked->m_queue.begin(); master_locked->m_queue.pop_front(); lock.unlock(); catch_all([&]() { m_current_task->exec(); }); lock.lock(); m_current_task.reset(); } unique_lock lock(master_locked->m_mutex); m_thread.detach(); master_locked->m_threads.erase(shared_from_this()); master_locked->m_condvar.notify_all(); } TaskConsumer::TaskConsumer(weak_ptr tms) : m_master(tms), m_thread([=](){ consumer_thread(); }) { } class BarrierState { mutex m_mutex; set m_tasks; void release(); public: ~BarrierState(); void insert_task(Task t); }; Barrier::Barrier(shared_ptr pbs) : m_barrier_state(pbs) { } Barrier::Barrier() : m_barrier_state(make_shared()) { } void BarrierState::release() { unique_lock lock(m_mutex); for (auto i : m_tasks) { i.run(); } m_tasks.clear(); } BarrierState::~BarrierState() { release(); } BarrierLock::BarrierLock(shared_ptr pbs) : m_barrier_state(pbs) { } void BarrierLock::release() { m_barrier_state.reset(); } void BarrierState::insert_task(Task t) { unique_lock lock(m_mutex); m_tasks.insert(t); } void Barrier::insert_task(Task t) { m_barrier_state->insert_task(t); } BarrierLock Barrier::lock() { return BarrierLock(m_barrier_state); } class ExclusionState { mutex m_mutex; bool m_locked = false; set m_tasks; public: ~ExclusionState(); void release(); bool try_lock(); void insert_task(Task t); }; Exclusion::Exclusion(shared_ptr pbs) : m_exclusion_state(pbs) { } Exclusion::Exclusion() : m_exclusion_state(make_shared()) { } void ExclusionState::release() { unique_lock lock(m_mutex); m_locked = false; bool first = true; for (auto i : m_tasks) { if (first) { i.run_earlier(); first = false; } else { i.run(); } } m_tasks.clear(); } ExclusionState::~ExclusionState() { release(); } ExclusionLock::ExclusionLock(shared_ptr pbs) : m_exclusion_state(pbs) { } void ExclusionLock::release() { if (m_exclusion_state) { m_exclusion_state->release(); m_exclusion_state.reset(); } } ExclusionLock::~ExclusionLock() { release(); } void ExclusionState::insert_task(Task task) { unique_lock lock(m_mutex); m_tasks.insert(task); } bool ExclusionState::try_lock() { unique_lock lock(m_mutex); if (m_locked) { return false; } else { m_locked = true; return true; } } void Exclusion::insert_task(Task t) { m_exclusion_state->insert_task(t); } ExclusionLock::operator bool() const { return !!m_exclusion_state; } ExclusionLock Exclusion::try_lock() { THROW_CHECK0(runtime_error, m_exclusion_state); if (m_exclusion_state->try_lock()) { return ExclusionLock(m_exclusion_state); } else { return ExclusionLock(); } } }