diff options
| author | Joel Klinghed <the_jk@spawned.biz> | 2025-02-20 22:54:56 +0100 |
|---|---|---|
| committer | Joel Klinghed <the_jk@spawned.biz> | 2025-02-20 22:54:56 +0100 |
| commit | b4d6df902253637f24647d3db2bc3781d69eec1c (patch) | |
| tree | d8bf9ac04a270fabdfee1c15628c702471ef8bf5 /src/looper_poll.cc | |
| parent | 441cafc7124f633e5abc684e85a11ce3c991f6ae (diff) | |
Diffstat (limited to 'src/looper_poll.cc')
| -rw-r--r-- | src/looper_poll.cc | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/src/looper_poll.cc b/src/looper_poll.cc new file mode 100644 index 0000000..1945583 --- /dev/null +++ b/src/looper_poll.cc @@ -0,0 +1,224 @@ +#include "looper.hh" + +#include <algorithm> +#include <cassert> +#include <chrono> +#include <deque> +#include <errno.h> +#include <poll.h> +#include <string.h> +#include <unordered_map> +#include <vector> + +#include "logger.hh" + +namespace { + +class LooperPoll : public Looper { +public: + LooperPoll() = default; + + void add(int fd, uint8_t events, + std::function<void(uint8_t)> callback) override { + if (fd < 0) + return; + auto ret = entry_.emplace(fd, Entry(events)); + if (!ret.second) { + assert(ret.first->second.delete_); + ret.first->second.delete_ = false; + ret.first->second.events_ = events; + } + ret.first->second.callback_ = std::move(callback); + } + + void update(int fd, uint8_t events) override { + if (fd < 0) + return; + auto it = entry_.find(fd); + if (it == entry_.end() || it->second.delete_) { + assert(false); + return; + } + it->second.events_ = events; + } + + void remove(int fd) override { + if (fd < 0) + return; + auto it = entry_.find(fd); + if (it == entry_.end()) + return; + it->second.delete_ = true; + } + + bool run(Logger* logger) override { + while (!quit_) { + int timeout; + if (scheduled_.empty()) { + timeout = -1; + } else { + auto now = std::chrono::steady_clock::now(); + while (true) { + if (now < scheduled_.front().target_) { + auto delay = std::chrono::duration_cast<std::chrono::milliseconds>( + scheduled_.front().target_ - now); + if (delay.count() <= std::numeric_limits<int>::max()) + timeout = delay.count(); + else + timeout = std::numeric_limits<int>::max(); + break; + } + auto id = scheduled_.front().id_; + auto callback = std::move(scheduled_.front().callback_); + scheduled_.pop_front(); + callback(id); + if (scheduled_.empty()) { + timeout = -1; + break; + } + } + // Scheduled callbacks might call quit(). + if (quit_) + break; + } + std::vector<struct pollfd> pollfd; + pollfd.reserve(entry_.size()); + auto it = entry_.begin(); + while (it != entry_.end()) { + if (it->second.delete_) { + it = entry_.erase(it); + } else { + struct pollfd tmp; + tmp.fd = it->first; + tmp.events = events_looper2poll(it->second.events_); + pollfd.push_back(std::move(tmp)); + ++it; + } + } + int active = poll(pollfd.data(), pollfd.size(), timeout); + if (active < 0) { + if (errno == EINTR) + continue; + logger->err("Poll failed: %s", strerror(errno)); + return false; + } + for (auto it2 = pollfd.begin(); active; ++it2) { + if (it2->revents == 0) + continue; + --active; + auto events = events_poll2looper(it2->revents); + if (events) { + it = entry_.find(it2->fd); + if (!it->second.delete_) { + events &= (it->second.events_ | EVENT_ERROR); + if (events) { + it->second.callback_(events); + } + } + } + } + } + // Reset quit_ so run() can be called again + quit_ = false; + return true; + } + + void quit() override { + quit_ = true; + } + + uint32_t schedule(double delay, + std::function<void(uint32_t)> callback) override { + assert(delay >= 0.0); + uint32_t id = next_schedule_id(); + auto target = std::chrono::steady_clock::now() + + std::chrono::duration_cast<std::chrono::steady_clock::duration>( + std::chrono::duration<double>(delay)); + auto insert = scheduled_.end(); + while (insert != scheduled_.begin()) { + auto prev = insert - 1; + if (prev->target_ < target) + break; + insert = prev; + } + scheduled_.emplace(insert, std::move(callback), id, target); + return id; + } + + void cancel(uint32_t id) override { + auto it = std::find_if(scheduled_.begin(), scheduled_.end(), + [id](auto const& scheduled) { + return scheduled.id_ == id; + }); + if (it != scheduled_.end()) { + scheduled_.erase(it); + } else { + assert(false); + } + } + +private: + struct Entry { + uint8_t events_; + std::function<void(uint8_t)> callback_; + bool delete_; + + explicit Entry(uint8_t events) + : events_(events), delete_(false) {} + }; + + struct Scheduled { + std::function<void(uint32_t)> callback_; + uint32_t id_; + std::chrono::steady_clock::time_point target_; + + Scheduled(std::function<void(uint32_t)> callback, uint32_t id, + std::chrono::steady_clock::time_point target) + : callback_(std::move(callback)), id_(id), target_(target) {} + }; + + static short events_looper2poll(uint8_t events) { + short ret = 0; + if (events & EVENT_READ) + ret |= POLLIN | POLLPRI; + if (events & EVENT_WRITE) + ret |= POLLOUT; + return ret; + } + + static uint8_t events_poll2looper(short events) { + uint8_t ret = 0; + if (events & (POLLIN | POLLPRI | POLLHUP)) + ret |= EVENT_READ; + if (events & POLLOUT) + ret |= EVENT_WRITE; + if (events & (POLLERR | POLLNVAL)) + ret |= EVENT_ERROR; + return ret; + } + + uint32_t next_schedule_id() { + while (true) { + uint32_t ret = next_schedule_id_++; + if (ret) { + bool found = std::any_of(scheduled_.begin(), scheduled_.end(), + [ret](auto const& scheduled) { + return scheduled.id_ == ret; + }); + if (!found) + return ret; + } + } + } + + bool quit_{false}; + std::unordered_map<int, Entry> entry_; + uint32_t next_schedule_id_{1}; + std::deque<Scheduled> scheduled_; +}; + +} // namespace + +std::unique_ptr<Looper> Looper::create() { + return std::make_unique<LooperPoll>(); +} |
