Compare commits

..

13 Commits

Author SHA1 Message Date
pdxlocations
74d19c5ca7 clean-up 2025-05-18 12:17:27 -07:00
pdxlocations
33f7db4cb9 Merge branch 'main' into refactor-chat-ui 2025-05-18 08:34:01 -07:00
pdxlocations
06d151e991 hack fix 2025-05-18 08:21:53 -07:00
pdxlocations
50efa50923 working changes 2025-05-17 23:27:19 -07:00
pdxlocations
cef86ab185 I think it works! 2025-05-17 23:14:37 -07:00
pdxlocations
3d305a22b9 closer changes 2025-05-17 22:33:44 -07:00
pdxlocations
6d45788a5b mostly working changes 2025-04-28 22:20:00 -07:00
pdxlocations
e0e09aeccf so close 2025-04-20 22:04:41 -07:00
pdxlocations
bfb7ed0278 more almost working changes 2025-04-20 20:59:10 -07:00
pdxlocations
b7a275f725 Almost working changes 2025-04-19 23:12:59 -07:00
pdxlocations
587d79cb93 move lock to app state 2025-04-19 21:22:47 -07:00
pdxlocations
eb79675be0 convert globals to dataclass 2025-04-19 21:17:19 -07:00
pdxlocations
61e1799199 init 2025-04-19 21:13:46 -07:00
11 changed files with 421 additions and 573 deletions

View File

@@ -53,21 +53,22 @@ logging.basicConfig(
app_state.lock = threading.Lock() app_state.lock = threading.Lock()
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Main Program Logic # Main Program Logic
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
def prompt_region_if_unset(args: object) -> None:
"""Prompt user to set region if it is unset."""
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
def initialize_globals(args: object) -> None: def initialize_globals(args) -> None:
"""Initializes interface and shared globals.""" """Initializes interface and shared globals."""
interface_state.interface = initialize_interface(args)
# Prompt for region if unset
if interface_state.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
interface_state.myNodeNum = get_nodeNum() interface_state.myNodeNum = get_nodeNum()
ui_state.channel_list = get_channels() ui_state.channel_list = get_channels()
@@ -80,63 +81,55 @@ def initialize_globals(args: object) -> None:
def main(stdscr: curses.window) -> None: def main(stdscr: curses.window) -> None:
"""Main entry point for the curses UI.""" """Main entry point for the curses UI."""
output_capture = io.StringIO() output_capture = io.StringIO()
try: try:
setup_colors() with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
draw_splash(stdscr) setup_colors()
draw_splash(stdscr)
args = setup_parser().parse_args() args = setup_parser().parse_args()
if getattr(args, "settings", False): if getattr(args, "settings", False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True) subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return return
logging.info("Initializing interface...") logging.info("Initializing interface...")
with app_state.lock: with app_state.lock:
interface_state.interface = initialize_interface(args) initialize_globals(args)
logging.info("Starting main UI")
if interface_state.interface.localNode.localConfig.lora.region == 0: main_ui(stdscr)
prompt_region_if_unset(args)
initialize_globals(args) except Exception as e:
logging.info("Starting main UI") console_output = output_capture.getvalue()
logging.error("Uncaught exception: %s", e)
try: logging.error("Traceback: %s", traceback.format_exc())
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture): logging.error("Console output:\n%s", console_output)
main_ui(stdscr)
except Exception:
console_output = output_capture.getvalue()
logging.error("Uncaught exception inside main_ui")
logging.error("Traceback:\n%s", traceback.format_exc())
logging.error("Console output:\n%s", console_output)
return
except Exception:
raise raise
def start() -> None: def start() -> None:
"""Entry point for the application.""" """Launch curses wrapper and redirect logs to file."""
if "--help" in sys.argv or "-h" in sys.argv: if "--help" in sys.argv or "-h" in sys.argv:
setup_parser().print_help() setup_parser().print_help()
sys.exit(0) sys.exit(0)
try: with open(config.log_file_path, "a", buffering=1) as log_f:
curses.wrapper(main) sys.stdout = log_f
except KeyboardInterrupt: sys.stderr = log_f
logging.info("User exited with Ctrl+C")
sys.exit(0) with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
except Exception as e: try:
logging.critical("Fatal error", exc_info=True) curses.wrapper(main)
try: except KeyboardInterrupt:
curses.endwin() logging.info("User exited with Ctrl+C")
except Exception: sys.exit(0)
pass except Exception as e:
print("Fatal error:", e) logging.error("Fatal error: %s", e)
traceback.print_exc() logging.error("Traceback: %s", traceback.format_exc())
sys.exit(1) sys.exit(1)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,14 +1,9 @@
import logging import logging
import os import time
import platform from datetime import datetime
import shutil
import subprocess
from typing import Any, Dict from typing import Any, Dict
from contact.utilities.utils import ( from contact.utilities.utils import refresh_node_list
refresh_node_list,
add_new_message,
)
from contact.ui.contact_ui import ( from contact.ui.contact_ui import (
draw_packetlog_win, draw_packetlog_win,
draw_node_list, draw_node_list,
@@ -27,46 +22,6 @@ import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state, app_state from contact.utilities.singleton import ui_state, interface_state, app_state
def play_sound():
try:
system = platform.system()
sound_path = None
executable = None
if system == "Darwin": # macOS
sound_path = "/System/Library/Sounds/Ping.aiff"
executable = "afplay"
elif system == "Linux":
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay"
sound_path = ogg_path
elif shutil.which("ffplay") and os.path.exists(ogg_path):
executable = "ffplay"
sound_path = ogg_path
elif shutil.which("aplay") and os.path.exists(wav_path):
executable = "aplay"
sound_path = wav_path
else:
logging.warning("No suitable sound player or sound file found on Linux")
if executable and sound_path:
cmd = [executable, sound_path]
if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
def on_receive(packet: Dict[str, Any], interface: Any) -> None: def on_receive(packet: Dict[str, Any], interface: Any) -> None:
""" """
Handles an incoming packet from a Meshtastic interface. Handles an incoming packet from a Meshtastic interface.
@@ -98,10 +53,6 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
maybe_store_nodeinfo_in_db(packet) maybe_store_nodeinfo_in_db(packet)
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP": elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
if config.notification_sound == "True":
play_sound()
message_bytes = packet["decoded"]["payload"] message_bytes = packet["decoded"]["payload"]
message_string = message_bytes.decode("utf-8") message_string = message_bytes.decode("utf-8")
@@ -125,9 +76,7 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
channel_number = ui_state.channel_list.index(packet["from"]) channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number] if ui_state.channel_list[channel_number] != ui_state.channel_list[ui_state.selected_channel]:
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number) add_notification(channel_number)
refresh_channels = True refresh_channels = True
else: else:
@@ -137,14 +86,40 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
message_from_id = packet["from"] message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":" message_from_string = get_name_from_database(message_from_id, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} {message_from_string} ", message_string) if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[ui_state.channel_list[channel_number]]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[ui_state.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string} ", message_string)
)
if refresh_channels: if refresh_channels:
draw_channel_list() draw_channel_list()
if refresh_messages: if refresh_messages:
draw_messages_window(True) draw_messages_window(True)
save_message_to_db(channel_id, message_from_id, message_string) save_message_to_db(ui_state.channel_list[channel_number], message_from_id, message_string)
except KeyError as e: except KeyError as e:
logging.error(f"Error processing packet: {e}") logging.error(f"Error processing packet: {e}")

