1
0
mirror of https://github.com/Zygo/bees.git synced 2025-05-18 05:45:45 +02:00

bees: process each subvol in its own thread

This is yet another multi-threaded Bees experiment.

This time we are dividing the work by subvol:  one thread is created to
process each subvol in the filesystem.  There is no change in behavior
on filesystems containing only one subvol.

In order to avoid or mitigate the impact of kernel bugs and performance
issues, the btrfs ioctls FILE_EXTENT_SAME, SEARCH_V2, and LOGICAL_INO are
serialized.  Only one thread may execute any of these ioctls at any time.
All three ioctls share a single lock.

In order to simplify the implementation, only one thread is permitted to
create a temporary file during one call to scan_one_extent.  This prevents
multiple threads from racing to replace the same physical extent with
separate physical copies.

The single "crawl" thread is replaced by one "crawl_<root_number>"
for each subvol.

The crawl size is reduced from 4096 items to 1024.  This reduces the
memory requirement per subvol and keeps the data in memory fresher.
It also increases the number of log messages, so turn some of them off.

TODO:  Currently there is no configurable limit on the total number
of threads.  The number of CPUs is used as an upper bound on the number
of active threads; however, we still have one thread per subvol even if
all most of the threads do is wait for locks.

TODO:  Some of the single-threaded code is left behind until I make up
my mind about whether this experiment is successful.

Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
Zygo Blaxell 2017-01-08 23:50:29 -05:00
parent 4113a171be
commit b22b4ed427
4 changed files with 120 additions and 26 deletions

View File

