Files
meshtastic-bridge/plugins.py
2022-12-04 14:25:38 -05:00

454 lines
14 KiB
Python

from haversine import haversine
from meshtastic import mesh_pb2
from random import randrange
import base64
import json
import logging
import os
import re
plugins = {}
class Plugin(object):
def __init__(self) -> None:
self.logger.setLevel(logging.INFO)
def configure(self, devices, mqtt_servers, config):
self.config = config
self.devices = devices
self.mqtt_servers = mqtt_servers
if config and "log_level" in config:
if config["log_level"] == "debug":
self.logger.setLevel(logging.DEBUG)
elif config["log_level"] == "info":
self.logger.setLevel(logging.INFO)
def do_action(self, packet):
pass
class PacketFilter(Plugin):
logger = logging.getLogger(name="meshtastic.bridge.filter.packet")
def strip_raw(self, data):
if type(data) is not dict:
return data
if "raw" in data:
del data["raw"]
for k, v in data.items():
data[k] = self.strip_raw(v)
return data
def normalize(self, dict_obj):
"""
Packets are either a dict, string dict or string
"""
if type(dict_obj) is not dict:
try:
dict_obj = json.loads(dict_obj)
except:
dict_obj = {"decoded": {"text": dict_obj}}
return self.strip_raw(dict_obj)
def do_action(self, packet):
self.logger.debug(f"Before normalization: {packet}")
packet = self.normalize(packet)
if "decoded" in packet and "payload" in packet["decoded"]:
if type(packet["decoded"]["payload"]) is bytes:
text = packet["decoded"]["payload"]
packet["decoded"]["payload"] = base64.b64encode(
packet["decoded"]["payload"]
).decode("utf-8")
self.logger.debug(f"After normalization: {packet}")
return packet
plugins["packet_filter"] = PacketFilter()
class DebugFilter(Plugin):
logger = logging.getLogger(name="meshtastic.bridge.plugin.logging")
def do_action(self, packet):
self.logger.debug(packet)
return packet
plugins["debugger"] = DebugFilter()
class MessageFilter(Plugin):
logger = logging.getLogger(name="meshtastic.bridge.filter.message")
def do_action(self, packet):
if not packet:
self.logger.error("Missing packet")
return packet
text = packet["decoded"]["text"] if "text" in packet["decoded"] else None
if text and "message" in self.config:
if "allow" in self.config["message"]:
matches = False
for allow_regex in self.config["message"]["allow"]:
if not matches and re.search(allow_regex, text):
matches = True
if not matches:
self.logger.debug(
f"Dropped because it doesn't match message allow filter"
)
return None
if "disallow" in self.config["message"]:
matches = False
for disallow_regex in self.config["message"]["disallow"]:
if not matches and re.search(disallow_regex, text):
matches = True
if matches:
self.logger.debug(
f"Dropped because it matches message disallow filter"
)
return None
filters = {
"app": packet["decoded"]["portnum"],
"from": packet["fromId"],
"to": packet["toId"],
}
for filter_key, value in filters.items():
if filter_key in self.config:
filter_val = self.config[filter_key]
if (
"allow" in filter_val
and filter_val["allow"]
and value not in filter_val["allow"]
):
self.logger.debug(
f"Dropped because {value} doesn't match {filter_key} allow filter"
)
return None
if (
"disallow" in filter_val
and filter_val["disallow"]
and value in filter_val["disallow"]
):
self.logger.debug(
f"Dropped because {value} matches {filter_key} disallow filter"
)
return None
self.logger.debug(f"Accepted")
return packet
plugins["message_filter"] = MessageFilter()
class LocationFilter(Plugin):
logger = logging.getLogger(name="meshtastic.bridge.filter.distance")
def do_action(self, packet):
message_source_position = None
current_local_position = None
if "device" in self.config and self.config["device"] in self.devices:
nodeInfo = self.devices[self.config["device"]].getMyNodeInfo()
current_local_position = (
nodeInfo["position"]["latitude"],
nodeInfo["position"]["longitude"],
)
if (
"decoded" in packet
and "position" in packet["decoded"]
and "latitude" in packet["decoded"]["position"]
and "longitude" in packet["decoded"]["position"]
):
message_source_position = (
packet["decoded"]["position"]["latitude"],
packet["decoded"]["position"]["longitude"],
)
if "compare_latitude" in self.config and "compare_longitude" in self.config:
current_local_position = (
self.config["compare_latitude"],
self.config["compare_longitude"],
)
if message_source_position and current_local_position:
distance_km = haversine(message_source_position, current_local_position)
comparison = (
self.config["comparison"] if "comparison" in self.config else "within"
)
# message originates from too far a distance
if "max_distance_km" in self.config and self.config["max_distance_km"] > 0:
acceptable_distance = self.config["max_distance_km"]
if comparison == "within" and distance_km > acceptable_distance:
self.logger.debug(
f"Packet from too far: {distance_km} > {acceptable_distance}"
)
return None
elif comparison == "outside" and distance_km < acceptable_distance:
self.logger.debug(
f"Packet too close: {distance_km} < {acceptable_distance}"
)
return None
if "latitude" in self.config:
packet["decoded"]["position"]["latitude"] = self.config["latitude"]
if "longitude" in self.config:
packet["decoded"]["position"]["longitude"] = self.config["longitude"]
return packet
plugins["location_filter"] = LocationFilter()
class WebhookPlugin(Plugin):
logger = logging.getLogger(name="meshtastic.bridge.plugin.webhook")
def do_action(self, packet):
if "active" in self.config and not self.config["active"]:
return packet
if "body" not in self.config:
self.logger.warning("Missing config: body")
return packet
import requests
position = (
packet["decoded"]["position"] if "position" in packet["decoded"] else None
)
text = packet["decoded"]["text"] if "text" in packet["decoded"] else None
macros = {
"{LAT}": position["latitude"] if position else None,
"{LNG}": position["longitude"] if position else None,
"{MSG}": self.config["message"] if "message" in self.config else text,
"{FID}": packet["fromId"],
"{TID}": packet["toId"],
}
body = self.config["body"]
for macro, value in macros.items():
body = body.replace(macro, str(value))
payload = json.loads(body)
self.logger.debug(f"Sending http POST request to {self.config['url']}")
# pass secrets from environment variables to request headers
headers = self.config["headers"] if "headers" in self.config else {}
for k, v in headers.items():
for ek, ev in os.environ.items():
needle = "{" + ek + "}"
if needle in v:
v = v.replace(needle, ev)
headers[k] = v
response = requests.post(self.config["url"], headers=headers, json=payload)
if not response.ok:
self.logger.warning(f"Error returned: {response.status_code}")
return packet
plugins["webhook"] = WebhookPlugin()
class MQTTPlugin(Plugin):
logger = logging.getLogger(name="meshtastic.bridge.plugin.mqtt")
def do_action(self, packet):
required_options = ["name", "topic"]
for option in required_options:
if option not in self.config:
self.logger.warning(f"Missing config: {option}")
return packet
if self.config["name"] not in self.mqtt_servers:
self.logger.warning(f"No server established: {self.config['name']}")
return packet
mqtt_server = self.mqtt_servers[self.config["name"]]
if not mqtt_server.is_connected():
self.logger.error("Not sent, not connected")
return
packet_message = json.dumps(packet)
if "message" in self.config:
message = self.config["message"].replace("{MSG}", packet["decoded"]["text"])
else:
message = packet_message
info = mqtt_server.publish(self.config["topic"], message)
info.wait_for_publish()
self.logger.debug("Message sent")
plugins["mqtt_plugin"] = MQTTPlugin()
class EncryptFilter(Plugin):
logger = logging.getLogger(name="meshtastic.bridge.filter.encrypt")
def do_action(self, packet):
if "key" not in self.config:
return None
from jwcrypto import jwk, jwe
from jwcrypto.common import json_encode, json_decode
with open(self.config["key"], "rb") as pemfile:
encrypt_key = jwk.JWK.from_pem(pemfile.read())
public_key = jwk.JWK()
public_key.import_key(**json_decode(encrypt_key.export_public()))
protected_header = {
"alg": "RSA-OAEP-256",
"enc": "A256CBC-HS512",
"typ": "JWE",
"kid": public_key.thumbprint(),
}
message = json.dumps(packet)
jwetoken = jwe.JWE(
message.encode("utf-8"), recipient=public_key, protected=protected_header
)
self.logger.debug(f"Encrypted message: {packet['id']}")
return jwetoken.serialize()
plugins["encrypt_filter"] = EncryptFilter()
class DecryptFilter(Plugin):
logger = logging.getLogger(name="meshtastic.bridge.filter.decrypt")
def do_action(self, packet):
if "key" not in self.config:
return packet
if type(packet) is not str:
self.logger.warning(f"Packet is not string")
return packet
from jwcrypto import jwk, jwe
with open(self.config["key"], "rb") as pemfile:
private_key = jwk.JWK.from_pem(pemfile.read())
jwetoken = jwe.JWE()
jwetoken.deserialize(packet, key=private_key)
payload = jwetoken.payload
packet = json.loads(payload)
self.logger.debug(f"Decrypted message: {packet['id']}")
return packet
plugins["decrypt_filter"] = DecryptFilter()
class RadioMessagePlugin(Plugin):
logger = logging.getLogger(name="meshtastic.bridge.plugin.send")
def do_action(self, packet):
if self.config["device"] not in self.devices:
self.logger.error(f"Missing interface for device {self.config['device']}")
return packet
destinationId = None
if "to" in self.config:
destinationId = self.config["to"]
elif "toId" in self.config:
destinationId = self.config["toId"]
elif "node_mapping" in self.config and "to" in packet:
destinationId = self.config["node_mapping"][packet["to"]]
elif "to" in packet:
destinationId = packet["to"]
elif "toId" in packet:
destinationId = packet["toId"]
if not destinationId:
self.logger.error("Missing 'to' property in config or packet")
return packet
device_name = self.config["device"]
device = self.devices[device_name]
# Not a radio packet
if "decoded" in packet and "text" in packet["decoded"] and "from" not in packet:
self.logger.debug(f"Sending text to Radio {device_name}")
device.sendText(text=packet["decoded"]["text"], destinationId=destinationId)
elif (
"lat" in self.config
and self.config["lat"] > 0
and "lng" in self.config
and self.config["lng"] > 0
):
lat = self.config["lat"]
lng = self.config["lng"]
altitude = self.config["alt"] if "alt" in self.config else 0
self.logger.debug(f"Sending position to Radio {device_name}")
device.sendPosition(
latitude=lat,
longitude=lng,
altitude=altitude,
destinationId=destinationId,
)
elif (
"decoded" in packet
and "payload" in packet["decoded"]
and "portnum" in packet["decoded"]
):
meshPacket = mesh_pb2.MeshPacket()
meshPacket.channel = 0
meshPacket.decoded.payload = base64.b64decode(packet["decoded"]["payload"])
meshPacket.decoded.portnum = packet["decoded"]["portnum"]
meshPacket.decoded.want_response = False
meshPacket.id = device._generatePacketId()
self.logger.debug(f"Sending packet to Radio {device_name}")
device._sendPacket(meshPacket=meshPacket, destinationId=destinationId)
return packet
plugins["radio_message_plugin"] = RadioMessagePlugin()