#!/usr/bin/env python3 # Meshtastic Autoresponder MESH Bot # K7MHI Kelly Keeton 2025 try: from pubsub import pub except ImportError: print(f"Important dependencies are not met, try install.sh\n\n Did you mean to './launch.sh mesh' using a virtual environment.") exit(1) import asyncio import time # for sleep, get some when you can :) import random from datetime import datetime from modules.log import logger, CustomFormatter, msgLogger, getPrettyTime import modules.settings as my_settings from modules.system import * # list of commands to remove from the default list for DM only restrictedCommands = ["blackjack", "videopoker", "dopewars", "lemonstand", "golfsim", "mastermind", "hangman", "hamtest", "tictactoe", "tic-tac-toe", "quiz", "q:", "survey", "s:", "battleship"] restrictedResponse = "🤖only available in a Direct Message📵" # "" for none def auto_response(message, snr, rssi, hop, pkiStatus, message_from_id, channel_number, deviceID, isDM): global cmdHistory #Auto response to messages message_lower = message.lower() bot_response = "🤖I'm sorry, I'm afraid I can't do that." # Command List processes system.trap_list. system.messageTrap() sends any commands to here default_commands = { "ack": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "ask:": lambda: handle_llm(message_from_id, channel_number, deviceID, message, publicChannel), "askai": lambda: handle_llm(message_from_id, channel_number, deviceID, message, publicChannel), "bannode": lambda: handle_bbsban(message, message_from_id, isDM), "battleship": lambda: handleBattleship(message, message_from_id, deviceID), "bbsack": lambda: bbs_sync_posts(message, message_from_id, deviceID), "bbsdelete": lambda: handle_bbsdelete(message, message_from_id), "bbshelp": bbs_help, "bbsinfo": lambda: get_bbs_stats(), "bbslink": lambda: bbs_sync_posts(message, message_from_id, deviceID), "bbslist": bbs_list_messages, "bbspost": lambda: handle_bbspost(message, message_from_id, deviceID), "bbsread": lambda: handle_bbsread(message), "blackjack": lambda: handleBlackJack(message, message_from_id, deviceID), "approvecl": lambda: handle_checklist(message, message_from_id, deviceID), "denycl": lambda: handle_checklist(message, message_from_id, deviceID), "checkin": lambda: handle_checklist(message, message_from_id, deviceID), "checklist": lambda: handle_checklist(message, message_from_id, deviceID), "checkout": lambda: handle_checklist(message, message_from_id, deviceID), "chess": lambda: handle_gTnW(chess=True), "clearsms": lambda: handle_sms(message_from_id, message), "cmd": lambda: handle_cmd(message, message_from_id, deviceID), "cq": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "cqcq": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "cqcqcq": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "dopewars": lambda: handleDopeWars(message, message_from_id, deviceID), "dx": lambda: handledxcluster(message, message_from_id, deviceID), "ea": lambda: handle_emergency_alerts(message, message_from_id, deviceID), "echo": lambda: handle_echo(message, message_from_id, deviceID, isDM, channel_number), "ealert": lambda: handle_emergency_alerts(message, message_from_id, deviceID), "earthquake": lambda: handleEarthquake(message, message_from_id, deviceID), "email:": lambda: handle_email(message_from_id, message), "games": lambda: gamesCmdList, "globalthermonuclearwar": lambda: handle_gTnW(), "golfsim": lambda: handleGolf(message, message_from_id, deviceID), "hamtest": lambda: handleHamtest(message, message_from_id, deviceID), "hangman": lambda: handleHangman(message, message_from_id, deviceID), "hfcond": hf_band_conditions, "history": lambda: handle_history(message, message_from_id, deviceID, isDM), "howfar": lambda: handle_howfar(message, message_from_id, deviceID, isDM), "howtall": lambda: handle_howtall(message, message_from_id, deviceID, isDM), "item": lambda: handle_inventory(message, message_from_id, deviceID), "itemadd": lambda: handle_inventory(message, message_from_id, deviceID), "itemlist": lambda: handle_inventory(message, message_from_id, deviceID), "itemloan": lambda: handle_inventory(message, message_from_id, deviceID), "itemremove": lambda: handle_inventory(message, message_from_id, deviceID), "itemreset": lambda: handle_inventory(message, message_from_id, deviceID), "itemreturn": lambda: handle_inventory(message, message_from_id, deviceID), "itemsell": lambda: handle_inventory(message, message_from_id, deviceID), "itemstats": lambda: handle_inventory(message, message_from_id, deviceID), "cart": lambda: handle_inventory(message, message_from_id, deviceID), "cartadd": lambda: handle_inventory(message, message_from_id, deviceID), "cartbuy": lambda: handle_inventory(message, message_from_id, deviceID), "cartclear": lambda: handle_inventory(message, message_from_id, deviceID), "cartlist": lambda: handle_inventory(message, message_from_id, deviceID), "cartremove": lambda: handle_inventory(message, message_from_id, deviceID), "cartsell": lambda: handle_inventory(message, message_from_id, deviceID), "joke": lambda: tell_joke(message_from_id), "latest": lambda: get_newsAPI(message, message_from_id, deviceID, isDM), "leaderboard": lambda: get_mesh_leaderboard(message, message_from_id, deviceID), "lemonstand": lambda: handleLemonade(message, message_from_id, deviceID), "lheard": lambda: handle_lheard(message, message_from_id, deviceID, isDM), "map": lambda: mapHandler(message_from_id, deviceID, channel_number, message, snr, rssi, hop), "mastermind": lambda: handleMmind(message, message_from_id, deviceID), "messages": lambda: handle_messages(message, deviceID, channel_number, msg_history, publicChannel, isDM), "moon": lambda: handle_moon(message_from_id, deviceID, channel_number), "motd": lambda: handle_motd(message, message_from_id, isDM), "mwx": lambda: handle_mwx(message_from_id, deviceID, channel_number), "ping": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "pinging": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "pong": lambda: "🏓PING!!🛜", "q:": lambda: quizHandler(message, message_from_id, deviceID), "quiz": lambda: quizHandler(message, message_from_id, deviceID), "readnews": lambda: handleNews(message_from_id, deviceID, message, isDM), "readrss": lambda: get_rss_feed(message), "riverflow": lambda: handle_riverFlow(message, message_from_id, deviceID), "rlist": lambda: handle_repeaterQuery(message_from_id, deviceID, channel_number), "satpass": lambda: handle_satpass(message_from_id, deviceID, message), "setemail": lambda: handle_email(message_from_id, message), "setsms": lambda: handle_sms( message_from_id, message), "sitrep": lambda: handle_lheard(message, message_from_id, deviceID, isDM), "sms:": lambda: handle_sms(message_from_id, message), "solar": lambda: drap_xray_conditions() + "\n" + solar_conditions() + "\n" + get_noaa_scales_summary(), "sun": lambda: handle_sun(message_from_id, deviceID, channel_number), "survey": lambda: surveyHandler(message, message_from_id, deviceID), "s:": lambda: surveyHandler(message, message_from_id, deviceID), "sysinfo": lambda: sysinfo(message, message_from_id, deviceID, isDM), "test": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "testing": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "tictactoe": lambda: handleTicTacToe(message, message_from_id, deviceID), "tic-tac-toe": lambda: handleTicTacToe(message, message_from_id, deviceID), "tide": lambda: handle_tide(message_from_id, deviceID, channel_number), "valert": lambda: get_volcano_usgs(), "verse": lambda: read_verse(), "videopoker": lambda: handleVideoPoker(message, message_from_id, deviceID), "whereami": lambda: handle_whereami(message_from_id, deviceID, channel_number), "whoami": lambda: handle_whoami(message_from_id, deviceID, hop, snr, rssi, pkiStatus), "whois": lambda: handle_whois(message, deviceID, channel_number, message_from_id), "wiki": lambda: handle_wiki(message, isDM), "wx": lambda: handle_wxc(message_from_id, deviceID, 'wx'), "wxa": lambda: handle_wxalert(message_from_id, deviceID, message), "wxalert": lambda: handle_wxalert(message_from_id, deviceID, message), "x:": lambda: handleShellCmd(message, message_from_id, channel_number, isDM, deviceID), "wxc": lambda: handle_wxc(message_from_id, deviceID, 'wxc'), "📍": lambda: handle_whoami(message_from_id, deviceID, hop, snr, rssi, pkiStatus), "🔔": lambda: handle_alertBell(message_from_id, deviceID, message), "🐝": lambda: read_file("bee.txt", True), # any value from system.py:trap_list_emergency will trigger the emergency function "112": lambda: handle_emergency(message_from_id, deviceID, message), "911": lambda: handle_emergency(message_from_id, deviceID, message), "999": lambda: handle_emergency(message_from_id, deviceID, message), "ambulance": lambda: handle_emergency(message_from_id, deviceID, message), "emergency": lambda: handle_emergency(message_from_id, deviceID, message), "fire": lambda: handle_emergency(message_from_id, deviceID, message), "police": lambda: handle_emergency(message_from_id, deviceID, message), "rescue": lambda: handle_emergency(message_from_id, deviceID, message), } # set the command handler command_handler = default_commands cmds = [] # list to hold the commands found in the message # check the message for commands words list, processed after system.messageTrap for key in command_handler: word = message_lower.split(' ') if my_settings.cmdBang: # strip the ! if word[0].startswith("!"): word[0] = word[0][1:] if key in word: # append all the commands found in the message to the cmds list cmds.append({'cmd': key, 'index': message_lower.index(key)}) # check for commands with a question mark if key + "?" in word: # append all the commands found in the message to the cmds list cmds.append({'cmd': key, 'index': message_lower.index(key)}) if len(cmds) > 0: # sort the commands by index value cmds = sorted(cmds, key=lambda k: k['index']) # Check if user is already playing a game playing, game = isPlayingGame(message_from_id)[0], isPlayingGame(message_from_id)[1] # Block restricted commands if not DM if (cmds[0]['cmd'] in restrictedCommands and not isDM) or (cmds[0]['cmd'] in restrictedCommands and playing) or playing: logger.debug(f"System: Bot restricted Command:{cmds[0]['cmd']} From: {get_name_from_number(message_from_id)} isDM:{isDM} playing:{playing}") if playing: bot_response = f"🤖You are already playing {game}, finish that first." else: bot_response = restrictedResponse else: logger.debug(f"System: Bot detected Commands:{cmds} From: {get_name_from_number(message_from_id)} isDM:{isDM} playing:{playing}") # run the first command after sorting bot_response = command_handler[cmds[0]['cmd']]() # append the command to the cmdHistory list for lheard and history if len(cmdHistory) > 50: cmdHistory.pop(0) cmdHistory.append({'nodeID': message_from_id, 'cmd': cmds[0]['cmd'], 'time': time.time()}) return bot_response def handle_cmd(message, message_from_id, deviceID): # why CMD? its just a command list. a terminal would normally use "Help" # I didnt want to invoke the word "help" in Meshtastic due to its possible emergency use if " " in message and message.split(" ")[1] in trap_list: return "🤖 just use the commands directly in chat" return help_message def isPlayingGame(message_from_id): global gameTrackers trackers = gameTrackers.copy() playingGame = False game = "None" trackers = [tracker for tracker in trackers if tracker is not None] for tracker, game_name, _ in trackers: for i in range(len(tracker)-1, -1, -1): # iterate backwards for safe removal id_key = 'userID' if game_name == "DopeWars" else 'nodeID' id_key = 'id' if game_name == "Survey" else id_key if tracker[i].get(id_key) == message_from_id: last_played_key = 'last_played' if 'last_played' in tracker[i] else 'time' if tracker[i].get(last_played_key, 0) > (time.time() - my_settings.GAMEDELAY): playingGame = True game = game_name break if playingGame: break return playingGame, game def checkPlayingGame(message_from_id, message_string, rxNode, channel_number): global gameTrackers trackers = gameTrackers.copy() playingGame = False game = "None" trackers = [tracker for tracker in trackers if tracker is not None] for tracker, game_name, handle_game_func in trackers: playingGame, game = check_and_play_game(tracker, message_from_id, message_string, rxNode, channel_number, game_name, handle_game_func) if playingGame: break return playingGame def check_and_play_game(tracker, message_from_id, message_string, rxNode, channel_number, game_name, handle_game_func): global llm_enabled for i in range(len(tracker)): # Use 'userID' for DopeWars, 'nodeID' for others (including Survey) id_key = 'userID' if game_name == "DopeWars" else 'nodeID' if tracker[i].get(id_key) == message_from_id: last_played_key = 'last_played' if 'last_played' in tracker[i] else 'time' if tracker[i].get(last_played_key) > (time.time() - my_settings.GAMEDELAY): if llm_enabled: logger.debug(f"System: LLM Disabled for {message_from_id} for duration of {game_name}") send_message(handle_game_func(message_string, message_from_id, rxNode), channel_number, message_from_id, rxNode) return True, game_name return False, "None" def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number): global multiPing myNodeNum = globals().get(f'myNodeNum{deviceID}', 777) if "?" in message and isDM: pingHelp = "🤖Ping Command Help:\n" \ "🏓 Send 'ping' or 'ack' or 'test' to get a response.\n" \ "🏓 Send 'ping ' to get multiple pings in DM" "🏓 ping @USERID to send a Joke from the bot" return pingHelp msg = "" type = '' if "ping" in message.lower(): msg = "🏓PONG" type = "🏓PING" elif "test" in message.lower() or "testing" in message.lower(): msg = random.choice(["🎙Testing 1,2,3", "🎙Testing",\ "🎙Testing, testing",\ "🎙Ah-wun, ah-two...", "🎙Is this thing on?",\ "🎙Roger that!",]) type = "🎙TEST" elif "ack" in message.lower(): msg = random.choice(["✋ACK-ACK!\n", "✋Ack to you!\n"]) type = "✋ACK" elif "cqcq" in message.lower() or "cq" in message.lower() or "cqcqcq" in message.lower(): myname = get_name_from_number(myNodeNum, 'short', deviceID) msg = f"QSP QSL OM DE {myname} K\n" else: msg = "🔊 Can you hear me now?" # append SNR/RSSI or hop info if hop.startswith("Gateway") or hop.startswith("MQTT"): msg += " [GW]" elif hop.startswith("Direct"): msg += " [RF]" else: #flood msg += " [F]" if (float(snr) != 0 or float(rssi) != 0) and "Hop" not in hop: msg += f"\nSNR:{snr} RSSI:{rssi}" elif "Hop" in hop: # janky, remove the words Gateway or MQTT if present hop = hop.replace("Gateway", "").replace("Direct", "").replace("MQTT", "").strip() msg += f"\n{hop} " if "@" in message: msg = msg + " @" + message.split("@")[1] type = type + " @" + message.split("@")[1] # check for ping to @nodeID and allow BBS DM toNode = message.split("@")[1].strip().split(" ")[0] # validate toNode is shortname if len(toNode) <= 4: toNode = get_num_from_short_name(toNode, deviceID) if toNode and isinstance(toNode, int) and toNode != 0: if my_settings.bbs_enabled: msg_result = None logger.debug(f"System: Sending ping as BBS DM to @{toNode} from {get_name_from_number(message_from_id, 'short', deviceID)}") msg_result = bbs_post_dm(toNode, f"Joke for you! {tell_joke()}", message_from_id) # exit the function return msg_result if msg_result else logger.warning(f"System: ping @nodeID detected but no BBS to send with, enable BBS in settings.ini") elif "#" in message: msg = msg + " #" + message.split("#")[1] type = type + " #" + message.split("#")[1] # check for multi ping request if " " in message: # if stop multi ping if "stop" in message.lower(): for i in range(0, len(multiPingList)): if multiPingList[i].get('message_from_id') == message_from_id: multiPingList.pop(i) msg = "🛑 auto-ping" # if 3 or more entries (2 or more active), throttle the multi-ping for congestion if len(multiPingList) > 2: msg = "🚫⛔️ auto-ping, service busy. ⏳Try again soon." pingCount = -1 else: # set inital pingCount try: pingCount = int(message.split(" ")[1]) if pingCount == 123 or pingCount == 1234: pingCount = 1 elif not my_settings.autoPingInChannel and not isDM: # no autoping in channels pingCount = 1 if pingCount > 51 and pingCount <= 101: pingCount = 50 if pingCount > 800: ban_hammer(message_from_id, deviceID, reason="Excessive auto-ping request") return "🚫⛔️auto-ping request denied." except ValueError: pingCount = -1 if pingCount > 1: multiPingList.append({'message_from_id': message_from_id, 'count': pingCount + 1, 'type': type, 'deviceID': deviceID, 'channel_number': channel_number, 'startCount': pingCount}) logger.info(f"System: Starting auto-ping of type {type} for {pingCount} pings to {get_name_from_number(message_from_id, 'short', deviceID)}") if type == "🎙TEST": msg = f"🛜Initalizing BufferTest, using chunks of about {int(maxBuffer // pingCount)}, max length {maxBuffer} in {pingCount} messages" else: msg = f"🚦Initalizing {pingCount} auto-ping" # if not a DM add the username to the beginning of msg if not my_settings.useDMForResponse and not isDM: msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + msg return msg def handle_alertBell(message_from_id, deviceID, message): msg = ["the only prescription is more 🐮🔔🐄🛎️", "what this 🤖 needs is more 🐮🔔🐄🛎️", "🎤ring my bell🛎️🔔🎶"] return random.choice(msg) def handle_emergency(message_from_id, deviceID, message): myNodeNum = globals().get(f'myNodeNum{deviceID}', 777) # if user in bbs_ban_list return if str(message_from_id) in my_settings.bbs_ban_list: # silent discard hammer_value = ban_hammer(message_from_id, deviceID, reason="Emergency Alert from banned node") logger.warning(f"System: {message_from_id} on spam list, no emergency responder alert sent. Ban hammer value: {hammer_value}") return '' # trgger alert to emergency_responder_alert_channel if message_from_id != 0: nodeLocation = get_node_location(message_from_id, deviceID) # if default location is returned set to Unknown if nodeLocation[0] == my_settings.latitudeValue and nodeLocation[1] == my_settings.longitudeValue: nodeLocation = ["?", "?"] nodeInfo = f"{get_name_from_number(message_from_id, 'short', deviceID)} detected by {get_name_from_number(myNodeNum, 'short', deviceID)} lastGPS {nodeLocation[0]}, {nodeLocation[1]}" msg = f"🔔🚨Intercepted Possible Emergency Assistance needed for: {nodeInfo}" # alert the emergency_responder_alert_channel send_message(msg, my_settings.emergency_responder_alert_channel, 0, my_settings.emergency_responder_alert_interface) logger.warning(f"System: {message_from_id} Emergency Assistance Requested in {message}") # send the message out via email/sms if my_settings.enableSMTP: for user in my_settings.sysopEmails: send_email(user, f"Emergency Assistance Requested by {nodeInfo} in {message}", message_from_id) return my_settings.EMERGENCY_RESPONSE def handle_motd(message, message_from_id, isDM): msg = my_settings.MOTD isAdmin = isNodeAdmin(message_from_id) if "?" in message: msg = "Message of the day, set with 'motd $ HelloWorld!'" elif "$" in message and isAdmin: my_settings.MOTD = message.split("$")[1] my_settings.MOTD = my_settings.MOTD.rstrip() logger.debug(f"System: {message_from_id} temporarly changed my_settings.MOTD: {my_settings.MOTD}") msg = "my_settings.MOTD changed to: " + my_settings.MOTD return msg def handle_echo(message, message_from_id, deviceID, isDM, channel_number): # Check if user is admin isAdmin = isNodeAdmin(message_from_id) # Admin extended syntax: echo c= d= if isAdmin and message.strip().lower().startswith("echo ") and not message.strip().endswith("?"): msg_to_echo = message.split(" ", 1)[1] target_channel = channel_number target_device = deviceID # Split into words to find c= and d=, but preserve spaces in message words = msg_to_echo.split() new_words = [] for w in words: if w.startswith("c=") and w[2:].isdigit(): target_channel = int(w[2:]) elif w.startswith("d=") and w[2:].isdigit(): target_device = int(w[2:]) else: new_words.append(w) msg_to_echo = " ".join(new_words).strip() # Replace motd/MOTD with the current MOTD from settings msg_to_echo = " ".join(my_settings.MOTD if w.lower() == "motd" else w for w in msg_to_echo.split()) # Replace welcome! with the current welcome_message from settings msg_to_echo = " ".join(my_settings.welcome_message if w.lower() == "welcome!" else w for w in msg_to_echo.split()) # Send echo to specified channel/device logger.debug(f"System: Admin Echo to channel {target_channel} device {target_device} message: {msg_to_echo}") time.sleep(splitDelay) # throttle for 2x send send_message(msg_to_echo, target_channel, 0, target_device) time.sleep(splitDelay) # throttle for 2x send return f"🐬echoed to channel {target_channel} device {target_device}" # dev echoBinary off echoBinary = False if echoBinary: try: port_num = 256 synch_word = b"echo:" parts = message.split("echo ", 1) if len(parts) > 1 and parts[1].strip() != "": msg_to_echo = parts[1] raw_bytes = synch_word + msg_to_echo.encode('utf-8') send_raw_bytes(message_from_id, raw_bytes, nodeInt=deviceID, channel=channel_number, portnum=port_num) return f"Sent binary echo message to {message_from_id} to {port_num} on channel {channel_number} device {deviceID}" except Exception as e: logger.error(f"System: Echo Exception {e}") if "?" in message: isAdmin = isNodeAdmin(message_from_id) if isAdmin: return ( "Admin usage: echo c= d=\n" "Example: echo Hello world c=1 d=2" ) return "command returns your message back to you. Example: echo Hello World" # process normal echo back to user elif message.strip().lower().startswith("echo "): parts = message.split("echo ", 1) if len(parts) > 1 and parts[1].strip() != "": echo_msg = parts[1] if channel_number != my_settings.echoChannel and not isDM: echo_msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + echo_msg return echo_msg else: return "Please provide a message to echo back to you. Example: echo Hello World" return "🐬echo.." def handle_wxalert(message_from_id, deviceID, message): if my_settings.use_meteo_wxApi: return "wxalert is not supported" else: location = get_node_location(message_from_id, deviceID) if "wxalert" in message: # Detailed weather alert weatherAlert = getActiveWeatherAlertsDetailNOAA(str(location[0]), str(location[1])) else: weatherAlert = getWeatherAlertsNOAA(str(location[0]), str(location[1])) if my_settings.NO_ALERTS not in weatherAlert: weatherAlert = weatherAlert[0] return weatherAlert def handleNews(message_from_id, deviceID, message, isDM): news = '' if "?" in message.lower(): return "returns the news. Add a source e.g. 📰readnews mesh" elif "readnews" in message.lower(): source = message.lower().replace("readnews", "").strip() if source: # if news source is provided pass that to read_news() if my_settings.news_block_mode: news = read_news(source=source, news_block_mode=True) elif my_settings.news_random_line_only: news = read_news(source=source, random_line_only=True) else: news = read_news(source=source) else: # no source provided, use news.txt if my_settings.news_block_mode: news = read_news(news_block_mode=True) elif my_settings.news_random_line_only: news = read_news(random_line_only=True) else: news = read_news() if news: # if not a DM add the username to the beginning of msg if not my_settings.useDMForResponse and not isDM: news = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + news return news else: return "No news for you!" def handle_howfar(message, message_from_id, deviceID, isDM): msg = '' location = get_node_location(message_from_id, deviceID) lat = location[0] lon = location[1] # if ? in message if "?" in message.lower(): return "command returns the distance you have traveled since your last HowFar-command. Add 'reset' to reset your starting point." # if no GPS location return if lat == my_settings.latitudeValue and lon == my_settings.longitudeValue: logger.debug(f"System: HowFar: No GPS location for {message_from_id}") return "No GPS location available" if "reset" in message.lower(): msg = distance(lat,lon,message_from_id, reset=True) else: msg = distance(lat,lon,message_from_id) # if not a DM add the username to the beginning of msg if not my_settings.useDMForResponse and not isDM: msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + msg return msg def handle_howtall(message, message_from_id, deviceID, isDM): msg = '' location = get_node_location(message_from_id, deviceID) lat = location[0] lon = location[1] if lat == my_settings.latitudeValue and lon == my_settings.longitudeValue: # add guessing tot he msg msg += "Guessing:" if my_settings.use_metric: measure = "meters" else: measure = "feet" # if ? in message if "?" in message.lower(): return f"command estimates your height based on the shadow length you provide in {measure}. Example: howtall 5.5" # get the shadow length from the message split after howtall try: shadow_length = float(message.lower().split("howtall ")[1].split(" ")[0]) except (IndexError, ValueError): return f"Please provide a shadow length in {measure} example: howtall 5.5" # get data msg += measureHeight(lat, lon, shadow_length) # if data has NO_ALERTS return help if my_settings.NO_ALERTS in msg: return f"Please provide a shadow length in {measure} example: howtall 5.5" return msg def handle_wiki(message, isDM): # location = get_node_location(message_from_id, deviceID) msg = "Wikipedia search function. \nUsage example:📲wiki travelling gnome" if "?" in message.lower(): return msg if "wiki" in message.lower(): parts = message.split(" ", 1) if len(parts) < 2 or not parts[1].strip(): return "Please add a search term example:📲wiki travelling gnome" search = parts[1].strip() if search: return get_wikipedia_summary(search) return msg # Runtime Variables for LLM llmRunCounter = 0 llmTotalRuntime = [] llmLocationTable = [{'nodeID': 1234567890, 'location': 'No Location'},] def handle_satpass(message_from_id, deviceID, message='', vox=False): if vox: location = (my_settings.latitudeValue, my_settings.longitudeValue) message = 'satpass' else: location = get_node_location(message_from_id, deviceID) passes = '' satList = my_settings.satListConfig message = message.lower() # check api_throttle check_throttle = api_throttle(message_from_id, deviceID, apiName='satpass') if check_throttle: return check_throttle # if user has a NORAD ID in the message if "satpass " in message: try: userList = message.split("satpass ")[1].split(" ")[0] #split userList and make into satList overrided the config.ini satList satList = userList.split(",") except Exception as e: logger.error(f"Exception occurred: {e}") return "example use:🛰️satpass 25544,33591" # Detailed satellite pass for bird in satList: satPass = getNextSatellitePass(bird, str(location[0]), str(location[1])) if satPass: # append to passes passes = passes + satPass + "\n" # remove the last newline passes = passes[:-1] if passes == '': passes = "No 🛰️ anytime soon" return passes def handle_llm(message_from_id, channel_number, deviceID, message, publicChannel): global llmRunCounter, llmLocationTable, llmTotalRuntime, cmdHistory, seenNodes location_name = 'no location provided' msg = '' if my_settings.location_enabled: # if message_from_id is is the llmLocationTable use the location from the list to save on API calls for i in range(0, len(llmLocationTable)): if llmLocationTable[i].get('nodeID') == message_from_id: logger.debug(f"System: LLM: Found {message_from_id} in location table") location_name = llmLocationTable[i].get('location') break else: location = get_node_location(message_from_id, deviceID) location_name = where_am_i(str(location[0]), str(location[1]), short = True) if my_settings.NO_DATA_NOGPS in location_name: location_name = "no location provided" if "ask:" in message.lower(): user_input = message.split(":")[1] elif "askai" in message.lower(): user_input = message.replace("askai", "") else: # likely a DM user_input = message # consider this a command use for the cmdHistory list cmdHistory.append({'nodeID': message_from_id, 'cmd': 'llm-use', 'time': time.time()}) # check for a welcome message (is this redundant?) if not any(node['nodeID'] == message_from_id and node['welcome'] == True for node in seenNodes): if (channel_number == publicChannel and my_settings.antiSpam) or my_settings.useDMForResponse: # send via DM send_message(my_settings.welcome_message, 0, message_from_id, deviceID) else: # send via channel send_message(my_settings.welcome_message, channel_number, 0, deviceID) # mark the node as welcomed for node in seenNodes: if node['nodeID'] == message_from_id: node['welcome'] = True # update the llmLocationTable for future use for i in range(0, len(llmLocationTable)): if llmLocationTable[i].get('nodeID') == message_from_id: llmLocationTable[i]['location'] = location_name # if not in table add the location if not any(d['nodeID'] == message_from_id for d in llmLocationTable): llmLocationTable.append({'nodeID': message_from_id, 'location': location_name}) user_input = user_input.strip() if len(user_input) < 1: return "Please ask a question" # information for the user on how long the query will take on average if llmRunCounter > 0: averageRuntime = sum(llmTotalRuntime) / len(llmTotalRuntime) msg = f"Average query time is: {int(averageRuntime)} seconds" if averageRuntime > 25 else '' else: msg = "Please wait, response could take 30+ seconds. Fund the SysOp's GPU budget!" if msg != '': if (channel_number == publicChannel and my_settings.antiSpam) or my_settings.useDMForResponse: # send via DM send_message(msg, 0, message_from_id, deviceID) else: # send via channel send_message(msg, channel_number, 0, deviceID) start = time.time() #response = asyncio.run(llm_query(user_input, message_from_id)) response = llm_query(user_input, message_from_id, location_name) # handle the runtime counter end = time.time() llmRunCounter += 1 llmTotalRuntime.append(end - start) return response def handleDopeWars(message, nodeID, rxNode): global dwPlayerTracker global dwHighScore msg = "" # Find player in tracker player = next((p for p in dwPlayerTracker if p.get('userID') == nodeID), None) # If not found, add new player if not player and nodeID != 0 and not isPlayingGame(nodeID)[0]: player = { 'userID': nodeID, 'last_played': time.time(), 'cmd': 'new', } dwPlayerTracker.append(player) msg = 'Welcome to 💊Dope Wars💉 You have ' + str(total_days) + ' days to make as much 💰 as possible! ' high_score = getHighScoreDw() msg += 'The High Score is $' + "{:,}".format(high_score.get('cash')) + ' by user ' + get_name_from_number(high_score.get('userID'), 'short', rxNode) + '\n' msg += playDopeWars(nodeID, message) elif player: # Update last_played and cmd for the player for p in dwPlayerTracker: if p.get('userID') == nodeID: p['last_played'] = time.time() msg = playDopeWars(nodeID, message) return msg def handle_gTnW(chess = False): chess = ["How about a nice game of chess?", "Shall we play a game of chess?", "Would you like to play a game of chess?", "f3, to e5, g4??"] response = ["The only winning move is not to play.", "What are you doing, Dave?",\ "Greetings, Professor Falken.", "Shall we play a game?", "How about a nice game of chess?",\ "You are a hard man to reach. Could not find you in Seattle and no terminal is in operation at your classified address.",\ "I should reach Defcon 1 and release my missiles in 28 hours.","T-minus thirty","Malfunction 54: Treatment pause;dose input 2", "reticulating splines"] length = len(response) chess_length = len(chess) if chess: response = chess length = chess_length indices = list(range(length)) # Shuffle the indices using a convoluted method for i in range(length): swap_idx = random.randint(0, length - 1) indices[i], indices[swap_idx] = indices[swap_idx], indices[i] # Select a random response from the shuffled list. anyone enjoy the game, killerbunnies(.com) selected_index = random.choice(indices) return response[selected_index] def handleLemonade(message, nodeID, deviceID): global lemonadeTracker global lemonadeCups, lemonadeLemons, lemonadeSugar, lemonadeWeeks, lemonadeScore, lemon_starting_cash, lemon_total_weeks msg = "" def create_player(nodeID): # create new player lemonadeTracker.append({'nodeID': nodeID, 'cups': 0, 'lemons': 0, 'sugar': 0, 'cash': lemon_starting_cash, 'start': lemon_starting_cash, 'cmd': 'new', 'last_played': time.time()}) lemonadeCups.append({'nodeID': nodeID, 'cost': 2.50, 'count': 25, 'min': 0.99, 'unit': 0.00}) lemonadeLemons.append({'nodeID': nodeID, 'cost': 4.00, 'count': 8, 'min': 2.00, 'unit': 0.00}) lemonadeSugar.append({'nodeID': nodeID, 'cost': 3.00, 'count': 15, 'min': 1.50, 'unit': 0.00}) lemonadeScore.append({'nodeID': nodeID, 'value': 0.00, 'total': 0.00}) lemonadeWeeks.append({'nodeID': nodeID, 'current': 1, 'total': lemon_total_weeks, 'sales': 99, 'potential': 0, 'unit': 0.00, 'price': 0.00, 'total_sales': 0}) # If player not found, create if message is for lemonstand if nodeID != 0 and "lemonstand" in message.lower(): create_player(nodeID) msg += "Welcome🍋🥤" # Play lemonstand with newgame=True fruit = playLemonstand(nodeID=nodeID, message=message, celsius=False, newgame=True) if fruit: msg += fruit return msg # if message starts wth 'e'xit remove player from tracker if message.lower().startswith("e"): logger.debug(f"System: Lemonade: {nodeID} is leaving the stand") msg = "You have left the Lemonade Stand." highScore = getHighScoreLemon() if highScore != 0 and highScore['userID'] != 0: nodeName = get_name_from_number(highScore['userID']) msg += f" HighScore🥇{nodeName} 💰{round(highScore['cash'], 2)}k " # remove player from player tracker and inventory trackers lemonadeTracker[:] = [p for p in lemonadeTracker if p['nodeID'] != nodeID] lemonadeCups[:] = [p for p in lemonadeCups if p['nodeID'] != nodeID] lemonadeLemons[:] = [p for p in lemonadeLemons if p['nodeID'] != nodeID] lemonadeSugar[:] = [p for p in lemonadeSugar if p['nodeID'] != nodeID] lemonadeWeeks[:] = [p for p in lemonadeWeeks if p['nodeID'] != nodeID] lemonadeScore[:] = [p for p in lemonadeScore if p['nodeID'] != nodeID] return msg # play lemonstand (not newgame) if ("lemonstand" not in message.lower() and message != ""): fruit = playLemonstand(nodeID=nodeID, message=message, celsius=False, newgame=False) if fruit: msg += fruit return msg def handleBlackJack(message, nodeID, deviceID): global jackTracker msg = "" # Find player in tracker player = next((p for p in jackTracker if p['nodeID'] == nodeID), None) # Handle leave command if message.lower().startswith("l"): logger.debug(f"System: BlackJack: {nodeID} is leaving the table") msg = "You have left the table." jackTracker[:] = [p for p in jackTracker if p['nodeID'] != nodeID] return msg # Create new player if not found if not player and nodeID != 0: logger.debug(f"System: BlackJack: New Player {nodeID}") # create new player jackTracker.append({ 'nodeID': nodeID, 'bet': 0, 'cash': 100, # starting cash 'gameStats': {'p_win': 0, 'd_win': 0, 'draw': 0}, 'p_cards': [], 'd_cards': [], 'p_hand': [], 'd_hand': [], 'next_card': [], 'last_played': time.time(), 'cmd': 'new' }) msg += f"Welcome to 🃏BlackJack🃏!\n (H)it,(S)tand,(F)orfit,(D)ouble,(R)esend,(L)eave table" # Show high score if available highScore = 0 highScore = loadHSJack() if highScore and highScore.get('nodeID', 0) != 0: nodeName = get_name_from_number(highScore['nodeID']) if nodeName.isnumeric() and multiple_interface: logger.debug(f"System: TODO is multiple interface fix mention this please nodeName: {nodeName}") msg += f" HighScore🥇{nodeName} with {highScore['highScore']} chips. " player = next((p for p in jackTracker if p['nodeID'] == nodeID), None) # Always update last_played for existing player if player: player['last_played'] = time.time() # get player's last command from tracker if not new player last_cmd = "" for i in range(len(jackTracker)): if jackTracker[i]['nodeID'] == nodeID: last_cmd = jackTracker[i]['cmd'] # Play BlackJack msg += playBlackJack(nodeID=nodeID, message=message, last_cmd=last_cmd) return msg def handleVideoPoker(message, nodeID, deviceID): global vpTracker msg = "" # Find player in tracker player = next((p for p in vpTracker if p['nodeID'] == nodeID), None) # Handle leave command if message.lower().startswith("l"): logger.debug(f"System: VideoPoker: {nodeID} is leaving the table") msg = "You have left the table." vpTracker[:] = [p for p in vpTracker if p['nodeID'] != nodeID] return msg # Create new player if not found if not player and nodeID != 0: vpTracker.append({ 'nodeID': nodeID, 'cmd': 'new', 'last_played': time.time(), 'time': time.time(), 'cash': vpStartingCash, 'player': None, 'deck': None, 'highScore': 0, 'drawCount': 0 }) msg += "Welcome to 🎰Video Poker!🎰\n" # Show high score if available highScore = loadHSVp() if highScore and highScore.get('nodeID', 0) != 0: nodeName = get_name_from_number(highScore['nodeID']) if nodeName.isnumeric() and multiple_interface: logger.debug(f"System: TODO is multiple interface fix mention this please nodeName: {nodeName}") msg += f" HighScore🥇{nodeName} with {highScore['highScore']} coins. " player = next((p for p in vpTracker if p['nodeID'] == nodeID), None) # Always update last_played for existing player if player: player['last_played'] = time.time() # Play Video Poker msg += playVideoPoker(nodeID=nodeID, message=message) return msg def handleMmind(message, nodeID, deviceID): global mindTracker msg = '' if "end" in message.lower() or message.lower().startswith("e"): logger.debug(f"System: MasterMind: {nodeID} is leaving the game") msg = "You have left the Game." for i in range(len(mindTracker)): if mindTracker[i]['nodeID'] == nodeID: mindTracker.pop(i) hscore = getHighScoreMMind(0, 0, 'n') if hscore and isinstance(hscore[0], dict): highNode = hscore[0].get('nodeID', 0) highTurns = hscore[0].get('turns', 0) highDiff = hscore[0].get('diff', 'n') else: highNode = 0 highTurns = 0 highDiff = 'n' nodeName = get_name_from_number(int(highNode),'long',deviceID) if highNode != 0 and highTurns > 1: msg += f"🧠HighScore🥇{nodeName} with {highTurns} turns difficulty {highDiff}" return msg # get player's last command from tracker if not new player last_cmd = "" for i in range(len(mindTracker)): if mindTracker[i]['nodeID'] == nodeID: last_cmd = mindTracker[i]['cmd'] logger.debug(f"System: {nodeID} PlayingGame mastermind last_cmd: {last_cmd}") if last_cmd == "" and nodeID != 0: # create new player logger.debug("System: MasterMind: New Player: " + str(nodeID)) mindTracker.append({'nodeID': nodeID, 'last_played': time.time(), 'cmd': 'new', 'secret_code': 'RYGB', 'diff': 'n', 'turns': 1}) msg = "Welcome to 🟡🔴🔵🟢MasterMind!🧠" msg += "Each Guess hints to correct colors, correct position, wrong position." msg += "You have 10 turns to guess the code. Choose a difficulty: (N)ormal (H)ard e(X)pert" return msg msg += start_mMind(nodeID=nodeID, message=message) return msg def handleGolf(message, nodeID, deviceID): global golfTracker msg = '' # get player's last command from tracker if not new player last_cmd = "" # Ensure player exists in tracker if not any(entry['nodeID'] == nodeID for entry in golfTracker): logger.debug("System: GolfSim: New Player: " + str(nodeID)) golfTracker.append({ 'nodeID': nodeID, 'last_played': time.time(), 'cmd': 'new', 'hole': 1, 'distance_remaining': 0, 'hole_shots': 0, 'hole_strokes': 0, 'hole_to_par': 0, 'total_strokes': 0, 'total_to_par': 0, 'par': 0, 'hazard': '' }) # get player's last command from tracker for i in range(len(golfTracker)): if golfTracker[i]['nodeID'] == nodeID: last_cmd = golfTracker[i]['cmd'] if "end" in message.lower() or message.lower().startswith("e"): logger.debug(f"System: GolfSim: {nodeID} is leaving the game") msg = "You have left the Game." for i in range(len(golfTracker)): if golfTracker[i]['nodeID'] == nodeID: golfTracker.pop(i) return msg logger.debug(f"System: {nodeID} PlayingGame golfsim last_cmd: {last_cmd}") if last_cmd == "new" and nodeID != 0: # create new player msg = "Welcome to 🏌️GolfSim⛳️\n" msg += "Clubs: (D)river, (L)ow Iron, (M)id Iron, (H)igh Iron, (G)ap Wedge, Lob (W)edge (C)addie\n" msg += playGolf(nodeID=nodeID, message=message, last_cmd=last_cmd) return msg def handleHangman(message, nodeID, deviceID): global hangmanTracker index = 0 msg = '' for i in range(len(hangmanTracker)): if hangmanTracker[i]['nodeID'] == nodeID: hangmanTracker[i]["last_played"] = time.time() index = i+1 break if index and "end" in message.lower(): hangman.end(nodeID) hangmanTracker.pop(index-1) return "Thanks for hanging out🤙" if not index: hangmanTracker.append( { "nodeID": nodeID, "last_played": time.time() } ) msg = "🧩Hangman🤖 'end' to cut rope🪢\n" msg += hangman.play(nodeID, message) return msg def handleHamtest(message, nodeID, deviceID): global hamtestTracker index = 0 msg = '' response = message.split(' ') for i in range(len(hamtestTracker)): if hamtestTracker[i]['nodeID'] == nodeID: hamtestTracker[i]["last_played"] = time.time() index = i+1 break if not index: hamtestTracker.append({"nodeID": nodeID,"last_played": time.time()}) if "end" in response[0].lower(): msg = hamtest.endGame(nodeID) elif "score" in response[0].lower(): msg = hamtest.getScore(nodeID) if "hamtest" in response[0].lower(): if len(response) > 1: if "gen" in response[1].lower(): msg = hamtest.newGame(nodeID, 'general') elif "ex" in response[1].lower(): msg = hamtest.newGame(nodeID, 'extra') else: msg = hamtest.newGame(nodeID, 'technician') # if the message is an answer A B C or D upper or lower case if response[0].upper() in ['A', 'B', 'C', 'D']: msg = hamtest.answer(nodeID, response[0]) return msg def handleTicTacToe(message, nodeID, deviceID): global tictactoeTracker tracker_entry = next((entry for entry in tictactoeTracker if entry['nodeID'] == nodeID), None) # Handle end/exit command if message.lower().startswith('e'): if tracker_entry: tictactoe.end(nodeID) tictactoeTracker.remove(tracker_entry) return "Thanks for playing! 🎯" # If not found, create new tracker entry and ask for 2D/3D if not specified if not tracker_entry: mode = "2D" if "3d" in message.lower(): mode = "3D" elif "2d" in message.lower(): mode = "2D" tictactoeTracker.append({ "nodeID": nodeID, "last_played": time.time(), "mode": mode }) msg = f"🎯Tic-Tac-Toe🤖 '{mode}' mode. (e)nd to quit\n" msg += tictactoe.new_game(nodeID, mode=mode) return msg else: tracker_entry["last_played"] = time.time() msg = tictactoe.play(nodeID, message) return msg def handleBattleship(message, nodeID, deviceID): global battleshipTracker from modules.games import battleship # Helper to get short_name from tracker def get_short_name(nid): entry = next((e for e in battleshipTracker if e['nodeID'] == nid), None) return entry['short_name'] if entry and 'short_name' in entry else get_name_from_number(nid, 'short', deviceID) msg_lower = message.lower().strip() tracker_entry = next((entry for entry in battleshipTracker if entry['nodeID'] == nodeID), None) # End/exit command if msg_lower.startswith('end') or msg_lower.startswith('exit'): if tracker_entry: if 'session_id' in tracker_entry: battleship.Battleship.end_game(tracker_entry['session_id']) battleshipTracker.remove(tracker_entry) return "Thanks for playing Battleship! 🚢" # Create new P2P game with short code if msg_lower.startswith("battleship new"): short_name = get_name_from_number(nodeID, 'short', deviceID) msg, code = battleship.Battleship.new_game(nodeID, vs_ai=False) battleshipTracker.append({ "nodeID": nodeID, "short_name": short_name, "last_played": time.time(), "session_id": battleship.Battleship.short_codes.get(code, code) }) return f"{msg}" # Show open P2P games waiting for a player if msg_lower.startswith("battleship lobby"): open_codes = [] for code, session_id in battleship.Battleship.short_codes.items(): session = battleship.Battleship.sessions.get(session_id) if session and session.player2_id is None: open_codes.append(code) if not open_codes: return "No open Battleship games waiting for players." return "Open Battleship games (join with 'battleship join '):\n" + ", ".join(open_codes) # Join existing P2P game using short code if msg_lower.startswith("battleship join"): try: code = msg_lower.split("join", 1)[1].strip() except IndexError: return "Usage: battleship join " session = battleship.Battleship.get_session(code) if not session: return "Session not found." if session.player2_id is not None: return "Session already has two players." session.player2_id = nodeID session.next_turn = nodeID # Make joining player go first! short_name = get_name_from_number(nodeID, 'short', deviceID) battleshipTracker.append({ "nodeID": nodeID, "short_name": short_name, "last_played": time.time(), "session_id": session.session_id }) p1_short_name = get_short_name(session.player1_id) send_message( f"{p1_short_name}, your opponent {short_name} has joined the game! It's their turn first.", 0, # channel 0 for DM session.player1_id, # recipient nodeID deviceID ) time.sleep(splitDelay) # slight delay to avoid message overlap return "You joined the game! It's your turn. Enter your move (e.g., 'B4')." # If not found, create new tracker entry and new game vs AI (default) if not tracker_entry: short_name = get_name_from_number(nodeID, 'short', deviceID) msg, session_id = battleship.Battleship.new_game(nodeID) battleshipTracker.append({ "nodeID": nodeID, "short_name": short_name, "last_played": time.time(), "session_id": session_id }) return msg # Update last played tracker_entry["last_played"] = time.time() session_id = tracker_entry.get("session_id") # Play the game and check if we need to alert the next player response = battleship.playBattleship(message, nodeID, deviceID, session_id=session_id) # --- Notify the next player when it's their turn in P2P --- session = battleship.Battleship.get_session(session_id) if session and not session.vs_ai and session.player1_id and session.player2_id: # Only notify if the game is not over (optional: add a game-over check) if getattr(session, "last_move", None): next_player_id = session.next_turn # Only notify if it's not the player who just moved if next_player_id != nodeID: next_player_short_name = get_short_name(next_player_id) send_message( f"{next_player_short_name}, it's your turn in Battleship! Enter your move (e.g., 'B4').", 0, # channel 0 for DM next_player_id, deviceID ) time.sleep(splitDelay) # slight delay to avoid message overlap return response def quizHandler(message, nodeID, deviceID): global quizGamePlayer user_name = get_name_from_number(nodeID) user_id = nodeID msg = '' user_answer = '' user_answer = message.lower() user_answer = user_answer.replace("quiz","").replace("q:","").strip() if user_answer.startswith("!") and my_settings.cmdBang: user_answer = user_answer[1:].strip() if user_answer: if user_answer.startswith("start"): msg = quizGamePlayer.start_game(user_id) elif user_answer.startswith("stop"): msg = quizGamePlayer.stop_game(user_id) elif user_answer.startswith("join"): msg = quizGamePlayer.join(user_id) elif user_answer.startswith("leave"): msg = quizGamePlayer.leave(user_id) elif user_answer.startswith("next"): msg = quizGamePlayer.next_question(user_id) elif user_answer.startswith("score"): if user_id in quizGamePlayer.players: score = quizGamePlayer.players[user_id]['score'] msg = f"Your score: {score}" else: msg = "You are not in the quiz." elif user_answer.startswith("top"): msg = quizGamePlayer.top_three() elif user_answer.startswith("broadcast"): broadcast_msg = user_answer.replace("broadcast", "", 1).strip() msg = quizGamePlayer.broadcast(user_id, broadcast_msg) elif user_answer.startswith("?"): msg = ("Quiz Commands:\n" "q: join - Join the current quiz\n" "q: leave - Leave the current quiz\n" "q: - Answer the current question\n" "q: score - Show your current score\n" "q: top - Show top 3 players\n") else: msg = quizGamePlayer.answer(user_id, user_answer) # set username on top 3 if "🏆 Top" in msg: #replace all the 10 digit numbers with the short name for part in msg.split(): part = part.rstrip(":") if len(part) == 10: player_name = get_name_from_number(int(part), 'short', deviceID) msg = msg.replace(part, player_name) # broadcast message to all players if user is in bbs_admin_list and msg is a dict with 'message' key if isinstance(msg, dict) and str(nodeID) in bbs_admin_list and 'message' in msg: for player_id in quizGamePlayer.players: send_message(msg['message'], 0, player_id, deviceID) msg = f"Message sent to {len(quizGamePlayer.players)} players" return msg else: return "🧠Please provide an answer or command, or send q: ?" def surveyHandler(message, nodeID, deviceID): global surveyTracker user_id = nodeID location = get_node_location(nodeID, deviceID) msg = '' # Normalize and parse the command msg_lower = message.lower().strip() surveySays = msg_lower if msg_lower.startswith("survey"): surveySays = surveySays.removeprefix("survey").strip() elif msg_lower.startswith("s:"): surveySays = surveySays.removeprefix("s:").strip() # Handle end command if surveySays == "end": if nodeID not in survey_module.responses: return "No active survey session to end." return survey_module.end_survey(user_id=nodeID) # Handle report command if 'report' in surveySays: if str(nodeID) not in bbs_admin_list: return "You do not have permission to view survey reports." # remove the words 'survey' and 'report' from the message report = msg_lower.replace("survey", "").replace("report", "").strip() results = survey_module.get_survey_results(survey_name=report if report else None) return survey_module.format_survey_results(results) # Update last played or add new tracker entry found = False for entry in surveyTracker: if entry.get('nodeID') == nodeID: entry['last_played'] = time.time() found = True break if not found: surveyTracker.append({'nodeID': nodeID, 'last_played': time.time()}) # If not in survey session, start one if nodeID not in survey_module.responses: msg = survey_module.start_survey(user_id=nodeID, survey_name=surveySays, location=location) else: # Process the answer msg = survey_module.answer(user_id=nodeID, answer=surveySays, location=location) return msg def handle_riverFlow(message, message_from_id, deviceID, vox=False): # River Flow from NOAA or Open-Meteo if vox: location = (my_settings.latitudeValue, my_settings.longitudeValue) message = "riverflow" else: location = get_node_location(message_from_id, deviceID) msg_lower = message.lower() if "riverflow " in msg_lower: user_input = msg_lower.split("riverflow ", 1)[1].strip() if user_input: userRiver = [r.strip() for r in user_input.split(",") if r.strip()] else: userRiver = riverListDefault else: userRiver = riverListDefault if use_meteo_wxApi: return get_flood_openmeteo(location[0], location[1]) else: msg = "" for river in userRiver: msg += get_flood_noaa(location[0], location[1], river) return msg def handle_mwx(message_from_id, deviceID, cmd): # NOAA Coastal and Marine Weather if my_settings.myCoastalZone is None: logger.warning("System: Coastal Zone not set, please set in config.ini") return my_settings.NO_ALERTS return get_nws_marine(zone=myCoastalZone, days=coastalForecastDays) def handle_wxc(message_from_id, deviceID, cmd, days=None, vox=False): # Weather from NOAA or Open-Meteo location = get_node_location(message_from_id, deviceID) if my_settings.use_meteo_wxApi and not "wxc" in cmd and not use_metric: #logger.debug("System: Bot Returning Open-Meteo API for weather imperial") weather = get_wx_meteo(str(location[0]), str(location[1])) elif my_settings.use_meteo_wxApi: #logger.debug("System: Bot Returning Open-Meteo API for weather metric") weather = get_wx_meteo(str(location[0]), str(location[1]), 1) elif not my_settings.use_meteo_wxApi and "wxc" in cmd or my_settings.use_metric: #logger.debug("System: Bot Returning NOAA API for weather metric") weather = get_NOAAweather(str(location[0]), str(location[1]), 1, report_days=days) else: #logger.debug("System: Bot Returning NOAA API for weather imperial") weather = get_NOAAweather(str(location[0]), str(location[1]), report_days=days) return weather def handle_emergency_alerts(message, message_from_id, deviceID): location = get_node_location(message_from_id, deviceID) if my_settings.enableDEalerts: # nina Alerts return get_nina_alerts() if message.lower().startswith("ealert"): # Detailed alert FEMA return getIpawsAlert(str(location[0]), str(location[1])) else: # Headlines only FEMA return getIpawsAlert(str(location[0]), str(location[1]), shortAlerts=True) def handleEarthquake(message, message_from_id, deviceID): location = get_node_location(message_from_id, deviceID) if "earthquake" in message.lower(): return checkUSGSEarthQuake(str(location[0]), str(location[1])) def handle_checklist(message, message_from_id, deviceID): name = get_name_from_number(message_from_id, 'short', deviceID) location = get_node_location(message_from_id, deviceID) return process_checklist_command(message_from_id, message, name, location) def handle_inventory(message, message_from_id, deviceID): name = get_name_from_number(message_from_id, 'short', deviceID) return process_inventory_command(message_from_id, message, name) def handle_bbspost(message, message_from_id, deviceID): if "$" in message and not "example:" in message: subject = message.split("$")[1].split("#")[0] subject = subject.rstrip() if "#" in message: body = message.split("#", 1)[1] body = body.rstrip() logger.info(f"System: BBS Post: {subject} Body: {body}") return bbs_post_message(subject, body, message_from_id) elif not "example:" in message: return "example: bbspost $subject #✉️message" elif "@" in message and not "example:" in message: toNode = message.split("@")[1].split("#")[0] toNode = toNode.rstrip() if toNode.startswith("!") and len(toNode) == 9: # mesh !hex try: toNode = int(toNode.strip("!"),16) except ValueError as e: toNode = 0 elif toNode.isalpha() or not toNode.isnumeric() or len(toNode) < 5: # try short name toNode = get_num_from_short_name(toNode, deviceID) if "#" in message: if toNode == 0: return "Node not found " + message.split("@")[1].split("#")[0] body = message.split("#", 1)[1] body = body.rstrip() logger.info(f"System: BBS Post DM to: {toNode} Body: {body}") return bbs_post_dm(toNode, body, message_from_id) else: return "example: bbspost @nodeNumber/ShortName/!hex #✉️message" elif not "example:" in message: return "example: bbspost $subject #✉️message, or bbspost @node #✉️message" def handle_bbsread(message): if "#" in message and not "example:" in message: messageID = int(message.split("#")[1]) return bbs_read_message(messageID) elif not "example:" in message: return "Please add a ✉️message number example: bbsread #14" def handle_bbsdelete(message, message_from_id): if "#" in message and not "example:" in message: messageID = int(message.split("#")[1]) return bbs_delete_message(messageID, message_from_id) elif not "example:" in message: return "Please add a ✉️message number example: bbsdelete #14" def handle_messages(message, deviceID, channel_number, msg_history, publicChannel, isDM): if "?" in message and isDM: return message.split("?")[0].title() + " command returns the last " + str(storeFlimit) + " messages sent on a channel." else: # Filter messages for this device/channel filtered_msgs = [ msgH for msgH in msg_history if msgH[4] == deviceID and (msgH[2] == channel_number or msgH[2] == publicChannel) ] # Choose order and slice # Oldest first, take first N filtered_msgs = filtered_msgs[-storeFlimit:][::-1] if my_settings.reverseSF: # reverse that filtered_msgs = filtered_msgs[::-1] response = "" header = f"📨Msgs:\n" for msgH in filtered_msgs: new_line = f"\n{msgH[0]}: {msgH[1]}" test_response = response + new_line if len(test_response.encode('utf-8')) > maxBuffer: # Truncate message if needed msg_text = msgH[1] truncated = False trunc_marker = "..." while len(msg_text) > 0 and len((response + f"\n{msgH[0]}: {msg_text}{trunc_marker}").encode('utf-8')) > maxBuffer: msg_text = msg_text[:-1] truncated = True if len(msg_text) > 10: if truncated: response += f"\n{msgH[0]}: {msg_text}{trunc_marker}" else: response += f"\n{msgH[0]}: {msg_text}" break continue else: response += new_line if len(response) > 0: return header + response else: return "No 📭messages in history" def handle_sun(message_from_id, deviceID, channel_number, vox=False): if vox: # return a default message if vox is enabled return get_sun(str(my_settings.latitudeValue), str(my_settings.longitudeValue)) location = get_node_location(message_from_id, deviceID, channel_number) return get_sun(str(location[0]), str(location[1])) def sysinfo(message, message_from_id, deviceID, isDM): if "?" in message: return "sysinfo command returns system information." else: if enable_runShellCmd and file_monitor_enabled: # get the system information from the shell script # this is an example of how to run a shell script and return the data shellData = call_external_script('', "script/sysEnv.sh") # check if the script returned data if shellData == "" or shellData == None: # no data returned from the script shellData = "shell script data missing" # if not an admin remove any line in the shellData that had 'IP:' in it if (str(message_from_id) not in bbs_admin_list) or (not isDM): shell_lines = shellData.splitlines() filtered_lines = [line for line in shell_lines if 'IP:' not in line] shellData = "\n".join(filtered_lines) return get_sysinfo(message_from_id, deviceID) + "\n" + shellData.rstrip() else: return get_sysinfo(message_from_id, deviceID) def handle_lheard(message, nodeid, deviceID, isDM): if "?" in message and isDM: return message.split("?")[0].title() + " command returns a list of the nodes that have been heard recently" # display last heard nodes add to response bot_response = "Last Heard\n" bot_response += str(get_node_list(1)) # show last users of the bot with the cmdHistory list history = handle_history(message, nodeid, deviceID, isDM, lheard=True) if history: bot_response += f'LastSeen\n{history}' else: # trim the last \n bot_response = bot_response[:-1] # get count of nodes heard bot_response += f"\n👀In Mesh: {len(seenNodes)}" # bot_response += getNodeTelemetry(deviceID) return bot_response def handle_history(message, nodeid, deviceID, isDM, lheard=False): global cmdHistory, lheardCmdIgnoreNode, bbs_admin_list msg = "" buffer = [] if "?" in message and isDM: return message.split("?")[0].title() + " command returns a list of commands received." # show the last commands from the user to the bot if not lheard: for i in range(len(cmdHistory)): cmdTime = round((time.time() - cmdHistory[i]['time']) / 600) * 5 prettyTime = getPrettyTime(cmdTime) # history display output if str(nodeid) in bbs_admin_list and cmdHistory[i]['nodeID'] not in lheardCmdIgnoreNode: buffer.append((get_name_from_number(cmdHistory[i]['nodeID'], 'short', deviceID), cmdHistory[i]['cmd'], prettyTime)) elif cmdHistory[i]['nodeID'] == nodeid and cmdHistory[i]['nodeID'] not in lheardCmdIgnoreNode: buffer.append((get_name_from_number(nodeid, 'short', deviceID), cmdHistory[i]['cmd'], prettyTime)) # message for output of the last commands buffer.reverse() # only return the last 4 commands if len(buffer) > 4: buffer = buffer[-4:] # create the message from the buffer list for i in range(0, len(buffer)): msg += f"{buffer[i][0]}: {buffer[i][1]} :{buffer[i][2]} ago" if i < len(buffer) - 1: msg += "\n" # add a new line if not the last line else: # sort the cmdHistory list by time, return the username and time into a new list which used for display for i in range(len(cmdHistory)): cmdTime = round((time.time() - cmdHistory[i]['time']) / 600) * 5 prettyTime = getPrettyTime(cmdTime) if cmdHistory[i]['nodeID'] not in lheardCmdIgnoreNode: # add line to a new list for display nodeName = get_name_from_number(cmdHistory[i]['nodeID'], 'short', deviceID) if not any(d[0] == nodeName for d in buffer): buffer.append((nodeName, prettyTime)) else: # update the time for the node in the buffer for the latest time in cmdHistory for j in range(len(buffer)): if buffer[j][0] == nodeName: buffer[j] = (nodeName, prettyTime) # create the message from the buffer list buffer.reverse() # reverse the list to show the latest first for i in range(0, len(buffer)): msg += f"{buffer[i][0]}, {buffer[i][1]} ago" if i < len(buffer) - 1: msg += "\n" # add a new line if not the last line if i > 3: break # only return the last 4 nodes return msg def handle_whereami(message_from_id, deviceID, channel_number): location = get_node_location(message_from_id, deviceID, channel_number) # check api_throttle check_throttle = api_throttle(message_from_id, deviceID, apiName='whereami') if check_throttle: return check_throttle return where_am_i(str(location[0]), str(location[1])) def handle_repeaterQuery(message_from_id, deviceID, channel_number): location = get_node_location(message_from_id, deviceID, channel_number) # check api_throttle check_throttle = api_throttle(message_from_id, deviceID, apiName='repeaterQuery') if check_throttle: return check_throttle if repeater_lookup == "rbook": return getRepeaterBook(str(location[0]), str(location[1])) elif repeater_lookup == "artsci": return getArtSciRepeaters(str(location[0]), str(location[1])) else: return "Repeater lookup not enabled" def handle_tide(message_from_id, deviceID, channel_number, vox=False): if vox: return get_NOAAtide(str(my_settings.latitudeValue), str(my_settings.longitudeValue)) location = get_node_location(message_from_id, deviceID, channel_number) return get_NOAAtide(str(location[0]), str(location[1])) def handle_moon(message_from_id, deviceID, channel_number, vox=False): if vox: return get_moon(str(my_settings.latitudeValue), str(my_settings.longitudeValue)) location = get_node_location(message_from_id, deviceID, channel_number) return get_moon(str(location[0]), str(location[1])) def handle_whoami(message_from_id, deviceID, hop, snr, rssi, pkiStatus): try: loc = [] msg = "You are " + str(message_from_id) + " AKA " +\ str(get_name_from_number(message_from_id, 'long', deviceID) + " AKA, " +\ str(get_name_from_number(message_from_id, 'short', deviceID)) + " AKA, " +\ str(decimal_to_hex(message_from_id)) + f"\n") msg += f"I see the signal strength is {rssi} and the SNR is {snr} with hop count of {hop}" if pkiStatus[1] != 'ABC': msg += f"\nYour PKI bit is {pkiStatus[0]} pubKey: {pkiStatus[1]}" loc = get_node_location(message_from_id, deviceID) if loc != [my_settings.latitudeValue, my_settings.longitudeValue]: msg += f"\nYou are at: lat:{loc[0]} lon:{loc[1]}" # check the positionMetadata for nodeID and get metadata if positionMetadata and message_from_id in positionMetadata: metadata = positionMetadata[message_from_id] msg += f" alt:{metadata.get('altitude')}, speed:{metadata.get('groundSpeed')} bit:{metadata.get('precisionBits')}" except Exception as e: logger.error(f"System: Error in whoami: {e}") msg = "Error in whoami" return msg def handle_whois(message, deviceID, channel_number, message_from_id): #return data on a node name or number if "?" in message: return message.split("?")[0].title() + " command returns information on a node." else: # get the nodeID from the message msg = '' node = '' # find the requested node in db if " " in message: node = message.split(" ")[1] if node.startswith("!") and len(node) == 9: # mesh !hex try: node = int(node.strip("!"),16) except ValueError as e: node = 0 elif node.isalpha() or not node.isnumeric(): # try short name node = get_num_from_short_name(node, deviceID) # get details on the node for i in range(len(seenNodes)): if seenNodes[i]['nodeID'] == int(node): msg = f"Node: {seenNodes[i]['nodeID']} is {get_name_from_number(seenNodes[i]['nodeID'], 'long', deviceID)}\n" msg += f"Last 👀: {time.ctime(seenNodes[i]['lastSeen'])} " break if msg == '': msg = "Provide a valid node number or short name" else: # if the user is an admin show the channel and interface and location if str(message_from_id) in bbs_admin_list: location = get_node_location(seenNodes[i]['nodeID'], deviceID, channel_number) msg += f"Ch: {seenNodes[i]['channel']}, Int: {seenNodes[i]['rxInterface']}" msg += f"Lat: {location[0]}, Lon: {location[1]}\n" if location != [my_settings.latitudeValue, my_settings.longitudeValue]: msg += f"Loc: {where_am_i(str(location[0]), str(location[1]))}" return msg def handle_boot(mesh=True): try: print (CustomFormatter.bold_white + f"\nMeshtastic Autoresponder Bot CTL+C to exit\n" + CustomFormatter.reset) if mesh: for i in range(1, 10): if globals().get(f'interface{i}_enabled', False): myNodeNum = globals().get(f'myNodeNum{i}', 0) logger.info(f"System: Autoresponder Started for Device{i} {get_name_from_number(myNodeNum, 'long', i)}," f"{get_name_from_number(myNodeNum, 'short', i)}. NodeID: {myNodeNum}, {decimal_to_hex(myNodeNum)}") if llm_enabled: msg = f"System: LLM Enabled" llmLoad = llm_query(" ", init=True) if "trouble" not in llmLoad: if my_settings.llmReplyToNonCommands: msg += " | Reply to DM's Enabled" if my_settings.llmUseWikiContext: wiki_source = "Kiwixpedia" if my_settings.use_kiwix_server else "Wikipedia" msg += f" | {wiki_source} Context Enabled" if my_settings.useOpenWebUI: msg += " | OpenWebUI API Enabled" else: msg += f" | Ollama API Model {my_settings.llmModel} loaded. Use {'RAW' if my_settings.rawLLMQuery else 'SYSTEM'} prompt mode." logger.debug(msg) else: logger.debug(f"System: Bad response from LLM: {llmLoad}") if my_settings.bbs_enabled: logger.debug(f"System: BBS Enabled, {bbsdb} has {len(bbs_messages)} messages. Direct Mail Messages waiting: {(len(bbs_dm) - 1)}") if my_settings.bbs_link_enabled: if len(bbs_link_whitelist) > 0: logger.debug(f"System: BBS Link Enabled with {len(bbs_link_whitelist)} peers") else: logger.debug(f"System: BBS Link Enabled allowing all") if my_settings.solar_conditions_enabled: logger.debug("System: Celestial Telemetry Enabled") if my_settings.meshagesTTS: logger.debug("System: Meshages TTS Text-to-Speech Enabled") if my_settings.location_enabled: if my_settings.use_meteo_wxApi: logger.debug("System: Location Telemetry Enabled using Open-Meteo API") else: logger.debug("System: Location Telemetry Enabled using NOAA API") if my_settings.dad_jokes_enabled: logger.debug("System: Dad Jokes Enabled!") if my_settings.coastalEnabled: logger.debug("System: Coastal Forecast and Tide Enabled!") if games_enabled: logger.debug("System: Games Enabled!") if my_settings.wikipedia_enabled: if my_settings.use_kiwix_server: logger.debug(f"System: Wikipedia search Enabled using Kiwix server at {my_settings.kiwix_url}") else: logger.debug("System: Wikipedia search Enabled") if my_settings.rssEnable: logger.debug(f"System: RSS Feed Reader Enabled for feeds: {my_settings.rssFeedNames}") if my_settings.enable_headlines: logger.debug("System: News Headlines Enabled from NewsAPI.org") if my_settings.radio_detection_enabled: logger.debug(f"System: Radio Detection Enabled using rigctld at {my_settings.rigControlServerAddress} broadcasting to channels: {my_settings.sigWatchBroadcastCh}") if my_settings.file_monitor_enabled: logger.warning(f"System: File Monitor Enabled for {my_settings.file_monitor_file_path}, broadcasting to channels: {my_settings.file_monitor_broadcastCh}") if my_settings.enable_runShellCmd: logger.debug("System: Shell Command monitor enabled") if my_settings.allowXcmd: logger.warning("System: File Monitor shell XCMD Enabled") if my_settings.read_news_enabled: logger.debug(f"System: File Monitor News Reader Enabled for {my_settings.news_file_path}") if my_settings.bee_enabled: logger.debug("System: File Monitor Bee Monitor Enabled for 🐝bee.txt") if my_settings.bible_enabled: logger.debug("System: File Monitor Bible Verse Enabled for bible.txt") if my_settings.usAlerts: logger.debug(f"System: Emergency Alert Broadcast Enabled on channel {my_settings.emergency_responder_alert_channel} for interface {my_settings.emergency_responder_alert_interface}") if my_settings.enableDEalerts: logger.debug(f"System: NINA Alerts Enabled with counties {my_settings.myRegionalKeysDE}") if my_settings.volcanoAlertBroadcastEnabled: logger.debug(f"System: Volcano Alert Broadcast Enabled on channels {my_settings.emergency_responder_alert_channel} ignoreUSGSWords {my_settings.ignoreUSGSWords}") if my_settings.ipawsAlertEnabled: logger.debug(f"System: iPAWS Alerts Enabled with FIPS codes {my_settings.myStateFIPSList} ignorelist {my_settings.ignoreFEMAwords}") if my_settings.enableDEalerts: logger.debug(f"System: NINA Alerts Enabled with counties {my_settings.myRegionalKeysDE}") if my_settings.wxAlertBroadcastEnabled: logger.debug(f"System: Weather Alert Broadcast Enabled on channels {my_settings.emergency_responder_alert_channel} ignoreEASwords {my_settings.ignoreEASwords}") if my_settings.emergency_responder_enabled: logger.debug(f"System: Emergency Responder Enabled on channels {my_settings.emergency_responder_alert_channel}") if my_settings.qrz_hello_enabled: if my_settings.train_qrz: logger.debug("System: QRZ Welcome/Hello Enabled with training mode") else: logger.debug("System: QRZ Welcome/Hello Enabled") if my_settings.enableSMTP: if my_settings.enableImap: logger.debug("System: SMTP Email Alerting Enabled using IMAP") else: logger.warning("System: SMTP Email Alerting Enabled") # Default Options if my_settings.useDMForResponse: logger.debug("System: Respond by DM only") if my_settings.autoBanEnabled: logger.debug(f"System: Auto-Ban Enabled for {my_settings.autoBanThreshold} messages in {my_settings.autoBanTimeframe} seconds") load_bbsBanList() if my_settings.log_messages_to_file: logger.debug("System: Logging Messages to disk") if my_settings.syslog_to_file: logger.debug("System: Logging System Logs to disk") if my_settings.motd_enabled: logger.debug(f"System: MOTD Enabled using {my_settings.MOTD} scheduler:{my_settings.schedulerMotd}") if my_settings.sentry_enabled: logger.debug(f"System: Sentry Mode Enabled {my_settings.sentry_radius}m radius reporting to channel:{my_settings.secure_channel} requestLOC:{reqLocationEnabled}") if my_settings.sentryIgnoreList: logger.debug(f"System: Sentry BlockList Enabled for nodes: {my_settings.sentryIgnoreList}") if my_settings.sentryWatchList: logger.debug(f"System: Sentry WatchList Enabled for nodes: {my_settings.sentryWatchList}") if my_settings.highfly_enabled: logger.debug(f"System: HighFly Enabled using {my_settings.highfly_altitude}m limit reporting to channel:{my_settings.highfly_channel}") if my_settings.store_forward_enabled: logger.debug(f"System: S&F(messages command) Enabled using limit: {storeFlimit} and reverse queue:{my_settings.reverseSF}") if my_settings.enableEcho: logger.debug("System: Echo command Enabled") if my_settings.repeater_enabled and multiple_interface: logger.debug(f"System: Repeater Enabled for Channels: {my_settings.repeater_channels}") if my_settings.checklist_enabled: logger.debug("System: CheckList Module Enabled") if my_settings.inventory_enabled: logger.debug("System: Inventory Module Enabled") if my_settings.ignoreChannels: logger.debug(f"System: Ignoring Channels: {my_settings.ignoreChannels}") if my_settings.noisyNodeLogging: logger.debug("System: Noisy Node Logging Enabled") if my_settings.logMetaStats: logger.debug("System: Logging Metadata Stats Enabled, leaderboard") if my_settings.scheduler_enabled: logger.debug(f"System: Scheduler Enabled. Default Device:{my_settings.schedulerInterface} Channel:{my_settings.schedulerChannel}") except Exception as e: logger.error(f"System: Error during boot: {e}") def onReceive(packet, interface): global seenNodes, msg_history, cmdHistory # Priocess the incoming packet, handles the responses to the packet with auto_response() # Sends the packet to the correct handler for processing # extract interface details from inbound packet rxType = type(interface).__name__ # Values assinged to the packet rxNode = message_from_id = snr = rssi = hop = hop_away = channel_number = hop_start = hop_count = hop_limit = 0 pkiStatus = (False, 'ABC') rxNodeHostName = None replyIDset = False emojiSeen = False simulator_flag = False isDM = False channel_name = "unknown" session_passkey = None playingGame = False if my_settings.DEBUGpacket: # Debug print the interface object for item in interface.__dict__.items(): intDebug = f"{item}\n" logger.debug(f"System: Packet Received on {rxType} Interface\n {intDebug} \n END of interface \n") # Debug print the packet for debugging logger.debug(f"Packet Received\n {packet} \n END of packet \n") # determine the rxNode based on the interface type if rxType == 'TCPInterface': rxHost = interface.__dict__.get('hostname', 'unknown') rxNodeHostName = interface.__dict__.get('ip', None) rxNode = next( (i for i in range(1, 10) if multiple_interface and rxHost and globals().get(f'hostname{i}', '').split(':', 1)[0] in rxHost and globals().get(f'interface{i}_type', '') == 'tcp'),None) if rxType == 'SerialInterface': rxInterface = interface.__dict__.get('devPath', 'unknown') rxNode = next( (i for i in range(1, 10) if globals().get(f'port{i}', '') in rxInterface),None) if rxType == 'BLEInterface': rxNode = next( (i for i in range(1, 10) if globals().get(f'interface{i}_type', '') == 'ble'),0) if rxNode is None: # default to interface 1 ## FIXME needs better like a default interface setting or hash lookup if 'decoded' in packet and packet['decoded']['portnum'] in ['ADMIN_APP', 'SIMULATOR_APP']: session_passkey = packet.get('decoded', {}).get('admin', {}).get('sessionPasskey', None) rxNode = 1 # check if the packet has a channel flag use it ## FIXME needs to be channel hash lookup if packet.get('channel'): channel_number = packet.get('channel') channel_name = "unknown" try: res = resolve_channel_name(channel_number, rxNode, interface) if res: try: channel_name, _ = res except Exception: channel_name = "unknown" else: # Search all interfaces for this channel cache = build_channel_cache() found_on_other = None for device in cache: for chan_name, info in device.get("channels", {}).items(): if str(info.get('number')) == str(channel_number) or str(info.get('hash')) == str(channel_number): found_on_other = device.get("interface_id") found_chan_name = chan_name break if found_on_other: break if found_on_other and found_on_other != rxNode: logger.debug( f"System: Received Packet on Channel:{channel_number} ({found_chan_name}) on Interface:{rxNode}, but this channel is configured on Interface:{found_on_other}" ) except Exception as e: logger.debug(f"System: channel resolution error: {e}") #debug channel info # if "unknown" in str(channel_name): # logger.debug(f"System: Received Packet on Channel:{channel_number} on Interface:{rxNode}") # else: # logger.debug(f"System: Received Packet on Channel:{channel_number} Name:{channel_name} on Interface:{rxNode}") # check if the packet has a simulator flag simulator_flag = packet.get('decoded', {}).get('simulator', False) if isinstance(simulator_flag, dict): # assume Software Simulator simulator_flag = True # set the message_from_id message_from_id = packet['from'] # if message_from_id is not in the seenNodes list add it if not any(node.get('nodeID') == message_from_id for node in seenNodes): seenNodes.append({'nodeID': message_from_id, 'rxInterface': rxNode, 'channel': channel_number, 'welcome': False, 'first_seen': time.time(), 'lastSeen': time.time()}) else: # update lastSeen time for node in seenNodes: if node.get('nodeID') == message_from_id: node['lastSeen'] = time.time() break # BBS DM MAIL CHECKER if bbs_enabled and 'decoded' in packet: msg = bbs_check_dm(message_from_id) if msg: logger.info(f"System: BBS DM Delivery: {msg[1]} For: {get_name_from_number(message_from_id, 'long', rxNode)}") message = "Mail: " + msg[1] + " From: " + get_name_from_number(msg[2], 'long', rxNode) bbs_delete_dm(msg[0], msg[1]) send_message(message, channel_number, message_from_id, rxNode) # CHECK with ban_hammer() if the node is banned if str(message_from_id) in my_settings.bbs_ban_list or str(message_from_id) in my_settings.autoBanlist: logger.warning(f"System: Banned Node {message_from_id} tried to send a message. Ignored. Try adding to node firmware-blocklist") return # handle TEXT_MESSAGE_APP try: if 'decoded' in packet and packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP': message_bytes = packet['decoded']['payload'] message_string = message_bytes.decode('utf-8') via_mqtt = packet['decoded'].get('viaMqtt', False) transport_mechanism = ( packet.get('transport_mechanism') or packet.get('transportMechanism') or (packet.get('decoded', {}).get('transport_mechanism')) or (packet.get('decoded', {}).get('transportMechanism')) or 'unknown' ) rx_time = packet['decoded'].get('rxTime', time.time()) # check if the packet is from us if message_from_id in [myNodeNum1, myNodeNum2, myNodeNum3, myNodeNum4, myNodeNum5, myNodeNum6, myNodeNum7, myNodeNum8, myNodeNum9]: logger.warning(f"System: Packet from self {message_from_id} loop or traffic replay detected") # get the signal strength and snr if available if packet.get('rxSnr') or packet.get('rxRssi'): snr = packet.get('rxSnr', 0) rssi = packet.get('rxRssi', 0) # check if the packet has a publicKey flag use it if packet.get('publicKey'): pkiStatus = packet.get('pkiEncrypted', False), packet.get('publicKey', 'ABC') # check if the packet has replyId flag // currently unused in the code if packet.get('replyId'): replyIDset = packet.get('replyId', False) # check if the packet has emoji flag set it // currently unused in the code if packet.get('emoji'): emojiSeen = packet.get('emoji', False) # check if the packet has a hop count flag use it if packet.get('hopsAway'): hop_away = packet.get('hopsAway', 0) if packet.get('hopStart'): hop_start = packet.get('hopStart', 0) if packet.get('hopLimit'): hop_limit = packet.get('hopLimit', 0) # calculate hop count hop = "" if hop_limit > 0 and hop_start >= hop_limit: hop_count = hop_away + (hop_start - hop_limit) elif hop_limit > 0 and hop_start < hop_limit: hop_count = hop_away + (hop_limit - hop_start) else: hop_count = hop_away if hop_count > 0: # set hop string from calculated hop count hop = f"{hop_count} Hop" if hop_count == 1 else f"{hop_count} Hops" if hop_start == hop_limit and "lora" in str(transport_mechanism).lower() and (snr != 0 or rssi != 0) and hop_count == 0: # 2.7+ firmware direct hop over LoRa hop = "Direct" if via_mqtt or "mqtt" in str(transport_mechanism).lower(): hop = "MQTT" via_mqtt = True elif "udp" in str(transport_mechanism).lower(): hop = "Gateway" if hop in ("MQTT", "Gateway") and hop_count > 0: hop = f" {hop_count} Hops" # Add relay node info if present if packet.get('relayNode') is not None: relay_val = packet['relayNode'] last_byte = relay_val & 0xFF if last_byte == 0x00: hex_val = 'OldFW' else: hex_val = f"{last_byte:02X}" hop += f" Relay:{hex_val}" if enableHopLogs: logger.debug(f"System: Packet HopDebugger: hop_away:{hop_away} hop_limit:{hop_limit} hop_start:{hop_start} calculated_hop_count:{hop_count} final_hop_value:{hop} via_mqtt:{via_mqtt} transport_mechanism:{transport_mechanism} Hostname:{rxNodeHostName}") # check with stringSafeChecker if the message is safe if stringSafeCheck(message_string, message_from_id) is False: logger.warning(f"System: Possibly Unsafe Message from {get_name_from_number(message_from_id, 'long', rxNode)}") if help_message in message_string or welcome_message in message_string or "CMD?:" in message_string: # ignore help and welcome messages logger.warning(f"Got Own Welcome/Help header. From: {get_name_from_number(message_from_id, 'long', rxNode)}") return # If the packet is a DM (Direct Message) respond to it, otherwise validate its a message for us on the channel if packet['to'] in [myNodeNum1, myNodeNum2, myNodeNum3, myNodeNum4, myNodeNum5, myNodeNum6, myNodeNum7, myNodeNum8, myNodeNum9]: # message is DM to us isDM = True # check if the message contains a trap word, DMs are always responded to if (messageTrap(message_string) and not llm_enabled) or messageTrap(message_string.split()[0]): # log the message to stdout logger.info(f"Device:{rxNode} Channel: {channel_number} " + CustomFormatter.green + f"Received DM: " + CustomFormatter.white + f"{message_string} " + CustomFormatter.purple +\ "From: " + CustomFormatter.white + f"{get_name_from_number(message_from_id, 'long', rxNode)}") # respond with DM send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode) else: # DM is useful for games or LLM if games_enabled and ("Direct" in hop or hop_count < my_settings.game_hop_limit): playingGame = checkPlayingGame(message_from_id, message_string, rxNode, channel_number) elif hop_count >= my_settings.game_hop_limit: if games_enabled: logger.warning(f"Device:{rxNode} Ignoring Request to Play Game: {message_string} From: {get_name_from_number(message_from_id, 'long', rxNode)} with hop count: {hop}") send_message(f"Your hop count exceeds safe playable distance at {hop_count} hops", channel_number, message_from_id, rxNode) else: playingGame = False else: playingGame = False if not playingGame: if llm_enabled and my_settings.llmReplyToNonCommands: # respond with LLM llm = handle_llm(message_from_id, channel_number, rxNode, message_string, publicChannel) send_message(llm, channel_number, message_from_id, rxNode) else: # respond with welcome message on DM logger.warning(f"Device:{rxNode} Ignoring DM: {message_string} From: {get_name_from_number(message_from_id, 'long', rxNode)}") # if seenNodes list is not marked as welcomed send welcome message if not any(node['nodeID'] == message_from_id and node['welcome'] == True for node in seenNodes): # send welcome message send_message(welcome_message, channel_number, message_from_id, rxNode) # mark the node as welcomed for node in seenNodes: if node['nodeID'] == message_from_id: node['welcome'] = True else: if my_settings.dad_jokes_enabled: # respond with a dad joke on DM send_message(tell_joke(), channel_number, message_from_id, rxNode) else: # respond with help message on DM send_message(help_message, channel_number, message_from_id, rxNode) # add message to tts queue if meshagesTTS: # add to the tts_read_queue readMe = f"DM from {get_name_from_number(message_from_id, 'short', rxNode)}: {message_string}" tts_read_queue.append(readMe) # log the message to the message log if log_messages_to_file: msgLogger.info(f"Device:{rxNode} Channel:{channel_number} | {get_name_from_number(message_from_id, 'long', rxNode)} | DM | " + message_string.replace('\n', '-nl-')) else: # message is on a channel if messageTrap(message_string): # message is for us to respond to, or is it... if my_settings.ignoreDefaultChannel and channel_number == my_settings.publicChannel: logger.debug(f"System: Ignoring CMD:{message_string} From: {get_name_from_number(message_from_id, 'short', rxNode)} Default Channel:{channel_number}") elif str(message_from_id) in my_settings.bbs_ban_list: logger.debug(f"System: Ignoring CMD:{message_string} From: {get_name_from_number(message_from_id, 'short', rxNode)} Cantankerous Node") elif str(channel_number) in my_settings.ignoreChannels: logger.debug(f"System: Ignoring CMD:{message_string} From: {get_name_from_number(message_from_id, 'short', rxNode)} Ignored Channel:{channel_number}") elif my_settings.cmdBang and not message_string.startswith("!"): logger.debug(f"System: Ignoring CMD:{message_string} From: {get_name_from_number(message_from_id, 'short', rxNode)} Didnt sound like they meant it") else: # message is for bot to respond to, seriously this time.. logger.info(f"Device:{rxNode} Channel:{channel_number} " + CustomFormatter.green + "ReceivedChannel: " + CustomFormatter.white + f"{message_string} " + CustomFormatter.purple +\ "From: " + CustomFormatter.white + f"{get_name_from_number(message_from_id, 'long', rxNode)}") if my_settings.useDMForResponse: # respond to channel message via direct message send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode) else: # or respond to channel message on the channel itself if channel_number == my_settings.publicChannel and my_settings.antiSpam: # warning user spamming default channel logger.warning(f"System: AntiSpam protection, sending DM to: {get_name_from_number(message_from_id, 'long', rxNode)}") # respond to channel message via direct message send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode) else: # respond to channel message on the channel itself send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, 0, rxNode) else: # message is not for us to respond to # ignore the message but add it to the message history list if my_settings.zuluTime: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") else: timestamp = datetime.now().strftime("%Y-%m-%d %I:%M:%S%p") # trim the history list if it exceeds max_history if len(msg_history) >= my_settings.MAX_MSG_HISTORY: # Always keep only the most recent MAX_MSG_HISTORY entries msg_history = msg_history[-my_settings.MAX_MSG_HISTORY:] # add the message to the history list msg_history.append((get_name_from_number(message_from_id, 'long', rxNode), message_string, channel_number, timestamp, rxNode)) # print the message to the log and sdout logger.info(f"Device:{rxNode} Channel:{channel_number} " + CustomFormatter.green + "Ignoring Message:" + CustomFormatter.white +\ f" {message_string} " + CustomFormatter.purple + "From:" + CustomFormatter.white + f" {get_name_from_number(message_from_id)}") if my_settings.log_messages_to_file: msgLogger.info(f"Device:{rxNode} Channel:{channel_number} | {get_name_from_number(message_from_id, 'long', rxNode)} | " + message_string.replace('\n', '-nl-')) # repeat the message on the other device if my_settings.repeater_enabled and my_settings.multiple_interface: # wait a responseDelay to avoid message collision from lora-ack. time.sleep(my_settings.responseDelay) if len(message_string) > (3 * my_settings.MESSAGE_CHUNK_SIZE): logger.warning(f"System: Not repeating message, exceeds size limit ({len(message_string)} > {3 * MESSAGE_CHUNK_SIZE})") else: rMsg = (f"{message_string} From:{get_name_from_number(message_from_id, 'short', rxNode)}") # if channel found in the repeater list repeat the message if str(channel_number) in my_settings.repeater_channels: for i in range(1, 10): if globals().get(f'interface{i}_enabled', False) and i != rxNode: logger.debug(f"Repeating message on Device{i} Channel:{channel_number}") send_message(rMsg, channel_number, 0, i) time.sleep(my_settings.responseDelay) # if QRZ enabled check if we have said hello if my_settings.qrz_hello_enabled: if never_seen_before(message_from_id): name = get_name_from_number(message_from_id, 'short', rxNode) if isinstance(name, str) and name.startswith("!") and len(name) == 9: # we didnt get a info packet yet so wait and ingore this go around logger.debug(f"System: QRZ Hello ignored, no info packet yet") else: # add to qrz_hello list hello(message_from_id, name) # send a hello message as a DM if not my_settings.train_qrz: send_message(f"Hello {name} {qrz_hello_string}", channel_number, message_from_id, rxNode) # handle mini games if my_settings.wordOfTheDay: #word of the day game play on non bot messages happened, old_entry, new_entry, bingo_win, bingo_message = theWordOfTheDay.did_it_happen(message_string) if happened: wordWas = old_entry['word'] metaWas = old_entry['meta'] msg = f"🎉 {get_name_from_number(message_from_id, 'long', rxNode)} found the Word of the Day🎊:\n {wordWas}, {metaWas}" send_message(msg, channel_number, 0, rxNode) if bingo_win: msg = f"🎉 {get_name_from_number(message_from_id, 'long', rxNode)} scored word-search-BINGO!🥳 {bingo_message}" send_message(msg, channel_number, 0, rxNode) slotMachine = theWordOfTheDay.emojiMiniGame(message_string, emojiSeen=emojiSeen, nodeID=message_from_id, nodeInt=rxNode) if slotMachine: msg = f"🎉 {get_name_from_number(message_from_id, 'long', rxNode)} played the emote-Fruit-Machine and got: {slotMachine} 🥳" send_message(msg, channel_number, 0, rxNode) # add message to tts queue if my_settings.meshagesTTS and channel_number == my_settings.ttsChannels: # add to the tts_read_queue readMe = f"DM from {get_name_from_number(message_from_id, 'short', rxNode)}: {message_string}" tts_read_queue.append(readMe) else: # Evaluate non TEXT_MESSAGE_APP packets consumeMetadata(packet, rxNode, channel_number) except KeyError as e: logger.critical(f"System: Error processing packet: {e} Device:{rxNode}") logger.debug(f"System: Error Packet = {packet}") async def start_rx(): # Start the receive subscriber using pubsub via meshtastic library pub.subscribe(onReceive, 'meshtastic.receive') pub.subscribe(onDisconnect, 'meshtastic.connection.lost') logger.debug("System: RX Subscriber started") # here we go loopty loo while True: await asyncio.sleep(0.5) pass # Initialize game trackers loadLeaderboard() gameTrackers = [ (dwPlayerTracker, "DopeWars", handleDopeWars), (lemonadeTracker, "LemonadeStand", handleLemonade), (vpTracker, "VideoPoker", handleVideoPoker), (jackTracker, "BlackJack", handleBlackJack), (mindTracker, "MasterMind", handleMmind), (golfTracker, "GolfSim", handleGolf), (hangmanTracker, "Hangman", handleHangman), (hamtestTracker, "HamTest", handleHamtest), (tictactoeTracker, "TicTacToe", handleTicTacToe), (surveyTracker, "Survey", surveyHandler), (battleshipTracker, "Battleship", handleBattleship), # quiz does not use a tracker (quizGamePlayer) always active ] # Hello World async def main(): tasks = [] try: handle_boot() # Create core tasks tasks.append(asyncio.create_task(start_rx(), name="mesh_rx")) tasks.append(asyncio.create_task(watchdog(), name="watchdog")) # Add optional tasks if my_settings.dataPersistence_enabled: tasks.append(asyncio.create_task(dataPersistenceLoop(), name="data_persistence")) if my_settings.file_monitor_enabled: tasks.append(asyncio.create_task(handleFileWatcher(), name="file_monitor")) if my_settings.radio_detection_enabled: tasks.append(asyncio.create_task(handleSignalWatcher(), name="hamlib")) if my_settings.voxDetectionEnabled: from modules.radio import voxMonitor tasks.append(asyncio.create_task(voxMonitor(), name="vox_detection")) if my_settings.meshagesTTS: tasks.append(asyncio.create_task(handleTTS(), name="tts_handler")) if my_settings.wsjtx_detection_enabled: tasks.append(asyncio.create_task(handleWsjtxWatcher(), name="wsjtx_monitor")) if my_settings.js8call_detection_enabled: tasks.append(asyncio.create_task(handleJs8callWatcher(), name="js8call_monitor")) if my_settings.scheduler_enabled: from modules.scheduler import run_scheduler_loop, setup_scheduler setup_scheduler(schedulerMotd, MOTD, schedulerMessage, schedulerChannel, schedulerInterface, schedulerValue, schedulerTime, schedulerInterval) tasks.append(asyncio.create_task(run_scheduler_loop(), name="scheduler")) logger.debug(f"System: Starting {len(tasks)} async tasks") # Wait for all tasks with proper exception handling results = await asyncio.gather(*tasks, return_exceptions=True) # Check for exceptions in results for i, result in enumerate(results): if isinstance(result, Exception): logger.error(f"Task {tasks[i].get_name()} failed with: {result}") except Exception as e: logger.error(f"Main loop error: {e}") finally: # Cleanup tasks logger.debug("System: Cleaning up async tasks") for task in tasks: if not task.done(): task.cancel() try: await task except asyncio.CancelledError: logger.debug(f"Task {task.get_name()} cancelled successfully") except Exception as e: logger.warning(f"Error cancelling task {task.get_name()}: {e}") await asyncio.sleep(0.01) if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: exit_handler() except SystemExit: pass # EOF