diff --git a/repeater/engine.py b/repeater/engine.py index 030f435..f1f65f9 100644 --- a/repeater/engine.py +++ b/repeater/engine.py @@ -11,6 +11,7 @@ from pymc_core.protocol import Packet from pymc_core.protocol.constants import ( MAX_PATH_SIZE, PAYLOAD_TYPE_ADVERT, + PAYLOAD_TYPE_ANON_REQ, PH_ROUTE_MASK, PH_TYPE_MASK, PH_TYPE_SHIFT, @@ -150,6 +151,9 @@ class RepeaterHandler(BaseHandler): transmitted = False tx_delay_ms = 0.0 drop_reason = None + lbt_attempts = 0 + lbt_backoff_delays_ms = None + lbt_channel_busy = False original_path_hashes = packet.get_path_hashes_hex() path_hash_size = packet.get_path_hash_size() @@ -291,70 +295,33 @@ class RepeaterHandler(BaseHandler): if is_dupe and drop_reason is None: drop_reason = "Duplicate" - path_hash = None display_hashes = ( original_path_hashes if original_path_hashes else packet.get_path_hashes_hex() ) - if display_hashes: - display = display_hashes[:8] - if len(display_hashes) > 8: - display = list(display) + ["..."] - path_hash = "[" + ", ".join(display) + "]" - - src_hash = None - dst_hash = None - - # Payload types with dest_hash and src_hash as first 2 bytes - if payload_type in [0x00, 0x01, 0x02, 0x08]: - if hasattr(packet, "payload") and packet.payload and len(packet.payload) >= 2: - dst_hash = f"{packet.payload[0]:02X}" - src_hash = f"{packet.payload[1]:02X}" - - # ADVERT packets have source identifier as first byte - elif payload_type == PAYLOAD_TYPE_ADVERT: - if hasattr(packet, "payload") and packet.payload and len(packet.payload) >= 1: - src_hash = f"{packet.payload[0]:02X}" + path_hash = self._path_hash_display(display_hashes) + src_hash, dst_hash = self._packet_record_src_dst(packet, payload_type) # Record packet for charts - packet_record = { - "timestamp": time.time(), - "header": ( - f"0x{packet.header:02X}" - if hasattr(packet, "header") and packet.header is not None - else None - ), - "payload": ( - packet.payload.hex() if hasattr(packet, "payload") and packet.payload else None - ), - "payload_length": ( - len(packet.payload) if hasattr(packet, "payload") and packet.payload else 0 - ), - "type": payload_type, - "route": route_type, - "length": len(packet.payload or b""), - "rssi": rssi, - "snr": snr, - "score": self.calculate_packet_score( - snr, len(packet.payload or b""), self.radio_config["spreading_factor"] - ), - "tx_delay_ms": tx_delay_ms, - "transmitted": transmitted, - "is_duplicate": is_dupe, - "packet_hash": pkt_hash[:16], - "drop_reason": drop_reason, - "path_hash": path_hash, - "src_hash": src_hash, - "dst_hash": dst_hash, - "original_path": original_path_hashes or None, - "forwarded_path": forwarded_path_hashes, - "path_hash_size": path_hash_size, - "raw_packet": packet.write_to().hex() if hasattr(packet, "write_to") else None, - "lbt_attempts": lbt_attempts if transmitted else 0, - "lbt_backoff_delays_ms": ( - lbt_backoff_delays_ms if transmitted and lbt_backoff_delays_ms else None - ), - "lbt_channel_busy": lbt_channel_busy if transmitted else False, - } + packet_record = self._build_packet_record( + packet, + payload_type, + route_type, + rssi, + snr, + original_path_hashes, + path_hash_size, + path_hash, + src_hash, + dst_hash, + transmitted=transmitted, + drop_reason=drop_reason, + is_duplicate=is_dupe, + forwarded_path=forwarded_path_hashes, + tx_delay_ms=tx_delay_ms, + lbt_attempts=lbt_attempts, + lbt_backoff_delays_ms=lbt_backoff_delays_ms, + lbt_channel_busy=lbt_channel_busy, + ) # Store packet record to persistent storage # Skip LetsMesh only for invalid packets (not duplicates or operational drops) @@ -409,6 +376,48 @@ class RepeaterHandler(BaseHandler): if len(self.recent_packets) > self.max_recent_packets: self.recent_packets.pop(0) + def record_packet_only(self, packet: Packet, metadata: dict) -> None: + """Record a packet for UI/storage without running forwarding or duplicate logic. + + Used by the packet router for injection-only types (ANON_REQ, ACK, PATH, etc.) + so they still appear in the web UI. + """ + if not self.storage: + return + rssi = metadata.get("rssi", 0) + snr = metadata.get("snr", 0.0) + if not hasattr(packet, "header") or packet.header is None: + logger.debug("record_packet_only: packet missing header, skipping") + return + header_info = PacketHeaderUtils.parse_header(packet.header) + payload_type = header_info["payload_type"] + route_type = header_info["route_type"] + original_path_hashes = packet.get_path_hashes_hex() + path_hash_size = packet.get_path_hash_size() + path_hash = self._path_hash_display(original_path_hashes) + src_hash, dst_hash = self._packet_record_src_dst(packet, payload_type) + packet_record = self._build_packet_record( + packet, + payload_type, + route_type, + rssi, + snr, + original_path_hashes, + path_hash_size, + path_hash, + src_hash, + dst_hash, + ) + try: + self.storage.record_packet(packet_record, skip_letsmesh_if_invalid=False) + except Exception as e: + logger.error(f"Failed to store packet record (record_packet_only): {e}") + return + self.recent_packets.append(packet_record) + if len(self.recent_packets) > self.max_recent_packets: + self.recent_packets.pop(0) + self.rx_count += 1 + def cleanup_cache(self): now = time.time() @@ -416,6 +425,94 @@ class RepeaterHandler(BaseHandler): for k in expired: del self.seen_packets[k] + def _path_hash_display(self, display_hashes) -> Optional[str]: + """Build path hash string for packet record from path hashes list.""" + if not display_hashes: + return None + display = display_hashes[:8] + if len(display_hashes) > 8: + display = list(display) + ["..."] + return "[" + ", ".join(display) + "]" + + def _packet_record_src_dst( + self, packet: Packet, payload_type: int + ) -> Tuple[Optional[str], Optional[str]]: + """Return (src_hash, dst_hash) for packet_record from packet and payload_type.""" + src_hash = None + dst_hash = None + payload = getattr(packet, "payload", None) + if payload_type in [0x00, 0x01, 0x02, 0x08]: + if payload and len(payload) >= 2: + dst_hash = f"{payload[0]:02X}" + src_hash = f"{payload[1]:02X}" + elif payload_type == PAYLOAD_TYPE_ADVERT: + if payload and len(payload) >= 1: + src_hash = f"{payload[0]:02X}" + elif payload_type == PAYLOAD_TYPE_ANON_REQ: + if payload and len(payload) >= 1: + dst_hash = f"{payload[0]:02X}" + return (src_hash, dst_hash) + + def _build_packet_record( + self, + packet: Packet, + payload_type: int, + route_type: int, + rssi: int, + snr: float, + original_path_hashes, + path_hash_size: int, + path_hash: Optional[str], + src_hash: Optional[str], + dst_hash: Optional[str], + *, + transmitted: bool = False, + drop_reason: Optional[str] = None, + is_duplicate: bool = False, + forwarded_path=None, + tx_delay_ms: float = 0.0, + lbt_attempts: int = 0, + lbt_backoff_delays_ms=None, + lbt_channel_busy: bool = False, + ) -> dict: + """Build a single packet_record dict for storage and recent_packets.""" + pkt_hash = packet.calculate_packet_hash().hex().upper() + payload = getattr(packet, "payload", None) + payload_len = len(payload or b"") + return { + "timestamp": time.time(), + "header": ( + f"0x{packet.header:02X}" + if hasattr(packet, "header") and packet.header is not None + else None + ), + "payload": payload.hex() if payload else None, + "payload_length": len(payload) if payload else 0, + "type": payload_type, + "route": route_type, + "length": payload_len, + "rssi": rssi, + "snr": snr, + "score": self.calculate_packet_score( + snr, payload_len, self.radio_config["spreading_factor"] + ), + "tx_delay_ms": tx_delay_ms, + "transmitted": transmitted, + "is_duplicate": is_duplicate, + "packet_hash": pkt_hash[:16], + "drop_reason": drop_reason, + "path_hash": path_hash, + "src_hash": src_hash, + "dst_hash": dst_hash, + "original_path": original_path_hashes or None, + "forwarded_path": forwarded_path, + "path_hash_size": path_hash_size, + "raw_packet": packet.write_to().hex() if hasattr(packet, "write_to") else None, + "lbt_attempts": lbt_attempts, + "lbt_backoff_delays_ms": lbt_backoff_delays_ms, + "lbt_channel_busy": lbt_channel_busy, + } + def _get_drop_reason(self, packet: Packet) -> str: if self.is_duplicate(packet): @@ -692,7 +789,6 @@ class RepeaterHandler(BaseHandler): self.mark_seen(packet) - original_path = list(packet.path) # Remove first hash entry (hash_size bytes) packet.path = bytearray(packet.path[hash_size:]) packet.path_len = PathUtils.encode_path_len(hash_size, hop_count - 1) diff --git a/repeater/packet_router.py b/repeater/packet_router.py index 843fc44..d9c02e9 100644 --- a/repeater/packet_router.py +++ b/repeater/packet_router.py @@ -13,6 +13,11 @@ from pymc_core.node.handlers.protocol_request import ProtocolRequestHandler from pymc_core.node.handlers.protocol_response import ProtocolResponseHandler from pymc_core.node.handlers.text import TextMessageHandler from pymc_core.node.handlers.trace import TraceHandler +from pymc_core.protocol.constants import ( + PH_ROUTE_MASK, + ROUTE_TYPE_DIRECT, + ROUTE_TYPE_TRANSPORT_DIRECT, +) logger = logging.getLogger("PacketRouter") @@ -29,6 +34,15 @@ def _companion_dedup_key(packet) -> str | None: return None +def _is_direct_final_hop(packet) -> bool: + """True if packet is DIRECT (or TRANSPORT_DIRECT) with empty path — we're the final destination.""" + route = getattr(packet, "header", 0) & PH_ROUTE_MASK + if route != ROUTE_TYPE_DIRECT and route != ROUTE_TYPE_TRANSPORT_DIRECT: + return False + path = getattr(packet, "path", None) + return not path or len(path) == 0 + + class PacketRouter: def __init__(self, daemon_instance): @@ -69,6 +83,15 @@ class PacketRouter: self._companion_delivered[key] = now + _COMPANION_DEDUPE_TTL_SEC return True + def _record_for_ui(self, packet, metadata: dict) -> None: + """Record an injection-only packet for the web UI (storage + recent_packets).""" + handler = getattr(self.daemon, "repeater_handler", None) + if handler and getattr(handler, "storage", None): + try: + handler.record_packet_only(packet, metadata) + except Exception as e: + logger.debug("Record for UI failed: %s", e) + async def enqueue(self, packet): """Add packet to router queue.""" await self.queue.put(packet) @@ -124,6 +147,11 @@ class PacketRouter: payload_type = packet.get_payload_type() processed_by_injection = False + metadata = { + "rssi": getattr(packet, "rssi", 0), + "snr": getattr(packet, "snr", 0.0), + "timestamp": getattr(packet, "timestamp", 0), + } # Route to specific handlers for parsing only if payload_type == TraceHandler.payload_type(): @@ -132,6 +160,7 @@ class PacketRouter: await self.daemon.trace_helper.process_trace_packet(packet) # Skip engine processing for trace packets - they're handled by trace helper processed_by_injection = True + self._record_for_ui(packet, metadata) elif payload_type == ControlHandler.payload_type(): # Process control/discovery packet @@ -167,7 +196,8 @@ class PacketRouter: elif payload_type == LoginServerHandler.payload_type(): # Route to companion if dest is a companion; else to login_helper (for logging into this repeater). - # If dest is remote (no local handler), mark processed so we don't pass our own outbound login TX to the repeater as RX. + # When dest is remote (not handled), pass to engine so DIRECT/FLOOD ANON_REQ can be forwarded. + # Our own injected ANON_REQ is suppressed by the engine's duplicate (mark_seen) check. dest_hash = packet.payload[0] if packet.payload else None companion_bridges = getattr(self.daemon, "companion_bridges", {}) if dest_hash is not None and dest_hash in companion_bridges: @@ -177,19 +207,18 @@ class PacketRouter: handled = await self.daemon.login_helper.process_login_packet(packet) if handled: processed_by_injection = True - else: - # Login request for remote repeater (we already TXed it via inject); don't treat as RX. - processed_by_injection = True + if processed_by_injection: + self._record_for_ui(packet, metadata) elif payload_type == AckHandler.payload_type(): - # ACK has no dest in payload (4-byte CRC only); deliver to all bridges so sender sees send_confirmed + # ACK has no dest in payload (4-byte CRC only); deliver to all bridges so sender sees send_confirmed. + # Do not set processed_by_injection so packet also reaches engine for DIRECT forwarding when we're a middle hop. companion_bridges = getattr(self.daemon, "companion_bridges", {}) for bridge in companion_bridges.values(): try: await bridge.process_received_packet(packet) except Exception as e: logger.debug(f"Companion bridge ACK error: {e}") - processed_by_injection = True elif payload_type == TextMessageHandler.payload_type(): dest_hash = packet.payload[0] if packet.payload else None @@ -197,10 +226,12 @@ class PacketRouter: if dest_hash is not None and dest_hash in companion_bridges: await companion_bridges[dest_hash].process_received_packet(packet) processed_by_injection = True + self._record_for_ui(packet, metadata) elif self.daemon.text_helper: handled = await self.daemon.text_helper.process_text_packet(packet) if handled: processed_by_injection = True + self._record_for_ui(packet, metadata) elif payload_type == PathHandler.payload_type(): dest_hash = packet.payload[0] if packet.payload else None @@ -208,7 +239,7 @@ class PacketRouter: if dest_hash is not None and dest_hash in companion_bridges: if self._should_deliver_path_to_companions(packet): await companion_bridges[dest_hash].process_received_packet(packet) - processed_by_injection = True + # Do not set processed_by_injection so packet also reaches engine for DIRECT forwarding when we're a middle hop. elif companion_bridges and self._should_deliver_path_to_companions(packet): # Dest not in bridges: path-return with ephemeral dest (e.g. multi-hop login). # Deliver to all bridges; each will try to decrypt and ignore if not relevant. @@ -222,7 +253,7 @@ class PacketRouter: dest_hash or 0, len(companion_bridges), ) - processed_by_injection = True + # Do not set processed_by_injection so packet also reaches engine for DIRECT forwarding when we're a middle hop. elif self.daemon.path_helper: await self.daemon.path_helper.process_path_packet(packet) @@ -231,6 +262,7 @@ class PacketRouter: # Deliver to the bridge that is the destination, or to all bridges when the # response is addressed to this repeater (path-based reply: firmware sends # to first hop instead of original requester). + # Do not set processed_by_injection so packet also reaches engine for DIRECT forwarding when we're a middle hop. dest_hash = packet.payload[0] if packet.payload and len(packet.payload) >= 1 else None companion_bridges = getattr(self.daemon, "companion_bridges", {}) local_hash = getattr(self.daemon, "local_hash", None) @@ -243,7 +275,6 @@ class PacketRouter: ) except Exception as e: logger.debug(f"Companion bridge RESPONSE error: {e}") - processed_by_injection = True elif dest_hash == local_hash and companion_bridges: # Response addressed to this repeater (e.g. path-based reply to first hop) for bridge in companion_bridges.values(): @@ -256,7 +287,6 @@ class PacketRouter: dest_hash, len(companion_bridges), ) - processed_by_injection = True elif companion_bridges: # Dest not in bridges and not local: likely ANON_REQ response (dest = ephemeral # sender hash). Deliver to all bridges; each will try to decrypt and ignore if @@ -271,11 +301,15 @@ class PacketRouter: dest_hash or 0, len(companion_bridges), ) + if companion_bridges and _is_direct_final_hop(packet): + # DIRECT with empty path: we're the final hop; don't pass to engine (it would drop with "Direct: no path") processed_by_injection = True + self._record_for_ui(packet, metadata) elif payload_type == ProtocolResponseHandler.payload_type(): # PAYLOAD_TYPE_PATH (0x08): protocol responses (telemetry, binary, etc.). # Deliver at most once per logical packet so the client is not spammed with duplicates. + # Do not set processed_by_injection so packet also reaches engine for DIRECT forwarding when we're a middle hop. companion_bridges = getattr(self.daemon, "companion_bridges", {}) if companion_bridges and self._should_deliver_path_to_companions(packet): for bridge in companion_bridges.values(): @@ -283,8 +317,16 @@ class PacketRouter: await bridge.process_received_packet(packet) except Exception as e: logger.debug(f"Companion bridge RESPONSE error: {e}") - if companion_bridges: + if companion_bridges and _is_direct_final_hop(packet): + # DIRECT with empty path: we're the final hop; ensure delivery to all bridges (anon) + if not self._should_deliver_path_to_companions(packet): + for bridge in companion_bridges.values(): + try: + await bridge.process_received_packet(packet) + except Exception as e: + logger.debug(f"Companion bridge RESPONSE (final hop) error: {e}") processed_by_injection = True + self._record_for_ui(packet, metadata) elif payload_type == ProtocolRequestHandler.payload_type(): dest_hash = packet.payload[0] if packet.payload else None @@ -292,10 +334,21 @@ class PacketRouter: if dest_hash is not None and dest_hash in companion_bridges: await companion_bridges[dest_hash].process_received_packet(packet) processed_by_injection = True + self._record_for_ui(packet, metadata) elif self.daemon.protocol_request_helper: handled = await self.daemon.protocol_request_helper.process_request_packet(packet) if handled: processed_by_injection = True + self._record_for_ui(packet, metadata) + elif companion_bridges and _is_direct_final_hop(packet): + # DIRECT with empty path: we're the final hop; deliver to all bridges for anon matching + for bridge in companion_bridges.values(): + try: + await bridge.process_received_packet(packet) + except Exception as e: + logger.debug(f"Companion bridge REQ (final hop) error: {e}") + processed_by_injection = True + self._record_for_ui(packet, metadata) elif payload_type == GroupTextHandler.payload_type(): # GRP_TXT: pass to all companions (they filter by channel); still forward diff --git a/tests/test_engine.py b/tests/test_engine.py index 7c4adf9..5669614 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -287,9 +287,10 @@ class TestDirectForward: assert result.path_len == 2 def test_single_hop_path_consumed(self, handler): - """After consuming our hash the path becomes empty — packet delivered.""" + """Single hop to us: we strip and return packet with empty path (forward so it can reach destination).""" pkt = _make_direct_packet(path=bytes([LOCAL_HASH])) result = handler.direct_forward(pkt) + assert result is not None assert list(result.path) == [] assert result.path_len == 0 @@ -1044,7 +1045,7 @@ GOOD_PACKETS = [ lambda: _make_flood_packet(payload=b"\xAB\x01\x02\x03", payload_type=4)), ("good_direct_minimal", - "Direct, 1-byte payload, single hop to us", + "Direct, 1-byte payload, single hop to us (forward with empty path)", lambda: _make_direct_packet(payload=b"\x01", path=bytes([LOCAL_HASH]))), ("good_direct_multihop",