1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
#include "common.hh"
#include "file_opener.hh"
#include "io.hh"
#include "task_runner.hh"
#include "weak_ptr.hh"
#include <mutex>
#include <unordered_map>
namespace {
class FileOpenerImpl : public FileOpener {
public:
FileOpenerImpl(std::shared_ptr<TaskRunner> runner, size_t threads)
: runner_(std::move(runner)), workers_(TaskRunner::create(threads)),
weak_ptr_owner_(this) {}
uint32_t open(std::filesystem::path path,
std::function<void(uint32_t, unique_fd)> callback) override {
uint32_t id;
{
std::lock_guard<std::mutex> lock(jobs_mutex_);
while (true) {
id = next_id_++;
if (next_id_ == 0)
next_id_ = 1;
if (jobs_.find(id) == jobs_.end())
break;
}
jobs_[id].callback_ = std::move(callback);
}
workers_->post(std::bind(&FileOpenerImpl::do_open, this, id, path));
return id;
}
void cancel(uint32_t id) override {
std::lock_guard<std::mutex> lock(jobs_mutex_);
auto it = jobs_.find(id);
if (it == jobs_.end())
return;
jobs_.erase(it);
}
private:
struct Job {
std::function<void(uint32_t, unique_fd)> callback_;
unique_fd fd_;
};
void done(uint32_t id) {
std::lock_guard<std::mutex> lock(jobs_mutex_);
auto it = jobs_.find(id);
if (it == jobs_.end())
return;
auto fd = std::move(it->second.fd_);
auto callback = std::move(it->second.callback_);
jobs_.erase(it);
callback(id, std::move(fd));
}
void do_open(uint32_t id, std::filesystem::path path) {
auto fd = io::open(path, io::open_flags::rdonly);
if (fd) {
if (!io::make_nonblocking(fd.get())) {
assert(false);
fd.reset();
}
}
if (fd) {
std::lock_guard<std::mutex> lock(jobs_mutex_);
auto it = jobs_.find(id);
if (it == jobs_.end())
return;
it->second.fd_ = std::move(fd);
}
runner_->post(
std::bind(&FileOpenerImpl::weak_done, weak_ptr_owner_.get(), id));
}
static void weak_done(std::shared_ptr<WeakPtr<FileOpenerImpl>> weak_ptr,
uint32_t id) {
auto* ptr = weak_ptr->get();
if (ptr)
ptr->done(id);
}
std::shared_ptr<TaskRunner> runner_;
std::shared_ptr<WeakPtr<FileOpenerImpl>> weak_ptr_;
uint32_t next_id_{1};
std::mutex jobs_mutex_;
std::unordered_map<uint32_t, Job> jobs_;
// It is important that workers_ is (next to) last as it blocks leftover
// workers in destructor so should be destroyed first.
std::unique_ptr<TaskRunner> workers_;
WeakPtrOwner<FileOpenerImpl> weak_ptr_owner_;
};
} // namespace
std::unique_ptr<FileOpener> FileOpener::create(
std::shared_ptr<TaskRunner> runner, size_t threads) {
return std::make_unique<FileOpenerImpl>(std::move(runner), threads);
}
|