Files
meshing-around/modules/locationdata.py
2026-02-27 14:37:32 -08:00

1998 lines
85 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# helper functions to use location data for the API for NOAA weather, FEMA iPAWS, and repeater data
# K7MHI Kelly Keeton 2024
import json # pip install json
from geopy.geocoders import Nominatim # pip install geopy
import maidenhead as mh # pip install maidenhead
import requests # pip install requests
import bs4 as bs # pip install beautifulsoup4
import xml.dom.minidom # used for parsing XML
import xml.parsers.expat # used for parsing XML
from datetime import datetime
from modules.log import logger
import modules.settings as my_settings
import math
import csv
import os
import sqlite3
# Command keywords that route an incoming mesh message to this module's handlers.
trap_list_location = ("whereami", "wx", "wxa", "wxalert", "rlist", "ea", "ealert", "riverflow", "valert", "earthquake", "howfar", "map",)
def where_am_i(lat=0, lon=0, short=False, zip=False):
    """
    Reverse-geocode lat/lon via Nominatim into a human-readable string.

    lat, lon: coordinates; str or numeric accepted.
    short: return only a City/State/County/Country summary.
    zip: return only the postal code.
    Returns my_settings.NO_DATA_NOGPS when both coordinates are 0, and
    my_settings.ERROR_FETCHING_DATA on network or lookup failure.
    Full results end with the maidenhead grid square.
    """
    whereIam = ""
    grid = mh.to_maiden(float(lat), float(lon))
    if int(float(lat)) == 0 and int(float(lon)) == 0:
        logger.error("Location: No GPS data, try sending location")
        return my_settings.NO_DATA_NOGPS
    # initialize Nominatim API
    geolocator = Nominatim(user_agent="mesh-bot")
    try:
        # Always build the reverse-geocode query with formatting so numeric
        # lat/lon don't raise TypeError (the old code concatenated raw values)
        query = f"{lat}, {lon}"
        if short:
            location = geolocator.reverse(query)
            address = location.raw['address']
            whereIam = f"City: {address.get('city', '')}. State: {address.get('state', '')}. County: {address.get('county', '')}. Country: {address.get('country', '')}."
            return whereIam
        if zip:
            # return a string with zip code only
            location = geolocator.reverse(query)
            whereIam = location.raw['address'].get('postcode', '')
            return whereIam
        if float(lat) == my_settings.latitudeValue and float(lon) == my_settings.longitudeValue:
            # redacted address (no house number/road) when no GPS and using default location
            address_components = {
                'city': 'City',
                'state': 'State',
                'postcode': 'Zip',
                'county': 'County',
                'country': 'Country'
            }
        else:
            address_components = {
                'house_number': 'Number',
                'road': 'Road',
                'city': 'City',
                'state': 'State',
                'postcode': 'Zip',
                'county': 'County',
                'country': 'Country'
            }
        location = geolocator.reverse(query)
        address = location.raw['address']
        whereIam += ', '.join([f"{label}: {address.get(component, '')}" for component, label in address_components.items() if component in address])
        whereIam += f", Grid: " + grid
        return whereIam
    except Exception as e:
        logger.debug("Location:Error fetching location data with whereami, likely network error")
        return my_settings.ERROR_FETCHING_DATA
def getRepeaterBook(lat=0, lon=0):
    """
    Scrape RepeaterBook's proximity search for repeaters near the maidenhead
    grid of lat/lon.

    Returns up to four formatted repeater entries (one per line), or a
    status string ("No Data for your Region" / "No repeaters found 😔" /
    ERROR_FETCHING_DATA) when nothing could be scraped.
    """
    grid = mh.to_maiden(float(lat), float(lon))
    data = []
    msg = ''
    # choose the US vs. international endpoint from the grid field letters
    usapi = "https://www.repeaterbook.com/repeaters/prox_result.php?"
    elsewhereapi = "https://www.repeaterbook.com/row_repeaters/prox2_result.php?"
    if grid[:2] in ['CN', 'DN', 'EN', 'FN', 'CM', 'DM', 'EM', 'FM', 'DL', 'EL', 'FL']:
        repeater_url = usapi
        logger.debug("Location: Fetching repeater data from RepeaterBook US API for grid " + grid)
    else:
        repeater_url = elsewhereapi
        logger.debug("Location: Fetching repeater data from RepeaterBook International API for grid " + grid)
    repeater_url += f"city={grid}&lat=&long=&distance=50&Dunit=m&band%5B%5D=4&band%5B%5D=16&freq=&call=&mode%5B%5D=1&mode%5B%5D=2&mode%5B%5D=4&mode%5B%5D=64&status_id=1&use=%25&use=OPEN&order=distance_calc%2C+state_id+ASC"
    try:
        user_agent = {'User-agent': 'Mozilla/5.0'}
        response = requests.get(repeater_url, headers=user_agent, timeout=my_settings.urlTimeoutSeconds)
        # Fail early on bad HTTP status
        if response.status_code != 200:
            logger.error(f"Location:Error fetching repeater data from {repeater_url} with status code {response.status_code}")
            return my_settings.ERROR_FETCHING_DATA
        soup = bs.BeautifulSoup(response.text, 'html.parser')
        # match the repeater table by presence of the "sortable" class (class order/extra classes may vary)
        table = soup.select_one('table.sortable')
        if table is not None:
            cells = table.find_all('td')
            # each repeater row spans 11 cells; the 11th column is unused here.
            # The old per-field `if i+k < len(cells)` checks were dead code
            # under the full-row guard, so a simple slice + zip replaces them.
            fields = ('frequency', 'offset', 'tone', 'call_sign', 'location',
                      'state', 'use', 'mode', 'distance', 'direction')
            for i in range(0, len(cells), 11):
                row = cells[i:i + 11]
                if len(row) == 11:  # skip a trailing partial row (avoids IndexError)
                    data.append({name: cell.text.strip() for name, cell in zip(fields, row)})
        else:
            # No table found — could be legitimately no data or markup change.
            logger.debug("Location: No repeater table found on RepeaterBook page, scraping failed or no data for region.")
            msg = "No Data for your Region"
    except Exception as e:
        msg = "No repeaters found 😔"
    # Limit the output to the first 4 repeaters. Using join fixes the old
    # separator bug where duplicate entries (compared by value) lost newlines.
    lines = []
    for repeater in data[:4]:
        tmpTone = repeater['tone'].replace(" /", "")
        lines.append(f"{repeater['call_sign']}📶{repeater['frequency']}{repeater['offset']},{tmpTone}.{repeater['mode']}")
    msg += '\n'.join(lines)
    return msg
def getArtSciRepeaters(lat=0, lon=0):
    """
    Look up repeaters near the US ZIP code of lat/lon via artscipub.com.

    Resolves the ZIP with where_am_i(zip=True); non-numeric ZIPs (non-US)
    yield "no results.. sorry".  Returns a newline-separated summary of the
    scraped rows, or the sentinel from where_am_i / ERROR_FETCHING_DATA.
    """
    repeaters = []
    zipCode = where_am_i(lat, lon, zip=True)
    if zipCode == my_settings.NO_DATA_NOGPS or zipCode == my_settings.ERROR_FETCHING_DATA:
        return zipCode
    if zipCode.isnumeric():
        artsci_url = f"http://www.artscipub.com/mobile/showstate.asp?zip={zipCode}"
        try:
            response = requests.get(artsci_url, timeout=my_settings.urlTimeoutSeconds)
            if response.status_code != 200:
                # BUG FIX: previously only logged and kept parsing the error page
                logger.error(f"Location:Error fetching data from {artsci_url} with status code {response.status_code}")
                return my_settings.ERROR_FETCHING_DATA
            soup = bs.BeautifulSoup(response.text, 'html.parser')
            # results needed xpath is /html/body/table[2]/tbody/tr/td/table/tbody/tr[2]/td/table
            table = soup.find_all('table')[1]
            for row in table.find_all('tr'):
                cols = [ele.text.strip() for ele in row.find_all('td')]
                # skip header rows ('Location') and the 'located' banner rows
                if not any('located' in ele for ele in cols):
                    if not any('Location' in ele for ele in cols):
                        repeaters.append([ele for ele in cols if ele])
        except Exception as e:
            logger.error(f"Error fetching data from {artsci_url}: {e}")
    if repeaters:
        lines = []
        for repeater in repeaters:
            # row format is ['City', 'Frequency', 'Offset', 'PL', 'Call', 'Notes']
            # but rows frequently arrive with trailing elements missing;
            # rows with no recognized shape are skipped (previously they
            # produced blank output lines)
            if len(repeater) == 2:
                lines.append(f"Freq:{repeater[1]}")
            elif len(repeater) == 3:
                lines.append(f"Freq:{repeater[1]}, PL:{repeater[2]}")
            elif len(repeater) == 4:
                lines.append(f"Freq:{repeater[1]}, PL:{repeater[2]}, ID: {repeater[3]}")
            elif len(repeater) == 5:
                lines.append(f"Freq:{repeater[1]}, PL:{repeater[2]}, ID:{repeater[3]}")
            elif len(repeater) == 6:
                lines.append(f"Freq:{repeater[1]}, PL:{repeater[2]}, ID:{repeater[3]}. {repeater[5]}")
        msg = f"Found:{len(repeaters)} in {zipCode}\n" + "\n".join(lines)
    else:
        msg = f"no results.. sorry"
    return msg
