diff --git a/repeater/engine.py b/repeater/engine.py index 6810fda..29f6ce1 100644 --- a/repeater/engine.py +++ b/repeater/engine.py @@ -65,6 +65,7 @@ class RepeaterHandler(BaseHandler): 300, config.get("repeater", {}).get("cache_ttl", 3600) ) # Min 5 min, default 1 hour self.max_cache_size = 1000 + self.max_duplicates_per_packet = 20 self.tx_delay_factor = config.get("delays", {}).get("tx_delay_factor", 1.0) self.direct_tx_delay_factor = config.get("delays", {}).get("direct_tx_delay_factor", 0.5) self.use_score_for_tx = config.get("repeater", {}).get("use_score_for_tx", False) @@ -121,6 +122,8 @@ class RepeaterHandler(BaseHandler): # Initialize background timer tracking self.last_noise_measurement = time.time() + self.last_cache_cleanup = time.time() + self.last_db_cleanup = time.time() self.noise_floor_interval = NOISE_FLOOR_INTERVAL # 30 seconds self._background_task = None self._last_crc_error_count = 0 # Track radio counter for delta persistence @@ -375,7 +378,8 @@ class RepeaterHandler(BaseHandler): # Add duplicate to original packet's duplicate list if "duplicates" not in prev_pkt: prev_pkt["duplicates"] = [] - prev_pkt["duplicates"].append(packet_record) + if len(prev_pkt["duplicates"]) < self.max_duplicates_per_packet: + prev_pkt["duplicates"].append(packet_record) # Don't add duplicate to main list, just track in original break else: @@ -1108,6 +1112,27 @@ class RepeaterHandler(BaseHandler): await self._send_periodic_advert_async() self.last_advert_time = current_time + # Prune expired entries from duplicate detection cache (every 60s) + if current_time - self.last_cache_cleanup >= 60.0: + self.cleanup_cache() + self.last_cache_cleanup = current_time + + # Prune old SQLite data (check every 6 hours) + if current_time - self.last_db_cleanup >= 21600: + if self.storage: + try: + retention_days = ( + self.config + .get("storage", {}) + .get("retention", {}) + .get("sqlite_cleanup_days", 31) + ) + self.storage.cleanup_old_data(days=retention_days) + logger.info("Cleaned up SQLite data older than %d days", retention_days) + except Exception as e: + logger.warning(f"SQLite cleanup failed: {e}") + self.last_db_cleanup = current_time + # Sleep for 5 seconds before next check await asyncio.sleep(5.0) diff --git a/repeater/main.py b/repeater/main.py index de97609..05d2907 100644 --- a/repeater/main.py +++ b/repeater/main.py @@ -1,6 +1,8 @@ import asyncio +import functools import logging import os +import signal import sys import time @@ -957,6 +959,11 @@ class RepeaterDaemon: logger.error(f"Failed to send advert: {e}", exc_info=True) return False + def _signal_shutdown(self, sig, loop): + """Handle SIGTERM/SIGINT by scheduling async shutdown.""" + logger.info(f"Received signal {sig.name}, shutting down...") + loop.create_task(self._shutdown()) + async def _shutdown(self): """Best-effort shutdown: stop background services and release hardware.""" # Stop router @@ -1004,6 +1011,14 @@ class RepeaterDaemon: logger.info("Repeater daemon started") + # Register signal handlers for graceful shutdown + loop = asyncio.get_running_loop() + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler( + sig, + functools.partial(self._signal_shutdown, sig, loop), + ) + # Warn if running inside a container (udev rules won't work here) if os.path.exists("/.dockerenv") or os.environ.get("container") or self._detect_container(): logger.warning( diff --git a/repeater/packet_router.py b/repeater/packet_router.py index 090949f..187eeef 100644 --- a/repeater/packet_router.py +++ b/repeater/packet_router.py @@ -47,7 +47,7 @@ class PacketRouter: def __init__(self, daemon_instance): self.daemon = daemon_instance - self.queue = asyncio.Queue() + self.queue = asyncio.Queue(maxsize=500) self.running = False self.router_task = None # Serialize injects so one local TX completes before the next is processed @@ -94,6 +94,12 @@ class PacketRouter: async def enqueue(self, packet): """Add packet to router queue.""" + if self.queue.full(): + logger.warning("Packet router queue full (%d), dropping oldest", self.queue.maxsize) + try: + self.queue.get_nowait() + except asyncio.QueueEmpty: + pass await self.queue.put(packet) async def inject_packet(self, packet, wait_for_ack: bool = False):