summaryrefslogtreecommitdiff
path: root/src/transport_fastcgi.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport_fastcgi.cc')
-rw-r--r--src/transport_fastcgi.cc707
1 files changed, 707 insertions, 0 deletions
diff --git a/src/transport_fastcgi.cc b/src/transport_fastcgi.cc
new file mode 100644
index 0000000..8a5c3cc
--- /dev/null
+++ b/src/transport_fastcgi.cc
@@ -0,0 +1,707 @@
+#include "common.hh"
+
+#include "config.hh"
+#include "fcgi_protocol.hh"
+#include "http_protocol.hh"
+#include "logger.hh"
+#include "strutil.hh"
+#include "transport_base.hh"
+#include "transport_fastcgi.hh"
+
+#include <unordered_map>
+#include <vector>
+
+namespace {
+
+class FastCgiTransport : public TransportBase {
+public:
+ FastCgiTransport(std::shared_ptr<Logger> logger,
+ std::shared_ptr<Looper> looper,
+ std::shared_ptr<TaskRunner> runner,
+ Transport::Handler* handler)
+ : TransportBase(logger, looper, runner, handler) {
+ }
+
+ bool setup(Logger* logger, Config const* config) override {
+ if (!TransportBase::setup(logger, config))
+ return false;
+
+ auto max_requests = config->get("transport.fcgi.max_requests", 20);
+ if (!max_requests.has_value()) {
+ logger->err("transport.fcgi.max_requests is unknown value: '%s'",
+ config->get("transport.fcgi.max_requests", nullptr));
+ return false;
+ }
+ if (max_requests.value() < 1) {
+ logger->err("transport.fcgi.max_requests must be > 0");
+ return false;
+ }
+
+ max_requests_ = max_requests.value();
+
+ extra_.resize(clients());
+ for (auto& extra : extra_) {
+ extra.requests.resize(max_requests_);
+ }
+
+ return true;
+ }
+
+ bool client_handle(Client* client) override {
+ auto& extra = extra_[client->index_];
+
+ if (extra.close_connection) {
+ client->expect_in_ = 0;
+ if (client->out_->empty()) {
+ client_abort(client);
+ return false;
+ }
+ return true;
+ }
+
+ // A fastcgi records can be quite small, handle as many as possible
+ // before returning.
+ while (true) {
+ if (!extra.record) {
+ extra.record = fcgi::Record::parse(client->in_.get());
+ if (!extra.record) {
+ // Need more input
+ size_t avail;
+ client->in_->rbuf(8, avail);
+ assert(avail < 8); // if avail >= 8 parse should not return nullptr
+ client->expect_in_ = avail >= 8 ? 1 : 8 - avail;
+ if (client->in_closed_) {
+ if (client->out_->empty()) {
+ client_abort(client);
+ return false;
+ }
+ // Wait for output to be sent (or timeout, whichever is first).
+ }
+ return true;
+ }
+ if (!extra.record->good()) {
+ logger_->warn("Bad record sent by client: %zu", client->index_);
+ client_abort(client);
+ return false;
+ }
+ extra.content_offset = 0;
+ }
+
+ if (extra.record->request_id() != 0 &&
+ extra.record->type() != fcgi::RecordType::BeginRequest &&
+ extra.request_map.count(extra.record->request_id()) == 0) {
+ logger_->dbg("Ignoring message for unknown request for client: %zu",
+ client->index_);
+ // Ignoring messages for unknown requests
+ if (!client_consume_content(client))
+ return false;
+ } else {
+ switch (extra.record->type()) {
+ case fcgi::RecordType::GetValues:
+ if (extra.record->request_id() == 0) {
+ if (!client_handle_get_values(client))
+ return false;
+ } else {
+ if (!client_handle_unknown(client))
+ return false;
+ }
+ break;
+ case fcgi::RecordType::GetValuesResult:
+ case fcgi::RecordType::EndRequest:
+ case fcgi::RecordType::Stdout:
+ case fcgi::RecordType::Stderr:
+ logger_->warn("Client %zu sending application record type",
+ client->index_);
+ client_abort(client);
+ return false;
+ case fcgi::RecordType::BeginRequest:
+ if (extra.record->request_id() == 0) {
+ logger_->warn("Client %zu sending BeginRequest for 0",
+ client->index_);
+ client_abort(client);
+ return false;
+ }
+ if (!client_handle_begin_request(client))
+ return false;
+ break;
+ case fcgi::RecordType::AbortRequest:
+ if (extra.record->request_id() == 0) {
+ logger_->warn("Client %zu sending AbortRequest for 0",
+ client->index_);
+ client_abort(client);
+ return false;
+ }
+ if (!client_handle_abort_request(client))
+ return false;
+ break;
+ case fcgi::RecordType::Params:
+ if (extra.record->request_id() == 0) {
+ logger_->warn("Client %zu sending Params for 0", client->index_);
+ client_abort(client);
+ return false;
+ }
+ if (!client_handle_params(client))
+ return false;
+ break;
+ case fcgi::RecordType::Stdin:
+ if (extra.record->request_id() == 0) {
+ logger_->warn("Client %zu sending Stdin for 0", client->index_);
+ client_abort(client);
+ return false;
+ }
+ if (!client_handle_stdin(client))
+ return false;
+ break;
+ default:
+ if (!client_handle_unknown(client))
+ return false;
+ }
+ }
+
+ if (extra.record) {
+ // Need more data
+ return true;
+ }
+ }
+ }
+
+ void client_new(Client* client) override {
+ TransportBase::client_new(client);
+
+ auto& extra = extra_[client->index_];
+ extra.request_map.clear();
+ extra.record.reset();
+ extra.stream.reset();
+ extra.pair.reset();
+ extra.content_offset = 0;
+ extra.values.clear();
+ extra.close_connection = false;
+
+ for (auto& request : extra.requests) {
+ request.active = false;
+ request.stream.reset();
+ request.pair.reset();
+ request.params.clear();
+ }
+ }
+
+private:
+ class StdoutBuffer : public Buffer {
+ public:
+ void use(uint32_t request_id, Buffer* out) {
+ assert(!ptr_);
+ id_ = request_id;
+ out_ = out;
+ good_ = true;
+ }
+
+ bool good() const {
+ return good_;
+ }
+
+ bool full() const override {
+ assert(out_);
+ return out_->full();
+ }
+
+ void clear() override {
+ assert(false);
+ }
+
+ char* wbuf(size_t request, size_t& avail) override {
+ assert(!ptr_);
+ assert(out_);
+ auto ptr = out_->wbuf(request + 16, avail);
+ if (avail >= 16) {
+ avail -= 16;
+ if (avail > std::numeric_limits<uint16_t>::max())
+ avail = std::numeric_limits<uint16_t>::max();
+ ptr_ = ptr;
+ return ptr + 8;
+ } else {
+ avail = 0;
+ ptr_ = nullptr;
+ return nullptr;
+ }
+ }
+
+ void wcommit(size_t bytes) override {
+ assert(out_);
+ if (bytes > 0) {
+ assert(ptr_);
+ assert(bytes <= std::numeric_limits<uint16_t>::max());
+ auto builder = fcgi::RecordBuilder::create(fcgi::RecordType::Stdout,
+ id_, bytes);
+ bool result = builder->build(ptr_, 8) &&
+ builder->padding(ptr_ + 8 + bytes, 8);
+ if (!result)
+ good_ = false;
+ out_->wcommit(builder->size());
+ }
+ ptr_ = nullptr;
+ }
+
+ bool empty() const override {
+ assert(out_);
+ return out_->empty();
+ }
+
+ char const* rbuf(size_t, size_t& avail) override {
+ assert(false);
+ avail = 0;
+ return nullptr;
+ }
+
+ void rcommit(size_t) override {
+ assert(false);
+ }
+
+ private:
+ uint32_t id_ = 0;
+ Buffer* out_ = nullptr;
+ char* ptr_ = nullptr;
+ bool good_ = true;
+ };
+
+ class FastCgiRequest : public UrlRequest {
+ public:
+ explicit FastCgiRequest(std::vector<std::pair<std::string, std::string>>
+ params)
+ : params_(std::move(params)) {
+ }
+
+ std::string_view method() const override {
+ return find("REQUEST_METHOD", "GET");
+ }
+
+ std::string_view url() const override {
+ return find("REQUEST_URI", "/");
+ }
+
+ std::optional<std::string> header_one(
+ std::string_view name) const override {
+ for (auto const& pair : params_) {
+ if (lower_case_equal(pair.first, name))
+ return pair.second;
+ }
+ return std::nullopt;
+ }
+
+ std::vector<std::string> header_all(std::string_view name) const override {
+ std::vector<std::string> ret;
+ for (auto const& pair : params_) {
+ if (lower_case_equal(pair.first, name)) {
+ auto tmp = str::split(pair.second, ',');
+ for (auto str : tmp)
+ ret.push_back(std::string(str::trim(str)));
+ }
+ }
+ return ret;
+ }
+
+ private:
+ static bool lower_case_equal(std::string_view a, std::string_view b) {
+ if (a.size() != b.size())
+ return false;
+ for (size_t i = 0; i < a.size(); ++i) {
+ if (lower_case(a[i]) != lower_case(b[i]))
+ return false;
+ }
+ return true;
+ }
+
+ static bool lower_case(char c) {
+ return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
+ }
+
+ std::string_view find(std::string_view name,
+ std::string_view fallback) const {
+ for (auto const& pair : params_) {
+ if (pair.first == name)
+ return pair.second;
+ }
+ return fallback;
+ }
+
+ std::vector<std::pair<std::string, std::string>> params_;
+ };
+
+ struct Request {
+ bool keep_conn{false};
+ bool active{false};
+ std::unique_ptr<fcgi::RecordStream> stream;
+ std::unique_ptr<fcgi::Pair> pair;
+ bool add_to_stream;
+ std::vector<std::pair<std::string, std::string>> params;
+ std::unique_ptr<fcgi::RecordStream> stdin;
+ bool add_to_stdin;
+ std::optional<uint32_t> app_state;
+ };
+
+ struct Extra {
+ std::unordered_map<uint16_t, size_t> request_map;
+ std::vector<Request> requests;
+ std::unique_ptr<fcgi::Record> record;
+ std::unique_ptr<fcgi::RecordStream> stream;
+ std::unique_ptr<fcgi::Pair> pair;
+ size_t content_offset;
+ std::vector<std::string> values;
+ bool close_connection;
+ };
+
+ bool client_handle_unknown(Client* client) {
+ auto& extra = extra_[client->index_];
+ if (extra.record->request_id() == 0) {
+ if (extra.content_offset == 0)
+ logger_->dbg("Client %zu got unknown maintainence record.",
+ client->index_);
+ auto type = extra.record->type();
+ if (!client_consume_content(client))
+ return false;
+ if (extra.record)
+ return true;
+ auto builder = fcgi::RecordBuilder::create_unknown_type(type);
+ return client_send(client, std::move(builder));
+ } else {
+ if (extra.content_offset == 0)
+ logger_->info("Client %zu got unknown record type for request: %u",
+ client->index_, extra.record->type());
+ return client_consume_content(client);
+ }
+ }
+
+ bool client_send(Client* client,
+ std::unique_ptr<fcgi::RecordBuilder> builder) {
+ if (!builder->build(client->out_.get())) {
+ logger_->warn("Client output buffer full: %zu", client->index_);
+ client_abort(client);
+ return false;
+ }
+ return client_flush(client);
+ }
+
+ bool client_handle_get_values(Client* client) {
+ auto& extra = extra_[client->index_];
+ assert(extra.record);
+ assert(extra.record->type() == fcgi::RecordType::GetValues);
+ bool need_more;
+ if (!extra.stream)
+ extra.stream = fcgi::RecordStream::create_single(extra.record.get());
+ if (!extra.pair) {
+ extra.pair = fcgi::Pair::start(extra.stream.get(), client->in_.get());
+ need_more = !extra.pair;
+ } else {
+ need_more = !extra.pair->next(extra.stream.get(), client->in_.get());
+ }
+ while (!need_more) {
+ if (!extra.pair->good()) {
+ logger_->warn("Client %zu sent invalid GetValues", client->index_);
+ client_abort(client);
+ return false;
+ }
+ extra.values.push_back(extra.pair->name());
+ need_more = !extra.pair->next(extra.stream.get(), client->in_.get());
+ }
+ if (extra.stream->end_of_stream())
+ return client_handle_end_of_get_values(client);
+ // Need more data
+ return true;
+ }
+
+ bool client_handle_end_of_get_values(Client* client) {
+ auto& extra = extra_[client->index_];
+ extra.pair.reset();
+ extra.stream.reset();
+ extra.record.reset();
+ auto pair_builder = fcgi::PairBuilder::create();
+ for (auto& value : extra.values) {
+ if (value == "FCGI_MAX_CONNS") {
+ pair_builder->add(std::move(value), std::to_string(clients()));
+ } else if (value == "FCGI_MAX_REQS") {
+ pair_builder->add(std::move(value), std::to_string(max_requests_));
+ } else if (value == "FCGI_MPXS_CONNS") {
+ pair_builder->add(std::move(value), "1");
+ } else {
+ logger_->dbg("Unknown value `%s` ignored by client: %zu",
+ value.c_str(), client->index_);
+ }
+ }
+ extra.values.clear();
+ auto builder =
+ fcgi::RecordBuilder::create(fcgi::RecordType::GetValuesResult,
+ 0,
+ pair_builder->size());
+ if (!builder->build(client->out_.get()) ||
+ !pair_builder->build(client->out_.get()) ||
+ !builder->padding(client->out_.get())) {
+ logger_->warn("Client output buffer full: %zu", client->index_);
+ client_abort(client);
+ return false;
+ }
+ return client_flush(client);
+ }
+
+ bool client_handle_begin_request(Client* client) {
+ auto& extra = extra_[client->index_];
+ assert(extra.record);
+ assert(extra.record->type() == fcgi::RecordType::BeginRequest);
+ auto body = fcgi::BeginRequestBody::parse(extra.record.get(),
+ client->in_.get());
+ if (!body)
+ return true; // Need more data
+ if (!body->good()) {
+ logger_->warn("Client sent invalid begin request: %zu", client->index_);
+ client_abort(client);
+ return false;
+ }
+ auto request_id = extra.record->request_id();
+ extra.record.reset();
+ if (body->role() != fcgi::Role::Responder) {
+ return client_send(client,
+ fcgi::RecordBuilder::create_end_request(
+ request_id, 1,
+ fcgi::ProtocolStatus::UnknownRole));
+ }
+ size_t i = 0;
+ for (; i < max_requests_; ++i)
+ if (!extra.requests[i].active)
+ break;
+ if (i == max_requests_) {
+ return client_send(client,
+ fcgi::RecordBuilder::create_end_request(
+ request_id, 1,
+ fcgi::ProtocolStatus::Overloaded));
+ }
+ auto ret = extra.request_map.emplace(request_id, i);
+ if (!ret.second) {
+ logger_->warn("Client sent double begin request: %zu", client->index_);
+ client_abort(client);
+ return false;
+ }
+ extra.requests[i].active = true;
+ extra.requests[i].keep_conn = body->flags() & fcgi::Flags::KeepConn;
+ return true;
+ }
+
+ bool client_handle_abort_request(Client* client) {
+ auto& extra = extra_[client->index_];
+ assert(extra.record);
+ assert(extra.record->type() == fcgi::RecordType::AbortRequest);
+ auto request_id = extra.record->request_id();
+ if (!client_consume_content(client))
+ return false;
+ if (extra.record)
+ return true; // Need more data
+ return client_end_request(client, request_id, 1);
+ }
+
+ bool client_end_request(Client* client, uint16_t request_id,
+ uint32_t app_state) {
+ auto& extra = extra_[client->index_];
+ auto it = extra.request_map.find(request_id);
+ if (it == extra.request_map.end()) {
+ assert(false);
+ return true;
+ }
+ auto& request = extra.requests[it->second];
+ request.params.clear();
+ if (!request.stdin || !request.stdin->end_of_stream()) {
+ request.app_state = app_state;
+ return true;
+ }
+ auto keep_conn = request.keep_conn;
+ request.active = false;
+ request.app_state.reset();
+ request.stream.reset();
+ request.stdin.reset();
+ request.pair.reset();
+ extra.request_map.erase(it);
+ if (!client_send(client,
+ fcgi::RecordBuilder::create_end_request(
+ request_id, app_state,
+ fcgi::ProtocolStatus::RequestComplete)))
+ return false;
+ if (keep_conn)
+ return true;
+
+ // TODO: Should this check that no other requests are active?
+ extra.close_connection = true;
+ client->expect_in_ = 0;
+ if (client->out_->empty()) {
+ client_abort(client);
+ return false;
+ }
+ return true;
+ }
+
+ bool client_handle_params(Client* client) {
+ auto& extra = extra_[client->index_];
+ assert(extra.record);
+ assert(extra.record->type() == fcgi::RecordType::Params);
+ auto& request = extra.requests.at(
+ extra.request_map.at(extra.record->request_id()));
+ if (!request.stream) {
+ request.stream = fcgi::RecordStream::create_stream(extra.record.get());
+ request.add_to_stream = false;
+ } else {
+ if (request.add_to_stream) {
+ request.stream->add(extra.record.get());
+ request.add_to_stream = false;
+ }
+ }
+ bool need_more;
+ if (!request.pair) {
+ request.pair = fcgi::Pair::start(request.stream.get(), client->in_.get());
+ need_more = !request.pair;
+ } else {
+ need_more = !request.pair->next(request.stream.get(), client->in_.get());
+ }
+ while (!need_more) {
+ if (!request.pair->good()) {
+ logger_->warn("Client %zu sending invalid params", client->index_);
+ client_abort(client);
+ return false;
+ }
+ request.params.emplace_back(request.pair->name(), request.pair->value());
+ need_more = !request.pair->next(request.stream.get(), client->in_.get());
+ }
+ if (request.stream->end_of_stream()) {
+ auto id = extra.record->request_id();
+ extra.record.reset();
+ request.stream.reset();
+ request.pair.reset();
+ return TransportBase::client_request(
+ client, id,
+ std::make_unique<FastCgiRequest>(std::move(request.params)));
+ }
+ if (request.stream->end_of_record()) {
+ extra.record.reset();
+ request.add_to_stream = true;
+ }
+ return true;
+ }
+
+ bool client_handle_stdin(Client* client) {
+ auto& extra = extra_[client->index_];
+ assert(extra.record);
+ assert(extra.record->type() == fcgi::RecordType::Stdin);
+ auto& request = extra.requests.at(
+ extra.request_map.at(extra.record->request_id()));
+ if (!request.stdin) {
+ request.stdin = fcgi::RecordStream::create_stream(extra.record.get());
+ request.add_to_stdin = false;
+ } else {
+ if (request.add_to_stdin) {
+ request.stdin->add(extra.record.get());
+ request.add_to_stdin = false;
+ }
+ }
+ // Just consume the stdin stream data, it isn't used for anything
+ while (true) {
+ size_t avail;
+ request.stdin->rbuf(client->in_.get(), 1, avail);
+ if (avail == 0)
+ break;
+ request.stdin->rcommit(client->in_.get(), avail);
+ }
+ if (request.stdin->end_of_stream()) {
+ auto id = extra.record->request_id();
+ extra.record.reset();
+ if (request.app_state)
+ return client_end_request(client, id, *request.app_state);
+ } else if (request.stdin->end_of_record()) {
+ extra.record.reset();
+ request.add_to_stdin = true;
+ }
+ return true;
+ }
+
+ bool client_consume_content(Client* client) {
+ auto& extra = extra_[client->index_];
+ assert(extra.record);
+ size_t content_size = static_cast<size_t>(extra.record->content_length()) +
+ extra.record->padding_length();
+ if (extra.content_offset >= content_size) {
+ extra.record.reset();
+ return true;
+ }
+ size_t need = content_size - extra.content_offset;
+ size_t avail;
+ client->in_->rbuf(need, avail);
+ if (avail >= need) {
+ client->in_->rcommit(need);
+ extra.record.reset();
+ return true;
+ }
+ extra.content_offset += avail;
+ client->in_->rcommit(avail);
+ return true;
+ }
+
+ bool client_response_header(Client* client, uint32_t id) override {
+ assert(client->responses_.count(id));
+ auto& cli_response = client->responses_[id];
+ auto content_builder =
+ CgiResponseBuilder::create(cli_response.response_->code());
+ for (auto const& pair : cli_response.response_->headers())
+ content_builder->add_header(pair.first, pair.second);
+ auto builder = fcgi::RecordBuilder::create(fcgi::RecordType::Stdout,
+ id, content_builder->size());
+ if (!builder->build(client->out_.get()) ||
+ !content_builder->build(client->out_.get()) ||
+ !builder->padding(client->out_.get())) {
+ logger_->warn("Output buffer full for client: %zu", client->index_);
+ client_abort(client);
+ return false;
+ }
+ return client_flush(client);
+ }
+
+ bool client_response_content(Client* client, uint32_t id) override {
+ stdout_.use(id, client->out_.get());
+ if (!TransportBase::client_response_content(client, id, &stdout_))
+ return false;
+ return stdout_.good();
+ }
+
+ bool client_response_footer(Client* client, uint32_t id) override {
+ assert(client->responses_.count(id));
+ auto builder = fcgi::RecordBuilder::create(fcgi::RecordType::Stdout, id,
+ std::string());
+ if (!builder->build(client->out_.get())) {
+ logger_->warn("Output buffer full for client: %zu", client->index_);
+ client_abort(client);
+ return false;
+ }
+ if (!client_flush(client))
+ return false;
+ return client_end_request(client, id, 0);
+ }
+
+ std::vector<Extra> extra_;
+ size_t max_requests_{0};
+ StdoutBuffer stdout_;
+};
+
+class FastCgiFactory : public Transport::Factory {
+public:
+ std::unique_ptr<Transport> create(
+ std::shared_ptr<Logger> logger,
+ std::shared_ptr<Looper> looper,
+ std::shared_ptr<TaskRunner> runner,
+ Logger* config_logger,
+ Config const* config,
+ Transport::Handler* handler) {
+ auto transport =
+ std::make_unique<FastCgiTransport>(logger, looper, runner, handler);
+ if (transport->setup(config_logger, config))
+ return transport;
+ return nullptr;
+ }
+};
+
+} // namespace
+
+std::unique_ptr<Transport::Factory> create_transport_factory_fastcgi() {
+ return std::make_unique<FastCgiFactory>();
+}