def get_NOAAtide(lat=0, lon=0):
    """
    Return today's NOAA high/low tide predictions for the station nearest
    lat/lon (bot default location used when both are 0).

    Uses the CO-OPS station-lookup API to find a station within 50 units,
    then fetches today's hilo predictions.  Units follow
    my_settings.use_metric; times follow my_settings.zuluTime.
    Returns a formatted table string or an error sentinel.
    """
    station_id = ""
    if float(lat) == 0 and float(lon) == 0:
        lat = my_settings.latitudeValue
        lon = my_settings.longitudeValue
    station_lookup_url = "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi/tidepredstations.json?lat=" + str(lat) + "&lon=" + str(lon) + "&radius=50"
    try:
        station_data = requests.get(station_lookup_url, timeout=my_settings.urlTimeoutSeconds)
        if station_data.ok:
            station_json = station_data.json()
        else:
            logger.error("Location:Error fetching tide station table from NOAA")
            return my_settings.ERROR_FETCHING_DATA
        if station_json['stationList'] == [] or station_json['stationList'] is None:
            logger.error("Location:No tide station found")
            return "No tide station found with info provided"
        station_id = station_json['stationList'][0]['stationId']
    except (requests.exceptions.RequestException, json.JSONDecodeError):
        logger.error("Location:Error fetching tide station table from NOAA")
        return my_settings.ERROR_FETCHING_DATA
    station_url = "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=today&time_zone=lst_ldt&datum=MLLW&product=predictions&interval=hilo&format=json&station=" + station_id
    if my_settings.use_metric:
        station_url += "&units=metric"
    else:
        station_url += "&units=english"
    try:
        tide_data = requests.get(station_url, timeout=my_settings.urlTimeoutSeconds)
        if tide_data.ok:
            tide_json = tide_data.json()
        else:
            logger.error("Location:Error fetching tide data from NOAA")
            return my_settings.ERROR_FETCHING_DATA
    except (requests.exceptions.RequestException, json.JSONDecodeError):
        logger.error("Location:Error fetching tide data from NOAA")
        return my_settings.ERROR_FETCHING_DATA
    predictions = tide_json['predictions']
    # format tide data into a table string for mesh; date comes from the first 't' value
    tide_date = predictions[0]['t'].split(" ")[0]
    tide_table = "Tide Data for " + tide_date + "\n"
    for tide in predictions:
        tide_time = tide['t'].split(" ")[1]
        if not my_settings.zuluTime:
            # convert to 12 hour clock
            hour = int(tide_time.split(":")[0])
            minute = tide_time.split(":")[1]
            # BUG FIX: noon (hour 12) was labeled AM and midnight (hour 0)
            # was shown as "00:xx AM" instead of "12:xx AM"
            if hour > 12:
                tide_time = str(hour - 12) + ":" + minute + " PM"
            elif hour == 12:
                tide_time = tide_time + " PM"
            elif hour == 0:
                tide_time = "12:" + minute + " AM"
            else:
                tide_time = tide_time + " AM"
        tide_table += tide['type'] + " " + tide_time + ", " + tide['v'] + "\n"
    # remove last newline
    return tide_table[:-1]
def get_NOAAweather(lat=0, lon=0, unit=0, report_days=None):
    """
    Build a detailed NOAA forecast string for lat/lon (bot default when 0,0).

    report_days: number of forecast periods to include; defaults to
        my_settings.forecastDuration.
    unit: legacy parameter; forced to 1 when my_settings.use_metric is set
        (metric output is not implemented for the new API — see debug log).
    Prepends a local-alert count and the first alert when any are active.
    Returns the forecast string or ERROR_FETCHING_DATA.
    """
    weather = ""
    if float(lat) == 0 and float(lon) == 0:
        lat = my_settings.latitudeValue
        lon = my_settings.longitudeValue
    if report_days is None:
        report_days = my_settings.forecastDuration
    # get weather data from NOAA units for metric unit = 1 is metric
    if my_settings.use_metric:
        unit = 1
        logger.debug("Location: new API metric units not implemented yet")
    weather_api = "https://api.weather.gov/points/" + str(lat) + "," + str(lon)
    try:
        weather_data = requests.get(weather_api, timeout=my_settings.urlTimeoutSeconds)
        if not weather_data.ok:
            logger.warning("Location:Error fetching weather data from NOAA for location")
            return my_settings.ERROR_FETCHING_DATA
    except Exception as e:
        # BUG FIX: previously logged the literal `Exception` class, not the error
        logger.warning(f"Location:Error fetching weather data malformed: {e}")
        return my_settings.ERROR_FETCHING_DATA
    # get the forecast URL from the JSON response (guarded: a malformed
    # payload previously raised uncaught KeyError/JSONDecodeError here)
    try:
        weather_json = weather_data.json()
        forecast_url = weather_json['properties']['forecast']
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        logger.warning(f"Location:Error parsing weather point data from NOAA: {e}")
        return my_settings.ERROR_FETCHING_DATA
    try:
        forecast_data = requests.get(forecast_url, timeout=my_settings.urlTimeoutSeconds)
        if not forecast_data.ok:
            logger.warning("Location:Error fetching weather forecast from NOAA")
            return my_settings.ERROR_FETCHING_DATA
    except Exception as e:
        logger.warning(f"Location:Error fetching weather data missing: {e}")
        return my_settings.ERROR_FETCHING_DATA
    # from periods, get the detailedForecast for report_days periods
    try:
        forecast = forecast_data.json()['properties']['periods']
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        logger.warning(f"Location:Error parsing weather forecast from NOAA: {e}")
        return my_settings.ERROR_FETCHING_DATA
    for day in forecast[:report_days]:
        # abbreviate the forecast to save airtime
        weather += abbreviate_noaa(day['name']) + ": " + abbreviate_noaa(day['detailedForecast']) + "\n"
    # remove last newline
    weather = weather[:-1]
    # get any alerts and return the count
    alerts = getWeatherAlertsNOAA(lat, lon)
    if alerts == my_settings.ERROR_FETCHING_DATA or alerts == my_settings.NO_DATA_NOGPS or alerts == my_settings.NO_ALERTS:
        alert = ""
        alert_num = 0
    else:
        alert = alerts[0]
        alert_num = alerts[1]
    if int(alert_num) > 0:
        # add the alert count warning to the weather
        weather = str(alert_num) + " local alerts!\n" + weather + "\n" + alert
    return weather
def case_insensitive_replace(text, old, new):
    """
    Replace all occurrences of *old* (matched case-insensitively) in *text*
    with *new*, inserted verbatim.

    BUG FIX: an empty *old* now returns *text* unchanged — the scan used
    str.find, and find("") matches at every position, so the old code
    looped (and grew the string) forever.
    """
    if not old:
        return text
    idx = 0
    old_lower = old.lower()
    text_lower = text.lower()
    while True:
        idx = text_lower.find(old_lower, idx)
        if idx == -1:
            break
        text = text[:idx] + new + text[idx + len(old):]
        # re-lower the mutated string so later searches stay index-aligned
        text_lower = text.lower()
        # skip past the replacement so `new` containing `old` can't recurse
        idx += len(new)
    return text
def abbreviate_noaa(data=""):
    """
    Shorten NOAA forecast/alert text for transmission over the mesh by
    substituting common phrases and words with compact abbreviations.
    All matching is case-insensitive; replacements are inserted as-is.
    """
    # Long phrases (with spaces)
    phrase_replacements = {
        "less than a tenth of an inch possible": "< 0.1in",
        "between a tenth and quarter of an inch possible": "0.1-0.25in",
        "between a quarter and half an inch possible": "0.25-0.5in",
        "between a half and three quarters of an inch possible": "0.5-0.75in",
        "between one and two inches possible": "1-2in",
        "between two and three inches possible": "2-3in",
        "between three and four inches possible": "3-4in",
        "between four and five inches possible": "4-5in",
        "between five and six inches possible": "5-6in",
        "between six and eight inches possible": "6-8in",
        "gusts as high as": "gusts to",
    }
    # Single words (no spaces)
    word_replacements = {
        "monday": "Mon",
        "tuesday": "Tue",
        "wednesday": "Wed",
        "thursday": "Thu",
        "friday": "Fri",
        "saturday": "Sat",
        "sunday": "Sun",
        "northwest": "NW",
        "northeast": "NE",
        "southwest": "SW",
        "southeast": "SE",
        "north": "N",
        "south": "S",
        "east": "E",
        "west": "W",
        "accumulation": "accum",
        "visibility": "vis",
        "precipitation": "precip",
        "showers": "shwrs",
        "thunderstorms": "t-storms",
        "thunderstorm": "t-storm",
        "quarters": "qtrs",
        "quarter": "qtr",
        "january": "Jan",
        "february": "Feb",
        "march": "Mar",
        "april": "Apr",
        "may": "May",
        "june": "Jun",
        "july": "Jul",
        "august": "Aug",
        "september": "Sep",
        "october": "Oct",
        "november": "Nov",
        "december": "Dec",
        "degrees": "°",
        "percent": "%",
        "department": "Dept.",
        "temperatures": "temps:",
        "temperature": "temp:",
        "amounts": "amts:",
        "afternoon": "Aftn",
        "around": "~",
        "evening": "Eve",
    }
    result = data
    # Longest phrases first so an overlapping shorter phrase can't
    # partially consume a longer one.
    for phrase in sorted(phrase_replacements, key=len, reverse=True):
        result = case_insensitive_replace(result, phrase, phrase_replacements[phrase])
    # Word substitutions run in dict insertion order, which keeps
    # compounds like "northwest" ahead of their "north"/"west" parts.
    for word, abbrev in word_replacements.items():
        result = case_insensitive_replace(result, word, abbrev)
    return result
def getWeatherAlertsNOAA(lat=0, lon=0, useDefaultLatLon=False):
    """
    Fetch active NOAA alert titles for a point from the ATOM feed.

    Returns a tuple (alert_text, alert_count): alert_text holds up to
    my_settings.numWxAlerts newline-separated alerts, alert_count is the
    total number found.  On failure returns one of the sentinel strings
    NO_DATA_NOGPS / ERROR_FETCHING_DATA / NO_ALERTS instead of a tuple.
    """
    if useDefaultLatLon:
        lat = my_settings.latitudeValue
        lon = my_settings.longitudeValue
    if float(lat) == 0 and float(lon) == 0 and not useDefaultLatLon:
        return my_settings.NO_DATA_NOGPS
    alert_url = "https://api.weather.gov/alerts/active.atom?point=" + str(lat) + "," + str(lon)
    try:
        alert_data = requests.get(alert_url, timeout=my_settings.urlTimeoutSeconds)
        if not alert_data.ok:
            logger.warning("Location:Error fetching weather alerts from NOAA bad data")
            return my_settings.ERROR_FETCHING_DATA
    except Exception as e:
        logger.warning(f"Location:Error fetching weather alert request error: {type(e).__name__}: {e}")
        return my_settings.ERROR_FETCHING_DATA
    alerts = ""
    # BUG FIX: guard the XML parse — an HTML/garbage error body previously
    # raised an uncaught ExpatError here
    try:
        alertxml = xml.dom.minidom.parseString(alert_data.text)
    except xml.parsers.expat.ExpatError as e:
        logger.warning(f"Location:Error parsing weather alert XML from NOAA: {e}")
        return my_settings.ERROR_FETCHING_DATA
    for entry in alertxml.getElementsByTagName("entry"):
        title = entry.getElementsByTagName("title")[0].childNodes[0].nodeValue
        area_desc_nodes = entry.getElementsByTagName("cap:areaDesc")
        if area_desc_nodes and area_desc_nodes[0].childNodes:
            area_desc = area_desc_nodes[0].childNodes[0].nodeValue
        else:
            area_desc = ""
        # Extract NWSheadline from cap:parameter if present
        nws_headline = ""
        for param in entry.getElementsByTagName("cap:parameter"):
            try:
                value_name = param.getElementsByTagName("valueName")[0].childNodes[0].nodeValue
                if value_name == "NWSheadline":
                    nws_headline = param.getElementsByTagName("value")[0].childNodes[0].nodeValue
                    break
            except Exception:
                continue
        # If title is "Special Weather Statement" and headline exists, use headline only
        if "special" in title.lower() and nws_headline:
            main_alert = nws_headline
        else:
            main_alert = title
        if my_settings.enableExtraLocationWx:
            # append the area description (spaces stripped to save airtime)
            alerts += f"{main_alert}. {area_desc.replace(' ', '')}"
        else:
            alerts += f"{main_alert}"
        alerts += "\n"
    if not alerts:
        return my_settings.NO_ALERTS
    # trim off last newline
    if alerts.endswith("\n"):
        alerts = alerts[:-1]
    # total number of alerts found (counted before trimming to numWxAlerts)
    alert_num = len(alerts.split("\n"))
    alerts = abbreviate_noaa(alerts)
    # return the first numWxAlerts alerts plus the total count
    return "\n".join(alerts.split("\n")[:my_settings.numWxAlerts]), alert_num
