Refactor packet handling to use packet injector for sending responses in DiscoveryHelper and TraceHelper; remove PacketRouter as it's no longer needed.

This commit is contained in:
Lloyd
2025-12-01 22:05:02 +00:00
parent ea54e86585
commit 60964ea13d
5 changed files with 80 additions and 152 deletions

View File

@@ -20,7 +20,7 @@ class DiscoveryHelper:
def __init__(
self,
local_identity,
dispatcher,
packet_injector=None,
node_type: int = 2,
log_fn=None,
):
@@ -29,12 +29,12 @@ class DiscoveryHelper:
Args:
local_identity: The LocalIdentity instance for this repeater
dispatcher: The Dispatcher instance for sending packets
packet_injector: Callable to inject new packets into the router for sending
node_type: Node type identifier (2 = Repeater)
log_fn: Optional logging function for ControlHandler
"""
self.local_identity = local_identity
self.dispatcher = dispatcher
self.packet_injector = packet_injector # Function to inject packets into router
self.node_type = node_type
# Create ControlHandler internally as a parsing utility
@@ -108,22 +108,25 @@ class DiscoveryHelper:
prefix_only=prefix_only,
)
# Send response asynchronously
asyncio.create_task(self._send_packet_async(response_packet, tag))
# Send response via router injection
if self.packet_injector:
asyncio.create_task(self._send_packet_async(response_packet, tag))
else:
logger.warning("No packet injector available - discovery response not sent")
except Exception as e:
logger.error(f"Error creating discovery response: {e}")
async def _send_packet_async(self, packet, tag: int) -> None:
"""
Send a discovery response packet asynchronously.
Send a discovery response packet via router injection.
Args:
packet: The packet to send
tag: The tag for logging purposes
"""
try:
success = await self.dispatcher.send_packet(packet, wait_for_ack=False)
success = await self.packet_injector(packet, wait_for_ack=False)
if success:
logger.info(f"Response sent for tag 0x{tag:08X}")
else:

View File

@@ -19,19 +19,19 @@ logger = logging.getLogger("TraceHelper")
class TraceHelper:
"""Helper class for processing trace packets in the repeater."""
def __init__(self, local_hash: int, repeater_handler, dispatcher, log_fn=None):
def __init__(self, local_hash: int, repeater_handler, packet_injector=None, log_fn=None):
"""
Initialize the trace helper.
Args:
local_hash: The local node's hash identifier
repeater_handler: The RepeaterHandler instance
dispatcher: The Dispatcher instance for sending packets
packet_injector: Callable to inject new packets into the router for sending
log_fn: Optional logging function for TraceHandler
"""
self.local_hash = local_hash
self.repeater_handler = repeater_handler
self.dispatcher = dispatcher
self.packet_injector = packet_injector # Function to inject packets into router
# Create TraceHandler internally as a parsing utility
self.trace_handler = TraceHandler(log_fn=log_fn or logger.info)
@@ -84,8 +84,13 @@ class TraceHelper:
if should_forward:
await self._forward_trace_packet(packet, trace_path_len)
# Packet was sent directly, but let it flow back to engine for standard logging
# The engine will see do_not_retransmit flag and won't try to send it again
else:
# This is the final destination or can't forward - just log and record
self._log_no_forward_reason(packet, trace_path, trace_path_len)
# Mark packet to not be retransmitted since we're not forwarding
packet.mark_do_not_retransmit()
except Exception as e:
logger.error(f"Error processing trace packet: {e}")
@@ -188,6 +193,7 @@ class TraceHelper:
def _should_forward_trace(self, packet, trace_path: list, trace_path_len: int) -> bool:
"""
Determine if this node should forward the trace packet.
Uses the same logic as the original working implementation.
Args:
packet: The trace packet
@@ -197,35 +203,22 @@ class TraceHelper:
Returns:
True if the packet should be forwarded, False otherwise
"""
# Check if we've reached the end of the trace path
if packet.path_len >= trace_path_len:
return False
# Check if path index is valid
if len(trace_path) <= packet.path_len:
return False
# Check if this node is the next hop
if trace_path[packet.path_len] != self.local_hash:
return False
# Check for duplicates
if self.repeater_handler and self.repeater_handler.is_duplicate(packet):
return False
return True
# Use the exact logic from the original working code
return (packet.path_len < trace_path_len and
len(trace_path) > packet.path_len and
trace_path[packet.path_len] == self.local_hash and
self.repeater_handler and not self.repeater_handler.is_duplicate(packet))
async def _forward_trace_packet(self, packet, trace_path_len: int) -> None:
"""
Forward a trace packet by appending SNR and setting up correct routing.
Forward a trace packet by appending SNR and sending directly.
Args:
packet: The trace packet to forward
trace_path_len: The length of the trace path
"""
# Update the packet record to show it was transmitted
# Update the packet record to show it will be transmitted
if self.repeater_handler and hasattr(self.repeater_handler, 'recent_packets'):
packet_hash = packet.calculate_packet_hash().hex().upper()[:16]
packet_hash = packet.calculate_packet_hash().hex()[:16]
for record in reversed(self.repeater_handler.recent_packets):
if record.get("packet_hash") == packet_hash:
record["transmitted"] = True
@@ -257,27 +250,16 @@ class TraceHelper:
f"Forwarding trace, stored SNR {current_snr:.1f}dB at position {packet.path_len - 1}"
)
# For direct trace packets, we need to update the routing path to point to next hop
# Parse the trace payload to get the trace route
parsed_data = self.trace_handler._parse_trace_payload(packet.payload)
if parsed_data.get("valid", False):
trace_path = parsed_data["trace_path"]
# Inject packet into router for proper routing and transmission
# Router will handle marking as seen to prevent duplicate processing
if self.packet_injector:
await self.packet_injector(packet, wait_for_ack=False)
else:
logger.warning("No packet injector available - trace packet not forwarded")
# Check if there's a next hop after current position
if packet.path_len < len(trace_path):
next_hop = trace_path[packet.path_len]
# Set up direct routing to next hop by putting it at front of path
# The SNR data stays in the path, but we prepend the next hop for routing
packet.path = bytearray([next_hop] + list(packet.path))
packet.path_len = len(packet.path)
logger.debug(f"Set next trace hop to 0x{next_hop:02X}")
else:
logger.info("Trace reached end of route")
# Don't mark as seen - let the packet flow to repeater handler for normal processing
# The repeater handler will handle duplicate detection and forwarding logic
# Mark as do_not_retransmit so engine won't try to send it again
# but allow it to flow back for standard packet logging
packet.mark_do_not_retransmit()
def _log_no_forward_reason(self, packet, trace_path: list, trace_path_len: int) -> None:
"""

