From 02379dbd9b15613f5470d0a8116251a5da1ab3f8 Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Sat, 10 Jan 2026 12:48:11 -0800 Subject: [PATCH 1/2] Add debugging --- app/event_handlers.py | 35 ++++++++++++++++++++++++++--------- app/main.py | 14 ++++++++------ app/packet_processor.py | 19 ++++++++++++------- app/radio.py | 13 +++++++------ 4 files changed, 53 insertions(+), 28 deletions(-) diff --git a/app/event_handlers.py b/app/event_handlers.py index f8c1174..3aa72df 100644 --- a/app/event_handlers.py +++ b/app/event_handlers.py @@ -44,16 +44,19 @@ async def on_contact_message(event: "Event") -> None: The packet processor cannot decrypt these without the node's private key. """ payload = event.payload + pubkey_prefix = payload.get("pubkey_prefix", "unknown") + text_preview = payload.get("text", "")[:50] + + logger.info("[DM] CONTACT_MSG_RECV from %s: %s%s", + pubkey_prefix, text_preview, "..." if len(payload.get("text", "")) > 50 else "") # Skip CLI command responses (txt_type=1) - these are handled by the command endpoint # and should not be stored in the database or broadcast via WebSocket txt_type = payload.get("txt_type", 0) if txt_type == 1: - logger.debug("Skipping CLI response from %s (txt_type=1)", payload.get("pubkey_prefix")) + logger.info("[DM] Skipping CLI response from %s (txt_type=1)", pubkey_prefix) return - logger.debug("Received direct message from %s", payload.get("pubkey_prefix")) - # Get full public key if available, otherwise use prefix sender_pubkey = payload.get("public_key") or payload.get("pubkey_prefix", "") received_at = int(time.time()) @@ -109,21 +112,29 @@ async def on_rx_log_data(event: "Event") -> None: handles channel messages (GROUP_TEXT) and advertisements (ADVERT). """ payload = event.payload - logger.debug("Received RX log data packet") if "payload" not in payload: - logger.warning("RX_LOG_DATA event missing 'payload' field") + logger.warning("[RX] RX_LOG_DATA event missing 'payload' field: %s", payload) return raw_hex = payload["payload"] raw_bytes = bytes.fromhex(raw_hex) + snr = payload.get("snr") + rssi = payload.get("rssi") - await process_raw_packet( + logger.info("[RX] RX_LOG_DATA received: %d bytes, SNR=%.1f, RSSI=%s", + len(raw_bytes), snr if snr is not None else 0, rssi) + + result = await process_raw_packet( raw_bytes=raw_bytes, - snr=payload.get("snr"), - rssi=payload.get("rssi"), + snr=snr, + rssi=rssi, ) + logger.info("[RX] Processed: type=%s, decrypted=%s, channel=%s, msg_id=%s", + result.get("payload_type"), result.get("decrypted"), + result.get("channel_name"), result.get("message_id")) + async def on_path_update(event: "Event") -> None: """Handle path update events.""" @@ -193,9 +204,15 @@ def register_event_handlers(meshcore) -> None: These are handled by the packet processor via RX_LOG_DATA to avoid duplicate processing and ensure consistent handling. """ + logger.info("[INIT] Registering event handlers...") meshcore.subscribe(EventType.CONTACT_MSG_RECV, on_contact_message) + logger.info("[INIT] Subscribed to CONTACT_MSG_RECV (direct messages)") meshcore.subscribe(EventType.RX_LOG_DATA, on_rx_log_data) + logger.info("[INIT] Subscribed to RX_LOG_DATA (raw packets for channel messages)") meshcore.subscribe(EventType.PATH_UPDATE, on_path_update) + logger.info("[INIT] Subscribed to PATH_UPDATE") meshcore.subscribe(EventType.NEW_CONTACT, on_new_contact) + logger.info("[INIT] Subscribed to NEW_CONTACT") meshcore.subscribe(EventType.ACK, on_ack) - logger.info("Event handlers registered") + logger.info("[INIT] Subscribed to ACK") + logger.info("[INIT] All event handlers registered successfully") diff --git a/app/main.py b/app/main.py index 25c87d9..b23e257 100644 --- a/app/main.py +++ b/app/main.py @@ -30,25 +30,27 @@ async def lifespan(app: FastAPI): try: await radio_manager.connect() - logger.info("Connected to radio") + logger.info("[STARTUP] Connected to radio at %s", radio_manager.port) if radio_manager.meshcore: + logger.info("[STARTUP] Registering event handlers for message reception...") register_event_handlers(radio_manager.meshcore) # Sync contacts/channels from radio to DB and clear radio - logger.info("Syncing and offloading radio data...") + logger.info("[STARTUP] Syncing and offloading radio data...") result = await sync_and_offload_all() - logger.info("Sync complete: %s", result) + logger.info("[STARTUP] Sync complete: %s", result) # Start periodic sync start_periodic_sync() # Send advertisement to announce our presence - logger.info("Sending startup advertisement...") + logger.info("[STARTUP] Sending startup advertisement...") advert_result = await radio_manager.meshcore.commands.send_advert(flood=True) - logger.info("Advertisement sent: %s", advert_result.type) + logger.info("[STARTUP] Advertisement sent: %s", advert_result.type) + logger.info("[STARTUP] Starting auto message fetching...") await radio_manager.meshcore.start_auto_message_fetching() - logger.info("Auto message fetching started") + logger.info("[STARTUP] Auto message fetching started - ready to receive messages") except Exception as e: logger.warning("Failed to connect to radio on startup: %s", e) diff --git a/app/packet_processor.py b/app/packet_processor.py index ca8e47e..31aea24 100644 --- a/app/packet_processor.py +++ b/app/packet_processor.py @@ -134,7 +134,7 @@ async def process_raw_packet( # If packet_id is None, this is a duplicate packet (same data already exists) # Skip processing since we've already handled this exact packet if packet_id is None: - logger.debug("Duplicate raw packet detected, skipping") + logger.info("[PKT] Duplicate packet detected (same bytes), skipping") return { "packet_id": None, "timestamp": ts, @@ -155,6 +155,9 @@ async def process_raw_packet( payload_type = packet_info.payload_type if packet_info else None payload_type_name = payload_type.name if payload_type else "Unknown" + logger.info("[PKT] New packet id=%d, type=%s, %d bytes", + packet_id, payload_type_name, len(raw_bytes)) + result = { "packet_id": packet_id, "timestamp": ts, @@ -217,6 +220,7 @@ async def _process_group_text( """ # Try to decrypt with all known channel keys channels = await ChannelRepository.get_all() + logger.info("[CHAN] Attempting decryption with %d known channel keys", len(channels)) for channel in channels: # Convert hex key to bytes for decryption @@ -230,10 +234,9 @@ async def _process_group_text( continue # Successfully decrypted! - logger.debug( - "Decrypted GroupText for channel %s: %s", - channel.name, decrypted.message[:50] - ) + logger.info("[CHAN] Decrypted with channel '%s': %s%s", + channel.name, decrypted.message[:50], + "..." if len(decrypted.message) > 50 else "") # Check for repeat detection (our own message echoed back) is_repeat = False @@ -326,6 +329,7 @@ async def _process_group_text( } # Couldn't decrypt with any known key + logger.info("[CHAN] No matching channel key found for GROUP_TEXT packet") return None @@ -341,10 +345,11 @@ async def _process_advertisement( """ advert = try_parse_advertisement(raw_bytes) if not advert: - logger.debug("Failed to parse advertisement packet") + logger.info("[ADVERT] Failed to parse advertisement packet") return - logger.debug("Parsed advertisement from %s: %s", advert.public_key[:12], advert.name) + logger.info("[ADVERT] From %s: name='%s', lat=%.4f, lon=%.4f", + advert.public_key[:12], advert.name, advert.lat or 0, advert.lon or 0) # Try to find existing contact existing = await ContactRepository.get_by_key(advert.public_key) diff --git a/app/radio.py b/app/radio.py index 003f9b3..04ab739 100644 --- a/app/radio.py +++ b/app/radio.py @@ -137,11 +137,8 @@ class RadioManager: if not port: raise RuntimeError("No MeshCore radio found. Please specify MESHCORE_SERIAL_PORT.") - logger.debug( - "Connecting to radio at %s (baud %d)", - port, - settings.serial_baudrate, - ) + logger.info("[RADIO] Connecting to radio at %s (baud %d)...", + port, settings.serial_baudrate) self._meshcore = await MeshCore.create_serial( port=port, baudrate=settings.serial_baudrate, @@ -150,7 +147,11 @@ class RadioManager: ) self._port = port self._last_connected = True - logger.debug("Serial connection established") + logger.info("[RADIO] Serial connection established to %s", port) + if self._meshcore.self_info: + logger.info("[RADIO] Radio info: name='%s', pubkey=%s...", + self._meshcore.self_info.get("name", "unknown"), + self._meshcore.self_info.get("public_key", "")[:16]) async def disconnect(self) -> None: """Disconnect from the radio.""" From b76795913c24a58ed9b0b4ae098bdcaf96e6f37b Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Sat, 10 Jan 2026 12:51:56 -0800 Subject: [PATCH 2/2] Add periodic message draining for bad-push situations --- app/event_handlers.py | 35 ++++--------- app/main.py | 26 ++++++--- app/packet_processor.py | 19 +++---- app/radio.py | 13 +++-- app/radio_sync.py | 113 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 153 insertions(+), 53 deletions(-) diff --git a/app/event_handlers.py b/app/event_handlers.py index 3aa72df..f8c1174 100644 --- a/app/event_handlers.py +++ b/app/event_handlers.py @@ -44,19 +44,16 @@ async def on_contact_message(event: "Event") -> None: The packet processor cannot decrypt these without the node's private key. """ payload = event.payload - pubkey_prefix = payload.get("pubkey_prefix", "unknown") - text_preview = payload.get("text", "")[:50] - - logger.info("[DM] CONTACT_MSG_RECV from %s: %s%s", - pubkey_prefix, text_preview, "..." if len(payload.get("text", "")) > 50 else "") # Skip CLI command responses (txt_type=1) - these are handled by the command endpoint # and should not be stored in the database or broadcast via WebSocket txt_type = payload.get("txt_type", 0) if txt_type == 1: - logger.info("[DM] Skipping CLI response from %s (txt_type=1)", pubkey_prefix) + logger.debug("Skipping CLI response from %s (txt_type=1)", payload.get("pubkey_prefix")) return + logger.debug("Received direct message from %s", payload.get("pubkey_prefix")) + # Get full public key if available, otherwise use prefix sender_pubkey = payload.get("public_key") or payload.get("pubkey_prefix", "") received_at = int(time.time()) @@ -112,29 +109,21 @@ async def on_rx_log_data(event: "Event") -> None: handles channel messages (GROUP_TEXT) and advertisements (ADVERT). """ payload = event.payload + logger.debug("Received RX log data packet") if "payload" not in payload: - logger.warning("[RX] RX_LOG_DATA event missing 'payload' field: %s", payload) + logger.warning("RX_LOG_DATA event missing 'payload' field") return raw_hex = payload["payload"] raw_bytes = bytes.fromhex(raw_hex) - snr = payload.get("snr") - rssi = payload.get("rssi") - logger.info("[RX] RX_LOG_DATA received: %d bytes, SNR=%.1f, RSSI=%s", - len(raw_bytes), snr if snr is not None else 0, rssi) - - result = await process_raw_packet( + await process_raw_packet( raw_bytes=raw_bytes, - snr=snr, - rssi=rssi, + snr=payload.get("snr"), + rssi=payload.get("rssi"), ) - logger.info("[RX] Processed: type=%s, decrypted=%s, channel=%s, msg_id=%s", - result.get("payload_type"), result.get("decrypted"), - result.get("channel_name"), result.get("message_id")) - async def on_path_update(event: "Event") -> None: """Handle path update events.""" @@ -204,15 +193,9 @@ def register_event_handlers(meshcore) -> None: These are handled by the packet processor via RX_LOG_DATA to avoid duplicate processing and ensure consistent handling. """ - logger.info("[INIT] Registering event handlers...") meshcore.subscribe(EventType.CONTACT_MSG_RECV, on_contact_message) - logger.info("[INIT] Subscribed to CONTACT_MSG_RECV (direct messages)") meshcore.subscribe(EventType.RX_LOG_DATA, on_rx_log_data) - logger.info("[INIT] Subscribed to RX_LOG_DATA (raw packets for channel messages)") meshcore.subscribe(EventType.PATH_UPDATE, on_path_update) - logger.info("[INIT] Subscribed to PATH_UPDATE") meshcore.subscribe(EventType.NEW_CONTACT, on_new_contact) - logger.info("[INIT] Subscribed to NEW_CONTACT") meshcore.subscribe(EventType.ACK, on_ack) - logger.info("[INIT] Subscribed to ACK") - logger.info("[INIT] All event handlers registered successfully") + logger.info("Event handlers registered") diff --git a/app/main.py b/app/main.py index b23e257..cb70d50 100644 --- a/app/main.py +++ b/app/main.py @@ -12,7 +12,10 @@ from app.database import db from app.event_handlers import register_event_handlers from app.radio import radio_manager from app.radio_sync import ( + drain_pending_messages, + start_message_polling, start_periodic_sync, + stop_message_polling, stop_periodic_sync, sync_and_offload_all, ) @@ -30,27 +33,33 @@ async def lifespan(app: FastAPI): try: await radio_manager.connect() - logger.info("[STARTUP] Connected to radio at %s", radio_manager.port) + logger.info("Connected to radio") if radio_manager.meshcore: - logger.info("[STARTUP] Registering event handlers for message reception...") register_event_handlers(radio_manager.meshcore) # Sync contacts/channels from radio to DB and clear radio - logger.info("[STARTUP] Syncing and offloading radio data...") + logger.info("Syncing and offloading radio data...") result = await sync_and_offload_all() - logger.info("[STARTUP] Sync complete: %s", result) + logger.info("Sync complete: %s", result) # Start periodic sync start_periodic_sync() # Send advertisement to announce our presence - logger.info("[STARTUP] Sending startup advertisement...") + logger.info("Sending startup advertisement...") advert_result = await radio_manager.meshcore.commands.send_advert(flood=True) - logger.info("[STARTUP] Advertisement sent: %s", advert_result.type) + logger.info("Advertisement sent: %s", advert_result.type) - logger.info("[STARTUP] Starting auto message fetching...") await radio_manager.meshcore.start_auto_message_fetching() - logger.info("[STARTUP] Auto message fetching started - ready to receive messages") + logger.info("Auto message fetching started") + + # Drain any messages that were queued before we connected + drained = await drain_pending_messages() + if drained > 0: + logger.info("Drained %d pending message(s)", drained) + + # Start periodic message polling as fallback for unreliable push events + start_message_polling() except Exception as e: logger.warning("Failed to connect to radio on startup: %s", e) @@ -61,6 +70,7 @@ async def lifespan(app: FastAPI): logger.info("Shutting down") await radio_manager.stop_connection_monitor() + stop_message_polling() stop_periodic_sync() if radio_manager.meshcore: await radio_manager.meshcore.stop_auto_message_fetching() diff --git a/app/packet_processor.py b/app/packet_processor.py index 31aea24..ca8e47e 100644 --- a/app/packet_processor.py +++ b/app/packet_processor.py @@ -134,7 +134,7 @@ async def process_raw_packet( # If packet_id is None, this is a duplicate packet (same data already exists) # Skip processing since we've already handled this exact packet if packet_id is None: - logger.info("[PKT] Duplicate packet detected (same bytes), skipping") + logger.debug("Duplicate raw packet detected, skipping") return { "packet_id": None, "timestamp": ts, @@ -155,9 +155,6 @@ async def process_raw_packet( payload_type = packet_info.payload_type if packet_info else None payload_type_name = payload_type.name if payload_type else "Unknown" - logger.info("[PKT] New packet id=%d, type=%s, %d bytes", - packet_id, payload_type_name, len(raw_bytes)) - result = { "packet_id": packet_id, "timestamp": ts, @@ -220,7 +217,6 @@ async def _process_group_text( """ # Try to decrypt with all known channel keys channels = await ChannelRepository.get_all() - logger.info("[CHAN] Attempting decryption with %d known channel keys", len(channels)) for channel in channels: # Convert hex key to bytes for decryption @@ -234,9 +230,10 @@ async def _process_group_text( continue # Successfully decrypted! - logger.info("[CHAN] Decrypted with channel '%s': %s%s", - channel.name, decrypted.message[:50], - "..." if len(decrypted.message) > 50 else "") + logger.debug( + "Decrypted GroupText for channel %s: %s", + channel.name, decrypted.message[:50] + ) # Check for repeat detection (our own message echoed back) is_repeat = False @@ -329,7 +326,6 @@ async def _process_group_text( } # Couldn't decrypt with any known key - logger.info("[CHAN] No matching channel key found for GROUP_TEXT packet") return None @@ -345,11 +341,10 @@ async def _process_advertisement( """ advert = try_parse_advertisement(raw_bytes) if not advert: - logger.info("[ADVERT] Failed to parse advertisement packet") + logger.debug("Failed to parse advertisement packet") return - logger.info("[ADVERT] From %s: name='%s', lat=%.4f, lon=%.4f", - advert.public_key[:12], advert.name, advert.lat or 0, advert.lon or 0) + logger.debug("Parsed advertisement from %s: %s", advert.public_key[:12], advert.name) # Try to find existing contact existing = await ContactRepository.get_by_key(advert.public_key) diff --git a/app/radio.py b/app/radio.py index 04ab739..003f9b3 100644 --- a/app/radio.py +++ b/app/radio.py @@ -137,8 +137,11 @@ class RadioManager: if not port: raise RuntimeError("No MeshCore radio found. Please specify MESHCORE_SERIAL_PORT.") - logger.info("[RADIO] Connecting to radio at %s (baud %d)...", - port, settings.serial_baudrate) + logger.debug( + "Connecting to radio at %s (baud %d)", + port, + settings.serial_baudrate, + ) self._meshcore = await MeshCore.create_serial( port=port, baudrate=settings.serial_baudrate, @@ -147,11 +150,7 @@ class RadioManager: ) self._port = port self._last_connected = True - logger.info("[RADIO] Serial connection established to %s", port) - if self._meshcore.self_info: - logger.info("[RADIO] Radio info: name='%s', pubkey=%s...", - self._meshcore.self_info.get("name", "unknown"), - self._meshcore.self_info.get("public_key", "")[:16]) + logger.debug("Serial connection established") async def disconnect(self) -> None: """Disconnect from the radio.""" diff --git a/app/radio_sync.py b/app/radio_sync.py index ec87d73..a4965d5 100644 --- a/app/radio_sync.py +++ b/app/radio_sync.py @@ -5,6 +5,8 @@ This module handles syncing contacts and channels from the radio to the database then removing them from the radio to free up space for new discoveries. Also handles loading recent non-repeater contacts TO the radio for DM ACK support. +Also handles periodic message polling as a fallback for platforms where push events +don't work reliably. """ import asyncio @@ -20,6 +22,12 @@ from app.repository import ChannelRepository, ContactRepository logger = logging.getLogger(__name__) +# Message poll task handle +_message_poll_task: asyncio.Task | None = None + +# Message poll interval in seconds +MESSAGE_POLL_INTERVAL = 5 + # Background task handle _sync_task: asyncio.Task | None = None @@ -187,6 +195,111 @@ async def sync_and_offload_all() -> dict: } +async def drain_pending_messages() -> int: + """ + Drain all pending messages from the radio. + + Calls get_msg() repeatedly until NO_MORE_MSGS is received. + Returns the count of messages retrieved. + """ + if not radio_manager.is_connected or radio_manager.meshcore is None: + return 0 + + mc = radio_manager.meshcore + count = 0 + max_iterations = 100 # Safety limit + + for _ in range(max_iterations): + try: + result = await mc.commands.get_msg(timeout=2.0) + + if result.type == EventType.NO_MORE_MSGS: + break + elif result.type == EventType.ERROR: + logger.debug("Error during message drain: %s", result.payload) + break + elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV): + count += 1 + + # Small delay between fetches + await asyncio.sleep(0.1) + + except asyncio.TimeoutError: + break + except Exception as e: + logger.debug("Error draining messages: %s", e) + break + + return count + + +async def poll_for_messages() -> int: + """ + Poll the radio for any pending messages (single pass). + + This is a fallback for platforms where MESSAGES_WAITING push events + don't work reliably. + + Returns the count of messages retrieved. + """ + if not radio_manager.is_connected or radio_manager.meshcore is None: + return 0 + + mc = radio_manager.meshcore + count = 0 + + try: + # Try to get one message + result = await mc.commands.get_msg(timeout=2.0) + + if result.type == EventType.NO_MORE_MSGS: + # No messages waiting + return 0 + elif result.type == EventType.ERROR: + return 0 + elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV): + count += 1 + # If we got a message, there might be more - drain them + count += await drain_pending_messages() + + except asyncio.TimeoutError: + pass + except Exception as e: + logger.debug("Message poll exception: %s", e) + + return count + + +async def _message_poll_loop(): + """Background task that periodically polls for messages.""" + while True: + try: + await asyncio.sleep(MESSAGE_POLL_INTERVAL) + + if radio_manager.is_connected: + await poll_for_messages() + + except asyncio.CancelledError: + break + except Exception as e: + logger.debug("Error in message poll loop: %s", e) + + +def start_message_polling(): + """Start the periodic message polling background task.""" + global _message_poll_task + if _message_poll_task is None or _message_poll_task.done(): + _message_poll_task = asyncio.create_task(_message_poll_loop()) + logger.info("Started periodic message polling (interval: %ds)", MESSAGE_POLL_INTERVAL) + + +def stop_message_polling(): + """Stop the periodic message polling background task.""" + global _message_poll_task + if _message_poll_task and not _message_poll_task.done(): + _message_poll_task.cancel() + + async def _periodic_sync_loop(): """Background task that periodically syncs and offloads.""" while True: