diff --git a/src/openrct2/core/JobPool.cpp b/src/openrct2/core/JobPool.cpp new file mode 100644 index 0000000000..4f319ea056 --- /dev/null +++ b/src/openrct2/core/JobPool.cpp @@ -0,0 +1,103 @@ +#include "JobPool.hpp" + +JobPool::JobPool() + : _shouldStop(false), + _processing(0) +{ + for (size_t n = 0; n < std::thread::hardware_concurrency(); n++) + { + _threads.emplace_back(&JobPool::processQueue, this); + } +} + +JobPool::~JobPool() +{ + { + unique_lock lock(_mutex); + _shouldStop = true; + _condPending.notify_all(); + } + + for (auto&& th : _threads) + { + if(th.joinable()) + th.join(); + } +} + +void JobPool::addTask(std::function workFn, std::function completionFn) +{ + unique_lock lock(_mutex); + _pending.push_back(TaskData_t{workFn, completionFn}); + _condPending.notify_one(); +} + +void JobPool::addTask(std::function workFn) +{ + unique_lock lock(_mutex); + _pending.push_back(TaskData_t{ workFn, nullptr }); + _condPending.notify_one(); +} + +void JobPool::join() +{ + while (true) + { + unique_lock lock(_mutex); + _condComplete.wait(lock, [this]() + { + return (_pending.empty() && _processing == 0) || + (_completed.empty() == false); + }); + + if (_completed.empty() && + _pending.empty() && + _processing == 0) + { + break; + } + + auto taskData = _completed.front(); + _completed.pop_front(); + + lock.unlock(); + + taskData.completionFn(); + } +} + +void JobPool::processQueue() +{ + while (true) + { + unique_lock lock(_mutex); + _condPending.wait(lock, [this]() { + return _shouldStop || !_pending.empty(); + }); + if (!_pending.empty()) + { + _processing++; + + auto taskData = _pending.front(); + _pending.pop_front(); + + lock.unlock(); + + taskData.workFn(); + + lock.lock(); + + if (taskData.completionFn) + { + _completed.push_back(taskData); + } + + _processing--; + _condComplete.notify_one(); + } + if(_shouldStop) + break; + } +} + + diff --git a/src/openrct2/core/JobPool.hpp b/src/openrct2/core/JobPool.hpp new file mode 100644 index 0000000000..4e886ce373 --- /dev/null +++ b/src/openrct2/core/JobPool.hpp @@ -0,0 +1,57 @@ +#pragma region Copyright (c) 2014-2017 OpenRCT2 Developers +/***************************************************************************** + * OpenRCT2, an open source clone of Roller Coaster Tycoon 2. + * + * OpenRCT2 is the work of many authors, a full list can be found in contributors.md + * For more information, visit https://github.com/OpenRCT2/OpenRCT2 + * + * OpenRCT2 is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * A full copy of the GNU General Public License can be found in licence.txt + *****************************************************************************/ +#pragma endregion + +#include +#include +#include +#include +#include +#include + +class JobPool +{ +private: + struct TaskData_t + { + const std::function workFn; + const std::function completionFn; + }; + + std::vector _threads; + std::deque _pending; + std::deque _completed; + std::condition_variable _condPending; + std::condition_variable _condComplete; + std::atomic _processing; + std::atomic_bool _shouldStop; + std::mutex _mutex; + + typedef std::unique_lock unique_lock; + +public: + JobPool(); + ~JobPool(); + + void addTask(std::function workFn, + std::function completionFn); + + void addTask(std::function workFn); + + void join(); + +private: + void processQueue(); +};