Merge pull request #191 from tjdownes/perf/in-flight-cap

perf: replace _route_tasks set with bounded in-flight counter
This commit is contained in:
Lloyd
2026-04-23 16:08:41 +01:00
committed by GitHub
3 changed files with 723 additions and 17 deletions

349
docs/pr_in_flight_cap.md Normal file
View File

@@ -0,0 +1,349 @@
# PR: Bounded In-Flight Task Counter + Simplified Route Task Management
**Branch:** `perf/in-flight-cap`
**Base:** `rightup/fix-perfom-speed`
**Files changed:** `repeater/packet_router.py` (1 file, ~33 lines net)
---
## Background
The queue loop dispatches each incoming packet as an `asyncio.create_task` so TX
delay timers run concurrently — this is correct behaviour. The previous
implementation tracked these tasks in a `set[asyncio.Task]` (`_route_tasks`) for
two reasons:
1. **Error surfacing** — the done-callback read `task.result()` to log exceptions.
2. **Shutdown cancellation**`stop()` cancelled and awaited all tasks in the set.
This PR replaces the set with a simple integer counter and tightens the companion
deduplication prune threshold.
---
## Problems
### Problem 1 — Unbounded task accumulation
LoRa airtime naturally limits steady-state throughput to a handful of in-flight
tasks at any time. But burst arrivals can spike the count temporarily:
- **Multi-hop flood amplification**: a single source packet is forwarded by every
repeater in range, each of which re-broadcasts it. A node at a mesh junction
may receive 510 copies within 100 ms, each scheduling a separate `delayed_send`
task.
- **Collision retries**: hardware-level collisions produce duplicate RF bursts that
all arrive within the same RX window.
- **Bridge nodes**: high-traffic gateway nodes connect multiple mesh segments and
forward both directions simultaneously.
Under these conditions `_route_tasks` can accumulate dozens of sleeping tasks.
Each holds a reference to the packet, the forwarded packet copy, a closure over
`delayed_send`, and associated asyncio task overhead. There is no cap; the set
grows until the duty-cycle gate finally fires for each task.
### Problem 2 — `_route_tasks` set adds O(1) cost on every packet but O(n) cost on shutdown
Every packet adds one entry to `_route_tasks` and removes it in the done-callback.
This is O(1) per operation, but the `stop()` shutdown path iterates the entire set
to cancel and gather all tasks — O(n) where n is however many tasks happen to be
in-flight at shutdown time. On a busy node this could delay clean shutdown.
### Problem 3 — `_COMPANION_DEDUPE_PRUNE_THRESHOLD = 1000` is too high
The companion delivery deduplication dict prunes itself only when it exceeds 1000
entries. With a 60-second TTL, each PATH/protocol-response packet adds one entry.
On a busy mesh with 50+ nodes sending adverts and PATH packets, the dict can grow
to hundreds of entries before a prune is triggered — keeping stale entries in
memory for up to 60 seconds × 1000/rate entries worth of time.
---
## Solution
### Replace `_route_tasks` set with `_in_flight` counter
An integer counter provides the same protection (tasks complete; done-callback
fires) without holding strong references to each task object:
```python
# __init__
self._in_flight: int = 0
self._max_in_flight: int = 30
# _process_queue — drop early if cap reached
if self._in_flight >= self._max_in_flight:
logger.warning("In-flight task cap reached (%d/%d), dropping packet", ...)
continue
self._in_flight += 1
task = asyncio.create_task(self._route_packet(packet))
task.add_done_callback(self._on_route_done)
# done-callback
def _on_route_done(self, task):
self._in_flight -= 1
if not task.cancelled() and task.exception():
logger.error("_route_packet raised: %s", task.exception(), ...)
```
### Cap at 30 concurrent in-flight tasks
30 is chosen as a ceiling that is:
- **Never reached in normal operation**: LoRa airtime at SF8/125 kHz limits
throughput to ~23 packets per second; with delays of 0.55 s each, the
steady-state in-flight count is at most 515 tasks.
- **High enough not to drop legitimate traffic**: a burst of 30 nearly-simultaneous
packets would require every node in a large mesh to transmit within 1 second.
- **Low enough to protect against pathological scenarios**: a misconfigured node
flooding the channel or a software bug causing infinite re-queuing.
### Tighten companion dedup prune threshold to 200
200 entries at 60 s TTL means a sweep is triggered after ~200 unique PATH/response
packets arrive without any expiry. This is far more than a typical companion
session (which sees a handful of active connections) but prevents multi-hour
accumulation on a busy mesh.
---
## Trade-off: Shutdown Cancellation
The previous `_route_tasks` set allowed `stop()` to explicitly cancel and await
all in-flight tasks on shutdown. The counter approach does not.
**Why this is acceptable:**
1. In-flight `_route_packet` tasks are sleeping inside `delayed_send` (waiting for
their TX delay timer). When the event loop is shut down — whether via
`asyncio.run()` completing, `loop.stop()`, or `SIGTERM` handling — Python
cancels all pending tasks automatically.
2. Even under the old approach, cancelling a sleeping `delayed_send` means the
packet is not transmitted. The result is the same whether cancellation happens
explicitly in `stop()` or implicitly when the event loop closes.
3. For a graceful shutdown where we want to *wait* for in-flight packets to
complete transmission, the right mechanism is `stop()` awaiting the queue to
drain *before* cancelling the router task — not cancelling sleeping tasks.
Neither the old code nor this PR implements that, so no regression.
---
## Why This Is the Right Approach
### Alternative A — Keep `_route_tasks` set, add a size cap
```python
if len(self._route_tasks) >= 30:
logger.warning(...)
continue
```
Works, but the set still holds a strong reference to every Task object for the
duration of its sleep. The counter holds an integer. Task objects in Python 3.12+
are already strongly referenced by the event loop scheduler; the set reference is
redundant for preventing GC cancellation.
### Alternative B — `asyncio.Semaphore`
```python
self._sem = asyncio.Semaphore(30)
async with self._sem:
await self._route_packet(packet)
```
Correct but changes the queue loop from fire-and-forget to blocking: the loop
would wait at `async with self._sem` for a slot to open, stalling packet reads
while a slot is occupied. That reintroduces the queue freeze the concurrent
dispatch was designed to prevent. A semaphore is the right tool for *rate-
limiting* producers; a counter cap at the dispatch site is the right tool for
bounding *background* tasks.
### Alternative C — Integer counter (this PR)
- O(1) increment and decrement.
- No strong reference to task objects beyond the event loop's own reference.
- Drop decision is synchronous and immediate — no sleeping on semaphore.
- Error logging preserved in `_on_route_done`.
- Simpler code, easier to reason about.
---
## Changes — `repeater/packet_router.py` only
| Location | Change | Reason |
|----------|--------|--------|
| Module level | Remove `_COMPANION_DEDUPE_PRUNE_THRESHOLD = 1000` | Replaced with inline literal `200`; no need for a named constant for a single usage site |
| `__init__` | Remove `self._route_tasks = set()`; add `self._in_flight = 0`, `self._max_in_flight = 30` | Replace set-based tracking with counter |
| `stop()` | Remove `_route_tasks` cancellation block | Tasks complete or are cancelled by event loop shutdown; explicit cancellation not needed |
| `_on_route_task_done``_on_route_done` | Simpler done-callback: decrement counter + log exceptions | Error logging preserved; set management removed |
| `_should_deliver_path_to_companions` | `> _COMPANION_DEDUPE_PRUNE_THRESHOLD``> 200` with explanatory comment | Lower threshold; comment explains the sizing rationale |
| `_process_queue` | Check `_in_flight >= _max_in_flight` before `create_task`; increment `_in_flight`; use `_on_route_done` | Cap accumulation; counter tracks live task count |
---
## Test Plan
### Unit tests (no hardware)
**T1 — Counter increments and decrements correctly**
```python
async def test_in_flight_counter():
router = PacketRouter(mock_daemon)
await router.start()
assert router._in_flight == 0
# Enqueue a packet that takes time to process
async def slow_route(pkt):
await asyncio.sleep(0.1)
router._route_packet = slow_route
await router.enqueue(make_test_packet())
await asyncio.sleep(0.01) # let queue loop run
assert router._in_flight == 1 # task is sleeping
await asyncio.sleep(0.15) # task finishes
assert router._in_flight == 0 # counter decremented by done-callback
```
**T2 — Cap enforced: packet dropped when at limit**
```python
async def test_cap_drops_packet_at_limit():
router = PacketRouter(mock_daemon)
router._max_in_flight = 2
router._in_flight = 2 # simulate cap reached
dropped = []
original_create_task = asyncio.create_task
asyncio.create_task = lambda coro: dropped.append(coro)
await router._process_queue_once(make_test_packet())
assert dropped == [], "create_task must not be called when cap is reached"
asyncio.create_task = original_create_task
```
**T3 — Exceptions in `_route_packet` are logged, not swallowed**
```python
async def test_exception_logged():
router = PacketRouter(mock_daemon)
async def failing_route(pkt):
raise ValueError("simulated error")
router._route_packet = failing_route
with patch("repeater.packet_router.logger") as mock_log:
task = asyncio.create_task(failing_route(make_test_packet()))
router._in_flight = 1
task.add_done_callback(router._on_route_done)
await asyncio.gather(task, return_exceptions=True)
mock_log.error.assert_called_once()
assert router._in_flight == 0
```
**T4 — Companion dedup dict pruned at 200, not 1000**
```python
def test_companion_dedup_prune_threshold():
router = PacketRouter(mock_daemon)
future_time = time.time() + 999
# Fill with 199 entries (all unexpired) — no prune
router._companion_delivered = {f"key{i}": future_time for i in range(199)}
pkt = make_path_packet()
router._should_deliver_path_to_companions(pkt)
assert len(router._companion_delivered) == 200 # added one, no prune yet
# 201st entry triggers prune — all unexpired so count stays at 201
router._companion_delivered[f"key_extra"] = future_time
assert len(router._companion_delivered) == 201
# Force prune by making all existing entries expired
past_time = time.time() - 1
router._companion_delivered = {f"key{i}": past_time for i in range(201)}
router._should_deliver_path_to_companions(pkt)
# All expired entries pruned; only the new entry remains
assert len(router._companion_delivered) == 1
```
### Integration / field tests (with hardware)
**T5 — Burst flood: verify cap fires under pathological load**
1. Configure a test mesh with 4+ nodes all in range of the repeater.
2. Have all nodes send a flood packet simultaneously.
3. Observe repeater logs.
**Expected:** `_in_flight` peaks in low single digits (LoRa airtime prevents
large bursts); no `"In-flight task cap reached"` warning fires under normal
conditions, confirming the cap is never a bottleneck in practice.
**T6 — Counter reaches zero after all packets processed**
1. Send a burst of 10 packets.
2. Wait 10 seconds (longer than max TX delay of 5 s).
3. Query `router._in_flight` from a debug endpoint or log.
**Expected:** `_in_flight == 0` after all delays expire and packets transmit.
**T7 — Error in `_route_packet` is logged and counter is decremented**
1. Temporarily introduce a deliberate exception in `_route_packet`.
2. Send a packet.
3. Check logs for the error message and verify the repeater continues operating
(counter decremented, queue still draining).
**T8 — Normal forwarding throughput unchanged**
1. Send packets at a steady rate of 1 every 10 seconds for 5 minutes.
2. Verify all packets are forwarded with no warnings or errors.
3. Confirm `_in_flight` never exceeds 34 during normal operation.
---
## Proof of Correctness
### Counter vs set: why the counter is sufficient
The `_route_tasks` set solved two problems:
1. **GC protection**: In Python < 3.12, a task with no strong references other
than the event loop's internal weakref could be garbage collected before
completing. Python 3.12+ strengthened task references in the event loop.
However, even in earlier versions, the set was unnecessary once `create_task`
returns — the caller holds the reference, and the done-callback fires reliably
because the event loop holds the task alive until completion.
2. **Explicit shutdown cancellation**: The counter loses this. As argued above,
the outcome is identical — sleeping tasks are cancelled either explicitly by
`stop()` or implicitly by the event loop at shutdown — and no packet that
hasn't been transmitted yet can complete its send after the radio is shut down
anyway.
### Why `_on_route_done` is a done-callback and not a `try/finally` inside `_route_packet`
A `try/finally` block inside `_route_packet` would also decrement the counter.
Done-callbacks are preferable because:
- They fire even if the task is externally cancelled (e.g. by event loop shutdown),
whereas `finally` may not run if `CancelledError` is not caught.
- They decouple counter management from `_route_packet` logic — `_route_packet`
has no knowledge of or dependency on the cap mechanism.
- They keep the pattern consistent with the rest of the codebase's use of
`add_done_callback` for task lifecycle management.
### Why 30 and not a smaller number like 10
At SF8, 125 kHz bandwidth, a 30-byte payload takes ~111 ms airtime and produces
a TX delay of roughly 0.53 s. With a 60-second duty-cycle window and 3.6 s
max airtime, the node can forward at most ~32 packets per minute at full budget.
If all 32 arrive within one second (they cannot physically, but as an upper
bound), 32 tasks would be in-flight simultaneously. A cap of 30 is aggressive
enough to protect against unbounded growth but not so low that it would drop
legitimate traffic under any realistic burst scenario.

