mirror of
https://github.com/Zygo/bees.git
synced 2025-05-17 21:35:45 +02:00
It turns out I've been using pthread_setname_np wrong the whole time: * on Linux, the thread name length is 15 characters. TASK_COMM_LEN is 16 bytes, and the last one is always 0. This is now hardcoded in many places and cannot be changed. * pthread_setname_np doesn't return -errno, so DIE_IF_MINUS_ERRNO was the wrong macro. On the other hand, we never want to do anything differently when pthread_setname_np fails, so we never needed to check the return value. Also, libc silently ignores attempts to set the thread name when it is too long. That's almost certainly a libc bug, but libc probably suppresses the error result for the same reasons I ignore the error result. Wrap the pthread_setname function with a C++ std::string overload that truncates the argument at 15 characters, so we at least get the first part of the task name in the thread name field. Later commits can deal with making the bees thread names shorter. Also wrap pthread_getname for symmetry. Signed-off-by: Zygo Blaxell <bees@furryterror.org>
924 lines
21 KiB
C++
924 lines
21 KiB
C++
#include "crucible/task.h"
|
|
|
|
#include "crucible/error.h"
|
|
#include "crucible/process.h"
|
|
#include "crucible/time.h"
|
|
|
|
#include <atomic>
|
|
#include <condition_variable>
|
|
#include <list>
|
|
#include <map>
|
|
#include <mutex>
|
|
#include <set>
|
|
#include <thread>
|
|
|
|
#include <cassert>
|
|
#include <cmath>
|
|
|
|
namespace crucible {
|
|
using namespace std;
|
|
|
|
static const size_t thread_name_length = 15; // TASK_COMM_LEN on Linux
|
|
|
|
void
|
|
pthread_setname(const string &name)
|
|
{
|
|
auto name_copy = name.substr(0, thread_name_length);
|
|
// Don't care if a debugging facility fails
|
|
pthread_setname_np(pthread_self(), name_copy.c_str());
|
|
}
|
|
|
|
string
|
|
pthread_getname()
|
|
{
|
|
char buf[thread_name_length + 1] = { 0 };
|
|
// We'll get an empty name if this fails...
|
|
pthread_getname_np(pthread_self(), buf, sizeof(buf));
|
|
// ...or at least null-terminated garbage
|
|
buf[thread_name_length] = '\0';
|
|
return buf;
|
|
}
|
|
|
|
class TaskState;
|
|
using TaskStatePtr = shared_ptr<TaskState>;
|
|
using TaskStateWeak = weak_ptr<TaskState>;
|
|
|
|
class TaskConsumer;
|
|
using TaskConsumerPtr = shared_ptr<TaskConsumer>;
|
|
using TaskConsumerWeak = weak_ptr<TaskConsumer>;
|
|
|
|
using TaskQueue = list<TaskStatePtr>;
|
|
|
|
static thread_local TaskStatePtr tl_current_task;
|
|
|
|
/// because we don't want to bump -std=c++-17 just to get scoped_lock.
|
|
/// Also we don't want to self-deadlock if both mutexes are the same mutex.
|
|
class PairLock {
|
|
unique_lock<mutex> m_lock1, m_lock2;
|
|
public:
|
|
PairLock(mutex &m1, mutex &m2) :
|
|
m_lock1(m1, defer_lock),
|
|
m_lock2(m2, defer_lock)
|
|
{
|
|
if (&m1 == &m2) {
|
|
m_lock1.lock();
|
|
} else {
|
|
lock(m_lock1, m_lock2);
|
|
}
|
|
}
|
|
};
|
|
|
|
class TaskState : public enable_shared_from_this<TaskState> {
|
|
mutex m_mutex;
|
|
const function<void()> m_exec_fn;
|
|
const string m_title;
|
|
|
|
/// Tasks to be executed after the current task is executed
|
|
list<TaskStatePtr> m_post_exec_queue;
|
|
|
|
/// Set by run() and append(). Cleared by exec().
|
|
bool m_run_now = false;
|
|
|
|
/// Set when task starts execution by exec().
|
|
/// Cleared when exec() ends.
|
|
bool m_is_running = false;
|
|
|
|
/// Sequential identifier for next task
|
|
static atomic<TaskId> s_next_id;
|
|
|
|
/// Sequential identifier for next task
|
|
static atomic<size_t> s_instance_count;
|
|
|
|
/// Identifier for this task
|
|
TaskId m_id;
|
|
|
|
/// Backend for append()
|
|
void append_nolock(const TaskStatePtr &task);
|
|
|
|
/// Clear the post-execution queue. Recursively destroys post-exec
|
|
/// queues of all tasks in post-exec queue. Useful only when
|
|
/// cancelling the entire task queue.
|
|
void clear();
|
|
|
|
friend class TaskMasterState;
|
|
friend class TaskConsumer;
|
|
|
|
/// Clear any TaskQueue, not just this one.
|
|
static void clear_queue(TaskQueue &tq);
|
|
|
|
/// Rescue any TaskQueue, not just this one.
|
|
static void rescue_queue(TaskQueue &tq);
|
|
|
|
TaskState &operator=(const TaskState &) = delete;
|
|
TaskState(const TaskState &) = delete;
|
|
TaskState(TaskState &&) = delete;
|
|
|
|
public:
|
|
~TaskState();
|
|
TaskState(string title, function<void()> exec_fn);
|
|
|
|
/// Run the task at most one more time. If task has
|
|
/// already started running, a new instance is scheduled.
|
|
/// If an instance is already scheduled by run() or
|
|
/// append(), does nothing. Otherwise, schedules a new
|
|
/// instance at the end of TaskMaster's global queue.
|
|
void run();
|
|
|
|
/// Execute task immediately in current thread if it is not already
|
|
/// executing in another thread; otherwise, append the current task
|
|
/// to itself to be executed immediately in the other thread.
|
|
void exec();
|
|
|
|
/// Return title of task.
|
|
string title() const;
|
|
|
|
/// Return ID of task.
|
|
TaskId id() const;
|
|
|
|
/// Queue task to execute after current task finishes executing
|
|
/// or is destroyed.
|
|
void append(const TaskStatePtr &task);
|
|
|
|
/// How masy Tasks are there? Good for catching leaks
|
|
static size_t instance_count();
|
|
};
|
|
|
|
atomic<TaskId> TaskState::s_next_id;
|
|
atomic<size_t> TaskState::s_instance_count;
|
|
|
|
class TaskMasterState : public enable_shared_from_this<TaskMasterState> {
|
|
mutex m_mutex;
|
|
condition_variable m_condvar;
|
|
TaskQueue m_queue;
|
|
size_t m_thread_max;
|
|
size_t m_thread_min = 0;
|
|
set<TaskConsumerPtr> m_threads;
|
|
shared_ptr<thread> m_load_tracking_thread;
|
|
double m_load_target = 0;
|
|
double m_prev_loadavg;
|
|
size_t m_configured_thread_max;
|
|
double m_thread_target;
|
|
bool m_cancelled = false;
|
|
bool m_paused = false;
|
|
TaskMaster::LoadStats m_load_stats;
|
|
|
|
friend class TaskConsumer;
|
|
friend class TaskMaster;
|
|
|
|
void start_threads_nolock();
|
|
void start_stop_threads();
|
|
void set_thread_count(size_t thread_max);
|
|
void set_thread_min_count(size_t thread_min);
|
|
void adjust_thread_count();
|
|
size_t calculate_thread_count_nolock();
|
|
void set_loadavg_target(double target);
|
|
void loadavg_thread_fn();
|
|
void cancel();
|
|
void pause(bool paused = true);
|
|
|
|
TaskMasterState &operator=(const TaskMasterState &) = delete;
|
|
TaskMasterState(const TaskMasterState &) = delete;
|
|
|
|
public:
|
|
~TaskMasterState();
|
|
TaskMasterState(size_t thread_max = thread::hardware_concurrency());
|
|
|
|
static void push_back(const TaskStatePtr &task);
|
|
static void push_front(TaskQueue &queue);
|
|
size_t get_queue_count();
|
|
size_t get_thread_count();
|
|
static TaskMaster::LoadStats get_current_load();
|
|
};
|
|
|
|
class TaskConsumer : public enable_shared_from_this<TaskConsumer> {
|
|
shared_ptr<TaskMasterState> m_master;
|
|
TaskStatePtr m_current_task;
|
|
|
|
friend class TaskState;
|
|
TaskQueue m_local_queue;
|
|
|
|
void consumer_thread();
|
|
shared_ptr<TaskState> current_task_locked();
|
|
friend class TaskMaster;
|
|
friend class TaskMasterState;
|
|
public:
|
|
TaskConsumer(const shared_ptr<TaskMasterState> &tms);
|
|
shared_ptr<TaskState> current_task();
|
|
private:
|
|
// Make sure this gets constructed _last_
|
|
shared_ptr<thread> m_thread;
|
|
};
|
|
|
|
static thread_local TaskConsumerPtr tl_current_consumer;
|
|
|
|
static auto s_tms = make_shared<TaskMasterState>();
|
|
|
|
void
|
|
TaskState::rescue_queue(TaskQueue &queue)
|
|
{
|
|
if (queue.empty()) {
|
|
return;
|
|
}
|
|
const auto tlcc = tl_current_consumer;
|
|
if (tlcc) {
|
|
// We are executing under a TaskConsumer, splice our post-exec queue at front.
|
|
// No locks needed because we are using only thread-local objects.
|
|
tlcc->m_local_queue.splice(tlcc->m_local_queue.begin(), queue);
|
|
} else {
|
|
// We are not executing under a TaskConsumer.
|
|
// If there is only one task, then just insert it at the front of the queue.
|
|
if (queue.size() == 1) {
|
|
TaskMasterState::push_front(queue);
|
|
} else {
|
|
// If there are multiple tasks, create a new task to wrap our post-exec queue,
|
|
// then push it to the front of the global queue using normal locking methods.
|
|
TaskStatePtr rescue_task(make_shared<TaskState>("rescue_task", [](){}));
|
|
swap(rescue_task->m_post_exec_queue, queue);
|
|
TaskQueue tq_one { rescue_task };
|
|
TaskMasterState::push_front(tq_one);
|
|
}
|
|
}
|
|
assert(queue.empty());
|
|
}
|
|
|
|
TaskState::~TaskState()
|
|
{
|
|
--s_instance_count;
|
|
unique_lock<mutex> lock(m_mutex);
|
|
// If any dependent Tasks were appended since the last exec, run them now
|
|
TaskState::rescue_queue(m_post_exec_queue);
|
|
}
|
|
|
|
TaskState::TaskState(string title, function<void()> exec_fn) :
|
|
m_exec_fn(exec_fn),
|
|
m_title(title),
|
|
m_id(++s_next_id)
|
|
{
|
|
THROW_CHECK0(invalid_argument, !m_title.empty());
|
|
++s_instance_count;
|
|
}
|
|
|
|
size_t
|
|
TaskState::instance_count()
|
|
{
|
|
return s_instance_count;
|
|
}
|
|
|
|
size_t
|
|
Task::instance_count()
|
|
{
|
|
return TaskState::instance_count();
|
|
}
|
|
|
|
void
|
|
TaskState::clear()
|
|
{
|
|
TaskQueue post_exec_queue;
|
|
unique_lock<mutex> lock(m_mutex);
|
|
swap(post_exec_queue, m_post_exec_queue);
|
|
lock.unlock();
|
|
clear_queue(post_exec_queue);
|
|
}
|
|
|
|
void
|
|
TaskState::clear_queue(TaskQueue &tq)
|
|
{
|
|
for (auto &i : tq) {
|
|
i->clear();
|
|
}
|
|
tq.clear();
|
|
}
|
|
|
|
void
|
|
TaskState::append_nolock(const TaskStatePtr &task)
|
|
{
|
|
THROW_CHECK0(invalid_argument, task);
|
|
m_post_exec_queue.push_back(task);
|
|
}
|
|
|
|
void
|
|
TaskState::append(const TaskStatePtr &task)
|
|
{
|
|
THROW_CHECK0(invalid_argument, task);
|
|
PairLock lock(m_mutex, task->m_mutex);
|
|
if (!task->m_run_now) {
|
|
task->m_run_now = true;
|
|
append_nolock(task);
|
|
}
|
|
}
|
|
|
|
void
|
|
TaskState::exec()
|
|
{
|
|
THROW_CHECK0(invalid_argument, m_exec_fn);
|
|
THROW_CHECK0(invalid_argument, !m_title.empty());
|
|
|
|
unique_lock<mutex> lock(m_mutex);
|
|
if (m_is_running) {
|
|
append_nolock(shared_from_this());
|
|
return;
|
|
} else {
|
|
m_run_now = false;
|
|
m_is_running = true;
|
|
}
|
|
|
|
TaskStatePtr this_task = shared_from_this();
|
|
swap(this_task, tl_current_task);
|
|
lock.unlock();
|
|
|
|
const auto old_thread_name = pthread_getname();
|
|
pthread_setname(m_title);
|
|
|
|
catch_all([&]() {
|
|
m_exec_fn();
|
|
});
|
|
|
|
pthread_setname(old_thread_name);
|
|
|
|
lock.lock();
|
|
swap(this_task, tl_current_task);
|
|
m_is_running = false;
|
|
|
|
// Splice task post_exec queue at front of local queue
|
|
TaskState::rescue_queue(m_post_exec_queue);
|
|
}
|
|
|
|
string
|
|
TaskState::title() const
|
|
{
|
|
THROW_CHECK0(runtime_error, !m_title.empty());
|
|
return m_title;
|
|
}
|
|
|
|
TaskId
|
|
TaskState::id() const
|
|
{
|
|
return m_id;
|
|
}
|
|
|
|
void
|
|
TaskState::run()
|
|
{
|
|
unique_lock<mutex> lock(m_mutex);
|
|
if (m_run_now) {
|
|
return;
|
|
}
|
|
m_run_now = true;
|
|
TaskMasterState::push_back(shared_from_this());
|
|
}
|
|
|
|
TaskMasterState::TaskMasterState(size_t thread_max) :
|
|
m_thread_max(thread_max),
|
|
m_configured_thread_max(thread_max),
|
|
m_thread_target(thread_max),
|
|
m_load_stats(TaskMaster::LoadStats { 0 })
|
|
{
|
|
}
|
|
|
|
void
|
|
TaskMasterState::start_threads_nolock()
|
|
{
|
|
while (m_threads.size() < m_thread_max && !m_paused) {
|
|
m_threads.insert(make_shared<TaskConsumer>(shared_from_this()));
|
|
}
|
|
}
|
|
|
|
void
|
|
TaskMasterState::start_stop_threads()
|
|
{
|
|
unique_lock<mutex> lock(m_mutex);
|
|
while (m_threads.size() != m_thread_max) {
|
|
if (m_threads.size() < m_thread_max) {
|
|
m_threads.insert(make_shared<TaskConsumer>(shared_from_this()));
|
|
} else if (m_threads.size() > m_thread_max) {
|
|
m_condvar.wait(lock);
|
|
}
|
|
}
|
|
}
|
|
|
|
void
|
|
TaskMasterState::push_back(const TaskStatePtr &task)
|
|
{
|
|
THROW_CHECK0(runtime_error, task);
|
|
unique_lock<mutex> lock(s_tms->m_mutex);
|
|
if (s_tms->m_cancelled) {
|
|
task->clear();
|
|
return;
|
|
}
|
|
s_tms->m_queue.push_back(task);
|
|
s_tms->m_condvar.notify_all();
|
|
s_tms->start_threads_nolock();
|
|
}
|
|
|
|
void
|
|
TaskMasterState::push_front(TaskQueue &queue)
|
|
{
|
|
if (queue.empty()) {
|
|
return;
|
|
}
|
|
unique_lock<mutex> lock(s_tms->m_mutex);
|
|
if (s_tms->m_cancelled) {
|
|
TaskState::clear_queue(queue);
|
|
return;
|
|
}
|
|
s_tms->m_queue.splice(s_tms->m_queue.begin(), queue);
|
|
s_tms->m_condvar.notify_all();
|
|
s_tms->start_threads_nolock();
|
|
}
|
|
|
|
TaskMasterState::~TaskMasterState()
|
|
{
|
|
set_thread_count(0);
|
|
}
|
|
|
|
size_t
|
|
TaskMaster::get_queue_count()
|
|
{
|
|
unique_lock<mutex> lock(s_tms->m_mutex);
|
|
return s_tms->m_queue.size();
|
|
}
|
|
|
|
size_t
|
|
TaskMaster::get_thread_count()
|
|
{
|
|
unique_lock<mutex> lock(s_tms->m_mutex);
|
|
return s_tms->m_threads.size();
|
|
}
|
|
|
|
TaskMaster::LoadStats
|
|
TaskMaster::get_current_load()
|
|
{
|
|
unique_lock<mutex> lock(s_tms->m_mutex);
|
|
return s_tms->m_load_stats;
|
|
}
|
|
|
|
ostream &
|
|
TaskMaster::print_queue(ostream &os)
|
|
{
|
|
unique_lock<mutex> lock(s_tms->m_mutex);
|
|
os << "Queue (size " << s_tms->m_queue.size() << "):" << endl;
|
|
size_t counter = 0;
|
|
for (auto i : s_tms->m_queue) {
|
|
os << "Queue #" << ++counter << " Task ID " << i->id() << " " << i->title() << endl;
|
|
}
|
|
return os << "Queue End" << endl;
|
|
}
|
|
|
|
ostream &
|
|
TaskMaster::print_workers(ostream &os)
|
|
{
|
|
unique_lock<mutex> lock(s_tms->m_mutex);
|
|
os << "Workers (size " << s_tms->m_threads.size() << "):" << endl;
|
|
size_t counter = 0;
|
|
for (auto i : s_tms->m_threads) {
|
|
os << "Worker #" << ++counter << " ";
|
|
auto task = i->current_task_locked();
|
|
if (task) {
|
|
os << "Task ID " << task->id() << " " << task->title();
|
|
} else {
|
|
os << "(idle)";
|
|
}
|
|
os << endl;
|
|
}
|
|
return os << "Workers End" << endl;
|
|
}
|
|
|
|
size_t
|
|
TaskMasterState::calculate_thread_count_nolock()
|
|
{
|
|
if (m_paused) {
|
|
// No threads running while paused or cancelled
|
|
return 0;
|
|
}
|
|
|
|
if (m_load_target == 0) {
|
|
// No limits, no stats, use configured thread count
|
|
return m_configured_thread_max;
|
|
}
|
|
|
|
if (m_configured_thread_max == 0) {
|
|
// Not a lot of choice here, and zeros break the algorithm
|
|
return 0;
|
|
}
|
|
|
|
const double loadavg = getloadavg1();
|
|
|
|
static const double load_exp = exp(-5.0 / 60.0);
|
|
|
|
// Averages are fun, but want to know the load from the last 5 seconds.
|
|
// Invert the load average function:
|
|
// LA = LA * load_exp + N * (1 - load_exp)
|
|
// LA2 - LA1 = LA1 * load_exp + N * (1 - load_exp) - LA1
|
|
// LA2 - LA1 + LA1 = LA1 * load_exp + N * (1 - load_exp)
|
|
// LA2 - LA1 + LA1 - LA1 * load_exp = N * (1 - load_exp)
|
|
// LA2 - LA1 * load_exp = N * (1 - load_exp)
|
|
// LA2 / (1 - load_exp) - (LA1 * load_exp / 1 - load_exp) = N
|
|
// (LA2 - LA1 * load_exp) / (1 - load_exp) = N
|
|
// except for rounding error which might make this just a bit below zero.
|
|
const double current_load = max(0.0, (loadavg - m_prev_loadavg * load_exp) / (1 - load_exp));
|
|
|
|
m_prev_loadavg = loadavg;
|
|
|
|
const double load_deficit = m_load_target - loadavg;
|
|
if (load_deficit > 0) {
|
|
// Load is too low, solve by adding another worker
|
|
m_thread_target += load_deficit / 3;
|
|
} else if (load_deficit < 0) {
|
|
// Load is too high, solve by removing all known excess tasks
|
|
m_thread_target += load_deficit;
|
|
}
|
|
|
|
m_load_stats = TaskMaster::LoadStats {
|
|
.current_load = current_load,
|
|
.thread_target = m_thread_target,
|
|
.loadavg = loadavg,
|
|
};
|
|
|
|
// Cannot exceed configured maximum thread count or less than zero
|
|
m_thread_target = min(max(0.0, m_thread_target), double(m_configured_thread_max));
|
|
|
|
// Convert to integer but keep within range
|
|
const size_t rv = max(m_thread_min, min(size_t(ceil(m_thread_target)), m_configured_thread_max));
|
|
|
|
return rv;
|
|
}
|
|
|
|
void
|
|
TaskMasterState::adjust_thread_count()
|
|
{
|
|
unique_lock<mutex> lock(m_mutex);
|
|
size_t new_thread_max = calculate_thread_count_nolock();
|
|
size_t old_thread_max = m_thread_max;
|
|
m_thread_max = new_thread_max;
|
|
|
|
// If we are reducing the number of threads we have to wake them up so they can exit their loops
|
|
// If we are increasing the number of threads we have to notify start_stop_threads it can stop waiting for threads to stop
|
|
if (new_thread_max != old_thread_max) {
|
|
m_condvar.notify_all();
|
|
start_threads_nolock();
|
|
}
|
|
}
|
|
|
|
void
|
|
TaskMasterState::set_thread_count(size_t thread_max)
|
|
{
|
|
unique_lock<mutex> lock(m_mutex);
|
|
m_configured_thread_max = thread_max;
|
|
lock.unlock();
|
|
adjust_thread_count();
|
|
start_stop_threads();
|
|
}
|
|
|
|
void
|
|
TaskMaster::set_thread_count(size_t thread_max)
|
|
{
|
|
s_tms->set_thread_count(thread_max);
|
|
}
|
|
|
|
void
|
|
TaskMasterState::cancel()
|
|
{
|
|
unique_lock<mutex> lock(m_mutex);
|
|
m_paused = true;
|
|
m_cancelled = true;
|
|
decltype(m_queue) empty_queue;
|
|
m_queue.swap(empty_queue);
|
|
m_condvar.notify_all();
|
|
lock.unlock();
|
|
TaskState::clear_queue(empty_queue);
|
|
}
|
|
|
|
void
|
|
TaskMaster::cancel()
|
|
{
|
|
s_tms->cancel();
|
|
}
|
|
|
|
void
|
|
TaskMasterState::pause(const bool paused)
|
|
{
|
|
unique_lock<mutex> lock(m_mutex);
|
|
m_paused = paused;
|
|
m_condvar.notify_all();
|
|
lock.unlock();
|
|
}
|
|
|
|
void
|
|
TaskMaster::pause(const bool paused)
|
|
{
|
|
s_tms->pause(paused);
|
|
}
|
|
|
|
void
|
|
TaskMasterState::set_thread_min_count(size_t thread_min)
|
|
{
|
|
unique_lock<mutex> lock(m_mutex);
|
|
m_thread_min = thread_min;
|
|
lock.unlock();
|
|
adjust_thread_count();
|
|
start_stop_threads();
|
|
}
|
|
|
|
void
|
|
TaskMaster::set_thread_min_count(size_t thread_min)
|
|
{
|
|
s_tms->set_thread_min_count(thread_min);
|
|
}
|
|
|
|
void
|
|
TaskMasterState::loadavg_thread_fn()
|
|
{
|
|
pthread_setname("load_tracker");
|
|
while (!m_cancelled) {
|
|
adjust_thread_count();
|
|
nanosleep(5.0);
|
|
}
|
|
}
|
|
|
|
void
|
|
TaskMasterState::set_loadavg_target(double target)
|
|
{
|
|
THROW_CHECK1(out_of_range, target, target >= 0);
|
|
|
|
unique_lock<mutex> lock(m_mutex);
|
|
if (m_cancelled) {
|
|
return;
|
|
}
|
|
m_load_target = target;
|
|
m_prev_loadavg = getloadavg1();
|
|
|
|
if (target && !m_load_tracking_thread) {
|
|
m_load_tracking_thread = make_shared<thread>([=] () { loadavg_thread_fn(); });
|
|
m_load_tracking_thread->detach();
|
|
}
|
|
}
|
|
|
|
void
|
|
TaskMaster::set_loadavg_target(double target)
|
|
{
|
|
s_tms->set_loadavg_target(target);
|
|
}
|
|
|
|
void
|
|
TaskMaster::set_thread_count()
|
|
{
|
|
set_thread_count(thread::hardware_concurrency());
|
|
}
|
|
|
|
Task::Task(shared_ptr<TaskState> pts) :
|
|
m_task_state(pts)
|
|
{
|
|
}
|
|
|
|
Task::Task(string title, function<void()> exec_fn) :
|
|
m_task_state(make_shared<TaskState>(title, exec_fn))
|
|
{
|
|
}
|
|
|
|
void
|
|
Task::run() const
|
|
{
|
|
THROW_CHECK0(runtime_error, m_task_state);
|
|
m_task_state->run();
|
|
}
|
|
|
|
void
|
|
Task::append(const Task &that) const
|
|
{
|
|
THROW_CHECK0(runtime_error, m_task_state);
|
|
THROW_CHECK0(runtime_error, that);
|
|
m_task_state->append(that.m_task_state);
|
|
}
|
|
|
|
Task
|
|
Task::current_task()
|
|
{
|
|
return Task(tl_current_task);
|
|
}
|
|
|
|
string
|
|
Task::title() const
|
|
{
|
|
THROW_CHECK0(runtime_error, m_task_state);
|
|
return m_task_state->title();
|
|
}
|
|
|
|
ostream &
|
|
operator<<(ostream &os, const Task &task)
|
|
{
|
|
return os << task.title();
|
|
};
|
|
|
|
TaskId
|
|
Task::id() const
|
|
{
|
|
THROW_CHECK0(runtime_error, m_task_state);
|
|
return m_task_state->id();
|
|
}
|
|
|
|
bool
|
|
Task::operator<(const Task &that) const
|
|
{
|
|
return id() < that.id();
|
|
}
|
|
|
|
Task::operator bool() const
|
|
{
|
|
return !!m_task_state;
|
|
}
|
|
|
|
shared_ptr<TaskState>
|
|
TaskConsumer::current_task_locked()
|
|
{
|
|
return m_current_task;
|
|
}
|
|
|
|
shared_ptr<TaskState>
|
|
TaskConsumer::current_task()
|
|
{
|
|
unique_lock<mutex> lock(m_master->m_mutex);
|
|
return current_task_locked();
|
|
}
|
|
|
|
void
|
|
TaskConsumer::consumer_thread()
|
|
{
|
|
// Keep a copy because we will be destroying *this later
|
|
const auto master_copy = m_master;
|
|
|
|
// Constructor is running with master locked.
|
|
// Wait until that is done before trying to do anything.
|
|
unique_lock<mutex> lock(master_copy->m_mutex);
|
|
|
|
// Detach thread so destructor doesn't call terminate
|
|
m_thread->detach();
|
|
|
|
// Set thread name so it isn't empty or the name of some other thread
|
|
pthread_setname("task_consumer");
|
|
|
|
// It is now safe to access our own shared_ptr
|
|
TaskConsumerPtr this_consumer = shared_from_this();
|
|
swap(this_consumer, tl_current_consumer);
|
|
|
|
while (!master_copy->m_paused) {
|
|
if (master_copy->m_thread_max < master_copy->m_threads.size()) {
|
|
// We are one of too many threads, exit now
|
|
break;
|
|
}
|
|
|
|
if (!m_local_queue.empty()) {
|
|
m_current_task = *m_local_queue.begin();
|
|
m_local_queue.pop_front();
|
|
} else if (!master_copy->m_queue.empty()) {
|
|
m_current_task = *master_copy->m_queue.begin();
|
|
master_copy->m_queue.pop_front();
|
|
} else {
|
|
master_copy->m_condvar.wait(lock);
|
|
continue;
|
|
}
|
|
|
|
// Execute task without lock
|
|
lock.unlock();
|
|
catch_all([&]() {
|
|
m_current_task->exec();
|
|
});
|
|
|
|
// Update m_current_task with lock
|
|
TaskStatePtr hold_task;
|
|
lock.lock();
|
|
swap(hold_task, m_current_task);
|
|
|
|
// Destroy hold_task without lock
|
|
lock.unlock();
|
|
hold_task.reset();
|
|
|
|
// Invariant: lock held
|
|
lock.lock();
|
|
}
|
|
|
|
// There is no longer a current consumer, but hold our own shared
|
|
// state so it's still there in the destructor
|
|
swap(this_consumer, tl_current_consumer);
|
|
assert(!tl_current_consumer);
|
|
|
|
// Release lock to rescue queue (may attempt to queue a new task at TaskMaster).
|
|
// rescue_queue normally sends tasks to the local queue of the current TaskConsumer thread,
|
|
// but we just disconnected ourselves from that.
|
|
lock.unlock();
|
|
TaskState::rescue_queue(m_local_queue);
|
|
|
|
// Hold lock so we can erase ourselves
|
|
lock.lock();
|
|
|
|
// Fun fact: shared_from_this() isn't usable until the constructor returns...
|
|
master_copy->m_threads.erase(shared_from_this());
|
|
master_copy->m_condvar.notify_all();
|
|
}
|
|
|
|
TaskConsumer::TaskConsumer(const shared_ptr<TaskMasterState> &tms) :
|
|
m_master(tms)
|
|
{
|
|
m_thread = make_shared<thread>([=](){ consumer_thread(); });
|
|
}
|
|
|
|
class BarrierState {
|
|
mutex m_mutex;
|
|
set<Task> m_tasks;
|
|
|
|
void release();
|
|
public:
|
|
~BarrierState();
|
|
void insert_task(Task t);
|
|
};
|
|
|
|
void
|
|
BarrierState::release()
|
|
{
|
|
set<Task> tasks_local;
|
|
unique_lock<mutex> lock(m_mutex);
|
|
swap(tasks_local, m_tasks);
|
|
lock.unlock();
|
|
for (const auto &i : tasks_local) {
|
|
i.run();
|
|
}
|
|
}
|
|
|
|
BarrierState::~BarrierState()
|
|
{
|
|
release();
|
|
}
|
|
|
|
void
|
|
BarrierState::insert_task(Task t)
|
|
{
|
|
unique_lock<mutex> lock(m_mutex);
|
|
m_tasks.insert(t);
|
|
}
|
|
|
|
Barrier::Barrier() :
|
|
m_barrier_state(make_shared<BarrierState>())
|
|
{
|
|
}
|
|
|
|
void
|
|
Barrier::insert_task(Task t)
|
|
{
|
|
m_barrier_state->insert_task(t);
|
|
}
|
|
|
|
void
|
|
Barrier::release()
|
|
{
|
|
m_barrier_state.reset();
|
|
}
|
|
|
|
ExclusionLock::ExclusionLock(shared_ptr<Task> owner) :
|
|
m_owner(owner)
|
|
{
|
|
}
|
|
|
|
void
|
|
ExclusionLock::release()
|
|
{
|
|
m_owner.reset();
|
|
}
|
|
|
|
void
|
|
Exclusion::insert_task(const Task &task)
|
|
{
|
|
unique_lock<mutex> lock(m_mutex);
|
|
const auto sp = m_owner.lock();
|
|
lock.unlock();
|
|
if (sp) {
|
|
// If Exclusion is locked then queue task for release;
|
|
sp->append(task);
|
|
} else {
|
|
// otherwise, run the inserted task immediately
|
|
task.run();
|
|
}
|
|
}
|
|
|
|
ExclusionLock
|
|
Exclusion::try_lock(const Task &task)
|
|
{
|
|
unique_lock<mutex> lock(m_mutex);
|
|
const auto sp = m_owner.lock();
|
|
if (sp) {
|
|
if (task) {
|
|
sp->append(task);
|
|
}
|
|
return ExclusionLock();
|
|
} else {
|
|
const auto rv = make_shared<Task>(task);
|
|
m_owner = rv;
|
|
return ExclusionLock(rv);
|
|
}
|
|
}
|
|
|
|
ExclusionLock::operator bool() const
|
|
{
|
|
return !!m_owner;
|
|
}
|
|
|
|
}
|