1
0
forked from iarv/contact

Compare commits

...

22 Commits

Author SHA1 Message Date
pdxlocations
e273b3325d bump version 2025-01-31 16:54:25 -08:00
pdxlocations
ad7c7a148f Add Config Import and Export (#98)
* init

* working changes

* working changes

* remove unused code
2025-01-31 16:42:55 -08:00
Russell Schmidt
df7d9b0e2e Enable resizing app (#95)
* Enable resizing app

* Crash less with narrow windows

If the window gets too narrow we'll still crash, but this lets us get
narrower than before.

* Fix resize re-drawing

* Fix crash when resizing too fast

* Fix crash after resize with dialog open

* Enable resizing settings
2025-01-31 16:37:12 -08:00
pdxlocations
b9d8c9ad44 Add Lat/Lon/Alt to Position Settings (#96)
* add lat/lon/alt

* fix types and conditions
2025-01-30 22:33:42 -08:00
pdxlocations
e27504f215 fix reseting nested modified_settings 2025-01-30 22:12:33 -08:00
pdxlocations
648993607d fix packet log height 2025-01-30 14:24:13 -08:00
pdxlocations
cf8ee248de adjust packet log message window offset 2025-01-30 13:40:13 -08:00
pdxlocations
03f7fd81a7 Merge pull request #94 from pdxlocations/get-name-from-db
Get long/short name from sql db, not nodedb
2025-01-30 12:53:22 -08:00
pdxlocations
5730beafa9 change get name to sql db from nodedb 2025-01-30 12:51:31 -08:00
pdxlocations
2a6a1ff798 rm globlas that i think we don't need 2025-01-30 12:06:56 -08:00
pdxlocations
09d832a203 colors on traceroute window 2025-01-29 21:25:53 -08:00
pdxlocations
bea051a69f Merge pull request #93 from pdxlocations:psk-from-bytes
Convert bytes PSK to base 64
2025-01-29 21:16:42 -08:00
pdxlocations
aa1b7d43a8 Convert bytes PSK to base 64 2025-01-29 21:12:26 -08:00
pdxlocations
59187a3838 rm depreciated field (#92) 2025-01-29 18:36:00 -08:00
pdxlocations
47d6212b3a Minor Refactor and Dynamic Color List (#91)
* refactor init

* dynamic color list
2025-01-29 17:45:48 -08:00
pdxlocations
f6e7a09c7e refactor select_from_list 2025-01-29 16:28:08 -08:00
pdxlocations
1d9d055a4d import cleanup 2025-01-29 16:16:38 -08:00
pdxlocations
dae8c46b7b allow emojis in node name 2025-01-29 11:58:20 -08:00
pdxlocations
d7a9112918 restore .clear for dictionary 2025-01-29 11:42:42 -08:00
pdxlocations
5084eca388 enforce 4-letter short name 2025-01-29 11:40:23 -08:00
pdxlocations
5e4b28d47a typo 2025-01-29 08:47:34 -08:00
pdxlocations
c8a5ad3a95 Adding some startup logging 2025-01-29 08:45:42 -08:00
16 changed files with 699 additions and 219 deletions

3
.gitignore vendored
View File

@@ -1,9 +1,10 @@
venv/
.venv/
__pycache__/
node-configs/
client.db
.DS_Store
client.log
settings.log
config.json
default_config.log
default_config.log

View File

@@ -1,11 +1,12 @@
import sqlite3
import time
from datetime import datetime
import logging
import globals
from utilities.utils import decimal_to_hex
import default_config as config
from utilities.utils import get_name_from_number
import globals
def get_table_name(channel):
# Construct the table name
@@ -133,7 +134,7 @@ def load_messages_from_db():
if user_id == str(globals.myNodeNum):
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
else:
formatted_message = (f"{config.message_prefix} {get_name_from_number(int(user_id), 'short')}: ", message)
formatted_message = (f"{config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ", message)
hourly_messages[hour].append(formatted_message)
@@ -285,3 +286,38 @@ def maybe_store_nodeinfo_in_db(packet):
logging.error(f"SQLite error in maybe_store_nodeinfo_in_db: {e}")
finally:
db_connection.close()
def get_name_from_database(user_id, type="long"):
"""
Retrieve a user's name (long or short) from the node database.
:param user_id: The user ID to look up.
:param type: "long" for long name, "short" for short name.
:return: The retrieved name or the hex of the user id
"""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
# Construct table name
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
# Determine the correct column to fetch
column_name = "long_name" if type == "long" else "short_name"
# Query the database
query = f"SELECT {column_name} FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,))
result = db_cursor.fetchone()
return result[0] if result else decimal_to_hex(user_id)
except sqlite3.Error as e:
logging.error(f"SQLite error in get_name_from_database: {e}")
return "Unknown"
except Exception as e:
logging.error(f"Unexpected error in get_name_from_database: {e}")
return "Unknown"

View File

@@ -1,6 +1,6 @@
import os
import json
import logging
import json
import os
def format_json_single_line_arrays(data, indent=4):
"""

View File

@@ -20,22 +20,31 @@ def get_user_input(prompt):
input_win.addstr(3, 2, "Enter value: ", get_color("settings_default"))
input_win.refresh()
# Check if "shortName" is in the prompt, and set max length accordingly
max_length = 4 if "shortName" in prompt else None
curses.curs_set(1)
user_input = ""
input_position = (3, 15) # Tuple for row and column
row, col = input_position # Unpack tuple
while True:
key = input_win.getch(3, 15 + len(user_input)) # Adjust cursor position dynamically
if key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
key = input_win.get_wch(row, col + len(user_input)) # Adjust cursor position dynamically
if key == chr(27) or key == curses.KEY_LEFT: # ESC or Left Arrow
curses.curs_set(0)
return None # Exit without returning a value
elif key == ord('\n'): # Enter key
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
break
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace
elif key in (curses.KEY_BACKSPACE, chr(127)): # Backspace
user_input = user_input[:-1]
input_win.addstr(3, 15, " " * (len(user_input) + 1), get_color("settings_default")) # Clear the line
input_win.addstr(3, 15, user_input, get_color("settings_default"))
else:
user_input += chr(key)
input_win.addstr(row, col, " " * (len(user_input) + 1), get_color("settings_default")) # Clear the line
input_win.addstr(row, col, user_input, get_color("settings_default"))
elif max_length is None or len(user_input) < max_length: # Enforce max length if applicable
# Append typed character to input text
if(isinstance(key, str)):
user_input += key
else:
user_input += chr(key)
input_win.addstr(3, 15, user_input, get_color("settings_default"))
curses.curs_set(0)
@@ -249,4 +258,73 @@ def get_fixed32_input(current_value):
if char.isdigit() or char == ".":
user_input += char # Append only valid characters (digits or dots)
except ValueError:
pass # Ignore invalid inputs
pass # Ignore invalid inputs
def select_from_list(prompt, current_option, list_options):
"""
Displays a scrollable list of list_options for the user to choose from using a pad.
"""
selected_index = list_options.index(current_option) if current_option in list_options else 0
height = min(len(list_options) + 5, curses.LINES - 2)
width = 60
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
list_win = curses.newwin(height, width, start_y, start_x)
list_win.bkgd(get_color("background"))
list_win.attrset(get_color("window_frame"))
list_win.keypad(True)
list_pad = curses.newpad(len(list_options) + 1, width - 8)
list_pad.bkgd(get_color("background"))
# Render header
list_win.clear()
list_win.border()
list_win.addstr(1, 2, prompt, get_color("settings_default", bold=True))
# Render options on the pad
for idx, color in enumerate(list_options):
if idx == selected_index:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
else:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
# Initial refresh
list_win.refresh()
list_pad.refresh(0, 0,
list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2, list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4)
while True:
key = list_win.getch()
if key == curses.KEY_UP:
if selected_index > 0:
selected_index -= 1
elif key == curses.KEY_DOWN:
if selected_index < len(list_options) - 1:
selected_index += 1
elif key == curses.KEY_RIGHT or key == ord('\n'):
return list_options[selected_index]
elif key == curses.KEY_LEFT or key == 27: # ESC key
return current_option
# Refresh the pad with updated selection and scroll offset
for idx, color in enumerate(list_options):
if idx == selected_index:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
else:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
list_win.refresh()
list_pad.refresh(0, 0,
list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2, list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4)

View File

@@ -3,7 +3,7 @@
'''
Contact - A Console UI for Meshtastic by http://github.com/pdxlocations
Powered by Meshtastic.org
V 1.1.3
V 1.2.0
'''
import curses
@@ -18,8 +18,8 @@ from message_handlers.rx_handler import on_receive
from ui.curses_ui import main_ui, draw_splash
from utilities.utils import get_channels, get_node_list, get_nodeNum
from db_handler import init_nodedb, load_messages_from_db
import globals
import default_config as config
import globals
# Set environment variables for ncurses compatibility
os.environ["NCURSES_NO_UTF8_ACS"] = "1"
@@ -39,13 +39,17 @@ def main(stdscr):
draw_splash(stdscr)
parser = setup_parser()
args = parser.parse_args()
logging.info("Initializing interface %s", args)
globals.interface = initialize_interface(args)
logging.info("Interface initialized")
globals.myNodeNum = get_nodeNum()
globals.channel_list = get_channels()
globals.node_list = get_node_list()
pub.subscribe(on_receive, 'meshtastic.receive')
init_nodedb()
load_messages_from_db()
logging.info("Starting main UI")
main_ui(stdscr)
except Exception as e:
logging.error("An error occurred: %s", e)

View File

@@ -1,17 +1,15 @@
import logging
import time
from meshtastic import BROADCAST_NUM
from utilities.utils import get_node_list, decimal_to_hex, get_name_from_number
import globals
from utilities.utils import get_node_list
from ui.curses_ui import draw_packetlog_win, draw_node_list, draw_messages_window, draw_channel_list, add_notification
from db_handler import save_message_to_db, maybe_store_nodeinfo_in_db
from db_handler import save_message_to_db, maybe_store_nodeinfo_in_db, get_name_from_database
import default_config as config
import globals
from datetime import datetime
def on_receive(packet, interface):
global nodes_win
# Update packet log
globals.packet_buffer.append(packet)
@@ -65,7 +63,7 @@ def on_receive(packet, interface):
# Add received message to the messages list
message_from_id = packet['from']
message_from_string = get_name_from_number(message_from_id, type='short') + ":"
message_from_string = get_name_from_database(message_from_id, type='short') + ":"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []

View File

@@ -1,11 +1,11 @@
from datetime import datetime
from meshtastic import BROADCAST_NUM
from db_handler import save_message_to_db, update_ack_nak
from meshtastic.protobuf import mesh_pb2, portnums_pb2
from utilities.utils import get_name_from_number
import globals
import google.protobuf.json_format
from meshtastic import BROADCAST_NUM
from meshtastic.protobuf import mesh_pb2, portnums_pb2
from db_handler import save_message_to_db, update_ack_nak, get_name_from_database
import default_config as config
import globals
ack_naks = {}
@@ -56,18 +56,18 @@ def on_response_traceroute(packet):
msg_str = "Traceroute to:\n"
route_str = get_name_from_number(packet["to"], 'short') or f"{packet['to']:08x}" # Start with destination of response
route_str = get_name_from_database(packet["to"], 'short') or f"{packet['to']:08x}" # Start with destination of response
# SNR list should have one more entry than the route, as the final destination adds its SNR also
lenTowards = 0 if "route" not in msg_dict else len(msg_dict["route"])
snrTowardsValid = "snrTowards" in msg_dict and len(msg_dict["snrTowards"]) == lenTowards + 1
if lenTowards > 0: # Loop through hops in route and add SNR if available
for idx, node_num in enumerate(msg_dict["route"]):
route_str += " --> " + (get_name_from_number(node_num, 'short') or f"{node_num:08x}") \
route_str += " --> " + (get_name_from_database(node_num, 'short') or f"{node_num:08x}") \
+ " (" + (str(msg_dict["snrTowards"][idx] / 4) if snrTowardsValid and msg_dict["snrTowards"][idx] != UNK_SNR else "?") + "dB)"
# End with origin of response
route_str += " --> " + (get_name_from_number(packet["from"], 'short') or f"{packet['from']:08x}") \
route_str += " --> " + (get_name_from_database(packet["from"], 'short') or f"{packet['from']:08x}") \
+ " (" + (str(msg_dict["snrTowards"][-1] / 4) if snrTowardsValid and msg_dict["snrTowards"][-1] != UNK_SNR else "?") + "dB)"
msg_str += route_str + "\n" # Print the route towards destination
@@ -77,15 +77,15 @@ def on_response_traceroute(packet):
backValid = "hopStart" in packet and "snrBack" in msg_dict and len(msg_dict["snrBack"]) == lenBack + 1
if backValid:
msg_str += "Back:\n"
route_str = get_name_from_number(packet["from"], 'short') or f"{packet['from']:08x}" # Start with origin of response
route_str = get_name_from_database(packet["from"], 'short') or f"{packet['from']:08x}" # Start with origin of response
if lenBack > 0: # Loop through hops in routeBack and add SNR if available
for idx, node_num in enumerate(msg_dict["routeBack"]):
route_str += " --> " + (get_name_from_number(node_num, 'short') or f"{node_num:08x}") \
route_str += " --> " + (get_name_from_database(node_num, 'short') or f"{node_num:08x}") \
+ " (" + (str(msg_dict["snrBack"][idx] / 4) if msg_dict["snrBack"][idx] != UNK_SNR else "?") + "dB)"
# End with destination of response (us)
route_str += " --> " + (get_name_from_number(packet["to"], 'short') or f"{packet['to']:08x}") \
route_str += " --> " + (get_name_from_database(packet["to"], 'short') or f"{packet['to']:08x}") \
+ " (" + (str(msg_dict["snrBack"][-1] / 4) if msg_dict["snrBack"][-1] != UNK_SNR else "?") + "dB)"
msg_str += route_str + "\n" # Print the route back to us
@@ -102,7 +102,7 @@ def on_response_traceroute(packet):
add_notification(channel_number)
refresh_channels = True
message_from_string = get_name_from_number(packet['from'], type='short') + ":\n"
message_from_string = get_name_from_database(packet['from'], type='short') + ":\n"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []

View File

@@ -15,10 +15,10 @@ def settings_shutdown(interface):
def settings_factory_reset(interface):
interface.localNode.factoryReset()
def settings_set_owner(interface, long_name=None, short_name=None, is_licensed=False):
if isinstance(is_licensed, str):
is_licensed = is_licensed.lower() == 'true'
interface.localNode.setOwner(long_name, short_name, is_licensed)
# def settings_set_owner(interface, long_name=None, short_name=None, is_licensed=False):
# if isinstance(is_licensed, str):
# is_licensed = is_licensed.lower() == 'true'
# interface.localNode.setOwner(long_name, short_name, is_licensed)
def save_changes(interface, menu_path, modified_settings):
@@ -38,13 +38,24 @@ def save_changes(interface, menu_path, modified_settings):
if menu_path[1] == "Radio Settings" or menu_path[1] == "Module Settings":
config_category = menu_path[2].lower() # for radio and module configs
if {'latitude', 'longitude', 'altitude'} & modified_settings.keys():
lat = float(modified_settings.get('latitude', 0.0))
lon = float(modified_settings.get('longitude', 0.0))
alt = int(modified_settings.get('altitude', 0))
interface.localNode.setFixedPosition(lat, lon, alt)
logging.info(f"Updated {config_category} with Latitude: {lat} and Longitude {lon} and Altitude {alt}")
return
elif menu_path[1] == "User Settings": # for user configs
config_category = "User Settings"
long_name = modified_settings.get("longName")
short_name = modified_settings.get("shortName")
is_licensed = modified_settings.get("isLicensed")
is_licensed = is_licensed == "True" or is_licensed is True
node.setOwner(long_name, short_name, is_licensed)
logging.info(f"Updated {config_category} with Long Name: {long_name} and Short Name {short_name} and Licensed Mode {is_licensed}")
return

View File

@@ -1,9 +1,11 @@
import curses
import logging
import os
from save_to_radio import settings_factory_reset, settings_reboot, settings_reset_nodedb, settings_shutdown, save_changes
from utilities.config_io import config_export, config_import
from input_handlers import get_bool_selection, get_repeated_input, get_user_input, get_enum_input, get_fixed32_input, select_from_list
from ui.menus import generate_menu_from_protobuf
from input_handlers import get_bool_selection, get_repeated_input, get_user_input, get_enum_input, get_fixed32_input
from ui.colors import setup_colors, get_color
from utilities.arg_parser import setup_parser
from utilities.interfaces import initialize_interface
@@ -15,7 +17,6 @@ save_option = "Save Changes"
sensitive_settings = ["Reboot", "Reset Node DB", "Shutdown", "Factory Reset"]
def display_menu(current_menu, menu_path, selected_index, show_save_option):
global menu_win, menu_pad
# Calculate the dynamic height based on the number of menu items
num_items = len(current_menu) + (1 if show_save_option else 0) # Add 1 for the "Save Changes" option if applicable
@@ -63,8 +64,11 @@ def display_menu(current_menu, menu_path, selected_index, show_save_option):
menu_pad.refresh(0, 0,
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + 3 + menu_win.getmaxyx()[0] - 5 - (2 if show_save_option else 0), menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 8)
return menu_win, menu_pad
def move_highlight(old_idx, new_idx, options, show_save_option, menu_win):
def move_highlight(old_idx, new_idx, options, show_save_option, menu_win, menu_pad):
if(old_idx == new_idx): # no-op
return
@@ -89,6 +93,7 @@ def move_highlight(old_idx, new_idx, options, show_save_option, menu_win):
menu_win.getbegyx()[0] + 3 + menu_win.getmaxyx()[0] - 5 - (2 if show_save_option else 0), menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 8)
def settings_menu(stdscr, interface):
curses.update_lines_cols()
menu = generate_menu_from_protobuf(interface)
current_menu = menu["Main Menu"]
@@ -113,7 +118,7 @@ def settings_menu(stdscr, interface):
)
# Display the menu
display_menu(current_menu, menu_path, selected_index, show_save_option)
menu_win, menu_pad = display_menu(current_menu, menu_path, selected_index, show_save_option)
need_redraw = False
@@ -125,17 +130,21 @@ def settings_menu(stdscr, interface):
if key == curses.KEY_UP:
old_selected_index = selected_index
selected_index = max_index if selected_index == 0 else selected_index - 1
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win)
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win, menu_pad)
elif key == curses.KEY_DOWN:
old_selected_index = selected_index
selected_index = 0 if selected_index == max_index else selected_index + 1
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win)
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win, menu_pad)
elif key == curses.KEY_RESIZE:
need_redraw = True
curses.update_lines_cols()
elif key == ord("\t") and show_save_option:
old_selected_index = selected_index
selected_index = max_index
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win)
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win, menu_pad)
elif key == curses.KEY_RIGHT or key == ord('\n'):
need_redraw = True
@@ -143,7 +152,7 @@ def settings_menu(stdscr, interface):
menu_win.refresh()
if show_save_option and selected_index == len(options):
save_changes(interface, menu_path, modified_settings)
modified_settings.erase()
modified_settings.clear()
logging.info("Changes Saved")
if len(menu_path) > 1:
@@ -159,6 +168,62 @@ def settings_menu(stdscr, interface):
if selected_option == "Exit":
break
elif selected_option == "Export Config":
filename = get_user_input("Enter a filename for the config file")
if not filename:
logging.warning("Export aborted: No filename provided.")
continue # Go back to the menu
if not filename.lower().endswith(".yaml"):
filename += ".yaml"
try:
config_text = config_export(globals.interface)
app_directory = os.path.dirname(os.path.abspath(__file__))
config_folder = "node-configs"
yaml_file_path = os.path.join(app_directory, config_folder, filename)
if os.path.exists(yaml_file_path):
overwrite = get_bool_selection(f"{filename} already exists. Overwrite?", None)
if overwrite == "False":
logging.info("Export cancelled: User chose not to overwrite.")
continue # Return to menu
os.makedirs(os.path.dirname(yaml_file_path), exist_ok=True)
with open(yaml_file_path, "w", encoding="utf-8") as file:
file.write(config_text)
logging.info(f"Config file saved to {yaml_file_path}")
break
except PermissionError:
logging.error(f"Permission denied: Unable to write to {yaml_file_path}")
except OSError as e:
logging.error(f"OS error while saving config: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
continue
elif selected_option == "Load Config":
app_directory = os.path.dirname(os.path.abspath(__file__))
config_folder = "node-configs"
folder_path = os.path.join(app_directory, config_folder)
file_list = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]
filename = select_from_list("Choose a config file", None, file_list)
if filename:
file_path = os.path.join(app_directory, config_folder, filename)
overwrite = get_bool_selection(f"Are you sure you want to load {filename}?", None)
if overwrite == "True":
config_import(globals.interface, file_path)
break
continue
elif selected_option == "Reboot":
confirmation = get_bool_selection("Are you sure you want to Reboot?", 0)
if confirmation == "True":
@@ -213,6 +278,15 @@ def settings_menu(stdscr, interface):
for option, (field, value) in current_menu.items():
modified_settings[option] = value
elif selected_option in ['latitude', 'longitude', 'altitude']:
new_value = get_user_input(f"Current value for {selected_option}: {current_value}")
new_value = current_value if new_value is None else new_value
current_menu[selected_option] = (field, new_value)
for option in ['latitude', 'longitude', 'altitude']:
if option in current_menu:
modified_settings[option] = current_menu[option][1]
elif field.type == 8: # Handle boolean type
new_value = get_bool_selection(selected_option, str(current_value))
new_value = new_value == "True" or new_value is True
@@ -265,7 +339,8 @@ def settings_menu(stdscr, interface):
menu_win.erase()
menu_win.refresh()
modified_settings.clear()
if len(menu_path) < 2:
modified_settings.clear()
# Navigate back to the previous menu
if len(menu_path) > 1:

View File

@@ -1,19 +1,13 @@
import curses
import textwrap
import globals
from utilities.utils import get_name_from_number, get_channels, get_time_ago
from utilities.utils import get_channels, get_time_ago
from settings import settings_menu
from message_handlers.tx_handler import send_message, send_traceroute
import ui.dialog
from ui.colors import setup_colors, get_color
from db_handler import get_name_from_database
import default_config as config
def refresh_all():
for i, box in enumerate([channel_box, messages_box, nodes_box]):
box.attrset(get_color("window_frame_selected") if globals.current_window == i else get_color("window_frame"))
box.box()
box.refresh()
refresh_pad(i)
import ui.dialog
import globals
def draw_node_details():
nodes_snapshot = list(globals.interface.nodes.values())
@@ -48,12 +42,16 @@ def draw_node_details():
draw_centered_text_field(function_win, nodestr, 0, get_color("commands"))
def draw_function_win():
draw_centered_text_field(function_win,
f"↑→↓← = Select ENTER = Send ` = Settings ^P = Packet Log ESC = Quit",
0, get_color("commands"))
cmds = ["↑→↓← = Select", " ENTER = Send", " ` = Settings", " ^P = Packet Log", " ESC = Quit"]
function_str = ""
for s in cmds:
if(len(function_str) + len(s) < function_win.getmaxyx()[1]):
function_str += s
draw_centered_text_field(function_win, function_str, 0, get_color("commands"))
def get_msg_window_lines():
packetlog_height = packetlog_win.getmaxyx()[0] if globals.display_log else 0
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if globals.display_log else 0
return messages_box.getmaxyx()[0] - 2 - packetlog_height
def refresh_pad(window):
@@ -96,7 +94,7 @@ def highlight_line(highlight, window, line):
if(window == 2):
pad = nodes_pad
select_len = len(get_name_from_number(globals.node_list[line], "long"))
select_len = len(get_name_from_database(globals.node_list[line], "long"))
pad.chgat(line, 1, select_len, nd_color | curses.A_REVERSE if highlight else nd_color)
@@ -105,7 +103,7 @@ def highlight_line(highlight, window, line):
win_width = channel_box.getmaxyx()[1]
if(isinstance(channel, int)):
channel = get_name_from_number(channel, type="long")
channel = get_name_from_database(channel, type="long")
select_len = min(len(channel), win_width - 4)
if line == globals.selected_channel and highlight == False:
@@ -171,7 +169,7 @@ def draw_channel_list():
for i, channel in enumerate(list(globals.all_messages.keys())):
# Convert node number to long name if it's an integer
if isinstance(channel, int):
channel = get_name_from_number(channel, type='long')
channel = get_name_from_database(channel, type='long')
# Determine whether to add the notification
notification = " " + config.notification_symbol if i in globals.notifications else ""
@@ -239,16 +237,13 @@ def draw_messages_window(scroll_to_bottom = False):
def draw_node_list():
nodes_pad.erase()
win_height = nodes_box.getmaxyx()[0]
start_index = max(0, globals.selected_node - (win_height - 3)) # Calculate starting index based on selected node and window height
nodes_pad.resize(len(globals.node_list), nodes_box.getmaxyx()[1])
for i, node in enumerate(globals.node_list):
if globals.selected_node == i and globals.current_window == 2:
nodes_pad.addstr(i, 1, get_name_from_number(node, "long"), get_color("node_list", reverse=True))
nodes_pad.addstr(i, 1, get_name_from_database(node, "long"), get_color("node_list", reverse=True))
else:
nodes_pad.addstr(i, 1, get_name_from_number(node, "long"), get_color("node_list"))
nodes_pad.addstr(i, 1, get_name_from_database(node, "long"), get_color("node_list"))
nodes_box.attrset(get_color("window_frame_selected") if globals.current_window == 2 else get_color("window_frame"))
nodes_box.box()
@@ -330,10 +325,10 @@ def draw_packetlog_win():
break
# Format each field
from_id = get_name_from_number(packet['from'], 'short').ljust(columns[0])
from_id = get_name_from_database(packet['from'], 'short').ljust(columns[0])
to_id = (
"BROADCAST".ljust(columns[1]) if str(packet['to']) == "4294967295"
else get_name_from_number(packet['to'], 'short').ljust(columns[1])
else get_name_from_database(packet['to'], 'short').ljust(columns[1])
)
if 'decoded' in packet:
port = packet['decoded']['portnum'].ljust(columns[2])
@@ -353,84 +348,107 @@ def draw_packetlog_win():
packetlog_win.box()
packetlog_win.refresh()
def main_ui(stdscr):
global messages_pad, messages_box, nodes_pad, nodes_box, channel_pad, channel_box, function_win, packetlog_win
stdscr.keypad(True)
get_channels()
def handle_resize(stdscr, firstrun):
global messages_pad, messages_box, nodes_pad, nodes_box, channel_pad, channel_box, function_win, packetlog_win, entry_win
# Calculate window max dimensions
height, width = stdscr.getmaxyx()
# Define window dimensions and positions
entry_win = curses.newwin(3, width, 0, 0)
channel_width = 3 * (width // 16)
nodes_width = 5 * (width // 16)
messages_width = width - channel_width - nodes_width
channel_box = curses.newwin(height - 6, channel_width, 3, 0)
messages_box = curses.newwin(height - 6, messages_width, 3, channel_width)
nodes_box = curses.newwin(height - 6, nodes_width, 3, channel_width + messages_width)
if firstrun:
entry_win = curses.newwin(3, width, 0, 0)
channel_box = curses.newwin(height - 6, channel_width, 3, 0)
messages_box = curses.newwin(height - 6, messages_width, 3, channel_width)
nodes_box = curses.newwin(height - 6, nodes_width, 3, channel_width + messages_width)
function_win = curses.newwin(3, width, height - 3, 0)
packetlog_win = curses.newwin(int(height / 3), messages_width, height - int(height / 3) - 3, channel_width)
entry_win.bkgd(get_color("background"))
channel_box.bkgd(get_color("background"))
messages_box.bkgd(get_color("background"))
nodes_box.bkgd(get_color("background"))
# Will be resized to what we need when drawn
messages_pad = curses.newpad(1, 1)
nodes_pad = curses.newpad(1,1)
channel_pad = curses.newpad(1,1)
# Will be resized to what we need when drawn
messages_pad = curses.newpad(1, 1)
nodes_pad = curses.newpad(1,1)
channel_pad = curses.newpad(1,1)
entry_win.bkgd(get_color("background"))
channel_box.bkgd(get_color("background"))
messages_box.bkgd(get_color("background"))
nodes_box.bkgd(get_color("background"))
messages_pad.bkgd(get_color("background"))
nodes_pad.bkgd(get_color("background"))
channel_pad.bkgd(get_color("background"))
messages_pad.bkgd(get_color("background"))
nodes_pad.bkgd(get_color("background"))
channel_pad.bkgd(get_color("background"))
function_win = curses.newwin(3, width, height - 3, 0)
packetlog_win = curses.newwin(int(height / 3), messages_width, height - int(height / 3) - 3, channel_width)
function_win.bkgd(get_color("background"))
packetlog_win.bkgd(get_color("background"))
function_win.bkgd(get_color("background"))
packetlog_win.bkgd(get_color("background"))
channel_box.attrset(get_color("window_frame"))
entry_win.attrset(get_color("window_frame"))
nodes_box.attrset(get_color("window_frame"))
messages_box.attrset(get_color("window_frame"))
function_win.attrset(get_color("window_frame"))
draw_function_win()
else:
entry_win.erase()
channel_box.erase()
messages_box.erase()
nodes_box.erase()
function_win.erase()
packetlog_win.erase()
entry_win.resize(3, width)
channel_box.resize(height - 6, channel_width)
messages_box.resize(height - 6, messages_width)
messages_box.mvwin(3, channel_width)
nodes_box.resize(height - 6, nodes_width)
nodes_box.mvwin(3, channel_width + messages_width)
function_win.resize(3, width)
function_win.mvwin(height - 3, 0)
packetlog_win.resize(int(height / 3), messages_width)
packetlog_win.mvwin(height - int(height / 3) - 3, channel_width)
# Draw boxes around windows
# Set the normal frame color for the channel box
channel_box.attrset(get_color("window_frame"))
channel_box.box()
# Draw boxes for other windows
entry_win.attrset(get_color("window_frame"))
entry_win.box()
nodes_box.attrset(get_color("window_frame"))
nodes_box.box()
messages_box.attrset(get_color("window_frame"))
messages_box.box()
function_win.attrset(get_color("window_frame"))
function_win.box()
# Refresh all windows
entry_win.refresh()
channel_box.refresh()
function_win.refresh()
nodes_box.refresh()
messages_box.refresh()
input_text = ""
entry_win.keypad(True)
curses.curs_set(1)
draw_channel_list()
draw_node_list()
draw_messages_window(True)
try:
draw_function_win()
draw_channel_list()
draw_messages_window(True)
draw_node_list()
except:
# Resize events can come faster than we can re-draw, which can cause a curses error.
# In this case we'll see another curses.KEY_RESIZE in our key handler and draw again later.
pass
def main_ui(stdscr):
global messages_pad, messages_box, nodes_pad, nodes_box, channel_pad, channel_box, function_win, packetlog_win, entry_win
messages_pad = messages_box = nodes_pad = nodes_box = channel_pad = channel_box = function_win = packetlog_win = entry_win = None
stdscr.keypad(True)
get_channels()
input_text = ""
handle_resize(stdscr, True)
while True:
draw_text_field(entry_win, f"Input: {input_text[-(width - 10):]}", get_color("input"))
draw_text_field(entry_win, f"Input: {input_text[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window
char = entry_win.get_wch()
@@ -445,6 +463,10 @@ def main_ui(stdscr):
elif globals.current_window == 2:
scroll_nodes(-1)
elif char == curses.KEY_RESIZE:
input_text = ""
handle_resize(stdscr, False)
elif char == curses.KEY_DOWN:
if globals.current_window == 0:
scroll_channels(1)
@@ -548,7 +570,7 @@ def main_ui(stdscr):
curses.curs_set(0) # Hide cursor
ui.dialog.dialog(stdscr, "Traceroute Sent", "Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.")
curses.curs_set(1) # Show cursor again
refresh_all()
handle_resize(stdscr, False)
elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)):
if globals.current_window == 2:
@@ -587,7 +609,7 @@ def main_ui(stdscr):
curses.curs_set(0)
settings_menu(stdscr, globals.interface)
curses.curs_set(1)
refresh_all()
handle_resize(stdscr, False)
elif char == chr(16):
# Display packet log

View File

@@ -1,4 +1,5 @@
import curses
from ui.colors import get_color
def dialog(stdscr, title, message):
height, width = stdscr.getmaxyx()
@@ -15,17 +16,19 @@ def dialog(stdscr, title, message):
# Create dialog window
win = curses.newwin(dialog_height, dialog_width, y, x)
win.bkgd(get_color("background"))
win.attrset(get_color("window_frame"))
win.border(0)
# Add title
win.addstr(0, 2, title)
win.addstr(0, 2, title, get_color("settings_default"))
# Add message
for i, l in enumerate(message_lines):
win.addstr(2 + i, 2, l)
win.addstr(2 + i, 2, l, get_color("settings_default"))
# Add button
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", curses.color_pair(1) | curses.A_REVERSE)
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", get_color("settings_default", reverse=True))
# Refresh dialog window
win.refresh()

View File

@@ -1,6 +1,8 @@
from collections import OrderedDict
from meshtastic.protobuf import config_pb2, module_config_pb2, channel_pb2
from save_to_radio import settings_reboot, settings_factory_reset, settings_reset_nodedb, settings_shutdown
import logging, traceback
import base64
def extract_fields(message_instance, current_config=None):
if isinstance(current_config, dict): # Handle dictionaries
@@ -12,9 +14,11 @@ def extract_fields(message_instance, current_config=None):
menu = {}
fields = message_instance.DESCRIPTOR.fields
for field in fields:
if field.name in {"sessionkey", "channel_num", "id"}: # Skip certain fields
if field.name in {"sessionkey", "channel_num", "id", "ignore_incoming"}: # Skip certain fields
continue
if field.message_type: # Nested message
nested_instance = getattr(message_instance, field.name)
nested_config = getattr(current_config, field.name, None) if current_config else None
@@ -70,6 +74,8 @@ def generate_menu_from_protobuf(interface):
current_channel = interface.localNode.getChannelByChannelIndex(i)
if current_channel:
channel_config = extract_fields(channel, current_channel.settings)
# Convert 'psk' field to Base64
channel_config["psk"] = (channel_config["psk"][0], base64.b64encode(channel_config["psk"][1]).decode('utf-8'))
menu_structure["Main Menu"]["Channels"][f"Channel {i + 1}"] = channel_config
# Add Radio Settings
@@ -77,6 +83,30 @@ def generate_menu_from_protobuf(interface):
current_radio_config = interface.localNode.localConfig if interface else None
menu_structure["Main Menu"]["Radio Settings"] = extract_fields(radio, current_radio_config)
# Add Lat/Lon/Alt
position_data = {
"latitude": (None, current_node_info["position"].get("latitude", 0.0)),
"longitude": (None, current_node_info["position"].get("longitude", 0.0)),
"altitude": (None, current_node_info["position"].get("altitude", 0))
}
# Get existing position menu items
existing_position_menu = menu_structure["Main Menu"]["Radio Settings"].get("position", {})
# Create an ordered position menu with Lat/Lon/Alt inserted in the middle
ordered_position_menu = OrderedDict()
for key, value in existing_position_menu.items():
if key == "fixed_position": # Insert before or after a specific key
ordered_position_menu[key] = value
ordered_position_menu.update(position_data) # Insert Lat/Lon/Alt **right here**
else:
ordered_position_menu[key] = value
# Update the menu with the new order
menu_structure["Main Menu"]["Radio Settings"]["position"] = ordered_position_menu
# Add Module Settings
module = module_config_pb2.ModuleConfig()
current_module_config = interface.localNode.moduleConfig if interface else None
@@ -86,10 +116,12 @@ def generate_menu_from_protobuf(interface):
menu_structure["Main Menu"]["App Settings"] = {"Open": "app_settings"}
# Add additional settings options
menu_structure["Main Menu"]["Reboot"] = settings_reboot
menu_structure["Main Menu"]["Reset Node DB"] = settings_reset_nodedb
menu_structure["Main Menu"]["Shutdown"] = settings_shutdown
menu_structure["Main Menu"]["Factory Reset"] = settings_factory_reset
menu_structure["Main Menu"]["Export Config"] = None
menu_structure["Main Menu"]["Load Config"] = None
menu_structure["Main Menu"]["Reboot"] = None
menu_structure["Main Menu"]["Reset Node DB"] = None
menu_structure["Main Menu"]["Shutdown"] = None
menu_structure["Main Menu"]["Factory Reset"] = None
# Add Exit option
menu_structure["Main Menu"]["Exit"] = None

View File

@@ -1,8 +1,9 @@
import curses
import json
import os
from ui.colors import get_color, setup_colors
import json
import curses
from ui.colors import get_color, setup_colors, COLOR_MAP
from default_config import format_json_single_line_arrays, loaded_config
from input_handlers import select_from_list
width = 60
save_option_text = "Save Changes"
@@ -12,81 +13,12 @@ def edit_color_pair(key, current_value):
"""
Allows the user to select a foreground and background color for a key.
"""
colors = [" ", "black", "red", "green", "yellow", "blue", "magenta", "cyan", "white"]
fg_color = select_color_from_list(f"Select Foreground Color for {key}", current_value[0], colors)
bg_color = select_color_from_list(f"Select Background Color for {key}", current_value[1], colors)
color_list = [" "] + list(COLOR_MAP.keys())
fg_color = select_from_list(f"Select Foreground Color for {key}", current_value[0], color_list)
bg_color = select_from_list(f"Select Background Color for {key}", current_value[1], color_list)
return [fg_color, bg_color]
def select_color_from_list(prompt, current_color, colors):
"""
Displays a scrollable list of colors for the user to choose from using a pad.
"""
selected_index = colors.index(current_color) if current_color in colors else 0
height = min(len(colors) + 5, curses.LINES - 2)
width = 60
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
color_win = curses.newwin(height, width, start_y, start_x)
color_win.bkgd(get_color("background"))
color_win.attrset(get_color("window_frame"))
color_win.keypad(True)
color_pad = curses.newpad(len(colors) + 1, width - 8)
color_pad.bkgd(get_color("background"))
# Render header
color_win.clear()
color_win.border()
color_win.addstr(1, 2, prompt, get_color("settings_default", bold=True))
# Render color options on the pad
for idx, color in enumerate(colors):
if idx == selected_index:
color_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
else:
color_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
# Initial refresh
color_win.refresh()
color_pad.refresh(0, 0,
color_win.getbegyx()[0] + 3, color_win.getbegyx()[1] + 4,
color_win.getbegyx()[0] + color_win.getmaxyx()[0] - 2, color_win.getbegyx()[1] + color_win.getmaxyx()[1] - 4)
while True:
key = color_win.getch()
if key == curses.KEY_UP:
if selected_index > 0:
selected_index -= 1
elif key == curses.KEY_DOWN:
if selected_index < len(colors) - 1:
selected_index += 1
elif key == curses.KEY_RIGHT or key == ord('\n'):
return colors[selected_index]
elif key == curses.KEY_LEFT or key == 27: # ESC key
return current_color
# Refresh the pad with updated selection and scroll offset
for idx, color in enumerate(colors):
if idx == selected_index:
color_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
else:
color_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
color_win.refresh()
color_pad.refresh(0, 0,
color_win.getbegyx()[0] + 3, color_win.getbegyx()[1] + 4,
color_win.getbegyx()[0] + color_win.getmaxyx()[0] - 2, color_win.getbegyx()[1] + color_win.getmaxyx()[1] - 4)
def edit_value(key, current_value):
width = 60
height = 10
@@ -116,22 +48,23 @@ def edit_value(key, current_value):
if key == "theme":
# Load theme names dynamically from the JSON
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
return select_color_from_list("Select Theme", current_value, theme_options)
return select_from_list("Select Theme", current_value, theme_options)
# Standard Input Mode (Scrollable)
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
curses.curs_set(1)
user_input = ""
scroll_offset = 0 # Determines which part of the text is visible
user_input = ""
input_position = (7, 13) # Tuple for row and column
row, col = input_position # Unpack tuple
while True:
visible_text = user_input[scroll_offset:scroll_offset + input_width] # Only show what fits
edit_win.addstr(7, 13, " " * input_width, get_color("settings_default")) # Clear previous text
edit_win.addstr(7, 13, visible_text, get_color("settings_default")) # Display text
edit_win.addstr(row, col, " " * input_width, get_color("settings_default")) # Clear previous text
edit_win.addstr(row, col, visible_text, get_color("settings_default")) # Display text
edit_win.refresh()
edit_win.move(7, 13 + min(len(user_input) - scroll_offset, input_width)) # Adjust cursor position
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width)) # Adjust cursor position
key = edit_win.get_wch()
if key in (chr(27), curses.KEY_LEFT): # ESC or Left Arrow

281
utilities/config_io.py Normal file
View File

@@ -0,0 +1,281 @@
import yaml
import logging
from typing import List
from google.protobuf.json_format import MessageToDict
from meshtastic import BROADCAST_ADDR, mt_config
from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
# defs are from meshtastic/python/main
def traverseConfig(config_root, config, interface_config) -> bool:
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
snake_name = camel_to_snake(config_root)
for pref in config:
pref_name = f"{snake_name}.{pref}"
if isinstance(config[pref], dict):
traverseConfig(pref_name, config[pref], interface_config)
else:
setPref(interface_config, pref_name, config[pref])
return True
def splitCompoundName(comp_name: str) -> List[str]:
"""Split compound (dot separated) preference name into parts"""
name: List[str] = comp_name.split(".")
if len(name) < 2:
name[0] = comp_name
name.append(comp_name)
return name
def setPref(config, comp_name, raw_val) -> bool:
"""Set a channel or preferences value"""
name = splitCompoundName(comp_name)
snake_name = camel_to_snake(name[-1])
camel_name = snake_to_camel(name[-1])
uni_name = camel_name if mt_config.camel_case else snake_name
logging.debug(f"snake_name:{snake_name}")
logging.debug(f"camel_name:{camel_name}")
objDesc = config.DESCRIPTOR
config_part = config
config_type = objDesc.fields_by_name.get(name[0])
if config_type and config_type.message_type is not None:
for name_part in name[1:-1]:
part_snake_name = camel_to_snake((name_part))
config_part = getattr(config, config_type.name)
config_type = config_type.message_type.fields_by_name.get(part_snake_name)
pref = None
if config_type and config_type.message_type is not None:
pref = config_type.message_type.fields_by_name.get(snake_name)
# Others like ChannelSettings are standalone
elif config_type:
pref = config_type
if (not pref) or (not config_type):
return False
if isinstance(raw_val, str):
val = fromStr(raw_val)
else:
val = raw_val
logging.debug(f"valStr:{raw_val} val:{val}")
if snake_name == "wifi_psk" and len(str(raw_val)) < 8:
logging.info(f"Warning: network.wifi_psk must be 8 or more characters.")
return False
enumType = pref.enum_type
# pylint: disable=C0123
if enumType and type(val) == str:
# We've failed so far to convert this string into an enum, try to find it by reflection
e = enumType.values_by_name.get(val)
if e:
val = e.number
else:
logging.info(
f"{name[0]}.{uni_name} does not have an enum called {val}, so you can not set it."
)
logging.info(f"Choices in sorted order are:")
names = []
for f in enumType.values:
# Note: We must use the value of the enum (regardless if camel or snake case)
names.append(f"{f.name}")
for temp_name in sorted(names):
logging.info(f" {temp_name}")
return False
# repeating fields need to be handled with append, not setattr
if pref.label != pref.LABEL_REPEATED:
try:
if config_type.message_type is not None:
config_values = getattr(config_part, config_type.name)
setattr(config_values, pref.name, val)
else:
setattr(config_part, snake_name, val)
except TypeError:
# The setter didn't like our arg type guess try again as a string
config_values = getattr(config_part, config_type.name)
setattr(config_values, pref.name, str(val))
elif type(val) == list:
new_vals = [fromStr(x) for x in val]
config_values = getattr(config, config_type.name)
getattr(config_values, pref.name)[:] = new_vals
else:
config_values = getattr(config, config_type.name)
if val == 0:
# clear values
logging.info(f"Clearing {pref.name} list")
del getattr(config_values, pref.name)[:]
else:
logging.info(f"Adding '{raw_val}' to the {pref.name} list")
cur_vals = [x for x in getattr(config_values, pref.name) if x not in [0, "", b""]]
cur_vals.append(val)
getattr(config_values, pref.name)[:] = cur_vals
return True
prefix = f"{'.'.join(name[0:-1])}." if config_type.message_type is not None else ""
logging.info(f"Set {prefix}{uni_name} to {raw_val}")
return True
def config_import(interface, filename):
with open(filename, encoding="utf8") as file:
configuration = yaml.safe_load(file)
closeNow = True
interface.getNode('^local', False).beginSettingsTransaction()
if "owner" in configuration:
logging.info(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True
interface.getNode('^local', False).setOwner(configuration["owner"])
if "owner_short" in configuration:
logging.info(
f"Setting device owner short to {configuration['owner_short']}"
)
waitForAckNak = True
interface.getNode('^local', False).setOwner(
long_name=None, short_name=configuration["owner_short"]
)
if "ownerShort" in configuration:
logging.info(
f"Setting device owner short to {configuration['ownerShort']}"
)
waitForAckNak = True
interface.getNode('^local', False).setOwner(
long_name=None, short_name=configuration["ownerShort"]
)
if "channel_url" in configuration:
logging.info(f"Setting channel url to {configuration['channel_url']}")
interface.getNode('^local').setURL(configuration["channel_url"])
if "channelUrl" in configuration:
logging.info(f"Setting channel url to {configuration['channelUrl']}")
interface.getNode('^local').setURL(configuration["channelUrl"])
if "location" in configuration:
alt = 0
lat = 0.0
lon = 0.0
localConfig = interface.localNode.localConfig
if "alt" in configuration["location"]:
alt = int(configuration["location"]["alt"] or 0)
logging.info(f"Fixing altitude at {alt} meters")
if "lat" in configuration["location"]:
lat = float(configuration["location"]["lat"] or 0)
logging.info(f"Fixing latitude at {lat} degrees")
if "lon" in configuration["location"]:
lon = float(configuration["location"]["lon"] or 0)
logging.info(f"Fixing longitude at {lon} degrees")
logging.info("Setting device position")
interface.localNode.setFixedPosition(lat, lon, alt)
if "config" in configuration:
localConfig = interface.getNode('^local').localConfig
for section in configuration["config"]:
traverseConfig(
section, configuration["config"][section], localConfig
)
interface.getNode('^local').writeConfig(
camel_to_snake(section)
)
if "module_config" in configuration:
moduleConfig = interface.getNode('^local').moduleConfig
for section in configuration["module_config"]:
traverseConfig(
section,
configuration["module_config"][section],
moduleConfig,
)
interface.getNode('^local').writeConfig(
camel_to_snake(section)
)
interface.getNode('^local', False).commitSettingsTransaction()
logging.info("Writing modified configuration to device")
def config_export(interface) -> str:
"""used in --export-config"""
configObj = {}
owner = interface.getLongName()
owner_short = interface.getShortName()
channel_url = interface.localNode.getURL()
myinfo = interface.getMyNodeInfo()
pos = myinfo.get("position")
lat = None
lon = None
alt = None
if pos:
lat = pos.get("latitude")
lon = pos.get("longitude")
alt = pos.get("altitude")
if owner:
configObj["owner"] = owner
if owner_short:
configObj["owner_short"] = owner_short
if channel_url:
if mt_config.camel_case:
configObj["channelUrl"] = channel_url
else:
configObj["channel_url"] = channel_url
# lat and lon don't make much sense without the other (so fill with 0s), and alt isn't meaningful without both
if lat or lon:
configObj["location"] = {"lat": lat or float(0), "lon": lon or float(0)}
if alt:
configObj["location"]["alt"] = alt
config = MessageToDict(interface.localNode.localConfig) #checkme - Used as a dictionary here and a string below
if config:
# Convert inner keys to correct snake/camelCase
prefs = {}
for pref in config:
if mt_config.camel_case:
prefs[snake_to_camel(pref)] = config[pref]
else:
prefs[pref] = config[pref]
# mark base64 encoded fields as such
if pref == "security":
if 'privateKey' in prefs[pref]:
prefs[pref]['privateKey'] = 'base64:' + prefs[pref]['privateKey']
if 'publicKey' in prefs[pref]:
prefs[pref]['publicKey'] = 'base64:' + prefs[pref]['publicKey']
if 'adminKey' in prefs[pref]:
for i in range(len(prefs[pref]['adminKey'])):
prefs[pref]['adminKey'][i] = 'base64:' + prefs[pref]['adminKey'][i]
if mt_config.camel_case:
configObj["config"] = config #Identical command here and 2 lines below?
else:
configObj["config"] = config
module_config = MessageToDict(interface.localNode.moduleConfig)
if module_config:
# Convert inner keys to correct snake/camelCase
prefs = {}
for pref in module_config:
if len(module_config[pref]) > 0:
prefs[pref] = module_config[pref]
if mt_config.camel_case:
configObj["module_config"] = prefs
else:
configObj["module_config"] = prefs
config_txt = "# start of Meshtastic configure yaml\n" #checkme - "config" (now changed to config_out)
#was used as a string here and a Dictionary above
config_txt += yaml.dump(configObj)
# logging.info(config_txt)
return config_txt

View File

@@ -3,14 +3,21 @@ import meshtastic.serial_interface, meshtastic.tcp_interface, meshtastic.ble_int
import globals
def initialize_interface(args):
if args.ble:
return meshtastic.ble_interface.BLEInterface(args.ble if args.ble != "any" else None)
elif args.host:
return meshtastic.tcp_interface.TCPInterface(args.host)
else:
try:
return meshtastic.serial_interface.SerialInterface(args.port)
except PermissionError as ex:
logging.error("You probably need to add yourself to the `dialout` group to use a serial connection.")
if globals.interface.devPath is None:
return meshtastic.tcp_interface.TCPInterface("meshtastic.local")
try:
if args.ble:
return meshtastic.ble_interface.BLEInterface(args.ble if args.ble != "any" else None)
elif args.host:
return meshtastic.tcp_interface.TCPInterface(args.host)
else:
try:
return meshtastic.serial_interface.SerialInterface(args.port)
except PermissionError as ex:
logging.error(f"You probably need to add yourself to the `dialout` group to use a serial connection. {ex}")
except Exception as ex:
logging.error(f"Unexpected error initializing interface: {ex}")
if globals.interface.devPath is None:
return meshtastic.tcp_interface.TCPInterface("meshtastic.local")
except Exception as ex:
logging.critical(f"Fatal error initializing interface: {ex}")

View File

@@ -1,7 +1,6 @@
import globals
from datetime import datetime
from meshtastic.protobuf import config_pb2
import re
def get_channels():
"""Retrieve channels from the node and update globals.channel_list and globals.all_messages."""