Files
pyMC_Repeater/repeater/engine.py
agessaman 744826199e feat: implement duplicate packet recording for UI visibility in RepeaterHandler
- Added record_duplicate method to RepeaterHandler to log known duplicate packets without forwarding.
- Enhanced RepeaterDaemon to subscribe to raw packets for deduplication logging, ensuring all path variants are visible in the UI.
- Updated recent_packets management to group duplicates under their original packets for better tracking.
2026-03-23 17:02:29 -07:00

1296 lines
52 KiB
Python

import asyncio
import copy
import logging
import struct
import time
from collections import OrderedDict
from typing import Optional, Tuple
from pymc_core.node.handlers.base import BaseHandler
from pymc_core.protocol import Packet
from pymc_core.protocol.constants import (
MAX_PATH_SIZE,
PAYLOAD_TYPE_ADVERT,
PAYLOAD_TYPE_ANON_REQ,
PAYLOAD_TYPE_TRACE,
PH_ROUTE_MASK,
PH_TYPE_MASK,
PH_TYPE_SHIFT,
ROUTE_TYPE_DIRECT,
ROUTE_TYPE_FLOOD,
ROUTE_TYPE_TRANSPORT_DIRECT,
ROUTE_TYPE_TRANSPORT_FLOOD,
)
from pymc_core.protocol.packet_utils import PacketHeaderUtils, PacketTimingUtils, PathUtils
from repeater.airtime import AirtimeManager
from repeater.data_acquisition import StorageCollector
logger = logging.getLogger("RepeaterHandler")
NOISE_FLOOR_INTERVAL = 30.0 # seconds
LOOP_DETECT_OFF = "off"
LOOP_DETECT_MINIMAL = "minimal"
LOOP_DETECT_MODERATE = "moderate"
LOOP_DETECT_STRICT = "strict"
# Thresholds for 1-byte path hashes loop detection.
# Count how many times our own hash already exists in the incoming FLOOD path.
# If occurrences >= threshold, treat as loop and drop.
LOOP_DETECT_MAX_COUNTERS = {
LOOP_DETECT_MINIMAL: 4,
LOOP_DETECT_MODERATE: 2,
LOOP_DETECT_STRICT: 1,
}
class RepeaterHandler(BaseHandler):
@staticmethod
def payload_type() -> int:
return 0xFF # Special marker (not a real payload type)
def __init__(self, config: dict, dispatcher, local_hash: int, *, local_hash_bytes=None, send_advert_func=None):
self.config = config
self.dispatcher = dispatcher
self.local_hash = local_hash
self.local_hash_bytes = local_hash_bytes or bytes([local_hash])
self.send_advert_func = send_advert_func
self.airtime_mgr = AirtimeManager(config)
self.seen_packets = OrderedDict()
self.cache_ttl = max(
300, config.get("repeater", {}).get("cache_ttl", 3600)
) # Min 5 min, default 1 hour
self.max_cache_size = 1000
self.max_duplicates_per_packet = 20
self.tx_delay_factor = config.get("delays", {}).get("tx_delay_factor", 1.0)
self.direct_tx_delay_factor = config.get("delays", {}).get("direct_tx_delay_factor", 0.5)
self.use_score_for_tx = config.get("repeater", {}).get("use_score_for_tx", False)
self.score_threshold = config.get("repeater", {}).get("score_threshold", 0.3)
self.send_advert_interval_hours = config.get("repeater", {}).get(
"send_advert_interval_hours", 10
)
self.last_advert_time = time.time()
self.loop_detect_mode = self._normalize_loop_detect_mode(
config.get("mesh", {}).get("loop_detect", LOOP_DETECT_OFF)
)
radio = dispatcher.radio if dispatcher else None
if radio:
self.radio_config = {
"spreading_factor": getattr(radio, "spreading_factor", 8),
"bandwidth": getattr(radio, "bandwidth", 125000),
"coding_rate": getattr(radio, "coding_rate", 8),
"preamble_length": getattr(radio, "preamble_length", 17),
"frequency": getattr(radio, "frequency", 915000000),
"tx_power": getattr(radio, "tx_power", 14),
}
logger.info(
f"radio settings: SF={self.radio_config['spreading_factor']}, "
f"BW={self.radio_config['bandwidth']}Hz, CR={self.radio_config['coding_rate']}"
)
else:
raise RuntimeError("Radio object not available - cannot initialize repeater")
# Statistics tracking for dashboard
self.rx_count = 0
self.forwarded_count = 0
self.dropped_count = 0
self.recent_packets = []
self.max_recent_packets = 50
self.start_time = time.time()
# Flood/direct and duplicate counters (for GET_STATUS / firmware RepeaterStats)
self.recv_flood_count = 0
self.recv_direct_count = 0
self.sent_flood_count = 0
self.sent_direct_count = 0
self.flood_dup_count = 0
self.direct_dup_count = 0
# Storage collector for persistent packet logging
try:
local_identity = dispatcher.local_identity if dispatcher else None
self.storage = StorageCollector(config, local_identity, repeater_handler=self)
logger.info("StorageCollector initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize StorageCollector: {e}")
self.storage = None
# Initialize background timer tracking
self.last_noise_measurement = time.time()
self.last_cache_cleanup = time.time()
self.last_db_cleanup = time.time()
self.noise_floor_interval = NOISE_FLOOR_INTERVAL # 30 seconds
self._background_task = None
self._last_crc_error_count = 0 # Track radio counter for delta persistence
# Cache transport keys for efficient lookup
self._transport_keys_cache = None
self._transport_keys_cache_time = 0
self._transport_keys_cache_ttl = 60 # Cache for 60 seconds
self._start_background_tasks()
async def __call__(
self, packet: Packet, metadata: Optional[dict] = None, local_transmission: bool = False
) -> None:
if metadata is None:
metadata = {}
# Only count as receive when packet came from the radio (not locally injected)
if not local_transmission:
self.rx_count += 1
route_type = packet.header & PH_ROUTE_MASK
if route_type in (ROUTE_TYPE_FLOOD, ROUTE_TYPE_TRANSPORT_FLOOD):
self.recv_flood_count += 1
elif route_type in (ROUTE_TYPE_DIRECT, ROUTE_TYPE_TRANSPORT_DIRECT):
self.recv_direct_count += 1
try:
rx_airtime_ms = self.airtime_mgr.calculate_airtime(packet.get_raw_length())
self.airtime_mgr.record_rx(rx_airtime_ms)
except Exception:
pass
route_type = packet.header & PH_ROUTE_MASK
# TX mode: forward (repeat on), monitor (no repeat, tenants can TX), no_tx (all TX off)
mode = self.config.get("repeater", {}).get("mode", "forward")
if mode not in ("forward", "monitor", "no_tx"):
mode = "forward"
allow_forward = mode == "forward"
allow_local_tx = mode != "no_tx"
logger.debug(
f"RX packet: header=0x{packet.header:02x}, payload_len={len(packet.payload or b'')}, "
f"path_len={len(packet.path) if packet.path else 0}, "
f"rssi={metadata.get('rssi', 'N/A')}, snr={metadata.get('snr', 'N/A')}, mode={mode}"
)
# clone the packet to avoid modifying the original
processed_packet = copy.deepcopy(packet)
snr = metadata.get("snr", 0.0)
rssi = metadata.get("rssi", 0)
transmitted = False
tx_delay_ms = 0.0
drop_reason = None
lbt_attempts = 0
lbt_backoff_delays_ms = None
lbt_channel_busy = False
original_path_hashes = packet.get_path_hashes_hex()
path_hash_size = packet.get_path_hash_size()
# Process for forwarding (skip if repeat disabled or if this is a local transmission)
result = (
None
if (not allow_forward or local_transmission)
else self.process_packet(processed_packet, snr)
)
forwarded_path_hashes = None
# For local transmissions, create a direct transmission result (if local TX allowed)
if local_transmission and allow_local_tx:
# Mark local packet as seen to prevent duplicate processing when received back
self.mark_seen(packet)
# Calculate transmission delay for local packets
delay = self._calculate_tx_delay(packet, snr)
result = (packet, delay)
forwarded_path_hashes = packet.get_path_hashes_hex()
logger.debug(f"Local transmission: calculated delay {delay:.3f}s")
if result:
fwd_pkt, delay = result
tx_delay_ms = delay * 1000.0
# Capture the forwarded path (after modification)
forwarded_path_hashes = fwd_pkt.get_path_hashes_hex()
# Check duty-cycle before scheduling TX
airtime_ms = self.airtime_mgr.calculate_airtime(fwd_pkt.get_raw_length())
can_tx, wait_time = self.airtime_mgr.can_transmit(airtime_ms)
# LBT metadata (set after any TX path that awaits send)
tx_metadata = None
lbt_attempts = 0
lbt_backoff_delays_ms = None
lbt_channel_busy = False
if not can_tx:
if local_transmission:
# Defer local TX until duty cycle allows instead of dropping
deferred_delay = delay + wait_time
logger.info(
f"Duty-cycle limit: deferring local TX by {wait_time:.1f}s "
f"(airtime={airtime_ms:.1f}ms)"
)
self.forwarded_count += 1
transmitted = True
tx_task = await self.schedule_retransmit(
fwd_pkt, deferred_delay, airtime_ms, local_transmission=True
)
try:
await tx_task
except Exception as e:
self.forwarded_count -= 1
transmitted = False
drop_reason = "TX failed (deferred)"
logger.warning(f"Deferred local TX failed: {e}")
raise
tx_metadata = getattr(fwd_pkt, "_tx_metadata", None)
if tx_metadata:
lbt_attempts = tx_metadata.get("lbt_attempts", 0)
lbt_backoff_delays_ms = tx_metadata.get(
"lbt_backoff_delays_ms", []
)
lbt_channel_busy = tx_metadata.get("lbt_channel_busy", False)
if lbt_attempts > 0:
total_lbt_delay = sum(lbt_backoff_delays_ms)
logger.info(
f"LBT: {lbt_attempts} attempts, "
f"{total_lbt_delay:.0f}ms delay, "
f"backoffs={lbt_backoff_delays_ms}"
)
else:
logger.warning(
f"Duty-cycle limit exceeded. Airtime={airtime_ms:.1f}ms, "
f"wait={wait_time:.1f}s before retry"
)
self.dropped_count += 1
drop_reason = "Duty cycle limit"
else:
self.forwarded_count += 1
transmitted = True
tx_task = await self.schedule_retransmit(
fwd_pkt, delay, airtime_ms, local_transmission=local_transmission
)
try:
await tx_task
except Exception as e:
self.forwarded_count -= 1
transmitted = False
drop_reason = "TX failed"
logger.warning(f"Local TX failed: {e}")
raise
tx_metadata = getattr(fwd_pkt, "_tx_metadata", None)
if tx_metadata:
lbt_attempts = tx_metadata.get("lbt_attempts", 0)
lbt_backoff_delays_ms = tx_metadata.get("lbt_backoff_delays_ms", [])
lbt_channel_busy = tx_metadata.get("lbt_channel_busy", False)
if lbt_attempts > 0:
total_lbt_delay = sum(lbt_backoff_delays_ms)
logger.info(
f"LBT: {lbt_attempts} attempts, {total_lbt_delay:.0f}ms delay, "
f"backoffs={lbt_backoff_delays_ms}"
)
else:
self.dropped_count += 1
# Determine drop reason
if local_transmission and not allow_local_tx:
drop_reason = "No TX mode"
elif not allow_forward:
drop_reason = "Repeat disabled"
else:
# Check if packet has a specific drop reason set by handlers
drop_reason = processed_packet.drop_reason or self._get_drop_reason(
processed_packet
)
logger.debug(f"Packet not forwarded: {drop_reason}")
# Extract packet type and route from header
if not hasattr(packet, "header") or packet.header is None:
logger.error(f"Packet missing header attribute! Packet: {packet}")
payload_type = 0
route_type = 0
else:
header_info = PacketHeaderUtils.parse_header(packet.header)
payload_type = header_info["payload_type"]
route_type = header_info["route_type"]
logger.debug(
f"Packet header=0x{packet.header:02x}, type={payload_type}, route={route_type}"
)
# Check if this is a duplicate
pkt_hash = packet.calculate_packet_hash().hex().upper()
is_dupe = pkt_hash in self.seen_packets and not transmitted
# Set drop reason for duplicates and count flood vs direct dups
if is_dupe and drop_reason is None:
drop_reason = "Duplicate"
if is_dupe:
if route_type in (ROUTE_TYPE_FLOOD, ROUTE_TYPE_TRANSPORT_FLOOD):
self.flood_dup_count += 1
elif route_type in (ROUTE_TYPE_DIRECT, ROUTE_TYPE_TRANSPORT_DIRECT):
self.direct_dup_count += 1
display_hashes = (
original_path_hashes if original_path_hashes else packet.get_path_hashes_hex()
)
path_hash = self._path_hash_display(display_hashes)
src_hash, dst_hash = self._packet_record_src_dst(packet, payload_type)
# Record packet for charts
packet_record = self._build_packet_record(
packet,
payload_type,
route_type,
rssi,
snr,
original_path_hashes,
path_hash_size,
path_hash,
src_hash,
dst_hash,
transmitted=transmitted,
drop_reason=drop_reason,
is_duplicate=is_dupe,
forwarded_path=forwarded_path_hashes,
tx_delay_ms=tx_delay_ms,
lbt_attempts=lbt_attempts,
lbt_backoff_delays_ms=lbt_backoff_delays_ms,
lbt_channel_busy=lbt_channel_busy,
)
# Store packet record to persistent storage
# Skip LetsMesh only for invalid packets (not duplicates or operational drops)
if self.storage:
try:
# Only skip LetsMesh for actual invalid/bad packets
invalid_reasons = ["Invalid advert packet", "Empty payload", "Path too long"]
skip_letsmesh = drop_reason in invalid_reasons if drop_reason else False
self.storage.record_packet(packet_record, skip_letsmesh_if_invalid=skip_letsmesh)
except Exception as e:
logger.error(f"Failed to store packet record: {e}")
# If this is a duplicate, try to attach it to the original packet
if is_dupe and len(self.recent_packets) > 0:
# Find the original packet with same hash
for idx in range(len(self.recent_packets) - 1, -1, -1):
prev_pkt = self.recent_packets[idx]
if prev_pkt.get("packet_hash") == packet_record["packet_hash"]:
# Add duplicate to original packet's duplicate list
if "duplicates" not in prev_pkt:
prev_pkt["duplicates"] = []
if len(prev_pkt["duplicates"]) < self.max_duplicates_per_packet:
prev_pkt["duplicates"].append(packet_record)
# Don't add duplicate to main list, just track in original
break
else:
# Original not found, add as regular packet
self.recent_packets.append(packet_record)
else:
# Not a duplicate or first occurrence
self.recent_packets.append(packet_record)
if len(self.recent_packets) > self.max_recent_packets:
self.recent_packets.pop(0)
def log_trace_record(self, packet_record: dict) -> None:
"""Manually log a packet trace record (used by external callers)"""
self.recent_packets.append(packet_record)
self.rx_count += 1
if packet_record.get("transmitted", False):
self.forwarded_count += 1
else:
self.dropped_count += 1
# Store to persistent storage (same as __call__ does)
if self.storage:
try:
self.storage.record_packet(packet_record)
except Exception as e:
logger.error(f"Failed to store packet record: {e}")
if len(self.recent_packets) > self.max_recent_packets:
self.recent_packets.pop(0)
def record_packet_only(self, packet: Packet, metadata: dict) -> None:
"""Record a packet for UI/storage without running forwarding or duplicate logic.
Used by the packet router for injection-only types (ANON_REQ, ACK, PATH, etc.)
so they still appear in the web UI.
TRACE packets are excluded: TraceHelper.log_trace_record stores the real trace path;
packet.path on TRACE holds SNR bytes, not routing hashes.
"""
if not self.storage:
return
rssi = metadata.get("rssi", 0)
snr = metadata.get("snr", 0.0)
if not hasattr(packet, "header") or packet.header is None:
logger.debug("record_packet_only: packet missing header, skipping")
return
header_info = PacketHeaderUtils.parse_header(packet.header)
payload_type = header_info["payload_type"]
route_type = header_info["route_type"]
if payload_type == PAYLOAD_TYPE_TRACE:
return
original_path_hashes = packet.get_path_hashes_hex()
path_hash_size = packet.get_path_hash_size()
path_hash = self._path_hash_display(original_path_hashes)
src_hash, dst_hash = self._packet_record_src_dst(packet, payload_type)
packet_record = self._build_packet_record(
packet,
payload_type,
route_type,
rssi,
snr,
original_path_hashes,
path_hash_size,
path_hash,
src_hash,
dst_hash,
)
try:
self.storage.record_packet(packet_record, skip_letsmesh_if_invalid=False)
except Exception as e:
logger.error(f"Failed to store packet record (record_packet_only): {e}")
return
self.recent_packets.append(packet_record)
if len(self.recent_packets) > self.max_recent_packets:
self.recent_packets.pop(0)
def record_duplicate(self, packet: Packet, rssi: int = 0, snr: float = 0.0) -> None:
"""Record a known-duplicate packet for UI/storage visibility without forwarding.
Called by the raw_packet_subscriber path so that path variants blocked
by the Dispatcher's payload-based dedup still appear in the UI.
"""
self.rx_count += 1
route_type = packet.header & PH_ROUTE_MASK
if route_type in (ROUTE_TYPE_FLOOD, ROUTE_TYPE_TRANSPORT_FLOOD):
self.recv_flood_count += 1
self.flood_dup_count += 1
elif route_type in (ROUTE_TYPE_DIRECT, ROUTE_TYPE_TRANSPORT_DIRECT):
self.recv_direct_count += 1
self.direct_dup_count += 1
header_info = PacketHeaderUtils.parse_header(packet.header)
payload_type = header_info["payload_type"]
route_type_parsed = header_info["route_type"]
original_path_hashes = packet.get_path_hashes_hex()
path_hash_size = packet.get_path_hash_size()
path_hash = self._path_hash_display(original_path_hashes)
src_hash, dst_hash = self._packet_record_src_dst(packet, payload_type)
packet_record = self._build_packet_record(
packet, payload_type, route_type_parsed, rssi, snr,
original_path_hashes, path_hash_size, path_hash,
src_hash, dst_hash,
transmitted=False,
drop_reason="Duplicate",
is_duplicate=True,
)
if self.storage:
try:
self.storage.record_packet(packet_record, skip_letsmesh_if_invalid=False)
except Exception as e:
logger.error(f"Failed to store duplicate record: {e}")
# Group under original in recent_packets
if len(self.recent_packets) > 0:
for idx in range(len(self.recent_packets) - 1, -1, -1):
prev_pkt = self.recent_packets[idx]
if prev_pkt.get("packet_hash") == packet_record["packet_hash"]:
if "duplicates" not in prev_pkt:
prev_pkt["duplicates"] = []
prev_pkt["duplicates"].append(packet_record)
break
else:
self.recent_packets.append(packet_record)
else:
self.recent_packets.append(packet_record)
if len(self.recent_packets) > self.max_recent_packets:
self.recent_packets.pop(0)
def cleanup_cache(self):
now = time.time()
expired = [k for k, ts in self.seen_packets.items() if now - ts > self.cache_ttl]
for k in expired:
del self.seen_packets[k]
def _path_hash_display(self, display_hashes) -> Optional[str]:
"""Build path hash string for packet record from path hashes list."""
if not display_hashes:
return None
display = display_hashes[:8]
if len(display_hashes) > 8:
display = list(display) + ["..."]
return "[" + ", ".join(display) + "]"
def _packet_record_src_dst(
self, packet: Packet, payload_type: int
) -> Tuple[Optional[str], Optional[str]]:
"""Return (src_hash, dst_hash) for packet_record from packet and payload_type."""
src_hash = None
dst_hash = None
payload = getattr(packet, "payload", None)
if payload_type in [0x00, 0x01, 0x02, 0x08]:
if payload and len(payload) >= 2:
dst_hash = f"{payload[0]:02X}"
src_hash = f"{payload[1]:02X}"
elif payload_type == PAYLOAD_TYPE_ADVERT:
if payload and len(payload) >= 1:
src_hash = f"{payload[0]:02X}"
elif payload_type == PAYLOAD_TYPE_ANON_REQ:
if payload and len(payload) >= 1:
dst_hash = f"{payload[0]:02X}"
return (src_hash, dst_hash)
def _build_packet_record(
self,
packet: Packet,
payload_type: int,
route_type: int,
rssi: int,
snr: float,
original_path_hashes,
path_hash_size: int,
path_hash: Optional[str],
src_hash: Optional[str],
dst_hash: Optional[str],
*,
transmitted: bool = False,
drop_reason: Optional[str] = None,
is_duplicate: bool = False,
forwarded_path=None,
tx_delay_ms: float = 0.0,
lbt_attempts: int = 0,
lbt_backoff_delays_ms=None,
lbt_channel_busy: bool = False,
) -> dict:
"""Build a single packet_record dict for storage and recent_packets."""
pkt_hash = packet.calculate_packet_hash().hex().upper()
payload = getattr(packet, "payload", None)
payload_len = len(payload or b"")
return {
"timestamp": time.time(),
"header": (
f"0x{packet.header:02X}"
if hasattr(packet, "header") and packet.header is not None
else None
),
"payload": payload.hex() if payload else None,
"payload_length": len(payload) if payload else 0,
"type": payload_type,
"route": route_type,
"length": payload_len,
"rssi": rssi,
"snr": snr,
"score": self.calculate_packet_score(
snr, payload_len, self.radio_config["spreading_factor"]
),
"tx_delay_ms": tx_delay_ms,
"transmitted": transmitted,
"is_duplicate": is_duplicate,
"packet_hash": pkt_hash[:16],
"drop_reason": drop_reason,
"path_hash": path_hash,
"src_hash": src_hash,
"dst_hash": dst_hash,
"original_path": original_path_hashes or None,
"forwarded_path": forwarded_path,
"path_hash_size": path_hash_size,
"raw_packet": packet.write_to().hex() if hasattr(packet, "write_to") else None,
"lbt_attempts": lbt_attempts,
"lbt_backoff_delays_ms": lbt_backoff_delays_ms,
"lbt_channel_busy": lbt_channel_busy,
}
def _get_drop_reason(self, packet: Packet) -> str:
if self.is_duplicate(packet):
return "Duplicate"
if not packet or not packet.payload:
return "Empty payload"
if len(packet.path or []) >= MAX_PATH_SIZE:
return "Path too long"
route_type = packet.header & PH_ROUTE_MASK
if route_type == ROUTE_TYPE_FLOOD:
# Check if global flood policy blocked it
global_flood_allow = self.config.get("mesh", {}).get("global_flood_allow", True)
if not global_flood_allow:
return "Global flood policy disabled"
if route_type == ROUTE_TYPE_DIRECT:
hash_size = packet.get_path_hash_size()
if not packet.path or len(packet.path) < hash_size:
return "Direct: no path"
next_hop = bytes(packet.path[:hash_size])
if next_hop != self.local_hash_bytes[:hash_size]:
return "Direct: not for us"
# Default reason
return "Unknown"
def is_duplicate(self, packet: Packet) -> bool:
pkt_hash = packet.calculate_packet_hash().hex().upper()
if pkt_hash in self.seen_packets:
return True
return False
def mark_seen(self, packet: Packet):
pkt_hash = packet.calculate_packet_hash().hex().upper()
self.seen_packets[pkt_hash] = time.time()
if len(self.seen_packets) > self.max_cache_size:
self.seen_packets.popitem(last=False)
def validate_packet(self, packet: Packet) -> Tuple[bool, str]:
if not packet or not packet.payload:
return False, "Empty payload"
if len(packet.path or []) >= MAX_PATH_SIZE:
return (
False,
f"Path length {len(packet.path or [])} exceeds MAX_PATH_SIZE ({MAX_PATH_SIZE})",
)
return True, ""
def _normalize_loop_detect_mode(self, mode) -> str:
if isinstance(mode, str):
normalized = mode.strip().lower()
if normalized in {
LOOP_DETECT_OFF,
LOOP_DETECT_MINIMAL,
LOOP_DETECT_MODERATE,
LOOP_DETECT_STRICT,
}:
return normalized
return LOOP_DETECT_OFF
def _get_loop_detect_mode(self) -> str:
return self.loop_detect_mode
def _is_flood_looped(self, packet: Packet, mode: Optional[str] = None) -> bool:
mode = mode or self._get_loop_detect_mode()
if mode == LOOP_DETECT_OFF:
return False
max_counter = LOOP_DETECT_MAX_COUNTERS.get(mode)
if max_counter is None:
return False
path = packet.path or bytearray()
local_count = sum(1 for hop in path if hop == self.local_hash)
return local_count >= max_counter
def _check_transport_codes(self, packet: Packet) -> Tuple[bool, str]:
if not self.storage:
logger.warning("Transport code check failed: no storage available")
return False, "No storage available for transport key validation"
try:
from pymc_core.protocol.transport_keys import calc_transport_code
# Check cache validity
current_time = time.time()
if (
self._transport_keys_cache is None
or current_time - self._transport_keys_cache_time > self._transport_keys_cache_ttl
):
# Refresh cache
self._transport_keys_cache = self.storage.get_transport_keys()
self._transport_keys_cache_time = current_time
transport_keys = self._transport_keys_cache
if not transport_keys:
return False, "No transport keys configured"
# Check if packet has transport codes
if not packet.has_transport_codes():
return False, "No transport codes present"
transport_code_0 = packet.transport_codes[0] # First transport code
payload = packet.get_payload()
payload_type = (
packet.get_payload_type()
if hasattr(packet, "get_payload_type")
else ((packet.header & 0x3C) >> 2)
)
# Check packet against each transport key
for key_record in transport_keys:
transport_key_encoded = key_record.get("transport_key")
key_name = key_record.get("name", "unknown")
flood_policy = key_record.get("flood_policy", "deny")
if not transport_key_encoded:
continue
try:
import base64
transport_key = base64.b64decode(transport_key_encoded)
expected_code = calc_transport_code(transport_key, packet)
if transport_code_0 == expected_code:
logger.debug(
f"Transport code validated for key '{key_name}' with policy '{flood_policy}'"
)
# Update last_used timestamp for this key
try:
key_id = key_record.get("id")
if key_id:
self.storage.update_transport_key(
key_id=key_id, last_used=time.time()
)
logger.debug(
f"Updated last_used timestamp for transport key '{key_name}'"
)
except Exception as e:
logger.warning(
f"Failed to update last_used for transport key '{key_name}': {e}"
)
# Check flood policy for this key
if flood_policy == "allow":
return True, ""
else:
return False, f"Transport key '{key_name}' flood policy denied"
except Exception as e:
logger.warning(f"Error checking transport key '{key_name}': {e}")
continue
# No matching transport code found
logger.debug(
f"Transport code 0x{transport_code_0:04X} denied (checked {len(transport_keys)} keys)"
)
return False, "No matching transport code"
except Exception as e:
logger.error(f"Transport code validation error: {e}")
return False, f"Transport code validation error: {e}"
def flood_forward(self, packet: Packet) -> Optional[Packet]:
# Validate
valid, reason = self.validate_packet(packet)
if not valid:
packet.drop_reason = reason
return None
# Check if packet is marked do-not-retransmit
if packet.is_marked_do_not_retransmit():
# Check if packet has custom drop reason
if not packet.drop_reason:
packet.drop_reason = "Marked do not retransmit"
return None
# Check global flood policy
global_flood_allow = self.config.get("mesh", {}).get("global_flood_allow", True)
if not global_flood_allow:
route_type = packet.header & PH_ROUTE_MASK
if route_type == ROUTE_TYPE_FLOOD or route_type == ROUTE_TYPE_TRANSPORT_FLOOD:
allowed, check_reason = self._check_transport_codes(packet)
if not allowed:
packet.drop_reason = check_reason
return None
else:
packet.drop_reason = "Global flood policy disabled"
return None
mode = self._get_loop_detect_mode()
if self._is_flood_looped(packet, mode):
packet.drop_reason = f"FLOOD loop detected ({mode})"
return None
# Suppress duplicates
if self.is_duplicate(packet):
packet.drop_reason = "Duplicate"
return None
if packet.path is None:
packet.path = bytearray()
elif not isinstance(packet.path, bytearray):
packet.path = bytearray(packet.path)
hash_size = packet.get_path_hash_size()
hop_count = packet.get_path_hash_count()
# path_len encodes hop count in 6 bits (0-63); adding ourselves must not exceed 63
if hop_count >= 63:
packet.drop_reason = "Path hop count at maximum (63), cannot append"
return None
# Check path won't exceed MAX_PATH_SIZE after append
if (hop_count + 1) * hash_size > MAX_PATH_SIZE:
packet.drop_reason = "Path would exceed MAX_PATH_SIZE"
return None
self.mark_seen(packet)
# Append hash_size bytes from our public key prefix
packet.path.extend(self.local_hash_bytes[:hash_size])
packet.path_len = PathUtils.encode_path_len(hash_size, hop_count + 1)
return packet
def direct_forward(self, packet: Packet) -> Optional[Packet]:
# Validate packet (empty payload, oversized path, etc.)
valid, reason = self.validate_packet(packet)
if not valid:
packet.drop_reason = reason
return None
# Check if packet is marked do-not-retransmit
if packet.is_marked_do_not_retransmit():
if not packet.drop_reason:
packet.drop_reason = "Marked do not retransmit"
return None
hash_size = packet.get_path_hash_size()
hop_count = packet.get_path_hash_count()
# Check if we're the next hop
if not packet.path or len(packet.path) < hash_size:
packet.drop_reason = "Direct: no path"
return None
next_hop = bytes(packet.path[:hash_size])
if next_hop != self.local_hash_bytes[:hash_size]:
packet.drop_reason = "Direct: not for us"
return None
# Suppress duplicates
if self.is_duplicate(packet):
packet.drop_reason = "Duplicate"
return None
self.mark_seen(packet)
# Remove first hash entry (hash_size bytes)
packet.path = bytearray(packet.path[hash_size:])
packet.path_len = PathUtils.encode_path_len(hash_size, hop_count - 1)
return packet
@staticmethod
def calculate_packet_score(snr: float, packet_len: int, spreading_factor: int = 8) -> float:
# SNR thresholds per SF (from MeshCore RadioLibWrappers.cpp)
snr_thresholds = {7: -7.5, 8: -10.0, 9: -12.5, 10: -15.0, 11: -17.5, 12: -20.0}
if spreading_factor < 7:
return 0.0
threshold = snr_thresholds.get(spreading_factor, -10.0)
# Below threshold = no chance of success
if snr < threshold:
return 0.0
# Success rate based on SNR above threshold
success_rate_based_on_snr = (snr - threshold) / 10.0
# Collision penalty: longer packets more likely to collide (max 256 bytes)
collision_penalty = 1.0 - (packet_len / 256.0)
# Combined score
score = success_rate_based_on_snr * collision_penalty
return max(0.0, min(1.0, score))
def _calculate_tx_delay(self, packet: Packet, snr: float = 0.0) -> float:
import random
packet_len = packet.get_raw_length()
airtime_ms = self.airtime_mgr.calculate_airtime(packet_len)
route_type = packet.header & PH_ROUTE_MASK
# Base delay calculations
# this part took me along time to get right well i hope i got it right ;-)
if route_type == ROUTE_TYPE_FLOOD:
# Flood packets: random(0-5) * (airtime * 52/50 / 2) * tx_delay_factor
# This creates collision avoidance with tunable delay
base_delay_ms = (airtime_ms * 52 / 50) / 2.0 # From C++ implementation
random_mult = random.uniform(0, 5) # Random multiplier for collision avoidance
delay_ms = base_delay_ms * random_mult * self.tx_delay_factor
delay_s = delay_ms / 1000.0
else: # DIRECT
# Direct packets: use direct_tx_delay_factor (already in seconds)
# direct_tx_delay_factor is stored as seconds in config
delay_s = self.direct_tx_delay_factor
# Apply score-based delay adjustment ONLY if delay >= 50ms threshold
# (matching C++ reactive behavior in Dispatcher::calcRxDelay)
if delay_s >= 0.05 and self.use_score_for_tx:
score = self.calculate_packet_score(snr, packet_len)
# Higher score = shorter delay: max(0.2, 1.0 - score)
# score 1.0 → multiplier 0.2 (20% of original)
# score 0.0 → multiplier 1.0 (100% of original)
score_multiplier = max(0.2, 1.0 - score)
delay_s = delay_s * score_multiplier
logger.debug(
f"Congestion detected (delay >= 50ms), score={score:.2f}, "
f"delay multiplier={score_multiplier:.2f}"
)
# Cap at 5 seconds maximum
delay_s = min(delay_s, 5.0)
logger.debug(
f"Route={'FLOOD' if route_type == ROUTE_TYPE_FLOOD else 'DIRECT'}, "
f"len={packet_len}B, airtime={airtime_ms:.1f}ms, delay={delay_s:.3f}s"
)
return delay_s
def process_packet(self, packet: Packet, snr: float = 0.0) -> Optional[Tuple[Packet, float]]:
route_type = packet.header & PH_ROUTE_MASK
if route_type == ROUTE_TYPE_FLOOD or route_type == ROUTE_TYPE_TRANSPORT_FLOOD:
fwd_pkt = self.flood_forward(packet)
if fwd_pkt is None:
return None
delay = self._calculate_tx_delay(fwd_pkt, snr)
return fwd_pkt, delay
elif route_type == ROUTE_TYPE_DIRECT or route_type == ROUTE_TYPE_TRANSPORT_DIRECT:
fwd_pkt = self.direct_forward(packet)
if fwd_pkt is None:
return None
delay = self._calculate_tx_delay(fwd_pkt, snr)
return fwd_pkt, delay
else:
packet.drop_reason = f"Unknown route type: {route_type}"
return None
async def schedule_retransmit(
self,
fwd_pkt: Packet,
delay: float,
airtime_ms: float = 0.0,
local_transmission: bool = False,
):
"""Schedule a packet retransmission with delay and return the task.
If local_transmission is True and the first send fails, retry once after
a short delay (handles transient radio/LBT failures).
"""
async def delayed_send():
await asyncio.sleep(delay)
last_error = None
for attempt in range(2 if local_transmission else 1):
try:
await self.dispatcher.send_packet(fwd_pkt, wait_for_ack=False)
self._record_packet_sent(fwd_pkt)
if airtime_ms > 0:
self.airtime_mgr.record_tx(airtime_ms)
packet_size = fwd_pkt.get_raw_length()
logger.info(
f"Retransmitted packet ({packet_size} bytes, "
f"{airtime_ms:.1f}ms airtime)"
)
return
except Exception as e:
last_error = e
logger.error(f"Retransmit failed: {e}")
if local_transmission and attempt == 0:
logger.info("Retrying local TX in 1s...")
await asyncio.sleep(1.0)
else:
raise
if last_error is not None:
raise last_error
return asyncio.create_task(delayed_send())
def _record_packet_sent(self, packet: Packet) -> None:
"""Record a packet send for flood/direct stats (forwarded and originated)."""
route = getattr(packet, "header", 0) & PH_ROUTE_MASK
if route in (ROUTE_TYPE_FLOOD, ROUTE_TYPE_TRANSPORT_FLOOD):
self.sent_flood_count += 1
elif route in (ROUTE_TYPE_DIRECT, ROUTE_TYPE_TRANSPORT_DIRECT):
self.sent_direct_count += 1
def get_noise_floor(self) -> Optional[float]:
try:
radio = self.dispatcher.radio if self.dispatcher else None
if radio and hasattr(radio, "get_noise_floor"):
return radio.get_noise_floor()
return None
except Exception as e:
logger.debug(f"Failed to get noise floor: {e}")
return None
def get_stats(self) -> dict:
uptime_seconds = time.time() - self.start_time
# Get config sections
repeater_config = self.config.get("repeater", {})
duty_cycle_config = self.config.get("duty_cycle", {})
delays_config = self.config.get("delays", {})
max_airtime_ms = duty_cycle_config.get("max_airtime_per_minute", 3600)
max_duty_cycle_percent = (max_airtime_ms / 60000) * 100 # 60000ms = 1 minute
# Calculate actual hourly rates (packets in last 3600 seconds)
now = time.time()
packets_last_hour = [p for p in self.recent_packets if now - p["timestamp"] < 3600]
rx_per_hour = len(packets_last_hour)
forwarded_per_hour = sum(1 for p in packets_last_hour if p.get("transmitted", False))
# Get current noise floor from radio
noise_floor_dbm = self.get_noise_floor()
# Get CRC error count from radio hardware
radio = self.dispatcher.radio if self.dispatcher else None
crc_error_count = getattr(radio, "crc_error_count", 0) if radio else 0
# Get neighbors from database
neighbors = self.storage.get_neighbors() if self.storage else {}
# Format local_hash respecting path_hash_mode
phm = self.config.get("mesh", {}).get("path_hash_mode", 0)
_bc = {0: 1, 1: 2, 2: 3}.get(phm, 1)
_hc = _bc * 2
_val = int.from_bytes(bytes(self.local_hash_bytes[:_bc]), "big")
local_hash_str = f"0x{_val:0{_hc}x}"
stats = {
"local_hash": local_hash_str,
"duplicate_cache_size": len(self.seen_packets),
"cache_ttl": self.cache_ttl,
"rx_count": self.rx_count,
"forwarded_count": self.forwarded_count,
"dropped_count": self.dropped_count,
"recv_flood_count": self.recv_flood_count,
"recv_direct_count": self.recv_direct_count,
"sent_flood_count": self.sent_flood_count,
"sent_direct_count": self.sent_direct_count,
"flood_dup_count": self.flood_dup_count,
"direct_dup_count": self.direct_dup_count,
"rx_per_hour": rx_per_hour,
"forwarded_per_hour": forwarded_per_hour,
"recent_packets": self.recent_packets,
"neighbors": neighbors,
"uptime_seconds": uptime_seconds,
"noise_floor_dbm": noise_floor_dbm,
"crc_error_count": crc_error_count,
# Add configuration data
"config": {
"node_name": repeater_config.get("node_name", "Unknown"),
"repeater": {
"mode": repeater_config.get("mode", "forward"),
"use_score_for_tx": repeater_config.get("use_score_for_tx", False),
"score_threshold": repeater_config.get("score_threshold", 0.3),
"send_advert_interval_hours": repeater_config.get(
"send_advert_interval_hours", 10
),
"latitude": repeater_config.get("latitude", 0.0),
"longitude": repeater_config.get("longitude", 0.0),
"max_flood_hops": repeater_config.get("max_flood_hops", 3),
"advert_interval_minutes": repeater_config.get("advert_interval_minutes", 120),
"advert_rate_limit": repeater_config.get("advert_rate_limit", {}),
"advert_penalty_box": repeater_config.get("advert_penalty_box", {}),
"advert_adaptive": repeater_config.get("advert_adaptive", {}),
},
"radio": self.config.get(
"radio", {}
), # Read from live config, not cached radio_config
"duty_cycle": {
"max_airtime_percent": max_duty_cycle_percent,
"enforcement_enabled": duty_cycle_config.get("enforcement_enabled", True),
},
"delays": {
"tx_delay_factor": delays_config.get("tx_delay_factor", 1.0),
"direct_tx_delay_factor": delays_config.get("direct_tx_delay_factor", 0.5),
"rx_delay_base": delays_config.get("rx_delay_base", 0.0),
},
"web": self.config.get("web", {}), # Include web configuration
"mesh": {
"loop_detect": self.config.get("mesh", {}).get("loop_detect", "off"),
"global_flood_allow": self.config.get("mesh", {}).get("global_flood_allow", True),
"path_hash_mode": self.config.get("mesh", {}).get("path_hash_mode", 0),
},
"letsmesh": self.config.get("letsmesh", {}),
},
"public_key": None,
}
# Add airtime stats
stats.update(self.airtime_mgr.get_stats())
return stats
def _start_background_tasks(self):
if self._background_task is None:
self._background_task = asyncio.create_task(self._background_timer_loop())
logger.info("Background timer started for noise floor and adverts")
async def _background_timer_loop(self):
try:
while True:
current_time = time.time()
# Check noise floor recording (every 30 seconds)
if current_time - self.last_noise_measurement >= self.noise_floor_interval:
await self._record_noise_floor_async()
await self._record_crc_errors_async()
self.last_noise_measurement = current_time
# Check advert sending (every N hours)
if self.send_advert_interval_hours > 0 and self.send_advert_func:
interval_seconds = self.send_advert_interval_hours * 3600
if current_time - self.last_advert_time >= interval_seconds:
await self._send_periodic_advert_async()
self.last_advert_time = current_time
# Prune expired entries from duplicate detection cache (every 60s)
if current_time - self.last_cache_cleanup >= 60.0:
self.cleanup_cache()
self.last_cache_cleanup = current_time
# Prune old SQLite data (check every 6 hours)
if current_time - self.last_db_cleanup >= 21600:
if self.storage:
try:
retention_days = (
self.config
.get("storage", {})
.get("retention", {})
.get("sqlite_cleanup_days", 31)
)
self.storage.cleanup_old_data(days=retention_days)
logger.info("Cleaned up SQLite data older than %d days", retention_days)
except Exception as e:
logger.warning(f"SQLite cleanup failed: {e}")
self.last_db_cleanup = current_time
# Sleep for 5 seconds before next check
await asyncio.sleep(5.0)
except asyncio.CancelledError:
logger.info("Background timer loop cancelled")
raise
except Exception as e:
logger.error(f"Error in background timer loop: {e}")
# Restart the timer after a delay
await asyncio.sleep(30)
self._background_task = asyncio.create_task(self._background_timer_loop())
async def _record_noise_floor_async(self):
if not self.storage:
return
try:
# Run in executor so KISS modem's blocking _send_command (up to 5s timeout)
# does not block the event loop and hang the process / delay Ctrl+C.
loop = asyncio.get_running_loop()
noise_floor = await loop.run_in_executor(None, self.get_noise_floor)
if noise_floor is not None:
self.storage.record_noise_floor(noise_floor)
logger.debug(f"Recorded noise floor: {noise_floor} dBm")
else:
logger.debug("Unable to read noise floor from radio")
except Exception as e:
logger.error(f"Error recording noise floor: {e}")
async def _record_crc_errors_async(self):
"""Persist CRC error delta from the radio hardware counter."""
if not self.storage:
return
try:
radio = self.dispatcher.radio if self.dispatcher else None
current = getattr(radio, "crc_error_count", 0) if radio else 0
delta = current - self._last_crc_error_count
if delta > 0:
self.storage.record_crc_errors(delta)
logger.debug(f"Recorded {delta} CRC errors (total: {current})")
self._last_crc_error_count = current
except Exception as e:
logger.error(f"Error recording CRC errors: {e}")
async def _send_periodic_advert_async(self):
logger.info(
f"Periodic advert timer triggered (interval: {self.send_advert_interval_hours}h)"
)
try:
if self.send_advert_func:
success = await self.send_advert_func()
if success:
logger.info("Periodic advert sent successfully")
else:
logger.warning("Failed to send periodic advert")
else:
logger.debug("No send_advert_func configured")
except Exception as e:
logger.error(f"Error sending periodic advert: {e}")
def reload_runtime_config(self):
"""Reload runtime configuration from self.config (called after live config updates)."""
try:
# Refresh delay factors
self.tx_delay_factor = self.config.get("delays", {}).get("tx_delay_factor", 1.0)
self.direct_tx_delay_factor = self.config.get("delays", {}).get(
"direct_tx_delay_factor", 0.5
)
# Refresh repeater settings
repeater_config = self.config.get("repeater", {})
self.use_score_for_tx = repeater_config.get("use_score_for_tx", False)
self.score_threshold = repeater_config.get("score_threshold", 0.3)
self.send_advert_interval_hours = repeater_config.get("send_advert_interval_hours", 10)
self.cache_ttl = repeater_config.get("cache_ttl", 60)
self.loop_detect_mode = self._normalize_loop_detect_mode(
self.config.get("mesh", {}).get("loop_detect", LOOP_DETECT_OFF)
)
# Note: Radio config changes require restart as they affect hardware
# Note: Airtime manager has its own config reference that gets updated
logger.info("Runtime configuration reloaded successfully")
except Exception as e:
logger.error(f"Error reloading runtime config: {e}")
def cleanup(self):
if self._background_task and not self._background_task.done():
self._background_task.cancel()
logger.info("Background timer task cancelled")
if self.storage:
try:
self.storage.close()
logger.info("StorageCollector closed successfully")
except Exception as e:
logger.error(f"Error closing StorageCollector: {e}")
def __del__(self):
try:
self.cleanup()
except Exception:
pass