Add health endpoint

This commit is contained in:
Jack Kingsman
2026-03-12 19:47:21 -07:00
parent 338f632514
commit 2710cafb21
18 changed files with 607 additions and 73 deletions
+1
View File
@@ -290,6 +290,7 @@ All endpoints are prefixed with `/api` (e.g., `/api/health`).
| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/api/health` | Connection status, fanout statuses, bots_disabled flag |
| GET | `/api/debug` | Support snapshot: recent logs, live radio probe, contact/channel drift audit, and running version/git info |
| GET | `/api/radio/config` | Radio configuration, including `path_hash_mode`, `path_hash_mode_supported`, and whether adverts include current node location |
| PATCH | `/api/radio/config` | Update name, location, advert-location on/off, radio params, and `path_hash_mode` when supported |
| PUT | `/api/radio/private-key` | Import private key to radio |
+4
View File
@@ -53,6 +53,7 @@ app/
├── frontend_static.py # Mount/serve built frontend (production)
└── routers/
├── health.py
├── debug.py
├── radio.py
├── contacts.py
├── channels.py
@@ -149,6 +150,9 @@ app/
### Health
- `GET /health`
### Debug
- `GET /debug` — support snapshot with recent logs, live radio probe, slot/contact audits, and version/git info
### Radio
- `GET /radio/config` — includes `path_hash_mode`, `path_hash_mode_supported`, and advert-location on/off
- `PATCH /radio/config` — may update `path_hash_mode` (`0..2`) when firmware supports it
+56
View File
@@ -1,5 +1,7 @@
import logging
import logging.config
from collections import deque
from threading import Lock
from typing import Literal
from pydantic import model_validator
@@ -67,6 +69,47 @@ class Settings(BaseSettings):
settings = Settings()
class _RingBufferLogHandler(logging.Handler):
"""Keep a bounded in-memory tail of formatted log lines."""
def __init__(self, max_lines: int = 1000) -> None:
super().__init__()
self._buffer: deque[str] = deque(maxlen=max_lines)
self._lock = Lock()
def emit(self, record: logging.LogRecord) -> None:
try:
line = self.format(record)
except Exception:
self.handleError(record)
return
with self._lock:
self._buffer.append(line)
def get_lines(self, limit: int = 1000) -> list[str]:
with self._lock:
if limit <= 0:
return []
return list(self._buffer)[-limit:]
def clear(self) -> None:
with self._lock:
self._buffer.clear()
_recent_log_handler = _RingBufferLogHandler(max_lines=1000)
def get_recent_log_lines(limit: int = 1000) -> list[str]:
"""Return recent formatted log lines from the in-memory ring buffer."""
return _recent_log_handler.get_lines(limit)
def clear_recent_log_lines() -> None:
"""Clear the in-memory log ring buffer."""
_recent_log_handler.clear()
class _RepeatSquelch(logging.Filter):
"""Suppress rapid-fire identical messages and emit a summary instead.
@@ -152,6 +195,19 @@ def setup_logging() -> None:
},
}
)
_recent_log_handler.setLevel(logging.DEBUG)
_recent_log_handler.setFormatter(
logging.Formatter(
fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
for logger_name in ("", "uvicorn", "uvicorn.error", "uvicorn.access"):
target = logging.getLogger(logger_name)
if _recent_log_handler not in target.handlers:
target.addHandler(_recent_log_handler)
# Squelch repeated messages from the meshcore library (e.g. rapid-fire
# "Serial Connection started" when the port is contended).
logging.getLogger("meshcore").addFilter(_RepeatSquelch())
+1 -1
View File
@@ -243,7 +243,7 @@ async def on_new_contact(event: "Event") -> None:
logger.debug("New contact: %s", public_key[:12])
contact_upsert = ContactUpsert.from_radio_dict(public_key.lower(), payload, on_radio=True)
contact_upsert = ContactUpsert.from_radio_dict(public_key.lower(), payload, on_radio=False)
contact_upsert.last_seen = int(time.time())
await ContactRepository.upsert(contact_upsert)
promoted_keys = await promote_prefix_contacts_for_contact(
+2
View File
@@ -21,6 +21,7 @@ from app.radio_sync import (
from app.routers import (
channels,
contacts,
debug,
fanout,
health,
messages,
@@ -136,6 +137,7 @@ async def radio_disconnected_handler(request: Request, exc: RadioDisconnectedErr
# API routes - all prefixed with /api for production compatibility
app.include_router(health.router, prefix="/api")
app.include_router(debug.router, prefix="/api")
app.include_router(fanout.router, prefix="/api")
app.include_router(radio.router, prefix="/api")
app.include_router(contacts.router, prefix="/api")
+27 -22
View File
@@ -50,7 +50,6 @@ def _contact_sync_debug_fields(contact: Contact) -> dict[str, object]:
"last_advert": contact.last_advert,
"lat": contact.lat,
"lon": contact.lon,
"on_radio": contact.on_radio,
}
@@ -380,16 +379,19 @@ async def sync_and_offload_all(mc: MeshCore) -> dict:
"""Sync and offload both contacts and channels, then ensure defaults exist."""
logger.info("Starting full radio sync and offload")
# Contact on_radio is legacy/stale metadata. Clear it during the offload/reload
# cycle so old rows stop claiming radio residency we do not actively track.
await ContactRepository.clear_on_radio_except([])
contacts_result = await sync_and_offload_contacts(mc)
channels_result = await sync_and_offload_channels(mc)
# Ensure default channels exist
await ensure_default_channels()
# Reload favorites plus a working-set fill back onto the radio immediately
# so they do not stay in on_radio=False limbo after offload. Pass mc directly
# since the caller already holds the radio operation lock (asyncio.Lock is not
# reentrant).
# Reload favorites plus a working-set fill back onto the radio immediately.
# Pass mc directly since the caller already holds the radio operation lock
# (asyncio.Lock is not reentrant).
reload_result = await sync_recent_contacts_to_radio(force=True, mc=mc)
return {
@@ -776,20 +778,8 @@ _last_contact_sync: float = 0.0
CONTACT_SYNC_THROTTLE_SECONDS = 30 # Don't sync more than once per 30 seconds
async def _sync_contacts_to_radio_inner(mc: MeshCore) -> dict:
"""
Core logic for loading contacts onto the radio.
Fill order is:
1. Favorite contacts
2. Most recently interacted-with non-repeaters
3. Most recently advert-heard non-repeaters without interaction history
Favorite contacts are always reloaded first, up to the configured capacity.
Additional non-favorite fill stops at the refill target (80% of capacity).
Caller must hold the radio operation lock and pass a valid MeshCore instance.
"""
async def get_contacts_selected_for_radio_sync() -> list[Contact]:
"""Return the contacts that would be loaded onto the radio right now."""
app_settings = await AppSettingsRepository.get()
max_contacts = app_settings.max_radio_contacts
refill_target, _full_sync_trigger = _compute_radio_contact_limits(max_contacts)
@@ -856,6 +846,24 @@ async def _sync_contacts_to_radio_inner(mc: MeshCore) -> dict:
refill_target,
max_contacts,
)
return selected_contacts
async def _sync_contacts_to_radio_inner(mc: MeshCore) -> dict:
"""
Core logic for loading contacts onto the radio.
Fill order is:
1. Favorite contacts
2. Most recently interacted-with non-repeaters
3. Most recently advert-heard non-repeaters without interaction history
Favorite contacts are always reloaded first, up to the configured capacity.
Additional non-favorite fill stops at the refill target (80% of capacity).
Caller must hold the radio operation lock and pass a valid MeshCore instance.
"""
selected_contacts = await get_contacts_selected_for_radio_sync()
return await _load_contacts_to_radio(mc, selected_contacts)
@@ -928,8 +936,6 @@ async def _load_contacts_to_radio(mc: MeshCore, contacts: list[Contact]) -> dict
radio_contact = mc.get_contact_by_key_prefix(contact.public_key[:12])
if radio_contact:
already_on_radio += 1
if not contact.on_radio:
await ContactRepository.set_on_radio(contact.public_key, True)
continue
try:
@@ -937,7 +943,6 @@ async def _load_contacts_to_radio(mc: MeshCore, contacts: list[Contact]) -> dict
result = await mc.commands.add_contact(radio_contact_payload)
if result.type == EventType.OK:
loaded += 1
await ContactRepository.set_on_radio(contact.public_key, True)
logger.debug("Loaded contact %s to radio", contact.public_key[:12])
else:
failed += 1
+24
View File
@@ -66,6 +66,30 @@ class ChannelRepository:
for row in rows
]
@staticmethod
async def get_on_radio() -> list[Channel]:
"""Return channels currently marked as resident on the radio in the database."""
cursor = await db.conn.execute(
"""
SELECT key, name, is_hashtag, on_radio, flood_scope_override, last_read_at
FROM channels
WHERE on_radio = 1
ORDER BY name
"""
)
rows = await cursor.fetchall()
return [
Channel(
key=row["key"],
name=row["name"],
is_hashtag=bool(row["is_hashtag"]),
on_radio=bool(row["on_radio"]),
flood_scope_override=row["flood_scope_override"],
last_read_at=row["last_read_at"],
)
for row in rows
]
@staticmethod
async def delete(key: str) -> None:
"""Delete a channel by key."""
-8
View File
@@ -352,14 +352,6 @@ class ContactRepository:
)
await db.conn.commit()
@staticmethod
async def set_on_radio(public_key: str, on_radio: bool) -> None:
await db.conn.execute(
"UPDATE contacts SET on_radio = ? WHERE public_key = ?",
(on_radio, public_key.lower()),
)
await db.conn.commit()
@staticmethod
async def clear_on_radio_except(keep_keys: list[str]) -> None:
"""Set on_radio=False for all contacts NOT in keep_keys."""
+2 -2
View File
@@ -66,8 +66,8 @@ async def _ensure_on_radio(mc, contact: Contact) -> None:
async def _best_effort_push_contact_to_radio(contact: Contact, operation_name: str) -> None:
"""Push the current effective route to the radio when the contact is already loaded."""
if not radio_manager.is_connected or not contact.on_radio:
"""Best-effort push the current effective route to the radio when connected."""
if not radio_manager.is_connected:
return
try:
+298
View File
@@ -0,0 +1,298 @@
import hashlib
import importlib.metadata
import logging
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from fastapi import APIRouter
from meshcore import EventType
from pydantic import BaseModel, Field
from app.config import get_recent_log_lines, settings
from app.radio_sync import get_contacts_selected_for_radio_sync, get_radio_channel_limit
from app.routers.health import HealthResponse, build_health_data
from app.services.radio_runtime import radio_runtime
logger = logging.getLogger(__name__)
router = APIRouter(tags=["debug"])
class DebugApplicationInfo(BaseModel):
version: str
commit_hash: str | None = None
git_branch: str | None = None
git_dirty: bool | None = None
python_version: str
class DebugRuntimeInfo(BaseModel):
connection_info: str | None = None
connection_desired: bool
setup_in_progress: bool
setup_complete: bool
max_channels: int
path_hash_mode: int
path_hash_mode_supported: bool
channel_slot_reuse_enabled: bool
channel_send_cache_capacity: int
channel_send_cache: list[dict[str, int | str]]
remediation_flags: dict[str, bool]
class DebugContactAudit(BaseModel):
expected_and_found: int
expected_but_not_found: list[str]
found_but_not_expected: list[str]
class DebugChannelSlotMismatch(BaseModel):
slot_number: int
expected_sha256_of_room_key: str | None = None
actual_sha256_of_room_key: str | None = None
class DebugChannelAudit(BaseModel):
matched_slots: int
wrong_slots: list[DebugChannelSlotMismatch]
class DebugRadioProbe(BaseModel):
performed: bool
errors: list[str] = Field(default_factory=list)
self_info: dict[str, Any] | None = None
device_info: dict[str, Any] | None = None
stats_core: dict[str, Any] | None = None
stats_radio: dict[str, Any] | None = None
contacts: DebugContactAudit | None = None
channels: DebugChannelAudit | None = None
class DebugSnapshotResponse(BaseModel):
captured_at: str
application: DebugApplicationInfo
health: HealthResponse
runtime: DebugRuntimeInfo
radio_probe: DebugRadioProbe
logs: list[str]
def _repo_root() -> Path:
return Path(__file__).resolve().parents[2]
def _get_app_version() -> str:
try:
return importlib.metadata.version("remoteterm-meshcore")
except Exception:
pyproject = _repo_root() / "pyproject.toml"
try:
for line in pyproject.read_text().splitlines():
if line.startswith("version = "):
return line.split('"')[1]
except Exception:
pass
return "0.0.0"
def _git_output(*args: str) -> str | None:
try:
result = subprocess.run(
["git", *args],
cwd=_repo_root(),
check=True,
capture_output=True,
text=True,
)
except Exception:
return None
output = result.stdout.strip()
return output or None
def _build_application_info() -> DebugApplicationInfo:
dirty_output = _git_output("status", "--porcelain")
return DebugApplicationInfo(
version=_get_app_version(),
commit_hash=_git_output("rev-parse", "HEAD"),
git_branch=_git_output("rev-parse", "--abbrev-ref", "HEAD"),
git_dirty=(dirty_output is not None and dirty_output != ""),
python_version=sys.version.split()[0],
)
def _event_type_name(event: Any) -> str:
event_type = getattr(event, "type", None)
return getattr(event_type, "name", str(event_type))
def _sha256_hex(value: str) -> str:
return hashlib.sha256(value.encode("utf-8")).hexdigest()
def _normalize_channel_secret(payload: dict[str, Any]) -> bytes:
secret = payload.get("channel_secret", b"")
if isinstance(secret, bytes):
return secret
return bytes(secret)
def _is_empty_channel_payload(payload: dict[str, Any]) -> bool:
name = payload.get("channel_name", "")
return not name or name == "\x00" * len(name)
def _observed_channel_key(event: Any) -> str | None:
if getattr(event, "type", None) != EventType.CHANNEL_INFO:
return None
payload = event.payload or {}
if _is_empty_channel_payload(payload):
return None
return _normalize_channel_secret(payload).hex().upper()
def _coerce_live_max_channels(device_info: dict[str, Any] | None) -> int | None:
if not device_info or "max_channels" not in device_info:
return None
try:
return int(device_info["max_channels"])
except (TypeError, ValueError):
return None
async def _build_contact_audit(
observed_contacts_payload: dict[str, dict[str, Any]],
) -> DebugContactAudit:
expected_contacts = await get_contacts_selected_for_radio_sync()
expected_keys = {contact.public_key.lower() for contact in expected_contacts}
observed_keys = {public_key.lower() for public_key in observed_contacts_payload}
return DebugContactAudit(
expected_and_found=len(expected_keys & observed_keys),
expected_but_not_found=sorted(_sha256_hex(key) for key in (expected_keys - observed_keys)),
found_but_not_expected=sorted(_sha256_hex(key) for key in (observed_keys - expected_keys)),
)
async def _build_channel_audit(mc: Any, max_channels: int | None = None) -> DebugChannelAudit:
cache_key_by_slot = {
slot: channel_key for channel_key, slot in radio_runtime.get_channel_send_cache_snapshot()
}
matched_slots = 0
wrong_slots: list[DebugChannelSlotMismatch] = []
for slot in range(get_radio_channel_limit(max_channels)):
event = await mc.commands.get_channel(slot)
expected_key = cache_key_by_slot.get(slot)
observed_key = _observed_channel_key(event)
if expected_key == observed_key:
matched_slots += 1
continue
wrong_slots.append(
DebugChannelSlotMismatch(
slot_number=slot,
expected_sha256_of_room_key=_sha256_hex(expected_key) if expected_key else None,
actual_sha256_of_room_key=_sha256_hex(observed_key) if observed_key else None,
)
)
return DebugChannelAudit(
matched_slots=matched_slots,
wrong_slots=wrong_slots,
)
async def _probe_radio() -> DebugRadioProbe:
if not radio_runtime.is_connected:
return DebugRadioProbe(performed=False, errors=["Radio not connected"])
errors: list[str] = []
try:
async with radio_runtime.radio_operation(
"debug_support_snapshot",
suspend_auto_fetch=True,
) as mc:
device_info = None
stats_core = None
stats_radio = None
device_event = await mc.commands.send_device_query()
if getattr(device_event, "type", None) == EventType.DEVICE_INFO:
device_info = device_event.payload
else:
errors.append(f"send_device_query returned {_event_type_name(device_event)}")
core_event = await mc.commands.get_stats_core()
if getattr(core_event, "type", None) == EventType.STATS_CORE:
stats_core = core_event.payload
else:
errors.append(f"get_stats_core returned {_event_type_name(core_event)}")
radio_event = await mc.commands.get_stats_radio()
if getattr(radio_event, "type", None) == EventType.STATS_RADIO:
stats_radio = radio_event.payload
else:
errors.append(f"get_stats_radio returned {_event_type_name(radio_event)}")
contacts_event = await mc.commands.get_contacts()
observed_contacts_payload: dict[str, dict[str, Any]] = {}
if getattr(contacts_event, "type", None) != EventType.ERROR:
observed_contacts_payload = contacts_event.payload or {}
else:
errors.append(f"get_contacts returned {_event_type_name(contacts_event)}")
return DebugRadioProbe(
performed=True,
errors=errors,
self_info=dict(mc.self_info or {}),
device_info=device_info,
stats_core=stats_core,
stats_radio=stats_radio,
contacts=await _build_contact_audit(observed_contacts_payload),
channels=await _build_channel_audit(
mc,
max_channels=_coerce_live_max_channels(device_info),
),
)
except Exception as exc:
logger.warning("Debug support snapshot radio probe failed: %s", exc, exc_info=True)
errors.append(str(exc))
return DebugRadioProbe(performed=False, errors=errors)
@router.get("/debug", response_model=DebugSnapshotResponse)
async def debug_support_snapshot() -> DebugSnapshotResponse:
"""Return a support/debug snapshot with recent logs and live radio state."""
health_data = await build_health_data(radio_runtime.is_connected, radio_runtime.connection_info)
radio_probe = await _probe_radio()
return DebugSnapshotResponse(
captured_at=datetime.now(timezone.utc).isoformat(),
application=_build_application_info(),
health=HealthResponse(**health_data),
runtime=DebugRuntimeInfo(
connection_info=radio_runtime.connection_info,
connection_desired=radio_runtime.connection_desired,
setup_in_progress=radio_runtime.is_setup_in_progress,
setup_complete=radio_runtime.is_setup_complete,
max_channels=radio_runtime.max_channels,
path_hash_mode=radio_runtime.path_hash_mode,
path_hash_mode_supported=radio_runtime.path_hash_mode_supported,
channel_slot_reuse_enabled=radio_runtime.channel_slot_reuse_enabled(),
channel_send_cache_capacity=radio_runtime.get_channel_send_cache_capacity(),
channel_send_cache=[
{"channel_key": channel_key, "slot": slot}
for channel_key, slot in radio_runtime.get_channel_send_cache_snapshot()
],
remediation_flags={
"enable_message_poll_fallback": settings.enable_message_poll_fallback,
"force_channel_slot_reconfigure": settings.force_channel_slot_reconfigure,
},
),
radio_probe=radio_probe,
logs=get_recent_log_lines(limit=1000),
)
-8
View File
@@ -168,20 +168,15 @@ async def send_direct_message_to_contact(
) -> Any:
"""Send a direct message and persist/broadcast the outgoing row."""
contact_data = contact.to_radio_dict()
contact_ensured_on_radio = False
async with radio_manager.radio_operation("send_direct_message") as mc:
logger.debug("Ensuring contact %s is on radio before sending", contact.public_key[:12])
add_result = await mc.commands.add_contact(contact_data)
if add_result.type == EventType.ERROR:
logger.warning("Failed to add contact to radio: %s", add_result.payload)
else:
contact_ensured_on_radio = True
cached_contact = mc.get_contact_by_key_prefix(contact.public_key[:12])
if not cached_contact:
cached_contact = contact_data
else:
contact_ensured_on_radio = True
logger.info("Sending direct message to %s", contact.public_key[:12])
now = int(now_fn())
@@ -194,9 +189,6 @@ async def send_direct_message_to_contact(
if result.type == EventType.ERROR:
raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}")
if contact_ensured_on_radio and not contact.on_radio:
await contact_repository.set_on_radio(contact.public_key.lower(), True)
message = await create_outgoing_direct_message(
conversation_key=contact.public_key.lower(),
text=text,
@@ -278,11 +278,6 @@ export function ContactInfoPane({
<span className="text-[10px] uppercase tracking-wider px-1.5 py-0.5 rounded bg-muted text-muted-foreground font-medium">
{CONTACT_TYPE_LABELS[contact.type] ?? 'Unknown'}
</span>
{contact.on_radio && (
<span className="text-[10px] uppercase tracking-wider px-1.5 py-0.5 rounded bg-primary/10 text-primary font-medium">
On Radio
</span>
)}
</div>
</div>
</div>
@@ -113,6 +113,19 @@ export function SettingsAboutSection({ className }: { className?: string }) {
</a>
</p>
</div>
<Separator />
<div className="text-center">
<a
href="/api/debug"
target="_blank"
rel="noopener noreferrer"
className="text-xs text-muted-foreground hover:text-primary hover:underline"
>
Open debug support snapshot
</a>
</div>
</div>
</div>
);
@@ -0,0 +1,23 @@
import { render, screen } from '@testing-library/react';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { SettingsAboutSection } from '../components/settings/SettingsAboutSection';
describe('SettingsAboutSection', () => {
beforeEach(() => {
vi.stubGlobal('__APP_VERSION__', '3.2.0-test');
vi.stubGlobal('__COMMIT_HASH__', 'deadbeef');
});
afterEach(() => {
vi.unstubAllGlobals();
});
it('renders the debug support snapshot link', () => {
render(<SettingsAboutSection />);
const link = screen.getByRole('link', { name: /Open debug support snapshot/i });
expect(link).toHaveAttribute('href', '/api/debug');
expect(link).toHaveAttribute('target', '_blank');
});
});
+120
View File
@@ -5,6 +5,7 @@ Uses httpx.AsyncClient or direct function calls with real in-memory SQLite.
"""
import hashlib
import logging
import time
from unittest.mock import AsyncMock, MagicMock, patch
@@ -109,6 +110,125 @@ class TestHealthEndpoint:
assert data["connection_info"] is None
class TestDebugEndpoint:
"""Test the debug support snapshot endpoint."""
@pytest.mark.asyncio
async def test_support_snapshot_returns_logs_and_live_radio_audits(self, test_db, client):
"""Debug snapshot should include recent logs plus live radio/contact/channel state."""
from meshcore import EventType
from app.config import clear_recent_log_lines
from app.routers.debug import DebugApplicationInfo
clear_recent_log_lines()
contact_key = "ab" * 32
channel_key = "CD" * 16
await _insert_contact(contact_key, "Alice", last_contacted=1700000000)
await ChannelRepository.upsert(key=channel_key, name="#flightless", on_radio=False)
radio_manager.max_channels = 2
radio_manager.path_hash_mode = 1
radio_manager.path_hash_mode_supported = True
radio_manager._connection_info = "Serial: /dev/ttyUSB0"
radio_manager.note_channel_slot_loaded(channel_key, 0)
mock_mc = MagicMock()
mock_mc.self_info = {"name": "TestNode", "radio_freq": 910.525}
mock_mc.stop_auto_message_fetching = AsyncMock()
mock_mc.start_auto_message_fetching = AsyncMock()
mock_mc.commands.send_device_query = AsyncMock(
return_value=MagicMock(type=EventType.DEVICE_INFO, payload={"max_channels": 2})
)
mock_mc.commands.get_stats_core = AsyncMock(
return_value=MagicMock(type=EventType.STATS_CORE, payload={"heap_free": 1234})
)
mock_mc.commands.get_stats_radio = AsyncMock(
return_value=MagicMock(type=EventType.STATS_RADIO, payload={"tx_good": 9})
)
mock_mc.commands.get_contacts = AsyncMock(
return_value=MagicMock(
type=EventType.OK,
payload={contact_key: {"adv_name": "Alice"}},
)
)
def _channel_event(slot: int) -> MagicMock:
if slot == 0:
return MagicMock(
type=EventType.CHANNEL_INFO,
payload={
"channel_name": "#flightless",
"channel_secret": bytes.fromhex(channel_key),
},
)
return MagicMock(type=EventType.OK, payload={})
mock_mc.commands.get_channel = AsyncMock(side_effect=_channel_event)
radio_manager._meshcore = mock_mc
logging.getLogger("tests.debug").warning("support snapshot marker")
with patch(
"app.routers.debug._build_application_info",
return_value=DebugApplicationInfo(
version="3.2.0",
commit_hash="deadbeef",
git_branch="main",
git_dirty=False,
python_version="3.12.0",
),
):
response = await client.get("/api/debug")
assert response.status_code == 200
payload = response.json()
assert payload["application"]["commit_hash"] == "deadbeef"
assert payload["runtime"]["channel_slot_reuse_enabled"] is True
assert payload["runtime"]["channel_send_cache"] == [{"channel_key": channel_key, "slot": 0}]
assert any("support snapshot marker" in line for line in payload["logs"])
radio_probe = payload["radio_probe"]
assert radio_probe["performed"] is True
assert radio_probe["device_info"] == {"max_channels": 2}
assert radio_probe["stats_core"] == {"heap_free": 1234}
assert radio_probe["stats_radio"] == {"tx_good": 9}
assert radio_probe["contacts"]["expected_and_found"] == 1
assert radio_probe["contacts"]["expected_but_not_found"] == []
assert radio_probe["contacts"]["found_but_not_expected"] == []
assert radio_probe["channels"]["matched_slots"] == 2
assert radio_probe["channels"]["wrong_slots"] == []
@pytest.mark.asyncio
async def test_support_snapshot_returns_runtime_when_disconnected(self, test_db, client):
"""Debug snapshot should still return logs and runtime state when radio is disconnected."""
from app.config import clear_recent_log_lines
from app.routers.debug import DebugApplicationInfo
clear_recent_log_lines()
radio_manager._meshcore = None
radio_manager._connection_info = None
with patch(
"app.routers.debug._build_application_info",
return_value=DebugApplicationInfo(
version="3.2.0",
commit_hash="deadbeef",
git_branch="main",
git_dirty=False,
python_version="3.12.0",
),
):
response = await client.get("/api/debug")
assert response.status_code == 200
payload = response.json()
assert payload["radio_probe"]["performed"] is False
assert payload["radio_probe"]["errors"] == ["Radio not connected"]
class TestRadioDisconnectedHandler:
"""Test that RadioDisconnectedError maps to 503."""
+1 -1
View File
@@ -915,7 +915,7 @@ class TestOnNewContact:
contact = await ContactRepository.get_by_key("cc" * 32)
assert contact is not None
assert contact.name == "Charlie"
assert contact.on_radio is True
assert contact.on_radio is False
assert contact.last_seen == 1700000000
mock_broadcast.assert_called_once()
+35 -23
View File
@@ -20,6 +20,7 @@ from app.radio_sync import (
ensure_contact_on_radio,
is_polling_paused,
pause_polling,
sync_and_offload_all,
sync_radio_time,
sync_recent_contacts_to_radio,
)
@@ -211,11 +212,6 @@ class TestSyncRecentContactsToRadio:
result = await sync_recent_contacts_to_radio()
assert result["loaded"] == 2
# Verify contacts are now marked as on_radio in DB
alice = await ContactRepository.get_by_key(KEY_A)
bob = await ContactRepository.get_by_key(KEY_B)
assert alice.on_radio is True
assert bob.on_radio is True
@pytest.mark.asyncio
async def test_fills_remaining_slots_with_recently_contacted_then_advertised(self, test_db):
@@ -272,6 +268,39 @@ class TestSyncRecentContactsToRadio:
]
assert loaded_keys == favorite_keys
class TestSyncAndOffloadAll:
"""Test session-local contact radio residency reset behavior."""
@pytest.mark.asyncio
async def test_clears_stale_contact_on_radio_flags_before_reload(self, test_db):
await _insert_contact(KEY_A, "Alice", on_radio=True)
await _insert_contact(KEY_B, "Bob", on_radio=True)
mock_mc = MagicMock()
with (
patch(
"app.radio_sync.sync_and_offload_contacts",
new=AsyncMock(return_value={"synced": 0, "removed": 0}),
),
patch(
"app.radio_sync.sync_and_offload_channels",
new=AsyncMock(return_value={"synced": 0, "cleared": 0}),
),
patch("app.radio_sync.ensure_default_channels", new=AsyncMock()),
patch(
"app.radio_sync.sync_recent_contacts_to_radio",
new=AsyncMock(return_value={"loaded": 0, "already_on_radio": 0, "failed": 0}),
),
):
await sync_and_offload_all(mock_mc)
alice = await ContactRepository.get_by_key(KEY_A)
bob = await ContactRepository.get_by_key(KEY_B)
assert alice is not None and alice.on_radio is False
assert bob is not None and bob.on_radio is False
@pytest.mark.asyncio
async def test_advert_fill_skips_repeaters(self, test_db):
"""Recent advert fallback only considers non-repeaters."""
@@ -325,7 +354,7 @@ class TestSyncRecentContactsToRadio:
@pytest.mark.asyncio
async def test_skips_contacts_already_on_radio(self, test_db):
"""Contacts already on radio are counted but not re-added."""
await _insert_contact(KEY_A, "Alice", on_radio=True)
await _insert_contact(KEY_A, "Alice", on_radio=False)
await AppSettingsRepository.update(favorites=[Favorite(type="contact", id=KEY_A)])
mock_mc = MagicMock()
@@ -382,23 +411,6 @@ class TestSyncRecentContactsToRadio:
assert result["loaded"] == 0
assert "error" in result
@pytest.mark.asyncio
async def test_marks_on_radio_when_found_but_not_flagged(self, test_db):
"""Contact found on radio but not flagged gets set_on_radio(True)."""
await _insert_contact(KEY_A, "Alice", on_radio=False)
await AppSettingsRepository.update(favorites=[Favorite(type="contact", id=KEY_A)])
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=MagicMock()) # Found
radio_manager._meshcore = mock_mc
result = await sync_recent_contacts_to_radio()
assert result["already_on_radio"] == 1
# Should update the flag since contact.on_radio was False
contact = await ContactRepository.get_by_key(KEY_A)
assert contact.on_radio is True
@pytest.mark.asyncio
async def test_handles_add_failure(self, test_db):
"""Failed add_contact increments the failed counter."""
-3
View File
@@ -161,9 +161,6 @@ class TestOutgoingDMBroadcast:
assert contact_payload["out_path"] == "aa00bb00"
assert contact_payload["out_path_len"] == 2
assert contact_payload["out_path_hash_mode"] == 1
contact = await ContactRepository.get_by_key(pub_key)
assert contact is not None
assert contact.on_radio is True
@pytest.mark.asyncio
async def test_send_dm_prefers_route_override_over_learned_path(self, test_db):