Consolidate healthcheck generation and packet dupe handling

This commit is contained in:
Jack Kingsman
2026-02-04 12:11:01 -08:00
parent ad5e799a62
commit 5268e889fa
10 changed files with 735 additions and 206 deletions
+1 -47
View File
@@ -9,18 +9,11 @@ from fastapi.staticfiles import StaticFiles
from app.config import setup_logging
from app.database import db
from app.event_handlers import register_event_handlers
from app.radio import radio_manager
from app.radio_sync import (
drain_pending_messages,
start_message_polling,
start_periodic_advert,
start_periodic_sync,
stop_message_polling,
stop_periodic_advert,
stop_periodic_sync,
sync_and_offload_all,
sync_radio_time,
)
from app.routers import (
channels,
@@ -47,46 +40,7 @@ async def lifespan(app: FastAPI):
try:
await radio_manager.connect()
logger.info("Connected to radio")
if radio_manager.meshcore:
register_event_handlers(radio_manager.meshcore)
# Export and store private key for server-side DM decryption
from app.keystore import export_and_store_private_key
await export_and_store_private_key(radio_manager.meshcore)
# Sync radio clock with system time
await sync_radio_time()
# Sync contacts/channels from radio to DB and clear radio
logger.info("Syncing and offloading radio data...")
result = await sync_and_offload_all()
logger.info("Sync complete: %s", result)
# Start periodic sync
start_periodic_sync()
# Send advertisement to announce our presence (if enabled and not throttled)
from app.radio_sync import send_advertisement
if await send_advertisement():
logger.info("Startup advertisement sent")
else:
logger.debug("Startup advertisement skipped (disabled or throttled)")
# Start periodic advertisement (every hour)
start_periodic_advert()
await radio_manager.meshcore.start_auto_message_fetching()
logger.info("Auto message fetching started")
# Drain any messages that were queued before we connected
drained = await drain_pending_messages()
if drained > 0:
logger.info("Drained %d pending message(s)", drained)
# Start periodic message polling as fallback for unreliable push events
start_message_polling()
await radio_manager.post_connect_setup()
except Exception as e:
logger.warning("Failed to connect to radio on startup: %s", e)
+75 -89
View File
@@ -39,6 +39,71 @@ from app.websocket import broadcast_error, broadcast_event
logger = logging.getLogger(__name__)
async def _handle_duplicate_message(
    packet_id: int,
    msg_type: str,
    conversation_key: str,
    text: str,
    sender_timestamp: int,
    path: str | None,
    received: int,
) -> None:
    """Fold a duplicate packet into the message record it duplicates.

    Intended for the case where ``MessageRepository.create`` returned ``None``
    because the insert hit an already-stored message. The stored message is
    looked up by its content; if found, the new path (when one is given) is
    attached, the ack count is bumped for outgoing messages (an echo of our
    own message counts as confirmation) or merely read for incoming ones, a
    ``message_acked`` event is broadcast, and the raw packet is marked as
    decrypted against the stored message.

    Args:
        packet_id: Raw packet that turned out to be a duplicate.
        msg_type: Message type used for the lookup (``"CHAN"`` selects the
            channel wording in the warning; anything else is logged as a DM).
        conversation_key: Conversation identifier used for the lookup; only
            its first 12 characters are logged.
        text: Message text used for the lookup.
        sender_timestamp: Sender timestamp used for the lookup.
        path: Path to record on the stored message, or ``None`` to keep the
            paths already on the record.
        received: Value forwarded to ``MessageRepository.add_path`` together
            with ``path``.

    Returns:
        None. If the stored message cannot be found, a warning is logged and
        nothing else happens (the packet is not marked as decrypted).
    """
    prior = await MessageRepository.get_by_content(
        msg_type=msg_type,
        conversation_key=conversation_key,
        text=text,
        sender_timestamp=sender_timestamp,
    )
    key_prefix = conversation_key[:12]

    if not prior:
        # The insert reported a duplicate, yet the lookup came back empty.
        kind_label = "DM" if msg_type != "CHAN" else "message"
        logger.warning(
            "Duplicate %s for %s but couldn't find existing",
            kind_label,
            key_prefix,
        )
        return

    stored_id = prior.id
    logger.debug(
        "Duplicate %s for %s (msg_id=%d, outgoing=%s) - adding path",
        msg_type,
        key_prefix,
        stored_id,
        prior.outgoing,
    )

    # Record the extra path when there is one; otherwise reuse what is stored
    # so the broadcast below still carries the current path list.
    paths = (
        await MessageRepository.add_path(stored_id, path, received)
        if path is not None
        else (prior.paths or [])
    )

    # Only echoes of our own outgoing messages count as acknowledgements;
    # for incoming messages the current count is just read back.
    ack_lookup = (
        MessageRepository.increment_ack_count
        if prior.outgoing
        else MessageRepository.get_ack_count
    )
    ack_count = await ack_lookup(stored_id)

    # Tell connected clients about the refreshed ack count and paths.
    payload = {
        "message_id": prior.id,
        "ack_count": ack_count,
        "paths": [p.model_dump() for p in paths] if paths else [],
    }
    broadcast_event("message_acked", payload)

    # Link the raw packet to the message it duplicates.
    await RawPacketRepository.mark_decrypted(packet_id, prior.id)
async def create_message_from_decrypted(
packet_id: int,
channel_key: str,
@@ -91,52 +156,9 @@ async def create_message_from_decrypted(
# 1. Our own outgoing message echoes back (flood routing)
# 2. Same message arrives via multiple paths before first is committed
# In either case, add the path to the existing message.
existing_msg = await MessageRepository.get_by_content(
msg_type="CHAN",
conversation_key=channel_key_normalized,
text=text,
sender_timestamp=timestamp,
await _handle_duplicate_message(
packet_id, "CHAN", channel_key_normalized, text, timestamp, path, received
)
if not existing_msg:
logger.warning(
"Duplicate message for channel %s but couldn't find existing",
channel_key_normalized[:8],
)
return None
logger.debug(
"Duplicate message for channel %s (msg_id=%d, outgoing=%s) - adding path",
channel_key_normalized[:8],
existing_msg.id,
existing_msg.outgoing,
)
# Add path if provided
if path is not None:
paths = await MessageRepository.add_path(existing_msg.id, path, received)
else:
# Get current paths for broadcast
paths = existing_msg.paths or []
# Increment ack count for outgoing messages (echo confirmation)
if existing_msg.outgoing:
ack_count = await MessageRepository.increment_ack_count(existing_msg.id)
else:
ack_count = await MessageRepository.get_ack_count(existing_msg.id)
# Broadcast updated paths
broadcast_event(
"message_acked",
{
"message_id": existing_msg.id,
"ack_count": ack_count,
"paths": [p.model_dump() for p in paths] if paths else [],
},
)
# Mark this packet as decrypted
await RawPacketRepository.mark_decrypted(packet_id, existing_msg.id)
return None
logger.info("Stored channel message %d for channel %s", msg_id, channel_key_normalized[:8])
@@ -241,51 +263,15 @@ async def create_dm_message_from_decrypted(
if msg_id is None:
# Duplicate message detected
existing_msg = await MessageRepository.get_by_content(
msg_type="PRIV",
conversation_key=conversation_key,
text=decrypted.message,
sender_timestamp=decrypted.timestamp,
await _handle_duplicate_message(
packet_id,
"PRIV",
conversation_key,
decrypted.message,
decrypted.timestamp,
path,
received,
)
if not existing_msg:
logger.warning(
"Duplicate DM for contact %s but couldn't find existing",
conversation_key[:12],
)
return None
logger.debug(
"Duplicate DM for contact %s (msg_id=%d, outgoing=%s) - adding path",
conversation_key[:12],
existing_msg.id,
existing_msg.outgoing,
)
# Add path if provided
if path is not None:
paths = await MessageRepository.add_path(existing_msg.id, path, received)
else:
paths = existing_msg.paths or []
# Increment ack count for outgoing messages (echo confirmation)
if existing_msg.outgoing:
ack_count = await MessageRepository.increment_ack_count(existing_msg.id)
else:
ack_count = await MessageRepository.get_ack_count(existing_msg.id)
# Broadcast updated paths
broadcast_event(
"message_acked",
{
"message_id": existing_msg.id,
"ack_count": ack_count,
"paths": [p.model_dump() for p in paths] if paths else [],
},
)
# Mark this packet as decrypted
await RawPacketRepository.mark_decrypted(packet_id, existing_msg.id)
return None
logger.info(
+49 -6
View File
@@ -108,18 +108,61 @@ class RadioManager:
self._reconnect_lock: asyncio.Lock | None = None
async def post_connect_setup(self) -> None:
"""Register event handlers, export private key, and start message fetching.
"""Full post-connection setup: handlers, key export, sync, advertisements, polling.
Called after every successful connection or reconnection.
Idempotent — safe to call repeatedly (periodic tasks have start guards).
"""
from app.event_handlers import register_event_handlers
from app.keystore import export_and_store_private_key
from app.radio_sync import (
drain_pending_messages,
send_advertisement,
start_message_polling,
start_periodic_advert,
start_periodic_sync,
sync_and_offload_all,
sync_radio_time,
)
if self._meshcore:
register_event_handlers(self._meshcore)
await export_and_store_private_key(self._meshcore)
await self._meshcore.start_auto_message_fetching()
logger.info("Post-connect setup complete (handlers, key export, message fetching)")
if not self._meshcore:
return
register_event_handlers(self._meshcore)
await export_and_store_private_key(self._meshcore)
# Sync radio clock with system time
await sync_radio_time()
# Sync contacts/channels from radio to DB and clear radio
logger.info("Syncing and offloading radio data...")
result = await sync_and_offload_all()
logger.info("Sync complete: %s", result)
# Start periodic sync (idempotent)
start_periodic_sync()
# Send advertisement to announce our presence (if enabled and not throttled)
if await send_advertisement():
logger.info("Advertisement sent")
else:
logger.debug("Advertisement skipped (disabled or throttled)")
# Start periodic advertisement (idempotent)
start_periodic_advert()
await self._meshcore.start_auto_message_fetching()
logger.info("Auto message fetching started")
# Drain any messages that were queued before we connected
drained = await drain_pending_messages()
if drained > 0:
logger.info("Drained %d pending message(s)", drained)
# Start periodic message polling as fallback (idempotent)
start_message_polling()
logger.info("Post-connect setup complete")
@property
def meshcore(self) -> MeshCore | None:
+16 -12
View File
@@ -18,10 +18,8 @@ class HealthResponse(BaseModel):
oldest_undecrypted_timestamp: int | None
@router.get("/health", response_model=HealthResponse)
async def healthcheck() -> HealthResponse:
"""Check if the API is running and if the radio is connected."""
# Get database file size in MB
async def build_health_data(radio_connected: bool, serial_port: str | None) -> dict:
"""Build the health status payload used by REST endpoint and WebSocket broadcasts."""
db_size_mb = 0.0
try:
db_size_bytes = os.path.getsize(settings.database_path)
@@ -29,17 +27,23 @@ async def healthcheck() -> HealthResponse:
except OSError:
pass
# Get oldest undecrypted packet info (gracefully handle if DB not connected)
oldest_ts = None
try:
oldest_ts = await RawPacketRepository.get_oldest_undecrypted()
except RuntimeError:
pass # Database not connected
return HealthResponse(
status="ok" if radio_manager.is_connected else "degraded",
radio_connected=radio_manager.is_connected,
serial_port=radio_manager.port,
database_size_mb=db_size_mb,
oldest_undecrypted_timestamp=oldest_ts,
)
return {
"status": "ok" if radio_connected else "degraded",
"radio_connected": radio_connected,
"serial_port": serial_port,
"database_size_mb": db_size_mb,
"oldest_undecrypted_timestamp": oldest_ts,
}
@router.get("/health", response_model=HealthResponse)
async def healthcheck() -> HealthResponse:
    """Report API liveness together with the radio connection state.

    Reads the connection flag and port from ``radio_manager``, passes them to
    ``build_health_data``, and wraps the resulting mapping in a
    ``HealthResponse``.
    """
    connected = radio_manager.is_connected
    port = radio_manager.port
    payload = await build_health_data(connected, port)
    return HealthResponse(**payload)
+2 -24
View File
@@ -1,13 +1,11 @@
"""WebSocket router for real-time updates."""
import logging
import os
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.config import settings
from app.radio import radio_manager
from app.repository import RawPacketRepository
from app.routers.health import build_health_data
from app.websocket import ws_manager
logger = logging.getLogger(__name__)
@@ -25,27 +23,7 @@ async def websocket_endpoint(websocket: WebSocket) -> None:
# Send initial health status
try:
db_size_mb = 0.0
try:
db_size_bytes = os.path.getsize(settings.database_path)
db_size_mb = round(db_size_bytes / (1024 * 1024), 2)
except OSError:
pass
# Get oldest undecrypted packet info
oldest_ts = None
try:
oldest_ts = await RawPacketRepository.get_oldest_undecrypted()
except RuntimeError:
pass # Database not connected
health_data = {
"status": "ok" if radio_manager.is_connected else "degraded",
"radio_connected": radio_manager.is_connected,
"serial_port": radio_manager.port,
"database_size_mb": db_size_mb,
"oldest_undecrypted_timestamp": oldest_ts,
}
health_data = await build_health_data(radio_manager.is_connected, radio_manager.port)
await ws_manager.send_personal(websocket, "health", health_data)
except Exception as e:
+3 -28
View File
@@ -3,13 +3,10 @@
import asyncio
import json
import logging
import os
from typing import Any
from fastapi import WebSocket
from app.config import settings
logger = logging.getLogger(__name__)
# Timeout for individual WebSocket send operations (seconds)
@@ -128,33 +125,11 @@ def broadcast_success(message: str, details: str | None = None) -> None:
def broadcast_health(radio_connected: bool, serial_port: str | None = None) -> None:
"""Broadcast health status change to all connected clients."""
from app.repository import RawPacketRepository
async def _broadcast():
# Get database file size in MB
db_size_mb = 0.0
try:
db_size_bytes = os.path.getsize(settings.database_path)
db_size_mb = round(db_size_bytes / (1024 * 1024), 2)
except OSError:
pass
from app.routers.health import build_health_data
# Get oldest undecrypted packet info
oldest_ts = None
try:
oldest_ts = await RawPacketRepository.get_oldest_undecrypted()
except RuntimeError:
pass # Database not connected
await ws_manager.broadcast(
"health",
{
"status": "ok" if radio_connected else "degraded",
"radio_connected": radio_connected,
"serial_port": serial_port,
"database_size_mb": db_size_mb,
"oldest_undecrypted_timestamp": oldest_ts,
},
)
data = await build_health_data(radio_connected, serial_port)
await ws_manager.broadcast("health", data)
asyncio.create_task(_broadcast())
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long