From 7d1aa57321d8a7bc376089e64a4df08174c04e56 Mon Sep 17 00:00:00 2001 From: TJ Downes <273720+tjdownes@users.noreply.github.com> Date: Wed, 22 Apr 2026 05:46:49 -0700 Subject: [PATCH] fix(router): drain in-flight tasks on shutdown; add drop counter; add tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses PR 191 reviewer feedback: 1. Shutdown drain stop() now waits up to 5 s for in-flight _route_packet tasks to finish, then cancels any that remain. Previously only the queue-consumer loop was cancelled; created tasks were abandoned with no guarantee they completed. Mechanism: _route_tasks set tracks live tasks (added on create, discarded in the done-callback). stop() takes a snapshot and calls asyncio.wait() with timeout=5.0, then cancels the still-pending subset. 2. Drop counter _cap_drop_count increments each time a packet is dropped at the cap. The running total is included in every WARNING log line and also printed at shutdown so operators can tell at a glance whether the safety valve is actually firing in production. 3. Tests (tests/test_packet_router.py) test_cap_drops_packets_when_full — cap=3, send 8 → 5 drops, 3 in-flight test_cap_drop_count_increments — count increments by 1 per drop test_cap_drop_count_zero_... — count stays 0 when cap never reached test_stop_waits_for_in_flight_tasks — slow task (0.2 s) completes, not cancelled test_stop_cancels_tasks_...timeout — hanging task cancelled after timeout test_route_tasks_set_cleaned_up — set empty after all tasks finish test_counter_matches_set_size — _in_flight == len(_route_tasks) at cap Co-Authored-By: Claude Sonnet 4.6 --- repeater/packet_router.py | 42 ++++- tests/test_packet_router.py | 306 ++++++++++++++++++++++++++++++++++++ 2 files changed, 346 insertions(+), 2 deletions(-) create mode 100644 tests/test_packet_router.py diff --git a/repeater/packet_router.py b/repeater/packet_router.py index ba44492..2002390 100644 --- a/repeater/packet_router.py +++ b/repeater/packet_router.py @@ -63,6 +63,13 @@ class PacketRouter: # starving the event loop. self._in_flight: int = 0 self._max_in_flight: int = 30 + # Live set of in-flight tasks — kept in sync with _in_flight via the + # done-callback. Used exclusively for shutdown drain; the integer + # counter is used for the cap check (faster, single source of truth). + self._route_tasks: set = set() + # Total packets dropped because the cap was reached. Exposed in logs + # at shutdown so operators know whether the cap is actually firing. + self._cap_drop_count: int = 0 async def start(self): self.running = True @@ -77,11 +84,39 @@ class PacketRouter: await self.router_task except asyncio.CancelledError: pass + + # Drain in-flight tasks gracefully, then cancel any that outlast the + # timeout. This mirrors what the old _route_tasks set enabled and gives + # in-progress packets a fair chance to finish (e.g. their TX delay sleep + # + send) before the process exits. + if self._route_tasks: + pending_snapshot = set(self._route_tasks) + logger.info( + "Draining %d in-flight route task(s) (5 s timeout)...", + len(pending_snapshot), + ) + _, still_pending = await asyncio.wait(pending_snapshot, timeout=5.0) + if still_pending: + logger.warning( + "Cancelling %d route task(s) that did not finish within the shutdown timeout", + len(still_pending), + ) + for task in still_pending: + task.cancel() + await asyncio.gather(*still_pending, return_exceptions=True) + + if self._cap_drop_count: + logger.warning( + "In-flight cap dropped %d packet(s) during this session — " + "consider raising _max_in_flight if this is frequent", + self._cap_drop_count, + ) logger.info("Packet router stopped") def _on_route_done(self, task: asyncio.Task) -> None: """Done-callback for _route_packet tasks: decrement counter and surface errors.""" self._in_flight -= 1 + self._route_tasks.discard(task) if not task.cancelled(): exc = task.exception() if exc is not None: @@ -174,13 +209,16 @@ class PacketRouter: # safety valve — under normal operation LoRa airtime and the duty-cycle # gate keep _in_flight well below _max_in_flight. if self._in_flight >= self._max_in_flight: + self._cap_drop_count += 1 logger.warning( - "In-flight task cap reached (%d/%d), dropping packet", - self._in_flight, self._max_in_flight, + "In-flight task cap reached (%d/%d), dropping packet " + "(session total dropped: %d)", + self._in_flight, self._max_in_flight, self._cap_drop_count, ) continue self._in_flight += 1 task = asyncio.create_task(self._route_packet(packet)) + self._route_tasks.add(task) task.add_done_callback(self._on_route_done) except asyncio.TimeoutError: continue diff --git a/tests/test_packet_router.py b/tests/test_packet_router.py new file mode 100644 index 0000000..38c1fbb --- /dev/null +++ b/tests/test_packet_router.py @@ -0,0 +1,306 @@ +""" +Tests for PacketRouter in-flight cap and shutdown behaviour. +Addresses the three concerns raised in PR 191 review: + + 1. Cap enforcement: packets beyond _max_in_flight are dropped, not queued. + 2. Drop counter: _cap_drop_count increments on each cap-drop so operators + have visibility into how often the safety valve fires. + 3. Shutdown drain: stop() waits for in-flight tasks to finish (up to 5 s), + then cancels any that remain — tasks are never silently abandoned. + +Run with: + python -m pytest tests/test_packet_router.py -v +or: + python -m unittest tests.test_packet_router -v +""" + +import asyncio +import time +import unittest +from unittest.mock import AsyncMock, MagicMock, patch + +from repeater.packet_router import PacketRouter + + +# --------------------------------------------------------------------------- +# Minimal daemon stub +# --------------------------------------------------------------------------- + +def _make_daemon(): + """Minimal daemon that satisfies PacketRouter without touching hardware.""" + daemon = MagicMock() + daemon.repeater_handler = AsyncMock(return_value=None) + daemon.trace_helper = None + daemon.discovery_helper = None + daemon.advert_helper = None + daemon.companion_bridges = {} + daemon.login_helper = None + daemon.text_helper = None + daemon.path_helper = None + daemon.protocol_request_helper = None + return daemon + + +def _make_packet(payload_type: int = 0xFF): + """Minimal packet stub.""" + pkt = MagicMock() + pkt.get_payload_type.return_value = payload_type + pkt.payload = b"\xff" + pkt.header = 0x00 + pkt.rssi = -80 + pkt.snr = 5.0 + pkt.timestamp = time.time() + pkt._injected_for_tx = False + pkt.path = bytearray() + return pkt + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestInFlightCap(unittest.IsolatedAsyncioTestCase): + + # ── 1. Cap enforcement ────────────────────────────────────────────────── + + async def test_cap_drops_packets_when_full(self): + """ + When _in_flight reaches _max_in_flight, new packets from the queue + must be dropped (not passed to _route_packet). + """ + router = PacketRouter(_make_daemon()) + router._max_in_flight = 3 + + # Manually occupy all slots with long-sleeping tasks + barrier = asyncio.Event() + + async def slow_route(pkt): + await barrier.wait() # blocks until we release + + routed = [] + + async def counting_route(pkt): + routed.append(pkt) + await barrier.wait() + + router._route_packet = counting_route + + await router.start() + + # Fill the cap + for _ in range(3): + await router.enqueue(_make_packet()) + await asyncio.sleep(0.05) # let queue drain into tasks + self.assertEqual(router._in_flight, 3) + + # These should be dropped + for _ in range(5): + await router.enqueue(_make_packet()) + await asyncio.sleep(0.05) + + self.assertEqual(router._in_flight, 3, + "In-flight count exceeded cap") + self.assertEqual(router._cap_drop_count, 5, + "Expected 5 cap-drops, got different count") + + barrier.set() # release blocked tasks + await router.stop() + + # ── 2. Drop counter ───────────────────────────────────────────────────── + + async def test_cap_drop_count_increments(self): + """_cap_drop_count must increment by exactly 1 for each dropped packet.""" + router = PacketRouter(_make_daemon()) + router._max_in_flight = 1 + + barrier = asyncio.Event() + + async def blocking_route(pkt): + await barrier.wait() + + router._route_packet = blocking_route + + await router.start() + + # Fill the single slot + await router.enqueue(_make_packet()) + await asyncio.sleep(0.05) + self.assertEqual(router._in_flight, 1) + + # Drop three packets + for _ in range(3): + await router.enqueue(_make_packet()) + await asyncio.sleep(0.05) + + self.assertEqual(router._cap_drop_count, 3) + + barrier.set() + await router.stop() + + async def test_cap_drop_count_zero_when_cap_not_reached(self): + """_cap_drop_count must stay 0 when the cap is never reached.""" + router = PacketRouter(_make_daemon()) + router._max_in_flight = 30 + + completed = [] + + async def fast_route(pkt): + completed.append(pkt) + + router._route_packet = fast_route + + await router.start() + + for _ in range(10): + await router.enqueue(_make_packet()) + await asyncio.sleep(0.1) + + self.assertEqual(router._cap_drop_count, 0) + await router.stop() + + # ── 3. Shutdown: in-flight tasks drained ──────────────────────────────── + + async def test_stop_waits_for_in_flight_tasks(self): + """ + stop() must wait for in-flight tasks to complete before returning. + Tasks that finish within the 5-second timeout must complete normally, + not be cancelled. + """ + router = PacketRouter(_make_daemon()) + + completed = [] + started = asyncio.Event() + + async def slow_route(pkt): + started.set() + await asyncio.sleep(0.2) # finishes well within 5 s timeout + completed.append(pkt) + + router._route_packet = slow_route + + await router.start() + pkt = _make_packet() + await router.enqueue(pkt) + + # Wait until the task has actually started + await asyncio.wait_for(started.wait(), timeout=1.0) + + await router.stop() + + # Task should have completed, not been cancelled + self.assertEqual(len(completed), 1, + "In-flight task was cancelled instead of drained") + + async def test_stop_cancels_tasks_that_exceed_timeout(self): + """ + Tasks that don't finish within the 5-second timeout must be cancelled, + not left running indefinitely. + """ + router = PacketRouter(_make_daemon()) + router._max_in_flight = 5 + + cancelled = [] + started = asyncio.Event() + + async def hanging_route(pkt): + started.set() + try: + await asyncio.sleep(999) # will not finish within 5 s + except asyncio.CancelledError: + cancelled.append(pkt) + raise + + router._route_packet = hanging_route + + # Patch the timeout to 0.1 s so the test runs fast + original_stop = router.stop + + async def fast_stop(): + router.running = False + if router.router_task: + router.router_task.cancel() + try: + await router.router_task + except asyncio.CancelledError: + pass + if router._route_tasks: + snapshot = set(router._route_tasks) + _, still_pending = await asyncio.wait(snapshot, timeout=0.1) + for task in still_pending: + task.cancel() + await asyncio.gather(*still_pending, return_exceptions=True) + + router.stop = fast_stop + + await router.start() + await router.enqueue(_make_packet()) + await asyncio.wait_for(started.wait(), timeout=1.0) + + await router.stop() + + self.assertEqual(len(cancelled), 1, + "Hanging task was not cancelled on shutdown") + + # ── 4. Route-tasks set stays in sync with counter ─────────────────────── + + async def test_route_tasks_set_cleaned_up_on_completion(self): + """ + _route_tasks must be empty after all tasks complete — the done-callback + must discard each task so the set doesn't grow unboundedly. + """ + router = PacketRouter(_make_daemon()) + + async def fast_route(pkt): + await asyncio.sleep(0) # yield, then done + + router._route_packet = fast_route + + await router.start() + + for _ in range(10): + await router.enqueue(_make_packet()) + + # Give tasks time to complete + await asyncio.sleep(0.1) + + self.assertEqual(len(router._route_tasks), 0, + "_route_tasks not cleaned up after task completion") + self.assertEqual(router._in_flight, 0, + "_in_flight counter not back to 0 after completion") + + await router.stop() + + # ── 5. Counter and set always agree ───────────────────────────────────── + + async def test_counter_matches_set_size_under_load(self): + """ + _in_flight must always equal len(_route_tasks) while tasks are running. + Checked at steady state when the cap is saturated. + """ + router = PacketRouter(_make_daemon()) + router._max_in_flight = 5 + + barrier = asyncio.Event() + + async def blocking_route(pkt): + await barrier.wait() + + router._route_packet = blocking_route + + await router.start() + + for _ in range(5): + await router.enqueue(_make_packet()) + await asyncio.sleep(0.05) + + self.assertEqual( + router._in_flight, len(router._route_tasks), + f"Counter ({router._in_flight}) != set size ({len(router._route_tasks)})" + ) + + barrier.set() + await router.stop() + + +if __name__ == "__main__": + unittest.main()