# last alert text that was broadcast; used to suppress duplicate broadcasts
wxAlertCacheNOAA = ""
def alertBrodcastNOAA():
    """
    Poll NOAA for new weather alerts at the bot's default location.

    Returns the (alert_text, count) tuple from getWeatherAlertsNOAA when a
    new (uncached, unignored) alert should be broadcast, else False.
    Side effect: updates the module-level wxAlertCacheNOAA.
    """
    global wxAlertCacheNOAA
    currentAlert = getWeatherAlertsNOAA(my_settings.latitudeValue, my_settings.longitudeValue)
    # check if any reason to discard the alerts
    if currentAlert == my_settings.ERROR_FETCHING_DATA or currentAlert == my_settings.NO_DATA_NOGPS:
        return False
    elif currentAlert == my_settings.NO_ALERTS:
        wxAlertCacheNOAA = ""
        return False
    if my_settings.ignoreEASenable:
        # check if the alert is in the ignoreEAS list
        for word in my_settings.ignoreEASwords:
            if word.lower() in currentAlert[0].lower():
                logger.debug(f"Location:Ignoring NOAA Alert: {currentAlert[0]} containing {word}")
                return False
    # BUG FIX: this was previously an `elif` chained to the ignoreEASenable
    # check, so whenever ignore filtering was enabled and no word matched,
    # the alert was never broadcast.  Fall through to the cache check.
    if currentAlert[0] not in wxAlertCacheNOAA:
        # broadcast the alert and remember it to avoid repeats
        wxAlertCacheNOAA = currentAlert[0]
        return currentAlert
    return False
def getActiveWeatherAlertsDetailNOAA(lat=0, lon=0):
    """
    Fetch detailed text (title + summary) of active NOAA alerts for a point.

    Returns up to my_settings.numWxAlerts alerts joined by newlines, or one
    of the NO_ALERTS / ERROR_FETCHING_DATA sentinels.
    """
    if float(lat) == 0 and float(lon) == 0:
        lat = my_settings.latitudeValue
        lon = my_settings.longitudeValue
    alert_url = "https://api.weather.gov/alerts/active.atom?point=" + str(lat) + "," + str(lon)
    try:
        alert_data = requests.get(alert_url, timeout=my_settings.urlTimeoutSeconds)
        if not alert_data.ok:
            logger.warning("Location:Error fetching weather alerts from NOAA bad data")
            return my_settings.ERROR_FETCHING_DATA
    except Exception as e:
        logger.warning(f"Location:Error fetching active weather alert request error: {type(e).__name__}: {e}")
        return my_settings.ERROR_FETCHING_DATA
    # BUG FIX: guard the XML parse — a non-XML error body previously crashed here
    try:
        alertxml = xml.dom.minidom.parseString(alert_data.text)
    except xml.parsers.expat.ExpatError as e:
        logger.warning(f"Location:Error parsing weather alert XML from NOAA: {e}")
        return my_settings.ERROR_FETCHING_DATA
    alerts = ""
    for entry in alertxml.getElementsByTagName("entry"):
        summary = entry.getElementsByTagName("summary")[0].childNodes[0].nodeValue
        # flatten paragraphs, then turn the NWS "*" bullet markers into newlines
        summary = summary.replace("\n\n", " ")
        summary = summary.replace("\n", " ")
        summary = summary.replace("*", "\n")
        alerts += (
            entry.getElementsByTagName("title")[0].childNodes[0].nodeValue +
            summary +
            "\n***\n"
        )
    alerts = abbreviate_noaa(alerts)
    # Split into individual alerts, dropping the empty tail left by the
    # trailing separator.  BUG FIX: the old code sliced before filtering, so
    # the empty tail leaked a stray newline into the output, and its
    # `alerts[-1] == "\n"` trim was dead code once alerts became a list.
    alert_list = [a for a in alerts.split("\n***\n") if a.strip()]
    if not alert_list:
        return my_settings.NO_ALERTS
    # trim the alerts to the first numWxAlerts
    return "\n".join(alert_list[:my_settings.numWxAlerts])
def getIpawsAlert(lat=0, lon=0, shortAlerts = False):
    """
    Fetch the latest FEMA IPAWS alerts matching the configured state FIPS
    and SAME code lists.

    lat/lon are accepted for interface symmetry with the other alert
    functions but are not used for filtering here — filtering is by
    my_settings.myStateFIPSList and my_settings.mySAMEList.
    shortAlerts: headline-only output when True, else headline + description.
    Returns up to my_settings.numWxAlerts formatted alerts, NO_ALERTS, or
    ERROR_FETCHING_DATA on feed failure.
    """
    # get the latest IPAWS alert from FEMA
    alert = ''
    alerts = []
    linked_data = ''
    # set the API URL for IPAWS
    # NOTE(review): `namespace` is currently unused in this function
    namespace = "urn:oasis:names:tc:emergency:cap:1.2"
    alert_url = "https://apps.fema.gov/IPAWSOPEN_EAS_SERVICE/rest/feed"
    # get the alerts from FEMA
    try:
        alert_data = requests.get(alert_url, timeout=my_settings.urlTimeoutSeconds)
        if not alert_data.ok:
            logger.warning(f"System: iPAWS fetching IPAWS alerts from FEMA (HTTP {alert_data.status_code})")
            return my_settings.ERROR_FETCHING_DATA
    except Exception as e:
        logger.warning(f"System: iPAWS fetching IPAWS alerts from FEMA failed: {e}")
        return my_settings.ERROR_FETCHING_DATA
    # main feed bulletins
    alertxml = xml.dom.minidom.parseString(alert_data.text)
    # extract alerts from main feed; each entry links to a full CAP document
    for entry in alertxml.getElementsByTagName("entry"):
        link = entry.getElementsByTagName("link")[0].getAttribute("href")
        ## state FIPS
        ## This logic is being added to reduce load on FEMA server.
        # pre-filter by the statefips category so we only fetch CAP documents
        # for states we care about
        stateFips = None
        for cat in entry.getElementsByTagName("category"):
            if cat.getAttribute("label") == "statefips":
                stateFips = cat.getAttribute("term")
                break
        if stateFips is None:
            # no stateFIPS found — skip
            continue
        # check if it matches your list
        if stateFips not in my_settings.myStateFIPSList:
            #logger.debug(f"Skipping FEMA record link {link} with stateFIPS code of: {stateFips} because it doesn't match our StateFIPSList {my_settings.myStateFIPSList}")
            continue # skip to next entry
        try:
            # get the linked alert data from FEMA
            linked_data = requests.get(link, timeout=my_settings.urlTimeoutSeconds)
            if not linked_data.ok or not linked_data.text.strip():
                # if the linked data is not ok, skip this alert
                #logger.warning(f"System: iPAWS Error fetching linked alert data from {link}")
                continue
            else:
                linked_xml = xml.dom.minidom.parseString(linked_data.text)
                # this alert is a full CAP alert
        except (requests.exceptions.RequestException):
            logger.warning(f"System: iPAWS Error fetching embedded alert data from {link}")
            continue
        except xml.parsers.expat.ExpatError:
            logger.warning(f"System: iPAWS Error parsing XML from {link}")
            continue
        except Exception as e:
            logger.debug(f"System: iPAWS Error processing alert data from {link}: {e}")
            continue
        # a CAP alert may carry several <info> blocks (one per language)
        for info in linked_xml.getElementsByTagName("info"):
            # only get en-US language alerts (alternative is es-US)
            language_nodes = info.getElementsByTagName("language")
            if not any(node.firstChild and node.firstChild.nodeValue.strip() == "en-US" for node in language_nodes):
                continue # skip if not en-US
            # extract values from XML
            sameVal = "NONE"
            geocode_value = "NONE"
            description = ""
            try:
                eventCode_table = info.getElementsByTagName("eventCode")[0]
                alertType = eventCode_table.getElementsByTagName("valueName")[0].childNodes[0].nodeValue
                alertCode = eventCode_table.getElementsByTagName("value")[0].childNodes[0].nodeValue
                headline = info.getElementsByTagName("headline")[0].childNodes[0].nodeValue
                # use headline if no description
                if info.getElementsByTagName("description") and info.getElementsByTagName("description")[0].childNodes:
                    description = info.getElementsByTagName("description")[0].childNodes[0].nodeValue
                else:
                    description = headline
                area_table = info.getElementsByTagName("area")[0]
                areaDesc = area_table.getElementsByTagName("areaDesc")[0].childNodes[0].nodeValue
                geocode_table = area_table.getElementsByTagName("geocode")[0]
                geocode_type = geocode_table.getElementsByTagName("valueName")[0].childNodes[0].nodeValue
                geocode_value = geocode_table.getElementsByTagName("value")[0].childNodes[0].nodeValue
                # geocode may be a SAME code or another scheme; only SAME is tracked
                if geocode_type == "SAME":
                    sameVal = geocode_value
            except Exception as e:
                logger.debug(f"System: iPAWS Error extracting alert data: {link}")
                #print(f"DEBUG: {info.toprettyxml()}")
                continue
            # check if the alert is for the SAME location, if wanted keep alert
            # (an empty-string mySAMEList means "accept everything")
            if (sameVal in my_settings.mySAMEList) or (geocode_value in my_settings.mySAMEList) or my_settings.mySAMEList == ['']:
                ignore_alert = False
                if my_settings.ignoreFEMAenable:
                    # drop alerts whose headline contains any configured ignore word
                    ignore_alert = any(
                        word.lower() in headline.lower()
                        for word in my_settings.ignoreFEMAwords)
                    if ignore_alert:
                        logger.debug(f"System: Filtering FEMA Alert by WORD: {headline} containing one of {my_settings.ignoreFEMAwords} at {areaDesc}")
                if ignore_alert:
                    continue
                # add to alert list
                alerts.append({
                    'alertType': alertType,
                    'alertCode': alertCode,
                    'headline': headline,
                    'areaDesc': areaDesc,
                    'geocode_type': geocode_type,
                    'geocode_value': geocode_value,
                    'description': description
                })
            else:
                logger.debug(f"System: iPAWS Alert not in SAME List: {sameVal} or {geocode_value} for {headline} at {areaDesc}")
                continue
    # return the numWxAlerts of alerts
    if len(alerts) > 0:
        for alertItem in alerts[:my_settings.numWxAlerts]:
            if shortAlerts:
                alert += abbreviate_noaa(f"🚨FEMA Alert: {alertItem['headline']}")
            else:
                alert += abbreviate_noaa(f"🚨FEMA Alert: {alertItem['headline']}\n{alertItem['description']}")
            # add a newline if not the last alert
            if alertItem != alerts[:my_settings.numWxAlerts][-1]:
                alert += "\n"
    else:
        alert = my_settings.NO_ALERTS
    return alert
