From 6232d13f5321b87ddf12a1aa36b4545da45f173d Mon Sep 17 00:00:00 2001 From: Joel Klinghed Date: Wed, 17 Nov 2021 22:34:57 +0100 Subject: Travel3: Simple image and video display site Reads the images and videos from filesystem and builds a site in memroy. --- src/transport_fastcgi.cc | 707 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 707 insertions(+) create mode 100644 src/transport_fastcgi.cc (limited to 'src/transport_fastcgi.cc') diff --git a/src/transport_fastcgi.cc b/src/transport_fastcgi.cc new file mode 100644 index 0000000..8a5c3cc --- /dev/null +++ b/src/transport_fastcgi.cc @@ -0,0 +1,707 @@ +#include "common.hh" + +#include "config.hh" +#include "fcgi_protocol.hh" +#include "http_protocol.hh" +#include "logger.hh" +#include "strutil.hh" +#include "transport_base.hh" +#include "transport_fastcgi.hh" + +#include +#include + +namespace { + +class FastCgiTransport : public TransportBase { +public: + FastCgiTransport(std::shared_ptr logger, + std::shared_ptr looper, + std::shared_ptr runner, + Transport::Handler* handler) + : TransportBase(logger, looper, runner, handler) { + } + + bool setup(Logger* logger, Config const* config) override { + if (!TransportBase::setup(logger, config)) + return false; + + auto max_requests = config->get("transport.fcgi.max_requests", 20); + if (!max_requests.has_value()) { + logger->err("transport.fcgi.max_requests is unknown value: '%s'", + config->get("transport.fcgi.max_requests", nullptr)); + return false; + } + if (max_requests.value() < 1) { + logger->err("transport.fcgi.max_requests must be > 0"); + return false; + } + + max_requests_ = max_requests.value(); + + extra_.resize(clients()); + for (auto& extra : extra_) { + extra.requests.resize(max_requests_); + } + + return true; + } + + bool client_handle(Client* client) override { + auto& extra = extra_[client->index_]; + + if (extra.close_connection) { + client->expect_in_ = 0; + if (client->out_->empty()) { + client_abort(client); + return false; + } + return true; + } + + // A fastcgi records can be quite small, handle as many as possible + // before returning. + while (true) { + if (!extra.record) { + extra.record = fcgi::Record::parse(client->in_.get()); + if (!extra.record) { + // Need more input + size_t avail; + client->in_->rbuf(8, avail); + assert(avail < 8); // if avail >= 8 parse should not return nullptr + client->expect_in_ = avail >= 8 ? 1 : 8 - avail; + if (client->in_closed_) { + if (client->out_->empty()) { + client_abort(client); + return false; + } + // Wait for output to be sent (or timeout, whichever is first). + } + return true; + } + if (!extra.record->good()) { + logger_->warn("Bad record sent by client: %zu", client->index_); + client_abort(client); + return false; + } + extra.content_offset = 0; + } + + if (extra.record->request_id() != 0 && + extra.record->type() != fcgi::RecordType::BeginRequest && + extra.request_map.count(extra.record->request_id()) == 0) { + logger_->dbg("Ignoring message for unknown request for client: %zu", + client->index_); + // Ignoring messages for unknown requests + if (!client_consume_content(client)) + return false; + } else { + switch (extra.record->type()) { + case fcgi::RecordType::GetValues: + if (extra.record->request_id() == 0) { + if (!client_handle_get_values(client)) + return false; + } else { + if (!client_handle_unknown(client)) + return false; + } + break; + case fcgi::RecordType::GetValuesResult: + case fcgi::RecordType::EndRequest: + case fcgi::RecordType::Stdout: + case fcgi::RecordType::Stderr: + logger_->warn("Client %zu sending application record type", + client->index_); + client_abort(client); + return false; + case fcgi::RecordType::BeginRequest: + if (extra.record->request_id() == 0) { + logger_->warn("Client %zu sending BeginRequest for 0", + client->index_); + client_abort(client); + return false; + } + if (!client_handle_begin_request(client)) + return false; + break; + case fcgi::RecordType::AbortRequest: + if (extra.record->request_id() == 0) { + logger_->warn("Client %zu sending AbortRequest for 0", + client->index_); + client_abort(client); + return false; + } + if (!client_handle_abort_request(client)) + return false; + break; + case fcgi::RecordType::Params: + if (extra.record->request_id() == 0) { + logger_->warn("Client %zu sending Params for 0", client->index_); + client_abort(client); + return false; + } + if (!client_handle_params(client)) + return false; + break; + case fcgi::RecordType::Stdin: + if (extra.record->request_id() == 0) { + logger_->warn("Client %zu sending Stdin for 0", client->index_); + client_abort(client); + return false; + } + if (!client_handle_stdin(client)) + return false; + break; + default: + if (!client_handle_unknown(client)) + return false; + } + } + + if (extra.record) { + // Need more data + return true; + } + } + } + + void client_new(Client* client) override { + TransportBase::client_new(client); + + auto& extra = extra_[client->index_]; + extra.request_map.clear(); + extra.record.reset(); + extra.stream.reset(); + extra.pair.reset(); + extra.content_offset = 0; + extra.values.clear(); + extra.close_connection = false; + + for (auto& request : extra.requests) { + request.active = false; + request.stream.reset(); + request.pair.reset(); + request.params.clear(); + } + } + +private: + class StdoutBuffer : public Buffer { + public: + void use(uint32_t request_id, Buffer* out) { + assert(!ptr_); + id_ = request_id; + out_ = out; + good_ = true; + } + + bool good() const { + return good_; + } + + bool full() const override { + assert(out_); + return out_->full(); + } + + void clear() override { + assert(false); + } + + char* wbuf(size_t request, size_t& avail) override { + assert(!ptr_); + assert(out_); + auto ptr = out_->wbuf(request + 16, avail); + if (avail >= 16) { + avail -= 16; + if (avail > std::numeric_limits::max()) + avail = std::numeric_limits::max(); + ptr_ = ptr; + return ptr + 8; + } else { + avail = 0; + ptr_ = nullptr; + return nullptr; + } + } + + void wcommit(size_t bytes) override { + assert(out_); + if (bytes > 0) { + assert(ptr_); + assert(bytes <= std::numeric_limits::max()); + auto builder = fcgi::RecordBuilder::create(fcgi::RecordType::Stdout, + id_, bytes); + bool result = builder->build(ptr_, 8) && + builder->padding(ptr_ + 8 + bytes, 8); + if (!result) + good_ = false; + out_->wcommit(builder->size()); + } + ptr_ = nullptr; + } + + bool empty() const override { + assert(out_); + return out_->empty(); + } + + char const* rbuf(size_t, size_t& avail) override { + assert(false); + avail = 0; + return nullptr; + } + + void rcommit(size_t) override { + assert(false); + } + + private: + uint32_t id_ = 0; + Buffer* out_ = nullptr; + char* ptr_ = nullptr; + bool good_ = true; + }; + + class FastCgiRequest : public UrlRequest { + public: + explicit FastCgiRequest(std::vector> + params) + : params_(std::move(params)) { + } + + std::string_view method() const override { + return find("REQUEST_METHOD", "GET"); + } + + std::string_view url() const override { + return find("REQUEST_URI", "/"); + } + + std::optional header_one( + std::string_view name) const override { + for (auto const& pair : params_) { + if (lower_case_equal(pair.first, name)) + return pair.second; + } + return std::nullopt; + } + + std::vector header_all(std::string_view name) const override { + std::vector ret; + for (auto const& pair : params_) { + if (lower_case_equal(pair.first, name)) { + auto tmp = str::split(pair.second, ','); + for (auto str : tmp) + ret.push_back(std::string(str::trim(str))); + } + } + return ret; + } + + private: + static bool lower_case_equal(std::string_view a, std::string_view b) { + if (a.size() != b.size()) + return false; + for (size_t i = 0; i < a.size(); ++i) { + if (lower_case(a[i]) != lower_case(b[i])) + return false; + } + return true; + } + + static bool lower_case(char c) { + return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; + } + + std::string_view find(std::string_view name, + std::string_view fallback) const { + for (auto const& pair : params_) { + if (pair.first == name) + return pair.second; + } + return fallback; + } + + std::vector> params_; + }; + + struct Request { + bool keep_conn{false}; + bool active{false}; + std::unique_ptr stream; + std::unique_ptr pair; + bool add_to_stream; + std::vector> params; + std::unique_ptr stdin; + bool add_to_stdin; + std::optional app_state; + }; + + struct Extra { + std::unordered_map request_map; + std::vector requests; + std::unique_ptr record; + std::unique_ptr stream; + std::unique_ptr pair; + size_t content_offset; + std::vector values; + bool close_connection; + }; + + bool client_handle_unknown(Client* client) { + auto& extra = extra_[client->index_]; + if (extra.record->request_id() == 0) { + if (extra.content_offset == 0) + logger_->dbg("Client %zu got unknown maintainence record.", + client->index_); + auto type = extra.record->type(); + if (!client_consume_content(client)) + return false; + if (extra.record) + return true; + auto builder = fcgi::RecordBuilder::create_unknown_type(type); + return client_send(client, std::move(builder)); + } else { + if (extra.content_offset == 0) + logger_->info("Client %zu got unknown record type for request: %u", + client->index_, extra.record->type()); + return client_consume_content(client); + } + } + + bool client_send(Client* client, + std::unique_ptr builder) { + if (!builder->build(client->out_.get())) { + logger_->warn("Client output buffer full: %zu", client->index_); + client_abort(client); + return false; + } + return client_flush(client); + } + + bool client_handle_get_values(Client* client) { + auto& extra = extra_[client->index_]; + assert(extra.record); + assert(extra.record->type() == fcgi::RecordType::GetValues); + bool need_more; + if (!extra.stream) + extra.stream = fcgi::RecordStream::create_single(extra.record.get()); + if (!extra.pair) { + extra.pair = fcgi::Pair::start(extra.stream.get(), client->in_.get()); + need_more = !extra.pair; + } else { + need_more = !extra.pair->next(extra.stream.get(), client->in_.get()); + } + while (!need_more) { + if (!extra.pair->good()) { + logger_->warn("Client %zu sent invalid GetValues", client->index_); + client_abort(client); + return false; + } + extra.values.push_back(extra.pair->name()); + need_more = !extra.pair->next(extra.stream.get(), client->in_.get()); + } + if (extra.stream->end_of_stream()) + return client_handle_end_of_get_values(client); + // Need more data + return true; + } + + bool client_handle_end_of_get_values(Client* client) { + auto& extra = extra_[client->index_]; + extra.pair.reset(); + extra.stream.reset(); + extra.record.reset(); + auto pair_builder = fcgi::PairBuilder::create(); + for (auto& value : extra.values) { + if (value == "FCGI_MAX_CONNS") { + pair_builder->add(std::move(value), std::to_string(clients())); + } else if (value == "FCGI_MAX_REQS") { + pair_builder->add(std::move(value), std::to_string(max_requests_)); + } else if (value == "FCGI_MPXS_CONNS") { + pair_builder->add(std::move(value), "1"); + } else { + logger_->dbg("Unknown value `%s` ignored by client: %zu", + value.c_str(), client->index_); + } + } + extra.values.clear(); + auto builder = + fcgi::RecordBuilder::create(fcgi::RecordType::GetValuesResult, + 0, + pair_builder->size()); + if (!builder->build(client->out_.get()) || + !pair_builder->build(client->out_.get()) || + !builder->padding(client->out_.get())) { + logger_->warn("Client output buffer full: %zu", client->index_); + client_abort(client); + return false; + } + return client_flush(client); + } + + bool client_handle_begin_request(Client* client) { + auto& extra = extra_[client->index_]; + assert(extra.record); + assert(extra.record->type() == fcgi::RecordType::BeginRequest); + auto body = fcgi::BeginRequestBody::parse(extra.record.get(), + client->in_.get()); + if (!body) + return true; // Need more data + if (!body->good()) { + logger_->warn("Client sent invalid begin request: %zu", client->index_); + client_abort(client); + return false; + } + auto request_id = extra.record->request_id(); + extra.record.reset(); + if (body->role() != fcgi::Role::Responder) { + return client_send(client, + fcgi::RecordBuilder::create_end_request( + request_id, 1, + fcgi::ProtocolStatus::UnknownRole)); + } + size_t i = 0; + for (; i < max_requests_; ++i) + if (!extra.requests[i].active) + break; + if (i == max_requests_) { + return client_send(client, + fcgi::RecordBuilder::create_end_request( + request_id, 1, + fcgi::ProtocolStatus::Overloaded)); + } + auto ret = extra.request_map.emplace(request_id, i); + if (!ret.second) { + logger_->warn("Client sent double begin request: %zu", client->index_); + client_abort(client); + return false; + } + extra.requests[i].active = true; + extra.requests[i].keep_conn = body->flags() & fcgi::Flags::KeepConn; + return true; + } + + bool client_handle_abort_request(Client* client) { + auto& extra = extra_[client->index_]; + assert(extra.record); + assert(extra.record->type() == fcgi::RecordType::AbortRequest); + auto request_id = extra.record->request_id(); + if (!client_consume_content(client)) + return false; + if (extra.record) + return true; // Need more data + return client_end_request(client, request_id, 1); + } + + bool client_end_request(Client* client, uint16_t request_id, + uint32_t app_state) { + auto& extra = extra_[client->index_]; + auto it = extra.request_map.find(request_id); + if (it == extra.request_map.end()) { + assert(false); + return true; + } + auto& request = extra.requests[it->second]; + request.params.clear(); + if (!request.stdin || !request.stdin->end_of_stream()) { + request.app_state = app_state; + return true; + } + auto keep_conn = request.keep_conn; + request.active = false; + request.app_state.reset(); + request.stream.reset(); + request.stdin.reset(); + request.pair.reset(); + extra.request_map.erase(it); + if (!client_send(client, + fcgi::RecordBuilder::create_end_request( + request_id, app_state, + fcgi::ProtocolStatus::RequestComplete))) + return false; + if (keep_conn) + return true; + + // TODO: Should this check that no other requests are active? + extra.close_connection = true; + client->expect_in_ = 0; + if (client->out_->empty()) { + client_abort(client); + return false; + } + return true; + } + + bool client_handle_params(Client* client) { + auto& extra = extra_[client->index_]; + assert(extra.record); + assert(extra.record->type() == fcgi::RecordType::Params); + auto& request = extra.requests.at( + extra.request_map.at(extra.record->request_id())); + if (!request.stream) { + request.stream = fcgi::RecordStream::create_stream(extra.record.get()); + request.add_to_stream = false; + } else { + if (request.add_to_stream) { + request.stream->add(extra.record.get()); + request.add_to_stream = false; + } + } + bool need_more; + if (!request.pair) { + request.pair = fcgi::Pair::start(request.stream.get(), client->in_.get()); + need_more = !request.pair; + } else { + need_more = !request.pair->next(request.stream.get(), client->in_.get()); + } + while (!need_more) { + if (!request.pair->good()) { + logger_->warn("Client %zu sending invalid params", client->index_); + client_abort(client); + return false; + } + request.params.emplace_back(request.pair->name(), request.pair->value()); + need_more = !request.pair->next(request.stream.get(), client->in_.get()); + } + if (request.stream->end_of_stream()) { + auto id = extra.record->request_id(); + extra.record.reset(); + request.stream.reset(); + request.pair.reset(); + return TransportBase::client_request( + client, id, + std::make_unique(std::move(request.params))); + } + if (request.stream->end_of_record()) { + extra.record.reset(); + request.add_to_stream = true; + } + return true; + } + + bool client_handle_stdin(Client* client) { + auto& extra = extra_[client->index_]; + assert(extra.record); + assert(extra.record->type() == fcgi::RecordType::Stdin); + auto& request = extra.requests.at( + extra.request_map.at(extra.record->request_id())); + if (!request.stdin) { + request.stdin = fcgi::RecordStream::create_stream(extra.record.get()); + request.add_to_stdin = false; + } else { + if (request.add_to_stdin) { + request.stdin->add(extra.record.get()); + request.add_to_stdin = false; + } + } + // Just consume the stdin stream data, it isn't used for anything + while (true) { + size_t avail; + request.stdin->rbuf(client->in_.get(), 1, avail); + if (avail == 0) + break; + request.stdin->rcommit(client->in_.get(), avail); + } + if (request.stdin->end_of_stream()) { + auto id = extra.record->request_id(); + extra.record.reset(); + if (request.app_state) + return client_end_request(client, id, *request.app_state); + } else if (request.stdin->end_of_record()) { + extra.record.reset(); + request.add_to_stdin = true; + } + return true; + } + + bool client_consume_content(Client* client) { + auto& extra = extra_[client->index_]; + assert(extra.record); + size_t content_size = static_cast(extra.record->content_length()) + + extra.record->padding_length(); + if (extra.content_offset >= content_size) { + extra.record.reset(); + return true; + } + size_t need = content_size - extra.content_offset; + size_t avail; + client->in_->rbuf(need, avail); + if (avail >= need) { + client->in_->rcommit(need); + extra.record.reset(); + return true; + } + extra.content_offset += avail; + client->in_->rcommit(avail); + return true; + } + + bool client_response_header(Client* client, uint32_t id) override { + assert(client->responses_.count(id)); + auto& cli_response = client->responses_[id]; + auto content_builder = + CgiResponseBuilder::create(cli_response.response_->code()); + for (auto const& pair : cli_response.response_->headers()) + content_builder->add_header(pair.first, pair.second); + auto builder = fcgi::RecordBuilder::create(fcgi::RecordType::Stdout, + id, content_builder->size()); + if (!builder->build(client->out_.get()) || + !content_builder->build(client->out_.get()) || + !builder->padding(client->out_.get())) { + logger_->warn("Output buffer full for client: %zu", client->index_); + client_abort(client); + return false; + } + return client_flush(client); + } + + bool client_response_content(Client* client, uint32_t id) override { + stdout_.use(id, client->out_.get()); + if (!TransportBase::client_response_content(client, id, &stdout_)) + return false; + return stdout_.good(); + } + + bool client_response_footer(Client* client, uint32_t id) override { + assert(client->responses_.count(id)); + auto builder = fcgi::RecordBuilder::create(fcgi::RecordType::Stdout, id, + std::string()); + if (!builder->build(client->out_.get())) { + logger_->warn("Output buffer full for client: %zu", client->index_); + client_abort(client); + return false; + } + if (!client_flush(client)) + return false; + return client_end_request(client, id, 0); + } + + std::vector extra_; + size_t max_requests_{0}; + StdoutBuffer stdout_; +}; + +class FastCgiFactory : public Transport::Factory { +public: + std::unique_ptr create( + std::shared_ptr logger, + std::shared_ptr looper, + std::shared_ptr runner, + Logger* config_logger, + Config const* config, + Transport::Handler* handler) { + auto transport = + std::make_unique(logger, looper, runner, handler); + if (transport->setup(config_logger, config)) + return transport; + return nullptr; + } +}; + +} // namespace + +std::unique_ptr create_transport_factory_fastcgi() { + return std::make_unique(); +} -- cgit v1.2.3-70-g09d2