diff options
| author | Joel Klinghed <the_jk@yahoo.com> | 2017-08-06 22:23:41 +0200 |
|---|---|---|
| committer | Joel Klinghed <the_jk@yahoo.com> | 2017-08-06 22:25:44 +0200 |
| commit | 178bb3a1ceab88f29aa7d0ceb453e76de172fd27 (patch) | |
| tree | 09f830338d5490552ae878152de0f104cb7f6e5b /src/protocols.cc | |
| parent | 9d586aec3a5615377e389318e97e7d756c970c96 (diff) | |
Add protools, used for getting content out of packages
Only HTTP protocol implemented yet, but with gzip, deflate and bzip2
suport
Diffstat (limited to 'src/protocols.cc')
| -rw-r--r-- | src/protocols.cc | 477 |
1 files changed, 477 insertions, 0 deletions
diff --git a/src/protocols.cc b/src/protocols.cc new file mode 100644 index 0000000..a1eaa58 --- /dev/null +++ b/src/protocols.cc @@ -0,0 +1,477 @@ +// -*- mode: c++; c-basic-offset: 2; -*- + +#include "common.hh" + +#include <algorithm> +#include <chrono> +#include <condition_variable> +#include <deque> +#include <mutex> +#include <string.h> +#include <thread> +#include <unordered_map> +#include <unordered_set> +#include <vector> + +#include "gui_attrtext.hh" +#include "gui_plainattrtext.hh" +#include "http_protocol.hh" +#include "io.hh" +#include "looper.hh" +#include "protocol.hh" +#include "protocols.hh" + +namespace { + +class ProtocolsImpl : public Protocols { +public: + ProtocolsImpl(size_t workers, size_t buffer, size_t cache, Looper* looper, + Listener* listener) + : listener_(listener), workers_size_( + std::max(static_cast<size_t>(1), workers)), buffer_size_(buffer), + cache_size_(cache), looper_(looper), + wanted_(std::string::npos), quit_(false) { + protocols_.emplace_back(HttpProtocol::create()); + wanted_poke_.open(); + if (wanted_poke_) { + looper_->add(wanted_poke_.read(), Looper::EVENT_READ, + std::bind(&ProtocolsImpl::notify, this, + std::placeholders::_1, + std::placeholders::_2)); + + } + } + + ~ProtocolsImpl() override { + { + std::unique_lock<std::mutex> lock(mutex_); + quit_ = true; + cond_.notify_all(); + } + for (auto& worker : workers_) { + worker.join(); + } + if (wanted_poke_) { + looper_->remove(wanted_poke_.read()); + } + } + + void add(size_t id, void const* data, size_t size) override { + std::unique_lock<std::mutex> lock(mutex_); + auto& entry = entries_[id]; + if (entry.data_) { + assert(false); + return; + } + entry.data_ = data; + entry.size_ = size; + + reschedule_with_lock(); + } + + void update(size_t id, void const* data, size_t size) override { + std::unique_lock<std::mutex> lock(mutex_); + auto& entry = entries_[id]; + if (!entry.data_) { + assert(false); + entries_.erase(id); + return; + } + entry.data_ = data; + if (entry.size_ == size) return; + auto old = entry.size_; + entry.size_ = size; + if (old >= buffer_size_) return; + + for (auto it = cache_.begin(); it != cache_.end(); ++it) { + if (it->id_ == id) { + queue_.emplace_back(id, it->size_, + std::move(it->match_), std::move(it->text_)); + cache_.erase(it); + if (active_.size() < workers_size_) cond_.notify_one(); + return; + } + } + } + + void remove(size_t id) override { + auto it = entries_.find(id); + if (it == entries_.end()) { + assert(false); + return; + } + + std::unique_lock<std::mutex> lock(mutex_); + for (auto it = cache_.begin(); it != cache_.end(); ++it) { + if (it->id_ == id) { + cache_.erase(it); + reschedule_with_lock(); + return; + } + } + + if (wanted_ == id) wanted_ = std::string::npos; + + for (auto it = active_.begin(); it != active_.end(); ++it) { + if (*it == id) { + active_.erase(it); + reschedule_with_lock(); + return; + } + } + + for (auto it = queue_.begin(); it != queue_.end(); ++it) { + if (it->id_ == id) { + queue_.erase(it); + reschedule_with_lock(); + return; + } + } + } + + void clear() override { + std::unique_lock<std::mutex> lock(mutex_); + cache_.clear(); + wanted_ = std::string::npos; + active_.clear(); + queue_.clear(); + } + + void text(size_t id) override { + auto it = entries_.find(id); + if (it == entries_.end()) { + assert(false); + return; + } + + std::unique_lock<std::mutex> lock(mutex_); + for (auto& entry : cache_) { + if (entry.id_ == id) { + entry.last_ = std::chrono::steady_clock::now(); + if (!entry.text_) { + // Already an active entry + assert(false); + return; + } + std::unique_ptr<AttributedText> text(std::move(entry.text_)); + std::string name(entry.match_ ? entry.match_->name() : ""); + lock.unlock(); + listener_->text(this, id, name, std::move(text)); + return; + } + } + + wanted_ = id; + + for (auto& active : active_) { + if (active == id) { + // Already working on it + return; + } + } + + for (auto it = queue_.begin(); it != queue_.end(); ++it) { + if (it->id_ == id) { + // Already in queue, remove so we can add it first + if (it == queue_.begin()) { + return; + } + auto offset = it->offset_; + std::unique_ptr<Protocol::Match> match(std::move(it->match_)); + std::unique_ptr<AttributedText> text(std::move(it->text_)); + queue_.erase(it); + queue_.emplace_front(id, offset, std::move(match), std::move(text)); + return; + } + } + + queue_.emplace_front(id); + if (active_.size() < workers_size_) cond_.notify_one(); + } + + void free(size_t id, std::unique_ptr<AttributedText>&& text) override { + std::unique_lock<std::mutex> lock(mutex_); + for (auto& entry : cache_) { + if (entry.id_ == id) { + if (entry.text_) { + // Entry updated while shown + return; + } + entry.text_.swap(text); + break; + } + } + } + + void content(size_t id, std::ostream* out) override { + auto it = entries_.find(id); + if (it == entries_.end()) { + assert(false); + return; + } + + std::unique_lock<std::mutex> lock(mutex_); + content_queue_.emplace_back(id, out); + if (active_.size() < workers_size_) cond_.notify_one(); + } + +private: + struct Entry { + void const* data_; + size_t size_; + + Entry() + : data_(nullptr), size_(0) { + } + }; + + struct CacheEntry { + std::chrono::steady_clock::time_point last_; + size_t id_; + size_t size_; + std::unique_ptr<Protocol::Match> match_; + std::unique_ptr<AttributedText> text_; + + CacheEntry(size_t id, size_t size, std::unique_ptr<Protocol::Match>&& match, + std::unique_ptr<AttributedText>&& text) + : last_(std::chrono::steady_clock::now()), id_(id), size_(size), + match_(std::move(match)), text_(std::move(text)) { + } + }; + + struct QueueEntry { + size_t id_; + size_t offset_; + std::unique_ptr<Protocol::Match> match_; + std::unique_ptr<AttributedText> text_; + + QueueEntry(size_t id) + : id_(id), offset_(0) { + } + + QueueEntry(size_t id, size_t offset, + std::unique_ptr<Protocol::Match>&& match, + std::unique_ptr<AttributedText>&& text) + : id_(id), offset_(offset), match_(std::move(match)), + text_(std::move(text)) { + } + }; + + struct ContentEntry { + size_t id_; + std::string name_; + std::ostream* out_; + + ContentEntry(size_t id, std::ostream* out) + : id_(id), out_(out) { + } + + ContentEntry(size_t id, std::string const& name, std::ostream* out) + : id_(id), name_(name), out_(out) { + } + }; + + void reschedule_with_lock() { + if (cache_.size() < cache_size_) { + size_t want = cache_.size() - cache_size_; + size_t queued = active_.size() + queue_.size(); + if (want < queued) { + want = 0; + } else { + want -= queued; + } + if (want) { + std::unordered_set<size_t> taken; + for (auto const& cache : cache_) taken.insert(cache.id_); + for (auto const& active : active_) taken.insert(active); + for (auto const& queue : queue_) taken.insert(queue.id_); + for (auto const& pair : entries_) { + if (pair.second.size_ == 0) continue; // No point caching 0 bytes + if (taken.count(pair.first)) continue; // Already in cache or queues + queue_.emplace_back(pair.first); + if (!--want) break; + } + } + } + + if (queue_.empty()) return; + if (active_.size() == workers_size_) return; + while (workers_.size() < workers_size_) { + workers_.emplace_back(&ProtocolsImpl::worker, this, buffer_size_); + } + cond_.notify_all(); + } + + void worker(size_t buffer_size) { + std::unique_ptr<AttributedText> text; + std::unique_ptr<Protocol::Match> match; + std::ostream* out = nullptr; + size_t id = 0; + size_t offset = 0; + size_t size; + std::unique_ptr<char[]> buf(new char[buffer_size]); + char const* ptr; + size_t fill; + while (true) { + { + std::unique_lock<std::mutex> lock(mutex_); + if (out) { + content_done_with_lock(id, match ? match->name() : "", out); + out = nullptr; + id = 0; + match.reset(); + } else if (text) { + cache_entry_with_lock(id, size, std::move(match), std::move(text)); + assert(!match); + assert(!text); + id = 0; + offset = 0; + } + while (true) { + if (quit_) return; + if (!content_queue_.empty()) { + id = content_queue_.front().id_; + out = content_queue_.front().out_; + content_queue_.pop_front(); + break; + } + if (!queue_.empty()) { + id = queue_.front().id_; + offset = queue_.front().offset_; + match.swap(queue_.front().match_); + text.swap(queue_.front().text_); + queue_.pop_front(); + active_.emplace_back(id); + break; + } + cond_.wait(lock); + } + + auto const& entry = entries_[id]; + size = entry.size_; + if (out) { + fill = size; + ptr = reinterpret_cast<char const*>(entry.data_); + } else { + fill = std::min(size, buffer_size); + memcpy(buf.get(), entry.data_, fill); + ptr = buf.get(); + } + } + + if (!out && !text) text.reset(AttributedText::create()); + if (!match) { + for (auto const& protocol : protocols_) { + match.reset(protocol->match(ptr, fill)); + if (match) break; + } + offset = 0; + } + if (match) { + if (out) { + if (!match->content(ptr, fill, out)) { + out->write(ptr, fill); + } + } else { + if (offset == 0) { + match->full(ptr, fill, text.get()); + } else { + match->append(ptr, offset, fill, text.get()); + } + } + } + } + } + + void cache_entry_with_lock(size_t id, size_t size, + std::unique_ptr<Protocol::Match>&& match, + std::unique_ptr<AttributedText>&& text) { + active_.erase(std::find(active_.begin(), active_.end(), id)); + while (cache_.size() >= cache_size_) { + auto oldest = cache_.end(); + for (auto it = cache_.begin(); it != cache_.end(); ++it) { + if (!it->text_) continue; // Do not remove active + if (oldest == cache_.end() || it->last_ < oldest->last_) { + oldest = it; + } + } + if (oldest == cache_.end()) break; + cache_.erase(oldest); + } + cache_.emplace_back(id, size, std::move(match), std::move(text)); + if (wanted_ == id && wanted_poke_) { + io::write(wanted_poke_.write(), "a", 1); + } + } + + void content_done_with_lock(size_t id, std::string const& name, + std::ostream* out) { + content_done_.emplace_back(id, name, out); + if (wanted_poke_) { + io::write(wanted_poke_.write(), "b", 1); + } + } + + void notify(int fd, uint8_t) { + char tmp[10]; + io::read(fd, tmp, sizeof(tmp)); + std::unique_lock<std::mutex> lock(mutex_); + while (!content_done_.empty()) { + ContentEntry entry(content_done_.front()); + content_done_.pop_front(); + lock.unlock(); + listener_->content(this, entry.id_, entry.name_, entry.out_); + lock.lock(); + } + if (wanted_ == std::string::npos) return; + for (auto& entry : cache_) { + if (entry.id_ == wanted_) { + wanted_ = std::string::npos; + entry.last_ = std::chrono::steady_clock::now(); + if (!entry.text_) { + // Already an active entry + assert(false); + return; + } + auto id = entry.id_; + std::unique_ptr<AttributedText> text(std::move(entry.text_)); + std::string name(entry.match_ ? entry.match_->name() : ""); + lock.unlock(); + listener_->text(this, id, name, std::move(text)); + return; + } + } + } + + Listener* const listener_; + size_t const workers_size_; + size_t const buffer_size_; + size_t const cache_size_; + Looper* const looper_; + std::unordered_map<size_t, Entry> entries_; + + std::vector<std::unique_ptr<Protocol>> protocols_; + std::vector<std::thread> workers_; + + io::auto_pipe wanted_poke_; + + std::mutex mutex_; + std::condition_variable cond_; + size_t wanted_; + bool quit_; + std::vector<CacheEntry> cache_; + std::vector<size_t> active_; + std::deque<QueueEntry> queue_; + + std::deque<ContentEntry> content_done_; + std::deque<ContentEntry> content_queue_; +}; + +} // namespace + +// static +Protocols* Protocols::create(size_t workers, size_t buffer, size_t cache, + Looper* looper, Listener* listener) { + assert(listener); + return new ProtocolsImpl(workers, buffer, cache, looper, listener); +} |
