Compare commits

...

29 Commits

Author SHA1 Message Date
pdxlocations
cf4137347f refactor 2025-07-26 21:18:30 -07:00
pdxlocations
12f81f6af6 tweaks 2025-07-26 20:12:50 -07:00
pdxlocations
684d298fac check for selected_config 2025-07-26 20:01:59 -07:00
pdxlocations
4179a3f8d0 redraw input 2025-07-26 16:39:24 -07:00
pdxlocations
7a61808f47 fix positions 2025-07-26 00:55:41 -07:00
pdxlocations
a8680ac0ed changes 2025-07-26 00:31:49 -07:00
pdxlocations
818575939a automatic types 2025-07-25 19:02:22 -07:00
pdxlocations
acaad849b0 add rules 2025-07-25 18:40:37 -07:00
pdxlocations
0b25dda4af validation framework 2025-07-25 18:32:37 -07:00
pdxlocations
18329f0128 init 2025-07-25 00:18:51 -07:00
pdxlocations
4378f3045c unused argument 2025-07-24 18:02:44 -07:00
pdxlocations
a451d1d7d6 note to future me 2025-07-21 23:18:11 -07:00
pdxlocations
fe1f027219 Merge pull request #209 from pdxlocations/check-db-fields-for-null
Check for NULLS in DB
2025-07-21 10:57:18 -07:00
pdxlocations
43435cbe04 replace \x00 in messages 2025-07-21 10:54:19 -07:00
pdxlocations
fe98075582 bump version 2025-07-21 00:04:00 -07:00
pdxlocations
8716ea6fe1 dont write to the log before config 2025-07-20 23:46:24 -07:00
pdxlocations
a8bdcbb7e6 Merge pull request #208 from pdxlocations/config
fallback to user if install dir not writable
2025-07-17 23:00:13 -07:00
pdxlocations
02742b27f3 fallback to user if install dir not writable 2025-07-17 22:59:35 -07:00
pdxlocations
ae028032a0 Merge pull request #207 from pdxlocations/errors
show connection errors in console
2025-07-17 00:12:12 -07:00
pdxlocations
30402f4906 show connection errors in console 2025-07-17 00:10:17 -07:00
pdxlocations
324e0b03e7 Merge pull request #206 from pdxlocations/notifications
maybe fix aplay
2025-07-16 19:43:37 -07:00
pdxlocations
056db12911 maybe fix aplay 2025-07-16 18:46:51 -07:00
pdxlocations
685a2d4bf8 bump version 2025-07-14 08:15:16 -07:00
pdxlocations
6ed0cc8c9f Merge pull request #199 from rfschmid/add-traceroute-sent-message-to-history
Add traceroute sent message to history
2025-07-03 10:46:07 -07:00
Russell Schmidt
fc208a9258 Add "Traceroute Sent" to message history 2025-07-03 12:43:18 -05:00
Russell Schmidt
eaf9381bca Refactor message saving
Add common function for saving a message to history, removing some
duplicate code and making traceroutes add timestamps like other messages
do.
2025-07-03 12:43:18 -05:00
pdxlocations
367af9044c Merge pull request #198 from rfschmid/add-node-name-to-traceroute-confirm-dialog
Add node name to traceroute confirm dialog
2025-07-03 10:32:41 -07:00
Russell Schmidt
d8183d9009 Make capitalization consistent 2025-07-03 12:19:24 -05:00
Russell Schmidt
3fb1335be3 Add node name to traceroute confirm dialog 2025-07-03 12:10:56 -05:00
12 changed files with 397 additions and 185 deletions

View File

