Compare commits

..

3 Commits

Author SHA1 Message Date
SpudGunMan
75410c98e3 sweep 2025-10-24 20:40:23 -07:00
SpudGunMan
10171a712e moar cleanup 🧹
tighter memory control
2025-10-24 20:32:30 -07:00
SpudGunMan
fa76a76203 BIG OLD PATCH 🍠
pz days ... haha. I hope this works.
fancy potato
2025-10-24 19:54:46 -07:00
38 changed files with 423 additions and 349 deletions

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# # Simulate meshing-around de K7MHI 2024
from modules.log import * # Import the logger; ### --> If you are reading this put the script in the project root <-- ###
from modules.log import logger, getPrettyTime # Import the logger; ### --> If you are reading this put the script in the project root <-- ###
import time
import random

View File

@@ -1,7 +1,6 @@
#!/usr/bin/python3
# Meshtastic Autoresponder MESH Bot
# K7MHI Kelly Keeton 2025
try:
from pubsub import pub
except ImportError:
@@ -12,7 +11,8 @@ import asyncio
import time # for sleep, get some when you can :)
import random
from datetime import datetime
from modules.log import *
from modules.log import logger, CustomFormatter, msgLogger, getPrettyTime
import modules.settings as my_settings
from modules.system import *
# list of commands to remove from the default list for DM only
@@ -129,7 +129,7 @@ def auto_response(message, snr, rssi, hop, pkiStatus, message_from_id, channel_n
# check the message for commands words list, processed after system.messageTrap
for key in command_handler:
word = message_lower.split(' ')
if cmdBang:
if my_settings.cmdBang:
# strip the !
if word[0].startswith("!"):
word[0] = word[0][1:]
@@ -180,13 +180,13 @@ def isPlayingGame(message_from_id):
trackers = [tracker for tracker in trackers if tracker is not None]
for tracker, game_name, handle_game_func in trackers:
for tracker, game_name, _ in trackers:
for i in range(len(tracker)-1, -1, -1): # iterate backwards for safe removal
id_key = 'userID' if game_name == "DopeWars" else 'nodeID'
id_key = 'id' if game_name == "Survey" else id_key
if tracker[i].get(id_key) == message_from_id:
last_played_key = 'last_played' if 'last_played' in tracker[i] else 'time'
if tracker[i].get(last_played_key, 0) > (time.time() - GAMEDELAY):
if tracker[i].get(last_played_key, 0) > (time.time() - my_settings.GAMEDELAY):
playingGame = True
game = game_name
break
@@ -218,7 +218,7 @@ def check_and_play_game(tracker, message_from_id, message_string, rxNode, channe
if tracker[i].get(id_key) == message_from_id:
last_played_key = 'last_played' if 'last_played' in tracker[i] else 'time'
if tracker[i].get(last_played_key) > (time.time() - GAMEDELAY):
if tracker[i].get(last_played_key) > (time.time() - my_settings.GAMEDELAY):
if llm_enabled:
logger.debug(f"System: LLM Disabled for {message_from_id} for duration of {game_name}")
send_message(handle_game_func(message_string, message_from_id, rxNode), channel_number, message_from_id, rxNode)
@@ -276,7 +276,7 @@ def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, chann
if len(toNode) <= 4:
toNode = get_num_from_short_name(toNode, deviceID)
if toNode and isinstance(toNode, int) and toNode != 0:
if bbs_enabled:
if my_settings.bbs_enabled:
msg_result = None
logger.debug(f"System: Sending ping as BBS DM to @{toNode} from {get_name_from_number(message_from_id, 'short', deviceID)}")
msg_result = bbs_post_dm(toNode, f"Joke for you! {tell_joke()}", message_from_id)
@@ -306,13 +306,13 @@ def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, chann
pingCount = int(message.split(" ")[1])
if pingCount == 123 or pingCount == 1234:
pingCount = 1
elif not autoPingInChannel and not isDM:
elif not my_settings.autoPingInChannel and not isDM:
# no autoping in channels
pingCount = 1
if pingCount > 51:
pingCount = 50
except:
except ValueError:
pingCount = -1
if pingCount > 1:
@@ -324,7 +324,7 @@ def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, chann
msg = f"🚦Initalizing {pingCount} auto-ping"
# if not a DM add the username to the beginning of msg
if not useDMForResponse and not isDM:
if not my_settings.useDMForResponse and not isDM:
msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + msg
return msg
@@ -336,7 +336,7 @@ def handle_alertBell(message_from_id, deviceID, message):
def handle_emergency(message_from_id, deviceID, message):
myNodeNum = globals().get(f'myNodeNum{deviceID}', 777)
# if user in bbs_ban_list return
if str(message_from_id) in bbs_ban_list:
if str(message_from_id) in my_settings.bbs_ban_list:
# silent discard
logger.warning(f"System: {message_from_id} on spam list, no emergency responder alert sent")
return ''
@@ -344,30 +344,29 @@ def handle_emergency(message_from_id, deviceID, message):
if message_from_id != 0:
nodeLocation = get_node_location(message_from_id, deviceID)
# if default location is returned set to Unknown
if nodeLocation[0] == latitudeValue and nodeLocation[1] == longitudeValue:
if nodeLocation[0] == my_settings.latitudeValue and nodeLocation[1] == my_settings.longitudeValue:
nodeLocation = ["?", "?"]
nodeInfo = f"{get_name_from_number(message_from_id, 'short', deviceID)} detected by {get_name_from_number(myNodeNum, 'short', deviceID)} lastGPS {nodeLocation[0]}, {nodeLocation[1]}"
msg = f"🔔🚨Intercepted Possible Emergency Assistance needed for: {nodeInfo}"
# alert the emergency_responder_alert_channel
send_message(msg, emergency_responder_alert_channel, 0, emergency_responder_alert_interface)
send_message(msg, my_settings.emergency_responder_alert_channel, 0, my_settings.emergency_responder_alert_interface)
logger.warning(f"System: {message_from_id} Emergency Assistance Requested in {message}")
# send the message out via email/sms
if enableSMTP:
for user in sysopEmails:
if my_settings.enableSMTP:
for user in my_settings.sysopEmails:
send_email(user, f"Emergency Assistance Requested by {nodeInfo} in {message}", message_from_id)
return EMERGENCY_RESPONSE
return my_settings.EMERGENCY_RESPONSE
def handle_motd(message, message_from_id, isDM):
global MOTD
msg = MOTD
msg = my_settings.MOTD
isAdmin = isNodeAdmin(message_from_id)
if "?" in message:
msg = "Message of the day, set with 'motd $ HelloWorld!'"
elif "$" in message and isAdmin:
motd = message.split("$")[1]
MOTD = motd.rstrip()
logger.debug(f"System: {message_from_id} temporarly changed MOTD: {MOTD}")
msg = "MOTD changed to: " + MOTD
my_settings.MOTD = message.split("$")[1]
my_settings.MOTD = my_settings.MOTD.rstrip()
logger.debug(f"System: {message_from_id} temporarly changed my_settings.MOTD: {my_settings.MOTD}")
msg = "my_settings.MOTD changed to: " + my_settings.MOTD
return msg
def handle_echo(message, message_from_id, deviceID, isDM, channel_number):
@@ -391,7 +390,7 @@ def handle_echo(message, message_from_id, deviceID, isDM, channel_number):
parts = message.lower().split("echo ", 1)
if len(parts) > 1 and parts[1].strip() != "":
echo_msg = parts[1]
if channel_number != echoChannel and not isDM:
if channel_number != my_settings.echoChannel and not isDM:
echo_msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + echo_msg
return echo_msg
else:
@@ -400,7 +399,7 @@ def handle_echo(message, message_from_id, deviceID, isDM, channel_number):
return "Please provide a message to echo back to you. Example:echo Hello World"
def handle_wxalert(message_from_id, deviceID, message):
if use_meteo_wxApi:
if my_settings.use_meteo_wxApi:
return "wxalert is not supported"
else:
location = get_node_location(message_from_id, deviceID)
@@ -410,7 +409,7 @@ def handle_wxalert(message_from_id, deviceID, message):
else:
weatherAlert = getWeatherAlertsNOAA(str(location[0]), str(location[1]))
if NO_ALERTS not in weatherAlert:
if my_settings.NO_ALERTS not in weatherAlert:
weatherAlert = weatherAlert[0]
return weatherAlert
@@ -428,7 +427,7 @@ def handleNews(message_from_id, deviceID, message, isDM):
if news:
# if not a DM add the username to the beginning of msg
if not useDMForResponse and not isDM:
if not my_settings.useDMForResponse and not isDM:
news = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + news
return news
else:
@@ -444,7 +443,7 @@ def handle_howfar(message, message_from_id, deviceID, isDM):
return "command returns the distance you have traveled since your last HowFar-command. Add 'reset' to reset your starting point."
# if no GPS location return
if lat == latitudeValue and lon == longitudeValue:
if lat == my_settings.latitudeValue and lon == my_settings.longitudeValue:
logger.debug(f"System: HowFar: No GPS location for {message_from_id}")
return "No GPS location available"
@@ -454,7 +453,7 @@ def handle_howfar(message, message_from_id, deviceID, isDM):
msg = distance(lat,lon,message_from_id)
# if not a DM add the username to the beginning of msg
if not useDMForResponse and not isDM:
if not my_settings.useDMForResponse and not isDM:
msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + msg
return msg
@@ -464,10 +463,10 @@ def handle_howtall(message, message_from_id, deviceID, isDM):
location = get_node_location(message_from_id, deviceID)
lat = location[0]
lon = location[1]
if lat == latitudeValue and lon == longitudeValue:
if lat == my_settings.latitudeValue and lon == my_settings.longitudeValue:
# add guessing tot he msg
msg += "Guessing:"
if use_metric:
if my_settings.use_metric:
measure = "meters"
else:
measure = "feet"
@@ -477,14 +476,14 @@ def handle_howtall(message, message_from_id, deviceID, isDM):
# get the shadow length from the message split after howtall
try:
shadow_length = float(message.lower().split("howtall ")[1].split(" ")[0])
except:
except (IndexError, ValueError):
return f"Please provide a shadow length in {measure} example: howtall 5.5"
# get data
msg += measureHeight(lat, lon, shadow_length)
# if data has NO_ALERTS return help
if NO_ALERTS in msg:
if my_settings.NO_ALERTS in msg:
return f"Please provide a shadow length in {measure} example: howtall 5.5"
return msg
@@ -511,12 +510,12 @@ llmLocationTable = [{'nodeID': 1234567890, 'location': 'No Location'},]
def handle_satpass(message_from_id, deviceID, message='', vox=False):
if vox:
location = (latitudeValue, longitudeValue)
location = (my_settings.latitudeValue, my_settings.longitudeValue)
message = 'satpass'
else:
location = get_node_location(message_from_id, deviceID)
passes = ''
satList = satListConfig
satList = my_settings.satListConfig
message = message.lower()
# if user has a NORAD ID in the message
@@ -547,7 +546,7 @@ def handle_llm(message_from_id, channel_number, deviceID, message, publicChannel
location_name = 'no location provided'
msg = ''
if location_enabled:
if my_settings.location_enabled:
# if message_from_id is is the llmLocationTable use the location from the list to save on API calls
for i in range(0, len(llmLocationTable)):
if llmLocationTable[i].get('nodeID') == message_from_id:
@@ -558,7 +557,7 @@ def handle_llm(message_from_id, channel_number, deviceID, message, publicChannel
location = get_node_location(message_from_id, deviceID)
location_name = where_am_i(str(location[0]), str(location[1]), short = True)
if NO_DATA_NOGPS in location_name:
if my_settings.NO_DATA_NOGPS in location_name:
location_name = "no location provided"
if "ask:" in message.lower():
@@ -573,12 +572,12 @@ def handle_llm(message_from_id, channel_number, deviceID, message, publicChannel
# check for a welcome message (is this redundant?)
if not any(node['nodeID'] == message_from_id and node['welcome'] == True for node in seenNodes):
if (channel_number == publicChannel and antiSpam) or useDMForResponse:
if (channel_number == publicChannel and my_settings.antiSpam) or my_settings.useDMForResponse:
# send via DM
send_message(welcome_message, channel_number, message_from_id, deviceID)
send_message(my_settings.welcome_message, channel_number, message_from_id, deviceID)
else:
# send via channel
send_message(welcome_message, channel_number, 0, deviceID)
send_message(my_settings.welcome_message, channel_number, 0, deviceID)
# mark the node as welcomed
for node in seenNodes:
if node['nodeID'] == message_from_id:
@@ -606,7 +605,7 @@ def handle_llm(message_from_id, channel_number, deviceID, message, publicChannel
msg = "Please wait, response could take 30+ seconds. Fund the SysOp's GPU budget!"
if msg != '':
if (channel_number == publicChannel and antiSpam) or useDMForResponse:
if (channel_number == publicChannel and my_settings.antiSpam) or my_settings.useDMForResponse:
# send via DM
send_message(msg, channel_number, message_from_id, deviceID)
else:
@@ -1006,13 +1005,14 @@ def handleTicTacToe(message, nodeID, deviceID):
return msg
def quizHandler(message, nodeID, deviceID):
global quizGamePlayer
user_name = get_name_from_number(nodeID)
user_id = nodeID
msg = ''
user_answer = ''
user_answer = message.lower()
user_answer = user_answer.replace("quiz","").replace("q:","").strip()
if user_answer.startswith("!") and cmdBang:
if user_answer.startswith("!") and my_settings.cmdBang:
user_answer = user_answer[1:].strip()
if user_answer:
if user_answer.startswith("start"):
@@ -1066,6 +1066,7 @@ def quizHandler(message, nodeID, deviceID):
return "🧠Please provide an answer or command, or send q: ?"
def surveyHandler(message, nodeID, deviceID):
global surveyTracker
user_id = nodeID
location = get_node_location(nodeID, deviceID)
msg = ''
@@ -1114,7 +1115,7 @@ def surveyHandler(message, nodeID, deviceID):
def handle_riverFlow(message, message_from_id, deviceID, vox=False):
# River Flow from NOAA or Open-Meteo
if vox:
location = (latitudeValue, longitudeValue)
location = (my_settings.latitudeValue, my_settings.longitudeValue)
message = "riverflow"
else:
location = get_node_location(message_from_id, deviceID)
@@ -1138,9 +1139,9 @@ def handle_riverFlow(message, message_from_id, deviceID, vox=False):
def handle_mwx(message_from_id, deviceID, cmd):
# NOAA Coastal and Marine Weather
if myCoastalZone is None:
if my_settings.myCoastalZone is None:
logger.warning("System: Coastal Zone not set, please set in config.ini")
return NO_ALERTS
return my_settings.NO_ALERTS
return get_nws_marine(zone=myCoastalZone, days=coastalForecastDays)
def handle_wxc(message_from_id, deviceID, cmd, vox=False):
@@ -1148,17 +1149,17 @@ def handle_wxc(message_from_id, deviceID, cmd, vox=False):
if vox:
# return a default message if vox is enabled
if use_meteo_wxApi:
return get_wx_meteo(latitudeValue, longitudeValue)
return get_wx_meteo(my_settings.latitudeValue, my_settings.longitudeValue)
else:
return get_NOAAweather(latitudeValue, longitudeValue)
return get_NOAAweather(my_settings.latitudeValue, my_settings.longitudeValue)
location = get_node_location(message_from_id, deviceID)
if use_meteo_wxApi and not "wxc" in cmd and not use_metric:
if my_settings.use_meteo_wxApi and not "wxc" in cmd and not use_metric:
#logger.debug("System: Bot Returning Open-Meteo API for weather imperial")
weather = get_wx_meteo(str(location[0]), str(location[1]))
elif use_meteo_wxApi:
elif my_settings.use_meteo_wxApi:
#logger.debug("System: Bot Returning Open-Meteo API for weather metric")
weather = get_wx_meteo(str(location[0]), str(location[1]), 1)
elif not use_meteo_wxApi and "wxc" in cmd or use_metric:
elif not my_settings.use_meteo_wxApi and "wxc" in cmd or my_settings.use_metric:
#logger.debug("System: Bot Returning NOAA API for weather metric")
weather = get_NOAAweather(str(location[0]), str(location[1]), 1)
else:
@@ -1168,7 +1169,7 @@ def handle_wxc(message_from_id, deviceID, cmd, vox=False):
def handle_emergency_alerts(message, message_from_id, deviceID):
location = get_node_location(message_from_id, deviceID)
if enableDEalerts:
if my_settings.enableDEalerts:
# nina Alerts
return get_nina_alerts()
if message.lower().startswith("ealert"):
@@ -1249,7 +1250,7 @@ def handle_messages(message, deviceID, channel_number, msg_history, publicChanne
# Choose order and slice
# Oldest first, take first N
filtered_msgs = filtered_msgs[-storeFlimit:][::-1]
if reverseSF:
if my_settings.reverseSF:
# reverse that
filtered_msgs = filtered_msgs[::-1]
@@ -1284,7 +1285,7 @@ def handle_messages(message, deviceID, channel_number, msg_history, publicChanne
def handle_sun(message_from_id, deviceID, channel_number, vox=False):
if vox:
# return a default message if vox is enabled
return get_sun(str(latitudeValue), str(longitudeValue))
return get_sun(str(my_settings.latitudeValue), str(my_settings.longitudeValue))
location = get_node_location(message_from_id, deviceID, channel_number)
return get_sun(str(location[0]), str(location[1]))
@@ -1358,7 +1359,8 @@ def handle_history(message, nodeid, deviceID, isDM, lheard=False):
# create the message from the buffer list
for i in range(0, len(buffer)):
msg += f"{buffer[i][0]}: {buffer[i][1]} :{buffer[i][2]} ago"
if i < len(buffer) - 1: msg += "\n" # add a new line if not the last line
if i < len(buffer) - 1:
msg += "\n" # add a new line if not the last line
else:
# sort the cmdHistory list by time, return the username and time into a new list which used for display
for i in range(len(cmdHistory)):
@@ -1380,8 +1382,10 @@ def handle_history(message, nodeid, deviceID, isDM, lheard=False):
buffer.reverse() # reverse the list to show the latest first
for i in range(0, len(buffer)):
msg += f"{buffer[i][0]}, {buffer[i][1]} ago"
if i < len(buffer) - 1: msg += "\n" # add a new line if not the last line
if i > 3: break # only return the last 4 nodes
if i < len(buffer) - 1:
msg += "\n" # add a new line if not the last line
if i > 3:
break # only return the last 4 nodes
return msg
def handle_whereami(message_from_id, deviceID, channel_number):
@@ -1399,13 +1403,13 @@ def handle_repeaterQuery(message_from_id, deviceID, channel_number):
def handle_tide(message_from_id, deviceID, channel_number, vox=False):
if vox:
return get_NOAAtide(str(latitudeValue), str(longitudeValue))
return get_NOAAtide(str(my_settings.latitudeValue), str(my_settings.longitudeValue))
location = get_node_location(message_from_id, deviceID, channel_number)
return get_NOAAtide(str(location[0]), str(location[1]))
def handle_moon(message_from_id, deviceID, channel_number, vox=False):
if vox:
return get_moon(str(latitudeValue), str(longitudeValue))
return get_moon(str(my_settings.latitudeValue), str(my_settings.longitudeValue))
location = get_node_location(message_from_id, deviceID, channel_number)
return get_moon(str(location[0]), str(location[1]))
@@ -1421,7 +1425,7 @@ def handle_whoami(message_from_id, deviceID, hop, snr, rssi, pkiStatus):
msg += f"\nYour PKI bit is {pkiStatus[0]} pubKey: {pkiStatus[1]}"
loc = get_node_location(message_from_id, deviceID)
if loc != [latitudeValue, longitudeValue]:
if loc != [my_settings.latitudeValue, my_settings.longitudeValue]:
msg += f"\nYou are at: lat:{loc[0]} lon:{loc[1]}"
# check the positionMetadata for nodeID and get metadata
@@ -1469,7 +1473,7 @@ def handle_whois(message, deviceID, channel_number, message_from_id):
location = get_node_location(seenNodes[i]['nodeID'], deviceID, channel_number)
msg += f"Ch: {seenNodes[i]['channel']}, Int: {seenNodes[i]['rxInterface']}"
msg += f"Lat: {location[0]}, Lon: {location[1]}\n"
if location != [latitudeValue, longitudeValue]:
if location != [my_settings.latitudeValue, my_settings.longitudeValue]:
msg += f"Loc: {where_am_i(str(location[0]), str(location[1]))}"
return msg
@@ -1493,7 +1497,7 @@ def onReceive(packet, interface):
session_passkey = None
playingGame = False
if DEBUGpacket:
if my_settings.DEBUGpacket:
# Debug print the interface object
for item in interface.__dict__.items(): intDebug = f"{item}\n"
logger.debug(f"System: Packet Received on {rxType} Interface\n {intDebug} \n END of interface \n")
@@ -1671,10 +1675,10 @@ def onReceive(packet, interface):
send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode)
else:
# DM is useful for games or LLM
if games_enabled and ("Direct" in hop or hop_count < game_hop_limit):
if my_settings.games_enabled and ("Direct" in hop or hop_count < my_settings.game_hop_limit):
playingGame = checkPlayingGame(message_from_id, message_string, rxNode, channel_number)
elif hop_count >= game_hop_limit:
if games_enabled:
elif hop_count >= my_settings.game_hop_limit:
if my_settings.games_enabled:
logger.warning(f"Device:{rxNode} Ignoring Request to Play Game: {message_string} From: {get_name_from_number(message_from_id, 'long', rxNode)} with hop count: {hop}")
send_message(f"Your hop count exceeds safe playable distance at {hop_count} hops", channel_number, message_from_id, rxNode)
else:
@@ -1683,7 +1687,7 @@ def onReceive(packet, interface):
playingGame = False
if not playingGame:
if llm_enabled and llmReplyToNonCommands:
if llm_enabled and my_settings.llmReplyToNonCommands:
# respond with LLM
llm = handle_llm(message_from_id, channel_number, rxNode, message_string, publicChannel)
send_message(llm, channel_number, message_from_id, rxNode)
@@ -1700,7 +1704,7 @@ def onReceive(packet, interface):
if node['nodeID'] == message_from_id:
node['welcome'] = True
else:
if dad_jokes_enabled:
if my_settings.dad_jokes_enabled:
# respond with a dad joke on DM
send_message(tell_joke(), channel_number, message_from_id, rxNode)
else:
@@ -1714,24 +1718,24 @@ def onReceive(packet, interface):
# message is on a channel
if messageTrap(message_string):
# message is for us to respond to, or is it...
if ignoreDefaultChannel and channel_number == publicChannel:
if my_settings.ignoreDefaultChannel and channel_number == my_settings.publicChannel:
logger.debug(f"System: Ignoring CMD:{message_string} From: {get_name_from_number(message_from_id, 'short', rxNode)} Default Channel:{channel_number}")
elif str(message_from_id) in bbs_ban_list:
elif str(message_from_id) in my_settings.bbs_ban_list:
logger.debug(f"System: Ignoring CMD:{message_string} From: {get_name_from_number(message_from_id, 'short', rxNode)} Cantankerous Node")
elif str(channel_number) in ignoreChannels:
elif str(channel_number) in my_settings.ignoreChannels:
logger.debug(f"System: Ignoring CMD:{message_string} From: {get_name_from_number(message_from_id, 'short', rxNode)} Ignored Channel:{channel_number}")
elif cmdBang and not message_string.startswith("!"):
elif my_settings.cmdBang and not message_string.startswith("!"):
logger.debug(f"System: Ignoring CMD:{message_string} From: {get_name_from_number(message_from_id, 'short', rxNode)} Didnt sound like they meant it")
else:
# message is for bot to respond to, seriously this time..
logger.info(f"Device:{rxNode} Channel:{channel_number} " + CustomFormatter.green + "ReceivedChannel: " + CustomFormatter.white + f"{message_string} " + CustomFormatter.purple +\
"From: " + CustomFormatter.white + f"{get_name_from_number(message_from_id, 'long', rxNode)}")
if useDMForResponse:
if my_settings.useDMForResponse:
# respond to channel message via direct message
send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode)
else:
# or respond to channel message on the channel itself
if channel_number == publicChannel and antiSpam:
if channel_number == my_settings.publicChannel and my_settings.antiSpam:
# warning user spamming default channel
logger.warning(f"System: AntiSpam protection, sending DM to: {get_name_from_number(message_from_id, 'long', rxNode)}")
@@ -1744,15 +1748,15 @@ def onReceive(packet, interface):
else:
# message is not for us to respond to
# ignore the message but add it to the message history list
if zuluTime:
if my_settings.zuluTime:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
else:
timestamp = datetime.now().strftime("%Y-%m-%d %I:%M:%S%p")
# trim the history list if it exceeds max_history
if len(msg_history) >= MAX_MSG_HISTORY:
if len(msg_history) >= my_settings.MAX_MSG_HISTORY:
# Always keep only the most recent MAX_MSG_HISTORY entries
msg_history = msg_history[-MAX_MSG_HISTORY:]
msg_history = msg_history[-my_settings.MAX_MSG_HISTORY:]
# add the message to the history list
msg_history.append((get_name_from_number(message_from_id, 'long', rxNode), message_string, channel_number, timestamp, rxNode))
@@ -1760,27 +1764,27 @@ def onReceive(packet, interface):
# print the message to the log and sdout
logger.info(f"Device:{rxNode} Channel:{channel_number} " + CustomFormatter.green + "Ignoring Message:" + CustomFormatter.white +\
f" {message_string} " + CustomFormatter.purple + "From:" + CustomFormatter.white + f" {get_name_from_number(message_from_id)}")
if log_messages_to_file:
if my_settings.log_messages_to_file:
msgLogger.info(f"Device:{rxNode} Channel:{channel_number} | {get_name_from_number(message_from_id, 'long', rxNode)} | " + message_string.replace('\n', '-nl-'))
# repeat the message on the other device
if repeater_enabled and multiple_interface:
if my_settings.repeater_enabled and my_settings.multiple_interface:
# wait a responseDelay to avoid message collision from lora-ack.
time.sleep(responseDelay)
if len(message_string) > (3 * MESSAGE_CHUNK_SIZE):
time.sleep(my_settings.responseDelay)
if len(message_string) > (3 * my_settings.MESSAGE_CHUNK_SIZE):
logger.warning(f"System: Not repeating message, exceeds size limit ({len(message_string)} > {3 * MESSAGE_CHUNK_SIZE})")
else:
rMsg = (f"{message_string} From:{get_name_from_number(message_from_id, 'short', rxNode)}")
# if channel found in the repeater list repeat the message
if str(channel_number) in repeater_channels:
if str(channel_number) in my_settings.repeater_channels:
for i in range(1, 10):
if globals().get(f'interface{i}_enabled', False) and i != rxNode:
logger.debug(f"Repeating message on Device{i} Channel:{channel_number}")
send_message(rMsg, channel_number, 0, i)
time.sleep(responseDelay)
time.sleep(my_settings.responseDelay)
# if QRZ enabled check if we have said hello
if qrz_hello_enabled:
if my_settings.qrz_hello_enabled:
if never_seen_before(message_from_id):
name = get_name_from_number(message_from_id, 'short', rxNode)
if isinstance(name, str) and name.startswith("!") and len(name) == 9:
@@ -1790,11 +1794,11 @@ def onReceive(packet, interface):
# add to qrz_hello list
hello(message_from_id, name)
# send a hello message as a DM
if not train_qrz:
if not my_settings.train_qrz:
send_message(f"Hello {name} {qrz_hello_string}", channel_number, message_from_id, rxNode)
# handle mini games
if wordOfTheDay:
if my_settings.wordOfTheDay:
#word of the day game play on non bot messages
happened, old_entry, new_entry, bingo_win, bingo_message = theWordOfTheDay.did_it_happen(message_string)
if happened:
@@ -1831,135 +1835,145 @@ async def start_rx():
f"{get_name_from_number(myNodeNum, 'short', i)}. NodeID: {myNodeNum}, {decimal_to_hex(myNodeNum)}")
if llm_enabled:
logger.debug(f"System: Ollama LLM Enabled, loading model {llmModel} please wait")
logger.debug(f"System: Ollama LLM Enabled, loading model {my_settings.llmModel} please wait")
llmLoad = llm_query(" ")
if "trouble" not in llmLoad:
logger.debug(f"System: LLM Model {llmModel} loaded")
logger.debug(f"System: LLM Model {my_settings.llmModel} loaded")
if useDMForResponse:
if my_settings.useDMForResponse:
logger.debug("System: Respond by DM only")
if log_messages_to_file:
if my_settings.log_messages_to_file:
logger.debug("System: Logging Messages to disk")
if syslog_to_file:
if my_settings.syslog_to_file:
logger.debug("System: Logging System Logs to disk")
if bbs_enabled:
if my_settings.bbs_enabled:
logger.debug(f"System: BBS Enabled, {bbsdb} has {len(bbs_messages)} messages. Direct Mail Messages waiting: {(len(bbs_dm) - 1)}")
if bbs_link_enabled:
if my_settings.bbs_link_enabled:
if len(bbs_link_whitelist) > 0:
logger.debug(f"System: BBS Link Enabled with {len(bbs_link_whitelist)} peers")
else:
logger.debug(f"System: BBS Link Enabled allowing all")
if solar_conditions_enabled:
if my_settings.solar_conditions_enabled:
logger.debug("System: Celestial Telemetry Enabled")
if location_enabled:
if use_meteo_wxApi:
if my_settings.location_enabled:
if my_settings.use_meteo_wxApi:
logger.debug("System: Location Telemetry Enabled using Open-Meteo API")
else:
logger.debug("System: Location Telemetry Enabled using NOAA API")
if dad_jokes_enabled:
if my_settings.dad_jokes_enabled:
logger.debug("System: Dad Jokes Enabled!")
if coastalEnabled:
if my_settings.coastalEnabled:
logger.debug("System: Coastal Forecast and Tide Enabled!")
if games_enabled:
if my_settings.games_enabled:
logger.debug("System: Games Enabled!")
if wikipedia_enabled:
if use_kiwix_server:
if my_settings.wikipedia_enabled:
if my_settings.use_kiwix_server:
logger.debug(f"System: Wikipedia search Enabled using Kiwix server at {kiwix_url}")
else:
logger.debug("System: Wikipedia search Enabled")
if rssEnable:
if my_settings.rssEnable:
logger.debug(f"System: RSS Feed Reader Enabled for feeds: {rssFeedNames}")
if motd_enabled:
logger.debug(f"System: MOTD Enabled using {MOTD} scheduler:{schedulerMotd}")
if my_settings.MOTD_enabled:
logger.debug(f"System: MOTD Enabled using {my_settings.MOTD} scheduler:{my_settings.schedulerMOTD}")
if sentry_enabled:
logger.debug(f"System: Sentry Mode Enabled {sentry_radius}m radius reporting to channel:{secure_channel} requestLOC:{reqLocationEnabled}")
if sentryIgnoreList:
logger.debug(f"System: Sentry BlockList Enabled for nodes: {sentryIgnoreList}")
if sentryWatchList:
logger.debug(f"System: Sentry WatchList Enabled for nodes: {sentryWatchList}")
if my_settings.sentry_enabled:
logger.debug(f"System: Sentry Mode Enabled {my_settings.sentry_radius}m radius reporting to channel:{my_settings.secure_channel} requestLOC:{reqLocationEnabled}")
if my_settings.sentryIgnoreList:
logger.debug(f"System: Sentry BlockList Enabled for nodes: {my_settings.sentryIgnoreList}")
if my_settings.sentryWatchList:
logger.debug(f"System: Sentry WatchList Enabled for nodes: {my_settings.sentryWatchList}")
if highfly_enabled:
logger.debug(f"System: HighFly Enabled using {highfly_altitude}m limit reporting to channel:{highfly_channel}")
if my_settings.highfly_enabled:
logger.debug(f"System: HighFly Enabled using {my_settings.highfly_altitude}m limit reporting to channel:{my_settings.highfly_channel}")
if store_forward_enabled:
logger.debug(f"System: S&F(messages command) Enabled using limit: {storeFlimit} and reverse queue:{reverseSF}")
if my_settings.store_forward_enabled:
logger.debug(f"System: S&F(messages command) Enabled using limit: {storeFlimit} and reverse queue:{my_settings.reverseSF}")
if enableEcho:
if my_settings.enableEcho:
logger.debug("System: Echo command Enabled")
if repeater_enabled and multiple_interface:
logger.debug(f"System: Repeater Enabled for Channels: {repeater_channels}")
if my_settings.repeater_enabled and multiple_interface:
logger.debug(f"System: Repeater Enabled for Channels: {my_settings.repeater_channels}")
if radio_detection_enabled:
logger.debug(f"System: Radio Detection Enabled using rigctld at {rigControlServerAddress} broadcasting to channels: {sigWatchBroadcastCh} for {get_freq_common_name(get_hamlib('f'))}")
if my_settings.radio_detection_enabled:
logger.debug(f"System: Radio Detection Enabled using rigctld at {my_settings.rigControlServerAddress} broadcasting to channels: {my_settings.sigWatchBroadcastCh} for {get_freq_common_name(get_hamlib('f'))}")
if file_monitor_enabled:
logger.warning(f"System: File Monitor Enabled for {file_monitor_file_path}, broadcasting to channels: {file_monitor_broadcastCh}")
if enable_runShellCmd:
if my_settings.file_monitor_enabled:
logger.warning(f"System: File Monitor Enabled for {my_settings.file_monitor_file_path}, broadcasting to channels: {my_settings.file_monitor_broadcastCh}")
if my_settings.enable_runShellCmd:
logger.debug("System: Shell Command monitor enabled")
if allowXcmd:
if my_settings.allowXcmd:
logger.warning("System: File Monitor shell XCMD Enabled")
if read_news_enabled:
logger.debug(f"System: File Monitor News Reader Enabled for {news_file_path}")
if bee_enabled:
if my_settings.read_news_enabled:
logger.debug(f"System: File Monitor News Reader Enabled for {my_settings.news_file_path}")
if my_settings.bee_enabled:
logger.debug("System: File Monitor Bee Monitor Enabled for bee.txt")
if wxAlertBroadcastEnabled:
logger.debug(f"System: Weather Alert Broadcast Enabled on channels {wxAlertBroadcastChannel}")
if my_settings.wxAlertBroadcastEnabled:
logger.debug(f"System: Weather Alert Broadcast Enabled on channels {my_settings.wxAlertBroadcastChannel}")
if emergencyAlertBrodcastEnabled:
logger.debug(f"System: Emergency Alert Broadcast Enabled on channels {emergencyAlertBroadcastCh} for FIPS codes {myStateFIPSList}")
if myStateFIPSList == ['']:
if my_settings.emergencyAlertBrodcastEnabled:
logger.debug(f"System: Emergency Alert Broadcast Enabled on channels {my_settings.emergencyAlertBroadcastCh} for FIPS codes {my_settings.myStateFIPSList}")
if my_settings.myStateFIPSList == ['']:
logger.warning("System: No FIPS codes set for iPAWS Alerts")
if emergency_responder_enabled:
logger.debug(f"System: Emergency Responder Enabled on channels {emergency_responder_alert_channel} for interface {emergency_responder_alert_interface}")
if my_settings.emergency_responder_enabled:
logger.debug(f"System: Emergency Responder Enabled on channels {my_settings.emergency_responder_alert_channel} for interface {my_settings.emergency_responder_alert_interface}")
if volcanoAlertBroadcastEnabled:
logger.debug(f"System: Volcano Alert Broadcast Enabled on channels {volcanoAlertBroadcastChannel}")
if my_settings.volcanoAlertBroadcastEnabled:
logger.debug(f"System: Volcano Alert Broadcast Enabled on channels {my_settings.volcanoAlertBroadcastChannel}")
if qrz_hello_enabled:
if train_qrz:
if my_settings.qrz_hello_enabled:
if my_settings.train_qrz:
logger.debug("System: QRZ Welcome/Hello Enabled with training mode")
else:
logger.debug("System: QRZ Welcome/Hello Enabled")
if checklist_enabled:
if my_settings.checklist_enabled:
logger.debug("System: CheckList Module Enabled")
if ignoreChannels:
logger.debug(f"System: Ignoring Channels: {ignoreChannels}")
if my_settings.ignoreChannels:
logger.debug(f"System: Ignoring Channels: {my_settings.ignoreChannels}")
if noisyNodeLogging:
if my_settings.noisyNodeLogging:
logger.debug("System: Noisy Node Logging Enabled")
if logMetaStats:
if my_settings.logMetaStats:
logger.debug("System: Logging Metadata Stats Enabled, leaderboard")
loadLeaderboard()
if enableSMTP:
if enableImap:
if my_settings.enableSMTP:
if my_settings.enableImap:
logger.debug("System: SMTP Email Alerting Enabled using IMAP")
else:
logger.warning("System: SMTP Email Alerting Enabled")
if scheduler_enabled:
if my_settings.scheduler_enabled:
# setup the scheduler
from modules.scheduler import setup_scheduler
await setup_scheduler(
schedulerMotd, MOTD, schedulerMessage, schedulerChannel, schedulerInterface,
schedulerValue, schedulerTime, schedulerInterval, logger, BroadcastScheduler
scheduler(
my_settings.MOTD,
my_settings.MOTD,
my_settings.schedulerMessage,
my_settings.schedulerChannel,
my_settings.schedulerInterface,
my_settings.schedulerValue,
my_settings.schedulerTime,
my_settings.schedulerInterval,
logger,
my_settings.BroadcastScheduler
)
)
# here we go loopty loo
@@ -1993,13 +2007,13 @@ async def main():
tasks.append(asyncio.create_task(watchdog(), name="watchdog"))
# Add optional tasks
if file_monitor_enabled:
if my_settings.file_monitor_enabled:
tasks.append(asyncio.create_task(handleFileWatcher(), name="file_monitor"))
if radio_detection_enabled:
if my_settings.radio_detection_enabled:
tasks.append(asyncio.create_task(handleSignalWatcher(), name="hamlib"))
if voxDetectionEnabled:
if my_settings.voxDetectionEnabled:
tasks.append(asyncio.create_task(voxMonitor(), name="vox_detection"))
logger.debug(f"System: Starting {len(tasks)} async tasks")

View File

@@ -2,7 +2,8 @@
# K7MHI Kelly Keeton 2024
import pickle # pip install pickle
from modules.log import *
from modules.log import logger
from modules.settings import bbs_admin_list, bbs_ban_list, MESSAGE_CHUNK_SIZE, bbs_link_enabled, bbs_link_whitelist, responseDelay
import time
from datetime import datetime

View File

@@ -2,7 +2,8 @@
# K7MHI Kelly Keeton 2024
import sqlite3
from modules.log import *
from modules.log import logger
from modules.settings import checklist_db, reverse_in_out, bbs_ban_list
import time
trap_list_checklist = ("checkin", "checkout", "checklist", "purgein", "purgeout")

View File

@@ -1,7 +1,17 @@
# File monitor module for the meshing-around bot
# 2024 Kelly Keeton K7MHI
from modules.log import *
from modules.log import logger
from modules.settings import (
file_monitor_file_path,
news_file_path,
news_random_line_only,
allowXcmd,
bbs_admin_list,
xCmd2factorEnabled,
xCmd2factor_timeout,
enable_runShellCmd
)
import asyncio
import random
import os

View File

@@ -2,7 +2,8 @@
# Adapted for Meshtastic mesh-bot by K7MHI Kelly Keeton 2024
from random import choices, shuffle
from modules.log import *
from modules.log import logger
from modules.settings import jackTracker
import time
import pickle

View File

@@ -4,7 +4,7 @@
import random
import time
import pickle
from modules.log import *
from modules.log import logger
# Global variables
total_days = 7 # number of days or rotations the player has to play

View File

@@ -4,7 +4,7 @@
import random
import time
import pickle
from modules.log import *
from modules.log import logger
# Clubs setup
driver_distances = list(range(230, 280, 5))

View File

@@ -9,7 +9,7 @@
import json
import random
import os
from modules.log import *
from modules.log import logger
class HamTest:
def __init__(self):

View File

@@ -1,5 +1,5 @@
# Written for Meshtastic mesh-bot by ZR1RF Johannes le Roux 2025
from modules.log import *
from modules.log import logger, getPrettyTime
import os
import json
import random

View File

@@ -3,7 +3,7 @@
# As a Ham, is this obsecuring the meaning of the joke? Or is it enhancing it?
from dadjokes import Dadjoke # pip install dadjokes
import random
from modules.log import *
from modules.log import logger, getPrettyTime
lameJokes = [
"Why don't scientists trust atoms? Because they make up everything!",

View File

@@ -6,8 +6,8 @@ from random import randrange, uniform # random numbers
from types import SimpleNamespace # namespaces support
import pickle # pickle file support
import time # time functions
from modules.log import * # mesh-bot logging
from modules.log import logger # mesh-bot logging
from modules.system import lemonadeTracker # player tracking
import locale # culture specific locale
import math # math functions
import re # regular expressions

View File

@@ -12,7 +12,7 @@ Game Rules:
"""
import pickle
from modules.log import *
from modules.log import logger, getPrettyTime
from datetime import datetime, timedelta
from geopy.distance import geodesic

View File

@@ -4,7 +4,8 @@
import random
import time
import pickle
from modules.log import *
from modules.log import logger
from modules.system import mindTracker
def chooseDifficultyMMind(message):
usrInput = message.lower()

View File

@@ -11,7 +11,8 @@
import json
import os
import random
from modules.log import *
from modules.log import logger
from modules.settings import bbs_admin_list
QUIZ_JSON = os.path.join(os.path.dirname(__file__), '../', '../', 'data', 'quiz_questions.json')
QUIZMASTER_ID = bbs_admin_list

View File

@@ -1,13 +1,13 @@
# Tic-Tac-Toe game for Meshtastic mesh-bot
# Board positions chosen by numbers 1-9
# 2025
from modules.log import *
import random
import time
import modules.settings as my_settings
# to (max), molly and jake, I miss you both so much.
if disable_emojis_in_games:
if my_settings.disable_emojis_in_games:
X = "X"
O = "O"
else:
@@ -65,7 +65,7 @@ class TicTacToe:
row = ""
for j in range(3):
pos = i * 3 + j
if disable_emojis_in_games:
if my_settings.disable_emojis_in_games:
cell = b[pos] if b[pos] != " " else str(pos + 1)
else:
cell = b[pos] if b[pos] != " " else f" {str(pos + 1)} "
@@ -74,7 +74,6 @@ class TicTacToe:
row += " | "
board_str += row
if i < 2:
#board_str += "\n-+-+-\n"
board_str += "\n"
return board_str + "\n"

View File

@@ -3,7 +3,7 @@
import random
import time
import pickle
from modules.log import *
from modules.log import logger, getPrettyTime
vpStartingCash = 20
# Define the Card class

View File

@@ -1,6 +1,6 @@
# python word of the day game module for meshing-around bot
# 2025 K7MHI Kelly Keeton
from modules.log import *
from modules.log import logger, getPrettyTime
import random
import json
import os

View File

@@ -2,12 +2,13 @@
# K7MHI Kelly Keeton 2024
import json # pip install json
from geopy.geocoders import Nominatim # pip install geopy
import maidenhead as mh # pip install maidenhead
#from geopy.geocoders import Nominatim # pip install geopy
#import maidenhead as mh # pip install maidenhead
import requests # pip install requests
import bs4 as bs # pip install beautifulsoup4
import xml.dom.minidom
from modules.log import *
#import xml.dom.minidom
from modules.log import logger
from modules.settings import urlTimeoutSeconds, NO_ALERTS, myRegionalKeysDE
trap_list_location_eu = ("ukalert", "ukwx", "ukflood")
trap_list_location_de = ("dealert", "dewx", "deflood")
@@ -22,7 +23,7 @@ def get_govUK_alerts(lat, lon):
alert = soup.find('h2', class_='govuk-heading-m', id='alert-status')
except Exception as e:
logger.warning("Error getting UK alerts: " + str(e))
return NO_ALERTS
return
if alert:
return "🚨" + alert.get_text(strip=True)

View File

@@ -7,7 +7,7 @@
# https://pythonhosted.org/RPIO/
import RPIO
from modules.log import *
from modules.log import logger, getPrettyTime
trap_list_gpio = ("gpio", "pin", "relay", "switch", "pwm")
# set up input channel without pull-up

View File

@@ -2,7 +2,8 @@
# LLM Module for meshing-around
# This module is used to interact with LLM API to generate responses to user input
# K7MHI Kelly Keeton 2024
from modules.log import *
from modules.log import logger
from modules.settings import llmModel, ollamaHostName, rawLLMQuery
# Ollama Client
# https://github.com/ollama/ollama/blob/main/docs/faq.md#how-do-i-configure-ollama-server

View File

@@ -6,14 +6,15 @@ from geopy.geocoders import Nominatim # pip install geopy
import maidenhead as mh # pip install maidenhead
import requests # pip install requests
import bs4 as bs # pip install beautifulsoup4
import xml.dom.minidom
import xml.dom.minidom # used for parsing XML
import xml.parsers.expat # used for parsing XML
from datetime import datetime
from modules.log import *
from modules.log import logger
import modules.settings as my_settings
import math
import csv
import os
trap_list_location = ("whereami", "wx", "wxa", "wxalert", "rlist", "ea", "ealert", "riverflow", "valert", "earthquake", "howfar", "map",)
def where_am_i(lat=0, lon=0, short=False, zip=False):
@@ -23,7 +24,7 @@ def where_am_i(lat=0, lon=0, short=False, zip=False):
if int(float(lat)) == 0 and int(float(lon)) == 0:
logger.error("Location: No GPS data, try sending location")
return NO_DATA_NOGPS
return my_settings.NO_DATA_NOGPS
# initialize Nominatim API
geolocator = Nominatim(user_agent="mesh-bot")
@@ -43,7 +44,7 @@ def where_am_i(lat=0, lon=0, short=False, zip=False):
whereIam = location.raw['address'].get('postcode', '')
return whereIam
if float(lat) == latitudeValue and float(lon) == longitudeValue:
if float(lat) == my_settings.latitudeValue and float(lon) == my_settings.longitudeValue:
# redacted address when no GPS and using default location
location = geolocator.reverse(str(lat) + ", " + str(lon))
address = location.raw['address']
@@ -72,7 +73,7 @@ def where_am_i(lat=0, lon=0, short=False, zip=False):
return whereIam
except Exception as e:
logger.debug("Location:Error fetching location data with whereami, likely network error")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
def getRepeaterBook(lat=0, lon=0):
grid = mh.to_maiden(float(lat), float(lon))
@@ -90,7 +91,7 @@ def getRepeaterBook(lat=0, lon=0):
try:
msg = ''
user_agent = {'User-agent': 'Mozilla/5.0'}
response = requests.get(repeater_url, headers=user_agent, timeout=urlTimeoutSeconds)
response = requests.get(repeater_url, headers=user_agent, timeout=my_settings.urlTimeoutSeconds)
if response.status_code!=200:
logger.error(f"Location:Error fetching repeater data from {repeater_url} with status code {response.status_code}")
soup = bs.BeautifulSoup(response.text, 'html.parser')
@@ -129,13 +130,13 @@ def getArtSciRepeaters(lat=0, lon=0):
#grid = mh.to_maiden(float(lat), float(lon))
repeaters = []
zipCode = where_am_i(lat, lon, zip=True)
if zipCode == NO_DATA_NOGPS or zipCode == ERROR_FETCHING_DATA:
if zipCode == my_settings.NO_DATA_NOGPS or zipCode == my_settings.ERROR_FETCHING_DATA:
return zipCode
if zipCode.isnumeric():
try:
artsci_url = f"http://www.artscipub.com/mobile/showstate.asp?zip={zipCode}"
response = requests.get(artsci_url, timeout=urlTimeoutSeconds)
response = requests.get(artsci_url, timeout=my_settings.urlTimeoutSeconds)
if response.status_code!=200:
logger.error(f"Location:Error fetching data from {artsci_url} with status code {response.status_code}")
soup = bs.BeautifulSoup(response.text, 'html.parser')
@@ -177,16 +178,16 @@ def get_NOAAtide(lat=0, lon=0):
station_id = ""
location = lat,lon
if float(lat) == 0 and float(lon) == 0:
lat = latitudeValue
lon = longitudeValue
lat = my_settings.latitudeValue
lon = my_settings.longitudeValue
station_lookup_url = "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi/tidepredstations.json?lat=" + str(lat) + "&lon=" + str(lon) + "&radius=50"
try:
station_data = requests.get(station_lookup_url, timeout=urlTimeoutSeconds)
station_data = requests.get(station_lookup_url, timeout=my_settings.urlTimeoutSeconds)
if station_data.ok:
station_json = station_data.json()
else:
logger.error("Location:Error fetching tide station table from NOAA")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
if station_json['stationList'] == [] or station_json['stationList'] is None:
logger.error("Location:No tide station found")
@@ -196,26 +197,26 @@ def get_NOAAtide(lat=0, lon=0):
except (requests.exceptions.RequestException, json.JSONDecodeError):
logger.error("Location:Error fetching tide station table from NOAA")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
station_url = "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=today&time_zone=lst_ldt&datum=MLLW&product=predictions&interval=hilo&format=json&station=" + station_id
if use_metric:
if my_settings.use_metric:
station_url += "&units=metric"
else:
station_url += "&units=english"
try:
tide_data = requests.get(station_url, timeout=urlTimeoutSeconds)
tide_data = requests.get(station_url, timeout=my_settings.urlTimeoutSeconds)
if tide_data.ok:
tide_json = tide_data.json()
else:
logger.error("Location:Error fetching tide data from NOAA")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
except (requests.exceptions.RequestException, json.JSONDecodeError):
logger.error("Location:Error fetching tide data from NOAA")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
tide_data = tide_json['predictions']
@@ -225,7 +226,7 @@ def get_NOAAtide(lat=0, lon=0):
tide_table = "Tide Data for " + tide_date + "\n"
for tide in tide_data:
tide_time = tide['t'].split(" ")[1]
if not zuluTime:
if not my_settings.zuluTime:
# convert to 12 hour clock
if int(tide_time.split(":")[0]) > 12:
tide_time = str(int(tide_time.split(":")[0]) - 12) + ":" + tide_time.split(":")[1] + " PM"
@@ -242,11 +243,11 @@ def get_NOAAweather(lat=0, lon=0, unit=0):
weather = ""
location = lat,lon
if float(lat) == 0 and float(lon) == 0:
lat = latitudeValue
lon = longitudeValue
lat = my_settings.latitudeValue
lon = my_settings.longitudeValue
# get weather data from NOAA units for metric unit = 1 is metric
if use_metric:
if my_settings.use_metric:
unit = 1
logger.debug("Location: new API metric units not implemented yet")
@@ -254,29 +255,29 @@ def get_NOAAweather(lat=0, lon=0, unit=0):
weather_api = "https://api.weather.gov/points/" + str(lat) + "," + str(lon)
# extract the "forecast": property from the JSON response
try:
weather_data = requests.get(weather_api, timeout=urlTimeoutSeconds)
weather_data = requests.get(weather_api, timeout=my_settings.urlTimeoutSeconds)
if not weather_data.ok:
logger.warning("Location:Error fetching weather data from NOAA for location")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
except Exception:
logger.warning(f"Location:Error fetching weather data error: {Exception}")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
# get the forecast URL from the JSON response
weather_json = weather_data.json()
forecast_url = weather_json['properties']['forecast']
try:
forecast_data = requests.get(forecast_url, timeout=urlTimeoutSeconds)
forecast_data = requests.get(forecast_url, timeout=my_settings.urlTimeoutSeconds)
if not forecast_data.ok:
logger.warning("Location:Error fetching weather forecast from NOAA")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
except Exception:
logger.warning(f"Location:Error fetching weather data error: {Exception}")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
# from periods, get the detailedForecast from number of days in NOAAforecastDuration
forecast_json = forecast_data.json()
forecast = forecast_json['properties']['periods']
for day in forecast[:forecastDuration]:
for day in forecast[:my_settings.forecastDuration]:
# abreviate the forecast
weather += abbreviate_noaa(day['name']) + ": " + abbreviate_noaa(day['detailedForecast']) + "\n"
@@ -286,7 +287,7 @@ def get_NOAAweather(lat=0, lon=0, unit=0):
# get any alerts and return the count
alerts = getWeatherAlertsNOAA(lat, lon)
if alerts == ERROR_FETCHING_DATA or alerts == NO_DATA_NOGPS or alerts == NO_ALERTS:
if alerts == my_settings.ERROR_FETCHING_DATA or alerts == my_settings.NO_DATA_NOGPS or alerts == my_settings.NO_ALERTS:
alert = ""
alert_num = 0
else:
@@ -395,36 +396,36 @@ def getWeatherAlertsNOAA(lat=0, lon=0, useDefaultLatLon=False):
alerts = ""
location = lat,lon
if useDefaultLatLon:
lat = latitudeValue
lon = longitudeValue
lat = my_settings.latitudeValue
lon = my_settings.longitudeValue
if float(lat) == 0 and float(lon) == 0 and not useDefaultLatLon:
return NO_DATA_NOGPS
return my_settings.NO_DATA_NOGPS
alert_url = "https://api.weather.gov/alerts/active.atom?point=" + str(lat) + "," + str(lon)
#alert_url = "https://api.weather.gov/alerts/active.atom?area=WA"
#logger.debug("Location:Fetching weather alerts from NOAA for " + str(lat) + ", " + str(lon))
try:
alert_data = requests.get(alert_url, timeout=urlTimeoutSeconds)
alert_data = requests.get(alert_url, timeout=my_settings.urlTimeoutSeconds)
if not alert_data.ok:
logger.warning("Location:Error fetching weather alerts from NOAA")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
except Exception:
logger.warning(f"Location:Error fetching weather data error: {Exception}")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
alerts = ""
alertxml = xml.dom.minidom.parseString(alert_data.text)
for i in alertxml.getElementsByTagName("entry"):
title = i.getElementsByTagName("title")[0].childNodes[0].nodeValue
area_desc = i.getElementsByTagName("cap:areaDesc")[0].childNodes[0].nodeValue
if enableExtraLocationWx:
if my_settings.enableExtraLocationWx:
alerts += f"{title}. {area_desc.replace(' ', '')}\n"
else:
alerts += f"{title}\n"
if alerts == "" or alerts == None:
return NO_ALERTS
return my_settings.NO_ALERTS
# trim off last newline
if alerts[-1] == "\n":
@@ -437,23 +438,23 @@ def getWeatherAlertsNOAA(lat=0, lon=0, useDefaultLatLon=False):
alerts = abbreviate_noaa(alerts)
# return the first ALERT_COUNT alerts
data = "\n".join(alerts.split("\n")[:numWxAlerts]), alert_num
data = "\n".join(alerts.split("\n")[:my_settings.numWxAlerts]), alert_num
return data
wxAlertCacheNOAA = ""
def alertBrodcastNOAA():
# get the latest weather alerts and broadcast them if there are any
global wxAlertCacheNOAA
currentAlert = getWeatherAlertsNOAA(latitudeValue, longitudeValue)
currentAlert = getWeatherAlertsNOAA(my_settings.latitudeValue, my_settings.longitudeValue)
# check if any reason to discard the alerts
if currentAlert == ERROR_FETCHING_DATA or currentAlert == NO_DATA_NOGPS:
if currentAlert == my_settings.ERROR_FETCHING_DATA or currentAlert == my_settings.NO_DATA_NOGPS:
return False
elif currentAlert == NO_ALERTS:
elif currentAlert == my_settings.NO_ALERTS:
wxAlertCacheNOAA = ""
return False
if ignoreEASenable:
if my_settings.ignoreEASenable:
# check if the alert is in the ignoreEAS list
for word in ignoreEASwords:
for word in my_settings.ignoreEASwords:
if word.lower() in currentAlert[0].lower():
logger.debug(f"Location:Ignoring NOAA Alert: {currentAlert[0]} containing {word}")
return False
@@ -471,21 +472,21 @@ def getActiveWeatherAlertsDetailNOAA(lat=0, lon=0):
alerts = ""
location = lat,lon
if float(lat) == 0 and float(lon) == 0:
lat = latitudeValue
lon = longitudeValue
lat = my_settings.latitudeValue
lon = my_settings.longitudeValue
alert_url = "https://api.weather.gov/alerts/active.atom?point=" + str(lat) + "," + str(lon)
#alert_url = "https://api.weather.gov/alerts/active.atom?area=WA"
#logger.debug("Location:Fetching weather alerts detailed from NOAA for " + str(lat) + ", " + str(lon))
try:
alert_data = requests.get(alert_url, timeout=urlTimeoutSeconds)
alert_data = requests.get(alert_url, timeout=my_settings.urlTimeoutSeconds)
if not alert_data.ok:
logger.warning("Location:Error fetching weather alerts from NOAA")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
except Exception:
logger.warning(f"Location:Error fetching weather data error: {Exception}")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
alerts = ""
alertxml = xml.dom.minidom.parseString(alert_data.text)
@@ -505,10 +506,10 @@ def getActiveWeatherAlertsDetailNOAA(lat=0, lon=0):
alerts = abbreviate_noaa(alerts)
# trim the alerts to the first ALERT_COUNT
alerts = alerts.split("\n***\n")[:numWxAlerts]
alerts = alerts.split("\n***\n")[:my_settings.numWxAlerts]
if alerts == "" or alerts == ['']:
return NO_ALERTS
return my_settings.NO_ALERTS
# trim off last newline
if alerts[-1] == "\n":
@@ -530,13 +531,13 @@ def getIpawsAlert(lat=0, lon=0, shortAlerts = False):
# get the alerts from FEMA
try:
alert_data = requests.get(alert_url, timeout=urlTimeoutSeconds)
alert_data = requests.get(alert_url, timeout=my_settings.urlTimeoutSeconds)
if not alert_data.ok:
logger.warning(f"System: iPAWS fetching IPAWS alerts from FEMA (HTTP {alert_data.status_code})")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
except Exception as e:
logger.warning(f"System: iPAWS fetching IPAWS alerts from FEMA failed: {e}")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
# main feed bulletins
alertxml = xml.dom.minidom.parseString(alert_data.text)
@@ -558,13 +559,13 @@ def getIpawsAlert(lat=0, lon=0, shortAlerts = False):
continue
# check if it matches your list
if stateFips not in myStateFIPSList:
#logger.debug(f"Skipping FEMA record link {link} with stateFIPS code of: {stateFips} because it doesn't match our StateFIPSList {myStateFIPSList}")
if stateFips not in my_settings.myStateFIPSList:
#logger.debug(f"Skipping FEMA record link {link} with stateFIPS code of: {stateFips} because it doesn't match our StateFIPSList {my_settings.myStateFIPSList}")
continue # skip to next entry
try:
# get the linked alert data from FEMA
linked_data = requests.get(link, timeout=urlTimeoutSeconds)
linked_data = requests.get(link, timeout=my_settings.urlTimeoutSeconds)
if not linked_data.ok or not linked_data.text.strip():
# if the linked data is not ok, skip this alert
#logger.warning(f"System: iPAWS Error fetching linked alert data from {link}")
@@ -616,14 +617,14 @@ def getIpawsAlert(lat=0, lon=0, shortAlerts = False):
continue
# check if the alert is for the SAME location, if wanted keep alert
if (sameVal in mySAMEList) or (geocode_value in mySAMEList) or mySAMEList == ['']:
if (sameVal in my_settings.mySAMEList) or (geocode_value in my_settings.mySAMEList) or my_settings.mySAMEList == ['']:
ignore_alert = False
if ignoreFEMAenable:
if my_settings.ignoreFEMAenable:
ignore_alert = any(
word.lower() in headline.lower()
for word in ignoreFEMAwords)
for word in my_settings.ignoreFEMAwords)
if ignore_alert:
logger.debug(f"System: Filtering FEMA Alert by WORD: {headline} containing one of {ignoreFEMAwords} at {areaDesc}")
logger.debug(f"System: Filtering FEMA Alert by WORD: {headline} containing one of {my_settings.ignoreFEMAwords} at {areaDesc}")
if ignore_alert:
continue
@@ -643,16 +644,16 @@ def getIpawsAlert(lat=0, lon=0, shortAlerts = False):
# return the numWxAlerts of alerts
if len(alerts) > 0:
for alertItem in alerts[:numWxAlerts]:
for alertItem in alerts[:my_settings.numWxAlerts]:
if shortAlerts:
alert += abbreviate_noaa(f"🚨FEMA Alert: {alertItem['headline']}")
else:
alert += abbreviate_noaa(f"🚨FEMA Alert: {alertItem['headline']}\n{alertItem['description']}")
# add a newline if not the last alert
if alertItem != alerts[:numWxAlerts][-1]:
if alertItem != alerts[:my_settings.numWxAlerts][-1]:
alert += "\n"
else:
alert = NO_ALERTS
alert = my_settings.NO_ALERTS
return alert
@@ -665,22 +666,22 @@ def get_flood_noaa(lat=0, lon=0, uid=None):
headers = {'accept': 'application/json'}
if not uid:
logger.warning(f"Location:No flood gauge data found for UID {uid}")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
try:
response = requests.get(api_url + str(uid), headers=headers, timeout=urlTimeoutSeconds)
response = requests.get(api_url + str(uid), headers=headers, timeout=my_settings.urlTimeoutSeconds)
if not response.ok:
logger.warning(f"Location:Error fetching flood gauge data from NOAA for {uid} (HTTP {response.status_code})")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
data = response.json()
if not data or 'status' not in data:
logger.warning(f"Location:No flood gauge data found for UID {uid}")
return "No flood gauge data found"
except requests.exceptions.RequestException as e:
logger.warning(f"Location:Error fetching flood gauge data from: {api_url}{uid} ({e})")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
except Exception as e:
logger.warning(f"Location:Unexpected error: {e}")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
# extract values from JSON safely
try:
@@ -696,35 +697,35 @@ def get_flood_noaa(lat=0, lon=0, uid=None):
return flood_data
except Exception as e:
logger.debug(f"Location:Error extracting flood gauge data from NOAA for {uid}: {e}")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
def get_volcano_usgs(lat=0, lon=0):
alerts = ''
if lat == 0 and lon == 0:
lat = latitudeValue
lon = longitudeValue
lat = my_settings.latitudeValue
lon = my_settings.longitudeValue
# get the latest volcano alert from USGS from CAP feed
usgs_volcano_url = "https://volcanoes.usgs.gov/hans-public/api/volcano/getCapElevated"
try:
volcano_data = requests.get(usgs_volcano_url, timeout=urlTimeoutSeconds)
volcano_data = requests.get(usgs_volcano_url, timeout=my_settings.urlTimeoutSeconds)
if not volcano_data.ok:
logger.warning("System: Issue with fetching volcano alerts from USGS")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
except (requests.exceptions.RequestException):
logger.warning("System: Issue with fetching volcano alerts from USGS")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
volcano_json = volcano_data.json()
# extract alerts from main feed
if volcano_json and isinstance(volcano_json, list):
for alert in volcano_json:
# check ignore list
if ignoreUSGSEnable:
for word in ignoreUSGSwords:
if my_settings.ignoreUSGSEnable:
for word in my_settings.ignoreUSGSwords:
if word.lower() in alert['volcano_name_appended'].lower():
logger.debug(f"System: Ignoring USGS Alert: {alert['volcano_name_appended']} containing {word}")
continue
# check if the alert lat long is within the range of bot latitudeValue and longitudeValue
if (alert['latitude'] >= latitudeValue - 10 and alert['latitude'] <= latitudeValue + 10) and (alert['longitude'] >= longitudeValue - 10 and alert['longitude'] <= longitudeValue + 10):
if (alert['latitude'] >= my_settings.latitudeValue - 10 and alert['latitude'] <= my_settings.latitudeValue + 10) and (alert['longitude'] >= my_settings.longitudeValue - 10 and alert['longitude'] <= my_settings.longitudeValue + 10):
volcano_name = alert['volcano_name_appended']
alert_level = alert['alert_level']
color_code = alert['color_code']
@@ -737,9 +738,9 @@ def get_volcano_usgs(lat=0, lon=0):
continue
else:
logger.debug("Location:Error fetching volcano data from USGS")
return NO_ALERTS
return my_settings.NO_ALERTS
if alerts == "":
return NO_ALERTS
return my_settings.NO_ALERTS
# trim off last newline
if alerts[-1] == "\n":
alerts = alerts[:-1]
@@ -750,13 +751,13 @@ def get_volcano_usgs(lat=0, lon=0):
def get_nws_marine(zone, days=3):
# forecast from NWS coastal products
try:
marine_pz_data = requests.get(zone, timeout=urlTimeoutSeconds)
marine_pz_data = requests.get(zone, timeout=my_settings.urlTimeoutSeconds)
if not marine_pz_data.ok:
logger.warning(f"Location:Error fetching NWS Marine data (HTTP {marine_pz_data.status_code})")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
except requests.exceptions.RequestException as e:
logger.warning(f"Location:Error fetching NWS Marine data: {e}")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
marine_pz_data = marine_pz_data.text
todayDate = datetime.now().strftime("%Y%m%d")
@@ -766,13 +767,13 @@ def get_nws_marine(zone, days=3):
expires_date = expires[:8]
if expires_date < todayDate:
logger.debug("Location: NWS Marine PZ data expired")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
except Exception as e:
logger.debug(f"Location: NWS Marine PZ data parse error: {e}")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
else:
logger.debug("Location: NWS Marine PZ data not valid or empty")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
# process the marine forecast data
marine_pzz_lines = marine_pz_data.split("\n")
@@ -794,7 +795,7 @@ def get_nws_marine(zone, days=3):
day_blocks.append(current_block.strip())
# Only keep up to pzDays blocks
for block in day_blocks[:days]:
for block in day_blocks[:my_settings.coastalForecastDays]:
marine_pz_report += block + "\n"
# remove last newline
@@ -808,13 +809,13 @@ def get_nws_marine(zone, days=3):
# abbreviate the report
marine_pz_report = abbreviate_noaa(marine_pz_report)
if marine_pz_report == "":
return NO_DATA_NOGPS
return my_settings.NO_DATA_NOGPS
return marine_pz_report
def checkUSGSEarthQuake(lat=0, lon=0):
if lat == 0 and lon == 0:
lat = latitudeValue
lon = longitudeValue
lat = my_settings.latitudeValue
lon = my_settings.longitudeValue
radius = 100 # km
magnitude = 1.5
history = 7 # days
@@ -824,20 +825,20 @@ def checkUSGSEarthQuake(lat=0, lon=0):
quake_count = 0
# fetch the earthquake data from USGS
try:
quake_data = requests.get(USGSquake_url, timeout=urlTimeoutSeconds)
quake_data = requests.get(USGSquake_url, timeout=my_settings.urlTimeoutSeconds)
if not quake_data.ok:
logger.warning("Location:Error fetching earthquake data from USGS")
return NO_ALERTS
return my_settings.NO_ALERTS
if not quake_data.text.strip():
return NO_ALERTS
return my_settings.NO_ALERTS
try:
quake_xml = xml.dom.minidom.parseString(quake_data.text)
except Exception as e:
logger.warning(f"Location: USGS earthquake API returned invalid XML: {e}")
return NO_ALERTS
return my_settings.NO_ALERTS
except (requests.exceptions.RequestException):
logger.warning("Location:Error fetching earthquake data from USGS")
return NO_ALERTS
return my_settings.NO_ALERTS
quake_xml = xml.dom.minidom.parseString(quake_data.text)
quake_count = len(quake_xml.getElementsByTagName("event"))
@@ -853,7 +854,7 @@ def checkUSGSEarthQuake(lat=0, lon=0):
description_text = event.getElementsByTagName("description")[0].getElementsByTagName("text")[0].childNodes[0].nodeValue
largest_mag = round(largest_mag, 1)
if quake_count == 0:
return NO_ALERTS
return my_settings.NO_ALERTS
else:
return f"{quake_count} 🫨quakes in last {history} days within {radius} km. Largest: {largest_mag}M\n{description_text}"
@@ -867,7 +868,7 @@ def distance(lat=0,lon=0,nodeID=0, reset=False):
r = 6371 # Radius of earth in kilometers # haversine formula
if lat == 0 and lon == 0:
return NO_DATA_NOGPS
return my_settings.NO_DATA_NOGPS
if nodeID == 0:
return "No NodeID provided"
@@ -899,7 +900,7 @@ def distance(lat=0,lon=0,nodeID=0, reset=False):
c = 2 * math.asin(math.sqrt(a))
distance_km = c * r
if use_metric:
if my_settings.use_metric:
msg += f"{distance_km:.2f} km"
else:
distance_miles = distance_km * 0.621371
@@ -917,7 +918,7 @@ def distance(lat=0,lon=0,nodeID=0, reset=False):
time_diff = datetime.now() - last_point['time']
if time_diff.total_seconds() > 60:
hours = time_diff.total_seconds() / 3600
if use_metric:
if my_settings.use_metric:
speed = distance_km / hours
speed_str = f"{speed:.2f} km/h"
else:
@@ -941,7 +942,7 @@ def distance(lat=0,lon=0,nodeID=0, reset=False):
total_distance_km += c * r
# add the distance from last point to current point
total_distance_km += distance_km
if use_metric:
if my_settings.use_metric:
msg += f", Total: {total_distance_km:.2f} km"
else:
total_distance_miles = total_distance_km * 0.621371
@@ -974,7 +975,7 @@ def distance(lat=0,lon=0,nodeID=0, reset=False):
area = area * (6378137 ** 2) / 2.0
area = abs(area) / 1e6 # convert to square kilometers
if use_metric:
if my_settings.use_metric:
msg += f", Area: {area:.2f} sq.km (approx)"
else:
area_miles = area * 0.386102
@@ -1006,7 +1007,7 @@ def distance(lat=0,lon=0,nodeID=0, reset=False):
def get_openskynetwork(lat=0, lon=0):
# get the latest aircraft data from OpenSky Network in the area
if lat == 0 and lon == 0:
return NO_ALERTS
return my_settings.NO_ALERTS
# setup a bounding box of 50km around the lat/lon
box_size = 0.45 # approx 50km
# return limits for aircraft search
@@ -1019,16 +1020,16 @@ def get_openskynetwork(lat=0, lon=0):
# fetch the aircraft data from OpenSky Network
opensky_url = f"https://opensky-network.org/api/states/all?lamin={lamin}&lomin={lomin}&lamax={lamax}&lomax={lomax}"
try:
aircraft_data = requests.get(opensky_url, timeout=urlTimeoutSeconds)
aircraft_data = requests.get(opensky_url, timeout=my_settings.urlTimeoutSeconds)
if not aircraft_data.ok:
logger.warning("Location:Error fetching aircraft data from OpenSky Network")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
except (requests.exceptions.RequestException):
logger.warning("Location:Error fetching aircraft data from OpenSky Network")
return ERROR_FETCHING_DATA
return my_settings.ERROR_FETCHING_DATA
aircraft_json = aircraft_data.json()
if 'states' not in aircraft_json or not aircraft_json['states']:
return NO_ALERTS
return my_settings.NO_ALERTS
aircraft_list = aircraft_json['states']
aircraft_report = ""
for aircraft in aircraft_list:
@@ -1055,7 +1056,7 @@ def get_openskynetwork(lat=0, lon=0):
if aircraft_report.endswith("\n"):
aircraft_report = aircraft_report[:-1]
aircraft_report = abbreviate_noaa(aircraft_report)
return aircraft_report if aircraft_report else NO_ALERTS
return aircraft_report if aircraft_report else my_settings.NO_ALERTS
def log_locationData_toMap(userID, location, message):
"""

View File

@@ -1,11 +1,11 @@
import logging
from logging.handlers import TimedRotatingFileHandler
from modules.settings import *
import modules.settings as my_settings
# if LOGGING_LEVEL is not set in settings.py, default to DEBUG
if not LOGGING_LEVEL:
LOGGING_LEVEL = "DEBUG"
if not my_settings.LOGGING_LEVEL:
my_settings.LOGGING_LEVEL = "DEBUG"
LOGGING_LEVEL = getattr(logging, LOGGING_LEVEL)
LOGGING_LEVEL = getattr(logging, my_settings.LOGGING_LEVEL)
class CustomFormatter(logging.Formatter):
grey = '\x1b[38;21m'
@@ -70,16 +70,16 @@ stdout_handler.setFormatter(CustomFormatter(logFormat))
# Add handlers to the logger
logger.addHandler(stdout_handler)
if syslog_to_file:
if my_settings.syslog_to_file:
# Create file handler for logging to a file
file_handler_sys = TimedRotatingFileHandler('logs/meshbot.log', when='midnight', backupCount=log_backup_count, encoding='utf-8')
file_handler_sys = TimedRotatingFileHandler('logs/meshbot.log', when='midnight', backupCount=my_settings.log_backup_count, encoding='utf-8')
file_handler_sys.setLevel(LOGGING_LEVEL) # DEBUG used by default for system logs to disk
file_handler_sys.setFormatter(plainFormatter(logFormat))
logger.addHandler(file_handler_sys)
if log_messages_to_file:
if my_settings.log_messages_to_file:
# Create file handler for logging to a file
file_handler = TimedRotatingFileHandler('logs/messages.log', when='midnight', backupCount=log_backup_count, encoding='utf-8')
file_handler = TimedRotatingFileHandler('logs/messages.log', when='midnight', backupCount=my_settings.log_backup_count, encoding='utf-8')
file_handler.setLevel(logging.INFO) # INFO used for messages to disk
file_handler.setFormatter(logging.Formatter(msgLogFormat))
msgLogger.addHandler(file_handler)

View File

@@ -3,7 +3,8 @@
import os
import sqlite3
from modules.log import *
from modules.log import logger
from modules.settings import qrz_db
def initalize_qrz_database():
try:

View File

@@ -5,8 +5,25 @@
# requires vosk and sounddevice python modules. will auto download needed. more from https://alphacephei.com/vosk/models and unpack
# 2024 Kelly Keeton K7MHI
from modules.log import *
from modules.log import logger
import asyncio
from modules.settings import (
radio_detection_enabled,
rigControlServerAddress,
signalDetectionThreshold,
signalHoldTime,
signalCooldown,
signalCycleLimit,
voxDetectionEnabled,
useLocalVoxModel,
localVoxModelPath,
voxLanguage,
voxInputDevice,
voxTrapList,
voxOnTrapList,
voxEnableCmd,
ERROR_FETCHING_DATA
)
# verbose debug logging for trap words function
debugVoxTmsg = False

View File

@@ -1,5 +1,6 @@
# rss feed module for meshing-around 2025
from modules.log import *
from modules.log import logger
from modules.settings import rssFeedURL, rssFeedNames, rssMaxItems, rssTruncate, urlTimeoutSeconds, ERROR_FETCHING_DATA
import urllib.request
import xml.etree.ElementTree as ET
import html

View File

@@ -3,6 +3,7 @@
import asyncio
import schedule
from modules.log import logger
from modules.settings import MOTD
from modules.system import send_message
async def setup_scheduler(

View File

@@ -46,6 +46,13 @@ dwPlayerTracker = [] # DopeWars player tracker
jackTracker = [] # Jack game tracker
mindTracker = [] # Mastermind (mmind) game tracker
# Memory Management Constants
MAX_MSG_HISTORY = 250
MAX_CMD_HISTORY = 250
MAX_SEEN_NODES = 1000
CLEANUP_INTERVAL = 86400 # 24 hours in seconds
GAMEDELAY = 3 * CLEANUP_INTERVAL # 3 days in seconds
# Read the config file, if it does not exist, create basic config file
config = configparser.ConfigParser()
config_file = "config.ini"

View File

@@ -3,7 +3,12 @@
# https://avtech.com/articles/138/list-of-email-to-sms-addresses/
# 2024 Kelly Keeton K7MHI
from modules.log import *
from modules.log import logger
from modules.settings import (
SMTP_SERVER, SMTP_PORT, SMTP_AUTH, SMTP_USERNAME, SMTP_PASSWORD,
FROM_EMAIL, EMAIL_SUBJECT, enableImap, IMAP_SERVER, IMAP_PORT,
IMAP_USERNAME, IMAP_PASSWORD, IMAP_FOLDER, sysopEmails, bbs_ban_list
)
import pickle
import time
import smtplib

View File

@@ -7,7 +7,10 @@ import xml.dom.minidom
from datetime import datetime
import ephem # pip install pyephem
from datetime import timezone
from modules.log import *
from modules.log import logger, getPrettyTime
from modules.settings import (latitudeValue, longitudeValue, zuluTime,
n2yoAPIKey, urlTimeoutSeconds, use_metric,
ERROR_FETCHING_DATA, NO_DATA_NOGPS, NO_ALERTS)
import math
trap_list_solarconditions = ("sun", "moon", "solar", "hfcond", "satpass", "howtall")

View File

@@ -13,7 +13,8 @@ import os # For file operations
import csv
from datetime import datetime
from collections import Counter
from modules.log import *
from modules.log import logger
from modules.settings import surveyRecordLocation, surveyRecordID
allowedSurveys = [] # List of allowed survey names

View File

@@ -12,7 +12,8 @@ import base64
import contextlib # for suppressing output on watchdog
import io # for suppressing output on watchdog
# homebrew 'modules'
from modules.log import *
from modules.settings import *
from modules.log import logger, getPrettyTime, CustomFormatter
# Global Variables
trap_list = ("cmd","cmd?","bannode",) # base commands
@@ -22,13 +23,6 @@ games_enabled = False
multiPingList = [{'message_from_id': 0, 'count': 0, 'type': '', 'deviceID': 0, 'channel_number': 0, 'startCount': 0}]
interface_retry_count = 3
# Memory Management Constants
MAX_MSG_HISTORY = 250
MAX_CMD_HISTORY = 250
MAX_SEEN_NODES = 1000
CLEANUP_INTERVAL = 86400 # 24 hours in seconds
GAMEDELAY = 3 * CLEANUP_INTERVAL # 3 days in seconds
# Ping Configuration
if ping_enabled:
# ping, pinging, ack, testing, test, pong

View File

@@ -1,18 +1,22 @@
# test_bot.py
# Unit tests for various modules in the meshing-around project
import unittest
import os
import sys
# Add the parent directory to sys.path to allow module imports
parent_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, parent_path)
import unittest
import importlib
import pkgutil
import warnings
from modules.log import logger
from modules.settings import latitudeValue, longitudeValue
# Suppress ResourceWarning warnings for asyncio unclosed event here
warnings.filterwarnings("ignore", category=ResourceWarning)
# Add the parent directory to sys.path to allow module imports
parent_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(parent_path)
modules_path = os.path.join(parent_path, 'modules')
# Limits API calls during testing
@@ -31,7 +35,7 @@ available_modules = [
try:
print("\nImporting Core Modules:")
from modules.log import *
from modules.log import logger, getPrettyTime
print(" ✔ Imported 'log'")
# Set location default
lat = latitudeValue

View File

@@ -1,6 +1,8 @@
# meshbot wiki module
from modules.log import *
from modules.log import logger
from modules.settings import (use_kiwix_server, kiwix_url, kiwix_library_name,
urlTimeoutSeconds, wiki_return_limit, ERROR_FETCHING_DATA)
#import wikipedia # pip install wikipedia
import requests
import bs4 as bs

View File

@@ -3,7 +3,8 @@
import requests
import json
from modules.log import *
from modules.log import logger
from modules.settings import ERROR_FETCHING_DATA
def get_weather_data(api_url, params):
response = requests.get(api_url, params=params)

View File

@@ -12,7 +12,8 @@ import asyncio
import time # for sleep, get some when you can :)
from datetime import datetime
import random
from modules.log import *
from modules.log import logger, CustomFormatter, msgLogger
import modules.settings as my_settings
from modules.system import *
# Global Variables
@@ -134,13 +135,13 @@ def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, chann
pingCount = int(message.split(" ")[1])
if pingCount == 123 or pingCount == 1234:
pingCount = 1
elif not autoPingInChannel and not isDM:
elif not my_settings.autoPingInChannel and not isDM:
# no autoping in channels
pingCount = 1
if pingCount > 51:
pingCount = 50
except:
except ValueError:
pingCount = -1
if pingCount > 1:
@@ -151,7 +152,7 @@ def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, chann
msg = f"🚦Initalizing {pingCount} auto-ping"
# if not a DM add the username to the beginning of msg
if not useDMForResponse and not isDM:
if not my_settings.useDMForResponse and not isDM:
msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + msg
return msg
@@ -161,8 +162,8 @@ def handle_motd(message, message_from_id, isDM):
isAdmin = False
msg = MOTD
# check if the message_from_id is in the bbs_admin_list
if bbs_admin_list != ['']:
for admin in bbs_admin_list:
if my_settings.bbs_admin_list != ['']:
for admin in my_settings.bbs_admin_list:
if str(message_from_id) == admin:
isAdmin = True
break
@@ -191,7 +192,7 @@ def handle_echo(message, message_from_id, deviceID, isDM, channel_number):
parts = message.lower().split("echo ", 1)
if len(parts) > 1 and parts[1].strip() != "":
echo_msg = parts[1]
if channel_number != echoChannel:
if channel_number != my_settings.echoChannel:
echo_msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + echo_msg
return echo_msg
else:
@@ -238,7 +239,8 @@ def onReceive(packet, interface):
if DEBUGpacket:
# Debug print the interface object
for item in interface.__dict__.items(): intDebug = f"{item}\n"
for item in interface.__dict__.items():
intDebug = f"{item}\n"
logger.debug(f"System: Packet Received on {rxType} Interface\n {intDebug} \n END of interface \n")
# Debug print the packet for debugging
logger.debug(f"Packet Received\n {packet} \n END of packet \n")
@@ -373,7 +375,7 @@ def onReceive(packet, interface):
if hop in ("MQTT", "Gateway") and hop_count > 0:
hop = f"{hop_count} Hops"
if enableHopLogs:
if my_settings.enableHopLogs:
logger.debug(f"System: Packet HopDebugger: hop_away:{hop_away} hop_limit:{hop_limit} hop_start:{hop_start} calculated_hop_count:{hop_count} final_hop_value:{hop} via_mqtt:{via_mqtt} transport_mechanism:{transport_mechanism} Hostname:{rxNodeHostName}")
# check with stringSafeChecker if the message is safe
@@ -417,7 +419,7 @@ def onReceive(packet, interface):
send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode)
else:
# or respond to channel message on the channel itself
if channel_number == publicChannel and antiSpam:
if channel_number == my_settings.publicChannel and my_settings.antiSpam:
# warning user spamming default channel
logger.warning(f"System: AntiSpam protection, sending DM to: {get_name_from_number(message_from_id, 'long', rxNode)}")
@@ -430,7 +432,7 @@ def onReceive(packet, interface):
else:
# message is not for bot to respond to
# ignore the message but add it to the message history list
if zuluTime:
if my_settings.zuluTime:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
else:
timestamp = datetime.now().strftime("%Y-%m-%d %I:%M:%S%p")
@@ -448,17 +450,17 @@ def onReceive(packet, interface):
msgLogger.info(f"Device:{rxNode} Channel:{channel_number} | {get_name_from_number(message_from_id, 'long', rxNode)} | " + message_string.replace('\n', '-nl-'))
# repeat the message on the other device
if repeater_enabled and multiple_interface:
if my_settings.repeater_enabled and multiple_interface:
# wait a responseDelay to avoid message collision from lora-ack.
time.sleep(responseDelay)
time.sleep(my_settings.responseDelay)
rMsg = (f"{message_string} From:{get_name_from_number(message_from_id, 'short', rxNode)}")
# if channel found in the repeater list repeat the message
if str(channel_number) in repeater_channels:
if str(channel_number) in my_settings.repeater_channels:
for i in range(1, 10):
if globals().get(f'interface{i}_enabled', False) and i != rxNode:
logger.debug(f"Repeating message on Device{i} Channel:{channel_number}")
send_message(rMsg, channel_number, 0, i)
time.sleep(responseDelay)
time.sleep(my_settings.responseDelay)
else:
# Evaluate non TEXT_MESSAGE_APP packets
consumeMetadata(packet, rxNode, channel_number)
@@ -477,30 +479,30 @@ async def start_rx():
logger.info(f"System: Autoresponder Started for Device{i} {get_name_from_number(myNodeNum, 'long', i)},"
f"{get_name_from_number(myNodeNum, 'short', i)}. NodeID: {myNodeNum}, {decimal_to_hex(myNodeNum)}")
if useDMForResponse:
if my_settings.useDMForResponse:
logger.debug(f"System: Respond by DM only")
if log_messages_to_file:
if my_settings.log_messages_to_file:
logger.debug("System: Logging Messages to disk")
if syslog_to_file:
if my_settings.syslog_to_file:
logger.debug("System: Logging System Logs to disk")
if motd_enabled:
logger.debug(f"System: MOTD Enabled using {MOTD}")
if enableEcho:
if my_settings.motd_enabled:
logger.debug(f"System: MOTD Enabled using {my_settings.MOTD}")
if my_settings.enableEcho:
logger.debug(f"System: Echo command Enabled")
if sentry_enabled:
logger.debug(f"System: Sentry Mode Enabled {sentry_radius}m radius reporting to channel:{secure_channel}")
if highfly_enabled:
logger.debug(f"System: HighFly Enabled using {highfly_altitude}m limit reporting to channel:{highfly_channel}")
if repeater_enabled and multiple_interface:
logger.debug(f"System: Repeater Enabled for Channels: {repeater_channels}")
if bbs_enabled:
if my_settings.sentry_enabled:
logger.debug(f"System: Sentry Mode Enabled {my_settings.sentry_radius}m radius reporting to channel:{my_settings.secure_channel}")
if my_settings.highfly_enabled:
logger.debug(f"System: HighFly Enabled using {highfly_altitude}m limit reporting to channel:{my_settings.highfly_channel}")
if my_settings.repeater_enabled and multiple_interface:
logger.debug(f"System: Repeater Enabled for Channels: {my_settings.repeater_channels}")
if my_settings.bbs_enabled:
logger.debug(f"System: BBS Enabled, {bbsdb} has {len(bbs_messages)} messages. Direct Mail Messages waiting: {(len(bbs_dm) - 1)}")
if bbs_link_enabled:
if len(bbs_link_whitelist) > 0:
logger.debug(f"System: BBS Link Enabled with {len(bbs_link_whitelist)} peers")
if my_settings.bbs_link_enabled:
if len(my_settings.bbs_link_whitelist) > 0:
logger.debug(f"System: BBS Link Enabled with {len(my_settings.bbs_link_whitelist)} peers")
else:
logger.debug(f"System: BBS Link Enabled allowing all")
if scheduler_enabled:
logger.debug("System: BBS Link Enabled allowing all")
if my_settings.scheduler_enabled:
# Examples of using the scheduler, Times here are in 24hr format
# https://schedule.readthedocs.io/en/stable/
@@ -524,7 +526,7 @@ async def main():
tasks.append(asyncio.create_task(watchdog(), name="watchdog"))
# Add optional tasks
if file_monitor_enabled:
if my_settings.file_monitor_enabled:
tasks.append(asyncio.create_task(handleFileWatcher(), name="file_monitor"))
logger.debug(f"System: Starting {len(tasks)} async tasks")

View File

@@ -34,8 +34,10 @@ print("---------------------------------------------------------------")
try:
# set the path to import the modules and config.ini
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from modules.log import *
from modules.system import *
from modules.log import logger, getPrettyTime
from modules.system import handleFavoriteNode
from modules.settings import LOGGING_LEVEL
from modules.system import compileFavoriteList
except Exception as e:
print(f"Error importing modules run this program from the main repo directory 'python3 script/addFav.py'")
print(f"if you forgot the rest of it.. git clone https://github.com/spudgunman/meshing-around")

View File

@@ -14,8 +14,10 @@ print("---------------------------------------------------------------")
try:
# set the path to import the modules and config.ini
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from modules.log import *
from modules.bbstools import *
from modules.log import logger
from modules.bbstools import bbs_post_dm, bbs_dm, get_bbs_stats
from modules.settings import LOGGING_LEVEL, vpTracker, MOTD
logger.setLevel(LOGGING_LEVEL)
except Exception as e:
print(f"Error importing modules run this program from the main program directory 'python3 script/injectDM.py'")
exit(1)