#include "common.hh" #include "config.hh" #include "fcgi_protocol.hh" #include "http_protocol.hh" #include "logger.hh" #include "strutil.hh" #include "transport_base.hh" #include "transport_fastcgi.hh" #include #include namespace { class FastCgiTransport : public TransportBase { public: FastCgiTransport(std::shared_ptr logger, std::shared_ptr looper, std::shared_ptr runner, Transport::Handler* handler) : TransportBase(logger, looper, runner, handler) { } bool setup(Logger* logger, Config const* config) override { if (!TransportBase::setup(logger, config)) return false; auto max_requests = config->get("transport.fcgi.max_requests", 20); if (!max_requests.has_value()) { logger->err("transport.fcgi.max_requests is unknown value: '%s'", config->get("transport.fcgi.max_requests", nullptr)); return false; } if (max_requests.value() < 1) { logger->err("transport.fcgi.max_requests must be > 0"); return false; } max_requests_ = max_requests.value(); extra_.resize(clients()); for (auto& extra : extra_) { extra.requests.resize(max_requests_); } return true; } bool client_handle(Client* client) override { auto& extra = extra_[client->index_]; if (extra.close_connection) { client->expect_in_ = 0; if (client->out_->empty()) { client_abort(client); return false; } return true; } // A fastcgi records can be quite small, handle as many as possible // before returning. while (true) { if (!extra.record) { extra.record = fcgi::Record::parse(client->in_.get()); if (!extra.record) { // Need more input size_t avail; client->in_->rbuf(8, avail); assert(avail < 8); // if avail >= 8 parse should not return nullptr client->expect_in_ = avail >= 8 ? 1 : 8 - avail; if (client->in_closed_) { if (client->out_->empty()) { client_abort(client); return false; } // Wait for output to be sent (or timeout, whichever is first). } return true; } if (!extra.record->good()) { logger_->warn("Bad record sent by client: %zu", client->index_); client_abort(client); return false; } extra.content_offset = 0; } if (extra.record->request_id() != 0 && extra.record->type() != fcgi::RecordType::BeginRequest && extra.request_map.count(extra.record->request_id()) == 0) { logger_->dbg("Ignoring message for unknown request for client: %zu", client->index_); // Ignoring messages for unknown requests if (!client_consume_content(client)) return false; } else { switch (extra.record->type()) { case fcgi::RecordType::GetValues: if (extra.record->request_id() == 0) { if (!client_handle_get_values(client)) return false; } else { if (!client_handle_unknown(client)) return false; } break; case fcgi::RecordType::GetValuesResult: case fcgi::RecordType::EndRequest: case fcgi::RecordType::Stdout: case fcgi::RecordType::Stderr: logger_->warn("Client %zu sending application record type", client->index_); client_abort(client); return false; case fcgi::RecordType::BeginRequest: if (extra.record->request_id() == 0) { logger_->warn("Client %zu sending BeginRequest for 0", client->index_); client_abort(client); return false; } if (!client_handle_begin_request(client)) return false; break; case fcgi::RecordType::AbortRequest: if (extra.record->request_id() == 0) { logger_->warn("Client %zu sending AbortRequest for 0", client->index_); client_abort(client); return false; } if (!client_handle_abort_request(client)) return false; break; case fcgi::RecordType::Params: if (extra.record->request_id() == 0) { logger_->warn("Client %zu sending Params for 0", client->index_); client_abort(client); return false; } if (!client_handle_params(client)) return false; break; case fcgi::RecordType::Stdin: if (extra.record->request_id() == 0) { logger_->warn("Client %zu sending Stdin for 0", client->index_); client_abort(client); return false; } if (!client_handle_stdin(client)) return false; break; default: if (!client_handle_unknown(client)) return false; } } if (extra.record) { // Need more data return true; } } } void client_new(Client* client) override { TransportBase::client_new(client); auto& extra = extra_[client->index_]; extra.request_map.clear(); extra.record.reset(); extra.stream.reset(); extra.pair.reset(); extra.content_offset = 0; extra.values.clear(); extra.close_connection = false; for (auto& request : extra.requests) { request.active = false; request.stream.reset(); request.pair.reset(); request.params.clear(); } } private: class StdoutBuffer : public Buffer { public: void use(uint32_t request_id, Buffer* out) { assert(!ptr_); id_ = request_id; out_ = out; good_ = true; } bool good() const { return good_; } bool full() const override { assert(out_); return out_->full(); } void clear() override { assert(false); } char* wbuf(size_t request, size_t& avail) override { assert(!ptr_); assert(out_); auto ptr = out_->wbuf(request + 16, avail); if (avail >= 16) { avail -= 16; if (avail > std::numeric_limits::max()) avail = std::numeric_limits::max(); ptr_ = ptr; return ptr + 8; } else { avail = 0; ptr_ = nullptr; return nullptr; } } void wcommit(size_t bytes) override { assert(out_); if (bytes > 0) { assert(ptr_); assert(bytes <= std::numeric_limits::max()); auto builder = fcgi::RecordBuilder::create(fcgi::RecordType::Stdout, id_, bytes); bool result = builder->build(ptr_, 8) && builder->padding(ptr_ + 8 + bytes, 8); if (!result) good_ = false; out_->wcommit(builder->size()); } ptr_ = nullptr; } bool empty() const override { assert(out_); return out_->empty(); } char const* rbuf(size_t, size_t& avail) override { assert(false); avail = 0; return nullptr; } void rcommit(size_t) override { assert(false); } private: uint32_t id_ = 0; Buffer* out_ = nullptr; char* ptr_ = nullptr; bool good_ = true; }; class FastCgiRequest : public UrlRequest { public: explicit FastCgiRequest(std::vector> params) : params_(std::move(params)) { } std::string_view method() const override { return find("REQUEST_METHOD", "GET"); } std::string_view url() const override { return find("REQUEST_URI", "/"); } std::optional header_one( std::string_view name) const override { for (auto const& pair : params_) { if (lower_case_equal(pair.first, name)) return pair.second; } return std::nullopt; } std::vector header_all(std::string_view name) const override { std::vector ret; for (auto const& pair : params_) { if (lower_case_equal(pair.first, name)) { auto tmp = str::split(pair.second, ','); for (auto str : tmp) ret.push_back(std::string(str::trim(str))); } } return ret; } private: static bool lower_case_equal(std::string_view a, std::string_view b) { if (a.size() != b.size()) return false; for (size_t i = 0; i < a.size(); ++i) { if (lower_case(a[i]) != lower_case(b[i])) return false; } return true; } static bool lower_case(char c) { return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; } std::string_view find(std::string_view name, std::string_view fallback) const { for (auto const& pair : params_) { if (pair.first == name) return pair.second; } return fallback; } std::vector> params_; }; struct Request { bool keep_conn{false}; bool active{false}; std::unique_ptr stream; std::unique_ptr pair; bool add_to_stream; std::vector> params; std::unique_ptr stdin; bool add_to_stdin; std::optional app_state; }; struct Extra { std::unordered_map request_map; std::vector requests; std::unique_ptr record; std::unique_ptr stream; std::unique_ptr pair; size_t content_offset; std::vector values; bool close_connection; }; bool client_handle_unknown(Client* client) { auto& extra = extra_[client->index_]; if (extra.record->request_id() == 0) { if (extra.content_offset == 0) logger_->dbg("Client %zu got unknown maintainence record.", client->index_); auto type = extra.record->type(); if (!client_consume_content(client)) return false; if (extra.record) return true; auto builder = fcgi::RecordBuilder::create_unknown_type(type); return client_send(client, std::move(builder)); } else { if (extra.content_offset == 0) logger_->info("Client %zu got unknown record type for request: %u", client->index_, extra.record->type()); return client_consume_content(client); } } bool client_send(Client* client, std::unique_ptr builder) { if (!builder->build(client->out_.get())) { logger_->warn("Client output buffer full: %zu", client->index_); client_abort(client); return false; } return client_flush(client); } bool client_handle_get_values(Client* client) { auto& extra = extra_[client->index_]; assert(extra.record); assert(extra.record->type() == fcgi::RecordType::GetValues); bool need_more; if (!extra.stream) extra.stream = fcgi::RecordStream::create_single(extra.record.get()); if (!extra.pair) { extra.pair = fcgi::Pair::start(extra.stream.get(), client->in_.get()); need_more = !extra.pair; } else { need_more = !extra.pair->next(extra.stream.get(), client->in_.get()); } while (!need_more) { if (!extra.pair->good()) { logger_->warn("Client %zu sent invalid GetValues", client->index_); client_abort(client); return false; } extra.values.push_back(extra.pair->name()); need_more = !extra.pair->next(extra.stream.get(), client->in_.get()); } if (extra.stream->end_of_stream()) return client_handle_end_of_get_values(client); // Need more data return true; } bool client_handle_end_of_get_values(Client* client) { auto& extra = extra_[client->index_]; extra.pair.reset(); extra.stream.reset(); extra.record.reset(); auto pair_builder = fcgi::PairBuilder::create(); for (auto& value : extra.values) { if (value == "FCGI_MAX_CONNS") { pair_builder->add(std::move(value), std::to_string(clients())); } else if (value == "FCGI_MAX_REQS") { pair_builder->add(std::move(value), std::to_string(max_requests_)); } else if (value == "FCGI_MPXS_CONNS") { pair_builder->add(std::move(value), "1"); } else { logger_->dbg("Unknown value `%s` ignored by client: %zu", value.c_str(), client->index_); } } extra.values.clear(); auto builder = fcgi::RecordBuilder::create(fcgi::RecordType::GetValuesResult, 0, pair_builder->size()); if (!builder->build(client->out_.get()) || !pair_builder->build(client->out_.get()) || !builder->padding(client->out_.get())) { logger_->warn("Client output buffer full: %zu", client->index_); client_abort(client); return false; } return client_flush(client); } bool client_handle_begin_request(Client* client) { auto& extra = extra_[client->index_]; assert(extra.record); assert(extra.record->type() == fcgi::RecordType::BeginRequest); auto body = fcgi::BeginRequestBody::parse(extra.record.get(), client->in_.get()); if (!body) return true; // Need more data if (!body->good()) { logger_->warn("Client sent invalid begin request: %zu", client->index_); client_abort(client); return false; } auto request_id = extra.record->request_id(); extra.record.reset(); if (body->role() != fcgi::Role::Responder) { return client_send(client, fcgi::RecordBuilder::create_end_request( request_id, 1, fcgi::ProtocolStatus::UnknownRole)); } size_t i = 0; for (; i < max_requests_; ++i) if (!extra.requests[i].active) break; if (i == max_requests_) { return client_send(client, fcgi::RecordBuilder::create_end_request( request_id, 1, fcgi::ProtocolStatus::Overloaded)); } auto ret = extra.request_map.emplace(request_id, i); if (!ret.second) { logger_->warn("Client sent double begin request: %zu", client->index_); client_abort(client); return false; } extra.requests[i].active = true; extra.requests[i].keep_conn = body->flags() & fcgi::Flags::KeepConn; return true; } bool client_handle_abort_request(Client* client) { auto& extra = extra_[client->index_]; assert(extra.record); assert(extra.record->type() == fcgi::RecordType::AbortRequest); auto request_id = extra.record->request_id(); if (!client_consume_content(client)) return false; if (extra.record) return true; // Need more data return client_end_request(client, request_id, 1); } bool client_end_request(Client* client, uint16_t request_id, uint32_t app_state) { auto& extra = extra_[client->index_]; auto it = extra.request_map.find(request_id); if (it == extra.request_map.end()) { assert(false); return true; } auto& request = extra.requests[it->second]; request.params.clear(); if (!request.stdin || !request.stdin->end_of_stream()) { request.app_state = app_state; return true; } auto keep_conn = request.keep_conn; request.active = false; request.app_state.reset(); request.stream.reset(); request.stdin.reset(); request.pair.reset(); extra.request_map.erase(it); if (!client_send(client, fcgi::RecordBuilder::create_end_request( request_id, app_state, fcgi::ProtocolStatus::RequestComplete))) return false; if (keep_conn) return true; // TODO: Should this check that no other requests are active? extra.close_connection = true; client->expect_in_ = 0; if (client->out_->empty()) { client_abort(client); return false; } return true; } bool client_handle_params(Client* client) { auto& extra = extra_[client->index_]; assert(extra.record); assert(extra.record->type() == fcgi::RecordType::Params); auto& request = extra.requests.at( extra.request_map.at(extra.record->request_id())); if (!request.stream) { request.stream = fcgi::RecordStream::create_stream(extra.record.get()); request.add_to_stream = false; } else { if (request.add_to_stream) { request.stream->add(extra.record.get()); request.add_to_stream = false; } } bool need_more; if (!request.pair) { request.pair = fcgi::Pair::start(request.stream.get(), client->in_.get()); need_more = !request.pair; } else { need_more = !request.pair->next(request.stream.get(), client->in_.get()); } while (!need_more) { if (!request.pair->good()) { logger_->warn("Client %zu sending invalid params", client->index_); client_abort(client); return false; } request.params.emplace_back(request.pair->name(), request.pair->value()); need_more = !request.pair->next(request.stream.get(), client->in_.get()); } if (request.stream->end_of_stream()) { auto id = extra.record->request_id(); extra.record.reset(); request.stream.reset(); request.pair.reset(); return TransportBase::client_request( client, id, std::make_unique(std::move(request.params))); } if (request.stream->end_of_record()) { extra.record.reset(); request.add_to_stream = true; } return true; } bool client_handle_stdin(Client* client) { auto& extra = extra_[client->index_]; assert(extra.record); assert(extra.record->type() == fcgi::RecordType::Stdin); auto& request = extra.requests.at( extra.request_map.at(extra.record->request_id())); if (!request.stdin) { request.stdin = fcgi::RecordStream::create_stream(extra.record.get()); request.add_to_stdin = false; } else { if (request.add_to_stdin) { request.stdin->add(extra.record.get()); request.add_to_stdin = false; } } // Just consume the stdin stream data, it isn't used for anything while (true) { size_t avail; request.stdin->rbuf(client->in_.get(), 1, avail); if (avail == 0) break; request.stdin->rcommit(client->in_.get(), avail); } if (request.stdin->end_of_stream()) { auto id = extra.record->request_id(); extra.record.reset(); if (request.app_state) return client_end_request(client, id, *request.app_state); } else if (request.stdin->end_of_record()) { extra.record.reset(); request.add_to_stdin = true; } return true; } bool client_consume_content(Client* client) { auto& extra = extra_[client->index_]; assert(extra.record); size_t content_size = static_cast(extra.record->content_length()) + extra.record->padding_length(); if (extra.content_offset >= content_size) { extra.record.reset(); return true; } size_t need = content_size - extra.content_offset; size_t avail; client->in_->rbuf(need, avail); if (avail >= need) { client->in_->rcommit(need); extra.record.reset(); return true; } extra.content_offset += avail; client->in_->rcommit(avail); return true; } bool client_response_header(Client* client, uint32_t id) override { assert(client->responses_.count(id)); auto& cli_response = client->responses_[id]; auto content_builder = CgiResponseBuilder::create(cli_response.response_->code()); for (auto const& pair : cli_response.response_->headers()) content_builder->add_header(pair.first, pair.second); auto builder = fcgi::RecordBuilder::create(fcgi::RecordType::Stdout, id, content_builder->size()); if (!builder->build(client->out_.get()) || !content_builder->build(client->out_.get()) || !builder->padding(client->out_.get())) { logger_->warn("Output buffer full for client: %zu", client->index_); client_abort(client); return false; } return client_flush(client); } bool client_response_content(Client* client, uint32_t id) override { stdout_.use(id, client->out_.get()); if (!TransportBase::client_response_content(client, id, &stdout_)) return false; return stdout_.good(); } bool client_response_footer(Client* client, uint32_t id) override { assert(client->responses_.count(id)); auto builder = fcgi::RecordBuilder::create(fcgi::RecordType::Stdout, id, std::string()); if (!builder->build(client->out_.get())) { logger_->warn("Output buffer full for client: %zu", client->index_); client_abort(client); return false; } if (!client_flush(client)) return false; return client_end_request(client, id, 0); } std::vector extra_; size_t max_requests_{0}; StdoutBuffer stdout_; }; class FastCgiFactory : public Transport::Factory { public: std::unique_ptr create( std::shared_ptr logger, std::shared_ptr looper, std::shared_ptr runner, Logger* config_logger, Config const* config, Transport::Handler* handler) { auto transport = std::make_unique(logger, looper, runner, handler); if (transport->setup(config_logger, config)) return transport; return nullptr; } }; } // namespace std::unique_ptr create_transport_factory_fastcgi() { return std::make_unique(); }