View File

@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Any, Dict from typing import Any, Dict
import google.protobuf.json_format import google.protobuf.json_format
@@ -15,8 +16,6 @@ import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state from contact.utilities.singleton import ui_state, interface_state
from contact.utilities.utils import add_new_message
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp} ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
@@ -147,9 +146,8 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
update_node_info_in_db(packet["from"], chat_archived=False) update_node_info_in_db(packet["from"], chat_archived=False)
channel_number = ui_state.channel_list.index(packet["from"]) channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if channel_id == ui_state.channel_list[ui_state.selected_channel]: if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
refresh_messages = True refresh_messages = True
else: else:
add_notification(channel_number) add_notification(channel_number)
@@ -157,14 +155,18 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n" message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str) if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string}", msg_str)
)
if refresh_channels: if refresh_channels:
draw_channel_list() draw_channel_list()
if refresh_messages: if refresh_messages:
draw_messages_window(True) draw_messages_window(True)
save_message_to_db(ui_state.channel_list[channel_number], packet["from"], msg_str)
save_message_to_db(channel_id, packet["from"], msg_str)
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None: def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
""" """
@@ -188,7 +190,32 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
channelIndex=send_on_channel, channelIndex=send_on_channel,
) )
add_new_message(channel_id, config.sent_message_prefix + config.ack_unknown_str + ": ", message) # Add sent message to the messages dictionary
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Handle timestamp logic
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
ui_state.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
timestamp = save_message_to_db(channel_id, myid, message) timestamp = save_message_to_db(channel_id, myid, message)
@@ -203,14 +230,10 @@ def send_traceroute() -> None:
""" """
Sends a RouteDiscovery protobuf to the selected node. Sends a RouteDiscovery protobuf to the selected node.
""" """
channel_id = ui_state.node_list[ui_state.selected_node]
add_new_message(channel_id, f"{config.message_prefix} Sent Traceroute", "")
r = mesh_pb2.RouteDiscovery() r = mesh_pb2.RouteDiscovery()
interface_state.interface.sendData( interface_state.interface.sendData(
r, r,
destinationId=channel_id, destinationId=ui_state.node_list[ui_state.selected_node],
portNum=portnums_pb2.PortNum.TRACEROUTE_APP, portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True, wantResponse=True,
onResponse=on_response_traceroute, onResponse=on_response_traceroute,

View File

@@ -1,4 +1,5 @@
import curses import curses
import textwrap
import logging import logging
import traceback import traceback
from typing import Union from typing import Union
@@ -11,36 +12,28 @@ from contact.utilities.db_handler import get_name_from_database, update_node_inf
from contact.utilities.input_handlers import get_list_input from contact.utilities.input_handlers import get_list_input
import contact.ui.default_config as config import contact.ui.default_config as config
import contact.ui.dialog import contact.ui.dialog
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines
from contact.utilities.singleton import ui_state, interface_state from contact.utilities.singleton import ui_state, interface_state
def handle_resize(stdscr: curses.window, firstrun: bool) -> None: def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
"""Handle terminal resize events and redraw the UI accordingly."""
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, function_win, packetlog_win, entry_win global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, function_win, packetlog_win, entry_win
# Calculate window max dimensions # Calculate window max dimensions
height, width = stdscr.getmaxyx() height, width = stdscr.getmaxyx()
# Define window dimensions and positions # Define window dimensions and positions
channel_width = int(config.channel_list_16ths) * (width // 16) channel_width = 3 * (width // 16)
nodes_width = int(config.node_list_16ths) * (width // 16) nodes_width = 5 * (width // 16)
messages_width = width - channel_width - nodes_width messages_width = width - channel_width - nodes_width
entry_height = 3
function_height = 3
y_pad = entry_height + function_height
packet_log_height = int(height / 3)
if firstrun: if firstrun:
entry_win = curses.newwin(entry_height, width, 0, 0) entry_win = curses.newwin(3, width, 0, 0)
channel_win = curses.newwin(height - y_pad, channel_width, entry_height, 0) channel_win = curses.newwin(height - 6, channel_width, 3, 0)
messages_win = curses.newwin(height - y_pad, messages_width, entry_height, channel_width) messages_win = curses.newwin(height - 6, messages_width, 3, channel_width)
nodes_win = curses.newwin(height - y_pad, nodes_width, entry_height, channel_width + messages_width) nodes_win = curses.newwin(height - 6, nodes_width, 3, channel_width + messages_width)
function_win = curses.newwin(function_height, width, height - function_height, 0) function_win = curses.newwin(3, width, height - 3, 0)
packetlog_win = curses.newwin( packetlog_win = curses.newwin(int(height / 3), messages_width, height - int(height / 3) - 3, channel_width)
packet_log_height, messages_width, height - packet_log_height - function_height, channel_width
)
# Will be resized to what we need when drawn # Will be resized to what we need when drawn
messages_pad = curses.newpad(1, 1) messages_pad = curses.newpad(1, 1)
@@ -65,19 +58,19 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
entry_win.resize(3, width) entry_win.resize(3, width)
channel_win.resize(height - y_pad, channel_width) channel_win.resize(height - 6, channel_width)
messages_win.resize(height - y_pad, messages_width) messages_win.resize(height - 6, messages_width)
messages_win.mvwin(3, channel_width) messages_win.mvwin(3, channel_width)
nodes_win.resize(height - y_pad, nodes_width) nodes_win.resize(height - 6, nodes_width)
nodes_win.mvwin(entry_height, channel_width + messages_width) nodes_win.mvwin(3, channel_width + messages_width)
function_win.resize(3, width) function_win.resize(3, width)
function_win.mvwin(height - function_height, 0) function_win.mvwin(height - 3, 0)
packetlog_win.resize(packet_log_height, messages_width) packetlog_win.resize(int(height / 3), messages_width)
packetlog_win.mvwin(height - packet_log_height - function_height, channel_width) packetlog_win.mvwin(height - int(height / 3) - 3, channel_width)
# Draw window borders # Draw window borders
for win in [channel_win, entry_win, nodes_win, messages_win, function_win]: for win in [channel_win, entry_win, nodes_win, messages_win, function_win]:
@@ -100,7 +93,6 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
def main_ui(stdscr: curses.window) -> None: def main_ui(stdscr: curses.window) -> None:
"""Main UI loop for the curses interface."""
global input_text global input_text
input_text = "" input_text = ""
stdscr.keypad(True) stdscr.keypad(True)
@@ -108,7 +100,7 @@ def main_ui(stdscr: curses.window) -> None:
handle_resize(stdscr, True) handle_resize(stdscr, True)
while True: while True:
draw_text_field(entry_win, f"Input: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input")) draw_text_field(entry_win, f"Input: {input_text[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window # Get user input from entry window
char = entry_win.get_wch() char = entry_win.get_wch()
@@ -116,59 +108,307 @@ def main_ui(stdscr: curses.window) -> None:
# draw_debug(f"Keypress: {char}") # draw_debug(f"Keypress: {char}")
if char == curses.KEY_UP: if char == curses.KEY_UP:
handle_up() if ui_state.current_window == 0:
scroll_channels(-1)
elif ui_state.current_window == 1:
scroll_messages(-1)
elif ui_state.current_window == 2:
scroll_nodes(-1)
elif char == curses.KEY_DOWN: elif char == curses.KEY_DOWN:
handle_down() if ui_state.current_window == 0:
scroll_channels(1)
elif ui_state.current_window == 1:
scroll_messages(1)
elif ui_state.current_window == 2:
scroll_nodes(1)
elif char == curses.KEY_HOME: elif char == curses.KEY_HOME:
handle_home() if ui_state.current_window == 0:
select_channel(0)
elif ui_state.current_window == 1:
ui_state.selected_message = 0
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(0)
elif char == curses.KEY_END: elif char == curses.KEY_END:
handle_end() if ui_state.current_window == 0:
select_channel(len(ui_state.channel_list) - 1)
elif ui_state.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
ui_state.selected_message = max(msg_line_count - get_msg_window_lines(messages_win, packetlog_win), 0)
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(len(ui_state.node_list) - 1)
elif char == curses.KEY_PPAGE: elif char == curses.KEY_PPAGE:
handle_pageup() if ui_state.current_window == 0:
select_channel(
ui_state.selected_channel - (channel_win.getmaxyx()[0] - 2)
) # select_channel will bounds check for us
elif ui_state.current_window == 1:
ui_state.selected_message = max(
ui_state.selected_message - get_msg_window_lines(messages_win, packetlog_win), 0
)
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(
ui_state.selected_node - (nodes_win.getmaxyx()[0] - 2)
) # select_node will bounds check for us
elif char == curses.KEY_NPAGE: elif char == curses.KEY_NPAGE:
handle_pagedown() if ui_state.current_window == 0:
select_channel(
ui_state.selected_channel + (channel_win.getmaxyx()[0] - 2)
) # select_channel will bounds check for us
elif ui_state.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
ui_state.selected_message = min(
ui_state.selected_message + get_msg_window_lines(messages_win, packetlog_win),
msg_line_count - get_msg_window_lines(messages_win, packetlog_win),
)
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(
ui_state.selected_node + (nodes_win.getmaxyx()[0] - 2)
) # select_node will bounds check for us
elif char == curses.KEY_LEFT or char == curses.KEY_RIGHT: elif char == curses.KEY_LEFT or char == curses.KEY_RIGHT:
handle_leftright(char) delta = -1 if char == curses.KEY_LEFT else 1
old_window = ui_state.current_window
ui_state.current_window = (ui_state.current_window + delta) % 3
if old_window == 0:
channel_win.attrset(get_color("window_frame"))
channel_win.box()
channel_win.refresh()
refresh_pad(0)
if old_window == 1:
messages_win.attrset(get_color("window_frame"))
messages_win.box()
messages_win.refresh()
refresh_pad(1)
elif old_window == 2:
draw_function_win()
nodes_win.attrset(get_color("window_frame"))
nodes_win.box()
nodes_win.refresh()
refresh_pad(2)
if ui_state.current_window == 0:
channel_win.attrset(get_color("window_frame_selected"))
channel_win.box()
channel_win.attrset(get_color("window_frame"))
channel_win.refresh()
refresh_pad(0)
elif ui_state.current_window == 1:
messages_win.attrset(get_color("window_frame_selected"))
messages_win.box()
messages_win.attrset(get_color("window_frame"))
messages_win.refresh()
refresh_pad(1)
elif ui_state.current_window == 2:
draw_function_win()
nodes_win.attrset(get_color("window_frame_selected"))
nodes_win.box()
nodes_win.attrset(get_color("window_frame"))
nodes_win.refresh()
refresh_pad(2)
# Check for Esc
elif char == chr(27):
break
# Check for Ctrl + t
elif char == chr(20):
send_traceroute()
curses.curs_set(0) # Hide cursor
contact.ui.dialog.dialog(
stdscr,
"Traceroute Sent",
"Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.",
)
curses.curs_set(1) # Show cursor again
handle_resize(stdscr, False)
elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)): elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)):
input_text = handle_enter(input_text) if ui_state.current_window == 2:
node_list = ui_state.node_list
if node_list[ui_state.selected_node] not in ui_state.channel_list:
ui_state.channel_list.append(node_list[ui_state.selected_node])
if node_list[ui_state.selected_node] not in ui_state.all_messages:
ui_state.all_messages[node_list[ui_state.selected_node]] = []
elif char == chr(20): # Ctrl + t for Traceroute ui_state.selected_channel = ui_state.channel_list.index(node_list[ui_state.selected_node])
handle_ctrl_t(stdscr)
if is_chat_archived(ui_state.channel_list[ui_state.selected_channel]):
update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=False)
ui_state.selected_node = 0
ui_state.current_window = 0
draw_node_list()
draw_channel_list()
draw_messages_window(True)
elif len(input_text) > 0:
# Enter key pressed, send user input as message
send_message(input_text, channel=ui_state.selected_channel)
draw_messages_window(True)
# Clear entry window and reset input text
input_text = ""
entry_win.erase()
elif char in (curses.KEY_BACKSPACE, chr(127)): elif char in (curses.KEY_BACKSPACE, chr(127)):
input_text = handle_backspace(entry_win, input_text) if input_text:
input_text = input_text[:-1]
y, x = entry_win.getyx()
entry_win.move(y, x - 1)
entry_win.addch(" ") #
entry_win.move(y, x - 1)
entry_win.refresh()
elif char == "`": # ` Launch the settings interface elif char == "`": # ` Launch the settings interface
handle_backtick(stdscr) curses.curs_set(0)
settings_menu(stdscr, interface_state.interface)
curses.curs_set(1)
refresh_node_list()
handle_resize(stdscr, False)
elif char == chr(16): # Ctrl + P for Packet Log elif char == chr(16):
handle_ctrl_p() # Display packet log
if ui_state.display_log is False:
ui_state.display_log = True
draw_messages_window(True)
else:
ui_state.display_log = False
packetlog_win.erase()
draw_messages_window(True)
elif char == curses.KEY_RESIZE: elif char == curses.KEY_RESIZE:
input_text = "" input_text = ""
handle_resize(stdscr, False) handle_resize(stdscr, False)
elif char == chr(4): # Ctrl + D to delete current channel or node # ^D
handle_ctrl_d() elif char == chr(4):
if ui_state.current_window == 0:
if isinstance(ui_state.channel_list[ui_state.selected_channel], int):
update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=True)
elif char == chr(31): # Ctrl + / to search # Shift notifications up to account for deleted item
handle_ctrl_fslash() for i in range(len(ui_state.notifications)):
if ui_state.notifications[i] > ui_state.selected_channel:
ui_state.notifications[i] -= 1
elif char == chr(6): # Ctrl + F to toggle favorite del ui_state.channel_list[ui_state.selected_channel]
handle_ctrl_f(stdscr) ui_state.selected_channel = min(ui_state.selected_channel, len(ui_state.channel_list) - 1)
select_channel(ui_state.selected_channel)
draw_channel_list()
draw_messages_window()
elif char == chr(7): # Ctrl + G to toggle ignored if ui_state.current_window == 2:
handle_ctlr_g(stdscr) curses.curs_set(0)
confirmation = get_list_input(
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from nodedb?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.removeNode(ui_state.node_list[ui_state.selected_node])
elif char == chr(27): # Escape to exit # Directly modifying the interface from client code - good? Bad? If it's stupid but it works, it's not supid?
break del interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
# Convert to "!hex" representation that interface.nodes uses
hexid = f"!{hex(ui_state.node_list[ui_state.selected_node])[2:]}"
del interface_state.interface.nodes[hexid]
ui_state.node_list.pop(ui_state.selected_node)
draw_messages_window()
draw_node_list()
else:
draw_messages_window()
curses.curs_set(1)
continue
# ^/
elif char == chr(31):
if ui_state.current_window == 2 or ui_state.current_window == 0:
search(ui_state.current_window)
# ^F
elif char == chr(6):
if ui_state.current_window == 2:
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
curses.curs_set(0)
if "isFavorite" not in selectedNode or selectedNode["isFavorite"] == False:
confirmation = get_list_input(
f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Favorite?",
None,
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.setFavorite(ui_state.node_list[ui_state.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isFavorite"
] = True
refresh_node_list()
else:
confirmation = get_list_input(
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Favorites?",
None,
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.removeFavorite(ui_state.node_list[ui_state.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isFavorite"
] = False
refresh_node_list()
handle_resize(stdscr, False)
elif char == chr(7):
if ui_state.current_window == 2:
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
curses.curs_set(0)
if "isIgnored" not in selectedNode or selectedNode["isIgnored"] == False:
confirmation = get_list_input(
f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Ignored?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.setIgnored(ui_state.node_list[ui_state.selected_node])
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isIgnored"
] = True
else:
confirmation = get_list_input(
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Ignored?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.removeIgnored(ui_state.node_list[ui_state.selected_node])
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isIgnored"
] = False
handle_resize(stdscr, False)
else: else:
# Append typed character to input text # Append typed character to input text
@@ -178,319 +418,7 @@ def main_ui(stdscr: curses.window) -> None:
input_text += chr(char) input_text += chr(char)
def handle_up() -> None:
"""Handle key up events to scroll the current window."""
if ui_state.current_window == 0:
scroll_channels(-1)
elif ui_state.current_window == 1:
scroll_messages(-1)
elif ui_state.current_window == 2:
scroll_nodes(-1)
def handle_down() -> None:
"""Handle key down events to scroll the current window."""
if ui_state.current_window == 0:
scroll_channels(1)
elif ui_state.current_window == 1:
scroll_messages(1)
elif ui_state.current_window == 2:
scroll_nodes(1)
def handle_home() -> None:
"""Handle home key events to select the first item in the current window."""
if ui_state.current_window == 0:
select_channel(0)
elif ui_state.current_window == 1:
ui_state.selected_message = 0
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(0)
def handle_end() -> None:
"""Handle end key events to select the last item in the current window."""
if ui_state.current_window == 0:
select_channel(len(ui_state.channel_list) - 1)
elif ui_state.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
ui_state.selected_message = max(msg_line_count - get_msg_window_lines(messages_win, packetlog_win), 0)
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(len(ui_state.node_list) - 1)
def handle_pageup() -> None:
"""Handle page up key events to scroll the current window by a page."""
if ui_state.current_window == 0:
select_channel(
ui_state.selected_channel - (channel_win.getmaxyx()[0] - 2)
) # select_channel will bounds check for us
elif ui_state.current_window == 1:
ui_state.selected_message = max(
ui_state.selected_message - get_msg_window_lines(messages_win, packetlog_win), 0
)
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(ui_state.selected_node - (nodes_win.getmaxyx()[0] - 2)) # select_node will bounds check for us
def handle_pagedown() -> None:
"""Handle page down key events to scroll the current window down."""
if ui_state.current_window == 0:
select_channel(
ui_state.selected_channel + (channel_win.getmaxyx()[0] - 2)
) # select_channel will bounds check for us
elif ui_state.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
ui_state.selected_message = min(
ui_state.selected_message + get_msg_window_lines(messages_win, packetlog_win),
msg_line_count - get_msg_window_lines(messages_win, packetlog_win),
)
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(ui_state.selected_node + (nodes_win.getmaxyx()[0] - 2)) # select_node will bounds check for us
def handle_leftright(char: int) -> None:
"""Handle left/right key events to switch between windows."""
delta = -1 if char == curses.KEY_LEFT else 1
old_window = ui_state.current_window
ui_state.current_window = (ui_state.current_window + delta) % 3
if old_window == 0:
channel_win.attrset(get_color("window_frame"))
channel_win.box()
channel_win.refresh()
refresh_pad(0)
if old_window == 1:
messages_win.attrset(get_color("window_frame"))
messages_win.box()
messages_win.refresh()
refresh_pad(1)
elif old_window == 2:
draw_function_win()
nodes_win.attrset(get_color("window_frame"))
nodes_win.box()
nodes_win.refresh()
refresh_pad(2)
if ui_state.current_window == 0:
channel_win.attrset(get_color("window_frame_selected"))
channel_win.box()
channel_win.attrset(get_color("window_frame"))
channel_win.refresh()
refresh_pad(0)
elif ui_state.current_window == 1:
messages_win.attrset(get_color("window_frame_selected"))
messages_win.box()
messages_win.attrset(get_color("window_frame"))
messages_win.refresh()
refresh_pad(1)
elif ui_state.current_window == 2:
draw_function_win()
nodes_win.attrset(get_color("window_frame_selected"))
nodes_win.box()
nodes_win.attrset(get_color("window_frame"))
nodes_win.refresh()
refresh_pad(2)
def handle_enter(input_text: str) -> str:
"""Handle Enter key events to send messages or select channels."""
if ui_state.current_window == 2:
node_list = ui_state.node_list
if node_list[ui_state.selected_node] not in ui_state.channel_list:
ui_state.channel_list.append(node_list[ui_state.selected_node])
if node_list[ui_state.selected_node] not in ui_state.all_messages:
ui_state.all_messages[node_list[ui_state.selected_node]] = []
ui_state.selected_channel = ui_state.channel_list.index(node_list[ui_state.selected_node])
if is_chat_archived(ui_state.channel_list[ui_state.selected_channel]):
update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=False)
ui_state.selected_node = 0
ui_state.current_window = 0
draw_node_list()
draw_channel_list()
draw_messages_window(True)
return input_text
elif len(input_text) > 0:
# Enter key pressed, send user input as message
send_message(input_text, channel=ui_state.selected_channel)
draw_messages_window(True)
# Clear entry window and reset input text
entry_win.erase()
return ""
return input_text
def handle_ctrl_t(stdscr: curses.window) -> None:
"""Handle Ctrl + T key events to send a traceroute."""
send_traceroute()
curses.curs_set(0) # Hide cursor
contact.ui.dialog.dialog(
stdscr,
f"Traceroute Sent To: {get_name_from_database(ui_state.node_list[ui_state.selected_node])}",
"Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.",
)
curses.curs_set(1) # Show cursor again
handle_resize(stdscr, False)
def handle_backspace(entry_win: curses.window, input_text: str) -> str:
"""Handle backspace key events to remove the last character from input text."""
if input_text:
input_text = input_text[:-1]
y, x = entry_win.getyx()
entry_win.move(y, x - 1)
entry_win.addch(" ") #
entry_win.move(y, x - 1)
entry_win.refresh()
return input_text
def handle_backtick(stdscr: curses.window) -> None:
"""Handle backtick key events to open the settings menu."""
curses.curs_set(0)
settings_menu(stdscr, interface_state.interface)
curses.curs_set(1)
refresh_node_list()
handle_resize(stdscr, False)
def handle_ctrl_p() -> None:
"""Handle Ctrl + P key events to toggle the packet log display."""
# Display packet log
if ui_state.display_log is False:
ui_state.display_log = True
draw_messages_window(True)
else:
ui_state.display_log = False
packetlog_win.erase()
draw_messages_window(True)
def handle_ctrl_d() -> None:
if ui_state.current_window == 0:
if isinstance(ui_state.channel_list[ui_state.selected_channel], int):
update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=True)
# Shift notifications up to account for deleted item
for i in range(len(ui_state.notifications)):
if ui_state.notifications[i] > ui_state.selected_channel:
ui_state.notifications[i] -= 1
del ui_state.channel_list[ui_state.selected_channel]
ui_state.selected_channel = min(ui_state.selected_channel, len(ui_state.channel_list) - 1)
select_channel(ui_state.selected_channel)
draw_channel_list()
draw_messages_window()
if ui_state.current_window == 2:
curses.curs_set(0)
confirmation = get_list_input(
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from nodedb?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.removeNode(ui_state.node_list[ui_state.selected_node])
# Directly modifying the interface from client code - good? Bad? If it's stupid but it works, it's not supid?
del interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
# Convert to "!hex" representation that interface.nodes uses
hexid = f"!{hex(ui_state.node_list[ui_state.selected_node])[2:]}"
del interface_state.interface.nodes[hexid]
ui_state.node_list.pop(ui_state.selected_node)
draw_messages_window()
draw_node_list()
else:
draw_messages_window()
curses.curs_set(1)
def handle_ctrl_fslash() -> None:
"""Handle Ctrl + / key events to search in the current window."""
if ui_state.current_window == 2 or ui_state.current_window == 0:
search(ui_state.current_window)
def handle_ctrl_f(stdscr: curses.window) -> None:
"""Handle Ctrl + F key events to toggle favorite status of the selected node."""
if ui_state.current_window == 2:
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
curses.curs_set(0)
if "isFavorite" not in selectedNode or selectedNode["isFavorite"] == False:
confirmation = get_list_input(
f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Favorite?",
None,
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.setFavorite(ui_state.node_list[ui_state.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isFavorite"] = True
refresh_node_list()
else:
confirmation = get_list_input(
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Favorites?",
None,
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.removeFavorite(ui_state.node_list[ui_state.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isFavorite"] = False
refresh_node_list()
handle_resize(stdscr, False)
def handle_ctlr_g(stdscr: curses.window) -> None:
"""Handle Ctrl + G key events to toggle ignored status of the selected node."""
if ui_state.current_window == 2:
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
curses.curs_set(0)
if "isIgnored" not in selectedNode or selectedNode["isIgnored"] == False:
confirmation = get_list_input(
f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Ignored?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.setIgnored(ui_state.node_list[ui_state.selected_node])
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isIgnored"] = True
else:
confirmation = get_list_input(
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Ignored?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
interface_state.interface.localNode.removeIgnored(ui_state.node_list[ui_state.selected_node])
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isIgnored"] = False
handle_resize(stdscr, False)
def draw_channel_list() -> None: def draw_channel_list() -> None:
"""Update the channel list window and pad based on the current state."""
channel_pad.erase() channel_pad.erase()
win_width = channel_win.getmaxyx()[1] win_width = channel_win.getmaxyx()[1]
@@ -551,7 +479,7 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
row = 0 row = 0
for prefix, message in messages: for prefix, message in messages:
full_message = f"{prefix}{message}" full_message = f"{prefix}{message}"
wrapped_lines = wrap_text(full_message, messages_win.getmaxyx()[1] - 2) wrapped_lines = textwrap.wrap(full_message, messages_win.getmaxyx()[1] - 2)
msg_line_count += len(wrapped_lines) msg_line_count += len(wrapped_lines)
messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1]) messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1])
@@ -596,7 +524,6 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
def draw_node_list() -> None: def draw_node_list() -> None:
"""Update the nodes list window and pad based on the current state."""
global nodes_pad global nodes_pad
# This didn't work, for some reason an error is thown on startup, so we just create the pad every time # This didn't work, for some reason an error is thown on startup, so we just create the pad every time
@@ -645,7 +572,6 @@ def draw_node_list() -> None:
def select_channel(idx: int) -> None: def select_channel(idx: int) -> None:
"""Select a channel by index and update the UI state accordingly."""
old_selected_channel = ui_state.selected_channel old_selected_channel = ui_state.selected_channel
ui_state.selected_channel = max(0, min(idx, len(ui_state.channel_list) - 1)) ui_state.selected_channel = max(0, min(idx, len(ui_state.channel_list) - 1))
draw_messages_window(True) draw_messages_window(True)
@@ -667,7 +593,6 @@ def select_channel(idx: int) -> None:
def scroll_channels(direction: int) -> None: def scroll_channels(direction: int) -> None:
"""Scroll through the channel list by a given direction."""
new_selected_channel = ui_state.selected_channel + direction new_selected_channel = ui_state.selected_channel + direction
if new_selected_channel < 0: if new_selected_channel < 0:
@@ -679,7 +604,6 @@ def scroll_channels(direction: int) -> None:
def scroll_messages(direction: int) -> None: def scroll_messages(direction: int) -> None:
"""Scroll through the messages in the current channel by a given direction."""
ui_state.selected_message += direction ui_state.selected_message += direction
msg_line_count = messages_pad.getmaxyx()[0] msg_line_count = messages_pad.getmaxyx()[0]
@@ -712,7 +636,6 @@ def scroll_messages(direction: int) -> None:
def select_node(idx: int) -> None: def select_node(idx: int) -> None:
"""Select a node by index and update the UI state accordingly."""
old_selected_node = ui_state.selected_node old_selected_node = ui_state.selected_node
ui_state.selected_node = max(0, min(idx, len(ui_state.node_list) - 1)) ui_state.selected_node = max(0, min(idx, len(ui_state.node_list) - 1))
@@ -729,7 +652,6 @@ def select_node(idx: int) -> None:
def scroll_nodes(direction: int) -> None: def scroll_nodes(direction: int) -> None:
"""Scroll through the node list by a given direction."""
new_selected_node = ui_state.selected_node + direction new_selected_node = ui_state.selected_node + direction
if new_selected_node < 0: if new_selected_node < 0:
@@ -741,7 +663,7 @@ def scroll_nodes(direction: int) -> None:
def draw_packetlog_win() -> None: def draw_packetlog_win() -> None:
"""Draw the packet log window with the latest packets."""
columns = [10, 10, 15, 30] columns = [10, 10, 15, 30]
span = 0 span = 0
@@ -794,7 +716,6 @@ def draw_packetlog_win() -> None:
def search(win: int) -> None: def search(win: int) -> None:
"""Search for a node or channel based on user input."""
start_idx = ui_state.selected_node start_idx = ui_state.selected_node
select_func = select_node select_func = select_node
@@ -843,7 +764,6 @@ def search(win: int) -> None:
def draw_node_details() -> None: def draw_node_details() -> None:
"""Draw the details of the selected node in the function window."""
node = None node = None
try: try:
node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]] node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
@@ -907,18 +827,16 @@ def draw_node_details() -> None:
def draw_help() -> None: def draw_help() -> None:
"""Draw the help text in the function window."""
cmds = [ cmds = [
"↑→↓← = Select", "↑→↓← = Select",
" ENTER = Send", " ENTER = Send",
" ` = Settings", " ` = Settings",
" ESC = Quit", " ^P = Packet Log",
" ^P = Packet Log", " ESC = Quit",
" ^t = Traceroute", " ^t = Traceroute",
" ^d = Archive Chat", " ^d = Archive Chat",
" ^f = Favorite", " ^f = Favorite",
" ^g = Ignore", " ^g = Ignore",
" ^/ = Search",
] ]
function_str = "" function_str = ""
for s in cmds: for s in cmds:

View File

@@ -123,18 +123,15 @@ def initialize_config() -> Dict[str, object]:
"settings_breadcrumbs": ["green", "black"], "settings_breadcrumbs": ["green", "black"],
"settings_warning": ["green", "black"], "settings_warning": ["green", "black"],
"settings_note": ["green", "black"], "settings_note": ["green", "black"],
"node_favorite": ["cyan", "green"], "node_favorite": ["cyan", "white"],
"node_ignored": ["red", "black"], "node_ignored": ["red", "white"],
} }
default_config_variables = { default_config_variables = {
"channel_list_16ths": "3",
"node_list_16ths": "5",
"db_file_path": db_file_path, "db_file_path": db_file_path,
"log_file_path": log_file_path, "log_file_path": log_file_path,
"message_prefix": ">>", "message_prefix": ">>",
"sent_message_prefix": ">> Sent", "sent_message_prefix": ">> Sent",
"notification_symbol": "*", "notification_symbol": "*",
"notification_sound": "True",
"ack_implicit_str": "[◌]", "ack_implicit_str": "[◌]",
"ack_str": "[✓]", "ack_str": "[✓]",
"nak_str": "[x]", "nak_str": "[x]",
@@ -173,23 +170,18 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
global db_file_path, log_file_path, message_prefix, sent_message_prefix global db_file_path, log_file_path, message_prefix, sent_message_prefix
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
global node_list_16ths, channel_list_16ths
global theme, COLOR_CONFIG global theme, COLOR_CONFIG
global node_sort, notification_sound global node_sort
channel_list_16ths = loaded_config["channel_list_16ths"]
node_list_16ths = loaded_config["node_list_16ths"]
db_file_path = loaded_config["db_file_path"] db_file_path = loaded_config["db_file_path"]
log_file_path = loaded_config["log_file_path"] log_file_path = loaded_config["log_file_path"]
message_prefix = loaded_config["message_prefix"] message_prefix = loaded_config["message_prefix"]
sent_message_prefix = loaded_config["sent_message_prefix"] sent_message_prefix = loaded_config["sent_message_prefix"]
notification_symbol = loaded_config["notification_symbol"] notification_symbol = loaded_config["notification_symbol"]
notification_sound = loaded_config["notification_sound"]
ack_implicit_str = loaded_config["ack_implicit_str"] ack_implicit_str = loaded_config["ack_implicit_str"]
ack_str = loaded_config["ack_str"] ack_str = loaded_config["ack_str"]
nak_str = loaded_config["nak_str"] nak_str = loaded_config["nak_str"]
ack_unknown_str = loaded_config["ack_unknown_str"] ack_unknown_str = loaded_config["ack_unknown_str"]
node_sort = loaded_config["node_sort"]
theme = loaded_config["theme"] theme = loaded_config["theme"]
if theme == "dark": if theme == "dark":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"] COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"]
@@ -197,6 +189,7 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"] COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"]
elif theme == "green": elif theme == "green":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"] COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"]
node_sort = loaded_config["node_sort"]
# Call the function when the script is imported # Call the function when the script is imported

View File

@@ -1,7 +1,5 @@
import curses import curses
import re import re
from unicodedata import east_asian_width
from contact.ui.colors import get_color from contact.ui.colors import get_color
from contact.utilities.control_utils import transform_menu_path from contact.utilities.control_utils import transform_menu_path
from typing import Any, Optional, List, Dict from typing import Any, Optional, List, Dict
@@ -295,16 +293,9 @@ def get_wrapped_help_text(
return wrapped_help return wrapped_help
def text_width(text: str) -> int:
return sum(2 if east_asian_width(c) in "FW" else 1 for c in text)
def wrap_text(text: str, wrap_width: int) -> List[str]: def wrap_text(text: str, wrap_width: int) -> List[str]:
"""Wraps text while preserving spaces and breaking long words.""" """Wraps text while preserving spaces and breaking long words."""
whitespace = '\t\n\x0b\x0c\r '
whitespace_trans = dict.fromkeys(map(ord, whitespace), ord(' '))
text = text.translate(whitespace_trans)
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
wrapped_lines = [] wrapped_lines = []
line_buffer = "" line_buffer = ""
@@ -313,11 +304,11 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
wrap_width -= margin wrap_width -= margin
for word in words: for word in words:
word_length = text_width(word) word_length = len(word)
if word_length > wrap_width: # Break long words if word_length > wrap_width: # Break long words
if line_buffer: if line_buffer:
wrapped_lines.append(line_buffer.strip()) wrapped_lines.append(line_buffer)
line_buffer = "" line_buffer = ""
line_length = 0 line_length = 0
for i in range(0, word_length, wrap_width): for i in range(0, word_length, wrap_width):
@@ -325,7 +316,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
continue continue
if line_length + word_length > wrap_width and word.strip(): if line_length + word_length > wrap_width and word.strip():
wrapped_lines.append(line_buffer.strip()) wrapped_lines.append(line_buffer)
line_buffer = "" line_buffer = ""
line_length = 0 line_length = 0
@@ -333,7 +324,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
line_length += word_length line_length += word_length
if line_buffer: if line_buffer:
wrapped_lines.append(line_buffer.strip()) wrapped_lines.append(line_buffer)
return wrapped_lines return wrapped_lines

View File

@@ -55,15 +55,10 @@ def edit_value(key: str, current_value: str) -> str:
# Load theme names dynamically from the JSON # Load theme names dynamically from the JSON
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")] theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
return get_list_input("Select Theme", current_value, theme_options) return get_list_input("Select Theme", current_value, theme_options)
elif key == "node_sort": elif key == "node_sort":
sort_options = ["lastHeard", "name", "hops"] sort_options = ["lastHeard", "name", "hops"]
return get_list_input("Sort By", current_value, sort_options) return get_list_input("Sort By", current_value, sort_options)
elif key == "notification_sound":
sound_options = ["True", "False"]
return get_list_input("Notification Sound", current_value, sound_options)
# Standard Input Mode (Scrollable) # Standard Input Mode (Scrollable)
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default")) edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
curses.curs_set(1) curses.curs_set(1)

View File

@@ -1,6 +1,5 @@
import yaml import yaml
import logging import logging
import time
from typing import List from typing import List
from google.protobuf.json_format import MessageToDict from google.protobuf.json_format import MessageToDict
from meshtastic import mt_config from meshtastic import mt_config
@@ -134,29 +133,24 @@ def config_import(interface, filename):
logging.info(f"Setting device owner to {configuration['owner']}") logging.info(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True waitForAckNak = True
interface.getNode("^local", False).setOwner(configuration["owner"]) interface.getNode("^local", False).setOwner(configuration["owner"])
time.sleep(0.5)
if "owner_short" in configuration: if "owner_short" in configuration:
logging.info(f"Setting device owner short to {configuration['owner_short']}") logging.info(f"Setting device owner short to {configuration['owner_short']}")
waitForAckNak = True waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"]) interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"])
time.sleep(0.5)
if "ownerShort" in configuration: if "ownerShort" in configuration:
logging.info(f"Setting device owner short to {configuration['ownerShort']}") logging.info(f"Setting device owner short to {configuration['ownerShort']}")
waitForAckNak = True waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"]) interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"])
time.sleep(0.5)
if "channel_url" in configuration: if "channel_url" in configuration:
logging.info(f"Setting channel url to {configuration['channel_url']}") logging.info(f"Setting channel url to {configuration['channel_url']}")
interface.getNode("^local").setURL(configuration["channel_url"]) interface.getNode("^local").setURL(configuration["channel_url"])
time.sleep(0.5)
if "channelUrl" in configuration: if "channelUrl" in configuration:
logging.info(f"Setting channel url to {configuration['channelUrl']}") logging.info(f"Setting channel url to {configuration['channelUrl']}")
interface.getNode("^local").setURL(configuration["channelUrl"]) interface.getNode("^local").setURL(configuration["channelUrl"])
time.sleep(0.5)
if "location" in configuration: if "location" in configuration:
alt = 0 alt = 0
@@ -175,14 +169,12 @@ def config_import(interface, filename):
logging.info(f"Fixing longitude at {lon} degrees") logging.info(f"Fixing longitude at {lon} degrees")
logging.info("Setting device position") logging.info("Setting device position")
interface.localNode.setFixedPosition(lat, lon, alt) interface.localNode.setFixedPosition(lat, lon, alt)
time.sleep(0.5)
if "config" in configuration: if "config" in configuration:
localConfig = interface.getNode("^local").localConfig localConfig = interface.getNode("^local").localConfig
for section in configuration["config"]: for section in configuration["config"]:
traverseConfig(section, configuration["config"][section], localConfig) traverseConfig(section, configuration["config"][section], localConfig)
interface.getNode("^local").writeConfig(camel_to_snake(section)) interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
if "module_config" in configuration: if "module_config" in configuration:
moduleConfig = interface.getNode("^local").moduleConfig moduleConfig = interface.getNode("^local").moduleConfig
@@ -193,7 +185,6 @@ def config_import(interface, filename):
moduleConfig, moduleConfig,
) )
interface.getNode("^local").writeConfig(camel_to_snake(section)) interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
interface.getNode("^local", False).commitSettingsTransaction() interface.getNode("^local", False).commitSettingsTransaction()
logging.info("Writing modified configuration to device") logging.info("Writing modified configuration to device")