def get_flood_noaa(lat=0, lon=0, uid=None):
    """
    Fetch the latest flood status from NOAA NWPS for a given gauge UID.

    Reports observed and forecast primary/secondary readings with the
    flood-risk category.  Returns a formatted string, "No flood gauge data
    found", or my_settings.ERROR_FETCHING_DATA on any failure.
    """
    api_url = "https://api.water.noaa.gov/nwps/v1/gauges/"
    headers = {'accept': 'application/json'}
    # a gauge UID is required — bail out before touching the network
    if not uid:
        logger.warning(f"Location:No flood gauge data found for UID {uid}")
        return my_settings.ERROR_FETCHING_DATA
    try:
        response = requests.get(api_url + str(uid), headers=headers, timeout=my_settings.urlTimeoutSeconds)
        if not response.ok:
            logger.warning(f"Location:Error fetching flood gauge data from NOAA for {uid} (HTTP {response.status_code})")
            return my_settings.ERROR_FETCHING_DATA
        data = response.json()
        if not data or 'status' not in data:
            logger.warning(f"Location:No flood gauge data found for UID {uid}")
            return "No flood gauge data found"
    except requests.exceptions.RequestException as e:
        logger.warning(f"Location:Error fetching flood gauge data from: {api_url}{uid} ({e})")
        return my_settings.ERROR_FETCHING_DATA
    except Exception as e:
        logger.warning(f"Location:Unexpected error: {e}")
        return my_settings.ERROR_FETCHING_DATA
    # assemble the report from the JSON payload, tolerating missing keys
    try:
        gauge_name = data.get('name', 'Unknown')
        status = data['status']
        obs = status.get('observed', {})
        fcast = status.get('forecast', {})
        lines = [
            f"Flood Data {gauge_name}:",
            f"Observed: {obs.get('primary', '?')}{obs.get('primaryUnit', '')} ({obs.get('secondary', '?')}{obs.get('secondaryUnit', '')}) risk: {obs.get('floodCategory', '?')}",
            f"Forecast: {fcast.get('primary', '?')}{fcast.get('primaryUnit', '')} ({fcast.get('secondary', '?')}{fcast.get('secondaryUnit', '')}) risk: {fcast.get('floodCategory', '?')}",
        ]
        # the original output carries a trailing newline — preserve it
        return "\n".join(lines) + "\n"
    except Exception as e:
        logger.debug(f"Location:Error extracting flood gauge data from NOAA for {uid}: {e}")
        return my_settings.ERROR_FETCHING_DATA
def get_volcano_usgs(lat=0, lon=0):
    """
    Return elevated USGS volcano alerts within ±10 degrees of the bot's
    configured location, abbreviated for mesh transmission.

    Returns the alert string, NO_ALERTS when none match, or
    ERROR_FETCHING_DATA on fetch failure.
    """
    alerts = ''
    if lat == 0 and lon == 0:
        lat = my_settings.latitudeValue
        lon = my_settings.longitudeValue
    # get the latest volcano alert from USGS from CAP feed
    usgs_volcano_url = "https://volcanoes.usgs.gov/hans-public/api/volcano/getCapElevated"
    try:
        volcano_data = requests.get(usgs_volcano_url, timeout=my_settings.urlTimeoutSeconds)
        if not volcano_data.ok:
            logger.warning("System: Issue with fetching volcano alerts from USGS")
            return my_settings.ERROR_FETCHING_DATA
    except (requests.exceptions.RequestException):
        logger.warning("System: Issue with fetching volcano alerts from USGS")
        return my_settings.ERROR_FETCHING_DATA
    volcano_json = volcano_data.json()
    # extract alerts from main feed
    if volcano_json and isinstance(volcano_json, list):
        for alert in volcano_json:
            # check ignore list.  BUG FIX: the old `continue` sat inside the
            # word loop so it only advanced to the next ignore word — ignored
            # volcanoes were still reported.  Use a flag to skip the alert.
            if my_settings.ignoreUSGSEnable:
                ignored = False
                for word in my_settings.ignoreUSGSwords:
                    if word.lower() in alert['volcano_name_appended'].lower():
                        logger.debug(f"System: Ignoring USGS Alert: {alert['volcano_name_appended']} containing {word}")
                        ignored = True
                        break
                if ignored:
                    continue
            # check if the alert lat long is within the range of bot latitudeValue and longitudeValue
            if (alert['latitude'] >= my_settings.latitudeValue - 10 and alert['latitude'] <= my_settings.latitudeValue + 10) and (alert['longitude'] >= my_settings.longitudeValue - 10 and alert['longitude'] <= my_settings.longitudeValue + 10):
                volcano_name = alert['volcano_name_appended']
                alert_level = alert['alert_level']
                color_code = alert['color_code']
                cap_severity = alert['cap_severity']
                synopsis = alert['synopsis']
                # format Alert
                alerts += f"🌋🚨: {volcano_name}, {alert_level} {color_code}, {cap_severity}.\n{synopsis}\n"
    else:
        logger.debug("Location:Error fetching volcano data from USGS")
        return my_settings.NO_ALERTS
    if alerts == "":
        return my_settings.NO_ALERTS
    # trim off last newline
    if alerts[-1] == "\n":
        alerts = alerts[:-1]
    # return the alerts
    return abbreviate_noaa(alerts)
def get_nws_marine(zone, days=3):
    """
    Fetch and condense an NWS coastal/marine text forecast product.

    zone: full URL of the NWS marine text product to fetch.
    days: currently unused — the number of period blocks kept comes from
        my_settings.coastalForecastDays (parameter retained for callers).
    Returns the abbreviated report, ERROR_FETCHING_DATA on fetch/parse/
    expiry problems, or NO_DATA_NOGPS when the product yields no text.
    """
    try:
        response = requests.get(zone, timeout=my_settings.urlTimeoutSeconds)
        if not response.ok:
            logger.warning(f"Location:Error fetching NWS Marine data (HTTP {response.status_code})")
            return my_settings.ERROR_FETCHING_DATA
    except requests.exceptions.RequestException as e:
        logger.warning(f"Location:Error fetching NWS Marine data: {e}")
        return my_settings.ERROR_FETCHING_DATA
    raw_text = response.text
    today = datetime.now().strftime("%Y%m%d")
    if not (raw_text and raw_text.startswith("Expires:")):
        logger.debug("Location: NWS Marine PZ data not valid or empty")
        return my_settings.ERROR_FETCHING_DATA
    try:
        # header looks like "Expires:YYYYMMDDhhmm;;..." — a lexicographic
        # compare of the YYYYMMDD prefix is a valid date comparison
        expiry_stamp = raw_text.split(";;")[0].split(":")[1]
        if expiry_stamp[:8] < today:
            logger.debug("Location: NWS Marine PZ data expired")
            return my_settings.ERROR_FETCHING_DATA
    except Exception as e:
        logger.debug(f"Location: NWS Marine PZ data parse error: {e}")
        return my_settings.ERROR_FETCHING_DATA
    # group the product into per-period blocks; a block starts on a line
    # like ".TONIGHT..." and continues until the next such header
    blocks = []
    buf = ""
    seen_forecast = False
    for line in raw_text.split("\n"):
        stripped = line.strip()
        if line.startswith(".") and "..." in line:
            seen_forecast = True
            if buf:
                blocks.append(buf.strip())
            buf = stripped + " "
        elif seen_forecast and stripped:
            buf += stripped + " "
    if buf:
        blocks.append(buf.strip())
    # keep only the configured number of period blocks
    report = "\n".join(blocks[:my_settings.coastalForecastDays])
    # drop the NOAA end-of-product marker if it landed in the last block
    if report.endswith("$$"):
        report = report[:-2].strip()
    report = abbreviate_noaa(report)
    if report == "":
        return my_settings.NO_DATA_NOGPS
    return report
