Compare commits

..

1 Commits

Author SHA1 Message Date
pdxlocations
3d1a489fd1 maybe dead end until refactor scrolling 2025-04-19 18:57:47 -07:00
18 changed files with 694 additions and 1181 deletions

View File

@@ -24,6 +24,7 @@ import traceback
from pubsub import pub from pubsub import pub
# Local application # Local application
import contact.globals as globals
import contact.ui.default_config as config import contact.ui.default_config as config
from contact.message_handlers.rx_handler import on_receive from contact.message_handlers.rx_handler import on_receive
from contact.settings import set_region from contact.settings import set_region
@@ -35,7 +36,7 @@ from contact.utilities.db_handler import init_nodedb, load_messages_from_db
from contact.utilities.input_handlers import get_list_input from contact.utilities.input_handlers import get_list_input
from contact.utilities.interfaces import initialize_interface from contact.utilities.interfaces import initialize_interface
from contact.utilities.utils import get_channels, get_nodeNum, get_node_list from contact.utilities.utils import get_channels, get_nodeNum, get_node_list
from contact.utilities.singleton import ui_state, interface_state, app_state
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Environment & Logging Setup # Environment & Logging Setup
@@ -51,27 +52,28 @@ logging.basicConfig(
filename=config.log_file_path, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" filename=config.log_file_path, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
) )
app_state.lock = threading.Lock() globals.lock = threading.Lock()
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Main Program Logic # Main Program Logic
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
def prompt_region_if_unset(args: object) -> None:
"""Prompt user to set region if it is unset."""
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
def initialize_globals() -> None: def initialize_globals(args) -> None:
"""Initializes interface and shared globals.""" """Initializes interface and shared globals."""
globals.interface = initialize_interface(args)
interface_state.myNodeNum = get_nodeNum() # Prompt for region if unset
ui_state.channel_list = get_channels() if globals.interface.localNode.localConfig.lora.region == 0:
ui_state.node_list = get_node_list() confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(globals.interface)
globals.interface.close()
globals.interface = initialize_interface(args)
globals.myNodeNum = get_nodeNum()
globals.channel_list = get_channels()
globals.node_list = get_node_list()
pub.subscribe(on_receive, "meshtastic.receive") pub.subscribe(on_receive, "meshtastic.receive")
init_nodedb() init_nodedb()
@@ -80,63 +82,55 @@ def initialize_globals() -> None:
def main(stdscr: curses.window) -> None: def main(stdscr: curses.window) -> None:
"""Main entry point for the curses UI.""" """Main entry point for the curses UI."""
output_capture = io.StringIO() output_capture = io.StringIO()
try: try:
setup_colors() with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
draw_splash(stdscr) setup_colors()
draw_splash(stdscr)
args = setup_parser().parse_args() args = setup_parser().parse_args()
if getattr(args, "settings", False): if getattr(args, "settings", False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True) subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return return
logging.info("Initializing interface...") logging.info("Initializing interface...")
with app_state.lock: with globals.lock:
interface_state.interface = initialize_interface(args) initialize_globals(args)
logging.info("Starting main UI")
if interface_state.interface.localNode.localConfig.lora.region == 0: main_ui(stdscr)
prompt_region_if_unset(args)
initialize_globals() except Exception as e:
logging.info("Starting main UI") console_output = output_capture.getvalue()
logging.error("Uncaught exception: %s", e)
try: logging.error("Traceback: %s", traceback.format_exc())
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture): logging.error("Console output:\n%s", console_output)
main_ui(stdscr)
except Exception:
console_output = output_capture.getvalue()
logging.error("Uncaught exception inside main_ui")
logging.error("Traceback:\n%s", traceback.format_exc())
logging.error("Console output:\n%s", console_output)
return
except Exception:
raise raise
def start() -> None: def start() -> None:
"""Entry point for the application.""" """Launch curses wrapper and redirect logs to file."""
if "--help" in sys.argv or "-h" in sys.argv: if "--help" in sys.argv or "-h" in sys.argv:
setup_parser().print_help() setup_parser().print_help()
sys.exit(0) sys.exit(0)
try: with open(config.log_file_path, "a", buffering=1) as log_f:
curses.wrapper(main) sys.stdout = log_f
except KeyboardInterrupt: sys.stderr = log_f
logging.info("User exited with Ctrl+C")
sys.exit(0) with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
except Exception as e: try:
logging.critical("Fatal error", exc_info=True) curses.wrapper(main)
try: except KeyboardInterrupt:
curses.endwin() logging.info("User exited with Ctrl+C")
except Exception: sys.exit(0)
pass except Exception as e:
print("Fatal error:", e) logging.error("Fatal error: %s", e)
traceback.print_exc() logging.error("Traceback: %s", traceback.format_exc())
sys.exit(1) sys.exit(1)
if __name__ == "__main__": if __name__ == "__main__":

13
contact/globals.py Normal file
View File

@@ -0,0 +1,13 @@
interface = None
lock = None
display_log = False
all_messages = {}
channel_list = []
notifications = []
packet_buffer = []
node_list = []
myNodeNum = 0
selected_channel = 0
selected_message = 0
selected_node = 0
current_window = 0

View File

@@ -1,14 +1,9 @@
import logging import logging
import os import time
import platform from datetime import datetime
import shutil
import subprocess
from typing import Any, Dict from typing import Any, Dict
from contact.utilities.utils import ( from contact.utilities.utils import refresh_node_list
refresh_node_list,
add_new_message,
)
from contact.ui.contact_ui import ( from contact.ui.contact_ui import (
draw_packetlog_win, draw_packetlog_win,
draw_node_list, draw_node_list,
@@ -23,48 +18,7 @@ from contact.utilities.db_handler import (
update_node_info_in_db, update_node_info_in_db,
) )
import contact.ui.default_config as config import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state, app_state
def play_sound():
try:
system = platform.system()
sound_path = None
executable = None
if system == "Darwin": # macOS
sound_path = "/System/Library/Sounds/Ping.aiff"
executable = "afplay"
elif system == "Linux":
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay"
sound_path = ogg_path
elif shutil.which("ffplay") and os.path.exists(ogg_path):
executable = "ffplay"
sound_path = ogg_path
elif shutil.which("aplay") and os.path.exists(wav_path):
executable = "aplay"
sound_path = wav_path
else:
logging.warning("No suitable sound player or sound file found on Linux")
if executable and sound_path:
cmd = [executable, sound_path]
if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
def on_receive(packet: Dict[str, Any], interface: Any) -> None: def on_receive(packet: Dict[str, Any], interface: Any) -> None:
@@ -75,14 +29,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
packet: The received Meshtastic packet as a dictionary. packet: The received Meshtastic packet as a dictionary.
interface: The Meshtastic interface instance that received the packet. interface: The Meshtastic interface instance that received the packet.
""" """
with app_state.lock: with globals.lock:
# Update packet log # Update packet log
ui_state.packet_buffer.append(packet) globals.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20: if len(globals.packet_buffer) > 20:
# Trim buffer to 20 packets # Trim buffer to 20 packets
ui_state.packet_buffer = ui_state.packet_buffer[-20:] globals.packet_buffer = globals.packet_buffer[-20:]
if ui_state.display_log: if globals.display_log:
draw_packetlog_win() draw_packetlog_win()
try: try:
if "decoded" not in packet: if "decoded" not in packet:
@@ -98,10 +52,6 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
maybe_store_nodeinfo_in_db(packet) maybe_store_nodeinfo_in_db(packet)
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP": elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
if config.notification_sound == "True":
play_sound()
message_bytes = packet["decoded"]["payload"] message_bytes = packet["decoded"]["payload"]
message_string = message_bytes.decode("utf-8") message_string = message_bytes.decode("utf-8")
@@ -113,21 +63,19 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
else: else:
channel_number = 0 channel_number = 0
if packet["to"] == interface_state.myNodeNum: if packet["to"] == globals.myNodeNum:
if packet["from"] in ui_state.channel_list: if packet["from"] in globals.channel_list:
pass pass
else: else:
ui_state.channel_list.append(packet["from"]) globals.channel_list.append(packet["from"])
if packet["from"] not in ui_state.all_messages: if packet["from"] not in globals.all_messages:
ui_state.all_messages[packet["from"]] = [] globals.all_messages[packet["from"]] = []
update_node_info_in_db(packet["from"], chat_archived=False) update_node_info_in_db(packet["from"], chat_archived=False)
refresh_channels = True refresh_channels = True
channel_number = ui_state.channel_list.index(packet["from"]) channel_number = globals.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number] if globals.channel_list[channel_number] != globals.channel_list[globals.selected_channel]:
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number) add_notification(channel_number)
refresh_channels = True refresh_channels = True
else: else:
@@ -137,14 +85,40 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
message_from_id = packet["from"] message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":" message_from_string = get_name_from_database(message_from_id, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} {message_from_string} ", message_string) if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = globals.all_messages[globals.channel_list[channel_number]]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
globals.all_messages[globals.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
globals.all_messages[globals.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string} ", message_string)
)
if refresh_channels: if refresh_channels:
draw_channel_list() draw_channel_list()
if refresh_messages: if refresh_messages:
draw_messages_window(True) draw_messages_window(True)
save_message_to_db(channel_id, message_from_id, message_string) save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string)
except KeyError as e: except KeyError as e:
logging.error(f"Error processing packet: {e}") logging.error(f"Error processing packet: {e}")