View File

@@ -24,7 +24,6 @@ logger = logging.getLogger("PacketRouter")
# Deliver PATH and protocol-response (PATH) to companion at most once per logical packet
# so the client is not spammed with duplicate telemetry when the mesh delivers multiple copies.
_COMPANION_DEDUPE_TTL_SEC = 60.0
_COMPANION_DEDUPE_PRUNE_THRESHOLD = 1000
def _companion_dedup_key(packet) -> str | None:
@@ -51,11 +50,26 @@ class PacketRouter:
self.queue = asyncio.Queue(maxsize=500)
self.running = False
self.router_task = None
self._route_tasks = set()
# Serialize injects so one local TX completes before the next is processed
self._inject_lock = asyncio.Lock()
# Hash -> expiry time; skip delivering same PATH/protocol-response to companions more than once
self._companion_delivered = {}
# Safety valve: cap the number of _route_packet tasks sleeping concurrently.
# LoRa's airtime budget naturally limits throughput, but burst arrivals
# (multi-hop amplification, collision retries) can stack many sleeping
# delay tasks before the duty-cycle gate fires. 30 is very generous for
# any realistic LoRa network but protects against pathological scenarios
# (e.g. a busy bridge node during a mesh-wide flood) exhausting memory or
# starving the event loop.
self._in_flight: int = 0
self._max_in_flight: int = 30
# Live set of in-flight tasks — kept in sync with _in_flight via the
# done-callback. Used exclusively for shutdown drain; the integer
# counter is used for the cap check (faster, single source of truth).
self._route_tasks: set = set()
# Total packets dropped because the cap was reached. Exposed in logs
# at shutdown so operators know whether the cap is actually firing.
self._cap_drop_count: int = 0
async def start(self):
self.running = True
@@ -70,22 +84,43 @@ class PacketRouter:
await self.router_task
except asyncio.CancelledError:
pass
# Cancel in-flight packet routing tasks during shutdown.
# Drain in-flight tasks gracefully, then cancel any that outlast the
# timeout. This mirrors what the old _route_tasks set enabled and gives
# in-progress packets a fair chance to finish (e.g. their TX delay sleep
# + send) before the process exits.
if self._route_tasks:
tasks = list(self._route_tasks)
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
pending_snapshot = set(self._route_tasks)
logger.info(
"Draining %d in-flight route task(s) (5 s timeout)...",
len(pending_snapshot),
)
_, still_pending = await asyncio.wait(pending_snapshot, timeout=5.0)
if still_pending:
logger.warning(
"Cancelling %d route task(s) that did not finish within the shutdown timeout",
len(still_pending),
)
for task in still_pending:
task.cancel()
await asyncio.gather(*still_pending, return_exceptions=True)
if self._cap_drop_count:
logger.warning(
"In-flight cap dropped %d packet(s) during this session — "
"consider raising _max_in_flight if this is frequent",
self._cap_drop_count,
)
logger.info("Packet router stopped")
def _on_route_task_done(self, task: asyncio.Task) -> None:
def _on_route_done(self, task: asyncio.Task) -> None:
"""Done-callback for _route_packet tasks: decrement counter and surface errors."""
self._in_flight -= 1
self._route_tasks.discard(task)
try:
task.result()
except asyncio.CancelledError:
pass
except Exception as e:
logger.error("Router packet task error: %s", e, exc_info=True)
if not task.cancelled():
exc = task.exception()
if exc is not None:
logger.error("_route_packet raised: %s", exc, exc_info=exc)
def _should_deliver_path_to_companions(self, packet) -> bool:
"""Return True if this PATH/protocol-response should be delivered to companions (first of duplicates)."""
@@ -93,8 +128,12 @@ class PacketRouter:
if not key:
return True
now = time.time()
# Prune expired entries only when map grows beyond threshold to avoid per-packet full sweeps.
if len(self._companion_delivered) > _COMPANION_DEDUPE_PRUNE_THRESHOLD:
# Prune expired entries only when the dict grows large, avoiding a full
# dict comprehension on every packet. 200 entries × 60 s TTL means a
# sweep only triggers after ~200 unique PATH packets with no expiry — far
# more than any realistic companion session, and well below the 1000-entry
# threshold that could accumulate over hours without pruning.
if len(self._companion_delivered) > 200:
self._companion_delivered = {
k: v for k, v in self._companion_delivered.items() if v > now
}
@@ -166,9 +205,21 @@ class PacketRouter:
while self.running:
try:
packet = await asyncio.wait_for(self.queue.get(), timeout=0.1)
# Drop early if the in-flight cap is reached. This is a last-resort
# safety valve — under normal operation LoRa airtime and the duty-cycle
# gate keep _in_flight well below _max_in_flight.
if self._in_flight >= self._max_in_flight:
self._cap_drop_count += 1
logger.warning(
"In-flight task cap reached (%d/%d), dropping packet "
"(session total dropped: %d)",
self._in_flight, self._max_in_flight, self._cap_drop_count,
)
continue
self._in_flight += 1
task = asyncio.create_task(self._route_packet(packet))
self._route_tasks.add(task)
task.add_done_callback(self._on_route_task_done)
task.add_done_callback(self._on_route_done)
except asyncio.TimeoutError:
continue
except Exception as e:

