Better behavior and message tracking around repeater contact on a busy mesh

This commit is contained in:
Jack Kingsman
2026-02-23 15:56:45 -08:00
parent 9193d113fe
commit 88d5a76081
2 changed files with 418 additions and 68 deletions
+121 -58
View File
@@ -1,6 +1,8 @@
import asyncio
import logging
import random
import time
from typing import TYPE_CHECKING
from fastapi import APIRouter, BackgroundTasks, HTTPException, Query
from meshcore import EventType
@@ -22,6 +24,9 @@ from app.packet_processor import start_historical_dm_decryption
from app.radio import radio_manager
from app.repository import AmbiguousPublicKeyPrefixError, ContactRepository, MessageRepository
if TYPE_CHECKING:
from meshcore.events import Event
logger = logging.getLogger(__name__)
# ACL permission level names
@@ -37,6 +42,86 @@ router = APIRouter(prefix="/contacts", tags=["contacts"])
REPEATER_OP_DELAY_SECONDS = 2.0
def _monotonic() -> float:
"""Wrapper around time.monotonic() for testability.
Patching time.monotonic directly breaks the asyncio event loop which also
uses it. This indirection allows tests to control the clock safely.
"""
return time.monotonic()
async def _fetch_repeater_response(
    mc,
    target_pubkey_prefix: str,
    timeout: float = 20.0,
) -> "Event | None":
    """Poll get_msg() until the target repeater's CLI reply arrives or time runs out.

    Only a CONTACT_MSG_RECV event whose ``pubkey_prefix`` equals
    ``target_pubkey_prefix`` and whose ``txt_type`` is 1 (CLI response) is
    accepted. Every other event pulled off the queue is skipped; per this
    module's design, meshcore's event dispatcher has already delivered those
    messages to the normal subscription handlers (on_contact_message, etc.)
    by the time get_msg() returns, so skipping them here does not lose them.

    Args:
        mc: MeshCore instance
        target_pubkey_prefix: 12-char hex prefix of the repeater's public key
        timeout: Wall-clock seconds before giving up

    Returns:
        The matching Event, or None if no response arrived before the deadline.
    """
    deadline = _monotonic() + timeout

    # The clock is sampled exactly once per iteration (in the loop condition).
    # Callers that patch _monotonic with a finite sequence of ticks depend on
    # that call count, so the same sample is reused for the back-off below.
    while (now := _monotonic()) < deadline:
        # Never back off for longer than the time left at the start of this
        # iteration; a fixed 1.0s sleep could run past the deadline.
        retry_delay = min(1.0, deadline - now)

        try:
            # NOTE(review): the per-call radio timeout is intentionally left
            # at 2.0s rather than clamped to the remaining budget, so the
            # function may still return slightly after the deadline. Confirm
            # against meshcore whether a shorter timeout is safe.
            result = await mc.commands.get_msg(timeout=2.0)
        except asyncio.TimeoutError:
            continue
        except Exception as e:
            logger.debug("get_msg() exception: %s", e)
            await asyncio.sleep(retry_delay)
            continue

        if result.type == EventType.NO_MORE_MSGS:
            # No messages queued yet — wait and retry
            await asyncio.sleep(retry_delay)
            continue

        if result.type == EventType.ERROR:
            logger.debug("get_msg() error: %s", result.payload)
            await asyncio.sleep(retry_delay)
            continue

        if result.type == EventType.CONTACT_MSG_RECV:
            msg_prefix = result.payload.get("pubkey_prefix", "")
            txt_type = result.payload.get("txt_type", 0)
            if msg_prefix == target_pubkey_prefix and txt_type == 1:
                return result
            # Not our target — already dispatched to subscribers by meshcore,
            # so just continue draining the queue.
            # txt_type is formatted with %s (not %d) so an unexpected
            # non-integer payload value cannot break the log call.
            logger.debug(
                "Skipping non-target message (from=%s, txt_type=%s) while waiting for %s",
                msg_prefix,
                txt_type,
                target_pubkey_prefix,
            )
            continue

        if result.type == EventType.CHANNEL_MSG_RECV:
            # Already dispatched to subscribers by meshcore; skip.
            logger.debug(
                "Skipping channel message (channel_idx=%s) during repeater fetch",
                result.payload.get("channel_idx"),
            )
            continue

        logger.debug("Unexpected event type %s during repeater fetch, skipping", result.type)

    logger.warning("No CLI response from repeater %s within %.1fs", target_pubkey_prefix, timeout)
    return None
