mirror of
https://github.com/rightup/pyMC_Repeater.git
synced 2026-05-07 22:14:28 +02:00
Merge pull request #190 from tjdownes/fix/tx-serialization
fix: serialise radio TX and close duty-cycle TOCTOU race

# PR: Serialise Radio TX and Close Duty-Cycle TOCTOU Race

**Branch:** `fix/tx-serialization`
**Base:** `rightup/fix-perfom-speed`
**Files changed:** `repeater/engine.py` (1 file, ~30 lines net)

---

## Problem

Two separate bugs share the same root cause: concurrent `delayed_send` coroutines racing each other at transmission time.

### Bug 1 — Interleaved SPI/serial commands to the radio

The queue loop (added in an earlier commit) dispatches each incoming packet as an `asyncio.create_task`, so multiple `delayed_send` coroutines can have their sleep timers running concurrently. That is correct and intentional — it mirrors how firmware nodes use a hardware timer so the radio keeps listening during a TX delay.

However, the LoRa radio is **half-duplex**: it can only transmit one packet at a time. When two delay timers expire at nearly the same moment, both coroutines call `dispatcher.send_packet` simultaneously. `send_packet` issues a sequence of SPI/serial register writes to the radio; two tasks interleaving these writes leave the radio in an undefined state, and neither packet is transmitted reliably.

### Bug 2 — TOCTOU gap in duty-cycle enforcement

`__call__` calls `can_transmit()` before scheduling a task:

```python
# __call__ (before this fix)
can_tx, wait_time = self.airtime_mgr.can_transmit(airtime_ms)
if not can_tx:
    ...  # drop or defer
tx_task = await self.schedule_retransmit(fwd_pkt, delay, airtime_ms, ...)
```

`record_tx()` is only called later, inside `delayed_send`, after the sleep completes. Between the check and the debit there is a window that spans the entire TX delay (up to several seconds). Two packets that both pass the check before either has slept and recorded its airtime will **both** be transmitted, even if transmitting both would exceed the duty-cycle budget.

Under normal single-packet conditions this window is harmless. Under burst conditions — multi-hop amplification, collision retries, or a busy mesh segment where several packets arrive within the same delay window — multiple tasks pass the advisory check simultaneously and the duty-cycle limit is exceeded.

---

## Root Cause

There is no mutual exclusion around the radio send path. Each `delayed_send` coroutine independently checks the duty cycle, sleeps, and transmits without coordinating with any other concurrent coroutine doing the same thing.
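
The race is easy to reproduce with a toy budget tracker. This is a standalone sketch: `ToyBudget`, `forward`, and the 10 ms "delay" are illustrative stand-ins, not pyMC_Repeater APIs. It shows both pass the check when the debit happens after the sleep, and how a lock plus an in-lock re-check closes the window:

```python
import asyncio

class ToyBudget:
    """Duty-cycle stand-in: 100 ms budget; check and debit are separate steps."""
    def __init__(self):
        self.remaining_ms = 100

    def can_transmit(self, cost):
        return self.remaining_ms >= cost

    def record_tx(self, cost):
        self.remaining_ms -= cost

async def forward(budget, cost, sent, lock=None):
    if not budget.can_transmit(cost):       # advisory check (time-of-check)
        return
    await asyncio.sleep(0.01)               # TX delay: the TOCTOU window
    if lock is None:
        budget.record_tx(cost)              # time-of-use: debit long after the check
        sent.append(cost)
    else:
        async with lock:
            if budget.can_transmit(cost):   # authoritative re-check inside the lock
                budget.record_tx(cost)
                sent.append(cost)

async def main():
    # Unprotected: both 80 ms packets pass the check; budget goes negative.
    b1, sent1 = ToyBudget(), []
    await asyncio.gather(forward(b1, 80, sent1), forward(b1, 80, sent1))

    # With the lock + re-check: only one packet is transmitted.
    b2, sent2, lock = ToyBudget(), [], asyncio.Lock()
    await asyncio.gather(forward(b2, 80, sent2, lock), forward(b2, 80, sent2, lock))
    return len(sent1), b1.remaining_ms, len(sent2), b2.remaining_ms

print(asyncio.run(main()))  # (2, -60, 1, 20)
```

The unprotected run transmits both packets and overdraws the budget by 60 ms; the locked run transmits exactly one.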

---

## Solution

Add `self._tx_lock = asyncio.Lock()` (initialised in `__init__`) and acquire it inside `delayed_send` **after** the sleep completes:

```
Delay timers run concurrently (unchanged):
  Task B: sleep(0.9s) ─────────► acquire _tx_lock → check → TX B → release
  Task A: sleep(1.2s) ────────────► acquire _tx_lock (waits) ─► check → TX A → release
  Task C: sleep(2.1s) ──────────────────────────────────────────► ...

Radio: one packet at a time, duty-cycle state always stable inside the lock.
```

Inside the lock, a **second** `can_transmit()` call is made immediately before sending. Because only one task holds the lock at a time, airtime state is stable at this point, and `record_tx()` follows on success — check and debit are effectively atomic. This closes the TOCTOU window completely.

The upfront `can_transmit()` in `__call__` is retained as an **advisory** fast path: it still drops or defers packets that are obviously over budget before a delay task is even scheduled, avoiding unnecessary sleep timers. It is no longer the enforcement point.
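
The timeline above can be checked with a standalone toy (all names here are illustrative, not project APIs): delays overlap, but transmissions serialise behind the lock, so total elapsed time tracks the longest delay rather than the sum of delays:

```python
import asyncio
import time

async def delayed_send(lock, delay, tx_log, name):
    await asyncio.sleep(delay)        # delay timers run concurrently
    async with lock:                  # lock acquired only AFTER the sleep
        tx_log.append(name)
        await asyncio.sleep(0.05)     # the "TX" itself is serialised

async def main():
    lock, tx_log = asyncio.Lock(), []
    t0 = time.monotonic()
    await asyncio.gather(
        delayed_send(lock, 0.12, tx_log, "A"),
        delayed_send(lock, 0.09, tx_log, "B"),
        delayed_send(lock, 0.21, tx_log, "C"),
    )
    return tx_log, time.monotonic() - t0

log, elapsed = asyncio.run(main())
assert log == ["B", "A", "C"]                     # shortest delay transmits first
assert elapsed < 0.12 + 0.09 + 0.21 + 3 * 0.05    # delays overlapped, not summed
```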

---

## Why This Is the Right Approach

### Alternative A — Move `record_tx()` before the sleep

```python
# hypothetical
self.airtime_mgr.record_tx(airtime_ms)   # reserve before sleeping
await asyncio.sleep(delay)
await self.dispatcher.send_packet(...)   # actual TX
```

This records airtime even if the send fails (exception, LBT busy, radio error) — the budget is debited for a packet that was never transmitted. Over time this inflates the apparent airtime, causing the node to throttle legitimate traffic it actually has budget for. It would also require a compensating `release_airtime()` on every failure path, creating new complexity and failure modes.

### Alternative B — A single global advisory check (status quo before this PR)

Already demonstrated to fail under burst conditions (two tasks both pass before either records its airtime).

### Alternative C — `asyncio.Lock` (this PR)

- Delay timers remain concurrent — no regression on the primary non-blocking TX improvement.
- The check-and-debit pair is atomic within the lock — no TOCTOU window.
- No phantom airtime on send failure — `record_tx()` is only called on success.
- One `asyncio.Lock` object; no new state machines or compensating paths.
- The lock is `async`, so it only blocks other TX tasks, not the event loop or the packet RX queue.

### Why `asyncio.Lock` rather than `threading.Lock`

The entire repeater runs on a single asyncio event loop. `asyncio.Lock` only yields at `await` points; it does not involve OS threads or context switches. A `threading.Lock` would work, but it is semantically wrong here (this is not a thread-safety problem) and would block the event loop thread if held across an `await`.
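
The "only blocks other TX tasks" point can be seen in a small standalone sketch (the `tx` and `heartbeat` names are illustrative): while one task holds the lock across an awaited "send", an unrelated coroutine on the same event loop keeps running:

```python
import asyncio

async def tx(lock, log, name):
    async with lock:                  # serialises TX tasks only
        log.append(f"{name} start")
        await asyncio.sleep(0.05)     # awaited "send": the loop is free here
        log.append(f"{name} end")

async def heartbeat(log):
    # Unrelated work (think: the RX queue) keeps running while the lock is held.
    for _ in range(3):
        await asyncio.sleep(0.02)
        log.append("rx tick")

async def main():
    lock, log = asyncio.Lock(), []
    await asyncio.gather(tx(lock, log, "A"), tx(lock, log, "B"), heartbeat(log))
    return log

log = asyncio.run(main())
assert log.index("A end") < log.index("B start")   # TX serialised
assert "rx tick" in log[1:4]                       # event loop never blocked
```

With a `threading.Lock` held across the sleep on the loop thread, the "rx tick" entries could not interleave with the held-lock section.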

---

## Changes

### `repeater/engine.py`

**1. Move `import random` to module level**

```python
# before (inside _calculate_tx_delay):
def _calculate_tx_delay(self, packet, snr=0.0):
    import random
    ...

# after (top of file, with other stdlib imports):
import random
```

This is a housekeeping fix bundled into this PR: `random` is a stdlib module that should not be imported inside a hot-path function. Python caches the module after the first import, but the `sys.modules` lookup and name binding still run on every call. Moving the import to module level is the standard pattern.
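
The per-call overhead claim can be measured with `timeit` (a standalone sketch; the function names are illustrative):

```python
import timeit

def with_local_import():
    import random          # re-runs the import machinery on every call
    return random.random

import random              # module-level import, as in the fix

def with_module_import():
    return random.random   # plain global + attribute lookup

local = timeit.timeit(with_local_import, number=200_000)
module = timeit.timeit(with_module_import, number=200_000)
print(local > module)  # True: the per-call import machinery is pure overhead
```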

**2. Add `self._tx_lock` to `__init__`**

```python
# Serialise all radio TX calls.
#
# Background: since the queue loop dispatches each packet as an
# asyncio.create_task, multiple _route_packet coroutines can have their
# TX delay timers running concurrently — which is the intended behaviour
# (firmware nodes do the same with a hardware timer). However, the
# LoRa radio is half-duplex: it can only transmit one packet at a time.
# Without serialisation, two tasks whose delay timers expire near-
# simultaneously both call dispatcher.send_packet, interleaving SPI/serial
# commands to the radio and both passing the LBT check before either has
# actually transmitted.
#
# _tx_lock is acquired after each delay sleep and held for the entire
# send_packet call. Delays still run concurrently; only the radio
# access is serialised. This also eliminates the TOCTOU gap in duty-cycle
# enforcement — see schedule_retransmit / delayed_send for details.
self._tx_lock = asyncio.Lock()
```

**3. Acquire lock inside `delayed_send`, add authoritative duty-cycle gate**

```python
async def delayed_send():
    await asyncio.sleep(delay)

    # Acquire the TX lock *after* the delay so that delay timers for
    # multiple packets still run concurrently (matching firmware). Only
    # one coroutine enters the radio send path at a time.
    async with self._tx_lock:
        # ── Authoritative duty-cycle gate ─────────────────────────────
        # The upfront can_transmit() call in __call__ is advisory: it
        # avoids scheduling packets that are obviously over budget, but
        # it cannot prevent a race between two tasks whose delay timers
        # expire at almost the same moment. Both tasks pass the advisory
        # check before either has recorded its airtime, then both try to
        # transmit.
        #
        # Inside _tx_lock only one task runs at a time, so airtime state
        # is stable here. The check and the subsequent record_tx() are
        # effectively atomic — no TOCTOU window.
        if airtime_ms > 0:
            can_tx_now, _ = self.airtime_mgr.can_transmit(airtime_ms)
            if not can_tx_now:
                logger.warning(
                    "Packet dropped at TX time: duty-cycle exceeded "
                    "(airtime=%.1fms)", airtime_ms,
                )
                return

        last_error = None
        for attempt in range(2 if local_transmission else 1):
            try:
                await self.dispatcher.send_packet(fwd_pkt, wait_for_ack=False)
                self._record_packet_sent(fwd_pkt)
                if airtime_ms > 0:
                    self.airtime_mgr.record_tx(airtime_ms)
                ...
```

---

## Invariants Maintained

| Property | Before | After |
|----------|--------|-------|
| Delay timers run concurrently | ✅ | ✅ |
| Radio accessed by one task at a time | ❌ | ✅ |
| Duty-cycle check and debit atomic | ❌ | ✅ |
| Airtime recorded only on TX success | ✅ | ✅ |
| Event loop not blocked by lock | ✅ | ✅ (asyncio.Lock) |

---

## Test Plan

### Unit tests (can run without hardware)

**T1 — Serial TX ordering**

```python
import asyncio

async def test_tx_serialized():
    """Two tasks whose delays expire simultaneously must not interleave."""
    send_order = []
    send_lock = asyncio.Lock()

    async def mock_send(pkt, **kw):
        # Confirm the _tx_lock is already held when we enter send_packet
        assert send_lock.locked(), "send_packet called without _tx_lock held"
        send_order.append(pkt)
        await asyncio.sleep(0)  # yield; a second task must not enter here

    engine._tx_lock = send_lock  # replace with the mock lock reference
    engine.dispatcher.send_packet = mock_send

    # schedule_retransmit returns the created TX task; gather the tasks.
    t1 = await engine.schedule_retransmit(pkt_a, delay=0.01, airtime_ms=100)
    t2 = await engine.schedule_retransmit(pkt_b, delay=0.01, airtime_ms=100)
    await asyncio.gather(t1, t2)

    assert len(send_order) == 2                # both transmitted
    assert send_order[0] is not send_order[1]  # different packets
```

**T2 — Authoritative duty-cycle gate blocks over-budget second packet**

```python
async def test_second_packet_dropped_when_over_budget():
    """When the first TX fills the budget, the second task must be dropped inside the lock."""
    # Set a tiny budget: 50 ms per minute
    engine.airtime_mgr.max_airtime_per_minute = 50

    sent = []

    async def mock_send(pkt, **kw):
        sent.append(pkt)

    engine.dispatcher.send_packet = mock_send

    # Each packet costs ~111 ms (SF8, BW125, 30-byte payload): first passes, second must not
    t1 = await engine.schedule_retransmit(pkt_a, delay=0.01, airtime_ms=111)
    t2 = await engine.schedule_retransmit(pkt_b, delay=0.01, airtime_ms=111)
    await asyncio.gather(t1, t2)

    assert len(sent) == 1, f"Expected 1 TX, got {len(sent)}"
```

**T3 — Airtime not debited on TX failure**

```python
async def test_airtime_not_recorded_on_send_failure():
    before = engine.airtime_mgr.total_airtime_ms

    async def failing_send(pkt, **kw):
        raise RuntimeError("radio error")

    engine.dispatcher.send_packet = failing_send

    with pytest.raises(RuntimeError):
        await engine.schedule_retransmit(pkt, delay=0, airtime_ms=100)

    assert engine.airtime_mgr.total_airtime_ms == before, \
        "Airtime must not be recorded when send raises"
```

**T4 — Advisory check still drops before scheduling (fast path not regressed)**

```python
async def test_advisory_check_still_drops_obvious_overage():
    """__call__ should not even schedule a task when clearly over budget."""
    engine.airtime_mgr.max_airtime_per_minute = 0  # budget exhausted

    tasks_created = []
    original = asyncio.create_task
    asyncio.create_task = lambda coro: tasks_created.append(coro) or original(coro)
    try:
        await engine(over_budget_packet, metadata={})
    finally:
        asyncio.create_task = original  # always restore the patched function

    assert not tasks_created, "No task should be created when advisory check fails"
```

### Integration / field tests (with hardware)

**T5 — Burst scenario: 5 packets arrive within the same delay window**

1. Connect the repeater to a radio.
2. Using a second node, send 5 FLOOD packets in quick succession (< 100 ms apart) with a low RSSI score so the repeater's delay is ~1–2 s for all of them.
3. Monitor the radio with a spectrum analyser or a third node running in monitor mode.

**Expected (after this fix):**
- Transmissions are sequential — no overlapping on-air signals.
- `Retransmitted packet` log lines appear one after another, each with a non-zero airtime value.
- No `Retransmit failed` errors in the log.
- Duty-cycle log shows airtime accumulating correctly.

**Expected (before this fix, to confirm the bug existed):**
- Occasional `Retransmit failed` errors under burst load.
- Airtime tracking diverging from actual on-air time (double-counted or missed).

**T6 — Duty-cycle enforcement under burst**

1. Set `max_airtime_per_minute` to a low value (e.g. 500 ms) in config.
2. Send 10 packets rapidly so the repeater tries to forward all 10.
3. Observe logs.

**Expected:**
- First N packets transmitted (total airtime ≤ 500 ms).
- Subsequent packets log `"Packet dropped at TX time: duty-cycle exceeded"` from inside `delayed_send` (not just the advisory drop).
- `airtime_mgr.get_stats()["utilization_percent"]` reads ≤ 100%.

**T7 — Normal single-packet forwarding not regressed**

1. Send one packet every 5 seconds (well within the duty-cycle budget).
2. Verify each packet is forwarded with the correct airtime logged.
3. Verify no lock-contention warnings in the log.

**T8 — Local TX retry path (`local_transmission=True`) still works**

1. Send a command that triggers a local transmission (e.g. a ping reply).
2. Briefly block the radio (simulate with a mock) so the first attempt fails.
3. Verify the retry fires after 1 s and the packet is eventually transmitted.

---

## Proof of Correctness

### Why `asyncio.Lock` is sufficient (no OS-level synchronisation needed)

Python's asyncio event loop is **single-threaded**. All coroutines share one thread and only yield execution at `await` points. Between two consecutive `await` calls in a coroutine, the event loop does not switch to another coroutine.

`asyncio.Lock.acquire()` suspends the current coroutine if the lock is held, returning control to the event loop. `asyncio.Lock.release()` wakes the next waiter. Because `send_packet` is awaited inside the lock, no other TX task can run until the current one releases the lock and the event loop gets a chance to schedule the next waiter.

There is no possibility of the race seen with `threading.Lock`, where an OS thread can be preempted mid-instruction.
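
The single-threaded scheduling argument can be checked directly with a standalone sketch (the counter and worker names are illustrative): a read-modify-write with no `await` between the read and the write is never interleaved, while inserting an `await` in the middle reintroduces a classic lost-update race:

```python
import asyncio

counter = {"value": 0}

async def bump_atomic(n):
    for _ in range(n):
        v = counter["value"]        # no await between read...
        counter["value"] = v + 1    # ...and write: effectively atomic
        await asyncio.sleep(0)      # yield only AFTER the critical section

async def bump_racy(n):
    for _ in range(n):
        v = counter["value"]
        await asyncio.sleep(0)      # yield mid-update: lost-update race
        counter["value"] = v + 1

async def run(worker, n=100):
    counter["value"] = 0
    await asyncio.gather(worker(n), worker(n))
    return counter["value"]

print(asyncio.run(run(bump_atomic)))  # 200: no updates lost
print(asyncio.run(run(bump_racy)))    # below 200: concurrent updates collide
```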

### Why the advisory check in `__call__` cannot be removed

The advisory check is still necessary as a fast path. If it were removed, every incoming packet — even when the node is clearly at 100% duty cycle — would schedule a `delayed_send` task that would sleep for the full TX delay (up to several seconds) before the lock drops it. Under a sustained flood of incoming packets this wastes memory and CPU. The advisory check prunes the queue early at negligible cost.

### Why `record_tx()` must be inside the lock (not before or after)

- **Before the send:** records airtime for a packet that may never be transmitted (the send could fail, or LBT could reject it). The budget is overcounted.
- **After releasing the lock:** a second task could pass the authoritative `can_transmit()` check between `send_packet` returning and `record_tx()` being called — the TOCTOU window reopens at a smaller scale.
- **Inside the lock, after a successful send:** the budget is debited exactly once for exactly the packets that were actually transmitted. The lock ensures no other task reads airtime state between the check and the debit.
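
The third placement is the pattern the PR implements. As a minimal standalone sketch (the `Gate` class and `send` callables are illustrative, not project APIs):

```python
import asyncio

class Gate:
    """Check-and-debit made atomic by doing both under one asyncio.Lock."""
    def __init__(self, budget_ms):
        self.remaining = budget_ms
        self._lock = asyncio.Lock()

    async def send_guarded(self, cost_ms, send):
        async with self._lock:
            if self.remaining < cost_ms:    # authoritative check
                return False
            await send()                    # TX while holding the lock
            self.remaining -= cost_ms       # debit only after a successful send
            return True

async def main():
    gate = Gate(budget_ms=100)
    sends = []

    async def ok():
        sends.append("tx")

    async def boom():
        raise RuntimeError("radio error")

    r1 = await gate.send_guarded(80, ok)    # succeeds, debits 80
    try:
        await gate.send_guarded(10, boom)   # send fails: nothing debited
    except RuntimeError:
        pass
    r3 = await gate.send_guarded(80, ok)    # rejected: only 20 ms left
    return r1, r3, gate.remaining, len(sends)

print(asyncio.run(main()))  # (True, False, 20, 1)
```

A failed send leaves the budget untouched, and the check can never be passed by two callers against the same stale balance.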

+70 -23

```diff
@@ -1,6 +1,7 @@
 import asyncio
 import copy
 import logging
+import random
 import struct
 import time
 from collections import OrderedDict, deque
```

```diff
@@ -136,6 +137,24 @@ class RepeaterHandler(BaseHandler):
         self._transport_keys_cache_time = 0
         self._transport_keys_cache_ttl = 60  # Cache for 60 seconds
 
+        # Serialise all radio TX calls.
+        #
+        # Background: since the queue loop dispatches each packet as an
+        # asyncio.create_task, multiple _route_packet coroutines can have their
+        # TX delay timers running concurrently — which is the intended behaviour
+        # (firmware nodes do the same with a hardware timer). However, the
+        # LoRa radio is half-duplex: it can only transmit one packet at a time.
+        # Without serialisation, two tasks whose delay timers expire near-
+        # simultaneously both call dispatcher.send_packet, interleaving SPI/serial
+        # commands to the radio and both passing the LBT check before either has
+        # actually transmitted.
+        #
+        # _tx_lock is acquired after each delay sleep and held for the entire
+        # send_packet call. Delays still run concurrently; only the radio
+        # access is serialised. This also eliminates the TOCTOU gap in duty-cycle
+        # enforcement — see schedule_retransmit / delayed_send for details.
+        self._tx_lock = asyncio.Lock()
+
         self._start_background_tasks()
 
     async def __call__(
```

```diff
@@ -954,8 +973,6 @@ class RepeaterHandler(BaseHandler):
 
     def _calculate_tx_delay(self, packet: Packet, snr: float = 0.0) -> float:
 
-        import random
-
         packet_len = packet.get_raw_length()
         airtime_ms = self.airtime_mgr.calculate_airtime(packet_len)
 
```

```diff
@@ -1050,29 +1067,59 @@ class RepeaterHandler(BaseHandler):
 
         async def delayed_send():
             await asyncio.sleep(delay)
             last_error = None
+
+            # Each attempt gets its own lock acquisition so the 1-second retry
+            # backoff (local_transmission only) happens OUTSIDE the lock.
+            # Holding _tx_lock across asyncio.sleep(1.0) would block every other
+            # queued TX task for the full backoff period.
+            #
+            # Loop runs once for relayed packets, twice for local_transmission:
+            #   attempt 0 — initial try (no pre-sleep)
+            #   attempt 1 — retry after 1s backoff outside the lock
             for attempt in range(2 if local_transmission else 1):
-                try:
-                    await self.dispatcher.send_packet(fwd_pkt, wait_for_ack=False)
-                    self._record_packet_sent(fwd_pkt)
-                    if airtime_ms > 0:
-                        self.airtime_mgr.record_tx(airtime_ms)
-                    packet_size = fwd_pkt.get_raw_length()
-                    logger.info(
-                        f"Retransmitted packet ({packet_size} bytes, "
-                        f"{airtime_ms:.1f}ms airtime)"
-                    )
-                    return
-                except Exception as e:
-                    last_error = e
-                    logger.error(f"Retransmit failed: {e}")
-                    if local_transmission and attempt == 0:
-                        logger.info("Retrying local TX in 1s...")
-                        await asyncio.sleep(1.0)
-                    else:
-                        raise
+                if attempt > 0:
+                    # Back-off OUTSIDE the lock — other tasks can transmit here.
+                    logger.info("Retrying local TX in 1s (lock released during backoff)...")
+                    await asyncio.sleep(1.0)
+
+                async with self._tx_lock:
+                    # ── Authoritative duty-cycle gate ──────────────────────────
+                    # The upfront can_transmit() call in __call__ is advisory: it
+                    # avoids scheduling packets obviously over budget, but cannot
+                    # prevent a race between tasks whose delay timers expire nearly
+                    # simultaneously. Both pass the advisory check before either
+                    # records airtime, then both attempt to transmit.
+                    #
+                    # Inside _tx_lock only one task runs at a time. The check and
+                    # record_tx() are effectively atomic — no TOCTOU window.
+                    # Re-checked every attempt because airtime state may change
+                    # while we wait for the lock or sleep through backoff.
+                    if airtime_ms > 0:
+                        can_tx_now, _ = self.airtime_mgr.can_transmit(airtime_ms)
+                        if not can_tx_now:
+                            logger.warning(
+                                "Packet dropped at TX time: duty-cycle exceeded "
+                                "(airtime=%.1fms)", airtime_ms,
+                            )
+                            return
+
+                    try:
+                        await self.dispatcher.send_packet(fwd_pkt, wait_for_ack=False)
+                        self._record_packet_sent(fwd_pkt)
+                        if airtime_ms > 0:
+                            self.airtime_mgr.record_tx(airtime_ms)
+                        packet_size = fwd_pkt.get_raw_length()
+                        logger.info(
+                            f"Retransmitted packet ({packet_size} bytes, "
+                            f"{airtime_ms:.1f}ms airtime)"
+                        )
+                        return
+                    except Exception as e:
+                        last_error = e
+                        logger.error(f"Retransmit failed (attempt {attempt + 1}): {e}")
+                        if local_transmission and attempt == 0:
+                            pass  # release lock, outer loop sleeps, then retries
+                        else:
+                            raise
             if last_error is not None:
                 raise last_error
 
         return asyncio.create_task(delayed_send())
```

**New file:** `tests/test_tx_lock.py` (+279 lines)

```python
"""
Tests for TX lock serialisation and duty-cycle TOCTOU fix.

Addresses the three concerns raised in PR 190 review:

1. Concurrent delayed_sends must not interleave send_packet calls.
2. Duty-cycle TOCTOU must be fixed: the second packet is dropped when the
   first consumes the airtime budget inside the lock.
3. Local retry must NOT hold _tx_lock during the 1-second backoff sleep —
   other queued packets must be able to transmit during that window.

Run with:
    python -m pytest tests/test_tx_lock.py -v
or:
    python -m unittest tests.test_tx_lock
"""

import asyncio
import time
import unittest
from unittest.mock import AsyncMock, MagicMock


# ---------------------------------------------------------------------------
# Minimal handler factory
# ---------------------------------------------------------------------------

def _make_handler():
    """
    Return a RepeaterHandler instance with all external I/O mocked.

    Uses __new__ + manual attribute injection to bypass StorageCollector,
    radio hardware, and other heavy dependencies that are irrelevant to the
    TX lock behaviour under test.
    """
    from repeater.engine import RepeaterHandler

    radio = MagicMock()
    radio.spreading_factor = 9
    radio.bandwidth = 62500
    radio.coding_rate = 5
    radio.preamble_length = 17
    radio.frequency = 915_000_000
    radio.tx_power = 14

    dispatcher = MagicMock()
    dispatcher.radio = radio
    dispatcher.local_identity = None
    dispatcher.send_packet = AsyncMock()

    h = RepeaterHandler.__new__(RepeaterHandler)
    h.config = {
        "repeater": {"mode": "forward", "cache_ttl": 3600,
                     "send_advert_interval_hours": 0},
        "delays": {"tx_delay_factor": 1.0, "direct_tx_delay_factor": 0.5},
        "duty_cycle": {"enforcement_enabled": True,
                       "max_airtime_per_minute": 3600},
        "storage": {},
        "mesh": {},
    }
    h.dispatcher = dispatcher
    h.airtime_mgr = MagicMock()
    h.airtime_mgr.can_transmit.return_value = (True, 0.0)
    h.airtime_mgr.calculate_airtime.return_value = 100.0
    h._tx_lock = asyncio.Lock()
    h.sent_flood_count = 0
    h.sent_direct_count = 0
    # Stub out _record_packet_sent so it doesn't touch packet.header constants
    h._record_packet_sent = MagicMock()
    return h


def _make_packet(size: int = 50) -> MagicMock:
    pkt = MagicMock()
    pkt.get_raw_length.return_value = size
    pkt.header = 0x00
    return pkt


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------

class TestTxLockSerialisation(unittest.IsolatedAsyncioTestCase):

    # ── Test 1: no interleaving ─────────────────────────────────────────────

    async def test_concurrent_sends_do_not_interleave(self):
        """
        Two delayed_sends with identical delays race to the radio.
        send_packet must never be called while another call is already
        in-flight — i.e. _tx_lock must gate them sequentially.
        """
        h = _make_handler()
        pkt = _make_packet()

        in_flight = [False]
        overlap_detected = [False]

        async def send_with_overlap_check(*args, **kwargs):
            if in_flight[0]:
                overlap_detected[0] = True
            in_flight[0] = True
            await asyncio.sleep(0.05)  # simulate ~50ms radio TX
            in_flight[0] = False

        h.dispatcher.send_packet.side_effect = send_with_overlap_check

        # Both tasks use the same tiny delay so their timers expire together.
        t1 = await h.schedule_retransmit(pkt, delay=0.01, airtime_ms=0)
        t2 = await h.schedule_retransmit(pkt, delay=0.01, airtime_ms=0)
        await asyncio.gather(t1, t2, return_exceptions=True)

        self.assertFalse(
            overlap_detected[0],
            "send_packet was entered while another call was already in-flight "
            "— _tx_lock is not serialising correctly",
        )
        self.assertEqual(h.dispatcher.send_packet.call_count, 2,
                         "Expected exactly 2 send_packet calls")

    # ── Test 2: TOCTOU duty-cycle fix ──────────────────────────────────────

    async def test_duty_cycle_toctou_is_fixed(self):
        """
        When two tasks both pass the advisory can_transmit() check in __call__
        before either has recorded airtime, the authoritative check inside
        _tx_lock must ensure only one of them actually transmits.

        Simulated here by making can_transmit return True for the first
        in-lock check and False for every subsequent one.
        """
        h = _make_handler()
        pkt = _make_packet()
        airtime_ms = 100.0

        # First lock-holder gets True; second gets False (budget consumed).
        allow = [True]

        def can_tx(ms):
            if allow[0]:
                allow[0] = False
                return (True, 0.0)
            return (False, 5.0)

        h.airtime_mgr.can_transmit.side_effect = can_tx

        # Both tasks start simultaneously (delay=0).
        t1 = await h.schedule_retransmit(pkt, delay=0.0, airtime_ms=airtime_ms)
        t2 = await h.schedule_retransmit(pkt, delay=0.0, airtime_ms=airtime_ms)
        await asyncio.gather(t1, t2, return_exceptions=True)

        self.assertEqual(
            h.dispatcher.send_packet.call_count, 1,
            "Both packets were sent — duty-cycle TOCTOU race was NOT fixed",
        )

    # ── Test 3: retry backoff does not hold the lock ────────────────────────

    async def test_local_retry_releases_lock_during_backoff(self):
        """
        When a local_transmission send fails on the first attempt, the 1-second
        backoff sleep must happen with _tx_lock released.

        We schedule:
          - pkt_local: local_transmission=True, delay=0s, fails on attempt 1
          - pkt_other: local_transmission=False, delay=0.1s

        pkt_other fires at ~0.1s. Without the fix, the backoff sleep holds
        the lock until ~1.0s, so pkt_other would have to wait. With the fix,
        pkt_other sends freely at ~0.1s, well before pkt_local retries at ~1.0s.
        """
        h = _make_handler()
        pkt_local = _make_packet()
        pkt_other = _make_packet()

        send_times: dict[int, float] = {}
        first_local_call = [True]

        async def tracked_send(*args, **kwargs):
            pkt = args[0]
            if pkt is pkt_local and first_local_call[0]:
                first_local_call[0] = False
                raise RuntimeError("simulated transient radio failure")
            send_times[id(pkt)] = time.monotonic()

        h.dispatcher.send_packet.side_effect = tracked_send

        t_local = await h.schedule_retransmit(
            pkt_local, delay=0.0, airtime_ms=0, local_transmission=True
        )
        t_other = await h.schedule_retransmit(
            pkt_other, delay=0.1, airtime_ms=0, local_transmission=False
        )
        await asyncio.gather(t_local, t_other, return_exceptions=True)

        self.assertIn(id(pkt_other), send_times,
                      "pkt_other was never sent")
        self.assertIn(id(pkt_local), send_times,
                      "pkt_local retry was never sent")

        # pkt_other fires at ~0.1s; pkt_local retry fires at ~1.0s.
        # If the lock were held during backoff, pkt_other would block until
        # ~1.0s and would be recorded AFTER pkt_local's retry — the assertion
        # below would fail.
        self.assertLess(
            send_times[id(pkt_other)],
            send_times[id(pkt_local)],
            "pkt_other sent AFTER pkt_local retry — "
            "_tx_lock was still held during the 1-second backoff sleep",
        )

    # ── Test 4: non-local single-attempt re-raises on failure ──────────────

    async def test_non_local_failure_propagates(self):
        """A relayed (non-local) packet that fails send_packet raises immediately."""
        h = _make_handler()
        pkt = _make_packet()

        h.dispatcher.send_packet.side_effect = RuntimeError("radio error")

        task = await h.schedule_retransmit(pkt, delay=0.0, airtime_ms=0,
                                           local_transmission=False)
        with self.assertRaises(RuntimeError):
            await task

        # Only one attempt should have been made.
        self.assertEqual(h.dispatcher.send_packet.call_count, 1)

    # ── Test 5: duty-cycle check re-runs after backoff ──────────────────────

    async def test_duty_cycle_rechecked_on_retry(self):
        """
        If the duty cycle is exhausted during the 1-second backoff, the retry
        attempt must still be dropped — i.e. the duty-cycle gate runs on every
        lock acquisition, not just the first.
        """
        h = _make_handler()
        pkt = _make_packet()

        # Attempt 0 passes the gate but send_packet raises (transient failure);
        # between attempts the budget is consumed, so the retry's gate sees False.
        transmit_seq = iter([(True, 0.0), (False, 5.0)])
        h.airtime_mgr.can_transmit.side_effect = lambda ms: next(transmit_seq)

        send_calls = [0]

        async def failing_then_gone(*args, **kwargs):
            send_calls[0] += 1
            if send_calls[0] == 1:
                raise RuntimeError("transient failure")
            # Should not reach here on attempt 1 (gate rejects)
            raise AssertionError(
                "send_packet called on attempt 1 despite gate rejection")

        h.dispatcher.send_packet.side_effect = failing_then_gone

        task = await h.schedule_retransmit(
            pkt, delay=0.0, airtime_ms=100.0, local_transmission=True
        )
        await task  # should complete without error (gate returns silently)

        self.assertEqual(send_calls[0], 1,
                         "send_packet called on retry despite duty-cycle rejection")


if __name__ == "__main__":
    unittest.main()
```