mirror of
https://github.com/rightup/pyMC_Repeater.git
synced 2026-03-28 17:43:06 +01:00
Reapply refactor from ce8381a (replace monolithic FrameServer with thin pymc_core subclass, re-export constants, SQLite persistence hooks) while preserving pre-refactor whitespace where patch applied cleanly. Remaining files match refactor commit exactly. Diff vs ce8381a is whitespace-only. Co-authored-by: Cursor <cursoragent@cursor.com>
164 lines
5.1 KiB
Python
164 lines
5.1 KiB
Python
"""
|
|
WebSocket handler for real-time packet updates - simple ws4py implementation
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import threading
|
|
import time
|
|
from urllib.parse import parse_qs
|
|
|
|
import cherrypy
|
|
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
|
|
from ws4py.websocket import WebSocket
|
|
|
|
logger = logging.getLogger("WebSocket")
|
|
|
|
# Suppress noisy ws4py error logs for normal disconnections (ConnectionResetError, etc.)
|
|
logging.getLogger("ws4py").setLevel(logging.CRITICAL)
|
|
|
|
# Global set of connected clients
|
|
_connected_clients = set()
|
|
|
|
# Heartbeat configuration
|
|
PING_INTERVAL = 30 # seconds
|
|
_heartbeat_thread = None
|
|
_heartbeat_running = False
|
|
|
|
|
|
class PacketWebSocket(WebSocket):
|
|
|
|
def opened(self):
|
|
"""Called when a WebSocket connection is established"""
|
|
# Authenticate using JWT provided as query parameter (token=)
|
|
jwt_handler = cherrypy.config.get("jwt_handler")
|
|
|
|
# Get query string from environ
|
|
qs = ""
|
|
if hasattr(self, "environ"):
|
|
qs = self.environ.get("QUERY_STRING", "")
|
|
|
|
params = parse_qs(qs)
|
|
token = params.get("token", [None])[0]
|
|
client_id = params.get("client_id", [None])[0]
|
|
|
|
if not jwt_handler:
|
|
logger.warning("WebSocket connection rejected: no JWT handler configured")
|
|
self.close(code=1011, reason="server configuration error")
|
|
return
|
|
|
|
if not token:
|
|
logger.warning("WebSocket connection rejected: missing token")
|
|
self.close(code=1008, reason="unauthorized")
|
|
return
|
|
|
|
try:
|
|
payload = jwt_handler.verify_jwt(token)
|
|
if not payload:
|
|
logger.warning("WebSocket connection rejected: invalid token")
|
|
self.close(code=1008, reason="unauthorized")
|
|
return
|
|
except Exception as e:
|
|
logger.warning(f"WebSocket auth error: {e}")
|
|
self.close(code=1008, reason="unauthorized")
|
|
return
|
|
|
|
if client_id and payload.get("client_id") and payload.get("client_id") != client_id:
|
|
logger.warning("WebSocket connection rejected: client_id mismatch")
|
|
self.close(code=1008, reason="unauthorized")
|
|
return
|
|
|
|
# Auth success - store user and add to connected clients
|
|
self.user = payload.get("sub") # type: ignore[attr-defined]
|
|
_connected_clients.add(self)
|
|
logger.info(
|
|
f"WebSocket connected ({self.user or 'unknown user'}). Total clients: {len(_connected_clients)}"
|
|
)
|
|
|
|
def closed(self, code, reason=None):
|
|
"""Called when a WebSocket connection is closed"""
|
|
_connected_clients.discard(self)
|
|
user = getattr(self, "user", "unknown")
|
|
logger.info(
|
|
f"WebSocket disconnected (user: {user}, code: {code}, reason: {reason}). Total clients: {len(_connected_clients)}"
|
|
)
|
|
|
|
def received_message(self, message):
|
|
"""Handle messages from client"""
|
|
try:
|
|
data = json.loads(str(message))
|
|
if data.get("type") == "ping":
|
|
self.send(json.dumps({"type": "pong"}))
|
|
elif data.get("type") == "pong":
|
|
# Client responded to our ping
|
|
pass
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def broadcast_packet(packet_data: dict):
|
|
|
|
if not _connected_clients:
|
|
return
|
|
|
|
message = json.dumps({"type": "packet", "data": packet_data})
|
|
|
|
for client in list(_connected_clients):
|
|
try:
|
|
client.send(message)
|
|
except Exception as e:
|
|
logger.error(f"WebSocket send error: {e}")
|
|
_connected_clients.discard(client)
|
|
|
|
|
|
def broadcast_stats(stats_data: dict):
|
|
|
|
if not _connected_clients:
|
|
return
|
|
|
|
message = json.dumps({"type": "stats", "data": stats_data})
|
|
|
|
for client in list(_connected_clients):
|
|
try:
|
|
client.send(message)
|
|
except Exception as e:
|
|
logger.error(f"WebSocket send error: {e}")
|
|
_connected_clients.discard(client)
|
|
|
|
|
|
def _heartbeat_loop():
|
|
"""Background thread to send periodic pings to all connected clients"""
|
|
global _heartbeat_running
|
|
|
|
while _heartbeat_running:
|
|
time.sleep(PING_INTERVAL)
|
|
|
|
if not _connected_clients:
|
|
continue
|
|
|
|
ping_message = json.dumps({"type": "ping"})
|
|
|
|
for client in list(_connected_clients):
|
|
try:
|
|
client.send(ping_message)
|
|
except Exception as e:
|
|
logger.debug(f"Heartbeat ping failed: {e}")
|
|
_connected_clients.discard(client)
|
|
|
|
|
|
def init_websocket():
|
|
"""Initialize WebSocket plugin and start heartbeat"""
|
|
global _heartbeat_thread, _heartbeat_running
|
|
|
|
WebSocketPlugin(cherrypy.engine).subscribe()
|
|
cherrypy.tools.websocket = WebSocketTool()
|
|
|
|
# Start heartbeat thread
|
|
if not _heartbeat_running:
|
|
_heartbeat_running = True
|
|
_heartbeat_thread = threading.Thread(target=_heartbeat_loop, daemon=True)
|
|
_heartbeat_thread.start()
|
|
logger.info(f"WebSocket initialized with {PING_INTERVAL}s heartbeat")
|
|
else:
|
|
logger.info("WebSocket initialized")
|