From 8849e57bf059ee1fb43fa2f00eec48b02b8d984a Mon Sep 17 00:00:00 2001
From: Zygo Blaxell
Date: Mon, 15 Jan 2018 23:07:12 -0500
Subject: [PATCH] crucible: add Task class

We need a mechanism for distributing work across processor cores and
disks.

Task implements a simple FIFO/LIFO queue model for executing closures.
Some locking primitives are included (mutex and barrier).
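
As a minimal sketch of the intended API (do_work is a placeholder),
a Task pairs a work closure with a description closure; the
description is only evaluated when the queue or workers are printed:

	Task t(
		[]() { do_work(); },                    // exec_fn, runs on a worker thread
		[](ostream &os) -> ostream & {          // print_fn, describes the Task
			return os << "do_work";
		}
	);
	t.run();            // FIFO: schedule at the end of the queue
	// t.run_earlier() would schedule ahead of queued Tasks instead (LIFO)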

Signed-off-by: Zygo Blaxell
---
 include/crucible/task.h | 143 ++++++++++++
 lib/Makefile            |   1 +
 lib/task.cc             | 490 ++++++++++++++++++++++++++++++++++++++++
 test/Makefile           |   1 +
 test/task.cc            | 230 +++++++++++++++++++
 5 files changed, 865 insertions(+)
 create mode 100644 include/crucible/task.h
 create mode 100644 lib/task.cc
 create mode 100644 test/task.cc

diff --git a/include/crucible/task.h b/include/crucible/task.h
new file mode 100644
index 0000000..72ee359
--- /dev/null
+++ b/include/crucible/task.h
@@ -0,0 +1,143 @@
+#ifndef CRUCIBLE_TASK_H
+#define CRUCIBLE_TASK_H
+
+#include <functional>
+#include <memory>
+#include <ostream>
+
+namespace crucible {
+	using namespace std;
+
+	class TaskState;
+
+	using TaskId = uint64_t;
+
+	class Task {
+		shared_ptr<TaskState> m_task_state;
+
+		Task(shared_ptr<TaskState> pts);
+
+		TaskId id() const;
+
+	public:
+
+		// create Task object containing closure and description
+		Task(function<void()> exec_fn, function<ostream &(ostream &)> print_fn);
+
+		// schedule Task at end of queue.
+		// May run Task in current thread or in other thread.
+		// May run Task before or after returning.
+		void run() const;
+
+		// schedule Task before other queued Tasks
+		void run_earlier() const;
+
+		// Returns currently executing Task if called from exec_fn.
+		// Usually used to reschedule the currently executing Task.
+		static Task current_task();
+
+		bool operator<(const Task &that) const;
+	};
+
+	class TaskMaster {
+	public:
+		// Blocks until the running thread count reaches this number
+		static void set_thread_count(size_t threads);
+
+		// Calls set_thread_count with default
+		static void set_thread_count();
+
+		// Writes the current non-executing Task queue
+		static ostream & print_queue(ostream &);
+
+		// Writes the current executing Task for each worker
+		static ostream & print_workers(ostream &);
+
+		// Gets the current number of queued Tasks
+		static size_t get_queue_count();
+
+	};
+
+	// Barrier executes waiting Tasks once the last BarrierLock
+	// is released.  Multiple unique Tasks may be scheduled while
+	// BarrierLocks exist and all will be run() at once upon
+	// release.  If no BarrierLocks exist, Tasks are executed
+	// immediately upon insertion.
+
+	class BarrierState;
+
+	class BarrierLock {
+		shared_ptr<BarrierState> m_barrier_state;
+		BarrierLock(shared_ptr<BarrierState> pbs);
+		friend class Barrier;
+	public:
+		// Release this Lock immediately and permanently
+		void release();
+	};
+
+	class Barrier {
+		shared_ptr<BarrierState> m_barrier_state;
+
+		Barrier(shared_ptr<BarrierState> pbs);
+	public:
+		Barrier();
+
+		// Prevent execution of Tasks behind barrier until
+		// BarrierLock destructor or release() method is called.
+		BarrierLock lock();
+
+		// Schedule a Task for execution when no Locks exist
+		void insert_task(Task t);
+	};
+
+	// Exclusion provides exclusive access to an ExclusionLock.
+	// One Task will be able to obtain the ExclusionLock; other Tasks
+	// may schedule themselves for re-execution after the ExclusionLock
+	// is released.
+
+	class ExclusionState;
+	class Exclusion;
+
+	class ExclusionLock {
+		shared_ptr<ExclusionState> m_exclusion_state;
+		ExclusionLock(shared_ptr<ExclusionState> pes);
+		ExclusionLock() = default;
+		friend class Exclusion;
+	public:
+		// Calls release()
+		~ExclusionLock();
+
+		// Release this Lock immediately and permanently
+		void release();
+
+		// Test for locked state
+		operator bool() const;
+	};
+
+	class Exclusion {
+		shared_ptr<ExclusionState> m_exclusion_state;
+
+		Exclusion(shared_ptr<ExclusionState> pes);
+	public:
+		Exclusion();
+
+		// Attempt to obtain a Lock.  If successful, current Task
+		// owns the Lock until the ExclusionLock is released
+		// (it is the ExclusionLock that owns the lock, so it can
+		// be passed to other Tasks or threads, but this is not
+		// recommended practice).
+		// If not successful, current Task is expected to call
+		// insert_task(current_task()), release any ExclusionLock
+		// objects it holds, and exit its Task function.
+		ExclusionLock try_lock();
+
+		// Execute Task when Exclusion is unlocked (possibly
+		// immediately).  First Task is scheduled with run_earlier(),
+		// all others are scheduled with run().
+		void insert_task(Task t);
+	};
+
+}
+
+#endif // CRUCIBLE_TASK_H
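
An illustrative sketch of the Barrier pattern described above (names are
placeholders; as in test/task.cc below, the queued Task fires once the last
shared reference to the BarrierState is dropped, which is why the Barrier's
own handle is reset as well):

	auto b = make_shared<Barrier>();
	for (size_t c = 0; c < 3; ++c) {
		auto bl = b->lock();                // one BarrierLock per worker Task
		Task([bl]() mutable {
			// ... per-worker work ...
			bl.release();               // drop this worker's lock
		}, [](ostream &os) -> ostream & { return os << "worker"; }).run();
	}
	b->insert_task(Task([]() {
		// runs once, after every BarrierLock has been released
	}, [](ostream &os) -> ostream & { return os << "after barrier"; }));
	b.reset();  // drop the Barrier's own reference to the BarrierState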

diff --git a/lib/Makefile b/lib/Makefile
index 50c38d9..9ca53af 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -14,6 +14,7 @@ OBJS = \
 	path.o \
 	process.o \
 	string.o \
+	task.o \
 	time.o \
 	uuid.o \
 	.version.o \
diff --git a/lib/task.cc b/lib/task.cc
new file mode 100644
index 0000000..3d31aa2
--- /dev/null
+++ b/lib/task.cc
@@ -0,0 +1,490 @@
+#include "crucible/task.h"
+
+#include "crucible/cleanup.h"
+#include "crucible/error.h"
+#include "crucible/process.h"
+
+#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <list>
+#include <mutex>
+#include <set>
+#include <thread>
+
+namespace crucible {
+	using namespace std;
+
+	static thread_local weak_ptr<TaskState> s_current_task_wp;
+
+	class TaskState : public enable_shared_from_this<TaskState> {
+		const function<void()> m_exec_fn;
+		const function<ostream &(ostream &)> m_print_fn;
+		TaskId m_id;
+
+		static atomic<TaskId> s_next_id;
+	public:
+		TaskState(function<void()> exec_fn, function<ostream &(ostream &)> print_fn);
+
+		void exec();
+		ostream &print(ostream &os);
+		TaskId id();
+	};
+
+	atomic<TaskId> TaskState::s_next_id;
+
+	class TaskConsumer;
+	class TaskMasterState;
+
+	class TaskMasterState : public enable_shared_from_this<TaskMasterState> {
+		mutex m_mutex;
+		condition_variable m_condvar;
+		list<shared_ptr<TaskState>> m_queue;
+		size_t m_thread_max;
+		set<shared_ptr<TaskConsumer>> m_threads;
+
+		friend class TaskConsumer;
+		friend class TaskMaster;
+
+		void start_stop_threads();
+		void set_thread_count(size_t thread_max);
+
+	public:
+		~TaskMasterState();
+		TaskMasterState(size_t thread_max = thread::hardware_concurrency());
+
+		static void push_back(shared_ptr<TaskState> task);
+		static void push_front(shared_ptr<TaskState> task);
+		size_t get_queue_count();
+	};
+
+	class TaskConsumer : public enable_shared_from_this<TaskConsumer> {
+		weak_ptr<TaskMasterState> m_master;
+		thread m_thread;
+		shared_ptr<TaskState> m_current_task;
+
+		void consumer_thread();
+		shared_ptr<TaskState> current_task_locked();
+	public:
+		TaskConsumer(weak_ptr<TaskMasterState> tms);
+		shared_ptr<TaskState> current_task();
+		friend class TaskMaster;
+	};
+
+	static shared_ptr<TaskMasterState> s_tms = make_shared<TaskMasterState>();
+
+	TaskState::TaskState(function<void()> exec_fn, function<ostream &(ostream &)> print_fn) :
+		m_exec_fn(exec_fn),
+		m_print_fn(print_fn),
+		m_id(++s_next_id)
+	{
+	}
+
+	void
+	TaskState::exec()
+	{
+		THROW_CHECK0(invalid_argument, m_exec_fn);
+		weak_ptr<TaskState> this_task_wp = shared_from_this();
+		Cleanup cleaner([&]() {
+			swap(this_task_wp, s_current_task_wp);
+		});
+		swap(this_task_wp, s_current_task_wp);
+		m_exec_fn();
+	}
+
+	ostream &
+	TaskState::print(ostream &os)
+	{
+		THROW_CHECK0(invalid_argument, m_print_fn);
+		return m_print_fn(os);
+	}
+
+	TaskId
+	TaskState::id()
+	{
+		return m_id;
+	}
+
+	TaskMasterState::TaskMasterState(size_t thread_max) :
+		m_thread_max(thread_max)
+	{
+	}
+
+	void
+	TaskMasterState::start_stop_threads()
+	{
+		unique_lock<mutex> lock(m_mutex);
+		while (m_threads.size() < m_thread_max) {
+			m_threads.insert(make_shared<TaskConsumer>(shared_from_this()));
+		}
+		while (m_threads.size() > m_thread_max) {
+			m_condvar.wait(lock);
+		}
+	}
+
+	void
+	TaskMasterState::push_back(shared_ptr<TaskState> task)
+	{
+		assert(task);
+		THROW_CHECK0(runtime_error, task);
+		s_tms->start_stop_threads();
+		unique_lock<mutex> lock(s_tms->m_mutex);
+		s_tms->m_queue.push_back(task);
+		s_tms->m_condvar.notify_all();
+	}
+
+	void
+	TaskMasterState::push_front(shared_ptr<TaskState> task)
+	{
+		assert(task);
+		THROW_CHECK0(runtime_error, task);
+		s_tms->start_stop_threads();
+		unique_lock<mutex> lock(s_tms->m_mutex);
+		s_tms->m_queue.push_front(task);
+		s_tms->m_condvar.notify_all();
+	}
+
+	TaskMasterState::~TaskMasterState()
+	{
+		set_thread_count(0);
+	}
+
+	size_t
+	TaskMaster::get_queue_count()
+	{
+		unique_lock<mutex> lock(s_tms->m_mutex);
+		return s_tms->m_queue.size();
+	}
+
+	ostream &
+	TaskMaster::print_queue(ostream &os)
+	{
+		unique_lock<mutex> lock(s_tms->m_mutex);
+		os << "Queue (size " << s_tms->m_queue.size() << "):" << endl;
+		size_t counter = 0;
+		for (auto i : s_tms->m_queue) {
+			os << "Queue #" << ++counter << " Task ID " << i->id() << " ";
+			i->print(os);
+			os << endl;
+		}
+		return os << "Queue End" << endl;
+	}
+
+	ostream &
+	TaskMaster::print_workers(ostream &os)
+	{
+		unique_lock<mutex> lock(s_tms->m_mutex);
+		os << "Workers (size " << s_tms->m_threads.size() << "):" << endl;
+		size_t counter = 0;
+		for (auto i : s_tms->m_threads) {
+			os << "Worker #" << ++counter << " ";
+			auto task = i->current_task_locked();
+			if (task) {
+				os << "Task ID " << task->id() << " ";
+				task->print(os);
+			} else {
+				os << "(idle)";
+			}
+			os << endl;
+		}
+		return os << "Workers End" << endl;
+	}
+
+	void
+	TaskMasterState::set_thread_count(size_t thread_max)
+	{
+		unique_lock<mutex> lock(m_mutex);
+
+		// If we are reducing the number of threads, wake the workers
+		// so they can notice the new maximum and exit their loops
+		if (thread_max < m_thread_max) {
+			m_condvar.notify_all();
+		}
+
+		// Lower the maximum, then release the lock
+		m_thread_max = thread_max;
+		lock.unlock();
+
+		// Wait for threads to stop, or start new ones now
+		start_stop_threads();
+	}
+
+	void
+	TaskMaster::set_thread_count(size_t thread_max)
+	{
+		s_tms->set_thread_count(thread_max);
+	}
+
+	void
+	TaskMaster::set_thread_count()
+	{
+		set_thread_count(thread::hardware_concurrency());
+	}
+
+	Task::Task(shared_ptr<TaskState> pts) :
+		m_task_state(pts)
+	{
+		assert(m_task_state);
+	}
+
+	Task::Task(function<void()> exec_fn, function<ostream &(ostream &)> print_fn) :
+		m_task_state(make_shared<TaskState>(exec_fn, print_fn))
+	{
+		assert(m_task_state);
+	}
+
+	void
+	Task::run() const
+	{
+		assert(m_task_state);
+		TaskMasterState::push_back(m_task_state);
+	}
+
+	void
+	Task::run_earlier() const
+	{
+		assert(m_task_state);
+		TaskMasterState::push_front(m_task_state);
+	}
+
+	Task
+	Task::current_task()
+	{
+		return Task(s_current_task_wp.lock());
+	}
+
+	TaskId
+	Task::id() const
+	{
+		if (m_task_state) {
+			return m_task_state->id();
+		}
+		return 0;
+	}
+
+	bool
+	Task::operator<(const Task &that) const
+	{
+		return id() < that.id();
+	}
+
+	shared_ptr<TaskState>
+	TaskConsumer::current_task_locked()
+	{
+		return m_current_task;
+	}
+
+	shared_ptr<TaskState>
+	TaskConsumer::current_task()
+	{
+		auto master_locked = m_master.lock();
+		unique_lock<mutex> lock(master_locked->m_mutex);
+		return current_task_locked();
+	}
+
+	void
+	TaskConsumer::consumer_thread()
+	{
+		auto master_locked = m_master.lock();
+		while (true) {
+			unique_lock<mutex> lock(master_locked->m_mutex);
+			if (master_locked->m_thread_max < master_locked->m_threads.size()) {
+				break;
+			}
+
+			if (master_locked->m_queue.empty()) {
+				master_locked->m_condvar.wait(lock);
+				continue;
+			}
+
+			m_current_task = *master_locked->m_queue.begin();
+			master_locked->m_queue.pop_front();
+			lock.unlock();
+			catch_all([&]() {
+				m_current_task->exec();
+			});
+			lock.lock();
+			m_current_task.reset();
+		}
+
+		unique_lock<mutex> lock(master_locked->m_mutex);
+		m_thread.detach();
+		master_locked->m_threads.erase(shared_from_this());
+		master_locked->m_condvar.notify_all();
+	}
+
+	TaskConsumer::TaskConsumer(weak_ptr<TaskMasterState> tms) :
+		m_master(tms),
+		m_thread([=](){ consumer_thread(); })
+	{
+	}
+
+	class BarrierState {
+		mutex m_mutex;
+		set<Task> m_tasks;
+
+		void release();
+	public:
+		~BarrierState();
+		void insert_task(Task t);
+	};
+
+	Barrier::Barrier(shared_ptr<BarrierState> pbs) :
+		m_barrier_state(pbs)
+	{
+	}
+
+	Barrier::Barrier() :
+		m_barrier_state(make_shared<BarrierState>())
+	{
+	}
+
+	void
+	BarrierState::release()
+	{
+		unique_lock<mutex> lock(m_mutex);
+		for (auto i : m_tasks) {
+			i.run();
+		}
+		m_tasks.clear();
+	}
+
+	BarrierState::~BarrierState()
+	{
+		release();
+	}
+
+	BarrierLock::BarrierLock(shared_ptr<BarrierState> pbs) :
+		m_barrier_state(pbs)
+	{
+	}
+
+	void
+	BarrierLock::release()
+	{
+		m_barrier_state.reset();
+	}
+
+	void
+	BarrierState::insert_task(Task t)
+	{
+		unique_lock<mutex> lock(m_mutex);
+		m_tasks.insert(t);
+	}
+
+	void
+	Barrier::insert_task(Task t)
+	{
+		m_barrier_state->insert_task(t);
+	}
+
+	BarrierLock
+	Barrier::lock()
+	{
+		return BarrierLock(m_barrier_state);
+	}
+
+	class ExclusionState {
+		mutex m_mutex;
+		bool m_locked = false;
+		set<Task> m_tasks;
+
+	public:
+		~ExclusionState();
+		void release();
+		bool try_lock();
+		void insert_task(Task t);
+	};
+
+	Exclusion::Exclusion(shared_ptr<ExclusionState> pes) :
+		m_exclusion_state(pes)
+	{
+	}
+
+	Exclusion::Exclusion() :
+		m_exclusion_state(make_shared<ExclusionState>())
+	{
+	}
+
+	void
+	ExclusionState::release()
+	{
+		unique_lock<mutex> lock(m_mutex);
+		m_locked = false;
+		bool first = true;
+		for (auto i : m_tasks) {
+			if (first) {
+				i.run_earlier();
+				first = false;
+			} else {
+				i.run();
+			}
+		}
+		m_tasks.clear();
+	}
+
+	ExclusionState::~ExclusionState()
+	{
+		release();
+	}
+
+	ExclusionLock::ExclusionLock(shared_ptr<ExclusionState> pes) :
+		m_exclusion_state(pes)
+	{
+	}
+
+	void
+	ExclusionLock::release()
+	{
+		if (m_exclusion_state) {
+			m_exclusion_state->release();
+			m_exclusion_state.reset();
+		}
+	}
+
+	ExclusionLock::~ExclusionLock()
+	{
+		release();
+	}
+
+	void
+	ExclusionState::insert_task(Task task)
+	{
+		unique_lock<mutex> lock(m_mutex);
+		m_tasks.insert(task);
+	}
+
+	bool
+	ExclusionState::try_lock()
+	{
+		unique_lock<mutex> lock(m_mutex);
+		if (m_locked) {
+			return false;
+		} else {
+			m_locked = true;
+			return true;
+		}
+	}
+
+	void
+	Exclusion::insert_task(Task t)
+	{
+		m_exclusion_state->insert_task(t);
+	}
+
+	ExclusionLock::operator bool() const
+	{
+		return !!m_exclusion_state;
+	}
+
+	ExclusionLock
+	Exclusion::try_lock()
+	{
+		THROW_CHECK0(runtime_error, m_exclusion_state);
+		if (m_exclusion_state->try_lock()) {
+			return ExclusionLock(m_exclusion_state);
+		} else {
+			return ExclusionLock();
+		}
+	}
+}
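
The try_lock() contract described in the header is cooperative rather than
blocking: a Task that loses the race reschedules itself behind the Exclusion
and returns, freeing its worker thread. An illustrative sketch of that
pattern (the Exclusion is captured by reference, so it must outlive the Task):

	Exclusion excl;
	Task guarded([&excl]() {
		auto lock = excl.try_lock();
		if (!lock) {
			// Lost the race: run again when the holder releases.
			excl.insert_task(Task::current_task());
			return;
		}
		// ... critical section: at most one Task runs this at a time ...
	}, [](ostream &os) -> ostream & { return os << "guarded"; });
	guarded.run();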

diff --git a/test/Makefile b/test/Makefile
index 704b079..c0091e1 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -5,6 +5,7 @@ PROGRAMS = \
 	limits \
 	path \
 	process \
+	task \
 
 all: test
diff --git a/test/task.cc b/test/task.cc
new file mode 100644
index 0000000..37e8d16
--- /dev/null
+++ b/test/task.cc
@@ -0,0 +1,230 @@
+#include "tests.h"
+
+#include "crucible/task.h"
+#include "crucible/time.h"
+
+#include <cassert>
+#include <condition_variable>
+#include <mutex>
+#include <sstream>
+#include <vector>
+
+#include <unistd.h>
+
+using namespace crucible;
+using namespace std;
+
+void
+test_tasks(size_t count)
+{
+	TaskMaster::set_thread_count();
+
+	vector<bool> task_done(count, false);
+
+	mutex mtx;
+	condition_variable cv;
+
+	unique_lock<mutex> lock(mtx);
+
+	// Run several tasks in parallel
+	for (size_t c = 0; c < count; ++c) {
+		Task t([c, &task_done, &mtx, &cv]() {
+			unique_lock<mutex> lock(mtx);
+			// cerr << "Task #" << c << endl;
+			task_done.at(c) = true;
+			cv.notify_one();
+		}, [=](ostream &os) -> ostream & { return os << "task #" << c; });
+		t.run();
+	}
+
+	// Get current status
+	ostringstream oss;
+	TaskMaster::print_queue(oss);
+	TaskMaster::print_workers(oss);
+
+	while (true) {
+		size_t tasks_done = 0;
+		for (auto i : task_done) {
+			if (i) {
+				++tasks_done;
+			}
+		}
+		if (tasks_done == count) {
+			return;
+		}
+		// cerr << "Tasks done: " << tasks_done << endl;
+
+		cv.wait(lock);
+	}
+}
+
+#define TASK_MACRO(printer, body) Task( \
+	[=]() { body; }, \
+	[=](ostream &__os) -> ostream & { return __os << printer; } \
+)
+
+void
+test_macro()
+{
+	TASK_MACRO(
+		"A Task",
+		cerr << "Hello, World!" << endl;
+	).run();
+}
+
+void
+test_finish()
+{
+	ostringstream oss;
+	TaskMaster::print_queue(oss);
+	TaskMaster::print_workers(oss);
+	TaskMaster::set_thread_count(0);
+	// cerr << "finish done" << endl;
+}
+
+void
+test_unfinish()
+{
+	TaskMaster::set_thread_count();
+}
+
+
+void
+test_barrier(size_t count)
+{
+	vector<bool> task_done(count, false);
+
+	mutex mtx;
+	condition_variable cv;
+
+	unique_lock<mutex> lock(mtx);
+
+	auto b = make_shared<Barrier>();
+
+	// Run several tasks in parallel
+	for (size_t c = 0; c < count; ++c) {
+		auto bl = b->lock();
+		Task t([c, &task_done, &mtx, &cv, bl]() mutable {
+			// cerr << "Task #" << c << endl;
+			unique_lock<mutex> lock(mtx);
+			task_done.at(c) = true;
+			bl.release();
+		}, [=](ostream &os) -> ostream & { return os << "task #" << c; });
+		t.run();
+	}
+
+	// Get current status
+	ostringstream oss;
+	TaskMaster::print_queue(oss);
+	TaskMaster::print_workers(oss);
+
+	bool done_flag = false;
+
+	Task completed(
+		[&mtx, &cv, &done_flag]() {
+			unique_lock<mutex> lock(mtx);
+			// cerr << "Running cv notify" << endl;
+			done_flag = true;
+			cv.notify_all();
+		},
+		[](ostream &os) -> ostream & {
+			return os << "Waiting for Barrier";
+		}
+	);
+	b->insert_task(completed);
+
+	b.reset();
+
+	while (true) {
+		size_t tasks_done = 0;
+		for (auto i : task_done) {
+			if (i) {
+				++tasks_done;
+			}
+		}
+		// cerr << "Tasks done: " << tasks_done << " done_flag " << done_flag << endl;
+		if (tasks_done == count && done_flag) {
+			break;
+		}
+
+		cv.wait(lock);
+	}
+	// cerr << "test_barrier return" << endl;
+}
+
+void
+test_exclusion(size_t count)
+{
+	mutex only_one;
+	Exclusion excl;
+
+	mutex mtx;
+	condition_variable cv;
+
+	unique_lock<mutex> lock(mtx);
+
+	auto b = make_shared<Barrier>();
+
+	// Run several tasks in parallel
+	for (size_t c = 0; c < count; ++c) {
+		auto bl = b->lock();
+		Task t([c, &only_one, &mtx, &excl, bl]() mutable {
+			// cerr << "Task #" << c << endl;
+			auto lock = excl.try_lock();
+			if (!lock) {
+				excl.insert_task(Task::current_task());
+				return;
+			}
+			bool locked = only_one.try_lock();
+			assert(locked);
+			nanosleep(0.0001);
+			only_one.unlock();
+			bl.release();
+		}, [=](ostream &os) -> ostream & { return os << "task #" << c; });
+		t.run();
+	}
"Running cv notify" << endl; + done_flag = true; + cv.notify_all(); + }, + [](ostream &os) -> ostream& { + return os << "Waiting for Barrier"; + } + ); + b->insert_task(completed); + + b.reset(); + + while (true) { + if (done_flag) { + break; + } + + cv.wait(lock); + } +} + +int +main(int, char**) +{ + // in case of deadlock + alarm(9); + + RUN_A_TEST(test_tasks(256)); + RUN_A_TEST(test_macro()); + RUN_A_TEST(test_finish()); + RUN_A_TEST(test_unfinish()); + RUN_A_TEST(test_barrier(256)); + RUN_A_TEST(test_finish()); + RUN_A_TEST(test_unfinish()); + RUN_A_TEST(test_exclusion(256)); + RUN_A_TEST(test_finish()); + + exit(EXIT_SUCCESS); +}