def _ambiguous_contact_detail(err: AmbiguousPublicKeyPrefixError) -> str:
sample = ", ".join(key[:12] for key in err.matches[:2])
return (
@@ -402,30 +487,22 @@ async def request_telemetry(public_key: str, request: TelemetryRequest) -> Telem
clock_output: str | None = None
for attempt in range(1, 3):
logger.debug("Clock request attempt %d/2", attempt)
try:
send_result = await mc.commands.send_cmd(contact.public_key, "clock")
if send_result.type == EventType.ERROR:
logger.debug("Clock command send error: %s", send_result.payload)
continue
# Wait for response
wait_result = await mc.wait_for_event(EventType.MESSAGES_WAITING, timeout=5.0)
if wait_result is None:
logger.debug("Clock request timeout, retrying...")
continue
response_event = await mc.commands.get_msg()
if response_event.type == EventType.ERROR:
logger.debug("Clock get_msg error: %s", response_event.payload)
continue
clock_output = response_event.payload.get("text", "")
logger.info("Received clock output: %s", clock_output)
break
except Exception as e:
logger.debug("Clock request exception: %s", e)
send_result = await mc.commands.send_cmd(contact.public_key, "clock")
if send_result.type == EventType.ERROR:
logger.debug("Clock command send error: %s", send_result.payload)
continue
response_event = await _fetch_repeater_response(
mc, contact.public_key[:12], timeout=10.0
)
if response_event is None:
logger.debug("Clock request timeout, retrying...")
continue
clock_output = response_event.payload.get("text", "")
logger.info("Received clock output: %s", clock_output)
break
if clock_output is None:
clock_output = "Unable to fetch `clock` output (repeater did not respond)"
@@ -507,48 +584,34 @@ async def send_repeater_command(public_key: str, request: CommandRequest) -> Com
status_code=500, detail=f"Failed to send command: {send_result.payload}"
)
# Wait for response (MESSAGES_WAITING event, then get_msg)
try:
wait_result = await mc.wait_for_event(EventType.MESSAGES_WAITING, timeout=10.0)
# Wait for response using validated fetch loop
response_event = await _fetch_repeater_response(mc, contact.public_key[:12])
if wait_result is None:
# Timeout - no response received
logger.warning(
"No response from repeater %s for command: %s",
contact.public_key[:12],
request.command,
)
return CommandResponse(
command=request.command,
response="(no response - command may have been processed)",
)
response_event = await mc.commands.get_msg()
if response_event.type == EventType.ERROR:
return CommandResponse(
command=request.command, response=f"(error: {response_event.payload})"
)
# CONTACT_MSG_RECV payloads use sender_timestamp in meshcore.
response_text = response_event.payload.get("text", str(response_event.payload))
sender_timestamp = response_event.payload.get(
"sender_timestamp",
response_event.payload.get("timestamp"),
if response_event is None:
logger.warning(
"No response from repeater %s for command: %s",
contact.public_key[:12],
request.command,
)
logger.info("Received response from %s: %s", contact.public_key[:12], response_text)
return CommandResponse(
command=request.command,
response=response_text,
sender_timestamp=sender_timestamp,
)
except Exception as e:
logger.error("Error waiting for response: %s", e)
return CommandResponse(
command=request.command, response=f"(error waiting for response: {e})"
response="(no response - command may have been processed)",
)
# CONTACT_MSG_RECV payloads use sender_timestamp in meshcore.
response_text = response_event.payload.get("text", str(response_event.payload))
sender_timestamp = response_event.payload.get(
"sender_timestamp",
response_event.payload.get("timestamp"),
)
logger.info("Received response from %s: %s", contact.public_key[:12], response_text)
return CommandResponse(
command=request.command,
response=response_text,
sender_timestamp=sender_timestamp,
)
@router.post("/{public_key}/trace", response_model=TraceResponse)
async def request_trace(public_key: str) -> TraceResponse:
+297 -10
View File
@@ -9,10 +9,19 @@ from meshcore import EventType
from app.database import Database
from app.models import CommandRequest, TelemetryRequest
from app.repository import ContactRepository
from app.routers.contacts import request_telemetry, request_trace, send_repeater_command
from app.routers.contacts import (
_fetch_repeater_response,
request_telemetry,
request_trace,
send_repeater_command,
)
KEY_A = "aa" * 32
# Patch target for the wall-clock wrapper used by _fetch_repeater_response.
# We patch _monotonic (not time.monotonic) to avoid breaking the asyncio event loop.
_MONOTONIC = "app.routers.contacts._monotonic"
@pytest.fixture
async def test_db():
@@ -75,6 +84,181 @@ def _mock_mc():
return mc
def _advancing_clock(start=0.0, step=0.1):
"""Return a callable for _monotonic that advances by `step` each call."""
t = start
def _tick():
nonlocal t
val = t
t += step
return val
return _tick
class TestFetchRepeaterResponse:
    """Unit tests for the _fetch_repeater_response polling helper.

    Each test scripts the events returned by a mocked ``mc.commands.get_msg``
    and replaces the helper's clock through the ``_MONOTONIC`` patch target.
    """

    @staticmethod
    def _cli_reply(text):
        """Build a CLI response (txt_type=1) event sent by the target repeater."""
        return _radio_result(
            EventType.CONTACT_MSG_RECV,
            {"pubkey_prefix": "aaaaaaaaaaaa", "text": text, "txt_type": 1},
        )

    @pytest.mark.asyncio
    async def test_returns_matching_cli_response(self):
        """A CLI reply from the target is returned after a single get_msg() call."""
        mc = _mock_mc()
        mc.commands.get_msg = AsyncMock(return_value=self._cli_reply("ok"))

        with patch(_MONOTONIC, side_effect=_advancing_clock()):
            fetched = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)

        assert fetched is not None
        assert fetched.payload["text"] == "ok"
        mc.commands.get_msg.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_rejects_same_sender_non_cli_message(self):
        """A txt_type=0 message from the target repeater is NOT accepted as the CLI response."""
        mc = _mock_mc()
        chat_from_target = _radio_result(
            EventType.CONTACT_MSG_RECV,
            {"pubkey_prefix": "aaaaaaaaaaaa", "text": "chat msg", "txt_type": 0},
        )
        queue = [chat_from_target, self._cli_reply("ver 1.0")]
        mc.commands.get_msg = AsyncMock(side_effect=queue)

        with patch(_MONOTONIC, side_effect=_advancing_clock()):
            fetched = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)

        assert fetched is not None
        assert fetched.payload["text"] == "ver 1.0"
        assert mc.commands.get_msg.await_count == 2

    @pytest.mark.asyncio
    async def test_unrelated_dm_is_skipped(self):
        """Unrelated DMs are skipped (dispatcher already handled them)."""
        mc = _mock_mc()
        dm_from_other = _radio_result(
            EventType.CONTACT_MSG_RECV,
            {"pubkey_prefix": "bbbbbbbbbbbb", "text": "hello", "txt_type": 0},
        )
        queue = [dm_from_other, self._cli_reply("ver 1.0")]
        mc.commands.get_msg = AsyncMock(side_effect=queue)

        with patch(_MONOTONIC, side_effect=_advancing_clock()):
            fetched = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)

        assert fetched is not None
        assert fetched.payload["text"] == "ver 1.0"

    @pytest.mark.asyncio
    async def test_channel_message_is_skipped(self):
        """A channel message ahead of the CLI reply is passed over."""
        mc = _mock_mc()
        flood = _radio_result(
            EventType.CHANNEL_MSG_RECV,
            {"channel_idx": 0, "text": "flood msg"},
        )
        queue = [flood, self._cli_reply("ok")]
        mc.commands.get_msg = AsyncMock(side_effect=queue)

        with patch(_MONOTONIC, side_effect=_advancing_clock()):
            fetched = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)

        assert fetched is not None
        assert fetched.payload["text"] == "ok"

    @pytest.mark.asyncio
    async def test_no_more_msgs_retries_then_succeeds(self):
        """An empty queue (NO_MORE_MSGS) triggers a retry that then finds the reply."""
        mc = _mock_mc()
        queue = [_radio_result(EventType.NO_MORE_MSGS), self._cli_reply("ok")]
        mc.commands.get_msg = AsyncMock(side_effect=queue)

        with (
            patch(_MONOTONIC, side_effect=_advancing_clock()),
            patch("app.routers.contacts.asyncio.sleep", new_callable=AsyncMock),
        ):
            fetched = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)

        assert fetched is not None
        assert fetched.payload["text"] == "ok"
        assert mc.commands.get_msg.await_count == 2

    @pytest.mark.asyncio
    async def test_returns_none_after_deadline(self):
        """Returns None when wall-clock deadline expires."""
        mc = _mock_mc()
        empty_queue = _radio_result(EventType.NO_MORE_MSGS)
        mc.commands.get_msg = AsyncMock(return_value=empty_queue)

        # Clock starts at 100.0 (deadline = 102.0 with timeout=2.0); the last
        # tick jumps past the deadline after two get_msg() calls.
        ticks = [100.0, 100.5, 101.0, 103.0]
        with (
            patch(_MONOTONIC, side_effect=ticks),
            patch("app.routers.contacts.asyncio.sleep", new_callable=AsyncMock),
        ):
            fetched = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=2.0)

        assert fetched is None

    @pytest.mark.asyncio
    async def test_error_retries_then_succeeds(self):
        """An ERROR event from get_msg() is retried rather than treated as fatal."""
        mc = _mock_mc()
        queue = [_radio_result(EventType.ERROR, {"err": "busy"}), self._cli_reply("ok")]
        mc.commands.get_msg = AsyncMock(side_effect=queue)

        with (
            patch(_MONOTONIC, side_effect=_advancing_clock()),
            patch("app.routers.contacts.asyncio.sleep", new_callable=AsyncMock),
        ):
            fetched = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)

        assert fetched is not None
        assert fetched.payload["text"] == "ok"

    @pytest.mark.asyncio
    async def test_high_traffic_does_not_exhaust_budget(self):
        """Many unrelated messages don't prevent eventual success (wall-clock deadline)."""
        mc = _mock_mc()

        # 20 unrelated DMs are queued ahead of the expected CLI response.
        queue = []
        for i in range(20):
            queue.append(
                _radio_result(
                    EventType.CONTACT_MSG_RECV,
                    {"pubkey_prefix": f"{i:012x}", "text": f"msg {i}", "txt_type": 0},
                )
            )
        queue.append(self._cli_reply("ver 1.0"))
        mc.commands.get_msg = AsyncMock(side_effect=queue)

        with patch(_MONOTONIC, side_effect=_advancing_clock()):
            fetched = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=30.0)

        assert fetched is not None
        assert fetched.payload["text"] == "ver 1.0"
        assert mc.commands.get_msg.await_count == 21
