1
0
mirror of https://github.com/Zygo/bees.git synced 2025-05-17 21:35:45 +02:00

context: speed up orderly process termination

Quite often bees exceeds its service timeout for termination because
it is waiting for a loop embedded in a Task to finish some long-running
btrfs operation.  This can cause bees to be aborted by SIGKILL before
it can completely flush the hash table or save crawl state.

There are only two important things SIGTERM does when bees terminates:
 1.  Save crawl progress
 2.  Flush out the hash table

Everything else is automatically handled by the kernel when the process
is terminated by SIGKILL, so we don't have to bother doing it ourselves.
This can save considerable time at shutdown since we don't have to wait
for every thread to reach a point where it becomes idle, or force loops
to terminate by throwing exceptions, or check a condition every time we
access a pointer.  Instead, we need do only the things in the list
above, and then call _exit() to clean up everything else.

Hash table and crawl state writeback can happen in their background
threads instead of the foreground one.  Separate the "stop" method for
these classes into "stop_request" and "stop_wait" so that these writebacks
can run at the same time.

Deprecate and remove all references to the BeesHalt exception, and remove
several unnecessary checks for BeesContext::stop_requested.

Pause the task queue instead of cancelling it, which preserves the
crawl progress state and stops new Tasks from competing for iops and
CPU during writeback.

Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
Zygo Blaxell 2022-11-19 02:45:15 -05:00
parent 594ad1786d
commit 31b2aa3c0d
4 changed files with 72 additions and 106 deletions

View File

