Compare commits

..

13 Commits

Author SHA1 Message Date
pdxlocations
74d19c5ca7 clean-up 2025-05-18 12:17:27 -07:00
pdxlocations
33f7db4cb9 Merge branch 'main' into refactor-chat-ui 2025-05-18 08:34:01 -07:00
pdxlocations
06d151e991 hack fix 2025-05-18 08:21:53 -07:00
pdxlocations
50efa50923 working changes 2025-05-17 23:27:19 -07:00
pdxlocations
cef86ab185 I think it works! 2025-05-17 23:14:37 -07:00
pdxlocations
3d305a22b9 closer changes 2025-05-17 22:33:44 -07:00
pdxlocations
6d45788a5b mostly working changes 2025-04-28 22:20:00 -07:00
pdxlocations
e0e09aeccf so close 2025-04-20 22:04:41 -07:00
pdxlocations
bfb7ed0278 more almost working changes 2025-04-20 20:59:10 -07:00
pdxlocations
b7a275f725 Almost working changes 2025-04-19 23:12:59 -07:00
pdxlocations
587d79cb93 move lock to app state 2025-04-19 21:22:47 -07:00
pdxlocations
eb79675be0 convert globals to dataclass 2025-04-19 21:17:19 -07:00
pdxlocations
61e1799199 init 2025-04-19 21:13:46 -07:00
15 changed files with 475 additions and 849 deletions

View File

