diff --git a/app/main.py b/app/main.py index 9c6343a18..a2fbfeb83 100644 --- a/app/main.py +++ b/app/main.py @@ -70,8 +70,8 @@ async def lifespan(app: FastAPI): logger.info("Shutting down") await radio_manager.stop_connection_monitor() - stop_message_polling() - stop_periodic_sync() + await stop_message_polling() + await stop_periodic_sync() if radio_manager.meshcore: await radio_manager.meshcore.stop_auto_message_fetching() await radio_manager.disconnect() diff --git a/app/packet_processor.py b/app/packet_processor.py index 88bd366f9..4f746e0f0 100644 --- a/app/packet_processor.py +++ b/app/packet_processor.py @@ -149,24 +149,6 @@ async def process_raw_packet( ts = timestamp or int(time.time()) packet_id = await RawPacketRepository.create(raw_bytes, ts) - - # If packet_id is None, this is a duplicate packet (same data already exists) - # Skip processing since we've already handled this exact packet - if packet_id is None: - logger.debug("Duplicate raw packet detected, skipping") - return { - "packet_id": None, - "timestamp": ts, - "raw_hex": raw_bytes.hex(), - "payload_type": "Duplicate", - "snr": snr, - "rssi": rssi, - "decrypted": False, - "message_id": None, - "channel_name": None, - "sender": None, - } - raw_hex = raw_bytes.hex() # Parse packet to get type diff --git a/app/radio.py b/app/radio.py index 98cc2d4ee..d093c67cc 100644 --- a/app/radio.py +++ b/app/radio.py @@ -105,7 +105,7 @@ class RadioManager: self._port: str | None = None self._reconnect_task: asyncio.Task | None = None self._last_connected: bool = False - self._reconnecting: bool = False + self._reconnect_lock: asyncio.Lock | None = None @property def meshcore(self) -> MeshCore | None: @@ -121,7 +121,7 @@ class RadioManager: @property def is_reconnecting(self) -> bool: - return self._reconnecting + return self._reconnect_lock is not None and self._reconnect_lock.locked() async def connect(self) -> None: """Connect to the radio over serial.""" @@ -164,43 +164,47 @@ class RadioManager: """Attempt to reconnect to the radio. Returns True if reconnection was successful, False otherwise. + Uses a lock to prevent concurrent reconnection attempts. """ from app.websocket import broadcast_error, broadcast_health - if self._reconnecting: + # Lazily initialize lock (can't create in __init__ before event loop exists) + if self._reconnect_lock is None: + self._reconnect_lock = asyncio.Lock() + + # Try to acquire lock without blocking to check if reconnect is in progress + if self._reconnect_lock.locked(): logger.debug("Reconnection already in progress") return False - self._reconnecting = True - logger.info("Attempting to reconnect to radio...") + async with self._reconnect_lock: + logger.info("Attempting to reconnect to radio...") - try: - # Disconnect if we have a stale connection - if self._meshcore is not None: - try: - await self._meshcore.disconnect() - except Exception: - pass - self._meshcore = None + try: + # Disconnect if we have a stale connection + if self._meshcore is not None: + try: + await self._meshcore.disconnect() + except Exception: + pass + self._meshcore = None - # Try to connect (will auto-detect if no port specified) - await self.connect() + # Try to connect (will auto-detect if no port specified) + await self.connect() - if self.is_connected: - logger.info("Radio reconnected successfully at %s", self._port) - broadcast_health(True, self._port) - return True - else: - logger.warning("Reconnection failed: not connected after connect()") + if self.is_connected: + logger.info("Radio reconnected successfully at %s", self._port) + broadcast_health(True, self._port) + return True + else: + logger.warning("Reconnection failed: not connected after connect()") + return False + + except Exception as e: + logger.warning("Reconnection failed: %s", e) + broadcast_error("Reconnection failed", str(e)) return False - except Exception as e: - logger.warning("Reconnection failed: %s", e) - broadcast_error("Reconnection failed", str(e)) - return False - finally: - self._reconnecting = False - async def start_connection_monitor(self) -> None: """Start background task to monitor connection and auto-reconnect.""" if self._reconnect_task is not None: diff --git a/app/radio_sync.py b/app/radio_sync.py index c8ae76b8e..6903fbbdb 100644 --- a/app/radio_sync.py +++ b/app/radio_sync.py @@ -319,11 +319,17 @@ def start_message_polling(): logger.info("Started periodic message polling (interval: %ds)", MESSAGE_POLL_INTERVAL) -def stop_message_polling(): +async def stop_message_polling(): """Stop the periodic message polling background task.""" global _message_poll_task if _message_poll_task and not _message_poll_task.done(): _message_poll_task.cancel() + try: + await _message_poll_task + except asyncio.CancelledError: + pass + _message_poll_task = None + logger.info("Stopped periodic message polling") async def _periodic_sync_loop(): @@ -348,11 +354,16 @@ def start_periodic_sync(): logger.info("Started periodic radio sync (interval: %ds)", SYNC_INTERVAL) -def stop_periodic_sync(): +async def stop_periodic_sync(): """Stop the periodic sync background task.""" global _sync_task if _sync_task and not _sync_task.done(): _sync_task.cancel() + try: + await _sync_task + except asyncio.CancelledError: + pass + _sync_task = None logger.info("Stopped periodic radio sync") diff --git a/app/repository.py b/app/repository.py index 66743574d..0d449ffa9 100644 --- a/app/repository.py +++ b/app/repository.py @@ -448,18 +448,15 @@ class MessageRepository: class RawPacketRepository: @staticmethod - async def create(data: bytes, timestamp: int | None = None) -> int | None: - """Create a raw packet. Returns None if duplicate (same data already exists).""" + async def create(data: bytes, timestamp: int | None = None) -> int: + """Create a raw packet. Returns the packet ID.""" ts = timestamp or int(time.time()) cursor = await db.conn.execute( - "INSERT OR IGNORE INTO raw_packets (timestamp, data) VALUES (?, ?)", + "INSERT INTO raw_packets (timestamp, data) VALUES (?, ?)", (ts, data), ) await db.conn.commit() - # rowcount is 0 if INSERT was ignored due to duplicate, 1 if inserted - if cursor.rowcount == 0: - return None return cursor.lastrowid @staticmethod diff --git a/tests/test_api.py b/tests/test_api.py index 913af83c1..278f8878c 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -478,49 +478,6 @@ class TestRawPacketRepository: db._connection = original_conn await conn.close() - @pytest.mark.asyncio - async def test_create_returns_none_for_duplicate_packet(self): - """Second insert of same packet data returns None (duplicate).""" - import aiosqlite - from app.repository import RawPacketRepository - from app.database import db - - # Use in-memory database for testing - conn = await aiosqlite.connect(":memory:") - conn.row_factory = aiosqlite.Row - - # Create the raw_packets table - await conn.execute(""" - CREATE TABLE raw_packets ( - id INTEGER PRIMARY KEY, - timestamp INTEGER NOT NULL, - data BLOB NOT NULL UNIQUE, - decrypted INTEGER DEFAULT 0, - message_id INTEGER, - decrypt_attempts INTEGER DEFAULT 0, - last_attempt INTEGER - ) - """) - await conn.commit() - - # Patch the db._connection to use our test connection - original_conn = db._connection - db._connection = conn - - try: - packet_data = b"\x01\x02\x03\x04\x05" - - # First insert succeeds - first_id = await RawPacketRepository.create(packet_data, 1234567890) - assert first_id is not None - - # Second insert of same data returns None - second_id = await RawPacketRepository.create(packet_data, 1234567891) - assert second_id is None - finally: - db._connection = original_conn - await conn.close() - @pytest.mark.asyncio async def test_different_packets_both_stored(self): """Different packet data both get stored with unique IDs."""