diff options
| -rw-r--r-- | meson.build | 62 | ||||
| -rw-r--r-- | src/args.cc | 293 | ||||
| -rw-r--r-- | src/args.hh | 56 | ||||
| -rw-r--r-- | src/buffer.cc | 233 | ||||
| -rw-r--r-- | src/buffer.hh | 25 | ||||
| -rw-r--r-- | src/logger.hh | 51 | ||||
| -rw-r--r-- | src/logger_base.cc | 68 | ||||
| -rw-r--r-- | src/logger_base.hh | 25 | ||||
| -rw-r--r-- | src/logger_file.cc | 52 | ||||
| -rw-r--r-- | src/logger_file.hh | 16 | ||||
| -rw-r--r-- | src/logger_null.cc | 18 | ||||
| -rw-r--r-- | src/logger_stdio.cc | 39 | ||||
| -rw-r--r-- | src/logger_syslog.cc | 49 | ||||
| -rw-r--r-- | src/logger_syslog.hh | 14 | ||||
| -rw-r--r-- | src/looper.hh | 38 | ||||
| -rw-r--r-- | src/looper_poll.cc | 224 | ||||
| -rw-r--r-- | src/main.cc | 153 | ||||
| -rw-r--r-- | src/net.cc | 136 | ||||
| -rw-r--r-- | src/net.hh | 22 | ||||
| -rw-r--r-- | src/ro_buffer.cc | 27 | ||||
| -rw-r--r-- | src/ro_buffer.hh | 23 | ||||
| -rw-r--r-- | src/server.cc | 275 | ||||
| -rw-r--r-- | src/server.hh | 26 | ||||
| -rw-r--r-- | src/unique_fd.cc | 9 | ||||
| -rw-r--r-- | src/unique_fd.hh | 48 | ||||
| -rw-r--r-- | subprojects/protobuf.wrap | 16 |
26 files changed, 1998 insertions, 0 deletions
diff --git a/meson.build b/meson.build new file mode 100644 index 0000000..847c660 --- /dev/null +++ b/meson.build @@ -0,0 +1,62 @@ +project( + 'buildhelper', + 'cpp', + version : '0.1', + default_options : [ + 'warning_level=3', + 'cpp_std=c++17', + ], +) + +protobuf_dep = dependency( + 'protobuf-lite', + fallback: ['protobuf', 'protobuf_lite_dep'], +) + +protoc = find_program('protoc', native: true) + +protoc_gen = generator( + protoc, + output: ['@BASENAME@.pb.cc', '@BASENAME@.pb.h'], + depfile: '@BASENAME@.deps', + arguments: ['-I@SOURCE_ROOT@/apis/remote-apis', '-I@SOURCE_ROOT@/apis/googleapis', '--cpp_out=@BUILD_DIR@', '--dependency_out=@DEPFILE@', '@INPUT@'], +) + +google_apis = protoc_gen.process( + [ + 'apis/googleapis/google/api/annotations.proto', + 'apis/googleapis/google/api/client.proto', + 'apis/googleapis/google/api/http.proto', + 'apis/googleapis/google/api/launch_stage.proto', + 'apis/googleapis/google/longrunning/operations.proto', + 'apis/googleapis/google/rpc/status.proto', + ], + preserve_path_from: meson.project_source_root() + '/apis/googleapis', +) + +remote_apis = protoc_gen.process( + [ + 'apis/remote-apis/build/bazel/semver/semver.proto', + 'apis/remote-apis/build/bazel/remote/execution/v2/remote_execution.proto', + ], + preserve_path_from: meson.project_source_root() + '/apis/remote-apis', +) + +libapis = static_library('apis', [google_apis, remote_apis]) + +executable('buildhelper', + 'src/args.cc', + 'src/buffer.cc', + 'src/logger_base.cc', + 'src/logger_file.cc', + 'src/logger_null.cc', + 'src/logger_stdio.cc', + 'src/logger_syslog.cc', + 'src/looper_poll.cc', + 'src/main.cc', + 'src/net.cc', + 'src/ro_buffer.cc', + 'src/server.cc', + 'src/unique_fd.cc', + link_with: libapis, + install : true) diff --git a/src/args.cc b/src/args.cc new file mode 100644 index 0000000..a89c418 --- /dev/null +++ b/src/args.cc @@ -0,0 +1,293 @@ +#include "args.hh" + +#include <cassert> +#include <iostream> +#include <unordered_map> +#include <vector> + +namespace { + +class OptionImpl : public OptionWithArg { +public: + OptionImpl(char short_name, std::string long_name, std::string description, + bool require_arg, std::string arg_description) + : short_name_(short_name), + long_name_(std::move(long_name)), + description_(std::move(description)), + require_arg_(require_arg), + arg_description_(std::move(arg_description)) { + } + + bool is_set() const override { return set_; } + + std::string const& arg() const override { return arg_; } + + char short_name() const { return short_name_; } + + std::string const& long_name() const { return long_name_; } + + std::string const& description() const { return description_; } + + bool require_arg() const { return require_arg_; } + + std::string const& arg_description() const { return arg_description_; } + + void reset() { + set_ = false; + arg_.clear(); + } + + void set() { + set_ = true; + } + + void set_arg(std::string arg) { + arg_ = std::move(arg); + } + +private: + char const short_name_; + std::string const long_name_; + std::string const description_; + bool const require_arg_; + std::string const arg_description_; + bool set_ = false; + std::string arg_; +}; + +class ArgsImpl : public Args { +public: + ArgsImpl() = default; + + std::shared_ptr<Option const> add_option(char short_name, + std::string long_name, + std::string description) override { + prepare_option(short_name, long_name); + options_.push_back(std::make_shared<OptionImpl>(short_name, + std::move(long_name), + std::move(description), + false, std::string())); + return options_.back(); + } + + std::shared_ptr<OptionWithArg const> add_option_with_arg( + char short_name, + std::string long_name, + std::string description, + std::string arg_description) override { + prepare_option(short_name, long_name); + options_.push_back(std::make_shared<OptionImpl>(short_name, + std::move(long_name), + std::move(description), + true, arg_description)); + return options_.back(); + } + + bool run(int argc, char** argv, std::string_view prgname, std::ostream& err, + std::vector<std::string>& out) override { + for (int a = 1; a < argc; ++a) { + if (argv[a][0] == '-') { + if (argv[a][1] == '-') { + if (argv[a][2] != '\0') { + // A long name with optional "=" argument + size_t len = 2; + while (argv[a][len] != '=' && argv[a][len]) + ++len; + std::string name(argv[a] + 2, len - 2); + auto it = long_names_.find(name); + if (it == long_names_.end()) { + err << prgname << ": unrecognized option '--" + << name << "'" << std::endl; + return false; + } + auto* opt = options_[it->second].get(); + opt->set(); + if (argv[a][len]) { + if (opt->require_arg()) { + opt->set_arg(std::string(argv[a] + len + 1)); + } else { + err << prgname << ": option '--" + << name << "' doesn't allow an argument" << std::endl; + return false; + } + } else { + if (opt->require_arg()) { + if (a + 1 >= argc) { + err << prgname << ": option '--" + << name << "' requires an argument" << std::endl; + return false; + } else { + opt->set_arg(argv[++a]); + } + } + } + continue; + } else { + // "--", all following values are arguments + for (++a; a < argc; ++a) + out.push_back(argv[a]); + break; + } + } else if (argv[a][1] != '\0') { + // One or more short names + for (auto* name = argv[a] + 1; *name; ++name) { + auto it = short_names_.find(*name); + if (it == short_names_.end()) { + err << prgname << ": invalid option -- '" + << *name << "'" << std::endl; + return false; + } + auto* opt = options_[it->second].get(); + opt->set(); + if (opt->require_arg()) { + if (a + 1 >= argc) { + err << prgname << ": option requires an argument" + << " -- '" << *name << "'" << std::endl; + return false; + } else { + opt->set_arg(argv[++a]); + } + } + } + continue; + } else { + // single "-", treat as argument + } + } + + out.push_back(argv[a]); + } + return true; + } + + void print_descriptions(std::ostream& out, + uint32_t column_width) const override { + uint32_t max_left = 0; + for (auto const& option : options_) { + uint32_t left = 0; + if (option->short_name() != '\0') { + if (!option->long_name().empty()) { + left = 6 + option->long_name().size(); // -S, --long + } else { + left = 2; // -S + } + } else if (!option->long_name().empty()) { + left = 2 + option->long_name().size(); // --long + } + if (option->require_arg()) + left += 1 + option->arg_description().size(); // (=| )ARG + if (left > 0) + left += 2; // Need at least two spaces between option and desc + // Prefix with two spaces (either infront of option or desc) + left += 2; + + if (left > max_left) + max_left = left; + } + + uint32_t const avail_right = + max_left > column_width ? 0 : column_width - max_left; + + if (avail_right < 20) { + // Fallback mode, description on its own row. + for (auto const& option : options_) { + print_option(out, *option); + out << '\n' << option->description() << '\n'; + } + return; + } + + // Check if all descriptions fit, justify to the right on a 80 col width + bool all_desc_fit = true; + uint32_t max_right = 0; + for (auto const& option : options_) { + uint32_t right = option->description().size(); + if (right > avail_right) { + all_desc_fit = false; + break; + } + if (right > max_right) + max_right = right; + } + + if (all_desc_fit) + max_left = std::max(80u, column_width) - max_right; + + for (auto const& option : options_) { + out << " "; + uint32_t left = 2 + print_option(out, *option); + std::fill_n(std::ostreambuf_iterator<char>(out), max_left - left, ' '); + + if (option->description().size() <= avail_right) { + out << option->description() << '\n'; + continue; + } + + // Wrap description + size_t last = 0; + bool first = true; + while (true) { + if (first) { + first = false; + } else { + std::fill_n(std::ostreambuf_iterator<char>(out), max_left, ' '); + } + + size_t end = last + avail_right; + if (end >= option->description().size()) { + out << option->description().substr(last) << '\n'; + break; + } + size_t space = option->description().rfind(' ', end); + if (space == std::string::npos || space < last) { + space = end; + } + out << option->description().substr(last, space - last) << '\n'; + last = space < end ? space + 1 : end; + } + } + } + +private: + void prepare_option(char short_name, std::string const& long_name) { + if (short_name != '\0') + short_names_.emplace(short_name, options_.size()); + if (!long_name.empty()) { + assert(long_name.find('=') == std::string::npos); + long_names_.emplace(long_name, options_.size()); + } + } + + size_t print_option(std::ostream& out, const OptionImpl& option) const { + bool only_short = false; + size_t ret = 0; + if (option.short_name() != '\0') { + out << '-' << option.short_name(); + if (!option.long_name().empty()) { + out << ", --" << option.long_name(); + ret = 6 + option.long_name().size(); + } else { + ret = 2; + only_short = true; + } + } else if (!option.long_name().empty()) { + out << "--" << option.long_name(); + ret = 2 + option.long_name().size(); + } + if (option.require_arg()) { + out << (only_short ? ' ' : '=') << option.arg_description(); + ret += 1 + option.arg_description().size(); + } + return ret; + } + + std::vector<std::shared_ptr<OptionImpl>> options_; + std::unordered_map<char, size_t> short_names_; + std::unordered_map<std::string, size_t> long_names_; +}; + +} // namespace + +std::unique_ptr<Args> Args::create() { + return std::make_unique<ArgsImpl>(); +} diff --git a/src/args.hh b/src/args.hh new file mode 100644 index 0000000..8442b16 --- /dev/null +++ b/src/args.hh @@ -0,0 +1,56 @@ +#ifndef ARGS_HH +#define ARGS_HH + +#include <iosfwd> +#include <memory> +#include <string> +#include <string_view> +#include <vector> + +class Option { +public: + virtual ~Option() = default; + + virtual bool is_set() const = 0; + +protected: + Option() = default; + Option(Option const&) = delete; + Option& operator=(Option const&) = delete; +}; + +class OptionWithArg : public Option { +public: + virtual std::string const& arg() const = 0; +}; + +class Args { +public: + virtual ~Args() = default; + + static std::unique_ptr<Args> create(); + + virtual std::shared_ptr<Option const> add_option( + char short_name, + std::string long_name, + std::string description) = 0; + + virtual std::shared_ptr<OptionWithArg const> add_option_with_arg( + char short_name, + std::string long_name, + std::string description, + std::string arg_description) = 0; + + virtual bool run(int argc, char** argv, std::string_view prgname, + std::ostream& err, std::vector<std::string>& out) = 0; + + virtual void print_descriptions(std::ostream& out, + uint32_t column_width) const = 0; + +protected: + Args() = default; + Args(Args const&) = delete; + Args& operator=(Args const&) = delete; +}; + +#endif // ARGS_HH diff --git a/src/buffer.cc b/src/buffer.cc new file mode 100644 index 0000000..d39ec86 --- /dev/null +++ b/src/buffer.cc @@ -0,0 +1,233 @@ +#include "buffer.hh" + +#include <algorithm> +#include <cassert> +#include <vector> + +namespace { + +class Round : public Buffer { +public: + explicit Round(size_t size) + : data_(std::make_unique<char[]>(size)), size_(size) {} + + bool empty() const override { + return rptr_ == wptr_ && !full_; + } + + bool full() const override { + return rptr_ == wptr_ && full_; + } + + void clear() override { + rptr_ = wptr_ = 0; + full_ = false; + } + + char const* rbuf(size_t want, size_t& avail) override { + if (rptr_ < wptr_) { + avail = wptr_ - rptr_; + } else if (rptr_ == wptr_ && !full_) { + avail = 0; + } else { + avail = size_ - rptr_; + if (want > avail) { + auto* target = data_.get(); + for (; rptr_ < size_; ++rptr_) { + std::swap(*target, data_[rptr_]); + ++target; + } + rptr_ = 0; + wptr_ += avail; + if (wptr_ == size_) { + assert(full_); + wptr_ = 0; + avail = size_ - rptr_; + } else { + avail = wptr_ - rptr_; + } + } + } + return data_.get() + rptr_; + } + + void rcommit(size_t bytes) override { + if (bytes == 0) + return; + full_ = false; + assert(rptr_ < wptr_ ? rptr_ + bytes <= wptr_ : rptr_ + bytes <= size_); + rptr_ += bytes; + if (rptr_ == size_) + rptr_ = 0; + if (rptr_ == wptr_) + rptr_ = wptr_ = 0; + } + + char* wbuf(size_t request, size_t& avail) override { + if (wptr_ < rptr_) { + avail = rptr_ - wptr_; + } else if (rptr_ == wptr_ && full_) { + avail = 0; + } else { + avail = size_ - wptr_; + if (avail < request && wptr_ != 0 && rptr_ != 0) { + std::copy(data_.get() + rptr_, data_.get() + wptr_, data_.get()); + auto size = wptr_ - rptr_; + wptr_ = size; + rptr_ = 0; + avail = size_ - wptr_; + } + } + return data_.get() + wptr_; + } + + void wcommit(size_t bytes) override { + if (bytes == 0) + return; + assert(wptr_ < rptr_ ? wptr_ + bytes <= rptr_ : wptr_ + bytes <= size_); + wptr_ += bytes; + if (wptr_ == size_) + wptr_ = 0; + if (wptr_ == rptr_) + full_ = true; + } + +private: + std::unique_ptr<char[]> data_; + size_t const size_; + size_t rptr_{0}; + size_t wptr_{0}; + bool full_{false}; +}; + +class Growing : public Buffer { +public: + Growing(size_t base_size, size_t max_size) + : base_size_(base_size), max_size_(max_size), data_(base_size) { + } + + bool empty() const override { + return rptr_ == wptr_; + } + + bool full() const override { + return rptr_ == 0 && wptr_ == max_size_; + } + + void clear() override { + data_.resize(base_size_); + rptr_ = wptr_ = 0; + } + + char const* rbuf(size_t, size_t& avail) override { + avail = wptr_ - rptr_; + return data_.data() + rptr_; + } + + void rcommit(size_t bytes) override { + assert(rptr_ + bytes <= wptr_); + rptr_ += bytes; + if (rptr_ == wptr_) + rptr_ = wptr_ = 0; + } + + char* wbuf(size_t request, size_t& avail) override { + avail = data_.size() - wptr_; + if (request > avail && rptr_ > 0) { + std::copy(data_.begin() + rptr_, data_.begin() + wptr_, data_.begin()); + wptr_ -= rptr_; + rptr_ = 0; + avail = data_.size() - wptr_; + } + if (request > avail && data_.size() < max_size_) { + data_.resize( + std::min(max_size_, + data_.size() + std::max(request - avail, + (max_size_ - base_size_) / 8))); + avail = data_.size() - wptr_; + } + return data_.data() + wptr_; + } + + void wcommit(size_t bytes) override { + assert(wptr_ + bytes <= data_.size()); + wptr_ += bytes; + } + +private: + size_t const base_size_; + size_t const max_size_; + std::vector<char> data_; + size_t rptr_{0}; + size_t wptr_{0}; +}; + +class Null : public Buffer { +public: + bool empty() const override { + return true; + } + + char const* rbuf(size_t, size_t& avail) override { + avail = 0; + return buf_; + } + + void rcommit(size_t bytes) override { + assert(bytes == 0); + } + + bool full() const override { + return false; + } + + void clear() override {} + + char* wbuf(size_t, size_t& avail) override { + avail = sizeof(buf_); + return buf_; + } + + void wcommit(size_t) override { + } + +private: + char buf_[4096]; +}; + +} // namespace + +std::unique_ptr<Buffer> Buffer::fixed(size_t size) { + return std::make_unique<Round>(size); +} + +std::unique_ptr<Buffer> Buffer::growing(size_t base_size, size_t max_size) { + return std::make_unique<Growing>(base_size, max_size); +} + +std::unique_ptr<Buffer> Buffer::null() { + return std::make_unique<Null>(); +} + +size_t Buffer::write(void const* data, size_t len) { + assert(data); + if (len == 0) + return 0; + auto* d = reinterpret_cast<char const*>(data); + size_t wrote = 0; + while (true) { + size_t avail; + auto want = len - wrote; + auto* ptr = wbuf(want, avail); + if (avail == 0) + return wrote; + if (avail >= want) { + std::copy_n(d + wrote, want, ptr); + wcommit(want); + return len; + } + std::copy_n(d + wrote, avail, ptr); + wcommit(avail); + wrote += avail; + } +} diff --git a/src/buffer.hh b/src/buffer.hh new file mode 100644 index 0000000..ae02e23 --- /dev/null +++ b/src/buffer.hh @@ -0,0 +1,25 @@ +#ifndef BUFFER_HH +#define BUFFER_HH + +#include "ro_buffer.hh" + +#include <memory> + +class Buffer : public RoBuffer { +public: + static std::unique_ptr<Buffer> fixed(size_t size); + static std::unique_ptr<Buffer> growing(size_t base_size, size_t max_size); + // Acts as /dev/null, ie always empty, can write anything to it. + static std::unique_ptr<Buffer> null(); + + virtual bool full() const = 0; + + virtual void clear() = 0; + + virtual char* wbuf(size_t request, size_t& avail) = 0; + virtual void wcommit(size_t bytes) = 0; + + size_t write(void const* data, size_t len); +}; + +#endif // BUFFER_HH diff --git a/src/logger.hh b/src/logger.hh new file mode 100644 index 0000000..48d21a7 --- /dev/null +++ b/src/logger.hh @@ -0,0 +1,51 @@ +#ifndef LOGGER_HH +#define LOGGER_HH + +#include <memory> + +class Logger { +public: + virtual ~Logger() = default; + + static std::unique_ptr<Logger> create_stdio(); + static std::unique_ptr<Logger> create_null(); + + virtual void err(char const* format, ...) +#if HAVE_ATTRIBUTE_FORMAT + __attribute__((format(printf, 2, 3))) // this takes up 1 +#endif // HAVE_ATTRIBUTE_FORMAT + = 0; + + virtual void warn(char const* format, ...) +#if HAVE_ATTRIBUTE_FORMAT + __attribute__((format(printf, 2, 3))) +#endif // HAVE_ATTRIBUTE_FORMAT + = 0; + + virtual void info(char const* format, ...) +#if HAVE_ATTRIBUTE_FORMAT + __attribute__((format(printf, 2, 3))) +#endif // HAVE_ATTRIBUTE_FORMAT + = 0; + +#ifdef NDEBUG + void dbg(char const*, ...) +#if HAVE_ATTRIBUTE_FORMAT + __attribute__((format(printf, 2, 3))) +#endif // HAVE_ATTRIBUTE_FORMAT + {} +#else + virtual void dbg(char const* format, ...) +#if HAVE_ATTRIBUTE_FORMAT + __attribute__((format(printf, 2, 3))) +#endif // HAVE_ATTRIBUTE_FORMAT + = 0; +#endif + +protected: + Logger() = default; + Logger(Logger const&) = delete; + Logger& operator=(Logger const&) = delete; +}; + +#endif // LOGGER_HH diff --git a/src/logger_base.cc b/src/logger_base.cc new file mode 100644 index 0000000..0bd4e98 --- /dev/null +++ b/src/logger_base.cc @@ -0,0 +1,68 @@ +#include "logger_base.hh" + +#include <cassert> +#include <cstdarg> +#include <cstdio> +#include <cstdlib> + +void LoggerBase::err(char const* format, ...) { + va_list args; + va_start(args, format); + char* tmp = nullptr; + auto len = vasprintf(&tmp, format, args); + va_end(args); + if (len == -1) { + assert(false); + return; + } + while (len > 0 && tmp[len - 1] == '\n') --len; + msg(Level::ERR, std::string_view(tmp, len)); + free(tmp); +} + +void LoggerBase::warn(char const* format, ...) { + va_list args; + va_start(args, format); + char* tmp = nullptr; + auto len = vasprintf(&tmp, format, args); + va_end(args); + if (len == -1) { + assert(false); + return; + } + while (len > 0 && tmp[len - 1] == '\n') --len; + msg(Level::WARN, std::string_view(tmp, len)); + free(tmp); +} + +void LoggerBase::info(char const* format, ...) { + va_list args; + va_start(args, format); + char* tmp = nullptr; + auto len = vasprintf(&tmp, format, args); + va_end(args); + if (len == -1) { + assert(false); + return; + } + while (len > 0 && tmp[len - 1] == '\n') --len; + msg(Level::INFO, std::string_view(tmp, len)); + free(tmp); +} + +#ifndef NDEBUG +void LoggerBase::dbg(char const* format, ...) { + va_list args; + va_start(args, format); + char* tmp = nullptr; + auto len = vasprintf(&tmp, format, args); + va_end(args); + if (len == -1) { + assert(false); + return; + } + while (len > 0 && tmp[len - 1] == '\n') --len; + msg(Level::DBG, std::string_view(tmp, len)); + free(tmp); +} +#endif diff --git a/src/logger_base.hh b/src/logger_base.hh new file mode 100644 index 0000000..56911f0 --- /dev/null +++ b/src/logger_base.hh @@ -0,0 +1,25 @@ +#ifndef LOGGER_BASE_HH +#define LOGGER_BASE_HH + +#include <string_view> + +#include "logger.hh" + +class LoggerBase : public Logger { +public: + void err(char const* format, ...) override; + void warn(char const* format, ...) override; + void info(char const* format, ...) override; +#ifndef NDEBUG + void dbg(char const* format, ...) override; +#endif + +protected: + enum class Level { + ERR, WARN, INFO, DBG + }; + + virtual void msg(Level level, std::string_view str) = 0; +}; + +#endif // LOGGER_BASE_HH diff --git a/src/logger_file.cc b/src/logger_file.cc new file mode 100644 index 0000000..d813b2a --- /dev/null +++ b/src/logger_file.cc @@ -0,0 +1,52 @@ +#include "logger_file.hh" + +#include <fstream> +#include <mutex> + +#include "logger_base.hh" + +namespace { + +class LoggerFileImpl : public LoggerBase { +public: + explicit LoggerFileImpl(std::filesystem::path const& path) + : out_(path, std::ios::out | std::ios::app) {} + + bool good() const { + return out_.good(); + } + +protected: + void msg(Level lvl, std::string_view msg) override { + std::lock_guard<std::mutex> lock(mutex_); + switch (lvl) { + case Level::ERR: + out_ << "Error: " << msg << std::endl; + break; + case Level::WARN: + out_ << "Warning: " << msg << std::endl; + break; + case Level::INFO: + out_ << msg << std::endl; + break; + case Level::DBG: + out_ << "Debug: " << msg << std::endl; + break; + } + } + +private: + std::mutex mutex_; + std::fstream out_; +}; + +} // namespace + +std::unique_ptr<Logger> LoggerFile::create(std::filesystem::path const& path, + Logger* fallback) { + auto logger = std::make_unique<LoggerFileImpl>(path); + if (logger->good()) + return logger; + fallback->warn("Unable to open %s for appending.", path.c_str()); + return nullptr; +} diff --git a/src/logger_file.hh b/src/logger_file.hh new file mode 100644 index 0000000..131798c --- /dev/null +++ b/src/logger_file.hh @@ -0,0 +1,16 @@ +#ifndef LOGGER_FILE_HH +#define LOGGER_FILE_HH + +#include <memory> +#include <filesystem> + +#include "logger.hh" + +class LoggerFile : public Logger { + public: + // Can return nullptr, will write reason to fallback. + static std::unique_ptr<Logger> create(std::filesystem::path const& path, + Logger* fallback); +}; + +#endif // LOGGER_FILE_HH diff --git a/src/logger_null.cc b/src/logger_null.cc new file mode 100644 index 0000000..2294a70 --- /dev/null +++ b/src/logger_null.cc @@ -0,0 +1,18 @@ +#include "logger_base.hh" + +namespace { + +class LoggerNull : public LoggerBase { +public: + LoggerNull() = default; + +protected: + void msg(Level, std::string_view) override { + } +}; + +} // namespace + +std::unique_ptr<Logger> Logger::create_null() { + return std::make_unique<LoggerNull>(); +} diff --git a/src/logger_stdio.cc b/src/logger_stdio.cc new file mode 100644 index 0000000..e1f26d2 --- /dev/null +++ b/src/logger_stdio.cc @@ -0,0 +1,39 @@ +#include "logger_base.hh" + +#include <iostream> +#include <mutex> + +namespace { + +class LoggerStdio : public LoggerBase { +public: + LoggerStdio() = default; + +protected: + void msg(Level lvl, std::string_view msg) override { + std::lock_guard<std::mutex> lock(mutex_); + switch (lvl) { + case Level::ERR: + std::cerr << "Error: " << msg << std::endl; + break; + case Level::WARN: + std::cout << "Warning: " << msg << std::endl; + break; + case Level::INFO: + std::cout << msg << std::endl; + break; + case Level::DBG: + std::cout << "Debug: " << msg << std::endl; + break; + } + } + +private: + std::mutex mutex_; +}; + +} // namespace + +std::unique_ptr<Logger> Logger::create_stdio() { + return std::make_unique<LoggerStdio>(); +} diff --git a/src/logger_syslog.cc b/src/logger_syslog.cc new file mode 100644 index 0000000..54cdaf3 --- /dev/null +++ b/src/logger_syslog.cc @@ -0,0 +1,49 @@ +#include "logger_syslog.hh" + +#include <string> +#include <syslog.h> + +#include "logger_base.hh" + +namespace { + +class LoggerSyslogImpl : public LoggerBase { +public: + explicit LoggerSyslogImpl(std::string const& prgname) + : ident_(prgname) { + openlog(ident_.c_str(), LOG_PID, LOG_DAEMON); + } + + ~LoggerSyslogImpl() override { + closelog(); + } + +protected: + void msg(Level lvl, std::string_view msg) override { + int prio; + switch (lvl) { + case Level::ERR: + prio = LOG_ERR; + break; + case Level::WARN: + prio = LOG_WARNING; + break; + case Level::INFO: + prio = LOG_INFO; + break; + case Level::DBG: + prio = LOG_DEBUG; + break; + } + syslog(prio, "%.*s", static_cast<int>(msg.length()), msg.data()); + } + +private: + std::string const ident_; +}; + +} // namespace + +std::unique_ptr<Logger> LoggerSyslog::create(std::string const& prgname) { + return std::make_unique<LoggerSyslogImpl>(prgname); +} diff --git a/src/logger_syslog.hh b/src/logger_syslog.hh new file mode 100644 index 0000000..1417edc --- /dev/null +++ b/src/logger_syslog.hh @@ -0,0 +1,14 @@ +#ifndef LOGGER_SYSLOG_HH +#define LOGGER_SYSLOG_HH + +#include <memory> +#include <string> + +#include "logger.hh" + +class LoggerSyslog : public Logger { + public: + static std::unique_ptr<Logger> create(std::string const& prgname); +}; + +#endif // LOGGER_SYSLOG_HH diff --git a/src/looper.hh b/src/looper.hh new file mode 100644 index 0000000..3286cb1 --- /dev/null +++ b/src/looper.hh @@ -0,0 +1,38 @@ +#ifndef LOOPER_HH +#define LOOPER_HH + +#include <functional> +#include <memory> + +class Logger; + +class Looper { +public: + constexpr static uint8_t EVENT_READ = 1; + constexpr static uint8_t EVENT_WRITE = 2; + constexpr static uint8_t EVENT_ERROR = 4; + + virtual ~Looper() = default; + + static std::unique_ptr<Looper> create(); + + virtual void add(int fd, uint8_t events, + std::function<void(uint8_t)> callback) = 0; + virtual void update(int fd, uint8_t events) = 0; + virtual void remove(int fd) = 0; + + // Returned id is never 0 + virtual uint32_t schedule(double delay, + std::function<void(uint32_t)> callback) = 0; + virtual void cancel(uint32_t id) = 0; + + virtual bool run(Logger* logger) = 0; + virtual void quit() = 0; + +protected: + Looper() = default; + Looper(Looper const&) = delete; + Looper& operator=(Looper const&) = delete; +}; + +#endif // LOOPER_HH diff --git a/src/looper_poll.cc b/src/looper_poll.cc new file mode 100644 index 0000000..1945583 --- /dev/null +++ b/src/looper_poll.cc @@ -0,0 +1,224 @@ +#include "looper.hh" + +#include <algorithm> +#include <cassert> +#include <chrono> +#include <deque> +#include <errno.h> +#include <poll.h> +#include <string.h> +#include <unordered_map> +#include <vector> + +#include "logger.hh" + +namespace { + +class LooperPoll : public Looper { +public: + LooperPoll() = default; + + void add(int fd, uint8_t events, + std::function<void(uint8_t)> callback) override { + if (fd < 0) + return; + auto ret = entry_.emplace(fd, Entry(events)); + if (!ret.second) { + assert(ret.first->second.delete_); + ret.first->second.delete_ = false; + ret.first->second.events_ = events; + } + ret.first->second.callback_ = std::move(callback); + } + + void update(int fd, uint8_t events) override { + if (fd < 0) + return; + auto it = entry_.find(fd); + if (it == entry_.end() || it->second.delete_) { + assert(false); + return; + } + it->second.events_ = events; + } + + void remove(int fd) override { + if (fd < 0) + return; + auto it = entry_.find(fd); + if (it == entry_.end()) + return; + it->second.delete_ = true; + } + + bool run(Logger* logger) override { + while (!quit_) { + int timeout; + if (scheduled_.empty()) { + timeout = -1; + } else { + auto now = std::chrono::steady_clock::now(); + while (true) { + if (now < scheduled_.front().target_) { + auto delay = std::chrono::duration_cast<std::chrono::milliseconds>( + scheduled_.front().target_ - now); + if (delay.count() <= std::numeric_limits<int>::max()) + timeout = delay.count(); + else + timeout = std::numeric_limits<int>::max(); + break; + } + auto id = scheduled_.front().id_; + auto callback = std::move(scheduled_.front().callback_); + scheduled_.pop_front(); + callback(id); + if (scheduled_.empty()) { + timeout = -1; + break; + } + } + // Scheduled callbacks might call quit(). + if (quit_) + break; + } + std::vector<struct pollfd> pollfd; + pollfd.reserve(entry_.size()); + auto it = entry_.begin(); + while (it != entry_.end()) { + if (it->second.delete_) { + it = entry_.erase(it); + } else { + struct pollfd tmp; + tmp.fd = it->first; + tmp.events = events_looper2poll(it->second.events_); + pollfd.push_back(std::move(tmp)); + ++it; + } + } + int active = poll(pollfd.data(), pollfd.size(), timeout); + if (active < 0) { + if (errno == EINTR) + continue; + logger->err("Poll failed: %s", strerror(errno)); + return false; + } + for (auto it2 = pollfd.begin(); active; ++it2) { + if (it2->revents == 0) + continue; + --active; + auto events = events_poll2looper(it2->revents); + if (events) { + it = entry_.find(it2->fd); + if (!it->second.delete_) { + events &= (it->second.events_ | EVENT_ERROR); + if (events) { + it->second.callback_(events); + } + } + } + } + } + // Reset quit_ so run() can be called again + quit_ = false; + return true; + } + + void quit() override { + quit_ = true; + } + + uint32_t schedule(double delay, + std::function<void(uint32_t)> callback) override { + assert(delay >= 0.0); + uint32_t id = next_schedule_id(); + auto target = std::chrono::steady_clock::now() + + std::chrono::duration_cast<std::chrono::steady_clock::duration>( + std::chrono::duration<double>(delay)); + auto insert = scheduled_.end(); + while (insert != scheduled_.begin()) { + auto prev = insert - 1; + if (prev->target_ < target) + break; + insert = prev; + } + scheduled_.emplace(insert, std::move(callback), id, target); + return id; + } + + void cancel(uint32_t id) override { + auto it = std::find_if(scheduled_.begin(), scheduled_.end(), + [id](auto const& scheduled) { + return scheduled.id_ == id; + }); + if (it != scheduled_.end()) { + scheduled_.erase(it); + } else { + assert(false); + } + } + +private: + struct Entry { + uint8_t events_; + std::function<void(uint8_t)> callback_; + bool delete_; + + explicit Entry(uint8_t events) + : events_(events), delete_(false) {} + }; + + struct Scheduled { + std::function<void(uint32_t)> callback_; + uint32_t id_; + std::chrono::steady_clock::time_point target_; + + Scheduled(std::function<void(uint32_t)> callback, uint32_t id, + std::chrono::steady_clock::time_point target) + : callback_(std::move(callback)), id_(id), target_(target) {} + }; + + static short events_looper2poll(uint8_t events) { + short ret = 0; + if (events & EVENT_READ) + ret |= POLLIN | POLLPRI; + if (events & EVENT_WRITE) + ret |= POLLOUT; + return ret; + } + + static uint8_t events_poll2looper(short events) { + uint8_t ret = 0; + if (events & (POLLIN | POLLPRI | POLLHUP)) + ret |= EVENT_READ; + if (events & POLLOUT) + ret |= EVENT_WRITE; + if (events & (POLLERR | POLLNVAL)) + ret |= EVENT_ERROR; + return ret; + } + + uint32_t next_schedule_id() { + while (true) { + uint32_t ret = next_schedule_id_++; + if (ret) { + bool found = std::any_of(scheduled_.begin(), scheduled_.end(), + [ret](auto const& scheduled) { + return scheduled.id_ == ret; + }); + if (!found) + return ret; + } + } + } + + bool quit_{false}; + std::unordered_map<int, Entry> entry_; + uint32_t next_schedule_id_{1}; + std::deque<Scheduled> scheduled_; +}; + +} // namespace + +std::unique_ptr<Looper> Looper::create() { + return std::make_unique<LooperPoll>(); +} diff --git a/src/main.cc b/src/main.cc new file mode 100644 index 0000000..980adba --- /dev/null +++ b/src/main.cc @@ -0,0 +1,153 @@ +#include <cerrno> +#include <charconv> +#include <cstring> +#include <iostream> +#include <unistd.h> + +#include "args.hh" +#include "logger.hh" +#include "logger_file.hh" +#include "logger_syslog.hh" +#include "server.hh" + +namespace { + +struct Context { + std::shared_ptr<Logger> logger_; + bool verbose_{false}; + bool daemon_{false}; + std::string host_{"localhost"}; + uint16_t port_{12000}; +}; + +bool parse_args(int argc, char** argv, std::shared_ptr<Logger> logger, + Context& context, int& exit_code) { + auto args = Args::create(); + auto opt_help = args->add_option('h', "help", "display this text and exit"); + auto opt_version = args->add_option( + 'V', "version", "display version and exit"); + auto opt_verbose = args->add_option('v', "verbose", "be verbose"); + auto opt_daemon = args->add_option( + 'd', "daemon", "fork into the background as a daemon"); + auto opt_logfile = args->add_option_with_arg( + 'L', "log", "log to FILE instead of default (" + "syslog for daemon, stdout otherwise)", "FILE"); + auto opt_host = args->add_option_with_arg( + 'h', "host", "HOST to listen for requests on (default localhost)", + "HOST"); + auto opt_port = args->add_option_with_arg( + 'p', "port", "PORT to listen for requests on (default 12000)", + "PORT"); + std::vector<std::string> extra; + if (!args->run(argc, argv, "buildhelper", std::cerr, extra)) { + std::cerr << "Try `buildhelper --help` for usage." << std::endl; + exit_code = 1; + return false; + } + if (opt_help->is_set()) { + std::cout << "Usage `buildhelper [OPTIONS]`.\n" + << "Runs a buildhelper instance, implementing the Remote Execution API. Answers requests from bazel and others.\n" + << "\n" + << "Options:" << std::endl; + args->print_descriptions(std::cout, 80); + exit_code = 0; + return false; + } + if (opt_version->is_set()) { + std::cout << "buildhelper 0.1 written by Joel Klinghed <the_jk@spawned.biz>" + << std::endl; + exit_code = 0; + return false; + } + + context.verbose_ = opt_verbose->is_set(); + context.daemon_ = opt_daemon->is_set(); + + if (opt_host->is_set()) { + context.host_ = opt_host->arg(); + } + + if (opt_port->is_set()) { + auto const& arg = opt_port->arg(); + auto [ptr, err] = + std::from_chars(arg.data(), arg.data() + arg.size(), context.port_); + if (ptr != arg.data() + arg.size() || err != std::errc()) { + std::cerr << "Invalid port value: " << arg << std::endl; + exit_code = 1; + return false; + } + } + + if (opt_logfile->is_set()) { + if (opt_logfile->arg() == "-") { + if (context.daemon_) { + std::cerr << "Cannot log to stdout AND fork into background." + << std::endl; + exit_code = 1; + return false; + } + context.logger_ = logger; + } else { + context.logger_ = LoggerFile::create( + std::filesystem::path(opt_logfile->arg()), + logger.get()); + } + } else { + if (context.daemon_) { + context.logger_ = LoggerSyslog::create("buildhelper"); + } else { + context.logger_ = logger; + } + } + + return true; +} + +} // namespace + +int main(int argc, char** argv) { + std::shared_ptr<Logger> logger = Logger::create_stdio(); + Context context; + int exit_code = 0; + if (!parse_args(argc, argv, logger, context, exit_code)) { + return exit_code; + } + + auto server = Server::create(); + if (!server->setup(logger, context.host_, context.port_)) { + return 1; + } + + if (context.daemon_) { + auto pid = fork(); + if (pid == -1) { + logger->err("Unable to fork: %s", strerror(errno)); + return 1; + } + if (pid == 0) { + // Child process + // Close stdin, stdout and stderr. + close(0); + close(1); + close(2); + + // Chdir to root, avoid keeping started directory alive + chdir("/"); + + // Create new session + setsid(); + } else { + // Parent process + if (context.verbose_) + logger->info("Forked as %ld", static_cast<long>(pid)); + // Don't run atexit or anything like that, all is owned by child. + _exit(0); + } + } + + logger.reset(); + + if (server->run(context.logger_)) + return 0; + return 1; +} diff --git a/src/net.cc b/src/net.cc new file mode 100644 index 0000000..bb25e69 --- /dev/null +++ b/src/net.cc @@ -0,0 +1,136 @@ +#include "net.hh" + +#include <charconv> +#include <cerrno> +#include <cstring> +#include <netdb.h> +#include <sys/socket.h> +#include <sys/types.h> + +namespace net { + +std::vector<unique_fd> bind_and_listen( + Logger* logger, std::string const& host, uint16_t port) { + std::vector<unique_fd> fds; + + std::string service; + service.resize(10); + { + auto [ptr, err] = std::to_chars(service.data(), + service.data() + service.size(), port); + if (err != std::errc{}) { + logger->err("Invalid port: %u", static_cast<unsigned int>(port)); + return {}; + } + service.resize(ptr - service.data()); + } + + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV | AI_ADDRCONFIG; + struct addrinfo* result; + + if (auto err = getaddrinfo(host.empty() ? nullptr : host.c_str(), + service.c_str(), + &hints, &result)) { + logger->err("Unable to resolve %s:%u: %s", + host.c_str(), static_cast<unsigned int>(port), + gai_strerror(err)); + return {}; + } + + for (auto* rp = result; rp; rp = rp->ai_next) { + unique_fd fd(socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol)); + if (!fd) { + logger->warn("Unable to create socket: %s", strerror(errno)); + continue; + } + + if (bind(fd.get(), rp->ai_addr, rp->ai_addrlen)) { + logger->warn("Unable to bind socket: %s", strerror(errno)); + continue; + } + + fds.push_back(std::move(fd)); + } + + freeaddrinfo(result); + + { + auto it = fds.begin(); + while (it != fds.end()) { + if (listen(it->get(), SOMAXCONN)) { + logger->err("Unable to listen: %s", strerror(errno)); + it = fds.erase(it); + } else { + ++it; + } + } + } + return fds; +} + +unique_fd connect(Logger* logger, std::string const& host, uint16_t port) { + unique_fd fd; + std::string service; + service.resize(10); + { + auto [ptr, err] = std::to_chars(service.data(), + service.data() + service.size(), port); + if (err != std::errc{}) { + logger->err("Invalid port: %u", static_cast<unsigned int>(port)); + return fd; + } + service.resize(ptr - service.data()); + } + + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_NUMERICSERV | AI_ADDRCONFIG; + struct addrinfo* result; + + if (auto err = getaddrinfo(host.c_str(), service.c_str(), + &hints, &result)) { + logger->err("Unable to resolve %s:%u: %s", + host.c_str(), static_cast<unsigned int>(port), + gai_strerror(err)); + return fd; + } + + for (auto* rp = result; rp; rp = rp->ai_next) { + // Note: SOCK_NONBLOCK is a linux 2.6.27+ thing. + fd.reset(socket(rp->ai_family, rp->ai_socktype | SOCK_NONBLOCK, + rp->ai_protocol)); + if (!fd) { + logger->warn("Unable to create socket: %s", strerror(errno)); + continue; + } + + if (connect(fd.get(), rp->ai_addr, rp->ai_addrlen)) { + logger->warn("Unable to connect socket: %s", strerror(errno)); + fd.reset(); + continue; + } + + break; + } + + freeaddrinfo(result); + return fd; +} + +unique_fd accept(Logger* logger, int fd) { + unique_fd ret(accept4(fd, nullptr, nullptr, SOCK_NONBLOCK)); + if (!ret) { + if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) { + logger->warn("Error accepting connection: %s", strerror(errno)); + } + } + return ret; +} + +} // namespace net diff --git a/src/net.hh b/src/net.hh new file mode 100644 index 0000000..918bde9 --- /dev/null +++ b/src/net.hh @@ -0,0 +1,22 @@ +#ifndef NET_HH +#define NET_HH + +#include <cstdint> +#include <string> +#include <vector> + +#include "logger.hh" +#include "unique_fd.hh" + +namespace net { + +std::vector<unique_fd> bind_and_listen( + Logger* logger, std::string const& host, uint16_t port); + +unique_fd connect(Logger* logger, std::string const& host, uint16_t port); + +unique_fd accept(Logger* logger, int fd); + +} // namespace net + +#endif // NET_HH diff --git a/src/ro_buffer.cc b/src/ro_buffer.cc new file mode 100644 index 0000000..a071321 --- /dev/null +++ b/src/ro_buffer.cc @@ -0,0 +1,27 @@ +#include "ro_buffer.hh" + +#include <algorithm> +#include <cassert> + +size_t RoBuffer::read(void* data, size_t len) { + assert(data); + if (len == 0) + return 0; + auto* d = reinterpret_cast<char*>(data); + size_t got = 0; + while (true) { + size_t avail; + auto want = len - got; + auto* ptr = rbuf(want, avail); + if (avail == 0) + return got; + if (avail >= want) { + std::copy_n(ptr, want, d + got); + rcommit(want); + return len; + } + std::copy_n(ptr, avail, d + got); + rcommit(avail); + got += avail; + } +} diff --git a/src/ro_buffer.hh b/src/ro_buffer.hh new file mode 100644 index 0000000..93b2b3c --- /dev/null +++ b/src/ro_buffer.hh @@ -0,0 +1,23 @@ +#ifndef RO_BUFFER_HH +#define RO_BUFFER_HH + +#include <cstddef> + +class RoBuffer { +public: + virtual ~RoBuffer() = default; + + virtual bool empty() const = 0; + + virtual char const* rbuf(size_t want, size_t& avail) = 0; + virtual void rcommit(size_t bytes) = 0; + + size_t read(void* data, size_t len); + +protected: + RoBuffer() = default; + RoBuffer(RoBuffer const&) = delete; + RoBuffer& operator=(RoBuffer const&) = delete; +}; + +#endif // RO_BUFFER_HH diff --git a/src/server.cc b/src/server.cc new file mode 100644 index 0000000..57315df --- /dev/null +++ b/src/server.cc @@ -0,0 +1,275 @@ +#include "server.hh" + +#include <algorithm> +#include <cassert> +#include <cerrno> +#include <chrono> +#include <cstring> +#include <signal.h> +#include <unistd.h> + +#include "buffer.hh" +#include "logger.hh" +#include "looper.hh" +#include "net.hh" + +namespace { + +constexpr auto CLIENT_TIMEOUT = std::chrono::seconds(30); +constexpr size_t BUFFER_SIZE = 1 * 1024 * 1024; + +int g_signal_pipe[2]; + +void signal_handler(int) { + write(g_signal_pipe[1], "1", 1); +} + +class ServerImpl : public Server { + public: + ServerImpl() + : looper_(Looper::create()) {} + + ~ServerImpl() override { + for (auto& fd : listen_) { + looper_->remove(fd.get()); + } + } + + bool setup(std::shared_ptr<Logger> logger, std::string const& host, + uint16_t port) override { + logger_ = logger; + listen_ = net::bind_and_listen(logger.get(), host, port); + if (listen_.empty()) { + logger->err("Unable to listen"); + return false; + } + + for (auto& fd : listen_) { + looper_->add(fd.get(), Looper::EVENT_READ, + std::bind(&ServerImpl::listen_read, this, fd.get(), + std::placeholders::_1)); + } + return true; + } + + bool run(std::shared_ptr<Logger> logger) override { + logger_ = logger; + + bool close_pipe; + if (pipe(g_signal_pipe) == 0) { + close_pipe = true; + struct sigaction sa; + sa.sa_handler = signal_handler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = SA_RESTART; + sigaction(SIGINT, &sa, nullptr); + sigaction(SIGTERM, &sa, nullptr); + sigaction(SIGQUIT, &sa, nullptr); + + looper_->add(g_signal_pipe[0], Looper::EVENT_READ, + std::bind(&ServerImpl::signal, this, std::placeholders::_1)); + } else { + logger->warn("Unable to create a pipe: %s", strerror(errno)); + close_pipe = false; + } + + bool ret = looper_->run(logger.get()); + + if (close_pipe) { + looper_->remove(g_signal_pipe[0]); + close(g_signal_pipe[0]); + close(g_signal_pipe[1]); + } + return ret; + } + + private: + void signal(uint8_t) { + looper_->quit(); + } + + struct Client { + unique_fd fd_; + uint8_t events_{Looper::EVENT_WRITE}; + std::unique_ptr<Buffer> in_; + std::unique_ptr<Buffer> out_; + uint32_t watchdog_{0}; + std::chrono::steady_clock::time_point last_event_; + + explicit Client(unique_fd fd) + : fd_(std::move(fd)) {} + }; + + void listen_read(int fd, uint8_t events) { + if (events & Looper::EVENT_ERROR) { + auto it = std::find_if(listen_.begin(), listen_.end(), + [fd](auto& listen) { + return listen.get() == fd; + }); + if (it != listen_.end()) { + logger_->warn("Listening socket error"); + listen_.erase(it); + if (listen_.empty()) { + logger_->err("No listening sockets left"); + looper_->quit(); + return; + } + } else { + assert(false); + } + return; + } + + unique_fd client_fd = net::accept(logger_.get(), fd); + if (client_fd) { + logger_->dbg("New client: %d", client_fd.get()); + auto client = new Client(std::move(client_fd)); + client->in_ = Buffer::fixed(BUFFER_SIZE); + client->out_ = Buffer::fixed(BUFFER_SIZE); + clients_.push_back(client); + + looper_->add(client->fd_.get(), client->events_, + std::bind(&ServerImpl::client_event, + this, client, + std::placeholders::_1)); + client->watchdog_ = looper_->schedule( + CLIENT_TIMEOUT.count(), + std::bind(&ServerImpl::client_check_timeout, + this, client, + std::placeholders::_1)); + } + } + + void client_event(Client* client, uint8_t events) { + if (events & Looper::EVENT_ERROR) { + logger_->warn("%d: Unexpected client error", client->fd_.get()); + remove_client(client); + return; + } + + client->last_event_ = std::chrono::steady_clock::now(); + + if (events & Looper::EVENT_READ && client->in_) { + while (true) { + size_t avail; + auto* ptr = client->in_->wbuf(65535, avail); + if (avail == 0) + break; + auto ret = read(client->fd_.get(), ptr, avail); + if (ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + if (errno == EINTR) + continue; + logger_->warn("%d: Read error: %s", client->fd_.get(), + strerror(errno)); + break; + } else if (ret == 0) { + logger_->dbg("%d: Connection closed", client->fd_.get()); + if (client->out_->empty()) { + remove_client(client); + return; + } else { + // Might be only read side that is shutdown, try writing the + // rest before we give up. + client->in_.reset(); + break; + } + } else { + client->in_->wcommit(ret); + if (static_cast<size_t>(ret) < avail) + break; + } + } + } + + // Always try to write, usually we write as a result of a read above + // so saves having to call back to the loop just to answer. + // And if there wasn't a read event then we already know there was an write + // event. + if (/* events & Looper::EVENT_WRITE */ true) { + while (true) { + size_t avail; + auto* ptr = client->out_->rbuf(65535, avail); + if (avail == 0) + break; + auto ret = write(client->fd_.get(), ptr, avail); + if (ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + if (errno == EINTR) + continue; + logger_->warn("%d: Write error: %s", client->fd_.get(), + strerror(errno)); + break; + } else { + client->out_->rcommit(ret); + if (static_cast<size_t>(ret) < avail) + break; + } + } + } + + auto const old_events = client->events_; + client->events_ = 0; + + if (client->in_) { + if (!client->in_->full()) + client->events_ |= Looper::EVENT_READ; + if (!client->out_->empty()) + client->events_ |= Looper::EVENT_WRITE; + } else { + if (client->out_->empty()) { + remove_client(client); + return; + } else { + client->events_ |= Looper::EVENT_WRITE; + } + } + + if (client->events_ != old_events) { + looper_->update(client->fd_.get(), client->events_); + } + } + + void client_check_timeout(Client* client, uint32_t) { + auto since_last = std::chrono::steady_clock::now() - client->last_event_; + if (since_last >= CLIENT_TIMEOUT) { + client->watchdog_ = 0; + remove_client(client); + return; + } + auto delay = CLIENT_TIMEOUT - since_last; + client->watchdog_ = looper_->schedule( + std::chrono::duration_cast<std::chrono::seconds>(delay).count(), + std::bind(&ServerImpl::client_check_timeout, + this, client, + std::placeholders::_1)); + } + + void remove_client(Client* client) { + if (client->fd_) + looper_->remove(client->fd_.get()); + if (client->watchdog_) + looper_->cancel(client->watchdog_); + auto it = std::find(clients_.begin(), clients_.end(), client); + if (it != clients_.end()) { + clients_.erase(it); + } else { + assert(false); + } + delete client; + } + + std::shared_ptr<Looper> looper_; + std::shared_ptr<Logger> logger_; + std::vector<unique_fd> listen_; + std::vector<Client*> clients_; +}; + +} // namespace + +// static +std::unique_ptr<Server> Server::create() { + return std::make_unique<ServerImpl>(); +} diff --git a/src/server.hh b/src/server.hh new file mode 100644 index 0000000..9e73c0d --- /dev/null +++ b/src/server.hh @@ -0,0 +1,26 @@ +#ifndef SERVER_HH +#define SERVER_HH + +#include <cstdint> +#include <memory> +#include <string> + +class Logger; + +class Server { + public: + virtual ~Server() = default; + + static std::unique_ptr<Server> create(); + + virtual bool setup(std::shared_ptr<Logger> logger, std::string const& host, + uint16_t port) = 0; + + virtual bool run(std::shared_ptr<Logger> logger) = 0; + + protected: + Server() = default; + Server(Server const&) = delete; +}; + +#endif // SERVER_HH diff --git a/src/unique_fd.cc b/src/unique_fd.cc new file mode 100644 index 0000000..b864fb4 --- /dev/null +++ b/src/unique_fd.cc @@ -0,0 +1,9 @@ +#include "unique_fd.hh" + +#include <unistd.h> + +void unique_fd::reset(int fd) { + if (fd_ >= 0) + close(fd_); + fd_ = fd; +} diff --git a/src/unique_fd.hh b/src/unique_fd.hh new file mode 100644 index 0000000..dc60b3f --- /dev/null +++ b/src/unique_fd.hh @@ -0,0 +1,48 @@ +#ifndef UNIQUE_FD_HH +#define UNIQUE_FD_HH + +#include <cstddef> + +class unique_fd { +public: + constexpr unique_fd() noexcept + : fd_(-1) {} + constexpr unique_fd(std::nullptr_t) noexcept + : fd_(-1) {} + explicit unique_fd(int fd) noexcept + : fd_(fd) {} + unique_fd(unique_fd&& fd) noexcept + : fd_(fd.release()) {} + + ~unique_fd() { reset(); } + + unique_fd& operator=(unique_fd&& fd) noexcept { + reset(fd.release()); + return *this; + } + unique_fd& operator=(std::nullptr_t) noexcept { + reset(); + return *this; + } + + int get() const noexcept { return fd_; } + int operator*() const { return get(); } + + explicit operator bool() const noexcept { return fd_ >= 0; } + + int release() noexcept { + int ret = fd_; + fd_ = -1; + return ret; + } + + void reset(int fd = -1); + +private: + unique_fd(unique_fd const&) = delete; + unique_fd& operator=(unique_fd const&) = delete; + + int fd_; +}; + +#endif // UNIQUE_FD_HH diff --git a/subprojects/protobuf.wrap b/subprojects/protobuf.wrap new file mode 100644 index 0000000..c6d2ebb --- /dev/null +++ b/subprojects/protobuf.wrap @@ -0,0 +1,16 @@ +[wrap-file] +directory = protobuf-25.2 +source_url = https://github.com/protocolbuffers/protobuf/releases/download/v25.2/protobuf-25.2.tar.gz +source_filename = protobuf-25.2.tar.gz +source_hash = 8ff511a64fc46ee792d3fe49a5a1bcad6f7dc50dfbba5a28b0e5b979c17f9871 +patch_filename = protobuf_25.2-2_patch.zip +patch_url = https://wrapdb.mesonbuild.com/v2/protobuf_25.2-2/get_patch +patch_hash = a2f5968097eb036c228b72258435d09e93dca4093d09acb5078a376d8155df46 +source_fallback_url = https://github.com/mesonbuild/wrapdb/releases/download/protobuf_25.2-2/protobuf-25.2.tar.gz +wrapdb_version = 25.2-2 + +[provide] +protobuf = protobuf_dep +protobuf-lite = protobuf_lite_dep +protoc = protoc_dep +program_names = protoc |
