diff options
Diffstat (limited to 'src/fake_monitor.cc')
| -rw-r--r-- | src/fake_monitor.cc | 191 |
1 files changed, 191 insertions, 0 deletions
diff --git a/src/fake_monitor.cc b/src/fake_monitor.cc new file mode 100644 index 0000000..ebdc79b --- /dev/null +++ b/src/fake_monitor.cc @@ -0,0 +1,191 @@ +#include "common.hh" + +#include <vector> +#include <unordered_map> + +#include "fake_monitor.hh" +#include "looper.hh" + +namespace { + +class FakeMonitorImpl : public Monitor { +public: + explicit FakeMonitorImpl(std::shared_ptr<Looper> const& looper) + : looper_(looper), timer_(0), host_id_(0), job_id_(0), max_jobs_(0) { + } + + ~FakeMonitorImpl() override { + disconnect(); + } + + void connect(std::string const&, std::string const&, + uint16_t) override { + disconnect(); + + timer_ = looper_->schedule( + (rand() % 500) / 1000.0, + std::bind(&FakeMonitorImpl::fake_connect, this, + std::placeholders::_1, + std::placeholders::_2)); + } + + void disconnect() override { + if (timer_) { + looper_->cancel(timer_); + timer_ = 0; + } + machines_.clear(); + for (auto* observer : observers_) { + observer->state(this, SEARCHING); + } + } + + size_t machines() const override { + return machines_.size(); + } + + uint32_t id(size_t index) const override { + auto it = machines_.begin(); + while (index--) ++it; + if (it == machines_.end()) { + assert(false); + return static_cast<uint32_t>(-1); + } + return it->first; + } + + Machine machine_at(size_t index) const override { + auto it = machines_.begin(); + while (index--) ++it; + if (it == machines_.end()) { + assert(false); + return Machine(); + } + return it->second.data; + } + + Machine machine(uint32_t id) const override { + auto it = machines_.find(id); + if (it == machines_.end()) { + assert(false); + return Machine(); + } + return it->second.data; + } + + void add_observer(Observer* observer) override { + assert(observer); + observers_.push_back(observer); + } + +private: + struct Job { + uint32_t const host; + uint32_t const worker; + + Job(uint32_t host, uint32_t worker) + : host(host), worker(worker) { + } + }; + struct Entry { + Machine data; + unsigned active; + }; + + void fake_connect(Looper*, uint32_t) { + timer_ = 0; + + for (auto* observer : observers_) { + observer->state(this, CONNECTED); + } + + add_machine("alice", 4); + add_machine("bob", 10); + add_machine("DeuX", 22); + + schedule_jobs(); + } + + void add_machine(std::string const& name, unsigned max_jobs) { + auto id = ++host_id_; + auto& entry = machines_[id]; + entry.data.name = name; + entry.data.max_jobs = max_jobs; + entry.active = 0; + max_jobs_ += max_jobs; + + for (auto* observer : observers_) { + observer->added_machine(this, id); + } + } + + void schedule_jobs() { + assert(timer_ == 0); + timer_ = looper_->schedule( + (rand() % 2000) / 1000.0, + std::bind(&FakeMonitorImpl::fiddle_jobs, this, + std::placeholders::_1, + std::placeholders::_2)); + } + + void fiddle_jobs(Looper*, uint32_t) { + timer_ = 0; + + auto remove = !jobs_.empty() ? rand() % jobs_.size() : 0; + while (remove--) { + auto it = jobs_.begin(); + auto job = it->second; + jobs_.erase(it); + machines_[job.worker].active--; + for (auto* observer : observers_) { + observer->removed_job(this, job.host, job.worker); + } + } + + auto avail = (max_jobs_ * 7) / 8 - jobs_.size(); + auto add = avail ? rand() % avail : 0; + while (add--) { + auto id = ++job_id_; + while (jobs_.count(id)) { + id = ++job_id_; + } + auto host = rand() % machines_.size(); + auto target = rand() % machines_.size(); + do { + auto tgtid = this->id(target); + auto& entry = machines_[tgtid]; + if (entry.active < entry.data.max_jobs) { + entry.active++; + add_job(id, host != target ? this->id(host) : tgtid, tgtid); + break; + } + target = rand() % machines_.size(); + } while (true); + } + + schedule_jobs(); + } + + void add_job(uint32_t job_id, uint32_t host, uint32_t worker) { + jobs_.insert(std::make_pair(job_id, Job(host, worker))); + for (auto* observer : observers_) { + observer->added_job(this, host, worker); + } + } + + std::shared_ptr<Looper> looper_; + uint32_t timer_; + uint32_t host_id_; + uint32_t job_id_; + size_t max_jobs_; + std::unordered_map<uint32_t, Entry> machines_; + std::vector<Observer*> observers_; + std::unordered_map<uint32_t, Job> jobs_; +}; + +} // namespace + +// static +Monitor* FakeMonitor::create(std::shared_ptr<Looper> const& looper) { + return new FakeMonitorImpl(looper); +} |
