diff options
Diffstat (limited to 'src/task_runner_thread.cc')
| -rw-r--r-- | src/task_runner_thread.cc | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/src/task_runner_thread.cc b/src/task_runner_thread.cc new file mode 100644 index 0000000..6e1a06f --- /dev/null +++ b/src/task_runner_thread.cc @@ -0,0 +1,73 @@ +#include "common.hh" + +#include <algorithm> +#include <condition_variable> +#include <deque> +#include <mutex> +#include <thread> + +#include "task_runner.hh" + +namespace { + +class TaskRunnerThread : public TaskRunner { +public: + explicit TaskRunnerThread(size_t threads) + : threads_(std::max<size_t>(1, threads)) { + thread_ = std::make_unique<std::thread[]>(threads_); + for (size_t i = 0; i < threads_; ++i) + thread_[i] = std::thread(&TaskRunnerThread::thread, this); + } + + ~TaskRunnerThread() override { + { + std::lock_guard<std::mutex> lock(mutex_); + quit_ = true; + } + cond_.notify_all(); + for (size_t i = 0; i < threads_; ++i) + thread_[i].join(); + } + + void post(std::function<void()> callback) override { + { + std::lock_guard<std::mutex> lock(mutex_); + queue_.push_back(std::move(callback)); + } + cond_.notify_one(); + } + +private: + void thread() { + while (true) { + std::function<void()> callback; + while (true) { + std::unique_lock<std::mutex> lock(mutex_); + if (queue_.empty()) { + if (quit_) + return; + cond_.wait(lock); + } else { + callback = std::move(queue_.front()); + queue_.pop_front(); + break; + } + } + + callback(); + } + } + + size_t const threads_; + bool quit_{false}; + std::condition_variable cond_; + std::mutex mutex_; + std::deque<std::function<void()>> queue_; + std::unique_ptr<std::thread[]> thread_; +}; + +} // namespace + +std::unique_ptr<TaskRunner> TaskRunner::create(size_t threads) { + return std::make_unique<TaskRunnerThread>(threads); +} |
