Files

462 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import logging
import time
from pymc_core.node.handlers.ack import AckHandler
from pymc_core.node.handlers.advert import AdvertHandler
from pymc_core.node.handlers.control import ControlHandler
from pymc_core.node.handlers.group_text import GroupTextHandler
from pymc_core.node.handlers.login_response import LoginResponseHandler
from pymc_core.node.handlers.login_server import LoginServerHandler
from pymc_core.node.handlers.path import PathHandler
from pymc_core.node.handlers.protocol_request import ProtocolRequestHandler
from pymc_core.node.handlers.protocol_response import ProtocolResponseHandler
from pymc_core.node.handlers.text import TextMessageHandler
from pymc_core.node.handlers.trace import TraceHandler
from pymc_core.protocol.constants import (
PH_ROUTE_MASK,
ROUTE_TYPE_DIRECT,
ROUTE_TYPE_TRANSPORT_DIRECT,
)
logger = logging.getLogger("PacketRouter")
# Deliver PATH and protocol-response (PATH) to companion at most once per logical packet
# so the client is not spammed with duplicate telemetry when the mesh delivers multiple copies.
_COMPANION_DEDUPE_TTL_SEC = 60.0
def _companion_dedup_key(packet) -> str | None:
"""Return a stable key for companion delivery deduplication, or None if not available."""
try:
return packet.calculate_packet_hash().hex().upper()
except Exception:
return None
def _is_direct_final_hop(packet) -> bool:
"""True if packet is DIRECT (or TRANSPORT_DIRECT) with empty path — we're the final destination."""
route = getattr(packet, "header", 0) & PH_ROUTE_MASK
if route != ROUTE_TYPE_DIRECT and route != ROUTE_TYPE_TRANSPORT_DIRECT:
return False
path = getattr(packet, "path", None)
return not path or len(path) == 0
class PacketRouter:
def __init__(self, daemon_instance):
self.daemon = daemon_instance
self.queue = asyncio.Queue(maxsize=500)
self.running = False
self.router_task = None
# Serialize injects so one local TX completes before the next is processed
self._inject_lock = asyncio.Lock()
# Hash -> expiry time; skip delivering same PATH/protocol-response to companions more than once
self._companion_delivered = {}
# Safety valve: cap the number of _route_packet tasks sleeping concurrently.
# LoRa's airtime budget naturally limits throughput, but burst arrivals
# (multi-hop amplification, collision retries) can stack many sleeping
# delay tasks before the duty-cycle gate fires. 30 is very generous for
# any realistic LoRa network but protects against pathological scenarios
# (e.g. a busy bridge node during a mesh-wide flood) exhausting memory or
# starving the event loop.
self._in_flight: int = 0
self._max_in_flight: int = 30
# Live set of in-flight tasks — kept in sync with _in_flight via the
# done-callback. Used exclusively for shutdown drain; the integer
# counter is used for the cap check (faster, single source of truth).
self._route_tasks: set = set()
# Total packets dropped because the cap was reached. Exposed in logs
# at shutdown so operators know whether the cap is actually firing.
self._cap_drop_count: int = 0
async def start(self):
self.running = True
self.router_task = asyncio.create_task(self._process_queue())
logger.info("Packet router started")
async def stop(self):
self.running = False
if self.router_task:
self.router_task.cancel()
try:
await self.router_task
except asyncio.CancelledError:
pass
# Drain in-flight tasks gracefully, then cancel any that outlast the
# timeout. This mirrors what the old _route_tasks set enabled and gives
# in-progress packets a fair chance to finish (e.g. their TX delay sleep
# + send) before the process exits.
if self._route_tasks:
pending_snapshot = set(self._route_tasks)
logger.info(
"Draining %d in-flight route task(s) (5 s timeout)...",
len(pending_snapshot),
)
_, still_pending = await asyncio.wait(pending_snapshot, timeout=5.0)
if still_pending:
logger.warning(
"Cancelling %d route task(s) that did not finish within the shutdown timeout",
len(still_pending),
)
for task in still_pending:
task.cancel()
await asyncio.gather(*still_pending, return_exceptions=True)
if self._cap_drop_count:
logger.warning(
"In-flight cap dropped %d packet(s) during this session — "
"consider raising _max_in_flight if this is frequent",
self._cap_drop_count,
)
logger.info("Packet router stopped")
def _on_route_done(self, task: asyncio.Task) -> None:
"""Done-callback for _route_packet tasks: decrement counter and surface errors."""
self._in_flight -= 1
self._route_tasks.discard(task)
if not task.cancelled():
exc = task.exception()
if exc is not None:
logger.error("_route_packet raised: %s", exc, exc_info=exc)
def _should_deliver_path_to_companions(self, packet) -> bool:
"""Return True if this PATH/protocol-response should be delivered to companions (first of duplicates)."""
key = _companion_dedup_key(packet)
if not key:
return True
now = time.time()
# Prune expired entries only when the dict grows large, avoiding a full
# dict comprehension on every packet. 200 entries × 60 s TTL means a
# sweep only triggers after ~200 unique PATH packets with no expiry — far
# more than any realistic companion session, and well below the 1000-entry
# threshold that could accumulate over hours without pruning.
if len(self._companion_delivered) > 200:
self._companion_delivered = {
k: v for k, v in self._companion_delivered.items() if v > now
}
if key in self._companion_delivered:
return False
self._companion_delivered[key] = now + _COMPANION_DEDUPE_TTL_SEC
return True
def _record_for_ui(self, packet, metadata: dict) -> None:
"""Record an injection-only packet for the web UI (storage + recent_packets)."""
handler = getattr(self.daemon, "repeater_handler", None)
if handler and getattr(handler, "storage", None):
try:
handler.record_packet_only(packet, metadata)
except Exception as e:
logger.debug("Record for UI failed: %s", e)
async def enqueue(self, packet):
"""Add packet to router queue."""
if self.queue.full():
logger.warning("Packet router queue full (%d), dropping oldest", self.queue.maxsize)
try:
self.queue.get_nowait()
except asyncio.QueueEmpty:
pass
await self.queue.put(packet)
async def inject_packet(self, packet, wait_for_ack: bool = False):
try:
metadata = {
"rssi": getattr(packet, "rssi", 0),
"snr": getattr(packet, "snr", 0.0),
"timestamp": getattr(packet, "timestamp", 0),
}
# Serialize injects so one local TX completes before the next runs
# (avoids duty-cycle or dispatcher races where a later packet goes out first)
async with self._inject_lock:
# Use local_transmission=True to bypass forwarding logic
await self.daemon.repeater_handler(
packet, metadata, local_transmission=True
)
# Mark so when this packet is dequeued we don't pass to engine again (avoid double-send / double-count)
packet._injected_for_tx = True
# Enqueue so router can deliver to companion(s): TXT_MSG -> dest bridge, ACK -> all bridges (sender sees ACK)
await self.enqueue(packet)
packet_len = len(packet.payload) if packet.payload else 0
logger.debug(
f"Injected packet processed by engine as local transmission ({packet_len} bytes)"
)
# Log protocol REQ (e.g. status/telemetry) so we can confirm target node
ptype = getattr(packet, "get_payload_type", lambda: None)()
if ptype == ProtocolRequestHandler.payload_type() and packet.payload and packet_len >= 1:
logger.info(
"Injected protocol REQ: dest=0x%02x, payload=%d bytes",
packet.payload[0],
packet_len,
)
return True
except Exception as e:
logger.error(f"Error injecting packet through engine: {e}")
return False
async def _process_queue(self):
while self.running:
try:
packet = await asyncio.wait_for(self.queue.get(), timeout=0.1)
# Drop early if the in-flight cap is reached. This is a last-resort
# safety valve — under normal operation LoRa airtime and the duty-cycle
# gate keep _in_flight well below _max_in_flight.
if self._in_flight >= self._max_in_flight:
self._cap_drop_count += 1
logger.warning(
"In-flight task cap reached (%d/%d), dropping packet "
"(session total dropped: %d)",
self._in_flight, self._max_in_flight, self._cap_drop_count,
)
continue
self._in_flight += 1
task = asyncio.create_task(self._route_packet(packet))
self._route_tasks.add(task)
task.add_done_callback(self._on_route_done)
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"Router error: {e}", exc_info=True)
async def _route_packet(self, packet):
payload_type = packet.get_payload_type()
processed_by_injection = False
metadata = {
"rssi": getattr(packet, "rssi", 0),
"snr": getattr(packet, "snr", 0.0),
"timestamp": getattr(packet, "timestamp", 0),
}
# Route to specific handlers for parsing only
if payload_type == TraceHandler.payload_type():
# Locally injected TRACE requests are TX-only and re-enter the router so
# companion delivery can still happen. They are not inbound RF responses,
# so skip TraceHelper parsing to avoid matching pending ping tags against
# zeroed local metadata.
if getattr(packet, "_injected_for_tx", False):
processed_by_injection = True
elif self.daemon.trace_helper:
await self.daemon.trace_helper.process_trace_packet(packet)
# Skip engine processing for trace packets - they're handled by trace helper
processed_by_injection = True
# Do not call _record_for_ui: TraceHelper.log_trace_record already persists the
# trace path from the payload. record_packet_only would treat packet.path (SNR bytes)
# as routing hashes and log bogus duplicate rows.
elif payload_type == ControlHandler.payload_type():
# Process control/discovery packet
if self.daemon.discovery_helper:
await self.daemon.discovery_helper.control_handler(packet)
packet.mark_do_not_retransmit()
# Deliver to companions via daemon (frame servers push PUSH_CODE_CONTROL_DATA 0x8E)
deliver = getattr(self.daemon, "deliver_control_data", None)
if deliver:
snr = getattr(packet, "_snr", None) or getattr(packet, "snr", 0.0)
rssi = getattr(packet, "_rssi", None) or getattr(packet, "rssi", 0)
path_len = getattr(packet, "path_len", 0) or 0
path_bytes = (
bytes(getattr(packet, "path", []))
if getattr(packet, "path", None) is not None
else b""
)[:path_len]
payload_bytes = bytes(packet.payload) if packet.payload else b""
await deliver(snr, rssi, path_len, path_bytes, payload_bytes)
elif payload_type == AdvertHandler.payload_type():
# Process advertisement packet for neighbor tracking
if self.daemon.advert_helper:
rssi = getattr(packet, "rssi", 0)
snr = getattr(packet, "snr", 0.0)
await self.daemon.advert_helper.process_advert_packet(packet, rssi, snr)
# Also feed adverts to companion bridges (for contact/path updates)
for bridge in getattr(self.daemon, "companion_bridges", {}).values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge advert error: {e}")
elif payload_type == LoginServerHandler.payload_type():
# Route to companion if dest is a companion; else to login_helper (for logging into this repeater).
# When dest is remote (not handled), pass to engine so DIRECT/FLOOD ANON_REQ can be forwarded.
# Our own injected ANON_REQ is suppressed by the engine's duplicate (mark_seen) check.
dest_hash = packet.payload[0] if packet.payload else None
companion_bridges = getattr(self.daemon, "companion_bridges", {})
if dest_hash is not None and dest_hash in companion_bridges:
await companion_bridges[dest_hash].process_received_packet(packet)
processed_by_injection = True
elif self.daemon.login_helper:
handled = await self.daemon.login_helper.process_login_packet(packet)
if handled:
processed_by_injection = True
if processed_by_injection:
self._record_for_ui(packet, metadata)
elif payload_type == AckHandler.payload_type():
# ACK has no dest in payload (4-byte CRC only); deliver to all bridges so sender sees send_confirmed.
# Do not set processed_by_injection so packet also reaches engine for DIRECT forwarding when we're a middle hop.
companion_bridges = getattr(self.daemon, "companion_bridges", {})
for bridge in companion_bridges.values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge ACK error: {e}")
elif payload_type == TextMessageHandler.payload_type():
dest_hash = packet.payload[0] if packet.payload else None
companion_bridges = getattr(self.daemon, "companion_bridges", {})
if dest_hash is not None and dest_hash in companion_bridges:
await companion_bridges[dest_hash].process_received_packet(packet)
processed_by_injection = True
self._record_for_ui(packet, metadata)
elif self.daemon.text_helper:
handled = await self.daemon.text_helper.process_text_packet(packet)
if handled:
processed_by_injection = True
self._record_for_ui(packet, metadata)
elif payload_type == PathHandler.payload_type():
dest_hash = packet.payload[0] if packet.payload else None
companion_bridges = getattr(self.daemon, "companion_bridges", {})
if dest_hash is not None and dest_hash in companion_bridges:
if self._should_deliver_path_to_companions(packet):
await companion_bridges[dest_hash].process_received_packet(packet)
# Do not set processed_by_injection so packet also reaches engine for DIRECT forwarding when we're a middle hop.
elif companion_bridges and self._should_deliver_path_to_companions(packet):
# Dest not in bridges: path-return with ephemeral dest (e.g. multi-hop login).
# Deliver to all bridges; each will try to decrypt and ignore if not relevant.
for bridge in companion_bridges.values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge PATH error: {e}")
logger.debug(
"PATH dest=0x%02x (anon) delivered to %d bridge(s) for matching",
dest_hash or 0,
len(companion_bridges),
)
# Do not set processed_by_injection so packet also reaches engine for DIRECT forwarding when we're a middle hop.
elif self.daemon.path_helper:
await self.daemon.path_helper.process_path_packet(packet)
elif payload_type == LoginResponseHandler.payload_type():
# PAYLOAD_TYPE_RESPONSE (0x01): payload is dest_hash(1)+src_hash(1)+encrypted.
# Deliver to the bridge that is the destination, or to all bridges when the
# response is addressed to this repeater (path-based reply: firmware sends
# to first hop instead of original requester).
# Do not set processed_by_injection so packet also reaches engine for DIRECT forwarding when we're a middle hop.
dest_hash = packet.payload[0] if packet.payload and len(packet.payload) >= 1 else None
companion_bridges = getattr(self.daemon, "companion_bridges", {})
local_hash = getattr(self.daemon, "local_hash", None)
if dest_hash is not None and dest_hash in companion_bridges:
try:
await companion_bridges[dest_hash].process_received_packet(packet)
logger.info(
"RESPONSE dest=0x%02x delivered to companion bridge",
dest_hash,
)
except Exception as e:
logger.debug(f"Companion bridge RESPONSE error: {e}")
elif dest_hash == local_hash and companion_bridges:
# Response addressed to this repeater (e.g. path-based reply to first hop)
for bridge in companion_bridges.values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge RESPONSE error: {e}")
logger.info(
"RESPONSE dest=0x%02x (local) delivered to %d companion bridge(s)",
dest_hash,
len(companion_bridges),
)
elif companion_bridges:
# Dest not in bridges and not local: likely ANON_REQ response (dest = ephemeral
# sender hash). Deliver to all bridges; each will try to decrypt and ignore if
# not relevant (firmware-like behavior, works with multiple companion bridges).
for bridge in companion_bridges.values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge RESPONSE error: {e}")
logger.debug(
"RESPONSE dest=0x%02x (anon) delivered to %d bridge(s) for matching",
dest_hash or 0,
len(companion_bridges),
)
if companion_bridges and _is_direct_final_hop(packet):
# DIRECT with empty path: we're the final hop; don't pass to engine (it would drop with "Direct: no path")
processed_by_injection = True
self._record_for_ui(packet, metadata)
elif payload_type == ProtocolResponseHandler.payload_type():
# PAYLOAD_TYPE_PATH (0x08): protocol responses (telemetry, binary, etc.).
# Deliver at most once per logical packet so the client is not spammed with duplicates.
# Do not set processed_by_injection so packet also reaches engine for DIRECT forwarding when we're a middle hop.
companion_bridges = getattr(self.daemon, "companion_bridges", {})
if companion_bridges and self._should_deliver_path_to_companions(packet):
for bridge in companion_bridges.values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge RESPONSE error: {e}")
if companion_bridges and _is_direct_final_hop(packet):
# DIRECT with empty path: we're the final hop; ensure delivery to all bridges (anon)
if not self._should_deliver_path_to_companions(packet):
for bridge in companion_bridges.values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge RESPONSE (final hop) error: {e}")
processed_by_injection = True
self._record_for_ui(packet, metadata)
elif payload_type == ProtocolRequestHandler.payload_type():
dest_hash = packet.payload[0] if packet.payload else None
companion_bridges = getattr(self.daemon, "companion_bridges", {})
if dest_hash is not None and dest_hash in companion_bridges:
await companion_bridges[dest_hash].process_received_packet(packet)
processed_by_injection = True
self._record_for_ui(packet, metadata)
elif self.daemon.protocol_request_helper:
handled = await self.daemon.protocol_request_helper.process_request_packet(packet)
if handled:
processed_by_injection = True
self._record_for_ui(packet, metadata)
elif companion_bridges and _is_direct_final_hop(packet):
# DIRECT with empty path: we're the final hop; deliver to all bridges for anon matching
for bridge in companion_bridges.values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge REQ (final hop) error: {e}")
processed_by_injection = True
self._record_for_ui(packet, metadata)
elif payload_type == GroupTextHandler.payload_type():
# GRP_TXT: pass to all companions (they filter by channel); still forward
companion_bridges = getattr(self.daemon, "companion_bridges", {})
for bridge in companion_bridges.values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge GRP_TXT error: {e}")
# Only pass to repeater engine if not already processed by injection
# Skip engine for packets we injected for TX (already sent; avoid double-send/double-count)
if getattr(packet, "_injected_for_tx", False):
processed_by_injection = True
if self.daemon.repeater_handler and not processed_by_injection:
metadata = {
"rssi": getattr(packet, "rssi", 0),
"snr": getattr(packet, "snr", 0.0),
"timestamp": getattr(packet, "timestamp", 0),
}
await self.daemon.repeater_handler(packet, metadata)