From f4602c1bb6f9fe4090ace211c5013f71653bcd4c Mon Sep 17 00:00:00 2001 From: Joel Klinghed Date: Tue, 31 Oct 2017 16:35:11 +0100 Subject: Add a blissful monitor proxy It takes a given monitor and "fixes" calls from it by expiring "dead" jobs (jobs older than 10min) and by sanitychecking that hosts exist and such --- src/blissful_monitor.cc | 207 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 207 insertions(+) create mode 100644 src/blissful_monitor.cc (limited to 'src/blissful_monitor.cc') diff --git a/src/blissful_monitor.cc b/src/blissful_monitor.cc new file mode 100644 index 0000000..c17bc10 --- /dev/null +++ b/src/blissful_monitor.cc @@ -0,0 +1,207 @@ +#include "common.hh" + +#include + +#include "blissful_monitor.hh" +#include "clock.hh" +#include "looper.hh" +#include "observers.hh" + +namespace { + +static const double JOB_EXPIRE_S = 10 * 60.0; // 10 min + +class BlissfulMonitorImpl : public BlissfulMonitor, private Monitor::Observer { +public: + BlissfulMonitorImpl(std::shared_ptr const& looper, + std::unique_ptr&& monitor) + : looper_(looper), monitor_(std::move(monitor)), timer_(0) { + monitor_->add_observer(this); + } + + ~BlissfulMonitorImpl() override { + if (timer_) { + looper_->cancel(timer_); + } + monitor_->remove_observer(this); + } + + void connect(std::string const& netname, std::string const& scheduler, + uint16_t port) override { + monitor_->connect(netname, scheduler, port); + } + void disconnect() override { + monitor_->disconnect(); + } + size_t machines() const override { + return monitor_->machines(); + } + uint32_t id(size_t index) const override { + return monitor_->id(index); + } + Machine machine_at(size_t index) const override { + return monitor_->machine_at(index); + } + Machine machine(uint32_t id) const override { + return monitor_->machine(id); + } + + void add_observer(Monitor::Observer* observer) override { + observers_.add(observer); + } + void remove_observer(Monitor::Observer* observer) override { + observers_.remove(observer); + } + +private: + struct Job { + uint32_t source; + uint32_t target; + double start; + + Job(uint32_t source, uint32_t target) + : source(source), target(target), start(clk::steady()) { + } + }; + + void state(Monitor* monitor, State state) override { + assert(monitor == monitor_.get()); + auto observer = observers_.first(); + while (observer) { + observer->state(this, state); + observer = observers_.next(); + } + } + + void added_machine(Monitor* monitor, uint32_t id) override { + assert(monitor == monitor_.get()); + known_machines_.insert(id); + auto observer = observers_.first(); + while (observer) { + observer->added_machine(this, id); + observer = observers_.next(); + } + } + + void updated_machine(Monitor* monitor, uint32_t id) override { + assert(monitor == monitor_.get()); + auto observer = observers_.first(); + while (observer) { + observer->updated_machine(this, id); + observer = observers_.next(); + } + } + + void removed_machine(Monitor* monitor, uint32_t id) override { + assert(monitor == monitor_.get()); + known_machines_.erase(id); + + auto it = active_jobs_.begin(); + while (it != active_jobs_.end()) { + if (it->target == id || it->source == id) { + // Remove active jobs linked to removed machine + notify_removed_job(it->source, it->target); + it = active_jobs_.erase(it); + } else { + ++it; + } + } + + auto observer = observers_.first(); + while (observer) { + observer->removed_machine(this, id); + observer = observers_.next(); + } + } + + void added_job(Monitor* monitor, uint32_t source_id, + uint32_t target_id) override { + assert(monitor == monitor_.get()); + if (known_machines_.count(source_id) == 0 + || (source_id != target_id && known_machines_.count(target_id) == 0)) { + // Unknown machine listed for job, ignore + return; + } + if (!timer_) { + timer_ = looper_->schedule(JOB_EXPIRE_S, + std::bind(&BlissfulMonitorImpl::check_jobs, this, + std::placeholders::_1, + std::placeholders::_2)); + } + active_jobs_.emplace_back(source_id, target_id); + auto observer = observers_.first(); + while (observer) { + observer->added_job(this, source_id, target_id); + observer = observers_.next(); + } + } + + void removed_job(Monitor* monitor, uint32_t source_id, + uint32_t target_id) override { + assert(monitor == monitor_.get()); + + for (auto it = active_jobs_.begin(); it != active_jobs_.end(); ++it) { + if (it->target == target_id && it->source == source_id) { + notify_removed_job(it->source, it->target); + active_jobs_.erase(it); + return; + } + } + + // Unknown job removed + } + + void notify_removed_job(uint32_t source_id, uint32_t target_id) { + auto observer = observers_.first(); + while (observer) { + observer->removed_job(this, source_id, target_id); + observer = observers_.next(); + } + } + + void check_jobs(Looper* looper, uint32_t id) { + assert(looper_.get() == looper); + assert(timer_ == id); + + auto const expired = clk::steady() - JOB_EXPIRE_S; + auto next = 0.0; + + auto it = active_jobs_.begin(); + while (it != active_jobs_.end()) { + if (it->start <= expired) { + notify_removed_job(it->source, it->target); + it = active_jobs_.erase(it); + } else { + next = it->start; + break; + } + } + + if (active_jobs_.empty()) { + timer_ = 0; + } else { + timer_ = looper_->schedule(next - expired, + std::bind(&BlissfulMonitorImpl::check_jobs, this, + std::placeholders::_1, + std::placeholders::_2)); + } + } + + std::shared_ptr looper_; + std::unique_ptr monitor_; + Observers observers_; + std::unordered_set known_machines_; + std::vector active_jobs_; + + uint32_t timer_; +}; + +} // namespace + +// static +std::unique_ptr BlissfulMonitor::create( + std::shared_ptr const& looper, std::unique_ptr&& monitor) { + return std::make_unique(looper, std::move(monitor)); +} + + -- cgit v1.3