summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJoel Klinghed <the_jk@yahoo.com>2017-08-10 21:55:29 +0200
committerJoel Klinghed <the_jk@yahoo.com>2017-08-10 21:55:29 +0200
commitcc61af1e2c933f2178a5dc3e5a5599e74d380eb3 (patch)
tree80e034aeb4f4511f7ddb3cc990fb6eb942a657ab /src
parente3f7ecc6bdf7bbd1ae7ec10c387e15743842db90 (diff)
Add duration to packages and tpp format
Diffstat (limited to 'src')
-rw-r--r--src/monitor-cmd.cc19
-rw-r--r--src/monitor-gui.cc59
-rw-r--r--src/monitor.cc11
-rw-r--r--src/monitor.hh3
-rw-r--r--src/packages.cc48
-rw-r--r--src/packages.hh10
-rw-r--r--src/proxy.cc17
7 files changed, 123 insertions, 44 deletions
diff --git a/src/monitor-cmd.cc b/src/monitor-cmd.cc
index 580f45f..eb7245d 100644
--- a/src/monitor-cmd.cc
+++ b/src/monitor-cmd.cc
@@ -63,27 +63,28 @@ public:
}
void package_data(Monitor*, uint32_t id,
- char const* data, size_t size, bool last) override {
+ char const* data, size_t size, bool last,
+ struct timespec timestamp) override {
auto it = packages_.find(id);
if (it == packages_.end()) {
assert(false);
return;
}
if (interleave_) {
- print_package(it->second, last, data, size);
+ print_package(it->second, last, data, size, timestamp);
if (last) packages_.erase(it);
return;
}
auto buf = data_.find(id);
if (last) {
if (buf == data_.end()) {
- print_package(it->second, true, data, size);
+ print_package(it->second, true, data, size, timestamp);
} else {
buf->second->write(data, size);
size_t avail;
auto ptr = buf->second->read_ptr(&avail);
print_package(it->second, true,
- reinterpret_cast<char const*>(ptr), avail);
+ reinterpret_cast<char const*>(ptr), avail, timestamp);
data_.erase(buf);
}
packages_.erase(it);
@@ -97,17 +98,19 @@ public:
}
private:
- void print_package(Package& pkg, bool last, char const* data, size_t size) {
+ void print_package(Package& pkg, bool last, char const* data, size_t size,
+ struct timespec timestamp) {
if (size == 0 && !last) return;
if (writer_) {
assert(last);
- writer_->write(pkg, data, size);
+ writer_->write(pkg, data, size, timestamp);
return;
}
{
ios_save save(out_);
- out_ << "*** " << pkg.timestamp.tv_sec << '.'
- << std::setfill('0') << std::setw(9) << pkg.timestamp.tv_nsec
+ auto ts = interleave_ ? timestamp : pkg.timestamp;
+ out_ << "*** " << ts.tv_sec << '.'
+ << std::setfill('0') << std::setw(9) << ts.tv_nsec
<< '\n';
}
out_ << "* Source: " << pkg.source_host << ':' << pkg.source_port << '\n'
diff --git a/src/monitor-gui.cc b/src/monitor-gui.cc
index 61c720f..f4f80d5 100644
--- a/src/monitor-gui.cc
+++ b/src/monitor-gui.cc
@@ -126,9 +126,12 @@ public:
std::string from;
std::string to;
std::string size;
+ std::string duration;
std::string data;
+ struct timespec last;
+
size_t related;
Package()
@@ -141,7 +144,7 @@ public:
}
size_t columns() const override {
- return 4;
+ return 5;
}
std::string header(size_t column) const override {
@@ -154,6 +157,8 @@ public:
return "To";
case 3:
return "Data";
+ case 4:
+ return "Duration";
}
assert(false);
return "";
@@ -167,6 +172,7 @@ public:
case 1: return pkg.from;
case 2: return pkg.to;
case 3: return pkg.size;
+ case 4: return pkg.duration;
}
}
assert(false);
@@ -187,10 +193,12 @@ public:
packages_.emplace_back();
auto& pkg = packages_.back();
pkg.pkg = package;
+ pkg.last = package.timestamp;
format_timestamp(&pkg.timestamp, package.timestamp);
format_host_port(&pkg.from, package.source_host, package.source_port);
format_host_port(&pkg.to, package.target_host, package.target_port);
format_size(&pkg.size, 0, false);
+ format_duration(&pkg.duration, pkg.pkg.timestamp, pkg.last, false);
auto related = related_.find(pkg.to);
if (related == related_.end()) {
@@ -204,7 +212,8 @@ public:
notify_added(index, index);
}
- size_t package_data(uint32_t id, char const* data, size_t size, bool last) {
+ size_t package_data(uint32_t id, char const* data, size_t size, bool last,
+ struct timespec timestamp) {
auto it = open_.find(id);
if (it == open_.end()) {
return std::string::npos;
@@ -213,6 +222,8 @@ public:
auto& pkg = packages_[index];
pkg.data.append(data, size);
format_size(&pkg.size, pkg.data.size(), last);
+ pkg.last = timestamp;
+ format_duration(&pkg.duration, pkg.pkg.timestamp, pkg.last, last);
if (last) open_.erase(it);
notify_changed(index, index);
@@ -271,6 +282,31 @@ private:
out->assign(tmp, len);
}
+ static void format_duration(std::string* out, struct timespec const& start,
+ struct timespec const& end, bool) {
+ char tmp[50];
+ if (start.tv_sec == end.tv_sec) {
+ if (start.tv_nsec == end.tv_nsec) {
+ out->clear();
+ return;
+ }
+ auto len = snprintf(tmp, sizeof(tmp), "%lums",
+ static_cast<unsigned long>(
+ (end.tv_nsec - start.tv_nsec) / 1000000));
+ out->assign(tmp, len);
+ } else {
+ auto s = end.tv_sec - start.tv_sec;
+ auto n = (end.tv_nsec - start.tv_nsec) / 1000000;
+ if (n < 0) {
+ --s;
+ n += 1000;
+ }
+ auto len = snprintf(tmp, sizeof(tmp), "%ld.%03lus",
+ static_cast<long>(s), static_cast<unsigned long>(n));
+ out->assign(tmp, len);
+ }
+ }
+
void notify_added(size_t first, size_t last) {
auto it = listeners_.notify();
while (it.has_next()) {
@@ -689,16 +725,21 @@ private:
: packages_(packages) {
}
- void package(Package const& package) override {
+ void package(Package const& package, struct timespec end) override {
packages_->package(package);
+ id_ = package.id;
+ timestamp_ = end;
}
void data(uint32_t id, char const* data, size_t size, bool last) override {
- packages_->package_data(id, data, size, last);
+ assert(id_ == id);
+ packages_->package_data(id, data, size, last, timestamp_);
}
private:
PackageList* const packages_;
+ uint32_t id_;
+ struct timespec timestamp_;
};
public:
@@ -1177,9 +1218,10 @@ public:
}
void package_data(Monitor* monitor, uint32_t id,
- char const* data, size_t size, bool last) override {
+ char const* data, size_t size, bool last,
+ struct timespec timestamp) override {
assert(monitor == monitor_.get());
- auto index = packages_->package_data(id, data, size, last);
+ auto index = packages_->package_data(id, data, size, last, timestamp);
auto const& pkg = packages_->package(index);
protocols_->update(index, pkg.data.data(), pkg.data.size());
if (has_selection_ && index == selection_) {
@@ -1416,7 +1458,8 @@ private:
packages->package(pkg);
packages->package_data(pkg.id, reinterpret_cast<char const*>(data)
+ offset + len_ip + len_tcp,
- len - offset - len_ip - len_tcp, true);
+ len - offset - len_ip - len_tcp, true,
+ pkg.timestamp);
} else if (ret == -2) {
break;
} else if (ret == -1) {
@@ -1638,7 +1681,7 @@ private:
auto writer = std::unique_ptr<PackagesWriter>(PackagesWriter::create(&out));
for (size_t i = 0; i < packages->rows(); ++i) {
auto const& pkg = packages->package(i);
- writer->write(pkg.pkg, pkg.data);
+ writer->write(pkg.pkg, pkg.data, pkg.last);
}
writer->flush();
if (!out.good()) {
diff --git a/src/monitor.cc b/src/monitor.cc
index e07b036..cffe3d5 100644
--- a/src/monitor.cc
+++ b/src/monitor.cc
@@ -179,18 +179,23 @@ private:
delegate_->package_data(
this, pkg.id,
reinterpret_cast<char*>(package_) + offset + o, size - o,
- last);
+ last, pkg.timestamp);
}
}
- } else if (size >= 10 && memcmp(package_ + offset, "DAT", 3) == 0) {
+ } else if (size >= 22 && memcmp(package_ + offset, "DAT", 3) == 0) {
uint32_t id = read_u32(package_ + offset + o);
o += 4;
uint8_t flags = package_[offset + o];
++o;
+ struct timespec ts;
+ ts.tv_sec = read_u64(package_ + offset + o);
+ o += 8;
+ ts.tv_nsec = read_u32(package_ + offset + o);
+ o += 4;
delegate_->package_data(
this, id,
reinterpret_cast<char*>(package_) + offset + o, size - o,
- !(flags & 0x01));
+ !(flags & 0x01), ts);
}
offset += size;
}
diff --git a/src/monitor.hh b/src/monitor.hh
index 7c6cc40..c3504d0 100644
--- a/src/monitor.hh
+++ b/src/monitor.hh
@@ -33,7 +33,8 @@ public:
virtual void error(Monitor* monitor, std::string const& error) = 0;
virtual void package(Monitor* monitor, Package const& package) = 0;
virtual void package_data(Monitor* monitor, uint32_t id,
- char const* data, size_t size, bool last) = 0;
+ char const* data, size_t size, bool last,
+ struct timespec timestamp) = 0;
protected:
Delegate() {}
diff --git a/src/packages.cc b/src/packages.cc
index 70f9de4..6aa7500 100644
--- a/src/packages.cc
+++ b/src/packages.cc
@@ -18,7 +18,7 @@ public:
: open_(true), count_(0), out_(out) {
uint8_t header[8];
memcpy(header, "TPP", 3);
- header[3] = 0x1; // Version
+ header[3] = 0x2; // Version
write_u32(header + 4, 0); // Count
out->write(reinterpret_cast<char*>(header), 8);
}
@@ -29,7 +29,8 @@ public:
}
}
- void write(Package const& package, char const* data, size_t size) override {
+ void write(Package const& package, char const* data, size_t size,
+ struct timespec end) override {
if (!open_) {
assert(false);
return;
@@ -43,9 +44,11 @@ public:
ptr = backup.get();
write_package(package, ptr, need);
}
- uint8_t size_data[8];
- write_u64(size_data, need + size);
- out_->write(reinterpret_cast<char*>(size_data), 8);
+ uint8_t size_end[20];
+ write_u64(size_end, need + size);
+ write_u64(size_end + 8, end.tv_sec);
+ write_u32(size_end + 16, end.tv_nsec);
+ out_->write(reinterpret_cast<char*>(size_end), 20);
out_->write(reinterpret_cast<char*>(ptr), need);
backup.reset();
out_->write(data, size);
@@ -84,14 +87,29 @@ PackagesReader::Status PackagesReader::read(std::istream& in,
Delegate* delegate) {
uint8_t header[8];
in.read(reinterpret_cast<char*>(header), 8);
- if (!in.good() || memcmp(header, "TPP", 3) || header[3] != 1) {
+ if (!in.good() || memcmp(header, "TPP", 3)) {
return INVALID;
}
+ if (header[3] < 1 || header[3] > 2) {
+ return INVALID;
+ }
+ bool const have_end = header[3] > 1;
auto count = read_u32(header + 4);
while (count--) {
- in.read(reinterpret_cast<char*>(header), 8);
- if (!in.good()) return IO_ERROR;
- size_t size = read_u64(header);
+ struct timespec end;
+ size_t size;
+ if (have_end) {
+ uint8_t size_end[20];
+ in.read(reinterpret_cast<char*>(size_end), 20);
+ if (!in.good()) return IO_ERROR;
+ size = read_u64(size_end);
+ end.tv_sec = read_u64(size_end + 8);
+ end.tv_nsec = read_u32(size_end + 16);
+ } else {
+ in.read(reinterpret_cast<char*>(header), 8);
+ if (!in.good()) return IO_ERROR;
+ size = read_u64(header);
+ }
if (size == 0) return INVALID;
uint8_t buf[8192];
size_t avail = std::min(size, sizeof(buf));
@@ -108,15 +126,17 @@ PackagesReader::Status PackagesReader::read(std::istream& in,
if (!in.good()) return IO_ERROR;
pkg_size = read_package(&pkg, mem.get(), need);
if (pkg_size == 0) return INVALID;
- delegate->package(pkg);
+ if (!have_end) end = pkg.timestamp;
+ delegate->package(pkg, end);
delegate->data(pkg.id, reinterpret_cast<char*>(mem.get())
+ pkg_size, need - pkg_size, need == size);
size -= need;
} else {
- delegate->package(pkg);
- delegate->data(pkg.id, reinterpret_cast<char*>(buf) + pkg_size,
- avail - pkg_size, avail == size);
- size -= avail;
+ if (!have_end) end = pkg.timestamp;
+ delegate->package(pkg, end);
+ delegate->data(pkg.id, reinterpret_cast<char*>(buf) + pkg_size,
+ avail - pkg_size, avail == size);
+ size -= avail;
}
while (size) {
avail = std::min(sizeof(buf), size);
diff --git a/src/packages.hh b/src/packages.hh
index 682a75b..a10e0bd 100644
--- a/src/packages.hh
+++ b/src/packages.hh
@@ -11,9 +11,11 @@ public:
static PackagesWriter* create(std::ostream* out);
- virtual void write(Package const& package, char const* data, size_t size) = 0;
- void write(Package const& package, std::string const& data) {
- write(package, data.data(), data.size());
+ virtual void write(Package const& package, char const* data, size_t size,
+ struct timespec end) = 0;
+ void write(Package const& package, std::string const& data,
+ struct timespec end) {
+ write(package, data.data(), data.size(), end);
}
virtual void flush() = 0;
@@ -35,7 +37,7 @@ public:
public:
virtual ~Delegate() {}
- virtual void package(Package const& package) = 0;
+ virtual void package(Package const& package, struct timespec end) = 0;
virtual void data(uint32_t id, char const* data, size_t size,
bool last) = 0;
diff --git a/src/proxy.cc b/src/proxy.cc
index b1e5940..b8df00c 100644
--- a/src/proxy.cc
+++ b/src/proxy.cc
@@ -1929,30 +1929,35 @@ void ProxyImpl::send_attached_data(uint32_t id, void const* ptr, size_t size,
}
if (attached_.empty()) return;
if (size == 0 && !last) return;
- uint8_t data[10];
+ uint8_t data[22];
data[0] = 'D';
data[1] = 'A';
data[2] = 'T';
write_u32(data + 5, id);
+ auto dur = looper_->now().time_since_epoch();
+ auto sec = std::chrono::duration_cast<std::chrono::seconds>(dur);
+ write_u64(data + 10, sec.count());
+ write_u32(data + 18, std::chrono::duration_cast<std::chrono::nanoseconds>(
+ dur - sec).count());
if (size == 0) {
assert(last);
assert(ptr == nullptr);
write_u16(data + 3, 10);
data[9] = 0;
- send_attached(data, 10, nullptr, 0);
+ send_attached(data, 22, nullptr, 0);
} else {
- size_t max = 0xffff - 10;
+ size_t max = 0xffff - 22;
auto p = reinterpret_cast<char const*>(ptr);
data[9] = 1;
while (size > max) {
write_u16(data + 3, 0xffff);
- send_attached(data, 10, p, max);
+ send_attached(data, 22, p, max);
p += max;
size -= max;
}
data[9] = last ? 0 : 1;
- write_u16(data + 3, size + 10);
- send_attached(data, 10, p, size);
+ write_u16(data + 3, size + 22);
+ send_attached(data, 22, p, size);
}
}