#!/usr/bin/env python3 # Meshtastic Autoresponder PONG Bot # K7MHI Kelly Keeton 2024 try: from pubsub import pub except ImportError: print(f"Important dependencies are not met, try install.sh\n\n Did you mean to './launch.sh pong' using a virtual environment.") exit(1) import asyncio import time # for sleep, get some when you can :) import random from modules.log import * from modules.system import * # Global Variables DEBUGpacket = False # Debug print the packet rx def auto_response(message, snr, rssi, hop, pkiStatus, message_from_id, channel_number, deviceID, isDM): # Auto response to messages message_lower = message.lower() bot_response = "I'm sorry, I'm afraid I can't do that." command_handler = { # Command List processes system.trap_list. system.messageTrap() sends any commands to here "ack": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "cmd": lambda: handle_cmd(message, message_from_id, deviceID), "cq": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "cqcq": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "cqcqcq": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "echo": lambda: handle_echo(message, message_from_id, deviceID, isDM, channel_number), "lheard": lambda: handle_lheard(message, message_from_id, deviceID, isDM), "motd": lambda: handle_motd(message, MOTD), "ping": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "pong": lambda: "🏓PING!!🛜", "sitrep": lambda: lambda: handle_lheard(message, message_from_id, deviceID, isDM), "sysinfo": lambda: sysinfo(message, message_from_id, deviceID), "test": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "testing": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), } cmds = [] # list to hold the commands found in the message for key in command_handler: if key in message_lower.split(' '): cmds.append({'cmd': key, 'index': message_lower.index(key)}) if len(cmds) > 0: # sort the commands by index value cmds = sorted(cmds, key=lambda k: k['index']) logger.debug(f"System: Bot detected Commands:{cmds}") # run the first command after sorting bot_response = command_handler[cmds[0]['cmd']]() # wait a responseDelay to avoid message collision from lora-ack time.sleep(responseDelay) return bot_response def handle_cmd(message, message_from_id, deviceID): # why CMD? its just a command list. a terminal would normally use "Help" # I didnt want to invoke the word "help" in Meshtastic due to its possible emergency use if " " in message and message.split(" ")[1] in trap_list: return "🤖 just use the commands directly in chat" return help_message def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number): global multiPing if "?" in message and isDM: return message.split("?")[0].title() + " command returns SNR and RSSI, or hopcount from your message. Try adding e.g. @place or #tag" msg = "" type = '' if "ping" in message.lower(): msg = "🏓PONG\n" type = "🏓PING" elif "test" in message.lower() or "testing" in message.lower(): msg = random.choice(["🎙Testing 1,2,3\n", "🎙Testing\n",\ "🎙Testing, testing\n",\ "🎙Ah-wun, ah-two...\n", "🎙Is this thing on?\n",\ "🎙Roger that!\n",]) type = "🎙TEST" elif "ack" in message.lower(): msg = random.choice(["✋ACK-ACK!\n", "✋Ack to you!\n"]) type = "✋ACK" elif "cqcq" in message.lower() or "cq" in message.lower() or "cqcqcq" in message.lower(): if deviceID == 1: myname = get_name_from_number(myNodeNum1, 'short', 1) elif deviceID == 2: myname = get_name_from_number(myNodeNum2, 'short', 2) msg = f"QSP QSL OM DE {myname} K\n" else: msg = "🔊 Can you hear me now?" if hop == "Direct": msg = msg + f"SNR:{snr} RSSI:{rssi}" else: msg = msg + hop if "@" in message: msg = msg + " @" + message.split("@")[1] type = type + " @" + message.split("@")[1] elif "#" in message: msg = msg + " #" + message.split("#")[1] type = type + " #" + message.split("#")[1] # check for multi ping request if " " in message: # if stop multi ping if "stop" in message.lower(): for i in range(0, len(multiPingList)): if multiPingList[i].get('message_from_id') == message_from_id: multiPingList.pop(i) msg = "🛑 auto-ping" # if 3 or more entries (2 or more active), throttle the multi-ping for congestion if len(multiPingList) > 2: msg = "🚫⛔️ auto-ping, service busy. ⏳Try again soon." pingCount = -1 else: # set inital pingCount try: pingCount = int(message.split(" ")[1]) if pingCount == 123 or pingCount == 1234: pingCount = 1 elif not autoPingInChannel and not isDM: # no autoping in channels pingCount = 1 if pingCount > 51: pingCount = 50 except: pingCount = -1 if pingCount > 1: multiPingList.append({'message_from_id': message_from_id, 'count': pingCount + 1, 'type': type, 'deviceID': deviceID, 'channel_number': channel_number, 'startCount': pingCount}) if type == "🎙TEST": msg = f"🛜Initalizing BufferTest, using chunks of about {int(maxBuffer // pingCount)}, max length {maxBuffer} in {pingCount} messages" else: msg = f"🚦Initalizing {pingCount} auto-ping" # if not a DM add the username to the beginning of msg if not useDMForResponse and not isDM: msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + msg return msg def handle_motd(message, message_from_id, isDM): global MOTD isAdmin = False msg = "" # check if the message_from_id is in the bbs_admin_list if bbs_admin_list != ['']: for admin in bbs_admin_list: if str(message_from_id) == admin: isAdmin = True break else: isAdmin = True # admin help via DM if "?" in message and isDM and isAdmin: msg = "Message of the day, set with 'motd $ HelloWorld!'" elif "?" in message and isDM and not isAdmin: # non-admin help via DM msg = "Message of the day" elif "$" in message and isAdmin: motd = message.split("$")[1] MOTD = motd.rstrip() logger.debug(f"System: {message_from_id} changed MOTD: {MOTD}") msg = "MOTD changed to: " + MOTD else: msg = "MOTD: " + MOTD return msg def handle_echo(message, message_from_id, deviceID, isDM, channel_number): if "?" in message.lower(): return "echo command returns your message back to you. Example:echo Hello World" elif "echo " in message.lower(): parts = message.lower().split("echo ", 1) if len(parts) > 1 and parts[1].strip() != "": echo_msg = parts[1] if channel_number != echoChannel: echo_msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + echo_msg return echo_msg else: return "Please provide a message to echo back to you. Example:echo Hello World" else: return "Please provide a message to echo back to you. Example:echo Hello World" def sysinfo(message, message_from_id, deviceID): if "?" in message: return "sysinfo command returns system information." else: return get_sysinfo(message_from_id, deviceID) def handle_lheard(message, nodeid, deviceID, isDM): if "?" in message and isDM: return message.split("?")[0].title() + " command returns a list of the nodes that have been heard recently" # display last heard nodes add to response bot_response = "Last Heard\n" bot_response += str(get_node_list(1)) # bot_response += getNodeTelemetry(deviceID) return bot_response def onReceive(packet, interface): global seenNodes # Priocess the incoming packet, handles the responses to the packet with auto_response() # Sends the packet to the correct handler for processing # extract interface details from inbound packet rxType = type(interface).__name__ # Valies assinged to the packet rxNode, message_from_id, snr, rssi, hop, hop_away, channel_number = 0, 0, 0, 0, 0, 0, 0 pkiStatus = (False, 'ABC') replyIDset = False emojiSeen = False isDM = False if DEBUGpacket: # Debug print the interface object for item in interface.__dict__.items(): intDebug = f"{item}\n" logger.debug(f"System: Packet Received on {rxType} Interface\n {intDebug} \n END of interface \n") # Debug print the packet for debugging logger.debug(f"Packet Received\n {packet} \n END of packet \n") # set the value for the incomming interface if rxType == 'SerialInterface': rxInterface = interface.__dict__.get('devPath', 'unknown') if port1 in rxInterface: rxNode = 1 elif multiple_interface and port2 in rxInterface: rxNode = 2 elif multiple_interface and port3 in rxInterface: rxNode = 3 elif multiple_interface and port4 in rxInterface: rxNode = 4 elif multiple_interface and port5 in rxInterface: rxNode = 5 elif multiple_interface and port6 in rxInterface: rxNode = 6 elif multiple_interface and port7 in rxInterface: rxNode = 7 elif multiple_interface and port8 in rxInterface: rxNode = 8 elif multiple_interface and port9 in rxInterface: rxNode = 9 if rxType == 'TCPInterface': rxHost = interface.__dict__.get('hostname', 'unknown') if rxHost and hostname1 in rxHost and interface1_type == 'tcp': rxNode = 1 elif multiple_interface and rxHost and hostname2 in rxHost and interface2_type == 'tcp': rxNode = 2 elif multiple_interface and rxHost and hostname3 in rxHost and interface3_type == 'tcp': rxNode = 3 elif multiple_interface and rxHost and hostname4 in rxHost and interface4_type == 'tcp': rxNode = 4 elif multiple_interface and rxHost and hostname5 in rxHost and interface5_type == 'tcp': rxNode = 5 elif multiple_interface and rxHost and hostname6 in rxHost and interface6_type == 'tcp': rxNode = 6 elif multiple_interface and rxHost and hostname7 in rxHost and interface7_type == 'tcp': rxNode = 7 elif multiple_interface and rxHost and hostname8 in rxHost and interface8_type == 'tcp': rxNode = 8 elif multiple_interface and rxHost and hostname9 in rxHost and interface9_type == 'tcp': rxNode = 9 if rxType == 'BLEInterface': if interface1_type == 'ble': rxNode = 1 elif multiple_interface and interface2_type == 'ble': rxNode = 2 elif multiple_interface and interface3_type == 'ble': rxNode = 3 elif multiple_interface and interface4_type == 'ble': rxNode = 4 elif multiple_interface and interface5_type == 'ble': rxNode = 5 elif multiple_interface and interface6_type == 'ble': rxNode = 6 elif multiple_interface and interface7_type == 'ble': rxNode = 7 elif multiple_interface and interface8_type == 'ble': rxNode = 8 elif multiple_interface and interface9_type == 'ble': rxNode = 9 # check if the packet has a channel flag use it if packet.get('channel'): channel_number = packet.get('channel', 0) # set the message_from_id message_from_id = packet['from'] # check if the packet has a channel flag use it if packet.get('channel'): channel_number = packet.get('channel', 0) # handle TEXT_MESSAGE_APP try: if 'decoded' in packet and packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP': message_bytes = packet['decoded']['payload'] message_string = message_bytes.decode('utf-8') via_mqtt = packet['decoded'].get('viaMqtt', False) # check if the packet is from us if message_from_id == myNodeNum1 or message_from_id == myNodeNum2: logger.warning(f"System: Packet from self {message_from_id} loop or traffic replay deteted") # get the signal strength and snr if available if packet.get('rxSnr') or packet.get('rxRssi'): snr = packet.get('rxSnr', 0) rssi = packet.get('rxRssi', 0) # check if the packet has a publicKey flag use it if packet.get('publicKey'): pkiStatus = (packet.get('pkiEncrypted', False), packet.get('publicKey', 'ABC')) # check if the packet has a hop count flag use it if packet.get('hopsAway'): hop_away = packet.get('hopsAway', 0) else: # if the packet does not have a hop count try other methods if packet.get('hopLimit'): hop_limit = packet.get('hopLimit', 0) else: hop_limit = 0 if packet.get('hopStart'): hop_start = packet.get('hopStart', 0) else: hop_start = 0 if enableHopLogs: logger.debug(f"System: Packet HopDebugger: hop_away:{hop_away} hop_limit:{hop_limit} hop_start:{hop_start}") if hop_away == 0 and hop_limit == 0 and hop_start == 0: hop = "Last Hop" hop_count = 0 if hop_start == hop_limit: hop = "Direct" hop_count = 0 elif hop_start == 0 and hop_limit > 0 or via_mqtt: hop = "MQTT" hop_count = 0 else: # set hop to Direct if the message was sent directly otherwise set the hop count if hop_away > 0: hop_count = hop_away else: hop_count = hop_start - hop_limit #print (f"calculated hop count: {hop_start} - {hop_limit} = {hop_count}") hop = f"{hop_count} hops" if help_message in message_string or welcome_message in message_string or "CMD?:" in message_string: # ignore help and welcome messages logger.warning(f"Got Own Welcome/Help header. From: {get_name_from_number(message_from_id, 'long', rxNode)}") return # If the packet is a DM (Direct Message) respond to it, otherwise validate its a message for us on the channel if packet['to'] == myNodeNum1 or packet['to'] == myNodeNum2: # message is DM to us isDM = True # check if the message contains a trap word, DMs are always responded to if (messageTrap(message_string) and not llm_enabled) or messageTrap(message_string.split()[0]): # log the message to stdout logger.info(f"Device:{rxNode} Channel: {channel_number} " + CustomFormatter.green + f"Received DM: " + CustomFormatter.white + f"{message_string} " + CustomFormatter.purple +\ "From: " + CustomFormatter.white + f"{get_name_from_number(message_from_id, 'long', rxNode)}") # respond with DM send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode) else: logger.warning(f"Device:{rxNode} Ignoring DM: {message_string} From: {get_name_from_number(message_from_id, 'long', rxNode)}") send_message(welcome_message, channel_number, message_from_id, rxNode) time.sleep(responseDelay) # log the message to the message log if log_messages_to_file: msgLogger.info(f"Device:{rxNode} Channel:{channel_number} | {get_name_from_number(message_from_id, 'long', rxNode)} | DM | " + message_string.replace('\n', '-nl-')) else: # message is on a channel if messageTrap(message_string): if ignoreDefaultChannel and channel_number == publicChannel: logger.debug(f"System: ignoreDefaultChannel CMD:{message_string} From: {get_name_from_number(message_from_id, 'short', rxNode)}") else: # message is for bot to respond to logger.info(f"Device:{rxNode} Channel:{channel_number} " + CustomFormatter.green + "ReceivedChannel: " + CustomFormatter.white + f"{message_string} " + CustomFormatter.purple +\ "From: " + CustomFormatter.white + f"{get_name_from_number(message_from_id, 'long', rxNode)}") if useDMForResponse: # respond to channel message via direct message send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode) else: # or respond to channel message on the channel itself if channel_number == publicChannel and antiSpam: # warning user spamming default channel logger.warning(f"System: AntiSpam protection, sending DM to: {get_name_from_number(message_from_id, 'long', rxNode)}") # respond to channel message via direct message send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode) else: # respond to channel message on the channel itself send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, 0, rxNode) else: # message is not for bot to respond to # ignore the message but add it to the message history list if zuluTime: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") else: timestamp = datetime.now().strftime("%Y-%m-%d %I:%M:%S%p") if len(msg_history) < storeFlimit: msg_history.append((get_name_from_number(message_from_id, 'long', rxNode), message_string, channel_number, timestamp, rxNode)) else: msg_history.pop(0) msg_history.append((get_name_from_number(message_from_id, 'long', rxNode), message_string, channel_number, timestamp, rxNode)) # print the message to the log and sdout logger.info(f"Device:{rxNode} Channel:{channel_number} " + CustomFormatter.green + "Ignoring Message:" + CustomFormatter.white +\ f" {message_string} " + CustomFormatter.purple + "From:" + CustomFormatter.white + f" {get_name_from_number(message_from_id)}") if log_messages_to_file: msgLogger.info(f"Device:{rxNode} Channel:{channel_number} | {get_name_from_number(message_from_id, 'long', rxNode)} | " + message_string.replace('\n', '-nl-')) # repeat the message on the other device if repeater_enabled and multiple_interface: # wait a responseDelay to avoid message collision from lora-ack. time.sleep(responseDelay) rMsg = (f"{message_string} From:{get_name_from_number(message_from_id, 'short', rxNode)}") # if channel found in the repeater list repeat the message if str(channel_number) in repeater_channels: for i in range(1, 10): if globals().get(f'interface{i}_enabled', False) and i != rxNode: logger.debug(f"Repeating message on Device{i} Channel:{channel_number}") send_message(rMsg, channel_number, 0, i) time.sleep(responseDelay) else: # Evaluate non TEXT_MESSAGE_APP packets consumeMetadata(packet, rxNode, channel_number) except KeyError as e: logger.critical(f"System: Error processing packet: {e} Device:{rxNode}") logger.debug(f"System: Error Packet = {packet}") async def start_rx(): print (CustomFormatter.bold_white + f"\nMeshtastic Autoresponder Bot CTL+C to exit\n" + CustomFormatter.reset) # Start the receive subscriber using pubsub via meshtastic library pub.subscribe(onReceive, 'meshtastic.receive') pub.subscribe(onDisconnect, 'meshtastic.connection.lost') for i in range(1, 10): if globals().get(f'interface{i}_enabled', False): myNodeNum = globals().get(f'myNodeNum{i}', 0) logger.info(f"System: Autoresponder Started for Device{i} {get_name_from_number(myNodeNum, 'long', i)}," f"{get_name_from_number(myNodeNum, 'short', i)}. NodeID: {myNodeNum}, {decimal_to_hex(myNodeNum)}") if useDMForResponse: logger.debug(f"System: Respond by DM only") if log_messages_to_file: logger.debug("System: Logging Messages to disk") if syslog_to_file: logger.debug("System: Logging System Logs to disk") if motd_enabled: logger.debug(f"System: MOTD Enabled using {MOTD}") if enableEcho: logger.debug(f"System: Echo command Enabled") if sentry_enabled: logger.debug(f"System: Sentry Mode Enabled {sentry_radius}m radius reporting to channel:{secure_channel}") if highfly_enabled: logger.debug(f"System: HighFly Enabled using {highfly_altitude}m limit reporting to channel:{highfly_channel}") if repeater_enabled and multiple_interface: logger.debug(f"System: Repeater Enabled for Channels: {repeater_channels}") if bbs_enabled: logger.debug(f"System: BBS Enabled, {bbsdb} has {len(bbs_messages)} messages. Direct Mail Messages waiting: {(len(bbs_dm) - 1)}") if bbs_link_enabled: if len(bbs_link_whitelist) > 0: logger.debug(f"System: BBS Link Enabled with {len(bbs_link_whitelist)} peers") else: logger.debug(f"System: BBS Link Enabled allowing all") if scheduler_enabled: # Examples of using the scheduler, Times here are in 24hr format # https://schedule.readthedocs.io/en/stable/ # Reminder Scheduler is enabled every Monday at noon send a log message schedule.every().monday.at("12:00").do(lambda: logger.info("System: Scheduled Broadcast Reminder")) logger.debug("System: Starting the broadcast scheduler") await BroadcastScheduler() # here we go loopty loo while True: await asyncio.sleep(0.5) pass # Hello World async def main(): tasks = [] try: # Create core tasks tasks.append(asyncio.create_task(start_rx(), name="pong_rx")) tasks.append(asyncio.create_task(watchdog(), name="watchdog")) # Add optional tasks if file_monitor_enabled: tasks.append(asyncio.create_task(handleFileWatcher(), name="file_monitor")) logger.debug(f"System: Starting {len(tasks)} async tasks") # Wait for all tasks with proper exception handling results = await asyncio.gather(*tasks, return_exceptions=True) # Check for exceptions in results for i, result in enumerate(results): if isinstance(result, Exception): logger.error(f"Task {tasks[i].get_name()} failed with: {result}") except Exception as e: logger.error(f"Main loop error: {e}") finally: # Cleanup tasks logger.debug("System: Cleaning up async tasks") for task in tasks: if not task.done(): task.cancel() try: await task except asyncio.CancelledError: logger.debug(f"Task {task.get_name()} cancelled successfully") except Exception as e: logger.warning(f"Error cancelling task {task.get_name()}: {e}") await asyncio.sleep(0.01) if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: exit_handler() except SystemExit: pass # EOF