Updates server logic
All checks were successful
Build and Push Multi-Platform Docker Image / build-and-push (push) Successful in 57s
All checks were successful
Build and Push Multi-Platform Docker Image / build-and-push (push) Successful in 57s
This commit is contained in:
88
server.py
88
server.py
@@ -1,18 +1,22 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
Minimal signalling relay for WebRTC data‑channel projects.
|
Signalling relay + periodic label aggregation broadcaster with stale‑timeout.
|
||||||
|
|
||||||
Contract:
|
|
||||||
• Client sends {"type":"register","id": "..."} once on connect.
|
|
||||||
• Later it sends {"type":"signal","from": "...", "target": "...", "data": {...}}
|
|
||||||
The server forwards that JSON verbatim to the socket whose id == target.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio, json, logging
|
import asyncio, json, logging, time
|
||||||
import websockets
|
import websockets
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
|
||||||
|
# ws → peer_id mapping, and peer_id → ws
|
||||||
CLIENTS: dict[str, websockets.WebSocketServerProtocol] = {}
|
CLIENTS: dict[str, websockets.WebSocketServerProtocol] = {}
|
||||||
|
# peer_id → latest labels list
|
||||||
|
CLIENT_LABELS: dict[str, list[str]] = {}
|
||||||
|
# peer_id → timestamp of last labels update
|
||||||
|
CLIENT_LABELS_TS: dict[str, float] = {}
|
||||||
|
|
||||||
|
# After this many seconds without an update, we'll clear their list
|
||||||
|
STALE_TIMEOUT = 5.0
|
||||||
|
|
||||||
async def handler(ws: websockets.WebSocketServerProtocol):
|
async def handler(ws: websockets.WebSocketServerProtocol):
|
||||||
peer_id = None
|
peer_id = None
|
||||||
@@ -31,31 +35,79 @@ async def handler(ws: websockets.WebSocketServerProtocol):
|
|||||||
await ws.close(code=4000, reason="Missing id")
|
await ws.close(code=4000, reason="Missing id")
|
||||||
return
|
return
|
||||||
CLIENTS[peer_id] = ws
|
CLIENTS[peer_id] = ws
|
||||||
|
CLIENT_LABELS[peer_id] = []
|
||||||
|
CLIENT_LABELS_TS[peer_id] = time.monotonic()
|
||||||
logging.info("Registered %s (%d clients)", peer_id, len(CLIENTS))
|
logging.info("Registered %s (%d clients)", peer_id, len(CLIENTS))
|
||||||
|
|
||||||
elif mtype == "signal":
|
elif mtype == "signal":
|
||||||
target = msg.get("target")
|
target = msg.get("target")
|
||||||
if not target:
|
if target in CLIENTS:
|
||||||
continue
|
await CLIENTS[target].send(raw)
|
||||||
dest = CLIENTS.get(target)
|
|
||||||
if dest:
|
|
||||||
await dest.send(raw) # relay as‑is
|
|
||||||
logging.debug("Relayed %s → %s", peer_id, target)
|
|
||||||
else:
|
else:
|
||||||
logging.warning("Target %s not found", target)
|
logging.warning("Target %s not found", target)
|
||||||
|
|
||||||
|
elif mtype == "labels":
|
||||||
|
labels = msg.get("labels")
|
||||||
|
if peer_id and isinstance(labels, list):
|
||||||
|
CLIENT_LABELS[peer_id] = labels
|
||||||
|
CLIENT_LABELS_TS[peer_id] = time.monotonic()
|
||||||
|
logging.debug("Updated labels for %s: %s", peer_id, labels)
|
||||||
|
else:
|
||||||
|
logging.warning("Malformed labels payload from %s: %s", peer_id, raw)
|
||||||
|
|
||||||
|
else:
|
||||||
|
logging.warning("Unknown message type from %s: %s", peer_id, mtype)
|
||||||
|
|
||||||
except websockets.ConnectionClosed:
|
except websockets.ConnectionClosed:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# tidy up on disconnect
|
if peer_id:
|
||||||
if peer_id and CLIENTS.get(peer_id) is ws:
|
CLIENTS.pop(peer_id, None)
|
||||||
CLIENTS.pop(peer_id)
|
CLIENT_LABELS.pop(peer_id, None)
|
||||||
|
CLIENT_LABELS_TS.pop(peer_id, None)
|
||||||
logging.info("Disconnected %s (%d clients left)", peer_id, len(CLIENTS))
|
logging.info("Disconnected %s (%d clients left)", peer_id, len(CLIENTS))
|
||||||
|
|
||||||
|
|
||||||
|
async def broadcast_labels_periodically():
|
||||||
|
while True:
|
||||||
|
now = time.monotonic()
|
||||||
|
|
||||||
|
# 1) clear stale clients
|
||||||
|
for pid, last_ts in list(CLIENT_LABELS_TS.items()):
|
||||||
|
if now - last_ts > STALE_TIMEOUT:
|
||||||
|
CLIENT_LABELS[pid] = []
|
||||||
|
# Optionally also remove their timestamp entry if you don't
|
||||||
|
# want them checked again until next registration/update:
|
||||||
|
# CLIENT_LABELS_TS.pop(pid, None)
|
||||||
|
logging.debug("Cleared labels for %s due to timeout", pid)
|
||||||
|
|
||||||
|
# 2) broadcast nested lists
|
||||||
|
if CLIENTS:
|
||||||
|
nested = list(CLIENT_LABELS.values())
|
||||||
|
payload = json.dumps({
|
||||||
|
"type": "broadcast_labels",
|
||||||
|
"data": nested
|
||||||
|
})
|
||||||
|
await asyncio.gather(*(
|
||||||
|
ws.send(payload) for ws in CLIENTS.values()
|
||||||
|
), return_exceptions=True)
|
||||||
|
logging.debug("Broadcasted labels to %d clients", len(CLIENTS))
|
||||||
|
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with websockets.serve(handler, host="0.0.0.0", port=8080):
|
# start the broadcaster task
|
||||||
|
broadcaster = asyncio.create_task(broadcast_labels_periodically())
|
||||||
|
|
||||||
|
# start the websocket server
|
||||||
|
async with websockets.serve(handler, "0.0.0.0", 8080):
|
||||||
logging.info("Signalling server listening on :8080")
|
logging.info("Signalling server listening on :8080")
|
||||||
await asyncio.Future() # run forever
|
await asyncio.Future() # run forever
|
||||||
|
|
||||||
|
broadcaster.cancel()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
|||||||
61
server_old.py
Normal file
61
server_old.py
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Minimal signalling relay for WebRTC data‑channel projects.
|
||||||
|
|
||||||
|
Contract:
|
||||||
|
• Client sends {"type":"register","id": "..."} once on connect.
|
||||||
|
• Later it sends {"type":"signal","from": "...", "target": "...", "data": {...}}
|
||||||
|
The server forwards that JSON verbatim to the socket whose id == target.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio, json, logging
|
||||||
|
import websockets
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
CLIENTS: dict[str, websockets.WebSocketServerProtocol] = {}
|
||||||
|
|
||||||
|
async def handler(ws: websockets.WebSocketServerProtocol):
|
||||||
|
peer_id = None
|
||||||
|
try:
|
||||||
|
async for raw in ws:
|
||||||
|
try:
|
||||||
|
msg = json.loads(raw)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logging.warning("Bad JSON from %s: %s", peer_id, raw)
|
||||||
|
continue
|
||||||
|
|
||||||
|
mtype = msg.get("type")
|
||||||
|
if mtype == "register":
|
||||||
|
peer_id = msg.get("id")
|
||||||
|
if not peer_id:
|
||||||
|
await ws.close(code=4000, reason="Missing id")
|
||||||
|
return
|
||||||
|
CLIENTS[peer_id] = ws
|
||||||
|
logging.info("Registered %s (%d clients)", peer_id, len(CLIENTS))
|
||||||
|
|
||||||
|
elif mtype == "signal":
|
||||||
|
target = msg.get("target")
|
||||||
|
if not target:
|
||||||
|
continue
|
||||||
|
dest = CLIENTS.get(target)
|
||||||
|
if dest:
|
||||||
|
await dest.send(raw) # relay as‑is
|
||||||
|
logging.debug("Relayed %s → %s", peer_id, target)
|
||||||
|
else:
|
||||||
|
logging.warning("Target %s not found", target)
|
||||||
|
|
||||||
|
except websockets.ConnectionClosed:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
# tidy up on disconnect
|
||||||
|
if peer_id and CLIENTS.get(peer_id) is ws:
|
||||||
|
CLIENTS.pop(peer_id)
|
||||||
|
logging.info("Disconnected %s (%d clients left)", peer_id, len(CLIENTS))
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with websockets.serve(handler, host="0.0.0.0", port=8080):
|
||||||
|
logging.info("Signalling server listening on :8080")
|
||||||
|
await asyncio.Future() # run forever
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(main())
|
||||||
Reference in New Issue
Block a user