diff --git a/src/bees-context.cc b/src/bees-context.cc index da1e27c..b0677e7 100644 --- a/src/bees-context.cc +++ b/src/bees-context.cc @@ -826,17 +826,7 @@ BeesContext::wait_for_balance() } BEESLOGDEBUG("WORKAROUND: Waiting " << balance_timer << "s for balance to stop"); - unique_lock lock(m_abort_mutex); - if (m_abort_requested) { - // Force the calling function to stop. We cannot - // proceed to LOGICAL_INO while balance is running - // until the bugs are fixed, and it's probably - // not going to be particularly fast to have - // both bees and balance banging the disk anyway. - BeesTracer::set_silent(); - throw std::runtime_error("Stop requested while balance running"); - } - m_abort_condvar.wait_for(lock, chrono::duration(BEES_BALANCE_POLL_INTERVAL)); + nanosleep(BEES_BALANCE_POLL_INTERVAL); } } @@ -960,12 +950,6 @@ BeesContext::set_root_fd(Fd fd) }); } -const char * -BeesHalt::what() const noexcept -{ - return "bees stop requested"; -} - void BeesContext::start() { @@ -1006,17 +990,37 @@ BeesContext::stop() Timer stop_timer; BEESLOGNOTICE("Stopping bees..."); - BEESNOTE("aborting blocked tasks"); - BEESLOGDEBUG("Aborting blocked tasks"); - unique_lock abort_lock(m_abort_mutex); - m_abort_requested = true; - m_abort_condvar.notify_all(); - abort_lock.unlock(); - + // Stop TaskConsumers without hurting the Task objects that carry the Crawl state BEESNOTE("pausing work queue"); BEESLOGDEBUG("Pausing work queue"); - TaskMaster::set_thread_count(0); + TaskMaster::pause(); + // Stop crawlers first so we get good progress persisted on disk + BEESNOTE("stopping crawlers and flushing crawl state"); + BEESLOGDEBUG("Stopping crawlers and flushing crawl state"); + if (m_roots) { + m_roots->stop_request(); + } else { + BEESLOGDEBUG("Crawlers not running"); + } + + BEESNOTE("stopping and flushing hash table"); + BEESLOGDEBUG("Stopping and flushing hash table"); + if (m_hash_table) { + m_hash_table->stop_request(); + } else { + BEESLOGDEBUG("Hash table not running"); + } + + // Wait for crawler writeback to finish + BEESNOTE("waiting for crawlers to stop"); + BEESLOGDEBUG("Waiting for crawlers to stop"); + if (m_roots) { + m_roots->stop_wait(); + } + + // It is now no longer possible to update progress in $BEESHOME, + // so we can destroy Tasks with reckless abandon. BEESNOTE("setting stop_request flag"); BEESLOGDEBUG("Setting stop_request flag"); unique_lock lock(m_stop_mutex); @@ -1024,46 +1028,13 @@ BeesContext::stop() m_stop_condvar.notify_all(); lock.unlock(); - // Stop crawlers first so we get good progress persisted on disk - BEESNOTE("stopping crawlers"); - BEESLOGDEBUG("Stopping crawlers"); - if (m_roots) { - m_roots->stop(); - m_roots.reset(); - } else { - BEESLOGDEBUG("Crawlers not running"); - } - - BEESNOTE("cancelling work queue"); - BEESLOGDEBUG("Cancelling work queue"); - TaskMaster::cancel(); - - BEESNOTE("stopping hash table"); - BEESLOGDEBUG("Stopping hash table"); + // Wait for hash table flush to complete + BEESNOTE("waiting for hash table flush to stop"); + BEESLOGDEBUG("waiting for hash table flush to stop"); if (m_hash_table) { - m_hash_table->stop(); - m_hash_table.reset(); - } else { - BEESLOGDEBUG("Hash table not running"); + m_hash_table->stop_wait(); } - BEESNOTE("closing tmpfiles"); - BEESLOGDEBUG("Closing tmpfiles"); - m_tmpfile_pool.clear(); - - BEESNOTE("closing FD caches"); - BEESLOGDEBUG("Closing FD caches"); - if (m_fd_cache) { - m_fd_cache->clear(); - BEESNOTE("destroying FD caches"); - BEESLOGDEBUG("Destroying FD caches"); - m_fd_cache.reset(); - } - - BEESNOTE("waiting for progress thread"); - BEESLOGDEBUG("Waiting for progress thread"); - m_progress_thread->join(); - // Write status once with this message... BEESNOTE("stopping status thread at " << stop_timer << " sec"); lock.lock(); @@ -1079,6 +1050,9 @@ BeesContext::stop() m_status_thread->join(); BEESLOGNOTICE("bees stopped in " << stop_timer << " sec"); + + // Skip all destructors, do not pass GO, do not collect atexit() functions + _exit(EXIT_SUCCESS); } bool @@ -1119,13 +1093,7 @@ shared_ptr BeesContext::tmpfile() { unique_lock lock(m_stop_mutex); - - if (m_stop_requested) { - throw BeesHalt(); - } - lock.unlock(); - return m_tmpfile_pool(); } @@ -1133,9 +1101,6 @@ shared_ptr BeesContext::fd_cache() { unique_lock lock(m_stop_mutex); - if (m_stop_requested) { - throw BeesHalt(); - } if (!m_fd_cache) { m_fd_cache = make_shared(shared_from_this()); } @@ -1146,9 +1111,6 @@ shared_ptr BeesContext::roots() { unique_lock lock(m_stop_mutex); - if (m_stop_requested) { - throw BeesHalt(); - } if (!m_roots) { m_roots = make_shared(shared_from_this()); } @@ -1159,9 +1121,6 @@ shared_ptr BeesContext::hash_table() { unique_lock lock(m_stop_mutex); - if (m_stop_requested) { - throw BeesHalt(); - } if (!m_hash_table) { m_hash_table = make_shared(shared_from_this(), "beeshash.dat"); } diff --git a/src/bees-hash.cc b/src/bees-hash.cc index 0d2b35e..fb0abbc 100644 --- a/src/bees-hash.cc +++ b/src/bees-hash.cc @@ -155,22 +155,18 @@ BeesHashTable::flush_dirty_extents(bool slowly) if (flush_dirty_extent(extent_index)) { ++wrote_extents; if (slowly) { + if (m_stop_requested) { + slowly = false; + continue; + } BEESNOTE("flush rate limited after extent #" << extent_index << " of " << m_extents << " extents"); chrono::duration sleep_time(m_flush_rate_limit.sleep_time(BLOCK_SIZE_HASHTAB_EXTENT)); unique_lock lock(m_stop_mutex); - if (m_stop_requested) { - BEESLOGDEBUG("Stop requested in hash table flush_dirty_extents"); - // This function is called by another thread with !slowly, - // so we just get out of the way here. - break; - } m_stop_condvar.wait_for(lock, sleep_time); } } } - if (!slowly) { - BEESLOGINFO("Flushed " << wrote_extents << " of " << m_extents << " extents"); - } + BEESLOGINFO("Flushed " << wrote_extents << " of " << m_extents << " hash table extents"); return wrote_extents; } @@ -204,12 +200,27 @@ BeesHashTable::writeback_loop() m_dirty_condvar.wait(lock); } } + + // The normal loop exits at the end of one iteration when stop requested, + // but stop request will be in the middle of the loop, and some extents + // will still be dirty. Run the flush loop again to get those. + BEESNOTE("flushing hash table, round 2"); + BEESLOGDEBUG("Flushing hash table"); + flush_dirty_extents(false); + + // If there were any Tasks still running, they may have updated + // some hash table pages during the second flush. These updates + // will be lost. The Tasks will be repeated on the next run because + // they were not completed prior to the stop request, and the + // Crawl progress was already flushed out before the Hash table + // started writing, so nothing is really lost here. + catch_all([&]() { // trigger writeback on our way out #if 0 // seems to trigger huge latency spikes - BEESTOOLONG("unreadahead hash table size " << pretty(m_size)); - bees_unreadahead(m_fd, 0, m_size); + BEESTOOLONG("unreadahead hash table size " << + pretty(m_size)); bees_unreadahead(m_fd, 0, m_size); #endif }); BEESLOGDEBUG("Exited hash table writeback_loop"); @@ -794,7 +805,7 @@ BeesHashTable::~BeesHashTable() } void -BeesHashTable::stop() +BeesHashTable::stop_request() { BEESNOTE("stopping BeesHashTable threads"); BEESLOGDEBUG("Stopping BeesHashTable threads"); @@ -808,7 +819,11 @@ BeesHashTable::stop() unique_lock dirty_lock(m_dirty_mutex); m_dirty_condvar.notify_all(); dirty_lock.unlock(); +} +void +BeesHashTable::stop_wait() +{ BEESNOTE("waiting for hash_prefetch thread"); BEESLOGDEBUG("Waiting for hash_prefetch thread"); m_prefetch_thread.join(); @@ -817,11 +832,5 @@ BeesHashTable::stop() BEESLOGDEBUG("Waiting for hash_writeback thread"); m_writeback_thread.join(); - if (m_cell_ptr && m_size) { - BEESLOGDEBUG("Flushing hash table"); - BEESNOTE("flushing hash table"); - flush_dirty_extents(false); - } - BEESLOGDEBUG("BeesHashTable stopped"); } diff --git a/src/bees-roots.cc b/src/bees-roots.cc index 7dc877a..a0105b8 100644 --- a/src/bees-roots.cc +++ b/src/bees-roots.cc @@ -595,7 +595,7 @@ BeesRoots::start() } void -BeesRoots::stop() +BeesRoots::stop_request() { BEESLOGDEBUG("BeesRoots stop requested"); BEESNOTE("stopping BeesRoots"); @@ -603,7 +603,11 @@ BeesRoots::stop() m_stop_requested = true; m_stop_condvar.notify_all(); lock.unlock(); +} +void +BeesRoots::stop_wait() +{ // Stop crawl writeback first because we will break progress // state tracking when we cancel the TaskMaster queue BEESLOGDEBUG("Waiting for crawl writeback"); diff --git a/src/bees.h b/src/bees.h index 49ca769..b300621 100644 --- a/src/bees.h +++ b/src/bees.h @@ -423,7 +423,8 @@ public: BeesHashTable(shared_ptr ctx, string filename, off_t size = BLOCK_SIZE_HASHTAB_EXTENT); ~BeesHashTable(); - void stop(); + void stop_request(); + void stop_wait(); vector find_cell(HashType hash); bool push_random_hash_addr(HashType hash, AddrType addr); @@ -595,7 +596,8 @@ friend class BeesTempFile; public: BeesRoots(shared_ptr ctx); void start(); - void stop(); + void stop_request(); + void stop_wait(); Fd open_root(uint64_t root); Fd open_root_ino(uint64_t root, uint64_t ino); @@ -720,10 +722,6 @@ struct BeesResolveAddrResult { bool is_toxic() const { return m_is_toxic; } }; -struct BeesHalt : exception { - const char *what() const noexcept override; -}; - class BeesContext : public enable_shared_from_this { Fd m_home_fd; @@ -749,10 +747,6 @@ class BeesContext : public enable_shared_from_this { bool m_stop_requested = false; bool m_stop_status = false; - mutable mutex m_abort_mutex; - condition_variable m_abort_condvar; - bool m_abort_requested = false; - shared_ptr m_progress_thread; shared_ptr m_status_thread;