from haversine import haversine from meshtastic import mesh_pb2 from random import randrange import base64 import json import logging import os import re plugins = {} class Plugin: def configure(self, devices, mqtt_servers, config): self.config = config self.devices = devices self.mqtt_servers = mqtt_servers if config and "log_level" in config: if config["log_level"] == "debug": self.logger.setLevel(logging.DEBUG) elif config["log_level"] == "info": self.logger.setLevel(logging.INFO) def do_action(self, packet): pass class PacketFilter(Plugin): logger = logging.getLogger(name="meshtastic.bridge.filter.packet") def strip_raw(self, dict_obj): if type(dict_obj) is not dict: return dict_obj if "raw" in dict_obj: del dict_obj["raw"] for k, v in dict_obj.items(): dict_obj[k] = self.strip_raw(v) return dict_obj def do_action(self, packet): packet = self.strip_raw(packet) if "decoded" in packet and "payload" in packet["decoded"]: packet["decoded"]["payload"] = base64.b64encode( packet["decoded"]["payload"] ).decode("utf-8") return packet plugins["packet_filter"] = PacketFilter() class DebugFilter(Plugin): logger = logging.getLogger(name="meshtastic.bridge.plugin.logging") def do_action(self, packet): self.logger.debug(packet) return packet plugins["debugger"] = DebugFilter() class MessageFilter(Plugin): logger = logging.getLogger(name="meshtastic.bridge.filter.message") def do_action(self, packet): if not packet: self.logger.error("Missing packet") return packet text = packet["decoded"]["text"] if "text" in packet["decoded"] else None if text and "message" in self.config: if "allow" in self.config["message"]: matches = False for allow_regex in self.config["message"]["allow"]: if not matches and re.search(allow_regex, text): matches = True if not matches: self.logger.debug( f"Dropped because it doesn't match message allow filter" ) return None if text and "disallow" in self.config["message"]: matches = False for disallow_regex in self.config["message"]["disallow"]: if not matches and re.search(disallow_regex, text): matches = True if matches: self.logger.debug(f"Dropped because it matches message disallow filter") return None filters = { "app": packet["decoded"]["portnum"], "from": packet["fromId"], "to": packet["toId"], } for filter_key, value in filters.items(): if filter_key in self.config: filter_val = self.config[filter_key] if ( "allow" in filter_val and filter_val["allow"] and value not in filter_val["allow"] ): self.logger.debug( f"Dropped because it doesn't match {filter_key} allow filter" ) return None if ( "disallow" in filter_val and filter_val["disallow"] and value in filter_val["disallow"] ): self.logger.debug( f"Dropped because it matches {filter_key} disallow filter" ) return None self.logger.debug(f"Accepted") return packet plugins["message_filter"] = MessageFilter() class LocationFilter(Plugin): logger = logging.getLogger(name="meshtastic.bridge.filter.distance") def do_action(self, packet): message_source_position = None current_local_position = None if "device" in self.config and self.config["device"] in self.devices: nodeInfo = self.devices[self.config["device"]].getMyNodeInfo() current_local_position = ( nodeInfo["position"]["latitude"], nodeInfo["position"]["longitude"], ) if ( "decoded" in packet and "position" in packet["decoded"] and "latitude" in packet["decoded"]["position"] and "longitude" in packet["decoded"]["position"] ): message_source_position = ( packet["decoded"]["position"]["latitude"], packet["decoded"]["position"]["longitude"], ) if "compare_latitude" in self.config and "compare_longitude" in self.config: current_local_position = ( self.config["compare_latitude"], self.config["compare_longitude"], ) if message_source_position and current_local_position: distance_km = haversine(message_source_position, current_local_position) comparison = ( self.config["comparison"] if "comparison" in self.config else "within" ) # message originates from too far a distance if "max_distance_km" in self.config and self.config["max_distance_km"] > 0: acceptable_distance = self.config["max_distance_km"] if comparison == "within" and distance_km > acceptable_distance: self.logger.debug( f"Packet from too far: {distance_km} > {acceptable_distance}" ) return None elif comparison == "outside" and distance_km < acceptable_distance: self.logger.debug( f"Packet too close: {distance_km} < {acceptable_distance}" ) return None if "latitude" in self.config: packet["decoded"]["position"]["latitude"] = self.config["latitude"] if "longitude" in self.config: packet["decoded"]["position"]["longitude"] = self.config["longitude"] return packet plugins["location_filter"] = LocationFilter() class WebhookPlugin(Plugin): logger = logging.getLogger(name="meshtastic.bridge.plugin.webhook") def do_action(self, packet): if type(packet) is not dict: try: packet = json.loads(packet) except: self.logger.warning("Packet is not dict") return packet if "active" in self.config and not self.config["active"]: return packet if "body" not in self.config: self.logger.warning("Missing config: body") return packet import requests position = ( packet["decoded"]["position"] if "position" in packet["decoded"] else None ) text = packet["decoded"]["text"] if "text" in packet["decoded"] else None macros = { "{LAT}": position["latitude"] if position else None, "{LNG}": position["longitude"] if position else None, "{MSG}": self.config["message"] if "message" in self.config else text, "{FID}": packet["fromId"], "{TID}": packet["toId"], } body = self.config["body"] for macro, value in macros.items(): body = body.replace(macro, str(value)) payload = json.loads(body) self.logger.debug(f"Sending http POST request to {self.config['url']}") # pass secrets from environment variables to request headers headers = self.config["headers"] if "headers" in self.config else {} for k, v in headers.items(): for ek, ev in os.environ.items(): needle = "{" + ek + "}" if needle in v: v = v.replace(needle, ev) headers[k] = v response = requests.post(self.config["url"], headers=headers, json=payload) if not response.ok: self.logger.warning(f"Error returned: {response.status_code}") return packet plugins["webhook"] = WebhookPlugin() class MQTTPlugin(Plugin): logger = logging.getLogger(name="meshtastic.bridge.plugin.mqtt") def do_action(self, packet): required_options = ["name", "topic"] for option in required_options: if option not in self.config: self.logger.warning(f"Missing config: {option}") return packet if self.config["name"] not in self.mqtt_servers: self.logger.warning(f"No server established: {self.config['name']}") return packet mqtt_server = self.mqtt_servers[self.config["name"]] packet_payload = packet if type(packet) is str else json.dumps(packet) message = self.config["message"] if "message" in self.config else packet_payload info = mqtt_server.publish(self.config["topic"], message) info.wait_for_publish() self.logger.debug("Message sent") plugins["mqtt_plugin"] = MQTTPlugin() class EncryptFilter(Plugin): logger = logging.getLogger(name="meshtastic.bridge.filter.encrypt") def do_action(self, packet): if "key" not in self.config: return None from jwcrypto import jwk, jwe from jwcrypto.common import json_encode, json_decode with open(self.config["key"], "rb") as pemfile: encrypt_key = jwk.JWK.from_pem(pemfile.read()) public_key = jwk.JWK() public_key.import_key(**json_decode(encrypt_key.export_public())) protected_header = { "alg": "RSA-OAEP-256", "enc": "A256CBC-HS512", "typ": "JWE", "kid": public_key.thumbprint(), } message = json.dumps(packet) jwetoken = jwe.JWE( message.encode("utf-8"), recipient=public_key, protected=protected_header ) self.logger.debug(f"Encrypted message: {packet['id']}") return jwetoken.serialize() plugins["encrypt_filter"] = EncryptFilter() class DecryptFilter(Plugin): logger = logging.getLogger(name="meshtastic.bridge.filter.decrypt") def do_action(self, packet): if "key" not in self.config: return packet if type(packet) is not str: self.logger.warning(f"Packet is not string") return packet from jwcrypto import jwk, jwe with open(self.config["key"], "rb") as pemfile: private_key = jwk.JWK.from_pem(pemfile.read()) jwetoken = jwe.JWE() jwetoken.deserialize(packet, key=private_key) payload = jwetoken.payload packet = json.loads(payload) self.logger.debug(f"Decrypted message: {packet['id']}") return packet plugins["decrypt_filter"] = DecryptFilter() class RadioMessagePlugin(Plugin): logger = logging.getLogger(name="meshtastic.bridge.plugin.send") def do_action(self, packet): if type(packet) is not dict: try: packet = json.loads(packet) except: self.logger.error("Packet is not a dict") return packet if self.config["device"] not in self.devices: self.logger.error(f"Missing interface for device {self.config['device']}") return packet if "to" not in packet and "toId" not in packet: self.logger.debug("Not a message") return packet # Broadcast messages or specific if ( "node_mapping" in self.config and packet["to"] in self.config["node_mapping"] ): destinationId = self.config["node_mapping"][packet["to"]] else: destinationId = packet["to"] if "to" in packet else packet["toId"] if "to" in self.config: destinationId = self.config["to"] elif "toId" in self.config: destinationId = self.config["toId"] device_name = self.config["device"] if device_name not in self.devices: self.logger.warning(f"No such radio device: {device_name}") return packet device = self.devices[device_name] self.logger.debug(f"Sending packet to Radio {device_name}") if "message" in self.config and self.config["message"]: device.sendText(text=self.config["message"], destinationId=destinationId) elif ( "lat" in self.config and self.config["lat"] > 0 and "lng" in self.config and self.config["lng"] > 0 ): lat = self.config["lat"] lng = self.config["lng"] altitude = self.config["alt"] if "alt" in self.config else 0 device.sendPosition( latitude=lat, longitude=lng, altitude=altitude, destinationId=destinationId, ) else: meshPacket = mesh_pb2.MeshPacket() meshPacket.channel = 0 meshPacket.decoded.payload = base64.b64decode(packet["decoded"]["payload"]) meshPacket.decoded.portnum = packet["decoded"]["portnum"] meshPacket.decoded.want_response = False meshPacket.id = device._generatePacketId() device._sendPacket(meshPacket=meshPacket, destinationId=destinationId) return packet plugins["radio_message_plugin"] = RadioMessagePlugin()