1
0
mirror of https://github.com/Zygo/bees.git synced 2025-05-17 21:35:45 +02:00

bees: handle SIGTERM and SIGINT, force immediate flush and exit

Capture SIGINT and SIGTERM and shut down, preserving current completed
crawl and hash table state.

  * Executing tasks are completed, queued tasks are paused.
  * Crawl state is saved.
  * The crawl master and crawl writeback threads are terminated.
  * The task queue is flushed.
  * Dirty hash table extents are flushed.
  * Hash prefetch and writeback threads are terminated.
  * Hash table is deallocated.
  * FD caches and tmpfiles are destroyed.
  * Assuming the above didn't crash or deadlock, bees exits.

The above order isn't the fastest, but it does roughly follow the
shared_ptr dependencies and avoids data races--especially those that
might lead to bees reporting an extent scanned when it was only queued
for future scanning that did not occur.

In case of a violation of expected shared_ptr dependency order,
exceptions in BeesContext child object accessor methods (i.e. roots(),
hash_table(), etc) prevent any further progress in threads that somehow
remain unexpectedly active.

Move some threads from main into BeesContext so they can be stopped
via BeesContext.  The main thread now runs a loop waiting for signals.

A slow FD leak was discovered in TempFile handling.  This has not been
fixed yet, but an implementation detail of the C++ runtime library makes
the leak so slow it may never be important enough to fix.

Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
Zygo Blaxell 2018-12-02 00:51:35 -05:00
parent cbc6725f0f
commit 570b3f7de0
5 changed files with 423 additions and 64 deletions

View File

