#include "common.hh"

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

#include "fake_monitor.hh"
#include "looper.hh"
#include "observers.hh"

namespace {

/**
 * Monitor implementation that invents machines and jobs instead of talking
 * to a real scheduler. All activity is driven by timers scheduled on the
 * Looper handed to the constructor, using rand() for delays and choices.
 */
class FakeMonitorImpl : public Monitor {
public:
  explicit FakeMonitorImpl(std::shared_ptr<Looper> const& looper)
    : looper_(looper), timer_(0), host_id_(0), job_id_(0), max_jobs_(0) {
  }

  ~FakeMonitorImpl() override {
    // Cancels any pending timer so the looper never calls into a dead object.
    disconnect();
  }

  /**
   * Drop the current fake session and schedule a new fake connect after a
   * random delay below half a second. All three arguments are ignored.
   */
  void connect(std::string const&, std::string const&, uint16_t) override {
    disconnect();
    timer_ = looper_->schedule(
        (rand() % 500) / 1000.0,
        std::bind(&FakeMonitorImpl::fake_connect, this,
                  std::placeholders::_1, std::placeholders::_2));
  }

  /**
   * Cancel the pending timer (if any), forget all machines and jobs and
   * tell every observer that the state is SEARCHING.
   */
  void disconnect() override {
    if (timer_) {
      looper_->cancel(timer_);
      timer_ = 0;
    }
    machines_.clear();
    // Jobs refer to machines by id and max_jobs_ is the sum of the machines'
    // capacities, so both must be reset together with machines_. Otherwise
    // the next session starts with stale jobs and double-counted capacity.
    jobs_.clear();
    max_jobs_ = 0;
    for (auto observer = observers_.first(); observer;
         observer = observers_.next()) {
      observer->state(this, SEARCHING);
    }
  }

  /** Number of machines currently known. */
  size_t machines() const override {
    return machines_.size();
  }

  /**
   * Data for the machine with the given id. Asserts on an unknown id and,
   * when asserts are compiled out, returns a default-constructed Machine.
   */
  Machine machine(uint32_t id) const override {
    auto const it = machines_.find(id);
    if (it == machines_.end()) {
      assert(false);
      return Machine();
    }
    return it->second.data;
  }

  void add_observer(Observer* observer) override {
    observers_.add(observer);
  }

  void remove_observer(Observer* observer) override {
    observers_.remove(observer);
  }

#if FAKE_MONITOR
  /**
   * With no timer pending: start the job shuffling timer.
   * With a timer pending and at least one machine: cancel the timer and
   * remove one randomly chosen machine together with the jobs it was
   * working on, notifying observers of the removed machine.
   */
  void toggle_fakes() override {
    if (!timer_) {
      schedule_jobs();
      return;
    }
    if (machines_.empty()) {
      // Nothing to remove, and rand() % 0 would be undefined. The timer is
      // left alone so that a pending fake connect still completes.
      return;
    }
    looper_->cancel(timer_);
    timer_ = 0;
#if 0
    remove_jobs((jobs_.size() * 3) / 4);
#else
    auto const id = id_at(rand() % machines_.size());
    auto const victim = machines_.find(id);
    // Keep the capacity total in step with the machine set; fiddle_jobs()
    // relies on it to know how many jobs can still be placed.
    max_jobs_ -= victim->second.data.max_jobs;
    machines_.erase(victim);

    // Drop every job that was running on the removed machine.
    for (auto jit = jobs_.begin(); jit != jobs_.end();) {
      if (jit->second.worker == id) {
        jit = jobs_.erase(jit);
      } else {
        ++jit;
      }
    }

    for (auto observer = observers_.first(); observer;
         observer = observers_.next()) {
      observer->removed_machine(this, id);
    }
#endif
  }
#endif

private:
  /** A fake job: the machine that submitted it and the one running it. */
  struct Job {
    uint32_t const host;
    uint32_t const worker;

    Job(uint32_t host, uint32_t worker)
      : host(host), worker(worker) {
    }
  };

  /** A machine plus the number of jobs currently assigned to it as worker. */
  struct Entry {
    Machine data;
    unsigned active;
  };

  /**
   * Timer callback for connect(): report CONNECTED, publish a fixed set of
   * machines and start shuffling jobs.
   */
  void fake_connect(Looper*, uint32_t) {
    timer_ = 0;
    for (auto observer = observers_.first(); observer;
         observer = observers_.next()) {
      observer->state(this, CONNECTED);
    }

    static struct {
      char const* name;
      unsigned max_jobs;
    } const kMachines[] = {
      { "alice", 4 },
      { "bob", 10 },
      { "DeuX", 22 },
      { "machine1", 4 },
      { "machine2", 4 },
      { "machine3", 4 },
      { "machine4", 4 },
      { "machine5", 4 },
      { "machine6", 4 },
      { "machine7", 4 },
      { "machine8", 4 },
    };
    for (auto const& machine : kMachines) {
      add_machine(machine.name, machine.max_jobs);
    }

    schedule_jobs();
  }

