#include "common.hh"

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <iterator>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

#include "fake_monitor.hh"
#include "looper.hh"
#include "observers.hh"

namespace {

// Monitor implementation that talks to no real scheduler. After a short
// random delay it reports itself as connected, announces a fixed set of
// machines and then, on a repeating randomised timer, removes and adds jobs
// so that observers have something to display.
class FakeMonitorImpl : public Monitor {
public:
  explicit FakeMonitorImpl(std::shared_ptr<Looper> const& looper)
    : looper_(looper), timer_(0), host_id_(0), job_id_(0), max_jobs_(0) {
  }

  // Cancels any pending timer; observers are told SEARCHING one last time.
  ~FakeMonitorImpl() override {
    disconnect();
  }

  // Drops the current fake session and schedules fake_connect() after a
  // random delay of (rand() % 500) / 1000.0 (presumably seconds - the unit
  // is defined by Looper::schedule). All three arguments are ignored.
  void connect(std::string const&, std::string const&, uint16_t) override {
    disconnect();
    timer_ = looper_->schedule(
        (rand() % 500) / 1000.0,
        std::bind(&FakeMonitorImpl::fake_connect, this,
                  std::placeholders::_1, std::placeholders::_2));
  }

  // Cancels the pending timer (if any), forgets all machines and jobs and
  // reports SEARCHING to every observer.
  void disconnect() override {
    if (timer_) {
      looper_->cancel(timer_);
      timer_ = 0;
    }
    machines_.clear();
    // Jobs and the capacity total describe the machines that were just
    // dropped. Leaving them behind would let a later connect() double-count
    // max_jobs_, so fiddle_jobs() could look for a free slot that does not
    // exist and never return.
    jobs_.clear();
    max_jobs_ = 0;
    notify([this](auto observer) { observer->state(this, SEARCHING); });
  }

  // Number of machines currently known.
  size_t machines() const override {
    return machines_.size();
  }

  // Id of the machine at position `index` in the map's iteration order.
  // An out-of-range index asserts and returns static_cast<uint32_t>(-1).
  uint32_t id(size_t index) const override {
    // Check before advancing: stepping an iterator past end() is undefined.
    if (index >= machines_.size()) {
      assert(false);
      return static_cast<uint32_t>(-1);
    }
    return std::next(machines_.begin(),
                     static_cast<std::ptrdiff_t>(index))->first;
  }

  // Machine at position `index` in the map's iteration order.
  // An out-of-range index asserts and returns a default Machine.
  Machine machine_at(size_t index) const override {
    if (index >= machines_.size()) {
      assert(false);
      return Machine();
    }
    return std::next(machines_.begin(),
                     static_cast<std::ptrdiff_t>(index))->second.data;
  }

  // Machine with the given id. An unknown id asserts and returns a default
  // Machine.
  Machine machine(uint32_t id) const override {
    auto it = machines_.find(id);
    if (it == machines_.end()) {
      assert(false);
      return Machine();
    }
    return it->second.data;
  }

  void add_observer(Observer* observer) override {
    observers_.add(observer);
  }

  void remove_observer(Observer* observer) override {
    observers_.remove(observer);
  }

private:
  // A running fake job: the machine id that submitted it and the machine id
  // that executes it.
  struct Job {
    uint32_t const host;
    uint32_t const worker;

    Job(uint32_t host, uint32_t worker)
      : host(host), worker(worker) {
    }
  };

  // A machine together with the number of jobs currently placed on it.
  struct Entry {
    Machine data;
    unsigned active;
  };

  // Calls fn(observer) for every registered observer, using the
  // first()/next() cursor that Observers exposes.
  template <typename Fn>
  void notify(Fn&& fn) {
    auto observer = observers_.first();
    while (observer) {
      fn(observer);
      observer = observers_.next();
    }
  }

  // Timer callback for connect(): reports CONNECTED, announces the fixed
  // machine list and starts the job shuffling timer.
  void fake_connect(Looper*, uint32_t) {
    timer_ = 0;
    notify([this](auto observer) { observer->state(this, CONNECTED); });
    add_machine("alice", 4);
    add_machine("bob", 10);
    add_machine("DeuX", 22);
    add_machine("machine1", 4);
    add_machine("machine2", 4);
    add_machine("machine3", 4);
    add_machine("machine4", 4);
    add_machine("machine5", 4);
    add_machine("machine6", 4);
    add_machine("machine7", 4);
    add_machine("machine8", 4);
    schedule_jobs();
  }