@ -1,5 +1,6 @@
#include "bees.h"
#include "crucible/cleanup.h"
#include "crucible/limits.h"
#include "crucible/string.h"
#include "crucible/task.h"
@ -13,9 +14,13 @@
// struct rusage
#include <sys/resource.h>
// struct sigset
#include <signal.h>
using namespace crucible;
using namespace std;
static inline
const char *
getenv_or_die(const char *name)
@ -82,17 +87,15 @@ BeesContext::dump_status()
if (!status_charp) return;
string status_file(status_charp);
BEESLOGINFO("Writing status to file '" << status_file << "' every " << BEES_STATUS_INTERVAL << " sec");
while (1) {
BEESNOTE("waiting " << BEES_STATUS_INTERVAL);
sleep(BEES_STATUS_INTERVAL);
Timer total_timer;
while (!m_stop_status) {
BEESNOTE("writing status to file '" << status_file << "'");
ofstream ofs(status_file + ".tmp");
auto thisStats = BeesStats::s_global;
ofs << "TOTAL:\n";
ofs << "\t" << thisStats << "\n";
auto avg_rates = thisStats / m_total_timer.age();
auto avg_rates = thisStats / total_timer.age();
ofs << "RATES:\n";
ofs << "\t" << avg_rates << "\n";
@ -113,42 +116,65 @@ BeesContext::dump_status()
BEESNOTE("renaming status file '" << status_file << "'");
rename((status_file + ".tmp").c_str(), status_file.c_str());
BEESNOTE("idle " << BEES_STATUS_INTERVAL);
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_status) {
return;
}
m_stop_condvar.wait_for(lock, chrono::duration<double>(BEES_STATUS_INTERVAL));
}
}
void
BeesContext::show_progress()
{
auto lastProgressStats = BeesStats::s_global;
auto lastStats = lastProgressStats;
auto lastStats = BeesStats::s_global;
Timer stats_timer;
while (1) {
sleep(BEES_PROGRESS_INTERVAL);
Timer all_timer;
while (!stop_requested()) {
BEESNOTE("idle " << BEES_PROGRESS_INTERVAL);
if (stats_timer.age() > BEES_STATS_INTERVAL) {
stats_timer.lap();
auto thisStats = BeesStats::s_global;
auto avg_rates = lastStats / BEES_STATS_INTERVAL;
BEESLOGINFO("TOTAL: " << thisStats);
BEESLOGINFO("RATES: " << avg_rates);
lastStats = thisStats;
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
return;
}
m_stop_condvar.wait_for(lock, chrono::duration<double>(BEES_PROGRESS_INTERVAL));
BEESLOGINFO("ACTIVITY:");
// Snapshot stats and timer state
auto thisStats = BeesStats::s_global;
auto deltaStats = thisStats - lastProgressStats;
if (deltaStats) {
BEESLOGINFO("\t" << deltaStats / BEES_PROGRESS_INTERVAL);
};
lastProgressStats = thisStats;
auto stats_age = stats_timer.age();
auto all_age = all_timer.age();
stats_timer.lap();
auto avg_rates = thisStats / stats_age;
BEESNOTE("logging event counter totals for last " << all_timer);
BEESLOGINFO("TOTAL COUNTS (" << all_age << "s):\n\t" << thisStats);
BEESNOTE("logging event counter rates for last " << all_timer);
BEESLOGINFO("TOTAL RATES (" << all_age << "s):\n\t" << avg_rates);
BEESNOTE("logging event counter delta counts for last " << stats_age);
BEESLOGINFO("DELTA COUNTS (" << stats_age << "s):");
auto deltaStats = thisStats - lastStats;
BEESLOGINFO("\t" << deltaStats / stats_age);
BEESNOTE("logging event counter delta rates for last " << stats_age);
BEESLOGINFO("DELTA RATES (" << stats_age << "s):");
auto deltaRates = deltaStats / stats_age;
BEESLOGINFO("\t" << deltaRates);
BEESNOTE("logging current thread status");
BEESLOGINFO("THREADS:");
for (auto t : BeesNote::get_status()) {
BEESLOGINFO("\ttid " << t.first << ": " << t.second);
}
lastStats = thisStats;
}
}
@ -171,11 +197,19 @@ BeesContext::home_fd()
}
BeesContext::BeesContext(shared_ptr<BeesContext> parent) :
m_parent_ctx(parent)
m_parent_ctx(parent),
m_progress_thread("progress_report"),
m_status_thread("status_report")
{
if (m_parent_ctx) {
m_fd_cache = m_parent_ctx->fd_cache();
}
m_progress_thread.exec([=]() {
show_progress();
});
m_status_thread.exec([=]() {
dump_status();
});
}
bool
@ -745,7 +779,7 @@ BeesContext::scan_forward(const BeesFileRange &bfr)
Extent e;
catch_all([&]() {
while (true) {
while (!stop_requested()) {
e = ew.current();
catch_all([&]() {
@ -881,11 +915,106 @@ BeesContext::set_root_fd(Fd fd)
m_resolve_cache.func([&](BeesAddress addr) -> BeesResolveAddrResult {
return resolve_addr_uncached(addr);
});
}
// Start queue producers
const char *
BeesHalt::what() const noexcept
{
return "bees stop requested";
}
void
BeesContext::start()
{
BEESLOGNOTICE("Starting bees main loop...");
BEESNOTE("starting BeesContext");
// Force these to exist now so we don't have recursive locking
// operations trying to access them
fd_cache();
hash_table();
// Kick off the crawlers
roots();
}
BEESLOGINFO("returning from set_root_fd in " << name_fd(fd));
void
BeesContext::stop()
{
Timer stop_timer;
BEESLOGNOTICE("Stopping bees...");
BEESLOGWARN("WARNING: This feature is EXPERIMENTAL and may not work!");
BEESNOTE("setting stop_request flag");
BEESNOTE("pausing work queue");
BEESLOGDEBUG("Pausing work queue");
TaskMaster::set_thread_count(0);
BEESLOGDEBUG("Setting stop_request flag");
unique_lock<mutex> lock(m_stop_mutex);
m_stop_requested = true;
m_stop_condvar.notify_all();
lock.unlock();
// Stop crawlers first so we get good progress persisted on disk
BEESNOTE("stopping crawlers");
BEESLOGDEBUG("Stopping crawlers");
if (m_roots) {
m_roots->stop();
m_roots.reset();
} else {
BEESLOGDEBUG("Crawlers not running");
}
BEESNOTE("cancelling work queue");
BEESLOGDEBUG("Cancelling work queue");
TaskMaster::cancel();
BEESNOTE("stopping hash table");
BEESLOGDEBUG("Stopping hash table");
if (m_hash_table) {
m_hash_table->stop();
m_hash_table.reset();
} else {
BEESLOGDEBUG("Hash table not running");
}
BEESNOTE("closing tmpfiles");
BEESLOGDEBUG("Closing tmpfiles");
m_tmpfiles.clear();
BEESNOTE("closing FD caches");
BEESLOGDEBUG("Closing FD caches");
if (m_fd_cache) {
m_fd_cache->clear();
BEESNOTE("destroying FD caches");
BEESLOGDEBUG("Destroying FD caches");
m_fd_cache.reset();
}
BEESNOTE("waiting for progress thread");
BEESLOGDEBUG("Waiting for progress thread");
m_progress_thread.join();
// XXX: nobody can see this BEESNOTE because we are killing the
// thread that publishes it
BEESNOTE("waiting for progress thread");
BEESLOGDEBUG("Waiting for progress thread");
lock.lock();
m_stop_status = true;
m_stop_condvar.notify_all();
lock.unlock();
m_status_thread.join();
BEESLOGNOTICE("bees stopped in " << stop_timer << " sec");
}
bool
BeesContext::stop_requested() const
{
unique_lock<mutex> lock(m_stop_mutex);
return m_stop_requested;
}
void
@ -910,22 +1039,32 @@ BeesContext::is_blacklisted(const BeesFileId &fid) const
shared_ptr<BeesTempFile>
BeesContext::tmpfile()
{
// There need be only one, this is not a high-contention path
static mutex s_mutex;
unique_lock<mutex> lock(s_mutex);
// FIXME: this whole thing leaks FDs (quite slowly). Make a pool instead.
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
throw BeesHalt();
}
if (!m_tmpfiles[this_thread::get_id()]) {
m_tmpfiles[this_thread::get_id()] = make_shared<BeesTempFile>(shared_from_this());
// We know we are the only possible accessor of this,
// so drop the lock to avoid a deadlock loop
lock.unlock();
auto rv = make_shared<BeesTempFile>(shared_from_this());
lock.lock();
m_tmpfiles[this_thread::get_id()] = rv;
}
auto rv = m_tmpfiles[this_thread::get_id()];
return rv;
return m_tmpfiles[this_thread::get_id()];
}
shared_ptr<BeesFdCache>
BeesContext::fd_cache()
{
static mutex s_mutex;
unique_lock<mutex> lock(s_mutex);
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
throw BeesHalt();
}
if (!m_fd_cache) {
m_fd_cache = make_shared<BeesFdCache>();
}
@ -936,8 +1075,10 @@ BeesContext::fd_cache()
shared_ptr<BeesRoots>
BeesContext::roots()
{
static mutex s_mutex;
unique_lock<mutex> lock(s_mutex);
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
throw BeesHalt();
}
if (!m_roots) {
m_roots = make_shared<BeesRoots>(shared_from_this());
}
@ -948,8 +1089,10 @@ BeesContext::roots()
shared_ptr<BeesHashTable>
BeesContext::hash_table()
{
static mutex s_mutex;
unique_lock<mutex> lock(s_mutex);
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
throw BeesHalt();
}
if (!m_hash_table) {
m_hash_table = make_shared<BeesHashTable>(shared_from_this(), "beeshash.dat");
}

View File

@ -130,13 +130,11 @@ BeesHashTable::flush_dirty_extent(uint64_t extent_index)
wrote_extent = true;
});
BEESNOTE("flush rate limited after extent #" << extent_index << " of " << m_extents << " extents");
m_flush_rate_limit.sleep_for(BLOCK_SIZE_HASHTAB_EXTENT);
return wrote_extent;
}
void
BeesHashTable::flush_dirty_extents()
size_t
BeesHashTable::flush_dirty_extents(bool slowly)
{
THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0);
@ -144,12 +142,22 @@ BeesHashTable::flush_dirty_extents()
for (size_t extent_index = 0; extent_index < m_extents; ++extent_index) {
if (flush_dirty_extent(extent_index)) {
++wrote_extents;
if (slowly) {
BEESNOTE("flush rate limited after extent #" << extent_index << " of " << m_extents << " extents");
chrono::duration<double> sleep_time(m_flush_rate_limit.sleep_time(BLOCK_SIZE_HASHTAB_EXTENT));
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
BEESLOGDEBUG("Stop requested in hash table flush_dirty_extents");
break;
}
m_stop_condvar.wait_for(lock, sleep_time);
}
}
}
BEESNOTE("idle after writing " << wrote_extents << " of " << m_extents << " extents");
unique_lock<mutex> lock(m_dirty_mutex);
m_dirty_condvar.wait(lock);
if (!slowly) {
BEESLOGINFO("Flushed " << wrote_extents << " of " << m_extents << " extents");
}
return wrote_extents;
}
void
@ -160,15 +168,29 @@ BeesHashTable::set_extent_dirty_locked(uint64_t extent_index)
// Signal writeback thread
unique_lock<mutex> dirty_lock(m_dirty_mutex);
m_dirty = true;
m_dirty_condvar.notify_one();
}
void
BeesHashTable::writeback_loop()
{
while (true) {
flush_dirty_extents();
while (!m_stop_requested) {
auto wrote_extents = flush_dirty_extents(true);
BEESNOTE("idle after writing " << wrote_extents << " of " << m_extents << " extents");
unique_lock<mutex> lock(m_dirty_mutex);
if (m_stop_requested) {
break;
}
if (m_dirty) {
m_dirty = false;
} else {
m_dirty_condvar.wait(lock);
}
}
BEESLOGDEBUG("Exited hash table writeback_loop");
}
static
@ -186,7 +208,7 @@ void
BeesHashTable::prefetch_loop()
{
bool not_locked = true;
while (true) {
while (!m_stop_requested) {
size_t width = 64;
vector<size_t> occupancy(width, 0);
size_t occupied_count = 0;
@ -196,7 +218,7 @@ BeesHashTable::prefetch_loop()
size_t toxic_count = 0;
size_t unaligned_eof_count = 0;
for (uint64_t ext = 0; ext < m_extents; ++ext) {
for (uint64_t ext = 0; ext < m_extents && !m_stop_requested; ++ext) {
BEESNOTE("prefetching hash table extent #" << ext << " of " << m_extents);
catch_all([&]() {
fetch_missing_extent_by_index(ext);
@ -300,7 +322,7 @@ BeesHashTable::prefetch_loop()
m_stats_file.write(graph_blob.str());
});
if (not_locked) {
if (not_locked && !m_stop_requested) {
// Always do the mlock, whether shared or not
THROW_CHECK1(runtime_error, m_size, m_size > 0);
BEESLOGINFO("mlock(" << pretty(m_size) << ")...");
@ -314,7 +336,12 @@ BeesHashTable::prefetch_loop()
}
BEESNOTE("idle " << BEES_HASH_TABLE_ANALYZE_INTERVAL << "s");
nanosleep(BEES_HASH_TABLE_ANALYZE_INTERVAL);
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
BEESLOGDEBUG("Stop requested in hash table prefetch");
return;
}
m_stop_condvar.wait_for(lock, chrono::duration<double>(BEES_HASH_TABLE_ANALYZE_INTERVAL));
}
}
@ -704,13 +731,50 @@ BeesHashTable::BeesHashTable(shared_ptr<BeesContext> ctx, string filename, off_t
BeesHashTable::~BeesHashTable()
{
BEESLOGDEBUG("Destroy BeesHashTable");
if (m_cell_ptr && m_size) {
flush_dirty_extents();
// Dirty extents should have been flushed before now,
// e.g. in stop(). If that didn't happen, don't fall
// into the same trap (and maybe throw an exception) here.
// flush_dirty_extents(false);
catch_all([&]() {
DIE_IF_NON_ZERO(munmap(m_cell_ptr, m_size));
m_cell_ptr = nullptr;
m_size = 0;
});
}
BEESLOGDEBUG("BeesHashTable destroyed");
}
void
BeesHashTable::stop()
{
BEESNOTE("stopping BeesHashTable threads");
BEESLOGDEBUG("Stopping BeesHashTable threads");
unique_lock<mutex> lock(m_stop_mutex);
m_stop_requested = true;
m_stop_condvar.notify_all();
lock.unlock();
// Wake up hash writeback too
unique_lock<mutex> dirty_lock(m_dirty_mutex);
m_dirty_condvar.notify_all();
dirty_lock.unlock();
BEESNOTE("waiting for hash_prefetch thread");
BEESLOGDEBUG("Waiting for hash_prefetch thread");
m_prefetch_thread.join();
BEESNOTE("waiting for hash_writeback thread");
BEESLOGDEBUG("Waiting for hash_writeback thread");
m_writeback_thread.join();
if (m_cell_ptr && m_size) {
BEESLOGDEBUG("Flushing hash table");
BEESNOTE("flushing hash table");
flush_dirty_extents(false);
}
BEESLOGDEBUG("BeesHashTable stopped");
}

View File

@ -414,7 +414,7 @@ BeesRoots::crawl_thread()
// Monitor transid_max and wake up roots when it changes
BEESNOTE("tracking transid");
auto last_count = m_transid_re.count();
while (true) {
while (!m_stop_requested) {
// Measure current transid
catch_all([&]() {
m_transid_re.update(transid_max_nocache());
@ -441,7 +441,12 @@ BeesRoots::crawl_thread()
auto poll_time = m_transid_re.seconds_for(m_transid_factor);
BEESLOGDEBUG("Polling " << poll_time << "s for next " << m_transid_factor << " transid " << m_transid_re);
BEESNOTE("waiting " << poll_time << "s for next " << m_transid_factor << " transid " << m_transid_re);
nanosleep(poll_time);
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
BEESLOGDEBUG("Stop requested in crawl thread");
break;
}
m_stop_condvar.wait_for(lock, chrono::duration<double>(poll_time));
}
}
@ -456,7 +461,16 @@ BeesRoots::writeback_thread()
state_save();
});
nanosleep(BEES_WRITEBACK_INTERVAL);
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
BEESLOGDEBUG("Stop requested in writeback thread");
catch_all([&]() {
BEESNOTE("flushing crawler state");
state_save();
});
return;
}
m_stop_condvar.wait_for(lock, chrono::duration<double>(BEES_WRITEBACK_INTERVAL));
}
}
@ -574,6 +588,7 @@ BeesRoots::BeesRoots(shared_ptr<BeesContext> ctx) :
catch_all([&]() {
state_load();
});
m_writeback_thread.exec([&]() {
writeback_thread();
});
@ -581,6 +596,29 @@ BeesRoots::BeesRoots(shared_ptr<BeesContext> ctx) :
});
}
void
BeesRoots::stop()
{
BEESLOGDEBUG("BeesRoots stop requested");
BEESNOTE("stopping BeesRoots");
unique_lock<mutex> lock(m_stop_mutex);
m_stop_requested = true;
m_stop_condvar.notify_all();
lock.unlock();
// Stop crawl writeback first because we will break progress
// state tracking when we cancel the TaskMaster queue
BEESLOGDEBUG("Waiting for crawl writeback");
BEESNOTE("waiting for crawl_writeback thread");
m_writeback_thread.join();
BEESLOGDEBUG("Waiting for crawl thread");
BEESNOTE("waiting for crawl_thread thread");
m_crawl_thread.join();
BEESLOGDEBUG("BeesRoots stopped");
}
Fd
BeesRoots::open_root_nocache(uint64_t rootid)
{

View File

@ -531,10 +531,16 @@ BeesTempFile::resize(off_t offset)
BEESCOUNTADD(tmp_resize_ms, resize_timer.age() * 1000);
}
BeesTempFile::~BeesTempFile()
{
BEESLOGDEBUG("Destructing BeesTempFile " << this);
}
BeesTempFile::BeesTempFile(shared_ptr<BeesContext> ctx) :
m_ctx(ctx),
m_end_offset(0)
{
BEESLOGDEBUG("Constructing BeesTempFile " << this);
create();
}
@ -640,6 +646,77 @@ BeesTempFile::make_copy(const BeesFileRange &src)
return rv;
}
static
ostream &
operator<<(ostream &os, const siginfo_t &si)
{
return os << "siginfo_t { "
<< "signo = " << si.si_signo << " (" << signal_ntoa(si.si_signo) << "), "
<< "errno = " << si.si_errno << ", "
<< "code = " << si.si_code << ", "
// << "trapno = " << si.si_trapno << ", "
<< "pid = " << si.si_pid << ", "
<< "uid = " << si.si_uid << ", "
<< "status = " << si.si_status << ", "
<< "utime = " << si.si_utime << ", "
<< "stime = " << si.si_stime << ", "
// << "value = " << si.si_value << ", "
<< "int = " << si.si_int << ", "
<< "ptr = " << si.si_ptr << ", "
<< "overrun = " << si.si_overrun << ", "
<< "timerid = " << si.si_timerid << ", "
<< "addr = " << si.si_addr << ", "
<< "band = " << si.si_band << ", "
<< "fd = " << si.si_fd << ", "
<< "addr_lsb = " << si.si_addr_lsb << ", "
<< "lower = " << si.si_lower << ", "
<< "upper = " << si.si_upper << ", "
// << "pkey = " << si.si_pkey << ", "
<< "call_addr = " << si.si_call_addr << ", "
<< "syscall = " << si.si_syscall << ", "
<< "arch = " << si.si_arch
<< " }";
}
static sigset_t new_sigset, old_sigset;
void
block_term_signal()
{
BEESLOGDEBUG("Masking signals");
DIE_IF_NON_ZERO(sigemptyset(&new_sigset));
DIE_IF_NON_ZERO(sigaddset(&new_sigset, SIGTERM));
DIE_IF_NON_ZERO(sigaddset(&new_sigset, SIGINT));
DIE_IF_NON_ZERO(sigprocmask(SIG_BLOCK, &new_sigset, &old_sigset));
}
void
wait_for_term_signal()
{
BEESNOTE("waiting for signals");
BEESLOGDEBUG("Waiting for signals...");
siginfo_t info;
// Ironically, sigwaitinfo can be interrupted by a signal.
while (true) {
const int rv = sigwaitinfo(&new_sigset, &info);
if (rv == -1) {
if (errno == EINTR) {
BEESLOGDEBUG("Restarting sigwaitinfo");
continue;
}
THROW_ERRNO("sigwaitinfo errno = " << errno);
} else {
BEESLOGNOTICE("Received signal " << rv << " info " << info);
// Unblock so we die immediately if signalled again
DIE_IF_NON_ZERO(sigprocmask(SIG_BLOCK, &old_sigset, &new_sigset));
break;
}
}
BEESLOGDEBUG("Signal catcher exiting");
}
int
bees_main(int argc, char *argv[])
{
@ -656,6 +733,10 @@ bees_main(int argc, char *argv[])
THROW_CHECK1(invalid_argument, argc, argc >= 0);
// Have to block signals now before we create a bunch of threads
// so the threads will also have the signals blocked.
block_term_signal();
// Create a context so we can apply configuration to it
shared_ptr<BeesContext> bc = make_shared<BeesContext>();
@ -813,12 +894,14 @@ bees_main(int argc, char *argv[])
// Create a context and start crawlers
bc->set_root_path(argv[optind++]);
BeesThread status_thread("status", [&]() {
bc->dump_status();
});
// Start crawlers
bc->start();
// Now we just wait forever
bc->show_progress();
wait_for_term_signal();
// Shut it down
bc->stop();
// That is all.
return EXIT_SUCCESS;

View File

@ -410,6 +410,8 @@ public:
BeesHashTable(shared_ptr<BeesContext> ctx, string filename, off_t size = BLOCK_SIZE_HASHTAB_EXTENT);
~BeesHashTable();
void stop();
vector<Cell> find_cell(HashType hash);
bool push_random_hash_addr(HashType hash, AddrType addr);
void erase_hash_addr(HashType hash, AddrType addr);
@ -444,6 +446,12 @@ private:
// Mutex/condvar for the writeback thread
mutex m_dirty_mutex;
condition_variable m_dirty_condvar;
bool m_dirty;
// Mutex/condvar to stop
mutex m_stop_mutex;
condition_variable m_stop_condvar;
bool m_stop_requested = false;
// Per-extent structures
struct ExtentMetaData {
@ -463,7 +471,7 @@ private:
void fetch_missing_extent_by_hash(HashType hash);
void fetch_missing_extent_by_index(uint64_t extent_index);
void set_extent_dirty_locked(uint64_t extent_index);
void flush_dirty_extents();
size_t flush_dirty_extents(bool slowly);
bool flush_dirty_extent(uint64_t extent_index);
size_t hash_to_extent_index(HashType ht);
@ -529,6 +537,10 @@ class BeesRoots : public enable_shared_from_this<BeesRoots> {
bool m_workaround_btrfs_send = false;
LRUCache<bool, uint64_t> m_root_ro_cache;
mutex m_stop_mutex;
condition_variable m_stop_condvar;
bool m_stop_requested = false;
void insert_new_crawl();
void insert_root(const BeesCrawlState &bcs);
Fd open_root_nocache(uint64_t root);
@ -557,6 +569,9 @@ friend class BeesCrawl;
public:
BeesRoots(shared_ptr<BeesContext> ctx);
void start();
void stop();
Fd open_root(uint64_t root);
Fd open_root_ino(uint64_t root, uint64_t ino);
Fd open_root_ino(const BeesFileId &bfi) { return open_root_ino(bfi.root(), bfi.ino()); }
@ -651,6 +666,7 @@ class BeesTempFile {
void resize(off_t new_end_offset);
public:
~BeesTempFile();
BeesTempFile(shared_ptr<BeesContext> ctx);
BeesFileRange make_hole(off_t count);
BeesFileRange make_copy(const BeesFileRange &src);
@ -677,6 +693,10 @@ struct BeesResolveAddrResult {
bool is_toxic() const { return m_is_toxic; }
};
struct BeesHalt : exception {
const char *what() const noexcept override;
};
class BeesContext : public enable_shared_from_this<BeesContext> {
shared_ptr<BeesContext> m_parent_ctx;
@ -685,7 +705,6 @@ class BeesContext : public enable_shared_from_this<BeesContext> {
shared_ptr<BeesFdCache> m_fd_cache;
shared_ptr<BeesHashTable> m_hash_table;
shared_ptr<BeesRoots> m_roots;
map<thread::id, shared_ptr<BeesTempFile>> m_tmpfiles;
LRUCache<BeesResolveAddrResult, BeesAddress> m_resolve_cache;
@ -701,6 +720,14 @@ class BeesContext : public enable_shared_from_this<BeesContext> {
LockSet<uint64_t> m_extent_lock_set;
mutable mutex m_stop_mutex;
condition_variable m_stop_condvar;
bool m_stop_requested = false;
bool m_stop_status = false;
BeesThread m_progress_thread;
BeesThread m_status_thread;
void set_root_fd(Fd fd);
BeesResolveAddrResult resolve_addr_uncached(BeesAddress addr);
@ -733,6 +760,10 @@ public:
void dump_status();
void show_progress();
void start();
void stop();
bool stop_requested() const;
shared_ptr<BeesFdCache> fd_cache();
shared_ptr<BeesHashTable> hash_table();
shared_ptr<BeesRoots> roots();