Files
pyMC_Repeater/repeater/data_acquisition/websocket_handler.py
agessaman c2f8a2e3cd refactor: companion FrameServer and related (substantive only, no Black)
Reapply refactor from ce8381a (replace monolithic FrameServer with thin
pymc_core subclass, re-export constants, SQLite persistence hooks) while
preserving pre-refactor whitespace where patch applied cleanly. Remaining
files match refactor commit exactly. Diff vs ce8381a is whitespace-only.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-21 15:35:47 -08:00

164 lines
5.1 KiB
Python

"""
WebSocket handler for real-time packet updates - simple ws4py implementation
"""
import json
import logging
import threading
import time
from urllib.parse import parse_qs
import cherrypy
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
from ws4py.websocket import WebSocket
# Module-level logger; it uses the fixed name "WebSocket" rather than the module path.
logger = logging.getLogger("WebSocket")
# Suppress noisy ws4py error logs for normal disconnections (ConnectionResetError, etc.)
# by letting only CRITICAL records from the "ws4py" logger through.
logging.getLogger("ws4py").setLevel(logging.CRITICAL)
# Global set of connected clients. PacketWebSocket.opened() adds a connection once
# authentication succeeds; closed() and any failed send discard it again.
_connected_clients = set()
# Heartbeat configuration
PING_INTERVAL = 30 # seconds the heartbeat loop sleeps between ping rounds
# Thread running _heartbeat_loop; stays None until init_websocket() creates it.
_heartbeat_thread = None
# Loop flag read by _heartbeat_loop; init_websocket() sets it to True before
# starting the thread, and also uses it to avoid starting a second thread.
_heartbeat_running = False
class PacketWebSocket(WebSocket):
    """WebSocket endpoint that pushes real-time updates to a single client.

    A connection is only added to the module-level ``_connected_clients`` set
    after the JWT supplied in the ``token`` query parameter has been verified
    by the ``jwt_handler`` object stored in ``cherrypy.config``.
    """

    def opened(self):
        """Authenticate a newly established connection and register it.

        The connection is closed (and never registered) when:

        * no ``jwt_handler`` is configured (close code 1011),
        * the ``token`` query parameter is missing (1008),
        * ``jwt_handler.verify_jwt`` raises or returns a falsy payload (1008),
        * both the query string and the token carry a ``client_id`` and the
          two values differ (1008).

        On success the token's ``sub`` claim is stored as ``self.user``.
        """
        # Authenticate using JWT provided as query parameter (token=)
        jwt_handler = cherrypy.config.get("jwt_handler")

        # The query string comes from the request environ. Fall back to an
        # empty mapping when the attribute is absent *or* None, so a missing
        # environ is treated as "no token" instead of raising AttributeError.
        environ = getattr(self, "environ", None) or {}
        qs = environ.get("QUERY_STRING", "")
        params = parse_qs(qs)
        token = params.get("token", [None])[0]
        client_id = params.get("client_id", [None])[0]

        if not jwt_handler:
            logger.warning("WebSocket connection rejected: no JWT handler configured")
            self.close(code=1011, reason="server configuration error")
            return

        if not token:
            logger.warning("WebSocket connection rejected: missing token")
            self.close(code=1008, reason="unauthorized")
            return

        # Any failure inside the verifier is treated as an authentication
        # failure rather than being allowed to propagate.
        try:
            payload = jwt_handler.verify_jwt(token)
        except Exception as e:
            logger.warning(f"WebSocket auth error: {e}")
            self.close(code=1008, reason="unauthorized")
            return

        if not payload:
            logger.warning("WebSocket connection rejected: invalid token")
            self.close(code=1008, reason="unauthorized")
            return

        # The client_id binding is only enforced when both sides supply one.
        token_client_id = payload.get("client_id")
        if client_id and token_client_id and token_client_id != client_id:
            logger.warning("WebSocket connection rejected: client_id mismatch")
            self.close(code=1008, reason="unauthorized")
            return

        # Auth success - store user and add to connected clients
        self.user = payload.get("sub")  # type: ignore[attr-defined]
        _connected_clients.add(self)
        logger.info(
            f"WebSocket connected ({self.user or 'unknown user'}). Total clients: {len(_connected_clients)}"
        )

    def closed(self, code, reason=None):
        """Unregister the connection and log the disconnect.

        Args:
            code: Close code passed in by the WebSocket layer.
            reason: Optional close reason passed in by the WebSocket layer.
        """
        _connected_clients.discard(self)
        # ``user`` is only set after successful authentication, so rejected
        # connections are reported as "unknown".
        user = getattr(self, "user", "unknown")
        logger.info(
            f"WebSocket disconnected (user: {user}, code: {code}, reason: {reason}). Total clients: {len(_connected_clients)}"
        )

    def received_message(self, message):
        """Handle a message sent by the client.

        Only JSON objects are understood. ``{"type": "ping"}`` is answered
        with ``{"type": "pong"}``; ``{"type": "pong"}`` (the reply to a server
        heartbeat) and every other message are ignored. Malformed input and
        send failures are logged at debug level and never raised, so a bad
        client message cannot propagate an exception out of this handler.

        Args:
            message: Incoming message object; converted with ``str()`` before
                being parsed as JSON.
        """
        try:
            data = json.loads(str(message))
        except (ValueError, TypeError) as e:
            # ValueError covers both JSON syntax errors and text decode errors.
            logger.debug(f"Ignoring malformed WebSocket message: {e}")
            return

        if not isinstance(data, dict):
            # Valid JSON, but not an object (e.g. a list or a bare number).
            logger.debug("Ignoring WebSocket message that is not a JSON object")
            return

        if data.get("type") == "ping":
            try:
                self.send(json.dumps({"type": "pong"}))
            except Exception as e:
                # Best effort: the peer may already have gone away.
                logger.debug(f"WebSocket pong failed: {e}")
        # A "pong" is the client's reply to our heartbeat ping; nothing to do.