  // Registers a machine under a fresh id, adds its capacity to max_jobs_
  // and tells the observers.
  void add_machine(std::string const& name, unsigned max_jobs) {
    auto const id = ++host_id_;
    auto& entry = machines_[id];
    entry.data.name = name;
    entry.data.max_jobs = max_jobs;
    entry.active = 0;
    max_jobs_ += max_jobs;
    notify([this, id](auto observer) { observer->added_machine(this, id); });
  }

  // Arms the timer for the next fiddle_jobs() run after a random delay of
  // (rand() % 2000) / 1000.0.
  void schedule_jobs() {
    assert(timer_ == 0);
    timer_ = looper_->schedule(
        (rand() % 2000) / 1000.0,
        std::bind(&FakeMonitorImpl::fiddle_jobs, this,
                  std::placeholders::_1, std::placeholders::_2));
  }

  // Timer callback: removes a random number of existing jobs, adds a random
  // number of new ones (keeping the total below 7/8 of the overall
  // capacity) and re-arms the timer.
  void fiddle_jobs(Looper*, uint32_t) {
    timer_ = 0;

    size_t remove = jobs_.empty() ? 0 : rand() % jobs_.size();
    while (remove--) {
      auto it = jobs_.begin();
      Job const job = it->second;
      jobs_.erase(it);
      // Look the worker up instead of using operator[], which would insert
      // a phantom machine for an unknown id; never decrement below zero.
      auto worker = machines_.find(job.worker);
      if (worker != machines_.end() && worker->second.active > 0) {
        worker->second.active--;
      }
      notify([this, &job](auto observer) {
        observer->removed_job(this, job.host, job.worker);
      });
    }

    // Saturating subtraction so an unexpected surplus of jobs cannot wrap
    // around to a huge unsigned value.
    size_t const limit = (max_jobs_ * 7) / 8;
    size_t const avail = limit > jobs_.size() ? limit - jobs_.size() : 0;
    size_t add = avail ? rand() % avail : 0;
    while (add--) {
      auto job_id = ++job_id_;
      while (jobs_.count(job_id)) {
        job_id = ++job_id_;
      }
      size_t const host = rand() % machines_.size();
      size_t target = rand() % machines_.size();
      // Retry until a machine with a free slot is hit. This terminates
      // because the jobs placed so far number fewer than `limit`, which is
      // at most the summed capacity of all machines.
      while (true) {
        auto const target_id = id(target);
        auto& entry = machines_[target_id];
        if (entry.active < entry.data.max_jobs) {
          entry.active++;
          add_job(job_id, host != target ? id(host) : target_id, target_id);
          break;
        }
        target = rand() % machines_.size();
      }
    }

    schedule_jobs();
  }

  // Records the job and tells the observers about it.
  void add_job(uint32_t job_id, uint32_t host, uint32_t worker) {
    jobs_.insert(std::make_pair(job_id, Job(host, worker)));
    notify([this, host, worker](auto observer) {
      observer->added_job(this, host, worker);
    });
  }

  // NOTE(review): the template arguments in this file and the original
  // angle-bracket #include were lost before review and have been
  // reconstructed from usage; in particular confirm the Observers argument
  // against observers.hh.
  std::shared_ptr<Looper> looper_;
  uint32_t timer_;     // pending Looper timer handle, 0 when none
  uint32_t host_id_;   // last machine id handed out
  uint32_t job_id_;    // last job id handed out
  size_t max_jobs_;    // summed max_jobs of all current machines
  std::unordered_map<uint32_t, Entry> machines_;
  Observers<Observer*> observers_;
  std::unordered_map<uint32_t, Job> jobs_;
};

}  // namespace

// static
std::unique_ptr<Monitor> FakeMonitor::create(
    std::shared_ptr<Looper> const& looper) {
  return std::make_unique<FakeMonitorImpl>(looper);
}