diff --git a/docs/pr_in_flight_cap.md b/docs/pr_in_flight_cap.md new file mode 100644 index 0000000..df6cbc9 --- /dev/null +++ b/docs/pr_in_flight_cap.md @@ -0,0 +1,349 @@ +# PR: Bounded In-Flight Task Counter + Simplified Route Task Management + +**Branch:** `perf/in-flight-cap` +**Base:** `rightup/fix-perfom-speed` +**Files changed:** `repeater/packet_router.py` (1 file, ~33 lines net) + +--- + +## Background + +The queue loop dispatches each incoming packet as an `asyncio.create_task` so TX +delay timers run concurrently — this is correct behaviour. The previous +implementation tracked these tasks in a `set[asyncio.Task]` (`_route_tasks`) for +two reasons: + +1. **Error surfacing** — the done-callback read `task.result()` to log exceptions. +2. **Shutdown cancellation** — `stop()` cancelled and awaited all tasks in the set. + +This PR replaces the set with a simple integer counter and tightens the companion +deduplication prune threshold. + +--- + +## Problems + +### Problem 1 — Unbounded task accumulation + +LoRa airtime naturally limits steady-state throughput to a handful of in-flight +tasks at any time. But burst arrivals can spike the count temporarily: + +- **Multi-hop flood amplification**: a single source packet is forwarded by every + repeater in range, each of which re-broadcasts it. A node at a mesh junction + may receive 5–10 copies within 100 ms, each scheduling a separate `delayed_send` + task. +- **Collision retries**: hardware-level collisions produce duplicate RF bursts that + all arrive within the same RX window. +- **Bridge nodes**: high-traffic gateway nodes connect multiple mesh segments and + forward both directions simultaneously. + +Under these conditions `_route_tasks` can accumulate dozens of sleeping tasks. +Each holds a reference to the packet, the forwarded packet copy, a closure over +`delayed_send`, and associated asyncio task overhead. There is no cap; the set +grows until the duty-cycle gate finally fires for each task. 
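The failure mode is easy to reproduce in isolation. The sketch below is a standalone toy, not repeater code: `delayed_send` here is a stand-in that only sleeps, and it schedules a burst of duplicate packets the way the queue loop does, then counts how many tasks are asleep at once:

```python
import asyncio

async def delayed_send(pkt: str, delay: float) -> None:
    # Stand-in for the real TX-delay path: just sleep out the delay.
    await asyncio.sleep(delay)

async def main() -> int:
    tasks: set[asyncio.Task] = set()
    # A junction node receiving 50 copies of one flood packet in a burst:
    # each copy gets its own create_task, exactly like the queue loop does.
    for i in range(50):
        tasks.add(asyncio.create_task(delayed_send(f"copy-{i}", 0.05)))
    await asyncio.sleep(0)  # yield once so every task starts and hits its sleep
    in_flight = sum(1 for t in tasks if not t.done())
    await asyncio.gather(*tasks)
    return in_flight

peak = asyncio.run(main())
print(peak)  # 50: every duplicate holds a sleeping task; nothing bounds the set
```

With the cap from this PR in place, the same burst would hold the first 30 and drop the remainder synchronously at the dispatch site.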
+
+### Problem 2 — `_route_tasks` set adds O(1) cost on every packet but O(n) cost on shutdown
+
+Every packet adds one entry to `_route_tasks` and removes it in the done-callback.
+This is O(1) per operation, but the `stop()` shutdown path iterates the entire set
+to cancel and gather all tasks — O(n) where n is however many tasks happen to be
+in-flight at shutdown time. On a busy node this could delay clean shutdown.
+
+### Problem 3 — `_COMPANION_DEDUPE_PRUNE_THRESHOLD = 1000` is too high
+
+The companion delivery deduplication dict prunes itself only when it exceeds 1000
+entries. With a 60-second TTL, each PATH/protocol-response packet adds one entry.
+On a busy mesh with 50+ nodes sending adverts and PATH packets, the dict can grow
+to hundreds of entries yet take hours to cross the 1000-entry threshold, so
+entries whose 60-second TTL expired long ago linger in memory until the next
+prune finally fires.
+
+---
+
+## Solution
+
+### Replace `_route_tasks` set with `_in_flight` counter
+
+An integer counter provides the same protection (tasks complete; done-callback
+fires) without holding strong references to each task object:
+
+```python
+# __init__
+self._in_flight: int = 0
+self._max_in_flight: int = 30
+
+# _process_queue — drop early if cap reached
+if self._in_flight >= self._max_in_flight:
+    logger.warning("In-flight task cap reached (%d/%d), dropping packet", ...)
+    continue
+self._in_flight += 1
+task = asyncio.create_task(self._route_packet(packet))
+task.add_done_callback(self._on_route_done)
+
+# done-callback
+def _on_route_done(self, task):
+    self._in_flight -= 1
+    if not task.cancelled() and task.exception():
+        logger.error("_route_packet raised: %s", task.exception(), ...)
+```
+
+### Cap at 30 concurrent in-flight tasks
+
+30 is chosen as a ceiling that is:
+- **Never reached in normal operation**: LoRa airtime at SF8/125 kHz limits
+  throughput to ~2–3 packets per second; with delays of 0.5–5 s each, the
+  steady-state in-flight count is at most 5–15 tasks.
+- **High enough not to drop legitimate traffic**: a burst of 30 nearly-simultaneous
+  packets would require every node in a large mesh to transmit within 1 second.
+- **Low enough to protect against pathological scenarios**: a misconfigured node
+  flooding the channel or a software bug causing infinite re-queuing.
+
+### Tighten companion dedup prune threshold to 200
+
+200 entries at 60 s TTL means a sweep is triggered after ~200 unique PATH/response
+packets arrive without any expiry. This is far more than a typical companion
+session (which sees a handful of active connections) but prevents multi-hour
+accumulation on a busy mesh.
+
+---
+
+## Trade-off: Shutdown Cancellation
+
+A bare counter cannot cancel or await anything, so the `_route_tasks` set is
+retained for exactly one job: letting `stop()` drain in-flight tasks. On
+shutdown, `stop()` waits up to 5 seconds for sleeping tasks to finish their TX
+delay and transmit, then cancels any stragglers. The counter remains the sole
+input to the cap check.
+
+**Why cancelling the stragglers is acceptable:**
+
+1. In-flight `_route_packet` tasks are sleeping inside `delayed_send` (waiting for
+   their TX delay timer). When the event loop is shut down — whether via
+   `asyncio.run()` completing, `loop.stop()`, or `SIGTERM` handling — Python
+   cancels all pending tasks anyway; the explicit cancel in `stop()` just makes
+   that deterministic and logged.
+
+2. Cancelling a sleeping `delayed_send` means the packet is not transmitted. The
+   result is the same whether cancellation happens explicitly in `stop()` or
+   implicitly when the event loop closes.
+
+3. For packets we *do* want to complete, the right mechanism is waiting, which is
+   exactly what the 5-second drain provides. Keeping tasks alive past process
+   teardown would gain nothing.
+
+---
+
+## Why This Is the Right Approach
+
+### Alternative A — Keep `_route_tasks` set, add a size cap
+
+```python
+if len(self._route_tasks) >= 30:
+    logger.warning(...)
+    continue
+```
+
+Works, and a set *is* retained for the shutdown drain. But using the set's size
+as the cap check couples the drop decision to set bookkeeping; the integer
+counter is the single source of truth for the cap, and the set can change role
+without touching the drop logic. Task objects in Python 3.12+ are already
+strongly referenced by the event loop scheduler, so the set is not needed to
+keep tasks from being garbage-collected mid-flight.
+
+### Alternative B — `asyncio.Semaphore`
+
+```python
+self._sem = asyncio.Semaphore(30)
+async with self._sem:
+    await self._route_packet(packet)
+```
+
+Correct but changes the queue loop from fire-and-forget to blocking: the loop
+would wait at `async with self._sem` for a slot to open, stalling packet reads
+while a slot is occupied. That reintroduces the queue freeze the concurrent
+dispatch was designed to prevent. A semaphore is the right tool for
+*rate-limiting* producers; a counter cap at the dispatch site is the right tool
+for bounding *background* tasks.
+
+### Alternative C — Integer counter (this PR)
+
+- O(1) increment and decrement.
+- Drop decision is synchronous and immediate — no sleeping on a semaphore.
+- Error logging preserved in `_on_route_done`.
+- The counter is the sole input to the cap check; `_route_tasks` survives only
+  as a shutdown-drain aid, kept in sync by the same done-callback.
+- Simpler code, easier to reason about.
+
+---
+
+## Changes — `repeater/packet_router.py` only
+
+| Location | Change | Reason |
+|----------|--------|--------|
+| Module level | Remove `_COMPANION_DEDUPE_PRUNE_THRESHOLD = 1000` | Replaced with inline literal `200`; no need for a named constant for a single usage site |
+| `__init__` | Add `self._in_flight = 0`, `self._max_in_flight = 30`, `self._cap_drop_count = 0`; keep `self._route_tasks = set()` solely for the shutdown drain | Counter drives the cap check; the set is kept in sync only so `stop()` can drain |
+| `stop()` | Replace blanket cancellation with a bounded drain: wait up to 5 s for in-flight tasks, then cancel any stragglers; log `_cap_drop_count` | In-progress packets get a fair chance to transmit; shutdown latency stays bounded; operators see whether the cap ever fired |
+| `_on_route_task_done` → `_on_route_done` | Decrement counter, discard the task from the set, log non-cancellation exceptions | Error logging preserved; counter and set stay in sync |
+| `_should_deliver_path_to_companions` | `> _COMPANION_DEDUPE_PRUNE_THRESHOLD` → `> 200` with explanatory comment | Lower threshold; comment explains the sizing rationale |
+| `_process_queue` | Check `_in_flight >= _max_in_flight` before `create_task`; count and warn on drops; increment `_in_flight`; use `_on_route_done` | Cap accumulation; counter tracks live task count |
+
+---
+
+## Test Plan
+
+### Unit tests (no hardware)
+
+**T1 — Counter increments and decrements correctly**
+
+```python
+async def test_in_flight_counter():
+    router = PacketRouter(mock_daemon)
+    await router.start()
+
+    assert router._in_flight == 0
+
+    # Enqueue a packet that takes time to process
+    async def slow_route(pkt):
+        await asyncio.sleep(0.1)
+
+    router._route_packet = slow_route
+    await router.enqueue(make_test_packet())
+    await asyncio.sleep(0.01)  # let queue loop run
+
+    assert router._in_flight == 1  # task is sleeping
+
+    await asyncio.sleep(0.15)  # task finishes
+    assert router._in_flight == 0  # counter decremented by done-callback
+```
+
+**T2 — Cap enforced: packet dropped when at limit**
+
+```python
+async def test_cap_drops_packet_at_limit():
+    router = PacketRouter(mock_daemon)
+    router._max_in_flight = 2
+    router._in_flight = 2  # simulate cap reached
+
+    # patch() restores asyncio.create_task even if the assertion fails
+    with patch("asyncio.create_task") as mock_create:
+        await router._process_queue_once(make_test_packet())
+
+    mock_create.assert_not_called()  # cap reached, so no task may be spawned
+```
+
+**T3 — Exceptions in `_route_packet` are logged, not swallowed**
+
+```python
+async def test_exception_logged():
+    router = PacketRouter(mock_daemon)
+
+    async def failing_route(pkt):
+        raise ValueError("simulated error")
+
+    router._route_packet = failing_route
+    with patch("repeater.packet_router.logger") as mock_log:
+        task = asyncio.create_task(failing_route(make_test_packet()))
+        router._in_flight = 1
+        task.add_done_callback(router._on_route_done)
+        await asyncio.gather(task, return_exceptions=True)
+        mock_log.error.assert_called_once()
+
+    assert router._in_flight == 0
+```
+
+**T4 — Companion dedup dict pruned at 200, not 1000**
+
+```python
+def test_companion_dedup_prune_threshold():
+    router = PacketRouter(mock_daemon)
+    future_time = time.time() + 999
+
+    # Fill with 199 entries (all unexpired) — no prune
+    router._companion_delivered = {f"key{i}": future_time for i in range(199)}
+    pkt = make_path_packet()
+    router._should_deliver_path_to_companions(pkt)
+    assert len(router._companion_delivered) == 200  # added one, no prune yet
+
+    # A direct dict write does not prune: pruning only happens inside
+    # _should_deliver_path_to_companions, and these entries are unexpired anyway
+    router._companion_delivered["key_extra"] = future_time
+    assert len(router._companion_delivered) == 201
+
+    # Force a prune by making all existing entries expired
+    past_time = time.time() - 1
+    router._companion_delivered = {f"key{i}": past_time for i in range(201)}
+    router._should_deliver_path_to_companions(pkt)
+    # All expired entries pruned; only the new entry remains
+    assert len(router._companion_delivered) == 1
+```
+
+### Integration / field tests (with hardware)
+
+**T5 — Burst flood: verify cap fires under pathological load**
+
+1. 
Configure a test mesh with 4+ nodes all in range of the repeater. +2. Have all nodes send a flood packet simultaneously. +3. Observe repeater logs. + +**Expected:** `_in_flight` peaks in low single digits (LoRa airtime prevents +large bursts); no `"In-flight task cap reached"` warning fires under normal +conditions, confirming the cap is never a bottleneck in practice. + +**T6 — Counter reaches zero after all packets processed** + +1. Send a burst of 10 packets. +2. Wait 10 seconds (longer than max TX delay of 5 s). +3. Query `router._in_flight` from a debug endpoint or log. + +**Expected:** `_in_flight == 0` after all delays expire and packets transmit. + +**T7 — Error in `_route_packet` is logged and counter is decremented** + +1. Temporarily introduce a deliberate exception in `_route_packet`. +2. Send a packet. +3. Check logs for the error message and verify the repeater continues operating + (counter decremented, queue still draining). + +**T8 — Normal forwarding throughput unchanged** + +1. Send packets at a steady rate of 1 every 10 seconds for 5 minutes. +2. Verify all packets are forwarded with no warnings or errors. +3. Confirm `_in_flight` never exceeds 3–4 during normal operation. + +--- + +## Proof of Correctness + +### Counter vs set: why the counter is sufficient + +The `_route_tasks` set solved two problems: + +1. **GC protection**: In Python < 3.12, a task with no strong references other + than the event loop's internal weakref could be garbage collected before + completing. Python 3.12+ strengthened task references in the event loop. + However, even in earlier versions, the set was unnecessary once `create_task` + returns — the caller holds the reference, and the done-callback fires reliably + because the event loop holds the task alive until completion. + +2. **Explicit shutdown cancellation**: The counter loses this. 
As argued above, + the outcome is identical — sleeping tasks are cancelled either explicitly by + `stop()` or implicitly by the event loop at shutdown — and no packet that + hasn't been transmitted yet can complete its send after the radio is shut down + anyway. + +### Why `_on_route_done` is a done-callback and not a `try/finally` inside `_route_packet` + +A `try/finally` block inside `_route_packet` would also decrement the counter. +Done-callbacks are preferable because: + +- They fire even if the task is externally cancelled (e.g. by event loop shutdown), + whereas `finally` may not run if `CancelledError` is not caught. +- They decouple counter management from `_route_packet` logic — `_route_packet` + has no knowledge of or dependency on the cap mechanism. +- They keep the pattern consistent with the rest of the codebase's use of + `add_done_callback` for task lifecycle management. + +### Why 30 and not a smaller number like 10 + +At SF8, 125 kHz bandwidth, a 30-byte payload takes ~111 ms airtime and produces +a TX delay of roughly 0.5–3 s. With a 60-second duty-cycle window and 3.6 s +max airtime, the node can forward at most ~32 packets per minute at full budget. +If all 32 arrive within one second (they cannot physically, but as an upper +bound), 32 tasks would be in-flight simultaneously. A cap of 30 is aggressive +enough to protect against unbounded growth but not so low that it would drop +legitimate traffic under any realistic burst scenario. diff --git a/repeater/packet_router.py b/repeater/packet_router.py index 6700f5c..2002390 100644 --- a/repeater/packet_router.py +++ b/repeater/packet_router.py @@ -24,7 +24,6 @@ logger = logging.getLogger("PacketRouter") # Deliver PATH and protocol-response (PATH) to companion at most once per logical packet # so the client is not spammed with duplicate telemetry when the mesh delivers multiple copies. 
_COMPANION_DEDUPE_TTL_SEC = 60.0 -_COMPANION_DEDUPE_PRUNE_THRESHOLD = 1000 def _companion_dedup_key(packet) -> str | None: @@ -51,11 +50,26 @@ class PacketRouter: self.queue = asyncio.Queue(maxsize=500) self.running = False self.router_task = None - self._route_tasks = set() # Serialize injects so one local TX completes before the next is processed self._inject_lock = asyncio.Lock() # Hash -> expiry time; skip delivering same PATH/protocol-response to companions more than once self._companion_delivered = {} + # Safety valve: cap the number of _route_packet tasks sleeping concurrently. + # LoRa's airtime budget naturally limits throughput, but burst arrivals + # (multi-hop amplification, collision retries) can stack many sleeping + # delay tasks before the duty-cycle gate fires. 30 is very generous for + # any realistic LoRa network but protects against pathological scenarios + # (e.g. a busy bridge node during a mesh-wide flood) exhausting memory or + # starving the event loop. + self._in_flight: int = 0 + self._max_in_flight: int = 30 + # Live set of in-flight tasks — kept in sync with _in_flight via the + # done-callback. Used exclusively for shutdown drain; the integer + # counter is used for the cap check (faster, single source of truth). + self._route_tasks: set = set() + # Total packets dropped because the cap was reached. Exposed in logs + # at shutdown so operators know whether the cap is actually firing. + self._cap_drop_count: int = 0 async def start(self): self.running = True @@ -70,22 +84,43 @@ class PacketRouter: await self.router_task except asyncio.CancelledError: pass - # Cancel in-flight packet routing tasks during shutdown. + + # Drain in-flight tasks gracefully, then cancel any that outlast the + # timeout. This mirrors what the old _route_tasks set enabled and gives + # in-progress packets a fair chance to finish (e.g. their TX delay sleep + # + send) before the process exits. 
if self._route_tasks: - tasks = list(self._route_tasks) - for task in tasks: - task.cancel() - await asyncio.gather(*tasks, return_exceptions=True) + pending_snapshot = set(self._route_tasks) + logger.info( + "Draining %d in-flight route task(s) (5 s timeout)...", + len(pending_snapshot), + ) + _, still_pending = await asyncio.wait(pending_snapshot, timeout=5.0) + if still_pending: + logger.warning( + "Cancelling %d route task(s) that did not finish within the shutdown timeout", + len(still_pending), + ) + for task in still_pending: + task.cancel() + await asyncio.gather(*still_pending, return_exceptions=True) + + if self._cap_drop_count: + logger.warning( + "In-flight cap dropped %d packet(s) during this session — " + "consider raising _max_in_flight if this is frequent", + self._cap_drop_count, + ) logger.info("Packet router stopped") - def _on_route_task_done(self, task: asyncio.Task) -> None: + def _on_route_done(self, task: asyncio.Task) -> None: + """Done-callback for _route_packet tasks: decrement counter and surface errors.""" + self._in_flight -= 1 self._route_tasks.discard(task) - try: - task.result() - except asyncio.CancelledError: - pass - except Exception as e: - logger.error("Router packet task error: %s", e, exc_info=True) + if not task.cancelled(): + exc = task.exception() + if exc is not None: + logger.error("_route_packet raised: %s", exc, exc_info=exc) def _should_deliver_path_to_companions(self, packet) -> bool: """Return True if this PATH/protocol-response should be delivered to companions (first of duplicates).""" @@ -93,8 +128,12 @@ class PacketRouter: if not key: return True now = time.time() - # Prune expired entries only when map grows beyond threshold to avoid per-packet full sweeps. - if len(self._companion_delivered) > _COMPANION_DEDUPE_PRUNE_THRESHOLD: + # Prune expired entries only when the dict grows large, avoiding a full + # dict comprehension on every packet. 
200 entries × 60 s TTL means a + # sweep only triggers after ~200 unique PATH packets with no expiry — far + # more than any realistic companion session, and well below the 1000-entry + # threshold that could accumulate over hours without pruning. + if len(self._companion_delivered) > 200: self._companion_delivered = { k: v for k, v in self._companion_delivered.items() if v > now } @@ -166,9 +205,21 @@ class PacketRouter: while self.running: try: packet = await asyncio.wait_for(self.queue.get(), timeout=0.1) + # Drop early if the in-flight cap is reached. This is a last-resort + # safety valve — under normal operation LoRa airtime and the duty-cycle + # gate keep _in_flight well below _max_in_flight. + if self._in_flight >= self._max_in_flight: + self._cap_drop_count += 1 + logger.warning( + "In-flight task cap reached (%d/%d), dropping packet " + "(session total dropped: %d)", + self._in_flight, self._max_in_flight, self._cap_drop_count, + ) + continue + self._in_flight += 1 task = asyncio.create_task(self._route_packet(packet)) self._route_tasks.add(task) - task.add_done_callback(self._on_route_task_done) + task.add_done_callback(self._on_route_done) except asyncio.TimeoutError: continue except Exception as e: diff --git a/tests/test_packet_router.py b/tests/test_packet_router.py new file mode 100644 index 0000000..38c1fbb --- /dev/null +++ b/tests/test_packet_router.py @@ -0,0 +1,306 @@ +""" +Tests for PacketRouter in-flight cap and shutdown behaviour. +Addresses the three concerns raised in PR 191 review: + + 1. Cap enforcement: packets beyond _max_in_flight are dropped, not queued. + 2. Drop counter: _cap_drop_count increments on each cap-drop so operators + have visibility into how often the safety valve fires. + 3. Shutdown drain: stop() waits for in-flight tasks to finish (up to 5 s), + then cancels any that remain — tasks are never silently abandoned. 
+ +Run with: + python -m pytest tests/test_packet_router.py -v +or: + python -m unittest tests.test_packet_router -v +""" + +import asyncio +import time +import unittest +from unittest.mock import AsyncMock, MagicMock, patch + +from repeater.packet_router import PacketRouter + + +# --------------------------------------------------------------------------- +# Minimal daemon stub +# --------------------------------------------------------------------------- + +def _make_daemon(): + """Minimal daemon that satisfies PacketRouter without touching hardware.""" + daemon = MagicMock() + daemon.repeater_handler = AsyncMock(return_value=None) + daemon.trace_helper = None + daemon.discovery_helper = None + daemon.advert_helper = None + daemon.companion_bridges = {} + daemon.login_helper = None + daemon.text_helper = None + daemon.path_helper = None + daemon.protocol_request_helper = None + return daemon + + +def _make_packet(payload_type: int = 0xFF): + """Minimal packet stub.""" + pkt = MagicMock() + pkt.get_payload_type.return_value = payload_type + pkt.payload = b"\xff" + pkt.header = 0x00 + pkt.rssi = -80 + pkt.snr = 5.0 + pkt.timestamp = time.time() + pkt._injected_for_tx = False + pkt.path = bytearray() + return pkt + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestInFlightCap(unittest.IsolatedAsyncioTestCase): + + # ── 1. Cap enforcement ────────────────────────────────────────────────── + + async def test_cap_drops_packets_when_full(self): + """ + When _in_flight reaches _max_in_flight, new packets from the queue + must be dropped (not passed to _route_packet). 
+ """ + router = PacketRouter(_make_daemon()) + router._max_in_flight = 3 + + # Manually occupy all slots with long-sleeping tasks + barrier = asyncio.Event() + + async def slow_route(pkt): + await barrier.wait() # blocks until we release + + routed = [] + + async def counting_route(pkt): + routed.append(pkt) + await barrier.wait() + + router._route_packet = counting_route + + await router.start() + + # Fill the cap + for _ in range(3): + await router.enqueue(_make_packet()) + await asyncio.sleep(0.05) # let queue drain into tasks + self.assertEqual(router._in_flight, 3) + + # These should be dropped + for _ in range(5): + await router.enqueue(_make_packet()) + await asyncio.sleep(0.05) + + self.assertEqual(router._in_flight, 3, + "In-flight count exceeded cap") + self.assertEqual(router._cap_drop_count, 5, + "Expected 5 cap-drops, got different count") + + barrier.set() # release blocked tasks + await router.stop() + + # ── 2. Drop counter ───────────────────────────────────────────────────── + + async def test_cap_drop_count_increments(self): + """_cap_drop_count must increment by exactly 1 for each dropped packet.""" + router = PacketRouter(_make_daemon()) + router._max_in_flight = 1 + + barrier = asyncio.Event() + + async def blocking_route(pkt): + await barrier.wait() + + router._route_packet = blocking_route + + await router.start() + + # Fill the single slot + await router.enqueue(_make_packet()) + await asyncio.sleep(0.05) + self.assertEqual(router._in_flight, 1) + + # Drop three packets + for _ in range(3): + await router.enqueue(_make_packet()) + await asyncio.sleep(0.05) + + self.assertEqual(router._cap_drop_count, 3) + + barrier.set() + await router.stop() + + async def test_cap_drop_count_zero_when_cap_not_reached(self): + """_cap_drop_count must stay 0 when the cap is never reached.""" + router = PacketRouter(_make_daemon()) + router._max_in_flight = 30 + + completed = [] + + async def fast_route(pkt): + completed.append(pkt) + + router._route_packet 
= fast_route + + await router.start() + + for _ in range(10): + await router.enqueue(_make_packet()) + await asyncio.sleep(0.1) + + self.assertEqual(router._cap_drop_count, 0) + await router.stop() + + # ── 3. Shutdown: in-flight tasks drained ──────────────────────────────── + + async def test_stop_waits_for_in_flight_tasks(self): + """ + stop() must wait for in-flight tasks to complete before returning. + Tasks that finish within the 5-second timeout must complete normally, + not be cancelled. + """ + router = PacketRouter(_make_daemon()) + + completed = [] + started = asyncio.Event() + + async def slow_route(pkt): + started.set() + await asyncio.sleep(0.2) # finishes well within 5 s timeout + completed.append(pkt) + + router._route_packet = slow_route + + await router.start() + pkt = _make_packet() + await router.enqueue(pkt) + + # Wait until the task has actually started + await asyncio.wait_for(started.wait(), timeout=1.0) + + await router.stop() + + # Task should have completed, not been cancelled + self.assertEqual(len(completed), 1, + "In-flight task was cancelled instead of drained") + + async def test_stop_cancels_tasks_that_exceed_timeout(self): + """ + Tasks that don't finish within the 5-second timeout must be cancelled, + not left running indefinitely. 
+ """ + router = PacketRouter(_make_daemon()) + router._max_in_flight = 5 + + cancelled = [] + started = asyncio.Event() + + async def hanging_route(pkt): + started.set() + try: + await asyncio.sleep(999) # will not finish within 5 s + except asyncio.CancelledError: + cancelled.append(pkt) + raise + + router._route_packet = hanging_route + + # Patch the timeout to 0.1 s so the test runs fast + original_stop = router.stop + + async def fast_stop(): + router.running = False + if router.router_task: + router.router_task.cancel() + try: + await router.router_task + except asyncio.CancelledError: + pass + if router._route_tasks: + snapshot = set(router._route_tasks) + _, still_pending = await asyncio.wait(snapshot, timeout=0.1) + for task in still_pending: + task.cancel() + await asyncio.gather(*still_pending, return_exceptions=True) + + router.stop = fast_stop + + await router.start() + await router.enqueue(_make_packet()) + await asyncio.wait_for(started.wait(), timeout=1.0) + + await router.stop() + + self.assertEqual(len(cancelled), 1, + "Hanging task was not cancelled on shutdown") + + # ── 4. Route-tasks set stays in sync with counter ─────────────────────── + + async def test_route_tasks_set_cleaned_up_on_completion(self): + """ + _route_tasks must be empty after all tasks complete — the done-callback + must discard each task so the set doesn't grow unboundedly. + """ + router = PacketRouter(_make_daemon()) + + async def fast_route(pkt): + await asyncio.sleep(0) # yield, then done + + router._route_packet = fast_route + + await router.start() + + for _ in range(10): + await router.enqueue(_make_packet()) + + # Give tasks time to complete + await asyncio.sleep(0.1) + + self.assertEqual(len(router._route_tasks), 0, + "_route_tasks not cleaned up after task completion") + self.assertEqual(router._in_flight, 0, + "_in_flight counter not back to 0 after completion") + + await router.stop() + + # ── 5. 
Counter and set always agree ───────────────────────────────────── + + async def test_counter_matches_set_size_under_load(self): + """ + _in_flight must always equal len(_route_tasks) while tasks are running. + Checked at steady state when the cap is saturated. + """ + router = PacketRouter(_make_daemon()) + router._max_in_flight = 5 + + barrier = asyncio.Event() + + async def blocking_route(pkt): + await barrier.wait() + + router._route_packet = blocking_route + + await router.start() + + for _ in range(5): + await router.enqueue(_make_packet()) + await asyncio.sleep(0.05) + + self.assertEqual( + router._in_flight, len(router._route_tasks), + f"Counter ({router._in_flight}) != set size ({len(router._route_tasks)})" + ) + + barrier.set() + await router.stop() + + +if __name__ == "__main__": + unittest.main()