mirror of
https://github.com/pyMC-dev/pyMC_Repeater.git
synced 2026-07-02 07:52:22 +02:00
setup wizard: pymc_tcp / pymc_usb hardware tiles
Lets a fresh repeater install pick the pymc_usb (USB-CDC) or pymc_tcp
(Wi-Fi/Ethernet) external modem from the first-run /setup wizard
instead of requiring the user to hand-edit config.yaml after install.
radio-settings.json gets two new hardware entries; setup_wizard()
in api_endpoints.py handles them in dedicated branches that mirror
the existing KISS pattern (placeholders if the SPA doesn't yet send
modem-specific inputs, request body overrides if it does).
For pymc_tcp the wizard writes a sentinel host placeholder
('REPLACE_WITH_MODEM_HOST') so the YAML stays valid; on startup
get_radio_for_board() then errors with a clear pointer at
pymc_tcp.host (existing behavior from the PR #240 branch). pymc_usb
defaults to /dev/ttyACM0 at 921600 baud — matches the USB-CDC
device path documented in pymc_usb's README + pymc_driver.
Five new tests in tests/test_setup_wizard_pymc.py verify both
default and overridden code paths plus a KISS regression guard.
This commit is contained in:
@@ -322,6 +322,20 @@
|
||||
"use_dio2_rf": false,
|
||||
"use_dio3_tcxo": true,
|
||||
"preamble_length": 17
|
||||
},
|
||||
"pymc_usb": {
|
||||
"name": "pymc_usb modem (USB-CDC)",
|
||||
"description": "ESP32-S3 / nRF52 board running pymc_usb firmware as a USB-CDC LoRa modem. Pick this if the modem is plugged into the host's USB port (e.g. /dev/ttyACM0). Edit the 'pymc_usb' section after first-run to point at the right serial device.",
|
||||
"radio_type": "pymc_usb",
|
||||
"tx_power": 22,
|
||||
"preamble_length": 16
|
||||
},
|
||||
"pymc_tcp": {
|
||||
"name": "pymc_tcp modem (Wi-Fi / Ethernet)",
|
||||
"description": "ESP32 board running pymc_usb firmware exposed as a TCP server over Wi-Fi or Ethernet. After first-run, set 'pymc_tcp.host' to the modem's LAN address or mDNS name (e.g. pymc-3e2834.local).",
|
||||
"radio_type": "pymc_tcp",
|
||||
"tx_power": 22,
|
||||
"preamble_length": 16
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -496,6 +496,49 @@ class APIEndpoints:
|
||||
config_yaml["radio"]["tx_power"] = int(radio_preset.get("tx_power", 14))
|
||||
if "preamble_length" not in config_yaml["radio"]:
|
||||
config_yaml["radio"]["preamble_length"] = 17
|
||||
elif hardware_key == "pymc_usb":
|
||||
# pymc_usb modem: external SX1262 board over USB-CDC.
|
||||
# Accept pymc_usb_port / pymc_usb_baudrate from the request body
|
||||
# (mirrors the KISS pattern) so a future SPA can expose inputs;
|
||||
# fall back to /dev/ttyACM0 at 921600 baud, which matches the
|
||||
# firmware default and the typical USB-CDC modem device on Linux.
|
||||
config_yaml["radio_type"] = "pymc_usb"
|
||||
usb_port = (data.get("pymc_usb_port") or "").strip() or "/dev/ttyACM0"
|
||||
usb_baud = int(data.get("pymc_usb_baudrate", data.get("pymc_usb_baud", 921600)))
|
||||
pymc_usb_section = config_yaml.setdefault("pymc_usb", {})
|
||||
pymc_usb_section["port"] = usb_port
|
||||
pymc_usb_section["baudrate"] = usb_baud
|
||||
pymc_usb_section.setdefault("lbt_enabled", True)
|
||||
pymc_usb_section.setdefault("lbt_max_attempts", 5)
|
||||
if "tx_power" in hw_config:
|
||||
config_yaml["radio"]["tx_power"] = hw_config.get("tx_power", 22)
|
||||
if "preamble_length" in hw_config:
|
||||
config_yaml["radio"]["preamble_length"] = hw_config.get("preamble_length", 16)
|
||||
elif hardware_key == "pymc_tcp":
|
||||
# pymc_tcp modem: external SX1262 board exposed as TCP over Wi-Fi/Ethernet.
|
||||
# 'host' has no sensible default — must be the modem's LAN address or
|
||||
# mDNS name. Accept it from the request body if the SPA provides it,
|
||||
# otherwise write a clearly-placeholder hostname so the file is valid
|
||||
# YAML and the user gets a startup error pointing them at the right
|
||||
# section to edit (see config.py: ValueError 'Missing host …').
|
||||
config_yaml["radio_type"] = "pymc_tcp"
|
||||
tcp_host = (data.get("pymc_tcp_host") or "").strip() or "REPLACE_WITH_MODEM_HOST"
|
||||
tcp_port = int(data.get("pymc_tcp_port", 5055))
|
||||
pymc_tcp_section = config_yaml.setdefault("pymc_tcp", {})
|
||||
pymc_tcp_section["host"] = tcp_host
|
||||
pymc_tcp_section["port"] = tcp_port
|
||||
tcp_token = data.get("pymc_tcp_token")
|
||||
if tcp_token is not None:
|
||||
pymc_tcp_section["token"] = str(tcp_token)
|
||||
else:
|
||||
pymc_tcp_section.setdefault("token", "")
|
||||
pymc_tcp_section.setdefault("connect_timeout", 5.0)
|
||||
pymc_tcp_section.setdefault("lbt_enabled", True)
|
||||
pymc_tcp_section.setdefault("lbt_max_attempts", 5)
|
||||
if "tx_power" in hw_config:
|
||||
config_yaml["radio"]["tx_power"] = hw_config.get("tx_power", 22)
|
||||
if "preamble_length" in hw_config:
|
||||
config_yaml["radio"]["preamble_length"] = hw_config.get("preamble_length", 16)
|
||||
else:
|
||||
# SX1262 / sx1262_ch341: radio_type and optional CH341 from hw_config
|
||||
if "radio_type" in hw_config:
|
||||
@@ -591,6 +634,15 @@ class APIEndpoints:
|
||||
if hardware_key == "kiss":
|
||||
result_config["kiss_port"] = config_yaml.get("kiss", {}).get("port")
|
||||
result_config["kiss_baud_rate"] = config_yaml.get("kiss", {}).get("baud_rate")
|
||||
elif hardware_key == "pymc_usb":
|
||||
pymc_usb_cfg = config_yaml.get("pymc_usb", {})
|
||||
result_config["pymc_usb_port"] = pymc_usb_cfg.get("port")
|
||||
result_config["pymc_usb_baudrate"] = pymc_usb_cfg.get("baudrate")
|
||||
elif hardware_key == "pymc_tcp":
|
||||
pymc_tcp_cfg = config_yaml.get("pymc_tcp", {})
|
||||
result_config["pymc_tcp_host"] = pymc_tcp_cfg.get("host")
|
||||
result_config["pymc_tcp_port"] = pymc_tcp_cfg.get("port")
|
||||
# token deliberately omitted from response (sensitive)
|
||||
return {
|
||||
"success": True,
|
||||
"message": "Setup completed successfully. Service is restarting...",
|
||||
|
||||
@@ -0,0 +1,208 @@
|
||||
"""Tests for setup_wizard pymc_usb / pymc_tcp branches.
|
||||
|
||||
These verify that when the first-run /setup wizard is finished with one of
|
||||
the two pymc_* hardware tiles selected, api_endpoints.setup_wizard() writes
|
||||
a config.yaml that matches what get_radio_for_board() expects (see
|
||||
repeater/config.py and tests/test_radio_config.py).
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import types
|
||||
|
||||
import cherrypy
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
from repeater.web.api_endpoints import APIEndpoints
|
||||
|
||||
|
||||
# Minimal initial config.yaml the wizard writes into.
|
||||
_BASE_CONFIG = {
|
||||
"repeater": {"node_name": "mesh-repeater-01", "security": {"admin_password": "admin123"}},
|
||||
"radio": {},
|
||||
}
|
||||
|
||||
_BASE_REQUEST = {
|
||||
"node_name": "pymc-test",
|
||||
"admin_password": "supersecret",
|
||||
"radio_preset": {
|
||||
"frequency": 869.618,
|
||||
"spreading_factor": 8,
|
||||
"bandwidth": 62.5,
|
||||
"coding_rate": 8,
|
||||
"tx_power": 22,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def wizard_env(tmp_path, monkeypatch):
|
||||
"""Bootstrap a tempdir with config.yaml + radio-settings.json + mocked cherrypy."""
|
||||
config_path = tmp_path / "config.yaml"
|
||||
with open(config_path, "w") as f:
|
||||
yaml.safe_dump(_BASE_CONFIG, f)
|
||||
|
||||
radio_settings = {
|
||||
"hardware": {
|
||||
"pymc_usb": {
|
||||
"name": "pymc_usb modem (USB-CDC)",
|
||||
"radio_type": "pymc_usb",
|
||||
"tx_power": 22,
|
||||
"preamble_length": 16,
|
||||
},
|
||||
"pymc_tcp": {
|
||||
"name": "pymc_tcp modem (Wi-Fi / Ethernet)",
|
||||
"radio_type": "pymc_tcp",
|
||||
"tx_power": 22,
|
||||
"preamble_length": 16,
|
||||
},
|
||||
}
|
||||
}
|
||||
with open(tmp_path / "radio-settings.json", "w") as f:
|
||||
json.dump(radio_settings, f)
|
||||
|
||||
# resolve_storage_dir() returns the directory of config_path when the
|
||||
# config has no explicit storage_dir set — that's exactly what we want
|
||||
# so the wizard finds our radio-settings.json next to config.yaml.
|
||||
config = {
|
||||
"repeater": {
|
||||
"storage_dir": str(tmp_path),
|
||||
"node_name": "mesh-repeater-01",
|
||||
"security": {"admin_password": "admin123"},
|
||||
}
|
||||
}
|
||||
endpoints = APIEndpoints(config=config, config_path=str(config_path))
|
||||
|
||||
# Stub the post-wizard service restart — we don't want a real systemctl call.
|
||||
fake_service_utils = types.ModuleType("repeater.service_utils")
|
||||
fake_service_utils.restart_service = lambda: None
|
||||
monkeypatch.setitem(sys.modules, "repeater.service_utils", fake_service_utils)
|
||||
|
||||
def _set_request(body):
|
||||
# cherrypy.request is a thread-local — populate the bits the handler reads.
|
||||
cherrypy.request.method = "POST"
|
||||
cherrypy.request.json = body
|
||||
|
||||
return tmp_path, config_path, endpoints, _set_request
|
||||
|
||||
|
||||
def _read_yaml(path):
|
||||
with open(path) as f:
|
||||
return yaml.safe_load(f)
|
||||
|
||||
|
||||
# ─── pymc_usb ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_wizard_pymc_usb_defaults(wizard_env):
|
||||
tmp_path, config_path, endpoints, set_request = wizard_env
|
||||
|
||||
body = dict(_BASE_REQUEST, hardware_key="pymc_usb")
|
||||
set_request(body)
|
||||
|
||||
result = endpoints.setup_wizard()
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["config"]["radio_type"] == "pymc_usb"
|
||||
assert result["config"]["pymc_usb_port"] == "/dev/ttyACM0"
|
||||
assert result["config"]["pymc_usb_baudrate"] == 921600
|
||||
|
||||
written = _read_yaml(config_path)
|
||||
assert written["radio_type"] == "pymc_usb"
|
||||
assert written["pymc_usb"]["port"] == "/dev/ttyACM0"
|
||||
assert written["pymc_usb"]["baudrate"] == 921600
|
||||
assert written["pymc_usb"]["lbt_enabled"] is True
|
||||
assert written["pymc_usb"]["lbt_max_attempts"] == 5
|
||||
assert written["radio"]["tx_power"] == 22
|
||||
assert written["radio"]["preamble_length"] == 16
|
||||
# config.py rejects pymc_usb if 'sx1262' / 'ch341' keys leak in — none here.
|
||||
assert "sx1262" not in written
|
||||
|
||||
|
||||
def test_wizard_pymc_usb_overrides_from_request(wizard_env):
|
||||
tmp_path, config_path, endpoints, set_request = wizard_env
|
||||
|
||||
body = dict(
|
||||
_BASE_REQUEST,
|
||||
hardware_key="pymc_usb",
|
||||
pymc_usb_port="/dev/ttyUSB0",
|
||||
pymc_usb_baudrate=115200,
|
||||
)
|
||||
set_request(body)
|
||||
|
||||
result = endpoints.setup_wizard()
|
||||
|
||||
assert result["success"] is True
|
||||
written = _read_yaml(config_path)
|
||||
assert written["pymc_usb"]["port"] == "/dev/ttyUSB0"
|
||||
assert written["pymc_usb"]["baudrate"] == 115200
|
||||
|
||||
|
||||
# ─── pymc_tcp ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_wizard_pymc_tcp_placeholder(wizard_env):
|
||||
"""No host in request → wizard writes a sentinel placeholder. config.py
|
||||
will then refuse to start with a clear error pointing at pymc_tcp.host."""
|
||||
tmp_path, config_path, endpoints, set_request = wizard_env
|
||||
|
||||
body = dict(_BASE_REQUEST, hardware_key="pymc_tcp")
|
||||
set_request(body)
|
||||
|
||||
result = endpoints.setup_wizard()
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["config"]["radio_type"] == "pymc_tcp"
|
||||
assert result["config"]["pymc_tcp_host"] == "REPLACE_WITH_MODEM_HOST"
|
||||
assert result["config"]["pymc_tcp_port"] == 5055
|
||||
|
||||
written = _read_yaml(config_path)
|
||||
assert written["radio_type"] == "pymc_tcp"
|
||||
assert written["pymc_tcp"]["host"] == "REPLACE_WITH_MODEM_HOST"
|
||||
assert written["pymc_tcp"]["port"] == 5055
|
||||
assert written["pymc_tcp"]["token"] == ""
|
||||
assert written["pymc_tcp"]["connect_timeout"] == 5.0
|
||||
assert written["pymc_tcp"]["lbt_enabled"] is True
|
||||
# token deliberately stripped from response.
|
||||
assert "pymc_tcp_token" not in result["config"]
|
||||
|
||||
|
||||
def test_wizard_pymc_tcp_full_fields(wizard_env):
|
||||
tmp_path, config_path, endpoints, set_request = wizard_env
|
||||
|
||||
body = dict(
|
||||
_BASE_REQUEST,
|
||||
hardware_key="pymc_tcp",
|
||||
pymc_tcp_host="pymc-3e2834.local",
|
||||
pymc_tcp_port=6000,
|
||||
pymc_tcp_token="hunter2",
|
||||
)
|
||||
set_request(body)
|
||||
|
||||
result = endpoints.setup_wizard()
|
||||
|
||||
assert result["success"] is True
|
||||
written = _read_yaml(config_path)
|
||||
assert written["pymc_tcp"]["host"] == "pymc-3e2834.local"
|
||||
assert written["pymc_tcp"]["port"] == 6000
|
||||
assert written["pymc_tcp"]["token"] == "hunter2"
|
||||
|
||||
|
||||
# ─── KISS regression guard ────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_wizard_kiss_branch_unchanged(wizard_env, tmp_path):
|
||||
"""Make sure adding the pymc_* branches didn't break the existing KISS path."""
|
||||
tmp_path, config_path, endpoints, set_request = wizard_env
|
||||
|
||||
body = dict(_BASE_REQUEST, hardware_key="kiss")
|
||||
set_request(body)
|
||||
|
||||
result = endpoints.setup_wizard()
|
||||
|
||||
assert result["success"] is True
|
||||
written = _read_yaml(config_path)
|
||||
assert written["radio_type"] == "kiss"
|
||||
assert written["kiss"]["port"] == "/dev/ttyUSB0"
|
||||
assert written["kiss"]["baud_rate"] == 115200
|
||||
Reference in New Issue
Block a user