From 88d5a76081fde5a5583db9457e7afb7cfe661c7b Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Mon, 23 Feb 2026 15:56:45 -0800 Subject: [PATCH] Better behavior and message tracking around repeater contact on a busy mesh --- app/routers/contacts.py | 179 +++++++++++++------- tests/test_repeater_routes.py | 307 ++++++++++++++++++++++++++++++++-- 2 files changed, 418 insertions(+), 68 deletions(-) diff --git a/app/routers/contacts.py b/app/routers/contacts.py index 5a0a439..2a374f7 100644 --- a/app/routers/contacts.py +++ b/app/routers/contacts.py @@ -1,6 +1,8 @@ import asyncio import logging import random +import time +from typing import TYPE_CHECKING from fastapi import APIRouter, BackgroundTasks, HTTPException, Query from meshcore import EventType @@ -22,6 +24,9 @@ from app.packet_processor import start_historical_dm_decryption from app.radio import radio_manager from app.repository import AmbiguousPublicKeyPrefixError, ContactRepository, MessageRepository +if TYPE_CHECKING: + from meshcore.events import Event + logger = logging.getLogger(__name__) # ACL permission level names @@ -37,6 +42,86 @@ router = APIRouter(prefix="/contacts", tags=["contacts"]) REPEATER_OP_DELAY_SECONDS = 2.0 +def _monotonic() -> float: + """Wrapper around time.monotonic() for testability. + + Patching time.monotonic directly breaks the asyncio event loop which also + uses it. This indirection allows tests to control the clock safely. + """ + return time.monotonic() + + +async def _fetch_repeater_response( + mc, + target_pubkey_prefix: str, + timeout: float = 20.0, +) -> "Event | None": + """Fetch a CLI response from a specific repeater via a validated get_msg() loop. + + Calls get_msg() repeatedly until a matching CLI response (txt_type=1) from the + target repeater arrives or the wall-clock deadline expires. Unrelated messages + are safe to skip — meshcore's event dispatcher already delivers them to the + normal subscription handlers (on_contact_message, etc.) when get_msg() returns. + + Args: + mc: MeshCore instance + target_pubkey_prefix: 12-char hex prefix of the repeater's public key + timeout: Wall-clock seconds before giving up + + Returns: + The matching Event, or None if no response arrived before the deadline. + """ + deadline = _monotonic() + timeout + + while _monotonic() < deadline: + try: + result = await mc.commands.get_msg(timeout=2.0) + except asyncio.TimeoutError: + continue + except Exception as e: + logger.debug("get_msg() exception: %s", e) + await asyncio.sleep(1.0) + continue + + if result.type == EventType.NO_MORE_MSGS: + # No messages queued yet — wait and retry + await asyncio.sleep(1.0) + continue + + if result.type == EventType.ERROR: + logger.debug("get_msg() error: %s", result.payload) + await asyncio.sleep(1.0) + continue + + if result.type == EventType.CONTACT_MSG_RECV: + msg_prefix = result.payload.get("pubkey_prefix", "") + txt_type = result.payload.get("txt_type", 0) + if msg_prefix == target_pubkey_prefix and txt_type == 1: + return result + # Not our target — already dispatched to subscribers by meshcore, + # so just continue draining the queue. + logger.debug( + "Skipping non-target message (from=%s, txt_type=%d) while waiting for %s", + msg_prefix, + txt_type, + target_pubkey_prefix, + ) + continue + + if result.type == EventType.CHANNEL_MSG_RECV: + # Already dispatched to subscribers by meshcore; skip. + logger.debug( + "Skipping channel message (channel_idx=%s) during repeater fetch", + result.payload.get("channel_idx"), + ) + continue + + logger.debug("Unexpected event type %s during repeater fetch, skipping", result.type) + + logger.warning("No CLI response from repeater %s within %.1fs", target_pubkey_prefix, timeout) + return None + + def _ambiguous_contact_detail(err: AmbiguousPublicKeyPrefixError) -> str: sample = ", ".join(key[:12] for key in err.matches[:2]) return ( @@ -402,30 +487,22 @@ async def request_telemetry(public_key: str, request: TelemetryRequest) -> Telem clock_output: str | None = None for attempt in range(1, 3): logger.debug("Clock request attempt %d/2", attempt) - try: - send_result = await mc.commands.send_cmd(contact.public_key, "clock") - if send_result.type == EventType.ERROR: - logger.debug("Clock command send error: %s", send_result.payload) - continue - - # Wait for response - wait_result = await mc.wait_for_event(EventType.MESSAGES_WAITING, timeout=5.0) - if wait_result is None: - logger.debug("Clock request timeout, retrying...") - continue - - response_event = await mc.commands.get_msg() - if response_event.type == EventType.ERROR: - logger.debug("Clock get_msg error: %s", response_event.payload) - continue - - clock_output = response_event.payload.get("text", "") - logger.info("Received clock output: %s", clock_output) - break - except Exception as e: - logger.debug("Clock request exception: %s", e) + send_result = await mc.commands.send_cmd(contact.public_key, "clock") + if send_result.type == EventType.ERROR: + logger.debug("Clock command send error: %s", send_result.payload) continue + response_event = await _fetch_repeater_response( + mc, contact.public_key[:12], timeout=10.0 + ) + if response_event is None: + logger.debug("Clock request timeout, retrying...") + continue + + clock_output = response_event.payload.get("text", "") + logger.info("Received clock output: %s", clock_output) + break + if clock_output is None: clock_output = "Unable to fetch `clock` output (repeater did not respond)" @@ -507,48 +584,34 @@ async def send_repeater_command(public_key: str, request: CommandRequest) -> Com status_code=500, detail=f"Failed to send command: {send_result.payload}" ) - # Wait for response (MESSAGES_WAITING event, then get_msg) - try: - wait_result = await mc.wait_for_event(EventType.MESSAGES_WAITING, timeout=10.0) + # Wait for response using validated fetch loop + response_event = await _fetch_repeater_response(mc, contact.public_key[:12]) - if wait_result is None: - # Timeout - no response received - logger.warning( - "No response from repeater %s for command: %s", - contact.public_key[:12], - request.command, - ) - return CommandResponse( - command=request.command, - response="(no response - command may have been processed)", - ) - - response_event = await mc.commands.get_msg() - - if response_event.type == EventType.ERROR: - return CommandResponse( - command=request.command, response=f"(error: {response_event.payload})" - ) - - # CONTACT_MSG_RECV payloads use sender_timestamp in meshcore. - response_text = response_event.payload.get("text", str(response_event.payload)) - sender_timestamp = response_event.payload.get( - "sender_timestamp", - response_event.payload.get("timestamp"), + if response_event is None: + logger.warning( + "No response from repeater %s for command: %s", + contact.public_key[:12], + request.command, ) - logger.info("Received response from %s: %s", contact.public_key[:12], response_text) - return CommandResponse( command=request.command, - response=response_text, - sender_timestamp=sender_timestamp, - ) - except Exception as e: - logger.error("Error waiting for response: %s", e) - return CommandResponse( - command=request.command, response=f"(error waiting for response: {e})" + response="(no response - command may have been processed)", ) + # CONTACT_MSG_RECV payloads use sender_timestamp in meshcore. + response_text = response_event.payload.get("text", str(response_event.payload)) + sender_timestamp = response_event.payload.get( + "sender_timestamp", + response_event.payload.get("timestamp"), + ) + logger.info("Received response from %s: %s", contact.public_key[:12], response_text) + + return CommandResponse( + command=request.command, + response=response_text, + sender_timestamp=sender_timestamp, + ) + @router.post("/{public_key}/trace", response_model=TraceResponse) async def request_trace(public_key: str) -> TraceResponse: diff --git a/tests/test_repeater_routes.py b/tests/test_repeater_routes.py index e4db4be..09f29dd 100644 --- a/tests/test_repeater_routes.py +++ b/tests/test_repeater_routes.py @@ -9,10 +9,19 @@ from meshcore import EventType from app.database import Database from app.models import CommandRequest, TelemetryRequest from app.repository import ContactRepository -from app.routers.contacts import request_telemetry, request_trace, send_repeater_command +from app.routers.contacts import ( + _fetch_repeater_response, + request_telemetry, + request_trace, + send_repeater_command, +) KEY_A = "aa" * 32 +# Patch target for the wall-clock wrapper used by _fetch_repeater_response. +# We patch _monotonic (not time.monotonic) to avoid breaking the asyncio event loop. +_MONOTONIC = "app.routers.contacts._monotonic" + @pytest.fixture async def test_db(): @@ -75,6 +84,181 @@ def _mock_mc(): return mc +def _advancing_clock(start=0.0, step=0.1): + """Return a callable for _monotonic that advances by `step` each call.""" + t = start + + def _tick(): + nonlocal t + val = t + t += step + return val + + return _tick + + +class TestFetchRepeaterResponse: + """Tests for the _fetch_repeater_response helper.""" + + @pytest.mark.asyncio + async def test_returns_matching_cli_response(self): + mc = _mock_mc() + mc.commands.get_msg = AsyncMock( + return_value=_radio_result( + EventType.CONTACT_MSG_RECV, + {"pubkey_prefix": "aaaaaaaaaaaa", "text": "ok", "txt_type": 1}, + ) + ) + + with patch(_MONOTONIC, side_effect=_advancing_clock()): + result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0) + + assert result is not None + assert result.payload["text"] == "ok" + mc.commands.get_msg.assert_awaited_once() + + @pytest.mark.asyncio + async def test_rejects_same_sender_non_cli_message(self): + """A txt_type=0 message from the target repeater is NOT accepted as the CLI response.""" + mc = _mock_mc() + non_cli = _radio_result( + EventType.CONTACT_MSG_RECV, + {"pubkey_prefix": "aaaaaaaaaaaa", "text": "chat msg", "txt_type": 0}, + ) + cli_response = _radio_result( + EventType.CONTACT_MSG_RECV, + {"pubkey_prefix": "aaaaaaaaaaaa", "text": "ver 1.0", "txt_type": 1}, + ) + mc.commands.get_msg = AsyncMock(side_effect=[non_cli, cli_response]) + + with patch(_MONOTONIC, side_effect=_advancing_clock()): + result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0) + + assert result is not None + assert result.payload["text"] == "ver 1.0" + assert mc.commands.get_msg.await_count == 2 + + @pytest.mark.asyncio + async def test_unrelated_dm_is_skipped(self): + """Unrelated DMs are skipped (dispatcher already handled them).""" + mc = _mock_mc() + unrelated = _radio_result( + EventType.CONTACT_MSG_RECV, + {"pubkey_prefix": "bbbbbbbbbbbb", "text": "hello", "txt_type": 0}, + ) + expected = _radio_result( + EventType.CONTACT_MSG_RECV, + {"pubkey_prefix": "aaaaaaaaaaaa", "text": "ver 1.0", "txt_type": 1}, + ) + mc.commands.get_msg = AsyncMock(side_effect=[unrelated, expected]) + + with patch(_MONOTONIC, side_effect=_advancing_clock()): + result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0) + + assert result is not None + assert result.payload["text"] == "ver 1.0" + + @pytest.mark.asyncio + async def test_channel_message_is_skipped(self): + mc = _mock_mc() + channel_msg = _radio_result( + EventType.CHANNEL_MSG_RECV, + {"channel_idx": 0, "text": "flood msg"}, + ) + expected = _radio_result( + EventType.CONTACT_MSG_RECV, + {"pubkey_prefix": "aaaaaaaaaaaa", "text": "ok", "txt_type": 1}, + ) + mc.commands.get_msg = AsyncMock(side_effect=[channel_msg, expected]) + + with patch(_MONOTONIC, side_effect=_advancing_clock()): + result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0) + + assert result is not None + assert result.payload["text"] == "ok" + + @pytest.mark.asyncio + async def test_no_more_msgs_retries_then_succeeds(self): + mc = _mock_mc() + no_msgs = _radio_result(EventType.NO_MORE_MSGS) + expected = _radio_result( + EventType.CONTACT_MSG_RECV, + {"pubkey_prefix": "aaaaaaaaaaaa", "text": "ok", "txt_type": 1}, + ) + mc.commands.get_msg = AsyncMock(side_effect=[no_msgs, expected]) + + with ( + patch(_MONOTONIC, side_effect=_advancing_clock()), + patch("app.routers.contacts.asyncio.sleep", new_callable=AsyncMock), + ): + result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0) + + assert result is not None + assert result.payload["text"] == "ok" + assert mc.commands.get_msg.await_count == 2 + + @pytest.mark.asyncio + async def test_returns_none_after_deadline(self): + """Returns None when wall-clock deadline expires.""" + mc = _mock_mc() + mc.commands.get_msg = AsyncMock(return_value=_radio_result(EventType.NO_MORE_MSGS)) + + # Start at 100.0, jump past deadline (timeout=2.0) after 2 get_msg calls + times = iter([100.0, 100.5, 101.0, 103.0]) + + with ( + patch(_MONOTONIC, side_effect=times), + patch("app.routers.contacts.asyncio.sleep", new_callable=AsyncMock), + ): + result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=2.0) + + assert result is None + + @pytest.mark.asyncio + async def test_error_retries_then_succeeds(self): + mc = _mock_mc() + error = _radio_result(EventType.ERROR, {"err": "busy"}) + expected = _radio_result( + EventType.CONTACT_MSG_RECV, + {"pubkey_prefix": "aaaaaaaaaaaa", "text": "ok", "txt_type": 1}, + ) + mc.commands.get_msg = AsyncMock(side_effect=[error, expected]) + + with ( + patch(_MONOTONIC, side_effect=_advancing_clock()), + patch("app.routers.contacts.asyncio.sleep", new_callable=AsyncMock), + ): + result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0) + + assert result is not None + assert result.payload["text"] == "ok" + + @pytest.mark.asyncio + async def test_high_traffic_does_not_exhaust_budget(self): + """Many unrelated messages don't prevent eventual success (wall-clock deadline).""" + mc = _mock_mc() + # 20 unrelated DMs followed by the expected CLI response + unrelated = [ + _radio_result( + EventType.CONTACT_MSG_RECV, + {"pubkey_prefix": f"{i:012x}", "text": f"msg {i}", "txt_type": 0}, + ) + for i in range(20) + ] + expected = _radio_result( + EventType.CONTACT_MSG_RECV, + {"pubkey_prefix": "aaaaaaaaaaaa", "text": "ver 1.0", "txt_type": 1}, + ) + mc.commands.get_msg = AsyncMock(side_effect=[*unrelated, expected]) + + with patch(_MONOTONIC, side_effect=_advancing_clock()): + result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=30.0) + + assert result is not None + assert result.payload["text"] == "ver 1.0" + assert mc.commands.get_msg.await_count == 21 + + class TestTelemetryRoute: @pytest.mark.asyncio async def test_returns_404_when_contact_missing(self, test_db): @@ -133,7 +317,15 @@ class TestTelemetryRoute: ) mc.commands.req_acl_sync = AsyncMock(return_value=[{"key": "def456abc123", "perm": 2}]) mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK)) - mc.wait_for_event = AsyncMock(side_effect=[None, None]) # two clock attempts, no response + # Clock fetch uses _fetch_repeater_response which calls get_msg() directly. + # Return NO_MORE_MSGS to simulate no clock response. + mc.commands.get_msg = AsyncMock(return_value=_radio_result(EventType.NO_MORE_MSGS)) + + # Clock is attempted twice, each with timeout=10.0. Provide enough ticks + # for the deadline to expire on each attempt. + clock_ticks = [] + for base in (0.0, 100.0): + clock_ticks.extend([base, base + 5.0, base + 11.0]) with ( patch("app.routers.contacts.require_connected", return_value=mc), @@ -141,6 +333,8 @@ class TestTelemetryRoute: "app.routers.contacts.prepare_repeater_connection", new_callable=AsyncMock, ) as mock_prepare, + patch(_MONOTONIC, side_effect=clock_ticks), + patch("app.routers.contacts.asyncio.sleep", new_callable=AsyncMock), ): response = await request_telemetry(KEY_A, TelemetryRequest(password="pw")) @@ -174,9 +368,14 @@ class TestRepeaterCommandRoute: mc = _mock_mc() await _insert_contact(KEY_A, name="Repeater", contact_type=2) mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK)) - mc.wait_for_event = AsyncMock(return_value=None) + mc.commands.get_msg = AsyncMock(return_value=_radio_result(EventType.NO_MORE_MSGS)) - with patch("app.routers.contacts.require_connected", return_value=mc): + # Expire the deadline after a couple of ticks + with ( + patch("app.routers.contacts.require_connected", return_value=mc), + patch(_MONOTONIC, side_effect=[0.0, 5.0, 25.0]), + patch("app.routers.contacts.asyncio.sleep", new_callable=AsyncMock), + ): response = await send_repeater_command(KEY_A, CommandRequest(command="ver")) assert response.command == "ver" @@ -188,15 +387,22 @@ class TestRepeaterCommandRoute: mc = _mock_mc() await _insert_contact(KEY_A, name="Repeater", contact_type=2) mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK)) - mc.wait_for_event = AsyncMock(return_value=MagicMock()) mc.commands.get_msg = AsyncMock( return_value=_radio_result( EventType.CONTACT_MSG_RECV, - {"text": "firmware: v1.2.3", "sender_timestamp": 1700000000}, + { + "pubkey_prefix": KEY_A[:12], + "text": "firmware: v1.2.3", + "sender_timestamp": 1700000000, + "txt_type": 1, + }, ) ) - with patch("app.routers.contacts.require_connected", return_value=mc): + with ( + patch("app.routers.contacts.require_connected", return_value=mc), + patch(_MONOTONIC, side_effect=_advancing_clock()), + ): response = await send_repeater_command(KEY_A, CommandRequest(command="ver")) assert response.command == "ver" @@ -208,21 +414,102 @@ class TestRepeaterCommandRoute: mc = _mock_mc() await _insert_contact(KEY_A, name="Repeater", contact_type=2) mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK)) - mc.wait_for_event = AsyncMock(return_value=MagicMock()) mc.commands.get_msg = AsyncMock( return_value=_radio_result( EventType.CONTACT_MSG_RECV, - {"text": "firmware: v1.2.3", "timestamp": 1700000000}, + { + "pubkey_prefix": KEY_A[:12], + "text": "firmware: v1.2.3", + "timestamp": 1700000000, + "txt_type": 1, + }, ) ) - with patch("app.routers.contacts.require_connected", return_value=mc): + with ( + patch("app.routers.contacts.require_connected", return_value=mc), + patch(_MONOTONIC, side_effect=_advancing_clock()), + ): response = await send_repeater_command(KEY_A, CommandRequest(command="ver")) assert response.command == "ver" assert response.response == "firmware: v1.2.3" assert response.sender_timestamp == 1700000000 + @pytest.mark.asyncio + async def test_unrelated_dm_during_command_does_not_prevent_success(self, test_db): + """Unrelated DMs arriving during command wait are skipped; correct response returned.""" + mc = _mock_mc() + await _insert_contact(KEY_A, name="Repeater", contact_type=2) + mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK)) + + unrelated = _radio_result( + EventType.CONTACT_MSG_RECV, + {"pubkey_prefix": "bbbbbbbbbbbb", "text": "hello from someone", "txt_type": 0}, + ) + expected = _radio_result( + EventType.CONTACT_MSG_RECV, + {"pubkey_prefix": KEY_A[:12], "text": "ver 1.0", "txt_type": 1}, + ) + mc.commands.get_msg = AsyncMock(side_effect=[unrelated, expected]) + + with ( + patch("app.routers.contacts.require_connected", return_value=mc), + patch(_MONOTONIC, side_effect=_advancing_clock()), + ): + response = await send_repeater_command(KEY_A, CommandRequest(command="ver")) + + assert response.command == "ver" + assert response.response == "ver 1.0" + + @pytest.mark.asyncio + async def test_channel_message_during_command_is_skipped(self, test_db): + mc = _mock_mc() + await _insert_contact(KEY_A, name="Repeater", contact_type=2) + mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK)) + + channel_msg = _radio_result( + EventType.CHANNEL_MSG_RECV, + {"channel_idx": 0, "text": "flood msg"}, + ) + expected = _radio_result( + EventType.CONTACT_MSG_RECV, + {"pubkey_prefix": KEY_A[:12], "text": "ok", "txt_type": 1}, + ) + mc.commands.get_msg = AsyncMock(side_effect=[channel_msg, expected]) + + with ( + patch("app.routers.contacts.require_connected", return_value=mc), + patch(_MONOTONIC, side_effect=_advancing_clock()), + ): + response = await send_repeater_command(KEY_A, CommandRequest(command="ver")) + + assert response.command == "ver" + assert response.response == "ok" + + @pytest.mark.asyncio + async def test_no_more_msgs_then_response_succeeds(self, test_db): + mc = _mock_mc() + await _insert_contact(KEY_A, name="Repeater", contact_type=2) + mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK)) + + no_msgs = _radio_result(EventType.NO_MORE_MSGS) + expected = _radio_result( + EventType.CONTACT_MSG_RECV, + {"pubkey_prefix": KEY_A[:12], "text": "done", "txt_type": 1}, + ) + mc.commands.get_msg = AsyncMock(side_effect=[no_msgs, expected]) + + with ( + patch("app.routers.contacts.require_connected", return_value=mc), + patch(_MONOTONIC, side_effect=_advancing_clock()), + patch("app.routers.contacts.asyncio.sleep", new_callable=AsyncMock), + ): + response = await send_repeater_command(KEY_A, CommandRequest(command="ver")) + + assert response.command == "ver" + assert response.response == "done" + class TestTraceRoute: @pytest.mark.asyncio