1
0
mirror of https://github.com/Zygo/bees.git synced 2025-05-17 21:35:45 +02:00
bees/src/bees-hash.cc
Zygo Blaxell 14cd6ed033 bees: deprecate vector<uint8_t> and replace with ByteVector
The vector<uint8_t> in the hash table doesn't hurt very much--only a few
microseconds per 128K hash block.

The vector<uint8_t> in BeesBlockData hurts a bit more--we run that
constructor thousands of times per second.

Signed-off-by: Zygo Blaxell <bees@furryterror.org>
2021-10-31 19:42:01 -04:00

823 lines
26 KiB
C++

#include "bees.h"
#include "crucible/city.h"
#include "crucible/crc64.h"
#include "crucible/string.h"
#include <algorithm>
#include <random>
#include <sys/mman.h>
using namespace crucible;
using namespace std;
BeesHash::BeesHash(const uint8_t *ptr, size_t len) :
// m_hash(CityHash64(reinterpret_cast<const char *>(ptr), len))
m_hash(Digest::CRC::crc64(ptr, len))
{
}
ostream &
operator<<(ostream &os, const BeesHash &bh)
{
return os << to_hex(BeesHash::Type(bh));
}
ostream &
operator<<(ostream &os, const BeesHashTable::Cell &bhte)
{
return os << "BeesHashTable::Cell { hash = " << BeesHash(bhte.e_hash) << ", addr = "
<< BeesAddress(bhte.e_addr) << " }";
}
#if 0
static
void
dump_bucket_locked(BeesHashTable::Cell *p, BeesHashTable::Cell *q)
{
for (auto i = p; i < q; ++i) {
BEESLOG("Entry " << i - p << " " << *i);
}
}
#endif
static const bool VERIFY_CLEARS_BUGS = false;
bool
verify_cell_range(BeesHashTable::Cell *p, BeesHashTable::Cell *q, bool clear_bugs = VERIFY_CLEARS_BUGS)
{
// Must be called while holding m_bucket_mutex
bool bugs_found = false;
set<BeesHashTable::Cell> seen_it;
for (BeesHashTable::Cell *cell = p; cell < q; ++cell) {
if (cell->e_addr && cell->e_addr < 0x1000) {
BEESCOUNT(bug_hash_magic_addr);
BEESLOGDEBUG("Bad hash table address hash " << to_hex(cell->e_hash) << " addr " << to_hex(cell->e_addr));
if (clear_bugs) {
cell->e_addr = 0;
cell->e_hash = 0;
}
bugs_found = true;
}
if (cell->e_addr && !seen_it.insert(*cell).second) {
BEESCOUNT(bug_hash_duplicate_cell);
// BEESLOGDEBUG("Duplicate hash table entry:\nthis = " << *cell << "\nold = " << *seen_it.find(*cell));
BEESLOGDEBUG("Duplicate hash table entry: " << *cell);
if (clear_bugs) {
cell->e_addr = 0;
cell->e_hash = 0;
}
bugs_found = true;
}
}
return bugs_found;
}
pair<BeesHashTable::Cell *, BeesHashTable::Cell *>
BeesHashTable::get_cell_range(HashType hash)
{
THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0);
THROW_CHECK1(runtime_error, m_bucket_ptr, m_bucket_ptr != nullptr);
Bucket *pp = &m_bucket_ptr[hash % m_buckets];
Cell *bp = pp[0].p_cells;
Cell *ep = pp[1].p_cells;
THROW_CHECK2(out_of_range, m_cell_ptr, bp, bp >= m_cell_ptr);
THROW_CHECK2(out_of_range, m_cell_ptr_end, ep, ep <= m_cell_ptr_end);
return make_pair(bp, ep);
}
pair<uint8_t *, uint8_t *>
BeesHashTable::get_extent_range(HashType hash)
{
THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0);
THROW_CHECK1(runtime_error, m_bucket_ptr, m_bucket_ptr != nullptr);
Extent *iop = &m_extent_ptr[ (hash % m_buckets) / c_buckets_per_extent ];
uint8_t *bp = iop[0].p_byte;
uint8_t *ep = iop[1].p_byte;
THROW_CHECK2(out_of_range, m_byte_ptr, bp, bp >= m_byte_ptr);
THROW_CHECK2(out_of_range, m_byte_ptr_end, ep, ep <= m_byte_ptr_end);
return make_pair(bp, ep);
}
bool
BeesHashTable::flush_dirty_extent(uint64_t extent_index)
{
BEESNOTE("flushing extent #" << extent_index << " of " << m_extents << " extents");
auto lock = lock_extent_by_index(extent_index);
// Not dirty, nothing to do
if (!m_extent_metadata.at(extent_index).m_dirty) {
return false;
}
bool wrote_extent = false;
catch_all([&]() {
uint8_t *const dirty_extent = m_extent_ptr[extent_index].p_byte;
uint8_t *const dirty_extent_end = m_extent_ptr[extent_index + 1].p_byte;
const size_t dirty_extent_offset = dirty_extent - m_byte_ptr;
THROW_CHECK1(out_of_range, dirty_extent, dirty_extent >= m_byte_ptr);
THROW_CHECK1(out_of_range, dirty_extent_end, dirty_extent_end <= m_byte_ptr_end);
THROW_CHECK2(out_of_range, dirty_extent_end, dirty_extent, dirty_extent_end - dirty_extent == BLOCK_SIZE_HASHTAB_EXTENT);
BEESTOOLONG("pwrite(fd " << m_fd << " '" << name_fd(m_fd)<< "', length " << to_hex(dirty_extent_end - dirty_extent) << ", offset " << to_hex(dirty_extent - m_byte_ptr) << ")");
// Copy the extent because we might be stuck writing for a while
ByteVector extent_copy(dirty_extent, dirty_extent_end);
// Mark extent non-dirty while we still hold the lock
m_extent_metadata.at(extent_index).m_dirty = false;
// Release the lock
lock.unlock();
// Write the extent (or not)
pwrite_or_die(m_fd, extent_copy, dirty_extent_offset);
BEESCOUNT(hash_extent_out);
// Nope, this causes a _dramatic_ loss of performance.
// const size_t dirty_extent_size = dirty_extent_end - dirty_extent;
// bees_unreadahead(m_fd, dirty_extent_offset, dirty_extent_size);
wrote_extent = true;
});
return wrote_extent;
}
size_t
BeesHashTable::flush_dirty_extents(bool slowly)
{
THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0);
uint64_t wrote_extents = 0;
for (size_t extent_index = 0; extent_index < m_extents; ++extent_index) {
if (flush_dirty_extent(extent_index)) {
++wrote_extents;
if (slowly) {
BEESNOTE("flush rate limited after extent #" << extent_index << " of " << m_extents << " extents");
chrono::duration<double> sleep_time(m_flush_rate_limit.sleep_time(BLOCK_SIZE_HASHTAB_EXTENT));
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
BEESLOGDEBUG("Stop requested in hash table flush_dirty_extents");
// This function is called by another thread with !slowly,
// so we just get out of the way here.
break;
}
m_stop_condvar.wait_for(lock, sleep_time);
}
}
}
if (!slowly) {
BEESLOGINFO("Flushed " << wrote_extents << " of " << m_extents << " extents");
}
return wrote_extents;
}
void
BeesHashTable::set_extent_dirty_locked(uint64_t extent_index)
{
// Must already be locked
m_extent_metadata.at(extent_index).m_dirty = true;
// Signal writeback thread
unique_lock<mutex> dirty_lock(m_dirty_mutex);
m_dirty = true;
m_dirty_condvar.notify_one();
}
void
BeesHashTable::writeback_loop()
{
while (!m_stop_requested) {
auto wrote_extents = flush_dirty_extents(true);
BEESNOTE("idle after writing " << wrote_extents << " of " << m_extents << " extents");
unique_lock<mutex> lock(m_dirty_mutex);
if (m_stop_requested) {
break;
}
if (m_dirty) {
m_dirty = false;
} else {
m_dirty_condvar.wait(lock);
}
}
catch_all([&]() {
// trigger writeback on our way out
BEESTOOLONG("unreadahead hash table size " << pretty(m_size));
bees_unreadahead(m_fd, 0, m_size);
});
BEESLOGDEBUG("Exited hash table writeback_loop");
}
static
string
percent(size_t num, size_t den)
{
if (den) {
return astringprintf("%u%%", num * 100 / den);
} else {
return "--%";
}
}
void
BeesHashTable::prefetch_loop()
{
bool not_locked = true;
while (!m_stop_requested) {
size_t width = 64;
vector<size_t> occupancy(width, 0);
size_t occupied_count = 0;
size_t total_count = 0;
size_t compressed_count = 0;
size_t compressed_offset_count = 0;
size_t toxic_count = 0;
size_t unaligned_eof_count = 0;
m_prefetch_running = true;
for (uint64_t ext = 0; ext < m_extents && !m_stop_requested; ++ext) {
BEESNOTE("prefetching hash table extent #" << ext << " of " << m_extents);
catch_all([&]() {
fetch_missing_extent_by_index(ext);
BEESNOTE("analyzing hash table extent #" << ext << " of " << m_extents);
bool duplicate_bugs_found = false;
auto lock = lock_extent_by_index(ext);
for (Bucket *bucket = m_extent_ptr[ext].p_buckets; bucket < m_extent_ptr[ext + 1].p_buckets; ++bucket) {
if (verify_cell_range(bucket[0].p_cells, bucket[1].p_cells)) {
duplicate_bugs_found = true;
}
size_t this_bucket_occupied_count = 0;
for (Cell *cell = bucket[0].p_cells; cell < bucket[1].p_cells; ++cell) {
if (cell->e_addr) {
++this_bucket_occupied_count;
BeesAddress a(cell->e_addr);
if (a.is_compressed()) {
++compressed_count;
if (a.has_compressed_offset()) {
++compressed_offset_count;
}
}
if (a.is_toxic()) {
++toxic_count;
}
if (a.is_unaligned_eof()) {
++unaligned_eof_count;
}
}
++total_count;
}
++occupancy.at(this_bucket_occupied_count * width / (1 + c_cells_per_bucket) );
// Count these instead of calculating the number so we get better stats in case of exceptions
occupied_count += this_bucket_occupied_count;
}
if (duplicate_bugs_found) {
set_extent_dirty_locked(ext);
}
});
}
m_prefetch_running = false;
BEESNOTE("calculating hash table statistics");
vector<string> histogram;
vector<size_t> thresholds;
size_t threshold = 1;
bool threshold_exceeded = false;
do {
threshold_exceeded = false;
histogram.push_back(string(width, ' '));
thresholds.push_back(threshold);
for (size_t x = 0; x < width; ++x) {
if (occupancy.at(x) >= threshold) {
histogram.back().at(x) = '#';
threshold_exceeded = true;
}
}
threshold *= 2;
} while (threshold_exceeded);
ostringstream out;
size_t count = histogram.size();
bool first_line = true;
for (auto it = histogram.rbegin(); it != histogram.rend(); ++it) {
out << *it << " " << thresholds.at(--count);
if (first_line) {
first_line = false;
out << " pages";
}
out << "\n";
}
size_t uncompressed_count = occupied_count - compressed_offset_count;
ostringstream graph_blob;
graph_blob << "Now: " << format_time(time(NULL)) << "\n";
graph_blob << "Uptime: " << m_ctx->total_timer().age() << " seconds\n";
graph_blob << "Version: " << BEES_VERSION << "\n";
graph_blob
<< "\nHash table page occupancy histogram (" << occupied_count << "/" << total_count << " cells occupied, " << (occupied_count * 100 / total_count) << "%)\n"
<< out.str() << "0% | 25% | 50% | 75% | 100% page fill\n"
<< "compressed " << compressed_count << " (" << percent(compressed_count, occupied_count) << ")\n"
<< "uncompressed " << uncompressed_count << " (" << percent(uncompressed_count, occupied_count) << ")"
<< " unaligned_eof " << unaligned_eof_count << " (" << percent(unaligned_eof_count, occupied_count) << ")"
<< " toxic " << toxic_count << " (" << percent(toxic_count, occupied_count) << ")";
graph_blob << "\n\n";
graph_blob << "TOTAL:\n";
auto thisStats = BeesStats::s_global;
graph_blob << "\t" << thisStats << "\n";
graph_blob << "\nRATES:\n";
auto avg_rates = thisStats / m_ctx->total_timer().age();
graph_blob << "\t" << avg_rates << "\n";
BEESLOGINFO(graph_blob.str());
catch_all([&]() {
m_stats_file.write(graph_blob.str());
});
if (not_locked && !m_stop_requested) {
// Always do the mlock, whether shared or not
THROW_CHECK1(runtime_error, m_size, m_size > 0);
BEESLOGINFO("mlock(" << pretty(m_size) << ")...");
Timer lock_time;
catch_all([&]() {
BEESNOTE("mlock " << pretty(m_size));
DIE_IF_NON_ZERO(mlock(m_byte_ptr, m_size));
});
BEESLOGINFO("mlock(" << pretty(m_size) << ") done in " << lock_time << " sec");
not_locked = false;
}
BEESNOTE("idle " << BEES_HASH_TABLE_ANALYZE_INTERVAL << "s");
unique_lock<mutex> lock(m_stop_mutex);
if (m_stop_requested) {
BEESLOGDEBUG("Stop requested in hash table prefetch");
return;
}
m_stop_condvar.wait_for(lock, chrono::duration<double>(BEES_HASH_TABLE_ANALYZE_INTERVAL));
}
}
size_t
BeesHashTable::hash_to_extent_index(HashType hash)
{
auto pr = get_extent_range(hash);
uint64_t extent_index = reinterpret_cast<const Extent *>(pr.first) - m_extent_ptr;
THROW_CHECK2(runtime_error, extent_index, m_extents, extent_index < m_extents);
return extent_index;
}
BeesHashTable::ExtentMetaData::ExtentMetaData() :
m_mutex_ptr(make_shared<mutex>())
{
}
unique_lock<mutex>
BeesHashTable::lock_extent_by_index(uint64_t extent_index)
{
THROW_CHECK2(out_of_range, extent_index, m_extents, extent_index < m_extents);
return unique_lock<mutex>(*m_extent_metadata.at(extent_index).m_mutex_ptr);
}
unique_lock<mutex>
BeesHashTable::lock_extent_by_hash(HashType hash)
{
BEESTOOLONG("fetch_missing_extent for hash " << to_hex(hash));
return lock_extent_by_index(hash_to_extent_index(hash));
}
void
BeesHashTable::fetch_missing_extent_by_index(uint64_t extent_index)
{
BEESNOTE("checking hash extent #" << extent_index << " of " << m_extents << " extents");
auto lock = lock_extent_by_index(extent_index);
if (!m_extent_metadata.at(extent_index).m_missing) {
return;
}
// OK we have to read this extent
BEESNOTE("fetching hash extent #" << extent_index << " of " << m_extents << " extents");
BEESTRACE("Fetching hash extent #" << extent_index << " of " << m_extents << " extents");
BEESTOOLONG("Fetching hash extent #" << extent_index << " of " << m_extents << " extents");
uint8_t *const dirty_extent = m_extent_ptr[extent_index].p_byte;
uint8_t *const dirty_extent_end = m_extent_ptr[extent_index + 1].p_byte;
const size_t dirty_extent_size = dirty_extent_end - dirty_extent;
const size_t dirty_extent_offset = dirty_extent - m_byte_ptr;
// If the read fails don't retry, just go with whatever data we have
m_extent_metadata.at(extent_index).m_missing = false;
catch_all([&]() {
BEESTOOLONG("pread(fd " << m_fd << " '" << name_fd(m_fd)<< "', length " << to_hex(dirty_extent_end - dirty_extent) << ", offset " << to_hex(dirty_extent - m_byte_ptr) << ")");
pread_or_die(m_fd, dirty_extent, dirty_extent_size, dirty_extent_offset);
// Only count extents successfully read
BEESCOUNT(hash_extent_in);
// Won't need that again
bees_unreadahead(m_fd, dirty_extent_offset, dirty_extent_size);
// If we are in prefetch, give the kernel a hint about the next extent
if (m_prefetch_running) {
// XXX: don't call this if bees_readahead is implemented by pread()
bees_readahead(m_fd, dirty_extent_offset + dirty_extent_size, dirty_extent_size);
}
});
}
void
BeesHashTable::fetch_missing_extent_by_hash(HashType hash)
{
uint64_t extent_index = hash_to_extent_index(hash);
BEESNOTE("waiting to fetch hash extent #" << extent_index << " of " << m_extents << " extents");
fetch_missing_extent_by_index(extent_index);
}
vector<BeesHashTable::Cell>
BeesHashTable::find_cell(HashType hash)
{
fetch_missing_extent_by_hash(hash);
BEESTOOLONG("find_cell hash " << BeesHash(hash));
vector<Cell> rv;
auto lock = lock_extent_by_hash(hash);
auto er = get_cell_range(hash);
// FIXME: Weed out zero addresses in the table due to earlier bugs
copy_if(er.first, er.second, back_inserter(rv), [=](const Cell &ip) { return ip.e_hash == hash && ip.e_addr >= 0x1000; });
BEESCOUNT(hash_lookup);
return rv;
}
/// Remove a hash from the table, leaving an empty space on the list
/// where the hash used to be. Used when an invalid address is found
/// because lookups on invalid addresses really hurt.
void
BeesHashTable::erase_hash_addr(HashType hash, AddrType addr)
{
fetch_missing_extent_by_hash(hash);
BEESTOOLONG("erase hash " << to_hex(hash) << " addr " << addr);
auto lock = lock_extent_by_hash(hash);
auto er = get_cell_range(hash);
Cell mv(hash, addr);
Cell *ip = find(er.first, er.second, mv);
bool found = (ip < er.second);
if (found) {
*ip = Cell(0, 0);
set_extent_dirty_locked(hash_to_extent_index(hash));
BEESCOUNT(hash_erase);
#if 0
if (verify_cell_range(er.first, er.second)) {
BEESLOGDEBUG("while erasing hash " << hash << " addr " << addr);
}
#endif
} else {
BEESCOUNT(hash_erase_miss);
}
}
/// Insert a hash entry at the head of the list. If entry is already
/// present in list, move it to the front of the list without dropping
/// any entries, and return true. If entry is not present in list,
/// insert it at the front of the list, possibly dropping the last entry
/// in the list, and return false. Used to move duplicate hash blocks
/// to the front of the list.
bool
BeesHashTable::push_front_hash_addr(HashType hash, AddrType addr)
{
fetch_missing_extent_by_hash(hash);
BEESTOOLONG("push_front_hash_addr hash " << BeesHash(hash) <<" addr " << BeesAddress(addr));
auto lock = lock_extent_by_hash(hash);
auto er = get_cell_range(hash);
Cell mv(hash, addr);
Cell *ip = find(er.first, er.second, mv);
bool found = (ip < er.second);
if (!found) {
// If no match found, get rid of an empty space instead
// If no empty spaces, ip will point to end
ip = find(er.first, er.second, Cell(0, 0));
}
if (ip > er.first) {
// Delete matching entry, first empty entry,
// or last entry whether empty or not
// move_backward(er.first, ip - 1, ip);
auto sp = ip;
auto dp = ip;
--sp;
// If we are deleting the last entry then don't copy it
if (dp == er.second) {
--sp;
--dp;
BEESCOUNT(hash_evict);
}
while (dp > er.first) {
*dp-- = *sp--;
}
}
// There is now a space at the front, insert there if different
if (er.first[0] != mv) {
er.first[0] = mv;
set_extent_dirty_locked(hash_to_extent_index(hash));
BEESCOUNT(hash_front);
} else {
BEESCOUNT(hash_front_already);
}
#if 0
if (verify_cell_range(er.first, er.second)) {
BEESLOGDEBUG("while push_fronting hash " << hash << " addr " << addr);
}
#endif
return found;
}
/// Insert a hash entry at some unspecified point in the list.
/// If entry is already present in list, returns true and does not
/// modify list. If entry is not present in list, returns false and
/// inserts at a random position in the list, possibly evicting the entry
/// at the end of the list. Used to insert new unique (not-yet-duplicate)
/// blocks in random order.
bool
BeesHashTable::push_random_hash_addr(HashType hash, AddrType addr)
{
fetch_missing_extent_by_hash(hash);
BEESTOOLONG("push_random_hash_addr hash " << BeesHash(hash) << " addr " << BeesAddress(addr));
auto lock = lock_extent_by_hash(hash);
auto er = get_cell_range(hash);
Cell mv(hash, addr);
Cell *ip = find(er.first, er.second, mv);
bool found = (ip < er.second);
thread_local default_random_engine generator;
thread_local uniform_int_distribution<int> distribution(0, c_cells_per_bucket - 1);
auto pos = distribution(generator);
int case_cond = 0;
#if 0
vector<Cell> saved(er.first, er.second);
#endif
if (found) {
// If hash already exists after pos, swap with pos
if (ip > er.first + pos) {
// move_backward(er.first + pos, ip - 1, ip);
auto sp = ip;
auto dp = ip;
--sp;
while (dp > er.first + pos) {
*dp-- = *sp--;
}
*dp = mv;
BEESCOUNT(hash_bump);
case_cond = 1;
goto ret_dirty;
}
// Hash already exists before (or at) pos, leave it there
BEESCOUNT(hash_already);
case_cond = 2;
goto ret;
}
// Find an empty space to back of pos
for (ip = er.first + pos; ip < er.second; ++ip) {
if (*ip == Cell(0, 0)) {
*ip = mv;
case_cond = 3;
goto ret_dirty;
}
}
// Find an empty space to front of pos
// if there is anything to front of pos
if (pos > 0) {
for (ip = er.first + pos - 1; ip >= er.first; --ip) {
if (*ip == Cell(0, 0)) {
*ip = mv;
case_cond = 4;
goto ret_dirty;
}
}
}
// Evict something and insert at pos
// move_backward(er.first + pos, er.second - 1, er.second);
ip = er.second - 1;
while (ip > er.first + pos) {
auto dp = ip;
*dp = *--ip;
}
er.first[pos] = mv;
BEESCOUNT(hash_evict);
case_cond = 5;
ret_dirty:
BEESCOUNT(hash_insert);
set_extent_dirty_locked(hash_to_extent_index(hash));
ret:
#if 0
if (verify_cell_range(er.first, er.second, false)) {
BEESLOG("while push_randoming (case " << case_cond << ") pos " << pos
<< " ip " << (ip - er.first) << " " << mv);
// dump_bucket_locked(saved.data(), saved.data() + saved.size());
// dump_bucket_locked(er.first, er.second);
}
#else
(void)case_cond;
#endif
return found;
}
void
BeesHashTable::try_mmap_flags(int flags)
{
if (!m_cell_ptr) {
THROW_CHECK1(out_of_range, m_size, m_size > 0);
Timer map_time;
catch_all([&]() {
BEESLOGINFO("mapping hash table size " << m_size << " with flags " << mmap_flags_ntoa(flags));
void *ptr = mmap_or_die(nullptr, m_size, PROT_READ | PROT_WRITE, flags, flags & MAP_ANONYMOUS ? -1 : int(m_fd), 0);
BEESLOGINFO("mmap done in " << map_time << " sec");
m_cell_ptr = static_cast<Cell *>(ptr);
void *ptr_end = static_cast<uint8_t *>(ptr) + m_size;
m_cell_ptr_end = static_cast<Cell *>(ptr_end);
});
}
}
void
BeesHashTable::open_file()
{
// OK open hash table
BEESNOTE("opening hash table '" << m_filename << "' target size " << m_size << " (" << pretty(m_size) << ")");
// Try to open existing hash table
Fd new_fd = openat(m_ctx->home_fd(), m_filename.c_str(), FLAGS_OPEN_FILE_RW, 0700);
// If that doesn't work, try to make a new one
if (!new_fd) {
string tmp_filename = m_filename + ".tmp";
BEESNOTE("creating new hash table '" << tmp_filename << "'");
BEESLOGINFO("Creating new hash table '" << tmp_filename << "'");
unlinkat(m_ctx->home_fd(), tmp_filename.c_str(), 0);
new_fd = openat_or_die(m_ctx->home_fd(), tmp_filename, FLAGS_CREATE_FILE, 0700);
BEESNOTE("truncating new hash table '" << tmp_filename << "' size " << m_size << " (" << pretty(m_size) << ")");
BEESLOGINFO("Truncating new hash table '" << tmp_filename << "' size " << m_size << " (" << pretty(m_size) << ")");
ftruncate_or_die(new_fd, m_size);
BEESNOTE("truncating new hash table '" << tmp_filename << "' -> '" << m_filename << "'");
BEESLOGINFO("Truncating new hash table '" << tmp_filename << "' -> '" << m_filename << "'");
renameat_or_die(m_ctx->home_fd(), tmp_filename, m_ctx->home_fd(), m_filename);
}
Stat st(new_fd);
off_t new_size = st.st_size;
THROW_CHECK1(invalid_argument, new_size, new_size > 0);
THROW_CHECK1(invalid_argument, new_size, (new_size % BLOCK_SIZE_HASHTAB_EXTENT) == 0);
m_size = new_size;
m_fd = new_fd;
}
BeesHashTable::BeesHashTable(shared_ptr<BeesContext> ctx, string filename, off_t size) :
m_ctx(ctx),
m_size(0),
m_void_ptr(nullptr),
m_void_ptr_end(nullptr),
m_buckets(0),
m_cells(0),
m_writeback_thread("hash_writeback"),
m_prefetch_thread("hash_prefetch"),
m_flush_rate_limit(BEES_FLUSH_RATE),
m_stats_file(m_ctx->home_fd(), "beesstats.txt")
{
// Sanity checks to protect the implementation from its weaknesses
THROW_CHECK2(invalid_argument, BLOCK_SIZE_HASHTAB_BUCKET, BLOCK_SIZE_HASHTAB_EXTENT, (BLOCK_SIZE_HASHTAB_EXTENT % BLOCK_SIZE_HASHTAB_BUCKET) == 0);
// There's more than one union
THROW_CHECK2(runtime_error, sizeof(Bucket), BLOCK_SIZE_HASHTAB_BUCKET, BLOCK_SIZE_HASHTAB_BUCKET == sizeof(Bucket));
THROW_CHECK2(runtime_error, sizeof(Bucket::p_byte), BLOCK_SIZE_HASHTAB_BUCKET, BLOCK_SIZE_HASHTAB_BUCKET == sizeof(Bucket::p_byte));
THROW_CHECK2(runtime_error, sizeof(Extent), BLOCK_SIZE_HASHTAB_EXTENT, BLOCK_SIZE_HASHTAB_EXTENT == sizeof(Extent));
THROW_CHECK2(runtime_error, sizeof(Extent::p_byte), BLOCK_SIZE_HASHTAB_EXTENT, BLOCK_SIZE_HASHTAB_EXTENT == sizeof(Extent::p_byte));
m_filename = filename;
m_size = size;
open_file();
// Now we know size we can compute stuff
BEESTRACE("hash table size " << m_size);
BEESTRACE("hash table bucket size " << BLOCK_SIZE_HASHTAB_BUCKET);
BEESTRACE("hash table extent size " << BLOCK_SIZE_HASHTAB_EXTENT);
BEESLOGINFO("opened hash table filename '" << filename << "' length " << m_size);
m_buckets = m_size / BLOCK_SIZE_HASHTAB_BUCKET;
m_cells = m_buckets * c_cells_per_bucket;
m_extents = (m_size + BLOCK_SIZE_HASHTAB_EXTENT - 1) / BLOCK_SIZE_HASHTAB_EXTENT;
BEESLOGINFO("\tcells " << m_cells << ", buckets " << m_buckets << ", extents " << m_extents);
BEESLOGINFO("\tflush rate limit " << BEES_FLUSH_RATE);
// Try to mmap that much memory
try_mmap_flags(MAP_PRIVATE | MAP_ANONYMOUS);
if (!m_cell_ptr) {
THROW_ERRNO("unable to mmap " << filename);
}
// Do unions work the way we think (and rely on)?
THROW_CHECK2(runtime_error, m_void_ptr, m_cell_ptr, m_void_ptr == m_cell_ptr);
THROW_CHECK2(runtime_error, m_void_ptr, m_byte_ptr, m_void_ptr == m_byte_ptr);
THROW_CHECK2(runtime_error, m_void_ptr, m_bucket_ptr, m_void_ptr == m_bucket_ptr);
THROW_CHECK2(runtime_error, m_void_ptr, m_extent_ptr, m_void_ptr == m_extent_ptr);
// Give all the madvise hints that the kernel understands
const struct madv_flag {
const char *name;
int value;
} madv_flags[] = {
{ .name = "MADV_HUGEPAGE", .value = MADV_HUGEPAGE },
{ .name = "MADV_DONTFORK", .value = MADV_DONTFORK },
{ .name = "MADV_DONTDUMP", .value = MADV_DONTDUMP },
{ .name = "", .value = 0 },
};
for (auto fp = madv_flags; fp->value; ++fp) {
BEESTOOLONG("madvise(" << fp->name << ")");
if (madvise(m_byte_ptr, m_size, fp->value)) {
BEESLOGWARN("madvise(..., " << fp->name << "): " << strerror(errno) << " (ignored)");
}
}
m_extent_metadata.resize(m_extents);
m_writeback_thread.exec([&]() {
writeback_loop();
});
m_prefetch_thread.exec([&]() {
prefetch_loop();
});
// Blacklist might fail if the hash table is not stored on a btrfs
catch_all([&]() {
m_ctx->blacklist_insert(BeesFileId(m_fd));
});
}
BeesHashTable::~BeesHashTable()
{
BEESLOGDEBUG("Destroy BeesHashTable");
if (m_cell_ptr && m_size) {
// Dirty extents should have been flushed before now,
// e.g. in stop(). If that didn't happen, don't fall
// into the same trap (and maybe throw an exception) here.
// flush_dirty_extents(false);
catch_all([&]() {
// drop the memory mapping
BEESTOOLONG("unmap handle table size " << pretty(m_size));
DIE_IF_NON_ZERO(munmap(m_cell_ptr, m_size));
});
m_cell_ptr = nullptr;
m_size = 0;
}
BEESLOGDEBUG("BeesHashTable destroyed");
}
void
BeesHashTable::stop()
{
BEESNOTE("stopping BeesHashTable threads");
BEESLOGDEBUG("Stopping BeesHashTable threads");
unique_lock<mutex> lock(m_stop_mutex);
m_stop_requested = true;
m_stop_condvar.notify_all();
lock.unlock();
// Wake up hash writeback too
unique_lock<mutex> dirty_lock(m_dirty_mutex);
m_dirty_condvar.notify_all();
dirty_lock.unlock();
BEESNOTE("waiting for hash_prefetch thread");
BEESLOGDEBUG("Waiting for hash_prefetch thread");
m_prefetch_thread.join();
BEESNOTE("waiting for hash_writeback thread");
BEESLOGDEBUG("Waiting for hash_writeback thread");
m_writeback_thread.join();
if (m_cell_ptr && m_size) {
BEESLOGDEBUG("Flushing hash table");
BEESNOTE("flushing hash table");
flush_dirty_extents(false);
}
BEESLOGDEBUG("BeesHashTable stopped");
}