@@ -66,12 +66,8 @@ def prompt_region_if_unset(args: object) -> None:
interface_state.interface = initialize_interface(args)
def initialize_globals(args: object) -> None:
def initialize_globals() -> None:
"""Initializes interface and shared globals."""
interface_state.interface = initialize_interface(args)
if interface_state.interface.localNode.localConfig.lora.region == 0:
prompt_region_if_unset(args)
interface_state.myNodeNum = get_nodeNum()
ui_state.channel_list = get_channels()
@@ -84,55 +80,63 @@ def initialize_globals(args: object) -> None:
def main(stdscr: curses.window) -> None:
"""Main entry point for the curses UI."""
output_capture = io.StringIO()
try:
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
setup_colors()
draw_splash(stdscr)
setup_colors()
draw_splash(stdscr)
args = setup_parser().parse_args()
args = setup_parser().parse_args()
if getattr(args, "settings", False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return
if getattr(args, "settings", False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return
logging.info("Initializing interface...")
with app_state.lock:
initialize_globals(args)
logging.info("Starting main UI")
logging.info("Initializing interface...")
with app_state.lock:
interface_state.interface = initialize_interface(args)
main_ui(stdscr)
if interface_state.interface.localNode.localConfig.lora.region == 0:
prompt_region_if_unset(args)
except Exception as e:
console_output = output_capture.getvalue()
logging.error("Uncaught exception: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
logging.error("Console output:\n%s", console_output)
initialize_globals()
logging.info("Starting main UI")
try:
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
main_ui(stdscr)
except Exception:
console_output = output_capture.getvalue()
logging.error("Uncaught exception inside main_ui")
logging.error("Traceback:\n%s", traceback.format_exc())
logging.error("Console output:\n%s", console_output)
return
except Exception:
raise
def start() -> None:
"""Launch curses wrapper and redirect logs to file."""
"""Entry point for the application."""
if "--help" in sys.argv or "-h" in sys.argv:
setup_parser().print_help()
sys.exit(0)
with open(config.log_file_path, "a", buffering=1) as log_f:
sys.stdout = log_f
sys.stderr = log_f
with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
try:
curses.wrapper(main)
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C")
sys.exit(0)
except Exception as e:
logging.error("Fatal error: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
sys.exit(1)
try:
curses.wrapper(main)
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C")
sys.exit(0)
except Exception as e:
logging.critical("Fatal error", exc_info=True)
try:
curses.endwin()
except Exception:
pass
print("Fatal error:", e)
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":

View File

@@ -3,11 +3,12 @@ import os
import platform
import shutil
import subprocess
import time
from datetime import datetime
from typing import Any, Dict
from contact.utilities.utils import refresh_node_list
from contact.utilities.utils import (
refresh_node_list,
add_new_message,
)
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
@@ -29,40 +30,43 @@ from contact.utilities.singleton import ui_state, interface_state, app_state
def play_sound():
try:
system = platform.system()
sound_path = ""
executable = ""
sound_path = None
executable = None
if system == "Darwin": #macOS
if system == "Darwin": # macOS
sound_path = "/System/Library/Sounds/Ping.aiff"
executable = "afplay"
elif system == "Linux":
sound_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
if(shutil.which("paplay")):
executable = "paplay"
else:
executable = "aplay"
if executable != "" and sound_path != "":
if os.path.exists(sound_path):
if shutil.which(executable):
subprocess.run([executable, sound_path], check=True,
stdout=subprocess.DEVNULL, stderr = subprocess.DEVNULL)
return
else:
logging.warning("No sound player found (afplay/paplay/aplay)")
elif system == "Linux":
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay"
sound_path = ogg_path
elif shutil.which("ffplay") and os.path.exists(ogg_path):
executable = "ffplay"
sound_path = ogg_path
elif shutil.which("aplay") and os.path.exists(wav_path):
executable = "aplay"
sound_path = wav_path
else:
logging.warning(f"Sound file not found: {sound_path}")
logging.warning("No suitable sound player or sound file found on Linux")
if executable and sound_path:
cmd = [executable, sound_path]
if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
# Final fallback: terminal beep
print("\a")
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
"""
Handles an incoming packet from a Meshtastic interface.
@@ -121,7 +125,9 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
channel_number = ui_state.channel_list.index(packet["from"])
if ui_state.channel_list[channel_number] != ui_state.channel_list[ui_state.selected_channel]:
channel_id = ui_state.channel_list[channel_number]
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
@@ -131,40 +137,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[ui_state.channel_list[channel_number]]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[ui_state.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string} ", message_string)
)
add_new_message(channel_id, f"{config.message_prefix} {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(ui_state.channel_list[channel_number], message_from_id, message_string)
save_message_to_db(channel_id, message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")

View File

@@ -1,4 +1,3 @@
from datetime import datetime
from typing import Any, Dict
import google.protobuf.json_format
@@ -16,6 +15,8 @@ import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
from contact.utilities.utils import add_new_message
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
@@ -146,8 +147,9 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
update_node_info_in_db(packet["from"], chat_archived=False)
channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
if channel_id == ui_state.channel_list[ui_state.selected_channel]:
refresh_messages = True
else:
add_notification(channel_number)
@@ -155,18 +157,14 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string}", msg_str)
)
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(ui_state.channel_list[channel_number], packet["from"], msg_str)
save_message_to_db(channel_id, packet["from"], msg_str)
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
"""
@@ -190,32 +188,7 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
channelIndex=send_on_channel,
)
# Add sent message to the messages dictionary
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Handle timestamp logic
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
ui_state.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
add_new_message(channel_id, config.sent_message_prefix + config.ack_unknown_str + ": ", message)
timestamp = save_message_to_db(channel_id, myid, message)
@@ -230,10 +203,14 @@ def send_traceroute() -> None:
"""
Sends a RouteDiscovery protobuf to the selected node.
"""
channel_id = ui_state.node_list[ui_state.selected_node]
add_new_message(channel_id, f"{config.message_prefix} Sent Traceroute", "")
r = mesh_pb2.RouteDiscovery()
interface_state.interface.sendData(
r,
destinationId=ui_state.node_list[ui_state.selected_node],
destinationId=channel_id,
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=on_response_traceroute,

View File

@@ -335,8 +335,7 @@ def handle_ctrl_t(stdscr: curses.window) -> None:
send_traceroute()
curses.curs_set(0) # Hide cursor
contact.ui.dialog.dialog(
stdscr,
"Traceroute Sent",
f"Traceroute Sent To: {get_name_from_database(ui_state.node_list[ui_state.selected_node])}",
"Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.",
)
curses.curs_set(1) # Show cursor again

View File

@@ -150,6 +150,15 @@ def draw_help_window(
)
def get_input_type_for_field(field) -> type:
if field.type in (field.TYPE_INT32, field.TYPE_UINT32, field.TYPE_INT64):
return int
elif field.type in (field.TYPE_FLOAT, field.TYPE_DOUBLE):
return float
else:
return str
def settings_menu(stdscr: object, interface: object) -> None:
curses.update_lines_cols()
@@ -268,7 +277,8 @@ def settings_menu(stdscr: object, interface: object) -> None:
break
elif selected_option == "Export Config File":
filename = get_text_input("Enter a filename for the config file")
filename = get_text_input("Enter a filename for the config file", None, None)
if not filename:
logging.info("Export aborted: No filename provided.")
menu_state.start_index.pop()
@@ -290,7 +300,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
with open(yaml_file_path, "w", encoding="utf-8") as file:
file.write(config_text)
logging.info(f"Config file saved to {yaml_file_path}")
dialog(stdscr, "Config File Saved:", yaml_file_path)
dialog("Config File Saved:", yaml_file_path)
menu_state.start_index.pop()
continue
except PermissionError:
@@ -306,14 +316,14 @@ def settings_menu(stdscr: object, interface: object) -> None:
# Check if folder exists and is not empty
if not os.path.exists(config_folder) or not any(os.listdir(config_folder)):
dialog(stdscr, "", " No config files found. Export a config first.")
dialog("", " No config files found. Export a config first.")
continue # Return to menu
file_list = [f for f in os.listdir(config_folder) if os.path.isfile(os.path.join(config_folder, f))]
# Ensure file_list is not empty before proceeding
if not file_list:
dialog(stdscr, "", " No config files found. Export a config first.")
dialog("", " No config files found. Export a config first.")
continue
filename = get_list_input("Choose a config file", None, file_list)
@@ -327,7 +337,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
elif selected_option == "Config URL":
current_value = interface.localNode.getURL()
new_value = get_text_input(f"Config URL is currently: {current_value}")
new_value = get_text_input(f"Config URL is currently: {current_value}", None, str)
if new_value is not None:
current_value = new_value
overwrite = get_list_input(f"Are you sure you want to load this config?", None, ["Yes", "No"])
@@ -395,7 +405,9 @@ def settings_menu(stdscr: object, interface: object) -> None:
if selected_option in ["longName", "shortName", "isLicensed"]:
if selected_option in ["longName", "shortName"]:
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, None
)
new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value)
@@ -414,7 +426,9 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
elif selected_option in ["latitude", "longitude", "altitude"]:
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, float
)
new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value)
@@ -453,17 +467,26 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
elif field.type == 13: # Field type 13 corresponds to UINT32
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = current_value if new_value is None else int(new_value)
menu_state.start_index.pop()
elif field.type == 2: # Field type 13 corresponds to INT64
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = current_value if new_value is None else float(new_value)
menu_state.start_index.pop()
else: # Handle other field types
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = current_value if new_value is None else new_value
menu_state.start_index.pop()

