// -*- mode: c++; c-basic-offset: 2; -*- #include "common.hh" #include #include #include #include #include #include #include #include #include #include #include "gui_attrtext.hh" #include "gui_plainattrtext.hh" #include "http_protocol.hh" #include "io.hh" #include "looper.hh" #include "protocol.hh" #include "protocols.hh" namespace { class ProtocolsImpl : public Protocols { public: ProtocolsImpl(size_t workers, size_t buffer, size_t cache, Looper* looper, Listener* listener) : listener_(listener), workers_size_( std::max(static_cast(1), workers)), buffer_size_(buffer), cache_size_(cache), looper_(looper), wanted_(std::string::npos), quit_(false) { protocols_.emplace_back(HttpProtocol::create()); wanted_poke_.open(); if (wanted_poke_) { looper_->add(wanted_poke_.read(), Looper::EVENT_READ, std::bind(&ProtocolsImpl::notify, this, std::placeholders::_1, std::placeholders::_2)); } } ~ProtocolsImpl() override { { std::unique_lock lock(mutex_); quit_ = true; cond_.notify_all(); } for (auto& worker : workers_) { worker.join(); } if (wanted_poke_) { looper_->remove(wanted_poke_.read()); } } void add(size_t id, void const* data, size_t size) override { std::unique_lock lock(mutex_); auto& entry = entries_[id]; if (entry.data_) { assert(false); return; } entry.data_ = data; entry.size_ = size; if (size) reschedule_with_lock(); } void update(size_t id, void const* data, size_t size) override { std::unique_lock lock(mutex_); auto& entry = entries_[id]; entry.data_ = data; if (entry.size_ == size) return; auto old = entry.size_; entry.size_ = size; if (old >= buffer_size_) return; for (auto it = cache_.begin(); it != cache_.end(); ++it) { if (it->id_ == id) { queue_.emplace_back(id, it->size_, std::move(it->match_), std::move(it->text_)); cache_.erase(it); if (active_.size() < workers_size_) cond_.notify_one(); return; } } for (auto const& active : active_) { if (active == id) { invalidate_.emplace(id); return; } } if (old == 0) reschedule_with_lock(); } void remove(size_t id) override { auto it = entries_.find(id); if (it == entries_.end()) { assert(false); return; } std::unique_lock lock(mutex_); for (auto it = cache_.begin(); it != cache_.end(); ++it) { if (it->id_ == id) { cache_.erase(it); reschedule_with_lock(); return; } } if (wanted_ == id) wanted_ = std::string::npos; for (auto it = active_.begin(); it != active_.end(); ++it) { if (*it == id) { active_.erase(it); reschedule_with_lock(); return; } } for (auto it = queue_.begin(); it != queue_.end(); ++it) { if (it->id_ == id) { queue_.erase(it); reschedule_with_lock(); return; } } } void clear() override { std::unique_lock lock(mutex_); entries_.clear(); cache_.clear(); wanted_ = std::string::npos; active_.clear(); queue_.clear(); } void text(size_t id) override { auto it = entries_.find(id); if (it == entries_.end()) { assert(false); return; } std::unique_lock lock(mutex_); for (auto& entry : cache_) { if (entry.id_ == id) { entry.last_ = std::chrono::steady_clock::now(); if (!entry.text_) { // Already an active entry assert(false); return; } std::unique_ptr text(std::move(entry.text_)); std::string name(entry.match_ ? entry.match_->name() : ""); lock.unlock(); listener_->text(this, id, name, std::move(text)); return; } } wanted_ = id; for (auto& active : active_) { if (active == id) { // Already working on it return; } } for (auto it = queue_.begin(); it != queue_.end(); ++it) { if (it->id_ == id) { // Already in queue, remove so we can add it first if (it == queue_.begin()) { return; } auto offset = it->offset_; std::unique_ptr match(std::move(it->match_)); std::unique_ptr text(std::move(it->text_)); queue_.erase(it); queue_.emplace_front(id, offset, std::move(match), std::move(text)); return; } } queue_.emplace_front(id); if (active_.size() < workers_size_) cond_.notify_one(); } void free(size_t id, std::unique_ptr&& text) override { std::unique_lock lock(mutex_); for (auto& entry : cache_) { if (entry.id_ == id) { if (entry.text_) { // Entry updated while shown return; } entry.text_.swap(text); break; } } } void content(size_t id, std::ostream* out) override { auto it = entries_.find(id); if (it == entries_.end()) { assert(false); return; } std::unique_lock lock(mutex_); content_queue_.emplace_back(id, out); if (active_.size() < workers_size_) cond_.notify_one(); } private: struct Entry { void const* data_; size_t size_; Entry() : data_(nullptr), size_(0) { } }; struct CacheEntry { std::chrono::steady_clock::time_point last_; size_t id_; size_t size_; std::unique_ptr match_; std::unique_ptr text_; CacheEntry(size_t id, size_t size, std::unique_ptr&& match, std::unique_ptr&& text) : last_(std::chrono::steady_clock::now()), id_(id), size_(size), match_(std::move(match)), text_(std::move(text)) { } }; struct QueueEntry { size_t id_; size_t offset_; std::unique_ptr match_; std::unique_ptr text_; QueueEntry(size_t id) : id_(id), offset_(0) { } QueueEntry(size_t id, size_t offset, std::unique_ptr&& match, std::unique_ptr&& text) : id_(id), offset_(offset), match_(std::move(match)), text_(std::move(text)) { } }; struct ContentEntry { size_t id_; std::string name_; std::ostream* out_; ContentEntry(size_t id, std::ostream* out) : id_(id), out_(out) { } ContentEntry(size_t id, std::string const& name, std::ostream* out) : id_(id), name_(name), out_(out) { } }; void reschedule_with_lock() { if (cache_.size() < cache_size_) { size_t want = cache_size_ - cache_.size(); size_t queued = active_.size() + queue_.size(); if (want < queued) { want = 0; } else { want -= queued; } if (want) { std::unordered_set taken; for (auto const& cache : cache_) taken.insert(cache.id_); for (auto const& active : active_) taken.insert(active); for (auto const& queue : queue_) taken.insert(queue.id_); for (auto const& pair : entries_) { if (pair.second.size_ == 0) continue; // No point caching 0 bytes if (taken.count(pair.first)) continue; // Already in cache or queues queue_.emplace_back(pair.first); if (!--want) break; } } } if (queue_.empty()) return; if (active_.size() == workers_size_) return; while (workers_.size() < workers_size_) { workers_.emplace_back(&ProtocolsImpl::worker, this, buffer_size_); } cond_.notify_all(); } void worker(size_t buffer_size) { std::unique_ptr text; std::unique_ptr match; std::ostream* out = nullptr; size_t id = 0; size_t offset = 0; size_t size; std::unique_ptr buf(new char[buffer_size]); char const* ptr; size_t fill; while (true) { { std::unique_lock lock(mutex_); if (out) { content_done_with_lock(id, match ? match->name() : "", out); out = nullptr; id = 0; match.reset(); } else if (text) { cache_entry_with_lock(id, size, std::move(match), std::move(text)); assert(!match); assert(!text); id = 0; offset = 0; } while (true) { if (quit_) return; if (!content_queue_.empty()) { id = content_queue_.front().id_; out = content_queue_.front().out_; content_queue_.pop_front(); break; } if (!queue_.empty()) { id = queue_.front().id_; offset = queue_.front().offset_; match.swap(queue_.front().match_); text.swap(queue_.front().text_); queue_.pop_front(); active_.emplace_back(id); break; } cond_.wait(lock); } auto const& entry = entries_[id]; size = entry.size_; if (out) { fill = size; ptr = reinterpret_cast(entry.data_); } else { fill = std::min(size, buffer_size); memcpy(buf.get(), entry.data_, fill); ptr = buf.get(); } } if (!out && !text) text.reset(AttributedText::create()); if (!match) { for (auto const& protocol : protocols_) { match.reset(protocol->match(ptr, fill)); if (match) break; } offset = 0; } if (match) { if (out) { if (!match->content(ptr, fill, out)) { out->write(ptr, fill); } } else { if (offset == 0) { match->full(ptr, fill, text.get()); } else { match->append(ptr, offset, fill, text.get()); } } } } } void cache_entry_with_lock(size_t id, size_t size, std::unique_ptr&& match, std::unique_ptr&& text) { active_.erase(std::find(active_.begin(), active_.end(), id)); if (invalidate_.erase(id)) { if (wanted_ == id) { queue_.emplace_front(id, size, std::move(match), std::move(text)); } else { queue_.emplace_back(id, size, std::move(match), std::move(text)); } return; } while (cache_.size() >= cache_size_) { auto oldest = cache_.end(); for (auto it = cache_.begin(); it != cache_.end(); ++it) { if (!it->text_) continue; // Do not remove active if (oldest == cache_.end() || it->last_ < oldest->last_) { oldest = it; } } if (oldest == cache_.end()) break; cache_.erase(oldest); } cache_.emplace_back(id, size, std::move(match), std::move(text)); if (wanted_ == id && wanted_poke_) { io::write(wanted_poke_.write(), "a", 1); } } void content_done_with_lock(size_t id, std::string const& name, std::ostream* out) { content_done_.emplace_back(id, name, out); if (wanted_poke_) { io::write(wanted_poke_.write(), "b", 1); } } void notify(int fd, uint8_t) { char tmp[10]; io::read(fd, tmp, sizeof(tmp)); std::unique_lock lock(mutex_); while (!content_done_.empty()) { ContentEntry entry(content_done_.front()); content_done_.pop_front(); lock.unlock(); listener_->content(this, entry.id_, entry.name_, entry.out_); lock.lock(); } if (wanted_ == std::string::npos) return; for (auto& entry : cache_) { if (entry.id_ == wanted_) { wanted_ = std::string::npos; entry.last_ = std::chrono::steady_clock::now(); if (!entry.text_) { // Already an active entry assert(false); return; } auto id = entry.id_; std::unique_ptr text(std::move(entry.text_)); std::string name(entry.match_ ? entry.match_->name() : ""); lock.unlock(); listener_->text(this, id, name, std::move(text)); return; } } } Listener* const listener_; size_t const workers_size_; size_t const buffer_size_; size_t const cache_size_; Looper* const looper_; std::unordered_map entries_; std::vector> protocols_; std::vector workers_; io::auto_pipe wanted_poke_; std::mutex mutex_; std::condition_variable cond_; size_t wanted_; bool quit_; std::vector cache_; std::vector active_; std::deque queue_; std::unordered_set invalidate_; std::deque content_done_; std::deque content_queue_; }; } // namespace // static Protocols* Protocols::create(size_t workers, size_t buffer, size_t cache, Looper* looper, Listener* listener) { assert(listener); return new ProtocolsImpl(workers, buffer, cache, looper, listener); }