1
0
mirror of https://github.com/Zygo/bees.git synced 2025-06-16 09:36:17 +02:00

bees: dynamic thread pool size based on system load average

Add -g / --loadavg-target parameter to track system load and add or
remove bees worker threads dynamically to keep system load close to the
loadavg target.  Thread count may vary from zero to the maximum
specified by -c or -C, and is adjusted every 5 seconds.

This is better than implementing a similar load average scheme from
outside of the process (though that is still possible) because the
in-process load tracker does not disrupt the performance timing feedback
mechanisms as a freezer cgroup or SIGSTOP would when controlling bees
from outside.  The internal load average tracker can also adjust the
number of active threads while an external tracker can only choose from
the maximum or zero.

Also fix a bug where a Task could deadlock waiting for itself to exit
if it tries to insert a new Task after the number of worker threads has
been set to zero.

Also correct usage message for --scan-mode (values are 0..2) since
we are touching adjacent lines anyway.

Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
Zygo Blaxell
2018-07-08 21:20:15 -04:00
parent 96eb100ded
commit e66086516f
7 changed files with 203 additions and 24 deletions

View File

@ -3,8 +3,10 @@
#include "crucible/cleanup.h"
#include "crucible/error.h"
#include "crucible/process.h"
#include "crucible/time.h"
#include <atomic>
#include <cmath>
#include <condition_variable>
#include <list>
#include <map>
@ -42,12 +44,22 @@ namespace crucible {
list<shared_ptr<TaskState>> m_queue;
size_t m_thread_max;
set<shared_ptr<TaskConsumer>> m_threads;
shared_ptr<thread> m_load_tracking_thread;
double m_load_target = 0;
double m_prev_loadavg;
size_t m_configured_thread_max;
double m_thread_target;
friend class TaskConsumer;
friend class TaskMaster;
void start_threads_nolock();
void start_stop_threads();
void set_thread_count(size_t thread_max);
void adjust_thread_count();
size_t calculate_thread_count_nolock();
void set_loadavg_target(double target);
void loadavg_thread_fn();
public:
~TaskMasterState();
@ -69,6 +81,7 @@ namespace crucible {
TaskConsumer(weak_ptr<TaskMasterState> tms);
shared_ptr<TaskState> current_task();
friend class TaskMaster;
friend class TaskMasterState;
};
static shared_ptr<TaskMasterState> s_tms = make_shared<TaskMasterState>();
@ -118,19 +131,30 @@ namespace crucible {
}
TaskMasterState::TaskMasterState(size_t thread_max) :
m_thread_max(thread_max)
m_thread_max(thread_max),
m_configured_thread_max(thread_max),
m_thread_target(thread_max)
{
}
void
TaskMasterState::start_threads_nolock()
{
while (m_threads.size() < m_thread_max) {
m_threads.insert(make_shared<TaskConsumer>(shared_from_this()));
}
}
void
TaskMasterState::start_stop_threads()
{
unique_lock<mutex> lock(m_mutex);
while (m_threads.size() < m_thread_max) {
m_threads.insert(make_shared<TaskConsumer>(shared_from_this()));
}
while (m_threads.size() > m_thread_max) {
m_condvar.wait(lock);
while (m_threads.size() != m_thread_max) {
if (m_threads.size() < m_thread_max) {
m_threads.insert(make_shared<TaskConsumer>(shared_from_this()));
} else if (m_threads.size() > m_thread_max) {
m_condvar.wait(lock);
}
}
}
@ -138,20 +162,20 @@ namespace crucible {
TaskMasterState::push_back(shared_ptr<TaskState> task)
{
THROW_CHECK0(runtime_error, task);
s_tms->start_stop_threads();
unique_lock<mutex> lock(s_tms->m_mutex);
s_tms->m_queue.push_back(task);
s_tms->m_condvar.notify_all();
s_tms->start_threads_nolock();
}
void
TaskMasterState::push_front(shared_ptr<TaskState> task)
{
THROW_CHECK0(runtime_error, task);
s_tms->start_stop_threads();
unique_lock<mutex> lock(s_tms->m_mutex);
s_tms->m_queue.push_front(task);
s_tms->m_condvar.notify_all();
s_tms->start_threads_nolock();
}
TaskMasterState::~TaskMasterState()
@ -197,21 +221,82 @@ namespace crucible {
return os << "Workers End" << endl;
}
size_t
TaskMasterState::calculate_thread_count_nolock()
{
if (m_load_target == 0) {
// No limits, no stats, use configured thread count
return m_configured_thread_max;
}
if (m_configured_thread_max == 0) {
// Not a lot of choice here, and zeros break the algorithm
return 0;
}
const double loadavg = getloadavg1();
static const double load_exp = exp(-5.0 / 60.0);
// Averages are fun, but want to know the load from the last 5 seconds.
// Invert the load average function:
// LA = LA * load_exp + N * (1 - load_exp)
// LA2 - LA1 = LA1 * load_exp + N * (1 - load_exp) - LA1
// LA2 - LA1 + LA1 = LA1 * load_exp + N * (1 - load_exp)
// LA2 - LA1 + LA1 - LA1 * load_exp = N * (1 - load_exp)
// LA2 - LA1 * load_exp = N * (1 - load_exp)
// LA2 / (1 - load_exp) - (LA1 * load_exp / 1 - load_exp) = N
// (LA2 - LA1 * load_exp) / (1 - load_exp) = N
// except for rounding error which might make this just a bit below zero.
const double current_load = max(0.0, (loadavg - m_prev_loadavg * load_exp) / (1 - load_exp));
m_prev_loadavg = loadavg;
// Change the thread target based on the
// difference between current and desired load
// but don't get too close all at once due to rounding and sample error.
// If m_load_target < 1.0 then we are just doing PWM with one thread.
if (m_load_target <= 1.0) {
m_thread_target = 1.0;
} else if (m_load_target - current_load >= 1.0) {
m_thread_target += (m_load_target - current_load - 1.0) / 2.0;
} else if (m_load_target < current_load) {
m_thread_target += m_load_target - current_load;
}
// Cannot exceed configured maximum thread count or less than zero
m_thread_target = min(max(0.0, m_thread_target), double(m_configured_thread_max));
// Convert to integer but keep within range
const size_t rv = min(size_t(ceil(m_thread_target)), m_configured_thread_max);
return rv;
}
void
TaskMasterState::adjust_thread_count()
{
unique_lock<mutex> lock(m_mutex);
size_t new_thread_max = calculate_thread_count_nolock();
size_t old_thread_max = m_thread_max;
m_thread_max = new_thread_max;
// If we are reducing the number of threads we have to wake them up so they can exit their loops
// If we are increasing the number of threads we have to notify start_stop_threads it can stop waiting for threads to stop
if (new_thread_max != old_thread_max) {
m_condvar.notify_all();
start_threads_nolock();
}
}
void
TaskMasterState::set_thread_count(size_t thread_max)
{
unique_lock<mutex> lock(m_mutex);
// If we are reducing the number of threads we have to wake them up so they can exit their loops
if (thread_max < m_thread_max) {
m_condvar.notify_all();
}
// Lower maximum then release lock
m_thread_max = thread_max;
m_configured_thread_max = thread_max;
lock.unlock();
// Wait for threads to be stopped or go start them now
adjust_thread_count();
start_stop_threads();
}
@ -221,6 +306,37 @@ namespace crucible {
s_tms->set_thread_count(thread_max);
}
void
TaskMasterState::loadavg_thread_fn()
{
pthread_setname_np(pthread_self(), "load_tracker");
while (true) {
adjust_thread_count();
nanosleep(5.0);
}
}
void
TaskMasterState::set_loadavg_target(double target)
{
THROW_CHECK1(out_of_range, target, target >= 0);
unique_lock<mutex> lock(m_mutex);
m_load_target = target;
m_prev_loadavg = getloadavg1();
if (target && !m_load_tracking_thread) {
m_load_tracking_thread = make_shared<thread>([=] () { loadavg_thread_fn(); });
m_load_tracking_thread->detach();
}
}
void
TaskMaster::set_loadavg_target(double target)
{
s_tms->set_loadavg_target(target);
}
void
TaskMaster::set_thread_count()
{