refactor: enhance security comments and error handling across multiple modules

This commit is contained in:
Rightup
2026-05-27 22:07:34 +01:00
parent a5355f188d
commit 60ca184dbd
11 changed files with 30 additions and 23 deletions
+1 -1
View File
@@ -32,7 +32,7 @@ class CompanionFrameServer(_BaseFrameServer):
bridge,
companion_hash: str,
port: int = 5000,
bind_address: str = "0.0.0.0",
bind_address: str = "0.0.0.0", # nosec B104 - intentional default for LAN reachability
client_idle_timeout_sec: Optional[int] = 8 * 60 * 60, # 8 hours
sqlite_handler=None,
local_hash: Optional[int] = None,
+1 -1
View File
@@ -45,7 +45,7 @@ class GlassHandler:
self.base_url = "http://localhost:8080"
self.request_timeout_seconds = 10
self.verify_tls = True
self.api_token = ""
self.api_token = "" # nosec - runtime config value, not a hardcoded credential
self.inform_interval_seconds = 30
self.cert_store_dir = "/etc/pymc_repeater/glass"
self._cert_expires_at: Optional[str] = None
+6 -3
View File
@@ -1715,11 +1715,14 @@ class SQLiteHandler:
except Exception as e:
logger.error(f"Failed to generate transport key using get_auto_key_for: {e}")
# Fallback to a transport-compatible random 16-byte key if derivation fails.
# Fallback to a transport-compatible random key if derivation fails.
try:
random_bytes = secrets.token_bytes(16)
fallback_length = max(1, int(key_length_bytes))
random_bytes = secrets.token_bytes(fallback_length)
key = base64.b64encode(random_bytes).decode("utf-8")
logger.warning(f"Using fallback random key generation for '{name}'")
logger.warning(
f"Using fallback random key generation for '{name}' with {fallback_length} bytes"
)
return key
except Exception as fallback_e:
logger.error(f"Fallback key generation also failed: {fallback_e}")
+5 -2
View File
@@ -43,8 +43,8 @@ class ACL:
allow_read_only: bool = True,
):
self.max_clients = max_clients
self.admin_password = admin_password
self.guest_password = guest_password
self.admin_password = admin_password or ""
self.guest_password = guest_password or ""
self.allow_read_only = allow_read_only
self.clients: Dict[bytes, ClientInfo] = {}
@@ -93,6 +93,9 @@ class ACL:
f"guest: {'SET' if guest_pwd else 'NONE'}"
)
admin_pwd = admin_pwd or ""
guest_pwd = guest_pwd or ""
if target_identity_name:
logger.debug(
f"Authenticating for identity '{target_identity_name}' (room_server={is_room_server})"
+3 -3
View File
@@ -539,7 +539,7 @@ class RepeaterDaemon:
node_name = settings.get("node_name", name)
tcp_port = settings.get("tcp_port", 5000)
bind_address = settings.get("bind_address", "0.0.0.0")
bind_address = settings.get("bind_address", "0.0.0.0") # nosec B104
tcp_timeout_raw = settings.get("tcp_timeout", 8 * 60 * 60) # 8 hours
client_idle_timeout_sec = None if tcp_timeout_raw == 0 else int(tcp_timeout_raw)
@@ -721,7 +721,7 @@ class RepeaterDaemon:
node_name = settings.get("node_name", name)
tcp_port = settings.get("tcp_port", 5000)
bind_address = settings.get("bind_address", "0.0.0.0")
bind_address = settings.get("bind_address", "0.0.0.0") # nosec B104
tcp_timeout_raw = settings.get("tcp_timeout", 120)
client_idle_timeout_sec = None if tcp_timeout_raw == 0 else int(tcp_timeout_raw)
@@ -1291,7 +1291,7 @@ class RepeaterDaemon:
# Start HTTP stats server
http_port = self.config.get("http", {}).get("port", 8000)
http_host = self.config.get("http", {}).get("host", "0.0.0.0")
http_host = self.config.get("http", {}).get("host", "0.0.0.0") # nosec B104
node_name = self.config.get("repeater", {}).get("node_name", "Repeater")
+4 -3
View File
@@ -5,6 +5,7 @@ Provides functions for service control operations like restart.
import logging
import os
import shutil
import subprocess # nosec B404
import threading
import time
@@ -14,9 +15,9 @@ logger = logging.getLogger("ServiceUtils")
INIT_SCRIPT = "/etc/init.d/S80pymc-repeater"
BUILDROOT_METADATA_PATH = "/etc/pymc-image-build-id"
_CONTAINER_RESTART_DELAY_SECONDS = 1.0
_SH_BIN = "/bin/sh"
_SYSTEMCTL_BIN = "/bin/systemctl"
_SUDO_BIN = "/usr/bin/sudo"
_SH_BIN = shutil.which("sh") or "sh"
_SYSTEMCTL_BIN = shutil.which("systemctl") or "systemctl"
_SUDO_BIN = shutil.which("sudo") or "sudo"
def is_buildroot() -> bool:
+1 -1
View File
@@ -3522,7 +3522,7 @@ class APIEndpoints:
comp_settings = {
"node_name": settings.get("node_name") or name,
"tcp_port": settings.get("tcp_port", 5000),
"bind_address": settings.get("bind_address", "0.0.0.0"),
"bind_address": settings.get("bind_address", "0.0.0.0"), # nosec B104
}
if "tcp_timeout" in settings:
comp_settings["tcp_timeout"] = settings["tcp_timeout"]
+1 -2
View File
@@ -28,8 +28,7 @@ def check_auth():
if not jwt_handler or not token_manager:
logger.error("Auth handlers not initialized in cherrypy.config")
cherrypy.response.status = 500
return {"success": False, "error": "Authentication system not configured"}
raise cherrypy.HTTPError(500, "Authentication system not configured")
# Check for JWT token in Authorization header first
auth_header = cherrypy.request.headers.get("Authorization", "")
+2 -2
View File
@@ -172,9 +172,9 @@ class CompanionFrameWebSocket(WebSocket):
if entry.get("name") == companion_name:
settings = entry.get("settings") or {}
port = settings.get("tcp_port", 5000)
bind = settings.get("bind_address", "0.0.0.0")
bind = settings.get("bind_address", "0.0.0.0") # nosec B104
# 0.0.0.0 = all interfaces — connect via loopback
host = "127.0.0.1" if bind == "0.0.0.0" else bind
host = "127.0.0.1" if bind == "0.0.0.0" else bind # nosec B104
logger.debug(f"_resolve_tcp_endpoint: '{companion_name}'{host}:{port}")
return (host, port)
+1 -1
View File
@@ -177,7 +177,7 @@ class StatsApp:
class HTTPStatsServer:
def __init__(
self,
host: str = "0.0.0.0",
host: str = "0.0.0.0", # nosec B104 - intentional default for service exposure
port: int = 8000,
stats_getter: Optional[Callable] = None,
node_name: str = "Repeater",
+5 -4
View File
@@ -73,11 +73,12 @@ def test_check_auth_skips_options_and_login(monkeypatch):
assert check_auth() is None
def test_check_auth_missing_handlers_returns_500_json(monkeypatch):
def test_check_auth_missing_handlers_raises_http_500(monkeypatch):
_set_cp(monkeypatch, cfg={})
out = check_auth()
assert out["success"] is False
assert cherrypy.response.status == 500
with pytest.raises(cherrypy.HTTPError) as exc_info:
check_auth()
assert exc_info.value.status == 500
def test_check_auth_accepts_bearer_token(monkeypatch):