summaryrefslogtreecommitdiff
path: root/src/packages.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/packages.cc')
-rw-r--r--src/packages.cc125
1 files changed, 125 insertions, 0 deletions
diff --git a/src/packages.cc b/src/packages.cc
new file mode 100644
index 0000000..0309df1
--- /dev/null
+++ b/src/packages.cc
@@ -0,0 +1,125 @@
+// -*- mode: c++; c-basic-offset: 2; -*-
+
+#include "common.hh"
+
+#include <istream>
+#include <memory>
+#include <ostream>
+#include <string.h>
+
+#include "data.hh"
+#include "packages.hh"
+
+namespace {
+
+class PackagesWriterImpl : public PackagesWriter {
+public:
+ PackagesWriterImpl(size_t count, std::ostream* out)
+ : count_(count), out_(out) {
+ uint8_t header[8];
+ memcpy(header, "TPP", 3);
+ header[3] = 0x1; // Version
+ write_u32(header + 4, count_); // Count
+ out->write(reinterpret_cast<char*>(header), 8);
+
+ if (count == 0) {
+ write_u64(header, 0); // EOF
+ out_->write(reinterpret_cast<char*>(header), 8);
+ }
+ }
+
+ ~PackagesWriterImpl() {
+ assert(count_ == 0);
+ }
+
+ void write(Package const& package, std::string const& data) override {
+ if (count_ == 0) {
+ assert(false);
+ return;
+ }
+ uint8_t buf[8192];
+ std::unique_ptr<uint8_t[]> backup;
+ uint8_t* ptr = buf;
+ size_t need = write_package(package, buf, sizeof(buf));
+ if (need > sizeof(buf)) {
+ backup.reset(new uint8_t[need]);
+ ptr = backup.get();
+ write_package(package, ptr, need);
+ }
+ uint8_t size[8];
+ write_u64(size, need + data.size());
+ out_->write(reinterpret_cast<char*>(size), 8);
+ out_->write(reinterpret_cast<char*>(ptr), need);
+ backup.reset();
+ out_->write(data.data(), data.size());
+
+ if (--count_ == 0) {
+ write_u64(size, 0); // EOF
+ out_->write(reinterpret_cast<char*>(size), 8);
+ }
+ }
+
+private:
+ size_t count_;
+ std::ostream* const out_;
+};
+
+} // namespace
+
+// static
+PackagesWriter* PackagesWriter::create(size_t count, std::ostream* out) {
+ return new PackagesWriterImpl(count, out);
+}
+
+// static
+PackagesReader::Status PackagesReader::read(std::istream& in,
+ Delegate* delegate) {
+ uint8_t header[8];
+ in.read(reinterpret_cast<char*>(header), 8);
+ if (!in.good() || memcmp(header, "TPP", 3) || header[3] != 1) {
+ return INVALID;
+ }
+ auto count = read_u32(header + 4);
+ while (count--) {
+ in.read(reinterpret_cast<char*>(header), 8);
+ if (!in.good()) return IO_ERROR;
+ size_t size = read_u64(header);
+ if (size == 0) return INVALID;
+ uint8_t buf[8192];
+ size_t avail = std::min(size, sizeof(buf));
+ in.read(reinterpret_cast<char*>(buf), avail);
+ if (!in.good()) return IO_ERROR;
+ Package pkg;
+ auto pkg_size = read_package(&pkg, buf, avail);
+ if (pkg_size == 0) {
+ if (avail == size) return INVALID;
+ size_t need = std::min(static_cast<size_t>(1024) * 1024, size);
+ std::unique_ptr<uint8_t[]> mem(new uint8_t[need]);
+ memcpy(mem.get(), buf, avail);
+ in.read(reinterpret_cast<char*>(mem.get()) + avail, need - avail);
+ if (!in.good()) return IO_ERROR;
+ pkg_size = read_package(&pkg, mem.get(), need);
+ if (pkg_size == 0) return INVALID;
+ delegate->package(pkg);
+ delegate->data(pkg.id, reinterpret_cast<char*>(mem.get())
+ + pkg_size, need - pkg_size, need == size);
+ size -= need;
+ } else {
+ delegate->package(pkg);
+ delegate->data(pkg.id, reinterpret_cast<char*>(buf) + pkg_size,
+ avail - pkg_size, avail == size);
+ size -= avail;
+ }
+ while (size) {
+ avail = std::min(sizeof(buf), size);
+ in.read(reinterpret_cast<char*>(buf), avail);
+ if (!in.good()) return IO_ERROR;
+ delegate->data(pkg.id, reinterpret_cast<char*>(buf), avail,
+ avail == size);
+ size -= avail;
+ }
+ }
+ in.read(reinterpret_cast<char*>(header), 8);
+ if (!in.good() || read_u64(header) != 0) return INVALID;
+ return GOOD;
+}