Fix niggling bugs -- unclean shutdown, radio reconnect contention

This commit is contained in:
Jack Kingsman
2026-01-13 20:08:07 -08:00
parent 32ed00fd34
commit 2083e9c015
6 changed files with 50 additions and 99 deletions

View File

@@ -70,8 +70,8 @@ async def lifespan(app: FastAPI):
logger.info("Shutting down")
await radio_manager.stop_connection_monitor()
stop_message_polling()
stop_periodic_sync()
await stop_message_polling()
await stop_periodic_sync()
if radio_manager.meshcore:
await radio_manager.meshcore.stop_auto_message_fetching()
await radio_manager.disconnect()

View File

@@ -149,24 +149,6 @@ async def process_raw_packet(
ts = timestamp or int(time.time())
packet_id = await RawPacketRepository.create(raw_bytes, ts)
# If packet_id is None, this is a duplicate packet (same data already exists)
# Skip processing since we've already handled this exact packet
if packet_id is None:
logger.debug("Duplicate raw packet detected, skipping")
return {
"packet_id": None,
"timestamp": ts,
"raw_hex": raw_bytes.hex(),
"payload_type": "Duplicate",
"snr": snr,
"rssi": rssi,
"decrypted": False,
"message_id": None,
"channel_name": None,
"sender": None,
}
raw_hex = raw_bytes.hex()
# Parse packet to get type

View File

@@ -105,7 +105,7 @@ class RadioManager:
self._port: str | None = None
self._reconnect_task: asyncio.Task | None = None
self._last_connected: bool = False
self._reconnecting: bool = False
self._reconnect_lock: asyncio.Lock | None = None
@property
def meshcore(self) -> MeshCore | None:
@@ -121,7 +121,7 @@ class RadioManager:
@property
def is_reconnecting(self) -> bool:
return self._reconnecting
return self._reconnect_lock is not None and self._reconnect_lock.locked()
async def connect(self) -> None:
"""Connect to the radio over serial."""
@@ -164,43 +164,47 @@ class RadioManager:
"""Attempt to reconnect to the radio.
Returns True if reconnection was successful, False otherwise.
Uses a lock to prevent concurrent reconnection attempts.
"""
from app.websocket import broadcast_error, broadcast_health
if self._reconnecting:
# Lazily initialize lock (can't create in __init__ before event loop exists)
if self._reconnect_lock is None:
self._reconnect_lock = asyncio.Lock()
# Try to acquire lock without blocking to check if reconnect is in progress
if self._reconnect_lock.locked():
logger.debug("Reconnection already in progress")
return False
self._reconnecting = True
logger.info("Attempting to reconnect to radio...")
async with self._reconnect_lock:
logger.info("Attempting to reconnect to radio...")
try:
# Disconnect if we have a stale connection
if self._meshcore is not None:
try:
await self._meshcore.disconnect()
except Exception:
pass
self._meshcore = None
try:
# Disconnect if we have a stale connection
if self._meshcore is not None:
try:
await self._meshcore.disconnect()
except Exception:
pass
self._meshcore = None
# Try to connect (will auto-detect if no port specified)
await self.connect()
# Try to connect (will auto-detect if no port specified)
await self.connect()
if self.is_connected:
logger.info("Radio reconnected successfully at %s", self._port)
broadcast_health(True, self._port)
return True
else:
logger.warning("Reconnection failed: not connected after connect()")
if self.is_connected:
logger.info("Radio reconnected successfully at %s", self._port)
broadcast_health(True, self._port)
return True
else:
logger.warning("Reconnection failed: not connected after connect()")
return False
except Exception as e:
logger.warning("Reconnection failed: %s", e)
broadcast_error("Reconnection failed", str(e))
return False
except Exception as e:
logger.warning("Reconnection failed: %s", e)
broadcast_error("Reconnection failed", str(e))
return False
finally:
self._reconnecting = False
async def start_connection_monitor(self) -> None:
"""Start background task to monitor connection and auto-reconnect."""
if self._reconnect_task is not None:

View File

@@ -319,11 +319,17 @@ def start_message_polling():
logger.info("Started periodic message polling (interval: %ds)", MESSAGE_POLL_INTERVAL)
def stop_message_polling():
async def stop_message_polling():
"""Stop the periodic message polling background task."""
global _message_poll_task
if _message_poll_task and not _message_poll_task.done():
_message_poll_task.cancel()
try:
await _message_poll_task
except asyncio.CancelledError:
pass
_message_poll_task = None
logger.info("Stopped periodic message polling")
async def _periodic_sync_loop():
@@ -348,11 +354,16 @@ def start_periodic_sync():
logger.info("Started periodic radio sync (interval: %ds)", SYNC_INTERVAL)
def stop_periodic_sync():
async def stop_periodic_sync():
"""Stop the periodic sync background task."""
global _sync_task
if _sync_task and not _sync_task.done():
_sync_task.cancel()
try:
await _sync_task
except asyncio.CancelledError:
pass
_sync_task = None
logger.info("Stopped periodic radio sync")

View File

@@ -448,18 +448,15 @@ class MessageRepository:
class RawPacketRepository:
@staticmethod
async def create(data: bytes, timestamp: int | None = None) -> int | None:
"""Create a raw packet. Returns None if duplicate (same data already exists)."""
async def create(data: bytes, timestamp: int | None = None) -> int:
"""Create a raw packet. Returns the packet ID."""
ts = timestamp or int(time.time())
cursor = await db.conn.execute(
"INSERT OR IGNORE INTO raw_packets (timestamp, data) VALUES (?, ?)",
"INSERT INTO raw_packets (timestamp, data) VALUES (?, ?)",
(ts, data),
)
await db.conn.commit()
# rowcount is 0 if INSERT was ignored due to duplicate, 1 if inserted
if cursor.rowcount == 0:
return None
return cursor.lastrowid
@staticmethod

View File

@@ -478,49 +478,6 @@ class TestRawPacketRepository:
db._connection = original_conn
await conn.close()
@pytest.mark.asyncio
async def test_create_returns_none_for_duplicate_packet(self):
"""Second insert of same packet data returns None (duplicate)."""
import aiosqlite
from app.repository import RawPacketRepository
from app.database import db
# Use in-memory database for testing
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
# Create the raw_packets table
await conn.execute("""
CREATE TABLE raw_packets (
id INTEGER PRIMARY KEY,
timestamp INTEGER NOT NULL,
data BLOB NOT NULL UNIQUE,
decrypted INTEGER DEFAULT 0,
message_id INTEGER,
decrypt_attempts INTEGER DEFAULT 0,
last_attempt INTEGER
)
""")
await conn.commit()
# Patch the db._connection to use our test connection
original_conn = db._connection
db._connection = conn
try:
packet_data = b"\x01\x02\x03\x04\x05"
# First insert succeeds
first_id = await RawPacketRepository.create(packet_data, 1234567890)
assert first_id is not None
# Second insert of same data returns None
second_id = await RawPacketRepository.create(packet_data, 1234567891)
assert second_id is None
finally:
db._connection = original_conn
await conn.close()
@pytest.mark.asyncio
async def test_different_packets_both_stored(self):
"""Different packet data both get stored with unique IDs."""