diff --git a/source/util/util-threadpool.cpp b/source/util/util-threadpool.cpp index 8ddcb07..72cb535 100644 --- a/source/util/util-threadpool.cpp +++ b/source/util/util-threadpool.cpp @@ -65,6 +65,7 @@ std::shared_ptr<::streamfx::util::threadpool::task> streamfx::util::threadpool:: { auto task = std::make_shared(fn, data); + // Append the task to the queue. std::unique_lock lock(_tasks_lock); _tasks.emplace_back(task); _tasks_cv.notify_one(); @@ -75,7 +76,11 @@ std::shared_ptr<::streamfx::util::threadpool::task> streamfx::util::threadpool:: void streamfx::util::threadpool::pop(std::shared_ptr<::streamfx::util::threadpool::task> work) { if (work) { - work->_is_dead.store(true); + { + std::unique_lock lock(work->_mutex); + work->_is_dead = true; + } + work->_is_complete.notify_all(); } } @@ -107,7 +112,7 @@ void streamfx::util::threadpool::work() } // If the task was killed, skip everything again. - if (local_work->_is_dead) { + if (local_work->_is_dead.load()) { continue; } @@ -126,6 +131,11 @@ void streamfx::util::threadpool::work() local_number, reinterpret_cast(local_work->_callback.target()), reinterpret_cast(local_work->_data.get())); } + { + std::unique_lock lock(local_work->_mutex); + local_work->_is_dead.store(true); + } + local_work->_is_complete.notify_all(); } // Remove our reference to the work unit. @@ -138,5 +148,13 @@ void streamfx::util::threadpool::work() streamfx::util::threadpool::task::task() {} streamfx::util::threadpool::task::task(threadpool_callback_t fn, threadpool_data_t dt) - : _is_dead(false), _callback(fn), _data(dt) + : _mutex(), _is_complete(), _is_dead(false), _callback(fn), _data(dt) {} + +void streamfx::util::threadpool::task::await_completion() +{ + if (!_is_dead) { + std::unique_lock lock(_mutex); + _is_complete.wait(lock, [this]() { return this->_is_dead.load(); }); + } +} diff --git a/source/util/util-threadpool.hpp b/source/util/util-threadpool.hpp index 1f314e6..6b824f4 100644 --- a/source/util/util-threadpool.hpp +++ b/source/util/util-threadpool.hpp @@ -35,20 +35,24 @@ namespace streamfx::util { public: class task { protected: - std::atomic_bool _is_dead; - threadpool_callback_t _callback; - threadpool_data_t _data; + std::mutex _mutex; + std::condition_variable _is_complete; + std::atomic _is_dead; + threadpool_callback_t _callback; + threadpool_data_t _data; public: task(); task(threadpool_callback_t callback_function, threadpool_data_t data); + void await_completion(); + friend class streamfx::util::threadpool; }; private: std::list _workers; - std::atomic_bool _worker_stop; + std::atomic _worker_stop; std::atomic _worker_idx; std::list> _tasks; std::mutex _tasks_lock;