mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
Drop frequency of contact sync task, make standard polling opt-in only
This commit is contained in:
@@ -84,6 +84,8 @@ Ancillary AGENTS.md files which should generally not be reviewed unless specific
|
||||
|
||||
**Background tasks** (WebSocket broadcasts, periodic sync, contact auto-loading, etc.) use fire-and-forget `asyncio.create_task`. Exceptions in these tasks are logged to the backend logs, which is sufficient for debugging. There is no need to track task references or add done-callbacks purely for error visibility. If there's a convenient way to bubble an error to the frontend (e.g., via `broadcast_error` for user-actionable problems), do so, but this is minor and best-effort.
|
||||
|
||||
Radio startup/setup is one place where that frontend bubbling is intentional: if post-connect setup hangs past its timeout, the backend both logs the failure and pushes a toast instructing the operator to reboot the radio and restart the server.
|
||||
|
||||
## Key Design Principles
|
||||
|
||||
1. **Store-and-serve**: Backend stores all packets even when no client is connected
|
||||
@@ -440,8 +442,9 @@ mc.subscribe(EventType.ACK, handler)
|
||||
| `MESHCORE_LOG_LEVEL` | `INFO` | Logging level (`DEBUG`/`INFO`/`WARNING`/`ERROR`) |
|
||||
| `MESHCORE_DATABASE_PATH` | `data/meshcore.db` | SQLite database location |
|
||||
| `MESHCORE_DISABLE_BOTS` | `false` | Disable bot system entirely (blocks execution and config) |
|
||||
| `MESHCORE_ENABLE_MESSAGE_POLL_FALLBACK` | `false` | Enable periodic `get_msg()` fallback polling if radio events are not reliably surfacing messages |
|
||||
|
||||
**Note:** Runtime app settings are stored in the database (`app_settings` table), not environment variables. These include `max_radio_contacts`, `auto_decrypt_dm_on_advert`, `sidebar_sort_order`, `advert_interval`, `last_advert_time`, `favorites`, `last_message_times`, `flood_scope`, `blocked_keys`, and `blocked_names`. They are configured via `GET/PATCH /api/settings`. MQTT, bot, webhook, and Apprise configs are stored in the `fanout_configs` table, managed via `/api/fanout`.
|
||||
**Note:** Runtime app settings are stored in the database (`app_settings` table), not environment variables. These include `max_radio_contacts`, `auto_decrypt_dm_on_advert`, `sidebar_sort_order`, `advert_interval`, `last_advert_time`, `favorites`, `last_message_times`, `flood_scope`, `blocked_keys`, and `blocked_names`. `max_radio_contacts` now means favorite contacts kept loaded on the radio for ACK support, not a recent-contact rotation pool. They are configured via `GET/PATCH /api/settings`. MQTT, bot, webhook, and Apprise configs are stored in the `fanout_configs` table, managed via `/api/fanout`.
|
||||
|
||||
Byte-perfect channel retries are user-triggered via `POST /api/messages/channel/{message_id}/resend` and are allowed for 30 seconds after the original send.
|
||||
|
||||
|
||||
@@ -224,9 +224,12 @@ npm run build # build the frontend
|
||||
| `MESHCORE_LOG_LEVEL` | INFO | DEBUG, INFO, WARNING, ERROR |
|
||||
| `MESHCORE_DATABASE_PATH` | data/meshcore.db | SQLite database path |
|
||||
| `MESHCORE_DISABLE_BOTS` | false | Disable bot system entirely (blocks execution and config) |
|
||||
| `MESHCORE_ENABLE_MESSAGE_POLL_FALLBACK` | false | Enable periodic `get_msg()` fallback polling if radio events are not reliably surfacing messages |
|
||||
|
||||
Only one transport may be active at a time. If multiple are set, the server will refuse to start.
|
||||
|
||||
By default the app relies on radio events plus MeshCore auto-fetch for incoming messages. If you see messages on the radio that never show up in the app, try `MESHCORE_ENABLE_MESSAGE_POLL_FALLBACK=true` to add a more aggressive periodic `get_msg()` safety net.
|
||||
|
||||
## Additional Setup
|
||||
|
||||
<details>
|
||||
|
||||
@@ -87,7 +87,8 @@ app/
|
||||
- `RadioManager.post_connect_setup()` delegates to `services/radio_lifecycle.py`.
|
||||
- Routers, startup/lifespan code, fanout helpers, and `radio_sync.py` should reach radio state through `services/radio_runtime.py`, not by importing `app.radio.radio_manager` directly.
|
||||
- Shared reconnect/setup helpers in `services/radio_lifecycle.py` are used by startup, the monitor, and manual reconnect/reboot flows before broadcasting healthy state.
|
||||
- Setup still includes handler registration, key export, time sync, contact/channel sync, polling/advert tasks.
|
||||
- Setup still includes handler registration, key export, time sync, contact/channel sync, and advertisement tasks. Fallback message polling starts only when `MESHCORE_ENABLE_MESSAGE_POLL_FALLBACK=true`.
|
||||
- Post-connect setup is timeout-bounded. If initial radio offload/setup hangs too long, the backend logs the failure and broadcasts an `error` toast telling the operator to reboot the radio and restart the server.
|
||||
|
||||
## Important Behaviors
|
||||
|
||||
@@ -230,7 +231,7 @@ app/
|
||||
- `contact_deleted` — contact removed from database (payload: `{ public_key }`)
|
||||
- `channel` — single channel upsert/update (payload: full `Channel`)
|
||||
- `channel_deleted` — channel removed from database (payload: `{ key }`)
|
||||
- `error` — toast notification (reconnect failure, missing private key, etc.)
|
||||
- `error` — toast notification (reconnect failure, missing private key, stuck radio startup, etc.)
|
||||
- `success` — toast notification (historical decrypt complete, etc.)
|
||||
|
||||
Backend WS sends go through typed serialization in `events.py`. Initial WS connect sends `health` only. Contacts/channels are loaded by REST.
|
||||
@@ -250,6 +251,8 @@ Main tables:
|
||||
|
||||
Repository writes should prefer typed models such as `ContactUpsert` over ad hoc dict payloads when adding or updating schema-coupled data.
|
||||
|
||||
`max_radio_contacts` now controls how many favorite contacts are kept loaded on the radio for ACK support. The app no longer rotates recent non-favorite contacts onto the radio during normal background maintenance.
|
||||
|
||||
`app_settings` fields in active model:
|
||||
- `max_radio_contacts`
|
||||
- `favorites`
|
||||
|
||||
@@ -18,6 +18,7 @@ class Settings(BaseSettings):
|
||||
log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = "INFO"
|
||||
database_path: str = "data/meshcore.db"
|
||||
disable_bots: bool = False
|
||||
enable_message_poll_fallback: bool = False
|
||||
|
||||
@model_validator(mode="after")
|
||||
def validate_transport_exclusivity(self) -> "Settings":
|
||||
|
||||
@@ -778,7 +778,7 @@ async def _migrate_009_create_app_settings_table(conn: aiosqlite.Connection) ->
|
||||
Create app_settings table for persistent application preferences.
|
||||
|
||||
This table stores:
|
||||
- max_radio_contacts: Max non-repeater contacts to keep on radio for DM ACKs
|
||||
- max_radio_contacts: Max favorite contacts to keep on radio for DM ACKs
|
||||
- favorites: JSON array of favorite conversations [{type, id}, ...]
|
||||
- auto_decrypt_dm_on_advert: Whether to attempt historical DM decryption on new contact
|
||||
- sidebar_sort_order: 'recent' or 'alpha' for sidebar sorting
|
||||
|
||||
@@ -515,10 +515,7 @@ class AppSettings(BaseModel):
|
||||
|
||||
max_radio_contacts: int = Field(
|
||||
default=200,
|
||||
description=(
|
||||
"Maximum contacts to keep on radio for DM ACKs "
|
||||
"(favorite contacts first, then recent non-repeaters)"
|
||||
),
|
||||
description="Maximum favorite contacts to keep on radio for DM ACKs",
|
||||
)
|
||||
favorites: list[Favorite] = Field(
|
||||
default_factory=list, description="List of favorited conversations"
|
||||
|
||||
@@ -29,7 +29,6 @@ from app.decoder import (
|
||||
)
|
||||
from app.keystore import get_private_key, get_public_key, has_private_key
|
||||
from app.models import (
|
||||
CONTACT_TYPE_REPEATER,
|
||||
Contact,
|
||||
ContactUpsert,
|
||||
RawPacketBroadcast,
|
||||
@@ -415,7 +414,6 @@ async def _process_advertisement(
|
||||
Process an advertisement packet.
|
||||
|
||||
Extracts contact info and updates the database/broadcasts to clients.
|
||||
For non-repeater contacts, triggers sync of recent contacts to radio for DM ACK support.
|
||||
"""
|
||||
# Parse packet to get path info if not already provided
|
||||
if packet_info is None:
|
||||
@@ -533,14 +531,6 @@ async def _process_advertisement(
|
||||
if settings.auto_decrypt_dm_on_advert:
|
||||
await start_historical_dm_decryption(None, advert.public_key.lower(), advert.name)
|
||||
|
||||
# If this is not a repeater, trigger recent contacts sync to radio
|
||||
# This ensures we can auto-ACK DMs from recent contacts
|
||||
if contact_type != CONTACT_TYPE_REPEATER:
|
||||
# Import here to avoid circular import
|
||||
from app.radio_sync import sync_recent_contacts_to_radio
|
||||
|
||||
asyncio.create_task(sync_recent_contacts_to_radio())
|
||||
|
||||
|
||||
async def _process_direct_message(
|
||||
raw_bytes: bytes,
|
||||
|
||||
@@ -4,7 +4,7 @@ Radio sync and offload management.
|
||||
This module handles syncing contacts and channels from the radio to the database,
|
||||
then removing them from the radio to free up space for new discoveries.
|
||||
|
||||
Also handles loading recent non-repeater contacts TO the radio for DM ACK support.
|
||||
Also handles loading favorites plus recently active contacts TO the radio for DM ACK support.
|
||||
Also handles periodic message polling as a fallback for platforms where push events
|
||||
don't work reliably.
|
||||
"""
|
||||
@@ -49,6 +49,26 @@ def _contact_sync_debug_fields(contact: Contact) -> dict[str, object]:
|
||||
}
|
||||
|
||||
|
||||
async def _reconcile_contact_messages_background(
|
||||
public_key: str,
|
||||
contact_name: str | None,
|
||||
) -> None:
|
||||
"""Run contact/message reconciliation outside the radio critical path."""
|
||||
try:
|
||||
await reconcile_contact_messages(
|
||||
public_key=public_key,
|
||||
contact_name=contact_name,
|
||||
log=logger,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Background contact reconciliation failed for %s: %s",
|
||||
public_key[:12],
|
||||
exc,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
|
||||
async def upsert_channel_from_radio_slot(payload: dict, *, on_radio: bool) -> str | None:
|
||||
"""Parse a radio channel-slot payload and upsert to the database.
|
||||
|
||||
@@ -119,8 +139,8 @@ async def pause_polling():
|
||||
# Background task handle
|
||||
_sync_task: asyncio.Task | None = None
|
||||
|
||||
# Sync interval in seconds (5 minutes)
|
||||
SYNC_INTERVAL = 300
|
||||
# Sync interval in seconds (10 minutes)
|
||||
SYNC_INTERVAL = 600
|
||||
|
||||
|
||||
async def sync_and_offload_contacts(mc: MeshCore) -> dict:
|
||||
@@ -157,10 +177,11 @@ async def sync_and_offload_contacts(mc: MeshCore) -> dict:
|
||||
await ContactRepository.upsert(
|
||||
ContactUpsert.from_radio_dict(public_key, contact_data, on_radio=False)
|
||||
)
|
||||
await reconcile_contact_messages(
|
||||
public_key=public_key,
|
||||
contact_name=contact_data.get("adv_name"),
|
||||
log=logger,
|
||||
asyncio.create_task(
|
||||
_reconcile_contact_messages_background(
|
||||
public_key,
|
||||
contact_data.get("adv_name"),
|
||||
)
|
||||
)
|
||||
synced += 1
|
||||
|
||||
@@ -290,10 +311,10 @@ async def sync_and_offload_all(mc: MeshCore) -> dict:
|
||||
# Ensure default channels exist
|
||||
await ensure_default_channels()
|
||||
|
||||
# Reload favorites and recent contacts back onto the radio immediately
|
||||
# so favorited contacts don't stay in the on_radio=False limbo until the
|
||||
# next advertisement arrives. Pass mc directly since the caller already
|
||||
# holds the radio operation lock (asyncio.Lock is not reentrant).
|
||||
# Reload favorites back onto the radio immediately so they do not stay
|
||||
# in on_radio=False limbo after offload. Pass mc directly since the
|
||||
# caller already holds the radio operation lock (asyncio.Lock is not
|
||||
# reentrant).
|
||||
reload_result = await sync_recent_contacts_to_radio(force=True, mc=mc)
|
||||
|
||||
return {
|
||||
@@ -376,10 +397,6 @@ async def _message_poll_loop():
|
||||
try:
|
||||
await asyncio.sleep(MESSAGE_POLL_INTERVAL)
|
||||
|
||||
# Clean up expired pending ACKs every poll cycle so they don't
|
||||
# accumulate when no ACKs arrive (e.g. all recipients out of range).
|
||||
cleanup_expired_acks()
|
||||
|
||||
if radio_manager.is_connected and not is_polling_paused():
|
||||
try:
|
||||
async with radio_manager.radio_operation(
|
||||
@@ -554,6 +571,7 @@ async def _periodic_sync_loop():
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(SYNC_INTERVAL)
|
||||
cleanup_expired_acks()
|
||||
if not radio_manager.is_connected:
|
||||
continue
|
||||
|
||||
@@ -604,8 +622,12 @@ async def _sync_contacts_to_radio_inner(mc: MeshCore) -> dict:
|
||||
"""
|
||||
Core logic for loading contacts onto the radio.
|
||||
|
||||
Favorite contacts are prioritized first, then recent non-repeater contacts
|
||||
fill remaining slots up to max_radio_contacts.
|
||||
Fill order is:
|
||||
1. Favorite contacts
|
||||
2. Most recently interacted-with non-repeaters
|
||||
3. Most recently advert-heard non-repeaters without interaction history
|
||||
|
||||
Contacts are loaded up to max_radio_contacts.
|
||||
|
||||
Caller must hold the radio operation lock and pass a valid MeshCore instance.
|
||||
"""
|
||||
@@ -638,8 +660,21 @@ async def _sync_contacts_to_radio_inner(mc: MeshCore) -> dict:
|
||||
break
|
||||
|
||||
if len(selected_contacts) < max_contacts:
|
||||
recent_contacts = await ContactRepository.get_recent_non_repeaters(limit=max_contacts)
|
||||
for contact in recent_contacts:
|
||||
for contact in await ContactRepository.get_recently_contacted_non_repeaters(
|
||||
limit=max_contacts
|
||||
):
|
||||
key = contact.public_key.lower()
|
||||
if key in selected_keys:
|
||||
continue
|
||||
selected_keys.add(key)
|
||||
selected_contacts.append(contact)
|
||||
if len(selected_contacts) >= max_contacts:
|
||||
break
|
||||
|
||||
if len(selected_contacts) < max_contacts:
|
||||
for contact in await ContactRepository.get_recently_advertised_non_repeaters(
|
||||
limit=max_contacts
|
||||
):
|
||||
key = contact.public_key.lower()
|
||||
if key in selected_keys:
|
||||
continue
|
||||
@@ -649,22 +684,75 @@ async def _sync_contacts_to_radio_inner(mc: MeshCore) -> dict:
|
||||
break
|
||||
|
||||
logger.debug(
|
||||
"Selected %d contacts to sync (%d favorite contacts first, limit=%d)",
|
||||
"Selected %d contacts to sync (%d favorites, limit=%d)",
|
||||
len(selected_contacts),
|
||||
favorite_contacts_loaded,
|
||||
max_contacts,
|
||||
)
|
||||
return await _load_contacts_to_radio(mc, selected_contacts)
|
||||
|
||||
|
||||
async def ensure_contact_on_radio(
|
||||
public_key: str,
|
||||
*,
|
||||
force: bool = False,
|
||||
mc: MeshCore | None = None,
|
||||
) -> dict:
|
||||
"""Ensure one contact is loaded on the radio for ACK/routing support."""
|
||||
global _last_contact_sync
|
||||
|
||||
now = time.time()
|
||||
if not force and (now - _last_contact_sync) < CONTACT_SYNC_THROTTLE_SECONDS:
|
||||
logger.debug(
|
||||
"Single-contact sync throttled (last sync %ds ago)",
|
||||
int(now - _last_contact_sync),
|
||||
)
|
||||
return {"loaded": 0, "throttled": True}
|
||||
|
||||
try:
|
||||
contact = await ContactRepository.get_by_key_or_prefix(public_key)
|
||||
except AmbiguousPublicKeyPrefixError:
|
||||
logger.warning("Cannot sync favorite contact '%s': ambiguous key prefix", public_key)
|
||||
return {"loaded": 0, "error": "Ambiguous contact key prefix"}
|
||||
|
||||
if not contact:
|
||||
logger.debug("Cannot sync favorite contact %s: not found", public_key[:12])
|
||||
return {"loaded": 0, "error": "Contact not found"}
|
||||
|
||||
if mc is not None:
|
||||
_last_contact_sync = now
|
||||
return await _load_contacts_to_radio(mc, [contact])
|
||||
|
||||
if not radio_manager.is_connected or radio_manager.meshcore is None:
|
||||
logger.debug("Cannot sync favorite contact to radio: not connected")
|
||||
return {"loaded": 0, "error": "Radio not connected"}
|
||||
|
||||
try:
|
||||
async with radio_manager.radio_operation(
|
||||
"ensure_contact_on_radio",
|
||||
blocking=False,
|
||||
) as mc:
|
||||
_last_contact_sync = now
|
||||
assert mc is not None
|
||||
return await _load_contacts_to_radio(mc, [contact])
|
||||
except RadioOperationBusyError:
|
||||
logger.debug("Skipping favorite contact sync: radio busy")
|
||||
return {"loaded": 0, "busy": True}
|
||||
except Exception as e:
|
||||
logger.error("Error syncing favorite contact to radio: %s", e, exc_info=True)
|
||||
return {"loaded": 0, "error": str(e)}
|
||||
|
||||
|
||||
async def _load_contacts_to_radio(mc: MeshCore, contacts: list[Contact]) -> dict:
|
||||
"""Load the provided contacts onto the radio."""
|
||||
loaded = 0
|
||||
already_on_radio = 0
|
||||
failed = 0
|
||||
|
||||
for contact in selected_contacts:
|
||||
# Check if already on radio
|
||||
for contact in contacts:
|
||||
radio_contact = mc.get_contact_by_key_prefix(contact.public_key[:12])
|
||||
if radio_contact:
|
||||
already_on_radio += 1
|
||||
# Update DB if not marked as on_radio
|
||||
if not contact.on_radio:
|
||||
await ContactRepository.set_on_radio(contact.public_key, True)
|
||||
continue
|
||||
@@ -722,8 +810,8 @@ async def sync_recent_contacts_to_radio(force: bool = False, mc: MeshCore | None
|
||||
"""
|
||||
Load contacts to the radio for DM ACK support.
|
||||
|
||||
Favorite contacts are prioritized first, then recent non-repeater contacts
|
||||
fill remaining slots up to max_radio_contacts.
|
||||
Fill order is favorites, then recently contacted non-repeaters,
|
||||
then recently advert-heard non-repeaters until max_radio_contacts.
|
||||
Only runs at most once every CONTACT_SYNC_THROTTLE_SECONDS unless forced.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -253,17 +253,28 @@ class ContactRepository:
|
||||
return [ContactRepository._row_to_contact(row) for row in rows]
|
||||
|
||||
@staticmethod
|
||||
async def get_recent_non_repeaters(limit: int = 200) -> list[Contact]:
|
||||
"""Get the most recently active non-repeater contacts.
|
||||
|
||||
Orders by most recent activity (last_contacted or last_advert),
|
||||
excluding repeaters (type=2).
|
||||
"""
|
||||
async def get_recently_contacted_non_repeaters(limit: int = 200) -> list[Contact]:
|
||||
"""Get recently interacted-with non-repeater contacts."""
|
||||
cursor = await db.conn.execute(
|
||||
"""
|
||||
SELECT * FROM contacts
|
||||
WHERE type != 2
|
||||
ORDER BY COALESCE(last_contacted, 0) DESC, COALESCE(last_advert, 0) DESC
|
||||
WHERE type != 2 AND last_contacted IS NOT NULL
|
||||
ORDER BY last_contacted DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(limit,),
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
return [ContactRepository._row_to_contact(row) for row in rows]
|
||||
|
||||
@staticmethod
|
||||
async def get_recently_advertised_non_repeaters(limit: int = 200) -> list[Contact]:
|
||||
"""Get recently advert-heard non-repeater contacts."""
|
||||
cursor = await db.conn.execute(
|
||||
"""
|
||||
SELECT * FROM contacts
|
||||
WHERE type != 2 AND last_advert IS NOT NULL
|
||||
ORDER BY last_advert DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(limit,),
|
||||
|
||||
@@ -18,9 +18,7 @@ class AppSettingsUpdate(BaseModel):
|
||||
default=None,
|
||||
ge=1,
|
||||
le=1000,
|
||||
description=(
|
||||
"Maximum contacts to keep on radio (favorites first, then recent non-repeaters)"
|
||||
),
|
||||
description="Maximum favorite contacts to keep loaded on the radio for ACK support",
|
||||
)
|
||||
auto_decrypt_dm_on_advert: bool | None = Field(
|
||||
default=None,
|
||||
@@ -161,12 +159,12 @@ async def toggle_favorite(request: FavoriteRequest) -> AppSettings:
|
||||
logger.info("Adding favorite: %s %s", request.type, request.id[:12])
|
||||
result = await AppSettingsRepository.add_favorite(request.type, request.id)
|
||||
|
||||
# When a contact favorite changes, sync the radio so the contact is
|
||||
# loaded/unloaded immediately rather than waiting for the next advert.
|
||||
if request.type == "contact":
|
||||
from app.radio_sync import sync_recent_contacts_to_radio
|
||||
# When a contact is newly favorited, load just that contact to the radio
|
||||
# immediately so DM ACK support does not wait for the next maintenance cycle.
|
||||
if request.type == "contact" and not is_favorited:
|
||||
from app.radio_sync import ensure_contact_on_radio
|
||||
|
||||
asyncio.create_task(sync_recent_contacts_to_radio(force=True))
|
||||
asyncio.create_task(ensure_contact_on_radio(request.id, force=True))
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@@ -137,15 +137,20 @@ async def send_direct_message_to_contact(
|
||||
) -> Any:
|
||||
"""Send a direct message and persist/broadcast the outgoing row."""
|
||||
contact_data = contact.to_radio_dict()
|
||||
contact_ensured_on_radio = False
|
||||
async with radio_manager.radio_operation("send_direct_message") as mc:
|
||||
logger.debug("Ensuring contact %s is on radio before sending", contact.public_key[:12])
|
||||
add_result = await mc.commands.add_contact(contact_data)
|
||||
if add_result.type == EventType.ERROR:
|
||||
logger.warning("Failed to add contact to radio: %s", add_result.payload)
|
||||
else:
|
||||
contact_ensured_on_radio = True
|
||||
|
||||
cached_contact = mc.get_contact_by_key_prefix(contact.public_key[:12])
|
||||
if not cached_contact:
|
||||
cached_contact = contact_data
|
||||
else:
|
||||
contact_ensured_on_radio = True
|
||||
|
||||
logger.info("Sending direct message to %s", contact.public_key[:12])
|
||||
now = int(now_fn())
|
||||
@@ -158,6 +163,9 @@ async def send_direct_message_to_contact(
|
||||
if result.type == EventType.ERROR:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}")
|
||||
|
||||
if contact_ensured_on_radio and not contact.on_radio:
|
||||
await contact_repository.set_on_radio(contact.public_key.lower(), True)
|
||||
|
||||
message = await create_outgoing_direct_message(
|
||||
conversation_key=contact.public_key.lower(),
|
||||
text=text,
|
||||
|
||||
@@ -1,8 +1,13 @@
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from app.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
POST_CONNECT_SETUP_TIMEOUT_SECONDS = 300
|
||||
POST_CONNECT_SETUP_MAX_ATTEMPTS = 2
|
||||
|
||||
|
||||
async def run_post_connect_setup(radio_manager) -> None:
|
||||
"""Run shared radio initialization after a transport connection succeeds."""
|
||||
@@ -24,7 +29,7 @@ async def run_post_connect_setup(radio_manager) -> None:
|
||||
if radio_manager._setup_lock is None:
|
||||
radio_manager._setup_lock = asyncio.Lock()
|
||||
|
||||
async with radio_manager._setup_lock:
|
||||
async def _setup_body() -> None:
|
||||
if not radio_manager.meshcore:
|
||||
return
|
||||
radio_manager._setup_in_progress = True
|
||||
@@ -132,20 +137,51 @@ async def run_post_connect_setup(radio_manager) -> None:
|
||||
# These tasks acquire their own locks when they need radio access.
|
||||
start_periodic_sync()
|
||||
start_periodic_advert()
|
||||
start_message_polling()
|
||||
if settings.enable_message_poll_fallback:
|
||||
start_message_polling()
|
||||
else:
|
||||
logger.info("Fallback message polling disabled; relying on radio events")
|
||||
|
||||
radio_manager._setup_complete = True
|
||||
finally:
|
||||
radio_manager._setup_in_progress = False
|
||||
|
||||
async with radio_manager._setup_lock:
|
||||
await asyncio.wait_for(_setup_body(), timeout=POST_CONNECT_SETUP_TIMEOUT_SECONDS)
|
||||
|
||||
logger.info("Post-connect setup complete")
|
||||
|
||||
|
||||
async def prepare_connected_radio(radio_manager, *, broadcast_on_success: bool = True) -> None:
|
||||
"""Finish setup for an already-connected radio and optionally broadcast health."""
|
||||
from app.websocket import broadcast_health
|
||||
from app.websocket import broadcast_error, broadcast_health
|
||||
|
||||
for attempt in range(1, POST_CONNECT_SETUP_MAX_ATTEMPTS + 1):
|
||||
try:
|
||||
await radio_manager.post_connect_setup()
|
||||
break
|
||||
except asyncio.TimeoutError as exc:
|
||||
if attempt < POST_CONNECT_SETUP_MAX_ATTEMPTS:
|
||||
logger.warning(
|
||||
"Post-connect setup timed out after %ds on attempt %d/%d; retrying once",
|
||||
POST_CONNECT_SETUP_TIMEOUT_SECONDS,
|
||||
attempt,
|
||||
POST_CONNECT_SETUP_MAX_ATTEMPTS,
|
||||
)
|
||||
continue
|
||||
|
||||
logger.error(
|
||||
"Post-connect setup timed out after %ds on %d attempts. Initial radio offload "
|
||||
"took too long; something is probably wrong.",
|
||||
POST_CONNECT_SETUP_TIMEOUT_SECONDS,
|
||||
POST_CONNECT_SETUP_MAX_ATTEMPTS,
|
||||
)
|
||||
broadcast_error(
|
||||
"Radio startup appears stuck",
|
||||
"Initial radio offload took too long. Reboot the radio and restart the server.",
|
||||
)
|
||||
raise RuntimeError("Post-connect setup timed out") from exc
|
||||
|
||||
await radio_manager.post_connect_setup()
|
||||
radio_manager._last_connected = True
|
||||
if broadcast_on_success:
|
||||
broadcast_health(True, radio_manager.connection_info)
|
||||
@@ -197,7 +233,11 @@ async def connection_monitor_loop(radio_manager) -> None:
|
||||
await prepare_connected_radio(radio_manager, broadcast_on_success=True)
|
||||
consecutive_setup_failures = 0
|
||||
|
||||
elif current_connected and not radio_manager.is_setup_complete:
|
||||
elif (
|
||||
current_connected
|
||||
and not radio_manager.is_setup_complete
|
||||
and not radio_manager.is_setup_in_progress
|
||||
):
|
||||
logger.info("Retrying post-connect setup...")
|
||||
await prepare_connected_radio(radio_manager, broadcast_on_success=True)
|
||||
consecutive_setup_failures = 0
|
||||
|
||||
@@ -51,6 +51,14 @@ export function ChatHeader({
|
||||
const titleClickable =
|
||||
(conversation.type === 'contact' && onOpenContactInfo) ||
|
||||
(conversation.type === 'channel' && onOpenChannelInfo);
|
||||
const favoriteTitle =
|
||||
conversation.type === 'contact'
|
||||
? isFavorite(favorites, 'contact', conversation.id)
|
||||
? 'Remove from favorites. Favorite contacts stay loaded on the radio for ACK support.'
|
||||
: 'Add to favorites. Favorite contacts stay loaded on the radio for ACK support.'
|
||||
: isFavorite(favorites, conversation.type as 'channel' | 'contact', conversation.id)
|
||||
? 'Remove from favorites'
|
||||
: 'Add to favorites';
|
||||
|
||||
const handleEditFloodScopeOverride = () => {
|
||||
if (conversation.type !== 'channel' || !onSetChannelFloodScopeOverride) return;
|
||||
@@ -184,11 +192,7 @@ export function ChatHeader({
|
||||
onClick={() =>
|
||||
onToggleFavorite(conversation.type as 'channel' | 'contact', conversation.id)
|
||||
}
|
||||
title={
|
||||
isFavorite(favorites, conversation.type as 'channel' | 'contact', conversation.id)
|
||||
? 'Remove from favorites'
|
||||
: 'Add to favorites'
|
||||
}
|
||||
title={favoriteTitle}
|
||||
aria-label={
|
||||
isFavorite(favorites, conversation.type as 'channel' | 'contact', conversation.id)
|
||||
? 'Remove from favorites'
|
||||
|
||||
@@ -279,6 +279,7 @@ export function ContactInfoPane({
|
||||
type="button"
|
||||
className="text-sm flex items-center gap-2 hover:text-primary transition-colors"
|
||||
onClick={() => onToggleFavorite('contact', contact.public_key)}
|
||||
title="Favorite contacts stay loaded on the radio for ACK support"
|
||||
>
|
||||
{isFavorite(favorites, 'contact', contact.public_key) ? (
|
||||
<>
|
||||
|
||||
@@ -111,7 +111,11 @@ export function RepeaterDashboard({
|
||||
<button
|
||||
className="p-1.5 rounded hover:bg-accent text-lg leading-none transition-colors focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-ring"
|
||||
onClick={() => onToggleFavorite('contact', conversation.id)}
|
||||
title={isFav ? 'Remove from favorites' : 'Add to favorites'}
|
||||
title={
|
||||
isFav
|
||||
? 'Remove from favorites. Favorite contacts stay loaded on the radio for ACK support.'
|
||||
: 'Add to favorites. Favorite contacts stay loaded on the radio for ACK support.'
|
||||
}
|
||||
aria-label={isFav ? 'Remove from favorites' : 'Add to favorites'}
|
||||
>
|
||||
{isFav ? (
|
||||
|
||||
@@ -575,8 +575,7 @@ export function SettingsRadioSection({
|
||||
onChange={(e) => setMaxRadioContacts(e.target.value)}
|
||||
/>
|
||||
<p className="text-xs text-muted-foreground">
|
||||
Favorite contacts load first, then recent non-repeater contacts until this limit is
|
||||
reached (1-1000)
|
||||
Favorite contacts stay loaded on the radio for DM ACK support up to this limit (1-1000)
|
||||
</p>
|
||||
</div>
|
||||
|
||||
|
||||
@@ -179,13 +179,13 @@ describe('SettingsModal', () => {
|
||||
expect(screen.queryByLabelText('Preset')).not.toBeInTheDocument();
|
||||
});
|
||||
|
||||
it('shows favorite-first contact sync helper text in radio tab', async () => {
|
||||
it('shows favorite-contact radio sync helper text in radio tab', async () => {
|
||||
renderModal();
|
||||
openRadioSection();
|
||||
|
||||
expect(
|
||||
screen.getByText(
|
||||
/Favorite contacts load first, then recent non-repeater contacts until this\s+limit is reached/i
|
||||
/Favorite contacts stay loaded on the radio for DM ACK support up to this limit/i
|
||||
)
|
||||
).toBeInTheDocument();
|
||||
});
|
||||
|
||||
@@ -273,6 +273,33 @@ class TestConnectionMonitor:
|
||||
mock_broadcast.assert_any_call(True, "TCP: test:4000")
|
||||
assert rm._setup_complete is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_monitor_does_not_retry_while_setup_already_running(self):
|
||||
"""Monitor leaves an in-progress setup alone instead of queueing another one."""
|
||||
from app.radio import RadioManager
|
||||
|
||||
rm = RadioManager()
|
||||
mock_mc = MagicMock()
|
||||
mock_mc.is_connected = True
|
||||
rm._meshcore = mock_mc
|
||||
rm._connection_info = "TCP: test:4000"
|
||||
rm._last_connected = True
|
||||
rm._setup_complete = False
|
||||
rm._setup_in_progress = True
|
||||
rm.post_connect_setup = AsyncMock()
|
||||
|
||||
async def _sleep(_seconds: float):
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
with patch("app.radio.asyncio.sleep", side_effect=_sleep):
|
||||
await rm.start_connection_monitor()
|
||||
try:
|
||||
await rm._reconnect_task
|
||||
finally:
|
||||
await rm.stop_connection_monitor()
|
||||
|
||||
rm.post_connect_setup.assert_not_called()
|
||||
|
||||
|
||||
class TestReconnectLock:
|
||||
"""Tests for reconnect() lock serialization — no duplicate reconnections."""
|
||||
@@ -722,3 +749,121 @@ class TestPostConnectSetupOrdering:
|
||||
await rm.post_connect_setup()
|
||||
|
||||
mock_mc.commands.set_flood_scope.assert_awaited_once_with("")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_message_polling_disabled_by_default(self):
|
||||
"""Post-connect setup does not start fallback polling unless explicitly enabled."""
|
||||
from app.models import AppSettings
|
||||
from app.radio import RadioManager
|
||||
|
||||
rm = RadioManager()
|
||||
mock_mc = MagicMock()
|
||||
mock_mc.start_auto_message_fetching = AsyncMock()
|
||||
mock_mc.commands.set_flood_scope = AsyncMock()
|
||||
rm._meshcore = mock_mc
|
||||
|
||||
with (
|
||||
patch("app.event_handlers.register_event_handlers"),
|
||||
patch("app.keystore.export_and_store_private_key", new_callable=AsyncMock),
|
||||
patch("app.radio_sync.sync_radio_time", new_callable=AsyncMock),
|
||||
patch(
|
||||
"app.repository.AppSettingsRepository.get",
|
||||
new_callable=AsyncMock,
|
||||
return_value=AppSettings(),
|
||||
),
|
||||
patch("app.radio_sync.sync_and_offload_all", new_callable=AsyncMock, return_value={}),
|
||||
patch("app.radio_sync.start_periodic_sync"),
|
||||
patch("app.radio_sync.send_advertisement", new_callable=AsyncMock, return_value=False),
|
||||
patch("app.radio_sync.start_periodic_advert"),
|
||||
patch("app.radio_sync.drain_pending_messages", new_callable=AsyncMock, return_value=0),
|
||||
patch("app.radio_sync.start_message_polling") as mock_start_message_polling,
|
||||
):
|
||||
await rm.post_connect_setup()
|
||||
|
||||
mock_start_message_polling.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_message_polling_starts_when_env_flag_enabled(self):
|
||||
"""Post-connect setup starts fallback polling when the env-backed setting is enabled."""
|
||||
from app.models import AppSettings
|
||||
from app.radio import RadioManager
|
||||
|
||||
rm = RadioManager()
|
||||
mock_mc = MagicMock()
|
||||
mock_mc.start_auto_message_fetching = AsyncMock()
|
||||
mock_mc.commands.set_flood_scope = AsyncMock()
|
||||
rm._meshcore = mock_mc
|
||||
|
||||
with (
|
||||
patch("app.event_handlers.register_event_handlers"),
|
||||
patch("app.keystore.export_and_store_private_key", new_callable=AsyncMock),
|
||||
patch("app.radio_sync.sync_radio_time", new_callable=AsyncMock),
|
||||
patch(
|
||||
"app.repository.AppSettingsRepository.get",
|
||||
new_callable=AsyncMock,
|
||||
return_value=AppSettings(),
|
||||
),
|
||||
patch("app.radio_sync.sync_and_offload_all", new_callable=AsyncMock, return_value={}),
|
||||
patch("app.radio_sync.start_periodic_sync"),
|
||||
patch("app.radio_sync.send_advertisement", new_callable=AsyncMock, return_value=False),
|
||||
patch("app.radio_sync.start_periodic_advert"),
|
||||
patch("app.radio_sync.drain_pending_messages", new_callable=AsyncMock, return_value=0),
|
||||
patch("app.radio_sync.start_message_polling") as mock_start_message_polling,
|
||||
patch("app.services.radio_lifecycle.settings.enable_message_poll_fallback", True),
|
||||
):
|
||||
await rm.post_connect_setup()
|
||||
|
||||
mock_start_message_polling.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prepare_connected_radio_retries_timeout_once_before_failing(self):
|
||||
"""Hung post-connect setup gets one retry before surfacing an operator error."""
|
||||
from app.radio import RadioManager
|
||||
from app.services.radio_lifecycle import prepare_connected_radio
|
||||
|
||||
rm = RadioManager()
|
||||
rm._connection_info = "Serial: /dev/ttyUSB0"
|
||||
rm.post_connect_setup = AsyncMock(
|
||||
side_effect=[asyncio.TimeoutError(), asyncio.TimeoutError()]
|
||||
)
|
||||
|
||||
with (
|
||||
patch("app.services.radio_lifecycle.logger") as mock_logger,
|
||||
patch("app.websocket.broadcast_error") as mock_broadcast_error,
|
||||
patch("app.websocket.broadcast_health") as mock_broadcast_health,
|
||||
):
|
||||
with pytest.raises(RuntimeError, match="Post-connect setup timed out"):
|
||||
await prepare_connected_radio(rm, broadcast_on_success=True)
|
||||
|
||||
assert rm.post_connect_setup.await_count == 2
|
||||
mock_logger.warning.assert_called_once()
|
||||
mock_logger.error.assert_called_once()
|
||||
mock_broadcast_error.assert_called_once_with(
|
||||
"Radio startup appears stuck",
|
||||
"Initial radio offload took too long. Reboot the radio and restart the server.",
|
||||
)
|
||||
mock_broadcast_health.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prepare_connected_radio_succeeds_on_retry_after_timeout(self):
|
||||
"""A slow first attempt can time out once without failing the reconnect flow."""
|
||||
from app.radio import RadioManager
|
||||
from app.services.radio_lifecycle import prepare_connected_radio
|
||||
|
||||
rm = RadioManager()
|
||||
rm._connection_info = "Serial: /dev/ttyUSB0"
|
||||
rm.post_connect_setup = AsyncMock(side_effect=[asyncio.TimeoutError(), None])
|
||||
|
||||
with (
|
||||
patch("app.services.radio_lifecycle.logger") as mock_logger,
|
||||
patch("app.websocket.broadcast_error") as mock_broadcast_error,
|
||||
patch("app.websocket.broadcast_health") as mock_broadcast_health,
|
||||
):
|
||||
await prepare_connected_radio(rm, broadcast_on_success=True)
|
||||
|
||||
assert rm.post_connect_setup.await_count == 2
|
||||
assert rm._last_connected is True
|
||||
mock_logger.warning.assert_called_once()
|
||||
mock_logger.error.assert_not_called()
|
||||
mock_broadcast_error.assert_not_called()
|
||||
mock_broadcast_health.assert_called_once_with(True, "Serial: /dev/ttyUSB0")
|
||||
|
||||
@@ -16,6 +16,7 @@ from app.radio_sync import (
|
||||
_message_poll_loop,
|
||||
_periodic_advert_loop,
|
||||
_periodic_sync_loop,
|
||||
ensure_contact_on_radio,
|
||||
is_polling_paused,
|
||||
pause_polling,
|
||||
sync_radio_time,
|
||||
@@ -180,10 +181,16 @@ class TestSyncRecentContactsToRadio:
|
||||
"""Test the sync_recent_contacts_to_radio function."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_loads_contacts_not_on_radio(self, test_db):
|
||||
"""Contacts not on radio are added via add_contact."""
|
||||
async def test_loads_favorite_contacts_not_on_radio(self, test_db):
|
||||
"""Favorite contacts not on radio are added via add_contact."""
|
||||
await _insert_contact(KEY_A, "Alice", last_contacted=2000)
|
||||
await _insert_contact(KEY_B, "Bob", last_contacted=1000)
|
||||
await AppSettingsRepository.update(
|
||||
favorites=[
|
||||
Favorite(type="contact", id=KEY_A),
|
||||
Favorite(type="contact", id=KEY_B),
|
||||
]
|
||||
)
|
||||
|
||||
mock_mc = MagicMock()
|
||||
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None)
|
||||
@@ -202,16 +209,16 @@ class TestSyncRecentContactsToRadio:
|
||||
assert bob.on_radio is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_favorites_loaded_before_recent_contacts(self, test_db):
|
||||
"""Favorite contacts are loaded first, then recents until limit."""
|
||||
async def test_fills_remaining_slots_with_recently_contacted_then_advertised(self, test_db):
|
||||
"""Fill order is favorites, then recent contacts, then recent adverts."""
|
||||
await _insert_contact(KEY_A, "Alice", last_contacted=100)
|
||||
await _insert_contact(KEY_B, "Bob", last_contacted=2000)
|
||||
await _insert_contact("cc" * 32, "Carol", last_contacted=1000)
|
||||
await _insert_contact("dd" * 32, "Dave", last_advert=3000)
|
||||
await _insert_contact("ee" * 32, "Eve", last_advert=2500)
|
||||
|
||||
# Set max_radio_contacts=2 and add KEY_A as favorite
|
||||
await AppSettingsRepository.update(
|
||||
max_radio_contacts=2,
|
||||
favorites=[Favorite(type="contact", id=KEY_A)],
|
||||
max_radio_contacts=4, favorites=[Favorite(type="contact", id=KEY_A)]
|
||||
)
|
||||
|
||||
mock_mc = MagicMock()
|
||||
@@ -223,22 +230,45 @@ class TestSyncRecentContactsToRadio:
|
||||
radio_manager._meshcore = mock_mc
|
||||
result = await sync_recent_contacts_to_radio()
|
||||
|
||||
assert result["loaded"] == 2
|
||||
# KEY_A (favorite) should be loaded first, then KEY_B (most recent)
|
||||
assert result["loaded"] == 4
|
||||
loaded_keys = [
|
||||
call.args[0]["public_key"] for call in mock_mc.commands.add_contact.call_args_list
|
||||
]
|
||||
assert loaded_keys == [KEY_A, KEY_B]
|
||||
assert loaded_keys == [KEY_A, KEY_B, "cc" * 32, "dd" * 32]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_favorite_contact_not_loaded_twice_if_also_recent(self, test_db):
|
||||
"""A favorite contact that is also recent is loaded only once."""
|
||||
async def test_advert_fill_skips_repeaters(self, test_db):
|
||||
"""Recent advert fallback only considers non-repeaters."""
|
||||
await _insert_contact(KEY_A, "Alice", last_advert=3000, contact_type=2)
|
||||
await _insert_contact(KEY_B, "Bob", last_advert=2000, contact_type=1)
|
||||
|
||||
await AppSettingsRepository.update(max_radio_contacts=1, favorites=[])
|
||||
|
||||
mock_mc = MagicMock()
|
||||
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None)
|
||||
mock_result = MagicMock()
|
||||
mock_result.type = EventType.OK
|
||||
mock_mc.commands.add_contact = AsyncMock(return_value=mock_result)
|
||||
|
||||
radio_manager._meshcore = mock_mc
|
||||
result = await sync_recent_contacts_to_radio()
|
||||
|
||||
assert result["loaded"] == 1
|
||||
payload = mock_mc.commands.add_contact.call_args.args[0]
|
||||
assert payload["public_key"] == KEY_B
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_duplicate_favorite_not_loaded_twice(self, test_db):
|
||||
"""Duplicate favorite entries still load the contact only once."""
|
||||
await _insert_contact(KEY_A, "Alice", last_contacted=2000)
|
||||
await _insert_contact(KEY_B, "Bob", last_contacted=1000)
|
||||
|
||||
await AppSettingsRepository.update(
|
||||
max_radio_contacts=2,
|
||||
favorites=[Favorite(type="contact", id=KEY_A)],
|
||||
favorites=[
|
||||
Favorite(type="contact", id=KEY_A),
|
||||
Favorite(type="contact", id=KEY_A),
|
||||
],
|
||||
)
|
||||
|
||||
mock_mc = MagicMock()
|
||||
@@ -260,6 +290,7 @@ class TestSyncRecentContactsToRadio:
|
||||
async def test_skips_contacts_already_on_radio(self, test_db):
|
||||
"""Contacts already on radio are counted but not re-added."""
|
||||
await _insert_contact(KEY_A, "Alice", on_radio=True)
|
||||
await AppSettingsRepository.update(favorites=[Favorite(type="contact", id=KEY_A)])
|
||||
|
||||
mock_mc = MagicMock()
|
||||
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=MagicMock()) # Found
|
||||
@@ -319,6 +350,7 @@ class TestSyncRecentContactsToRadio:
|
||||
async def test_marks_on_radio_when_found_but_not_flagged(self, test_db):
|
||||
"""Contact found on radio but not flagged gets set_on_radio(True)."""
|
||||
await _insert_contact(KEY_A, "Alice", on_radio=False)
|
||||
await AppSettingsRepository.update(favorites=[Favorite(type="contact", id=KEY_A)])
|
||||
|
||||
mock_mc = MagicMock()
|
||||
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=MagicMock()) # Found
|
||||
@@ -335,6 +367,7 @@ class TestSyncRecentContactsToRadio:
|
||||
async def test_handles_add_failure(self, test_db):
|
||||
"""Failed add_contact increments the failed counter."""
|
||||
await _insert_contact(KEY_A, "Alice")
|
||||
await AppSettingsRepository.update(favorites=[Favorite(type="contact", id=KEY_A)])
|
||||
|
||||
mock_mc = MagicMock()
|
||||
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None)
|
||||
@@ -360,6 +393,7 @@ class TestSyncRecentContactsToRadio:
|
||||
last_path_len=2,
|
||||
out_path_hash_mode=1,
|
||||
)
|
||||
await AppSettingsRepository.update(favorites=[Favorite(type="contact", id=KEY_A)])
|
||||
|
||||
mock_mc = MagicMock()
|
||||
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None)
|
||||
@@ -388,6 +422,7 @@ class TestSyncRecentContactsToRadio:
|
||||
last_path_len=-125,
|
||||
out_path_hash_mode=2,
|
||||
)
|
||||
await AppSettingsRepository.update(favorites=[Favorite(type="contact", id=KEY_A)])
|
||||
|
||||
mock_mc = MagicMock()
|
||||
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None)
|
||||
@@ -412,6 +447,7 @@ class TestSyncRecentContactsToRadio:
|
||||
so it passes mc directly to avoid deadlock (asyncio.Lock is not reentrant).
|
||||
"""
|
||||
await _insert_contact(KEY_A, "Alice", last_contacted=2000)
|
||||
await AppSettingsRepository.update(favorites=[Favorite(type="contact", id=KEY_A)])
|
||||
|
||||
mock_mc = MagicMock()
|
||||
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None)
|
||||
@@ -451,6 +487,7 @@ class TestSyncRecentContactsToRadio:
|
||||
"""If _meshcore is swapped between pre-check and lock acquisition,
|
||||
the function uses the new (post-lock) instance, not the stale one."""
|
||||
await _insert_contact(KEY_A, "Alice", last_contacted=2000)
|
||||
await AppSettingsRepository.update(favorites=[Favorite(type="contact", id=KEY_A)])
|
||||
|
||||
old_mc = MagicMock(name="old_mc")
|
||||
new_mc = MagicMock(name="new_mc")
|
||||
@@ -471,6 +508,26 @@ class TestSyncRecentContactsToRadio:
|
||||
new_mc.commands.add_contact.assert_called_once()
|
||||
old_mc.commands.add_contact.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ensure_contact_on_radio_loads_single_contact_even_when_not_favorited(
|
||||
self, test_db
|
||||
):
|
||||
"""Targeted sync loads one contact without needing it in favorites."""
|
||||
await _insert_contact(KEY_A, "Alice", last_contacted=2000)
|
||||
|
||||
mock_mc = MagicMock()
|
||||
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None)
|
||||
mock_result = MagicMock()
|
||||
mock_result.type = EventType.OK
|
||||
mock_mc.commands.add_contact = AsyncMock(return_value=mock_result)
|
||||
|
||||
radio_manager._meshcore = mock_mc
|
||||
result = await ensure_contact_on_radio(KEY_A, force=True)
|
||||
|
||||
assert result["loaded"] == 1
|
||||
payload = mock_mc.commands.add_contact.call_args.args[0]
|
||||
assert payload["public_key"] == KEY_A
|
||||
|
||||
|
||||
class TestSyncAndOffloadContacts:
|
||||
"""Test sync_and_offload_contacts: pull contacts from radio, save to DB, remove from radio."""
|
||||
@@ -511,7 +568,7 @@ class TestSyncAndOffloadContacts:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_claims_prefix_messages_for_each_contact(self, test_db):
|
||||
"""claim_prefix_messages is called for each synced contact."""
|
||||
"""Prefix message claims still complete via scheduled reconciliation tasks."""
|
||||
from app.radio_sync import sync_and_offload_contacts
|
||||
|
||||
# Pre-insert a message with a prefix key that matches KEY_A
|
||||
@@ -536,13 +593,73 @@ class TestSyncAndOffloadContacts:
|
||||
mock_mc.commands.get_contacts = AsyncMock(return_value=mock_get_result)
|
||||
mock_mc.commands.remove_contact = AsyncMock(return_value=mock_remove_result)
|
||||
|
||||
await sync_and_offload_contacts(mock_mc)
|
||||
created_tasks: list[asyncio.Task] = []
|
||||
real_create_task = asyncio.create_task
|
||||
|
||||
def _capture_task(coro):
|
||||
task = real_create_task(coro)
|
||||
created_tasks.append(task)
|
||||
return task
|
||||
|
||||
with patch("app.radio_sync.asyncio.create_task", side_effect=_capture_task):
|
||||
await sync_and_offload_contacts(mock_mc)
|
||||
|
||||
await asyncio.gather(*created_tasks)
|
||||
|
||||
# Verify the prefix message was claimed (promoted to full key)
|
||||
messages = await MessageRepository.get_all(conversation_key=KEY_A)
|
||||
assert len(messages) == 1
|
||||
assert messages[0].conversation_key == KEY_A.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconciliation_does_not_block_contact_removal(self, test_db):
|
||||
"""Slow reconciliation work is scheduled in background, not awaited inline."""
|
||||
from app.radio_sync import sync_and_offload_contacts
|
||||
|
||||
contact_payload = {KEY_A: {"adv_name": "Alice", "type": 1, "flags": 0}}
|
||||
|
||||
mock_get_result = MagicMock()
|
||||
mock_get_result.type = EventType.NEW_CONTACT
|
||||
mock_get_result.payload = contact_payload
|
||||
|
||||
mock_remove_result = MagicMock()
|
||||
mock_remove_result.type = EventType.OK
|
||||
|
||||
mock_mc = MagicMock()
|
||||
mock_mc.commands.get_contacts = AsyncMock(return_value=mock_get_result)
|
||||
mock_mc.commands.remove_contact = AsyncMock(return_value=mock_remove_result)
|
||||
|
||||
reconcile_started = asyncio.Event()
|
||||
reconcile_release = asyncio.Event()
|
||||
created_tasks: list[asyncio.Task] = []
|
||||
real_create_task = asyncio.create_task
|
||||
|
||||
async def _slow_reconcile(*, public_key: str, contact_name: str | None, log):
|
||||
del public_key, contact_name, log
|
||||
reconcile_started.set()
|
||||
await reconcile_release.wait()
|
||||
|
||||
def _capture_task(coro):
|
||||
task = real_create_task(coro)
|
||||
created_tasks.append(task)
|
||||
return task
|
||||
|
||||
with (
|
||||
patch("app.radio_sync.reconcile_contact_messages", side_effect=_slow_reconcile),
|
||||
patch("app.radio_sync.asyncio.create_task", side_effect=_capture_task),
|
||||
):
|
||||
result = await sync_and_offload_contacts(mock_mc)
|
||||
await asyncio.sleep(0)
|
||||
|
||||
assert result["synced"] == 1
|
||||
assert result["removed"] == 1
|
||||
assert reconcile_started.is_set() is True
|
||||
assert created_tasks and created_tasks[0].done() is False
|
||||
mock_mc.commands.remove_contact.assert_awaited_once()
|
||||
|
||||
reconcile_release.set()
|
||||
await asyncio.gather(*created_tasks)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handles_remove_failure_gracefully(self, test_db):
|
||||
"""Failed remove_contact logs warning but continues to next contact."""
|
||||
@@ -1181,11 +1298,13 @@ class TestPeriodicSyncLoopRaces:
|
||||
with (
|
||||
patch("app.radio_sync.radio_manager", rm),
|
||||
patch("asyncio.sleep", side_effect=mock_sleep),
|
||||
patch("app.radio_sync.cleanup_expired_acks") as mock_cleanup,
|
||||
patch("app.radio_sync.sync_and_offload_all", new_callable=AsyncMock) as mock_sync,
|
||||
patch("app.radio_sync.sync_radio_time", new_callable=AsyncMock) as mock_time,
|
||||
):
|
||||
await _periodic_sync_loop()
|
||||
|
||||
mock_cleanup.assert_called_once()
|
||||
mock_sync.assert_not_called()
|
||||
mock_time.assert_not_called()
|
||||
assert len(sleep_calls) == 2
|
||||
@@ -1201,6 +1320,7 @@ class TestPeriodicSyncLoopRaces:
|
||||
with (
|
||||
patch("app.radio_sync.radio_manager", rm),
|
||||
patch("asyncio.sleep", side_effect=mock_sleep),
|
||||
patch("app.radio_sync.cleanup_expired_acks") as mock_cleanup,
|
||||
patch("app.radio_sync.sync_and_offload_all", new_callable=AsyncMock) as mock_sync,
|
||||
patch("app.radio_sync.sync_radio_time", new_callable=AsyncMock) as mock_time,
|
||||
):
|
||||
@@ -1208,6 +1328,7 @@ class TestPeriodicSyncLoopRaces:
|
||||
finally:
|
||||
lock.release()
|
||||
|
||||
mock_cleanup.assert_called_once()
|
||||
mock_sync.assert_not_called()
|
||||
mock_time.assert_not_called()
|
||||
|
||||
@@ -1222,10 +1343,12 @@ class TestPeriodicSyncLoopRaces:
|
||||
with (
|
||||
patch("app.radio_sync.radio_manager", rm),
|
||||
patch("asyncio.sleep", side_effect=mock_sleep),
|
||||
patch("app.radio_sync.cleanup_expired_acks") as mock_cleanup,
|
||||
patch("app.radio_sync.sync_and_offload_all", new_callable=AsyncMock) as mock_sync,
|
||||
patch("app.radio_sync.sync_radio_time", new_callable=AsyncMock) as mock_time,
|
||||
):
|
||||
await _periodic_sync_loop()
|
||||
|
||||
mock_cleanup.assert_called_once()
|
||||
mock_sync.assert_called_once_with(mock_mc)
|
||||
mock_time.assert_called_once_with(mock_mc)
|
||||
|
||||
@@ -525,6 +525,49 @@ class TestContactRepositoryResolvePrefixes:
|
||||
assert result == {}
|
||||
|
||||
|
||||
class TestContactRepositoryRecentQueries:
|
||||
"""Test recent-contact selection helpers used for radio fill."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_recently_advertised_includes_contacted_contacts(self, test_db):
|
||||
stale_contacted_fresh_advert = "ab" * 32
|
||||
advert_only = "cd" * 32
|
||||
repeater = "ef" * 32
|
||||
|
||||
await ContactRepository.upsert(
|
||||
{
|
||||
"public_key": stale_contacted_fresh_advert,
|
||||
"name": "SeenAgain",
|
||||
"type": 1,
|
||||
"last_contacted": 100,
|
||||
"last_advert": 5000,
|
||||
}
|
||||
)
|
||||
await ContactRepository.upsert(
|
||||
{
|
||||
"public_key": advert_only,
|
||||
"name": "AdvertOnly",
|
||||
"type": 1,
|
||||
"last_advert": 4000,
|
||||
}
|
||||
)
|
||||
await ContactRepository.upsert(
|
||||
{
|
||||
"public_key": repeater,
|
||||
"name": "Repeater",
|
||||
"type": 2,
|
||||
"last_advert": 6000,
|
||||
}
|
||||
)
|
||||
|
||||
contacts = await ContactRepository.get_recently_advertised_non_repeaters()
|
||||
|
||||
assert [contact.public_key for contact in contacts] == [
|
||||
stale_contacted_fresh_advert,
|
||||
advert_only,
|
||||
]
|
||||
|
||||
|
||||
class TestAppSettingsRepository:
|
||||
"""Test AppSettingsRepository parsing and migration edge cases."""
|
||||
|
||||
|
||||
@@ -153,6 +153,9 @@ class TestOutgoingDMBroadcast:
|
||||
assert contact_payload["out_path"] == "aa00bb00"
|
||||
assert contact_payload["out_path_len"] == 2
|
||||
assert contact_payload["out_path_hash_mode"] == 1
|
||||
contact = await ContactRepository.get_by_key(pub_key)
|
||||
assert contact is not None
|
||||
assert contact.on_radio is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_dm_prefers_route_override_over_learned_path(self, test_db):
|
||||
|
||||
@@ -133,21 +133,35 @@ class TestToggleFavorite:
|
||||
@pytest.mark.asyncio
|
||||
async def test_adds_when_not_favorited(self, test_db):
|
||||
request = FavoriteRequest(type="contact", id="aa" * 32)
|
||||
result = await toggle_favorite(request)
|
||||
with (
|
||||
patch("app.radio_sync.ensure_contact_on_radio", new_callable=AsyncMock) as mock_sync,
|
||||
patch("app.routers.settings.asyncio.create_task") as mock_create_task,
|
||||
):
|
||||
mock_create_task.side_effect = lambda coro: coro.close()
|
||||
result = await toggle_favorite(request)
|
||||
|
||||
assert len(result.favorites) == 1
|
||||
assert result.favorites[0].type == "contact"
|
||||
assert result.favorites[0].id == "aa" * 32
|
||||
mock_sync.assert_called_once_with("aa" * 32, force=True)
|
||||
mock_create_task.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_removes_when_already_favorited(self, test_db):
|
||||
# Pre-add a favorite
|
||||
await AppSettingsRepository.add_favorite("channel", "ABCD")
|
||||
await AppSettingsRepository.add_favorite("contact", "aa" * 32)
|
||||
|
||||
request = FavoriteRequest(type="channel", id="ABCD")
|
||||
result = await toggle_favorite(request)
|
||||
request = FavoriteRequest(type="contact", id="aa" * 32)
|
||||
with (
|
||||
patch("app.radio_sync.ensure_contact_on_radio", new_callable=AsyncMock) as mock_sync,
|
||||
patch("app.routers.settings.asyncio.create_task") as mock_create_task,
|
||||
):
|
||||
mock_create_task.side_effect = lambda coro: coro.close()
|
||||
result = await toggle_favorite(request)
|
||||
|
||||
assert result.favorites == []
|
||||
mock_sync.assert_not_called()
|
||||
mock_create_task.assert_not_called()
|
||||
|
||||
|
||||
class TestMigratePreferences:
|
||||
|
||||
Reference in New Issue
Block a user