From 7256807fdd142ec8ec51b0bb71ff6df3d808db93 Mon Sep 17 00:00:00 2001 From: Rigear <278971+Rigear@users.noreply.github.com> Date: Sat, 11 Apr 2026 20:46:42 -0700 Subject: [PATCH] feat: Bring back disallowed types --- repeater/data_acquisition/mqtt_handler.py | 61 +++++++------------ .../data_acquisition/storage_collector.py | 4 +- 2 files changed, 24 insertions(+), 41 deletions(-) diff --git a/repeater/data_acquisition/mqtt_handler.py b/repeater/data_acquisition/mqtt_handler.py index 6e874b0..7b8c125 100644 --- a/repeater/data_acquisition/mqtt_handler.py +++ b/repeater/data_acquisition/mqtt_handler.py @@ -37,31 +37,6 @@ def b64url(x: bytes) -> str: return base64.urlsafe_b64encode(x).rstrip(b"=").decode() -# # -------------------------------------------------------------------- -# # Let's Mesh MQTT Broker List (WebSocket Secure) -# # -------------------------------------------------------------------- -# LETSMESH_BROKERS = [ -# { -# "name": "Europe (LetsMesh v1)", -# "host": "mqtt-eu-v1.letsmesh.net", -# "port": 443, -# "audience": "mqtt-eu-v1.letsmesh.net", -# "use_jwt_auth": True, -# "transport": "websockets", -# "enabled": True, -# }, -# { -# "name": "US West (LetsMesh v1)", -# "host": "mqtt-us-v1.letsmesh.net", -# "port": 443, -# "audience": "mqtt-us-v1.letsmesh.net", -# "use_jwt_auth": True, -# "transport": "websockets", -# "enabled": True, -# }, -# ] - - # ==================================================================== # Single Broker Connection Manager # ==================================================================== @@ -81,6 +56,7 @@ class _BrokerConnection: use_tls: bool, email: str, owner: str, + broker_index: int, on_connect_callback: Optional[Callable] = None, on_disconnect_callback: Optional[Callable] = None ): @@ -92,6 +68,7 @@ class _BrokerConnection: self.use_tls = use_tls self.email = email self.owner = owner + self.broker_index = broker_index self._on_connect_callback = on_connect_callback self._on_disconnect_callback = on_disconnect_callback self._connect_time = None @@ -101,7 +78,7 @@ class _BrokerConnection: self._reconnect_timer = None self._max_reconnect_delay = 300 # 5 minutes max self._jwt_refresh_timer = None - self.transport= broker.get('transport', 'websockets') + self.transport = broker.get('transport', 'websockets') client_id = f"meshcore_{self.public_key}_{broker['host']}" self.client = mqtt.Client(client_id=client_id, transport=self.transport) self.client.on_connect = self._on_connect @@ -115,8 +92,8 @@ class _BrokerConnection: disallowed_types = broker.get("disallowed_packet_types", []) type_name_map = {name: code for code, name in PAYLOAD_TYPES.items()} - self.disallowed_hex = [type_name_map.get(name.upper(), None) for name in disallowed_types] - self.disallowed_hex = [val for val in self.disallowed_hex if val is not None] # Filter out invalid names + self.disallowed_types = [type_name_map.get(name.upper(), None) for name in disallowed_types] + self.disallowed_types = [val for val in self.disallowed_types if val is not None] # Filter out invalid names def _generate_jwt(self) -> str: @@ -303,11 +280,14 @@ class _BrokerConnection: self.client.disconnect() logger.info(f"Disconnected from {self.broker['name']}") - def publish(self, topic: str, payload: str, retain: bool = False): + def publish(self, topic: str, payload: str, retain: bool = False, qos: int = 0): """Publish message to broker""" + logger.debug(f"Publishing to topic '{topic}' with payload: {payload}: self._running={self._running}") if self._running: - result = self.client.publish(topic, payload, retain=retain) + result = self.client.publish(topic, payload, retain=retain, qos=qos) return result + else: + logger.warning(f"Cannot publish to {self.broker['name']} - not connected") return None def is_connected(self) -> bool: @@ -386,7 +366,6 @@ class MeshCoreToMqttPusher: node_info = get_node_info(config) iata_code = node_info["iata_code"] - broker_index = node_info.get("broker_index") self.email = node_info.get("email", "") self.owner = node_info.get("owner", "") status_interval = node_info["status_interval"] @@ -398,6 +377,7 @@ class MeshCoreToMqttPusher: brokers = mqtt_config.get("brokers", []) # Add additional brokers from config + self.brokers = [] if brokers: for broker_config in brokers: if all(k in broker_config for k in ["name", "host", "port", "enabled"]): @@ -546,16 +526,12 @@ class MeshCoreToMqttPusher: def _topic(self, subtopic: str) -> str: return f"meshcore/{self.iata_code}/{self.public_key}/{subtopic}" - def publish_packet(self, pkt: dict, packet_type: string, subtopic="packets", retain=False): - if packet_type in self.disallowed_packet_types: - logger.debug(f"Skipped publishing packet type 0x{packet_type:02X} (disallowed)") - return - + def publish_packet(self, pkt: dict, subtopic="packets", retain=False): return self.publish(subtopic, self._process_packet(pkt), retain) def publish_raw_data(self, raw_hex: str, subtopic="raw", retain=False): pkt = {"type": "raw", "data": raw_hex, "bytes": len(raw_hex) // 2} - return self.publish_packet(pkt, "raw", subtopic, retain) + return self.publish_packet(pkt, subtopic, retain) def publish_status( self, @@ -598,16 +574,23 @@ class MeshCoreToMqttPusher: return self.publish("status", status, retain=True, qos=1) - def publish(self, subtopic: str, payload: dict, retain: bool = False): + def publish(self, subtopic: str, payload: dict, retain: bool = False, qos: int = 0): """Publish message to all connected brokers""" topic = self._topic(subtopic) message = json.dumps(payload) + logger.debug(f"Publishing to topic '{topic}' with payload: {message}") + + packet_type = payload.get("type") + results = [] with self._lock: for conn in self.connections: if conn.is_connected(): - result = conn.publish(topic, message, retain=retain) + if packet_type in conn.disallowed_types: + logger.debug(f"Skipped publishing packet type 0x{packet_type:02X} (disallowed)") + return + result = conn.publish(topic, message, retain=retain, qos=qos) results.append((conn.broker["name"], result)) logger.debug(f"Published to {conn.broker['name']}/{topic}") diff --git a/repeater/data_acquisition/storage_collector.py b/repeater/data_acquisition/storage_collector.py index cddf7c8..e835005 100644 --- a/repeater/data_acquisition/storage_collector.py +++ b/repeater/data_acquisition/storage_collector.py @@ -198,8 +198,8 @@ class StorageCollector: packet_record, origin=node_name, origin_id=self.mqtt_handler.public_key ) - if packet: - self.mqtt_handler.publish_packet(packet.to_dict(), packet_type) + if packet: + self.mqtt_handler.publish_packet(packet.to_dict()) logger.debug(f"Published packet type 0x{packet_type:02X} to LetsMesh") else: logger.debug("Skipped LetsMesh publish: packet missing raw_packet data")