summaryrefslogtreecommitdiff
path: root/src/file_opener.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/file_opener.cc')
-rw-r--r--src/file_opener.cc106
1 files changed, 106 insertions, 0 deletions
diff --git a/src/file_opener.cc b/src/file_opener.cc
new file mode 100644
index 0000000..60aaa66
--- /dev/null
+++ b/src/file_opener.cc
@@ -0,0 +1,106 @@
+#include "common.hh"
+
+#include "file_opener.hh"
+#include "io.hh"
+#include "task_runner.hh"
+#include "weak_ptr.hh"
+
+#include <mutex>
+#include <unordered_map>
+
+namespace {
+
+class FileOpenerImpl : public FileOpener {
+public:
+ FileOpenerImpl(std::shared_ptr<TaskRunner> runner, size_t threads)
+ : runner_(std::move(runner)), workers_(TaskRunner::create(threads)),
+ weak_ptr_owner_(this) {}
+
+ uint32_t open(std::filesystem::path path,
+ std::function<void(uint32_t, unique_fd)> callback) override {
+ uint32_t id;
+ {
+ std::lock_guard<std::mutex> lock(jobs_mutex_);
+ while (true) {
+ id = next_id_++;
+ if (next_id_ == 0)
+ next_id_ = 1;
+ if (jobs_.find(id) == jobs_.end())
+ break;
+ }
+ jobs_[id].callback_ = std::move(callback);
+ }
+ workers_->post(std::bind(&FileOpenerImpl::do_open, this, id, path));
+ return id;
+ }
+
+ void cancel(uint32_t id) override {
+ std::lock_guard<std::mutex> lock(jobs_mutex_);
+ auto it = jobs_.find(id);
+ if (it == jobs_.end())
+ return;
+ jobs_.erase(it);
+ }
+
+private:
+ struct Job {
+ std::function<void(uint32_t, unique_fd)> callback_;
+ unique_fd fd_;
+ };
+
+ void done(uint32_t id) {
+ std::lock_guard<std::mutex> lock(jobs_mutex_);
+ auto it = jobs_.find(id);
+ if (it == jobs_.end())
+ return;
+ auto fd = std::move(it->second.fd_);
+ auto callback = std::move(it->second.callback_);
+ jobs_.erase(it);
+ callback(id, std::move(fd));
+ }
+
+ void do_open(uint32_t id, std::filesystem::path path) {
+ auto fd = io::open(path, io::open_flags::rdonly);
+ if (fd) {
+ if (!io::make_nonblocking(fd.get())) {
+ assert(false);
+ fd.reset();
+ }
+ }
+ if (fd) {
+ std::lock_guard<std::mutex> lock(jobs_mutex_);
+ auto it = jobs_.find(id);
+ if (it == jobs_.end())
+ return;
+ it->second.fd_ = std::move(fd);
+ }
+ runner_->post(
+ std::bind(&FileOpenerImpl::weak_done, weak_ptr_owner_.get(), id));
+ }
+
+ static void weak_done(std::shared_ptr<WeakPtr<FileOpenerImpl>> weak_ptr,
+ uint32_t id) {
+ auto* ptr = weak_ptr->get();
+ if (ptr)
+ ptr->done(id);
+ }
+
+ std::shared_ptr<TaskRunner> runner_;
+ std::shared_ptr<WeakPtr<FileOpenerImpl>> weak_ptr_;
+ uint32_t next_id_{1};
+
+ std::mutex jobs_mutex_;
+ std::unordered_map<uint32_t, Job> jobs_;
+
+ // It is important that workers_ is (next to) last as it blocks leftover
+ // workers in destructor so should be destroyed first.
+ std::unique_ptr<TaskRunner> workers_;
+ WeakPtrOwner<FileOpenerImpl> weak_ptr_owner_;
+};
+
+} // namespace
+
+std::unique_ptr<FileOpener> FileOpener::create(
+ std::shared_ptr<TaskRunner> runner, size_t threads) {
+ return std::make_unique<FileOpenerImpl>(std::move(runner), threads);
+}