diff --git a/docs/pr_tx_serialization.md b/docs/pr_tx_serialization.md
new file mode 100644
index 0000000..b9e84d0
--- /dev/null
+++ b/docs/pr_tx_serialization.md
@@ -0,0 +1,395 @@
+# PR: Serialise Radio TX and Close Duty-Cycle TOCTOU Race
+
+**Branch:** `fix/tx-serialization`
+**Base:** `rightup/fix-perfom-speed`
+**Files changed:** `repeater/engine.py` (1 file, ~30 lines net)
+
+---
+
+## Problem
+
+Two separate bugs share the same root cause: concurrent `delayed_send` coroutines
+racing each other at transmission time.
+
+### Bug 1 — Interleaved SPI/serial commands to the radio
+
+The queue loop (added in an earlier commit) dispatches each incoming packet as an
+`asyncio.create_task`, so multiple `delayed_send` coroutines can have their sleep
+timers running concurrently. That is correct and intentional — it mirrors how
+firmware nodes use a hardware timer so the radio keeps listening during a TX delay.
+
+However, the LoRa radio is **half-duplex**: it can only transmit one packet at a
+time. When two delay timers expire at nearly the same moment, both coroutines call
+`dispatcher.send_packet` simultaneously. `send_packet` issues a sequence of
+SPI/serial register writes to the radio; two tasks interleaving these writes
+leave the radio in an undefined state, and neither packet is transmitted reliably.
+
+### Bug 2 — TOCTOU gap in duty-cycle enforcement
+
+`__call__` calls `can_transmit()` before scheduling a task:
+
+```python
+# __call__ (before this fix)
+can_tx, wait_time = self.airtime_mgr.can_transmit(airtime_ms)
+if not can_tx:
+    ...  # drop or defer
+tx_task = await self.schedule_retransmit(fwd_pkt, delay, airtime_ms, ...)
+```
+
+`record_tx()` is only called later, inside `delayed_send`, after the sleep
+completes. Between the check and the debit there is a window that spans the
+entire TX delay (up to several seconds).
Two packets that both pass the check
+before either has slept and recorded its airtime will **both** be transmitted even
+if transmitting both would exceed the duty-cycle budget.
+
+Under normal single-packet conditions this window is harmless. Under burst
+conditions — multi-hop amplification, collision retries, or a busy mesh segment
+where several packets arrive within the same delay window — multiple tasks pass
+the advisory check simultaneously, and the duty-cycle limit is exceeded.
+
+---
+
+## Root Cause
+
+There is no mutual exclusion around the radio send path. Each `delayed_send`
+coroutine independently checks duty-cycle, sleeps, and transmits without
+coordinating with any other concurrent coroutine doing the same thing.
+
+---
+
+## Solution
+
+Add `self._tx_lock = asyncio.Lock()` (initialised in `__init__`) and acquire it
+inside `delayed_send` **after** the sleep completes:
+
+```
+Delay timers run concurrently (unchanged):
+  Task B: sleep(0.9s) ───────────► acquire _tx_lock → check → TX B → release
+  Task A: sleep(1.2s) ──────────────────► acquire _tx_lock (waits) ──────────► check → TX A → release
+  Task C: sleep(2.1s) ────────────────────────────────────────────────────────────────► ...
+
+Radio: one packet at a time, duty-cycle state always stable inside the lock.
+```
+
+Inside the lock, a **second** `can_transmit()` call is made immediately before
+sending. Because only one task holds the lock at a time, airtime state is stable
+at this point and `record_tx()` follows on success — check and debit are
+effectively atomic. This closes the TOCTOU window completely.
+
+The upfront `can_transmit()` in `__call__` is retained as an **advisory** fast
+path: it still drops or defers packets that are obviously over budget before a
+delay task is even scheduled, avoiding unnecessary sleep timers. It is no longer
+the enforcement point.
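
The pattern can be reduced to a self-contained sketch (the names `budget_ms`, `used_ms` and this toy `delayed_send` are illustrative only, not the engine's real API):

```python
import asyncio

# Minimal model of the fix: delay timers run concurrently, but the
# check-and-debit of a shared airtime budget happens inside one lock,
# so the pair is atomic. Illustrative names, not the real engine API.
budget_ms = 150.0
used_ms = 0.0
tx_lock = asyncio.Lock()
sent = []

async def delayed_send(name, delay, airtime_ms):
    global used_ms
    await asyncio.sleep(delay)            # timers still run concurrently
    async with tx_lock:                   # radio access serialised
        if used_ms + airtime_ms > budget_ms:
            return                        # authoritative drop, no TOCTOU
        sent.append(name)                 # "transmit"
        used_ms += airtime_ms             # debit only on success

async def main():
    # Both would pass an upfront advisory check, but only one fits the budget.
    await asyncio.gather(
        delayed_send("A", 0.01, 100.0),
        delayed_send("B", 0.01, 100.0),
    )

asyncio.run(main())
print(sent, used_ms)  # exactly one packet transmitted, budget debited once
```

Without the lock, both coroutines would read `used_ms == 0`, both checks would pass, and both packets would "transmit".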
+ +--- + +## Why This Is the Right Approach + +### Alternative A — Move `record_tx()` before the sleep + +```python +# hypothetical +self.airtime_mgr.record_tx(airtime_ms) # reserve before sleeping +await asyncio.sleep(delay) +await self.dispatcher.send_packet(...) # actual TX +``` + +Records airtime even if the send fails (exception, LBT busy, radio error) — +the budget is debited for a packet that was never transmitted. Over time this +inflates the apparent airtime, causing the node to throttle legitimate traffic +it actually has budget for. Requires a compensating `release_airtime()` on +every failure path, creating new complexity and failure modes. + +### Alternative B — A single global advisory check (status quo before this PR) + +Already demonstrated to fail under burst conditions (two tasks both pass before +either records its airtime). + +### Alternative C — asyncio.Lock (this PR) + +- Delay timers remain concurrent — no regression on the primary non-blocking TX + improvement. +- The check-and-debit pair is atomic within the lock — no TOCTOU window. +- No phantom airtime on send failure — `record_tx()` is only called on success. +- One `asyncio.Lock` object, no new state machines or compensating paths. +- The lock is `async`, so it only blocks other TX tasks, not the event loop or + the packet RX queue. + +### Why `asyncio.Lock` rather than `threading.Lock` + +The entire repeater runs on a single asyncio event loop. `asyncio.Lock` only +yields at `await` points; it does not involve OS threads or context switches. +A `threading.Lock` would work but is semantically wrong here (this is not a +thread-safety problem) and would block the event loop thread if held across an +`await`. + +--- + +## Changes + +### `repeater/engine.py` + +**1. Move `import random` to module level** + +```python +# before (inside _calculate_tx_delay): +def _calculate_tx_delay(self, packet, snr=0.0): + import random + ... 
+ +# after (top of file, with other stdlib imports): +import random +``` + +This is a housekeeping fix bundled with this PR because `random` is a stdlib +module that should never be imported inside a hot-path function — Python caches +the import after the first call, but the attribute lookup and cache check still +run on every call. Moving it to module level is the standard pattern. + +**2. Add `self._tx_lock` to `__init__`** + +```python +# Serialise all radio TX calls. +# +# Background: since the queue loop dispatches each packet as an +# asyncio.create_task, multiple _route_packet coroutines can have their +# TX delay timers running concurrently — which is the intended behaviour +# (firmware nodes do the same with a hardware timer). However, the +# LoRa radio is half-duplex: it can only transmit one packet at a time. +# Without serialisation, two tasks whose delay timers expire near- +# simultaneously both call dispatcher.send_packet, interleaving SPI/serial +# commands to the radio and both passing the LBT check before either has +# actually transmitted. +# +# _tx_lock is acquired after each delay sleep and held for the entire +# send_packet call. Delays still run concurrently; only the radio +# access is serialised. This also eliminates the TOCTOU gap in duty-cycle +# enforcement — see schedule_retransmit / delayed_send for details. +self._tx_lock = asyncio.Lock() +``` + +**3. Acquire lock inside `delayed_send`, add authoritative duty-cycle gate** + +```python +async def delayed_send(): + await asyncio.sleep(delay) + + # Acquire the TX lock *after* the delay so that delay timers for + # multiple packets still run concurrently (matching firmware). Only + # one coroutine enters the radio send path at a time. 
+    # Each attempt acquires the lock separately so the 1-second retry
+    # backoff (local_transmission only) happens OUTSIDE the lock.
+    for attempt in range(2 if local_transmission else 1):
+        if attempt > 0:
+            # Back-off OUTSIDE the lock — other tasks can transmit here.
+            await asyncio.sleep(1.0)
+
+        async with self._tx_lock:
+            # ── Authoritative duty-cycle gate ─────────────────────────
+            # The upfront can_transmit() call in __call__ is advisory: it
+            # avoids scheduling packets that are obviously over budget, but
+            # it cannot prevent a race between two tasks whose delay timers
+            # expire at almost the same moment. Both tasks pass the advisory
+            # check before either has recorded its airtime, then both try to
+            # transmit.
+            #
+            # Inside _tx_lock only one task runs at a time, so airtime state
+            # is stable here. The check and the subsequent record_tx() are
+            # effectively atomic — no TOCTOU window.
+            if airtime_ms > 0:
+                can_tx_now, _ = self.airtime_mgr.can_transmit(airtime_ms)
+                if not can_tx_now:
+                    logger.warning(
+                        "Packet dropped at TX time: duty-cycle exceeded "
+                        "(airtime=%.1fms)", airtime_ms,
+                    )
+                    return
+
+            try:
+                await self.dispatcher.send_packet(fwd_pkt, wait_for_ack=False)
+                self._record_packet_sent(fwd_pkt)
+                if airtime_ms > 0:
+                    self.airtime_mgr.record_tx(airtime_ms)
+                ...
+``` + +--- + +## Invariants Maintained + +| Property | Before | After | +|----------|--------|-------| +| Delay timers run concurrently | ✅ | ✅ | +| Radio accessed by one task at a time | ❌ | ✅ | +| Duty-cycle check and debit atomic | ❌ | ✅ | +| Airtime recorded only on TX success | ✅ | ✅ | +| Event loop not blocked by lock | ✅ | ✅ (asyncio.Lock) | + +--- + +## Test Plan + +### Unit tests (can run without hardware) + +**T1 — Serial TX ordering** + +```python +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +async def test_tx_serialized(): + """Two tasks whose delays expire simultaneously must not interleave.""" + send_order = [] + send_lock = asyncio.Lock() + + async def mock_send(pkt, **kw): + # Confirm the _tx_lock is already held when we enter send_packet + assert send_lock.locked(), "send_packet called without _tx_lock held" + send_order.append(pkt) + await asyncio.sleep(0) # yield; a second task must not enter here + + engine._tx_lock = send_lock # replace with the mock lock reference + engine.dispatcher.send_packet = mock_send + + t1 = asyncio.create_task(engine.schedule_retransmit(pkt_a, delay=0.01, airtime_ms=100)) + t2 = asyncio.create_task(engine.schedule_retransmit(pkt_b, delay=0.01, airtime_ms=100)) + await asyncio.gather(t1, t2) + + assert len(send_order) == 2 # both transmitted + assert send_order[0] is not send_order[1] # different packets +``` + +**T2 — Authoritative duty-cycle gate blocks over-budget second packet** + +```python +async def test_second_packet_dropped_when_over_budget(): + """When first TX fills the budget, second task must be dropped inside the lock.""" + # Set a tiny budget: 50ms per minute + engine.airtime_mgr.max_airtime_per_minute = 50 + + sent = [] + async def mock_send(pkt, **kw): + sent.append(pkt) + + engine.dispatcher.send_packet = mock_send + + # Each packet costs ~111ms (SF8, BW125, 30-byte payload) — first passes, second must not + t1 = asyncio.create_task(engine.schedule_retransmit(pkt_a, 
delay=0.01, airtime_ms=111))
+    t2 = asyncio.create_task(engine.schedule_retransmit(pkt_b, delay=0.01, airtime_ms=111))
+    await asyncio.gather(t1, t2)
+
+    assert len(sent) == 1, f"Expected 1 TX, got {len(sent)}"
+```
+
+**T3 — Airtime not debited on TX failure**
+
+```python
+async def test_airtime_not_recorded_on_send_failure():
+    before = engine.airtime_mgr.total_airtime_ms
+
+    async def failing_send(pkt, **kw):
+        raise RuntimeError("radio error")
+
+    engine.dispatcher.send_packet = failing_send
+
+    # schedule_retransmit returns the TX task; the error surfaces when that
+    # task is awaited, so the raises-check must wrap the task await.
+    task = await engine.schedule_retransmit(pkt, delay=0, airtime_ms=100)
+    with pytest.raises(RuntimeError):
+        await task
+
+    assert engine.airtime_mgr.total_airtime_ms == before, \
+        "Airtime must not be recorded when send raises"
+```
+
+**T4 — Advisory check still drops before scheduling (fast path not regressed)**
+
+```python
+async def test_advisory_check_still_drops_obvious_overage():
+    """__call__ should not even schedule a task when clearly over budget."""
+    engine.airtime_mgr.max_airtime_per_minute = 0  # budget exhausted
+
+    tasks_created = []
+    original = asyncio.create_task
+    asyncio.create_task = lambda coro: tasks_created.append(coro) or original(coro)
+    try:
+        await engine(over_budget_packet, metadata={})
+    finally:
+        asyncio.create_task = original  # always restore the real create_task
+
+    assert not tasks_created, "No task should be created when advisory check fails"
+```
+
+### Integration / field tests (with hardware)
+
+**T5 — Burst scenario: 5 packets arrive within the same delay window**
+
+1. Connect the repeater to a radio.
+2. Using a second node, send 5 FLOOD packets in quick succession (< 100 ms apart)
+   with a low RSSI score so the repeater's delay is ~1–2 s for all of them.
+3. Monitor the radio with a spectrum analyser or a third node running in monitor
+   mode.
+
+**Expected (after this fix):**
+- Transmissions are sequential — no overlapping on-air signals.
+- `Retransmitted packet` log lines appear one after another, each with a non-zero
+  airtime value.
+- No `Retransmit failed` errors in the log.
+
+- Duty-cycle log shows airtime accumulating correctly. + +**Expected (before this fix, to confirm the bug existed):** +- Occasional `Retransmit failed` errors under burst load. +- Airtime tracking diverging from actual on-air time (double-counted or missed). + +**T6 — Duty-cycle enforcement under burst** + +1. Set `max_airtime_per_minute` to a low value (e.g. 500 ms) in config. +2. Send 10 packets rapidly so the repeater tries to forward all 10. +3. Observe logs. + +**Expected:** +- First N packets transmitted (total airtime ≤ 500 ms). +- Subsequent packets log `"Packet dropped at TX time: duty-cycle exceeded"` from + inside `delayed_send` (not just the advisory drop). +- `airtime_mgr.get_stats()["utilization_percent"]` reads ≤ 100%. + +**T7 — Normal single-packet forwarding not regressed** + +1. Send one packet every 5 seconds (well within duty-cycle budget). +2. Verify each packet is forwarded with correct airtime logged. +3. Verify no lock contention warnings in the log. + +**T8 — Local TX retry path (local_transmission=True) still works** + +1. Send a command that triggers a local transmission (e.g. a ping reply). +2. Briefly block the radio (simulate with a mock) so the first attempt fails. +3. Verify the retry fires after 1 s and the packet is eventually transmitted. + +--- + +## Proof of Correctness + +### Why `asyncio.Lock` is sufficient (no OS-level synchronisation needed) + +Python's asyncio event loop is **single-threaded**. All coroutines share one +thread and only yield execution at `await` points. Between two consecutive +`await` calls in a coroutine, the event loop does not switch to another coroutine. + +`asyncio.Lock.acquire()` suspends the current coroutine if the lock is held, +returning control to the event loop. `asyncio.Lock.release()` wakes the next +waiter. Because `send_packet` is awaited inside the lock, no other TX task can +run until the current one releases the lock and the event loop gets a chance to +schedule the next waiter. 
+ +There is no possibility of the race seen with `threading.Lock` where an OS thread +can be preempted mid-instruction. + +### Why the advisory check in `__call__` cannot be removed + +The advisory check is still necessary as a fast path. If it were removed, every +incoming packet — even when the node is clearly at 100% duty-cycle — would +schedule a `delayed_send` task that would sleep for the full TX delay (up to +several seconds) before the lock drops it. Under a sustained flood of incoming +packets this wastes memory and CPU. The advisory check prunes the queue early at +negligible cost. + +### Why `record_tx()` must be inside the lock (not before or after) + +- **Before the send:** records airtime for a packet that may never be transmitted + (send could fail, LBT could reject it). Budget is overcounted. +- **After releasing the lock:** a second task could pass the authoritative + `can_transmit()` check between `send_packet` returning and `record_tx()` being + called — the TOCTOU window reopens at a smaller scale. +- **Inside the lock, after a successful send:** the budget is debited exactly once + for exactly the packets that were actually transmitted. The lock ensures no + other task reads airtime state between the check and the debit. diff --git a/repeater/engine.py b/repeater/engine.py index 70cd6b0..c7c2bf3 100644 --- a/repeater/engine.py +++ b/repeater/engine.py @@ -1,6 +1,7 @@ import asyncio import copy import logging +import random import struct import time from collections import OrderedDict, deque @@ -136,6 +137,24 @@ class RepeaterHandler(BaseHandler): self._transport_keys_cache_time = 0 self._transport_keys_cache_ttl = 60 # Cache for 60 seconds + # Serialise all radio TX calls. + # + # Background: since the queue loop dispatches each packet as an + # asyncio.create_task, multiple _route_packet coroutines can have their + # TX delay timers running concurrently — which is the intended behaviour + # (firmware nodes do the same with a hardware timer). 
However, the + # LoRa radio is half-duplex: it can only transmit one packet at a time. + # Without serialisation, two tasks whose delay timers expire near- + # simultaneously both call dispatcher.send_packet, interleaving SPI/serial + # commands to the radio and both passing the LBT check before either has + # actually transmitted. + # + # _tx_lock is acquired after each delay sleep and held for the entire + # send_packet call. Delays still run concurrently; only the radio + # access is serialised. This also eliminates the TOCTOU gap in duty-cycle + # enforcement — see schedule_retransmit / delayed_send for details. + self._tx_lock = asyncio.Lock() + self._start_background_tasks() async def __call__( @@ -954,8 +973,6 @@ class RepeaterHandler(BaseHandler): def _calculate_tx_delay(self, packet: Packet, snr: float = 0.0) -> float: - import random - packet_len = packet.get_raw_length() airtime_ms = self.airtime_mgr.calculate_airtime(packet_len) @@ -1050,29 +1067,59 @@ class RepeaterHandler(BaseHandler): async def delayed_send(): await asyncio.sleep(delay) - last_error = None + + # Each attempt gets its own lock acquisition so the 1-second retry + # backoff (local_transmission only) happens OUTSIDE the lock. + # Holding _tx_lock across asyncio.sleep(1.0) would block every other + # queued TX task for the full backoff period. + # + # Loop runs once for relayed packets, twice for local_transmission: + # attempt 0 — initial try (no pre-sleep) + # attempt 1 — retry after 1s backoff outside the lock for attempt in range(2 if local_transmission else 1): - try: - await self.dispatcher.send_packet(fwd_pkt, wait_for_ack=False) - self._record_packet_sent(fwd_pkt) + if attempt > 0: + # Back-off OUTSIDE the lock — other tasks can transmit here. 
+ logger.info("Retrying local TX in 1s (lock released during backoff)...") + await asyncio.sleep(1.0) + + async with self._tx_lock: + # ── Authoritative duty-cycle gate ────────────────────────── + # The upfront can_transmit() call in __call__ is advisory: it + # avoids scheduling packets obviously over budget, but cannot + # prevent a race between tasks whose delay timers expire nearly + # simultaneously. Both pass the advisory check before either + # records airtime, then both attempt to transmit. + # + # Inside _tx_lock only one task runs at a time. The check and + # record_tx() are effectively atomic — no TOCTOU window. + # Re-checked every attempt because airtime state may change + # while we wait for the lock or sleep through backoff. if airtime_ms > 0: - self.airtime_mgr.record_tx(airtime_ms) - packet_size = fwd_pkt.get_raw_length() - logger.info( - f"Retransmitted packet ({packet_size} bytes, " - f"{airtime_ms:.1f}ms airtime)" - ) - return - except Exception as e: - last_error = e - logger.error(f"Retransmit failed: {e}") - if local_transmission and attempt == 0: - logger.info("Retrying local TX in 1s...") - await asyncio.sleep(1.0) - else: - raise - if last_error is not None: - raise last_error + can_tx_now, _ = self.airtime_mgr.can_transmit(airtime_ms) + if not can_tx_now: + logger.warning( + "Packet dropped at TX time: duty-cycle exceeded " + "(airtime=%.1fms)", airtime_ms, + ) + return + + try: + await self.dispatcher.send_packet(fwd_pkt, wait_for_ack=False) + self._record_packet_sent(fwd_pkt) + if airtime_ms > 0: + self.airtime_mgr.record_tx(airtime_ms) + packet_size = fwd_pkt.get_raw_length() + logger.info( + f"Retransmitted packet ({packet_size} bytes, " + f"{airtime_ms:.1f}ms airtime)" + ) + return + except Exception as e: + logger.error(f"Retransmit failed (attempt {attempt + 1}): {e}") + if local_transmission and attempt == 0: + pass # release lock, outer loop sleeps, then retries + else: + raise return asyncio.create_task(delayed_send()) diff 
--git a/tests/test_tx_lock.py b/tests/test_tx_lock.py new file mode 100644 index 0000000..8f4f1c4 --- /dev/null +++ b/tests/test_tx_lock.py @@ -0,0 +1,279 @@ +""" +Tests for TX lock serialisation and duty-cycle TOCTOU fix. +Addresses the three concerns raised in PR 190 review: + + 1. Concurrent delayed_sends must not interleave send_packet calls. + 2. Duty-cycle TOCTOU must be fixed: the second packet is dropped when the + first consumes the airtime budget inside the lock. + 3. Local retry must NOT hold _tx_lock during the 1-second backoff sleep — + other queued packets must be able to transmit during that window. + +Run with: + python -m pytest tests/test_tx_lock.py -v +or: + python -m unittest tests.test_tx_lock +""" + +import asyncio +import time +import unittest +from collections import deque +from unittest.mock import AsyncMock, MagicMock + + +# --------------------------------------------------------------------------- +# Minimal handler factory +# --------------------------------------------------------------------------- + +def _make_handler(): + """ + Return a RepeaterHandler instance with all external I/O mocked. + + Uses __new__ + manual attribute injection to bypass StorageCollector, + radio hardware, and other heavy dependencies that are irrelevant to the + TX lock behaviour under test. 
+ """ + from repeater.engine import RepeaterHandler + + radio = MagicMock() + radio.spreading_factor = 9 + radio.bandwidth = 62500 + radio.coding_rate = 5 + radio.preamble_length = 17 + radio.frequency = 915_000_000 + radio.tx_power = 14 + + dispatcher = MagicMock() + dispatcher.radio = radio + dispatcher.local_identity = None + dispatcher.send_packet = AsyncMock() + + h = RepeaterHandler.__new__(RepeaterHandler) + h.config = { + "repeater": {"mode": "forward", "cache_ttl": 3600, + "send_advert_interval_hours": 0}, + "delays": {"tx_delay_factor": 1.0, "direct_tx_delay_factor": 0.5}, + "duty_cycle": {"enforcement_enabled": True, + "max_airtime_per_minute": 3600}, + "storage": {}, + "mesh": {}, + } + h.dispatcher = dispatcher + h.airtime_mgr = MagicMock() + h.airtime_mgr.can_transmit.return_value = (True, 0.0) + h.airtime_mgr.calculate_airtime.return_value = 100.0 + h._tx_lock = asyncio.Lock() + h.sent_flood_count = 0 + h.sent_direct_count = 0 + # Stub out _record_packet_sent so it doesn't touch packet.header constants + h._record_packet_sent = MagicMock() + return h + + +def _make_packet(size: int = 50) -> MagicMock: + pkt = MagicMock() + pkt.get_raw_length.return_value = size + pkt.header = 0x00 + return pkt + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestTxLockSerialisation(unittest.IsolatedAsyncioTestCase): + + # ── Test 1: no interleaving ───────────────────────────────────────────── + + async def test_concurrent_sends_do_not_interleave(self): + """ + Two delayed_sends with identical delays race to the radio. + send_packet must never be called while another call is already + in-flight — i.e. _tx_lock must gate them sequentially. 
+ """ + h = _make_handler() + pkt = _make_packet() + + in_flight = [False] + overlap_detected = [False] + + async def send_with_overlap_check(*args, **kwargs): + if in_flight[0]: + overlap_detected[0] = True + in_flight[0] = True + await asyncio.sleep(0.05) # simulate ~50ms radio TX + in_flight[0] = False + + h.dispatcher.send_packet.side_effect = send_with_overlap_check + + # Both tasks use the same tiny delay so their timers expire together. + t1 = await h.schedule_retransmit(pkt, delay=0.01, airtime_ms=0) + t2 = await h.schedule_retransmit(pkt, delay=0.01, airtime_ms=0) + await asyncio.gather(t1, t2, return_exceptions=True) + + self.assertFalse( + overlap_detected[0], + "send_packet was entered while another call was already in-flight " + "— _tx_lock is not serialising correctly", + ) + self.assertEqual(h.dispatcher.send_packet.call_count, 2, + "Expected exactly 2 send_packet calls") + + # ── Test 2: TOCTOU duty-cycle fix ────────────────────────────────────── + + async def test_duty_cycle_toctou_is_fixed(self): + """ + When two tasks both pass the advisory can_transmit() check in __call__ + before either has recorded airtime, the authoritative check inside + _tx_lock must ensure only one of them actually transmits. + + Simulated here by making can_transmit return True for the first + in-lock check and False for every subsequent one. + """ + h = _make_handler() + pkt = _make_packet() + airtime_ms = 100.0 + + # First lock-holder gets True; second gets False (budget consumed). + allow = [True] + + def can_tx(ms): + if allow[0]: + allow[0] = False + return (True, 0.0) + return (False, 5.0) + + h.airtime_mgr.can_transmit.side_effect = can_tx + + # Both tasks start simultaneously (delay=0). 
+ t1 = await h.schedule_retransmit(pkt, delay=0.0, airtime_ms=airtime_ms) + t2 = await h.schedule_retransmit(pkt, delay=0.0, airtime_ms=airtime_ms) + await asyncio.gather(t1, t2, return_exceptions=True) + + self.assertEqual( + h.dispatcher.send_packet.call_count, 1, + "Both packets were sent — duty-cycle TOCTOU race was NOT fixed", + ) + + # ── Test 3: retry backoff does not hold the lock ──────────────────────── + + async def test_local_retry_releases_lock_during_backoff(self): + """ + When a local_transmission send fails on the first attempt, the 1-second + backoff sleep must happen with _tx_lock released. + + We schedule: + - pkt_local: local_transmission=True, delay=0s, fails on attempt 1 + - pkt_other: local_transmission=False, delay=0.1s + + pkt_other fires at ~0.1s. Without the fix, the backoff sleep holds + the lock until ~1.0s, so pkt_other would have to wait. With the fix, + pkt_other sends freely at ~0.1s, well before pkt_local retries at ~1.0s. + """ + h = _make_handler() + pkt_local = _make_packet() + pkt_other = _make_packet() + + send_times: dict[int, float] = {} + first_local_call = [True] + + async def tracked_send(*args, **kwargs): + pkt = args[0] + if pkt is pkt_local and first_local_call[0]: + first_local_call[0] = False + raise RuntimeError("simulated transient radio failure") + send_times[id(pkt)] = time.monotonic() + + h.dispatcher.send_packet.side_effect = tracked_send + + t_local = await h.schedule_retransmit( + pkt_local, delay=0.0, airtime_ms=0, local_transmission=True + ) + t_other = await h.schedule_retransmit( + pkt_other, delay=0.1, airtime_ms=0, local_transmission=False + ) + await asyncio.gather(t_local, t_other, return_exceptions=True) + + self.assertIn(id(pkt_other), send_times, + "pkt_other was never sent") + self.assertIn(id(pkt_local), send_times, + "pkt_local retry was never sent") + + # pkt_other fires at ~0.1s; pkt_local retry fires at ~1.0s. 
+ # If the lock were held during backoff, pkt_other would block until ~1.0s + # and would be recorded AFTER pkt_local's retry — the assertion below + # would fail. + self.assertLess( + send_times[id(pkt_other)], + send_times[id(pkt_local)], + "pkt_other sent AFTER pkt_local retry — " + "_tx_lock was still held during the 1-second backoff sleep", + ) + + # ── Test 4: non-local single-attempt re-raises on failure ────────────── + + async def test_non_local_failure_propagates(self): + """A relayed (non-local) packet that fails send_packet raises immediately.""" + h = _make_handler() + pkt = _make_packet() + + h.dispatcher.send_packet.side_effect = RuntimeError("radio error") + + task = await h.schedule_retransmit(pkt, delay=0.0, airtime_ms=0, + local_transmission=False) + with self.assertRaises(RuntimeError): + await task + + # Only one attempt should have been made. + self.assertEqual(h.dispatcher.send_packet.call_count, 1) + + # ── Test 5: duty-cycle check re-runs after backoff ────────────────────── + + async def test_duty_cycle_rechecked_on_retry(self): + """ + If the duty cycle is exhausted during the 1-second backoff, the retry + attempt must still be dropped — i.e. the duty-cycle gate runs on every + lock acquisition, not just the first. + """ + h = _make_handler() + pkt = _make_packet() + + # First attempt: send_packet raises → triggers backoff. + # Between attempts the budget is consumed, so the retry lock sees False. 
transmit_seq = iter([(True, 0.0), (False, 5.0)])
+        h.airtime_mgr.can_transmit.side_effect = lambda ms: next(transmit_seq)
+
+        send_calls = [0]
+
+        async def failing_then_gone(*args, **kwargs):
+            send_calls[0] += 1
+            if send_calls[0] == 1:
+                raise RuntimeError("transient failure")
+            # Should not reach here on attempt 1 (gate rejects)
+            raise AssertionError(
+                "send_packet called on attempt 1 despite gate rejection")
+
+        h.dispatcher.send_packet.side_effect = failing_then_gone
+
+        task = await h.schedule_retransmit(
+            pkt, delay=0.0, airtime_ms=100.0, local_transmission=True
+        )
+        await task  # should complete without error (gate returns silently)
+
+        self.assertEqual(send_calls[0], 1,
+                         "send_packet called on retry despite duty-cycle rejection")
+
+
+if __name__ == "__main__":
+    unittest.main()