# helper functions to use location data for the API for NOAA weather, FEMA iPAWS, and repeater data # K7MHI Kelly Keeton 2024 import json # pip install json from geopy.geocoders import Nominatim # pip install geopy import maidenhead as mh # pip install maidenhead import requests # pip install requests import bs4 as bs # pip install beautifulsoup4 import xml.dom.minidom # used for parsing XML import xml.parsers.expat # used for parsing XML from datetime import datetime from modules.log import logger import modules.settings as my_settings import math import csv import os import sqlite3 trap_list_location = ("whereami", "wx", "wxa", "wxalert", "rlist", "ea", "ealert", "riverflow", "valert", "earthquake", "howfar", "map",) def where_am_i(lat=0, lon=0, short=False, zip=False): whereIam = "" grid = mh.to_maiden(float(lat), float(lon)) location = lat, lon if int(float(lat)) == 0 and int(float(lon)) == 0: logger.error("Location: No GPS data, try sending location") return my_settings.NO_DATA_NOGPS # initialize Nominatim API geolocator = Nominatim(user_agent="mesh-bot") try: # Nomatim API call to get address if short: location = geolocator.reverse(lat + ", " + lon) address = location.raw['address'] address_components = ['city', 'state', 'county', 'country'] whereIam = f"City: {address.get('city', '')}. State: {address.get('state', '')}. County: {address.get('county', '')}. Country: {address.get('country', '')}." 
return whereIam if zip: # return a string with zip code only location = geolocator.reverse(str(lat) + ", " + str(lon)) whereIam = location.raw['address'].get('postcode', '') return whereIam if float(lat) == my_settings.latitudeValue and float(lon) == my_settings.longitudeValue: # redacted address when no GPS and using default location location = geolocator.reverse(str(lat) + ", " + str(lon)) address = location.raw['address'] address_components = { 'city': 'City', 'state': 'State', 'postcode': 'Zip', 'county': 'County', 'country': 'Country' } whereIam += ', '.join([f"{label}: {address.get(component, '')}" for component, label in address_components.items() if component in address]) else: location = geolocator.reverse(lat + ", " + lon) address = location.raw['address'] address_components = { 'house_number': 'Number', 'road': 'Road', 'city': 'City', 'state': 'State', 'postcode': 'Zip', 'county': 'County', 'country': 'Country' } whereIam += ', '.join([f"{label}: {address.get(component, '')}" for component, label in address_components.items() if component in address]) whereIam += f", Grid: " + grid return whereIam except Exception as e: logger.debug("Location:Error fetching location data with whereami, likely network error") return my_settings.ERROR_FETCHING_DATA def getRepeaterBook(lat=0, lon=0): grid = mh.to_maiden(float(lat), float(lon)) data = [] # check if in the US or not usapi ="https://www.repeaterbook.com/repeaters/prox_result.php?" elsewhereapi = "https://www.repeaterbook.com/row_repeaters/prox2_result.php?" 
if grid[:2] in ['CN', 'DN', 'EN', 'FN', 'CM', 'DM', 'EM', 'FM', 'DL', 'EL', 'FL']: repeater_url = usapi logger.debug("Location: Fetching repeater data from RepeaterBook US API for grid " + grid) else: repeater_url = elsewhereapi logger.debug("Location: Fetching repeater data from RepeaterBook International API for grid " + grid) repeater_url += f"city={grid}&lat=&long=&distance=50&Dunit=m&band%5B%5D=4&band%5B%5D=16&freq=&call=&mode%5B%5D=1&mode%5B%5D=2&mode%5B%5D=4&mode%5B%5D=64&status_id=1&use=%25&use=OPEN&order=distance_calc%2C+state_id+ASC" try: msg = '' user_agent = {'User-agent': 'Mozilla/5.0'} response = requests.get(repeater_url, headers=user_agent, timeout=my_settings.urlTimeoutSeconds) # Fail early on bad HTTP status if response.status_code != 200: logger.error(f"Location:Error fetching repeater data from {repeater_url} with status code {response.status_code}") return my_settings.ERROR_FETCHING_DATA soup = bs.BeautifulSoup(response.text, 'html.parser') # match the repeater table by presence of the "sortable" class (class order/extra classes may vary) table = soup.select_one('table.sortable') if table is not None: cells = table.find_all('td') data = [] for i in range(0, len(cells), 11): if i + 10 < len(cells): #avoid IndexError repeater = { 'frequency': cells[i].text.strip() if i < len(cells) else 'N/A', 'offset': cells[i + 1].text.strip() if i + 1 < len(cells) else 'N/A', 'tone': cells[i + 2].text.strip() if i + 2 < len(cells) else 'N/A', 'call_sign': cells[i + 3].text.strip() if i + 3 < len(cells) else 'N/A', 'location': cells[i + 4].text.strip() if i + 4 < len(cells) else 'N/A', 'state': cells[i + 5].text.strip() if i + 5 < len(cells) else 'N/A', 'use': cells[i + 6].text.strip() if i + 6 < len(cells) else 'N/A', 'mode': cells[i + 7].text.strip() if i + 7 < len(cells) else 'N/A', 'distance': cells[i + 8].text.strip() if i + 8 < len(cells) else 'N/A', 'direction': cells[i + 9].text.strip() if i + 9 < len(cells) else 'N/A' } data.append(repeater) else: # No 
table found — could be legitimately no data or markup change. logger.debug("Location: No repeater table found on RepeaterBook page, scraping failed or no data for region.") msg = "No Data for your Region" except Exception as e: msg = "No repeaters found 😔" # Limit the output to the first 4 repeaters for repeater in data[:4]: tmpTone = repeater['tone'].replace(" /", "") msg += f"{repeater['call_sign']}📶{repeater['frequency']}{repeater['offset']},{tmpTone}.{repeater['mode']}" if repeater != data[:4][-1]: msg += '\n' return msg def getArtSciRepeaters(lat=0, lon=0): # UK api_url = "https://api-beta.rsgb.online/all/systems" #grid = mh.to_maiden(float(lat), float(lon)) repeaters = [] zipCode = where_am_i(lat, lon, zip=True) if zipCode == my_settings.NO_DATA_NOGPS or zipCode == my_settings.ERROR_FETCHING_DATA: return zipCode if zipCode.isnumeric(): try: artsci_url = f"http://www.artscipub.com/mobile/showstate.asp?zip={zipCode}" response = requests.get(artsci_url, timeout=my_settings.urlTimeoutSeconds) if response.status_code!=200: logger.error(f"Location:Error fetching data from {artsci_url} with status code {response.status_code}") soup = bs.BeautifulSoup(response.text, 'html.parser') # results needed xpath is /html/body/table[2]/tbody/tr/td/table/tbody/tr[2]/td/table table = soup.find_all('table')[1] rows = table.find_all('tr') for row in rows: cols = row.find_all('td') cols = [ele.text.strip() for ele in cols] # if no elements have the word 'located' then append if not any('located' in ele for ele in cols): if not any('Location' in ele for ele in cols): repeaters.append([ele for ele in cols if ele]) except Exception as e: logger.error(f"Error fetching data from {artsci_url}: {e}") if repeaters != []: msg = f"Found:{len(repeaters)} in {zipCode}\n" for repeater in repeaters: # format is ['City', 'Frequency', 'Offset', 'PL', 'Call', 'Notes'] # there might be missing elements or only one element if len(repeater) == 2: msg += f"Freq:{repeater[1]}" elif len(repeater) == 3: 
msg += f"Freq:{repeater[1]}, PL:{repeater[2]}" elif len(repeater) == 4: msg += f"Freq:{repeater[1]}, PL:{repeater[2]}, ID: {repeater[3]}" elif len(repeater) == 5: msg += f"Freq:{repeater[1]}, PL:{repeater[2]}, ID:{repeater[3]}" elif len(repeater) == 6: msg += f"Freq:{repeater[1]}, PL:{repeater[2]}, ID:{repeater[3]}. {repeater[5]}" if repeater != repeaters[-1]: msg += "\n" else: msg = f"no results.. sorry" return msg def get_NOAAtide(lat=0, lon=0): station_id = "" location = lat,lon if float(lat) == 0 and float(lon) == 0: lat = my_settings.latitudeValue lon = my_settings.longitudeValue station_lookup_url = "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi/tidepredstations.json?lat=" + str(lat) + "&lon=" + str(lon) + "&radius=50" try: station_data = requests.get(station_lookup_url, timeout=my_settings.urlTimeoutSeconds) if station_data.ok: station_json = station_data.json() else: logger.error("Location:Error fetching tide station table from NOAA") return my_settings.ERROR_FETCHING_DATA if station_json['stationList'] == [] or station_json['stationList'] is None: logger.error("Location:No tide station found") return "No tide station found with info provided" station_id = station_json['stationList'][0]['stationId'] except (requests.exceptions.RequestException, json.JSONDecodeError): logger.error("Location:Error fetching tide station table from NOAA") return my_settings.ERROR_FETCHING_DATA station_url = "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=today&time_zone=lst_ldt&datum=MLLW&product=predictions&interval=hilo&format=json&station=" + station_id if my_settings.use_metric: station_url += "&units=metric" else: station_url += "&units=english" try: tide_data = requests.get(station_url, timeout=my_settings.urlTimeoutSeconds) if tide_data.ok: tide_json = tide_data.json() else: logger.error("Location:Error fetching tide data from NOAA") return my_settings.ERROR_FETCHING_DATA except (requests.exceptions.RequestException, json.JSONDecodeError): 
logger.error("Location:Error fetching tide data from NOAA") return my_settings.ERROR_FETCHING_DATA tide_data = tide_json['predictions'] # format tide data into a table string for mesh # get the date out of the first t value tide_date = tide_data[0]['t'].split(" ")[0] tide_table = "Tide Data for " + tide_date + "\n" for tide in tide_data: tide_time = tide['t'].split(" ")[1] if not my_settings.zuluTime: # convert to 12 hour clock if int(tide_time.split(":")[0]) > 12: tide_time = str(int(tide_time.split(":")[0]) - 12) + ":" + tide_time.split(":")[1] + " PM" else: tide_time = tide_time + " AM" tide_table += tide['type'] + " " + tide_time + ", " + tide['v'] + "\n" # remove last newline tide_table = tide_table[:-1] return tide_table def get_NOAAweather(lat=0, lon=0, unit=0, report_days=None): # get weather report from NOAA for forecast detailed weather = "" location = lat,lon if float(lat) == 0 and float(lon) == 0: lat = my_settings.latitudeValue lon = my_settings.longitudeValue if report_days is None: report_days = my_settings.forecastDuration # get weather data from NOAA units for metric unit = 1 is metric if my_settings.use_metric: unit = 1 logger.debug("Location: new API metric units not implemented yet") weather_url = "https://forecast.weather.gov/MapClick.php?FcstType=text&lat=" + str(lat) + "&lon=" + str(lon) weather_api = "https://api.weather.gov/points/" + str(lat) + "," + str(lon) # extract the "forecast": property from the JSON response try: weather_data = requests.get(weather_api, timeout=my_settings.urlTimeoutSeconds) if not weather_data.ok: logger.warning("Location:Error fetching weather data from NOAA for location") return my_settings.ERROR_FETCHING_DATA except Exception: logger.warning(f"Location:Error fetching weather data malformed: {Exception}") return my_settings.ERROR_FETCHING_DATA # get the forecast URL from the JSON response weather_json = weather_data.json() forecast_url = weather_json['properties']['forecast'] try: forecast_data = 
requests.get(forecast_url, timeout=my_settings.urlTimeoutSeconds) if not forecast_data.ok: logger.warning("Location:Error fetching weather forecast from NOAA") return my_settings.ERROR_FETCHING_DATA except Exception: logger.warning(f"Location:Error fetching weather data missing: {Exception}") return my_settings.ERROR_FETCHING_DATA # from periods, get the detailedForecast from number of days in NOAAforecastDuration forecast_json = forecast_data.json() forecast = forecast_json['properties']['periods'] for day in forecast[:report_days]: # abreviate the forecast weather += abbreviate_noaa(day['name']) + ": " + abbreviate_noaa(day['detailedForecast']) + "\n" # remove last newline weather = weather[:-1] # get any alerts and return the count alerts = getWeatherAlertsNOAA(lat, lon) if alerts == my_settings.ERROR_FETCHING_DATA or alerts == my_settings.NO_DATA_NOGPS or alerts == my_settings.NO_ALERTS: alert = "" alert_num = 0 else: alert = alerts[0] alert_num = alerts[1] if int(alert_num) > 0: # add the alert count warning to the weather weather = str(alert_num) + " local alerts!\n" + weather + "\n" + alert return weather def case_insensitive_replace(text, old, new): """Replace all occurrences of old (any case) in text with new.""" idx = 0 old_lower = old.lower() text_lower = text.lower() while True: idx = text_lower.find(old_lower, idx) if idx == -1: break text = text[:idx] + new + text[idx+len(old):] text_lower = text.lower() idx += len(new) return text def abbreviate_noaa(data=""): # Long phrases (with spaces) phrase_replacements = { "less than a tenth of an inch possible": "< 0.1in", "between a tenth and quarter of an inch possible": "0.1-0.25in", "between a quarter and half an inch possible": "0.25-0.5in", "between a half and three quarters of an inch possible": "0.5-0.75in", "between one and two inches possible": "1-2in", "between two and three inches possible": "2-3in", "between three and four inches possible": "3-4in", "between four and five inches possible": 
"4-5in", "between five and six inches possible": "5-6in", "between six and eight inches possible": "6-8in", "gusts as high as": "gusts to", } # Single words (no spaces) word_replacements = { "monday": "Mon", "tuesday": "Tue", "wednesday": "Wed", "thursday": "Thu", "friday": "Fri", "saturday": "Sat", "sunday": "Sun", "northwest": "NW", "northeast": "NE", "southwest": "SW", "southeast": "SE", "north": "N", "south": "S", "east": "E", "west": "W", "accumulation": "accum", "visibility": "vis", "precipitation": "precip", "showers": "shwrs", "thunderstorms": "t-storms", "thunderstorm": "t-storm", "quarters": "qtrs", "quarter": "qtr", "january": "Jan", "february": "Feb", "march": "Mar", "april": "Apr", "may": "May", "june": "Jun", "july": "Jul", "august": "Aug", "september": "Sep", "october": "Oct", "november": "Nov", "december": "Dec", "degrees": "°", "percent": "%", "department": "Dept.", "temperatures": "temps:", "temperature": "temp:", "amounts": "amts:", "afternoon": "Aftn", "around": "~", "evening": "Eve", } text = data # Replace long phrases (case-insensitive) for key in sorted(phrase_replacements, key=len, reverse=True): value = phrase_replacements[key] text = case_insensitive_replace(text, key, value) # Replace single words (case-insensitive) for key in word_replacements: value = word_replacements[key] text = case_insensitive_replace(text, key, value) return text def getWeatherAlertsNOAA(lat=0, lon=0, useDefaultLatLon=False): # get weather alerts from NOAA limited to ALERT_COUNT with the total number of alerts found alerts = "" location = lat,lon if useDefaultLatLon: lat = my_settings.latitudeValue lon = my_settings.longitudeValue if float(lat) == 0 and float(lon) == 0 and not useDefaultLatLon: return my_settings.NO_DATA_NOGPS alert_url = "https://api.weather.gov/alerts/active.atom?point=" + str(lat) + "," + str(lon) #alert_url = "https://api.weather.gov/alerts/active.atom?area=WA" #logger.debug("Location:Fetching weather alerts from NOAA for " + str(lat) + ", " + 
str(lon)) try: alert_data = requests.get(alert_url, timeout=my_settings.urlTimeoutSeconds) if not alert_data.ok: logger.warning("Location:Error fetching weather alerts from NOAA bad data") return my_settings.ERROR_FETCHING_DATA except Exception as e: logger.warning(f"Location:Error fetching weather alert request error: {type(e).__name__}: {e}") return my_settings.ERROR_FETCHING_DATA alerts = "" alertxml = xml.dom.minidom.parseString(alert_data.text) for i in alertxml.getElementsByTagName("entry"): title = i.getElementsByTagName("title")[0].childNodes[0].nodeValue area_desc_nodes = i.getElementsByTagName("cap:areaDesc") if area_desc_nodes and area_desc_nodes[0].childNodes: area_desc = area_desc_nodes[0].childNodes[0].nodeValue else: area_desc = "" # Extract NWSheadline from cap:parameter if present nws_headline = "" for param in i.getElementsByTagName("cap:parameter"): try: value_name = param.getElementsByTagName("valueName")[0].childNodes[0].nodeValue if value_name == "NWSheadline": nws_headline = param.getElementsByTagName("value")[0].childNodes[0].nodeValue break except Exception: continue # If title is "Special Weather Statement" and headline exists, use headline only if "special" in title.lower() and nws_headline: main_alert = nws_headline else: main_alert = title if my_settings.enableExtraLocationWx: # adds location data which is too much data? alerts += f"{main_alert}. 
{area_desc.replace(' ', '')}" # Only add headline if not already used as main_alert # if nws_headline and main_alert != nws_headline: # alerts += f" ALERT: {nws_headline}" alerts += "\n" else: alerts += f"{main_alert}" # Only add headline if not already used as main_alert # if nws_headline and main_alert != nws_headline: # alerts += f" ALERT: {nws_headline}" alerts += "\n" if alerts == "" or alerts == None: return my_settings.NO_ALERTS # trim off last newline if alerts[-1] == "\n": alerts = alerts[:-1] # get the number of alerts alert_num = 0 alert_num = len(alerts.split("\n")) alerts = abbreviate_noaa(alerts) # return the first ALERT_COUNT alerts data = "\n".join(alerts.split("\n")[:my_settings.numWxAlerts]), alert_num return data wxAlertCacheNOAA = "" def alertBrodcastNOAA(): # get the latest weather alerts and broadcast them if there are any global wxAlertCacheNOAA currentAlert = getWeatherAlertsNOAA(my_settings.latitudeValue, my_settings.longitudeValue) # check if any reason to discard the alerts if currentAlert == my_settings.ERROR_FETCHING_DATA or currentAlert == my_settings.NO_DATA_NOGPS: return False elif currentAlert == my_settings.NO_ALERTS: wxAlertCacheNOAA = "" return False if my_settings.ignoreEASenable: # check if the alert is in the ignoreEAS list for word in my_settings.ignoreEASwords: if word.lower() in currentAlert[0].lower(): logger.debug(f"Location:Ignoring NOAA Alert: {currentAlert[0]} containing {word}") return False # broadcast the alerts send to wxBrodcastCh elif currentAlert[0] not in wxAlertCacheNOAA: # Check if the current alert is not in the weather alert cache wxAlertCacheNOAA = currentAlert[0] return currentAlert return False def getActiveWeatherAlertsDetailNOAA(lat=0, lon=0): # get the latest details of weather alerts from NOAA alerts = "" location = lat,lon if float(lat) == 0 and float(lon) == 0: lat = my_settings.latitudeValue lon = my_settings.longitudeValue alert_url = "https://api.weather.gov/alerts/active.atom?point=" + str(lat) 
+ "," + str(lon) #alert_url = "https://api.weather.gov/alerts/active.atom?area=WA" #logger.debug("Location:Fetching weather alerts detailed from NOAA for " + str(lat) + ", " + str(lon)) try: alert_data = requests.get(alert_url, timeout=my_settings.urlTimeoutSeconds) if not alert_data.ok: logger.warning("Location:Error fetching weather alerts from NOAA bad data") return my_settings.ERROR_FETCHING_DATA except Exception as e: logger.warning(f"Location:Error fetching active weather alert request error: {type(e).__name__}: {e}") return my_settings.ERROR_FETCHING_DATA alerts = "" alertxml = xml.dom.minidom.parseString(alert_data.text) for i in alertxml.getElementsByTagName("entry"): summary = i.getElementsByTagName("summary")[0].childNodes[0].nodeValue summary = summary.replace("\n\n", " ") summary = summary.replace("\n", " ") summary = summary.replace("*", "\n") alerts += ( i.getElementsByTagName("title")[0].childNodes[0].nodeValue + summary + "\n***\n" ) alerts = abbreviate_noaa(alerts) # trim the alerts to the first ALERT_COUNT alerts = alerts.split("\n***\n")[:my_settings.numWxAlerts] if alerts == "" or alerts == ['']: return my_settings.NO_ALERTS # trim off last newline if alerts[-1] == "\n": alerts = alerts[:-1] alerts = "\n".join(alerts) return alerts def getIpawsAlert(lat=0, lon=0, shortAlerts = False): # get the latest IPAWS alert from FEMA alert = '' alerts = [] linked_data = '' # set the API URL for IPAWS namespace = "urn:oasis:names:tc:emergency:cap:1.2" alert_url = "https://apps.fema.gov/IPAWSOPEN_EAS_SERVICE/rest/feed" # get the alerts from FEMA try: alert_data = requests.get(alert_url, timeout=my_settings.urlTimeoutSeconds) if not alert_data.ok: logger.warning(f"System: iPAWS fetching IPAWS alerts from FEMA (HTTP {alert_data.status_code})") return my_settings.ERROR_FETCHING_DATA except Exception as e: logger.warning(f"System: iPAWS fetching IPAWS alerts from FEMA failed: {e}") return my_settings.ERROR_FETCHING_DATA # main feed bulletins alertxml = 
xml.dom.minidom.parseString(alert_data.text) # extract alerts from main feed for entry in alertxml.getElementsByTagName("entry"): link = entry.getElementsByTagName("link")[0].getAttribute("href") ## state FIPS ## This logic is being added to reduce load on FEMA server. stateFips = None for cat in entry.getElementsByTagName("category"): if cat.getAttribute("label") == "statefips": stateFips = cat.getAttribute("term") break if stateFips is None: # no stateFIPS found — skip continue # check if it matches your list if stateFips not in my_settings.myStateFIPSList: #logger.debug(f"Skipping FEMA record link {link} with stateFIPS code of: {stateFips} because it doesn't match our StateFIPSList {my_settings.myStateFIPSList}") continue # skip to next entry try: # get the linked alert data from FEMA linked_data = requests.get(link, timeout=my_settings.urlTimeoutSeconds) if not linked_data.ok or not linked_data.text.strip(): # if the linked data is not ok, skip this alert #logger.warning(f"System: iPAWS Error fetching linked alert data from {link}") continue else: linked_xml = xml.dom.minidom.parseString(linked_data.text) # this alert is a full CAP alert except (requests.exceptions.RequestException): logger.warning(f"System: iPAWS Error fetching embedded alert data from {link}") continue except xml.parsers.expat.ExpatError: logger.warning(f"System: iPAWS Error parsing XML from {link}") continue except Exception as e: logger.debug(f"System: iPAWS Error processing alert data from {link}: {e}") continue for info in linked_xml.getElementsByTagName("info"): # only get en-US language alerts (alternative is es-US) language_nodes = info.getElementsByTagName("language") if not any(node.firstChild and node.firstChild.nodeValue.strip() == "en-US" for node in language_nodes): continue # skip if not en-US # extract values from XML sameVal = "NONE" geocode_value = "NONE" description = "" try: eventCode_table = info.getElementsByTagName("eventCode")[0] alertType = 
eventCode_table.getElementsByTagName("valueName")[0].childNodes[0].nodeValue alertCode = eventCode_table.getElementsByTagName("value")[0].childNodes[0].nodeValue headline = info.getElementsByTagName("headline")[0].childNodes[0].nodeValue # use headline if no description if info.getElementsByTagName("description") and info.getElementsByTagName("description")[0].childNodes: description = info.getElementsByTagName("description")[0].childNodes[0].nodeValue else: description = headline area_table = info.getElementsByTagName("area")[0] areaDesc = area_table.getElementsByTagName("areaDesc")[0].childNodes[0].nodeValue geocode_table = area_table.getElementsByTagName("geocode")[0] geocode_type = geocode_table.getElementsByTagName("valueName")[0].childNodes[0].nodeValue geocode_value = geocode_table.getElementsByTagName("value")[0].childNodes[0].nodeValue if geocode_type == "SAME": sameVal = geocode_value except Exception as e: logger.debug(f"System: iPAWS Error extracting alert data: {link}") #print(f"DEBUG: {info.toprettyxml()}") continue # check if the alert is for the SAME location, if wanted keep alert if (sameVal in my_settings.mySAMEList) or (geocode_value in my_settings.mySAMEList) or my_settings.mySAMEList == ['']: ignore_alert = False if my_settings.ignoreFEMAenable: ignore_alert = any( word.lower() in headline.lower() for word in my_settings.ignoreFEMAwords) if ignore_alert: logger.debug(f"System: Filtering FEMA Alert by WORD: {headline} containing one of {my_settings.ignoreFEMAwords} at {areaDesc}") if ignore_alert: continue # add to alert list alerts.append({ 'alertType': alertType, 'alertCode': alertCode, 'headline': headline, 'areaDesc': areaDesc, 'geocode_type': geocode_type, 'geocode_value': geocode_value, 'description': description }) else: logger.debug(f"System: iPAWS Alert not in SAME List: {sameVal} or {geocode_value} for {headline} at {areaDesc}") continue # return the numWxAlerts of alerts if len(alerts) > 0: for alertItem in 
alerts[:my_settings.numWxAlerts]: if shortAlerts: alert += abbreviate_noaa(f"🚨FEMA Alert: {alertItem['headline']}") else: alert += abbreviate_noaa(f"🚨FEMA Alert: {alertItem['headline']}\n{alertItem['description']}") # add a newline if not the last alert if alertItem != alerts[:my_settings.numWxAlerts][-1]: alert += "\n" else: alert = my_settings.NO_ALERTS return alert def get_flood_noaa(lat=0, lon=0, uid=None): """ Fetch the latest flood alert from NOAA for a given gauge UID. Returns a formatted string or an error message. """ api_url = "https://api.water.noaa.gov/nwps/v1/gauges/" headers = {'accept': 'application/json'} if not uid: logger.warning(f"Location:No flood gauge data found for UID {uid}") return my_settings.ERROR_FETCHING_DATA try: response = requests.get(api_url + str(uid), headers=headers, timeout=my_settings.urlTimeoutSeconds) if not response.ok: logger.warning(f"Location:Error fetching flood gauge data from NOAA for {uid} (HTTP {response.status_code})") return my_settings.ERROR_FETCHING_DATA data = response.json() if not data or 'status' not in data: logger.warning(f"Location:No flood gauge data found for UID {uid}") return "No flood gauge data found" except requests.exceptions.RequestException as e: logger.warning(f"Location:Error fetching flood gauge data from: {api_url}{uid} ({e})") return my_settings.ERROR_FETCHING_DATA except Exception as e: logger.warning(f"Location:Unexpected error: {e}") return my_settings.ERROR_FETCHING_DATA # extract values from JSON safely try: name = data.get('name', 'Unknown') observed = data['status'].get('observed', {}) forecast = data['status'].get('forecast', {}) flood_data = f"Flood Data {name}:\n" flood_data += f"Observed: {observed.get('primary', '?')}{observed.get('primaryUnit', '')} ({observed.get('secondary', '?')}{observed.get('secondaryUnit', '')}) risk: {observed.get('floodCategory', '?')}" flood_data += f"\nForecast: {forecast.get('primary', '?')}{forecast.get('primaryUnit', '')} ({forecast.get('secondary', 
'?')}{forecast.get('secondaryUnit', '')}) risk: {forecast.get('floodCategory', '?')}" #flood_data += f"\nStage: {data.get('stage', '?')} {data.get('stageUnit', '')}, Flow: {data.get('flow', '?')} {data.get('flowUnit', '')}" #flood_data += f"\nLast Updated: {data.get('status', {}).get('lastUpdated', '?')}" flood_data += f"\n" return flood_data except Exception as e: logger.debug(f"Location:Error extracting flood gauge data from NOAA for {uid}: {e}") return my_settings.ERROR_FETCHING_DATA def get_volcano_usgs(lat=0, lon=0): alerts = '' if lat == 0 and lon == 0: lat = my_settings.latitudeValue lon = my_settings.longitudeValue # get the latest volcano alert from USGS from CAP feed usgs_volcano_url = "https://volcanoes.usgs.gov/hans-public/api/volcano/getCapElevated" try: volcano_data = requests.get(usgs_volcano_url, timeout=my_settings.urlTimeoutSeconds) if not volcano_data.ok: logger.warning("System: Issue with fetching volcano alerts from USGS") return my_settings.ERROR_FETCHING_DATA except (requests.exceptions.RequestException): logger.warning("System: Issue with fetching volcano alerts from USGS") return my_settings.ERROR_FETCHING_DATA volcano_json = volcano_data.json() # extract alerts from main feed if volcano_json and isinstance(volcano_json, list): for alert in volcano_json: # check ignore list if my_settings.ignoreUSGSEnable: for word in my_settings.ignoreUSGSwords: if word.lower() in alert['volcano_name_appended'].lower(): logger.debug(f"System: Ignoring USGS Alert: {alert['volcano_name_appended']} containing {word}") continue # check if the alert lat long is within the range of bot latitudeValue and longitudeValue if (alert['latitude'] >= my_settings.latitudeValue - 10 and alert['latitude'] <= my_settings.latitudeValue + 10) and (alert['longitude'] >= my_settings.longitudeValue - 10 and alert['longitude'] <= my_settings.longitudeValue + 10): volcano_name = alert['volcano_name_appended'] alert_level = alert['alert_level'] color_code = alert['color_code'] 
cap_severity = alert['cap_severity'] synopsis = alert['synopsis'] # format Alert alerts += f"🌋🚨: {volcano_name}, {alert_level} {color_code}, {cap_severity}.\n{synopsis}\n" else: #logger.debug(f"System: USGS volcano alert not in range: {alert['volcano_name_appended']}") continue else: logger.debug("Location:Error fetching volcano data from USGS") return my_settings.NO_ALERTS if alerts == "": return my_settings.NO_ALERTS # trim off last newline if alerts[-1] == "\n": alerts = alerts[:-1] # return the alerts alerts = abbreviate_noaa(alerts) return alerts def get_nws_marine(zone, days=3): # forecast from NWS coastal products try: marine_pz_data = requests.get(zone, timeout=my_settings.urlTimeoutSeconds) if not marine_pz_data.ok: logger.warning(f"Location:Error fetching NWS Marine data (HTTP {marine_pz_data.status_code})") return my_settings.ERROR_FETCHING_DATA except requests.exceptions.RequestException as e: logger.warning(f"Location:Error fetching NWS Marine data: {e}") return my_settings.ERROR_FETCHING_DATA marine_pz_data = marine_pz_data.text todayDate = datetime.now().strftime("%Y%m%d") if marine_pz_data and marine_pz_data.startswith("Expires:"): try: expires = marine_pz_data.split(";;")[0].split(":")[1] expires_date = expires[:8] if expires_date < todayDate: logger.debug("Location: NWS Marine PZ data expired") return my_settings.ERROR_FETCHING_DATA except Exception as e: logger.debug(f"Location: NWS Marine PZ data parse error: {e}") return my_settings.ERROR_FETCHING_DATA else: logger.debug("Location: NWS Marine PZ data not valid or empty") return my_settings.ERROR_FETCHING_DATA # process the marine forecast data marine_pzz_lines = marine_pz_data.split("\n") marine_pz_report = "" day_blocks = [] current_block = "" in_forecast = False for line in marine_pzz_lines: if line.startswith(".") and "..." 
in line: in_forecast = True if current_block: day_blocks.append(current_block.strip()) current_block = "" current_block += line.strip() + " " elif in_forecast and line.strip() != "": current_block += line.strip() + " " if current_block: day_blocks.append(current_block.strip()) # Only keep up to pzDays blocks for block in day_blocks[:my_settings.coastalForecastDays]: marine_pz_report += block + "\n" # remove last newline if marine_pz_report.endswith("\n"): marine_pz_report = marine_pz_report[:-1] # remove NOAA EOF $$ if marine_pz_report.endswith("$$"): marine_pz_report = marine_pz_report[:-2].strip() # abbreviate the report marine_pz_report = abbreviate_noaa(marine_pz_report) if marine_pz_report == "": return my_settings.NO_DATA_NOGPS return marine_pz_report def checkUSGSEarthQuake(lat=0, lon=0): if lat == 0 and lon == 0: lat = my_settings.latitudeValue lon = my_settings.longitudeValue radius = 100 # km magnitude = 1.5 history = 7 # days startDate = datetime.fromtimestamp(datetime.now().timestamp() - history*24*60*60).strftime("%Y-%m-%d") USGSquake_url = f"https://earthquake.usgs.gov/fdsnws/event/1/query?&format=xml&latitude={lat}&longitude={lon}&maxradiuskm={radius}&minmagnitude={magnitude}&starttime={startDate}" description_text = "" quake_count = 0 # fetch the earthquake data from USGS try: quake_data = requests.get(USGSquake_url, timeout=my_settings.urlTimeoutSeconds) if not quake_data.ok: logger.warning("Location:Error fetching earthquake data from USGS") return my_settings.NO_ALERTS if not quake_data.text.strip(): return my_settings.NO_ALERTS try: quake_xml = xml.dom.minidom.parseString(quake_data.text) except Exception as e: logger.warning(f"Location: USGS earthquake API returned invalid XML: {e}") return my_settings.NO_ALERTS except (requests.exceptions.RequestException): logger.warning("Location:Error fetching earthquake data from USGS") return my_settings.NO_ALERTS quake_xml = xml.dom.minidom.parseString(quake_data.text) quake_count = 
# nodeID -> list of {'lat', 'lon', 'time'} track points, oldest first
howfarDB = {}


