summaryrefslogtreecommitdiff
path: root/src/server.cc
diff options
context:
space:
mode:
authorJoel Klinghed <the_jk@spawned.biz>2025-02-20 22:54:56 +0100
committerJoel Klinghed <the_jk@spawned.biz>2025-02-20 22:54:56 +0100
commitb4d6df902253637f24647d3db2bc3781d69eec1c (patch)
treed8bf9ac04a270fabdfee1c15628c702471ef8bf5 /src/server.cc
parent441cafc7124f633e5abc684e85a11ce3c991f6ae (diff)
Initial commitHEADmain
Diffstat (limited to 'src/server.cc')
-rw-r--r--src/server.cc275
1 files changed, 275 insertions, 0 deletions
diff --git a/src/server.cc b/src/server.cc
new file mode 100644
index 0000000..57315df
--- /dev/null
+++ b/src/server.cc
@@ -0,0 +1,275 @@
+#include "server.hh"
+
+#include <algorithm>
+#include <cassert>
+#include <cerrno>
+#include <chrono>
+#include <cstring>
+#include <signal.h>
+#include <unistd.h>
+
+#include "buffer.hh"
+#include "logger.hh"
+#include "looper.hh"
+#include "net.hh"
+
+namespace {
+
+constexpr auto CLIENT_TIMEOUT = std::chrono::seconds(30);
+constexpr size_t BUFFER_SIZE = 1 * 1024 * 1024;
+
+int g_signal_pipe[2];
+
+void signal_handler(int) {
+ write(g_signal_pipe[1], "1", 1);
+}
+
+class ServerImpl : public Server {
+ public:
+ ServerImpl()
+ : looper_(Looper::create()) {}
+
+ ~ServerImpl() override {
+ for (auto& fd : listen_) {
+ looper_->remove(fd.get());
+ }
+ }
+
+ bool setup(std::shared_ptr<Logger> logger, std::string const& host,
+ uint16_t port) override {
+ logger_ = logger;
+ listen_ = net::bind_and_listen(logger.get(), host, port);
+ if (listen_.empty()) {
+ logger->err("Unable to listen");
+ return false;
+ }
+
+ for (auto& fd : listen_) {
+ looper_->add(fd.get(), Looper::EVENT_READ,
+ std::bind(&ServerImpl::listen_read, this, fd.get(),
+ std::placeholders::_1));
+ }
+ return true;
+ }
+
+ bool run(std::shared_ptr<Logger> logger) override {
+ logger_ = logger;
+
+ bool close_pipe;
+ if (pipe(g_signal_pipe) == 0) {
+ close_pipe = true;
+ struct sigaction sa;
+ sa.sa_handler = signal_handler;
+ sigemptyset(&sa.sa_mask);
+ sa.sa_flags = SA_RESTART;
+ sigaction(SIGINT, &sa, nullptr);
+ sigaction(SIGTERM, &sa, nullptr);
+ sigaction(SIGQUIT, &sa, nullptr);
+
+ looper_->add(g_signal_pipe[0], Looper::EVENT_READ,
+ std::bind(&ServerImpl::signal, this, std::placeholders::_1));
+ } else {
+ logger->warn("Unable to create a pipe: %s", strerror(errno));
+ close_pipe = false;
+ }
+
+ bool ret = looper_->run(logger.get());
+
+ if (close_pipe) {
+ looper_->remove(g_signal_pipe[0]);
+ close(g_signal_pipe[0]);
+ close(g_signal_pipe[1]);
+ }
+ return ret;
+ }
+
+ private:
+ void signal(uint8_t) {
+ looper_->quit();
+ }
+
+ struct Client {
+ unique_fd fd_;
+ uint8_t events_{Looper::EVENT_WRITE};
+ std::unique_ptr<Buffer> in_;
+ std::unique_ptr<Buffer> out_;
+ uint32_t watchdog_{0};
+ std::chrono::steady_clock::time_point last_event_;
+
+ explicit Client(unique_fd fd)
+ : fd_(std::move(fd)) {}
+ };
+
+ void listen_read(int fd, uint8_t events) {
+ if (events & Looper::EVENT_ERROR) {
+ auto it = std::find_if(listen_.begin(), listen_.end(),
+ [fd](auto& listen) {
+ return listen.get() == fd;
+ });
+ if (it != listen_.end()) {
+ logger_->warn("Listening socket error");
+ listen_.erase(it);
+ if (listen_.empty()) {
+ logger_->err("No listening sockets left");
+ looper_->quit();
+ return;
+ }
+ } else {
+ assert(false);
+ }
+ return;
+ }
+
+ unique_fd client_fd = net::accept(logger_.get(), fd);
+ if (client_fd) {
+ logger_->dbg("New client: %d", client_fd.get());
+ auto client = new Client(std::move(client_fd));
+ client->in_ = Buffer::fixed(BUFFER_SIZE);
+ client->out_ = Buffer::fixed(BUFFER_SIZE);
+ clients_.push_back(client);
+
+ looper_->add(client->fd_.get(), client->events_,
+ std::bind(&ServerImpl::client_event,
+ this, client,
+ std::placeholders::_1));
+ client->watchdog_ = looper_->schedule(
+ CLIENT_TIMEOUT.count(),
+ std::bind(&ServerImpl::client_check_timeout,
+ this, client,
+ std::placeholders::_1));
+ }
+ }
+
+ void client_event(Client* client, uint8_t events) {
+ if (events & Looper::EVENT_ERROR) {
+ logger_->warn("%d: Unexpected client error", client->fd_.get());
+ remove_client(client);
+ return;
+ }
+
+ client->last_event_ = std::chrono::steady_clock::now();
+
+ if (events & Looper::EVENT_READ && client->in_) {
+ while (true) {
+ size_t avail;
+ auto* ptr = client->in_->wbuf(65535, avail);
+ if (avail == 0)
+ break;
+ auto ret = read(client->fd_.get(), ptr, avail);
+ if (ret < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ break;
+ if (errno == EINTR)
+ continue;
+ logger_->warn("%d: Read error: %s", client->fd_.get(),
+ strerror(errno));
+ break;
+ } else if (ret == 0) {
+ logger_->dbg("%d: Connection closed", client->fd_.get());
+ if (client->out_->empty()) {
+ remove_client(client);
+ return;
+ } else {
+ // Might be only read side that is shutdown, try writing the
+ // rest before we give up.
+ client->in_.reset();
+ break;
+ }
+ } else {
+ client->in_->wcommit(ret);
+ if (static_cast<size_t>(ret) < avail)
+ break;
+ }
+ }
+ }
+
+ // Always try to write, usually we write as a result of a read above
+ // so saves having to call back to the loop just to answer.
+ // And if there wasn't a read event then we already know there was an write
+ // event.
+ if (/* events & Looper::EVENT_WRITE */ true) {
+ while (true) {
+ size_t avail;
+ auto* ptr = client->out_->rbuf(65535, avail);
+ if (avail == 0)
+ break;
+ auto ret = write(client->fd_.get(), ptr, avail);
+ if (ret < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ break;
+ if (errno == EINTR)
+ continue;
+ logger_->warn("%d: Write error: %s", client->fd_.get(),
+ strerror(errno));
+ break;
+ } else {
+ client->out_->rcommit(ret);
+ if (static_cast<size_t>(ret) < avail)
+ break;
+ }
+ }
+ }
+
+ auto const old_events = client->events_;
+ client->events_ = 0;
+
+ if (client->in_) {
+ if (!client->in_->full())
+ client->events_ |= Looper::EVENT_READ;
+ if (!client->out_->empty())
+ client->events_ |= Looper::EVENT_WRITE;
+ } else {
+ if (client->out_->empty()) {
+ remove_client(client);
+ return;
+ } else {
+ client->events_ |= Looper::EVENT_WRITE;
+ }
+ }
+
+ if (client->events_ != old_events) {
+ looper_->update(client->fd_.get(), client->events_);
+ }
+ }
+
+ void client_check_timeout(Client* client, uint32_t) {
+ auto since_last = std::chrono::steady_clock::now() - client->last_event_;
+ if (since_last >= CLIENT_TIMEOUT) {
+ client->watchdog_ = 0;
+ remove_client(client);
+ return;
+ }
+ auto delay = CLIENT_TIMEOUT - since_last;
+ client->watchdog_ = looper_->schedule(
+ std::chrono::duration_cast<std::chrono::seconds>(delay).count(),
+ std::bind(&ServerImpl::client_check_timeout,
+ this, client,
+ std::placeholders::_1));
+ }
+
+ void remove_client(Client* client) {
+ if (client->fd_)
+ looper_->remove(client->fd_.get());
+ if (client->watchdog_)
+ looper_->cancel(client->watchdog_);
+ auto it = std::find(clients_.begin(), clients_.end(), client);
+ if (it != clients_.end()) {
+ clients_.erase(it);
+ } else {
+ assert(false);
+ }
+ delete client;
+ }
+
+ std::shared_ptr<Looper> looper_;
+ std::shared_ptr<Logger> logger_;
+ std::vector<unique_fd> listen_;
+ std::vector<Client*> clients_;
+};
+
+} // namespace
+
+// static
+std::unique_ptr<Server> Server::create() {
+ return std::make_unique<ServerImpl>();
+}