Drain before autofetch, fix same-second collisions, and always mc.disconnect() on false/probe failure

This commit is contained in:
Jack Kingsman
2026-02-23 17:33:35 -08:00
parent 619973bdf0
commit cba9e20698
5 changed files with 427 additions and 16 deletions

View File

@@ -70,6 +70,7 @@ def detect_serial_devices() -> list[str]:
async def test_serial_device(port: str, baudrate: int, timeout: float = 3.0) -> bool:
"""Test if a MeshCore radio responds on the given serial port."""
mc = None
try:
logger.debug("Testing serial device %s", port)
mc = await asyncio.wait_for(
@@ -80,10 +81,8 @@ async def test_serial_device(port: str, baudrate: int, timeout: float = 3.0) ->
# Check if we got valid self_info (indicates successful communication)
if mc.is_connected and mc.self_info:
logger.debug("Device %s responded with valid self_info", port)
await mc.disconnect()
return True
await mc.disconnect()
return False
except asyncio.TimeoutError:
logger.debug("Device %s timed out", port)
@@ -91,6 +90,12 @@ async def test_serial_device(port: str, baudrate: int, timeout: float = 3.0) ->
except Exception as e:
logger.debug("Device %s failed: %s", port, e)
return False
finally:
if mc is not None:
try:
await mc.disconnect()
except Exception:
pass
async def find_radio_port(baudrate: int) -> str | None:
@@ -251,14 +256,16 @@ class RadioManager:
# Start periodic advertisement (idempotent)
start_periodic_advert()
await self._meshcore.start_auto_message_fetching()
logger.info("Auto message fetching started")
# Drain any messages that were queued before we connected
# Drain any messages that were queued before we connected.
# This must happen BEFORE starting auto-fetch, otherwise both
# compete on get_msg() with interleaved radio I/O.
drained = await drain_pending_messages()
if drained > 0:
logger.info("Drained %d pending message(s)", drained)
await self._meshcore.start_auto_message_fetching()
logger.info("Auto message fetching started")
# Start periodic message polling as fallback (idempotent)
start_message_polling()
finally:

View File

@@ -50,7 +50,11 @@ function mergePendingAck(
// Generate a key for deduplicating messages by content
export function getMessageContentKey(msg: Message): string {
return `${msg.type}-${msg.conversation_key}-${msg.text}-${msg.sender_timestamp}`;
// When sender_timestamp exists, dedup by content (catches radio-path duplicates with different IDs).
// When null, include msg.id so each message gets a unique key — avoids silently dropping
// different messages that share the same text and received_at second.
const ts = msg.sender_timestamp ?? `r${msg.received_at}-${msg.id}`;
return `${msg.type}-${msg.conversation_key}-${msg.text}-${ts}`;
}
export interface UseConversationMessagesResult {

View File

@@ -86,12 +86,13 @@ describe('getMessageContentKey', () => {
expect(getMessageContentKey(msg1)).toBe(getMessageContentKey(msg2));
});
it('handles null sender_timestamp', () => {
it('handles null sender_timestamp by falling back to received_at and id', () => {
const msg = createMessage({ sender_timestamp: null });
const key = getMessageContentKey(msg);
expect(key).toBe('CHAN-channel123-Hello world-null');
// Falls back to `r${received_at}-${id}` when sender_timestamp is null
expect(key).toBe('CHAN-channel123-Hello world-r1700000001-1');
});
it('handles empty text', () => {
@@ -109,6 +110,54 @@ describe('getMessageContentKey', () => {
expect(key).toContain('Hello: World! @user #channel');
});
it('null-timestamp messages with different received_at produce different keys', () => {
const msg1 = createMessage({ sender_timestamp: null, received_at: 1700000001 });
const msg2 = createMessage({ sender_timestamp: null, received_at: 1700000002 });
expect(getMessageContentKey(msg1)).not.toBe(getMessageContentKey(msg2));
});
it('null-timestamp key does not collide with numeric timestamp key', () => {
// A message with sender_timestamp=null and received_at=123
// should not match a message with sender_timestamp that looks similar
const nullTsMsg = createMessage({ sender_timestamp: null, received_at: 123 });
const numericTsMsg = createMessage({ sender_timestamp: 123 });
expect(getMessageContentKey(nullTsMsg)).not.toBe(getMessageContentKey(numericTsMsg));
});
it('same text and null timestamp but different conversations produce different keys', () => {
const msg1 = createMessage({
sender_timestamp: null,
conversation_key: 'chan1',
received_at: 1700000001,
});
const msg2 = createMessage({
sender_timestamp: null,
conversation_key: 'chan2',
received_at: 1700000001,
});
expect(getMessageContentKey(msg1)).not.toBe(getMessageContentKey(msg2));
});
it('null-timestamp messages with same text and same received_at but different ids produce different keys', () => {
// This is the key fix: two genuinely different messages arriving in the same second
// with null sender_timestamp must NOT collide, even if text is identical
const msg1 = createMessage({ id: 10, sender_timestamp: null, received_at: 1700000001 });
const msg2 = createMessage({ id: 11, sender_timestamp: null, received_at: 1700000001 });
expect(getMessageContentKey(msg1)).not.toBe(getMessageContentKey(msg2));
});
it('null-timestamp messages with same id produce same key (true duplicates dedup)', () => {
// Same message arriving via WS + API fetch has the same id — should still dedup
const msg1 = createMessage({ id: 42, sender_timestamp: null, received_at: 1700000001 });
const msg2 = createMessage({ id: 42, sender_timestamp: null, received_at: 1700000001 });
expect(getMessageContentKey(msg1)).toBe(getMessageContentKey(msg2));
});
});
describe('updateMessageAck logic', () => {

View File

@@ -1,7 +1,5 @@
"""Tests for RadioManager multi-transport connect dispatch.
These tests verify that connect() routes to the correct transport method
based on settings.connection_type, and that connection_info is set correctly.
"""Tests for RadioManager multi-transport connect dispatch, serial device
testing, and post-connect setup ordering.
"""
import asyncio
@@ -219,8 +217,232 @@ class TestConnectionMonitor:
# Should report connection lost, but not report healthy until setup succeeds.
mock_broadcast_health.assert_any_call(False, "Serial: /dev/ttyUSB0")
healthy_calls = [
call for call in mock_broadcast_health.call_args_list if call.args[0] is True
]
healthy_calls = [c for c in mock_broadcast_health.call_args_list if c.args[0] is True]
assert healthy_calls == []
assert rm._last_connected is False
class TestSerialDeviceProbe:
"""Tests for test_serial_device() — verifies cleanup on all exit paths."""
@pytest.mark.asyncio
async def test_success_returns_true_and_disconnects(self):
"""Successful probe returns True and always disconnects."""
from app.radio import test_serial_device
mock_mc = MagicMock()
mock_mc.is_connected = True
mock_mc.self_info = {"name": "MyNode"}
mock_mc.disconnect = AsyncMock()
with patch("app.radio.MeshCore") as mock_meshcore:
mock_meshcore.create_serial = AsyncMock(return_value=mock_mc)
result = await test_serial_device("/dev/ttyUSB0", 115200)
assert result is True
mock_mc.disconnect.assert_awaited_once()
@pytest.mark.asyncio
async def test_not_connected_returns_false_and_disconnects(self):
"""Device that connects but reports is_connected=False still disconnects."""
from app.radio import test_serial_device
mock_mc = MagicMock()
mock_mc.is_connected = False
mock_mc.self_info = None
mock_mc.disconnect = AsyncMock()
with patch("app.radio.MeshCore") as mock_meshcore:
mock_meshcore.create_serial = AsyncMock(return_value=mock_mc)
result = await test_serial_device("/dev/ttyUSB0", 115200)
assert result is False
mock_mc.disconnect.assert_awaited_once()
@pytest.mark.asyncio
async def test_no_self_info_returns_false_and_disconnects(self):
"""Connected but no self_info returns False; still disconnects."""
from app.radio import test_serial_device
mock_mc = MagicMock()
mock_mc.is_connected = True
mock_mc.self_info = None
mock_mc.disconnect = AsyncMock()
with patch("app.radio.MeshCore") as mock_meshcore:
mock_meshcore.create_serial = AsyncMock(return_value=mock_mc)
result = await test_serial_device("/dev/ttyUSB0", 115200)
assert result is False
mock_mc.disconnect.assert_awaited_once()
@pytest.mark.asyncio
async def test_timeout_returns_false_no_disconnect_needed(self):
"""asyncio.TimeoutError before create_serial completes — mc is None, no disconnect."""
from app.radio import test_serial_device
with patch("app.radio.MeshCore") as mock_meshcore:
mock_meshcore.create_serial = AsyncMock(side_effect=asyncio.TimeoutError)
result = await test_serial_device("/dev/ttyUSB0", 115200, timeout=0.1)
assert result is False
@pytest.mark.asyncio
async def test_exception_returns_false_and_disconnects(self):
"""If create_serial succeeds but subsequent code raises, disconnect still runs."""
from app.radio import test_serial_device
mock_mc = MagicMock()
# Accessing is_connected raises (simulates corrupted state)
type(mock_mc).is_connected = property(lambda self: (_ for _ in ()).throw(OSError("oops")))
mock_mc.disconnect = AsyncMock()
with patch("app.radio.MeshCore") as mock_meshcore:
mock_meshcore.create_serial = AsyncMock(return_value=mock_mc)
result = await test_serial_device("/dev/ttyUSB0", 115200)
assert result is False
mock_mc.disconnect.assert_awaited_once()
@pytest.mark.asyncio
async def test_disconnect_exception_is_swallowed(self):
"""If disconnect() itself raises, the exception does not propagate."""
from app.radio import test_serial_device
mock_mc = MagicMock()
mock_mc.is_connected = True
mock_mc.self_info = {"name": "MyNode"}
mock_mc.disconnect = AsyncMock(side_effect=OSError("port closed"))
with patch("app.radio.MeshCore") as mock_meshcore:
mock_meshcore.create_serial = AsyncMock(return_value=mock_mc)
result = await test_serial_device("/dev/ttyUSB0", 115200)
# Should still return True despite disconnect failure
assert result is True
mock_mc.disconnect.assert_awaited_once()
class TestPostConnectSetupOrdering:
"""Tests for post_connect_setup() — verifies drain-before-auto-fetch ordering."""
@pytest.mark.asyncio
async def test_drain_runs_before_auto_fetch(self):
"""drain_pending_messages must be called BEFORE start_auto_message_fetching."""
from app.radio import RadioManager
rm = RadioManager()
mock_mc = MagicMock()
mock_mc.start_auto_message_fetching = AsyncMock()
rm._meshcore = mock_mc
call_order = []
async def mock_drain():
call_order.append("drain")
return 0
async def mock_start_auto():
call_order.append("auto_fetch")
mock_mc.start_auto_message_fetching = AsyncMock(side_effect=mock_start_auto)
with (
patch("app.event_handlers.register_event_handlers"),
patch("app.keystore.export_and_store_private_key", new_callable=AsyncMock),
patch("app.radio_sync.sync_radio_time", new_callable=AsyncMock),
patch("app.radio_sync.sync_and_offload_all", new_callable=AsyncMock, return_value={}),
patch("app.radio_sync.start_periodic_sync"),
patch("app.radio_sync.send_advertisement", new_callable=AsyncMock, return_value=False),
patch("app.radio_sync.start_periodic_advert"),
patch(
"app.radio_sync.drain_pending_messages",
new_callable=AsyncMock,
side_effect=mock_drain,
),
patch("app.radio_sync.start_message_polling"),
):
await rm.post_connect_setup()
assert call_order == ["drain", "auto_fetch"], (
f"Expected drain before auto_fetch, got: {call_order}"
)
@pytest.mark.asyncio
async def test_setup_sets_and_clears_in_progress_flag(self):
"""is_setup_in_progress is True during setup and False after."""
from app.radio import RadioManager
rm = RadioManager()
mock_mc = MagicMock()
mock_mc.start_auto_message_fetching = AsyncMock()
rm._meshcore = mock_mc
observed_during = None
async def mock_drain():
nonlocal observed_during
observed_during = rm.is_setup_in_progress
return 0
with (
patch("app.event_handlers.register_event_handlers"),
patch("app.keystore.export_and_store_private_key", new_callable=AsyncMock),
patch("app.radio_sync.sync_radio_time", new_callable=AsyncMock),
patch("app.radio_sync.sync_and_offload_all", new_callable=AsyncMock, return_value={}),
patch("app.radio_sync.start_periodic_sync"),
patch("app.radio_sync.send_advertisement", new_callable=AsyncMock, return_value=False),
patch("app.radio_sync.start_periodic_advert"),
patch(
"app.radio_sync.drain_pending_messages",
new_callable=AsyncMock,
side_effect=mock_drain,
),
patch("app.radio_sync.start_message_polling"),
):
await rm.post_connect_setup()
assert observed_during is True
assert rm.is_setup_in_progress is False
@pytest.mark.asyncio
async def test_setup_clears_in_progress_flag_on_failure(self):
"""is_setup_in_progress is cleared even if setup raises."""
from app.radio import RadioManager
rm = RadioManager()
mock_mc = MagicMock()
mock_mc.start_auto_message_fetching = AsyncMock()
rm._meshcore = mock_mc
with (
patch("app.event_handlers.register_event_handlers"),
patch("app.keystore.export_and_store_private_key", new_callable=AsyncMock),
patch(
"app.radio_sync.sync_radio_time",
new_callable=AsyncMock,
side_effect=RuntimeError("clock failed"),
),
):
with pytest.raises(RuntimeError, match="clock failed"):
await rm.post_connect_setup()
assert rm.is_setup_in_progress is False
@pytest.mark.asyncio
async def test_setup_noop_when_no_meshcore(self):
"""post_connect_setup does nothing when meshcore is None."""
from app.radio import RadioManager
rm = RadioManager()
rm._meshcore = None
# Should not raise or call any functions
await rm.post_connect_setup()
assert rm.is_setup_in_progress is False

View File

@@ -346,6 +346,135 @@ class TestTelemetryRoute:
mc.stop_auto_message_fetching.assert_awaited_once()
mc.start_auto_message_fetching.assert_awaited_once()
@pytest.mark.asyncio
async def test_full_success_with_neighbors_acl_and_clock(self, test_db):
"""Full telemetry success: status, neighbors (name-resolved), ACL (with perm names), clock."""
mc = _mock_mc()
# Insert the repeater itself
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
# Insert a known neighbor so name resolution works
neighbor_key = "bb" * 32
await _insert_contact(neighbor_key, name="NeighborNode", contact_type=1)
mc.commands.req_status_sync = AsyncMock(
return_value={
"pubkey_pre": KEY_A[:12],
"bat": 4200,
"uptime": 86400,
"tx_queue_len": 2,
"noise_floor": -120,
"last_rssi": -85,
"last_snr": 7.5,
"nb_recv": 1000,
"nb_sent": 500,
"airtime": 3600,
"rx_airtime": 7200,
"sent_flood": 100,
"sent_direct": 400,
"recv_flood": 300,
"recv_direct": 700,
"flood_dups": 10,
"direct_dups": 5,
"full_evts": 0,
}
)
mc.commands.fetch_all_neighbours = AsyncMock(
return_value={
"neighbours": [
{"pubkey": neighbor_key[:12], "snr": 9.0, "secs_ago": 5},
{"pubkey": "cccccccccccc", "snr": 3.0, "secs_ago": 120},
]
}
)
mc.commands.req_acl_sync = AsyncMock(
return_value=[
{"key": neighbor_key[:12], "perm": 3},
{"key": "dddddddddddd", "perm": 0},
]
)
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
mc.commands.get_msg = AsyncMock(
return_value=_radio_result(
EventType.CONTACT_MSG_RECV,
{
"pubkey_prefix": KEY_A[:12],
"text": "2026-02-23 12:00:00 UTC",
"txt_type": 1,
},
)
)
with (
patch("app.routers.contacts.require_connected", return_value=mc),
patch(
"app.routers.contacts.prepare_repeater_connection",
new_callable=AsyncMock,
),
patch(_MONOTONIC, side_effect=_advancing_clock()),
):
response = await request_telemetry(KEY_A, TelemetryRequest(password="pw"))
# Status fields
assert response.pubkey_prefix == KEY_A[:12]
assert response.battery_volts == 4.2
assert response.uptime_seconds == 86400
assert response.packets_received == 1000
assert response.packets_sent == 500
assert response.noise_floor_dbm == -120
assert response.last_rssi_dbm == -85
assert response.last_snr_db == 7.5
# Neighbors — first resolved by name, second unknown
assert len(response.neighbors) == 2
assert response.neighbors[0].name == "NeighborNode"
assert response.neighbors[0].snr == 9.0
assert response.neighbors[1].name is None
assert response.neighbors[1].last_heard_seconds == 120
# ACL — first resolved, permission names mapped
assert len(response.acl) == 2
assert response.acl[0].name == "NeighborNode"
assert response.acl[0].permission_name == "Admin"
assert response.acl[1].name is None
assert response.acl[1].permission_name == "Guest"
# Clock
assert response.clock_output == "2026-02-23 12:00:00 UTC"
@pytest.mark.asyncio
async def test_empty_neighbors_and_acl(self, test_db):
"""Telemetry with empty neighbor list and ACL still succeeds."""
mc = _mock_mc()
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
mc.commands.req_status_sync = AsyncMock(
return_value={"pubkey_pre": KEY_A[:12], "bat": 3700, "uptime": 100}
)
mc.commands.fetch_all_neighbours = AsyncMock(return_value={"neighbours": []})
mc.commands.req_acl_sync = AsyncMock(return_value=[])
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
mc.commands.get_msg = AsyncMock(
return_value=_radio_result(
EventType.CONTACT_MSG_RECV,
{"pubkey_prefix": KEY_A[:12], "text": "12:00", "txt_type": 1},
)
)
with (
patch("app.routers.contacts.require_connected", return_value=mc),
patch(
"app.routers.contacts.prepare_repeater_connection",
new_callable=AsyncMock,
),
patch(_MONOTONIC, side_effect=_advancing_clock()),
):
response = await request_telemetry(KEY_A, TelemetryRequest(password="pw"))
assert response.battery_volts == 3.7
assert response.neighbors == []
assert response.acl == []
assert response.clock_output == "12:00"
class TestRepeaterCommandRoute:
@pytest.mark.asyncio