mirror of
https://github.com/Zygo/bees.git
synced 2025-05-17 21:35:45 +02:00
crawl: don't block a Task waiting for new transids
Task should not block for extended periods of time. Remove the RateEstimator::wait_for() in crawl_roots. When crawl_roots runs out of data, let the last crawl_task end without rescheduling. Schedule crawl_task again on transid polls if it was not already running. Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
parent
b67fba0acd
commit
4f0bc78a4c
@ -247,7 +247,7 @@ BeesRoots::crawl_batch(shared_ptr<BeesCrawl> this_crawl)
|
|||||||
return batch_count;
|
return batch_count;
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
bool
|
||||||
BeesRoots::crawl_roots()
|
BeesRoots::crawl_roots()
|
||||||
{
|
{
|
||||||
BEESNOTE("Crawling roots");
|
BEESNOTE("Crawling roots");
|
||||||
@ -280,13 +280,13 @@ BeesRoots::crawl_roots()
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!first_crawl) {
|
if (!first_crawl) {
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto batch_count = crawl_batch(first_crawl);
|
auto batch_count = crawl_batch(first_crawl);
|
||||||
|
|
||||||
if (batch_count) {
|
if (batch_count) {
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
@ -300,7 +300,7 @@ BeesRoots::crawl_roots()
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (batch_count) {
|
if (batch_count) {
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
@ -322,7 +322,7 @@ BeesRoots::crawl_roots()
|
|||||||
for (auto i : crawl_vector) {
|
for (auto i : crawl_vector) {
|
||||||
batch_count += crawl_batch(i);
|
batch_count += crawl_batch(i);
|
||||||
if (batch_count) {
|
if (batch_count) {
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -337,13 +337,9 @@ BeesRoots::crawl_roots()
|
|||||||
auto want_transid = m_transid_re.count() + m_transid_factor;
|
auto want_transid = m_transid_re.count() + m_transid_factor;
|
||||||
auto ran_out_time = m_crawl_timer.lap();
|
auto ran_out_time = m_crawl_timer.lap();
|
||||||
BEESLOGINFO("Crawl master ran out of data after " << ran_out_time << "s, waiting about " << m_transid_re.seconds_until(want_transid) << "s for transid " << want_transid << "...");
|
BEESLOGINFO("Crawl master ran out of data after " << ran_out_time << "s, waiting about " << m_transid_re.seconds_until(want_transid) << "s for transid " << want_transid << "...");
|
||||||
BEESNOTE("idle, waiting for transid " << want_transid << ": " << m_transid_re);
|
|
||||||
// FIXME: Tasks should not block arbitrarily
|
|
||||||
m_transid_re.wait_until(want_transid);
|
|
||||||
|
|
||||||
auto resumed_after_time = m_crawl_timer.lap();
|
// Do not run again
|
||||||
auto new_transid = m_transid_re.count();
|
return false;
|
||||||
BEESLOGINFO("Crawl master resumed after " << resumed_after_time << "s at transid " << new_transid);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
@ -351,19 +347,25 @@ BeesRoots::crawl_thread()
|
|||||||
{
|
{
|
||||||
BEESNOTE("creating crawl task");
|
BEESNOTE("creating crawl task");
|
||||||
|
|
||||||
// Start the Task that does the crawling
|
// Create the Task that does the crawling
|
||||||
auto shared_this = shared_from_this();
|
auto shared_this = shared_from_this();
|
||||||
Task("crawl_master", [shared_this]() {
|
m_crawl_task = Task("crawl_master", [shared_this]() {
|
||||||
auto tqs = TaskMaster::get_queue_count();
|
auto tqs = TaskMaster::get_queue_count();
|
||||||
BEESNOTE("queueing extents to scan, " << tqs << " of " << BEES_MAX_QUEUE_SIZE);
|
BEESNOTE("queueing extents to scan, " << tqs << " of " << BEES_MAX_QUEUE_SIZE);
|
||||||
|
bool run_again = false;
|
||||||
while (tqs < BEES_MAX_QUEUE_SIZE) {
|
while (tqs < BEES_MAX_QUEUE_SIZE) {
|
||||||
catch_all([&]() {
|
run_again = shared_this->crawl_roots();
|
||||||
shared_this->crawl_roots();
|
|
||||||
});
|
|
||||||
tqs = TaskMaster::get_queue_count();
|
tqs = TaskMaster::get_queue_count();
|
||||||
|
if (!run_again) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Task::current_task().run();
|
if (run_again) {
|
||||||
}).run();
|
shared_this->m_crawl_task.run();
|
||||||
|
} else {
|
||||||
|
shared_this->m_task_running = false;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// Monitor transid_max and wake up roots when it changes
|
// Monitor transid_max and wake up roots when it changes
|
||||||
BEESNOTE("tracking transid");
|
BEESNOTE("tracking transid");
|
||||||
@ -389,6 +391,14 @@ BeesRoots::crawl_thread()
|
|||||||
}
|
}
|
||||||
last_count = new_count;
|
last_count = new_count;
|
||||||
|
|
||||||
|
// If no crawl task is running, start a new one
|
||||||
|
bool already_running = m_task_running.exchange(true);
|
||||||
|
if (!already_running) {
|
||||||
|
auto resumed_after_time = m_crawl_timer.lap();
|
||||||
|
BEESLOGINFO("Crawl master resumed after " << resumed_after_time << "s at transid " << new_count);
|
||||||
|
m_crawl_task.run();
|
||||||
|
}
|
||||||
|
|
||||||
auto poll_time = m_transid_re.seconds_for(m_transid_factor);
|
auto poll_time = m_transid_re.seconds_for(m_transid_factor);
|
||||||
BEESLOGDEBUG("Polling " << poll_time << "s for next " << m_transid_factor << " transid " << m_transid_re);
|
BEESLOGDEBUG("Polling " << poll_time << "s for next " << m_transid_factor << " transid " << m_transid_re);
|
||||||
BEESNOTE("waiting " << poll_time << "s for next " << m_transid_factor << " transid " << m_transid_re);
|
BEESNOTE("waiting " << poll_time << "s for next " << m_transid_factor << " transid " << m_transid_re);
|
||||||
@ -497,7 +507,8 @@ BeesRoots::BeesRoots(shared_ptr<BeesContext> ctx) :
|
|||||||
m_ctx(ctx),
|
m_ctx(ctx),
|
||||||
m_crawl_state_file(ctx->home_fd(), crawl_state_filename()),
|
m_crawl_state_file(ctx->home_fd(), crawl_state_filename()),
|
||||||
m_crawl_thread("crawl_transid"),
|
m_crawl_thread("crawl_transid"),
|
||||||
m_writeback_thread("crawl_writeback")
|
m_writeback_thread("crawl_writeback"),
|
||||||
|
m_task_running(false)
|
||||||
{
|
{
|
||||||
m_crawl_thread.exec([&]() {
|
m_crawl_thread.exec([&]() {
|
||||||
// Measure current transid before creating any crawlers
|
// Measure current transid before creating any crawlers
|
||||||
|
@ -9,8 +9,9 @@
|
|||||||
#include "crucible/fs.h"
|
#include "crucible/fs.h"
|
||||||
#include "crucible/lockset.h"
|
#include "crucible/lockset.h"
|
||||||
#include "crucible/time.h"
|
#include "crucible/time.h"
|
||||||
|
#include "crucible/task.h"
|
||||||
|
|
||||||
#include <array>
|
#include <atomic>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <list>
|
#include <list>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
@ -528,6 +529,8 @@ class BeesRoots : public enable_shared_from_this<BeesRoots> {
|
|||||||
BeesThread m_writeback_thread;
|
BeesThread m_writeback_thread;
|
||||||
RateEstimator m_transid_re;
|
RateEstimator m_transid_re;
|
||||||
size_t m_transid_factor = BEES_TRANSID_FACTOR;
|
size_t m_transid_factor = BEES_TRANSID_FACTOR;
|
||||||
|
atomic<bool> m_task_running;
|
||||||
|
Task m_crawl_task;
|
||||||
|
|
||||||
void insert_new_crawl();
|
void insert_new_crawl();
|
||||||
void insert_root(const BeesCrawlState &bcs);
|
void insert_root(const BeesCrawlState &bcs);
|
||||||
@ -538,7 +541,7 @@ class BeesRoots : public enable_shared_from_this<BeesRoots> {
|
|||||||
uint64_t transid_max_nocache();
|
uint64_t transid_max_nocache();
|
||||||
void state_load();
|
void state_load();
|
||||||
void state_save();
|
void state_save();
|
||||||
void crawl_roots();
|
bool crawl_roots();
|
||||||
string crawl_state_filename() const;
|
string crawl_state_filename() const;
|
||||||
BeesCrawlState crawl_state_get(uint64_t root);
|
BeesCrawlState crawl_state_get(uint64_t root);
|
||||||
void crawl_state_set_dirty();
|
void crawl_state_set_dirty();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user