diff --git a/main.py b/main.py index 5a08d42..e4b77a0 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,4 @@ +import json import logging import meshtastic import meshtastic.serial_interface @@ -11,16 +12,62 @@ from plugins import plugins from pubsub import pub import yaml from yaml.loader import SafeLoader +import paho.mqtt.client as mqtt + +logging.basicConfig() logger = logging.getLogger(name="meshtastic.bridge") logger.setLevel(logging.DEBUG) + +def onReceive(packet, interface): # called when a packet arrives + + for pipeline, pipeline_plugins in bridge_config["pipelines"].items(): + logger.debug(f"Pipeline {pipeline} initiated") + + p = plugins['packet_filter'] + pipeline_packet = p.do_action(packet) + + for plugin in pipeline_plugins: + for plugin_key, plugin_config in plugin.items(): + + logger.debug(f"Processing plugin: {pipeline}/{plugin_key}") + if not pipeline_packet: + logger.debug("Skipping since the packet is null") + continue + + if plugin_key not in plugins: + logger.error(f"No such plugin: {plugin_key}. Skipping") + continue + + p = plugins[plugin_key] + p.configure(devices, mqtt_servers, plugin_config) + + pipeline_packet = p.do_action(pipeline_packet) + + logger.debug(f"Pipeline {pipeline} completed") + +def onConnection( + interface, topic=pub.AUTO_TOPIC +): # called when we (re)connect to the radio + nodeInfo = interface.getMyNodeInfo() + logger.info( + f"Connected to node: userId={nodeInfo['user']['id']} hwModel={nodeInfo['user']['hwModel']}" + ) + +pub.subscribe(onReceive, "meshtastic.receive") +pub.subscribe(onConnection, "meshtastic.connection.established") + with open("config.yaml") as f: bridge_config = yaml.load(f, Loader=SafeLoader) devices = {} +mqtt_servers = {} for device in bridge_config["devices"]: + if "active" in device and not device['active']: + continue + if "serial" in device: devices[device["name"]] = meshtastic.serial_interface.SerialInterface( devPath=device["serial"] @@ -32,42 +79,91 @@ for device in bridge_config["devices"]: else: devices[device["name"]] = meshtastic.serial_interface.SerialInterface() +for config in bridge_config['mqtt_servers']: + required_options = [ + 'name', + 'server', + 'port', + ] -def onReceive(packet, interface): # called when a packet arrives - for pipeline in bridge_config["pipelines"]: + for option in required_options: + if option not in config: + logger.warning("Missing config: {option}") - pipeline_packet = packet + client_id = config['client_id'] if 'client_id' in config else None + username = config['username'] if 'username' in config else None + password = config['password'] if 'password' in config else None - for key, config in pipeline.items(): + if client_id: + mqttc = mqtt.Client(client_id) + else: + mqttc = mqtt.Client() - if not pipeline_packet: + if username and password: + mqttc.username_pw_set(username, password) + + mqtt_servers[config['name']] = mqttc + + def on_connect(mqttc, obj, flags, rc): + logger.debug(f"Connected to MQTT server: {config['name']}") + def on_message(mqttc, obj, msg): + packet = msg.payload.decode() + + logger.debug(f"MQTT {config['name']}: on_message") + + if 'pipelines' not in config: + logger.warning(f"MQTT ({config['name']}): no pipeline") + return + + for pipeline, pipeline_plugins in config['pipelines'].items(): + + logger.debug(f"MQTT {config['name']} pipeline {pipeline} started") + if not packet: continue - if key not in plugins: - logger.error(f"No such plugin: {key}. Skipping") - continue + for plugin in pipeline_plugins: + for plugin_key, plugin_config in plugin.items(): + if plugin_key not in plugins: + logger.error(f"No such plugin: {plugin_key}. Skipping") + continue - p = plugins[key] - p.configure(devices, config) + p = plugins[plugin_key] + p.configure(devices, mqtt_servers, plugin_config) - pipeline_packet = p.do_action(pipeline_packet) + try: + packet = p.do_action(packet) + except Exception as e: + logger.error(f"Hit an error: {e}", exc_info=True) + logger.debug(f"MQTT {config['name']} pipeline {pipeline} finished") + def on_publish(mqttc, obj, mid): + logger.debug(f"MQTT ({config['name']}) on_publish: {mid}") + def on_subscribe(mqttc, obj, mid, granted_qos): + logger.debug(f"MQTT ({config['name']}) on_subscribe: {mid}") -def onConnection( - interface, topic=pub.AUTO_TOPIC -): # called when we (re)connect to the radio - nodeInfo = interface.getMyNodeInfo() + mqttc.on_message = on_message + mqttc.on_connect = on_connect + mqttc.on_publish = on_publish + mqttc.on_subscribe = on_subscribe - logger.info( - f"Connected to node: userId={nodeInfo['user']['id']} hwModel={nodeInfo['user']['hwModel']}" - ) + import ssl + if 'insecure' in config and config['insecure']: + mqttc.tls_set(cert_reqs=ssl.CERT_NONE) + mqttc.tls_insecure_set(True) -pub.subscribe(onReceive, "meshtastic.receive") -pub.subscribe(onConnection, "meshtastic.connection.established") + mqttc.connect(config['server'], config['port'], 60) + + if 'topic' in config: + mqttc.subscribe(config['topic'], 0) + + mqttc.loop_start() while True: time.sleep(1000) for device, instance in devices.items(): instance.close() + +for server, instance in mqtt_servers.items(): + instance.disconnect() diff --git a/plugins.py b/plugins.py index 65e96eb..b726dde 100644 --- a/plugins.py +++ b/plugins.py @@ -1,18 +1,22 @@ from haversine import haversine from meshtastic import mesh_pb2 from meshtastic.__init__ import BROADCAST_ADDR +import base64 +import json import logging import os + plugins = {} class Plugin: - def configure(self, devices, config): + def configure(self, devices, mqtt_servers, config): self.config = config self.devices = devices + self.mqtt_servers = mqtt_servers - if "log_level" in config: + if config and "log_level" in config: if config["log_level"] == "debug": self.logger.setLevel(logging.DEBUG) elif config["log_level"] == "info": @@ -22,13 +26,35 @@ class Plugin: pass +class PacketFilter(Plugin): + logger = logging.getLogger(name="meshtastic.bridge.filter.packet") + + def strip_raw(self, dict_obj): + if type(dict_obj) is not dict: + return dict_obj + + if 'raw' in dict_obj: + del dict_obj['raw'] + + for k, v in dict_obj.items(): + dict_obj[k] = self.strip_raw(v) + + return dict_obj + + def do_action(self, packet): + packet = self.strip_raw(packet) + + if 'decoded' in packet and 'payload' in packet['decoded']: + packet['decoded']['payload'] = base64.b64encode(packet['decoded']['payload']).decode('utf-8') + + return packet + +plugins['packet_filter'] = PacketFilter() + class DebugFilter(Plugin): logger = logging.getLogger(name="meshtastic.bridge.plugin.logging") def do_action(self, packet): - self.logger.info( - f"{packet['id']} | {packet['fromId']}=>{packet['toId']} | {packet['decoded']['portnum']}" - ) self.logger.debug(packet) return packet @@ -138,10 +164,13 @@ class WebhookPlugin(Plugin): import json import requests + position = packet["decoded"]["position"] if "position" in packet["decoded"] else None + text = packet["decoded"]["text"] if "text" in packet["decoded"] else None + macros = { - "{LAT}": packet["decoded"]["position"]["latitude"], - "{LNG}": packet["decoded"]["position"]["longitude"], - "{MSG}": self.config["message"] if "message" in self.config else "", + "{LAT}": position["latitude"] if position else None, + "{LNG}": position["longitude"] if position else None, + "{MSG}": self.config["message"] if "message" in self.config else text, "{FID}": packet["fromId"], "{TID}": packet["toId"], } @@ -176,6 +205,92 @@ class WebhookPlugin(Plugin): plugins["webhook"] = WebhookPlugin() +class MQTTPlugin(Plugin): + logger = logging.getLogger(name="meshtastic.bridge.plugin.mqtt") + + def do_action(self, packet): + required_options = ['name', 'topic'] + + for option in required_options: + if option not in self.config: + self.logger.warning(f"Missing config: {option}") + return packet + + if self.config['name'] not in self.mqtt_servers: + self.logger.warning(f"No server established: {self.config['name']}") + return packet + + mqtt_server = self.mqtt_servers[self.config['name']] + + packet_payload = packet if type(packet) is str else json.dumps(packet) + + message = self.config['message'] if 'message' in self.config else packet_payload + + info = mqtt_server.publish(self.config['topic'], message) + info.wait_for_publish() + + self.logger.debug("Message sent") + +plugins['mqtt_plugin'] = MQTTPlugin() + + +class EncryptFilter(Plugin): + logger = logging.getLogger(name="meshtastic.bridge.filter.encrypt") + + def do_action(self, packet): + + if 'key' not in self.config: + return None + + from jwcrypto import jwk, jwe + from jwcrypto.common import json_encode, json_decode + + with open(self.config['key'], "rb") as pemfile: + encrypt_key = jwk.JWK.from_pem(pemfile.read()) + + public_key = jwk.JWK() + public_key.import_key(**json_decode(encrypt_key.export_public())) + protected_header = { + "alg": "RSA-OAEP-256", + "enc": "A256CBC-HS512", + "typ": "JWE", + "kid": public_key.thumbprint(), + } + + message = json.dumps(packet) + + jwetoken = jwe.JWE(message.encode('utf-8'), + recipient=public_key, + protected=protected_header) + + self.logger.debug(f"Encrypted message: {packet['id']}") + return jwetoken.serialize() + +plugins['encrypt_filter'] = EncryptFilter() + + +class DecryptFilter(Plugin): + logger = logging.getLogger(name="meshtastic.bridge.filter.decrypt") + + def do_action(self, packet): + if 'key' not in self.config: + return packet + + from jwcrypto import jwk, jwe + + with open(self.config['key'], "rb") as pemfile: + private_key = jwk.JWK.from_pem(pemfile.read()) + + jwetoken = jwe.JWE() + jwetoken.deserialize(packet, key=private_key) + payload = jwetoken.payload + packet = json.loads(payload) + self.logger.debug(f"Decrypted message: {packet['id']}") + return packet + +plugins['decrypt_filter'] = DecryptFilter() + + class SendPlugin(Plugin): logger = logging.getLogger(name="meshtastic.bridge.plugin.send") @@ -185,7 +300,7 @@ class SendPlugin(Plugin): self.logger.error(f"Missing interface for device {self.config['device']}") return packet - if "to" not in packet: + if "to" not in packet and "toId" not in packet: self.logger.debug("Not a message") return packet @@ -196,10 +311,12 @@ class SendPlugin(Plugin): ): destinationId = self.config["node_mapping"][packet["to"]] else: - destinationId = packet["to"] + destinationId = packet["to"] if "to" in packet else packet["toId"] if "to" in self.config: destinationId = self.config["to"] + elif "toId" in self.config: + destinationId = self.config["toId"] device_name = self.config["device"] device = self.devices[device_name] @@ -227,14 +344,11 @@ class SendPlugin(Plugin): else: meshPacket = mesh_pb2.MeshPacket() meshPacket.channel = 0 - meshPacket.decoded.payload = packet["decoded"]["payload"] + meshPacket.decoded.payload = base64.b64decode(packet["decoded"]["payload"]) meshPacket.decoded.portnum = packet["decoded"]["portnum"] meshPacket.decoded.want_response = False meshPacket.id = device._generatePacketId() - self.logger.debug( - f"Sending packet {meshPacket.id} to {self.config['device']}" - ) device._sendPacket(meshPacket=meshPacket, destinationId=destinationId) return packet diff --git a/requirements.txt b/requirements.txt index 7a6cf8a..6f786ac 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,5 @@ haversine meshtastic requests pyyaml +paho-mqtt +jwcrypto