diff options
Diffstat (limited to 'src/proxy.cc')
| -rw-r--r-- | src/proxy.cc | 471 |
1 files changed, 440 insertions, 31 deletions
diff --git a/src/proxy.cc b/src/proxy.cc index dcc52df..5abe257 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -6,6 +6,7 @@ #define _GNU_SOURCE #endif +#include <arpa/inet.h> #include <cerrno> #include <chrono> #include <cstring> @@ -17,6 +18,7 @@ #include <sys/types.h> #include <unistd.h> #include <vector> +#include <unordered_set> #include "buffer.hh" #include "chunked.hh" @@ -229,6 +231,7 @@ struct RemoteClient : public BaseClient { Content content; std::string host; uint16_t port; + uint32_t pkg_id; }; struct Client : public BaseClient { @@ -242,9 +245,13 @@ struct Client : public BaseClient { RemoteState remote_state; void* resolve; RemoteClient remote; + std::string source_host; + uint16_t source_port; + uint32_t pkg_id; }; struct Monitor : public BaseClient { + bool got_hello; }; class ProxyImpl : public Proxy { @@ -254,7 +261,8 @@ public: : config_(config), cwd_(cwd), configfile_(configfile), logfile_(logfile), logger_(logger), accept_socket_(accept_fd), monitor_socket_(monitor_fd), looper_(Looper::create()), resolver_(Resolver::create(looper_.get())), - new_timeout_(nullptr), timeout_(nullptr) { + new_timeout_(nullptr), timeout_(nullptr), next_package_id_(1), + monitor_send_proxied_(false) { setup(); } ~ProxyImpl() override { @@ -307,11 +315,33 @@ private: void client_error(size_t index, uint16_t status_code, std::string const& status); bool client_request(size_t index); - bool client_send(size_t index, void const* ptr, size_t avail); void client_remote_error(size_t index, uint16_t error); void close_client_when_done(size_t index); void client_remote_resolved(size_t index, int fd, bool connected, char const* error); + void monitor_error(size_t index, + uint16_t status_code, std::string const& status); + bool support_monitor_version(size_t index, std::string const& version); + bool monitor_send_chunked(size_t index, + void const* header, size_t header_size, + void const* data, size_t data_size); + uint32_t get_next_package_id(); + void send_attached_package(uint32_t id, uint16_t flags, + std::string const& source_host, + uint16_t source_port, + std::string const& target_host, + uint16_t target_port, + bool last); + void send_attached_package2(uint8_t* buffer, size_t size, + uint32_t id, uint16_t flags, + std::string const& source_host, + uint16_t source_port, + std::string const& target_host, + uint16_t target_port, + bool last); + void send_attached_data(uint32_t id, void const* ptr, size_t size, bool last); + void send_attached(void const* header, size_t header_size, + void const* data, size_t data_size); Config* const config_; std::string cwd_; @@ -327,9 +357,12 @@ private: bool good_; void* new_timeout_; void* timeout_; + uint32_t next_package_id_; + bool monitor_send_proxied_; clients<Client> clients_; clients<Monitor> monitors_; + std::unordered_set<size_t> attached_; }; size_t get_size(Config* config, Logger* logger, std::string const& name, @@ -349,6 +382,7 @@ size_t get_size(Config* config, Logger* logger, std::string const& name, } void ProxyImpl::setup() { + monitor_send_proxied_ = !config_->get("monitor_proxy_request", false); clients_.resize(get_size(config_, logger_, "max_clients", 1024)); monitors_.resize(get_size(config_, logger_, "max_monitors", 2)); looper_->add(accept_socket_.get(), @@ -380,6 +414,7 @@ bool ProxyImpl::reload_config() { logger_->out(Logger::WARN, "New config invalid, ignored."); return true; } + monitor_send_proxied_ = !config_->get("monitor_proxy_request", false); if (!logfile_) { auto logfile = config_->get("logfile", nullptr); if (logfile) { @@ -461,6 +496,16 @@ void ProxyImpl::close_base(BaseClient* client) { void ProxyImpl::close_client(size_t index) { bool was_full = clients_.full(); auto& client = clients_[index]; + if (client.pkg_id != 0) { + size_t avail; + auto ptr = client.in->read_ptr(&avail); + if (avail) { + send_attached_data(client.pkg_id, ptr, avail, true); + } else { + send_attached_data(client.pkg_id, nullptr, 0, true); + } + client.pkg_id = 0; + } client.request.reset(); client.url.reset(); client.connect.reset(); @@ -471,6 +516,16 @@ void ProxyImpl::close_client(size_t index) { resolver_->cancel(client.resolve); client.resolve = nullptr; } + if (client.remote.pkg_id) { + size_t avail; + auto ptr = client.remote.in->read_ptr(&avail); + if (avail) { + send_attached_data(client.remote.pkg_id, ptr, avail, true); + } else { + send_attached_data(client.remote.pkg_id, nullptr, 0, true); + } + client.remote.pkg_id = 0; + } close_base(&client.remote); close_base(&client); clients_.erase(index); @@ -482,6 +537,10 @@ void ProxyImpl::close_client(size_t index) { void ProxyImpl::close_monitor(size_t index) { bool was_full = monitors_.full(); auto& monitor = monitors_[index]; + auto it = attached_.find(index); + if (it != attached_.end()) { + attached_.erase(it); + } close_base(&monitor); monitors_.erase(index); if (was_full && !monitors_.full() && monitor_socket_) { @@ -530,6 +589,8 @@ float ProxyImpl::handle_timeout(bool new_conn, close.clear(); for (auto i = monitors_.begin(); i != monitors_.end(); ++i) { if (i->new_connection != new_conn) continue; + // Monitors are safe from timeout after hello + if (i->got_hello) continue; auto diff = std::chrono::duration_cast<std::chrono::duration<float>>( ((i->last + timeout) - now)).count(); if (diff < 0.0f) { @@ -728,21 +789,9 @@ void ProxyImpl::client_error(size_t index, uint16_t status_code, client.url.reset(); client.connect.reset(); - std::string proto; - Version version; - - if (!client.request) { - proto = "HTTP"; - version.major = 1; - version.minor = 1; - } else { - proto = client.request->proto(); - version = client.request->proto_version(); - } - auto resp = std::unique_ptr<HttpResponseBuilder>( HttpResponseBuilder::create( - proto, version, status_code, status_message)); + "HTTP", Version(1, 0), status_code, status_message)); resp->add_header("Content-Length", "0"); resp->add_header("Connection", "close"); @@ -781,6 +830,13 @@ bool ProxyImpl::client_request(size_t index) { host = client.url->host(); port = client.url->port(); } + if (port == 0) port = 80; + client.pkg_id = get_next_package_id(); + if (client.pkg_id != 0) { + send_attached_package(client.pkg_id, 0, + client.source_host, client.source_port, + host, port, false); + } if (client.remote_state == WAITING) { if (client.connect || host != client.remote.host || port != client.remote.port) { @@ -803,7 +859,6 @@ bool ProxyImpl::client_request(size_t index) { client.remote.host = host; client.remote.port = port; - if (port == 0) port = 80; client.resolve = resolver_->request( host, port, std::bind(&ProxyImpl::client_remote_resolved, this, index, @@ -909,7 +964,10 @@ void ProxyImpl::client_empty_input(size_t index) { // falltrough case CONTENT_NONE: { if (client.connect && client.remote_state == CONNECTED) { - if (!client_send(index, ptr, avail)) { + if (client.pkg_id != 0) { + send_attached_data(client.pkg_id, ptr, avail, false); + } + if (!base_send(&client.remote, ptr, avail, index, "Client remote")) { return; } client.in->consume(avail); @@ -919,6 +977,10 @@ void ProxyImpl::client_empty_input(size_t index) { // Still working on the last request, wait return; } + if (client.pkg_id != 0) { + send_attached_data(client.pkg_id, nullptr, 0, true); + client.pkg_id = 0; + } client.request.reset( HttpRequest::parse( reinterpret_cast<char const*>(ptr), avail, false)); @@ -968,14 +1030,22 @@ void ProxyImpl::client_empty_input(size_t index) { return; } if (avail < client.content.len) { - if (!client_send(index, ptr, avail)) { + if (client.pkg_id != 0) { + send_attached_data(client.pkg_id, ptr, avail, false); + } + if (!base_send(&client.remote, ptr, avail, index, "Client remote")) { return; } client.in->consume(avail); client.content.len -= avail; return; } - if (!client_send(index, ptr, client.content.len)) { + if (client.pkg_id != 0) { + send_attached_data(client.pkg_id, ptr, client.content.len, true); + client.pkg_id = 0; + } + if (!base_send(&client.remote, ptr, client.content.len, index, + "Client remote")) { return; } client.in->consume(client.content.len); @@ -992,11 +1062,17 @@ void ProxyImpl::client_empty_input(size_t index) { client_error(index, 400, "Bad request"); return; } - if (!client_send(index, ptr, used)) { + if (client.pkg_id != 0) { + send_attached_data(client.pkg_id, ptr, used, false); + } + if (!base_send(&client.remote, ptr, used, index, "Client remote")) { return; } client.in->consume(used); if (client.content.chunked->eof()) { + if (client.pkg_id != 0) { + send_attached_data(client.pkg_id, nullptr, 0, true); + } client.content.chunked.reset(); client.content.type = CONTENT_NONE; } @@ -1017,6 +1093,10 @@ void ProxyImpl::close_client_when_done(size_t index) { client.read_flag = 0; client.request.reset(); client.content.type = CONTENT_NONE; + if (client.pkg_id != 0) { + send_attached_data(client.pkg_id, nullptr, 0, true); + client.pkg_id = 0; + } looper_->modify(client.fd.get(), client.write_flag); } @@ -1045,6 +1125,12 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) { client.in->consume(client.request->size()); client.request.reset(); client.remote.content.type = CONTENT_CLOSE; + client.remote.pkg_id = get_next_package_id(); + if (client.remote.pkg_id) { + send_attached_package(client.remote.pkg_id, 0, + client.remote.host, client.remote.port, + client.source_host, client.source_port, false); + } if (!base_send(&client, data.data(), data.size(), index, "Client")) { return; } @@ -1080,10 +1166,30 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) { req->add_header("host", client.url->host()); } auto data = req->build(); + if (client.pkg_id != 0) { + bool const last = client.content.type == CONTENT_NONE; + if (monitor_send_proxied_) { + send_attached_data(client.pkg_id, data.data(), data.size(), last); + } else { + auto ptr = client.in->read_ptr(nullptr); + send_attached_data(client.pkg_id, + ptr, client.request->size(), last); + } + if (last) { + client.pkg_id = 0; + } + } client.in->consume(client.request->size()); client.request.reset(); client.url.reset(); client.remote.out->write(data.data(), data.size()); + + client.remote.pkg_id = get_next_package_id(); + if (client.remote.pkg_id) { + send_attached_package(client.remote.pkg_id, 0, + client.remote.host, client.remote.port, + client.source_host, client.source_port, false); + } } client.remote_state = CONNECTED; client.remote.read_flag = Looper::EVENT_READ; @@ -1117,6 +1223,10 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) { client_remote_error(index, 502); break; case WAITING: + if (client.remote.pkg_id != 0) { + send_attached_data(client.remote.pkg_id, nullptr, 0, true); + client.remote.pkg_id = 0; + } close_base(&client.remote); client.remote_state = CLOSED; break; @@ -1159,6 +1269,16 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) { looper_->modify(client.remote.fd.get(), 0); } } + if (client.remote.pkg_id != 0) { + if (client.remote.content.type == CONTENT_NONE) { + send_attached_data(client.remote.pkg_id, + ptr, response->size(), true); + client.remote.pkg_id = 0; + } else { + send_attached_data(client.remote.pkg_id, + ptr, response->size(), false); + } + } if (!base_send(&client, ptr, response->size(), index, "Client")) { return; } @@ -1167,6 +1287,9 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) { } case CONTENT_LEN: if (avail < client.remote.content.len) { + if (client.remote.pkg_id != 0) { + send_attached_data(client.remote.pkg_id, ptr, avail, false); + } if (!base_send(&client, ptr, avail, index, "Client")) { return; } @@ -1174,6 +1297,11 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) { client.remote.content.len -= avail; return; } + if (client.remote.pkg_id != 0) { + send_attached_data(client.remote.pkg_id, ptr, + client.remote.content.len, true); + client.remote.pkg_id = 0; + } if (!base_send(&client, ptr, client.remote.content.len, index, "Client")) { return; @@ -1194,14 +1322,20 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) { client.remote.content.type = CONTENT_CLOSE; break; } + if (client.remote.pkg_id != 0) { + send_attached_data(client.remote.pkg_id, ptr, used, false); + } if (!base_send(&client, ptr, used, index, "Client")) { return; } client.remote.in->consume(used); if (client.remote.content.chunked->eof()) { + if (client.remote.pkg_id != 0) { + send_attached_data(client.remote.pkg_id, nullptr, 0, true); + client.remote.pkg_id = 0; + } client.remote.content.chunked.reset(); client.remote.content.type = CONTENT_NONE; - logger_->out(Logger::INFO, "%zu: chunked -> waiting", index); client.remote_state = WAITING; client.remote.read_flag = 0; client.remote.write_flag = 0; @@ -1210,6 +1344,9 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) { break; } case CONTENT_CLOSE: + if (client.remote.pkg_id != 0) { + send_attached_data(client.remote.pkg_id, ptr, avail, false); + } if (!base_send(&client, ptr, avail, index, "Client")) { return; } @@ -1228,12 +1365,51 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) { } } -bool ProxyImpl::client_send(size_t index, void const* ptr, size_t size) { - auto& client = clients_[index]; - assert(!client.request); - assert(client.remote_state >= CONNECTED); +void ProxyImpl::monitor_error(size_t index, uint16_t status_code, + std::string const& status_message) { + auto& monitor = monitors_[index]; + // No more input + monitor.read_flag = 0; + looper_->modify(monitor.fd.get(), monitor.write_flag); + + auto resp = std::unique_ptr<HttpResponseBuilder>( + HttpResponseBuilder::create( + "HTTP", Version(1, 0), status_code, status_message)); + resp->add_header("Content-Length", "0"); + resp->add_header("Connection", "close"); - return base_send(&client.remote, ptr, size, index, "Client remote"); + monitor.in.reset(); + + auto data = resp->build(); + + if (!base_send(&monitor, data.data(), data.size(), index, "Monitor")) { + close_monitor(index); + } +} + +bool ProxyImpl::support_monitor_version( + size_t index, std::string const& version) { + if (version.empty()) return false; + // TODO: Actually do some version check here + return version.compare(VERSION) == 0; +} + +bool ProxyImpl::monitor_send_chunked( + size_t index, void const* header, size_t header_size, + void const* data, size_t data_size) { + auto& monitor = monitors_[index]; + char chunked_header[10]; + auto len = snprintf(chunked_header, 10, "%zx\r\n", header_size + data_size); + if (!base_send(&monitor, chunked_header, len, index, "Monitor")) return false; + if (header_size > 0 && !base_send(&monitor, + header, header_size, index, "Monitor")) { + return false; + } + if (data_size > 0 + && !base_send(&monitor, data, data_size, index, "Monitor")) { + return false; + } + return base_send(&monitor, "\r\n", 2, index, "Monitor"); } void ProxyImpl::monitor_event(size_t index, int fd, uint8_t events) { @@ -1252,6 +1428,87 @@ void ProxyImpl::monitor_event(size_t index, int fd, uint8_t events) { close_monitor(index); return; } + + while (true) { + size_t avail; + auto ptr = monitor.in->read_ptr(&avail); + if (avail == 0) return; + auto request = std::unique_ptr<HttpRequest>( + HttpRequest::parse( + reinterpret_cast<char const*>(ptr), avail, false)); + if (!request) { + if (avail >= 1024 * 1024) { + logger_->out(Logger::INFO, "%zu: Monitor too large request %zu", + index, avail); + close_monitor(index); + } + return; + } + if (!request->good() + || !request->proto_equal("HTTP") + || !request->method_equal("GET") + // Only support 1.1 or above + || request->proto_version().major == 0 + || (request->proto_version().major == 1 + && request->proto_version().minor == 0)) { + monitor_error(index, 400, "Bad request"); + return; + } + Content content; + content.type = CONTENT_NONE; + if (!setup_content(request.get(), &content)) { + monitor_error(index, 400, "Bad request"); + return; + } + if (content.type != CONTENT_NONE) { + monitor_error(index, 400, "Bad request"); + return; + } + std::unique_ptr<HttpResponseBuilder> resp; + if (request->url_equal("/hello")) { + auto version = request->first_header("x-tp-monitor-version"); + if (support_monitor_version(index, version)) { + resp.reset(HttpResponseBuilder::create( + "HTTP", Version(1, 1), 200, "OK")); + monitor.got_hello = true; + } else { + resp.reset(HttpResponseBuilder::create( + "HTTP", Version(1, 1), 500, "Unsupported version")); + } + resp->add_header("X-TP-Version", VERSION); + resp->add_header("Content-Length", "0"); + } else if (!monitor.got_hello) { + monitor_error(index, 500, "Unexpected request"); + return; + } else if (request->url_equal("/attach")) { + attached_.insert(index); + resp.reset(HttpResponseBuilder::create( + "HTTP", Version(1, 1), 200, "OK")); + resp->add_header("Transfer-Encoding", "chunked"); + } else if (request->url_equal("/detach")) { + auto it = attached_.find(index); + if (it != attached_.end()) { + if (!monitor_send_chunked(index, nullptr, 0, nullptr, 0)) { + close_monitor(index); + return; + } + attached_.erase(it); + } + resp.reset(HttpResponseBuilder::create( + "HTTP", Version(1, 1), 200, "OK")); + resp->add_header("Content-Length", "0"); + } else { + resp.reset(HttpResponseBuilder::create( + "HTTP", Version(1, 1), 404, "Not found")); + resp->add_header("Content-Length", "0"); + } + auto data = resp->build(); + monitor.in->consume(request->size()); + if (!base_send(&monitor, data.data(), data.size(), index, "Monitor")) { + close_monitor(index); + return; + } + } } void ProxyImpl::new_base(BaseClient* client, int fd) { @@ -1286,12 +1543,20 @@ int my_accept4(int sockfd, struct sockaddr* addr, socklen_t* addrlen, } #endif // HAVE_ACCEPT4 +union big_addr { + struct sockaddr_in addr_in; + struct sockaddr_in6 addr_in6; +}; + void ProxyImpl::new_client(int fd, uint8_t events) { assert(fd == accept_socket_.get()); if (events == Looper::EVENT_READ) { assert(!clients_.full()); while (true) { - int ret = accept4(fd, nullptr, nullptr, SOCK_NONBLOCK); + big_addr addr; + socklen_t len = sizeof(addr); + auto a = reinterpret_cast<struct sockaddr*>(&addr); + int ret = accept4(fd, a, &len, SOCK_NONBLOCK); if (ret < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) return; if (errno == EINTR) continue; @@ -1300,9 +1565,28 @@ void ProxyImpl::new_client(int fd, uint8_t events) { } auto index = clients_.new_client(); new_base(&clients_[index], ret); + clients_[index].source_host.clear(); + if (len == sizeof(struct sockaddr_in) && a->sa_family == AF_INET) { + char tmp[INET_ADDRSTRLEN]; + if (inet_ntop(AF_INET, &addr.addr_in.sin_addr, tmp, sizeof(tmp))) { + clients_[index].source_host = tmp; + } + clients_[index].source_port = ntohs(addr.addr_in.sin_port); + } else if (len == sizeof(struct sockaddr_in6) + && a->sa_family == AF_INET6) { + char tmp[INET6_ADDRSTRLEN]; + if (inet_ntop(AF_INET6, &addr.addr_in6.sin6_addr, tmp, sizeof(tmp))) { + clients_[index].source_host = tmp; + } + clients_[index].source_port = ntohs(addr.addr_in6.sin6_port); + } else { + clients_[index].source_port = 0; + } clients_[index].content.type = CONTENT_NONE; clients_[index].remote_state = CLOSED; clients_[index].remote.content.type = CONTENT_NONE; + clients_[index].pkg_id = 0; + clients_[index].remote.pkg_id = 0; looper_->add(ret, clients_[index].read_flag | clients_[index].write_flag, std::bind(&ProxyImpl::client_event, this, index, @@ -1331,16 +1615,16 @@ void ProxyImpl::new_monitor(int fd, uint8_t events) { } auto index = monitors_.new_client(); new_base(&monitors_[index], ret); - looper_->add(ret, clients_[index].read_flag | clients_[index].write_flag, + monitors_[index].got_hello = false; + looper_->add(ret, monitors_[index].read_flag + | monitors_[index].write_flag, std::bind(&ProxyImpl::monitor_event, this, index, std::placeholders::_1, std::placeholders::_2)); break; } - if (monitors_.full()) { - looper_->modify(fd, 0); - } + if (monitors_.full()) looper_->modify(fd, 0); } else { logger_->out(Logger::ERR, "Monitor socket died"); fatal_error(); @@ -1383,6 +1667,131 @@ bool ProxyImpl::run() { return good_; } +uint32_t ProxyImpl::get_next_package_id() { + if (attached_.empty()) return 0; + while (!next_package_id_) { + ++next_package_id_; + } + return next_package_id_++; +} + +void write_u16(uint8_t* dst, uint16_t value) { + dst[0] = value >> 8; + dst[1] = value & 0xff; +} + +void write_u32(uint8_t* dst, uint32_t value) { + write_u16(dst, value >> 16); + write_u16(dst + 2, value & 0xffff); +} + +void write_u64(uint8_t* dst, uint64_t value) { + write_u32(dst, value >> 32); + write_u32(dst + 4, value & 0xffffffff); +} + +void ProxyImpl::send_attached_package(uint32_t id, uint16_t flags, + std::string const& source_host, + uint16_t source_port, + std::string const& target_host, + uint16_t target_port, + bool last) { + if (id == 0) { + assert(false); + return; + } + if (attached_.empty()) return; + uint8_t data[256]; + size_t need = 2 + 3 + 4 + 8 + 4 + 2 + 2 + 2 + 2 + 2 + source_host.size() + + target_host.size(); + if (need <= sizeof(data)) { + send_attached_package2(data, need, id, flags, source_host, source_port, + target_host, target_port, last); + } else { + // TODO: Might need better handling of really long source_host/target_host + auto p = std::unique_ptr<uint8_t[]>(new uint8_t[need]); + send_attached_package2(p.get(), need, id, flags, source_host, source_port, + target_host, target_port, last); + } +} + +void ProxyImpl::send_attached_package2(uint8_t* buffer, size_t size, + uint32_t id, uint16_t flags, + std::string const& source_host, + uint16_t source_port, + std::string const& target_host, + uint16_t target_port, + bool last) { + buffer[0] = 'P'; + buffer[1] = 'K'; + buffer[2] = 'G'; + write_u16(buffer + 3, size); + write_u32(buffer + 5, id); + auto dur = looper_->now().time_since_epoch(); + auto sec = std::chrono::duration_cast<std::chrono::seconds>(dur); + auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(dur - sec); + write_u64(buffer + 9, sec.count()); + write_u32(buffer + 17, nsec.count()); + write_u16(buffer + 21, (last ? 0 : 1) | (flags << 1)); + write_u16(buffer + 23, source_port); + write_u16(buffer + 25, target_port); + write_u16(buffer + 27, source_host.size()); + memcpy(buffer + 29, source_host.data(), source_host.size()); + write_u16(buffer + 29 + source_host.size(), target_host.size()); + memcpy(buffer + 31 + source_host.size(), + target_host.data(), target_host.size()); + send_attached(buffer, size, nullptr, 0); +} + +void ProxyImpl::send_attached_data(uint32_t id, void const* ptr, size_t size, + bool last) { + if (id == 0) { + assert(false); + return; + } + if (attached_.empty()) return; + if (size == 0 && !last) return; + uint8_t data[10]; + data[0] = 'D'; + data[1] = 'A'; + data[2] = 'T'; + write_u32(data + 5, id); + if (size == 0) { + assert(last); + assert(ptr == nullptr); + write_u16(data + 3, 10); + data[9] = 0; + send_attached(data, 10, nullptr, 0); + } else { + size_t max = 0xffff - 10; + auto p = reinterpret_cast<char const*>(ptr); + data[9] = 1; + while (size > max) { + write_u16(data + 3, 0xffff); + send_attached(data, 10, p, max); + p += max; + size -= max; + } + data[9] = last ? 0 : 1; + write_u16(data + 3, size + 10); + send_attached(data, 10, p, size); + } +} + +void ProxyImpl::send_attached(void const* header, size_t header_size, + void const* data, size_t data_size) { + auto it = attached_.begin(); + while (it != attached_.end()) { + if (monitor_send_chunked(*it, header, header_size, data, data_size)) { + ++it; + } else { + auto index = *it; + it = attached_.erase(it); + close_monitor(index); + } + } +} + int setup_socket(char const* host, std::string const& port, Logger* logger) { io::auto_fd ret; struct addrinfo hints; |
