Files
pyMC_Repeater/tests/test_engine.py
agessaman c5c94fe60a feat: exclude TRACE packets from logging in RepeaterHandler and PacketRouter
- Updated record_packet_only method to skip logging for TRACE packets, as TraceHelper manages trace paths.
- Enhanced documentation to clarify the handling of TRACE packets in the web UI.
- Added tests to ensure TRACE packets are not recorded, maintaining data integrity.
2026-03-22 15:26:28 -07:00

1382 lines
53 KiB
Python

"""
tests for pyMC_Repeater engine.py — RepeaterHandler.
Covers: flood_forward, direct_forward, process_packet, duplicate detection,
mark_seen, validate_packet, packet scoring, TX delay, cache management,
airtime duty-cycle, TX mode (forward/monitor/no_tx), and config reloading.
"""
import asyncio
import copy
import math
import time
from collections import OrderedDict
from typing import Optional
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from pymc_core.protocol import Packet, PacketBuilder
from pymc_core.protocol.constants import (
MAX_PATH_SIZE,
PH_ROUTE_MASK,
PH_TYPE_SHIFT,
ROUTE_TYPE_DIRECT,
ROUTE_TYPE_FLOOD,
ROUTE_TYPE_TRANSPORT_DIRECT,
ROUTE_TYPE_TRANSPORT_FLOOD,
)
# ---------------------------------------------------------------------------
# Helpers — build minimal config / mocks needed by RepeaterHandler.__init__
# ---------------------------------------------------------------------------
LOCAL_HASH = 0xAB # repeater's own 1-byte path hash
def _make_config(**overrides) -> dict:
"""Return a minimal valid config dict for RepeaterHandler."""
cfg = {
"repeater": {
"mode": "forward",
"cache_ttl": 3600,
"use_score_for_tx": False,
"score_threshold": 0.3,
"send_advert_interval_hours": 0, # off in tests
"node_name": "test-node",
},
"mesh": {
"global_flood_allow": True,
"loop_detect": "off",
},
"delays": {
"tx_delay_factor": 1.0,
"direct_tx_delay_factor": 0.5,
},
"duty_cycle": {
"max_airtime_per_minute": 3600,
"enforcement_enabled": True,
},
"radio": {
"spreading_factor": 8,
"bandwidth": 125000,
"coding_rate": 8,
"preamble_length": 17,
},
}
# Merge overrides
for key, val in overrides.items():
if isinstance(val, dict) and key in cfg:
cfg[key].update(val)
else:
cfg[key] = val
return cfg
def _make_radio():
"""Return a mock radio with sensible defaults."""
radio = MagicMock()
radio.spreading_factor = 8
radio.bandwidth = 125000
radio.coding_rate = 8
radio.preamble_length = 17
radio.frequency = 915000000
radio.tx_power = 14
return radio
def _make_dispatcher(radio=None):
"""Return a mock dispatcher with a radio and local_identity."""
dispatcher = MagicMock()
dispatcher.radio = radio or _make_radio()
dispatcher.local_identity = MagicMock()
dispatcher.send_packet = AsyncMock()
return dispatcher
@pytest.fixture()
def handler():
"""Create a RepeaterHandler with mocked external dependencies."""
config = _make_config()
dispatcher = _make_dispatcher()
with (
patch("repeater.engine.StorageCollector"),
patch("repeater.engine.RepeaterHandler._start_background_tasks"),
):
from repeater.engine import RepeaterHandler
h = RepeaterHandler(config, dispatcher, LOCAL_HASH)
return h
def _make_flood_packet(payload: bytes = b"\x01\x02\x03\x04",
path: bytes = b"",
payload_type: int = 0x01) -> Packet:
"""Build a FLOOD-routed packet."""
pkt = Packet()
# header: route=FLOOD(0x01), payload_type shifted, version=0
pkt.header = ROUTE_TYPE_FLOOD | (payload_type << PH_TYPE_SHIFT)
pkt.payload = bytearray(payload)
pkt.payload_len = len(payload)
pkt.path = bytearray(path)
pkt.path_len = len(path)
return pkt
def _make_direct_packet(payload: bytes = b"\x01\x02\x03\x04",
path: bytes = None,
payload_type: int = 0x01) -> Packet:
"""Build a DIRECT-routed packet with path[0] == LOCAL_HASH by default."""
if path is None:
path = bytes([LOCAL_HASH, 0xCC, 0xDD])
pkt = Packet()
pkt.header = ROUTE_TYPE_DIRECT | (payload_type << PH_TYPE_SHIFT)
pkt.payload = bytearray(payload)
pkt.payload_len = len(payload)
pkt.path = bytearray(path)
pkt.path_len = len(path)
return pkt
def _make_transport_flood_packet(payload: bytes = b"\x01\x02\x03\x04",
path: bytes = b"",
transport_codes=(0x1234, 0x5678)) -> Packet:
"""Build a TRANSPORT_FLOOD-routed packet."""
pkt = Packet()
pkt.header = ROUTE_TYPE_TRANSPORT_FLOOD | (0x01 << PH_TYPE_SHIFT)
pkt.payload = bytearray(payload)
pkt.payload_len = len(payload)
pkt.path = bytearray(path)
pkt.path_len = len(path)
pkt.transport_codes = list(transport_codes)
return pkt
def _make_transport_direct_packet(payload: bytes = b"\x01\x02\x03\x04",
path: bytes = None,
transport_codes=(0x1234, 0x5678)) -> Packet:
"""Build a TRANSPORT_DIRECT-routed packet with path[0] == LOCAL_HASH."""
if path is None:
path = bytes([LOCAL_HASH, 0xCC])
pkt = Packet()
pkt.header = ROUTE_TYPE_TRANSPORT_DIRECT | (0x01 << PH_TYPE_SHIFT)
pkt.payload = bytearray(payload)
pkt.payload_len = len(payload)
pkt.path = bytearray(path)
pkt.path_len = len(path)
pkt.transport_codes = list(transport_codes)
return pkt
# ===================================================================
# 1. flood_forward
# ===================================================================
class TestFloodForward:
"""flood_forward: validation, duplicate suppression, path append."""
def test_valid_flood_returns_packet(self, handler):
pkt = _make_flood_packet()
result = handler.flood_forward(pkt)
assert result is not None
assert result is pkt # mutated in-place
def test_local_hash_appended_to_path(self, handler):
pkt = _make_flood_packet(path=b"\x11\x22")
result = handler.flood_forward(pkt)
assert result.path[-1] == LOCAL_HASH
assert list(result.path) == [0x11, 0x22, LOCAL_HASH]
assert result.path_len == 3
def test_empty_path_gets_local_hash(self, handler):
pkt = _make_flood_packet(path=b"")
result = handler.flood_forward(pkt)
assert list(result.path) == [LOCAL_HASH]
assert result.path_len == 1
def test_duplicate_flood_dropped(self, handler):
pkt = _make_flood_packet()
handler.flood_forward(pkt)
pkt2 = _make_flood_packet() # same payload → same hash
result = handler.flood_forward(pkt2)
assert result is None
assert pkt2.drop_reason == "Duplicate"
def test_empty_payload_rejected(self, handler):
pkt = _make_flood_packet(payload=b"")
pkt.payload_len = 0
result = handler.flood_forward(pkt)
assert result is None
assert "Empty payload" in pkt.drop_reason
def test_none_payload_rejected(self, handler):
pkt = _make_flood_packet()
pkt.payload = None
result = handler.flood_forward(pkt)
assert result is None
def test_path_at_max_rejected(self, handler):
pkt = _make_flood_packet(path=bytes(range(MAX_PATH_SIZE)))
result = handler.flood_forward(pkt)
assert result is None
assert "Path length" in pkt.drop_reason
def test_do_not_retransmit_dropped(self, handler):
pkt = _make_flood_packet()
pkt.mark_do_not_retransmit()
result = handler.flood_forward(pkt)
assert result is None
assert "do not retransmit" in pkt.drop_reason.lower()
def test_global_flood_deny_plain_flood(self, handler):
handler.config["mesh"]["global_flood_allow"] = False
pkt = _make_flood_packet()
# When global_flood_allow=False, flood_forward calls _check_transport_codes
# which will fail because there are no transport codes on a plain flood
result = handler.flood_forward(pkt)
assert result is None
def test_hash_computed_before_path_append(self, handler):
"""mark_seen must use the pre-append hash so duplicate detection works
when another node sends the same packet with or without our hash."""
pkt1 = _make_flood_packet(payload=b"\xAA\xBB")
hash_before = pkt1.calculate_packet_hash().hex().upper()
handler.flood_forward(pkt1)
# The hash stored in seen_packets should be the PRE-append hash
assert hash_before in handler.seen_packets
def test_path_len_updated_after_append(self, handler):
pkt = _make_flood_packet(path=b"\x11")
handler.flood_forward(pkt)
assert pkt.path_len == len(pkt.path) == 2
def test_different_payloads_not_duplicate(self, handler):
pkt1 = _make_flood_packet(payload=b"\x01")
pkt2 = _make_flood_packet(payload=b"\x02")
r1 = handler.flood_forward(pkt1)
r2 = handler.flood_forward(pkt2)
assert r1 is not None
assert r2 is not None
def test_path_none_is_handled(self, handler):
pkt = _make_flood_packet()
pkt.path = None
pkt.path_len = 0
result = handler.flood_forward(pkt)
assert result is not None
assert list(result.path) == [LOCAL_HASH]
# ===================================================================
# 2. direct_forward
# ===================================================================
class TestDirectForward:
"""direct_forward: next-hop check, path consumption, duplicate suppression."""
def test_valid_direct_returns_packet(self, handler):
pkt = _make_direct_packet()
result = handler.direct_forward(pkt)
assert result is not None
def test_path_consumed(self, handler):
pkt = _make_direct_packet(path=bytes([LOCAL_HASH, 0xCC, 0xDD]))
result = handler.direct_forward(pkt)
assert list(result.path) == [0xCC, 0xDD]
assert result.path_len == 2
def test_single_hop_path_consumed(self, handler):
"""Single hop to us: we strip and return packet with empty path (forward so it can reach destination)."""
pkt = _make_direct_packet(path=bytes([LOCAL_HASH]))
result = handler.direct_forward(pkt)
assert result is not None
assert list(result.path) == []
assert result.path_len == 0
def test_wrong_next_hop_dropped(self, handler):
pkt = _make_direct_packet(path=bytes([0xFF, 0xCC]))
result = handler.direct_forward(pkt)
assert result is None
assert "not for us" in pkt.drop_reason
def test_empty_path_dropped(self, handler):
pkt = _make_direct_packet(path=b"")
pkt.path_len = 0
result = handler.direct_forward(pkt)
assert result is None
assert "no path" in pkt.drop_reason
def test_none_path_dropped(self, handler):
pkt = Packet()
pkt.header = ROUTE_TYPE_DIRECT | (0x01 << PH_TYPE_SHIFT)
pkt.payload = bytearray(b"\x01\x02")
pkt.payload_len = 2
pkt.path = None
pkt.path_len = 0
result = handler.direct_forward(pkt)
assert result is None
def test_duplicate_direct_dropped(self, handler):
pkt = _make_direct_packet()
handler.direct_forward(pkt)
pkt2 = _make_direct_packet() # same payload → same hash
result = handler.direct_forward(pkt2)
assert result is None
assert pkt2.drop_reason == "Duplicate"
def test_hash_computed_before_path_consume(self, handler):
"""mark_seen hash must match the packet as received, before path[0] removal."""
pkt = _make_direct_packet(path=bytes([LOCAL_HASH, 0xCC]))
hash_before = pkt.calculate_packet_hash().hex().upper()
handler.direct_forward(pkt)
assert hash_before in handler.seen_packets
def test_path_len_updated_after_consume(self, handler):
pkt = _make_direct_packet(path=bytes([LOCAL_HASH, 0xCC, 0xDD]))
handler.direct_forward(pkt)
assert pkt.path_len == len(pkt.path) == 2
# ===================================================================
# 3. process_packet — route dispatch
# ===================================================================
class TestProcessPacket:
"""process_packet routes to flood_forward or direct_forward."""
def test_flood_route_dispatched(self, handler):
pkt = _make_flood_packet()
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, delay = result
# Flood appends local hash
assert fwd_pkt.path[-1] == LOCAL_HASH
def test_direct_route_dispatched(self, handler):
pkt = _make_direct_packet()
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, delay = result
# Direct consumed first hop
assert LOCAL_HASH not in fwd_pkt.path
def test_transport_flood_dispatched(self, handler):
pkt = _make_transport_flood_packet()
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, _ = result
assert fwd_pkt.path[-1] == LOCAL_HASH
def test_transport_direct_dispatched(self, handler):
pkt = _make_transport_direct_packet()
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
def test_unknown_route_type_dropped(self, handler):
"""A header with route bits = 0x03 is TRANSPORT_DIRECT which IS handled.
We hackily create a truly unknown route by patching both masks."""
pkt = _make_flood_packet()
# All 4 route types are handled. We can verify the fallback by
# ensuring flood forward fails, e.g. empty payload → returns None.
pkt.payload = None
result = handler.process_packet(pkt, snr=0.0)
assert result is None
def test_returns_tuple_with_delay(self, handler):
pkt = _make_flood_packet()
result = handler.process_packet(pkt, snr=5.0)
assert isinstance(result, tuple)
assert len(result) == 2
fwd_pkt, delay = result
assert isinstance(delay, float)
assert delay >= 0.0
def test_flood_forward_failure_returns_none(self, handler):
pkt = _make_flood_packet(payload=b"")
pkt.payload_len = 0
result = handler.process_packet(pkt, snr=0.0)
assert result is None
def test_direct_forward_failure_returns_none(self, handler):
pkt = _make_direct_packet(path=bytes([0xFF])) # wrong hop
result = handler.process_packet(pkt, snr=0.0)
assert result is None
# ===================================================================
# 4. is_duplicate / mark_seen / cache management
# ===================================================================
class TestDuplicateDetection:
"""Duplicate tracking, TTL clean-up, and cache eviction."""
def test_unseen_packet_not_duplicate(self, handler):
pkt = _make_flood_packet()
assert handler.is_duplicate(pkt) is False
def test_after_mark_seen_is_duplicate(self, handler):
pkt = _make_flood_packet()
handler.mark_seen(pkt)
assert handler.is_duplicate(pkt) is True
def test_different_packets_independent(self, handler):
pkt1 = _make_flood_packet(payload=b"\x01")
pkt2 = _make_flood_packet(payload=b"\x02")
handler.mark_seen(pkt1)
assert handler.is_duplicate(pkt1) is True
assert handler.is_duplicate(pkt2) is False
def test_cache_eviction_at_max_size(self, handler):
handler.max_cache_size = 5
packets = [_make_flood_packet(payload=bytes([i])) for i in range(6)]
for p in packets:
handler.mark_seen(p)
# Oldest (packets[0]) should have been evicted
assert handler.is_duplicate(packets[0]) is False
assert handler.is_duplicate(packets[5]) is True
def test_cleanup_cache_removes_expired(self, handler):
pkt = _make_flood_packet()
handler.mark_seen(pkt)
pkt_hash = pkt.calculate_packet_hash().hex().upper()
# Manually expire it
handler.seen_packets[pkt_hash] = time.time() - handler.cache_ttl - 1
handler.cleanup_cache()
assert handler.is_duplicate(pkt) is False
def test_cleanup_cache_keeps_fresh(self, handler):
pkt = _make_flood_packet()
handler.mark_seen(pkt)
handler.cleanup_cache()
assert handler.is_duplicate(pkt) is True
def test_mark_seen_stores_hex_upper_key(self, handler):
pkt = _make_flood_packet()
handler.mark_seen(pkt)
for key in handler.seen_packets:
assert key == key.upper()
# ===================================================================
# 5. validate_packet
# ===================================================================
class TestValidatePacket:
"""validate_packet: empty payload, oversized path."""
def test_valid_packet(self, handler):
pkt = _make_flood_packet()
valid, reason = handler.validate_packet(pkt)
assert valid is True
assert reason == ""
def test_empty_payload_fails(self, handler):
pkt = _make_flood_packet(payload=b"")
pkt.payload = None
valid, reason = handler.validate_packet(pkt)
assert valid is False
assert "Empty" in reason
def test_path_at_max_fails(self, handler):
pkt = _make_flood_packet(path=bytes(range(MAX_PATH_SIZE)))
valid, reason = handler.validate_packet(pkt)
assert valid is False
assert "MAX_PATH_SIZE" in reason
def test_path_one_below_max_passes(self, handler):
pkt = _make_flood_packet(path=bytes(range(MAX_PATH_SIZE - 1)))
valid, reason = handler.validate_packet(pkt)
assert valid is True
def test_none_packet(self, handler):
valid, reason = handler.validate_packet(None)
assert valid is False
# ===================================================================
# 6. calculate_packet_score — static method
# ===================================================================
class TestPacketScore:
"""Score: SNR thresholds, collision penalty, clamping."""
def test_below_threshold_returns_zero(self):
from repeater.engine import RepeaterHandler
# SF8 threshold is -10.0
score = RepeaterHandler.calculate_packet_score(snr=-15.0, packet_len=50, spreading_factor=8)
assert score == 0.0
def test_at_threshold_returns_zero(self):
from repeater.engine import RepeaterHandler
score = RepeaterHandler.calculate_packet_score(snr=-10.0, packet_len=50, spreading_factor=8)
assert score == 0.0
def test_above_threshold_positive(self):
from repeater.engine import RepeaterHandler
score = RepeaterHandler.calculate_packet_score(snr=0.0, packet_len=50, spreading_factor=8)
assert score > 0.0
def test_high_snr_high_score(self):
from repeater.engine import RepeaterHandler
score = RepeaterHandler.calculate_packet_score(snr=10.0, packet_len=10, spreading_factor=8)
assert score > 0.5
def test_long_packet_collision_penalty(self):
from repeater.engine import RepeaterHandler
short = RepeaterHandler.calculate_packet_score(snr=5.0, packet_len=10, spreading_factor=8)
long_ = RepeaterHandler.calculate_packet_score(snr=5.0, packet_len=250, spreading_factor=8)
assert short > long_
def test_score_clamped_to_0_1(self):
from repeater.engine import RepeaterHandler
score = RepeaterHandler.calculate_packet_score(snr=50.0, packet_len=1, spreading_factor=8)
assert 0.0 <= score <= 1.0
def test_sf_below_7_returns_zero(self):
from repeater.engine import RepeaterHandler
score = RepeaterHandler.calculate_packet_score(snr=10.0, packet_len=50, spreading_factor=6)
assert score == 0.0
def test_each_sf_has_different_threshold(self):
from repeater.engine import RepeaterHandler
scores = {}
for sf in (7, 8, 9, 10, 11, 12):
scores[sf] = RepeaterHandler.calculate_packet_score(snr=-5.0, packet_len=50, spreading_factor=sf)
# Higher SF → lower threshold → better reception at same SNR
# At SNR=-5, SF7 (threshold -7.5) should be worse than SF12 (threshold -20)
assert scores[12] > scores[7]
# ===================================================================
# 7. _calculate_tx_delay
# ===================================================================
class TestTxDelay:
"""TX delay: flood random, direct fixed, score adjustment, cap."""
def test_flood_delay_non_negative(self, handler):
pkt = _make_flood_packet()
delay = handler._calculate_tx_delay(pkt, snr=0.0)
assert delay >= 0.0
def test_flood_delay_capped_at_5s(self, handler):
handler.tx_delay_factor = 1000.0 # extreme multiplier
pkt = _make_flood_packet()
delay = handler._calculate_tx_delay(pkt, snr=0.0)
assert delay <= 5.0
def test_direct_delay_uses_factor(self, handler):
handler.direct_tx_delay_factor = 1.23
pkt = _make_direct_packet()
delay = handler._calculate_tx_delay(pkt, snr=0.0)
# Direct packets use direct_tx_delay_factor directly (in seconds)
# Score adjustment may change it, but base should be 1.23 when score is off
assert delay == pytest.approx(1.23, abs=0.01)
def test_score_adjustment_reduces_delay(self, handler):
handler.use_score_for_tx = True
pkt = _make_flood_packet(payload=b"\x01" * 50)
# High SNR → high score → shorter delay
delays = []
for _ in range(50):
d = handler._calculate_tx_delay(pkt, snr=10.0)
delays.append(d)
avg_high_snr = sum(delays) / len(delays)
# Low SNR → low score → longer delay
delays_low = []
for _ in range(50):
d = handler._calculate_tx_delay(pkt, snr=-5.0)
delays_low.append(d)
avg_low_snr = sum(delays_low) / len(delays_low)
# Using statistical comparison — average high-SNR delay should be lower
# (Both use random, but multiplier differs)
# This is non-deterministic; we just check score path doesn't crash
assert avg_high_snr >= 0.0
assert avg_low_snr >= 0.0
def test_zero_tx_delay_factor(self, handler):
handler.tx_delay_factor = 0.0
pkt = _make_flood_packet()
delay = handler._calculate_tx_delay(pkt, snr=0.0)
assert delay == 0.0
def test_transport_direct_uses_direct_delay(self, handler):
handler.direct_tx_delay_factor = 0.77
pkt = _make_transport_direct_packet()
delay = handler._calculate_tx_delay(pkt, snr=0.0)
assert delay == pytest.approx(0.77, abs=0.01)
# ===================================================================
# 8. Hash stability through forwarding operations
# ===================================================================
class TestHashStabilityThroughForwarding:
"""Verify hash is computed on original packet (before path mutation)."""
def test_flood_hash_unchanged_after_forward(self, handler):
pkt = _make_flood_packet(payload=b"\xDE\xAD")
hash_before = pkt.calculate_packet_hash().hex().upper()
handler.flood_forward(pkt)
# The hash stored should be the pre-modification hash
assert hash_before in handler.seen_packets
def test_direct_hash_unchanged_after_forward(self, handler):
pkt = _make_direct_packet(payload=b"\xBE\xEF",
path=bytes([LOCAL_HASH, 0xCC]))
hash_before = pkt.calculate_packet_hash().hex().upper()
handler.direct_forward(pkt)
assert hash_before in handler.seen_packets
def test_flood_second_identical_detected_as_duplicate(self, handler):
"""Two identical packets with the same payload (but path not yet modified)
should be correctly detected as duplicates."""
p1 = _make_flood_packet(payload=b"\xCA\xFE")
p2 = _make_flood_packet(payload=b"\xCA\xFE")
handler.flood_forward(p1)
result = handler.flood_forward(p2)
assert result is None
def test_direct_second_identical_detected_as_duplicate(self, handler):
p1 = _make_direct_packet(payload=b"\xCA\xFE",
path=bytes([LOCAL_HASH, 0x11]))
p2 = _make_direct_packet(payload=b"\xCA\xFE",
path=bytes([LOCAL_HASH, 0x11]))
handler.direct_forward(p1)
result = handler.direct_forward(p2)
assert result is None
# ===================================================================
# 9. Global flood policy
# ===================================================================
class TestGlobalFloodPolicy:
"""global_flood_allow=False blocks plain flood, transport checked."""
def test_flood_blocked_by_policy(self, handler):
handler.config["mesh"]["global_flood_allow"] = False
pkt = _make_flood_packet()
result = handler.flood_forward(pkt)
assert result is None
def test_direct_unaffected_by_flood_policy(self, handler):
handler.config["mesh"]["global_flood_allow"] = False
pkt = _make_direct_packet()
result = handler.direct_forward(pkt)
assert result is not None # direct is not blocked by flood policy
def test_transport_flood_checked_when_policy_off(self, handler):
handler.config["mesh"]["global_flood_allow"] = False
pkt = _make_transport_flood_packet()
# Will call _check_transport_codes which will fail (no storage keys)
result = handler.flood_forward(pkt)
assert result is None
class TestFloodLoopDetection:
"""MeshCore-style loop detection for flood forwarding."""
def test_loop_detect_off_allows_looped_path(self, handler):
handler.config["mesh"]["loop_detect"] = "off"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([LOCAL_HASH, 0x11, LOCAL_HASH]))
result = handler.flood_forward(pkt)
assert result is not None
def test_loop_detect_minimal_drops_at_four(self, handler):
handler.config["mesh"]["loop_detect"] = "minimal"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([LOCAL_HASH, LOCAL_HASH, LOCAL_HASH, LOCAL_HASH]))
result = handler.flood_forward(pkt)
assert result is None
assert "loop detected" in (pkt.drop_reason or "").lower()
def test_loop_detect_minimal_allows_below_threshold(self, handler):
handler.config["mesh"]["loop_detect"] = "minimal"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([LOCAL_HASH, LOCAL_HASH, LOCAL_HASH]))
result = handler.flood_forward(pkt)
assert result is not None
def test_loop_detect_moderate_drops_at_two(self, handler):
handler.config["mesh"]["loop_detect"] = "moderate"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([LOCAL_HASH, 0x22, LOCAL_HASH]))
result = handler.flood_forward(pkt)
assert result is None
def test_loop_detect_strict_drops_at_one(self, handler):
handler.config["mesh"]["loop_detect"] = "strict"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([0x33, LOCAL_HASH, 0x44]))
result = handler.flood_forward(pkt)
assert result is None
# ===================================================================
# 10. Airtime / duty-cycle integration
# ===================================================================
class TestAirtimeIntegration:
"""Airtime calculation and duty-cycle enforcement."""
def test_airtime_positive(self, handler):
airtime = handler.airtime_mgr.calculate_airtime(50)
assert airtime > 0.0
def test_can_transmit_fresh(self, handler):
can_tx, wait = handler.airtime_mgr.can_transmit(100.0)
assert can_tx is True
assert wait == 0.0
def test_cannot_transmit_after_exhaustion(self, handler):
# Fill up the budget
handler.airtime_mgr.record_tx(handler.airtime_mgr.max_airtime_per_minute)
can_tx, wait = handler.airtime_mgr.can_transmit(1.0)
assert can_tx is False
assert wait > 0.0
def test_duty_cycle_disabled(self, handler):
handler.config["duty_cycle"]["enforcement_enabled"] = False
handler.airtime_mgr.config = handler.config
handler.airtime_mgr.record_tx(999999)
can_tx, wait = handler.airtime_mgr.can_transmit(999999)
assert can_tx is True
def test_airtime_increases_with_packet_size(self, handler):
short = handler.airtime_mgr.calculate_airtime(10)
long_ = handler.airtime_mgr.calculate_airtime(200)
assert long_ > short
# ===================================================================
# 11. Config reload
# ===================================================================
class TestConfigReload:
"""reload_runtime_config updates in-memory state."""
def test_tx_delay_factor_reloaded(self, handler):
handler.config["delays"]["tx_delay_factor"] = 3.14
handler.reload_runtime_config()
assert handler.tx_delay_factor == 3.14
def test_direct_tx_delay_factor_reloaded(self, handler):
handler.config["delays"]["direct_tx_delay_factor"] = 2.5
handler.reload_runtime_config()
assert handler.direct_tx_delay_factor == 2.5
def test_use_score_for_tx_reloaded(self, handler):
handler.config["repeater"]["use_score_for_tx"] = True
handler.reload_runtime_config()
assert handler.use_score_for_tx is True
def test_cache_ttl_reloaded(self, handler):
handler.config["repeater"]["cache_ttl"] = 120
handler.reload_runtime_config()
assert handler.cache_ttl == 120
# ===================================================================
# 12. _get_drop_reason
# ===================================================================
class TestGetDropReason:
"""_get_drop_reason: determine why a packet was not forwarded."""
def test_duplicate_reason(self, handler):
pkt = _make_flood_packet()
handler.mark_seen(pkt)
reason = handler._get_drop_reason(pkt)
assert reason == "Duplicate"
def test_empty_payload_reason(self, handler):
pkt = _make_flood_packet(payload=b"")
pkt.payload_len = 0
pkt.payload = bytearray()
reason = handler._get_drop_reason(pkt)
assert "Empty" in reason
def test_path_too_long_reason(self, handler):
pkt = _make_flood_packet(path=bytes(range(MAX_PATH_SIZE)))
reason = handler._get_drop_reason(pkt)
assert "Path too long" in reason
def test_flood_policy_reason(self, handler):
handler.config["mesh"]["global_flood_allow"] = False
pkt = _make_flood_packet()
reason = handler._get_drop_reason(pkt)
assert "flood" in reason.lower()
def test_direct_not_for_us_reason(self, handler):
pkt = _make_direct_packet(path=bytes([0xFF, 0xCC]))
reason = handler._get_drop_reason(pkt)
assert "not for us" in reason
def test_direct_no_path_reason(self, handler):
pkt = _make_direct_packet(path=b"")
pkt.path_len = 0
reason = handler._get_drop_reason(pkt)
assert "no path" in reason
# ===================================================================
# 13. Transport route forwarding
# ===================================================================
class TestTransportForwarding:
"""TRANSPORT_FLOOD and TRANSPORT_DIRECT: packet routing through process_packet."""
def test_transport_flood_appends_path(self, handler):
pkt = _make_transport_flood_packet(path=b"\x11")
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, _ = result
assert fwd_pkt.path[-1] == LOCAL_HASH
assert len(fwd_pkt.path) == 2
def test_transport_direct_consumes_path(self, handler):
pkt = _make_transport_direct_packet(path=bytes([LOCAL_HASH, 0xCC]))
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, _ = result
assert list(fwd_pkt.path) == [0xCC]
def test_transport_codes_preserved_after_flood(self, handler):
pkt = _make_transport_flood_packet(transport_codes=(0xAAAA, 0xBBBB))
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, _ = result
assert fwd_pkt.transport_codes == [0xAAAA, 0xBBBB]
def test_transport_codes_preserved_after_direct(self, handler):
pkt = _make_transport_direct_packet(transport_codes=(0x1111, 0x2222))
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, _ = result
assert fwd_pkt.transport_codes == [0x1111, 0x2222]
# ===================================================================
# 14. Statistics tracking
# ===================================================================
class TestStatistics:
"""RX/TX/dropped counters and recent_packets list."""
def test_initial_counters_zero(self, handler):
assert handler.rx_count == 0
assert handler.forwarded_count == 0
assert handler.dropped_count == 0
def test_get_stats_returns_dict(self, handler):
with patch.object(handler, "storage", None):
stats = handler.get_stats()
assert "rx_count" in stats
assert "forwarded_count" in stats
assert "dropped_count" in stats
assert "local_hash" in stats
assert "uptime_seconds" in stats
def test_get_stats_local_hash_format(self, handler):
with patch.object(handler, "storage", None):
stats = handler.get_stats()
assert stats["local_hash"] == f"0x{LOCAL_HASH:02x}"
# ===================================================================
# 15. Edge cases and regression tests
# ===================================================================
class TestEdgeCases:
"""Miscellaneous edge cases and regressions."""
def test_path_as_list_converted_to_bytearray(self, handler):
"""flood_forward should handle path being a list (not bytearray)."""
pkt = _make_flood_packet()
pkt.path = [0x11, 0x22]
pkt.path_len = 2
result = handler.flood_forward(pkt)
assert result is not None
assert isinstance(result.path, bytearray)
assert list(result.path) == [0x11, 0x22, LOCAL_HASH]
def test_flood_forward_idempotent_on_second_call(self, handler):
"""Calling flood_forward again with the SAME packet object should
detect as duplicate (the first call already mark_seen'd it)."""
pkt = _make_flood_packet(payload=b"\xFF" * 10)
r1 = handler.flood_forward(pkt)
assert r1 is not None
# Now pkt has local_hash appended, but hash was computed pre-append.
# A new packet with same original payload should be duplicate.
pkt2 = _make_flood_packet(payload=b"\xFF" * 10)
r2 = handler.flood_forward(pkt2)
assert r2 is None
def test_large_payload_still_forwarded(self, handler):
pkt = _make_flood_packet(payload=bytes(range(256)) * 4)
result = handler.flood_forward(pkt)
assert result is not None
def test_payload_type_encoding_in_header(self, handler):
"""Verify header construction encodes payload_type correctly."""
for pt in range(16):
pkt = _make_flood_packet(payload=bytes([pt, 0xFF]), payload_type=pt)
assert pkt.get_payload_type() == pt
result = handler.flood_forward(pkt)
assert result is not None
def test_many_distinct_packets_all_forwarded(self, handler):
"""100 unique packets should all be forwarded (no false duplicates)."""
for i in range(100):
pkt = _make_flood_packet(payload=i.to_bytes(4, "big"))
result = handler.flood_forward(pkt)
assert result is not None, f"Packet {i} incorrectly detected as duplicate"
def test_cache_eviction_order_is_fifo(self, handler):
handler.max_cache_size = 3
pkts = [_make_flood_packet(payload=bytes([i])) for i in range(4)]
for p in pkts:
handler.mark_seen(p)
# First one evicted
assert handler.is_duplicate(pkts[0]) is False
# Last three still present
for p in pkts[1:]:
assert handler.is_duplicate(p) is True
def test_do_not_retransmit_with_custom_drop_reason(self, handler):
pkt = _make_flood_packet()
pkt.mark_do_not_retransmit()
pkt.drop_reason = "Custom reason"
result = handler.flood_forward(pkt)
assert result is None
# Custom reason should be preserved (not overwritten)
assert pkt.drop_reason == "Custom reason"
def test_monitor_mode_skips_processing(self, handler):
"""In monitor mode, process_packet is not called at all in __call__,
but process_packet itself doesn't check mode — that's done in __call__.
We test that process_packet works irrespective of mode."""
handler.config["repeater"]["mode"] = "monitor"
pkt = _make_flood_packet()
# process_packet doesn't check mode — should still work
result = handler.process_packet(pkt, snr=0.0)
assert result is not None
# ===================================================================
# 15b. TX mode: forward, monitor, no_tx
# ===================================================================
@pytest.mark.asyncio
class TestTxMode:
"""forward = repeat on; monitor = no repeat, local TX allowed; no_tx = all TX off."""
async def test_forward_mode_calls_process_packet_for_rx(self, handler):
"""In forward mode, a received packet (not local) triggers process_packet."""
handler.config["repeater"]["mode"] = "forward"
pkt = _make_flood_packet()
with patch.object(handler, "process_packet", wraps=handler.process_packet) as m:
await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=False)
m.assert_called_once()
async def test_monitor_mode_does_not_call_process_packet_for_rx(self, handler):
"""In monitor mode, a received packet does not trigger process_packet."""
handler.config["repeater"]["mode"] = "monitor"
pkt = _make_flood_packet()
with patch.object(handler, "process_packet") as m:
await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=False)
m.assert_not_called()
async def test_no_tx_mode_does_not_call_process_packet_for_rx(self, handler):
"""In no_tx mode, a received packet does not trigger process_packet."""
handler.config["repeater"]["mode"] = "no_tx"
pkt = _make_flood_packet()
with patch.object(handler, "process_packet") as m:
await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=False)
m.assert_not_called()
async def test_monitor_mode_allows_local_tx(self, handler):
"""In monitor mode, local_transmission=True still schedules send_packet."""
handler.config["repeater"]["mode"] = "monitor"
pkt = _make_flood_packet()
with patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock):
await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=True)
await asyncio.sleep(0) # flush scheduled task
handler.dispatcher.send_packet.assert_called_once()
async def test_no_tx_mode_blocks_local_tx(self, handler):
"""In no_tx mode, local_transmission=True does not schedule send_packet."""
handler.config["repeater"]["mode"] = "no_tx"
pkt = _make_flood_packet()
with patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock):
await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=True)
await asyncio.sleep(0)
handler.dispatcher.send_packet.assert_not_called()
async def test_forward_mode_allows_local_tx(self, handler):
"""In forward mode, local_transmission=True schedules send_packet."""
handler.config["repeater"]["mode"] = "forward"
pkt = _make_flood_packet()
with patch("repeater.engine.asyncio.sleep", new_callable=AsyncMock):
await handler(pkt, {"snr": 0.0, "rssi": -80}, local_transmission=True)
await asyncio.sleep(0)
handler.dispatcher.send_packet.assert_called_once()
# ===================================================================
# 16. Airtime calculation correctness
# ===================================================================
class TestAirtimeCalculation:
"""Semtech LoRa airtime formula validation."""
def test_known_airtime_sf7_125khz(self, handler):
"""SF7, 125kHz, CR4/5, 10-byte payload — well-known reference value."""
mgr = handler.airtime_mgr
# Override to known settings
at = mgr.calculate_airtime(10, spreading_factor=7, bandwidth_hz=125000,
coding_rate=5, preamble_len=8)
# Semtech calculator: ~36ms for these params
assert 30.0 < at < 50.0
def test_sf12_much_slower_than_sf7(self, handler):
mgr = handler.airtime_mgr
at7 = mgr.calculate_airtime(50, spreading_factor=7)
at12 = mgr.calculate_airtime(50, spreading_factor=12)
# SF12 is roughly 32x slower than SF7 per symbol
assert at12 > at7 * 10
def test_zero_payload_still_has_preamble(self, handler):
mgr = handler.airtime_mgr
at = mgr.calculate_airtime(0)
assert at > 0.0
# ===================================================================
# 17. Curated good-packet and bad-packet arrays
# ===================================================================
# ---- 20 GOOD packets: all should be forwarded by process_packet ----
GOOD_PACKETS = [
# (id, description, builder)
("good_flood_minimal",
"Flood, 1-byte payload, empty path",
lambda: _make_flood_packet(payload=b"\x01")),
("good_flood_typical",
"Flood, 10-byte payload, 2-hop path",
lambda: _make_flood_packet(payload=bytes(range(10)), path=b"\x11\x22")),
("good_flood_max_payload_type",
"Flood, payload_type=15 (max 4-bit)",
lambda: _make_flood_packet(payload=b"\xAA\xBB", payload_type=15)),
("good_flood_payload_type_0",
"Flood, payload_type=0 (plain text)",
lambda: _make_flood_packet(payload=b"\x01\x02\x03", payload_type=0)),
("good_flood_long_payload",
"Flood, 200-byte payload",
lambda: _make_flood_packet(payload=bytes(range(200)))),
("good_flood_single_byte_path",
"Flood, path has 1 prior hop",
lambda: _make_flood_packet(payload=b"\xDE\xAD", path=b"\x42")),
("good_flood_binary_payload",
"Flood, all-zero payload",
lambda: _make_flood_packet(payload=b"\x00" * 16)),
("good_flood_high_entropy",
"Flood, high-entropy random-looking payload",
lambda: _make_flood_packet(payload=bytes(i ^ 0xA5 for i in range(64)))),
("good_flood_advert_type",
"Flood, payload_type=4 (ADVERT)",
lambda: _make_flood_packet(payload=b"\xAB\x01\x02\x03", payload_type=4)),
("good_direct_minimal",
"Direct, 1-byte payload, single hop to us (forward with empty path)",
lambda: _make_direct_packet(payload=b"\x01", path=bytes([LOCAL_HASH]))),
("good_direct_multihop",
"Direct, 3-hop remaining path (us + 2 more)",
lambda: _make_direct_packet(payload=b"\xCA\xFE", path=bytes([LOCAL_HASH, 0x11, 0x22]))),
("good_direct_long_payload",
"Direct, 150-byte payload",
lambda: _make_direct_packet(payload=bytes(range(150)), path=bytes([LOCAL_HASH, 0xBB]))),
("good_direct_type_2",
"Direct, payload_type=2 (ACK)",
lambda: _make_direct_packet(payload=b"\x01\x02", path=bytes([LOCAL_HASH]),
payload_type=2)),
("good_direct_long_remaining_path",
"Direct, 10 hops remaining after us",
lambda: _make_direct_packet(payload=b"\xFF\xEE",
path=bytes([LOCAL_HASH] + list(range(10))))),
("good_transport_flood_basic",
"Transport flood, basic payload + transport codes",
lambda: _make_transport_flood_packet(payload=b"\x01\x02\x03\x04")),
("good_transport_flood_with_path",
"Transport flood, existing 3-hop path",
lambda: _make_transport_flood_packet(payload=b"\xAA\xBB", path=b"\x11\x22\x33")),
("good_transport_flood_max_codes",
"Transport flood, max uint16 transport codes",
lambda: _make_transport_flood_packet(payload=b"\xFF",
transport_codes=(0xFFFF, 0xFFFF))),
("good_transport_direct_basic",
"Transport direct, basic hop to us",
lambda: _make_transport_direct_packet(payload=b"\x01\x02")),
("good_transport_direct_long_path",
"Transport direct, 5 remaining hops",
lambda: _make_transport_direct_packet(
payload=b"\xDE\xAD\xBE\xEF",
path=bytes([LOCAL_HASH, 0x11, 0x22, 0x33, 0x44]))),
]
# ---- 20 BAD packets: all should be dropped / return None ----
BAD_PACKETS = [
# (id, description, builder)
("bad_empty_payload",
"Empty bytearray payload",
lambda: _make_flood_packet(payload=b""),
"Empty payload"),
("bad_none_payload",
"payload = None",
lambda: (lambda p: (setattr(p, "payload", None), p)[-1])(_make_flood_packet()),
"Empty payload"),
("bad_path_at_max",
"Path exactly MAX_PATH_SIZE — no room to append",
lambda: _make_flood_packet(payload=b"\x01", path=bytes(range(MAX_PATH_SIZE))),
"Path length"),
("bad_flood_path_near_max",
"Flood, path = MAX_PATH_SIZE - 1 (63 hops; path_len encodes 0-63, cannot append)",
lambda: _make_flood_packet(payload=b"\xFF", path=bytes(range(MAX_PATH_SIZE - 1))),
"cannot append"),
("bad_path_over_max",
"Path exceeds MAX_PATH_SIZE",
lambda: _make_flood_packet(payload=b"\x01", path=bytes(range(MAX_PATH_SIZE + 5))),
"Path length"),
("bad_do_not_retransmit",
"Marked do-not-retransmit",
lambda: (lambda p: (p.mark_do_not_retransmit(), p)[-1])(_make_flood_packet()),
"do not retransmit"),
("bad_direct_wrong_hop",
"Direct packet, path[0] != LOCAL_HASH",
lambda: _make_direct_packet(path=bytes([0xFF, 0xCC])),
"not for us"),
("bad_direct_empty_path",
"Direct packet with empty path",
lambda: _make_direct_packet(path=b""),
"no path"),
("bad_direct_none_path",
"Direct packet with path = None",
lambda: (lambda p: (setattr(p, "path", None), setattr(p, "path_len", 0), p)[-1])(
_make_direct_packet()),
"no path"),
("bad_flood_policy_off",
"Plain flood when global_flood_allow=False (needs config override)",
lambda: _make_flood_packet(payload=b"\x01\x02"),
"transport codes"),
("bad_transport_flood_policy_off",
"Transport flood when policy off (no valid transport key)",
lambda: _make_transport_flood_packet(payload=b"\x01\x02"),
"transport"),
("bad_direct_empty_payload",
"Direct with empty payload (now caught by validate_packet)",
lambda: (lambda p: (setattr(p, "payload", bytearray()), setattr(p, "payload_len", 0), p)[-1])(
_make_direct_packet(path=bytes([LOCAL_HASH]))),
"Empty payload"),
("bad_flood_zero_len_payload",
"Flood with payload_len forced to 0",
lambda: (lambda p: (setattr(p, "payload_len", 0), setattr(p, "payload", bytearray()), p)[-1])(
_make_flood_packet(payload=b"\x01")),
"Empty payload"),
("bad_direct_only_wrong_hops",
"Direct path of all 0xFF bytes (none match LOCAL_HASH)",
lambda: _make_direct_packet(path=bytes([0xFF, 0xFE, 0xFD])),
"not for us"),
("bad_transport_direct_wrong_hop",
"Transport direct with wrong first hop",
lambda: _make_transport_direct_packet(path=bytes([0x01, 0x02])),
"not for us"),
("bad_transport_direct_empty_path",
"Transport direct with empty path",
lambda: _make_transport_direct_packet(path=b""),
"no path"),
("bad_transport_direct_none_path",
"Transport direct with path = None",
lambda: (lambda p: (setattr(p, "path", None), setattr(p, "path_len", 0), p)[-1])(
_make_transport_direct_packet()),
"no path"),
("bad_flood_payload_255_zeros",
"Flood with payload = bytearray(0) (empty)",
lambda: (lambda p: (setattr(p, "payload", bytearray()), setattr(p, "payload_len", 0), p)[-1])(
_make_flood_packet()),
"Empty payload"),
("bad_direct_none_payload",
"Direct with None payload (now caught by validate_packet)",
lambda: (lambda p: (setattr(p, "payload", None), p)[-1])(
_make_direct_packet(path=bytes([LOCAL_HASH]))),
"Empty payload"),
("bad_flood_do_not_retransmit_custom",
"Flood, do-not-retransmit with custom drop reason",
lambda: (lambda p: (p.mark_do_not_retransmit(), setattr(p, "drop_reason", "Advert consumed"), p)[-1])(
_make_flood_packet(payload=b"\xAB")),
"Advert consumed"),
("bad_direct_do_not_retransmit",
"Direct, marked do-not-retransmit (now caught by direct_forward)",
lambda: (lambda p: (p.mark_do_not_retransmit(), p)[-1])(
_make_direct_packet(payload=b"\x99", path=bytes([LOCAL_HASH, 0x11]))),
"do not retransmit"),
]
# Pytest ids for readable output
_good_ids = [g[0] for g in GOOD_PACKETS]
_bad_ids = [b[0] for b in BAD_PACKETS]
class TestGoodPacketArray:
"""All 20 good packets should be forwarded successfully."""
@pytest.mark.parametrize(
"name, desc, builder", GOOD_PACKETS, ids=_good_ids,
)
def test_process_packet_forwards(self, handler, name, desc, builder):
pkt = builder()
result = handler.process_packet(pkt, snr=5.0)
assert result is not None, f"[{name}] {desc} — unexpectedly dropped"
fwd_pkt, delay = result
assert delay >= 0.0
@pytest.mark.parametrize(
"name, desc, builder", GOOD_PACKETS, ids=_good_ids,
)
def test_good_packet_not_duplicate_on_first_see(self, handler, name, desc, builder):
pkt = builder()
assert handler.is_duplicate(pkt) is False, f"[{name}] falsely flagged as duplicate"
@pytest.mark.parametrize(
"name, desc, builder", GOOD_PACKETS, ids=_good_ids,
)
def test_good_packet_path_modified(self, handler, name, desc, builder):
pkt = builder()
route = pkt.header & PH_ROUTE_MASK
original_path = list(pkt.path) if pkt.path else []
result = handler.process_packet(pkt, snr=5.0)
assert result is not None
fwd_pkt, _ = result
if route in (ROUTE_TYPE_FLOOD, ROUTE_TYPE_TRANSPORT_FLOOD):
assert fwd_pkt.path[-1] == LOCAL_HASH, f"[{name}] local hash not appended"
assert len(fwd_pkt.path) == len(original_path) + 1
else:
# Direct: first hop consumed
assert len(fwd_pkt.path) == len(original_path) - 1
class TestBadPacketArray:
"""All 20 bad packets should be dropped by the engine."""
@pytest.mark.parametrize(
"name, desc, builder, expected_reason",
BAD_PACKETS, ids=_bad_ids,
)
def test_process_packet_drops(self, handler, name, desc, builder, expected_reason):
# Two entries need global_flood_allow=False
if "policy_off" in name:
handler.config["mesh"]["global_flood_allow"] = False
pkt = builder()
result = handler.process_packet(pkt, snr=5.0)
assert result is None, f"[{name}] {desc} — should have been dropped"
@pytest.mark.parametrize(
"name, desc, builder, expected_reason",
BAD_PACKETS, ids=_bad_ids,
)
def test_drop_reason_set(self, handler, name, desc, builder, expected_reason):
if "policy_off" in name:
handler.config["mesh"]["global_flood_allow"] = False
pkt = builder()
handler.process_packet(pkt, snr=5.0)
assert pkt.drop_reason is not None, f"[{name}] drop_reason not set"
assert expected_reason.lower() in pkt.drop_reason.lower(), (
f"[{name}] expected '{expected_reason}' in drop_reason, got '{pkt.drop_reason}'"
)
@pytest.mark.parametrize(
"name, desc, builder, expected_reason",
BAD_PACKETS, ids=_bad_ids,
)
def test_bad_packet_not_marked_seen(self, handler, name, desc, builder, expected_reason):
"""Dropped packets must NOT pollute the seen cache."""
if "policy_off" in name:
handler.config["mesh"]["global_flood_allow"] = False
pkt = builder()
handler.process_packet(pkt, snr=5.0)
# Should not be in seen_packets (except do-not-retransmit and policy
# packets which fail AFTER validation but BEFORE mark_seen)
# Actually none of these should be marked seen — the engine only
# calls mark_seen on packets that pass all checks.
pkt_hash = pkt.calculate_packet_hash().hex().upper() if pkt.payload else None
if pkt_hash:
assert pkt_hash not in handler.seen_packets, (
f"[{name}] bad packet was incorrectly added to seen cache"
)
class TestRecordPacketOnlyTrace:
"""record_packet_only must not log TRACE: TraceHelper owns trace path; packet.path is SNR."""
def test_record_packet_only_skips_trace(self, handler):
storage = handler.storage
storage.record_packet.reset_mock()
pkt = PacketBuilder.create_trace(tag=1, auth_code=2, flags=0, path=[0xAB, 0xCD])
n_before = len(handler.recent_packets)
handler.record_packet_only(pkt, {"rssi": -80, "snr": 10.0})
storage.record_packet.assert_not_called()
assert len(handler.recent_packets) == n_before