mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-05-01 02:53:00 +02:00
Add auto-resend option for not-heard-repeated messages. Closes #154.
This commit is contained in:
@@ -105,7 +105,8 @@ CREATE TABLE IF NOT EXISTS app_settings (
|
||||
blocked_keys TEXT DEFAULT '[]',
|
||||
blocked_names TEXT DEFAULT '[]',
|
||||
discovery_blocked_types TEXT DEFAULT '[]',
|
||||
tracked_telemetry_repeaters TEXT DEFAULT '[]'
|
||||
tracked_telemetry_repeaters TEXT DEFAULT '[]',
|
||||
auto_resend_channel INTEGER DEFAULT 0
|
||||
);
|
||||
INSERT OR IGNORE INTO app_settings (id) VALUES (1);
|
||||
|
||||
|
||||
@@ -407,6 +407,12 @@ async def run_migrations(conn: aiosqlite.Connection) -> int:
|
||||
await set_version(conn, 53)
|
||||
applied += 1
|
||||
|
||||
if version < 54:
|
||||
logger.info("Applying migration 54: add auto_resend_channel to app_settings")
|
||||
await _migrate_054_auto_resend_channel(conn)
|
||||
await set_version(conn, 54)
|
||||
applied += 1
|
||||
|
||||
if applied > 0:
|
||||
logger.info(
|
||||
"Applied %d migration(s), schema now at version %d", applied, await get_version(conn)
|
||||
@@ -3192,3 +3198,18 @@ async def _migrate_053_tracked_telemetry_repeaters(conn: aiosqlite.Connection) -
|
||||
"ALTER TABLE app_settings ADD COLUMN tracked_telemetry_repeaters TEXT DEFAULT '[]'"
|
||||
)
|
||||
await conn.commit()
|
||||
|
||||
|
||||
async def _migrate_054_auto_resend_channel(conn: aiosqlite.Connection) -> None:
|
||||
"""Add auto_resend_channel boolean column to app_settings."""
|
||||
tables_cursor = await conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
|
||||
if "app_settings" not in {row[0] for row in await tables_cursor.fetchall()}:
|
||||
await conn.commit()
|
||||
return
|
||||
col_cursor = await conn.execute("PRAGMA table_info(app_settings)")
|
||||
columns = {row[1] for row in await col_cursor.fetchall()}
|
||||
if "auto_resend_channel" not in columns:
|
||||
await conn.execute(
|
||||
"ALTER TABLE app_settings ADD COLUMN auto_resend_channel INTEGER DEFAULT 0"
|
||||
)
|
||||
await conn.commit()
|
||||
|
||||
@@ -843,6 +843,13 @@ class AppSettings(BaseModel):
|
||||
default_factory=list,
|
||||
description="Public keys of repeaters opted into periodic telemetry collection (max 8)",
|
||||
)
|
||||
auto_resend_channel: bool = Field(
|
||||
default=False,
|
||||
description=(
|
||||
"When enabled, outgoing channel messages that receive no echo within 2 seconds "
|
||||
"are automatically byte-perfect resent once (within the 30-second dedup window)"
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class FanoutConfig(BaseModel):
|
||||
|
||||
@@ -30,7 +30,7 @@ class AppSettingsRepository:
|
||||
last_message_times, preferences_migrated,
|
||||
advert_interval, last_advert_time, flood_scope,
|
||||
blocked_keys, blocked_names, discovery_blocked_types,
|
||||
tracked_telemetry_repeaters
|
||||
tracked_telemetry_repeaters, auto_resend_channel
|
||||
FROM app_settings WHERE id = 1
|
||||
"""
|
||||
)
|
||||
@@ -99,6 +99,12 @@ class AppSettingsRepository:
|
||||
except (json.JSONDecodeError, TypeError, KeyError):
|
||||
tracked_telemetry_repeaters = []
|
||||
|
||||
# Parse auto_resend_channel boolean
|
||||
try:
|
||||
auto_resend_channel = bool(row["auto_resend_channel"])
|
||||
except (KeyError, TypeError):
|
||||
auto_resend_channel = False
|
||||
|
||||
return AppSettings(
|
||||
max_radio_contacts=row["max_radio_contacts"],
|
||||
favorites=favorites,
|
||||
@@ -112,6 +118,7 @@ class AppSettingsRepository:
|
||||
blocked_names=blocked_names,
|
||||
discovery_blocked_types=discovery_blocked_types,
|
||||
tracked_telemetry_repeaters=tracked_telemetry_repeaters,
|
||||
auto_resend_channel=auto_resend_channel,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
@@ -128,6 +135,7 @@ class AppSettingsRepository:
|
||||
blocked_names: list[str] | None = None,
|
||||
discovery_blocked_types: list[int] | None = None,
|
||||
tracked_telemetry_repeaters: list[str] | None = None,
|
||||
auto_resend_channel: bool | None = None,
|
||||
) -> AppSettings:
|
||||
"""Update app settings. Only provided fields are updated."""
|
||||
updates = []
|
||||
@@ -182,6 +190,10 @@ class AppSettingsRepository:
|
||||
updates.append("tracked_telemetry_repeaters = ?")
|
||||
params.append(json.dumps(tracked_telemetry_repeaters))
|
||||
|
||||
if auto_resend_channel is not None:
|
||||
updates.append("auto_resend_channel = ?")
|
||||
params.append(1 if auto_resend_channel else 0)
|
||||
|
||||
if updates:
|
||||
query = f"UPDATE app_settings SET {', '.join(updates)} WHERE id = 1"
|
||||
await db.conn.execute(query, params)
|
||||
|
||||
@@ -53,6 +53,10 @@ class AppSettingsUpdate(BaseModel):
|
||||
"advertisements should not create new contacts"
|
||||
),
|
||||
)
|
||||
auto_resend_channel: bool | None = Field(
|
||||
default=None,
|
||||
description="Auto-resend channel messages once if no echo heard within 2 seconds",
|
||||
)
|
||||
|
||||
|
||||
class BlockKeyRequest(BaseModel):
|
||||
@@ -142,6 +146,10 @@ async def update_settings(update: AppSettingsUpdate) -> AppSettings:
|
||||
valid = [t for t in update.discovery_blocked_types if t in (1, 2, 3, 4)]
|
||||
kwargs["discovery_blocked_types"] = sorted(set(valid))
|
||||
|
||||
# Auto-resend channel
|
||||
if update.auto_resend_channel is not None:
|
||||
kwargs["auto_resend_channel"] = update.auto_resend_channel
|
||||
|
||||
# Flood scope
|
||||
flood_scope_changed = False
|
||||
if update.flood_scope is not None:
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time as _time
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
|
||||
@@ -9,8 +10,14 @@ from fastapi import HTTPException
|
||||
from meshcore import EventType
|
||||
|
||||
from app.models import ResendChannelMessageResponse
|
||||
from app.radio import RadioOperationBusyError
|
||||
from app.region_scope import normalize_region_scope
|
||||
from app.repository import AppSettingsRepository, ContactRepository, MessageRepository
|
||||
from app.repository import (
|
||||
AppSettingsRepository,
|
||||
ChannelRepository,
|
||||
ContactRepository,
|
||||
MessageRepository,
|
||||
)
|
||||
from app.services import dm_ack_tracker
|
||||
from app.services.messages import (
|
||||
broadcast_message,
|
||||
@@ -33,6 +40,15 @@ NowFn = Callable[[], float]
|
||||
OutgoingReservationKey = tuple[str, str, str]
|
||||
RetryTaskScheduler = Callable[[Any], Any]
|
||||
|
||||
# Channel echo watchdog: delay before checking for echoes
|
||||
ECHO_WATCHDOG_DELAY_SECONDS = 2.0
|
||||
|
||||
# Byte-perfect resend window (must match router's RESEND_WINDOW_SECONDS)
|
||||
RESEND_WINDOW_SECONDS = 30
|
||||
|
||||
# Temp radio slot used by the router for channel sends
|
||||
WATCHDOG_TEMP_RADIO_SLOT = 0
|
||||
|
||||
_pending_outgoing_timestamp_reservations: dict[OutgoingReservationKey, set[int]] = {}
|
||||
_outgoing_timestamp_reservations_lock = asyncio.Lock()
|
||||
|
||||
@@ -620,6 +636,85 @@ async def send_direct_message_to_contact(
|
||||
return message
|
||||
|
||||
|
||||
async def _channel_echo_watchdog(
|
||||
message_id: int,
|
||||
radio_manager,
|
||||
broadcast_fn: BroadcastFn,
|
||||
error_broadcast_fn: BroadcastFn,
|
||||
) -> None:
|
||||
"""One-shot watchdog: if no echo heard after delay, attempt one byte-perfect resend.
|
||||
|
||||
Spawned as a fire-and-forget task after a channel send when auto_resend_channel is enabled.
|
||||
Uses non-blocking radio lock so it never stalls user actions.
|
||||
"""
|
||||
try:
|
||||
await asyncio.sleep(ECHO_WATCHDOG_DELAY_SECONDS)
|
||||
|
||||
msg = await MessageRepository.get_by_id(message_id)
|
||||
if not msg:
|
||||
return
|
||||
if msg.acked > 0:
|
||||
logger.debug(
|
||||
"Echo watchdog: message %d already has %d echo(s), skipping", message_id, msg.acked
|
||||
)
|
||||
return
|
||||
if msg.sender_timestamp is None:
|
||||
return
|
||||
|
||||
elapsed = int(_time.time()) - msg.sender_timestamp
|
||||
if elapsed > RESEND_WINDOW_SECONDS:
|
||||
logger.debug(
|
||||
"Echo watchdog: message %d outside resend window (%ds)", message_id, elapsed
|
||||
)
|
||||
return
|
||||
|
||||
channel = await ChannelRepository.get_by_key(msg.conversation_key)
|
||||
if not channel:
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Echo watchdog: no echo for message %d after %.0fs, attempting byte-perfect resend",
|
||||
message_id,
|
||||
ECHO_WATCHDOG_DELAY_SECONDS,
|
||||
)
|
||||
|
||||
try:
|
||||
key_bytes = bytes.fromhex(msg.conversation_key)
|
||||
except ValueError:
|
||||
return
|
||||
|
||||
timestamp_bytes = msg.sender_timestamp.to_bytes(4, "little")
|
||||
|
||||
# Strip sender name prefix to get the raw text for the radio
|
||||
async with radio_manager.radio_operation("echo_watchdog_resend", blocking=False) as mc:
|
||||
radio_name = mc.self_info.get("name", "") if mc.self_info else ""
|
||||
text_to_send = msg.text
|
||||
if radio_name and text_to_send.startswith(f"{radio_name}: "):
|
||||
text_to_send = text_to_send[len(f"{radio_name}: ") :]
|
||||
|
||||
result = await send_channel_message_with_effective_scope(
|
||||
mc=mc,
|
||||
channel=channel,
|
||||
channel_key=msg.conversation_key,
|
||||
key_bytes=key_bytes,
|
||||
text=text_to_send,
|
||||
timestamp_bytes=timestamp_bytes,
|
||||
action_label="echo watchdog resend",
|
||||
radio_manager=radio_manager,
|
||||
temp_radio_slot=WATCHDOG_TEMP_RADIO_SLOT,
|
||||
error_broadcast_fn=error_broadcast_fn,
|
||||
)
|
||||
if result is not None and result.type != EventType.ERROR:
|
||||
logger.info("Echo watchdog: resent message %d successfully", message_id)
|
||||
else:
|
||||
logger.debug("Echo watchdog: resend got no/error result for message %d", message_id)
|
||||
|
||||
except RadioOperationBusyError:
|
||||
logger.debug("Echo watchdog: radio busy, skipping resend for message %d", message_id)
|
||||
except Exception:
|
||||
logger.debug("Echo watchdog: resend failed for message %d", message_id, exc_info=True)
|
||||
|
||||
|
||||
async def send_channel_message_to_channel(
|
||||
*,
|
||||
channel,
|
||||
@@ -728,6 +823,22 @@ async def send_channel_message_to_channel(
|
||||
message_repository=message_repository,
|
||||
)
|
||||
broadcast_message(message=outgoing_message, broadcast_fn=broadcast_fn)
|
||||
|
||||
# Spawn echo watchdog if auto-resend is enabled
|
||||
try:
|
||||
settings = await AppSettingsRepository.get()
|
||||
if settings.auto_resend_channel:
|
||||
asyncio.create_task(
|
||||
_channel_echo_watchdog(
|
||||
message_id=outgoing_message.id,
|
||||
radio_manager=radio_manager,
|
||||
broadcast_fn=broadcast_fn,
|
||||
error_broadcast_fn=error_broadcast_fn,
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
pass # Never let watchdog setup failure break the send
|
||||
|
||||
return outgoing_message
|
||||
|
||||
|
||||
|
||||
@@ -702,6 +702,26 @@ export function SettingsRadioSection({
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div className="flex items-start gap-3 rounded-md border border-border/60 p-3">
|
||||
<Checkbox
|
||||
id="auto-resend-channel"
|
||||
checked={appSettings.auto_resend_channel}
|
||||
onCheckedChange={(checked) =>
|
||||
onSaveAppSettings({ auto_resend_channel: checked === true })
|
||||
}
|
||||
className="mt-0.5"
|
||||
/>
|
||||
<div className="space-y-1">
|
||||
<Label htmlFor="auto-resend-channel">Auto-Resend Unheard Channel Messages</Label>
|
||||
<p className="text-xs text-muted-foreground">
|
||||
When enabled, outgoing channel messages that receive no echo within 2 seconds are
|
||||
automatically resent once (byte-perfect, within the 30-second dedup window). Repeaters
|
||||
that already heard the original will ignore the duplicate. This functionality will NOT
|
||||
create double-sent/duplicate messages.
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div className="space-y-2">
|
||||
|
||||
@@ -70,6 +70,7 @@ const baseSettings: AppSettings = {
|
||||
blocked_names: [],
|
||||
discovery_blocked_types: [],
|
||||
tracked_telemetry_repeaters: [],
|
||||
auto_resend_channel: false,
|
||||
};
|
||||
|
||||
function renderModal(overrides?: {
|
||||
|
||||
@@ -337,12 +337,14 @@ export interface AppSettings {
|
||||
blocked_names: string[];
|
||||
discovery_blocked_types: number[];
|
||||
tracked_telemetry_repeaters: string[];
|
||||
auto_resend_channel: boolean;
|
||||
}
|
||||
|
||||
export interface AppSettingsUpdate {
|
||||
max_radio_contacts?: number;
|
||||
auto_decrypt_dm_on_advert?: boolean;
|
||||
advert_interval?: number;
|
||||
auto_resend_channel?: boolean;
|
||||
flood_scope?: string;
|
||||
blocked_keys?: string[];
|
||||
blocked_names?: string[];
|
||||
|
||||
@@ -1249,8 +1249,8 @@ class TestMigration039:
|
||||
|
||||
applied = await run_migrations(conn)
|
||||
|
||||
assert applied == 15
|
||||
assert await get_version(conn) == 53
|
||||
assert applied == 16
|
||||
assert await get_version(conn) == 54
|
||||
|
||||
cursor = await conn.execute(
|
||||
"""
|
||||
@@ -1321,8 +1321,8 @@ class TestMigration039:
|
||||
|
||||
applied = await run_migrations(conn)
|
||||
|
||||
assert applied == 15
|
||||
assert await get_version(conn) == 53
|
||||
assert applied == 16
|
||||
assert await get_version(conn) == 54
|
||||
|
||||
cursor = await conn.execute(
|
||||
"""
|
||||
@@ -1388,8 +1388,8 @@ class TestMigration039:
|
||||
|
||||
applied = await run_migrations(conn)
|
||||
|
||||
assert applied == 9
|
||||
assert await get_version(conn) == 53
|
||||
assert applied == 10
|
||||
assert await get_version(conn) == 54
|
||||
|
||||
cursor = await conn.execute(
|
||||
"""
|
||||
@@ -1441,8 +1441,8 @@ class TestMigration040:
|
||||
|
||||
applied = await run_migrations(conn)
|
||||
|
||||
assert applied == 14
|
||||
assert await get_version(conn) == 53
|
||||
assert applied == 15
|
||||
assert await get_version(conn) == 54
|
||||
|
||||
await conn.execute(
|
||||
"""
|
||||
@@ -1503,8 +1503,8 @@ class TestMigration041:
|
||||
|
||||
applied = await run_migrations(conn)
|
||||
|
||||
assert applied == 13
|
||||
assert await get_version(conn) == 53
|
||||
assert applied == 14
|
||||
assert await get_version(conn) == 54
|
||||
|
||||
await conn.execute(
|
||||
"""
|
||||
@@ -1556,8 +1556,8 @@ class TestMigration042:
|
||||
|
||||
applied = await run_migrations(conn)
|
||||
|
||||
assert applied == 12
|
||||
assert await get_version(conn) == 53
|
||||
assert applied == 13
|
||||
assert await get_version(conn) == 54
|
||||
|
||||
await conn.execute(
|
||||
"""
|
||||
@@ -1696,8 +1696,8 @@ class TestMigration046:
|
||||
|
||||
applied = await run_migrations(conn)
|
||||
|
||||
assert applied == 8
|
||||
assert await get_version(conn) == 53
|
||||
assert applied == 9
|
||||
assert await get_version(conn) == 54
|
||||
|
||||
cursor = await conn.execute(
|
||||
"""
|
||||
@@ -1790,8 +1790,8 @@ class TestMigration047:
|
||||
|
||||
applied = await run_migrations(conn)
|
||||
|
||||
assert applied == 7
|
||||
assert await get_version(conn) == 53
|
||||
assert applied == 8
|
||||
assert await get_version(conn) == 54
|
||||
|
||||
cursor = await conn.execute(
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user