1
0
mirror of https://github.com/Zygo/bees.git synced 2025-05-17 21:35:45 +02:00

roots: scan in parallel using Tasks

Distribute incoming extents across a thread pool for faster execution
on multi-core, multi-disk environments.

Switch extent enumeration model to scan extent refs consecutively(ish).

Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
Zygo Blaxell 2018-01-15 23:07:12 -05:00
parent 090d79e13b
commit 055c8d4c75
5 changed files with 109 additions and 29 deletions

View File

@ -488,6 +488,28 @@ of information about the contents of the filesystem through the log file.
There are also some shell wrappers in the `scripts/` directory.
Command Line Options
--------------------
* --thread-count (-c) COUNT
* Specify the number of worker threads for scanning. Overrides --thread-factor (-C)
and the default/autodetected values.
* --thread-factor (-C) FACTOR
* Specify the ratio of worker threads to CPU cores. Overridden by --thread-count (-c).
Default is 1.0, i.e. 1 worker thread per detected CPU core. Use values
below 1.0 to leave some cores idle, or above 1.0 if the filesystem
spans more disks than there are CPU cores.
* --timestamps (-t)
* Enable timestamps in log output.
* --notimestamps (-T)
* Disable timestamps in log output.
* --absolute-paths (-p)
* Paths in log output will be absolute.
* --relative-paths (-P)
* Paths in log output will be relative to the working directory at Bees startup.
Bug Reports and Contributions
-----------------------------

View File