def checkUSGSEarthQuake(lat=0, lon=0):
    """
    Summarize recent USGS earthquakes near a point (bot default when 0,0).

    Queries the last 7 days within 100 km for magnitude >= 1.5 and reports
    the count, the largest magnitude, and that quake's description.
    Returns my_settings.NO_ALERTS on failure or when nothing was found.
    """
    if lat == 0 and lon == 0:
        lat = my_settings.latitudeValue
        lon = my_settings.longitudeValue
    radius = 100  # km
    magnitude = 1.5  # minimum magnitude to include
    history = 7  # days
    startDate = datetime.fromtimestamp(datetime.now().timestamp() - history*24*60*60).strftime("%Y-%m-%d")
    USGSquake_url = f"https://earthquake.usgs.gov/fdsnws/event/1/query?&format=xml&latitude={lat}&longitude={lon}&maxradiuskm={radius}&minmagnitude={magnitude}&starttime={startDate}"
    description_text = ""
    quake_count = 0
    # fetch the earthquake data from USGS
    try:
        quake_data = requests.get(USGSquake_url, timeout=my_settings.urlTimeoutSeconds)
        if not quake_data.ok:
            logger.warning("Location:Error fetching earthquake data from USGS")
            return my_settings.NO_ALERTS
        if not quake_data.text.strip():
            return my_settings.NO_ALERTS
        try:
            quake_xml = xml.dom.minidom.parseString(quake_data.text)
        except Exception as e:
            logger.warning(f"Location: USGS earthquake API returned invalid XML: {e}")
            return my_settings.NO_ALERTS
    except (requests.exceptions.RequestException):
        logger.warning("Location:Error fetching earthquake data from USGS")
        return my_settings.NO_ALERTS
    # BUG FIX: the XML was previously parsed a second time here outside any
    # guard, which defeated the invalid-XML handling above; reuse the parse.
    quake_count = len(quake_xml.getElementsByTagName("event"))
    # find the largest magnitude and keep that quake's description
    largest_mag = 0.0
    for event in quake_xml.getElementsByTagName("event"):
        mag = event.getElementsByTagName("magnitude")[0]
        mag_value = float(mag.getElementsByTagName("value")[0].childNodes[0].nodeValue)
        if mag_value > largest_mag:
            largest_mag = mag_value
            # set description text for the largest quake seen so far
            description_text = event.getElementsByTagName("description")[0].getElementsByTagName("text")[0].childNodes[0].nodeValue
    largest_mag = round(largest_mag, 1)
    if quake_count == 0:
        return my_settings.NO_ALERTS
    else:
        return f"{quake_count} 🫨quakes in last {history} days within {radius} km. Largest: {largest_mag}M\n{description_text}"
howfarDB = {}  # in-memory track log: nodeID -> list of {'lat': float, 'lon': float, 'time': datetime}
def distance(lat=0,lon=0,nodeID=0, reset=False):
    # part of the howfar function, calculates the distance between two lat/lon points
    """Track a node's movement and report distance/bearing/speed since its last fix.

    Each call appends the new GPS fix to howfarDB[nodeID] (unless it duplicates
    the previous fix) and returns a human-readable summary: distance and bearing
    from the last point, travel time/speed when more than a minute has elapsed,
    the cumulative track distance, and — once 3+ points exist — an approximate
    enclosed area and centroid.

    Args:
        lat, lon: current position in decimal degrees; (0, 0) means "no GPS".
        nodeID: identifier of the tracked node (0 is rejected).
        reset: discard any stored track for nodeID and register a new start.

    Returns:
        A status/summary string, or my_settings.NO_DATA_NOGPS when no GPS.
    """
    msg = ""
    dupe = False
    location = lat,lon  # NOTE(review): unused local — candidate for removal
    r = 6371 # Radius of earth in kilometers # haversine formula
    if lat == 0 and lon == 0:
        return my_settings.NO_DATA_NOGPS
    if nodeID == 0:
        return "No NodeID provided"
    if reset:
        if nodeID in howfarDB:
            del howfarDB[nodeID]
    if nodeID not in howfarDB:
        #register first point NodeID, lat, lon, time, point
        howfarDB[nodeID] = [{'lat': lat, 'lon': lon, 'time': datetime.now()}]
        if reset:
            return "Tracking reset, new starting point registered🗺"
        else:
            return "Starting point registered🗺"
    else:
        #de-dupe points if same as last point
        if howfarDB[nodeID][-1]['lat'] == lat and howfarDB[nodeID][-1]['lon'] == lon:
            dupe = True
            msg = "No New GPS📍 "
        # calculate distance from last point in howfarDB (haversine)
        last_point = howfarDB[nodeID][-1]
        lat1 = math.radians(last_point['lat'])
        lon1 = math.radians(last_point['lon'])
        lat2 = math.radians(lat)
        lon2 = math.radians(lon)
        dlon = lon2 - lon1
        dlat = lat2 - lat1
        a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
        c = 2 * math.asin(math.sqrt(a))
        distance_km = c * r
        if my_settings.use_metric:
            msg += f"{distance_km:.2f} km"
        else:
            distance_miles = distance_km * 0.621371
            msg += f"{distance_miles:.2f} miles"
        #calculate bearing (initial/forward azimuth, normalized to 0-360°)
        x = math.sin(dlon) * math.cos(lat2)
        y = math.cos(lat1) * math.sin(lat2) - (math.sin(lat1) * math.cos(lat2) * math.cos(dlon))
        initial_bearing = math.atan2(x, y)
        initial_bearing = math.degrees(initial_bearing)
        compass_bearing = (initial_bearing + 360) % 360
        msg += f" 🧭{compass_bearing:.2f}° Bearing from last📍"
        # calculate the speed if time difference is more than 1 minute
        time_diff = datetime.now() - last_point['time']
        if time_diff.total_seconds() > 60:
            hours = time_diff.total_seconds() / 3600
            if my_settings.use_metric:
                speed = distance_km / hours
                speed_str = f"{speed:.2f} km/h"
            else:
                speed_mph = (distance_km * 0.621371) / hours
                speed_str = f"{speed_mph:.2f} mph"
            msg += f", travel time: {int(time_diff.total_seconds()//60)} min, Speed: {speed_str}"
        # calculate total distance traveled including this point computed in distance_km from calculate distance from last point in howfarDB
        total_distance_km = 0.0
        for i in range(1, len(howfarDB[nodeID])):
            point1 = howfarDB[nodeID][i-1]
            point2 = howfarDB[nodeID][i]
            lat1 = math.radians(point1['lat'])
            lon1 = math.radians(point1['lon'])
            lat2 = math.radians(point2['lat'])
            lon2 = math.radians(point2['lon'])
            dlon = lon2 - lon1
            dlat = lat2 - lat1
            a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
            c = 2 * math.asin(math.sqrt(a))
            total_distance_km += c * r
        # add the distance from last point to current point
        # (zero when this fix duplicated the previous one, so dupes don't inflate the total)
        total_distance_km += distance_km
        if my_settings.use_metric:
            msg += f", Total: {total_distance_km:.2f} km"
        else:
            total_distance_miles = total_distance_km * 0.621371
            msg += f", Total: {total_distance_miles:.2f} miles"
        # update the last point in howfarDB
        if not dupe:
            howfarDB[nodeID].append({'lat': lat, 'lon': lon, 'time': datetime.now()})
        # if points 3+ are within 30 meters of the first point add the area of the polygon
        # NOTE(review): the 30 m proximity check described above is not implemented —
        # the area is computed whenever 3+ points exist; confirm intent.
        if len(howfarDB[nodeID]) >= 3:
            points = []
            # loop the howfarDB to get all the points except the current nodeID
            # NOTE(review): this mixes OTHER nodes' last-known points into this
            # node's polygon — confirm that is intentional.
            for key in howfarDB:
                if key != nodeID:
                    points.append((howfarDB[key][-1]['lat'], howfarDB[key][-1]['lon']))
            # loop the howfarDB[nodeID] to get the points
            for point in howfarDB[nodeID]:
                points.append((point['lat'], point['lon']))
            # close the polygon by adding the first point to the end
            points.append((howfarDB[nodeID][0]['lat'], howfarDB[nodeID][0]['lon']))
            # calculate the area of the polygon
            # (spherical shoelace-style approximation using the WGS84 equatorial radius)
            area = 0.0
            for i in range(len(points)-1):
                lat1 = math.radians(points[i][0])
                lon1 = math.radians(points[i][1])
                lat2 = math.radians(points[i+1][0])
                lon2 = math.radians(points[i+1][1])
                area += (lon2 - lon1) * (2 + math.sin(lat1) + math.sin(lat2))
            area = area * (6378137 ** 2) / 2.0
            area = abs(area) / 1e6 # convert to square kilometers
            if my_settings.use_metric:
                msg += f", Area: {area:.2f} sq.km (approx)"
            else:
                area_miles = area * 0.386102
                msg += f", Area: {area_miles:.2f} sq.mi (approx)"
            #calculate the centroid of the polygon
            # (mean of 3D unit vectors on the sphere, converted back to lat/lon)
            x = 0.0
            y = 0.0
            z = 0.0
            for point in points[:-1]:
                lat_rad = math.radians(point[0])
                lon_rad = math.radians(point[1])
                x += math.cos(lat_rad) * math.cos(lon_rad)
                y += math.cos(lat_rad) * math.sin(lon_rad)
                z += math.sin(lat_rad)
            total_points = len(points) - 1
            x /= total_points
            y /= total_points
            z /= total_points
            lon_centroid = math.atan2(y, x)
            hyp = math.sqrt(x * x + y * y)
            lat_centroid = math.atan2(z, hyp)
            lat_centroid = math.degrees(lat_centroid)
            lon_centroid = math.degrees(lon_centroid)
            msg += f", Centroid: {lat_centroid:.5f}, {lon_centroid:.5f}"
    return msg
