diff options
| author | Joel Klinghed <the_jk@spawned.biz> | 2025-02-20 22:54:56 +0100 |
|---|---|---|
| committer | Joel Klinghed <the_jk@spawned.biz> | 2025-02-20 22:54:56 +0100 |
| commit | b4d6df902253637f24647d3db2bc3781d69eec1c (patch) | |
| tree | d8bf9ac04a270fabdfee1c15628c702471ef8bf5 /src/server.cc | |
| parent | 441cafc7124f633e5abc684e85a11ce3c991f6ae (diff) | |
Diffstat (limited to 'src/server.cc')
| -rw-r--r-- | src/server.cc | 275 |
1 files changed, 275 insertions, 0 deletions
diff --git a/src/server.cc b/src/server.cc new file mode 100644 index 0000000..57315df --- /dev/null +++ b/src/server.cc @@ -0,0 +1,275 @@ +#include "server.hh" + +#include <algorithm> +#include <cassert> +#include <cerrno> +#include <chrono> +#include <cstring> +#include <signal.h> +#include <unistd.h> + +#include "buffer.hh" +#include "logger.hh" +#include "looper.hh" +#include "net.hh" + +namespace { + +constexpr auto CLIENT_TIMEOUT = std::chrono::seconds(30); +constexpr size_t BUFFER_SIZE = 1 * 1024 * 1024; + +int g_signal_pipe[2]; + +void signal_handler(int) { + write(g_signal_pipe[1], "1", 1); +} + +class ServerImpl : public Server { + public: + ServerImpl() + : looper_(Looper::create()) {} + + ~ServerImpl() override { + for (auto& fd : listen_) { + looper_->remove(fd.get()); + } + } + + bool setup(std::shared_ptr<Logger> logger, std::string const& host, + uint16_t port) override { + logger_ = logger; + listen_ = net::bind_and_listen(logger.get(), host, port); + if (listen_.empty()) { + logger->err("Unable to listen"); + return false; + } + + for (auto& fd : listen_) { + looper_->add(fd.get(), Looper::EVENT_READ, + std::bind(&ServerImpl::listen_read, this, fd.get(), + std::placeholders::_1)); + } + return true; + } + + bool run(std::shared_ptr<Logger> logger) override { + logger_ = logger; + + bool close_pipe; + if (pipe(g_signal_pipe) == 0) { + close_pipe = true; + struct sigaction sa; + sa.sa_handler = signal_handler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = SA_RESTART; + sigaction(SIGINT, &sa, nullptr); + sigaction(SIGTERM, &sa, nullptr); + sigaction(SIGQUIT, &sa, nullptr); + + looper_->add(g_signal_pipe[0], Looper::EVENT_READ, + std::bind(&ServerImpl::signal, this, std::placeholders::_1)); + } else { + logger->warn("Unable to create a pipe: %s", strerror(errno)); + close_pipe = false; + } + + bool ret = looper_->run(logger.get()); + + if (close_pipe) { + looper_->remove(g_signal_pipe[0]); + close(g_signal_pipe[0]); + close(g_signal_pipe[1]); + } + return ret; + } + + private: + void signal(uint8_t) { + looper_->quit(); + } + + struct Client { + unique_fd fd_; + uint8_t events_{Looper::EVENT_WRITE}; + std::unique_ptr<Buffer> in_; + std::unique_ptr<Buffer> out_; + uint32_t watchdog_{0}; + std::chrono::steady_clock::time_point last_event_; + + explicit Client(unique_fd fd) + : fd_(std::move(fd)) {} + }; + + void listen_read(int fd, uint8_t events) { + if (events & Looper::EVENT_ERROR) { + auto it = std::find_if(listen_.begin(), listen_.end(), + [fd](auto& listen) { + return listen.get() == fd; + }); + if (it != listen_.end()) { + logger_->warn("Listening socket error"); + listen_.erase(it); + if (listen_.empty()) { + logger_->err("No listening sockets left"); + looper_->quit(); + return; + } + } else { + assert(false); + } + return; + } + + unique_fd client_fd = net::accept(logger_.get(), fd); + if (client_fd) { + logger_->dbg("New client: %d", client_fd.get()); + auto client = new Client(std::move(client_fd)); + client->in_ = Buffer::fixed(BUFFER_SIZE); + client->out_ = Buffer::fixed(BUFFER_SIZE); + clients_.push_back(client); + + looper_->add(client->fd_.get(), client->events_, + std::bind(&ServerImpl::client_event, + this, client, + std::placeholders::_1)); + client->watchdog_ = looper_->schedule( + CLIENT_TIMEOUT.count(), + std::bind(&ServerImpl::client_check_timeout, + this, client, + std::placeholders::_1)); + } + } + + void client_event(Client* client, uint8_t events) { + if (events & Looper::EVENT_ERROR) { + logger_->warn("%d: Unexpected client error", client->fd_.get()); + remove_client(client); + return; + } + + client->last_event_ = std::chrono::steady_clock::now(); + + if (events & Looper::EVENT_READ && client->in_) { + while (true) { + size_t avail; + auto* ptr = client->in_->wbuf(65535, avail); + if (avail == 0) + break; + auto ret = read(client->fd_.get(), ptr, avail); + if (ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + if (errno == EINTR) + continue; + logger_->warn("%d: Read error: %s", client->fd_.get(), + strerror(errno)); + break; + } else if (ret == 0) { + logger_->dbg("%d: Connection closed", client->fd_.get()); + if (client->out_->empty()) { + remove_client(client); + return; + } else { + // Might be only read side that is shutdown, try writing the + // rest before we give up. + client->in_.reset(); + break; + } + } else { + client->in_->wcommit(ret); + if (static_cast<size_t>(ret) < avail) + break; + } + } + } + + // Always try to write, usually we write as a result of a read above + // so saves having to call back to the loop just to answer. + // And if there wasn't a read event then we already know there was an write + // event. + if (/* events & Looper::EVENT_WRITE */ true) { + while (true) { + size_t avail; + auto* ptr = client->out_->rbuf(65535, avail); + if (avail == 0) + break; + auto ret = write(client->fd_.get(), ptr, avail); + if (ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + if (errno == EINTR) + continue; + logger_->warn("%d: Write error: %s", client->fd_.get(), + strerror(errno)); + break; + } else { + client->out_->rcommit(ret); + if (static_cast<size_t>(ret) < avail) + break; + } + } + } + + auto const old_events = client->events_; + client->events_ = 0; + + if (client->in_) { + if (!client->in_->full()) + client->events_ |= Looper::EVENT_READ; + if (!client->out_->empty()) + client->events_ |= Looper::EVENT_WRITE; + } else { + if (client->out_->empty()) { + remove_client(client); + return; + } else { + client->events_ |= Looper::EVENT_WRITE; + } + } + + if (client->events_ != old_events) { + looper_->update(client->fd_.get(), client->events_); + } + } + + void client_check_timeout(Client* client, uint32_t) { + auto since_last = std::chrono::steady_clock::now() - client->last_event_; + if (since_last >= CLIENT_TIMEOUT) { + client->watchdog_ = 0; + remove_client(client); + return; + } + auto delay = CLIENT_TIMEOUT - since_last; + client->watchdog_ = looper_->schedule( + std::chrono::duration_cast<std::chrono::seconds>(delay).count(), + std::bind(&ServerImpl::client_check_timeout, + this, client, + std::placeholders::_1)); + } + + void remove_client(Client* client) { + if (client->fd_) + looper_->remove(client->fd_.get()); + if (client->watchdog_) + looper_->cancel(client->watchdog_); + auto it = std::find(clients_.begin(), clients_.end(), client); + if (it != clients_.end()) { + clients_.erase(it); + } else { + assert(false); + } + delete client; + } + + std::shared_ptr<Looper> looper_; + std::shared_ptr<Logger> logger_; + std::vector<unique_fd> listen_; + std::vector<Client*> clients_; +}; + +} // namespace + +// static +std::unique_ptr<Server> Server::create() { + return std::make_unique<ServerImpl>(); +} |
