#include "common.hh"

// NOTE(review): the original bare `#include` lost its target during
// extraction; <unordered_set> is the most likely candidate. The other
// standard headers cover names this file uses directly.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "blissful_monitor.hh"
#include "clock.hh"
#include "looper.hh"
#include "observers.hh"

namespace {

// Age after which a job that was never reported as removed is dropped by
// check_jobs(). Compared against clk::steady() values and passed as the delay
// to Looper::schedule(); presumably seconds, per the name and the comments.
#if FAKE_MONITOR
constexpr double JOB_EXPIRE_S = 1 * 60.0;  // 1 min
#else
constexpr double JOB_EXPIRE_S = 10 * 60.0;  // 10 min
#endif

/**
 * Monitor wrapper that forwards everything to an owned Monitor and relays its
 * events to its own observers, with two additions:
 *
 *  - jobs that reference a machine which has not been announced through
 *    added_machine() are ignored;
 *  - jobs that stay active for JOB_EXPIRE_S without a removed_job() event are
 *    reported as removed by a timer scheduled on the Looper.
 */
class BlissfulMonitorImpl final : public BlissfulMonitor,
                                  private Monitor::Observer {
public:
    /**
     * @param looper  Looper used to schedule the job-expiry timer.
     * @param monitor Wrapped monitor; ownership is taken and this object
     *                registers itself as its observer.
     */
    BlissfulMonitorImpl(std::shared_ptr<Looper> const& looper,
                        std::unique_ptr<Monitor>&& monitor)
        : looper_(looper), monitor_(std::move(monitor)), timer_(0) {
        monitor_->add_observer(this);
    }

    ~BlissfulMonitorImpl() override {
        // Cancel first so the looper cannot call check_jobs() on a dead object.
        cancel_timer();
        monitor_->remove_observer(this);
    }

    void connect(std::string const& netname, std::string const& scheduler,
                 uint16_t port) override {
        monitor_->connect(netname, scheduler, port);
    }

    void disconnect() override {
        monitor_->disconnect();
        // NOTE(review): active_jobs_ and known_machines_ are left untouched
        // here; any job still listed is only expired after the next
        // added_job() re-arms the timer. Confirm the wrapped monitor reports
        // removals on disconnect.
        cancel_timer();
    }

    size_t machines() const override {
        return monitor_->machines();
    }

    uint32_t id(size_t index) const override {
        return monitor_->id(index);
    }

    Machine machine_at(size_t index) const override {
        return monitor_->machine_at(index);
    }

    Machine machine(uint32_t id) const override {
        return monitor_->machine(id);
    }

    void add_observer(Monitor::Observer* observer) override {
        observers_.add(observer);
    }

    void remove_observer(Monitor::Observer* observer) override {
        observers_.remove(observer);
    }

#if FAKE_MONITOR
    void toggle_fakes() override {
        monitor_->toggle_fakes();
    }
#endif

private:
    // An active job together with the clk::steady() time at which it was seen.
    struct Job {
        uint32_t source;
        uint32_t target;
        double start;

        Job(uint32_t source, uint32_t target)
            : source(source), target(target), start(clk::steady()) {
        }
    };

    // Calls fn(observer) for every registered observer, walking the list with
    // the first()/next() cursor exactly as the hand-written loops did.
    template <typename Fn>
    void for_each_observer(Fn&& fn) {
        for (auto observer = observers_.first(); observer;
             observer = observers_.next()) {
            fn(observer);
        }
    }

    // Schedules check_jobs() to run after `delay` and remembers the timer id.
    void schedule_check(double delay) {
        timer_ = looper_->schedule(
            delay, std::bind(&BlissfulMonitorImpl::check_jobs, this,
                             std::placeholders::_1, std::placeholders::_2));
    }

    // Cancels the pending expiry timer, if any, and clears its id.
    void cancel_timer() {
        if (timer_) {
            looper_->cancel(timer_);
            timer_ = 0;
        }
    }

    // --- Monitor::Observer: events coming from the wrapped monitor ---------

    void state(Monitor* monitor, State state) override {
        assert(monitor == monitor_.get());
        for_each_observer([this, state](auto&& observer) {
            observer->state(this, state);
        });
    }

    void added_machine(Monitor* monitor, uint32_t id) override {
        assert(monitor == monitor_.get());
        known_machines_.insert(id);
        for_each_observer([this, id](auto&& observer) {
            observer->added_machine(this, id);
        });
    }

    void updated_machine(Monitor* monitor, uint32_t id) override {
        assert(monitor == monitor_.get());
        for_each_observer([this, id](auto&& observer) {
            observer->updated_machine(this, id);
        });
    }

    void removed_machine(Monitor* monitor, uint32_t id) override {
        assert(monitor == monitor_.get());
        known_machines_.erase(id);

        // Remove active jobs linked to the removed machine, reporting each
        // one before the machine removal itself is announced.
        auto it = active_jobs_.begin();
        while (it != active_jobs_.end()) {
            if (it->target == id || it->source == id) {
                notify_removed_job(it->source, it->target);
                it = active_jobs_.erase(it);
            } else {
                ++it;
            }
        }

        for_each_observer([this, id](auto&& observer) {
            observer->removed_machine(this, id);
        });
    }

    void added_job(Monitor* monitor, uint32_t source_id,
                   uint32_t target_id) override {
        assert(monitor == monitor_.get());

        bool const source_known = known_machines_.count(source_id) != 0;
        bool const target_known =
            source_id == target_id || known_machines_.count(target_id) != 0;
        if (!source_known || !target_known) {
            // Unknown machine listed for job, ignore
            return;
        }

        if (!timer_) {
            schedule_check(JOB_EXPIRE_S);
        }
        // Appending keeps active_jobs_ ordered by start time, which
        // check_jobs() relies on when it stops at the first live job.
        active_jobs_.emplace_back(source_id, target_id);

        for_each_observer([this, source_id, target_id](auto&& observer) {
            observer->added_job(this, source_id, target_id);
        });
    }

    void removed_job(Monitor* monitor, uint32_t source_id,
                     uint32_t target_id) override {
        assert(monitor == monitor_.get());
        // Only the first matching entry is removed; one event ends one job.
        for (auto it = active_jobs_.begin(); it != active_jobs_.end(); ++it) {
            if (it->target == target_id && it->source == source_id) {
                notify_removed_job(it->source, it->target);
                active_jobs_.erase(it);
                return;
            }
        }
        // Unknown job removed (never added, or already expired): ignore.
    }

    // Tells every observer that the job (source_id -> target_id) is gone.
    void notify_removed_job(uint32_t source_id, uint32_t target_id) {
        for_each_observer([this, source_id, target_id](auto&& observer) {
            observer->removed_job(this, source_id, target_id);
        });
    }

    // Timer callback: drops every job older than JOB_EXPIRE_S and re-arms the
    // timer for the oldest job that is still alive, if any.
    void check_jobs(Looper* looper, uint32_t id) {
        assert(looper_.get() == looper);
        assert(timer_ == id);

        // Jobs started at or before this instant have expired.
        auto const expired = clk::steady() - JOB_EXPIRE_S;
        auto next = 0.0;

        auto it = active_jobs_.begin();
        while (it != active_jobs_.end()) {
            if (it->start <= expired) {
                notify_removed_job(it->source, it->target);
                it = active_jobs_.erase(it);
            } else {
                // Jobs are ordered by start time, so this is the next one
                // that will expire.
                next = it->start;
                break;
            }
        }

        if (active_jobs_.empty()) {
            timer_ = 0;
        } else {
            // (next + JOB_EXPIRE_S) - now == next - expired
            schedule_check(next - expired);
        }
    }

    std::shared_ptr<Looper> looper_;
    std::unique_ptr<Monitor> monitor_;
    Observers<Monitor::Observer> observers_;
    std::unordered_set<uint32_t> known_machines_;  // ids seen via added_machine()
    std::vector<Job> active_jobs_;                 // ordered by Job::start
    uint32_t timer_;                               // 0 when no timer is pending
};

}  // namespace

// NOTE(review): the return type's template argument was lost in extraction;
// std::unique_ptr<BlissfulMonitor> is assumed. Confirm against the declaration
// in blissful_monitor.hh.
// static
std::unique_ptr<BlissfulMonitor> BlissfulMonitor::create(
    std::shared_ptr<Looper> const& looper,
    std::unique_ptr<Monitor>&& monitor) {
    return std::make_unique<BlissfulMonitorImpl>(looper, std::move(monitor));
}