Compare commits

...

35 Commits

Author SHA1 Message Date
pdxlocations
bc72c3b0b6 break out color definitions 2025-01-20 23:01:04 -08:00
pdxlocations
559618a229 Merge pull request #40 from pdxlocations/fix-wrapped-lines-out-of-bounds
fix text wrap line count
2025-01-20 21:04:41 -08:00
pdxlocations
2abdd763c1 fix text wrap line count 2025-01-20 20:51:03 -08:00
pdxlocations
89d8b7690f New Settings Menu (#39)
* init

* backup old code

* working changes

* bckup old settings

* working changes

* working changes

* rename backup

* current state of things

* starting to work

* no test for you

* working changes

* working changes

* semi-working changes

* changes

* integrating with contact main.py

* working changes

* working changes

* working changes

* rm settings.log

* bool messages and sys function confirmation

* start IP's and sub-categories

* display enum names

* fix deep nested configs
2025-01-20 16:02:20 -08:00
pdxlocations
d267f41805 Merge pull request #38 from rfschmid/fix-potential-crash-in-sort-nodes 2025-01-20 09:11:13 -08:00
Russell Schmidt
72939e5988 Fix potential crash in sort nodes 2025-01-20 09:41:26 -06:00
pdxlocations
bd5d8aa6e4 Merge pull request #34 from rfschmid/remove-unused-sanitize-string 2025-01-17 11:42:25 -08:00
pdxlocations
5899f35833 Merge pull request #33 from rfschmid/okay-dialog-improvements 2025-01-17 11:41:21 -08:00
pdxlocations
550556df2b Merge pull request #32 from rfschmid/fix-sql-injection 2025-01-17 11:38:43 -08:00
pdxlocations
fac22aee91 Merge pull request #31 from rfschmid/limit-more-drawing 2025-01-17 11:37:46 -08:00
Russell Schmidt
87e68689f4 Remove unused sanitize string function 2025-01-17 12:59:16 -06:00
Russell Schmidt
fe1eaacee9 Make dialog easier to get out of 2025-01-17 12:55:43 -06:00
Russell Schmidt
94e7e8f628 Make dialog "Ok" look highlighted 2025-01-17 12:49:39 -06:00
Russell Schmidt
54ec4009a1 Fix sql injection in update_ack_nak
Messages with single quotes would send data directly to sqlite
2025-01-17 12:26:47 -06:00
Russell Schmidt
9901e5b8a0 Limit more drawing in tx handler 2025-01-17 07:59:13 -06:00
Russell Schmidt
80a5edb6de Limit redrawing on message receipt 2025-01-17 07:45:01 -06:00
Russell Schmidt
93664397e8 Refresh node list only on change 2025-01-17 07:40:08 -06:00
pdxlocations
5bd33ed786 Merge pull request #30 from rfschmid/limit-refreshing
Limit refreshing on changing windows
2025-01-16 16:37:07 -08:00
Russell Schmidt
fae3330bb0 Limit refreshing on changing windows 2025-01-16 18:21:17 -06:00
pdxlocations
5bd91bde25 Merge pull request #26 from rfschmid/add-traceroute-support
Add traceroute support
2025-01-16 11:37:56 -08:00
Russell Schmidt
2a7317e612 Add warning about 30 second traceroute interval 2025-01-16 13:06:36 -06:00
Russell Schmidt
68f13585b6 Make traceroute less verbose 2025-01-16 12:51:31 -06:00
Russell Schmidt
ad81d34551 typo
Co-authored-by: pdxlocations <117498748+pdxlocations@users.noreply.github.com>
2025-01-16 12:44:19 -06:00
Russell Schmidt
4c4c0d553e Add dialog confirmation when traceroute sent 2025-01-16 12:07:29 -06:00
Russell Schmidt
02c104dcd5 Remove unused imports 2025-01-16 07:59:22 -06:00
Russell Schmidt
96493e5973 Add traceroute support 2025-01-16 07:49:01 -06:00
Russell Schmidt
8a13b60d23 Small cleanup in rx handler 2025-01-16 07:38:28 -06:00
pdxlocations
7ae4bb7c9d just get my node number once 2025-01-15 22:28:44 -08:00
pdxlocations
36ba9065a2 typo 2025-01-15 21:35:07 -08:00
pdxlocations
2ad2aa1faa fix gitignore 2025-01-15 21:33:32 -08:00
pdxlocations
342afce9fb start logging 2025-01-15 21:32:30 -08:00
pdxlocations
205a0c547d Merge pull request #25 from rfschmid/persist-ack-to-db
Persist message acks to db
2025-01-15 19:50:12 -08:00
Russell Schmidt
daa94f57a6 Persist message acks to db 2025-01-15 17:32:39 -06:00
pdxlocations
5e17e8e7d3 Merge pull request #24 from rfschmid/sort-nodes-by-last-heard
Sort nodes list by last heard
2025-01-15 14:19:48 -08:00
Russell Schmidt
455c3b10dd Sort nodes list by last heard 2025-01-15 12:37:40 -06:00
14 changed files with 991 additions and 872 deletions

3
.gitignore vendored
View File

@@ -1,4 +1,7 @@
venv/
.venv/
__pycache__/
client.db
.DS_Store
client.log
settings.log

View File

@@ -1,7 +1,13 @@
import sqlite3
import globals
import time
from utilities.utils import get_nodeNum, get_name_from_number
from utilities.utils import get_name_from_number
def get_table_name(channel):
# Construct the table name
table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
return quoted_table_name
def save_message_to_db(channel, user_id, message_text):
"""Save messages to the database, ensuring the table exists."""
@@ -9,35 +15,59 @@ def save_message_to_db(channel, user_id, message_text):
with sqlite3.connect(globals.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
# Construct the table name
table_name = f"{str(get_nodeNum())}_{channel}_messages"
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
quoted_table_name = get_table_name(channel)
# Ensure the table exists
create_table_query = f'''
CREATE TABLE IF NOT EXISTS {quoted_table_name} (
user_id TEXT,
message_text TEXT,
timestamp INTEGER
timestamp INTEGER,
ack_type TEXT
)
'''
db_cursor.execute(create_table_query)
timestamp = int(time.time())
# Insert the message
insert_query = f'''
INSERT INTO {quoted_table_name} (user_id, message_text, timestamp)
VALUES (?, ?, ?)
INSERT INTO {quoted_table_name} (user_id, message_text, timestamp, ack_type)
VALUES (?, ?, ?, ?)
'''
db_cursor.execute(insert_query, (user_id, message_text, int(time.time())))
db_cursor.execute(insert_query, (user_id, message_text, timestamp, None))
db_connection.commit()
return timestamp
except sqlite3.Error as e:
print(f"SQLite error in save_message_to_db: {e}")
except Exception as e:
print(f"Unexpected error in save_message_to_db: {e}")
def update_ack_nak(channel, timestamp, message, ack):
try:
with sqlite3.connect(globals.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
update_query = f"""
UPDATE {get_table_name(channel)}
SET ack_type = ?
WHERE user_id = ? AND
timestamp = ? AND
message_text = ?
"""
db_cursor.execute(update_query, (ack, str(globals.myNodeNum), timestamp, message))
db_connection.commit()
except sqlite3.Error as e:
print(f"SQLite error in update_ack_nak: {e}")
except Exception as e:
print(f"Unexpected error in update_ack_nak: {e}")
def load_messages_from_db():
"""Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
@@ -47,17 +77,23 @@ def load_messages_from_db():
# Retrieve all table names that match the pattern
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
db_cursor.execute(query, (f"{str(get_nodeNum())}_%_messages",))
db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",))
tables = [row[0] for row in db_cursor.fetchall()]
# Iterate through each table and fetch its messages
for table_name in tables:
query = f'SELECT user_id, message_text FROM "{table_name}"'
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
table_columns = [i[1] for i in db_cursor.execute(f'PRAGMA table_info({quoted_table_name})')]
if("ack_type" not in table_columns):
update_table_query = f"ALTER TABLE {quoted_table_name} ADD COLUMN ack_type TEXT"
db_cursor.execute(update_table_query)
query = f'SELECT user_id, message_text, ack_type FROM {quoted_table_name}'
try:
# Fetch all messages from the table
db_cursor.execute(query)
db_messages = [(row[0], row[1]) for row in db_cursor.fetchall()] # Save as tuples
db_messages = [(row[0], row[1], row[2]) for row in db_cursor.fetchall()] # Save as tuples
# Extract the channel name from the table name
channel = table_name.split("_")[1]
@@ -74,9 +110,17 @@ def load_messages_from_db():
globals.all_messages[channel] = []
# Add messages to globals.all_messages in tuple format
for user_id, message in db_messages:
if user_id == str(get_nodeNum()):
formatted_message = (f"{globals.sent_message_prefix}: ", message)
for user_id, message, ack_type in db_messages:
if user_id == str(globals.myNodeNum):
ack_str = globals.ack_unknown_str
if(ack_type == "Implicit"):
ack_str = globals.ack_implicit_str
elif(ack_type == "Ack"):
ack_str = globals.ack_str
elif(ack_type == "Nak"):
ack_str = globals.nak_str
formatted_message = (f"{globals.sent_message_prefix}{ack_str}: ", message)
else:
formatted_message = (f"{globals.message_prefix} {get_name_from_number(int(user_id), 'short')}: ", message)
@@ -97,7 +141,7 @@ def init_nodedb():
db_cursor = db_connection.cursor()
# Table name construction
table_name = f"{str(get_nodeNum())}_nodedb"
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote the table name because it might begin with numerics
# Create the table if it doesn't exist
@@ -148,7 +192,7 @@ def maybe_store_nodeinfo_in_db(packet):
try:
with sqlite3.connect(globals.db_file_path) as db_connection:
table_name = f"{str(get_nodeNum())}_nodedb"
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote the table name becuase we might begin with numerics
db_cursor = db_connection.cursor()
@@ -224,6 +268,5 @@ def maybe_store_nodeinfo_in_db(packet):
except sqlite3.Error as e:
print(f"SQLite error in maybe_store_nodeinfo_in_db: {e}")
finally:
db_connection.close()

View File

@@ -2,6 +2,7 @@ import os
app_directory = os.path.dirname(os.path.abspath(__file__))
db_file_path = os.path.join(app_directory, "client.db")
log_file_path = os.path.join(app_directory, "client.log")
all_messages = {}
channel_list = []
@@ -16,4 +17,9 @@ interface = None
display_log = False
message_prefix = ">>"
sent_message_prefix = message_prefix + " Sent"
notification_symbol = "*"
ack_implicit_str = "[◌]"
ack_str = "[✓]"
nak_str = "[x]"
ack_unknown_str = "[…]"
notification_symbol = "*"
node_list = []

212
input_handlers.py Normal file
View File

@@ -0,0 +1,212 @@
import curses
import ipaddress
def get_user_input(prompt):
# Calculate the dynamic height and width for the input window
height = 7 # Fixed height for input prompt
width = 60
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
# Create a new window for user input
input_win = curses.newwin(height, width, start_y, start_x)
input_win.clear()
input_win.border()
# Display the prompt
input_win.addstr(1, 2, prompt, curses.A_BOLD)
input_win.addstr(3, 2, "Enter value: ")
input_win.refresh()
# Enable user input
curses.echo()
curses.curs_set(1)
user_input = ""
while True:
key = input_win.getch(3, 15 + len(user_input)) # Adjust cursor position dynamically
if key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
curses.noecho()
curses.curs_set(0)
return None # Exit without returning a value
elif key == ord('\n'): # Enter key
break
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace
user_input = user_input[:-1]
input_win.addstr(3, 15, " " * (len(user_input) + 1)) # Clear the line
input_win.addstr(3, 15, user_input)
else:
user_input += chr(key)
input_win.addstr(3, 15, user_input)
curses.curs_set(0)
curses.noecho()
# Clear the input window
input_win.clear()
input_win.refresh()
return user_input
def get_bool_selection(message, current_value):
message = "Select True or False:" if None else message
cvalue = current_value
options = ["True", "False"]
selected_index = 0 if current_value == "True" else 1
height = 7
width = 60
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
bool_win = curses.newwin(height, width, start_y, start_x)
bool_win.keypad(True)
while True:
bool_win.clear()
bool_win.border()
bool_win.addstr(1, 2, message, curses.A_BOLD)
for idx, option in enumerate(options):
if idx == selected_index:
bool_win.addstr(idx + 3, 4, option, curses.A_REVERSE)
else:
bool_win.addstr(idx + 3, 4, option)
bool_win.refresh()
key = bool_win.getch()
if key == curses.KEY_UP:
selected_index = max(0, selected_index - 1)
elif key == curses.KEY_DOWN:
selected_index = min(len(options) - 1, selected_index + 1)
elif key == ord('\n'): # Enter key
return options[selected_index]
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
return cvalue
def get_repeated_input(current_value):
cvalue = current_value
height = 10
width = 60
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
repeated_win = curses.newwin(height, width, start_y, start_x)
repeated_win.keypad(True) # Enable keypad for special keys
curses.echo()
curses.curs_set(1)
user_input = ""
while True:
repeated_win.clear()
repeated_win.border()
repeated_win.addstr(1, 2, "Enter comma-separated values:", curses.A_BOLD)
repeated_win.addstr(3, 2, f"Current: {', '.join(current_value)}")
repeated_win.addstr(5, 2, f"New value: {user_input}")
repeated_win.refresh()
key = repeated_win.getch()
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow
curses.noecho()
curses.curs_set(0)
return cvalue # Return the current value without changes
elif key == ord('\n'): # Enter key to save and return
curses.noecho()
curses.curs_set(0)
return user_input.split(",") # Split the input into a list
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
user_input = user_input[:-1]
else:
try:
user_input += chr(key) # Append valid character input
except ValueError:
pass # Ignore invalid character inputs
def get_enum_input(options, current_value):
selected_index = options.index(current_value) if current_value in options else 0
height = min(len(options) + 4, curses.LINES - 2)
width = 60
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
enum_win = curses.newwin(height, width, start_y, start_x)
enum_win.keypad(True)
while True:
enum_win.clear()
enum_win.border()
enum_win.addstr(1, 2, "Select an option:", curses.A_BOLD)
for idx, option in enumerate(options):
if idx == selected_index:
enum_win.addstr(idx + 2, 4, option, curses.A_REVERSE)
else:
enum_win.addstr(idx + 2, 4, option)
enum_win.refresh()
key = enum_win.getch()
if key == curses.KEY_UP:
selected_index = max(0, selected_index - 1)
elif key == curses.KEY_DOWN:
selected_index = min(len(options) - 1, selected_index + 1)
elif key == ord('\n'): # Enter key
return options[selected_index]
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
return current_value
def get_fixed32_input(current_value):
cvalue = current_value
current_value = str(ipaddress.IPv4Address(current_value))
height = 10
width = 60
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
fixed32_win = curses.newwin(height, width, start_y, start_x)
fixed32_win.keypad(True)
curses.echo()
curses.curs_set(1)
user_input = ""
while True:
fixed32_win.clear()
fixed32_win.border()
fixed32_win.addstr(1, 2, "Enter an IP address (xxx.xxx.xxx.xxx):", curses.A_BOLD)
fixed32_win.addstr(3, 2, f"Current: {current_value}")
fixed32_win.addstr(5, 2, f"New value: {user_input}")
fixed32_win.refresh()
key = fixed32_win.getch()
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow to cancel
curses.noecho()
curses.curs_set(0)
return cvalue # Return the current value unchanged
elif key == ord('\n'): # Enter key to validate and save
# Validate IP address
octets = user_input.split(".")
if len(octets) == 4 and all(octet.isdigit() and 0 <= int(octet) <= 255 for octet in octets):
curses.noecho()
curses.curs_set(0)
fixed32_address = ipaddress.ip_address(user_input)
return int(fixed32_address) # Return the valid IP address
else:
fixed32_win.addstr(7, 2, "Invalid IP address. Try again.", curses.A_BOLD | curses.color_pair(5))
fixed32_win.refresh()
curses.napms(1500) # Wait for 1.5 seconds before refreshing
user_input = "" # Clear invalid input
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
user_input = user_input[:-1]
else:
try:
char = chr(key)
if char.isdigit() or char == ".":
user_input += char # Append only valid characters (digits or dots)
except ValueError:
pass # Ignore invalid inputs

44
main.py
View File

@@ -9,12 +9,14 @@ V 1.0.1
import curses
from pubsub import pub
import os
import logging
import traceback
from utilities.arg_parser import setup_parser
from utilities.interfaces import initialize_interface
from message_handlers.rx_handler import on_receive
from ui.curses_ui import main_ui, draw_splash
from utilities.utils import get_channels
from utilities.utils import get_channels, get_node_list, get_nodeNum
from db_handler import init_nodedb, load_messages_from_db
import globals
@@ -23,17 +25,35 @@ os.environ["NCURSES_NO_UTF8_ACS"] = "1"
os.environ["TERM"] = "screen"
os.environ["LANG"] = "C.UTF-8"
def main(stdscr):
draw_splash(stdscr)
parser = setup_parser()
args = parser.parse_args()
globals.interface = initialize_interface(args)
globals.channel_list = get_channels()
pub.subscribe(on_receive, 'meshtastic.receive')
init_nodedb()
load_messages_from_db()
main_ui(stdscr)
# Configure logging
# Run `tail -f client.log` in another terminal to view live
logging.basicConfig(
filename=globals.log_file_path,
level=logging.INFO, # DEBUG, INFO, WARNING, ERROR, CRITICAL)
format="%(asctime)s - %(levelname)s - %(message)s"
)
def main(stdscr):
try:
draw_splash(stdscr)
parser = setup_parser()
args = parser.parse_args()
globals.interface = initialize_interface(args)
globals.myNodeNum = get_nodeNum()
globals.channel_list = get_channels()
globals.node_list = get_node_list()
pub.subscribe(on_receive, 'meshtastic.receive')
init_nodedb()
load_messages_from_db()
main_ui(stdscr)
except Exception as e:
logging.error("An error occurred: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
raise
if __name__ == "__main__":
curses.wrapper(main)
try:
curses.wrapper(main)
except Exception as e:
logging.error("Fatal error in curses wrapper: %s", e)
logging.error("Traceback: %s", traceback.format_exc())

View File

@@ -1,5 +1,5 @@
from meshtastic import BROADCAST_NUM
from utilities.utils import get_node_list, decimal_to_hex, get_nodeNum
from utilities.utils import get_node_list, decimal_to_hex, get_name_from_number
import globals
from ui.curses_ui import draw_packetlog_win, draw_node_list, draw_messages_window, draw_channel_list, add_notification
from db_handler import save_message_to_db, maybe_store_nodeinfo_in_db
@@ -17,50 +17,61 @@ def on_receive(packet, interface):
if globals.display_log:
draw_packetlog_win()
try:
if 'decoded' in packet and packet['decoded']['portnum'] == 'NODEINFO_APP':
if 'decoded' not in packet:
return
# Assume any incoming packet could update the last seen time for a node, so we
# may need to reorder the list. This could probably be limited to specific packets.
new_node_list = get_node_list()
if(new_node_list != globals.node_list):
globals.node_list = new_node_list
draw_node_list()
if packet['decoded']['portnum'] == 'NODEINFO_APP':
if "user" in packet['decoded'] and "longName" in packet['decoded']["user"]:
get_node_list()
draw_node_list()
maybe_store_nodeinfo_in_db(packet)
elif 'decoded' in packet and packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP':
elif packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP':
message_bytes = packet['decoded']['payload']
message_string = message_bytes.decode('utf-8')
refresh_channels = False
refresh_messages = False
if packet.get('channel'):
channel_number = packet['channel']
else:
channel_number = 0
myNodeNum = get_nodeNum()
if packet['to'] == myNodeNum:
if packet['to'] == globals.myNodeNum:
if packet['from'] in globals.channel_list:
pass
else:
globals.channel_list.append(packet['from'])
globals.all_messages[packet['from']] = []
draw_channel_list()
refresh_channels = True
channel_number = globals.channel_list.index(packet['from'])
if globals.channel_list[channel_number] != globals.channel_list[globals.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
refresh_messages = True
# Add received message to the messages list
message_from_id = packet['from']
message_from_string = ""
for node in globals.interface.nodes.values():
if message_from_id == node['num']:
message_from_string = node["user"]["shortName"] + ":" # Get the name using the node ID
break
else:
message_from_string = str(decimal_to_hex(message_from_id)) # If long name not found, use the ID as string
if globals.channel_list[channel_number] in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]].append((f"{globals.message_prefix} {message_from_string} ", message_string))
else:
globals.all_messages[globals.channel_list[channel_number]] = [(f"{globals.message_prefix} {message_from_string} ", message_string)]
message_from_string = get_name_from_number(message_from_id, type='short') + ":"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
globals.all_messages[globals.channel_list[channel_number]].append((f"{globals.message_prefix} {message_from_string} ", message_string))
if(refresh_channels):
draw_channel_list()
if(refresh_messages):
draw_messages_window()
draw_channel_list()
draw_messages_window()
save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string)
except KeyError as e:

View File

@@ -1,7 +1,9 @@
from meshtastic import BROADCAST_NUM
from db_handler import save_message_to_db
from utilities.utils import get_nodeNum
from db_handler import save_message_to_db, update_ack_nak
from meshtastic.protobuf import mesh_pb2, portnums_pb2
from utilities.utils import get_name_from_number
import globals
import google.protobuf.json_format
ack_naks = {}
@@ -17,21 +19,102 @@ def onAckNak(packet):
message = globals.all_messages[acknak['channel']][acknak['messageIndex']][1]
confirm_string = " "
ack_type = None
if(packet['decoded']['routing']['errorReason'] == "NONE"):
if(packet['from'] == get_nodeNum()): # Ack "from" ourself means implicit ACK
confirm_string = "[◌]"
if(packet['from'] == globals.myNodeNum): # Ack "from" ourself means implicit ACK
confirm_string = globals.ack_implicit_str
ack_type = "Implicit"
else:
confirm_string = "[✓]"
confirm_string = globals.ack_str
ack_type = "Ack"
else:
confirm_string = "[x]"
confirm_string = globals.nak_str
ack_type = "Nak"
globals.all_messages[acknak['channel']][acknak['messageIndex']] = (globals.sent_message_prefix + confirm_string + ": ", message)
draw_messages_window()
update_ack_nak(acknak['channel'], acknak['timestamp'], message, ack_type)
channel_number = globals.channel_list.index(acknak['channel'])
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
draw_messages_window()
def on_response_traceroute(packet):
"""on response for trace route"""
from ui.curses_ui import draw_channel_list, draw_messages_window, add_notification
refresh_channels = False
refresh_messages = False
UNK_SNR = -128 # Value representing unknown SNR
route_discovery = mesh_pb2.RouteDiscovery()
route_discovery.ParseFromString(packet["decoded"]["payload"])
msg_dict = google.protobuf.json_format.MessageToDict(route_discovery)
msg_str = "Traceroute to:\n"
route_str = get_name_from_number(packet["to"], 'short') or f"{packet['to']:08x}" # Start with destination of response
# SNR list should have one more entry than the route, as the final destination adds its SNR also
lenTowards = 0 if "route" not in msg_dict else len(msg_dict["route"])
snrTowardsValid = "snrTowards" in msg_dict and len(msg_dict["snrTowards"]) == lenTowards + 1
if lenTowards > 0: # Loop through hops in route and add SNR if available
for idx, node_num in enumerate(msg_dict["route"]):
route_str += " --> " + (get_name_from_number(node_num, 'short') or f"{node_num:08x}") \
+ " (" + (str(msg_dict["snrTowards"][idx] / 4) if snrTowardsValid and msg_dict["snrTowards"][idx] != UNK_SNR else "?") + "dB)"
# End with origin of response
route_str += " --> " + (get_name_from_number(packet["from"], 'short') or f"{packet['from']:08x}") \
+ " (" + (str(msg_dict["snrTowards"][-1] / 4) if snrTowardsValid and msg_dict["snrTowards"][-1] != UNK_SNR else "?") + "dB)"
msg_str += route_str + "\n" # Print the route towards destination
# Only if hopStart is set and there is an SNR entry (for the origin) it's valid, even though route might be empty (direct connection)
lenBack = 0 if "routeBack" not in msg_dict else len(msg_dict["routeBack"])
backValid = "hopStart" in packet and "snrBack" in msg_dict and len(msg_dict["snrBack"]) == lenBack + 1
if backValid:
msg_str += "Back:\n"
route_str = get_name_from_number(packet["from"], 'short') or f"{packet['from']:08x}" # Start with origin of response
if lenBack > 0: # Loop through hops in routeBack and add SNR if available
for idx, node_num in enumerate(msg_dict["routeBack"]):
route_str += " --> " + (get_name_from_number(node_num, 'short') or f"{node_num:08x}") \
+ " (" + (str(msg_dict["snrBack"][idx] / 4) if msg_dict["snrBack"][idx] != UNK_SNR else "?") + "dB)"
# End with destination of response (us)
route_str += " --> " + (get_name_from_number(packet["to"], 'short') or f"{packet['to']:08x}") \
+ " (" + (str(msg_dict["snrBack"][-1] / 4) if msg_dict["snrBack"][-1] != UNK_SNR else "?") + "dB)"
msg_str += route_str + "\n" # Print the route back to us
if(packet['from'] not in globals.channel_list):
globals.channel_list.append(packet['from'])
refresh_channels = True
channel_number = globals.channel_list.index(packet['from'])
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
refresh_messages = True
else:
add_notification(channel_number)
refresh_channels = True
message_from_string = get_name_from_number(packet['from'], type='short') + ":\n"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
globals.all_messages[globals.channel_list[channel_number]].append((f"{globals.message_prefix} {message_from_string}", msg_str))
if(refresh_channels):
draw_channel_list()
if(refresh_messages):
draw_messages_window()
save_message_to_db(globals.channel_list[channel_number], packet['from'], msg_str)
def send_message(message, destination=BROADCAST_NUM, channel=0):
myid = get_nodeNum()
myid = globals.myNodeNum
send_on_channel = 0
channel_id = globals.channel_list[channel]
if isinstance(channel_id, int):
@@ -50,12 +133,23 @@ def send_message(message, destination=BROADCAST_NUM, channel=0):
)
# Add sent message to the messages dictionary
if channel_id in globals.all_messages:
globals.all_messages[channel_id].append((globals.sent_message_prefix + "[…]: ", message))
else:
globals.all_messages[channel_id] = [(globals.sent_message_prefix + "[…]: ", message)]
if channel_id not in globals.all_messages:
globals.all_messages[channel_id] = []
ack_naks[sent_message_data.id] = {'channel' : channel_id, 'messageIndex' : len(globals.all_messages[channel_id]) - 1 }
globals.all_messages[channel_id].append((globals.sent_message_prefix + globals.ack_unknown_str + ": ", message))
timestamp = save_message_to_db(channel_id, myid, message)
save_message_to_db(channel_id, myid, message)
ack_naks[sent_message_data.id] = {'channel' : channel_id, 'messageIndex' : len(globals.all_messages[channel_id]) - 1, 'timestamp' : timestamp }
def send_traceroute():
r = mesh_pb2.RouteDiscovery()
globals.interface.sendData(
r,
destinationId=globals.node_list[globals.selected_node],
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=on_response_traceroute,
channelIndex=0,
hopLimit=3,
)

134
save_to_radio.py Normal file
View File

@@ -0,0 +1,134 @@
from meshtastic.protobuf import channel_pb2
from google.protobuf.message import Message
import logging
import base64
def settings_reboot(interface):
interface.localNode.reboot()
def settings_reset_nodedb(interface):
interface.localNode.resetNodeDb()
def settings_shutdown(interface):
interface.localNode.shutdown()
def settings_factory_reset(interface):
interface.localNode.factoryReset()
def settings_set_owner(interface, long_name=None, short_name=None, is_licensed=False):
if isinstance(is_licensed, str):
is_licensed = is_licensed.lower() == 'true'
interface.localNode.setOwner(long_name, short_name, is_licensed)
def save_changes(interface, menu_path, modified_settings):
"""
Save changes to the device based on modified settings.
:param interface: Meshtastic interface instance
:param menu_path: Current menu path
:param modified_settings: Dictionary of modified settings
"""
try:
if not modified_settings:
logging.info("No changes to save. modified_settings is empty.")
return
node = interface.getNode('^local')
if menu_path[1] == "Radio Settings" or menu_path[1] == "Module Settings":
config_category = menu_path[2].lower() # for radio and module configs
elif menu_path[1] == "User Settings": # for user configs
config_category = "User Settings"
long_name = modified_settings.get("longName", None)
short_name = modified_settings.get("shortName", None)
#TODO add is_licensed
node.setOwner(long_name, short_name, is_licensed=False)
logging.info(f"Updated {config_category} with Long Name: {long_name} and Short Name {short_name}")
return
elif menu_path[1] == "Channels": # for channel configs
config_category = "Channels"
try:
channel = menu_path[-1]
channel_num = int(channel.split()[-1])
except (IndexError, ValueError) as e:
channel_num = None
channel = node.channels[channel_num]
for key, value in modified_settings.items():
if key == 'psk': # Special case: decode Base64 for psk
channel.settings.psk = base64.b64decode(value)
elif key == 'position_precision': # Special case: module_settings
channel.settings.module_settings.position_precision = value
else:
setattr(channel.settings, key, value) # Use setattr for other fields
if channel_num == 0:
channel.role = channel_pb2.Channel.Role.PRIMARY
else:
channel.role = channel_pb2.Channel.Role.SECONDARY
node.writeChannel(channel_num)
logging.info(f"Updated Channel {channel_num} in {config_category}")
logging.info(node.channels)
return
else:
config_category = None
for config_item, new_value in modified_settings.items():
# Check if the category exists in localConfig
if hasattr(node.localConfig, config_category):
config_subcategory = getattr(node.localConfig, config_category)
# Check if the category exists in moduleConfig
elif hasattr(node.moduleConfig, config_category):
config_subcategory = getattr(node.moduleConfig, config_category)
else:
logging.warning(f"Config category '{config_category}' not found in config.")
continue
# Check if the config_item exists in the subcategory
if hasattr(config_subcategory, config_item):
field = getattr(config_subcategory, config_item)
try:
if isinstance(field, (int, float, str, bool)): # Direct field types
setattr(config_subcategory, config_item, new_value)
logging.info(f"Updated {config_category}.{config_item} to {new_value}")
elif isinstance(field, Message): # Handle protobuf sub-messages
if isinstance(new_value, dict): # If new_value is a dictionary
for sub_field, sub_value in new_value.items():
if hasattr(field, sub_field):
setattr(field, sub_field, sub_value)
logging.info(f"Updated {config_category}.{config_item}.{sub_field} to {sub_value}")
else:
logging.warning(f"Sub-field '{sub_field}' not found in {config_category}.{config_item}")
else:
logging.warning(f"Invalid value for {config_category}.{config_item}. Expected dict.")
else:
logging.warning(f"Unsupported field type for {config_category}.{config_item}.")
except AttributeError as e:
logging.error(f"Failed to update {config_category}.{config_item}: {e}")
else:
logging.warning(f"Config item '{config_item}' not found in config category '{config_category}'.")
# Write the configuration changes to the node
try:
node.writeConfig(config_category)
logging.info(f"Changes written to config category: {config_category}")
except Exception as e:
logging.error(f"Failed to write configuration for category '{config_category}': {e}")
node.writeConfig(config_category)
logging.info(f"Changes written to config category: {config_category}")
except Exception as e:
logging.error(f"Error saving changes: {e}")

View File

@@ -1,801 +1,247 @@
import curses
import ipaddress
import meshtastic.serial_interface
import meshtastic.serial_interface, meshtastic.tcp_interface
from meshtastic.protobuf import config_pb2, module_config_pb2, mesh_pb2, channel_pb2
import globals
from save_to_radio import settings_factory_reset, settings_reboot, settings_reset_nodedb, settings_shutdown
from ui.menus import generate_menu_from_protobuf
from input_handlers import get_bool_selection, get_repeated_input, get_user_input, get_enum_input, get_fixed32_input
from save_to_radio import save_changes
from ui.colors import setup_colors
def display_enum_menu(stdscr, enum_values, menu_item):
menu_height = len(enum_values) + 2
menu_width = max(len(option) for option in enum_values) + 4
y_start = (curses.LINES - menu_height) // 2
x_start = (curses.COLS - menu_width) // 2
import logging
# Maximum number of rows to display
max_rows = 10
# Calculate popup window dimensions and position
popup_height = min(len(enum_values), max_rows) + 2
popup_width = max(len(option) for option in enum_values) + 6
y_start = (curses.LINES - popup_height) // 2
x_start = (curses.COLS - popup_width) // 2
def display_menu(current_menu, menu_path, selected_index, show_save_option):
global menu_win
# Create the popup window
try:
popup_win = curses.newwin(popup_height, popup_width, y_start, x_start)
except curses.error as e:
print("Error occurred while initializing curses window:", e)
# Calculate the dynamic height based on the number of menu items
num_items = len(current_menu) + (1 if show_save_option else 0) # Add 1 for the "Save Changes" option if applicable
height = min(curses.LINES - 2, num_items + 5) # Ensure the menu fits within the terminal height
width = 60
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
# Enable keypad mode
popup_win.keypad(True)
# Create a new curses window with dynamic dimensions
menu_win = curses.newwin(height, width, start_y, start_x)
menu_win.clear()
menu_win.border()
menu_win.keypad(True)
# Display enum values in the popup window
start_index = 0 # Starting index of displayed items
while True:
popup_win.clear()
popup_win.border()
# Display the current menu path as a header
header = " > ".join(word.title() for word in menu_path)
if len(header) > width - 4:
header = header[:width - 7] + "..."
menu_win.addstr(1, 2, header, curses.A_BOLD)
# Calculate the starting index based on the menu item and window size
if menu_item >= start_index + max_rows:
start_index += 1
elif menu_item < start_index:
start_index -= 1
# Display the menu options
for idx, option in enumerate(current_menu):
field_info = current_menu[option]
current_value = field_info[1] if isinstance(field_info, tuple) else ""
display_option = f"{option}"[:width // 2 - 2] # Truncate option name if too long``
display_value = f"{current_value}"[:width // 2 - 4] # Truncate value if too long
# Display enum values within the window height
for i in range(min(len(enum_values) - start_index, max_rows)):
option_index = start_index + i
if option_index == menu_item:
popup_win.addstr(i + 1, 2, enum_values[option_index], curses.A_REVERSE)
try:
# Use red color for "Reboot" or "Shutdown"
color = curses.color_pair(5) if option in ["Reboot", "Reset Node DB", "Shutdown", "Factory Reset"] else curses.color_pair(1)
if idx == selected_index:
menu_win.addstr(idx + 3, 4, f"{display_option:<{width // 2 - 2}} {display_value}", curses.A_REVERSE | color)
else:
popup_win.addstr(i + 1, 2, enum_values[option_index])
menu_win.addstr(idx + 3, 4, f"{display_option:<{width // 2 - 2}} {display_value}", color)
except curses.error:
pass
popup_win.refresh()
char = popup_win.getch()
if char == curses.KEY_DOWN:
if menu_item < len(enum_values) - 1:
menu_item += 1
elif char == curses.KEY_UP:
if menu_item > 0:
menu_item -= 1
elif char == ord('\n'):
selected_option = enum_values[menu_item]
popup_win.clear()
popup_win.refresh()
return selected_option, True
elif char == 27 or char == curses.KEY_LEFT: # Check if escape key is pressed
curses.curs_set(0)
popup_win.refresh()
return None, False
def get_string_input(stdscr, setting_string):
popup_height = 5
popup_width = 40
y_start = (curses.LINES - popup_height) // 2
x_start = (curses.COLS - popup_width) // 2
try:
input_win = curses.newwin(popup_height, popup_width, y_start, x_start)
except curses.error as e:
print("Error occurred while initializing curses window:", e)
input_win.border()
input_win.keypad(True)
input_win.refresh()
input_win.addstr(1, 1, str(setting_string)) # Prepopulate input field with the setting value
input_win.refresh()
# Get user input
curses.curs_set(1)
input_text = ""
while True:
# Display the current input text
input_win.addstr(1, 1, input_text)
input_win.border()
input_win.refresh()
# Get a character from the user
key = stdscr.getch()
if key == curses.KEY_ENTER or key == 10 or key == 13: # Enter key
curses.curs_set(0)
input_win.clear()
input_win.refresh()
return input_text, True
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
# Delete the last character from input_text
input_text = input_text[:-1]
elif 32 <= key <= 126: # Printable ASCII characters
# Append the character to input_text
input_text += chr(key)
elif key == 27: # Check if escape key is pressed
curses.curs_set(0)
input_win.refresh()
return None, False
input_win.clear()
input_win.refresh()
def get_uint_input(stdscr, setting_string):
popup_height = 5
popup_width = 40
y_start = (curses.LINES - popup_height) // 2
x_start = (curses.COLS - popup_width) // 2
try:
input_win = curses.newwin(popup_height, popup_width, y_start, x_start)
except curses.error as e:
print("Error occurred while initializing curses window:", e)
input_win.border()
input_win.keypad(True)
input_win.refresh()
input_win.addstr(1, 1, str(setting_string)) # Prepopulate input field with the setting value
input_win.refresh()
curses.curs_set(1)
input_text = ""
while True:
# Display the current input text
input_win.addstr(1, 1, input_text)
input_win.border()
input_win.refresh()
# Get a character from the user
key = stdscr.getch()
if key == curses.KEY_ENTER or key == 10 or key == 13: # Enter key
curses.curs_set(0)
input_win.clear()
input_win.refresh()
return int(input_text), True
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
# Delete the last character from input_text
input_text = input_text[:-1]
elif 48 <= key <= 57: # Numbers(ASCII range)
# Append the character to input_text
input_text += chr(key)
elif key == 27 or key == curses.KEY_LEFT: # Check if escape key is pressed
curses.curs_set(0)
input_win.refresh()
return None, False
input_win.clear()
input_win.refresh()
def get_uint32_list_input(stdscr, setting_string):
setting_string = [str(num) for num in setting_string]
popup_height = 8 # Increased height to accommodate three lines
popup_width = 40
y_start = (curses.LINES - popup_height) // 2
x_start = (curses.COLS - popup_width) // 2
try:
input_win = curses.newwin(popup_height, popup_width, y_start, x_start)
except curses.error as e:
print("Error occurred while initializing curses window:", e)
input_win.border()
input_win.keypad(True)
input_win.refresh()
input_text = setting_string[:] # Copy the input strings
curses.curs_set(0)
while True:
# Display the current input text for each line
for i, line in enumerate(input_text):
input_win.addstr(1 + i, 1, line)
input_win.border()
input_win.refresh()
# Get a character from the user
key = stdscr.getch()
if key == curses.KEY_ENTER or key == 10 or key == 13: # Enter key
input_win.clear()
input_win.refresh()
return None, False # TODO allow setting this
# return input_text, True
# elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
# # Delete the last character from the current line's input_text
# current_y, current_x = input_win.getyx()
# input_text[current_y - 1] = input_text[current_y - 1][:-1]
# elif (48 <= key <= 57) or key == 44: # Numbers and comma (ASCII range)
# # Append the character to the current line's input_text
# current_y, current_x = input_win.getyx()
# input_text[current_y - 1] += chr(key)
elif key == 27 or key == curses.KEY_LEFT: # Check if escape key is pressed
curses.curs_set(0)
input_win.refresh()
return None, False
input_win.clear()
input_win.refresh()
def get_float_input(stdscr, setting_string):
popup_height = 5
popup_width = 40
y_start = (curses.LINES - popup_height) // 2
x_start = (curses.COLS - popup_width) // 2
try:
input_win = curses.newwin(popup_height, popup_width, y_start, x_start)
except curses.error as e:
print("Error occurred while initializing curses window:", e)
input_win.border()
input_win.keypad(True)
input_win.refresh()
input_win.addstr(1, 1, str(setting_string)) # Prepopulate input field with the setting value
input_win.refresh()
curses.curs_set(1)
input_text = ""
while True:
# Display the current input text
input_win.addstr(1, 1, input_text)
input_win.border()
input_win.refresh()
# Get a character from the user
key = stdscr.getch()
if key == curses.KEY_ENTER or key == 10 or key == 13: # Enter key
curses.curs_set(0)
input_win.clear()
input_win.refresh()
return float(input_text), True
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
# Delete the last character from input_text
input_text = input_text[:-1]
elif (48 <= key <= 57) or key == 46: # Numbers and decimal point (ASCII range)
# Append the character to input_text
input_text += chr(key)
elif key == 27 or key == curses.KEY_LEFT: # Check if escape key is pressed
curses.curs_set(0)
input_win.refresh()
return None, False
input_win.clear()
input_win.refresh()
def ip_to_fixed32(ip):
# Parse the IP address
ip_obj = ipaddress.ip_address(ip)
# Convert IP address to 32-bit integer
return int(ip_obj)
def fixed32_to_ip(fixed32):
# Convert 32-bit integer to IPv4Address object
ip_obj = ipaddress.IPv4Address(fixed32)
# Convert IPv4Address object to string representation
return str(ip_obj)
def get_fixed32_input(stdscr, setting_string):
popup_height = 5
popup_width = 40
y_start = (curses.LINES - popup_height) // 2
x_start = (curses.COLS - popup_width) // 2
try:
input_win = curses.newwin(popup_height, popup_width, y_start, x_start)
except curses.error as e:
print("Error occurred while initializing curses window:", e)
input_win.border()
input_win.keypad(True)
input_win.refresh()
input_win.addstr(1, 1, fixed32_to_ip(setting_string)) # Prepopulate input field with the setting value
input_win.refresh()
curses.curs_set(1)
input_text = ""
while True:
# Display the current input text
input_win.addstr(1, 1, input_text)
input_win.border()
input_win.refresh()
# Get a character from the user
key = stdscr.getch()
if key == curses.KEY_ENTER or key == 10 or key == 13: # Enter key
curses.curs_set(0)
input_win.clear()
input_win.refresh()
return ip_to_fixed32(input_text), True
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
# Delete the last character from input_text
input_text = input_text[:-1]
elif 48 <= key <= 57 or key == 46: # Numbers + period (ASCII range)
# Append the character to input_text
input_text += chr(key)
elif key == 27 or key == curses.KEY_LEFT: # Check if escape key is pressed
curses.curs_set(0)
input_win.refresh()
return None, False
input_win.clear()
input_win.refresh()
def display_bool_menu(stdscr, setting_value):
bool_options = ["False", "True"]
return display_enum_menu(stdscr, bool_options, setting_value)
def generate_menu_from_protobuf(message_instance):
if not hasattr(message_instance, "DESCRIPTOR"):
return # This is not a protobuf message instance, exit
menu = {}
field_names = message_instance.DESCRIPTOR.fields_by_name.keys()
for field_name in field_names:
field_descriptor = message_instance.DESCRIPTOR.fields_by_name[field_name]
if field_descriptor is not None:
nested_message_instance = getattr(message_instance, field_name)
menu[field_name] = generate_menu_from_protobuf(nested_message_instance)
return menu
def change_setting(stdscr, menu_path):
node = globals.interface.localNode
field_descriptor = None
setting_value = 0
stdscr.clear()
stdscr.border()
stdscr.refresh()
menu_header(stdscr, f"{menu_path[-1]}")
# Determine the level of nesting based on the length of menu_path
if menu_path[1] == "User Settings":
n = globals.interface.getMyNodeInfo()
setting_string = n['user'].get(snake_to_camel(menu_path[2]), 0)
if menu_path[2] == "is_licensed":
setting_value, do_change_setting = display_bool_menu(stdscr, setting_string)
# Show save option if applicable
if show_save_option:
save_option = "Save Changes"
save_position = height - 2
if selected_index == len(current_menu):
menu_win.addstr(save_position, (width - len(save_option)) // 2, save_option, curses.color_pair(2) | curses.A_REVERSE)
else:
setting_value, do_change_setting = get_string_input(stdscr, setting_string)
menu_win.addstr(save_position, (width - len(save_option)) // 2, save_option, curses.color_pair(2))
if not do_change_setting:
stdscr.clear()
stdscr.border()
menu_path.pop()
return
menu_win.refresh()
if menu_path[2] in ["long_name", "short_name"]:
if menu_path[2] == "short_name" and len(setting_value) > 4:
setting_value = setting_value[:4]
settings_set_owner(long_name=setting_value if menu_path[2] == "long_name" else None,
short_name=setting_value if menu_path[2] == "short_name" else None)
elif menu_path[2] == "is_licensed":
ln = n['user']['longName']
settings_set_owner(long_name=ln, is_licensed=setting_value)
def settings_menu(sdscr, interface):
stdscr.clear()
stdscr.border()
menu_path.pop()
return
if menu_path[1] == "Channels":
return
# n = interface.getChannelByChannelIndex()
# setting_string = getattr(getattr(n.channelSettings, str(menu_path[2])), menu_path[3])
# field_descriptor = getattr(n.channelSettings, menu_path[2]).DESCRIPTOR.fields_by_name[menu_path[3]]
if len(menu_path) == 4:
if menu_path[1] == "Radio Settings":
setting_string = getattr(getattr(node.localConfig, str(menu_path[2])), menu_path[3])
field_descriptor = getattr(node.localConfig, menu_path[2]).DESCRIPTOR.fields_by_name[menu_path[3]]
elif menu_path[1] == "Module Settings":
setting_string = getattr(getattr(node.moduleConfig, str(menu_path[2])), menu_path[3])
field_descriptor = getattr(node.moduleConfig, menu_path[2]).DESCRIPTOR.fields_by_name[menu_path[3]]
elif len(menu_path) == 5:
if menu_path[1] == "Radio Settings":
setting_string = getattr(getattr(getattr(node.localConfig, str(menu_path[2])), menu_path[3]), menu_path[4])
field_descriptor = getattr(getattr(node.localConfig, menu_path[2]), menu_path[3]).DESCRIPTOR.fields_by_name[menu_path[4]]
elif menu_path[1] == "Module Settings":
setting_string = getattr(getattr(getattr(node.moduleConfig, str(menu_path[2])), menu_path[3]), menu_path[4])
field_descriptor = getattr(getattr(node.moduleConfig, menu_path[2]), menu_path[3]).DESCRIPTOR.fields_by_name[menu_path[4]]
if field_descriptor.enum_type is not None:
enum_values = [enum_value.name for enum_value in field_descriptor.enum_type.values]
enum_option, do_change_setting = display_enum_menu(stdscr, enum_values, setting_string)
setting_value = enum_option
elif field_descriptor.type == 8: # Field type 8 corresponds to BOOL
setting_value, do_change_setting = display_bool_menu(stdscr, setting_string)
elif field_descriptor.type == 9: # Field type 9 corresponds to STRING
setting_value, do_change_setting = get_string_input(stdscr, setting_string)
elif field_descriptor.type == 2: # Field type 2 corresponds to FLOAT
setting_value, do_change_setting = get_float_input(stdscr, setting_string)
elif field_descriptor.type == 13: # Field type 13 corresponds to UINT32
if field_descriptor.label == field_descriptor.LABEL_REPEATED:
setting_value, do_change_setting = get_uint32_list_input(stdscr, setting_string)
else:
setting_value, do_change_setting = get_uint_input(stdscr, setting_string)
elif field_descriptor.type == 7: # Field type 7 corresponds to FIXED32
setting_value, do_change_setting = get_fixed32_input(stdscr, setting_string)
else:
menu_path.pop()
return
if not do_change_setting:
stdscr.clear()
stdscr.border()
menu_path.pop()
return
# formatted_text = f"{menu_path[2]}.{menu_path[3]} = {setting_value}"
# menu_header(stdscr,formatted_text,2)
ourNode = globals.interface.localNode
# Convert "true" to 1, "false" to 0, leave other values as they are
if setting_value == "True" or setting_value == "1":
setting_value_int = 1
elif setting_value == "False" or setting_value == "0":
setting_value_int = 0
else:
# If setting_value is not "true" or "false", keep it as it is
setting_value_int = setting_value
# if isinstance(setting_value_int, list):
# value_string = ', '.join(str(item) for item in setting_value_int)
# setting_value_int = value_string
try:
if len(menu_path) == 4:
if menu_path[1] == "Radio Settings":
setattr(getattr(ourNode.localConfig, menu_path[2]), menu_path[3], setting_value_int)
elif menu_path[1] == "Module Settings":
setattr(getattr(ourNode.moduleConfig, menu_path[2]), menu_path[3], setting_value_int)
elif len(menu_path) == 5:
if menu_path[1] == "Radio Settings":
setattr(getattr(getattr(ourNode.localConfig, menu_path[2]), menu_path[3]), menu_path[4], setting_value_int)
elif menu_path[1] == "Module Settings":
setattr(getattr(getattr(ourNode.moduleConfig, menu_path[2]), menu_path[3]), menu_path[4], setting_value_int)
except AttributeError as e:
print("Error setting attribute:", e)
ourNode.writeConfig(menu_path[2])
menu_path.pop()
def snake_to_camel(snake_str):
components = snake_str.split('_')
return components[0] + ''.join(x.title() for x in components[1:])
def display_values(stdscr, key_list, menu_path):
node = globals.interface.localNode
user_settings = ["long_name", "short_name", "is_licensed"]
for i, key in enumerate(key_list):
if len(menu_path) == 2:
if menu_path[1] == 'User Settings':
n = globals.interface.getMyNodeInfo()
try:
setting = n['user'][snake_to_camel(key_list[i])]
except:
setting = None
if key_list[i] in user_settings:
stdscr.addstr(i+3, 40, str(setting))
if len(menu_path) == 3:
if menu_path[1] == "Radio Settings":
setting = getattr(getattr(node.localConfig, menu_path[2]), key_list[i])
if menu_path[1] == "Module Settings":
setting = getattr(getattr(node.moduleConfig, menu_path[2]), key_list[i])
stdscr.addstr(i+3, 40, str(setting)[:14])
if len(menu_path) == 4:
if menu_path[1] == "Radio Settings":
setting = getattr(getattr(getattr(node.localConfig, menu_path[2]), menu_path[3]), key_list[i])
if menu_path[1] == "Module Settings":
setting = getattr(getattr(getattr(node.moduleConfig, menu_path[2]), menu_path[3]), key_list[i])
stdscr.addstr(i+3, 40, str(setting)[:14])
stdscr.refresh()
def menu_header(window, text, start_y=1):
window.clear()
window.box()
window.refresh()
_, window_width = window.getmaxyx()
start_x = (window_width - len(text)) // 2
formatted_text = text.replace('_', ' ').title()
window.addstr(start_y, start_x, formatted_text)
window.refresh()
def nested_menu(stdscr, menu):
menu_item = 0
current_menu = menu
prev_menu = []
menu_index = 0
next_key = None
key_list = []
menu = generate_menu_from_protobuf(interface)
current_menu = menu["Main Menu"]
menu_path = ["Main Menu"]
last_menu_level = False
selected_index = 0
modified_settings = {}
while True:
if current_menu is not None:
menu_header(stdscr, f"{menu_path[menu_index]}")
options = list(current_menu.keys())
# Display current menu
for i, key in enumerate(current_menu.keys(), start=0):
if i == menu_item:
if key in ["Reboot", "Reset NodeDB", "Shutdown", "Factory Reset"]:
stdscr.addstr(i+3, 1, key, curses.color_pair(5))
else:
stdscr.addstr(i+3, 1, key, curses.A_REVERSE)
else:
stdscr.addstr(i+3, 1, key)
show_save_option = (
len(menu_path) > 2 and ("Radio Settings" in menu_path or "Module Settings" in menu_path)
) or (
len(menu_path) == 2 and "User Settings" in menu_path
) or (
len(menu_path) == 3 and "Channels" in menu_path
)
# Display current values
display_values(stdscr, key_list, menu_path)
# Display the menu
display_menu(current_menu, menu_path, selected_index, show_save_option)
char = stdscr.getch()
# Capture user input
key = menu_win.getch()
selected_key = list(current_menu.keys())[menu_item]
selected_value = current_menu[selected_key]
if key == curses.KEY_UP:
selected_index = max(0, selected_index - 1)
elif key == curses.KEY_DOWN:
max_index = len(options) + (1 if show_save_option else 0) - 1
selected_index = min(max_index, selected_index + 1)
if char == curses.KEY_DOWN:
if last_menu_level == True:
last_menu_level = False
menu_item = min(len(current_menu) - 1, menu_item + 1)
elif key == curses.KEY_RIGHT or key == ord('\n'):
menu_win.clear()
menu_win.refresh()
if show_save_option and selected_index == len(options):
save_changes(interface, menu_path, modified_settings)
modified_settings.clear()
logging.info("Changes Saved")
elif char == curses.KEY_UP:
if last_menu_level == True:
last_menu_level = False
menu_item = max(0, menu_item - 1)
elif char == curses.KEY_RIGHT:
# if selected_key == "Region":
# settings_region()
# break
if selected_key == "Channels":
channels_editor(stdscr)
elif selected_key not in ["Reboot", "Reset NodeDB", "Shutdown", "Factory Reset"]:
menu_path.append(selected_key)
if isinstance(selected_value, dict):
# If the selected item is a submenu, navigate to it
prev_menu.append(current_menu)
menu_index += 1
current_menu = selected_value
menu_item = 0
last_menu_level = False
else:
last_menu_level = True
elif char == curses.KEY_LEFT:
if last_menu_level == True:
last_menu_level = False
if len(menu_path) > 1:
menu_path.pop()
current_menu = prev_menu[menu_index-1]
del prev_menu[menu_index-1]
menu_index -= 1
menu_item = 0
current_menu = menu["Main Menu"]
for step in menu_path[1:]:
current_menu = current_menu.get(step, {})
selected_index = 0
elif char == ord('\n'):
if selected_key == "Channels":
channels_editor(stdscr)
if selected_key == "Reboot":
settings_reboot()
elif selected_key == "Reset NodeDB":
settings_reset_nodedb()
elif selected_key == "Shutdown":
settings_shutdown()
elif selected_key == "Factory Reset":
settings_factory_reset()
continue
elif selected_value is not None:
stdscr.refresh()
stdscr.getch()
elif char == 27: # escape to exit menu
selected_option = options[selected_index]
if selected_option == "Exit":
break
elif selected_option == "Reboot":
confirmation = get_bool_selection("Are you sure you want to Reboot?", 0)
if confirmation == "True":
settings_reboot(interface)
logging.info(f"Node Reboot Requested by menu")
continue
if char:
stdscr.clear()
stdscr.border()
elif selected_option == "Reset Node DB":
confirmation = get_bool_selection("Are you sure you want to Reset Node DB?", 0)
if confirmation == "True":
settings_reset_nodedb(interface)
logging.info(f"Node DB Reset Requested by menu")
continue
elif selected_option == "Shutdown":
confirmation = get_bool_selection("Are you sure you want to Shutdown?", 0)
if confirmation == "True":
settings_shutdown(interface)
logging.info(f"Node Shutdown Requested by menu")
continue
elif selected_option == "Factory Reset":
confirmation = get_bool_selection("Are you sure you want to Factory Reset?", 0)
if confirmation == "True":
settings_factory_reset(interface)
logging.info(f"Factory Reset Requested by menu")
continue
field_info = current_menu.get(selected_option)
next_key = list(current_menu.keys())[menu_item]
key_list = list(current_menu.keys())
if isinstance(field_info, tuple):
field, current_value = field_info
else:
break # Exit loop if current_menu is None
if selected_option == 'longName' or selected_option == 'shortName':
new_value = get_user_input(f"Current value for {selected_option}: {current_value}")
modified_settings[selected_option] = (new_value)
current_menu[selected_option] = (field, new_value)
if last_menu_level == True:
if not isinstance(current_menu.get(next_key), dict):
change_setting(stdscr, menu_path)
elif field.type == 8: # Handle boolean type
new_value = get_bool_selection(selected_option, str(current_value))
try:
# Validate and convert input to a valid boolean
if isinstance(new_value, str):
# Handle string representations of booleans
new_value_lower = new_value.lower()
if new_value_lower in ("true", "yes", "1", "on"):
new_value = True
elif new_value_lower in ("false", "no", "0", "off"):
new_value = False
else:
raise ValueError("Invalid string for boolean")
else:
# Convert other types directly to bool
new_value = bool(new_value)
except ValueError as e:
logging.info(f"Invalid input for boolean: {e}")
def settings(stdscr):
popup_height = 22
popup_width = 60
popup_win = None
y_start = (curses.LINES - popup_height) // 2
x_start = (curses.COLS - popup_width) // 2
elif field.label == field.LABEL_REPEATED: # Handle repeated field
new_value = get_repeated_input(current_value)
curses.curs_set(0)
try:
popup_win = curses.newwin(popup_height, popup_width, y_start, x_start)
except curses.error as e:
print("Error occurred while initializing curses window:", e)
elif field.enum_type: # Enum field
enum_options = [v.name for v in field.enum_type.values]
new_value = get_enum_input(enum_options, current_value)
popup_win.border()
popup_win.keypad(True)
# Generate menu from protobuf for both radio and module settings
elif field.type == 7: # Field type 7 corresponds to FIXED32
new_value = get_fixed32_input(current_value)
elif field.type == 13: # Field type 13 corresponds to UINT32
new_value = get_user_input(f"Current value for {selected_option}: {current_value}")
new_value = current_value if new_value is None else int(new_value)
elif field.type == 2: # Field type 13 corresponds to INT64
new_value = get_user_input(f"Current value for {selected_option}: {current_value}")
new_value = current_value if new_value is None else float(new_value)
user = mesh_pb2.User()
user_settings = ["long_name", "short_name", "is_licensed"]
user_config = generate_menu_from_protobuf(user)
user_config = {key: value for key, value in user_config.items() if key in user_settings}
else: # Handle other field types
new_value = get_user_input(f"Current value for {selected_option}: {current_value}")
new_value = current_value if new_value is None else new_value
# Navigate to the correct nested dictionary based on the menu_path
current_nested = modified_settings
for key in menu_path[3:]: # Skip "Main Menu"
current_nested = current_nested.setdefault(key, {})
channel = channel_pb2.ChannelSettings()
channel_config = generate_menu_from_protobuf(channel)
channel_config = [channel_config.copy() for i in range(8)]
# Add the new value to the appropriate level
current_nested[selected_option] = new_value
radio = config_pb2.Config()
radio_config = generate_menu_from_protobuf(radio)
module = module_config_pb2.ModuleConfig()
module_config = generate_menu_from_protobuf(module)
# Add top-level menu items
top_level_menu = {
"User Settings": user_config,
"Channels": None,
"Radio Settings": radio_config,
"Module Settings": module_config,
"Reboot": None,
"Reset NodeDB": None,
"Shutdown": None,
"Factory Reset": None
}
# Call nested_menu function to display and handle the nested menu
nested_menu(popup_win, top_level_menu)
# Close the popup window
popup_win.clear()
popup_win.refresh()
# def settings_region():
# selected_option, do_set = set_region()
# if do_set:
# ourNode = interface.localNode
# setattr(ourNode.localConfig.lora, "region", selected_option)
# ourNode.writeConfig("lora")
def settings_reboot():
globals.interface.localNode.reboot()
def settings_reset_nodedb():
globals.interface.localNode.resetNodeDb()
def settings_shutdown():
globals.interface.localNode.shutdown()
def settings_factory_reset():
globals.interface.localNode.factoryReset()
def settings_set_owner(long_name=None, short_name=None, is_licensed=False):
if is_licensed == 'True':
is_licensed = True
elif is_licensed == 'False':
is_licensed = False
globals.interface.localNode.setOwner(long_name, short_name, is_licensed)
def channels_editor(stdscr):
# Define the list of channels
channels = [f"{i}" for i in range(8)]
# Initialize menu item index and selected channel
menu_item = 0
selected_channel = channels[menu_item]
while True:
# Display the list of channels in the curses window
menu_header(stdscr, "Channels")
# Fetch and print roles for each channel
for index, channel_index in enumerate(channels):
channel = globals.interface.localNode.getChannelByChannelIndex(index)
role = "DISABLED" if channel.role == 0 else "PRIMARY" if channel.role == 1 else "SECONDARY"
channel_settings = channel.settings
channel_name = channel_settings.name
if not channel_name and role != "DISABLED":
config = globals.interface.localNode.localConfig
channel_name_int = config.lora.modem_preset
channel_name = config_pb2.Config.LoRaConfig.ModemPreset.Name(channel_name_int)
channel_name_formatted = f"{channel_name:<13}" # Adjust channel name to be 13 characters wide
role_formatted = f"{role:<9}" # Adjust role to be 9 characters wide (SECONDARY = 9 chars)
if index == menu_item:
stdscr.addstr(index + 3, 1, f"{channel_index} ", curses.A_REVERSE)
stdscr.addstr(index + 3, 3, f"{channel_name_formatted}", curses.A_REVERSE)
stdscr.addstr(index + 3, 15, f"{role_formatted}", curses.A_REVERSE)
# modified_settings[selected_option] = (new_value)
current_menu[selected_option] = (field, new_value)
else:
stdscr.addstr(index + 3, 1, f"{channel_index}")
stdscr.addstr(index + 3, 3, f"{channel_name_formatted}")
stdscr.addstr(index + 3, 15, f"{role_formatted}")
current_menu = current_menu[selected_option]
menu_path.append(selected_option)
selected_index = 0
stdscr.refresh()
elif key == curses.KEY_LEFT:
key = stdscr.getch()
menu_win.clear()
menu_win.refresh()
if key == curses.KEY_DOWN:
menu_item = min(len(channels) - 1, menu_item + 1)
elif key == curses.KEY_UP:
menu_item = max(0, menu_item - 1)
elif key == ord('\n'):
# Handle selection action
selected_channel = channels[menu_item]
# Perform action here, if needed
elif key == 27 or key == curses.KEY_LEFT: # escape to exit menu
modified_settings.clear()
# Navigate back to the previous menu
if len(menu_path) > 1:
menu_path.pop()
current_menu = menu["Main Menu"]
for step in menu_path[1:]:
current_menu = current_menu.get(step, {})
selected_index = 0
elif key == 27: # Escape key
break
selected_channel = channels[menu_item]
return selected_channel
def main(stdscr):
logging.basicConfig( # Run `tail -f client.log` in another terminal to view live
filename="settings.log",
level=logging.INFO, # DEBUG, INFO, WARNING, ERROR, CRITICAL)
format="%(asctime)s - %(levelname)s - %(message)s"
)
setup_colors()
curses.curs_set(0)
stdscr.keypad(True)
interface = meshtastic.serial_interface.SerialInterface()
stdscr.clear()
stdscr.refresh()
settings_menu(stdscr, interface)
if __name__ == "__main__":
globals.interface = meshtastic.serial_interface.SerialInterface()
# radio = config_pb2.Config()
# module = module_config_pb2.ModuleConfig()
# print(generate_menu_from_protobuf(radio, interface))
# print(generate_menu_from_protobuf(module, interface))
def main(stdscr):
stdscr.keypad(True)
while True:
settings(stdscr)
curses.wrapper(main)

9
ui/colors.py Normal file
View File

@@ -0,0 +1,9 @@
import curses
def setup_colors():
curses.start_color()
curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_CYAN, curses.COLOR_BLACK)
curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_BLACK)
curses.init_pair(5, curses.COLOR_RED, curses.COLOR_BLACK)

View File

@@ -1,10 +1,11 @@
import curses
import textwrap
import globals
from utilities.utils import get_node_list, get_name_from_number, get_channels
from settings import settings
from message_handlers.tx_handler import send_message
from utilities.utils import get_name_from_number, get_channels
from settings import settings_menu
from message_handlers.tx_handler import send_message, send_traceroute
import ui.dialog
from ui.colors import setup_colors
def add_notification(channel_number):
handle_notification(channel_number, add=True)
@@ -35,8 +36,7 @@ def draw_debug(value):
function_win.refresh()
def draw_splash(stdscr):
curses.start_color()
curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) # Green text on black background
setup_colors()
curses.curs_set(0)
stdscr.clear()
@@ -49,9 +49,9 @@ def draw_splash(stdscr):
start_x = width // 2 - len(message_1) // 2
start_x2 = width // 2 - len(message_4) // 2
start_y = height // 2 - 1
stdscr.addstr(start_y, start_x, message_1, curses.color_pair(1) | curses.A_BOLD)
stdscr.addstr(start_y+1, start_x-1, message_2, curses.color_pair(1) | curses.A_BOLD)
stdscr.addstr(start_y+2, start_x-2, message_3, curses.color_pair(1) | curses.A_BOLD)
stdscr.addstr(start_y, start_x, message_1, curses.color_pair(2) | curses.A_BOLD)
stdscr.addstr(start_y+1, start_x-1, message_2, curses.color_pair(2) | curses.A_BOLD)
stdscr.addstr(start_y+2, start_x-2, message_3, curses.color_pair(2) | curses.A_BOLD)
stdscr.addstr(start_y+4, start_x2, message_4)
stdscr.box()
stdscr.refresh()
@@ -77,10 +77,10 @@ def draw_channel_list():
truncated_channel = channel[:win_width - 5] + '-' if len(channel) > win_width - 5 else channel
if i < win_height - 2 : # Check if there is enough space in the window
if start_index + i == globals.selected_channel and globals.current_window == 0:
channel_win.addstr(i + 1, 1, truncated_channel + notification, curses.color_pair(3))
channel_win.addstr(i + 1, 1, truncated_channel + notification, curses.color_pair(1) | curses.A_REVERSE)
remove_notification(globals.selected_channel)
else:
channel_win.addstr(i + 1, 1, truncated_channel + notification, curses.color_pair(4))
channel_win.addstr(i + 1, 1, truncated_channel + notification, curses.color_pair(1))
channel_win.box()
channel_win.refresh()
@@ -107,7 +107,23 @@ def draw_messages_window():
max_scroll_position = max(0, num_messages - max_messages)
start_index = max(0, min(globals.selected_message, max_scroll_position))
# Display messages starting from the calculated start index
# Dynamically calculate max_messages based on visible messages and wraps
row = 1
visible_message_count = 0
for index, (prefix, message) in enumerate(messages[start_index:], start=start_index):
full_message = f"{prefix}{message}"
wrapped_lines = textwrap.wrap(full_message, messages_win.getmaxyx()[1] - 2)
if row + len(wrapped_lines) - 1 > messages_win.getmaxyx()[0] - 2: # Check if it fits in the window
break
visible_message_count += 1
row += len(wrapped_lines)
# Adjust max_messages to match visible messages
max_messages = visible_message_count
# Re-render the visible messages
row = 1
for index, (prefix, message) in enumerate(messages[start_index:start_index + max_messages], start=start_index):
full_message = f"{prefix}{message}"
@@ -116,9 +132,9 @@ def draw_messages_window():
for line in wrapped_lines:
# Highlight the row if it's the selected message
if index == globals.selected_message and globals.current_window == 1:
color = curses.color_pair(3) # Highlighted row color
color = curses.A_REVERSE # Highlighted row color
else:
color = curses.color_pair(1) if prefix.startswith(globals.sent_message_prefix) else curses.color_pair(2)
color = curses.color_pair(4) if prefix.startswith(globals.sent_message_prefix) else curses.color_pair(3)
messages_win.addstr(row, 1, line, color)
row += 1
@@ -133,12 +149,12 @@ def draw_node_list():
win_height = nodes_win.getmaxyx()[0]
start_index = max(0, globals.selected_node - (win_height - 3)) # Calculate starting index based on selected node and window height
for i, node in enumerate(get_node_list()[start_index:], start=1):
for i, node in enumerate(globals.node_list[start_index:], start=1):
if i < win_height - 1 : # Check if there is enough space in the window
if globals.selected_node + 1 == start_index + i and globals.current_window == 2:
nodes_win.addstr(i, 1, get_name_from_number(node, "long"), curses.color_pair(3))
nodes_win.addstr(i, 1, get_name_from_number(node, "long"), curses.color_pair(1) | curses.A_REVERSE)
else:
nodes_win.addstr(i, 1, get_name_from_number(node, "long"), curses.color_pair(4))
nodes_win.addstr(i, 1, get_name_from_number(node, "long"), curses.color_pair(1))
nodes_win.box()
nodes_win.refresh()
@@ -169,7 +185,7 @@ def select_messages(direction):
draw_messages_window()
def select_nodes(direction):
node_list_length = len(get_node_list())
node_list_length = len(globals.node_list)
globals.selected_node += direction
if globals.selected_node < 0:
@@ -229,14 +245,6 @@ def main_ui(stdscr):
stdscr.keypad(True)
get_channels()
# Initialize colors
curses.start_color()
curses.init_pair(1, curses.COLOR_CYAN, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_YELLOW, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_BLACK, curses.COLOR_WHITE)
curses.init_pair(4, curses.COLOR_WHITE, curses.COLOR_BLACK)
curses.init_pair(5, curses.COLOR_RED, curses.COLOR_BLACK)
# Calculate window max dimensions
height, width = stdscr.getmaxyx()
@@ -309,25 +317,33 @@ def main_ui(stdscr):
elif globals.current_window == 2:
select_nodes(1)
elif char == curses.KEY_LEFT:
globals.current_window = (globals.current_window - 1) % 3
draw_channel_list()
draw_node_list()
draw_messages_window()
elif char == curses.KEY_LEFT or char == curses.KEY_RIGHT:
delta = -1 if char == curses.KEY_LEFT else 1
elif char == curses.KEY_RIGHT:
globals.current_window = (globals.current_window + 1) % 3
draw_channel_list()
draw_node_list()
draw_messages_window()
old_window = globals.current_window
globals.current_window = (globals.current_window + delta) % 3
if old_window == 0 or globals.current_window == 0:
draw_channel_list()
if old_window == 1 or globals.current_window == 1:
draw_messages_window()
if old_window == 2 or globals.current_window == 2:
draw_node_list()
# Check for Esc
elif char == 27:
break
# Check for Ctrl + t
elif char == 20:
send_traceroute()
curses.curs_set(0) # Hide cursor
ui.dialog.dialog(stdscr, "Traceroute Sent", "Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.")
curses.curs_set(1) # Show cursor again
elif char == curses.KEY_ENTER or char == 10 or char == 13:
if globals.current_window == 2:
node_list = get_node_list()
node_list = globals.node_list
if node_list[globals.selected_node] not in globals.channel_list:
globals.channel_list.append(node_list[globals.selected_node])
globals.all_messages[node_list[globals.selected_node]] = []
@@ -359,10 +375,10 @@ def main_ui(stdscr):
entry_win.move(y, x - 1)
entry_win.refresh()
elif char == 96:
curses.curs_set(0) # Hide cursor
settings(stdscr)
curses.curs_set(1) # Show cursor again
elif char == 96: # ` Launch the settings interface
curses.curs_set(0)
settings_menu(stdscr, globals.interface)
curses.curs_set(1)
elif char == 47:
# Display packet log

40
ui/dialog.py Normal file
View File

@@ -0,0 +1,40 @@
import curses
def dialog(stdscr, title, message):
height, width = stdscr.getmaxyx()
# Calculate dialog dimensions
max_line_lengh = 0
message_lines = message.splitlines()
for l in message_lines:
max_line_length = max(len(l), max_line_lengh)
dialog_width = max(len(title) + 4, max_line_length + 4)
dialog_height = len(message_lines) + 4
x = (width - dialog_width) // 2
y = (height - dialog_height) // 2
# Create dialog window
win = curses.newwin(dialog_height, dialog_width, y, x)
win.border(0)
# Add title
win.addstr(0, 2, title)
# Add message
for i, l in enumerate(message_lines):
win.addstr(2 + i, 2, l)
# Add button
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", curses.color_pair(1) | curses.A_REVERSE)
# Refresh dialog window
win.refresh()
# Get user input
while True:
char = win.getch()
# Close dialog with enter, space, or esc
if char in(curses.KEY_ENTER, 10, 13, 32, 27):
win.clear()
win.refresh()
return

93
ui/menus.py Normal file
View File

@@ -0,0 +1,93 @@
from meshtastic.protobuf import config_pb2, module_config_pb2, channel_pb2
from save_to_radio import settings_reboot, settings_factory_reset, settings_reset_nodedb, settings_shutdown
import logging, traceback
def extract_fields(message_instance, current_config=None):
if isinstance(current_config, dict): # Handle dictionaries
return {key: (None, current_config.get(key, "Not Set")) for key in current_config}
if not hasattr(message_instance, "DESCRIPTOR"):
return {}
menu = {}
fields = message_instance.DESCRIPTOR.fields
for field in fields:
if field.name in {"sessionkey", "channel_num", "id"}: # Skip certain fields
continue
if field.message_type: # Nested message
nested_instance = getattr(message_instance, field.name)
nested_config = getattr(current_config, field.name, None) if current_config else None
menu[field.name] = extract_fields(nested_instance, nested_config)
elif field.enum_type: # Handle enum fields
current_value = getattr(current_config, field.name, "Not Set") if current_config else "Not Set"
if isinstance(current_value, int): # If the value is a number, map it to its name
enum_value = field.enum_type.values_by_number.get(current_value)
if enum_value: # Check if the enum value exists
current_value_name = f"{enum_value.name}"
else:
current_value_name = f"Unknown ({current_value})"
menu[field.name] = (field, current_value_name)
else:
menu[field.name] = (field, current_value) # Non-integer values
else: # Handle other field types
current_value = getattr(current_config, field.name, "Not Set") if current_config else "Not Set"
menu[field.name] = (field, current_value)
return menu
def generate_menu_from_protobuf(interface):
# Function to generate the menu structure from protobuf messages
menu_structure = {"Main Menu": {}}
# Add Radio Settings
radio = config_pb2.Config()
current_radio_config = interface.localNode.localConfig if interface else None
menu_structure["Main Menu"]["Radio Settings"] = extract_fields(radio, current_radio_config)
# Add Module Settings
module = module_config_pb2.ModuleConfig()
current_module_config = interface.localNode.moduleConfig if interface else None
menu_structure["Main Menu"]["Module Settings"] = extract_fields(module, current_module_config)
# Add User Settings
current_node_info = interface.getMyNodeInfo() if interface else None
if current_node_info:
current_user_config = current_node_info.get("user", None)
if current_user_config and isinstance(current_user_config, dict):
menu_structure["Main Menu"]["User Settings"] = {
"longName": (None, current_user_config.get("longName", "Not Set")),
"shortName": (None, current_user_config.get("shortName", "Not Set"))
}
else:
logging.info("User settings not found in Node Info")
menu_structure["Main Menu"]["User Settings"] = "No user settings available"
else:
logging.info("Node Info not available")
menu_structure["Main Menu"]["User Settings"] = "Node Info not available"
# Add Channels
channel = channel_pb2.ChannelSettings()
menu_structure["Main Menu"]["Channels"] = {}
if interface:
for i in range(8):
current_channel = interface.localNode.getChannelByChannelIndex(i)
if current_channel:
channel_config = extract_fields(channel, current_channel.settings)
menu_structure["Main Menu"]["Channels"][f"Channel {i + 1}"] = channel_config
# Add additional settings options
menu_structure["Main Menu"]["Reboot"] = settings_reboot
menu_structure["Main Menu"]["Reset Node DB"] = settings_reset_nodedb
menu_structure["Main Menu"]["Shutdown"] = settings_shutdown
menu_structure["Main Menu"]["Factory Reset"] = settings_factory_reset
# Add Exit option
menu_structure["Main Menu"]["Exit"] = None
return menu_structure

View File

@@ -34,11 +34,13 @@ def get_channels():
return globals.channel_list
def get_node_list():
node_list = []
if globals.interface.nodes:
for node in globals.interface.nodes.values():
node_list.append(node['num'])
return node_list
sorted_nodes = sorted(
globals.interface.nodes.values(),
key = lambda node: (node['lastHeard'] if ('lastHeard' in node and isinstance(node['lastHeard'], int)) else 0),
reverse = True)
return [node['num'] for node in sorted_nodes]
return []
def get_nodeNum():
myinfo = globals.interface.getMyNodeInfo()
@@ -69,13 +71,3 @@ def get_name_from_number(number, type='long'):
name = str(decimal_to_hex(number)) # If long name not found, use the ID as string
return name
def sanitize_string(input_str: str) -> str:
"""Check if the string starts with a letter (a-z, A-Z) or an underscore (_), and replace all non-alpha/numeric/underscore characters with underscores."""
if not re.match(r'^[a-zA-Z_]', input_str):
# If not, add "_"
input_str = '_' + input_str
# Replace special characters with underscores (for database tables)
sanitized_str: str = re.sub(r'[^a-zA-Z0-9_]', '_', input_str)
return sanitized_str