1
0
mirror of https://github.com/Zygo/bees.git synced 2025-05-17 13:25:45 +02:00

task: handle thread lifecycle more strictly

Testing sometimes crashes during exec of the first Task object, which
triggers construction of TaskConsumer threads.  Manage the life cycle
of the thread more strictly--don't access any methods of TaskConsumer
or std::thread until the constructor's caller's lock on TaskMaster
is released.

Signed-off-by: Zygo Blaxell <bees@furryterror.org>
This commit is contained in:
Zygo Blaxell 2021-06-02 11:45:42 -04:00
parent 0928362aab
commit 5f763f6d41

View File

@ -164,7 +164,7 @@ namespace crucible {
};
class TaskConsumer : public enable_shared_from_this<TaskConsumer> {
weak_ptr<TaskMasterState> m_master;
shared_ptr<TaskMasterState> m_master;
TaskStatePtr m_current_task;
friend class TaskState;
@ -175,11 +175,11 @@ namespace crucible {
friend class TaskMaster;
friend class TaskMasterState;
public:
TaskConsumer(weak_ptr<TaskMasterState> tms);
TaskConsumer(const shared_ptr<TaskMasterState> &tms);
shared_ptr<TaskState> current_task();
private:
// Make sure this gets constructed _last_
thread m_thread;
shared_ptr<thread> m_thread;
};
static thread_local TaskConsumerPtr tl_current_consumer;
@ -686,25 +686,29 @@ namespace crucible {
shared_ptr<TaskState>
TaskConsumer::current_task()
{
auto master_locked = m_master.lock();
unique_lock<mutex> lock(master_locked->m_mutex);
unique_lock<mutex> lock(m_master->m_mutex);
return current_task_locked();
}
void
TaskConsumer::consumer_thread()
{
m_thread.detach();
// Keep a copy because we will be destroying *this later
auto master_copy = m_master;
auto master_locked = m_master.lock();
unique_lock<mutex> lock(master_locked->m_mutex);
// Constructor is running with master locked.
// Wait until that is done before trying to do anything.
unique_lock<mutex> lock(master_copy->m_mutex);
// Detach thread so destructor doesn't call terminate
m_thread->detach();
// It is now safe to access our own shared_ptr
TaskConsumerPtr this_consumer = shared_from_this();
swap(this_consumer, tl_current_consumer);
while (!master_locked->m_cancelled) {
if (master_locked->m_thread_max < master_locked->m_threads.size()) {
while (!master_copy->m_cancelled) {
if (master_copy->m_thread_max < master_copy->m_threads.size()) {
// We are one of too many threads, exit now
break;
}
@ -712,11 +716,11 @@ namespace crucible {
if (!m_local_queue.empty()) {
m_current_task = *m_local_queue.begin();
m_local_queue.pop_front();
} else if (!master_locked->m_queue.empty()) {
m_current_task = *master_locked->m_queue.begin();
master_locked->m_queue.pop_front();
} else if (!master_copy->m_queue.empty()) {
m_current_task = *master_copy->m_queue.begin();
master_copy->m_queue.pop_front();
} else {
master_locked->m_condvar.wait(lock);
master_copy->m_condvar.wait(lock);
continue;
}
@ -727,7 +731,7 @@ namespace crucible {
});
// Update m_current_task with lock
decltype(m_current_task) hold_task;
TaskStatePtr hold_task;
lock.lock();
swap(hold_task, m_current_task);
@ -753,14 +757,14 @@ namespace crucible {
lock.lock();
// Fun fact: shared_from_this() isn't usable until the constructor returns...
master_locked->m_threads.erase(shared_from_this());
master_locked->m_condvar.notify_all();
master_copy->m_threads.erase(shared_from_this());
master_copy->m_condvar.notify_all();
}
TaskConsumer::TaskConsumer(weak_ptr<TaskMasterState> tms) :
m_master(tms),
m_thread([=](){ consumer_thread(); })
TaskConsumer::TaskConsumer(const shared_ptr<TaskMasterState> &tms) :
m_master(tms)
{
m_thread = make_shared<thread>([=](){ consumer_thread(); });
}
class BarrierState {