diff --git a/src/bees-context.cc b/src/bees-context.cc index 127c051..85418ab 100644 --- a/src/bees-context.cc +++ b/src/bees-context.cc @@ -1,5 +1,6 @@ #include "bees.h" +#include "crucible/cleanup.h" #include "crucible/limits.h" #include "crucible/string.h" #include "crucible/task.h" @@ -13,9 +14,13 @@ // struct rusage #include +// struct sigset +#include + using namespace crucible; using namespace std; + static inline const char * getenv_or_die(const char *name) @@ -82,17 +87,15 @@ BeesContext::dump_status() if (!status_charp) return; string status_file(status_charp); BEESLOGINFO("Writing status to file '" << status_file << "' every " << BEES_STATUS_INTERVAL << " sec"); - while (1) { - BEESNOTE("waiting " << BEES_STATUS_INTERVAL); - sleep(BEES_STATUS_INTERVAL); - + Timer total_timer; + while (!m_stop_status) { BEESNOTE("writing status to file '" << status_file << "'"); ofstream ofs(status_file + ".tmp"); auto thisStats = BeesStats::s_global; ofs << "TOTAL:\n"; ofs << "\t" << thisStats << "\n"; - auto avg_rates = thisStats / m_total_timer.age(); + auto avg_rates = thisStats / total_timer.age(); ofs << "RATES:\n"; ofs << "\t" << avg_rates << "\n"; @@ -113,42 +116,65 @@ BeesContext::dump_status() BEESNOTE("renaming status file '" << status_file << "'"); rename((status_file + ".tmp").c_str(), status_file.c_str()); + + BEESNOTE("idle " << BEES_STATUS_INTERVAL); + unique_lock lock(m_stop_mutex); + if (m_stop_status) { + return; + } + m_stop_condvar.wait_for(lock, chrono::duration(BEES_STATUS_INTERVAL)); } } void BeesContext::show_progress() { - auto lastProgressStats = BeesStats::s_global; - auto lastStats = lastProgressStats; + auto lastStats = BeesStats::s_global; Timer stats_timer; - while (1) { - sleep(BEES_PROGRESS_INTERVAL); + Timer all_timer; + while (!stop_requested()) { + BEESNOTE("idle " << BEES_PROGRESS_INTERVAL); - if (stats_timer.age() > BEES_STATS_INTERVAL) { - stats_timer.lap(); - - auto thisStats = BeesStats::s_global; - auto avg_rates = lastStats / BEES_STATS_INTERVAL; - BEESLOGINFO("TOTAL: " << thisStats); - BEESLOGINFO("RATES: " << avg_rates); - lastStats = thisStats; + unique_lock lock(m_stop_mutex); + if (m_stop_requested) { + return; } + m_stop_condvar.wait_for(lock, chrono::duration(BEES_PROGRESS_INTERVAL)); - BEESLOGINFO("ACTIVITY:"); - + // Snapshot stats and timer state auto thisStats = BeesStats::s_global; - auto deltaStats = thisStats - lastProgressStats; - if (deltaStats) { - BEESLOGINFO("\t" << deltaStats / BEES_PROGRESS_INTERVAL); - }; - lastProgressStats = thisStats; + auto stats_age = stats_timer.age(); + auto all_age = all_timer.age(); + stats_timer.lap(); + auto avg_rates = thisStats / stats_age; + + BEESNOTE("logging event counter totals for last " << all_timer); + BEESLOGINFO("TOTAL COUNTS (" << all_age << "s):\n\t" << thisStats); + + BEESNOTE("logging event counter rates for last " << all_timer); + BEESLOGINFO("TOTAL RATES (" << all_age << "s):\n\t" << avg_rates); + + BEESNOTE("logging event counter delta counts for last " << stats_age); + BEESLOGINFO("DELTA COUNTS (" << stats_age << "s):"); + + auto deltaStats = thisStats - lastStats; + BEESLOGINFO("\t" << deltaStats / stats_age); + + BEESNOTE("logging event counter delta rates for last " << stats_age); + BEESLOGINFO("DELTA RATES (" << stats_age << "s):"); + + auto deltaRates = deltaStats / stats_age; + BEESLOGINFO("\t" << deltaRates); + + BEESNOTE("logging current thread status"); BEESLOGINFO("THREADS:"); for (auto t : BeesNote::get_status()) { BEESLOGINFO("\ttid " << t.first << ": " << t.second); } + + lastStats = thisStats; } } @@ -171,11 +197,19 @@ BeesContext::home_fd() } BeesContext::BeesContext(shared_ptr parent) : - m_parent_ctx(parent) + m_parent_ctx(parent), + m_progress_thread("progress_report"), + m_status_thread("status_report") { if (m_parent_ctx) { m_fd_cache = m_parent_ctx->fd_cache(); } + m_progress_thread.exec([=]() { + show_progress(); + }); + m_status_thread.exec([=]() { + dump_status(); + }); } bool @@ -745,7 +779,7 @@ BeesContext::scan_forward(const BeesFileRange &bfr) Extent e; catch_all([&]() { - while (true) { + while (!stop_requested()) { e = ew.current(); catch_all([&]() { @@ -881,11 +915,106 @@ BeesContext::set_root_fd(Fd fd) m_resolve_cache.func([&](BeesAddress addr) -> BeesResolveAddrResult { return resolve_addr_uncached(addr); }); +} - // Start queue producers +const char * +BeesHalt::what() const noexcept +{ + return "bees stop requested"; +} + +void +BeesContext::start() +{ + BEESLOGNOTICE("Starting bees main loop..."); + BEESNOTE("starting BeesContext"); + + // Force these to exist now so we don't have recursive locking + // operations trying to access them + fd_cache(); + hash_table(); + + // Kick off the crawlers roots(); +} - BEESLOGINFO("returning from set_root_fd in " << name_fd(fd)); +void +BeesContext::stop() +{ + Timer stop_timer; + BEESLOGNOTICE("Stopping bees..."); + BEESLOGWARN("WARNING: This feature is EXPERIMENTAL and may not work!"); + + BEESNOTE("setting stop_request flag"); + + BEESNOTE("pausing work queue"); + BEESLOGDEBUG("Pausing work queue"); + TaskMaster::set_thread_count(0); + + BEESLOGDEBUG("Setting stop_request flag"); + unique_lock lock(m_stop_mutex); + m_stop_requested = true; + m_stop_condvar.notify_all(); + lock.unlock(); + + // Stop crawlers first so we get good progress persisted on disk + BEESNOTE("stopping crawlers"); + BEESLOGDEBUG("Stopping crawlers"); + if (m_roots) { + m_roots->stop(); + m_roots.reset(); + } else { + BEESLOGDEBUG("Crawlers not running"); + } + + BEESNOTE("cancelling work queue"); + BEESLOGDEBUG("Cancelling work queue"); + TaskMaster::cancel(); + + BEESNOTE("stopping hash table"); + BEESLOGDEBUG("Stopping hash table"); + if (m_hash_table) { + m_hash_table->stop(); + m_hash_table.reset(); + } else { + BEESLOGDEBUG("Hash table not running"); + } + + BEESNOTE("closing tmpfiles"); + BEESLOGDEBUG("Closing tmpfiles"); + m_tmpfiles.clear(); + + BEESNOTE("closing FD caches"); + BEESLOGDEBUG("Closing FD caches"); + if (m_fd_cache) { + m_fd_cache->clear(); + BEESNOTE("destroying FD caches"); + BEESLOGDEBUG("Destroying FD caches"); + m_fd_cache.reset(); + } + + BEESNOTE("waiting for progress thread"); + BEESLOGDEBUG("Waiting for progress thread"); + m_progress_thread.join(); + + // XXX: nobody can see this BEESNOTE because we are killing the + // thread that publishes it + BEESNOTE("waiting for progress thread"); + BEESLOGDEBUG("Waiting for progress thread"); + lock.lock(); + m_stop_status = true; + m_stop_condvar.notify_all(); + lock.unlock(); + m_status_thread.join(); + + BEESLOGNOTICE("bees stopped in " << stop_timer << " sec"); +} + +bool +BeesContext::stop_requested() const +{ + unique_lock lock(m_stop_mutex); + return m_stop_requested; } void @@ -910,22 +1039,32 @@ BeesContext::is_blacklisted(const BeesFileId &fid) const shared_ptr BeesContext::tmpfile() { - // There need be only one, this is not a high-contention path - static mutex s_mutex; - unique_lock lock(s_mutex); + // FIXME: this whole thing leaks FDs (quite slowly). Make a pool instead. + + unique_lock lock(m_stop_mutex); + + if (m_stop_requested) { + throw BeesHalt(); + } if (!m_tmpfiles[this_thread::get_id()]) { - m_tmpfiles[this_thread::get_id()] = make_shared(shared_from_this()); + // We know we are the only possible accessor of this, + // so drop the lock to avoid a deadlock loop + lock.unlock(); + auto rv = make_shared(shared_from_this()); + lock.lock(); + m_tmpfiles[this_thread::get_id()] = rv; } - auto rv = m_tmpfiles[this_thread::get_id()]; - return rv; + return m_tmpfiles[this_thread::get_id()]; } shared_ptr BeesContext::fd_cache() { - static mutex s_mutex; - unique_lock lock(s_mutex); + unique_lock lock(m_stop_mutex); + if (m_stop_requested) { + throw BeesHalt(); + } if (!m_fd_cache) { m_fd_cache = make_shared(); } @@ -936,8 +1075,10 @@ BeesContext::fd_cache() shared_ptr BeesContext::roots() { - static mutex s_mutex; - unique_lock lock(s_mutex); + unique_lock lock(m_stop_mutex); + if (m_stop_requested) { + throw BeesHalt(); + } if (!m_roots) { m_roots = make_shared(shared_from_this()); } @@ -948,8 +1089,10 @@ BeesContext::roots() shared_ptr BeesContext::hash_table() { - static mutex s_mutex; - unique_lock lock(s_mutex); + unique_lock lock(m_stop_mutex); + if (m_stop_requested) { + throw BeesHalt(); + } if (!m_hash_table) { m_hash_table = make_shared(shared_from_this(), "beeshash.dat"); } diff --git a/src/bees-hash.cc b/src/bees-hash.cc index 45a46ef..31d99b3 100644 --- a/src/bees-hash.cc +++ b/src/bees-hash.cc @@ -130,13 +130,11 @@ BeesHashTable::flush_dirty_extent(uint64_t extent_index) wrote_extent = true; }); - BEESNOTE("flush rate limited after extent #" << extent_index << " of " << m_extents << " extents"); - m_flush_rate_limit.sleep_for(BLOCK_SIZE_HASHTAB_EXTENT); return wrote_extent; } -void -BeesHashTable::flush_dirty_extents() +size_t +BeesHashTable::flush_dirty_extents(bool slowly) { THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0); @@ -144,12 +142,22 @@ BeesHashTable::flush_dirty_extents() for (size_t extent_index = 0; extent_index < m_extents; ++extent_index) { if (flush_dirty_extent(extent_index)) { ++wrote_extents; + if (slowly) { + BEESNOTE("flush rate limited after extent #" << extent_index << " of " << m_extents << " extents"); + chrono::duration sleep_time(m_flush_rate_limit.sleep_time(BLOCK_SIZE_HASHTAB_EXTENT)); + unique_lock lock(m_stop_mutex); + if (m_stop_requested) { + BEESLOGDEBUG("Stop requested in hash table flush_dirty_extents"); + break; + } + m_stop_condvar.wait_for(lock, sleep_time); + } } } - - BEESNOTE("idle after writing " << wrote_extents << " of " << m_extents << " extents"); - unique_lock lock(m_dirty_mutex); - m_dirty_condvar.wait(lock); + if (!slowly) { + BEESLOGINFO("Flushed " << wrote_extents << " of " << m_extents << " extents"); + } + return wrote_extents; } void @@ -160,15 +168,29 @@ BeesHashTable::set_extent_dirty_locked(uint64_t extent_index) // Signal writeback thread unique_lock dirty_lock(m_dirty_mutex); + m_dirty = true; m_dirty_condvar.notify_one(); } void BeesHashTable::writeback_loop() { - while (true) { - flush_dirty_extents(); + while (!m_stop_requested) { + auto wrote_extents = flush_dirty_extents(true); + + BEESNOTE("idle after writing " << wrote_extents << " of " << m_extents << " extents"); + + unique_lock lock(m_dirty_mutex); + if (m_stop_requested) { + break; + } + if (m_dirty) { + m_dirty = false; + } else { + m_dirty_condvar.wait(lock); + } } + BEESLOGDEBUG("Exited hash table writeback_loop"); } static @@ -186,7 +208,7 @@ void BeesHashTable::prefetch_loop() { bool not_locked = true; - while (true) { + while (!m_stop_requested) { size_t width = 64; vector occupancy(width, 0); size_t occupied_count = 0; @@ -196,7 +218,7 @@ BeesHashTable::prefetch_loop() size_t toxic_count = 0; size_t unaligned_eof_count = 0; - for (uint64_t ext = 0; ext < m_extents; ++ext) { + for (uint64_t ext = 0; ext < m_extents && !m_stop_requested; ++ext) { BEESNOTE("prefetching hash table extent #" << ext << " of " << m_extents); catch_all([&]() { fetch_missing_extent_by_index(ext); @@ -300,7 +322,7 @@ BeesHashTable::prefetch_loop() m_stats_file.write(graph_blob.str()); }); - if (not_locked) { + if (not_locked && !m_stop_requested) { // Always do the mlock, whether shared or not THROW_CHECK1(runtime_error, m_size, m_size > 0); BEESLOGINFO("mlock(" << pretty(m_size) << ")..."); @@ -314,7 +336,12 @@ BeesHashTable::prefetch_loop() } BEESNOTE("idle " << BEES_HASH_TABLE_ANALYZE_INTERVAL << "s"); - nanosleep(BEES_HASH_TABLE_ANALYZE_INTERVAL); + unique_lock lock(m_stop_mutex); + if (m_stop_requested) { + BEESLOGDEBUG("Stop requested in hash table prefetch"); + return; + } + m_stop_condvar.wait_for(lock, chrono::duration(BEES_HASH_TABLE_ANALYZE_INTERVAL)); } } @@ -704,13 +731,50 @@ BeesHashTable::BeesHashTable(shared_ptr ctx, string filename, off_t BeesHashTable::~BeesHashTable() { + BEESLOGDEBUG("Destroy BeesHashTable"); if (m_cell_ptr && m_size) { - flush_dirty_extents(); + // Dirty extents should have been flushed before now, + // e.g. in stop(). If that didn't happen, don't fall + // into the same trap (and maybe throw an exception) here. + // flush_dirty_extents(false); catch_all([&]() { DIE_IF_NON_ZERO(munmap(m_cell_ptr, m_size)); m_cell_ptr = nullptr; m_size = 0; }); } + BEESLOGDEBUG("BeesHashTable destroyed"); } +void +BeesHashTable::stop() +{ + BEESNOTE("stopping BeesHashTable threads"); + BEESLOGDEBUG("Stopping BeesHashTable threads"); + + unique_lock lock(m_stop_mutex); + m_stop_requested = true; + m_stop_condvar.notify_all(); + lock.unlock(); + + // Wake up hash writeback too + unique_lock dirty_lock(m_dirty_mutex); + m_dirty_condvar.notify_all(); + dirty_lock.unlock(); + + BEESNOTE("waiting for hash_prefetch thread"); + BEESLOGDEBUG("Waiting for hash_prefetch thread"); + m_prefetch_thread.join(); + + BEESNOTE("waiting for hash_writeback thread"); + BEESLOGDEBUG("Waiting for hash_writeback thread"); + m_writeback_thread.join(); + + if (m_cell_ptr && m_size) { + BEESLOGDEBUG("Flushing hash table"); + BEESNOTE("flushing hash table"); + flush_dirty_extents(false); + } + + BEESLOGDEBUG("BeesHashTable stopped"); +} diff --git a/src/bees-roots.cc b/src/bees-roots.cc index cf471fa..8e05a3e 100644 --- a/src/bees-roots.cc +++ b/src/bees-roots.cc @@ -414,7 +414,7 @@ BeesRoots::crawl_thread() // Monitor transid_max and wake up roots when it changes BEESNOTE("tracking transid"); auto last_count = m_transid_re.count(); - while (true) { + while (!m_stop_requested) { // Measure current transid catch_all([&]() { m_transid_re.update(transid_max_nocache()); @@ -441,7 +441,12 @@ BeesRoots::crawl_thread() auto poll_time = m_transid_re.seconds_for(m_transid_factor); BEESLOGDEBUG("Polling " << poll_time << "s for next " << m_transid_factor << " transid " << m_transid_re); BEESNOTE("waiting " << poll_time << "s for next " << m_transid_factor << " transid " << m_transid_re); - nanosleep(poll_time); + unique_lock lock(m_stop_mutex); + if (m_stop_requested) { + BEESLOGDEBUG("Stop requested in crawl thread"); + break; + } + m_stop_condvar.wait_for(lock, chrono::duration(poll_time)); } } @@ -456,7 +461,16 @@ BeesRoots::writeback_thread() state_save(); }); - nanosleep(BEES_WRITEBACK_INTERVAL); + unique_lock lock(m_stop_mutex); + if (m_stop_requested) { + BEESLOGDEBUG("Stop requested in writeback thread"); + catch_all([&]() { + BEESNOTE("flushing crawler state"); + state_save(); + }); + return; + } + m_stop_condvar.wait_for(lock, chrono::duration(BEES_WRITEBACK_INTERVAL)); } } @@ -574,6 +588,7 @@ BeesRoots::BeesRoots(shared_ptr ctx) : catch_all([&]() { state_load(); }); + m_writeback_thread.exec([&]() { writeback_thread(); }); @@ -581,6 +596,29 @@ BeesRoots::BeesRoots(shared_ptr ctx) : }); } +void +BeesRoots::stop() +{ + BEESLOGDEBUG("BeesRoots stop requested"); + BEESNOTE("stopping BeesRoots"); + unique_lock lock(m_stop_mutex); + m_stop_requested = true; + m_stop_condvar.notify_all(); + lock.unlock(); + + // Stop crawl writeback first because we will break progress + // state tracking when we cancel the TaskMaster queue + BEESLOGDEBUG("Waiting for crawl writeback"); + BEESNOTE("waiting for crawl_writeback thread"); + m_writeback_thread.join(); + + BEESLOGDEBUG("Waiting for crawl thread"); + BEESNOTE("waiting for crawl_thread thread"); + m_crawl_thread.join(); + + BEESLOGDEBUG("BeesRoots stopped"); +} + Fd BeesRoots::open_root_nocache(uint64_t rootid) { diff --git a/src/bees.cc b/src/bees.cc index abbc25c..7267c06 100644 --- a/src/bees.cc +++ b/src/bees.cc @@ -531,10 +531,16 @@ BeesTempFile::resize(off_t offset) BEESCOUNTADD(tmp_resize_ms, resize_timer.age() * 1000); } +BeesTempFile::~BeesTempFile() +{ + BEESLOGDEBUG("Destructing BeesTempFile " << this); +} + BeesTempFile::BeesTempFile(shared_ptr ctx) : m_ctx(ctx), m_end_offset(0) { + BEESLOGDEBUG("Constructing BeesTempFile " << this); create(); } @@ -640,6 +646,77 @@ BeesTempFile::make_copy(const BeesFileRange &src) return rv; } +static +ostream & +operator<<(ostream &os, const siginfo_t &si) +{ + return os << "siginfo_t { " + << "signo = " << si.si_signo << " (" << signal_ntoa(si.si_signo) << "), " + << "errno = " << si.si_errno << ", " + << "code = " << si.si_code << ", " + // << "trapno = " << si.si_trapno << ", " + << "pid = " << si.si_pid << ", " + << "uid = " << si.si_uid << ", " + << "status = " << si.si_status << ", " + << "utime = " << si.si_utime << ", " + << "stime = " << si.si_stime << ", " + // << "value = " << si.si_value << ", " + << "int = " << si.si_int << ", " + << "ptr = " << si.si_ptr << ", " + << "overrun = " << si.si_overrun << ", " + << "timerid = " << si.si_timerid << ", " + << "addr = " << si.si_addr << ", " + << "band = " << si.si_band << ", " + << "fd = " << si.si_fd << ", " + << "addr_lsb = " << si.si_addr_lsb << ", " + << "lower = " << si.si_lower << ", " + << "upper = " << si.si_upper << ", " + // << "pkey = " << si.si_pkey << ", " + << "call_addr = " << si.si_call_addr << ", " + << "syscall = " << si.si_syscall << ", " + << "arch = " << si.si_arch + << " }"; +} + +static sigset_t new_sigset, old_sigset; + +void +block_term_signal() +{ + BEESLOGDEBUG("Masking signals"); + + DIE_IF_NON_ZERO(sigemptyset(&new_sigset)); + DIE_IF_NON_ZERO(sigaddset(&new_sigset, SIGTERM)); + DIE_IF_NON_ZERO(sigaddset(&new_sigset, SIGINT)); + DIE_IF_NON_ZERO(sigprocmask(SIG_BLOCK, &new_sigset, &old_sigset)); +} + +void +wait_for_term_signal() +{ + BEESNOTE("waiting for signals"); + BEESLOGDEBUG("Waiting for signals..."); + siginfo_t info; + + // Ironically, sigwaitinfo can be interrupted by a signal. + while (true) { + const int rv = sigwaitinfo(&new_sigset, &info); + if (rv == -1) { + if (errno == EINTR) { + BEESLOGDEBUG("Restarting sigwaitinfo"); + continue; + } + THROW_ERRNO("sigwaitinfo errno = " << errno); + } else { + BEESLOGNOTICE("Received signal " << rv << " info " << info); + // Unblock so we die immediately if signalled again + DIE_IF_NON_ZERO(sigprocmask(SIG_BLOCK, &old_sigset, &new_sigset)); + break; + } + } + BEESLOGDEBUG("Signal catcher exiting"); +} + int bees_main(int argc, char *argv[]) { @@ -656,6 +733,10 @@ bees_main(int argc, char *argv[]) THROW_CHECK1(invalid_argument, argc, argc >= 0); + // Have to block signals now before we create a bunch of threads + // so the threads will also have the signals blocked. + block_term_signal(); + // Create a context so we can apply configuration to it shared_ptr bc = make_shared(); @@ -813,12 +894,14 @@ bees_main(int argc, char *argv[]) // Create a context and start crawlers bc->set_root_path(argv[optind++]); - BeesThread status_thread("status", [&]() { - bc->dump_status(); - }); + // Start crawlers + bc->start(); // Now we just wait forever - bc->show_progress(); + wait_for_term_signal(); + + // Shut it down + bc->stop(); // That is all. return EXIT_SUCCESS; diff --git a/src/bees.h b/src/bees.h index 2b0260a..3ef655d 100644 --- a/src/bees.h +++ b/src/bees.h @@ -410,6 +410,8 @@ public: BeesHashTable(shared_ptr ctx, string filename, off_t size = BLOCK_SIZE_HASHTAB_EXTENT); ~BeesHashTable(); + void stop(); + vector find_cell(HashType hash); bool push_random_hash_addr(HashType hash, AddrType addr); void erase_hash_addr(HashType hash, AddrType addr); @@ -444,6 +446,12 @@ private: // Mutex/condvar for the writeback thread mutex m_dirty_mutex; condition_variable m_dirty_condvar; + bool m_dirty; + + // Mutex/condvar to stop + mutex m_stop_mutex; + condition_variable m_stop_condvar; + bool m_stop_requested = false; // Per-extent structures struct ExtentMetaData { @@ -463,7 +471,7 @@ private: void fetch_missing_extent_by_hash(HashType hash); void fetch_missing_extent_by_index(uint64_t extent_index); void set_extent_dirty_locked(uint64_t extent_index); - void flush_dirty_extents(); + size_t flush_dirty_extents(bool slowly); bool flush_dirty_extent(uint64_t extent_index); size_t hash_to_extent_index(HashType ht); @@ -529,6 +537,10 @@ class BeesRoots : public enable_shared_from_this { bool m_workaround_btrfs_send = false; LRUCache m_root_ro_cache; + mutex m_stop_mutex; + condition_variable m_stop_condvar; + bool m_stop_requested = false; + void insert_new_crawl(); void insert_root(const BeesCrawlState &bcs); Fd open_root_nocache(uint64_t root); @@ -557,6 +569,9 @@ friend class BeesCrawl; public: BeesRoots(shared_ptr ctx); + void start(); + void stop(); + Fd open_root(uint64_t root); Fd open_root_ino(uint64_t root, uint64_t ino); Fd open_root_ino(const BeesFileId &bfi) { return open_root_ino(bfi.root(), bfi.ino()); } @@ -651,6 +666,7 @@ class BeesTempFile { void resize(off_t new_end_offset); public: + ~BeesTempFile(); BeesTempFile(shared_ptr ctx); BeesFileRange make_hole(off_t count); BeesFileRange make_copy(const BeesFileRange &src); @@ -677,6 +693,10 @@ struct BeesResolveAddrResult { bool is_toxic() const { return m_is_toxic; } }; +struct BeesHalt : exception { + const char *what() const noexcept override; +}; + class BeesContext : public enable_shared_from_this { shared_ptr m_parent_ctx; @@ -685,7 +705,6 @@ class BeesContext : public enable_shared_from_this { shared_ptr m_fd_cache; shared_ptr m_hash_table; shared_ptr m_roots; - map> m_tmpfiles; LRUCache m_resolve_cache; @@ -701,6 +720,14 @@ class BeesContext : public enable_shared_from_this { LockSet m_extent_lock_set; + mutable mutex m_stop_mutex; + condition_variable m_stop_condvar; + bool m_stop_requested = false; + bool m_stop_status = false; + + BeesThread m_progress_thread; + BeesThread m_status_thread; + void set_root_fd(Fd fd); BeesResolveAddrResult resolve_addr_uncached(BeesAddress addr); @@ -733,6 +760,10 @@ public: void dump_status(); void show_progress(); + void start(); + void stop(); + bool stop_requested() const; + shared_ptr fd_cache(); shared_ptr hash_table(); shared_ptr roots();