summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author Joel Klinghed <the_jk@opera.com> 2017-10-31 16:35:11 +0100
committer Joel Klinghed <the_jk@opera.com> 2017-10-31 16:35:11 +0100
commit f4602c1bb6f9fe4090ace211c5013f71653bcd4c (patch)
tree ecda369875a1411e6902e4425d58bca5c8b01826 /src
parent 38140372d8c8dd32267943d5d79b2ce2c0a032fb (diff)
Add a blissful monitor proxy
It takes a given monitor and "fixes" calls from it by expiring "dead" jobs (jobs older than 10 min) and by sanity-checking that hosts exist and such.
Diffstat (limited to 'src')
-rw-r--r-- src/blissful_monitor.cc 207
-rw-r--r-- src/blissful_monitor.hh 16
-rw-r--r-- src/main.cc 7
3 files changed, 228 insertions, 2 deletions
diff --git a/src/blissful_monitor.cc b/src/blissful_monitor.cc
new file mode 100644
index 0000000..c17bc10
--- /dev/null
+++ b/src/blissful_monitor.cc
@@ -0,0 +1,207 @@
+#include "common.hh"
+
+#include <unordered_set>
+
+#include "blissful_monitor.hh"
+#include "clock.hh"
+#include "looper.hh"
+#include "observers.hh"
+
+namespace {
+
+// Age, in seconds, at which check_jobs() reports a still-active job as
+// removed; also the delay used for the first expiry timer in added_job().
+static const double JOB_EXPIRE_S = 10 * 60.0; // 10 min
+
+/**
+ * Proxy around another Monitor. Calls made on the proxy are forwarded to the
+ * wrapped monitor, while notifications coming from the wrapped monitor are
+ * filtered before being passed on to the proxy's own observers:
+ *  - jobs naming a machine that has not been announced are dropped,
+ *  - jobs tied to a machine that gets removed are reported as removed,
+ *  - jobs still active after JOB_EXPIRE_S seconds are reported as removed.
+ */
+class BlissfulMonitorImpl : public BlissfulMonitor, private Monitor::Observer {
+public:
+  /**
+   * @param looper  used to schedule and cancel the job expiry timer
+   * @param monitor wrapped monitor; ownership is taken over by the proxy
+   */
+  BlissfulMonitorImpl(std::shared_ptr<Looper> const& looper,
+                      std::unique_ptr<Monitor>&& monitor)
+    : looper_(looper), monitor_(std::move(monitor)), timer_(0) {
+    // Listen to the wrapped monitor so that its events can be filtered.
+    monitor_->add_observer(this);
+  }
+
+  ~BlissfulMonitorImpl() override {
+    // A pending expiry timer would call back into this object, so cancel it
+    // before detaching from the wrapped monitor.
+    if (timer_) {
+      looper_->cancel(timer_);
+    }
+    monitor_->remove_observer(this);
+  }
+
+  // The calls below are forwarded unchanged to the wrapped monitor.
+
+  void connect(std::string const& netname, std::string const& scheduler,
+               uint16_t port) override {
+    monitor_->connect(netname, scheduler, port);
+  }
+
+  void disconnect() override {
+    monitor_->disconnect();
+  }
+
+  size_t machines() const override {
+    return monitor_->machines();
+  }
+
+  uint32_t id(size_t index) const override {
+    return monitor_->id(index);
+  }
+
+  Machine machine_at(size_t index) const override {
+    return monitor_->machine_at(index);
+  }
+
+  Machine machine(uint32_t id) const override {
+    return monitor_->machine(id);
+  }
+
+  // Observers registered here receive the filtered notifications, with the
+  // proxy itself as the reporting monitor.
+
+  void add_observer(Monitor::Observer* observer) override {
+    observers_.add(observer);
+  }
+
+  void remove_observer(Monitor::Observer* observer) override {
+    observers_.remove(observer);
+  }
+
+private:
+  /** A job that has been announced to the observers and not yet removed. */
+  struct Job {
+    uint32_t source;
+    uint32_t target;
+    // clk::steady() reading taken when the job was recorded.
+    double start;
+
+    Job(uint32_t source_id, uint32_t target_id)
+      : source(source_id), target(target_id), start(clk::steady()) {
+    }
+  };
+
+  /** Pass a state change from the wrapped monitor on to all observers. */
+  void state(Monitor* monitor, State state) override {
+    assert(monitor == monitor_.get());
+    for (auto obs = observers_.first(); obs; obs = observers_.next()) {
+      obs->state(this, state);
+    }
+  }
+
+  /** Remember the machine as known, then pass the event on. */
+  void added_machine(Monitor* monitor, uint32_t id) override {
+    assert(monitor == monitor_.get());
+    known_machines_.insert(id);
+    for (auto obs = observers_.first(); obs; obs = observers_.next()) {
+      obs->added_machine(this, id);
+    }
+  }
+
+  /** Pass a machine update on to all observers. */
+  void updated_machine(Monitor* monitor, uint32_t id) override {
+    assert(monitor == monitor_.get());
+    for (auto obs = observers_.first(); obs; obs = observers_.next()) {
+      obs->updated_machine(this, id);
+    }
+  }
+
+  /**
+   * Forget the machine, report every active job that has it as source or
+   * target as removed, and only then pass the machine removal on.
+   */
+  void removed_machine(Monitor* monitor, uint32_t id) override {
+    assert(monitor == monitor_.get());
+    known_machines_.erase(id);
+
+    for (auto job = active_jobs_.begin(); job != active_jobs_.end();) {
+      bool const involved = job->source == id || job->target == id;
+      if (!involved) {
+        ++job;
+        continue;
+      }
+      notify_removed_job(job->source, job->target);
+      // erase() hands back the iterator to the element following the job.
+      job = active_jobs_.erase(job);
+    }
+
+    for (auto obs = observers_.first(); obs; obs = observers_.next()) {
+      obs->removed_machine(this, id);
+    }
+  }
+
+  /**
+   * Record and announce a new job, unless it names a machine that is not
+   * currently known. Starts the expiry timer when none is pending.
+   */
+  void added_job(Monitor* monitor, uint32_t source_id,
+                 uint32_t target_id) override {
+    assert(monitor == monitor_.get());
+
+    // The target only needs its own lookup when it differs from the source.
+    bool const machines_known =
+      known_machines_.count(source_id) != 0
+      && (source_id == target_id || known_machines_.count(target_id) != 0);
+    if (!machines_known) {
+      // Unknown machine listed for job, ignore
+      return;
+    }
+
+    if (!timer_) {
+      schedule_check(JOB_EXPIRE_S);
+    }
+    active_jobs_.emplace_back(source_id, target_id);
+    for (auto obs = observers_.first(); obs; obs = observers_.next()) {
+      obs->added_job(this, source_id, target_id);
+    }
+  }
+
+  /**
+   * Report the first active job matching both ids as removed and drop it.
+   * Removals of jobs that are not tracked are silently ignored.
+   */
+  void removed_job(Monitor* monitor, uint32_t source_id,
+                   uint32_t target_id) override {
+    assert(monitor == monitor_.get());
+
+    auto job = active_jobs_.begin();
+    while (job != active_jobs_.end()
+           && !(job->source == source_id && job->target == target_id)) {
+      ++job;
+    }
+    if (job == active_jobs_.end()) {
+      // Unknown job removed
+      return;
+    }
+
+    notify_removed_job(job->source, job->target);
+    active_jobs_.erase(job);
+  }
+
+  /** Tell every observer that the given job is gone. */
+  void notify_removed_job(uint32_t source_id, uint32_t target_id) {
+    for (auto obs = observers_.first(); obs; obs = observers_.next()) {
+      obs->removed_job(this, source_id, target_id);
+    }
+  }
+
+  /** Schedule check_jobs() to run after delay seconds, storing the timer id. */
+  void schedule_check(double delay) {
+    timer_ = looper_->schedule(delay,
+                               std::bind(&BlissfulMonitorImpl::check_jobs,
+                                         this,
+                                         std::placeholders::_1,
+                                         std::placeholders::_2));
+  }
+
+  /**
+   * Timer callback: report every job started at or before the cutoff as
+   * removed. Jobs are appended in arrival order, so the scan stops at the
+   * first job that has not expired; the timer is then re-armed for that job,
+   * or cleared when no jobs remain.
+   */
+  void check_jobs(Looper* looper, uint32_t id) {
+    assert(looper_.get() == looper);
+    assert(timer_ == id);
+
+    auto const cutoff = clk::steady() - JOB_EXPIRE_S;
+
+    while (!active_jobs_.empty() && active_jobs_.front().start <= cutoff) {
+      notify_removed_job(active_jobs_.front().source,
+                         active_jobs_.front().target);
+      active_jobs_.erase(active_jobs_.begin());
+    }
+
+    if (active_jobs_.empty()) {
+      timer_ = 0;
+      return;
+    }
+    // Time left until the oldest remaining job reaches JOB_EXPIRE_S.
+    schedule_check(active_jobs_.front().start - cutoff);
+  }
+
+  std::shared_ptr<Looper> looper_;
+  std::unique_ptr<Monitor> monitor_;
+  Observers<Monitor::Observer> observers_;
+  // Ids announced through added_machine() and not yet removed.
+  std::unordered_set<uint32_t> known_machines_;
+  // Announced jobs, in the order they arrived.
+  std::vector<Job> active_jobs_;
+
+  // Id of the pending expiry timer, or 0 when none is scheduled.
+  uint32_t timer_;
+};
+
+} // namespace
+
+// static
+// Builds the proxy implementation around |monitor|, taking ownership of it.
+// |looper| is kept by the proxy for scheduling its job expiry timer.
+std::unique_ptr<Monitor> BlissfulMonitor::create(
+ std::shared_ptr<Looper> const& looper, std::unique_ptr<Monitor>&& monitor) {
+ return std::make_unique<BlissfulMonitorImpl>(looper, std::move(monitor));
+}
+
+
diff --git a/src/blissful_monitor.hh b/src/blissful_monitor.hh
new file mode 100644
index 0000000..d4a1d28
--- /dev/null
+++ b/src/blissful_monitor.hh
@@ -0,0 +1,16 @@
+#ifndef BLISSFUL_MONITOR_HH
+#define BLISSFUL_MONITOR_HH
+
+#include "monitor.hh"
+
+/**
+ * A monitor that will try to hide problems, like never finished requests
+ * or jobs and things like that.
+ */
+class BlissfulMonitor : public Monitor {
+public:
+ /**
+  * Wrap an existing monitor in a blissful proxy.
+  *
+  * @param looper  looper the proxy uses for its job expiry timer
+  * @param monitor monitor to wrap; ownership is transferred to the proxy
+  * @return the proxy, exposed through the plain Monitor interface
+  */
+ static std::unique_ptr<Monitor> create(std::shared_ptr<Looper> const& looper,
+ std::unique_ptr<Monitor>&& monitor);
+};
+
+#endif // BLISSFUL_MONITOR_HH
diff --git a/src/main.cc b/src/main.cc
index fdf2e61..62620c8 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -15,6 +15,7 @@
#include "animation.hh"
#include "animator.hh"
#include "args.hh"
+#include "blissful_monitor.hh"
#include "fake_monitor.hh"
#include "io.hh"
#include "monitor.hh"
@@ -222,11 +223,13 @@ public:
}
void connect(Args const* args) {
+ std::unique_ptr<Monitor> monitor;
#if FAKE_MONITOR
- monitor_ = FakeMonitor::create(looper_);
+ monitor = FakeMonitor::create(looper_);
#else
- monitor_ = Monitor::create(looper_);
+ monitor = Monitor::create(looper_);
#endif
+ monitor_ = BlissfulMonitor::create(looper_, std::move(monitor));
monitor_->add_observer(this);
monitor_->connect(args->arg("network", ""),
args->arg("scheduler", ""));