mirror of
https://github.com/Zygo/bees.git
synced 2025-05-17 13:25:45 +02:00
ProgressTracker: reduce memory usage with long-running work items
ProgressTracker was only freeing memory for work items when they reach the head of the work tracking queue. If the first work item takes hours to complete, and thousands of items are processed every second, this leads to millions of completed items tracked in memory at a time, wasting gigabytes of system RAM. Rewrite ProgressHolderState methods to keep only incomplete work items in memory, regardless of the order in which they are added or removed. Also fix the unit tests which were relying on the memory leak to work, and add test cases for code coverage. Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
parent
299509ce32
commit
a9a5cd03a5
@ -4,13 +4,20 @@
|
||||
#include "crucible/error.h"
|
||||
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <set>
|
||||
|
||||
#include <cassert>
|
||||
|
||||
namespace crucible {
|
||||
using namespace std;
|
||||
|
||||
/// A class to track progress of multiple workers using only two points:
|
||||
/// the first and last incomplete state. The first incomplete
|
||||
/// state can be recorded as a checkpoint to resume later on.
|
||||
/// The last completed state is the starting point for workers that
|
||||
/// need something to do.
|
||||
template <class T>
|
||||
class ProgressTracker {
|
||||
struct ProgressTrackerState;
|
||||
@ -19,8 +26,16 @@ namespace crucible {
|
||||
using value_type = T;
|
||||
using ProgressHolder = shared_ptr<ProgressHolderState>;
|
||||
|
||||
/// Create ProgressTracker with initial begin and end state 'v'.
|
||||
ProgressTracker(const value_type &v);
|
||||
|
||||
/// The first incomplete state. This is not "sticky",
|
||||
/// it will revert to the end state if there are no
|
||||
/// items in progress.
|
||||
value_type begin() const;
|
||||
|
||||
/// The last incomplete state. This is "sticky",
|
||||
/// it can only increase and never decrease.
|
||||
value_type end() const;
|
||||
|
||||
ProgressHolder hold(const value_type &v);
|
||||
@ -31,7 +46,7 @@ namespace crucible {
|
||||
struct ProgressTrackerState {
|
||||
using key_type = pair<value_type, ProgressHolderState *>;
|
||||
mutex m_mutex;
|
||||
map<key_type, bool> m_in_progress;
|
||||
set<key_type> m_in_progress;
|
||||
value_type m_begin;
|
||||
value_type m_end;
|
||||
};
|
||||
@ -39,6 +54,7 @@ namespace crucible {
|
||||
class ProgressHolderState {
|
||||
shared_ptr<ProgressTrackerState> m_state;
|
||||
const value_type m_value;
|
||||
using key_type = typename ProgressTrackerState::key_type;
|
||||
public:
|
||||
ProgressHolderState(shared_ptr<ProgressTrackerState> state, const value_type &v);
|
||||
~ProgressHolderState();
|
||||
@ -86,7 +102,11 @@ namespace crucible {
|
||||
m_value(v)
|
||||
{
|
||||
unique_lock<mutex> lock(m_state->m_mutex);
|
||||
m_state->m_in_progress[make_pair(m_value, this)] = true;
|
||||
const auto rv = m_state->m_in_progress.insert(key_type(m_value, this));
|
||||
THROW_CHECK1(runtime_error, m_value, rv.second);
|
||||
// Set the beginning to the first existing in-progress item
|
||||
m_state->m_begin = m_state->m_in_progress.begin()->first;
|
||||
// If this value is past the end, move the end, but don't go backwards
|
||||
if (m_state->m_end < m_value) {
|
||||
m_state->m_end = m_value;
|
||||
}
|
||||
@ -96,17 +116,15 @@ namespace crucible {
|
||||
ProgressTracker<T>::ProgressHolderState::~ProgressHolderState()
|
||||
{
|
||||
unique_lock<mutex> lock(m_state->m_mutex);
|
||||
m_state->m_in_progress[make_pair(m_value, this)] = false;
|
||||
auto p = m_state->m_in_progress.begin();
|
||||
while (p != m_state->m_in_progress.end()) {
|
||||
if (p->second) {
|
||||
break;
|
||||
}
|
||||
if (m_state->m_begin < p->first.first) {
|
||||
m_state->m_begin = p->first.first;
|
||||
}
|
||||
m_state->m_in_progress.erase(p);
|
||||
p = m_state->m_in_progress.begin();
|
||||
const auto rv = m_state->m_in_progress.erase(key_type(m_value, this));
|
||||
// THROW_CHECK2(runtime_error, m_value, rv, rv == 1);
|
||||
assert(rv == 1);
|
||||
if (m_state->m_in_progress.empty()) {
|
||||
// If we made the list empty, then m_begin == m_end
|
||||
m_state->m_begin = m_state->m_end;
|
||||
} else {
|
||||
// If we deleted the first element, then m_begin = current first element
|
||||
m_state->m_begin = m_state->m_in_progress.begin()->first;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -12,23 +12,49 @@ using namespace std;
|
||||
void
|
||||
test_progress()
|
||||
{
|
||||
// On create, begin == end == constructor argument
|
||||
ProgressTracker<uint64_t> pt(123);
|
||||
auto hold = pt.hold(234);
|
||||
auto hold2 = pt.hold(345);
|
||||
assert(pt.begin() == 123);
|
||||
assert(pt.end() == 345);
|
||||
auto hold3 = pt.hold(456);
|
||||
assert(pt.begin() == 123);
|
||||
assert(pt.end() == 456);
|
||||
hold2.reset();
|
||||
assert(pt.begin() == 123);
|
||||
assert(pt.end() == 456);
|
||||
hold.reset();
|
||||
assert(pt.end() == 123);
|
||||
|
||||
// Holding a position past the end increases the end (and moves begin to match)
|
||||
auto hold345 = pt.hold(345);
|
||||
assert(pt.begin() == 345);
|
||||
assert(pt.end() == 345);
|
||||
|
||||
// Holding a position before begin reduces begin, without changing end
|
||||
auto hold234 = pt.hold(234);
|
||||
assert(pt.begin() == 234);
|
||||
assert(pt.end() == 345);
|
||||
|
||||
// Holding a position past the end increases the end, without affecting begin
|
||||
auto hold456 = pt.hold(456);
|
||||
assert(pt.begin() == 234);
|
||||
assert(pt.end() == 456);
|
||||
hold3.reset();
|
||||
|
||||
// Releasing a position in the middle affects neither begin nor end
|
||||
hold345.reset();
|
||||
assert(pt.begin() == 234);
|
||||
assert(pt.end() == 456);
|
||||
|
||||
// Hold another position in the middle to test begin moving forward
|
||||
auto hold400 = pt.hold(400);
|
||||
|
||||
// Releasing a position at the beginning moves begin forward
|
||||
hold234.reset();
|
||||
assert(pt.begin() == 400);
|
||||
assert(pt.end() == 456);
|
||||
|
||||
// Releasing a position at the end doesn't move end backward
|
||||
hold456.reset();
|
||||
assert(pt.begin() == 400);
|
||||
assert(pt.end() == 456);
|
||||
|
||||
// Releasing a position in the middle doesn't move end backward but does move begin forward
|
||||
hold400.reset();
|
||||
assert(pt.begin() == 456);
|
||||
assert(pt.end() == 456);
|
||||
|
||||
}
|
||||
|
||||
int
|
||||
|
Loading…
x
Reference in New Issue
Block a user