@ -2,6 +2,7 @@
#include "crucible/limits.h"
#include "crucible/string.h"
#include "crucible/task.h"
#include <fstream>
#include <iostream>
@ -94,11 +95,19 @@ BeesContext::dump_status()
ofs << "RATES:\n";
ofs << "\t" << avg_rates << "\n";
ofs << "THREADS:\n";
ofs << "THREADS (work queue " << TaskMaster::get_queue_count() << " tasks):\n";
for (auto t : BeesNote::get_status()) {
ofs << "\ttid " << t.first << ": " << t.second << "\n";
}
#if 0
// Huge amount of data, not a lot of information (yet)
ofs << "WORKERS:\n";
TaskMaster::print_workers(ofs);
ofs << "QUEUE:\n";
TaskMaster::print_queue(ofs);
#endif
ofs.close();
BEESNOTE("renaming status file '" << status_file << "'");
@ -719,6 +728,9 @@ BeesContext::scan_forward(const BeesFileRange &bfr)
e = ew.current();
catch_all([&]() {
uint64_t extent_bytenr = e.bytenr();
BEESNOTE("waiting for extent bytenr " << to_hex(extent_bytenr));
auto extent_lock = m_extent_lock_set.make_lock(extent_bytenr);
Timer one_extent_timer;
return_bfr = scan_one_extent(bfr, e);
BEESCOUNTADD(scanf_extent_ms, one_extent_timer.age() * 1000);
@ -810,7 +822,8 @@ BeesContext::set_root_fd(Fd fd)
m_root_uuid = fsinfo.uuid();
BEESLOG("Filesystem UUID is " << m_root_uuid);
// 65536 is big enough for two max-sized extents
// 65536 is big enough for two max-sized extents.
// Need enough total space in the cache for the maximum number of active threads.
m_resolve_cache.max_size(65536);
m_resolve_cache.func([&](BeesAddress addr) -> BeesResolveAddrResult {
return resolve_addr_uncached(addr);

View File

@ -2,6 +2,7 @@
#include "crucible/cache.h"
#include "crucible/string.h"
#include "crucible/task.h"
#include <fstream>
#include <tuple>
@ -197,17 +198,17 @@ BeesRoots::crawl_roots()
BEESNOTE("Crawling roots");
unique_lock<mutex> lock(m_mutex);
if (m_root_crawl_map.empty()) {
BEESNOTE("idle, crawl map is empty");
m_condvar.wait(lock);
// Don't count the time we were waiting as part of the crawl time
m_crawl_timer.reset();
}
// Work from a copy because BeesCrawl might change the world under us
auto crawl_map_copy = m_root_crawl_map;
lock.unlock();
// Nothing to crawl? Seems suspicious...
if (m_root_crawl_map.empty()) {
BEESLOG("idle: crawl map is empty!");
}
auto ctx_copy = m_ctx;
#if 0
// Scan the same inode/offset tuple in each subvol (good for snapshots)
BeesFileRange first_range;
@ -224,10 +225,13 @@ BeesRoots::crawl_roots()
}
if (first_range) {
catch_all([&]() {
Task([ctx_copy, first_range]() {
// BEESINFO("scan_forward " << first_range);
m_ctx->scan_forward(first_range);
});
ctx_copy->scan_forward(first_range);
},
[first_range](ostream &os) -> ostream & {
return os << "scan_forward " << first_range;
}).run();
BEESCOUNT(crawl_scan);
m_crawl_current = first_crawl->get_state();
auto first_range_popped = first_crawl->pop_front();
@ -241,10 +245,13 @@ BeesRoots::crawl_roots()
auto this_crawl = i.second;
auto this_range = this_crawl->peek_front();
if (this_range) {
catch_all([&]() {
Task([ctx_copy, this_range]() {
// BEESINFO("scan_forward " << this_range);
m_ctx->scan_forward(this_range);
});
ctx_copy->scan_forward(this_range);
},
[this_range](ostream &os) -> ostream & {
return os << "scan_forward " << this_range;
}).run();
crawled = true;
BEESCOUNT(crawl_scan);
m_crawl_current = this_crawl->get_state();
@ -269,12 +276,23 @@ BeesRoots::crawl_roots()
void
BeesRoots::crawl_thread()
{
// TODO: get rid of the thread. For now it is a convenient
// way to avoid the weird things that happen when you try to
// shared_from_this() in a constructor.
BEESNOTE("crawling");
while (1) {
catch_all([&]() {
crawl_roots();
});
}
auto shared_this = shared_from_this();
Task([shared_this]() {
auto tqs = TaskMaster::get_queue_count();
while (tqs < BEES_MAX_QUEUE_SIZE) {
// BEESLOG("Task queue size " << tqs << ", crawling...");
catch_all([&]() {
shared_this->crawl_roots();
});
tqs = TaskMaster::get_queue_count();
}
BEESLOG("Task queue size " << tqs << ", paused");
Task::current_task().run();
}, [](ostream &os) -> ostream& { return os << "crawl task"; }).run();
}
void

View File

@ -3,6 +3,7 @@
#include "crucible/limits.h"
#include "crucible/process.h"
#include "crucible/string.h"
#include "crucible/task.h"
#include <cctype>
#include <cmath>
@ -39,6 +40,8 @@ do_cmd_help(char *argv[])
"\n"
"Options:\n"
"\t-h, --help\t\tShow this help\n"
"\t-c, --thread-count\tWorker thread count (default CPU count * factor)\n"
"\t-C, --thread-factor\tWorker thread factor (default " << BEES_DEFAULT_THREAD_FACTOR << ")\n"
"\t-t, --timestamps\tShow timestamps in log output (default)\n"
"\t-T, --notimestamps\tOmit timestamps in log output\n"
"\t-p, --absolute-paths\tShow absolute paths (default)\n"
@ -613,25 +616,35 @@ bees_main(int argc, char *argv[])
// Defaults
bool chatter_prefix_timestamp = true;
double thread_factor = 0;
unsigned thread_count = 0;
// Parse options
int c;
while (1) {
int option_index = 0;
static struct option long_options[] = {
{ "timestamps", no_argument, NULL, 't' },
{ "notimestamps", no_argument, NULL, 'T' },
{ "absolute-paths", no_argument, NULL, 'p' },
{ "relative-paths", no_argument, NULL, 'P' },
{ "help", no_argument, NULL, 'h' }
{ "thread-count", required_argument, NULL, 'c' },
{ "thread-factor", required_argument, NULL, 'C' },
{ "timestamps", no_argument, NULL, 't' },
{ "notimestamps", no_argument, NULL, 'T' },
{ "absolute-paths", no_argument, NULL, 'p' },
{ "relative-paths", no_argument, NULL, 'P' },
{ "help", no_argument, NULL, 'h' }
};
c = getopt_long(argc, argv, "TtPph", long_options, &option_index);
c = getopt_long(argc, argv, "c:C:TtPph", long_options, &option_index);
if (-1 == c) {
break;
}
switch (c) {
case 'c':
thread_count = stoul(optarg);
break;
case 'C':
thread_factor = stod(optarg);
break;
case 'T':
chatter_prefix_timestamp = false;
break;
@ -668,6 +681,17 @@ bees_main(int argc, char *argv[])
BEESLOG("setrlimit(RLIMIT_NOFILE, { " << lim.rlim_cur << " }): " << strerror(errno));
};
// Set up worker thread pool
THROW_CHECK1(out_of_range, thread_factor, thread_factor >= 0);
if (thread_count < 1) {
if (thread_factor == 0) {
thread_factor = BEES_DEFAULT_THREAD_FACTOR;
}
thread_count = max(1U, static_cast<unsigned>(ceil(thread::hardware_concurrency() * thread_factor)));
}
TaskMaster::set_thread_count(thread_count);
// Create a context and start crawlers
bool did_subscription = false;
while (optind < argc) {

View File

@ -82,8 +82,8 @@ const size_t BEES_ROOT_FD_CACHE_SIZE = 1024;
// Number of FDs to open (rlimit)
const size_t BEES_OPEN_FILE_LIMIT = (BEES_FILE_FD_CACHE_SIZE + BEES_ROOT_FD_CACHE_SIZE) * 2 + 100;
// Worker thread limit (more threads may be created, but only this number will be active concurrently)
const size_t BEES_WORKER_THREAD_LIMIT = 128;
// Worker thread factor (multiplied by detected number of CPU cores)
const double BEES_DEFAULT_THREAD_FACTOR = 1.0;
// Log warnings when an operation takes too long
const double BEES_TOO_LONG = 2.5;
@ -516,7 +516,7 @@ public:
void set_state(const BeesCrawlState &bcs);
};
class BeesRoots {
class BeesRoots : public enable_shared_from_this<BeesRoots> {
shared_ptr<BeesContext> m_ctx;
BeesStringFile m_crawl_state_file;
@ -677,6 +677,8 @@ class BeesContext : public enable_shared_from_this<BeesContext> {
Timer m_total_timer;
LockSet<uint64_t> m_extent_lock_set;
void set_root_fd(Fd fd);
BeesResolveAddrResult resolve_addr_uncached(BeesAddress addr);
@ -714,6 +716,7 @@ public:
shared_ptr<BeesTempFile> tmpfile();
const Timer &total_timer() const { return m_total_timer; }
LockSet<uint64_t> &extent_lock_set() { return m_extent_lock_set; }
// TODO: move the rest of the FD cache methods here
void insert_root_ino(Fd fd);