feat: Bring back disallowed types

This commit is contained in:
Rigear
2026-04-11 20:46:42 -07:00
parent ba942ca1b7
commit 7256807fdd
2 changed files with 24 additions and 41 deletions

View File

@@ -37,31 +37,6 @@ def b64url(x: bytes) -> str:
return base64.urlsafe_b64encode(x).rstrip(b"=").decode()
# # --------------------------------------------------------------------
# # Let's Mesh MQTT Broker List (WebSocket Secure)
# # --------------------------------------------------------------------
# LETSMESH_BROKERS = [
# {
# "name": "Europe (LetsMesh v1)",
# "host": "mqtt-eu-v1.letsmesh.net",
# "port": 443,
# "audience": "mqtt-eu-v1.letsmesh.net",
# "use_jwt_auth": True,
# "transport": "websockets",
# "enabled": True,
# },
# {
# "name": "US West (LetsMesh v1)",
# "host": "mqtt-us-v1.letsmesh.net",
# "port": 443,
# "audience": "mqtt-us-v1.letsmesh.net",
# "use_jwt_auth": True,
# "transport": "websockets",
# "enabled": True,
# },
# ]
# ====================================================================
# Single Broker Connection Manager
# ====================================================================
@@ -81,6 +56,7 @@ class _BrokerConnection:
use_tls: bool,
email: str,
owner: str,
broker_index: int,
on_connect_callback: Optional[Callable] = None,
on_disconnect_callback: Optional[Callable] = None
):
@@ -92,6 +68,7 @@ class _BrokerConnection:
self.use_tls = use_tls
self.email = email
self.owner = owner
self.broker_index = broker_index
self._on_connect_callback = on_connect_callback
self._on_disconnect_callback = on_disconnect_callback
self._connect_time = None
@@ -101,7 +78,7 @@ class _BrokerConnection:
self._reconnect_timer = None
self._max_reconnect_delay = 300 # 5 minutes max
self._jwt_refresh_timer = None
self.transport= broker.get('transport', 'websockets')
self.transport = broker.get('transport', 'websockets')
client_id = f"meshcore_{self.public_key}_{broker['host']}"
self.client = mqtt.Client(client_id=client_id, transport=self.transport)
self.client.on_connect = self._on_connect
@@ -115,8 +92,8 @@ class _BrokerConnection:
disallowed_types = broker.get("disallowed_packet_types", [])
type_name_map = {name: code for code, name in PAYLOAD_TYPES.items()}
self.disallowed_hex = [type_name_map.get(name.upper(), None) for name in disallowed_types]
self.disallowed_hex = [val for val in self.disallowed_hex if val is not None] # Filter out invalid names
self.disallowed_types = [type_name_map.get(name.upper(), None) for name in disallowed_types]
self.disallowed_types = [val for val in self.disallowed_types if val is not None] # Filter out invalid names
def _generate_jwt(self) -> str:
@@ -303,11 +280,14 @@ class _BrokerConnection:
self.client.disconnect()
logger.info(f"Disconnected from {self.broker['name']}")
def publish(self, topic: str, payload: str, retain: bool = False):
def publish(self, topic: str, payload: str, retain: bool = False, qos: int = 0):
"""Publish message to broker"""
logger.debug(f"Publishing to topic '{topic}' with payload: {payload}: self._running={self._running}")
if self._running:
result = self.client.publish(topic, payload, retain=retain)
result = self.client.publish(topic, payload, retain=retain, qos=qos)
return result
else:
logger.warning(f"Cannot publish to {self.broker['name']} - not connected")
return None
def is_connected(self) -> bool:
@@ -386,7 +366,6 @@ class MeshCoreToMqttPusher:
node_info = get_node_info(config)
iata_code = node_info["iata_code"]
broker_index = node_info.get("broker_index")
self.email = node_info.get("email", "")
self.owner = node_info.get("owner", "")
status_interval = node_info["status_interval"]
@@ -398,6 +377,7 @@ class MeshCoreToMqttPusher:
brokers = mqtt_config.get("brokers", [])
# Add additional brokers from config
self.brokers = []
if brokers:
for broker_config in brokers:
if all(k in broker_config for k in ["name", "host", "port", "enabled"]):
@@ -546,16 +526,12 @@ class MeshCoreToMqttPusher:
def _topic(self, subtopic: str) -> str:
return f"meshcore/{self.iata_code}/{self.public_key}/{subtopic}"
def publish_packet(self, pkt: dict, packet_type: string, subtopic="packets", retain=False):
if packet_type in self.disallowed_packet_types:
logger.debug(f"Skipped publishing packet type 0x{packet_type:02X} (disallowed)")
return
def publish_packet(self, pkt: dict, subtopic="packets", retain=False):
return self.publish(subtopic, self._process_packet(pkt), retain)
def publish_raw_data(self, raw_hex: str, subtopic="raw", retain=False):
pkt = {"type": "raw", "data": raw_hex, "bytes": len(raw_hex) // 2}
return self.publish_packet(pkt, "raw", subtopic, retain)
return self.publish_packet(pkt, subtopic, retain)
def publish_status(
self,
@@ -598,16 +574,23 @@ class MeshCoreToMqttPusher:
return self.publish("status", status, retain=True, qos=1)
def publish(self, subtopic: str, payload: dict, retain: bool = False):
def publish(self, subtopic: str, payload: dict, retain: bool = False, qos: int = 0):
"""Publish message to all connected brokers"""
topic = self._topic(subtopic)
message = json.dumps(payload)
logger.debug(f"Publishing to topic '{topic}' with payload: {message}")
packet_type = payload.get("type")
results = []
with self._lock:
for conn in self.connections:
if conn.is_connected():
result = conn.publish(topic, message, retain=retain)
if packet_type in conn.disallowed_types:
logger.debug(f"Skipped publishing packet type 0x{packet_type:02X} (disallowed)")
return
result = conn.publish(topic, message, retain=retain, qos=qos)
results.append((conn.broker["name"], result))
logger.debug(f"Published to {conn.broker['name']}/{topic}")

View File

@@ -198,8 +198,8 @@ class StorageCollector:
packet_record, origin=node_name, origin_id=self.mqtt_handler.public_key
)
if packet:
self.mqtt_handler.publish_packet(packet.to_dict(), packet_type)
if packet:
self.mqtt_handler.publish_packet(packet.to_dict())
logger.debug(f"Published packet type 0x{packet_type:02X} to LetsMesh")
else:
logger.debug("Skipped LetsMesh publish: packet missing raw_packet data")