View File

@@ -7,10 +7,56 @@ from typing import Dict
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
# Paths
json_file_path = os.path.join(parent_dir, "config.json")
log_file_path = os.path.join(parent_dir, "client.log")
db_file_path = os.path.join(parent_dir, "client.db")
# To test writting to a non-writable directory, you can uncomment the following lines:
# mkdir /tmp/test_nonwritable
# chmod -w /tmp/test_nonwritable
# parent_dir = "/tmp/test_nonwritable"
def _is_writable_dir(path: str) -> bool:
"""
Return True if we can create & delete a temp file in `path`.
"""
if not os.path.isdir(path):
return False
test_path = os.path.join(path, ".perm_test_tmp")
try:
with open(test_path, "w", encoding="utf-8") as _tmp:
_tmp.write("ok")
os.remove(test_path)
return True
except OSError:
return False
def _get_config_root(preferred_dir: str, fallback_name: str = ".contact_client") -> str:
"""
Choose a writable directory for config artifacts.
"""
if _is_writable_dir(preferred_dir):
return preferred_dir
home = os.path.expanduser("~")
fallback_dir = os.path.join(home, fallback_name)
# Ensure the fallback exists.
os.makedirs(fallback_dir, exist_ok=True)
# If *that* still isn't writable, last-ditch: use a system temp dir.
if not _is_writable_dir(fallback_dir):
import tempfile
fallback_dir = tempfile.mkdtemp(prefix="contact_client_")
return fallback_dir
# Pick the root now.
config_root = _get_config_root(parent_dir)
# Paths (derived from the chosen root)
json_file_path = os.path.join(config_root, "config.json")
log_file_path = os.path.join(config_root, "client.log")
db_file_path = os.path.join(config_root, "client.db")
def format_json_single_line_arrays(data: Dict[str, object], indent: int = 4) -> str:

View File

@@ -2,14 +2,13 @@ import curses
from contact.ui.colors import get_color
def dialog(stdscr: curses.window, title: str, message: str) -> None:
height, width = stdscr.getmaxyx()
def dialog(title: str, message: str) -> None:
height, width = curses.LINES, curses.COLS
# Calculate dialog dimensions
max_line_lengh = 0
message_lines = message.splitlines()
for l in message_lines:
max_line_length = max(len(l), max_line_lengh)
max_line_length = max(len(l) for l in message_lines)
dialog_width = max(len(title) + 4, max_line_length + 4)
dialog_height = len(message_lines) + 4
x = (width - dialog_width) // 2
@@ -24,12 +23,19 @@ def dialog(stdscr: curses.window, title: str, message: str) -> None:
# Add title
win.addstr(0, 2, title, get_color("settings_default"))
# Add message
for i, l in enumerate(message_lines):
win.addstr(2 + i, 2, l, get_color("settings_default"))
# Add message (centered)
for i, line in enumerate(message_lines):
msg_x = (dialog_width - len(line)) // 2
win.addstr(2 + i, msg_x, line, get_color("settings_default"))
# Add button
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", get_color("settings_default", reverse=True))
# Add centered OK button
ok_text = " Ok "
win.addstr(
dialog_height - 2,
(dialog_width - len(ok_text)) // 2,
ok_text,
get_color("settings_default", reverse=True),
)
# Refresh dialog window
win.refresh()
@@ -37,8 +43,7 @@ def dialog(stdscr: curses.window, title: str, message: str) -> None:
# Get user input
while True:
char = win.getch()
# Close dialog with enter, space, or esc
if char in (curses.KEY_ENTER, 10, 13, 32, 27):
if char in (curses.KEY_ENTER, 10, 13, 32, 27): # Enter, space, or Esc
win.erase()
win.refresh()
return

View File

@@ -116,7 +116,14 @@ def load_messages_from_db() -> None:
# Add messages to ui_state.all_messages grouped by hourly timestamp
hourly_messages = {}
for user_id, message, timestamp, ack_type in db_messages:
for row in db_messages:
user_id, message, timestamp, ack_type = row
# Only ack_type is allowed to be None
if user_id is None or message is None or timestamp is None:
logging.warning(f"Skipping row with NULL required field(s): {row}")
continue
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
if hour not in hourly_messages:
hourly_messages[hour] = []
@@ -130,11 +137,13 @@ def load_messages_from_db() -> None:
ack_str = config.nak_str
if user_id == str(interface_state.myNodeNum):
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
sanitized_message = message.replace("\x00", "")
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", sanitized_message)
else:
sanitized_message = message.replace("\x00", "")
formatted_message = (
f"{config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ",
message,
sanitized_message,
)
hourly_messages[hour].append(formatted_message)

View File