  /**
   * Register a machine under a fresh id, add its capacity to max_jobs_ and
   * notify observers.
   */
  void add_machine(std::string const& name, unsigned max_jobs) {
    auto const id = ++host_id_;
    auto& entry = machines_[id];
    entry.data.name = name;
    entry.data.max_jobs = max_jobs;
    entry.active = 0;
    max_jobs_ += max_jobs;
    for (auto observer = observers_.first(); observer;
         observer = observers_.next()) {
      observer->added_machine(this, id);
    }
  }

  /** Schedule fiddle_jobs() after a random delay below two seconds. */
  void schedule_jobs() {
    assert(timer_ == 0);
    timer_ = looper_->schedule(
        (rand() % 2000) / 1000.0,
        std::bind(&FakeMonitorImpl::fiddle_jobs, this,
                  std::placeholders::_1, std::placeholders::_2));
  }

  /**
   * Remove up to `remove` jobs, taken from the front of jobs_, notifying
   * observers about each one. Stops early if jobs_ runs empty.
   */
  void remove_jobs(size_t remove) {
    for (; remove > 0 && !jobs_.empty(); --remove) {
      auto const it = jobs_.begin();
      Job const job = it->second;
      jobs_.erase(it);

      // Look the worker up instead of using operator[], which would insert
      // a phantom machine (and wrap its counter) for an unknown id.
      auto const worker = machines_.find(job.worker);
      if (worker != machines_.end() && worker->second.active > 0) {
        --worker->second.active;
      }

      for (auto observer = observers_.first(); observer;
           observer = observers_.next()) {
        observer->removed_job(this, job.host, job.worker);
      }
    }
  }

  /** Id of the machine at position `index` in machines_ iteration order. */
  uint32_t id_at(size_t index) const {
    auto it = machines_.begin();
    while (index--) ++it;
    return it->first;
  }

  /**
   * Timer callback: remove a random number of jobs, add a random number of
   * new ones (keeping the total below 7/8 of the combined capacity) and
   * schedule the next round.
   */
  void fiddle_jobs(Looper*, uint32_t) {
    timer_ = 0;
    remove_jobs(!jobs_.empty() ? rand() % jobs_.size() : 0);

    // Unsigned arithmetic: guard the subtraction so that a job count above
    // the cap yields "no room" rather than a wrapped, huge value.
    size_t const limit = (max_jobs_ * 7) / 8;
    size_t const avail = limit > jobs_.size() ? limit - jobs_.size() : 0;
    size_t add = avail ? rand() % avail : 0;

    while (add--) {
      auto id = ++job_id_;
      while (jobs_.count(id)) {
        id = ++job_id_;
      }

      auto const host = rand() % machines_.size();
      auto target = rand() % machines_.size();
      // Retry random workers until one has a free slot. One always exists:
      // the job count stays below `limit`, which never exceeds the summed
      // capacity of the machines in machines_.
      for (;;) {
        auto const tgtid = id_at(target);
        auto& entry = machines_[tgtid];
        if (entry.active < entry.data.max_jobs) {
          entry.active++;
          add_job(id, host != target ? id_at(host) : tgtid, tgtid);
          break;
        }
        target = rand() % machines_.size();
      }
    }

    schedule_jobs();
  }

  /** Record a job and notify observers that it was added. */
  void add_job(uint32_t job_id, uint32_t host, uint32_t worker) {
    jobs_.emplace(job_id, Job(host, worker));
    for (auto observer = observers_.first(); observer;
         observer = observers_.next()) {
      observer->added_job(this, host, worker);
    }
  }

  std::shared_ptr<Looper> looper_;
  uint32_t timer_;     // Pending looper timer, 0 when none.
  uint32_t host_id_;   // Last machine id handed out.
  uint32_t job_id_;    // Last job id handed out.
  size_t max_jobs_;    // Sum of max_jobs over everything in machines_.
  std::unordered_map<uint32_t, Entry> machines_;
  // NOTE(review): element type inferred from add_observer(Observer*);
  // confirm against the declaration in observers.hh.
  Observers<Observer> observers_;
  std::unordered_map<uint32_t, Job> jobs_;
};

}  // namespace

// static
std::unique_ptr<Monitor> FakeMonitor::create(
    std::shared_ptr<Looper> const& looper) {
  return std::make_unique<FakeMonitorImpl>(looper);
}