rename PacketPipeline to PacketRouter

Lloyd
2025-12-01 20:56:59 +00:00
parent 58011b4993
commit d4a730bb0e
3 changed files with 145 additions and 161 deletions


@@ -7,7 +7,7 @@ from repeater.config import get_radio_for_board, load_config
from repeater.engine import RepeaterHandler
from repeater.web.http_server import HTTPStatsServer, _log_buffer
from repeater.handler_helpers import TraceHelper, DiscoveryHelper, AdvertHelper
from repeater.packet_pipeline import PacketPipeline
from repeater.packet_router import PacketRouter
logger = logging.getLogger("RepeaterDaemon")
@@ -26,7 +26,7 @@ class RepeaterDaemon:
self.trace_helper = None
self.advert_helper = None
self.discovery_helper = None
self.pipeline = None
self.router = None
log_level = config.get("logging", {}).get("level", "INFO")
@@ -106,14 +106,14 @@ class RepeaterDaemon:
self.config, self.dispatcher, self.local_hash, send_advert_func=self.send_advert
)
# Create pipeline
self.pipeline = PacketPipeline(self)
await self.pipeline.start()
# Create router
self.router = PacketRouter(self)
await self.router.start()
# Register pipeline as entry point for ALL packets via fallback handler
# All received packets flow through pipeline → helpers → repeater validation
self.dispatcher.register_fallback_handler(self._pipeline_callback)
logger.info("Pipeline registered as fallback (catches all packets)")
# Register router as entry point for ALL packets via fallback handler
# All received packets flow through router → helpers → repeater engine
self.dispatcher.register_fallback_handler(self._router_callback)
logger.info("Packet router registered as fallback (catches all packets)")
# Create processing helpers (handlers created internally)
self.trace_helper = TraceHelper(
@@ -149,13 +149,13 @@ class RepeaterDaemon:
logger.error(f"Failed to initialize dispatcher: {e}")
raise
async def _pipeline_callback(self, packet):
async def _router_callback(self, packet):
"""
Single entry point for ALL packets.
Enqueues packets for pipeline processing.
Enqueues packets for router processing.
"""
if self.pipeline:
await self.pipeline.enqueue(packet)
if self.router:
await self.router.enqueue(packet)
def get_stats(self) -> dict:
stats = {}
@@ -170,10 +170,6 @@ class RepeaterDaemon:
except Exception:
stats["public_key"] = None
# Add pipeline statistics
if self.pipeline:
stats["pipeline"] = self.pipeline.get_stats()
return stats
async def send_advert(self) -> bool:
@@ -267,8 +263,8 @@ class RepeaterDaemon:
await self.dispatcher.run_forever()
except KeyboardInterrupt:
logger.info("Shutting down...")
if self.pipeline:
await self.pipeline.stop()
if self.router:
await self.router.stop()
if self.http_server:
self.http_server.stop()
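
To see the daemon-side wiring without the diff noise, the sketch below shows the single-entry-point pattern used above: one fallback callback receives every packet and only enqueues it into the router. The Dispatcher, StubRouter, and DaemonSketch classes are hypothetical stand-ins for illustration; only the register_fallback_handler call, the callback shape, and the enqueue-only behaviour are taken from the diff.

import asyncio


class Dispatcher:
    """Hypothetical dispatcher stand-in: hands every received packet to one fallback."""

    def __init__(self):
        self._fallback = None

    def register_fallback_handler(self, callback):
        self._fallback = callback

    async def feed(self, packet):
        # In the real daemon, packets arrive from the radio; here we inject them.
        if self._fallback:
            await self._fallback(packet)


class StubRouter:
    """Hypothetical router stand-in that just records what was enqueued."""

    def __init__(self):
        self.queued = []

    async def enqueue(self, packet):
        self.queued.append(packet)


class DaemonSketch:
    """Mirrors RepeaterDaemon's wiring: the fallback callback only enqueues."""

    def __init__(self, router, dispatcher):
        self.router = router
        self.dispatcher = dispatcher
        # Router is the single entry point for ALL packets.
        self.dispatcher.register_fallback_handler(self._router_callback)

    async def _router_callback(self, packet):
        if self.router:
            await self.router.enqueue(packet)


async def demo():
    dispatcher = Dispatcher()
    router = StubRouter()
    DaemonSketch(router, dispatcher)
    await dispatcher.feed("raw-packet")  # any object stands in for a packet here
    print("router queue:", router.queued)


if __name__ == "__main__":
    asyncio.run(demo())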


@@ -1,119 +1,76 @@
"""
Packet processing pipeline for pyMC Repeater.
Packet routing pipeline for pyMC Repeater.
This module provides a queue-based pipeline that processes packets through handlers
sequentially, tracks statistics, and ensures all packets flow through repeater logic.
This module provides a simple router that processes packets through handlers
sequentially. All statistics and processing logic are handled by the engine.
"""
import asyncio
import logging
import time
from collections import deque
from pymc_core.node.handlers.trace import TraceHandler
from pymc_core.node.handlers.control import ControlHandler
from pymc_core.node.handlers.advert import AdvertHandler
from pymc_core.protocol.utils import get_packet_type_name
logger = logging.getLogger("PacketPipeline")
logger = logging.getLogger("PacketRouter")
class PacketPipeline:
class PacketRouter:
"""
Pipeline that processes packets through handlers sequentially.
Tracks queue statistics and ensures all packets flow through repeater logic.
Simple router that processes packets through handlers sequentially.
All statistics and processing decisions are handled by the engine.
"""
def __init__(self, daemon_instance):
self.daemon = daemon_instance
self.queue = asyncio.Queue()
self.running = False
self.pipeline_task = None
# Statistics tracking
self.stats = {
"total_enqueued": 0,
"total_processed": 0,
"total_errors": 0,
"current_queue_size": 0,
"max_queue_size": 0,
"processing_times": deque(maxlen=100), # Last 100 processing times
"packets_by_type": {},
"packets_marked_no_retransmit": 0,
"packets_forwarded": 0,
}
self.last_stats_log = time.time()
self.router_task = None
async def start(self):
"""Start the pipeline processing task."""
"""Start the router processing task."""
self.running = True
self.pipeline_task = asyncio.create_task(self._process_pipeline())
logger.info("Packet pipeline started")
self.router_task = asyncio.create_task(self._process_queue())
logger.info("Packet router started")
async def stop(self):
"""Stop the pipeline processing task."""
"""Stop the router processing task."""
self.running = False
if self.pipeline_task:
self.pipeline_task.cancel()
if self.router_task:
self.router_task.cancel()
try:
await self.pipeline_task
await self.router_task
except asyncio.CancelledError:
pass
logger.info("Packet pipeline stopped")
self._log_final_stats()
logger.info("Packet router stopped")
async def enqueue(self, packet):
"""Add packet to pipeline queue and track statistics."""
"""Add packet to router queue."""
await self.queue.put(packet)
self.stats["total_enqueued"] += 1
self.stats["current_queue_size"] = self.queue.qsize()
# Track max queue size
if self.stats["current_queue_size"] > self.stats["max_queue_size"]:
self.stats["max_queue_size"] = self.stats["current_queue_size"]
# Log stats periodically (every 30 seconds)
now = time.time()
if now - self.last_stats_log > 30:
self._log_stats()
self.last_stats_log = now
async def _process_pipeline(self):
"""Process packets through the pipeline."""
async def _process_queue(self):
"""Process packets through the router queue."""
while self.running:
try:
packet = await asyncio.wait_for(self.queue.get(), timeout=0.1)
start_time = time.time()
await self._process_packet(packet)
processing_time = (time.time() - start_time) * 1000 # ms
self.stats["total_processed"] += 1
self.stats["current_queue_size"] = self.queue.qsize()
self.stats["processing_times"].append(processing_time)
await self._route_packet(packet)
except asyncio.TimeoutError:
continue
except Exception as e:
self.stats["total_errors"] += 1
logger.error(f"Pipeline error: {e}", exc_info=True)
logger.error(f"Router error: {e}", exc_info=True)
async def _process_packet(self, packet):
async def _route_packet(self, packet):
"""
Process a single packet through the handler pipeline.
Route a packet to appropriate handlers based on payload type.
Flow:
1. Route to specific handler based on payload type
2. Handler processes and may mark do_not_retransmit
3. If not marked, pass to repeater for forwarding
Simple routing logic:
1. Route to specific handlers for parsing
2. Pass to repeater engine for all processing decisions
"""
payload_type = packet.get_payload_type()
# Track packet type
type_name = get_packet_type_name(payload_type)
self.stats["packets_by_type"][type_name] = self.stats["packets_by_type"].get(type_name, 0) + 1
# Stage 1: Route to specific handlers
# Route to specific handlers for parsing only
if payload_type == TraceHandler.payload_type():
# Process trace packet
if self.daemon.trace_helper:
@@ -128,84 +85,15 @@ class PacketPipeline:
elif payload_type == AdvertHandler.payload_type():
# Process advertisement packet for neighbor tracking
if self.daemon.advert_helper:
# Extract metadata for advert processing
rssi = getattr(packet, "rssi", 0)
snr = getattr(packet, "snr", 0.0)
await self.daemon.advert_helper.process_advert_packet(packet, rssi, snr)
# Always pass to repeater engine for processing decisions and statistics
if self.daemon.repeater_handler:
metadata = {
"rssi": getattr(packet, "rssi", 0),
"snr": getattr(packet, "snr", 0.0),
"timestamp": getattr(packet, "timestamp", 0),
}
# Call process_packet to get validation result and delay
snr = metadata.get("snr", 0.0)
result = self.daemon.repeater_handler.process_packet(packet, snr)
if result:
fwd_pkt, delay = result
# Calculate airtime for duty cycle tracking
from pymc_core.protocol.packet_utils import PacketTimingUtils
packet_bytes = fwd_pkt.write_to() if hasattr(fwd_pkt, "write_to") else fwd_pkt.payload or b""
airtime_ms = PacketTimingUtils.estimate_airtime_ms(
len(packet_bytes),
self.daemon.repeater_handler.radio_config
)
# Check duty cycle
can_tx, wait_time = self.daemon.repeater_handler.airtime_mgr.can_transmit(airtime_ms)
if can_tx:
# Schedule transmission with calculated delay
await self.daemon.repeater_handler.schedule_retransmit(fwd_pkt, delay, airtime_ms)
self.stats["packets_forwarded"] += 1
logger.debug(f"Packet scheduled for forwarding with {delay:.3f}s delay")
else:
logger.warning(
f"Duty cycle limit exceeded. Airtime={airtime_ms:.1f}ms, "
f"wait={wait_time:.1f}s before retry"
)
else:
logger.debug(f"Packet rejected by repeater handler: {self.daemon.repeater_handler._last_drop_reason}")
def _log_stats(self):
"""Log pipeline statistics."""
avg_processing_time = 0
if self.stats["processing_times"]:
avg_processing_time = sum(self.stats["processing_times"]) / len(self.stats["processing_times"])
logger.info(
f"[Pipeline Stats] Enqueued: {self.stats['total_enqueued']}, "
f"Processed: {self.stats['total_processed']}, "
f"Errors: {self.stats['total_errors']}, "
f"Queue: {self.stats['current_queue_size']}/{self.stats['max_queue_size']} (current/max), "
f"Avg Time: {avg_processing_time:.2f}ms, "
f"Forwarded: {self.stats['packets_forwarded']}, "
f"Marked NoRetx: {self.stats['packets_marked_no_retransmit']}"
)
# Log packet type breakdown
if self.stats["packets_by_type"]:
type_breakdown = ", ".join([f"{k}: {v}" for k, v in sorted(self.stats["packets_by_type"].items())])
logger.debug(f"[Pipeline Types] {type_breakdown}")
def _log_final_stats(self):
"""Log final statistics on shutdown."""
logger.info("=== Final Pipeline Statistics ===")
self._log_stats()
logger.info("================================")
def get_stats(self):
"""Return current pipeline statistics."""
stats_copy = self.stats.copy()
if self.stats["processing_times"]:
stats_copy["avg_processing_time_ms"] = sum(self.stats["processing_times"]) / len(self.stats["processing_times"])
else:
stats_copy["avg_processing_time_ms"] = 0
# Don't include the deque in the return value
del stats_copy["processing_times"]
return stats_copy
await self.daemon.repeater_handler(packet, metadata)
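
The block removed above folded airtime estimation and duty-cycle gating into the pipeline; per the new docstrings that responsibility now sits in the repeater engine. For readers unfamiliar with the check, here is a self-contained sliding-window sketch with the same can_transmit(airtime_ms) -> (ok, wait_seconds) shape the removed code relied on. It illustrates the concept only and is not the pymc_core airtime_mgr implementation.

import time
from collections import deque


class DutyCycleSketch:
    """Concept-only duty-cycle limiter: cap TX airtime inside a rolling window."""

    def __init__(self, duty_cycle=0.10, window_s=3600.0):
        self.duty_cycle = duty_cycle      # e.g. 10% of the window may be TX time
        self.window_s = window_s          # rolling window length in seconds
        self._tx = deque()                # (timestamp, airtime_ms) per transmission

    def _prune(self, now):
        while self._tx and now - self._tx[0][0] > self.window_s:
            self._tx.popleft()

    def can_transmit(self, airtime_ms):
        """Return (ok, wait_seconds): ok if airtime_ms still fits the budget."""
        now = time.time()
        self._prune(now)
        used_ms = sum(a for _, a in self._tx)
        budget_ms = self.duty_cycle * self.window_s * 1000.0
        if used_ms + airtime_ms <= budget_ms:
            return True, 0.0
        # Otherwise wait until the oldest transmission ages out of the window.
        oldest_ts = self._tx[0][0] if self._tx else now
        return False, max(0.0, self.window_s - (now - oldest_ts))

    def record(self, airtime_ms):
        """Call after a transmission actually went out."""
        self._tx.append((time.time(), airtime_ms))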

repeater/packet_router.py (new file)

@@ -0,0 +1,100 @@
"""
Packet router for pyMC Repeater.
This module provides a simple router that dispatches packets to the appropriate
handlers based on payload type. All statistics, queuing, and processing logic are
handled by the repeater engine for better separation of concerns.
"""
import asyncio
import logging
from pymc_core.node.handlers.trace import TraceHandler
from pymc_core.node.handlers.control import ControlHandler
from pymc_core.node.handlers.advert import AdvertHandler
logger = logging.getLogger("PacketRouter")
class PacketRouter:
"""
Simple router that processes packets through handlers sequentially.
All statistics and processing decisions are handled by the engine.
"""
def __init__(self, daemon_instance):
self.daemon = daemon_instance
self.queue = asyncio.Queue()
self.running = False
self.router_task = None
async def start(self):
"""Start the router processing task."""
self.running = True
self.router_task = asyncio.create_task(self._process_queue())
logger.info("Packet router started")
async def stop(self):
"""Stop the router processing task."""
self.running = False
if self.router_task:
self.router_task.cancel()
try:
await self.router_task
except asyncio.CancelledError:
pass
logger.info("Packet router stopped")
async def enqueue(self, packet):
"""Add packet to router queue."""
await self.queue.put(packet)
async def _process_queue(self):
"""Process packets through the router queue."""
while self.running:
try:
packet = await asyncio.wait_for(self.queue.get(), timeout=0.1)
await self._route_packet(packet)
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"Router error: {e}", exc_info=True)
async def _route_packet(self, packet):
"""
Route a packet to appropriate handlers based on payload type.
Simple routing logic:
1. Route to specific handlers for parsing
2. Pass to repeater engine for all processing decisions
"""
payload_type = packet.get_payload_type()
# Route to specific handlers for parsing only
if payload_type == TraceHandler.payload_type():
# Process trace packet
if self.daemon.trace_helper:
await self.daemon.trace_helper.process_trace_packet(packet)
elif payload_type == ControlHandler.payload_type():
# Process control/discovery packet
if self.daemon.discovery_helper:
await self.daemon.discovery_helper.control_handler(packet)
packet.mark_do_not_retransmit()
elif payload_type == AdvertHandler.payload_type():
# Process advertisement packet for neighbor tracking
if self.daemon.advert_helper:
rssi = getattr(packet, "rssi", 0)
snr = getattr(packet, "snr", 0.0)
await self.daemon.advert_helper.process_advert_packet(packet, rssi, snr)
# Always pass to repeater engine for processing decisions and statistics
if self.daemon.repeater_handler:
metadata = {
"rssi": getattr(packet, "rssi", 0),
"snr": getattr(packet, "snr", 0.0),
"timestamp": getattr(packet, "timestamp", 0),
}
await self.daemon.repeater_handler(packet, metadata)
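
As a quick way to exercise the new module end to end, a hypothetical smoke test is sketched below. StubDaemon, FakePacket, and the 0xFF payload type are invented for illustration; only PacketRouter's start/enqueue/stop lifecycle and the repeater_handler(packet, metadata) contract come from the file above, and running it assumes pymc_core and the repeater package are importable.

import asyncio

from repeater.packet_router import PacketRouter


class FakePacket:
    """Invented packet stub exposing the attributes the router reads."""

    rssi = -92
    snr = 6.5
    timestamp = 0

    def get_payload_type(self):
        return 0xFF  # assumed unknown type: falls straight through to the engine


class StubDaemon:
    """Invented daemon stub; only repeater_handler does anything."""

    trace_helper = None
    discovery_helper = None
    advert_helper = None

    def __init__(self):
        self.seen = []

    async def repeater_handler(self, packet, metadata):
        # The engine now owns all statistics and forwarding decisions.
        self.seen.append((packet, metadata))


async def main():
    daemon = StubDaemon()
    router = PacketRouter(daemon)
    await router.start()
    await router.enqueue(FakePacket())
    await asyncio.sleep(0.3)  # give the queue worker a moment to route it
    await router.stop()
    print("engine saw", len(daemon.seen), "packet(s)")


if __name__ == "__main__":
    asyncio.run(main())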