#!/usr/bin/env python
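"""Merge multiple click-session reports into a single report.

Input files are JSON documents in the "clicks" format, version "1.0".
The shape sketched below is inferred from the validation and merge logic
in this script, so treat it as illustrative rather than normative:

    {
        "format": "clicks",
        "version": "1.0",
        "width": 1280.0,
        "height": 720.0,
        "events": [
            {"url": "https://example.com/page?activeTab=2"},
            {"tag": "BUTTON", "x": 0.4, "y": 0.7}
        ]
    }

Example invocation:

    merge -o merged.json session1.json session2.json
"""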
import argparse
import json
import sys
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse


def validate_schema(schema):
    """Raise ValueError if schema is not a version 1.0 "clicks" report."""
    if "format" not in schema:
        raise ValueError("Missing 'format' field")
    if schema["format"] != "clicks":
        raise ValueError(f"Unsupported format: {schema['format']}")
    if "version" not in schema:
        raise ValueError("Missing 'version' field")
    if schema["version"] != "1.0":
        raise ValueError(f"Unsupported version: {schema['version']}")


def cleanup_url(url):
    """Strip every query parameter except "activeTab" from a URL, e.g.
    "/p?activeTab=2&session=abc" becomes "/p?activeTab=2"."""
    parts = urlparse(url)
    query = parse_qs(parts.query, keep_blank_values=True)
    if "activeTab" in query:
        # parse_qs returns a list of values per key; doseq=True encodes
        # the list back as (possibly repeated) query parameters.
        new_query = urlencode({"activeTab": query["activeTab"]}, doseq=True)
    else:
        new_query = ""
    parts = parts._replace(query=new_query)
    return urlunparse(parts)


def bad_event(event):
    # There is a confusing bug causing "header" events to be reported as
    # click events. "header" events can be recognized by a browser brand
    # string such as "Chromium" in the tag, or by an x coordinate
    # greater than 1.
    return "Chromium" in event["tag"] or event["x"] > 1.0


def merge(input_files, output_fh):
    """Read and validate the input reports, merge them, and write the
    merged report as JSON to output_fh."""
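    # Start from a header that validate_schema() itself would accept.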
merged = {
"format": "clicks",
"version": "1.0",
}
schemas = []
for input_file in input_files:
with open(input_file, "r", encoding="utf-8") as in_fh:
try:
schema = json.load(in_fh)
validate_schema(schema)
schemas.append(schema)
except json.JSONDecodeError:
print(f"{input_file}: Invalid JSON, ignored", file=sys.stderr)
continue
            except ValueError as exc:
                # Schema validation failed; this is not a JSON syntax
                # error, so report the actual reason.
                print(f"{input_file}: {exc}, ignored", file=sys.stderr)
                continue
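    # Assumption: the merged canvas should cover every input canvas, so
    # use the maximum width and height seen across all schemas.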
max_width = 0.0
max_height = 0.0
for schema in schemas:
if schema["width"] > max_width:
max_width = schema["width"]
if schema["height"] > max_height:
max_height = schema["height"]
merged["width"] = max_width
merged["height"] = max_height
url_events = {}
for schema in schemas:
events = None
for event in schema["events"]:
if "url" in event:
url = cleanup_url(event["url"])
events = url_events.get(url, None)
if events is None:
events = []
url_events[url] = {
"url": event["url"],
"events": events,
}
else:
events = events["events"]
            elif events is not None and not bad_event(event):
                # x and y are fractions of the page width and height, so
                # they merge across canvas sizes without rescaling. Events
                # before the first URL event have no page and are skipped.
                events.append(event)
merged["events"] = []
    for group in url_events.values():
        # Each group contributes a URL marker event followed by its clicks.
        merged["events"].append({"url": group["url"]})
        merged["events"].extend(group["events"])
json.dump(merged, output_fh)


def main():
parser = argparse.ArgumentParser(
prog="merge",
description="Merges multiple session reports into one",
)
parser.add_argument(
"-o",
"--output",
nargs="?",
help="output to FILE instead of stdout",
metavar="FILE",
)
parser.add_argument("filename", nargs="+", help="input files")
args = parser.parse_args()
if args.output is not None:
with open(args.output, "w", encoding="utf-8") as out:
merge(args.filename, out)
else:
merge(args.filename, sys.stdout)


if __name__ == "__main__":
    main()