#include "common.hh"

#include "file_opener.hh"
#include "io.hh"
#include "task_runner.hh"
#include "weak_ptr.hh"

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <filesystem>
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

namespace {

/// FileOpener implementation that runs the actual open on `workers_` and
/// reports the result by posting a task to `runner_`.
///
/// Each request is tracked in `jobs_` under a non-zero id. Cancelling a
/// request removes its entry, which suppresses the callback.
class FileOpenerImpl final : public FileOpener {
  // NOTE(review): callback signature inferred from the call
  // `callback(id, std::move(fd))` — confirm against file_opener.hh.
  using Callback = std::function<void(uint32_t, unique_fd)>;

 public:
  /// @param runner  task runner that completion callbacks are posted to.
  /// @param threads forwarded to TaskRunner::create() for the worker runner.
  FileOpenerImpl(std::shared_ptr<TaskRunner> runner, size_t threads)
      : runner_(std::move(runner)),
        workers_(TaskRunner::create(threads)),
        weak_ptr_owner_(this) {}

  /// Registers a job for `path` and schedules the open on `workers_`.
  ///
  /// @param path     file to open read-only.
  /// @param callback invoked as `callback(id, fd)` from a task posted to
  ///                 `runner_`, unless the job is cancelled first. `fd` is
  ///                 left default-constructed if the open did not produce a
  ///                 usable descriptor.
  /// @return the id of the new job; never 0.
  uint32_t open(std::filesystem::path path, Callback callback) override {
    uint32_t id;
    {
      std::lock_guard<std::mutex> lock(jobs_mutex_);
      // Take the next id that is not currently in use. The counter skips 0
      // when it wraps around.
      do {
        id = next_id_++;
        if (next_id_ == 0) next_id_ = 1;
      } while (jobs_.find(id) != jobs_.end());
      jobs_[id].callback_ = std::move(callback);
    }
    // `this` is captured raw: per the note on `workers_` below, its
    // destructor is expected to block until leftover tasks are finished.
    workers_->post([this, id, path = std::move(path)] { do_open(id, path); });
    return id;
  }

  /// Forgets job `id`; its callback will not be invoked. Unknown ids are
  /// ignored.
  void cancel(uint32_t id) override {
    std::lock_guard<std::mutex> lock(jobs_mutex_);
    jobs_.erase(id);
  }

 private:
  /// Per-request state kept in `jobs_` until completion or cancellation.
  struct Job {
    Callback callback_;
    unique_fd fd_;
  };

  /// Completes job `id`: removes it from `jobs_` and invokes its callback.
  /// Does nothing if the job was cancelled in the meantime.
  void done(uint32_t id) {
    Job job;
    {
      std::lock_guard<std::mutex> lock(jobs_mutex_);
      auto it = jobs_.find(id);
      if (it == jobs_.end()) return;
      job = std::move(it->second);
      jobs_.erase(it);
    }
    // Invoke the callback only after releasing jobs_mutex_. std::mutex is
    // not recursive, so a callback that calls open() or cancel() on this
    // object would otherwise deadlock.
    job.callback_(id, std::move(job.fd_));
  }

  /// Worker-side half of open(): opens `path`, stores the descriptor in the
  /// job and posts the completion to `runner_`.
  void do_open(uint32_t id, std::filesystem::path path) {
    auto fd = io::open(path, io::open_flags::rdonly);
    if (fd) {
      if (!io::make_nonblocking(fd.get())) {
        assert(false);
        // With asserts disabled, report the job with an empty descriptor.
        fd.reset();
      }
    }
    if (fd) {
      std::lock_guard<std::mutex> lock(jobs_mutex_);
      auto it = jobs_.find(id);
      // Cancelled while opening: drop the descriptor and skip completion.
      if (it == jobs_.end()) return;
      it->second.fd_ = std::move(fd);
    }
    // Hop to `runner_` through the weak handle so the completion task checks
    // for a null target before touching this object.
    runner_->post([weak = weak_ptr_owner_.get(), id] {
      if (auto* self = weak->get()) self->done(id);
    });
  }

  std::shared_ptr<TaskRunner> runner_;
  uint32_t next_id_{1};
  std::mutex jobs_mutex_;
  std::unordered_map<uint32_t, Job> jobs_;
  // Members are destroyed in reverse declaration order, so keep workers_
  // (next to) last: its destructor blocks on leftover workers and therefore
  // has to run before the state those workers use is torn down.
  std::unique_ptr<TaskRunner> workers_;
  WeakPtrOwner<FileOpenerImpl> weak_ptr_owner_;
};

}  // namespace

/// Creates the default FileOpener, which posts completions to `runner` and
/// passes `threads` on to the worker task runner.
std::unique_ptr<FileOpener> FileOpener::create(
    std::shared_ptr<TaskRunner> runner, size_t threads) {
  return std::make_unique<FileOpenerImpl>(std::move(runner), threads);
}