View File

@@ -119,7 +119,7 @@ class RepeaterDaemon:
self.trace_helper = TraceHelper(
local_hash=self.local_hash,
repeater_handler=self.repeater_handler,
dispatcher=self.dispatcher,
packet_injector=self.router.inject_packet,
log_fn=logger.info,
)
logger.info("Trace processing helper initialized")
@@ -137,7 +137,7 @@ class RepeaterDaemon:
if allow_discovery:
self.discovery_helper = DiscoveryHelper(
local_identity=self.local_identity,
dispatcher=self.dispatcher,
packet_injector=self.router.inject_packet,
node_type=2,
log_fn=logger.info,
)

View File

@@ -1,99 +0,0 @@
"""
Packet routing pipeline for pyMC Repeater.
This module provides a simple router that processes packets through handlers
sequentially. All statistics and processing logic is handled by the engine.
"""
import asyncio
import logging
from pymc_core.node.handlers.trace import TraceHandler
from pymc_core.node.handlers.control import ControlHandler
from pymc_core.node.handlers.advert import AdvertHandler
logger = logging.getLogger("PacketRouter")
class PacketRouter:
"""
Simple router that processes packets through handlers sequentially.
All statistics and processing decisions are handled by the engine.
"""
def __init__(self, daemon_instance):
self.daemon = daemon_instance
self.queue = asyncio.Queue()
self.running = False
self.router_task = None
async def start(self):
"""Start the router processing task."""
self.running = True
self.router_task = asyncio.create_task(self._process_queue())
logger.info("Packet router started")
async def stop(self):
"""Stop the router processing task."""
self.running = False
if self.router_task:
self.router_task.cancel()
try:
await self.router_task
except asyncio.CancelledError:
pass
logger.info("Packet router stopped")
async def enqueue(self, packet):
"""Add packet to router queue."""
await self.queue.put(packet)
async def _process_queue(self):
"""Process packets through the router queue."""
while self.running:
try:
packet = await asyncio.wait_for(self.queue.get(), timeout=0.1)
await self._route_packet(packet)
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"Router error: {e}", exc_info=True)
async def _route_packet(self, packet):
"""
Route a packet to appropriate handlers based on payload type.
Simple routing logic:
1. Route to specific handlers for parsing
2. Pass to repeater engine for all processing decisions
"""
payload_type = packet.get_payload_type()
# Route to specific handlers for parsing only
if payload_type == TraceHandler.payload_type():
# Process trace packet
if self.daemon.trace_helper:
await self.daemon.trace_helper.process_trace_packet(packet)
elif payload_type == ControlHandler.payload_type():
# Process control/discovery packet
if self.daemon.discovery_helper:
await self.daemon.discovery_helper.control_handler(packet)
packet.mark_do_not_retransmit()
elif payload_type == AdvertHandler.payload_type():
# Process advertisement packet for neighbor tracking
if self.daemon.advert_helper:
rssi = getattr(packet, "rssi", 0)
snr = getattr(packet, "snr", 0.0)
await self.daemon.advert_helper.process_advert_packet(packet, rssi, snr)
# Always pass to repeater engine for processing decisions and statistics
if self.daemon.repeater_handler:
metadata = {
"rssi": getattr(packet, "rssi", 0),
"snr": getattr(packet, "snr", 0.0),
"timestamp": getattr(packet, "timestamp", 0),
}
await self.daemon.repeater_handler(packet, metadata)

