diff --git a/mesh_bot.py b/mesh_bot.py index e18c244..8a892bd 100755 --- a/mesh_bot.py +++ b/mesh_bot.py @@ -1,413 +1,267 @@ -#!/usr/bin/env python3 -# Meshtastic Autoresponder MESH Bot +# helper functions to use location data # K7MHI Kelly Keeton 2024 -import asyncio # for the event loop -import time # for sleep, get some when you can :) -from pubsub import pub # pip install pubsub -import meshtastic.serial_interface #pip install meshtastic -import meshtastic.tcp_interface -import meshtastic.ble_interface -from datetime import datetime -from dadjokes import Dadjoke # pip install dadjokes +import json # pip install json +from geopy.geocoders import Nominatim # pip install geopy +import maidenhead as mh # pip install maidenhead +import requests # pip install requests +import bs4 as bs # pip install beautifulsoup4 +import xml.dom.minidom -# Import all the functions from the modules in the repo if you want to use them otherwise make waffles # -from modules.solarconditions import * # from the spudgunman/meshing-around repo -from modules.locationdata import * # from the spudgunman/meshing-around repo -from modules.bbstools import * # from the spudgunman/meshing-around repo +URL_TIMEOUT = 10 # wait time for URL requests +DAYS_OF_WEATHER = 4 # weather forecast days, the first two rows are today and tonight +# error messages +ALERT_COUNT = 2 # number of weather alerts to display +NO_DATA_NOGPS = "no location data: does your device have GPS?" +ERROR_FETCHING_DATA = "error fetching data" -# Uncomment the interface you want to use depending on your device connection -interface = meshtastic.serial_interface.SerialInterface() #serial interface (specify port) default is first found -#interface=meshtastic.tcp_interface.TCPInterface(hostname="192.168.0.1") # IP of your device -#interface=meshtastic.ble_interface.BLEInterface("AA:BB:CC:DD:EE:FF") # BLE interface +trap_list_location = ("whereami", "tide", "moon", "wx", "wxc", "wxa", "wxalert") -# A basic list of strings to trap and respond to -trap_list = ("ping", "pinging", "ack", "testing", "pong", "motd", "cmd", "lheard", "sitrep", "joke") +def where_am_i(lat=0, lon=0): + whereIam = "" + if float(lat) == 0 and float(lon) == 0: + return NO_DATA_NOGPS + # initialize Nominatim API + geolocator = Nominatim(user_agent="mesh-bot") -# comment out unwanted funtionality, defined in corresponding files/modules -trap_list = trap_list + trap_list_location # items tide, whereami, wxc, wx -trap_list = trap_list + trap_list_solarconditions # items hfcond, solar, sun, moon -trap_list = trap_list + trap_list_bbs # items bbslist, bbspost, bbsread, bbsdelete, bbshelp + location = geolocator.reverse(lat + ", " + lon) + address = location.raw['address'] + address_components = ['house_number', 'road', 'city', 'state', 'postcode', 'county', 'country'] + whereIam += ' '.join([address.get(component, '') for component in address_components if component in address]) + grid = mh.to_maiden(float(lat), float(lon)) + whereIam += " Grid: " + grid -welcome_message = "MeshBot, here for you like a friend who is not. Try sending: ping @foo or, cmd" -help_message = "CMD?: ping, motd, sitrep, joke, sun, hfcond, solar, moon, tide, whereami, wx, wxc, wxa, bbslist, bbshelp" -MOTD = "Thanks for using PongBOT! Have a good day!" # Message of the Day -RESPOND_BY_DM_ONLY = False # Set to True to respond messages via DM only, False uses smart response -DEFAULT_CHANNEL = 0 # Default channel on your node, also known as "public channel" 0 on new devices + return whereIam -#Get the node number of the device, check if the device is connected -try: - myinfo = interface.getMyNodeInfo() - myNodeNum = myinfo['num'] -except Exception as e: - print(f"System: Critical Error script abort. {e}") - exit() - -def auto_response(message, snr, rssi, hop, message_from_id): - #Auto response to messages - if "ping" in message.lower(): - #Check if the user added @foo to the message - if "@" in message: - if hop == "Direct": - bot_response = "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}" + " and copy: " + message.split("@")[1] - else: - bot_response = "🏓PONG, " + hop + " and copy: " + message.split("@")[1] - else: - if hop == "Direct": - bot_response = "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}" - else: - bot_response = "🏓PONG, " + hop - elif "ack" in message.lower(): - if hop == "Direct": - bot_response = "🏓ACK-ACK! " + f"SNR:{snr} RSSI:{rssi}" - else: - bot_response = "🏓ACK-ACK! " + hop - elif "testing" in message.lower(): - bot_response = "🏓Testing 1,2,3" - elif "pong" in message.lower(): - bot_response = "🏓PING!!" - elif "motd" in message.lower(): - #check if the user wants to set the motd by using $ - if "$" in message: - motd = message.split("$")[1] - global MOTD - MOTD = motd - bot_response = "MOTD Set to: " + MOTD - else: - bot_response = MOTD - elif "bbshelp" in message.lower(): - bot_response = bbs_help() - elif "cmd" in message.lower(): - bot_response = help_message - elif "sun" in message.lower(): - location = get_node_location(message_from_id) - bot_response = get_sun(str(location[0]),str(location[1])) - elif "hfcond" in message.lower(): - bot_response = hf_band_conditions() - elif "solar" in message.lower(): - bot_response = drap_xray_conditions() + "\n" + solar_conditions() - elif "lheard" in message.lower() or "sitrep" in message.lower(): - bot_response = "Last 5 nodes heard:\n" + str(get_node_list()) - elif "whereami" in message.lower(): - location = get_node_location(message_from_id) - where = where_am_i(str(location[0]),str(location[1])) - bot_response = where - elif "tide" in message.lower(): - location = get_node_location(message_from_id) - tide = get_tide(str(location[0]),str(location[1])) - bot_response = tide - elif "moon" in message.lower(): - location = get_node_location(message_from_id) - moon = get_moon(str(location[0]),str(location[1])) - bot_response = moon - elif "wxalert" in message.lower(): - location = get_node_location(message_from_id) - weatherAlert = getActiveWeatherAlertsDetail(str(location[0]),str(location[1])) - bot_response = weatherAlert[0] - elif "wxa" in message.lower(): - location = get_node_location(message_from_id) - weatherAlert = getWeatherAlerts(str(location[0]),str(location[1])) - bot_response = weatherAlert[0] - elif "wxc" in message.lower(): - location = get_node_location(message_from_id) - weather = get_weather(str(location[0]),str(location[1]),1) - bot_response = weather - elif "wx" in message.lower(): - location = get_node_location(message_from_id) - weather = get_weather(str(location[0]),str(location[1])) - bot_response = weather - elif "joke" in message.lower(): - bot_response = tell_joke() - elif "bbslist" in message.lower(): - bot_response = bbs_list_messages() - elif "bbspost" in message.lower(): - # Check if the user added a subject to the message - if "$" in message: - subject = message.split("$")[1].split("#")[0] - subject = subject.rstrip() - if "#" in message: - message = message.split("#")[1] - message = message.rstrip() - - bot_response = bbs_post_message(subject,message,message_from_id) - else: - bot_response = "example: bbspost $subject #message" - else: - bot_response = "Please add a subject to the message. ex: bbspost $subject #message" - elif "bbsread" in message.lower(): - # Check if the user added a message number to the message - if "#" in message: - messageID = int(message.split("#")[1]) - bot_response = bbs_read_message(messageID) - else: - bot_response = "Please add a message number ex: bbsread #14" - elif "bbsdelete" in message.lower(): - # Check if the user added a message number to the message - if "#" in message: - messageID = int(message.split("#")[1]) - bot_response = bbs_delete_message(messageID, message_from_id) - else: - bot_response = "Please add a message number ex: bbsdelete #14" - else: - bot_response = "I'm sorry, I'm afraid I can't do that." - - # wait a 700ms to avoid message collision from lora-ack - time.sleep(0.7) - - return bot_response - -def log_timestamp(): - return datetime.now().strftime("%Y-%m-%d %H:%M:%S") - -def onReceive(packet, interface): - # receive a packet and process it, main instruction loop - - # print the packet for debugging - #print("Packet Received") - #print(packet) # print the packet for debugging - #print("END of packet \n") - message_from_id = 0 - snr = 0 - rssi = 0 +def get_tide(lat=0, lon=0): + station_id = "" + if float(lat) == 0 and float(lon) == 0: + return NO_DATA_NOGPS + station_lookup_url = "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi/tidepredstations.json?lat=" + str(lat) + "&lon=" + str(lon) + "&radius=50" try: - if 'decoded' in packet and packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP': - message_bytes = packet['decoded']['payload'] - message_string = message_bytes.decode('utf-8') - message_from_id = packet['from'] - snr = packet['rxSnr'] - rssi = packet['rxRssi'] - - if packet.get('channel'): - channel_number = packet['channel'] - else: - channel_number = 0 - - # check if the packet has a hop count flag use it - if packet.get('hopsAway'): - hop_away = packet['hopsAway'] - else: - # if the packet does not have a hop count try other methods - hop_away = 0 - if packet.get('hopLimit'): - hop_limit = packet['hopLimit'] - else: - hop_limit = 0 - - if packet.get('hopStart'): - hop_start = packet['hopStart'] - else: - hop_start = 0 - - if hop_start == hop_limit: - hop = "Direct" - else: - # set hop to Direct if the message was sent directly otherwise set the hop count - if hop_away > 0: - hop_count = hop_away - else: - hop_count = hop_start - hop_limit - #print (f"calculated hop count: {hop_start} - {hop_limit} = {hop_count}") - - hop = f"{hop_count} hops" - - # If the packet is a DM (Direct Message) respond to it, otherwise validate its a message for us on the channel - if packet['to'] == myNodeNum: - # message is DM to us - if message_string == help_message or message_string == welcome_message: - # ignore help and welcome messages - print(f"{log_timestamp()} Got Own Welcome/Help header. From: {get_name_from_number(message_from_id)}") - return - elif "Try sending: ping @foo or, help" in message_string: - # ignore help and welcome messages - print(f"{log_timestamp()} Got Help Message. From: {get_name_from_number(message_from_id)}") - return - - # check if the message contains a trap word, DMs are always responded to - if messageTrap(message_string): - print(f"{log_timestamp()} Received DM: {message_string} on Channel: {channel_number} From: {get_name_from_number(message_from_id)}") - # respond with DM - send_message(auto_response(message_string, snr, rssi, hop, message_from_id), channel_number, message_from_id) - else: - # respond with welcome message on DM - print(f"{log_timestamp()} Ignoring DM: {message_string} From: {get_name_from_number(message_from_id)}") - send_message(welcome_message, channel_number, message_from_id) - else: - # message is on a channel - if messageTrap(message_string): - print(f"{log_timestamp()} Received On Channel {channel_number}: {message_string} From: {get_name_from_number(message_from_id)}") - if RESPOND_BY_DM_ONLY: - # respond to channel message via direct message - send_message(auto_response(message_string, snr, rssi, hop, message_from_id), channel_number, message_from_id) - else: - # or respond to channel message on the channel itself - if channel_number == DEFAULT_CHANNEL: - # warning user spamming default channel - print(f"{log_timestamp()} System: Warning spamming default channel not allowed. sending DM to {get_name_from_number(message_from_id)}") - - # respond to channel message via direct message - send_message(auto_response(message_string, snr, rssi, hop, message_from_id), channel_number, message_from_id) - else: - # respond to channel message on the channel itself - send_message(auto_response(message_string, snr, rssi, hop, message_from_id), channel_number) - else: - print(f"{log_timestamp()} System: Ignoring incoming channel {channel_number}: {message_string} From: {get_name_from_number(message_from_id)}") - - except KeyError as e: - print(f"System: Error processing packet: {e}") - print(packet) # print the packet for debugging - print("END of packet \n") - -def messageTrap(msg): - # Check if the message contains a trap word - message_list=msg.split(" ") - for m in message_list: - for t in trap_list: - if t.lower() == m.lower(): - return True - return False - -def tell_joke(): - # tell a dad joke, does it need an explanationn :) - dadjoke = Dadjoke() - return dadjoke.joke - -def decimal_to_hex(decimal_number): - return f"!{decimal_number:08x}" - -def get_name_from_number(number, type='long'): - name = "" - for node in interface.nodes.values(): - if number == node['num']: - if type == 'long': - name = node['user']['longName'] - return name - elif type == 'short': - name = node['user']['shortName'] - return name - else: - pass + station_data = requests.get(station_lookup_url, timeout=URL_TIMEOUT) + if station_data.ok: + station_json = station_data.json() else: - name = str(decimal_to_hex(number)) # If long name not found, use the ID as string - return name + return ERROR_FETCHING_DATA + except (requests.exceptions.RequestException, json.JSONDecodeError): + return ERROR_FETCHING_DATA -def get_node_list(): - node_list = [] - short_node_list = [] - if interface.nodes: - for node in interface.nodes.values(): - # ignore own - if node['num'] != myNodeNum: - node_name = get_name_from_number(node['num']) - snr = node.get('snr', 0) - - # issue where lastHeard is not always present - last_heard = node.get('lastHeard', 0) - - # make a list of nodes with last heard time and SNR - item = (node_name, last_heard, snr) - node_list.append(item) - - node_list.sort(key=lambda x: x[1], reverse=True) - #print (f"Node List: {node_list[:5]}\n") - - # make a nice list for the user - for x in node_list[:5]: - short_node_list.append(f"{x[0]} SNR:{x[2]}") - - return "\n".join(short_node_list) + if station_id is None: + return "no tide station found" + station_id = station_json['stationList'][0]['stationId'] + + station_url = "https://tidesandcurrents.noaa.gov/noaatidepredictions.html?id=" + station_id + + try: + station_data = requests.get(station_url, timeout=URL_TIMEOUT) + if not station_data.ok: + return ERROR_FETCHING_DATA + except (requests.exceptions.RequestException): + return ERROR_FETCHING_DATA + + # extract table class="table table-condensed" + soup = bs.BeautifulSoup(station_data.text, 'html.parser') + table = soup.find('table', class_='table table-condensed') + + # extract rows + rows = table.find_all('tr') + # extract data from rows + tide_data = [] + for row in rows: + row_text = "" + cols = row.find_all('td') + for col in cols: + row_text += col.text + " " + tide_data.append(row_text) + # format tide data into a string + tide_string = "" + for data in tide_data: + tide_string += data + "\n" + # trim off last newline + tide_string = tide_string[:-1] + return tide_string + +def get_weather(lat=0, lon=0, unit=0): + # get weather report from NOAA for forecast detailed + weather = "" + if float(lat) == 0 and float(lon) == 0: + return NO_DATA_NOGPS + + # get weather data from NOAA units for metric + weather_url = "https://forecast.weather.gov/MapClick.php?FcstType=text&lat=" + str(lat) + "&lon=" + str(lon) + if unit == 1: + weather_url += "&unit=1" + + try: + weather_data = requests.get(weather_url, timeout=URL_TIMEOUT) + if not weather_data.ok: + return ERROR_FETCHING_DATA + except (requests.exceptions.RequestException): + return ERROR_FETCHING_DATA + + soup = bs.BeautifulSoup(weather_data.text, 'html.parser') + table = soup.find('div', id="detailed-forecast-body") + + if table is None: + return "no weather data found on NOAA for your location" else: - return "Error Processing Node List" - -def get_node_location(number): - # Get the location of a node by its number from nodeDB on device - latitude = 0 - longitude = 0 - position = [0,0] - if interface.nodes: - for node in interface.nodes.values(): - if number == node['num']: - if 'position' in node: - latitude = node['position']['latitude'] - longitude = node['position']['longitude'] - print (f"System: location data for {number} is {latitude},{longitude}") - position = [latitude,longitude] - return position - else: - print (f"{log_timestamp()} System: No location data for {number}") - return position + # get rows + rows = table.find_all('div', class_="row") + + # extract data from rows + for row in rows: + # shrink the text + line = abbreviate_weather(row.text) + # only grab a few days of weather + if len(weather.split("\n")) < DAYS_OF_WEATHER: + weather += line + "\n" + # trim off last newline + weather = weather[:-1] + + # get any alerts and return the count + alerts = getWeatherAlerts(lat, lon) + alert = alerts[0] + if alert == "No weather alerts found": + alert = "" + alert_num = 0 else: - print (f"{log_timestamp()} System: No nodes found") - return position - -def send_message(message, ch, nodeid=0): - # if message over 160 characters, split it into multiple messages - if len(message) > 160: - print (f"{log_timestamp()} System: Splitting Message, Message Length: {len(message)}") - # split the message into 160 character chunks - - #message = message.replace('\n', ' NEWLINE ') # replace newlines with NEWLINE to keep them in split chunks + alert_num = alerts[1] - split_message = message.split() - line = '' - split_len = 160 - message_list = [] + if alert_num > 0: + # add the alert count warning to the weather + weather = str(alert_num) + " local alerts!\n" + weather + "\n" + alert - for word in split_message: - if len(line+word)