From fdbc85c9269384eb046e0362aa8a60c6daba51ce Mon Sep 17 00:00:00 2001 From: TJ Downes <273720+tjdownes@users.noreply.github.com> Date: Tue, 21 Apr 2026 18:37:56 -0700 Subject: [PATCH 1/2] fix: serialise radio TX and close duty-cycle TOCTOU race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add self._tx_lock (asyncio.Lock) to RepeaterHandler and acquire it inside delayed_send after the per-packet sleep completes. Problem 1 — radio interleave: concurrent delayed_send coroutines (one per queued packet) could both exit their sleep at nearly the same moment and call dispatcher.send_packet simultaneously, interleaving SPI/serial register writes to the half-duplex LoRa radio. Problem 2 — TOCTOU gap: the upfront can_transmit() check in __call__ and the record_tx() call in delayed_send are separated by the entire TX delay (up to several seconds). Under burst conditions two tasks both pass the check before either has recorded its airtime, causing both to transmit and the duty-cycle budget to be exceeded. Fix: acquire _tx_lock after the sleep so delay timers still run concurrently (matching firmware behaviour), then immediately re-check can_transmit() inside the lock before sending. Because only one task holds the lock at a time, airtime state is stable; check and record_tx() are effectively atomic — no TOCTOU window. Airtime is recorded only on a successful send, so a radio failure never inflates the budget. Also move `import random` from inside _calculate_tx_delay to module level (stdlib imports belong at the top; the lazy-import pattern is unnecessary here). Docs: docs/pr_tx_serialization.md — problem statement, root-cause analysis, alternative approaches considered, invariant table, full unit + field test plan, and proof of correctness for the asyncio.Lock approach. 
Co-Authored-By: Claude Sonnet 4.6 --- docs/pr_tx_serialization.md | 395 ++++++++++++++++++++++++++++++++++++ repeater/engine.py | 92 ++++++--- 2 files changed, 462 insertions(+), 25 deletions(-) create mode 100644 docs/pr_tx_serialization.md diff --git a/docs/pr_tx_serialization.md b/docs/pr_tx_serialization.md new file mode 100644 index 0000000..b9e84d0 --- /dev/null +++ b/docs/pr_tx_serialization.md @@ -0,0 +1,395 @@ +# PR: Serialise Radio TX and Close Duty-Cycle TOCTOU Race + +**Branch:** `fix/tx-serialization` +**Base:** `rightup/fix-perfom-speed` +**Files changed:** `repeater/engine.py` (1 file, ~30 lines net) + +--- + +## Problem + +Two separate bugs share the same root cause: concurrent `delayed_send` coroutines +racing each other at transmission time. + +### Bug 1 — Interleaved SPI/serial commands to the radio + +The queue loop (added in an earlier commit) dispatches each incoming packet as an +`asyncio.create_task`, so multiple `delayed_send` coroutines can have their sleep +timers running concurrently. That is correct and intentional — it mirrors how +firmware nodes use a hardware timer so the radio keeps listening during a TX delay. + +However the LoRa radio is **half-duplex**: it can only transmit one packet at a +time. When two delay timers expire at nearly the same moment both coroutines call +`dispatcher.send_packet` simultaneously. `send_packet` issues a sequence of +SPI/serial register writes to the radio; two tasks interleaving these writes +produces undefined radio state and the transmission of neither packet is reliable. + +### Bug 2 — TOCTOU gap in duty-cycle enforcement + +`__call__` calls `can_transmit()` before scheduling a task: + +```python +# __call__ (before this fix) +can_tx, wait_time = self.airtime_mgr.can_transmit(airtime_ms) +if not can_tx: + ... # drop or defer +tx_task = await self.schedule_retransmit(fwd_pkt, delay, airtime_ms, ...) +``` + +`record_tx()` is only called later, inside `delayed_send`, after the sleep +completes. 
Between the check and the debit there is a window that spans the +entire TX delay (up to several seconds). Two packets that both pass the check +before either has slept and recorded its airtime will **both** be transmitted even +if transmitting both would exceed the duty-cycle budget. + +Under normal single-packet conditions this window is harmless. Under burst +conditions — multi-hop amplification, collision retries, or a busy mesh segment +where several packets arrive within the same delay window — multiple tasks pass +the advisory check simultaneously, and the duty-cycle limit is exceeded. + +--- + +## Root Cause + +There is no mutual exclusion around the radio send path. Each `delayed_send` +coroutine independently checks duty-cycle, sleeps, and transmits without +coordinating with any other concurrent coroutine doing the same thing. + +--- + +## Solution + +Add `self._tx_lock = asyncio.Lock()` (initialised in `__init__`) and acquire it +inside `delayed_send` **after** the sleep completes: + +``` +Delay timers run concurrently (unchanged): + Task A: sleep(1.2s) ──────────────────► acquire _tx_lock → check → TX A → release + Task B: sleep(0.9s) ──────────────────► acquire _tx_lock (waits) ──────────► check → TX B → release + Task C: sleep(2.1s) ────────────────────────────────────────────────────────────────► ... + +Radio: one packet at a time, duty-cycle state always stable inside the lock. +``` + +Inside the lock, a **second** `can_transmit()` call is made immediately before +sending. Because only one task holds the lock at a time, airtime state is stable +at this point and `record_tx()` follows on success — check and debit are +effectively atomic. This closes the TOCTOU window completely. + +The upfront `can_transmit()` in `__call__` is retained as an **advisory** fast +path: it still drops or defers packets that are obviously over budget before a +delay task is even scheduled, avoiding unnecessary sleep timers. It is no longer +the enforcement point. 
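The pattern described above can be sketched in isolation. This is a minimal illustration, not the repeater's code: `AirtimeBudget` and this standalone `delayed_send` are hypothetical stand-ins, and the radio send is simulated with a short sleep. It assumes two packets of 100 ms airtime each against a 150 ms budget, so both pass an upfront check but only one fits.

```python
import asyncio


class AirtimeBudget:
    """Hypothetical stand-in for the airtime manager (not the project's class)."""

    def __init__(self, limit_ms: float) -> None:
        self.limit_ms = limit_ms
        self.used_ms = 0.0

    def can_transmit(self, airtime_ms: float) -> bool:
        return self.used_ms + airtime_ms <= self.limit_ms

    def record_tx(self, airtime_ms: float) -> None:
        self.used_ms += airtime_ms


async def delayed_send(name, delay, airtime_ms, budget, tx_lock, sent):
    # Delay timers run concurrently: the lock is not held while sleeping.
    await asyncio.sleep(delay)
    async with tx_lock:
        # Authoritative gate: re-check inside the lock, just before sending.
        if not budget.can_transmit(airtime_ms):
            return
        await asyncio.sleep(0.01)  # simulated radio TX; yields to the event loop
        budget.record_tx(airtime_ms)  # debit only after a successful send
        sent.append(name)


async def main():
    budget = AirtimeBudget(limit_ms=150.0)
    tx_lock = asyncio.Lock()
    sent = []
    # Advisory check: each packet on its own fits the budget (100 <= 150).
    assert budget.can_transmit(100.0)
    await asyncio.gather(
        delayed_send("A", 0.02, 100.0, budget, tx_lock, sent),
        delayed_send("B", 0.02, 100.0, budget, tx_lock, sent),
    )
    return sent, budget.used_ms


sent, used_ms = asyncio.run(main())
print(sent, used_ms)
```

Whichever task takes the lock first transmits and debits the budget; the other sees the updated state inside the lock and drops its packet, so the budget is never exceeded.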
+ +--- + +## Why This Is the Right Approach + +### Alternative A — Move `record_tx()` before the sleep + +```python +# hypothetical +self.airtime_mgr.record_tx(airtime_ms) # reserve before sleeping +await asyncio.sleep(delay) +await self.dispatcher.send_packet(...) # actual TX +``` + +Records airtime even if the send fails (exception, LBT busy, radio error) — +the budget is debited for a packet that was never transmitted. Over time this +inflates the apparent airtime, causing the node to throttle legitimate traffic +it actually has budget for. Requires a compensating `release_airtime()` on +every failure path, creating new complexity and failure modes. + +### Alternative B — A single global advisory check (status quo before this PR) + +Already demonstrated to fail under burst conditions (two tasks both pass before +either records its airtime). + +### Alternative C — asyncio.Lock (this PR) + +- Delay timers remain concurrent — no regression on the primary non-blocking TX + improvement. +- The check-and-debit pair is atomic within the lock — no TOCTOU window. +- No phantom airtime on send failure — `record_tx()` is only called on success. +- One `asyncio.Lock` object, no new state machines or compensating paths. +- The lock is `async`, so it only blocks other TX tasks, not the event loop or + the packet RX queue. + +### Why `asyncio.Lock` rather than `threading.Lock` + +The entire repeater runs on a single asyncio event loop. `asyncio.Lock` only +yields at `await` points; it does not involve OS threads or context switches. +A `threading.Lock` is the wrong tool here (this is not a thread-safety problem): +if one coroutine held it across an `await`, the next coroutine's blocking +`acquire()` would stall the event loop thread and deadlock the holder. + +--- + +## Changes + +### `repeater/engine.py` + +**1. Move `import random` to module level** + +```python +# before (inside _calculate_tx_delay): +def _calculate_tx_delay(self, packet, snr=0.0): + import random + ... 
+ +# after (top of file, with other stdlib imports): +import random +``` + +This is a housekeeping fix bundled with this PR because `random` is a stdlib +module that should never be imported inside a hot-path function — Python caches +the import after the first call, but the attribute lookup and cache check still +run on every call. Moving it to module level is the standard pattern. + +**2. Add `self._tx_lock` to `__init__`** + +```python +# Serialise all radio TX calls. +# +# Background: since the queue loop dispatches each packet as an +# asyncio.create_task, multiple _route_packet coroutines can have their +# TX delay timers running concurrently — which is the intended behaviour +# (firmware nodes do the same with a hardware timer). However, the +# LoRa radio is half-duplex: it can only transmit one packet at a time. +# Without serialisation, two tasks whose delay timers expire near- +# simultaneously both call dispatcher.send_packet, interleaving SPI/serial +# commands to the radio and both passing the LBT check before either has +# actually transmitted. +# +# _tx_lock is acquired after each delay sleep and held for the entire +# send_packet call. Delays still run concurrently; only the radio +# access is serialised. This also eliminates the TOCTOU gap in duty-cycle +# enforcement — see schedule_retransmit / delayed_send for details. +self._tx_lock = asyncio.Lock() +``` + +**3. Acquire lock inside `delayed_send`, add authoritative duty-cycle gate** + +```python +async def delayed_send(): + await asyncio.sleep(delay) + + # Acquire the TX lock *after* the delay so that delay timers for + # multiple packets still run concurrently (matching firmware). Only + # one coroutine enters the radio send path at a time. 
+ async with self._tx_lock: + # ── Authoritative duty-cycle gate ───────────────────────────── + # The upfront can_transmit() call in __call__ is advisory: it + # avoids scheduling packets that are obviously over budget, but + # it cannot prevent a race between two tasks whose delay timers + # expire at almost the same moment. Both tasks pass the advisory + # check before either has recorded its airtime, then both try to + # transmit. + # + # Inside _tx_lock only one task runs at a time, so airtime state + # is stable here. The check and the subsequent record_tx() are + # effectively atomic — no TOCTOU window. + if airtime_ms > 0: + can_tx_now, _ = self.airtime_mgr.can_transmit(airtime_ms) + if not can_tx_now: + logger.warning( + "Packet dropped at TX time: duty-cycle exceeded " + "(airtime=%.1fms)", airtime_ms, + ) + return + + last_error = None + for attempt in range(2 if local_transmission else 1): + try: + await self.dispatcher.send_packet(fwd_pkt, wait_for_ack=False) + self._record_packet_sent(fwd_pkt) + if airtime_ms > 0: + self.airtime_mgr.record_tx(airtime_ms) + ... 
+``` + +--- + +## Invariants Maintained + +| Property | Before | After | +|----------|--------|-------| +| Delay timers run concurrently | ✅ | ✅ | +| Radio accessed by one task at a time | ❌ | ✅ | +| Duty-cycle check and debit atomic | ❌ | ✅ | +| Airtime recorded only on TX success | ✅ | ✅ | +| Event loop not blocked by lock | ✅ | ✅ (asyncio.Lock) | + +--- + +## Test Plan + +### Unit tests (can run without hardware) + +**T1 — Serial TX ordering** + +```python +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +async def test_tx_serialized(): + """Two tasks whose delays expire simultaneously must not interleave.""" + send_order = [] + send_lock = asyncio.Lock() + + async def mock_send(pkt, **kw): + # Confirm the _tx_lock is already held when we enter send_packet + assert send_lock.locked(), "send_packet called without _tx_lock held" + send_order.append(pkt) + await asyncio.sleep(0) # yield; a second task must not enter here + + engine._tx_lock = send_lock # replace with the mock lock reference + engine.dispatcher.send_packet = mock_send + + t1 = asyncio.create_task(engine.schedule_retransmit(pkt_a, delay=0.01, airtime_ms=100)) + t2 = asyncio.create_task(engine.schedule_retransmit(pkt_b, delay=0.01, airtime_ms=100)) + await asyncio.gather(t1, t2) + + assert len(send_order) == 2 # both transmitted + assert send_order[0] is not send_order[1] # different packets +``` + +**T2 — Authoritative duty-cycle gate blocks over-budget second packet** + +```python +async def test_second_packet_dropped_when_over_budget(): + """When first TX fills the budget, second task must be dropped inside the lock.""" + # Set a tiny budget: 50ms per minute + engine.airtime_mgr.max_airtime_per_minute = 50 + + sent = [] + async def mock_send(pkt, **kw): + sent.append(pkt) + + engine.dispatcher.send_packet = mock_send + + # Each packet costs ~111ms (SF8, BW125, 30-byte payload) — first passes, second must not + t1 = asyncio.create_task(engine.schedule_retransmit(pkt_a, 
delay=0.01, airtime_ms=111)) + t2 = asyncio.create_task(engine.schedule_retransmit(pkt_b, delay=0.01, airtime_ms=111)) + await asyncio.gather(t1, t2) + + assert len(sent) == 1, f"Expected 1 TX, got {len(sent)}" +``` + +**T3 — Airtime not debited on TX failure** + +```python +async def test_airtime_not_recorded_on_send_failure(): + before = engine.airtime_mgr.total_airtime_ms + + async def failing_send(pkt, **kw): + raise RuntimeError("radio error") + + engine.dispatcher.send_packet = failing_send + + with pytest.raises(RuntimeError): + await engine.schedule_retransmit(pkt, delay=0, airtime_ms=100) + + assert engine.airtime_mgr.total_airtime_ms == before, \ + "Airtime must not be recorded when send raises" +``` + +**T4 — Advisory check still drops before scheduling (fast path not regressed)** + +```python +async def test_advisory_check_still_drops_obvious_overage(): + """__call__ should not even schedule a task when clearly over budget.""" + engine.airtime_mgr.max_airtime_per_minute = 0 # budget exhausted + + tasks_created = [] + original = asyncio.create_task + asyncio.create_task = lambda coro: tasks_created.append(coro) or original(coro) + + await engine(over_budget_packet, metadata={}) + + assert not tasks_created, "No task should be created when advisory check fails" +``` + +### Integration / field tests (with hardware) + +**T5 — Burst scenario: 5 packets arrive within the same delay window** + +1. Connect the repeater to a radio. +2. Using a second node, send 5 FLOOD packets in quick succession (< 100 ms apart) + with a low RSSI score so the repeater's delay is ~1–2 s for all of them. +3. Monitor the radio with a spectrum analyser or a third node running in monitor + mode. + +**Expected (after this fix):** +- Transmissions are sequential — no overlapping on-air signals. +- `Retransmitted packet` log lines appear one after another, each with a non-zero + airtime value. +- No `Retransmit failed` errors in the log. 
+- Duty-cycle log shows airtime accumulating correctly. + +**Expected (before this fix, to confirm the bug existed):** +- Occasional `Retransmit failed` errors under burst load. +- Airtime tracking diverging from actual on-air time (double-counted or missed). + +**T6 — Duty-cycle enforcement under burst** + +1. Set `max_airtime_per_minute` to a low value (e.g. 500 ms) in config. +2. Send 10 packets rapidly so the repeater tries to forward all 10. +3. Observe logs. + +**Expected:** +- First N packets transmitted (total airtime ≤ 500 ms). +- Subsequent packets log `"Packet dropped at TX time: duty-cycle exceeded"` from + inside `delayed_send` (not just the advisory drop). +- `airtime_mgr.get_stats()["utilization_percent"]` reads ≤ 100%. + +**T7 — Normal single-packet forwarding not regressed** + +1. Send one packet every 5 seconds (well within duty-cycle budget). +2. Verify each packet is forwarded with correct airtime logged. +3. Verify no lock contention warnings in the log. + +**T8 — Local TX retry path (local_transmission=True) still works** + +1. Send a command that triggers a local transmission (e.g. a ping reply). +2. Briefly block the radio (simulate with a mock) so the first attempt fails. +3. Verify the retry fires after 1 s and the packet is eventually transmitted. + +--- + +## Proof of Correctness + +### Why `asyncio.Lock` is sufficient (no OS-level synchronisation needed) + +Python's asyncio event loop is **single-threaded**. All coroutines share one +thread and only yield execution at `await` points. Between two consecutive +`await` calls in a coroutine, the event loop does not switch to another coroutine. + +`asyncio.Lock.acquire()` suspends the current coroutine if the lock is held, +returning control to the event loop. `asyncio.Lock.release()` wakes the next +waiter. Because `send_packet` is awaited inside the lock, no other TX task can +run until the current one releases the lock and the event loop gets a chance to +schedule the next waiter. 
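The scheduling argument above can be checked with a small experiment. This is a sketch under stated assumptions: `critical` and `peak_concurrency` are hypothetical helpers, and `asyncio.sleep(0)` stands in for the `await` inside `send_packet`. It measures how many coroutines are inside the critical section at once, with and without an `asyncio.Lock`.

```python
import asyncio


async def critical(state, lock):
    """Enter the critical section, optionally guarded by an asyncio.Lock."""
    if lock is not None:
        await lock.acquire()
    try:
        state["inside"] += 1
        state["peak"] = max(state["peak"], state["inside"])
        await asyncio.sleep(0)  # an await point, standing in for send_packet
        state["inside"] -= 1
    finally:
        if lock is not None:
            lock.release()


async def peak_concurrency(use_lock: bool) -> int:
    """Run three coroutines together and report the peak overlap."""
    lock = asyncio.Lock() if use_lock else None
    state = {"inside": 0, "peak": 0}
    await asyncio.gather(*(critical(state, lock) for _ in range(3)))
    return state["peak"]


async def demo():
    return await peak_concurrency(False), await peak_concurrency(True)


unlocked_peak, locked_peak = asyncio.run(demo())
print(unlocked_peak, locked_peak)
```

Without the lock, every coroutine reaches the `await` before any of them leaves, so they overlap. With the lock, the peak stays at one even though the critical section itself yields to the event loop.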
+ +Multi-threaded code is different: an OS thread can be preempted between any two +instructions, which is why it needs `threading.Lock`. That race cannot occur here. + +### Why the advisory check in `__call__` cannot be removed + +The advisory check is still necessary as a fast path. If it were removed, every +incoming packet — even when the node is clearly at 100% duty-cycle — would +schedule a `delayed_send` task that would sleep for the full TX delay (up to +several seconds) before the in-lock gate drops it. Under a sustained flood of incoming +packets this wastes memory and CPU. The advisory check prunes the queue early at +negligible cost. + +### Why `record_tx()` must be inside the lock (not before or after) + +- **Before the send:** records airtime for a packet that may never be transmitted + (send could fail, LBT could reject it). Budget is overcounted. +- **After releasing the lock:** a second task could pass the authoritative + `can_transmit()` check between `send_packet` returning and `record_tx()` being + called — the TOCTOU window reopens at a smaller scale. +- **Inside the lock, after a successful send:** the budget is debited exactly once + for exactly the packets that were actually transmitted. The lock ensures no + other task reads airtime state between the check and the debit. diff --git a/repeater/engine.py b/repeater/engine.py index ad8711a..760d6b3 100644 --- a/repeater/engine.py +++ b/repeater/engine.py @@ -1,6 +1,7 @@ import asyncio import copy import logging +import random import struct import time from collections import OrderedDict, deque @@ -135,6 +136,24 @@ class RepeaterHandler(BaseHandler): self._transport_keys_cache_time = 0 self._transport_keys_cache_ttl = 60 # Cache for 60 seconds + # Serialise all radio TX calls. + # + # Background: since the queue loop dispatches each packet as an + # asyncio.create_task, multiple _route_packet coroutines can have their + # TX delay timers running concurrently — which is the intended behaviour + # (firmware nodes do the same with a hardware timer). 
However, the + # LoRa radio is half-duplex: it can only transmit one packet at a time. + # Without serialisation, two tasks whose delay timers expire near- + # simultaneously both call dispatcher.send_packet, interleaving SPI/serial + # commands to the radio and both passing the LBT check before either has + # actually transmitted. + # + # _tx_lock is acquired after each delay sleep and held for the entire + # send_packet call. Delays still run concurrently; only the radio + # access is serialised. This also eliminates the TOCTOU gap in duty-cycle + # enforcement — see schedule_retransmit / delayed_send for details. + self._tx_lock = asyncio.Lock() + self._start_background_tasks() async def __call__( @@ -923,8 +942,6 @@ class RepeaterHandler(BaseHandler): def _calculate_tx_delay(self, packet: Packet, snr: float = 0.0) -> float: - import random - packet_len = packet.get_raw_length() airtime_ms = self.airtime_mgr.calculate_airtime(packet_len) @@ -1006,29 +1023,54 @@ class RepeaterHandler(BaseHandler): async def delayed_send(): await asyncio.sleep(delay) - last_error = None - for attempt in range(2 if local_transmission else 1): - try: - await self.dispatcher.send_packet(fwd_pkt, wait_for_ack=False) - self._record_packet_sent(fwd_pkt) - if airtime_ms > 0: - self.airtime_mgr.record_tx(airtime_ms) - packet_size = fwd_pkt.get_raw_length() - logger.info( - f"Retransmitted packet ({packet_size} bytes, " - f"{airtime_ms:.1f}ms airtime)" - ) - return - except Exception as e: - last_error = e - logger.error(f"Retransmit failed: {e}") - if local_transmission and attempt == 0: - logger.info("Retrying local TX in 1s...") - await asyncio.sleep(1.0) - else: - raise - if last_error is not None: - raise last_error + + # Acquire the TX lock *after* the delay so that delay timers for + # multiple packets still run concurrently (matching firmware). Only + # one coroutine enters the radio send path at a time. 
+ async with self._tx_lock: + # ── Authoritative duty-cycle gate ───────────────────────────── + # The upfront can_transmit() call in __call__ is advisory: it + # avoids scheduling packets that are obviously over budget, but + # it cannot prevent a race between two tasks whose delay timers + # expire at almost the same moment. Both tasks pass the advisory + # check before either has recorded its airtime, then both try to + # transmit. + # + # Inside _tx_lock only one task runs at a time, so airtime state + # is stable here. The check and the subsequent record_tx() are + # effectively atomic — no TOCTOU window. + if airtime_ms > 0: + can_tx_now, _ = self.airtime_mgr.can_transmit(airtime_ms) + if not can_tx_now: + logger.warning( + "Packet dropped at TX time: duty-cycle exceeded " + "(airtime=%.1fms)", airtime_ms, + ) + return + + last_error = None + for attempt in range(2 if local_transmission else 1): + try: + await self.dispatcher.send_packet(fwd_pkt, wait_for_ack=False) + self._record_packet_sent(fwd_pkt) + if airtime_ms > 0: + self.airtime_mgr.record_tx(airtime_ms) + packet_size = fwd_pkt.get_raw_length() + logger.info( + f"Retransmitted packet ({packet_size} bytes, " + f"{airtime_ms:.1f}ms airtime)" + ) + return + except Exception as e: + last_error = e + logger.error(f"Retransmit failed: {e}") + if local_transmission and attempt == 0: + logger.info("Retrying local TX in 1s...") + await asyncio.sleep(1.0) + else: + raise + if last_error is not None: + raise last_error return asyncio.create_task(delayed_send()) From 179158e68b532696fc34f3057ca7a2a82e400461 Mon Sep 17 00:00:00 2001 From: TJ Downes <273720+tjdownes@users.noreply.github.com> Date: Wed, 22 Apr 2026 05:29:47 -0700 Subject: [PATCH 2/2] fix(engine): release _tx_lock during local-TX retry backoff; add lock tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reviewer concern (PR 190): The 1-second backoff sleep for local_transmission retry happened inside 
`async with self._tx_lock`, blocking all other queued TX tasks for the full second — hurting latency and throughput under load. Fix — tighten lock scope to one attempt per acquisition: Before: acquire lock → [attempt 0 → sleep(1) → attempt 1] → release After: for each attempt: [sleep(1) if retry] ← OUTSIDE the lock acquire lock re-check can_transmit ← fresh check every acquisition attempt single send record_tx on success release lock The duty-cycle gate now runs on every lock acquisition (not just the first), which is correct: airtime state may change during the backoff sleep. Tests added (tests/test_tx_lock.py): 1. test_concurrent_sends_do_not_interleave — two tasks racing to the same delay timer must never overlap inside send_packet. 2. test_duty_cycle_toctou_is_fixed — second packet is dropped when the first consumes the budget inside the lock. 3. test_local_retry_releases_lock_during_backoff — a concurrent relayed packet fires at ~0.1s while local retry sleeps 1s; confirms it is not blocked by the backoff. 4. test_non_local_failure_propagates — relayed send failure raises immediately with exactly one attempt. 5. test_duty_cycle_rechecked_on_retry — if the budget is exhausted during backoff, the retry is dropped by the in-lock gate (not sent). Co-Authored-By: Claude Sonnet 4.6 --- repeater/engine.py | 67 +++++----- tests/test_tx_lock.py | 279 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 315 insertions(+), 31 deletions(-) create mode 100644 tests/test_tx_lock.py diff --git a/repeater/engine.py b/repeater/engine.py index 760d6b3..72d7854 100644 --- a/repeater/engine.py +++ b/repeater/engine.py @@ -1024,32 +1024,41 @@ class RepeaterHandler(BaseHandler): async def delayed_send(): await asyncio.sleep(delay) - # Acquire the TX lock *after* the delay so that delay timers for - # multiple packets still run concurrently (matching firmware). Only - # one coroutine enters the radio send path at a time. 
- async with self._tx_lock: - # ── Authoritative duty-cycle gate ───────────────────────────── - # The upfront can_transmit() call in __call__ is advisory: it - # avoids scheduling packets that are obviously over budget, but - # it cannot prevent a race between two tasks whose delay timers - # expire at almost the same moment. Both tasks pass the advisory - # check before either has recorded its airtime, then both try to - # transmit. - # - # Inside _tx_lock only one task runs at a time, so airtime state - # is stable here. The check and the subsequent record_tx() are - # effectively atomic — no TOCTOU window. - if airtime_ms > 0: - can_tx_now, _ = self.airtime_mgr.can_transmit(airtime_ms) - if not can_tx_now: - logger.warning( - "Packet dropped at TX time: duty-cycle exceeded " - "(airtime=%.1fms)", airtime_ms, - ) - return + # Each attempt gets its own lock acquisition so the 1-second retry + # backoff (local_transmission only) happens OUTSIDE the lock. + # Holding _tx_lock across asyncio.sleep(1.0) would block every other + # queued TX task for the full backoff period. + # + # Loop runs once for relayed packets, twice for local_transmission: + # attempt 0 — initial try (no pre-sleep) + # attempt 1 — retry after 1s backoff outside the lock + for attempt in range(2 if local_transmission else 1): + if attempt > 0: + # Back-off OUTSIDE the lock — other tasks can transmit here. + logger.info("Retrying local TX in 1s (lock released during backoff)...") + await asyncio.sleep(1.0) + + async with self._tx_lock: + # ── Authoritative duty-cycle gate ────────────────────────── + # The upfront can_transmit() call in __call__ is advisory: it + # avoids scheduling packets obviously over budget, but cannot + # prevent a race between tasks whose delay timers expire nearly + # simultaneously. Both pass the advisory check before either + # records airtime, then both attempt to transmit. + # + # Inside _tx_lock only one task runs at a time. 
The check and + # record_tx() are effectively atomic — no TOCTOU window. + # Re-checked every attempt because airtime state may change + # while we wait for the lock or sleep through backoff. + if airtime_ms > 0: + can_tx_now, _ = self.airtime_mgr.can_transmit(airtime_ms) + if not can_tx_now: + logger.warning( + "Packet dropped at TX time: duty-cycle exceeded " + "(airtime=%.1fms)", airtime_ms, + ) + return - last_error = None - for attempt in range(2 if local_transmission else 1): try: await self.dispatcher.send_packet(fwd_pkt, wait_for_ack=False) self._record_packet_sent(fwd_pkt) @@ -1062,15 +1071,11 @@ class RepeaterHandler(BaseHandler): ) return except Exception as e: - last_error = e - logger.error(f"Retransmit failed: {e}") + logger.error(f"Retransmit failed (attempt {attempt + 1}): {e}") if local_transmission and attempt == 0: - logger.info("Retrying local TX in 1s...") - await asyncio.sleep(1.0) + pass # release lock, outer loop sleeps, then retries else: raise - if last_error is not None: - raise last_error return asyncio.create_task(delayed_send()) diff --git a/tests/test_tx_lock.py b/tests/test_tx_lock.py new file mode 100644 index 0000000..8f4f1c4 --- /dev/null +++ b/tests/test_tx_lock.py @@ -0,0 +1,279 @@ +""" +Tests for TX lock serialisation and duty-cycle TOCTOU fix. +Addresses the three concerns raised in PR 190 review: + + 1. Concurrent delayed_sends must not interleave send_packet calls. + 2. Duty-cycle TOCTOU must be fixed: the second packet is dropped when the + first consumes the airtime budget inside the lock. + 3. Local retry must NOT hold _tx_lock during the 1-second backoff sleep — + other queued packets must be able to transmit during that window. 
+ +Run with: + python -m pytest tests/test_tx_lock.py -v +or: + python -m unittest tests.test_tx_lock +""" + +import asyncio +import time +import unittest +from collections import deque +from unittest.mock import AsyncMock, MagicMock + + +# --------------------------------------------------------------------------- +# Minimal handler factory +# --------------------------------------------------------------------------- + +def _make_handler(): + """ + Return a RepeaterHandler instance with all external I/O mocked. + + Uses __new__ + manual attribute injection to bypass StorageCollector, + radio hardware, and other heavy dependencies that are irrelevant to the + TX lock behaviour under test. + """ + from repeater.engine import RepeaterHandler + + radio = MagicMock() + radio.spreading_factor = 9 + radio.bandwidth = 62500 + radio.coding_rate = 5 + radio.preamble_length = 17 + radio.frequency = 915_000_000 + radio.tx_power = 14 + + dispatcher = MagicMock() + dispatcher.radio = radio + dispatcher.local_identity = None + dispatcher.send_packet = AsyncMock() + + h = RepeaterHandler.__new__(RepeaterHandler) + h.config = { + "repeater": {"mode": "forward", "cache_ttl": 3600, + "send_advert_interval_hours": 0}, + "delays": {"tx_delay_factor": 1.0, "direct_tx_delay_factor": 0.5}, + "duty_cycle": {"enforcement_enabled": True, + "max_airtime_per_minute": 3600}, + "storage": {}, + "mesh": {}, + } + h.dispatcher = dispatcher + h.airtime_mgr = MagicMock() + h.airtime_mgr.can_transmit.return_value = (True, 0.0) + h.airtime_mgr.calculate_airtime.return_value = 100.0 + h._tx_lock = asyncio.Lock() + h.sent_flood_count = 0 + h.sent_direct_count = 0 + # Stub out _record_packet_sent so it doesn't touch packet.header constants + h._record_packet_sent = MagicMock() + return h + + +def _make_packet(size: int = 50) -> MagicMock: + pkt = MagicMock() + pkt.get_raw_length.return_value = size + pkt.header = 0x00 + return pkt + + +# 
--------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestTxLockSerialisation(unittest.IsolatedAsyncioTestCase): + + # ── Test 1: no interleaving ───────────────────────────────────────────── + + async def test_concurrent_sends_do_not_interleave(self): + """ + Two delayed_sends with identical delays race to the radio. + send_packet must never be called while another call is already + in-flight — i.e. _tx_lock must gate them sequentially. + """ + h = _make_handler() + pkt = _make_packet() + + in_flight = [False] + overlap_detected = [False] + + async def send_with_overlap_check(*args, **kwargs): + if in_flight[0]: + overlap_detected[0] = True + in_flight[0] = True + await asyncio.sleep(0.05) # simulate ~50ms radio TX + in_flight[0] = False + + h.dispatcher.send_packet.side_effect = send_with_overlap_check + + # Both tasks use the same tiny delay so their timers expire together. + t1 = await h.schedule_retransmit(pkt, delay=0.01, airtime_ms=0) + t2 = await h.schedule_retransmit(pkt, delay=0.01, airtime_ms=0) + await asyncio.gather(t1, t2, return_exceptions=True) + + self.assertFalse( + overlap_detected[0], + "send_packet was entered while another call was already in-flight " + "— _tx_lock is not serialising correctly", + ) + self.assertEqual(h.dispatcher.send_packet.call_count, 2, + "Expected exactly 2 send_packet calls") + + # ── Test 2: TOCTOU duty-cycle fix ────────────────────────────────────── + + async def test_duty_cycle_toctou_is_fixed(self): + """ + When two tasks both pass the advisory can_transmit() check in __call__ + before either has recorded airtime, the authoritative check inside + _tx_lock must ensure only one of them actually transmits. + + Simulated here by making can_transmit return True for the first + in-lock check and False for every subsequent one. 
+ """ + h = _make_handler() + pkt = _make_packet() + airtime_ms = 100.0 + + # First lock-holder gets True; second gets False (budget consumed). + allow = [True] + + def can_tx(ms): + if allow[0]: + allow[0] = False + return (True, 0.0) + return (False, 5.0) + + h.airtime_mgr.can_transmit.side_effect = can_tx + + # Both tasks start simultaneously (delay=0). + t1 = await h.schedule_retransmit(pkt, delay=0.0, airtime_ms=airtime_ms) + t2 = await h.schedule_retransmit(pkt, delay=0.0, airtime_ms=airtime_ms) + await asyncio.gather(t1, t2, return_exceptions=True) + + self.assertEqual( + h.dispatcher.send_packet.call_count, 1, + "Both packets were sent — duty-cycle TOCTOU race was NOT fixed", + ) + + # ── Test 3: retry backoff does not hold the lock ──────────────────────── + + async def test_local_retry_releases_lock_during_backoff(self): + """ + When a local_transmission send fails on the first attempt, the 1-second + backoff sleep must happen with _tx_lock released. + + We schedule: + - pkt_local: local_transmission=True, delay=0s, fails on attempt 1 + - pkt_other: local_transmission=False, delay=0.1s + + pkt_other fires at ~0.1s. Without the fix, the backoff sleep holds + the lock until ~1.0s, so pkt_other would have to wait. With the fix, + pkt_other sends freely at ~0.1s, well before pkt_local retries at ~1.0s. 
+        """
+        h = _make_handler()
+        pkt_local = _make_packet()
+        pkt_other = _make_packet()
+
+        send_times: dict[int, float] = {}
+        first_local_call = [True]
+
+        async def tracked_send(*args, **kwargs):
+            pkt = args[0]
+            if pkt is pkt_local and first_local_call[0]:
+                first_local_call[0] = False
+                raise RuntimeError("simulated transient radio failure")
+            send_times[id(pkt)] = time.monotonic()
+
+        h.dispatcher.send_packet.side_effect = tracked_send
+
+        t_local = await h.schedule_retransmit(
+            pkt_local, delay=0.0, airtime_ms=0, local_transmission=True
+        )
+        t_other = await h.schedule_retransmit(
+            pkt_other, delay=0.1, airtime_ms=0, local_transmission=False
+        )
+        await asyncio.gather(t_local, t_other, return_exceptions=True)
+
+        self.assertIn(id(pkt_other), send_times,
+                      "pkt_other was never sent")
+        self.assertIn(id(pkt_local), send_times,
+                      "pkt_local retry was never sent")
+
+        # pkt_other fires at ~0.1s; pkt_local retry fires at ~1.0s.
+        # If the lock were held during backoff, pkt_other would block until ~1.0s
+        # and would be recorded AFTER pkt_local's retry — the assertion below
+        # would fail.
+        self.assertLess(
+            send_times[id(pkt_other)],
+            send_times[id(pkt_local)],
+            "pkt_other sent AFTER pkt_local retry — "
+            "_tx_lock was still held during the 1-second backoff sleep",
+        )
+
+    # ── Test 4: non-local single-attempt re-raises on failure ──────────────
+
+    async def test_non_local_failure_propagates(self):
+        """A relayed (non-local) packet that fails send_packet raises immediately."""
+        h = _make_handler()
+        pkt = _make_packet()
+
+        h.dispatcher.send_packet.side_effect = RuntimeError("radio error")
+
+        task = await h.schedule_retransmit(pkt, delay=0.0, airtime_ms=0,
+                                           local_transmission=False)
+        with self.assertRaises(RuntimeError):
+            await task
+
+        # Only one attempt should have been made.
+        self.assertEqual(h.dispatcher.send_packet.call_count, 1)
+
+    # ── Test 5: duty-cycle check re-runs after backoff ──────────────────────
+
+    async def test_duty_cycle_rechecked_on_retry(self):
+        """
+        If the duty cycle is exhausted during the 1-second backoff, the retry
+        attempt must still be dropped — i.e. the duty-cycle gate runs on every
+        lock acquisition, not just the first.
+        """
+        h = _make_handler()
+        pkt = _make_packet()
+
+        # First attempt: send_packet raises → triggers backoff.
+        # Between attempts the budget is consumed, so the retry's in-lock
+        # check sees False.
+        #   attempt 0: passes the gate, send fails
+        #   attempt 1: gate rejects
+        transmit_seq = iter([(True, 0.0), (False, 5.0)])
+        h.airtime_mgr.can_transmit.side_effect = lambda ms: next(transmit_seq)
+
+        send_calls = [0]
+
+        async def failing_then_gone(*args, **kwargs):
+            send_calls[0] += 1
+            if send_calls[0] == 1:
+                raise RuntimeError("transient failure")
+            # Should not reach here on attempt 1 (gate rejects)
+            raise AssertionError("send_packet called on attempt 1 despite gate rejection")
+
+        h.dispatcher.send_packet.side_effect = failing_then_gone
+
+        task = await h.schedule_retransmit(
+            pkt, delay=0.0, airtime_ms=100.0, local_transmission=True
+        )
+        await task  # should complete without error (gate returns silently)
+
+        self.assertEqual(send_calls[0], 1,
+                         "send_packet called on retry despite duty-cycle rejection")
+
+
+if __name__ == "__main__":
+    unittest.main()