Files
Remote-Terminal-for-MeshCore/tests/test_community_mqtt.py
2026-03-06 14:41:51 -08:00

1011 lines
39 KiB
Python

"""Tests for community MQTT publisher."""
import json
import time
from contextlib import asynccontextmanager
from unittest.mock import AsyncMock, MagicMock, patch
import nacl.bindings
import pytest
from app.community_mqtt import (
_CLIENT_ID,
_DEFAULT_BROKER,
_STATS_REFRESH_INTERVAL,
CommunityMqttPublisher,
_base64url_encode,
_build_radio_info,
_build_status_topic,
_calculate_packet_hash,
_ed25519_sign_expanded,
_format_raw_packet,
_generate_jwt_token,
_get_client_version,
)
from app.models import AppSettings
def _make_test_keys() -> tuple[bytes, bytes]:
"""Generate a test MeshCore-format key pair.
Returns (private_key_64_bytes, public_key_32_bytes).
MeshCore format: scalar(32) || prefix(32), where scalar is already clamped.
"""
import hashlib
import os
seed = os.urandom(32)
expanded = hashlib.sha512(seed).digest()
scalar = bytearray(expanded[:32])
# Clamp scalar (standard Ed25519 clamping)
scalar[0] &= 248
scalar[31] &= 127
scalar[31] |= 64
scalar = bytes(scalar)
prefix = expanded[32:]
private_key = scalar + prefix
public_key = nacl.bindings.crypto_scalarmult_ed25519_base_noclamp(scalar)
return private_key, public_key
class TestBase64UrlEncode:
def test_encodes_without_padding(self):
result = _base64url_encode(b"\x00\x01\x02")
assert "=" not in result
def test_uses_url_safe_chars(self):
# Bytes that would produce + and / in standard base64
result = _base64url_encode(b"\xfb\xff\xfe")
assert "+" not in result
assert "/" not in result
class TestJwtGeneration:
def test_token_has_three_parts(self):
private_key, public_key = _make_test_keys()
token = _generate_jwt_token(private_key, public_key)
parts = token.split(".")
assert len(parts) == 3
def test_header_contains_ed25519_alg(self):
private_key, public_key = _make_test_keys()
token = _generate_jwt_token(private_key, public_key)
header_b64 = token.split(".")[0]
# Add padding for base64 decoding
import base64
padded = header_b64 + "=" * (4 - len(header_b64) % 4)
header = json.loads(base64.urlsafe_b64decode(padded))
assert header["alg"] == "Ed25519"
assert header["typ"] == "JWT"
def test_payload_contains_required_fields(self):
private_key, public_key = _make_test_keys()
token = _generate_jwt_token(private_key, public_key)
payload_b64 = token.split(".")[1]
import base64
padded = payload_b64 + "=" * (4 - len(payload_b64) % 4)
payload = json.loads(base64.urlsafe_b64decode(padded))
assert payload["publicKey"] == public_key.hex().upper()
assert "iat" in payload
assert "exp" in payload
assert payload["exp"] - payload["iat"] == 86400
assert payload["aud"] == _DEFAULT_BROKER
assert payload["owner"] == public_key.hex().upper()
assert payload["client"] == _CLIENT_ID
assert "email" not in payload # omitted when empty
def test_payload_includes_email_when_provided(self):
private_key, public_key = _make_test_keys()
token = _generate_jwt_token(private_key, public_key, email="test@example.com")
payload_b64 = token.split(".")[1]
import base64
padded = payload_b64 + "=" * (4 - len(payload_b64) % 4)
payload = json.loads(base64.urlsafe_b64decode(padded))
assert payload["email"] == "test@example.com"
def test_payload_uses_custom_audience(self):
private_key, public_key = _make_test_keys()
token = _generate_jwt_token(private_key, public_key, audience="custom.broker.net")
payload_b64 = token.split(".")[1]
import base64
padded = payload_b64 + "=" * (4 - len(payload_b64) % 4)
payload = json.loads(base64.urlsafe_b64decode(padded))
assert payload["aud"] == "custom.broker.net"
def test_signature_is_valid_hex(self):
private_key, public_key = _make_test_keys()
token = _generate_jwt_token(private_key, public_key)
sig_hex = token.split(".")[2]
sig_bytes = bytes.fromhex(sig_hex)
assert len(sig_bytes) == 64
def test_signature_verifies(self):
"""Verify the JWT signature using nacl.bindings.crypto_sign_open."""
private_key, public_key = _make_test_keys()
token = _generate_jwt_token(private_key, public_key)
parts = token.split(".")
signing_input = f"{parts[0]}.{parts[1]}".encode()
signature = bytes.fromhex(parts[2])
# crypto_sign_open expects signature + message concatenated
signed_message = signature + signing_input
# This will raise if the signature is invalid
verified = nacl.bindings.crypto_sign_open(signed_message, public_key)
assert verified == signing_input
class TestEddsaSignExpanded:
def test_produces_64_byte_signature(self):
private_key, public_key = _make_test_keys()
message = b"test message"
sig = _ed25519_sign_expanded(message, private_key[:32], private_key[32:], public_key)
assert len(sig) == 64
def test_signature_verifies_with_nacl(self):
private_key, public_key = _make_test_keys()
message = b"hello world"
sig = _ed25519_sign_expanded(message, private_key[:32], private_key[32:], public_key)
signed_message = sig + message
verified = nacl.bindings.crypto_sign_open(signed_message, public_key)
assert verified == message
def test_different_messages_produce_different_signatures(self):
private_key, public_key = _make_test_keys()
sig1 = _ed25519_sign_expanded(b"msg1", private_key[:32], private_key[32:], public_key)
sig2 = _ed25519_sign_expanded(b"msg2", private_key[:32], private_key[32:], public_key)
assert sig1 != sig2
class TestPacketFormatConversion:
def test_basic_field_mapping(self):
data = {
"id": 1,
"observation_id": 100,
"timestamp": 1700000000,
"data": "0a1b2c3d",
"payload_type": "ADVERT",
"snr": 5.5,
"rssi": -90,
"decrypted": False,
"decrypted_info": None,
}
result = _format_raw_packet(data, "TestNode", "AABBCCDD" * 8)
assert result["origin"] == "TestNode"
assert result["origin_id"] == "AABBCCDD" * 8
assert result["raw"] == "0A1B2C3D"
assert result["SNR"] == "5.5"
assert result["RSSI"] == "-90"
assert result["type"] == "PACKET"
assert result["direction"] == "rx"
assert result["len"] == "4"
def test_timestamp_is_iso8601(self):
data = {"timestamp": 1700000000, "data": "00", "snr": None, "rssi": None}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["timestamp"]
assert "T" in result["timestamp"]
def test_snr_rssi_unknown_when_none(self):
data = {"timestamp": 0, "data": "00", "snr": None, "rssi": None}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["SNR"] == "Unknown"
assert result["RSSI"] == "Unknown"
def test_packet_type_extraction(self):
# Header 0x14 = type 5, route 0 (TRANSPORT_FLOOD): header + 4 transport + path_len.
data = {"timestamp": 0, "data": "140102030400", "snr": None, "rssi": None}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["packet_type"] == "5"
assert result["route"] == "F"
def test_route_mapping(self):
# Test all 4 route types (matches meshcore-packet-capture)
# TRANSPORT_FLOOD=0 -> "F", FLOOD=1 -> "F", DIRECT=2 -> "D", TRANSPORT_DIRECT=3 -> "T"
samples = [
("000102030400", "F"), # TRANSPORT_FLOOD: header + transport + path_len
("0100", "F"), # FLOOD: header + path_len
("0200", "D"), # DIRECT: header + path_len
("030102030400", "T"), # TRANSPORT_DIRECT: header + transport + path_len
]
for raw_hex, expected in samples:
data = {"timestamp": 0, "data": raw_hex, "snr": None, "rssi": None}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["route"] == expected
def test_hash_is_16_uppercase_hex_chars(self):
data = {"timestamp": 0, "data": "aabb", "snr": None, "rssi": None}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert len(result["hash"]) == 16
assert result["hash"] == result["hash"].upper()
def test_empty_data_handled(self):
data = {"timestamp": 0, "data": "", "snr": None, "rssi": None}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["raw"] == ""
assert result["len"] == "0"
assert result["packet_type"] == "0"
assert result["route"] == "U"
def test_includes_reference_time_fields(self):
data = {"timestamp": 0, "data": "0100aabb", "snr": 1.0, "rssi": -70}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["time"]
assert result["date"]
assert result["payload_len"] == "2"
def test_adds_path_for_direct_route(self):
# route=2 (DIRECT), path_len=2, path=aa bb, payload=cc
data = {"timestamp": 0, "data": "0202AABBCC", "snr": 1.0, "rssi": -70}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["route"] == "D"
assert result["path"] == "aa,bb"
def test_direct_route_includes_empty_path_field(self):
data = {"timestamp": 0, "data": "0200", "snr": 1.0, "rssi": -70}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["route"] == "D"
assert "path" in result
assert result["path"] == ""
def test_unknown_version_uses_defaults(self):
# version=1 in high bits, type=5, route=1
header = (1 << 6) | (5 << 2) | 1
data = {"timestamp": 0, "data": f"{header:02x}00", "snr": 1.0, "rssi": -70}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["packet_type"] == "0"
assert result["route"] == "U"
assert result["payload_len"] == "0"
class TestCalculatePacketHash:
def test_empty_bytes_returns_zeroes(self):
result = _calculate_packet_hash(b"")
assert result == "0" * 16
def test_returns_16_uppercase_hex_chars(self):
# Simple flood packet: header(1) + path_len(1) + payload
raw = bytes([0x01, 0x00, 0xAA, 0xBB]) # FLOOD, no path, payload=0xAABB
result = _calculate_packet_hash(raw)
assert len(result) == 16
assert result == result.upper()
def test_flood_packet_hash(self):
"""FLOOD route (0x01): no transport codes, header + path_len + payload."""
import hashlib
# Header 0x11 = route=FLOOD(1), payload_type=4(ADVERT): (4<<2)|1 = 0x11
payload = b"\xde\xad"
raw = bytes([0x11, 0x00]) + payload # header, path_len=0, payload
result = _calculate_packet_hash(raw)
# Expected: sha256(payload_type_byte + payload_data)[:16].upper()
expected = hashlib.sha256(bytes([4]) + payload).hexdigest()[:16].upper()
assert result == expected
def test_transport_flood_skips_transport_codes(self):
"""TRANSPORT_FLOOD (0x00): has 4 bytes of transport codes after header."""
import hashlib
# Header 0x10 = route=TRANSPORT_FLOOD(0), payload_type=4: (4<<2)|0 = 0x10
transport_codes = b"\x01\x02\x03\x04"
payload = b"\xca\xfe"
raw = bytes([0x10]) + transport_codes + bytes([0x00]) + payload
result = _calculate_packet_hash(raw)
expected = hashlib.sha256(bytes([4]) + payload).hexdigest()[:16].upper()
assert result == expected
def test_transport_direct_skips_transport_codes(self):
"""TRANSPORT_DIRECT (0x03): also has 4 bytes of transport codes."""
import hashlib
# Header 0x13 = route=TRANSPORT_DIRECT(3), payload_type=4: (4<<2)|3 = 0x13
transport_codes = b"\x05\x06\x07\x08"
payload = b"\xbe\xef"
raw = bytes([0x13]) + transport_codes + bytes([0x00]) + payload
result = _calculate_packet_hash(raw)
expected = hashlib.sha256(bytes([4]) + payload).hexdigest()[:16].upper()
assert result == expected
def test_trace_packet_includes_path_len_in_hash(self):
"""TRACE packets (type 9) include path_len as uint16_t LE in the hash."""
import hashlib
# Header for TRACE with FLOOD route: (9<<2)|1 = 0x25
path_len = 3
path_data = b"\xaa\xbb\xcc"
payload = b"\x01\x02"
raw = bytes([0x25, path_len]) + path_data + payload
result = _calculate_packet_hash(raw)
expected_hash = (
hashlib.sha256(bytes([9]) + path_len.to_bytes(2, byteorder="little") + payload)
.hexdigest()[:16]
.upper()
)
assert result == expected_hash
def test_with_path_data(self):
"""Packet with non-zero path_len should skip path bytes to reach payload."""
import hashlib
# FLOOD route, payload_type=2 (TXT_MSG): (2<<2)|1 = 0x09
path_data = b"\xaa\xbb" # 2 bytes of path
payload = b"\x48\x65\x6c\x6c\x6f" # "Hello"
raw = bytes([0x09, 0x02]) + path_data + payload
result = _calculate_packet_hash(raw)
expected = hashlib.sha256(bytes([2]) + payload).hexdigest()[:16].upper()
assert result == expected
def test_truncated_packet_returns_zeroes(self):
# Header says TRANSPORT_FLOOD, but missing path_len at required offset.
raw = bytes([0x10, 0x01, 0x02])
assert _calculate_packet_hash(raw) == "0" * 16
class TestCommunityMqttPublisher:
def test_initial_state(self):
pub = CommunityMqttPublisher()
assert pub.connected is False
assert pub._client is None
assert pub._task is None
@pytest.mark.asyncio
async def test_publish_drops_when_disconnected(self):
pub = CommunityMqttPublisher()
# Should not raise
await pub.publish("topic", {"key": "value"})
@pytest.mark.asyncio
async def test_stop_resets_state(self):
pub = CommunityMqttPublisher()
pub.connected = True
pub._client = MagicMock()
await pub.stop()
assert pub.connected is False
assert pub._client is None
def test_is_configured_false_when_disabled(self):
pub = CommunityMqttPublisher()
pub._settings = AppSettings(community_mqtt_enabled=False)
with patch("app.keystore.has_private_key", return_value=True):
assert pub._is_configured() is False
def test_is_configured_false_when_no_private_key(self):
pub = CommunityMqttPublisher()
pub._settings = AppSettings(community_mqtt_enabled=True)
with patch("app.keystore.has_private_key", return_value=False):
assert pub._is_configured() is False
def test_is_configured_true_when_enabled_with_key(self):
pub = CommunityMqttPublisher()
pub._settings = AppSettings(community_mqtt_enabled=True)
with patch("app.keystore.has_private_key", return_value=True):
assert pub._is_configured() is True
class TestPublishFailureSetsDisconnected:
@pytest.mark.asyncio
async def test_publish_error_sets_connected_false(self):
"""A publish error should set connected=False so the loop can detect it."""
pub = CommunityMqttPublisher()
pub.connected = True
mock_client = MagicMock()
mock_client.publish = MagicMock(side_effect=Exception("broker gone"))
pub._client = mock_client
await pub.publish("topic", {"data": "test"})
assert pub.connected is False
class TestBuildStatusTopic:
def test_builds_correct_topic(self):
settings = AppSettings(community_mqtt_iata="LAX")
topic = _build_status_topic(settings, "AABB1122")
assert topic == "meshcore/LAX/AABB1122/status"
def test_iata_uppercased_and_stripped(self):
settings = AppSettings(community_mqtt_iata=" lax ")
topic = _build_status_topic(settings, "PUBKEY")
assert topic == "meshcore/LAX/PUBKEY/status"
class TestLwtAndStatusPublish:
def test_build_client_kwargs_includes_will(self):
"""_build_client_kwargs should return a will with offline status."""
pub = CommunityMqttPublisher()
private_key, public_key = _make_test_keys()
pubkey_hex = public_key.hex().upper()
settings = AppSettings(
community_mqtt_enabled=True,
community_mqtt_iata="SFO",
)
mock_radio = MagicMock()
mock_radio.meshcore = MagicMock()
mock_radio.meshcore.self_info = {"name": "TestNode"}
with (
patch("app.keystore.get_private_key", return_value=private_key),
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
):
kwargs = pub._build_client_kwargs(settings)
assert "will" in kwargs
will = kwargs["will"]
assert will.topic == f"meshcore/SFO/{pubkey_hex}/status"
assert will.retain is True
payload = json.loads(will.payload)
assert payload["status"] == "offline"
assert payload["origin"] == "TestNode"
assert payload["origin_id"] == pubkey_hex
assert "timestamp" in payload
assert "client" not in payload
@pytest.mark.asyncio
async def test_on_connected_async_publishes_online_status(self):
"""_on_connected_async should publish a retained online status with enriched fields."""
pub = CommunityMqttPublisher()
private_key, public_key = _make_test_keys()
pubkey_hex = public_key.hex().upper()
settings = AppSettings(
community_mqtt_enabled=True,
community_mqtt_iata="LAX",
)
mock_radio = MagicMock()
mock_radio.meshcore = MagicMock()
mock_radio.meshcore.self_info = {"name": "TestNode"}
with (
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
patch.object(
pub,
"_fetch_device_info",
new_callable=AsyncMock,
return_value={"model": "T-Deck", "firmware_version": "v2.2.2 (Build: 2025-01-15)"},
),
patch.object(
pub, "_fetch_stats", new_callable=AsyncMock, return_value={"battery_mv": 4200}
),
patch("app.community_mqtt._build_radio_info", return_value="915.0,250.0,10,8"),
patch("app.community_mqtt._get_client_version", return_value="RemoteTerm 2.4.0"),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._on_connected_async(settings)
mock_publish.assert_called_once()
topic = mock_publish.call_args[0][0]
payload = mock_publish.call_args[0][1]
retain = mock_publish.call_args[1]["retain"]
assert topic == f"meshcore/LAX/{pubkey_hex}/status"
assert retain is True
assert payload["status"] == "online"
assert payload["origin"] == "TestNode"
assert payload["origin_id"] == pubkey_hex
assert "client" not in payload
assert "timestamp" in payload
assert payload["model"] == "T-Deck"
assert payload["firmware_version"] == "v2.2.2 (Build: 2025-01-15)"
assert payload["radio"] == "915.0,250.0,10,8"
assert payload["client_version"] == "RemoteTerm 2.4.0"
assert payload["stats"] == {"battery_mv": 4200}
def test_lwt_and_online_share_same_topic(self):
"""LWT and on-connect status should use the same topic path."""
pub = CommunityMqttPublisher()
private_key, public_key = _make_test_keys()
pubkey_hex = public_key.hex().upper()
settings = AppSettings(
community_mqtt_enabled=True,
community_mqtt_iata="JFK",
)
mock_radio = MagicMock()
mock_radio.meshcore = None
with (
patch("app.keystore.get_private_key", return_value=private_key),
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
):
kwargs = pub._build_client_kwargs(settings)
lwt_topic = kwargs["will"].topic
expected_topic = _build_status_topic(settings, pubkey_hex)
assert lwt_topic == expected_topic
@pytest.mark.asyncio
async def test_on_connected_async_skips_when_no_public_key(self):
"""_on_connected_async should no-op when public key is unavailable."""
pub = CommunityMqttPublisher()
settings = AppSettings(community_mqtt_enabled=True, community_mqtt_iata="LAX")
with (
patch("app.keystore.get_public_key", return_value=None),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._on_connected_async(settings)
mock_publish.assert_not_called()
@pytest.mark.asyncio
async def test_on_connected_async_uses_fallback_device_name(self):
"""Should use 'MeshCore Device' when radio name is unavailable."""
pub = CommunityMqttPublisher()
_, public_key = _make_test_keys()
settings = AppSettings(community_mqtt_enabled=True, community_mqtt_iata="LAX")
mock_radio = MagicMock()
mock_radio.meshcore = None
with (
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
patch.object(
pub,
"_fetch_device_info",
new_callable=AsyncMock,
return_value={"model": "unknown", "firmware_version": "unknown"},
),
patch.object(pub, "_fetch_stats", new_callable=AsyncMock, return_value=None),
patch("app.community_mqtt._build_radio_info", return_value="0,0,0,0"),
patch("app.community_mqtt._get_client_version", return_value="RemoteTerm unknown"),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._on_connected_async(settings)
payload = mock_publish.call_args[0][1]
assert payload["origin"] == "MeshCore Device"
def _mock_radio_operation(mc_mock):
"""Create a mock async context manager for radio_operation."""
@asynccontextmanager
async def _op(*args, **kwargs):
yield mc_mock
return _op
class TestFetchDeviceInfo:
@pytest.mark.asyncio
async def test_success_fw_ver_3(self):
"""Should extract model and firmware_version from DEVICE_INFO with fw ver >= 3."""
from meshcore.events import Event, EventType
pub = CommunityMqttPublisher()
mc_mock = MagicMock()
mc_mock.commands.send_device_query = AsyncMock(
return_value=Event(
EventType.DEVICE_INFO,
{"fw ver": 3, "model": "T-Deck", "ver": "2.2.2", "fw_build": "2025-01-15"},
)
)
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = _mock_radio_operation(mc_mock)
result = await pub._fetch_device_info()
assert result["model"] == "T-Deck"
assert result["firmware_version"] == "v2.2.2 (Build: 2025-01-15)"
# Should be cached
assert pub._cached_device_info == result
@pytest.mark.asyncio
async def test_fw_ver_below_3_caches_old_version(self):
"""Should cache old firmware version string when fw ver < 3."""
from meshcore.events import Event, EventType
pub = CommunityMqttPublisher()
mc_mock = MagicMock()
mc_mock.commands.send_device_query = AsyncMock(
return_value=Event(EventType.DEVICE_INFO, {"fw ver": 2})
)
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = _mock_radio_operation(mc_mock)
result = await pub._fetch_device_info()
assert result["model"] == "unknown"
assert result["firmware_version"] == "v2"
# Should be cached (firmware doesn't change mid-connection)
assert pub._cached_device_info == result
@pytest.mark.asyncio
async def test_error_returns_fallback_not_cached(self):
"""Should return unknowns when device query returns ERROR, without caching."""
from meshcore.events import Event, EventType
pub = CommunityMqttPublisher()
mc_mock = MagicMock()
mc_mock.commands.send_device_query = AsyncMock(return_value=Event(EventType.ERROR, {}))
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = _mock_radio_operation(mc_mock)
result = await pub._fetch_device_info()
assert result["model"] == "unknown"
assert result["firmware_version"] == "unknown"
# Should NOT be cached — allows retry on next status publish
assert pub._cached_device_info is None
@pytest.mark.asyncio
async def test_radio_busy_returns_fallback_not_cached(self):
"""Should return unknowns when radio is busy, without caching."""
from app.radio import RadioOperationBusyError
pub = CommunityMqttPublisher()
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = MagicMock(side_effect=RadioOperationBusyError("busy"))
result = await pub._fetch_device_info()
assert result["model"] == "unknown"
assert result["firmware_version"] == "unknown"
# Should NOT be cached — allows retry when radio becomes available
assert pub._cached_device_info is None
@pytest.mark.asyncio
async def test_cached_result_returned_on_second_call(self):
"""Should return cached result without re-querying the radio."""
pub = CommunityMqttPublisher()
pub._cached_device_info = {"model": "T-Deck", "firmware_version": "v2.2.2"}
# No radio mock needed — should return cached
result = await pub._fetch_device_info()
assert result["model"] == "T-Deck"
@pytest.mark.asyncio
async def test_no_fw_build_omits_build_suffix(self):
"""When fw_build is empty, firmware_version should just be 'vX.Y.Z'."""
from meshcore.events import Event, EventType
pub = CommunityMqttPublisher()
mc_mock = MagicMock()
mc_mock.commands.send_device_query = AsyncMock(
return_value=Event(
EventType.DEVICE_INFO,
{"fw ver": 3, "model": "Heltec", "ver": "1.0.0", "fw_build": ""},
)
)
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = _mock_radio_operation(mc_mock)
result = await pub._fetch_device_info()
assert result["firmware_version"] == "v1.0.0"
class TestFetchStats:
@pytest.mark.asyncio
async def test_success_merges_core_and_radio(self):
"""Should merge STATS_CORE and STATS_RADIO payloads."""
from meshcore.events import Event, EventType
pub = CommunityMqttPublisher()
mc_mock = MagicMock()
mc_mock.commands.get_stats_core = AsyncMock(
return_value=Event(
EventType.STATS_CORE,
{"battery_mv": 4200, "uptime_secs": 3600, "errors": 0, "queue_len": 0},
)
)
mc_mock.commands.get_stats_radio = AsyncMock(
return_value=Event(
EventType.STATS_RADIO,
{
"noise_floor": -120,
"last_rssi": -85,
"last_snr": 10.5,
"tx_air_secs": 42,
"rx_air_secs": 150,
},
)
)
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = _mock_radio_operation(mc_mock)
result = await pub._fetch_stats()
assert result is not None
assert result["battery_mv"] == 4200
assert result["noise_floor"] == -120
assert result["tx_air_secs"] == 42
@pytest.mark.asyncio
async def test_core_error_sets_stats_unsupported(self):
"""Should set _stats_supported=False when STATS_CORE returns ERROR."""
from meshcore.events import Event, EventType
pub = CommunityMqttPublisher()
mc_mock = MagicMock()
mc_mock.commands.get_stats_core = AsyncMock(return_value=Event(EventType.ERROR, {}))
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = _mock_radio_operation(mc_mock)
result = await pub._fetch_stats()
assert pub._stats_supported is False
assert result is None # no cached stats yet
@pytest.mark.asyncio
async def test_radio_error_sets_stats_unsupported(self):
"""Should set _stats_supported=False when STATS_RADIO returns ERROR."""
from meshcore.events import Event, EventType
pub = CommunityMqttPublisher()
mc_mock = MagicMock()
mc_mock.commands.get_stats_core = AsyncMock(
return_value=Event(
EventType.STATS_CORE,
{"battery_mv": 4200, "uptime_secs": 3600, "errors": 0, "queue_len": 0},
)
)
mc_mock.commands.get_stats_radio = AsyncMock(return_value=Event(EventType.ERROR, {}))
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = _mock_radio_operation(mc_mock)
await pub._fetch_stats()
assert pub._stats_supported is False
@pytest.mark.asyncio
async def test_stats_unsupported_skips_radio(self):
"""When _stats_supported=False, should return cached stats without radio call."""
pub = CommunityMqttPublisher()
pub._stats_supported = False
pub._cached_stats = {"battery_mv": 4000}
result = await pub._fetch_stats()
assert result == {"battery_mv": 4000}
@pytest.mark.asyncio
async def test_cache_guard_prevents_refetch(self):
"""Should return cached stats when within cache window."""
pub = CommunityMqttPublisher()
pub._cached_stats = {"battery_mv": 4200}
pub._last_stats_fetch = time.monotonic() # Just fetched
result = await pub._fetch_stats()
assert result == {"battery_mv": 4200}
@pytest.mark.asyncio
async def test_radio_busy_returns_cached(self):
"""Should return cached stats when radio is busy."""
from app.radio import RadioOperationBusyError
pub = CommunityMqttPublisher()
pub._cached_stats = {"battery_mv": 3900}
with patch("app.radio.radio_manager") as mock_rm:
mock_rm.radio_operation = MagicMock(side_effect=RadioOperationBusyError("busy"))
result = await pub._fetch_stats()
assert result == {"battery_mv": 3900}
class TestBuildRadioInfo:
def test_formatted_string(self):
"""Should return comma-separated radio info matching reference format."""
mock_radio = MagicMock()
mock_radio.meshcore = MagicMock()
mock_radio.meshcore.self_info = {
"radio_freq": 915.0,
"radio_bw": 250.0,
"radio_sf": 10,
"radio_cr": 8,
}
with patch("app.radio.radio_manager", mock_radio):
result = _build_radio_info()
assert result == "915.0,250.0,10,8"
def test_fallback_when_no_meshcore(self):
"""Should return '0,0,0,0' when meshcore is None."""
mock_radio = MagicMock()
mock_radio.meshcore = None
with patch("app.radio.radio_manager", mock_radio):
result = _build_radio_info()
assert result == "0,0,0,0"
def test_fallback_when_self_info_missing_fields(self):
"""Should use 0 defaults when self_info lacks radio fields."""
mock_radio = MagicMock()
mock_radio.meshcore = MagicMock()
mock_radio.meshcore.self_info = {"name": "TestNode"}
with patch("app.radio.radio_manager", mock_radio):
result = _build_radio_info()
assert result == "0,0,0,0"
class TestGetClientVersion:
def test_returns_remoteterm_prefix(self):
"""Should return 'RemoteTerm ...' string."""
result = _get_client_version()
assert result.startswith("RemoteTerm ")
def test_returns_version_from_metadata(self):
"""Should use importlib.metadata to get version."""
with patch("app.community_mqtt.importlib.metadata.version", return_value="1.2.3"):
result = _get_client_version()
assert result == "RemoteTerm 1.2.3"
def test_fallback_on_error(self):
"""Should return 'RemoteTerm unknown' if metadata lookup fails."""
with patch(
"app.community_mqtt.importlib.metadata.version", side_effect=Exception("not found")
):
result = _get_client_version()
assert result == "RemoteTerm unknown"
class TestPublishStatus:
@pytest.mark.asyncio
async def test_enriched_payload_fields(self):
"""_publish_status should include all enriched fields."""
pub = CommunityMqttPublisher()
_, public_key = _make_test_keys()
pubkey_hex = public_key.hex().upper()
settings = AppSettings(community_mqtt_enabled=True, community_mqtt_iata="LAX")
mock_radio = MagicMock()
mock_radio.meshcore = MagicMock()
mock_radio.meshcore.self_info = {"name": "TestNode"}
stats = {"battery_mv": 4200, "uptime_secs": 3600, "noise_floor": -120}
with (
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
patch.object(
pub,
"_fetch_device_info",
new_callable=AsyncMock,
return_value={"model": "T-Deck", "firmware_version": "v2.2.2 (Build: 2025-01-15)"},
),
patch.object(pub, "_fetch_stats", new_callable=AsyncMock, return_value=stats),
patch("app.community_mqtt._build_radio_info", return_value="915.0,250.0,10,8"),
patch("app.community_mqtt._get_client_version", return_value="RemoteTerm 2.4.0"),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._publish_status(settings)
payload = mock_publish.call_args[0][1]
assert payload["status"] == "online"
assert payload["origin"] == "TestNode"
assert payload["origin_id"] == pubkey_hex
assert "client" not in payload
assert payload["model"] == "T-Deck"
assert payload["firmware_version"] == "v2.2.2 (Build: 2025-01-15)"
assert payload["radio"] == "915.0,250.0,10,8"
assert payload["client_version"] == "RemoteTerm 2.4.0"
assert payload["stats"] == stats
@pytest.mark.asyncio
async def test_stats_omitted_when_none(self):
"""Should not include 'stats' key when stats are None."""
pub = CommunityMqttPublisher()
_, public_key = _make_test_keys()
settings = AppSettings(community_mqtt_enabled=True, community_mqtt_iata="LAX")
mock_radio = MagicMock()
mock_radio.meshcore = None
with (
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
patch.object(
pub,
"_fetch_device_info",
new_callable=AsyncMock,
return_value={"model": "unknown", "firmware_version": "unknown"},
),
patch.object(pub, "_fetch_stats", new_callable=AsyncMock, return_value=None),
patch("app.community_mqtt._build_radio_info", return_value="0,0,0,0"),
patch("app.community_mqtt._get_client_version", return_value="RemoteTerm unknown"),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._publish_status(settings)
payload = mock_publish.call_args[0][1]
assert "stats" not in payload
@pytest.mark.asyncio
async def test_updates_last_status_publish(self):
"""Should update _last_status_publish after publishing."""
pub = CommunityMqttPublisher()
_, public_key = _make_test_keys()
settings = AppSettings(community_mqtt_enabled=True, community_mqtt_iata="LAX")
mock_radio = MagicMock()
mock_radio.meshcore = None
before = time.monotonic()
with (
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager", mock_radio),
patch.object(
pub,
"_fetch_device_info",
new_callable=AsyncMock,
return_value={"model": "unknown", "firmware_version": "unknown"},
),
patch.object(pub, "_fetch_stats", new_callable=AsyncMock, return_value=None),
patch("app.community_mqtt._build_radio_info", return_value="0,0,0,0"),
patch("app.community_mqtt._get_client_version", return_value="RemoteTerm unknown"),
patch.object(pub, "publish", new_callable=AsyncMock),
):
await pub._publish_status(settings)
assert pub._last_status_publish >= before
@pytest.mark.asyncio
async def test_no_publish_key_returns_none(self):
"""Should skip publish when public key is unavailable."""
pub = CommunityMqttPublisher()
settings = AppSettings(community_mqtt_enabled=True, community_mqtt_iata="LAX")
with (
patch("app.keystore.get_public_key", return_value=None),
patch.object(pub, "publish", new_callable=AsyncMock) as mock_publish,
):
await pub._publish_status(settings)
mock_publish.assert_not_called()
class TestPeriodicWake:
@pytest.mark.asyncio
async def test_skips_before_interval(self):
"""Should not republish before _STATS_REFRESH_INTERVAL."""
pub = CommunityMqttPublisher()
pub._settings = AppSettings(community_mqtt_enabled=True, community_mqtt_iata="LAX")
pub._last_status_publish = time.monotonic() # Just published
with patch.object(pub, "_publish_status", new_callable=AsyncMock) as mock_ps:
await pub._on_periodic_wake(60.0)
mock_ps.assert_not_called()
@pytest.mark.asyncio
async def test_publishes_after_interval(self):
"""Should republish after _STATS_REFRESH_INTERVAL elapsed."""
pub = CommunityMqttPublisher()
pub._settings = AppSettings(community_mqtt_enabled=True, community_mqtt_iata="LAX")
pub._last_status_publish = time.monotonic() - _STATS_REFRESH_INTERVAL - 1
with patch.object(pub, "_publish_status", new_callable=AsyncMock) as mock_ps:
await pub._on_periodic_wake(360.0)
mock_ps.assert_called_once_with(pub._settings, refresh_stats=True)
@pytest.mark.asyncio
async def test_skips_when_no_settings(self):
"""Should no-op when settings are None."""
pub = CommunityMqttPublisher()
pub._settings = None
with patch.object(pub, "_publish_status", new_callable=AsyncMock) as mock_ps:
await pub._on_periodic_wake(360.0)
mock_ps.assert_not_called()