#include "looper.hh" #include #include #include #include #include #include #include #include #include #include "logger.hh" namespace { class LooperPoll : public Looper { public: LooperPoll() = default; void add(int fd, uint8_t events, std::function callback) override { if (fd < 0) return; auto ret = entry_.emplace(fd, Entry(events)); if (!ret.second) { assert(ret.first->second.delete_); ret.first->second.delete_ = false; ret.first->second.events_ = events; } ret.first->second.callback_ = std::move(callback); } void update(int fd, uint8_t events) override { if (fd < 0) return; auto it = entry_.find(fd); if (it == entry_.end() || it->second.delete_) { assert(false); return; } it->second.events_ = events; } void remove(int fd) override { if (fd < 0) return; auto it = entry_.find(fd); if (it == entry_.end()) return; it->second.delete_ = true; } bool run(Logger* logger) override { while (!quit_) { int timeout; if (scheduled_.empty()) { timeout = -1; } else { auto now = std::chrono::steady_clock::now(); while (true) { if (now < scheduled_.front().target_) { auto delay = std::chrono::duration_cast( scheduled_.front().target_ - now); if (delay.count() <= std::numeric_limits::max()) timeout = delay.count(); else timeout = std::numeric_limits::max(); break; } auto id = scheduled_.front().id_; auto callback = std::move(scheduled_.front().callback_); scheduled_.pop_front(); callback(id); if (scheduled_.empty()) { timeout = -1; break; } } // Scheduled callbacks might call quit(). if (quit_) break; } std::vector pollfd; pollfd.reserve(entry_.size()); auto it = entry_.begin(); while (it != entry_.end()) { if (it->second.delete_) { it = entry_.erase(it); } else { struct pollfd tmp; tmp.fd = it->first; tmp.events = events_looper2poll(it->second.events_); pollfd.push_back(std::move(tmp)); ++it; } } int active = poll(pollfd.data(), pollfd.size(), timeout); if (active < 0) { if (errno == EINTR) continue; logger->err("Poll failed: %s", strerror(errno)); return false; } for (auto it2 = pollfd.begin(); active; ++it2) { if (it2->revents == 0) continue; --active; auto events = events_poll2looper(it2->revents); if (events) { it = entry_.find(it2->fd); if (!it->second.delete_) { events &= (it->second.events_ | EVENT_ERROR); if (events) { it->second.callback_(events); } } } } } // Reset quit_ so run() can be called again quit_ = false; return true; } void quit() override { quit_ = true; } uint32_t schedule(double delay, std::function callback) override { assert(delay >= 0.0); uint32_t id = next_schedule_id(); auto target = std::chrono::steady_clock::now() + std::chrono::duration_cast( std::chrono::duration(delay)); auto insert = scheduled_.end(); while (insert != scheduled_.begin()) { auto prev = insert - 1; if (prev->target_ < target) break; insert = prev; } scheduled_.emplace(insert, std::move(callback), id, target); return id; } void cancel(uint32_t id) override { auto it = std::find_if(scheduled_.begin(), scheduled_.end(), [id](auto const& scheduled) { return scheduled.id_ == id; }); if (it != scheduled_.end()) { scheduled_.erase(it); } else { assert(false); } } private: struct Entry { uint8_t events_; std::function callback_; bool delete_; explicit Entry(uint8_t events) : events_(events), delete_(false) {} }; struct Scheduled { std::function callback_; uint32_t id_; std::chrono::steady_clock::time_point target_; Scheduled(std::function callback, uint32_t id, std::chrono::steady_clock::time_point target) : callback_(std::move(callback)), id_(id), target_(target) {} }; static short events_looper2poll(uint8_t events) { short ret = 0; if (events & EVENT_READ) ret |= POLLIN | POLLPRI; if (events & EVENT_WRITE) ret |= POLLOUT; return ret; } static uint8_t events_poll2looper(short events) { uint8_t ret = 0; if (events & (POLLIN | POLLPRI | POLLHUP)) ret |= EVENT_READ; if (events & POLLOUT) ret |= EVENT_WRITE; if (events & (POLLERR | POLLNVAL)) ret |= EVENT_ERROR; return ret; } uint32_t next_schedule_id() { while (true) { uint32_t ret = next_schedule_id_++; if (ret) { bool found = std::any_of(scheduled_.begin(), scheduled_.end(), [ret](auto const& scheduled) { return scheduled.id_ == ret; }); if (!found) return ret; } } } bool quit_{false}; std::unordered_map entry_; uint32_t next_schedule_id_{1}; std::deque scheduled_; }; } // namespace std::unique_ptr Looper::create() { return std::make_unique(); }