// -*- mode: c++; c-basic-offset: 2; -*- #include "common.hh" #include #include "buffer.hh" #include "chunked.hh" #include "http.hh" #include "io.hh" #include "looper.hh" #include "monitor.hh" #include "resolver.hh" namespace { Version const http_version = { 1, 1 }; class MonitorImpl : public Monitor { public: MonitorImpl(Looper* looper, Resolver* resolver, Delegate* delegate) : looper_(looper), resolver_(resolver), delegate_(delegate), state_(DISCONNECTED) { } ~MonitorImpl() { do_disconnect(true); } void connect(std::string const& host, uint16_t port) override { do_disconnect(); new_state(CONNECTING); if (state_ == CONNECTING) { resolv_ = resolver_->request( host, port, std::bind(&MonitorImpl::resolved, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); } } void disconnect() override { do_disconnect(); new_state(DISCONNECTED); } void attach() override { if (state_ != CONNECTED) return; new_state(ATTACHED); send_attach(); } void detach() override { if (state_ != ATTACHED) return; new_state(CONNECTED); send_detach(); } State state() const override { return state_; } private: void do_disconnect(bool skip_detach = false) { switch (state_) { case ATTACHED: if (!skip_detach) detach(); // Fallthrough case CONNECTED: looper_->remove(sock_.get()); sock_.reset(); break; case CONNECTING: if (sock_) { looper_->remove(sock_.get()); sock_.reset(); } if (resolv_) { resolver_->cancel(resolv_); resolv_ = nullptr; } break; case DISCONNECTED: break; } } void new_state(State state) { if (state_ == state) return; state_ = state; delegate_->state(this, state_); } void resolved(int fd, bool connected, char const* error) { resolv_ = nullptr; if (fd < 0) { delegate_->error(this, error); new_state(DISCONNECTED); return; } sock_.reset(fd); if (!in_) { in_.reset(Buffer::create(65536, 4096)); } else { in_->clear(); } if (!out_) { out_.reset(Buffer::create(8192, 1024)); } else { out_->clear(); } looper_->add(sock_.get(), Looper::EVENT_WRITE, std::bind(&MonitorImpl::event, this, std::placeholders::_1, std::placeholders::_2)); sent_hello_ = false; if (connected) { event(sock_.get(), Looper::EVENT_WRITE); } } bool parse_content_length(HttpResponse const* resp, uint64_t* bytes) { std::string len = resp->first_header("content-length"); char* end = nullptr; errno = 0; auto tmp = strtoull(len.c_str(), &end, 10); if (errno || !end || *end) { return false; } if (bytes) *bytes = tmp; return true; } bool setup_attach(HttpResponse const* resp) { assert(!active_attach_); std::string te = resp->first_header("transfer-encoding"); if (te != "chunked") return false; package_fill_ = 0; active_attach_.reset( Chunked::create( std::bind(&MonitorImpl::package, this, std::placeholders::_1, std::placeholders::_2))); return true; } static uint64_t read_u64(uint8_t const* data) { return static_cast(read_u32(data)) << 32 | read_u32(data + 4); } static uint32_t read_u32(uint8_t const* data) { return static_cast(read_u16(data)) << 16 | read_u16(data + 2); } static uint16_t read_u16(uint8_t const* data) { return data[0] << 8 | data[1]; } void package(void const* data, size_t size) { auto d = reinterpret_cast(data); auto const end = d + size; while (d < end) { size_t avail = sizeof(package_) - package_fill_; if (avail == 0) break; if (d + avail > end) avail = end - d; memcpy(package_ + package_fill_, d, avail); package_fill_ += avail; d += avail; size_t offset = 0; while (offset + 5 < package_fill_) { uint16_t size = read_u16(package_ + offset + 3); if (offset + size > package_fill_) break; size_t o = 5; if (size >= 29 && memcmp(package_ + offset, "PKG", 3) == 0) { Package pkg; pkg.id = read_u32(package_ + offset + o); o += 4; pkg.timestamp.tv_sec = read_u64(package_ + offset + o); o += 8; pkg.timestamp.tv_nsec = read_u32(package_ + offset + o); o += 4; pkg.flags = read_u16(package_ + offset + o); o += 2; pkg.source_port = read_u16(package_ + offset + o); o += 2; pkg.target_port = read_u16(package_ + offset + o); o += 2; auto len = read_u16(package_ + offset + o); o += 2; if (o + len + 2 <= size) { pkg.source_host.assign( reinterpret_cast(package_) + offset + o, len); o += len; len = read_u16(package_ + offset + o); o += 2; if (o + len <= size) { pkg.target_host.assign( reinterpret_cast(package_) + offset + o, len); o += len; bool last = !(pkg.flags & 0x01); pkg.flags >>= 1; delegate_->package(this, pkg); if (o < size || last) { delegate_->package_data( this, pkg.id, reinterpret_cast(package_) + offset + o, size - o, last); } } } } else if (size >= 10 && memcmp(package_ + offset, "DAT", 3) == 0) { uint32_t id = read_u32(package_ + offset + o); o += 4; uint8_t flags = package_[offset + o]; ++o; delegate_->package_data( this, id, reinterpret_cast(package_) + offset + o, size - o, !(flags & 0x01)); } offset += size; } if (offset > 0) { package_fill_ -= offset; memmove(package_, package_ + offset, package_fill_); } } } void consume_attach() { assert(active_attach_); while (true) { size_t avail; auto ptr = in_->read_ptr(&avail); if (avail == 0) return; auto used = active_attach_->add(ptr, avail); if (!active_attach_->good()) { delegate_->error(this, "Bad chunked data"); disconnect(); return; } if (used == 0) return; in_->consume(used); if (active_attach_->eof()) { active_attach_.reset(); new_state(CONNECTED); return; } } } void event(int fd, uint8_t events) { if (fd != sock_.get() || events == 0) { assert(false); return; } if (events & (Looper::EVENT_HUP | Looper::EVENT_ERROR)) { if (state_ == CONNECTING && !sent_hello_) { delegate_->error(this, "Connection denied"); } else { delegate_->error(this, "Connection lost"); } disconnect(); return; } if (events & Looper::EVENT_READ) { while (true) { size_t avail; auto ptr = in_->write_ptr(&avail); if (avail == 0) { assert(false); break; } auto ret = io::read(sock_.get(), ptr, avail); if (ret == -1) { if (errno == EWOULDBLOCK || errno == EAGAIN) break; delegate_->error(this, "Read error"); disconnect(); return; } if (ret == 0) { delegate_->error(this, "Connection closed"); disconnect(); return; } in_->commit(ret); if (static_cast(ret) < avail) break; } } if (events & Looper::EVENT_WRITE) { if (state_ == CONNECTING && !sent_hello_) { sent_hello_ = true; send_hello(); return; } size_t avail; while (true) { auto ptr = out_->read_ptr(&avail); if (avail == 0) break; auto ret = io::write(sock_.get(), ptr, avail); if (ret == -1) { if (errno == EWOULDBLOCK || errno == EAGAIN) break; delegate_->error(this, "Write error"); disconnect(); return; } if (ret == 0) { delegate_->error(this, "Connection lost"); disconnect(); return; } out_->consume(ret); if (static_cast(ret) < avail) break; } looper_->modify(sock_.get(), Looper::EVENT_READ | (avail == 0 ? 0 : Looper::EVENT_WRITE)); } if (active_attach_) { consume_attach(); if (active_attach_) return; } while (true) { size_t avail; auto ptr = in_->read_ptr(&avail); if (avail == 0) return; if (content_skip_ > 0) { if (content_skip_ > avail) { in_->consume(avail); content_skip_ -= avail; return; } in_->consume(content_skip_); content_skip_ = 0; ptr = in_->read_ptr(&avail); } auto resp = std::unique_ptr( HttpResponse::parse( reinterpret_cast(ptr), avail, false)); if (!resp) { if (avail > 1024 * 1024) { delegate_->error(this, "Server sending too much unexpected data"); disconnect(); } return; } if (!resp->good()) { delegate_->error(this, "Server sent invalid response"); disconnect(); return; } switch (state_) { case CONNECTING: if (resp->status_code() != 200) { delegate_->error( this, "Unexpected server response: " + resp->status_message()); disconnect(); return; } if (!parse_content_length(resp.get(), &content_skip_)) { delegate_->error( this, "Invalid server response, bad content length"); disconnect(); return; } in_->consume(resp->size()); new_state(CONNECTED); active_attach_.reset(); continue; case ATTACHED: if (resp->status_code() != 200) { delegate_->error( this, "Unexpected server response: " + resp->status_message()); new_state(CONNECTED); if (!parse_content_length(resp.get(), &content_skip_)) { delegate_->error( this, "Invalid server response, bad content length"); disconnect(); return; } in_->consume(resp->size()); } else { if (!setup_attach(resp.get())) { delegate_->error( this, "Invalid server response, bad chunked attach"); disconnect(); return; } in_->consume(resp->size()); consume_attach(); if (active_attach_) return; } continue; default: delegate_->error( this, "Unexpected server response: " + resp->status_message()); disconnect(); return; } } } void send_hello() { auto request = std::unique_ptr( HttpRequestBuilder::create("GET", "/hello", "HTTP", http_version)); request->add_header("X-TP-Monitor-Version", VERSION); request->add_header("Content-Length", "0"); auto data = request->build(); send(data.data(), data.size()); } void send_attach() { auto request = std::unique_ptr( HttpRequestBuilder::create("GET", "/attach", "HTTP", http_version)); request->add_header("Content-Length", "0"); auto data = request->build(); send(data.data(), data.size()); } void send_detach() { auto request = std::unique_ptr( HttpRequestBuilder::create("GET", "/detach", "HTTP", http_version)); request->add_header("Content-Length", "0"); auto data = request->build(); send(data.data(), data.size()); } void send(void const* data, size_t size) { if (size == 0) return; if (out_->empty()) { ssize_t ret = io::write(sock_.get(), data, size); if (ret == -1) { if (errno != EWOULDBLOCK && errno != EAGAIN) { delegate_->error(this, "Write error"); disconnect(); return; } } else if (ret == 0) { delegate_->error(this, "Write error"); disconnect(); return; } else if (static_cast(ret) == size) { return; } out_->write(reinterpret_cast(data) + ret, size - ret); looper_->modify(sock_.get(), Looper::EVENT_READ | Looper::EVENT_WRITE); } else { out_->write(data, size); } } Looper* const looper_; Resolver* const resolver_; Delegate* const delegate_; State state_; void* resolv_; io::auto_fd sock_; std::unique_ptr in_; std::unique_ptr out_; bool sent_hello_; std::unique_ptr active_attach_; uint64_t content_skip_; uint8_t package_[65535]; size_t package_fill_; }; } // namespace // static Monitor* Monitor::create( Looper* looper, Resolver* resolver, Delegate* delegate) { return new MonitorImpl(looper, resolver, delegate); }