Merge pull request #26 from rfschmid/add-traceroute-support

Add traceroute support
This commit is contained in:
pdxlocations
2025-01-16 11:37:56 -08:00
committed by GitHub
5 changed files with 135 additions and 18 deletions

View File

@@ -1,7 +1,7 @@
import sqlite3
import globals
import time
from utilities.utils import get_nodeNum, get_name_from_number
from utilities.utils import get_name_from_number
def get_table_name(channel):
# Construct the table name

View File

@@ -1,5 +1,5 @@
from meshtastic import BROADCAST_NUM
from utilities.utils import get_node_list, decimal_to_hex, get_nodeNum
from utilities.utils import get_node_list, decimal_to_hex, get_name_from_number
import globals
from ui.curses_ui import draw_packetlog_win, draw_node_list, draw_messages_window, draw_channel_list, add_notification
from db_handler import save_message_to_db, maybe_store_nodeinfo_in_db
@@ -51,18 +51,12 @@ def on_receive(packet, interface):
# Add received message to the messages list
message_from_id = packet['from']
message_from_string = ""
for node in globals.interface.nodes.values():
if message_from_id == node['num']:
message_from_string = node["user"]["shortName"] + ":" # Get the name using the node ID
break
else:
message_from_string = str(decimal_to_hex(message_from_id)) # If long name not found, use the ID as string
if globals.channel_list[channel_number] in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]].append((f"{globals.message_prefix} {message_from_string} ", message_string))
else:
globals.all_messages[globals.channel_list[channel_number]] = [(f"{globals.message_prefix} {message_from_string} ", message_string)]
message_from_string = get_name_from_number(message_from_id, type='short') + ":"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
globals.all_messages[globals.channel_list[channel_number]].append((f"{globals.message_prefix} {message_from_string} ", message_string))
draw_channel_list()
draw_messages_window()

View File

