Compare commits

..

22 Commits

Author SHA1 Message Date
pdxlocations
1362d3a219 bump version 2025-06-10 10:02:04 -07:00
pdxlocations
981d72e688 fix backspace 2025-06-10 10:01:44 -07:00
pdxlocations
0b5ec0b3d7 Merge pull request #191 from pdxlocations:refactor-ui-functions
Refactor keypress handling
2025-06-09 23:20:42 -07:00
pdxlocations
cbb4ef9e34 break out key functions 2025-06-09 23:19:28 -07:00
pdxlocations
fecd71f4b7 refactor window sizes 2025-06-09 22:37:51 -07:00
pdxlocations
59edfab451 add notif sound prefs (#190) 2025-06-09 22:15:53 -07:00
pdxlocations
39159099e1 change prints to logging 2025-06-09 19:01:40 -07:00
pdxlocations
02e5368c61 waits in configio 2025-06-09 07:40:07 -07:00
pdxlocations
9d234a75d8 change default configs order 2025-06-06 22:45:06 -07:00
pdxlocations
c7edd602ec Make widths configurable (#189) 2025-06-06 22:37:10 -07:00
pdxlocations
00226c5b4d don't use white in green config (#188) 2025-06-06 22:19:58 -07:00
pdxlocations
243079f8eb Error Handling for play_sound (#187)
* add sound for mac and linux

* add error handling for sounds

* use subprocess
2025-06-06 22:04:18 -07:00
pdxlocations
1e0432642c add sound for mac and linux (#183) 2025-05-29 10:08:12 -07:00
pdxlocations
71f37065bf bump version 2025-05-29 10:03:23 -07:00
pdxlocations
ee6aad5d0a allow blank key (#178) 2025-05-18 16:57:49 -07:00
pdxlocations
478f017de1 bump version 2025-05-18 14:43:47 -07:00
pdxlocations
c96c4edb01 Add Arrows to Main UI (#177)
* init

* convert globals to dataclass

* move lock to app state

* Almost working changes

* more almost working changes

* so close

* mostly working changes

* closer changes

* I think it works!

* working changes

* hack fix

* Merge branch 'main' into refactor-chat-ui

* clean-up
2025-05-18 12:28:03 -07:00
pdxlocations
cc416476f5 Merge pull request #174 from rfschmid/patch-1 2025-04-25 21:20:28 -07:00
Russell Schmidt
7fc1cbc3a9 Fix error "No module named 'ui.ui_state'"
Was unable to run locally at tips due to an import not including the package name.
2025-04-23 17:02:47 -05:00
pdxlocations
78f0775ad5 Convert Globals to Class (#173)
* init

* convert globals to dataclass

* move lock to app state
2025-04-19 21:37:54 -07:00
pdxlocations
43f0929247 fix down arrow in user settings 2025-04-19 16:23:17 -07:00
pdxlocations
941e081e90 bump version 2025-04-19 16:17:10 -07:00
15 changed files with 809 additions and 505 deletions

View File

@@ -24,7 +24,6 @@ import traceback
from pubsub import pub
# Local application
import contact.globals as globals
import contact.ui.default_config as config
from contact.message_handlers.rx_handler import on_receive
from contact.settings import set_region
@@ -36,7 +35,7 @@ from contact.utilities.db_handler import init_nodedb, load_messages_from_db
from contact.utilities.input_handlers import get_list_input
from contact.utilities.interfaces import initialize_interface
from contact.utilities.utils import get_channels, get_nodeNum, get_node_list
from contact.utilities.singleton import ui_state, interface_state, app_state
# ------------------------------------------------------------------------------
# Environment & Logging Setup
@@ -52,7 +51,7 @@ logging.basicConfig(
filename=config.log_file_path, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
globals.lock = threading.Lock()
app_state.lock = threading.Lock()
# ------------------------------------------------------------------------------
# Main Program Logic
@@ -61,19 +60,19 @@ globals.lock = threading.Lock()
def initialize_globals(args) -> None:
"""Initializes interface and shared globals."""
globals.interface = initialize_interface(args)
interface_state.interface = initialize_interface(args)
# Prompt for region if unset
if globals.interface.localNode.localConfig.lora.region == 0:
if interface_state.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(globals.interface)
globals.interface.close()
globals.interface = initialize_interface(args)
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
globals.myNodeNum = get_nodeNum()
globals.channel_list = get_channels()
globals.node_list = get_node_list()
interface_state.myNodeNum = get_nodeNum()
ui_state.channel_list = get_channels()
ui_state.node_list = get_node_list()
pub.subscribe(on_receive, "meshtastic.receive")
init_nodedb()
@@ -96,7 +95,7 @@ def main(stdscr: curses.window) -> None:
return
logging.info("Initializing interface...")
with globals.lock:
with app_state.lock:
initialize_globals(args)
logging.info("Starting main UI")

View File

@@ -1,13 +0,0 @@
interface = None
lock = None
display_log = False
all_messages = {}
channel_list = []
notifications = []
packet_buffer = []
node_list = []
myNodeNum = 0
selected_channel = 0
selected_message = 0
selected_node = 0
current_window = 0

View File

@@ -1,4 +1,8 @@
import logging
import os
import platform
import shutil
import subprocess
import time
from datetime import datetime
from typing import Any, Dict
@@ -18,7 +22,43 @@ from contact.utilities.db_handler import (
update_node_info_in_db,
)
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state, app_state
def play_sound():
try:
system = platform.system()
if system == "Darwin": # macOS
sound_path = "/System/Library/Sounds/Ping.aiff"
if os.path.exists(sound_path):
subprocess.run(["afplay", sound_path], check=True)
return
else:
logging.warning(f"macOS sound file not found: {sound_path}")
elif system == "Linux":
sound_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
if os.path.exists(sound_path):
if shutil.which("paplay"):
subprocess.run(["paplay", sound_path], check=True)
return
elif shutil.which("aplay"):
subprocess.run(["aplay", sound_path], check=True)
return
else:
logging.warning("No sound player found (paplay/aplay)")
else:
logging.warning(f"Linux sound file not found: {sound_path}")
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
# Final fallback: terminal beep
print("\a")
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
@@ -29,14 +69,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
packet: The received Meshtastic packet as a dictionary.
interface: The Meshtastic interface instance that received the packet.
"""
with globals.lock:
with app_state.lock:
# Update packet log
globals.packet_buffer.append(packet)
if len(globals.packet_buffer) > 20:
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
# Trim buffer to 20 packets
globals.packet_buffer = globals.packet_buffer[-20:]
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if globals.display_log:
if ui_state.display_log:
draw_packetlog_win()
try:
if "decoded" not in packet:
@@ -52,6 +92,10 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
maybe_store_nodeinfo_in_db(packet)
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
if config.notification_sound == "True":
play_sound()
message_bytes = packet["decoded"]["payload"]
message_string = message_bytes.decode("utf-8")
@@ -63,19 +107,19 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
else:
channel_number = 0
if packet["to"] == globals.myNodeNum:
if packet["from"] in globals.channel_list:
if packet["to"] == interface_state.myNodeNum:
if packet["from"] in ui_state.channel_list:
pass
else:
globals.channel_list.append(packet["from"])
if packet["from"] not in globals.all_messages:
globals.all_messages[packet["from"]] = []
ui_state.channel_list.append(packet["from"])
if packet["from"] not in ui_state.all_messages:
ui_state.all_messages[packet["from"]] = []
update_node_info_in_db(packet["from"], chat_archived=False)
refresh_channels = True
channel_number = globals.channel_list.index(packet["from"])
channel_number = ui_state.channel_list.index(packet["from"])
if globals.channel_list[channel_number] != globals.channel_list[globals.selected_channel]:
if ui_state.channel_list[channel_number] != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
@@ -85,15 +129,15 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = globals.all_messages[globals.channel_list[channel_number]]
channel_messages = ui_state.all_messages[ui_state.channel_list[channel_number]]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
@@ -107,9 +151,9 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
globals.all_messages[globals.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
ui_state.all_messages[ui_state.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
globals.all_messages[globals.channel_list[channel_number]].append(
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string} ", message_string)
)
@@ -118,7 +162,7 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
if refresh_messages:
draw_messages_window(True)
save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string)
save_message_to_db(ui_state.channel_list[channel_number], message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")

View File

@@ -13,7 +13,8 @@ from contact.utilities.db_handler import (
update_node_info_in_db,
)
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
@@ -31,12 +32,12 @@ def onAckNak(packet: Dict[str, Any]) -> None:
return
acknak = ack_naks.pop(request)
message = globals.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
message = ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
confirm_string = " "
ack_type = None
if packet["decoded"]["routing"]["errorReason"] == "NONE":
if packet["from"] == globals.myNodeNum: # Ack "from" ourself means implicit ACK
if packet["from"] == interface_state.myNodeNum: # Ack "from" ourself means implicit ACK
confirm_string = config.ack_implicit_str
ack_type = "Implicit"
else:
@@ -46,15 +47,15 @@ def onAckNak(packet: Dict[str, Any]) -> None:
confirm_string = config.nak_str
ack_type = "Nak"
globals.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
config.sent_message_prefix + confirm_string + ": ",
message,
)
update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type)
channel_number = globals.channel_list.index(acknak["channel"])
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
channel_number = ui_state.channel_list.index(acknak["channel"])
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
draw_messages_window()
@@ -137,16 +138,16 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
msg_str += route_str + "\n" # Print the route back to us
if packet["from"] not in globals.channel_list:
globals.channel_list.append(packet["from"])
if packet["from"] not in ui_state.channel_list:
ui_state.channel_list.append(packet["from"])
refresh_channels = True
if is_chat_archived(packet["from"]):
update_node_info_in_db(packet["from"], chat_archived=False)
channel_number = globals.channel_list.index(packet["from"])
channel_number = ui_state.channel_list.index(packet["from"])
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
refresh_messages = True
else:
add_notification(channel_number)
@@ -154,9 +155,9 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
globals.all_messages[globals.channel_list[channel_number]].append(
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string}", msg_str)
)
@@ -164,23 +165,23 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(globals.channel_list[channel_number], packet["from"], msg_str)
save_message_to_db(ui_state.channel_list[channel_number], packet["from"], msg_str)
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
"""
Sends a chat message using the selected channel.
"""
myid = globals.myNodeNum
myid = interface_state.myNodeNum
send_on_channel = 0
channel_id = globals.channel_list[channel]
channel_id = ui_state.channel_list[channel]
if isinstance(channel_id, int):
send_on_channel = 0
destination = channel_id
elif isinstance(channel_id, str):
send_on_channel = channel
sent_message_data = globals.interface.sendText(
sent_message_data = interface_state.interface.sendText(
text=message,
destinationId=destination,
wantAck=True,
@@ -190,15 +191,15 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
)
# Add sent message to the messages dictionary
if channel_id not in globals.all_messages:
globals.all_messages[channel_id] = []
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Handle timestamp logic
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = globals.all_messages[channel_id]
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
@@ -212,15 +213,15 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
globals.all_messages[channel_id].append((f"-- {current_hour} --", ""))
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
globals.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
ui_state.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
timestamp = save_message_to_db(channel_id, myid, message)
ack_naks[sent_message_data.id] = {
"channel": channel_id,
"messageIndex": len(globals.all_messages[channel_id]) - 1,
"messageIndex": len(ui_state.all_messages[channel_id]) - 1,
"timestamp": timestamp,
}
@@ -230,9 +231,9 @@ def send_traceroute() -> None:
Sends a RouteDiscovery protobuf to the selected node.
"""
r = mesh_pb2.RouteDiscovery()
globals.interface.sendData(
interface_state.interface.sendData(
r,
destinationId=globals.node_list[globals.selected_node],
destinationId=ui_state.node_list[ui_state.selected_node],
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=on_response_traceroute,

File diff suppressed because it is too large Load Diff

View File

@@ -123,15 +123,18 @@ def initialize_config() -> Dict[str, object]:
"settings_breadcrumbs": ["green", "black"],
"settings_warning": ["green", "black"],
"settings_note": ["green", "black"],
"node_favorite": ["cyan", "white"],
"node_ignored": ["red", "white"],
"node_favorite": ["cyan", "green"],
"node_ignored": ["red", "black"],
}
default_config_variables = {
"channel_list_16ths": "3",
"node_list_16ths": "5",
"db_file_path": db_file_path,
"log_file_path": log_file_path,
"message_prefix": ">>",
"sent_message_prefix": ">> Sent",
"notification_symbol": "*",
"notification_sound": "True",
"ack_implicit_str": "[◌]",
"ack_str": "[✓]",
"nak_str": "[x]",
@@ -170,18 +173,23 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
global db_file_path, log_file_path, message_prefix, sent_message_prefix
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
global node_list_16ths, channel_list_16ths
global theme, COLOR_CONFIG
global node_sort
global node_sort, notification_sound
channel_list_16ths = loaded_config["channel_list_16ths"]
node_list_16ths = loaded_config["node_list_16ths"]
db_file_path = loaded_config["db_file_path"]
log_file_path = loaded_config["log_file_path"]
message_prefix = loaded_config["message_prefix"]
sent_message_prefix = loaded_config["sent_message_prefix"]
notification_symbol = loaded_config["notification_symbol"]
notification_sound = loaded_config["notification_sound"]
ack_implicit_str = loaded_config["ack_implicit_str"]
ack_str = loaded_config["ack_str"]
nak_str = loaded_config["nak_str"]
ack_unknown_str = loaded_config["ack_unknown_str"]
node_sort = loaded_config["node_sort"]
theme = loaded_config["theme"]
if theme == "dark":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"]
@@ -189,7 +197,6 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"]
elif theme == "green":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"]
node_sort = loaded_config["node_sort"]
# Call the function when the script is imported

View File

@@ -3,6 +3,18 @@ import re
from contact.ui.colors import get_color
from contact.utilities.control_utils import transform_menu_path
from typing import Any, Optional, List, Dict
from contact.utilities.singleton import interface_state, ui_state
def get_node_color(node_index: int, reverse: bool = False):
node_num = ui_state.node_list[node_index]
node = interface_state.interface.nodesByNum.get(node_num, {})
if node.get("isFavorite"):
return get_color("node_favorite", reverse=reverse)
elif node.get("isIgnored"):
return get_color("node_ignored", reverse=reverse)
return get_color("settings_default", reverse=reverse)
# Aliases
Segment = tuple[str, str, bool, bool]
@@ -128,7 +140,6 @@ def draw_arrows(
win: object, visible_height: int, max_index: int, start_index: List[int], show_save_option: bool
) -> None:
# vh = visible_height + (1 if show_save_option else 0)
mi = max_index - (2 if show_save_option else 0)
if visible_height < mi:
@@ -316,3 +327,91 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
wrapped_lines.append(line_buffer)
return wrapped_lines
def move_main_highlight(
old_idx: int, new_idx, options: List[str], menu_win: curses.window, menu_pad: curses.window, ui_state: object
) -> None:
if old_idx == new_idx: # No-op
return
max_index = len(options) - 1
visible_height = menu_win.getmaxyx()[0] - 2
if new_idx < ui_state.start_index[ui_state.current_window]: # Moving above the visible area
ui_state.start_index[ui_state.current_window] = new_idx
elif new_idx >= ui_state.start_index[ui_state.current_window] + visible_height: # Moving below the visible area
ui_state.start_index[ui_state.current_window] = new_idx - visible_height + 1
# Ensure start_index is within bounds
ui_state.start_index[ui_state.current_window] = max(
0, min(ui_state.start_index[ui_state.current_window], max_index - visible_height + 1)
)
highlight_line(menu_win, menu_pad, old_idx, new_idx, visible_height)
if ui_state.current_window == 0: # hack to fix max_index
max_index += 1
draw_main_arrows(menu_win, max_index, window=ui_state.current_window)
menu_win.refresh()
def highlight_line(
menu_win: curses.window, menu_pad: curses.window, old_idx: int, new_idx: int, visible_height: int
) -> None:
if ui_state.current_window == 0:
color_old = (
get_color("channel_selected") if old_idx == ui_state.selected_channel else get_color("channel_list")
)
color_new = get_color("channel_list", reverse=True) if True else get_color("channel_list", reverse=True)
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, color_old)
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, color_new)
elif ui_state.current_window == 2:
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(old_idx))
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(new_idx, reverse=True))
menu_win.refresh()
# Refresh pad only if scrolling is needed
menu_pad.refresh(
ui_state.start_index[ui_state.current_window],
0,
menu_win.getbegyx()[0] + 1,
menu_win.getbegyx()[1] + 1,
menu_win.getbegyx()[0] + visible_height,
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 3,
)
def draw_main_arrows(win: object, max_index: int, window: int, **kwargs) -> None:
height, width = win.getmaxyx()
usable_height = height - 2
usable_width = width - 2
if window == 1 and ui_state.display_log:
if log_height := kwargs.get("log_height"):
usable_height -= log_height - 1
if usable_height < max_index:
if ui_state.start_index[window] > 0:
win.addstr(1, usable_width, "", get_color("settings_default"))
else:
win.addstr(1, usable_width, " ", get_color("settings_default"))
if max_index - ui_state.start_index[window] - 1 >= usable_height:
win.addstr(usable_height, usable_width, "", get_color("settings_default"))
else:
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
else:
win.addstr(1, usable_width, " ", get_color("settings_default"))
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
def get_msg_window_lines(messages_win, packetlog_win) -> None:
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if ui_state.display_log else 0
return messages_win.getmaxyx()[0] - 2 - packetlog_height

View File

@@ -1,11 +1,42 @@
from typing import Any, Union, List, Dict
from dataclasses import dataclass, field
@dataclass
class MenuState:
def __init__(self):
self.menu_index: List[int] = [] # Row we left the previous menus
self.start_index: List[int] = [0] # Row to start the menu if it doesn't all fit
self.selected_index: int = 0 # Selected Row
self.current_menu: Union[Dict[str, Any], List[Any], str, int] = {} # Contents of the current menu
self.menu_path: List[str] = [] # Menu Path
self.show_save_option: bool = False # Display 'Save'
menu_index: List[int] = field(default_factory=list)
start_index: List[int] = field(default_factory=lambda: [0])
selected_index: int = 0
current_menu: Union[Dict[str, Any], List[Any], str, int] = field(default_factory=dict)
menu_path: List[str] = field(default_factory=list)
show_save_option: bool = False
@dataclass
class ChatUIState:
display_log: bool = False
channel_list: List[str] = field(default_factory=list)
all_messages: Dict[str, List[str]] = field(default_factory=dict)
notifications: List[str] = field(default_factory=list)
packet_buffer: List[str] = field(default_factory=list)
node_list: List[str] = field(default_factory=list)
selected_channel: int = 0
selected_message: int = 0
selected_node: int = 0
current_window: int = 0
selected_index: int = 0
start_index: List[int] = field(default_factory=lambda: [0, 0, 0])
show_save_option: bool = False
menu_path: List[str] = field(default_factory=list)
@dataclass
class InterfaceState:
interface: Any = None
myNodeNum: int = 0
@dataclass
class AppState:
lock: Any = None

View File

@@ -55,10 +55,15 @@ def edit_value(key: str, current_value: str) -> str:
# Load theme names dynamically from the JSON
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
return get_list_input("Select Theme", current_value, theme_options)
elif key == "node_sort":
sort_options = ["lastHeard", "name", "hops"]
return get_list_input("Sort By", current_value, sort_options)
elif key == "notification_sound":
sound_options = ["True", "False"]
return get_list_input("Notification Sound", current_value, sound_options)
# Standard Input Mode (Scrollable)
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
curses.curs_set(1)
@@ -176,7 +181,7 @@ def display_menu(menu_state: Any) -> tuple[Any, Any, List[str]]:
max_index = num_items + (1 if menu_state.show_save_option else 0) - 1
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
draw_arrows(menu_win, visible_height, max_index, menu_state.start_index, show_save_option=False)
draw_arrows(menu_win, visible_height, max_index, menu_state.start_index, menu_state.show_save_option)
return menu_win, menu_pad, options

View File

@@ -1,5 +1,6 @@
import yaml
import logging
import time
from typing import List
from google.protobuf.json_format import MessageToDict
from meshtastic import mt_config
@@ -133,24 +134,29 @@ def config_import(interface, filename):
logging.info(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(configuration["owner"])
time.sleep(0.5)
if "owner_short" in configuration:
logging.info(f"Setting device owner short to {configuration['owner_short']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"])
time.sleep(0.5)
if "ownerShort" in configuration:
logging.info(f"Setting device owner short to {configuration['ownerShort']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"])
time.sleep(0.5)
if "channel_url" in configuration:
logging.info(f"Setting channel url to {configuration['channel_url']}")
interface.getNode("^local").setURL(configuration["channel_url"])
time.sleep(0.5)
if "channelUrl" in configuration:
logging.info(f"Setting channel url to {configuration['channelUrl']}")
interface.getNode("^local").setURL(configuration["channelUrl"])
time.sleep(0.5)
if "location" in configuration:
alt = 0
@@ -169,12 +175,14 @@ def config_import(interface, filename):
logging.info(f"Fixing longitude at {lon} degrees")
logging.info("Setting device position")
interface.localNode.setFixedPosition(lat, lon, alt)
time.sleep(0.5)
if "config" in configuration:
localConfig = interface.getNode("^local").localConfig
for section in configuration["config"]:
traverseConfig(section, configuration["config"][section], localConfig)
interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
if "module_config" in configuration:
moduleConfig = interface.getNode("^local").moduleConfig
@@ -185,6 +193,7 @@ def config_import(interface, filename):
moduleConfig,
)
interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
interface.getNode("^local", False).commitSettingsTransaction()
logging.info("Writing modified configuration to device")

View File

@@ -6,12 +6,14 @@ from typing import Optional, Union, Dict
from contact.utilities.utils import decimal_to_hex
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state
def get_table_name(channel: str) -> str:
# Construct the table name
table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
table_name = f"{str(interface_state.myNodeNum)}_{channel}_messages"
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
return quoted_table_name
@@ -61,7 +63,7 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
message_text = ?
"""
db_cursor.execute(update_query, (ack, str(globals.myNodeNum), timestamp, message))
db_cursor.execute(update_query, (ack, str(interface_state.myNodeNum), timestamp, message))
db_connection.commit()
except sqlite3.Error as e:
@@ -72,13 +74,13 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
def load_messages_from_db() -> None:
"""Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
"""Load messages from the database for all channels and update ui_state.all_messages and ui_state.channel_list."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",))
db_cursor.execute(query, (f"{str(interface_state.myNodeNum)}_%_messages",))
tables = [row[0] for row in db_cursor.fetchall()]
# Iterate through each table and fetch its messages
@@ -104,15 +106,15 @@ def load_messages_from_db() -> None:
# Convert the channel to an integer if it's numeric, otherwise keep it as a string (nodenum vs channel name)
channel = int(channel) if channel.isdigit() else channel
# Add the channel to globals.channel_list if not already present
if channel not in globals.channel_list and not is_chat_archived(channel):
globals.channel_list.append(channel)
# Add the channel to ui_state.channel_list if not already present
if channel not in ui_state.channel_list and not is_chat_archived(channel):
ui_state.channel_list.append(channel)
# Ensure the channel exists in globals.all_messages
if channel not in globals.all_messages:
globals.all_messages[channel] = []
# Ensure the channel exists in ui_state.all_messages
if channel not in ui_state.all_messages:
ui_state.all_messages[channel] = []
# Add messages to globals.all_messages grouped by hourly timestamp
# Add messages to ui_state.all_messages grouped by hourly timestamp
hourly_messages = {}
for user_id, message, timestamp, ack_type in db_messages:
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
@@ -127,7 +129,7 @@ def load_messages_from_db() -> None:
elif ack_type == "Nak":
ack_str = config.nak_str
if user_id == str(globals.myNodeNum):
if user_id == str(interface_state.myNodeNum):
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
else:
formatted_message = (
@@ -137,10 +139,10 @@ def load_messages_from_db() -> None:
hourly_messages[hour].append(formatted_message)
# Flatten the hourly messages into globals.all_messages[channel]
# Flatten the hourly messages into ui_state.all_messages[channel]
for hour, messages in sorted(hourly_messages.items()):
globals.all_messages[channel].append((f"-- {hour} --", ""))
globals.all_messages[channel].extend(messages)
ui_state.all_messages[channel].append((f"-- {hour} --", ""))
ui_state.all_messages[channel].extend(messages)
except sqlite3.Error as e:
logging.error(f"SQLite error while loading messages from table '{table_name}': {e}")
@@ -153,11 +155,11 @@ def init_nodedb() -> None:
"""Initialize the node database and update it with nodes from the interface."""
try:
if not globals.interface.nodes:
if not interface_state.interface.nodes:
return # No nodes to initialize
ensure_node_table_exists() # Ensure the table exists before insertion
nodes_snapshot = list(globals.interface.nodes.values())
nodes_snapshot = list(interface_state.interface.nodes.values())
# Insert or update all nodes
for node in nodes_snapshot:
@@ -214,7 +216,7 @@ def update_node_info_in_db(
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote in case of numeric names
table_columns = [i[1] for i in db_cursor.execute(f"PRAGMA table_info({table_name})")]
if "chat_archived" not in table_columns:
@@ -278,7 +280,7 @@ def update_node_info_in_db(
def ensure_node_table_exists() -> None:
"""Ensure the node database table exists."""
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote for safety
schema = """
user_id TEXT PRIMARY KEY,
long_name TEXT,
@@ -319,7 +321,7 @@ def get_name_from_database(user_id: int, type: str = "long") -> str:
db_cursor = db_connection.cursor()
# Construct table name
table_name = f"{str(globals.myNodeNum)}_nodedb"
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
# Determine the correct column to fetch
@@ -345,7 +347,7 @@ def is_chat_archived(user_id: int) -> int:
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f"{str(globals.myNodeNum)}_nodedb"
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"'
query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,))

View File

@@ -104,11 +104,13 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
return [base64.b64encode(b).decode() for b in byte_strings]
def is_valid_base64(s):
"""Check if a string is valid Base64."""
"""Check if a string is valid Base64 or blank."""
if s == "":
return True
try:
decoded = base64.b64decode(s, validate=True)
return len(decoded) == 32 # Ensure it's exactly 32 bytes
except binascii.Error:
except (binascii.Error, ValueError):
return False
cvalue = to_base64(current_value) # Convert current values to Base64

View File

@@ -0,0 +1,5 @@
from contact.ui.ui_state import ChatUIState, InterfaceState, AppState
ui_state = ChatUIState()
interface_state = InterfaceState()
app_state = AppState()

View File

@@ -1,16 +1,17 @@
import contact.globals as globals
import datetime
from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
def get_channels():
"""Retrieve channels from the node and update globals.channel_list and globals.all_messages."""
node = globals.interface.getNode("^local")
"""Retrieve channels from the node and update ui_state.channel_list and ui_state.all_messages."""
node = interface_state.interface.getNode("^local")
device_channels = node.channels
# Clear and rebuild channel list
# globals.channel_list = []
# ui_state.channel_list = []
for device_channel in device_channels:
if device_channel.role:
@@ -26,20 +27,20 @@ def get_channels():
].name
channel_name = convert_to_camel_case(modem_preset_string)
# Add channel to globals.channel_list if not already present
if channel_name not in globals.channel_list:
globals.channel_list.append(channel_name)
# Add channel to ui_state.channel_list if not already present
if channel_name not in ui_state.channel_list:
ui_state.channel_list.append(channel_name)
# Initialize globals.all_messages[channel_name] if it doesn't exist
if channel_name not in globals.all_messages:
globals.all_messages[channel_name] = []
# Initialize ui_state.all_messages[channel_name] if it doesn't exist
if channel_name not in ui_state.all_messages:
ui_state.all_messages[channel_name] = []
return globals.channel_list
return ui_state.channel_list
def get_node_list():
if globals.interface.nodes:
my_node_num = globals.myNodeNum
if interface_state.interface.nodes:
my_node_num = interface_state.myNodeNum
def node_sort(node):
if config.node_sort == "lastHeard":
@@ -51,7 +52,7 @@ def get_node_list():
else:
return node
sorted_nodes = sorted(globals.interface.nodes.values(), key=node_sort)
sorted_nodes = sorted(interface_state.interface.nodes.values(), key=node_sort)
# Move favorite nodes to the beginning
sorted_nodes = sorted(
@@ -68,14 +69,14 @@ def get_node_list():
def refresh_node_list():
new_node_list = get_node_list()
if new_node_list != globals.node_list:
globals.node_list = new_node_list
if new_node_list != ui_state.node_list:
ui_state.node_list = new_node_list
return True
return False
def get_nodeNum():
myinfo = globals.interface.getMyNodeInfo()
myinfo = interface_state.interface.getMyNodeInfo()
myNodeNum = myinfo["num"]
return myNodeNum

View File

@@ -1,6 +1,6 @@
[project]
name = "contact"
version = "1.3.7"
version = "1.3.12"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}