From fddf21b8dee309c5b02ebeb02537256f71d5d858 Mon Sep 17 00:00:00 2001 From: Michael Fabian 'Xaymar' Dirks Date: Sun, 18 Sep 2022 18:30:15 +0200 Subject: [PATCH] util/threadpool: Optimize for dynamic thread pooling While the previous approach of a static thread pool worked, it was sub-optimal in its resource usage. Many of the threads would never see a single task, and simply permanently sleep. This seems like a good idea, except that sleeping threads still end up in the scheduler, and thus waste a tiny amount of resources. It is better to instead dynamically spawn threads when needed and only keeping the bare minimum around all the time. These dynamically spawned threads are also explicitly set to background priority which further reduces scheduling overhead. Finally optimizing the memory layout to prevent unwanted false sharing should also keep sporadic wake ups at a minimum. This new model should be able to handle many more tasks than ever before, but is still not as optimal as it could be. --- source/filters/filter-autoframing.cpp | 2 +- source/filters/filter-autoframing.hpp | 2 +- source/filters/filter-denoising.cpp | 2 +- source/filters/filter-denoising.hpp | 2 +- source/filters/filter-upscaling.cpp | 2 +- source/filters/filter-upscaling.hpp | 2 +- source/filters/filter-virtual-greenscreen.cpp | 2 +- source/filters/filter-virtual-greenscreen.hpp | 2 +- source/plugin.cpp | 12 +- source/plugin.hpp | 2 +- source/updater.cpp | 2 +- source/updater.hpp | 2 +- source/util/util-threadpool.cpp | 362 +++++++++++------- source/util/util-threadpool.hpp | 173 ++++++--- 14 files changed, 368 insertions(+), 201 deletions(-) diff --git a/source/filters/filter-autoframing.cpp b/source/filters/filter-autoframing.cpp index c9151ac..ab54daf 100644 --- a/source/filters/filter-autoframing.cpp +++ b/source/filters/filter-autoframing.cpp @@ -897,7 +897,7 @@ void streamfx::filter::autoframing::autoframing_instance::switch_provider(tracki std::bind(&autoframing_instance::task_switch_provider, this, std::placeholders::_1), spd); } -void streamfx::filter::autoframing::autoframing_instance::task_switch_provider(util::threadpool_data_t data) +void streamfx::filter::autoframing::autoframing_instance::task_switch_provider(util::threadpool::task_data_t data) { std::shared_ptr spd = std::static_pointer_cast(data); diff --git a/source/filters/filter-autoframing.hpp b/source/filters/filter-autoframing.hpp index 29ee24f..a4aa508 100644 --- a/source/filters/filter-autoframing.hpp +++ b/source/filters/filter-autoframing.hpp @@ -148,7 +148,7 @@ namespace streamfx::filter::autoframing { void tracking_tick(float seconds); void switch_provider(tracking_provider provider); - void task_switch_provider(util::threadpool_data_t data); + void task_switch_provider(util::threadpool::task_data_t data); #ifdef ENABLE_FILTER_AUTOFRAMING_NVIDIA void nvar_facedetection_load(); diff --git a/source/filters/filter-denoising.cpp b/source/filters/filter-denoising.cpp index 7b38cce..19b64f5 100644 --- a/source/filters/filter-denoising.cpp +++ b/source/filters/filter-denoising.cpp @@ -409,7 +409,7 @@ void streamfx::filter::denoising::denoising_instance::switch_provider(denoising_ std::bind(&denoising_instance::task_switch_provider, this, std::placeholders::_1), spd); } -void streamfx::filter::denoising::denoising_instance::task_switch_provider(util::threadpool_data_t data) +void streamfx::filter::denoising::denoising_instance::task_switch_provider(util::threadpool::task_data_t data) { std::shared_ptr spd = std::static_pointer_cast(data); diff --git a/source/filters/filter-denoising.hpp b/source/filters/filter-denoising.hpp index a524e6a..72f67bf 100644 --- a/source/filters/filter-denoising.hpp +++ b/source/filters/filter-denoising.hpp @@ -85,7 +85,7 @@ namespace streamfx::filter::denoising { private: void switch_provider(denoising_provider provider); - void task_switch_provider(util::threadpool_data_t data); + void task_switch_provider(util::threadpool::task_data_t data); #ifdef ENABLE_FILTER_DENOISING_NVIDIA void nvvfx_denoising_load(); diff --git a/source/filters/filter-upscaling.cpp b/source/filters/filter-upscaling.cpp index cc58f56..ab69ed4 100644 --- a/source/filters/filter-upscaling.cpp +++ b/source/filters/filter-upscaling.cpp @@ -394,7 +394,7 @@ void streamfx::filter::upscaling::upscaling_instance::switch_provider(upscaling_ std::bind(&upscaling_instance::task_switch_provider, this, std::placeholders::_1), spd); } -void streamfx::filter::upscaling::upscaling_instance::task_switch_provider(util::threadpool_data_t data) +void streamfx::filter::upscaling::upscaling_instance::task_switch_provider(util::threadpool::task_data_t data) { std::shared_ptr spd = std::static_pointer_cast(data); diff --git a/source/filters/filter-upscaling.hpp b/source/filters/filter-upscaling.hpp index 35932bd..2491f6e 100644 --- a/source/filters/filter-upscaling.hpp +++ b/source/filters/filter-upscaling.hpp @@ -86,7 +86,7 @@ namespace streamfx::filter::upscaling { private: void switch_provider(upscaling_provider provider); - void task_switch_provider(util::threadpool_data_t data); + void task_switch_provider(util::threadpool::task_data_t data); #ifdef ENABLE_FILTER_UPSCALING_NVIDIA void nvvfxsr_load(); diff --git a/source/filters/filter-virtual-greenscreen.cpp b/source/filters/filter-virtual-greenscreen.cpp index 62fb96f..7d664de 100644 --- a/source/filters/filter-virtual-greenscreen.cpp +++ b/source/filters/filter-virtual-greenscreen.cpp @@ -406,7 +406,7 @@ void streamfx::filter::virtual_greenscreen::virtual_greenscreen_instance::switch } void streamfx::filter::virtual_greenscreen::virtual_greenscreen_instance::task_switch_provider( - util::threadpool_data_t data) + util::threadpool::task_data_t data) { std::shared_ptr spd = std::static_pointer_cast(data); diff --git a/source/filters/filter-virtual-greenscreen.hpp b/source/filters/filter-virtual-greenscreen.hpp index c170add..267e1b2 100644 --- a/source/filters/filter-virtual-greenscreen.hpp +++ b/source/filters/filter-virtual-greenscreen.hpp @@ -86,7 +86,7 @@ namespace streamfx::filter::virtual_greenscreen { private: void switch_provider(virtual_greenscreen_provider provider); - void task_switch_provider(util::threadpool_data_t data); + void task_switch_provider(util::threadpool::task_data_t data); #ifdef ENABLE_FILTER_VIRTUAL_GREENSCREEN_NVIDIA void nvvfxgs_load(); diff --git a/source/plugin.cpp b/source/plugin.cpp index 25dff0c..2bcf078 100644 --- a/source/plugin.cpp +++ b/source/plugin.cpp @@ -94,10 +94,10 @@ #include #include "warning-enable.hpp" -static std::shared_ptr _threadpool; -static std::shared_ptr _gs_fstri_vb; -static std::shared_ptr _streamfx_gfx_opengl; -static std::shared_ptr _source_tracker; +static std::shared_ptr _threadpool; +static std::shared_ptr _gs_fstri_vb; +static std::shared_ptr _streamfx_gfx_opengl; +static std::shared_ptr _source_tracker; MODULE_EXPORT bool obs_module_load(void) { @@ -108,7 +108,7 @@ MODULE_EXPORT bool obs_module_load(void) streamfx::configuration::initialize(); // Initialize global Thread Pool. - _threadpool = std::make_shared(); + _threadpool = std::make_shared(); // Initialize Source Tracker _source_tracker = streamfx::obs::source_tracker::get(); @@ -338,7 +338,7 @@ MODULE_EXPORT void obs_module_unload(void) } } -std::shared_ptr streamfx::threadpool() +std::shared_ptr streamfx::threadpool() { return _threadpool; } diff --git a/source/plugin.hpp b/source/plugin.hpp index 3475bdb..c2f8a7a 100644 --- a/source/plugin.hpp +++ b/source/plugin.hpp @@ -22,7 +22,7 @@ namespace streamfx { // Threadpool - std::shared_ptr threadpool(); + std::shared_ptr threadpool(); void gs_draw_fullscreen_tri(); diff --git a/source/updater.cpp b/source/updater.cpp index 98b5924..1144bbc 100644 --- a/source/updater.cpp +++ b/source/updater.cpp @@ -228,7 +228,7 @@ streamfx::version_info::operator std::string() } } -void streamfx::updater::task(streamfx::util::threadpool_data_t) +void streamfx::updater::task(streamfx::util::threadpool::task_data_t) { try { auto query_fn = [](std::vector& buffer) { diff --git a/source/updater.hpp b/source/updater.hpp index 7b8a8fd..7e53278 100644 --- a/source/updater.hpp +++ b/source/updater.hpp @@ -82,7 +82,7 @@ namespace streamfx { bool _dirty; private: - void task(streamfx::util::threadpool_data_t); + void task(streamfx::util::threadpool::task_data_t); bool can_check(); diff --git a/source/util/util-threadpool.cpp b/source/util/util-threadpool.cpp index 5e719a4..cb67add 100644 --- a/source/util/util-threadpool.cpp +++ b/source/util/util-threadpool.cpp @@ -1,21 +1,18 @@ -/* - * Modern effects for a modern Streamer - * Copyright (C) 2020 Michael Fabian Dirks - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA -*/ +// Copyright (C) 2020-2022 Michael Fabian Dirks +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA #include "util-threadpool.hpp" #include "common.hpp" @@ -25,6 +22,14 @@ #include #include "warning-enable.hpp" +#include "warning-disable.hpp" +#if defined(D_PLATFORM_WINDOWS) +#include +#elif defined(D_PLATFORM_LINUX) +#include +#endif +#include "warning-enable.hpp" + #ifdef _DEBUG #define ST_PREFIX "<%s> " #define D_LOG_ERROR(x, ...) P_LOG_ERROR(ST_PREFIX##x, __FUNCTION_SIG__, __VA_ARGS__) @@ -39,125 +44,214 @@ #define D_LOG_DEBUG(...) P_LOG_DEBUG(ST_PREFIX __VA_ARGS__) #endif -// Most Tasks likely wait for IO, so we can use that time for other tasks. -#define ST_CONCURRENCY_MULTIPLIER 2 - -streamfx::util::threadpool::threadpool() - : _workers(), _worker_stop(false), _worker_idx(0), _tasks(), _tasks_lock(), _tasks_cv() -{ - std::size_t concurrency = static_cast(std::thread::hardware_concurrency() * ST_CONCURRENCY_MULTIPLIER); - for (std::size_t n = 0; n < concurrency; n++) { - _workers.emplace_back(std::bind(&streamfx::util::threadpool::work, this)); - } -} - -streamfx::util::threadpool::~threadpool() -{ - _worker_stop = true; - _tasks_cv.notify_all(); - for (auto& thread : _workers) { - _tasks_cv.notify_all(); - if (thread.joinable()) { - thread.join(); - } - } -} - -std::shared_ptr<::streamfx::util::threadpool::task> streamfx::util::threadpool::push(threadpool_callback_t fn, - threadpool_data_t data) -{ - auto task = std::make_shared(fn, data); - - // Append the task to the queue. - std::unique_lock lock(_tasks_lock); - _tasks.emplace_back(task); - _tasks_cv.notify_one(); - - return task; -} - -void streamfx::util::threadpool::pop(std::shared_ptr<::streamfx::util::threadpool::task> work) -{ - if (work) { - { - std::unique_lock lock(work->_mutex); - work->_is_dead = true; - } - work->_is_complete.notify_all(); - } -} - -void streamfx::util::threadpool::work() -{ - std::shared_ptr local_work{}; - uint32_t local_number = _worker_idx.fetch_add(1); - - while (!_worker_stop) { - // Wait for more work, or immediately continue if there is still work to do. - { - // Lock the tasks mutex to check for work. - std::unique_lock lock(_tasks_lock); - - // If there are currently no tasks queued, wait on the condition variable. - // This temporarily unlocks the mutex until it is woken up. - if (_tasks.size() == 0) { - _tasks_cv.wait(lock, [this]() { return _worker_stop || _tasks.size() > 0; }); - } - - // If there is either no tasks or we were asked to stop, skip everything. - if (_worker_stop || (_tasks.size() == 0)) { - continue; - } - - // Grab the latest task and immediately remove it from the queue. - local_work = _tasks.front(); - _tasks.pop_front(); - } - - // If the task was killed, skip everything again. - if (local_work->_is_dead.load()) { - continue; - } - - // Try to execute work, but don't crash on catchable exceptions. - if (local_work->_callback) { - try { - local_work->_callback(local_work->_data); - } catch (std::exception const& ex) { - D_LOG_WARNING("Worker %" PRIx32 " caught exception from task (%" PRIxPTR ", %" PRIxPTR - ") with message: %s", - local_number, reinterpret_cast(local_work->_callback.target()), - reinterpret_cast(local_work->_data.get()), ex.what()); - } catch (...) { - D_LOG_WARNING("Worker %" PRIx32 " caught exception of unknown type from task (%" PRIxPTR ", %" PRIxPTR - ").", - local_number, reinterpret_cast(local_work->_callback.target()), - reinterpret_cast(local_work->_data.get())); - } - { - std::unique_lock lock(local_work->_mutex); - local_work->_is_dead.store(true); - } - local_work->_is_complete.notify_all(); - } - - // Remove our reference to the work unit. - local_work.reset(); - } - - _worker_idx.fetch_sub(1); -} - -streamfx::util::threadpool::task::task() = default; - -streamfx::util::threadpool::task::task(threadpool_callback_t fn, threadpool_data_t dt) - : _mutex(), _is_complete(), _is_dead(false), _callback(fn), _data(dt) +streamfx::util::threadpool::task::task(task_callback_t callback, task_data_t data) + : _callback(callback), _data(data), _lock(), _status_changed(), _cancelled(false), _completed(false), _failed(false) {} +streamfx::util::threadpool::task::~task() {} + +void streamfx::util::threadpool::task::run() +{ + std::lock_guard lg(_lock); + if (!_cancelled) { + try { + _callback(_data); + } catch (const std::exception& ex) { + D_LOG_ERROR("Unhandled exception in Task: %s.", ex.what()); + _failed = false; + } catch (...) { + D_LOG_ERROR("Unhandled exception in Task.", nullptr); + _failed = true; + } + } + _completed = true; + _status_changed.notify_all(); +} + +void streamfx::util::threadpool::task::cancel() +{ + std::lock_guard lg(_lock); + _cancelled = true; + _completed = true; + _status_changed.notify_all(); +} + +bool streamfx::util::threadpool::task::is_cancelled() +{ + return _cancelled; +} + +bool streamfx::util::threadpool::task::is_completed() +{ + return _completed; +} + +bool streamfx::util::threadpool::task::has_failed() +{ + return _failed; +} + +void streamfx::util::threadpool::task::wait() +{ + std::unique_lock ul(_lock); + if (!_cancelled && !_completed && !_failed) { + _status_changed.wait(ul, + [this]() { return this->is_completed() || this->is_cancelled() || this->has_failed(); }); + } +} + void streamfx::util::threadpool::task::await_completion() { - if (!_is_dead) { - std::unique_lock lock(_mutex); - _is_complete.wait(lock, [this]() { return this->_is_dead.load(); }); + wait(); +} + +streamfx::util::threadpool::threadpool::~threadpool() +{ + { // Terminate all remaining tasks. + std::lock_guard lg(_tasks_lock); + for (auto task : _tasks) { + task->cancel(); + } + _tasks.clear(); + } + + { // Notify workers to stop working. + { + std::lock_guard lg(_workers_lock); + for (auto worker : _workers) { + worker->stop = true; + } + } + { + std::lock_guard lg(_tasks_lock); + _tasks_cv.notify_all(); + } + for (auto worker : _workers) { + std::lock_guard lg(worker->lifeline); + } + } +} + +streamfx::util::threadpool::threadpool::threadpool(size_t minimum, size_t maximum) + : _limits{minimum, maximum}, _workers_lock(), _workers(), _tasks_lock(), _tasks_cv(), _tasks() +{ + // Spawn the minimum number of threads. + spawn(_limits.first); +} + +std::shared_ptr + streamfx::util::threadpool::threadpool::push(task_callback_t callback, task_data_t data /*= nullptr*/) +{ + std::lock_guard lg(_tasks_lock); + constexpr size_t threshold = 3; + + // Enqueue the new task. + auto task = std::make_shared(callback, data); + _tasks.emplace_back(task); + + // Spawn additional workers if the number of queued tasks exceeds a threshold. + if (_tasks.size() > (threshold * _worker_count)) { + spawn(_tasks.size() / threshold); + } + + // Return handle to caller. + return task; +} + +void streamfx::util::threadpool::threadpool::pop(std::shared_ptr task) +{ + if (task) { + task->cancel(); + } + std::lock_guard lg(_tasks_lock); + _tasks.remove(task); +} + +void streamfx::util::threadpool::threadpool::spawn(size_t count) +{ + std::lock_guard lg(_workers_lock); + for (size_t n = 0; (n < count) && (_worker_count < _limits.second); n++) { + auto wi = std::make_shared(); + wi->stop = false; + wi->last_work_time = std::chrono::high_resolution_clock::now(); + wi->thread = std::thread(std::bind(&streamfx::util::threadpool::threadpool::work, this, wi)); + wi->thread.detach(); + _workers.emplace_back(wi); + ++_worker_count; + D_LOG_DEBUG("Spawning new worker thread (%zu < %zu < %zu).", _limits.first, _worker_count.load(), + _limits.second); + } +} + +bool streamfx::util::threadpool::threadpool::die(std::shared_ptr wi) +{ + constexpr std::chrono::seconds delay{1}; + + std::lock_guard lg(_workers_lock); + bool result = false; + + if (_worker_count > _limits.first) { + auto now = std::chrono::high_resolution_clock::now(); + result = ((wi->last_work_time + delay) <= now) && ((_last_worker_death + delay) <= now); + + if (result) { + _last_worker_death = now; + --_worker_count; + _workers.remove(wi); + D_LOG_DEBUG("Terminated idle worker thread (%zu < %zu < %zu).", _limits.first, _worker_count.load(), + _limits.second); + } + } + + return result; +} + +void streamfx::util::threadpool::threadpool::work(std::shared_ptr wi) +{ + std::shared_ptr task{}; + std::lock_guard lg(wi->lifeline); + +#if defined(D_PLATFORM_WINDOWS) + SetThreadPriority(GetCurrentThread(), THREAD_MODE_BACKGROUND_BEGIN | THREAD_PRIORITY_BELOW_NORMAL); + SetThreadDescription(GetCurrentThread(), L"StreamFX Worker Thread"); +#elif defined(D_PLATFORM_LINUX) + struct sched_param param; + param.sched_priority = 0; + pthread_setschedparam(pthread_self(), SCHED_IDLE, ¶m); + pthread_setname_np(pthread_self(), "StreamFX Worker Thread"); +#endif + + while (!wi->stop) { + { // Try and acquire new work. + std::unique_lock ul(_tasks_lock); + + // Is there any work available right now? + if (_tasks.size() == 0) { // If not: + // Block this thread until it is notified of a change. + _tasks_cv.wait_until( + ul, + std::chrono::time_point(std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(250)), + [this, wi]() { return wi->stop || _tasks.size() > 0; }); + } + + // If we were asked to stop, skip everything. + if (wi->stop) { + continue; + } + + // If there is work to be done, take it. + if (_tasks.size() > 0) { + wi->last_work_time = std::chrono::high_resolution_clock::now(); + task = _tasks.front(); + _tasks.pop_front(); + } else if (die(wi)) { // Is the threadpool requesting less threads? + break; + } + } + + if (task) { + task->run(); + task.reset(); + } } } diff --git a/source/util/util-threadpool.hpp b/source/util/util-threadpool.hpp index a003a17..89152ef 100644 --- a/source/util/util-threadpool.hpp +++ b/source/util/util-threadpool.hpp @@ -1,75 +1,148 @@ -/* - * Modern effects for a modern Streamer - * Copyright (C) 2020 Michael Fabian Dirks - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA - */ +// Copyright (C) 2020-2022 Michael Fabian Dirks +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA #pragma once #include "warning-disable.hpp" #include +#include +#include #include +#include #include #include #include #include +#include #include #include #include "warning-enable.hpp" -namespace streamfx::util { - typedef std::shared_ptr threadpool_data_t; - typedef std::function threadpool_callback_t; +namespace streamfx::util::threadpool { + typedef std::shared_ptr task_data_t; + typedef std::function task_callback_t; + + struct worker_info { +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::atomic stop; + +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::mutex lifeline; + + std::chrono::high_resolution_clock::time_point last_work_time; + + std::thread thread; + }; + + class task { + task_callback_t _callback; + task_data_t _data; + std::mutex _lock; + +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::condition_variable _status_changed; +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::atomic _cancelled; +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::atomic _completed; +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::atomic _failed; + + public: + task(task_callback_t callback, task_data_t data); + + public: + ~task(); + + public: + void run(); + + public: + void cancel(); + + public: + bool is_cancelled(); + + public: + bool is_completed(); + + public: + bool has_failed(); + + public: + void wait(); + + public: + void await_completion(); + }; class threadpool { - public: - class task { - protected: - std::mutex _mutex; - std::condition_variable _is_complete; - std::atomic _is_dead; - threadpool_callback_t _callback; - threadpool_data_t _data; + std::pair _limits; - public: - task(); - task(threadpool_callback_t callback_function, threadpool_data_t data); +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::mutex _workers_lock; + std::list> _workers; +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::atomic _worker_count; + std::chrono::high_resolution_clock::time_point _last_worker_death; - void await_completion(); - - friend class streamfx::util::threadpool; - }; - - private: - std::list _workers; - std::atomic _worker_stop; - std::atomic _worker_idx; - std::list> _tasks; - std::mutex _tasks_lock; - std::condition_variable _tasks_cv; +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::mutex _tasks_lock; +#if __cpp_lib_hardware_interference_size >= 201603 + alignas(std::hardware_destructive_interference_size) +#endif + std::condition_variable _tasks_cv; + std::list> _tasks; public: - threadpool(); ~threadpool(); - std::shared_ptr<::streamfx::util::threadpool::task> push(threadpool_callback_t callback_function, - threadpool_data_t data); + public: + threadpool(size_t minimum = 2, size_t maximum = std::thread::hardware_concurrency()); - void pop(std::shared_ptr<::streamfx::util::threadpool::task> work); + public: + std::shared_ptr push(task_callback_t callback, task_data_t data = nullptr); + + public: + void pop(std::shared_ptr task); private: - void work(); + void spawn(size_t count = 1); + + private: + bool die(std::shared_ptr); + + private: + void work(std::shared_ptr); }; -} // namespace streamfx::util +} // namespace streamfx::util::threadpool