#!/usr/bin/env python3 # Meshtastic Autoresponder PONG Bot # K7MHI Kelly Keeton 2024 import asyncio import time # for sleep, get some when you can :) from pubsub import pub # pip install pubsub from modules.log import * from modules.system import * # Global Variables cmdHistory = [] # list to hold the last commands DEBUGpacket = False # Debug print the packet rx def auto_response(message, snr, rssi, hop, pkiStatus, message_from_id, channel_number, deviceID, isDM): # Auto response to messages message_lower = message.lower() bot_response = "I'm sorry, I'm afraid I can't do that." command_handler = { "ack": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "cmd": lambda: help_message, "cmd?": lambda: help_message, "cq": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "cqcq": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "cqcqcq": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "lheard": lambda: handle_lheard(interface1, interface2_enabled, myNodeNum1, myNodeNum2), "motd": lambda: handle_motd(message, MOTD), "ping": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "pong": lambda: "🏓PING!!🛜", "sitrep": lambda: handle_lheard(interface1, interface2_enabled, myNodeNum1, myNodeNum2), "test": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), "testing": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number), } cmds = [] # list to hold the commands found in the message for key in command_handler: if key in message_lower.split(' '): cmds.append({'cmd': key, 'index': message_lower.index(key)}) if len(cmds) > 0: # sort the commands by index value cmds = sorted(cmds, key=lambda k: k['index']) logger.debug(f"System: Bot detected Commands:{cmds}") # run the first command after sorting bot_response = command_handler[cmds[0]['cmd']]() # wait a responseDelay to avoid message collision from lora-ack time.sleep(responseDelay) return bot_response def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number): global multiPing if "?" in message and isDM: return message.split("?")[0].title() + " command returns SNR and RSSI, or hopcount from your message. Try adding e.g. @place or #tag" msg = "" type = '' if "ping" in message.lower(): msg = "🏓PONG\n" type = "🏓PING" elif "test" in message.lower() or "testing" in message.lower(): msg = random.choice(["🎙Testing 1,2,3\n", "🎙Testing\n",\ "🎙Testing, testing\n",\ "🎙Ah-wun, ah-two...\n", "🎙Is this thing on?\n",\ "🎙Roger that!\n",]) type = "🎙TEST" elif "ack" in message.lower(): msg = random.choice(["✋ACK-ACK!\n", "✋Ack to you!\n"]) type = "✋ACK" elif "cqcq" in message.lower() or "cq" in message.lower() or "cqcqcq" in message.lower(): if deviceID == 1: myname = get_name_from_number(myNodeNum1, 'short', 1) elif deviceID == 2: myname = get_name_from_number(myNodeNum2, 'short', 2) msg = f"QSP QSL OM DE {myname} K\n" else: msg = "🔊 Can you hear me now?" if hop == "Direct": msg = msg + f"SNR:{snr} RSSI:{rssi}" else: msg = msg + hop if "@" in message: msg = msg + " @" + message.split("@")[1] type = type + " @" + message.split("@")[1] elif "#" in message: msg = msg + " #" + message.split("#")[1] type = type + " #" + message.split("#")[1] # check for multi ping request if " " in message: # if stop multi ping if "stop" in message.lower(): for i in range(0, len(multiPingList)): if multiPingList[i].get('message_from_id') == message_from_id: multiPingList.pop(i) msg = "🛑 auto-ping" # if 3 or more entries (2 or more active), throttle the multi-ping for congestion if len(multiPingList) > 2: msg = "🚫⛔️ auto-ping, service busy. ⏳Try again soon." pingCount = -1 else: # set inital pingCount try: pingCount = int(message.split(" ")[1]) if pingCount == 123 or pingCount == 1234: pingCount = 1 if pingCount > 51: pingCount = 50 except: pingCount = -1 if pingCount > 1: multiPingList.append({'message_from_id': message_from_id, 'count': pingCount + 1, 'type': type, 'deviceID': deviceID, 'channel_number': channel_number, 'startCount': pingCount}) if type == "🎙TEST": msg = f"🛜Initalizing BufferTest, using chunks of about {int(maxBuffer // pingCount)}, max length {maxBuffer} in {pingCount} messages" else: msg = f"🚦Initalizing {pingCount} auto-ping" return msg def handle_motd(message): global MOTD if "$" in message: motd = message.split("$")[1] MOTD = motd.rstrip() return "MOTD Set to: " + MOTD else: return MOTD def handle_lheard(message, nodeid, deviceID, isDM): if "?" in message and isDM: return message.split("?")[0].title() + " command returns a list of the nodes that have been heard recently" # display last heard nodes add to response bot_response = "Last Heard\n" bot_response += str(get_node_list(1)) # show last users of the bot with the cmdHistory list history = handle_history(message, nodeid, deviceID, isDM, lheard=True) if history: bot_response += f'LastSeen\n{history}' else: # trim the last \n bot_response = bot_response[:-1] # bot_response += getNodeTelemetry(deviceID) return bot_response def onReceive(packet, interface): # Priocess the incoming packet, handles the responses to the packet with auto_response() # Sends the packet to the correct handler for processing # extract interface details from inbound packet rxType = type(interface).__name__ # Valies assinged to the packet rxNode, message_from_id, snr, rssi, hop, hop_away, channel_number = 0, 0, 0, 0, 0, 0, 0 pkiStatus = (False, 'ABC') isDM = False if DEBUGpacket: # Debug print the interface object for item in interface.__dict__.items(): intDebug = f"{item}\n" logger.debug(f"System: Packet Received on {rxType} Interface\n {intDebug} \n END of interface \n") # Debug print the packet for debugging logger.debug(f"Packet Received\n {packet} \n END of packet \n") # set the value for the incomming interface if rxType == 'SerialInterface': rxInterface = interface.__dict__.get('devPath', 'unknown') if port1 in rxInterface: rxNode = 1 elif interface2_enabled and port2 in rxInterface: rxNode = 2 if rxType == 'TCPInterface': rxHost = interface.__dict__.get('hostname', 'unknown') if hostname1 in rxHost and interface1_type == 'tcp': rxNode = 1 elif interface2_enabled and hostname2 in rxHost and interface2_type == 'tcp': rxNode = 2 if rxType == 'BLEInterface': if interface1_type == 'ble': rxNode = 1 elif interface2_enabled and interface2_type == 'ble': rxNode = 2 # check if the packet has a channel flag use it if packet.get('channel'): channel_number = packet.get('channel', 0) # handle TEXT_MESSAGE_APP try: if 'decoded' in packet and packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP': message_bytes = packet['decoded']['payload'] message_string = message_bytes.decode('utf-8') message_from_id = packet['from'] # get the signal strength and snr if available if packet.get('rxSnr') or packet.get('rxRssi'): snr = packet.get('rxSnr', 0) rssi = packet.get('rxRssi', 0) # check if the packet has a publicKey flag use it if packet.get('publicKey'): pkiStatus = (packet.get('pkiEncrypted', False), packet.get('publicKey', 'ABC')) # check if the packet has a hop count flag use it if packet.get('hopsAway'): hop_away = packet.get('hopsAway', 0) else: # if the packet does not have a hop count try other methods if packet.get('hopLimit'): hop_limit = packet.get('hopLimit', 0) else: hop_limit = 0 if packet.get('hopStart'): hop_start = packet.get('hopStart', 0) else: hop_start = 0 if hop_start == hop_limit: hop = "Direct" hop_count = 0 else: # set hop to Direct if the message was sent directly otherwise set the hop count if hop_away > 0: hop_count = hop_away else: hop_count = hop_start - hop_limit #print (f"calculated hop count: {hop_start} - {hop_limit} = {hop_count}") hop = f"{hop_count} hops" if help_message in message_string or welcome_message in message_string or "CMD?:" in message_string: # ignore help and welcome messages logger.warning(f"Got Own Welcome/Help header. From: {get_name_from_number(message_from_id, 'long', rxNode)}") return # If the packet is a DM (Direct Message) respond to it, otherwise validate its a message for us on the channel if packet['to'] == myNodeNum1 or packet['to'] == myNodeNum2: # message is DM to us isDM = True # check if the message contains a trap word, DMs are always responded to if (messageTrap(message_string) and not llm_enabled) or messageTrap(message_string.split()[0]): # log the message to the message log logger.info(f"Device:{rxNode} Channel: {channel_number} " + CustomFormatter.green + f"Received DM: " + CustomFormatter.white + f"{message_string} " + CustomFormatter.purple +\ "From: " + CustomFormatter.white + f"{get_name_from_number(message_from_id, 'long', rxNode)}") # respond with DM send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode) else: logger.warning(f"Device:{rxNode} Ignoring DM: {message_string} From: {get_name_from_number(message_from_id, 'long', rxNode)}") send_message(welcome_message, channel_number, message_from_id, rxNode) time.sleep(responseDelay) # log the message to the message log msgLogger.info(f"Device:{rxNode} Channel:{channel_number} | {get_name_from_number(message_from_id, 'long', rxNode)} | " + message_string.replace('\n', '-nl-')) else: # message is on a channel if messageTrap(message_string): if ignoreDefaultChannel and channel_number == publicChannel: logger.debug(f"System: ignoreDefaultChannel CMD:{message_string} From: {get_name_from_number(message_from_id, 'short', rxNode)}") else: # message is for bot to respond to logger.info(f"Device:{rxNode} Channel:{channel_number} " + CustomFormatter.green + "ReceivedChannel: " + CustomFormatter.white + f"{message_string} " + CustomFormatter.purple +\ "From: " + CustomFormatter.white + f"{get_name_from_number(message_from_id, 'long', rxNode)}") if useDMForResponse: # respond to channel message via direct message send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode) else: # or respond to channel message on the channel itself if channel_number == publicChannel and antiSpam: # warning user spamming default channel logger.warning(f"System: AntiSpam protection, sending DM to: {get_name_from_number(message_from_id, 'long', rxNode)}") # respond to channel message via direct message send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, message_from_id, rxNode) else: # respond to channel message on the channel itself send_message(auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM), channel_number, 0, rxNode) else: # message is not for bot to respond to # ignore the message but add it to the message history list if zuluTime: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") else: timestamp = datetime.now().strftime("%Y-%m-%d %I:%M:%S%p") if len(msg_history) < storeFlimit: msg_history.append((get_name_from_number(message_from_id, 'long', rxNode), message_string, channel_number, timestamp, rxNode)) else: msg_history.pop(0) msg_history.append((get_name_from_number(message_from_id, 'long', rxNode), message_string, channel_number, timestamp, rxNode)) # print the message to the log and sdout logger.info(f"Device:{rxNode} Channel:{channel_number} " + CustomFormatter.green + "Ignoring Message:" + CustomFormatter.white +\ f" {message_string} " + CustomFormatter.purple + "From:" + CustomFormatter.white + f" {get_name_from_number(message_from_id)}") if log_messages_to_file: msgLogger.info(f"Device:{rxNode} Channel:{channel_number} | {get_name_from_number(message_from_id, 'long', rxNode)} | " + message_string.replace('\n', '-nl-')) # repeat the message on the other device if repeater_enabled and interface2_enabled: # wait a responseDelay to avoid message collision from lora-ack. time.sleep(responseDelay) rMsg = (f"{message_string} From:{get_name_from_number(message_from_id, 'short', rxNode)}") # if channel found in the repeater list repeat the message if str(channel_number) in repeater_channels: if rxNode == 1: logger.debug(f"Repeating message on Device2 Channel:{channel_number}") send_message(rMsg, channel_number, 0, 2) elif rxNode == 2: logger.debug(f"Repeating message on Device1 Channel:{channel_number}") send_message(rMsg, channel_number, 0, 1) else: # Evaluate non TEXT_MESSAGE_APP packets consumeMetadata(packet, rxNode) except KeyError as e: logger.critical(f"System: Error processing packet: {e} Device:{rxNode}") logger.debug(f"System: Error Packet = {packet}") async def start_rx(): print (CustomFormatter.bold_white + f"\nMeshtastic Autoresponder Bot CTL+C to exit\n" + CustomFormatter.reset) # Start the receive subscriber using pubsub via meshtastic library pub.subscribe(onReceive, 'meshtastic.receive') pub.subscribe(onDisconnect, 'meshtastic.connection.lost') logger.info(f"System: Autoresponder Started for Device1 {get_name_from_number(myNodeNum1, 'long', 1)}," f"{get_name_from_number(myNodeNum1, 'short', 1)}. NodeID: {myNodeNum1}, {decimal_to_hex(myNodeNum1)}") if interface2_enabled: logger.info(f"System: Autoresponder Started for Device2 {get_name_from_number(myNodeNum2, 'long', 2)}," f"{get_name_from_number(myNodeNum2, 'short', 2)}. NodeID: {myNodeNum2}, {decimal_to_hex(myNodeNum2)}") if log_messages_to_file: logger.debug("System: Logging Messages to disk") if syslog_to_file: logger.debug("System: Logging System Logs to disk") if solar_conditions_enabled: logger.debug("System: Celestial Telemetry Enabled") if motd_enabled: logger.debug(f"System: MOTD Enabled using {MOTD}") if sentry_enabled: logger.debug(f"System: Sentry Mode Enabled {sentry_radius}m radius reporting to channel:{secure_channel}") if store_forward_enabled: logger.debug(f"System: Store and Forward Enabled using limit: {storeFlimit}") if useDMForResponse: logger.debug(f"System: Respond by DM only") if repeater_enabled and interface2_enabled: logger.debug(f"System: Repeater Enabled for Channels: {repeater_channels}") if file_monitor_enabled: logger.debug(f"System: File Monitor Enabled for {file_monitor_file_path}, broadcasting to channels: {file_monitor_broadcastCh}") if read_news_enabled: logger.debug(f"System: File Monitor News Reader Enabled for {news_file_path}") if scheduler_enabled: # Examples of using the scheduler, Times here are in 24hr format # https://schedule.readthedocs.io/en/stable/ # Reminder Scheduler is enabled every Monday at noon send a log message schedule.every().monday.at("12:00").do(lambda: logger.info("System: Scheduled Broadcast Reminder")) logger.debug("System: Starting the broadcast scheduler") await BroadcastScheduler() # here we go loopty loo while True: await asyncio.sleep(0.5) pass # Hello World async def main(): meshRxTask = asyncio.create_task(start_rx()) watchdogTask = asyncio.create_task(watchdog()) if file_monitor_enabled: fileMonTask: asyncio.Task = asyncio.create_task(handleFileWatcher()) await asyncio.gather(meshRxTask, watchdogTask) if file_monitor_enabled: await asyncio.gather(fileMonTask) await asyncio.sleep(0.01) try: if __name__ == "__main__": asyncio.run(main()) except KeyboardInterrupt: exit_handler() pass # EOF