diff options
Diffstat (limited to 'src/task_runner_looper.cc')
| -rw-r--r-- | src/task_runner_looper.cc | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/src/task_runner_looper.cc b/src/task_runner_looper.cc new file mode 100644 index 0000000..4c1292b --- /dev/null +++ b/src/task_runner_looper.cc @@ -0,0 +1,75 @@ +#include "common.hh" + +#include <deque> +#include <mutex> + +#include "io.hh" +#include "looper.hh" +#include "task_runner.hh" +#include "unique_pipe.hh" + +namespace { + +constexpr char kMessage = 1; + +class TaskRunnerLooper : public TaskRunner { +public: + explicit TaskRunnerLooper(std::shared_ptr<Looper> looper) + : looper_(looper), pipe_(true, false) { + looper_->add(pipe_.reader(), Looper::EVENT_READ, + std::bind(&TaskRunnerLooper::run, this, + std::placeholders::_1)); + } + + ~TaskRunnerLooper() override { + looper_->remove(pipe_.reader()); + } + + void post(std::function<void()> callback) override { + bool notify; + { + std::lock_guard<std::mutex> lock(mutex_); + notify = queue_.empty(); + queue_.push_back(std::move(callback)); + } + if (notify) + io::write_all(pipe_.writer(), &kMessage, 1); + } + +private: + void run(uint8_t event) { + if (event & Looper::EVENT_READ) { + char in[1]; + io::read_all(pipe_.reader(), &in, 1); + } + if (event & Looper::EVENT_ERROR) { + assert(false); + looper_->quit(); + } + bool more; + do { + std::function<void()> callback; + { + std::lock_guard<std::mutex> lock(mutex_); + if (queue_.empty()) + break; + callback = std::move(queue_.front()); + queue_.pop_front(); + more = !queue_.empty(); + } + callback(); + } while (more); + } + + std::shared_ptr<Looper> looper_; + unique_pipe pipe_; + std::mutex mutex_; + std::deque<std::function<void()>> queue_; +}; + +} // namespace + +std::unique_ptr<TaskRunner> TaskRunner::create(std::shared_ptr<Looper> looper) { + return std::make_unique<TaskRunnerLooper>(looper); +} + |
