Merge pull request #39 from SpudGunMan/case_test

refactor autoresponse logic
This commit is contained in:
Kelly
2024-08-12 11:52:50 -07:00
committed by GitHub
2 changed files with 252 additions and 200 deletions

View File

@@ -10,167 +10,194 @@ from modules.system import *
def auto_response(message, snr, rssi, hop, message_from_id, channel_number, deviceID):
#Auto response to messages
bot_response = ""
if "ping" in message.lower():
#Check if the user added @foo to the message
if "@" in message:
if hop == "Direct":
bot_response = "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}" + " and copy: " + message.split("@")[1]
else:
bot_response = "🏓PONG, " + hop + " and copy: " + message.split("@")[1]
else:
if hop == "Direct":
bot_response = "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}"
else:
bot_response = "🏓PONG, " + hop
elif "pong" in message.lower():
bot_response = "🏓PING!!"
elif "motd" in message.lower():
global MOTD
#check if the user wants to set the motd by using $
if "$" in message:
motd = message.split("$")[1]
MOTD = motd.rstrip()
bot_response = "MOTD Set to: " + MOTD
else:
bot_response = MOTD
elif "bbshelp" in message.lower():
bot_response = bbs_help()
elif "wxalert" in message.lower() or "wxa" in message.lower():
if use_meteo_wxApi:
bot_response = "wxalert is not supported"
else:
location = get_node_location(message_from_id, deviceID)
weatherAlert = getActiveWeatherAlertsDetail(str(location[0]),str(location[1]))
bot_response = weatherAlert
elif "wxc" in message.lower() or "wx" in message.lower():
location = get_node_location(message_from_id, deviceID)
if use_meteo_wxApi and not "wxc" in message.lower() and not use_metric:
logger.debug(f"System: Bot Returning Open-Meteo API for weather imperial")
weather = get_wx_meteo(str(location[0]),str(location[1]))
elif use_meteo_wxApi:
logger.debug(f"System: Bot Returning Open-Meteo API for weather metric")
weather = get_wx_meteo(str(location[0]),str(location[1]),1)
elif not use_meteo_wxApi and "wxc" in message.lower() or use_metric:
logger.debug(f"System: Bot Returning NOAA API for weather metric")
weather = get_weather(str(location[0]),str(location[1]),1)
else:
logger.debug(f"System: Bot Returning NOAA API for weather imperial")
weather = get_weather(str(location[0]),str(location[1]))
bot_response = weather
elif "joke" in message.lower():
bot_response = tell_joke()
elif "bbslist" in message.lower():
bot_response = bbs_list_messages()
elif "bbspost" in message.lower():
# Check if the user added a subject to the message
if "$" in message and not "example:" in message:
subject = message.split("$")[1].split("#")[0]
subject = subject.rstrip()
if "#" in message:
body = message.split("#")[1]
body = body.rstrip()
logger.info(f"System: BBS Post: {subject} Body: {body}")
bot_response = bbs_post_message(subject,body,message_from_id)
elif not "example:" in message:
bot_response = "example: bbspost $subject #message"
# Check if the user added a node number to the message
elif "@" in message and not "example:" in message:
toNode = message.split("@")[1].split("#")[0]
toNode = toNode.rstrip()
# if toNode is a string look for short name and convert to number
if toNode.isalpha() or not toNode.isnumeric():
toNode = get_num_from_short_name(toNode, deviceID)
if toNode == 0:
bot_response = "Node not found " + message.split("@")[1].split("#")[0]
return bot_response
else:
logger.debug(f"System: bbspost, name lookup found: {toNode}")
if "#" in message:
body = message.split("#")[1]
bot_response = bbs_post_dm(toNode, body, message_from_id)
else:
bot_response = "example: bbspost @nodeNumber/ShortName #message"
elif not "example:" in message:
bot_response = "example: bbspost $subject #message, or bbspost @node #message"
elif "bbsread" in message.lower():
# Check if the user added a message number to the message
if "#" in message and not "example:" in message:
messageID = int(message.split("#")[1])
bot_response = bbs_read_message(messageID)
elif not "example:" in message:
bot_response = "Please add a message number example: bbsread #14"
elif "bbsdelete" in message.lower():
# Check if the user added a message number to the message
if "#" in message and not "example:" in message:
messageID = int(message.split("#")[1])
bot_response = bbs_delete_message(messageID, message_from_id)
elif not "example:" in message:
bot_response = "Please add a message number example: bbsdelete #14"
elif "messages" in message.lower():
response = ""
for msgH in msg_history:
# check if the message is from the same interface
if msgH[4] == deviceID:
# check if the message is from the same channel
if msgH[2] == channel_number or msgH[2] == publicChannel:
# consider message safe to send
response += f"\n{msgH[0]}: {msgH[1]}"
if len(response) > 0:
bot_response = "Message History:" + response
else:
bot_response = "No messages in history"
elif "cmd" in message.lower() or "cmd?" in message.lower():
bot_response = help_message
elif "sun" in message.lower():
location = get_node_location(message_from_id, deviceID, channel_number)
bot_response = get_sun(str(location[0]),str(location[1]))
elif "hfcond" in message.lower():
bot_response = hf_band_conditions()
elif "solar" in message.lower():
bot_response = drap_xray_conditions() + "\n" + solar_conditions()
elif "lheard" in message.lower() or "sitrep" in message.lower():
bot_response = "Last heard:\n" + str(get_node_list(1))
chutil1 = interface1.nodes.get(decimal_to_hex(myNodeNum1), {}).get("deviceMetrics", {}).get("channelUtilization", 0)
chutil1 = "{:.2f}".format(chutil1)
if interface2_enabled:
bot_response += "Port2:\n" + str(get_node_list(2))
chutil2 = interface2.nodes.get(decimal_to_hex(myNodeNum2), {}).get("deviceMetrics", {}).get("channelUtilization", 0)
chutil2 = "{:.2f}".format(chutil2)
bot_response += "Ch Use: " + str(chutil1) + "%"
if interface2_enabled:
bot_response += " P2:" + str(chutil2) + "%"
elif "whereami" in message.lower():
location = get_node_location(message_from_id, deviceID, channel_number)
where = where_am_i(str(location[0]),str(location[1]))
bot_response = where
elif "tide" in message.lower():
location = get_node_location(message_from_id, deviceID, channel_number)
tide = get_tide(str(location[0]),str(location[1]))
bot_response = tide
elif "moon" in message.lower():
location = get_node_location(message_from_id, deviceID, channel_number)
moon = get_moon(str(location[0]),str(location[1]))
bot_response = moon
elif "ack" in message.lower():
if hop == "Direct":
bot_response = "🏓ACK-ACK! " + f"SNR:{snr} RSSI:{rssi}"
else:
bot_response = "🏓ACK-ACK! " + hop
elif "testing" in message.lower() or "test" in message.lower():
if hop == "Direct":
bot_response = "🏓Testing 1,2,3 " + f"SNR:{snr} RSSI:{rssi}"
else:
bot_response = "🏓Testing 1,2,3 " + hop
else:
bot_response = "I'm sorry, I'm afraid I can't do that."
message_lower = message.lower()
bot_response = "I'm sorry, I'm afraid I can't do that."
command_handler = {
"ping": lambda: handle_ping(message, hop, snr, rssi),
"pong": lambda: "🏓PING!!",
"motd": lambda: handle_motd(message),
"bbshelp": bbs_help,
"wxalert": lambda: handle_wxalert(message_from_id, deviceID),
"wxa": lambda: handle_wxalert(message_from_id, deviceID),
"wxc": lambda: handle_wxc(message_from_id, deviceID, 'wxc'),
"wx": lambda: handle_wxc(message_from_id, deviceID, 'wx'),
"joke": tell_joke,
"bbslist": bbs_list_messages,
"bbspost": lambda: handle_bbspost(message, message_from_id, deviceID),
"bbsread": lambda: handle_bbsread(message),
"bbsdelete": lambda: handle_bbsdelete(message, message_from_id),
"messages": lambda: handle_messages(deviceID, channel_number, msg_history, publicChannel),
"cmd": lambda: help_message,
"cmd?": lambda: help_message,
"sun": lambda: handle_sun(message_from_id, deviceID, channel_number),
"hfcond": hf_band_conditions,
"solar": lambda: drap_xray_conditions() + "\n" + solar_conditions(),
"lheard": lambda: handle_lheard(),
"sitrep": lambda: handle_lheard(),
"whereami": lambda: handle_whereami(message_from_id, deviceID, channel_number),
"tide": lambda: handle_tide(message_from_id, deviceID, channel_number),
"moon": lambda: handle_moon(message_from_id, deviceID, channel_number),
"ack": lambda: handle_ack(hop, snr, rssi),
"testing": lambda: handle_testing(hop, snr, rssi),
"test": lambda: handle_testing(hop, snr, rssi),
}
cmds = [] # list to hold the commands found in the message
for key in command_handler:
if key in message_lower.split(' '):
cmds.append({'cmd': key, 'index': message_lower.index(key)})
if len(cmds) > 0:
# sort the commands by index value
cmds = sorted(cmds, key=lambda k: k['index'])
logger.debug(f"System: Bot Detected: {cmds}")
# run the first command after sorting
bot_response = command_handler[cmds[0]['cmd']]()
# wait a 700ms to avoid message collision from lora-ack
time.sleep(0.7)
return bot_response
def handle_ping(message, hop, snr, rssi):
if "@" in message:
if hop == "Direct":
return "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}" + " and copy: " + message.split("@")[1]
else:
return "🏓PONG, " + hop + " and copy: " + message.split("@")[1]
else:
if hop == "Direct":
return "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}"
else:
return "🏓PONG, " + hop
def handle_motd(message):
global MOTD
if "$" in message:
motd = message.split("$")[1]
MOTD = motd.rstrip()
return "MOTD Set to: " + MOTD
else:
return MOTD
def handle_wxalert(message_from_id, deviceID):
if use_meteo_wxApi:
return "wxalert is not supported"
else:
location = get_node_location(message_from_id, deviceID)
weatherAlert = getActiveWeatherAlertsDetail(str(location[0]), str(location[1]))
return weatherAlert
def handle_wxc(message_from_id, deviceID, cmd):
location = get_node_location(message_from_id, deviceID)
if use_meteo_wxApi and not "wxc" in cmd and not use_metric:
logger.debug(f"System: Bot Returning Open-Meteo API for weather imperial")
weather = get_wx_meteo(str(location[0]), str(location[1]))
elif use_meteo_wxApi:
logger.debug(f"System: Bot Returning Open-Meteo API for weather metric")
weather = get_wx_meteo(str(location[0]), str(location[1]), 1)
elif not use_meteo_wxApi and "wxc" in cmd or use_metric:
logger.debug(f"System: Bot Returning NOAA API for weather metric")
weather = get_weather(str(location[0]), str(location[1]), 1)
else:
logger.debug(f"System: Bot Returning NOAA API for weather imperial")
weather = get_weather(str(location[0]), str(location[1]))
return weather
def handle_bbspost(message, message_from_id, deviceID):
if "$" in message and not "example:" in message:
subject = message.split("$")[1].split("#")[0]
subject = subject.rstrip()
if "#" in message:
body = message.split("#")[1]
body = body.rstrip()
logger.info(f"System: BBS Post: {subject} Body: {body}")
return bbs_post_message(subject, body, message_from_id)
elif not "example:" in message:
return "example: bbspost $subject #message"
elif "@" in message and not "example:" in message:
toNode = message.split("@")[1].split("#")[0]
toNode = toNode.rstrip()
if toNode.isalpha() or not toNode.isnumeric():
toNode = get_num_from_short_name(toNode, deviceID)
if toNode == 0:
return "Node not found " + message.split("@")[1].split("#")[0]
else:
logger.debug(f"System: bbspost, name lookup found: {toNode}")
if "#" in message:
body = message.split("#")[1]
return bbs_post_dm(toNode, body, message_from_id)
else:
return "example: bbspost @nodeNumber/ShortName #message"
elif not "example:" in message:
return "example: bbspost $subject #message, or bbspost @node #message"
def handle_bbsread(message):
if "#" in message and not "example:" in message:
messageID = int(message.split("#")[1])
return bbs_read_message(messageID)
elif not "example:" in message:
return "Please add a message number example: bbsread #14"
def handle_bbsdelete(message, message_from_id):
if "#" in message and not "example:" in message:
messageID = int(message.split("#")[1])
return bbs_delete_message(messageID, message_from_id)
elif not "example:" in message:
return "Please add a message number example: bbsdelete #14"
def handle_messages(deviceID, channel_number, msg_history, publicChannel):
response = ""
for msgH in msg_history:
if msgH[4] == deviceID:
if msgH[2] == channel_number or msgH[2] == publicChannel:
response += f"\n{msgH[0]}: {msgH[1]}"
if len(response) > 0:
return "Message History:" + response
else:
return "No messages in history"
def handle_sun(message_from_id, deviceID, channel_number):
location = get_node_location(message_from_id, deviceID, channel_number)
return get_sun(str(location[0]), str(location[1]))
def handle_lheard():
bot_response = "Last heard:\n" + str(get_node_list(1))
chutil1 = interface1.nodes.get(decimal_to_hex(myNodeNum1), {}).get("deviceMetrics", {}).get("channelUtilization", 0)
chutil1 = "{:.2f}".format(chutil1)
if interface2_enabled:
bot_response += "Port2:\n" + str(get_node_list(2))
chutil2 = interface2.nodes.get(decimal_to_hex(myNodeNum2), {}).get("deviceMetrics", {}).get("channelUtilization", 0)
chutil2 = "{:.2f}".format(chutil2)
bot_response += "Ch Use: " + str(chutil1) + "%"
if interface2_enabled:
bot_response += " P2:" + str(chutil2) + "%"
return bot_response
def handle_whereami(message_from_id, deviceID, channel_number):
location = get_node_location(message_from_id, deviceID, channel_number)
return where_am_i(str(location[0]), str(location[1]))
def handle_tide(message_from_id, deviceID, channel_number):
location = get_node_location(message_from_id, deviceID, channel_number)
return get_tide(str(location[0]), str(location[1]))
def handle_moon(message_from_id, deviceID, channel_number):
location = get_node_location(message_from_id, deviceID, channel_number)
return get_moon(str(location[0]), str(location[1]))
def handle_ack(hop, snr, rssi):
if hop == "Direct":
return "🏓ACK-ACK! " + f"SNR:{snr} RSSI:{rssi}"
else:
return "🏓ACK-ACK! " + hop
def handle_testing(hop, snr, rssi):
if hop == "Direct":
return "🏓Testing 1,2,3 " + f"SNR:{snr} RSSI:{rssi}"
else:
return "🏓Testing 1,2,3 " + hop
def onReceive(packet, interface):
# extract interface defailts from interface object
rxType = type(interface).__name__

