#include "common.hh" #include "files_finder.hh" #include "io.hh" #include "logger.hh" #include "task_runner.hh" #include "unique_fd.hh" #include "weak_ptr.hh" #include #include #include #include #include #include #include #include #include namespace { constexpr uint8_t kMaxQueued = 128; class FilesFinderImpl : public FilesFinder { public: FilesFinderImpl( std::shared_ptr logger, std::shared_ptr runner, std::filesystem::path root, Delegate* delegate, size_t threads) : logger_(std::move(logger)), runner_(std::move(runner)), workers_(TaskRunner::create(threads)), root_(std::move(root)), delegate_(delegate) { workers_->post(std::bind(&FilesFinderImpl::open_root, this)); } private: void open_root() { unique_fd fd = io::open( root_, io::open_flags::rdonly | io::open_flags::directory); if (fd) { increment_queued(); list_dir(std::move(fd), root_, 0); } else { logger_->warn("Unable to open %s: %s", root_.c_str(), strerror(errno)); runner_->post(std::bind(&Delegate::done, delegate_)); } } void open_dir(int fd, std::filesystem::path path, uint16_t depth) { list_dir(unique_fd(fd), path, depth); } void list_dir(unique_fd fd, std::filesystem::path path, uint16_t depth) { DIR* dh = fdopendir(fd.get()); if (!dh) { logger_->warn("Unable to list %s: %s", path.c_str(), strerror(errno)); return; } fd.release(); // fd is now owned by dh while (true) { errno = 0; auto* de = readdir(dh); if (!de) { if (errno) logger_->warn("Error listing %s: %s", path.c_str(), strerror(errno)); break; } size_t namlen; #ifdef _DIRENT_HAVE_D_NAMLEN namlen = de->d_namlen; #else namlen = strlen(de->d_name); #endif if (namlen == 2 && de->d_name[0] == '.' && de->d_name[1] == '.') continue; std::optional is_dir; #ifdef _DIRENT_HAVE_D_TYPE switch (de->d_type) { case DT_DIR: is_dir = true; break; case DT_CHR: case DT_FIFO: case DT_BLK: case DT_REG: case DT_SOCK: case DT_WHT: is_dir = false; break; case DT_LNK: case DT_UNKNOWN: default: break; } #endif if (!is_dir.has_value()) { struct stat buf; if (fstatat(dirfd(dh), de->d_name, &buf, 0)) { logger_->warn("Unable to stat: %s/%s: %s", path.c_str(), de->d_name, strerror(errno)); continue; } is_dir = S_ISDIR(buf.st_mode); } std::string_view name(de->d_name, namlen); if (is_dir.value()) { if (delegate_->include_dir(name, depth)) { unique_fd new_fd = io::openat( dirfd(dh), de->d_name, io::open_flags::rdonly | io::open_flags::directory); if (new_fd) { increment_queued(); workers_->post(std::bind(&FilesFinderImpl::open_dir, this, new_fd.release(), path / name, depth + 1)); } else { logger_->warn("Unable to open %s/%s: %s", path.c_str(), de->d_name, strerror(errno)); } } } else { if (delegate_->include_file(name, depth)) { increment_active(); runner_->post(std::bind( &FilesFinderImpl::weak_call_file, weak_ptr_owner_.get(), path / name)); } } } closedir(dh); decrement_queued(); } void increment_queued() { // Queued is used both to keep track of when done() should be called // but also to max sure we don't queued too many directories as // each queued entry costs one open file. std::unique_lock lock(queued_mutex_); while (queued_ >= kMaxQueued) { queued_cond_.wait(lock); } ++queued_; } void increment_active() { std::lock_guard lock(queued_mutex_); ++active_; } void decrement_queued() { bool notify; bool post_done; { std::lock_guard lock(queued_mutex_); notify = queued_ >= kMaxQueued; --queued_; post_done = queued_ == 0 && active_ == 0; } if (notify) queued_cond_.notify_one(); if (post_done) runner_->post(std::bind(&Delegate::done, delegate_)); } void decrement_active() { bool post_done; { std::lock_guard lock(queued_mutex_); --active_; post_done = queued_ == 0 && active_ == 0; } if (post_done) runner_->post(std::bind(&Delegate::done, delegate_)); } static void weak_call_file(std::shared_ptr> weak_ptr, std::filesystem::path path) { auto* ptr = weak_ptr->get(); if (ptr) ptr->call_file(std::move(path)); } void call_file(std::filesystem::path path) { delegate_->file(std::move(path)); decrement_active(); } std::shared_ptr logger_; std::shared_ptr runner_; std::shared_ptr workers_; std::filesystem::path const root_; Delegate* const delegate_; std::mutex queued_mutex_; std::condition_variable queued_cond_; uint8_t queued_{0}; size_t active_{0}; WeakPtrOwner weak_ptr_owner_{this}; }; } // namespace bool FilesFinder::Delegate::include_file(std::string_view name, uint16_t /* depth */) const { return name.empty() || name.front() != '.'; } bool FilesFinder::Delegate::include_dir(std::string_view name, uint16_t /* depth */) const { return name.empty() || name.front() != '.'; } void FilesFinder::Delegate::done() {} std::unique_ptr FilesFinder::create( std::shared_ptr logger, std::shared_ptr runner, std::filesystem::path root, Delegate* delegate, size_t threads) { return std::make_unique(std::move(logger), std::move(runner), std::move(root), delegate, threads); }