From ad7c7a148f99c64be2fa9f5f19f7dfd02359c523 Mon Sep 17 00:00:00 2001 From: pdxlocations <117498748+pdxlocations@users.noreply.github.com> Date: Fri, 31 Jan 2025 16:42:55 -0800 Subject: [PATCH] Add Config Import and Export (#98) * init * working changes * working changes * remove unused code --- .gitignore | 3 +- input_handlers.py | 71 ++++++++++- settings.py | 60 ++++++++- ui/menus.py | 15 ++- user_config.py | 70 +--------- utilities/config_io.py | 281 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 421 insertions(+), 79 deletions(-) create mode 100644 utilities/config_io.py diff --git a/.gitignore b/.gitignore index 5fd14eb..5bceb25 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,10 @@ venv/ .venv/ __pycache__/ +node-configs/ client.db .DS_Store client.log settings.log config.json -default_config.log +default_config.log \ No newline at end of file diff --git a/input_handlers.py b/input_handlers.py index 36b71fb..87196a4 100644 --- a/input_handlers.py +++ b/input_handlers.py @@ -258,4 +258,73 @@ def get_fixed32_input(current_value): if char.isdigit() or char == ".": user_input += char # Append only valid characters (digits or dots) except ValueError: - pass # Ignore invalid inputs \ No newline at end of file + pass # Ignore invalid inputs + + + +def select_from_list(prompt, current_option, list_options): + """ + Displays a scrollable list of list_options for the user to choose from using a pad. + """ + selected_index = list_options.index(current_option) if current_option in list_options else 0 + + height = min(len(list_options) + 5, curses.LINES - 2) + width = 60 + start_y = (curses.LINES - height) // 2 + start_x = (curses.COLS - width) // 2 + + list_win = curses.newwin(height, width, start_y, start_x) + list_win.bkgd(get_color("background")) + list_win.attrset(get_color("window_frame")) + list_win.keypad(True) + + list_pad = curses.newpad(len(list_options) + 1, width - 8) + list_pad.bkgd(get_color("background")) + + # Render header + list_win.clear() + list_win.border() + list_win.addstr(1, 2, prompt, get_color("settings_default", bold=True)) + + # Render options on the pad + for idx, color in enumerate(list_options): + if idx == selected_index: + list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True)) + else: + list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default")) + + # Initial refresh + list_win.refresh() + list_pad.refresh(0, 0, + list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4, + list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2, list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4) + + while True: + key = list_win.getch() + + if key == curses.KEY_UP: + + if selected_index > 0: + selected_index -= 1 + + elif key == curses.KEY_DOWN: + if selected_index < len(list_options) - 1: + selected_index += 1 + + elif key == curses.KEY_RIGHT or key == ord('\n'): + return list_options[selected_index] + + elif key == curses.KEY_LEFT or key == 27: # ESC key + return current_option + + # Refresh the pad with updated selection and scroll offset + for idx, color in enumerate(list_options): + if idx == selected_index: + list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True)) + else: + list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default")) + + list_win.refresh() + list_pad.refresh(0, 0, + list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4, + list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2, list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4) diff --git a/settings.py b/settings.py index 1e139be..35c6f38 100644 --- a/settings.py +++ b/settings.py @@ -1,8 +1,10 @@ import curses import logging +import os from save_to_radio import settings_factory_reset, settings_reboot, settings_reset_nodedb, settings_shutdown, save_changes -from input_handlers import get_bool_selection, get_repeated_input, get_user_input, get_enum_input, get_fixed32_input +from utilities.config_io import config_export, config_import +from input_handlers import get_bool_selection, get_repeated_input, get_user_input, get_enum_input, get_fixed32_input, select_from_list from ui.menus import generate_menu_from_protobuf from ui.colors import setup_colors, get_color from utilities.arg_parser import setup_parser @@ -166,6 +168,62 @@ def settings_menu(stdscr, interface): if selected_option == "Exit": break + + + elif selected_option == "Export Config": + filename = get_user_input("Enter a filename for the config file") + + if not filename: + logging.warning("Export aborted: No filename provided.") + continue # Go back to the menu + + if not filename.lower().endswith(".yaml"): + filename += ".yaml" + + try: + config_text = config_export(globals.interface) + app_directory = os.path.dirname(os.path.abspath(__file__)) + config_folder = "node-configs" + yaml_file_path = os.path.join(app_directory, config_folder, filename) + + if os.path.exists(yaml_file_path): + overwrite = get_bool_selection(f"{filename} already exists. Overwrite?", None) + if overwrite == "False": + logging.info("Export cancelled: User chose not to overwrite.") + continue # Return to menu + os.makedirs(os.path.dirname(yaml_file_path), exist_ok=True) + with open(yaml_file_path, "w", encoding="utf-8") as file: + file.write(config_text) + logging.info(f"Config file saved to {yaml_file_path}") + break + except PermissionError: + logging.error(f"Permission denied: Unable to write to {yaml_file_path}") + except OSError as e: + logging.error(f"OS error while saving config: {e}") + except Exception as e: + logging.error(f"Unexpected error: {e}") + continue + + + + + elif selected_option == "Load Config": + + app_directory = os.path.dirname(os.path.abspath(__file__)) + config_folder = "node-configs" + folder_path = os.path.join(app_directory, config_folder) + file_list = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))] + filename = select_from_list("Choose a config file", None, file_list) + if filename: + file_path = os.path.join(app_directory, config_folder, filename) + overwrite = get_bool_selection(f"Are you sure you want to load {filename}?", None) + if overwrite == "True": + config_import(globals.interface, file_path) + break + continue + + + elif selected_option == "Reboot": confirmation = get_bool_selection("Are you sure you want to Reboot?", 0) if confirmation == "True": diff --git a/ui/menus.py b/ui/menus.py index 2fe107e..2098572 100644 --- a/ui/menus.py +++ b/ui/menus.py @@ -1,9 +1,8 @@ from collections import OrderedDict -from meshtastic.protobuf import config_pb2, module_config_pb2, channel_pb2, mesh_pb2 -from save_to_radio import settings_reboot, settings_factory_reset, settings_reset_nodedb, settings_shutdown +from meshtastic.protobuf import config_pb2, module_config_pb2, channel_pb2 import logging, traceback import base64 -import globals + def extract_fields(message_instance, current_config=None): if isinstance(current_config, dict): # Handle dictionaries @@ -117,10 +116,12 @@ def generate_menu_from_protobuf(interface): menu_structure["Main Menu"]["App Settings"] = {"Open": "app_settings"} # Add additional settings options - menu_structure["Main Menu"]["Reboot"] = settings_reboot - menu_structure["Main Menu"]["Reset Node DB"] = settings_reset_nodedb - menu_structure["Main Menu"]["Shutdown"] = settings_shutdown - menu_structure["Main Menu"]["Factory Reset"] = settings_factory_reset + menu_structure["Main Menu"]["Export Config"] = None + menu_structure["Main Menu"]["Load Config"] = None + menu_structure["Main Menu"]["Reboot"] = None + menu_structure["Main Menu"]["Reset Node DB"] = None + menu_structure["Main Menu"]["Shutdown"] = None + menu_structure["Main Menu"]["Factory Reset"] = None # Add Exit option menu_structure["Main Menu"]["Exit"] = None diff --git a/user_config.py b/user_config.py index 613c199..4bed606 100644 --- a/user_config.py +++ b/user_config.py @@ -3,6 +3,7 @@ import json import curses from ui.colors import get_color, setup_colors, COLOR_MAP from default_config import format_json_single_line_arrays, loaded_config +from input_handlers import select_from_list width = 60 save_option_text = "Save Changes" @@ -18,75 +19,6 @@ def edit_color_pair(key, current_value): return [fg_color, bg_color] - -def select_from_list(prompt, current_option, list_options): - """ - Displays a scrollable list of list_options for the user to choose from using a pad. - """ - selected_index = list_options.index(current_option) if current_option in list_options else 0 - - height = min(len(list_options) + 5, curses.LINES - 2) - width = 60 - start_y = (curses.LINES - height) // 2 - start_x = (curses.COLS - width) // 2 - - list_win = curses.newwin(height, width, start_y, start_x) - list_win.bkgd(get_color("background")) - list_win.attrset(get_color("window_frame")) - list_win.keypad(True) - - list_pad = curses.newpad(len(list_options) + 1, width - 8) - list_pad.bkgd(get_color("background")) - - # Render header - list_win.clear() - list_win.border() - list_win.addstr(1, 2, prompt, get_color("settings_default", bold=True)) - - # Render options on the pad - for idx, color in enumerate(list_options): - if idx == selected_index: - list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True)) - else: - list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default")) - - # Initial refresh - list_win.refresh() - list_pad.refresh(0, 0, - list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4, - list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2, list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4) - - while True: - key = list_win.getch() - - if key == curses.KEY_UP: - - if selected_index > 0: - selected_index -= 1 - - elif key == curses.KEY_DOWN: - if selected_index < len(list_options) - 1: - selected_index += 1 - - elif key == curses.KEY_RIGHT or key == ord('\n'): - return list_options[selected_index] - - elif key == curses.KEY_LEFT or key == 27: # ESC key - return current_option - - # Refresh the pad with updated selection and scroll offset - for idx, color in enumerate(list_options): - if idx == selected_index: - list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True)) - else: - list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default")) - - - list_win.refresh() - list_pad.refresh(0, 0, - list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4, - list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2, list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4) - def edit_value(key, current_value): width = 60 height = 10 diff --git a/utilities/config_io.py b/utilities/config_io.py new file mode 100644 index 0000000..3f54ed4 --- /dev/null +++ b/utilities/config_io.py @@ -0,0 +1,281 @@ + +import yaml +import logging +from typing import List +from google.protobuf.json_format import MessageToDict +from meshtastic import BROADCAST_ADDR, mt_config +from meshtastic.util import camel_to_snake, snake_to_camel, fromStr + +# defs are from meshtastic/python/main + +def traverseConfig(config_root, config, interface_config) -> bool: + """Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference""" + snake_name = camel_to_snake(config_root) + for pref in config: + pref_name = f"{snake_name}.{pref}" + if isinstance(config[pref], dict): + traverseConfig(pref_name, config[pref], interface_config) + else: + setPref(interface_config, pref_name, config[pref]) + + return True + +def splitCompoundName(comp_name: str) -> List[str]: + """Split compound (dot separated) preference name into parts""" + name: List[str] = comp_name.split(".") + if len(name) < 2: + name[0] = comp_name + name.append(comp_name) + return name + +def setPref(config, comp_name, raw_val) -> bool: + """Set a channel or preferences value""" + + name = splitCompoundName(comp_name) + + snake_name = camel_to_snake(name[-1]) + camel_name = snake_to_camel(name[-1]) + uni_name = camel_name if mt_config.camel_case else snake_name + logging.debug(f"snake_name:{snake_name}") + logging.debug(f"camel_name:{camel_name}") + + objDesc = config.DESCRIPTOR + config_part = config + config_type = objDesc.fields_by_name.get(name[0]) + if config_type and config_type.message_type is not None: + for name_part in name[1:-1]: + part_snake_name = camel_to_snake((name_part)) + config_part = getattr(config, config_type.name) + config_type = config_type.message_type.fields_by_name.get(part_snake_name) + pref = None + if config_type and config_type.message_type is not None: + pref = config_type.message_type.fields_by_name.get(snake_name) + # Others like ChannelSettings are standalone + elif config_type: + pref = config_type + + if (not pref) or (not config_type): + return False + + if isinstance(raw_val, str): + val = fromStr(raw_val) + else: + val = raw_val + logging.debug(f"valStr:{raw_val} val:{val}") + + if snake_name == "wifi_psk" and len(str(raw_val)) < 8: + logging.info(f"Warning: network.wifi_psk must be 8 or more characters.") + return False + + enumType = pref.enum_type + # pylint: disable=C0123 + if enumType and type(val) == str: + # We've failed so far to convert this string into an enum, try to find it by reflection + e = enumType.values_by_name.get(val) + if e: + val = e.number + else: + logging.info( + f"{name[0]}.{uni_name} does not have an enum called {val}, so you can not set it." + ) + logging.info(f"Choices in sorted order are:") + names = [] + for f in enumType.values: + # Note: We must use the value of the enum (regardless if camel or snake case) + names.append(f"{f.name}") + for temp_name in sorted(names): + logging.info(f" {temp_name}") + return False + + # repeating fields need to be handled with append, not setattr + if pref.label != pref.LABEL_REPEATED: + try: + if config_type.message_type is not None: + config_values = getattr(config_part, config_type.name) + setattr(config_values, pref.name, val) + else: + setattr(config_part, snake_name, val) + except TypeError: + # The setter didn't like our arg type guess try again as a string + config_values = getattr(config_part, config_type.name) + setattr(config_values, pref.name, str(val)) + elif type(val) == list: + new_vals = [fromStr(x) for x in val] + config_values = getattr(config, config_type.name) + getattr(config_values, pref.name)[:] = new_vals + else: + config_values = getattr(config, config_type.name) + if val == 0: + # clear values + logging.info(f"Clearing {pref.name} list") + del getattr(config_values, pref.name)[:] + else: + logging.info(f"Adding '{raw_val}' to the {pref.name} list") + cur_vals = [x for x in getattr(config_values, pref.name) if x not in [0, "", b""]] + cur_vals.append(val) + getattr(config_values, pref.name)[:] = cur_vals + return True + + prefix = f"{'.'.join(name[0:-1])}." if config_type.message_type is not None else "" + logging.info(f"Set {prefix}{uni_name} to {raw_val}") + + return True + + + +def config_import(interface, filename): + with open(filename, encoding="utf8") as file: + configuration = yaml.safe_load(file) + closeNow = True + + interface.getNode('^local', False).beginSettingsTransaction() + + if "owner" in configuration: + logging.info(f"Setting device owner to {configuration['owner']}") + waitForAckNak = True + interface.getNode('^local', False).setOwner(configuration["owner"]) + + if "owner_short" in configuration: + logging.info( + f"Setting device owner short to {configuration['owner_short']}" + ) + waitForAckNak = True + interface.getNode('^local', False).setOwner( + long_name=None, short_name=configuration["owner_short"] + ) + + if "ownerShort" in configuration: + logging.info( + f"Setting device owner short to {configuration['ownerShort']}" + ) + waitForAckNak = True + interface.getNode('^local', False).setOwner( + long_name=None, short_name=configuration["ownerShort"] + ) + + if "channel_url" in configuration: + logging.info(f"Setting channel url to {configuration['channel_url']}") + interface.getNode('^local').setURL(configuration["channel_url"]) + + if "channelUrl" in configuration: + logging.info(f"Setting channel url to {configuration['channelUrl']}") + interface.getNode('^local').setURL(configuration["channelUrl"]) + + if "location" in configuration: + alt = 0 + lat = 0.0 + lon = 0.0 + localConfig = interface.localNode.localConfig + + if "alt" in configuration["location"]: + alt = int(configuration["location"]["alt"] or 0) + logging.info(f"Fixing altitude at {alt} meters") + if "lat" in configuration["location"]: + lat = float(configuration["location"]["lat"] or 0) + logging.info(f"Fixing latitude at {lat} degrees") + if "lon" in configuration["location"]: + lon = float(configuration["location"]["lon"] or 0) + logging.info(f"Fixing longitude at {lon} degrees") + logging.info("Setting device position") + interface.localNode.setFixedPosition(lat, lon, alt) + + if "config" in configuration: + localConfig = interface.getNode('^local').localConfig + for section in configuration["config"]: + traverseConfig( + section, configuration["config"][section], localConfig + ) + interface.getNode('^local').writeConfig( + camel_to_snake(section) + ) + + if "module_config" in configuration: + moduleConfig = interface.getNode('^local').moduleConfig + for section in configuration["module_config"]: + traverseConfig( + section, + configuration["module_config"][section], + moduleConfig, + ) + interface.getNode('^local').writeConfig( + camel_to_snake(section) + ) + + interface.getNode('^local', False).commitSettingsTransaction() + logging.info("Writing modified configuration to device") + + + +def config_export(interface) -> str: + """used in --export-config""" + configObj = {} + + owner = interface.getLongName() + owner_short = interface.getShortName() + channel_url = interface.localNode.getURL() + myinfo = interface.getMyNodeInfo() + pos = myinfo.get("position") + lat = None + lon = None + alt = None + if pos: + lat = pos.get("latitude") + lon = pos.get("longitude") + alt = pos.get("altitude") + + if owner: + configObj["owner"] = owner + if owner_short: + configObj["owner_short"] = owner_short + if channel_url: + if mt_config.camel_case: + configObj["channelUrl"] = channel_url + else: + configObj["channel_url"] = channel_url + # lat and lon don't make much sense without the other (so fill with 0s), and alt isn't meaningful without both + if lat or lon: + configObj["location"] = {"lat": lat or float(0), "lon": lon or float(0)} + if alt: + configObj["location"]["alt"] = alt + + config = MessageToDict(interface.localNode.localConfig) #checkme - Used as a dictionary here and a string below + if config: + # Convert inner keys to correct snake/camelCase + prefs = {} + for pref in config: + if mt_config.camel_case: + prefs[snake_to_camel(pref)] = config[pref] + else: + prefs[pref] = config[pref] + # mark base64 encoded fields as such + if pref == "security": + if 'privateKey' in prefs[pref]: + prefs[pref]['privateKey'] = 'base64:' + prefs[pref]['privateKey'] + if 'publicKey' in prefs[pref]: + prefs[pref]['publicKey'] = 'base64:' + prefs[pref]['publicKey'] + if 'adminKey' in prefs[pref]: + for i in range(len(prefs[pref]['adminKey'])): + prefs[pref]['adminKey'][i] = 'base64:' + prefs[pref]['adminKey'][i] + if mt_config.camel_case: + configObj["config"] = config #Identical command here and 2 lines below? + else: + configObj["config"] = config + + module_config = MessageToDict(interface.localNode.moduleConfig) + if module_config: + # Convert inner keys to correct snake/camelCase + prefs = {} + for pref in module_config: + if len(module_config[pref]) > 0: + prefs[pref] = module_config[pref] + if mt_config.camel_case: + configObj["module_config"] = prefs + else: + configObj["module_config"] = prefs + + config_txt = "# start of Meshtastic configure yaml\n" #checkme - "config" (now changed to config_out) + #was used as a string here and a Dictionary above + config_txt += yaml.dump(configObj) + + # logging.info(config_txt) + return config_txt \ No newline at end of file