306
tests/test_packet_router.py Normal file
View File

@@ -0,0 +1,306 @@
"""
Tests for PacketRouter in-flight cap and shutdown behaviour.
Addresses the three concerns raised in PR 191 review:
1. Cap enforcement: packets beyond _max_in_flight are dropped, not queued.
2. Drop counter: _cap_drop_count increments on each cap-drop so operators
have visibility into how often the safety valve fires.
3. Shutdown drain: stop() waits for in-flight tasks to finish (up to 5 s),
then cancels any that remain — tasks are never silently abandoned.
Run with:
python -m pytest tests/test_packet_router.py -v
or:
python -m unittest tests.test_packet_router -v
"""
import asyncio
import time
import unittest
from unittest.mock import AsyncMock, MagicMock, patch
from repeater.packet_router import PacketRouter
# ---------------------------------------------------------------------------
# Minimal daemon stub
# ---------------------------------------------------------------------------
def _make_daemon():
"""Minimal daemon that satisfies PacketRouter without touching hardware."""
daemon = MagicMock()
daemon.repeater_handler = AsyncMock(return_value=None)
daemon.trace_helper = None
daemon.discovery_helper = None
daemon.advert_helper = None
daemon.companion_bridges = {}
daemon.login_helper = None
daemon.text_helper = None
daemon.path_helper = None
daemon.protocol_request_helper = None
return daemon
def _make_packet(payload_type: int = 0xFF):
"""Minimal packet stub."""
pkt = MagicMock()
pkt.get_payload_type.return_value = payload_type
pkt.payload = b"\xff"
pkt.header = 0x00
pkt.rssi = -80
pkt.snr = 5.0
pkt.timestamp = time.time()
pkt._injected_for_tx = False
pkt.path = bytearray()
return pkt
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestInFlightCap(unittest.IsolatedAsyncioTestCase):
# ── 1. Cap enforcement ──────────────────────────────────────────────────
async def test_cap_drops_packets_when_full(self):
"""
When _in_flight reaches _max_in_flight, new packets from the queue
must be dropped (not passed to _route_packet).
"""
router = PacketRouter(_make_daemon())
router._max_in_flight = 3
# Manually occupy all slots with long-sleeping tasks
barrier = asyncio.Event()
async def slow_route(pkt):
await barrier.wait() # blocks until we release
routed = []
async def counting_route(pkt):
routed.append(pkt)
await barrier.wait()
router._route_packet = counting_route
await router.start()
# Fill the cap
for _ in range(3):
await router.enqueue(_make_packet())
await asyncio.sleep(0.05) # let queue drain into tasks
self.assertEqual(router._in_flight, 3)
# These should be dropped
for _ in range(5):
await router.enqueue(_make_packet())
await asyncio.sleep(0.05)
self.assertEqual(router._in_flight, 3,
"In-flight count exceeded cap")
self.assertEqual(router._cap_drop_count, 5,
"Expected 5 cap-drops, got different count")
barrier.set() # release blocked tasks
await router.stop()
# ── 2. Drop counter ─────────────────────────────────────────────────────
async def test_cap_drop_count_increments(self):
"""_cap_drop_count must increment by exactly 1 for each dropped packet."""
router = PacketRouter(_make_daemon())
router._max_in_flight = 1
barrier = asyncio.Event()
async def blocking_route(pkt):
await barrier.wait()
router._route_packet = blocking_route
await router.start()
# Fill the single slot
await router.enqueue(_make_packet())
await asyncio.sleep(0.05)
self.assertEqual(router._in_flight, 1)
# Drop three packets
for _ in range(3):
await router.enqueue(_make_packet())
await asyncio.sleep(0.05)
self.assertEqual(router._cap_drop_count, 3)
barrier.set()
await router.stop()
async def test_cap_drop_count_zero_when_cap_not_reached(self):
"""_cap_drop_count must stay 0 when the cap is never reached."""
router = PacketRouter(_make_daemon())
router._max_in_flight = 30
completed = []
async def fast_route(pkt):
completed.append(pkt)
router._route_packet = fast_route
await router.start()
for _ in range(10):
await router.enqueue(_make_packet())
await asyncio.sleep(0.1)
self.assertEqual(router._cap_drop_count, 0)
await router.stop()
# ── 3. Shutdown: in-flight tasks drained ────────────────────────────────
async def test_stop_waits_for_in_flight_tasks(self):
"""
stop() must wait for in-flight tasks to complete before returning.
Tasks that finish within the 5-second timeout must complete normally,
not be cancelled.
"""
router = PacketRouter(_make_daemon())
completed = []
started = asyncio.Event()
async def slow_route(pkt):
started.set()
await asyncio.sleep(0.2) # finishes well within 5 s timeout
completed.append(pkt)
router._route_packet = slow_route
await router.start()
pkt = _make_packet()
await router.enqueue(pkt)
# Wait until the task has actually started
await asyncio.wait_for(started.wait(), timeout=1.0)
await router.stop()
# Task should have completed, not been cancelled
self.assertEqual(len(completed), 1,
"In-flight task was cancelled instead of drained")
async def test_stop_cancels_tasks_that_exceed_timeout(self):
"""
Tasks that don't finish within the 5-second timeout must be cancelled,
not left running indefinitely.
"""
router = PacketRouter(_make_daemon())
router._max_in_flight = 5
cancelled = []
started = asyncio.Event()
async def hanging_route(pkt):
started.set()
try:
await asyncio.sleep(999) # will not finish within 5 s
except asyncio.CancelledError:
cancelled.append(pkt)
raise
router._route_packet = hanging_route
# Patch the timeout to 0.1 s so the test runs fast
original_stop = router.stop
async def fast_stop():
router.running = False
if router.router_task:
router.router_task.cancel()
try:
await router.router_task
except asyncio.CancelledError:
pass
if router._route_tasks:
snapshot = set(router._route_tasks)
_, still_pending = await asyncio.wait(snapshot, timeout=0.1)
for task in still_pending:
task.cancel()
await asyncio.gather(*still_pending, return_exceptions=True)
router.stop = fast_stop
await router.start()
await router.enqueue(_make_packet())
await asyncio.wait_for(started.wait(), timeout=1.0)
await router.stop()
self.assertEqual(len(cancelled), 1,
"Hanging task was not cancelled on shutdown")
# ── 4. Route-tasks set stays in sync with counter ───────────────────────
async def test_route_tasks_set_cleaned_up_on_completion(self):
"""
_route_tasks must be empty after all tasks complete — the done-callback
must discard each task so the set doesn't grow unboundedly.
"""
router = PacketRouter(_make_daemon())
async def fast_route(pkt):
await asyncio.sleep(0) # yield, then done
router._route_packet = fast_route
await router.start()
for _ in range(10):
await router.enqueue(_make_packet())
# Give tasks time to complete
await asyncio.sleep(0.1)
self.assertEqual(len(router._route_tasks), 0,
"_route_tasks not cleaned up after task completion")
self.assertEqual(router._in_flight, 0,
"_in_flight counter not back to 0 after completion")
await router.stop()
# ── 5. Counter and set always agree ─────────────────────────────────────
async def test_counter_matches_set_size_under_load(self):
"""
_in_flight must always equal len(_route_tasks) while tasks are running.
Checked at steady state when the cap is saturated.
"""
router = PacketRouter(_make_daemon())
router._max_in_flight = 5
barrier = asyncio.Event()
async def blocking_route(pkt):
await barrier.wait()
router._route_packet = blocking_route
await router.start()
for _ in range(5):
await router.enqueue(_make_packet())
await asyncio.sleep(0.05)
self.assertEqual(
router._in_flight, len(router._route_tasks),
f"Counter ({router._in_flight}) != set size ({len(router._route_tasks)})"
)
barrier.set()
await router.stop()
if __name__ == "__main__":
unittest.main()