summaryrefslogtreecommitdiff
path: root/src/files_finder.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/files_finder.cc')
-rw-r--r--src/files_finder.cc231
1 files changed, 231 insertions, 0 deletions
diff --git a/src/files_finder.cc b/src/files_finder.cc
new file mode 100644
index 0000000..fef05ba
--- /dev/null
+++ b/src/files_finder.cc
@@ -0,0 +1,231 @@
+#include "common.hh"
+
+#include "files_finder.hh"
+#include "io.hh"
+#include "logger.hh"
+#include "task_runner.hh"
+#include "unique_fd.hh"
+#include "weak_ptr.hh"
+
+#include <condition_variable>
+#include <dirent.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <mutex>
+#include <optional>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+namespace {
+
+constexpr uint8_t kMaxQueued = 128;
+
+class FilesFinderImpl : public FilesFinder {
+public:
+ FilesFinderImpl(
+ std::shared_ptr<Logger> logger,
+ std::shared_ptr<TaskRunner> runner,
+ std::filesystem::path root,
+ Delegate* delegate,
+ size_t threads)
+ : logger_(std::move(logger)), runner_(std::move(runner)),
+ workers_(TaskRunner::create(threads)), root_(std::move(root)),
+ delegate_(delegate) {
+ workers_->post(std::bind(&FilesFinderImpl::open_root, this));
+ }
+
+private:
+ void open_root() {
+ unique_fd fd = io::open(
+ root_, io::open_flags::rdonly | io::open_flags::directory);
+ if (fd) {
+ increment_queued();
+ list_dir(std::move(fd), root_, 0);
+ } else {
+ logger_->warn("Unable to open %s: %s", root_.c_str(), strerror(errno));
+ runner_->post(std::bind(&Delegate::done, delegate_));
+ }
+ }
+
+ void open_dir(int fd, std::filesystem::path path, uint16_t depth) {
+ list_dir(unique_fd(fd), path, depth);
+ }
+
+ void list_dir(unique_fd fd, std::filesystem::path path, uint16_t depth) {
+ DIR* dh = fdopendir(fd.get());
+ if (!dh) {
+ logger_->warn("Unable to list %s: %s", path.c_str(), strerror(errno));
+ return;
+ }
+ fd.release(); // fd is now owned by dh
+ while (true) {
+ errno = 0;
+ auto* de = readdir(dh);
+ if (!de) {
+ if (errno)
+ logger_->warn("Error listing %s: %s", path.c_str(), strerror(errno));
+ break;
+ }
+ size_t namlen;
+#ifdef _DIRENT_HAVE_D_NAMLEN
+ namlen = de->d_namlen;
+#else
+ namlen = strlen(de->d_name);
+#endif
+ if (namlen == 2 && de->d_name[0] == '.' && de->d_name[1] == '.')
+ continue;
+ std::optional<bool> is_dir;
+#ifdef _DIRENT_HAVE_D_TYPE
+ switch (de->d_type) {
+ case DT_DIR:
+ is_dir = true;
+ break;
+ case DT_CHR:
+ case DT_FIFO:
+ case DT_BLK:
+ case DT_REG:
+ case DT_SOCK:
+ case DT_WHT:
+ is_dir = false;
+ break;
+ case DT_LNK:
+ case DT_UNKNOWN:
+ default:
+ break;
+ }
+#endif
+ if (!is_dir.has_value()) {
+ struct stat buf;
+ if (fstatat(dirfd(dh), de->d_name, &buf, 0)) {
+ logger_->warn("Unable to stat: %s/%s: %s",
+ path.c_str(), de->d_name, strerror(errno));
+ continue;
+ }
+ is_dir = S_ISDIR(buf.st_mode);
+ }
+
+ std::string_view name(de->d_name, namlen);
+
+ if (is_dir.value()) {
+ if (delegate_->include_dir(name, depth)) {
+ unique_fd new_fd = io::openat(
+ dirfd(dh), de->d_name,
+ io::open_flags::rdonly | io::open_flags::directory);
+ if (new_fd) {
+ increment_queued();
+ workers_->post(std::bind(&FilesFinderImpl::open_dir, this,
+ new_fd.release(), path / name,
+ depth + 1));
+ } else {
+ logger_->warn("Unable to open %s/%s: %s", path.c_str(), de->d_name,
+ strerror(errno));
+ }
+ }
+ } else {
+ if (delegate_->include_file(name, depth)) {
+ increment_active();
+ runner_->post(std::bind(
+ &FilesFinderImpl::weak_call_file,
+ weak_ptr_owner_.get(),
+ path / name));
+ }
+ }
+ }
+ closedir(dh);
+
+ decrement_queued();
+ }
+
+ void increment_queued() {
+ // Queued is used both to keep track of when done() should be called
+ // but also to max sure we don't queued too many directories as
+ // each queued entry costs one open file.
+ std::unique_lock<std::mutex> lock(queued_mutex_);
+ while (queued_ >= kMaxQueued) {
+ queued_cond_.wait(lock);
+ }
+ ++queued_;
+ }
+
+ void increment_active() {
+ std::lock_guard<std::mutex> lock(queued_mutex_);
+ ++active_;
+ }
+
+ void decrement_queued() {
+ bool notify;
+ bool post_done;
+ {
+ std::lock_guard<std::mutex> lock(queued_mutex_);
+ notify = queued_ >= kMaxQueued;
+ --queued_;
+ post_done = queued_ == 0 && active_ == 0;
+ }
+ if (notify)
+ queued_cond_.notify_one();
+ if (post_done)
+ runner_->post(std::bind(&Delegate::done, delegate_));
+ }
+
+ void decrement_active() {
+ bool post_done;
+ {
+ std::lock_guard<std::mutex> lock(queued_mutex_);
+ --active_;
+ post_done = queued_ == 0 && active_ == 0;
+ }
+ if (post_done)
+ runner_->post(std::bind(&Delegate::done, delegate_));
+ }
+
+ static void weak_call_file(std::shared_ptr<WeakPtr<FilesFinderImpl>> weak_ptr,
+ std::filesystem::path path) {
+ auto* ptr = weak_ptr->get();
+ if (ptr)
+ ptr->call_file(std::move(path));
+ }
+
+ void call_file(std::filesystem::path path) {
+ delegate_->file(std::move(path));
+ decrement_active();
+ }
+
+ std::shared_ptr<Logger> logger_;
+ std::shared_ptr<TaskRunner> runner_;
+ std::shared_ptr<TaskRunner> workers_;
+ std::filesystem::path const root_;
+ Delegate* const delegate_;
+
+ std::mutex queued_mutex_;
+ std::condition_variable queued_cond_;
+ uint8_t queued_{0};
+ size_t active_{0};
+
+ WeakPtrOwner<FilesFinderImpl> weak_ptr_owner_{this};
+};
+
+} // namespace
+
+bool FilesFinder::Delegate::include_file(std::string_view name,
+ uint16_t /* depth */) const {
+ return name.empty() || name.front() != '.';
+}
+
+bool FilesFinder::Delegate::include_dir(std::string_view name,
+ uint16_t /* depth */) const {
+ return name.empty() || name.front() != '.';
+}
+
+void FilesFinder::Delegate::done() {}
+
+std::unique_ptr<FilesFinder> FilesFinder::create(
+ std::shared_ptr<Logger> logger,
+ std::shared_ptr<TaskRunner> runner,
+ std::filesystem::path root,
+ Delegate* delegate,
+ size_t threads) {
+ return std::make_unique<FilesFinderImpl>(std::move(logger), std::move(runner),
+ std::move(root), delegate, threads);
+}
+