diff options
Diffstat (limited to 'src/files_finder.cc')
| -rw-r--r-- | src/files_finder.cc | 231 |
1 files changed, 231 insertions, 0 deletions
diff --git a/src/files_finder.cc b/src/files_finder.cc new file mode 100644 index 0000000..fef05ba --- /dev/null +++ b/src/files_finder.cc @@ -0,0 +1,231 @@ +#include "common.hh" + +#include "files_finder.hh" +#include "io.hh" +#include "logger.hh" +#include "task_runner.hh" +#include "unique_fd.hh" +#include "weak_ptr.hh" + +#include <condition_variable> +#include <dirent.h> +#include <errno.h> +#include <fcntl.h> +#include <mutex> +#include <optional> +#include <string.h> +#include <sys/stat.h> +#include <sys/types.h> + +namespace { + +constexpr uint8_t kMaxQueued = 128; + +class FilesFinderImpl : public FilesFinder { +public: + FilesFinderImpl( + std::shared_ptr<Logger> logger, + std::shared_ptr<TaskRunner> runner, + std::filesystem::path root, + Delegate* delegate, + size_t threads) + : logger_(std::move(logger)), runner_(std::move(runner)), + workers_(TaskRunner::create(threads)), root_(std::move(root)), + delegate_(delegate) { + workers_->post(std::bind(&FilesFinderImpl::open_root, this)); + } + +private: + void open_root() { + unique_fd fd = io::open( + root_, io::open_flags::rdonly | io::open_flags::directory); + if (fd) { + increment_queued(); + list_dir(std::move(fd), root_, 0); + } else { + logger_->warn("Unable to open %s: %s", root_.c_str(), strerror(errno)); + runner_->post(std::bind(&Delegate::done, delegate_)); + } + } + + void open_dir(int fd, std::filesystem::path path, uint16_t depth) { + list_dir(unique_fd(fd), path, depth); + } + + void list_dir(unique_fd fd, std::filesystem::path path, uint16_t depth) { + DIR* dh = fdopendir(fd.get()); + if (!dh) { + logger_->warn("Unable to list %s: %s", path.c_str(), strerror(errno)); + return; + } + fd.release(); // fd is now owned by dh + while (true) { + errno = 0; + auto* de = readdir(dh); + if (!de) { + if (errno) + logger_->warn("Error listing %s: %s", path.c_str(), strerror(errno)); + break; + } + size_t namlen; +#ifdef _DIRENT_HAVE_D_NAMLEN + namlen = de->d_namlen; +#else + namlen = strlen(de->d_name); +#endif + if (namlen == 2 && de->d_name[0] == '.' && de->d_name[1] == '.') + continue; + std::optional<bool> is_dir; +#ifdef _DIRENT_HAVE_D_TYPE + switch (de->d_type) { + case DT_DIR: + is_dir = true; + break; + case DT_CHR: + case DT_FIFO: + case DT_BLK: + case DT_REG: + case DT_SOCK: + case DT_WHT: + is_dir = false; + break; + case DT_LNK: + case DT_UNKNOWN: + default: + break; + } +#endif + if (!is_dir.has_value()) { + struct stat buf; + if (fstatat(dirfd(dh), de->d_name, &buf, 0)) { + logger_->warn("Unable to stat: %s/%s: %s", + path.c_str(), de->d_name, strerror(errno)); + continue; + } + is_dir = S_ISDIR(buf.st_mode); + } + + std::string_view name(de->d_name, namlen); + + if (is_dir.value()) { + if (delegate_->include_dir(name, depth)) { + unique_fd new_fd = io::openat( + dirfd(dh), de->d_name, + io::open_flags::rdonly | io::open_flags::directory); + if (new_fd) { + increment_queued(); + workers_->post(std::bind(&FilesFinderImpl::open_dir, this, + new_fd.release(), path / name, + depth + 1)); + } else { + logger_->warn("Unable to open %s/%s: %s", path.c_str(), de->d_name, + strerror(errno)); + } + } + } else { + if (delegate_->include_file(name, depth)) { + increment_active(); + runner_->post(std::bind( + &FilesFinderImpl::weak_call_file, + weak_ptr_owner_.get(), + path / name)); + } + } + } + closedir(dh); + + decrement_queued(); + } + + void increment_queued() { + // Queued is used both to keep track of when done() should be called + // but also to max sure we don't queued too many directories as + // each queued entry costs one open file. + std::unique_lock<std::mutex> lock(queued_mutex_); + while (queued_ >= kMaxQueued) { + queued_cond_.wait(lock); + } + ++queued_; + } + + void increment_active() { + std::lock_guard<std::mutex> lock(queued_mutex_); + ++active_; + } + + void decrement_queued() { + bool notify; + bool post_done; + { + std::lock_guard<std::mutex> lock(queued_mutex_); + notify = queued_ >= kMaxQueued; + --queued_; + post_done = queued_ == 0 && active_ == 0; + } + if (notify) + queued_cond_.notify_one(); + if (post_done) + runner_->post(std::bind(&Delegate::done, delegate_)); + } + + void decrement_active() { + bool post_done; + { + std::lock_guard<std::mutex> lock(queued_mutex_); + --active_; + post_done = queued_ == 0 && active_ == 0; + } + if (post_done) + runner_->post(std::bind(&Delegate::done, delegate_)); + } + + static void weak_call_file(std::shared_ptr<WeakPtr<FilesFinderImpl>> weak_ptr, + std::filesystem::path path) { + auto* ptr = weak_ptr->get(); + if (ptr) + ptr->call_file(std::move(path)); + } + + void call_file(std::filesystem::path path) { + delegate_->file(std::move(path)); + decrement_active(); + } + + std::shared_ptr<Logger> logger_; + std::shared_ptr<TaskRunner> runner_; + std::shared_ptr<TaskRunner> workers_; + std::filesystem::path const root_; + Delegate* const delegate_; + + std::mutex queued_mutex_; + std::condition_variable queued_cond_; + uint8_t queued_{0}; + size_t active_{0}; + + WeakPtrOwner<FilesFinderImpl> weak_ptr_owner_{this}; +}; + +} // namespace + +bool FilesFinder::Delegate::include_file(std::string_view name, + uint16_t /* depth */) const { + return name.empty() || name.front() != '.'; +} + +bool FilesFinder::Delegate::include_dir(std::string_view name, + uint16_t /* depth */) const { + return name.empty() || name.front() != '.'; +} + +void FilesFinder::Delegate::done() {} + +std::unique_ptr<FilesFinder> FilesFinder::create( + std::shared_ptr<Logger> logger, + std::shared_ptr<TaskRunner> runner, + std::filesystem::path root, + Delegate* delegate, + size_t threads) { + return std::make_unique<FilesFinderImpl>(std::move(logger), std::move(runner), + std::move(root), delegate, threads); +} + |
