From 60964ea13dce5e17d91900f3f9c8bba6084fab6b Mon Sep 17 00:00:00 2001 From: Lloyd Date: Mon, 1 Dec 2025 22:05:02 +0000 Subject: [PATCH] Refactor packet handling to use packet injector for sending responses in DiscoveryHelper and TraceHelper; remove PacketRouter as it's no longer needed. --- repeater/handler_helpers/discovery.py | 17 +++-- repeater/handler_helpers/trace.py | 70 +++++++------------ repeater/main.py | 4 +- repeater/packet_pipeline.py | 99 --------------------------- repeater/packet_router.py | 42 ++++++++++++ 5 files changed, 80 insertions(+), 152 deletions(-) delete mode 100644 repeater/packet_pipeline.py diff --git a/repeater/handler_helpers/discovery.py b/repeater/handler_helpers/discovery.py index 5269e10..2c9fd59 100644 --- a/repeater/handler_helpers/discovery.py +++ b/repeater/handler_helpers/discovery.py @@ -20,7 +20,7 @@ class DiscoveryHelper: def __init__( self, local_identity, - dispatcher, + packet_injector=None, node_type: int = 2, log_fn=None, ): @@ -29,12 +29,12 @@ class DiscoveryHelper: Args: local_identity: The LocalIdentity instance for this repeater - dispatcher: The Dispatcher instance for sending packets + packet_injector: Callable to inject new packets into the router for sending node_type: Node type identifier (2 = Repeater) log_fn: Optional logging function for ControlHandler """ self.local_identity = local_identity - self.dispatcher = dispatcher + self.packet_injector = packet_injector # Function to inject packets into router self.node_type = node_type # Create ControlHandler internally as a parsing utility @@ -108,22 +108,25 @@ class DiscoveryHelper: prefix_only=prefix_only, ) - # Send response asynchronously - asyncio.create_task(self._send_packet_async(response_packet, tag)) + # Send response via router injection + if self.packet_injector: + asyncio.create_task(self._send_packet_async(response_packet, tag)) + else: + logger.warning("No packet injector available - discovery response not sent") except Exception as e: logger.error(f"Error creating discovery response: {e}") async def _send_packet_async(self, packet, tag: int) -> None: """ - Send a discovery response packet asynchronously. + Send a discovery response packet via router injection. Args: packet: The packet to send tag: The tag for logging purposes """ try: - success = await self.dispatcher.send_packet(packet, wait_for_ack=False) + success = await self.packet_injector(packet, wait_for_ack=False) if success: logger.info(f"Response sent for tag 0x{tag:08X}") else: diff --git a/repeater/handler_helpers/trace.py b/repeater/handler_helpers/trace.py index a80d93c..3ca96ac 100644 --- a/repeater/handler_helpers/trace.py +++ b/repeater/handler_helpers/trace.py @@ -19,19 +19,19 @@ logger = logging.getLogger("TraceHelper") class TraceHelper: """Helper class for processing trace packets in the repeater.""" - def __init__(self, local_hash: int, repeater_handler, dispatcher, log_fn=None): + def __init__(self, local_hash: int, repeater_handler, packet_injector=None, log_fn=None): """ Initialize the trace helper. Args: local_hash: The local node's hash identifier repeater_handler: The RepeaterHandler instance - dispatcher: The Dispatcher instance for sending packets + packet_injector: Callable to inject new packets into the router for sending log_fn: Optional logging function for TraceHandler """ self.local_hash = local_hash self.repeater_handler = repeater_handler - self.dispatcher = dispatcher + self.packet_injector = packet_injector # Function to inject packets into router # Create TraceHandler internally as a parsing utility self.trace_handler = TraceHandler(log_fn=log_fn or logger.info) @@ -84,8 +84,13 @@ class TraceHelper: if should_forward: await self._forward_trace_packet(packet, trace_path_len) + # Packet was sent directly, but let it flow back to engine for standard logging + # The engine will see do_not_retransmit flag and won't try to send it again else: + # This is the final destination or can't forward - just log and record self._log_no_forward_reason(packet, trace_path, trace_path_len) + # Mark packet to not be retransmitted since we're not forwarding + packet.mark_do_not_retransmit() except Exception as e: logger.error(f"Error processing trace packet: {e}") @@ -188,6 +193,7 @@ class TraceHelper: def _should_forward_trace(self, packet, trace_path: list, trace_path_len: int) -> bool: """ Determine if this node should forward the trace packet. + Uses the same logic as the original working implementation. Args: packet: The trace packet @@ -197,35 +203,22 @@ class TraceHelper: Returns: True if the packet should be forwarded, False otherwise """ - # Check if we've reached the end of the trace path - if packet.path_len >= trace_path_len: - return False - - # Check if path index is valid - if len(trace_path) <= packet.path_len: - return False - - # Check if this node is the next hop - if trace_path[packet.path_len] != self.local_hash: - return False - - # Check for duplicates - if self.repeater_handler and self.repeater_handler.is_duplicate(packet): - return False - - return True + # Use the exact logic from the original working code + return (packet.path_len < trace_path_len and + len(trace_path) > packet.path_len and + trace_path[packet.path_len] == self.local_hash and + self.repeater_handler and not self.repeater_handler.is_duplicate(packet)) async def _forward_trace_packet(self, packet, trace_path_len: int) -> None: """ - Forward a trace packet by appending SNR and setting up correct routing. - + Forward a trace packet by appending SNR and sending directly. Args: packet: The trace packet to forward trace_path_len: The length of the trace path """ - # Update the packet record to show it was transmitted + # Update the packet record to show it will be transmitted if self.repeater_handler and hasattr(self.repeater_handler, 'recent_packets'): - packet_hash = packet.calculate_packet_hash().hex().upper()[:16] + packet_hash = packet.calculate_packet_hash().hex()[:16] for record in reversed(self.repeater_handler.recent_packets): if record.get("packet_hash") == packet_hash: record["transmitted"] = True @@ -257,27 +250,16 @@ class TraceHelper: f"Forwarding trace, stored SNR {current_snr:.1f}dB at position {packet.path_len - 1}" ) - # For direct trace packets, we need to update the routing path to point to next hop - # Parse the trace payload to get the trace route - parsed_data = self.trace_handler._parse_trace_payload(packet.payload) - if parsed_data.get("valid", False): - trace_path = parsed_data["trace_path"] + # Inject packet into router for proper routing and transmission + # Router will handle marking as seen to prevent duplicate processing + if self.packet_injector: + await self.packet_injector(packet, wait_for_ack=False) + else: + logger.warning("No packet injector available - trace packet not forwarded") - # Check if there's a next hop after current position - if packet.path_len < len(trace_path): - next_hop = trace_path[packet.path_len] - - # Set up direct routing to next hop by putting it at front of path - # The SNR data stays in the path, but we prepend the next hop for routing - packet.path = bytearray([next_hop] + list(packet.path)) - packet.path_len = len(packet.path) - - logger.debug(f"Set next trace hop to 0x{next_hop:02X}") - else: - logger.info("Trace reached end of route") - - # Don't mark as seen - let the packet flow to repeater handler for normal processing - # The repeater handler will handle duplicate detection and forwarding logic + # Mark as do_not_retransmit so engine won't try to send it again + # but allow it to flow back for standard packet logging + packet.mark_do_not_retransmit() def _log_no_forward_reason(self, packet, trace_path: list, trace_path_len: int) -> None: """ diff --git a/repeater/main.py b/repeater/main.py index 1d10bb2..11582c0 100644 --- a/repeater/main.py +++ b/repeater/main.py @@ -119,7 +119,7 @@ class RepeaterDaemon: self.trace_helper = TraceHelper( local_hash=self.local_hash, repeater_handler=self.repeater_handler, - dispatcher=self.dispatcher, + packet_injector=self.router.inject_packet, log_fn=logger.info, ) logger.info("Trace processing helper initialized") @@ -137,7 +137,7 @@ class RepeaterDaemon: if allow_discovery: self.discovery_helper = DiscoveryHelper( local_identity=self.local_identity, - dispatcher=self.dispatcher, + packet_injector=self.router.inject_packet, node_type=2, log_fn=logger.info, ) diff --git a/repeater/packet_pipeline.py b/repeater/packet_pipeline.py deleted file mode 100644 index 0869764..0000000 --- a/repeater/packet_pipeline.py +++ /dev/null @@ -1,99 +0,0 @@ -""" -Packet routing pipeline for pyMC Repeater. - -This module provides a simple router that processes packets through handlers -sequentially. All statistics and processing logic is handled by the engine. -""" - -import asyncio -import logging - -from pymc_core.node.handlers.trace import TraceHandler -from pymc_core.node.handlers.control import ControlHandler -from pymc_core.node.handlers.advert import AdvertHandler - -logger = logging.getLogger("PacketRouter") - - -class PacketRouter: - """ - Simple router that processes packets through handlers sequentially. - All statistics and processing decisions are handled by the engine. - """ - - def __init__(self, daemon_instance): - self.daemon = daemon_instance - self.queue = asyncio.Queue() - self.running = False - self.router_task = None - - async def start(self): - """Start the router processing task.""" - self.running = True - self.router_task = asyncio.create_task(self._process_queue()) - logger.info("Packet router started") - - async def stop(self): - """Stop the router processing task.""" - self.running = False - if self.router_task: - self.router_task.cancel() - try: - await self.router_task - except asyncio.CancelledError: - pass - logger.info("Packet router stopped") - - async def enqueue(self, packet): - """Add packet to router queue.""" - await self.queue.put(packet) - - async def _process_queue(self): - """Process packets through the router queue.""" - while self.running: - try: - packet = await asyncio.wait_for(self.queue.get(), timeout=0.1) - await self._route_packet(packet) - except asyncio.TimeoutError: - continue - except Exception as e: - logger.error(f"Router error: {e}", exc_info=True) - - - async def _route_packet(self, packet): - """ - Route a packet to appropriate handlers based on payload type. - - Simple routing logic: - 1. Route to specific handlers for parsing - 2. Pass to repeater engine for all processing decisions - """ - payload_type = packet.get_payload_type() - - # Route to specific handlers for parsing only - if payload_type == TraceHandler.payload_type(): - # Process trace packet - if self.daemon.trace_helper: - await self.daemon.trace_helper.process_trace_packet(packet) - - elif payload_type == ControlHandler.payload_type(): - # Process control/discovery packet - if self.daemon.discovery_helper: - await self.daemon.discovery_helper.control_handler(packet) - packet.mark_do_not_retransmit() - - elif payload_type == AdvertHandler.payload_type(): - # Process advertisement packet for neighbor tracking - if self.daemon.advert_helper: - rssi = getattr(packet, "rssi", 0) - snr = getattr(packet, "snr", 0.0) - await self.daemon.advert_helper.process_advert_packet(packet, rssi, snr) - - # Always pass to repeater engine for processing decisions and statistics - if self.daemon.repeater_handler: - metadata = { - "rssi": getattr(packet, "rssi", 0), - "snr": getattr(packet, "snr", 0.0), - "timestamp": getattr(packet, "timestamp", 0), - } - await self.daemon.repeater_handler(packet, metadata) diff --git a/repeater/packet_router.py b/repeater/packet_router.py index a029308..43383fd 100644 --- a/repeater/packet_router.py +++ b/repeater/packet_router.py @@ -48,6 +48,48 @@ class PacketRouter: async def enqueue(self, packet): """Add packet to router queue.""" await self.queue.put(packet) + + async def inject_packet(self, packet, wait_for_ack: bool = False): + """ + Inject a new packet into the system for direct transmission. + + This method bypasses the normal routing and sends packets directly + via the dispatcher. Used by helpers to send response packets. + + IMPORTANT: Pre-marks packet as seen to prevent processing our own + transmitted packets as duplicates when they're received back. + + Args: + packet: The packet to send + wait_for_ack: Whether to wait for acknowledgment + + Returns: + True if packet was sent successfully, False otherwise + """ + try: + # Pre-mark the packet as seen BEFORE transmission to prevent + # our own radio from processing it as an incoming duplicate + if hasattr(self.daemon, 'repeater_handler') and self.daemon.repeater_handler: + self.daemon.repeater_handler.mark_seen(packet) + logger.debug("Pre-marked injected packet as seen to prevent duplicate processing") + + if hasattr(self.daemon, 'dispatcher') and self.daemon.dispatcher: + success = await self.daemon.dispatcher.send_packet(packet, wait_for_ack=wait_for_ack) + + if success: + packet_len = len(packet.payload) if packet.payload else 0 + logger.debug(f"Injected packet sent successfully ({packet_len} bytes)") + else: + logger.warning("Failed to send injected packet") + + return success + else: + logger.error("No dispatcher available for packet injection") + return False + + except Exception as e: + logger.error(f"Error injecting packet: {e}") + return False async def _process_queue(self): """Process packets through the router queue."""