Merge pull request #240 from itk80/feat/pymc-tcp-usb-radio

config: add pymc_tcp / pymc_usb radio_type branches
This commit is contained in:
Lloyd
2026-05-13 16:39:08 +01:00
committed by GitHub
3 changed files with 246 additions and 2 deletions
+22
View File
@@ -314,6 +314,9 @@ identities:
# Supported:
# - sx1262 (Linux spidev + system GPIO)
# - sx1262_ch341 (CH341 USB-to-SPI + CH341 GPIO 0-7)
# - kiss (KISS-modem over a serial port; alias: kiss-modem)
# - pymc_tcp (pymc_usb firmware modem over Wi-Fi/TCP)
# - pymc_usb (pymc_usb firmware modem over USB-CDC)
radio_type: sx1262
# CH341 USB-to-SPI adapter settings (only used when radio_type: sx1262_ch341)
@@ -352,6 +355,25 @@ radio:
# port: "/dev/ttyUSB0"
# baud_rate: 9600
# pymc_usb firmware modem over Wi-Fi/TCP (when radio_type: pymc_tcp).
# Requires pyMC_core with the TCPLoRaRadio driver (PR pyMC-dev/pyMC_core#68,
# merged into dev on 2026-05-13).
# pymc_tcp:
# host: "pymc-3e2834.local" # modem hostname / mDNS name / LAN IP
# port: 5055 # firmware default
# token: "" # empty = no auth; set to match firmware NVS token
# connect_timeout: 5.0 # seconds; deferred-connect retries with backoff on failure
# lbt_enabled: true # Listen-Before-Talk via firmware CAD
# lbt_max_attempts: 5
# pymc_usb firmware modem over USB-CDC (when radio_type: pymc_usb).
# Requires pyMC_core with the USBLoRaRadio driver (PR pyMC-dev/pyMC_core#68).
# pymc_usb:
# port: "/dev/ttyACM0" # USB-CDC device; udev rule may symlink to /dev/lora-modem
# baudrate: 921600 # must match firmware monitor_speed
# lbt_enabled: true
# lbt_max_attempts: 5
# SX1262 Hardware Configuration
# NOTE:
# - When radio_type: sx1262, these pins are BCM GPIO numbers.
+92 -1
View File
@@ -450,6 +450,97 @@ def get_radio_for_board(board_config: dict):
return radio
elif radio_type == "pymc_tcp":
try:
from pymc_core.hardware.tcp_radio import TCPLoRaRadio
except ImportError:
raise RuntimeError(
"pymc_tcp radio requires pyMC_core >= the release that includes "
"PR pyMC-dev/pyMC_core#68 (merged 2026-05-13). "
"Reinstall the [hardware] extra to pick it up."
) from None
tcp_cfg = board_config.get("pymc_tcp")
if not tcp_cfg:
raise ValueError(
"Missing 'pymc_tcp' section in configuration file for radio_type: pymc_tcp"
)
host = tcp_cfg.get("host")
if not host:
raise ValueError(
"Missing 'host' in 'pymc_tcp' section (modem hostname or LAN IP)"
)
radio_cfg = board_config.get("radio") or {}
radio = TCPLoRaRadio(
host=host,
port=int(tcp_cfg.get("port", 5055)),
token=tcp_cfg.get("token", ""),
connect_timeout=float(tcp_cfg.get("connect_timeout", 5.0)),
frequency=int(radio_cfg.get("frequency", 869618000)),
bandwidth=int(radio_cfg.get("bandwidth", 62500)),
spreading_factor=int(radio_cfg.get("spreading_factor", 8)),
coding_rate=int(radio_cfg.get("coding_rate", 8)),
tx_power=int(radio_cfg.get("tx_power", 22)),
sync_word=_parse_int(radio_cfg.get("sync_word", 0x12)),
preamble_length=int(radio_cfg.get("preamble_length", 16)),
lbt_enabled=bool(tcp_cfg.get("lbt_enabled", True)),
lbt_max_attempts=int(tcp_cfg.get("lbt_max_attempts", 5)),
)
try:
radio.begin()
except Exception as e:
raise RuntimeError(f"Failed to initialize pymc_tcp radio: {e}") from e
return radio
elif radio_type == "pymc_usb":
try:
from pymc_core.hardware.usb_radio import USBLoRaRadio
except ImportError:
raise RuntimeError(
"pymc_usb radio requires pyMC_core >= the release that includes "
"PR pyMC-dev/pyMC_core#68 (merged 2026-05-13). "
"Reinstall the [hardware] extra to pick it up."
) from None
usb_cfg = board_config.get("pymc_usb")
if not usb_cfg:
raise ValueError(
"Missing 'pymc_usb' section in configuration file for radio_type: pymc_usb"
)
port = usb_cfg.get("port")
if not port:
raise ValueError(
"Missing 'port' in 'pymc_usb' section (e.g. /dev/ttyACM0)"
)
radio_cfg = board_config.get("radio") or {}
radio = USBLoRaRadio(
port=port,
baudrate=int(usb_cfg.get("baudrate", 921600)),
frequency=int(radio_cfg.get("frequency", 869618000)),
bandwidth=int(radio_cfg.get("bandwidth", 62500)),
spreading_factor=int(radio_cfg.get("spreading_factor", 8)),
coding_rate=int(radio_cfg.get("coding_rate", 8)),
tx_power=int(radio_cfg.get("tx_power", 22)),
sync_word=_parse_int(radio_cfg.get("sync_word", 0x12)),
preamble_length=int(radio_cfg.get("preamble_length", 16)),
lbt_enabled=bool(usb_cfg.get("lbt_enabled", True)),
lbt_max_attempts=int(usb_cfg.get("lbt_max_attempts", 5)),
)
try:
radio.begin()
except Exception as e:
raise RuntimeError(f"Failed to initialize pymc_usb radio: {e}") from e
return radio
raise RuntimeError(
f"Unknown radio type: {radio_type}. Supported: sx1262, sx1262_ch341, kiss (or kiss-modem)"
f"Unknown radio type: {radio_type}. "
"Supported: sx1262, sx1262_ch341, kiss (or kiss-modem), pymc_tcp, pymc_usb"
)
+132 -1
View File
@@ -1,9 +1,14 @@
import pytest
from repeater.config import get_radio_for_board
class _DummyRadio:
_initialized = True
def begin(self):
return True
def test_get_radio_for_board_passes_en_pins(monkeypatch):
captured_kwargs = {}
@@ -46,4 +51,130 @@ def test_get_radio_for_board_passes_en_pins(monkeypatch):
get_radio_for_board(board_config)
assert captured_kwargs["en_pins"] == [26, 23]
assert "en_pin" not in captured_kwargs
assert "en_pin" not in captured_kwargs
# ─── pymc_tcp / pymc_usb branches ────────────────────────────────────
def _pymc_radio_cfg():
"""Common radio params for the pymc_* tests."""
return {
"frequency": 869618000,
"tx_power": 22,
"spreading_factor": 8,
"bandwidth": 62500,
"coding_rate": 8,
"preamble_length": 16,
"sync_word": 0x12,
}
def test_get_radio_for_board_pymc_tcp(monkeypatch):
pytest.importorskip("pymc_core.hardware.tcp_radio")
captured = {}
class _DummyTCPLoRaRadio(_DummyRadio):
def __init__(self, **kwargs):
captured.update(kwargs)
monkeypatch.setattr(
"pymc_core.hardware.tcp_radio.TCPLoRaRadio",
_DummyTCPLoRaRadio,
)
board_config = {
"radio_type": "pymc_tcp",
"pymc_tcp": {
"host": "pymc-3e2834.local",
"port": 5055,
"token": "shared-secret",
"connect_timeout": 7.5,
"lbt_enabled": False,
"lbt_max_attempts": 3,
},
"radio": _pymc_radio_cfg(),
}
get_radio_for_board(board_config)
assert captured["host"] == "pymc-3e2834.local"
assert captured["port"] == 5055
assert captured["token"] == "shared-secret"
assert captured["connect_timeout"] == 7.5
assert captured["frequency"] == 869618000
assert captured["sync_word"] == 0x12
assert captured["lbt_enabled"] is False
assert captured["lbt_max_attempts"] == 3
def test_get_radio_for_board_pymc_tcp_requires_host(monkeypatch):
pytest.importorskip("pymc_core.hardware.tcp_radio")
monkeypatch.setattr(
"pymc_core.hardware.tcp_radio.TCPLoRaRadio",
lambda **kwargs: _DummyRadio(),
)
board_config = {
"radio_type": "pymc_tcp",
"pymc_tcp": {"port": 5055},
"radio": _pymc_radio_cfg(),
}
with pytest.raises(ValueError, match="Missing 'host'"):
get_radio_for_board(board_config)
def test_get_radio_for_board_pymc_usb(monkeypatch):
pytest.importorskip("pymc_core.hardware.usb_radio")
captured = {}
class _DummyUSBLoRaRadio(_DummyRadio):
def __init__(self, **kwargs):
captured.update(kwargs)
monkeypatch.setattr(
"pymc_core.hardware.usb_radio.USBLoRaRadio",
_DummyUSBLoRaRadio,
)
board_config = {
"radio_type": "pymc_usb",
"pymc_usb": {
"port": "/dev/ttyACM0",
"baudrate": 921600,
},
"radio": _pymc_radio_cfg(),
}
get_radio_for_board(board_config)
assert captured["port"] == "/dev/ttyACM0"
assert captured["baudrate"] == 921600
assert captured["frequency"] == 869618000
assert captured["sync_word"] == 0x12
# LBT defaults preserved when omitted from pymc_usb section.
assert captured["lbt_enabled"] is True
assert captured["lbt_max_attempts"] == 5
def test_get_radio_for_board_pymc_usb_requires_port(monkeypatch):
pytest.importorskip("pymc_core.hardware.usb_radio")
monkeypatch.setattr(
"pymc_core.hardware.usb_radio.USBLoRaRadio",
lambda **kwargs: _DummyRadio(),
)
board_config = {
"radio_type": "pymc_usb",
# Section present (baudrate set) but `port` deliberately omitted to
# exercise the inner "Missing 'port'" guard rather than the outer
# "Missing 'pymc_usb' section" one.
"pymc_usb": {"baudrate": 921600},
"radio": _pymc_radio_cfg(),
}
with pytest.raises(ValueError, match="Missing 'port'"):
get_radio_for_board(board_config)