1
0
mirror of https://github.com/Zygo/bees.git synced 2025-05-17 21:35:45 +02:00

crucible: progress: a progress tracker for worker queues

The task queue can become very large with many subvols, requiring hours
for the queue to clear.  Saves of 'beescrawl.dat' made in the meantime record
the work currently scheduled, not the work currently completed.

Fix by tracking progress with ProgressTracker.  ProgressTracker::begin()
gives the last completed crawl position.  ProgressTracker::end() gives
the last scheduled crawl position.  begin() does not advance if any
item between begin() and end() is not yet completed.  In between
are crawled extents that are on the task queue but not yet processed.
The file 'beescrawl.dat' saves the begin() position while the extent
scanning task queue is fed from the end() position.

Also remove an unused method crawl_state_get() and repurpose the
operator<(BeesCrawlState) that nobody was using.

Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
Zygo Blaxell 2018-02-26 23:51:20 -05:00
parent 90c32c3f05
commit 5bdad7fc93
5 changed files with 244 additions and 58 deletions

133
include/crucible/progress.h Normal file
View File

@ -0,0 +1,133 @@
#ifndef CRUCIBLE_PROGRESS_H
#define CRUCIBLE_PROGRESS_H
#include "crucible/error.h"
#include <functional>
#include <map>
#include <memory>
#include <mutex>
namespace crucible {
	using namespace std;

	// Tracks progress through an ordered stream of work items that may
	// complete out of order.
	//
	// end() is the largest position handed to hold() or set() so far.
	// begin() is the largest position such that it and every held
	// position ordered before it have been released.  begin() stops
	// advancing at the first position whose holder is still alive.
	//
	// T must be default-constructible, copyable, and ordered by operator<.
	// All shared state is protected by a mutex inside the shared state
	// object, which is kept alive by the tracker and by every holder.
	template <class T>
	class ProgressTracker {
		struct ProgressTrackerState;
		class ProgressHolderState;
	public:
		using value_type = T;
		// Handle returned by hold().  The position is marked complete
		// when the last copy of the handle is destroyed.
		using ProgressHolder = shared_ptr<ProgressHolderState>;

		// Start tracking with begin() == end() == v.
		ProgressTracker(const value_type &v);
		// Last completed position.
		value_type begin() const;
		// Last scheduled position.
		value_type end() const;
		// Move end() forward to v.  Does nothing if v is not beyond end().
		void set(const value_type &v);
		// Mark v as in progress until the returned handle is released.
		ProgressHolder hold(const value_type &v);

		friend class ProgressHolderState;
	private:
		struct ProgressTrackerState {
			// The holder address is part of the key so that several
			// holders may hold the same value at the same time.
			using key_type = pair<value_type, ProgressHolderState *>;
			mutex m_mutex;
			// true while the holder is alive, false once released
			map<key_type, bool> m_in_progress;
			value_type m_begin;
			value_type m_end;
		};

		class ProgressHolderState {
			shared_ptr<ProgressTrackerState> m_state;
			const value_type m_value;
		public:
			ProgressHolderState(shared_ptr<ProgressTrackerState> state, const value_type &v);
			~ProgressHolderState();
			// The map entry is keyed on this object's address, so a
			// copy would never be registered but would still run the
			// destructor.  That would retire an entry that was never
			// held and could advance begin() too early.
			ProgressHolderState(const ProgressHolderState &) = delete;
			ProgressHolderState &operator=(const ProgressHolderState &) = delete;
			// The position this holder was created with.
			value_type get() const;
		};

		shared_ptr<ProgressTrackerState> m_state;
	};

	template <class T>
	typename ProgressTracker<T>::value_type
	ProgressTracker<T>::begin() const
	{
		unique_lock<mutex> lock(m_state->m_mutex);
		return m_state->m_begin;
	}

	template <class T>
	typename ProgressTracker<T>::value_type
	ProgressTracker<T>::end() const
	{
		unique_lock<mutex> lock(m_state->m_mutex);
		return m_state->m_end;
	}

	template <class T>
	void
	ProgressTracker<T>::set(const value_type &v)
	{
		unique_lock<mutex> lock(m_state->m_mutex);
		// end() never moves backward
		if (m_state->m_end < v) {
			m_state->m_end = v;
		}
	}

	template <class T>
	typename ProgressTracker<T>::value_type
	ProgressTracker<T>::ProgressHolderState::get() const
	{
		return m_value;
	}

