diff options
| author | Joel Klinghed <the_jk@yahoo.com> | 2017-09-26 20:09:31 +0200 |
|---|---|---|
| committer | Joel Klinghed <the_jk@yahoo.com> | 2017-09-26 20:09:31 +0200 |
| commit | c85b624d28564a6f785b25000e2b7825592a919d (patch) | |
| tree | 647b756c824b470b35f1371eb869e9534ed6c1bb /src/poll_looper.cc | |
Initial commit
Diffstat (limited to 'src/poll_looper.cc')
| -rw-r--r-- | src/poll_looper.cc | 249 |
1 files changed, 249 insertions, 0 deletions
diff --git a/src/poll_looper.cc b/src/poll_looper.cc new file mode 100644 index 0000000..71788e0 --- /dev/null +++ b/src/poll_looper.cc @@ -0,0 +1,249 @@ +#include "common.hh" + +#include <poll.h> +#include <string.h> +#include <unordered_map> +#include <unordered_set> +#include <vector> + +#include "clock.hh" +#include "poll_looper.hh" + +namespace { + +void empty_callback(Looper*, int, uint8_t) { + assert(false); +} + +int const READ_POLL_MASK = POLLIN | POLLPRI; +int const WRITE_POLL_MASK = POLLOUT; + +class PollLooperImpl : public PollLooper { +public: + PollLooperImpl() + : next_id_(0), active_(0), exit_(false) { + } + + void add(int fd, uint8_t events, + std::function<void(Looper*, int, uint8_t)> const& callback) + override { + if (fd < 0) { + assert(false); + return; + } + auto it = fds_.find(fd); + if (it != fds_.end()) { + if (!removed_.count(it->second)) { + modify(fd, events); + return; + } + } + struct pollfd poll; + poll.fd = fd; + poll.events = 0; + if (events & EV_READ) { + poll.events |= READ_POLL_MASK; + } + if (events & EV_WRITE) { + poll.events |= WRITE_POLL_MASK; + } + if (free_.empty()) { + fds_[fd] = poll_.size(); + callback_.push_back(callback); + poll_.push_back(poll); + } else { + auto it = free_.begin(); + auto index = *it; + free_.erase(it); + fds_[fd] = index; + callback_[index] = callback; + poll_[index] = poll; + } + } + + void modify(int fd, uint8_t events) override { + auto it = fds_.find(fd); + if (it == fds_.end()) { + assert(false); + return; + } + auto& poll = poll_[it->second]; + poll.events = 0; + if (events & EV_READ) { + poll.events |= READ_POLL_MASK; + } + if (events & EV_WRITE) { + poll.events |= WRITE_POLL_MASK; + } + } + + void remove(int fd) override { + auto it = fds_.find(fd); + if (it == fds_.end()) { + assert(false); + return; + } + if (it->second < active_) { + removed_.insert(it->second); + } else { + free_.insert(it->second); + poll_[it->second].fd = -1; + poll_[it->second].events = 0; + callback_[it->second] = empty_callback; + fds_.erase(it); + } + } + + uint32_t schedule(double delay, + std::function<void(Looper*, uint32_t)> const& callback) + override { + assert(delay >= 0.0); + uint32_t id = next_id(); + auto target = clk::steady() + delay; + std::vector<Timer>::iterator insert = timer_.end(); + if (timer_.empty() || timer_.back().target >= target) { + auto it = timer_.begin(); + for (; it != timer_.end(); ++it) { + if (it->target >= target) { + insert = it; + break; + } + } + } + timer_.emplace(insert, id, target, callback); + return id; + } + + void cancel(uint32_t id) override { + auto active = active_ids_.find(id); + if (active == active_ids_.end()) { + assert(false); + return; + } + for (auto it = timer_.begin(); it != timer_.end(); ++it) { + if (it->id == id) { + active_ids_.erase(active); + timer_.erase(it); + break; + } + } + } + + void exit_when_empty() override { + exit_ = true; + } + + void run() override { + while (true) { + auto now = clk::steady(); + while (true) { + if (timer_.empty()) { + now = -1.0; + break; + } + if (now < timer_.front().target) { + now = timer_.front().target - clk::steady(); + if (now < 0.0) now = 0.0; + break; + } + auto id = timer_.front().id; + auto callback = timer_.front().callback; + timer_.erase(timer_.begin()); + active_ids_.erase(id); + callback(this, id); + } + active_ = poll_.size(); + while (active_ > 0 && poll_[active_ - 1].fd < 0) --active_; + if (exit_ && now < 0.0 && active_ == 0) return; + auto ret = poll(&poll_[0], active_, now < 0.0 ? -1 : now * 1000); + if (ret < 0) { + if (errno == EINTR) continue; + return; + } + for (size_t i = 0; ret && i < active_; ++i) { + if (poll_[i].revents) { + --ret; + if (removed_.count(i)) continue; + uint8_t events = 0; + if (poll_[i].revents & READ_POLL_MASK + && poll_[i].events & READ_POLL_MASK) { + events |= EV_READ; + } + if (poll_[i].revents & WRITE_POLL_MASK + && poll_[i].events & WRITE_POLL_MASK) { + events |= EV_WRITE; + } + if (poll_[i].revents & POLLHUP) { + if (poll_[i].events & READ_POLL_MASK) { + events |= EV_READ; + } else { + events |= EV_ERR; + } + } + if (poll_[i].revents & POLLERR) { + events |= EV_ERR; + } + if (poll_[i].revents & POLLNVAL) { + events |= EV_ERR; + removed_.insert(i); + } + if (events) { + callback_[i](this, poll_[i].fd, events); + } + } + } + + active_ = 0; + while (!removed_.empty()) { + auto index = *removed_.begin(); + removed_.erase(removed_.begin()); + auto it = fds_.find(poll_[index].fd); + if (it == fds_.end() || it->second != index) { + free_.insert(index); + poll_[index].fd = -1; + poll_[index].events = 0; + callback_[index] = empty_callback; + } else { + remove(it->first); + } + } + } + } + +private: + struct Timer { + uint32_t id; + double target; + std::function<void(Looper*, uint32_t)> callback; + + Timer(uint32_t id, double target, + std::function<void(Looper*, uint32_t)> const& callback) + : id(id), target(target), callback(callback) { + } + }; + + uint32_t next_id() { + uint32_t ret = ++next_id_; + if (ret == 0) return next_id(); + if (!active_ids_.emplace(ret).second) return next_id(); + return ret; + } + + std::vector<Timer> timer_; + uint32_t next_id_; + std::unordered_set<uint32_t> active_ids_; + std::vector<struct pollfd> poll_; + std::vector<std::function<void(Looper*, int, uint8_t)>> callback_; + std::unordered_set<size_t> free_; + std::unordered_map<int, size_t> fds_; + std::unordered_set<size_t> removed_; + size_t active_; + bool exit_; +}; + +} // namespace + +// static +PollLooper* PollLooper::create() { + return new PollLooperImpl(); +} |
