summaryrefslogtreecommitdiff
path: root/src/monitor.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/monitor.cc')
-rw-r--r--src/monitor.cc476
1 files changed, 476 insertions, 0 deletions
diff --git a/src/monitor.cc b/src/monitor.cc
new file mode 100644
index 0000000..673ad14
--- /dev/null
+++ b/src/monitor.cc
@@ -0,0 +1,476 @@
+// -*- mode: c++; c-basic-offset: 2; -*-
+
+#include "common.hh"
+
+#include <cstring>
+
+#include "buffer.hh"
+#include "chunked.hh"
+#include "http.hh"
+#include "io.hh"
+#include "looper.hh"
+#include "monitor.hh"
+#include "resolver.hh"
+
+namespace {
+
+Version const http_version = { 1, 1 };
+
+class MonitorImpl : public Monitor {
+public:
+ MonitorImpl(Looper* looper, Resolver* resolver, Delegate* delegate)
+ : looper_(looper), resolver_(resolver), delegate_(delegate),
+ state_(DISCONNECTED) {
+ }
+
+ ~MonitorImpl() {
+ do_disconnect(true);
+ }
+
+ void connect(std::string const& host, uint16_t port) override {
+ do_disconnect();
+ new_state(CONNECTING);
+ if (state_ == CONNECTING) {
+ resolv_ = resolver_->request(
+ host, port, std::bind(&MonitorImpl::resolved, this,
+ std::placeholders::_1,
+ std::placeholders::_2,
+ std::placeholders::_3));
+ }
+ }
+
+ void disconnect() override {
+ do_disconnect();
+ new_state(DISCONNECTED);
+ }
+
+ void attach() override {
+ if (state_ != CONNECTED) return;
+ new_state(ATTACHED);
+ send_attach();
+ }
+
+ void detach() override {
+ if (state_ != ATTACHED) return;
+ new_state(CONNECTED);
+ send_detach();
+ }
+
+ State state() const override {
+ return state_;
+ }
+
+private:
+ void do_disconnect(bool skip_detach = false) {
+ switch (state_) {
+ case ATTACHED:
+ if (!skip_detach) detach();
+ // Fallthrough
+ case CONNECTED:
+ looper_->remove(sock_.get());
+ sock_.reset();
+ break;
+ case CONNECTING:
+ if (sock_) {
+ looper_->remove(sock_.get());
+ sock_.reset();
+ }
+ if (resolv_) {
+ resolver_->cancel(resolv_);
+ resolv_ = nullptr;
+ }
+ break;
+ case DISCONNECTED:
+ break;
+ }
+ }
+
+ void new_state(State state) {
+ if (state_ == state) return;
+ state_ = state;
+ delegate_->state(this, state_);
+ }
+
+ void resolved(int fd, bool connected, char const* error) {
+ resolv_ = nullptr;
+ if (fd < 0) {
+ delegate_->error(this, error);
+ new_state(DISCONNECTED);
+ return;
+ }
+ sock_.reset(fd);
+ if (!in_) {
+ in_.reset(Buffer::create(65536, 4096));
+ } else {
+ in_->clear();
+ }
+ if (!out_) {
+ out_.reset(Buffer::create(8192, 1024));
+ } else {
+ out_->clear();
+ }
+ looper_->add(sock_.get(), Looper::EVENT_WRITE,
+ std::bind(&MonitorImpl::event, this, std::placeholders::_1,
+ std::placeholders::_2));
+ sent_hello_ = false;
+ if (connected) {
+ event(sock_.get(), Looper::EVENT_WRITE);
+ }
+ }
+
+ bool parse_content_length(HttpResponse const* resp, uint64_t* bytes) {
+ std::string len = resp->first_header("content-length");
+ char* end = nullptr;
+ errno = 0;
+ auto tmp = strtoull(len.c_str(), &end, 10);
+ if (errno || !end || *end) {
+ return false;
+ }
+ if (bytes) *bytes = tmp;
+ return true;
+ }
+
+ bool setup_attach(HttpResponse const* resp) {
+ assert(!active_attach_);
+ std::string te = resp->first_header("transfer-encoding");
+ if (te != "chunked") return false;
+ package_fill_ = 0;
+ active_attach_.reset(
+ Chunked::create(
+ std::bind(&MonitorImpl::package, this, std::placeholders::_1,
+ std::placeholders::_2)));
+ return true;
+ }
+
+ static uint64_t read_u64(uint8_t const* data) {
+ return static_cast<uint64_t>(read_u32(data)) << 32 | read_u32(data + 4);
+ }
+
+ static uint32_t read_u32(uint8_t const* data) {
+ return static_cast<uint32_t>(read_u16(data)) << 16 | read_u16(data + 2);
+ }
+
+ static uint16_t read_u16(uint8_t const* data) {
+ return data[0] << 8 | data[1];
+ }
+
+ void package(void const* data, size_t size) {
+ auto d = reinterpret_cast<char const*>(data);
+ auto const end = d + size;
+ while (d < end) {
+ size_t avail = sizeof(package_) - package_fill_;
+ if (avail == 0) break;
+ if (d + avail > end) avail = end - d;
+ memcpy(package_ + package_fill_, d, avail);
+ package_fill_ += avail;
+ d += avail;
+ size_t offset = 0;
+ while (offset + 5 < package_fill_) {
+ uint16_t size = read_u16(package_ + offset + 3);
+ if (offset + size > package_fill_) break;
+ size_t o = 5;
+ if (size >= 29 && memcmp(package_ + offset, "PKG", 3) == 0) {
+ Package pkg;
+ pkg.id = read_u32(package_ + offset + o);
+ o += 4;
+ pkg.timestamp.tv_sec = read_u64(package_ + offset + o);
+ o += 8;
+ pkg.timestamp.tv_nsec = read_u32(package_ + offset + o);
+ o += 4;
+ pkg.flags = read_u16(package_ + offset + o);
+ o += 2;
+ pkg.source_port = read_u16(package_ + offset + o);
+ o += 2;
+ pkg.target_port = read_u16(package_ + offset + o);
+ o += 2;
+ auto len = read_u16(package_ + offset + o);
+ o += 2;
+ if (o + len + 2 <= size) {
+ pkg.source_host.assign(
+ reinterpret_cast<char*>(package_) + offset + o, len);
+ o += len;
+ len = read_u16(package_ + offset + o);
+ o += 2;
+ if (o + len <= size) {
+ pkg.target_host.assign(
+ reinterpret_cast<char*>(package_) + offset + o, len);
+ o += len;
+ bool last = !(pkg.flags & 0x01);
+ pkg.flags >>= 1;
+ delegate_->package(this, pkg);
+ if (o < size || last) {
+ delegate_->package_data(
+ this, pkg.id,
+ reinterpret_cast<char*>(package_) + offset + o, size - o,
+ last);
+ }
+ }
+ }
+ } else if (size >= 10 && memcmp(package_ + offset, "DAT", 3) == 0) {
+ uint32_t id = read_u32(package_ + offset + o);
+ o += 4;
+ uint8_t flags = package_[offset + o];
+ ++o;
+ delegate_->package_data(
+ this, id,
+ reinterpret_cast<char*>(package_) + offset + o, size - o,
+ !(flags & 0x01));
+ }
+ offset += size;
+ }
+ if (offset > 0) {
+ package_fill_ -= offset;
+ memmove(package_, package_ + offset, package_fill_);
+ }
+ }
+ }
+
+ void consume_attach() {
+ assert(active_attach_);
+ while (true) {
+ size_t avail;
+ auto ptr = in_->read_ptr(&avail);
+ if (avail == 0) return;
+ auto used = active_attach_->add(ptr, avail);
+ if (!active_attach_->good()) {
+ delegate_->error(this, "Bad chunked data");
+ disconnect();
+ return;
+ }
+ if (used == 0) return;
+ in_->consume(used);
+ if (active_attach_->eof()) {
+ active_attach_.reset();
+ new_state(CONNECTED);
+ return;
+ }
+ }
+ }
+
+ void event(int fd, uint8_t events) {
+ if (fd != sock_.get() || events == 0) {
+ assert(false);
+ return;
+ }
+
+ if (events & (Looper::EVENT_HUP | Looper::EVENT_ERROR)) {
+ if (state_ == CONNECTING && !sent_hello_) {
+ delegate_->error(this, "Connection denied");
+ } else {
+ delegate_->error(this, "Connection lost");
+ }
+ disconnect();
+ return;
+ }
+
+ if (events & Looper::EVENT_READ) {
+ while (true) {
+ size_t avail;
+ auto ptr = in_->write_ptr(&avail);
+ if (avail == 0) {
+ assert(false);
+ break;
+ }
+ auto ret = io::read(sock_.get(), ptr, avail);
+ if (ret == -1) {
+ if (errno == EWOULDBLOCK || errno == EAGAIN) break;
+ delegate_->error(this, "Read error");
+ disconnect();
+ return;
+ }
+ if (ret == 0) {
+ delegate_->error(this, "Connection closed");
+ disconnect();
+ return;
+ }
+ in_->commit(ret);
+ if (static_cast<size_t>(ret) < avail) break;
+ }
+ }
+ if (events & Looper::EVENT_WRITE) {
+ if (state_ == CONNECTING && !sent_hello_) {
+ sent_hello_ = true;
+ send_hello();
+ return;
+ }
+
+ size_t avail;
+ while (true) {
+ auto ptr = out_->read_ptr(&avail);
+ if (avail == 0) break;
+ auto ret = io::write(sock_.get(), ptr, avail);
+ if (ret == -1) {
+ if (errno == EWOULDBLOCK || errno == EAGAIN) break;
+ delegate_->error(this, "Write error");
+ disconnect();
+ return;
+ }
+ if (ret == 0) {
+ delegate_->error(this, "Connection lost");
+ disconnect();
+ return;
+ }
+ out_->consume(ret);
+ if (static_cast<size_t>(ret) < avail) break;
+ }
+ looper_->modify(sock_.get(), Looper::EVENT_READ
+ | (avail == 0 ? 0 : Looper::EVENT_WRITE));
+ }
+
+ if (active_attach_) {
+ consume_attach();
+ if (active_attach_) return;
+ }
+
+ while (true) {
+ size_t avail;
+ auto ptr = in_->read_ptr(&avail);
+ if (avail == 0) return;
+ if (content_skip_ > 0) {
+ if (content_skip_ > avail) {
+ in_->consume(avail);
+ content_skip_ -= avail;
+ return;
+ }
+ in_->consume(content_skip_);
+ content_skip_ = 0;
+ ptr = in_->read_ptr(&avail);
+ }
+ auto resp = std::unique_ptr<HttpResponse>(
+ HttpResponse::parse(
+ reinterpret_cast<char const*>(ptr), avail, false));
+ if (!resp) {
+ if (avail > 1024 * 1024) {
+ delegate_->error(this, "Server sending too much unexpected data");
+ disconnect();
+ }
+ return;
+ }
+ if (!resp->good()) {
+ delegate_->error(this, "Server sent invalid response");
+ disconnect();
+ return;
+ }
+ switch (state_) {
+ case CONNECTING:
+ if (resp->status_code() != 200) {
+ delegate_->error(
+ this, "Unexpected server response: " + resp->status_message());
+ disconnect();
+ return;
+ }
+ if (!parse_content_length(resp.get(), &content_skip_)) {
+ delegate_->error(
+ this, "Invalid server response, bad content length");
+ disconnect();
+ return;
+ }
+ in_->consume(resp->size());
+ new_state(CONNECTED);
+ active_attach_.reset();
+ continue;
+ case ATTACHED:
+ if (resp->status_code() != 200) {
+ delegate_->error(
+ this, "Unexpected server response: " + resp->status_message());
+ new_state(CONNECTED);
+ if (!parse_content_length(resp.get(), &content_skip_)) {
+ delegate_->error(
+ this, "Invalid server response, bad content length");
+ disconnect();
+ return;
+ }
+ in_->consume(resp->size());
+ } else {
+ if (!setup_attach(resp.get())) {
+ delegate_->error(
+ this, "Invalid server response, bad chunked attach");
+ disconnect();
+ return;
+ }
+ in_->consume(resp->size());
+ consume_attach();
+ if (active_attach_) return;
+ }
+ continue;
+ default:
+ delegate_->error(
+ this, "Unexpected server response: " + resp->status_message());
+ disconnect();
+ return;
+ }
+ }
+ }
+
+ void send_hello() {
+ auto request = std::unique_ptr<HttpRequestBuilder>(
+ HttpRequestBuilder::create("GET", "/hello", "HTTP", http_version));
+ request->add_header("X-TP-Monitor-Version", VERSION);
+ request->add_header("Content-Length", "0");
+ auto data = request->build();
+ send(data.data(), data.size());
+ }
+
+ void send_attach() {
+ auto request = std::unique_ptr<HttpRequestBuilder>(
+ HttpRequestBuilder::create("GET", "/attach", "HTTP", http_version));
+ request->add_header("Content-Length", "0");
+ auto data = request->build();
+ send(data.data(), data.size());
+ }
+
+ void send_detach() {
+ auto request = std::unique_ptr<HttpRequestBuilder>(
+ HttpRequestBuilder::create("GET", "/detach", "HTTP", http_version));
+ request->add_header("Content-Length", "0");
+ auto data = request->build();
+ send(data.data(), data.size());
+ }
+
+ void send(void const* data, size_t size) {
+ if (size == 0) return;
+
+ if (out_->empty()) {
+ ssize_t ret = io::write(sock_.get(), data, size);
+ if (ret == -1) {
+ if (errno != EWOULDBLOCK && errno != EAGAIN) {
+ delegate_->error(this, "Write error");
+ disconnect();
+ return;
+ }
+ } else if (ret == 0) {
+ delegate_->error(this, "Write error");
+ disconnect();
+ return;
+ } else if (static_cast<size_t>(ret) == size) {
+ return;
+ }
+ out_->write(reinterpret_cast<char const*>(data) + ret, size - ret);
+ looper_->modify(sock_.get(), Looper::EVENT_READ | Looper::EVENT_WRITE);
+ } else {
+ out_->write(data, size);
+ }
+ }
+
+ Looper* const looper_;
+ Resolver* const resolver_;
+ Delegate* const delegate_;
+ State state_;
+ void* resolv_;
+ io::auto_fd sock_;
+ std::unique_ptr<Buffer> in_;
+ std::unique_ptr<Buffer> out_;
+ bool sent_hello_;
+ std::unique_ptr<Chunked> active_attach_;
+ uint64_t content_skip_;
+ uint8_t package_[65535];
+ size_t package_fill_;
+};
+
+} // namespace
+
+// static
+Monitor* Monitor::create(
+ Looper* looper, Resolver* resolver, Delegate* delegate) {
+ return new MonitorImpl(looper, resolver, delegate);
+}