def _haversine_km(lat1, lon1, lat2, lon2, radius_km=6371):
    """Return the great-circle distance in km between two points given in degrees."""
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    dlat = phi2 - phi1
    dlon = math.radians(lon2) - math.radians(lon1)
    a = math.sin(dlat / 2)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlon / 2)**2
    # rounding can push sqrt(a) a hair above 1.0, which asin rejects
    return 2 * math.asin(min(1.0, math.sqrt(a))) * radius_km


def _initial_bearing_deg(lat1, lon1, lat2, lon2):
    """Return the initial compass bearing (0-360 degrees) from point 1 to point 2."""
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    dlon = math.radians(lon2) - math.radians(lon1)
    x = math.sin(dlon) * math.cos(phi2)
    y = math.cos(phi1) * math.sin(phi2) - (math.sin(phi1) * math.cos(phi2) * math.cos(dlon))
    return (math.degrees(math.atan2(x, y)) + 360) % 360


def _polygon_area_sqkm(ring):
    """Approximate the area in sq km enclosed by a ring of (lat, lon) degree pairs."""
    area = 0.0
    for (lat_a, lon_a), (lat_b, lon_b) in zip(ring, ring[1:]):
        phi_a = math.radians(lat_a)
        phi_b = math.radians(lat_b)
        area += (math.radians(lon_b) - math.radians(lon_a)) * (2 + math.sin(phi_a) + math.sin(phi_b))
    area = area * (6378137 ** 2) / 2.0
    return abs(area) / 1e6  # square metres -> square kilometres


