diff --git a/server.py b/server.py index 6525bb3..b34b3a8 100644 --- a/server.py +++ b/server.py @@ -1,18 +1,22 @@ #!/usr/bin/env python3 """ -Minimal signalling relay for WebRTC data‑channel projects. - -Contract: -• Client sends {"type":"register","id": "..."} once on connect. -• Later it sends {"type":"signal","from": "...", "target": "...", "data": {...}} - The server forwards that JSON verbatim to the socket whose id == target. +Signalling relay + periodic label aggregation broadcaster with stale‑timeout. """ -import asyncio, json, logging +import asyncio, json, logging, time import websockets logging.basicConfig(level=logging.INFO) + +# ws → peer_id mapping, and peer_id → ws CLIENTS: dict[str, websockets.WebSocketServerProtocol] = {} +# peer_id → latest labels list +CLIENT_LABELS: dict[str, list[str]] = {} +# peer_id → timestamp of last labels update +CLIENT_LABELS_TS: dict[str, float] = {} + +# After this many seconds without an update, we'll clear their list +STALE_TIMEOUT = 5.0 async def handler(ws: websockets.WebSocketServerProtocol): peer_id = None @@ -31,31 +35,79 @@ async def handler(ws: websockets.WebSocketServerProtocol): await ws.close(code=4000, reason="Missing id") return CLIENTS[peer_id] = ws + CLIENT_LABELS[peer_id] = [] + CLIENT_LABELS_TS[peer_id] = time.monotonic() logging.info("Registered %s (%d clients)", peer_id, len(CLIENTS)) elif mtype == "signal": target = msg.get("target") - if not target: - continue - dest = CLIENTS.get(target) - if dest: - await dest.send(raw) # relay as‑is - logging.debug("Relayed %s → %s", peer_id, target) + if target in CLIENTS: + await CLIENTS[target].send(raw) else: logging.warning("Target %s not found", target) + elif mtype == "labels": + labels = msg.get("labels") + if peer_id and isinstance(labels, list): + CLIENT_LABELS[peer_id] = labels + CLIENT_LABELS_TS[peer_id] = time.monotonic() + logging.debug("Updated labels for %s: %s", peer_id, labels) + else: + logging.warning("Malformed labels payload from %s: %s", peer_id, raw) + + else: + logging.warning("Unknown message type from %s: %s", peer_id, mtype) + except websockets.ConnectionClosed: pass + finally: - # tidy up on disconnect - if peer_id and CLIENTS.get(peer_id) is ws: - CLIENTS.pop(peer_id) + if peer_id: + CLIENTS.pop(peer_id, None) + CLIENT_LABELS.pop(peer_id, None) + CLIENT_LABELS_TS.pop(peer_id, None) logging.info("Disconnected %s (%d clients left)", peer_id, len(CLIENTS)) + +async def broadcast_labels_periodically(): + while True: + now = time.monotonic() + + # 1) clear stale clients + for pid, last_ts in list(CLIENT_LABELS_TS.items()): + if now - last_ts > STALE_TIMEOUT: + CLIENT_LABELS[pid] = [] + # Optionally also remove their timestamp entry if you don't + # want them checked again until next registration/update: + # CLIENT_LABELS_TS.pop(pid, None) + logging.debug("Cleared labels for %s due to timeout", pid) + + # 2) broadcast nested lists + if CLIENTS: + nested = list(CLIENT_LABELS.values()) + payload = json.dumps({ + "type": "broadcast_labels", + "data": nested + }) + await asyncio.gather(*( + ws.send(payload) for ws in CLIENTS.values() + ), return_exceptions=True) + logging.debug("Broadcasted labels to %d clients", len(CLIENTS)) + + await asyncio.sleep(1) + + async def main(): - async with websockets.serve(handler, host="0.0.0.0", port=8080): + # start the broadcaster task + broadcaster = asyncio.create_task(broadcast_labels_periodically()) + + # start the websocket server + async with websockets.serve(handler, "0.0.0.0", 8080): logging.info("Signalling server listening on :8080") - await asyncio.Future() # run forever + await asyncio.Future() # run forever + + broadcaster.cancel() + if __name__ == "__main__": asyncio.run(main()) diff --git a/server_old.py b/server_old.py new file mode 100644 index 0000000..6525bb3 --- /dev/null +++ b/server_old.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +""" +Minimal signalling relay for WebRTC data‑channel projects. + +Contract: +• Client sends {"type":"register","id": "..."} once on connect. +• Later it sends {"type":"signal","from": "...", "target": "...", "data": {...}} + The server forwards that JSON verbatim to the socket whose id == target. +""" + +import asyncio, json, logging +import websockets + +logging.basicConfig(level=logging.INFO) +CLIENTS: dict[str, websockets.WebSocketServerProtocol] = {} + +async def handler(ws: websockets.WebSocketServerProtocol): + peer_id = None + try: + async for raw in ws: + try: + msg = json.loads(raw) + except json.JSONDecodeError: + logging.warning("Bad JSON from %s: %s", peer_id, raw) + continue + + mtype = msg.get("type") + if mtype == "register": + peer_id = msg.get("id") + if not peer_id: + await ws.close(code=4000, reason="Missing id") + return + CLIENTS[peer_id] = ws + logging.info("Registered %s (%d clients)", peer_id, len(CLIENTS)) + + elif mtype == "signal": + target = msg.get("target") + if not target: + continue + dest = CLIENTS.get(target) + if dest: + await dest.send(raw) # relay as‑is + logging.debug("Relayed %s → %s", peer_id, target) + else: + logging.warning("Target %s not found", target) + + except websockets.ConnectionClosed: + pass + finally: + # tidy up on disconnect + if peer_id and CLIENTS.get(peer_id) is ws: + CLIENTS.pop(peer_id) + logging.info("Disconnected %s (%d clients left)", peer_id, len(CLIENTS)) + +async def main(): + async with websockets.serve(handler, host="0.0.0.0", port=8080): + logging.info("Signalling server listening on :8080") + await asyncio.Future() # run forever + +if __name__ == "__main__": + asyncio.run(main())