#!/usr/bin/env python
import argparse
import json
import sys
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse


def validate_schema(schema):
    if "format" not in schema:
        raise ValueError("No format")
    if schema["format"] != "clicks":
        raise ValueError(f"Unsupported format: {schema['format']}")
    if "version" not in schema:
        raise ValueError("No version")
    if schema["version"] != "1.0":
        raise ValueError(f"Unsupported version: {schema['version']}")
    # merge() relies on these keys, so reject schemas that lack them
    # instead of crashing halfway through the merge.
    for key in ("width", "height", "events"):
        if key not in schema:
            raise ValueError(f"Missing {key}")


def cleanup_url(url):
    # Strip every query parameter except activeTab, so reports for the same
    # page merge under one URL regardless of tracking parameters.
    parts = urlparse(url)
    query = parse_qs(parts.query, keep_blank_values=True)
    if "activeTab" in query:
        # parse_qs returns a list of values per key; doseq=True encodes each
        # value separately instead of encoding the list's repr.
        new_query = urlencode({"activeTab": query["activeTab"]}, doseq=True)
    else:
        new_query = ""
    parts = parts._replace(query=new_query)
    return urlunparse(parts)


def bad_event(event):
    # A known bug causes "header" events to be reported as click events.
    # "header" events can be recognized by a brand name in the tag or x > 1.
    return "Chromium" in event["tag"] or event["x"] > 1.0


def merge(input_files, output_fh):
    merged = {
        "format": "clicks",
        "version": "1.0",
    }
    schemas = []
    for input_file in input_files:
        with open(input_file, "r", encoding="utf-8") as in_fh:
            try:
                schema = json.load(in_fh)
                validate_schema(schema)
                schemas.append(schema)
            except json.JSONDecodeError:
                print(f"{input_file}: Invalid JSON, ignored", file=sys.stderr)
            except ValueError as err:
                print(f"{input_file}: {err}, ignored", file=sys.stderr)
    # The merged report covers the largest viewport seen in any input.
    max_width = 0.0
    max_height = 0.0
    for schema in schemas:
        max_width = max(max_width, schema["width"])
        max_height = max(max_height, schema["height"])
    merged["width"] = max_width
    merged["height"] = max_height
    # Group click events under the cleaned URL of the most recent url event,
    # so clicks on the same page from different reports end up together.
    url_events = {}
    for schema in schemas:
        events = None
        for event in schema["events"]:
            if "url" in event:
                url = cleanup_url(event["url"])
                entry = url_events.get(url)
                if entry is None:
                    entry = {"url": event["url"], "events": []}
                    url_events[url] = entry
                events = entry["events"]
            elif events is not None and not bad_event(event):
                # x and y are already fractions of the width and height, so
                # they scale to the merged dimensions without adjustment.
                # (Events before the first url event have no page to belong
                # to and are dropped.)
                events.append(event)
    merged["events"] = []
    for entry in url_events.values():
        merged["events"].append({"url": entry["url"]})
        merged["events"].extend(entry["events"])
    json.dump(merged, output_fh)
    output_fh.write("\n")  # json.dump emits no trailing newline


def main():
    parser = argparse.ArgumentParser(
        prog="merge",
        description="Merges multiple session reports into one",
    )
    parser.add_argument(
        "-o",
        "--output",
        help="output to FILE instead of stdout",
        metavar="FILE",
    )
    parser.add_argument("filename", nargs="+", help="input files")
    args = parser.parse_args()
    if args.output is not None:
        with open(args.output, "w", encoding="utf-8") as out:
            merge(args.filename, out)
    else:
        merge(args.filename, sys.stdout)


if __name__ == "__main__":
    main()
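
# Example usage (a minimal sketch: the input shape is inferred from the code
# above, and the file names, tag, and coordinates are made-up values):
#
#   $ cat a.json
#   {"format": "clicks", "version": "1.0", "width": 1280.0, "height": 800.0,
#    "events": [{"url": "https://example.com/app?activeTab=2&utm_source=mail"},
#               {"tag": "BUTTON", "x": 0.42, "y": 0.17}]}
#   $ python merge.py -o merged.json a.json b.json
#
# A click recorded in b.json on https://example.com/app?activeTab=2 lands under
# the same URL entry as the click above, because cleanup_url() drops every
# query parameter except activeTab.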