Merge branch 'fix/companion-message-send' into feat/pre-1160-compatibility-sendfix

Bring in companion transmission handling improvements from RepeaterHandler.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
agessaman
2026-05-30 21:42:26 -07:00
6 changed files with 126 additions and 17 deletions
+33 -12
View File
@@ -162,7 +162,7 @@ class RepeaterHandler(BaseHandler):
async def __call__(
self, packet: Packet, metadata: Optional[dict] = None, local_transmission: bool = False
) -> None:
) -> bool:
if metadata is None:
metadata = {}
@@ -259,19 +259,23 @@ class RepeaterHandler(BaseHandler):
f"Duty-cycle limit: deferring local TX by {wait_time:.1f}s "
f"(airtime={airtime_ms:.1f}ms)"
)
self.forwarded_count += 1
transmitted = True
tx_task = await self.schedule_retransmit(
fwd_pkt, deferred_delay, airtime_ms, local_transmission=True
)
try:
await tx_task
tx_success = await tx_task
except Exception as e:
self.forwarded_count -= 1
transmitted = False
drop_reason = "TX failed (deferred)"
logger.warning(f"Deferred local TX failed: {e}")
raise
if not tx_success:
transmitted = False
drop_reason = "TX failed (deferred)"
self.dropped_count += 1
else:
self.forwarded_count += 1
transmitted = True
tx_metadata = getattr(fwd_pkt, "_tx_metadata", None)
if tx_metadata:
lbt_attempts = tx_metadata.get("lbt_attempts", 0)
@@ -292,19 +296,23 @@ class RepeaterHandler(BaseHandler):
self.dropped_count += 1
drop_reason = "Duty cycle limit"
else:
self.forwarded_count += 1
transmitted = True
tx_task = await self.schedule_retransmit(
fwd_pkt, delay, airtime_ms, local_transmission=local_transmission
)
try:
await tx_task
tx_success = await tx_task
except Exception as e:
self.forwarded_count -= 1
transmitted = False
drop_reason = "TX failed"
logger.warning(f"Local TX failed: {e}")
raise
if not tx_success:
transmitted = False
drop_reason = "TX failed"
self.dropped_count += 1
else:
self.forwarded_count += 1
transmitted = True
tx_metadata = getattr(fwd_pkt, "_tx_metadata", None)
if tx_metadata:
lbt_attempts = tx_metadata.get("lbt_attempts", 0)
@@ -415,6 +423,8 @@ class RepeaterHandler(BaseHandler):
# Not a duplicate or first occurrence
self._append_recent_packet(packet_record)
return transmitted
def log_trace_record(self, packet_record: dict) -> None:
"""Manually log a packet trace record (used by external callers)"""
self._append_recent_packet(packet_record)
@@ -1128,10 +1138,20 @@ class RepeaterHandler(BaseHandler):
"Packet dropped at TX time: duty-cycle exceeded (airtime=%.1fms)",
airtime_ms,
)
return
return False
try:
await self.dispatcher.send_packet(fwd_pkt, wait_for_ack=False)
sent = await self.dispatcher.send_packet(
fwd_pkt, wait_for_ack=False
)
if not sent:
logger.warning(
"Retransmit failed (attempt %d): dispatcher returned false",
attempt + 1,
)
if local_transmission and attempt == 0:
continue
return False
self._record_packet_sent(fwd_pkt)
if airtime_ms > 0:
self.airtime_mgr.record_tx(airtime_ms)
@@ -1140,13 +1160,14 @@ class RepeaterHandler(BaseHandler):
f"Retransmitted packet ({packet_size} bytes, "
f"{airtime_ms:.1f}ms airtime)"
)
return
return True
except Exception as e:
logger.error(f"Retransmit failed (attempt {attempt + 1}): {e}")
if local_transmission and attempt == 0:
pass # release lock, outer loop sleeps, then retries
else:
raise
return False
return asyncio.create_task(delayed_send())
+1 -1
View File
@@ -1099,7 +1099,7 @@ class RepeaterDaemon:
logger.debug("Marked own advert as seen in duplicate cache")
logger.info(
"Sent flood advert '%s' at (% .6f, % .6f) source=%s",
"Sent flood advert '%s' at (%.6f, %.6f) source=%s",
node_name,
latitude,
longitude,
+36 -2
View File
@@ -172,7 +172,12 @@ class PacketRouter:
# (avoids duty-cycle or dispatcher races where a later packet goes out first)
async with self._inject_lock:
# Use local_transmission=True to bypass forwarding logic
await self.daemon.repeater_handler(packet, metadata, local_transmission=True)
sent = await self.daemon.repeater_handler(
packet, metadata, local_transmission=True
)
if not sent:
logger.warning("Injected packet failed local transmission")
return False
# Mark so when this packet is dequeued we don't pass to engine again (avoid double-send / double-count)
packet._injected_for_tx = True
@@ -180,6 +185,28 @@ class PacketRouter:
# Enqueue so router can deliver to companion(s): TXT_MSG -> dest bridge, ACK -> all bridges (sender sees ACK)
await self.enqueue(packet)
if wait_for_ack:
ptype = getattr(packet, "get_payload_type", lambda: None)()
if ptype not in {
AckHandler.payload_type(),
AdvertHandler.payload_type(),
}:
dispatcher = getattr(self.daemon, "dispatcher", None)
if dispatcher and hasattr(dispatcher, "wait_for_ack"):
try:
expected_crc = packet.get_crc()
ack_ok = await dispatcher.wait_for_ack(
expected_crc, timeout=5.0
)
if not ack_ok:
logger.warning(
"Injected packet ACK timeout (crc=%08X)", expected_crc
)
return False
except Exception as e:
logger.warning("Injected packet ACK wait failed: %s", e)
return False
packet_len = len(packet.payload) if packet.payload else 0
logger.debug(
f"Injected packet processed by engine as local transmission ({packet_len} bytes)"
@@ -461,4 +488,11 @@ class PacketRouter:
"snr": getattr(packet, "snr", 0.0),
"timestamp": getattr(packet, "timestamp", 0),
}
await self.daemon.repeater_handler(packet, metadata)
sent = await self.daemon.repeater_handler(packet, metadata)
if sent is False:
logger.warning(
"Inbound packet not transmitted by repeater handler "
"(type=%s, header=0x%02x)",
payload_type,
getattr(packet, "header", 0),
)
+40
View File
@@ -1072,6 +1072,46 @@ class TestTxMode:
await asyncio.sleep(0)
handler.dispatcher.send_packet.assert_called_once()
async def test_local_tx_exception_does_not_underflow_forwarded_count(self, handler):
"""TX exceptions must not decrement forwarded_count before any increment happened."""
handler.config["repeater"]["mode"] = "forward"
pkt = _make_flood_packet()
handler.forwarded_count = 0
async def _boom():
raise RuntimeError("simulated tx failure")
with patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock):
with patch.object(
handler, "schedule_retransmit", new=AsyncMock(return_value=asyncio.create_task(_boom()))
):
with pytest.raises(RuntimeError, match="simulated tx failure"):
await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=True)
assert handler.forwarded_count == 0
async def test_rx_tx_failure_increments_dropped_count_and_preserves_accounting(self, handler):
"""A graceful TX failure should count as dropped and keep counters balanced."""
handler.config["repeater"]["mode"] = "forward"
pkt = _make_flood_packet()
handler.rx_count = 0
handler.forwarded_count = 0
handler.dropped_count = 0
async def _tx_false():
return False
with patch.object(
handler, "schedule_retransmit", new=AsyncMock(return_value=asyncio.create_task(_tx_false()))
):
transmitted = await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=False)
assert transmitted is False
assert handler.rx_count == 1
assert handler.forwarded_count == 0
assert handler.dropped_count == 1
assert handler.rx_count == handler.forwarded_count + handler.dropped_count
# ===================================================================
# 16. Airtime calculation correctness
+14 -1
View File
@@ -46,7 +46,7 @@ from repeater.packet_router import (
def _make_daemon():
"""Minimal daemon that satisfies PacketRouter without touching hardware."""
daemon = MagicMock()
daemon.repeater_handler = AsyncMock(return_value=None)
daemon.repeater_handler = AsyncMock(return_value=True)
daemon.trace_helper = None
daemon.discovery_helper = None
daemon.advert_helper = None
@@ -201,6 +201,19 @@ class TestInFlightCap(unittest.IsolatedAsyncioTestCase):
finally:
await router.stop()
async def test_non_injected_handler_false_is_logged(self):
"""Inbound packets should log when repeater_handler reports TX failure."""
daemon = _make_daemon()
daemon.repeater_handler = AsyncMock(return_value=False)
router = PacketRouter(daemon)
pkt = _make_packet(payload_type=0xFF)
with patch("repeater.packet_router.logger.warning") as mock_warn:
await router._route_packet(pkt)
daemon.repeater_handler.assert_awaited_once()
mock_warn.assert_called()
# ── 3. Shutdown: in-flight tasks drained ────────────────────────────────
async def test_stop_waits_for_in_flight_tasks(self):
+2 -1
View File
@@ -255,7 +255,8 @@ class TestTxLockSerialisation(unittest.IsolatedAsyncioTestCase):
task = await h.schedule_retransmit(
pkt, delay=0.0, airtime_ms=100.0, local_transmission=True
)
await task # should complete without error (gate returns silently)
result = await task
self.assertFalse(result, "Duty-cycle drop should report TX failure")
self.assertEqual(
send_calls[0], 1, "send_packet called on retry despite duty-cycle rejection"