View File

@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Any, Dict from typing import Any, Dict
import google.protobuf.json_format import google.protobuf.json_format
@@ -12,10 +13,7 @@ from contact.utilities.db_handler import (
update_node_info_in_db, update_node_info_in_db,
) )
import contact.ui.default_config as config import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state
from contact.utilities.utils import add_new_message
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp} ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
@@ -33,12 +31,12 @@ def onAckNak(packet: Dict[str, Any]) -> None:
return return
acknak = ack_naks.pop(request) acknak = ack_naks.pop(request)
message = ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]][1] message = globals.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
confirm_string = " " confirm_string = " "
ack_type = None ack_type = None
if packet["decoded"]["routing"]["errorReason"] == "NONE": if packet["decoded"]["routing"]["errorReason"] == "NONE":
if packet["from"] == interface_state.myNodeNum: # Ack "from" ourself means implicit ACK if packet["from"] == globals.myNodeNum: # Ack "from" ourself means implicit ACK
confirm_string = config.ack_implicit_str confirm_string = config.ack_implicit_str
ack_type = "Implicit" ack_type = "Implicit"
else: else:
@@ -48,15 +46,15 @@ def onAckNak(packet: Dict[str, Any]) -> None:
confirm_string = config.nak_str confirm_string = config.nak_str
ack_type = "Nak" ack_type = "Nak"
ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]] = ( globals.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
config.sent_message_prefix + confirm_string + ": ", config.sent_message_prefix + confirm_string + ": ",
message, message,
) )
update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type) update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type)
channel_number = ui_state.channel_list.index(acknak["channel"]) channel_number = globals.channel_list.index(acknak["channel"])
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]: if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
draw_messages_window() draw_messages_window()
@@ -139,17 +137,16 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
msg_str += route_str + "\n" # Print the route back to us msg_str += route_str + "\n" # Print the route back to us
if packet["from"] not in ui_state.channel_list: if packet["from"] not in globals.channel_list:
ui_state.channel_list.append(packet["from"]) globals.channel_list.append(packet["from"])
refresh_channels = True refresh_channels = True
if is_chat_archived(packet["from"]): if is_chat_archived(packet["from"]):
update_node_info_in_db(packet["from"], chat_archived=False) update_node_info_in_db(packet["from"], chat_archived=False)
channel_number = ui_state.channel_list.index(packet["from"]) channel_number = globals.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if channel_id == ui_state.channel_list[ui_state.selected_channel]: if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
refresh_messages = True refresh_messages = True
else: else:
add_notification(channel_number) add_notification(channel_number)
@@ -157,29 +154,33 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n" message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str) if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
globals.all_messages[globals.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string}", msg_str)
)
if refresh_channels: if refresh_channels:
draw_channel_list() draw_channel_list()
if refresh_messages: if refresh_messages:
draw_messages_window(True) draw_messages_window(True)
save_message_to_db(globals.channel_list[channel_number], packet["from"], msg_str)
save_message_to_db(channel_id, packet["from"], msg_str)
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None: def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
""" """
Sends a chat message using the selected channel. Sends a chat message using the selected channel.
""" """
myid = interface_state.myNodeNum myid = globals.myNodeNum
send_on_channel = 0 send_on_channel = 0
channel_id = ui_state.channel_list[channel] channel_id = globals.channel_list[channel]
if isinstance(channel_id, int): if isinstance(channel_id, int):
send_on_channel = 0 send_on_channel = 0
destination = channel_id destination = channel_id
elif isinstance(channel_id, str): elif isinstance(channel_id, str):
send_on_channel = channel send_on_channel = channel
sent_message_data = interface_state.interface.sendText( sent_message_data = globals.interface.sendText(
text=message, text=message,
destinationId=destination, destinationId=destination,
wantAck=True, wantAck=True,
@@ -188,13 +189,38 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
channelIndex=send_on_channel, channelIndex=send_on_channel,
) )
add_new_message(channel_id, config.sent_message_prefix + config.ack_unknown_str + ": ", message) # Add sent message to the messages dictionary
if channel_id not in globals.all_messages:
globals.all_messages[channel_id] = []
# Handle timestamp logic
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = globals.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
globals.all_messages[channel_id].append((f"-- {current_hour} --", ""))
globals.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
timestamp = save_message_to_db(channel_id, myid, message) timestamp = save_message_to_db(channel_id, myid, message)
ack_naks[sent_message_data.id] = { ack_naks[sent_message_data.id] = {
"channel": channel_id, "channel": channel_id,
"messageIndex": len(ui_state.all_messages[channel_id]) - 1, "messageIndex": len(globals.all_messages[channel_id]) - 1,
"timestamp": timestamp, "timestamp": timestamp,
} }
@@ -203,14 +229,10 @@ def send_traceroute() -> None:
""" """
Sends a RouteDiscovery protobuf to the selected node. Sends a RouteDiscovery protobuf to the selected node.
""" """
channel_id = ui_state.node_list[ui_state.selected_node]
add_new_message(channel_id, f"{config.message_prefix} Sent Traceroute", "")
r = mesh_pb2.RouteDiscovery() r = mesh_pb2.RouteDiscovery()
interface_state.interface.sendData( globals.interface.sendData(
r, r,
destinationId=channel_id, destinationId=globals.node_list[globals.selected_node],
portNum=portnums_pb2.PortNum.TRACEROUTE_APP, portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True, wantResponse=True,
onResponse=on_response_traceroute, onResponse=on_response_traceroute,

File diff suppressed because it is too large Load Diff

View File

@@ -150,15 +150,6 @@ def draw_help_window(
) )
def get_input_type_for_field(field) -> type:
if field.type in (field.TYPE_INT32, field.TYPE_UINT32, field.TYPE_INT64):
return int
elif field.type in (field.TYPE_FLOAT, field.TYPE_DOUBLE):
return float
else:
return str
def settings_menu(stdscr: object, interface: object) -> None: def settings_menu(stdscr: object, interface: object) -> None:
curses.update_lines_cols() curses.update_lines_cols()
@@ -277,8 +268,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
break break
elif selected_option == "Export Config File": elif selected_option == "Export Config File":
filename = get_text_input("Enter a filename for the config file")
filename = get_text_input("Enter a filename for the config file", None, None)
if not filename: if not filename:
logging.info("Export aborted: No filename provided.") logging.info("Export aborted: No filename provided.")
menu_state.start_index.pop() menu_state.start_index.pop()
@@ -300,7 +290,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
with open(yaml_file_path, "w", encoding="utf-8") as file: with open(yaml_file_path, "w", encoding="utf-8") as file:
file.write(config_text) file.write(config_text)
logging.info(f"Config file saved to {yaml_file_path}") logging.info(f"Config file saved to {yaml_file_path}")
dialog("Config File Saved:", yaml_file_path) dialog(stdscr, "Config File Saved:", yaml_file_path)
menu_state.start_index.pop() menu_state.start_index.pop()
continue continue
except PermissionError: except PermissionError:
@@ -316,14 +306,14 @@ def settings_menu(stdscr: object, interface: object) -> None:
# Check if folder exists and is not empty # Check if folder exists and is not empty
if not os.path.exists(config_folder) or not any(os.listdir(config_folder)): if not os.path.exists(config_folder) or not any(os.listdir(config_folder)):
dialog("", " No config files found. Export a config first.") dialog(stdscr, "", " No config files found. Export a config first.")
continue # Return to menu continue # Return to menu
file_list = [f for f in os.listdir(config_folder) if os.path.isfile(os.path.join(config_folder, f))] file_list = [f for f in os.listdir(config_folder) if os.path.isfile(os.path.join(config_folder, f))]
# Ensure file_list is not empty before proceeding # Ensure file_list is not empty before proceeding
if not file_list: if not file_list:
dialog("", " No config files found. Export a config first.") dialog(stdscr, "", " No config files found. Export a config first.")
continue continue
filename = get_list_input("Choose a config file", None, file_list) filename = get_list_input("Choose a config file", None, file_list)
@@ -337,7 +327,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
elif selected_option == "Config URL": elif selected_option == "Config URL":
current_value = interface.localNode.getURL() current_value = interface.localNode.getURL()
new_value = get_text_input(f"Config URL is currently: {current_value}", None, str) new_value = get_text_input(f"Config URL is currently: {current_value}")
if new_value is not None: if new_value is not None:
current_value = new_value current_value = new_value
overwrite = get_list_input(f"Are you sure you want to load this config?", None, ["Yes", "No"]) overwrite = get_list_input(f"Are you sure you want to load this config?", None, ["Yes", "No"])
@@ -405,9 +395,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
if selected_option in ["longName", "shortName", "isLicensed"]: if selected_option in ["longName", "shortName", "isLicensed"]:
if selected_option in ["longName", "shortName"]: if selected_option in ["longName", "shortName"]:
new_value = get_text_input( new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
f"{human_readable_name} is currently: {current_value}", selected_option, None
)
new_value = current_value if new_value is None else new_value new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value) menu_state.current_menu[selected_option] = (field, new_value)
@@ -426,9 +414,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop() menu_state.start_index.pop()
elif selected_option in ["latitude", "longitude", "altitude"]: elif selected_option in ["latitude", "longitude", "altitude"]:
new_value = get_text_input( new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
f"{human_readable_name} is currently: {current_value}", selected_option, float
)
new_value = current_value if new_value is None else new_value new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value) menu_state.current_menu[selected_option] = (field, new_value)
@@ -467,26 +453,17 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop() menu_state.start_index.pop()
elif field.type == 13: # Field type 13 corresponds to UINT32 elif field.type == 13: # Field type 13 corresponds to UINT32
input_type = get_input_type_for_field(field) new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = current_value if new_value is None else int(new_value) new_value = current_value if new_value is None else int(new_value)
menu_state.start_index.pop() menu_state.start_index.pop()
elif field.type == 2: # Field type 13 corresponds to INT64 elif field.type == 2: # Field type 13 corresponds to INT64
input_type = get_input_type_for_field(field) new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = current_value if new_value is None else float(new_value) new_value = current_value if new_value is None else float(new_value)
menu_state.start_index.pop() menu_state.start_index.pop()
else: # Handle other field types else: # Handle other field types
input_type = get_input_type_for_field(field) new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = current_value if new_value is None else new_value new_value = current_value if new_value is None else new_value
menu_state.start_index.pop() menu_state.start_index.pop()

