From 9b53c5b6debdadb37b5ae9b3627597d42db417ad Mon Sep 17 00:00:00 2001 From: Lloyd Date: Tue, 2 Dec 2025 16:09:43 +0000 Subject: [PATCH] Update packet handling to skip invalid advert packets and update record_packet method to conditionally publish to LetsMesh --- .../data_acquisition/storage_collector.py | 16 +++++++--- repeater/engine.py | 32 +++++++++---------- repeater/handler_helpers/advert.py | 5 +-- 3 files changed, 31 insertions(+), 22 deletions(-) diff --git a/repeater/data_acquisition/storage_collector.py b/repeater/data_acquisition/storage_collector.py index e2a549e..4f8bb7b 100644 --- a/repeater/data_acquisition/storage_collector.py +++ b/repeater/data_acquisition/storage_collector.py @@ -82,8 +82,13 @@ class StorageCollector: "packets_received": self.repeater_handler.rx_count, } - def record_packet(self, packet_record: dict): - """Record packet to storage and publish to MQTT/LetsMesh""" + def record_packet(self, packet_record: dict, skip_letsmesh_if_invalid: bool = True): + """Record packet to storage and publish to MQTT/LetsMesh + + Args: + packet_record: Dictionary containing packet information + skip_letsmesh_if_invalid: If True, don't publish packets with drop_reason to LetsMesh + """ logger.debug( f"Recording packet: type={packet_record.get('type')}, " f"transmitted={packet_record.get('transmitted')}" @@ -95,8 +100,11 @@ class StorageCollector: self.rrd_handler.update_packet_metrics(packet_record, cumulative_counts) self.mqtt_handler.publish(packet_record, "packet") - # Publish to LetsMesh if enabled - self._publish_to_letsmesh(packet_record) + # Publish to LetsMesh if enabled (skip invalid packets if requested) + if skip_letsmesh_if_invalid and packet_record.get('drop_reason'): + logger.debug(f"Skipping LetsMesh publish for packet with drop_reason: {packet_record.get('drop_reason')}") + else: + self._publish_to_letsmesh(packet_record) def _publish_to_letsmesh(self, packet_record: dict): """Publish packet to LetsMesh broker if enabled and allowed""" diff --git a/repeater/engine.py b/repeater/engine.py index 472b6aa..c612f65 100644 --- a/repeater/engine.py +++ b/repeater/engine.py @@ -98,8 +98,6 @@ class RepeaterHandler(BaseHandler): self._transport_keys_cache_time = 0 self._transport_keys_cache_ttl = 60 # Cache for 60 seconds - self._last_drop_reason = None - self._start_background_tasks() async def __call__(self, packet: Packet, metadata: Optional[dict] = None, local_transmission: bool = False) -> None: @@ -109,9 +107,6 @@ class RepeaterHandler(BaseHandler): self.rx_count += 1 - # Reset drop reason for this packet processing - self._last_drop_reason = None - # Check if we're in monitor mode (receive only, no forwarding) mode = self.config.get("repeater", {}).get("mode", "forward") monitor_mode = mode == "monitor" @@ -177,8 +172,9 @@ class RepeaterHandler(BaseHandler): if monitor_mode: drop_reason = "Monitor mode" else: - drop_reason = self._last_drop_reason or self._get_drop_reason(packet) - logger.debug(f"Packet not forwarded: {drop_reason}") + # Check if packet has a specific drop reason set by handlers + drop_reason = packet.drop_reason or self._get_drop_reason(packet) + logger.debug(f"Packet not forwarded: {drop_reason}") # Extract packet type and route from header if not hasattr(packet, "header") or packet.header is None: @@ -264,9 +260,11 @@ class RepeaterHandler(BaseHandler): } # Store packet record to persistent storage + # Skip LetsMesh if packet has a drop_reason (invalid/bad packet) if self.storage: try: - self.storage.record_packet(packet_record) + skip_letsmesh = bool(drop_reason) + self.storage.record_packet(packet_record, skip_letsmesh_if_invalid=skip_letsmesh) except Exception as e: logger.error(f"Failed to store packet record: {e}") @@ -458,12 +456,14 @@ class RepeaterHandler(BaseHandler): # Validate valid, reason = self.validate_packet(packet) if not valid: - self._last_drop_reason = reason + packet.drop_reason = reason return None # Check if packet is marked do-not-retransmit if packet.is_marked_do_not_retransmit(): - self._last_drop_reason = "Marked do not retransmit" + # Check if packet has custom drop reason + if not packet.drop_reason: + packet.drop_reason = "Marked do not retransmit" return None # Check global flood policy @@ -474,15 +474,15 @@ class RepeaterHandler(BaseHandler): allowed, check_reason = self._check_transport_codes(packet) if not allowed: - self._last_drop_reason = check_reason + packet.drop_reason = check_reason return None else: - self._last_drop_reason = "Global flood policy disabled" + packet.drop_reason = "Global flood policy disabled" return None # Suppress duplicates if self.is_duplicate(packet): - self._last_drop_reason = "Duplicate" + packet.drop_reason = "Duplicate" return None if packet.path is None: @@ -501,12 +501,12 @@ class RepeaterHandler(BaseHandler): # Check if we're the next hop if not packet.path or len(packet.path) == 0: - self._last_drop_reason = "Direct: no path" + packet.drop_reason = "Direct: no path" return None next_hop = packet.path[0] if next_hop != self.local_hash: - self._last_drop_reason = "Direct: not for us" + packet.drop_reason = "Direct: not for us" return None original_path = list(packet.path) @@ -608,7 +608,7 @@ class RepeaterHandler(BaseHandler): return fwd_pkt, delay else: - self._last_drop_reason = f"Unknown route type: {route_type}" + packet.drop_reason = f"Unknown route type: {route_type}" return None async def schedule_retransmit(self, fwd_pkt: Packet, delay: float, airtime_ms: float = 0.0): diff --git a/repeater/handler_helpers/advert.py b/repeater/handler_helpers/advert.py index 0c64f2e..b676a9f 100644 --- a/repeater/handler_helpers/advert.py +++ b/repeater/handler_helpers/advert.py @@ -54,9 +54,10 @@ class AdvertHelper: advert_data = await self.advert_handler(packet) if not advert_data or not advert_data.get("valid"): - logger.debug("Invalid advert packet") + logger.warning(f"Invalid advert packet from {packet.from_id}") packet.mark_do_not_retransmit() - + packet.drop_reason = "Invalid advert packet" + return # Extract data from parsed advert pubkey = advert_data["public_key"]