@@ -53,21 +53,22 @@ logging.basicConfig(
app_state.lock = threading.Lock()
# ------------------------------------------------------------------------------
# Main Program Logic
# ------------------------------------------------------------------------------
def prompt_region_if_unset(args: object) -> None:
"""Prompt user to set region if it is unset."""
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
def initialize_globals() -> None:
def initialize_globals(args) -> None:
"""Initializes interface and shared globals."""
interface_state.interface = initialize_interface(args)
# Prompt for region if unset
if interface_state.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
interface_state.myNodeNum = get_nodeNum()
ui_state.channel_list = get_channels()
@@ -80,63 +81,55 @@ def initialize_globals() -> None:
def main(stdscr: curses.window) -> None:
"""Main entry point for the curses UI."""
output_capture = io.StringIO()
try:
setup_colors()
draw_splash(stdscr)
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
setup_colors()
draw_splash(stdscr)
args = setup_parser().parse_args()
args = setup_parser().parse_args()
if getattr(args, "settings", False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return
if getattr(args, "settings", False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return
logging.info("Initializing interface...")
with app_state.lock:
interface_state.interface = initialize_interface(args)
logging.info("Initializing interface...")
with app_state.lock:
initialize_globals(args)
logging.info("Starting main UI")
if interface_state.interface.localNode.localConfig.lora.region == 0:
prompt_region_if_unset(args)
main_ui(stdscr)
initialize_globals()
logging.info("Starting main UI")
try:
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
main_ui(stdscr)
except Exception:
console_output = output_capture.getvalue()
logging.error("Uncaught exception inside main_ui")
logging.error("Traceback:\n%s", traceback.format_exc())
logging.error("Console output:\n%s", console_output)
return
except Exception:
except Exception as e:
console_output = output_capture.getvalue()
logging.error("Uncaught exception: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
logging.error("Console output:\n%s", console_output)
raise
def start() -> None:
"""Entry point for the application."""
"""Launch curses wrapper and redirect logs to file."""
if "--help" in sys.argv or "-h" in sys.argv:
setup_parser().print_help()
sys.exit(0)
try:
curses.wrapper(main)
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C")
sys.exit(0)
except Exception as e:
logging.critical("Fatal error", exc_info=True)
try:
curses.endwin()
except Exception:
pass
print("Fatal error:", e)
traceback.print_exc()
sys.exit(1)
with open(config.log_file_path, "a", buffering=1) as log_f:
sys.stdout = log_f
sys.stderr = log_f
with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
try:
curses.wrapper(main)
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C")
sys.exit(0)
except Exception as e:
logging.error("Fatal error: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
sys.exit(1)
if __name__ == "__main__":

View File

@@ -1,14 +1,9 @@
import logging
import os
import platform
import shutil
import subprocess
import time
from datetime import datetime
from typing import Any, Dict
from contact.utilities.utils import (
refresh_node_list,
add_new_message,
)
from contact.utilities.utils import refresh_node_list
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
@@ -27,46 +22,6 @@ import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state, app_state
def play_sound():
try:
system = platform.system()
sound_path = None
executable = None
if system == "Darwin": # macOS
sound_path = "/System/Library/Sounds/Ping.aiff"
executable = "afplay"
elif system == "Linux":
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay"
sound_path = ogg_path
elif shutil.which("ffplay") and os.path.exists(ogg_path):
executable = "ffplay"
sound_path = ogg_path
elif shutil.which("aplay") and os.path.exists(wav_path):
executable = "aplay"
sound_path = wav_path
else:
logging.warning("No suitable sound player or sound file found on Linux")
if executable and sound_path:
cmd = [executable, sound_path]
if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
"""
Handles an incoming packet from a Meshtastic interface.
@@ -98,10 +53,6 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
maybe_store_nodeinfo_in_db(packet)
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
if config.notification_sound == "True":
play_sound()
message_bytes = packet["decoded"]["payload"]
message_string = message_bytes.decode("utf-8")
@@ -125,9 +76,7 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
if ui_state.channel_list[channel_number] != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
@@ -137,14 +86,40 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} {message_from_string} ", message_string)
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[ui_state.channel_list[channel_number]]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[ui_state.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string} ", message_string)
)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(channel_id, message_from_id, message_string)
save_message_to_db(ui_state.channel_list[channel_number], message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")

View File

@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Any, Dict
import google.protobuf.json_format
@@ -15,8 +16,6 @@ import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
from contact.utilities.utils import add_new_message
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
@@ -147,9 +146,8 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
update_node_info_in_db(packet["from"], chat_archived=False)
channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if channel_id == ui_state.channel_list[ui_state.selected_channel]:
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
refresh_messages = True
else:
add_notification(channel_number)
@@ -157,14 +155,18 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str)
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string}", msg_str)
)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(ui_state.channel_list[channel_number], packet["from"], msg_str)
save_message_to_db(channel_id, packet["from"], msg_str)
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
"""
@@ -188,7 +190,32 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
channelIndex=send_on_channel,
)
add_new_message(channel_id, config.sent_message_prefix + config.ack_unknown_str + ": ", message)
# Add sent message to the messages dictionary
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Handle timestamp logic
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
ui_state.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
timestamp = save_message_to_db(channel_id, myid, message)
@@ -203,14 +230,10 @@ def send_traceroute() -> None:
"""
Sends a RouteDiscovery protobuf to the selected node.
"""
channel_id = ui_state.node_list[ui_state.selected_node]
add_new_message(channel_id, f"{config.message_prefix} Sent Traceroute", "")
r = mesh_pb2.RouteDiscovery()
interface_state.interface.sendData(
r,
destinationId=channel_id,
destinationId=ui_state.node_list[ui_state.selected_node],
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=on_response_traceroute,

View File

@@ -1,4 +1,5 @@
import curses
import textwrap
import logging
import traceback
from typing import Union
@@ -11,36 +12,28 @@ from contact.utilities.db_handler import get_name_from_database, update_node_inf
from contact.utilities.input_handlers import get_list_input
import contact.ui.default_config as config
import contact.ui.dialog
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines
from contact.utilities.singleton import ui_state, interface_state
def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
"""Handle terminal resize events and redraw the UI accordingly."""
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, function_win, packetlog_win, entry_win
# Calculate window max dimensions
height, width = stdscr.getmaxyx()
# Define window dimensions and positions
channel_width = int(config.channel_list_16ths) * (width // 16)
nodes_width = int(config.node_list_16ths) * (width // 16)
channel_width = 3 * (width // 16)
nodes_width = 5 * (width // 16)
messages_width = width - channel_width - nodes_width
entry_height = 3
function_height = 3
y_pad = entry_height + function_height
packet_log_height = int(height / 3)
if firstrun:
entry_win = curses.newwin(entry_height, width, 0, 0)
channel_win = curses.newwin(height - y_pad, channel_width, entry_height, 0)
messages_win = curses.newwin(height - y_pad, messages_width, entry_height, channel_width)
nodes_win = curses.newwin(height - y_pad, nodes_width, entry_height, channel_width + messages_width)
function_win = curses.newwin(function_height, width, height - function_height, 0)
packetlog_win = curses.newwin(
packet_log_height, messages_width, height - packet_log_height - function_height, channel_width
)
entry_win = curses.newwin(3, width, 0, 0)
channel_win = curses.newwin(height - 6, channel_width, 3, 0)
messages_win = curses.newwin(height - 6, messages_width, 3, channel_width)
nodes_win = curses.newwin(height - 6, nodes_width, 3, channel_width + messages_width)
function_win = curses.newwin(3, width, height - 3, 0)
packetlog_win = curses.newwin(int(height / 3), messages_width, height - int(height / 3) - 3, channel_width)
# Will be resized to what we need when drawn
messages_pad = curses.newpad(1, 1)
@@ -65,19 +58,19 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
entry_win.resize(3, width)
channel_win.resize(height - y_pad, channel_width)
channel_win.resize(height - 6, channel_width)
messages_win.resize(height - y_pad, messages_width)
messages_win.resize(height - 6, messages_width)
messages_win.mvwin(3, channel_width)
nodes_win.resize(height - y_pad, nodes_width)
nodes_win.mvwin(entry_height, channel_width + messages_width)
nodes_win.resize(height - 6, nodes_width)
nodes_win.mvwin(3, channel_width + messages_width)
function_win.resize(3, width)
function_win.mvwin(height - function_height, 0)
function_win.mvwin(height - 3, 0)
packetlog_win.resize(packet_log_height, messages_width)
packetlog_win.mvwin(height - packet_log_height - function_height, channel_width)
packetlog_win.resize(int(height / 3), messages_width)
packetlog_win.mvwin(height - int(height / 3) - 3, channel_width)
# Draw window borders
for win in [channel_win, entry_win, nodes_win, messages_win, function_win]:
@@ -100,7 +93,6 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
def main_ui(stdscr: curses.window) -> None:
"""Main UI loop for the curses interface."""
global input_text
input_text = ""
stdscr.keypad(True)
@@ -108,7 +100,7 @@ def main_ui(stdscr: curses.window) -> None:
handle_resize(stdscr, True)
while True:
draw_text_field(entry_win, f"Input: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
draw_text_field(entry_win, f"Input: {input_text[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window
char = entry_win.get_wch()
@@ -116,59 +108,307 @@ def main_ui(stdscr: curses.window) -> None:
# draw_debug(f"Keypress: {char}")
if char == curses.KEY_UP:
handle_up()
if ui_state.current_window == 0:
scroll_channels(-1)
elif ui_state.current_window == 1:
scroll_messages(-1)
elif ui_state.current_window == 2:
scroll_nodes(-1)
elif char == curses.KEY_DOWN:
handle_down()
if ui_state.current_window == 0:
scroll_channels(1)
elif ui_state.current_window == 1:
scroll_messages(1)
elif ui_state.current_window == 2:
scroll_nodes(1)
elif char == curses.KEY_HOME:
handle_home()
if ui_state.current_window == 0:
select_channel(0)
elif ui_state.current_window == 1:
ui_state.selected_message = 0
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(0)
elif char == curses.KEY_END:
handle_end()
if ui_state.current_window == 0:
select_channel(len(ui_state.channel_list) - 1)
elif ui_state.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
ui_state.selected_message = max(msg_line_count - get_msg_window_lines(messages_win, packetlog_win), 0)
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(len(ui_state.node_list) - 1)
elif char == curses.KEY_PPAGE:
handle_pageup()
if ui_state.current_window == 0:
select_channel(
ui_state.selected_channel - (channel_win.getmaxyx()[0] - 2)
) # select_channel will bounds check for us
elif ui_state.current_window == 1:
ui_state.selected_message = max(
ui_state.selected_message - get_msg_window_lines(messages_win, packetlog_win), 0
)
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(
ui_state.selected_node - (nodes_win.getmaxyx()[0] - 2)
) # select_node will bounds check for us
elif char == curses.KEY_NPAGE:
handle_pagedown()
if ui_state.current_window == 0:
select_channel(
ui_state.selected_channel + (channel_win.getmaxyx()[0] - 2)
) # select_channel will bounds check for us
elif ui_state.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
ui_state.selected_message = min(
ui_state.selected_message + get_msg_window_lines(messages_win, packetlog_win),
msg_line_count - get_msg_window_lines(messages_win, packetlog_win),
)
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(
ui_state.selected_node + (nodes_win.getmaxyx()[0] - 2)
) # select_node will bounds check for us
elif char == curses.KEY_LEFT or char == curses.KEY_RIGHT:
handle_leftright(char)
delta = -1 if char == curses.KEY_LEFT else 1
old_window = ui_state.current_window
ui_state.current_window = (ui_state.current_window + delta) % 3
if old_window == 0:
channel_win.attrset(get_color("window_frame"))
channel_win.box()
channel_win.refresh()
refresh_pad(0)
if old_window == 1:
messages_win.attrset(get_color("window_frame"))
messages_win.box()
messages_win.refresh()
refresh_pad(1)
elif old_window == 2:
draw_function_win()
nodes_win.attrset(get_color("window_frame"))
nodes_win.box()
nodes_win.refresh()
refresh_pad(2)
if ui_state.current_window == 0:
channel_win.attrset(get_color("window_frame_selected"))
channel_win.box()
channel_win.attrset(get_color("window_frame"))
channel_win.refresh()
refresh_pad(0)
elif ui_state.current_window == 1:
messages_win.attrset(get_color("window_frame_selected"))
messages_win.box()
messages_win.attrset(get_color("window_frame"))
messages_win.refresh()
refresh_pad(1)
elif ui_state.current_window == 2:
draw_function_win()
nodes_win.attrset(get_color("window_frame_selected"))
nodes_win.box()
nodes_win.attrset(get_color("window_frame"))
nodes_win.refresh()
refresh_pad(2)
# Check for Esc
elif char == chr(27):
break
# Check for Ctrl + t
elif char == chr(20):
send_traceroute()
curses.curs_set(0) # Hide cursor
contact.ui.dialog.dialog(
stdscr,
"Traceroute Sent",
"Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.",
)
curses.curs_set(1) # Show cursor again
handle_resize(stdscr, False)
elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)):
input_text = handle_enter(input_text)
if ui_state.current_window == 2:
node_list = ui_state.node_list
if node_list[ui_state.selected_node] not in ui_state.channel_list:
ui_state.channel_list.append(node_list[ui_state.selected_node])
if node_list[ui_state.selected_node] not in ui_state.all_messages:
ui_state.all_messages[node_list[ui_state.selected_node]] = []
elif char == chr(20): # Ctrl + t for Traceroute
handle_ctrl_t(stdscr)
ui_state.selected_channel = ui_state.channel_list.index(node_list[ui_state.selected_node])
if is_chat_archived(ui_state.channel_list[ui_state.selected_channel]):
update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=False)
ui_state.selected_node = 0
ui_state.current_window = 0
draw_node_list()
draw_channel_list()
draw_messages_window(True)
elif len(input_text) > 0:
# Enter key pressed, send user input as message
send_message(input_text, channel=ui_state.selected_channel)
draw_messages_window(True)
# Clear entry window and reset input text
input_text = ""
entry_win.erase()
elif char in (curses.KEY_BACKSPACE, chr(127)):
input_text = handle_backspace(entry_win, input_text)
if input_text:
input_text = input_text[:-1]
y, x = entry_win.getyx()
entry_win.move(y, x - 1)
entry_win.addch(" ") #
entry_win.move(y, x - 1)
entry_win.refresh()
elif char == "`": # ` Launch the settings interface
handle_backtick(stdscr)
curses.curs_set(0)
settings_menu(stdscr, interface_state.interface)
curses.curs_set(1)
refresh_node_list()
handle_resize(stdscr, False)
elif char == chr(16): # Ctrl + P for Packet Log
handle_ctrl_p()
elif char == chr(16):
# Display packet log
if ui_state.display_log is False:
ui_state.display_log = True
draw_messages_window(True)
else:
ui_state.display_log = False
packetlog_win.erase()
draw_messages_window(True)
elif char == curses.KEY_RESIZE:
input_text = ""
handle_resize(stdscr, False)
elif char == chr(4): # Ctrl + D to delete current channel or node
handle_ctrl_d()
# ^D
elif char == chr(4):
if ui_state.current_window == 0:
if isinstance(ui_state.channel_list[ui_state.selected_channel], int):
update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=True)
elif char == chr(31): # Ctrl + / to search
handle_ctrl_fslash()
# Shift notifications up to account for deleted item
for i in range(len(ui_state.notifications)):
if ui_state.notifications[i] > ui_state.selected_channel:
ui_state.notifications[i] -= 1
elif char == chr(6): # Ctrl + F to toggle favorite
handle_ctrl_f(stdscr)
del ui_state.channel_list[ui_state.selected_channel]
ui_state.selected_channel = min(ui_state.selected_channel, len(ui_state.channel_list) - 1)
select_channel(ui_state.selected_channel)
draw_channel_list()
draw_messages_window()
elif char == chr(7): # Ctrl + G to toggle ignored
handle_ctlr_g(stdscr)
if ui_state.current_window == 2:
curses.curs_set(0)
confirmation = get_list_input(
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from nodedb?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.removeNode(ui_state.node_list[ui_state.selected_node])
elif char == chr(27): # Escape to exit
break
# Directly modifying the interface from client code - good? Bad? If it's stupid but it works, it's not supid?
del interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
# Convert to "!hex" representation that interface.nodes uses
hexid = f"!{hex(ui_state.node_list[ui_state.selected_node])[2:]}"
del interface_state.interface.nodes[hexid]
ui_state.node_list.pop(ui_state.selected_node)
draw_messages_window()
draw_node_list()
else:
draw_messages_window()
curses.curs_set(1)
continue
# ^/
elif char == chr(31):
if ui_state.current_window == 2 or ui_state.current_window == 0:
search(ui_state.current_window)
# ^F
elif char == chr(6):
if ui_state.current_window == 2:
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
curses.curs_set(0)
if "isFavorite" not in selectedNode or selectedNode["isFavorite"] == False:
confirmation = get_list_input(
f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Favorite?",
None,
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.setFavorite(ui_state.node_list[ui_state.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isFavorite"
] = True
refresh_node_list()
else:
confirmation = get_list_input(
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Favorites?",
None,
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.removeFavorite(ui_state.node_list[ui_state.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isFavorite"
] = False
refresh_node_list()
handle_resize(stdscr, False)
elif char == chr(7):
if ui_state.current_window == 2:
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
curses.curs_set(0)
if "isIgnored" not in selectedNode or selectedNode["isIgnored"] == False:
confirmation = get_list_input(
f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Ignored?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.setIgnored(ui_state.node_list[ui_state.selected_node])
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isIgnored"
] = True
else:
confirmation = get_list_input(
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Ignored?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.removeIgnored(ui_state.node_list[ui_state.selected_node])
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isIgnored"
] = False
handle_resize(stdscr, False)
else:
# Append typed character to input text
@@ -178,318 +418,7 @@ def main_ui(stdscr: curses.window) -> None:
input_text += chr(char)
def handle_up() -> None:
"""Handle key up events to scroll the current window."""
if ui_state.current_window == 0:
scroll_channels(-1)
elif ui_state.current_window == 1:
scroll_messages(-1)
elif ui_state.current_window == 2:
scroll_nodes(-1)
def handle_down() -> None:
"""Handle key down events to scroll the current window."""
if ui_state.current_window == 0:
scroll_channels(1)
elif ui_state.current_window == 1:
scroll_messages(1)
elif ui_state.current_window == 2:
scroll_nodes(1)
def handle_home() -> None:
"""Handle home key events to select the first item in the current window."""
if ui_state.current_window == 0:
select_channel(0)
elif ui_state.current_window == 1:
ui_state.selected_message = 0
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(0)
def handle_end() -> None:
"""Handle end key events to select the last item in the current window."""
if ui_state.current_window == 0:
select_channel(len(ui_state.channel_list) - 1)
elif ui_state.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
ui_state.selected_message = max(msg_line_count - get_msg_window_lines(messages_win, packetlog_win), 0)
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(len(ui_state.node_list) - 1)
def handle_pageup() -> None:
"""Handle page up key events to scroll the current window by a page."""
if ui_state.current_window == 0:
select_channel(
ui_state.selected_channel - (channel_win.getmaxyx()[0] - 2)
) # select_channel will bounds check for us
elif ui_state.current_window == 1:
ui_state.selected_message = max(
ui_state.selected_message - get_msg_window_lines(messages_win, packetlog_win), 0
)
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(ui_state.selected_node - (nodes_win.getmaxyx()[0] - 2)) # select_node will bounds check for us
def handle_pagedown() -> None:
"""Handle page down key events to scroll the current window down."""
if ui_state.current_window == 0:
select_channel(
ui_state.selected_channel + (channel_win.getmaxyx()[0] - 2)
) # select_channel will bounds check for us
elif ui_state.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
ui_state.selected_message = min(
ui_state.selected_message + get_msg_window_lines(messages_win, packetlog_win),
msg_line_count - get_msg_window_lines(messages_win, packetlog_win),
)
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(ui_state.selected_node + (nodes_win.getmaxyx()[0] - 2)) # select_node will bounds check for us
def handle_leftright(char: int) -> None:
"""Handle left/right key events to switch between windows."""
delta = -1 if char == curses.KEY_LEFT else 1
old_window = ui_state.current_window
ui_state.current_window = (ui_state.current_window + delta) % 3
if old_window == 0:
channel_win.attrset(get_color("window_frame"))
channel_win.box()
channel_win.refresh()
refresh_pad(0)
if old_window == 1:
messages_win.attrset(get_color("window_frame"))
messages_win.box()
messages_win.refresh()
refresh_pad(1)
elif old_window == 2:
draw_function_win()
nodes_win.attrset(get_color("window_frame"))
nodes_win.box()
nodes_win.refresh()
refresh_pad(2)
if ui_state.current_window == 0:
channel_win.attrset(get_color("window_frame_selected"))
channel_win.box()
channel_win.attrset(get_color("window_frame"))
channel_win.refresh()
refresh_pad(0)
elif ui_state.current_window == 1:
messages_win.attrset(get_color("window_frame_selected"))
messages_win.box()
messages_win.attrset(get_color("window_frame"))
messages_win.refresh()
refresh_pad(1)
elif ui_state.current_window == 2:
draw_function_win()
nodes_win.attrset(get_color("window_frame_selected"))
nodes_win.box()
nodes_win.attrset(get_color("window_frame"))
nodes_win.refresh()
refresh_pad(2)
def handle_enter(input_text: str) -> str:
"""Handle Enter key events to send messages or select channels."""
if ui_state.current_window == 2:
node_list = ui_state.node_list
if node_list[ui_state.selected_node] not in ui_state.channel_list:
ui_state.channel_list.append(node_list[ui_state.selected_node])
if node_list[ui_state.selected_node] not in ui_state.all_messages:
ui_state.all_messages[node_list[ui_state.selected_node]] = []
ui_state.selected_channel = ui_state.channel_list.index(node_list[ui_state.selected_node])
if is_chat_archived(ui_state.channel_list[ui_state.selected_channel]):
update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=False)
ui_state.selected_node = 0
ui_state.current_window = 0
draw_node_list()
draw_channel_list()
draw_messages_window(True)
return input_text
elif len(input_text) > 0:
# Enter key pressed, send user input as message
send_message(input_text, channel=ui_state.selected_channel)
draw_messages_window(True)
# Clear entry window and reset input text
entry_win.erase()
return ""
return input_text
def handle_ctrl_t(stdscr: curses.window) -> None:
"""Handle Ctrl + T key events to send a traceroute."""
send_traceroute()
curses.curs_set(0) # Hide cursor
contact.ui.dialog.dialog(
f"Traceroute Sent To: {get_name_from_database(ui_state.node_list[ui_state.selected_node])}",
"Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.",
)
curses.curs_set(1) # Show cursor again
handle_resize(stdscr, False)
def handle_backspace(entry_win: curses.window, input_text: str) -> str:
"""Handle backspace key events to remove the last character from input text."""
if input_text:
input_text = input_text[:-1]
y, x = entry_win.getyx()
entry_win.move(y, x - 1)
entry_win.addch(" ") #
entry_win.move(y, x - 1)
entry_win.refresh()
return input_text
def handle_backtick(stdscr: curses.window) -> None:
"""Handle backtick key events to open the settings menu."""
curses.curs_set(0)
settings_menu(stdscr, interface_state.interface)
curses.curs_set(1)
refresh_node_list()
handle_resize(stdscr, False)
def handle_ctrl_p() -> None:
"""Handle Ctrl + P key events to toggle the packet log display."""
# Display packet log
if ui_state.display_log is False:
ui_state.display_log = True
draw_messages_window(True)
else:
ui_state.display_log = False
packetlog_win.erase()
draw_messages_window(True)
def handle_ctrl_d() -> None:
if ui_state.current_window == 0:
if isinstance(ui_state.channel_list[ui_state.selected_channel], int):
update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=True)
# Shift notifications up to account for deleted item
for i in range(len(ui_state.notifications)):
if ui_state.notifications[i] > ui_state.selected_channel:
ui_state.notifications[i] -= 1
del ui_state.channel_list[ui_state.selected_channel]
ui_state.selected_channel = min(ui_state.selected_channel, len(ui_state.channel_list) - 1)
select_channel(ui_state.selected_channel)
draw_channel_list()
draw_messages_window()
if ui_state.current_window == 2:
curses.curs_set(0)
confirmation = get_list_input(
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from nodedb?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.removeNode(ui_state.node_list[ui_state.selected_node])
# Directly modifying the interface from client code - good? Bad? If it's stupid but it works, it's not supid?
del interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
# Convert to "!hex" representation that interface.nodes uses
hexid = f"!{hex(ui_state.node_list[ui_state.selected_node])[2:]}"
del interface_state.interface.nodes[hexid]
ui_state.node_list.pop(ui_state.selected_node)
draw_messages_window()
draw_node_list()
else:
draw_messages_window()
curses.curs_set(1)
def handle_ctrl_fslash() -> None:
"""Handle Ctrl + / key events to search in the current window."""
if ui_state.current_window == 2 or ui_state.current_window == 0:
search(ui_state.current_window)
def handle_ctrl_f(stdscr: curses.window) -> None:
"""Handle Ctrl + F key events to toggle favorite status of the selected node."""
if ui_state.current_window == 2:
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
curses.curs_set(0)
if "isFavorite" not in selectedNode or selectedNode["isFavorite"] == False:
confirmation = get_list_input(
f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Favorite?",
None,
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.setFavorite(ui_state.node_list[ui_state.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isFavorite"] = True
refresh_node_list()
else:
confirmation = get_list_input(
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Favorites?",
None,
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.removeFavorite(ui_state.node_list[ui_state.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isFavorite"] = False
refresh_node_list()
handle_resize(stdscr, False)
def handle_ctlr_g(stdscr: curses.window) -> None:
"""Handle Ctrl + G key events to toggle ignored status of the selected node."""
if ui_state.current_window == 2:
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
curses.curs_set(0)
if "isIgnored" not in selectedNode or selectedNode["isIgnored"] == False:
confirmation = get_list_input(
f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Ignored?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.setIgnored(ui_state.node_list[ui_state.selected_node])
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isIgnored"] = True
else:
confirmation = get_list_input(
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Ignored?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.removeIgnored(ui_state.node_list[ui_state.selected_node])
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isIgnored"] = False
handle_resize(stdscr, False)
def draw_channel_list() -> None:
"""Update the channel list window and pad based on the current state."""
channel_pad.erase()
win_width = channel_win.getmaxyx()[1]
@@ -550,7 +479,7 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
row = 0
for prefix, message in messages:
full_message = f"{prefix}{message}"
wrapped_lines = wrap_text(full_message, messages_win.getmaxyx()[1] - 2)
wrapped_lines = textwrap.wrap(full_message, messages_win.getmaxyx()[1] - 2)
msg_line_count += len(wrapped_lines)
messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1])
@@ -595,7 +524,6 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
def draw_node_list() -> None:
"""Update the nodes list window and pad based on the current state."""
global nodes_pad
# This didn't work, for some reason an error is thown on startup, so we just create the pad every time
@@ -644,7 +572,6 @@ def draw_node_list() -> None:
def select_channel(idx: int) -> None:
"""Select a channel by index and update the UI state accordingly."""
old_selected_channel = ui_state.selected_channel
ui_state.selected_channel = max(0, min(idx, len(ui_state.channel_list) - 1))
draw_messages_window(True)
@@ -666,7 +593,6 @@ def select_channel(idx: int) -> None:
def scroll_channels(direction: int) -> None:
"""Scroll through the channel list by a given direction."""
new_selected_channel = ui_state.selected_channel + direction
if new_selected_channel < 0:
@@ -678,7 +604,6 @@ def scroll_channels(direction: int) -> None:
def scroll_messages(direction: int) -> None:
"""Scroll through the messages in the current channel by a given direction."""
ui_state.selected_message += direction
msg_line_count = messages_pad.getmaxyx()[0]
@@ -711,7 +636,6 @@ def scroll_messages(direction: int) -> None:
def select_node(idx: int) -> None:
"""Select a node by index and update the UI state accordingly."""
old_selected_node = ui_state.selected_node
ui_state.selected_node = max(0, min(idx, len(ui_state.node_list) - 1))
@@ -728,7 +652,6 @@ def select_node(idx: int) -> None:
def scroll_nodes(direction: int) -> None:
"""Scroll through the node list by a given direction."""
new_selected_node = ui_state.selected_node + direction
if new_selected_node < 0:
@@ -740,7 +663,7 @@ def scroll_nodes(direction: int) -> None:
def draw_packetlog_win() -> None:
"""Draw the packet log window with the latest packets."""
columns = [10, 10, 15, 30]
span = 0
@@ -793,7 +716,6 @@ def draw_packetlog_win() -> None:
def search(win: int) -> None:
"""Search for a node or channel based on user input."""
start_idx = ui_state.selected_node
select_func = select_node
@@ -842,7 +764,6 @@ def search(win: int) -> None:
def draw_node_details() -> None:
"""Draw the details of the selected node in the function window."""
node = None
try:
node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
@@ -906,18 +827,16 @@ def draw_node_details() -> None:
def draw_help() -> None:
"""Draw the help text in the function window."""
cmds = [
"↑→↓← = Select",
" ENTER = Send",
" ` = Settings",
" ESC = Quit",
" ^P = Packet Log",
" ^t = Traceroute",
" ^d = Archive Chat",
" ^f = Favorite",
" ^g = Ignore",
" ^/ = Search",
" ENTER = Send",
" ` = Settings",
" ^P = Packet Log",
" ESC = Quit",
" ^t = Traceroute",
" ^d = Archive Chat",
" ^f = Favorite",
" ^g = Ignore",
]
function_str = ""
for s in cmds:

View File

@@ -150,15 +150,6 @@ def draw_help_window(
)
def get_input_type_for_field(field) -> type:
if field.type in (field.TYPE_INT32, field.TYPE_UINT32, field.TYPE_INT64):
return int
elif field.type in (field.TYPE_FLOAT, field.TYPE_DOUBLE):
return float
else:
return str
def settings_menu(stdscr: object, interface: object) -> None:
curses.update_lines_cols()
@@ -277,8 +268,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
break
elif selected_option == "Export Config File":
filename = get_text_input("Enter a filename for the config file", None, None)
filename = get_text_input("Enter a filename for the config file")
if not filename:
logging.info("Export aborted: No filename provided.")
menu_state.start_index.pop()
@@ -300,7 +290,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
with open(yaml_file_path, "w", encoding="utf-8") as file:
file.write(config_text)
logging.info(f"Config file saved to {yaml_file_path}")
dialog("Config File Saved:", yaml_file_path)
dialog(stdscr, "Config File Saved:", yaml_file_path)
menu_state.start_index.pop()
continue
except PermissionError:
@@ -316,14 +306,14 @@ def settings_menu(stdscr: object, interface: object) -> None:
# Check if folder exists and is not empty
if not os.path.exists(config_folder) or not any(os.listdir(config_folder)):
dialog("", " No config files found. Export a config first.")
dialog(stdscr, "", " No config files found. Export a config first.")
continue # Return to menu
file_list = [f for f in os.listdir(config_folder) if os.path.isfile(os.path.join(config_folder, f))]
# Ensure file_list is not empty before proceeding
if not file_list:
dialog("", " No config files found. Export a config first.")
dialog(stdscr, "", " No config files found. Export a config first.")
continue
filename = get_list_input("Choose a config file", None, file_list)
@@ -337,7 +327,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
elif selected_option == "Config URL":
current_value = interface.localNode.getURL()
new_value = get_text_input(f"Config URL is currently: {current_value}", None, str)
new_value = get_text_input(f"Config URL is currently: {current_value}")
if new_value is not None:
current_value = new_value
overwrite = get_list_input(f"Are you sure you want to load this config?", None, ["Yes", "No"])
@@ -405,9 +395,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
if selected_option in ["longName", "shortName", "isLicensed"]:
if selected_option in ["longName", "shortName"]:
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, None
)
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value)
@@ -426,9 +414,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
elif selected_option in ["latitude", "longitude", "altitude"]:
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, float
)
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value)
@@ -467,26 +453,17 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
elif field.type == 13: # Field type 13 corresponds to UINT32
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else int(new_value)
menu_state.start_index.pop()
elif field.type == 2: # Field type 13 corresponds to INT64
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else float(new_value)
menu_state.start_index.pop()
else: # Handle other field types
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else new_value
menu_state.start_index.pop()

View File

@@ -7,56 +7,10 @@ from typing import Dict
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
# To test writting to a non-writable directory, you can uncomment the following lines:
# mkdir /tmp/test_nonwritable
# chmod -w /tmp/test_nonwritable
# parent_dir = "/tmp/test_nonwritable"
def _is_writable_dir(path: str) -> bool:
"""
Return True if we can create & delete a temp file in `path`.
"""
if not os.path.isdir(path):
return False
test_path = os.path.join(path, ".perm_test_tmp")
try:
with open(test_path, "w", encoding="utf-8") as _tmp:
_tmp.write("ok")
os.remove(test_path)
return True
except OSError:
return False
def _get_config_root(preferred_dir: str, fallback_name: str = ".contact_client") -> str:
"""
Choose a writable directory for config artifacts.
"""
if _is_writable_dir(preferred_dir):
return preferred_dir
home = os.path.expanduser("~")
fallback_dir = os.path.join(home, fallback_name)
# Ensure the fallback exists.
os.makedirs(fallback_dir, exist_ok=True)
# If *that* still isn't writable, last-ditch: use a system temp dir.
if not _is_writable_dir(fallback_dir):
import tempfile
fallback_dir = tempfile.mkdtemp(prefix="contact_client_")
return fallback_dir
# Pick the root now.
config_root = _get_config_root(parent_dir)
# Paths (derived from the chosen root)
json_file_path = os.path.join(config_root, "config.json")
log_file_path = os.path.join(config_root, "client.log")
db_file_path = os.path.join(config_root, "client.db")
# Paths
json_file_path = os.path.join(parent_dir, "config.json")
log_file_path = os.path.join(parent_dir, "client.log")
db_file_path = os.path.join(parent_dir, "client.db")
def format_json_single_line_arrays(data: Dict[str, object], indent: int = 4) -> str:
@@ -169,18 +123,15 @@ def initialize_config() -> Dict[str, object]:
"settings_breadcrumbs": ["green", "black"],
"settings_warning": ["green", "black"],
"settings_note": ["green", "black"],
"node_favorite": ["cyan", "green"],
"node_ignored": ["red", "black"],
"node_favorite": ["cyan", "white"],
"node_ignored": ["red", "white"],
}
default_config_variables = {
"channel_list_16ths": "3",
"node_list_16ths": "5",
"db_file_path": db_file_path,
"log_file_path": log_file_path,
"message_prefix": ">>",
"sent_message_prefix": ">> Sent",
"notification_symbol": "*",
"notification_sound": "True",
"ack_implicit_str": "[◌]",
"ack_str": "[✓]",
"nak_str": "[x]",
@@ -219,23 +170,18 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
global db_file_path, log_file_path, message_prefix, sent_message_prefix
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
global node_list_16ths, channel_list_16ths
global theme, COLOR_CONFIG
global node_sort, notification_sound
global node_sort
channel_list_16ths = loaded_config["channel_list_16ths"]
node_list_16ths = loaded_config["node_list_16ths"]
db_file_path = loaded_config["db_file_path"]
log_file_path = loaded_config["log_file_path"]
message_prefix = loaded_config["message_prefix"]
sent_message_prefix = loaded_config["sent_message_prefix"]
notification_symbol = loaded_config["notification_symbol"]
notification_sound = loaded_config["notification_sound"]
ack_implicit_str = loaded_config["ack_implicit_str"]
ack_str = loaded_config["ack_str"]
nak_str = loaded_config["nak_str"]
ack_unknown_str = loaded_config["ack_unknown_str"]
node_sort = loaded_config["node_sort"]
theme = loaded_config["theme"]
if theme == "dark":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"]
@@ -243,6 +189,7 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"]
elif theme == "green":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"]
node_sort = loaded_config["node_sort"]
# Call the function when the script is imported

View File

@@ -2,13 +2,14 @@ import curses
from contact.ui.colors import get_color
def dialog(title: str, message: str) -> None:
height, width = curses.LINES, curses.COLS
def dialog(stdscr: curses.window, title: str, message: str) -> None:
height, width = stdscr.getmaxyx()
# Calculate dialog dimensions
max_line_lengh = 0
message_lines = message.splitlines()
max_line_length = max(len(l) for l in message_lines)
for l in message_lines:
max_line_length = max(len(l), max_line_lengh)
dialog_width = max(len(title) + 4, max_line_length + 4)
dialog_height = len(message_lines) + 4
x = (width - dialog_width) // 2
@@ -23,19 +24,12 @@ def dialog(title: str, message: str) -> None:
# Add title
win.addstr(0, 2, title, get_color("settings_default"))
# Add message (centered)
for i, line in enumerate(message_lines):
msg_x = (dialog_width - len(line)) // 2
win.addstr(2 + i, msg_x, line, get_color("settings_default"))
# Add message
for i, l in enumerate(message_lines):
win.addstr(2 + i, 2, l, get_color("settings_default"))
# Add centered OK button
ok_text = " Ok "
win.addstr(
dialog_height - 2,
(dialog_width - len(ok_text)) // 2,
ok_text,
get_color("settings_default", reverse=True),
)
# Add button
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", get_color("settings_default", reverse=True))
# Refresh dialog window
win.refresh()
@@ -43,7 +37,8 @@ def dialog(title: str, message: str) -> None:
# Get user input
while True:
char = win.getch()
if char in (curses.KEY_ENTER, 10, 13, 32, 27): # Enter, space, or Esc
# Close dialog with enter, space, or esc
if char in (curses.KEY_ENTER, 10, 13, 32, 27):
win.erase()
win.refresh()
return

View File

@@ -1,7 +1,5 @@
import curses
import re
from unicodedata import east_asian_width
from contact.ui.colors import get_color
from contact.utilities.control_utils import transform_menu_path
from typing import Any, Optional, List, Dict
@@ -295,16 +293,9 @@ def get_wrapped_help_text(
return wrapped_help
def text_width(text: str) -> int:
return sum(2 if east_asian_width(c) in "FW" else 1 for c in text)
def wrap_text(text: str, wrap_width: int) -> List[str]:
"""Wraps text while preserving spaces and breaking long words."""
whitespace = '\t\n\x0b\x0c\r '
whitespace_trans = dict.fromkeys(map(ord, whitespace), ord(' '))
text = text.translate(whitespace_trans)
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
wrapped_lines = []
line_buffer = ""
@@ -313,11 +304,11 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
wrap_width -= margin
for word in words:
word_length = text_width(word)
word_length = len(word)
if word_length > wrap_width: # Break long words
if line_buffer:
wrapped_lines.append(line_buffer.strip())
wrapped_lines.append(line_buffer)
line_buffer = ""
line_length = 0
for i in range(0, word_length, wrap_width):
@@ -325,7 +316,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
continue
if line_length + word_length > wrap_width and word.strip():
wrapped_lines.append(line_buffer.strip())
wrapped_lines.append(line_buffer)
line_buffer = ""
line_length = 0
@@ -333,7 +324,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
line_length += word_length
if line_buffer:
wrapped_lines.append(line_buffer.strip())
wrapped_lines.append(line_buffer)
return wrapped_lines

View File

@@ -55,15 +55,10 @@ def edit_value(key: str, current_value: str) -> str:
# Load theme names dynamically from the JSON
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
return get_list_input("Select Theme", current_value, theme_options)
elif key == "node_sort":
sort_options = ["lastHeard", "name", "hops"]
return get_list_input("Sort By", current_value, sort_options)
elif key == "notification_sound":
sound_options = ["True", "False"]
return get_list_input("Notification Sound", current_value, sound_options)
# Standard Input Mode (Scrollable)
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
curses.curs_set(1)

View File

@@ -1,6 +1,5 @@
import yaml
import logging
import time
from typing import List
from google.protobuf.json_format import MessageToDict
from meshtastic import mt_config
@@ -134,29 +133,24 @@ def config_import(interface, filename):
logging.info(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(configuration["owner"])
time.sleep(0.5)
if "owner_short" in configuration:
logging.info(f"Setting device owner short to {configuration['owner_short']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"])
time.sleep(0.5)
if "ownerShort" in configuration:
logging.info(f"Setting device owner short to {configuration['ownerShort']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"])
time.sleep(0.5)
if "channel_url" in configuration:
logging.info(f"Setting channel url to {configuration['channel_url']}")
interface.getNode("^local").setURL(configuration["channel_url"])
time.sleep(0.5)
if "channelUrl" in configuration:
logging.info(f"Setting channel url to {configuration['channelUrl']}")
interface.getNode("^local").setURL(configuration["channelUrl"])
time.sleep(0.5)
if "location" in configuration:
alt = 0
@@ -175,14 +169,12 @@ def config_import(interface, filename):
logging.info(f"Fixing longitude at {lon} degrees")
logging.info("Setting device position")
interface.localNode.setFixedPosition(lat, lon, alt)
time.sleep(0.5)
if "config" in configuration:
localConfig = interface.getNode("^local").localConfig
for section in configuration["config"]:
traverseConfig(section, configuration["config"][section], localConfig)
interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
if "module_config" in configuration:
moduleConfig = interface.getNode("^local").moduleConfig
@@ -193,7 +185,6 @@ def config_import(interface, filename):
moduleConfig,
)
interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
interface.getNode("^local", False).commitSettingsTransaction()
logging.info("Writing modified configuration to device")

View File

@@ -116,14 +116,7 @@ def load_messages_from_db() -> None:
# Add messages to ui_state.all_messages grouped by hourly timestamp
hourly_messages = {}
for row in db_messages:
user_id, message, timestamp, ack_type = row
# Only ack_type is allowed to be None
if user_id is None or message is None or timestamp is None:
logging.warning(f"Skipping row with NULL required field(s): {row}")
continue
for user_id, message, timestamp, ack_type in db_messages:
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
if hour not in hourly_messages:
hourly_messages[hour] = []
@@ -137,13 +130,11 @@ def load_messages_from_db() -> None:
ack_str = config.nak_str
if user_id == str(interface_state.myNodeNum):
sanitized_message = message.replace("\x00", "")
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", sanitized_message)
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
else:
sanitized_message = message.replace("\x00", "")
formatted_message = (
f"{config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ",
sanitized_message,
message,
)
hourly_messages[hour].append(formatted_message)

View File

@@ -6,43 +6,10 @@ from typing import Any, Optional, List
from contact.ui.colors import get_color
from contact.ui.nav_utils import move_highlight, draw_arrows, wrap_text
from contact.ui.dialog import dialog
from contact.utilities.validation_rules import get_validation_for
def invalid_input(window: curses.window, message: str, redraw_func: Optional[callable] = None) -> None:
"""Displays an invalid input message in the given window and redraws if needed."""
cursor_y, cursor_x = window.getyx()
curses.curs_set(0)
dialog("Invalid Input", message)
if redraw_func:
redraw_func() # Redraw the original window content that got obscured
else:
window.refresh()
window.move(cursor_y, cursor_x)
curses.curs_set(1)
def get_text_input(prompt: str, selected_config: str, input_type: str) -> Optional[str]:
def get_text_input(prompt: str) -> Optional[str]:
"""Handles user input with wrapped text for long prompts."""
def redraw_input_win():
"""Redraw the input window with the current prompt and user input."""
input_win.erase()
input_win.border()
row = 1
for line in wrapped_prompt:
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
row += 1
if row >= height - 3:
break
input_win.addstr(row + 1, margin, prompt_text, get_color("settings_default"))
input_win.addstr(row + 1, col_start, user_input[:first_line_width], get_color("settings_default"))
for i, line in enumerate(wrap_text(user_input[first_line_width:], wrap_width=input_width)):
if row + 2 + i < height - 1:
input_win.addstr(row + 2 + i, margin, line[:input_width], get_color("settings_default"))
input_win.refresh()
height = 8
width = 80
margin = 2 # Left and right margin
@@ -60,7 +27,6 @@ def get_text_input(prompt: str, selected_config: str, input_type: str) -> Option
# Wrap the prompt text
wrapped_prompt = wrap_text(prompt, wrap_width=input_width)
row = 1
for line in wrapped_prompt:
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
row += 1
@@ -73,115 +39,34 @@ def get_text_input(prompt: str, selected_config: str, input_type: str) -> Option
input_win.refresh()
curses.curs_set(1)
min_value = 0
max_value = 4294967295
min_length = 0
max_length = None
if selected_config is not None:
validation = get_validation_for(selected_config) or {}
min_value = validation.get("min_value", 0)
max_value = validation.get("max_value", 4294967295)
min_length = validation.get("min_length", 0)
max_length = validation.get("max_length")
max_length = 4 if "shortName" in prompt else None
user_input = ""
# Start user input after the prompt text
col_start = margin + len(prompt_text)
first_line_width = input_width - len(prompt_text)
first_line_width = input_width - len(prompt_text) # Available space for first line
while True:
key = input_win.get_wch()
key = input_win.get_wch() # Waits for user input
if key == chr(27) or key == curses.KEY_LEFT:
if key == chr(27) or key == curses.KEY_LEFT: # ESC or Left Arrow
input_win.erase()
input_win.refresh()
curses.curs_set(0)
return None
return None # Exit without saving
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
if not user_input.strip():
invalid_input(input_win, "Value cannot be empty.", redraw_func=redraw_input_win)
continue
length = len(user_input)
if min_length == max_length and max_length is not None:
if length != min_length:
invalid_input(
input_win, f"Value must be exactly {min_length} characters long.", redraw_func=redraw_input_win
)
continue
else:
if length < min_length:
invalid_input(
input_win,
f"Value must be at least {min_length} characters long.",
redraw_func=redraw_input_win,
)
continue
if max_length is not None and length > max_length:
invalid_input(
input_win,
f"Value must be no more than {max_length} characters long.",
redraw_func=redraw_input_win,
)
continue
if input_type is int:
if not user_input.isdigit():
invalid_input(input_win, "Only numeric digits (09) allowed.", redraw_func=redraw_input_win)
continue
int_val = int(user_input)
if not (min_value <= int_val <= max_value):
invalid_input(
input_win, f"Enter a number between {min_value} and {max_value}.", redraw_func=redraw_input_win
)
continue
curses.curs_set(0)
return int_val
elif input_type is float:
try:
float_val = float(user_input)
if not (min_value <= float_val <= max_value):
invalid_input(
input_win,
f"Enter a number between {min_value} and {max_value}.",
redraw_func=redraw_input_win,
)
continue
except ValueError:
invalid_input(input_win, "Must be a valid floating point number.", redraw_func=redraw_input_win)
continue
else:
curses.curs_set(0)
return float_val
else:
break
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)): # Enter key
break
elif key in (curses.KEY_BACKSPACE, chr(127)): # Handle Backspace
if user_input:
user_input = user_input[:-1] # Remove last character
elif max_length is None or len(user_input) < max_length:
try:
char = chr(key) if not isinstance(key, str) else key
if input_type is int:
if char.isdigit() or (char == "-" and len(user_input) == 0):
user_input += char
elif input_type is float:
if (
char.isdigit()
or (char == "." and "." not in user_input)
or (char == "-" and len(user_input) == 0)
):
user_input += char
else:
user_input += char
except ValueError:
pass # Ignore invalid input
elif max_length is None or len(user_input) < max_length: # Enforce max length
if isinstance(key, str):
user_input += key
else:
user_input += chr(key)
# First line must be manually handled before using wrap_text()
first_line = user_input[:first_line_width] # Cut to max first line width
@@ -210,24 +95,20 @@ def get_text_input(prompt: str, selected_config: str, input_type: str) -> Option
curses.curs_set(0)
input_win.erase()
input_win.refresh()
return user_input.strip()
return user_input
def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
"""Handles user input for editing up to 3 Admin Keys in Base64 format."""
def to_base64(byte_strings):
"""Convert byte values to Base64-encoded strings."""
return [base64.b64encode(b).decode() for b in byte_strings]
def is_valid_base64(s):
"""Check if a string is valid Base64 or blank."""
if s == "":
return True
"""Check if a string is valid Base64."""
try:
decoded = base64.b64decode(s, validate=True)
return len(decoded) == 32 # Ensure it's exactly 32 bytes
except (binascii.Error, ValueError):
except binascii.Error:
return False
cvalue = to_base64(current_value) # Convert current values to Base64
@@ -247,7 +128,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
# Editable list of values (max 3 values)
user_values = cvalue[:3] + [""] * (3 - len(cvalue)) # Ensure always 3 fields
cursor_pos = 0 # Track which value is being edited
invalid_input = ""
error_message = ""
while True:
repeated_win.erase()
@@ -267,8 +148,8 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
# Show error message if needed
if invalid_input:
repeated_win.addstr(7, 2, invalid_input, get_color("settings_default", bold=True))
if error_message:
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
repeated_win.refresh()
key = repeated_win.getch()
@@ -286,7 +167,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
curses.curs_set(0)
return user_values # Return the edited Base64 values
else:
invalid_input = "Error: Each key must be valid Base64 and 32 bytes long!"
error_message = "Error: Each key must be valid Base64 and 32 bytes long!"
elif key == curses.KEY_UP: # Move cursor up
cursor_pos = (cursor_pos - 1) % len(user_values)
elif key == curses.KEY_DOWN: # Move cursor down
@@ -297,7 +178,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
else:
try:
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
invalid_input = "" # Clear error if user starts fixing input
error_message = "" # Clear error if user starts fixing input
except ValueError:
pass # Ignore invalid character inputs
@@ -319,7 +200,7 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
# Editable list of values (max 3 values)
user_values = current_value[:3]
cursor_pos = 0 # Track which value is being edited
invalid_input = ""
error_message = ""
while True:
repeated_win.erase()
@@ -339,8 +220,8 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
# Show error message if needed
if invalid_input:
repeated_win.addstr(7, 2, invalid_input, get_color("settings_default", bold=True))
if error_message:
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
repeated_win.refresh()
key = repeated_win.getch()
@@ -366,7 +247,7 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
else:
try:
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
invalid_input = "" # Clear error if user starts fixing input
error_message = "" # Clear error if user starts fixing input
except ValueError:
pass # Ignore invalid character inputs

View File

@@ -1,5 +1,4 @@
import datetime
import time
from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config
@@ -135,31 +134,3 @@ def get_time_ago(timestamp):
if unit != "s":
return f"{value} {unit} ago"
return "now"
def add_new_message(channel_id, prefix, message):
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
# Add the message
ui_state.all_messages[channel_id].append((prefix,message))

View File

@@ -1,23 +0,0 @@
validation_rules = {
"shortName": {"max_length": 4},
"longName": {"max_length": 32},
"fixed_pin": {"min_length": 6, "max_length": 6},
"position_flags": {"max_length": 3},
"enabled_protocols": {"max_value": 2},
"hop_limit": {"max_value": 7},
"latitude": {"min_value": -90, "max_value": 90},
"longitude": {"min_value": -180, "max_value": 180},
"altitude": {"min_value": -4294967295, "max_value": 4294967295},
"red": {"max_value": 255},
"green": {"max_value": 255},
"blue": {"max_value": 255},
"current": {"max_value": 255},
"position_precision": {"max_value": 32},
}
def get_validation_for(key: str) -> dict:
for rule_key, config in validation_rules.items():
if rule_key in key:
return config
return {}

View File

@@ -1,6 +1,6 @@
[project]
name = "contact"
version = "1.3.16"
version = "1.3.8"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}