Fix non-repeater messages being dropped during repeater operations

This commit is contained in:
Jack Kingsman
2026-03-30 21:13:25 -07:00
parent 26e8150092
commit 788d1cbdca
3 changed files with 58 additions and 94 deletions

View File

@@ -1,9 +1,6 @@
import asyncio
import logging
from typing import TYPE_CHECKING
from fastapi import APIRouter, HTTPException
from meshcore import EventType
from app.dependencies import require_connected
from app.models import (
@@ -28,7 +25,6 @@ from app.models import (
from app.repository import ContactRepository
from app.routers.contacts import _ensure_on_radio, _resolve_contact_or_404
from app.routers.server_control import (
_monotonic,
batch_cli_fetch,
extract_response_text,
prepare_authenticated_contact_connection,
@@ -37,9 +33,6 @@ from app.routers.server_control import (
)
from app.services.radio_runtime import radio_runtime as radio_manager
if TYPE_CHECKING:
from meshcore.events import Event
logger = logging.getLogger(__name__)
# ACL permission level names
@@ -57,58 +50,6 @@ def _extract_response_text(event) -> str:
return extract_response_text(event)
async def _fetch_repeater_response(
    mc,
    target_pubkey_prefix: str,
    timeout: float = 20.0,
) -> "Event | None":
    """Poll ``mc.commands.get_msg`` until the target's CLI reply shows up.

    Messages are pulled one at a time until a ``CONTACT_MSG_RECV`` event
    arrives whose ``pubkey_prefix`` equals ``target_pubkey_prefix`` and whose
    ``txt_type`` is ``1`` (the value this helper treats as a CLI response).

    Every other event pulled along the way is only logged at debug level by
    this helper; it is not stored or forwarded from here.

    Args:
        mc: Connection object exposing ``commands.get_msg(timeout=...)``.
        target_pubkey_prefix: Sender prefix the reply must carry.
        timeout: Overall budget in seconds, measured with ``_monotonic``.
            The deadline is only checked between polls, so the call can run
            past it by up to one poll or back-off sleep.

    Returns:
        The matching event, or ``None`` if the deadline passes first.
    """
    deadline = _monotonic() + timeout

    while _monotonic() < deadline:
        try:
            event = await mc.commands.get_msg(timeout=2.0)
        except asyncio.TimeoutError:
            # A poll timeout already consumed wall-clock time; retry at once.
            continue
        except Exception as exc:
            logger.debug("get_msg() exception: %s", exc)
            await asyncio.sleep(1.0)
            continue

        kind = event.type

        if kind == EventType.NO_MORE_MSGS:
            # Queue is empty: back off briefly before polling again.
            await asyncio.sleep(1.0)
        elif kind == EventType.ERROR:
            logger.debug("get_msg() error: %s", event.payload)
            await asyncio.sleep(1.0)
        elif kind == EventType.CONTACT_MSG_RECV:
            sender_prefix = event.payload.get("pubkey_prefix", "")
            txt_type = event.payload.get("txt_type", 0)
            is_target_reply = sender_prefix == target_pubkey_prefix and txt_type == 1
            if is_target_reply:
                return event
            logger.debug(
                "Skipping non-target message (from=%s, txt_type=%d) while waiting for %s",
                sender_prefix,
                txt_type,
                target_pubkey_prefix,
            )
        elif kind == EventType.CHANNEL_MSG_RECV:
            logger.debug(
                "Skipping channel message (channel_idx=%s) during repeater fetch",
                event.payload.get("channel_idx"),
            )
        else:
            logger.debug("Unexpected event type %s during repeater fetch, skipping", kind)

    logger.warning("No CLI response from repeater %s within %.1fs", target_pubkey_prefix, timeout)
    return None
async def prepare_repeater_connection(mc, contact: Contact, password: str) -> RepeaterLoginResponse:
return await prepare_authenticated_contact_connection(
mc,

View File

@@ -13,6 +13,7 @@ from app.models import (
Contact,
RepeaterLoginResponse,
)
from app.radio_sync import _store_pending_channel_message, _store_pending_direct_message
from app.routers.contacts import _ensure_on_radio
from app.services.radio_runtime import radio_runtime as radio_manager
@@ -115,18 +116,20 @@ async def fetch_contact_cli_response(
if msg_prefix == target_pubkey_prefix and txt_type == 1:
return result
logger.debug(
"Skipping non-target message (from=%s, txt_type=%d) while waiting for %s",
"Storing non-target DM (from=%s, txt_type=%d) consumed while waiting for %s",
msg_prefix,
txt_type,
target_pubkey_prefix,
)
await _store_pending_direct_message(result)
continue
if result.type == EventType.CHANNEL_MSG_RECV:
logger.debug(
"Skipping channel message (channel_idx=%s) during CLI fetch",
"Storing channel message (channel_idx=%s) consumed during CLI fetch",
result.payload.get("channel_idx"),
)
await _store_pending_channel_message(mc, result.payload)
continue
logger.debug("Unexpected event type %s during CLI fetch, skipping", result.type)

View File

@@ -12,7 +12,6 @@ from app.repository import ContactRepository
from app.routers.contacts import request_trace
from app.routers.repeaters import (
_batch_cli_fetch,
_fetch_repeater_response,
prepare_repeater_connection,
repeater_acl,
repeater_advert_intervals,
@@ -25,12 +24,17 @@ from app.routers.repeaters import (
repeater_status,
send_repeater_command,
)
from app.routers.server_control import fetch_contact_cli_response
KEY_A = "aa" * 32
# Patch target for the wall-clock wrapper used by _fetch_repeater_response.
# Patch target for the wall-clock wrapper used by fetch_contact_cli_response.
# We patch _monotonic (not time.monotonic) to avoid breaking the asyncio event loop.
_MONOTONIC = "app.routers.repeaters._monotonic"
_MONOTONIC = "app.routers.server_control._monotonic"
# Patch targets for the store helpers called on consumed non-target messages.
_STORE_DM = "app.routers.server_control._store_pending_direct_message"
_STORE_CHAN = "app.routers.server_control._store_pending_channel_message"
@pytest.fixture(autouse=True)
@@ -104,8 +108,8 @@ def _advancing_clock(start=0.0, step=0.1):
return _tick
class TestFetchRepeaterResponse:
"""Tests for the _fetch_repeater_response helper."""
class TestFetchContactCliResponse:
"""Tests for the fetch_contact_cli_response helper."""
@pytest.mark.asyncio
async def test_returns_matching_cli_response(self):
@@ -118,7 +122,7 @@ class TestFetchRepeaterResponse:
)
with patch(_MONOTONIC, side_effect=_advancing_clock()):
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)
result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=5.0)
assert result is not None
assert result.payload["text"] == "ok"
@@ -138,16 +142,20 @@ class TestFetchRepeaterResponse:
)
mc.commands.get_msg = AsyncMock(side_effect=[non_cli, cli_response])
with patch(_MONOTONIC, side_effect=_advancing_clock()):
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)
with (
patch(_MONOTONIC, side_effect=_advancing_clock()),
patch(_STORE_DM, new_callable=AsyncMock) as store_dm,
):
result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=5.0)
assert result is not None
assert result.payload["text"] == "ver 1.0"
assert mc.commands.get_msg.await_count == 2
store_dm.assert_awaited_once_with(non_cli)
@pytest.mark.asyncio
async def test_unrelated_dm_is_skipped(self):
"""Unrelated DMs are skipped (dispatcher already handled them)."""
async def test_unrelated_dm_is_stored(self):
"""Unrelated DMs consumed during CLI fetch are stored, not discarded."""
mc = _mock_mc()
unrelated = _radio_result(
EventType.CONTACT_MSG_RECV,
@@ -159,14 +167,18 @@ class TestFetchRepeaterResponse:
)
mc.commands.get_msg = AsyncMock(side_effect=[unrelated, expected])
with patch(_MONOTONIC, side_effect=_advancing_clock()):
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)
with (
patch(_MONOTONIC, side_effect=_advancing_clock()),
patch(_STORE_DM, new_callable=AsyncMock) as store_dm,
):
result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=5.0)
assert result is not None
assert result.payload["text"] == "ver 1.0"
store_dm.assert_awaited_once_with(unrelated)
@pytest.mark.asyncio
async def test_channel_message_is_skipped(self):
async def test_channel_message_is_stored(self):
mc = _mock_mc()
channel_msg = _radio_result(
EventType.CHANNEL_MSG_RECV,
@@ -178,11 +190,15 @@ class TestFetchRepeaterResponse:
)
mc.commands.get_msg = AsyncMock(side_effect=[channel_msg, expected])
with patch(_MONOTONIC, side_effect=_advancing_clock()):
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)
with (
patch(_MONOTONIC, side_effect=_advancing_clock()),
patch(_STORE_CHAN, new_callable=AsyncMock) as store_chan,
):
result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=5.0)
assert result is not None
assert result.payload["text"] == "ok"
store_chan.assert_awaited_once_with(mc, channel_msg.payload)
@pytest.mark.asyncio
async def test_no_more_msgs_retries_then_succeeds(self):
@@ -196,9 +212,9 @@ class TestFetchRepeaterResponse:
with (
patch(_MONOTONIC, side_effect=_advancing_clock()),
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock),
):
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)
result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=5.0)
assert result is not None
assert result.payload["text"] == "ok"
@@ -215,9 +231,9 @@ class TestFetchRepeaterResponse:
with (
patch(_MONOTONIC, side_effect=times),
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock),
):
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=2.0)
result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=2.0)
assert result is None
@@ -233,16 +249,16 @@ class TestFetchRepeaterResponse:
with (
patch(_MONOTONIC, side_effect=_advancing_clock()),
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock),
):
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)
result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=5.0)
assert result is not None
assert result.payload["text"] == "ok"
@pytest.mark.asyncio
async def test_high_traffic_does_not_exhaust_budget(self):
"""Many unrelated messages don't prevent eventual success (wall-clock deadline)."""
async def test_high_traffic_stores_all_consumed_messages(self):
"""Many unrelated messages are stored and don't prevent eventual success."""
mc = _mock_mc()
# 20 unrelated DMs followed by the expected CLI response
unrelated = [
@@ -258,12 +274,16 @@ class TestFetchRepeaterResponse:
)
mc.commands.get_msg = AsyncMock(side_effect=[*unrelated, expected])
with patch(_MONOTONIC, side_effect=_advancing_clock()):
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=30.0)
with (
patch(_MONOTONIC, side_effect=_advancing_clock()),
patch(_STORE_DM, new_callable=AsyncMock) as store_dm,
):
result = await fetch_contact_cli_response(mc, "aaaaaaaaaaaa", timeout=30.0)
assert result is not None
assert result.payload["text"] == "ver 1.0"
assert mc.commands.get_msg.await_count == 21
assert store_dm.await_count == 20
class TestRepeaterCommandRoute:
@@ -297,7 +317,7 @@ class TestRepeaterCommandRoute:
patch("app.routers.repeaters.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch(_MONOTONIC, side_effect=[0.0, 5.0, 25.0]),
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock),
):
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
@@ -457,7 +477,7 @@ class TestRepeaterCommandRoute:
patch("app.routers.repeaters.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch(_MONOTONIC, side_effect=_advancing_clock()),
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock),
):
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
@@ -998,7 +1018,7 @@ class TestRepeaterRadioSettings:
patch("app.routers.repeaters.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch(_MONOTONIC, side_effect=clock_ticks),
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock),
):
response = await repeater_radio_settings(KEY_A)
@@ -1073,7 +1093,7 @@ class TestRepeaterNodeInfo:
patch("app.routers.repeaters.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch(_MONOTONIC, side_effect=clock_ticks),
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock),
):
response = await repeater_node_info(KEY_A)
@@ -1126,7 +1146,7 @@ class TestRepeaterAdvertIntervals:
patch("app.routers.repeaters.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch(_MONOTONIC, side_effect=clock_ticks),
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock),
):
response = await repeater_advert_intervals(KEY_A)
@@ -1181,7 +1201,7 @@ class TestRepeaterOwnerInfo:
patch("app.routers.repeaters.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch(_MONOTONIC, side_effect=clock_ticks),
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock),
):
response = await repeater_owner_info(KEY_A)
@@ -1239,7 +1259,7 @@ class TestBatchCliFetch:
with (
patch.object(radio_manager, "_meshcore", mc),
patch(_MONOTONIC, side_effect=_advancing_clock()),
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock),
):
results = await _batch_cli_fetch(
contact, "test_op", [("bad_cmd", "field_a"), ("good_cmd", "field_b")]
@@ -1260,7 +1280,7 @@ class TestBatchCliFetch:
with (
patch.object(radio_manager, "_meshcore", mc),
patch(_MONOTONIC, side_effect=[0.0, 5.0, 11.0]),
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
patch("app.routers.server_control.asyncio.sleep", new_callable=AsyncMock),
):
results = await _batch_cli_fetch(contact, "test_op", [("clock", "clock_output")])