def _centroid_deg(points):
    """Return the (lat, lon) in degrees of the mean unit vector of the given points."""
    x = y = z = 0.0
    for lat_deg, lon_deg in points:
        lat_rad = math.radians(lat_deg)
        lon_rad = math.radians(lon_deg)
        x += math.cos(lat_rad) * math.cos(lon_rad)
        y += math.cos(lat_rad) * math.sin(lon_rad)
        z += math.sin(lat_rad)
    count = len(points)
    x /= count
    y /= count
    z /= count
    lon_centroid = math.atan2(y, x)
    lat_centroid = math.atan2(z, math.sqrt(x * x + y * y))
    return math.degrees(lat_centroid), math.degrees(lon_centroid)


def distance(lat=0,lon=0,nodeID=0, reset=False):
    """Track a node's positions and describe its movement (the 'howfar' command).

    The first call for a node (or any call with reset=True) registers the
    starting point in howfarDB. Later calls report distance and bearing from
    the previous point, speed when more than a minute has passed, the total
    distance along the track and, once the track holds three or more points,
    an approximate area and centroid.

    lat, lon: current position in degrees; 0,0 is treated as "no GPS".
    nodeID: key for the track in howfarDB; 0 means none was supplied.
    reset: discard the stored track and start over from this position.

    Returns a message string (or my_settings.NO_DATA_NOGPS).
    """
    if lat == 0 and lon == 0:
        return my_settings.NO_DATA_NOGPS
    if nodeID == 0:
        return "No NodeID provided"

    now = datetime.now()
    if reset:
        howfarDB.pop(nodeID, None)

    if nodeID not in howfarDB:
        # register the first point of a new track
        howfarDB[nodeID] = [{'lat': lat, 'lon': lon, 'time': now}]
        if reset:
            return "Tracking reset, new starting point registered🗺️"
        return "Starting point registered🗺️"

    track = howfarDB[nodeID]
    last_point = track[-1]

    # a position identical to the last one is reported but not stored again
    dupe = last_point['lat'] == lat and last_point['lon'] == lon
    msg = "No New GPS📍 " if dupe else ""

    # distance from the last stored point
    distance_km = _haversine_km(last_point['lat'], last_point['lon'], lat, lon)
    if my_settings.use_metric:
        msg += f"{distance_km:.2f} km"
    else:
        distance_miles = distance_km * 0.621371
        msg += f"{distance_miles:.2f} miles"

    compass_bearing = _initial_bearing_deg(last_point['lat'], last_point['lon'], lat, lon)
    msg += f" 🧭{compass_bearing:.2f}° Bearing from last📍"

    # speed is only meaningful when more than a minute separates the points
    time_diff = now - last_point['time']
    if time_diff.total_seconds() > 60:
        hours = time_diff.total_seconds() / 3600
        if my_settings.use_metric:
            speed_str = f"{distance_km / hours:.2f} km/h"
        else:
            speed_str = f"{(distance_km * 0.621371) / hours:.2f} mph"
        msg += f", travel time: {int(time_diff.total_seconds()//60)} min, Speed: {speed_str}"

    # total = every stored leg plus the leg from the last stored point to here
    total_distance_km = sum(
        _haversine_km(p1['lat'], p1['lon'], p2['lat'], p2['lon'])
        for p1, p2 in zip(track, track[1:])
    )
    total_distance_km += distance_km
    if my_settings.use_metric:
        msg += f", Total: {total_distance_km:.2f} km"
    else:
        total_distance_miles = total_distance_km * 0.621371
        msg += f", Total: {total_distance_miles:.2f} miles"

    if not dupe:
        track.append({'lat': lat, 'lon': lon, 'time': now})

    if len(track) >= 3:
        # NOTE(review): the ring starts with the latest point of every OTHER
        # tracked node, then this node's track, and is closed with this node's
        # first point. Kept as-is; confirm mixing other nodes in is intended.
        points = [
            (other_track[-1]['lat'], other_track[-1]['lon'])
            for key, other_track in howfarDB.items()
            if key != nodeID
        ]
        points.extend((point['lat'], point['lon']) for point in track)
        points.append((track[0]['lat'], track[0]['lon']))

        area = _polygon_area_sqkm(points)
        if my_settings.use_metric:
            msg += f", Area: {area:.2f} sq.km (approx)"
        else:
            area_miles = area * 0.386102
            msg += f", Area: {area_miles:.2f} sq.mi (approx)"

        # the closing point duplicates the first track point, so leave it out
        lat_centroid, lon_centroid = _centroid_deg(points[:-1])
        msg += f", Centroid: {lat_centroid:.5f}, {lon_centroid:.5f}"

    return msg
