#!/usr/bin/env python
"""WebSocket server that records click sessions as JSON files.

Each connection produces one ``session-<timestamp>.json`` file in the output
directory. The wire protocol is plain text with ``|`` as field separator:

* first message (header): ``width|height|brands`` -- ``brands`` is the rest
  of the message and may itself contain ``|``;
* every later message (event) is either ``u|<url>`` or ``x|y|tag``.

Messages that cannot be parsed as events are skipped. A connection whose
header cannot be parsed is dropped without leaving a file behind.
"""

import argparse
import asyncio
import json
import math
import sys
from datetime import datetime, timezone
from pathlib import Path

from websockets.asyncio.server import serve
# ConnectionClosedOK is no longer used below; it stays imported so the
# module keeps exposing the same names as before.
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK  # noqa: F401


class Context:
    """Holds the output directory and hands out per-session file paths."""

    def __init__(self, output):
        """Remember *output* (a ``Path``) and create it if it is missing."""
        self.output = output
        output.mkdir(parents=True, exist_ok=True)

    def next(self):
        """Return a path in the output directory named after the UTC time."""
        ts = datetime.now(timezone.utc).timestamp()
        return self.output / f"session-{ts}.json"


def quote(instr):
    """Return *instr* encoded as a JSON literal."""
    return json.dumps(instr)


def _parse_number(text):
    """Return *text* as a finite float, or None if it is not one.

    Non-finite values are rejected because they would be written to the
    session file as ``nan``/``inf``, which are not valid JSON.
    """
    try:
        value = float(text)
    except ValueError:
        return None
    return value if math.isfinite(value) else None


def parse_header(message):
    """Parse a ``width|height|brands`` header message.

    Returns a dict with keys ``width``, ``height`` (floats) and ``brands``
    (str), or None when the message is not text, has fewer than three
    fields, or has a width/height that is not a finite number.
    """
    if not isinstance(message, str):
        return None
    # maxsplit=2 keeps any further '|' characters inside the brands field.
    parts = message.split('|', 2)
    if len(parts) < 3:
        return None
    width = _parse_number(parts[0])
    height = _parse_number(parts[1])
    if width is None or height is None:
        return None
    return {
        'width': width,
        'height': height,
        'brands': parts[2],
    }


def parse_event(message):
    """Parse an event message.

    ``u|<url>`` yields ``{'type': 'url', 'url': ...}`` where the URL is
    everything after the first ``|``. ``x|y|tag`` yields
    ``{'type': 'click', 'x': ..., 'y': ..., 'tag': ...}``; fields after the
    third are ignored. Returns None when the message is not text, has too
    few fields, or has coordinates that are not finite numbers.
    """
    if not isinstance(message, str):
        return None
    kind, sep, rest = message.partition('|')
    if kind == 'u' and sep:
        return {
            'type': 'url',
            'url': rest,
        }
    parts = message.split('|')
    if len(parts) < 3:
        return None
    x = _parse_number(parts[0])
    y = _parse_number(parts[1])
    if x is None or y is None:
        return None
    return {
        'type': 'click',
        'x': x,
        'y': y,
        'tag': parts[2],
    }


def _format_header(header):
    """Return the opening text of a session file for a parsed header."""
    return (
        '{ "format":"clicks", "version":"1.0",'
        f' "width": {header["width"]}, "height": {header["height"]},'
        f' "brands": {quote(header["brands"])}, "events": ['
    )


def _format_event(event):
    """Return the JSON object text for one parsed event."""
    if event['type'] == 'click':
        return (
            f'{{ "x": {event["x"]}, "y": {event["y"]},'
            f' "tag": {quote(event["tag"])} }}'
        )
    return f'{{"url": {quote(event["url"])}}}'


async def handler(context, websocket):
    """Record one connection's events into a new session file.

    The header is received and validated before the file is created, so a
    connection that closes early or sends a bad header leaves nothing on
    disk. Once the file exists, the closing ``]}`` is always written, even
    if the connection ends abnormally or the handler is cancelled.
    """
    filename = context.next()

    try:
        message = await websocket.recv()
    except ConnectionClosed:
        return

    header = parse_header(message)
    if not header:
        print(f"Bad header: {message}", file=sys.stderr)
        return

    with open(filename, "w", encoding="utf-8") as session_file:
        print(_format_header(header), file=session_file)
        separator = ''
        try:
            while True:
                try:
                    message = await websocket.recv()
                except ConnectionClosed:
                    # Normal and abnormal closes both end the session.
                    break
                event = parse_event(message)
                if not event:
                    continue
                print(f'{separator}{_format_event(event)}', file=session_file)
                separator = ','
        finally:
            # Terminate the JSON document no matter how the loop ended.
            print(']}', file=session_file)


async def server(output):
    """Serve on 127.0.0.1:8001, writing sessions under *output*, forever."""
    context = Context(output)
    async with serve(lambda x: handler(context, x), "127.0.0.1", 8001):
        # A future that is never resolved keeps the server running.
        await asyncio.get_running_loop().create_future()


def main():
    """Parse the command line and run the server."""
    parser = argparse.ArgumentParser(
        description='Example server')
    parser.add_argument('output', type=Path, help='output directory')
    args = parser.parse_args()
    asyncio.run(server(args.output))


if __name__ == "__main__":
    main()