summaryrefslogtreecommitdiff
path: root/src/fake_monitor.cc
diff options
context:
space:
mode:
authorJoel Klinghed <the_jk@yahoo.com>2017-09-26 20:09:31 +0200
committerJoel Klinghed <the_jk@yahoo.com>2017-09-26 20:09:31 +0200
commitc85b624d28564a6f785b25000e2b7825592a919d (patch)
tree647b756c824b470b35f1371eb869e9534ed6c1bb /src/fake_monitor.cc
Initial commit
Diffstat (limited to 'src/fake_monitor.cc')
-rw-r--r--src/fake_monitor.cc191
1 files changed, 191 insertions, 0 deletions
diff --git a/src/fake_monitor.cc b/src/fake_monitor.cc
new file mode 100644
index 0000000..ebdc79b
--- /dev/null
+++ b/src/fake_monitor.cc
@@ -0,0 +1,191 @@
+#include "common.hh"
+
+#include <vector>
+#include <unordered_map>
+
+#include "fake_monitor.hh"
+#include "looper.hh"
+
+namespace {
+
+class FakeMonitorImpl : public Monitor {
+public:
+ explicit FakeMonitorImpl(std::shared_ptr<Looper> const& looper)
+ : looper_(looper), timer_(0), host_id_(0), job_id_(0), max_jobs_(0) {
+ }
+
+ ~FakeMonitorImpl() override {
+ disconnect();
+ }
+
+ void connect(std::string const&, std::string const&,
+ uint16_t) override {
+ disconnect();
+
+ timer_ = looper_->schedule(
+ (rand() % 500) / 1000.0,
+ std::bind(&FakeMonitorImpl::fake_connect, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+ }
+
+ void disconnect() override {
+ if (timer_) {
+ looper_->cancel(timer_);
+ timer_ = 0;
+ }
+ machines_.clear();
+ for (auto* observer : observers_) {
+ observer->state(this, SEARCHING);
+ }
+ }
+
+ size_t machines() const override {
+ return machines_.size();
+ }
+
+ uint32_t id(size_t index) const override {
+ auto it = machines_.begin();
+ while (index--) ++it;
+ if (it == machines_.end()) {
+ assert(false);
+ return static_cast<uint32_t>(-1);
+ }
+ return it->first;
+ }
+
+ Machine machine_at(size_t index) const override {
+ auto it = machines_.begin();
+ while (index--) ++it;
+ if (it == machines_.end()) {
+ assert(false);
+ return Machine();
+ }
+ return it->second.data;
+ }
+
+ Machine machine(uint32_t id) const override {
+ auto it = machines_.find(id);
+ if (it == machines_.end()) {
+ assert(false);
+ return Machine();
+ }
+ return it->second.data;
+ }
+
+ void add_observer(Observer* observer) override {
+ assert(observer);
+ observers_.push_back(observer);
+ }
+
+private:
+ struct Job {
+ uint32_t const host;
+ uint32_t const worker;
+
+ Job(uint32_t host, uint32_t worker)
+ : host(host), worker(worker) {
+ }
+ };
+ struct Entry {
+ Machine data;
+ unsigned active;
+ };
+
+ void fake_connect(Looper*, uint32_t) {
+ timer_ = 0;
+
+ for (auto* observer : observers_) {
+ observer->state(this, CONNECTED);
+ }
+
+ add_machine("alice", 4);
+ add_machine("bob", 10);
+ add_machine("DeuX", 22);
+
+ schedule_jobs();
+ }
+
+ void add_machine(std::string const& name, unsigned max_jobs) {
+ auto id = ++host_id_;
+ auto& entry = machines_[id];
+ entry.data.name = name;
+ entry.data.max_jobs = max_jobs;
+ entry.active = 0;
+ max_jobs_ += max_jobs;
+
+ for (auto* observer : observers_) {
+ observer->added_machine(this, id);
+ }
+ }
+
+ void schedule_jobs() {
+ assert(timer_ == 0);
+ timer_ = looper_->schedule(
+ (rand() % 2000) / 1000.0,
+ std::bind(&FakeMonitorImpl::fiddle_jobs, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+ }
+
+ void fiddle_jobs(Looper*, uint32_t) {
+ timer_ = 0;
+
+ auto remove = !jobs_.empty() ? rand() % jobs_.size() : 0;
+ while (remove--) {
+ auto it = jobs_.begin();
+ auto job = it->second;
+ jobs_.erase(it);
+ machines_[job.worker].active--;
+ for (auto* observer : observers_) {
+ observer->removed_job(this, job.host, job.worker);
+ }
+ }
+
+ auto avail = (max_jobs_ * 7) / 8 - jobs_.size();
+ auto add = avail ? rand() % avail : 0;
+ while (add--) {
+ auto id = ++job_id_;
+ while (jobs_.count(id)) {
+ id = ++job_id_;
+ }
+ auto host = rand() % machines_.size();
+ auto target = rand() % machines_.size();
+ do {
+ auto tgtid = this->id(target);
+ auto& entry = machines_[tgtid];
+ if (entry.active < entry.data.max_jobs) {
+ entry.active++;
+ add_job(id, host != target ? this->id(host) : tgtid, tgtid);
+ break;
+ }
+ target = rand() % machines_.size();
+ } while (true);
+ }
+
+ schedule_jobs();
+ }
+
+ void add_job(uint32_t job_id, uint32_t host, uint32_t worker) {
+ jobs_.insert(std::make_pair(job_id, Job(host, worker)));
+ for (auto* observer : observers_) {
+ observer->added_job(this, host, worker);
+ }
+ }
+
+ std::shared_ptr<Looper> looper_;
+ uint32_t timer_;
+ uint32_t host_id_;
+ uint32_t job_id_;
+ size_t max_jobs_;
+ std::unordered_map<uint32_t, Entry> machines_;
+ std::vector<Observer*> observers_;
+ std::unordered_map<uint32_t, Job> jobs_;
+};
+
+} // namespace
+
+// static
+Monitor* FakeMonitor::create(std::shared_ptr<Looper> const& looper) {
+ return new FakeMonitorImpl(looper);
+}