mirror of
https://github.com/l5yth/potato-mesh.git
synced 2026-05-07 13:54:57 +02:00
Support mock serial interface in CI (#143)
This commit is contained in:
@@ -130,6 +130,7 @@ jobs:
|
||||
docker run --rm --name ingestor-test \
|
||||
-e POTATOMESH_INSTANCE=http://localhost:41447 \
|
||||
-e API_TOKEN=test-token \
|
||||
-e MESH_SERIAL=mock \
|
||||
-e DEBUG=1 \
|
||||
${{ env.REGISTRY }}/${{ env.IMAGE_PREFIX }}-ingestor-linux-amd64:${{ steps.version.outputs.version }} &
|
||||
sleep 5
|
||||
|
||||
+38
-1
@@ -42,6 +42,43 @@ INSTANCE = os.environ.get("POTATOMESH_INSTANCE", "").rstrip("/")
|
||||
API_TOKEN = os.environ.get("API_TOKEN", "")
|
||||
|
||||
|
||||
# --- Serial interface helpers --------------------------------------------------
|
||||
|
||||
|
||||
class _DummySerialInterface:
|
||||
"""In-memory replacement for ``meshtastic.serial_interface.SerialInterface``.
|
||||
|
||||
The GitHub Actions release tests run the ingestor container without access
|
||||
to a serial device. When ``MESH_SERIAL`` is set to ``"mock"`` (or similar)
|
||||
we provide this stub interface so the daemon can start and exercise its
|
||||
background loop without failing due to missing hardware.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.nodes = {}
|
||||
|
||||
def close(self):
|
||||
"""Mirror the real interface API."""
|
||||
pass
|
||||
|
||||
|
||||
def _create_serial_interface(port: str):
|
||||
"""Return an appropriate serial interface for ``port``.
|
||||
|
||||
Passing ``mock`` (case-insensitive) or an empty value skips hardware access
|
||||
and returns :class:`_DummySerialInterface`. This makes it possible to run
|
||||
the container in CI environments that do not expose serial devices while
|
||||
keeping production behaviour unchanged.
|
||||
"""
|
||||
|
||||
port_value = (port or "").strip()
|
||||
if port_value.lower() in {"", "mock", "none", "null", "disabled"}:
|
||||
if DEBUG:
|
||||
print(f"[debug] using dummy serial interface for port={port_value!r}")
|
||||
return _DummySerialInterface()
|
||||
return SerialInterface(devPath=port_value)
|
||||
|
||||
|
||||
# --- POST queue ----------------------------------------------------------------
|
||||
_POST_QUEUE_LOCK = threading.Lock()
|
||||
_POST_QUEUE = []
|
||||
@@ -422,7 +459,7 @@ def main():
|
||||
# Subscribe to PubSub topics (reliable in current meshtastic)
|
||||
pub.subscribe(on_receive, "meshtastic.receive")
|
||||
|
||||
iface = SerialInterface(devPath=PORT)
|
||||
iface = _create_serial_interface(PORT)
|
||||
|
||||
stop = threading.Event()
|
||||
|
||||
|
||||
@@ -102,6 +102,33 @@ def test_snapshot_interval_defaults_to_60_seconds(mesh_module):
|
||||
assert mesh.SNAPSHOT_SECS == 60
|
||||
|
||||
|
||||
@pytest.mark.parametrize("value", ["mock", "Mock", " disabled "])
|
||||
def test_create_serial_interface_allows_mock(mesh_module, value):
|
||||
mesh = mesh_module
|
||||
|
||||
iface = mesh._create_serial_interface(value)
|
||||
|
||||
assert isinstance(iface.nodes, dict)
|
||||
iface.close()
|
||||
|
||||
|
||||
def test_create_serial_interface_uses_serial_module(mesh_module, monkeypatch):
|
||||
mesh = mesh_module
|
||||
created = {}
|
||||
sentinel = object()
|
||||
|
||||
def fake_interface(*, devPath):
|
||||
created["devPath"] = devPath
|
||||
return SimpleNamespace(nodes={"!foo": sentinel}, close=lambda: None)
|
||||
|
||||
monkeypatch.setattr(mesh, "SerialInterface", fake_interface)
|
||||
|
||||
iface = mesh._create_serial_interface("/dev/ttyTEST")
|
||||
|
||||
assert created["devPath"] == "/dev/ttyTEST"
|
||||
assert iface.nodes == {"!foo": sentinel}
|
||||
|
||||
|
||||
def test_node_to_dict_handles_nested_structures(mesh_module):
|
||||
mesh = mesh_module
|
||||
|
||||
|
||||
Reference in New Issue
Block a user