@ -826,17 +826,7 @@ BeesContext::wait_for_balance()
}
BEESLOGDEBUG("WORKAROUND: Waiting " << balance_timer << "s for balance to stop");
unique_lock<mutex> lock(m_abort_mutex);
if (m_abort_requested) {
// Force the calling function to stop. We cannot
// proceed to LOGICAL_INO while balance is running
// until the bugs are fixed, and it's probably
// not going to be particularly fast to have
// both bees and balance banging the disk anyway.
BeesTracer::set_silent();
throw std::runtime_error("Stop requested while balance running");
}
m_abort_condvar.wait_for(lock, chrono::duration<double>(BEES_BALANCE_POLL_INTERVAL));
nanosleep(BEES_BALANCE_POLL_INTERVAL);
}
}
@ -960,12 +950,6 @@ BeesContext::set_root_fd(Fd fd)
});
}
const char *
BeesHalt::what() const noexcept
{
return "bees stop requested";
}
void
BeesContext::start()
{
@ -1006,17 +990,37 @@ BeesContext::stop()
Timer stop_timer;
BEESLOGNOTICE("Stopping bees...");
BEESNOTE("aborting blocked tasks");
BEESLOGDEBUG("Aborting blocked tasks");
unique_lock<mutex> abort_lock(m_abort_mutex);
m_abort_requested = true;
m_abort_condvar.notify_all();
abort_lock.unlock();
// Stop TaskConsumers without hurting the Task objects that carry the Crawl state
BEESNOTE("pausing work queue");
BEESLOGDEBUG("Pausing work queue");
TaskMaster::set_thread_count(0);
TaskMaster::pause();
// Stop crawlers first so we get good progress persisted on disk
BEESNOTE("stopping crawlers and flushing crawl state");
BEESLOGDEBUG("Stopping crawlers and flushing crawl state");
if (m_roots) {
m_roots->stop_request();
} else {
BEESLOGDEBUG("Crawlers not running");
}
BEESNOTE("stopping and flushing hash table");
BEESLOGDEBUG("Stopping and flushing hash table");
if (m_hash_table) {
m_hash_table->stop_request();
} else {
BEESLOGDEBUG("Hash table not running");
}
// Wait for crawler writeback to finish
BEESNOTE("waiting for crawlers to stop");
BEESLOGDEBUG("Waiting for crawlers to stop");
if (m_roots) {
m_roots->stop_wait();
}
// It is now no longer possible to update progress in $BEESHOME,
// so we can destroy Tasks with reckless abandon.
BEESNOTE("setting stop_request flag");
BEESLOGDEBUG("Setting stop_request flag");
unique_lock<mutex> lock(m_stop_mutex);
@ -1024,46 +1028,13 @@ BeesContext::stop()
m_stop_condvar.notify_all();
lock.unlock();
// Stop crawlers first so we get good progress persisted on disk
BEESNOTE("stopping crawlers");
BEESLOGDEBUG("Stopping crawlers");
if (m_roots) {
m_roots->stop();
m_roots.reset();
} else {
BEESLOGDEBUG("Crawlers not running");
}
BEESNOTE("cancelling work queue");
BEESLOGDEBUG("Cancelling work queue");
TaskMaster::cancel();
BEESNOTE("stopping hash table");
BEESLOGDEBUG("Stopping hash table");
// Wait for hash table flush to complete
BEESNOTE("waiting for hash table flush to stop");
BEESLOGDEBUG("waiting for hash table flush to stop");
if (m_hash_table) {
m_hash_table->stop();
m_hash_table.reset();
} else {
BEESLOGDEBUG("Hash table not running");
m_hash_table->stop_wait();
}
BEESNOTE("closing tmpfiles");
BEESLOGDEBUG("Closing tmpfiles");
m_tmpfile_pool.clear();
BEESNOTE("closing FD caches");
BEESLOGDEBUG("Closing FD caches");
if (m_fd_cache) {
m_fd_cache->clear();
BEESNOTE("destroying FD caches");
BEESLOGDEBUG("Destroying FD caches");
m_fd_cache.reset();
}
BEESNOTE("waiting for progress thread");
BEESLOGDEBUG("Waiting for progress thread");
m_progress_thread->join();
// Write status once with this message...
BEESNOTE("stopping status thread at " << stop_timer << " sec");
lock.lock();
@ -1079,6 +1050,9 @@ BeesContext::stop()
m_status_thread->join();
BEESLOGNOTICE("bees stopped in " << stop_timer << " sec");
// Skip all destructors, do not pass GO, do not collect atexit() functions
_exit(EXIT_SUCCESS);
}
bool
@ -1119,13 +1093,7 @@ shared_ptr<BeesTempFile>
BeesContext::tmpfile()
{
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
throw BeesHalt();
}
lock.unlock();
return m_tmpfile_pool();
}
@ -1133,9 +1101,6 @@ shared_ptr<BeesFdCache>
BeesContext::fd_cache()
{
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
throw BeesHalt();
}
if (!m_fd_cache) {
m_fd_cache = make_shared<BeesFdCache>(shared_from_this());
}
@ -1146,9 +1111,6 @@ shared_ptr<BeesRoots>
BeesContext::roots()
{
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
throw BeesHalt();
}
if (!m_roots) {
m_roots = make_shared<BeesRoots>(shared_from_this());
}
@ -1159,9 +1121,6 @@ shared_ptr<BeesHashTable>
BeesContext::hash_table()
{
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
throw BeesHalt();
}
if (!m_hash_table) {
m_hash_table = make_shared<BeesHashTable>(shared_from_this(), "beeshash.dat");
}

View File

