Add periodic message draining for bad-push situations

This commit is contained in:
Jack Kingsman
2026-01-10 12:51:56 -08:00
parent 02379dbd9b
commit b76795913c
5 changed files with 153 additions and 53 deletions

View File

@@ -44,19 +44,16 @@ async def on_contact_message(event: "Event") -> None:
The packet processor cannot decrypt these without the node's private key.
"""
payload = event.payload
pubkey_prefix = payload.get("pubkey_prefix", "unknown")
text_preview = payload.get("text", "")[:50]
logger.info("[DM] CONTACT_MSG_RECV from %s: %s%s",
pubkey_prefix, text_preview, "..." if len(payload.get("text", "")) > 50 else "")
# Skip CLI command responses (txt_type=1) - these are handled by the command endpoint
# and should not be stored in the database or broadcast via WebSocket
txt_type = payload.get("txt_type", 0)
if txt_type == 1:
logger.info("[DM] Skipping CLI response from %s (txt_type=1)", pubkey_prefix)
logger.debug("Skipping CLI response from %s (txt_type=1)", payload.get("pubkey_prefix"))
return
logger.debug("Received direct message from %s", payload.get("pubkey_prefix"))
# Get full public key if available, otherwise use prefix
sender_pubkey = payload.get("public_key") or payload.get("pubkey_prefix", "")
received_at = int(time.time())
@@ -112,29 +109,21 @@ async def on_rx_log_data(event: "Event") -> None:
handles channel messages (GROUP_TEXT) and advertisements (ADVERT).
"""
payload = event.payload
logger.debug("Received RX log data packet")
if "payload" not in payload:
logger.warning("[RX] RX_LOG_DATA event missing 'payload' field: %s", payload)
logger.warning("RX_LOG_DATA event missing 'payload' field")
return
raw_hex = payload["payload"]
raw_bytes = bytes.fromhex(raw_hex)
snr = payload.get("snr")
rssi = payload.get("rssi")
logger.info("[RX] RX_LOG_DATA received: %d bytes, SNR=%.1f, RSSI=%s",
len(raw_bytes), snr if snr is not None else 0, rssi)
result = await process_raw_packet(
await process_raw_packet(
raw_bytes=raw_bytes,
snr=snr,
rssi=rssi,
snr=payload.get("snr"),
rssi=payload.get("rssi"),
)
logger.info("[RX] Processed: type=%s, decrypted=%s, channel=%s, msg_id=%s",
result.get("payload_type"), result.get("decrypted"),
result.get("channel_name"), result.get("message_id"))
async def on_path_update(event: "Event") -> None:
"""Handle path update events."""
@@ -204,15 +193,9 @@ def register_event_handlers(meshcore) -> None:
These are handled by the packet processor via RX_LOG_DATA to avoid
duplicate processing and ensure consistent handling.
"""
logger.info("[INIT] Registering event handlers...")
meshcore.subscribe(EventType.CONTACT_MSG_RECV, on_contact_message)
logger.info("[INIT] Subscribed to CONTACT_MSG_RECV (direct messages)")
meshcore.subscribe(EventType.RX_LOG_DATA, on_rx_log_data)
logger.info("[INIT] Subscribed to RX_LOG_DATA (raw packets for channel messages)")
meshcore.subscribe(EventType.PATH_UPDATE, on_path_update)
logger.info("[INIT] Subscribed to PATH_UPDATE")
meshcore.subscribe(EventType.NEW_CONTACT, on_new_contact)
logger.info("[INIT] Subscribed to NEW_CONTACT")
meshcore.subscribe(EventType.ACK, on_ack)
logger.info("[INIT] Subscribed to ACK")
logger.info("[INIT] All event handlers registered successfully")
logger.info("Event handlers registered")

View File

@@ -12,7 +12,10 @@ from app.database import db
from app.event_handlers import register_event_handlers
from app.radio import radio_manager
from app.radio_sync import (
drain_pending_messages,
start_message_polling,
start_periodic_sync,
stop_message_polling,
stop_periodic_sync,
sync_and_offload_all,
)
@@ -30,27 +33,33 @@ async def lifespan(app: FastAPI):
try:
await radio_manager.connect()
logger.info("[STARTUP] Connected to radio at %s", radio_manager.port)
logger.info("Connected to radio")
if radio_manager.meshcore:
logger.info("[STARTUP] Registering event handlers for message reception...")
register_event_handlers(radio_manager.meshcore)
# Sync contacts/channels from radio to DB and clear radio
logger.info("[STARTUP] Syncing and offloading radio data...")
logger.info("Syncing and offloading radio data...")
result = await sync_and_offload_all()
logger.info("[STARTUP] Sync complete: %s", result)
logger.info("Sync complete: %s", result)
# Start periodic sync
start_periodic_sync()
# Send advertisement to announce our presence
logger.info("[STARTUP] Sending startup advertisement...")
logger.info("Sending startup advertisement...")
advert_result = await radio_manager.meshcore.commands.send_advert(flood=True)
logger.info("[STARTUP] Advertisement sent: %s", advert_result.type)
logger.info("Advertisement sent: %s", advert_result.type)
logger.info("[STARTUP] Starting auto message fetching...")
await radio_manager.meshcore.start_auto_message_fetching()
logger.info("[STARTUP] Auto message fetching started - ready to receive messages")
logger.info("Auto message fetching started")
# Drain any messages that were queued before we connected
drained = await drain_pending_messages()
if drained > 0:
logger.info("Drained %d pending message(s)", drained)
# Start periodic message polling as fallback for unreliable push events
start_message_polling()
except Exception as e:
logger.warning("Failed to connect to radio on startup: %s", e)
@@ -61,6 +70,7 @@ async def lifespan(app: FastAPI):
logger.info("Shutting down")
await radio_manager.stop_connection_monitor()
stop_message_polling()
stop_periodic_sync()
if radio_manager.meshcore:
await radio_manager.meshcore.stop_auto_message_fetching()

View File

@@ -134,7 +134,7 @@ async def process_raw_packet(
# If packet_id is None, this is a duplicate packet (same data already exists)
# Skip processing since we've already handled this exact packet
if packet_id is None:
logger.info("[PKT] Duplicate packet detected (same bytes), skipping")
logger.debug("Duplicate raw packet detected, skipping")
return {
"packet_id": None,
"timestamp": ts,
@@ -155,9 +155,6 @@ async def process_raw_packet(
payload_type = packet_info.payload_type if packet_info else None
payload_type_name = payload_type.name if payload_type else "Unknown"
logger.info("[PKT] New packet id=%d, type=%s, %d bytes",
packet_id, payload_type_name, len(raw_bytes))
result = {
"packet_id": packet_id,
"timestamp": ts,
@@ -220,7 +217,6 @@ async def _process_group_text(
"""
# Try to decrypt with all known channel keys
channels = await ChannelRepository.get_all()
logger.info("[CHAN] Attempting decryption with %d known channel keys", len(channels))
for channel in channels:
# Convert hex key to bytes for decryption
@@ -234,9 +230,10 @@ async def _process_group_text(
continue
# Successfully decrypted!
logger.info("[CHAN] Decrypted with channel '%s': %s%s",
channel.name, decrypted.message[:50],
"..." if len(decrypted.message) > 50 else "")
logger.debug(
"Decrypted GroupText for channel %s: %s",
channel.name, decrypted.message[:50]
)
# Check for repeat detection (our own message echoed back)
is_repeat = False
@@ -329,7 +326,6 @@ async def _process_group_text(
}
# Couldn't decrypt with any known key
logger.info("[CHAN] No matching channel key found for GROUP_TEXT packet")
return None
@@ -345,11 +341,10 @@ async def _process_advertisement(
"""
advert = try_parse_advertisement(raw_bytes)
if not advert:
logger.info("[ADVERT] Failed to parse advertisement packet")
logger.debug("Failed to parse advertisement packet")
return
logger.info("[ADVERT] From %s: name='%s', lat=%.4f, lon=%.4f",
advert.public_key[:12], advert.name, advert.lat or 0, advert.lon or 0)
logger.debug("Parsed advertisement from %s: %s", advert.public_key[:12], advert.name)
# Try to find existing contact
existing = await ContactRepository.get_by_key(advert.public_key)

View File

@@ -137,8 +137,11 @@ class RadioManager:
if not port:
raise RuntimeError("No MeshCore radio found. Please specify MESHCORE_SERIAL_PORT.")
logger.info("[RADIO] Connecting to radio at %s (baud %d)...",
port, settings.serial_baudrate)
logger.debug(
"Connecting to radio at %s (baud %d)",
port,
settings.serial_baudrate,
)
self._meshcore = await MeshCore.create_serial(
port=port,
baudrate=settings.serial_baudrate,
@@ -147,11 +150,7 @@ class RadioManager:
)
self._port = port
self._last_connected = True
logger.info("[RADIO] Serial connection established to %s", port)
if self._meshcore.self_info:
logger.info("[RADIO] Radio info: name='%s', pubkey=%s...",
self._meshcore.self_info.get("name", "unknown"),
self._meshcore.self_info.get("public_key", "")[:16])
logger.debug("Serial connection established")
async def disconnect(self) -> None:
"""Disconnect from the radio."""

View File

@@ -5,6 +5,8 @@ This module handles syncing contacts and channels from the radio to the database
then removing them from the radio to free up space for new discoveries.
Also handles loading recent non-repeater contacts TO the radio for DM ACK support.
Also handles periodic message polling as a fallback for platforms where push events
don't work reliably.
"""
import asyncio
@@ -20,6 +22,12 @@ from app.repository import ChannelRepository, ContactRepository
logger = logging.getLogger(__name__)
# Message poll task handle
_message_poll_task: asyncio.Task | None = None
# Message poll interval in seconds
MESSAGE_POLL_INTERVAL = 5
# Background task handle
_sync_task: asyncio.Task | None = None
@@ -187,6 +195,111 @@ async def sync_and_offload_all() -> dict:
}
async def drain_pending_messages() -> int:
    """Pull every queued message off the radio in one burst.

    Issues get_msg() repeatedly until the radio reports NO_MORE_MSGS,
    an error event arrives, a timeout fires, or a safety cap of 100
    attempts is hit.

    Returns:
        Number of messages retrieved (0 when the radio is not connected).
    """
    if not radio_manager.is_connected or radio_manager.meshcore is None:
        return 0
    commands = radio_manager.meshcore.commands
    drained = 0
    attempts_left = 100  # Safety limit so a misbehaving radio can't loop forever
    while attempts_left > 0:
        attempts_left -= 1
        try:
            event = await commands.get_msg(timeout=2.0)
            if event.type == EventType.NO_MORE_MSGS:
                break
            if event.type == EventType.ERROR:
                logger.debug("Error during message drain: %s", event.payload)
                break
            if event.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV):
                drained += 1
            # Brief pause between fetches so we don't hammer the serial link
            await asyncio.sleep(0.1)
        except asyncio.TimeoutError:
            break
        except Exception as e:
            logger.debug("Error draining messages: %s", e)
            break
    return drained
async def poll_for_messages() -> int:
    """Single-pass check for pending messages on the radio.

    Fallback path for platforms where MESSAGES_WAITING push events are
    unreliable: attempt one get_msg(); if it yields a real message,
    drain the rest of the queue as well.

    Returns:
        Number of messages retrieved (0 when disconnected or queue empty).
    """
    if not radio_manager.is_connected or radio_manager.meshcore is None:
        return 0
    meshcore = radio_manager.meshcore
    retrieved = 0
    try:
        event = await meshcore.commands.get_msg(timeout=2.0)
        if event.type == EventType.NO_MORE_MSGS:
            # Queue is empty — nothing to do
            return 0
        if event.type == EventType.ERROR:
            return 0
        if event.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV):
            # One message arrived; more may be queued behind it — drain them
            retrieved = 1 + await drain_pending_messages()
    except asyncio.TimeoutError:
        pass
    except Exception as e:
        logger.debug("Message poll exception: %s", e)
    return retrieved
async def _message_poll_loop():
    """Run forever, polling the radio for messages every MESSAGE_POLL_INTERVAL seconds.

    Exits only on task cancellation; any other error is logged and the
    loop keeps going.
    """
    while True:
        try:
            await asyncio.sleep(MESSAGE_POLL_INTERVAL)
            if not radio_manager.is_connected:
                continue  # Skip this cycle while the radio is offline
            await poll_for_messages()
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.debug("Error in message poll loop: %s", e)
def start_message_polling():
    """Spawn the periodic message-poll background task (idempotent).

    Does nothing if a poll task is already running.
    """
    global _message_poll_task
    already_running = _message_poll_task is not None and not _message_poll_task.done()
    if already_running:
        return
    _message_poll_task = asyncio.create_task(_message_poll_loop())
    logger.info("Started periodic message polling (interval: %ds)", MESSAGE_POLL_INTERVAL)
def stop_message_polling():
    """Cancel the periodic message-poll background task if it is active.

    Safe to call when polling was never started or has already finished.
    """
    global _message_poll_task
    if _message_poll_task is None:
        return
    if not _message_poll_task.done():
        _message_poll_task.cancel()
async def _periodic_sync_loop():
"""Background task that periodically syncs and offloads."""
while True: