From 4aa5978a89e8dbf055749316d43568d6aef07414 Mon Sep 17 00:00:00 2001 From: Zygo Blaxell Date: Wed, 10 Jan 2018 23:08:22 -0500 Subject: [PATCH] hash: reduce mutex contention using one mutex per hash table extent This avoids PERFORMANCE warnings when large hash tables are used on slow CPUs or with lots of worker threads. It also simplifies the code (no locksets, only one object-wide mutex instead of two). Fixed a few minor bugs along the way (e.g. we were not setting the dirty flag on the right hash table extent when we detected hash table errors). Simplified error handling: IO errors on the hash table are ignored, instead of throwing an exception into the function that tried to use the hash table. Signed-off-by: Zygo Blaxell --- src/bees-hash.cc | 223 ++++++++++++++++++++++++++++------------------- src/bees.h | 28 ++++-- 2 files changed, 151 insertions(+), 100 deletions(-) diff --git a/src/bees-hash.cc b/src/bees-hash.cc index 37b6f6c..7ba562f 100644 --- a/src/bees-hash.cc +++ b/src/bees-hash.cc @@ -24,14 +24,16 @@ operator<<(ostream &os, const BeesHashTable::Cell &bhte) << BeesAddress(bhte.e_addr) << " }"; } +#if 0 +static void -dump_bucket(BeesHashTable::Cell *p, BeesHashTable::Cell *q) +dump_bucket_locked(BeesHashTable::Cell *p, BeesHashTable::Cell *q) { - // Must be called while holding m_bucket_mutex for (auto i = p; i < q; ++i) { BEESLOG("Entry " << i - p << " " << *i); } } +#endif const bool VERIFY_CLEARS_BUGS = false; @@ -91,52 +93,74 @@ BeesHashTable::get_extent_range(HashType hash) return make_pair(bp, ep); } +bool +BeesHashTable::flush_dirty_extent(uint64_t extent_index) +{ + BEESNOTE("flushing extent #" << extent_index << " of " << m_extents << " extents"); + + auto lock = lock_extent_by_index(extent_index); + + // Not dirty, nothing to do + if (!m_extent_metadata.at(extent_index).m_dirty) { + return false; + } + + bool wrote_extent = false; + + catch_all([&]() { + uint8_t *dirty_extent = m_extent_ptr[extent_index].p_byte; + uint8_t *dirty_extent_end = m_extent_ptr[extent_index + 1].p_byte; + THROW_CHECK1(out_of_range, dirty_extent, dirty_extent >= m_byte_ptr); + THROW_CHECK1(out_of_range, dirty_extent_end, dirty_extent_end <= m_byte_ptr_end); + THROW_CHECK2(out_of_range, dirty_extent_end, dirty_extent, dirty_extent_end - dirty_extent == BLOCK_SIZE_HASHTAB_EXTENT); + BEESTOOLONG("pwrite(fd " << m_fd << " '" << name_fd(m_fd)<< "', length " << to_hex(dirty_extent_end - dirty_extent) << ", offset " << to_hex(dirty_extent - m_byte_ptr) << ")"); + // Copy the extent because we might be stuck writing for a while + vector extent_copy(dirty_extent, dirty_extent_end); + + // Mark extent non-dirty while we still hold the lock + m_extent_metadata.at(extent_index).m_dirty = false; + + // Release the lock + lock.unlock(); + + // Write the extent (or not) + pwrite_or_die(m_fd, extent_copy, dirty_extent - m_byte_ptr); + BEESCOUNT(hash_extent_out); + + wrote_extent = true; + }); + + BEESNOTE("flush rate limited after extent #" << extent_index << " of " << m_extents << " extents"); + m_flush_rate_limit.sleep_for(BLOCK_SIZE_HASHTAB_EXTENT); + return wrote_extent; +} + void BeesHashTable::flush_dirty_extents() { THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0); - unique_lock lock(m_extent_mutex); - auto dirty_extent_copy = m_buckets_dirty; - m_buckets_dirty.clear(); - if (dirty_extent_copy.empty()) { - BEESNOTE("idle"); - m_condvar.wait(lock); - return; // please call later, i.e. immediately + uint64_t wrote_extents = 0; + for (size_t extent_index = 0; extent_index < m_extents; ++extent_index) { + if (flush_dirty_extent(extent_index)) { + ++wrote_extents; + } } - lock.unlock(); - size_t extent_counter = 0; - for (auto extent_number : dirty_extent_copy) { - ++extent_counter; - BEESNOTE("flush extent #" << extent_number << " (" << extent_counter << " of " << dirty_extent_copy.size() << ")"); - catch_all([&]() { - uint8_t *dirty_extent = m_extent_ptr[extent_number].p_byte; - uint8_t *dirty_extent_end = m_extent_ptr[extent_number + 1].p_byte; - THROW_CHECK1(out_of_range, dirty_extent, dirty_extent >= m_byte_ptr); - THROW_CHECK1(out_of_range, dirty_extent_end, dirty_extent_end <= m_byte_ptr_end); - THROW_CHECK2(out_of_range, dirty_extent_end, dirty_extent, dirty_extent_end - dirty_extent == BLOCK_SIZE_HASHTAB_EXTENT); - BEESTOOLONG("pwrite(fd " << m_fd << " '" << name_fd(m_fd)<< "', length " << to_hex(dirty_extent_end - dirty_extent) << ", offset " << to_hex(dirty_extent - m_byte_ptr) << ")"); - // Page locks slow us down more than copying the data does - vector extent_copy(dirty_extent, dirty_extent_end); - pwrite_or_die(m_fd, extent_copy, dirty_extent - m_byte_ptr); - BEESCOUNT(hash_extent_out); - }); - BEESNOTE("flush rate limited at extent #" << extent_number << " (" << extent_counter << " of " << dirty_extent_copy.size() << ")"); - m_flush_rate_limit.sleep_for(BLOCK_SIZE_HASHTAB_EXTENT); - } + BEESNOTE("idle after writing " << wrote_extents << " of " << m_extents << " extents"); + unique_lock lock(m_dirty_mutex); + m_dirty_condvar.wait(lock); } void -BeesHashTable::set_extent_dirty(HashType hash) +BeesHashTable::set_extent_dirty_locked(uint64_t extent_index) { - THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0); - auto pr = get_extent_range(hash); - uint64_t extent_number = reinterpret_cast(pr.first) - m_extent_ptr; - THROW_CHECK1(runtime_error, extent_number, extent_number < m_extents); - unique_lock lock(m_extent_mutex); - m_buckets_dirty.insert(extent_number); - m_condvar.notify_one(); + // Must already be locked + m_extent_metadata.at(extent_index).m_dirty = true; + + // Signal writeback thread + unique_lock dirty_lock(m_dirty_mutex); + m_dirty_condvar.notify_one(); } void @@ -179,13 +203,13 @@ BeesHashTable::prefetch_loop() size_t unaligned_eof_count = 0; for (uint64_t ext = 0; ext < m_extents; ++ext) { - BEESNOTE("prefetching hash table extent " << ext << " of " << m_extent_ptr_end - m_extent_ptr); + BEESNOTE("prefetching hash table extent " << ext << " of " << m_extents); catch_all([&]() { - fetch_missing_extent(ext * c_buckets_per_extent); + fetch_missing_extent_by_index(ext); - BEESNOTE("analyzing hash table extent " << ext << " of " << m_extent_ptr_end - m_extent_ptr); + BEESNOTE("analyzing hash table extent " << ext << " of " << m_extents); bool duplicate_bugs_found = false; - unique_lock lock(m_bucket_mutex); + auto lock = lock_extent_by_index(ext); for (Bucket *bucket = m_extent_ptr[ext].p_buckets; bucket < m_extent_ptr[ext + 1].p_buckets; ++bucket) { if (verify_cell_range(bucket[0].p_cells, bucket[1].p_cells)) { duplicate_bugs_found = true; @@ -214,9 +238,8 @@ BeesHashTable::prefetch_loop() // Count these instead of calculating the number so we get better stats in case of exceptions occupied_count += this_bucket_occupied_count; } - lock.unlock(); if (duplicate_bugs_found) { - set_extent_dirty(ext); + set_extent_dirty_locked(ext); } }); } @@ -291,52 +314,70 @@ BeesHashTable::prefetch_loop() } } -void -BeesHashTable::fetch_missing_extent(HashType hash) +size_t +BeesHashTable::hash_to_extent_index(HashType hash) +{ + auto pr = get_extent_range(hash); + uint64_t extent_index = reinterpret_cast(pr.first) - m_extent_ptr; + THROW_CHECK2(runtime_error, extent_index, m_extents, extent_index < m_extents); + return extent_index; +} + +BeesHashTable::ExtentMetaData::ExtentMetaData() : + m_mutex_ptr(make_shared()) +{ +} + +unique_lock +BeesHashTable::lock_extent_by_index(uint64_t extent_index) +{ + THROW_CHECK2(out_of_range, extent_index, m_extents, extent_index < m_extents); + return unique_lock(*m_extent_metadata.at(extent_index).m_mutex_ptr); +} + +unique_lock +BeesHashTable::lock_extent_by_hash(HashType hash) { BEESTOOLONG("fetch_missing_extent for hash " << to_hex(hash)); - THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0); - auto pr = get_extent_range(hash); - uint64_t extent_number = reinterpret_cast(pr.first) - m_extent_ptr; - THROW_CHECK1(runtime_error, extent_number, extent_number < m_extents); + return lock_extent_by_index(hash_to_extent_index(hash)); +} - unique_lock lock(m_extent_mutex); - if (!m_buckets_missing.count(extent_number)) { +void +BeesHashTable::fetch_missing_extent_by_index(uint64_t extent_index) +{ + BEESNOTE("checking hash extent #" << extent_index << " of " << m_extents << " extents"); + auto lock = lock_extent_by_index(extent_index); + if (!m_extent_metadata.at(extent_index).m_missing) { return; } - size_t missing_buckets = m_buckets_missing.size(); - lock.unlock(); - - BEESNOTE("waiting to fetch hash extent #" << extent_number << ", " << missing_buckets << " left to fetch"); - - // Acquire blocking lock on this extent only - auto extent_lock = m_extent_lock_set.make_lock(extent_number); - - // Check missing again because someone else might have fetched this - // extent for us while we didn't hold any locks - lock.lock(); - if (!m_buckets_missing.count(extent_number)) { - BEESCOUNT(hash_extent_in_twice); - return; - } - lock.unlock(); - // OK we have to read this extent - BEESNOTE("fetching hash extent #" << extent_number << ", " << missing_buckets << " left to fetch"); + BEESNOTE("fetching hash extent #" << extent_index << " of " << m_extents << " extents"); + BEESTRACE("Fetching hash extent #" << extent_index << " of " << m_extents << " extents"); + BEESTOOLONG("Fetching hash extent #" << extent_index << " of " << m_extents << " extents"); - BEESTRACE("Fetching missing hash extent " << extent_number); - uint8_t *dirty_extent = m_extent_ptr[extent_number].p_byte; - uint8_t *dirty_extent_end = m_extent_ptr[extent_number + 1].p_byte; + uint8_t *dirty_extent = m_extent_ptr[extent_index].p_byte; + uint8_t *dirty_extent_end = m_extent_ptr[extent_index + 1].p_byte; - { + // If the read fails don't retry, just go with whatever data we have + m_extent_metadata.at(extent_index).m_missing = false; + + catch_all([&]() { BEESTOOLONG("pread(fd " << m_fd << " '" << name_fd(m_fd)<< "', length " << to_hex(dirty_extent_end - dirty_extent) << ", offset " << to_hex(dirty_extent - m_byte_ptr) << ")"); pread_or_die(m_fd, dirty_extent, dirty_extent_end - dirty_extent, dirty_extent - m_byte_ptr); - } + }); + // Only count extents successfully read BEESCOUNT(hash_extent_in); - lock.lock(); - m_buckets_missing.erase(extent_number); +} + +void +BeesHashTable::fetch_missing_extent_by_hash(HashType hash) +{ + uint64_t extent_index = hash_to_extent_index(hash); + BEESNOTE("waiting to fetch hash extent #" << extent_index << " of " << m_extents << " extents"); + + fetch_missing_extent_by_index(extent_index); } bool @@ -358,10 +399,10 @@ BeesHashTable::find_cell(HashType hash) rv.push_back(toxic_cell); return rv; } - fetch_missing_extent(hash); + fetch_missing_extent_by_hash(hash); BEESTOOLONG("find_cell hash " << BeesHash(hash)); vector rv; - unique_lock lock(m_bucket_mutex); + auto lock = lock_extent_by_hash(hash); auto er = get_cell_range(hash); // FIXME: Weed out zero addresses in the table due to earlier bugs copy_if(er.first, er.second, back_inserter(rv), [=](const Cell &ip) { return ip.e_hash == hash && ip.e_addr >= 0x1000; }); @@ -377,9 +418,9 @@ BeesHashTable::find_cell(HashType hash) void BeesHashTable::erase_hash_addr(HashType hash, AddrType addr) { - fetch_missing_extent(hash); + fetch_missing_extent_by_hash(hash); BEESTOOLONG("erase hash " << to_hex(hash) << " addr " << addr); - unique_lock lock(m_bucket_mutex); + auto lock = lock_extent_by_hash(hash); auto er = get_cell_range(hash); Cell mv(hash, addr); Cell *ip = find(er.first, er.second, mv); @@ -387,7 +428,7 @@ BeesHashTable::erase_hash_addr(HashType hash, AddrType addr) if (found) { // Lookups on invalid addresses really hurt us. Kill it with fire! *ip = Cell(0, 0); - set_extent_dirty(hash); + set_extent_dirty_locked(hash_to_extent_index(hash)); BEESCOUNT(hash_erase); #if 0 if (verify_cell_range(er.first, er.second)) { @@ -405,9 +446,9 @@ BeesHashTable::erase_hash_addr(HashType hash, AddrType addr) bool BeesHashTable::push_front_hash_addr(HashType hash, AddrType addr) { - fetch_missing_extent(hash); + fetch_missing_extent_by_hash(hash); BEESTOOLONG("push_front_hash_addr hash " << BeesHash(hash) <<" addr " << BeesAddress(addr)); - unique_lock lock(m_bucket_mutex); + auto lock = lock_extent_by_hash(hash); auto er = get_cell_range(hash); Cell mv(hash, addr); Cell *ip = find(er.first, er.second, mv); @@ -437,7 +478,7 @@ BeesHashTable::push_front_hash_addr(HashType hash, AddrType addr) // There is now a space at the front, insert there if different if (er.first[0] != mv) { er.first[0] = mv; - set_extent_dirty(hash); + set_extent_dirty_locked(hash_to_extent_index(hash)); BEESCOUNT(hash_front); } #if 0 @@ -456,9 +497,9 @@ BeesHashTable::push_front_hash_addr(HashType hash, AddrType addr) bool BeesHashTable::push_random_hash_addr(HashType hash, AddrType addr) { - fetch_missing_extent(hash); + fetch_missing_extent_by_hash(hash); BEESTOOLONG("push_random_hash_addr hash " << BeesHash(hash) << " addr " << BeesAddress(addr)); - unique_lock lock(m_bucket_mutex); + auto lock = lock_extent_by_hash(hash); auto er = get_cell_range(hash); Cell mv(hash, addr); Cell *ip = find(er.first, er.second, mv); @@ -521,14 +562,14 @@ BeesHashTable::push_random_hash_addr(HashType hash, AddrType addr) case_cond = 5; ret_dirty: BEESCOUNT(hash_insert); - set_extent_dirty(hash); + set_extent_dirty_locked(hash_to_extent_index(hash)); ret: #if 0 if (verify_cell_range(er.first, er.second, false)) { BEESLOG("while push_randoming (case " << case_cond << ") pos " << pos << " ip " << (ip - er.first) << " " << mv); - // dump_bucket(saved.data(), saved.data() + saved.size()); - // dump_bucket(er.first, er.second); + // dump_bucket_locked(saved.data(), saved.data() + saved.size()); + // dump_bucket_locked(er.first, er.second); } #else (void)case_cond; @@ -652,9 +693,7 @@ BeesHashTable::BeesHashTable(shared_ptr ctx, string filename, off_t } } - for (uint64_t i = 0; i < m_size / sizeof(Extent); ++i) { - m_buckets_missing.insert(i); - } + m_extent_metadata.resize(m_extents); m_writeback_thread.exec([&]() { writeback_loop(); diff --git a/src/bees.h b/src/bees.h index 5155bc8..04af577 100644 --- a/src/bees.h +++ b/src/bees.h @@ -432,18 +432,24 @@ private: uint64_t m_buckets; uint64_t m_extents; uint64_t m_cells; - set m_buckets_dirty; - set m_buckets_missing; BeesThread m_writeback_thread; BeesThread m_prefetch_thread; RateLimiter m_flush_rate_limit; - mutex m_extent_mutex; - mutex m_bucket_mutex; - condition_variable m_condvar; set m_toxic_hashes; BeesStringFile m_stats_file; - LockSet m_extent_lock_set; + // Mutex/condvar for the writeback thread + mutex m_dirty_mutex; + condition_variable m_dirty_condvar; + + // Per-extent structures + struct ExtentMetaData { + shared_ptr m_mutex_ptr; // Access serializer + bool m_dirty = false; // Needs to be written back to disk + bool m_missing = true; // Needs to be read from disk + ExtentMetaData(); + }; + vector m_extent_metadata; void open_file(); void writeback_loop(); @@ -451,11 +457,17 @@ private: void try_mmap_flags(int flags); pair get_cell_range(HashType hash); pair get_extent_range(HashType hash); - void fetch_missing_extent(HashType hash); - void set_extent_dirty(HashType hash); + void fetch_missing_extent_by_hash(HashType hash); + void fetch_missing_extent_by_index(uint64_t extent_index); + void set_extent_dirty_locked(uint64_t extent_index); void flush_dirty_extents(); + bool flush_dirty_extent(uint64_t extent_index); bool is_toxic_hash(HashType h) const; + size_t hash_to_extent_index(HashType ht); + unique_lock lock_extent_by_hash(HashType ht); + unique_lock lock_extent_by_index(uint64_t extent_index); + BeesHashTable(const BeesHashTable &) = delete; BeesHashTable &operator=(const BeesHashTable &) = delete; };