@@ -1,7 +1,9 @@
from meshtastic import BROADCAST_NUM
from db_handler import save_message_to_db, update_ack_nak
from utilities.utils import get_nodeNum
from meshtastic.protobuf import mesh_pb2, portnums_pb2
from utilities.utils import get_name_from_number
import globals
import google.protobuf.json_format
ack_naks = {}
@@ -35,6 +37,70 @@ def onAckNak(packet):
draw_messages_window()
def on_response_traceroute(packet):
"""on response for trace route"""
from ui.curses_ui import draw_channel_list, draw_messages_window, add_notification
UNK_SNR = -128 # Value representing unknown SNR
route_discovery = mesh_pb2.RouteDiscovery()
route_discovery.ParseFromString(packet["decoded"]["payload"])
msg_dict = google.protobuf.json_format.MessageToDict(route_discovery)
msg_str = "Traceroute to:\n"
route_str = get_name_from_number(packet["to"], 'short') or f"{packet['to']:08x}" # Start with destination of response
# SNR list should have one more entry than the route, as the final destination adds its SNR also
lenTowards = 0 if "route" not in msg_dict else len(msg_dict["route"])
snrTowardsValid = "snrTowards" in msg_dict and len(msg_dict["snrTowards"]) == lenTowards + 1
if lenTowards > 0: # Loop through hops in route and add SNR if available
for idx, node_num in enumerate(msg_dict["route"]):
route_str += " --> " + (get_name_from_number(node_num, 'short') or f"{node_num:08x}") \
+ " (" + (str(msg_dict["snrTowards"][idx] / 4) if snrTowardsValid and msg_dict["snrTowards"][idx] != UNK_SNR else "?") + "dB)"
# End with origin of response
route_str += " --> " + (get_name_from_number(packet["from"], 'short') or f"{packet['from']:08x}") \
+ " (" + (str(msg_dict["snrTowards"][-1] / 4) if snrTowardsValid and msg_dict["snrTowards"][-1] != UNK_SNR else "?") + "dB)"
msg_str += route_str + "\n" # Print the route towards destination
# Only if hopStart is set and there is an SNR entry (for the origin) it's valid, even though route might be empty (direct connection)
lenBack = 0 if "routeBack" not in msg_dict else len(msg_dict["routeBack"])
backValid = "hopStart" in packet and "snrBack" in msg_dict and len(msg_dict["snrBack"]) == lenBack + 1
if backValid:
msg_str += "Back:\n"
route_str = get_name_from_number(packet["from"], 'short') or f"{packet['from']:08x}" # Start with origin of response
if lenBack > 0: # Loop through hops in routeBack and add SNR if available
for idx, node_num in enumerate(msg_dict["routeBack"]):
route_str += " --> " + (get_name_from_number(node_num, 'short') or f"{node_num:08x}") \
+ " (" + (str(msg_dict["snrBack"][idx] / 4) if msg_dict["snrBack"][idx] != UNK_SNR else "?") + "dB)"
# End with destination of response (us)
route_str += " --> " + (get_name_from_number(packet["to"], 'short') or f"{packet['to']:08x}") \
+ " (" + (str(msg_dict["snrBack"][-1] / 4) if msg_dict["snrBack"][-1] != UNK_SNR else "?") + "dB)"
msg_str += route_str + "\n" # Print the route back to us
if(packet['from'] not in globals.channel_list):
globals.channel_list.append(packet['from'])
channel_number = globals.channel_list.index(packet['from'])
if globals.channel_list[channel_number] != globals.channel_list[globals.selected_channel]:
add_notification(channel_number)
message_from_string = get_name_from_number(packet['from'], type='short') + ":\n"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
globals.all_messages[globals.channel_list[channel_number]].append((f"{globals.message_prefix} {message_from_string}", msg_str))
draw_channel_list()
draw_messages_window()
save_message_to_db(globals.channel_list[channel_number], packet['from'], msg_str)
def send_message(message, destination=BROADCAST_NUM, channel=0):
myid = globals.myNodeNum
@@ -65,3 +131,14 @@ def send_message(message, destination=BROADCAST_NUM, channel=0):
ack_naks[sent_message_data.id] = {'channel' : channel_id, 'messageIndex' : len(globals.all_messages[channel_id]) - 1, 'timestamp' : timestamp }
def send_traceroute():
r = mesh_pb2.RouteDiscovery()
globals.interface.sendData(
r,
destinationId=globals.node_list[globals.selected_node],
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=on_response_traceroute,
channelIndex=0,
hopLimit=3,
)

View File

@@ -3,8 +3,8 @@ import textwrap
import globals
from utilities.utils import get_name_from_number, get_channels
from settings import settings
from message_handlers.tx_handler import send_message
from message_handlers.tx_handler import send_message, send_traceroute
import ui.dialog
def add_notification(channel_number):
handle_notification(channel_number, add=True)
@@ -324,7 +324,14 @@ def main_ui(stdscr):
# Check for Esc
elif char == 27:
break
# Check for Ctrl + t
elif char == 20:
send_traceroute()
curses.curs_set(0) # Hide cursor
ui.dialog.dialog(stdscr, "Traceroute Sent", "Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.")
curses.curs_set(1) # Show cursor again
elif char == curses.KEY_ENTER or char == 10 or char == 13:
if globals.current_window == 2:
node_list = globals.node_list

39
ui/dialog.py Normal file
View File

@@ -0,0 +1,39 @@
import curses
def dialog(stdscr, title, message):
height, width = stdscr.getmaxyx()
# Calculate dialog dimensions
max_line_lengh = 0
message_lines = message.splitlines()
for l in message_lines:
max_line_length = max(len(l), max_line_lengh)
dialog_width = max(len(title) + 4, max_line_length + 4)
dialog_height = len(message_lines) + 4
x = (width - dialog_width) // 2
y = (height - dialog_height) // 2
# Create dialog window
win = curses.newwin(dialog_height, dialog_width, y, x)
win.border(0)
# Add title
win.addstr(0, 2, title)
# Add message
for i, l in enumerate(message_lines):
win.addstr(2 + i, 2, l)
# Add button
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ")
# Refresh dialog window
win.refresh()
# Get user input
while True:
char = win.getch()
if char == curses.KEY_ENTER or char == 10 or char == 13:
win.clear()
win.refresh()
return