From 788d1cbdca0041d256f66595134ef7913dbe1cd1 Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Mon, 30 Mar 2026 21:13:25 -0700 Subject: [PATCH] Fix non-repeater traffic during repeater ops dropping messages --- app/routers/repeaters.py | 59 ------------------------ app/routers/server_control.py | 7 ++- tests/test_repeater_routes.py | 86 +++++++++++++++++++++-------------- 3 files changed, 58 insertions(+), 94 deletions(-) diff --git a/app/routers/repeaters.py b/app/routers/repeaters.py index fe9744f..8def3a6 100644 --- a/app/routers/repeaters.py +++ b/app/routers/repeaters.py @@ -1,9 +1,6 @@ -import asyncio import logging -from typing import TYPE_CHECKING from fastapi import APIRouter, HTTPException -from meshcore import EventType from app.dependencies import require_connected from app.models import ( @@ -28,7 +25,6 @@ from app.models import ( from app.repository import ContactRepository from app.routers.contacts import _ensure_on_radio, _resolve_contact_or_404 from app.routers.server_control import ( - _monotonic, batch_cli_fetch, extract_response_text, prepare_authenticated_contact_connection, @@ -37,9 +33,6 @@ from app.routers.server_control import ( ) from app.services.radio_runtime import radio_runtime as radio_manager -if TYPE_CHECKING: - from meshcore.events import Event - logger = logging.getLogger(__name__) # ACL permission level names @@ -57,58 +50,6 @@ def _extract_response_text(event) -> str: return extract_response_text(event) -async def _fetch_repeater_response( - mc, - target_pubkey_prefix: str, - timeout: float = 20.0, -) -> "Event | None": - deadline = _monotonic() + timeout - - while _monotonic() < deadline: - try: - result = await mc.commands.get_msg(timeout=2.0) - except asyncio.TimeoutError: - continue - except Exception as exc: - logger.debug("get_msg() exception: %s", exc) - await asyncio.sleep(1.0) - continue - - if result.type == EventType.NO_MORE_MSGS: - await asyncio.sleep(1.0) - continue - - if result.type == EventType.ERROR: - logger.debug("get_msg() error: %s", result.payload) - await asyncio.sleep(1.0) - continue - - if result.type == EventType.CONTACT_MSG_RECV: - msg_prefix = result.payload.get("pubkey_prefix", "") - txt_type = result.payload.get("txt_type", 0) - if msg_prefix == target_pubkey_prefix and txt_type == 1: - return result - logger.debug( - "Skipping non-target message (from=%s, txt_type=%d) while waiting for %s", - msg_prefix, - txt_type, - target_pubkey_prefix, - ) - continue - - if result.type == EventType.CHANNEL_MSG_RECV: - logger.debug( - "Skipping channel message (channel_idx=%s) during repeater fetch", - result.payload.get("channel_idx"), - ) - continue - - logger.debug("Unexpected event type %s during repeater fetch, skipping", result.type) - - logger.warning("No CLI response from repeater %s within %.1fs", target_pubkey_prefix, timeout) - return None - - async def prepare_repeater_connection(mc, contact: Contact, password: str) -> RepeaterLoginResponse: return await prepare_authenticated_contact_connection( mc, diff --git a/app/routers/server_control.py b/app/routers/server_control.py index b3f9e9e..a13ffca 100644 --- a/app/routers/server_control.py +++ b/app/routers/server_control.py @@ -13,6 +13,7 @@ from app.models import ( Contact, RepeaterLoginResponse, ) +from app.radio_sync import _store_pending_channel_message, _store_pending_direct_message from app.routers.contacts import _ensure_on_radio from app.services.radio_runtime import radio_runtime as radio_manager @@ -115,18 +116,20 @@ async def fetch_contact_cli_response( if msg_prefix == target_pubkey_prefix and txt_type == 1: return result logger.debug( - "Skipping non-target message (from=%s, txt_type=%d) while waiting for %s", + "Storing non-target DM (from=%s, txt_type=%d) consumed while waiting for %s", msg_prefix, txt_type, target_pubkey_prefix, ) + await _store_pending_direct_message(result) continue if result.type == EventType.CHANNEL_MSG_RECV: logger.debug( - "Skipping channel message (channel_idx=%s) during CLI fetch", + "Storing channel message (channel_idx=%s) consumed during CLI fetch", result.payload.get("channel_idx"), ) + await _store_pending_channel_message(mc, result.payload) continue logger.debug("Unexpected event type %s during CLI fetch, skipping", result.type) diff --git a/tests/test_repeater_routes.py b/tests/test_repeater_routes.py index 8d27f64..cb633b1 100644 --- a/tests/test_repeater_routes.py +++ b/tests/test_repeater_routes.py @@ -12,7 +12,6 @@ from app.repository import ContactRepository from app.routers.contacts import request_trace from app.routers.repeaters import ( _batch_cli_fetch, - _fetch_repeater_response, prepare_repeater_connection, repeater_acl, repeater_advert_intervals, @@ -25,12 +24,17 @@ from app.routers.repeaters import ( repeater_status, send_repeater_command, ) +from app.routers.server_control import fetch_contact_cli_response KEY_A = "aa" * 32 -# Patch target for the wall-clock wrapper used by _fetch_repeater_response. +# Patch target for the wall-clock wrapper used by fetch_contact_cli_response. # We patch _monotonic (not time.monotonic) to avoid breaking the asyncio event loop. -_MONOTONIC = "app.routers.repeaters._monotonic" +_MONOTONIC = "app.routers.server_control._monotonic" + +# Patch targets for the store helpers called on consumed non-target messages. +_STORE_DM = "app.routers.server_control._store_pending_direct_message" +_STORE_CHAN = "app.routers.server_control._store_pending_channel_message" @pytest.fixture(autouse=True) @@ -104,8 +108,8 @@ def _advancing_clock(start=0.0, step=0.1): return _tick -class TestFetchRepeaterResponse: - """Tests for the _fetch_repeater_response helper.""" +class TestFetchContactCliResponse: + """Tests for the fetch_contact_cli_response helper.""" @pytest.mark.asyncio async def test_returns_matching_cli_response(self): @@ -118,7 +122,7 @@ class TestFetchRepeaterResponse: ) with patch(_MONOTONIC, side_effect=_advancing_clock()): - result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0) + result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=5.0) assert result is not None assert result.payload["text"] == "ok" @@ -138,16 +142,20 @@ class TestFetchRepeaterResponse: ) mc.commands.get_msg = AsyncMock(side_effect=[non_cli, cli_response]) - with patch(_MONOTONIC, side_effect=_advancing_clock()): - result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0) + with ( + patch(_MONOTONIC, side_effect=_advancing_clock()), + patch(_STORE_DM, new_callable=AsyncMock) as store_dm, + ): + result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=5.0) assert result is not None assert result.payload["text"] == "ver 1.0" assert mc.commands.get_msg.await_count == 2 + store_dm.assert_awaited_once_with(non_cli) @pytest.mark.asyncio - async def test_unrelated_dm_is_skipped(self): - """Unrelated DMs are skipped (dispatcher already handled them).""" + async def test_unrelated_dm_is_stored(self): + """Unrelated DMs consumed during CLI fetch are stored, not discarded.""" mc = _mock_mc() unrelated = _radio_result( EventType.CONTACT_MSG_RECV, @@ -159,14 +167,18 @@ class TestFetchRepeaterResponse: ) mc.commands.get_msg = AsyncMock(side_effect=[unrelated, expected]) - with patch(_MONOTONIC, side_effect=_advancing_clock()): - result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0) + with ( + patch(_MONOTONIC, side_effect=_advancing_clock()), + patch(_STORE_DM, new_callable=AsyncMock) as store_dm, + ): + result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=5.0) assert result is not None assert result.payload["text"] == "ver 1.0" + store_dm.assert_awaited_once_with(unrelated) @pytest.mark.asyncio - async def test_channel_message_is_skipped(self): + async def test_channel_message_is_stored(self): mc = _mock_mc() channel_msg = _radio_result( EventType.CHANNEL_MSG_RECV, @@ -178,11 +190,15 @@ class TestFetchRepeaterResponse: ) mc.commands.get_msg = AsyncMock(side_effect=[channel_msg, expected]) - with patch(_MONOTONIC, side_effect=_advancing_clock()): - result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0) + with ( + patch(_MONOTONIC, side_effect=_advancing_clock()), + patch(_STORE_CHAN, new_callable=AsyncMock) as store_chan, + ): + result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=5.0) assert result is not None assert result.payload["text"] == "ok" + store_chan.assert_awaited_once_with(mc, channel_msg.payload) @pytest.mark.asyncio async def test_no_more_msgs_retries_then_succeeds(self): @@ -196,9 +212,9 @@ class TestFetchRepeaterResponse: with ( patch(_MONOTONIC, side_effect=_advancing_clock()), - patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock), + patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock), ): - result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0) + result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=5.0) assert result is not None assert result.payload["text"] == "ok" @@ -215,9 +231,9 @@ class TestFetchRepeaterResponse: with ( patch(_MONOTONIC, side_effect=times), - patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock), + patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock), ): - result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=2.0) + result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=2.0) assert result is None @@ -233,16 +249,16 @@ class TestFetchRepeaterResponse: with ( patch(_MONOTONIC, side_effect=_advancing_clock()), - patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock), + patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock), ): - result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0) + result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=5.0) assert result is not None assert result.payload["text"] == "ok" @pytest.mark.asyncio - async def test_high_traffic_does_not_exhaust_budget(self): - """Many unrelated messages don't prevent eventual success (wall-clock deadline).""" + async def test_high_traffic_stores_all_consumed_messages(self): + """Many unrelated messages are stored and don't prevent eventual success.""" mc = _mock_mc() # 20 unrelated DMs followed by the expected CLI response unrelated = [ @@ -258,12 +274,16 @@ class TestFetchRepeaterResponse: ) mc.commands.get_msg = AsyncMock(side_effect=[*unrelated, expected]) - with patch(_MONOTONIC, side_effect=_advancing_clock()): - result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=30.0) + with ( + patch(_MONOTONIC, side_effect=_advancing_clock()), + patch(_STORE_DM, new_callable=AsyncMock) as store_dm, + ): + result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=30.0) assert result is not None assert result.payload["text"] == "ver 1.0" assert mc.commands.get_msg.await_count == 21 + assert store_dm.await_count == 20 class TestRepeaterCommandRoute: @@ -297,7 +317,7 @@ class TestRepeaterCommandRoute: patch("app.routers.repeaters.require_connected", return_value=mc), patch.object(radio_manager, "_meshcore", mc), patch(_MONOTONIC, side_effect=[0.0, 5.0, 25.0]), - patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock), + patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock), ): response = await send_repeater_command(KEY_A, CommandRequest(command="ver")) @@ -457,7 +477,7 @@ class TestRepeaterCommandRoute: patch("app.routers.repeaters.require_connected", return_value=mc), patch.object(radio_manager, "_meshcore", mc), patch(_MONOTONIC, side_effect=_advancing_clock()), - patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock), + patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock), ): response = await send_repeater_command(KEY_A, CommandRequest(command="ver")) @@ -998,7 +1018,7 @@ class TestRepeaterRadioSettings: patch("app.routers.repeaters.require_connected", return_value=mc), patch.object(radio_manager, "_meshcore", mc), patch(_MONOTONIC, side_effect=clock_ticks), - patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock), + patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock), ): response = await repeater_radio_settings(KEY_A) @@ -1073,7 +1093,7 @@ class TestRepeaterNodeInfo: patch("app.routers.repeaters.require_connected", return_value=mc), patch.object(radio_manager, "_meshcore", mc), patch(_MONOTONIC, side_effect=clock_ticks), - patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock), + patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock), ): response = await repeater_node_info(KEY_A) @@ -1126,7 +1146,7 @@ class TestRepeaterAdvertIntervals: patch("app.routers.repeaters.require_connected", return_value=mc), patch.object(radio_manager, "_meshcore", mc), patch(_MONOTONIC, side_effect=clock_ticks), - patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock), + patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock), ): response = await repeater_advert_intervals(KEY_A) @@ -1181,7 +1201,7 @@ class TestRepeaterOwnerInfo: patch("app.routers.repeaters.require_connected", return_value=mc), patch.object(radio_manager, "_meshcore", mc), patch(_MONOTONIC, side_effect=clock_ticks), - patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock), + patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock), ): response = await repeater_owner_info(KEY_A) @@ -1239,7 +1259,7 @@ class TestBatchCliFetch: with ( patch.object(radio_manager, "_meshcore", mc), patch(_MONOTONIC, side_effect=_advancing_clock()), - patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock), + patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock), ): results = await _batch_cli_fetch( contact, "test_op", [("bad_cmd", "field_a"), ("good_cmd", "field_b")] @@ -1260,7 +1280,7 @@ class TestBatchCliFetch: with ( patch.object(radio_manager, "_meshcore", mc), patch(_MONOTONIC, side_effect=[0.0, 5.0, 11.0]), - patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock), + patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock), ): results = await _batch_cli_fetch(contact, "test_op", [("clock", "clock_output")])