#include "server.hh" #include #include #include #include #include #include #include #include "buffer.hh" #include "logger.hh" #include "looper.hh" #include "net.hh" namespace { constexpr auto CLIENT_TIMEOUT = std::chrono::seconds(30); constexpr size_t BUFFER_SIZE = 1 * 1024 * 1024; int g_signal_pipe[2]; void signal_handler(int) { write(g_signal_pipe[1], "1", 1); } class ServerImpl : public Server { public: ServerImpl() : looper_(Looper::create()) {} ~ServerImpl() override { for (auto& fd : listen_) { looper_->remove(fd.get()); } } bool setup(std::shared_ptr logger, std::string const& host, uint16_t port) override { logger_ = logger; listen_ = net::bind_and_listen(logger.get(), host, port); if (listen_.empty()) { logger->err("Unable to listen"); return false; } for (auto& fd : listen_) { looper_->add(fd.get(), Looper::EVENT_READ, std::bind(&ServerImpl::listen_read, this, fd.get(), std::placeholders::_1)); } return true; } bool run(std::shared_ptr logger) override { logger_ = logger; bool close_pipe; if (pipe(g_signal_pipe) == 0) { close_pipe = true; struct sigaction sa; sa.sa_handler = signal_handler; sigemptyset(&sa.sa_mask); sa.sa_flags = SA_RESTART; sigaction(SIGINT, &sa, nullptr); sigaction(SIGTERM, &sa, nullptr); sigaction(SIGQUIT, &sa, nullptr); looper_->add(g_signal_pipe[0], Looper::EVENT_READ, std::bind(&ServerImpl::signal, this, std::placeholders::_1)); } else { logger->warn("Unable to create a pipe: %s", strerror(errno)); close_pipe = false; } bool ret = looper_->run(logger.get()); if (close_pipe) { looper_->remove(g_signal_pipe[0]); close(g_signal_pipe[0]); close(g_signal_pipe[1]); } return ret; } private: void signal(uint8_t) { looper_->quit(); } struct Client { unique_fd fd_; uint8_t events_{Looper::EVENT_WRITE}; std::unique_ptr in_; std::unique_ptr out_; uint32_t watchdog_{0}; std::chrono::steady_clock::time_point last_event_; explicit Client(unique_fd fd) : fd_(std::move(fd)) {} }; void listen_read(int fd, uint8_t events) { if (events & Looper::EVENT_ERROR) { auto it = std::find_if(listen_.begin(), listen_.end(), [fd](auto& listen) { return listen.get() == fd; }); if (it != listen_.end()) { logger_->warn("Listening socket error"); listen_.erase(it); if (listen_.empty()) { logger_->err("No listening sockets left"); looper_->quit(); return; } } else { assert(false); } return; } unique_fd client_fd = net::accept(logger_.get(), fd); if (client_fd) { logger_->dbg("New client: %d", client_fd.get()); auto client = new Client(std::move(client_fd)); client->in_ = Buffer::fixed(BUFFER_SIZE); client->out_ = Buffer::fixed(BUFFER_SIZE); clients_.push_back(client); looper_->add(client->fd_.get(), client->events_, std::bind(&ServerImpl::client_event, this, client, std::placeholders::_1)); client->watchdog_ = looper_->schedule( CLIENT_TIMEOUT.count(), std::bind(&ServerImpl::client_check_timeout, this, client, std::placeholders::_1)); } } void client_event(Client* client, uint8_t events) { if (events & Looper::EVENT_ERROR) { logger_->warn("%d: Unexpected client error", client->fd_.get()); remove_client(client); return; } client->last_event_ = std::chrono::steady_clock::now(); if (events & Looper::EVENT_READ && client->in_) { while (true) { size_t avail; auto* ptr = client->in_->wbuf(65535, avail); if (avail == 0) break; auto ret = read(client->fd_.get(), ptr, avail); if (ret < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; if (errno == EINTR) continue; logger_->warn("%d: Read error: %s", client->fd_.get(), strerror(errno)); break; } else if (ret == 0) { logger_->dbg("%d: Connection closed", client->fd_.get()); if (client->out_->empty()) { remove_client(client); return; } else { // Might be only read side that is shutdown, try writing the // rest before we give up. client->in_.reset(); break; } } else { client->in_->wcommit(ret); if (static_cast(ret) < avail) break; } } } // Always try to write, usually we write as a result of a read above // so saves having to call back to the loop just to answer. // And if there wasn't a read event then we already know there was an write // event. if (/* events & Looper::EVENT_WRITE */ true) { while (true) { size_t avail; auto* ptr = client->out_->rbuf(65535, avail); if (avail == 0) break; auto ret = write(client->fd_.get(), ptr, avail); if (ret < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; if (errno == EINTR) continue; logger_->warn("%d: Write error: %s", client->fd_.get(), strerror(errno)); break; } else { client->out_->rcommit(ret); if (static_cast(ret) < avail) break; } } } auto const old_events = client->events_; client->events_ = 0; if (client->in_) { if (!client->in_->full()) client->events_ |= Looper::EVENT_READ; if (!client->out_->empty()) client->events_ |= Looper::EVENT_WRITE; } else { if (client->out_->empty()) { remove_client(client); return; } else { client->events_ |= Looper::EVENT_WRITE; } } if (client->events_ != old_events) { looper_->update(client->fd_.get(), client->events_); } } void client_check_timeout(Client* client, uint32_t) { auto since_last = std::chrono::steady_clock::now() - client->last_event_; if (since_last >= CLIENT_TIMEOUT) { client->watchdog_ = 0; remove_client(client); return; } auto delay = CLIENT_TIMEOUT - since_last; client->watchdog_ = looper_->schedule( std::chrono::duration_cast(delay).count(), std::bind(&ServerImpl::client_check_timeout, this, client, std::placeholders::_1)); } void remove_client(Client* client) { if (client->fd_) looper_->remove(client->fd_.get()); if (client->watchdog_) looper_->cancel(client->watchdog_); auto it = std::find(clients_.begin(), clients_.end(), client); if (it != clients_.end()) { clients_.erase(it); } else { assert(false); } delete client; } std::shared_ptr looper_; std::shared_ptr logger_; std::vector listen_; std::vector clients_; }; } // namespace // static std::unique_ptr Server::create() { return std::make_unique(); }