View File

@@ -48,6 +48,48 @@ class PacketRouter:
async def enqueue(self, packet):
"""Add packet to router queue."""
await self.queue.put(packet)
async def inject_packet(self, packet, wait_for_ack: bool = False):
"""
Inject a new packet into the system for direct transmission.
This method bypasses the normal routing and sends packets directly
via the dispatcher. Used by helpers to send response packets.
IMPORTANT: Pre-marks packet as seen to prevent processing our own
transmitted packets as duplicates when they're received back.
Args:
packet: The packet to send
wait_for_ack: Whether to wait for acknowledgment
Returns:
True if packet was sent successfully, False otherwise
"""
try:
# Pre-mark the packet as seen BEFORE transmission to prevent
# our own radio from processing it as an incoming duplicate
if hasattr(self.daemon, 'repeater_handler') and self.daemon.repeater_handler:
self.daemon.repeater_handler.mark_seen(packet)
logger.debug("Pre-marked injected packet as seen to prevent duplicate processing")
if hasattr(self.daemon, 'dispatcher') and self.daemon.dispatcher:
success = await self.daemon.dispatcher.send_packet(packet, wait_for_ack=wait_for_ack)
if success:
packet_len = len(packet.payload) if packet.payload else 0
logger.debug(f"Injected packet sent successfully ({packet_len} bytes)")
else:
logger.warning("Failed to send injected packet")
return success
else:
logger.error("No dispatcher available for packet injection")
return False
except Exception as e:
logger.error(f"Error injecting packet: {e}")
return False
async def _process_queue(self):
"""Process packets through the router queue."""