@@ -6,10 +6,43 @@ from typing import Any, Optional, List
from contact.ui.colors import get_color
from contact.ui.nav_utils import move_highlight, draw_arrows, wrap_text
from contact.ui.dialog import dialog
from contact.utilities.validation_rules import get_validation_for
def get_text_input(prompt: str) -> Optional[str]:
def invalid_input(window: curses.window, message: str, redraw_func: Optional[callable] = None) -> None:
"""Displays an invalid input message in the given window and redraws if needed."""
cursor_y, cursor_x = window.getyx()
curses.curs_set(0)
dialog("Invalid Input", message)
if redraw_func:
redraw_func() # Redraw the original window content that got obscured
else:
window.refresh()
window.move(cursor_y, cursor_x)
curses.curs_set(1)
def get_text_input(prompt: str, selected_config: str, input_type: str) -> Optional[str]:
"""Handles user input with wrapped text for long prompts."""
def redraw_input_win():
"""Redraw the input window with the current prompt and user input."""
input_win.erase()
input_win.border()
row = 1
for line in wrapped_prompt:
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
row += 1
if row >= height - 3:
break
input_win.addstr(row + 1, margin, prompt_text, get_color("settings_default"))
input_win.addstr(row + 1, col_start, user_input[:first_line_width], get_color("settings_default"))
for i, line in enumerate(wrap_text(user_input[first_line_width:], wrap_width=input_width)):
if row + 2 + i < height - 1:
input_win.addstr(row + 2 + i, margin, line[:input_width], get_color("settings_default"))
input_win.refresh()
height = 8
width = 80
margin = 2 # Left and right margin
@@ -27,6 +60,7 @@ def get_text_input(prompt: str) -> Optional[str]:
# Wrap the prompt text
wrapped_prompt = wrap_text(prompt, wrap_width=input_width)
row = 1
for line in wrapped_prompt:
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
row += 1
@@ -39,34 +73,115 @@ def get_text_input(prompt: str) -> Optional[str]:
input_win.refresh()
curses.curs_set(1)
max_length = 4 if "shortName" in prompt else None
user_input = ""
min_value = 0
max_value = 4294967295
min_length = 0
max_length = None
# Start user input after the prompt text
if selected_config is not None:
validation = get_validation_for(selected_config) or {}
min_value = validation.get("min_value", 0)
max_value = validation.get("max_value", 4294967295)
min_length = validation.get("min_length", 0)
max_length = validation.get("max_length")
user_input = ""
col_start = margin + len(prompt_text)
first_line_width = input_width - len(prompt_text) # Available space for first line
first_line_width = input_width - len(prompt_text)
while True:
key = input_win.get_wch() # Waits for user input
key = input_win.get_wch()
if key == chr(27) or key == curses.KEY_LEFT: # ESC or Left Arrow
if key == chr(27) or key == curses.KEY_LEFT:
input_win.erase()
input_win.refresh()
curses.curs_set(0)
return None # Exit without saving
return None
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)): # Enter key
break
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
if not user_input.strip():
invalid_input(input_win, "Value cannot be empty.", redraw_func=redraw_input_win)
continue
length = len(user_input)
if min_length == max_length and max_length is not None:
if length != min_length:
invalid_input(
input_win, f"Value must be exactly {min_length} characters long.", redraw_func=redraw_input_win
)
continue
else:
if length < min_length:
invalid_input(
input_win,
f"Value must be at least {min_length} characters long.",
redraw_func=redraw_input_win,
)
continue
if max_length is not None and length > max_length:
invalid_input(
input_win,
f"Value must be no more than {max_length} characters long.",
redraw_func=redraw_input_win,
)
continue
if input_type is int:
if not user_input.isdigit():
invalid_input(input_win, "Only numeric digits (09) allowed.", redraw_func=redraw_input_win)
continue
int_val = int(user_input)
if not (min_value <= int_val <= max_value):
invalid_input(
input_win, f"Enter a number between {min_value} and {max_value}.", redraw_func=redraw_input_win
)
continue
curses.curs_set(0)
return int_val
elif input_type is float:
try:
float_val = float(user_input)
if not (min_value <= float_val <= max_value):
invalid_input(
input_win,
f"Enter a number between {min_value} and {max_value}.",
redraw_func=redraw_input_win,
)
continue
except ValueError:
invalid_input(input_win, "Must be a valid floating point number.", redraw_func=redraw_input_win)
continue
else:
curses.curs_set(0)
return float_val
else:
break
elif key in (curses.KEY_BACKSPACE, chr(127)): # Handle Backspace
if user_input:
user_input = user_input[:-1] # Remove last character
elif max_length is None or len(user_input) < max_length: # Enforce max length
if isinstance(key, str):
user_input += key
else:
user_input += chr(key)
elif max_length is None or len(user_input) < max_length:
try:
char = chr(key) if not isinstance(key, str) else key
if input_type is int:
if char.isdigit() or (char == "-" and len(user_input) == 0):
user_input += char
elif input_type is float:
if (
char.isdigit()
or (char == "." and "." not in user_input)
or (char == "-" and len(user_input) == 0)
):
user_input += char
else:
user_input += char
except ValueError:
pass # Ignore invalid input
# First line must be manually handled before using wrap_text()
first_line = user_input[:first_line_width] # Cut to max first line width
@@ -95,10 +210,12 @@ def get_text_input(prompt: str) -> Optional[str]:
curses.curs_set(0)
input_win.erase()
input_win.refresh()
return user_input
return user_input.strip()
def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
"""Handles user input for editing up to 3 Admin Keys in Base64 format."""
def to_base64(byte_strings):
"""Convert byte values to Base64-encoded strings."""
return [base64.b64encode(b).decode() for b in byte_strings]
@@ -130,7 +247,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
# Editable list of values (max 3 values)
user_values = cvalue[:3] + [""] * (3 - len(cvalue)) # Ensure always 3 fields
cursor_pos = 0 # Track which value is being edited
error_message = ""
invalid_input = ""
while True:
repeated_win.erase()
@@ -150,8 +267,8 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
# Show error message if needed
if error_message:
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
if invalid_input:
repeated_win.addstr(7, 2, invalid_input, get_color("settings_default", bold=True))
repeated_win.refresh()
key = repeated_win.getch()
@@ -169,7 +286,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
curses.curs_set(0)
return user_values # Return the edited Base64 values
else:
error_message = "Error: Each key must be valid Base64 and 32 bytes long!"
invalid_input = "Error: Each key must be valid Base64 and 32 bytes long!"
elif key == curses.KEY_UP: # Move cursor up
cursor_pos = (cursor_pos - 1) % len(user_values)
elif key == curses.KEY_DOWN: # Move cursor down
@@ -180,7 +297,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
else:
try:
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
error_message = "" # Clear error if user starts fixing input
invalid_input = "" # Clear error if user starts fixing input
except ValueError:
pass # Ignore invalid character inputs
@@ -202,7 +319,7 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
# Editable list of values (max 3 values)
user_values = current_value[:3]
cursor_pos = 0 # Track which value is being edited
error_message = ""
invalid_input = ""
while True:
repeated_win.erase()
@@ -222,8 +339,8 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
# Show error message if needed
if error_message:
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
if invalid_input:
repeated_win.addstr(7, 2, invalid_input, get_color("settings_default", bold=True))
repeated_win.refresh()
key = repeated_win.getch()
@@ -249,7 +366,7 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
else:
try:
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
error_message = "" # Clear error if user starts fixing input
invalid_input = "" # Clear error if user starts fixing input
except ValueError:
pass # Ignore invalid character inputs

