From 4c1d5fb8ec79838b980f06e210eef85e8c395952 Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Mon, 2 Mar 2026 23:10:25 -0800 Subject: [PATCH] Add status/LWT to community MQTT ingest --- app/community_mqtt.py | 43 +++++++++++ app/mqtt_base.py | 12 +++- tests/test_community_mqtt.py | 135 +++++++++++++++++++++++++++++++++++ tests/test_mqtt.py | 26 +++++++ 4 files changed, 214 insertions(+), 2 deletions(-) diff --git a/app/community_mqtt.py b/app/community_mqtt.py index 8da1165..f6acfe3 100644 --- a/app/community_mqtt.py +++ b/app/community_mqtt.py @@ -20,6 +20,7 @@ import time from datetime import datetime from typing import Any +import aiomqtt import nacl.bindings from app.models import AppSettings @@ -252,6 +253,12 @@ def _format_raw_packet(data: dict[str, Any], device_name: str, public_key_hex: s return packet +def _build_status_topic(settings: AppSettings, pubkey_hex: str) -> str: + """Build the ``meshcore/{IATA}/{PUBKEY}/status`` topic string.""" + iata = settings.community_mqtt_iata.upper().strip() + return f"meshcore/{iata}/{pubkey_hex}/status" + + class CommunityMqttPublisher(BaseMqttPublisher): """Manages the community MQTT connection and publishes raw packets.""" @@ -308,6 +315,15 @@ class CommunityMqttPublisher(BaseMqttPublisher): tls_context = ssl.create_default_context() + status_topic = _build_status_topic(settings, pubkey_hex) + offline_payload = json.dumps( + { + "status": "offline", + "origin_id": pubkey_hex, + "client": _CLIENT_ID, + } + ) + return { "hostname": broker_host, "port": broker_port, @@ -316,6 +332,7 @@ class CommunityMqttPublisher(BaseMqttPublisher): "websocket_path": "/", "username": f"v1_{pubkey_hex}", "password": jwt_token, + "will": aiomqtt.Will(status_topic, offline_payload, retain=True), } def _on_connected(self, settings: AppSettings) -> tuple[str, str]: @@ -323,6 +340,32 @@ class CommunityMqttPublisher(BaseMqttPublisher): broker_port = settings.community_mqtt_broker_port or _DEFAULT_PORT return ("Community MQTT connected", f"{broker_host}:{broker_port}") + async def _on_connected_async(self, settings: AppSettings) -> None: + """Publish a retained online status message after connecting.""" + from app.keystore import get_public_key + from app.radio import radio_manager + + public_key = get_public_key() + if public_key is None: + return + + pubkey_hex = public_key.hex().upper() + + device_name = "" + if radio_manager.meshcore and radio_manager.meshcore.self_info: + device_name = radio_manager.meshcore.self_info.get("name", "") + + status_topic = _build_status_topic(settings, pubkey_hex) + payload = { + "status": "online", + "timestamp": datetime.now().isoformat(), + "origin": device_name or "MeshCore Device", + "origin_id": pubkey_hex, + "client": _CLIENT_ID, + } + + await self.publish(status_topic, payload, retain=True) + def _on_error(self) -> tuple[str, str]: return ( "Community MQTT connection failure", diff --git a/app/mqtt_base.py b/app/mqtt_base.py index 9b065da..57e0102 100644 --- a/app/mqtt_base.py +++ b/app/mqtt_base.py @@ -79,12 +79,12 @@ class BaseMqttPublisher(ABC): await self.stop() await self.start(settings) - async def publish(self, topic: str, payload: dict[str, Any]) -> None: + async def publish(self, topic: str, payload: dict[str, Any], *, retain: bool = False) -> None: """Publish a JSON payload. Drops silently if not connected.""" if self._client is None or not self.connected: return try: - await self._client.publish(topic, json.dumps(payload)) + await self._client.publish(topic, json.dumps(payload), retain=retain) except Exception as e: logger.warning("%s publish failed on %s: %s", self._log_prefix, topic, e) self.connected = False @@ -124,6 +124,13 @@ class BaseMqttPublisher(ABC): """Called each time the loop finds the publisher not configured.""" return # no-op by default; subclasses may override + async def _on_connected_async(self, settings: AppSettings) -> None: + """Async hook called after connection succeeds (before health broadcast). + + Subclasses can override to publish messages immediately after connecting. + """ + return # no-op by default + # ── Connection loop ──────────────────────────────────────────────── async def _connection_loop(self) -> None: @@ -170,6 +177,7 @@ class BaseMqttPublisher(ABC): title, detail = self._on_connected(settings) broadcast_success(title, detail) + await self._on_connected_async(settings) _broadcast_health() # Wait until cancelled or settings version changes. diff --git a/tests/test_community_mqtt.py b/tests/test_community_mqtt.py index 91024ec..669f9db 100644 --- a/tests/test_community_mqtt.py +++ b/tests/test_community_mqtt.py @@ -11,6 +11,7 @@ from app.community_mqtt import ( _DEFAULT_BROKER, CommunityMqttPublisher, _base64url_encode, + _build_status_topic, _calculate_packet_hash, _ed25519_sign_expanded, _format_raw_packet, @@ -432,3 +433,137 @@ class TestPublishFailureSetsDisconnected: pub._client = mock_client await pub.publish("topic", {"data": "test"}) assert pub.connected is False + + +class TestBuildStatusTopic: + def test_builds_correct_topic(self): + settings = AppSettings(community_mqtt_iata="LAX") + topic = _build_status_topic(settings, "AABB1122") + assert topic == "meshcore/LAX/AABB1122/status" + + def test_iata_uppercased_and_stripped(self): + settings = AppSettings(community_mqtt_iata=" lax ") + topic = _build_status_topic(settings, "PUBKEY") + assert topic == "meshcore/LAX/PUBKEY/status" + + +class TestLwtAndStatusPublish: + def test_build_client_kwargs_includes_will(self): + """_build_client_kwargs should return a will with offline status.""" + pub = CommunityMqttPublisher() + private_key, public_key = _make_test_keys() + pubkey_hex = public_key.hex().upper() + settings = AppSettings( + community_mqtt_enabled=True, + community_mqtt_iata="SFO", + ) + + with ( + patch("app.keystore.get_private_key", return_value=private_key), + patch("app.keystore.get_public_key", return_value=public_key), + ): + kwargs = pub._build_client_kwargs(settings) + + assert "will" in kwargs + will = kwargs["will"] + assert will.topic == f"meshcore/SFO/{pubkey_hex}/status" + assert will.retain is True + payload = json.loads(will.payload) + assert payload["status"] == "offline" + assert payload["origin_id"] == pubkey_hex + assert payload["client"] == _CLIENT_ID + + @pytest.mark.asyncio + async def test_on_connected_async_publishes_online_status(self): + """_on_connected_async should publish a retained online status.""" + from unittest.mock import AsyncMock + + pub = CommunityMqttPublisher() + private_key, public_key = _make_test_keys() + pubkey_hex = public_key.hex().upper() + settings = AppSettings( + community_mqtt_enabled=True, + community_mqtt_iata="LAX", + ) + + mock_radio = MagicMock() + mock_radio.meshcore = MagicMock() + mock_radio.meshcore.self_info = {"name": "TestNode"} + + with ( + patch("app.keystore.get_public_key", return_value=public_key), + patch("app.radio.radio_manager", mock_radio), + patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish, + ): + await pub._on_connected_async(settings) + + mock_publish.assert_called_once() + topic = mock_publish.call_args[0][0] + payload = mock_publish.call_args[0][1] + retain = mock_publish.call_args[1]["retain"] + + assert topic == f"meshcore/LAX/{pubkey_hex}/status" + assert retain is True + assert payload["status"] == "online" + assert payload["origin"] == "TestNode" + assert payload["origin_id"] == pubkey_hex + assert payload["client"] == _CLIENT_ID + assert "timestamp" in payload + + def test_lwt_and_online_share_same_topic(self): + """LWT and on-connect status should use the same topic path.""" + pub = CommunityMqttPublisher() + private_key, public_key = _make_test_keys() + pubkey_hex = public_key.hex().upper() + settings = AppSettings( + community_mqtt_enabled=True, + community_mqtt_iata="JFK", + ) + + with ( + patch("app.keystore.get_private_key", return_value=private_key), + patch("app.keystore.get_public_key", return_value=public_key), + ): + kwargs = pub._build_client_kwargs(settings) + + lwt_topic = kwargs["will"].topic + expected_topic = _build_status_topic(settings, pubkey_hex) + assert lwt_topic == expected_topic + + @pytest.mark.asyncio + async def test_on_connected_async_skips_when_no_public_key(self): + """_on_connected_async should no-op when public key is unavailable.""" + from unittest.mock import AsyncMock + + pub = CommunityMqttPublisher() + settings = AppSettings(community_mqtt_enabled=True, community_mqtt_iata="LAX") + + with ( + patch("app.keystore.get_public_key", return_value=None), + patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish, + ): + await pub._on_connected_async(settings) + + mock_publish.assert_not_called() + + @pytest.mark.asyncio + async def test_on_connected_async_uses_fallback_device_name(self): + """Should use 'MeshCore Device' when radio name is unavailable.""" + from unittest.mock import AsyncMock + + pub = CommunityMqttPublisher() + _, public_key = _make_test_keys() + settings = AppSettings(community_mqtt_enabled=True, community_mqtt_iata="LAX") + + mock_radio = MagicMock() + mock_radio.meshcore = None + + with ( + patch("app.keystore.get_public_key", return_value=public_key), + patch("app.radio.radio_manager", mock_radio), + patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish, + ): + await pub._on_connected_async(settings) + + payload = mock_publish.call_args[0][1] + assert payload["origin"] == "MeshCore Device" diff --git a/tests/test_mqtt.py b/tests/test_mqtt.py index eab5250..7be3473 100644 --- a/tests/test_mqtt.py +++ b/tests/test_mqtt.py @@ -108,6 +108,32 @@ class TestMqttPublisher: call_args = mock_client.publish.call_args assert call_args[0][0] == "test/topic" + @pytest.mark.asyncio + async def test_publish_passes_retain_flag(self): + pub = MqttPublisher() + pub.connected = True + mock_client = AsyncMock() + pub._client = mock_client + + await pub.publish("test/topic", {"msg": "hello"}, retain=True) + + mock_client.publish.assert_called_once() + call_args = mock_client.publish.call_args + assert call_args[0][0] == "test/topic" + assert call_args[1]["retain"] is True + + @pytest.mark.asyncio + async def test_publish_retain_defaults_false(self): + pub = MqttPublisher() + pub.connected = True + mock_client = AsyncMock() + pub._client = mock_client + + await pub.publish("test/topic", {"msg": "hello"}) + + call_args = mock_client.publish.call_args + assert call_args[1]["retain"] is False + @pytest.mark.asyncio async def test_publish_handles_exception_gracefully(self): pub = MqttPublisher()