View File

@@ -7,56 +7,10 @@ from typing import Dict
script_dir = os.path.dirname(os.path.abspath(__file__)) script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir)) parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
# To test writting to a non-writable directory, you can uncomment the following lines: # Paths
# mkdir /tmp/test_nonwritable json_file_path = os.path.join(parent_dir, "config.json")
# chmod -w /tmp/test_nonwritable log_file_path = os.path.join(parent_dir, "client.log")
# parent_dir = "/tmp/test_nonwritable" db_file_path = os.path.join(parent_dir, "client.db")
def _is_writable_dir(path: str) -> bool:
"""
Return True if we can create & delete a temp file in `path`.
"""
if not os.path.isdir(path):
return False
test_path = os.path.join(path, ".perm_test_tmp")
try:
with open(test_path, "w", encoding="utf-8") as _tmp:
_tmp.write("ok")
os.remove(test_path)
return True
except OSError:
return False
def _get_config_root(preferred_dir: str, fallback_name: str = ".contact_client") -> str:
"""
Choose a writable directory for config artifacts.
"""
if _is_writable_dir(preferred_dir):
return preferred_dir
home = os.path.expanduser("~")
fallback_dir = os.path.join(home, fallback_name)
# Ensure the fallback exists.
os.makedirs(fallback_dir, exist_ok=True)
# If *that* still isn't writable, last-ditch: use a system temp dir.
if not _is_writable_dir(fallback_dir):
import tempfile
fallback_dir = tempfile.mkdtemp(prefix="contact_client_")
return fallback_dir
# Pick the root now.
config_root = _get_config_root(parent_dir)
# Paths (derived from the chosen root)
json_file_path = os.path.join(config_root, "config.json")
log_file_path = os.path.join(config_root, "client.log")
db_file_path = os.path.join(config_root, "client.db")
def format_json_single_line_arrays(data: Dict[str, object], indent: int = 4) -> str: def format_json_single_line_arrays(data: Dict[str, object], indent: int = 4) -> str:
@@ -169,18 +123,15 @@ def initialize_config() -> Dict[str, object]:
"settings_breadcrumbs": ["green", "black"], "settings_breadcrumbs": ["green", "black"],
"settings_warning": ["green", "black"], "settings_warning": ["green", "black"],
"settings_note": ["green", "black"], "settings_note": ["green", "black"],
"node_favorite": ["cyan", "green"], "node_favorite": ["cyan", "white"],
"node_ignored": ["red", "black"], "node_ignored": ["red", "white"],
} }
default_config_variables = { default_config_variables = {
"channel_list_16ths": "3",
"node_list_16ths": "5",
"db_file_path": db_file_path, "db_file_path": db_file_path,
"log_file_path": log_file_path, "log_file_path": log_file_path,
"message_prefix": ">>", "message_prefix": ">>",
"sent_message_prefix": ">> Sent", "sent_message_prefix": ">> Sent",
"notification_symbol": "*", "notification_symbol": "*",
"notification_sound": "True",
"ack_implicit_str": "[◌]", "ack_implicit_str": "[◌]",
"ack_str": "[✓]", "ack_str": "[✓]",
"nak_str": "[x]", "nak_str": "[x]",
@@ -219,23 +170,18 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
global db_file_path, log_file_path, message_prefix, sent_message_prefix global db_file_path, log_file_path, message_prefix, sent_message_prefix
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
global node_list_16ths, channel_list_16ths
global theme, COLOR_CONFIG global theme, COLOR_CONFIG
global node_sort, notification_sound global node_sort
channel_list_16ths = loaded_config["channel_list_16ths"]
node_list_16ths = loaded_config["node_list_16ths"]
db_file_path = loaded_config["db_file_path"] db_file_path = loaded_config["db_file_path"]
log_file_path = loaded_config["log_file_path"] log_file_path = loaded_config["log_file_path"]
message_prefix = loaded_config["message_prefix"] message_prefix = loaded_config["message_prefix"]
sent_message_prefix = loaded_config["sent_message_prefix"] sent_message_prefix = loaded_config["sent_message_prefix"]
notification_symbol = loaded_config["notification_symbol"] notification_symbol = loaded_config["notification_symbol"]
notification_sound = loaded_config["notification_sound"]
ack_implicit_str = loaded_config["ack_implicit_str"] ack_implicit_str = loaded_config["ack_implicit_str"]
ack_str = loaded_config["ack_str"] ack_str = loaded_config["ack_str"]
nak_str = loaded_config["nak_str"] nak_str = loaded_config["nak_str"]
ack_unknown_str = loaded_config["ack_unknown_str"] ack_unknown_str = loaded_config["ack_unknown_str"]
node_sort = loaded_config["node_sort"]
theme = loaded_config["theme"] theme = loaded_config["theme"]
if theme == "dark": if theme == "dark":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"] COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"]
@@ -243,6 +189,7 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"] COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"]
elif theme == "green": elif theme == "green":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"] COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"]
node_sort = loaded_config["node_sort"]
# Call the function when the script is imported # Call the function when the script is imported

