From 10171a712e3d79dbbfdb46e5cd7cafe7f575fbd5 Mon Sep 17 00:00:00 2001 From: SpudGunMan Date: Fri, 24 Oct 2025 20:32:30 -0700 Subject: [PATCH] =?UTF-8?q?moar=20cleanup=20=F0=9F=A7=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit tighter memory control --- mesh_bot.py | 277 ++++++++++++++++++++++++++-------------------------- pong_bot.py | 67 ++++++------- 2 files changed, 175 insertions(+), 169 deletions(-) diff --git a/mesh_bot.py b/mesh_bot.py index 2804861..eb3c140 100755 --- a/mesh_bot.py +++ b/mesh_bot.py @@ -11,7 +11,7 @@ import asyncio import time # for sleep, get some when you can :) import random from datetime import datetime -from modules.log import logger, CustomFormatter, msgLogger +from modules.log import logger, CustomFormatter, msgLogger, getPrettyTime import modules.settings as my_settings from modules.system import * @@ -129,7 +129,7 @@ def auto_response(message, snr, rssi, hop, pkiStatus, message_from_id, channel_n # check the message for commands words list, processed after system.messageTrap for key in command_handler: word = message_lower.split(' ') - if cmdBang: + if my_settings.cmdBang: # strip the ! if word[0].startswith("!"): word[0] = word[0][1:] @@ -180,13 +180,13 @@ def isPlayingGame(message_from_id): trackers = [tracker for tracker in trackers if tracker is not None] - for tracker, game_name, handle_game_func in trackers: + for tracker, game_name in trackers: for i in range(len(tracker)-1, -1, -1): # iterate backwards for safe removal id_key = 'userID' if game_name == "DopeWars" else 'nodeID' id_key = 'id' if game_name == "Survey" else id_key if tracker[i].get(id_key) == message_from_id: last_played_key = 'last_played' if 'last_played' in tracker[i] else 'time' - if tracker[i].get(last_played_key, 0) > (time.time() - GAMEDELAY): + if tracker[i].get(last_played_key, 0) > (time.time() - my_settings.GAMEDELAY): playingGame = True game = game_name break @@ -218,7 +218,7 @@ def check_and_play_game(tracker, message_from_id, message_string, rxNode, channe if tracker[i].get(id_key) == message_from_id: last_played_key = 'last_played' if 'last_played' in tracker[i] else 'time' - if tracker[i].get(last_played_key) > (time.time() - GAMEDELAY): + if tracker[i].get(last_played_key) > (time.time() - my_settings.GAMEDELAY): if llm_enabled: logger.debug(f"System: LLM Disabled for {message_from_id} for duration of {game_name}") send_message(handle_game_func(message_string, message_from_id, rxNode), channel_number, message_from_id, rxNode) @@ -276,7 +276,7 @@ def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, chann if len(toNode) <= 4: toNode = get_num_from_short_name(toNode, deviceID) if toNode and isinstance(toNode, int) and toNode != 0: - if bbs_enabled: + if my_settings.bbs_enabled: msg_result = None logger.debug(f"System: Sending ping as BBS DM to @{toNode} from {get_name_from_number(message_from_id, 'short', deviceID)}") msg_result = bbs_post_dm(toNode, f"Joke for you! {tell_joke()}", message_from_id) @@ -306,13 +306,13 @@ def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, chann pingCount = int(message.split(" ")[1]) if pingCount == 123 or pingCount == 1234: pingCount = 1 - elif not autoPingInChannel and not isDM: + elif not my_settings.autoPingInChannel and not isDM: # no autoping in channels pingCount = 1 if pingCount > 51: pingCount = 50 - except: + except ValueError: pingCount = -1 if pingCount > 1: @@ -324,7 +324,7 @@ def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, chann msg = f"🚦Initalizing {pingCount} auto-ping" # if not a DM add the username to the beginning of msg - if not useDMForResponse and not isDM: + if not my_settings.useDMForResponse and not isDM: msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + msg return msg @@ -336,7 +336,7 @@ def handle_alertBell(message_from_id, deviceID, message): def handle_emergency(message_from_id, deviceID, message): myNodeNum = globals().get(f'myNodeNum{deviceID}', 777) # if user in bbs_ban_list return - if str(message_from_id) in bbs_ban_list: + if str(message_from_id) in my_settings.bbs_ban_list: # silent discard logger.warning(f"System: {message_from_id} on spam list, no emergency responder alert sent") return '' @@ -344,18 +344,18 @@ def handle_emergency(message_from_id, deviceID, message): if message_from_id != 0: nodeLocation = get_node_location(message_from_id, deviceID) # if default location is returned set to Unknown - if nodeLocation[0] == latitudeValue and nodeLocation[1] == longitudeValue: + if nodeLocation[0] == my_settings.latitudeValue and nodeLocation[1] == my_settings.longitudeValue: nodeLocation = ["?", "?"] nodeInfo = f"{get_name_from_number(message_from_id, 'short', deviceID)} detected by {get_name_from_number(myNodeNum, 'short', deviceID)} lastGPS {nodeLocation[0]}, {nodeLocation[1]}" msg = f"🔔🚨Intercepted Possible Emergency Assistance needed for: {nodeInfo}" # alert the emergency_responder_alert_channel - send_message(msg, emergency_responder_alert_channel, 0, emergency_responder_alert_interface) + send_message(msg, my_settings.emergency_responder_alert_channel, 0, my_settings.emergency_responder_alert_interface) logger.warning(f"System: {message_from_id} Emergency Assistance Requested in {message}") # send the message out via email/sms - if enableSMTP: - for user in sysopEmails: + if my_settings.enableSMTP: + for user in my_settings.sysopEmails: send_email(user, f"Emergency Assistance Requested by {nodeInfo} in {message}", message_from_id) - return EMERGENCY_RESPONSE + return my_settings.EMERGENCY_RESPONSE def handle_motd(message, message_from_id, isDM): msg = my_settings.MOTD @@ -390,7 +390,7 @@ def handle_echo(message, message_from_id, deviceID, isDM, channel_number): parts = message.lower().split("echo ", 1) if len(parts) > 1 and parts[1].strip() != "": echo_msg = parts[1] - if channel_number != echoChannel and not isDM: + if channel_number != my_settings.echoChannel and not isDM: echo_msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + echo_msg return echo_msg else: @@ -399,7 +399,7 @@ def handle_echo(message, message_from_id, deviceID, isDM, channel_number): return "Please provide a message to echo back to you. Example:echo Hello World" def handle_wxalert(message_from_id, deviceID, message): - if use_meteo_wxApi: + if my_settings.use_meteo_wxApi: return "wxalert is not supported" else: location = get_node_location(message_from_id, deviceID) @@ -409,7 +409,7 @@ def handle_wxalert(message_from_id, deviceID, message): else: weatherAlert = getWeatherAlertsNOAA(str(location[0]), str(location[1])) - if NO_ALERTS not in weatherAlert: + if my_settings.NO_ALERTS not in weatherAlert: weatherAlert = weatherAlert[0] return weatherAlert @@ -427,7 +427,7 @@ def handleNews(message_from_id, deviceID, message, isDM): if news: # if not a DM add the username to the beginning of msg - if not useDMForResponse and not isDM: + if not my_settings.useDMForResponse and not isDM: news = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + news return news else: @@ -443,7 +443,7 @@ def handle_howfar(message, message_from_id, deviceID, isDM): return "command returns the distance you have traveled since your last HowFar-command. Add 'reset' to reset your starting point." # if no GPS location return - if lat == latitudeValue and lon == longitudeValue: + if lat == my_settings.latitudeValue and lon == my_settings.longitudeValue: logger.debug(f"System: HowFar: No GPS location for {message_from_id}") return "No GPS location available" @@ -453,7 +453,7 @@ def handle_howfar(message, message_from_id, deviceID, isDM): msg = distance(lat,lon,message_from_id) # if not a DM add the username to the beginning of msg - if not useDMForResponse and not isDM: + if not my_settings.useDMForResponse and not isDM: msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + msg return msg @@ -463,10 +463,10 @@ def handle_howtall(message, message_from_id, deviceID, isDM): location = get_node_location(message_from_id, deviceID) lat = location[0] lon = location[1] - if lat == latitudeValue and lon == longitudeValue: + if lat == my_settings.latitudeValue and lon == my_settings.longitudeValue: # add guessing tot he msg msg += "Guessing:" - if use_metric: + if my_settings.use_metric: measure = "meters" else: measure = "feet" @@ -476,14 +476,14 @@ def handle_howtall(message, message_from_id, deviceID, isDM): # get the shadow length from the message split after howtall try: shadow_length = float(message.lower().split("howtall ")[1].split(" ")[0]) - except: + except (IndexError, ValueError): return f"Please provide a shadow length in {measure} example: howtall 5.5" # get data msg += measureHeight(lat, lon, shadow_length) # if data has NO_ALERTS return help - if NO_ALERTS in msg: + if my_settings.NO_ALERTS in msg: return f"Please provide a shadow length in {measure} example: howtall 5.5" return msg @@ -510,12 +510,12 @@ llmLocationTable = [{'nodeID': 1234567890, 'location': 'No Location'},] def handle_satpass(message_from_id, deviceID, message='', vox=False): if vox: - location = (latitudeValue, longitudeValue) + location = (my_settings.latitudeValue, my_settings.longitudeValue) message = 'satpass' else: location = get_node_location(message_from_id, deviceID) passes = '' - satList = satListConfig + satList = my_settings.satListConfig message = message.lower() # if user has a NORAD ID in the message @@ -546,7 +546,7 @@ def handle_llm(message_from_id, channel_number, deviceID, message, publicChannel location_name = 'no location provided' msg = '' - if location_enabled: + if my_settings.location_enabled: # if message_from_id is is the llmLocationTable use the location from the list to save on API calls for i in range(0, len(llmLocationTable)): if llmLocationTable[i].get('nodeID') == message_from_id: @@ -557,7 +557,7 @@ def handle_llm(message_from_id, channel_number, deviceID, message, publicChannel location = get_node_location(message_from_id, deviceID) location_name = where_am_i(str(location[0]), str(location[1]), short = True) - if NO_DATA_NOGPS in location_name: + if my_settings.NO_DATA_NOGPS in location_name: location_name = "no location provided" if "ask:" in message.lower(): @@ -572,12 +572,12 @@ def handle_llm(message_from_id, channel_number, deviceID, message, publicChannel # check for a welcome message (is this redundant?) if not any(node['nodeID'] == message_from_id and node['welcome'] == True for node in seenNodes): - if (channel_number == publicChannel and antiSpam) or useDMForResponse: + if (channel_number == publicChannel and my_settings.antiSpam) or my_settings.useDMForResponse: # send via DM - send_message(welcome_message, channel_number, message_from_id, deviceID) + send_message(my_settings.welcome_message, channel_number, message_from_id, deviceID) else: # send via channel - send_message(welcome_message, channel_number, 0, deviceID) + send_message(my_settings.welcome_message, channel_number, 0, deviceID) # mark the node as welcomed for node in seenNodes: if node['nodeID'] == message_from_id: @@ -605,7 +605,7 @@ def handle_llm(message_from_id, channel_number, deviceID, message, publicChannel msg = "Please wait, response could take 30+ seconds. Fund the SysOp's GPU budget!" if msg != '': - if (channel_number == publicChannel and antiSpam) or useDMForResponse: + if (channel_number == publicChannel and my_settings.antiSpam) or my_settings.useDMForResponse: # send via DM send_message(msg, channel_number, message_from_id, deviceID) else: @@ -1005,13 +1005,14 @@ def handleTicTacToe(message, nodeID, deviceID): return msg def quizHandler(message, nodeID, deviceID): + global quizGamePlayer user_name = get_name_from_number(nodeID) user_id = nodeID msg = '' user_answer = '' user_answer = message.lower() user_answer = user_answer.replace("quiz","").replace("q:","").strip() - if user_answer.startswith("!") and cmdBang: + if user_answer.startswith("!") and my_settings.cmdBang: user_answer = user_answer[1:].strip() if user_answer: if user_answer.startswith("start"): @@ -1065,6 +1066,7 @@ def quizHandler(message, nodeID, deviceID): return "🧠Please provide an answer or command, or send q: ?" def surveyHandler(message, nodeID, deviceID): + global surveyTracker user_id = nodeID location = get_node_location(nodeID, deviceID) msg = '' @@ -1113,7 +1115,7 @@ def surveyHandler(message, nodeID, deviceID): def handle_riverFlow(message, message_from_id, deviceID, vox=False): # River Flow from NOAA or Open-Meteo if vox: - location = (latitudeValue, longitudeValue) + location = (my_settings.latitudeValue, my_settings.longitudeValue) message = "riverflow" else: location = get_node_location(message_from_id, deviceID) @@ -1137,9 +1139,9 @@ def handle_riverFlow(message, message_from_id, deviceID, vox=False): def handle_mwx(message_from_id, deviceID, cmd): # NOAA Coastal and Marine Weather - if myCoastalZone is None: + if my_settings.myCoastalZone is None: logger.warning("System: Coastal Zone not set, please set in config.ini") - return NO_ALERTS + return my_settings.NO_ALERTS return get_nws_marine(zone=myCoastalZone, days=coastalForecastDays) def handle_wxc(message_from_id, deviceID, cmd, vox=False): @@ -1147,17 +1149,17 @@ def handle_wxc(message_from_id, deviceID, cmd, vox=False): if vox: # return a default message if vox is enabled if use_meteo_wxApi: - return get_wx_meteo(latitudeValue, longitudeValue) + return get_wx_meteo(my_settings.latitudeValue, my_settings.longitudeValue) else: - return get_NOAAweather(latitudeValue, longitudeValue) + return get_NOAAweather(my_settings.latitudeValue, my_settings.longitudeValue) location = get_node_location(message_from_id, deviceID) - if use_meteo_wxApi and not "wxc" in cmd and not use_metric: + if my_settings.use_meteo_wxApi and not "wxc" in cmd and not use_metric: #logger.debug("System: Bot Returning Open-Meteo API for weather imperial") weather = get_wx_meteo(str(location[0]), str(location[1])) - elif use_meteo_wxApi: + elif my_settings.use_meteo_wxApi: #logger.debug("System: Bot Returning Open-Meteo API for weather metric") weather = get_wx_meteo(str(location[0]), str(location[1]), 1) - elif not use_meteo_wxApi and "wxc" in cmd or use_metric: + elif not my_settings.use_meteo_wxApi and "wxc" in cmd or my_settings.use_metric: #logger.debug("System: Bot Returning NOAA API for weather metric") weather = get_NOAAweather(str(location[0]), str(location[1]), 1) else: @@ -1167,7 +1169,7 @@ def handle_wxc(message_from_id, deviceID, cmd, vox=False): def handle_emergency_alerts(message, message_from_id, deviceID): location = get_node_location(message_from_id, deviceID) - if enableDEalerts: + if my_settings.enableDEalerts: # nina Alerts return get_nina_alerts() if message.lower().startswith("ealert"): @@ -1248,7 +1250,7 @@ def handle_messages(message, deviceID, channel_number, msg_history, publicChanne # Choose order and slice # Oldest first, take first N filtered_msgs = filtered_msgs[-storeFlimit:][::-1] - if reverseSF: + if my_settings.reverseSF: # reverse that filtered_msgs = filtered_msgs[::-1] @@ -1283,7 +1285,7 @@ def handle_messages(message, deviceID, channel_number, msg_history, publicChanne def handle_sun(message_from_id, deviceID, channel_number, vox=False): if vox: # return a default message if vox is enabled - return get_sun(str(latitudeValue), str(longitudeValue)) + return get_sun(str(my_settings.latitudeValue), str(my_settings.longitudeValue)) location = get_node_location(message_from_id, deviceID, channel_number) return get_sun(str(location[0]), str(location[1])) @@ -1357,7 +1359,8 @@ def handle_history(message, nodeid, deviceID, isDM, lheard=False): # create the message from the buffer list for i in range(0, len(buffer)): msg += f"{buffer[i][0]}: {buffer[i][1]} :{buffer[i][2]} ago" - if i < len(buffer) - 1: msg += "\n" # add a new line if not the last line + if i < len(buffer) - 1: + msg += "\n" # add a new line if not the last line else: # sort the cmdHistory list by time, return the username and time into a new list which used for display for i in range(len(cmdHistory)): @@ -1379,8 +1382,10 @@ def handle_history(message, nodeid, deviceID, isDM, lheard=False): buffer.reverse() # reverse the list to show the latest first for i in range(0, len(buffer)): msg += f"{buffer[i][0]}, {buffer[i][1]} ago" - if i < len(buffer) - 1: msg += "\n" # add a new line if not the last line - if i > 3: break # only return the last 4 nodes + if i < len(buffer) - 1: + msg += "\n" # add a new line if not the last line + if i > 3: + break # only return the last 4 nodes return msg def handle_whereami(message_from_id, deviceID, channel_number): @@ -1398,13 +1403,13 @@ def handle_repeaterQuery(message_from_id, deviceID, channel_number): def handle_tide(message_from_id, deviceID, channel_number, vox=False): if vox: - return get_NOAAtide(str(latitudeValue), str(longitudeValue)) + return get_NOAAtide(str(my_settings.latitudeValue), str(my_settings.longitudeValue)) location = get_node_location(message_from_id, deviceID, channel_number) return get_NOAAtide(str(location[0]), str(location[1])) def handle_moon(message_from_id, deviceID, channel_number, vox=False): if vox: - return get_moon(str(latitudeValue), str(longitudeValue)) + return get_moon(str(my_settings.latitudeValue), str(my_settings.longitudeValue)) location = get_node_location(message_from_id, deviceID, channel_number) return get_moon(str(location[0]), str(location[1])) @@ -1420,7 +1425,7 @@ def handle_whoami(message_from_id, deviceID, hop, snr, rssi, pkiStatus): msg += f"\nYour PKI bit is {pkiStatus[0]} pubKey: {pkiStatus[1]}" loc = get_node_location(message_from_id, deviceID) - if loc != [latitudeValue, longitudeValue]: + if loc != [my_settings.latitudeValue, my_settings.longitudeValue]: msg += f"\nYou are at: lat:{loc[0]} lon:{loc[1]}" # check the positionMetadata for nodeID and get metadata @@ -1468,7 +1473,7 @@ def handle_whois(message, deviceID, channel_number, message_from_id): location = get_node_location(seenNodes[i]['nodeID'], deviceID, channel_number) msg += f"Ch: {seenNodes[i]['channel']}, Int: {seenNodes[i]['rxInterface']}" msg += f"Lat: {location[0]}, Lon: {location[1]}\n" - if location != [latitudeValue, longitudeValue]: + if location != [my_settings.latitudeValue, my_settings.longitudeValue]: msg += f"Loc: {where_am_i(str(location[0]), str(location[1]))}" return msg @@ -1492,7 +1497,7 @@ def onReceive(packet, interface): session_passkey = None playingGame = False - if DEBUGpacket: + if my_settings.DEBUGpacket: # Debug print the interface object for item in interface.__dict__.items(): intDebug = f"{item}\n" logger.debug(f"System: Packet Received on {rxType} Interface\n {intDebug} \n END of interface \n") @@ -1670,10 +1675,10 @@ def onReceive(packet, interface): send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode) else: # DM is useful for games or LLM - if games_enabled and ("Direct" in hop or hop_count < game_hop_limit): + if my_settings.games_enabled and ("Direct" in hop or hop_count < my_settings.game_hop_limit): playingGame = checkPlayingGame(message_from_id, message_string, rxNode, channel_number) - elif hop_count >= game_hop_limit: - if games_enabled: + elif hop_count >= my_settings.game_hop_limit: + if my_settings.games_enabled: logger.warning(f"Device:{rxNode} Ignoring Request to Play Game: {message_string} From: {get_name_from_number(message_from_id, 'long', rxNode)} with hop count: {hop}") send_message(f"Your hop count exceeds safe playable distance at {hop_count} hops", channel_number, message_from_id, rxNode) else: @@ -1682,7 +1687,7 @@ def onReceive(packet, interface): playingGame = False if not playingGame: - if llm_enabled and llmReplyToNonCommands: + if llm_enabled and my_settings.llmReplyToNonCommands: # respond with LLM llm = handle_llm(message_from_id, channel_number, rxNode, message_string, publicChannel) send_message(llm, channel_number, message_from_id, rxNode) @@ -1699,7 +1704,7 @@ def onReceive(packet, interface): if node['nodeID'] == message_from_id: node['welcome'] = True else: - if dad_jokes_enabled: + if my_settings.dad_jokes_enabled: # respond with a dad joke on DM send_message(tell_joke(), channel_number, message_from_id, rxNode) else: @@ -1713,24 +1718,24 @@ def onReceive(packet, interface): # message is on a channel if messageTrap(message_string): # message is for us to respond to, or is it... - if ignoreDefaultChannel and channel_number == publicChannel: + if my_settings.ignoreDefaultChannel and channel_number == my_settings.publicChannel: logger.debug(f"System: Ignoring CMD:{message_string} From: {get_name_from_number(message_from_id, 'short', rxNode)} Default Channel:{channel_number}") - elif str(message_from_id) in bbs_ban_list: + elif str(message_from_id) in my_settings.bbs_ban_list: logger.debug(f"System: Ignoring CMD:{message_string} From: {get_name_from_number(message_from_id, 'short', rxNode)} Cantankerous Node") - elif str(channel_number) in ignoreChannels: + elif str(channel_number) in my_settings.ignoreChannels: logger.debug(f"System: Ignoring CMD:{message_string} From: {get_name_from_number(message_from_id, 'short', rxNode)} Ignored Channel:{channel_number}") - elif cmdBang and not message_string.startswith("!"): + elif my_settings.cmdBang and not message_string.startswith("!"): logger.debug(f"System: Ignoring CMD:{message_string} From: {get_name_from_number(message_from_id, 'short', rxNode)} Didnt sound like they meant it") else: # message is for bot to respond to, seriously this time.. logger.info(f"Device:{rxNode} Channel:{channel_number} " + CustomFormatter.green + "ReceivedChannel: " + CustomFormatter.white + f"{message_string} " + CustomFormatter.purple +\ "From: " + CustomFormatter.white + f"{get_name_from_number(message_from_id, 'long', rxNode)}") - if useDMForResponse: + if my_settings.useDMForResponse: # respond to channel message via direct message send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode) else: # or respond to channel message on the channel itself - if channel_number == publicChannel and antiSpam: + if channel_number == my_settings.publicChannel and my_settings.antiSpam: # warning user spamming default channel logger.warning(f"System: AntiSpam protection, sending DM to: {get_name_from_number(message_from_id, 'long', rxNode)}") @@ -1743,15 +1748,15 @@ def onReceive(packet, interface): else: # message is not for us to respond to # ignore the message but add it to the message history list - if zuluTime: + if my_settings.zuluTime: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") else: timestamp = datetime.now().strftime("%Y-%m-%d %I:%M:%S%p") # trim the history list if it exceeds max_history - if len(msg_history) >= MAX_MSG_HISTORY: + if len(msg_history) >= my_settings.MAX_MSG_HISTORY: # Always keep only the most recent MAX_MSG_HISTORY entries - msg_history = msg_history[-MAX_MSG_HISTORY:] + msg_history = msg_history[-my_settings.MAX_MSG_HISTORY:] # add the message to the history list msg_history.append((get_name_from_number(message_from_id, 'long', rxNode), message_string, channel_number, timestamp, rxNode)) @@ -1759,27 +1764,27 @@ def onReceive(packet, interface): # print the message to the log and sdout logger.info(f"Device:{rxNode} Channel:{channel_number} " + CustomFormatter.green + "Ignoring Message:" + CustomFormatter.white +\ f" {message_string} " + CustomFormatter.purple + "From:" + CustomFormatter.white + f" {get_name_from_number(message_from_id)}") - if log_messages_to_file: + if my_settings.log_messages_to_file: msgLogger.info(f"Device:{rxNode} Channel:{channel_number} | {get_name_from_number(message_from_id, 'long', rxNode)} | " + message_string.replace('\n', '-nl-')) # repeat the message on the other device - if repeater_enabled and multiple_interface: + if my_settings.repeater_enabled and my_settings.multiple_interface: # wait a responseDelay to avoid message collision from lora-ack. - time.sleep(responseDelay) - if len(message_string) > (3 * MESSAGE_CHUNK_SIZE): + time.sleep(my_settings.responseDelay) + if len(message_string) > (3 * my_settings.MESSAGE_CHUNK_SIZE): logger.warning(f"System: Not repeating message, exceeds size limit ({len(message_string)} > {3 * MESSAGE_CHUNK_SIZE})") else: rMsg = (f"{message_string} From:{get_name_from_number(message_from_id, 'short', rxNode)}") # if channel found in the repeater list repeat the message - if str(channel_number) in repeater_channels: + if str(channel_number) in my_settings.repeater_channels: for i in range(1, 10): if globals().get(f'interface{i}_enabled', False) and i != rxNode: logger.debug(f"Repeating message on Device{i} Channel:{channel_number}") send_message(rMsg, channel_number, 0, i) - time.sleep(responseDelay) + time.sleep(my_settings.responseDelay) # if QRZ enabled check if we have said hello - if qrz_hello_enabled: + if my_settings.qrz_hello_enabled: if never_seen_before(message_from_id): name = get_name_from_number(message_from_id, 'short', rxNode) if isinstance(name, str) and name.startswith("!") and len(name) == 9: @@ -1789,11 +1794,11 @@ def onReceive(packet, interface): # add to qrz_hello list hello(message_from_id, name) # send a hello message as a DM - if not train_qrz: + if not my_settings.train_qrz: send_message(f"Hello {name} {qrz_hello_string}", channel_number, message_from_id, rxNode) # handle mini games - if wordOfTheDay: + if my_settings.wordOfTheDay: #word of the day game play on non bot messages happened, old_entry, new_entry, bingo_win, bingo_message = theWordOfTheDay.did_it_happen(message_string) if happened: @@ -1830,130 +1835,130 @@ async def start_rx(): f"{get_name_from_number(myNodeNum, 'short', i)}. NodeID: {myNodeNum}, {decimal_to_hex(myNodeNum)}") if llm_enabled: - logger.debug(f"System: Ollama LLM Enabled, loading model {llmModel} please wait") + logger.debug(f"System: Ollama LLM Enabled, loading model {my_settings.llmModel} please wait") llmLoad = llm_query(" ") if "trouble" not in llmLoad: - logger.debug(f"System: LLM Model {llmModel} loaded") + logger.debug(f"System: LLM Model {my_settings.llmModel} loaded") - if useDMForResponse: + if my_settings.useDMForResponse: logger.debug("System: Respond by DM only") - if log_messages_to_file: + if my_settings.log_messages_to_file: logger.debug("System: Logging Messages to disk") - if syslog_to_file: + if my_settings.syslog_to_file: logger.debug("System: Logging System Logs to disk") - if bbs_enabled: + if my_settings.bbs_enabled: logger.debug(f"System: BBS Enabled, {bbsdb} has {len(bbs_messages)} messages. Direct Mail Messages waiting: {(len(bbs_dm) - 1)}") - if bbs_link_enabled: + if my_settings.bbs_link_enabled: if len(bbs_link_whitelist) > 0: logger.debug(f"System: BBS Link Enabled with {len(bbs_link_whitelist)} peers") else: logger.debug(f"System: BBS Link Enabled allowing all") - if solar_conditions_enabled: + if my_settings.solar_conditions_enabled: logger.debug("System: Celestial Telemetry Enabled") - if location_enabled: - if use_meteo_wxApi: + if my_settings.location_enabled: + if my_settings.use_meteo_wxApi: logger.debug("System: Location Telemetry Enabled using Open-Meteo API") else: logger.debug("System: Location Telemetry Enabled using NOAA API") - if dad_jokes_enabled: + if my_settings.dad_jokes_enabled: logger.debug("System: Dad Jokes Enabled!") - if coastalEnabled: + if my_settings.coastalEnabled: logger.debug("System: Coastal Forecast and Tide Enabled!") - if games_enabled: + if my_settings.games_enabled: logger.debug("System: Games Enabled!") - if wikipedia_enabled: - if use_kiwix_server: + if my_settings.wikipedia_enabled: + if my_settings.use_kiwix_server: logger.debug(f"System: Wikipedia search Enabled using Kiwix server at {kiwix_url}") else: logger.debug("System: Wikipedia search Enabled") - if rssEnable: + if my_settings.rssEnable: logger.debug(f"System: RSS Feed Reader Enabled for feeds: {rssFeedNames}") if my_settings.MOTD_enabled: logger.debug(f"System: MOTD Enabled using {my_settings.MOTD} scheduler:{my_settings.schedulerMOTD}") - if sentry_enabled: - logger.debug(f"System: Sentry Mode Enabled {sentry_radius}m radius reporting to channel:{secure_channel} requestLOC:{reqLocationEnabled}") - if sentryIgnoreList: - logger.debug(f"System: Sentry BlockList Enabled for nodes: {sentryIgnoreList}") - if sentryWatchList: - logger.debug(f"System: Sentry WatchList Enabled for nodes: {sentryWatchList}") + if my_settings.sentry_enabled: + logger.debug(f"System: Sentry Mode Enabled {my_settings.sentry_radius}m radius reporting to channel:{my_settings.secure_channel} requestLOC:{reqLocationEnabled}") + if my_settings.sentryIgnoreList: + logger.debug(f"System: Sentry BlockList Enabled for nodes: {my_settings.sentryIgnoreList}") + if my_settings.sentryWatchList: + logger.debug(f"System: Sentry WatchList Enabled for nodes: {my_settings.sentryWatchList}") - if highfly_enabled: - logger.debug(f"System: HighFly Enabled using {highfly_altitude}m limit reporting to channel:{highfly_channel}") + if my_settings.highfly_enabled: + logger.debug(f"System: HighFly Enabled using {my_settings.highfly_altitude}m limit reporting to channel:{my_settings.highfly_channel}") - if store_forward_enabled: - logger.debug(f"System: S&F(messages command) Enabled using limit: {storeFlimit} and reverse queue:{reverseSF}") + if my_settings.store_forward_enabled: + logger.debug(f"System: S&F(messages command) Enabled using limit: {storeFlimit} and reverse queue:{my_settings.reverseSF}") - if enableEcho: + if my_settings.enableEcho: logger.debug("System: Echo command Enabled") - if repeater_enabled and multiple_interface: - logger.debug(f"System: Repeater Enabled for Channels: {repeater_channels}") + if my_settings.repeater_enabled and multiple_interface: + logger.debug(f"System: Repeater Enabled for Channels: {my_settings.repeater_channels}") - if radio_detection_enabled: - logger.debug(f"System: Radio Detection Enabled using rigctld at {rigControlServerAddress} broadcasting to channels: {sigWatchBroadcastCh} for {get_freq_common_name(get_hamlib('f'))}") + if my_settings.radio_detection_enabled: + logger.debug(f"System: Radio Detection Enabled using rigctld at {my_settings.rigControlServerAddress} broadcasting to channels: {my_settings.sigWatchBroadcastCh} for {get_freq_common_name(get_hamlib('f'))}") - if file_monitor_enabled: - logger.warning(f"System: File Monitor Enabled for {file_monitor_file_path}, broadcasting to channels: {file_monitor_broadcastCh}") - if enable_runShellCmd: + if my_settings.file_monitor_enabled: + logger.warning(f"System: File Monitor Enabled for {my_settings.file_monitor_file_path}, broadcasting to channels: {my_settings.file_monitor_broadcastCh}") + if my_settings.enable_runShellCmd: logger.debug("System: Shell Command monitor enabled") - if allowXcmd: + if my_settings.allowXcmd: logger.warning("System: File Monitor shell XCMD Enabled") - if read_news_enabled: - logger.debug(f"System: File Monitor News Reader Enabled for {news_file_path}") - if bee_enabled: + if my_settings.read_news_enabled: + logger.debug(f"System: File Monitor News Reader Enabled for {my_settings.news_file_path}") + if my_settings.bee_enabled: logger.debug("System: File Monitor Bee Monitor Enabled for bee.txt") - if wxAlertBroadcastEnabled: - logger.debug(f"System: Weather Alert Broadcast Enabled on channels {wxAlertBroadcastChannel}") + if my_settings.wxAlertBroadcastEnabled: + logger.debug(f"System: Weather Alert Broadcast Enabled on channels {my_settings.wxAlertBroadcastChannel}") - if emergencyAlertBrodcastEnabled: - logger.debug(f"System: Emergency Alert Broadcast Enabled on channels {emergencyAlertBroadcastCh} for FIPS codes {myStateFIPSList}") - if myStateFIPSList == ['']: + if my_settings.emergencyAlertBrodcastEnabled: + logger.debug(f"System: Emergency Alert Broadcast Enabled on channels {my_settings.emergencyAlertBroadcastCh} for FIPS codes {my_settings.myStateFIPSList}") + if my_settings.myStateFIPSList == ['']: logger.warning("System: No FIPS codes set for iPAWS Alerts") - if emergency_responder_enabled: - logger.debug(f"System: Emergency Responder Enabled on channels {emergency_responder_alert_channel} for interface {emergency_responder_alert_interface}") + if my_settings.emergency_responder_enabled: + logger.debug(f"System: Emergency Responder Enabled on channels {my_settings.emergency_responder_alert_channel} for interface {my_settings.emergency_responder_alert_interface}") - if volcanoAlertBroadcastEnabled: - logger.debug(f"System: Volcano Alert Broadcast Enabled on channels {volcanoAlertBroadcastChannel}") + if my_settings.volcanoAlertBroadcastEnabled: + logger.debug(f"System: Volcano Alert Broadcast Enabled on channels {my_settings.volcanoAlertBroadcastChannel}") - if qrz_hello_enabled: - if train_qrz: + if my_settings.qrz_hello_enabled: + if my_settings.train_qrz: logger.debug("System: QRZ Welcome/Hello Enabled with training mode") else: logger.debug("System: QRZ Welcome/Hello Enabled") - if checklist_enabled: + if my_settings.checklist_enabled: logger.debug("System: CheckList Module Enabled") - if ignoreChannels: - logger.debug(f"System: Ignoring Channels: {ignoreChannels}") + if my_settings.ignoreChannels: + logger.debug(f"System: Ignoring Channels: {my_settings.ignoreChannels}") - if noisyNodeLogging: + if my_settings.noisyNodeLogging: logger.debug("System: Noisy Node Logging Enabled") - if logMetaStats: + if my_settings.logMetaStats: logger.debug("System: Logging Metadata Stats Enabled, leaderboard") loadLeaderboard() - if enableSMTP: - if enableImap: + if my_settings.enableSMTP: + if my_settings.enableImap: logger.debug("System: SMTP Email Alerting Enabled using IMAP") else: logger.warning("System: SMTP Email Alerting Enabled") - if scheduler_enabled: + if my_settings.scheduler_enabled: # setup the scheduler from modules.scheduler import setup_scheduler await setup_scheduler( @@ -2002,13 +2007,13 @@ async def main(): tasks.append(asyncio.create_task(watchdog(), name="watchdog")) # Add optional tasks - if file_monitor_enabled: + if my_settings.file_monitor_enabled: tasks.append(asyncio.create_task(handleFileWatcher(), name="file_monitor")) - if radio_detection_enabled: + if my_settings.radio_detection_enabled: tasks.append(asyncio.create_task(handleSignalWatcher(), name="hamlib")) - if voxDetectionEnabled: + if my_settings.voxDetectionEnabled: tasks.append(asyncio.create_task(voxMonitor(), name="vox_detection")) logger.debug(f"System: Starting {len(tasks)} async tasks") diff --git a/pong_bot.py b/pong_bot.py index a6b2722..cc0eec1 100755 --- a/pong_bot.py +++ b/pong_bot.py @@ -135,13 +135,13 @@ def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, chann pingCount = int(message.split(" ")[1]) if pingCount == 123 or pingCount == 1234: pingCount = 1 - elif not autoPingInChannel and not isDM: + elif not my_settings.autoPingInChannel and not isDM: # no autoping in channels pingCount = 1 if pingCount > 51: pingCount = 50 - except: + except ValueError: pingCount = -1 if pingCount > 1: @@ -152,7 +152,7 @@ def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, chann msg = f"🚦Initalizing {pingCount} auto-ping" # if not a DM add the username to the beginning of msg - if not useDMForResponse and not isDM: + if not my_settings.useDMForResponse and not isDM: msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + msg return msg @@ -162,8 +162,8 @@ def handle_motd(message, message_from_id, isDM): isAdmin = False msg = MOTD # check if the message_from_id is in the bbs_admin_list - if bbs_admin_list != ['']: - for admin in bbs_admin_list: + if my_settings.bbs_admin_list != ['']: + for admin in my_settings.bbs_admin_list: if str(message_from_id) == admin: isAdmin = True break @@ -192,7 +192,7 @@ def handle_echo(message, message_from_id, deviceID, isDM, channel_number): parts = message.lower().split("echo ", 1) if len(parts) > 1 and parts[1].strip() != "": echo_msg = parts[1] - if channel_number != echoChannel: + if channel_number != my_settings.echoChannel: echo_msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + echo_msg return echo_msg else: @@ -239,7 +239,8 @@ def onReceive(packet, interface): if DEBUGpacket: # Debug print the interface object - for item in interface.__dict__.items(): intDebug = f"{item}\n" + for item in interface.__dict__.items(): + intDebug = f"{item}\n" logger.debug(f"System: Packet Received on {rxType} Interface\n {intDebug} \n END of interface \n") # Debug print the packet for debugging logger.debug(f"Packet Received\n {packet} \n END of packet \n") @@ -374,7 +375,7 @@ def onReceive(packet, interface): if hop in ("MQTT", "Gateway") and hop_count > 0: hop = f"{hop_count} Hops" - if enableHopLogs: + if my_settings.enableHopLogs: logger.debug(f"System: Packet HopDebugger: hop_away:{hop_away} hop_limit:{hop_limit} hop_start:{hop_start} calculated_hop_count:{hop_count} final_hop_value:{hop} via_mqtt:{via_mqtt} transport_mechanism:{transport_mechanism} Hostname:{rxNodeHostName}") # check with stringSafeChecker if the message is safe @@ -418,7 +419,7 @@ def onReceive(packet, interface): send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode) else: # or respond to channel message on the channel itself - if channel_number == publicChannel and antiSpam: + if channel_number == my_settings.publicChannel and my_settings.antiSpam: # warning user spamming default channel logger.warning(f"System: AntiSpam protection, sending DM to: {get_name_from_number(message_from_id, 'long', rxNode)}") @@ -431,7 +432,7 @@ def onReceive(packet, interface): else: # message is not for bot to respond to # ignore the message but add it to the message history list - if zuluTime: + if my_settings.zuluTime: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") else: timestamp = datetime.now().strftime("%Y-%m-%d %I:%M:%S%p") @@ -449,17 +450,17 @@ def onReceive(packet, interface): msgLogger.info(f"Device:{rxNode} Channel:{channel_number} | {get_name_from_number(message_from_id, 'long', rxNode)} | " + message_string.replace('\n', '-nl-')) # repeat the message on the other device - if repeater_enabled and multiple_interface: + if my_settings.repeater_enabled and multiple_interface: # wait a responseDelay to avoid message collision from lora-ack. - time.sleep(responseDelay) + time.sleep(my_settings.responseDelay) rMsg = (f"{message_string} From:{get_name_from_number(message_from_id, 'short', rxNode)}") # if channel found in the repeater list repeat the message - if str(channel_number) in repeater_channels: + if str(channel_number) in my_settings.repeater_channels: for i in range(1, 10): if globals().get(f'interface{i}_enabled', False) and i != rxNode: logger.debug(f"Repeating message on Device{i} Channel:{channel_number}") send_message(rMsg, channel_number, 0, i) - time.sleep(responseDelay) + time.sleep(my_settings.responseDelay) else: # Evaluate non TEXT_MESSAGE_APP packets consumeMetadata(packet, rxNode, channel_number) @@ -478,30 +479,30 @@ async def start_rx(): logger.info(f"System: Autoresponder Started for Device{i} {get_name_from_number(myNodeNum, 'long', i)}," f"{get_name_from_number(myNodeNum, 'short', i)}. NodeID: {myNodeNum}, {decimal_to_hex(myNodeNum)}") - if useDMForResponse: + if my_settings.useDMForResponse: logger.debug(f"System: Respond by DM only") - if log_messages_to_file: + if my_settings.log_messages_to_file: logger.debug("System: Logging Messages to disk") - if syslog_to_file: + if my_settings.syslog_to_file: logger.debug("System: Logging System Logs to disk") - if motd_enabled: - logger.debug(f"System: MOTD Enabled using {MOTD}") - if enableEcho: + if my_settings.motd_enabled: + logger.debug(f"System: MOTD Enabled using {my_settings.MOTD}") + if my_settings.enableEcho: logger.debug(f"System: Echo command Enabled") - if sentry_enabled: - logger.debug(f"System: Sentry Mode Enabled {sentry_radius}m radius reporting to channel:{secure_channel}") - if highfly_enabled: - logger.debug(f"System: HighFly Enabled using {highfly_altitude}m limit reporting to channel:{highfly_channel}") - if repeater_enabled and multiple_interface: - logger.debug(f"System: Repeater Enabled for Channels: {repeater_channels}") - if bbs_enabled: + if my_settings.sentry_enabled: + logger.debug(f"System: Sentry Mode Enabled {my_settings.sentry_radius}m radius reporting to channel:{my_settings.secure_channel}") + if my_settings.highfly_enabled: + logger.debug(f"System: HighFly Enabled using {highfly_altitude}m limit reporting to channel:{my_settings.highfly_channel}") + if my_settings.repeater_enabled and multiple_interface: + logger.debug(f"System: Repeater Enabled for Channels: {my_settings.repeater_channels}") + if my_settings.bbs_enabled: logger.debug(f"System: BBS Enabled, {bbsdb} has {len(bbs_messages)} messages. Direct Mail Messages waiting: {(len(bbs_dm) - 1)}") - if bbs_link_enabled: - if len(bbs_link_whitelist) > 0: - logger.debug(f"System: BBS Link Enabled with {len(bbs_link_whitelist)} peers") + if my_settings.bbs_link_enabled: + if len(my_settings.bbs_link_whitelist) > 0: + logger.debug(f"System: BBS Link Enabled with {len(my_settings.bbs_link_whitelist)} peers") else: - logger.debug(f"System: BBS Link Enabled allowing all") - if scheduler_enabled: + logger.debug("System: BBS Link Enabled allowing all") + if my_settings.scheduler_enabled: # Examples of using the scheduler, Times here are in 24hr format # https://schedule.readthedocs.io/en/stable/ @@ -525,7 +526,7 @@ async def main(): tasks.append(asyncio.create_task(watchdog(), name="watchdog")) # Add optional tasks - if file_monitor_enabled: + if my_settings.file_monitor_enabled: tasks.append(asyncio.create_task(handleFileWatcher(), name="file_monitor")) logger.debug(f"System: Starting {len(tasks)} async tasks")