From d4a730bb0eb530aa797cfbf7ab22a55cd5edcfb1 Mon Sep 17 00:00:00 2001 From: Lloyd Date: Mon, 1 Dec 2025 20:56:59 +0000 Subject: [PATCH] rename PacketPipeline with PacketRouter --- repeater/main.py | 34 ++++--- repeater/packet_pipeline.py | 172 +++++++----------------------------- repeater/packet_router.py | 100 +++++++++++++++++++++ 3 files changed, 145 insertions(+), 161 deletions(-) create mode 100644 repeater/packet_router.py diff --git a/repeater/main.py b/repeater/main.py index 7be8558..1d10bb2 100644 --- a/repeater/main.py +++ b/repeater/main.py @@ -7,7 +7,7 @@ from repeater.config import get_radio_for_board, load_config from repeater.engine import RepeaterHandler from repeater.web.http_server import HTTPStatsServer, _log_buffer from repeater.handler_helpers import TraceHelper, DiscoveryHelper, AdvertHelper -from repeater.packet_pipeline import PacketPipeline +from repeater.packet_router import PacketRouter logger = logging.getLogger("RepeaterDaemon") @@ -26,7 +26,7 @@ class RepeaterDaemon: self.trace_helper = None self.advert_helper = None self.discovery_helper = None - self.pipeline = None + self.router = None log_level = config.get("logging", {}).get("level", "INFO") @@ -106,14 +106,14 @@ class RepeaterDaemon: self.config, self.dispatcher, self.local_hash, send_advert_func=self.send_advert ) - # Create pipeline - self.pipeline = PacketPipeline(self) - await self.pipeline.start() + # Create router + self.router = PacketRouter(self) + await self.router.start() - # Register pipeline as entry point for ALL packets via fallback handler - # All received packets flow through pipeline → helpers → repeater validation - self.dispatcher.register_fallback_handler(self._pipeline_callback) - logger.info("Pipeline registered as fallback (catches all packets)") + # Register router as entry point for ALL packets via fallback handler + # All received packets flow through router → helpers → repeater engine + self.dispatcher.register_fallback_handler(self._router_callback) + logger.info("Packet router registered as fallback (catches all packets)") # Create processing helpers (handlers created internally) self.trace_helper = TraceHelper( @@ -149,13 +149,13 @@ class RepeaterDaemon: logger.error(f"Failed to initialize dispatcher: {e}") raise - async def _pipeline_callback(self, packet): + async def _router_callback(self, packet): """ Single entry point for ALL packets. - Enqueues packets for pipeline processing. + Enqueues packets for router processing. """ - if self.pipeline: - await self.pipeline.enqueue(packet) + if self.router: + await self.router.enqueue(packet) def get_stats(self) -> dict: stats = {} @@ -170,10 +170,6 @@ class RepeaterDaemon: except Exception: stats["public_key"] = None - # Add pipeline statistics - if self.pipeline: - stats["pipeline"] = self.pipeline.get_stats() - return stats async def send_advert(self) -> bool: @@ -267,8 +263,8 @@ class RepeaterDaemon: await self.dispatcher.run_forever() except KeyboardInterrupt: logger.info("Shutting down...") - if self.pipeline: - await self.pipeline.stop() + if self.router: + await self.router.stop() if self.http_server: self.http_server.stop() diff --git a/repeater/packet_pipeline.py b/repeater/packet_pipeline.py index dce175b..0869764 100644 --- a/repeater/packet_pipeline.py +++ b/repeater/packet_pipeline.py @@ -1,119 +1,76 @@ """ -Packet processing pipeline for pyMC Repeater. +Packet routing pipeline for pyMC Repeater. -This module provides a queue-based pipeline that processes packets through handlers -sequentially, tracks statistics, and ensures all packets flow through repeater logic. +This module provides a simple router that processes packets through handlers +sequentially. All statistics and processing logic is handled by the engine. """ import asyncio import logging -import time -from collections import deque from pymc_core.node.handlers.trace import TraceHandler from pymc_core.node.handlers.control import ControlHandler from pymc_core.node.handlers.advert import AdvertHandler -from pymc_core.protocol.utils import get_packet_type_name -logger = logging.getLogger("PacketPipeline") +logger = logging.getLogger("PacketRouter") -class PacketPipeline: +class PacketRouter: """ - Pipeline that processes packets through handlers sequentially. - Tracks queue statistics and ensures all packets flow through repeater logic. + Simple router that processes packets through handlers sequentially. + All statistics and processing decisions are handled by the engine. """ def __init__(self, daemon_instance): self.daemon = daemon_instance self.queue = asyncio.Queue() self.running = False - self.pipeline_task = None - - # Statistics tracking - self.stats = { - "total_enqueued": 0, - "total_processed": 0, - "total_errors": 0, - "current_queue_size": 0, - "max_queue_size": 0, - "processing_times": deque(maxlen=100), # Last 100 processing times - "packets_by_type": {}, - "packets_marked_no_retransmit": 0, - "packets_forwarded": 0, - } - self.last_stats_log = time.time() + self.router_task = None async def start(self): - """Start the pipeline processing task.""" + """Start the router processing task.""" self.running = True - self.pipeline_task = asyncio.create_task(self._process_pipeline()) - logger.info("Packet pipeline started") + self.router_task = asyncio.create_task(self._process_queue()) + logger.info("Packet router started") async def stop(self): - """Stop the pipeline processing task.""" + """Stop the router processing task.""" self.running = False - if self.pipeline_task: - self.pipeline_task.cancel() + if self.router_task: + self.router_task.cancel() try: - await self.pipeline_task + await self.router_task except asyncio.CancelledError: pass - logger.info("Packet pipeline stopped") - self._log_final_stats() + logger.info("Packet router stopped") async def enqueue(self, packet): - """Add packet to pipeline queue and track statistics.""" + """Add packet to router queue.""" await self.queue.put(packet) - self.stats["total_enqueued"] += 1 - self.stats["current_queue_size"] = self.queue.qsize() - - # Track max queue size - if self.stats["current_queue_size"] > self.stats["max_queue_size"]: - self.stats["max_queue_size"] = self.stats["current_queue_size"] - - # Log stats periodically (every 30 seconds) - now = time.time() - if now - self.last_stats_log > 30: - self._log_stats() - self.last_stats_log = now - async def _process_pipeline(self): - """Process packets through the pipeline.""" + async def _process_queue(self): + """Process packets through the router queue.""" while self.running: try: packet = await asyncio.wait_for(self.queue.get(), timeout=0.1) - - start_time = time.time() - await self._process_packet(packet) - processing_time = (time.time() - start_time) * 1000 # ms - - self.stats["total_processed"] += 1 - self.stats["current_queue_size"] = self.queue.qsize() - self.stats["processing_times"].append(processing_time) - + await self._route_packet(packet) except asyncio.TimeoutError: continue except Exception as e: - self.stats["total_errors"] += 1 - logger.error(f"Pipeline error: {e}", exc_info=True) + logger.error(f"Router error: {e}", exc_info=True) - async def _process_packet(self, packet): + + async def _route_packet(self, packet): """ - Process a single packet through the handler pipeline. + Route a packet to appropriate handlers based on payload type. - Flow: - 1. Route to specific handler based on payload type - 2. Handler processes and may mark do_not_retransmit - 3. If not marked, pass to repeater for forwarding + Simple routing logic: + 1. Route to specific handlers for parsing + 2. Pass to repeater engine for all processing decisions """ payload_type = packet.get_payload_type() - # Track packet type - type_name = get_packet_type_name(payload_type) - self.stats["packets_by_type"][type_name] = self.stats["packets_by_type"].get(type_name, 0) + 1 - - # Stage 1: Route to specific handlers + # Route to specific handlers for parsing only if payload_type == TraceHandler.payload_type(): # Process trace packet if self.daemon.trace_helper: @@ -128,84 +85,15 @@ class PacketPipeline: elif payload_type == AdvertHandler.payload_type(): # Process advertisement packet for neighbor tracking if self.daemon.advert_helper: - # Extract metadata for advert processing rssi = getattr(packet, "rssi", 0) snr = getattr(packet, "snr", 0.0) await self.daemon.advert_helper.process_advert_packet(packet, rssi, snr) + # Always pass to repeater engine for processing decisions and statistics if self.daemon.repeater_handler: metadata = { "rssi": getattr(packet, "rssi", 0), "snr": getattr(packet, "snr", 0.0), "timestamp": getattr(packet, "timestamp", 0), } - - # Call process_packet to get validation result and delay - snr = metadata.get("snr", 0.0) - result = self.daemon.repeater_handler.process_packet(packet, snr) - - if result: - fwd_pkt, delay = result - - # Calculate airtime for duty cycle tracking - from pymc_core.protocol.packet_utils import PacketTimingUtils - packet_bytes = fwd_pkt.write_to() if hasattr(fwd_pkt, "write_to") else fwd_pkt.payload or b"" - airtime_ms = PacketTimingUtils.estimate_airtime_ms( - len(packet_bytes), - self.daemon.repeater_handler.radio_config - ) - - # Check duty cycle - can_tx, wait_time = self.daemon.repeater_handler.airtime_mgr.can_transmit(airtime_ms) - - if can_tx: - # Schedule transmission with calculated delay - await self.daemon.repeater_handler.schedule_retransmit(fwd_pkt, delay, airtime_ms) - self.stats["packets_forwarded"] += 1 - logger.debug(f"Packet scheduled for forwarding with {delay:.3f}s delay") - else: - logger.warning( - f"Duty cycle limit exceeded. Airtime={airtime_ms:.1f}ms, " - f"wait={wait_time:.1f}s before retry" - ) - else: - logger.debug(f"Packet rejected by repeater handler: {self.daemon.repeater_handler._last_drop_reason}") - - - def _log_stats(self): - """Log pipeline statistics.""" - avg_processing_time = 0 - if self.stats["processing_times"]: - avg_processing_time = sum(self.stats["processing_times"]) / len(self.stats["processing_times"]) - - logger.info( - f"[Pipeline Stats] Enqueued: {self.stats['total_enqueued']}, " - f"Processed: {self.stats['total_processed']}, " - f"Errors: {self.stats['total_errors']}, " - f"Queue: {self.stats['current_queue_size']}/{self.stats['max_queue_size']} (current/max), " - f"Avg Time: {avg_processing_time:.2f}ms, " - f"Forwarded: {self.stats['packets_forwarded']}, " - f"Marked NoRetx: {self.stats['packets_marked_no_retransmit']}" - ) - - # Log packet type breakdown - if self.stats["packets_by_type"]: - type_breakdown = ", ".join([f"{k}: {v}" for k, v in sorted(self.stats["packets_by_type"].items())]) - logger.debug(f"[Pipeline Types] {type_breakdown}") - - def _log_final_stats(self): - """Log final statistics on shutdown.""" - logger.info("=== Final Pipeline Statistics ===") - self._log_stats() - logger.info("================================") - - def get_stats(self): - """Return current pipeline statistics.""" - stats_copy = self.stats.copy() - if self.stats["processing_times"]: - stats_copy["avg_processing_time_ms"] = sum(self.stats["processing_times"]) / len(self.stats["processing_times"]) - else: - stats_copy["avg_processing_time_ms"] = 0 - # Don't include the deque in the return value - del stats_copy["processing_times"] - return stats_copy + await self.daemon.repeater_handler(packet, metadata) diff --git a/repeater/packet_router.py b/repeater/packet_router.py new file mode 100644 index 0000000..a029308 --- /dev/null +++ b/repeater/packet_router.py @@ -0,0 +1,100 @@ +""" +Packet router for pyMC Repeater. + +This module provides a simple router that routes packets to appropriate handlers +based on payload type. All statistics, queuing, and processing logic is handled +by the repeater engine for better separation of concerns. +""" + +import asyncio +import logging + +from pymc_core.node.handlers.trace import TraceHandler +from pymc_core.node.handlers.control import ControlHandler +from pymc_core.node.handlers.advert import AdvertHandler + +logger = logging.getLogger("PacketRouter") + + +class PacketRouter: + """ + Simple router that processes packets through handlers sequentially. + All statistics and processing decisions are handled by the engine. + """ + + def __init__(self, daemon_instance): + self.daemon = daemon_instance + self.queue = asyncio.Queue() + self.running = False + self.router_task = None + + async def start(self): + """Start the router processing task.""" + self.running = True + self.router_task = asyncio.create_task(self._process_queue()) + logger.info("Packet router started") + + async def stop(self): + """Stop the router processing task.""" + self.running = False + if self.router_task: + self.router_task.cancel() + try: + await self.router_task + except asyncio.CancelledError: + pass + logger.info("Packet router stopped") + + async def enqueue(self, packet): + """Add packet to router queue.""" + await self.queue.put(packet) + + async def _process_queue(self): + """Process packets through the router queue.""" + while self.running: + try: + packet = await asyncio.wait_for(self.queue.get(), timeout=0.1) + await self._route_packet(packet) + except asyncio.TimeoutError: + continue + except Exception as e: + logger.error(f"Router error: {e}", exc_info=True) + + + async def _route_packet(self, packet): + """ + Route a packet to appropriate handlers based on payload type. + + Simple routing logic: + 1. Route to specific handlers for parsing + 2. Pass to repeater engine for all processing decisions + """ + payload_type = packet.get_payload_type() + + # Route to specific handlers for parsing only + if payload_type == TraceHandler.payload_type(): + # Process trace packet + if self.daemon.trace_helper: + await self.daemon.trace_helper.process_trace_packet(packet) + + elif payload_type == ControlHandler.payload_type(): + # Process control/discovery packet + if self.daemon.discovery_helper: + await self.daemon.discovery_helper.control_handler(packet) + packet.mark_do_not_retransmit() + + elif payload_type == AdvertHandler.payload_type(): + # Process advertisement packet for neighbor tracking + if self.daemon.advert_helper: + rssi = getattr(packet, "rssi", 0) + snr = getattr(packet, "snr", 0.0) + await self.daemon.advert_helper.process_advert_packet(packet, rssi, snr) + + # Always pass to repeater engine for processing decisions and statistics + if self.daemon.repeater_handler: + metadata = { + "rssi": getattr(packet, "rssi", 0), + "snr": getattr(packet, "snr", 0.0), + "timestamp": getattr(packet, "timestamp", 0), + } + await self.daemon.repeater_handler(packet, metadata)