From 71ace33728c9b215ca90719a9192aec9a531639e Mon Sep 17 00:00:00 2001 From: Joel Klinghed Date: Thu, 4 Jun 2015 23:39:29 +0200 Subject: Adding sender --- src/.gitignore | 2 + src/Makefile.am | 14 +- src/event_main.cc | 31 ++-- src/json.cc | 388 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/json.hh | 100 +++++++++++++ src/sender.cc | 309 ++++++++++++++++++++++++++++++++++++++++ src/sender_client.cc | 217 ++++++++++++++++++++++++++++ src/sender_client.hh | 30 ++++ 8 files changed, 1075 insertions(+), 16 deletions(-) create mode 100644 src/json.cc create mode 100644 src/json.hh create mode 100644 src/sender.cc create mode 100644 src/sender_client.cc create mode 100644 src/sender_client.hh (limited to 'src') diff --git a/src/.gitignore b/src/.gitignore index 4a2bd2c..85ad493 100644 --- a/src/.gitignore +++ b/src/.gitignore @@ -4,4 +4,6 @@ /stamp-h1 /libcgi.la /libdb.la +/libutil.la /event +/sender diff --git a/src/Makefile.am b/src/Makefile.am index 6b6b797..8ab164c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -3,25 +3,29 @@ MAINTAINERCLEANFILES = Makefile.in AM_CPPFLAGS = @DEFINES@ -DLOCALSTATEDIR='"@localstatedir@/stuff"' \ -DSYSCONFDIR='"@sysconfdir@/stuff"' -bin_PROGRAMS = event -noinst_LTLIBRARIES = libdb.la libcgi.la +bin_PROGRAMS = event sender +noinst_LTLIBRARIES = libdb.la libcgi.la libutil.la event_SOURCES = event.cc event.hh event_main.cc common.hh cgi.hh db.hh \ - fsutils.cc fsutils.hh config.cc config.hh + sender_client.cc sender_client.hh event_LDADD = libdb.la libcgi.la +sender_SOURCES = common.hh sender.cc json.hh json.cc +sender_LDADD = libutil.la + libcgi_la_SOURCES = cgi.hh common.hh cgi.cc \ query_parser.hh query_parser.cc \ header_parser.hh header_parser.cc \ - strutils.hh strutils.cc \ args.hh args.cc \ http.hh http.cc \ multipart_formdata_parser.hh multipart_formdata_parser.cc libcgi_la_CPPFLAGS = $(AM_CPPFLAGS) @FASTCGI_CFLAGS@ -libcgi_la_LIBADD = @FASTCGI_LIBS@ +libcgi_la_LIBADD = @FASTCGI_LIBS@ libutil.la libdb_la_SOURCES = db.hh common.hh db.cc sqlite3_db.hh sqlite3_db.cc libdb_la_CPPFLAGS = $(AM_CPPFLAGS) @SQLITE3_CFLAGS@ libdb_la_LIBADD = @SQLITE3_LIBS@ +libutil_la_SOURCES = common.hh fsutils.cc fsutils.hh config.cc config.hh \ + strutils.hh strutils.cc diff --git a/src/event_main.cc b/src/event_main.cc index 200b035..8ba68d9 100644 --- a/src/event_main.cc +++ b/src/event_main.cc @@ -16,6 +16,7 @@ #include "event.hh" #include "fsutils.hh" #include "http.hh" +#include "sender_client.hh" #include "sqlite3_db.hh" /* @@ -35,6 +36,7 @@ using namespace stuff; namespace { std::unique_ptr g_cfg; +std::unique_ptr g_sender; std::shared_ptr open(const std::string& channel) { std::string tmp = channel; @@ -95,11 +97,13 @@ std::string format_date(time_t date) { return tmp; } -void signal_channel(const std::string& str) { - +void signal_channel(const std::string& channel, const std::string& str) { + if (!g_sender) return; + g_sender->send(channel, str); } -void signal_event(const std::unique_ptr& event) { +void signal_event(const std::string& channel, + const std::unique_ptr& event) { std::ostringstream ss; ss << event->name() << " @ " << format_date(event->start()) << std::endl; if (!event->text().empty()) { @@ -107,7 +111,7 @@ void signal_event(const std::unique_ptr& event) { } ss << std::endl; ss << "Use /going to join the event" << std::endl; - signal_channel(ss.str()); + signal_channel(channel, ss.str()); } bool parse_time(const std::string& value, time_t* date) { @@ -190,7 +194,7 @@ bool create(const std::string& channel, Http::response(200, "Event created"); auto next_event = Event::next(db); if (next_event->id() == event->id()) { - signal_event(next_event); + signal_event(channel, next_event); } return true; } @@ -256,7 +260,7 @@ bool cancel(const std::string& channel, Http::response(200, "Event removed"); } if (!signal.empty()) { - signal_channel(signal); + signal_channel(channel, signal); } return true; } @@ -341,8 +345,9 @@ bool update(const std::string& channel, if (event->store()) { Http::response(200, "Event updated"); auto next_event = Event::next(db); - if (next_event->id() != first_event || next_event->id() == event->id()) { - signal_event(next_event); + if (next_event->id() != first_event || + next_event->id() == event->id()) { + signal_event(channel, next_event); } } else { Http::response(200, "Update failed"); @@ -466,10 +471,10 @@ bool going(const std::string& channel, auto next_event = Event::next(db); if (next_event->id() == event->id()) { if (going) { - signal_channel(user_name + " will be attending " + + signal_channel(channel, user_name + " will be attending " + event->name()); } else { - signal_channel(user_name + " will not be attending " + + signal_channel(channel, user_name + " will not be attending " + event->name()); } } @@ -584,5 +589,9 @@ int main() { if (!g_cfg->load("./event.config")) { g_cfg->load(SYSCONFDIR "/event.config"); } - return CGI::run(handle_request); + g_sender = SenderClient::create(g_cfg.get()); + int ret = CGI::run(handle_request); + g_sender.reset(); + g_cfg.reset(); + return ret; } diff --git a/src/json.cc b/src/json.cc new file mode 100644 index 0000000..c6a1111 --- /dev/null +++ b/src/json.cc @@ -0,0 +1,388 @@ +#include "common.hh" + +#include +#include +#include + +#include "json.hh" + +namespace stuff { + +namespace { + +enum class JsonType { + STRING, + OBJECT, + ARRAY, + INT64, + DOUBLE, + BOOL, +}; + +struct JsonValue { + const JsonType type; + JsonValue(JsonType type) + : type(type) { + } +}; + +struct StringJsonValue : public JsonValue { + StringJsonValue(const std::string& str) + : JsonValue(JsonType::STRING), str(str) { + } + std::string str; +}; + +struct ObjectJsonValue : public JsonValue { + ObjectJsonValue(std::shared_ptr obj) + : JsonValue(JsonType::OBJECT), obj(obj) { + } + std::shared_ptr obj; +}; + +struct ArrayJsonValue : public JsonValue { + ArrayJsonValue(std::shared_ptr array) + : JsonValue(JsonType::ARRAY), array(array) { + } + std::shared_ptr array; +}; + +struct BasicJsonValue : public JsonValue { + BasicJsonValue(double d) + : JsonValue(JsonType::DOUBLE), data({ .d = d }) { + } + BasicJsonValue(int64_t i) + : JsonValue(JsonType::INT64), data({ .i = i }) { + } + BasicJsonValue(bool b) + : JsonValue(JsonType::BOOL), data({ .b = b }) { + } + union { + double d; + int64_t i; + bool b; + } data; +}; + +class JsonObjectImpl; +class JsonArrayImpl; + +std::ostream& write(std::ostream& os, const JsonObjectImpl* obj); +std::ostream& write(std::ostream& os, const JsonArrayImpl* array); +std::ostream& write(std::ostream& os, const JsonValue* value); + +class JsonObjectImpl : public JsonObject { +public: + JsonObjectImpl() { + } + + void put(const std::string& name, const std::string& value) override { + put(name, new StringJsonValue(value)); + } + void put(const std::string& name, double value) override { + put(name, new BasicJsonValue(value)); + } + void put(const std::string& name, int64_t value) override { + put(name, new BasicJsonValue(value)); + } + void put(const std::string& name, bool value) override { + put(name, new BasicJsonValue(value)); + } + void put(const std::string& name, std::nullptr_t value) override { + put(name, value); + } + void put(const std::string& name, + std::shared_ptr obj) override { + if (obj) { + put(name, new ObjectJsonValue(obj)); + } else { + put(name, nullptr); + } + } + void put(const std::string& name, + std::shared_ptr array) override { + if (array) { + put(name, new ArrayJsonValue(array)); + } else { + put(name, nullptr); + } + } + + bool contains(const std::string& name) const override { + return data_.find(name) != data_.end(); + } + bool is_null(const std::string& name) const override { + auto it = data_.find(name); + return it != data_.end() && !it->second; + } + const std::string& get(const std::string& name, + const std::string& fallback) const override { + auto it = data_.find(name); + if (it == data_.end() || !it->second) return fallback; + return it->second->type == JsonType::STRING ? + static_cast(it->second.get())->str : fallback; + } + double get(const std::string& name, double fallback) const override { + auto it = data_.find(name); + if (it == data_.end() || !it->second) return fallback; + return it->second->type == JsonType::DOUBLE ? + static_cast(it->second.get())->data.d : fallback; + } + int64_t get(const std::string& name, int64_t fallback) const override { + auto it = data_.find(name); + if (it == data_.end() || !it->second) return fallback; + return it->second->type == JsonType::INT64 ? + static_cast(it->second.get())->data.i : fallback; + } + bool get(const std::string& name, bool fallback) const override { + auto it = data_.find(name); + if (it == data_.end() || !it->second) return fallback; + return it->second->type == JsonType::BOOL ? + static_cast(it->second.get())->data.b : fallback; + } + bool get(const std::string& name, + std::shared_ptr* obj) const override { + auto it = data_.find(name); + obj->reset(); + if (it == data_.end() || !it->second) return false; + if (it->second->type != JsonType::OBJECT) return false; + *obj = static_cast(it->second.get())->obj; + return true; + } + bool get(const std::string& name, + std::shared_ptr* array) const override { + auto it = data_.find(name); + array->reset(); + if (it == data_.end() || !it->second) return false; + if (it->second->type != JsonType::ARRAY) return false; + *array = static_cast(it->second.get())->array; + return true; + } + + std::string str() const { + std::ostringstream ss; + write(ss, this); + return ss.str(); + } + + void put(const std::string& name, std::unique_ptr ptr) { + data_[name].swap(ptr); + } + + std::unordered_map> data_; +}; + +class JsonArrayImpl : public JsonArray { +public: + JsonArrayImpl() { + } + + void put(size_t index, const std::string& value) override { + put(index, new StringJsonValue(value)); + } + void put(size_t index, double value) override { + put(index, new BasicJsonValue(value)); + } + void put(size_t index, int64_t value) override { + put(index, new BasicJsonValue(value)); + } + void put(size_t index, bool value) override { + put(index, new BasicJsonValue(value)); + } + void put(size_t index, std::nullptr_t value) override { + put(index, value); + } + void put(size_t index, + std::shared_ptr obj) override { + if (obj) { + put(index, new ObjectJsonValue(obj)); + } else { + put(index, nullptr); + } + } + void put(size_t index, + std::shared_ptr array) override { + if (array) { + put(index, new ArrayJsonValue(array)); + } else { + put(index, nullptr); + } + } + + bool is_null(size_t index) const override { + return index < data_.size() && !data_[index]; + } + const std::string& get(size_t index, + const std::string& fallback) const override { + if (index >= data_.size() || !data_[index]) return fallback; + return data_[index]->type == JsonType::STRING ? + static_cast(data_[index].get())->str : fallback; + } + double get(size_t index, double fallback) const override { + if (index >= data_.size() || !data_[index]) return fallback; + return data_[index]->type == JsonType::DOUBLE ? + static_cast(data_[index].get())->data.d : fallback; + } + int64_t get(size_t index, int64_t fallback) const override { + if (index >= data_.size() || !data_[index]) return fallback; + return data_[index]->type == JsonType::INT64 ? + static_cast(data_[index].get())->data.i : fallback; + } + bool get(size_t index, bool fallback) const override { + if (index >= data_.size() || !data_[index]) return fallback; + return data_[index]->type == JsonType::BOOL ? + static_cast(data_[index].get())->data.b : fallback; + } + bool get(size_t index, + std::shared_ptr* obj) const override { + obj->reset(); + if (index >= data_.size() || !data_[index]) return false; + if (data_[index]->type != JsonType::OBJECT) return false; + *obj = static_cast(data_[index].get())->obj; + return true; + } + bool get(size_t index, + std::shared_ptr* array) const override { + array->reset(); + if (index >= data_.size() || !data_[index]) return false; + if (data_[index]->type != JsonType::ARRAY) return false; + *array = static_cast(data_[index].get())->array; + return true; + } + + std::string str() const { + std::ostringstream ss; + write(ss, this); + return ss.str(); + } + + size_t size() const override { + return data_.size(); + } + + void resize(size_t size) override { + data_.resize(size); + } + + void put(size_t index, std::unique_ptr ptr) { + while (index >= data_.size()) { + data_.emplace_back(nullptr); + } + data_[index].swap(ptr); + } + + std::vector> data_; +}; + +std::ostream& quoted(std::ostream& os, const std::string& str) { + os << '"'; + size_t last = 0; + for (size_t i = 0; i < str.size(); ++i) { + if (str[i] == '"' || str[i] == '\\') { + os << str.substr(last, i - last); + os << '\\'; + last = i; + } + } + os << str.substr(last); + return os << '"'; +} + +std::ostream& write(std::ostream& os, const JsonObjectImpl* obj) { + os << '{'; + bool first = true; + for (const auto& pair : obj->data_) { + if (!first) { + os << ','; + } else { + first = false; + } + quoted(os, pair.first); + os << ':'; + write(os, pair.second.get()); + } + return os << '}'; +} + +std::ostream& write(std::ostream& os, const JsonArrayImpl* array) { + os << '['; + bool first = true; + for (const auto& value : array->data_) { + if (!first) { + os << ','; + } else { + first = false; + } + write(os, value.get()); + } + return os << ']'; +} + +std::ostream& write(std::ostream& os, const JsonValue* value) { + if (value) { + switch (value->type) { + case JsonType::STRING: + return quoted(os, static_cast(value)->str); + case JsonType::DOUBLE: + return os << static_cast(value)->data.d; + case JsonType::INT64: + return os << static_cast(value)->data.i; + case JsonType::BOOL: + return os << (static_cast(value)->data.b ? + "true" : "false"); + case JsonType::OBJECT: + return write( + os, static_cast( + static_cast(value) + ->obj.get())); + case JsonType::ARRAY: + return write( + os, static_cast( + static_cast(value) + ->array.get())); + } + } + return os << "null"; +} + +} // namespace + +// static +std::shared_ptr JsonObject::create() { + return std::make_shared(); +} + +void JsonArray::add(const std::string& value) { + put(size(), value); +} + +void JsonArray::add(double value) { + put(size(), value); +} + +void JsonArray::add(int64_t value) { + put(size(), value); +} + +void JsonArray::add(bool value) { + put(size(), value); +} + +void JsonArray::add(std::nullptr_t value) { + put(size(), value); +} + +void JsonArray::add(std::shared_ptr obj) { + put(size(), obj); +} + +void JsonArray::add(std::shared_ptr arr) { + put(size(), arr); +} + +// static +std::shared_ptr JsonArray::create() { + return std::make_shared(); +} + +} // namespace stuff diff --git a/src/json.hh b/src/json.hh new file mode 100644 index 0000000..e14982e --- /dev/null +++ b/src/json.hh @@ -0,0 +1,100 @@ +#ifndef JSON_HH +#define JSON_HH + +#include +#include + +namespace stuff { + +class JsonArray; + +class JsonObject { +public: + virtual ~JsonObject() {} + virtual void put(const std::string& name, const std::string& value) = 0; + virtual void put(const std::string& name, double value) = 0; + virtual void put(const std::string& name, int64_t value) = 0; + virtual void put(const std::string& name, bool value) = 0; + virtual void put(const std::string& name, std::nullptr_t value) = 0; + virtual void put(const std::string& name, + std::shared_ptr obj) = 0; + virtual void put(const std::string& name, + std::shared_ptr arr) = 0; + + virtual bool contains(const std::string& name) const = 0; + virtual bool is_null(const std::string& name) const = 0; + virtual const std::string& get(const std::string& name, + const std::string& fallback + = std::string()) const = 0; + virtual double get(const std::string& name, + double fallback = 0.0) const = 0; + virtual int64_t get(const std::string& name, + int64_t fallback = 0) const = 0; + virtual bool get(const std::string& name, bool fallback) const = 0; + virtual bool get(const std::string& name, + std::shared_ptr* obj) const = 0; + virtual bool get(const std::string& name, + std::shared_ptr* arr) const = 0; + + virtual std::string str() const = 0; + + static std::shared_ptr create(); + +protected: + JsonObject() {} + JsonObject(const JsonObject&) = delete; + JsonObject& operator=(const JsonObject&) = delete; +}; + +class JsonArray { +public: + virtual ~JsonArray() {} + + virtual size_t size() const = 0; + virtual void resize(size_t size) = 0; + + virtual void put(size_t index, const std::string& value) = 0; + virtual void put(size_t index, double value) = 0; + virtual void put(size_t index, int64_t value) = 0; + virtual void put(size_t index, bool value) = 0; + virtual void put(size_t index, std::nullptr_t value) = 0; + virtual void put(size_t index, + std::shared_ptr obj) = 0; + virtual void put(size_t index, + std::shared_ptr arr) = 0; + + void add(const std::string& value); + void add(double value); + void add(int64_t value); + void add(bool value); + void add(std::nullptr_t value); + void add(std::shared_ptr obj); + void add(std::shared_ptr arr); + + virtual bool is_null(size_t index) const = 0; + virtual const std::string& get(size_t index, + const std::string& fallback + = std::string()) const = 0; + virtual double get(size_t index, + double fallback = 0.0) const = 0; + virtual int64_t get(size_t index, + int64_t fallback = 0) const = 0; + virtual bool get(size_t index, bool fallback) const = 0; + virtual bool get(size_t index, + std::shared_ptr* obj) const = 0; + virtual bool get(size_t index, + std::shared_ptr* arr) const = 0; + + virtual std::string str() const = 0; + + static std::shared_ptr create(); + +protected: + JsonArray() {} + JsonArray(const JsonArray&) = delete; + JsonArray& operator=(const JsonArray&) = delete; +}; + +} // namespace stuff + +#endif /* JSON_HH */ diff --git a/src/sender.cc b/src/sender.cc new file mode 100644 index 0000000..c5b84c4 --- /dev/null +++ b/src/sender.cc @@ -0,0 +1,309 @@ +#include "common.hh" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "config.hh" +#include "json.hh" + +/* +{ + "username": "new-bot-name", + + "text": " for details!" + + "icon_url": "https://slack.com/img/icons/app-57.png", + "icon_emoji": ":ghost:" + + "channel": "#other-channel", // A public channel override + "channel": "@username", // A Direct Message override +} + */ + +using namespace stuff; + +namespace { + +void queue_message(const std::string& channel, const std::string& message); + +class Client { +public: + Client(int sock) + : sock_(sock), have_channel_(false), size_(0) { + } + + ~Client() { + if (sock_ != -1) { + close(sock_); + } + } + + int sock() const { + return sock_; + } + + bool read() { + while (true) { + char buf[1024]; + auto ret = ::read(sock_, buf, sizeof(buf)); + if (ret < 0) { + if (errno == EINTR) continue; + return errno == EWOULDBLOCK || errno == EAGAIN; + } + if (ret == 0) { + return false; + } + size_t pos = 0; + auto const fill = static_cast(ret); + while (pos < fill) { + if (!have_channel_) { + if (size_ == 0) { + auto const avail = std::min(fill - pos, + static_cast(4)); + memcpy(buf_ + fill_, buf, avail); + fill_ += avail; + pos += avail; + if (fill_ == 4) { + fill_ = 0; + memcpy(&size_, buf_, 4); + size_ = ntohl(size_); + if (size_ == 0) { + have_channel_ = true; + } + } + } else { + auto const avail = std::min(fill - pos, + size_ - channel_.size()); + channel_.append(buf + pos, buf + pos + avail); + pos += avail; + if (channel_.size() == size_) { + have_channel_ = true; + } + } + } else { + if (size_ == 0) { + auto const avail = std::min(fill - pos, + static_cast(4)); + memcpy(buf_ + fill_, buf, avail); + fill_ += avail; + pos += avail; + if (fill_ == 4) { + fill_ = 0; + memcpy(&size_, buf_, 4); + size_ = ntohl(size_); + if (size_ == 0) { + send_message(); + } + } + } else { + auto const avail = std::min(fill - pos, + size_ - message_.size()); + channel_.append(buf + pos, buf + pos + avail); + pos += avail; + if (message_.size() == size_) { + send_message(); + } + } + } + } + } + } + +private: + void send_message() { + assert(have_channel_); + queue_message(channel_, message_); + have_channel_ = false; + size_ = 0; + channel_.clear(); + message_.clear(); + } + + int sock_; + + char buf_[4]; + size_t fill_; + + bool have_channel_; + uint32_t size_; + std::string channel_; + std::string message_; +}; + +bool g_quit; + +void quit(int sig) { + g_quit = true; +} + +static size_t const MAX_CLIENTS = 64; + +struct Info { + std::string username; + std::string icon_url; + std::string icon_emoji; +}; + +Info g_info; + +void queue_message(const std::string& channel, const std::string& message) { + auto obj = JsonObject::create(); + obj->put("username", g_info.username); + if (!g_info.icon_url.empty()) + obj->put("icon_url", g_info.icon_url); + if (!g_info.icon_emoji.empty()) + obj->put("icon_emoji", g_info.icon_emoji); + if (channel.empty()) return; + obj->put("channel", "#" + channel); + obj->put("text", message); + + +} + +} // namespace + +int main() { + auto cfg = Config::create(); + if (!cfg->load("./sender.config")) { + cfg->load(SYSCONFDIR "/sender.config"); + } + g_info.username = cfg->get("username", "stuff-sender-bot"); + g_info.icon_url = cfg->get("icon_url", ""); + g_info.icon_emoji = cfg->get("icon_emoji", ""); + auto const& listener = cfg->get("listener", ""); + if (listener.empty()) { + std::cerr << "No listener configured" << std::endl; + return EXIT_FAILURE; + } + + int sock_ = -1; + size_t pos = listener.find(':'); + if (pos != std::string::npos) { + // [host]:port + struct addrinfo hints, *res; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + hints.ai_flags = AI_PASSIVE; + if (getaddrinfo(pos == 0 ? nullptr : listener.substr(0, pos).c_str(), + listener.substr(pos + 1).c_str(), &hints, &res)) { + std::cerr << "Invalid host or port in: " << listener << std::endl; + return EXIT_FAILURE; + } + for (auto ptr = res; ptr; ptr = ptr->ai_next) { + sock_ = socket(ptr->ai_family, ptr->ai_socktype, + ptr->ai_protocol); + if (sock_ == -1) continue; + if (bind(sock_, res->ai_addr, res->ai_addrlen)) { + close(sock_); + sock_ = -1; + continue; + } + break; + } + freeaddrinfo(res); + if (sock_ == -1) { + std::cerr << "Unable to bind: " << listener << std::endl; + return EXIT_FAILURE; + } + } else { + // socket + sock_ = socket(PF_LOCAL, SOCK_STREAM, 0); + if (sock_ == -1) { + std::cerr << "Unable to create local socket: " << strerror(errno) + << std::endl; + return EXIT_FAILURE; + } + struct sockaddr_un name; + name.sun_family = AF_LOCAL; + strncpy(name.sun_path, listener.c_str(), sizeof(name.sun_path)); + name.sun_path[sizeof(name.sun_path) - 1] = '\0'; + if (bind(sock_, reinterpret_cast(&name), + SUN_LEN(&name))) { + std::cerr << "Bind failed: " << strerror(errno) << std::endl; + close(sock_); + sock_ = 1; + return EXIT_FAILURE; + } + } + + if (listen(sock_, 10)) { + std::cerr << "Listen failed: " << strerror(errno) << std::endl; + close(sock_); + return EXIT_FAILURE; + } + + int value = 1; + setsockopt(sock_, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(value)); + + signal(SIGPIPE, SIG_IGN); + signal(SIGINT, quit); + signal(SIGTERM, quit); + + std::vector clients; + + while (!g_quit) { + fd_set read_set; + auto max = sock_ + 1; + FD_ZERO(&read_set); + FD_SET(sock_, &read_set); + for (auto it = clients.begin(); it != clients.end();) { + if (it->sock() == -1) { + it = clients.erase(it); + } else { + max = std::max(max, it->sock() + 1); + FD_SET(it->sock(), &read_set); + ++it; + } + } + auto ret = select(max, &read_set, nullptr, nullptr, nullptr); + if (ret < 0) { + if (errno == EINTR) continue; + std::cerr << "Select failed: " << strerror(errno); + close(sock_); + return EXIT_FAILURE; + } + for (auto it = clients.begin(); ret > 0 && it != clients.end();) { + if (FD_ISSET(it->sock(), &read_set)) { + ret--; + if (!it->read()) { + it = clients.erase(it); + } else { + ++it; + } + } else { + ++it; + } + } + if (ret > 0 && FD_ISSET(sock_, &read_set)) { + ret--; + auto sock = accept(sock_, nullptr, nullptr); + if (sock < 0) { + if (errno == EINTR) continue; + if (errno == EWOULDBLOCK || errno == EAGAIN) continue; + std::cerr << "Accept failed: " << strerror(errno); + close(sock_); + return EXIT_FAILURE; + } + if (clients.size() == MAX_CLIENTS) { + // Remove oldest + clients.erase(clients.begin()); + } + clients.emplace_back(sock); + } + } + close(sock_); + return EXIT_SUCCESS; +} diff --git a/src/sender_client.cc b/src/sender_client.cc new file mode 100644 index 0000000..c6129e9 --- /dev/null +++ b/src/sender_client.cc @@ -0,0 +1,217 @@ +#include "common.hh" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "config.hh" +#include "sender_client.hh" + +namespace stuff { + +namespace { + +class SenderClientImpl : public SenderClient { +public: + SenderClientImpl() + : sock_(-1) { + } + ~SenderClientImpl() override { + if (sock_ != -1) { + close(sock_); + } + } + + bool open(const Config* config) { + if (!config) return false; + sender_ = config->get("sender", ""); + if (sender_.empty()) return false; + return setup(); + } + + void send(const std::string& channel, const std::string& message) override { + struct timeval target; + gettimeofday(&target, NULL); + target.tv_sec += 5; + + send(channel, message, &target); + } + +private: + void send(const std::string& channel, const std::string& message, + const struct timeval* target) { + if (sock_ == -1) { + if (!setup()) return; + } + + uint32_t size1 = channel.size(); + uint32_t size2 = message.size(); + size_t pos = 0, len = 8 + size1 + size2; + size1 = htonl(size1); + size2 = htonl(size2); + while (pos < len) { + ssize_t ret; + if (pos < 4) { + size_t const avail = 4 - pos; + ret = write(sock_, + reinterpret_cast(&size1) + pos, avail); + if (ret > 0) { + pos += ret; + if (static_cast(ret) == avail) continue; + } + } else if (pos < 4 + channel.size()) { + size_t const avail = 4 + channel.size() - pos; + ret = write(sock_, channel.data() + (pos - 4), avail); + if (ret > 0) { + pos += ret; + if (static_cast(ret) == avail) continue; + } + } else if (pos < 8 + channel.size()) { + size_t const avail = 8 + channel.size() - pos; + ret = write(sock_, + reinterpret_cast(&size2) + pos, avail); + if (ret > 0) { + pos += ret; + if (static_cast(ret) == avail) continue; + } + } else { + size_t const avail = len - pos; + ret = write(sock_, message.data() + (pos - 8 - channel.size()), + avail); + if (ret > 0) { + pos += ret; + if (static_cast(ret) == avail) continue; + } + } + + if (ret < 0) { + if (errno == EINTR) continue; + if (errno != EAGAIN && errno != EWOULDBLOCK) { + close(sock_); + sock_ = -1; + return send(channel, message); + } + } + + fd_set write_set; + FD_ZERO(&write_set); + FD_SET(sock_, &write_set); + while (true) { + struct timeval timeout; + gettimeofday(&timeout, NULL); + if (target->tv_sec == timeout.tv_sec) { + timeout.tv_sec = 0; + if (target->tv_usec > timeout.tv_usec) { + timeout.tv_usec = target->tv_usec - timeout.tv_usec; + } else { + timeout.tv_usec = 0; + } + } else if (target->tv_sec > timeout.tv_sec) { + timeout.tv_sec = target->tv_sec - timeout.tv_sec; + if (target->tv_usec >= timeout.tv_usec) { + timeout.tv_usec = target->tv_usec - timeout.tv_usec; + } else { + timeout.tv_sec--; + timeout.tv_usec = + 1000000l + target->tv_usec - timeout.tv_usec; + } + } else { + timeout.tv_sec = 0; + timeout.tv_usec = 0; + } + auto ret = select(sock_ + 1, nullptr, &write_set, nullptr, + &timeout); + if (ret < 0 && errno == EINTR) continue; + if (ret <= 0) { + // Timeout or error + close(sock_); + sock_ = -1; + return send(channel, message); + } + break; + } + } + } + + bool setup() { + size_t pos = sender_.find(':'); + if (pos != std::string::npos) { + // host:port + struct addrinfo hints, *res; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + if (getaddrinfo(sender_.substr(0, pos).c_str(), + sender_.substr(pos + 1).c_str(), &hints, &res)) { + return false; + } + for (auto ptr = res; ptr; ptr = ptr->ai_next) { + sock_ = socket(ptr->ai_family, ptr->ai_socktype, + ptr->ai_protocol); + if (sock_ == -1) continue; + // TODO: Make async + if (connect(sock_, res->ai_addr, res->ai_addrlen)) { + close(sock_); + sock_ = -1; + continue; + } + break; + } + freeaddrinfo(res); + if (sock_ == -1) return false; + } else { + // socket + sock_ = socket(PF_LOCAL, SOCK_STREAM, 0); + if (sock_ == -1) return false; + struct sockaddr_un name; + name.sun_family = AF_LOCAL; + strncpy(name.sun_path, sender_.c_str(), sizeof(name.sun_path)); + name.sun_path[sizeof(name.sun_path) - 1] = '\0'; + // TODO: Make async + if (connect(sock_, reinterpret_cast(&name), + SUN_LEN(&name))) { + close(sock_); + sock_ = 1; + return false; + } + } + + int flags = fcntl(sock_, F_GETFL, 0); + if (flags < 0) { + close(sock_); + sock_ = -1; + return false; + } + if (!(flags & O_NONBLOCK)) { + flags |= O_NONBLOCK; + if (fcntl(sock_, F_SETFL, flags) < 0) { + close(sock_); + sock_ = -1; + return false; + } + } + + return true; + } + + std::string sender_; + int sock_; +}; + +} // namespace + +// static +std::unique_ptr SenderClient::create(const Config* config) { + std::unique_ptr ret(new SenderClientImpl()); + if (!ret->open(config)) return nullptr; + return std::move(ret); +} + +} // namespace stuff diff --git a/src/sender_client.hh b/src/sender_client.hh new file mode 100644 index 0000000..5d0bd35 --- /dev/null +++ b/src/sender_client.hh @@ -0,0 +1,30 @@ +#ifndef SENDER_CLIENT_HH +#define SENDER_CLIENT_HH + +#include +#include + +namespace stuff { + +class Config; + +class SenderClient { +public: + virtual ~SenderClient() {} + + virtual void send(const std::string& channel, + const std::string& message) = 0; + + static std::unique_ptr create(const Config* config); + +protected: + SenderClient() {} + +private: + SenderClient(const SenderClient&) = delete; + SenderClient& operator=(const SenderClient&) = delete; +}; + +} // namespace stuff + +#endif /* SENDER_CLIENT_HH */ -- cgit v1.2.3-70-g09d2