diff --git a/mesh_bot.py b/mesh_bot.py index b1bae7f..e895bf9 100755 --- a/mesh_bot.py +++ b/mesh_bot.py @@ -10,167 +10,194 @@ from modules.system import * def auto_response(message, snr, rssi, hop, message_from_id, channel_number, deviceID): #Auto response to messages - bot_response = "" - if "ping" in message.lower(): - #Check if the user added @foo to the message - if "@" in message: - if hop == "Direct": - bot_response = "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}" + " and copy: " + message.split("@")[1] - else: - bot_response = "🏓PONG, " + hop + " and copy: " + message.split("@")[1] - else: - if hop == "Direct": - bot_response = "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}" - else: - bot_response = "🏓PONG, " + hop - elif "pong" in message.lower(): - bot_response = "🏓PING!!" - elif "motd" in message.lower(): - global MOTD - #check if the user wants to set the motd by using $ - if "$" in message: - motd = message.split("$")[1] - MOTD = motd.rstrip() - bot_response = "MOTD Set to: " + MOTD - else: - bot_response = MOTD - elif "bbshelp" in message.lower(): - bot_response = bbs_help() - elif "wxalert" in message.lower() or "wxa" in message.lower(): - if use_meteo_wxApi: - bot_response = "wxalert is not supported" - else: - location = get_node_location(message_from_id, deviceID) - weatherAlert = getActiveWeatherAlertsDetail(str(location[0]),str(location[1])) - bot_response = weatherAlert - elif "wxc" in message.lower() or "wx" in message.lower(): - location = get_node_location(message_from_id, deviceID) - if use_meteo_wxApi and not "wxc" in message.lower() and not use_metric: - logger.debug(f"System: Bot Returning Open-Meteo API for weather imperial") - weather = get_wx_meteo(str(location[0]),str(location[1])) - elif use_meteo_wxApi: - logger.debug(f"System: Bot Returning Open-Meteo API for weather metric") - weather = get_wx_meteo(str(location[0]),str(location[1]),1) - elif not use_meteo_wxApi and "wxc" in message.lower() or use_metric: - logger.debug(f"System: Bot Returning NOAA API for weather metric") - weather = get_weather(str(location[0]),str(location[1]),1) - else: - logger.debug(f"System: Bot Returning NOAA API for weather imperial") - weather = get_weather(str(location[0]),str(location[1])) - bot_response = weather - elif "joke" in message.lower(): - bot_response = tell_joke() - elif "bbslist" in message.lower(): - bot_response = bbs_list_messages() - elif "bbspost" in message.lower(): - # Check if the user added a subject to the message - if "$" in message and not "example:" in message: - subject = message.split("$")[1].split("#")[0] - subject = subject.rstrip() - if "#" in message: - body = message.split("#")[1] - body = body.rstrip() - logger.info(f"System: BBS Post: {subject} Body: {body}") - bot_response = bbs_post_message(subject,body,message_from_id) - elif not "example:" in message: - bot_response = "example: bbspost $subject #message" - # Check if the user added a node number to the message - elif "@" in message and not "example:" in message: - toNode = message.split("@")[1].split("#")[0] - toNode = toNode.rstrip() - # if toNode is a string look for short name and convert to number - if toNode.isalpha() or not toNode.isnumeric(): - toNode = get_num_from_short_name(toNode, deviceID) - if toNode == 0: - bot_response = "Node not found " + message.split("@")[1].split("#")[0] - return bot_response - else: - logger.debug(f"System: bbspost, name lookup found: {toNode}") - if "#" in message: - body = message.split("#")[1] - bot_response = bbs_post_dm(toNode, body, message_from_id) - else: - bot_response = "example: bbspost @nodeNumber/ShortName #message" - elif not "example:" in message: - bot_response = "example: bbspost $subject #message, or bbspost @node #message" - elif "bbsread" in message.lower(): - # Check if the user added a message number to the message - if "#" in message and not "example:" in message: - messageID = int(message.split("#")[1]) - bot_response = bbs_read_message(messageID) - elif not "example:" in message: - bot_response = "Please add a message number example: bbsread #14" - elif "bbsdelete" in message.lower(): - # Check if the user added a message number to the message - if "#" in message and not "example:" in message: - messageID = int(message.split("#")[1]) - bot_response = bbs_delete_message(messageID, message_from_id) - elif not "example:" in message: - bot_response = "Please add a message number example: bbsdelete #14" - elif "messages" in message.lower(): - response = "" - for msgH in msg_history: - # check if the message is from the same interface - if msgH[4] == deviceID: - # check if the message is from the same channel - if msgH[2] == channel_number or msgH[2] == publicChannel: - # consider message safe to send - response += f"\n{msgH[0]}: {msgH[1]}" - - if len(response) > 0: - bot_response = "Message History:" + response - else: - bot_response = "No messages in history" - elif "cmd" in message.lower() or "cmd?" in message.lower(): - bot_response = help_message - elif "sun" in message.lower(): - location = get_node_location(message_from_id, deviceID, channel_number) - bot_response = get_sun(str(location[0]),str(location[1])) - elif "hfcond" in message.lower(): - bot_response = hf_band_conditions() - elif "solar" in message.lower(): - bot_response = drap_xray_conditions() + "\n" + solar_conditions() - elif "lheard" in message.lower() or "sitrep" in message.lower(): - bot_response = "Last heard:\n" + str(get_node_list(1)) - chutil1 = interface1.nodes.get(decimal_to_hex(myNodeNum1), {}).get("deviceMetrics", {}).get("channelUtilization", 0) - chutil1 = "{:.2f}".format(chutil1) - if interface2_enabled: - bot_response += "Port2:\n" + str(get_node_list(2)) - chutil2 = interface2.nodes.get(decimal_to_hex(myNodeNum2), {}).get("deviceMetrics", {}).get("channelUtilization", 0) - chutil2 = "{:.2f}".format(chutil2) - bot_response += "Ch Use: " + str(chutil1) + "%" - if interface2_enabled: - bot_response += " P2:" + str(chutil2) + "%" - elif "whereami" in message.lower(): - location = get_node_location(message_from_id, deviceID, channel_number) - where = where_am_i(str(location[0]),str(location[1])) - bot_response = where - elif "tide" in message.lower(): - location = get_node_location(message_from_id, deviceID, channel_number) - tide = get_tide(str(location[0]),str(location[1])) - bot_response = tide - elif "moon" in message.lower(): - location = get_node_location(message_from_id, deviceID, channel_number) - moon = get_moon(str(location[0]),str(location[1])) - bot_response = moon - elif "ack" in message.lower(): - if hop == "Direct": - bot_response = "🏓ACK-ACK! " + f"SNR:{snr} RSSI:{rssi}" - else: - bot_response = "🏓ACK-ACK! " + hop - elif "testing" in message.lower() or "test" in message.lower(): - if hop == "Direct": - bot_response = "🏓Testing 1,2,3 " + f"SNR:{snr} RSSI:{rssi}" - else: - bot_response = "🏓Testing 1,2,3 " + hop - else: - bot_response = "I'm sorry, I'm afraid I can't do that." - + message_lower = message.lower() + bot_response = "I'm sorry, I'm afraid I can't do that." + + command_handler = { + "ping": lambda: handle_ping(message, hop, snr, rssi), + "pong": lambda: "🏓PING!!", + "motd": lambda: handle_motd(message), + "bbshelp": bbs_help, + "wxalert": lambda: handle_wxalert(message_from_id, deviceID), + "wxa": lambda: handle_wxalert(message_from_id, deviceID), + "wxc": lambda: handle_wxc(message_from_id, deviceID, 'wxc'), + "wx": lambda: handle_wxc(message_from_id, deviceID, 'wx'), + "joke": tell_joke, + "bbslist": bbs_list_messages, + "bbspost": lambda: handle_bbspost(message, message_from_id, deviceID), + "bbsread": lambda: handle_bbsread(message), + "bbsdelete": lambda: handle_bbsdelete(message, message_from_id), + "messages": lambda: handle_messages(deviceID, channel_number, msg_history, publicChannel), + "cmd": lambda: help_message, + "cmd?": lambda: help_message, + "sun": lambda: handle_sun(message_from_id, deviceID, channel_number), + "hfcond": hf_band_conditions, + "solar": lambda: drap_xray_conditions() + "\n" + solar_conditions(), + "lheard": lambda: handle_lheard(), + "sitrep": lambda: handle_lheard(), + "whereami": lambda: handle_whereami(message_from_id, deviceID, channel_number), + "tide": lambda: handle_tide(message_from_id, deviceID, channel_number), + "moon": lambda: handle_moon(message_from_id, deviceID, channel_number), + "ack": lambda: handle_ack(hop, snr, rssi), + "testing": lambda: handle_testing(hop, snr, rssi), + "test": lambda: handle_testing(hop, snr, rssi), + } + cmds = [] # list to hold the commands found in the message + for key in command_handler: + if key in message_lower.split(' '): + cmds.append({'cmd': key, 'index': message_lower.index(key)}) + + if len(cmds) > 0: + # sort the commands by index value + cmds = sorted(cmds, key=lambda k: k['index']) + logger.debug(f"System: Bot Detected: {cmds}") + # run the first command after sorting + bot_response = command_handler[cmds[0]['cmd']]() + # wait a 700ms to avoid message collision from lora-ack time.sleep(0.7) return bot_response +def handle_ping(message, hop, snr, rssi): + if "@" in message: + if hop == "Direct": + return "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}" + " and copy: " + message.split("@")[1] + else: + return "🏓PONG, " + hop + " and copy: " + message.split("@")[1] + else: + if hop == "Direct": + return "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}" + else: + return "🏓PONG, " + hop + +def handle_motd(message): + global MOTD + if "$" in message: + motd = message.split("$")[1] + MOTD = motd.rstrip() + return "MOTD Set to: " + MOTD + else: + return MOTD + +def handle_wxalert(message_from_id, deviceID): + if use_meteo_wxApi: + return "wxalert is not supported" + else: + location = get_node_location(message_from_id, deviceID) + weatherAlert = getActiveWeatherAlertsDetail(str(location[0]), str(location[1])) + return weatherAlert + +def handle_wxc(message_from_id, deviceID, cmd): + location = get_node_location(message_from_id, deviceID) + if use_meteo_wxApi and not "wxc" in cmd and not use_metric: + logger.debug(f"System: Bot Returning Open-Meteo API for weather imperial") + weather = get_wx_meteo(str(location[0]), str(location[1])) + elif use_meteo_wxApi: + logger.debug(f"System: Bot Returning Open-Meteo API for weather metric") + weather = get_wx_meteo(str(location[0]), str(location[1]), 1) + elif not use_meteo_wxApi and "wxc" in cmd or use_metric: + logger.debug(f"System: Bot Returning NOAA API for weather metric") + weather = get_weather(str(location[0]), str(location[1]), 1) + else: + logger.debug(f"System: Bot Returning NOAA API for weather imperial") + weather = get_weather(str(location[0]), str(location[1])) + return weather + +def handle_bbspost(message, message_from_id, deviceID): + if "$" in message and not "example:" in message: + subject = message.split("$")[1].split("#")[0] + subject = subject.rstrip() + if "#" in message: + body = message.split("#")[1] + body = body.rstrip() + logger.info(f"System: BBS Post: {subject} Body: {body}") + return bbs_post_message(subject, body, message_from_id) + elif not "example:" in message: + return "example: bbspost $subject #message" + elif "@" in message and not "example:" in message: + toNode = message.split("@")[1].split("#")[0] + toNode = toNode.rstrip() + if toNode.isalpha() or not toNode.isnumeric(): + toNode = get_num_from_short_name(toNode, deviceID) + if toNode == 0: + return "Node not found " + message.split("@")[1].split("#")[0] + else: + logger.debug(f"System: bbspost, name lookup found: {toNode}") + if "#" in message: + body = message.split("#")[1] + return bbs_post_dm(toNode, body, message_from_id) + else: + return "example: bbspost @nodeNumber/ShortName #message" + elif not "example:" in message: + return "example: bbspost $subject #message, or bbspost @node #message" + +def handle_bbsread(message): + if "#" in message and not "example:" in message: + messageID = int(message.split("#")[1]) + return bbs_read_message(messageID) + elif not "example:" in message: + return "Please add a message number example: bbsread #14" + +def handle_bbsdelete(message, message_from_id): + if "#" in message and not "example:" in message: + messageID = int(message.split("#")[1]) + return bbs_delete_message(messageID, message_from_id) + elif not "example:" in message: + return "Please add a message number example: bbsdelete #14" + +def handle_messages(deviceID, channel_number, msg_history, publicChannel): + response = "" + for msgH in msg_history: + if msgH[4] == deviceID: + if msgH[2] == channel_number or msgH[2] == publicChannel: + response += f"\n{msgH[0]}: {msgH[1]}" + if len(response) > 0: + return "Message History:" + response + else: + return "No messages in history" + +def handle_sun(message_from_id, deviceID, channel_number): + location = get_node_location(message_from_id, deviceID, channel_number) + return get_sun(str(location[0]), str(location[1])) + +def handle_lheard(): + bot_response = "Last heard:\n" + str(get_node_list(1)) + chutil1 = interface1.nodes.get(decimal_to_hex(myNodeNum1), {}).get("deviceMetrics", {}).get("channelUtilization", 0) + chutil1 = "{:.2f}".format(chutil1) + if interface2_enabled: + bot_response += "Port2:\n" + str(get_node_list(2)) + chutil2 = interface2.nodes.get(decimal_to_hex(myNodeNum2), {}).get("deviceMetrics", {}).get("channelUtilization", 0) + chutil2 = "{:.2f}".format(chutil2) + bot_response += "Ch Use: " + str(chutil1) + "%" + if interface2_enabled: + bot_response += " P2:" + str(chutil2) + "%" + return bot_response + +def handle_whereami(message_from_id, deviceID, channel_number): + location = get_node_location(message_from_id, deviceID, channel_number) + return where_am_i(str(location[0]), str(location[1])) + +def handle_tide(message_from_id, deviceID, channel_number): + location = get_node_location(message_from_id, deviceID, channel_number) + return get_tide(str(location[0]), str(location[1])) + +def handle_moon(message_from_id, deviceID, channel_number): + location = get_node_location(message_from_id, deviceID, channel_number) + return get_moon(str(location[0]), str(location[1])) + +def handle_ack(hop, snr, rssi): + if hop == "Direct": + return "🏓ACK-ACK! " + f"SNR:{snr} RSSI:{rssi}" + else: + return "🏓ACK-ACK! " + hop + +def handle_testing(hop, snr, rssi): + if hop == "Direct": + return "🏓Testing 1,2,3 " + f"SNR:{snr} RSSI:{rssi}" + else: + return "🏓Testing 1,2,3 " + hop + def onReceive(packet, interface): # extract interface defailts from interface object rxType = type(interface).__name__ diff --git a/pong_bot.py b/pong_bot.py index fbb1bea..a7c4796 100755 --- a/pong_bot.py +++ b/pong_bot.py @@ -10,56 +10,81 @@ from modules.system import * def auto_response(message, snr, rssi, hop, message_from_id, channel_number, deviceID): # Auto response to messages - if "ping" in message.lower(): - # Check if the user added @foo to the message - if "@" in message: - if hop == "Direct": - bot_response = "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}" + " and copy: " + message.split("@")[1] - else: - bot_response = "🏓PONG, " + hop + " and copy: " + message.split("@")[1] - else: - if hop == "Direct": - bot_response = "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}" - else: - bot_response = "🏓PONG, " + hop - elif "pong" in message.lower(): - bot_response = "🏓Ping!!" - elif "motd" in message.lower(): - #check if the user wants to set the motd by using $ - if "$" in message: - motd = message.split("$")[1] - MOTD = motd.rstrip() - bot_response = "MOTD Set to: " + MOTD - else: - bot_response = MOTD - elif "cmd" in message.lower() or "cmd?" in message.lower(): - bot_response = help_message - elif "lheard" in message.lower() or "sitrep" in message.lower(): - bot_response = "Last heard:\n" + str(get_node_list(1)) - chutil1 = interface1.nodes.get(decimal_to_hex(myNodeNum1), {}).get("deviceMetrics", {}).get("channelUtilization", 0) - chutil1 = "{:.2f}".format(chutil1) - if interface2_enabled: - bot_response += "Port2:\n" + str(get_node_list(2)) - chutil2 = interface2.nodes.get(decimal_to_hex(myNodeNum2), {}).get("deviceMetrics", {}).get("channelUtilization", 0) - chutil2 = "{:.2f}".format(chutil2) - elif "ack" in message.lower(): - if hop == "Direct": - bot_response = "🏓ACK-ACK! " + f"SNR:{snr} RSSI:{rssi}" - else: - bot_response = "🏓ACK-ACK! " + hop - elif "testing" in message.lower() or "test" in message.lower(): - if hop == "Direct": - bot_response = "🏓Testing 1,2,3 " + f"SNR:{snr} RSSI:{rssi}" - else: - bot_response = "🏓Testing 1,2,3 " + hop - else: - bot_response = "I'm sorry, I'm afraid I can't do that." + message_lower = message.lower() + bot_response = "I'm sorry, I'm afraid I can't do that." + + command_handler = { + "ping": lambda: handle_ping(message, hop, snr, rssi), + "pong": lambda: "🏓Ping!!", + "motd": lambda: handle_motd(message, MOTD), + "cmd": lambda: help_message, + "cmd?": lambda: help_message, + "lheard": lambda: handle_lheard(interface1, interface2_enabled, myNodeNum1, myNodeNum2), + "sitrep": lambda: handle_lheard(interface1, interface2_enabled, myNodeNum1, myNodeNum2), + "ack": lambda: handle_ack(hop, snr, rssi), + "testing": lambda: handle_testing(hop, snr, rssi), + "test": lambda: handle_testing(hop, snr, rssi), + } + cmds = [] # list to hold the commands found in the message + for key in command_handler: + if key in message_lower.split(' '): + cmds.append({'cmd': key, 'index': message_lower.index(key)}) + + if len(cmds) > 0: + # sort the commands by index value + cmds = sorted(cmds, key=lambda k: k['index']) + logger.debug(f"System: Bot Detected: {cmds}") + # run the first command after sorting + bot_response = command_handler[cmds[0]['cmd']]() # wait a 700ms to avoid message collision from lora-ack time.sleep(0.7) return bot_response +def handle_ping(message, hop, snr, rssi): + if "@" in message: + if hop == "Direct": + return "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}" + " and copy: " + message.split("@")[1] + else: + return "🏓PONG, " + hop + " and copy: " + message.split("@")[1] + else: + if hop == "Direct": + return "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}" + else: + return "🏓PONG, " + hop + +def handle_motd(message): + global MOTD + if "$" in message: + motd = message.split("$")[1] + MOTD = motd.rstrip() + return "MOTD Set to: " + MOTD + else: + return MOTD + +def handle_lheard(interface1, interface2_enabled, myNodeNum1, myNodeNum2): + bot_response = "Last heard:\n" + str(get_node_list(1)) + chutil1 = interface1.nodes.get(decimal_to_hex(myNodeNum1), {}).get("deviceMetrics", {}).get("channelUtilization", 0) + chutil1 = "{:.2f}".format(chutil1) + if interface2_enabled: + bot_response += "Port2:\n" + str(get_node_list(2)) + chutil2 = interface2.nodes.get(decimal_to_hex(myNodeNum2), {}).get("deviceMetrics", {}).get("channelUtilization", 0) + chutil2 = "{:.2f}".format(chutil2) + return bot_response + +def handle_ack(hop, snr, rssi): + if hop == "Direct": + return "🏓ACK-ACK! " + f"SNR:{snr} RSSI:{rssi}" + else: + return "🏓ACK-ACK! " + hop + +def handle_testing(hop, snr, rssi): + if hop == "Direct": + return "🏓Testing 1,2,3 " + f"SNR:{snr} RSSI:{rssi}" + else: + return "🏓Testing 1,2,3 " + hop + def onReceive(packet, interface): # extract interface defailts from interface object rxType = type(interface).__name__