Files
pyMC_Repeater/tests/test_engine.py
T
2026-06-02 10:31:47 +01:00

2142 lines
82 KiB
Python

"""
tests for pyMC_Repeater engine.py — RepeaterHandler.
Covers: flood_forward, direct_forward, process_packet, duplicate detection,
mark_seen, validate_packet, packet scoring, TX delay, cache management,
airtime duty-cycle, TX mode (forward/monitor/no_tx), and config reloading.
"""
import asyncio
import base64
import time
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from pymc_core.protocol import Packet, PacketBuilder
from pymc_core.protocol.constants import (
MAX_PATH_SIZE,
PH_ROUTE_MASK,
PH_TYPE_SHIFT,
ROUTE_TYPE_DIRECT,
ROUTE_TYPE_FLOOD,
ROUTE_TYPE_TRANSPORT_DIRECT,
ROUTE_TYPE_TRANSPORT_FLOOD,
)
# ---------------------------------------------------------------------------
# Helpers — build minimal config / mocks needed by RepeaterHandler.__init__
# ---------------------------------------------------------------------------
LOCAL_HASH = 0xAB # repeater's own 1-byte path hash
def _make_config(**overrides) -> dict:
"""Return a minimal valid config dict for RepeaterHandler."""
cfg = {
"repeater": {
"mode": "forward",
"cache_ttl": 3600,
"use_score_for_tx": False,
"score_threshold": 0.3,
"send_advert_interval_hours": 0, # off in tests
"node_name": "test-node",
},
"mesh": {
"unscoped_flood_allow": True,
"loop_detect": "off",
},
"delays": {
"tx_delay_factor": 1.0,
"direct_tx_delay_factor": 0.5,
},
"duty_cycle": {
"max_airtime_per_minute": 3600,
"enforcement_enabled": True,
},
"radio": {
"spreading_factor": 8,
"bandwidth": 125000,
"coding_rate": 8,
"preamble_length": 17,
},
}
# Merge overrides
for key, val in overrides.items():
if isinstance(val, dict) and key in cfg:
cfg[key].update(val)
else:
cfg[key] = val
return cfg
def _make_radio():
"""Return a mock radio with sensible defaults."""
radio = MagicMock()
radio.spreading_factor = 8
radio.bandwidth = 125000
radio.coding_rate = 8
radio.preamble_length = 17
radio.frequency = 915000000
radio.tx_power = 14
return radio
def _make_dispatcher(radio=None):
"""Return a mock dispatcher with a radio and local_identity."""
dispatcher = MagicMock()
dispatcher.radio = radio or _make_radio()
dispatcher.local_identity = MagicMock()
dispatcher.send_packet = AsyncMock()
return dispatcher
@pytest.fixture()
def handler():
"""Create a RepeaterHandler with mocked external dependencies."""
config = _make_config()
dispatcher = _make_dispatcher()
with (
patch("repeater.engine.StorageCollector"),
patch("repeater.engine.RepeaterHandler._start_background_tasks"),
):
from repeater.engine import RepeaterHandler
h = RepeaterHandler(config, dispatcher, LOCAL_HASH)
return h
def _make_flood_packet(
payload: bytes = b"\x01\x02\x03\x04", path: bytes = b"", payload_type: int = 0x01
) -> Packet:
"""Build a FLOOD-routed packet."""
pkt = Packet()
# header: route=FLOOD(0x01), payload_type shifted, version=0
pkt.header = ROUTE_TYPE_FLOOD | (payload_type << PH_TYPE_SHIFT)
pkt.payload = bytearray(payload)
pkt.payload_len = len(payload)
pkt.path = bytearray(path)
pkt.path_len = len(path)
return pkt
def _make_direct_packet(
payload: bytes = b"\x01\x02\x03\x04", path: bytes = None, payload_type: int = 0x01
) -> Packet:
"""Build a DIRECT-routed packet with path[0] == LOCAL_HASH by default."""
if path is None:
path = bytes([LOCAL_HASH, 0xCC, 0xDD])
pkt = Packet()
pkt.header = ROUTE_TYPE_DIRECT | (payload_type << PH_TYPE_SHIFT)
pkt.payload = bytearray(payload)
pkt.payload_len = len(payload)
pkt.path = bytearray(path)
pkt.path_len = len(path)
return pkt
def _make_transport_flood_packet(
payload: bytes = b"\x01\x02\x03\x04",
path: bytes = b"",
payload_type: int = 0x01,
transport_codes=(0x1234, 0x5678),
) -> Packet:
"""Build a TRANSPORT_FLOOD-routed packet."""
pkt = Packet()
pkt.header = ROUTE_TYPE_TRANSPORT_FLOOD | (payload_type << PH_TYPE_SHIFT)
pkt.payload = bytearray(payload)
pkt.payload_len = len(payload)
pkt.path = bytearray(path)
pkt.path_len = len(path)
pkt.transport_codes = list(transport_codes)
return pkt
def _make_transport_direct_packet(
payload: bytes = b"\x01\x02\x03\x04",
path: bytes = None,
payload_type: int = 0x01,
transport_codes=(0x1234, 0x5678),
) -> Packet:
"""Build a TRANSPORT_DIRECT-routed packet with path[0] == LOCAL_HASH."""
if path is None:
path = bytes([LOCAL_HASH, 0xCC])
pkt = Packet()
pkt.header = ROUTE_TYPE_TRANSPORT_DIRECT | (payload_type << PH_TYPE_SHIFT)
pkt.payload = bytearray(payload)
pkt.payload_len = len(payload)
pkt.path = bytearray(path)
pkt.path_len = len(path)
pkt.transport_codes = list(transport_codes)
return pkt
# ===================================================================
# 1. flood_forward
# ===================================================================
class TestFloodForward:
"""flood_forward: validation, duplicate suppression, path append."""
def test_valid_flood_returns_packet(self, handler):
pkt = _make_flood_packet()
result = handler.flood_forward(pkt)
assert result is not None
assert result is pkt # mutated in-place
def test_local_hash_appended_to_path(self, handler):
pkt = _make_flood_packet(path=b"\x11\x22")
result = handler.flood_forward(pkt)
assert result.path[-1] == LOCAL_HASH
assert list(result.path) == [0x11, 0x22, LOCAL_HASH]
assert result.path_len == 3
def test_empty_path_gets_local_hash(self, handler):
pkt = _make_flood_packet(path=b"")
result = handler.flood_forward(pkt)
assert list(result.path) == [LOCAL_HASH]
assert result.path_len == 1
def test_duplicate_flood_dropped(self, handler):
pkt = _make_flood_packet()
handler.flood_forward(pkt)
pkt2 = _make_flood_packet() # same payload → same hash
result = handler.flood_forward(pkt2)
assert result is None
assert pkt2.drop_reason == "Duplicate"
def test_empty_payload_rejected(self, handler):
pkt = _make_flood_packet(payload=b"")
pkt.payload_len = 0
result = handler.flood_forward(pkt)
assert result is None
assert "Empty payload" in pkt.drop_reason
def test_none_payload_rejected(self, handler):
pkt = _make_flood_packet()
pkt.payload = None
result = handler.flood_forward(pkt)
assert result is None
def test_path_at_max_rejected(self, handler):
pkt = _make_flood_packet(path=bytes(range(MAX_PATH_SIZE)))
result = handler.flood_forward(pkt)
assert result is None
assert "Path length" in pkt.drop_reason
def test_do_not_retransmit_dropped(self, handler):
pkt = _make_flood_packet()
pkt.mark_do_not_retransmit()
result = handler.flood_forward(pkt)
assert result is None
assert "do not retransmit" in pkt.drop_reason.lower()
def test_unscoped_flood_deny_plain_flood(self, handler):
handler.config["mesh"]["unscoped_flood_allow"] = False
pkt = _make_flood_packet()
# When unscoped_flood_allow=False, flood_forward should fail on a packet type without a transport code defined
result = handler.flood_forward(pkt)
assert result is None
def test_hash_computed_before_path_append(self, handler):
"""mark_seen must use the pre-append hash so duplicate detection works
when another node sends the same packet with or without our hash."""
pkt1 = _make_flood_packet(payload=b"\xaa\xbb")
hash_before = pkt1.calculate_packet_hash().hex().upper()
handler.flood_forward(pkt1)
# The hash stored in seen_packets should be the PRE-append hash
assert hash_before in handler.seen_packets
def test_path_len_updated_after_append(self, handler):
pkt = _make_flood_packet(path=b"\x11")
handler.flood_forward(pkt)
assert pkt.path_len == len(pkt.path) == 2
def test_different_payloads_not_duplicate(self, handler):
pkt1 = _make_flood_packet(payload=b"\x01")
pkt2 = _make_flood_packet(payload=b"\x02")
r1 = handler.flood_forward(pkt1)
r2 = handler.flood_forward(pkt2)
assert r1 is not None
assert r2 is not None
def test_path_none_is_handled(self, handler):
pkt = _make_flood_packet()
pkt.path = None
pkt.path_len = 0
result = handler.flood_forward(pkt)
assert result is not None
assert list(result.path) == [LOCAL_HASH]
# ===================================================================
# 2. direct_forward
# ===================================================================
class TestDirectForward:
"""direct_forward: next-hop check, path consumption, duplicate suppression."""
def test_valid_direct_returns_packet(self, handler):
pkt = _make_direct_packet()
result = handler.direct_forward(pkt)
assert result is not None
def test_path_consumed(self, handler):
pkt = _make_direct_packet(path=bytes([LOCAL_HASH, 0xCC, 0xDD]))
result = handler.direct_forward(pkt)
assert list(result.path) == [0xCC, 0xDD]
assert result.path_len == 2
def test_single_hop_path_consumed(self, handler):
"""Single hop to us: we strip and return packet with empty path (forward so it can reach destination)."""
pkt = _make_direct_packet(path=bytes([LOCAL_HASH]))
result = handler.direct_forward(pkt)
assert result is not None
assert list(result.path) == []
assert result.path_len == 0
def test_wrong_next_hop_dropped(self, handler):
pkt = _make_direct_packet(path=bytes([0xFF, 0xCC]))
result = handler.direct_forward(pkt)
assert result is None
assert "not for us" in pkt.drop_reason
def test_empty_path_dropped(self, handler):
pkt = _make_direct_packet(path=b"")
pkt.path_len = 0
result = handler.direct_forward(pkt)
assert result is None
assert "no path" in pkt.drop_reason
def test_none_path_dropped(self, handler):
pkt = Packet()
pkt.header = ROUTE_TYPE_DIRECT | (0x01 << PH_TYPE_SHIFT)
pkt.payload = bytearray(b"\x01\x02")
pkt.payload_len = 2
pkt.path = None
pkt.path_len = 0
result = handler.direct_forward(pkt)
assert result is None
def test_duplicate_direct_dropped(self, handler):
pkt = _make_direct_packet()
handler.direct_forward(pkt)
pkt2 = _make_direct_packet() # same payload → same hash
result = handler.direct_forward(pkt2)
assert result is None
assert pkt2.drop_reason == "Duplicate"
def test_hash_computed_before_path_consume(self, handler):
"""mark_seen hash must match the packet as received, before path[0] removal."""
pkt = _make_direct_packet(path=bytes([LOCAL_HASH, 0xCC]))
hash_before = pkt.calculate_packet_hash().hex().upper()
handler.direct_forward(pkt)
assert hash_before in handler.seen_packets
def test_path_len_updated_after_consume(self, handler):
pkt = _make_direct_packet(path=bytes([LOCAL_HASH, 0xCC, 0xDD]))
handler.direct_forward(pkt)
assert pkt.path_len == len(pkt.path) == 2
# ===================================================================
# 3. process_packet — route dispatch
# ===================================================================
class TestProcessPacket:
"""process_packet routes to flood_forward or direct_forward."""
def test_flood_route_dispatched(self, handler):
pkt = _make_flood_packet()
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, delay = result
# Flood appends local hash
assert fwd_pkt.path[-1] == LOCAL_HASH
def test_direct_route_dispatched(self, handler):
pkt = _make_direct_packet()
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, delay = result
# Direct consumed first hop
assert LOCAL_HASH not in fwd_pkt.path
def test_transport_flood_dispatched(self, handler):
pkt = _make_transport_flood_packet()
with patch.object(handler, "_check_transport_codes", return_value=(True, "")):
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, _ = result
assert fwd_pkt.path[-1] == LOCAL_HASH
def test_transport_direct_dispatched(self, handler):
pkt = _make_transport_direct_packet()
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
def test_unknown_route_type_dropped(self, handler):
"""A header with route bits = 0x03 is TRANSPORT_DIRECT which IS handled.
We hackily create a truly unknown route by patching both masks."""
pkt = _make_flood_packet()
# All 4 route types are handled. We can verify the fallback by
# ensuring flood forward fails, e.g. empty payload → returns None.
pkt.payload = None
result = handler.process_packet(pkt, snr=0.0)
assert result is None
def test_returns_tuple_with_delay(self, handler):
pkt = _make_flood_packet()
result = handler.process_packet(pkt, snr=5.0)
assert isinstance(result, tuple)
assert len(result) == 2
fwd_pkt, delay = result
assert isinstance(delay, float)
assert delay >= 0.0
def test_flood_forward_failure_returns_none(self, handler):
pkt = _make_flood_packet(payload=b"")
pkt.payload_len = 0
result = handler.process_packet(pkt, snr=0.0)
assert result is None
def test_direct_forward_failure_returns_none(self, handler):
pkt = _make_direct_packet(path=bytes([0xFF])) # wrong hop
result = handler.process_packet(pkt, snr=0.0)
assert result is None
# ===================================================================
# 4. is_duplicate / mark_seen / cache management
# ===================================================================
class TestDuplicateDetection:
"""Duplicate tracking, TTL clean-up, and cache eviction."""
def test_unseen_packet_not_duplicate(self, handler):
pkt = _make_flood_packet()
assert handler.is_duplicate(pkt) is False
def test_after_mark_seen_is_duplicate(self, handler):
pkt = _make_flood_packet()
handler.mark_seen(pkt)
assert handler.is_duplicate(pkt) is True
def test_different_packets_independent(self, handler):
pkt1 = _make_flood_packet(payload=b"\x01")
pkt2 = _make_flood_packet(payload=b"\x02")
handler.mark_seen(pkt1)
assert handler.is_duplicate(pkt1) is True
assert handler.is_duplicate(pkt2) is False
def test_cache_eviction_at_max_size(self, handler):
handler.max_cache_size = 5
packets = [_make_flood_packet(payload=bytes([i])) for i in range(6)]
for p in packets:
handler.mark_seen(p)
# Oldest (packets[0]) should have been evicted
assert handler.is_duplicate(packets[0]) is False
assert handler.is_duplicate(packets[5]) is True
def test_cleanup_cache_removes_expired(self, handler):
pkt = _make_flood_packet()
handler.mark_seen(pkt)
pkt_hash = pkt.calculate_packet_hash().hex().upper()
# Manually expire it
handler.seen_packets[pkt_hash] = time.time() - handler.cache_ttl - 1
handler.cleanup_cache()
assert handler.is_duplicate(pkt) is False
def test_cleanup_cache_keeps_fresh(self, handler):
pkt = _make_flood_packet()
handler.mark_seen(pkt)
handler.cleanup_cache()
assert handler.is_duplicate(pkt) is True
def test_mark_seen_stores_hex_upper_key(self, handler):
pkt = _make_flood_packet()
handler.mark_seen(pkt)
for key in handler.seen_packets:
assert key == key.upper()
# ===================================================================
# 5. validate_packet
# ===================================================================
class TestValidatePacket:
"""validate_packet: empty payload, oversized path."""
def test_valid_packet(self, handler):
pkt = _make_flood_packet()
valid, reason = handler.validate_packet(pkt)
assert valid is True
assert reason == ""
def test_empty_payload_fails(self, handler):
pkt = _make_flood_packet(payload=b"")
pkt.payload = None
valid, reason = handler.validate_packet(pkt)
assert valid is False
assert "Empty" in reason
def test_path_at_max_fails(self, handler):
pkt = _make_flood_packet(path=bytes(range(MAX_PATH_SIZE)))
valid, reason = handler.validate_packet(pkt)
assert valid is False
assert "MAX_PATH_SIZE" in reason
def test_path_one_below_max_passes(self, handler):
pkt = _make_flood_packet(path=bytes(range(MAX_PATH_SIZE - 1)))
valid, reason = handler.validate_packet(pkt)
assert valid is True
def test_none_packet(self, handler):
valid, reason = handler.validate_packet(None)
assert valid is False
# ===================================================================
# 6. calculate_packet_score — static method
# ===================================================================
class TestPacketScore:
"""Score: SNR thresholds, collision penalty, clamping."""
def test_below_threshold_returns_zero(self):
from repeater.engine import RepeaterHandler
# SF8 threshold is -10.0
score = RepeaterHandler.calculate_packet_score(snr=-15.0, packet_len=50, spreading_factor=8)
assert score == 0.0
def test_at_threshold_returns_zero(self):
from repeater.engine import RepeaterHandler
score = RepeaterHandler.calculate_packet_score(snr=-10.0, packet_len=50, spreading_factor=8)
assert score == 0.0
def test_above_threshold_positive(self):
from repeater.engine import RepeaterHandler
score = RepeaterHandler.calculate_packet_score(snr=0.0, packet_len=50, spreading_factor=8)
assert score > 0.0
def test_high_snr_high_score(self):
from repeater.engine import RepeaterHandler
score = RepeaterHandler.calculate_packet_score(snr=10.0, packet_len=10, spreading_factor=8)
assert score > 0.5
def test_long_packet_collision_penalty(self):
from repeater.engine import RepeaterHandler
short = RepeaterHandler.calculate_packet_score(snr=5.0, packet_len=10, spreading_factor=8)
long_ = RepeaterHandler.calculate_packet_score(snr=5.0, packet_len=250, spreading_factor=8)
assert short > long_
def test_score_clamped_to_0_1(self):
from repeater.engine import RepeaterHandler
score = RepeaterHandler.calculate_packet_score(snr=50.0, packet_len=1, spreading_factor=8)
assert 0.0 <= score <= 1.0
def test_sf_below_7_returns_zero(self):
from repeater.engine import RepeaterHandler
score = RepeaterHandler.calculate_packet_score(snr=10.0, packet_len=50, spreading_factor=6)
assert score == 0.0
def test_each_sf_has_different_threshold(self):
from repeater.engine import RepeaterHandler
scores = {}
for sf in (7, 8, 9, 10, 11, 12):
scores[sf] = RepeaterHandler.calculate_packet_score(
snr=-5.0, packet_len=50, spreading_factor=sf
)
# Higher SF → lower threshold → better reception at same SNR
# At SNR=-5, SF7 (threshold -7.5) should be worse than SF12 (threshold -20)
assert scores[12] > scores[7]
# ===================================================================
# 7. _calculate_tx_delay
# ===================================================================
class TestTxDelay:
"""TX delay: flood random, direct fixed, score adjustment, cap."""
def test_flood_delay_non_negative(self, handler):
pkt = _make_flood_packet()
delay = handler._calculate_tx_delay(pkt, snr=0.0)
assert delay >= 0.0
def test_flood_delay_capped_at_5s(self, handler):
handler.tx_delay_factor = 1000.0 # extreme multiplier
pkt = _make_flood_packet()
delay = handler._calculate_tx_delay(pkt, snr=0.0)
assert delay <= 5.0
def test_direct_delay_uses_factor(self, handler):
handler.direct_tx_delay_factor = 1.23
pkt = _make_direct_packet()
delay = handler._calculate_tx_delay(pkt, snr=0.0)
# Direct packets use direct_tx_delay_factor directly (in seconds)
# Score adjustment may change it, but base should be 1.23 when score is off
assert delay == pytest.approx(1.23, abs=0.01)
def test_score_adjustment_reduces_delay(self, handler):
handler.use_score_for_tx = True
pkt = _make_flood_packet(payload=b"\x01" * 50)
# High SNR → high score → shorter delay
delays = []
for _ in range(50):
d = handler._calculate_tx_delay(pkt, snr=10.0)
delays.append(d)
avg_high_snr = sum(delays) / len(delays)
# Low SNR → low score → longer delay
delays_low = []
for _ in range(50):
d = handler._calculate_tx_delay(pkt, snr=-5.0)
delays_low.append(d)
avg_low_snr = sum(delays_low) / len(delays_low)
# Using statistical comparison — average high-SNR delay should be lower
# (Both use random, but multiplier differs)
# This is non-deterministic; we just check score path doesn't crash
assert avg_high_snr >= 0.0
assert avg_low_snr >= 0.0
def test_zero_tx_delay_factor(self, handler):
handler.tx_delay_factor = 0.0
pkt = _make_flood_packet()
delay = handler._calculate_tx_delay(pkt, snr=0.0)
assert delay == 0.0
def test_transport_direct_uses_direct_delay(self, handler):
handler.direct_tx_delay_factor = 0.77
pkt = _make_transport_direct_packet()
delay = handler._calculate_tx_delay(pkt, snr=0.0)
assert delay == pytest.approx(0.77, abs=0.01)
# ===================================================================
# 8. Hash stability through forwarding operations
# ===================================================================
class TestHashStabilityThroughForwarding:
"""Verify hash is computed on original packet (before path mutation)."""
def test_flood_hash_unchanged_after_forward(self, handler):
pkt = _make_flood_packet(payload=b"\xde\xad")
hash_before = pkt.calculate_packet_hash().hex().upper()
handler.flood_forward(pkt)
# The hash stored should be the pre-modification hash
assert hash_before in handler.seen_packets
def test_direct_hash_unchanged_after_forward(self, handler):
pkt = _make_direct_packet(payload=b"\xbe\xef", path=bytes([LOCAL_HASH, 0xCC]))
hash_before = pkt.calculate_packet_hash().hex().upper()
handler.direct_forward(pkt)
assert hash_before in handler.seen_packets
def test_flood_second_identical_detected_as_duplicate(self, handler):
"""Two identical packets with the same payload (but path not yet modified)
should be correctly detected as duplicates."""
p1 = _make_flood_packet(payload=b"\xca\xfe")
p2 = _make_flood_packet(payload=b"\xca\xfe")
handler.flood_forward(p1)
result = handler.flood_forward(p2)
assert result is None
def test_direct_second_identical_detected_as_duplicate(self, handler):
p1 = _make_direct_packet(payload=b"\xca\xfe", path=bytes([LOCAL_HASH, 0x11]))
p2 = _make_direct_packet(payload=b"\xca\xfe", path=bytes([LOCAL_HASH, 0x11]))
handler.direct_forward(p1)
result = handler.direct_forward(p2)
assert result is None
# ===================================================================
# 9. unscoped flood policy
# ===================================================================
class TestUnscopedFloodPolicy:
"""unscoped_flood_allow=False blocks plain flood, transport checked."""
def test_flood_blocked_by_policy(self, handler):
handler.config["mesh"]["unscoped_flood_allow"] = False
pkt = _make_flood_packet()
result = handler.flood_forward(pkt)
assert result is None
def test_direct_unaffected_by_flood_policy(self, handler):
handler.config["mesh"]["unscoped_flood_allow"] = False
pkt = _make_direct_packet()
result = handler.direct_forward(pkt)
assert result is not None # direct is not blocked by flood policy
def test_transport_flood_unaffected_by_unscoped_policy(self, handler):
# unscoped_flood_allow controls only plain FLOOD packets.
# Transport floods are validated via _check_transport_codes regardless.
# With a configured scope that allows, transport flood passes even when
# unscoped traffic is denied — the two settings are fully independent.
handler.config["mesh"]["unscoped_flood_allow"] = False
pkt = _make_transport_flood_packet()
with patch.object(handler, "_check_transport_codes", return_value=(True, "")):
result = handler.flood_forward(pkt)
assert result is not None # transport flood passes; unscoped=False did not block it
def test_transport_flood_denied_with_no_keys(self, handler):
# Scope Not Configured = denied, regardless of unscoped_flood_allow.
pkt = _make_transport_flood_packet()
result = handler.flood_forward(pkt) # no mocking — real _check_transport_codes
assert result is None # denied because no transport keys configured
class TestFloodLoopDetection:
"""MeshCore-style loop detection for flood forwarding."""
def test_loop_detect_off_allows_looped_path(self, handler):
handler.config["mesh"]["loop_detect"] = "off"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([LOCAL_HASH, 0x11, LOCAL_HASH]))
result = handler.flood_forward(pkt)
assert result is not None
def test_loop_detect_minimal_drops_at_four(self, handler):
handler.config["mesh"]["loop_detect"] = "minimal"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([LOCAL_HASH, LOCAL_HASH, LOCAL_HASH, LOCAL_HASH]))
result = handler.flood_forward(pkt)
assert result is None
assert "loop detected" in (pkt.drop_reason or "").lower()
def test_loop_detect_minimal_allows_below_threshold(self, handler):
handler.config["mesh"]["loop_detect"] = "minimal"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([LOCAL_HASH, LOCAL_HASH, LOCAL_HASH]))
result = handler.flood_forward(pkt)
assert result is not None
def test_loop_detect_moderate_drops_at_two(self, handler):
handler.config["mesh"]["loop_detect"] = "moderate"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([LOCAL_HASH, 0x22, LOCAL_HASH]))
result = handler.flood_forward(pkt)
assert result is None
def test_loop_detect_strict_drops_at_one(self, handler):
handler.config["mesh"]["loop_detect"] = "strict"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([0x33, LOCAL_HASH, 0x44]))
result = handler.flood_forward(pkt)
assert result is None
# ===================================================================
# 10. Airtime / duty-cycle integration
# ===================================================================
class TestAirtimeIntegration:
"""Airtime calculation and duty-cycle enforcement."""
def test_airtime_positive(self, handler):
airtime = handler.airtime_mgr.calculate_airtime(50)
assert airtime > 0.0
def test_can_transmit_fresh(self, handler):
can_tx, wait = handler.airtime_mgr.can_transmit(100.0)
assert can_tx is True
assert wait == 0.0
def test_cannot_transmit_after_exhaustion(self, handler):
# Fill up the budget
handler.airtime_mgr.record_tx(handler.airtime_mgr.max_airtime_per_minute)
can_tx, wait = handler.airtime_mgr.can_transmit(1.0)
assert can_tx is False
assert wait > 0.0
def test_duty_cycle_disabled(self, handler):
handler.config["duty_cycle"]["enforcement_enabled"] = False
handler.airtime_mgr.config = handler.config
handler.airtime_mgr.record_tx(999999)
can_tx, wait = handler.airtime_mgr.can_transmit(999999)
assert can_tx is True
def test_airtime_increases_with_packet_size(self, handler):
short = handler.airtime_mgr.calculate_airtime(10)
long_ = handler.airtime_mgr.calculate_airtime(200)
assert long_ > short
# ===================================================================
# 11. Config reload
# ===================================================================
class TestConfigReload:
"""reload_runtime_config updates in-memory state."""
def test_tx_delay_factor_reloaded(self, handler):
handler.config["delays"]["tx_delay_factor"] = 3.14
handler.reload_runtime_config()
assert handler.tx_delay_factor == 3.14
def test_direct_tx_delay_factor_reloaded(self, handler):
handler.config["delays"]["direct_tx_delay_factor"] = 2.5
handler.reload_runtime_config()
assert handler.direct_tx_delay_factor == 2.5
def test_use_score_for_tx_reloaded(self, handler):
handler.config["repeater"]["use_score_for_tx"] = True
handler.reload_runtime_config()
assert handler.use_score_for_tx is True
def test_cache_ttl_reloaded(self, handler):
handler.config["repeater"]["cache_ttl"] = 120
handler.reload_runtime_config()
assert handler.cache_ttl == 120
# ===================================================================
# 12. _get_drop_reason
# ===================================================================
class TestGetDropReason:
"""_get_drop_reason: determine why a packet was not forwarded."""
def test_duplicate_reason(self, handler):
pkt = _make_flood_packet()
handler.mark_seen(pkt)
reason = handler._get_drop_reason(pkt)
assert reason == "Duplicate"
def test_empty_payload_reason(self, handler):
pkt = _make_flood_packet(payload=b"")
pkt.payload_len = 0
pkt.payload = bytearray()
reason = handler._get_drop_reason(pkt)
assert "Empty" in reason
def test_path_too_long_reason(self, handler):
pkt = _make_flood_packet(path=bytes(range(MAX_PATH_SIZE)))
reason = handler._get_drop_reason(pkt)
assert "Path too long" in reason
def test_flood_policy_reason(self, handler):
handler.config["mesh"]["unscoped_flood_allow"] = False
pkt = _make_flood_packet()
reason = handler._get_drop_reason(pkt)
assert "flood" in reason.lower()
def test_direct_not_for_us_reason(self, handler):
pkt = _make_direct_packet(path=bytes([0xFF, 0xCC]))
reason = handler._get_drop_reason(pkt)
assert "not for us" in reason
def test_direct_no_path_reason(self, handler):
pkt = _make_direct_packet(path=b"")
pkt.path_len = 0
reason = handler._get_drop_reason(pkt)
assert "no path" in reason
# ===================================================================
# 13. Transport route forwarding
# ===================================================================
class TestTransportForwarding:
"""TRANSPORT_FLOOD and TRANSPORT_DIRECT: packet routing through process_packet."""
def test_transport_flood_appends_path(self, handler):
pkt = _make_transport_flood_packet(path=b"\x11")
with patch.object(handler, "_check_transport_codes", return_value=(True, "")):
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, _ = result
assert fwd_pkt.path[-1] == LOCAL_HASH
assert len(fwd_pkt.path) == 2
def test_transport_direct_consumes_path(self, handler):
pkt = _make_transport_direct_packet(path=bytes([LOCAL_HASH, 0xCC]))
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, _ = result
assert list(fwd_pkt.path) == [0xCC]
def test_transport_codes_preserved_after_flood(self, handler):
pkt = _make_transport_flood_packet(transport_codes=(0xAAAA, 0xBBBB))
with patch.object(handler, "_check_transport_codes", return_value=(True, "")):
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, _ = result
assert fwd_pkt.transport_codes == [0xAAAA, 0xBBBB]
def test_transport_codes_preserved_after_direct(self, handler):
pkt = _make_transport_direct_packet(transport_codes=(0x1111, 0x2222))
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, _ = result
assert fwd_pkt.transport_codes == [0x1111, 0x2222]
# ===================================================================
# 14. Statistics tracking
# ===================================================================
class TestStatistics:
"""RX/TX/dropped counters and recent_packets list."""
def test_initial_counters_zero(self, handler):
assert handler.rx_count == 0
assert handler.forwarded_count == 0
assert handler.dropped_count == 0
def test_get_stats_returns_dict(self, handler):
with patch.object(handler, "storage", None):
stats = handler.get_stats()
assert "rx_count" in stats
assert "forwarded_count" in stats
assert "dropped_count" in stats
assert "local_hash" in stats
assert "uptime_seconds" in stats
def test_get_stats_local_hash_format(self, handler):
with patch.object(handler, "storage", None):
stats = handler.get_stats()
assert stats["local_hash"] == f"0x{LOCAL_HASH:02x}"
# ===================================================================
# 15. Edge cases and regression tests
# ===================================================================
class TestEdgeCases:
"""Miscellaneous edge cases and regressions."""
def test_path_as_list_converted_to_bytearray(self, handler):
"""flood_forward should handle path being a list (not bytearray)."""
pkt = _make_flood_packet()
pkt.path = [0x11, 0x22]
pkt.path_len = 2
result = handler.flood_forward(pkt)
assert result is not None
assert isinstance(result.path, bytearray)
assert list(result.path) == [0x11, 0x22, LOCAL_HASH]
def test_flood_forward_idempotent_on_second_call(self, handler):
"""Calling flood_forward again with the SAME packet object should
detect as duplicate (the first call already mark_seen'd it)."""
pkt = _make_flood_packet(payload=b"\xff" * 10)
r1 = handler.flood_forward(pkt)
assert r1 is not None
# Now pkt has local_hash appended, but hash was computed pre-append.
# A new packet with same original payload should be duplicate.
pkt2 = _make_flood_packet(payload=b"\xff" * 10)
r2 = handler.flood_forward(pkt2)
assert r2 is None
def test_large_payload_still_forwarded(self, handler):
pkt = _make_flood_packet(payload=bytes(range(256)) * 4)
result = handler.flood_forward(pkt)
assert result is not None
def test_payload_type_encoding_in_header(self, handler):
"""Verify header construction encodes payload_type correctly."""
for pt in range(16):
pkt = _make_flood_packet(payload=bytes([pt, 0xFF]), payload_type=pt)
assert pkt.get_payload_type() == pt
result = handler.flood_forward(pkt)
assert result is not None
def test_many_distinct_packets_all_forwarded(self, handler):
"""100 unique packets should all be forwarded (no false duplicates)."""
for i in range(100):
pkt = _make_flood_packet(payload=i.to_bytes(4, "big"))
result = handler.flood_forward(pkt)
assert result is not None, f"Packet {i} incorrectly detected as duplicate"
def test_cache_eviction_order_is_fifo(self, handler):
handler.max_cache_size = 3
pkts = [_make_flood_packet(payload=bytes([i])) for i in range(4)]
for p in pkts:
handler.mark_seen(p)
# First one evicted
assert handler.is_duplicate(pkts[0]) is False
# Last three still present
for p in pkts[1:]:
assert handler.is_duplicate(p) is True
def test_do_not_retransmit_with_custom_drop_reason(self, handler):
pkt = _make_flood_packet()
pkt.mark_do_not_retransmit()
pkt.drop_reason = "Custom reason"
result = handler.flood_forward(pkt)
assert result is None
# Custom reason should be preserved (not overwritten)
assert pkt.drop_reason == "Custom reason"
def test_monitor_mode_skips_processing(self, handler):
"""In monitor mode, process_packet is not called at all in __call__,
but process_packet itself doesn't check mode — that's done in __call__.
We test that process_packet works irrespective of mode."""
handler.config["repeater"]["mode"] = "monitor"
pkt = _make_flood_packet()
# process_packet doesn't check mode — should still work
result = handler.process_packet(pkt, snr=0.0)
assert result is not None
# ===================================================================
# 15b. TX mode: forward, monitor, no_tx
# ===================================================================
@pytest.mark.asyncio
class TestTxMode:
"""forward = repeat on; monitor = no repeat, local TX allowed; no_tx = all TX off."""
async def test_forward_mode_calls_process_packet_for_rx(self, handler):
"""In forward mode, a received packet (not local) triggers process_packet."""
handler.config["repeater"]["mode"] = "forward"
pkt = _make_flood_packet()
with patch.object(handler, "process_packet", wraps=handler.process_packet) as m:
await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=False)
m.assert_called_once()
async def test_monitor_mode_does_not_call_process_packet_for_rx(self, handler):
"""In monitor mode, a received packet does not trigger process_packet."""
handler.config["repeater"]["mode"] = "monitor"
pkt = _make_flood_packet()
with patch.object(handler, "process_packet") as m:
await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=False)
m.assert_not_called()
async def test_no_tx_mode_does_not_call_process_packet_for_rx(self, handler):
"""In no_tx mode, a received packet does not trigger process_packet."""
handler.config["repeater"]["mode"] = "no_tx"
pkt = _make_flood_packet()
with patch.object(handler, "process_packet") as m:
await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=False)
m.assert_not_called()
async def test_monitor_mode_allows_local_tx(self, handler):
"""In monitor mode, local_transmission=True still schedules send_packet."""
handler.config["repeater"]["mode"] = "monitor"
pkt = _make_flood_packet()
with patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock):
await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=True)
await asyncio.sleep(0) # flush scheduled task
handler.dispatcher.send_packet.assert_called_once()
async def test_no_tx_mode_blocks_local_tx(self, handler):
"""In no_tx mode, local_transmission=True does not schedule send_packet."""
handler.config["repeater"]["mode"] = "no_tx"
pkt = _make_flood_packet()
with patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock):
await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=True)
await asyncio.sleep(0)
handler.dispatcher.send_packet.assert_not_called()
async def test_forward_mode_allows_local_tx(self, handler):
"""In forward mode, local_transmission=True schedules send_packet."""
handler.config["repeater"]["mode"] = "forward"
pkt = _make_flood_packet()
with patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock):
await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=True)
await asyncio.sleep(0)
handler.dispatcher.send_packet.assert_called_once()
async def test_local_tx_exception_does_not_underflow_forwarded_count(self, handler):
"""TX exceptions must not decrement forwarded_count before any increment happened."""
handler.config["repeater"]["mode"] = "forward"
pkt = _make_flood_packet()
handler.forwarded_count = 0
async def _boom():
raise RuntimeError("simulated tx failure")
with patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock):
with patch.object(
handler,
"schedule_retransmit",
new=AsyncMock(return_value=asyncio.create_task(_boom())),
):
with pytest.raises(RuntimeError, match="simulated tx failure"):
await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=True)
assert handler.forwarded_count == 0
async def test_rx_tx_failure_increments_dropped_count_and_preserves_accounting(self, handler):
"""A graceful TX failure should count as dropped and keep counters balanced."""
handler.config["repeater"]["mode"] = "forward"
pkt = _make_flood_packet()
handler.rx_count = 0
handler.forwarded_count = 0
handler.dropped_count = 0
async def _tx_false():
return False
with patch.object(
handler,
"schedule_retransmit",
new=AsyncMock(return_value=asyncio.create_task(_tx_false())),
):
transmitted = await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=False)
assert transmitted is False
assert handler.rx_count == 1
assert handler.forwarded_count == 0
assert handler.dropped_count == 1
assert handler.rx_count == handler.forwarded_count + handler.dropped_count
# ===================================================================
# 16. Airtime calculation correctness
# ===================================================================
class TestAirtimeCalculation:
"""Semtech LoRa airtime formula validation."""
def test_known_airtime_sf7_125khz(self, handler):
"""SF7, 125kHz, CR4/5, 10-byte payload — well-known reference value."""
mgr = handler.airtime_mgr
# Override to known settings
at = mgr.calculate_airtime(
10, spreading_factor=7, bandwidth_hz=125000, coding_rate=5, preamble_len=8
)
# Semtech calculator: ~36ms for these params
assert 30.0 < at < 50.0
def test_sf12_much_slower_than_sf7(self, handler):
mgr = handler.airtime_mgr
at7 = mgr.calculate_airtime(50, spreading_factor=7)
at12 = mgr.calculate_airtime(50, spreading_factor=12)
# SF12 is roughly 32x slower than SF7 per symbol
assert at12 > at7 * 10
def test_zero_payload_still_has_preamble(self, handler):
mgr = handler.airtime_mgr
at = mgr.calculate_airtime(0)
assert at > 0.0
# ===================================================================
# 17. Curated good-packet and bad-packet arrays
# ===================================================================
# ---- 20 GOOD packets: all should be forwarded by process_packet ----
GOOD_PACKETS = [
# (id, description, builder)
(
"good_flood_minimal",
"Flood, 1-byte payload, empty path",
lambda: _make_flood_packet(payload=b"\x01"),
),
(
"good_flood_typical",
"Flood, 10-byte payload, 2-hop path",
lambda: _make_flood_packet(payload=bytes(range(10)), path=b"\x11\x22"),
),
(
"good_flood_max_payload_type",
"Flood, payload_type=15 (max 4-bit)",
lambda: _make_flood_packet(payload=b"\xaa\xbb", payload_type=15),
),
(
"good_flood_payload_type_0",
"Flood, payload_type=0 (plain text)",
lambda: _make_flood_packet(payload=b"\x01\x02\x03", payload_type=0),
),
(
"good_flood_long_payload",
"Flood, 200-byte payload",
lambda: _make_flood_packet(payload=bytes(range(200))),
),
(
"good_flood_single_byte_path",
"Flood, path has 1 prior hop",
lambda: _make_flood_packet(payload=b"\xde\xad", path=b"\x42"),
),
(
"good_flood_binary_payload",
"Flood, all-zero payload",
lambda: _make_flood_packet(payload=b"\x00" * 16),
),
(
"good_flood_high_entropy",
"Flood, high-entropy random-looking payload",
lambda: _make_flood_packet(payload=bytes(i ^ 0xA5 for i in range(64))),
),
(
"good_flood_advert_type",
"Flood, payload_type=4 (ADVERT)",
lambda: _make_flood_packet(payload=b"\xab\x01\x02\x03", payload_type=4),
),
(
"good_direct_minimal",
"Direct, 1-byte payload, single hop to us (forward with empty path)",
lambda: _make_direct_packet(payload=b"\x01", path=bytes([LOCAL_HASH])),
),
(
"good_direct_multihop",
"Direct, 3-hop remaining path (us + 2 more)",
lambda: _make_direct_packet(payload=b"\xca\xfe", path=bytes([LOCAL_HASH, 0x11, 0x22])),
),
(
"good_direct_long_payload",
"Direct, 150-byte payload",
lambda: _make_direct_packet(payload=bytes(range(150)), path=bytes([LOCAL_HASH, 0xBB])),
),
(
"good_direct_type_2",
"Direct, payload_type=2 (ACK)",
lambda: _make_direct_packet(payload=b"\x01\x02", path=bytes([LOCAL_HASH]), payload_type=2),
),
(
"good_direct_long_remaining_path",
"Direct, 10 hops remaining after us",
lambda: _make_direct_packet(
payload=b"\xff\xee", path=bytes([LOCAL_HASH] + list(range(10)))
),
),
(
"good_transport_direct_basic",
"Transport direct, basic hop to us",
lambda: _make_transport_direct_packet(payload=b"\x01\x02"),
),
(
"good_transport_direct_long_path",
"Transport direct, 5 remaining hops",
lambda: _make_transport_direct_packet(
payload=b"\xde\xad\xbe\xef", path=bytes([LOCAL_HASH, 0x11, 0x22, 0x33, 0x44])
),
),
]
# ---- 20 BAD packets: all should be dropped / return None ----
BAD_PACKETS = [
# (id, description, builder)
(
"bad_empty_payload",
"Empty bytearray payload",
lambda: _make_flood_packet(payload=b""),
"Empty payload",
),
(
"bad_none_payload",
"payload = None",
lambda: (lambda p: (setattr(p, "payload", None), p)[-1])(_make_flood_packet()),
"Empty payload",
),
(
"bad_path_at_max",
"Path exactly MAX_PATH_SIZE — no room to append",
lambda: _make_flood_packet(payload=b"\x01", path=bytes(range(MAX_PATH_SIZE))),
"Path length",
),
(
"bad_flood_path_near_max",
"Flood, path = MAX_PATH_SIZE - 1 (63 hops; path_len encodes 0-63, cannot append)",
lambda: _make_flood_packet(payload=b"\xff", path=bytes(range(MAX_PATH_SIZE - 1))),
"cannot append",
),
(
"bad_path_over_max",
"Path exceeds MAX_PATH_SIZE",
lambda: _make_flood_packet(payload=b"\x01", path=bytes(range(MAX_PATH_SIZE + 5))),
"Path length",
),
(
"bad_do_not_retransmit",
"Marked do-not-retransmit",
lambda: (lambda p: (p.mark_do_not_retransmit(), p)[-1])(_make_flood_packet()),
"do not retransmit",
),
(
"bad_direct_wrong_hop",
"Direct packet, path[0] != LOCAL_HASH",
lambda: _make_direct_packet(path=bytes([0xFF, 0xCC])),
"not for us",
),
(
"bad_direct_empty_path",
"Direct packet with empty path",
lambda: _make_direct_packet(path=b""),
"no path",
),
(
"bad_direct_none_path",
"Direct packet with path = None",
lambda: (lambda p: (setattr(p, "path", None), setattr(p, "path_len", 0), p)[-1])(
_make_direct_packet()
),
"no path",
),
(
"bad_flood_policy_off",
"Plain flood when unscoped_flood_allow=False (needs config override)",
lambda: _make_flood_packet(payload=b"\x01\x02"),
"unscoped flood",
),
(
"bad_transport_flood_no_keys",
"Transport flood with no configured transport keys — always denied",
lambda: _make_transport_flood_packet(payload=b"\x01\x02"),
"transport",
),
(
"bad_direct_empty_payload",
"Direct with empty payload (now caught by validate_packet)",
lambda: (
lambda p: (setattr(p, "payload", bytearray()), setattr(p, "payload_len", 0), p)[-1]
)(_make_direct_packet(path=bytes([LOCAL_HASH]))),
"Empty payload",
),
(
"bad_flood_zero_len_payload",
"Flood with payload_len forced to 0",
lambda: (
lambda p: (setattr(p, "payload_len", 0), setattr(p, "payload", bytearray()), p)[-1]
)(_make_flood_packet(payload=b"\x01")),
"Empty payload",
),
(
"bad_direct_only_wrong_hops",
"Direct path of all 0xFF bytes (none match LOCAL_HASH)",
lambda: _make_direct_packet(path=bytes([0xFF, 0xFE, 0xFD])),
"not for us",
),
(
"bad_transport_direct_wrong_hop",
"Transport direct with wrong first hop",
lambda: _make_transport_direct_packet(path=bytes([0x01, 0x02])),
"not for us",
),
(
"bad_transport_direct_empty_path",
"Transport direct with empty path",
lambda: _make_transport_direct_packet(path=b""),
"no path",
),
(
"bad_transport_direct_none_path",
"Transport direct with path = None",
lambda: (lambda p: (setattr(p, "path", None), setattr(p, "path_len", 0), p)[-1])(
_make_transport_direct_packet()
),
"no path",
),
(
"bad_flood_payload_255_zeros",
"Flood with payload = bytearray(0) (empty)",
lambda: (
lambda p: (setattr(p, "payload", bytearray()), setattr(p, "payload_len", 0), p)[-1]
)(_make_flood_packet()),
"Empty payload",
),
(
"bad_direct_none_payload",
"Direct with None payload (now caught by validate_packet)",
lambda: (lambda p: (setattr(p, "payload", None), p)[-1])(
_make_direct_packet(path=bytes([LOCAL_HASH]))
),
"Empty payload",
),
(
"bad_flood_do_not_retransmit_custom",
"Flood, do-not-retransmit with custom drop reason",
lambda: (
lambda p: (p.mark_do_not_retransmit(), setattr(p, "drop_reason", "Advert consumed"), p)[
-1
]
)(_make_flood_packet(payload=b"\xab")),
"Advert consumed",
),
(
"bad_direct_do_not_retransmit",
"Direct, marked do-not-retransmit (now caught by direct_forward)",
lambda: (lambda p: (p.mark_do_not_retransmit(), p)[-1])(
_make_direct_packet(payload=b"\x99", path=bytes([LOCAL_HASH, 0x11]))
),
"do not retransmit",
),
]
# Pytest ids for readable output
_good_ids = [g[0] for g in GOOD_PACKETS]
_bad_ids = [b[0] for b in BAD_PACKETS]
class TestGoodPacketArray:
"""All 20 good packets should be forwarded successfully."""
@pytest.mark.parametrize(
"name, desc, builder",
GOOD_PACKETS,
ids=_good_ids,
)
def test_process_packet_forwards(self, handler, name, desc, builder):
pkt = builder()
result = handler.process_packet(pkt, snr=5.0)
assert result is not None, f"[{name}] {desc} — unexpectedly dropped"
fwd_pkt, delay = result
assert delay >= 0.0
@pytest.mark.parametrize(
"name, desc, builder",
GOOD_PACKETS,
ids=_good_ids,
)
def test_good_packet_not_duplicate_on_first_see(self, handler, name, desc, builder):
pkt = builder()
assert handler.is_duplicate(pkt) is False, f"[{name}] falsely flagged as duplicate"
@pytest.mark.parametrize(
"name, desc, builder",
GOOD_PACKETS,
ids=_good_ids,
)
def test_good_packet_path_modified(self, handler, name, desc, builder):
pkt = builder()
route = pkt.header & PH_ROUTE_MASK
original_path = list(pkt.path) if pkt.path else []
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, _ = result
if route in (ROUTE_TYPE_FLOOD, ROUTE_TYPE_TRANSPORT_FLOOD):
assert fwd_pkt.path[-1] == LOCAL_HASH, f"[{name}] local hash not appended"
assert len(fwd_pkt.path) == len(original_path) + 1
else:
# Direct: first hop consumed
assert len(fwd_pkt.path) == len(original_path) - 1
class TestBadPacketArray:
"""All 20 bad packets should be dropped by the engine."""
@pytest.mark.parametrize(
"name, desc, builder, expected_reason",
BAD_PACKETS,
ids=_bad_ids,
)
def test_process_packet_drops(self, handler, name, desc, builder, expected_reason):
# Two entries need unscoped_flood_allow=False
if "policy_off" in name:
handler.config["mesh"]["unscoped_flood_allow"] = False
pkt = builder()
result = handler.process_packet(pkt, snr=5.0)
assert result is None, f"[{name}] {desc} — should have been dropped"
@pytest.mark.parametrize(
"name, desc, builder, expected_reason",
BAD_PACKETS,
ids=_bad_ids,
)
def test_drop_reason_set(self, handler, name, desc, builder, expected_reason):
if "policy_off" in name:
handler.config["mesh"]["unscoped_flood_allow"] = False
pkt = builder()
handler.process_packet(pkt, snr=5.0)
assert pkt.drop_reason is not None, f"[{name}] drop_reason not set"
assert expected_reason.lower() in pkt.drop_reason.lower(), (
f"[{name}] expected '{expected_reason}' in drop_reason, got '{pkt.drop_reason}'"
)
@pytest.mark.parametrize(
"name, desc, builder, expected_reason",
BAD_PACKETS,
ids=_bad_ids,
)
def test_bad_packet_not_marked_seen(self, handler, name, desc, builder, expected_reason):
"""Dropped packets must NOT pollute the seen cache."""
if "policy_off" in name:
handler.config["mesh"]["unscoped_flood_allow"] = False
pkt = builder()
handler.process_packet(pkt, snr=5.0)
# Should not be in seen_packets (except do-not-retransmit and policy
# packets which fail AFTER validation but BEFORE mark_seen)
# Actually none of these should be marked seen — the engine only
# calls mark_seen on packets that pass all checks.
pkt_hash = pkt.calculate_packet_hash().hex().upper() if pkt.payload else None
if pkt_hash:
assert pkt_hash not in handler.seen_packets, (
f"[{name}] bad packet was incorrectly added to seen cache"
)
class TestRecordPacketOnlyTrace:
"""record_packet_only must not log TRACE: TraceHelper owns trace path; packet.path is SNR."""
def test_record_packet_only_skips_trace(self, handler):
storage = handler.storage
storage.record_packet.reset_mock()
pkt = PacketBuilder.create_trace(tag=1, auth_code=2, flags=0, path=[0xAB, 0xCD])
n_before = len(handler.recent_packets)
handler.record_packet_only(pkt, {"rssi": -80, "snr": 10.0})
storage.record_packet.assert_not_called()
assert len(handler.recent_packets) == n_before
# ===================================================================
# 18. Real packet injection through __call__
# ===================================================================
def _inject_from_wire(pkt: Packet) -> Packet:
"""Serialize and deserialize a packet to simulate a real RF wire packet."""
wire = pkt.write_to()
injected = Packet()
injected.read_from(wire)
return injected
@pytest.mark.asyncio
class TestPacketInjectionRouting:
"""Inject real serialized packets through __call__ and assert routing outcomes."""
@staticmethod
def _prepare_fast_tx(handler):
handler.airtime_mgr.calculate_airtime = MagicMock(return_value=20.0)
handler.airtime_mgr.can_transmit = MagicMock(return_value=(True, 0.0))
handler.airtime_mgr.record_tx = MagicMock()
handler.airtime_mgr.record_rx = MagicMock()
async def test_injected_flood_forwards_and_appends_path(self, handler):
self._prepare_fast_tx(handler)
pkt = _inject_from_wire(_make_flood_packet(payload=b"\x10\x20\x30", path=b"\x11"))
with (
patch.object(handler, "_calculate_tx_delay", return_value=0.0),
patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock),
):
await handler(pkt, {"snr": 7.0, "rssi": -70}, local_transmission=False)
assert handler.dispatcher.send_packet.call_count == 1
sent_pkt = handler.dispatcher.send_packet.call_args.args[0]
assert bytes(sent_pkt.path) == bytes([0x11, LOCAL_HASH])
assert handler.rx_count == 1
assert handler.forwarded_count == 1
assert handler.dropped_count == 0
async def test_injected_direct_forwards_and_consumes_hop(self, handler):
self._prepare_fast_tx(handler)
pkt = _inject_from_wire(
_make_direct_packet(payload=b"\xaa\xbb", path=bytes([LOCAL_HASH, 0x44, 0x55]))
)
with (
patch.object(handler, "_calculate_tx_delay", return_value=0.0),
patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock),
):
await handler(pkt, {"snr": 3.0, "rssi": -82}, local_transmission=False)
assert handler.dispatcher.send_packet.call_count == 1
sent_pkt = handler.dispatcher.send_packet.call_args.args[0]
assert bytes(sent_pkt.path) == b"\x44\x55"
async def test_direct_for_other_node_is_dropped(self, handler):
pkt = _inject_from_wire(_make_direct_packet(payload=b"\xaa\xbb", path=b"\xfe\x44"))
with patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock):
await handler(pkt, {"snr": 2.0, "rssi": -90}, local_transmission=False)
assert handler.dispatcher.send_packet.call_count == 0
assert handler.dropped_count == 1
assert "not for us" in (handler.recent_packets[-1]["drop_reason"] or "")
async def test_duplicate_wire_packet_not_retransmitted(self, handler):
self._prepare_fast_tx(handler)
incoming = _make_flood_packet(payload=b"\x99\x88\x77", path=b"\x01")
pkt1 = _inject_from_wire(incoming)
pkt2 = _inject_from_wire(incoming)
with (
patch.object(handler, "_calculate_tx_delay", return_value=0.0),
patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock),
):
await handler(pkt1, {"snr": 6.0, "rssi": -75}, local_transmission=False)
await handler(pkt2, {"snr": 5.5, "rssi": -76}, local_transmission=False)
assert handler.dispatcher.send_packet.call_count == 1
assert handler.dropped_count == 1
assert handler.flood_dup_count == 1
original = handler.recent_packets[-1]
assert "duplicates" in original
assert len(original["duplicates"]) == 1
assert original["duplicates"][0]["drop_reason"] == "Duplicate"
async def test_transport_flood_injection_honors_transport_key_decision(self, handler):
pkt = _inject_from_wire(_make_transport_flood_packet(payload=b"\x01\x02\x03\x04", path=b""))
with (
patch.object(handler, "_check_transport_codes", return_value=(False, "denied")),
patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock),
):
await handler(pkt, {"snr": 0.0, "rssi": -92}, local_transmission=False)
assert handler.dispatcher.send_packet.call_count == 0
assert "transport" in (handler.recent_packets[-1]["drop_reason"] or "").lower()
async def test_local_tx_then_rf_echo_is_duplicate(self, handler):
self._prepare_fast_tx(handler)
local_pkt = _make_flood_packet(payload=b"\x0a\x0b\x0c", path=b"")
with (
patch.object(handler, "_calculate_tx_delay", return_value=0.0),
patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock),
):
await handler(local_pkt, {"snr": 0.0, "rssi": -50}, local_transmission=True)
rf_echo = _inject_from_wire(_make_flood_packet(payload=b"\x0a\x0b\x0c", path=b""))
await handler(rf_echo, {"snr": 0.0, "rssi": -70}, local_transmission=False)
assert handler.dispatcher.send_packet.call_count == 1
assert handler.dropped_count == 1
assert handler.flood_dup_count == 1
original = handler.recent_packets[-1]
assert "duplicates" in original
assert len(original["duplicates"]) == 1
assert original["duplicates"][0]["drop_reason"] == "Duplicate"
@pytest.mark.parametrize("payload_type", range(16))
async def test_all_payload_types_flood_injection_forwards(self, handler, payload_type):
self._prepare_fast_tx(handler)
pkt = _inject_from_wire(
_make_flood_packet(
payload=bytes([payload_type, 0xA5, 0x5A]),
path=b"\x11",
payload_type=payload_type,
)
)
with (
patch.object(handler, "_calculate_tx_delay", return_value=0.0),
patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock),
):
await handler(pkt, {"snr": 4.0, "rssi": -78}, local_transmission=False)
assert handler.dispatcher.send_packet.call_count == 1
sent_pkt = handler.dispatcher.send_packet.call_args.args[0]
assert sent_pkt.get_payload_type() == payload_type
assert sent_pkt.path[-1] == LOCAL_HASH
@pytest.mark.parametrize("payload_type", range(16))
async def test_all_payload_types_direct_injection_forwards(self, handler, payload_type):
self._prepare_fast_tx(handler)
pkt = _inject_from_wire(
_make_direct_packet(
payload=bytes([payload_type, 0x01, 0x02]),
path=bytes([LOCAL_HASH, 0x44, 0x55]),
payload_type=payload_type,
)
)
with (
patch.object(handler, "_calculate_tx_delay", return_value=0.0),
patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock),
):
await handler(pkt, {"snr": 2.5, "rssi": -84}, local_transmission=False)
assert handler.dispatcher.send_packet.call_count == 1
sent_pkt = handler.dispatcher.send_packet.call_args.args[0]
assert sent_pkt.get_payload_type() == payload_type
assert bytes(sent_pkt.path) == b"\x44\x55"
@pytest.mark.parametrize("payload_type", range(16))
async def test_all_payload_types_transport_flood_injection_forwards(
self, handler, payload_type
):
self._prepare_fast_tx(handler)
pkt = _inject_from_wire(
_make_transport_flood_packet(
payload=bytes([payload_type, 0x33, 0x44]),
path=b"",
payload_type=payload_type,
transport_codes=(0x1111, 0x2222),
)
)
with (
patch.object(handler, "_check_transport_codes", return_value=(True, "")),
patch.object(handler, "_calculate_tx_delay", return_value=0.0),
patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock),
):
await handler(pkt, {"snr": 1.0, "rssi": -88}, local_transmission=False)
assert handler.dispatcher.send_packet.call_count == 1
sent_pkt = handler.dispatcher.send_packet.call_args.args[0]
assert sent_pkt.get_payload_type() == payload_type
assert sent_pkt.transport_codes == [0x1111, 0x2222]
@pytest.mark.parametrize("payload_type", range(16))
async def test_all_payload_types_transport_direct_injection_forwards(
self, handler, payload_type
):
self._prepare_fast_tx(handler)
pkt = _inject_from_wire(
_make_transport_direct_packet(
payload=bytes([payload_type, 0x66, 0x77]),
path=bytes([LOCAL_HASH, 0x22]),
payload_type=payload_type,
transport_codes=(0x3333, 0x4444),
)
)
with (
patch.object(handler, "_calculate_tx_delay", return_value=0.0),
patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock),
):
await handler(pkt, {"snr": 3.0, "rssi": -83}, local_transmission=False)
assert handler.dispatcher.send_packet.call_count == 1
sent_pkt = handler.dispatcher.send_packet.call_args.args[0]
assert sent_pkt.get_payload_type() == payload_type
assert bytes(sent_pkt.path) == b"\x22"
assert sent_pkt.transport_codes == [0x3333, 0x4444]
# ===================================================================
# 19. Missed branch coverage (background/transport helpers)
# ===================================================================
class TestMissedEngineBranches:
"""Target previously untested helper/lifecycle branches in RepeaterHandler."""
def test_check_transport_codes_accepts_matching_key_and_uses_cache(self, handler):
key_raw = b"0123456789ABCDEF"
key_b64 = base64.b64encode(key_raw).decode("ascii")
handler.storage.get_transport_keys.return_value = [
{
"id": 7,
"name": "primary",
"transport_key": key_b64,
"flood_policy": "allow",
}
]
pkt = _make_transport_flood_packet(payload=b"\x01\x02", path=b"")
pkt.transport_codes = [0xCAFE, 0xBEEF]
with patch("pymc_core.protocol.transport_keys.calc_transport_code", return_value=0xCAFE):
allowed_1, reason_1 = handler._check_transport_codes(pkt)
allowed_2, reason_2 = handler._check_transport_codes(pkt)
assert allowed_1 is True and reason_1 == ""
assert allowed_2 is True and reason_2 == ""
assert handler.storage.get_transport_keys.call_count == 1
assert handler.storage.update_transport_key.call_count == 2
def test_record_duplicate_groups_under_original(self, handler):
pkt = _make_flood_packet(payload=b"\x12\x34")
original_hash = pkt.calculate_packet_hash().hex().upper()[:16]
original_record = {
"timestamp": time.time(),
"packet_hash": original_hash,
"transmitted": True,
}
handler._append_recent_packet(original_record)
handler.record_duplicate(pkt, rssi=-90, snr=1.5)
assert handler.flood_dup_count == 1
assert "duplicates" in original_record
assert len(original_record["duplicates"]) == 1
assert original_record["duplicates"][0]["drop_reason"] == "Duplicate"
@pytest.mark.asyncio
async def test_record_crc_errors_async_records_positive_delta(self, handler):
handler.dispatcher.radio.crc_error_count = 9
handler._last_crc_error_count = 4
await handler._record_crc_errors_async()
handler.storage.record_crc_errors.assert_called_once_with(5)
assert handler._last_crc_error_count == 9
@pytest.mark.asyncio
async def test_record_noise_floor_async_caches_and_persists(self, handler):
with patch.object(handler, "get_noise_floor", return_value=-117.5):
await handler._record_noise_floor_async()
assert handler._cached_noise_floor == -117.5
handler.storage.record_noise_floor.assert_called_once_with(-117.5)
@pytest.mark.asyncio
async def test_send_periodic_advert_async_success_and_failure(self, handler):
handler.send_advert_func = AsyncMock(side_effect=[True, False])
await handler._send_periodic_advert_async()
await handler._send_periodic_advert_async()
assert handler.send_advert_func.await_count == 2
def test_cleanup_cancels_background_task_and_closes_storage(self, handler):
fake_task = MagicMock()
fake_task.done.return_value = False
handler._background_task = fake_task
handler.cleanup()
fake_task.cancel.assert_called_once()
handler.storage.close.assert_called_once()
# ===================================================================
# 20. Transmission and Background Lifecycle Branches
# ===================================================================
class TestEngineTransmissionAndBackgroundLifecycle:
"""Cover duty-cycle outcomes, packet-record robustness, and background timer lifecycle."""
@pytest.mark.asyncio
async def test_local_tx_defers_when_duty_cycle_blocked(self, handler):
pkt = _make_flood_packet(payload=b"\x21\x22")
handler.airtime_mgr.calculate_airtime = MagicMock(return_value=120.0)
handler.airtime_mgr.can_transmit = MagicMock(return_value=(False, 2.0))
with patch.object(handler, "_calculate_tx_delay", return_value=0.5):
loop = asyncio.get_running_loop()
completed = loop.create_future()
completed.set_result(True)
async def _fake_schedule(packet, delay, airtime_ms, local_transmission=False):
packet._tx_metadata = {
"lbt_attempts": 2,
"lbt_backoff_delays_ms": [10, 20],
"lbt_channel_busy": True,
}
return completed
handler.schedule_retransmit = AsyncMock(side_effect=_fake_schedule)
await handler(pkt, {"snr": 0.0, "rssi": -50}, local_transmission=True)
handler.schedule_retransmit.assert_awaited_once()
args = handler.schedule_retransmit.await_args.args
assert args[0] is pkt
assert args[1] == pytest.approx(2.5) # original delay + duty-cycle wait
assert args[2] == 120.0
assert handler.forwarded_count == 1
assert handler.dropped_count == 0
assert handler.recent_packets[-1]["lbt_attempts"] == 2
@pytest.mark.asyncio
async def test_local_deferred_tx_failure_decrements_forwarded_counter(self, handler):
pkt = _make_flood_packet(payload=b"\x23\x24")
handler.airtime_mgr.calculate_airtime = MagicMock(return_value=55.0)
handler.airtime_mgr.can_transmit = MagicMock(return_value=(False, 1.0))
loop = asyncio.get_running_loop()
failing = loop.create_future()
failing.set_exception(RuntimeError("deferred tx failed"))
handler.schedule_retransmit = AsyncMock(return_value=failing)
with patch.object(handler, "_calculate_tx_delay", return_value=0.2):
with pytest.raises(RuntimeError, match="deferred tx failed"):
await handler(pkt, {"snr": 0.0, "rssi": -52}, local_transmission=True)
assert handler.forwarded_count == 0
@pytest.mark.asyncio
async def test_non_local_drop_when_duty_cycle_blocked(self, handler):
pkt = _make_flood_packet(payload=b"\x31\x32")
handler.airtime_mgr.calculate_airtime = MagicMock(return_value=80.0)
handler.airtime_mgr.can_transmit = MagicMock(return_value=(False, 1.25))
handler.process_packet = MagicMock(return_value=(pkt, 0.1))
handler.schedule_retransmit = AsyncMock()
await handler(pkt, {"snr": 5.0, "rssi": -75}, local_transmission=False)
handler.schedule_retransmit.assert_not_awaited()
assert handler.dropped_count == 1
assert handler.forwarded_count == 0
assert handler.recent_packets[-1]["drop_reason"] == "Duty cycle limit"
@pytest.mark.asyncio
async def test_tx_failure_rolls_back_forwarded_counter(self, handler):
pkt = _make_flood_packet(payload=b"\x41\x42")
handler.airtime_mgr.calculate_airtime = MagicMock(return_value=40.0)
handler.airtime_mgr.can_transmit = MagicMock(return_value=(True, 0.0))
loop = asyncio.get_running_loop()
failing = loop.create_future()
failing.set_exception(RuntimeError("radio busy"))
handler.schedule_retransmit = AsyncMock(return_value=failing)
with patch.object(handler, "_calculate_tx_delay", return_value=0.0):
with pytest.raises(RuntimeError, match="radio busy"):
await handler(pkt, {"snr": 0.0, "rssi": -40}, local_transmission=True)
# Incremented before scheduling, decremented on failure path.
assert handler.forwarded_count == 0
def test_record_packet_only_missing_header_and_storage_failure(self, handler):
pkt = _make_flood_packet(payload=b"\x51\x52")
n_before = len(handler.recent_packets)
pkt.header = None
handler.record_packet_only(pkt, {"rssi": -70, "snr": 2.0})
assert len(handler.recent_packets) == n_before
pkt.header = ROUTE_TYPE_FLOOD | (0x01 << PH_TYPE_SHIFT)
handler.storage.record_packet.side_effect = RuntimeError("db down")
handler.record_packet_only(pkt, {"rssi": -70, "snr": 2.0})
# Storage failure should not append to recent list.
assert len(handler.recent_packets) == n_before
def test_log_trace_record_updates_counters_even_if_storage_fails(self, handler):
base_rx = handler.rx_count
base_fwd = handler.forwarded_count
base_drop = handler.dropped_count
handler.storage.record_packet.side_effect = RuntimeError("write fail")
handler.log_trace_record({"packet_hash": "ABC123", "transmitted": True})
handler.log_trace_record({"packet_hash": "DEF456", "transmitted": False})
assert handler.rx_count == base_rx + 2
assert handler.forwarded_count == base_fwd + 1
assert handler.dropped_count == base_drop + 1
def test_record_duplicate_direct_route_updates_duplicate_counters(self, handler):
pkt = _make_direct_packet(payload=b"\x61\x62", path=bytes([LOCAL_HASH, 0xAA]))
handler.record_duplicate(pkt, rssi=-88, snr=1.2)
assert handler.recv_direct_count == 1
assert handler.direct_dup_count == 1
def test_start_background_tasks_only_starts_once(self, handler):
marker_task = MagicMock(name="bg_task")
def _fake_create_task(coro):
coro.close()
return marker_task
with patch("repeater.engine.asyncio.create_task", side_effect=_fake_create_task) as mk_task:
handler._background_task = None
handler._start_background_tasks()
handler._start_background_tasks()
mk_task.assert_called_once()
assert handler._background_task is marker_task
@pytest.mark.asyncio
async def test_background_timer_loop_runs_tasks_and_handles_cancel(self, handler):
handler.last_noise_measurement = 0
handler.noise_floor_interval = 1
handler.send_advert_interval_hours = 1
handler.send_advert_func = AsyncMock()
handler.last_advert_time = 0
handler.last_cache_cleanup = 0
handler.last_db_cleanup = 0
handler.cleanup_cache = MagicMock()
handler._record_noise_floor_async = AsyncMock()
handler._record_crc_errors_async = AsyncMock()
handler._send_periodic_advert_async = AsyncMock()
with (
patch("repeater.engine.time.time", return_value=100000.0),
patch(
"repeater.engine.asyncio.sleep",
new_callable=AsyncMock,
side_effect=asyncio.CancelledError,
),
):
with pytest.raises(asyncio.CancelledError):
await handler._background_timer_loop()
handler._record_noise_floor_async.assert_awaited_once()
handler._record_crc_errors_async.assert_awaited_once()
handler._send_periodic_advert_async.assert_awaited_once()
handler.cleanup_cache.assert_called_once()
handler.storage.cleanup_old_data.assert_called_once()
@pytest.mark.asyncio
async def test_background_timer_loop_continues_when_db_cleanup_fails(self, handler):
handler.last_noise_measurement = 0
handler.noise_floor_interval = 999999
handler.send_advert_interval_hours = 0
handler.last_cache_cleanup = 0
handler.last_db_cleanup = 0
handler.cleanup_cache = MagicMock()
handler._record_noise_floor_async = AsyncMock()
handler._record_crc_errors_async = AsyncMock()
handler.storage.cleanup_old_data.side_effect = RuntimeError("cleanup error")
with (
patch("repeater.engine.time.time", return_value=100000.0),
patch(
"repeater.engine.asyncio.sleep",
new_callable=AsyncMock,
side_effect=asyncio.CancelledError,
),
):
with pytest.raises(asyncio.CancelledError):
await handler._background_timer_loop()
handler.storage.cleanup_old_data.assert_called_once()
@pytest.mark.asyncio
async def test_background_timer_loop_exception_restarts_task(self, handler):
handler._record_noise_floor_async = AsyncMock(side_effect=RuntimeError("boom"))
handler.last_noise_measurement = 0
handler.noise_floor_interval = 1
created = {}
def _fake_create_task(coro):
created["called"] = True
# Avoid leaking an un-awaited coroutine in the test process.
coro.close()
return "restarted-task"
with (
patch("repeater.engine.time.time", return_value=100000.0),
patch(
"repeater.engine.asyncio.sleep", new_callable=AsyncMock, return_value=None
) as sleep_mock,
patch("repeater.engine.asyncio.create_task", side_effect=_fake_create_task),
):
await handler._background_timer_loop()
sleep_mock.assert_awaited_once_with(30)
assert created.get("called") is True
assert handler._background_task == "restarted-task"
@pytest.mark.asyncio
async def test_record_noise_floor_handles_none_and_exceptions(self, handler):
with patch.object(handler, "get_noise_floor", return_value=None):
await handler._record_noise_floor_async()
handler.storage.record_noise_floor.assert_not_called()
with patch.object(handler, "get_noise_floor", side_effect=RuntimeError("noise fail")):
await handler._record_noise_floor_async()
@pytest.mark.asyncio
async def test_record_crc_errors_returns_without_storage_and_handles_storage_exception(
self, handler
):
# No storage configured: should return early.
handler.storage = None
await handler._record_crc_errors_async()
# Restore storage and force write error on positive delta.
handler.storage = MagicMock()
handler._last_crc_error_count = 1
handler.dispatcher.radio.crc_error_count = 3
handler.storage.record_crc_errors.side_effect = RuntimeError("crc write fail")
await handler._record_crc_errors_async()
@pytest.mark.asyncio
async def test_send_periodic_advert_handles_missing_handler_and_handler_exception(
self, handler
):
handler.send_advert_func = None
await handler._send_periodic_advert_async()
handler.send_advert_func = AsyncMock(side_effect=RuntimeError("advert fail"))
await handler._send_periodic_advert_async()
class TestEngineRecordAndCleanupHelpers:
"""Cover helper fallbacks that protect UI visibility and in-memory index integrity."""
def test_record_duplicate_appends_when_original_not_found(self, handler):
# Keep recent non-empty but ensure duplicate hash is not indexed.
handler._append_recent_packet({"packet_hash": "OTHERHASH", "transmitted": True})
pkt = _make_flood_packet(payload=b"\x71\x72")
handler.record_duplicate(pkt, rssi=-85, snr=1.0)
assert handler.recent_packets[-1]["drop_reason"] == "Duplicate"
assert (
handler.recent_packets[-1]["packet_hash"]
== pkt.calculate_packet_hash().hex().upper()[:16]
)
def test_record_duplicate_appends_when_recent_packets_empty(self, handler):
handler.recent_packets.clear()
handler._recent_hash_index.clear()
pkt = _make_flood_packet(payload=b"\x73\x74")
handler.record_duplicate(pkt, rssi=-82, snr=1.1)
assert len(handler.recent_packets) == 1
assert handler.recent_packets[0]["drop_reason"] == "Duplicate"
def test_record_duplicate_route_zero_maps_to_flood_counters(self, handler):
pkt = _make_flood_packet(payload=b"\x75\x76")
# Route nibble 0 is parsed as FLOOD in current protocol constants.
pkt.header = 0x00 << PH_TYPE_SHIFT
handler.record_duplicate(pkt, rssi=-90, snr=0.5)
assert handler.flood_dup_count == 1
assert handler.direct_dup_count == 0
def test_append_recent_packet_eviction_removes_matching_index_entry(self, handler):
handler.max_recent_packets = 1
old = {"packet_hash": "OLDHASH"}
handler.recent_packets.append(old)
handler._recent_hash_index["OLDHASH"] = old
handler._append_recent_packet({"packet_hash": "NEWHASH"})
assert "OLDHASH" not in handler._recent_hash_index
assert handler.recent_packets[-1]["packet_hash"] == "NEWHASH"
assert handler._recent_hash_index["NEWHASH"] is handler.recent_packets[-1]
def test_append_recent_packet_without_hash_skips_index_update(self, handler):
base_index = dict(handler._recent_hash_index)
handler._append_recent_packet({"timestamp": time.time()})
assert dict(handler._recent_hash_index) == base_index
def test_cleanup_handles_storage_close_exception(self, handler):
fake_task = MagicMock()
fake_task.done.return_value = False
handler._background_task = fake_task
handler.storage.close.side_effect = RuntimeError("close failed")
# cleanup should swallow close errors and not raise.
handler.cleanup()
fake_task.cancel.assert_called_once()