1
0
mirror of https://github.com/Zygo/bees.git synced 2025-05-17 21:35:45 +02:00

hash: reduce mutex contention using one mutex per hash table extent

This avoids PERFORMANCE warnings when large hash tables are used on slow
CPUs or with lots of worker threads.  It also simplifies the code (no
locksets, only one object-wide mutex instead of two).

Fixed a few minor bugs along the way (e.g. we were not setting the dirty
flag on the right hash table extent when we detected hash table errors).

Simplified error handling:  IO errors on the hash table are ignored,
instead of throwing an exception into the function that tried to use the
hash table.

Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
Zygo Blaxell 2018-01-10 23:08:22 -05:00
parent 365a913a26
commit 4aa5978a89
2 changed files with 151 additions and 100 deletions

View File

@ -24,14 +24,16 @@ operator<<(ostream &os, const BeesHashTable::Cell &bhte)
<< BeesAddress(bhte.e_addr) << " }";
}
#if 0
static
void
dump_bucket(BeesHashTable::Cell *p, BeesHashTable::Cell *q)
dump_bucket_locked(BeesHashTable::Cell *p, BeesHashTable::Cell *q)
{
// Must be called while holding m_bucket_mutex
for (auto i = p; i < q; ++i) {
BEESLOG("Entry " << i - p << " " << *i);
}
}
#endif
const bool VERIFY_CLEARS_BUGS = false;
@ -91,52 +93,74 @@ BeesHashTable::get_extent_range(HashType hash)
return make_pair(bp, ep);
}
bool
BeesHashTable::flush_dirty_extent(uint64_t extent_index)
{
BEESNOTE("flushing extent #" << extent_index << " of " << m_extents << " extents");
auto lock = lock_extent_by_index(extent_index);
// Not dirty, nothing to do
if (!m_extent_metadata.at(extent_index).m_dirty) {
return false;
}
bool wrote_extent = false;
catch_all([&]() {
uint8_t *dirty_extent = m_extent_ptr[extent_index].p_byte;
uint8_t *dirty_extent_end = m_extent_ptr[extent_index + 1].p_byte;
THROW_CHECK1(out_of_range, dirty_extent, dirty_extent >= m_byte_ptr);
THROW_CHECK1(out_of_range, dirty_extent_end, dirty_extent_end <= m_byte_ptr_end);
THROW_CHECK2(out_of_range, dirty_extent_end, dirty_extent, dirty_extent_end - dirty_extent == BLOCK_SIZE_HASHTAB_EXTENT);
BEESTOOLONG("pwrite(fd " << m_fd << " '" << name_fd(m_fd)<< "', length " << to_hex(dirty_extent_end - dirty_extent) << ", offset " << to_hex(dirty_extent - m_byte_ptr) << ")");
// Copy the extent because we might be stuck writing for a while
vector<uint8_t> extent_copy(dirty_extent, dirty_extent_end);
// Mark extent non-dirty while we still hold the lock
m_extent_metadata.at(extent_index).m_dirty = false;
// Release the lock
lock.unlock();
// Write the extent (or not)
pwrite_or_die(m_fd, extent_copy, dirty_extent - m_byte_ptr);
BEESCOUNT(hash_extent_out);
wrote_extent = true;
});
BEESNOTE("flush rate limited after extent #" << extent_index << " of " << m_extents << " extents");
m_flush_rate_limit.sleep_for(BLOCK_SIZE_HASHTAB_EXTENT);
return wrote_extent;
}
void
BeesHashTable::flush_dirty_extents()
{
THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0);
unique_lock<mutex> lock(m_extent_mutex);
auto dirty_extent_copy = m_buckets_dirty;
m_buckets_dirty.clear();
if (dirty_extent_copy.empty()) {
BEESNOTE("idle");
m_condvar.wait(lock);
return; // please call later, i.e. immediately
uint64_t wrote_extents = 0;
for (size_t extent_index = 0; extent_index < m_extents; ++extent_index) {
if (flush_dirty_extent(extent_index)) {
++wrote_extents;
}
}
lock.unlock();
size_t extent_counter = 0;
for (auto extent_number : dirty_extent_copy) {
++extent_counter;
BEESNOTE("flush extent #" << extent_number << " (" << extent_counter << " of " << dirty_extent_copy.size() << ")");
catch_all([&]() {
uint8_t *dirty_extent = m_extent_ptr[extent_number].p_byte;
uint8_t *dirty_extent_end = m_extent_ptr[extent_number + 1].p_byte;
THROW_CHECK1(out_of_range, dirty_extent, dirty_extent >= m_byte_ptr);
THROW_CHECK1(out_of_range, dirty_extent_end, dirty_extent_end <= m_byte_ptr_end);
THROW_CHECK2(out_of_range, dirty_extent_end, dirty_extent, dirty_extent_end - dirty_extent == BLOCK_SIZE_HASHTAB_EXTENT);
BEESTOOLONG("pwrite(fd " << m_fd << " '" << name_fd(m_fd)<< "', length " << to_hex(dirty_extent_end - dirty_extent) << ", offset " << to_hex(dirty_extent - m_byte_ptr) << ")");
// Page locks slow us down more than copying the data does
vector<uint8_t> extent_copy(dirty_extent, dirty_extent_end);
pwrite_or_die(m_fd, extent_copy, dirty_extent - m_byte_ptr);
BEESCOUNT(hash_extent_out);
});
BEESNOTE("flush rate limited at extent #" << extent_number << " (" << extent_counter << " of " << dirty_extent_copy.size() << ")");
m_flush_rate_limit.sleep_for(BLOCK_SIZE_HASHTAB_EXTENT);
}
BEESNOTE("idle after writing " << wrote_extents << " of " << m_extents << " extents");
unique_lock<mutex> lock(m_dirty_mutex);
m_dirty_condvar.wait(lock);
}
void
BeesHashTable::set_extent_dirty(HashType hash)
BeesHashTable::set_extent_dirty_locked(uint64_t extent_index)
{
THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0);
auto pr = get_extent_range(hash);
uint64_t extent_number = reinterpret_cast<Extent *>(pr.first) - m_extent_ptr;
THROW_CHECK1(runtime_error, extent_number, extent_number < m_extents);
unique_lock<mutex> lock(m_extent_mutex);
m_buckets_dirty.insert(extent_number);
m_condvar.notify_one();
// Must already be locked
m_extent_metadata.at(extent_index).m_dirty = true;
// Signal writeback thread
unique_lock<mutex> dirty_lock(m_dirty_mutex);
m_dirty_condvar.notify_one();
}
void
@ -179,13 +203,13 @@ BeesHashTable::prefetch_loop()
size_t unaligned_eof_count = 0;
for (uint64_t ext = 0; ext < m_extents; ++ext) {
BEESNOTE("prefetching hash table extent " << ext << " of " << m_extent_ptr_end - m_extent_ptr);
BEESNOTE("prefetching hash table extent " << ext << " of " << m_extents);
catch_all([&]() {
fetch_missing_extent(ext * c_buckets_per_extent);
fetch_missing_extent_by_index(ext);
BEESNOTE("analyzing hash table extent " << ext << " of " << m_extent_ptr_end - m_extent_ptr);
BEESNOTE("analyzing hash table extent " << ext << " of " << m_extents);
bool duplicate_bugs_found = false;
unique_lock<mutex> lock(m_bucket_mutex);
auto lock = lock_extent_by_index(ext);
for (Bucket *bucket = m_extent_ptr[ext].p_buckets; bucket < m_extent_ptr[ext + 1].p_buckets; ++bucket) {
if (verify_cell_range(bucket[0].p_cells, bucket[1].p_cells)) {
duplicate_bugs_found = true;
@ -214,9 +238,8 @@ BeesHashTable::prefetch_loop()
// Count these instead of calculating the number so we get better stats in case of exceptions
occupied_count += this_bucket_occupied_count;
}
lock.unlock();
if (duplicate_bugs_found) {
set_extent_dirty(ext);
set_extent_dirty_locked(ext);
}
});
}
@ -291,52 +314,70 @@ BeesHashTable::prefetch_loop()
}
}
void
BeesHashTable::fetch_missing_extent(HashType hash)
size_t
BeesHashTable::hash_to_extent_index(HashType hash)
{
auto pr = get_extent_range(hash);
uint64_t extent_index = reinterpret_cast<const Extent *>(pr.first) - m_extent_ptr;
THROW_CHECK2(runtime_error, extent_index, m_extents, extent_index < m_extents);
return extent_index;
}
BeesHashTable::ExtentMetaData::ExtentMetaData() :
m_mutex_ptr(make_shared<mutex>())
{
}
unique_lock<mutex>
BeesHashTable::lock_extent_by_index(uint64_t extent_index)
{
THROW_CHECK2(out_of_range, extent_index, m_extents, extent_index < m_extents);
return unique_lock<mutex>(*m_extent_metadata.at(extent_index).m_mutex_ptr);
}
unique_lock<mutex>
BeesHashTable::lock_extent_by_hash(HashType hash)
{
BEESTOOLONG("fetch_missing_extent for hash " << to_hex(hash));
THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0);
auto pr = get_extent_range(hash);
uint64_t extent_number = reinterpret_cast<Extent *>(pr.first) - m_extent_ptr;
THROW_CHECK1(runtime_error, extent_number, extent_number < m_extents);
return lock_extent_by_index(hash_to_extent_index(hash));
}
unique_lock<mutex> lock(m_extent_mutex);
if (!m_buckets_missing.count(extent_number)) {
void
BeesHashTable::fetch_missing_extent_by_index(uint64_t extent_index)
{
BEESNOTE("checking hash extent #" << extent_index << " of " << m_extents << " extents");
auto lock = lock_extent_by_index(extent_index);
if (!m_extent_metadata.at(extent_index).m_missing) {
return;
}
size_t missing_buckets = m_buckets_missing.size();
lock.unlock();
BEESNOTE("waiting to fetch hash extent #" << extent_number << ", " << missing_buckets << " left to fetch");
// Acquire blocking lock on this extent only
auto extent_lock = m_extent_lock_set.make_lock(extent_number);
// Check missing again because someone else might have fetched this
// extent for us while we didn't hold any locks
lock.lock();
if (!m_buckets_missing.count(extent_number)) {
BEESCOUNT(hash_extent_in_twice);
return;
}
lock.unlock();
// OK we have to read this extent
BEESNOTE("fetching hash extent #" << extent_number << ", " << missing_buckets << " left to fetch");
BEESNOTE("fetching hash extent #" << extent_index << " of " << m_extents << " extents");
BEESTRACE("Fetching hash extent #" << extent_index << " of " << m_extents << " extents");
BEESTOOLONG("Fetching hash extent #" << extent_index << " of " << m_extents << " extents");
BEESTRACE("Fetching missing hash extent " << extent_number);
uint8_t *dirty_extent = m_extent_ptr[extent_number].p_byte;
uint8_t *dirty_extent_end = m_extent_ptr[extent_number + 1].p_byte;
uint8_t *dirty_extent = m_extent_ptr[extent_index].p_byte;
uint8_t *dirty_extent_end = m_extent_ptr[extent_index + 1].p_byte;
{
// If the read fails don't retry, just go with whatever data we have
m_extent_metadata.at(extent_index).m_missing = false;
catch_all([&]() {
BEESTOOLONG("pread(fd " << m_fd << " '" << name_fd(m_fd)<< "', length " << to_hex(dirty_extent_end - dirty_extent) << ", offset " << to_hex(dirty_extent - m_byte_ptr) << ")");
pread_or_die(m_fd, dirty_extent, dirty_extent_end - dirty_extent, dirty_extent - m_byte_ptr);
}
});
// Only count extents successfully read
BEESCOUNT(hash_extent_in);
lock.lock();
m_buckets_missing.erase(extent_number);
}
void
BeesHashTable::fetch_missing_extent_by_hash(HashType hash)
{
uint64_t extent_index = hash_to_extent_index(hash);
BEESNOTE("waiting to fetch hash extent #" << extent_index << " of " << m_extents << " extents");
fetch_missing_extent_by_index(extent_index);
}
bool
@ -358,10 +399,10 @@ BeesHashTable::find_cell(HashType hash)
rv.push_back(toxic_cell);
return rv;
}
fetch_missing_extent(hash);
fetch_missing_extent_by_hash(hash);
BEESTOOLONG("find_cell hash " << BeesHash(hash));
vector<Cell> rv;
unique_lock<mutex> lock(m_bucket_mutex);
auto lock = lock_extent_by_hash(hash);
auto er = get_cell_range(hash);
// FIXME: Weed out zero addresses in the table due to earlier bugs
copy_if(er.first, er.second, back_inserter(rv), [=](const Cell &ip) { return ip.e_hash == hash && ip.e_addr >= 0x1000; });
@ -377,9 +418,9 @@ BeesHashTable::find_cell(HashType hash)
void
BeesHashTable::erase_hash_addr(HashType hash, AddrType addr)
{
fetch_missing_extent(hash);
fetch_missing_extent_by_hash(hash);
BEESTOOLONG("erase hash " << to_hex(hash) << " addr " << addr);
unique_lock<mutex> lock(m_bucket_mutex);
auto lock = lock_extent_by_hash(hash);
auto er = get_cell_range(hash);
Cell mv(hash, addr);
Cell *ip = find(er.first, er.second, mv);
@ -387,7 +428,7 @@ BeesHashTable::erase_hash_addr(HashType hash, AddrType addr)
if (found) {
// Lookups on invalid addresses really hurt us. Kill it with fire!
*ip = Cell(0, 0);
set_extent_dirty(hash);
set_extent_dirty_locked(hash_to_extent_index(hash));
BEESCOUNT(hash_erase);
#if 0
if (verify_cell_range(er.first, er.second)) {
@ -405,9 +446,9 @@ BeesHashTable::erase_hash_addr(HashType hash, AddrType addr)
bool
BeesHashTable::push_front_hash_addr(HashType hash, AddrType addr)
{
fetch_missing_extent(hash);
fetch_missing_extent_by_hash(hash);
BEESTOOLONG("push_front_hash_addr hash " << BeesHash(hash) <<" addr " << BeesAddress(addr));
unique_lock<mutex> lock(m_bucket_mutex);
auto lock = lock_extent_by_hash(hash);
auto er = get_cell_range(hash);
Cell mv(hash, addr);
Cell *ip = find(er.first, er.second, mv);
@ -437,7 +478,7 @@ BeesHashTable::push_front_hash_addr(HashType hash, AddrType addr)
// There is now a space at the front, insert there if different
if (er.first[0] != mv) {
er.first[0] = mv;
set_extent_dirty(hash);
set_extent_dirty_locked(hash_to_extent_index(hash));
BEESCOUNT(hash_front);
}
#if 0
@ -456,9 +497,9 @@ BeesHashTable::push_front_hash_addr(HashType hash, AddrType addr)
bool
BeesHashTable::push_random_hash_addr(HashType hash, AddrType addr)
{
fetch_missing_extent(hash);
fetch_missing_extent_by_hash(hash);
BEESTOOLONG("push_random_hash_addr hash " << BeesHash(hash) << " addr " << BeesAddress(addr));
unique_lock<mutex> lock(m_bucket_mutex);
auto lock = lock_extent_by_hash(hash);
auto er = get_cell_range(hash);
Cell mv(hash, addr);
Cell *ip = find(er.first, er.second, mv);
@ -521,14 +562,14 @@ BeesHashTable::push_random_hash_addr(HashType hash, AddrType addr)
case_cond = 5;
ret_dirty:
BEESCOUNT(hash_insert);
set_extent_dirty(hash);
set_extent_dirty_locked(hash_to_extent_index(hash));
ret:
#if 0
if (verify_cell_range(er.first, er.second, false)) {
BEESLOG("while push_randoming (case " << case_cond << ") pos " << pos
<< " ip " << (ip - er.first) << " " << mv);
// dump_bucket(saved.data(), saved.data() + saved.size());
// dump_bucket(er.first, er.second);
// dump_bucket_locked(saved.data(), saved.data() + saved.size());
// dump_bucket_locked(er.first, er.second);
}
#else
(void)case_cond;
@ -652,9 +693,7 @@ BeesHashTable::BeesHashTable(shared_ptr<BeesContext> ctx, string filename, off_t
}
}
for (uint64_t i = 0; i < m_size / sizeof(Extent); ++i) {
m_buckets_missing.insert(i);
}
m_extent_metadata.resize(m_extents);
m_writeback_thread.exec([&]() {
writeback_loop();

View File

@ -432,18 +432,24 @@ private:
uint64_t m_buckets;
uint64_t m_extents;
uint64_t m_cells;
set<uint64_t> m_buckets_dirty;
set<uint64_t> m_buckets_missing;
BeesThread m_writeback_thread;
BeesThread m_prefetch_thread;
RateLimiter m_flush_rate_limit;
mutex m_extent_mutex;
mutex m_bucket_mutex;
condition_variable m_condvar;
set<HashType> m_toxic_hashes;
BeesStringFile m_stats_file;
LockSet<uint64_t> m_extent_lock_set;
// Mutex/condvar for the writeback thread
mutex m_dirty_mutex;
condition_variable m_dirty_condvar;
// Per-extent structures
struct ExtentMetaData {
shared_ptr<mutex> m_mutex_ptr; // Access serializer
bool m_dirty = false; // Needs to be written back to disk
bool m_missing = true; // Needs to be read from disk
ExtentMetaData();
};
vector<ExtentMetaData> m_extent_metadata;
void open_file();
void writeback_loop();
@ -451,11 +457,17 @@ private:
void try_mmap_flags(int flags);
pair<Cell *, Cell *> get_cell_range(HashType hash);
pair<uint8_t *, uint8_t *> get_extent_range(HashType hash);
void fetch_missing_extent(HashType hash);
void set_extent_dirty(HashType hash);
void fetch_missing_extent_by_hash(HashType hash);
void fetch_missing_extent_by_index(uint64_t extent_index);
void set_extent_dirty_locked(uint64_t extent_index);
void flush_dirty_extents();
bool flush_dirty_extent(uint64_t extent_index);
bool is_toxic_hash(HashType h) const;
size_t hash_to_extent_index(HashType ht);
unique_lock<mutex> lock_extent_by_hash(HashType ht);
unique_lock<mutex> lock_extent_by_index(uint64_t extent_index);
BeesHashTable(const BeesHashTable &) = delete;
BeesHashTable &operator=(const BeesHashTable &) = delete;
};