From cc61af1e2c933f2178a5dc3e5a5599e74d380eb3 Mon Sep 17 00:00:00 2001 From: Joel Klinghed Date: Thu, 10 Aug 2017 21:55:29 +0200 Subject: Add duration to packages and tpp format --- src/monitor-cmd.cc | 19 ++++++++++-------- src/monitor-gui.cc | 59 ++++++++++++++++++++++++++++++++++++++++++++++-------- src/monitor.cc | 11 +++++++--- src/monitor.hh | 3 ++- src/packages.cc | 48 +++++++++++++++++++++++++++++++------------- src/packages.hh | 10 +++++---- src/proxy.cc | 17 ++++++++++------ 7 files changed, 123 insertions(+), 44 deletions(-) (limited to 'src') diff --git a/src/monitor-cmd.cc b/src/monitor-cmd.cc index 580f45f..eb7245d 100644 --- a/src/monitor-cmd.cc +++ b/src/monitor-cmd.cc @@ -63,27 +63,28 @@ public: } void package_data(Monitor*, uint32_t id, - char const* data, size_t size, bool last) override { + char const* data, size_t size, bool last, + struct timespec timestamp) override { auto it = packages_.find(id); if (it == packages_.end()) { assert(false); return; } if (interleave_) { - print_package(it->second, last, data, size); + print_package(it->second, last, data, size, timestamp); if (last) packages_.erase(it); return; } auto buf = data_.find(id); if (last) { if (buf == data_.end()) { - print_package(it->second, true, data, size); + print_package(it->second, true, data, size, timestamp); } else { buf->second->write(data, size); size_t avail; auto ptr = buf->second->read_ptr(&avail); print_package(it->second, true, - reinterpret_cast(ptr), avail); + reinterpret_cast(ptr), avail, timestamp); data_.erase(buf); } packages_.erase(it); @@ -97,17 +98,19 @@ public: } private: - void print_package(Package& pkg, bool last, char const* data, size_t size) { + void print_package(Package& pkg, bool last, char const* data, size_t size, + struct timespec timestamp) { if (size == 0 && !last) return; if (writer_) { assert(last); - writer_->write(pkg, data, size); + writer_->write(pkg, data, size, timestamp); return; } { ios_save save(out_); - out_ << "*** " << pkg.timestamp.tv_sec << '.' - << std::setfill('0') << std::setw(9) << pkg.timestamp.tv_nsec + auto ts = interleave_ ? timestamp : pkg.timestamp; + out_ << "*** " << ts.tv_sec << '.' + << std::setfill('0') << std::setw(9) << ts.tv_nsec << '\n'; } out_ << "* Source: " << pkg.source_host << ':' << pkg.source_port << '\n' diff --git a/src/monitor-gui.cc b/src/monitor-gui.cc index 61c720f..f4f80d5 100644 --- a/src/monitor-gui.cc +++ b/src/monitor-gui.cc @@ -126,9 +126,12 @@ public: std::string from; std::string to; std::string size; + std::string duration; std::string data; + struct timespec last; + size_t related; Package() @@ -141,7 +144,7 @@ public: } size_t columns() const override { - return 4; + return 5; } std::string header(size_t column) const override { @@ -154,6 +157,8 @@ public: return "To"; case 3: return "Data"; + case 4: + return "Duration"; } assert(false); return ""; @@ -167,6 +172,7 @@ public: case 1: return pkg.from; case 2: return pkg.to; case 3: return pkg.size; + case 4: return pkg.duration; } } assert(false); @@ -187,10 +193,12 @@ public: packages_.emplace_back(); auto& pkg = packages_.back(); pkg.pkg = package; + pkg.last = package.timestamp; format_timestamp(&pkg.timestamp, package.timestamp); format_host_port(&pkg.from, package.source_host, package.source_port); format_host_port(&pkg.to, package.target_host, package.target_port); format_size(&pkg.size, 0, false); + format_duration(&pkg.duration, pkg.pkg.timestamp, pkg.last, false); auto related = related_.find(pkg.to); if (related == related_.end()) { @@ -204,7 +212,8 @@ public: notify_added(index, index); } - size_t package_data(uint32_t id, char const* data, size_t size, bool last) { + size_t package_data(uint32_t id, char const* data, size_t size, bool last, + struct timespec timestamp) { auto it = open_.find(id); if (it == open_.end()) { return std::string::npos; @@ -213,6 +222,8 @@ public: auto& pkg = packages_[index]; pkg.data.append(data, size); format_size(&pkg.size, pkg.data.size(), last); + pkg.last = timestamp; + format_duration(&pkg.duration, pkg.pkg.timestamp, pkg.last, last); if (last) open_.erase(it); notify_changed(index, index); @@ -271,6 +282,31 @@ private: out->assign(tmp, len); } + static void format_duration(std::string* out, struct timespec const& start, + struct timespec const& end, bool) { + char tmp[50]; + if (start.tv_sec == end.tv_sec) { + if (start.tv_nsec == end.tv_nsec) { + out->clear(); + return; + } + auto len = snprintf(tmp, sizeof(tmp), "%lums", + static_cast( + (end.tv_nsec - start.tv_nsec) / 1000000)); + out->assign(tmp, len); + } else { + auto s = end.tv_sec - start.tv_sec; + auto n = (end.tv_nsec - start.tv_nsec) / 1000000; + if (n < 0) { + --s; + n += 1000; + } + auto len = snprintf(tmp, sizeof(tmp), "%ld.%03lus", + static_cast(s), static_cast(n)); + out->assign(tmp, len); + } + } + void notify_added(size_t first, size_t last) { auto it = listeners_.notify(); while (it.has_next()) { @@ -689,16 +725,21 @@ private: : packages_(packages) { } - void package(Package const& package) override { + void package(Package const& package, struct timespec end) override { packages_->package(package); + id_ = package.id; + timestamp_ = end; } void data(uint32_t id, char const* data, size_t size, bool last) override { - packages_->package_data(id, data, size, last); + assert(id_ == id); + packages_->package_data(id, data, size, last, timestamp_); } private: PackageList* const packages_; + uint32_t id_; + struct timespec timestamp_; }; public: @@ -1177,9 +1218,10 @@ public: } void package_data(Monitor* monitor, uint32_t id, - char const* data, size_t size, bool last) override { + char const* data, size_t size, bool last, + struct timespec timestamp) override { assert(monitor == monitor_.get()); - auto index = packages_->package_data(id, data, size, last); + auto index = packages_->package_data(id, data, size, last, timestamp); auto const& pkg = packages_->package(index); protocols_->update(index, pkg.data.data(), pkg.data.size()); if (has_selection_ && index == selection_) { @@ -1416,7 +1458,8 @@ private: packages->package(pkg); packages->package_data(pkg.id, reinterpret_cast(data) + offset + len_ip + len_tcp, - len - offset - len_ip - len_tcp, true); + len - offset - len_ip - len_tcp, true, + pkg.timestamp); } else if (ret == -2) { break; } else if (ret == -1) { @@ -1638,7 +1681,7 @@ private: auto writer = std::unique_ptr(PackagesWriter::create(&out)); for (size_t i = 0; i < packages->rows(); ++i) { auto const& pkg = packages->package(i); - writer->write(pkg.pkg, pkg.data); + writer->write(pkg.pkg, pkg.data, pkg.last); } writer->flush(); if (!out.good()) { diff --git a/src/monitor.cc b/src/monitor.cc index e07b036..cffe3d5 100644 --- a/src/monitor.cc +++ b/src/monitor.cc @@ -179,18 +179,23 @@ private: delegate_->package_data( this, pkg.id, reinterpret_cast(package_) + offset + o, size - o, - last); + last, pkg.timestamp); } } - } else if (size >= 10 && memcmp(package_ + offset, "DAT", 3) == 0) { + } else if (size >= 22 && memcmp(package_ + offset, "DAT", 3) == 0) { uint32_t id = read_u32(package_ + offset + o); o += 4; uint8_t flags = package_[offset + o]; ++o; + struct timespec ts; + ts.tv_sec = read_u64(package_ + offset + o); + o += 8; + ts.tv_nsec = read_u32(package_ + offset + o); + o += 4; delegate_->package_data( this, id, reinterpret_cast(package_) + offset + o, size - o, - !(flags & 0x01)); + !(flags & 0x01), ts); } offset += size; } diff --git a/src/monitor.hh b/src/monitor.hh index 7c6cc40..c3504d0 100644 --- a/src/monitor.hh +++ b/src/monitor.hh @@ -33,7 +33,8 @@ public: virtual void error(Monitor* monitor, std::string const& error) = 0; virtual void package(Monitor* monitor, Package const& package) = 0; virtual void package_data(Monitor* monitor, uint32_t id, - char const* data, size_t size, bool last) = 0; + char const* data, size_t size, bool last, + struct timespec timestamp) = 0; protected: Delegate() {} diff --git a/src/packages.cc b/src/packages.cc index 70f9de4..6aa7500 100644 --- a/src/packages.cc +++ b/src/packages.cc @@ -18,7 +18,7 @@ public: : open_(true), count_(0), out_(out) { uint8_t header[8]; memcpy(header, "TPP", 3); - header[3] = 0x1; // Version + header[3] = 0x2; // Version write_u32(header + 4, 0); // Count out->write(reinterpret_cast(header), 8); } @@ -29,7 +29,8 @@ public: } } - void write(Package const& package, char const* data, size_t size) override { + void write(Package const& package, char const* data, size_t size, + struct timespec end) override { if (!open_) { assert(false); return; @@ -43,9 +44,11 @@ public: ptr = backup.get(); write_package(package, ptr, need); } - uint8_t size_data[8]; - write_u64(size_data, need + size); - out_->write(reinterpret_cast(size_data), 8); + uint8_t size_end[20]; + write_u64(size_end, need + size); + write_u64(size_end + 8, end.tv_sec); + write_u32(size_end + 16, end.tv_nsec); + out_->write(reinterpret_cast(size_end), 20); out_->write(reinterpret_cast(ptr), need); backup.reset(); out_->write(data, size); @@ -84,14 +87,29 @@ PackagesReader::Status PackagesReader::read(std::istream& in, Delegate* delegate) { uint8_t header[8]; in.read(reinterpret_cast(header), 8); - if (!in.good() || memcmp(header, "TPP", 3) || header[3] != 1) { + if (!in.good() || memcmp(header, "TPP", 3)) { return INVALID; } + if (header[3] < 1 || header[3] > 2) { + return INVALID; + } + bool const have_end = header[3] > 1; auto count = read_u32(header + 4); while (count--) { - in.read(reinterpret_cast(header), 8); - if (!in.good()) return IO_ERROR; - size_t size = read_u64(header); + struct timespec end; + size_t size; + if (have_end) { + uint8_t size_end[20]; + in.read(reinterpret_cast(size_end), 20); + if (!in.good()) return IO_ERROR; + size = read_u64(size_end); + end.tv_sec = read_u64(size_end + 8); + end.tv_nsec = read_u32(size_end + 16); + } else { + in.read(reinterpret_cast(header), 8); + if (!in.good()) return IO_ERROR; + size = read_u64(header); + } if (size == 0) return INVALID; uint8_t buf[8192]; size_t avail = std::min(size, sizeof(buf)); @@ -108,15 +126,17 @@ PackagesReader::Status PackagesReader::read(std::istream& in, if (!in.good()) return IO_ERROR; pkg_size = read_package(&pkg, mem.get(), need); if (pkg_size == 0) return INVALID; - delegate->package(pkg); + if (!have_end) end = pkg.timestamp; + delegate->package(pkg, end); delegate->data(pkg.id, reinterpret_cast(mem.get()) + pkg_size, need - pkg_size, need == size); size -= need; } else { - delegate->package(pkg); - delegate->data(pkg.id, reinterpret_cast(buf) + pkg_size, - avail - pkg_size, avail == size); - size -= avail; + if (!have_end) end = pkg.timestamp; + delegate->package(pkg, end); + delegate->data(pkg.id, reinterpret_cast(buf) + pkg_size, + avail - pkg_size, avail == size); + size -= avail; } while (size) { avail = std::min(sizeof(buf), size); diff --git a/src/packages.hh b/src/packages.hh index 682a75b..a10e0bd 100644 --- a/src/packages.hh +++ b/src/packages.hh @@ -11,9 +11,11 @@ public: static PackagesWriter* create(std::ostream* out); - virtual void write(Package const& package, char const* data, size_t size) = 0; - void write(Package const& package, std::string const& data) { - write(package, data.data(), data.size()); + virtual void write(Package const& package, char const* data, size_t size, + struct timespec end) = 0; + void write(Package const& package, std::string const& data, + struct timespec end) { + write(package, data.data(), data.size(), end); } virtual void flush() = 0; @@ -35,7 +37,7 @@ public: public: virtual ~Delegate() {} - virtual void package(Package const& package) = 0; + virtual void package(Package const& package, struct timespec end) = 0; virtual void data(uint32_t id, char const* data, size_t size, bool last) = 0; diff --git a/src/proxy.cc b/src/proxy.cc index b1e5940..b8df00c 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -1929,30 +1929,35 @@ void ProxyImpl::send_attached_data(uint32_t id, void const* ptr, size_t size, } if (attached_.empty()) return; if (size == 0 && !last) return; - uint8_t data[10]; + uint8_t data[22]; data[0] = 'D'; data[1] = 'A'; data[2] = 'T'; write_u32(data + 5, id); + auto dur = looper_->now().time_since_epoch(); + auto sec = std::chrono::duration_cast(dur); + write_u64(data + 10, sec.count()); + write_u32(data + 18, std::chrono::duration_cast( + dur - sec).count()); if (size == 0) { assert(last); assert(ptr == nullptr); write_u16(data + 3, 10); data[9] = 0; - send_attached(data, 10, nullptr, 0); + send_attached(data, 22, nullptr, 0); } else { - size_t max = 0xffff - 10; + size_t max = 0xffff - 22; auto p = reinterpret_cast(ptr); data[9] = 1; while (size > max) { write_u16(data + 3, 0xffff); - send_attached(data, 10, p, max); + send_attached(data, 22, p, max); p += max; size -= max; } data[9] = last ? 0 : 1; - write_u16(data + 3, size + 10); - send_attached(data, 10, p, size); + write_u16(data + 3, size + 22); + send_attached(data, 22, p, size); } } -- cgit v1.2.3-70-g09d2