diff --git a/include/crucible/task.h b/include/crucible/task.h index 14b07b0..ef12803 100644 --- a/include/crucible/task.h +++ b/include/crucible/task.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace crucible { using namespace std; @@ -22,7 +23,7 @@ namespace crucible { public: // create Task object containing closure and description - Task(function exec_fn, function print_fn); + Task(string title, function exec_fn); // schedule Task at end of queue. // May run Task in current thread or in other thread. @@ -33,7 +34,7 @@ namespace crucible { void run_earlier() const; // describe Task as text - ostream &print(ostream &os) const; + string title() const; // Returns currently executing task if called from exec_fn. // Usually used to reschedule the currently executing Task. diff --git a/lib/task.cc b/lib/task.cc index 61d1f75..4a4a4a9 100644 --- a/lib/task.cc +++ b/lib/task.cc @@ -19,15 +19,15 @@ namespace crucible { class TaskState : public enable_shared_from_this { const function m_exec_fn; - const function m_print_fn; + const string m_title; TaskId m_id; static atomic s_next_id; public: - TaskState(function exec_fn, function print_fn); + TaskState(string title, function exec_fn); void exec(); - ostream &print(ostream &os); + string title() const; TaskId id() const; }; @@ -73,19 +73,19 @@ namespace crucible { static shared_ptr s_tms = make_shared(); - TaskState::TaskState(function exec_fn, - function print_fn) : + TaskState::TaskState(string title, function exec_fn) : m_exec_fn(exec_fn), - m_print_fn(print_fn), + m_title(title), m_id(++s_next_id) { + THROW_CHECK0(invalid_argument, !m_title.empty()); } void TaskState::exec() { THROW_CHECK0(invalid_argument, m_exec_fn); - THROW_CHECK0(invalid_argument, m_print_fn); + THROW_CHECK0(invalid_argument, !m_title.empty()); char buf[24]; memset(buf, '\0', sizeof(buf)); @@ -93,10 +93,7 @@ namespace crucible { Cleanup pthread_name_cleaner([&]() { pthread_setname_np(pthread_self(), buf); }); - ostringstream oss; - m_print_fn(oss); - auto thread_name = oss.str(); - DIE_IF_MINUS_ERRNO(pthread_setname_np(pthread_self(), thread_name.c_str())); + DIE_IF_MINUS_ERRNO(pthread_setname_np(pthread_self(), m_title.c_str())); weak_ptr this_task_wp = shared_from_this(); Cleanup current_task_cleaner([&]() { @@ -107,11 +104,11 @@ namespace crucible { m_exec_fn(); } - ostream & - TaskState::print(ostream &os) + string + TaskState::title() const { - THROW_CHECK0(invalid_argument, m_print_fn); - return m_print_fn(os); + THROW_CHECK0(runtime_error, !m_title.empty()); + return m_title; } TaskId @@ -176,9 +173,7 @@ namespace crucible { os << "Queue (size " << s_tms->m_queue.size() << "):" << endl; size_t counter = 0; for (auto i : s_tms->m_queue) { - os << "Queue #" << ++counter << " Task ID " << i->id() << " "; - i->print(os); - os << endl; + os << "Queue #" << ++counter << " Task ID " << i->id() << " " << i->title() << endl; } return os << "Queue End" << endl; } @@ -193,8 +188,7 @@ namespace crucible { os << "Worker #" << ++counter << " "; auto task = i->current_task_locked(); if (task) { - os << "Task ID " << task->id() << " "; - task->print(os); + os << "Task ID " << task->id() << " " << task->title(); } else { os << "(idle)"; } @@ -238,8 +232,8 @@ namespace crucible { { } - Task::Task(function exec_fn, function print_fn) : - m_task_state(make_shared(exec_fn, print_fn)) + Task::Task(string title, function exec_fn) : + m_task_state(make_shared(title, exec_fn)) { } @@ -263,17 +257,17 @@ namespace crucible { return Task(tl_current_task_wp.lock()); } - ostream & - Task::print(ostream &os) const + string + Task::title() const { THROW_CHECK0(runtime_error, m_task_state); - return m_task_state->print(os); + return m_task_state->title(); } ostream & operator<<(ostream &os, const Task &task) { - return task.print(os); + return os << task.title(); }; TaskId diff --git a/src/bees-roots.cc b/src/bees-roots.cc index 6b69faa..06ae695 100644 --- a/src/bees-roots.cc +++ b/src/bees-roots.cc @@ -251,12 +251,12 @@ BeesRoots::crawl_roots() size_t batch_count = 0; while (first_range && batch_count < BEES_MAX_CRAWL_BATCH) { auto subvol = first_crawl->get_state().m_root; - Task([ctx_copy, first_range]() { + ostringstream oss; + oss << "crawl_" << subvol; + auto task_title = oss.str(); + Task(task_title, [ctx_copy, first_range]() { BEESNOTE("scan_forward " << first_range); ctx_copy->scan_forward(first_range); - }, - [first_range, subvol](ostream &os) -> ostream & { - return os << "crawl_" << subvol; }).run(); BEESCOUNT(crawl_scan); m_crawl_current = first_crawl->get_state(); @@ -281,12 +281,12 @@ BeesRoots::crawl_roots() size_t batch_count = 0; while (this_range && batch_count < BEES_MAX_CRAWL_BATCH) { auto subvol = this_crawl->get_state().m_root; - Task([ctx_copy, this_range]() { + ostringstream oss; + oss << "crawl_" << subvol; + auto task_title = oss.str(); + Task(task_title, [ctx_copy, this_range]() { BEESNOTE("scan_forward " << this_range); ctx_copy->scan_forward(this_range); - }, - [this_range, subvol](ostream &os) -> ostream & { - return os << "crawl_" << subvol; }).run(); crawled = true; BEESCOUNT(crawl_scan); @@ -322,7 +322,7 @@ BeesRoots::crawl_thread() // shared_from_this() in a constructor. BEESNOTE("crawling"); auto shared_this = shared_from_this(); - Task([shared_this]() { + Task("crawl", [shared_this]() { auto tqs = TaskMaster::get_queue_count(); BEESNOTE("queueing extents to scan, " << tqs << " of " << BEES_MAX_QUEUE_SIZE); while (tqs < BEES_MAX_QUEUE_SIZE) { @@ -332,7 +332,7 @@ BeesRoots::crawl_thread() tqs = TaskMaster::get_queue_count(); } Task::current_task().run(); - }, [](ostream &os) -> ostream& { return os << "crawl"; }).run(); + }).run(); } void diff --git a/src/bees.cc b/src/bees.cc index c1762bf..753b16e 100644 --- a/src/bees.cc +++ b/src/bees.cc @@ -151,9 +151,7 @@ BeesNote::get_name() // about it being destroyed under us. auto current_task = Task::current_task(); if (current_task) { - ostringstream oss; - oss << current_task; - return oss.str(); + return current_task.title(); } // OK try the pthread name next. diff --git a/test/task.cc b/test/task.cc index 37e8d16..6526c03 100644 --- a/test/task.cc +++ b/test/task.cc @@ -28,12 +28,17 @@ test_tasks(size_t count) // Run several tasks in parallel for (size_t c = 0; c < count; ++c) { - Task t([c, &task_done, &mtx, &cv]() { - unique_lock lock(mtx); - // cerr << "Task #" << c << endl; - task_done.at(c) = true; - cv.notify_one(); - }, [=](ostream& os) -> ostream& { return os << "task #" << c; }); + ostringstream oss; + oss << "task #" << c; + Task t( + oss.str(), + [c, &task_done, &mtx, &cv]() { + unique_lock lock(mtx); + // cerr << "Task #" << c << endl; + task_done.at(c) = true; + cv.notify_one(); + } + ); t.run(); } @@ -58,20 +63,6 @@ test_tasks(size_t count) } } -#define TASK_MACRO(printer, body) Task( \ - [=]() { body; }, \ - [=](ostream &__os) -> ostream& { return __os << printer; } \ -) - -void -test_macro() -{ - TASK_MACRO( - "A Task", - cerr << "Hello, World!" << endl; - ).run(); -} - void test_finish() { @@ -104,12 +95,17 @@ test_barrier(size_t count) // Run several tasks in parallel for (size_t c = 0; c < count; ++c) { auto bl = b->lock(); - Task t([c, &task_done, &mtx, &cv, bl]() mutable { - // cerr << "Task #" << c << endl; - unique_lock lock(mtx); - task_done.at(c) = true; - bl.release(); - }, [=](ostream& os) -> ostream& { return os << "task #" << c; }); + ostringstream oss; + oss << "task #" << c; + Task t( + oss.str(), + [c, &task_done, &mtx, &cv, bl]() mutable { + // cerr << "Task #" << c << endl; + unique_lock lock(mtx); + task_done.at(c) = true; + bl.release(); + } + ); t.run(); } @@ -121,14 +117,12 @@ test_barrier(size_t count) bool done_flag = false; Task completed( + "Waiting for Barrier", [&mtx, &cv, &done_flag]() { unique_lock lock(mtx); // cerr << "Running cv notify" << endl; done_flag = true; cv.notify_all(); - }, - [](ostream &os) -> ostream& { - return os << "Waiting for Barrier"; } ); b->insert_task(completed); @@ -168,33 +162,36 @@ test_exclusion(size_t count) // Run several tasks in parallel for (size_t c = 0; c < count; ++c) { auto bl = b->lock(); - Task t([c, &only_one, &mtx, &excl, bl]() mutable { - // cerr << "Task #" << c << endl; - auto lock = excl.try_lock(); - if (!lock) { - excl.insert_task(Task::current_task()); - return; + ostringstream oss; + oss << "task #" << c; + Task t( + oss.str(), + [c, &only_one, &mtx, &excl, bl]() mutable { + // cerr << "Task #" << c << endl; + auto lock = excl.try_lock(); + if (!lock) { + excl.insert_task(Task::current_task()); + return; + } + bool locked = only_one.try_lock(); + assert(locked); + nanosleep(0.0001); + only_one.unlock(); + bl.release(); } - bool locked = only_one.try_lock(); - assert(locked); - nanosleep(0.0001); - only_one.unlock(); - bl.release(); - }, [=](ostream& os) -> ostream& { return os << "task #" << c; }); + ); t.run(); } bool done_flag = false; Task completed( + "Waiting for Barrier", [&mtx, &cv, &done_flag]() { unique_lock lock(mtx); // cerr << "Running cv notify" << endl; done_flag = true; cv.notify_all(); - }, - [](ostream &os) -> ostream& { - return os << "Waiting for Barrier"; } ); b->insert_task(completed); @@ -217,7 +214,6 @@ main(int, char**) alarm(9); RUN_A_TEST(test_tasks(256)); - RUN_A_TEST(test_macro()); RUN_A_TEST(test_finish()); RUN_A_TEST(test_unfinish()); RUN_A_TEST(test_barrier(256));