import base64 import logging import os from pathlib import Path from typing import Any, Dict, Optional, overload import yaml from repeater.policy_engine import default_policy_engine_config logger = logging.getLogger("Config") def _resolve_policy_config_path(config: Dict[str, Any], config_path: str) -> Path: policy_section = config.get("policy", {}) if isinstance(config.get("policy"), dict) else {} configured = policy_section.get("policy_file") or "policy.yaml" base_dir = Path(config_path).expanduser().resolve().parent p = Path(str(configured)).expanduser() if not p.is_absolute(): p = (base_dir / p).resolve() return p def _load_policy_engine_config(config: Dict[str, Any], config_path: str) -> Dict[str, Any]: policy_path = _resolve_policy_config_path(config, config_path) defaults = default_policy_engine_config() if not policy_path.exists(): logger.info("Policy file not found at %s, policy engine disabled", policy_path) config["policy_engine"] = defaults config["policy_file_path"] = str(policy_path) return config try: with open(policy_path) as f: loaded = yaml.safe_load(f) or {} if isinstance(loaded, dict) and isinstance(loaded.get("policy_engine"), dict): policy_cfg = loaded.get("policy_engine") elif isinstance(loaded, dict): policy_cfg = loaded else: policy_cfg = {} merged = dict(defaults) if isinstance(policy_cfg, dict): merged.update(policy_cfg) if not isinstance(merged.get("rules"), list): merged["rules"] = [] if not isinstance(merged.get("objects"), dict): merged["objects"] = {} config["policy_engine"] = merged config["policy_file_path"] = str(policy_path) logger.info("Loaded policy config from %s", policy_path) return config except Exception as e: logger.warning( "Failed to load policy config from %s: %s. Policy engine disabled.", policy_path, e, ) config["policy_engine"] = defaults config["policy_file_path"] = str(policy_path) return config class NullRadio: """No-op radio used when radio_type disables hardware initialization.""" def __init__(self): self._rx_callback = None def begin(self): return True async def send(self, data: bytes): raise RuntimeError("Radio is disabled (radio_type is null/none)") async def wait_for_rx(self) -> bytes: import asyncio while True: await asyncio.sleep(3600) def sleep(self): return None def get_last_rssi(self) -> int: return 0 def get_last_snr(self) -> float: return 0.0 def set_rx_callback(self, callback): self._rx_callback = callback def check_radio_health(self): return True def resolve_storage_dir( config: Dict[str, Any], *, config_path: Optional[str] = None, default: str = "/var/lib/pymc_repeater", ) -> Path: storage_dir_cfg = ( config.get("storage", {}).get("storage_dir") or config.get("storage_dir") or default ) storage_dir = Path(str(storage_dir_cfg)).expanduser() if not storage_dir.is_absolute(): if config_path: base_dir = Path(config_path).expanduser().resolve().parent storage_dir = (base_dir / storage_dir).resolve() else: storage_dir = storage_dir.resolve() return storage_dir def get_node_info(config: Dict[str, Any]) -> Dict[str, Any]: """ Extract node name, radio configuration, and MQTT settings from config. Args: config: Configuration dictionary Returns: Dictionary with node_name, radio_config, and MQTT configuration """ node_name = config.get("repeater", {}).get("node_name", "PyMC-Repeater") radio_config = config.get("radio", {}) radio_freq = radio_config.get("frequency", 0.0) radio_bw = radio_config.get("bandwidth", 0.0) radio_sf = radio_config.get("spreading_factor", 7) radio_cr = radio_config.get("coding_rate", 5) # Format frequency in MHz and bandwidth in kHz radio_freq_mhz = radio_freq / 1_000_000 radio_bw_khz = radio_bw / 1_000 radio_config_str = f"{radio_freq_mhz},{radio_bw_khz},{radio_sf},{radio_cr}" # Handle getting the config from mqtt brokers, falling back to letsmesh if it doesn't exist mqtt_config = config.get("mqtt_brokers", config.get("letsmesh", {})) return { "node_name": node_name, "radio_config": radio_config_str, "iata_code": mqtt_config.get("iata_code", "TEST"), "status_interval": mqtt_config.get("status_interval", 60), "model": mqtt_config.get("model", "PyMC-Repeater"), "email": mqtt_config.get("email", ""), "owner": mqtt_config.get("owner", ""), } def load_config(config_path: Optional[str] = None) -> Dict[str, Any]: if config_path is None: config_path = os.getenv("PYMC_REPEATER_CONFIG", "/etc/pymc_repeater/config.yaml") # Check if config file exists if not Path(config_path).exists(): raise FileNotFoundError( f"Configuration file not found: {config_path}\n" f"Please create a config file. Example: \n" f" sudo cp {Path(config_path).parent}/config.yaml.example {config_path}\n" f" sudo nano {config_path}" ) # Load from file - no defaults, all settings must be in config file try: with open(config_path) as f: config = yaml.safe_load(f) or {} logger.info(f"Loaded config from {config_path}") except Exception as e: raise RuntimeError(f"Failed to load configuration from {config_path}: {e}") from e storage_dir = resolve_storage_dir(config, config_path=config_path) if "storage" not in config or not isinstance(config.get("storage"), dict): config["storage"] = {} config["storage"]["storage_dir"] = str(storage_dir) if config.get("storage_dir"): logger.warning( "Deprecated config key 'storage_dir' detected; prefer 'storage.storage_dir'." ) if "mesh" not in config: config["mesh"] = {} if "glass" not in config: config["glass"] = { "enabled": False, "base_url": "http://localhost:8080", "inform_interval_seconds": 30, "request_timeout_seconds": 10, "verify_tls": True, "api_token": None, "cert_store_dir": "/etc/pymc_repeater/glass", } if "gps" not in config: config["gps"] = { "enabled": False, "api_fallback_to_config_location": True, "advertise_gps_location": False, "location_precision_digits": None, "source": "serial", "device": "/dev/serial0", "baud_rate": 9600, "read_timeout_seconds": 1.0, "reconnect_interval_seconds": 5.0, "stale_after_seconds": 10.0, "retain_sentences": 25, "validate_checksum": True, "require_checksum": False, "time_sync_enabled": True, "time_sync_interval_seconds": 3600.0, "time_sync_min_offset_seconds": 1.0, "time_sync_min_valid_year": 2020, "persist_gps_fix_to_config": False, "persist_gps_fix_interval_seconds": 600.0, } if "sensors" not in config: config["sensors"] = { "enabled": False, "poll_interval_seconds": 30.0, "auto_install_packages": False, "definitions": [], } # Ensure repeater.security exists with defaults for upgrades from older configs if "repeater" not in config: config["repeater"] = {} if "security" not in config["repeater"]: logger.warning( "No 'security' section found under 'repeater' in config. " "Adding secure placeholders — complete setup wizard before login." ) config["repeater"]["security"] = { "max_clients": 1, "admin_password": None, "guest_password": None, "allow_read_only": False, "jwt_secret": None, "jwt_expiry_minutes": 60, } # Only auto-generate identity_key if not provided under repeater section if "identity_key" not in config["repeater"]: # Check if identity_file is specified identity_file = config["repeater"].get("identity_file") if identity_file: config["repeater"]["identity_key"] = _load_or_create_identity_key(path=identity_file) else: config["repeater"]["identity_key"] = _load_or_create_identity_key() if os.getenv("PYMC_REPEATER_LOG_LEVEL"): if "logging" not in config: config["logging"] = {} config["logging"]["level"] = os.getenv("PYMC_REPEATER_LOG_LEVEL") config = _load_policy_engine_config(config, config_path) return config def save_config(config_data: Dict[str, Any], config_path: Optional[str] = None) -> bool: """ Save configuration to YAML file. Args: config_data: Configuration dictionary to save config_path: Path to config file (uses default if None) Returns: True if successful, False otherwise """ if config_path is None: config_path = os.getenv("PYMC_REPEATER_CONFIG", "/etc/pymc_repeater/config.yaml") try: # Create backup of existing config config_file = Path(config_path) if config_file.exists(): backup_path = config_file.with_suffix(".yaml.backup") config_file.rename(backup_path) logger.info(f"Created backup at {backup_path}") # Save new config (allow_unicode=True so emojis etc. are not escaped as \U0001F47E) with open(config_path, "w", encoding="utf-8") as f: yaml.safe_dump( config_data, f, default_flow_style=False, sort_keys=False, allow_unicode=True, width=1000000, ) logger.info(f"Saved configuration to {config_path}") return True except Exception as e: logger.error(f"Failed to save configuration: {e}") return False def update_unscoped_flood_policy(allow: bool, config_path: Optional[str] = None) -> bool: """ Update the unscoped flood policy in the configuration. Args: allow: True to allow unscoped flooding, False to deny config_path: Path to config file (uses default if None) Returns: True if successful, False otherwise """ try: # Load current config config = load_config(config_path) # Ensure mesh section exists if "mesh" not in config: config["mesh"] = {} # Set global flood policy config["mesh"]["global_flood_allow"] = allow config["mesh"]["unscoped_flood_allow"] = allow # Save updated config return save_config(config, config_path) except Exception as e: logger.error(f"Failed to update unscoped flood policy: {e}") return False def _load_or_create_identity_key(path: Optional[str] = None) -> bytes: if path is None: # Check system-wide location first (matches config.yaml location) system_key_path = Path("/etc/pymc_repeater/identity.key") if system_key_path.exists(): key_path = system_key_path else: # Follow XDG spec xdg_config_home = os.environ.get("XDG_CONFIG_HOME") if xdg_config_home: config_dir = Path(xdg_config_home) / "pymc_repeater" else: config_dir = Path.home() / ".config" / "pymc_repeater" key_path = config_dir / "identity.key" else: key_path = Path(path) key_path.parent.mkdir(parents=True, exist_ok=True) if key_path.exists(): try: with open(key_path, "rb") as f: encoded = f.read() key = base64.b64decode(encoded) if len(key) not in (32, 64): raise ValueError(f"Invalid key length: {len(key)}, expected 32 or 64") logger.info(f"Loaded existing identity key from {key_path}") return key except Exception as e: logger.warning(f"Failed to load identity key: {e}") # Generate new random key key = os.urandom(32) # Save it try: with open(key_path, "wb") as f: f.write(base64.b64encode(key)) os.chmod(key_path, 0o600) # Restrict permissions logger.info(f"Generated and stored new identity key at {key_path}") except Exception as e: logger.warning(f"Failed to save identity key: {e}") return key def get_radio_for_board(board_config: dict): @overload def _parse_int(value, *, default: None = None) -> Optional[int]: ... @overload def _parse_int(value, *, default: int) -> int: ... def _parse_int(value, *, default=None): if value is None: return default if isinstance(value, int): return value if isinstance(value, str): return int(value.strip().rstrip(","), 0) raise ValueError(f"Invalid int value type: {type(value)}") def _parse_int_list(value): if value is None: return None if isinstance(value, (list, tuple)): return [_parse_int(item) for item in value] if isinstance(value, str): stripped = value.strip() if not stripped: return [] if stripped[0] == "[" and stripped[-1] == "]": stripped = stripped[1:-1] return [_parse_int(item) for item in stripped.split(",") if item.strip()] raise ValueError(f"Invalid int list value type: {type(value)}") radio_type_raw = board_config.get("radio_type") if radio_type_raw is None: radio_type = "none" else: radio_type = str(radio_type_raw).lower().strip() if radio_type in ("", "none", "null", "disabled", "off", "no_radio"): logger.warning("Radio disabled by configuration (radio_type=%r)", radio_type_raw) return NullRadio() if radio_type == "kiss-modem": radio_type = "kiss" if radio_type in ("sx1262", "sx1262_ch341"): from pymc_core.hardware.sx1262_wrapper import SX1262Radio # Get radio and SPI configuration - all settings must be in config file spi_config = board_config.get("sx1262") if not spi_config: raise ValueError("Missing 'sx1262' section in configuration file") radio_config = board_config.get("radio") if not radio_config: raise ValueError("Missing 'radio' section in configuration file") # CH341 integration: swap SPI transport + GPIO backend to CH341 if radio_type == "sx1262_ch341": ch341_cfg = board_config.get("ch341") if not ch341_cfg: raise ValueError("Missing 'ch341' section in configuration file") from pymc_core.hardware.lora.LoRaRF.SX126x import set_spi_transport from pymc_core.hardware.transports.ch341_spi_transport import CH341SPITransport vid = _parse_int(ch341_cfg.get("vid"), default=0x1A86) pid = _parse_int(ch341_cfg.get("pid"), default=0x5512) # Create CH341 transport (also configures CH341 GPIO manager globally) ch341_spi = CH341SPITransport(vid=vid, pid=pid, auto_setup_gpio=True) set_spi_transport(ch341_spi) combined_config = { "bus_id": _parse_int(spi_config["bus_id"]), "cs_id": _parse_int(spi_config["cs_id"]), "cs_pin": _parse_int(spi_config["cs_pin"]), "reset_pin": _parse_int(spi_config["reset_pin"]), "busy_pin": _parse_int(spi_config["busy_pin"]), "irq_pin": _parse_int(spi_config["irq_pin"]), "txen_pin": _parse_int(spi_config["txen_pin"]), "rxen_pin": _parse_int(spi_config["rxen_pin"]), "txled_pin": _parse_int(spi_config.get("txled_pin", -1), default=-1), "rxled_pin": _parse_int(spi_config.get("rxled_pin", -1), default=-1), "use_dio3_tcxo": spi_config.get("use_dio3_tcxo", False), "dio3_tcxo_voltage": float(spi_config.get("dio3_tcxo_voltage", 1.8)), "use_dio2_rf": spi_config.get("use_dio2_rf", False), "is_waveshare": spi_config.get("is_waveshare", False), "frequency": int(radio_config["frequency"]), "tx_power": radio_config["tx_power"], "spreading_factor": radio_config["spreading_factor"], "bandwidth": int(radio_config["bandwidth"]), "coding_rate": radio_config["coding_rate"], "preamble_length": radio_config["preamble_length"], "sync_word": _parse_int(radio_config.get("sync_word", 0x12)), } en_pin = _parse_int(spi_config.get("en_pin"), default=None) en_pins = _parse_int_list(spi_config.get("en_pins")) if en_pin is not None: combined_config["en_pin"] = en_pin if en_pins is not None: combined_config["en_pins"] = en_pins # Add optional GPIO parameters if specified in config # These wont be supported by older versions of pymc_core if "gpio_chip" in spi_config: combined_config["gpio_chip"] = _parse_int(spi_config["gpio_chip"], default=0) if "use_gpiod_backend" in spi_config: combined_config["use_gpiod_backend"] = spi_config["use_gpiod_backend"] if "radio_timing_delay" in spi_config: combined_config["radio_timing_delay"] = float(spi_config["radio_timing_delay"]) radio = SX1262Radio.get_instance(**combined_config) if hasattr(radio, "_initialized") and not radio._initialized: try: radio.begin() except RuntimeError as e: raise RuntimeError(f"Failed to initialize SX1262 radio: {e}") from e return radio elif radio_type == "kiss": try: from pymc_core.hardware.kiss_modem_wrapper import KissModemWrapper except ImportError: try: from pymc_core.hardware.kiss_serial_wrapper import ( KissSerialWrapper as KissModemWrapper, ) except ImportError: raise RuntimeError( "KISS modem support requires pyMC_core with KISS support. " "Install your fork with: pip install -e /path/to/pyMC_core" ) from None kiss_config = board_config.get("kiss") if not kiss_config: raise ValueError("Missing 'kiss' section in configuration file for radio_type: kiss") port = kiss_config.get("port") if not port: raise ValueError("Missing 'port' in 'kiss' section (e.g. /dev/ttyUSB0)") baudrate = int(kiss_config.get("baud_rate", 115200)) radio_cfg = board_config.get("radio") or {} radio_config = { "frequency": int(radio_cfg.get("frequency", 869618000)), "bandwidth": int(radio_cfg.get("bandwidth", 62500)), "spreading_factor": int(radio_cfg.get("spreading_factor", 8)), "coding_rate": int(radio_cfg.get("coding_rate", 8)), "tx_power": int(radio_cfg.get("tx_power", 14)), "preamble_length": int(radio_cfg.get("preamble_length", 32)), } # Optional KISS key-up / CSMA tuning, forwarded to the modem firmware (via # SetHardware) only when present so the wrapper keeps its own defaults otherwise. # For a host-managed repeater the engine already staggers retransmits, so the # firmware's p-persistent CSMA backoff is usually redundant; set # kiss_persistence: 255 to transmit as soon as the channel is clear. for _key in ("tx_delay_ms", "kiss_persistence", "kiss_slottime_ms", "kiss_txtail_ms"): if kiss_config.get(_key) is not None: radio_config[_key] = int(kiss_config[_key]) if kiss_config.get("kiss_full_duplex") is not None: radio_config["kiss_full_duplex"] = bool(kiss_config["kiss_full_duplex"]) radio = KissModemWrapper( port=port, baudrate=baudrate, radio_config=radio_config, auto_configure=True, ) if hasattr(radio, "begin"): try: radio.begin() except Exception as e: raise RuntimeError(f"Failed to initialize KISS modem: {e}") from e return radio elif radio_type == "pymc_tcp": try: from pymc_core.hardware.tcp_radio import TCPLoRaRadio except ImportError: raise RuntimeError( "pymc_tcp radio requires pyMC_core >= the release that includes " "PR pyMC-dev/pyMC_core#68 (merged 2026-05-13). " "Reinstall the [hardware] extra to pick it up." ) from None tcp_cfg = board_config.get("pymc_tcp") if not tcp_cfg: raise ValueError( "Missing 'pymc_tcp' section in configuration file for radio_type: pymc_tcp" ) host = tcp_cfg.get("host") if not host: raise ValueError("Missing 'host' in 'pymc_tcp' section (modem hostname or LAN IP)") radio_cfg = board_config.get("radio") or {} radio = TCPLoRaRadio( host=host, port=int(tcp_cfg.get("port", 5055)), token=tcp_cfg.get("token", ""), connect_timeout=float(tcp_cfg.get("connect_timeout", 5.0)), frequency=int(radio_cfg.get("frequency", 869618000)), bandwidth=int(radio_cfg.get("bandwidth", 62500)), spreading_factor=int(radio_cfg.get("spreading_factor", 8)), coding_rate=int(radio_cfg.get("coding_rate", 8)), tx_power=int(radio_cfg.get("tx_power", 22)), sync_word=_parse_int(radio_cfg.get("sync_word", 0x12), default=0x12), preamble_length=int(radio_cfg.get("preamble_length", 16)), lbt_enabled=bool(tcp_cfg.get("lbt_enabled", True)), lbt_max_attempts=int(tcp_cfg.get("lbt_max_attempts", 5)), ) try: radio.begin() except Exception as e: raise RuntimeError(f"Failed to initialize pymc_tcp radio: {e}") from e return radio elif radio_type == "pymc_usb": try: from pymc_core.hardware.usb_radio import USBLoRaRadio except ImportError: raise RuntimeError( "pymc_usb radio requires pyMC_core >= the release that includes " "PR pyMC-dev/pyMC_core#68 (merged 2026-05-13). " "Reinstall the [hardware] extra to pick it up." ) from None usb_cfg = board_config.get("pymc_usb") if not usb_cfg: raise ValueError( "Missing 'pymc_usb' section in configuration file for radio_type: pymc_usb" ) port = usb_cfg.get("port") if not port: raise ValueError("Missing 'port' in 'pymc_usb' section (e.g. /dev/ttyACM0)") radio_cfg = board_config.get("radio") or {} radio = USBLoRaRadio( port=port, baudrate=int(usb_cfg.get("baudrate", 921600)), frequency=int(radio_cfg.get("frequency", 869618000)), bandwidth=int(radio_cfg.get("bandwidth", 62500)), spreading_factor=int(radio_cfg.get("spreading_factor", 8)), coding_rate=int(radio_cfg.get("coding_rate", 8)), tx_power=int(radio_cfg.get("tx_power", 22)), sync_word=_parse_int(radio_cfg.get("sync_word", 0x12), default=0x12), preamble_length=int(radio_cfg.get("preamble_length", 16)), lbt_enabled=bool(usb_cfg.get("lbt_enabled", True)), lbt_max_attempts=int(usb_cfg.get("lbt_max_attempts", 5)), ) try: radio.begin() except Exception as e: raise RuntimeError(f"Failed to initialize pymc_usb radio: {e}") from e return radio raise RuntimeError( f"Unknown radio type: {radio_type}. " "Supported: sx1262, sx1262_ch341, kiss (or kiss-modem), pymc_tcp, pymc_usb" )