diff --git a/include/crucible/time.h b/include/crucible/time.h index b806a7d..c788869 100644 --- a/include/crucible/time.h +++ b/include/crucible/time.h @@ -4,6 +4,8 @@ #include "crucible/error.h" #include +#include +#include #include #include @@ -17,6 +19,7 @@ namespace crucible { public: Timer(); double age() const; + chrono::high_resolution_clock::time_point get() const; double report(int precision = 1000) const; void reset(); double lap(); @@ -43,6 +46,56 @@ namespace crucible { void borrow(double cost = 1.0); }; + class RateEstimator { + mutable mutex m_mutex; + mutable condition_variable m_condvar; + Timer m_timer; + double m_num = 0.0; + double m_den = 0.0; + uint64_t m_last_count = numeric_limits::max(); + Timer m_last_update; + const double m_decay = 0.99; + Timer m_last_decay; + double m_min_delay; + double m_max_delay; + + chrono::duration duration_unlocked(uint64_t relative_count) const; + chrono::high_resolution_clock::time_point time_point_unlocked(uint64_t absolute_count) const; + double rate_unlocked() const; + pair ratio_unlocked() const; + void update_unlocked(uint64_t new_count); + public: + RateEstimator(double min_delay = 1, double max_delay = 3600); + + // Block until count reached + void wait_for(uint64_t new_count_relative) const; + void wait_until(uint64_t new_count_absolute) const; + + // Computed rates and ratios + double rate() const; + pair ratio() const; + + // Inspect raw num/den + pair raw() const; + + // Write count + void update(uint64_t new_count); + + // Read count + uint64_t count() const; + + // Convert counts to chrono types + chrono::high_resolution_clock::time_point time_point(uint64_t absolute_count) const; + chrono::duration duration(uint64_t relative_count) const; + + // Polling delay until count reached (limited by min/max delay) + double seconds_for(uint64_t new_count_relative) const; + double seconds_until(uint64_t new_count_absolute) const; + }; + + ostream & + operator<<(ostream &os, const RateEstimator &re); + } #endif // CRUCIBLE_TIME_H diff --git a/lib/time.cc b/lib/time.cc index 8cc783b..e7b4721 100644 --- a/lib/time.cc +++ b/lib/time.cc @@ -1,11 +1,13 @@ #include "crucible/time.h" #include "crucible/error.h" +#include "crucible/process.h" #include +#include + #include #include -#include namespace crucible { @@ -59,6 +61,12 @@ namespace crucible { m_start = chrono::high_resolution_clock::now(); } + chrono::high_resolution_clock::time_point + Timer::get() const + { + return m_start; + } + double Timer::lap() { @@ -143,4 +151,178 @@ namespace crucible { m_tokens -= cost; } + RateEstimator::RateEstimator(double min_delay, double max_delay) : + m_min_delay(min_delay), + m_max_delay(max_delay) + { + THROW_CHECK1(invalid_argument, min_delay, min_delay > 0); + THROW_CHECK1(invalid_argument, max_delay, max_delay > 0); + THROW_CHECK2(invalid_argument, min_delay, max_delay, max_delay > min_delay); + } + + void + RateEstimator::update_unlocked(uint64_t new_count) + { + // Gradually reduce the effect of previous updates + if (m_last_decay.age() > 1) { + m_num *= m_decay; + m_den *= m_decay; + m_last_decay.reset(); + } + // Add units over time to running totals + auto increment = new_count - min(new_count, m_last_count); + auto delta = max(0.0, m_last_update.lap()); + m_num += increment; + m_den += delta; + m_last_count = new_count; + // If count increased, wake up any waiters + if (delta > 0) { + m_condvar.notify_all(); + } + } + + void + RateEstimator::update(uint64_t new_count) + { + unique_lock lock(m_mutex); + return update_unlocked(new_count); + } + + uint64_t + RateEstimator::count() const + { + unique_lock lock(m_mutex); + return m_last_count; + } + + pair + RateEstimator::ratio_unlocked() const + { + auto num = max(m_num, 1.0); + // auto den = max(m_den, 1.0); + // Rate estimation slows down if there are no new units to count + auto den = max(m_den + m_last_update.age(), 1.0); + auto sec_per_count = den / num; + if (sec_per_count < m_min_delay) { + return make_pair(1.0, m_min_delay); + } + if (sec_per_count > m_max_delay) { + return make_pair(1.0, m_max_delay); + } + return make_pair(num, den); + } + + pair + RateEstimator::ratio() const + { + unique_lock lock(m_mutex); + return ratio_unlocked(); + } + + pair + RateEstimator::raw() const + { + unique_lock lock(m_mutex); + return make_pair(m_num, m_den); + } + + double + RateEstimator::rate_unlocked() const + { + auto r = ratio_unlocked(); + return r.first / r.second; + } + + double + RateEstimator::rate() const + { + unique_lock lock(m_mutex); + return rate_unlocked(); + } + + ostream & + operator<<(ostream &os, const RateEstimator &re) + { + os << "RateEstimator { "; + auto ratio = re.ratio(); + auto raw = re.raw(); + os << "count = " << re.count() << ", raw = " << raw.first << " / " << raw.second << ", ratio = " << ratio.first << " / " << ratio.second << ", rate = " << re.rate() << ", duration(1) = " << re.duration(1).count() << ", seconds_for(1) = " << re.seconds_for(1) << " }"; + return os; + } + + chrono::duration + RateEstimator::duration_unlocked(uint64_t relative_count) const + { + auto dur = relative_count / rate_unlocked(); + dur = min(m_max_delay, dur); + dur = max(m_min_delay, dur); + return chrono::duration(dur); + } + + chrono::duration + RateEstimator::duration(uint64_t relative_count) const + { + unique_lock lock(m_mutex); + return duration_unlocked(relative_count); + } + + chrono::high_resolution_clock::time_point + RateEstimator::time_point_unlocked(uint64_t absolute_count) const + { + auto relative_count = absolute_count - min(m_last_count, absolute_count); + auto relative_duration = duration_unlocked(relative_count); + return m_last_update.get() + chrono::duration_cast(relative_duration); + // return chrono::high_resolution_clock::now() + chrono::duration_cast(relative_duration); + } + + chrono::high_resolution_clock::time_point + RateEstimator::time_point(uint64_t absolute_count) const + { + unique_lock lock(m_mutex); + return time_point_unlocked(absolute_count); + } + + void + RateEstimator::wait_until(uint64_t new_count_absolute) const + { + unique_lock lock(m_mutex); + auto saved_count = m_last_count; + while (saved_count <= m_last_count && m_last_count < new_count_absolute) { + // Stop waiting if clock runs backwards + saved_count = m_last_count; + m_condvar.wait(lock); + } + } + + void + RateEstimator::wait_for(uint64_t new_count_relative) const + { + unique_lock lock(m_mutex); + auto saved_count = m_last_count; + auto new_count_absolute = m_last_count + new_count_relative; + while (saved_count <= m_last_count && m_last_count < new_count_absolute) { + // Stop waiting if clock runs backwards + saved_count = m_last_count; + m_condvar.wait(lock); + } + } + + double + RateEstimator::seconds_for(uint64_t new_count_relative) const + { + unique_lock lock(m_mutex); + auto ts = time_point_unlocked(new_count_relative + m_last_count); + auto delta_dur = ts - chrono::high_resolution_clock::now(); + return max(min(chrono::duration(delta_dur).count(), m_max_delay), m_min_delay); + } + + double + RateEstimator::seconds_until(uint64_t new_count_absolute) const + { + unique_lock lock(m_mutex); + auto ts = time_point_unlocked(new_count_absolute); + auto delta_dur = ts - chrono::high_resolution_clock::now(); + return max(min(chrono::duration(delta_dur).count(), m_max_delay), m_min_delay); + } + }