mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-05-01 11:02:56 +02:00
Offer multiple timing windows for repeater telemetry pickup. Closes #192.
This commit is contained in:
@@ -108,7 +108,8 @@ CREATE TABLE IF NOT EXISTS app_settings (
|
||||
blocked_names TEXT DEFAULT '[]',
|
||||
discovery_blocked_types TEXT DEFAULT '[]',
|
||||
tracked_telemetry_repeaters TEXT DEFAULT '[]',
|
||||
auto_resend_channel INTEGER DEFAULT 0
|
||||
auto_resend_channel INTEGER DEFAULT 0,
|
||||
telemetry_interval_hours INTEGER DEFAULT 8
|
||||
);
|
||||
INSERT OR IGNORE INTO app_settings (id) VALUES (1);
|
||||
|
||||
|
||||
22
app/migrations/_057_telemetry_interval_hours.py
Normal file
22
app/migrations/_057_telemetry_interval_hours.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import logging
|
||||
|
||||
import aiosqlite
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def migrate(conn: aiosqlite.Connection) -> None:
|
||||
"""Add telemetry_interval_hours integer column to app_settings."""
|
||||
tables_cursor = await conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
|
||||
if "app_settings" not in {row[0] for row in await tables_cursor.fetchall()}:
|
||||
await conn.commit()
|
||||
return
|
||||
col_cursor = await conn.execute("PRAGMA table_info(app_settings)")
|
||||
columns = {row[1] for row in await col_cursor.fetchall()}
|
||||
if "telemetry_interval_hours" not in columns:
|
||||
# Default to 8 hours, matching the previous hard-coded interval
|
||||
# so existing users see no behavior change until they opt in.
|
||||
await conn.execute(
|
||||
"ALTER TABLE app_settings ADD COLUMN telemetry_interval_hours INTEGER DEFAULT 8"
|
||||
)
|
||||
await conn.commit()
|
||||
@@ -842,6 +842,14 @@ class AppSettings(BaseModel):
|
||||
default_factory=list,
|
||||
description="Public keys of repeaters opted into periodic telemetry collection (max 8)",
|
||||
)
|
||||
telemetry_interval_hours: int = Field(
|
||||
default=8,
|
||||
description=(
|
||||
"User-preferred telemetry collection interval in hours. The backend "
|
||||
"clamps this up to the shortest legal interval given the number of "
|
||||
"tracked repeaters so daily checks stay under a 24/day ceiling."
|
||||
),
|
||||
)
|
||||
auto_resend_channel: bool = Field(
|
||||
default=False,
|
||||
description=(
|
||||
|
||||
@@ -14,6 +14,7 @@ import logging
|
||||
import math
|
||||
import time
|
||||
from contextlib import asynccontextmanager
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import Literal
|
||||
|
||||
from meshcore import EventType, MeshCore
|
||||
@@ -36,6 +37,7 @@ from app.services.contact_reconciliation import (
|
||||
)
|
||||
from app.services.messages import create_fallback_channel_message
|
||||
from app.services.radio_runtime import radio_runtime as radio_manager
|
||||
from app.telemetry_interval import clamp_telemetry_interval
|
||||
from app.websocket import broadcast_error, broadcast_event
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -159,10 +161,10 @@ MIN_ADVERT_INTERVAL = 3600
|
||||
# Periodic telemetry collection task handle
|
||||
_telemetry_collect_task: asyncio.Task | None = None
|
||||
|
||||
# Telemetry collection interval (8 hours)
|
||||
TELEMETRY_COLLECT_INTERVAL = 8 * 3600
|
||||
|
||||
# Initial delay before the first telemetry collection cycle (let radio settle)
|
||||
# Initial delay before the scheduler starts (let radio settle). After this,
|
||||
# the loop wakes at each UTC top-of-hour and decides whether to run a cycle
|
||||
# based on the user's telemetry_interval_hours preference, clamped up to
|
||||
# the shortest-legal interval for the current tracked-repeater count.
|
||||
TELEMETRY_COLLECT_INITIAL_DELAY = 60
|
||||
|
||||
# Counter to pause polling during repeater operations (supports nested pauses)
|
||||
@@ -1656,62 +1658,122 @@ async def _collect_repeater_telemetry(mc: MeshCore, contact: Contact) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
async def _run_telemetry_cycle() -> None:
|
||||
"""Collect one telemetry sample from every tracked repeater."""
|
||||
if not radio_manager.is_connected:
|
||||
logger.debug("Telemetry collect: radio not connected, skipping cycle")
|
||||
return
|
||||
|
||||
app_settings = await AppSettingsRepository.get()
|
||||
tracked = app_settings.tracked_telemetry_repeaters
|
||||
if not tracked:
|
||||
return
|
||||
|
||||
logger.info("Telemetry collect: starting cycle for %d repeater(s)", len(tracked))
|
||||
collected = 0
|
||||
|
||||
for pub_key in tracked:
|
||||
contact = await ContactRepository.get_by_key(pub_key)
|
||||
if not contact or contact.type != 2:
|
||||
logger.debug(
|
||||
"Telemetry collect: skipping %s (not found or not repeater)",
|
||||
pub_key[:12],
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
async with radio_manager.radio_operation(
|
||||
"telemetry_collect",
|
||||
blocking=False,
|
||||
suspend_auto_fetch=True,
|
||||
) as mc:
|
||||
if await _collect_repeater_telemetry(mc, contact):
|
||||
collected += 1
|
||||
except RadioOperationBusyError:
|
||||
logger.debug(
|
||||
"Telemetry collect: radio busy, skipping %s",
|
||||
pub_key[:12],
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Telemetry collect: cycle complete, %d/%d successful",
|
||||
collected,
|
||||
len(tracked),
|
||||
)
|
||||
|
||||
|
||||
async def _sleep_until_next_utc_top_of_hour() -> None:
|
||||
"""Sleep until the next UTC top-of-hour (or a minimum of 1 second)."""
|
||||
now = datetime.now(UTC)
|
||||
next_top = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
|
||||
delay = (next_top - now).total_seconds()
|
||||
if delay < 1:
|
||||
delay = 1
|
||||
await asyncio.sleep(delay)
|
||||
|
||||
|
||||
async def _maybe_run_scheduled_cycle(now: datetime) -> None:
|
||||
"""Evaluate the modulo gate for the given UTC time and run a cycle if due.
|
||||
|
||||
Factored out of the loop so we can also invoke it immediately after the
|
||||
post-boot initial delay — otherwise a restart within the initial-delay
|
||||
window before a scheduled boundary would carry the task past that boundary
|
||||
and skip a due cycle (for 24h cadence users, that's a full day of missed
|
||||
telemetry).
|
||||
"""
|
||||
app_settings = await AppSettingsRepository.get()
|
||||
tracked_count = len(app_settings.tracked_telemetry_repeaters)
|
||||
if tracked_count == 0:
|
||||
return
|
||||
effective_hours = clamp_telemetry_interval(app_settings.telemetry_interval_hours, tracked_count)
|
||||
if effective_hours <= 0:
|
||||
return
|
||||
if now.hour % effective_hours != 0:
|
||||
return
|
||||
await _run_telemetry_cycle()
|
||||
|
||||
|
||||
async def _telemetry_collect_loop() -> None:
|
||||
"""Background task that collects telemetry from tracked repeaters every 8 hours.
|
||||
"""Background task that runs tracked-repeater telemetry collection.
|
||||
|
||||
Runs a first cycle after a short initial delay (so newly tracked repeaters
|
||||
get a sample promptly), then sleeps the full interval between subsequent cycles.
|
||||
After an initial post-boot delay we evaluate the modulo gate once
|
||||
(covers the edge case where the initial delay crossed a scheduled
|
||||
boundary on restart). Then we wake at every UTC top-of-hour and
|
||||
evaluate the gate again. A cycle runs only when
|
||||
``current_utc_hour % effective_interval_hours == 0``, where the
|
||||
effective interval is the user preference clamped up to the shortest
|
||||
legal interval for the current tracked-repeater count. This keeps the
|
||||
total daily check count bounded at ``DAILY_CHECK_CEILING`` (24).
|
||||
|
||||
Acquires the radio lock per-repeater (non-blocking) so manual operations can
|
||||
The loop never updates the stored user preference. If the user picks a
|
||||
short interval and then adds repeaters that make it illegal, they keep
|
||||
their pick stored and we silently use the clamped value until they drop
|
||||
repeaters.
|
||||
|
||||
Radio lock is acquired per-repeater (non-blocking) so manual ops can
|
||||
interleave. Failures are logged and skipped.
|
||||
"""
|
||||
first_run = True
|
||||
try:
|
||||
await asyncio.sleep(TELEMETRY_COLLECT_INITIAL_DELAY)
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Telemetry collect task cancelled before initial delay")
|
||||
return
|
||||
|
||||
# Post-boot boundary check: if the delay carried us into a matching hour
|
||||
# (or we booted exactly at a matching hour), run now rather than waiting
|
||||
# another full cycle.
|
||||
try:
|
||||
await _maybe_run_scheduled_cycle(datetime.now(UTC))
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Telemetry collect task cancelled after initial delay")
|
||||
return
|
||||
except Exception as e:
|
||||
logger.error("Error in post-boot telemetry check: %s", e, exc_info=True)
|
||||
|
||||
while True:
|
||||
try:
|
||||
delay = TELEMETRY_COLLECT_INITIAL_DELAY if first_run else TELEMETRY_COLLECT_INTERVAL
|
||||
await asyncio.sleep(delay)
|
||||
first_run = False
|
||||
|
||||
if not radio_manager.is_connected:
|
||||
logger.debug("Telemetry collect: radio not connected, skipping cycle")
|
||||
continue
|
||||
|
||||
app_settings = await AppSettingsRepository.get()
|
||||
tracked = app_settings.tracked_telemetry_repeaters
|
||||
if not tracked:
|
||||
continue
|
||||
|
||||
logger.info("Telemetry collect: starting cycle for %d repeater(s)", len(tracked))
|
||||
collected = 0
|
||||
|
||||
for pub_key in tracked:
|
||||
contact = await ContactRepository.get_by_key(pub_key)
|
||||
if not contact or contact.type != 2:
|
||||
logger.debug(
|
||||
"Telemetry collect: skipping %s (not found or not repeater)",
|
||||
pub_key[:12],
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
async with radio_manager.radio_operation(
|
||||
"telemetry_collect",
|
||||
blocking=False,
|
||||
suspend_auto_fetch=True,
|
||||
) as mc:
|
||||
if await _collect_repeater_telemetry(mc, contact):
|
||||
collected += 1
|
||||
except RadioOperationBusyError:
|
||||
logger.debug(
|
||||
"Telemetry collect: radio busy, skipping %s",
|
||||
pub_key[:12],
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Telemetry collect: cycle complete, %d/%d successful",
|
||||
collected,
|
||||
len(tracked),
|
||||
)
|
||||
await _sleep_until_next_utc_top_of_hour()
|
||||
await _maybe_run_scheduled_cycle(datetime.now(UTC))
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Telemetry collect task cancelled")
|
||||
@@ -1725,10 +1787,7 @@ def start_telemetry_collect() -> None:
|
||||
global _telemetry_collect_task
|
||||
if _telemetry_collect_task is None or _telemetry_collect_task.done():
|
||||
_telemetry_collect_task = asyncio.create_task(_telemetry_collect_loop())
|
||||
logger.info(
|
||||
"Started periodic telemetry collection (interval: %ds)",
|
||||
TELEMETRY_COLLECT_INTERVAL,
|
||||
)
|
||||
logger.info("Started periodic telemetry collection (UTC-hourly scheduler)")
|
||||
|
||||
|
||||
async def stop_telemetry_collect() -> None:
|
||||
|
||||
@@ -6,6 +6,7 @@ from typing import Any
|
||||
from app.database import db
|
||||
from app.models import AppSettings
|
||||
from app.path_utils import bucket_path_hash_widths
|
||||
from app.telemetry_interval import DEFAULT_TELEMETRY_INTERVAL_HOURS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -30,7 +31,8 @@ class AppSettingsRepository:
|
||||
last_message_times,
|
||||
advert_interval, last_advert_time, flood_scope,
|
||||
blocked_keys, blocked_names, discovery_blocked_types,
|
||||
tracked_telemetry_repeaters, auto_resend_channel
|
||||
tracked_telemetry_repeaters, auto_resend_channel,
|
||||
telemetry_interval_hours
|
||||
FROM app_settings WHERE id = 1
|
||||
"""
|
||||
)
|
||||
@@ -91,6 +93,16 @@ class AppSettingsRepository:
|
||||
except (KeyError, TypeError):
|
||||
auto_resend_channel = False
|
||||
|
||||
# Parse telemetry_interval_hours (migration adds the column with
|
||||
# default=8, but guard against older rows / partial migrations).
|
||||
try:
|
||||
raw_interval = row["telemetry_interval_hours"]
|
||||
telemetry_interval_hours = (
|
||||
int(raw_interval) if raw_interval is not None else DEFAULT_TELEMETRY_INTERVAL_HOURS
|
||||
)
|
||||
except (KeyError, TypeError, ValueError):
|
||||
telemetry_interval_hours = DEFAULT_TELEMETRY_INTERVAL_HOURS
|
||||
|
||||
return AppSettings(
|
||||
max_radio_contacts=row["max_radio_contacts"],
|
||||
auto_decrypt_dm_on_advert=bool(row["auto_decrypt_dm_on_advert"]),
|
||||
@@ -103,6 +115,7 @@ class AppSettingsRepository:
|
||||
discovery_blocked_types=discovery_blocked_types,
|
||||
tracked_telemetry_repeaters=tracked_telemetry_repeaters,
|
||||
auto_resend_channel=auto_resend_channel,
|
||||
telemetry_interval_hours=telemetry_interval_hours,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
@@ -118,6 +131,7 @@ class AppSettingsRepository:
|
||||
discovery_blocked_types: list[int] | None = None,
|
||||
tracked_telemetry_repeaters: list[str] | None = None,
|
||||
auto_resend_channel: bool | None = None,
|
||||
telemetry_interval_hours: int | None = None,
|
||||
) -> AppSettings:
|
||||
"""Update app settings. Only provided fields are updated."""
|
||||
updates = []
|
||||
@@ -167,6 +181,10 @@ class AppSettingsRepository:
|
||||
updates.append("auto_resend_channel = ?")
|
||||
params.append(1 if auto_resend_channel else 0)
|
||||
|
||||
if telemetry_interval_hours is not None:
|
||||
updates.append("telemetry_interval_hours = ?")
|
||||
params.append(telemetry_interval_hours)
|
||||
|
||||
if updates:
|
||||
query = f"UPDATE app_settings SET {', '.join(updates)} WHERE id = 1"
|
||||
await db.conn.execute(query, params)
|
||||
|
||||
@@ -8,6 +8,13 @@ from pydantic import BaseModel, Field
|
||||
from app.models import CONTACT_TYPE_REPEATER, AppSettings
|
||||
from app.region_scope import normalize_region_scope
|
||||
from app.repository import AppSettingsRepository, ChannelRepository, ContactRepository
|
||||
from app.telemetry_interval import (
|
||||
DEFAULT_TELEMETRY_INTERVAL_HOURS,
|
||||
TELEMETRY_INTERVAL_OPTIONS_HOURS,
|
||||
clamp_telemetry_interval,
|
||||
legal_interval_options,
|
||||
next_run_timestamp_utc,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/settings", tags=["settings"])
|
||||
@@ -57,6 +64,15 @@ class AppSettingsUpdate(BaseModel):
|
||||
default=None,
|
||||
description="Auto-resend channel messages once if no echo heard within 2 seconds",
|
||||
)
|
||||
telemetry_interval_hours: int | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Preferred tracked-repeater telemetry interval in hours. "
|
||||
f"Must be one of {list(TELEMETRY_INTERVAL_OPTIONS_HOURS)}. "
|
||||
"Effective interval is clamped up to the shortest legal value "
|
||||
"based on the current tracked-repeater count."
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class BlockKeyRequest(BaseModel):
|
||||
@@ -82,6 +98,29 @@ class TrackedTelemetryRequest(BaseModel):
|
||||
public_key: str = Field(description="Public key of the repeater to toggle tracking")
|
||||
|
||||
|
||||
class TelemetrySchedule(BaseModel):
|
||||
"""Surface of telemetry scheduling derivations for the UI.
|
||||
|
||||
``preferred_hours`` is the stored user choice. ``effective_hours`` is the
|
||||
value the scheduler actually uses (preferred, clamped up to the shortest
|
||||
legal interval given the current tracked-repeater count). ``options``
|
||||
lists the subset of the menu that is legal at the current count; the UI
|
||||
should hide anything not in this list. ``next_run_at`` is the Unix
|
||||
timestamp (seconds, UTC) of the next scheduled cycle, or ``None`` when
|
||||
no repeaters are tracked (nothing to schedule).
|
||||
"""
|
||||
|
||||
preferred_hours: int = Field(description="User's saved telemetry interval preference")
|
||||
effective_hours: int = Field(description="Scheduler's clamped interval")
|
||||
options: list[int] = Field(description="Legal interval choices at the current count")
|
||||
tracked_count: int = Field(description="Number of repeaters currently tracked")
|
||||
max_tracked: int = Field(description="Maximum number of repeaters that can be tracked")
|
||||
next_run_at: int | None = Field(
|
||||
default=None,
|
||||
description="Unix timestamp (UTC seconds) of the next scheduled cycle",
|
||||
)
|
||||
|
||||
|
||||
class TrackedTelemetryResponse(BaseModel):
|
||||
tracked_telemetry_repeaters: list[str] = Field(
|
||||
description="Current list of tracked repeater public keys"
|
||||
@@ -89,6 +128,24 @@ class TrackedTelemetryResponse(BaseModel):
|
||||
names: dict[str, str] = Field(
|
||||
description="Map of public key to display name for tracked repeaters"
|
||||
)
|
||||
schedule: TelemetrySchedule = Field(description="Current scheduling state")
|
||||
|
||||
|
||||
def _build_schedule(tracked_count: int, preferred_hours: int | None) -> TelemetrySchedule:
|
||||
pref = (
|
||||
preferred_hours
|
||||
if preferred_hours in TELEMETRY_INTERVAL_OPTIONS_HOURS
|
||||
else DEFAULT_TELEMETRY_INTERVAL_HOURS
|
||||
)
|
||||
effective = clamp_telemetry_interval(pref, tracked_count)
|
||||
return TelemetrySchedule(
|
||||
preferred_hours=pref,
|
||||
effective_hours=effective,
|
||||
options=legal_interval_options(tracked_count),
|
||||
tracked_count=tracked_count,
|
||||
max_tracked=MAX_TRACKED_TELEMETRY_REPEATERS,
|
||||
next_run_at=next_run_timestamp_utc(effective) if tracked_count > 0 else None,
|
||||
)
|
||||
|
||||
|
||||
@router.get("", response_model=AppSettings)
|
||||
@@ -136,6 +193,20 @@ async def update_settings(update: AppSettingsUpdate) -> AppSettings:
|
||||
if update.auto_resend_channel is not None:
|
||||
kwargs["auto_resend_channel"] = update.auto_resend_channel
|
||||
|
||||
# Telemetry interval preference. Invalid values fall back to default
|
||||
# rather than 400-ing so a stale client can't brick settings saves.
|
||||
if update.telemetry_interval_hours is not None:
|
||||
raw_interval = update.telemetry_interval_hours
|
||||
if raw_interval not in TELEMETRY_INTERVAL_OPTIONS_HOURS:
|
||||
logger.warning(
|
||||
"telemetry_interval_hours=%r is not in the menu; defaulting to %d",
|
||||
raw_interval,
|
||||
DEFAULT_TELEMETRY_INTERVAL_HOURS,
|
||||
)
|
||||
raw_interval = DEFAULT_TELEMETRY_INTERVAL_HOURS
|
||||
logger.info("Updating telemetry_interval_hours to %d", raw_interval)
|
||||
kwargs["telemetry_interval_hours"] = raw_interval
|
||||
|
||||
# Flood scope
|
||||
flood_scope_changed = False
|
||||
if update.flood_scope is not None:
|
||||
@@ -229,6 +300,7 @@ async def toggle_tracked_telemetry(request: TrackedTelemetryRequest) -> TrackedT
|
||||
return TrackedTelemetryResponse(
|
||||
tracked_telemetry_repeaters=new_list,
|
||||
names=await _resolve_names(new_list),
|
||||
schedule=_build_schedule(len(new_list), settings.telemetry_interval_hours),
|
||||
)
|
||||
|
||||
# Validate it's a repeater
|
||||
@@ -255,4 +327,20 @@ async def toggle_tracked_telemetry(request: TrackedTelemetryRequest) -> TrackedT
|
||||
return TrackedTelemetryResponse(
|
||||
tracked_telemetry_repeaters=new_list,
|
||||
names=await _resolve_names(new_list),
|
||||
schedule=_build_schedule(len(new_list), settings.telemetry_interval_hours),
|
||||
)
|
||||
|
||||
|
||||
@router.get("/tracked-telemetry/schedule", response_model=TelemetrySchedule)
|
||||
async def get_telemetry_schedule() -> TelemetrySchedule:
|
||||
"""Return the current telemetry scheduling derivation.
|
||||
|
||||
The UI uses this to render the interval dropdown (legal options),
|
||||
surface saved-vs-effective when they differ, and show the next-run-at
|
||||
timestamp so users know when the next cycle will fire.
|
||||
"""
|
||||
app_settings = await AppSettingsRepository.get()
|
||||
return _build_schedule(
|
||||
len(app_settings.tracked_telemetry_repeaters),
|
||||
app_settings.telemetry_interval_hours,
|
||||
)
|
||||
|
||||
88
app/telemetry_interval.py
Normal file
88
app/telemetry_interval.py
Normal file
@@ -0,0 +1,88 @@
|
||||
"""Shared math for the tracked-repeater telemetry scheduler.
|
||||
|
||||
The app enforces a ceiling of 24 repeater status checks per 24 hours across
|
||||
all tracked repeaters. With N repeaters tracked, the shortest legal interval
|
||||
is ``24 // floor(24 / N)`` hours. Longer intervals (``12`` or ``24``) are
|
||||
always legal at any N and are offered as user choices on top of the derived
|
||||
shortest-legal value.
|
||||
|
||||
The user picks an interval via settings. The scheduler uses
|
||||
``clamp_telemetry_interval`` to push that pick up to the shortest legal
|
||||
interval if the user has added repeaters that invalidated their choice.
|
||||
The stored preference is *not* mutated on clamp — users get their pick back
|
||||
if they later drop repeaters.
|
||||
"""
|
||||
|
||||
from datetime import UTC, datetime
|
||||
|
||||
# Daily check budget: total number of repeater status checks we allow
|
||||
# across all tracked repeaters per 24-hour window.
|
||||
DAILY_CHECK_CEILING = 24
|
||||
|
||||
# Menu of interval values shown to users. The derivation-based options
|
||||
# (1..8) are filtered per current repeater count via
|
||||
# ``legal_interval_options``; 12 and 24 are always legal.
|
||||
TELEMETRY_INTERVAL_OPTIONS_HOURS: tuple[int, ...] = (1, 2, 3, 4, 6, 8, 12, 24)
|
||||
|
||||
DEFAULT_TELEMETRY_INTERVAL_HOURS = 8
|
||||
|
||||
|
||||
def shortest_legal_interval_hours(n_tracked: int) -> int:
|
||||
"""Return the shortest interval (hours) that keeps under the daily ceiling.
|
||||
|
||||
With ``N`` repeaters, each full cycle costs ``N`` checks. We're capped at
|
||||
``DAILY_CHECK_CEILING`` checks/day, so the maximum cycles/day is
|
||||
``floor(24 / N)`` and the resulting interval is ``24 // cycles_per_day``.
|
||||
For ``N == 0`` we return the default so the math still terminates, though
|
||||
the scheduler skips empty-tracked cycles regardless.
|
||||
"""
|
||||
if n_tracked <= 0:
|
||||
return DEFAULT_TELEMETRY_INTERVAL_HOURS
|
||||
cycles_per_day = DAILY_CHECK_CEILING // n_tracked
|
||||
if cycles_per_day <= 0:
|
||||
# Would exceed ceiling even at 24h cadence; fall back to 24h.
|
||||
return 24
|
||||
return 24 // cycles_per_day
|
||||
|
||||
|
||||
def clamp_telemetry_interval(preferred_hours: int, n_tracked: int) -> int:
|
||||
"""Return the effective interval: max of user preference and shortest legal.
|
||||
|
||||
Unrecognized values fall back to the default.
|
||||
"""
|
||||
if preferred_hours not in TELEMETRY_INTERVAL_OPTIONS_HOURS:
|
||||
preferred_hours = DEFAULT_TELEMETRY_INTERVAL_HOURS
|
||||
shortest = shortest_legal_interval_hours(n_tracked)
|
||||
return max(preferred_hours, shortest)
|
||||
|
||||
|
||||
def legal_interval_options(n_tracked: int) -> list[int]:
|
||||
"""Return the subset of the interval menu that is legal for a given N."""
|
||||
shortest = shortest_legal_interval_hours(n_tracked)
|
||||
return [h for h in TELEMETRY_INTERVAL_OPTIONS_HOURS if h >= shortest]
|
||||
|
||||
|
||||
def next_run_timestamp_utc(effective_hours: int, now: datetime | None = None) -> int:
|
||||
"""Return Unix timestamp for the next UTC top-of-hour where
|
||||
``hour % effective_hours == 0``.
|
||||
|
||||
Returns the next matching hour strictly in the future (never ``now``
|
||||
itself, even if ``now`` lies exactly on a matching boundary).
|
||||
"""
|
||||
if effective_hours <= 0:
|
||||
effective_hours = DEFAULT_TELEMETRY_INTERVAL_HOURS
|
||||
if now is None:
|
||||
now = datetime.now(UTC)
|
||||
else:
|
||||
now = now.astimezone(UTC)
|
||||
|
||||
# Round up to the next top-of-hour, then skip forward until the modulo matches.
|
||||
candidate = now.replace(minute=0, second=0, microsecond=0)
|
||||
# Always move at least one hour forward so "now" never matches.
|
||||
candidate = candidate.replace(hour=candidate.hour)
|
||||
from datetime import timedelta
|
||||
|
||||
candidate = candidate + timedelta(hours=1)
|
||||
while candidate.hour % effective_hours != 0:
|
||||
candidate = candidate + timedelta(hours=1)
|
||||
return int(candidate.timestamp())
|
||||
@@ -33,6 +33,7 @@ import type {
|
||||
RepeaterRadioSettingsResponse,
|
||||
RepeaterStatusResponse,
|
||||
TelemetryHistoryEntry,
|
||||
TelemetrySchedule,
|
||||
TrackedTelemetryResponse,
|
||||
StatisticsResponse,
|
||||
TraceResponse,
|
||||
@@ -332,6 +333,8 @@ export const api = {
|
||||
body: JSON.stringify({ public_key: publicKey }),
|
||||
}),
|
||||
|
||||
getTelemetrySchedule: () => fetchJson<TelemetrySchedule>('/settings/tracked-telemetry/schedule'),
|
||||
|
||||
// Favorites
|
||||
toggleFavorite: (type: 'channel' | 'contact', id: string) =>
|
||||
fetchJson<{ type: string; id: string; favorite: boolean }>('/settings/favorites/toggle', {
|
||||
|
||||
@@ -15,6 +15,7 @@ import type {
|
||||
Contact,
|
||||
HealthStatus,
|
||||
TelemetryHistoryEntry,
|
||||
TelemetrySchedule,
|
||||
} from '../../types';
|
||||
|
||||
export function SettingsDatabaseSection({
|
||||
@@ -54,19 +55,45 @@ export function SettingsDatabaseSection({
|
||||
const [discoveryBlockedTypes, setDiscoveryBlockedTypes] = useState<number[]>([]);
|
||||
const [bulkDeleteOpen, setBulkDeleteOpen] = useState(false);
|
||||
|
||||
const [busy, setBusy] = useState(false);
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
|
||||
const [latestTelemetry, setLatestTelemetry] = useState<
|
||||
Record<string, TelemetryHistoryEntry | null>
|
||||
>({});
|
||||
const telemetryFetchedRef = useRef(false);
|
||||
|
||||
const [schedule, setSchedule] = useState<TelemetrySchedule | null>(null);
|
||||
const [intervalDraft, setIntervalDraft] = useState<number>(appSettings.telemetry_interval_hours);
|
||||
|
||||
// Serialization chain for every auto-persisted control on this page.
|
||||
// Without this, rapid successive toggles (or mixed dropdown + checkbox
|
||||
// interactions) can dispatch overlapping PATCHes that land out of order
|
||||
// on HTTP/2 — a stale write then wins, reverting the user's last click.
|
||||
// Each call awaits the previous one before sending its request, so the
|
||||
// server sees updates in the order the user made them.
|
||||
const saveChainRef = useRef<Promise<void>>(Promise.resolve());
|
||||
|
||||
useEffect(() => {
|
||||
setAutoDecryptOnAdvert(appSettings.auto_decrypt_dm_on_advert);
|
||||
setDiscoveryBlockedTypes(appSettings.discovery_blocked_types ?? []);
|
||||
setIntervalDraft(appSettings.telemetry_interval_hours);
|
||||
}, [appSettings]);
|
||||
|
||||
// Re-fetch the scheduler derivation whenever the tracked list changes or
|
||||
// the stored preference changes. Cheap: single GET, no radio lock.
|
||||
useEffect(() => {
|
||||
let cancelled = false;
|
||||
api
|
||||
.getTelemetrySchedule()
|
||||
.then((s) => {
|
||||
if (!cancelled) setSchedule(s);
|
||||
})
|
||||
.catch(() => {
|
||||
// Non-critical: dropdown falls back to the unfiltered menu.
|
||||
});
|
||||
return () => {
|
||||
cancelled = true;
|
||||
};
|
||||
}, [trackedTelemetryRepeaters.length, appSettings.telemetry_interval_hours]);
|
||||
|
||||
useEffect(() => {
|
||||
if (trackedTelemetryRepeaters.length === 0 || telemetryFetchedRef.current) return;
|
||||
telemetryFetchedRef.current = true;
|
||||
@@ -132,28 +159,26 @@ export function SettingsDatabaseSection({
|
||||
}
|
||||
};
|
||||
|
||||
const handleSave = async () => {
|
||||
setBusy(true);
|
||||
setError(null);
|
||||
|
||||
try {
|
||||
const update: AppSettingsUpdate = { auto_decrypt_dm_on_advert: autoDecryptOnAdvert };
|
||||
const currentBlocked = appSettings.discovery_blocked_types ?? [];
|
||||
if (
|
||||
discoveryBlockedTypes.length !== currentBlocked.length ||
|
||||
discoveryBlockedTypes.some((t) => !currentBlocked.includes(t))
|
||||
) {
|
||||
update.discovery_blocked_types = discoveryBlockedTypes;
|
||||
/**
|
||||
* Apply an AppSettings PATCH after any already-queued saves finish, and
|
||||
* revert local state if the save fails. Every auto-persist control on
|
||||
* this page routes through here so the user-visible order of clicks is
|
||||
* the order the backend sees, regardless of network reordering.
|
||||
*/
|
||||
const persistAppSettings = (update: AppSettingsUpdate, revert: () => void): Promise<void> => {
|
||||
const chained = saveChainRef.current.then(async () => {
|
||||
try {
|
||||
await onSaveAppSettings(update);
|
||||
} catch (err) {
|
||||
console.error('Failed to save database settings:', err);
|
||||
revert();
|
||||
toast.error('Failed to save setting', {
|
||||
description: err instanceof Error ? err.message : 'Unknown error',
|
||||
});
|
||||
}
|
||||
await onSaveAppSettings(update);
|
||||
toast.success('Database settings saved');
|
||||
} catch (err) {
|
||||
console.error('Failed to save database settings:', err);
|
||||
setError(err instanceof Error ? err.message : 'Failed to save');
|
||||
toast.error('Failed to save settings');
|
||||
} finally {
|
||||
setBusy(false);
|
||||
}
|
||||
});
|
||||
saveChainRef.current = chained;
|
||||
return chained;
|
||||
};
|
||||
|
||||
return (
|
||||
@@ -249,7 +274,14 @@ export function SettingsDatabaseSection({
|
||||
<input
|
||||
type="checkbox"
|
||||
checked={autoDecryptOnAdvert}
|
||||
onChange={(e) => setAutoDecryptOnAdvert(e.target.checked)}
|
||||
onChange={(e) => {
|
||||
const next = e.target.checked;
|
||||
const prev = autoDecryptOnAdvert;
|
||||
setAutoDecryptOnAdvert(next);
|
||||
void persistAppSettings({ auto_decrypt_dm_on_advert: next }, () =>
|
||||
setAutoDecryptOnAdvert(prev)
|
||||
);
|
||||
}}
|
||||
className="w-4 h-4 rounded border-input accent-primary"
|
||||
/>
|
||||
<span className="text-sm">Auto-decrypt historical DMs when new contact advertises</span>
|
||||
@@ -266,10 +298,61 @@ export function SettingsDatabaseSection({
|
||||
<div className="space-y-3">
|
||||
<Label className="text-base">Tracked Repeater Telemetry</Label>
|
||||
<p className="text-xs text-muted-foreground">
|
||||
Repeaters opted into automatic telemetry collection are polled every 8 hours. Up to 8
|
||||
repeaters may be tracked at a time ({trackedTelemetryRepeaters.length} / 8 slots used).
|
||||
Repeaters opted into automatic telemetry collection are polled on a scheduled interval. To
|
||||
limit mesh traffic, the app caps telemetry at 24 checks per day across all tracked
|
||||
repeaters — so fewer tracked repeaters allows shorter intervals, and more tracked
|
||||
repeaters forces longer ones. Up to {schedule?.max_tracked ?? 8} repeaters may be tracked
|
||||
at once ({trackedTelemetryRepeaters.length} / {schedule?.max_tracked ?? 8} slots used).
|
||||
</p>
|
||||
|
||||
{/* Interval picker. Legal options depend on current tracked count;
|
||||
we list only those. If the saved preference is no longer legal,
|
||||
the effective interval is shown below so the user knows what the
|
||||
scheduler is actually using. */}
|
||||
<div className="space-y-1.5">
|
||||
<Label htmlFor="telemetry-interval" className="text-sm">
|
||||
Collection interval
|
||||
</Label>
|
||||
<div className="flex items-center gap-2">
|
||||
<select
|
||||
id="telemetry-interval"
|
||||
value={intervalDraft}
|
||||
onChange={(e) => {
|
||||
const nextValue = Number(e.target.value);
|
||||
if (!Number.isFinite(nextValue) || nextValue === intervalDraft) return;
|
||||
const prevValue = intervalDraft;
|
||||
setIntervalDraft(nextValue);
|
||||
void persistAppSettings({ telemetry_interval_hours: nextValue }, () =>
|
||||
setIntervalDraft(prevValue)
|
||||
);
|
||||
}}
|
||||
className="h-9 px-3 rounded-md border border-input bg-background text-sm ring-offset-background focus:outline-none focus:ring-2 focus:ring-ring focus:ring-offset-2"
|
||||
>
|
||||
{(schedule?.options ?? [1, 2, 3, 4, 6, 8, 12, 24]).map((hrs) => (
|
||||
<option key={hrs} value={hrs}>
|
||||
Every {hrs} hour{hrs === 1 ? '' : 's'} ({Math.floor(24 / hrs)} check
|
||||
{Math.floor(24 / hrs) === 1 ? '' : 's'}/day)
|
||||
</option>
|
||||
))}
|
||||
</select>
|
||||
</div>
|
||||
{schedule && schedule.effective_hours !== schedule.preferred_hours && (
|
||||
<p className="text-xs text-warning">
|
||||
Saved preference is {schedule.preferred_hours} hour
|
||||
{schedule.preferred_hours === 1 ? '' : 's'}, but the scheduler is using{' '}
|
||||
{schedule.effective_hours} hours because {schedule.tracked_count} repeater
|
||||
{schedule.tracked_count === 1 ? '' : 's'}{' '}
|
||||
{schedule.tracked_count === 1 ? 'is' : 'are'} tracked. Your preference will be
|
||||
restored if you drop back to a supported count.
|
||||
</p>
|
||||
)}
|
||||
{schedule?.next_run_at != null && (
|
||||
<p className="text-xs text-muted-foreground">
|
||||
Next run at {formatTime(schedule.next_run_at)} (UTC top of hour).
|
||||
</p>
|
||||
)}
|
||||
</div>
|
||||
|
||||
{trackedTelemetryRepeaters.length === 0 ? (
|
||||
<p className="text-sm text-muted-foreground italic">
|
||||
No repeaters are being tracked. Enable tracking from a repeater's dashboard.
|
||||
@@ -341,16 +424,6 @@ export function SettingsDatabaseSection({
|
||||
)}
|
||||
</div>
|
||||
|
||||
{error && (
|
||||
<div className="text-sm text-destructive" role="alert">
|
||||
{error}
|
||||
</div>
|
||||
)}
|
||||
|
||||
<Button onClick={handleSave} disabled={busy} className="w-full">
|
||||
{busy ? 'Saving...' : 'Save Settings'}
|
||||
</Button>
|
||||
|
||||
<Separator />
|
||||
|
||||
{/* ── Contact Management ── */}
|
||||
@@ -380,11 +453,14 @@ export function SettingsDatabaseSection({
|
||||
<input
|
||||
type="checkbox"
|
||||
checked={checked}
|
||||
onChange={() =>
|
||||
setDiscoveryBlockedTypes((prev) =>
|
||||
checked ? prev.filter((t) => t !== typeCode) : [...prev, typeCode]
|
||||
)
|
||||
}
|
||||
onChange={() => {
|
||||
const prev = discoveryBlockedTypes;
|
||||
const next = checked ? prev.filter((t) => t !== typeCode) : [...prev, typeCode];
|
||||
setDiscoveryBlockedTypes(next);
|
||||
void persistAppSettings({ discovery_blocked_types: next }, () =>
|
||||
setDiscoveryBlockedTypes(prev)
|
||||
);
|
||||
}}
|
||||
className="rounded border-input"
|
||||
/>
|
||||
{label}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { act, fireEvent, render, screen, waitFor } from '@testing-library/react';
|
||||
import { fireEvent, render, screen, waitFor } from '@testing-library/react';
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { SettingsModal } from '../components/SettingsModal';
|
||||
@@ -70,6 +70,7 @@ const baseSettings: AppSettings = {
|
||||
discovery_blocked_types: [],
|
||||
tracked_telemetry_repeaters: [],
|
||||
auto_resend_channel: false,
|
||||
telemetry_interval_hours: 8,
|
||||
};
|
||||
|
||||
function renderModal(overrides?: {
|
||||
@@ -442,52 +443,86 @@ describe('SettingsModal', () => {
|
||||
expect(screen.getByText('iPhone')).toBeInTheDocument();
|
||||
});
|
||||
|
||||
it('clears stale errors when switching external desktop sections', async () => {
|
||||
it('reverts checkbox state when auto-persist fails on the database section', async () => {
|
||||
// Auto-persist replaced the old "Save Settings" button on this section.
|
||||
// The risk is now: a toggle gets applied optimistically, the PATCH fails,
|
||||
// and we're left with the UI out of sync with saved state. Verify the
|
||||
// revert-on-error path keeps the checkbox consistent with the server.
|
||||
const onSaveAppSettings = vi.fn(async () => {
|
||||
throw new Error('Save failed');
|
||||
});
|
||||
|
||||
const { view } = renderModal({
|
||||
renderModal({
|
||||
externalSidebarNav: true,
|
||||
desktopSection: 'database',
|
||||
onSaveAppSettings,
|
||||
});
|
||||
|
||||
fireEvent.click(screen.getByRole('button', { name: 'Save Settings' }));
|
||||
const checkbox = screen.getByRole('checkbox', {
|
||||
name: /Auto-decrypt historical DMs/i,
|
||||
}) as HTMLInputElement;
|
||||
const initialChecked = checkbox.checked;
|
||||
|
||||
fireEvent.click(checkbox);
|
||||
|
||||
await waitFor(() => {
|
||||
expect(screen.getByText('Save failed')).toBeInTheDocument();
|
||||
expect(onSaveAppSettings).toHaveBeenCalled();
|
||||
});
|
||||
await waitFor(() => {
|
||||
expect(checkbox.checked).toBe(initialChecked);
|
||||
});
|
||||
});
|
||||
|
||||
it('serializes rapid auto-persist clicks so stale writes cannot win', async () => {
|
||||
// Regression test for a race where rapid consecutive checkbox toggles
|
||||
// fire overlapping PATCHes that can land out of order. The page now
|
||||
// chains saves through a single promise, so the server sees them in
|
||||
// the order the user clicked. This test hand-controls resolution
|
||||
// order to force the "stale write" scenario if serialization were off.
|
||||
|
||||
const deferred: { resolve: () => void }[] = [];
|
||||
const callOrder: number[] = [];
|
||||
|
||||
const onSaveAppSettings = vi.fn(async (_update: unknown) => {
|
||||
const index = deferred.length;
|
||||
callOrder.push(index);
|
||||
await new Promise<void>((res) => {
|
||||
deferred.push({ resolve: res });
|
||||
});
|
||||
});
|
||||
|
||||
await act(async () => {
|
||||
view.rerender(
|
||||
<SettingsModal
|
||||
open
|
||||
externalSidebarNav
|
||||
desktopSection="fanout"
|
||||
config={baseConfig}
|
||||
health={baseHealth}
|
||||
appSettings={baseSettings}
|
||||
onClose={vi.fn()}
|
||||
onSave={vi.fn(async () => {})}
|
||||
onSaveAppSettings={onSaveAppSettings}
|
||||
onSetPrivateKey={vi.fn(async () => {})}
|
||||
onReboot={vi.fn(async () => {})}
|
||||
onDisconnect={vi.fn(async () => {})}
|
||||
onReconnect={vi.fn(async () => {})}
|
||||
onAdvertise={vi.fn(async () => {})}
|
||||
meshDiscovery={null}
|
||||
meshDiscoveryLoadingTarget={null}
|
||||
onDiscoverMesh={vi.fn(async () => {})}
|
||||
onHealthRefresh={vi.fn(async () => {})}
|
||||
onRefreshAppSettings={vi.fn(async () => {})}
|
||||
/>
|
||||
);
|
||||
await Promise.resolve();
|
||||
renderModal({
|
||||
externalSidebarNav: true,
|
||||
desktopSection: 'database',
|
||||
onSaveAppSettings,
|
||||
});
|
||||
|
||||
expect(api.getFanoutConfigs).toHaveBeenCalled();
|
||||
expect(screen.getByRole('button', { name: 'Add Integration' })).toBeInTheDocument();
|
||||
expect(screen.queryByText('Save failed')).not.toBeInTheDocument();
|
||||
// Two distinct checkboxes in quick succession.
|
||||
const blockClients = screen.getByRole('checkbox', { name: /Block clients/i });
|
||||
const blockRepeaters = screen.getByRole('checkbox', { name: /Block repeaters/i });
|
||||
|
||||
fireEvent.click(blockClients);
|
||||
fireEvent.click(blockRepeaters);
|
||||
|
||||
// Wait for the first PATCH to be registered. Only the first should be
|
||||
// in-flight — the second must be queued behind it.
|
||||
await waitFor(() => {
|
||||
expect(deferred.length).toBe(1);
|
||||
});
|
||||
expect(callOrder).toEqual([0]);
|
||||
|
||||
// Resolve the first PATCH. The chain should now dispatch the second.
|
||||
deferred[0].resolve();
|
||||
await waitFor(() => {
|
||||
expect(deferred.length).toBe(2);
|
||||
});
|
||||
expect(callOrder).toEqual([0, 1]);
|
||||
|
||||
// Resolve the second so the test tears down cleanly.
|
||||
deferred[1].resolve();
|
||||
await waitFor(() => {
|
||||
expect(onSaveAppSettings).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
||||
it('does not call onClose after save/reboot flows in page mode', async () => {
|
||||
|
||||
@@ -355,6 +355,7 @@ export interface AppSettings {
|
||||
discovery_blocked_types: number[];
|
||||
tracked_telemetry_repeaters: string[];
|
||||
auto_resend_channel: boolean;
|
||||
telemetry_interval_hours: number;
|
||||
}
|
||||
|
||||
export interface AppSettingsUpdate {
|
||||
@@ -366,11 +367,22 @@ export interface AppSettingsUpdate {
|
||||
blocked_keys?: string[];
|
||||
blocked_names?: string[];
|
||||
discovery_blocked_types?: number[];
|
||||
telemetry_interval_hours?: number;
|
||||
}
|
||||
|
||||
export interface TelemetrySchedule {
|
||||
preferred_hours: number;
|
||||
effective_hours: number;
|
||||
options: number[];
|
||||
tracked_count: number;
|
||||
max_tracked: number;
|
||||
next_run_at: number | null;
|
||||
}
|
||||
|
||||
export interface TrackedTelemetryResponse {
|
||||
tracked_telemetry_repeaters: string[];
|
||||
names: Record<string, string>;
|
||||
schedule: TelemetrySchedule;
|
||||
}
|
||||
|
||||
/** Contact type constants */
|
||||
|
||||
@@ -2,4 +2,4 @@
|
||||
# run ``run_migrations`` to completion assert ``get_version == LATEST`` and
|
||||
# ``applied == LATEST - starting_version`` so only this constant needs to
|
||||
# change, not every individual assertion.
|
||||
LATEST_SCHEMA_VERSION = 56
|
||||
LATEST_SCHEMA_VERSION = 57
|
||||
|
||||
@@ -1880,6 +1880,305 @@ class TestCollectRepeaterTelemetryLpp:
|
||||
assert "lpp_sensors" not in recorded_data
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _telemetry_collect_loop — UTC modulo scheduler
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestTelemetryCollectSchedulerDecision:
|
||||
"""Verify the scheduler's run/skip decision at an hourly wake.
|
||||
|
||||
We test the decision logic by stubbing the sleep + datetime functions
|
||||
and asserting ``_run_telemetry_cycle`` is called exactly on matching
|
||||
hours. Full end-to-end of the loop is covered implicitly by the
|
||||
existing telemetry-collect tests; what we're pinning here is the
|
||||
hour-modulo gate the new scheduler depends on.
|
||||
"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_when_hour_modulo_mismatch(self):
|
||||
"""At 09:00 UTC with interval 8h, the loop must NOT run a cycle."""
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
from app import radio_sync
|
||||
from app.models import AppSettings
|
||||
|
||||
settings = AppSettings(
|
||||
tracked_telemetry_repeaters=["aa" * 32],
|
||||
telemetry_interval_hours=8,
|
||||
)
|
||||
ran = False
|
||||
|
||||
async def fake_cycle():
|
||||
nonlocal ran
|
||||
ran = True
|
||||
|
||||
def make_fake_datetime(hour: int):
|
||||
class FakeDatetime:
|
||||
@classmethod
|
||||
def now(cls, tz=None):
|
||||
import datetime as real_datetime
|
||||
|
||||
return real_datetime.datetime(2026, 4, 16, hour, 0, 0, tzinfo=real_datetime.UTC)
|
||||
|
||||
return FakeDatetime
|
||||
|
||||
sleep_count = 0
|
||||
|
||||
async def fake_sleep(_duration):
|
||||
# The loop does: (1) initial-delay sleep, (2) sleep-to-top-of-hour,
|
||||
# then evaluates the run/skip decision. Allow both sleeps to
|
||||
# pass, then cancel on the 3rd (next iteration's top-of-hour sleep).
|
||||
nonlocal sleep_count
|
||||
sleep_count += 1
|
||||
if sleep_count >= 3:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.radio_sync.AppSettingsRepository.get",
|
||||
new_callable=AsyncMock,
|
||||
return_value=settings,
|
||||
),
|
||||
patch("app.radio_sync._run_telemetry_cycle", new=fake_cycle),
|
||||
patch("app.radio_sync.asyncio.sleep", new=fake_sleep),
|
||||
patch("app.radio_sync.datetime", new=make_fake_datetime(9)),
|
||||
):
|
||||
try:
|
||||
await radio_sync._telemetry_collect_loop()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
assert ran is False, "09:00 UTC is not a multiple of 8h; cycle must not run"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_runs_when_hour_modulo_matches(self):
|
||||
"""At 16:00 UTC with interval 8h, the loop must run a cycle."""
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
from app import radio_sync
|
||||
from app.models import AppSettings
|
||||
|
||||
settings = AppSettings(
|
||||
tracked_telemetry_repeaters=["aa" * 32],
|
||||
telemetry_interval_hours=8,
|
||||
)
|
||||
ran = False
|
||||
|
||||
async def fake_cycle():
|
||||
nonlocal ran
|
||||
ran = True
|
||||
|
||||
class FakeDatetime:
|
||||
@classmethod
|
||||
def now(cls, tz=None):
|
||||
import datetime as real_datetime
|
||||
|
||||
return real_datetime.datetime(2026, 4, 16, 16, 0, 0, tzinfo=real_datetime.UTC)
|
||||
|
||||
sleep_count = 0
|
||||
|
||||
async def fake_sleep(_duration):
|
||||
# Let the loop's initial-delay + top-of-hour sleeps pass; cancel
|
||||
# on the third sleep (next iteration's top-of-hour wake).
|
||||
nonlocal sleep_count
|
||||
sleep_count += 1
|
||||
if sleep_count >= 3:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.radio_sync.AppSettingsRepository.get",
|
||||
new_callable=AsyncMock,
|
||||
return_value=settings,
|
||||
),
|
||||
patch("app.radio_sync._run_telemetry_cycle", new=fake_cycle),
|
||||
patch("app.radio_sync.asyncio.sleep", new=fake_sleep),
|
||||
patch("app.radio_sync.datetime", new=FakeDatetime),
|
||||
):
|
||||
try:
|
||||
await radio_sync._telemetry_collect_loop()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
assert ran is True, "16:00 UTC is a multiple of 8h; cycle must run"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_when_no_repeaters_tracked(self):
|
||||
"""Empty tracked list short-circuits regardless of modulo match."""
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
from app import radio_sync
|
||||
from app.models import AppSettings
|
||||
|
||||
settings = AppSettings(tracked_telemetry_repeaters=[], telemetry_interval_hours=8)
|
||||
ran = False
|
||||
|
||||
async def fake_cycle():
|
||||
nonlocal ran
|
||||
ran = True
|
||||
|
||||
class FakeDatetime:
|
||||
@classmethod
|
||||
def now(cls, tz=None):
|
||||
import datetime as real_datetime
|
||||
|
||||
return real_datetime.datetime(2026, 4, 16, 16, 0, 0, tzinfo=real_datetime.UTC)
|
||||
|
||||
sleep_count = 0
|
||||
|
||||
async def fake_sleep(_duration):
|
||||
# Let the loop's initial-delay + top-of-hour sleeps pass; cancel
|
||||
# on the third sleep (next iteration's top-of-hour wake).
|
||||
nonlocal sleep_count
|
||||
sleep_count += 1
|
||||
if sleep_count >= 3:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.radio_sync.AppSettingsRepository.get",
|
||||
new_callable=AsyncMock,
|
||||
return_value=settings,
|
||||
),
|
||||
patch("app.radio_sync._run_telemetry_cycle", new=fake_cycle),
|
||||
patch("app.radio_sync.asyncio.sleep", new=fake_sleep),
|
||||
patch("app.radio_sync.datetime", new=FakeDatetime),
|
||||
):
|
||||
try:
|
||||
await radio_sync._telemetry_collect_loop()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
assert ran is False, "No tracked repeaters: no cycle regardless of hour"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_runs_on_boundary_immediately_after_initial_delay(self):
|
||||
"""Regression test: if the post-boot initial delay finishes inside a
|
||||
matching hour, the cycle must run even if the first
|
||||
sleep-to-next-top-of-hour would otherwise carry us past the boundary.
|
||||
|
||||
Scenario: server starts at 23:59:30 UTC with a 24-hour interval. The
|
||||
60-second boot guard pushes the first check into 00:00:30 — a matching
|
||||
hour that we must NOT skip. Before the fix, the loop went straight to
|
||||
sleeping until 01:00 and then failing the modulo, missing the entire
|
||||
day's only scheduled collection.
|
||||
"""
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
from app import radio_sync
|
||||
from app.models import AppSettings
|
||||
|
||||
settings = AppSettings(
|
||||
tracked_telemetry_repeaters=["aa" * 32],
|
||||
telemetry_interval_hours=24, # daily cadence; only matching hour is 00
|
||||
)
|
||||
ran = False
|
||||
|
||||
async def fake_cycle():
|
||||
nonlocal ran
|
||||
ran = True
|
||||
|
||||
class FakeDatetime:
|
||||
@classmethod
|
||||
def now(cls, tz=None):
|
||||
import datetime as real_datetime
|
||||
|
||||
# Simulates "initial delay just ended at 00:00:30 UTC on a
|
||||
# restart that began at 23:59:30." Without the post-boot
|
||||
# boundary check, the loop would have skipped this.
|
||||
return real_datetime.datetime(2026, 4, 16, 0, 0, 30, tzinfo=real_datetime.UTC)
|
||||
|
||||
sleep_count = 0
|
||||
|
||||
async def fake_sleep(_duration):
|
||||
# Let the initial delay pass, then cancel before the first
|
||||
# top-of-hour sleep so we isolate the post-boot check as the
|
||||
# only opportunity to run.
|
||||
nonlocal sleep_count
|
||||
sleep_count += 1
|
||||
if sleep_count >= 2:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.radio_sync.AppSettingsRepository.get",
|
||||
new_callable=AsyncMock,
|
||||
return_value=settings,
|
||||
),
|
||||
patch("app.radio_sync._run_telemetry_cycle", new=fake_cycle),
|
||||
patch("app.radio_sync.asyncio.sleep", new=fake_sleep),
|
||||
patch("app.radio_sync.datetime", new=FakeDatetime),
|
||||
):
|
||||
try:
|
||||
await radio_sync._telemetry_collect_loop()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
assert ran is True, (
|
||||
"Post-boot check must fire the due 00:00 cycle; otherwise a "
|
||||
"restart near midnight suppresses the whole day's collection."
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clamps_up_when_preferred_illegal_for_current_count(self):
|
||||
"""5 tracked repeaters with saved pref 1h: scheduler should use 6h.
|
||||
|
||||
At 02:00 UTC: 2 % 6 == 2 (not a run), so cycle must not fire.
|
||||
If clamping were skipped, 2 % 1 == 0 and cycle would incorrectly run.
|
||||
"""
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
from app import radio_sync
|
||||
from app.models import AppSettings
|
||||
|
||||
settings = AppSettings(
|
||||
tracked_telemetry_repeaters=["aa" * 32] * 5,
|
||||
telemetry_interval_hours=1, # illegal at N=5; shortest legal is 6h
|
||||
)
|
||||
ran = False
|
||||
|
||||
async def fake_cycle():
|
||||
nonlocal ran
|
||||
ran = True
|
||||
|
||||
class FakeDatetime:
|
||||
@classmethod
|
||||
def now(cls, tz=None):
|
||||
import datetime as real_datetime
|
||||
|
||||
return real_datetime.datetime(2026, 4, 16, 2, 0, 0, tzinfo=real_datetime.UTC)
|
||||
|
||||
sleep_count = 0
|
||||
|
||||
async def fake_sleep(_duration):
|
||||
# Let the loop's initial-delay + top-of-hour sleeps pass; cancel
|
||||
# on the third sleep (next iteration's top-of-hour wake).
|
||||
nonlocal sleep_count
|
||||
sleep_count += 1
|
||||
if sleep_count >= 3:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.radio_sync.AppSettingsRepository.get",
|
||||
new_callable=AsyncMock,
|
||||
return_value=settings,
|
||||
),
|
||||
patch("app.radio_sync._run_telemetry_cycle", new=fake_cycle),
|
||||
patch("app.radio_sync.asyncio.sleep", new=fake_sleep),
|
||||
patch("app.radio_sync.datetime", new=FakeDatetime),
|
||||
):
|
||||
try:
|
||||
await radio_sync._telemetry_collect_loop()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
assert ran is False, (
|
||||
"Clamping to 6h must prevent the 02:00 run that 1h cadence would've triggered"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_contacts_selected_for_radio_sync — DM-active prioritization
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -11,6 +11,7 @@ from app.routers.settings import (
|
||||
AppSettingsUpdate,
|
||||
FavoriteRequest,
|
||||
TrackedTelemetryRequest,
|
||||
get_telemetry_schedule,
|
||||
toggle_favorite,
|
||||
toggle_tracked_telemetry,
|
||||
update_settings,
|
||||
@@ -244,3 +245,88 @@ class TestToggleTrackedTelemetry:
|
||||
result = await toggle_tracked_telemetry(TrackedTelemetryRequest(public_key=keys[0]))
|
||||
assert keys[0] not in result.tracked_telemetry_repeaters
|
||||
assert len(result.tracked_telemetry_repeaters) == 7
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_toggle_response_includes_schedule(self, test_db):
|
||||
"""After toggle, response must carry the schedule derivation so the UI
|
||||
can update the interval dropdown without a follow-up fetch."""
|
||||
key = "aa" * 32
|
||||
await self._create_repeater(key)
|
||||
|
||||
result = await toggle_tracked_telemetry(TrackedTelemetryRequest(public_key=key))
|
||||
|
||||
assert result.schedule.tracked_count == 1
|
||||
# N=1 unlocks the full menu including 1h
|
||||
assert 1 in result.schedule.options
|
||||
assert result.schedule.max_tracked == 8
|
||||
|
||||
|
||||
class TestTelemetryIntervalValidation:
|
||||
"""PATCH /settings validation for telemetry_interval_hours."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_accepts_valid_interval(self, test_db):
|
||||
result = await update_settings(AppSettingsUpdate(telemetry_interval_hours=4))
|
||||
assert result.telemetry_interval_hours == 4
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_invalid_interval_falls_back_to_default(self, test_db):
|
||||
"""Non-menu values are defaulted rather than 400-ing to keep stale
|
||||
clients from getting stuck on a save error."""
|
||||
result = await update_settings(AppSettingsUpdate(telemetry_interval_hours=99))
|
||||
assert result.telemetry_interval_hours == 8 # DEFAULT_TELEMETRY_INTERVAL_HOURS
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_preference_is_preserved_even_when_illegal_for_count(self, test_db):
|
||||
"""User picks 1h at N=5 tracked: stored pref must stay 1h. Scheduler
|
||||
handles the clamping at run time; storage is verbatim."""
|
||||
# Seed 5 tracked repeaters
|
||||
keys = [f"{i:02x}" * 32 for i in range(5)]
|
||||
for k in keys:
|
||||
await ContactRepository.upsert(
|
||||
ContactUpsert(public_key=k, name=f"R{k[:4]}", type=CONTACT_TYPE_REPEATER)
|
||||
)
|
||||
await AppSettingsRepository.update(tracked_telemetry_repeaters=keys)
|
||||
|
||||
result = await update_settings(AppSettingsUpdate(telemetry_interval_hours=1))
|
||||
assert result.telemetry_interval_hours == 1
|
||||
|
||||
# But the GET schedule endpoint should report the clamped effective value.
|
||||
schedule = await get_telemetry_schedule()
|
||||
assert schedule.preferred_hours == 1
|
||||
assert schedule.effective_hours == 6 # N=5 -> shortest legal = 6h
|
||||
|
||||
|
||||
class TestTelemetryScheduleEndpoint:
|
||||
"""GET /settings/tracked-telemetry/schedule."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_schedule_with_no_tracked_repeaters(self, test_db):
|
||||
"""No tracked repeaters means nothing to schedule; next_run_at is None.
|
||||
|
||||
At N=0 the clamp helper returns the default 8h, which is a fine
|
||||
display value for an empty state. Options start at 8h for the same
|
||||
reason — any lower shortest-legal only makes sense once the user
|
||||
has at least one repeater tracked.
|
||||
"""
|
||||
schedule = await get_telemetry_schedule()
|
||||
|
||||
assert schedule.tracked_count == 0
|
||||
assert schedule.next_run_at is None
|
||||
# At N=0 shortest-legal defaults to 8h.
|
||||
assert schedule.options == [8, 12, 24]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_schedule_filters_options_by_tracked_count(self, test_db):
|
||||
keys = [f"{i:02x}" * 32 for i in range(5)]
|
||||
for k in keys:
|
||||
await ContactRepository.upsert(
|
||||
ContactUpsert(public_key=k, name=f"R{k[:4]}", type=CONTACT_TYPE_REPEATER)
|
||||
)
|
||||
await AppSettingsRepository.update(tracked_telemetry_repeaters=keys)
|
||||
|
||||
schedule = await get_telemetry_schedule()
|
||||
|
||||
assert schedule.tracked_count == 5
|
||||
assert schedule.options == [6, 8, 12, 24]
|
||||
assert schedule.next_run_at is not None
|
||||
|
||||
116
tests/test_telemetry_interval.py
Normal file
116
tests/test_telemetry_interval.py
Normal file
@@ -0,0 +1,116 @@
|
||||
"""Tests for the telemetry interval math helpers.
|
||||
|
||||
These helpers back both the PATCH validation and the scheduler clamping,
|
||||
so regressions here silently corrupt cadence for every operator. Keep this
|
||||
suite fast, pure, and focused on the boundary values in the N=1..8 table.
|
||||
"""
|
||||
|
||||
from datetime import UTC, datetime, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from app.telemetry_interval import (
|
||||
DAILY_CHECK_CEILING,
|
||||
DEFAULT_TELEMETRY_INTERVAL_HOURS,
|
||||
TELEMETRY_INTERVAL_OPTIONS_HOURS,
|
||||
clamp_telemetry_interval,
|
||||
legal_interval_options,
|
||||
next_run_timestamp_utc,
|
||||
shortest_legal_interval_hours,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("n", "expected_hours"),
|
||||
[
|
||||
(1, 1),
|
||||
(2, 2),
|
||||
(3, 3),
|
||||
(4, 4),
|
||||
(5, 6),
|
||||
(6, 6),
|
||||
(7, 8),
|
||||
(8, 8),
|
||||
],
|
||||
)
|
||||
def test_shortest_legal_interval_table(n: int, expected_hours: int):
|
||||
"""The N=1..8 table must match the user-facing design exactly."""
|
||||
assert shortest_legal_interval_hours(n) == expected_hours
|
||||
|
||||
|
||||
def test_shortest_legal_interval_above_ceiling_falls_back_to_24h():
|
||||
# Not reachable today (max 8 tracked), but verify the math terminates
|
||||
# gracefully if the limit is ever raised above DAILY_CHECK_CEILING.
|
||||
assert shortest_legal_interval_hours(DAILY_CHECK_CEILING + 1) == 24
|
||||
|
||||
|
||||
def test_shortest_legal_interval_zero_returns_default():
|
||||
# No repeaters tracked: loop skips the cycle regardless, but the math
|
||||
# must terminate with a sane value (otherwise div-by-zero).
|
||||
assert shortest_legal_interval_hours(0) == DEFAULT_TELEMETRY_INTERVAL_HOURS
|
||||
|
||||
|
||||
def test_clamp_respects_user_pref_when_legal():
|
||||
# User picks 2h with N=2 tracked -> 2h is the shortest legal, keep it.
|
||||
assert clamp_telemetry_interval(2, 2) == 2
|
||||
|
||||
|
||||
def test_clamp_pushes_up_when_pref_illegal():
|
||||
# User picked 1h, then grew to 5 tracked. 5 repeaters' shortest legal is
|
||||
# 6h, so the scheduler should be using 6h while the saved pref is still 1.
|
||||
assert clamp_telemetry_interval(1, 5) == 6
|
||||
|
||||
|
||||
def test_clamp_unrecognized_value_falls_back_to_default():
|
||||
# A malformed saved value (e.g. from a hand-edited DB row) should default,
|
||||
# not error. Default 8h still gets clamped up if illegal for N.
|
||||
assert clamp_telemetry_interval(99, 1) == DEFAULT_TELEMETRY_INTERVAL_HOURS
|
||||
|
||||
|
||||
def test_clamp_preserves_longer_than_shortest_legal():
|
||||
# 24h is always legal at any N.
|
||||
assert clamp_telemetry_interval(24, 8) == 24
|
||||
|
||||
|
||||
def test_legal_options_filters_menu():
|
||||
assert legal_interval_options(5) == [6, 8, 12, 24]
|
||||
assert legal_interval_options(1) == list(TELEMETRY_INTERVAL_OPTIONS_HOURS)
|
||||
assert legal_interval_options(8) == [8, 12, 24]
|
||||
|
||||
|
||||
def test_next_run_is_strictly_future_even_on_boundary():
|
||||
# Exactly at a matching top-of-hour (8:00 UTC with interval=8), we want
|
||||
# the *next* one (16:00), never "now". Prevents a double-run in the same
|
||||
# minute if code mishandles equality.
|
||||
now = datetime(2026, 4, 16, 8, 0, 0, tzinfo=UTC)
|
||||
result = next_run_timestamp_utc(8, now=now)
|
||||
expected = datetime(2026, 4, 16, 16, 0, 0, tzinfo=UTC)
|
||||
assert result == int(expected.timestamp())
|
||||
|
||||
|
||||
def test_next_run_rounds_up_from_mid_hour():
|
||||
# 14:37 UTC with interval=8 -> next matching hour is 16:00.
|
||||
now = datetime(2026, 4, 16, 14, 37, 0, tzinfo=UTC)
|
||||
result = next_run_timestamp_utc(8, now=now)
|
||||
expected = datetime(2026, 4, 16, 16, 0, 0, tzinfo=UTC)
|
||||
assert result == int(expected.timestamp())
|
||||
|
||||
|
||||
def test_next_run_crosses_midnight():
|
||||
# 23:12 UTC with interval=8 -> midnight (00:00 next day) is legal.
|
||||
now = datetime(2026, 4, 16, 23, 12, 0, tzinfo=UTC)
|
||||
result = next_run_timestamp_utc(8, now=now)
|
||||
expected = datetime(2026, 4, 17, 0, 0, 0, tzinfo=UTC)
|
||||
assert result == int(expected.timestamp())
|
||||
|
||||
|
||||
def test_next_run_accepts_non_utc_input():
|
||||
# Non-UTC input should be normalized internally.
|
||||
from datetime import timedelta
|
||||
|
||||
pst = timezone(timedelta(hours=-8))
|
||||
# 08:00 PST == 16:00 UTC, a matching boundary for interval=8 -> next is 00:00 UTC.
|
||||
now = datetime(2026, 4, 16, 8, 0, 0, tzinfo=pst)
|
||||
result = next_run_timestamp_utc(8, now=now)
|
||||
expected = datetime(2026, 4, 17, 0, 0, 0, tzinfo=UTC)
|
||||
assert result == int(expected.timestamp())
|
||||
Reference in New Issue
Block a user