Files
pyMC_Repeater/repeater/config.py

366 lines
14 KiB
Python

import base64
import logging
import os
from pathlib import Path
from typing import Any, Dict, Optional
import yaml
logger = logging.getLogger("Config")
def get_node_info(config: Dict[str, Any]) -> Dict[str, Any]:
"""
Extract node name, radio configuration, and LetsMesh settings from config.
Args:
config: Configuration dictionary
Returns:
Dictionary with node_name, radio_config, and LetsMesh configuration
"""
node_name = config.get("repeater", {}).get("node_name", "PyMC-Repeater")
radio_config = config.get("radio", {})
radio_freq = radio_config.get("frequency", 0.0)
radio_bw = radio_config.get("bandwidth", 0.0)
radio_sf = radio_config.get("spreading_factor", 7)
radio_cr = radio_config.get("coding_rate", 5)
# Format frequency in MHz and bandwidth in kHz
radio_freq_mhz = radio_freq / 1_000_000
radio_bw_khz = radio_bw / 1_000
radio_config_str = f"{radio_freq_mhz},{radio_bw_khz},{radio_sf},{radio_cr}"
letsmesh_config = config.get("letsmesh", {})
from pymc_core.protocol.utils import PAYLOAD_TYPES
disallowed_types = letsmesh_config.get("disallowed_packet_types", [])
type_name_map = {name: code for code, name in PAYLOAD_TYPES.items()}
disallowed_hex = [type_name_map.get(name.upper(), None) for name in disallowed_types]
disallowed_hex = [val for val in disallowed_hex if val is not None] # Filter out invalid names
return {
"node_name": node_name,
"radio_config": radio_config_str,
"iata_code": letsmesh_config.get("iata_code", "TEST"),
"broker_index": letsmesh_config.get("broker_index", 0),
"status_interval": letsmesh_config.get("status_interval", 60),
"model": letsmesh_config.get("model", "PyMC-Repeater"),
"disallowed_packet_types": disallowed_hex,
"email": letsmesh_config.get("email", ""),
"owner": letsmesh_config.get("owner", ""),
}
def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
if config_path is None:
config_path = os.getenv("PYMC_REPEATER_CONFIG", "/etc/pymc_repeater/config.yaml")
# Check if config file exists
if not Path(config_path).exists():
raise FileNotFoundError(
f"Configuration file not found: {config_path}\n"
f"Please create a config file. Example: \n"
f" sudo cp {Path(config_path).parent}/config.yaml.example {config_path}\n"
f" sudo nano {config_path}"
)
# Load from file - no defaults, all settings must be in config file
try:
with open(config_path) as f:
config = yaml.safe_load(f) or {}
logger.info(f"Loaded config from {config_path}")
except Exception as e:
raise RuntimeError(f"Failed to load configuration from {config_path}: {e}") from e
if "mesh" not in config:
config["mesh"] = {}
# Ensure repeater.security exists with defaults for upgrades from older configs
if "repeater" not in config:
config["repeater"] = {}
if "security" not in config["repeater"]:
logger.warning(
"No 'security' section found under 'repeater' in config. "
"Adding defaults — please review and update passwords."
)
config["repeater"]["security"] = {
"max_clients": 1,
"admin_password": "admin123",
"guest_password": "guest123",
"allow_read_only": False,
"jwt_secret": "",
"jwt_expiry_minutes": 60,
}
# Only auto-generate identity_key if not provided under repeater section
if "identity_key" not in config["repeater"]:
# Check if identity_file is specified
identity_file = config["repeater"].get("identity_file")
if identity_file:
config["repeater"]["identity_key"] = _load_or_create_identity_key(path=identity_file)
else:
config["repeater"]["identity_key"] = _load_or_create_identity_key()
if os.getenv("PYMC_REPEATER_LOG_LEVEL"):
if "logging" not in config:
config["logging"] = {}
config["logging"]["level"] = os.getenv("PYMC_REPEATER_LOG_LEVEL")
return config
def save_config(config_data: Dict[str, Any], config_path: Optional[str] = None) -> bool:
"""
Save configuration to YAML file.
Args:
config_data: Configuration dictionary to save
config_path: Path to config file (uses default if None)
Returns:
True if successful, False otherwise
"""
if config_path is None:
config_path = os.getenv("PYMC_REPEATER_CONFIG", "/etc/pymc_repeater/config.yaml")
try:
# Create backup of existing config
config_file = Path(config_path)
if config_file.exists():
backup_path = config_file.with_suffix(".yaml.backup")
config_file.rename(backup_path)
logger.info(f"Created backup at {backup_path}")
# Save new config (allow_unicode=True so emojis etc. are not escaped as \U0001F47E)
with open(config_path, "w", encoding="utf-8") as f:
yaml.safe_dump(
config_data,
f,
default_flow_style=False,
sort_keys=False,
allow_unicode=True,
width=1000000,
)
logger.info(f"Saved configuration to {config_path}")
return True
except Exception as e:
logger.error(f"Failed to save configuration: {e}")
return False
def update_global_flood_policy(allow: bool, config_path: Optional[str] = None) -> bool:
"""
Update the global flood policy in the configuration.
Args:
allow: True to allow flooding globally, False to deny
config_path: Path to config file (uses default if None)
Returns:
True if successful, False otherwise
"""
try:
# Load current config
config = load_config(config_path)
# Ensure mesh section exists
if "mesh" not in config:
config["mesh"] = {}
# Set global flood policy
config["mesh"]["global_flood_allow"] = allow
# Save updated config
return save_config(config, config_path)
except Exception as e:
logger.error(f"Failed to update global flood policy: {e}")
return False
def _load_or_create_identity_key(path: Optional[str] = None) -> bytes:
if path is None:
# Check system-wide location first (matches config.yaml location)
system_key_path = Path("/etc/pymc_repeater/identity.key")
if system_key_path.exists():
key_path = system_key_path
else:
# Follow XDG spec
xdg_config_home = os.environ.get("XDG_CONFIG_HOME")
if xdg_config_home:
config_dir = Path(xdg_config_home) / "pymc_repeater"
else:
config_dir = Path.home() / ".config" / "pymc_repeater"
key_path = config_dir / "identity.key"
else:
key_path = Path(path)
key_path.parent.mkdir(parents=True, exist_ok=True)
if key_path.exists():
try:
with open(key_path, "rb") as f:
encoded = f.read()
key = base64.b64decode(encoded)
if len(key) not in (32, 64):
raise ValueError(f"Invalid key length: {len(key)}, expected 32 or 64")
logger.info(f"Loaded existing identity key from {key_path}")
return key
except Exception as e:
logger.warning(f"Failed to load identity key: {e}")
# Generate new random key
key = os.urandom(32)
# Save it
try:
with open(key_path, "wb") as f:
f.write(base64.b64encode(key))
os.chmod(key_path, 0o600) # Restrict permissions
logger.info(f"Generated and stored new identity key at {key_path}")
except Exception as e:
logger.warning(f"Failed to save identity key: {e}")
return key
def get_radio_for_board(board_config: dict):
def _parse_int(value, *, default=None) -> int:
if value is None:
return default
if isinstance(value, int):
return value
if isinstance(value, str):
return int(value.strip().rstrip(','), 0)
raise ValueError(f"Invalid int value type: {type(value)}")
radio_type = board_config.get("radio_type", "sx1262").lower().strip()
if radio_type == "kiss-modem":
radio_type = "kiss"
if radio_type in ("sx1262", "sx1262_ch341"):
from pymc_core.hardware.sx1262_wrapper import SX1262Radio
# Get radio and SPI configuration - all settings must be in config file
spi_config = board_config.get("sx1262")
if not spi_config:
raise ValueError("Missing 'sx1262' section in configuration file")
radio_config = board_config.get("radio")
if not radio_config:
raise ValueError("Missing 'radio' section in configuration file")
# CH341 integration: swap SPI transport + GPIO backend to CH341
if radio_type == "sx1262_ch341":
ch341_cfg = board_config.get("ch341")
if not ch341_cfg:
raise ValueError("Missing 'ch341' section in configuration file")
from pymc_core.hardware.lora.LoRaRF.SX126x import set_spi_transport
from pymc_core.hardware.transports.ch341_spi_transport import CH341SPITransport
vid = _parse_int(ch341_cfg.get("vid"), default=0x1A86)
pid = _parse_int(ch341_cfg.get("pid"), default=0x5512)
# Create CH341 transport (also configures CH341 GPIO manager globally)
ch341_spi = CH341SPITransport(vid=vid, pid=pid, auto_setup_gpio=True)
set_spi_transport(ch341_spi)
combined_config = {
"bus_id": _parse_int(spi_config["bus_id"]),
"cs_id": _parse_int(spi_config["cs_id"]),
"cs_pin": _parse_int(spi_config["cs_pin"]),
"reset_pin": _parse_int(spi_config["reset_pin"]),
"busy_pin": _parse_int(spi_config["busy_pin"]),
"irq_pin": _parse_int(spi_config["irq_pin"]),
"txen_pin": _parse_int(spi_config["txen_pin"]),
"rxen_pin": _parse_int(spi_config["rxen_pin"]),
"txled_pin": _parse_int(spi_config.get("txled_pin", -1), default=-1),
"rxled_pin": _parse_int(spi_config.get("rxled_pin", -1), default=-1),
"en_pin": _parse_int(spi_config.get("en_pin", -1), default=-1),
"use_dio3_tcxo": spi_config.get("use_dio3_tcxo", False),
"dio3_tcxo_voltage": float(spi_config.get("dio3_tcxo_voltage", 1.8)),
"use_dio2_rf": spi_config.get("use_dio2_rf", False),
"is_waveshare": spi_config.get("is_waveshare", False),
"frequency": int(radio_config["frequency"]),
"tx_power": radio_config["tx_power"],
"spreading_factor": radio_config["spreading_factor"],
"bandwidth": int(radio_config["bandwidth"]),
"coding_rate": radio_config["coding_rate"],
"preamble_length": radio_config["preamble_length"],
"sync_word": radio_config["sync_word"],
}
# Add optional GPIO parameters if specified in config
# These wont be supported by older versions of pymc_core
if "gpio_chip" in spi_config:
combined_config["gpio_chip"] = _parse_int(spi_config["gpio_chip"], default=0)
if "use_gpiod_backend" in spi_config:
combined_config["use_gpiod_backend"] = spi_config["use_gpiod_backend"]
radio = SX1262Radio.get_instance(**combined_config)
if hasattr(radio, "_initialized") and not radio._initialized:
try:
radio.begin()
except RuntimeError as e:
raise RuntimeError(f"Failed to initialize SX1262 radio: {e}") from e
return radio
elif radio_type == "kiss":
try:
from pymc_core.hardware.kiss_modem_wrapper import KissModemWrapper
except ImportError:
try:
from pymc_core.hardware.kiss_serial_wrapper import (
KissSerialWrapper as KissModemWrapper,
)
except ImportError:
raise RuntimeError(
"KISS modem support requires pyMC_core with KISS support. "
"Install your fork with: pip install -e /path/to/pyMC_core"
) from None
kiss_config = board_config.get("kiss")
if not kiss_config:
raise ValueError("Missing 'kiss' section in configuration file for radio_type: kiss")
port = kiss_config.get("port")
if not port:
raise ValueError("Missing 'port' in 'kiss' section (e.g. /dev/ttyUSB0)")
baudrate = int(kiss_config.get("baud_rate", 115200))
radio_cfg = board_config.get("radio") or {}
radio_config = {
"frequency": int(radio_cfg.get("frequency", 869618000)),
"bandwidth": int(radio_cfg.get("bandwidth", 62500)),
"spreading_factor": int(radio_cfg.get("spreading_factor", 8)),
"coding_rate": int(radio_cfg.get("coding_rate", 8)),
"tx_power": int(radio_cfg.get("tx_power", 14)),
}
radio = KissModemWrapper(
port=port,
baudrate=baudrate,
radio_config=radio_config,
auto_configure=True,
)
if hasattr(radio, "begin"):
try:
radio.begin()
except Exception as e:
raise RuntimeError(f"Failed to initialize KISS modem: {e}") from e
return radio
raise RuntimeError(
f"Unknown radio type: {radio_type}. Supported: sx1262, sx1262_ch341, kiss (or kiss-modem)"
)