#include "common.hh" #include #include #include #include #include #include #include "clock.hh" #include "poll_looper.hh" namespace { void empty_callback(Looper*, int, uint8_t) { assert(false); } int const READ_POLL_MASK = POLLIN | POLLPRI; int const WRITE_POLL_MASK = POLLOUT; class PollLooperImpl : public PollLooper { public: PollLooperImpl() : next_id_(0), active_(0), exit_(false) { } void add(int fd, uint8_t events, std::function const& callback) override { if (fd < 0) { assert(false); return; } auto it = fds_.find(fd); if (it != fds_.end()) { if (!removed_.count(it->second)) { modify(fd, events); return; } } struct pollfd poll; poll.fd = fd; poll.events = 0; if (events & EV_READ) { poll.events |= READ_POLL_MASK; } if (events & EV_WRITE) { poll.events |= WRITE_POLL_MASK; } if (free_.empty()) { fds_[fd] = poll_.size(); callback_.push_back(callback); poll_.push_back(poll); } else { auto it = free_.begin(); auto index = *it; free_.erase(it); fds_[fd] = index; callback_[index] = callback; poll_[index] = poll; } } void modify(int fd, uint8_t events) override { auto it = fds_.find(fd); if (it == fds_.end()) { assert(false); return; } auto& poll = poll_[it->second]; poll.events = 0; if (events & EV_READ) { poll.events |= READ_POLL_MASK; } if (events & EV_WRITE) { poll.events |= WRITE_POLL_MASK; } } void remove(int fd) override { auto it = fds_.find(fd); if (it == fds_.end()) { assert(false); return; } if (it->second < active_) { removed_.insert(it->second); } else { free_.insert(it->second); poll_[it->second].fd = -1; poll_[it->second].events = 0; callback_[it->second] = empty_callback; fds_.erase(it); } } uint32_t schedule(double delay, std::function const& callback) override { assert(delay >= 0.0); uint32_t id = next_id(); auto target = clk::steady() + delay; std::vector::iterator insert = timer_.end(); if (timer_.empty() || timer_.back().target >= target) { auto it = timer_.begin(); for (; it != timer_.end(); ++it) { if (it->target >= target) { insert = it; break; } } } timer_.emplace(insert, id, target, callback); return id; } void cancel(uint32_t id) override { auto active = active_ids_.find(id); if (active == active_ids_.end()) { assert(false); return; } for (auto it = timer_.begin(); it != timer_.end(); ++it) { if (it->id == id) { active_ids_.erase(active); timer_.erase(it); break; } } } void exit_when_empty() override { exit_ = true; } void run() override { while (true) { auto now = clk::steady(); while (true) { if (timer_.empty()) { now = -1.0; break; } if (now < timer_.front().target) { now = timer_.front().target - clk::steady(); if (now < 0.0) now = 0.0; break; } auto id = timer_.front().id; auto callback = timer_.front().callback; timer_.erase(timer_.begin()); active_ids_.erase(id); callback(this, id); } active_ = poll_.size(); while (active_ > 0 && poll_[active_ - 1].fd < 0) --active_; if (exit_ && now < 0.0 && active_ == 0) return; auto ret = poll(&poll_[0], active_, now < 0.0 ? -1 : now * 1000); if (ret < 0) { if (errno == EINTR) continue; return; } for (size_t i = 0; ret && i < active_; ++i) { if (poll_[i].revents) { --ret; if (removed_.count(i)) continue; uint8_t events = 0; if (poll_[i].revents & READ_POLL_MASK && poll_[i].events & READ_POLL_MASK) { events |= EV_READ; } if (poll_[i].revents & WRITE_POLL_MASK && poll_[i].events & WRITE_POLL_MASK) { events |= EV_WRITE; } if (poll_[i].revents & POLLHUP) { if (poll_[i].events & READ_POLL_MASK) { events |= EV_READ; } else { events |= EV_ERR; } } if (poll_[i].revents & POLLERR) { events |= EV_ERR; } if (poll_[i].revents & POLLNVAL) { events |= EV_ERR; removed_.insert(i); } if (events) { callback_[i](this, poll_[i].fd, events); } } } active_ = 0; while (!removed_.empty()) { auto index = *removed_.begin(); removed_.erase(removed_.begin()); auto it = fds_.find(poll_[index].fd); if (it == fds_.end() || it->second != index) { free_.insert(index); poll_[index].fd = -1; poll_[index].events = 0; callback_[index] = empty_callback; } else { remove(it->first); } } } } private: struct Timer { uint32_t id; double target; std::function callback; Timer(uint32_t id, double target, std::function const& callback) : id(id), target(target), callback(callback) { } }; uint32_t next_id() { uint32_t ret = ++next_id_; if (ret == 0) return next_id(); if (!active_ids_.emplace(ret).second) return next_id(); return ret; } std::vector timer_; uint32_t next_id_; std::unordered_set active_ids_; std::vector poll_; std::vector> callback_; std::unordered_set free_; std::unordered_map fds_; std::unordered_set removed_; size_t active_; bool exit_; }; } // namespace // static std::unique_ptr PollLooper::create() { return std::make_unique(); }