summaryrefslogtreecommitdiff
path: root/src/file_opener.cc
blob: 60aaa66a425d074853f0179e82bf1471f2c0fe8d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include "common.hh"

#include "file_opener.hh"
#include "io.hh"
#include "task_runner.hh"
#include "weak_ptr.hh"

#include <mutex>
#include <unordered_map>

namespace {

class FileOpenerImpl : public FileOpener {
public:
  FileOpenerImpl(std::shared_ptr<TaskRunner> runner, size_t threads)
    : runner_(std::move(runner)), workers_(TaskRunner::create(threads)),
      weak_ptr_owner_(this) {}

  uint32_t open(std::filesystem::path path,
                std::function<void(uint32_t, unique_fd)> callback) override {
    uint32_t id;
    {
      std::lock_guard<std::mutex> lock(jobs_mutex_);
      while (true) {
        id = next_id_++;
        if (next_id_ == 0)
          next_id_ = 1;
        if (jobs_.find(id) == jobs_.end())
          break;
      }
      jobs_[id].callback_ = std::move(callback);
    }
    workers_->post(std::bind(&FileOpenerImpl::do_open, this, id, path));
    return id;
  }

  void cancel(uint32_t id) override {
    std::lock_guard<std::mutex> lock(jobs_mutex_);
    auto it = jobs_.find(id);
    if (it == jobs_.end())
      return;
    jobs_.erase(it);
  }

private:
  struct Job {
    std::function<void(uint32_t, unique_fd)> callback_;
    unique_fd fd_;
  };

  void done(uint32_t id) {
    std::lock_guard<std::mutex> lock(jobs_mutex_);
    auto it = jobs_.find(id);
    if (it == jobs_.end())
      return;
    auto fd = std::move(it->second.fd_);
    auto callback = std::move(it->second.callback_);
    jobs_.erase(it);
    callback(id, std::move(fd));
  }

  void do_open(uint32_t id, std::filesystem::path path) {
    auto fd = io::open(path, io::open_flags::rdonly);
    if (fd) {
      if (!io::make_nonblocking(fd.get())) {
        assert(false);
        fd.reset();
      }
    }
    if (fd) {
      std::lock_guard<std::mutex> lock(jobs_mutex_);
      auto it = jobs_.find(id);
      if (it == jobs_.end())
        return;
      it->second.fd_ = std::move(fd);
    }
    runner_->post(
        std::bind(&FileOpenerImpl::weak_done, weak_ptr_owner_.get(), id));
  }

  static void weak_done(std::shared_ptr<WeakPtr<FileOpenerImpl>> weak_ptr,
                        uint32_t id) {
    auto* ptr = weak_ptr->get();
    if (ptr)
      ptr->done(id);
  }

  std::shared_ptr<TaskRunner> runner_;
  std::shared_ptr<WeakPtr<FileOpenerImpl>> weak_ptr_;
  uint32_t next_id_{1};

  std::mutex jobs_mutex_;
  std::unordered_map<uint32_t, Job> jobs_;

  // It is important that workers_ is (next to) last as it blocks leftover
  // workers in destructor so should be destroyed first.
  std::unique_ptr<TaskRunner> workers_;
  WeakPtrOwner<FileOpenerImpl> weak_ptr_owner_;
};

}  // namespace

std::unique_ptr<FileOpener> FileOpener::create(
    std::shared_ptr<TaskRunner> runner, size_t threads) {
  return std::make_unique<FileOpenerImpl>(std::move(runner), threads);
}