mirror of
https://github.com/Zygo/bees.git
synced 2025-05-17 13:25:45 +02:00
Task started out as a self-organizing parallel-make algorithm, but ended up becoming a half-broken wait-die algorithm. When a contended object is already locked, Tasks enter a FIFO queue to restart and acquire the lock. This is the "die" part of wait-die (all locks on an Exclusion are non-blocking, so no Task ever does "wait"). The lock queue is FIFO wrt _lock acquisition order_, not _Task age_ as required by the wait-die algorithm. Make it a 25%-broken wait-die algorithm by sorting the Tasks on lock queues in order of Task ID, i.e. oldest-first, or FIFO wrt Task age. This ensures the oldest Task waiting for an object is the one to get it when it becomes available, as expected from the wait-die algorithm. This should reduce the amount of time Tasks spend on the execution queue, and reduce memory usage by avoiding the accumulation of Tasks that cannot make forward progress. Note that turning `TaskQueue` into an ordered container would have undesirable side-effects: * `std::list` has some useful properties wrt stability of object location and cost of splicing. Other containers may not have these, and `std::list` does have a `sort` method. * Some Task objects are created at the beginning and reused continually, but we really do want those Tasks to be executed in FIFO order wrt submission, not Task ID. We can exclude these tasks by only doing the sorting when a Task is queued for an Exclusion object. Signed-off-by: Zygo Blaxell <bees@furryterror.org>
189 lines
5.3 KiB
C++
189 lines
5.3 KiB
C++
#ifndef CRUCIBLE_TASK_H
|
|
#define CRUCIBLE_TASK_H
|
|
|
|
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
|
|
|
|
namespace crucible {
|
|
// NOTE(review): this directive lives in a header, inside namespace crucible,
// so every file that includes this header sees all of std from within
// crucible.  The declarations below depend on it for their unqualified
// names (shared_ptr, weak_ptr, string, function, ostream, mutex).
using namespace std;
|
|
|
|
/// Opaque state behind a Task.  Only forward-declared here; Task refers
/// to it through a shared_ptr, so the definition is not needed by users
/// of this header.
class TaskState;

/// Integer type of the value returned by Task::id().
using TaskId = uint64_t;
|
|
|
|
/// A unit of work to be scheduled by TaskMaster.
|
|
class Task {
|
|
shared_ptr<TaskState> m_task_state;
|
|
|
|
Task(shared_ptr<TaskState> pts);
|
|
|
|
public:
|
|
|
|
/// Create empty Task object.
|
|
Task() = default;
|
|
|
|
/// Create Task object containing closure and description.
|
|
Task(string title, function<void()> exec_fn);
|
|
|
|
/// Schedule Task for at most one future execution.
|
|
/// May run Task in current thread or in other thread.
|
|
/// May run Task before or after returning.
|
|
/// Schedules Task at the end of the global execution queue.
|
|
///
|
|
/// Only one instance of a Task may execute at a time.
|
|
/// If a Task is already scheduled, run() does nothing.
|
|
/// If a Task is already running when a new instance reaches
|
|
/// the front of the queue, the new instance will execute
|
|
/// after the current instance exits.
|
|
void run() const;
|
|
|
|
/// Schedule task to run when no other Task is available.
|
|
void idle() const;
|
|
|
|
/// Schedule Task to run after this Task has run or
|
|
/// been destroyed.
|
|
void append(const Task &task) const;
|
|
|
|
/// Schedule Task to run after this Task has run or
|
|
/// been destroyed, in Task ID order.
|
|
void insert(const Task &task) const;
|
|
|
|
/// Describe Task as text.
|
|
string title() const;
|
|
|
|
/// Returns currently executing task if called from exec_fn.
|
|
/// Usually used to reschedule the currently executing Task.
|
|
static Task current_task();
|
|
|
|
/// Returns number of currently existing Task objects.
|
|
/// Good for spotting leaks.
|
|
static size_t instance_count();
|
|
|
|
/// Ordering operator for containers
|
|
bool operator<(const Task &that) const;
|
|
|
|
/// Null test
|
|
operator bool() const;
|
|
|
|
/// Unique non-repeating(ish) ID for task
|
|
TaskId id() const;
|
|
};
|
|
|
|
/// Stream insertion operator for Task.
/// NOTE(review): presumably writes a textual description of the Task
/// to os and returns os -- the implementation is not visible here.
ostream &operator<<(ostream &os, const Task &task);
|
|
|
|
/// Control and inspection interface for the Task scheduler.
/// Every member is static; the class carries no per-object state.
class TaskMaster {
public:
	/// Blocks until the running thread count reaches this number
	/// @param threads  requested number of running threads
	static void set_thread_count(size_t threads);

