diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/fake_monitor.cc | 31 | ||||
| -rw-r--r-- | src/monitor.cc | 38 | ||||
| -rw-r--r-- | src/monitor.hh | 1 | ||||
| -rw-r--r-- | src/observers.hh | 71 |
4 files changed, 122 insertions, 19 deletions
diff --git a/src/fake_monitor.cc b/src/fake_monitor.cc index ebdc79b..b05e521 100644 --- a/src/fake_monitor.cc +++ b/src/fake_monitor.cc @@ -1,10 +1,10 @@ #include "common.hh" -#include <vector> #include <unordered_map> #include "fake_monitor.hh" #include "looper.hh" +#include "observers.hh" namespace { @@ -35,8 +35,10 @@ public: timer_ = 0; } machines_.clear(); - for (auto* observer : observers_) { + auto observer = observers_.first(); + while (observer) { observer->state(this, SEARCHING); + observer = observers_.next(); } } @@ -74,8 +76,11 @@ public: } void add_observer(Observer* observer) override { - assert(observer); - observers_.push_back(observer); + observers_.add(observer); + } + + void remove_observer(Observer* observer) override { + observers_.remove(observer); } private: @@ -95,8 +100,10 @@ private: void fake_connect(Looper*, uint32_t) { timer_ = 0; - for (auto* observer : observers_) { + auto observer = observers_.first(); + while (observer) { observer->state(this, CONNECTED); + observer = observers_.next(); } add_machine("alice", 4); @@ -114,8 +121,10 @@ private: entry.active = 0; max_jobs_ += max_jobs; - for (auto* observer : observers_) { + auto observer = observers_.first(); + while (observer) { observer->added_machine(this, id); + observer = observers_.next(); } } @@ -137,8 +146,10 @@ private: auto job = it->second; jobs_.erase(it); machines_[job.worker].active--; - for (auto* observer : observers_) { + auto observer = observers_.first(); + while (observer) { observer->removed_job(this, job.host, job.worker); + observer = observers_.next(); } } @@ -168,8 +179,10 @@ private: void add_job(uint32_t job_id, uint32_t host, uint32_t worker) { jobs_.insert(std::make_pair(job_id, Job(host, worker))); - for (auto* observer : observers_) { + auto observer = observers_.first(); + while (observer) { observer->added_job(this, host, worker); + observer = observers_.next(); } } @@ -179,7 +192,7 @@ private: uint32_t job_id_; size_t max_jobs_; std::unordered_map<uint32_t, Entry> machines_; - std::vector<Observer*> observers_; + Observers<Observer> observers_; std::unordered_map<uint32_t, Job> jobs_; }; diff --git a/src/monitor.cc b/src/monitor.cc index e5a231d..6194268 100644 --- a/src/monitor.cc +++ b/src/monitor.cc @@ -3,10 +3,10 @@ #include <icecc/comm.h> #include <iostream> #include <unordered_map> -#include <vector> #include "looper.hh" #include "monitor.hh" +#include "observers.hh" namespace { @@ -75,7 +75,11 @@ public: } void add_observer(Observer* observer) override { - observers_.push_back(observer); + observers_.add(observer); + } + + void remove_observer(Observer* observer) override { + observers_.remove(observer); } private: @@ -190,8 +194,10 @@ private: return; } - for (auto* observer : observers_) { + auto observer = observers_.first(); + while (observer) { observer->state(this, CONNECTED); + observer = observers_.next(); } } @@ -201,8 +207,10 @@ private: looper_->remove(channel_->fd); channel_.reset(); - for (auto* observer : observers_) { + auto observer = observers_.first(); + while (observer) { observer->state(this, SEARCHING); + observer = observers_.next(); } } @@ -286,8 +294,10 @@ private: auto& job = active_jobs_[job_id]; job.source = source; job.target = target; - for (auto* observer : observers_) { + auto observer = observers_.first(); + while (observer) { observer->added_job(this, job.source, job.target); + observer = observers_.next(); } } @@ -300,8 +310,10 @@ private: auto source = it->second.source; auto target = it->second.target; active_jobs_.erase(it); - for (auto* observer : observers_) { + auto observer = observers_.first(); + while (observer) { observer->removed_job(this, source, target); + observer = observers_.next(); } } @@ -314,18 +326,24 @@ private: if (machine.name.empty()) { machines_.erase(msg->hostid); if (known) { - for (auto* observer : observers_) { + auto observer = observers_.first(); + while (observer) { observer->removed_machine(this, msg->hostid); + observer = observers_.next(); } } } else { if (known) { - for (auto* observer : observers_) { + auto observer = observers_.first(); + while (observer) { observer->updated_machine(this, msg->hostid); + observer = observers_.next(); } } else { - for (auto* observer : observers_) { + auto observer = observers_.first(); + while (observer) { observer->added_machine(this, msg->hostid); + observer = observers_.next(); } } } @@ -379,7 +397,7 @@ private: } std::shared_ptr<Looper> looper_; - std::vector<Observer*> observers_; + Observers<Observer> observers_; std::unique_ptr<MsgChannel> channel_; std::unique_ptr<DiscoverSched> discover_; uint32_t discover_timer_; diff --git a/src/monitor.hh b/src/monitor.hh index 99e172b..b7ddbea 100644 --- a/src/monitor.hh +++ b/src/monitor.hh @@ -52,6 +52,7 @@ public: virtual Machine machine(uint32_t id) const = 0; virtual void add_observer(Observer* observer) = 0; + virtual void remove_observer(Observer* observer) = 0; protected: Monitor() {} diff --git a/src/observers.hh b/src/observers.hh new file mode 100644 index 0000000..3e56e6b --- /dev/null +++ b/src/observers.hh @@ -0,0 +1,71 @@ +#ifndef OBSERVERS_HH +#define OBSERVERS_HH + +#include <memory> +#include <vector> + +template<typename T> +class Observers { +public: + Observers() + : current_(0), protected_(0), removed_(0) { + } + + void add(T* observer) { + if (!observer) return; + for (auto it = observers_.begin(); it != observers_.end(); ++it) { + if (*it == observer) return; + } + observers_.push_back(observer); + } + + void remove(T* observer) { + if (!observer) return; + size_t index = 0; + for (; index < observers_.size(); ++index) { + if (observers_[index] == observer) { + if (index < protected_) { + observers_[index] = nullptr; + ++removed_; + } else { + observers_.erase(observers_.begin() + index); + } + break; + } + } + } + + T* first() { + if (protected_) return nullptr; + if (observers_.empty()) return nullptr; + protected_ = observers_.size(); + return observers_[current_]; + } + + T* next() { + if (current_ >= protected_) return nullptr; + ++current_; + if (current_ >= protected_) { + if (removed_) { + for (size_t i = protected_; i > 0; --i) { + if (!observers_[i - 1]) { + observers_.erase(observers_.begin() + i - 1); + if (--removed_ == 0) break; + } + } + } + protected_ = current_ = 0; + return nullptr; + } + auto ret = observers_[current_]; + return ret ? ret : next(); + } + +private: + size_t current_; + size_t protected_; + size_t removed_; + std::vector<T*> observers_; +}; + +#endif // OBSERVERS_HH |
