summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoel Klinghed <the_jk@spawned.biz>2025-02-20 22:54:56 +0100
committerJoel Klinghed <the_jk@spawned.biz>2025-02-20 22:54:56 +0100
commitb4d6df902253637f24647d3db2bc3781d69eec1c (patch)
treed8bf9ac04a270fabdfee1c15628c702471ef8bf5
parent441cafc7124f633e5abc684e85a11ce3c991f6ae (diff)
Initial commitHEADmain
-rw-r--r--meson.build62
-rw-r--r--src/args.cc293
-rw-r--r--src/args.hh56
-rw-r--r--src/buffer.cc233
-rw-r--r--src/buffer.hh25
-rw-r--r--src/logger.hh51
-rw-r--r--src/logger_base.cc68
-rw-r--r--src/logger_base.hh25
-rw-r--r--src/logger_file.cc52
-rw-r--r--src/logger_file.hh16
-rw-r--r--src/logger_null.cc18
-rw-r--r--src/logger_stdio.cc39
-rw-r--r--src/logger_syslog.cc49
-rw-r--r--src/logger_syslog.hh14
-rw-r--r--src/looper.hh38
-rw-r--r--src/looper_poll.cc224
-rw-r--r--src/main.cc153
-rw-r--r--src/net.cc136
-rw-r--r--src/net.hh22
-rw-r--r--src/ro_buffer.cc27
-rw-r--r--src/ro_buffer.hh23
-rw-r--r--src/server.cc275
-rw-r--r--src/server.hh26
-rw-r--r--src/unique_fd.cc9
-rw-r--r--src/unique_fd.hh48
-rw-r--r--subprojects/protobuf.wrap16
26 files changed, 1998 insertions, 0 deletions
diff --git a/meson.build b/meson.build
new file mode 100644
index 0000000..847c660
--- /dev/null
+++ b/meson.build
@@ -0,0 +1,62 @@
+project(
+ 'buildhelper',
+ 'cpp',
+ version : '0.1',
+ default_options : [
+ 'warning_level=3',
+ 'cpp_std=c++17',
+ ],
+)
+
+protobuf_dep = dependency(
+ 'protobuf-lite',
+ fallback: ['protobuf', 'protobuf_lite_dep'],
+)
+
+protoc = find_program('protoc', native: true)
+
+protoc_gen = generator(
+ protoc,
+ output: ['@BASENAME@.pb.cc', '@BASENAME@.pb.h'],
+ depfile: '@BASENAME@.deps',
+ arguments: ['-I@SOURCE_ROOT@/apis/remote-apis', '-I@SOURCE_ROOT@/apis/googleapis', '--cpp_out=@BUILD_DIR@', '--dependency_out=@DEPFILE@', '@INPUT@'],
+)
+
+google_apis = protoc_gen.process(
+ [
+ 'apis/googleapis/google/api/annotations.proto',
+ 'apis/googleapis/google/api/client.proto',
+ 'apis/googleapis/google/api/http.proto',
+ 'apis/googleapis/google/api/launch_stage.proto',
+ 'apis/googleapis/google/longrunning/operations.proto',
+ 'apis/googleapis/google/rpc/status.proto',
+ ],
+ preserve_path_from: meson.project_source_root() + '/apis/googleapis',
+)
+
+remote_apis = protoc_gen.process(
+ [
+ 'apis/remote-apis/build/bazel/semver/semver.proto',
+ 'apis/remote-apis/build/bazel/remote/execution/v2/remote_execution.proto',
+ ],
+ preserve_path_from: meson.project_source_root() + '/apis/remote-apis',
+)
+
+libapis = static_library('apis', [google_apis, remote_apis])
+
+executable('buildhelper',
+ 'src/args.cc',
+ 'src/buffer.cc',
+ 'src/logger_base.cc',
+ 'src/logger_file.cc',
+ 'src/logger_null.cc',
+ 'src/logger_stdio.cc',
+ 'src/logger_syslog.cc',
+ 'src/looper_poll.cc',
+ 'src/main.cc',
+ 'src/net.cc',
+ 'src/ro_buffer.cc',
+ 'src/server.cc',
+ 'src/unique_fd.cc',
+ link_with: libapis,
+ install : true)
diff --git a/src/args.cc b/src/args.cc
new file mode 100644
index 0000000..a89c418
--- /dev/null
+++ b/src/args.cc
@@ -0,0 +1,293 @@
+#include "args.hh"
+
+#include <cassert>
+#include <iostream>
+#include <unordered_map>
+#include <vector>
+
+namespace {
+
+class OptionImpl : public OptionWithArg {
+public:
+ OptionImpl(char short_name, std::string long_name, std::string description,
+ bool require_arg, std::string arg_description)
+ : short_name_(short_name),
+ long_name_(std::move(long_name)),
+ description_(std::move(description)),
+ require_arg_(require_arg),
+ arg_description_(std::move(arg_description)) {
+ }
+
+ bool is_set() const override { return set_; }
+
+ std::string const& arg() const override { return arg_; }
+
+ char short_name() const { return short_name_; }
+
+ std::string const& long_name() const { return long_name_; }
+
+ std::string const& description() const { return description_; }
+
+ bool require_arg() const { return require_arg_; }
+
+ std::string const& arg_description() const { return arg_description_; }
+
+ void reset() {
+ set_ = false;
+ arg_.clear();
+ }
+
+ void set() {
+ set_ = true;
+ }
+
+ void set_arg(std::string arg) {
+ arg_ = std::move(arg);
+ }
+
+private:
+ char const short_name_;
+ std::string const long_name_;
+ std::string const description_;
+ bool const require_arg_;
+ std::string const arg_description_;
+ bool set_ = false;
+ std::string arg_;
+};
+
+class ArgsImpl : public Args {
+public:
+ ArgsImpl() = default;
+
+ std::shared_ptr<Option const> add_option(char short_name,
+ std::string long_name,
+ std::string description) override {
+ prepare_option(short_name, long_name);
+ options_.push_back(std::make_shared<OptionImpl>(short_name,
+ std::move(long_name),
+ std::move(description),
+ false, std::string()));
+ return options_.back();
+ }
+
+ std::shared_ptr<OptionWithArg const> add_option_with_arg(
+ char short_name,
+ std::string long_name,
+ std::string description,
+ std::string arg_description) override {
+ prepare_option(short_name, long_name);
+ options_.push_back(std::make_shared<OptionImpl>(short_name,
+ std::move(long_name),
+ std::move(description),
+ true, arg_description));
+ return options_.back();
+ }
+
+ bool run(int argc, char** argv, std::string_view prgname, std::ostream& err,
+ std::vector<std::string>& out) override {
+ for (int a = 1; a < argc; ++a) {
+ if (argv[a][0] == '-') {
+ if (argv[a][1] == '-') {
+ if (argv[a][2] != '\0') {
+ // A long name with optional "=" argument
+ size_t len = 2;
+ while (argv[a][len] != '=' && argv[a][len])
+ ++len;
+ std::string name(argv[a] + 2, len - 2);
+ auto it = long_names_.find(name);
+ if (it == long_names_.end()) {
+ err << prgname << ": unrecognized option '--"
+ << name << "'" << std::endl;
+ return false;
+ }
+ auto* opt = options_[it->second].get();
+ opt->set();
+ if (argv[a][len]) {
+ if (opt->require_arg()) {
+ opt->set_arg(std::string(argv[a] + len + 1));
+ } else {
+ err << prgname << ": option '--"
+ << name << "' doesn't allow an argument" << std::endl;
+ return false;
+ }
+ } else {
+ if (opt->require_arg()) {
+ if (a + 1 >= argc) {
+ err << prgname << ": option '--"
+ << name << "' requires an argument" << std::endl;
+ return false;
+ } else {
+ opt->set_arg(argv[++a]);
+ }
+ }
+ }
+ continue;
+ } else {
+ // "--", all following values are arguments
+ for (++a; a < argc; ++a)
+ out.push_back(argv[a]);
+ break;
+ }
+ } else if (argv[a][1] != '\0') {
+ // One or more short names
+ for (auto* name = argv[a] + 1; *name; ++name) {
+ auto it = short_names_.find(*name);
+ if (it == short_names_.end()) {
+ err << prgname << ": invalid option -- '"
+ << *name << "'" << std::endl;
+ return false;
+ }
+ auto* opt = options_[it->second].get();
+ opt->set();
+ if (opt->require_arg()) {
+ if (a + 1 >= argc) {
+ err << prgname << ": option requires an argument"
+ << " -- '" << *name << "'" << std::endl;
+ return false;
+ } else {
+ opt->set_arg(argv[++a]);
+ }
+ }
+ }
+ continue;
+ } else {
+ // single "-", treat as argument
+ }
+ }
+
+ out.push_back(argv[a]);
+ }
+ return true;
+ }
+
+ void print_descriptions(std::ostream& out,
+ uint32_t column_width) const override {
+ uint32_t max_left = 0;
+ for (auto const& option : options_) {
+ uint32_t left = 0;
+ if (option->short_name() != '\0') {
+ if (!option->long_name().empty()) {
+ left = 6 + option->long_name().size(); // -S, --long
+ } else {
+ left = 2; // -S
+ }
+ } else if (!option->long_name().empty()) {
+ left = 2 + option->long_name().size(); // --long
+ }
+ if (option->require_arg())
+ left += 1 + option->arg_description().size(); // (=| )ARG
+ if (left > 0)
+ left += 2; // Need at least two spaces between option and desc
+ // Prefix with two spaces (either infront of option or desc)
+ left += 2;
+
+ if (left > max_left)
+ max_left = left;
+ }
+
+ uint32_t const avail_right =
+ max_left > column_width ? 0 : column_width - max_left;
+
+ if (avail_right < 20) {
+ // Fallback mode, description on its own row.
+ for (auto const& option : options_) {
+ print_option(out, *option);
+ out << '\n' << option->description() << '\n';
+ }
+ return;
+ }
+
+ // Check if all descriptions fit, justify to the right on a 80 col width
+ bool all_desc_fit = true;
+ uint32_t max_right = 0;
+ for (auto const& option : options_) {
+ uint32_t right = option->description().size();
+ if (right > avail_right) {
+ all_desc_fit = false;
+ break;
+ }
+ if (right > max_right)
+ max_right = right;
+ }
+
+ if (all_desc_fit)
+ max_left = std::max(80u, column_width) - max_right;
+
+ for (auto const& option : options_) {
+ out << " ";
+ uint32_t left = 2 + print_option(out, *option);
+ std::fill_n(std::ostreambuf_iterator<char>(out), max_left - left, ' ');
+
+ if (option->description().size() <= avail_right) {
+ out << option->description() << '\n';
+ continue;
+ }
+
+ // Wrap description
+ size_t last = 0;
+ bool first = true;
+ while (true) {
+ if (first) {
+ first = false;
+ } else {
+ std::fill_n(std::ostreambuf_iterator<char>(out), max_left, ' ');
+ }
+
+ size_t end = last + avail_right;
+ if (end >= option->description().size()) {
+ out << option->description().substr(last) << '\n';
+ break;
+ }
+ size_t space = option->description().rfind(' ', end);
+ if (space == std::string::npos || space < last) {
+ space = end;
+ }
+ out << option->description().substr(last, space - last) << '\n';
+ last = space < end ? space + 1 : end;
+ }
+ }
+ }
+
+private:
+ void prepare_option(char short_name, std::string const& long_name) {
+ if (short_name != '\0')
+ short_names_.emplace(short_name, options_.size());
+ if (!long_name.empty()) {
+ assert(long_name.find('=') == std::string::npos);
+ long_names_.emplace(long_name, options_.size());
+ }
+ }
+
+ size_t print_option(std::ostream& out, const OptionImpl& option) const {
+ bool only_short = false;
+ size_t ret = 0;
+ if (option.short_name() != '\0') {
+ out << '-' << option.short_name();
+ if (!option.long_name().empty()) {
+ out << ", --" << option.long_name();
+ ret = 6 + option.long_name().size();
+ } else {
+ ret = 2;
+ only_short = true;
+ }
+ } else if (!option.long_name().empty()) {
+ out << "--" << option.long_name();
+ ret = 2 + option.long_name().size();
+ }
+ if (option.require_arg()) {
+ out << (only_short ? ' ' : '=') << option.arg_description();
+ ret += 1 + option.arg_description().size();
+ }
+ return ret;
+ }
+
+ std::vector<std::shared_ptr<OptionImpl>> options_;
+ std::unordered_map<char, size_t> short_names_;
+ std::unordered_map<std::string, size_t> long_names_;
+};
+
+} // namespace
+
+std::unique_ptr<Args> Args::create() {
+ return std::make_unique<ArgsImpl>();
+}
diff --git a/src/args.hh b/src/args.hh
new file mode 100644
index 0000000..8442b16
--- /dev/null
+++ b/src/args.hh
@@ -0,0 +1,56 @@
+#ifndef ARGS_HH
+#define ARGS_HH
+
+#include <iosfwd>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <vector>
+
+class Option {
+public:
+ virtual ~Option() = default;
+
+ virtual bool is_set() const = 0;
+
+protected:
+ Option() = default;
+ Option(Option const&) = delete;
+ Option& operator=(Option const&) = delete;
+};
+
+class OptionWithArg : public Option {
+public:
+ virtual std::string const& arg() const = 0;
+};
+
+class Args {
+public:
+ virtual ~Args() = default;
+
+ static std::unique_ptr<Args> create();
+
+ virtual std::shared_ptr<Option const> add_option(
+ char short_name,
+ std::string long_name,
+ std::string description) = 0;
+
+ virtual std::shared_ptr<OptionWithArg const> add_option_with_arg(
+ char short_name,
+ std::string long_name,
+ std::string description,
+ std::string arg_description) = 0;
+
+ virtual bool run(int argc, char** argv, std::string_view prgname,
+ std::ostream& err, std::vector<std::string>& out) = 0;
+
+ virtual void print_descriptions(std::ostream& out,
+ uint32_t column_width) const = 0;
+
+protected:
+ Args() = default;
+ Args(Args const&) = delete;
+ Args& operator=(Args const&) = delete;
+};
+
+#endif // ARGS_HH
diff --git a/src/buffer.cc b/src/buffer.cc
new file mode 100644
index 0000000..d39ec86
--- /dev/null
+++ b/src/buffer.cc
@@ -0,0 +1,233 @@
+#include "buffer.hh"
+
+#include <algorithm>
+#include <cassert>
+#include <vector>
+
+namespace {
+
+class Round : public Buffer {
+public:
+ explicit Round(size_t size)
+ : data_(std::make_unique<char[]>(size)), size_(size) {}
+
+ bool empty() const override {
+ return rptr_ == wptr_ && !full_;
+ }
+
+ bool full() const override {
+ return rptr_ == wptr_ && full_;
+ }
+
+ void clear() override {
+ rptr_ = wptr_ = 0;
+ full_ = false;
+ }
+
+ char const* rbuf(size_t want, size_t& avail) override {
+ if (rptr_ < wptr_) {
+ avail = wptr_ - rptr_;
+ } else if (rptr_ == wptr_ && !full_) {
+ avail = 0;
+ } else {
+ avail = size_ - rptr_;
+ if (want > avail) {
+ auto* target = data_.get();
+ for (; rptr_ < size_; ++rptr_) {
+ std::swap(*target, data_[rptr_]);
+ ++target;
+ }
+ rptr_ = 0;
+ wptr_ += avail;
+ if (wptr_ == size_) {
+ assert(full_);
+ wptr_ = 0;
+ avail = size_ - rptr_;
+ } else {
+ avail = wptr_ - rptr_;
+ }
+ }
+ }
+ return data_.get() + rptr_;
+ }
+
+ void rcommit(size_t bytes) override {
+ if (bytes == 0)
+ return;
+ full_ = false;
+ assert(rptr_ < wptr_ ? rptr_ + bytes <= wptr_ : rptr_ + bytes <= size_);
+ rptr_ += bytes;
+ if (rptr_ == size_)
+ rptr_ = 0;
+ if (rptr_ == wptr_)
+ rptr_ = wptr_ = 0;
+ }
+
+ char* wbuf(size_t request, size_t& avail) override {
+ if (wptr_ < rptr_) {
+ avail = rptr_ - wptr_;
+ } else if (rptr_ == wptr_ && full_) {
+ avail = 0;
+ } else {
+ avail = size_ - wptr_;
+ if (avail < request && wptr_ != 0 && rptr_ != 0) {
+ std::copy(data_.get() + rptr_, data_.get() + wptr_, data_.get());
+ auto size = wptr_ - rptr_;
+ wptr_ = size;
+ rptr_ = 0;
+ avail = size_ - wptr_;
+ }
+ }
+ return data_.get() + wptr_;
+ }
+
+ void wcommit(size_t bytes) override {
+ if (bytes == 0)
+ return;
+ assert(wptr_ < rptr_ ? wptr_ + bytes <= rptr_ : wptr_ + bytes <= size_);
+ wptr_ += bytes;
+ if (wptr_ == size_)
+ wptr_ = 0;
+ if (wptr_ == rptr_)
+ full_ = true;
+ }
+
+private:
+ std::unique_ptr<char[]> data_;
+ size_t const size_;
+ size_t rptr_{0};
+ size_t wptr_{0};
+ bool full_{false};
+};
+
+class Growing : public Buffer {
+public:
+ Growing(size_t base_size, size_t max_size)
+ : base_size_(base_size), max_size_(max_size), data_(base_size) {
+ }
+
+ bool empty() const override {
+ return rptr_ == wptr_;
+ }
+
+ bool full() const override {
+ return rptr_ == 0 && wptr_ == max_size_;
+ }
+
+ void clear() override {
+ data_.resize(base_size_);
+ rptr_ = wptr_ = 0;
+ }
+
+ char const* rbuf(size_t, size_t& avail) override {
+ avail = wptr_ - rptr_;
+ return data_.data() + rptr_;
+ }
+
+ void rcommit(size_t bytes) override {
+ assert(rptr_ + bytes <= wptr_);
+ rptr_ += bytes;
+ if (rptr_ == wptr_)
+ rptr_ = wptr_ = 0;
+ }
+
+ char* wbuf(size_t request, size_t& avail) override {
+ avail = data_.size() - wptr_;
+ if (request > avail && rptr_ > 0) {
+ std::copy(data_.begin() + rptr_, data_.begin() + wptr_, data_.begin());
+ wptr_ -= rptr_;
+ rptr_ = 0;
+ avail = data_.size() - wptr_;
+ }
+ if (request > avail && data_.size() < max_size_) {
+ data_.resize(
+ std::min(max_size_,
+ data_.size() + std::max(request - avail,
+ (max_size_ - base_size_) / 8)));
+ avail = data_.size() - wptr_;
+ }
+ return data_.data() + wptr_;
+ }
+
+ void wcommit(size_t bytes) override {
+ assert(wptr_ + bytes <= data_.size());
+ wptr_ += bytes;
+ }
+
+private:
+ size_t const base_size_;
+ size_t const max_size_;
+ std::vector<char> data_;
+ size_t rptr_{0};
+ size_t wptr_{0};
+};
+
+class Null : public Buffer {
+public:
+ bool empty() const override {
+ return true;
+ }
+
+ char const* rbuf(size_t, size_t& avail) override {
+ avail = 0;
+ return buf_;
+ }
+
+ void rcommit(size_t bytes) override {
+ assert(bytes == 0);
+ }
+
+ bool full() const override {
+ return false;
+ }
+
+ void clear() override {}
+
+ char* wbuf(size_t, size_t& avail) override {
+ avail = sizeof(buf_);
+ return buf_;
+ }
+
+ void wcommit(size_t) override {
+ }
+
+private:
+ char buf_[4096];
+};
+
+} // namespace
+
+std::unique_ptr<Buffer> Buffer::fixed(size_t size) {
+ return std::make_unique<Round>(size);
+}
+
+std::unique_ptr<Buffer> Buffer::growing(size_t base_size, size_t max_size) {
+ return std::make_unique<Growing>(base_size, max_size);
+}
+
+std::unique_ptr<Buffer> Buffer::null() {
+ return std::make_unique<Null>();
+}
+
+size_t Buffer::write(void const* data, size_t len) {
+ assert(data);
+ if (len == 0)
+ return 0;
+ auto* d = reinterpret_cast<char const*>(data);
+ size_t wrote = 0;
+ while (true) {
+ size_t avail;
+ auto want = len - wrote;
+ auto* ptr = wbuf(want, avail);
+ if (avail == 0)
+ return wrote;
+ if (avail >= want) {
+ std::copy_n(d + wrote, want, ptr);
+ wcommit(want);
+ return len;
+ }
+ std::copy_n(d + wrote, avail, ptr);
+ wcommit(avail);
+ wrote += avail;
+ }
+}
diff --git a/src/buffer.hh b/src/buffer.hh
new file mode 100644
index 0000000..ae02e23
--- /dev/null
+++ b/src/buffer.hh
@@ -0,0 +1,25 @@
+#ifndef BUFFER_HH
+#define BUFFER_HH
+
+#include "ro_buffer.hh"
+
+#include <memory>
+
+class Buffer : public RoBuffer {
+public:
+ static std::unique_ptr<Buffer> fixed(size_t size);
+ static std::unique_ptr<Buffer> growing(size_t base_size, size_t max_size);
+ // Acts as /dev/null, ie always empty, can write anything to it.
+ static std::unique_ptr<Buffer> null();
+
+ virtual bool full() const = 0;
+
+ virtual void clear() = 0;
+
+ virtual char* wbuf(size_t request, size_t& avail) = 0;
+ virtual void wcommit(size_t bytes) = 0;
+
+ size_t write(void const* data, size_t len);
+};
+
+#endif // BUFFER_HH
diff --git a/src/logger.hh b/src/logger.hh
new file mode 100644
index 0000000..48d21a7
--- /dev/null
+++ b/src/logger.hh
@@ -0,0 +1,51 @@
+#ifndef LOGGER_HH
+#define LOGGER_HH
+
+#include <memory>
+
+class Logger {
+public:
+ virtual ~Logger() = default;
+
+ static std::unique_ptr<Logger> create_stdio();
+ static std::unique_ptr<Logger> create_null();
+
+ virtual void err(char const* format, ...)
+#if HAVE_ATTRIBUTE_FORMAT
+ __attribute__((format(printf, 2, 3))) // this takes up 1
+#endif // HAVE_ATTRIBUTE_FORMAT
+ = 0;
+
+ virtual void warn(char const* format, ...)
+#if HAVE_ATTRIBUTE_FORMAT
+ __attribute__((format(printf, 2, 3)))
+#endif // HAVE_ATTRIBUTE_FORMAT
+ = 0;
+
+ virtual void info(char const* format, ...)
+#if HAVE_ATTRIBUTE_FORMAT
+ __attribute__((format(printf, 2, 3)))
+#endif // HAVE_ATTRIBUTE_FORMAT
+ = 0;
+
+#ifdef NDEBUG
+ void dbg(char const*, ...)
+#if HAVE_ATTRIBUTE_FORMAT
+ __attribute__((format(printf, 2, 3)))
+#endif // HAVE_ATTRIBUTE_FORMAT
+ {}
+#else
+ virtual void dbg(char const* format, ...)
+#if HAVE_ATTRIBUTE_FORMAT
+ __attribute__((format(printf, 2, 3)))
+#endif // HAVE_ATTRIBUTE_FORMAT
+ = 0;
+#endif
+
+protected:
+ Logger() = default;
+ Logger(Logger const&) = delete;
+ Logger& operator=(Logger const&) = delete;
+};
+
+#endif // LOGGER_HH
diff --git a/src/logger_base.cc b/src/logger_base.cc
new file mode 100644
index 0000000..0bd4e98
--- /dev/null
+++ b/src/logger_base.cc
@@ -0,0 +1,68 @@
+#include "logger_base.hh"
+
+#include <cassert>
+#include <cstdarg>
+#include <cstdio>
+#include <cstdlib>
+
+void LoggerBase::err(char const* format, ...) {
+ va_list args;
+ va_start(args, format);
+ char* tmp = nullptr;
+ auto len = vasprintf(&tmp, format, args);
+ va_end(args);
+ if (len == -1) {
+ assert(false);
+ return;
+ }
+ while (len > 0 && tmp[len - 1] == '\n') --len;
+ msg(Level::ERR, std::string_view(tmp, len));
+ free(tmp);
+}
+
+void LoggerBase::warn(char const* format, ...) {
+ va_list args;
+ va_start(args, format);
+ char* tmp = nullptr;
+ auto len = vasprintf(&tmp, format, args);
+ va_end(args);
+ if (len == -1) {
+ assert(false);
+ return;
+ }
+ while (len > 0 && tmp[len - 1] == '\n') --len;
+ msg(Level::WARN, std::string_view(tmp, len));
+ free(tmp);
+}
+
+void LoggerBase::info(char const* format, ...) {
+ va_list args;
+ va_start(args, format);
+ char* tmp = nullptr;
+ auto len = vasprintf(&tmp, format, args);
+ va_end(args);
+ if (len == -1) {
+ assert(false);
+ return;
+ }
+ while (len > 0 && tmp[len - 1] == '\n') --len;
+ msg(Level::INFO, std::string_view(tmp, len));
+ free(tmp);
+}
+
+#ifndef NDEBUG
+void LoggerBase::dbg(char const* format, ...) {
+ va_list args;
+ va_start(args, format);
+ char* tmp = nullptr;
+ auto len = vasprintf(&tmp, format, args);
+ va_end(args);
+ if (len == -1) {
+ assert(false);
+ return;
+ }
+ while (len > 0 && tmp[len - 1] == '\n') --len;
+ msg(Level::DBG, std::string_view(tmp, len));
+ free(tmp);
+}
+#endif
diff --git a/src/logger_base.hh b/src/logger_base.hh
new file mode 100644
index 0000000..56911f0
--- /dev/null
+++ b/src/logger_base.hh
@@ -0,0 +1,25 @@
+#ifndef LOGGER_BASE_HH
+#define LOGGER_BASE_HH
+
+#include <string_view>
+
+#include "logger.hh"
+
+class LoggerBase : public Logger {
+public:
+ void err(char const* format, ...) override;
+ void warn(char const* format, ...) override;
+ void info(char const* format, ...) override;
+#ifndef NDEBUG
+ void dbg(char const* format, ...) override;
+#endif
+
+protected:
+ enum class Level {
+ ERR, WARN, INFO, DBG
+ };
+
+ virtual void msg(Level level, std::string_view str) = 0;
+};
+
+#endif // LOGGER_BASE_HH
diff --git a/src/logger_file.cc b/src/logger_file.cc
new file mode 100644
index 0000000..d813b2a
--- /dev/null
+++ b/src/logger_file.cc
@@ -0,0 +1,52 @@
+#include "logger_file.hh"
+
+#include <fstream>
+#include <mutex>
+
+#include "logger_base.hh"
+
+namespace {
+
+class LoggerFileImpl : public LoggerBase {
+public:
+ explicit LoggerFileImpl(std::filesystem::path const& path)
+ : out_(path, std::ios::out | std::ios::app) {}
+
+ bool good() const {
+ return out_.good();
+ }
+
+protected:
+ void msg(Level lvl, std::string_view msg) override {
+ std::lock_guard<std::mutex> lock(mutex_);
+ switch (lvl) {
+ case Level::ERR:
+ out_ << "Error: " << msg << std::endl;
+ break;
+ case Level::WARN:
+ out_ << "Warning: " << msg << std::endl;
+ break;
+ case Level::INFO:
+ out_ << msg << std::endl;
+ break;
+ case Level::DBG:
+ out_ << "Debug: " << msg << std::endl;
+ break;
+ }
+ }
+
+private:
+ std::mutex mutex_;
+ std::fstream out_;
+};
+
+} // namespace
+
+std::unique_ptr<Logger> LoggerFile::create(std::filesystem::path const& path,
+ Logger* fallback) {
+ auto logger = std::make_unique<LoggerFileImpl>(path);
+ if (logger->good())
+ return logger;
+ fallback->warn("Unable to open %s for appending.", path.c_str());
+ return nullptr;
+}
diff --git a/src/logger_file.hh b/src/logger_file.hh
new file mode 100644
index 0000000..131798c
--- /dev/null
+++ b/src/logger_file.hh
@@ -0,0 +1,16 @@
+#ifndef LOGGER_FILE_HH
+#define LOGGER_FILE_HH
+
+#include <memory>
+#include <filesystem>
+
+#include "logger.hh"
+
+class LoggerFile : public Logger {
+ public:
+ // Can return nullptr, will write reason to fallback.
+ static std::unique_ptr<Logger> create(std::filesystem::path const& path,
+ Logger* fallback);
+};
+
+#endif // LOGGER_FILE_HH
diff --git a/src/logger_null.cc b/src/logger_null.cc
new file mode 100644
index 0000000..2294a70
--- /dev/null
+++ b/src/logger_null.cc
@@ -0,0 +1,18 @@
+#include "logger_base.hh"
+
+namespace {
+
+class LoggerNull : public LoggerBase {
+public:
+ LoggerNull() = default;
+
+protected:
+ void msg(Level, std::string_view) override {
+ }
+};
+
+} // namespace
+
+std::unique_ptr<Logger> Logger::create_null() {
+ return std::make_unique<LoggerNull>();
+}
diff --git a/src/logger_stdio.cc b/src/logger_stdio.cc
new file mode 100644
index 0000000..e1f26d2
--- /dev/null
+++ b/src/logger_stdio.cc
@@ -0,0 +1,39 @@
+#include "logger_base.hh"
+
+#include <iostream>
+#include <mutex>
+
+namespace {
+
+class LoggerStdio : public LoggerBase {
+public:
+ LoggerStdio() = default;
+
+protected:
+ void msg(Level lvl, std::string_view msg) override {
+ std::lock_guard<std::mutex> lock(mutex_);
+ switch (lvl) {
+ case Level::ERR:
+ std::cerr << "Error: " << msg << std::endl;
+ break;
+ case Level::WARN:
+ std::cout << "Warning: " << msg << std::endl;
+ break;
+ case Level::INFO:
+ std::cout << msg << std::endl;
+ break;
+ case Level::DBG:
+ std::cout << "Debug: " << msg << std::endl;
+ break;
+ }
+ }
+
+private:
+ std::mutex mutex_;
+};
+
+} // namespace
+
+std::unique_ptr<Logger> Logger::create_stdio() {
+ return std::make_unique<LoggerStdio>();
+}
diff --git a/src/logger_syslog.cc b/src/logger_syslog.cc
new file mode 100644
index 0000000..54cdaf3
--- /dev/null
+++ b/src/logger_syslog.cc
@@ -0,0 +1,49 @@
+#include "logger_syslog.hh"
+
+#include <string>
+#include <syslog.h>
+
+#include "logger_base.hh"
+
+namespace {
+
+class LoggerSyslogImpl : public LoggerBase {
+public:
+ explicit LoggerSyslogImpl(std::string const& prgname)
+ : ident_(prgname) {
+ openlog(ident_.c_str(), LOG_PID, LOG_DAEMON);
+ }
+
+ ~LoggerSyslogImpl() override {
+ closelog();
+ }
+
+protected:
+ void msg(Level lvl, std::string_view msg) override {
+ int prio;
+ switch (lvl) {
+ case Level::ERR:
+ prio = LOG_ERR;
+ break;
+ case Level::WARN:
+ prio = LOG_WARNING;
+ break;
+ case Level::INFO:
+ prio = LOG_INFO;
+ break;
+ case Level::DBG:
+ prio = LOG_DEBUG;
+ break;
+ }
+ syslog(prio, "%.*s", static_cast<int>(msg.length()), msg.data());
+ }
+
+private:
+ std::string const ident_;
+};
+
+} // namespace
+
+std::unique_ptr<Logger> LoggerSyslog::create(std::string const& prgname) {
+ return std::make_unique<LoggerSyslogImpl>(prgname);
+}
diff --git a/src/logger_syslog.hh b/src/logger_syslog.hh
new file mode 100644
index 0000000..1417edc
--- /dev/null
+++ b/src/logger_syslog.hh
@@ -0,0 +1,14 @@
+#ifndef LOGGER_SYSLOG_HH
+#define LOGGER_SYSLOG_HH
+
+#include <memory>
+#include <string>
+
+#include "logger.hh"
+
+class LoggerSyslog : public Logger {
+ public:
+ static std::unique_ptr<Logger> create(std::string const& prgname);
+};
+
+#endif // LOGGER_SYSLOG_HH
diff --git a/src/looper.hh b/src/looper.hh
new file mode 100644
index 0000000..3286cb1
--- /dev/null
+++ b/src/looper.hh
@@ -0,0 +1,38 @@
+#ifndef LOOPER_HH
+#define LOOPER_HH
+
+#include <functional>
+#include <memory>
+
+class Logger;
+
+class Looper {
+public:
+ constexpr static uint8_t EVENT_READ = 1;
+ constexpr static uint8_t EVENT_WRITE = 2;
+ constexpr static uint8_t EVENT_ERROR = 4;
+
+ virtual ~Looper() = default;
+
+ static std::unique_ptr<Looper> create();
+
+ virtual void add(int fd, uint8_t events,
+ std::function<void(uint8_t)> callback) = 0;
+ virtual void update(int fd, uint8_t events) = 0;
+ virtual void remove(int fd) = 0;
+
+ // Returned id is never 0
+ virtual uint32_t schedule(double delay,
+ std::function<void(uint32_t)> callback) = 0;
+ virtual void cancel(uint32_t id) = 0;
+
+ virtual bool run(Logger* logger) = 0;
+ virtual void quit() = 0;
+
+protected:
+ Looper() = default;
+ Looper(Looper const&) = delete;
+ Looper& operator=(Looper const&) = delete;
+};
+
+#endif // LOOPER_HH
diff --git a/src/looper_poll.cc b/src/looper_poll.cc
new file mode 100644
index 0000000..1945583
--- /dev/null
+++ b/src/looper_poll.cc
@@ -0,0 +1,224 @@
+#include "looper.hh"
+
+#include <algorithm>
+#include <cassert>
+#include <chrono>
+#include <deque>
+#include <errno.h>
+#include <poll.h>
+#include <string.h>
+#include <unordered_map>
+#include <vector>
+
+#include "logger.hh"
+
+namespace {
+
+class LooperPoll : public Looper {
+public:
+ LooperPoll() = default;
+
+ void add(int fd, uint8_t events,
+ std::function<void(uint8_t)> callback) override {
+ if (fd < 0)
+ return;
+ auto ret = entry_.emplace(fd, Entry(events));
+ if (!ret.second) {
+ assert(ret.first->second.delete_);
+ ret.first->second.delete_ = false;
+ ret.first->second.events_ = events;
+ }
+ ret.first->second.callback_ = std::move(callback);
+ }
+
+ void update(int fd, uint8_t events) override {
+ if (fd < 0)
+ return;
+ auto it = entry_.find(fd);
+ if (it == entry_.end() || it->second.delete_) {
+ assert(false);
+ return;
+ }
+ it->second.events_ = events;
+ }
+
+ void remove(int fd) override {
+ if (fd < 0)
+ return;
+ auto it = entry_.find(fd);
+ if (it == entry_.end())
+ return;
+ it->second.delete_ = true;
+ }
+
+ bool run(Logger* logger) override {
+ while (!quit_) {
+ int timeout;
+ if (scheduled_.empty()) {
+ timeout = -1;
+ } else {
+ auto now = std::chrono::steady_clock::now();
+ while (true) {
+ if (now < scheduled_.front().target_) {
+ auto delay = std::chrono::duration_cast<std::chrono::milliseconds>(
+ scheduled_.front().target_ - now);
+ if (delay.count() <= std::numeric_limits<int>::max())
+ timeout = delay.count();
+ else
+ timeout = std::numeric_limits<int>::max();
+ break;
+ }
+ auto id = scheduled_.front().id_;
+ auto callback = std::move(scheduled_.front().callback_);
+ scheduled_.pop_front();
+ callback(id);
+ if (scheduled_.empty()) {
+ timeout = -1;
+ break;
+ }
+ }
+ // Scheduled callbacks might call quit().
+ if (quit_)
+ break;
+ }
+ std::vector<struct pollfd> pollfd;
+ pollfd.reserve(entry_.size());
+ auto it = entry_.begin();
+ while (it != entry_.end()) {
+ if (it->second.delete_) {
+ it = entry_.erase(it);
+ } else {
+ struct pollfd tmp;
+ tmp.fd = it->first;
+ tmp.events = events_looper2poll(it->second.events_);
+ pollfd.push_back(std::move(tmp));
+ ++it;
+ }
+ }
+ int active = poll(pollfd.data(), pollfd.size(), timeout);
+ if (active < 0) {
+ if (errno == EINTR)
+ continue;
+ logger->err("Poll failed: %s", strerror(errno));
+ return false;
+ }
+ for (auto it2 = pollfd.begin(); active; ++it2) {
+ if (it2->revents == 0)
+ continue;
+ --active;
+ auto events = events_poll2looper(it2->revents);
+ if (events) {
+ it = entry_.find(it2->fd);
+ if (!it->second.delete_) {
+ events &= (it->second.events_ | EVENT_ERROR);
+ if (events) {
+ it->second.callback_(events);
+ }
+ }
+ }
+ }
+ }
+ // Reset quit_ so run() can be called again
+ quit_ = false;
+ return true;
+ }
+
+ void quit() override {
+ quit_ = true;
+ }
+
+ uint32_t schedule(double delay,
+ std::function<void(uint32_t)> callback) override {
+ assert(delay >= 0.0);
+ uint32_t id = next_schedule_id();
+ auto target = std::chrono::steady_clock::now() +
+ std::chrono::duration_cast<std::chrono::steady_clock::duration>(
+ std::chrono::duration<double>(delay));
+ auto insert = scheduled_.end();
+ while (insert != scheduled_.begin()) {
+ auto prev = insert - 1;
+ if (prev->target_ < target)
+ break;
+ insert = prev;
+ }
+ scheduled_.emplace(insert, std::move(callback), id, target);
+ return id;
+ }
+
+ void cancel(uint32_t id) override {
+ auto it = std::find_if(scheduled_.begin(), scheduled_.end(),
+ [id](auto const& scheduled) {
+ return scheduled.id_ == id;
+ });
+ if (it != scheduled_.end()) {
+ scheduled_.erase(it);
+ } else {
+ assert(false);
+ }
+ }
+
+private:
+ struct Entry {
+ uint8_t events_;
+ std::function<void(uint8_t)> callback_;
+ bool delete_;
+
+ explicit Entry(uint8_t events)
+ : events_(events), delete_(false) {}
+ };
+
+ struct Scheduled {
+ std::function<void(uint32_t)> callback_;
+ uint32_t id_;
+ std::chrono::steady_clock::time_point target_;
+
+ Scheduled(std::function<void(uint32_t)> callback, uint32_t id,
+ std::chrono::steady_clock::time_point target)
+ : callback_(std::move(callback)), id_(id), target_(target) {}
+ };
+
+ static short events_looper2poll(uint8_t events) {
+ short ret = 0;
+ if (events & EVENT_READ)
+ ret |= POLLIN | POLLPRI;
+ if (events & EVENT_WRITE)
+ ret |= POLLOUT;
+ return ret;
+ }
+
+ static uint8_t events_poll2looper(short events) {
+ uint8_t ret = 0;
+ if (events & (POLLIN | POLLPRI | POLLHUP))
+ ret |= EVENT_READ;
+ if (events & POLLOUT)
+ ret |= EVENT_WRITE;
+ if (events & (POLLERR | POLLNVAL))
+ ret |= EVENT_ERROR;
+ return ret;
+ }
+
+ uint32_t next_schedule_id() {
+ while (true) {
+ uint32_t ret = next_schedule_id_++;
+ if (ret) {
+ bool found = std::any_of(scheduled_.begin(), scheduled_.end(),
+ [ret](auto const& scheduled) {
+ return scheduled.id_ == ret;
+ });
+ if (!found)
+ return ret;
+ }
+ }
+ }
+
+ bool quit_{false};
+ std::unordered_map<int, Entry> entry_;
+ uint32_t next_schedule_id_{1};
+ std::deque<Scheduled> scheduled_;
+};
+
+} // namespace
+
+std::unique_ptr<Looper> Looper::create() {
+ return std::make_unique<LooperPoll>();
+}
diff --git a/src/main.cc b/src/main.cc
new file mode 100644
index 0000000..980adba
--- /dev/null
+++ b/src/main.cc
@@ -0,0 +1,153 @@
+#include <cerrno>
+#include <charconv>
+#include <cstring>
+#include <iostream>
+#include <unistd.h>
+
+#include "args.hh"
+#include "logger.hh"
+#include "logger_file.hh"
+#include "logger_syslog.hh"
+#include "server.hh"
+
+namespace {
+
+struct Context {
+ std::shared_ptr<Logger> logger_;
+ bool verbose_{false};
+ bool daemon_{false};
+ std::string host_{"localhost"};
+ uint16_t port_{12000};
+};
+
+bool parse_args(int argc, char** argv, std::shared_ptr<Logger> logger,
+ Context& context, int& exit_code) {
+ auto args = Args::create();
+ auto opt_help = args->add_option('h', "help", "display this text and exit");
+ auto opt_version = args->add_option(
+ 'V', "version", "display version and exit");
+ auto opt_verbose = args->add_option('v', "verbose", "be verbose");
+ auto opt_daemon = args->add_option(
+ 'd', "daemon", "fork into the background as a daemon");
+ auto opt_logfile = args->add_option_with_arg(
+ 'L', "log", "log to FILE instead of default ("
+ "syslog for daemon, stdout otherwise)", "FILE");
+ auto opt_host = args->add_option_with_arg(
+ 'h', "host", "HOST to listen for requests on (default localhost)",
+ "HOST");
+ auto opt_port = args->add_option_with_arg(
+ 'p', "port", "PORT to listen for requests on (default 12000)",
+ "PORT");
+ std::vector<std::string> extra;
+ if (!args->run(argc, argv, "buildhelper", std::cerr, extra)) {
+ std::cerr << "Try `buildhelper --help` for usage." << std::endl;
+ exit_code = 1;
+ return false;
+ }
+ if (opt_help->is_set()) {
+ std::cout << "Usage `buildhelper [OPTIONS]`.\n"
+ << "Runs a buildhelper instance, implementing the Remote Execution API. Answers requests from bazel and others.\n"
+ << "\n"
+ << "Options:" << std::endl;
+ args->print_descriptions(std::cout, 80);
+ exit_code = 0;
+ return false;
+ }
+ if (opt_version->is_set()) {
+ std::cout << "buildhelper 0.1 written by Joel Klinghed <the_jk@spawned.biz>"
+ << std::endl;
+ exit_code = 0;
+ return false;
+ }
+
+ context.verbose_ = opt_verbose->is_set();
+ context.daemon_ = opt_daemon->is_set();
+
+ if (opt_host->is_set()) {
+ context.host_ = opt_host->arg();
+ }
+
+ if (opt_port->is_set()) {
+ auto const& arg = opt_port->arg();
+ auto [ptr, err] =
+ std::from_chars(arg.data(), arg.data() + arg.size(), context.port_);
+ if (ptr != arg.data() + arg.size() || err != std::errc()) {
+ std::cerr << "Invalid port value: " << arg << std::endl;
+ exit_code = 1;
+ return false;
+ }
+ }
+
+ if (opt_logfile->is_set()) {
+ if (opt_logfile->arg() == "-") {
+ if (context.daemon_) {
+ std::cerr << "Cannot log to stdout AND fork into background."
+ << std::endl;
+ exit_code = 1;
+ return false;
+ }
+ context.logger_ = logger;
+ } else {
+ context.logger_ = LoggerFile::create(
+ std::filesystem::path(opt_logfile->arg()),
+ logger.get());
+ }
+ } else {
+ if (context.daemon_) {
+ context.logger_ = LoggerSyslog::create("buildhelper");
+ } else {
+ context.logger_ = logger;
+ }
+ }
+
+ return true;
+}
+
+} // namespace
+
+int main(int argc, char** argv) {
+ std::shared_ptr<Logger> logger = Logger::create_stdio();
+ Context context;
+ int exit_code = 0;
+ if (!parse_args(argc, argv, logger, context, exit_code)) {
+ return exit_code;
+ }
+
+ auto server = Server::create();
+ if (!server->setup(logger, context.host_, context.port_)) {
+ return 1;
+ }
+
+ if (context.daemon_) {
+ auto pid = fork();
+ if (pid == -1) {
+ logger->err("Unable to fork: %s", strerror(errno));
+ return 1;
+ }
+ if (pid == 0) {
+ // Child process
+ // Close stdin, stdout and stderr.
+ close(0);
+ close(1);
+ close(2);
+
+ // Chdir to root, avoid keeping started directory alive
+ chdir("/");
+
+ // Create new session
+ setsid();
+ } else {
+ // Parent process
+ if (context.verbose_)
+ logger->info("Forked as %ld", static_cast<long>(pid));
+ // Don't run atexit or anything like that, all is owned by child.
+ _exit(0);
+ }
+ }
+
+ logger.reset();
+
+ if (server->run(context.logger_))
+ return 0;
+ return 1;
+}
diff --git a/src/net.cc b/src/net.cc
new file mode 100644
index 0000000..bb25e69
--- /dev/null
+++ b/src/net.cc
@@ -0,0 +1,136 @@
+#include "net.hh"
+
+#include <charconv>
+#include <cerrno>
+#include <cstring>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+
+namespace net {
+
+std::vector<unique_fd> bind_and_listen(
+ Logger* logger, std::string const& host, uint16_t port) {
+ std::vector<unique_fd> fds;
+
+ std::string service;
+ service.resize(10);
+ {
+ auto [ptr, err] = std::to_chars(service.data(),
+ service.data() + service.size(), port);
+ if (err != std::errc{}) {
+ logger->err("Invalid port: %u", static_cast<unsigned int>(port));
+ return {};
+ }
+ service.resize(ptr - service.data());
+ }
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV | AI_ADDRCONFIG;
+ struct addrinfo* result;
+
+ if (auto err = getaddrinfo(host.empty() ? nullptr : host.c_str(),
+ service.c_str(),
+ &hints, &result)) {
+ logger->err("Unable to resolve %s:%u: %s",
+ host.c_str(), static_cast<unsigned int>(port),
+ gai_strerror(err));
+ return {};
+ }
+
+ for (auto* rp = result; rp; rp = rp->ai_next) {
+ unique_fd fd(socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol));
+ if (!fd) {
+ logger->warn("Unable to create socket: %s", strerror(errno));
+ continue;
+ }
+
+ if (bind(fd.get(), rp->ai_addr, rp->ai_addrlen)) {
+ logger->warn("Unable to bind socket: %s", strerror(errno));
+ continue;
+ }
+
+ fds.push_back(std::move(fd));
+ }
+
+ freeaddrinfo(result);
+
+ {
+ auto it = fds.begin();
+ while (it != fds.end()) {
+ if (listen(it->get(), SOMAXCONN)) {
+ logger->err("Unable to listen: %s", strerror(errno));
+ it = fds.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
+ return fds;
+}
+
+unique_fd connect(Logger* logger, std::string const& host, uint16_t port) {
+ unique_fd fd;
+ std::string service;
+ service.resize(10);
+ {
+ auto [ptr, err] = std::to_chars(service.data(),
+ service.data() + service.size(), port);
+ if (err != std::errc{}) {
+ logger->err("Invalid port: %u", static_cast<unsigned int>(port));
+ return fd;
+ }
+ service.resize(ptr - service.data());
+ }
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_NUMERICSERV | AI_ADDRCONFIG;
+ struct addrinfo* result;
+
+ if (auto err = getaddrinfo(host.c_str(), service.c_str(),
+ &hints, &result)) {
+ logger->err("Unable to resolve %s:%u: %s",
+ host.c_str(), static_cast<unsigned int>(port),
+ gai_strerror(err));
+ return fd;
+ }
+
+ for (auto* rp = result; rp; rp = rp->ai_next) {
+ // Note: SOCK_NONBLOCK is a linux 2.6.27+ thing.
+ fd.reset(socket(rp->ai_family, rp->ai_socktype | SOCK_NONBLOCK,
+ rp->ai_protocol));
+ if (!fd) {
+ logger->warn("Unable to create socket: %s", strerror(errno));
+ continue;
+ }
+
+ if (connect(fd.get(), rp->ai_addr, rp->ai_addrlen)) {
+ logger->warn("Unable to connect socket: %s", strerror(errno));
+ fd.reset();
+ continue;
+ }
+
+ break;
+ }
+
+ freeaddrinfo(result);
+ return fd;
+}
+
+unique_fd accept(Logger* logger, int fd) {
+ unique_fd ret(accept4(fd, nullptr, nullptr, SOCK_NONBLOCK));
+ if (!ret) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
+ logger->warn("Error accepting connection: %s", strerror(errno));
+ }
+ }
+ return ret;
+}
+
+} // namespace net
diff --git a/src/net.hh b/src/net.hh
new file mode 100644
index 0000000..918bde9
--- /dev/null
+++ b/src/net.hh
@@ -0,0 +1,22 @@
+#ifndef NET_HH
+#define NET_HH
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "logger.hh"
+#include "unique_fd.hh"
+
+namespace net {
+
+std::vector<unique_fd> bind_and_listen(
+ Logger* logger, std::string const& host, uint16_t port);
+
+unique_fd connect(Logger* logger, std::string const& host, uint16_t port);
+
+unique_fd accept(Logger* logger, int fd);
+
+} // namespace net
+
+#endif // NET_HH
diff --git a/src/ro_buffer.cc b/src/ro_buffer.cc
new file mode 100644
index 0000000..a071321
--- /dev/null
+++ b/src/ro_buffer.cc
@@ -0,0 +1,27 @@
+#include "ro_buffer.hh"
+
+#include <algorithm>
+#include <cassert>
+
+size_t RoBuffer::read(void* data, size_t len) {
+ assert(data);
+ if (len == 0)
+ return 0;
+ auto* d = reinterpret_cast<char*>(data);
+ size_t got = 0;
+ while (true) {
+ size_t avail;
+ auto want = len - got;
+ auto* ptr = rbuf(want, avail);
+ if (avail == 0)
+ return got;
+ if (avail >= want) {
+ std::copy_n(ptr, want, d + got);
+ rcommit(want);
+ return len;
+ }
+ std::copy_n(ptr, avail, d + got);
+ rcommit(avail);
+ got += avail;
+ }
+}
diff --git a/src/ro_buffer.hh b/src/ro_buffer.hh
new file mode 100644
index 0000000..93b2b3c
--- /dev/null
+++ b/src/ro_buffer.hh
@@ -0,0 +1,23 @@
+#ifndef RO_BUFFER_HH
+#define RO_BUFFER_HH
+
+#include <cstddef>
+
+class RoBuffer {
+public:
+ virtual ~RoBuffer() = default;
+
+ virtual bool empty() const = 0;
+
+ virtual char const* rbuf(size_t want, size_t& avail) = 0;
+ virtual void rcommit(size_t bytes) = 0;
+
+ size_t read(void* data, size_t len);
+
+protected:
+ RoBuffer() = default;
+ RoBuffer(RoBuffer const&) = delete;
+ RoBuffer& operator=(RoBuffer const&) = delete;
+};
+
+#endif // RO_BUFFER_HH
diff --git a/src/server.cc b/src/server.cc
new file mode 100644
index 0000000..57315df
--- /dev/null
+++ b/src/server.cc
@@ -0,0 +1,275 @@
+#include "server.hh"
+
+#include <algorithm>
+#include <cassert>
+#include <cerrno>
+#include <chrono>
+#include <cstring>
+#include <signal.h>
+#include <unistd.h>
+
+#include "buffer.hh"
+#include "logger.hh"
+#include "looper.hh"
+#include "net.hh"
+
+namespace {
+
+constexpr auto CLIENT_TIMEOUT = std::chrono::seconds(30);
+constexpr size_t BUFFER_SIZE = 1 * 1024 * 1024;
+
+int g_signal_pipe[2];
+
+void signal_handler(int) {
+ write(g_signal_pipe[1], "1", 1);
+}
+
+class ServerImpl : public Server {
+ public:
+ ServerImpl()
+ : looper_(Looper::create()) {}
+
+ ~ServerImpl() override {
+ for (auto& fd : listen_) {
+ looper_->remove(fd.get());
+ }
+ }
+
+ bool setup(std::shared_ptr<Logger> logger, std::string const& host,
+ uint16_t port) override {
+ logger_ = logger;
+ listen_ = net::bind_and_listen(logger.get(), host, port);
+ if (listen_.empty()) {
+ logger->err("Unable to listen");
+ return false;
+ }
+
+ for (auto& fd : listen_) {
+ looper_->add(fd.get(), Looper::EVENT_READ,
+ std::bind(&ServerImpl::listen_read, this, fd.get(),
+ std::placeholders::_1));
+ }
+ return true;
+ }
+
+ bool run(std::shared_ptr<Logger> logger) override {
+ logger_ = logger;
+
+ bool close_pipe;
+ if (pipe(g_signal_pipe) == 0) {
+ close_pipe = true;
+ struct sigaction sa;
+ sa.sa_handler = signal_handler;
+ sigemptyset(&sa.sa_mask);
+ sa.sa_flags = SA_RESTART;
+ sigaction(SIGINT, &sa, nullptr);
+ sigaction(SIGTERM, &sa, nullptr);
+ sigaction(SIGQUIT, &sa, nullptr);
+
+ looper_->add(g_signal_pipe[0], Looper::EVENT_READ,
+ std::bind(&ServerImpl::signal, this, std::placeholders::_1));
+ } else {
+ logger->warn("Unable to create a pipe: %s", strerror(errno));
+ close_pipe = false;
+ }
+
+ bool ret = looper_->run(logger.get());
+
+ if (close_pipe) {
+ looper_->remove(g_signal_pipe[0]);
+ close(g_signal_pipe[0]);
+ close(g_signal_pipe[1]);
+ }
+ return ret;
+ }
+
+ private:
+ void signal(uint8_t) {
+ looper_->quit();
+ }
+
+ struct Client {
+ unique_fd fd_;
+ uint8_t events_{Looper::EVENT_WRITE};
+ std::unique_ptr<Buffer> in_;
+ std::unique_ptr<Buffer> out_;
+ uint32_t watchdog_{0};
+ std::chrono::steady_clock::time_point last_event_;
+
+ explicit Client(unique_fd fd)
+ : fd_(std::move(fd)) {}
+ };
+
+ void listen_read(int fd, uint8_t events) {
+ if (events & Looper::EVENT_ERROR) {
+ auto it = std::find_if(listen_.begin(), listen_.end(),
+ [fd](auto& listen) {
+ return listen.get() == fd;
+ });
+ if (it != listen_.end()) {
+ logger_->warn("Listening socket error");
+ listen_.erase(it);
+ if (listen_.empty()) {
+ logger_->err("No listening sockets left");
+ looper_->quit();
+ return;
+ }
+ } else {
+ assert(false);
+ }
+ return;
+ }
+
+ unique_fd client_fd = net::accept(logger_.get(), fd);
+ if (client_fd) {
+ logger_->dbg("New client: %d", client_fd.get());
+ auto client = new Client(std::move(client_fd));
+ client->in_ = Buffer::fixed(BUFFER_SIZE);
+ client->out_ = Buffer::fixed(BUFFER_SIZE);
+ clients_.push_back(client);
+
+ looper_->add(client->fd_.get(), client->events_,
+ std::bind(&ServerImpl::client_event,
+ this, client,
+ std::placeholders::_1));
+ client->watchdog_ = looper_->schedule(
+ CLIENT_TIMEOUT.count(),
+ std::bind(&ServerImpl::client_check_timeout,
+ this, client,
+ std::placeholders::_1));
+ }
+ }
+
+ void client_event(Client* client, uint8_t events) {
+ if (events & Looper::EVENT_ERROR) {
+ logger_->warn("%d: Unexpected client error", client->fd_.get());
+ remove_client(client);
+ return;
+ }
+
+ client->last_event_ = std::chrono::steady_clock::now();
+
+ if (events & Looper::EVENT_READ && client->in_) {
+ while (true) {
+ size_t avail;
+ auto* ptr = client->in_->wbuf(65535, avail);
+ if (avail == 0)
+ break;
+ auto ret = read(client->fd_.get(), ptr, avail);
+ if (ret < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ break;
+ if (errno == EINTR)
+ continue;
+ logger_->warn("%d: Read error: %s", client->fd_.get(),
+ strerror(errno));
+ break;
+ } else if (ret == 0) {
+ logger_->dbg("%d: Connection closed", client->fd_.get());
+ if (client->out_->empty()) {
+ remove_client(client);
+ return;
+ } else {
+ // Might be only read side that is shutdown, try writing the
+ // rest before we give up.
+ client->in_.reset();
+ break;
+ }
+ } else {
+ client->in_->wcommit(ret);
+ if (static_cast<size_t>(ret) < avail)
+ break;
+ }
+ }
+ }
+
+ // Always try to write, usually we write as a result of a read above
+ // so saves having to call back to the loop just to answer.
+ // And if there wasn't a read event then we already know there was an write
+ // event.
+ if (/* events & Looper::EVENT_WRITE */ true) {
+ while (true) {
+ size_t avail;
+ auto* ptr = client->out_->rbuf(65535, avail);
+ if (avail == 0)
+ break;
+ auto ret = write(client->fd_.get(), ptr, avail);
+ if (ret < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ break;
+ if (errno == EINTR)
+ continue;
+ logger_->warn("%d: Write error: %s", client->fd_.get(),
+ strerror(errno));
+ break;
+ } else {
+ client->out_->rcommit(ret);
+ if (static_cast<size_t>(ret) < avail)
+ break;
+ }
+ }
+ }
+
+ auto const old_events = client->events_;
+ client->events_ = 0;
+
+ if (client->in_) {
+ if (!client->in_->full())
+ client->events_ |= Looper::EVENT_READ;
+ if (!client->out_->empty())
+ client->events_ |= Looper::EVENT_WRITE;
+ } else {
+ if (client->out_->empty()) {
+ remove_client(client);
+ return;
+ } else {
+ client->events_ |= Looper::EVENT_WRITE;
+ }
+ }
+
+ if (client->events_ != old_events) {
+ looper_->update(client->fd_.get(), client->events_);
+ }
+ }
+
+ void client_check_timeout(Client* client, uint32_t) {
+ auto since_last = std::chrono::steady_clock::now() - client->last_event_;
+ if (since_last >= CLIENT_TIMEOUT) {
+ client->watchdog_ = 0;
+ remove_client(client);
+ return;
+ }
+ auto delay = CLIENT_TIMEOUT - since_last;
+ client->watchdog_ = looper_->schedule(
+ std::chrono::duration_cast<std::chrono::seconds>(delay).count(),
+ std::bind(&ServerImpl::client_check_timeout,
+ this, client,
+ std::placeholders::_1));
+ }
+
+ void remove_client(Client* client) {
+ if (client->fd_)
+ looper_->remove(client->fd_.get());
+ if (client->watchdog_)
+ looper_->cancel(client->watchdog_);
+ auto it = std::find(clients_.begin(), clients_.end(), client);
+ if (it != clients_.end()) {
+ clients_.erase(it);
+ } else {
+ assert(false);
+ }
+ delete client;
+ }
+
+ std::shared_ptr<Looper> looper_;
+ std::shared_ptr<Logger> logger_;
+ std::vector<unique_fd> listen_;
+ std::vector<Client*> clients_;
+};
+
+} // namespace
+
+// static
+std::unique_ptr<Server> Server::create() {
+ return std::make_unique<ServerImpl>();
+}
diff --git a/src/server.hh b/src/server.hh
new file mode 100644
index 0000000..9e73c0d
--- /dev/null
+++ b/src/server.hh
@@ -0,0 +1,26 @@
+#ifndef SERVER_HH
+#define SERVER_HH
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+class Logger;
+
+class Server {
+ public:
+ virtual ~Server() = default;
+
+ static std::unique_ptr<Server> create();
+
+ virtual bool setup(std::shared_ptr<Logger> logger, std::string const& host,
+ uint16_t port) = 0;
+
+ virtual bool run(std::shared_ptr<Logger> logger) = 0;
+
+ protected:
+ Server() = default;
+ Server(Server const&) = delete;
+};
+
+#endif // SERVER_HH
diff --git a/src/unique_fd.cc b/src/unique_fd.cc
new file mode 100644
index 0000000..b864fb4
--- /dev/null
+++ b/src/unique_fd.cc
@@ -0,0 +1,9 @@
+#include "unique_fd.hh"
+
+#include <unistd.h>
+
+void unique_fd::reset(int fd) {
+ if (fd_ >= 0)
+ close(fd_);
+ fd_ = fd;
+}
diff --git a/src/unique_fd.hh b/src/unique_fd.hh
new file mode 100644
index 0000000..dc60b3f
--- /dev/null
+++ b/src/unique_fd.hh
@@ -0,0 +1,48 @@
+#ifndef UNIQUE_FD_HH
+#define UNIQUE_FD_HH
+
+#include <cstddef>
+
+class unique_fd {
+public:
+ constexpr unique_fd() noexcept
+ : fd_(-1) {}
+ constexpr unique_fd(std::nullptr_t) noexcept
+ : fd_(-1) {}
+ explicit unique_fd(int fd) noexcept
+ : fd_(fd) {}
+ unique_fd(unique_fd&& fd) noexcept
+ : fd_(fd.release()) {}
+
+ ~unique_fd() { reset(); }
+
+ unique_fd& operator=(unique_fd&& fd) noexcept {
+ reset(fd.release());
+ return *this;
+ }
+ unique_fd& operator=(std::nullptr_t) noexcept {
+ reset();
+ return *this;
+ }
+
+ int get() const noexcept { return fd_; }
+ int operator*() const { return get(); }
+
+ explicit operator bool() const noexcept { return fd_ >= 0; }
+
+ int release() noexcept {
+ int ret = fd_;
+ fd_ = -1;
+ return ret;
+ }
+
+ void reset(int fd = -1);
+
+private:
+ unique_fd(unique_fd const&) = delete;
+ unique_fd& operator=(unique_fd const&) = delete;
+
+ int fd_;
+};
+
+#endif // UNIQUE_FD_HH
diff --git a/subprojects/protobuf.wrap b/subprojects/protobuf.wrap
new file mode 100644
index 0000000..c6d2ebb
--- /dev/null
+++ b/subprojects/protobuf.wrap
@@ -0,0 +1,16 @@
+[wrap-file]
+directory = protobuf-25.2
+source_url = https://github.com/protocolbuffers/protobuf/releases/download/v25.2/protobuf-25.2.tar.gz
+source_filename = protobuf-25.2.tar.gz
+source_hash = 8ff511a64fc46ee792d3fe49a5a1bcad6f7dc50dfbba5a28b0e5b979c17f9871
+patch_filename = protobuf_25.2-2_patch.zip
+patch_url = https://wrapdb.mesonbuild.com/v2/protobuf_25.2-2/get_patch
+patch_hash = a2f5968097eb036c228b72258435d09e93dca4093d09acb5078a376d8155df46
+source_fallback_url = https://github.com/mesonbuild/wrapdb/releases/download/protobuf_25.2-2/protobuf-25.2.tar.gz
+wrapdb_version = 25.2-2
+
+[provide]
+protobuf = protobuf_dep
+protobuf-lite = protobuf_lite_dep
+protoc = protoc_dep
+program_names = protoc