feat: enhance RepeaterHandler with duplicate packet limit and cache cleanup, add graceful shutdown handling in RepeaterDaemon, and increase PacketRouter queue size

Lloyd
2026-03-23 14:30:01 +00:00
parent 7d73ca0df6
commit 369b420ae3
3 changed files with 48 additions and 2 deletions

View File

@@ -65,6 +65,7 @@ class RepeaterHandler(BaseHandler):
            300, config.get("repeater", {}).get("cache_ttl", 3600)
        )  # Min 5 min, default 1 hour
        self.max_cache_size = 1000
        self.max_duplicates_per_packet = 20
        self.tx_delay_factor = config.get("delays", {}).get("tx_delay_factor", 1.0)
        self.direct_tx_delay_factor = config.get("delays", {}).get("direct_tx_delay_factor", 0.5)
        self.use_score_for_tx = config.get("repeater", {}).get("use_score_for_tx", False)
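For orientation, the handler pulls all of these knobs from a nested config mapping via chained .get() calls with defaults. A minimal sketch of a config dict that exercises the keys referenced in this commit (key names and defaults are taken from the diff; the overall layout is an assumption):

    config = {
        "repeater": {"cache_ttl": 3600, "use_score_for_tx": False},
        "delays": {"tx_delay_factor": 1.0, "direct_tx_delay_factor": 0.5},
        # Used by the SQLite cleanup added further down (defaults to 31 days).
        "storage": {"retention": {"sqlite_cleanup_days": 31}},
    }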
@@ -121,6 +122,8 @@ class RepeaterHandler(BaseHandler):
        # Initialize background timer tracking
        self.last_noise_measurement = time.time()
        self.last_cache_cleanup = time.time()
        self.last_db_cleanup = time.time()
        self.noise_floor_interval = NOISE_FLOOR_INTERVAL  # 30 seconds
        self._background_task = None
        self._last_crc_error_count = 0  # Track radio counter for delta persistence
@@ -375,7 +378,8 @@ class RepeaterHandler(BaseHandler):
                # Add duplicate to original packet's duplicate list
                if "duplicates" not in prev_pkt:
                    prev_pkt["duplicates"] = []
                prev_pkt["duplicates"].append(packet_record)
                if len(prev_pkt["duplicates"]) < self.max_duplicates_per_packet:
                    prev_pkt["duplicates"].append(packet_record)
                # Don't add duplicate to main list, just track in original
                break
        else:
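This hunk sits inside a loop-with-else search: the loop scans recent packets for a match, tracks the duplicate and breaks when one is found, and only the else branch (taken when no break happened) records the packet as new. A standalone sketch of that pattern with hypothetical names, not the handler's actual code:

    for prev_pkt in recent_packets:
        if prev_pkt["hash"] == packet_record["hash"]:
            dups = prev_pkt.setdefault("duplicates", [])
            if len(dups) < MAX_DUPLICATES_PER_PACKET:  # capped at 20 in this commit
                dups.append(packet_record)
            break  # duplicate found: don't add it to the main list
    else:
        recent_packets.append(packet_record)  # no match: record as a new packet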
@@ -1108,6 +1112,27 @@ class RepeaterHandler(BaseHandler):
                    await self._send_periodic_advert_async()
                    self.last_advert_time = current_time

                # Prune expired entries from duplicate detection cache (every 60s)
                if current_time - self.last_cache_cleanup >= 60.0:
                    self.cleanup_cache()
                    self.last_cache_cleanup = current_time

                # Prune old SQLite data (check every 6 hours)
                if current_time - self.last_db_cleanup >= 21600:
                    if self.storage:
                        try:
                            retention_days = (
                                self.config
                                .get("storage", {})
                                .get("retention", {})
                                .get("sqlite_cleanup_days", 31)
                            )
                            self.storage.cleanup_old_data(days=retention_days)
                            logger.info("Cleaned up SQLite data older than %d days", retention_days)
                        except Exception as e:
                            logger.warning(f"SQLite cleanup failed: {e}")
                    self.last_db_cleanup = current_time

                # Sleep for 5 seconds before next check
                await asyncio.sleep(5.0)
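cleanup_cache() itself is not shown in this diff. Given max_cache_size above and the TTL computed in __init__ (assumed here to be stored as self.cache_ttl), a TTL- and size-bounded prune could look roughly like the sketch below; the packet_cache list and its timestamp field are illustrative assumptions, not the repository's actual structures:

    def cleanup_cache(self):
        now = time.time()
        # Drop entries that have outlived the configured TTL.
        self.packet_cache = [
            p for p in self.packet_cache
            if now - p["timestamp"] <= self.cache_ttl
        ]
        # If still over the size cap, keep only the newest entries.
        if len(self.packet_cache) > self.max_cache_size:
            self.packet_cache = self.packet_cache[-self.max_cache_size:]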

View File

@@ -1,6 +1,8 @@
import asyncio
import functools
import logging
import os
import signal
import sys
import time
@@ -957,6 +959,11 @@ class RepeaterDaemon:
logger.error(f"Failed to send advert: {e}", exc_info=True)
return False
def _signal_shutdown(self, sig, loop):
"""Handle SIGTERM/SIGINT by scheduling async shutdown."""
logger.info(f"Received signal {sig.name}, shutting down...")
loop.create_task(self._shutdown())
async def _shutdown(self):
"""Best-effort shutdown: stop background services and release hardware."""
# Stop router
@@ -1004,6 +1011,14 @@ class RepeaterDaemon:
logger.info("Repeater daemon started")
# Register signal handlers for graceful shutdown
loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
sig,
functools.partial(self._signal_shutdown, sig, loop),
)
# Warn if running inside a container (udev rules won't work here)
if os.path.exists("/.dockerenv") or os.environ.get("container") or self._detect_container():
logger.warning(

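One portability note on the registration above: loop.add_signal_handler() is only available on Unix event loops and raises NotImplementedError elsewhere (e.g. on Windows). If the daemon ever needs to run on such a platform, a guarded registration along these lines would keep startup from failing; this is a sketch, not part of the commit:

    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(
                sig, functools.partial(self._signal_shutdown, sig, loop)
            )
        except NotImplementedError:
            # Fall back to plain signal handlers where the loop API is unavailable.
            signal.signal(sig, lambda s, frame: self._signal_shutdown(signal.Signals(s), loop))

It can also be worth keeping a reference to the task returned by loop.create_task() in _signal_shutdown: asyncio only holds weak references to tasks, so an unreferenced shutdown task can in principle be garbage-collected before it finishes.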
View File

@@ -47,7 +47,7 @@ class PacketRouter:
    def __init__(self, daemon_instance):
        self.daemon = daemon_instance
        self.queue = asyncio.Queue()
        self.queue = asyncio.Queue(maxsize=500)
        self.running = False
        self.router_task = None
        # Serialize injects so one local TX completes before the next is processed
@@ -94,6 +94,12 @@ class PacketRouter:
    async def enqueue(self, packet):
        """Add packet to router queue."""
        if self.queue.full():
            logger.warning("Packet router queue full (%d), dropping oldest", self.queue.maxsize)
            try:
                self.queue.get_nowait()
            except asyncio.QueueEmpty:
                pass
        await self.queue.put(packet)

    async def inject_packet(self, packet, wait_for_ack: bool = False):
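The bounded queue plus drop-oldest eviction keeps enqueue() from ever blocking the receive path when the consumer stalls, at the cost of discarding the oldest queued packet once 500 are pending. A small self-contained illustration of the pattern (not the router's code):

    import asyncio

    async def demo():
        q = asyncio.Queue(maxsize=3)
        for n in range(5):        # offer more items than the queue can hold
            if q.full():
                q.get_nowait()    # evict the oldest entry to make room
            await q.put(n)        # never blocks: room was just freed
        print([q.get_nowait() for _ in range(q.qsize())])  # -> [2, 3, 4]

    asyncio.run(demo())

Because the check, eviction, and put all run without an intervening await, no other coroutine can refill the queue in between, so the put completes immediately.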