diff options
| author | Joel Klinghed <the_jk@opera.com> | 2017-10-31 16:35:11 +0100 |
|---|---|---|
| committer | Joel Klinghed <the_jk@opera.com> | 2017-10-31 16:35:11 +0100 |
| commit | f4602c1bb6f9fe4090ace211c5013f71653bcd4c (patch) | |
| tree | ecda369875a1411e6902e4425d58bca5c8b01826 | |
| parent | 38140372d8c8dd32267943d5d79b2ce2c0a032fb (diff) | |
Add a blissful monitor proxy
It takes a given monitor and "fixes" calls from it by expiring
"dead" jobs (jobs older than 10min) and by sanitychecking that
hosts exist and such
| -rw-r--r-- | meson.build | 2 | ||||
| -rw-r--r-- | src/blissful_monitor.cc | 207 | ||||
| -rw-r--r-- | src/blissful_monitor.hh | 16 | ||||
| -rw-r--r-- | src/main.cc | 7 |
4 files changed, 229 insertions, 3 deletions
diff --git a/meson.build b/meson.build index c92b0c6..6be31bb 100644 --- a/meson.build +++ b/meson.build @@ -45,7 +45,7 @@ executable('monmon', sources: ['src/main.cc', 'src/x.cc', 'src/args.cc', 'src/poll_looper.cc', 'src/looper.cc', 'src/clock.cc', 'src/io.cc', 'src/monitor.cc', 'src/fake_monitor.cc', - 'src/animator.cc'], + 'src/animator.cc', 'src/blissful_monitor.cc'], dependencies: [dep_thread, dep_pango, dep_cairo, dep_icecc, xcb_deps], install: true) diff --git a/src/blissful_monitor.cc b/src/blissful_monitor.cc new file mode 100644 index 0000000..c17bc10 --- /dev/null +++ b/src/blissful_monitor.cc @@ -0,0 +1,207 @@ +#include "common.hh" + +#include <unordered_set> + +#include "blissful_monitor.hh" +#include "clock.hh" +#include "looper.hh" +#include "observers.hh" + +namespace { + +static const double JOB_EXPIRE_S = 10 * 60.0; // 10 min + +class BlissfulMonitorImpl : public BlissfulMonitor, private Monitor::Observer { +public: + BlissfulMonitorImpl(std::shared_ptr<Looper> const& looper, + std::unique_ptr<Monitor>&& monitor) + : looper_(looper), monitor_(std::move(monitor)), timer_(0) { + monitor_->add_observer(this); + } + + ~BlissfulMonitorImpl() override { + if (timer_) { + looper_->cancel(timer_); + } + monitor_->remove_observer(this); + } + + void connect(std::string const& netname, std::string const& scheduler, + uint16_t port) override { + monitor_->connect(netname, scheduler, port); + } + void disconnect() override { + monitor_->disconnect(); + } + size_t machines() const override { + return monitor_->machines(); + } + uint32_t id(size_t index) const override { + return monitor_->id(index); + } + Machine machine_at(size_t index) const override { + return monitor_->machine_at(index); + } + Machine machine(uint32_t id) const override { + return monitor_->machine(id); + } + + void add_observer(Monitor::Observer* observer) override { + observers_.add(observer); + } + void remove_observer(Monitor::Observer* observer) override { + observers_.remove(observer); + } + +private: + struct Job { + uint32_t source; + uint32_t target; + double start; + + Job(uint32_t source, uint32_t target) + : source(source), target(target), start(clk::steady()) { + } + }; + + void state(Monitor* monitor, State state) override { + assert(monitor == monitor_.get()); + auto observer = observers_.first(); + while (observer) { + observer->state(this, state); + observer = observers_.next(); + } + } + + void added_machine(Monitor* monitor, uint32_t id) override { + assert(monitor == monitor_.get()); + known_machines_.insert(id); + auto observer = observers_.first(); + while (observer) { + observer->added_machine(this, id); + observer = observers_.next(); + } + } + + void updated_machine(Monitor* monitor, uint32_t id) override { + assert(monitor == monitor_.get()); + auto observer = observers_.first(); + while (observer) { + observer->updated_machine(this, id); + observer = observers_.next(); + } + } + + void removed_machine(Monitor* monitor, uint32_t id) override { + assert(monitor == monitor_.get()); + known_machines_.erase(id); + + auto it = active_jobs_.begin(); + while (it != active_jobs_.end()) { + if (it->target == id || it->source == id) { + // Remove active jobs linked to removed machine + notify_removed_job(it->source, it->target); + it = active_jobs_.erase(it); + } else { + ++it; + } + } + + auto observer = observers_.first(); + while (observer) { + observer->removed_machine(this, id); + observer = observers_.next(); + } + } + + void added_job(Monitor* monitor, uint32_t source_id, + uint32_t target_id) override { + assert(monitor == monitor_.get()); + if (known_machines_.count(source_id) == 0 + || (source_id != target_id && known_machines_.count(target_id) == 0)) { + // Unknown machine listed for job, ignore + return; + } + if (!timer_) { + timer_ = looper_->schedule(JOB_EXPIRE_S, + std::bind(&BlissfulMonitorImpl::check_jobs, this, + std::placeholders::_1, + std::placeholders::_2)); + } + active_jobs_.emplace_back(source_id, target_id); + auto observer = observers_.first(); + while (observer) { + observer->added_job(this, source_id, target_id); + observer = observers_.next(); + } + } + + void removed_job(Monitor* monitor, uint32_t source_id, + uint32_t target_id) override { + assert(monitor == monitor_.get()); + + for (auto it = active_jobs_.begin(); it != active_jobs_.end(); ++it) { + if (it->target == target_id && it->source == source_id) { + notify_removed_job(it->source, it->target); + active_jobs_.erase(it); + return; + } + } + + // Unknown job removed + } + + void notify_removed_job(uint32_t source_id, uint32_t target_id) { + auto observer = observers_.first(); + while (observer) { + observer->removed_job(this, source_id, target_id); + observer = observers_.next(); + } + } + + void check_jobs(Looper* looper, uint32_t id) { + assert(looper_.get() == looper); + assert(timer_ == id); + + auto const expired = clk::steady() - JOB_EXPIRE_S; + auto next = 0.0; + + auto it = active_jobs_.begin(); + while (it != active_jobs_.end()) { + if (it->start <= expired) { + notify_removed_job(it->source, it->target); + it = active_jobs_.erase(it); + } else { + next = it->start; + break; + } + } + + if (active_jobs_.empty()) { + timer_ = 0; + } else { + timer_ = looper_->schedule(next - expired, + std::bind(&BlissfulMonitorImpl::check_jobs, this, + std::placeholders::_1, + std::placeholders::_2)); + } + } + + std::shared_ptr<Looper> looper_; + std::unique_ptr<Monitor> monitor_; + Observers<Monitor::Observer> observers_; + std::unordered_set<uint32_t> known_machines_; + std::vector<Job> active_jobs_; + + uint32_t timer_; +}; + +} // namespace + +// static +std::unique_ptr<Monitor> BlissfulMonitor::create( + std::shared_ptr<Looper> const& looper, std::unique_ptr<Monitor>&& monitor) { + return std::make_unique<BlissfulMonitorImpl>(looper, std::move(monitor)); +} + + diff --git a/src/blissful_monitor.hh b/src/blissful_monitor.hh new file mode 100644 index 0000000..d4a1d28 --- /dev/null +++ b/src/blissful_monitor.hh @@ -0,0 +1,16 @@ +#ifndef BLISSFUL_MONITOR_HH +#define BLISSFUL_MONITOR_HH + +#include "monitor.hh" + +/** + * A monitor that will try to hide problems, like never finished requests + * or jobs and things like that. + */ +class BlissfulMonitor : public Monitor { +public: + static std::unique_ptr<Monitor> create(std::shared_ptr<Looper> const& looper, + std::unique_ptr<Monitor>&& monitor); +}; + +#endif // BLISSFUL_MONITOR_HH diff --git a/src/main.cc b/src/main.cc index fdf2e61..62620c8 100644 --- a/src/main.cc +++ b/src/main.cc @@ -15,6 +15,7 @@ #include "animation.hh" #include "animator.hh" #include "args.hh" +#include "blissful_monitor.hh" #include "fake_monitor.hh" #include "io.hh" #include "monitor.hh" @@ -222,11 +223,13 @@ public: } void connect(Args const* args) { + std::unique_ptr<Monitor> monitor; #if FAKE_MONITOR - monitor_ = FakeMonitor::create(looper_); + monitor = FakeMonitor::create(looper_); #else - monitor_ = Monitor::create(looper_); + monitor = Monitor::create(looper_); #endif + monitor_ = BlissfulMonitor::create(looper_, std::move(monitor)); monitor_->add_observer(this); monitor_->connect(args->arg("network", ""), args->arg("scheduler", "")); |