def broadcast_packet(packet_data: dict):
    """Push a ``packet`` event to every currently connected client.

    Does nothing when no clients are connected. A client whose ``send``
    raises is logged at error level and removed from the connected set.

    Args:
        packet_data: Value placed under the ``"data"`` key of the JSON
            message; it is serialized with ``json.dumps``.
    """
    if not _connected_clients:
        return

    envelope = {"type": "packet", "data": packet_data}
    payload = json.dumps(envelope)

    # Iterate over a snapshot so that discarding a dead client below does not
    # modify the set while it is being walked.
    recipients = list(_connected_clients)
    for ws in recipients:
        try:
            ws.send(payload)
        except Exception as exc:
            logger.error(f"WebSocket send error: {exc}")
            _connected_clients.discard(ws)
def broadcast_stats(stats_data: dict):
    """Push a ``stats`` event to every currently connected client.

    Returns immediately when the connected set is empty. If sending to a
    client raises, the error is logged and that client is dropped from the
    connected set.

    Args:
        stats_data: Value placed under the ``"data"`` key of the JSON
            message; it is serialized with ``json.dumps``.
    """
    if not _connected_clients:
        return

    serialized = json.dumps({"type": "stats", "data": stats_data})

    # Work on a copy: failed clients are discarded from the live set inside
    # the loop.
    for target in list(_connected_clients):
        try:
            target.send(serialized)
        except Exception as err:
            logger.error(f"WebSocket send error: {err}")
            _connected_clients.discard(target)
def _heartbeat_loop():
    """Send a ``{"type": "ping"}`` message to all clients every PING_INTERVAL seconds.

    Runs until the module-level ``_heartbeat_running`` flag is false. Each
    round sleeps first and then pings, so the flag is only re-checked after
    a full sleep. Clients whose ``send`` raises are logged at debug level and
    removed from the connected set.
    """
    # The flag is only read here, so no ``global`` declaration is required.
    while _heartbeat_running:
        time.sleep(PING_INTERVAL)

        if _connected_clients:
            frame = json.dumps({"type": "ping"})
            # Snapshot the set; dead clients are discarded during the loop.
            for ws in list(_connected_clients):
                try:
                    ws.send(frame)
                except Exception as exc:
                    logger.debug(f"Heartbeat ping failed: {exc}")
                    _connected_clients.discard(ws)
def init_websocket():
    """Register ws4py with CherryPy and make sure the heartbeat thread runs.

    Subscribes a ``WebSocketPlugin`` to ``cherrypy.engine`` and installs a
    ``WebSocketTool`` as ``cherrypy.tools.websocket``. The heartbeat thread is
    started as a daemon only if the ``_heartbeat_running`` flag is not
    already set.
    """
    global _heartbeat_thread, _heartbeat_running

    plugin = WebSocketPlugin(cherrypy.engine)
    plugin.subscribe()
    cherrypy.tools.websocket = WebSocketTool()

    # Heartbeat already active: nothing more to start.
    if _heartbeat_running:
        logger.info("WebSocket initialized")
        return

    # Raise the flag before the thread starts so its loop condition holds on
    # the first check.
    _heartbeat_running = True
    _heartbeat_thread = threading.Thread(target=_heartbeat_loop, daemon=True)
    _heartbeat_thread.start()
    logger.info(f"WebSocket initialized with {PING_INTERVAL}s heartbeat")