diff --git a/README.md b/README.md index ae3d2d4..fe5d540 100644 --- a/README.md +++ b/README.md @@ -189,6 +189,18 @@ The configuration file is created and configured during installation at: /etc/pymc_repeater/config.yaml ``` +### Optional pyMC_Glass integration +The repeater now supports an additive `glass` config section for central control-plane integration. +When enabled, it sends periodic `/inform` payloads to pyMC_Glass, receives queued commands, and reports command results on the next inform cycle. + +Minimal example: +```yaml +glass: + enabled: true + base_url: "http://localhost:8080" + inform_interval_seconds: 30 +``` + To reconfigure radio and hardware settings after installation, run: ```bash sudo bash setup-radio-config.sh /etc/pymc_repeater diff --git a/config.yaml.example b/config.yaml.example index 37ecdef..a7c0bd9 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -402,6 +402,32 @@ letsmesh: # - TRACE # Don't publish trace packets # - RAW_CUSTOM # Don't publish custom raw packets +# pyMC_Glass control-plane integration (optional) +glass: + # Enable repeater -> pyMC_Glass /inform loop + enabled: false + + # Base URL of Glass backend + # Example local dev: "http://localhost:8080" + # Example production: "https://glass.example.com" + base_url: "http://localhost:8080" + + # Inform interval in seconds (used as initial/default interval; + # backend may override via noop.interval response) + inform_interval_seconds: 30 + + # HTTP timeout per inform request + request_timeout_seconds: 10 + + # Verify TLS certificates when using HTTPS + verify_tls: true + + # Optional bearer token for future authenticated inform endpoints + api_token: "" + + # Where cert_renewal payloads are written + cert_store_dir: "/etc/pymc_repeater/glass" + logging: # Log level: DEBUG, INFO, WARNING, ERROR level: INFO diff --git a/repeater/config.py b/repeater/config.py index 604a85f..7136bd3 100644 --- a/repeater/config.py +++ b/repeater/config.py @@ -77,6 +77,17 @@ def load_config(config_path: Optional[str] = None) -> Dict[str, Any]: if "mesh" not in config: config["mesh"] = {} + if "glass" not in config: + config["glass"] = { + "enabled": False, + "base_url": "http://localhost:8080", + "inform_interval_seconds": 30, + "request_timeout_seconds": 10, + "verify_tls": True, + "api_token": "", + "cert_store_dir": "/etc/pymc_repeater/glass", + } + # Ensure repeater.security exists with defaults for upgrades from older configs if "repeater" not in config: config["repeater"] = {} diff --git a/repeater/config_manager.py b/repeater/config_manager.py index b131f70..66af0e1 100644 --- a/repeater/config_manager.py +++ b/repeater/config_manager.py @@ -69,7 +69,7 @@ class ConfigManager: # Default sections to update if not specified if sections is None: - sections = ['repeater', 'delays', 'radio', 'acl', 'identities'] + sections = ['repeater', 'delays', 'radio', 'acl', 'identities', 'glass'] # Update each section for section in sections: diff --git a/repeater/data_acquisition/__init__.py b/repeater/data_acquisition/__init__.py index 14a0e13..adeb872 100644 --- a/repeater/data_acquisition/__init__.py +++ b/repeater/data_acquisition/__init__.py @@ -1,6 +1,6 @@ +from .glass_handler import GlassHandler from .mqtt_handler import MQTTHandler from .rrdtool_handler import RRDToolHandler from .sqlite_handler import SQLiteHandler from .storage_collector import StorageCollector - -__all__ = ["SQLiteHandler", "RRDToolHandler", "MQTTHandler", "StorageCollector"] +__all__ = ["SQLiteHandler", "RRDToolHandler", "MQTTHandler", "StorageCollector", "GlassHandler"] diff --git a/repeater/data_acquisition/glass_handler.py b/repeater/data_acquisition/glass_handler.py new file mode 100644 index 0000000..61b6269 --- /dev/null +++ b/repeater/data_acquisition/glass_handler.py @@ -0,0 +1,957 @@ +import asyncio +import hashlib +import json +import logging +import os +import ssl +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urlparse +from urllib import error, request + +import psutil +try: + import paho.mqtt.client as mqtt +except ImportError: + mqtt = None + +from repeater import __version__ +from repeater.service_utils import restart_service + +logger = logging.getLogger("GlassHandler") +_SENSITIVE_KEY_MARKERS = ( + "password", + "passphrase", + "secret", + "token", + "private_key", + "identity_key", + "client_key", + "api_key", +) +_SENSITIVE_KEY_EXCEPTIONS = ("pubkey", "public_key") + + +class GlassHandler: + def __init__(self, config: dict, daemon_instance=None, config_manager=None): + self.config = config + self.daemon_instance = daemon_instance + self.config_manager = config_manager + + self.enabled = False + self.base_url = "http://localhost:8080" + self.request_timeout_seconds = 10 + self.verify_tls = True + self.api_token = "" + self.inform_interval_seconds = 30 + self.cert_store_dir = "/etc/pymc_repeater/glass" + self._cert_expires_at: Optional[str] = None + self.mqtt_enabled = False + self.mqtt_broker_host = "localhost" + self.mqtt_broker_port = 1883 + self.mqtt_base_topic = "glass" + self.mqtt_tls_enabled = False + self.mqtt_username: Optional[str] = None + self.mqtt_password: Optional[str] = None + self.client_cert_path: Optional[str] = None + self.client_key_path: Optional[str] = None + self.ca_cert_path: Optional[str] = None + self._mqtt_client = None + self._mqtt_ready = False + self._mqtt_runtime_signature: Optional[ + Tuple[ + str, + int, + str, + bool, + bool, + Optional[str], + Optional[str], + Optional[str], + Optional[str], + Optional[str], + ] + ] = None + self._managed_settings_filename = "managed.json" + + self._task: Optional[asyncio.Task] = None + self._stop_event: Optional[asyncio.Event] = None + self._pending_command_results: List[Dict[str, Any]] = [] + self._pending_lock = asyncio.Lock() + + self._reload_runtime_settings() + + async def start(self) -> None: + self._reload_runtime_settings() + if not self.enabled: + logger.info("Glass integration disabled") + self._close_mqtt_publisher() + return + + if self._task and not self._task.done(): + return + self._sync_mqtt_publisher() + + self._stop_event = asyncio.Event() + self._task = asyncio.create_task(self._run_loop(), name="glass-inform-loop") + logger.info( + "Glass integration started (base_url=%s, inform_interval=%ss)", + self.base_url, + self.inform_interval_seconds, + ) + + async def stop(self) -> None: + if self._task: + if self._stop_event: + self._stop_event.set() + + try: + await self._task + except Exception as exc: + logger.debug("Glass task stop ignored exception: %s", exc) + finally: + self._task = None + self._stop_event = None + + self._close_mqtt_publisher() + + def _reload_runtime_settings(self) -> None: + glass_cfg = self.config.get("glass", {}) + self.enabled = bool(glass_cfg.get("enabled", False)) + + base_url = str(glass_cfg.get("base_url", "http://localhost:8080")).strip() + self.base_url = base_url.rstrip("/") if base_url else "http://localhost:8080" + + self.request_timeout_seconds = max(3, int(glass_cfg.get("request_timeout_seconds", 10))) + self.verify_tls = bool(glass_cfg.get("verify_tls", True)) + self.api_token = str(glass_cfg.get("api_token", "") or "").strip() + self.inform_interval_seconds = self._clamp_interval( + int(glass_cfg.get("inform_interval_seconds", self.inform_interval_seconds)) + ) + self.cert_store_dir = str( + glass_cfg.get("cert_store_dir", "/etc/pymc_repeater/glass") or "/etc/pymc_repeater/glass" + ) + self.client_cert_path = ( + str(glass_cfg.get("client_cert_path")).strip() + if glass_cfg.get("client_cert_path") + else None + ) + self.client_key_path = ( + str(glass_cfg.get("client_key_path")).strip() + if glass_cfg.get("client_key_path") + else None + ) + self.ca_cert_path = ( + str(glass_cfg.get("ca_cert_path")).strip() + if glass_cfg.get("ca_cert_path") + else None + ) + managed_cfg = self._load_managed_settings() + parsed_base_url = urlparse(self.base_url) + default_host = parsed_base_url.hostname or "localhost" + + self.mqtt_enabled = bool(managed_cfg.get("mqtt_enabled", False)) + host_value = managed_cfg.get("mqtt_broker_host", default_host) + self.mqtt_broker_host = str(host_value or default_host).strip() or default_host + try: + self.mqtt_broker_port = max(1, int(managed_cfg.get("mqtt_broker_port", 1883))) + except (TypeError, ValueError): + self.mqtt_broker_port = 1883 + topic_value = managed_cfg.get("mqtt_base_topic", "glass") + self.mqtt_base_topic = str(topic_value or "glass").strip("/") + self.mqtt_tls_enabled = bool(managed_cfg.get("mqtt_tls_enabled", False)) + username = managed_cfg.get("mqtt_username") + password = managed_cfg.get("mqtt_password") + self.mqtt_username = str(username).strip() if isinstance(username, str) and username else None + self.mqtt_password = str(password) if isinstance(password, str) and password else None + + def _managed_settings_path(self) -> Path: + return Path(self.cert_store_dir) / self._managed_settings_filename + + def _load_managed_settings(self) -> Dict[str, Any]: + path = self._managed_settings_path() + if not path.exists(): + return {} + try: + raw = json.loads(path.read_text(encoding="utf-8")) + except Exception as exc: + logger.warning("Invalid Glass managed settings file at %s: %s", path, exc) + return {} + if not isinstance(raw, dict): + logger.warning("Ignoring non-object Glass managed settings file at %s", path) + return {} + return raw + + def _save_managed_settings(self, updates: Dict[str, Any], *, replace: bool) -> Tuple[bool, str]: + if not isinstance(updates, dict): + return False, "glass_managed must be an object" + + path = self._managed_settings_path() + path.parent.mkdir(parents=True, exist_ok=True) + current = {} if replace else self._load_managed_settings() + if not isinstance(current, dict): + current = {} + merged = dict(current) + merged.update(updates) + try: + path.write_text( + json.dumps(merged, indent=2, sort_keys=True), + encoding="utf-8", + ) + os.chmod(path, 0o600) + return True, "Managed settings updated" + except Exception as exc: + return False, f"Failed writing managed settings: {exc}" + + async def _run_loop(self) -> None: + while self._stop_event and not self._stop_event.is_set(): + self._reload_runtime_settings() + self._sync_mqtt_publisher() + try: + interval = await self._inform_once() + except Exception as exc: + logger.warning("Glass inform failed: %s", exc) + interval = self.inform_interval_seconds + + wait_seconds = self._clamp_interval(interval) + if not self._stop_event: + break + try: + await asyncio.wait_for(self._stop_event.wait(), timeout=wait_seconds) + except asyncio.TimeoutError: + continue + + async def _inform_once(self) -> int: + self._reload_runtime_settings() + if not self.enabled: + return self.inform_interval_seconds + + payload = await self._build_inform_payload() + response = await self._post_inform(payload) + + if payload.get("command_results"): + async with self._pending_lock: + self._pending_command_results = [] + + response_type = str(response.get("type", "noop")) + response_interval = response.get("interval") + + if response_type == "command": + await self._handle_command_response(response) + elif response_type == "config_update": + ok, message = self._apply_config_update( + response.get("config", {}), + str(response.get("merge_mode", "patch")), + ) + if ok: + logger.info("Applied Glass config update") + else: + logger.warning("Failed to apply Glass config update: %s", message) + elif response_type == "cert_renewal": + ok, message = self._apply_cert_renewal(response) + if ok: + logger.info("Applied Glass certificate renewal") + else: + logger.warning("Failed to apply Glass certificate renewal: %s", message) + elif response_type == "upgrade": + logger.warning("Glass upgrade action received but not implemented on repeater") + elif response_type != "noop": + logger.warning("Unknown Glass response type: %s", response_type) + + if isinstance(response_interval, int): + self.inform_interval_seconds = self._clamp_interval(response_interval) + return self.inform_interval_seconds + + async def _build_inform_payload(self) -> Dict[str, Any]: + if not self.daemon_instance or not getattr(self.daemon_instance, "local_identity", None): + raise RuntimeError("Local identity not available for Glass inform") + + stats = self.daemon_instance.get_stats() if self.daemon_instance else {} + local_identity = self.daemon_instance.local_identity + public_key = bytes(local_identity.get_public_key()).hex() + node_name = self.config.get("repeater", {}).get("node_name", "unknown-repeater") + + uptime_seconds = int(stats.get("uptime_seconds", 0)) + if uptime_seconds <= 0: + repeater_handler = getattr(self.daemon_instance, "repeater_handler", None) + if repeater_handler and getattr(repeater_handler, "start_time", None): + uptime_seconds = max(0, int(time.time() - repeater_handler.start_time)) + + tx_total = int(stats.get("sent_flood_count", 0)) + int(stats.get("sent_direct_count", 0)) + if tx_total <= 0: + tx_total = int(stats.get("forwarded_count", 0)) + + command_results = await self._get_pending_command_results() + settings_snapshot = self._build_settings_snapshot() + location = self._extract_location_from_settings(settings_snapshot) + + return { + "type": "inform", + "version": 1, + "node_name": node_name, + "pubkey": f"0x{public_key}", + "software_version": __version__, + "state": self.config.get("repeater", {}).get("mode", "forward"), + "location": location, + "uptime_seconds": uptime_seconds, + "config_hash": self._compute_config_hash(self.config), + "cert_expires_at": self._cert_expires_at, + "system": self._collect_system_stats(), + "radio": { + "frequency": int(self.config.get("radio", {}).get("frequency", 0)), + "spreading_factor": int(self.config.get("radio", {}).get("spreading_factor", 7)), + "bandwidth": int(self.config.get("radio", {}).get("bandwidth", 0)), + "tx_power": int(self.config.get("radio", {}).get("tx_power", 0)), + "noise_floor_dbm": stats.get("noise_floor_dbm"), + "mode": self.config.get("repeater", {}).get("mode", "forward"), + }, + "counters": { + "rx_total": int(stats.get("rx_count", 0)), + "tx_total": max(0, tx_total), + "forwarded": int(stats.get("forwarded_count", 0)), + "dropped": int(stats.get("dropped_count", 0)), + "duplicates": int(stats.get("flood_dup_count", 0)) + + int(stats.get("direct_dup_count", 0)), + "airtime_percent": float(stats.get("utilization_percent", 0.0)), + }, + "settings": settings_snapshot, + "command_results": command_results, + } + + def _build_settings_snapshot(self) -> Dict[str, Any]: + normalized = self._normalize_for_hash(self.config) + sanitized = self._sanitize_settings_for_export(normalized) + if isinstance(sanitized, dict): + return sanitized + return {} + + def _sanitize_settings_for_export(self, value: Any, key_name: Optional[str] = None) -> Any: + if isinstance(value, dict): + output: Dict[str, Any] = {} + for child_key, child_value in value.items(): + if self._is_sensitive_key(child_key): + output[child_key] = "" + continue + output[child_key] = self._sanitize_settings_for_export(child_value, child_key) + return output + if isinstance(value, list): + return [self._sanitize_settings_for_export(item, key_name) for item in value] + return value + + @staticmethod + def _is_sensitive_key(key: str) -> bool: + lowered = str(key).lower() + if any(exception in lowered for exception in _SENSITIVE_KEY_EXCEPTIONS): + return False + return any(marker in lowered for marker in _SENSITIVE_KEY_MARKERS) + + @staticmethod + def _normalize_location(value: Any) -> Optional[str]: + if isinstance(value, str): + text = value.strip() + if not text: + return None + parts = [part.strip() for part in text.split(",")] + if len(parts) != 2: + return None + try: + lat = float(parts[0]) + lng = float(parts[1]) + except ValueError: + return None + elif isinstance(value, dict): + lat = value.get("lat", value.get("latitude")) + lng = value.get("lng", value.get("longitude")) + try: + if lat is None or lng is None: + return None + lat = float(lat) + lng = float(lng) + except (TypeError, ValueError): + return None + elif isinstance(value, (list, tuple)) and len(value) == 2: + try: + lat = float(value[0]) + lng = float(value[1]) + except (TypeError, ValueError): + return None + else: + return None + + if lat < -90 or lat > 90 or lng < -180 or lng > 180: + return None + return f"{lat:.6f},{lng:.6f}" + + def _extract_location_from_settings(self, settings: Dict[str, Any]) -> Optional[str]: + repeater_settings = settings.get("repeater") + repeater_dict = repeater_settings if isinstance(repeater_settings, dict) else {} + candidates = [ + settings.get("location"), + repeater_dict.get("location"), + settings.get("gps"), + repeater_dict.get("gps"), + { + "lat": repeater_dict.get("latitude"), + "lng": repeater_dict.get("longitude"), + }, + ] + for candidate in candidates: + location = self._normalize_location(candidate) + if location: + return location + return None + + def _collect_system_stats(self) -> Dict[str, Any]: + temperature_c = None + try: + temperatures = psutil.sensors_temperatures() if hasattr(psutil, "sensors_temperatures") else {} + if temperatures: + for values in temperatures.values(): + if values: + temperature_c = values[0].current + break + except Exception: + temperature_c = None + + load_avg_1m = None + try: + if hasattr(os, "getloadavg"): + load_avg_1m = float(os.getloadavg()[0]) + except Exception: + load_avg_1m = None + + return { + "cpu_percent": float(psutil.cpu_percent(interval=None)), + "memory_percent": float(psutil.virtual_memory().percent), + "disk_percent": float(psutil.disk_usage("/").percent), + "temperature_c": temperature_c, + "load_avg_1m": load_avg_1m, + } + + async def _post_inform(self, payload: Dict[str, Any]) -> Dict[str, Any]: + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, self._post_inform_sync, payload) + + def _post_inform_sync(self, payload: Dict[str, Any]) -> Dict[str, Any]: + url = f"{self.base_url}/inform" + headers = {"Content-Type": "application/json"} + if self.api_token: + headers["Authorization"] = f"Bearer {self.api_token}" + + body = json.dumps(payload).encode("utf-8") + req = request.Request(url=url, data=body, method="POST", headers=headers) + ssl_context = self._build_ssl_context(url) + + try: + with request.urlopen( + req, + timeout=self.request_timeout_seconds, + context=ssl_context, + ) as response: + response_bytes = response.read() + except error.HTTPError as exc: + details = "" + try: + details = exc.read().decode("utf-8") + except Exception: + details = str(exc) + raise RuntimeError(f"HTTP {exc.code}: {details}") from exc + except error.URLError as exc: + raise RuntimeError(f"Connection error: {exc}") from exc + + if not response_bytes: + return {"type": "noop", "interval": self.inform_interval_seconds} + + try: + response_payload = json.loads(response_bytes.decode("utf-8")) + except Exception as exc: + raise RuntimeError("Invalid JSON response from Glass backend") from exc + + if not isinstance(response_payload, dict): + raise RuntimeError("Invalid response payload from Glass backend") + return response_payload + + def _build_ssl_context(self, url: str) -> Optional[ssl.SSLContext]: + if not str(url).startswith("https"): + return None + + if self.verify_tls: + if self.ca_cert_path: + ca_path = self._require_ssl_file(self.ca_cert_path, "ca_cert_path") + context = ssl.create_default_context(cafile=ca_path) + else: + context = ssl.create_default_context() + else: + context = ssl._create_unverified_context() + + if self.client_cert_path or self.client_key_path: + cert_path = self._require_ssl_file(self.client_cert_path, "client_cert_path") + key_path = self._require_ssl_file(self.client_key_path, "client_key_path") + context.load_cert_chain(certfile=cert_path, keyfile=key_path) + + return context + + @staticmethod + def _require_ssl_file(path_value: Optional[str], field_name: str) -> str: + if not path_value or not str(path_value).strip(): + raise RuntimeError(f"Missing {field_name} for Glass TLS configuration") + normalized = str(path_value).strip() + if not Path(normalized).exists(): + raise RuntimeError(f"Configured {field_name} does not exist: {normalized}") + return normalized + + async def _handle_command_response(self, response: Dict[str, Any]) -> None: + command_id = str(response.get("command_id", "")).strip() + action = str(response.get("action", "")).strip() + params = response.get("params", {}) + + if not command_id or not action: + logger.warning("Glass command response missing command_id or action") + return + + success = False + message = "Action failed" + details: Optional[Dict[str, Any]] = None + try: + success, message, details = await self._execute_command_action(action, params) + except Exception as exc: + success = False + message = f"Exception executing action: {exc}" + details = None + + await self._queue_command_result( + command_id=command_id, + status="success" if success else "failed", + message=message, + details=details, + ) + + async def _execute_command_action( + self, + action: str, + params: Any, + ) -> Tuple[bool, str, Optional[Dict[str, Any]]]: + params = params if isinstance(params, dict) else {} + + if action == "restart_service": + success, message = restart_service() + return success, message, None + + if action == "send_advert": + if not self.daemon_instance or not hasattr(self.daemon_instance, "send_advert"): + return False, "send_advert unavailable", None + success = await self.daemon_instance.send_advert() + return success, "Advert sent" if success else "Failed to send advert", None + + if action == "set_mode": + mode = str(params.get("mode", "")).strip() + if mode not in ("forward", "monitor", "no_tx"): + return False, "Invalid mode parameter", None + success, message = self._apply_config_update( + {"repeater": {"mode": mode}}, + merge_mode="patch", + ) + return success, message, None + + if action == "set_inform_interval": + interval = params.get("interval_seconds", params.get("interval")) + if not isinstance(interval, int): + return False, "interval_seconds must be an integer", None + interval = self._clamp_interval(interval) + self.inform_interval_seconds = interval + success, message = self._apply_config_update( + {"glass": {"inform_interval_seconds": interval}}, + merge_mode="patch", + ) + return success, message, None + if action == "rotate_cert": + return True, "Certificate rotation requested", None + + if action == "config_update": + config_patch = params.get("config", params) + merge_mode = str(params.get("merge_mode", "patch")) + success, message = self._apply_config_update(config_patch, merge_mode=merge_mode) + return success, message, None + + if action == "transport_keys_sync": + success, message, details = self._apply_transport_keys_sync(params) + return success, message, details + + if action == "set_radio": + radio_values = params.get("radio", params) + if not isinstance(radio_values, dict): + return False, "radio settings must be an object", None + success, message = self._apply_config_update({"radio": radio_values}, merge_mode="patch") + return success, message, None + + if action == "run_diagnostic": + stats = self.daemon_instance.get_stats() if self.daemon_instance else {} + return True, ( + f"rx={int(stats.get('rx_count', 0))}, " + f"tx={int(stats.get('forwarded_count', 0))}, " + f"dropped={int(stats.get('dropped_count', 0))}" + ), None + + if action == "export_config": + normalized_config = self._normalize_for_hash(self.config) + return ( + True, + "Configuration exported", + { + "config": normalized_config, + "config_hash": self._compute_config_hash(self.config), + }, + ) + + return False, f"Unsupported action: {action}", None + + def _apply_config_update(self, updates: Any, merge_mode: str = "patch") -> Tuple[bool, str]: + if not isinstance(updates, dict) or not updates: + return False, "Config update payload must be a non-empty object" + merge_mode = merge_mode.lower().strip() + + if merge_mode not in ("patch", "replace"): + return False, f"Unsupported merge_mode: {merge_mode}" + updates_to_apply = dict(updates) + managed_updates = updates_to_apply.pop("glass_managed", None) + if managed_updates is not None: + managed_ok, managed_message = self._save_managed_settings( + managed_updates, + replace=merge_mode == "replace", + ) + if not managed_ok: + return False, managed_message + self._reload_runtime_settings() + self._sync_mqtt_publisher() + + if not updates_to_apply: + return True, "Managed settings updated" + + sections = list(updates_to_apply.keys()) + + if merge_mode == "replace": + for section, value in updates_to_apply.items(): + self.config[section] = value + if self.config_manager: + saved = self.config_manager.save_to_file() + live_updated = self.config_manager.live_update_daemon(sections) + return ( + bool(saved and live_updated), + "Config replaced" if saved and live_updated else "Failed to persist replace update", + ) + return True, "Config replaced" + + # patch mode + if self.config_manager: + result = self.config_manager.update_and_save( + updates=updates_to_apply, + live_update=True, + live_update_sections=sections, + ) + if result.get("success"): + if "glass" in sections: + self._reload_runtime_settings() + self._sync_mqtt_publisher() + return True, "Config patched" + return False, str(result.get("error", "Failed to patch config")) + self._deep_merge(self.config, updates_to_apply) + if "glass" in sections: + self._reload_runtime_settings() + self._sync_mqtt_publisher() + return True, "Config patched" + + def _get_sqlite_handler(self): + if not self.daemon_instance: + return None + repeater_handler = getattr(self.daemon_instance, "repeater_handler", None) + storage = getattr(repeater_handler, "storage", None) + return getattr(storage, "sqlite_handler", None) + + def _apply_transport_keys_sync( + self, + params: Dict[str, Any], + ) -> Tuple[bool, str, Optional[Dict[str, Any]]]: + if not isinstance(params, dict): + return False, "transport_keys_sync params must be an object", None + entries = params.get("transport_keys") + if not isinstance(entries, list): + return False, "transport_keys_sync payload must include a transport_keys list", None + sqlite_handler = self._get_sqlite_handler() + if sqlite_handler is None: + return False, "SQLite handler unavailable for transport key sync", None + try: + result = sqlite_handler.sync_transport_keys(entries) + except Exception as exc: + return False, f"Transport key sync failed: {exc}", None + payload_hash = params.get("payload_hash") + details: Dict[str, Any] = { + "applied_nodes": int(result.get("applied_nodes", 0)), + "generated_keys": int(result.get("generated_keys", 0)), + } + if isinstance(payload_hash, str) and payload_hash.strip(): + details["payload_hash"] = payload_hash + return True, f"Applied transport key sync ({details['applied_nodes']} nodes)", details + + def _apply_cert_renewal(self, response: Dict[str, Any]) -> Tuple[bool, str]: + client_cert = response.get("client_cert") + client_key = response.get("client_key") + ca_cert = response.get("ca_cert") + + if not all(isinstance(item, str) and item.strip() for item in (client_cert, client_key, ca_cert)): + return False, "Missing certificate payload values" + + cert_dir = Path(self.cert_store_dir) + cert_dir.mkdir(parents=True, exist_ok=True) + + client_cert_path = cert_dir / "glass-client.crt" + client_key_path = cert_dir / "glass-client.key" + ca_cert_path = cert_dir / "glass-ca.crt" + + client_cert_path.write_text(client_cert, encoding="utf-8") + client_key_path.write_text(client_key, encoding="utf-8") + ca_cert_path.write_text(ca_cert, encoding="utf-8") + os.chmod(client_key_path, 0o600) + + return self._apply_config_update( + { + "glass": { + "client_cert_path": str(client_cert_path), + "client_key_path": str(client_key_path), + "ca_cert_path": str(ca_cert_path), + } + }, + merge_mode="patch", + ) + + async def _get_pending_command_results(self) -> List[Dict[str, Any]]: + async with self._pending_lock: + return list(self._pending_command_results) + + async def _queue_command_result( + self, + command_id: str, + status: str, + message: str, + details: Optional[Dict[str, Any]] = None, + ) -> None: + completed_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + result = { + "command_id": command_id, + "status": status, + "message": message[:1024] if message else "", + "completed_at": completed_at, + } + if details: + result["details"] = details + async with self._pending_lock: + self._pending_command_results.append(result) + + def publish_telemetry(self, record_type: str, record: Dict[str, Any]) -> None: + if not self.enabled or not self.mqtt_enabled or not self._mqtt_ready: + return + if not self._mqtt_client: + return + + node_name = self.config.get("repeater", {}).get("node_name", "unknown-repeater") + event_type = "event" + event_name: Optional[str] = record_type + if record_type in ("packet", "advert"): + event_type = record_type + event_name = None + + topic = self._mqtt_topic_for_record(node_name=node_name, record_type=record_type) + timestamp = self._to_rfc3339_timestamp(record.get("timestamp")) + payload = self._normalize_for_hash(record) + + envelope: Dict[str, Any] = { + "version": 1, + "type": event_type, + "topic": topic, + "node_name": node_name, + "timestamp": timestamp, + "payload": payload, + } + if event_type == "event" and event_name: + envelope["event_name"] = event_name + + try: + message = json.dumps(envelope, separators=(",", ":"), sort_keys=True, default=str) + self._mqtt_client.publish(topic, message, qos=0, retain=False) + except Exception as exc: + logger.debug("Failed publishing Glass telemetry MQTT message: %s", exc) + + def _mqtt_topic_for_record(self, *, node_name: str, record_type: str) -> str: + base = self.mqtt_base_topic.strip("/") or "glass" + if record_type in ("packet", "advert"): + return f"{base}/{node_name}/{record_type}" + return f"{base}/{node_name}/event/{record_type}" + + def _to_rfc3339_timestamp(self, value: Any) -> str: + if isinstance(value, (int, float)): + dt = datetime.fromtimestamp(float(value), timezone.utc) + elif isinstance(value, str): + normalized = value.strip() + if normalized.endswith("Z"): + return normalized + try: + dt = datetime.fromisoformat(normalized) + except ValueError: + dt = datetime.now(timezone.utc) + elif isinstance(value, datetime): + dt = value + else: + dt = datetime.now(timezone.utc) + + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + else: + dt = dt.astimezone(timezone.utc) + return dt.isoformat().replace("+00:00", "Z") + + def _init_mqtt_publisher(self) -> None: + if not self.mqtt_enabled: + self._close_mqtt_publisher() + return + if mqtt is None: + logger.warning("Glass MQTT telemetry publishing enabled but paho-mqtt is unavailable") + self._close_mqtt_publisher() + return + if self._mqtt_client is not None: + return + + try: + client = mqtt.Client() + if self.mqtt_username: + client.username_pw_set(self.mqtt_username, self.mqtt_password) + if self.mqtt_tls_enabled: + ca_certs = self._require_ssl_file(self.ca_cert_path, "ca_cert_path") if self.ca_cert_path else None + certfile = None + keyfile = None + if self.client_cert_path or self.client_key_path: + certfile = self._require_ssl_file(self.client_cert_path, "client_cert_path") + keyfile = self._require_ssl_file(self.client_key_path, "client_key_path") + cert_reqs = ssl.CERT_REQUIRED if self.verify_tls else ssl.CERT_NONE + client.tls_set( + ca_certs=ca_certs, + certfile=certfile, + keyfile=keyfile, + cert_reqs=cert_reqs, + tls_version=ssl.PROTOCOL_TLS_CLIENT, + ) + if not self.verify_tls: + client.tls_insecure_set(True) + client.on_connect = self._on_mqtt_connect + client.on_disconnect = self._on_mqtt_disconnect + client.connect_async(self.mqtt_broker_host, self.mqtt_broker_port, 60) + client.loop_start() + self._mqtt_client = client + self._mqtt_runtime_signature = self._current_mqtt_signature() + logger.info( + "Glass MQTT telemetry publisher started (%s:%s, base_topic=%s)", + self.mqtt_broker_host, + self.mqtt_broker_port, + self.mqtt_base_topic, + ) + except Exception as exc: + self._mqtt_client = None + self._mqtt_ready = False + self._mqtt_runtime_signature = None + logger.warning("Failed to start Glass MQTT telemetry publisher: %s", exc) + + def _close_mqtt_publisher(self) -> None: + client = self._mqtt_client + self._mqtt_client = None + self._mqtt_ready = False + self._mqtt_runtime_signature = None + if client is None: + return + try: + client.loop_stop() + client.disconnect() + except Exception as exc: + logger.debug("Error stopping Glass MQTT telemetry publisher: %s", exc) + + def _on_mqtt_connect(self, _client, _userdata, _flags, reason_code, _properties=None) -> None: + rc = getattr(reason_code, "value", reason_code) + if rc == 0: + self._mqtt_ready = True + logger.info("Glass MQTT telemetry publisher connected") + return + self._mqtt_ready = False + logger.warning("Glass MQTT telemetry publisher connect failed (code=%s)", rc) + + def _on_mqtt_disconnect(self, _client, _userdata, reason_code, _properties=None) -> None: + self._mqtt_ready = False + rc = getattr(reason_code, "value", reason_code) + if rc: + logger.warning("Glass MQTT telemetry publisher disconnected (code=%s)", rc) + + def _current_mqtt_signature( + self, + ) -> Tuple[str, int, str, bool, bool, Optional[str], Optional[str], Optional[str], Optional[str], Optional[str]]: + return ( + self.mqtt_broker_host, + self.mqtt_broker_port, + self.mqtt_base_topic, + self.mqtt_tls_enabled, + self.verify_tls, + self.ca_cert_path, + self.client_cert_path, + self.client_key_path, + self.mqtt_username, + self.mqtt_password, + ) + + def _sync_mqtt_publisher(self) -> None: + if not self.enabled or not self.mqtt_enabled: + self._close_mqtt_publisher() + return + if mqtt is None: + self._close_mqtt_publisher() + return + + signature = self._current_mqtt_signature() + if self._mqtt_client is None: + self._init_mqtt_publisher() + return + if self._mqtt_runtime_signature != signature: + self._close_mqtt_publisher() + self._init_mqtt_publisher() + + @staticmethod + def _deep_merge(target: Dict[str, Any], source: Dict[str, Any]) -> None: + for key, value in source.items(): + if ( + isinstance(value, dict) + and isinstance(target.get(key), dict) + ): + GlassHandler._deep_merge(target[key], value) + else: + target[key] = value + + @staticmethod + def _normalize_for_hash(value: Any) -> Any: + if isinstance(value, bytes): + return value.hex() + if isinstance(value, dict): + return {k: GlassHandler._normalize_for_hash(v) for k, v in value.items()} + if isinstance(value, list): + return [GlassHandler._normalize_for_hash(v) for v in value] + return value + + @staticmethod + def _compute_config_hash(config: dict) -> str: + normalized = GlassHandler._normalize_for_hash(config) + encoded = json.dumps(normalized, sort_keys=True, separators=(",", ":")).encode("utf-8") + digest = hashlib.sha256(encoded).hexdigest() + return f"sha256:{digest}" + + @staticmethod + def _clamp_interval(interval_seconds: int) -> int: + if interval_seconds < 5: + return 5 + if interval_seconds > 3600: + return 3600 + return interval_seconds diff --git a/repeater/data_acquisition/sqlite_handler.py b/repeater/data_acquisition/sqlite_handler.py index 9a582be..4edfd09 100644 --- a/repeater/data_acquisition/sqlite_handler.py +++ b/repeater/data_acquisition/sqlite_handler.py @@ -1588,6 +1588,122 @@ class SQLiteHandler: logger.error(f"Failed to delete transport key: {e}") return False + def sync_transport_keys(self, entries: List[Dict[str, Any]]) -> Dict[str, int]: + """ + Replace transport key tree from a canonical Glass payload. + + Args: + entries: Flat list of nodes with fields: + - node_id: unique stable id in payload + - name: key/group display name + - flood_policy: 'allow' | 'deny' + - transport_key: optional explicit key material + - parent_node_id: optional parent node reference + + Returns: + Dict containing applied node count and generated key count. + """ + if not isinstance(entries, list): + raise ValueError("transport_keys payload must be a list") + + normalized: Dict[str, Dict[str, Any]] = {} + used_names: set[str] = set() + for raw in entries: + if not isinstance(raw, dict): + raise ValueError("Each transport key entry must be an object") + node_id = str(raw.get("node_id", "")).strip() + name = str(raw.get("name", "")).strip() + flood_policy = str(raw.get("flood_policy", "")).strip().lower() + parent_node_id = raw.get("parent_node_id") + transport_key = raw.get("transport_key") + if not node_id: + raise ValueError("transport key entry is missing node_id") + if node_id in normalized: + raise ValueError(f"Duplicate node_id in payload: {node_id}") + if not name: + raise ValueError(f"transport key entry '{node_id}' is missing name") + if name in used_names: + raise ValueError(f"Duplicate transport key name in payload: {name}") + if flood_policy not in {"allow", "deny"}: + raise ValueError(f"Invalid flood_policy for '{name}': {flood_policy}") + if transport_key is not None and not isinstance(transport_key, str): + raise ValueError(f"transport_key for '{name}' must be a string or null") + normalized[node_id] = { + "node_id": node_id, + "name": name, + "flood_policy": flood_policy, + "parent_node_id": str(parent_node_id).strip() if parent_node_id else None, + "transport_key": transport_key.strip() if isinstance(transport_key, str) else None, + } + used_names.add(name) + + for node in normalized.values(): + parent_node_id = node.get("parent_node_id") + if parent_node_id and parent_node_id not in normalized: + raise ValueError( + f"Parent node '{parent_node_id}' does not exist for '{node['node_id']}'" + ) + + ordered: List[Dict[str, Any]] = [] + pending = dict(normalized) + resolved_ids: set[str] = set() + while pending: + progressed = False + for node_id, node in list(pending.items()): + parent_node_id = node.get("parent_node_id") + if parent_node_id and parent_node_id not in resolved_ids: + continue + ordered.append(node) + resolved_ids.add(node_id) + pending.pop(node_id) + progressed = True + if not progressed: + raise ValueError("Cycle detected in transport key tree payload") + + generated_keys = 0 + now = time.time() + with sqlite3.connect(self.sqlite_path) as conn: + conn.execute("PRAGMA foreign_keys = ON") + conn.execute("DELETE FROM transport_keys") + db_ids: Dict[str, int] = {} + for node in ordered: + transport_key = node.get("transport_key") + if not transport_key: + transport_key = self.generate_transport_key(node["name"]) + generated_keys += 1 + parent_id = ( + db_ids.get(node["parent_node_id"]) + if node.get("parent_node_id") + else None + ) + cursor = conn.execute( + """ + INSERT INTO transport_keys ( + name, + flood_policy, + transport_key, + parent_id, + last_used, + created_at, + updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + node["name"], + node["flood_policy"], + transport_key, + parent_id, + None, + now, + now, + ), + ) + db_ids[node["node_id"]] = int(cursor.lastrowid) + conn.commit() + + return {"applied_nodes": len(ordered), "generated_keys": generated_keys} + def delete_advert(self, advert_id: int) -> bool: try: with sqlite3.connect(self.sqlite_path) as conn: diff --git a/repeater/data_acquisition/storage_collector.py b/repeater/data_acquisition/storage_collector.py index b128165..fb33b59 100644 --- a/repeater/data_acquisition/storage_collector.py +++ b/repeater/data_acquisition/storage_collector.py @@ -18,6 +18,7 @@ class StorageCollector: def __init__(self, config: dict, local_identity=None, repeater_handler=None): self.config = config self.repeater_handler = repeater_handler + self.glass_publish_callback = None storage_dir_cfg = ( config.get("storage", {}).get("storage_dir") @@ -147,6 +148,7 @@ class StorageCollector: cumulative_counts = self.sqlite_handler.get_cumulative_counts() self.rrd_handler.update_packet_metrics(packet_record, cumulative_counts) self.mqtt_handler.publish(packet_record, "packet") + self._publish_to_glass(packet_record, "packet") # Broadcast to WebSocket clients for real-time updates if self.websocket_available: @@ -210,17 +212,20 @@ class StorageCollector: def record_advert(self, advert_record: dict): self.sqlite_handler.store_advert(advert_record) self.mqtt_handler.publish(advert_record, "advert") + self._publish_to_glass(advert_record, "advert") def record_noise_floor(self, noise_floor_dbm: float): noise_record = {"timestamp": time.time(), "noise_floor_dbm": noise_floor_dbm} self.sqlite_handler.store_noise_floor(noise_record) self.mqtt_handler.publish(noise_record, "noise_floor") + self._publish_to_glass(noise_record, "noise_floor") def record_crc_errors(self, count: int): """Record a batch of CRC errors detected since last poll.""" crc_record = {"timestamp": time.time(), "count": count} self.sqlite_handler.store_crc_errors(crc_record) self.mqtt_handler.publish(crc_record, "crc_errors") + self._publish_to_glass(crc_record, "crc_errors") def get_crc_error_count(self, hours: int = 24) -> int: return self.sqlite_handler.get_crc_error_count(hours) @@ -321,6 +326,17 @@ class StorageCollector: except Exception as e: logger.error(f"Error disconnecting LetsMesh handler: {e}") + def set_glass_publisher(self, publish_callback): + self.glass_publish_callback = publish_callback + + def _publish_to_glass(self, record: dict, record_type: str): + if not self.glass_publish_callback: + return + try: + self.glass_publish_callback(record_type, record) + except Exception as e: + logger.debug(f"Failed to publish telemetry to Glass MQTT: {e}") + def create_transport_key( self, name: str, diff --git a/repeater/main.py b/repeater/main.py index dab9fe5..d74d6d3 100644 --- a/repeater/main.py +++ b/repeater/main.py @@ -10,6 +10,7 @@ import time from repeater.companion.utils import validate_companion_node_name, normalize_companion_identity_key from repeater.config import get_radio_for_board, load_config, save_config from repeater.config_manager import ConfigManager +from repeater.data_acquisition.glass_handler import GlassHandler from repeater.engine import RepeaterHandler from repeater.handler_helpers import ( AdvertHelper, @@ -47,6 +48,7 @@ class RepeaterDaemon: self.text_helper = None self.path_helper = None self.protocol_request_helper = None + self.glass_handler = None self.acl = None self.router = None self.companion_bridges: dict[int, object] = {} @@ -337,6 +339,20 @@ class RepeaterDaemon: # When trace reaches final node, push PUSH_CODE_TRACE_DATA (0x89) to companion clients (firmware onTraceRecv) self.trace_helper.on_trace_complete = self._on_trace_complete_for_companions + # Optional pyMC_Glass integration loop (inform/control plane) + self.glass_handler = GlassHandler( + config=self.config, + daemon_instance=self, + config_manager=self.config_manager, + ) + await self.glass_handler.start() + if ( + self.repeater_handler + and self.repeater_handler.storage + and hasattr(self.repeater_handler.storage, "set_glass_publisher") + ): + self.repeater_handler.storage.set_glass_publisher(self.glass_handler.publish_telemetry) + except Exception as e: logger.error(f"Failed to initialize dispatcher: {e}") raise @@ -1023,6 +1039,13 @@ class RepeaterDaemon: except Exception as e: logger.warning(f"Error stopping HTTP server: {e}") + # Stop Glass inform loop + if self.glass_handler: + try: + await self.glass_handler.stop() + except Exception as e: + logger.warning(f"Error stopping Glass handler: {e}") + # Release radio resources if self.radio and hasattr(self.radio, "cleanup"): try: diff --git a/repeater/web/api_endpoints.py b/repeater/web/api_endpoints.py index 8b27504..f2eda0a 100644 --- a/repeater/web/api_endpoints.py +++ b/repeater/web/api_endpoints.py @@ -4492,7 +4492,7 @@ class APIEndpoints: # Sections we allow to be imported ALLOWED_SECTIONS = { "repeater", "mesh", "radio", "identities", "delays", - "ch341", "web", "letsmesh", "logging", "radio_type", + "ch341", "web", "letsmesh", "glass", "logging", "radio_type", } updated_sections = [] diff --git a/tests/test_glass_handler.py b/tests/test_glass_handler.py new file mode 100644 index 0000000..694520a --- /dev/null +++ b/tests/test_glass_handler.py @@ -0,0 +1,471 @@ +import asyncio +import importlib.util +import json +import time +from pathlib import Path +import pytest + +_MODULE_PATH = Path(__file__).resolve().parents[1] / "repeater" / "data_acquisition" / "glass_handler.py" +_SPEC = importlib.util.spec_from_file_location("repeater_glass_handler", _MODULE_PATH) +_MODULE = importlib.util.module_from_spec(_SPEC) +assert _SPEC and _SPEC.loader +_SPEC.loader.exec_module(_MODULE) +GlassHandler = _MODULE.GlassHandler + + +class _DummyIdentity: + @staticmethod + def get_public_key(): + return bytes.fromhex("ab" * 32) + + +class _DummyConfigManager: + def __init__(self): + self.calls = [] + + def update_and_save(self, updates, live_update=True, live_update_sections=None): + self.calls.append( + { + "updates": updates, + "live_update": live_update, + "live_update_sections": live_update_sections, + } + ) + return {"success": True, "saved": True, "live_updated": True} + + @staticmethod + def save_to_file(): + return True + + @staticmethod + def live_update_daemon(_sections): + return True + + +class _DummyDaemon: + def __init__(self): + self.local_identity = _DummyIdentity() + self.repeater_handler = type("RH", (), {"start_time": time.time() - 60})() + + @staticmethod + def get_stats(): + return { + "rx_count": 11, + "forwarded_count": 7, + "dropped_count": 2, + "flood_dup_count": 3, + "direct_dup_count": 1, + "sent_flood_count": 5, + "sent_direct_count": 2, + "utilization_percent": 4.2, + "noise_floor_dbm": -111.5, + "uptime_seconds": 60, + } + + @staticmethod + async def send_advert(): + return True + + +class _DummyMqttClient: + def __init__(self): + self.published = [] + + def publish(self, topic, message, qos=0, retain=False): + self.published.append( + { + "topic": topic, + "message": message, + "qos": qos, + "retain": retain, + } + ) + + +class _DummyPahoClient: + def __init__(self): + self.username = None + self.password = None + self.tls_set_kwargs = None + self.tls_insecure = None + self.connected = None + self.loop_started = False + self.loop_stopped = False + self.disconnected = False + self.on_connect = None + self.on_disconnect = None + + def username_pw_set(self, username, password): + self.username = username + self.password = password + + def tls_set(self, **kwargs): + self.tls_set_kwargs = kwargs + + def tls_insecure_set(self, value): + self.tls_insecure = value + + def connect_async(self, host, port, keepalive): + self.connected = (host, port, keepalive) + + def loop_start(self): + self.loop_started = True + + def loop_stop(self): + self.loop_stopped = True + + def disconnect(self): + self.disconnected = True + + +class _DummyPahoModule: + def __init__(self, client): + self._client = client + + def Client(self): + return self._client + + +class _DummyHttpResponse: + def __init__(self, payload): + self._payload = json.dumps(payload).encode("utf-8") + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def read(self): + return self._payload + + +class _DummySslContext: + def __init__(self): + self.cert_chain = None + + def load_cert_chain(self, certfile, keyfile): + self.cert_chain = (certfile, keyfile) + + +def _make_config(): + return { + "repeater": { + "node_name": "mesh-repeater-01", + "mode": "forward", + "location": "51.5074,-0.1278", + "identity_key": "PRIVATE-KEY", + }, + "radio": { + "frequency": 869618000, + "spreading_factor": 8, + "bandwidth": 62500, + "tx_power": 14, + }, + "glass": { + "enabled": False, + "base_url": "http://localhost:8080", + "inform_interval_seconds": 30, + "request_timeout_seconds": 10, + "verify_tls": True, + "api_token": "", + "cert_store_dir": "/tmp/pymc-glass-test", + "mqtt_password": "super-secret", + }, + } + + +def test_compute_config_hash_has_expected_format(): + config = _make_config() + config["repeater"]["identity_key"] = b"\x01" * 32 + digest = GlassHandler._compute_config_hash(config) + assert digest.startswith("sha256:") + assert len(digest) == 71 + + +def test_build_inform_payload_contains_expected_fields(): + config = _make_config() + daemon = _DummyDaemon() + manager = _DummyConfigManager() + handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager) + + asyncio.run(handler._queue_command_result("cmd-1", "success", "ok")) + payload = asyncio.run(handler._build_inform_payload()) + + assert payload["type"] == "inform" + assert payload["version"] == 1 + assert payload["node_name"] == "mesh-repeater-01" + assert payload["pubkey"].startswith("0x") + assert payload["config_hash"].startswith("sha256:") + assert payload["location"] == "51.907400,-0.157800" + assert payload["radio"]["frequency"] == 869618000 + assert payload["counters"]["duplicates"] == 4 + assert payload["settings"]["repeater"]["location"] == "51.9074,-0.1570" + assert payload["settings"]["repeater"]["identity_key"] == "" + assert payload["settings"]["glass"]["mqtt_password"] == "" + assert payload["command_results"][0]["command_id"] == "cmd-1" + + +def test_execute_set_mode_command_updates_config(): + config = _make_config() + daemon = _DummyDaemon() + manager = _DummyConfigManager() + handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager) + + ok, message = asyncio.run(handler._execute_command_action("set_mode", {"mode": "monitor"})) + assert ok is True + assert "Config patched" in message + assert manager.calls + assert manager.calls[-1]["updates"]["repeater"]["mode"] == "monitor" + + +def test_handle_command_response_queues_result(): + config = _make_config() + daemon = _DummyDaemon() + manager = _DummyConfigManager() + handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager) + + asyncio.run( + handler._handle_command_response( + { + "type": "command", + "command_id": "cmd-42", + "action": "run_diagnostic", + "params": {}, + } + ) + ) + + assert handler._pending_command_results + queued = handler._pending_command_results[-1] + assert queued["command_id"] == "cmd-42" + assert queued["status"] == "success" + + +def test_publish_telemetry_packet_envelope_to_expected_topic(): + config = _make_config() + config["glass"]["enabled"] = True + daemon = _DummyDaemon() + manager = _DummyConfigManager() + handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager) + handler.mqtt_enabled = True + handler._mqtt_ready = True + handler._mqtt_client = _DummyMqttClient() + + handler.publish_telemetry( + "packet", + { + "timestamp": 1760000000, + "packet_hash": "ABCDEF123456", + "rssi": -80.5, + }, + ) + + assert len(handler._mqtt_client.published) == 1 + published = handler._mqtt_client.published[0] + assert published["topic"] == "glass/mesh-repeater-01/packet" + payload = json.loads(published["message"]) + assert payload["version"] == 1 + assert payload["type"] == "packet" + assert payload["node_name"] == "mesh-repeater-01" + assert payload["topic"] == "glass/mesh-repeater-01/packet" + assert payload["payload"]["packet_hash"] == "ABCDEF123456" + assert published["qos"] == 0 + assert published["retain"] is False + + +def test_publish_telemetry_event_uses_event_topic_suffix(): + config = _make_config() + config["glass"]["enabled"] = True + daemon = _DummyDaemon() + manager = _DummyConfigManager() + handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager) + handler.mqtt_enabled = True + handler._mqtt_ready = True + handler._mqtt_client = _DummyMqttClient() + + handler.publish_telemetry( + "noise_floor", + { + "timestamp": "2026-04-15T12:30:45Z", + "noise_floor_dbm": -112.3, + }, + ) + + assert len(handler._mqtt_client.published) == 1 + published = handler._mqtt_client.published[0] + assert published["topic"] == "glass/mesh-repeater-01/event/noise_floor" + payload = json.loads(published["message"]) + assert payload["type"] == "event" + assert payload["event_name"] == "noise_floor" + assert payload["topic"] == "glass/mesh-repeater-01/event/noise_floor" + + +def test_apply_config_update_glass_managed_updates_runtime_and_file(tmp_path): + config = _make_config() + config["glass"]["enabled"] = True + config["glass"]["cert_store_dir"] = str(tmp_path) + daemon = _DummyDaemon() + manager = _DummyConfigManager() + handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager) + handler._sync_mqtt_publisher = lambda: None + + ok, message = handler._apply_config_update( + { + "glass_managed": { + "mqtt_enabled": True, + "mqtt_broker_host": "emqx", + "mqtt_broker_port": 1883, + "mqtt_base_topic": "glass/fleet", + "mqtt_tls_enabled": True, + } + }, + merge_mode="patch", + ) + + assert ok is True + assert "Managed settings updated" in message + managed_path = tmp_path / "managed.json" + assert managed_path.exists() + managed = json.loads(managed_path.read_text(encoding="utf-8")) + assert managed["mqtt_enabled"] is True + assert managed["mqtt_broker_host"] == "emqx" + assert managed["mqtt_broker_port"] == 1883 + assert managed["mqtt_base_topic"] == "glass/fleet" + assert managed["mqtt_tls_enabled"] is True + assert handler.mqtt_enabled is True + assert handler.mqtt_broker_host == "emqx" + assert handler.mqtt_broker_port == 1883 + assert handler.mqtt_base_topic == "glass/fleet" + assert handler.mqtt_tls_enabled is True + + +def test_sync_mqtt_publisher_restarts_when_signature_changes(monkeypatch): + config = _make_config() + config["glass"]["enabled"] = True + daemon = _DummyDaemon() + manager = _DummyConfigManager() + handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager) + handler.enabled = True + handler.mqtt_enabled = True + handler._mqtt_client = object() + handler._mqtt_runtime_signature = ("old-host", 1883, "glass", None, None) + + calls = [] + + def _fake_close(): + calls.append("close") + handler._mqtt_client = None + handler._mqtt_runtime_signature = None + + def _fake_init(): + calls.append("init") + + monkeypatch.setattr(_MODULE, "mqtt", object()) + monkeypatch.setattr(handler, "_close_mqtt_publisher", _fake_close) + monkeypatch.setattr(handler, "_init_mqtt_publisher", _fake_init) + + handler.mqtt_broker_host = "new-host" + handler._sync_mqtt_publisher() + + assert calls == ["close", "init"] + + +def test_init_mqtt_publisher_uses_mtls_cert_material(tmp_path, monkeypatch): + cert_path = tmp_path / "glass-client.crt" + key_path = tmp_path / "glass-client.key" + ca_path = tmp_path / "glass-ca.crt" + cert_path.write_text("CERT", encoding="utf-8") + key_path.write_text("KEY", encoding="utf-8") + ca_path.write_text("CA", encoding="utf-8") + + config = _make_config() + config["glass"]["enabled"] = True + config["glass"]["verify_tls"] = True + config["glass"]["client_cert_path"] = str(cert_path) + config["glass"]["client_key_path"] = str(key_path) + config["glass"]["ca_cert_path"] = str(ca_path) + daemon = _DummyDaemon() + manager = _DummyConfigManager() + handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager) + handler.enabled = True + handler.mqtt_enabled = True + handler.mqtt_tls_enabled = True + handler.mqtt_broker_host = "emqx" + handler.mqtt_broker_port = 8883 + handler.mqtt_base_topic = "glass" + + fake_client = _DummyPahoClient() + monkeypatch.setattr(_MODULE, "mqtt", _DummyPahoModule(fake_client)) + + handler._init_mqtt_publisher() + + assert fake_client.tls_set_kwargs is not None + assert fake_client.tls_set_kwargs["ca_certs"] == str(ca_path) + assert fake_client.tls_set_kwargs["certfile"] == str(cert_path) + assert fake_client.tls_set_kwargs["keyfile"] == str(key_path) + assert fake_client.tls_set_kwargs["cert_reqs"] == _MODULE.ssl.CERT_REQUIRED + assert fake_client.connected == ("emqx", 8883, 60) + assert fake_client.loop_started is True + assert handler._mqtt_client is fake_client + assert handler._mqtt_runtime_signature == handler._current_mqtt_signature() + + +def test_post_inform_sync_uses_configured_ca_and_client_cert_chain(tmp_path, monkeypatch): + cert_path = tmp_path / "glass-client.crt" + key_path = tmp_path / "glass-client.key" + ca_path = tmp_path / "glass-ca.crt" + cert_path.write_text("CERT", encoding="utf-8") + key_path.write_text("KEY", encoding="utf-8") + ca_path.write_text("CA", encoding="utf-8") + + config = _make_config() + config["glass"]["enabled"] = True + config["glass"]["base_url"] = "https://glass.example" + config["glass"]["verify_tls"] = True + config["glass"]["client_cert_path"] = str(cert_path) + config["glass"]["client_key_path"] = str(key_path) + config["glass"]["ca_cert_path"] = str(ca_path) + daemon = _DummyDaemon() + manager = _DummyConfigManager() + handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager) + + calls = {} + fake_context = _DummySslContext() + + def _fake_create_default_context(cafile=None): + calls["cafile"] = cafile + return fake_context + + def _fake_urlopen(req, timeout=None, context=None): + calls["timeout"] = timeout + calls["context"] = context + assert req.full_url == "https://glass.example/inform" + return _DummyHttpResponse({"type": "noop", "interval": 30}) + + monkeypatch.setattr(_MODULE.ssl, "create_default_context", _fake_create_default_context) + monkeypatch.setattr(_MODULE.request, "urlopen", _fake_urlopen) + + response = handler._post_inform_sync({"type": "inform"}) + + assert response["type"] == "noop" + assert calls["cafile"] == str(ca_path) + assert calls["context"] is fake_context + assert fake_context.cert_chain == (str(cert_path), str(key_path)) + + +def test_build_ssl_context_raises_when_client_key_missing(tmp_path): + cert_path = tmp_path / "glass-client.crt" + cert_path.write_text("CERT", encoding="utf-8") + + config = _make_config() + config["glass"]["enabled"] = True + config["glass"]["base_url"] = "https://glass.example" + config["glass"]["verify_tls"] = False + config["glass"]["client_cert_path"] = str(cert_path) + daemon = _DummyDaemon() + manager = _DummyConfigManager() + handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager) + + with pytest.raises(RuntimeError, match="client_key_path"): + handler._build_ssl_context("https://glass.example/inform")