View File

@@ -2,13 +2,14 @@ import curses
from contact.ui.colors import get_color from contact.ui.colors import get_color
def dialog(title: str, message: str) -> None: def dialog(stdscr: curses.window, title: str, message: str) -> None:
height, width = stdscr.getmaxyx()
height, width = curses.LINES, curses.COLS
# Calculate dialog dimensions # Calculate dialog dimensions
max_line_lengh = 0
message_lines = message.splitlines() message_lines = message.splitlines()
max_line_length = max(len(l) for l in message_lines) for l in message_lines:
max_line_length = max(len(l), max_line_lengh)
dialog_width = max(len(title) + 4, max_line_length + 4) dialog_width = max(len(title) + 4, max_line_length + 4)
dialog_height = len(message_lines) + 4 dialog_height = len(message_lines) + 4
x = (width - dialog_width) // 2 x = (width - dialog_width) // 2
@@ -23,19 +24,12 @@ def dialog(title: str, message: str) -> None:
# Add title # Add title
win.addstr(0, 2, title, get_color("settings_default")) win.addstr(0, 2, title, get_color("settings_default"))
# Add message (centered) # Add message
for i, line in enumerate(message_lines): for i, l in enumerate(message_lines):
msg_x = (dialog_width - len(line)) // 2 win.addstr(2 + i, 2, l, get_color("settings_default"))
win.addstr(2 + i, msg_x, line, get_color("settings_default"))
# Add centered OK button # Add button
ok_text = " Ok " win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", get_color("settings_default", reverse=True))
win.addstr(
dialog_height - 2,
(dialog_width - len(ok_text)) // 2,
ok_text,
get_color("settings_default", reverse=True),
)
# Refresh dialog window # Refresh dialog window
win.refresh() win.refresh()
@@ -43,7 +37,8 @@ def dialog(title: str, message: str) -> None:
# Get user input # Get user input
while True: while True:
char = win.getch() char = win.getch()
if char in (curses.KEY_ENTER, 10, 13, 32, 27): # Enter, space, or Esc # Close dialog with enter, space, or esc
if char in (curses.KEY_ENTER, 10, 13, 32, 27):
win.erase() win.erase()
win.refresh() win.refresh()
return return

View File