def get_openskynetwork(lat=0, lon=0, altitude=0, node_altitude=0, altitude_window=900):
    """
    Query OpenSky Network for aircraft in a ~0.45° (~50 km) bounding box around
    (lat, lon) and return the one closest in altitude to node_altitude, within
    altitude_window meters.

    If exactly one aircraft is in the box it is returned as-is, with no
    altitude filtering. On success returns a dict with callsign,
    origin_country, velocity, true_track, vertical_rate, sensors, altitude
    (barometric), geo_altitude, and squawk. Returns False on bad input,
    fetch failure, or when no aircraft matches.

    NOTE(review): the `altitude` parameter is never used — only
    `node_altitude` participates in the matching; confirm before removing.
    """
    def _to_float(v):
        # Normalize OpenSky numeric fields: numbers pass through, numeric
        # strings are parsed, ''/'N/A'/None (and parse failures) become None.
        try:
            # handle numeric and numeric-strings, treat empty/'N/A' as None
            if v is None:
                return None
            if isinstance(v, (int, float)):
                return float(v)
            s = str(v).strip()
            if s == "" or s.upper() == "N/A":
                return None
            return float(s)
        except Exception:
            return None
    try:
        # basic input validation/coercion
        try:
            lat = float(lat)
            lon = float(lon)
        except Exception:
            return False
        try:
            node_altitude = _to_float(node_altitude) or 0.0
        except Exception:
            node_altitude = 0.0
        if lat == 0 and lon == 0:
            return False
        box_size = 0.45 # approx 50km
        lamin = lat - box_size
        lamax = lat + box_size
        lomin = lon - box_size
        lomax = lon + box_size
        opensky_url = (
            f"https://opensky-network.org/api/states/all?lamin={lamin}&lomin={lomin}"
            f"&lamax={lamax}&lomax={lomax}"
        )
        try:
            aircraft_data = requests.get(opensky_url, timeout=my_settings.urlTimeoutSeconds)
            if not aircraft_data.ok:
                logger.warning("Location:Error fetching aircraft data from OpenSky Network")
                return False
        except (requests.exceptions.RequestException):
            logger.warning("Location:Error fetching aircraft data from OpenSky Network")
            return False
        aircraft_json = aircraft_data.json()
        if 'states' not in aircraft_json or not aircraft_json['states']:
            return False
        aircraft_list = aircraft_json['states']
        logger.debug(f"Location: OpenSky Network: Found {len(aircraft_list)} possible aircraft in area")
        closest = None
        min_diff = float('inf')
        if len(aircraft_list) == 1:
            # Only one aircraft found; return normalized values (altitudes coerced to numbers or None)
            # Numeric indexes follow the OpenSky /states/all state-vector field order.
            aircraft = aircraft_list[0]
            baro_alt = _to_float(aircraft[7]) # barometric altitude
            geo_alt = _to_float(aircraft[13]) # geometric altitude
            return {
                "callsign": aircraft[1].strip() if aircraft[1] else "N/A",
                "origin_country": aircraft[2] if aircraft[2] is not None else "N/A",
                "velocity": aircraft[9] if aircraft[9] is not None else "N/A",
                "true_track": aircraft[10] if aircraft[10] is not None else "N/A",
                "vertical_rate": aircraft[11] if aircraft[11] is not None else "N/A",
                "sensors": aircraft[12] if aircraft[12] is not None else "N/A",
                "altitude": baro_alt,
                "geo_altitude": geo_alt,
                "squawk": aircraft[14] if len(aircraft) > 14 and aircraft[14] is not None else "N/A",
            }
        for aircraft in aircraft_list:
            try:
                callsign = aircraft[1].strip() if aircraft[1] else "N/A"
                origin_country = aircraft[2] if aircraft[2] is not None else "N/A"
                velocity = aircraft[9] if aircraft[9] is not None else "N/A"
                true_track = aircraft[10] if aircraft[10] is not None else "N/A"
                vertical_rate = aircraft[11] if aircraft[11] is not None else "N/A"
                sensors = aircraft[12] if aircraft[12] is not None else "N/A"
                baro_altitude = _to_float(aircraft[7])
                geo_altitude = _to_float(aircraft[13])
                squawk = aircraft[14] if len(aircraft) > 14 and aircraft[14] is not None else "N/A"
            except Exception:
                logger.debug("Location:Error extracting aircraft data from OpenSky Network")
                continue
            # Prefer geo_altitude, fallback to baro_altitude
            plane_alt = geo_altitude if geo_altitude is not None else baro_altitude
            # skip if we can't get a numeric plane altitude or node_altitude is zero/unset
            if plane_alt is None or (node_altitude == 0 or node_altitude is None):
                continue
            # safe numeric diff
            try:
                diff = abs(float(plane_alt) - float(node_altitude))
            except Exception:
                continue
            # keep the aircraft with the smallest in-window altitude difference
            if diff <= altitude_window and diff < min_diff:
                min_diff = diff
                closest = {
                    "callsign": callsign,
                    "origin_country": origin_country,
                    "velocity": velocity,
                    "true_track": true_track,
                    "vertical_rate": vertical_rate,
                    "sensors": sensors,
                    "altitude": baro_altitude,
                    "geo_altitude": geo_altitude,
                    "squawk": squawk,
                }
        if closest:
            return closest
        else:
            return False
    except Exception as e:
        logger.debug(f"SYSTEM: Location HighFly: Error processing OpenSky Network data: {e}")
        return False
def get_public_location_admin_manage():
    """Read ``public_location_admin_manage`` fresh from config.ini.

    Reading the file directly (instead of the cached settings module)
    guarantees an up-to-date value on first load of the program. Returns
    False when the file, section, or option is missing or unreadable.
    """
    import configparser
    parser = configparser.ConfigParser()
    try:
        parser.read("config.ini", encoding='utf-8')
        return parser['location'].getboolean('public_location_admin_manage', False)
    except Exception:
        return False
def get_delete_public_locations_admins_only():
    """Read ``delete_public_locations_admins_only`` fresh from config.ini.

    Reading the file directly (instead of the cached settings module)
    guarantees an up-to-date value on first load of the program. Returns
    False when the file, section, or option is missing or unreadable.
    """
    import configparser
    parser = configparser.ConfigParser()
    try:
        parser.read("config.ini", encoding='utf-8')
        return parser['location'].getboolean('delete_public_locations_admins_only', False)
    except Exception:
        return False
def get_node_altitude(nodeID, deviceID=1):
    """Look up a node's altitude.

    Checks the live interface's node position dict first, then falls back to
    positionMetadata collected from POSITION_APP packets.

    Returns the altitude in meters (positive float), or None when unavailable.
    """
    try:
        import modules.system as system_module
        # Primary source: the node DB on this device's interface,
        # resolved dynamically from the system module.
        iface = getattr(system_module, f'interface{deviceID}', None)
        if iface and hasattr(iface, 'nodes') and iface.nodes:
            for entry in iface.nodes.values():
                if nodeID != entry['num']:
                    continue
                position = entry.get('position')
                if position and isinstance(position, dict) and position.get('altitude') is not None:
                    try:
                        alt = float(position['altitude'])
                        # Only positive altitudes are treated as valid.
                        if alt > 0:
                            return alt
                    except (ValueError, TypeError):
                        pass
        # Fallback: metadata gathered from POSITION_APP packets.
        meta_store = getattr(system_module, 'positionMetadata', None)
        if meta_store and nodeID in meta_store:
            meta = meta_store[nodeID]
            if 'altitude' in meta:
                alt = meta.get('altitude', 0)
                if alt and alt > 0:
                    return float(alt)
        return None
    except Exception as e:
        logger.debug(f"Location: Error getting altitude for node {nodeID}: {e}")
        return None
def initialize_locations_database():
    """Initialize the SQLite database for storing saved locations.

    Creates the `locations` table (and a lookup index) if missing, and
    migrates existing databases in place:
      - rebuilds the table to drop a legacy UNIQUE constraint on
        location_name (names are unique only per user / public scope now)
      - adds the is_public and altitude columns when absent

    Returns:
        True on success, False on failure (logged as a warning).
    """
    conn = None
    try:
        # Ensure the directory holding the DB file exists
        db_dir = os.path.dirname(my_settings.locations_db)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        conn = sqlite3.connect(my_settings.locations_db)
        c = conn.cursor()
        logger.debug("Location: Initializing locations database...")
        # Check if table exists and get its structure
        c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='locations'")
        table_exists = c.fetchone() is not None
        if table_exists:
            # Inspect current columns (PRAGMA table_info row[1] is the column name)
            c.execute("PRAGMA table_info(locations)")
            column_names = [col[1] for col in c.fetchall()]
            # Check for a legacy UNIQUE constraint on location_name in the schema SQL
            c.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='locations'")
            table_sql = c.fetchone()
            has_unique_constraint = False
            if table_sql and table_sql[0]:
                if 'UNIQUE' in table_sql[0].upper() and 'location_name' in table_sql[0]:
                    has_unique_constraint = True
            # SQLite cannot drop a constraint in place, so rebuild the table:
            # create new -> copy rows -> drop old -> rename
            if has_unique_constraint:
                logger.debug("Location: Removing UNIQUE constraint from locations table")
                c.execute('''CREATE TABLE locations_new
                             (location_id INTEGER PRIMARY KEY AUTOINCREMENT,
                              location_name TEXT NOT NULL,
                              latitude REAL NOT NULL,
                              longitude REAL NOT NULL,
                              altitude REAL,
                              description TEXT,
                              userID TEXT,
                              is_public INTEGER DEFAULT 0,
                              created_date TEXT,
                              created_time TEXT)''')
                c.execute('''INSERT INTO locations_new
                             (location_id, location_name, latitude, longitude, description, userID,
                              is_public, created_date, created_time)
                             SELECT location_id, location_name, latitude, longitude, description, userID,
                                    COALESCE(is_public, 0), created_date, created_time
                             FROM locations''')
                c.execute("DROP TABLE locations")
                c.execute("ALTER TABLE locations_new RENAME TO locations")
                logger.debug("Location: Successfully removed UNIQUE constraint")
                # Refresh column list after table recreation
                c.execute("PRAGMA table_info(locations)")
                column_names = [col[1] for col in c.fetchall()]
            # Migration: add is_public column if it doesn't exist
            if 'is_public' not in column_names:
                try:
                    c.execute('''ALTER TABLE locations ADD COLUMN is_public INTEGER DEFAULT 0''')
                    logger.debug("Location: Added is_public column to locations table")
                except sqlite3.OperationalError:
                    # Column might already exist, ignore
                    pass
            # Migration: add altitude column if it doesn't exist
            if 'altitude' not in column_names:
                try:
                    c.execute('''ALTER TABLE locations ADD COLUMN altitude REAL''')
                    logger.debug("Location: Added altitude column to locations table")
                except sqlite3.OperationalError:
                    # Column might already exist, ignore
                    pass
        else:
            # Fresh install: create the table without the legacy UNIQUE constraint
            c.execute('''CREATE TABLE locations
                         (location_id INTEGER PRIMARY KEY AUTOINCREMENT,
                          location_name TEXT NOT NULL,
                          latitude REAL NOT NULL,
                          longitude REAL NOT NULL,
                          altitude REAL,
                          description TEXT,
                          userID TEXT,
                          is_public INTEGER DEFAULT 0,
                          created_date TEXT,
                          created_time TEXT)''')
        # Non-unique index for faster name lookups
        c.execute('''CREATE INDEX IF NOT EXISTS idx_location_name_user
                     ON locations(location_name, userID, is_public)''')
        conn.commit()
        return True
    except Exception as e:
        logger.warning(f"Location: Failed to initialize locations database: {e}")
        return False
    finally:
        # Always release the DB handle — the original leaked the connection
        # whenever an exception fired before conn.close()
        if conn is not None:
            conn.close()