@ -155,22 +155,18 @@ BeesHashTable::flush_dirty_extents(bool slowly)
if (flush_dirty_extent(extent_index)) {
++wrote_extents;
if (slowly) {
if (m_stop_requested) {
slowly = false;
continue;
}
BEESNOTE("flush rate limited after extent #" << extent_index << " of " << m_extents << " extents");
chrono::duration<double> sleep_time(m_flush_rate_limit.sleep_time(BLOCK_SIZE_HASHTAB_EXTENT));
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
BEESLOGDEBUG("Stop requested in hash table flush_dirty_extents");
// This function is called by another thread with !slowly,
// so we just get out of the way here.
break;
}
m_stop_condvar.wait_for(lock, sleep_time);
}
}
}
if (!slowly) {
BEESLOGINFO("Flushed " << wrote_extents << " of " << m_extents << " extents");
}
BEESLOGINFO("Flushed " << wrote_extents << " of " << m_extents << " hash table extents");
return wrote_extents;
}
@ -204,12 +200,27 @@ BeesHashTable::writeback_loop()
m_dirty_condvar.wait(lock);
}
}
// The normal loop exits at the end of one iteration when stop requested,
// but stop request will be in the middle of the loop, and some extents
// will still be dirty. Run the flush loop again to get those.
BEESNOTE("flushing hash table, round 2");
BEESLOGDEBUG("Flushing hash table");
flush_dirty_extents(false);
// If there were any Tasks still running, they may have updated
// some hash table pages during the second flush. These updates
// will be lost. The Tasks will be repeated on the next run because
// they were not completed prior to the stop request, and the
// Crawl progress was already flushed out before the Hash table
// started writing, so nothing is really lost here.
catch_all([&]() {
// trigger writeback on our way out
#if 0
// seems to trigger huge latency spikes
BEESTOOLONG("unreadahead hash table size " << pretty(m_size));
bees_unreadahead(m_fd, 0, m_size);
BEESTOOLONG("unreadahead hash table size " <<
pretty(m_size)); bees_unreadahead(m_fd, 0, m_size);
#endif
});
BEESLOGDEBUG("Exited hash table writeback_loop");
@ -794,7 +805,7 @@ BeesHashTable::~BeesHashTable()
}
void
BeesHashTable::stop()
BeesHashTable::stop_request()
{
BEESNOTE("stopping BeesHashTable threads");
BEESLOGDEBUG("Stopping BeesHashTable threads");
@ -808,7 +819,11 @@ BeesHashTable::stop()
unique_lock<mutex> dirty_lock(m_dirty_mutex);
m_dirty_condvar.notify_all();
dirty_lock.unlock();
}
void
BeesHashTable::stop_wait()
{
BEESNOTE("waiting for hash_prefetch thread");
BEESLOGDEBUG("Waiting for hash_prefetch thread");
m_prefetch_thread.join();
@ -817,11 +832,5 @@ BeesHashTable::stop()
BEESLOGDEBUG("Waiting for hash_writeback thread");
m_writeback_thread.join();
if (m_cell_ptr && m_size) {
BEESLOGDEBUG("Flushing hash table");
BEESNOTE("flushing hash table");
flush_dirty_extents(false);
}
BEESLOGDEBUG("BeesHashTable stopped");
}

View File

@ -595,7 +595,7 @@ BeesRoots::start()
}
void
BeesRoots::stop()
BeesRoots::stop_request()
{
BEESLOGDEBUG("BeesRoots stop requested");
BEESNOTE("stopping BeesRoots");
@ -603,7 +603,11 @@ BeesRoots::stop()
m_stop_requested = true;
m_stop_condvar.notify_all();
lock.unlock();
}
void
BeesRoots::stop_wait()
{
// Stop crawl writeback first because we will break progress
// state tracking when we cancel the TaskMaster queue
BEESLOGDEBUG("Waiting for crawl writeback");

View File

@ -423,7 +423,8 @@ public:
BeesHashTable(shared_ptr<BeesContext> ctx, string filename, off_t size = BLOCK_SIZE_HASHTAB_EXTENT);
~BeesHashTable();
void stop();
void stop_request();
void stop_wait();
vector<Cell> find_cell(HashType hash);
bool push_random_hash_addr(HashType hash, AddrType addr);
@ -595,7 +596,8 @@ friend class BeesTempFile;
public:
BeesRoots(shared_ptr<BeesContext> ctx);
void start();
void stop();
void stop_request();
void stop_wait();
Fd open_root(uint64_t root);
Fd open_root_ino(uint64_t root, uint64_t ino);
@ -720,10 +722,6 @@ struct BeesResolveAddrResult {
bool is_toxic() const { return m_is_toxic; }
};
struct BeesHalt : exception {
const char *what() const noexcept override;
};
class BeesContext : public enable_shared_from_this<BeesContext> {
Fd m_home_fd;
@ -749,10 +747,6 @@ class BeesContext : public enable_shared_from_this<BeesContext> {
bool m_stop_requested = false;
bool m_stop_status = false;
mutable mutex m_abort_mutex;
condition_variable m_abort_condvar;
bool m_abort_requested = false;
shared_ptr<BeesThread> m_progress_thread;
shared_ptr<BeesThread> m_status_thread;