View File

@@ -104,13 +104,11 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
return [base64.b64encode(b).decode() for b in byte_strings] return [base64.b64encode(b).decode() for b in byte_strings]
def is_valid_base64(s): def is_valid_base64(s):
"""Check if a string is valid Base64 or blank.""" """Check if a string is valid Base64."""
if s == "":
return True
try: try:
decoded = base64.b64decode(s, validate=True) decoded = base64.b64decode(s, validate=True)
return len(decoded) == 32 # Ensure it's exactly 32 bytes return len(decoded) == 32 # Ensure it's exactly 32 bytes
except (binascii.Error, ValueError): except binascii.Error:
return False return False
cvalue = to_base64(current_value) # Convert current values to Base64 cvalue = to_base64(current_value) # Convert current values to Base64

View File

@@ -1,5 +1,4 @@
import datetime import datetime
import time
from meshtastic.protobuf import config_pb2 from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config import contact.ui.default_config as config
@@ -135,31 +134,3 @@ def get_time_ago(timestamp):
if unit != "s": if unit != "s":
return f"{value} {unit} ago" return f"{value} {unit} ago"
return "now" return "now"
def add_new_message(channel_id, prefix, message):
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
# Add the message
ui_state.all_messages[channel_id].append((prefix,message))

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "contact" name = "contact"
version = "1.3.15" version = "1.3.8"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores." description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [ authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"} {name = "Ben Lipsey",email = "ben@pdxlocations.com"}