@ -260,9 +260,7 @@ BeesContext::BeesContext(shared_ptr<BeesContext> parent) :
bool
BeesContext::dedup(const BeesRangePair &brp)
{
// TOOLONG and NOTE can retroactively fill in the filename details, but LOG can't
BEESNOTE("dedup " << brp);
// Open the files
brp.first.fd(shared_from_this());
brp.second.fd(shared_from_this());
@ -273,6 +271,12 @@ BeesContext::dedup(const BeesRangePair &brp)
bees_sync(brp.first.fd());
#endif
// To avoid hammering all the cores with long-running ioctls,
// only do one dedup at any given time.
BEESNOTE("Waiting to dedup " << brp);
unique_lock<mutex> lock(bees_ioctl_mutex);
BEESNOTE("dedup " << brp);
BEESTOOLONG("dedup " << brp);
thread_local BeesFileId tl_first_fid, tl_second_fid;
@ -436,6 +440,9 @@ BeesContext::scan_one_extent(const BeesFileRange &bfr, const Extent &e)
BEESTRACE(e << " block_count " << block_count);
string bar(block_count, '#');
// Only one thread may create tmpfiles at any given time
unique_lock<mutex> tmpfile_lock(bees_tmpfile_mutex, defer_lock);
for (off_t next_p = e.begin(); next_p < e.end(); ) {
// Guarantee forward progress
@ -736,6 +743,10 @@ BeesContext::scan_one_extent(const BeesFileRange &bfr, const Extent &e)
// BEESLOG("noinsert_set.count(" << to_hex(p) << ") " << noinsert_set.count(p));
if (noinsert_set.count(p)) {
if (p - last_p > 0) {
if (!tmpfile_lock) {
BEESNOTE("waiting for tmpfile");
tmpfile_lock.lock();
}
rewrite_file_range(BeesFileRange(bfr.fd(), last_p, p));
blocks_rewritten = true;
}
@ -747,6 +758,10 @@ BeesContext::scan_one_extent(const BeesFileRange &bfr, const Extent &e)
}
BEESTRACE("last");
if (next_p - last_p > 0) {
if (!tmpfile_lock) {
BEESNOTE("waiting for tmpfile");
tmpfile_lock.lock();
}
rewrite_file_range(BeesFileRange(bfr.fd(), last_p, next_p));
blocks_rewritten = true;
}
@ -868,12 +883,19 @@ BeesContext::resolve_addr_uncached(BeesAddress addr)
{
THROW_CHECK1(invalid_argument, addr, !addr.is_magic());
THROW_CHECK0(invalid_argument, !!root_fd());
// To avoid hammering all the cores with long-running ioctls,
// only do one resolve at any given time.
BEESNOTE("waiting to resolve addr " << addr);
unique_lock<mutex> lock(bees_ioctl_mutex);
Timer resolve_timer;
// There is no performance benefit if we restrict the buffer size.
BtrfsIoctlLogicalInoArgs log_ino(addr.get_physical_or_zero());
{
BEESNOTE("resolving addr " << addr);
BEESTOOLONG("Resolving addr " << addr << " in " << root_path() << " refs " << log_ino.m_iors.size());
if (log_ino.do_ioctl_nothrow(root_fd())) {
BEESCOUNT(resolve_ok);

View File

@ -6,6 +6,8 @@
#include <fstream>
#include <tuple>
#include <inttypes.h>
using namespace crucible;
using namespace std;
@ -150,8 +152,9 @@ BeesRoots::crawl_state_erase(const BeesCrawlState &bcs)
return;
}
if (m_root_crawl_map.count(bcs.m_root)) {
m_root_crawl_map.erase(bcs.m_root);
auto found = m_root_crawl_map.find(bcs.m_root);
if (found != m_root_crawl_map.end()) {
m_root_crawl_map.erase(found);
m_crawl_dirty = true;
}
}
@ -234,7 +237,8 @@ BeesRoots::crawl_roots()
THROW_CHECK2(runtime_error, first_range, first_range_popped, first_range == first_range_popped);
return;
}
#else
#endif
#if 0
// Scan each subvol one extent at a time (good for continuous forward progress)
bool crawled = false;
for (auto i : crawl_map_copy) {
@ -270,7 +274,7 @@ void
BeesRoots::crawl_thread()
{
BEESNOTE("crawling");
while (1) {
while (true) {
catch_all([&]() {
crawl_roots();
});
@ -280,8 +284,9 @@ BeesRoots::crawl_thread()
void
BeesRoots::writeback_thread()
{
while (1) {
BEESNOTE(m_crawl_current << (m_crawl_dirty ? " (dirty)" : ""));
while (true) {
// BEESNOTE(m_crawl_current << (m_crawl_dirty ? " (dirty)" : ""));
BEESNOTE((m_crawl_dirty ? "dirty" : "clean") << ", interval " << BEES_WRITEBACK_INTERVAL << "s");
catch_all([&]() {
BEESNOTE("saving crawler state");
@ -382,15 +387,18 @@ BeesRoots::BeesRoots(shared_ptr<BeesContext> ctx) :
m_crawl_thread("crawl"),
m_writeback_thread("crawl_writeback")
{
m_crawl_thread.exec([&]() {
unsigned max_crawlers = max(1U, thread::hardware_concurrency());
m_lock_set.max_size(max_crawlers);
// m_crawl_thread.exec([&]() {
catch_all([&]() {
state_load();
});
m_writeback_thread.exec([&]() {
writeback_thread();
});
crawl_thread();
});
// crawl_thread();
// });
}
Fd
@ -612,8 +620,53 @@ BeesRoots::open_root_ino(uint64_t root, uint64_t ino)
BeesCrawl::BeesCrawl(shared_ptr<BeesContext> ctx, BeesCrawlState initial_state) :
m_ctx(ctx),
m_state(initial_state)
m_state(initial_state),
m_thread(astringprintf("crawl_%" PRIu64, m_state.m_root))
{
m_thread.exec([&]() {
crawl_thread();
});
}
BeesCrawl::~BeesCrawl()
{
BEESLOGNOTE("Stopping crawl thread " << m_state);
unique_lock<mutex> lock(m_mutex);
m_stopped = true;
m_cond_stopped.notify_all();
lock.unlock();
BEESLOGNOTE("Joining crawl thread " << m_state);
m_thread.join();
BEESLOG("Stopped crawl thread " << m_state);
}
void
BeesCrawl::crawl_thread()
{
Timer crawl_timer;
while (!m_stopped) {
BEESNOTE("waiting for crawl thread limit");
LockSet<uint64_t>::Lock crawl_lock(m_ctx->roots()->lock_set(), m_state.m_root);
auto this_range = pop_front();
if (this_range) {
catch_all([&]() {
// BEESINFO("scan_forward " << this_range);
m_ctx->scan_forward(this_range);
});
BEESCOUNT(crawl_scan);
} else {
auto crawl_time = crawl_timer.age();
BEESLOGNOTE("Crawl ran out of data after " << crawl_time << "s, waiting for more...");
crawl_lock.unlock();
unique_lock<mutex> lock(m_mutex);
if (m_stopped) {
break;
}
m_cond_stopped.wait_for(lock, chrono::duration<double>(BEES_COMMIT_INTERVAL));
crawl_timer.reset();
}
}
BEESLOG("Crawl thread stopped");
}
bool
@ -661,7 +714,7 @@ BeesCrawl::fetch_extents()
}
BEESNOTE("crawling " << get_state());
BEESLOG("Crawling " << get_state());
// BEESLOG("Crawling " << get_state());
Timer crawl_timer;
@ -680,6 +733,9 @@ BeesCrawl::fetch_extents()
BEESTRACE("Searching crawl sk " << static_cast<btrfs_ioctl_search_key&>(sk));
bool ioctl_ok = false;
{
BEESNOTE("waiting to search crawl sk " << static_cast<btrfs_ioctl_search_key&>(sk));
unique_lock<mutex> lock(bees_ioctl_mutex);
BEESNOTE("searching crawl sk " << static_cast<btrfs_ioctl_search_key&>(sk));
BEESTOOLONG("Searching crawl sk " << static_cast<btrfs_ioctl_search_key&>(sk));
Timer crawl_timer;
@ -700,7 +756,7 @@ BeesCrawl::fetch_extents()
return next_transid();
}
BEESLOG("Crawling " << sk.m_result.size() << " results from " << get_state());
// BEESLOG("Crawling " << sk.m_result.size() << " results from " << get_state());
auto results_left = sk.m_result.size();
BEESNOTE("crawling " << results_left << " results from " << get_state());
size_t count_other = 0;
@ -717,7 +773,6 @@ BeesCrawl::fetch_extents()
BEESTRACE("i = " << i);
#if 1
// We need the "+ 1" and objectid rollover that next_min does.
auto new_state = get_state();
new_state.m_objectid = sk.min_objectid;
@ -729,7 +784,6 @@ BeesCrawl::fetch_extents()
// is a lot of metadata we can't process. Favor forward
// progress over losing search results.
set_state(new_state);
#endif
// Ignore things that aren't EXTENT_DATA_KEY
if (i.type != BTRFS_EXTENT_DATA_KEY) {
@ -797,7 +851,7 @@ BeesCrawl::fetch_extents()
}
}
}
BEESLOG("Crawled inline " << count_inline << " data " << count_data << " other " << count_other << " unknown " << count_unknown << " gen_low " << count_low << " gen_high " << count_high << " " << get_state() << " in " << crawl_timer << "s");
// BEESLOG("Crawled inline " << count_inline << " data " << count_data << " other " << count_other << " unknown " << count_unknown << " gen_low " << count_low << " gen_high " << count_high << " " << get_state() << " in " << crawl_timer << "s");
return true;
}
@ -836,12 +890,6 @@ BeesCrawl::pop_front()
}
auto rv = *m_extents.begin();
m_extents.erase(m_extents.begin());
#if 0
auto state = get_state();
state.m_objectid = rv.fid().ino();
state.m_offset = rv.begin();
set_state(state);
#endif
return rv;
}

View File

@ -198,6 +198,21 @@ operator<<(ostream &os, const BeesStatTmpl<T> &bs)
// other ----------------------------------------
/**
* Don't allow two threads to use some btrfs ioctls at the same time.
* Some of them consume egregious amounts of kernel CPU time and are
* not interruptible, so if we have more threads than cores we will
* effectively crash the kernel. */
mutex bees_ioctl_mutex;
/**
* Don't allow two threads to create temporary copies of extent data at
* the same time. If two threads create temporary copies of the same
* extent at the same time they will not be properly deduped. This lock
* goes into effect as the first temporary extent is created by a thread,
* and is released after the source extent scan is finished. */
mutex bees_tmpfile_mutex;
template <class T>
T&
BeesStatTmpl<T>::at(string idx)

View File

@ -101,7 +101,7 @@ const double BEES_INFO_BURST = 1.0;
const size_t BEES_MAX_QUEUE_SIZE = 1024;
// Read this many items at a time in SEARCHv2
const size_t BEES_MAX_CRAWL_SIZE = 4096;
const size_t BEES_MAX_CRAWL_SIZE = 1024;
// If an extent has this many refs, pretend it does not exist
// to avoid a crippling btrfs performance bug
@ -490,23 +490,28 @@ class BeesCrawl {
mutex m_state_mutex;
BeesCrawlState m_state;
BeesThread m_thread;
bool m_stopped = false;
condition_variable m_cond_stopped;
bool fetch_extents();
void fetch_extents_harder();
bool next_transid();
public:
~BeesCrawl();
BeesCrawl(shared_ptr<BeesContext> ctx, BeesCrawlState initial_state);
BeesFileRange peek_front();
BeesFileRange pop_front();
BeesCrawlState get_state();
void set_state(const BeesCrawlState &bcs);
void crawl_thread();
};
class BeesRoots {
shared_ptr<BeesContext> m_ctx;
BeesStringFile m_crawl_state_file;
BeesCrawlState m_crawl_current;
map<uint64_t, shared_ptr<BeesCrawl>> m_root_crawl_map;
mutex m_mutex;
condition_variable m_condvar;
@ -514,6 +519,7 @@ class BeesRoots {
Timer m_crawl_timer;
BeesThread m_crawl_thread;
BeesThread m_writeback_thread;
LockSet<uint64_t> m_lock_set;
void insert_new_crawl();
void insert_root(const BeesCrawlState &bcs);
@ -541,6 +547,7 @@ public:
Fd open_root(uint64_t root);
Fd open_root_ino(uint64_t root, uint64_t ino);
Fd open_root_ino(const BeesFileId &bfi) { return open_root_ino(bfi.root(), bfi.ino()); }
LockSet<uint64_t> &lock_set() { return m_lock_set; }
};
struct BeesHash {
@ -822,5 +829,7 @@ string pretty(double d);
extern RateLimiter bees_info_rate_limit;
void bees_sync(int fd);
string format_time(time_t t);
extern mutex bees_ioctl_mutex;
extern mutex bees_tmpfile_mutex;
#endif