#include "common.hh"

// NOTE(review): the two angle-bracket includes that followed "common.hh" were
// unreadable in the copy under review. The standard headers below cover every
// std:: name this file uses directly -- confirm against version control.
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

#include "io.hh"
#include "looper.hh"
#include "task_runner.hh"
#include "unique_pipe.hh"

namespace {

// Single byte written to the pipe to wake the looper. run() reads it back and
// discards it, so only its presence matters, not its value.
constexpr char kMessage = 1;

// TaskRunner implementation that queues callbacks and executes them from a
// Looper read-event handler, using a pipe as the wake-up signal.
//
// NOTE(review): template arguments in this file were reconstructed from usage
// (callbacks are invoked with no arguments and their result is ignored);
// confirm they match the declarations in task_runner.hh / looper.hh.
class TaskRunnerLooper final : public TaskRunner {
 public:
  // Registers run() with `looper` for read events on the pipe's read end.
  // The handler captures `this`; the destructor unregisters it again.
  explicit TaskRunnerLooper(std::shared_ptr<Looper> looper)
      : looper_(std::move(looper)), pipe_(true, false) {
    looper_->add(pipe_.reader(), Looper::EVENT_READ,
                 [this](uint8_t event) { run(event); });
  }

  // Unregisters the pipe's read end from the looper. Callbacks still queued at
  // this point are destroyed without being invoked.
  ~TaskRunnerLooper() override { looper_->remove(pipe_.reader()); }

  // Appends `callback` to the queue (under mutex_) and, if the queue was empty
  // beforehand, writes one wake-up byte to the pipe.
  //
  // A byte is only written on the empty -> non-empty transition: if the queue
  // already held an item, either a wake-up is still pending or run() is in its
  // drain loop and will observe the new item when it next takes the lock.
  void post(std::function<void()> callback) override {
    bool notify = false;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      notify = queue_.empty();
      queue_.push_back(std::move(callback));
    }
    // The write happens outside the lock so the mutex is never held across I/O.
    if (notify) io::write_all(pipe_.writer(), &kMessage, 1);
  }

 private:
  // Looper handler for the pipe's read end.
  //
  // `event` is a bitmask tested against Looper::EVENT_READ and
  // Looper::EVENT_ERROR.
  void run(uint8_t event) {
    if (event & Looper::EVENT_READ) {
      // Consume exactly one wake-up byte; its content is ignored.
      char in[1];
      io::read_all(pipe_.reader(), &in, 1);
    }

    if (event & Looper::EVENT_ERROR) {
      // Not expected to happen: trips the assert in debug builds; when asserts
      // are compiled out the looper is asked to quit. Queued callbacks are
      // still drained below either way.
      assert(false);
      looper_->quit();
    }

    // Drain the queue one callback at a time. The lock is released before each
    // callback is invoked, so a callback may itself call post().
    //
    // `more` is sampled at pop time: after the callback that emptied the queue
    // returns, the loop exits even if something was posted in the meantime.
    // That post saw an empty queue and therefore wrote its own wake-up byte,
    // so it is handled on a later invocation of run().
    bool more = false;
    do {
      std::function<void()> callback;
      {
        std::lock_guard<std::mutex> lock(mutex_);
        // Nothing queued (e.g. an earlier invocation already drained it).
        if (queue_.empty()) break;
        callback = std::move(queue_.front());
        queue_.pop_front();
        more = !queue_.empty();
      }
      callback();
    } while (more);
  }

  std::shared_ptr<Looper> looper_;           // Looper the handler is registered with.
  unique_pipe pipe_;                         // Wake-up channel: post() writes, run() reads.
  std::mutex mutex_;                         // Guards queue_.
  std::deque<std::function<void()>> queue_;  // Pending callbacks, FIFO.
};

}  // namespace

// Factory: returns a TaskRunner whose callbacks are dispatched through `looper`.
std::unique_ptr<TaskRunner> TaskRunner::create(std::shared_ptr<Looper> looper) {
  return std::make_unique<TaskRunnerLooper>(std::move(looper));
}