summaryrefslogtreecommitdiff
path: root/src/proxy.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/proxy.cc')
-rw-r--r--src/proxy.cc471
1 files changed, 440 insertions, 31 deletions
diff --git a/src/proxy.cc b/src/proxy.cc
index dcc52df..5abe257 100644
--- a/src/proxy.cc
+++ b/src/proxy.cc
@@ -6,6 +6,7 @@
#define _GNU_SOURCE
#endif
+#include <arpa/inet.h>
#include <cerrno>
#include <chrono>
#include <cstring>
@@ -17,6 +18,7 @@
#include <sys/types.h>
#include <unistd.h>
#include <vector>
+#include <unordered_set>
#include "buffer.hh"
#include "chunked.hh"
@@ -229,6 +231,7 @@ struct RemoteClient : public BaseClient {
Content content;
std::string host;
uint16_t port;
+ uint32_t pkg_id;
};
struct Client : public BaseClient {
@@ -242,9 +245,13 @@ struct Client : public BaseClient {
RemoteState remote_state;
void* resolve;
RemoteClient remote;
+ std::string source_host;
+ uint16_t source_port;
+ uint32_t pkg_id;
};
struct Monitor : public BaseClient {
+ bool got_hello;
};
class ProxyImpl : public Proxy {
@@ -254,7 +261,8 @@ public:
: config_(config), cwd_(cwd), configfile_(configfile), logfile_(logfile),
logger_(logger), accept_socket_(accept_fd), monitor_socket_(monitor_fd),
looper_(Looper::create()), resolver_(Resolver::create(looper_.get())),
- new_timeout_(nullptr), timeout_(nullptr) {
+ new_timeout_(nullptr), timeout_(nullptr), next_package_id_(1),
+ monitor_send_proxied_(false) {
setup();
}
~ProxyImpl() override {
@@ -307,11 +315,33 @@ private:
void client_error(size_t index,
uint16_t status_code, std::string const& status);
bool client_request(size_t index);
- bool client_send(size_t index, void const* ptr, size_t avail);
void client_remote_error(size_t index, uint16_t error);
void close_client_when_done(size_t index);
void client_remote_resolved(size_t index, int fd, bool connected,
char const* error);
+ void monitor_error(size_t index,
+ uint16_t status_code, std::string const& status);
+ bool support_monitor_version(size_t index, std::string const& version);
+ bool monitor_send_chunked(size_t index,
+ void const* header, size_t header_size,
+ void const* data, size_t data_size);
+ uint32_t get_next_package_id();
+ void send_attached_package(uint32_t id, uint16_t flags,
+ std::string const& source_host,
+ uint16_t source_port,
+ std::string const& target_host,
+ uint16_t target_port,
+ bool last);
+ void send_attached_package2(uint8_t* buffer, size_t size,
+ uint32_t id, uint16_t flags,
+ std::string const& source_host,
+ uint16_t source_port,
+ std::string const& target_host,
+ uint16_t target_port,
+ bool last);
+ void send_attached_data(uint32_t id, void const* ptr, size_t size, bool last);
+ void send_attached(void const* header, size_t header_size,
+ void const* data, size_t data_size);
Config* const config_;
std::string cwd_;
@@ -327,9 +357,12 @@ private:
bool good_;
void* new_timeout_;
void* timeout_;
+ uint32_t next_package_id_;
+ bool monitor_send_proxied_;
clients<Client> clients_;
clients<Monitor> monitors_;
+ std::unordered_set<size_t> attached_;
};
size_t get_size(Config* config, Logger* logger, std::string const& name,
@@ -349,6 +382,7 @@ size_t get_size(Config* config, Logger* logger, std::string const& name,
}
void ProxyImpl::setup() {
+ monitor_send_proxied_ = !config_->get("monitor_proxy_request", false);
clients_.resize(get_size(config_, logger_, "max_clients", 1024));
monitors_.resize(get_size(config_, logger_, "max_monitors", 2));
looper_->add(accept_socket_.get(),
@@ -380,6 +414,7 @@ bool ProxyImpl::reload_config() {
logger_->out(Logger::WARN, "New config invalid, ignored.");
return true;
}
+ monitor_send_proxied_ = !config_->get("monitor_proxy_request", false);
if (!logfile_) {
auto logfile = config_->get("logfile", nullptr);
if (logfile) {
@@ -461,6 +496,16 @@ void ProxyImpl::close_base(BaseClient* client) {
void ProxyImpl::close_client(size_t index) {
bool was_full = clients_.full();
auto& client = clients_[index];
+ if (client.pkg_id != 0) {
+ size_t avail;
+ auto ptr = client.in->read_ptr(&avail);
+ if (avail) {
+ send_attached_data(client.pkg_id, ptr, avail, true);
+ } else {
+ send_attached_data(client.pkg_id, nullptr, 0, true);
+ }
+ client.pkg_id = 0;
+ }
client.request.reset();
client.url.reset();
client.connect.reset();
@@ -471,6 +516,16 @@ void ProxyImpl::close_client(size_t index) {
resolver_->cancel(client.resolve);
client.resolve = nullptr;
}
+ if (client.remote.pkg_id) {
+ size_t avail;
+ auto ptr = client.remote.in->read_ptr(&avail);
+ if (avail) {
+ send_attached_data(client.remote.pkg_id, ptr, avail, true);
+ } else {
+ send_attached_data(client.remote.pkg_id, nullptr, 0, true);
+ }
+ client.remote.pkg_id = 0;
+ }
close_base(&client.remote);
close_base(&client);
clients_.erase(index);
@@ -482,6 +537,10 @@ void ProxyImpl::close_client(size_t index) {
void ProxyImpl::close_monitor(size_t index) {
bool was_full = monitors_.full();
auto& monitor = monitors_[index];
+ auto it = attached_.find(index);
+ if (it != attached_.end()) {
+ attached_.erase(it);
+ }
close_base(&monitor);
monitors_.erase(index);
if (was_full && !monitors_.full() && monitor_socket_) {
@@ -530,6 +589,8 @@ float ProxyImpl::handle_timeout(bool new_conn,
close.clear();
for (auto i = monitors_.begin(); i != monitors_.end(); ++i) {
if (i->new_connection != new_conn) continue;
+ // Monitors are safe from timeout after hello
+ if (i->got_hello) continue;
auto diff = std::chrono::duration_cast<std::chrono::duration<float>>(
((i->last + timeout) - now)).count();
if (diff < 0.0f) {
@@ -728,21 +789,9 @@ void ProxyImpl::client_error(size_t index, uint16_t status_code,
client.url.reset();
client.connect.reset();
- std::string proto;
- Version version;
-
- if (!client.request) {
- proto = "HTTP";
- version.major = 1;
- version.minor = 1;
- } else {
- proto = client.request->proto();
- version = client.request->proto_version();
- }
-
auto resp = std::unique_ptr<HttpResponseBuilder>(
HttpResponseBuilder::create(
- proto, version, status_code, status_message));
+ "HTTP", Version(1, 0), status_code, status_message));
resp->add_header("Content-Length", "0");
resp->add_header("Connection", "close");
@@ -781,6 +830,13 @@ bool ProxyImpl::client_request(size_t index) {
host = client.url->host();
port = client.url->port();
}
+ if (port == 0) port = 80;
+ client.pkg_id = get_next_package_id();
+ if (client.pkg_id != 0) {
+ send_attached_package(client.pkg_id, 0,
+ client.source_host, client.source_port,
+ host, port, false);
+ }
if (client.remote_state == WAITING) {
if (client.connect || host != client.remote.host
|| port != client.remote.port) {
@@ -803,7 +859,6 @@ bool ProxyImpl::client_request(size_t index) {
client.remote.host = host;
client.remote.port = port;
- if (port == 0) port = 80;
client.resolve = resolver_->request(
host, port,
std::bind(&ProxyImpl::client_remote_resolved, this, index,
@@ -909,7 +964,10 @@ void ProxyImpl::client_empty_input(size_t index) {
// falltrough
case CONTENT_NONE: {
if (client.connect && client.remote_state == CONNECTED) {
- if (!client_send(index, ptr, avail)) {
+ if (client.pkg_id != 0) {
+ send_attached_data(client.pkg_id, ptr, avail, false);
+ }
+ if (!base_send(&client.remote, ptr, avail, index, "Client remote")) {
return;
}
client.in->consume(avail);
@@ -919,6 +977,10 @@ void ProxyImpl::client_empty_input(size_t index) {
// Still working on the last request, wait
return;
}
+ if (client.pkg_id != 0) {
+ send_attached_data(client.pkg_id, nullptr, 0, true);
+ client.pkg_id = 0;
+ }
client.request.reset(
HttpRequest::parse(
reinterpret_cast<char const*>(ptr), avail, false));
@@ -968,14 +1030,22 @@ void ProxyImpl::client_empty_input(size_t index) {
return;
}
if (avail < client.content.len) {
- if (!client_send(index, ptr, avail)) {
+ if (client.pkg_id != 0) {
+ send_attached_data(client.pkg_id, ptr, avail, false);
+ }
+ if (!base_send(&client.remote, ptr, avail, index, "Client remote")) {
return;
}
client.in->consume(avail);
client.content.len -= avail;
return;
}
- if (!client_send(index, ptr, client.content.len)) {
+ if (client.pkg_id != 0) {
+ send_attached_data(client.pkg_id, ptr, client.content.len, true);
+ client.pkg_id = 0;
+ }
+ if (!base_send(&client.remote, ptr, client.content.len, index,
+ "Client remote")) {
return;
}
client.in->consume(client.content.len);
@@ -992,11 +1062,17 @@ void ProxyImpl::client_empty_input(size_t index) {
client_error(index, 400, "Bad request");
return;
}
- if (!client_send(index, ptr, used)) {
+ if (client.pkg_id != 0) {
+ send_attached_data(client.pkg_id, ptr, used, false);
+ }
+ if (!base_send(&client.remote, ptr, used, index, "Client remote")) {
return;
}
client.in->consume(used);
if (client.content.chunked->eof()) {
+ if (client.pkg_id != 0) {
+ send_attached_data(client.pkg_id, nullptr, 0, true);
+ }
client.content.chunked.reset();
client.content.type = CONTENT_NONE;
}
@@ -1017,6 +1093,10 @@ void ProxyImpl::close_client_when_done(size_t index) {
client.read_flag = 0;
client.request.reset();
client.content.type = CONTENT_NONE;
+ if (client.pkg_id != 0) {
+ send_attached_data(client.pkg_id, nullptr, 0, true);
+ client.pkg_id = 0;
+ }
looper_->modify(client.fd.get(), client.write_flag);
}
@@ -1045,6 +1125,12 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) {
client.in->consume(client.request->size());
client.request.reset();
client.remote.content.type = CONTENT_CLOSE;
+ client.remote.pkg_id = get_next_package_id();
+ if (client.remote.pkg_id) {
+ send_attached_package(client.remote.pkg_id, 0,
+ client.remote.host, client.remote.port,
+ client.source_host, client.source_port, false);
+ }
if (!base_send(&client, data.data(), data.size(), index, "Client")) {
return;
}
@@ -1080,10 +1166,30 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) {
req->add_header("host", client.url->host());
}
auto data = req->build();
+ if (client.pkg_id != 0) {
+ bool const last = client.content.type == CONTENT_NONE;
+ if (monitor_send_proxied_) {
+ send_attached_data(client.pkg_id, data.data(), data.size(), last);
+ } else {
+ auto ptr = client.in->read_ptr(nullptr);
+ send_attached_data(client.pkg_id,
+ ptr, client.request->size(), last);
+ }
+ if (last) {
+ client.pkg_id = 0;
+ }
+ }
client.in->consume(client.request->size());
client.request.reset();
client.url.reset();
client.remote.out->write(data.data(), data.size());
+
+ client.remote.pkg_id = get_next_package_id();
+ if (client.remote.pkg_id) {
+ send_attached_package(client.remote.pkg_id, 0,
+ client.remote.host, client.remote.port,
+ client.source_host, client.source_port, false);
+ }
}
client.remote_state = CONNECTED;
client.remote.read_flag = Looper::EVENT_READ;
@@ -1117,6 +1223,10 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) {
client_remote_error(index, 502);
break;
case WAITING:
+ if (client.remote.pkg_id != 0) {
+ send_attached_data(client.remote.pkg_id, nullptr, 0, true);
+ client.remote.pkg_id = 0;
+ }
close_base(&client.remote);
client.remote_state = CLOSED;
break;
@@ -1159,6 +1269,16 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) {
looper_->modify(client.remote.fd.get(), 0);
}
}
+ if (client.remote.pkg_id != 0) {
+ if (client.remote.content.type == CONTENT_NONE) {
+ send_attached_data(client.remote.pkg_id,
+ ptr, response->size(), true);
+ client.remote.pkg_id = 0;
+ } else {
+ send_attached_data(client.remote.pkg_id,
+ ptr, response->size(), false);
+ }
+ }
if (!base_send(&client, ptr, response->size(), index, "Client")) {
return;
}
@@ -1167,6 +1287,9 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) {
}
case CONTENT_LEN:
if (avail < client.remote.content.len) {
+ if (client.remote.pkg_id != 0) {
+ send_attached_data(client.remote.pkg_id, ptr, avail, false);
+ }
if (!base_send(&client, ptr, avail, index, "Client")) {
return;
}
@@ -1174,6 +1297,11 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) {
client.remote.content.len -= avail;
return;
}
+ if (client.remote.pkg_id != 0) {
+ send_attached_data(client.remote.pkg_id, ptr,
+ client.remote.content.len, true);
+ client.remote.pkg_id = 0;
+ }
if (!base_send(&client, ptr, client.remote.content.len,
index, "Client")) {
return;
@@ -1194,14 +1322,20 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) {
client.remote.content.type = CONTENT_CLOSE;
break;
}
+ if (client.remote.pkg_id != 0) {
+ send_attached_data(client.remote.pkg_id, ptr, used, false);
+ }
if (!base_send(&client, ptr, used, index, "Client")) {
return;
}
client.remote.in->consume(used);
if (client.remote.content.chunked->eof()) {
+ if (client.remote.pkg_id != 0) {
+ send_attached_data(client.remote.pkg_id, nullptr, 0, true);
+ client.remote.pkg_id = 0;
+ }
client.remote.content.chunked.reset();
client.remote.content.type = CONTENT_NONE;
- logger_->out(Logger::INFO, "%zu: chunked -> waiting", index);
client.remote_state = WAITING;
client.remote.read_flag = 0;
client.remote.write_flag = 0;
@@ -1210,6 +1344,9 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) {
break;
}
case CONTENT_CLOSE:
+ if (client.remote.pkg_id != 0) {
+ send_attached_data(client.remote.pkg_id, ptr, avail, false);
+ }
if (!base_send(&client, ptr, avail, index, "Client")) {
return;
}
@@ -1228,12 +1365,51 @@ void ProxyImpl::client_remote_event(size_t index, int fd, uint8_t events) {
}
}
-bool ProxyImpl::client_send(size_t index, void const* ptr, size_t size) {
- auto& client = clients_[index];
- assert(!client.request);
- assert(client.remote_state >= CONNECTED);
+void ProxyImpl::monitor_error(size_t index, uint16_t status_code,
+ std::string const& status_message) {
+ auto& monitor = monitors_[index];
+ // No more input
+ monitor.read_flag = 0;
+ looper_->modify(monitor.fd.get(), monitor.write_flag);
+
+ auto resp = std::unique_ptr<HttpResponseBuilder>(
+ HttpResponseBuilder::create(
+ "HTTP", Version(1, 0), status_code, status_message));
+ resp->add_header("Content-Length", "0");
+ resp->add_header("Connection", "close");
- return base_send(&client.remote, ptr, size, index, "Client remote");
+ monitor.in.reset();
+
+ auto data = resp->build();
+
+ if (!base_send(&monitor, data.data(), data.size(), index, "Monitor")) {
+ close_monitor(index);
+ }
+}
+
+bool ProxyImpl::support_monitor_version(
+ size_t index, std::string const& version) {
+ if (version.empty()) return false;
+ // TODO: Actually do some version check here
+ return version.compare(VERSION) == 0;
+}
+
+bool ProxyImpl::monitor_send_chunked(
+ size_t index, void const* header, size_t header_size,
+ void const* data, size_t data_size) {
+ auto& monitor = monitors_[index];
+ char chunked_header[10];
+ auto len = snprintf(chunked_header, 10, "%zx\r\n", header_size + data_size);
+ if (!base_send(&monitor, chunked_header, len, index, "Monitor")) return false;
+ if (header_size > 0 && !base_send(&monitor,
+ header, header_size, index, "Monitor")) {
+ return false;
+ }
+ if (data_size > 0
+ && !base_send(&monitor, data, data_size, index, "Monitor")) {
+ return false;
+ }
+ return base_send(&monitor, "\r\n", 2, index, "Monitor");
}
void ProxyImpl::monitor_event(size_t index, int fd, uint8_t events) {
@@ -1252,6 +1428,87 @@ void ProxyImpl::monitor_event(size_t index, int fd, uint8_t events) {
close_monitor(index);
return;
}
+
+ while (true) {
+ size_t avail;
+ auto ptr = monitor.in->read_ptr(&avail);
+ if (avail == 0) return;
+ auto request = std::unique_ptr<HttpRequest>(
+ HttpRequest::parse(
+ reinterpret_cast<char const*>(ptr), avail, false));
+ if (!request) {
+ if (avail >= 1024 * 1024) {
+ logger_->out(Logger::INFO, "%zu: Monitor too large request %zu",
+ index, avail);
+ close_monitor(index);
+ }
+ return;
+ }
+ if (!request->good()
+ || !request->proto_equal("HTTP")
+ || !request->method_equal("GET")
+ // Only support 1.1 or above
+ || request->proto_version().major == 0
+ || (request->proto_version().major == 1
+ && request->proto_version().minor == 0)) {
+ monitor_error(index, 400, "Bad request");
+ return;
+ }
+ Content content;
+ content.type = CONTENT_NONE;
+ if (!setup_content(request.get(), &content)) {
+ monitor_error(index, 400, "Bad request");
+ return;
+ }
+ if (content.type != CONTENT_NONE) {
+ monitor_error(index, 400, "Bad request");
+ return;
+ }
+ std::unique_ptr<HttpResponseBuilder> resp;
+ if (request->url_equal("/hello")) {
+ auto version = request->first_header("x-tp-monitor-version");
+ if (support_monitor_version(index, version)) {
+ resp.reset(HttpResponseBuilder::create(
+ "HTTP", Version(1, 1), 200, "OK"));
+ monitor.got_hello = true;
+ } else {
+ resp.reset(HttpResponseBuilder::create(
+ "HTTP", Version(1, 1), 500, "Unsupported version"));
+ }
+ resp->add_header("X-TP-Version", VERSION);
+ resp->add_header("Content-Length", "0");
+ } else if (!monitor.got_hello) {
+ monitor_error(index, 500, "Unexpected request");
+ return;
+ } else if (request->url_equal("/attach")) {
+ attached_.insert(index);
+ resp.reset(HttpResponseBuilder::create(
+ "HTTP", Version(1, 1), 200, "OK"));
+ resp->add_header("Transfer-Encoding", "chunked");
+ } else if (request->url_equal("/detach")) {
+ auto it = attached_.find(index);
+ if (it != attached_.end()) {
+ if (!monitor_send_chunked(index, nullptr, 0, nullptr, 0)) {
+ close_monitor(index);
+ return;
+ }
+ attached_.erase(it);
+ }
+ resp.reset(HttpResponseBuilder::create(
+ "HTTP", Version(1, 1), 200, "OK"));
+ resp->add_header("Content-Length", "0");
+ } else {
+ resp.reset(HttpResponseBuilder::create(
+ "HTTP", Version(1, 1), 404, "Not found"));
+ resp->add_header("Content-Length", "0");
+ }
+ auto data = resp->build();
+ monitor.in->consume(request->size());
+ if (!base_send(&monitor, data.data(), data.size(), index, "Monitor")) {
+ close_monitor(index);
+ return;
+ }
+ }
}
void ProxyImpl::new_base(BaseClient* client, int fd) {
@@ -1286,12 +1543,20 @@ int my_accept4(int sockfd, struct sockaddr* addr, socklen_t* addrlen,
}
#endif // HAVE_ACCEPT4
+union big_addr {
+ struct sockaddr_in addr_in;
+ struct sockaddr_in6 addr_in6;
+};
+
void ProxyImpl::new_client(int fd, uint8_t events) {
assert(fd == accept_socket_.get());
if (events == Looper::EVENT_READ) {
assert(!clients_.full());
while (true) {
- int ret = accept4(fd, nullptr, nullptr, SOCK_NONBLOCK);
+ big_addr addr;
+ socklen_t len = sizeof(addr);
+ auto a = reinterpret_cast<struct sockaddr*>(&addr);
+ int ret = accept4(fd, a, &len, SOCK_NONBLOCK);
if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) return;
if (errno == EINTR) continue;
@@ -1300,9 +1565,28 @@ void ProxyImpl::new_client(int fd, uint8_t events) {
}
auto index = clients_.new_client();
new_base(&clients_[index], ret);
+ clients_[index].source_host.clear();
+ if (len == sizeof(struct sockaddr_in) && a->sa_family == AF_INET) {
+ char tmp[INET_ADDRSTRLEN];
+ if (inet_ntop(AF_INET, &addr.addr_in.sin_addr, tmp, sizeof(tmp))) {
+ clients_[index].source_host = tmp;
+ }
+ clients_[index].source_port = ntohs(addr.addr_in.sin_port);
+ } else if (len == sizeof(struct sockaddr_in6)
+ && a->sa_family == AF_INET6) {
+ char tmp[INET6_ADDRSTRLEN];
+ if (inet_ntop(AF_INET6, &addr.addr_in6.sin6_addr, tmp, sizeof(tmp))) {
+ clients_[index].source_host = tmp;
+ }
+ clients_[index].source_port = ntohs(addr.addr_in6.sin6_port);
+ } else {
+ clients_[index].source_port = 0;
+ }
clients_[index].content.type = CONTENT_NONE;
clients_[index].remote_state = CLOSED;
clients_[index].remote.content.type = CONTENT_NONE;
+ clients_[index].pkg_id = 0;
+ clients_[index].remote.pkg_id = 0;
looper_->add(ret, clients_[index].read_flag | clients_[index].write_flag,
std::bind(&ProxyImpl::client_event, this,
index,
@@ -1331,16 +1615,16 @@ void ProxyImpl::new_monitor(int fd, uint8_t events) {
}
auto index = monitors_.new_client();
new_base(&monitors_[index], ret);
- looper_->add(ret, clients_[index].read_flag | clients_[index].write_flag,
+ monitors_[index].got_hello = false;
+ looper_->add(ret, monitors_[index].read_flag
+ | monitors_[index].write_flag,
std::bind(&ProxyImpl::monitor_event, this,
index,
std::placeholders::_1,
std::placeholders::_2));
break;
}
- if (monitors_.full()) {
- looper_->modify(fd, 0);
- }
+ if (monitors_.full()) looper_->modify(fd, 0);
} else {
logger_->out(Logger::ERR, "Monitor socket died");
fatal_error();
@@ -1383,6 +1667,131 @@ bool ProxyImpl::run() {
return good_;
}
+uint32_t ProxyImpl::get_next_package_id() {
+ if (attached_.empty()) return 0;
+ while (!next_package_id_) {
+ ++next_package_id_;
+ }
+ return next_package_id_++;
+}
+
+void write_u16(uint8_t* dst, uint16_t value) {
+ dst[0] = value >> 8;
+ dst[1] = value & 0xff;
+}
+
+void write_u32(uint8_t* dst, uint32_t value) {
+ write_u16(dst, value >> 16);
+ write_u16(dst + 2, value & 0xffff);
+}
+
+void write_u64(uint8_t* dst, uint64_t value) {
+ write_u32(dst, value >> 32);
+ write_u32(dst + 4, value & 0xffffffff);
+}
+
+void ProxyImpl::send_attached_package(uint32_t id, uint16_t flags,
+ std::string const& source_host,
+ uint16_t source_port,
+ std::string const& target_host,
+ uint16_t target_port,
+ bool last) {
+ if (id == 0) {
+ assert(false);
+ return;
+ }
+ if (attached_.empty()) return;
+ uint8_t data[256];
+ size_t need = 2 + 3 + 4 + 8 + 4 + 2 + 2 + 2 + 2 + 2 + source_host.size() +
+ target_host.size();
+ if (need <= sizeof(data)) {
+ send_attached_package2(data, need, id, flags, source_host, source_port,
+ target_host, target_port, last);
+ } else {
+ // TODO: Might need better handling of really long source_host/target_host
+ auto p = std::unique_ptr<uint8_t[]>(new uint8_t[need]);
+ send_attached_package2(p.get(), need, id, flags, source_host, source_port,
+ target_host, target_port, last);
+ }
+}
+
+void ProxyImpl::send_attached_package2(uint8_t* buffer, size_t size,
+ uint32_t id, uint16_t flags,
+ std::string const& source_host,
+ uint16_t source_port,
+ std::string const& target_host,
+ uint16_t target_port,
+ bool last) {
+ buffer[0] = 'P';
+ buffer[1] = 'K';
+ buffer[2] = 'G';
+ write_u16(buffer + 3, size);
+ write_u32(buffer + 5, id);
+ auto dur = looper_->now().time_since_epoch();
+ auto sec = std::chrono::duration_cast<std::chrono::seconds>(dur);
+ auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(dur - sec);
+ write_u64(buffer + 9, sec.count());
+ write_u32(buffer + 17, nsec.count());
+ write_u16(buffer + 21, (last ? 0 : 1) | (flags << 1));
+ write_u16(buffer + 23, source_port);
+ write_u16(buffer + 25, target_port);
+ write_u16(buffer + 27, source_host.size());
+ memcpy(buffer + 29, source_host.data(), source_host.size());
+ write_u16(buffer + 29 + source_host.size(), target_host.size());
+ memcpy(buffer + 31 + source_host.size(),
+ target_host.data(), target_host.size());
+ send_attached(buffer, size, nullptr, 0);
+}
+
+void ProxyImpl::send_attached_data(uint32_t id, void const* ptr, size_t size,
+ bool last) {
+ if (id == 0) {
+ assert(false);
+ return;
+ }
+ if (attached_.empty()) return;
+ if (size == 0 && !last) return;
+ uint8_t data[10];
+ data[0] = 'D';
+ data[1] = 'A';
+ data[2] = 'T';
+ write_u32(data + 5, id);
+ if (size == 0) {
+ assert(last);
+ assert(ptr == nullptr);
+ write_u16(data + 3, 10);
+ data[9] = 0;
+ send_attached(data, 10, nullptr, 0);
+ } else {
+ size_t max = 0xffff - 10;
+ auto p = reinterpret_cast<char const*>(ptr);
+ data[9] = 1;
+ while (size > max) {
+ write_u16(data + 3, 0xffff);
+ send_attached(data, 10, p, max);
+ p += max;
+ size -= max;
+ }
+ data[9] = last ? 0 : 1;
+ write_u16(data + 3, size + 10);
+ send_attached(data, 10, p, size);
+ }
+}
+
+void ProxyImpl::send_attached(void const* header, size_t header_size,
+ void const* data, size_t data_size) {
+ auto it = attached_.begin();
+ while (it != attached_.end()) {
+ if (monitor_send_chunked(*it, header, header_size, data, data_size)) {
+ ++it;
+ } else {
+ auto index = *it;
+ it = attached_.erase(it);
+ close_monitor(index);
+ }
+ }
+}
+
int setup_socket(char const* host, std::string const& port, Logger* logger) {
io::auto_fd ret;
struct addrinfo hints;