class TestTelemetryRoute:
@pytest.mark.asyncio
async def test_returns_404_when_contact_missing(self, test_db):
@@ -133,7 +317,15 @@ class TestTelemetryRoute:
)
mc.commands.req_acl_sync = AsyncMock(return_value=[{"key": "def456abc123", "perm": 2}])
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
mc.wait_for_event = AsyncMock(side_effect=[None, None]) # two clock attempts, no response
# Clock fetch uses _fetch_repeater_response which calls get_msg() directly.
# Return NO_MORE_MSGS to simulate no clock response.
mc.commands.get_msg = AsyncMock(return_value=_radio_result(EventType.NO_MORE_MSGS))
# Clock is attempted twice, each with timeout=10.0. Provide enough ticks
# for the deadline to expire on each attempt.
clock_ticks = []
for base in (0.0, 100.0):
clock_ticks.extend([base, base + 5.0, base + 11.0])
with (
patch("app.routers.contacts.require_connected", return_value=mc),
@@ -141,6 +333,8 @@ class TestTelemetryRoute:
"app.routers.contacts.prepare_repeater_connection",
new_callable=AsyncMock,
) as mock_prepare,
patch(_MONOTONIC, side_effect=clock_ticks),
patch("app.routers.contacts.asyncio.sleep", new_callable=AsyncMock),
):
response = await request_telemetry(KEY_A, TelemetryRequest(password="pw"))
@@ -174,9 +368,14 @@ class TestRepeaterCommandRoute:
mc = _mock_mc()
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
mc.wait_for_event = AsyncMock(return_value=None)
mc.commands.get_msg = AsyncMock(return_value=_radio_result(EventType.NO_MORE_MSGS))
with patch("app.routers.contacts.require_connected", return_value=mc):
# Expire the deadline after a couple of ticks
with (
patch("app.routers.contacts.require_connected", return_value=mc),
patch(_MONOTONIC, side_effect=[0.0, 5.0, 25.0]),
patch("app.routers.contacts.asyncio.sleep", new_callable=AsyncMock),
):
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
assert response.command == "ver"
@@ -188,15 +387,22 @@ class TestRepeaterCommandRoute:
mc = _mock_mc()
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
mc.wait_for_event = AsyncMock(return_value=MagicMock())
mc.commands.get_msg = AsyncMock(
return_value=_radio_result(
EventType.CONTACT_MSG_RECV,
{"text": "firmware: v1.2.3", "sender_timestamp": 1700000000},
{
"pubkey_prefix": KEY_A[:12],
"text": "firmware: v1.2.3",
"sender_timestamp": 1700000000,
"txt_type": 1,
},
)
)
with patch("app.routers.contacts.require_connected", return_value=mc):
with (
patch("app.routers.contacts.require_connected", return_value=mc),
patch(_MONOTONIC, side_effect=_advancing_clock()),
):
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
assert response.command == "ver"
@@ -208,21 +414,102 @@ class TestRepeaterCommandRoute:
mc = _mock_mc()
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
mc.wait_for_event = AsyncMock(return_value=MagicMock())
mc.commands.get_msg = AsyncMock(
return_value=_radio_result(
EventType.CONTACT_MSG_RECV,
{"text": "firmware: v1.2.3", "timestamp": 1700000000},
{
"pubkey_prefix": KEY_A[:12],
"text": "firmware: v1.2.3",
"timestamp": 1700000000,
"txt_type": 1,
},
)
)
with patch("app.routers.contacts.require_connected", return_value=mc):
with (
patch("app.routers.contacts.require_connected", return_value=mc),
patch(_MONOTONIC, side_effect=_advancing_clock()),
):
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
assert response.command == "ver"
assert response.response == "firmware: v1.2.3"
assert response.sender_timestamp == 1700000000
@pytest.mark.asyncio
async def test_unrelated_dm_during_command_does_not_prevent_success(self, test_db):
"""Unrelated DMs arriving during command wait are skipped; correct response returned."""
mc = _mock_mc()
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
unrelated = _radio_result(
EventType.CONTACT_MSG_RECV,
{"pubkey_prefix": "bbbbbbbbbbbb", "text": "hello from someone", "txt_type": 0},
)
expected = _radio_result(
EventType.CONTACT_MSG_RECV,
{"pubkey_prefix": KEY_A[:12], "text": "ver 1.0", "txt_type": 1},
)
mc.commands.get_msg = AsyncMock(side_effect=[unrelated, expected])
with (
patch("app.routers.contacts.require_connected", return_value=mc),
patch(_MONOTONIC, side_effect=_advancing_clock()),
):
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
assert response.command == "ver"
assert response.response == "ver 1.0"
@pytest.mark.asyncio
async def test_channel_message_during_command_is_skipped(self, test_db):
mc = _mock_mc()
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
channel_msg = _radio_result(
EventType.CHANNEL_MSG_RECV,
{"channel_idx": 0, "text": "flood msg"},
)
expected = _radio_result(
EventType.CONTACT_MSG_RECV,
{"pubkey_prefix": KEY_A[:12], "text": "ok", "txt_type": 1},
)
mc.commands.get_msg = AsyncMock(side_effect=[channel_msg, expected])
with (
patch("app.routers.contacts.require_connected", return_value=mc),
patch(_MONOTONIC, side_effect=_advancing_clock()),
):
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
assert response.command == "ver"
assert response.response == "ok"
@pytest.mark.asyncio
async def test_no_more_msgs_then_response_succeeds(self, test_db):
mc = _mock_mc()
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
no_msgs = _radio_result(EventType.NO_MORE_MSGS)
expected = _radio_result(
EventType.CONTACT_MSG_RECV,
{"pubkey_prefix": KEY_A[:12], "text": "done", "txt_type": 1},
)
mc.commands.get_msg = AsyncMock(side_effect=[no_msgs, expected])
with (
patch("app.routers.contacts.require_connected", return_value=mc),
patch(_MONOTONIC, side_effect=_advancing_clock()),
patch("app.routers.contacts.asyncio.sleep", new_callable=AsyncMock),
):
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
assert response.command == "ver"
assert response.response == "done"
class TestTraceRoute:
@pytest.mark.asyncio