def save_location_to_db(location_name, lat, lon, description="", userID="", is_public=False, altitude=None):
    """Save a named location to the SQLite locations database.

    Args:
        location_name: name to save under (must be non-empty).
        lat, lon: coordinates in decimal degrees.
        description: optional free-text description.
        userID: owner node ID.
        is_public: save as a public (shared) location.
        altitude: optional altitude in meters.

    Returns:
        (success, message, conflict_info)
        conflict_info is currently always None (reserved for future conflict details)
    """
    conn = None
    try:
        if not location_name or not location_name.strip():
            return False, "Location name cannot be empty", None
        # Check if public locations are admin-only and user is not admin
        if is_public and get_public_location_admin_manage():
            from modules.system import isNodeAdmin
            if not isNodeAdmin(userID):
                return False, "Only admins can save public locations.", None
        conn = sqlite3.connect(my_settings.locations_db)
        c = conn.cursor()
        location_name_clean = location_name.strip()
        # Conflict rule 1: a user may not reuse a name they already saved
        # (private or public)
        c.execute('''SELECT location_id, is_public FROM locations
                     WHERE location_name = ? AND userID = ?''',
                  (location_name_clean, userID))
        if c.fetchone():
            return False, f"Location '{location_name}' already exists for your node", None
        # Conflict rule 2: public names are globally unique. Private names may
        # shadow OTHER users' private locations, but not an existing public one.
        c.execute('''SELECT location_id, userID, description FROM locations
                     WHERE location_name = ? AND is_public = 1''',
                  (location_name_clean,))
        if c.fetchone():
            if not is_public:
                # Private save blocked by an existing public location of the same name
                return False, f"Public location '{location_name}' already exists. Use 'map public {location_name}' to access it.", None
            else:
                # Only one public location per name globally
                return False, f"Public location '{location_name}' already exists", None
        # Insert the new location with creation timestamp
        now = datetime.now()
        c.execute('''INSERT INTO locations
                     (location_name, latitude, longitude, altitude, description, userID, is_public, created_date, created_time)
                     VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)''',
                  (location_name_clean, lat, lon, altitude, description, userID, 1 if is_public else 0,
                   now.strftime("%Y-%m-%d"), now.strftime("%H:%M:%S")))
        conn.commit()
        visibility = "public" if is_public else "private"
        logger.debug(f"Location: Saved {visibility} location '{location_name}' to database")
        return True, f"Location '{location_name}' saved as {visibility}", None
    except Exception as e:
        logger.error(f"Location: Failed to save location: {e}")
        return False, f"Error saving location: {e}", None
    finally:
        # Always release the DB handle — the original leaked the connection
        # when an exception fired between connect() and close()
        if conn is not None:
            conn.close()
def get_location_from_db(location_name, userID=None):
    """Retrieve a saved location from the database by name.

    Lookup order: the caller's private location first (when userID is
    given), then any public location of that name.

    Returns:
        dict with name/lat/lon/altitude/description/userID/is_public/
        created_date/created_time, or None when not found or on error.
    """
    # Map a SELECT row onto the dict shape callers expect.
    def _row_to_dict(row):
        return {
            'name': row[0],
            'lat': row[1],
            'lon': row[2],
            'altitude': row[3],
            'description': row[4],
            'userID': row[5],
            'is_public': bool(row[6]),
            'created_date': row[7],
            'created_time': row[8]
        }
    conn = None
    try:
        conn = sqlite3.connect(my_settings.locations_db)
        c = conn.cursor()
        location_name_clean = location_name.strip()
        # A private match for this user wins over a public one
        if userID:
            c.execute('''SELECT location_name, latitude, longitude, altitude, description, userID, is_public, created_date, created_time
                         FROM locations
                         WHERE location_name = ? AND userID = ? AND is_public = 0''',
                      (location_name_clean, userID))
            result = c.fetchone()
            if result:
                return _row_to_dict(result)
        # Fall back to a public location with this name
        c.execute('''SELECT location_name, latitude, longitude, altitude, description, userID, is_public, created_date, created_time
                     FROM locations
                     WHERE location_name = ? AND is_public = 1''',
                  (location_name_clean,))
        result = c.fetchone()
        if result:
            return _row_to_dict(result)
        return None
    except Exception as e:
        logger.error(f"Location: Failed to retrieve location: {e}")
        return None
    finally:
        # Always release the DB handle — the original leaked the connection
        # when an exception fired before conn.close()
        if conn is not None:
            conn.close()
def get_public_location_from_db(location_name):
    """Retrieve only a public location by name (private locations are ignored).

    Returns:
        dict with name/lat/lon/altitude/description/userID/is_public/
        created_date/created_time, or None when not found or on error.
    """
    conn = None
    try:
        conn = sqlite3.connect(my_settings.locations_db)
        c = conn.cursor()
        location_name_clean = location_name.strip()
        # Get only public location
        c.execute('''SELECT location_name, latitude, longitude, altitude, description, userID, is_public, created_date, created_time
                     FROM locations
                     WHERE location_name = ? AND is_public = 1''',
                  (location_name_clean,))
        result = c.fetchone()
        if result:
            return {
                'name': result[0],
                'lat': result[1],
                'lon': result[2],
                'altitude': result[3],
                'description': result[4],
                'userID': result[5],
                'is_public': bool(result[6]),
                'created_date': result[7],
                'created_time': result[8]
            }
        return None
    except Exception as e:
        logger.error(f"Location: Failed to retrieve public location: {e}")
        return None
    finally:
        # Always release the DB handle — the original leaked the connection
        # when an exception fired before conn.close()
        if conn is not None:
            conn.close()
def list_locations_from_db(userID=None):
    """List saved locations as a formatted multi-line string.

    With a userID: that user's private locations plus all public ones.
    Without: public locations only.

    Returns:
        The listing string, "No saved locations found", or an error string.
    """
    conn = None
    try:
        conn = sqlite3.connect(my_settings.locations_db)
        c = conn.cursor()
        if userID:
            # Get user's private locations and all public locations
            c.execute('''SELECT location_name, latitude, longitude, altitude, description, is_public, created_date
                         FROM locations
                         WHERE (userID = ? AND is_public = 0) OR is_public = 1
                         ORDER BY is_public ASC, location_name''', (userID,))
        else:
            # Get all public locations only
            c.execute('''SELECT location_name, latitude, longitude, altitude, description, is_public, created_date
                         FROM locations
                         WHERE is_public = 1
                         ORDER BY location_name''')
        results = c.fetchall()  # intentionally unlimited — return ALL rows
        if not results:
            return "No saved locations found"
        # Build the listing with a single join instead of repeated string
        # concatenation (the original += loop was quadratic)
        lines = [f"Saved Locations ({len(results)} total):"]
        for name, lat, lon, alt, desc, public_flag, _created in results:
            visibility = "🌐Public" if bool(public_flag) else "🔒Private"
            entry = f"{name} ({lat:.5f}, {lon:.5f})"
            if alt is not None:  # altitude
                entry += f" @ {alt:.1f}m"
            entry += f" [{visibility}]"
            if desc:  # description
                entry += f" - {desc}"
            lines.append(entry)
        return "\n".join(lines).strip()
    except Exception as e:
        logger.error(f"Location: Failed to list locations: {e}")
        return f"Error listing locations: {e}"
    finally:
        # Always release the DB handle — the original leaked the connection
        # when an exception fired before conn.close()
        if conn is not None:
            conn.close()
def delete_location_from_db(location_name, userID=""):
    """Delete a saved location from the database by name.

    Resolution order: the caller's own private location, then a public
    location, then any location of that name (admin-delete path).
    Permission model: private locations may only be deleted by their owner;
    public locations require admin rights when
    delete_public_locations_admins_only is enabled, otherwise anyone may
    delete them.

    Returns:
        (success, message)
    """
    conn = None
    try:
        if not location_name or not location_name.strip():
            return False, "Location name cannot be empty"
        conn = sqlite3.connect(my_settings.locations_db)
        c = conn.cursor()
        location_name_clean = location_name.strip()
        # Resolve the target: user's private location first...
        c.execute('''SELECT location_id, userID, is_public FROM locations
                     WHERE location_name = ? AND userID = ? AND is_public = 0''',
                  (location_name_clean, userID))
        location = c.fetchone()
        # ...then a public location...
        if not location:
            c.execute('''SELECT location_id, userID, is_public FROM locations
                         WHERE location_name = ? AND is_public = 1''',
                      (location_name_clean,))
            location = c.fetchone()
        # ...then any location at all (for admin delete)
        if not location:
            c.execute('''SELECT location_id, userID, is_public FROM locations
                         WHERE location_name = ? LIMIT 1''',
                      (location_name_clean,))
            location = c.fetchone()
        if not location:
            return False, f"Location '{location_name}' not found"
        location_id, location_userID, is_public = location
        # Admin status only matters when admin-only public deletion is configured
        is_admin = False
        if get_delete_public_locations_admins_only():
            from modules.system import isNodeAdmin
            is_admin = isNodeAdmin(userID)
        # Check if user owns this location
        is_owner = (str(location_userID) == str(userID))
        # Determine if deletion is allowed
        if is_public:
            # Public locations: only admins when admin-only is enabled,
            # otherwise anyone can delete them
            can_delete = is_admin if get_delete_public_locations_admins_only() else True
        else:
            # Private locations: owner can always delete
            can_delete = is_owner
        if not can_delete:
            if is_public and get_delete_public_locations_admins_only():
                return False, "Only admins can delete public locations."
            else:
                return False, f"You can only delete your own locations. This location belongs to another user."
        # Delete the location
        c.execute('''DELETE FROM locations WHERE location_id = ?''', (location_id,))
        conn.commit()
        visibility = "public" if is_public else "private"
        logger.debug(f"Location: Deleted {visibility} location '{location_name}' from database")
        return True, f"Location '{location_name}' deleted"
    except Exception as e:
        logger.error(f"Location: Failed to delete location: {e}")
        return False, f"Error deleting location: {e}"
    finally:
        # Always release the DB handle — the original leaked the connection
        # when an exception fired before conn.close()
        if conn is not None:
            conn.close()
