diff options
Diffstat (limited to 'src/monitor.cc')
| -rw-r--r-- | src/monitor.cc | 476 |
1 files changed, 476 insertions, 0 deletions
diff --git a/src/monitor.cc b/src/monitor.cc new file mode 100644 index 0000000..673ad14 --- /dev/null +++ b/src/monitor.cc @@ -0,0 +1,476 @@ +// -*- mode: c++; c-basic-offset: 2; -*- + +#include "common.hh" + +#include <cstring> + +#include "buffer.hh" +#include "chunked.hh" +#include "http.hh" +#include "io.hh" +#include "looper.hh" +#include "monitor.hh" +#include "resolver.hh" + +namespace { + +Version const http_version = { 1, 1 }; + +class MonitorImpl : public Monitor { +public: + MonitorImpl(Looper* looper, Resolver* resolver, Delegate* delegate) + : looper_(looper), resolver_(resolver), delegate_(delegate), + state_(DISCONNECTED) { + } + + ~MonitorImpl() { + do_disconnect(true); + } + + void connect(std::string const& host, uint16_t port) override { + do_disconnect(); + new_state(CONNECTING); + if (state_ == CONNECTING) { + resolv_ = resolver_->request( + host, port, std::bind(&MonitorImpl::resolved, this, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3)); + } + } + + void disconnect() override { + do_disconnect(); + new_state(DISCONNECTED); + } + + void attach() override { + if (state_ != CONNECTED) return; + new_state(ATTACHED); + send_attach(); + } + + void detach() override { + if (state_ != ATTACHED) return; + new_state(CONNECTED); + send_detach(); + } + + State state() const override { + return state_; + } + +private: + void do_disconnect(bool skip_detach = false) { + switch (state_) { + case ATTACHED: + if (!skip_detach) detach(); + // Fallthrough + case CONNECTED: + looper_->remove(sock_.get()); + sock_.reset(); + break; + case CONNECTING: + if (sock_) { + looper_->remove(sock_.get()); + sock_.reset(); + } + if (resolv_) { + resolver_->cancel(resolv_); + resolv_ = nullptr; + } + break; + case DISCONNECTED: + break; + } + } + + void new_state(State state) { + if (state_ == state) return; + state_ = state; + delegate_->state(this, state_); + } + + void resolved(int fd, bool connected, char const* error) { + resolv_ = nullptr; + if (fd < 0) { + delegate_->error(this, error); + new_state(DISCONNECTED); + return; + } + sock_.reset(fd); + if (!in_) { + in_.reset(Buffer::create(65536, 4096)); + } else { + in_->clear(); + } + if (!out_) { + out_.reset(Buffer::create(8192, 1024)); + } else { + out_->clear(); + } + looper_->add(sock_.get(), Looper::EVENT_WRITE, + std::bind(&MonitorImpl::event, this, std::placeholders::_1, + std::placeholders::_2)); + sent_hello_ = false; + if (connected) { + event(sock_.get(), Looper::EVENT_WRITE); + } + } + + bool parse_content_length(HttpResponse const* resp, uint64_t* bytes) { + std::string len = resp->first_header("content-length"); + char* end = nullptr; + errno = 0; + auto tmp = strtoull(len.c_str(), &end, 10); + if (errno || !end || *end) { + return false; + } + if (bytes) *bytes = tmp; + return true; + } + + bool setup_attach(HttpResponse const* resp) { + assert(!active_attach_); + std::string te = resp->first_header("transfer-encoding"); + if (te != "chunked") return false; + package_fill_ = 0; + active_attach_.reset( + Chunked::create( + std::bind(&MonitorImpl::package, this, std::placeholders::_1, + std::placeholders::_2))); + return true; + } + + static uint64_t read_u64(uint8_t const* data) { + return static_cast<uint64_t>(read_u32(data)) << 32 | read_u32(data + 4); + } + + static uint32_t read_u32(uint8_t const* data) { + return static_cast<uint32_t>(read_u16(data)) << 16 | read_u16(data + 2); + } + + static uint16_t read_u16(uint8_t const* data) { + return data[0] << 8 | data[1]; + } + + void package(void const* data, size_t size) { + auto d = reinterpret_cast<char const*>(data); + auto const end = d + size; + while (d < end) { + size_t avail = sizeof(package_) - package_fill_; + if (avail == 0) break; + if (d + avail > end) avail = end - d; + memcpy(package_ + package_fill_, d, avail); + package_fill_ += avail; + d += avail; + size_t offset = 0; + while (offset + 5 < package_fill_) { + uint16_t size = read_u16(package_ + offset + 3); + if (offset + size > package_fill_) break; + size_t o = 5; + if (size >= 29 && memcmp(package_ + offset, "PKG", 3) == 0) { + Package pkg; + pkg.id = read_u32(package_ + offset + o); + o += 4; + pkg.timestamp.tv_sec = read_u64(package_ + offset + o); + o += 8; + pkg.timestamp.tv_nsec = read_u32(package_ + offset + o); + o += 4; + pkg.flags = read_u16(package_ + offset + o); + o += 2; + pkg.source_port = read_u16(package_ + offset + o); + o += 2; + pkg.target_port = read_u16(package_ + offset + o); + o += 2; + auto len = read_u16(package_ + offset + o); + o += 2; + if (o + len + 2 <= size) { + pkg.source_host.assign( + reinterpret_cast<char*>(package_) + offset + o, len); + o += len; + len = read_u16(package_ + offset + o); + o += 2; + if (o + len <= size) { + pkg.target_host.assign( + reinterpret_cast<char*>(package_) + offset + o, len); + o += len; + bool last = !(pkg.flags & 0x01); + pkg.flags >>= 1; + delegate_->package(this, pkg); + if (o < size || last) { + delegate_->package_data( + this, pkg.id, + reinterpret_cast<char*>(package_) + offset + o, size - o, + last); + } + } + } + } else if (size >= 10 && memcmp(package_ + offset, "DAT", 3) == 0) { + uint32_t id = read_u32(package_ + offset + o); + o += 4; + uint8_t flags = package_[offset + o]; + ++o; + delegate_->package_data( + this, id, + reinterpret_cast<char*>(package_) + offset + o, size - o, + !(flags & 0x01)); + } + offset += size; + } + if (offset > 0) { + package_fill_ -= offset; + memmove(package_, package_ + offset, package_fill_); + } + } + } + + void consume_attach() { + assert(active_attach_); + while (true) { + size_t avail; + auto ptr = in_->read_ptr(&avail); + if (avail == 0) return; + auto used = active_attach_->add(ptr, avail); + if (!active_attach_->good()) { + delegate_->error(this, "Bad chunked data"); + disconnect(); + return; + } + if (used == 0) return; + in_->consume(used); + if (active_attach_->eof()) { + active_attach_.reset(); + new_state(CONNECTED); + return; + } + } + } + + void event(int fd, uint8_t events) { + if (fd != sock_.get() || events == 0) { + assert(false); + return; + } + + if (events & (Looper::EVENT_HUP | Looper::EVENT_ERROR)) { + if (state_ == CONNECTING && !sent_hello_) { + delegate_->error(this, "Connection denied"); + } else { + delegate_->error(this, "Connection lost"); + } + disconnect(); + return; + } + + if (events & Looper::EVENT_READ) { + while (true) { + size_t avail; + auto ptr = in_->write_ptr(&avail); + if (avail == 0) { + assert(false); + break; + } + auto ret = io::read(sock_.get(), ptr, avail); + if (ret == -1) { + if (errno == EWOULDBLOCK || errno == EAGAIN) break; + delegate_->error(this, "Read error"); + disconnect(); + return; + } + if (ret == 0) { + delegate_->error(this, "Connection closed"); + disconnect(); + return; + } + in_->commit(ret); + if (static_cast<size_t>(ret) < avail) break; + } + } + if (events & Looper::EVENT_WRITE) { + if (state_ == CONNECTING && !sent_hello_) { + sent_hello_ = true; + send_hello(); + return; + } + + size_t avail; + while (true) { + auto ptr = out_->read_ptr(&avail); + if (avail == 0) break; + auto ret = io::write(sock_.get(), ptr, avail); + if (ret == -1) { + if (errno == EWOULDBLOCK || errno == EAGAIN) break; + delegate_->error(this, "Write error"); + disconnect(); + return; + } + if (ret == 0) { + delegate_->error(this, "Connection lost"); + disconnect(); + return; + } + out_->consume(ret); + if (static_cast<size_t>(ret) < avail) break; + } + looper_->modify(sock_.get(), Looper::EVENT_READ + | (avail == 0 ? 0 : Looper::EVENT_WRITE)); + } + + if (active_attach_) { + consume_attach(); + if (active_attach_) return; + } + + while (true) { + size_t avail; + auto ptr = in_->read_ptr(&avail); + if (avail == 0) return; + if (content_skip_ > 0) { + if (content_skip_ > avail) { + in_->consume(avail); + content_skip_ -= avail; + return; + } + in_->consume(content_skip_); + content_skip_ = 0; + ptr = in_->read_ptr(&avail); + } + auto resp = std::unique_ptr<HttpResponse>( + HttpResponse::parse( + reinterpret_cast<char const*>(ptr), avail, false)); + if (!resp) { + if (avail > 1024 * 1024) { + delegate_->error(this, "Server sending too much unexpected data"); + disconnect(); + } + return; + } + if (!resp->good()) { + delegate_->error(this, "Server sent invalid response"); + disconnect(); + return; + } + switch (state_) { + case CONNECTING: + if (resp->status_code() != 200) { + delegate_->error( + this, "Unexpected server response: " + resp->status_message()); + disconnect(); + return; + } + if (!parse_content_length(resp.get(), &content_skip_)) { + delegate_->error( + this, "Invalid server response, bad content length"); + disconnect(); + return; + } + in_->consume(resp->size()); + new_state(CONNECTED); + active_attach_.reset(); + continue; + case ATTACHED: + if (resp->status_code() != 200) { + delegate_->error( + this, "Unexpected server response: " + resp->status_message()); + new_state(CONNECTED); + if (!parse_content_length(resp.get(), &content_skip_)) { + delegate_->error( + this, "Invalid server response, bad content length"); + disconnect(); + return; + } + in_->consume(resp->size()); + } else { + if (!setup_attach(resp.get())) { + delegate_->error( + this, "Invalid server response, bad chunked attach"); + disconnect(); + return; + } + in_->consume(resp->size()); + consume_attach(); + if (active_attach_) return; + } + continue; + default: + delegate_->error( + this, "Unexpected server response: " + resp->status_message()); + disconnect(); + return; + } + } + } + + void send_hello() { + auto request = std::unique_ptr<HttpRequestBuilder>( + HttpRequestBuilder::create("GET", "/hello", "HTTP", http_version)); + request->add_header("X-TP-Monitor-Version", VERSION); + request->add_header("Content-Length", "0"); + auto data = request->build(); + send(data.data(), data.size()); + } + + void send_attach() { + auto request = std::unique_ptr<HttpRequestBuilder>( + HttpRequestBuilder::create("GET", "/attach", "HTTP", http_version)); + request->add_header("Content-Length", "0"); + auto data = request->build(); + send(data.data(), data.size()); + } + + void send_detach() { + auto request = std::unique_ptr<HttpRequestBuilder>( + HttpRequestBuilder::create("GET", "/detach", "HTTP", http_version)); + request->add_header("Content-Length", "0"); + auto data = request->build(); + send(data.data(), data.size()); + } + + void send(void const* data, size_t size) { + if (size == 0) return; + + if (out_->empty()) { + ssize_t ret = io::write(sock_.get(), data, size); + if (ret == -1) { + if (errno != EWOULDBLOCK && errno != EAGAIN) { + delegate_->error(this, "Write error"); + disconnect(); + return; + } + } else if (ret == 0) { + delegate_->error(this, "Write error"); + disconnect(); + return; + } else if (static_cast<size_t>(ret) == size) { + return; + } + out_->write(reinterpret_cast<char const*>(data) + ret, size - ret); + looper_->modify(sock_.get(), Looper::EVENT_READ | Looper::EVENT_WRITE); + } else { + out_->write(data, size); + } + } + + Looper* const looper_; + Resolver* const resolver_; + Delegate* const delegate_; + State state_; + void* resolv_; + io::auto_fd sock_; + std::unique_ptr<Buffer> in_; + std::unique_ptr<Buffer> out_; + bool sent_hello_; + std::unique_ptr<Chunked> active_attach_; + uint64_t content_skip_; + uint8_t package_[65535]; + size_t package_fill_; +}; + +} // namespace + +// static +Monitor* Monitor::create( + Looper* looper, Resolver* resolver, Delegate* delegate) { + return new MonitorImpl(looper, resolver, delegate); +} |
