mirror of
https://github.com/rightup/pyMC_Repeater.git
synced 2026-03-28 17:43:06 +01:00
feat: add /ws/companion_frame WebSocket proxy
Bridges browser WebSocket connections to companion TCP frame servers. Uses configured bind_address (not hardcoded 127.0.0.1) so the proxy works regardless of how the frame server is bound. - JWT auth (same pattern as PacketWebSocket) - Resolves companion by name → (host, port) from config - Raw byte pipe: WS ↔ TCP, no protocol parsing - Diagnostic logging throughout for troubleshooting Co-Authored-By: Oz <oz-agent@warp.dev>
This commit is contained in:
@@ -65,31 +65,37 @@ class CompanionFrameWebSocket(WebSocket):
|
||||
self.close(code=1008, reason="missing companion_name")
|
||||
return
|
||||
|
||||
# Resolve companion TCP port from config
|
||||
tcp_port = self._resolve_tcp_port(companion_name)
|
||||
if tcp_port is None:
|
||||
# Resolve companion TCP port + bind address from config
|
||||
resolved = self._resolve_tcp_endpoint(companion_name)
|
||||
if resolved is None:
|
||||
logger.warning(f"Connection rejected: companion '{companion_name}' not found")
|
||||
self.close(code=1008, reason="companion not found")
|
||||
return
|
||||
|
||||
tcp_host, tcp_port = resolved
|
||||
|
||||
# Open TCP socket to the companion frame server
|
||||
try:
|
||||
self._tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._tcp.settimeout(5.0)
|
||||
self._tcp.connect(("127.0.0.1", tcp_port))
|
||||
self._tcp.connect((tcp_host, tcp_port))
|
||||
self._tcp.settimeout(None)
|
||||
logger.debug(f"TCP connected to {tcp_host}:{tcp_port} for '{companion_name}'")
|
||||
except Exception as e:
|
||||
logger.error(f"TCP connect failed for '{companion_name}' port {tcp_port}: {e}")
|
||||
logger.error(f"TCP connect failed for '{companion_name}' {tcp_host}:{tcp_port}: {e}")
|
||||
self._tcp = None
|
||||
self.close(code=1011, reason="TCP connect failed")
|
||||
return
|
||||
|
||||
self._closing = False
|
||||
self._reader = threading.Thread(target=self._tcp_to_ws, daemon=True)
|
||||
self._companion_name = companion_name
|
||||
self._reader = threading.Thread(
|
||||
target=self._tcp_to_ws, daemon=True, name=f"ws-tcp-{companion_name}"
|
||||
)
|
||||
self._reader.start()
|
||||
|
||||
user = payload.get("sub", "unknown")
|
||||
logger.info(f"Companion WS opened: user={user}, companion={companion_name}, port={tcp_port}")
|
||||
logger.info(f"Companion WS opened: user={user}, companion={companion_name}, tcp={tcp_host}:{tcp_port}")
|
||||
|
||||
def received_message(self, message):
|
||||
"""WS → TCP"""
|
||||
@@ -101,46 +107,84 @@ class CompanionFrameWebSocket(WebSocket):
|
||||
if isinstance(data, str):
|
||||
data = data.encode("latin-1")
|
||||
tcp.sendall(data)
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
name = getattr(self, "_companion_name", "?")
|
||||
logger.warning(f"WS→TCP send failed for '{name}': {e}")
|
||||
self._teardown()
|
||||
|
||||
def closed(self, code, reason=None):
|
||||
logger.info(f"Companion WS closed (code={code})")
|
||||
name = getattr(self, "_companion_name", "?")
|
||||
logger.info(f"Companion WS closed: companion={name}, code={code}, reason={reason}")
|
||||
self._teardown()
|
||||
|
||||
# ── internal ─────────────────────────────────────────────────────
|
||||
# ── internal ─────────────────────────────────────────────────────────
|
||||
|
||||
def _resolve_tcp_port(self, companion_name):
|
||||
"""Look up companion TCP port from daemon config. Returns port or None."""
|
||||
def _resolve_tcp_endpoint(self, companion_name):
|
||||
"""Look up companion TCP host + port from daemon config.
|
||||
|
||||
Returns ``(host, port)`` tuple or ``None`` if the companion can't be
|
||||
resolved. When ``bind_address`` is ``0.0.0.0`` (all interfaces) we
|
||||
connect via ``127.0.0.1``; otherwise we use the configured address.
|
||||
"""
|
||||
if not _daemon:
|
||||
logger.warning("_resolve_tcp_endpoint: daemon not set")
|
||||
return None
|
||||
|
||||
# Verify the companion is actually registered
|
||||
identity_manager = getattr(_daemon, "identity_manager", None)
|
||||
bridges = getattr(_daemon, "companion_bridges", {})
|
||||
if not identity_manager or not bridges:
|
||||
|
||||
if not identity_manager:
|
||||
logger.warning("_resolve_tcp_endpoint: no identity_manager")
|
||||
return None
|
||||
if not bridges:
|
||||
logger.warning("_resolve_tcp_endpoint: no companion_bridges (dict empty or missing)")
|
||||
return None
|
||||
|
||||
# Find the companion identity by name and verify its bridge is running
|
||||
found = False
|
||||
for name, identity, _cfg in identity_manager.get_identities_by_type("companion"):
|
||||
if name == companion_name:
|
||||
h = identity.get_public_key()[0]
|
||||
if h in bridges:
|
||||
found = True
|
||||
else:
|
||||
logger.warning(
|
||||
f"_resolve_tcp_endpoint: companion '{companion_name}' identity found "
|
||||
f"(hash=0x{h:02x}) but no bridge registered for that hash. "
|
||||
f"Known bridge hashes: {[f'0x{k:02x}' for k in bridges.keys()]}"
|
||||
)
|
||||
break
|
||||
else:
|
||||
# Loop completed without finding the name
|
||||
known = [n for n, _, _ in identity_manager.get_identities_by_type("companion")]
|
||||
logger.warning(
|
||||
f"_resolve_tcp_endpoint: companion '{companion_name}' not in identity_manager. "
|
||||
f"Known companions: {known}"
|
||||
)
|
||||
|
||||
if not found:
|
||||
return None
|
||||
|
||||
# Look up TCP port + bind address from config
|
||||
companions = _daemon.config.get("identities", {}).get("companions") or []
|
||||
for entry in companions:
|
||||
if entry.get("name") == companion_name:
|
||||
return (entry.get("settings") or {}).get("tcp_port", 5000)
|
||||
settings = entry.get("settings") or {}
|
||||
port = settings.get("tcp_port", 5000)
|
||||
bind = settings.get("bind_address", "0.0.0.0")
|
||||
# 0.0.0.0 = all interfaces — connect via loopback
|
||||
host = "127.0.0.1" if bind == "0.0.0.0" else bind
|
||||
logger.debug(f"_resolve_tcp_endpoint: '{companion_name}' → {host}:{port}")
|
||||
return (host, port)
|
||||
|
||||
logger.warning(
|
||||
f"_resolve_tcp_endpoint: '{companion_name}' found in identity_manager but missing from config"
|
||||
)
|
||||
return None
|
||||
|
||||
def _tcp_to_ws(self):
|
||||
"""TCP → WS reader loop"""
|
||||
name = getattr(self, "_companion_name", "?")
|
||||
tcp = getattr(self, "_tcp", None)
|
||||
if tcp is None:
|
||||
return
|
||||
@@ -148,13 +192,19 @@ class CompanionFrameWebSocket(WebSocket):
|
||||
while not getattr(self, "_closing", True):
|
||||
data = tcp.recv(4096)
|
||||
if not data:
|
||||
logger.info(f"TCP→WS: frame server closed connection for '{name}'")
|
||||
break
|
||||
try:
|
||||
self.send(data, binary=True)
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
logger.warning(f"TCP→WS: WS send failed for '{name}': {e}")
|
||||
break
|
||||
except Exception:
|
||||
pass
|
||||
except OSError as e:
|
||||
# Socket error (connection reset, etc.) — normal during teardown
|
||||
if not getattr(self, "_closing", True):
|
||||
logger.warning(f"TCP→WS: socket error for '{name}': {e}")
|
||||
except Exception as e:
|
||||
logger.warning(f"TCP→WS: unexpected error for '{name}': {e}")
|
||||
finally:
|
||||
self._teardown()
|
||||
|
||||
@@ -163,6 +213,9 @@ class CompanionFrameWebSocket(WebSocket):
|
||||
return
|
||||
self._closing = True
|
||||
|
||||
name = getattr(self, "_companion_name", "?")
|
||||
logger.debug(f"Tearing down WS proxy for '{name}'")
|
||||
|
||||
tcp = getattr(self, "_tcp", None)
|
||||
if tcp:
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user