def calculate_heading_and_distance(lat1, lon1, lat2, lon2):
    """Return (bearing_deg, distance_km, error) from point 1 to point 2.

    Great-circle distance comes from the haversine formula on a 6371 km
    sphere; the heading is the initial (forward-azimuth) compass bearing,
    normalized to 0-360°. When either point is (0, 0) it is treated as
    unavailable and (None, None, <error message>) is returned.
    """
    if lat1 == 0 and lon1 == 0:
        return None, None, "Current location not available"
    if lat2 == 0 and lon2 == 0:
        return None, None, "Target location not available"
    earth_radius_km = 6371
    # Work in radians throughout
    phi1, lam1, phi2, lam2 = map(math.radians, (lat1, lon1, lat2, lon2))
    d_lam = lam2 - lam1
    d_phi = phi2 - phi1
    # Haversine great-circle distance
    h = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lam / 2) ** 2
    distance_km = 2 * math.asin(math.sqrt(h)) * earth_radius_km
    # Initial bearing (forward azimuth), wrapped into [0, 360)
    az_x = math.sin(d_lam) * math.cos(phi2)
    az_y = math.cos(phi1) * math.sin(phi2) - math.sin(phi1) * math.cos(phi2) * math.cos(d_lam)
    compass_bearing = (math.degrees(math.atan2(az_x, az_y)) + 360) % 360
    return compass_bearing, distance_km, None
def log_locationData_toMap(userID, location, message):
    """
    Append a (userID, latitude, longitude, description) row to the
    map_data.csv file used for meshing purposes.
    Returns True on success, False when the location is missing/zero or
    the write fails.
    """
    lat, lon = location
    if lat is None or lon is None or lat == 0.0 or lon == 0.0:
        return False
    # The CSV lives alongside the package in ../data/
    data_dir = os.path.join(os.path.dirname(__file__), "..", "data")
    os.makedirs(data_dir, exist_ok=True)
    csv_path = os.path.join(data_dir, "map_data.csv")
    # A header row is needed only for a brand-new or empty file
    needs_header = not os.path.isfile(csv_path) or os.path.getsize(csv_path) == 0
    try:
        with open(csv_path, mode='a', newline='') as handle:
            writer = csv.DictWriter(handle, fieldnames=['userID', 'Latitude', 'Longitude', 'Description'])
            if needs_header:
                writer.writeheader()
            writer.writerow({
                'userID': userID,
                'Latitude': lat,
                'Longitude': lon,
                'Description': message if message else ""
            })
        logger.debug(f"Logged location for {userID} to {csv_path}")
        return True
    except Exception as e:
        logger.error(f"Failed to log location for {userID}: {e}")
        return False
def _append_signal_meta(description, snr, rssi, hop):
    """Append measured link quality (SNR/RSSI) and hop metadata to a description."""
    if snr is not None and rssi is not None:
        suffix = f"SNR:{snr}dB RSSI:{rssi}dBm"
        description = f"{description} {suffix}" if description else suffix
    if hop is not None:
        description = f"{description} Meta:{hop}" if description else f"Meta:{hop}"
    return description

def _format_travel_distance(distance_km):
    """Render a distance per the metric/imperial setting; short imperial distances in feet."""
    if my_settings.use_metric:
        return f"{distance_km:.2f} km"
    distance_miles = distance_km * 0.621371
    if distance_miles < 0.25:
        # Convert to feet for short distances
        distance_feet = distance_miles * 5280
        return f"{distance_feet:.0f} ft"
    return f"{distance_miles:.2f} miles"

def _bearing_to_cardinal(bearing_rounded):
    """
    Coarse compass label for a whole-degree bearing.

    NOTE(review): exact cardinals (N/E/S/W) only on the exact degree; every
    other bearing maps to its quadrant (e.g. 10° -> "NE"). Preserved as-is
    from the original implementation.
    """
    if bearing_rounded == 0 or bearing_rounded == 360:
        return "N"
    elif bearing_rounded == 90:
        return "E"
    elif bearing_rounded == 180:
        return "S"
    elif bearing_rounded == 270:
        return "W"
    elif 0 < bearing_rounded < 90:
        return "NE"
    elif 90 < bearing_rounded < 180:
        return "SE"
    elif 180 < bearing_rounded < 270:
        return "SW"
    elif 270 < bearing_rounded < 360:
        return "NW"
    return ""

def _saved_location_report(saved_location, label, location, lat, lon, userID, deviceID):
    """
    Build the heading/distance/altitude report toward a saved location.

    label carries the display name (with or without the " (Public)" suffix)
    so public and private lookups share this one formatter.
    """
    # Without a usable current fix, just echo the stored coordinates
    if not location or len(location) != 2 or lat == 0 or lon == 0:
        result = f"📍{label}: {saved_location['lat']:.5f}, {saved_location['lon']:.5f}"
        if saved_location.get('altitude') is not None:
            result += f" @ {saved_location['altitude']:.1f}m"
        result += "\n🚫Current location not available for heading"
        return result
    bearing, distance_km, error = calculate_heading_and_distance(
        lat, lon, saved_location['lat'], saved_location['lon']
    )
    if error:
        return f"📍{label}: {error}"
    distance_str = _format_travel_distance(distance_km)
    bearing_rounded = round(bearing)
    cardinal = _bearing_to_cardinal(bearing_rounded)
    result = f"📍{label}\n"
    result += f"🧭Heading: {bearing_rounded}° {cardinal}\n"
    result += f"📏Distance: {distance_str}"
    # Altitude difference only when both current and stored altitudes are known
    current_altitude = get_node_altitude(userID, deviceID)
    saved_altitude = saved_location.get('altitude')
    if current_altitude is not None and saved_altitude is not None:
        altitude_diff_m = saved_altitude - current_altitude  # stored minus current
        altitude_diff_ft_rounded = round(altitude_diff_m * 3.28084)  # meters -> feet
        if altitude_diff_ft_rounded > 0:
            result += f"\nAltitude: +{altitude_diff_ft_rounded}ft"  # target is higher
        elif altitude_diff_ft_rounded < 0:
            result += f"\nAltitude: {altitude_diff_ft_rounded}ft"  # target is lower (sign included)
        else:
            result += f"\nAltitude: ±0ft"
    if saved_location['description']:
        result += f"\n📝{saved_location['description']}"
    return result

def mapHandler(userID, deviceID, channel_number, message, snr, rssi, hop):
    """
    Handles 'map' commands from meshbot.
    Usage:
        map save <name> [description]  - Save current location with a name
        map save public <name> [desc]  - Save public location (all can see)
        map <name>                     - Get heading and distance to a saved location
        map public <name>              - Get heading to public location (ignores private)
        map delete <name>              - Delete a location
        map list                       - List all saved locations
        map log <description>          - Log current location with description (CSV, legacy)
    """
    from modules.system import get_node_location
    command = str(message[len("map"):].strip())  # ensure command is always a string
    location = get_node_location(userID, deviceID)
    lat = location[0]
    lon = location[1]
    # Help text
    if command.strip().lower() == "help":
        return (
            f"'map save <name> [description]' - Save private\n"
            f"'map save public <name> [desc]' - Save public\n"
            f"'map <name>' - heading to saved\n"
            f"'map public <name>' - heading to public\n"
            f"'map delete <name>' \n"
            f"'map list' - List\n"
            f"'map log <description>' - Log CSV\n"
        )
    # Handle "save" command (private or public)
    if command.lower().startswith("save "):
        save_cmd = command[5:].strip()
        is_public = save_cmd.lower().startswith("public ")
        if is_public:
            save_cmd = save_cmd[7:].strip()  # remove "public " prefix
        parts = save_cmd.split(" ", 1)
        # split(" ", 1) always yields at least one element; only emptiness matters
        if not parts[0]:
            if is_public:
                return "🚫Usage: map save public <name> [description]"
            return "🚫Usage: map save <name> [description]"
        location_name = parts[0]
        description = parts[1] if len(parts) > 1 else ""
        description = _append_signal_meta(description, snr, rssi, hop)
        if not location or len(location) != 2 or lat == 0 or lon == 0:
            return "🚫Location data is missing or invalid."
        # Persist with the node's altitude when available
        altitude = get_node_altitude(userID, deviceID)
        success, msg, _ = save_location_to_db(location_name, lat, lon, description, str(userID), is_public, altitude)
        return f"📍{msg}" if success else f"🚫{msg}"
    # Handle "list" command
    if command.strip().lower() == "list":
        return list_locations_from_db(str(userID))
    # Handle "delete" command
    if command.lower().startswith("delete "):
        location_name = command[7:].strip()  # remove "delete " prefix
        if not location_name:
            return "🚫Usage: map delete <name>"
        success, msg = delete_location_from_db(location_name, str(userID))
        return f"🗑️{msg}" if success else f"🚫{msg}"
    # Handle "public" lookup (even if the user has a private entry with the same name)
    if command.lower().startswith("public "):
        location_name = command[7:].strip()  # remove "public " prefix
        if not location_name:
            return "🚫Usage: map public <name>"
        saved_location = get_public_location_from_db(location_name)
        if not saved_location:
            return f"🚫Public location '{location_name}' not found."
        return _saved_location_report(
            saved_location, f"{saved_location['name']} (Public)",
            location, lat, lon, userID, deviceID
        )
    # Handle "log" command for legacy CSV logging
    if command.lower().startswith("log "):
        description = command[4:].strip() or "Logged:"  # default when no text given
        # Sanitize leading character against CSV/spreadsheet formula injection
        if description[0] in ('=', '+', '-', '@'):
            description = "'" + description
        description = _append_signal_meta(description, snr, rssi, hop)
        # location should be a tuple: (lat, lon)
        if not location or len(location) != 2:
            return "🚫Location data is missing or invalid."
        if log_locationData_toMap(userID, location, description):
            return f"📍Location logged (CSV)"
        return "🚫Failed to log location. Please try again."
    # Anything else is a private location-name lookup
    if command.strip():
        location_name = command.strip()
        saved_location = get_location_from_db(location_name, str(userID))
        if not saved_location:
            return f"🚫Location '{location_name}' not found. Use 'map list' to see available locations."
        return _saved_location_report(
            saved_location, saved_location['name'],
            location, lat, lon, userID, deviceID
        )
    # Empty command - point at help
    return "🗺Use 'map help' for help"
# Initialize the locations database when module is imported
# (module-level side effect; initialize_locations_database is defined earlier
#  in this file — presumably it creates/ensures the saved-locations SQLite
#  schema used by the map save/delete/list helpers; confirm at its definition)
initialize_locations_database()