Diffstat (limited to 'src/protocols.cc')
-rw-r--r--  src/protocols.cc  477
1 file changed, 477 insertions, 0 deletions
diff --git a/src/protocols.cc b/src/protocols.cc
new file mode 100644
index 0000000..a1eaa58
--- /dev/null
+++ b/src/protocols.cc
@@ -0,0 +1,477 @@
+// -*- mode: c++; c-basic-offset: 2; -*-
+
+#include "common.hh"
+
+#include <algorithm>
+#include <cassert>
+#include <chrono>
+#include <condition_variable>
+#include <deque>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <stdint.h>
+#include <string.h>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "gui_attrtext.hh"
+#include "gui_plainattrtext.hh"
+#include "http_protocol.hh"
+#include "io.hh"
+#include "looper.hh"
+#include "protocol.hh"
+#include "protocols.hh"
+
+namespace {
+
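+ // Default Protocols implementation. Buffers registered via add()/update()
+ // are matched against the known protocols and rendered into AttributedText
+ // by a small pool of worker threads. Finished renders live in a bounded
+ // cache (least recently used entry evicted) and are handed to the Listener
+ // on the looper thread through a self-poke pipe.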
+class ProtocolsImpl : public Protocols {
+public:
+ ProtocolsImpl(size_t workers, size_t buffer, size_t cache, Looper* looper,
+ Listener* listener)
+ : listener_(listener), workers_size_(
+ std::max(static_cast<size_t>(1), workers)), buffer_size_(buffer),
+ cache_size_(cache), looper_(looper),
+ wanted_(std::string::npos), quit_(false) {
+ protocols_.emplace_back(HttpProtocol::create());
+ wanted_poke_.open();
+ if (wanted_poke_) {
+ looper_->add(wanted_poke_.read(), Looper::EVENT_READ,
+ std::bind(&ProtocolsImpl::notify, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+
+ }
+ }
+
+ ~ProtocolsImpl() override {
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+ quit_ = true;
+ cond_.notify_all();
+ }
+ for (auto& worker : workers_) {
+ worker.join();
+ }
+ if (wanted_poke_) {
+ looper_->remove(wanted_poke_.read());
+ }
+ }
+
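+ // Registers the externally owned buffer |data| of |size| bytes under |id|
+ // and lets the scheduler pre-render it when there is spare cache capacity.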
+ void add(size_t id, void const* data, size_t size) override {
+ std::unique_lock<std::mutex> lock(mutex_);
+ auto& entry = entries_[id];
+ if (entry.data_) {
+ assert(false);
+ return;
+ }
+ entry.data_ = data;
+ entry.size_ = size;
+
+ reschedule_with_lock();
+ }
+
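+ // Records a new data pointer and size for |id|. If the entry is cached and
+ // the old size was still below the worker buffer, it is queued again so the
+ // appended bytes get rendered.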
+ void update(size_t id, void const* data, size_t size) override {
+ std::unique_lock<std::mutex> lock(mutex_);
+ auto& entry = entries_[id];
+ if (!entry.data_) {
+ assert(false);
+ entries_.erase(id);
+ return;
+ }
+ entry.data_ = data;
+ if (entry.size_ == size) return;
+ auto old = entry.size_;
+ entry.size_ = size;
+ if (old >= buffer_size_) return;
+
+ for (auto it = cache_.begin(); it != cache_.end(); ++it) {
+ if (it->id_ == id) {
+ queue_.emplace_back(id, it->size_,
+ std::move(it->match_), std::move(it->text_));
+ cache_.erase(it);
+ if (active_.size() < workers_size_) cond_.notify_one();
+ return;
+ }
+ }
+ }
+
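+ // Forgets |id|: clears any cached render, the wanted slot, and any pending
+ // or active queue position.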
+ void remove(size_t id) override {
+ std::unique_lock<std::mutex> lock(mutex_);
+ auto entry = entries_.find(id);
+ if (entry == entries_.end()) {
+ assert(false);
+ return;
+ }
+ // Erase the entry itself, otherwise reschedule_with_lock() could later
+ // queue the id again with a stale data pointer.
+ entries_.erase(entry);
+
+ for (auto it = cache_.begin(); it != cache_.end(); ++it) {
+ if (it->id_ == id) {
+ cache_.erase(it);
+ reschedule_with_lock();
+ return;
+ }
+ }
+
+ if (wanted_ == id) wanted_ = std::string::npos;
+
+ for (auto it = active_.begin(); it != active_.end(); ++it) {
+ if (*it == id) {
+ active_.erase(it);
+ reschedule_with_lock();
+ return;
+ }
+ }
+
+ for (auto it = queue_.begin(); it != queue_.end(); ++it) {
+ if (it->id_ == id) {
+ queue_.erase(it);
+ reschedule_with_lock();
+ return;
+ }
+ }
+ }
+
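+ // Drops every cached render and all queued or active render work;
+ // registered entries and pending content requests are kept.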
+ void clear() override {
+ std::unique_lock<std::mutex> lock(mutex_);
+ cache_.clear();
+ wanted_ = std::string::npos;
+ active_.clear();
+ queue_.clear();
+ }
+
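+ // Requests the rendered text for |id|. A cached render is handed to the
+ // listener right away; otherwise the id is marked as wanted and prioritized
+ // in the work queue.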
+ void text(size_t id) override {
+ auto it = entries_.find(id);
+ if (it == entries_.end()) {
+ assert(false);
+ return;
+ }
+
+ std::unique_lock<std::mutex> lock(mutex_);
+ for (auto& entry : cache_) {
+ if (entry.id_ == id) {
+ entry.last_ = std::chrono::steady_clock::now();
+ if (!entry.text_) {
+ // Already an active entry
+ assert(false);
+ return;
+ }
+ std::unique_ptr<AttributedText> text(std::move(entry.text_));
+ std::string name(entry.match_ ? entry.match_->name() : "");
+ lock.unlock();
+ listener_->text(this, id, name, std::move(text));
+ return;
+ }
+ }
+
+ wanted_ = id;
+
+ for (auto& active : active_) {
+ if (active == id) {
+ // Already working on it
+ return;
+ }
+ }
+
+ for (auto it = queue_.begin(); it != queue_.end(); ++it) {
+ if (it->id_ == id) {
+ // Already queued; move it to the front unless it is already there
+ if (it == queue_.begin()) {
+ return;
+ }
+ auto offset = it->offset_;
+ std::unique_ptr<Protocol::Match> match(std::move(it->match_));
+ std::unique_ptr<AttributedText> text(std::move(it->text_));
+ queue_.erase(it);
+ queue_.emplace_front(id, offset, std::move(match), std::move(text));
+ return;
+ }
+ }
+
+ queue_.emplace_front(id);
+ if (active_.size() < workers_size_) cond_.notify_one();
+ }
+
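+ // Returns a text previously delivered to the listener back to its cache
+ // slot so it can be served again.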
+ void free(size_t id, std::unique_ptr<AttributedText>&& text) override {
+ std::unique_lock<std::mutex> lock(mutex_);
+ for (auto& entry : cache_) {
+ if (entry.id_ == id) {
+ if (entry.text_) {
+ // Entry updated while shown
+ return;
+ }
+ entry.text_.swap(text);
+ break;
+ }
+ }
+ }
+
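+ // Queues a request to write the decoded content of |id| to |out|; the
+ // listener is notified from the looper thread once a worker has finished.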
+ void content(size_t id, std::ostream* out) override {
+ auto it = entries_.find(id);
+ if (it == entries_.end()) {
+ assert(false);
+ return;
+ }
+
+ std::unique_lock<std::mutex> lock(mutex_);
+ content_queue_.emplace_back(id, out);
+ if (active_.size() < workers_size_) cond_.notify_one();
+ }
+
+private:
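+ // A registered buffer; the memory itself is not owned here.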
+ struct Entry {
+ void const* data_;
+ size_t size_;
+
+ Entry()
+ : data_(nullptr), size_(0) {
+ }
+ };
+
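+ // A finished render. text_ is null while the text is checked out to the
+ // listener; last_ picks the eviction victim (least recently used).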
+ struct CacheEntry {
+ std::chrono::steady_clock::time_point last_;
+ size_t id_;
+ size_t size_;
+ std::unique_ptr<Protocol::Match> match_;
+ std::unique_ptr<AttributedText> text_;
+
+ CacheEntry(size_t id, size_t size, std::unique_ptr<Protocol::Match>&& match,
+ std::unique_ptr<AttributedText>&& text)
+ : last_(std::chrono::steady_clock::now()), id_(id), size_(size),
+ match_(std::move(match)), text_(std::move(text)) {
+ }
+ };
+
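+ // Pending render work. offset_ is non-zero when an existing render only
+ // needs the newly appended bytes.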
+ struct QueueEntry {
+ size_t id_;
+ size_t offset_;
+ std::unique_ptr<Protocol::Match> match_;
+ std::unique_ptr<AttributedText> text_;
+
+ QueueEntry(size_t id)
+ : id_(id), offset_(0) {
+ }
+
+ QueueEntry(size_t id, size_t offset,
+ std::unique_ptr<Protocol::Match>&& match,
+ std::unique_ptr<AttributedText>&& text)
+ : id_(id), offset_(offset), match_(std::move(match)),
+ text_(std::move(text)) {
+ }
+ };
+
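+ // A content request, and also its completion record once a worker is done.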
+ struct ContentEntry {
+ size_t id_;
+ std::string name_;
+ std::ostream* out_;
+
+ ContentEntry(size_t id, std::ostream* out)
+ : id_(id), out_(out) {
+ }
+
+ ContentEntry(size_t id, std::string const& name, std::ostream* out)
+ : id_(id), name_(name), out_(out) {
+ }
+ };
+
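+ // Requires mutex_. Tops the queue up with uncached entries while the cache
+ // has room, lazily spawns the worker threads and wakes them.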
+ void reschedule_with_lock() {
+ if (cache_.size() < cache_size_) {
+ // How many more entries the cache could still hold.
+ size_t want = cache_size_ - cache_.size();
+ size_t queued = active_.size() + queue_.size();
+ if (want < queued) {
+ want = 0;
+ } else {
+ want -= queued;
+ }
+ if (want) {
+ std::unordered_set<size_t> taken;
+ for (auto const& cache : cache_) taken.insert(cache.id_);
+ for (auto const& active : active_) taken.insert(active);
+ for (auto const& queue : queue_) taken.insert(queue.id_);
+ for (auto const& pair : entries_) {
+ if (pair.second.size_ == 0) continue; // No point caching 0 bytes
+ if (taken.count(pair.first)) continue; // Already in cache or queues
+ queue_.emplace_back(pair.first);
+ if (!--want) break;
+ }
+ }
+ }
+
+ if (queue_.empty()) return;
+ if (active_.size() == workers_size_) return;
+ while (workers_.size() < workers_size_) {
+ workers_.emplace_back(&ProtocolsImpl::worker, this, buffer_size_);
+ }
+ cond_.notify_all();
+ }
+
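+ // Worker thread loop. Under the lock it publishes the previous result,
+ // picks the next content or render request and snapshots up to
+ // |buffer_size| bytes of the entry (content requests use the raw pointer
+ // instead); matching and rendering then run without the lock held.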
+ void worker(size_t buffer_size) {
+ std::unique_ptr<AttributedText> text;
+ std::unique_ptr<Protocol::Match> match;
+ std::ostream* out = nullptr;
+ size_t id = 0;
+ size_t offset = 0;
+ size_t size;
+ std::unique_ptr<char[]> buf(new char[buffer_size]);
+ char const* ptr;
+ size_t fill;
+ while (true) {
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (out) {
+ content_done_with_lock(id, match ? match->name() : "", out);
+ out = nullptr;
+ id = 0;
+ match.reset();
+ } else if (text) {
+ cache_entry_with_lock(id, size, std::move(match), std::move(text));
+ assert(!match);
+ assert(!text);
+ id = 0;
+ offset = 0;
+ }
+ while (true) {
+ if (quit_) return;
+ if (!content_queue_.empty()) {
+ id = content_queue_.front().id_;
+ out = content_queue_.front().out_;
+ content_queue_.pop_front();
+ break;
+ }
+ if (!queue_.empty()) {
+ id = queue_.front().id_;
+ offset = queue_.front().offset_;
+ match.swap(queue_.front().match_);
+ text.swap(queue_.front().text_);
+ queue_.pop_front();
+ active_.emplace_back(id);
+ break;
+ }
+ cond_.wait(lock);
+ }
+
+ auto const& entry = entries_[id];
+ size = entry.size_;
+ if (out) {
+ fill = size;
+ ptr = reinterpret_cast<char const*>(entry.data_);
+ } else {
+ fill = std::min(size, buffer_size);
+ memcpy(buf.get(), entry.data_, fill);
+ ptr = buf.get();
+ }
+ }
+
+ if (!out && !text) text.reset(AttributedText::create());
+ if (!match) {
+ for (auto const& protocol : protocols_) {
+ match.reset(protocol->match(ptr, fill));
+ if (match) break;
+ }
+ offset = 0;
+ }
+ if (match) {
+ if (out) {
+ if (!match->content(ptr, fill, out)) {
+ out->write(ptr, fill);
+ }
+ } else {
+ if (offset == 0) {
+ match->full(ptr, fill, text.get());
+ } else {
+ match->append(ptr, offset, fill, text.get());
+ }
+ }
+ }
+ }
+ }
+
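+ // Requires mutex_. Moves a finished render from active_ into the cache,
+ // evicting the least recently used idle entry when full, and pokes the
+ // looper if this id is the currently wanted one.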
+ void cache_entry_with_lock(size_t id, size_t size,
+ std::unique_ptr<Protocol::Match>&& match,
+ std::unique_ptr<AttributedText>&& text) {
+ // clear() or remove() may already have dropped the id from active_.
+ auto active = std::find(active_.begin(), active_.end(), id);
+ if (active != active_.end()) active_.erase(active);
+ while (cache_.size() >= cache_size_) {
+ auto oldest = cache_.end();
+ for (auto it = cache_.begin(); it != cache_.end(); ++it) {
+ if (!it->text_) continue; // Do not remove active
+ if (oldest == cache_.end() || it->last_ < oldest->last_) {
+ oldest = it;
+ }
+ }
+ if (oldest == cache_.end()) break;
+ cache_.erase(oldest);
+ }
+ cache_.emplace_back(id, size, std::move(match), std::move(text));
+ if (wanted_ == id && wanted_poke_) {
+ io::write(wanted_poke_.write(), "a", 1);
+ }
+ }
+
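+ // Requires mutex_. Records a finished content request and pokes the looper
+ // so notify() can report it on the looper thread.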
+ void content_done_with_lock(size_t id, std::string const& name,
+ std::ostream* out) {
+ content_done_.emplace_back(id, name, out);
+ if (wanted_poke_) {
+ io::write(wanted_poke_.write(), "b", 1);
+ }
+ }
+
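+ // Looper callback for the poke pipe: drains it, reports finished content
+ // requests and delivers the wanted text if it has become available.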
+ void notify(int fd, uint8_t) {
+ char tmp[10];
+ io::read(fd, tmp, sizeof(tmp));
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (!content_done_.empty()) {
+ ContentEntry entry(content_done_.front());
+ content_done_.pop_front();
+ lock.unlock();
+ listener_->content(this, entry.id_, entry.name_, entry.out_);
+ lock.lock();
+ }
+ if (wanted_ == std::string::npos) return;
+ for (auto& entry : cache_) {
+ if (entry.id_ == wanted_) {
+ wanted_ = std::string::npos;
+ entry.last_ = std::chrono::steady_clock::now();
+ if (!entry.text_) {
+ // Already an active entry
+ assert(false);
+ return;
+ }
+ auto id = entry.id_;
+ std::unique_ptr<AttributedText> text(std::move(entry.text_));
+ std::string name(entry.match_ ? entry.match_->name() : "");
+ lock.unlock();
+ listener_->text(this, id, name, std::move(text));
+ return;
+ }
+ }
+ }
+
+ Listener* const listener_;
+ size_t const workers_size_;
+ size_t const buffer_size_;
+ size_t const cache_size_;
+ Looper* const looper_;
+ std::unordered_map<size_t, Entry> entries_;
+
+ std::vector<std::unique_ptr<Protocol>> protocols_;
+ std::vector<std::thread> workers_;
+
+ io::auto_pipe wanted_poke_;
+
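+ // State below is shared with the worker threads and guarded by mutex_.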
+ std::mutex mutex_;
+ std::condition_variable cond_;
+ size_t wanted_;
+ bool quit_;
+ std::vector<CacheEntry> cache_;
+ std::vector<size_t> active_;
+ std::deque<QueueEntry> queue_;
+
+ std::deque<ContentEntry> content_done_;
+ std::deque<ContentEntry> content_queue_;
+};
+
+} // namespace
+
+// static
+Protocols* Protocols::create(size_t workers, size_t buffer, size_t cache,
+ Looper* looper, Listener* listener) {
+ assert(listener);
+ return new ProtocolsImpl(workers, buffer, cache, looper, listener);
+}