	/// Sets minimum thread count when load average tracking enabled
	/// @param min_threads  lower bound for the thread count
	static void set_thread_min_count(size_t min_threads);

	/// Calls set_thread_count with default
	static void set_thread_count();

	/// Creates thread to track load average and adjust thread count dynamically
	/// @param target  load average to aim for
	static void set_loadavg_target(double target);

	/// Writes the current non-executing Task queue
	static ostream & print_queue(ostream &);

	/// Writes the current executing Task for each worker
	static ostream & print_workers(ostream &);

	/// Gets the current number of queued Tasks
	static size_t get_queue_count();

	/// Gets the current number of active workers
	static size_t get_thread_count();

	/// Load tracking statistics, as returned by get_current_load().
	struct LoadStats {
		/// Current load extracted from last two 5-second load average samples
		double current_load;
		/// Target thread count computed from previous thread count and current load
		double thread_target;
		/// Load average for last 60 seconds
		double loadavg;
	};

	/// Gets the current load tracking statistics
	static LoadStats get_current_load();

	/// Drop the current queue and discard new Tasks without
	/// running them.  Currently executing tasks are not
	/// affected (use set_thread_count(0) to wait for those
	/// to complete).
	static void cancel();

	/// Stop running any new Tasks.  All existing
	/// Consumer threads will exit.  Does not affect queue.
	/// Does not wait for threads to exit.  Reversible.
	/// @param paused  true (the default) to pause;
	///                NOTE(review): false presumably resumes -- confirm.
	static void pause(bool paused = true);
};
|
|
|
|
/// Opaque state referred to by Barrier objects.  Only forward-declared
/// here; the definition is not visible in this header.
class BarrierState;
|
|
|
|
/// Barrier delays the execution of one or more Tasks.
|
|
/// The Tasks are executed when the last shared reference to the
|
|
/// BarrierState is released. Copies of Barrier objects refer
|
|
/// to the same Barrier state.
|
|
class Barrier {
|
|
shared_ptr<BarrierState> m_barrier_state;
|
|
|
|
public:
|
|
Barrier();
|
|
|
|
/// Schedule a task for execution when last Barrier is released.
|
|
void insert_task(Task t);
|
|
|
|
/// Release this reference to the barrier state.
|
|
/// Last released reference executes the task.
|
|
/// Barrier can only be released once, after which the
|
|
/// object can no longer be used.
|
|
void release();
|
|
};
|
|
|
|
class ExclusionLock {
|
|
shared_ptr<Task> m_owner;
|
|
ExclusionLock(shared_ptr<Task> owner);
|
|
friend class Exclusion;
|
|
public:
|
|
/// Explicit default constructor because we have other kinds
|
|
ExclusionLock() = default;
|
|
|
|
/// Release this Lock immediately and permanently
|
|
void release();
|
|
|
|
/// Test for locked state
|
|
operator bool() const;
|
|
};
|
|
|
|
class Exclusion {
|
|
mutex m_mutex;
|
|
weak_ptr<Task> m_owner;
|
|
|
|
public:
|
|
/// Attempt to obtain a Lock. If successful, current Task
|
|
/// owns the Lock until the ExclusionLock is released
|
|
/// (it is the ExclusionLock that owns the lock, so it can
|
|
/// be passed to other Tasks or threads, but this is not
|
|
/// recommended practice).
|
|
/// If not successful, the argument Task is appended to the
|
|
/// task that currently holds the lock. Current task is
|
|
/// expected to immediately release any other ExclusionLock
|
|
/// objects it holds, and exit its Task function.
|
|
ExclusionLock try_lock(const Task &task);
|
|
|
|
};
|
|
|
|
/// Wrapper around pthread_setname_np which handles length limits
/// @param name  requested thread name
/// NOTE(review): presumably applies to the calling thread and shortens
/// names that exceed the platform limit -- confirm against the
/// implementation.
void pthread_setname(const string &name);
|
|
|
|
/// Wrapper around pthread_getname_np for symmetry
/// @return the thread name as a string
/// NOTE(review): presumably the name of the calling thread -- confirm
/// against the implementation.
string pthread_getname();
|
|
}
|
|
|
|
#endif // CRUCIBLE_TASK_H
|