summaryrefslogtreecommitdiff
path: root/src/poll_looper.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/poll_looper.cc')
-rw-r--r--src/poll_looper.cc249
1 files changed, 249 insertions, 0 deletions
diff --git a/src/poll_looper.cc b/src/poll_looper.cc
new file mode 100644
index 0000000..71788e0
--- /dev/null
+++ b/src/poll_looper.cc
@@ -0,0 +1,249 @@
+#include "common.hh"
+
+#include <poll.h>
+#include <string.h>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "clock.hh"
+#include "poll_looper.hh"
+
+namespace {
+
+void empty_callback(Looper*, int, uint8_t) {
+ assert(false);
+}
+
+int const READ_POLL_MASK = POLLIN | POLLPRI;
+int const WRITE_POLL_MASK = POLLOUT;
+
+class PollLooperImpl : public PollLooper {
+public:
+ PollLooperImpl()
+ : next_id_(0), active_(0), exit_(false) {
+ }
+
+ void add(int fd, uint8_t events,
+ std::function<void(Looper*, int, uint8_t)> const& callback)
+ override {
+ if (fd < 0) {
+ assert(false);
+ return;
+ }
+ auto it = fds_.find(fd);
+ if (it != fds_.end()) {
+ if (!removed_.count(it->second)) {
+ modify(fd, events);
+ return;
+ }
+ }
+ struct pollfd poll;
+ poll.fd = fd;
+ poll.events = 0;
+ if (events & EV_READ) {
+ poll.events |= READ_POLL_MASK;
+ }
+ if (events & EV_WRITE) {
+ poll.events |= WRITE_POLL_MASK;
+ }
+ if (free_.empty()) {
+ fds_[fd] = poll_.size();
+ callback_.push_back(callback);
+ poll_.push_back(poll);
+ } else {
+ auto it = free_.begin();
+ auto index = *it;
+ free_.erase(it);
+ fds_[fd] = index;
+ callback_[index] = callback;
+ poll_[index] = poll;
+ }
+ }
+
+ void modify(int fd, uint8_t events) override {
+ auto it = fds_.find(fd);
+ if (it == fds_.end()) {
+ assert(false);
+ return;
+ }
+ auto& poll = poll_[it->second];
+ poll.events = 0;
+ if (events & EV_READ) {
+ poll.events |= READ_POLL_MASK;
+ }
+ if (events & EV_WRITE) {
+ poll.events |= WRITE_POLL_MASK;
+ }
+ }
+
+ void remove(int fd) override {
+ auto it = fds_.find(fd);
+ if (it == fds_.end()) {
+ assert(false);
+ return;
+ }
+ if (it->second < active_) {
+ removed_.insert(it->second);
+ } else {
+ free_.insert(it->second);
+ poll_[it->second].fd = -1;
+ poll_[it->second].events = 0;
+ callback_[it->second] = empty_callback;
+ fds_.erase(it);
+ }
+ }
+
+ uint32_t schedule(double delay,
+ std::function<void(Looper*, uint32_t)> const& callback)
+ override {
+ assert(delay >= 0.0);
+ uint32_t id = next_id();
+ auto target = clk::steady() + delay;
+ std::vector<Timer>::iterator insert = timer_.end();
+ if (timer_.empty() || timer_.back().target >= target) {
+ auto it = timer_.begin();
+ for (; it != timer_.end(); ++it) {
+ if (it->target >= target) {
+ insert = it;
+ break;
+ }
+ }
+ }
+ timer_.emplace(insert, id, target, callback);
+ return id;
+ }
+
+ void cancel(uint32_t id) override {
+ auto active = active_ids_.find(id);
+ if (active == active_ids_.end()) {
+ assert(false);
+ return;
+ }
+ for (auto it = timer_.begin(); it != timer_.end(); ++it) {
+ if (it->id == id) {
+ active_ids_.erase(active);
+ timer_.erase(it);
+ break;
+ }
+ }
+ }
+
+ void exit_when_empty() override {
+ exit_ = true;
+ }
+
+ void run() override {
+ while (true) {
+ auto now = clk::steady();
+ while (true) {
+ if (timer_.empty()) {
+ now = -1.0;
+ break;
+ }
+ if (now < timer_.front().target) {
+ now = timer_.front().target - clk::steady();
+ if (now < 0.0) now = 0.0;
+ break;
+ }
+ auto id = timer_.front().id;
+ auto callback = timer_.front().callback;
+ timer_.erase(timer_.begin());
+ active_ids_.erase(id);
+ callback(this, id);
+ }
+ active_ = poll_.size();
+ while (active_ > 0 && poll_[active_ - 1].fd < 0) --active_;
+ if (exit_ && now < 0.0 && active_ == 0) return;
+ auto ret = poll(&poll_[0], active_, now < 0.0 ? -1 : now * 1000);
+ if (ret < 0) {
+ if (errno == EINTR) continue;
+ return;
+ }
+ for (size_t i = 0; ret && i < active_; ++i) {
+ if (poll_[i].revents) {
+ --ret;
+ if (removed_.count(i)) continue;
+ uint8_t events = 0;
+ if (poll_[i].revents & READ_POLL_MASK
+ && poll_[i].events & READ_POLL_MASK) {
+ events |= EV_READ;
+ }
+ if (poll_[i].revents & WRITE_POLL_MASK
+ && poll_[i].events & WRITE_POLL_MASK) {
+ events |= EV_WRITE;
+ }
+ if (poll_[i].revents & POLLHUP) {
+ if (poll_[i].events & READ_POLL_MASK) {
+ events |= EV_READ;
+ } else {
+ events |= EV_ERR;
+ }
+ }
+ if (poll_[i].revents & POLLERR) {
+ events |= EV_ERR;
+ }
+ if (poll_[i].revents & POLLNVAL) {
+ events |= EV_ERR;
+ removed_.insert(i);
+ }
+ if (events) {
+ callback_[i](this, poll_[i].fd, events);
+ }
+ }
+ }
+
+ active_ = 0;
+ while (!removed_.empty()) {
+ auto index = *removed_.begin();
+ removed_.erase(removed_.begin());
+ auto it = fds_.find(poll_[index].fd);
+ if (it == fds_.end() || it->second != index) {
+ free_.insert(index);
+ poll_[index].fd = -1;
+ poll_[index].events = 0;
+ callback_[index] = empty_callback;
+ } else {
+ remove(it->first);
+ }
+ }
+ }
+ }
+
+private:
+ struct Timer {
+ uint32_t id;
+ double target;
+ std::function<void(Looper*, uint32_t)> callback;
+
+ Timer(uint32_t id, double target,
+ std::function<void(Looper*, uint32_t)> const& callback)
+ : id(id), target(target), callback(callback) {
+ }
+ };
+
+ uint32_t next_id() {
+ uint32_t ret = ++next_id_;
+ if (ret == 0) return next_id();
+ if (!active_ids_.emplace(ret).second) return next_id();
+ return ret;
+ }
+
+ std::vector<Timer> timer_;
+ uint32_t next_id_;
+ std::unordered_set<uint32_t> active_ids_;
+ std::vector<struct pollfd> poll_;
+ std::vector<std::function<void(Looper*, int, uint8_t)>> callback_;
+ std::unordered_set<size_t> free_;
+ std::unordered_map<int, size_t> fds_;
+ std::unordered_set<size_t> removed_;
+ size_t active_;
+ bool exit_;
+};
+
+} // namespace
+
+// static
+PollLooper* PollLooper::create() {
+ return new PollLooperImpl();
+}