From 48746b7409ad4454d76b5a84c4d3b895c563a8a6 Mon Sep 17 00:00:00 2001 From: Hirviturkki Date: Wed, 23 Apr 2025 22:36:34 +0200 Subject: [PATCH] Updated server --- server.py | 27 +++++++++-- server_old_new.py | 116 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+), 5 deletions(-) create mode 100644 server_old_new.py diff --git a/server.py b/server.py index ce1e4c0..28e077c 100644 --- a/server.py +++ b/server.py @@ -18,6 +18,8 @@ CLIENT_LABELS_TS: dict[str, float] = {} # After this many seconds without an update, we'll clear their list STALE_TIMEOUT = 5.0 +broadcast = False + async def handler(ws: websockets.WebSocketServerProtocol): peer_id = None try: @@ -50,9 +52,21 @@ async def handler(ws: websockets.WebSocketServerProtocol): logging.info(f"Got labels from {peer_id}: {msg['labels']}") labels = msg.get("labels") if peer_id and isinstance(labels, list): - CLIENT_LABELS[peer_id] = labels - CLIENT_LABELS_TS[peer_id] = time.monotonic() - logging.debug("Updated labels for %s: %s", peer_id, labels) + + payload = json.dumps({ + "type": "broadcast_labels", + "from": peer_id, + "labels": labels + }) + + # send to all except the originator + count = 0 + for other_id, other_ws in CLIENTS.items(): + if other_id != peer_id: + await other_ws.send(payload) + count += 1 + + logging.info(f"Re-broadcasted labels to {count} other clients (excluding {peer_id})") else: logging.warning("Malformed labels payload from %s: %s", peer_id, raw) @@ -102,14 +116,17 @@ async def broadcast_labels_periodically(): async def main(): # start the broadcaster task - broadcaster = asyncio.create_task(broadcast_labels_periodically()) + broadcaster = None + if broadcast: + broadcaster = asyncio.create_task(broadcast_labels_periodically()) # start the websocket server async with websockets.serve(handler, "0.0.0.0", 8080): logging.info("Signalling server listening on :8080") await asyncio.Future() # run forever - broadcaster.cancel() + if broadcast: + broadcaster.cancel() if __name__ == "__main__": diff --git a/server_old_new.py b/server_old_new.py new file mode 100644 index 0000000..ce1e4c0 --- /dev/null +++ b/server_old_new.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +""" +Signalling relay + periodic label aggregation broadcaster with stale‑timeout. +""" + +import asyncio, json, logging, time +import websockets + +logging.basicConfig(level=logging.INFO) + +# ws → peer_id mapping, and peer_id → ws +CLIENTS: dict[str, websockets.WebSocketServerProtocol] = {} +# peer_id → latest labels list +CLIENT_LABELS: dict[str, list[str]] = {} +# peer_id → timestamp of last labels update +CLIENT_LABELS_TS: dict[str, float] = {} + +# After this many seconds without an update, we'll clear their list +STALE_TIMEOUT = 5.0 + +async def handler(ws: websockets.WebSocketServerProtocol): + peer_id = None + try: + async for raw in ws: + try: + msg = json.loads(raw) + except json.JSONDecodeError: + logging.warning("Bad JSON from %s: %s", peer_id, raw) + continue + + mtype = msg.get("type") + if mtype == "register": + peer_id = msg.get("id") + if not peer_id: + await ws.close(code=4000, reason="Missing id") + return + CLIENTS[peer_id] = ws + CLIENT_LABELS[peer_id] = [] + CLIENT_LABELS_TS[peer_id] = time.monotonic() + logging.info("Registered %s (%d clients)", peer_id, len(CLIENTS)) + + elif mtype == "signal": + target = msg.get("target") + if target in CLIENTS: + await CLIENTS[target].send(raw) + else: + logging.warning("Target %s not found", target) + + elif mtype == "labels": + logging.info(f"Got labels from {peer_id}: {msg['labels']}") + labels = msg.get("labels") + if peer_id and isinstance(labels, list): + CLIENT_LABELS[peer_id] = labels + CLIENT_LABELS_TS[peer_id] = time.monotonic() + logging.debug("Updated labels for %s: %s", peer_id, labels) + else: + logging.warning("Malformed labels payload from %s: %s", peer_id, raw) + + else: + logging.warning("Unknown message type from %s: %s", peer_id, mtype) + + except websockets.ConnectionClosed: + pass + + finally: + if peer_id: + CLIENTS.pop(peer_id, None) + CLIENT_LABELS.pop(peer_id, None) + CLIENT_LABELS_TS.pop(peer_id, None) + logging.info("Disconnected %s (%d clients left)", peer_id, len(CLIENTS)) + + +async def broadcast_labels_periodically(): + while True: + now = time.monotonic() + + # 1) clear stale clients + for pid, last_ts in list(CLIENT_LABELS_TS.items()): + if now - last_ts > STALE_TIMEOUT: + + CLIENT_LABELS[pid] = [] + # Optionally also remove their timestamp entry if you don't + # want them checked again until next registration/update: + # CLIENT_LABELS_TS.pop(pid, None) + logging.debug("Cleared labels for %s due to timeout", pid) + + # 2) broadcast nested lists + if CLIENTS: + nested = list(CLIENT_LABELS.values()) + payload = json.dumps({ + "type": "broadcast_labels", + "data": nested + }) + await asyncio.gather(*( + ws.send(payload) for ws in CLIENTS.values() + ), return_exceptions=True) + logging.debug("Broadcasted labels to %d clients", len(CLIENTS)) + logging.info(f"Broadcasted labels >>>>>> {payload}") + + await asyncio.sleep(1) + + +async def main(): + # start the broadcaster task + broadcaster = asyncio.create_task(broadcast_labels_periodically()) + + # start the websocket server + async with websockets.serve(handler, "0.0.0.0", 8080): + logging.info("Signalling server listening on :8080") + await asyncio.Future() # run forever + + broadcaster.cancel() + + +if __name__ == "__main__": + asyncio.run(main())