View File

@@ -10,56 +10,81 @@ from modules.system import *
def auto_response(message, snr, rssi, hop, message_from_id, channel_number, deviceID):
# Auto response to messages
if "ping" in message.lower():
# Check if the user added @foo to the message
if "@" in message:
if hop == "Direct":
bot_response = "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}" + " and copy: " + message.split("@")[1]
else:
bot_response = "🏓PONG, " + hop + " and copy: " + message.split("@")[1]
else:
if hop == "Direct":
bot_response = "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}"
else:
bot_response = "🏓PONG, " + hop
elif "pong" in message.lower():
bot_response = "🏓Ping!!"
elif "motd" in message.lower():
#check if the user wants to set the motd by using $
if "$" in message:
motd = message.split("$")[1]
MOTD = motd.rstrip()
bot_response = "MOTD Set to: " + MOTD
else:
bot_response = MOTD
elif "cmd" in message.lower() or "cmd?" in message.lower():
bot_response = help_message
elif "lheard" in message.lower() or "sitrep" in message.lower():
bot_response = "Last heard:\n" + str(get_node_list(1))
chutil1 = interface1.nodes.get(decimal_to_hex(myNodeNum1), {}).get("deviceMetrics", {}).get("channelUtilization", 0)
chutil1 = "{:.2f}".format(chutil1)
if interface2_enabled:
bot_response += "Port2:\n" + str(get_node_list(2))
chutil2 = interface2.nodes.get(decimal_to_hex(myNodeNum2), {}).get("deviceMetrics", {}).get("channelUtilization", 0)
chutil2 = "{:.2f}".format(chutil2)
elif "ack" in message.lower():
if hop == "Direct":
bot_response = "🏓ACK-ACK! " + f"SNR:{snr} RSSI:{rssi}"
else:
bot_response = "🏓ACK-ACK! " + hop
elif "testing" in message.lower() or "test" in message.lower():
if hop == "Direct":
bot_response = "🏓Testing 1,2,3 " + f"SNR:{snr} RSSI:{rssi}"
else:
bot_response = "🏓Testing 1,2,3 " + hop
else:
bot_response = "I'm sorry, I'm afraid I can't do that."
message_lower = message.lower()
bot_response = "I'm sorry, I'm afraid I can't do that."
command_handler = {
"ping": lambda: handle_ping(message, hop, snr, rssi),
"pong": lambda: "🏓Ping!!",
"motd": lambda: handle_motd(message, MOTD),
"cmd": lambda: help_message,
"cmd?": lambda: help_message,
"lheard": lambda: handle_lheard(interface1, interface2_enabled, myNodeNum1, myNodeNum2),
"sitrep": lambda: handle_lheard(interface1, interface2_enabled, myNodeNum1, myNodeNum2),
"ack": lambda: handle_ack(hop, snr, rssi),
"testing": lambda: handle_testing(hop, snr, rssi),
"test": lambda: handle_testing(hop, snr, rssi),
}
cmds = [] # list to hold the commands found in the message
for key in command_handler:
if key in message_lower.split(' '):
cmds.append({'cmd': key, 'index': message_lower.index(key)})
if len(cmds) > 0:
# sort the commands by index value
cmds = sorted(cmds, key=lambda k: k['index'])
logger.debug(f"System: Bot Detected: {cmds}")
# run the first command after sorting
bot_response = command_handler[cmds[0]['cmd']]()
# wait a 700ms to avoid message collision from lora-ack
time.sleep(0.7)
return bot_response
def handle_ping(message, hop, snr, rssi):
if "@" in message:
if hop == "Direct":
return "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}" + " and copy: " + message.split("@")[1]
else:
return "🏓PONG, " + hop + " and copy: " + message.split("@")[1]
else:
if hop == "Direct":
return "🏓PONG, " + f"SNR:{snr} RSSI:{rssi}"
else:
return "🏓PONG, " + hop
def handle_motd(message):
global MOTD
if "$" in message:
motd = message.split("$")[1]
MOTD = motd.rstrip()
return "MOTD Set to: " + MOTD
else:
return MOTD
def handle_lheard(interface1, interface2_enabled, myNodeNum1, myNodeNum2):
bot_response = "Last heard:\n" + str(get_node_list(1))
chutil1 = interface1.nodes.get(decimal_to_hex(myNodeNum1), {}).get("deviceMetrics", {}).get("channelUtilization", 0)
chutil1 = "{:.2f}".format(chutil1)
if interface2_enabled:
bot_response += "Port2:\n" + str(get_node_list(2))
chutil2 = interface2.nodes.get(decimal_to_hex(myNodeNum2), {}).get("deviceMetrics", {}).get("channelUtilization", 0)
chutil2 = "{:.2f}".format(chutil2)
return bot_response
def handle_ack(hop, snr, rssi):
if hop == "Direct":
return "🏓ACK-ACK! " + f"SNR:{snr} RSSI:{rssi}"
else:
return "🏓ACK-ACK! " + hop
def handle_testing(hop, snr, rssi):
if hop == "Direct":
return "🏓Testing 1,2,3 " + f"SNR:{snr} RSSI:{rssi}"
else:
return "🏓Testing 1,2,3 " + hop
def onReceive(packet, interface):
# extract interface defailts from interface object
rxType = type(interface).__name__