mirror of
https://github.com/Zygo/bees.git
synced 2025-05-18 05:45:45 +02:00
Get rid of the thread-local TempFiles and use Pool instead. This eliminates a potential FD leak when the loadavg governor repeatedly creates and destroys threads. With the old per-thread TempFiles, we were guaranteed to have exclusive ownership of the TempFile object within the current thread. Pool is somewhat stricter: it only guarantees ownership while the checked-out Handle exists. Adjust the users of TempFile objects to ensure they hold the Handle object until they are finished using the TempFile. It appears that maintaining large, heavily-reflinked, long-lived temporary files costs more than truncating after every use: btrfs has to write multiple references to the temporary file's extents, then some commits later, remove references as the temporary file is deleted or truncated. Using the temporary file in a dedupe operation flushes the data to disk, so nothing is saved by pretending that there is writeback pipelining and trying to avoid flushes in truncate. Pool provides usage tracking and a checkin callback, so use it to truncate the temporary file immediately after every use. Redesign TempFile so that every instance creates exactly one Fd which persists over the lifetime of the TempFile object. Provide a reset() method which resets the file back to the initial state and call it from the Pool checkin callback. This makes TempFile's lifetime equivalent to its Fd's lifetime, which simplifies interactions with FdCache and Roots. This change means we can now blacklist temporary files without having an effective memory leak, so do that. We also have a reason to ever remove something from the blacklist, so add a method for that too. In order to move to extent-centric addressing, we need to be able to reliably open temporary files by root and inode number. Previously we would place TempFile fd's into the cache with insert_root_ino, but the cache would be cleared periodically, and it would not be possible to reopen temporary files after that happened. Now that the TempFile's lifetime is the same as the TempFile Fd's lifetime, we can have TempFile manage a separate FileId -> Fd map in Roots which is unaffected by the periodic cache clearing. BeesRoots::open_root_ino_nocache will check this map before attempting to open the file via btrfs root+ino lookup, and return it through the cache as if Roots had opened the file via btrfs. Hold a reference to BeesRoots in BeesTempFile because the usual way to get such a reference now throws an exception in BeesTempFile's destructor. These changes make method BeesTempFile::create() and all methods named insert_root_ino unnecessary, so delete them. We construct and destroy TempFiles much less often now, so make their constructor and destructor more informative. Signed-off-by: Zygo Blaxell <bees@furryterror.org>
796 lines
24 KiB
C++
796 lines
24 KiB
C++
#include "bees.h"
|
|
|
|
#include "crucible/city.h"
|
|
#include "crucible/crc64.h"
|
|
#include "crucible/string.h"
|
|
|
|
#include <algorithm>
|
|
#include <random>
|
|
|
|
#include <sys/mman.h>
|
|
|
|
using namespace crucible;
|
|
using namespace std;
|
|
|
|
BeesHash::BeesHash(const uint8_t *ptr, size_t len) :
|
|
// m_hash(CityHash64(reinterpret_cast<const char *>(ptr), len))
|
|
m_hash(Digest::CRC::crc64(ptr, len))
|
|
{
|
|
}
|
|
|
|
ostream &
|
|
operator<<(ostream &os, const BeesHash &bh)
|
|
{
|
|
return os << to_hex(BeesHash::Type(bh));
|
|
}
|
|
|
|
ostream &
|
|
operator<<(ostream &os, const BeesHashTable::Cell &bhte)
|
|
{
|
|
return os << "BeesHashTable::Cell { hash = " << BeesHash(bhte.e_hash) << ", addr = "
|
|
<< BeesAddress(bhte.e_addr) << " }";
|
|
}
|
|
|
|
#if 0
|
|
static
|
|
void
|
|
dump_bucket_locked(BeesHashTable::Cell *p, BeesHashTable::Cell *q)
|
|
{
|
|
for (auto i = p; i < q; ++i) {
|
|
BEESLOG("Entry " << i - p << " " << *i);
|
|
}
|
|
}
|
|
#endif
|
|
|
|
static const bool VERIFY_CLEARS_BUGS = false;
|
|
|
|
bool
|
|
verify_cell_range(BeesHashTable::Cell *p, BeesHashTable::Cell *q, bool clear_bugs = VERIFY_CLEARS_BUGS)
|
|
{
|
|
// Must be called while holding m_bucket_mutex
|
|
bool bugs_found = false;
|
|
set<BeesHashTable::Cell> seen_it;
|
|
for (BeesHashTable::Cell *cell = p; cell < q; ++cell) {
|
|
if (cell->e_addr && cell->e_addr < 0x1000) {
|
|
BEESCOUNT(bug_hash_magic_addr);
|
|
BEESLOGDEBUG("Bad hash table address hash " << to_hex(cell->e_hash) << " addr " << to_hex(cell->e_addr));
|
|
if (clear_bugs) {
|
|
cell->e_addr = 0;
|
|
cell->e_hash = 0;
|
|
}
|
|
bugs_found = true;
|
|
}
|
|
if (cell->e_addr && !seen_it.insert(*cell).second) {
|
|
BEESCOUNT(bug_hash_duplicate_cell);
|
|
// BEESLOGDEBUG("Duplicate hash table entry:\nthis = " << *cell << "\nold = " << *seen_it.find(*cell));
|
|
BEESLOGDEBUG("Duplicate hash table entry: " << *cell);
|
|
if (clear_bugs) {
|
|
cell->e_addr = 0;
|
|
cell->e_hash = 0;
|
|
}
|
|
bugs_found = true;
|
|
}
|
|
}
|
|
return bugs_found;
|
|
}
|
|
|
|
pair<BeesHashTable::Cell *, BeesHashTable::Cell *>
|
|
BeesHashTable::get_cell_range(HashType hash)
|
|
{
|
|
THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0);
|
|
THROW_CHECK1(runtime_error, m_bucket_ptr, m_bucket_ptr != nullptr);
|
|
Bucket *pp = &m_bucket_ptr[hash % m_buckets];
|
|
Cell *bp = pp[0].p_cells;
|
|
Cell *ep = pp[1].p_cells;
|
|
THROW_CHECK2(out_of_range, m_cell_ptr, bp, bp >= m_cell_ptr);
|
|
THROW_CHECK2(out_of_range, m_cell_ptr_end, ep, ep <= m_cell_ptr_end);
|
|
return make_pair(bp, ep);
|
|
}
|
|
|
|
pair<uint8_t *, uint8_t *>
|
|
BeesHashTable::get_extent_range(HashType hash)
|
|
{
|
|
THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0);
|
|
THROW_CHECK1(runtime_error, m_bucket_ptr, m_bucket_ptr != nullptr);
|
|
Extent *iop = &m_extent_ptr[ (hash % m_buckets) / c_buckets_per_extent ];
|
|
uint8_t *bp = iop[0].p_byte;
|
|
uint8_t *ep = iop[1].p_byte;
|
|
THROW_CHECK2(out_of_range, m_byte_ptr, bp, bp >= m_byte_ptr);
|
|
THROW_CHECK2(out_of_range, m_byte_ptr_end, ep, ep <= m_byte_ptr_end);
|
|
return make_pair(bp, ep);
|
|
}
|
|
|
|
bool
|
|
BeesHashTable::flush_dirty_extent(uint64_t extent_index)
|
|
{
|
|
BEESNOTE("flushing extent #" << extent_index << " of " << m_extents << " extents");
|
|
|
|
auto lock = lock_extent_by_index(extent_index);
|
|
|
|
// Not dirty, nothing to do
|
|
if (!m_extent_metadata.at(extent_index).m_dirty) {
|
|
return false;
|
|
}
|
|
|
|
bool wrote_extent = false;
|
|
|
|
catch_all([&]() {
|
|
uint8_t *dirty_extent = m_extent_ptr[extent_index].p_byte;
|
|
uint8_t *dirty_extent_end = m_extent_ptr[extent_index + 1].p_byte;
|
|
THROW_CHECK1(out_of_range, dirty_extent, dirty_extent >= m_byte_ptr);
|
|
THROW_CHECK1(out_of_range, dirty_extent_end, dirty_extent_end <= m_byte_ptr_end);
|
|
THROW_CHECK2(out_of_range, dirty_extent_end, dirty_extent, dirty_extent_end - dirty_extent == BLOCK_SIZE_HASHTAB_EXTENT);
|
|
BEESTOOLONG("pwrite(fd " << m_fd << " '" << name_fd(m_fd)<< "', length " << to_hex(dirty_extent_end - dirty_extent) << ", offset " << to_hex(dirty_extent - m_byte_ptr) << ")");
|
|
// Copy the extent because we might be stuck writing for a while
|
|
vector<uint8_t> extent_copy(dirty_extent, dirty_extent_end);
|
|
|
|
// Mark extent non-dirty while we still hold the lock
|
|
m_extent_metadata.at(extent_index).m_dirty = false;
|
|
|
|
// Release the lock
|
|
lock.unlock();
|
|
|
|
// Write the extent (or not)
|
|
pwrite_or_die(m_fd, extent_copy, dirty_extent - m_byte_ptr);
|
|
BEESCOUNT(hash_extent_out);
|
|
|
|
wrote_extent = true;
|
|
});
|
|
|
|
return wrote_extent;
|
|
}
|
|
|
|
size_t
|
|
BeesHashTable::flush_dirty_extents(bool slowly)
|
|
{
|
|
THROW_CHECK1(runtime_error, m_buckets, m_buckets > 0);
|
|
|
|
uint64_t wrote_extents = 0;
|
|
for (size_t extent_index = 0; extent_index < m_extents; ++extent_index) {
|
|
if (flush_dirty_extent(extent_index)) {
|
|
++wrote_extents;
|
|
if (slowly) {
|
|
BEESNOTE("flush rate limited after extent #" << extent_index << " of " << m_extents << " extents");
|
|
chrono::duration<double> sleep_time(m_flush_rate_limit.sleep_time(BLOCK_SIZE_HASHTAB_EXTENT));
|
|
unique_lock<mutex> lock(m_stop_mutex);
|
|
if (m_stop_requested) {
|
|
BEESLOGDEBUG("Stop requested in hash table flush_dirty_extents");
|
|
break;
|
|
}
|
|
m_stop_condvar.wait_for(lock, sleep_time);
|
|
}
|
|
}
|
|
}
|
|
if (!slowly) {
|
|
BEESLOGINFO("Flushed " << wrote_extents << " of " << m_extents << " extents");
|
|
}
|
|
return wrote_extents;
|
|
}
|
|
|
|
void
|
|
BeesHashTable::set_extent_dirty_locked(uint64_t extent_index)
|
|
{
|
|
// Must already be locked
|
|
m_extent_metadata.at(extent_index).m_dirty = true;
|
|
|
|
// Signal writeback thread
|
|
unique_lock<mutex> dirty_lock(m_dirty_mutex);
|
|
m_dirty = true;
|
|
m_dirty_condvar.notify_one();
|
|
}
|
|
|
|
void
|
|
BeesHashTable::writeback_loop()
|
|
{
|
|
while (!m_stop_requested) {
|
|
auto wrote_extents = flush_dirty_extents(true);
|
|
|
|
BEESNOTE("idle after writing " << wrote_extents << " of " << m_extents << " extents");
|
|
|
|
unique_lock<mutex> lock(m_dirty_mutex);
|
|
if (m_stop_requested) {
|
|
break;
|
|
}
|
|
if (m_dirty) {
|
|
m_dirty = false;
|
|
} else {
|
|
m_dirty_condvar.wait(lock);
|
|
}
|
|
}
|
|
BEESLOGDEBUG("Exited hash table writeback_loop");
|
|
}
|
|
|
|
static
|
|
string
|
|
percent(size_t num, size_t den)
|
|
{
|
|
if (den) {
|
|
return astringprintf("%u%%", num * 100 / den);
|
|
} else {
|
|
return "--%";
|
|
}
|
|
}
|
|
|
|
void
|
|
BeesHashTable::prefetch_loop()
|
|
{
|
|
bool not_locked = true;
|
|
while (!m_stop_requested) {
|
|
size_t width = 64;
|
|
vector<size_t> occupancy(width, 0);
|
|
size_t occupied_count = 0;
|
|
size_t total_count = 0;
|
|
size_t compressed_count = 0;
|
|
size_t compressed_offset_count = 0;
|
|
size_t toxic_count = 0;
|
|
size_t unaligned_eof_count = 0;
|
|
|
|
for (uint64_t ext = 0; ext < m_extents && !m_stop_requested; ++ext) {
|
|
BEESNOTE("prefetching hash table extent #" << ext << " of " << m_extents);
|
|
catch_all([&]() {
|
|
fetch_missing_extent_by_index(ext);
|
|
|
|
BEESNOTE("analyzing hash table extent #" << ext << " of " << m_extents);
|
|
bool duplicate_bugs_found = false;
|
|
auto lock = lock_extent_by_index(ext);
|
|
for (Bucket *bucket = m_extent_ptr[ext].p_buckets; bucket < m_extent_ptr[ext + 1].p_buckets; ++bucket) {
|
|
if (verify_cell_range(bucket[0].p_cells, bucket[1].p_cells)) {
|
|
duplicate_bugs_found = true;
|
|
}
|
|
size_t this_bucket_occupied_count = 0;
|
|
for (Cell *cell = bucket[0].p_cells; cell < bucket[1].p_cells; ++cell) {
|
|
if (cell->e_addr) {
|
|
++this_bucket_occupied_count;
|
|
BeesAddress a(cell->e_addr);
|
|
if (a.is_compressed()) {
|
|
++compressed_count;
|
|
if (a.has_compressed_offset()) {
|
|
++compressed_offset_count;
|
|
}
|
|
}
|
|
if (a.is_toxic()) {
|
|
++toxic_count;
|
|
}
|
|
if (a.is_unaligned_eof()) {
|
|
++unaligned_eof_count;
|
|
}
|
|
}
|
|
++total_count;
|
|
}
|
|
++occupancy.at(this_bucket_occupied_count * width / (1 + c_cells_per_bucket) );
|
|
// Count these instead of calculating the number so we get better stats in case of exceptions
|
|
occupied_count += this_bucket_occupied_count;
|
|
}
|
|
if (duplicate_bugs_found) {
|
|
set_extent_dirty_locked(ext);
|
|
}
|
|
});
|
|
}
|
|
|
|
BEESNOTE("calculating hash table statistics");
|
|
|
|
vector<string> histogram;
|
|
vector<size_t> thresholds;
|
|
size_t threshold = 1;
|
|
bool threshold_exceeded = false;
|
|
do {
|
|
threshold_exceeded = false;
|
|
histogram.push_back(string(width, ' '));
|
|
thresholds.push_back(threshold);
|
|
for (size_t x = 0; x < width; ++x) {
|
|
if (occupancy.at(x) >= threshold) {
|
|
histogram.back().at(x) = '#';
|
|
threshold_exceeded = true;
|
|
}
|
|
}
|
|
threshold *= 2;
|
|
} while (threshold_exceeded);
|
|
|
|
ostringstream out;
|
|
size_t count = histogram.size();
|
|
bool first_line = true;
|
|
for (auto it = histogram.rbegin(); it != histogram.rend(); ++it) {
|
|
out << *it << " " << thresholds.at(--count);
|
|
if (first_line) {
|
|
first_line = false;
|
|
out << " pages";
|
|
}
|
|
out << "\n";
|
|
}
|
|
|
|
size_t uncompressed_count = occupied_count - compressed_offset_count;
|
|
|
|
ostringstream graph_blob;
|
|
|
|
graph_blob << "Now: " << format_time(time(NULL)) << "\n";
|
|
graph_blob << "Uptime: " << m_ctx->total_timer().age() << " seconds\n";
|
|
graph_blob << "Version: " << BEES_VERSION << "\n";
|
|
|
|
graph_blob
|
|
<< "\nHash table page occupancy histogram (" << occupied_count << "/" << total_count << " cells occupied, " << (occupied_count * 100 / total_count) << "%)\n"
|
|
<< out.str() << "0% | 25% | 50% | 75% | 100% page fill\n"
|
|
<< "compressed " << compressed_count << " (" << percent(compressed_count, occupied_count) << ")\n"
|
|
<< "uncompressed " << uncompressed_count << " (" << percent(uncompressed_count, occupied_count) << ")"
|
|
<< " unaligned_eof " << unaligned_eof_count << " (" << percent(unaligned_eof_count, occupied_count) << ")"
|
|
<< " toxic " << toxic_count << " (" << percent(toxic_count, occupied_count) << ")";
|
|
|
|
graph_blob << "\n\n";
|
|
|
|
graph_blob << "TOTAL:\n";
|
|
auto thisStats = BeesStats::s_global;
|
|
graph_blob << "\t" << thisStats << "\n";
|
|
|
|
graph_blob << "\nRATES:\n";
|
|
auto avg_rates = thisStats / m_ctx->total_timer().age();
|
|
graph_blob << "\t" << avg_rates << "\n";
|
|
|
|
BEESLOGINFO(graph_blob.str());
|
|
catch_all([&]() {
|
|
m_stats_file.write(graph_blob.str());
|
|
});
|
|
|
|
if (not_locked && !m_stop_requested) {
|
|
// Always do the mlock, whether shared or not
|
|
THROW_CHECK1(runtime_error, m_size, m_size > 0);
|
|
BEESLOGINFO("mlock(" << pretty(m_size) << ")...");
|
|
Timer lock_time;
|
|
catch_all([&]() {
|
|
BEESNOTE("mlock " << pretty(m_size));
|
|
DIE_IF_NON_ZERO(mlock(m_byte_ptr, m_size));
|
|
});
|
|
BEESLOGINFO("mlock(" << pretty(m_size) << ") done in " << lock_time << " sec");
|
|
not_locked = false;
|
|
}
|
|
|
|
BEESNOTE("idle " << BEES_HASH_TABLE_ANALYZE_INTERVAL << "s");
|
|
unique_lock<mutex> lock(m_stop_mutex);
|
|
if (m_stop_requested) {
|
|
BEESLOGDEBUG("Stop requested in hash table prefetch");
|
|
return;
|
|
}
|
|
m_stop_condvar.wait_for(lock, chrono::duration<double>(BEES_HASH_TABLE_ANALYZE_INTERVAL));
|
|
}
|
|
}
|
|
|
|
size_t
|
|
BeesHashTable::hash_to_extent_index(HashType hash)
|
|
{
|
|
auto pr = get_extent_range(hash);
|
|
uint64_t extent_index = reinterpret_cast<const Extent *>(pr.first) - m_extent_ptr;
|
|
THROW_CHECK2(runtime_error, extent_index, m_extents, extent_index < m_extents);
|
|
return extent_index;
|
|
}
|
|
|
|
BeesHashTable::ExtentMetaData::ExtentMetaData() :
|
|
m_mutex_ptr(make_shared<mutex>())
|
|
{
|
|
}
|
|
|
|
unique_lock<mutex>
|
|
BeesHashTable::lock_extent_by_index(uint64_t extent_index)
|
|
{
|
|
THROW_CHECK2(out_of_range, extent_index, m_extents, extent_index < m_extents);
|
|
return unique_lock<mutex>(*m_extent_metadata.at(extent_index).m_mutex_ptr);
|
|
}
|
|
|
|
unique_lock<mutex>
|
|
BeesHashTable::lock_extent_by_hash(HashType hash)
|
|
{
|
|
BEESTOOLONG("fetch_missing_extent for hash " << to_hex(hash));
|
|
return lock_extent_by_index(hash_to_extent_index(hash));
|
|
}
|
|
|
|
void
|
|
BeesHashTable::fetch_missing_extent_by_index(uint64_t extent_index)
|
|
{
|
|
BEESNOTE("checking hash extent #" << extent_index << " of " << m_extents << " extents");
|
|
auto lock = lock_extent_by_index(extent_index);
|
|
if (!m_extent_metadata.at(extent_index).m_missing) {
|
|
return;
|
|
}
|
|
|
|
// OK we have to read this extent
|
|
BEESNOTE("fetching hash extent #" << extent_index << " of " << m_extents << " extents");
|
|
BEESTRACE("Fetching hash extent #" << extent_index << " of " << m_extents << " extents");
|
|
BEESTOOLONG("Fetching hash extent #" << extent_index << " of " << m_extents << " extents");
|
|
|
|
uint8_t *dirty_extent = m_extent_ptr[extent_index].p_byte;
|
|
uint8_t *dirty_extent_end = m_extent_ptr[extent_index + 1].p_byte;
|
|
|
|
// If the read fails don't retry, just go with whatever data we have
|
|
m_extent_metadata.at(extent_index).m_missing = false;
|
|
|
|
catch_all([&]() {
|
|
BEESTOOLONG("pread(fd " << m_fd << " '" << name_fd(m_fd)<< "', length " << to_hex(dirty_extent_end - dirty_extent) << ", offset " << to_hex(dirty_extent - m_byte_ptr) << ")");
|
|
pread_or_die(m_fd, dirty_extent, dirty_extent_end - dirty_extent, dirty_extent - m_byte_ptr);
|
|
|
|
// Only count extents successfully read
|
|
BEESCOUNT(hash_extent_in);
|
|
});
|
|
}
|
|
|
|
void
|
|
BeesHashTable::fetch_missing_extent_by_hash(HashType hash)
|
|
{
|
|
uint64_t extent_index = hash_to_extent_index(hash);
|
|
BEESNOTE("waiting to fetch hash extent #" << extent_index << " of " << m_extents << " extents");
|
|
|
|
fetch_missing_extent_by_index(extent_index);
|
|
}
|
|
|
|
vector<BeesHashTable::Cell>
|
|
BeesHashTable::find_cell(HashType hash)
|
|
{
|
|
fetch_missing_extent_by_hash(hash);
|
|
BEESTOOLONG("find_cell hash " << BeesHash(hash));
|
|
vector<Cell> rv;
|
|
auto lock = lock_extent_by_hash(hash);
|
|
auto er = get_cell_range(hash);
|
|
// FIXME: Weed out zero addresses in the table due to earlier bugs
|
|
copy_if(er.first, er.second, back_inserter(rv), [=](const Cell &ip) { return ip.e_hash == hash && ip.e_addr >= 0x1000; });
|
|
BEESCOUNT(hash_lookup);
|
|
return rv;
|
|
}
|
|
|
|
/// Remove a hash from the table, leaving an empty space on the list
|
|
/// where the hash used to be. Used when an invalid address is found
|
|
/// because lookups on invalid addresses really hurt.
|
|
void
|
|
BeesHashTable::erase_hash_addr(HashType hash, AddrType addr)
|
|
{
|
|
fetch_missing_extent_by_hash(hash);
|
|
BEESTOOLONG("erase hash " << to_hex(hash) << " addr " << addr);
|
|
auto lock = lock_extent_by_hash(hash);
|
|
auto er = get_cell_range(hash);
|
|
Cell mv(hash, addr);
|
|
Cell *ip = find(er.first, er.second, mv);
|
|
bool found = (ip < er.second);
|
|
if (found) {
|
|
*ip = Cell(0, 0);
|
|
set_extent_dirty_locked(hash_to_extent_index(hash));
|
|
BEESCOUNT(hash_erase);
|
|
#if 0
|
|
if (verify_cell_range(er.first, er.second)) {
|
|
BEESLOGDEBUG("while erasing hash " << hash << " addr " << addr);
|
|
}
|
|
#endif
|
|
} else {
|
|
BEESCOUNT(hash_erase_miss);
|
|
}
|
|
}
|
|
|
|
/// Insert a hash entry at the head of the list. If entry is already
|
|
/// present in list, move it to the front of the list without dropping
|
|
/// any entries, and return true. If entry is not present in list,
|
|
/// insert it at the front of the list, possibly dropping the last entry
|
|
/// in the list, and return false. Used to move duplicate hash blocks
|
|
/// to the front of the list.
|
|
bool
|
|
BeesHashTable::push_front_hash_addr(HashType hash, AddrType addr)
|
|
{
|
|
fetch_missing_extent_by_hash(hash);
|
|
BEESTOOLONG("push_front_hash_addr hash " << BeesHash(hash) <<" addr " << BeesAddress(addr));
|
|
auto lock = lock_extent_by_hash(hash);
|
|
auto er = get_cell_range(hash);
|
|
Cell mv(hash, addr);
|
|
Cell *ip = find(er.first, er.second, mv);
|
|
bool found = (ip < er.second);
|
|
if (!found) {
|
|
// If no match found, get rid of an empty space instead
|
|
// If no empty spaces, ip will point to end
|
|
ip = find(er.first, er.second, Cell(0, 0));
|
|
}
|
|
if (ip > er.first) {
|
|
// Delete matching entry, first empty entry,
|
|
// or last entry whether empty or not
|
|
// move_backward(er.first, ip - 1, ip);
|
|
auto sp = ip;
|
|
auto dp = ip;
|
|
--sp;
|
|
// If we are deleting the last entry then don't copy it
|
|
if (dp == er.second) {
|
|
--sp;
|
|
--dp;
|
|
BEESCOUNT(hash_evict);
|
|
}
|
|
while (dp > er.first) {
|
|
*dp-- = *sp--;
|
|
}
|
|
}
|
|
// There is now a space at the front, insert there if different
|
|
if (er.first[0] != mv) {
|
|
er.first[0] = mv;
|
|
set_extent_dirty_locked(hash_to_extent_index(hash));
|
|
BEESCOUNT(hash_front);
|
|
} else {
|
|
BEESCOUNT(hash_front_already);
|
|
}
|
|
#if 0
|
|
if (verify_cell_range(er.first, er.second)) {
|
|
BEESLOGDEBUG("while push_fronting hash " << hash << " addr " << addr);
|
|
}
|
|
#endif
|
|
return found;
|
|
}
|
|
|
|
/// Insert a hash entry at some unspecified point in the list.
|
|
/// If entry is already present in list, returns true and does not
|
|
/// modify list. If entry is not present in list, returns false and
|
|
/// inserts at a random position in the list, possibly evicting the entry
|
|
/// at the end of the list. Used to insert new unique (not-yet-duplicate)
|
|
/// blocks in random order.
|
|
bool
|
|
BeesHashTable::push_random_hash_addr(HashType hash, AddrType addr)
|
|
{
|
|
fetch_missing_extent_by_hash(hash);
|
|
BEESTOOLONG("push_random_hash_addr hash " << BeesHash(hash) << " addr " << BeesAddress(addr));
|
|
auto lock = lock_extent_by_hash(hash);
|
|
auto er = get_cell_range(hash);
|
|
Cell mv(hash, addr);
|
|
Cell *ip = find(er.first, er.second, mv);
|
|
bool found = (ip < er.second);
|
|
|
|
thread_local default_random_engine generator;
|
|
thread_local uniform_int_distribution<int> distribution(0, c_cells_per_bucket - 1);
|
|
auto pos = distribution(generator);
|
|
|
|
int case_cond = 0;
|
|
#if 0
|
|
vector<Cell> saved(er.first, er.second);
|
|
#endif
|
|
|
|
if (found) {
|
|
// If hash already exists after pos, swap with pos
|
|
if (ip > er.first + pos) {
|
|
|
|
// move_backward(er.first + pos, ip - 1, ip);
|
|
auto sp = ip;
|
|
auto dp = ip;
|
|
--sp;
|
|
while (dp > er.first + pos) {
|
|
*dp-- = *sp--;
|
|
}
|
|
*dp = mv;
|
|
BEESCOUNT(hash_bump);
|
|
case_cond = 1;
|
|
goto ret_dirty;
|
|
}
|
|
// Hash already exists before (or at) pos, leave it there
|
|
BEESCOUNT(hash_already);
|
|
case_cond = 2;
|
|
goto ret;
|
|
}
|
|
|
|
// Find an empty space to back of pos
|
|
for (ip = er.first + pos; ip < er.second; ++ip) {
|
|
if (*ip == Cell(0, 0)) {
|
|
*ip = mv;
|
|
case_cond = 3;
|
|
goto ret_dirty;
|
|
}
|
|
}
|
|
|
|
// Find an empty space to front of pos
|
|
// if there is anything to front of pos
|
|
if (pos > 0) {
|
|
for (ip = er.first + pos - 1; ip >= er.first; --ip) {
|
|
if (*ip == Cell(0, 0)) {
|
|
*ip = mv;
|
|
case_cond = 4;
|
|
goto ret_dirty;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Evict something and insert at pos
|
|
// move_backward(er.first + pos, er.second - 1, er.second);
|
|
ip = er.second - 1;
|
|
while (ip > er.first + pos) {
|
|
auto dp = ip;
|
|
*dp = *--ip;
|
|
}
|
|
er.first[pos] = mv;
|
|
BEESCOUNT(hash_evict);
|
|
case_cond = 5;
|
|
ret_dirty:
|
|
BEESCOUNT(hash_insert);
|
|
set_extent_dirty_locked(hash_to_extent_index(hash));
|
|
ret:
|
|
#if 0
|
|
if (verify_cell_range(er.first, er.second, false)) {
|
|
BEESLOG("while push_randoming (case " << case_cond << ") pos " << pos
|
|
<< " ip " << (ip - er.first) << " " << mv);
|
|
// dump_bucket_locked(saved.data(), saved.data() + saved.size());
|
|
// dump_bucket_locked(er.first, er.second);
|
|
}
|
|
#else
|
|
(void)case_cond;
|
|
#endif
|
|
return found;
|
|
}
|
|
|
|
void
|
|
BeesHashTable::try_mmap_flags(int flags)
|
|
{
|
|
if (!m_cell_ptr) {
|
|
THROW_CHECK1(out_of_range, m_size, m_size > 0);
|
|
Timer map_time;
|
|
catch_all([&]() {
|
|
BEESLOGINFO("mapping hash table size " << m_size << " with flags " << mmap_flags_ntoa(flags));
|
|
void *ptr = mmap_or_die(nullptr, m_size, PROT_READ | PROT_WRITE, flags, flags & MAP_ANONYMOUS ? -1 : int(m_fd), 0);
|
|
BEESLOGINFO("mmap done in " << map_time << " sec");
|
|
m_cell_ptr = static_cast<Cell *>(ptr);
|
|
void *ptr_end = static_cast<uint8_t *>(ptr) + m_size;
|
|
m_cell_ptr_end = static_cast<Cell *>(ptr_end);
|
|
});
|
|
}
|
|
}
|
|
|
|
void
|
|
BeesHashTable::open_file()
|
|
{
|
|
// OK open hash table
|
|
BEESNOTE("opening hash table '" << m_filename << "' target size " << m_size << " (" << pretty(m_size) << ")");
|
|
|
|
// Try to open existing hash table
|
|
Fd new_fd = openat(m_ctx->home_fd(), m_filename.c_str(), FLAGS_OPEN_FILE_RW, 0700);
|
|
|
|
// If that doesn't work, try to make a new one
|
|
if (!new_fd) {
|
|
string tmp_filename = m_filename + ".tmp";
|
|
BEESNOTE("creating new hash table '" << tmp_filename << "'");
|
|
BEESLOGINFO("Creating new hash table '" << tmp_filename << "'");
|
|
unlinkat(m_ctx->home_fd(), tmp_filename.c_str(), 0);
|
|
new_fd = openat_or_die(m_ctx->home_fd(), tmp_filename, FLAGS_CREATE_FILE, 0700);
|
|
BEESNOTE("truncating new hash table '" << tmp_filename << "' size " << m_size << " (" << pretty(m_size) << ")");
|
|
BEESLOGINFO("Truncating new hash table '" << tmp_filename << "' size " << m_size << " (" << pretty(m_size) << ")");
|
|
ftruncate_or_die(new_fd, m_size);
|
|
BEESNOTE("truncating new hash table '" << tmp_filename << "' -> '" << m_filename << "'");
|
|
BEESLOGINFO("Truncating new hash table '" << tmp_filename << "' -> '" << m_filename << "'");
|
|
renameat_or_die(m_ctx->home_fd(), tmp_filename, m_ctx->home_fd(), m_filename);
|
|
}
|
|
|
|
Stat st(new_fd);
|
|
off_t new_size = st.st_size;
|
|
|
|
THROW_CHECK1(invalid_argument, new_size, new_size > 0);
|
|
THROW_CHECK1(invalid_argument, new_size, (new_size % BLOCK_SIZE_HASHTAB_EXTENT) == 0);
|
|
m_size = new_size;
|
|
m_fd = new_fd;
|
|
}
|
|
|
|
BeesHashTable::BeesHashTable(shared_ptr<BeesContext> ctx, string filename, off_t size) :
|
|
m_ctx(ctx),
|
|
m_size(0),
|
|
m_void_ptr(nullptr),
|
|
m_void_ptr_end(nullptr),
|
|
m_buckets(0),
|
|
m_cells(0),
|
|
m_writeback_thread("hash_writeback"),
|
|
m_prefetch_thread("hash_prefetch"),
|
|
m_flush_rate_limit(BEES_FLUSH_RATE),
|
|
m_stats_file(m_ctx->home_fd(), "beesstats.txt")
|
|
{
|
|
// Sanity checks to protect the implementation from its weaknesses
|
|
THROW_CHECK2(invalid_argument, BLOCK_SIZE_HASHTAB_BUCKET, BLOCK_SIZE_HASHTAB_EXTENT, (BLOCK_SIZE_HASHTAB_EXTENT % BLOCK_SIZE_HASHTAB_BUCKET) == 0);
|
|
|
|
// There's more than one union
|
|
THROW_CHECK2(runtime_error, sizeof(Bucket), BLOCK_SIZE_HASHTAB_BUCKET, BLOCK_SIZE_HASHTAB_BUCKET == sizeof(Bucket));
|
|
THROW_CHECK2(runtime_error, sizeof(Bucket::p_byte), BLOCK_SIZE_HASHTAB_BUCKET, BLOCK_SIZE_HASHTAB_BUCKET == sizeof(Bucket::p_byte));
|
|
THROW_CHECK2(runtime_error, sizeof(Extent), BLOCK_SIZE_HASHTAB_EXTENT, BLOCK_SIZE_HASHTAB_EXTENT == sizeof(Extent));
|
|
THROW_CHECK2(runtime_error, sizeof(Extent::p_byte), BLOCK_SIZE_HASHTAB_EXTENT, BLOCK_SIZE_HASHTAB_EXTENT == sizeof(Extent::p_byte));
|
|
|
|
m_filename = filename;
|
|
m_size = size;
|
|
open_file();
|
|
|
|
// Now we know size we can compute stuff
|
|
|
|
BEESTRACE("hash table size " << m_size);
|
|
BEESTRACE("hash table bucket size " << BLOCK_SIZE_HASHTAB_BUCKET);
|
|
BEESTRACE("hash table extent size " << BLOCK_SIZE_HASHTAB_EXTENT);
|
|
|
|
BEESLOGINFO("opened hash table filename '" << filename << "' length " << m_size);
|
|
m_buckets = m_size / BLOCK_SIZE_HASHTAB_BUCKET;
|
|
m_cells = m_buckets * c_cells_per_bucket;
|
|
m_extents = (m_size + BLOCK_SIZE_HASHTAB_EXTENT - 1) / BLOCK_SIZE_HASHTAB_EXTENT;
|
|
BEESLOGINFO("\tcells " << m_cells << ", buckets " << m_buckets << ", extents " << m_extents);
|
|
|
|
BEESLOGINFO("\tflush rate limit " << BEES_FLUSH_RATE);
|
|
|
|
// Try to mmap that much memory
|
|
try_mmap_flags(MAP_PRIVATE | MAP_ANONYMOUS);
|
|
|
|
if (!m_cell_ptr) {
|
|
THROW_ERRNO("unable to mmap " << filename);
|
|
}
|
|
|
|
// Do unions work the way we think (and rely on)?
|
|
THROW_CHECK2(runtime_error, m_void_ptr, m_cell_ptr, m_void_ptr == m_cell_ptr);
|
|
THROW_CHECK2(runtime_error, m_void_ptr, m_byte_ptr, m_void_ptr == m_byte_ptr);
|
|
THROW_CHECK2(runtime_error, m_void_ptr, m_bucket_ptr, m_void_ptr == m_bucket_ptr);
|
|
THROW_CHECK2(runtime_error, m_void_ptr, m_extent_ptr, m_void_ptr == m_extent_ptr);
|
|
|
|
// Give all the madvise hints that the kernel understands
|
|
const struct madv_flag {
|
|
const char *name;
|
|
int value;
|
|
} madv_flags[] = {
|
|
{ .name = "MADV_HUGEPAGE", .value = MADV_HUGEPAGE },
|
|
{ .name = "MADV_DONTFORK", .value = MADV_DONTFORK },
|
|
{ .name = "MADV_DONTDUMP", .value = MADV_DONTDUMP },
|
|
{ .name = "", .value = 0 },
|
|
};
|
|
for (auto fp = madv_flags; fp->value; ++fp) {
|
|
BEESTOOLONG("madvise(" << fp->name << ")");
|
|
if (madvise(m_byte_ptr, m_size, fp->value)) {
|
|
BEESLOGWARN("madvise(..., " << fp->name << "): " << strerror(errno) << " (ignored)");
|
|
}
|
|
}
|
|
|
|
m_extent_metadata.resize(m_extents);
|
|
|
|
m_writeback_thread.exec([&]() {
|
|
writeback_loop();
|
|
});
|
|
|
|
m_prefetch_thread.exec([&]() {
|
|
prefetch_loop();
|
|
});
|
|
|
|
// Blacklist might fail if the hash table is not stored on a btrfs
|
|
catch_all([&]() {
|
|
m_ctx->blacklist_insert(BeesFileId(m_fd));
|
|
});
|
|
}
|
|
|
|
BeesHashTable::~BeesHashTable()
|
|
{
|
|
BEESLOGDEBUG("Destroy BeesHashTable");
|
|
if (m_cell_ptr && m_size) {
|
|
// Dirty extents should have been flushed before now,
|
|
// e.g. in stop(). If that didn't happen, don't fall
|
|
// into the same trap (and maybe throw an exception) here.
|
|
// flush_dirty_extents(false);
|
|
catch_all([&]() {
|
|
DIE_IF_NON_ZERO(munmap(m_cell_ptr, m_size));
|
|
m_cell_ptr = nullptr;
|
|
m_size = 0;
|
|
});
|
|
}
|
|
BEESLOGDEBUG("BeesHashTable destroyed");
|
|
}
|
|
|
|
void
|
|
BeesHashTable::stop()
|
|
{
|
|
BEESNOTE("stopping BeesHashTable threads");
|
|
BEESLOGDEBUG("Stopping BeesHashTable threads");
|
|
|
|
unique_lock<mutex> lock(m_stop_mutex);
|
|
m_stop_requested = true;
|
|
m_stop_condvar.notify_all();
|
|
lock.unlock();
|
|
|
|
// Wake up hash writeback too
|
|
unique_lock<mutex> dirty_lock(m_dirty_mutex);
|
|
m_dirty_condvar.notify_all();
|
|
dirty_lock.unlock();
|
|
|
|
BEESNOTE("waiting for hash_prefetch thread");
|
|
BEESLOGDEBUG("Waiting for hash_prefetch thread");
|
|
m_prefetch_thread.join();
|
|
|
|
BEESNOTE("waiting for hash_writeback thread");
|
|
BEESLOGDEBUG("Waiting for hash_writeback thread");
|
|
m_writeback_thread.join();
|
|
|
|
if (m_cell_ptr && m_size) {
|
|
BEESLOGDEBUG("Flushing hash table");
|
|
BEESNOTE("flushing hash table");
|
|
flush_dirty_extents(false);
|
|
}
|
|
|
|
BEESLOGDEBUG("BeesHashTable stopped");
|
|
}
|