import asyncio import hashlib import json import logging import os import ssl import time from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from urllib import error, request from urllib.parse import urlparse import psutil try: import paho.mqtt.client as mqtt except ImportError: mqtt = None from repeater import __version__ from repeater.service_utils import restart_service logger = logging.getLogger("GlassHandler") _SENSITIVE_KEY_MARKERS = ( "password", "passphrase", "secret", "token", "private_key", "identity_key", "client_key", "api_key", ) _SENSITIVE_KEY_EXCEPTIONS = ("pubkey", "public_key") class GlassHandler: def __init__(self, config: dict, daemon_instance=None, config_manager=None): self.config = config self.daemon_instance = daemon_instance self.config_manager = config_manager self.enabled = False self.base_url = "http://localhost:8080" self.request_timeout_seconds = 10 self.verify_tls = True self.api_token = "" # nosec - runtime config value, not a hardcoded credential self.inform_interval_seconds = 30 self.cert_store_dir = "/etc/pymc_repeater/glass" self._cert_expires_at: Optional[str] = None self.mqtt_enabled = False self.mqtt_broker_host = "localhost" self.mqtt_broker_port = 1883 self.mqtt_base_topic = "glass" self.mqtt_tls_enabled = False self.mqtt_username: Optional[str] = None self.mqtt_password: Optional[str] = None self.client_cert_path: Optional[str] = None self.client_key_path: Optional[str] = None self.ca_cert_path: Optional[str] = None self._mqtt_client = None self._mqtt_ready = False self._mqtt_runtime_signature: Optional[ Tuple[ str, int, str, bool, bool, Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], ] ] = None self._managed_settings_filename = "managed.json" self._task: Optional[asyncio.Task] = None self._stop_event: Optional[asyncio.Event] = None self._pending_command_results: List[Dict[str, Any]] = [] self._pending_lock = asyncio.Lock() self._reload_runtime_settings() async def start(self) -> None: self._reload_runtime_settings() if not self.enabled: logger.info("Glass integration disabled") self._close_mqtt_publisher() return if self._task and not self._task.done(): return self._sync_mqtt_publisher() self._stop_event = asyncio.Event() self._task = asyncio.create_task(self._run_loop(), name="glass-inform-loop") logger.info( "Glass integration started (base_url=%s, inform_interval=%ss)", self.base_url, self.inform_interval_seconds, ) async def stop(self) -> None: if self._task: if self._stop_event: self._stop_event.set() try: await self._task except Exception as exc: logger.debug("Glass task stop ignored exception: %s", exc) finally: self._task = None self._stop_event = None self._close_mqtt_publisher() def _reload_runtime_settings(self) -> None: glass_cfg = self.config.get("glass", {}) self.enabled = bool(glass_cfg.get("enabled", False)) base_url = str(glass_cfg.get("base_url", "http://localhost:8080")).strip() self.base_url = base_url.rstrip("/") if base_url else "http://localhost:8080" self.request_timeout_seconds = max(3, int(glass_cfg.get("request_timeout_seconds", 10))) self.verify_tls = bool(glass_cfg.get("verify_tls", True)) self.api_token = str(glass_cfg.get("api_token", "") or "").strip() self.inform_interval_seconds = self._clamp_interval( int(glass_cfg.get("inform_interval_seconds", self.inform_interval_seconds)) ) self.cert_store_dir = str( glass_cfg.get("cert_store_dir", "/etc/pymc_repeater/glass") or "/etc/pymc_repeater/glass" ) self.client_cert_path = ( str(glass_cfg.get("client_cert_path")).strip() if glass_cfg.get("client_cert_path") else None ) self.client_key_path = ( str(glass_cfg.get("client_key_path")).strip() if glass_cfg.get("client_key_path") else None ) self.ca_cert_path = ( str(glass_cfg.get("ca_cert_path")).strip() if glass_cfg.get("ca_cert_path") else None ) managed_cfg = self._load_managed_settings() parsed_base_url = urlparse(self.base_url) default_host = parsed_base_url.hostname or "localhost" self.mqtt_enabled = bool(managed_cfg.get("mqtt_enabled", False)) host_value = managed_cfg.get("mqtt_broker_host", default_host) self.mqtt_broker_host = str(host_value or default_host).strip() or default_host try: self.mqtt_broker_port = max(1, int(managed_cfg.get("mqtt_broker_port", 1883))) except (TypeError, ValueError): self.mqtt_broker_port = 1883 topic_value = managed_cfg.get("mqtt_base_topic", "glass") self.mqtt_base_topic = str(topic_value or "glass").strip("/") self.mqtt_tls_enabled = bool(managed_cfg.get("mqtt_tls_enabled", False)) username = managed_cfg.get("mqtt_username") password = managed_cfg.get("mqtt_password") self.mqtt_username = ( str(username).strip() if isinstance(username, str) and username else None ) self.mqtt_password = str(password) if isinstance(password, str) and password else None def _managed_settings_path(self) -> Path: return Path(self.cert_store_dir) / self._managed_settings_filename def _load_managed_settings(self) -> Dict[str, Any]: path = self._managed_settings_path() if not path.exists(): return {} try: raw = json.loads(path.read_text(encoding="utf-8")) except Exception as exc: logger.warning("Invalid Glass managed settings file at %s: %s", path, exc) return {} if not isinstance(raw, dict): logger.warning("Ignoring non-object Glass managed settings file at %s", path) return {} return raw def _save_managed_settings(self, updates: Dict[str, Any], *, replace: bool) -> Tuple[bool, str]: if not isinstance(updates, dict): return False, "glass_managed must be an object" path = self._managed_settings_path() path.parent.mkdir(parents=True, exist_ok=True) current = {} if replace else self._load_managed_settings() if not isinstance(current, dict): current = {} merged = dict(current) merged.update(updates) try: path.write_text( json.dumps(merged, indent=2, sort_keys=True), encoding="utf-8", ) os.chmod(path, 0o600) return True, "Managed settings updated" except Exception as exc: return False, f"Failed writing managed settings: {exc}" async def _run_loop(self) -> None: while self._stop_event and not self._stop_event.is_set(): self._reload_runtime_settings() self._sync_mqtt_publisher() try: interval = await self._inform_once() except Exception as exc: logger.warning("Glass inform failed: %s", exc) interval = self.inform_interval_seconds wait_seconds = self._clamp_interval(interval) if not self._stop_event: break try: await asyncio.wait_for(self._stop_event.wait(), timeout=wait_seconds) except asyncio.TimeoutError: continue async def _inform_once(self) -> int: self._reload_runtime_settings() if not self.enabled: return self.inform_interval_seconds payload = await self._build_inform_payload() response = await self._post_inform(payload) if payload.get("command_results"): async with self._pending_lock: self._pending_command_results = [] response_type = str(response.get("type", "noop")) response_interval = response.get("interval") if response_type == "command": await self._handle_command_response(response) elif response_type == "config_update": ok, message = self._apply_config_update( response.get("config", {}), str(response.get("merge_mode", "patch")), ) if ok: logger.info("Applied Glass config update") else: logger.warning("Failed to apply Glass config update: %s", message) elif response_type == "cert_renewal": ok, message = self._apply_cert_renewal(response) if ok: logger.info("Applied Glass certificate renewal") else: logger.warning("Failed to apply Glass certificate renewal: %s", message) elif response_type == "upgrade": logger.warning("Glass upgrade action received but not implemented on repeater") elif response_type != "noop": logger.warning("Unknown Glass response type: %s", response_type) if isinstance(response_interval, int): self.inform_interval_seconds = self._clamp_interval(response_interval) return self.inform_interval_seconds async def _build_inform_payload(self) -> Dict[str, Any]: if not self.daemon_instance or not getattr(self.daemon_instance, "local_identity", None): raise RuntimeError("Local identity not available for Glass inform") stats = self.daemon_instance.get_stats() if self.daemon_instance else {} local_identity = self.daemon_instance.local_identity public_key = bytes(local_identity.get_public_key()).hex() node_name = self.config.get("repeater", {}).get("node_name", "unknown-repeater") uptime_seconds = int(stats.get("uptime_seconds", 0)) if uptime_seconds <= 0: repeater_handler = getattr(self.daemon_instance, "repeater_handler", None) if repeater_handler and getattr(repeater_handler, "start_time", None): uptime_seconds = max(0, int(time.time() - repeater_handler.start_time)) tx_total = int(stats.get("sent_flood_count", 0)) + int(stats.get("sent_direct_count", 0)) if tx_total <= 0: tx_total = int(stats.get("forwarded_count", 0)) command_results = await self._get_pending_command_results() settings_snapshot = self._build_settings_snapshot() location = self._extract_location_from_settings(settings_snapshot) return { "type": "inform", "version": 1, "node_name": node_name, "pubkey": f"0x{public_key}", "software_version": __version__, "state": self.config.get("repeater", {}).get("mode", "forward"), "location": location, "uptime_seconds": uptime_seconds, "config_hash": self._compute_config_hash(self.config), "cert_expires_at": self._cert_expires_at, "system": self._collect_system_stats(), "radio": { "frequency": int(self.config.get("radio", {}).get("frequency", 0)), "spreading_factor": int(self.config.get("radio", {}).get("spreading_factor", 7)), "bandwidth": int(self.config.get("radio", {}).get("bandwidth", 0)), "tx_power": int(self.config.get("radio", {}).get("tx_power", 0)), "noise_floor_dbm": stats.get("noise_floor_dbm"), "mode": self.config.get("repeater", {}).get("mode", "forward"), }, "counters": { "rx_total": int(stats.get("rx_count", 0)), "tx_total": max(0, tx_total), "forwarded": int(stats.get("forwarded_count", 0)), "dropped": int(stats.get("dropped_count", 0)), "duplicates": int(stats.get("flood_dup_count", 0)) + int(stats.get("direct_dup_count", 0)), "airtime_percent": float(stats.get("utilization_percent", 0.0)), }, "settings": settings_snapshot, "command_results": command_results, } def _build_settings_snapshot(self) -> Dict[str, Any]: normalized = self._normalize_for_hash(self.config) sanitized = self._sanitize_settings_for_export(normalized) if isinstance(sanitized, dict): return sanitized return {} def _sanitize_settings_for_export(self, value: Any, key_name: Optional[str] = None) -> Any: if isinstance(value, dict): output: Dict[str, Any] = {} for child_key, child_value in value.items(): if self._is_sensitive_key(child_key): output[child_key] = "" continue output[child_key] = self._sanitize_settings_for_export(child_value, child_key) return output if isinstance(value, list): return [self._sanitize_settings_for_export(item, key_name) for item in value] return value @staticmethod def _is_sensitive_key(key: str) -> bool: lowered = str(key).lower() if any(exception in lowered for exception in _SENSITIVE_KEY_EXCEPTIONS): return False return any(marker in lowered for marker in _SENSITIVE_KEY_MARKERS) @staticmethod def _normalize_location(value: Any) -> Optional[str]: if isinstance(value, str): text = value.strip() if not text: return None parts = [part.strip() for part in text.split(",")] if len(parts) != 2: return None try: lat = float(parts[0]) lng = float(parts[1]) except ValueError: return None elif isinstance(value, dict): lat = value.get("lat", value.get("latitude")) lng = value.get("lng", value.get("longitude")) try: if lat is None or lng is None: return None lat = float(lat) lng = float(lng) except (TypeError, ValueError): return None elif isinstance(value, (list, tuple)) and len(value) == 2: try: lat = float(value[0]) lng = float(value[1]) except (TypeError, ValueError): return None else: return None if lat < -90 or lat > 90 or lng < -180 or lng > 180: return None return f"{lat:.6f},{lng:.6f}" def _extract_location_from_settings(self, settings: Dict[str, Any]) -> Optional[str]: repeater_settings = settings.get("repeater") repeater_dict = repeater_settings if isinstance(repeater_settings, dict) else {} candidates = [ settings.get("location"), repeater_dict.get("location"), settings.get("gps"), repeater_dict.get("gps"), { "lat": repeater_dict.get("latitude"), "lng": repeater_dict.get("longitude"), }, ] for candidate in candidates: location = self._normalize_location(candidate) if location: return location return None def _collect_system_stats(self) -> Dict[str, Any]: temperature_c = None try: temperatures = ( psutil.sensors_temperatures() if hasattr(psutil, "sensors_temperatures") else {} ) if temperatures: for values in temperatures.values(): if values: temperature_c = values[0].current break except Exception: temperature_c = None load_avg_1m = None try: if hasattr(os, "getloadavg"): load_avg_1m = float(os.getloadavg()[0]) except Exception: load_avg_1m = None return { "cpu_percent": float(psutil.cpu_percent(interval=None)), "memory_percent": float(psutil.virtual_memory().percent), "disk_percent": float(psutil.disk_usage("/").percent), "temperature_c": temperature_c, "load_avg_1m": load_avg_1m, } async def _post_inform(self, payload: Dict[str, Any]) -> Dict[str, Any]: loop = asyncio.get_running_loop() return await loop.run_in_executor(None, self._post_inform_sync, payload) def _post_inform_sync(self, payload: Dict[str, Any]) -> Dict[str, Any]: url = f"{self.base_url}/inform" self._validate_http_url(url) headers = {"Content-Type": "application/json"} if self.api_token: headers["Authorization"] = f"Bearer {self.api_token}" body = json.dumps(payload).encode("utf-8") req = request.Request(url=url, data=body, method="POST", headers=headers) ssl_context = self._build_ssl_context(url) try: with request.urlopen( req, timeout=self.request_timeout_seconds, context=ssl_context, ) as response: # nosec B310 response_bytes = response.read() except error.HTTPError as exc: details = "" try: details = exc.read().decode("utf-8") except Exception: details = str(exc) raise RuntimeError(f"HTTP {exc.code}: {details}") from exc except error.URLError as exc: raise RuntimeError(f"Connection error: {exc}") from exc if not response_bytes: return {"type": "noop", "interval": self.inform_interval_seconds} try: response_payload = json.loads(response_bytes.decode("utf-8")) except Exception as exc: raise RuntimeError("Invalid JSON response from Glass backend") from exc if not isinstance(response_payload, dict): raise RuntimeError("Invalid response payload from Glass backend") return response_payload def _build_ssl_context(self, url: str) -> Optional[ssl.SSLContext]: if not str(url).startswith("https"): return None if self.verify_tls: if self.ca_cert_path: ca_path = self._require_ssl_file(self.ca_cert_path, "ca_cert_path") context = ssl.create_default_context(cafile=ca_path) else: context = ssl.create_default_context() else: context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE if self.client_cert_path or self.client_key_path: cert_path = self._require_ssl_file(self.client_cert_path, "client_cert_path") key_path = self._require_ssl_file(self.client_key_path, "client_key_path") context.load_cert_chain(certfile=cert_path, keyfile=key_path) return context @staticmethod def _require_ssl_file(path_value: Optional[str], field_name: str) -> str: if not path_value or not str(path_value).strip(): raise RuntimeError(f"Missing {field_name} for Glass TLS configuration") normalized = str(path_value).strip() if not Path(normalized).exists(): raise RuntimeError(f"Configured {field_name} does not exist: {normalized}") return normalized @staticmethod def _validate_http_url(url: str) -> None: parsed = urlparse(url) if parsed.scheme not in {"http", "https"}: raise RuntimeError(f"Unsupported Glass base_url scheme: {parsed.scheme or ''}") if not parsed.netloc: raise RuntimeError("Glass base_url must include a host") async def _handle_command_response(self, response: Dict[str, Any]) -> None: command_id = str(response.get("command_id", "")).strip() action = str(response.get("action", "")).strip() params = response.get("params", {}) if not command_id or not action: logger.warning("Glass command response missing command_id or action") return success = False message = "Action failed" details: Optional[Dict[str, Any]] = None try: success, message, details = await self._execute_command_action(action, params) except Exception as exc: success = False message = f"Exception executing action: {exc}" details = None await self._queue_command_result( command_id=command_id, status="success" if success else "failed", message=message, details=details, ) async def _execute_command_action( self, action: str, params: Any, ) -> Tuple[bool, str, Optional[Dict[str, Any]]]: params = params if isinstance(params, dict) else {} if action == "restart_service": success, message = restart_service() return success, message, None if action == "send_advert": if not self.daemon_instance or not hasattr(self.daemon_instance, "send_advert"): return False, "send_advert unavailable", None success = await self.daemon_instance.send_advert() return success, "Advert sent" if success else "Failed to send advert", None if action == "set_mode": mode = str(params.get("mode", "")).strip() if mode not in ("forward", "monitor", "no_tx"): return False, "Invalid mode parameter", None success, message = self._apply_config_update( {"repeater": {"mode": mode}}, merge_mode="patch", ) return success, message, None if action == "set_inform_interval": interval = params.get("interval_seconds", params.get("interval")) if not isinstance(interval, int): return False, "interval_seconds must be an integer", None interval = self._clamp_interval(interval) self.inform_interval_seconds = interval success, message = self._apply_config_update( {"glass": {"inform_interval_seconds": interval}}, merge_mode="patch", ) return success, message, None if action == "rotate_cert": return True, "Certificate rotation requested", None if action == "config_update": config_patch = params.get("config", params) merge_mode = str(params.get("merge_mode", "patch")) success, message = self._apply_config_update(config_patch, merge_mode=merge_mode) return success, message, None if action == "transport_keys_sync": success, message, details = self._apply_transport_keys_sync(params) return success, message, details if action == "set_radio": radio_values = params.get("radio", params) if not isinstance(radio_values, dict): return False, "radio settings must be an object", None success, message = self._apply_config_update( {"radio": radio_values}, merge_mode="patch" ) return success, message, None if action == "run_diagnostic": stats = self.daemon_instance.get_stats() if self.daemon_instance else {} return ( True, ( f"rx={int(stats.get('rx_count', 0))}, " f"tx={int(stats.get('forwarded_count', 0))}, " f"dropped={int(stats.get('dropped_count', 0))}" ), None, ) if action == "export_config": normalized_config = self._normalize_for_hash(self.config) return ( True, "Configuration exported", { "config": normalized_config, "config_hash": self._compute_config_hash(self.config), }, ) return False, f"Unsupported action: {action}", None def _apply_config_update(self, updates: Any, merge_mode: str = "patch") -> Tuple[bool, str]: if not isinstance(updates, dict) or not updates: return False, "Config update payload must be a non-empty object" merge_mode = merge_mode.lower().strip() if merge_mode not in ("patch", "replace"): return False, f"Unsupported merge_mode: {merge_mode}" updates_to_apply = dict(updates) managed_updates = updates_to_apply.pop("glass_managed", None) if managed_updates is not None: managed_ok, managed_message = self._save_managed_settings( managed_updates, replace=merge_mode == "replace", ) if not managed_ok: return False, managed_message self._reload_runtime_settings() self._sync_mqtt_publisher() if not updates_to_apply: return True, "Managed settings updated" sections = list(updates_to_apply.keys()) if merge_mode == "replace": for section, value in updates_to_apply.items(): self.config[section] = value if self.config_manager: saved = self.config_manager.save_to_file() live_updated = self.config_manager.live_update_daemon(sections) return ( bool(saved and live_updated), "Config replaced" if saved and live_updated else "Failed to persist replace update", ) return True, "Config replaced" # patch mode if self.config_manager: result = self.config_manager.update_and_save( updates=updates_to_apply, live_update=True, live_update_sections=sections, ) if result.get("success"): if "glass" in sections: self._reload_runtime_settings() self._sync_mqtt_publisher() return True, "Config patched" return False, str(result.get("error", "Failed to patch config")) self._deep_merge(self.config, updates_to_apply) if "glass" in sections: self._reload_runtime_settings() self._sync_mqtt_publisher() return True, "Config patched" def _get_sqlite_handler(self): if not self.daemon_instance: return None repeater_handler = getattr(self.daemon_instance, "repeater_handler", None) storage = getattr(repeater_handler, "storage", None) return getattr(storage, "sqlite_handler", None) def _apply_transport_keys_sync( self, params: Dict[str, Any], ) -> Tuple[bool, str, Optional[Dict[str, Any]]]: if not isinstance(params, dict): return False, "transport_keys_sync params must be an object", None entries = params.get("transport_keys") if not isinstance(entries, list): return False, "transport_keys_sync payload must include a transport_keys list", None sqlite_handler = self._get_sqlite_handler() if sqlite_handler is None: return False, "SQLite handler unavailable for transport key sync", None try: result = sqlite_handler.sync_transport_keys(entries) except Exception as exc: return False, f"Transport key sync failed: {exc}", None payload_hash = params.get("payload_hash") details: Dict[str, Any] = { "applied_nodes": int(result.get("applied_nodes", 0)), "generated_keys": int(result.get("generated_keys", 0)), } if isinstance(payload_hash, str) and payload_hash.strip(): details["payload_hash"] = payload_hash return True, f"Applied transport key sync ({details['applied_nodes']} nodes)", details def _apply_cert_renewal(self, response: Dict[str, Any]) -> Tuple[bool, str]: client_cert = response.get("client_cert") client_key = response.get("client_key") ca_cert = response.get("ca_cert") if not all( isinstance(item, str) and item.strip() for item in (client_cert, client_key, ca_cert) ): return False, "Missing certificate payload values" cert_dir = Path(self.cert_store_dir) cert_dir.mkdir(parents=True, exist_ok=True) client_cert_path = cert_dir / "glass-client.crt" client_key_path = cert_dir / "glass-client.key" ca_cert_path = cert_dir / "glass-ca.crt" client_cert_path.write_text(client_cert, encoding="utf-8") client_key_path.write_text(client_key, encoding="utf-8") ca_cert_path.write_text(ca_cert, encoding="utf-8") os.chmod(client_key_path, 0o600) return self._apply_config_update( { "glass": { "client_cert_path": str(client_cert_path), "client_key_path": str(client_key_path), "ca_cert_path": str(ca_cert_path), } }, merge_mode="patch", ) async def _get_pending_command_results(self) -> List[Dict[str, Any]]: async with self._pending_lock: return list(self._pending_command_results) async def _queue_command_result( self, command_id: str, status: str, message: str, details: Optional[Dict[str, Any]] = None, ) -> None: completed_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") result = { "command_id": command_id, "status": status, "message": message[:1024] if message else "", "completed_at": completed_at, } if details: result["details"] = details async with self._pending_lock: self._pending_command_results.append(result) def publish_telemetry(self, record_type: str, record: Dict[str, Any]) -> None: if not self.enabled or not self.mqtt_enabled or not self._mqtt_ready: return if not self._mqtt_client: return node_name = self.config.get("repeater", {}).get("node_name", "unknown-repeater") event_type = "event" event_name: Optional[str] = record_type if record_type in ("packet", "advert"): event_type = record_type event_name = None topic = self._mqtt_topic_for_record(node_name=node_name, record_type=record_type) timestamp = self._to_rfc3339_timestamp(record.get("timestamp")) payload = self._normalize_for_hash(record) envelope: Dict[str, Any] = { "version": 1, "type": event_type, "topic": topic, "node_name": node_name, "timestamp": timestamp, "payload": payload, } if event_type == "event" and event_name: envelope["event_name"] = event_name try: message = json.dumps(envelope, separators=(",", ":"), sort_keys=True, default=str) self._mqtt_client.publish(topic, message, qos=0, retain=False) except Exception as exc: logger.debug("Failed publishing Glass telemetry MQTT message: %s", exc) def _mqtt_topic_for_record(self, *, node_name: str, record_type: str) -> str: base = self.mqtt_base_topic.strip("/") or "glass" if record_type in ("packet", "advert"): return f"{base}/{node_name}/{record_type}" return f"{base}/{node_name}/event/{record_type}" def _to_rfc3339_timestamp(self, value: Any) -> str: if isinstance(value, (int, float)): dt = datetime.fromtimestamp(float(value), timezone.utc) elif isinstance(value, str): normalized = value.strip() if normalized.endswith("Z"): return normalized try: dt = datetime.fromisoformat(normalized) except ValueError: dt = datetime.now(timezone.utc) elif isinstance(value, datetime): dt = value else: dt = datetime.now(timezone.utc) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) else: dt = dt.astimezone(timezone.utc) return dt.isoformat().replace("+00:00", "Z") def _init_mqtt_publisher(self) -> None: if not self.mqtt_enabled: self._close_mqtt_publisher() return if mqtt is None: logger.warning("Glass MQTT telemetry publishing enabled but paho-mqtt is unavailable") self._close_mqtt_publisher() return if self._mqtt_client is not None: return try: client = mqtt.Client() if self.mqtt_username: client.username_pw_set(self.mqtt_username, self.mqtt_password) if self.mqtt_tls_enabled: ca_certs = ( self._require_ssl_file(self.ca_cert_path, "ca_cert_path") if self.ca_cert_path else None ) certfile = None keyfile = None if self.client_cert_path or self.client_key_path: certfile = self._require_ssl_file(self.client_cert_path, "client_cert_path") keyfile = self._require_ssl_file(self.client_key_path, "client_key_path") cert_reqs = ssl.CERT_REQUIRED if self.verify_tls else ssl.CERT_NONE client.tls_set( ca_certs=ca_certs, certfile=certfile, keyfile=keyfile, cert_reqs=cert_reqs, tls_version=ssl.PROTOCOL_TLS_CLIENT, ) if not self.verify_tls: client.tls_insecure_set(True) client.on_connect = self._on_mqtt_connect client.on_disconnect = self._on_mqtt_disconnect client.connect_async(self.mqtt_broker_host, self.mqtt_broker_port, 60) client.loop_start() self._mqtt_client = client self._mqtt_runtime_signature = self._current_mqtt_signature() logger.info( "Glass MQTT telemetry publisher started (%s:%s, base_topic=%s)", self.mqtt_broker_host, self.mqtt_broker_port, self.mqtt_base_topic, ) except Exception as exc: self._mqtt_client = None self._mqtt_ready = False self._mqtt_runtime_signature = None logger.warning("Failed to start Glass MQTT telemetry publisher: %s", exc) def _close_mqtt_publisher(self) -> None: client = self._mqtt_client self._mqtt_client = None self._mqtt_ready = False self._mqtt_runtime_signature = None if client is None: return try: client.loop_stop() client.disconnect() except Exception as exc: logger.debug("Error stopping Glass MQTT telemetry publisher: %s", exc) def _on_mqtt_connect(self, _client, _userdata, _flags, reason_code, _properties=None) -> None: rc = getattr(reason_code, "value", reason_code) if rc == 0: self._mqtt_ready = True logger.info("Glass MQTT telemetry publisher connected") return self._mqtt_ready = False logger.warning("Glass MQTT telemetry publisher connect failed (code=%s)", rc) def _on_mqtt_disconnect(self, _client, _userdata, reason_code, _properties=None) -> None: self._mqtt_ready = False rc = getattr(reason_code, "value", reason_code) if rc: logger.warning("Glass MQTT telemetry publisher disconnected (code=%s)", rc) def _current_mqtt_signature( self, ) -> Tuple[ str, int, str, bool, bool, Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], ]: return ( self.mqtt_broker_host, self.mqtt_broker_port, self.mqtt_base_topic, self.mqtt_tls_enabled, self.verify_tls, self.ca_cert_path, self.client_cert_path, self.client_key_path, self.mqtt_username, self.mqtt_password, ) def _sync_mqtt_publisher(self) -> None: if not self.enabled or not self.mqtt_enabled: self._close_mqtt_publisher() return if mqtt is None: self._close_mqtt_publisher() return signature = self._current_mqtt_signature() if self._mqtt_client is None: self._init_mqtt_publisher() return if self._mqtt_runtime_signature != signature: self._close_mqtt_publisher() self._init_mqtt_publisher() @staticmethod def _deep_merge(target: Dict[str, Any], source: Dict[str, Any]) -> None: for key, value in source.items(): if isinstance(value, dict) and isinstance(target.get(key), dict): GlassHandler._deep_merge(target[key], value) else: target[key] = value @staticmethod def _normalize_for_hash(value: Any) -> Any: if isinstance(value, bytes): return value.hex() if isinstance(value, dict): return {k: GlassHandler._normalize_for_hash(v) for k, v in value.items()} if isinstance(value, list): return [GlassHandler._normalize_for_hash(v) for v in value] return value @staticmethod def _compute_config_hash(config: dict) -> str: normalized = GlassHandler._normalize_for_hash(config) encoded = json.dumps(normalized, sort_keys=True, separators=(",", ":")).encode("utf-8") digest = hashlib.sha256(encoded).hexdigest() return f"sha256:{digest}" @staticmethod def _clamp_interval(interval_seconds: int) -> int: if interval_seconds < 5: return 5 if interval_seconds > 3600: return 3600 return interval_seconds