	template <class T>
	ProgressTracker<T>::ProgressTracker(const value_type &v) :
		m_state(make_shared<ProgressTrackerState>())
	{
		m_state->m_begin = v;
		m_state->m_end = v;
	}

	template <class T>
	ProgressTracker<T>::ProgressHolderState::ProgressHolderState(shared_ptr<ProgressTrackerState> state, const value_type &v) :
		m_state(state),
		m_value(v)
	{
		unique_lock<mutex> lock(m_state->m_mutex);
		// Register this holder as in progress
		m_state->m_in_progress[make_pair(m_value, this)] = true;
		// Scheduling a position beyond end() moves end() forward
		if (m_state->m_end < m_value) {
			m_state->m_end = m_value;
		}
	}

	template <class T>
	ProgressTracker<T>::ProgressHolderState::~ProgressHolderState()
	{
		unique_lock<mutex> lock(m_state->m_mutex);
		auto &in_progress = m_state->m_in_progress;

		// Mark our own entry complete.  Use find() rather than
		// operator[] so a missing entry is never created here.
		const auto mine = in_progress.find(make_pair(m_value, this));
		if (mine != in_progress.end()) {
			mine->second = false;
		}

		// Retire the leading run of completed entries, moving begin()
		// forward over each one.  Stop at the first entry that is
		// still held: everything after it must wait.
		auto p = in_progress.begin();
		while (p != in_progress.end() && !p->second) {
			if (m_state->m_begin < p->first.first) {
				m_state->m_begin = p->first.first;
			}
			p = in_progress.erase(p);
		}
	}

	template <class T>
	shared_ptr<typename ProgressTracker<T>::ProgressHolderState>
	ProgressTracker<T>::hold(const value_type &v)
	{
		return make_shared<ProgressHolderState>(m_state, v);
	}
}
#endif // CRUCIBLE_PROGRESS_H

View File

