forked from iarv/pyMC_Repeater
249 lines
8.6 KiB
Python
249 lines
8.6 KiB
Python
import base64
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional
|
|
|
|
import yaml
|
|
|
|
logger = logging.getLogger("Config")
|
|
|
|
|
|
def get_node_info(config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Extract node name, radio configuration, and LetsMesh settings from config.
|
|
|
|
Args:
|
|
config: Configuration dictionary
|
|
|
|
Returns:
|
|
Dictionary with node_name, radio_config, and LetsMesh configuration
|
|
"""
|
|
node_name = config.get("repeater", {}).get("node_name", "PyMC-Repeater")
|
|
radio_config = config.get("radio", {})
|
|
radio_freq = radio_config.get("frequency", 0.0)
|
|
radio_bw = radio_config.get("bandwidth", 0.0)
|
|
radio_sf = radio_config.get("spreading_factor", 7)
|
|
radio_cr = radio_config.get("coding_rate", 5)
|
|
# Format frequency in MHz and bandwidth in kHz
|
|
radio_freq_mhz = radio_freq / 1_000_000
|
|
radio_bw_khz = radio_bw / 1_000
|
|
radio_config_str = f"{radio_freq_mhz},{radio_bw_khz},{radio_sf},{radio_cr}"
|
|
|
|
letsmesh_config = config.get("letsmesh", {})
|
|
|
|
from pymc_core.protocol.utils import PAYLOAD_TYPES
|
|
|
|
disallowed_types = letsmesh_config.get("disallowed_packet_types", [])
|
|
type_name_map = {name: code for code, name in PAYLOAD_TYPES.items()}
|
|
|
|
disallowed_hex = [type_name_map.get(name.upper(), None) for name in disallowed_types]
|
|
disallowed_hex = [val for val in disallowed_hex if val is not None] # Filter out invalid names
|
|
|
|
return {
|
|
"node_name": node_name,
|
|
"radio_config": radio_config_str,
|
|
"iata_code": letsmesh_config.get("iata_code", "TEST"),
|
|
"broker_index": letsmesh_config.get("broker_index", 0),
|
|
"status_interval": letsmesh_config.get("status_interval", 60),
|
|
"model": letsmesh_config.get("model", "PyMC-Repeater"),
|
|
"disallowed_packet_types": disallowed_hex,
|
|
"email": letsmesh_config.get("email", ""),
|
|
"owner": letsmesh_config.get("owner", "")
|
|
}
|
|
|
|
|
|
def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
|
|
if config_path is None:
|
|
config_path = os.getenv("PYMC_REPEATER_CONFIG", "/etc/pymc_repeater/config.yaml")
|
|
|
|
# Check if config file exists
|
|
if not Path(config_path).exists():
|
|
raise FileNotFoundError(
|
|
f"Configuration file not found: {config_path}\n"
|
|
f"Please create a config file. Example: \n"
|
|
f" sudo cp {Path(config_path).parent}/config.yaml.example {config_path}\n"
|
|
f" sudo nano {config_path}"
|
|
)
|
|
|
|
# Load from file - no defaults, all settings must be in config file
|
|
try:
|
|
with open(config_path) as f:
|
|
config = yaml.safe_load(f) or {}
|
|
logger.info(f"Loaded config from {config_path}")
|
|
except Exception as e:
|
|
raise RuntimeError(f"Failed to load configuration from {config_path}: {e}") from e
|
|
|
|
if "mesh" not in config:
|
|
config["mesh"] = {}
|
|
|
|
# Only auto-generate identity_key if not provided
|
|
if "identity_key" not in config["mesh"]:
|
|
config["mesh"]["identity_key"] = _load_or_create_identity_key()
|
|
|
|
if os.getenv("PYMC_REPEATER_LOG_LEVEL"):
|
|
if "logging" not in config:
|
|
config["logging"] = {}
|
|
config["logging"]["level"] = os.getenv("PYMC_REPEATER_LOG_LEVEL")
|
|
|
|
return config
|
|
|
|
|
|
def save_config(config_data: Dict[str, Any], config_path: Optional[str] = None) -> bool:
|
|
"""
|
|
Save configuration to YAML file.
|
|
|
|
Args:
|
|
config_data: Configuration dictionary to save
|
|
config_path: Path to config file (uses default if None)
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
if config_path is None:
|
|
config_path = os.getenv("PYMC_REPEATER_CONFIG", "/etc/pymc_repeater/config.yaml")
|
|
|
|
try:
|
|
# Create backup of existing config
|
|
config_file = Path(config_path)
|
|
if config_file.exists():
|
|
backup_path = config_file.with_suffix('.yaml.backup')
|
|
config_file.rename(backup_path)
|
|
logger.info(f"Created backup at {backup_path}")
|
|
|
|
# Save new config
|
|
with open(config_path, 'w') as f:
|
|
yaml.safe_dump(config_data, f, default_flow_style=False, sort_keys=False)
|
|
|
|
logger.info(f"Saved configuration to {config_path}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to save configuration: {e}")
|
|
return False
|
|
|
|
|
|
def update_global_flood_policy(allow: bool, config_path: Optional[str] = None) -> bool:
|
|
"""
|
|
Update the global flood policy in the configuration.
|
|
|
|
Args:
|
|
allow: True to allow flooding globally, False to deny
|
|
config_path: Path to config file (uses default if None)
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
try:
|
|
# Load current config
|
|
config = load_config(config_path)
|
|
|
|
# Ensure mesh section exists
|
|
if "mesh" not in config:
|
|
config["mesh"] = {}
|
|
|
|
# Set global flood policy
|
|
config["mesh"]["global_flood_allow"] = allow
|
|
|
|
# Save updated config
|
|
return save_config(config, config_path)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to update global flood policy: {e}")
|
|
return False
|
|
|
|
|
|
def _load_or_create_identity_key(path: Optional[str] = None) -> bytes:
|
|
|
|
if path is None:
|
|
# Follow XDG spec
|
|
xdg_config_home = os.environ.get("XDG_CONFIG_HOME")
|
|
if xdg_config_home:
|
|
config_dir = Path(xdg_config_home) / "pymc_repeater"
|
|
else:
|
|
config_dir = Path.home() / ".config" / "pymc_repeater"
|
|
key_path = config_dir / "identity.key"
|
|
else:
|
|
key_path = Path(path)
|
|
|
|
key_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
if key_path.exists():
|
|
try:
|
|
with open(key_path, "rb") as f:
|
|
encoded = f.read()
|
|
key = base64.b64decode(encoded)
|
|
if len(key) != 32:
|
|
raise ValueError(f"Invalid key length: {len(key)}, expected 32")
|
|
logger.info(f"Loaded existing identity key from {key_path}")
|
|
return key
|
|
except Exception as e:
|
|
logger.warning(f"Failed to load identity key: {e}")
|
|
|
|
# Generate new random key
|
|
key = os.urandom(32)
|
|
|
|
# Save it
|
|
try:
|
|
with open(key_path, "wb") as f:
|
|
f.write(base64.b64encode(key))
|
|
os.chmod(key_path, 0o600) # Restrict permissions
|
|
logger.info(f"Generated and stored new identity key at {key_path}")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to save identity key: {e}")
|
|
|
|
return key
|
|
|
|
|
|
def get_radio_for_board(board_config: dict):
|
|
|
|
radio_type = board_config.get("radio_type", "sx1262").lower()
|
|
|
|
if radio_type == "sx1262":
|
|
from pymc_core.hardware.sx1262_wrapper import SX1262Radio
|
|
|
|
# Get radio and SPI configuration - all settings must be in config file
|
|
spi_config = board_config.get("sx1262")
|
|
if not spi_config:
|
|
raise ValueError("Missing 'sx1262' section in configuration file")
|
|
|
|
radio_config = board_config.get("radio")
|
|
if not radio_config:
|
|
raise ValueError("Missing 'radio' section in configuration file")
|
|
|
|
# Build config with required fields - no defaults
|
|
combined_config = {
|
|
"bus_id": spi_config["bus_id"],
|
|
"cs_id": spi_config["cs_id"],
|
|
"cs_pin": spi_config["cs_pin"],
|
|
"reset_pin": spi_config["reset_pin"],
|
|
"busy_pin": spi_config["busy_pin"],
|
|
"irq_pin": spi_config["irq_pin"],
|
|
"txen_pin": spi_config["txen_pin"],
|
|
"rxen_pin": spi_config["rxen_pin"],
|
|
"txled_pin": spi_config.get("txled_pin", -1),
|
|
"rxled_pin": spi_config.get("rxled_pin", -1),
|
|
"use_dio3_tcxo": spi_config.get("use_dio3_tcxo", False),
|
|
"is_waveshare": spi_config.get("is_waveshare", False),
|
|
"frequency": int(radio_config["frequency"]),
|
|
"tx_power": radio_config["tx_power"],
|
|
"spreading_factor": radio_config["spreading_factor"],
|
|
"bandwidth": int(radio_config["bandwidth"]),
|
|
"coding_rate": radio_config["coding_rate"],
|
|
"preamble_length": radio_config["preamble_length"],
|
|
"sync_word": radio_config["sync_word"],
|
|
}
|
|
|
|
radio = SX1262Radio.get_instance(**combined_config)
|
|
|
|
if hasattr(radio, "_initialized") and not radio._initialized:
|
|
try:
|
|
radio.begin()
|
|
except RuntimeError as e:
|
|
raise RuntimeError(f"Failed to initialize SX1262 radio: {e}") from e
|
|
|
|
return radio
|
|
|
|
else:
|
|
raise RuntimeError(f"Unknown radio type: {radio_type}. Supported: sx1262")
|