add loop detection configuration and tests for flood routing

This commit is contained in:
Lloyd
2026-03-05 14:06:28 +00:00
parent c54efa3412
commit 136af19178
3 changed files with 98 additions and 0 deletions

View File

@@ -115,6 +115,10 @@ mesh:
# Individual transport keys can override this setting
global_flood_allow: true
# Flood loop detection mode
# off = disabled, minimal = allow up to 3 self-hashes, moderate = allow up to 1, strict = allow 0
loop_detect: minimal
# Multiple Identity Configuration (Optional)
# Define additional identities for the repeater to manage
# Each identity operates independently with its own key pair and configuration

View File

@@ -29,6 +29,20 @@ logger = logging.getLogger("RepeaterHandler")
NOISE_FLOOR_INTERVAL = 30.0 # seconds
LOOP_DETECT_OFF = "off"
LOOP_DETECT_MINIMAL = "minimal"
LOOP_DETECT_MODERATE = "moderate"
LOOP_DETECT_STRICT = "strict"
# Thresholds for 1-byte path hashes loop detection.
# Count how many times our own hash already exists in the incoming FLOOD path.
# If occurrences >= threshold, treat as loop and drop.
LOOP_DETECT_MAX_COUNTERS = {
LOOP_DETECT_MINIMAL: 4,
LOOP_DETECT_MODERATE: 2,
LOOP_DETECT_STRICT: 1,
}
class RepeaterHandler(BaseHandler):
@@ -55,6 +69,9 @@ class RepeaterHandler(BaseHandler):
"send_advert_interval_hours", 10
)
self.last_advert_time = time.time()
self.loop_detect_mode = self._normalize_loop_detect_mode(
config.get("mesh", {}).get("loop_detect", LOOP_DETECT_OFF)
)
radio = dispatcher.radio if dispatcher else None
if radio:
@@ -401,6 +418,34 @@ class RepeaterHandler(BaseHandler):
return True, ""
def _normalize_loop_detect_mode(self, mode) -> str:
if isinstance(mode, str):
normalized = mode.strip().lower()
if normalized in {
LOOP_DETECT_OFF,
LOOP_DETECT_MINIMAL,
LOOP_DETECT_MODERATE,
LOOP_DETECT_STRICT,
}:
return normalized
return LOOP_DETECT_OFF
def _get_loop_detect_mode(self) -> str:
return self.loop_detect_mode
def _is_flood_looped(self, packet: Packet, mode: Optional[str] = None) -> bool:
mode = mode or self._get_loop_detect_mode()
if mode == LOOP_DETECT_OFF:
return False
max_counter = LOOP_DETECT_MAX_COUNTERS.get(mode)
if max_counter is None:
return False
path = packet.path or bytearray()
local_count = sum(1 for hop in path if hop == self.local_hash)
return local_count >= max_counter
def _check_transport_codes(self, packet: Packet) -> Tuple[bool, str]:
if not self.storage:
@@ -510,6 +555,11 @@ class RepeaterHandler(BaseHandler):
packet.drop_reason = "Global flood policy disabled"
return None
mode = self._get_loop_detect_mode()
if self._is_flood_looped(packet, mode):
packet.drop_reason = f"FLOOD loop detected ({mode})"
return None
# Suppress duplicates
if self.is_duplicate(packet):
packet.drop_reason = "Duplicate"
@@ -858,6 +908,9 @@ class RepeaterHandler(BaseHandler):
self.score_threshold = repeater_config.get("score_threshold", 0.3)
self.send_advert_interval_hours = repeater_config.get("send_advert_interval_hours", 10)
self.cache_ttl = repeater_config.get("cache_ttl", 60)
self.loop_detect_mode = self._normalize_loop_detect_mode(
self.config.get("mesh", {}).get("loop_detect", LOOP_DETECT_OFF)
)
# Note: Radio config changes require restart as they affect hardware
# Note: Airtime manager has its own config reference that gets updated

View File

@@ -46,6 +46,7 @@ def _make_config(**overrides) -> dict:
},
"mesh": {
"global_flood_allow": True,
"loop_detect": "off",
},
"delays": {
"tx_delay_factor": 1.0,
@@ -679,6 +680,46 @@ class TestGlobalFloodPolicy:
assert result is None
class TestFloodLoopDetection:
"""MeshCore-style loop detection for flood forwarding."""
def test_loop_detect_off_allows_looped_path(self, handler):
handler.config["mesh"]["loop_detect"] = "off"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([LOCAL_HASH, 0x11, LOCAL_HASH]))
result = handler.flood_forward(pkt)
assert result is not None
def test_loop_detect_minimal_drops_at_four(self, handler):
handler.config["mesh"]["loop_detect"] = "minimal"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([LOCAL_HASH, LOCAL_HASH, LOCAL_HASH, LOCAL_HASH]))
result = handler.flood_forward(pkt)
assert result is None
assert "loop detected" in (pkt.drop_reason or "").lower()
def test_loop_detect_minimal_allows_below_threshold(self, handler):
handler.config["mesh"]["loop_detect"] = "minimal"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([LOCAL_HASH, LOCAL_HASH, LOCAL_HASH]))
result = handler.flood_forward(pkt)
assert result is not None
def test_loop_detect_moderate_drops_at_two(self, handler):
handler.config["mesh"]["loop_detect"] = "moderate"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([LOCAL_HASH, 0x22, LOCAL_HASH]))
result = handler.flood_forward(pkt)
assert result is None
def test_loop_detect_strict_drops_at_one(self, handler):
handler.config["mesh"]["loop_detect"] = "strict"
handler.reload_runtime_config()
pkt = _make_flood_packet(path=bytes([0x33, LOCAL_HASH, 0x44]))
result = handler.flood_forward(pkt)
assert result is None
# ===================================================================
# 10. Airtime / duty-cycle integration
# ===================================================================