@@ -1,22 +1,8 @@
import curses import curses
import re import re
from unicodedata import east_asian_width
from contact.ui.colors import get_color from contact.ui.colors import get_color
from contact.utilities.control_utils import transform_menu_path from contact.utilities.control_utils import transform_menu_path
from typing import Any, Optional, List, Dict from typing import Any, Optional, List, Dict
from contact.utilities.singleton import interface_state, ui_state
def get_node_color(node_index: int, reverse: bool = False):
node_num = ui_state.node_list[node_index]
node = interface_state.interface.nodesByNum.get(node_num, {})
if node.get("isFavorite"):
return get_color("node_favorite", reverse=reverse)
elif node.get("isIgnored"):
return get_color("node_ignored", reverse=reverse)
return get_color("settings_default", reverse=reverse)
# Aliases # Aliases
Segment = tuple[str, str, bool, bool] Segment = tuple[str, str, bool, bool]
@@ -142,6 +128,7 @@ def draw_arrows(
win: object, visible_height: int, max_index: int, start_index: List[int], show_save_option: bool win: object, visible_height: int, max_index: int, start_index: List[int], show_save_option: bool
) -> None: ) -> None:
# vh = visible_height + (1 if show_save_option else 0)
mi = max_index - (2 if show_save_option else 0) mi = max_index - (2 if show_save_option else 0)
if visible_height < mi: if visible_height < mi:
@@ -295,16 +282,9 @@ def get_wrapped_help_text(
return wrapped_help return wrapped_help
def text_width(text: str) -> int:
return sum(2 if east_asian_width(c) in "FW" else 1 for c in text)
def wrap_text(text: str, wrap_width: int) -> List[str]: def wrap_text(text: str, wrap_width: int) -> List[str]:
"""Wraps text while preserving spaces and breaking long words.""" """Wraps text while preserving spaces and breaking long words."""
whitespace = '\t\n\x0b\x0c\r '
whitespace_trans = dict.fromkeys(map(ord, whitespace), ord(' '))
text = text.translate(whitespace_trans)
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
wrapped_lines = [] wrapped_lines = []
line_buffer = "" line_buffer = ""
@@ -313,11 +293,11 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
wrap_width -= margin wrap_width -= margin
for word in words: for word in words:
word_length = text_width(word) word_length = len(word)
if word_length > wrap_width: # Break long words if word_length > wrap_width: # Break long words
if line_buffer: if line_buffer:
wrapped_lines.append(line_buffer.strip()) wrapped_lines.append(line_buffer)
line_buffer = "" line_buffer = ""
line_length = 0 line_length = 0
for i in range(0, word_length, wrap_width): for i in range(0, word_length, wrap_width):
@@ -325,7 +305,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
continue continue
if line_length + word_length > wrap_width and word.strip(): if line_length + word_length > wrap_width and word.strip():
wrapped_lines.append(line_buffer.strip()) wrapped_lines.append(line_buffer)
line_buffer = "" line_buffer = ""
line_length = 0 line_length = 0
@@ -333,94 +313,6 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
line_length += word_length line_length += word_length
if line_buffer: if line_buffer:
wrapped_lines.append(line_buffer.strip()) wrapped_lines.append(line_buffer)
return wrapped_lines return wrapped_lines
def move_main_highlight(
old_idx: int, new_idx, options: List[str], menu_win: curses.window, menu_pad: curses.window, ui_state: object
) -> None:
if old_idx == new_idx: # No-op
return
max_index = len(options) - 1
visible_height = menu_win.getmaxyx()[0] - 2
if new_idx < ui_state.start_index[ui_state.current_window]: # Moving above the visible area
ui_state.start_index[ui_state.current_window] = new_idx
elif new_idx >= ui_state.start_index[ui_state.current_window] + visible_height: # Moving below the visible area
ui_state.start_index[ui_state.current_window] = new_idx - visible_height + 1
# Ensure start_index is within bounds
ui_state.start_index[ui_state.current_window] = max(
0, min(ui_state.start_index[ui_state.current_window], max_index - visible_height + 1)
)
highlight_line(menu_win, menu_pad, old_idx, new_idx, visible_height)
if ui_state.current_window == 0: # hack to fix max_index
max_index += 1
draw_main_arrows(menu_win, max_index, window=ui_state.current_window)
menu_win.refresh()
def highlight_line(
menu_win: curses.window, menu_pad: curses.window, old_idx: int, new_idx: int, visible_height: int
) -> None:
if ui_state.current_window == 0:
color_old = (
get_color("channel_selected") if old_idx == ui_state.selected_channel else get_color("channel_list")
)
color_new = get_color("channel_list", reverse=True) if True else get_color("channel_list", reverse=True)
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, color_old)
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, color_new)
elif ui_state.current_window == 2:
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(old_idx))
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(new_idx, reverse=True))
menu_win.refresh()
# Refresh pad only if scrolling is needed
menu_pad.refresh(
ui_state.start_index[ui_state.current_window],
0,
menu_win.getbegyx()[0] + 1,
menu_win.getbegyx()[1] + 1,
menu_win.getbegyx()[0] + visible_height,
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 3,
)
def draw_main_arrows(win: object, max_index: int, window: int, **kwargs) -> None:
height, width = win.getmaxyx()
usable_height = height - 2
usable_width = width - 2
if window == 1 and ui_state.display_log:
if log_height := kwargs.get("log_height"):
usable_height -= log_height - 1
if usable_height < max_index:
if ui_state.start_index[window] > 0:
win.addstr(1, usable_width, "", get_color("settings_default"))
else:
win.addstr(1, usable_width, " ", get_color("settings_default"))
if max_index - ui_state.start_index[window] - 1 >= usable_height:
win.addstr(usable_height, usable_width, "", get_color("settings_default"))
else:
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
else:
win.addstr(1, usable_width, " ", get_color("settings_default"))
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
def get_msg_window_lines(messages_win, packetlog_win) -> None:
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if ui_state.display_log else 0
return messages_win.getmaxyx()[0] - 2 - packetlog_height

View File

@@ -1,42 +1,11 @@
from typing import Any, Union, List, Dict from typing import Any, Union, List, Dict
from dataclasses import dataclass, field
@dataclass
class MenuState: class MenuState:
menu_index: List[int] = field(default_factory=list) def __init__(self):
start_index: List[int] = field(default_factory=lambda: [0]) self.menu_index: List[int] = [] # Row we left the previous menus
selected_index: int = 0 self.start_index: List[int] = [0] # Row to start the menu if it doesn't all fit
current_menu: Union[Dict[str, Any], List[Any], str, int] = field(default_factory=dict) self.selected_index: int = 0 # Selected Row
menu_path: List[str] = field(default_factory=list) self.current_menu: Union[Dict[str, Any], List[Any], str, int] = {} # Contents of the current menu
show_save_option: bool = False self.menu_path: List[str] = [] # Menu Path
self.show_save_option: bool = False # Display 'Save'
@dataclass
class ChatUIState:
display_log: bool = False
channel_list: List[str] = field(default_factory=list)
all_messages: Dict[str, List[str]] = field(default_factory=dict)
notifications: List[str] = field(default_factory=list)
packet_buffer: List[str] = field(default_factory=list)
node_list: List[str] = field(default_factory=list)
selected_channel: int = 0
selected_message: int = 0
selected_node: int = 0
current_window: int = 0
selected_index: int = 0
start_index: List[int] = field(default_factory=lambda: [0, 0, 0])
show_save_option: bool = False
menu_path: List[str] = field(default_factory=list)
@dataclass
class InterfaceState:
interface: Any = None
myNodeNum: int = 0
@dataclass
class AppState:
lock: Any = None

View File

@@ -55,15 +55,10 @@ def edit_value(key: str, current_value: str) -> str:
# Load theme names dynamically from the JSON # Load theme names dynamically from the JSON
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")] theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
return get_list_input("Select Theme", current_value, theme_options) return get_list_input("Select Theme", current_value, theme_options)
elif key == "node_sort": elif key == "node_sort":
sort_options = ["lastHeard", "name", "hops"] sort_options = ["lastHeard", "name", "hops"]
return get_list_input("Sort By", current_value, sort_options) return get_list_input("Sort By", current_value, sort_options)
elif key == "notification_sound":
sound_options = ["True", "False"]
return get_list_input("Notification Sound", current_value, sound_options)
# Standard Input Mode (Scrollable) # Standard Input Mode (Scrollable)
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default")) edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
curses.curs_set(1) curses.curs_set(1)

View File

@@ -1,6 +1,5 @@
import yaml import yaml
import logging import logging
import time
from typing import List from typing import List
from google.protobuf.json_format import MessageToDict from google.protobuf.json_format import MessageToDict
from meshtastic import mt_config from meshtastic import mt_config
@@ -134,29 +133,24 @@ def config_import(interface, filename):
logging.info(f"Setting device owner to {configuration['owner']}") logging.info(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True waitForAckNak = True
interface.getNode("^local", False).setOwner(configuration["owner"]) interface.getNode("^local", False).setOwner(configuration["owner"])
time.sleep(0.5)
if "owner_short" in configuration: if "owner_short" in configuration:
logging.info(f"Setting device owner short to {configuration['owner_short']}") logging.info(f"Setting device owner short to {configuration['owner_short']}")
waitForAckNak = True waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"]) interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"])
time.sleep(0.5)
if "ownerShort" in configuration: if "ownerShort" in configuration:
logging.info(f"Setting device owner short to {configuration['ownerShort']}") logging.info(f"Setting device owner short to {configuration['ownerShort']}")
waitForAckNak = True waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"]) interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"])
time.sleep(0.5)
if "channel_url" in configuration: if "channel_url" in configuration:
logging.info(f"Setting channel url to {configuration['channel_url']}") logging.info(f"Setting channel url to {configuration['channel_url']}")
interface.getNode("^local").setURL(configuration["channel_url"]) interface.getNode("^local").setURL(configuration["channel_url"])
time.sleep(0.5)
if "channelUrl" in configuration: if "channelUrl" in configuration:
logging.info(f"Setting channel url to {configuration['channelUrl']}") logging.info(f"Setting channel url to {configuration['channelUrl']}")
interface.getNode("^local").setURL(configuration["channelUrl"]) interface.getNode("^local").setURL(configuration["channelUrl"])
time.sleep(0.5)
if "location" in configuration: if "location" in configuration:
alt = 0 alt = 0
@@ -175,14 +169,12 @@ def config_import(interface, filename):
logging.info(f"Fixing longitude at {lon} degrees") logging.info(f"Fixing longitude at {lon} degrees")
logging.info("Setting device position") logging.info("Setting device position")
interface.localNode.setFixedPosition(lat, lon, alt) interface.localNode.setFixedPosition(lat, lon, alt)
time.sleep(0.5)
if "config" in configuration: if "config" in configuration:
localConfig = interface.getNode("^local").localConfig localConfig = interface.getNode("^local").localConfig
for section in configuration["config"]: for section in configuration["config"]:
traverseConfig(section, configuration["config"][section], localConfig) traverseConfig(section, configuration["config"][section], localConfig)
interface.getNode("^local").writeConfig(camel_to_snake(section)) interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
if "module_config" in configuration: if "module_config" in configuration:
moduleConfig = interface.getNode("^local").moduleConfig moduleConfig = interface.getNode("^local").moduleConfig
@@ -193,7 +185,6 @@ def config_import(interface, filename):
moduleConfig, moduleConfig,
) )
interface.getNode("^local").writeConfig(camel_to_snake(section)) interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
interface.getNode("^local", False).commitSettingsTransaction() interface.getNode("^local", False).commitSettingsTransaction()
logging.info("Writing modified configuration to device") logging.info("Writing modified configuration to device")

