summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoel Klinghed <the_jk@yahoo.com>2017-09-26 21:58:23 +0200
committerJoel Klinghed <the_jk@yahoo.com>2017-09-26 21:58:23 +0200
commit811e04305457108bc32d8895fd9bd274715d02fc (patch)
tree4bf4e89448ce3be4858404a7ea81c166ab2b8d37
parent37e7c66d65e00b9543003aba1f0f9b13a8dc151b (diff)
Use a modification safe observers list
-rw-r--r--src/fake_monitor.cc31
-rw-r--r--src/monitor.cc38
-rw-r--r--src/monitor.hh1
-rw-r--r--src/observers.hh71
4 files changed, 122 insertions, 19 deletions
diff --git a/src/fake_monitor.cc b/src/fake_monitor.cc
index ebdc79b..b05e521 100644
--- a/src/fake_monitor.cc
+++ b/src/fake_monitor.cc
@@ -1,10 +1,10 @@
#include "common.hh"
-#include <vector>
#include <unordered_map>
#include "fake_monitor.hh"
#include "looper.hh"
+#include "observers.hh"
namespace {
@@ -35,8 +35,10 @@ public:
timer_ = 0;
}
machines_.clear();
- for (auto* observer : observers_) {
+ auto observer = observers_.first();
+ while (observer) {
observer->state(this, SEARCHING);
+ observer = observers_.next();
}
}
@@ -74,8 +76,11 @@ public:
}
void add_observer(Observer* observer) override {
- assert(observer);
- observers_.push_back(observer);
+ observers_.add(observer);
+ }
+
+ void remove_observer(Observer* observer) override {
+ observers_.remove(observer);
}
private:
@@ -95,8 +100,10 @@ private:
void fake_connect(Looper*, uint32_t) {
timer_ = 0;
- for (auto* observer : observers_) {
+ auto observer = observers_.first();
+ while (observer) {
observer->state(this, CONNECTED);
+ observer = observers_.next();
}
add_machine("alice", 4);
@@ -114,8 +121,10 @@ private:
entry.active = 0;
max_jobs_ += max_jobs;
- for (auto* observer : observers_) {
+ auto observer = observers_.first();
+ while (observer) {
observer->added_machine(this, id);
+ observer = observers_.next();
}
}
@@ -137,8 +146,10 @@ private:
auto job = it->second;
jobs_.erase(it);
machines_[job.worker].active--;
- for (auto* observer : observers_) {
+ auto observer = observers_.first();
+ while (observer) {
observer->removed_job(this, job.host, job.worker);
+ observer = observers_.next();
}
}
@@ -168,8 +179,10 @@ private:
void add_job(uint32_t job_id, uint32_t host, uint32_t worker) {
jobs_.insert(std::make_pair(job_id, Job(host, worker)));
- for (auto* observer : observers_) {
+ auto observer = observers_.first();
+ while (observer) {
observer->added_job(this, host, worker);
+ observer = observers_.next();
}
}
@@ -179,7 +192,7 @@ private:
uint32_t job_id_;
size_t max_jobs_;
std::unordered_map<uint32_t, Entry> machines_;
- std::vector<Observer*> observers_;
+ Observers<Observer> observers_;
std::unordered_map<uint32_t, Job> jobs_;
};
diff --git a/src/monitor.cc b/src/monitor.cc
index e5a231d..6194268 100644
--- a/src/monitor.cc
+++ b/src/monitor.cc
@@ -3,10 +3,10 @@
#include <icecc/comm.h>
#include <iostream>
#include <unordered_map>
-#include <vector>
#include "looper.hh"
#include "monitor.hh"
+#include "observers.hh"
namespace {
@@ -75,7 +75,11 @@ public:
}
void add_observer(Observer* observer) override {
- observers_.push_back(observer);
+ observers_.add(observer);
+ }
+
+ void remove_observer(Observer* observer) override {
+ observers_.remove(observer);
}
private:
@@ -190,8 +194,10 @@ private:
return;
}
- for (auto* observer : observers_) {
+ auto observer = observers_.first();
+ while (observer) {
observer->state(this, CONNECTED);
+ observer = observers_.next();
}
}
@@ -201,8 +207,10 @@ private:
looper_->remove(channel_->fd);
channel_.reset();
- for (auto* observer : observers_) {
+ auto observer = observers_.first();
+ while (observer) {
observer->state(this, SEARCHING);
+ observer = observers_.next();
}
}
@@ -286,8 +294,10 @@ private:
auto& job = active_jobs_[job_id];
job.source = source;
job.target = target;
- for (auto* observer : observers_) {
+ auto observer = observers_.first();
+ while (observer) {
observer->added_job(this, job.source, job.target);
+ observer = observers_.next();
}
}
@@ -300,8 +310,10 @@ private:
auto source = it->second.source;
auto target = it->second.target;
active_jobs_.erase(it);
- for (auto* observer : observers_) {
+ auto observer = observers_.first();
+ while (observer) {
observer->removed_job(this, source, target);
+ observer = observers_.next();
}
}
@@ -314,18 +326,24 @@ private:
if (machine.name.empty()) {
machines_.erase(msg->hostid);
if (known) {
- for (auto* observer : observers_) {
+ auto observer = observers_.first();
+ while (observer) {
observer->removed_machine(this, msg->hostid);
+ observer = observers_.next();
}
}
} else {
if (known) {
- for (auto* observer : observers_) {
+ auto observer = observers_.first();
+ while (observer) {
observer->updated_machine(this, msg->hostid);
+ observer = observers_.next();
}
} else {
- for (auto* observer : observers_) {
+ auto observer = observers_.first();
+ while (observer) {
observer->added_machine(this, msg->hostid);
+ observer = observers_.next();
}
}
}
@@ -379,7 +397,7 @@ private:
}
std::shared_ptr<Looper> looper_;
- std::vector<Observer*> observers_;
+ Observers<Observer> observers_;
std::unique_ptr<MsgChannel> channel_;
std::unique_ptr<DiscoverSched> discover_;
uint32_t discover_timer_;
diff --git a/src/monitor.hh b/src/monitor.hh
index 99e172b..b7ddbea 100644
--- a/src/monitor.hh
+++ b/src/monitor.hh
@@ -52,6 +52,7 @@ public:
virtual Machine machine(uint32_t id) const = 0;
virtual void add_observer(Observer* observer) = 0;
+ virtual void remove_observer(Observer* observer) = 0;
protected:
Monitor() {}
diff --git a/src/observers.hh b/src/observers.hh
new file mode 100644
index 0000000..3e56e6b
--- /dev/null
+++ b/src/observers.hh
@@ -0,0 +1,71 @@
+#ifndef OBSERVERS_HH
+#define OBSERVERS_HH
+
+#include <memory>
+#include <vector>
+
+template<typename T>
+class Observers {
+public:
+ Observers()
+ : current_(0), protected_(0), removed_(0) {
+ }
+
+ void add(T* observer) {
+ if (!observer) return;
+ for (auto it = observers_.begin(); it != observers_.end(); ++it) {
+ if (*it == observer) return;
+ }
+ observers_.push_back(observer);
+ }
+
+ void remove(T* observer) {
+ if (!observer) return;
+ size_t index = 0;
+ for (; index < observers_.size(); ++index) {
+ if (observers_[index] == observer) {
+ if (index < protected_) {
+ observers_[index] = nullptr;
+ ++removed_;
+ } else {
+ observers_.erase(observers_.begin() + index);
+ }
+ break;
+ }
+ }
+ }
+
+ T* first() {
+ if (protected_) return nullptr;
+ if (observers_.empty()) return nullptr;
+ protected_ = observers_.size();
+ return observers_[current_];
+ }
+
+ T* next() {
+ if (current_ >= protected_) return nullptr;
+ ++current_;
+ if (current_ >= protected_) {
+ if (removed_) {
+ for (size_t i = protected_; i > 0; --i) {
+ if (!observers_[i - 1]) {
+ observers_.erase(observers_.begin() + i - 1);
+ if (--removed_ == 0) break;
+ }
+ }
+ }
+ protected_ = current_ = 0;
+ return nullptr;
+ }
+ auto ret = observers_[current_];
+ return ret ? ret : next();
+ }
+
+private:
+ size_t current_;
+ size_t protected_;
+ size_t removed_;
+ std::vector<T*> observers_;
+};
+
+#endif // OBSERVERS_HH