mirror of https://github.com/Zygo/bees.git

Merge branch 'master' into subvol-threads
@@ -8,7 +8,7 @@ all: $(PROGRAMS) depends.mk
 include ../makeflags
 
 LIBS = -lcrucible -lpthread
-LDFLAGS = -L../lib -Wl,-rpath=$(shell realpath ../lib)
+LDFLAGS = -L../lib
 
 depends.mk: Makefile *.cc
 	for x in *.cc; do $(CXX) $(CXXFLAGS) -M "$$x"; done > depends.mk.new
src/bees-hash.cc (223 changed lines)
@@ -24,14 +24,16 @@ operator<<(ostream &os, const BeesHashTable::Cell &bhte)
 		<< BeesAddress(bhte.e_addr) << " }";
 }
 
 #if 0
 static
 void
-dump_bucket(BeesHashTable::Cell *p, BeesHashTable::Cell *q)
+dump_bucket_locked(BeesHashTable::Cell *p, BeesHashTable::Cell *q)
 {
+	// Must be called while holding m_bucket_mutex
 	for (auto i = p; i < q; ++i) {
 		BEESLOG("Entry " << i - p << " " << *i);
 	}
 }
 #endif
 
 const bool VERIFY_CLEARS_BUGS = false;
 
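The rename of dump_bucket to dump_bucket_locked follows the convention used throughout this commit: a _locked suffix marks a helper that assumes its caller already holds the protecting mutex. A minimal standalone sketch of that convention (the Table type and its members are illustrative, not bees code):

#include <cstddef>
#include <iostream>
#include <mutex>
#include <vector>

struct Table {
	std::mutex m_mutex;
	std::vector<int> m_cells;

	// _locked: caller must already hold m_mutex.
	void dump_locked() {
		std::size_t i = 0;
		for (int cell : m_cells) {
			std::cout << "Entry " << i++ << " " << cell << "\n";
		}
	}

	// Public entry point takes the lock, then calls the _locked helper.
	void dump() {
		std::unique_lock<std::mutex> lock(m_mutex);
		dump_locked();
	}
};

int main() {
	Table t;
	t.m_cells = {1, 2, 3};
	t.dump();
	return 0;
}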
@@ -91,52 +93,74 @@ BeesHashTable::get_extent_range(HashType hash)
 	return make_pair(bp, ep);
 }
 
+bool
+BeesHashTable::flush_dirty_extent(uint64_t extent_index)
+{
+	BEESNOTE("flushing extent #" << extent_index << " of " << m_extents << " extents");
+
+	auto lock = lock_extent_by_index(extent_index);
+
+	// Not dirty, nothing to do
+	if (!m_extent_metadata.at(extent_index).m_dirty) {
+		return false;
+	}
+
+	bool wrote_extent = false;
+
+	catch_all([&]() {
+		uint8_t *dirty_extent = m_extent_ptr[extent_index].p_byte;
+		uint8_t *dirty_extent_end = m_extent_ptr[extent_index + 1].p_byte;
+		THROW_CHECK1(out_of_range, dirty_extent, dirty_extent >= m_byte_ptr);
+		THROW_CHECK1(out_of_range, dirty_extent_end, dirty_extent_end <= m_byte_ptr_end);
+		THROW_CHECK2(out_of_range, dirty_extent_end, dirty_extent, dirty_extent_end - dirty_extent == BLOCK_SIZE_HASHTAB_EXTENT);
+		BEESTOOLONG("pwrite(fd " << m_fd << " '" << name_fd(m_fd)<< "', length " << to_hex(dirty_extent_end - dirty_extent) << ", offset " << to_hex(dirty_extent - m_byte_ptr) << ")");
+		// Copy the extent because we might be stuck writing for a while
+		vector<uint8_t> extent_copy(dirty_extent, dirty_extent_end);
+
+		// Mark extent non-dirty while we still hold the lock
+		m_extent_metadata.at(extent_index).m_dirty = false;
+
+		// Release the lock
+		lock.unlock();
+
+		// Write the extent (or not)
+		pwrite_or_die(m_fd, extent_copy, dirty_extent - m_byte_ptr);
+		BEESCOUNT(hash_extent_out);
+
+		wrote_extent = true;
+	});
+
+	BEESNOTE("flush rate limited after extent #" << extent_index << " of " << m_extents << " extents");
+	m_flush_rate_limit.sleep_for(BLOCK_SIZE_HASHTAB_EXTENT);
+	return wrote_extent;
+}
+
 void
 BeesHashTable::flush_dirty_extents()
 {
 	THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0);
 
-	unique_lock<mutex> lock(m_extent_mutex);
-	auto dirty_extent_copy = m_buckets_dirty;
-	m_buckets_dirty.clear();
-	if (dirty_extent_copy.empty()) {
-		BEESNOTE("idle");
-		m_condvar.wait(lock);
-		return; // please call later, i.e. immediately
+	uint64_t wrote_extents = 0;
+	for (size_t extent_index = 0; extent_index < m_extents; ++extent_index) {
+		if (flush_dirty_extent(extent_index)) {
+			++wrote_extents;
+		}
 	}
-	lock.unlock();
 
-	size_t extent_counter = 0;
-	for (auto extent_number : dirty_extent_copy) {
-		++extent_counter;
-		BEESNOTE("flush extent #" << extent_number << " (" << extent_counter << " of " << dirty_extent_copy.size() << ")");
-		catch_all([&]() {
-			uint8_t *dirty_extent = m_extent_ptr[extent_number].p_byte;
-			uint8_t *dirty_extent_end = m_extent_ptr[extent_number + 1].p_byte;
-			THROW_CHECK1(out_of_range, dirty_extent, dirty_extent >= m_byte_ptr);
-			THROW_CHECK1(out_of_range, dirty_extent_end, dirty_extent_end <= m_byte_ptr_end);
-			THROW_CHECK2(out_of_range, dirty_extent_end, dirty_extent, dirty_extent_end - dirty_extent == BLOCK_SIZE_HASHTAB_EXTENT);
-			BEESTOOLONG("pwrite(fd " << m_fd << " '" << name_fd(m_fd)<< "', length " << to_hex(dirty_extent_end - dirty_extent) << ", offset " << to_hex(dirty_extent - m_byte_ptr) << ")");
-			// Page locks slow us down more than copying the data does
-			vector<uint8_t> extent_copy(dirty_extent, dirty_extent_end);
-			pwrite_or_die(m_fd, extent_copy, dirty_extent - m_byte_ptr);
-			BEESCOUNT(hash_extent_out);
-		});
-		BEESNOTE("flush rate limited at extent #" << extent_number << " (" << extent_counter << " of " << dirty_extent_copy.size() << ")");
-		m_flush_rate_limit.sleep_for(BLOCK_SIZE_HASHTAB_EXTENT);
-	}
+	BEESNOTE("idle after writing " << wrote_extents << " of " << m_extents << " extents");
+	unique_lock<mutex> lock(m_dirty_mutex);
+	m_dirty_condvar.wait(lock);
 }
 
 void
-BeesHashTable::set_extent_dirty(HashType hash)
+BeesHashTable::set_extent_dirty_locked(uint64_t extent_index)
 {
-	THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0);
-	auto pr = get_extent_range(hash);
-	uint64_t extent_number = reinterpret_cast<Extent *>(pr.first) - m_extent_ptr;
-	THROW_CHECK1(runtime_error, extent_number, extent_number < m_extents);
-	unique_lock<mutex> lock(m_extent_mutex);
-	m_buckets_dirty.insert(extent_number);
-	m_condvar.notify_one();
+	// Must already be locked
+	m_extent_metadata.at(extent_index).m_dirty = true;
+
+	// Signal writeback thread
+	unique_lock<mutex> dirty_lock(m_dirty_mutex);
+	m_dirty_condvar.notify_one();
 }
 
 void
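The new flush path splits per-extent work (flush_dirty_extent) from the scan loop (flush_dirty_extents) and replaces the m_buckets_dirty set with a per-extent dirty flag plus a dedicated m_dirty_mutex/m_dirty_condvar pair that wakes the writeback thread. A simplified, self-contained sketch of that handshake follows; for brevity it keeps all flags under one mutex, whereas the code above copies the extent and drops its per-extent lock before the actual pwrite. Names such as Flusher are illustrative only, not bees code.

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

struct Flusher {
	std::vector<bool> m_dirty;               // one dirty flag per extent
	std::mutex m_dirty_mutex;                // protects m_dirty, m_stop and the condvar
	std::condition_variable m_dirty_condvar; // wakes the writeback thread
	bool m_stop = false;

	explicit Flusher(std::size_t extents) : m_dirty(extents, false) {}

	// In the real code the caller already holds a per-extent lock;
	// here we only model the "mark dirty and wake the writer" half.
	void set_extent_dirty_locked(std::size_t extent_index) {
		std::unique_lock<std::mutex> lock(m_dirty_mutex);
		m_dirty.at(extent_index) = true;
		m_dirty_condvar.notify_one();
	}

	void writeback_loop() {
		std::unique_lock<std::mutex> lock(m_dirty_mutex);
		while (!m_stop) {
			// Scan every extent and "write back" the dirty ones.
			for (std::size_t i = 0; i < m_dirty.size(); ++i) {
				if (m_dirty[i]) {
					m_dirty[i] = false;
					std::cout << "flushed extent " << i << "\n";
				}
			}
			// Sleep until another extent is marked dirty (or we are stopped).
			m_dirty_condvar.wait(lock);
		}
	}
};

int main() {
	Flusher f(4);
	std::thread writer(&Flusher::writeback_loop, &f);
	f.set_extent_dirty_locked(2);
	std::this_thread::sleep_for(std::chrono::milliseconds(50));
	{
		std::unique_lock<std::mutex> lock(f.m_dirty_mutex);
		f.m_stop = true;
	}
	f.m_dirty_condvar.notify_one();
	writer.join();
	return 0;
}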
@@ -179,13 +203,13 @@ BeesHashTable::prefetch_loop()
 	size_t unaligned_eof_count = 0;
 
 	for (uint64_t ext = 0; ext < m_extents; ++ext) {
-		BEESNOTE("prefetching hash table extent " << ext << " of " << m_extent_ptr_end - m_extent_ptr);
+		BEESNOTE("prefetching hash table extent " << ext << " of " << m_extents);
 		catch_all([&]() {
-			fetch_missing_extent(ext * c_buckets_per_extent);
+			fetch_missing_extent_by_index(ext);
 
-			BEESNOTE("analyzing hash table extent " << ext << " of " << m_extent_ptr_end - m_extent_ptr);
+			BEESNOTE("analyzing hash table extent " << ext << " of " << m_extents);
 			bool duplicate_bugs_found = false;
-			unique_lock<mutex> lock(m_bucket_mutex);
+			auto lock = lock_extent_by_index(ext);
 			for (Bucket *bucket = m_extent_ptr[ext].p_buckets; bucket < m_extent_ptr[ext + 1].p_buckets; ++bucket) {
 				if (verify_cell_range(bucket[0].p_cells, bucket[1].p_cells)) {
 					duplicate_bugs_found = true;
@@ -214,9 +238,8 @@ BeesHashTable::prefetch_loop()
 				// Count these instead of calculating the number so we get better stats in case of exceptions
 				occupied_count += this_bucket_occupied_count;
 			}
-			lock.unlock();
 			if (duplicate_bugs_found) {
-				set_extent_dirty(ext);
+				set_extent_dirty_locked(ext);
 			}
 		});
 	}
@@ -291,52 +314,70 @@ BeesHashTable::prefetch_loop()
 	}
 }
 
-void
-BeesHashTable::fetch_missing_extent(HashType hash)
+size_t
+BeesHashTable::hash_to_extent_index(HashType hash)
+{
+	auto pr = get_extent_range(hash);
+	uint64_t extent_index = reinterpret_cast<const Extent *>(pr.first) - m_extent_ptr;
+	THROW_CHECK2(runtime_error, extent_index, m_extents, extent_index < m_extents);
+	return extent_index;
+}
+
+BeesHashTable::ExtentMetaData::ExtentMetaData() :
+	m_mutex_ptr(make_shared<mutex>())
+{
+}
+
+unique_lock<mutex>
+BeesHashTable::lock_extent_by_index(uint64_t extent_index)
+{
+	THROW_CHECK2(out_of_range, extent_index, m_extents, extent_index < m_extents);
+	return unique_lock<mutex>(*m_extent_metadata.at(extent_index).m_mutex_ptr);
+}
+
+unique_lock<mutex>
+BeesHashTable::lock_extent_by_hash(HashType hash)
 {
 	BEESTOOLONG("fetch_missing_extent for hash " << to_hex(hash));
-	THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0);
-	auto pr = get_extent_range(hash);
-	uint64_t extent_number = reinterpret_cast<Extent *>(pr.first) - m_extent_ptr;
-	THROW_CHECK1(runtime_error, extent_number, extent_number < m_extents);
+	return lock_extent_by_index(hash_to_extent_index(hash));
+}
 
-	unique_lock<mutex> lock(m_extent_mutex);
-	if (!m_buckets_missing.count(extent_number)) {
+void
+BeesHashTable::fetch_missing_extent_by_index(uint64_t extent_index)
+{
+	BEESNOTE("checking hash extent #" << extent_index << " of " << m_extents << " extents");
+	auto lock = lock_extent_by_index(extent_index);
+	if (!m_extent_metadata.at(extent_index).m_missing) {
 		return;
 	}
 
-	size_t missing_buckets = m_buckets_missing.size();
-	lock.unlock();
-
-	BEESNOTE("waiting to fetch hash extent #" << extent_number << ", " << missing_buckets << " left to fetch");
-
-	// Acquire blocking lock on this extent only
-	auto extent_lock = m_extent_lock_set.make_lock(extent_number);
-
-	// Check missing again because someone else might have fetched this
-	// extent for us while we didn't hold any locks
-	lock.lock();
-	if (!m_buckets_missing.count(extent_number)) {
-		BEESCOUNT(hash_extent_in_twice);
-		return;
-	}
-	lock.unlock();
-
-	// OK we have to read this extent
-	BEESNOTE("fetching hash extent #" << extent_number << ", " << missing_buckets << " left to fetch");
+	BEESNOTE("fetching hash extent #" << extent_index << " of " << m_extents << " extents");
+	BEESTRACE("Fetching hash extent #" << extent_index << " of " << m_extents << " extents");
+	BEESTOOLONG("Fetching hash extent #" << extent_index << " of " << m_extents << " extents");
 
-	BEESTRACE("Fetching missing hash extent " << extent_number);
-	uint8_t *dirty_extent = m_extent_ptr[extent_number].p_byte;
-	uint8_t *dirty_extent_end = m_extent_ptr[extent_number + 1].p_byte;
+	uint8_t *dirty_extent = m_extent_ptr[extent_index].p_byte;
+	uint8_t *dirty_extent_end = m_extent_ptr[extent_index + 1].p_byte;
 
-	{
+	// If the read fails don't retry, just go with whatever data we have
+	m_extent_metadata.at(extent_index).m_missing = false;
+
+	catch_all([&]() {
 		BEESTOOLONG("pread(fd " << m_fd << " '" << name_fd(m_fd)<< "', length " << to_hex(dirty_extent_end - dirty_extent) << ", offset " << to_hex(dirty_extent - m_byte_ptr) << ")");
 		pread_or_die(m_fd, dirty_extent, dirty_extent_end - dirty_extent, dirty_extent - m_byte_ptr);
-	}
+	});
 
+	// Only count extents successfully read
 	BEESCOUNT(hash_extent_in);
-	lock.lock();
-	m_buckets_missing.erase(extent_number);
 }
 
+void
+BeesHashTable::fetch_missing_extent_by_hash(HashType hash)
+{
+	uint64_t extent_index = hash_to_extent_index(hash);
+	BEESNOTE("waiting to fetch hash extent #" << extent_index << " of " << m_extents << " extents");
+
+	fetch_missing_extent_by_index(extent_index);
+}
+
 bool
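fetch_missing_extent_by_index now takes only the per-extent lock, clears the m_missing flag before attempting the read, and wraps the pread in catch_all, so a failing read is attempted at most once and the table keeps whatever data it already has. A rough single-threaded sketch of that control flow (ExtentCache and catch_all_sketch are illustrative stand-ins, not the bees API):

#include <cstddef>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <vector>

// Stand-in for bees' catch_all(): run a callable and swallow exceptions.
static void catch_all_sketch(const std::function<void()> &f) {
	try {
		f();
	} catch (const std::exception &e) {
		std::cerr << "read failed, keeping existing data: " << e.what() << "\n";
	}
}

struct ExtentCache {
	std::vector<bool> m_missing;

	explicit ExtentCache(std::size_t extents) : m_missing(extents, true) {}

	void fetch_missing_extent_by_index(std::size_t extent_index,
	                                   const std::function<void()> &read_extent) {
		if (!m_missing.at(extent_index)) {
			return; // already fetched (or already attempted)
		}
		// Clear the flag first: if the read fails, don't retry,
		// just go with whatever data we already have.
		m_missing.at(extent_index) = false;
		catch_all_sketch(read_extent);
	}
};

int main() {
	ExtentCache cache(2);
	cache.fetch_missing_extent_by_index(0, [] { std::cout << "read extent 0\n"; });
	cache.fetch_missing_extent_by_index(1, [] { throw std::runtime_error("pread error"); });
	// Second call is a no-op: the failed extent is not re-read.
	cache.fetch_missing_extent_by_index(1, [] { std::cout << "never printed\n"; });
	return 0;
}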
@@ -358,10 +399,10 @@ BeesHashTable::find_cell(HashType hash)
 		rv.push_back(toxic_cell);
 		return rv;
 	}
-	fetch_missing_extent(hash);
+	fetch_missing_extent_by_hash(hash);
 	BEESTOOLONG("find_cell hash " << BeesHash(hash));
 	vector<Cell> rv;
-	unique_lock<mutex> lock(m_bucket_mutex);
+	auto lock = lock_extent_by_hash(hash);
 	auto er = get_cell_range(hash);
 	// FIXME: Weed out zero addresses in the table due to earlier bugs
 	copy_if(er.first, er.second, back_inserter(rv), [=](const Cell &ip) { return ip.e_hash == hash && ip.e_addr >= 0x1000; });
@@ -377,9 +418,9 @@ BeesHashTable::find_cell(HashType hash)
 void
 BeesHashTable::erase_hash_addr(HashType hash, AddrType addr)
 {
-	fetch_missing_extent(hash);
+	fetch_missing_extent_by_hash(hash);
 	BEESTOOLONG("erase hash " << to_hex(hash) << " addr " << addr);
-	unique_lock<mutex> lock(m_bucket_mutex);
+	auto lock = lock_extent_by_hash(hash);
 	auto er = get_cell_range(hash);
 	Cell mv(hash, addr);
 	Cell *ip = find(er.first, er.second, mv);
@@ -387,7 +428,7 @@ BeesHashTable::erase_hash_addr(HashType hash, AddrType addr)
 	if (found) {
 		// Lookups on invalid addresses really hurt us. Kill it with fire!
 		*ip = Cell(0, 0);
-		set_extent_dirty(hash);
+		set_extent_dirty_locked(hash_to_extent_index(hash));
 		BEESCOUNT(hash_erase);
 #if 0
 		if (verify_cell_range(er.first, er.second)) {
@@ -405,9 +446,9 @@ BeesHashTable::erase_hash_addr(HashType hash, AddrType addr)
 bool
 BeesHashTable::push_front_hash_addr(HashType hash, AddrType addr)
 {
-	fetch_missing_extent(hash);
+	fetch_missing_extent_by_hash(hash);
 	BEESTOOLONG("push_front_hash_addr hash " << BeesHash(hash) <<" addr " << BeesAddress(addr));
-	unique_lock<mutex> lock(m_bucket_mutex);
+	auto lock = lock_extent_by_hash(hash);
 	auto er = get_cell_range(hash);
 	Cell mv(hash, addr);
 	Cell *ip = find(er.first, er.second, mv);
@@ -437,7 +478,7 @@ BeesHashTable::push_front_hash_addr(HashType hash, AddrType addr)
 	// There is now a space at the front, insert there if different
 	if (er.first[0] != mv) {
 		er.first[0] = mv;
-		set_extent_dirty(hash);
+		set_extent_dirty_locked(hash_to_extent_index(hash));
 		BEESCOUNT(hash_front);
 	}
 #if 0
@@ -456,9 +497,9 @@ BeesHashTable::push_front_hash_addr(HashType hash, AddrType addr)
 bool
 BeesHashTable::push_random_hash_addr(HashType hash, AddrType addr)
 {
-	fetch_missing_extent(hash);
+	fetch_missing_extent_by_hash(hash);
 	BEESTOOLONG("push_random_hash_addr hash " << BeesHash(hash) << " addr " << BeesAddress(addr));
-	unique_lock<mutex> lock(m_bucket_mutex);
+	auto lock = lock_extent_by_hash(hash);
 	auto er = get_cell_range(hash);
 	Cell mv(hash, addr);
 	Cell *ip = find(er.first, er.second, mv);
@@ -521,14 +562,14 @@ BeesHashTable::push_random_hash_addr(HashType hash, AddrType addr)
 		case_cond = 5;
 ret_dirty:
 		BEESCOUNT(hash_insert);
-		set_extent_dirty(hash);
+		set_extent_dirty_locked(hash_to_extent_index(hash));
 ret:
 #if 0
 		if (verify_cell_range(er.first, er.second, false)) {
 			BEESLOG("while push_randoming (case " << case_cond << ") pos " << pos
 				<< " ip " << (ip - er.first) << " " << mv);
-			// dump_bucket(saved.data(), saved.data() + saved.size());
-			// dump_bucket(er.first, er.second);
+			// dump_bucket_locked(saved.data(), saved.data() + saved.size());
+			// dump_bucket_locked(er.first, er.second);
 		}
 #else
 		(void)case_cond;
@@ -657,9 +698,7 @@ BeesHashTable::BeesHashTable(shared_ptr<BeesContext> ctx, string filename, off_t
 		}
 	}
 
-	for (uint64_t i = 0; i < m_size / sizeof(Extent); ++i) {
-		m_buckets_missing.insert(i);
-	}
+	m_extent_metadata.resize(m_extents);
 
 	m_writeback_thread.exec([&]() {
 		writeback_loop();
src/bees.h (28 changed lines)
@@ -432,18 +432,24 @@ private:
 	uint64_t m_buckets;
 	uint64_t m_extents;
 	uint64_t m_cells;
-	set<uint64_t> m_buckets_dirty;
-	set<uint64_t> m_buckets_missing;
 	BeesThread m_writeback_thread;
 	BeesThread m_prefetch_thread;
 	RateLimiter m_flush_rate_limit;
-	mutex m_extent_mutex;
-	mutex m_bucket_mutex;
-	condition_variable m_condvar;
 	set<HashType> m_toxic_hashes;
 	BeesStringFile m_stats_file;
 
-	LockSet<uint64_t> m_extent_lock_set;
+	// Mutex/condvar for the writeback thread
+	mutex m_dirty_mutex;
+	condition_variable m_dirty_condvar;
 
+	// Per-extent structures
+	struct ExtentMetaData {
+		shared_ptr<mutex> m_mutex_ptr; // Access serializer
+		bool m_dirty = false; // Needs to be written back to disk
+		bool m_missing = true; // Needs to be read from disk
+		ExtentMetaData();
+	};
+	vector<ExtentMetaData> m_extent_metadata;
+
 	void open_file();
 	void writeback_loop();
@@ -451,11 +457,17 @@ private:
 	void try_mmap_flags(int flags);
 	pair<Cell *, Cell *> get_cell_range(HashType hash);
 	pair<uint8_t *, uint8_t *> get_extent_range(HashType hash);
-	void fetch_missing_extent(HashType hash);
-	void set_extent_dirty(HashType hash);
+	void fetch_missing_extent_by_hash(HashType hash);
+	void fetch_missing_extent_by_index(uint64_t extent_index);
+	void set_extent_dirty_locked(uint64_t extent_index);
 	void flush_dirty_extents();
+	bool flush_dirty_extent(uint64_t extent_index);
 	bool is_toxic_hash(HashType h) const;
 
+	size_t hash_to_extent_index(HashType ht);
+	unique_lock<mutex> lock_extent_by_hash(HashType ht);
+	unique_lock<mutex> lock_extent_by_index(uint64_t extent_index);
+
 	BeesHashTable(const BeesHashTable &) = delete;
 	BeesHashTable &operator=(const BeesHashTable &) = delete;
 };
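The header change replaces the global m_bucket_mutex/m_extent_mutex pair and the m_buckets_dirty/m_buckets_missing sets with one ExtentMetaData record per extent. Because std::mutex is neither copyable nor movable, the mutex sits behind a shared_ptr so the metadata can live in a plain resizable vector. A small usage sketch of that layout (Table and main are illustrative, not bees code):

#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

struct ExtentMetaData {
	std::shared_ptr<std::mutex> m_mutex_ptr = std::make_shared<std::mutex>(); // access serializer
	bool m_dirty = false;    // needs to be written back to disk
	bool m_missing = true;   // needs to be read from disk
};

struct Table {
	std::vector<ExtentMetaData> m_extent_metadata;

	explicit Table(std::size_t extents) { m_extent_metadata.resize(extents); }

	// Lock exactly one extent; the others remain available to other threads.
	std::unique_lock<std::mutex> lock_extent_by_index(std::size_t extent_index) {
		return std::unique_lock<std::mutex>(*m_extent_metadata.at(extent_index).m_mutex_ptr);
	}

	// Caller already holds the extent lock, hence the _locked suffix.
	void set_extent_dirty_locked(std::size_t extent_index) {
		m_extent_metadata.at(extent_index).m_dirty = true;
	}
};

int main() {
	Table t(8);
	auto lock = t.lock_extent_by_index(3);
	t.set_extent_dirty_locked(3);
	assert(t.m_extent_metadata.at(3).m_dirty);
	return 0;
}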