diff --git a/README.md b/README.md index d7bad82..2021bca 100644 --- a/README.md +++ b/README.md @@ -535,13 +535,19 @@ Command Line Options -------------------- * --thread-count (-c) COUNT - * Specify number of worker threads for scanning. Overrides --thread-factor (-C) - and default/autodetected values. + * Specify maximum number of worker threads for scanning. Overrides + --thread-factor (-C) and default/autodetected values. * --thread-factor (-C) FACTOR * Specify ratio of worker threads to CPU cores. Overridden by --thread-count (-c). Default is 1.0, i.e. 1 worker thread per detected CPU. Use values below 1.0 to leave some cores idle, or above 1.0 if there are more disks than CPUs in the filesystem. +* --loadavg-target (-g) LOADAVG + * Specify load average target for dynamic worker threads. + Threads will be started or stopped subject to the upper limit imposed + by thread-factor and thread-count until the load average is within + +/- 0.5 of LOADAVG. + * --scan-mode (-m) MODE * Specify extent scanning algorithm. Default mode is 0. _EXPERIMENTAL_ feature that may go away. @@ -552,6 +558,7 @@ Command Line Options on non-spinning media when subvols are unrelated. * Mode 2: scan all extents from one subvol at a time. Good sequential read performance for spinning media. Maximizes temporary space usage. + * --timestamps (-t) * Enable timestamps in log output. 
* --no-timestamps (-T) diff --git a/include/crucible/process.h b/include/crucible/process.h index f986f43..3afcea6 100644 --- a/include/crucible/process.h +++ b/include/crucible/process.h @@ -74,5 +74,8 @@ namespace crucible { typedef ResourceHandle Pid; pid_t gettid(); + double getloadavg1(); + double getloadavg5(); + double getloadavg15(); } #endif // CRUCIBLE_PROCESS_H diff --git a/include/crucible/task.h b/include/crucible/task.h index 9bb8f26..755eae2 100644 --- a/include/crucible/task.h +++ b/include/crucible/task.h @@ -61,6 +61,9 @@ namespace crucible { // Calls set_thread_count with default static void set_thread_count(); + // Creates thread to track load average and adjust thread count dynamically + static void set_loadavg_target(double target); + // Writes the current non-executing Task queue static ostream & print_queue(ostream &); diff --git a/lib/process.cc b/lib/process.cc index a9ce153..9be1070 100644 --- a/lib/process.cc +++ b/lib/process.cc @@ -3,6 +3,7 @@ #include "crucible/chatter.h" #include "crucible/error.h" +#include #include // for gettid() @@ -118,4 +119,37 @@ namespace crucible { return syscall(SYS_gettid); } + double + getloadavg1() + { + double loadavg[1]; + const int rv = ::getloadavg(loadavg, 1); + if (rv != 1) { + THROW_ERRNO("getloadavg(..., 1)"); + } + return loadavg[0]; + } + + double + getloadavg5() + { + double loadavg[2]; + const int rv = ::getloadavg(loadavg, 2); + if (rv != 2) { + THROW_ERRNO("getloadavg(..., 2)"); + } + return loadavg[1]; + } + + double + getloadavg15() + { + double loadavg[3]; + const int rv = ::getloadavg(loadavg, 3); + if (rv != 3) { + THROW_ERRNO("getloadavg(..., 3)"); + } + return loadavg[2]; + } + } diff --git a/lib/task.cc b/lib/task.cc index 4a4a4a9..8a981e4 100644 --- a/lib/task.cc +++ b/lib/task.cc @@ -3,8 +3,10 @@ #include "crucible/cleanup.h" #include "crucible/error.h" #include "crucible/process.h" +#include "crucible/time.h" #include +#include #include #include #include @@ -42,12 +44,22 @@ 
namespace crucible { list> m_queue; size_t m_thread_max; set> m_threads; + shared_ptr m_load_tracking_thread; + double m_load_target = 0; + double m_prev_loadavg; + size_t m_configured_thread_max; + double m_thread_target; friend class TaskConsumer; friend class TaskMaster; + void start_threads_nolock(); void start_stop_threads(); void set_thread_count(size_t thread_max); + void adjust_thread_count(); + size_t calculate_thread_count_nolock(); + void set_loadavg_target(double target); + void loadavg_thread_fn(); public: ~TaskMasterState(); @@ -69,6 +81,7 @@ namespace crucible { TaskConsumer(weak_ptr tms); shared_ptr current_task(); friend class TaskMaster; + friend class TaskMasterState; }; static shared_ptr s_tms = make_shared(); @@ -118,19 +131,30 @@ namespace crucible { } TaskMasterState::TaskMasterState(size_t thread_max) : - m_thread_max(thread_max) + m_thread_max(thread_max), + m_configured_thread_max(thread_max), + m_thread_target(thread_max) { } + void + TaskMasterState::start_threads_nolock() + { + while (m_threads.size() < m_thread_max) { + m_threads.insert(make_shared(shared_from_this())); + } + } + void TaskMasterState::start_stop_threads() { unique_lock lock(m_mutex); - while (m_threads.size() < m_thread_max) { - m_threads.insert(make_shared(shared_from_this())); - } - while (m_threads.size() > m_thread_max) { - m_condvar.wait(lock); + while (m_threads.size() != m_thread_max) { + if (m_threads.size() < m_thread_max) { + m_threads.insert(make_shared(shared_from_this())); + } else if (m_threads.size() > m_thread_max) { + m_condvar.wait(lock); + } } } @@ -138,20 +162,20 @@ namespace crucible { TaskMasterState::push_back(shared_ptr task) { THROW_CHECK0(runtime_error, task); - s_tms->start_stop_threads(); unique_lock lock(s_tms->m_mutex); s_tms->m_queue.push_back(task); s_tms->m_condvar.notify_all(); + s_tms->start_threads_nolock(); } void TaskMasterState::push_front(shared_ptr task) { THROW_CHECK0(runtime_error, task); - s_tms->start_stop_threads(); 
unique_lock lock(s_tms->m_mutex); s_tms->m_queue.push_front(task); s_tms->m_condvar.notify_all(); + s_tms->start_threads_nolock(); } TaskMasterState::~TaskMasterState() @@ -197,21 +221,82 @@ namespace crucible { return os << "Workers End" << endl; } + size_t + TaskMasterState::calculate_thread_count_nolock() + { + if (m_load_target == 0) { + // No limits, no stats, use configured thread count + return m_configured_thread_max; + } + + if (m_configured_thread_max == 0) { + // Not a lot of choice here, and zeros break the algorithm + return 0; + } + + const double loadavg = getloadavg1(); + + static const double load_exp = exp(-5.0 / 60.0); + + // Averages are fun, but want to know the load from the last 5 seconds. + // Invert the load average function (LA1 = previous sample, LA2 = current sample): + // LA2 = LA1 * load_exp + N * (1 - load_exp) + // LA2 - LA1 = LA1 * load_exp + N * (1 - load_exp) - LA1 + // LA2 - LA1 + LA1 = LA1 * load_exp + N * (1 - load_exp) + // LA2 - LA1 + LA1 - LA1 * load_exp = N * (1 - load_exp) + // LA2 - LA1 * load_exp = N * (1 - load_exp) + // LA2 / (1 - load_exp) - (LA1 * load_exp / (1 - load_exp)) = N + // (LA2 - LA1 * load_exp) / (1 - load_exp) = N + // except for rounding error which might make this just a bit below zero. + const double current_load = max(0.0, (loadavg - m_prev_loadavg * load_exp) / (1 - load_exp)); + + m_prev_loadavg = loadavg; + + // Change the thread target based on the + // difference between current and desired load + // but don't get too close all at once due to rounding and sample error. + // If m_load_target < 1.0 then we are just doing PWM with one thread. 
+ + if (m_load_target <= 1.0) { + m_thread_target = 1.0; + } else if (m_load_target - current_load >= 1.0) { + m_thread_target += (m_load_target - current_load - 1.0) / 2.0; + } else if (m_load_target < current_load) { + m_thread_target += m_load_target - current_load; + } + + // Cannot exceed configured maximum thread count or less than zero + m_thread_target = min(max(0.0, m_thread_target), double(m_configured_thread_max)); + + // Convert to integer but keep within range + const size_t rv = min(size_t(ceil(m_thread_target)), m_configured_thread_max); + + return rv; + } + + void + TaskMasterState::adjust_thread_count() + { + unique_lock lock(m_mutex); + size_t new_thread_max = calculate_thread_count_nolock(); + size_t old_thread_max = m_thread_max; + m_thread_max = new_thread_max; + + // If we are reducing the number of threads we have to wake them up so they can exit their loops + // If we are increasing the number of threads we have to notify start_stop_threads it can stop waiting for threads to stop + if (new_thread_max != old_thread_max) { + m_condvar.notify_all(); + start_threads_nolock(); + } + } + void TaskMasterState::set_thread_count(size_t thread_max) { unique_lock lock(m_mutex); - - // If we are reducing the number of threads we have to wake them up so they can exit their loops - if (thread_max < m_thread_max) { - m_condvar.notify_all(); - } - - // Lower maximum then release lock - m_thread_max = thread_max; + m_configured_thread_max = thread_max; lock.unlock(); - - // Wait for threads to be stopped or go start them now + adjust_thread_count(); start_stop_threads(); } @@ -221,6 +306,37 @@ namespace crucible { s_tms->set_thread_count(thread_max); } + void + TaskMasterState::loadavg_thread_fn() + { + pthread_setname_np(pthread_self(), "load_tracker"); + while (true) { + adjust_thread_count(); + nanosleep(5.0); + } + } + + void + TaskMasterState::set_loadavg_target(double target) + { + THROW_CHECK1(out_of_range, target, target >= 0); + + unique_lock 
lock(m_mutex); + m_load_target = target; + m_prev_loadavg = getloadavg1(); + + if (target && !m_load_tracking_thread) { + m_load_tracking_thread = make_shared([=] () { loadavg_thread_fn(); }); + m_load_tracking_thread->detach(); + } + } + + void + TaskMaster::set_loadavg_target(double target) + { + s_tms->set_loadavg_target(target); + } + void TaskMaster::set_thread_count() { diff --git a/src/bees-roots.cc b/src/bees-roots.cc index 2c04cf6..65ba8fc 100644 --- a/src/bees-roots.cc +++ b/src/bees-roots.cc @@ -244,6 +244,7 @@ BeesRoots::transid_max() size_t BeesRoots::crawl_batch(shared_ptr this_crawl) { + BEESNOTE("Crawling batch " << this_crawl->get_state_begin()); auto ctx_copy = m_ctx; size_t batch_count = 0; auto subvol = this_crawl->get_state_begin().m_root; @@ -257,6 +258,7 @@ BeesRoots::crawl_batch(shared_ptr this_crawl) } auto this_hold = this_crawl->hold_state(this_range); auto shared_this_copy = shared_from_this(); + BEESNOTE("Starting task " << this_range); Task(task_title, [ctx_copy, this_hold, this_range, shared_this_copy]() { BEESNOTE("scan_forward " << this_range); ctx_copy->scan_forward(this_range); @@ -357,6 +359,7 @@ BeesRoots::crawl_roots() case SCAN_MODE_COUNT: assert(false); break; } + BEESNOTE("Crawl done"); BEESCOUNT(crawl_done); auto want_transid = m_transid_re.count() + m_transid_factor; diff --git a/src/bees.cc b/src/bees.cc index b32683b..2bf4914 100644 --- a/src/bees.cc +++ b/src/bees.cc @@ -45,7 +45,8 @@ do_cmd_help(char *argv[]) "\t-h, --help\t\tShow this help\n" "\t-c, --thread-count\tWorker thread count (default CPU count * factor)\n" "\t-C, --thread-factor\tWorker thread factor (default " << BEES_DEFAULT_THREAD_FACTOR << ")\n" - "\t-m, --scan-mode\t\tScanning mode (0..1, default 0)\n" + "\t-g, --loadavg-target\t\tTarget load average for worker threads (default is no target)\n" + "\t-m, --scan-mode\t\tScanning mode (0..2, default 0)\n" "\t-t, --timestamps\tShow timestamps in log output (default)\n" "\t-T, --no-timestamps\tOmit 
timestamps in log output\n" "\t-p, --absolute-paths\tShow absolute paths (default)\n" @@ -652,24 +653,26 @@ bees_main(int argc, char *argv[]) bool chatter_prefix_timestamp = true; double thread_factor = 0; unsigned thread_count = 0; + double load_target = 0; // Parse options int c; while (1) { int option_index = 0; - static struct option long_options[] = { + static const struct option long_options[] = { { "thread-factor", required_argument, NULL, 'C' }, { "strip-paths", no_argument, NULL, 'P' }, { "no-timestamps", no_argument, NULL, 'T' }, { "thread-count", required_argument, NULL, 'c' }, { "help", no_argument, NULL, 'h' }, + { "loadavg-target", required_argument, NULL, 'g' }, { "scan-mode", required_argument, NULL, 'm' }, { "absolute-paths", no_argument, NULL, 'p' }, { "timestamps", no_argument, NULL, 't' }, { "verbose", required_argument, NULL, 'v' }, }; - c = getopt_long(argc, argv, "C:PTc:hm:ptv:", long_options, &option_index); + c = getopt_long(argc, argv, "C:PTc:hg:m:ptv:", long_options, &option_index); if (-1 == c) { break; } @@ -688,6 +691,9 @@ bees_main(int argc, char *argv[]) case 'c': thread_count = stoul(optarg); break; + case 'g': + load_target = stod(optarg); + break; case 'm': BeesRoots::set_scan_mode(static_cast(stoul(optarg))); break; @@ -700,7 +706,8 @@ bees_main(int argc, char *argv[]) case 'v': { int new_log_level = stoul(optarg); - THROW_CHECK1(out_of_range, new_log_level, new_log_level >= 0 || new_log_level <= 8); + THROW_CHECK1(out_of_range, new_log_level, new_log_level <= 8); + THROW_CHECK1(out_of_range, new_log_level, new_log_level >= 0); bees_log_level = new_log_level; BEESLOGNOTICE("log level set to " << bees_log_level); } @@ -739,6 +746,12 @@ bees_main(int argc, char *argv[]) thread_count = max(1U, static_cast(ceil(thread::hardware_concurrency() * thread_factor))); } + if (load_target != 0) { + BEESLOGNOTICE("setting load average target to " << load_target); + } + TaskMaster::set_loadavg_target(load_target); + + BEESLOGNOTICE("setting 
worker thread pool maximum size to " << thread_count); TaskMaster::set_thread_count(thread_count); // Create a context and start crawlers