// -*- mode: c++; c-basic-offset: 2; -*- #include "common.hh" #include #include #include #include "looper.hh" namespace { int const read_events = POLLIN | POLLPRI #ifdef POLLRDHUP | POLLRDHUP #endif ; int const write_events = POLLOUT; int const error_events = POLLERR | POLLNVAL; int const hup_events = POLLHUP; class LooperImpl : public Looper { public: LooperImpl() : fds_to_remove_(0), fds_protected_(0), quit_(false) { } ~LooperImpl() override { for (auto& timeout : timeouts_) { delete timeout; } } void add(int fd, uint8_t events, FdCallback const& callback) override { if (fd < 0) { assert(false); return; } for (auto it = fds_.begin(); it != fds_.end(); ++it) { if (it->fd == fd) { size_t index = it - fds_.begin(); auto& entry = fdentries_[index]; if (index >= fds_protected_) { entry.callback = callback; it->events = pollevents(events); return; } else { // Don't call new callback this run, so add it at the end but // remove the old callback as it would be replaced if (!entry.removed) { entry.removed = true; fds_to_remove_++; } } } } fds_.emplace_back((struct pollfd) { fd, pollevents(events), 0 }); fdentries_.emplace_back(callback); } void modify(int fd, uint8_t events) override { if (fd < 0) { assert(false); return; } for (auto it = fds_.begin(); it != fds_.end(); ++it) { if (it->fd == fd) { size_t index = it - fds_.begin(); if (index < fds_protected_) { auto entry = fdentries_.begin() + index; // If entry is removed we need to write to the later one if (entry->removed) { continue; } } it->events = pollevents(events); return; } } assert(false); } void remove(int fd) override { if (fd < 0) return; for (auto it = fds_.begin(); it != fds_.end(); ++it) { if (it->fd == fd) { size_t index = it - fds_.begin(); auto entry = fdentries_.begin() + index; if (index < fds_protected_) { if (!entry->removed) { entry->removed = true; fds_to_remove_++; } } else { fds_.erase(it); fdentries_.erase(entry); } return; } } assert(false); } void* schedule(float delay_s, ScheduleCallback const& callback) override { clock::time_point target = clock::now() + std::chrono::duration_cast( std::chrono::duration(delay_s)); auto timeout = new Timeout(target, callback); for (auto it = timeouts_.begin(); it != timeouts_.end(); ++it) { if (target < (*it)->target) { timeouts_.insert(it, timeout); return timeout; } } timeouts_.push_back(timeout); return timeout; } void cancel(void* handle) override { auto timeout = reinterpret_cast(handle); if (!timeout) { assert(false); return; } if (timeout->expired) return; for (auto it = timeouts_.begin(); it != timeouts_.end(); ++it) { if (*it == timeout) { timeouts_.erase(it); delete timeout; return; } } assert(false); } void quit() override { quit_ = true; } bool run() override { std::vector expired; while (!quit_) { int timeout = -1; if (!timeouts_.empty()) { auto dur = std::chrono::duration_cast( timeouts_.front()->target - clock::now()); if (dur.count() <= 0) { timeout = 0; } else if (dur.count() < std::numeric_limits::max()) { timeout = dur.count(); } else { timeout = std::numeric_limits::max(); } } auto ret = poll(fds_.data(), fds_.size(), timeout); if (ret < 0) { if (errno == EINTR) continue; return false; } now_ = clock::now(); fds_protected_ = fds_.size(); if (!timeouts_.empty()) { while (timeouts_.front()->target <= now_) { auto timeout = timeouts_.front(); timeouts_.pop_front(); timeout->expired = true; expired.push_back(timeout); if (timeouts_.empty()) break; } for (auto& timeout : expired) { timeout->callback(timeout); } for (auto& timeout : expired) { delete timeout; } expired.clear(); } // Not using iterators here as that would be unsafe with // add() and remove() modifying the vector outside protected range // while callbacks are called size_t i; for (i = 0; ret > 0 && i < fds_protected_; ++i) { if (fds_[i].revents) { --ret; if (!fdentries_[i].removed) { fdentries_[i].callback(fds_[i].fd, unpollevents(fds_[i].revents)); } } } assert(ret == 0); assert(fds_.size() >= fds_protected_); assert(fdentries_.size() >= fds_protected_); for (i = fds_protected_; fds_to_remove_ > 0 && i > 0; --i) { if (fdentries_[i - 1].removed) { --fds_to_remove_; fds_.erase(fds_.begin() + i - 1); fdentries_.erase(fdentries_.begin() + i - 1); } } assert(fds_to_remove_ == 0); fds_protected_ = 0; } return true; } clock::time_point now() const override { return now_; } private: struct FdEntry { FdCallback callback; bool removed; FdEntry(FdCallback const& callback) : callback(callback), removed(false) { } }; struct Timeout { clock::time_point target; ScheduleCallback callback; bool expired; Timeout(clock::time_point target, ScheduleCallback const& callback) : target(target), callback(callback), expired(false) { } }; static uint8_t unpollevents(short events) { uint8_t ret = 0; if (events & read_events) { ret |= EVENT_READ; } if (events & write_events) { ret |= EVENT_WRITE; } if (events & error_events) { ret |= EVENT_ERROR; } if (events & hup_events) { ret |= EVENT_HUP; } return ret; } static short pollevents(uint8_t events) { int ret = 0; if (events & EVENT_READ) { ret |= read_events; } if (events & EVENT_WRITE) { ret |= write_events; } return ret; } std::vector fds_; std::vector fdentries_; size_t fds_to_remove_; size_t fds_protected_; std::list timeouts_; bool quit_; clock::time_point now_; }; } // namespace // static Looper* Looper::create() { return new LooperImpl(); } // static const uint8_t Looper::EVENT_READ = 1 << 0; // static const uint8_t Looper::EVENT_WRITE = 1 << 1; // static const uint8_t Looper::EVENT_ERROR = 1 << 2; // static const uint8_t Looper::EVENT_HUP = 1 << 3;