From e66086516fdb9f9cc2d703fb8101f6116ce169e9 Mon Sep 17 00:00:00 2001 From: Zygo Blaxell Date: Sun, 8 Jul 2018 21:20:15 -0400 Subject: [PATCH] bees: dynamic thread pool size based on system load average Add -g / --loadavg-target parameter to track system load and add or remove bees worker threads dynamically to keep system load close to the loadavg target. Thread count may vary from zero to the maximum specified by -c or -C, and is adjusted every 5 seconds. This is better than implementing a similar load average scheme from outside of the process (though that is still possible) because the in-process load tracker does not disrupt the performance timing feedback mechanisms as a freezer cgroup or SIGSTOP would when controlling bees from outside. The internal load average tracker can also adjust the number of active threads while an external tracker can only choose from the maximum or zero. Also fix a bug where a Task could deadlock waiting for itself to exit if it tries to insert a new Task after the number of worker threads has been set to zero. Also correct usage message for --scan-mode (values are 0..2) since we are touching adjacent lines anyway. Signed-off-by: Zygo Blaxell --- README.md | 11 ++- include/crucible/process.h | 3 + include/crucible/task.h | 3 + lib/process.cc | 34 +++++++++ lib/task.cc | 152 ++++++++++++++++++++++++++++++++----- src/bees-roots.cc | 3 + src/bees.cc | 21 ++++- 7 files changed, 203 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index d7bad82..2021bca 100644 --- a/README.md +++ b/README.md @@ -535,13 +535,19 @@ Command Line Options -------------------- * --thread-count (-c) COUNT - * Specify number of worker threads for scanning. Overrides --thread-factor (-C) - and default/autodetected values. + * Specify maximum number of worker threads for scanning. Overrides + --thread-factor (-C) and default/autodetected values. * --thread-factor (-C) FACTOR * Specify ratio of worker threads to CPU cores. Overridden by --thread-count (-c). Default is 1.0, i.e. 1 worker thread per detected CPU. Use values below 1.0 to leave some cores idle, or above 1.0 if there are more disks than CPUs in the filesystem. +* --loadavg-target (-g) LOADAVG + * Specify load average target for dynamic worker threads. + Threads will be started or stopped subject to the upper limit imposed + by thread-factor and thread-count until the load average is within + +/- 0.5 of LOADAVG. + * --scan-mode (-m) MODE * Specify extent scanning algorithm. Default mode is 0. _EXPERIMENTAL_ feature that may go away. @@ -552,6 +558,7 @@ Command Line Options on non-spinning media when subvols are unrelated. * Mode 2: scan all extents from one subvol at a time. Good sequential read performance for spinning media. Maximizes temporary space usage. + * --timestamps (-t) * Enable timestamps in log output. * --no-timestamps (-T) diff --git a/include/crucible/process.h b/include/crucible/process.h index f986f43..3afcea6 100644 --- a/include/crucible/process.h +++ b/include/crucible/process.h @@ -74,5 +74,8 @@ namespace crucible { typedef ResourceHandle Pid; pid_t gettid(); + double getloadavg1(); + double getloadavg5(); + double getloadavg15(); } #endif // CRUCIBLE_PROCESS_H diff --git a/include/crucible/task.h b/include/crucible/task.h index 9bb8f26..755eae2 100644 --- a/include/crucible/task.h +++ b/include/crucible/task.h @@ -61,6 +61,9 @@ namespace crucible { // Calls set_thread_count with default static void set_thread_count(); + // Creates thread to track load average and adjust thread count dynamically + static void set_loadavg_target(double target); + // Writes the current non-executing Task queue static ostream & print_queue(ostream &); diff --git a/lib/process.cc b/lib/process.cc index a9ce153..9be1070 100644 --- a/lib/process.cc +++ b/lib/process.cc @@ -3,6 +3,7 @@ #include "crucible/chatter.h" #include "crucible/error.h" +#include #include // for gettid() @@ -118,4 +119,37 @@ namespace crucible { return syscall(SYS_gettid); } + double + getloadavg1() + { + double loadavg[1]; + const int rv = ::getloadavg(loadavg, 1); + if (rv != 1) { + THROW_ERRNO("getloadavg(..., 1)"); + } + return loadavg[0]; + } + + double + getloadavg5() + { + double loadavg[2]; + const int rv = ::getloadavg(loadavg, 2); + if (rv != 2) { + THROW_ERRNO("getloadavg(..., 2)"); + } + return loadavg[1]; + } + + double + getloadavg15() + { + double loadavg[3]; + const int rv = ::getloadavg(loadavg, 3); + if (rv != 3) { + THROW_ERRNO("getloadavg(..., 3)"); + } + return loadavg[2]; + } + } diff --git a/lib/task.cc b/lib/task.cc index 4a4a4a9..8a981e4 100644 --- a/lib/task.cc +++ b/lib/task.cc @@ -3,8 +3,10 @@ #include "crucible/cleanup.h" #include "crucible/error.h" #include "crucible/process.h" +#include "crucible/time.h" #include +#include #include #include #include @@ -42,12 +44,22 @@ namespace crucible { list> m_queue; size_t m_thread_max; set> m_threads; + shared_ptr m_load_tracking_thread; + double m_load_target = 0; + double m_prev_loadavg; + size_t m_configured_thread_max; + double m_thread_target; friend class TaskConsumer; friend class TaskMaster; + void start_threads_nolock(); void start_stop_threads(); void set_thread_count(size_t thread_max); + void adjust_thread_count(); + size_t calculate_thread_count_nolock(); + void set_loadavg_target(double target); + void loadavg_thread_fn(); public: ~TaskMasterState(); @@ -69,6 +81,7 @@ namespace crucible { TaskConsumer(weak_ptr tms); shared_ptr current_task(); friend class TaskMaster; + friend class TaskMasterState; }; static shared_ptr s_tms = make_shared(); @@ -118,19 +131,30 @@ namespace crucible { } TaskMasterState::TaskMasterState(size_t thread_max) : - m_thread_max(thread_max) + m_thread_max(thread_max), + m_configured_thread_max(thread_max), + m_thread_target(thread_max) { } + void + TaskMasterState::start_threads_nolock() + { + while (m_threads.size() < m_thread_max) { + m_threads.insert(make_shared(shared_from_this())); + } + } + void TaskMasterState::start_stop_threads() { unique_lock lock(m_mutex); - while (m_threads.size() < m_thread_max) { - m_threads.insert(make_shared(shared_from_this())); - } - while (m_threads.size() > m_thread_max) { - m_condvar.wait(lock); + while (m_threads.size() != m_thread_max) { + if (m_threads.size() < m_thread_max) { + m_threads.insert(make_shared(shared_from_this())); + } else if (m_threads.size() > m_thread_max) { + m_condvar.wait(lock); + } } } @@ -138,20 +162,20 @@ namespace crucible { TaskMasterState::push_back(shared_ptr task) { THROW_CHECK0(runtime_error, task); - s_tms->start_stop_threads(); unique_lock lock(s_tms->m_mutex); s_tms->m_queue.push_back(task); s_tms->m_condvar.notify_all(); + s_tms->start_threads_nolock(); } void TaskMasterState::push_front(shared_ptr task) { THROW_CHECK0(runtime_error, task); - s_tms->start_stop_threads(); unique_lock lock(s_tms->m_mutex); s_tms->m_queue.push_front(task); s_tms->m_condvar.notify_all(); + s_tms->start_threads_nolock(); } TaskMasterState::~TaskMasterState() @@ -197,21 +221,82 @@ namespace crucible { return os << "Workers End" << endl; } + size_t + TaskMasterState::calculate_thread_count_nolock() + { + if (m_load_target == 0) { + // No limits, no stats, use configured thread count + return m_configured_thread_max; + } + + if (m_configured_thread_max == 0) { + // Not a lot of choice here, and zeros break the algorithm + return 0; + } + + const double loadavg = getloadavg1(); + + static const double load_exp = exp(-5.0 / 60.0); + + // Averages are fun, but want to know the load from the last 5 seconds. + // Invert the load average function: + // LA = LA * load_exp + N * (1 - load_exp) + // LA2 - LA1 = LA1 * load_exp + N * (1 - load_exp) - LA1 + // LA2 - LA1 + LA1 = LA1 * load_exp + N * (1 - load_exp) + // LA2 - LA1 + LA1 - LA1 * load_exp = N * (1 - load_exp) + // LA2 - LA1 * load_exp = N * (1 - load_exp) + // LA2 / (1 - load_exp) - (LA1 * load_exp / 1 - load_exp) = N + // (LA2 - LA1 * load_exp) / (1 - load_exp) = N + // except for rounding error which might make this just a bit below zero. + const double current_load = max(0.0, (loadavg - m_prev_loadavg * load_exp) / (1 - load_exp)); + + m_prev_loadavg = loadavg; + + // Change the thread target based on the + // difference between current and desired load + // but don't get too close all at once due to rounding and sample error. + // If m_load_target < 1.0 then we are just doing PWM with one thread. + + if (m_load_target <= 1.0) { + m_thread_target = 1.0; + } else if (m_load_target - current_load >= 1.0) { + m_thread_target += (m_load_target - current_load - 1.0) / 2.0; + } else if (m_load_target < current_load) { + m_thread_target += m_load_target - current_load; + } + + // Cannot exceed configured maximum thread count or less than zero + m_thread_target = min(max(0.0, m_thread_target), double(m_configured_thread_max)); + + // Convert to integer but keep within range + const size_t rv = min(size_t(ceil(m_thread_target)), m_configured_thread_max); + + return rv; + } + + void + TaskMasterState::adjust_thread_count() + { + unique_lock lock(m_mutex); + size_t new_thread_max = calculate_thread_count_nolock(); + size_t old_thread_max = m_thread_max; + m_thread_max = new_thread_max; + + // If we are reducing the number of threads we have to wake them up so they can exit their loops + // If we are increasing the number of threads we have to notify start_stop_threads it can stop waiting for threads to stop + if (new_thread_max != old_thread_max) { + m_condvar.notify_all(); + start_threads_nolock(); + } + } + void TaskMasterState::set_thread_count(size_t thread_max) { unique_lock lock(m_mutex); - - // If we are reducing the number of threads we have to wake them up so they can exit their loops - if (thread_max < m_thread_max) { - m_condvar.notify_all(); - } - - // Lower maximum then release lock - m_thread_max = thread_max; + m_configured_thread_max = thread_max; lock.unlock(); - - // Wait for threads to be stopped or go start them now + adjust_thread_count(); start_stop_threads(); } @@ -221,6 +306,37 @@ namespace crucible { s_tms->set_thread_count(thread_max); } + void + TaskMasterState::loadavg_thread_fn() + { + pthread_setname_np(pthread_self(), "load_tracker"); + while (true) { + adjust_thread_count(); + nanosleep(5.0); + } + } + + void + TaskMasterState::set_loadavg_target(double target) + { + THROW_CHECK1(out_of_range, target, target >= 0); + + unique_lock lock(m_mutex); + m_load_target = target; + m_prev_loadavg = getloadavg1(); + + if (target && !m_load_tracking_thread) { + m_load_tracking_thread = make_shared([=] () { loadavg_thread_fn(); }); + m_load_tracking_thread->detach(); + } + } + + void + TaskMaster::set_loadavg_target(double target) + { + s_tms->set_loadavg_target(target); + } + void TaskMaster::set_thread_count() { diff --git a/src/bees-roots.cc b/src/bees-roots.cc index 2c04cf6..65ba8fc 100644 --- a/src/bees-roots.cc +++ b/src/bees-roots.cc @@ -244,6 +244,7 @@ BeesRoots::transid_max() size_t BeesRoots::crawl_batch(shared_ptr this_crawl) { + BEESNOTE("Crawling batch " << this_crawl->get_state_begin()); auto ctx_copy = m_ctx; size_t batch_count = 0; auto subvol = this_crawl->get_state_begin().m_root; @@ -257,6 +258,7 @@ BeesRoots::crawl_batch(shared_ptr this_crawl) } auto this_hold = this_crawl->hold_state(this_range); auto shared_this_copy = shared_from_this(); + BEESNOTE("Starting task " << this_range); Task(task_title, [ctx_copy, this_hold, this_range, shared_this_copy]() { BEESNOTE("scan_forward " << this_range); ctx_copy->scan_forward(this_range); @@ -357,6 +359,7 @@ BeesRoots::crawl_roots() case SCAN_MODE_COUNT: assert(false); break; } + BEESNOTE("Crawl done"); BEESCOUNT(crawl_done); auto want_transid = m_transid_re.count() + m_transid_factor; diff --git a/src/bees.cc b/src/bees.cc index b32683b..2bf4914 100644 --- a/src/bees.cc +++ b/src/bees.cc @@ -45,7 +45,8 @@ do_cmd_help(char *argv[]) "\t-h, --help\t\tShow this help\n" "\t-c, --thread-count\tWorker thread count (default CPU count * factor)\n" "\t-C, --thread-factor\tWorker thread factor (default " << BEES_DEFAULT_THREAD_FACTOR << ")\n" - "\t-m, --scan-mode\t\tScanning mode (0..1, default 0)\n" + "\t-g, --loadavg-target\t\tTarget load average for worker threads (default is no target)\n" + "\t-m, --scan-mode\t\tScanning mode (0..2, default 0)\n" "\t-t, --timestamps\tShow timestamps in log output (default)\n" "\t-T, --no-timestamps\tOmit timestamps in log output\n" "\t-p, --absolute-paths\tShow absolute paths (default)\n" @@ -652,24 +653,26 @@ bees_main(int argc, char *argv[]) bool chatter_prefix_timestamp = true; double thread_factor = 0; unsigned thread_count = 0; + double load_target = 0; // Parse options int c; while (1) { int option_index = 0; - static struct option long_options[] = { + static const struct option long_options[] = { { "thread-factor", required_argument, NULL, 'C' }, { "strip-paths", no_argument, NULL, 'P' }, { "no-timestamps", no_argument, NULL, 'T' }, { "thread-count", required_argument, NULL, 'c' }, { "help", no_argument, NULL, 'h' }, + { "loadavg-target", required_argument, NULL, 'g' }, { "scan-mode", required_argument, NULL, 'm' }, { "absolute-paths", no_argument, NULL, 'p' }, { "timestamps", no_argument, NULL, 't' }, { "verbose", required_argument, NULL, 'v' }, }; - c = getopt_long(argc, argv, "C:PTc:hm:ptv:", long_options, &option_index); + c = getopt_long(argc, argv, "C:PTc:hg:m:ptv:", long_options, &option_index); if (-1 == c) { break; } @@ -688,6 +691,9 @@ bees_main(int argc, char *argv[]) case 'c': thread_count = stoul(optarg); break; + case 'g': + load_target = stod(optarg); + break; case 'm': BeesRoots::set_scan_mode(static_cast(stoul(optarg))); break; @@ -700,7 +706,8 @@ bees_main(int argc, char *argv[]) case 'v': { int new_log_level = stoul(optarg); - THROW_CHECK1(out_of_range, new_log_level, new_log_level >= 0 || new_log_level <= 8); + THROW_CHECK1(out_of_range, new_log_level, new_log_level <= 8); + THROW_CHECK1(out_of_range, new_log_level, new_log_level >= 0); bees_log_level = new_log_level; BEESLOGNOTICE("log level set to " << bees_log_level); } @@ -739,6 +746,12 @@ bees_main(int argc, char *argv[]) thread_count = max(1U, static_cast(ceil(thread::hardware_concurrency() * thread_factor))); } + if (load_target != 0) { + BEESLOGNOTICE("setting load average target to " << load_target); + } + TaskMaster::set_loadavg_target(load_target); + + BEESLOGNOTICE("setting worker thread pool maximum size to " << load_target); TaskMaster::set_thread_count(thread_count); // Create a context and start crawlers