View File

@@ -1,4 +1,5 @@
import datetime
import time
from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config
@@ -134,3 +135,31 @@ def get_time_ago(timestamp):
if unit != "s":
return f"{value} {unit} ago"
return "now"
def add_new_message(channel_id, prefix, message):
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
# Add the message
ui_state.all_messages[channel_id].append((prefix,message))

View File

@@ -0,0 +1,23 @@
validation_rules = {
"shortName": {"max_length": 4},
"longName": {"max_length": 32},
"fixed_pin": {"min_length": 6, "max_length": 6},
"position_flags": {"max_length": 3},
"enabled_protocols": {"max_value": 2},
"hop_limit": {"max_value": 7},
"latitude": {"min_value": -90, "max_value": 90},
"longitude": {"min_value": -180, "max_value": 180},
"altitude": {"min_value": -4294967295, "max_value": 4294967295},
"red": {"max_value": 255},
"green": {"max_value": 255},
"blue": {"max_value": 255},
"current": {"max_value": 255},
"position_precision": {"max_value": 32},
}
def get_validation_for(key: str) -> dict:
for rule_key, config in validation_rules.items():
if rule_key in key:
return config
return {}

View File

@@ -1,6 +1,6 @@
[project]
name = "contact"
version = "1.3.14"
version = "1.3.16"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}