View File

@@ -6,14 +6,12 @@ from typing import Optional, Union, Dict
from contact.utilities.utils import decimal_to_hex from contact.utilities.utils import decimal_to_hex
import contact.ui.default_config as config import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state
def get_table_name(channel: str) -> str: def get_table_name(channel: str) -> str:
# Construct the table name # Construct the table name
table_name = f"{str(interface_state.myNodeNum)}_{channel}_messages" table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
return quoted_table_name return quoted_table_name
@@ -63,7 +61,7 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
message_text = ? message_text = ?
""" """
db_cursor.execute(update_query, (ack, str(interface_state.myNodeNum), timestamp, message)) db_cursor.execute(update_query, (ack, str(globals.myNodeNum), timestamp, message))
db_connection.commit() db_connection.commit()
except sqlite3.Error as e: except sqlite3.Error as e:
@@ -74,13 +72,13 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
def load_messages_from_db() -> None: def load_messages_from_db() -> None:
"""Load messages from the database for all channels and update ui_state.all_messages and ui_state.channel_list.""" """Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
try: try:
with sqlite3.connect(config.db_file_path) as db_connection: with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor() db_cursor = db_connection.cursor()
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?" query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
db_cursor.execute(query, (f"{str(interface_state.myNodeNum)}_%_messages",)) db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",))
tables = [row[0] for row in db_cursor.fetchall()] tables = [row[0] for row in db_cursor.fetchall()]
# Iterate through each table and fetch its messages # Iterate through each table and fetch its messages
@@ -106,24 +104,17 @@ def load_messages_from_db() -> None:
# Convert the channel to an integer if it's numeric, otherwise keep it as a string (nodenum vs channel name) # Convert the channel to an integer if it's numeric, otherwise keep it as a string (nodenum vs channel name)
channel = int(channel) if channel.isdigit() else channel channel = int(channel) if channel.isdigit() else channel
# Add the channel to ui_state.channel_list if not already present # Add the channel to globals.channel_list if not already present
if channel not in ui_state.channel_list and not is_chat_archived(channel): if channel not in globals.channel_list and not is_chat_archived(channel):
ui_state.channel_list.append(channel) globals.channel_list.append(channel)
# Ensure the channel exists in ui_state.all_messages # Ensure the channel exists in globals.all_messages
if channel not in ui_state.all_messages: if channel not in globals.all_messages:
ui_state.all_messages[channel] = [] globals.all_messages[channel] = []
# Add messages to ui_state.all_messages grouped by hourly timestamp # Add messages to globals.all_messages grouped by hourly timestamp
hourly_messages = {} hourly_messages = {}
for row in db_messages: for user_id, message, timestamp, ack_type in db_messages:
user_id, message, timestamp, ack_type = row
# Only ack_type is allowed to be None
if user_id is None or message is None or timestamp is None:
logging.warning(f"Skipping row with NULL required field(s): {row}")
continue
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00") hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
if hour not in hourly_messages: if hour not in hourly_messages:
hourly_messages[hour] = [] hourly_messages[hour] = []
@@ -136,22 +127,20 @@ def load_messages_from_db() -> None:
elif ack_type == "Nak": elif ack_type == "Nak":
ack_str = config.nak_str ack_str = config.nak_str
if user_id == str(interface_state.myNodeNum): if user_id == str(globals.myNodeNum):
sanitized_message = message.replace("\x00", "") formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", sanitized_message)
else: else:
sanitized_message = message.replace("\x00", "")
formatted_message = ( formatted_message = (
f"{config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ", f"{config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ",
sanitized_message, message,
) )
hourly_messages[hour].append(formatted_message) hourly_messages[hour].append(formatted_message)
# Flatten the hourly messages into ui_state.all_messages[channel] # Flatten the hourly messages into globals.all_messages[channel]
for hour, messages in sorted(hourly_messages.items()): for hour, messages in sorted(hourly_messages.items()):
ui_state.all_messages[channel].append((f"-- {hour} --", "")) globals.all_messages[channel].append((f"-- {hour} --", ""))
ui_state.all_messages[channel].extend(messages) globals.all_messages[channel].extend(messages)
except sqlite3.Error as e: except sqlite3.Error as e:
logging.error(f"SQLite error while loading messages from table '{table_name}': {e}") logging.error(f"SQLite error while loading messages from table '{table_name}': {e}")
@@ -164,11 +153,11 @@ def init_nodedb() -> None:
"""Initialize the node database and update it with nodes from the interface.""" """Initialize the node database and update it with nodes from the interface."""
try: try:
if not interface_state.interface.nodes: if not globals.interface.nodes:
return # No nodes to initialize return # No nodes to initialize
ensure_node_table_exists() # Ensure the table exists before insertion ensure_node_table_exists() # Ensure the table exists before insertion
nodes_snapshot = list(interface_state.interface.nodes.values()) nodes_snapshot = list(globals.interface.nodes.values())
# Insert or update all nodes # Insert or update all nodes
for node in nodes_snapshot: for node in nodes_snapshot:
@@ -225,7 +214,7 @@ def update_node_info_in_db(
with sqlite3.connect(config.db_file_path) as db_connection: with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor() db_cursor = db_connection.cursor()
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote in case of numeric names table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
table_columns = [i[1] for i in db_cursor.execute(f"PRAGMA table_info({table_name})")] table_columns = [i[1] for i in db_cursor.execute(f"PRAGMA table_info({table_name})")]
if "chat_archived" not in table_columns: if "chat_archived" not in table_columns:
@@ -289,7 +278,7 @@ def update_node_info_in_db(
def ensure_node_table_exists() -> None: def ensure_node_table_exists() -> None:
"""Ensure the node database table exists.""" """Ensure the node database table exists."""
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote for safety table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety
schema = """ schema = """
user_id TEXT PRIMARY KEY, user_id TEXT PRIMARY KEY,
long_name TEXT, long_name TEXT,
@@ -330,7 +319,7 @@ def get_name_from_database(user_id: int, type: str = "long") -> str:
db_cursor = db_connection.cursor() db_cursor = db_connection.cursor()
# Construct table name # Construct table name
table_name = f"{str(interface_state.myNodeNum)}_nodedb" table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote table name for safety nodeinfo_table = f'"{table_name}"' # Quote table name for safety
# Determine the correct column to fetch # Determine the correct column to fetch
@@ -356,7 +345,7 @@ def is_chat_archived(user_id: int) -> int:
try: try:
with sqlite3.connect(config.db_file_path) as db_connection: with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor() db_cursor = db_connection.cursor()
table_name = f"{str(interface_state.myNodeNum)}_nodedb" table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' nodeinfo_table = f'"{table_name}"'
query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?" query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,)) db_cursor.execute(query, (user_id,))

View File

@@ -6,43 +6,10 @@ from typing import Any, Optional, List
from contact.ui.colors import get_color from contact.ui.colors import get_color
from contact.ui.nav_utils import move_highlight, draw_arrows, wrap_text from contact.ui.nav_utils import move_highlight, draw_arrows, wrap_text
from contact.ui.dialog import dialog
from contact.utilities.validation_rules import get_validation_for
def invalid_input(window: curses.window, message: str, redraw_func: Optional[callable] = None) -> None: def get_text_input(prompt: str) -> Optional[str]:
"""Displays an invalid input message in the given window and redraws if needed."""
cursor_y, cursor_x = window.getyx()
curses.curs_set(0)
dialog("Invalid Input", message)
if redraw_func:
redraw_func() # Redraw the original window content that got obscured
else:
window.refresh()
window.move(cursor_y, cursor_x)
curses.curs_set(1)
def get_text_input(prompt: str, selected_config: str, input_type: str) -> Optional[str]:
"""Handles user input with wrapped text for long prompts.""" """Handles user input with wrapped text for long prompts."""
def redraw_input_win():
"""Redraw the input window with the current prompt and user input."""
input_win.erase()
input_win.border()
row = 1
for line in wrapped_prompt:
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
row += 1
if row >= height - 3:
break
input_win.addstr(row + 1, margin, prompt_text, get_color("settings_default"))
input_win.addstr(row + 1, col_start, user_input[:first_line_width], get_color("settings_default"))
for i, line in enumerate(wrap_text(user_input[first_line_width:], wrap_width=input_width)):
if row + 2 + i < height - 1:
input_win.addstr(row + 2 + i, margin, line[:input_width], get_color("settings_default"))
input_win.refresh()
height = 8 height = 8
width = 80 width = 80
margin = 2 # Left and right margin margin = 2 # Left and right margin
@@ -60,7 +27,6 @@ def get_text_input(prompt: str, selected_config: str, input_type: str) -> Option
# Wrap the prompt text # Wrap the prompt text
wrapped_prompt = wrap_text(prompt, wrap_width=input_width) wrapped_prompt = wrap_text(prompt, wrap_width=input_width)
row = 1 row = 1
for line in wrapped_prompt: for line in wrapped_prompt:
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True)) input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
row += 1 row += 1
@@ -73,115 +39,34 @@ def get_text_input(prompt: str, selected_config: str, input_type: str) -> Option
input_win.refresh() input_win.refresh()
curses.curs_set(1) curses.curs_set(1)
min_value = 0 max_length = 4 if "shortName" in prompt else None
max_value = 4294967295
min_length = 0
max_length = None
if selected_config is not None:
validation = get_validation_for(selected_config) or {}
min_value = validation.get("min_value", 0)
max_value = validation.get("max_value", 4294967295)
min_length = validation.get("min_length", 0)
max_length = validation.get("max_length")
user_input = "" user_input = ""
# Start user input after the prompt text
col_start = margin + len(prompt_text) col_start = margin + len(prompt_text)
first_line_width = input_width - len(prompt_text) first_line_width = input_width - len(prompt_text) # Available space for first line
while True: while True:
key = input_win.get_wch() key = input_win.get_wch() # Waits for user input
if key == chr(27) or key == curses.KEY_LEFT: if key == chr(27) or key == curses.KEY_LEFT: # ESC or Left Arrow
input_win.erase() input_win.erase()
input_win.refresh() input_win.refresh()
curses.curs_set(0) curses.curs_set(0)
return None return None # Exit without saving
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)): elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)): # Enter key
if not user_input.strip(): break
invalid_input(input_win, "Value cannot be empty.", redraw_func=redraw_input_win)
continue
length = len(user_input)
if min_length == max_length and max_length is not None:
if length != min_length:
invalid_input(
input_win, f"Value must be exactly {min_length} characters long.", redraw_func=redraw_input_win
)
continue
else:
if length < min_length:
invalid_input(
input_win,
f"Value must be at least {min_length} characters long.",
redraw_func=redraw_input_win,
)
continue
if max_length is not None and length > max_length:
invalid_input(
input_win,
f"Value must be no more than {max_length} characters long.",
redraw_func=redraw_input_win,
)
continue
if input_type is int:
if not user_input.isdigit():
invalid_input(input_win, "Only numeric digits (09) allowed.", redraw_func=redraw_input_win)
continue
int_val = int(user_input)
if not (min_value <= int_val <= max_value):
invalid_input(
input_win, f"Enter a number between {min_value} and {max_value}.", redraw_func=redraw_input_win
)
continue
curses.curs_set(0)
return int_val
elif input_type is float:
try:
float_val = float(user_input)
if not (min_value <= float_val <= max_value):
invalid_input(
input_win,
f"Enter a number between {min_value} and {max_value}.",
redraw_func=redraw_input_win,
)
continue
except ValueError:
invalid_input(input_win, "Must be a valid floating point number.", redraw_func=redraw_input_win)
continue
else:
curses.curs_set(0)
return float_val
else:
break
elif key in (curses.KEY_BACKSPACE, chr(127)): # Handle Backspace elif key in (curses.KEY_BACKSPACE, chr(127)): # Handle Backspace
if user_input: if user_input:
user_input = user_input[:-1] # Remove last character user_input = user_input[:-1] # Remove last character
elif max_length is None or len(user_input) < max_length: elif max_length is None or len(user_input) < max_length: # Enforce max length
try: if isinstance(key, str):
char = chr(key) if not isinstance(key, str) else key user_input += key
if input_type is int: else:
if char.isdigit() or (char == "-" and len(user_input) == 0): user_input += chr(key)
user_input += char
elif input_type is float:
if (
char.isdigit()
or (char == "." and "." not in user_input)
or (char == "-" and len(user_input) == 0)
):
user_input += char
else:
user_input += char
except ValueError:
pass # Ignore invalid input
# First line must be manually handled before using wrap_text() # First line must be manually handled before using wrap_text()
first_line = user_input[:first_line_width] # Cut to max first line width first_line = user_input[:first_line_width] # Cut to max first line width
@@ -210,24 +95,20 @@ def get_text_input(prompt: str, selected_config: str, input_type: str) -> Option
curses.curs_set(0) curses.curs_set(0)
input_win.erase() input_win.erase()
input_win.refresh() input_win.refresh()
return user_input.strip() return user_input
def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]: def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
"""Handles user input for editing up to 3 Admin Keys in Base64 format."""
def to_base64(byte_strings): def to_base64(byte_strings):
"""Convert byte values to Base64-encoded strings.""" """Convert byte values to Base64-encoded strings."""
return [base64.b64encode(b).decode() for b in byte_strings] return [base64.b64encode(b).decode() for b in byte_strings]
def is_valid_base64(s): def is_valid_base64(s):
"""Check if a string is valid Base64 or blank.""" """Check if a string is valid Base64."""
if s == "":
return True
try: try:
decoded = base64.b64decode(s, validate=True) decoded = base64.b64decode(s, validate=True)
return len(decoded) == 32 # Ensure it's exactly 32 bytes return len(decoded) == 32 # Ensure it's exactly 32 bytes
except (binascii.Error, ValueError): except binascii.Error:
return False return False
cvalue = to_base64(current_value) # Convert current values to Base64 cvalue = to_base64(current_value) # Convert current values to Base64
@@ -247,7 +128,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
# Editable list of values (max 3 values) # Editable list of values (max 3 values)
user_values = cvalue[:3] + [""] * (3 - len(cvalue)) # Ensure always 3 fields user_values = cvalue[:3] + [""] * (3 - len(cvalue)) # Ensure always 3 fields
cursor_pos = 0 # Track which value is being edited cursor_pos = 0 # Track which value is being edited
invalid_input = "" error_message = ""
while True: while True:
repeated_win.erase() repeated_win.erase()
@@ -267,8 +148,8 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
# Show error message if needed # Show error message if needed
if invalid_input: if error_message:
repeated_win.addstr(7, 2, invalid_input, get_color("settings_default", bold=True)) repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
repeated_win.refresh() repeated_win.refresh()
key = repeated_win.getch() key = repeated_win.getch()
@@ -286,7 +167,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
curses.curs_set(0) curses.curs_set(0)
return user_values # Return the edited Base64 values return user_values # Return the edited Base64 values
else: else:
invalid_input = "Error: Each key must be valid Base64 and 32 bytes long!" error_message = "Error: Each key must be valid Base64 and 32 bytes long!"
elif key == curses.KEY_UP: # Move cursor up elif key == curses.KEY_UP: # Move cursor up
cursor_pos = (cursor_pos - 1) % len(user_values) cursor_pos = (cursor_pos - 1) % len(user_values)
elif key == curses.KEY_DOWN: # Move cursor down elif key == curses.KEY_DOWN: # Move cursor down
@@ -297,7 +178,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
else: else:
try: try:
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
invalid_input = "" # Clear error if user starts fixing input error_message = "" # Clear error if user starts fixing input
except ValueError: except ValueError:
pass # Ignore invalid character inputs pass # Ignore invalid character inputs
@@ -319,7 +200,7 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
# Editable list of values (max 3 values) # Editable list of values (max 3 values)
user_values = current_value[:3] user_values = current_value[:3]
cursor_pos = 0 # Track which value is being edited cursor_pos = 0 # Track which value is being edited
invalid_input = "" error_message = ""
while True: while True:
repeated_win.erase() repeated_win.erase()
@@ -339,8 +220,8 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
# Show error message if needed # Show error message if needed
if invalid_input: if error_message:
repeated_win.addstr(7, 2, invalid_input, get_color("settings_default", bold=True)) repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
repeated_win.refresh() repeated_win.refresh()
key = repeated_win.getch() key = repeated_win.getch()
@@ -366,7 +247,7 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
else: else:
try: try:
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
invalid_input = "" # Clear error if user starts fixing input error_message = "" # Clear error if user starts fixing input
except ValueError: except ValueError:
pass # Ignore invalid character inputs pass # Ignore invalid character inputs

View File

@@ -1,5 +0,0 @@
from contact.ui.ui_state import ChatUIState, InterfaceState, AppState
ui_state = ChatUIState()
interface_state = InterfaceState()
app_state = AppState()

View File

@@ -1,18 +1,16 @@
import contact.globals as globals
import datetime import datetime
import time
from meshtastic.protobuf import config_pb2 from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
def get_channels(): def get_channels():
"""Retrieve channels from the node and update ui_state.channel_list and ui_state.all_messages.""" """Retrieve channels from the node and update globals.channel_list and globals.all_messages."""
node = interface_state.interface.getNode("^local") node = globals.interface.getNode("^local")
device_channels = node.channels device_channels = node.channels
# Clear and rebuild channel list # Clear and rebuild channel list
# ui_state.channel_list = [] # globals.channel_list = []
for device_channel in device_channels: for device_channel in device_channels:
if device_channel.role: if device_channel.role:
@@ -28,20 +26,20 @@ def get_channels():
].name ].name
channel_name = convert_to_camel_case(modem_preset_string) channel_name = convert_to_camel_case(modem_preset_string)
# Add channel to ui_state.channel_list if not already present # Add channel to globals.channel_list if not already present
if channel_name not in ui_state.channel_list: if channel_name not in globals.channel_list:
ui_state.channel_list.append(channel_name) globals.channel_list.append(channel_name)
# Initialize ui_state.all_messages[channel_name] if it doesn't exist # Initialize globals.all_messages[channel_name] if it doesn't exist
if channel_name not in ui_state.all_messages: if channel_name not in globals.all_messages:
ui_state.all_messages[channel_name] = [] globals.all_messages[channel_name] = []
return ui_state.channel_list return globals.channel_list
def get_node_list(): def get_node_list():
if interface_state.interface.nodes: if globals.interface.nodes:
my_node_num = interface_state.myNodeNum my_node_num = globals.myNodeNum
def node_sort(node): def node_sort(node):
if config.node_sort == "lastHeard": if config.node_sort == "lastHeard":
@@ -53,7 +51,7 @@ def get_node_list():
else: else:
return node return node
sorted_nodes = sorted(interface_state.interface.nodes.values(), key=node_sort) sorted_nodes = sorted(globals.interface.nodes.values(), key=node_sort)
# Move favorite nodes to the beginning # Move favorite nodes to the beginning
sorted_nodes = sorted( sorted_nodes = sorted(
@@ -70,14 +68,14 @@ def get_node_list():
def refresh_node_list(): def refresh_node_list():
new_node_list = get_node_list() new_node_list = get_node_list()
if new_node_list != ui_state.node_list: if new_node_list != globals.node_list:
ui_state.node_list = new_node_list globals.node_list = new_node_list
return True return True
return False return False
def get_nodeNum(): def get_nodeNum():
myinfo = interface_state.interface.getMyNodeInfo() myinfo = globals.interface.getMyNodeInfo()
myNodeNum = myinfo["num"] myNodeNum = myinfo["num"]
return myNodeNum return myNodeNum
@@ -135,31 +133,3 @@ def get_time_ago(timestamp):
if unit != "s": if unit != "s":
return f"{value} {unit} ago" return f"{value} {unit} ago"
return "now" return "now"
def add_new_message(channel_id, prefix, message):
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
# Add the message
ui_state.all_messages[channel_id].append((prefix,message))

View File

@@ -1,23 +0,0 @@
validation_rules = {
"shortName": {"max_length": 4},
"longName": {"max_length": 32},
"fixed_pin": {"min_length": 6, "max_length": 6},
"position_flags": {"max_length": 3},
"enabled_protocols": {"max_value": 2},
"hop_limit": {"max_value": 7},
"latitude": {"min_value": -90, "max_value": 90},
"longitude": {"min_value": -180, "max_value": 180},
"altitude": {"min_value": -4294967295, "max_value": 4294967295},
"red": {"max_value": 255},
"green": {"max_value": 255},
"blue": {"max_value": 255},
"current": {"max_value": 255},
"position_precision": {"max_value": 32},
}
def get_validation_for(key: str) -> dict:
for rule_key, config in validation_rules.items():
if rule_key in key:
return config
return {}

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "contact" name = "contact"
version = "1.3.16" version = "1.3.8"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores." description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [ authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"} {name = "Ben Lipsey",email = "ben@pdxlocations.com"}