Add status/LWT to community MQTT ingest

This commit is contained in:
Jack Kingsman
2026-03-02 23:10:25 -08:00
parent fb279ccf1a
commit 4c1d5fb8ec
4 changed files with 214 additions and 2 deletions

View File

@@ -20,6 +20,7 @@ import time
from datetime import datetime
from typing import Any
import aiomqtt
import nacl.bindings
from app.models import AppSettings
@@ -252,6 +253,12 @@ def _format_raw_packet(data: dict[str, Any], device_name: str, public_key_hex: s
return packet
def _build_status_topic(settings: AppSettings, pubkey_hex: str) -> str:
"""Build the ``meshcore/{IATA}/{PUBKEY}/status`` topic string."""
iata = settings.community_mqtt_iata.upper().strip()
return f"meshcore/{iata}/{pubkey_hex}/status"
class CommunityMqttPublisher(BaseMqttPublisher):
"""Manages the community MQTT connection and publishes raw packets."""
@@ -308,6 +315,15 @@ class CommunityMqttPublisher(BaseMqttPublisher):
tls_context = ssl.create_default_context()
status_topic = _build_status_topic(settings, pubkey_hex)
offline_payload = json.dumps(
{
"status": "offline",
"origin_id": pubkey_hex,
"client": _CLIENT_ID,
}
)
return {
"hostname": broker_host,
"port": broker_port,
@@ -316,6 +332,7 @@ class CommunityMqttPublisher(BaseMqttPublisher):
"websocket_path": "/",
"username": f"v1_{pubkey_hex}",
"password": jwt_token,
"will": aiomqtt.Will(status_topic, offline_payload, retain=True),
}
def _on_connected(self, settings: AppSettings) -> tuple[str, str]:
@@ -323,6 +340,32 @@ class CommunityMqttPublisher(BaseMqttPublisher):
broker_port = settings.community_mqtt_broker_port or _DEFAULT_PORT
return ("Community MQTT connected", f"{broker_host}:{broker_port}")
async def _on_connected_async(self, settings: AppSettings) -> None:
"""Publish a retained online status message after connecting."""
from app.keystore import get_public_key
from app.radio import radio_manager
public_key = get_public_key()
if public_key is None:
return
pubkey_hex = public_key.hex().upper()
device_name = ""
if radio_manager.meshcore and radio_manager.meshcore.self_info:
device_name = radio_manager.meshcore.self_info.get("name", "")
status_topic = _build_status_topic(settings, pubkey_hex)
payload = {
"status": "online",
"timestamp": datetime.now().isoformat(),
"origin": device_name or "MeshCore Device",
"origin_id": pubkey_hex,
"client": _CLIENT_ID,
}
await self.publish(status_topic, payload, retain=True)
def _on_error(self) -> tuple[str, str]:
return (
"Community MQTT connection failure",

View File

@@ -79,12 +79,12 @@ class BaseMqttPublisher(ABC):
await self.stop()
await self.start(settings)
async def publish(self, topic: str, payload: dict[str, Any]) -> None:
async def publish(self, topic: str, payload: dict[str, Any], *, retain: bool = False) -> None:
"""Publish a JSON payload. Drops silently if not connected."""
if self._client is None or not self.connected:
return
try:
await self._client.publish(topic, json.dumps(payload))
await self._client.publish(topic, json.dumps(payload), retain=retain)
except Exception as e:
logger.warning("%s publish failed on %s: %s", self._log_prefix, topic, e)
self.connected = False
@@ -124,6 +124,13 @@ class BaseMqttPublisher(ABC):
"""Called each time the loop finds the publisher not configured."""
return # no-op by default; subclasses may override
async def _on_connected_async(self, settings: AppSettings) -> None:
"""Async hook called after connection succeeds (before health broadcast).
Subclasses can override to publish messages immediately after connecting.
"""
return # no-op by default
# ── Connection loop ────────────────────────────────────────────────
async def _connection_loop(self) -> None:
@@ -170,6 +177,7 @@ class BaseMqttPublisher(ABC):
title, detail = self._on_connected(settings)
broadcast_success(title, detail)
await self._on_connected_async(settings)
_broadcast_health()
# Wait until cancelled or settings version changes.

View File

@@ -11,6 +11,7 @@ from app.community_mqtt import (
_DEFAULT_BROKER,
CommunityMqttPublisher,
_base64url_encode,
_build_status_topic,
_calculate_packet_hash,
_ed25519_sign_expanded,
_format_raw_packet,
@@ -432,3 +433,137 @@ class TestPublishFailureSetsDisconnected:
pub._client = mock_client
await pub.publish("topic", {"data": "test"})
assert pub.connected is False
class TestBuildStatusTopic:
def test_builds_correct_topic(self):
settings = AppSettings(community_mqtt_iata="LAX")
topic = _build_status_topic(settings, "AABB1122")
assert topic == "meshcore/LAX/AABB1122/status"
def test_iata_uppercased_and_stripped(self):
settings = AppSettings(community_mqtt_iata=" lax ")
topic = _build_status_topic(settings, "PUBKEY")
assert topic == "meshcore/LAX/PUBKEY/status"
class TestLwtAndStatusPublish:
def test_build_client_kwargs_includes_will(self):
"""_build_client_kwargs should return a will with offline status."""
pub = CommunityMqttPublisher()
private_key, public_key = _make_test_keys()
pubkey_hex = public_key.hex().upper()
settings = AppSettings(
community_mqtt_enabled=True,
community_mqtt_iata="SFO",
)
with (
patch("app.keystore.get_private_key", return_value=private_key),
patch("app.keystore.get_public_key", return_value=public_key),
):
kwargs = pub._build_client_kwargs(settings)
assert "will" in kwargs
will = kwargs["will"]
assert will.topic == f"meshcore/SFO/{pubkey_hex}/status"
assert will.retain is True
payload = json.loads(will.payload)
assert payload["status"] == "offline"
assert payload["origin_id"] == pubkey_hex
assert payload["client"] == _CLIENT_ID
@pytest.mark.asyncio
async def test_on_connected_async_publishes_online_status(self):
"""_on_connected_async should publish a retained online status."""
from unittest.mock import AsyncMock
pub = CommunityMqttPublisher()
private_key, public_key = _make_test_keys()
pubkey_hex = public_key.hex().upper()
settings = AppSettings(
community_mqtt_enabled=True,
community_mqtt_iata="LAX",
)
mock_radio = MagicMock()
mock_radio.meshcore = MagicMock()
mock_radio.meshcore.self_info = {"name": "TestNode"}
with (
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._on_connected_async(settings)
mock_publish.assert_called_once()
topic = mock_publish.call_args[0][0]
payload = mock_publish.call_args[0][1]
retain = mock_publish.call_args[1]["retain"]
assert topic == f"meshcore/LAX/{pubkey_hex}/status"
assert retain is True
assert payload["status"] == "online"
assert payload["origin"] == "TestNode"
assert payload["origin_id"] == pubkey_hex
assert payload["client"] == _CLIENT_ID
assert "timestamp" in payload
def test_lwt_and_online_share_same_topic(self):
"""LWT and on-connect status should use the same topic path."""
pub = CommunityMqttPublisher()
private_key, public_key = _make_test_keys()
pubkey_hex = public_key.hex().upper()
settings = AppSettings(
community_mqtt_enabled=True,
community_mqtt_iata="JFK",
)
with (
patch("app.keystore.get_private_key", return_value=private_key),
patch("app.keystore.get_public_key", return_value=public_key),
):
kwargs = pub._build_client_kwargs(settings)
lwt_topic = kwargs["will"].topic
expected_topic = _build_status_topic(settings, pubkey_hex)
assert lwt_topic == expected_topic
@pytest.mark.asyncio
async def test_on_connected_async_skips_when_no_public_key(self):
"""_on_connected_async should no-op when public key is unavailable."""
from unittest.mock import AsyncMock
pub = CommunityMqttPublisher()
settings = AppSettings(community_mqtt_enabled=True, community_mqtt_iata="LAX")
with (
patch("app.keystore.get_public_key", return_value=None),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._on_connected_async(settings)
mock_publish.assert_not_called()
@pytest.mark.asyncio
async def test_on_connected_async_uses_fallback_device_name(self):
"""Should use 'MeshCore Device' when radio name is unavailable."""
from unittest.mock import AsyncMock
pub = CommunityMqttPublisher()
_, public_key = _make_test_keys()
settings = AppSettings(community_mqtt_enabled=True, community_mqtt_iata="LAX")
mock_radio = MagicMock()
mock_radio.meshcore = None
with (
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._on_connected_async(settings)
payload = mock_publish.call_args[0][1]
assert payload["origin"] == "MeshCore Device"

View File

@@ -108,6 +108,32 @@ class TestMqttPublisher:
call_args = mock_client.publish.call_args
assert call_args[0][0] == "test/topic"
@pytest.mark.asyncio
async def test_publish_passes_retain_flag(self):
pub = MqttPublisher()
pub.connected = True
mock_client = AsyncMock()
pub._client = mock_client
await pub.publish("test/topic", {"msg": "hello"}, retain=True)
mock_client.publish.assert_called_once()
call_args = mock_client.publish.call_args
assert call_args[0][0] == "test/topic"
assert call_args[1]["retain"] is True
@pytest.mark.asyncio
async def test_publish_retain_defaults_false(self):
pub = MqttPublisher()
pub.connected = True
mock_client = AsyncMock()
pub._client = mock_client
await pub.publish("test/topic", {"msg": "hello"})
call_args = mock_client.publish.call_args
assert call_args[1]["retain"] is False
@pytest.mark.asyncio
async def test_publish_handles_exception_gracefully(self):
pub = MqttPublisher()