Merge pull request #29 from rightup/feat/valid-check

Refactor packet handling, improve routing flow, and enhance UI/logging
This commit is contained in:
Lloyd
2025-12-09 14:03:13 -08:00
committed by GitHub
17 changed files with 1009 additions and 563 deletions
+2 -2
View File
@@ -204,7 +204,7 @@ install_repeater() {
echo "25"; echo "# Installing system dependencies..."
apt-get update -qq
apt-get install -y libffi-dev jq pip python3-rrdtool wget swig build-essential python3-dev liblgpio-dev
apt-get install -y libffi-dev jq pip python3-rrdtool wget swig build-essential python3-dev
# Install mikefarah yq v4 if not already installed
if ! command -v yq &> /dev/null || [[ "$(yq --version 2>&1)" != *"mikefarah/yq"* ]]; then
@@ -340,7 +340,7 @@ upgrade_repeater() {
echo "[3/9] Updating system dependencies..."
apt-get update -qq
apt-get install -y libffi-dev jq pip python3-rrdtool wget swig build-essential python3-dev liblgpio-dev
apt-get install -y libffi-dev jq pip python3-rrdtool wget swig build-essential python3-dev
# Install mikefarah yq v4 if not already installed
if ! command -v yq &> /dev/null || [[ "$(yq --version 2>&1)" != *"mikefarah/yq"* ]]; then
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "pymc_repeater"
version = "1.0.4"
version = "1.0.5"
authors = [
{name = "Lloyd", email = "lloyd@rightup.co.uk"},
]
+1 -1
View File
@@ -1 +1 @@
__version__ = "1.0.5-beta-1"
__version__ = "1.0.5-beta-2"
@@ -142,7 +142,7 @@ class MeshCoreToMqttJwtPusher:
signature_hex = binascii.hexlify(signature).decode()
token = f"{header_b64}.{payload_b64}.{signature_hex}"
logging.debug(f"Generated MeshCore token: {token}")
logging.debug(f"Generated MeshCore token: {token[:10]}...{token[-10:]}")
return token
# ----------------------------------------------------------------
+55 -8
View File
@@ -15,6 +15,7 @@ class SQLiteHandler:
self.storage_dir = storage_dir
self.sqlite_path = self.storage_dir / "repeater.db"
self._init_database()
self._run_migrations()
def _init_database(self):
try:
@@ -63,7 +64,8 @@ class SQLiteHandler:
rssi INTEGER,
snr REAL,
advert_count INTEGER NOT NULL DEFAULT 1,
is_new_neighbor BOOLEAN NOT NULL
is_new_neighbor BOOLEAN NOT NULL,
zero_hop BOOLEAN NOT NULL DEFAULT FALSE
)
""")
@@ -105,6 +107,47 @@ class SQLiteHandler:
except Exception as e:
logger.error(f"Failed to initialize SQLite: {e}")
def _run_migrations(self):
"""Run database migrations"""
try:
with sqlite3.connect(self.sqlite_path) as conn:
# Create migrations table if it doesn't exist
conn.execute("""
CREATE TABLE IF NOT EXISTS migrations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
migration_name TEXT NOT NULL UNIQUE,
applied_at REAL NOT NULL
)
""")
# Migration 1: Add zero_hop column to adverts table
migration_name = "add_zero_hop_to_adverts"
existing = conn.execute(
"SELECT migration_name FROM migrations WHERE migration_name = ?",
(migration_name,)
).fetchone()
if not existing:
# Check if zero_hop column already exists
cursor = conn.execute("PRAGMA table_info(adverts)")
columns = [column[1] for column in cursor.fetchall()]
if "zero_hop" not in columns:
conn.execute("ALTER TABLE adverts ADD COLUMN zero_hop BOOLEAN NOT NULL DEFAULT FALSE")
logger.info("Added zero_hop column to adverts table")
# Mark migration as applied
conn.execute(
"INSERT INTO migrations (migration_name, applied_at) VALUES (?, ?)",
(migration_name, time.time())
)
logger.info(f"Migration '{migration_name}' applied successfully")
conn.commit()
except Exception as e:
logger.error(f"Failed to run migrations: {e}")
def store_packet(self, record: dict):
try:
with sqlite3.connect(self.sqlite_path) as conn:
@@ -169,7 +212,8 @@ class SQLiteHandler:
UPDATE adverts
SET timestamp = ?, node_name = ?, is_repeater = ?, route_type = ?,
contact_type = ?, latitude = ?, longitude = ?, last_seen = ?,
rssi = ?, snr = ?, advert_count = advert_count + 1, is_new_neighbor = 0
rssi = ?, snr = ?, advert_count = advert_count + 1, is_new_neighbor = 0,
zero_hop = ?
WHERE pubkey = ?
""", (
current_time,
@@ -182,14 +226,16 @@ class SQLiteHandler:
current_time,
record.get("rssi"),
record.get("snr"),
record.get("zero_hop", False),
record.get("pubkey", "")
))
else:
conn.execute("""
INSERT INTO adverts (
timestamp, pubkey, node_name, is_repeater, route_type, contact_type,
latitude, longitude, first_seen, last_seen, rssi, snr, advert_count, is_new_neighbor
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
latitude, longitude, first_seen, last_seen, rssi, snr, advert_count,
is_new_neighbor, zero_hop
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
current_time,
record.get("pubkey", ""),
@@ -204,7 +250,8 @@ class SQLiteHandler:
record.get("rssi"),
record.get("snr"),
1,
True
True,
record.get("zero_hop", False)
))
except Exception as e:
@@ -619,7 +666,7 @@ class SQLiteHandler:
query = """
SELECT id, timestamp, pubkey, node_name, is_repeater, route_type,
contact_type, latitude, longitude, first_seen, last_seen,
rssi, snr, advert_count, is_new_neighbor
rssi, snr, advert_count, is_new_neighbor, zero_hop
FROM adverts
WHERE contact_type = ?
"""
@@ -655,11 +702,11 @@ class SQLiteHandler:
"rssi": row["rssi"],
"snr": row["snr"],
"advert_count": row["advert_count"],
"is_new_neighbor": bool(row["is_new_neighbor"])
"is_new_neighbor": bool(row["is_new_neighbor"]),
"zero_hop": bool(row["zero_hop"])
}
adverts.append(advert)
logger.debug(f"Found {len(adverts)} adverts with contact_type '{contact_type}'")
return adverts
except Exception as e:
+12 -4
View File
@@ -82,8 +82,13 @@ class StorageCollector:
"packets_received": self.repeater_handler.rx_count,
}
def record_packet(self, packet_record: dict):
"""Record packet to storage and publish to MQTT/LetsMesh"""
def record_packet(self, packet_record: dict, skip_letsmesh_if_invalid: bool = True):
"""Record packet to storage and publish to MQTT/LetsMesh
Args:
packet_record: Dictionary containing packet information
skip_letsmesh_if_invalid: If True, don't publish packets with drop_reason to LetsMesh
"""
logger.debug(
f"Recording packet: type={packet_record.get('type')}, "
f"transmitted={packet_record.get('transmitted')}"
@@ -95,8 +100,11 @@ class StorageCollector:
self.rrd_handler.update_packet_metrics(packet_record, cumulative_counts)
self.mqtt_handler.publish(packet_record, "packet")
# Publish to LetsMesh if enabled
self._publish_to_letsmesh(packet_record)
# Publish to LetsMesh if enabled (skip invalid packets if requested)
if skip_letsmesh_if_invalid and packet_record.get('drop_reason'):
logger.debug(f"Skipping LetsMesh publish for packet with drop_reason: {packet_record.get('drop_reason')}")
else:
self._publish_to_letsmesh(packet_record)
def _publish_to_letsmesh(self, packet_record: dict):
"""Publish packet to LetsMesh broker if enabled and allowed"""
+33 -104
View File
@@ -98,21 +98,15 @@ class RepeaterHandler(BaseHandler):
self._transport_keys_cache_time = 0
self._transport_keys_cache_ttl = 60 # Cache for 60 seconds
self._last_drop_reason = None
self._known_neighbors = set()
self._start_background_tasks()
async def __call__(self, packet: Packet, metadata: Optional[dict] = None) -> None:
async def __call__(self, packet: Packet, metadata: Optional[dict] = None, local_transmission: bool = False) -> None:
if metadata is None:
metadata = {}
self.rx_count += 1
# Reset drop reason for this packet processing
self._last_drop_reason = None
# Check if we're in monitor mode (receive only, no forwarding)
mode = self.config.get("repeater", {}).get("mode", "forward")
monitor_mode = mode == "monitor"
@@ -131,9 +125,20 @@ class RepeaterHandler(BaseHandler):
original_path = list(packet.path) if packet.path else []
# Process for forwarding (skip if in monitor mode)
result = None if monitor_mode else self.process_packet(packet, snr)
# Process for forwarding (skip if in monitor mode or if this is a local transmission)
result = None if (monitor_mode or local_transmission) else self.process_packet(packet, snr)
forwarded_path = None
# For local transmissions, create a direct transmission result
if local_transmission and not monitor_mode:
# Mark local packet as seen to prevent duplicate processing when received back
self.mark_seen(packet)
# Calculate transmission delay for local packets
delay = self._calculate_tx_delay(packet, snr)
result = (packet, delay)
forwarded_path = list(packet.path) if packet.path else []
logger.debug(f"Local transmission: calculated delay {delay:.3f}s")
if result:
fwd_pkt, delay = result
tx_delay_ms = delay * 1000.0
@@ -167,8 +172,9 @@ class RepeaterHandler(BaseHandler):
if monitor_mode:
drop_reason = "Monitor mode"
else:
drop_reason = self._last_drop_reason or self._get_drop_reason(packet)
logger.debug(f"Packet not forwarded: {drop_reason}")
# Check if packet has a specific drop reason set by handlers
drop_reason = packet.drop_reason or self._get_drop_reason(packet)
logger.debug(f"Packet not forwarded: {drop_reason}")
# Extract packet type and route from header
if not hasattr(packet, "header") or packet.header is None:
@@ -191,10 +197,6 @@ class RepeaterHandler(BaseHandler):
if is_dupe and drop_reason is None:
drop_reason = "Duplicate"
# Process adverts for neighbor tracking
if payload_type == PAYLOAD_TYPE_ADVERT:
self._process_advert(packet, rssi, snr)
path_hash = None
display_path = (
original_path if original_path else (list(packet.path) if packet.path else [])
@@ -258,9 +260,13 @@ class RepeaterHandler(BaseHandler):
}
# Store packet record to persistent storage
# Skip LetsMesh only for invalid packets (not duplicates or operational drops)
if self.storage:
try:
self.storage.record_packet(packet_record)
# Only skip LetsMesh for actual invalid/bad packets
invalid_reasons = ["Invalid advert packet", "Empty payload", "Path too long"]
skip_letsmesh = drop_reason in invalid_reasons if drop_reason else False
self.storage.record_packet(packet_record, skip_letsmesh_if_invalid=skip_letsmesh)
except Exception as e:
logger.error(f"Failed to store packet record: {e}")
@@ -342,85 +348,6 @@ class RepeaterHandler(BaseHandler):
# Default reason
return "Unknown"
def _process_advert(self, packet: Packet, rssi: int, snr: float):
try:
from pymc_core.protocol.constants import ADVERT_FLAG_IS_REPEATER
from pymc_core.protocol.utils import (
decode_appdata,
get_contact_type_name,
parse_advert_payload,
determine_contact_type_from_flags,
)
# Parse advert payload
if not packet.payload or len(packet.payload) < 40:
return
advert_data = parse_advert_payload(packet.payload)
pubkey = advert_data.get("pubkey", "")
# Skip our own adverts
if self.dispatcher and hasattr(self.dispatcher, "local_identity"):
local_pubkey = self.dispatcher.local_identity.get_public_key().hex()
if pubkey == local_pubkey:
logger.debug("Ignoring own advert in neighbor tracking")
return
appdata = advert_data.get("appdata", b"")
if not appdata:
return
appdata_decoded = decode_appdata(appdata)
flags = appdata_decoded.get("flags", 0)
is_repeater = bool(flags & ADVERT_FLAG_IS_REPEATER)
route_type = packet.header & PH_ROUTE_MASK
contact_type_id = determine_contact_type_from_flags(flags)
contact_type = get_contact_type_name(contact_type_id)
# Extract neighbor info
node_name = appdata_decoded.get("node_name", "Unknown")
latitude = appdata_decoded.get("latitude")
longitude = appdata_decoded.get("longitude")
current_time = time.time()
if pubkey not in self._known_neighbors:
# Only check database if not in cache
current_neighbors = self.storage.get_neighbors() if self.storage else {}
is_new_neighbor = pubkey not in current_neighbors
if is_new_neighbor:
self._known_neighbors.add(pubkey)
else:
is_new_neighbor = False
advert_record = {
"timestamp": current_time,
"pubkey": pubkey,
"node_name": node_name,
"is_repeater": is_repeater,
"route_type": route_type,
"contact_type": contact_type,
"latitude": latitude,
"longitude": longitude,
"rssi": rssi,
"snr": snr,
"is_new_neighbor": is_new_neighbor,
}
# Store to database
if self.storage:
try:
self.storage.record_advert(advert_record)
if is_new_neighbor:
logger.info(f"Discovered new neighbor: {node_name} ({pubkey[:16]}...)")
except Exception as e:
logger.error(f"Failed to store advert record: {e}")
except Exception as e:
logger.debug(f"Error processing advert for neighbor tracking: {e}")
def is_duplicate(self, packet: Packet) -> bool:
pkt_hash = packet.calculate_packet_hash().hex().upper()
@@ -531,12 +458,14 @@ class RepeaterHandler(BaseHandler):
# Validate
valid, reason = self.validate_packet(packet)
if not valid:
self._last_drop_reason = reason
packet.drop_reason = reason
return None
# Check if packet is marked do-not-retransmit
if packet.is_marked_do_not_retransmit():
self._last_drop_reason = "Marked do not retransmit"
# Check if packet has custom drop reason
if not packet.drop_reason:
packet.drop_reason = "Marked do not retransmit"
return None
# Check global flood policy
@@ -547,15 +476,15 @@ class RepeaterHandler(BaseHandler):
allowed, check_reason = self._check_transport_codes(packet)
if not allowed:
self._last_drop_reason = check_reason
packet.drop_reason = check_reason
return None
else:
self._last_drop_reason = "Global flood policy disabled"
packet.drop_reason = "Global flood policy disabled"
return None
# Suppress duplicates
if self.is_duplicate(packet):
self._last_drop_reason = "Duplicate"
packet.drop_reason = "Duplicate"
return None
if packet.path is None:
@@ -574,12 +503,12 @@ class RepeaterHandler(BaseHandler):
# Check if we're the next hop
if not packet.path or len(packet.path) == 0:
self._last_drop_reason = "Direct: no path"
packet.drop_reason = "Direct: no path"
return None
next_hop = packet.path[0]
next_hop = packet.path[0]
if next_hop != self.local_hash:
self._last_drop_reason = "Direct: not for us"
packet.drop_reason = "Direct: not for us"
return None
original_path = list(packet.path)
@@ -681,7 +610,7 @@ class RepeaterHandler(BaseHandler):
return fwd_pkt, delay
else:
self._last_drop_reason = f"Unknown route type: {route_type}"
packet.drop_reason = f"Unknown route type: {route_type}"
return None
async def schedule_retransmit(self, fwd_pkt: Packet, delay: float, airtime_ms: float = 0.0):
+7
View File
@@ -0,0 +1,7 @@
"""Handler helper modules for pyMC Repeater."""
from .trace import TraceHelper
from .discovery import DiscoveryHelper
from .advert import AdvertHelper
__all__ = ["TraceHelper", "DiscoveryHelper", "AdvertHelper"]
+115
View File
@@ -0,0 +1,115 @@
"""
Advertisement packet handling helper for pyMC Repeater.
This module processes advertisement packets for neighbor tracking and discovery.
"""
import logging
import time
from pymc_core.node.handlers.advert import AdvertHandler
logger = logging.getLogger("AdvertHelper")
class AdvertHelper:
"""Helper class for processing advertisement packets in the repeater."""
def __init__(self, local_identity, storage, log_fn=None):
"""
Initialize the advert helper.
Args:
local_identity: The LocalIdentity instance for this repeater
storage: StorageCollector instance for persisting advert data
log_fn: Optional logging function for AdvertHandler
"""
self.local_identity = local_identity
self.storage = storage
# Create AdvertHandler internally as a parsing utility
self.advert_handler = AdvertHandler(log_fn=log_fn or logger.info)
# Cache for tracking known neighbors (avoid repeated database queries)
self._known_neighbors = set()
async def process_advert_packet(self, packet, rssi: int, snr: float) -> None:
"""
Process an incoming advertisement packet.
This method uses AdvertHandler to parse the packet, then stores
the neighbor information for tracking and discovery.
Args:
packet: The advertisement packet to process
rssi: Received signal strength indicator
snr: Signal-to-noise ratio
"""
try:
# Set signal metrics on packet for handler to use
packet._snr = snr
packet._rssi = rssi
# Use AdvertHandler to parse the packet - it now returns parsed data
advert_data = await self.advert_handler(packet)
if not advert_data or not advert_data.get("valid"):
logger.warning("Invalid advert packet received, dropping.")
packet.mark_do_not_retransmit()
packet.drop_reason = "Invalid advert packet"
return
# Extract data from parsed advert
pubkey = advert_data["public_key"]
node_name = advert_data["name"]
contact_type = advert_data["contact_type"]
# Skip our own adverts
if self.local_identity:
local_pubkey = self.local_identity.get_public_key().hex()
if pubkey == local_pubkey:
logger.debug("Ignoring own advert in neighbor tracking")
return
# Get route type from packet header
from pymc_core.protocol.constants import PH_ROUTE_MASK
route_type = packet.header & PH_ROUTE_MASK
# Check if this is a new neighbor
current_time = time.time()
if pubkey not in self._known_neighbors:
# Only check database if not in cache
current_neighbors = self.storage.get_neighbors() if self.storage else {}
is_new_neighbor = pubkey not in current_neighbors
if is_new_neighbor:
self._known_neighbors.add(pubkey)
logger.info(f"Discovered new neighbor: {node_name} ({pubkey[:16]}...)")
else:
is_new_neighbor = False
# Build advert record
advert_record = {
"timestamp": current_time,
"pubkey": pubkey,
"node_name": node_name,
"is_repeater": "REPEATER" in contact_type.upper(),
"route_type": route_type,
"contact_type": contact_type,
"latitude": advert_data["latitude"],
"longitude": advert_data["longitude"],
"rssi": rssi,
"snr": snr,
"is_new_neighbor": is_new_neighbor,
"zero_hop": route_type in [0x02, 0x03], # True for direct routes (no intermediate hops)
}
# Store to database
if self.storage:
try:
self.storage.record_advert(advert_record)
except Exception as e:
logger.error(f"Failed to store advert record: {e}")
except Exception as e:
logger.error(f"Error processing advert packet: {e}", exc_info=True)
+133
View File
@@ -0,0 +1,133 @@
"""
Discovery request/response handling helper for pyMC Repeater.
This module handles the processing and response to discovery requests,
allowing other nodes to discover repeaters on the mesh network.
"""
import asyncio
import logging
from pymc_core.node.handlers.control import ControlHandler
logger = logging.getLogger("DiscoveryHelper")
class DiscoveryHelper:
"""Helper class for processing discovery requests in the repeater."""
def __init__(
self,
local_identity,
packet_injector=None,
node_type: int = 2,
log_fn=None,
):
"""
Initialize the discovery helper.
Args:
local_identity: The LocalIdentity instance for this repeater
packet_injector: Callable to inject new packets into the router for sending
node_type: Node type identifier (2 = Repeater)
log_fn: Optional logging function for ControlHandler
"""
self.local_identity = local_identity
self.packet_injector = packet_injector # Function to inject packets into router
self.node_type = node_type
# Create ControlHandler internally as a parsing utility
self.control_handler = ControlHandler(log_fn=log_fn or logger.info)
# Set up the request callback
self.control_handler.set_request_callback(self._on_discovery_request)
logger.debug("Discovery handler initialized")
def _on_discovery_request(self, request_data: dict) -> None:
"""
Handle incoming discovery request.
Args:
request_data: Dictionary containing the parsed discovery request
"""
try:
tag = request_data.get("tag", 0)
filter_byte = request_data.get("filter", 0)
prefix_only = request_data.get("prefix_only", False)
snr = request_data.get("snr", 0.0)
rssi = request_data.get("rssi", 0)
logger.info(
f"Request: tag=0x{tag:08X}, filter=0x{filter_byte:02X}, "
f"SNR={snr:+.1f}dB, RSSI={rssi}dBm"
)
# Check if filter matches our node type (repeater = 2, filter_mask = 0x04)
filter_mask = 1 << self.node_type # 1 << 2 = 0x04
if (filter_byte & filter_mask) == 0:
logger.debug("Filter doesn't match, ignoring")
return
logger.info("Sending response...")
if self.local_identity:
self._send_discovery_response(tag, self.node_type, snr, prefix_only)
else:
logger.warning("No local identity available for response")
except Exception as e:
logger.error(f"Error handling request: {e}")
def _send_discovery_response(
self,
tag: int,
node_type: int,
inbound_snr: float,
prefix_only: bool,
) -> None:
"""
Create and send a discovery response packet.
Args:
tag: The tag from the discovery request
node_type: Node type identifier
inbound_snr: SNR of the received request
prefix_only: Whether to use prefix-only mode
"""
try:
our_pub_key = self.local_identity.get_public_key()
from pymc_core.protocol.packet_builder import PacketBuilder
response_packet = PacketBuilder.create_discovery_response(
tag=tag,
node_type=node_type,
inbound_snr=inbound_snr,
pub_key=our_pub_key,
prefix_only=prefix_only,
)
# Send response via router injection
if self.packet_injector:
asyncio.create_task(self._send_packet_async(response_packet, tag))
else:
logger.warning("No packet injector available - discovery response not sent")
except Exception as e:
logger.error(f"Error creating discovery response: {e}")
async def _send_packet_async(self, packet, tag: int) -> None:
"""
Send a discovery response packet via router injection.
Args:
packet: The packet to send
tag: The tag for logging purposes
"""
try:
success = await self.packet_injector(packet, wait_for_ack=False)
if success:
logger.info(f"Response sent for tag 0x{tag:08X}")
else:
logger.warning(f"Failed to send response for tag 0x{tag:08X}")
except Exception as e:
logger.error(f"Error sending response: {e}")
+273
View File
@@ -0,0 +1,273 @@
"""
Trace packet handling helper for pyMC Repeater.
This module handles the processing and forwarding of trace packets,
which are used for network diagnostics to track the path and SNR
of packets through the mesh network.
"""
import logging
import time
from typing import Dict, Any
from pymc_core.node.handlers.trace import TraceHandler
from pymc_core.protocol.constants import MAX_PATH_SIZE, ROUTE_TYPE_DIRECT
logger = logging.getLogger("TraceHelper")
class TraceHelper:
"""Helper class for processing trace packets in the repeater."""
def __init__(self, local_hash: int, repeater_handler, packet_injector=None, log_fn=None):
"""
Initialize the trace helper.
Args:
local_hash: The local node's hash identifier
repeater_handler: The RepeaterHandler instance
packet_injector: Callable to inject new packets into the router for sending
log_fn: Optional logging function for TraceHandler
"""
self.local_hash = local_hash
self.repeater_handler = repeater_handler
self.packet_injector = packet_injector # Function to inject packets into router
# Create TraceHandler internally as a parsing utility
self.trace_handler = TraceHandler(log_fn=log_fn or logger.info)
async def process_trace_packet(self, packet) -> None:
"""
Process an incoming trace packet.
This method handles trace packet validation, logging, recording,
and forwarding if this node is the next hop in the trace path.
Args:
packet: The trace packet to process
"""
try:
# Only process direct route trace packets
if packet.get_route_type() != ROUTE_TYPE_DIRECT or packet.path_len >= MAX_PATH_SIZE:
return
# Parse the trace payload
parsed_data = self.trace_handler._parse_trace_payload(packet.payload)
if not parsed_data.get("valid", False):
logger.warning(
f"Invalid trace packet: {parsed_data.get('error', 'Unknown error')}"
)
return
trace_path = parsed_data["trace_path"]
trace_path_len = len(trace_path)
# Record the trace packet for dashboard/statistics
if self.repeater_handler:
packet_record = self._create_trace_record(packet, trace_path, parsed_data)
self.repeater_handler.log_trace_record(packet_record)
# Extract and log path SNRs and hashes
path_snrs, path_hashes = self._extract_path_info(packet, trace_path)
# Add packet metadata for logging
parsed_data["snr"] = packet.get_snr()
parsed_data["rssi"] = getattr(packet, "rssi", 0)
formatted_response = self.trace_handler._format_trace_response(parsed_data)
logger.info(f"{formatted_response}")
logger.info(f"Path SNRs: [{', '.join(path_snrs)}], Hashes: [{', '.join(path_hashes)}]")
# Check if we should forward this trace packet
should_forward = self._should_forward_trace(packet, trace_path, trace_path_len)
if should_forward:
await self._forward_trace_packet(packet, trace_path_len)
else:
# This is the final destination or can't forward - just log and record
self._log_no_forward_reason(packet, trace_path, trace_path_len)
except Exception as e:
logger.error(f"Error processing trace packet: {e}")
def _create_trace_record(self, packet, trace_path: list, parsed_data: dict) -> Dict[str, Any]:
"""
Create a packet record for trace packets to log to statistics.
Args:
packet: The trace packet
trace_path: The parsed trace path from the payload
parsed_data: The parsed trace data
Returns:
A dictionary containing the packet record
"""
# Format trace path for display
trace_path_bytes = [f"{h:02X}" for h in trace_path[:8]]
if len(trace_path) > 8:
trace_path_bytes.append("...")
path_hash = "[" + ", ".join(trace_path_bytes) + "]"
# Extract SNR information from the path
path_snrs = []
path_snr_details = []
for i in range(packet.path_len):
if i < len(packet.path):
snr_val = packet.path[i]
# Convert unsigned byte to signed SNR
snr_signed = snr_val if snr_val < 128 else snr_val - 256
snr_db = snr_signed / 4.0
path_snrs.append(f"{snr_val}({snr_db:.1f}dB)")
# Add detailed SNR info if we have the corresponding hash
if i < len(trace_path):
path_snr_details.append({
"hash": f"{trace_path[i]:02X}",
"snr_raw": snr_val,
"snr_db": snr_db
})
return {
"timestamp": time.time(),
"header": f"0x{packet.header:02X}" if hasattr(packet, "header") and packet.header is not None else None,
"payload": packet.payload.hex() if hasattr(packet, "payload") and packet.payload else None,
"payload_length": len(packet.payload) if hasattr(packet, "payload") and packet.payload else 0,
"type": packet.get_payload_type(), # 0x09 for trace
"route": packet.get_route_type(), # Should be direct (1)
"length": len(packet.payload or b""),
"rssi": getattr(packet, "rssi", 0),
"snr": getattr(packet, "snr", 0.0),
"score": self.repeater_handler.calculate_packet_score(
getattr(packet, "snr", 0.0),
len(packet.payload or b""),
self.repeater_handler.radio_config.get("spreading_factor", 8)
) if self.repeater_handler else 0.0,
"tx_delay_ms": 0,
"transmitted": False,
"is_duplicate": False,
"packet_hash": packet.calculate_packet_hash().hex().upper()[:16],
"drop_reason": "trace_received",
"path_hash": path_hash,
"src_hash": None,
"dst_hash": None,
"original_path": [f"{h:02X}" for h in trace_path],
"forwarded_path": None,
# Add trace-specific SNR path information
"path_snrs": path_snrs, # ["58(14.5dB)", "19(4.8dB)"]
"path_snr_details": path_snr_details, # [{"hash": "29", "snr_raw": 58, "snr_db": 14.5}]
"is_trace": True,
"raw_packet": packet.write_to().hex() if hasattr(packet, "write_to") else None,
}
def _extract_path_info(self, packet, trace_path: list) -> tuple:
"""
Extract SNR and hash information from the packet path.
Args:
packet: The trace packet
trace_path: The parsed trace path from the payload
Returns:
A tuple of (path_snrs, path_hashes) lists
"""
path_snrs = []
path_hashes = []
for i in range(packet.path_len):
if i < len(packet.path):
snr_val = packet.path[i]
snr_signed = snr_val if snr_val < 128 else snr_val - 256
snr_db = snr_signed / 4.0
path_snrs.append(f"{snr_val}({snr_db:.1f}dB)")
if i < len(trace_path):
path_hashes.append(f"0x{trace_path[i]:02x}")
return path_snrs, path_hashes
def _should_forward_trace(self, packet, trace_path: list, trace_path_len: int) -> bool:
"""
Determine if this node should forward the trace packet.
Uses the same logic as the original working implementation.
Args:
packet: The trace packet
trace_path: The parsed trace path from the payload
trace_path_len: The length of the trace path
Returns:
True if the packet should be forwarded, False otherwise
"""
# Use the exact logic from the original working code
return (packet.path_len < trace_path_len and
len(trace_path) > packet.path_len and
trace_path[packet.path_len] == self.local_hash and
self.repeater_handler and not self.repeater_handler.is_duplicate(packet))
async def _forward_trace_packet(self, packet, trace_path_len: int) -> None:
"""
Forward a trace packet by appending SNR and sending via injection.
Args:
packet: The trace packet to forward
trace_path_len: The length of the trace path
"""
# Update the packet record to show it will be transmitted
if self.repeater_handler and hasattr(self.repeater_handler, 'recent_packets'):
packet_hash = packet.calculate_packet_hash().hex().upper()[:16]
for record in reversed(self.repeater_handler.recent_packets):
if record.get("packet_hash") == packet_hash:
record["transmitted"] = True
record["drop_reason"] = "trace_forwarded"
break
# Get current SNR and scale it for storage (SNR * 4)
current_snr = packet.get_snr()
snr_scaled = int(current_snr * 4)
# Clamp to signed byte range [-128, 127]
if snr_scaled > 127:
snr_scaled = 127
elif snr_scaled < -128:
snr_scaled = -128
# Convert to unsigned byte representation
snr_byte = snr_scaled if snr_scaled >= 0 else (256 + snr_scaled)
# Ensure path array is long enough
while len(packet.path) <= packet.path_len:
packet.path.append(0)
# Store SNR at current position and increment path length
packet.path[packet.path_len] = snr_byte
packet.path_len += 1
logger.info(
f"Forwarding trace, stored SNR {current_snr:.1f}dB at position {packet.path_len - 1}"
)
# Inject packet into router for proper routing and transmission
if self.packet_injector:
await self.packet_injector(packet, wait_for_ack=False)
else:
logger.warning("No packet injector available - trace packet not forwarded")
def _log_no_forward_reason(self, packet, trace_path: list, trace_path_len: int) -> None:
"""
Log the reason why a trace packet was not forwarded.
Args:
packet: The trace packet
trace_path: The parsed trace path from the payload
trace_path_len: The length of the trace path
"""
if packet.path_len >= trace_path_len:
logger.info("Trace completed (reached end of path)")
elif len(trace_path) <= packet.path_len:
logger.info("Path index out of bounds")
elif trace_path[packet.path_len] != self.local_hash:
expected_hash = trace_path[packet.path_len] if packet.path_len < len(trace_path) else None
logger.info(f"Not our turn (next hop: 0x{expected_hash:02x})")
elif self.repeater_handler and self.repeater_handler.is_duplicate(packet):
logger.info("Duplicate packet, ignoring")
+54 -255
View File
@@ -6,8 +6,8 @@ import sys
from repeater.config import get_radio_for_board, load_config
from repeater.engine import RepeaterHandler
from repeater.web.http_server import HTTPStatsServer, _log_buffer
from pymc_core.node.handlers.trace import TraceHandler
from pymc_core.protocol.constants import MAX_PATH_SIZE, ROUTE_TYPE_DIRECT
from repeater.handler_helpers import TraceHelper, DiscoveryHelper, AdvertHelper
from repeater.packet_router import PacketRouter
logger = logging.getLogger("RepeaterDaemon")
@@ -23,7 +23,10 @@ class RepeaterDaemon:
self.local_hash = None
self.local_identity = None
self.http_server = None
self.trace_handler = None
self.trace_helper = None
self.advert_helper = None
self.discovery_helper = None
self.router = None
log_level = config.get("logging", {}).get("level", "INFO")
@@ -45,7 +48,6 @@ class RepeaterDaemon:
try:
self.radio = get_radio_for_board(self.config)
if hasattr(self.radio, 'set_custom_cad_thresholds'):
# Load CAD settings from config, with defaults
cad_config = self.config.get("radio", {}).get("cad", {})
@@ -91,7 +93,6 @@ class RepeaterDaemon:
self.local_identity = local_identity
self.dispatcher.local_identity = local_identity
pubkey = local_identity.get_public_key()
self.local_hash = pubkey[0]
logger.info(f"Local identity set: {local_identity.get_address_bytes().hex()}")
@@ -105,266 +106,62 @@ class RepeaterDaemon:
self.config, self.dispatcher, self.local_hash, send_advert_func=self.send_advert
)
self.dispatcher.register_fallback_handler(self._repeater_callback)
logger.info("Repeater handler registered (forwarder mode)")
self.trace_handler = TraceHandler(log_fn=logger.info)
# Create router
self.router = PacketRouter(self)
await self.router.start()
self.dispatcher.register_handler(
TraceHandler.payload_type(),
self._trace_callback,
)
logger.info("Trace handler registered for network diagnostics")
# Register router as entry point for ALL packets via fallback handler
# All received packets flow through router → helpers → repeater engine
self.dispatcher.register_fallback_handler(self._router_callback)
logger.info("Packet router registered as fallback (catches all packets)")
# Create processing helpers (handlers created internally)
self.trace_helper = TraceHelper(
local_hash=self.local_hash,
repeater_handler=self.repeater_handler,
packet_injector=self.router.inject_packet,
log_fn=logger.info,
)
logger.info("Trace processing helper initialized")
# Create advert helper for neighbor tracking
self.advert_helper = AdvertHelper(
local_identity=self.local_identity,
storage=self.repeater_handler.storage if self.repeater_handler else None,
log_fn=logger.info,
)
logger.info("Advert processing helper initialized")
# Set up discovery handler if enabled
allow_discovery = self.config.get("repeater", {}).get("allow_discovery", True)
if allow_discovery:
self._setup_discovery_handler()
logger.info("Discovery response handler enabled")
self.discovery_helper = DiscoveryHelper(
local_identity=self.local_identity,
packet_injector=self.router.inject_packet,
node_type=2,
log_fn=logger.info,
)
logger.info("Discovery processing helper initialized")
else:
logger.info("Discovery response handler disabled")
except Exception as e:
logger.error(f"Failed to initialize dispatcher: {e}")
raise
async def _repeater_callback(self, packet):
if self.repeater_handler:
metadata = {
"rssi": getattr(packet, "rssi", 0),
"snr": getattr(packet, "snr", 0.0),
"timestamp": getattr(packet, "timestamp", 0),
}
await self.repeater_handler(packet, metadata)
async def _trace_callback(self, packet):
try:
# Only process direct route trace packets
if packet.get_route_type() != ROUTE_TYPE_DIRECT or packet.path_len >= MAX_PATH_SIZE:
return
parsed_data = self.trace_handler._parse_trace_payload(packet.payload)
if not parsed_data.get("valid", False):
logger.warning(f"[TraceHandler] Invalid trace packet: {parsed_data.get('error', 'Unknown error')}")
return
trace_path = parsed_data["trace_path"]
trace_path_len = len(trace_path)
if self.repeater_handler:
import time
trace_path_bytes = [f"{h:02X}" for h in trace_path[:8]]
if len(trace_path) > 8:
trace_path_bytes.append("...")
path_hash = "[" + ", ".join(trace_path_bytes) + "]"
path_snrs = []
path_snr_details = []
for i in range(packet.path_len):
if i < len(packet.path):
snr_val = packet.path[i]
snr_signed = snr_val if snr_val < 128 else snr_val - 256
snr_db = snr_signed / 4.0
path_snrs.append(f"{snr_val}({snr_db:.1f}dB)")
if i < len(trace_path):
path_snr_details.append({
"hash": f"{trace_path[i]:02X}",
"snr_raw": snr_val,
"snr_db": snr_db
})
packet_record = {
"timestamp": time.time(),
"header": f"0x{packet.header:02X}" if hasattr(packet, "header") and packet.header is not None else None,
"payload": packet.payload.hex() if hasattr(packet, "payload") and packet.payload else None,
"payload_length": len(packet.payload) if hasattr(packet, "payload") and packet.payload else 0,
"type": packet.get_payload_type(), # 0x09 for trace
"route": packet.get_route_type(), # Should be direct (1)
"length": len(packet.payload or b""),
"rssi": getattr(packet, "rssi", 0),
"snr": getattr(packet, "snr", 0.0),
"score": self.repeater_handler.calculate_packet_score(
getattr(packet, "snr", 0.0),
len(packet.payload or b""),
self.repeater_handler.radio_config.get("spreading_factor", 8)
),
"tx_delay_ms": 0,
"transmitted": False,
"is_duplicate": False,
"packet_hash": packet.calculate_packet_hash().hex().upper()[:16],
"drop_reason": "trace_received",
"path_hash": path_hash,
"src_hash": None,
"dst_hash": None,
"original_path": [f"{h:02X}" for h in trace_path],
"forwarded_path": None,
# Add trace-specific SNR path information
"path_snrs": path_snrs, # ["58(14.5dB)", "19(4.8dB)"]
"path_snr_details": path_snr_details, # [{"hash": "29", "snr_raw": 58, "snr_db": 14.5}]
"is_trace": True,
"raw_packet": packet.write_to().hex() if hasattr(packet, "write_to") else None,
}
self.repeater_handler.log_trace_record(packet_record)
path_snrs = []
path_hashes = []
for i in range(packet.path_len):
if i < len(packet.path):
snr_val = packet.path[i]
snr_signed = snr_val if snr_val < 128 else snr_val - 256
snr_db = snr_signed / 4.0
path_snrs.append(f"{snr_val}({snr_db:.1f}dB)")
if i < len(trace_path):
path_hashes.append(f"0x{trace_path[i]:02x}")
parsed_data["snr"] = packet.get_snr()
parsed_data["rssi"] = getattr(packet, "rssi", 0)
formatted_response = self.trace_handler._format_trace_response(parsed_data)
logger.info(f"[TraceHandler] {formatted_response}")
logger.info(f"[TraceHandler] Path SNRs: [{', '.join(path_snrs)}], Hashes: [{', '.join(path_hashes)}]")
if (packet.path_len < trace_path_len and
len(trace_path) > packet.path_len and
trace_path[packet.path_len] == self.local_hash and
self.repeater_handler and not self.repeater_handler.is_duplicate(packet)):
if self.repeater_handler and hasattr(self.repeater_handler, 'recent_packets'):
packet_hash = packet.calculate_packet_hash().hex().upper()[:16]
for record in reversed(self.repeater_handler.recent_packets):
if record.get("packet_hash") == packet_hash:
record["transmitted"] = True
record["drop_reason"] = "trace_forwarded"
break
current_snr = packet.get_snr()
snr_scaled = int(current_snr * 4)
if snr_scaled > 127:
snr_scaled = 127
elif snr_scaled < -128:
snr_scaled = -128
snr_byte = snr_scaled if snr_scaled >= 0 else (256 + snr_scaled)
while len(packet.path) <= packet.path_len:
packet.path.append(0)
packet.path[packet.path_len] = snr_byte
packet.path_len += 1
logger.info(f"[TraceHandler] Forwarding trace, stored SNR {current_snr:.1f}dB at position {packet.path_len-1}")
# Mark as seen and forward directly (bypass normal routing, no ACK required)
self.repeater_handler.mark_seen(packet)
if self.dispatcher:
await self.dispatcher.send_packet(packet, wait_for_ack=False)
else:
# Show why we didn't forward
if packet.path_len >= trace_path_len:
logger.info(f"[TraceHandler] Trace completed (reached end of path)")
elif len(trace_path) <= packet.path_len:
logger.info(f"[TraceHandler] Path index out of bounds")
elif trace_path[packet.path_len] != self.local_hash:
expected_hash = trace_path[packet.path_len] if packet.path_len < len(trace_path) else None
logger.info(f"[TraceHandler] Not our turn (next hop: 0x{expected_hash:02x})")
elif self.repeater_handler and self.repeater_handler.is_duplicate(packet):
logger.info(f"[TraceHandler] Duplicate packet, ignoring")
except Exception as e:
logger.error(f"[TraceHandler] Error processing trace packet: {e}")
def _setup_discovery_handler(self):
"""Set up discovery request/response handling."""
try:
from pymc_core.node.handlers.control import ControlHandler
self.control_handler = ControlHandler(log_fn=logger.info)
self.dispatcher.register_handler(
ControlHandler.payload_type(),
self._control_callback,
)
# Node type 2 = Repeater
node_type = 2
def on_discovery_request(request_data: dict):
"""Handle incoming discovery request."""
try:
tag = request_data.get("tag", 0)
filter_byte = request_data.get("filter", 0)
prefix_only = request_data.get("prefix_only", False)
snr = request_data.get("snr", 0.0)
rssi = request_data.get("rssi", 0)
logger.info(f"[Discovery] Request: tag=0x{tag:08X}, filter=0x{filter_byte:02X}, SNR={snr:+.1f}dB, RSSI={rssi}dBm")
# Check if filter matches our node type (repeater = 2, filter_mask = 0x04)
filter_mask = 1 << node_type # 1 << 2 = 0x04
if (filter_byte & filter_mask) == 0:
logger.debug("[Discovery] Filter doesn't match, ignoring")
return
logger.info("[Discovery] Sending response...")
if self.local_identity:
our_pub_key = self.local_identity.get_public_key()
from pymc_core.protocol.packet_builder import PacketBuilder
response_packet = PacketBuilder.create_discovery_response(
tag=tag,
node_type=node_type,
inbound_snr=snr,
pub_key=our_pub_key,
prefix_only=prefix_only,
)
# Send response asynchronously
asyncio.create_task(self._send_discovery_response(response_packet, tag))
else:
logger.warning("[Discovery] No local identity available for response")
except Exception as e:
logger.error(f"[Discovery] Error handling request: {e}")
self.control_handler.set_request_callback(on_discovery_request)
logger.debug("[Discovery] Handler registered")
except Exception as e:
logger.error(f"Failed to setup discovery handler: {e}")
async def _control_callback(self, packet):
if self.control_handler:
await self.control_handler(packet)
async def _send_discovery_response(self, packet, tag):
try:
success = await self.dispatcher.send_packet(packet, wait_for_ack=False)
if success:
logger.info(f"[Discovery] Response sent for tag 0x{tag:08X}")
else:
logger.warning(f"[Discovery] Failed to send response for tag 0x{tag:08X}")
except Exception as e:
logger.error(f"[Discovery] Error sending response: {e}")
async def _router_callback(self, packet):
"""
Single entry point for ALL packets.
Enqueues packets for router processing.
"""
if self.router:
try:
await self.router.enqueue(packet)
except Exception as e:
logger.error(f"Error enqueuing packet in router: {e}", exc_info=True)
def get_stats(self) -> dict:
stats = {}
if self.repeater_handler:
stats = self.repeater_handler.get_stats()
# Add public key if available
@@ -374,8 +171,8 @@ class RepeaterDaemon:
stats["public_key"] = pubkey.hex()
except Exception:
stats["public_key"] = None
return stats
return {}
return stats
async def send_advert(self) -> bool:
@@ -468,6 +265,8 @@ class RepeaterDaemon:
await self.dispatcher.run_forever()
except KeyboardInterrupt:
logger.info("Shutting down...")
if self.router:
await self.router.stop()
if self.http_server:
self.http_server.stop()
+135
View File
@@ -0,0 +1,135 @@
"""
Packet router for pyMC Repeater.
This module provides a simple router that routes packets to appropriate handlers
based on payload type. All statistics, queuing, and processing logic is handled
by the repeater engine for better separation of concerns.
"""
import asyncio
import logging
from pymc_core.node.handlers.trace import TraceHandler
from pymc_core.node.handlers.control import ControlHandler
from pymc_core.node.handlers.advert import AdvertHandler
logger = logging.getLogger("PacketRouter")
class PacketRouter:
"""
Simple router that processes packets through handlers sequentially.
All statistics and processing decisions are handled by the engine.
"""
def __init__(self, daemon_instance):
self.daemon = daemon_instance
self.queue = asyncio.Queue()
self.running = False
self.router_task = None
async def start(self):
"""Start the router processing task."""
self.running = True
self.router_task = asyncio.create_task(self._process_queue())
logger.info("Packet router started")
async def stop(self):
"""Stop the router processing task."""
self.running = False
if self.router_task:
self.router_task.cancel()
try:
await self.router_task
except asyncio.CancelledError:
pass
logger.info("Packet router stopped")
async def enqueue(self, packet):
"""Add packet to router queue."""
await self.queue.put(packet)
async def inject_packet(self, packet, wait_for_ack: bool = False):
"""
Inject a new packet into the system for transmission through the engine.
This method uses the engine's main packet handler with the local_transmission
flag to bypass forwarding logic while maintaining proper statistics and airtime.
Args:
packet: The packet to send
wait_for_ack: Whether to wait for acknowledgment
Returns:
True if packet was sent successfully, False otherwise
"""
try:
metadata = {
"rssi": getattr(packet, "rssi", 0),
"snr": getattr(packet, "snr", 0.0),
"timestamp": getattr(packet, "timestamp", 0),
}
# Use local_transmission=True to bypass forwarding logic
await self.daemon.repeater_handler(packet, metadata, local_transmission=True)
packet_len = len(packet.payload) if packet.payload else 0
logger.debug(f"Injected packet processed by engine as local transmission ({packet_len} bytes)")
return True
except Exception as e:
logger.error(f"Error injecting packet through engine: {e}")
return False
async def _process_queue(self):
"""Process packets through the router queue."""
while self.running:
try:
packet = await asyncio.wait_for(self.queue.get(), timeout=0.1)
await self._route_packet(packet)
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"Router error: {e}", exc_info=True)
async def _route_packet(self, packet):
"""
Route a packet to appropriate handlers based on payload type.
Simple routing logic:
1. Route to specific handlers for parsing
2. Pass to repeater engine for all processing decisions
"""
payload_type = packet.get_payload_type()
processed_by_injection = False
# Route to specific handlers for parsing only
if payload_type == TraceHandler.payload_type():
# Process trace packet
if self.daemon.trace_helper:
await self.daemon.trace_helper.process_trace_packet(packet)
# Skip engine processing for trace packets - they're handled by trace helper
processed_by_injection = True
elif payload_type == ControlHandler.payload_type():
# Process control/discovery packet
if self.daemon.discovery_helper:
await self.daemon.discovery_helper.control_handler(packet)
packet.mark_do_not_retransmit()
elif payload_type == AdvertHandler.payload_type():
# Process advertisement packet for neighbor tracking
if self.daemon.advert_helper:
rssi = getattr(packet, "rssi", 0)
snr = getattr(packet, "snr", 0.0)
await self.daemon.advert_helper.process_advert_packet(packet, rssi, snr)
# Only pass to repeater engine if not already processed by injection
if self.daemon.repeater_handler and not processed_by_injection:
metadata = {
"rssi": getattr(packet, "rssi", 0),
"snr": getattr(packet, "snr", 0.0),
"timestamp": getattr(packet, "timestamp", 0),
}
await self.daemon.repeater_handler(packet, metadata)
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
+2 -2
View File
@@ -8,8 +8,8 @@
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=Noto+Sans:wght@400;500;600;700&display=swap" rel="stylesheet">
<script type="module" crossorigin src="/assets/index-D4rBXqAS.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-Dy52hcrv.css">
<script type="module" crossorigin src="/assets/index-DYqcF5Pe.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-BsJox_xF.css">
</head>
<body>
<div id="app"></div>