""" def _to_float(v): try: # handle numeric and numeric-strings, treat empty/'N/A' as None if v is None: return None if isinstance(v, (int, float)): return float(v) s = str(v).strip() if s == "" or s.upper() == "N/A": return None return float(s) except Exception: return None try: # basic input validation/coercion try: lat = float(lat) lon = float(lon) except Exception: return False try: node_altitude = _to_float(node_altitude) or 0.0 except Exception: node_altitude = 0.0 if lat == 0 and lon == 0: return False box_size = 0.45 # approx 50km lamin = lat - box_size lamax = lat + box_size lomin = lon - box_size lomax = lon + box_size opensky_url = ( f"https://opensky-network.org/api/states/all?lamin={lamin}&lomin={lomin}" f"&lamax={lamax}&lomax={lomax}" ) try: aircraft_data = requests.get(opensky_url, timeout=my_settings.urlTimeoutSeconds) if not aircraft_data.ok: logger.warning("Location:Error fetching aircraft data from OpenSky Network") return False except (requests.exceptions.RequestException): logger.warning("Location:Error fetching aircraft data from OpenSky Network") return False aircraft_json = aircraft_data.json() if 'states' not in aircraft_json or not aircraft_json['states']: return False aircraft_list = aircraft_json['states'] logger.debug(f"Location: OpenSky Network: Found {len(aircraft_list)} possible aircraft in area") closest = None min_diff = float('inf') if len(aircraft_list) == 1: # Only one aircraft found; return normalized values (altitudes coerced to numbers or None) aircraft = aircraft_list[0] baro_alt = _to_float(aircraft[7]) # barometric altitude geo_alt = _to_float(aircraft[13]) # geometric altitude return { "callsign": aircraft[1].strip() if aircraft[1] else "N/A", "origin_country": aircraft[2] if aircraft[2] is not None else "N/A", "velocity": aircraft[9] if aircraft[9] is not None else "N/A", "true_track": aircraft[10] if aircraft[10] is not None else "N/A", "vertical_rate": aircraft[11] if aircraft[11] is not None else "N/A", "sensors": 
aircraft[12] if aircraft[12] is not None else "N/A", "altitude": baro_alt, "geo_altitude": geo_alt, "squawk": aircraft[14] if len(aircraft) > 14 and aircraft[14] is not None else "N/A", } for aircraft in aircraft_list: try: callsign = aircraft[1].strip() if aircraft[1] else "N/A" origin_country = aircraft[2] if aircraft[2] is not None else "N/A" velocity = aircraft[9] if aircraft[9] is not None else "N/A" true_track = aircraft[10] if aircraft[10] is not None else "N/A" vertical_rate = aircraft[11] if aircraft[11] is not None else "N/A" sensors = aircraft[12] if aircraft[12] is not None else "N/A" baro_altitude = _to_float(aircraft[7]) geo_altitude = _to_float(aircraft[13]) squawk = aircraft[14] if len(aircraft) > 14 and aircraft[14] is not None else "N/A" except Exception: logger.debug("Location:Error extracting aircraft data from OpenSky Network") continue # Prefer geo_altitude, fallback to baro_altitude plane_alt = geo_altitude if geo_altitude is not None else baro_altitude # skip if we can't get a numeric plane altitude or node_altitude is zero/unset if plane_alt is None or (node_altitude == 0 or node_altitude is None): continue # safe numeric diff try: diff = abs(float(plane_alt) - float(node_altitude)) except Exception: continue if diff <= altitude_window and diff < min_diff: min_diff = diff closest = { "callsign": callsign, "origin_country": origin_country, "velocity": velocity, "true_track": true_track, "vertical_rate": vertical_rate, "sensors": sensors, "altitude": baro_altitude, "geo_altitude": geo_altitude, "squawk": squawk, } if closest: return closest else: return False except Exception as e: logger.debug(f"SYSTEM: Location HighFly: Error processing OpenSky Network data: {e}") return False def get_public_location_admin_manage(): """Get the public_location_admin_manage setting directly from config file This ensures the setting is reloaded fresh on first load of the program """ import configparser config = configparser.ConfigParser() try: 
def initialize_locations_database():
    """Create or migrate the SQLite table that stores saved locations.

    A missing table is created. An existing table is migrated in place:
    a legacy UNIQUE constraint on location_name is removed by rebuilding the
    table, and the is_public / altitude columns are added when absent.
    A non-unique lookup index is created if it does not exist.

    Returns True on success, False if any step failed (the error is logged).
    """
    schema_columns = (
        'location_id', 'location_name', 'latitude', 'longitude', 'altitude',
        'description', 'userID', 'is_public', 'created_date', 'created_time',
    )
    table_schema = '''(location_id INTEGER PRIMARY KEY AUTOINCREMENT,
                      location_name TEXT NOT NULL,
                      latitude REAL NOT NULL,
                      longitude REAL NOT NULL,
                      altitude REAL,
                      description TEXT,
                      userID TEXT,
                      is_public INTEGER DEFAULT 0,
                      created_date TEXT,
                      created_time TEXT)'''
    conn = None
    try:
        # Ensure data directory exists
        db_dir = os.path.dirname(my_settings.locations_db)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

        conn = sqlite3.connect(my_settings.locations_db)
        c = conn.cursor()
        logger.debug("Location: Initializing locations database...")

        # one lookup tells us both whether the table exists and how it was defined
        c.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='locations'")
        table_row = c.fetchone()

        if table_row is None:
            # Table doesn't exist, create it without UNIQUE constraint
            c.execute(f"CREATE TABLE locations {table_schema}")
        else:
            c.execute("PRAGMA table_info(locations)")
            column_names = [col[1] for col in c.fetchall()]

            table_sql = table_row[0] or ""
            if 'UNIQUE' in table_sql.upper() and 'location_name' in table_sql:
                logger.debug("Location: Removing UNIQUE constraint from locations table")
                # a previous failed attempt may have left the scratch table behind
                c.execute("DROP TABLE IF EXISTS locations_new")
                c.execute(f"CREATE TABLE locations_new {table_schema}")
                # copy only columns the old table really has, so an old table
                # without is_public/altitude migrates and existing altitude
                # values are not lost; names come from the fixed tuple above
                shared = [col for col in schema_columns if col in column_names]
                select_exprs = [
                    "COALESCE(is_public, 0)" if col == 'is_public' else col
                    for col in shared
                ]
                c.execute(
                    f"INSERT INTO locations_new ({', '.join(shared)}) "
                    f"SELECT {', '.join(select_exprs)} FROM locations"
                )
                c.execute("DROP TABLE locations")
                c.execute("ALTER TABLE locations_new RENAME TO locations")
                logger.debug("Location: Successfully removed UNIQUE constraint")
                # Refresh column list after table recreation
                c.execute("PRAGMA table_info(locations)")
                column_names = [col[1] for col in c.fetchall()]

            # Add is_public column if it doesn't exist (migration)
            if 'is_public' not in column_names:
                try:
                    c.execute('''ALTER TABLE locations ADD COLUMN is_public INTEGER DEFAULT 0''')
                    logger.debug("Location: Added is_public column to locations table")
                except sqlite3.OperationalError:
                    # Column might already exist, ignore
                    pass

            # Add altitude column if it doesn't exist (migration)
            if 'altitude' not in column_names:
                try:
                    c.execute('''ALTER TABLE locations ADD COLUMN altitude REAL''')
                    logger.debug("Location: Added altitude column to locations table")
                except sqlite3.OperationalError:
                    # Column might already exist, ignore
                    pass

        # Create index for faster lookups (non-unique)
        c.execute('''CREATE INDEX IF NOT EXISTS idx_location_name_user
                     ON locations(location_name, userID, is_public)''')
        conn.commit()
        return True
    except Exception as e:
        logger.error(f"Location: Failed to initialize locations database: {e}")
        return False
    finally:
        # closing without commit discards any half-finished migration
        if conn is not None:
            conn.close()
sqlite3.connect(my_settings.locations_db) c = conn.cursor() location_name_clean = location_name.strip() # Check for conflicts # 1. Check if user already has a location with this name (private or public) c.execute('''SELECT location_id, is_public FROM locations WHERE location_name = ? AND userID = ?''', (location_name_clean, userID)) user_existing = c.fetchone() if user_existing: conn.close() return False, f"Location '{location_name}' already exists for your node", None # 2. Check if there's a public location with this name # Note: We allow public locations to overlap with other users' private locations # Only check for existing public locations to prevent duplicate public locations c.execute('''SELECT location_id, userID, description FROM locations WHERE location_name = ? AND is_public = 1''', (location_name_clean,)) public_existing = c.fetchone() if public_existing: if not is_public: # User is trying to create private location but public one exists # They can use "map public " to access the public location conn.close() return False, f"Public location '{location_name}' already exists. Use 'map public {location_name}' to access it.", None else: # User is trying to create public location but one already exists # Only one public location per name globally conn.close() return False, f"Public location '{location_name}' already exists", None # 3. 
def _query_one_location(cursor, where_sql, params):
    """Run the shared location SELECT with the given WHERE clause.

    where_sql must be a fixed string written in this module (never user
    input); values are bound through params. Returns the first matching row
    as a dict, or None when nothing matches.
    """
    cursor.execute(
        "SELECT location_name, latitude, longitude, altitude, description, "
        "userID, is_public, created_date, created_time "
        f"FROM locations WHERE {where_sql}",
        params,
    )
    row = cursor.fetchone()
    if not row:
        return None
    return {
        'name': row[0],
        'lat': row[1],
        'lon': row[2],
        'altitude': row[3],
        'description': row[4],
        'userID': row[5],
        'is_public': bool(row[6]),
        'created_date': row[7],
        'created_time': row[8]
    }


def get_location_from_db(location_name, userID=None):
    """Retrieve a saved location by name.

    When userID is given, that user's private location takes precedence;
    otherwise (or when there is none) the public location of that name is
    returned.

    Returns a dict with name, lat, lon, altitude, description, userID,
    is_public, created_date and created_time, or None when nothing matches
    or the lookup fails (the error is logged).
    """
    conn = None
    try:
        conn = sqlite3.connect(my_settings.locations_db)
        c = conn.cursor()
        location_name_clean = location_name.strip()

        # First, try to get user's private location
        if userID:
            private = _query_one_location(
                c,
                "location_name = ? AND userID = ? AND is_public = 0",
                (location_name_clean, userID),
            )
            if private:
                return private

        # Then try public location
        return _query_one_location(
            c, "location_name = ? AND is_public = 1", (location_name_clean,)
        )
    except Exception as e:
        logger.error(f"Location: Failed to retrieve location: {e}")
        return None
    finally:
        # close on every path, including when a query raises
        if conn is not None:
            conn.close()


def get_public_location_from_db(location_name):
    """Retrieve only the public location with this name (private ones are ignored).

    Returns the same dict shape as get_location_from_db, or None when there
    is no public location of that name or the lookup fails (the error is
    logged).
    """
    conn = None
    try:
        conn = sqlite3.connect(my_settings.locations_db)
        c = conn.cursor()
        location_name_clean = location_name.strip()
        return _query_one_location(
            c, "location_name = ? AND is_public = 1", (location_name_clean,)
        )
    except Exception as e:
        logger.error(f"Location: Failed to retrieve public location: {e}")
        return None
    finally:
        if conn is not None:
            conn.close()
AND is_public = 0) OR is_public = 1 ORDER BY is_public ASC, location_name''', (userID,)) else: # Get all public locations only c.execute('''SELECT location_name, latitude, longitude, altitude, description, is_public, created_date FROM locations WHERE is_public = 1 ORDER BY location_name''') results = c.fetchall() # Get ALL results, no limit conn.close() if not results: return "No saved locations found" locations_list = f"Saved Locations ({len(results)} total):\n" # Return ALL results, not limited for result in results: is_public = bool(result[5]) visibility = "🌐Public" if is_public else "🔒Private" locations_list += f" • {result[0]} ({result[1]:.5f}, {result[2]:.5f})" if result[3] is not None: # altitude locations_list += f" @ {result[3]:.1f}m" locations_list += f" [{visibility}]" if result[4]: # description locations_list += f" - {result[4]}" locations_list += "\n" return locations_list.strip() except Exception as e: logger.error(f"Location: Failed to list locations: {e}") return f"Error listing locations: {e}" def delete_location_from_db(location_name, userID=""): """Delete a location from the database Returns: (success, message) """ try: if not location_name or not location_name.strip(): return False, "Location name cannot be empty" conn = sqlite3.connect(my_settings.locations_db) c = conn.cursor() location_name_clean = location_name.strip() # Check if location exists - prioritize user's private location, then public # First try to get user's private location c.execute('''SELECT location_id, userID, is_public FROM locations WHERE location_name = ? AND userID = ? AND is_public = 0''', (location_name_clean, userID)) location = c.fetchone() # If not found, try public location if not location: c.execute('''SELECT location_id, userID, is_public FROM locations WHERE location_name = ? 
AND is_public = 1''', (location_name_clean,)) location = c.fetchone() # If still not found, try any location (for admin delete) if not location: c.execute('''SELECT location_id, userID, is_public FROM locations WHERE location_name = ? LIMIT 1''', (location_name_clean,)) location = c.fetchone() if not location: conn.close() return False, f"Location '{location_name}' not found" location_id, location_userID, is_public = location # Check permissions # Users can only delete their own private locations # Admins can delete any location if delete_public_locations_admins_only is enabled is_admin = False if get_delete_public_locations_admins_only(): from modules.system import isNodeAdmin is_admin = isNodeAdmin(userID) # Check if user owns this location is_owner = (str(location_userID) == str(userID)) # Determine if deletion is allowed can_delete = False if is_public: # Public locations: only admins can delete if admin-only is enabled if get_delete_public_locations_admins_only(): can_delete = is_admin else: # If not admin-only, then anyone can delete public locations can_delete = True else: # Private locations: owner can always delete can_delete = is_owner if not can_delete: conn.close() if is_public and get_delete_public_locations_admins_only(): return False, "Only admins can delete public locations." else: return False, f"You can only delete your own locations. This location belongs to another user." 
def calculate_heading_and_distance(lat1, lon1, lat2, lon2):
    """Calculate heading (bearing) and distance between two points.

    lat1, lon1: current position in degrees.
    lat2, lon2: target position in degrees.
    A position of exactly 0,0 is treated as "not available".

    Returns (compass_bearing_degrees, distance_km, None) on success, or
    (None, None, error_message) when either position is unavailable.
    """
    if lat1 == 0 and lon1 == 0:
        return None, None, "Current location not available"
    if lat2 == 0 and lon2 == 0:
        return None, None, "Target location not available"

    earth_radius_km = 6371
    lat1_rad = math.radians(lat1)
    lon1_rad = math.radians(lon1)
    lat2_rad = math.radians(lat2)
    lon2_rad = math.radians(lon2)
    dlon = lon2_rad - lon1_rad
    dlat = lat2_rad - lat1_rad

    # Haversine formula
    a = math.sin(dlat / 2)**2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon / 2)**2
    # for near-antipodal points rounding can push sqrt(a) just above 1.0,
    # which math.asin rejects with ValueError, so clamp it
    c = 2 * math.asin(min(1.0, math.sqrt(a)))
    distance_km = c * earth_radius_km

    # initial bearing, normalised to 0-360 degrees
    x = math.sin(dlon) * math.cos(lat2_rad)
    y = math.cos(lat1_rad) * math.sin(lat2_rad) - (math.sin(lat1_rad) * math.cos(lat2_rad) * math.cos(dlon))
    compass_bearing = (math.degrees(math.atan2(x, y)) + 360) % 360

    return compass_bearing, distance_km, None
def _map_cardinal(bearing_rounded):
    """Return the compass label used in map replies for a whole-degree bearing."""
    exact = {0: "N", 360: "N", 90: "E", 180: "S", 270: "W"}
    if bearing_rounded in exact:
        return exact[bearing_rounded]
    if 0 < bearing_rounded < 90:
        return "NE"
    if 90 < bearing_rounded < 180:
        return "SE"
    if 180 < bearing_rounded < 270:
        return "SW"
    if 270 < bearing_rounded < 360:
        return "NW"
    return ""


def _map_heading_reply(saved_location, lat, lon, has_fix, userID, deviceID, public=False):
    """Build the reply describing how to reach saved_location from lat/lon.

    has_fix says whether lat/lon is a usable current position; without one
    only the saved coordinates are reported. public adds the "(Public)" tag
    to the location name.
    """
    label = f"{saved_location['name']} (Public)" if public else f"{saved_location['name']}"

    if not has_fix:
        result = f"📍{label}: {saved_location['lat']:.5f}, {saved_location['lon']:.5f}"
        if saved_location.get('altitude') is not None:
            result += f" @ {saved_location['altitude']:.1f}m"
        result += "\n🚫Current location not available for heading"
        return result

    bearing, distance_km, error = calculate_heading_and_distance(
        lat, lon, saved_location['lat'], saved_location['lon']
    )
    if error:
        return f"📍{label}: {error}"

    # Format distance
    if my_settings.use_metric:
        distance_str = f"{distance_km:.2f} km"
    else:
        distance_miles = distance_km * 0.621371
        if distance_miles < 0.25:
            # Convert to feet for short distances
            distance_str = f"{distance_miles * 5280:.0f} ft"
        else:
            distance_str = f"{distance_miles:.2f} miles"

    bearing_rounded = round(bearing)
    result = f"📍{label}\n"
    result += f"🧭Heading: {bearing_rounded}° {_map_cardinal(bearing_rounded)}\n"
    result += f"📏Distance: {distance_str}"

    # altitude difference = saved location minus the node's current altitude
    current_altitude = get_node_altitude(userID, deviceID)
    saved_altitude = saved_location.get('altitude')
    if current_altitude is not None and saved_altitude is not None:
        altitude_diff_ft_rounded = round((saved_altitude - current_altitude) * 3.28084)
        if altitude_diff_ft_rounded > 0:
            result += f"\n⛰️Altitude: +{altitude_diff_ft_rounded}ft"  # saved location is higher
        elif altitude_diff_ft_rounded < 0:
            result += f"\n⛰️Altitude: {altitude_diff_ft_rounded}ft"  # negative already has -
        else:
            result += f"\n⛰️Altitude: ±0ft"

    if saved_location['description']:
        result += f"\n📝{saved_location['description']}"
    return result


def mapHandler(userID, deviceID, channel_number, message, snr, rssi, hop):
    """Handle 'map' commands from meshbot and return the reply text.

    Sub-commands (text after the leading "map"):
        help                       - show the command summary
        save NAME [description]    - save current location as private
        save public NAME [desc]    - save current location as public
        list                       - list saved locations
        delete NAME                - delete a location
        public NAME                - heading to the public location NAME
        log DESCRIPTION            - log current location to CSV (legacy)
        NAME                       - heading and distance to a saved location

    snr, rssi and hop, when not None, are appended to saved/logged
    descriptions. channel_number is accepted but not used here.
    """
    from modules.system import get_node_location

    command = str(message[len("map"):].strip())  # Ensure command is always a string
    lowered = command.lower()

    location = get_node_location(userID, deviceID)
    # validate before indexing so a missing position yields the normal
    # "location not available" replies instead of an exception
    try:
        lat, lon = location[0], location[1]
        location_ok = len(location) == 2
    except (TypeError, IndexError, KeyError):
        lat = lon = 0
        location_ok = False
    has_fix = location_ok and lat != 0 and lon != 0

    if lowered == "help":
        return (
            f"'map save  [description]' - Save private\n"
            f"'map save public  [desc]' - Save public\n"
            f"'map ' - heading to saved\n"
            f"'map public ' - heading to public\n"
            f"'map delete ' \n"
            f"'map list' - List\n"
            f"'map log ' - Log CSV\n"
        )

    # Handle "save" command
    if lowered.startswith("save "):
        save_cmd = command[5:].strip()
        is_public = False
        # Check for "public" keyword
        if save_cmd.lower().startswith("public "):
            is_public = True
            save_cmd = save_cmd[7:].strip()  # Remove "public " prefix

        parts = save_cmd.split(" ", 1)
        if len(parts) < 1 or not parts[0]:
            if is_public:
                return "🚫Usage: map save public  [description]"
            return "🚫Usage: map save  [description]"

        location_name = parts[0]
        description = parts[1] if len(parts) > 1 else ""

        # Add SNR/RSSI and hop info to description if available
        extras = []
        if snr is not None and rssi is not None:
            extras.append(f"SNR:{snr}dB RSSI:{rssi}dBm")
        if hop is not None:
            extras.append(f"Meta:{hop}")
        description = " ".join(([description] if description else []) + extras)

        if not has_fix:
            return "🚫Location data is missing or invalid."

        # Get altitude for the node
        altitude = get_node_altitude(userID, deviceID)
        success, msg, _ = save_location_to_db(location_name, lat, lon, description, str(userID), is_public, altitude)
        return f"📍{msg}" if success else f"🚫{msg}"

    # Handle "list" command
    if lowered == "list":
        return list_locations_from_db(str(userID))

    # Handle "delete" command
    if lowered.startswith("delete "):
        location_name = command[7:].strip()  # Remove "delete " prefix
        if not location_name:
            return "🚫Usage: map delete "
        success, msg = delete_location_from_db(location_name, str(userID))
        return f"🗑️{msg}" if success else f"🚫{msg}"

    # Handle "public" command (public location even if user has a private one with the same name)
    if lowered.startswith("public "):
        location_name = command[7:].strip()  # Remove "public " prefix
        if not location_name:
            return "🚫Usage: map public "
        saved_location = get_public_location_from_db(location_name)
        if saved_location:
            return _map_heading_reply(saved_location, lat, lon, has_fix, userID, deviceID, public=True)
        return f"🚫Public location '{location_name}' not found."

    # Handle "log" command for CSV logging
    if lowered.startswith("log "):
        description = command[4:].strip()  # Remove "log " prefix
        if not description:
            description = "Logged:"
        # Sanitize description for CSV injection
        if description and description[0] in ('=', '+', '-', '@'):
            description = "'" + description
        if snr is not None and rssi is not None:
            description += f" SNR:{snr}dB RSSI:{rssi}dBm"
        if hop is not None:
            description += f" Meta:{hop}"

        # location should be a tuple: (lat, lon)
        if not location_ok:
            return "🚫Location data is missing or invalid."

        success = log_locationData_toMap(userID, location, description)
        if success:
            return f"📍Location logged (CSV)"
        return "🚫Failed to log location. Please try again."

    # Handle location name lookup (get heading)
    if command:
        location_name = command
        saved_location = get_location_from_db(location_name, str(userID))
        if saved_location:
            return _map_heading_reply(saved_location, lat, lon, has_fix, userID, deviceID)
        return f"🚫Location '{location_name}' not found. Use 'map list' to see available locations."

    # Empty command - show help
    return "🗺️Use 'map help' for help"


# Initialize the locations database when module is imported
initialize_locations_database()