summaryrefslogtreecommitdiff
path: root/src/packages.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/packages.cc')
-rw-r--r--src/packages.cc48
1 files changed, 34 insertions, 14 deletions
diff --git a/src/packages.cc b/src/packages.cc
index 70f9de4..6aa7500 100644
--- a/src/packages.cc
+++ b/src/packages.cc
@@ -18,7 +18,7 @@ public:
: open_(true), count_(0), out_(out) {
uint8_t header[8];
memcpy(header, "TPP", 3);
- header[3] = 0x1; // Version
+ header[3] = 0x2; // Version
write_u32(header + 4, 0); // Count
out->write(reinterpret_cast<char*>(header), 8);
}
@@ -29,7 +29,8 @@ public:
}
}
- void write(Package const& package, char const* data, size_t size) override {
+ void write(Package const& package, char const* data, size_t size,
+ struct timespec end) override {
if (!open_) {
assert(false);
return;
@@ -43,9 +44,11 @@ public:
ptr = backup.get();
write_package(package, ptr, need);
}
- uint8_t size_data[8];
- write_u64(size_data, need + size);
- out_->write(reinterpret_cast<char*>(size_data), 8);
+ uint8_t size_end[20];
+ write_u64(size_end, need + size);
+ write_u64(size_end + 8, end.tv_sec);
+ write_u32(size_end + 16, end.tv_nsec);
+ out_->write(reinterpret_cast<char*>(size_end), 20);
out_->write(reinterpret_cast<char*>(ptr), need);
backup.reset();
out_->write(data, size);
@@ -84,14 +87,29 @@ PackagesReader::Status PackagesReader::read(std::istream& in,
Delegate* delegate) {
uint8_t header[8];
in.read(reinterpret_cast<char*>(header), 8);
- if (!in.good() || memcmp(header, "TPP", 3) || header[3] != 1) {
+ if (!in.good() || memcmp(header, "TPP", 3)) {
return INVALID;
}
+ if (header[3] < 1 || header[3] > 2) {
+ return INVALID;
+ }
+ bool const have_end = header[3] > 1;
auto count = read_u32(header + 4);
while (count--) {
- in.read(reinterpret_cast<char*>(header), 8);
- if (!in.good()) return IO_ERROR;
- size_t size = read_u64(header);
+ struct timespec end;
+ size_t size;
+ if (have_end) {
+ uint8_t size_end[20];
+ in.read(reinterpret_cast<char*>(size_end), 20);
+ if (!in.good()) return IO_ERROR;
+ size = read_u64(size_end);
+ end.tv_sec = read_u64(size_end + 8);
+ end.tv_nsec = read_u32(size_end + 16);
+ } else {
+ in.read(reinterpret_cast<char*>(header), 8);
+ if (!in.good()) return IO_ERROR;
+ size = read_u64(header);
+ }
if (size == 0) return INVALID;
uint8_t buf[8192];
size_t avail = std::min(size, sizeof(buf));
@@ -108,15 +126,17 @@ PackagesReader::Status PackagesReader::read(std::istream& in,
if (!in.good()) return IO_ERROR;
pkg_size = read_package(&pkg, mem.get(), need);
if (pkg_size == 0) return INVALID;
- delegate->package(pkg);
+ if (!have_end) end = pkg.timestamp;
+ delegate->package(pkg, end);
delegate->data(pkg.id, reinterpret_cast<char*>(mem.get())
+ pkg_size, need - pkg_size, need == size);
size -= need;
} else {
- delegate->package(pkg);
- delegate->data(pkg.id, reinterpret_cast<char*>(buf) + pkg_size,
- avail - pkg_size, avail == size);
- size -= avail;
+ if (!have_end) end = pkg.timestamp;
+ delegate->package(pkg, end);
+ delegate->data(pkg.id, reinterpret_cast<char*>(buf) + pkg_size,
+ avail - pkg_size, avail == size);
+ size -= avail;
}
while (size) {
avail = std::min(sizeof(buf), size);