mirror of
https://github.com/pdxlocations/contact.git
synced 2026-03-28 17:12:35 +01:00
Compare commits
13 Commits
check-db-f
...
input-vali
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cf4137347f | ||
|
|
12f81f6af6 | ||
|
|
684d298fac | ||
|
|
4179a3f8d0 | ||
|
|
7a61808f47 | ||
|
|
a8680ac0ed | ||
|
|
818575939a | ||
|
|
acaad849b0 | ||
|
|
0b25dda4af | ||
|
|
18329f0128 | ||
|
|
4378f3045c | ||
|
|
a451d1d7d6 | ||
|
|
fe1f027219 |
@@ -66,7 +66,7 @@ def prompt_region_if_unset(args: object) -> None:
|
||||
interface_state.interface = initialize_interface(args)
|
||||
|
||||
|
||||
def initialize_globals(args: object) -> None:
|
||||
def initialize_globals() -> None:
|
||||
"""Initializes interface and shared globals."""
|
||||
|
||||
interface_state.myNodeNum = get_nodeNum()
|
||||
@@ -99,7 +99,7 @@ def main(stdscr: curses.window) -> None:
|
||||
if interface_state.interface.localNode.localConfig.lora.region == 0:
|
||||
prompt_region_if_unset(args)
|
||||
|
||||
initialize_globals(args)
|
||||
initialize_globals()
|
||||
logging.info("Starting main UI")
|
||||
|
||||
try:
|
||||
|
||||
@@ -335,7 +335,6 @@ def handle_ctrl_t(stdscr: curses.window) -> None:
|
||||
send_traceroute()
|
||||
curses.curs_set(0) # Hide cursor
|
||||
contact.ui.dialog.dialog(
|
||||
stdscr,
|
||||
f"Traceroute Sent To: {get_name_from_database(ui_state.node_list[ui_state.selected_node])}",
|
||||
"Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.",
|
||||
)
|
||||
|
||||
@@ -150,6 +150,15 @@ def draw_help_window(
|
||||
)
|
||||
|
||||
|
||||
def get_input_type_for_field(field) -> type:
|
||||
if field.type in (field.TYPE_INT32, field.TYPE_UINT32, field.TYPE_INT64):
|
||||
return int
|
||||
elif field.type in (field.TYPE_FLOAT, field.TYPE_DOUBLE):
|
||||
return float
|
||||
else:
|
||||
return str
|
||||
|
||||
|
||||
def settings_menu(stdscr: object, interface: object) -> None:
|
||||
curses.update_lines_cols()
|
||||
|
||||
@@ -268,7 +277,8 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
break
|
||||
|
||||
elif selected_option == "Export Config File":
|
||||
filename = get_text_input("Enter a filename for the config file")
|
||||
|
||||
filename = get_text_input("Enter a filename for the config file", None, None)
|
||||
if not filename:
|
||||
logging.info("Export aborted: No filename provided.")
|
||||
menu_state.start_index.pop()
|
||||
@@ -290,7 +300,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
with open(yaml_file_path, "w", encoding="utf-8") as file:
|
||||
file.write(config_text)
|
||||
logging.info(f"Config file saved to {yaml_file_path}")
|
||||
dialog(stdscr, "Config File Saved:", yaml_file_path)
|
||||
dialog("Config File Saved:", yaml_file_path)
|
||||
menu_state.start_index.pop()
|
||||
continue
|
||||
except PermissionError:
|
||||
@@ -306,14 +316,14 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
|
||||
# Check if folder exists and is not empty
|
||||
if not os.path.exists(config_folder) or not any(os.listdir(config_folder)):
|
||||
dialog(stdscr, "", " No config files found. Export a config first.")
|
||||
dialog("", " No config files found. Export a config first.")
|
||||
continue # Return to menu
|
||||
|
||||
file_list = [f for f in os.listdir(config_folder) if os.path.isfile(os.path.join(config_folder, f))]
|
||||
|
||||
# Ensure file_list is not empty before proceeding
|
||||
if not file_list:
|
||||
dialog(stdscr, "", " No config files found. Export a config first.")
|
||||
dialog("", " No config files found. Export a config first.")
|
||||
continue
|
||||
|
||||
filename = get_list_input("Choose a config file", None, file_list)
|
||||
@@ -327,7 +337,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
|
||||
elif selected_option == "Config URL":
|
||||
current_value = interface.localNode.getURL()
|
||||
new_value = get_text_input(f"Config URL is currently: {current_value}")
|
||||
new_value = get_text_input(f"Config URL is currently: {current_value}", None, str)
|
||||
if new_value is not None:
|
||||
current_value = new_value
|
||||
overwrite = get_list_input(f"Are you sure you want to load this config?", None, ["Yes", "No"])
|
||||
@@ -395,7 +405,9 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
|
||||
if selected_option in ["longName", "shortName", "isLicensed"]:
|
||||
if selected_option in ["longName", "shortName"]:
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, None
|
||||
)
|
||||
new_value = current_value if new_value is None else new_value
|
||||
menu_state.current_menu[selected_option] = (field, new_value)
|
||||
|
||||
@@ -414,7 +426,9 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif selected_option in ["latitude", "longitude", "altitude"]:
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, float
|
||||
)
|
||||
new_value = current_value if new_value is None else new_value
|
||||
menu_state.current_menu[selected_option] = (field, new_value)
|
||||
|
||||
@@ -453,17 +467,26 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif field.type == 13: # Field type 13 corresponds to UINT32
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
input_type = get_input_type_for_field(field)
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
|
||||
)
|
||||
new_value = current_value if new_value is None else int(new_value)
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif field.type == 2: # Field type 13 corresponds to INT64
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
input_type = get_input_type_for_field(field)
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
|
||||
)
|
||||
new_value = current_value if new_value is None else float(new_value)
|
||||
menu_state.start_index.pop()
|
||||
|
||||
else: # Handle other field types
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
input_type = get_input_type_for_field(field)
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
|
||||
)
|
||||
new_value = current_value if new_value is None else new_value
|
||||
menu_state.start_index.pop()
|
||||
|
||||
|
||||
@@ -7,6 +7,11 @@ from typing import Dict
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
|
||||
|
||||
# To test writting to a non-writable directory, you can uncomment the following lines:
|
||||
# mkdir /tmp/test_nonwritable
|
||||
# chmod -w /tmp/test_nonwritable
|
||||
# parent_dir = "/tmp/test_nonwritable"
|
||||
|
||||
|
||||
def _is_writable_dir(path: str) -> bool:
|
||||
"""
|
||||
|
||||
@@ -2,14 +2,13 @@ import curses
|
||||
from contact.ui.colors import get_color
|
||||
|
||||
|
||||
def dialog(stdscr: curses.window, title: str, message: str) -> None:
|
||||
height, width = stdscr.getmaxyx()
|
||||
def dialog(title: str, message: str) -> None:
|
||||
|
||||
height, width = curses.LINES, curses.COLS
|
||||
|
||||
# Calculate dialog dimensions
|
||||
max_line_lengh = 0
|
||||
message_lines = message.splitlines()
|
||||
for l in message_lines:
|
||||
max_line_length = max(len(l), max_line_lengh)
|
||||
max_line_length = max(len(l) for l in message_lines)
|
||||
dialog_width = max(len(title) + 4, max_line_length + 4)
|
||||
dialog_height = len(message_lines) + 4
|
||||
x = (width - dialog_width) // 2
|
||||
@@ -24,12 +23,19 @@ def dialog(stdscr: curses.window, title: str, message: str) -> None:
|
||||
# Add title
|
||||
win.addstr(0, 2, title, get_color("settings_default"))
|
||||
|
||||
# Add message
|
||||
for i, l in enumerate(message_lines):
|
||||
win.addstr(2 + i, 2, l, get_color("settings_default"))
|
||||
# Add message (centered)
|
||||
for i, line in enumerate(message_lines):
|
||||
msg_x = (dialog_width - len(line)) // 2
|
||||
win.addstr(2 + i, msg_x, line, get_color("settings_default"))
|
||||
|
||||
# Add button
|
||||
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", get_color("settings_default", reverse=True))
|
||||
# Add centered OK button
|
||||
ok_text = " Ok "
|
||||
win.addstr(
|
||||
dialog_height - 2,
|
||||
(dialog_width - len(ok_text)) // 2,
|
||||
ok_text,
|
||||
get_color("settings_default", reverse=True),
|
||||
)
|
||||
|
||||
# Refresh dialog window
|
||||
win.refresh()
|
||||
@@ -37,8 +43,7 @@ def dialog(stdscr: curses.window, title: str, message: str) -> None:
|
||||
# Get user input
|
||||
while True:
|
||||
char = win.getch()
|
||||
# Close dialog with enter, space, or esc
|
||||
if char in (curses.KEY_ENTER, 10, 13, 32, 27):
|
||||
if char in (curses.KEY_ENTER, 10, 13, 32, 27): # Enter, space, or Esc
|
||||
win.erase()
|
||||
win.refresh()
|
||||
return
|
||||
|
||||
@@ -6,10 +6,43 @@ from typing import Any, Optional, List
|
||||
|
||||
from contact.ui.colors import get_color
|
||||
from contact.ui.nav_utils import move_highlight, draw_arrows, wrap_text
|
||||
from contact.ui.dialog import dialog
|
||||
from contact.utilities.validation_rules import get_validation_for
|
||||
|
||||
|
||||
def get_text_input(prompt: str) -> Optional[str]:
|
||||
def invalid_input(window: curses.window, message: str, redraw_func: Optional[callable] = None) -> None:
|
||||
"""Displays an invalid input message in the given window and redraws if needed."""
|
||||
cursor_y, cursor_x = window.getyx()
|
||||
curses.curs_set(0)
|
||||
dialog("Invalid Input", message)
|
||||
if redraw_func:
|
||||
redraw_func() # Redraw the original window content that got obscured
|
||||
else:
|
||||
window.refresh()
|
||||
window.move(cursor_y, cursor_x)
|
||||
curses.curs_set(1)
|
||||
|
||||
|
||||
def get_text_input(prompt: str, selected_config: str, input_type: str) -> Optional[str]:
|
||||
"""Handles user input with wrapped text for long prompts."""
|
||||
|
||||
def redraw_input_win():
|
||||
"""Redraw the input window with the current prompt and user input."""
|
||||
input_win.erase()
|
||||
input_win.border()
|
||||
row = 1
|
||||
for line in wrapped_prompt:
|
||||
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
|
||||
row += 1
|
||||
if row >= height - 3:
|
||||
break
|
||||
input_win.addstr(row + 1, margin, prompt_text, get_color("settings_default"))
|
||||
input_win.addstr(row + 1, col_start, user_input[:first_line_width], get_color("settings_default"))
|
||||
for i, line in enumerate(wrap_text(user_input[first_line_width:], wrap_width=input_width)):
|
||||
if row + 2 + i < height - 1:
|
||||
input_win.addstr(row + 2 + i, margin, line[:input_width], get_color("settings_default"))
|
||||
input_win.refresh()
|
||||
|
||||
height = 8
|
||||
width = 80
|
||||
margin = 2 # Left and right margin
|
||||
@@ -27,6 +60,7 @@ def get_text_input(prompt: str) -> Optional[str]:
|
||||
# Wrap the prompt text
|
||||
wrapped_prompt = wrap_text(prompt, wrap_width=input_width)
|
||||
row = 1
|
||||
|
||||
for line in wrapped_prompt:
|
||||
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
|
||||
row += 1
|
||||
@@ -39,34 +73,115 @@ def get_text_input(prompt: str) -> Optional[str]:
|
||||
input_win.refresh()
|
||||
curses.curs_set(1)
|
||||
|
||||
max_length = 4 if "shortName" in prompt else None
|
||||
user_input = ""
|
||||
min_value = 0
|
||||
max_value = 4294967295
|
||||
min_length = 0
|
||||
max_length = None
|
||||
|
||||
# Start user input after the prompt text
|
||||
if selected_config is not None:
|
||||
validation = get_validation_for(selected_config) or {}
|
||||
min_value = validation.get("min_value", 0)
|
||||
max_value = validation.get("max_value", 4294967295)
|
||||
min_length = validation.get("min_length", 0)
|
||||
max_length = validation.get("max_length")
|
||||
|
||||
user_input = ""
|
||||
col_start = margin + len(prompt_text)
|
||||
first_line_width = input_width - len(prompt_text) # Available space for first line
|
||||
first_line_width = input_width - len(prompt_text)
|
||||
|
||||
while True:
|
||||
key = input_win.get_wch() # Waits for user input
|
||||
key = input_win.get_wch()
|
||||
|
||||
if key == chr(27) or key == curses.KEY_LEFT: # ESC or Left Arrow
|
||||
if key == chr(27) or key == curses.KEY_LEFT:
|
||||
input_win.erase()
|
||||
input_win.refresh()
|
||||
curses.curs_set(0)
|
||||
return None # Exit without saving
|
||||
return None
|
||||
|
||||
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)): # Enter key
|
||||
break
|
||||
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
|
||||
if not user_input.strip():
|
||||
invalid_input(input_win, "Value cannot be empty.", redraw_func=redraw_input_win)
|
||||
continue
|
||||
|
||||
length = len(user_input)
|
||||
if min_length == max_length and max_length is not None:
|
||||
if length != min_length:
|
||||
invalid_input(
|
||||
input_win, f"Value must be exactly {min_length} characters long.", redraw_func=redraw_input_win
|
||||
)
|
||||
continue
|
||||
else:
|
||||
if length < min_length:
|
||||
invalid_input(
|
||||
input_win,
|
||||
f"Value must be at least {min_length} characters long.",
|
||||
redraw_func=redraw_input_win,
|
||||
)
|
||||
continue
|
||||
if max_length is not None and length > max_length:
|
||||
invalid_input(
|
||||
input_win,
|
||||
f"Value must be no more than {max_length} characters long.",
|
||||
redraw_func=redraw_input_win,
|
||||
)
|
||||
continue
|
||||
|
||||
if input_type is int:
|
||||
if not user_input.isdigit():
|
||||
invalid_input(input_win, "Only numeric digits (0–9) allowed.", redraw_func=redraw_input_win)
|
||||
continue
|
||||
|
||||
int_val = int(user_input)
|
||||
if not (min_value <= int_val <= max_value):
|
||||
invalid_input(
|
||||
input_win, f"Enter a number between {min_value} and {max_value}.", redraw_func=redraw_input_win
|
||||
)
|
||||
continue
|
||||
|
||||
curses.curs_set(0)
|
||||
return int_val
|
||||
|
||||
elif input_type is float:
|
||||
try:
|
||||
float_val = float(user_input)
|
||||
if not (min_value <= float_val <= max_value):
|
||||
invalid_input(
|
||||
input_win,
|
||||
f"Enter a number between {min_value} and {max_value}.",
|
||||
redraw_func=redraw_input_win,
|
||||
)
|
||||
continue
|
||||
except ValueError:
|
||||
invalid_input(input_win, "Must be a valid floating point number.", redraw_func=redraw_input_win)
|
||||
continue
|
||||
else:
|
||||
curses.curs_set(0)
|
||||
return float_val
|
||||
|
||||
else:
|
||||
break
|
||||
|
||||
elif key in (curses.KEY_BACKSPACE, chr(127)): # Handle Backspace
|
||||
if user_input:
|
||||
user_input = user_input[:-1] # Remove last character
|
||||
|
||||
elif max_length is None or len(user_input) < max_length: # Enforce max length
|
||||
if isinstance(key, str):
|
||||
user_input += key
|
||||
else:
|
||||
user_input += chr(key)
|
||||
elif max_length is None or len(user_input) < max_length:
|
||||
try:
|
||||
char = chr(key) if not isinstance(key, str) else key
|
||||
if input_type is int:
|
||||
if char.isdigit() or (char == "-" and len(user_input) == 0):
|
||||
user_input += char
|
||||
elif input_type is float:
|
||||
if (
|
||||
char.isdigit()
|
||||
or (char == "." and "." not in user_input)
|
||||
or (char == "-" and len(user_input) == 0)
|
||||
):
|
||||
user_input += char
|
||||
else:
|
||||
user_input += char
|
||||
except ValueError:
|
||||
pass # Ignore invalid input
|
||||
|
||||
# First line must be manually handled before using wrap_text()
|
||||
first_line = user_input[:first_line_width] # Cut to max first line width
|
||||
@@ -95,10 +210,12 @@ def get_text_input(prompt: str) -> Optional[str]:
|
||||
curses.curs_set(0)
|
||||
input_win.erase()
|
||||
input_win.refresh()
|
||||
return user_input
|
||||
return user_input.strip()
|
||||
|
||||
|
||||
def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
"""Handles user input for editing up to 3 Admin Keys in Base64 format."""
|
||||
|
||||
def to_base64(byte_strings):
|
||||
"""Convert byte values to Base64-encoded strings."""
|
||||
return [base64.b64encode(b).decode() for b in byte_strings]
|
||||
@@ -130,7 +247,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
# Editable list of values (max 3 values)
|
||||
user_values = cvalue[:3] + [""] * (3 - len(cvalue)) # Ensure always 3 fields
|
||||
cursor_pos = 0 # Track which value is being edited
|
||||
error_message = ""
|
||||
invalid_input = ""
|
||||
|
||||
while True:
|
||||
repeated_win.erase()
|
||||
@@ -150,8 +267,8 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
|
||||
|
||||
# Show error message if needed
|
||||
if error_message:
|
||||
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
|
||||
if invalid_input:
|
||||
repeated_win.addstr(7, 2, invalid_input, get_color("settings_default", bold=True))
|
||||
|
||||
repeated_win.refresh()
|
||||
key = repeated_win.getch()
|
||||
@@ -169,7 +286,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
curses.curs_set(0)
|
||||
return user_values # Return the edited Base64 values
|
||||
else:
|
||||
error_message = "Error: Each key must be valid Base64 and 32 bytes long!"
|
||||
invalid_input = "Error: Each key must be valid Base64 and 32 bytes long!"
|
||||
elif key == curses.KEY_UP: # Move cursor up
|
||||
cursor_pos = (cursor_pos - 1) % len(user_values)
|
||||
elif key == curses.KEY_DOWN: # Move cursor down
|
||||
@@ -180,7 +297,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
else:
|
||||
try:
|
||||
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
|
||||
error_message = "" # Clear error if user starts fixing input
|
||||
invalid_input = "" # Clear error if user starts fixing input
|
||||
except ValueError:
|
||||
pass # Ignore invalid character inputs
|
||||
|
||||
@@ -202,7 +319,7 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
|
||||
# Editable list of values (max 3 values)
|
||||
user_values = current_value[:3]
|
||||
cursor_pos = 0 # Track which value is being edited
|
||||
error_message = ""
|
||||
invalid_input = ""
|
||||
|
||||
while True:
|
||||
repeated_win.erase()
|
||||
@@ -222,8 +339,8 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
|
||||
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
|
||||
|
||||
# Show error message if needed
|
||||
if error_message:
|
||||
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
|
||||
if invalid_input:
|
||||
repeated_win.addstr(7, 2, invalid_input, get_color("settings_default", bold=True))
|
||||
|
||||
repeated_win.refresh()
|
||||
key = repeated_win.getch()
|
||||
@@ -249,7 +366,7 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
|
||||
else:
|
||||
try:
|
||||
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
|
||||
error_message = "" # Clear error if user starts fixing input
|
||||
invalid_input = "" # Clear error if user starts fixing input
|
||||
except ValueError:
|
||||
pass # Ignore invalid character inputs
|
||||
|
||||
|
||||
23
contact/utilities/validation_rules.py
Normal file
23
contact/utilities/validation_rules.py
Normal file
@@ -0,0 +1,23 @@
|
||||
validation_rules = {
|
||||
"shortName": {"max_length": 4},
|
||||
"longName": {"max_length": 32},
|
||||
"fixed_pin": {"min_length": 6, "max_length": 6},
|
||||
"position_flags": {"max_length": 3},
|
||||
"enabled_protocols": {"max_value": 2},
|
||||
"hop_limit": {"max_value": 7},
|
||||
"latitude": {"min_value": -90, "max_value": 90},
|
||||
"longitude": {"min_value": -180, "max_value": 180},
|
||||
"altitude": {"min_value": -4294967295, "max_value": 4294967295},
|
||||
"red": {"max_value": 255},
|
||||
"green": {"max_value": 255},
|
||||
"blue": {"max_value": 255},
|
||||
"current": {"max_value": 255},
|
||||
"position_precision": {"max_value": 32},
|
||||
}
|
||||
|
||||
|
||||
def get_validation_for(key: str) -> dict:
|
||||
for rule_key, config in validation_rules.items():
|
||||
if rule_key in key:
|
||||
return config
|
||||
return {}
|
||||
Reference in New Issue
Block a user