Files
Remote-Terminal-for-MeshCore/tests/test_community_mqtt.py
Jack Kingsman 5f039b9c41 Phase 4
2026-03-07 15:05:13 -08:00

1055 lines
40 KiB
Python

"""Tests for community MQTT publisher."""
import json
import time
from contextlib import asynccontextmanager
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import nacl.bindings
import pytest
from app.fanout.community_mqtt import (
_CLIENT_ID,
_DEFAULT_BROKER,
_STATS_REFRESH_INTERVAL,
CommunityMqttPublisher,
_base64url_encode,
_build_radio_info,
_build_status_topic,
_calculate_packet_hash,
_ed25519_sign_expanded,
_format_raw_packet,
_generate_jwt_token,
_get_client_version,
)
def _make_test_keys() -> tuple[bytes, bytes]:
"""Generate a test MeshCore-format key pair.
Returns (private_key_64_bytes, public_key_32_bytes).
MeshCore format: scalar(32) || prefix(32), where scalar is already clamped.
"""
import hashlib
import os
seed = os.urandom(32)
expanded = hashlib.sha512(seed).digest()
scalar = bytearray(expanded[:32])
# Clamp scalar (standard Ed25519 clamping)
scalar[0] &= 248
scalar[31] &= 127
scalar[31] |= 64
scalar = bytes(scalar)
prefix = expanded[32:]
private_key = scalar + prefix
public_key = nacl.bindings.crypto_scalarmult_ed25519_base_noclamp(scalar)
return private_key, public_key
def _make_community_settings(**overrides) -> SimpleNamespace:
"""Create a settings namespace with all community MQTT fields."""
defaults = {
"community_mqtt_enabled": True,
"community_mqtt_broker_host": "mqtt-us-v1.letsmesh.net",
"community_mqtt_broker_port": 443,
"community_mqtt_iata": "",
"community_mqtt_email": "",
}
defaults.update(overrides)
return SimpleNamespace(**defaults)
class TestBase64UrlEncode:
def test_encodes_without_padding(self):
result = _base64url_encode(b"\x00\x01\x02")
assert "=" not in result
def test_uses_url_safe_chars(self):
# Bytes that would produce + and / in standard base64
result = _base64url_encode(b"\xfb\xff\xfe")
assert "+" not in result
assert "/" not in result
class TestJwtGeneration:
def test_token_has_three_parts(self):
private_key, public_key = _make_test_keys()
token = _generate_jwt_token(private_key, public_key)
parts = token.split(".")
assert len(parts) == 3
def test_header_contains_ed25519_alg(self):
private_key, public_key = _make_test_keys()
token = _generate_jwt_token(private_key, public_key)
header_b64 = token.split(".")[0]
# Add padding for base64 decoding
import base64
padded = header_b64 + "=" * (4 - len(header_b64) % 4)
header = json.loads(base64.urlsafe_b64decode(padded))
assert header["alg"] == "Ed25519"
assert header["typ"] == "JWT"
def test_payload_contains_required_fields(self):
private_key, public_key = _make_test_keys()
token = _generate_jwt_token(private_key, public_key)
payload_b64 = token.split(".")[1]
import base64
padded = payload_b64 + "=" * (4 - len(payload_b64) % 4)
payload = json.loads(base64.urlsafe_b64decode(padded))
assert payload["publicKey"] == public_key.hex().upper()
assert "iat" in payload
assert "exp" in payload
assert payload["exp"] - payload["iat"] == 86400
assert payload["aud"] == _DEFAULT_BROKER
assert payload["owner"] == public_key.hex().upper()
assert payload["client"] == _CLIENT_ID
assert "email" not in payload # omitted when empty
def test_payload_includes_email_when_provided(self):
private_key, public_key = _make_test_keys()
token = _generate_jwt_token(private_key, public_key, email="test@example.com")
payload_b64 = token.split(".")[1]
import base64
padded = payload_b64 + "=" * (4 - len(payload_b64) % 4)
payload = json.loads(base64.urlsafe_b64decode(padded))
assert payload["email"] == "test@example.com"
def test_payload_uses_custom_audience(self):
private_key, public_key = _make_test_keys()
token = _generate_jwt_token(private_key, public_key, audience="custom.broker.net")
payload_b64 = token.split(".")[1]
import base64
padded = payload_b64 + "=" * (4 - len(payload_b64) % 4)
payload = json.loads(base64.urlsafe_b64decode(padded))
assert payload["aud"] == "custom.broker.net"
def test_signature_is_valid_hex(self):
private_key, public_key = _make_test_keys()
token = _generate_jwt_token(private_key, public_key)
sig_hex = token.split(".")[2]
sig_bytes = bytes.fromhex(sig_hex)
assert len(sig_bytes) == 64
def test_signature_verifies(self):
"""Verify the JWT signature using nacl.bindings.crypto_sign_open."""
private_key, public_key = _make_test_keys()
token = _generate_jwt_token(private_key, public_key)
parts = token.split(".")
signing_input = f"{parts[0]}.{parts[1]}".encode()
signature = bytes.fromhex(parts[2])
# crypto_sign_open expects signature + message concatenated
signed_message = signature + signing_input
# This will raise if the signature is invalid
verified = nacl.bindings.crypto_sign_open(signed_message, public_key)
assert verified == signing_input
class TestEddsaSignExpanded:
def test_produces_64_byte_signature(self):
private_key, public_key = _make_test_keys()
message = b"test message"
sig = _ed25519_sign_expanded(message, private_key[:32], private_key[32:], public_key)
assert len(sig) == 64
def test_signature_verifies_with_nacl(self):
private_key, public_key = _make_test_keys()
message = b"hello world"
sig = _ed25519_sign_expanded(message, private_key[:32], private_key[32:], public_key)
signed_message = sig + message
verified = nacl.bindings.crypto_sign_open(signed_message, public_key)
assert verified == message
def test_different_messages_produce_different_signatures(self):
private_key, public_key = _make_test_keys()
sig1 = _ed25519_sign_expanded(b"msg1", private_key[:32], private_key[32:], public_key)
sig2 = _ed25519_sign_expanded(b"msg2", private_key[:32], private_key[32:], public_key)
assert sig1 != sig2
class TestPacketFormatConversion:
def test_basic_field_mapping(self):
data = {
"id": 1,
"observation_id": 100,
"timestamp": 1700000000,
"data": "0a1b2c3d",
"payload_type": "ADVERT",
"snr": 5.5,
"rssi": -90,
"decrypted": False,
"decrypted_info": None,
}
result = _format_raw_packet(data, "TestNode", "AABBCCDD" * 8)
assert result["origin"] == "TestNode"
assert result["origin_id"] == "AABBCCDD" * 8
assert result["raw"] == "0A1B2C3D"
assert result["SNR"] == "5.5"
assert result["RSSI"] == "-90"
assert result["type"] == "PACKET"
assert result["direction"] == "rx"
assert result["len"] == "4"
def test_timestamp_is_iso8601(self):
data = {"timestamp": 1700000000, "data": "00", "snr": None, "rssi": None}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["timestamp"]
assert "T" in result["timestamp"]
def test_snr_rssi_unknown_when_none(self):
data = {"timestamp": 0, "data": "00", "snr": None, "rssi": None}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["SNR"] == "Unknown"
assert result["RSSI"] == "Unknown"
def test_packet_type_extraction(self):
# Header 0x14 = type 5, route 0 (TRANSPORT_FLOOD): header + 4 transport + path_len.
data = {"timestamp": 0, "data": "140102030400", "snr": None, "rssi": None}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["packet_type"] == "5"
assert result["route"] == "F"
def test_route_mapping(self):
# Test all 4 route types (matches meshcore-packet-capture)
# TRANSPORT_FLOOD=0 -> "F", FLOOD=1 -> "F", DIRECT=2 -> "D", TRANSPORT_DIRECT=3 -> "T"
samples = [
("000102030400", "F"), # TRANSPORT_FLOOD: header + transport + path_len
("0100", "F"), # FLOOD: header + path_len
("0200", "D"), # DIRECT: header + path_len
("030102030400", "T"), # TRANSPORT_DIRECT: header + transport + path_len
]
for raw_hex, expected in samples:
data = {"timestamp": 0, "data": raw_hex, "snr": None, "rssi": None}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["route"] == expected
def test_hash_is_16_uppercase_hex_chars(self):
data = {"timestamp": 0, "data": "aabb", "snr": None, "rssi": None}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert len(result["hash"]) == 16
assert result["hash"] == result["hash"].upper()
def test_empty_data_handled(self):
data = {"timestamp": 0, "data": "", "snr": None, "rssi": None}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["raw"] == ""
assert result["len"] == "0"
assert result["packet_type"] == "0"
assert result["route"] == "U"
def test_includes_reference_time_fields(self):
data = {"timestamp": 0, "data": "0100aabb", "snr": 1.0, "rssi": -70}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["time"]
assert result["date"]
assert result["payload_len"] == "2"
def test_adds_path_for_direct_route(self):
# route=2 (DIRECT), path_len=2, path=aa bb, payload=cc
data = {"timestamp": 0, "data": "0202AABBCC", "snr": 1.0, "rssi": -70}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["route"] == "D"
assert result["path"] == "aa,bb"
def test_adds_path_for_multi_byte_direct_route(self):
data = {"timestamp": 0, "data": "024220273031CC", "snr": 1.0, "rssi": -70}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["route"] == "D"
assert result["path"] == "2027,3031"
def test_direct_route_includes_empty_path_field(self):
data = {"timestamp": 0, "data": "0200", "snr": 1.0, "rssi": -70}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["route"] == "D"
assert "path" in result
assert result["path"] == ""
def test_unknown_version_uses_defaults(self):
# version=1 in high bits, type=5, route=1
header = (1 << 6) | (5 << 2) | 1
data = {"timestamp": 0, "data": f"{header:02x}00", "snr": 1.0, "rssi": -70}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["packet_type"] == "0"
assert result["route"] == "U"
assert result["payload_len"] == "0"
class TestCalculatePacketHash:
def test_empty_bytes_returns_zeroes(self):
result = _calculate_packet_hash(b"")
assert result == "0" * 16
def test_returns_16_uppercase_hex_chars(self):
# Simple flood packet: header(1) + path_len(1) + payload
raw = bytes([0x01, 0x00, 0xAA, 0xBB]) # FLOOD, no path, payload=0xAABB
result = _calculate_packet_hash(raw)
assert len(result) == 16
assert result == result.upper()
def test_flood_packet_hash(self):
"""FLOOD route (0x01): no transport codes, header + path_len + payload."""
import hashlib
# Header 0x11 = route=FLOOD(1), payload_type=4(ADVERT): (4<<2)|1 = 0x11
payload = b"\xde\xad"
raw = bytes([0x11, 0x00]) + payload # header, path_len=0, payload
result = _calculate_packet_hash(raw)
# Expected: sha256(payload_type_byte + payload_data)[:16].upper()
expected = hashlib.sha256(bytes([4]) + payload).hexdigest()[:16].upper()
assert result == expected
def test_transport_flood_skips_transport_codes(self):
"""TRANSPORT_FLOOD (0x00): has 4 bytes of transport codes after header."""
import hashlib
# Header 0x10 = route=TRANSPORT_FLOOD(0), payload_type=4: (4<<2)|0 = 0x10
transport_codes = b"\x01\x02\x03\x04"
payload = b"\xca\xfe"
raw = bytes([0x10]) + transport_codes + bytes([0x00]) + payload
result = _calculate_packet_hash(raw)
expected = hashlib.sha256(bytes([4]) + payload).hexdigest()[:16].upper()
assert result == expected
def test_transport_direct_skips_transport_codes(self):
"""TRANSPORT_DIRECT (0x03): also has 4 bytes of transport codes."""
import hashlib
# Header 0x13 = route=TRANSPORT_DIRECT(3), payload_type=4: (4<<2)|3 = 0x13
transport_codes = b"\x05\x06\x07\x08"
payload = b"\xbe\xef"
raw = bytes([0x13]) + transport_codes + bytes([0x00]) + payload
result = _calculate_packet_hash(raw)
expected = hashlib.sha256(bytes([4]) + payload).hexdigest()[:16].upper()
assert result == expected
def test_trace_packet_includes_path_len_in_hash(self):
"""TRACE packets (type 9) include path_len as uint16_t LE in the hash."""
import hashlib
# Header for TRACE with FLOOD route: (9<<2)|1 = 0x25
path_len = 3
path_data = b"\xaa\xbb\xcc"
payload = b"\x01\x02"
raw = bytes([0x25, path_len]) + path_data + payload
result = _calculate_packet_hash(raw)
expected_hash = (
hashlib.sha256(bytes([9]) + path_len.to_bytes(2, byteorder="little") + payload)
.hexdigest()[:16]
.upper()
)
assert result == expected_hash
def test_with_path_data(self):
"""Packet with non-zero path_len should skip path bytes to reach payload."""
import hashlib
# FLOOD route, payload_type=2 (TXT_MSG): (2<<2)|1 = 0x09
path_data = b"\xaa\xbb" # 2 bytes of path
payload = b"\x48\x65\x6c\x6c\x6f" # "Hello"
raw = bytes([0x09, 0x02]) + path_data + payload
result = _calculate_packet_hash(raw)
expected = hashlib.sha256(bytes([2]) + payload).hexdigest()[:16].upper()
assert result == expected
def test_multi_byte_path_uses_packed_path_byte_for_trace_hash(self):
import hashlib
payload = b"\x99\x88"
raw = bytes([0x25, 0x42, 0x20, 0x27, 0x30, 0x31]) + payload
result = _calculate_packet_hash(raw)
expected = (
hashlib.sha256(bytes([9]) + (0x42).to_bytes(2, byteorder="little") + payload)
.hexdigest()[:16]
.upper()
)
assert result == expected
def test_multi_byte_path_skips_full_byte_length(self):
import hashlib
payload = b"\xde\xad\xbe\xef"
raw = bytes([0x09, 0x42, 0x20, 0x27, 0x30, 0x31]) + payload
result = _calculate_packet_hash(raw)
expected = hashlib.sha256(bytes([2]) + payload).hexdigest()[:16].upper()
assert result == expected
def test_truncated_packet_returns_zeroes(self):
# Header says TRANSPORT_FLOOD, but missing path_len at required offset.
raw = bytes([0x10, 0x01, 0x02])
assert _calculate_packet_hash(raw) == "0" * 16
class TestCommunityMqttPublisher:
def test_initial_state(self):
pub = CommunityMqttPublisher()
assert pub.connected is False
assert pub._client is None
assert pub._task is None
@pytest.mark.asyncio
async def test_publish_drops_when_disconnected(self):
pub = CommunityMqttPublisher()
# Should not raise
await pub.publish("topic", {"key": "value"})
@pytest.mark.asyncio
async def test_stop_resets_state(self):
pub = CommunityMqttPublisher()
pub.connected = True
pub._client = MagicMock()
await pub.stop()
assert pub.connected is False
assert pub._client is None
def test_is_configured_false_when_disabled(self):
pub = CommunityMqttPublisher()
pub._settings = SimpleNamespace(community_mqtt_enabled=False)
with patch("app.keystore.has_private_key", return_value=True):
assert pub._is_configured() is False
def test_is_configured_false_when_no_private_key(self):
pub = CommunityMqttPublisher()
pub._settings = SimpleNamespace(community_mqtt_enabled=True)
with patch("app.keystore.has_private_key", return_value=False):
assert pub._is_configured() is False
def test_is_configured_true_when_enabled_with_key(self):
pub = CommunityMqttPublisher()
pub._settings = SimpleNamespace(community_mqtt_enabled=True)
with patch("app.keystore.has_private_key", return_value=True):
assert pub._is_configured() is True
class TestPublishFailureSetsDisconnected:
@pytest.mark.asyncio
async def test_publish_error_sets_connected_false(self):
"""A publish error should set connected=False so the loop can detect it."""
pub = CommunityMqttPublisher()
pub.connected = True
mock_client = MagicMock()
mock_client.publish = MagicMock(side_effect=Exception("broker gone"))
pub._client = mock_client
await pub.publish("topic", {"data": "test"})
assert pub.connected is False
class TestBuildStatusTopic:
def test_builds_correct_topic(self):
settings = SimpleNamespace(community_mqtt_iata="LAX")
topic = _build_status_topic(settings, "AABB1122")
assert topic == "meshcore/LAX/AABB1122/status"
def test_iata_uppercased_and_stripped(self):
settings = SimpleNamespace(community_mqtt_iata=" lax ")
topic = _build_status_topic(settings, "PUBKEY")
assert topic == "meshcore/LAX/PUBKEY/status"
class TestLwtAndStatusPublish:
def test_build_client_kwargs_includes_will(self):
"""_build_client_kwargs should return a will with offline status."""
pub = CommunityMqttPublisher()
private_key, public_key = _make_test_keys()
pubkey_hex = public_key.hex().upper()
settings = _make_community_settings(community_mqtt_iata="SFO")
mock_radio = MagicMock()
mock_radio.meshcore = MagicMock()
mock_radio.meshcore.self_info = {"name": "TestNode"}
with (
patch("app.keystore.get_private_key", return_value=private_key),
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
):
kwargs = pub._build_client_kwargs(settings)
assert "will" in kwargs
will = kwargs["will"]
assert will.topic == f"meshcore/SFO/{pubkey_hex}/status"
assert will.retain is True
payload = json.loads(will.payload)
assert payload["status"] == "offline"
assert payload["origin"] == "TestNode"
assert payload["origin_id"] == pubkey_hex
assert "timestamp" in payload
assert "client" not in payload
@pytest.mark.asyncio
async def test_on_connected_async_publishes_online_status(self):
"""_on_connected_async should publish a retained online status with enriched fields."""
pub = CommunityMqttPublisher()
private_key, public_key = _make_test_keys()
pubkey_hex = public_key.hex().upper()
settings = SimpleNamespace(
community_mqtt_enabled=True,
community_mqtt_iata="LAX",
)
mock_radio = MagicMock()
mock_radio.meshcore = MagicMock()
mock_radio.meshcore.self_info = {"name": "TestNode"}
with (
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
patch.object(
pub,
"_fetch_device_info",
new_callable=AsyncMock,
return_value={"model": "T-Deck", "firmware_version": "v2.2.2 (Build: 2025-01-15)"},
),
patch.object(
pub, "_fetch_stats", new_callable=AsyncMock, return_value={"battery_mv": 4200}
),
patch("app.fanout.community_mqtt._build_radio_info", return_value="915.0,250.0,10,8"),
patch("app.fanout.community_mqtt._get_client_version", return_value="RemoteTerm 2.4.0"),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._on_connected_async(settings)
mock_publish.assert_called_once()
topic = mock_publish.call_args[0][0]
payload = mock_publish.call_args[0][1]
retain = mock_publish.call_args[1]["retain"]
assert topic == f"meshcore/LAX/{pubkey_hex}/status"
assert retain is True
assert payload["status"] == "online"
assert payload["origin"] == "TestNode"
assert payload["origin_id"] == pubkey_hex
assert "client" not in payload
assert "timestamp" in payload
assert payload["model"] == "T-Deck"
assert payload["firmware_version"] == "v2.2.2 (Build: 2025-01-15)"
assert payload["radio"] == "915.0,250.0,10,8"
assert payload["client_version"] == "RemoteTerm 2.4.0"
assert payload["stats"] == {"battery_mv": 4200}
def test_lwt_and_online_share_same_topic(self):
"""LWT and on-connect status should use the same topic path."""
pub = CommunityMqttPublisher()
private_key, public_key = _make_test_keys()
pubkey_hex = public_key.hex().upper()
settings = _make_community_settings(community_mqtt_iata="JFK")
mock_radio = MagicMock()
mock_radio.meshcore = None
with (
patch("app.keystore.get_private_key", return_value=private_key),
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
):
kwargs = pub._build_client_kwargs(settings)
lwt_topic = kwargs["will"].topic
expected_topic = _build_status_topic(settings, pubkey_hex)
assert lwt_topic == expected_topic
@pytest.mark.asyncio
async def test_on_connected_async_skips_when_no_public_key(self):
"""_on_connected_async should no-op when public key is unavailable."""
pub = CommunityMqttPublisher()
settings = SimpleNamespace(community_mqtt_enabled=True, community_mqtt_iata="LAX")
with (
patch("app.keystore.get_public_key", return_value=None),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._on_connected_async(settings)
mock_publish.assert_not_called()
@pytest.mark.asyncio
async def test_on_connected_async_uses_fallback_device_name(self):
"""Should use 'MeshCore Device' when radio name is unavailable."""
pub = CommunityMqttPublisher()
_, public_key = _make_test_keys()
settings = SimpleNamespace(community_mqtt_enabled=True, community_mqtt_iata="LAX")
mock_radio = MagicMock()
mock_radio.meshcore = None
with (
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
patch.object(
pub,
"_fetch_device_info",
new_callable=AsyncMock,
return_value={"model": "unknown", "firmware_version": "unknown"},
),
patch.object(pub, "_fetch_stats", new_callable=AsyncMock, return_value=None),
patch("app.fanout.community_mqtt._build_radio_info", return_value="0,0,0,0"),
patch(
"app.fanout.community_mqtt._get_client_version", return_value="RemoteTerm unknown"
),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._on_connected_async(settings)
payload = mock_publish.call_args[0][1]
assert payload["origin"] == "MeshCore Device"
def _mock_radio_operation(mc_mock):
"""Create a mock async context manager for radio_operation."""
@asynccontextmanager
async def _op(*args, **kwargs):
yield mc_mock
return _op
class TestFetchDeviceInfo:
@pytest.mark.asyncio
async def test_success_fw_ver_3(self):
"""Should extract model and firmware_version from DEVICE_INFO with fw ver >= 3."""
from meshcore.events import Event, EventType
pub = CommunityMqttPublisher()
mc_mock = MagicMock()
mc_mock.commands.send_device_query = AsyncMock(
return_value=Event(
EventType.DEVICE_INFO,
{"fw ver": 3, "model": "T-Deck", "ver": "2.2.2", "fw_build": "2025-01-15"},
)
)
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = _mock_radio_operation(mc_mock)
result = await pub._fetch_device_info()
assert result["model"] == "T-Deck"
assert result["firmware_version"] == "v2.2.2 (Build: 2025-01-15)"
# Should be cached
assert pub._cached_device_info == result
@pytest.mark.asyncio
async def test_fw_ver_below_3_caches_old_version(self):
"""Should cache old firmware version string when fw ver < 3."""
from meshcore.events import Event, EventType
pub = CommunityMqttPublisher()
mc_mock = MagicMock()
mc_mock.commands.send_device_query = AsyncMock(
return_value=Event(EventType.DEVICE_INFO, {"fw ver": 2})
)
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = _mock_radio_operation(mc_mock)
result = await pub._fetch_device_info()
assert result["model"] == "unknown"
assert result["firmware_version"] == "v2"
# Should be cached (firmware doesn't change mid-connection)
assert pub._cached_device_info == result
@pytest.mark.asyncio
async def test_error_returns_fallback_not_cached(self):
"""Should return unknowns when device query returns ERROR, without caching."""
from meshcore.events import Event, EventType
pub = CommunityMqttPublisher()
mc_mock = MagicMock()
mc_mock.commands.send_device_query = AsyncMock(return_value=Event(EventType.ERROR, {}))
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = _mock_radio_operation(mc_mock)
result = await pub._fetch_device_info()
assert result["model"] == "unknown"
assert result["firmware_version"] == "unknown"
# Should NOT be cached — allows retry on next status publish
assert pub._cached_device_info is None
@pytest.mark.asyncio
async def test_radio_busy_returns_fallback_not_cached(self):
"""Should return unknowns when radio is busy, without caching."""
from app.radio import RadioOperationBusyError
pub = CommunityMqttPublisher()
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = MagicMock(side_effect=RadioOperationBusyError("busy"))
result = await pub._fetch_device_info()
assert result["model"] == "unknown"
assert result["firmware_version"] == "unknown"
# Should NOT be cached — allows retry when radio becomes available
assert pub._cached_device_info is None
@pytest.mark.asyncio
async def test_cached_result_returned_on_second_call(self):
"""Should return cached result without re-querying the radio."""
pub = CommunityMqttPublisher()
pub._cached_device_info = {"model": "T-Deck", "firmware_version": "v2.2.2"}
# No radio mock needed — should return cached
result = await pub._fetch_device_info()
assert result["model"] == "T-Deck"
@pytest.mark.asyncio
async def test_no_fw_build_omits_build_suffix(self):
"""When fw_build is empty, firmware_version should just be 'vX.Y.Z'."""
from meshcore.events import Event, EventType
pub = CommunityMqttPublisher()
mc_mock = MagicMock()
mc_mock.commands.send_device_query = AsyncMock(
return_value=Event(
EventType.DEVICE_INFO,
{"fw ver": 3, "model": "Heltec", "ver": "1.0.0", "fw_build": ""},
)
)
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = _mock_radio_operation(mc_mock)
result = await pub._fetch_device_info()
assert result["firmware_version"] == "v1.0.0"
class TestFetchStats:
@pytest.mark.asyncio
async def test_success_merges_core_and_radio(self):
"""Should merge STATS_CORE and STATS_RADIO payloads."""
from meshcore.events import Event, EventType
pub = CommunityMqttPublisher()
mc_mock = MagicMock()
mc_mock.commands.get_stats_core = AsyncMock(
return_value=Event(
EventType.STATS_CORE,
{"battery_mv": 4200, "uptime_secs": 3600, "errors": 0, "queue_len": 0},
)
)
mc_mock.commands.get_stats_radio = AsyncMock(
return_value=Event(
EventType.STATS_RADIO,
{
"noise_floor": -120,
"last_rssi": -85,
"last_snr": 10.5,
"tx_air_secs": 42,
"rx_air_secs": 150,
},
)
)
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = _mock_radio_operation(mc_mock)
result = await pub._fetch_stats()
assert result is not None
assert result["battery_mv"] == 4200
assert result["noise_floor"] == -120
assert result["tx_air_secs"] == 42
@pytest.mark.asyncio
async def test_core_error_sets_stats_unsupported(self):
"""Should set _stats_supported=False when STATS_CORE returns ERROR."""
from meshcore.events import Event, EventType
pub = CommunityMqttPublisher()
mc_mock = MagicMock()
mc_mock.commands.get_stats_core = AsyncMock(return_value=Event(EventType.ERROR, {}))
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = _mock_radio_operation(mc_mock)
result = await pub._fetch_stats()
assert pub._stats_supported is False
assert result is None # no cached stats yet
@pytest.mark.asyncio
async def test_radio_error_sets_stats_unsupported(self):
"""Should set _stats_supported=False when STATS_RADIO returns ERROR."""
from meshcore.events import Event, EventType
pub = CommunityMqttPublisher()
mc_mock = MagicMock()
mc_mock.commands.get_stats_core = AsyncMock(
return_value=Event(
EventType.STATS_CORE,
{"battery_mv": 4200, "uptime_secs": 3600, "errors": 0, "queue_len": 0},
)
)
mc_mock.commands.get_stats_radio = AsyncMock(return_value=Event(EventType.ERROR, {}))
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = _mock_radio_operation(mc_mock)
await pub._fetch_stats()
assert pub._stats_supported is False
@pytest.mark.asyncio
async def test_stats_unsupported_skips_radio(self):
"""When _stats_supported=False, should return cached stats without radio call."""
pub = CommunityMqttPublisher()
pub._stats_supported = False
pub._cached_stats = {"battery_mv": 4000}
result = await pub._fetch_stats()
assert result == {"battery_mv": 4000}
@pytest.mark.asyncio
async def test_cache_guard_prevents_refetch(self):
"""Should return cached stats when within cache window."""
pub = CommunityMqttPublisher()
pub._cached_stats = {"battery_mv": 4200}
pub._last_stats_fetch = time.monotonic() # Just fetched
result = await pub._fetch_stats()
assert result == {"battery_mv": 4200}
@pytest.mark.asyncio
async def test_radio_busy_returns_cached(self):
"""Should return cached stats when radio is busy."""
from app.radio import RadioOperationBusyError
pub = CommunityMqttPublisher()
pub._cached_stats = {"battery_mv": 3900}
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = MagicMock(side_effect=RadioOperationBusyError("busy"))
result = await pub._fetch_stats()
assert result == {"battery_mv": 3900}
class TestBuildRadioInfo:
def test_formatted_string(self):
"""Should return comma-separated radio info matching reference format."""
mock_radio = MagicMock()
mock_radio.meshcore = MagicMock()
mock_radio.meshcore.self_info = {
"radio_freq": 915.0,
"radio_bw": 250.0,
"radio_sf": 10,
"radio_cr": 8,
}
with patch("app.radio.radio_manager", mock_radio):
result = _build_radio_info()
assert result == "915.0,250.0,10,8"
def test_fallback_when_no_meshcore(self):
"""Should return '0,0,0,0' when meshcore is None."""
mock_radio = MagicMock()
mock_radio.meshcore = None
with patch("app.radio.radio_manager", mock_radio):
result = _build_radio_info()
assert result == "0,0,0,0"
def test_fallback_when_self_info_missing_fields(self):
"""Should use 0 defaults when self_info lacks radio fields."""
mock_radio = MagicMock()
mock_radio.meshcore = MagicMock()
mock_radio.meshcore.self_info = {"name": "TestNode"}
with patch("app.radio.radio_manager", mock_radio):
result = _build_radio_info()
assert result == "0,0,0,0"
class TestGetClientVersion:
def test_returns_remoteterm_prefix(self):
"""Should return 'RemoteTerm ...' string."""
result = _get_client_version()
assert result.startswith("RemoteTerm ")
def test_returns_version_from_metadata(self):
"""Should use importlib.metadata to get version."""
with patch("app.fanout.community_mqtt.importlib.metadata.version", return_value="1.2.3"):
result = _get_client_version()
assert result == "RemoteTerm 1.2.3"
def test_fallback_on_error(self):
"""Should return 'RemoteTerm unknown' if metadata lookup fails."""
with patch(
"app.fanout.community_mqtt.importlib.metadata.version",
side_effect=Exception("not found"),
):
result = _get_client_version()
assert result == "RemoteTerm unknown"
class TestPublishStatus:
@pytest.mark.asyncio
async def test_enriched_payload_fields(self):
"""_publish_status should include all enriched fields."""
pub = CommunityMqttPublisher()
_, public_key = _make_test_keys()
pubkey_hex = public_key.hex().upper()
settings = SimpleNamespace(community_mqtt_enabled=True, community_mqtt_iata="LAX")
mock_radio = MagicMock()
mock_radio.meshcore = MagicMock()
mock_radio.meshcore.self_info = {"name": "TestNode"}
stats = {"battery_mv": 4200, "uptime_secs": 3600, "noise_floor": -120}
with (
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
patch.object(
pub,
"_fetch_device_info",
new_callable=AsyncMock,
return_value={"model": "T-Deck", "firmware_version": "v2.2.2 (Build: 2025-01-15)"},
),
patch.object(pub, "_fetch_stats", new_callable=AsyncMock, return_value=stats),
patch("app.fanout.community_mqtt._build_radio_info", return_value="915.0,250.0,10,8"),
patch("app.fanout.community_mqtt._get_client_version", return_value="RemoteTerm 2.4.0"),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._publish_status(settings)
payload = mock_publish.call_args[0][1]
assert payload["status"] == "online"
assert payload["origin"] == "TestNode"
assert payload["origin_id"] == pubkey_hex
assert "client" not in payload
assert payload["model"] == "T-Deck"
assert payload["firmware_version"] == "v2.2.2 (Build: 2025-01-15)"
assert payload["radio"] == "915.0,250.0,10,8"
assert payload["client_version"] == "RemoteTerm 2.4.0"
assert payload["stats"] == stats
@pytest.mark.asyncio
async def test_stats_omitted_when_none(self):
"""Should not include 'stats' key when stats are None."""
pub = CommunityMqttPublisher()
_, public_key = _make_test_keys()
settings = SimpleNamespace(community_mqtt_enabled=True, community_mqtt_iata="LAX")
mock_radio = MagicMock()
mock_radio.meshcore = None
with (
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
patch.object(
pub,
"_fetch_device_info",
new_callable=AsyncMock,
return_value={"model": "unknown", "firmware_version": "unknown"},
),
patch.object(pub, "_fetch_stats", new_callable=AsyncMock, return_value=None),
patch("app.fanout.community_mqtt._build_radio_info", return_value="0,0,0,0"),
patch(
"app.fanout.community_mqtt._get_client_version", return_value="RemoteTerm unknown"
),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._publish_status(settings)
payload = mock_publish.call_args[0][1]
assert "stats" not in payload
@pytest.mark.asyncio
async def test_updates_last_status_publish(self):
"""Should update _last_status_publish after publishing."""
pub = CommunityMqttPublisher()
_, public_key = _make_test_keys()
settings = SimpleNamespace(community_mqtt_enabled=True, community_mqtt_iata="LAX")
mock_radio = MagicMock()
mock_radio.meshcore = None
before = time.monotonic()
with (
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
patch.object(
pub,
"_fetch_device_info",
new_callable=AsyncMock,
return_value={"model": "unknown", "firmware_version": "unknown"},
),
patch.object(pub, "_fetch_stats", new_callable=AsyncMock, return_value=None),
patch("app.fanout.community_mqtt._build_radio_info", return_value="0,0,0,0"),
patch(
"app.fanout.community_mqtt._get_client_version", return_value="RemoteTerm unknown"
),
patch.object(pub, "publish", new_callable=AsyncMock),
):
await pub._publish_status(settings)
assert pub._last_status_publish >= before
@pytest.mark.asyncio
async def test_no_publish_key_returns_none(self):
"""Should skip publish when public key is unavailable."""
pub = CommunityMqttPublisher()
settings = SimpleNamespace(community_mqtt_enabled=True, community_mqtt_iata="LAX")
with (
patch("app.keystore.get_public_key", return_value=None),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._publish_status(settings)
mock_publish.assert_not_called()
class TestPeriodicWake:
@pytest.mark.asyncio
async def test_skips_before_interval(self):
"""Should not republish before _STATS_REFRESH_INTERVAL."""
pub = CommunityMqttPublisher()
pub._settings = SimpleNamespace(community_mqtt_enabled=True, community_mqtt_iata="LAX")
pub._last_status_publish = time.monotonic() # Just published
with patch.object(pub, "_publish_status", new_callable=AsyncMock) as mock_ps:
await pub._on_periodic_wake(60.0)
mock_ps.assert_not_called()
@pytest.mark.asyncio
async def test_publishes_after_interval(self):
"""Should republish after _STATS_REFRESH_INTERVAL elapsed."""
pub = CommunityMqttPublisher()
pub._settings = SimpleNamespace(community_mqtt_enabled=True, community_mqtt_iata="LAX")
pub._last_status_publish = time.monotonic() - _STATS_REFRESH_INTERVAL - 1
with patch.object(pub, "_publish_status", new_callable=AsyncMock) as mock_ps:
await pub._on_periodic_wake(360.0)
mock_ps.assert_called_once_with(pub._settings, refresh_stats=True)
@pytest.mark.asyncio
async def test_skips_when_no_settings(self):
"""Should no-op when settings are None."""
pub = CommunityMqttPublisher()
pub._settings = None
with patch.object(pub, "_publish_status", new_callable=AsyncMock) as mock_ps:
await pub._on_periodic_wake(360.0)
mock_ps.assert_not_called()