mirror of
https://github.com/rightup/pyMC_Repeater.git
synced 2026-05-02 19:52:14 +02:00
fix(router): drain in-flight tasks on shutdown; add drop counter; add tests
Addresses PR 191 reviewer feedback: 1. Shutdown drain stop() now waits up to 5 s for in-flight _route_packet tasks to finish, then cancels any that remain. Previously only the queue-consumer loop was cancelled; created tasks were abandoned with no guarantee they completed. Mechanism: _route_tasks set tracks live tasks (added on create, discarded in the done-callback). stop() takes a snapshot and calls asyncio.wait() with timeout=5.0, then cancels the still-pending subset. 2. Drop counter _cap_drop_count increments each time a packet is dropped at the cap. The running total is included in every WARNING log line and also printed at shutdown so operators can tell at a glance whether the safety valve is actually firing in production. 3. Tests (tests/test_packet_router.py) test_cap_drops_packets_when_full — cap=3, send 8 → 5 drops, 3 in-flight test_cap_drop_count_increments — count increments by 1 per drop test_cap_drop_count_zero_... — count stays 0 when cap never reached test_stop_waits_for_in_flight_tasks — slow task (0.2 s) completes, not cancelled test_stop_cancels_tasks_...timeout — hanging task cancelled after timeout test_route_tasks_set_cleaned_up — set empty after all tasks finish test_counter_matches_set_size — _in_flight == len(_route_tasks) at cap Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -63,6 +63,13 @@ class PacketRouter:
|
||||
# starving the event loop.
|
||||
self._in_flight: int = 0
|
||||
self._max_in_flight: int = 30
|
||||
# Live set of in-flight tasks — kept in sync with _in_flight via the
|
||||
# done-callback. Used exclusively for shutdown drain; the integer
|
||||
# counter is used for the cap check (faster, single source of truth).
|
||||
self._route_tasks: set = set()
|
||||
# Total packets dropped because the cap was reached. Exposed in logs
|
||||
# at shutdown so operators know whether the cap is actually firing.
|
||||
self._cap_drop_count: int = 0
|
||||
|
||||
async def start(self):
|
||||
self.running = True
|
||||
@@ -77,11 +84,39 @@ class PacketRouter:
|
||||
await self.router_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
# Drain in-flight tasks gracefully, then cancel any that outlast the
|
||||
# timeout. This mirrors what the old _route_tasks set enabled and gives
|
||||
# in-progress packets a fair chance to finish (e.g. their TX delay sleep
|
||||
# + send) before the process exits.
|
||||
if self._route_tasks:
|
||||
pending_snapshot = set(self._route_tasks)
|
||||
logger.info(
|
||||
"Draining %d in-flight route task(s) (5 s timeout)...",
|
||||
len(pending_snapshot),
|
||||
)
|
||||
_, still_pending = await asyncio.wait(pending_snapshot, timeout=5.0)
|
||||
if still_pending:
|
||||
logger.warning(
|
||||
"Cancelling %d route task(s) that did not finish within the shutdown timeout",
|
||||
len(still_pending),
|
||||
)
|
||||
for task in still_pending:
|
||||
task.cancel()
|
||||
await asyncio.gather(*still_pending, return_exceptions=True)
|
||||
|
||||
if self._cap_drop_count:
|
||||
logger.warning(
|
||||
"In-flight cap dropped %d packet(s) during this session — "
|
||||
"consider raising _max_in_flight if this is frequent",
|
||||
self._cap_drop_count,
|
||||
)
|
||||
logger.info("Packet router stopped")
|
||||
|
||||
def _on_route_done(self, task: asyncio.Task) -> None:
|
||||
"""Done-callback for _route_packet tasks: decrement counter and surface errors."""
|
||||
self._in_flight -= 1
|
||||
self._route_tasks.discard(task)
|
||||
if not task.cancelled():
|
||||
exc = task.exception()
|
||||
if exc is not None:
|
||||
@@ -174,13 +209,16 @@ class PacketRouter:
|
||||
# safety valve — under normal operation LoRa airtime and the duty-cycle
|
||||
# gate keep _in_flight well below _max_in_flight.
|
||||
if self._in_flight >= self._max_in_flight:
|
||||
self._cap_drop_count += 1
|
||||
logger.warning(
|
||||
"In-flight task cap reached (%d/%d), dropping packet",
|
||||
self._in_flight, self._max_in_flight,
|
||||
"In-flight task cap reached (%d/%d), dropping packet "
|
||||
"(session total dropped: %d)",
|
||||
self._in_flight, self._max_in_flight, self._cap_drop_count,
|
||||
)
|
||||
continue
|
||||
self._in_flight += 1
|
||||
task = asyncio.create_task(self._route_packet(packet))
|
||||
self._route_tasks.add(task)
|
||||
task.add_done_callback(self._on_route_done)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
|
||||
306
tests/test_packet_router.py
Normal file
306
tests/test_packet_router.py
Normal file
@@ -0,0 +1,306 @@
|
||||
"""
|
||||
Tests for PacketRouter in-flight cap and shutdown behaviour.
|
||||
Addresses the three concerns raised in PR 191 review:
|
||||
|
||||
1. Cap enforcement: packets beyond _max_in_flight are dropped, not queued.
|
||||
2. Drop counter: _cap_drop_count increments on each cap-drop so operators
|
||||
have visibility into how often the safety valve fires.
|
||||
3. Shutdown drain: stop() waits for in-flight tasks to finish (up to 5 s),
|
||||
then cancels any that remain — tasks are never silently abandoned.
|
||||
|
||||
Run with:
|
||||
python -m pytest tests/test_packet_router.py -v
|
||||
or:
|
||||
python -m unittest tests.test_packet_router -v
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
import unittest
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
from repeater.packet_router import PacketRouter
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Minimal daemon stub
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_daemon():
|
||||
"""Minimal daemon that satisfies PacketRouter without touching hardware."""
|
||||
daemon = MagicMock()
|
||||
daemon.repeater_handler = AsyncMock(return_value=None)
|
||||
daemon.trace_helper = None
|
||||
daemon.discovery_helper = None
|
||||
daemon.advert_helper = None
|
||||
daemon.companion_bridges = {}
|
||||
daemon.login_helper = None
|
||||
daemon.text_helper = None
|
||||
daemon.path_helper = None
|
||||
daemon.protocol_request_helper = None
|
||||
return daemon
|
||||
|
||||
|
||||
def _make_packet(payload_type: int = 0xFF):
|
||||
"""Minimal packet stub."""
|
||||
pkt = MagicMock()
|
||||
pkt.get_payload_type.return_value = payload_type
|
||||
pkt.payload = b"\xff"
|
||||
pkt.header = 0x00
|
||||
pkt.rssi = -80
|
||||
pkt.snr = 5.0
|
||||
pkt.timestamp = time.time()
|
||||
pkt._injected_for_tx = False
|
||||
pkt.path = bytearray()
|
||||
return pkt
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestInFlightCap(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
# ── 1. Cap enforcement ──────────────────────────────────────────────────
|
||||
|
||||
async def test_cap_drops_packets_when_full(self):
|
||||
"""
|
||||
When _in_flight reaches _max_in_flight, new packets from the queue
|
||||
must be dropped (not passed to _route_packet).
|
||||
"""
|
||||
router = PacketRouter(_make_daemon())
|
||||
router._max_in_flight = 3
|
||||
|
||||
# Manually occupy all slots with long-sleeping tasks
|
||||
barrier = asyncio.Event()
|
||||
|
||||
async def slow_route(pkt):
|
||||
await barrier.wait() # blocks until we release
|
||||
|
||||
routed = []
|
||||
|
||||
async def counting_route(pkt):
|
||||
routed.append(pkt)
|
||||
await barrier.wait()
|
||||
|
||||
router._route_packet = counting_route
|
||||
|
||||
await router.start()
|
||||
|
||||
# Fill the cap
|
||||
for _ in range(3):
|
||||
await router.enqueue(_make_packet())
|
||||
await asyncio.sleep(0.05) # let queue drain into tasks
|
||||
self.assertEqual(router._in_flight, 3)
|
||||
|
||||
# These should be dropped
|
||||
for _ in range(5):
|
||||
await router.enqueue(_make_packet())
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
self.assertEqual(router._in_flight, 3,
|
||||
"In-flight count exceeded cap")
|
||||
self.assertEqual(router._cap_drop_count, 5,
|
||||
"Expected 5 cap-drops, got different count")
|
||||
|
||||
barrier.set() # release blocked tasks
|
||||
await router.stop()
|
||||
|
||||
# ── 2. Drop counter ─────────────────────────────────────────────────────
|
||||
|
||||
async def test_cap_drop_count_increments(self):
|
||||
"""_cap_drop_count must increment by exactly 1 for each dropped packet."""
|
||||
router = PacketRouter(_make_daemon())
|
||||
router._max_in_flight = 1
|
||||
|
||||
barrier = asyncio.Event()
|
||||
|
||||
async def blocking_route(pkt):
|
||||
await barrier.wait()
|
||||
|
||||
router._route_packet = blocking_route
|
||||
|
||||
await router.start()
|
||||
|
||||
# Fill the single slot
|
||||
await router.enqueue(_make_packet())
|
||||
await asyncio.sleep(0.05)
|
||||
self.assertEqual(router._in_flight, 1)
|
||||
|
||||
# Drop three packets
|
||||
for _ in range(3):
|
||||
await router.enqueue(_make_packet())
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
self.assertEqual(router._cap_drop_count, 3)
|
||||
|
||||
barrier.set()
|
||||
await router.stop()
|
||||
|
||||
async def test_cap_drop_count_zero_when_cap_not_reached(self):
|
||||
"""_cap_drop_count must stay 0 when the cap is never reached."""
|
||||
router = PacketRouter(_make_daemon())
|
||||
router._max_in_flight = 30
|
||||
|
||||
completed = []
|
||||
|
||||
async def fast_route(pkt):
|
||||
completed.append(pkt)
|
||||
|
||||
router._route_packet = fast_route
|
||||
|
||||
await router.start()
|
||||
|
||||
for _ in range(10):
|
||||
await router.enqueue(_make_packet())
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
self.assertEqual(router._cap_drop_count, 0)
|
||||
await router.stop()
|
||||
|
||||
# ── 3. Shutdown: in-flight tasks drained ────────────────────────────────
|
||||
|
||||
async def test_stop_waits_for_in_flight_tasks(self):
|
||||
"""
|
||||
stop() must wait for in-flight tasks to complete before returning.
|
||||
Tasks that finish within the 5-second timeout must complete normally,
|
||||
not be cancelled.
|
||||
"""
|
||||
router = PacketRouter(_make_daemon())
|
||||
|
||||
completed = []
|
||||
started = asyncio.Event()
|
||||
|
||||
async def slow_route(pkt):
|
||||
started.set()
|
||||
await asyncio.sleep(0.2) # finishes well within 5 s timeout
|
||||
completed.append(pkt)
|
||||
|
||||
router._route_packet = slow_route
|
||||
|
||||
await router.start()
|
||||
pkt = _make_packet()
|
||||
await router.enqueue(pkt)
|
||||
|
||||
# Wait until the task has actually started
|
||||
await asyncio.wait_for(started.wait(), timeout=1.0)
|
||||
|
||||
await router.stop()
|
||||
|
||||
# Task should have completed, not been cancelled
|
||||
self.assertEqual(len(completed), 1,
|
||||
"In-flight task was cancelled instead of drained")
|
||||
|
||||
async def test_stop_cancels_tasks_that_exceed_timeout(self):
|
||||
"""
|
||||
Tasks that don't finish within the 5-second timeout must be cancelled,
|
||||
not left running indefinitely.
|
||||
"""
|
||||
router = PacketRouter(_make_daemon())
|
||||
router._max_in_flight = 5
|
||||
|
||||
cancelled = []
|
||||
started = asyncio.Event()
|
||||
|
||||
async def hanging_route(pkt):
|
||||
started.set()
|
||||
try:
|
||||
await asyncio.sleep(999) # will not finish within 5 s
|
||||
except asyncio.CancelledError:
|
||||
cancelled.append(pkt)
|
||||
raise
|
||||
|
||||
router._route_packet = hanging_route
|
||||
|
||||
# Patch the timeout to 0.1 s so the test runs fast
|
||||
original_stop = router.stop
|
||||
|
||||
async def fast_stop():
|
||||
router.running = False
|
||||
if router.router_task:
|
||||
router.router_task.cancel()
|
||||
try:
|
||||
await router.router_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
if router._route_tasks:
|
||||
snapshot = set(router._route_tasks)
|
||||
_, still_pending = await asyncio.wait(snapshot, timeout=0.1)
|
||||
for task in still_pending:
|
||||
task.cancel()
|
||||
await asyncio.gather(*still_pending, return_exceptions=True)
|
||||
|
||||
router.stop = fast_stop
|
||||
|
||||
await router.start()
|
||||
await router.enqueue(_make_packet())
|
||||
await asyncio.wait_for(started.wait(), timeout=1.0)
|
||||
|
||||
await router.stop()
|
||||
|
||||
self.assertEqual(len(cancelled), 1,
|
||||
"Hanging task was not cancelled on shutdown")
|
||||
|
||||
# ── 4. Route-tasks set stays in sync with counter ───────────────────────
|
||||
|
||||
async def test_route_tasks_set_cleaned_up_on_completion(self):
|
||||
"""
|
||||
_route_tasks must be empty after all tasks complete — the done-callback
|
||||
must discard each task so the set doesn't grow unboundedly.
|
||||
"""
|
||||
router = PacketRouter(_make_daemon())
|
||||
|
||||
async def fast_route(pkt):
|
||||
await asyncio.sleep(0) # yield, then done
|
||||
|
||||
router._route_packet = fast_route
|
||||
|
||||
await router.start()
|
||||
|
||||
for _ in range(10):
|
||||
await router.enqueue(_make_packet())
|
||||
|
||||
# Give tasks time to complete
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
self.assertEqual(len(router._route_tasks), 0,
|
||||
"_route_tasks not cleaned up after task completion")
|
||||
self.assertEqual(router._in_flight, 0,
|
||||
"_in_flight counter not back to 0 after completion")
|
||||
|
||||
await router.stop()
|
||||
|
||||
# ── 5. Counter and set always agree ─────────────────────────────────────
|
||||
|
||||
async def test_counter_matches_set_size_under_load(self):
|
||||
"""
|
||||
_in_flight must always equal len(_route_tasks) while tasks are running.
|
||||
Checked at steady state when the cap is saturated.
|
||||
"""
|
||||
router = PacketRouter(_make_daemon())
|
||||
router._max_in_flight = 5
|
||||
|
||||
barrier = asyncio.Event()
|
||||
|
||||
async def blocking_route(pkt):
|
||||
await barrier.wait()
|
||||
|
||||
router._route_packet = blocking_route
|
||||
|
||||
await router.start()
|
||||
|
||||
for _ in range(5):
|
||||
await router.enqueue(_make_packet())
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
self.assertEqual(
|
||||
router._in_flight, len(router._route_tasks),
|
||||
f"Counter ({router._in_flight}) != set size ({len(router._route_tasks)})"
|
||||
)
|
||||
|
||||
barrier.set()
|
||||
await router.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user