@ -46,8 +46,8 @@ BeesCrawlState::BeesCrawlState() :
bool
BeesCrawlState::operator<(const BeesCrawlState &that) const
{
return tie(m_objectid, m_offset, m_root, m_min_transid, m_max_transid)
< tie(that.m_objectid, that.m_offset, that.m_root, that.m_min_transid, that.m_max_transid);
return tie(m_min_transid, m_objectid, m_offset, m_root, m_max_transid)
< tie(that.m_min_transid, that.m_objectid, that.m_offset, that.m_root, that.m_max_transid);
}
string
@ -90,6 +90,24 @@ BeesRoots::crawl_state_filename() const
return rv;
}
// Write one text line per crawled root to ofs, using the begin() position
// of each crawler (see BeesCrawl::get_state_begin).  Roots whose state has
// a zero m_max_transid are skipped.  Returns ofs to allow chaining.
ostream &
BeesRoots::state_to_stream(ostream &ofs)
{
	// Bind by reference: copying each map entry would also copy its
	// shared_ptr<BeesCrawl> (an atomic refcount round trip) for nothing.
	for (const auto &i : m_root_crawl_map) {
		auto ibcs = i.second->get_state_begin();
		if (!ibcs.m_max_transid) {
			continue;
		}
		ofs << "root " << ibcs.m_root << " "
		    << "objectid " << ibcs.m_objectid << " "
		    << "offset " << ibcs.m_offset << " "
		    << "min_transid " << ibcs.m_min_transid << " "
		    << "max_transid " << ibcs.m_max_transid << " "
		    << "started " << ibcs.m_started << " "
		    << "start_ts " << format_time(ibcs.m_started) << "\n";
	}
	return ofs;
}
void
BeesRoots::state_save()
{
@ -109,18 +127,7 @@ BeesRoots::state_save()
return;
}
for (auto i : m_root_crawl_map) {
auto ibcs = i.second->get_state();
if (ibcs.m_max_transid) {
ofs << "root " << ibcs.m_root << " ";
ofs << "objectid " << ibcs.m_objectid << " ";
ofs << "offset " << ibcs.m_offset << " ";
ofs << "min_transid " << ibcs.m_min_transid << " ";
ofs << "max_transid " << ibcs.m_max_transid << " ";
ofs << "started " << ibcs.m_started << " ";
ofs << "start_ts " << format_time(ibcs.m_started) << "\n";
}
}
state_to_stream(ofs);
if (ofs.str().empty()) {
BEESLOGWARN("Crawl state empty!");
@ -145,15 +152,6 @@ BeesRoots::state_save()
BEESLOGINFO("Saved crawl state in " << save_time << "s");
}
BeesCrawlState
BeesRoots::crawl_state_get(uint64_t rootid)
{
unique_lock<mutex> lock(m_mutex);
auto rv = m_root_crawl_map.at(rootid)->get_state();
THROW_CHECK2(runtime_error, rv.m_root, rootid, rv.m_root == rootid);
return rv;
}
void
BeesRoots::crawl_state_set_dirty()
{
@ -188,7 +186,7 @@ BeesRoots::transid_min()
}
uint64_t rv = numeric_limits<uint64_t>::max();
for (auto i : m_root_crawl_map) {
rv = min(rv, i.second->get_state().m_min_transid);
rv = min(rv, i.second->get_state_end().m_min_transid);
}
return rv;
}
@ -248,22 +246,23 @@ BeesRoots::crawl_batch(shared_ptr<BeesCrawl> this_crawl)
{
auto ctx_copy = m_ctx;
size_t batch_count = 0;
auto subvol = this_crawl->get_state().m_root;
auto subvol = this_crawl->get_state_begin().m_root;
ostringstream oss;
oss << "crawl_" << subvol;
auto task_title = oss.str();
while (batch_count < BEES_MAX_CRAWL_BATCH) {
auto this_state = this_crawl->get_state();
auto this_range = this_crawl->pop_front();
if (!this_range) {
break;
}
Task(task_title, [ctx_copy, this_range]() {
auto this_hold = this_crawl->hold_state(this_range);
auto shared_this_copy = shared_from_this();
Task(task_title, [ctx_copy, this_hold, this_range, shared_this_copy]() {
BEESNOTE("scan_forward " << this_range);
ctx_copy->scan_forward(this_range);
shared_this_copy->crawl_state_set_dirty();
}).run();
BEESCOUNT(crawl_scan);
m_crawl_current = this_state;
++batch_count;
}
return batch_count;
@ -335,8 +334,8 @@ BeesRoots::crawl_roots()
crawl_vector.push_back(i.second);
}
sort(crawl_vector.begin(), crawl_vector.end(), [&](const shared_ptr<BeesCrawl> &a, const shared_ptr<BeesCrawl> &b) -> bool {
auto a_state = a->get_state();
auto b_state = b->get_state();
auto a_state = a->get_state_end();
auto b_state = b->get_state_end();
return tie(a_state.m_started, a_state.m_root) < tie(b_state.m_started, b_state.m_root);
});
@ -432,7 +431,7 @@ void
BeesRoots::writeback_thread()
{
while (true) {
BEESNOTE(m_crawl_current << (m_crawl_dirty ? " (dirty)" : ""));
BEESNOTE("idle, " << (m_crawl_dirty ? "dirty" : "clean"));
catch_all([&]() {
BEESNOTE("saving crawler state");
@ -836,7 +835,7 @@ BeesCrawl::next_transid()
{
auto roots = m_ctx->roots();
auto next_transid = roots->transid_max();
auto crawl_state = get_state();
auto crawl_state = get_state_end();
// If we are already at transid_max then we are still finished
m_finished = crawl_state.m_max_transid >= next_transid;
@ -872,15 +871,15 @@ BeesCrawl::fetch_extents()
return false;
}
auto old_state = get_state();
auto old_state = get_state_end();
// We can't scan an empty transid interval.
if (m_finished || old_state.m_max_transid <= old_state.m_min_transid) {
BEESTRACE("Crawl finished " << get_state());
BEESTRACE("Crawl finished " << get_state_end());
return next_transid();
}
BEESNOTE("crawling " << get_state());
BEESNOTE("crawling " << get_state_end());
Timer crawl_timer;
@ -919,13 +918,13 @@ BeesCrawl::fetch_extents()
if (!ioctl_ok || sk.m_result.empty()) {
BEESCOUNT(crawl_empty);
BEESLOGINFO("Crawl finished " << get_state());
BEESLOGINFO("Crawl finished " << get_state_end());
return next_transid();
}
// BEESLOGINFO("Crawling " << sk.m_result.size() << " results from " << get_state());
// BEESLOGINFO("Crawling " << sk.m_result.size() << " results from " << get_state_end());
auto results_left = sk.m_result.size();
BEESNOTE("crawling " << results_left << " results from " << get_state());
BEESNOTE("crawling " << results_left << " results from " << get_state_end());
size_t count_other = 0;
size_t count_inline = 0;
size_t count_unknown = 0;
@ -941,7 +940,7 @@ BeesCrawl::fetch_extents()
BEESTRACE("i = " << i);
// We need the "+ 1" and objectid rollover that next_min does.
auto new_state = get_state();
auto new_state = get_state_end();
new_state.m_objectid = sk.min_objectid;
new_state.m_offset = sk.min_offset;
@ -960,7 +959,7 @@ BeesCrawl::fetch_extents()
}
auto gen = call_btrfs_get(btrfs_stack_file_extent_generation, i.m_data);
if (gen < get_state().m_min_transid) {
if (gen < get_state_end().m_min_transid) {
BEESCOUNT(crawl_gen_low);
++count_low;
// We want (need?) to scan these anyway?
@ -977,7 +976,7 @@ BeesCrawl::fetch_extents()
// an extent.
continue;
}
if (gen > get_state().m_max_transid) {
if (gen > get_state_end().m_max_transid) {
BEESCOUNT(crawl_gen_high);
++count_high;
// We have to filter these here because we can't
@ -988,13 +987,13 @@ BeesCrawl::fetch_extents()
auto type = call_btrfs_get(btrfs_stack_file_extent_type, i.m_data);
switch (type) {
default:
BEESLOGDEBUG("Unhandled file extent type " << type << " in root " << get_state().m_root << " ino " << i.objectid << " offset " << to_hex(i.offset));
BEESLOGDEBUG("Unhandled file extent type " << type << " in root " << get_state_end().m_root << " ino " << i.objectid << " offset " << to_hex(i.offset));
++count_unknown;
BEESCOUNT(crawl_unknown);
break;
case BTRFS_FILE_EXTENT_INLINE:
// Ignore these for now.
// BEESLOGDEBUG("Ignored file extent type INLINE in root " << get_state().m_root << " ino " << i.objectid << " offset " << to_hex(i.offset));
// BEESLOGDEBUG("Ignored file extent type INLINE in root " << get_state_end().m_root << " ino " << i.objectid << " offset " << to_hex(i.offset));
++count_inline;
// TODO: replace with out-of-line dup extents
BEESCOUNT(crawl_inline);
@ -1007,7 +1006,7 @@ BeesCrawl::fetch_extents()
auto ram = call_btrfs_get(btrfs_stack_file_extent_ram_bytes, i.m_data);
auto len = call_btrfs_get(btrfs_stack_file_extent_num_bytes, i.m_data);
auto offset = call_btrfs_get(btrfs_stack_file_extent_offset, i.m_data);
BEESTRACE("Root " << get_state().m_root << " ino " << i.objectid << " physical " << to_hex(physical)
BEESTRACE("Root " << get_state_end().m_root << " ino " << i.objectid << " physical " << to_hex(physical)
<< " logical " << to_hex(i.offset) << ".." << to_hex(i.offset + len)
<< " gen " << gen);
++count_data;
@ -1015,7 +1014,7 @@ BeesCrawl::fetch_extents()
THROW_CHECK1(runtime_error, ram, ram > 0);
THROW_CHECK1(runtime_error, len, len > 0);
THROW_CHECK2(runtime_error, offset, ram, offset < ram);
BeesFileId bfi(get_state().m_root, i.objectid);
BeesFileId bfi(get_state_end().m_root, i.objectid);
if (m_ctx->is_blacklisted(bfi)) {
BEESCOUNT(crawl_blacklisted);
} else {
@ -1031,7 +1030,7 @@ BeesCrawl::fetch_extents()
}
}
}
// BEESLOGINFO("Crawled inline " << count_inline << " data " << count_data << " other " << count_other << " unknown " << count_unknown << " gen_low " << count_low << " gen_high " << count_high << " " << get_state() << " in " << crawl_timer << "s");
// BEESLOGINFO("Crawled inline " << count_inline << " data " << count_data << " other " << count_other << " unknown " << count_unknown << " gen_low " << count_low << " gen_high " << count_high << " " << get_state_end() << " in " << crawl_timer << "s");
return true;
}
@ -1039,7 +1038,7 @@ BeesCrawl::fetch_extents()
void
BeesCrawl::fetch_extents_harder()
{
BEESNOTE("fetch_extents_harder " << get_state() << " with " << m_extents.size() << " extents");
BEESNOTE("fetch_extents_harder " << get_state_end() << " with " << m_extents.size() << " extents");
while (m_extents.empty()) {
bool progress_made = fetch_extents();
if (!progress_made) {
@ -1074,19 +1073,30 @@ BeesCrawl::pop_front()
}
BeesCrawlState
BeesCrawl::get_state()
BeesCrawl::get_state_begin()
{
unique_lock<mutex> lock(m_state_mutex);
auto rv = m_state;
return rv;
return m_state.begin();
}
// Return the end() position of the progress tracker, i.e. the last crawl
// position that has been scheduled.
BeesCrawlState
BeesCrawl::get_state_end()
{
	auto end_state = m_state.end();
	return end_state;
}
// Register the position of bfr as in progress and return the holder.
// The held state is a copy of the tracker's end() state with the object id
// and offset replaced by the inode and start offset of bfr.
ProgressTracker<BeesCrawlState>::ProgressHolder
BeesCrawl::hold_state(const BeesFileRange &bfr)
{
	auto held_state = m_state.end();
	held_state.m_objectid = bfr.fid().ino();
	held_state.m_offset = bfr.begin();
	auto holder = m_state.hold(held_state);
	return holder;
}
void
BeesCrawl::set_state(const BeesCrawlState &bcs)
{
unique_lock<mutex> lock(m_state_mutex);
m_state = bcs;
lock.unlock();
m_state.set(bcs);
m_ctx->roots()->crawl_state_set_dirty();
}

View File

@ -8,6 +8,7 @@
#include "crucible/fd.h"
#include "crucible/fs.h"
#include "crucible/lockset.h"
#include "crucible/progress.h"
#include "crucible/time.h"
#include "crucible/task.h"
@ -501,7 +502,7 @@ class BeesCrawl {
bool m_finished = false;
mutex m_state_mutex;
BeesCrawlState m_state;
ProgressTracker<BeesCrawlState> m_state;
bool fetch_extents();
void fetch_extents_harder();
@ -511,7 +512,9 @@ public:
BeesCrawl(shared_ptr<BeesContext> ctx, BeesCrawlState initial_state);
BeesFileRange peek_front();
BeesFileRange pop_front();
BeesCrawlState get_state();
ProgressTracker<BeesCrawlState>::ProgressHolder hold_state(const BeesFileRange &bfr);
BeesCrawlState get_state_begin();
BeesCrawlState get_state_end();
void set_state(const BeesCrawlState &bcs);
void deferred(bool def_setting);
};
@ -520,7 +523,6 @@ class BeesRoots : public enable_shared_from_this<BeesRoots> {
shared_ptr<BeesContext> m_ctx;
BeesStringFile m_crawl_state_file;
BeesCrawlState m_crawl_current;
map<uint64_t, shared_ptr<BeesCrawl>> m_root_crawl_map;
mutex m_mutex;
bool m_crawl_dirty = false;
@ -540,10 +542,10 @@ class BeesRoots : public enable_shared_from_this<BeesRoots> {
uint64_t transid_max();
uint64_t transid_max_nocache();
void state_load();
ostream &state_to_stream(ostream &os);
void state_save();
bool crawl_roots();
string crawl_state_filename() const;
BeesCrawlState crawl_state_get(uint64_t root);
void crawl_state_set_dirty();
void crawl_state_erase(const BeesCrawlState &bcs);
void crawl_thread();

View File

@ -5,6 +5,7 @@ PROGRAMS = \
limits \
path \
process \
progress \
task \
all: test

40
test/progress.cc Normal file
View File

@ -0,0 +1,40 @@
#include "tests.h"
#include "crucible/progress.h"
#include <cassert>
#include <unistd.h>
using namespace crucible;
using namespace std;
void
test_progress()
{
ProgressTracker<uint64_t> pt(123);
auto hold = pt.hold(234);
auto hold2 = pt.hold(345);
assert(pt.begin() == 123);
assert(pt.end() == 345);
auto hold3 = pt.hold(456);
assert(pt.begin() == 123);
assert(pt.end() == 456);
hold2.reset();
assert(pt.begin() == 123);
assert(pt.end() == 456);
hold.reset();
assert(pt.begin() == 345);
assert(pt.end() == 456);
hold3.reset();
assert(pt.begin() == 456);
assert(pt.end() == 456);
}
// Test program entry point.  Runs test_progress() through RUN_A_TEST
// (presumably defined in tests.h -- confirm) and then exits with
// EXIT_SUCCESS.  Command-line arguments are ignored.
int
main(int, char**)
{
RUN_A_TEST(test_progress());
exit(EXIT_SUCCESS);
}