mirror of
https://github.com/pyMC-dev/pyMC_Repeater.git
synced 2026-06-11 08:44:46 +02:00
995 lines
38 KiB
Python
995 lines
38 KiB
Python
import asyncio
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import ssl
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
from urllib import error, request
|
|
from urllib.parse import urlparse
|
|
|
|
import psutil
|
|
|
|
try:
|
|
import paho.mqtt.client as mqtt
|
|
except ImportError:
|
|
mqtt = None
|
|
|
|
from repeater import __version__
|
|
from repeater.service_utils import restart_service
|
|
|
|
logger = logging.getLogger("GlassHandler")
|
|
_SENSITIVE_KEY_MARKERS = (
|
|
"password",
|
|
"passphrase",
|
|
"secret",
|
|
"token",
|
|
"private_key",
|
|
"identity_key",
|
|
"client_key",
|
|
"api_key",
|
|
)
|
|
_SENSITIVE_KEY_EXCEPTIONS = ("pubkey", "public_key")
|
|
|
|
|
|
class GlassHandler:
|
|
def __init__(self, config: dict, daemon_instance=None, config_manager=None):
|
|
self.config = config
|
|
self.daemon_instance = daemon_instance
|
|
self.config_manager = config_manager
|
|
|
|
self.enabled = False
|
|
self.base_url = "http://localhost:8080"
|
|
self.request_timeout_seconds = 10
|
|
self.verify_tls = True
|
|
self.api_token = "" # nosec - runtime config value, not a hardcoded credential
|
|
self.inform_interval_seconds = 30
|
|
self.cert_store_dir = "/etc/pymc_repeater/glass"
|
|
self._cert_expires_at: Optional[str] = None
|
|
self.mqtt_enabled = False
|
|
self.mqtt_broker_host = "localhost"
|
|
self.mqtt_broker_port = 1883
|
|
self.mqtt_base_topic = "glass"
|
|
self.mqtt_tls_enabled = False
|
|
self.mqtt_username: Optional[str] = None
|
|
self.mqtt_password: Optional[str] = None
|
|
self.client_cert_path: Optional[str] = None
|
|
self.client_key_path: Optional[str] = None
|
|
self.ca_cert_path: Optional[str] = None
|
|
self._mqtt_client = None
|
|
self._mqtt_ready = False
|
|
self._mqtt_runtime_signature: Optional[
|
|
Tuple[
|
|
str,
|
|
int,
|
|
str,
|
|
bool,
|
|
bool,
|
|
Optional[str],
|
|
Optional[str],
|
|
Optional[str],
|
|
Optional[str],
|
|
Optional[str],
|
|
]
|
|
] = None
|
|
self._managed_settings_filename = "managed.json"
|
|
|
|
self._task: Optional[asyncio.Task] = None
|
|
self._stop_event: Optional[asyncio.Event] = None
|
|
self._pending_command_results: List[Dict[str, Any]] = []
|
|
self._pending_lock = asyncio.Lock()
|
|
|
|
self._reload_runtime_settings()
|
|
|
|
async def start(self) -> None:
|
|
self._reload_runtime_settings()
|
|
if not self.enabled:
|
|
logger.info("Glass integration disabled")
|
|
self._close_mqtt_publisher()
|
|
return
|
|
|
|
if self._task and not self._task.done():
|
|
return
|
|
self._sync_mqtt_publisher()
|
|
|
|
self._stop_event = asyncio.Event()
|
|
self._task = asyncio.create_task(self._run_loop(), name="glass-inform-loop")
|
|
logger.info(
|
|
"Glass integration started (base_url=%s, inform_interval=%ss)",
|
|
self.base_url,
|
|
self.inform_interval_seconds,
|
|
)
|
|
|
|
async def stop(self) -> None:
|
|
if self._task:
|
|
if self._stop_event:
|
|
self._stop_event.set()
|
|
|
|
try:
|
|
await self._task
|
|
except Exception as exc:
|
|
logger.debug("Glass task stop ignored exception: %s", exc)
|
|
finally:
|
|
self._task = None
|
|
self._stop_event = None
|
|
|
|
self._close_mqtt_publisher()
|
|
|
|
def _reload_runtime_settings(self) -> None:
|
|
glass_cfg = self.config.get("glass", {})
|
|
self.enabled = bool(glass_cfg.get("enabled", False))
|
|
|
|
base_url = str(glass_cfg.get("base_url", "http://localhost:8080")).strip()
|
|
self.base_url = base_url.rstrip("/") if base_url else "http://localhost:8080"
|
|
|
|
self.request_timeout_seconds = max(3, int(glass_cfg.get("request_timeout_seconds", 10)))
|
|
self.verify_tls = bool(glass_cfg.get("verify_tls", True))
|
|
self.api_token = str(glass_cfg.get("api_token", "") or "").strip()
|
|
self.inform_interval_seconds = self._clamp_interval(
|
|
int(glass_cfg.get("inform_interval_seconds", self.inform_interval_seconds))
|
|
)
|
|
self.cert_store_dir = str(
|
|
glass_cfg.get("cert_store_dir", "/etc/pymc_repeater/glass")
|
|
or "/etc/pymc_repeater/glass"
|
|
)
|
|
self.client_cert_path = (
|
|
str(glass_cfg.get("client_cert_path")).strip()
|
|
if glass_cfg.get("client_cert_path")
|
|
else None
|
|
)
|
|
self.client_key_path = (
|
|
str(glass_cfg.get("client_key_path")).strip()
|
|
if glass_cfg.get("client_key_path")
|
|
else None
|
|
)
|
|
self.ca_cert_path = (
|
|
str(glass_cfg.get("ca_cert_path")).strip() if glass_cfg.get("ca_cert_path") else None
|
|
)
|
|
managed_cfg = self._load_managed_settings()
|
|
parsed_base_url = urlparse(self.base_url)
|
|
default_host = parsed_base_url.hostname or "localhost"
|
|
|
|
self.mqtt_enabled = bool(managed_cfg.get("mqtt_enabled", False))
|
|
host_value = managed_cfg.get("mqtt_broker_host", default_host)
|
|
self.mqtt_broker_host = str(host_value or default_host).strip() or default_host
|
|
try:
|
|
self.mqtt_broker_port = max(1, int(managed_cfg.get("mqtt_broker_port", 1883)))
|
|
except (TypeError, ValueError):
|
|
self.mqtt_broker_port = 1883
|
|
topic_value = managed_cfg.get("mqtt_base_topic", "glass")
|
|
self.mqtt_base_topic = str(topic_value or "glass").strip("/")
|
|
self.mqtt_tls_enabled = bool(managed_cfg.get("mqtt_tls_enabled", False))
|
|
username = managed_cfg.get("mqtt_username")
|
|
password = managed_cfg.get("mqtt_password")
|
|
self.mqtt_username = (
|
|
str(username).strip() if isinstance(username, str) and username else None
|
|
)
|
|
self.mqtt_password = str(password) if isinstance(password, str) and password else None
|
|
|
|
def _managed_settings_path(self) -> Path:
|
|
return Path(self.cert_store_dir) / self._managed_settings_filename
|
|
|
|
def _load_managed_settings(self) -> Dict[str, Any]:
|
|
path = self._managed_settings_path()
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
raw = json.loads(path.read_text(encoding="utf-8"))
|
|
except Exception as exc:
|
|
logger.warning("Invalid Glass managed settings file at %s: %s", path, exc)
|
|
return {}
|
|
if not isinstance(raw, dict):
|
|
logger.warning("Ignoring non-object Glass managed settings file at %s", path)
|
|
return {}
|
|
return raw
|
|
|
|
def _save_managed_settings(self, updates: Dict[str, Any], *, replace: bool) -> Tuple[bool, str]:
|
|
if not isinstance(updates, dict):
|
|
return False, "glass_managed must be an object"
|
|
|
|
path = self._managed_settings_path()
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
current = {} if replace else self._load_managed_settings()
|
|
if not isinstance(current, dict):
|
|
current = {}
|
|
merged = dict(current)
|
|
merged.update(updates)
|
|
try:
|
|
path.write_text(
|
|
json.dumps(merged, indent=2, sort_keys=True),
|
|
encoding="utf-8",
|
|
)
|
|
os.chmod(path, 0o600)
|
|
return True, "Managed settings updated"
|
|
except Exception as exc:
|
|
return False, f"Failed writing managed settings: {exc}"
|
|
|
|
async def _run_loop(self) -> None:
|
|
while self._stop_event and not self._stop_event.is_set():
|
|
self._reload_runtime_settings()
|
|
self._sync_mqtt_publisher()
|
|
try:
|
|
interval = await self._inform_once()
|
|
except Exception as exc:
|
|
logger.warning("Glass inform failed: %s", exc)
|
|
interval = self.inform_interval_seconds
|
|
|
|
wait_seconds = self._clamp_interval(interval)
|
|
if not self._stop_event:
|
|
break
|
|
try:
|
|
await asyncio.wait_for(self._stop_event.wait(), timeout=wait_seconds)
|
|
except asyncio.TimeoutError:
|
|
continue
|
|
|
|
async def _inform_once(self) -> int:
|
|
self._reload_runtime_settings()
|
|
if not self.enabled:
|
|
return self.inform_interval_seconds
|
|
|
|
payload = await self._build_inform_payload()
|
|
response = await self._post_inform(payload)
|
|
|
|
if payload.get("command_results"):
|
|
async with self._pending_lock:
|
|
self._pending_command_results = []
|
|
|
|
response_type = str(response.get("type", "noop"))
|
|
response_interval = response.get("interval")
|
|
|
|
if response_type == "command":
|
|
await self._handle_command_response(response)
|
|
elif response_type == "config_update":
|
|
ok, message = self._apply_config_update(
|
|
response.get("config", {}),
|
|
str(response.get("merge_mode", "patch")),
|
|
)
|
|
if ok:
|
|
logger.info("Applied Glass config update")
|
|
else:
|
|
logger.warning("Failed to apply Glass config update: %s", message)
|
|
elif response_type == "cert_renewal":
|
|
ok, message = self._apply_cert_renewal(response)
|
|
if ok:
|
|
logger.info("Applied Glass certificate renewal")
|
|
else:
|
|
logger.warning("Failed to apply Glass certificate renewal: %s", message)
|
|
elif response_type == "upgrade":
|
|
logger.warning("Glass upgrade action received but not implemented on repeater")
|
|
elif response_type != "noop":
|
|
logger.warning("Unknown Glass response type: %s", response_type)
|
|
|
|
if isinstance(response_interval, int):
|
|
self.inform_interval_seconds = self._clamp_interval(response_interval)
|
|
return self.inform_interval_seconds
|
|
|
|
async def _build_inform_payload(self) -> Dict[str, Any]:
|
|
if not self.daemon_instance or not getattr(self.daemon_instance, "local_identity", None):
|
|
raise RuntimeError("Local identity not available for Glass inform")
|
|
|
|
stats = self.daemon_instance.get_stats() if self.daemon_instance else {}
|
|
local_identity = self.daemon_instance.local_identity
|
|
public_key = bytes(local_identity.get_public_key()).hex()
|
|
node_name = self.config.get("repeater", {}).get("node_name", "unknown-repeater")
|
|
|
|
uptime_seconds = int(stats.get("uptime_seconds", 0))
|
|
if uptime_seconds <= 0:
|
|
repeater_handler = getattr(self.daemon_instance, "repeater_handler", None)
|
|
if repeater_handler and getattr(repeater_handler, "start_time", None):
|
|
uptime_seconds = max(0, int(time.time() - repeater_handler.start_time))
|
|
|
|
tx_total = int(stats.get("sent_flood_count", 0)) + int(stats.get("sent_direct_count", 0))
|
|
if tx_total <= 0:
|
|
tx_total = int(stats.get("forwarded_count", 0))
|
|
|
|
command_results = await self._get_pending_command_results()
|
|
settings_snapshot = self._build_settings_snapshot()
|
|
location = self._extract_location_from_settings(settings_snapshot)
|
|
|
|
return {
|
|
"type": "inform",
|
|
"version": 1,
|
|
"node_name": node_name,
|
|
"pubkey": f"0x{public_key}",
|
|
"software_version": __version__,
|
|
"state": self.config.get("repeater", {}).get("mode", "forward"),
|
|
"location": location,
|
|
"uptime_seconds": uptime_seconds,
|
|
"config_hash": self._compute_config_hash(self.config),
|
|
"cert_expires_at": self._cert_expires_at,
|
|
"system": self._collect_system_stats(),
|
|
"radio": {
|
|
"frequency": int(self.config.get("radio", {}).get("frequency", 0)),
|
|
"spreading_factor": int(self.config.get("radio", {}).get("spreading_factor", 7)),
|
|
"bandwidth": int(self.config.get("radio", {}).get("bandwidth", 0)),
|
|
"tx_power": int(self.config.get("radio", {}).get("tx_power", 0)),
|
|
"noise_floor_dbm": stats.get("noise_floor_dbm"),
|
|
"mode": self.config.get("repeater", {}).get("mode", "forward"),
|
|
},
|
|
"counters": {
|
|
"rx_total": int(stats.get("rx_count", 0)),
|
|
"tx_total": max(0, tx_total),
|
|
"forwarded": int(stats.get("forwarded_count", 0)),
|
|
"dropped": int(stats.get("dropped_count", 0)),
|
|
"duplicates": int(stats.get("flood_dup_count", 0))
|
|
+ int(stats.get("direct_dup_count", 0)),
|
|
"airtime_percent": float(stats.get("utilization_percent", 0.0)),
|
|
},
|
|
"settings": settings_snapshot,
|
|
"command_results": command_results,
|
|
}
|
|
|
|
def _build_settings_snapshot(self) -> Dict[str, Any]:
|
|
normalized = self._normalize_for_hash(self.config)
|
|
sanitized = self._sanitize_settings_for_export(normalized)
|
|
if isinstance(sanitized, dict):
|
|
return sanitized
|
|
return {}
|
|
|
|
def _sanitize_settings_for_export(self, value: Any, key_name: Optional[str] = None) -> Any:
|
|
if isinstance(value, dict):
|
|
output: Dict[str, Any] = {}
|
|
for child_key, child_value in value.items():
|
|
if self._is_sensitive_key(child_key):
|
|
output[child_key] = "<redacted>"
|
|
continue
|
|
output[child_key] = self._sanitize_settings_for_export(child_value, child_key)
|
|
return output
|
|
if isinstance(value, list):
|
|
return [self._sanitize_settings_for_export(item, key_name) for item in value]
|
|
return value
|
|
|
|
@staticmethod
|
|
def _is_sensitive_key(key: str) -> bool:
|
|
lowered = str(key).lower()
|
|
if any(exception in lowered for exception in _SENSITIVE_KEY_EXCEPTIONS):
|
|
return False
|
|
return any(marker in lowered for marker in _SENSITIVE_KEY_MARKERS)
|
|
|
|
@staticmethod
|
|
def _normalize_location(value: Any) -> Optional[str]:
|
|
if isinstance(value, str):
|
|
text = value.strip()
|
|
if not text:
|
|
return None
|
|
parts = [part.strip() for part in text.split(",")]
|
|
if len(parts) != 2:
|
|
return None
|
|
try:
|
|
lat = float(parts[0])
|
|
lng = float(parts[1])
|
|
except ValueError:
|
|
return None
|
|
elif isinstance(value, dict):
|
|
lat = value.get("lat", value.get("latitude"))
|
|
lng = value.get("lng", value.get("longitude"))
|
|
try:
|
|
if lat is None or lng is None:
|
|
return None
|
|
lat = float(lat)
|
|
lng = float(lng)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
elif isinstance(value, (list, tuple)) and len(value) == 2:
|
|
try:
|
|
lat = float(value[0])
|
|
lng = float(value[1])
|
|
except (TypeError, ValueError):
|
|
return None
|
|
else:
|
|
return None
|
|
|
|
if lat < -90 or lat > 90 or lng < -180 or lng > 180:
|
|
return None
|
|
return f"{lat:.6f},{lng:.6f}"
|
|
|
|
def _extract_location_from_settings(self, settings: Dict[str, Any]) -> Optional[str]:
|
|
repeater_settings = settings.get("repeater")
|
|
repeater_dict = repeater_settings if isinstance(repeater_settings, dict) else {}
|
|
candidates = [
|
|
settings.get("location"),
|
|
repeater_dict.get("location"),
|
|
settings.get("gps"),
|
|
repeater_dict.get("gps"),
|
|
{
|
|
"lat": repeater_dict.get("latitude"),
|
|
"lng": repeater_dict.get("longitude"),
|
|
},
|
|
]
|
|
for candidate in candidates:
|
|
location = self._normalize_location(candidate)
|
|
if location:
|
|
return location
|
|
return None
|
|
|
|
def _collect_system_stats(self) -> Dict[str, Any]:
|
|
temperature_c = None
|
|
try:
|
|
temperatures = (
|
|
psutil.sensors_temperatures() if hasattr(psutil, "sensors_temperatures") else {}
|
|
)
|
|
if temperatures:
|
|
for values in temperatures.values():
|
|
if values:
|
|
temperature_c = values[0].current
|
|
break
|
|
except Exception:
|
|
temperature_c = None
|
|
|
|
load_avg_1m = None
|
|
try:
|
|
if hasattr(os, "getloadavg"):
|
|
load_avg_1m = float(os.getloadavg()[0])
|
|
except Exception:
|
|
load_avg_1m = None
|
|
|
|
return {
|
|
"cpu_percent": float(psutil.cpu_percent(interval=None)),
|
|
"memory_percent": float(psutil.virtual_memory().percent),
|
|
"disk_percent": float(psutil.disk_usage("/").percent),
|
|
"temperature_c": temperature_c,
|
|
"load_avg_1m": load_avg_1m,
|
|
}
|
|
|
|
async def _post_inform(self, payload: Dict[str, Any]) -> Dict[str, Any]:
|
|
loop = asyncio.get_running_loop()
|
|
return await loop.run_in_executor(None, self._post_inform_sync, payload)
|
|
|
|
def _post_inform_sync(self, payload: Dict[str, Any]) -> Dict[str, Any]:
|
|
url = f"{self.base_url}/inform"
|
|
self._validate_http_url(url)
|
|
headers = {"Content-Type": "application/json"}
|
|
if self.api_token:
|
|
headers["Authorization"] = f"Bearer {self.api_token}"
|
|
|
|
body = json.dumps(payload).encode("utf-8")
|
|
req = request.Request(url=url, data=body, method="POST", headers=headers)
|
|
ssl_context = self._build_ssl_context(url)
|
|
|
|
try:
|
|
with request.urlopen(
|
|
req,
|
|
timeout=self.request_timeout_seconds,
|
|
context=ssl_context,
|
|
) as response: # nosec B310
|
|
response_bytes = response.read()
|
|
except error.HTTPError as exc:
|
|
details = ""
|
|
try:
|
|
details = exc.read().decode("utf-8")
|
|
except Exception:
|
|
details = str(exc)
|
|
raise RuntimeError(f"HTTP {exc.code}: {details}") from exc
|
|
except error.URLError as exc:
|
|
raise RuntimeError(f"Connection error: {exc}") from exc
|
|
|
|
if not response_bytes:
|
|
return {"type": "noop", "interval": self.inform_interval_seconds}
|
|
|
|
try:
|
|
response_payload = json.loads(response_bytes.decode("utf-8"))
|
|
except Exception as exc:
|
|
raise RuntimeError("Invalid JSON response from Glass backend") from exc
|
|
|
|
if not isinstance(response_payload, dict):
|
|
raise RuntimeError("Invalid response payload from Glass backend")
|
|
return response_payload
|
|
|
|
def _build_ssl_context(self, url: str) -> Optional[ssl.SSLContext]:
|
|
if not str(url).startswith("https"):
|
|
return None
|
|
|
|
if self.verify_tls:
|
|
if self.ca_cert_path:
|
|
ca_path = self._require_ssl_file(self.ca_cert_path, "ca_cert_path")
|
|
context = ssl.create_default_context(cafile=ca_path)
|
|
else:
|
|
context = ssl.create_default_context()
|
|
else:
|
|
context = ssl.create_default_context()
|
|
context.check_hostname = False
|
|
context.verify_mode = ssl.CERT_NONE
|
|
|
|
if self.client_cert_path or self.client_key_path:
|
|
cert_path = self._require_ssl_file(self.client_cert_path, "client_cert_path")
|
|
key_path = self._require_ssl_file(self.client_key_path, "client_key_path")
|
|
context.load_cert_chain(certfile=cert_path, keyfile=key_path)
|
|
|
|
return context
|
|
|
|
@staticmethod
|
|
def _require_ssl_file(path_value: Optional[str], field_name: str) -> str:
|
|
if not path_value or not str(path_value).strip():
|
|
raise RuntimeError(f"Missing {field_name} for Glass TLS configuration")
|
|
normalized = str(path_value).strip()
|
|
if not Path(normalized).exists():
|
|
raise RuntimeError(f"Configured {field_name} does not exist: {normalized}")
|
|
return normalized
|
|
|
|
@staticmethod
|
|
def _validate_http_url(url: str) -> None:
|
|
parsed = urlparse(url)
|
|
if parsed.scheme not in {"http", "https"}:
|
|
raise RuntimeError(f"Unsupported Glass base_url scheme: {parsed.scheme or '<missing>'}")
|
|
if not parsed.netloc:
|
|
raise RuntimeError("Glass base_url must include a host")
|
|
|
|
async def _handle_command_response(self, response: Dict[str, Any]) -> None:
|
|
command_id = str(response.get("command_id", "")).strip()
|
|
action = str(response.get("action", "")).strip()
|
|
params = response.get("params", {})
|
|
|
|
if not command_id or not action:
|
|
logger.warning("Glass command response missing command_id or action")
|
|
return
|
|
|
|
success = False
|
|
message = "Action failed"
|
|
details: Optional[Dict[str, Any]] = None
|
|
try:
|
|
success, message, details = await self._execute_command_action(action, params)
|
|
except Exception as exc:
|
|
success = False
|
|
message = f"Exception executing action: {exc}"
|
|
details = None
|
|
|
|
await self._queue_command_result(
|
|
command_id=command_id,
|
|
status="success" if success else "failed",
|
|
message=message,
|
|
details=details,
|
|
)
|
|
|
|
async def _execute_command_action(
|
|
self,
|
|
action: str,
|
|
params: Any,
|
|
) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
|
|
params = params if isinstance(params, dict) else {}
|
|
|
|
if action == "restart_service":
|
|
success, message = restart_service()
|
|
return success, message, None
|
|
|
|
if action == "send_advert":
|
|
if not self.daemon_instance or not hasattr(self.daemon_instance, "send_advert"):
|
|
return False, "send_advert unavailable", None
|
|
success = await self.daemon_instance.send_advert()
|
|
return success, "Advert sent" if success else "Failed to send advert", None
|
|
|
|
if action == "set_mode":
|
|
mode = str(params.get("mode", "")).strip()
|
|
if mode not in ("forward", "monitor", "no_tx"):
|
|
return False, "Invalid mode parameter", None
|
|
success, message = self._apply_config_update(
|
|
{"repeater": {"mode": mode}},
|
|
merge_mode="patch",
|
|
)
|
|
return success, message, None
|
|
|
|
if action == "set_inform_interval":
|
|
interval = params.get("interval_seconds", params.get("interval"))
|
|
if not isinstance(interval, int):
|
|
return False, "interval_seconds must be an integer", None
|
|
interval = self._clamp_interval(interval)
|
|
self.inform_interval_seconds = interval
|
|
success, message = self._apply_config_update(
|
|
{"glass": {"inform_interval_seconds": interval}},
|
|
merge_mode="patch",
|
|
)
|
|
return success, message, None
|
|
if action == "rotate_cert":
|
|
return True, "Certificate rotation requested", None
|
|
|
|
if action == "config_update":
|
|
config_patch = params.get("config", params)
|
|
merge_mode = str(params.get("merge_mode", "patch"))
|
|
success, message = self._apply_config_update(config_patch, merge_mode=merge_mode)
|
|
return success, message, None
|
|
|
|
if action == "transport_keys_sync":
|
|
success, message, details = self._apply_transport_keys_sync(params)
|
|
return success, message, details
|
|
|
|
if action == "set_radio":
|
|
radio_values = params.get("radio", params)
|
|
if not isinstance(radio_values, dict):
|
|
return False, "radio settings must be an object", None
|
|
success, message = self._apply_config_update(
|
|
{"radio": radio_values}, merge_mode="patch"
|
|
)
|
|
return success, message, None
|
|
|
|
if action == "run_diagnostic":
|
|
stats = self.daemon_instance.get_stats() if self.daemon_instance else {}
|
|
return (
|
|
True,
|
|
(
|
|
f"rx={int(stats.get('rx_count', 0))}, "
|
|
f"tx={int(stats.get('forwarded_count', 0))}, "
|
|
f"dropped={int(stats.get('dropped_count', 0))}"
|
|
),
|
|
None,
|
|
)
|
|
|
|
if action == "export_config":
|
|
normalized_config = self._normalize_for_hash(self.config)
|
|
return (
|
|
True,
|
|
"Configuration exported",
|
|
{
|
|
"config": normalized_config,
|
|
"config_hash": self._compute_config_hash(self.config),
|
|
},
|
|
)
|
|
|
|
return False, f"Unsupported action: {action}", None
|
|
|
|
def _apply_config_update(self, updates: Any, merge_mode: str = "patch") -> Tuple[bool, str]:
|
|
if not isinstance(updates, dict) or not updates:
|
|
return False, "Config update payload must be a non-empty object"
|
|
merge_mode = merge_mode.lower().strip()
|
|
|
|
if merge_mode not in ("patch", "replace"):
|
|
return False, f"Unsupported merge_mode: {merge_mode}"
|
|
updates_to_apply = dict(updates)
|
|
managed_updates = updates_to_apply.pop("glass_managed", None)
|
|
if managed_updates is not None:
|
|
managed_ok, managed_message = self._save_managed_settings(
|
|
managed_updates,
|
|
replace=merge_mode == "replace",
|
|
)
|
|
if not managed_ok:
|
|
return False, managed_message
|
|
self._reload_runtime_settings()
|
|
self._sync_mqtt_publisher()
|
|
|
|
if not updates_to_apply:
|
|
return True, "Managed settings updated"
|
|
|
|
sections = list(updates_to_apply.keys())
|
|
|
|
if merge_mode == "replace":
|
|
for section, value in updates_to_apply.items():
|
|
self.config[section] = value
|
|
if self.config_manager:
|
|
saved = self.config_manager.save_to_file()
|
|
live_updated = self.config_manager.live_update_daemon(sections)
|
|
return (
|
|
bool(saved and live_updated),
|
|
"Config replaced"
|
|
if saved and live_updated
|
|
else "Failed to persist replace update",
|
|
)
|
|
return True, "Config replaced"
|
|
|
|
# patch mode
|
|
if self.config_manager:
|
|
result = self.config_manager.update_and_save(
|
|
updates=updates_to_apply,
|
|
live_update=True,
|
|
live_update_sections=sections,
|
|
)
|
|
if result.get("success"):
|
|
if "glass" in sections:
|
|
self._reload_runtime_settings()
|
|
self._sync_mqtt_publisher()
|
|
return True, "Config patched"
|
|
return False, str(result.get("error", "Failed to patch config"))
|
|
self._deep_merge(self.config, updates_to_apply)
|
|
if "glass" in sections:
|
|
self._reload_runtime_settings()
|
|
self._sync_mqtt_publisher()
|
|
return True, "Config patched"
|
|
|
|
def _get_sqlite_handler(self):
|
|
if not self.daemon_instance:
|
|
return None
|
|
repeater_handler = getattr(self.daemon_instance, "repeater_handler", None)
|
|
storage = getattr(repeater_handler, "storage", None)
|
|
return getattr(storage, "sqlite_handler", None)
|
|
|
|
def _apply_transport_keys_sync(
|
|
self,
|
|
params: Dict[str, Any],
|
|
) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
|
|
if not isinstance(params, dict):
|
|
return False, "transport_keys_sync params must be an object", None
|
|
entries = params.get("transport_keys")
|
|
if not isinstance(entries, list):
|
|
return False, "transport_keys_sync payload must include a transport_keys list", None
|
|
sqlite_handler = self._get_sqlite_handler()
|
|
if sqlite_handler is None:
|
|
return False, "SQLite handler unavailable for transport key sync", None
|
|
try:
|
|
result = sqlite_handler.sync_transport_keys(entries)
|
|
except Exception as exc:
|
|
return False, f"Transport key sync failed: {exc}", None
|
|
payload_hash = params.get("payload_hash")
|
|
details: Dict[str, Any] = {
|
|
"applied_nodes": int(result.get("applied_nodes", 0)),
|
|
"generated_keys": int(result.get("generated_keys", 0)),
|
|
}
|
|
if isinstance(payload_hash, str) and payload_hash.strip():
|
|
details["payload_hash"] = payload_hash
|
|
return True, f"Applied transport key sync ({details['applied_nodes']} nodes)", details
|
|
|
|
def _apply_cert_renewal(self, response: Dict[str, Any]) -> Tuple[bool, str]:
|
|
client_cert = response.get("client_cert")
|
|
client_key = response.get("client_key")
|
|
ca_cert = response.get("ca_cert")
|
|
|
|
if not all(
|
|
isinstance(item, str) and item.strip() for item in (client_cert, client_key, ca_cert)
|
|
):
|
|
return False, "Missing certificate payload values"
|
|
|
|
cert_dir = Path(self.cert_store_dir)
|
|
cert_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
client_cert_path = cert_dir / "glass-client.crt"
|
|
client_key_path = cert_dir / "glass-client.key"
|
|
ca_cert_path = cert_dir / "glass-ca.crt"
|
|
|
|
client_cert_path.write_text(client_cert, encoding="utf-8")
|
|
client_key_path.write_text(client_key, encoding="utf-8")
|
|
ca_cert_path.write_text(ca_cert, encoding="utf-8")
|
|
os.chmod(client_key_path, 0o600)
|
|
|
|
return self._apply_config_update(
|
|
{
|
|
"glass": {
|
|
"client_cert_path": str(client_cert_path),
|
|
"client_key_path": str(client_key_path),
|
|
"ca_cert_path": str(ca_cert_path),
|
|
}
|
|
},
|
|
merge_mode="patch",
|
|
)
|
|
|
|
async def _get_pending_command_results(self) -> List[Dict[str, Any]]:
|
|
async with self._pending_lock:
|
|
return list(self._pending_command_results)
|
|
|
|
async def _queue_command_result(
|
|
self,
|
|
command_id: str,
|
|
status: str,
|
|
message: str,
|
|
details: Optional[Dict[str, Any]] = None,
|
|
) -> None:
|
|
completed_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
|
result = {
|
|
"command_id": command_id,
|
|
"status": status,
|
|
"message": message[:1024] if message else "",
|
|
"completed_at": completed_at,
|
|
}
|
|
if details:
|
|
result["details"] = details
|
|
async with self._pending_lock:
|
|
self._pending_command_results.append(result)
|
|
|
|
def publish_telemetry(self, record_type: str, record: Dict[str, Any]) -> None:
|
|
if not self.enabled or not self.mqtt_enabled or not self._mqtt_ready:
|
|
return
|
|
if not self._mqtt_client:
|
|
return
|
|
|
|
node_name = self.config.get("repeater", {}).get("node_name", "unknown-repeater")
|
|
event_type = "event"
|
|
event_name: Optional[str] = record_type
|
|
if record_type in ("packet", "advert"):
|
|
event_type = record_type
|
|
event_name = None
|
|
|
|
topic = self._mqtt_topic_for_record(node_name=node_name, record_type=record_type)
|
|
timestamp = self._to_rfc3339_timestamp(record.get("timestamp"))
|
|
payload = self._normalize_for_hash(record)
|
|
|
|
envelope: Dict[str, Any] = {
|
|
"version": 1,
|
|
"type": event_type,
|
|
"topic": topic,
|
|
"node_name": node_name,
|
|
"timestamp": timestamp,
|
|
"payload": payload,
|
|
}
|
|
if event_type == "event" and event_name:
|
|
envelope["event_name"] = event_name
|
|
|
|
try:
|
|
message = json.dumps(envelope, separators=(",", ":"), sort_keys=True, default=str)
|
|
self._mqtt_client.publish(topic, message, qos=0, retain=False)
|
|
except Exception as exc:
|
|
logger.debug("Failed publishing Glass telemetry MQTT message: %s", exc)
|
|
|
|
def _mqtt_topic_for_record(self, *, node_name: str, record_type: str) -> str:
|
|
base = self.mqtt_base_topic.strip("/") or "glass"
|
|
if record_type in ("packet", "advert"):
|
|
return f"{base}/{node_name}/{record_type}"
|
|
return f"{base}/{node_name}/event/{record_type}"
|
|
|
|
def _to_rfc3339_timestamp(self, value: Any) -> str:
|
|
if isinstance(value, (int, float)):
|
|
dt = datetime.fromtimestamp(float(value), timezone.utc)
|
|
elif isinstance(value, str):
|
|
normalized = value.strip()
|
|
if normalized.endswith("Z"):
|
|
return normalized
|
|
try:
|
|
dt = datetime.fromisoformat(normalized)
|
|
except ValueError:
|
|
dt = datetime.now(timezone.utc)
|
|
elif isinstance(value, datetime):
|
|
dt = value
|
|
else:
|
|
dt = datetime.now(timezone.utc)
|
|
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
else:
|
|
dt = dt.astimezone(timezone.utc)
|
|
return dt.isoformat().replace("+00:00", "Z")
|
|
|
|
def _init_mqtt_publisher(self) -> None:
|
|
if not self.mqtt_enabled:
|
|
self._close_mqtt_publisher()
|
|
return
|
|
if mqtt is None:
|
|
logger.warning("Glass MQTT telemetry publishing enabled but paho-mqtt is unavailable")
|
|
self._close_mqtt_publisher()
|
|
return
|
|
if self._mqtt_client is not None:
|
|
return
|
|
|
|
try:
|
|
client = mqtt.Client()
|
|
if self.mqtt_username:
|
|
client.username_pw_set(self.mqtt_username, self.mqtt_password)
|
|
if self.mqtt_tls_enabled:
|
|
ca_certs = (
|
|
self._require_ssl_file(self.ca_cert_path, "ca_cert_path")
|
|
if self.ca_cert_path
|
|
else None
|
|
)
|
|
certfile = None
|
|
keyfile = None
|
|
if self.client_cert_path or self.client_key_path:
|
|
certfile = self._require_ssl_file(self.client_cert_path, "client_cert_path")
|
|
keyfile = self._require_ssl_file(self.client_key_path, "client_key_path")
|
|
cert_reqs = ssl.CERT_REQUIRED if self.verify_tls else ssl.CERT_NONE
|
|
client.tls_set(
|
|
ca_certs=ca_certs,
|
|
certfile=certfile,
|
|
keyfile=keyfile,
|
|
cert_reqs=cert_reqs,
|
|
tls_version=ssl.PROTOCOL_TLS_CLIENT,
|
|
)
|
|
if not self.verify_tls:
|
|
client.tls_insecure_set(True)
|
|
client.on_connect = self._on_mqtt_connect
|
|
client.on_disconnect = self._on_mqtt_disconnect
|
|
client.connect_async(self.mqtt_broker_host, self.mqtt_broker_port, 60)
|
|
client.loop_start()
|
|
self._mqtt_client = client
|
|
self._mqtt_runtime_signature = self._current_mqtt_signature()
|
|
logger.info(
|
|
"Glass MQTT telemetry publisher started (%s:%s, base_topic=%s)",
|
|
self.mqtt_broker_host,
|
|
self.mqtt_broker_port,
|
|
self.mqtt_base_topic,
|
|
)
|
|
except Exception as exc:
|
|
self._mqtt_client = None
|
|
self._mqtt_ready = False
|
|
self._mqtt_runtime_signature = None
|
|
logger.warning("Failed to start Glass MQTT telemetry publisher: %s", exc)
|
|
|
|
def _close_mqtt_publisher(self) -> None:
|
|
client = self._mqtt_client
|
|
self._mqtt_client = None
|
|
self._mqtt_ready = False
|
|
self._mqtt_runtime_signature = None
|
|
if client is None:
|
|
return
|
|
try:
|
|
client.loop_stop()
|
|
client.disconnect()
|
|
except Exception as exc:
|
|
logger.debug("Error stopping Glass MQTT telemetry publisher: %s", exc)
|
|
|
|
def _on_mqtt_connect(self, _client, _userdata, _flags, reason_code, _properties=None) -> None:
|
|
rc = getattr(reason_code, "value", reason_code)
|
|
if rc == 0:
|
|
self._mqtt_ready = True
|
|
logger.info("Glass MQTT telemetry publisher connected")
|
|
return
|
|
self._mqtt_ready = False
|
|
logger.warning("Glass MQTT telemetry publisher connect failed (code=%s)", rc)
|
|
|
|
def _on_mqtt_disconnect(self, _client, _userdata, reason_code, _properties=None) -> None:
|
|
self._mqtt_ready = False
|
|
rc = getattr(reason_code, "value", reason_code)
|
|
if rc:
|
|
logger.warning("Glass MQTT telemetry publisher disconnected (code=%s)", rc)
|
|
|
|
def _current_mqtt_signature(
|
|
self,
|
|
) -> Tuple[
|
|
str,
|
|
int,
|
|
str,
|
|
bool,
|
|
bool,
|
|
Optional[str],
|
|
Optional[str],
|
|
Optional[str],
|
|
Optional[str],
|
|
Optional[str],
|
|
]:
|
|
return (
|
|
self.mqtt_broker_host,
|
|
self.mqtt_broker_port,
|
|
self.mqtt_base_topic,
|
|
self.mqtt_tls_enabled,
|
|
self.verify_tls,
|
|
self.ca_cert_path,
|
|
self.client_cert_path,
|
|
self.client_key_path,
|
|
self.mqtt_username,
|
|
self.mqtt_password,
|
|
)
|
|
|
|
def _sync_mqtt_publisher(self) -> None:
|
|
if not self.enabled or not self.mqtt_enabled:
|
|
self._close_mqtt_publisher()
|
|
return
|
|
if mqtt is None:
|
|
self._close_mqtt_publisher()
|
|
return
|
|
|
|
signature = self._current_mqtt_signature()
|
|
if self._mqtt_client is None:
|
|
self._init_mqtt_publisher()
|
|
return
|
|
if self._mqtt_runtime_signature != signature:
|
|
self._close_mqtt_publisher()
|
|
self._init_mqtt_publisher()
|
|
|
|
@staticmethod
|
|
def _deep_merge(target: Dict[str, Any], source: Dict[str, Any]) -> None:
|
|
for key, value in source.items():
|
|
if isinstance(value, dict) and isinstance(target.get(key), dict):
|
|
GlassHandler._deep_merge(target[key], value)
|
|
else:
|
|
target[key] = value
|
|
|
|
@staticmethod
|
|
def _normalize_for_hash(value: Any) -> Any:
|
|
if isinstance(value, bytes):
|
|
return value.hex()
|
|
if isinstance(value, dict):
|
|
return {k: GlassHandler._normalize_for_hash(v) for k, v in value.items()}
|
|
if isinstance(value, list):
|
|
return [GlassHandler._normalize_for_hash(v) for v in value]
|
|
return value
|
|
|
|
@staticmethod
|
|
def _compute_config_hash(config: dict) -> str:
|
|
normalized = GlassHandler._normalize_for_hash(config)
|
|
encoded = json.dumps(normalized, sort_keys=True, separators=(",", ":")).encode("utf-8")
|
|
digest = hashlib.sha256(encoded).hexdigest()
|
|
return f"sha256:{digest}"
|
|
|
|
@staticmethod
|
|
def _clamp_interval(interval_seconds: int) -> int:
|
|
if interval_seconds < 5:
|
|
return 5
|
|
if interval_seconds > 3600:
|
|
return 3600
|
|
return interval_seconds
|