mirror of
https://github.com/Zygo/bees.git
synced 2025-05-18 05:45:45 +02:00
crucible: remove unused TimeQueue and WorkQueue classes
WorkQueue is superceded by Task. TimeQueue will be replaced by something based on Tasks. Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
parent
796aaed7f8
commit
090d79e13b
@ -1,188 +0,0 @@
|
|||||||
#ifndef CRUCIBLE_TIMEQUEUE_H
|
|
||||||
#define CRUCIBLE_TIMEQUEUE_H
|
|
||||||
|
|
||||||
#include <crucible/error.h>
|
|
||||||
#include <crucible/time.h>
|
|
||||||
|
|
||||||
#include <condition_variable>
|
|
||||||
#include <limits>
|
|
||||||
#include <list>
|
|
||||||
#include <memory>
|
|
||||||
#include <mutex>
|
|
||||||
#include <set>
|
|
||||||
|
|
||||||
namespace crucible {
|
|
||||||
using namespace std;
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
class TimeQueue {
|
|
||||||
|
|
||||||
public:
|
|
||||||
using Timestamp = chrono::high_resolution_clock::time_point;
|
|
||||||
|
|
||||||
private:
|
|
||||||
struct Item {
|
|
||||||
Timestamp m_time;
|
|
||||||
unsigned long m_id;
|
|
||||||
Task m_task;
|
|
||||||
|
|
||||||
bool operator<(const Item &that) const {
|
|
||||||
if (m_time < that.m_time) return true;
|
|
||||||
if (that.m_time < m_time) return false;
|
|
||||||
return m_id < that.m_id;
|
|
||||||
}
|
|
||||||
static unsigned s_id;
|
|
||||||
|
|
||||||
Item(const Timestamp &time, const Task& task) :
|
|
||||||
m_time(time),
|
|
||||||
m_id(++s_id),
|
|
||||||
m_task(task)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
set<Item> m_set;
|
|
||||||
mutable mutex m_mutex;
|
|
||||||
condition_variable m_cond_full, m_cond_empty;
|
|
||||||
size_t m_max_queue_depth;
|
|
||||||
|
|
||||||
public:
|
|
||||||
~TimeQueue();
|
|
||||||
TimeQueue(size_t max_queue_depth = numeric_limits<size_t>::max());
|
|
||||||
|
|
||||||
void push(const Task &task, double delay = 0);
|
|
||||||
void push_nowait(const Task &task, double delay = 0);
|
|
||||||
Task pop();
|
|
||||||
bool pop_nowait(Task &t);
|
|
||||||
double when() const;
|
|
||||||
|
|
||||||
size_t size() const;
|
|
||||||
bool empty() const;
|
|
||||||
|
|
||||||
list<Task> peek(size_t count) const;
|
|
||||||
};
|
|
||||||
|
|
||||||
template <class Task> unsigned TimeQueue<Task>::Item::s_id = 0;
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
TimeQueue<Task>::~TimeQueue()
|
|
||||||
{
|
|
||||||
if (!m_set.empty()) {
|
|
||||||
cerr << "ERROR: " << m_set.size() << " locked items still in TimeQueue at destruction" << endl;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
void
|
|
||||||
TimeQueue<Task>::push(const Task &task, double delay)
|
|
||||||
{
|
|
||||||
Timestamp time = chrono::high_resolution_clock::now() +
|
|
||||||
chrono::duration_cast<chrono::high_resolution_clock::duration>(chrono::duration<double>(delay));
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
while (m_set.size() > m_max_queue_depth) {
|
|
||||||
m_cond_full.wait(lock);
|
|
||||||
}
|
|
||||||
m_set.insert(Item(time, task));
|
|
||||||
m_cond_empty.notify_all();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
void
|
|
||||||
TimeQueue<Task>::push_nowait(const Task &task, double delay)
|
|
||||||
{
|
|
||||||
Timestamp time = chrono::high_resolution_clock::now() +
|
|
||||||
chrono::duration_cast<chrono::high_resolution_clock::duration>(chrono::duration<double>(delay));
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
m_set.insert(Item(time, task));
|
|
||||||
m_cond_empty.notify_all();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
Task
|
|
||||||
TimeQueue<Task>::pop()
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
while (1) {
|
|
||||||
while (m_set.empty()) {
|
|
||||||
m_cond_empty.wait(lock);
|
|
||||||
}
|
|
||||||
Timestamp now = chrono::high_resolution_clock::now();
|
|
||||||
if (now > m_set.begin()->m_time) {
|
|
||||||
Task rv = m_set.begin()->m_task;
|
|
||||||
m_set.erase(m_set.begin());
|
|
||||||
m_cond_full.notify_all();
|
|
||||||
return rv;
|
|
||||||
}
|
|
||||||
m_cond_empty.wait_until(lock, m_set.begin()->m_time);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
bool
|
|
||||||
TimeQueue<Task>::pop_nowait(Task &t)
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
if (m_set.empty()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
Timestamp now = chrono::high_resolution_clock::now();
|
|
||||||
if (now <= m_set.begin()->m_time) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
t = m_set.begin()->m_task;
|
|
||||||
m_set.erase(m_set.begin());
|
|
||||||
m_cond_full.notify_all();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
double
|
|
||||||
TimeQueue<Task>::when() const
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
if (m_set.empty()) {
|
|
||||||
return numeric_limits<double>::infinity();
|
|
||||||
}
|
|
||||||
return chrono::duration<double>(m_set.begin()->m_time - chrono::high_resolution_clock::now()).count();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
size_t
|
|
||||||
TimeQueue<Task>::size() const
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
return m_set.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
bool
|
|
||||||
TimeQueue<Task>::empty() const
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
return m_set.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
list<Task>
|
|
||||||
TimeQueue<Task>::peek(size_t count) const
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
list<Task> rv;
|
|
||||||
auto it = m_set.begin();
|
|
||||||
while (count-- && it != m_set.end()) {
|
|
||||||
rv.push_back(it->m_task);
|
|
||||||
++it;
|
|
||||||
}
|
|
||||||
return rv;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
TimeQueue<Task>::TimeQueue(size_t max_depth) :
|
|
||||||
m_max_queue_depth(max_depth)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif // CRUCIBLE_TIMEQUEUE_H
|
|
@ -1,192 +0,0 @@
|
|||||||
#ifndef CRUCIBLE_WORKQUEUE_H
|
|
||||||
#define CRUCIBLE_WORKQUEUE_H
|
|
||||||
|
|
||||||
#include <crucible/error.h>
|
|
||||||
|
|
||||||
#include <condition_variable>
|
|
||||||
#include <limits>
|
|
||||||
#include <list>
|
|
||||||
#include <memory>
|
|
||||||
#include <mutex>
|
|
||||||
#include <set>
|
|
||||||
|
|
||||||
namespace crucible {
|
|
||||||
using namespace std;
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
class WorkQueue {
|
|
||||||
|
|
||||||
public:
|
|
||||||
using set_type = set<Task>;
|
|
||||||
using key_type = Task;
|
|
||||||
|
|
||||||
private:
|
|
||||||
|
|
||||||
set_type m_set;
|
|
||||||
mutable mutex m_mutex;
|
|
||||||
condition_variable m_cond_full, m_cond_empty;
|
|
||||||
size_t m_max_queue_depth;
|
|
||||||
|
|
||||||
public:
|
|
||||||
~WorkQueue();
|
|
||||||
template <class... Args> WorkQueue(size_t max_queue_depth, Args... args);
|
|
||||||
template <class... Args> WorkQueue(Args... args);
|
|
||||||
|
|
||||||
void push(const key_type &name);
|
|
||||||
void push_wait(const key_type &name, size_t limit);
|
|
||||||
void push_nowait(const key_type &name);
|
|
||||||
|
|
||||||
key_type pop();
|
|
||||||
bool pop_nowait(key_type &rv);
|
|
||||||
key_type peek();
|
|
||||||
|
|
||||||
size_t size() const;
|
|
||||||
bool empty();
|
|
||||||
set_type copy();
|
|
||||||
list<Task> peek(size_t count) const;
|
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
WorkQueue<Task>::~WorkQueue()
|
|
||||||
{
|
|
||||||
if (!m_set.empty()) {
|
|
||||||
cerr << "ERROR: " << m_set.size() << " locked items still in WorkQueue " << this << " at destruction" << endl;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
void
|
|
||||||
WorkQueue<Task>::push(const key_type &name)
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
while (!m_set.count(name) && m_set.size() > m_max_queue_depth) {
|
|
||||||
m_cond_full.wait(lock);
|
|
||||||
}
|
|
||||||
m_set.insert(name);
|
|
||||||
m_cond_empty.notify_all();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
void
|
|
||||||
WorkQueue<Task>::push_wait(const key_type &name, size_t limit)
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
while (!m_set.count(name) && m_set.size() >= limit) {
|
|
||||||
m_cond_full.wait(lock);
|
|
||||||
}
|
|
||||||
m_set.insert(name);
|
|
||||||
m_cond_empty.notify_all();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
void
|
|
||||||
WorkQueue<Task>::push_nowait(const key_type &name)
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
m_set.insert(name);
|
|
||||||
m_cond_empty.notify_all();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
typename WorkQueue<Task>::key_type
|
|
||||||
WorkQueue<Task>::pop()
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
while (m_set.empty()) {
|
|
||||||
m_cond_empty.wait(lock);
|
|
||||||
}
|
|
||||||
key_type rv = *m_set.begin();
|
|
||||||
m_set.erase(m_set.begin());
|
|
||||||
m_cond_full.notify_all();
|
|
||||||
return rv;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
bool
|
|
||||||
WorkQueue<Task>::pop_nowait(key_type &rv)
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
if (m_set.empty()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
rv = *m_set.begin();
|
|
||||||
m_set.erase(m_set.begin());
|
|
||||||
m_cond_full.notify_all();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
typename WorkQueue<Task>::key_type
|
|
||||||
WorkQueue<Task>::peek()
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
if (m_set.empty()) {
|
|
||||||
return key_type();
|
|
||||||
} else {
|
|
||||||
// Make copy with lock held
|
|
||||||
auto rv = *m_set.begin();
|
|
||||||
return rv;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
size_t
|
|
||||||
WorkQueue<Task>::size() const
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
return m_set.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
bool
|
|
||||||
WorkQueue<Task>::empty()
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
return m_set.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
typename WorkQueue<Task>::set_type
|
|
||||||
WorkQueue<Task>::copy()
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
auto rv = m_set;
|
|
||||||
return rv;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
list<Task>
|
|
||||||
WorkQueue<Task>::peek(size_t count) const
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(m_mutex);
|
|
||||||
list<Task> rv;
|
|
||||||
for (auto i : m_set) {
|
|
||||||
if (count--) {
|
|
||||||
rv.push_back(i);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return rv;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
template <class... Args>
|
|
||||||
WorkQueue<Task>::WorkQueue(Args... args) :
|
|
||||||
m_set(args...),
|
|
||||||
m_max_queue_depth(numeric_limits<size_t>::max())
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class Task>
|
|
||||||
template <class... Args>
|
|
||||||
WorkQueue<Task>::WorkQueue(size_t max_depth, Args... args) :
|
|
||||||
m_set(args...),
|
|
||||||
m_max_queue_depth(max_depth)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif // CRUCIBLE_WORKQUEUE_H
|
|
@ -9,8 +9,6 @@
|
|||||||
#include "crucible/fs.h"
|
#include "crucible/fs.h"
|
||||||
#include "crucible/lockset.h"
|
#include "crucible/lockset.h"
|
||||||
#include "crucible/time.h"
|
#include "crucible/time.h"
|
||||||
#include "crucible/timequeue.h"
|
|
||||||
#include "crucible/workqueue.h"
|
|
||||||
|
|
||||||
#include <array>
|
#include <array>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
Loading…
x
Reference in New Issue
Block a user