Compare commits

...

34 Commits

Author SHA1 Message Date
pdxlocations
8b05072786 bump version 2025-06-13 15:33:57 -07:00
pdxlocations
4455781e6c fix types and returns 2025-06-12 16:38:05 -07:00
pdxlocations
0c8aaee415 Merge pull request #197 from rfschmid/redirect-sound-player-output-to-dev-null
Redirect sound player output to dev null
2025-06-12 16:10:26 -07:00
pdxlocations
b97d9f4649 Merge pull request #196 from rfschmid/only-clear-input-text-on-enter-if-sending-message
Only clear input on enter when sending message
2025-06-12 16:09:44 -07:00
Russell Schmidt
4152fb6a21 Redirect sound player output to dev null
On my linux system, the sound playing code goes to aplay. When called,
aplay outputs a message about the file it is playing to stderr, which
causes it to be printed on the input line, which can't be easily
cleared. Redirect output from the audio player executable to dev/null.
Deduplicate sound playing code a bit so we only need one call to
subprocess.run, so I don't have to make this change in three places.
2025-06-12 17:28:30 -05:00
Russell Schmidt
384e36dac2 Only clear input on enter when sending message
We should only clear the input field when the user presses enter if the
user actually sent the message. If selecting a different node to send
to, don't clear input.
2025-06-12 17:23:03 -05:00
pdxlocations
65bca84fe6 minor refactor 2025-06-10 23:24:11 -07:00
pdxlocations
16fa2830fd bump version 2025-06-10 22:29:31 -07:00
pdxlocations
c8f1da99e3 Merge pull request #194 from rfschmid:fix-crash-with-newlines
Fix crash with newlines, message spacing
2025-06-10 22:28:41 -07:00
Russell Schmidt
702250c329 Fix crash with newlines, message spacing 2025-06-10 17:42:14 -05:00
pdxlocations
6291082405 Merge pull request #192 from rfschmid/fix-wrapping-with-wide-chars
Fix crash when wrapping with wide characters
2025-06-10 12:19:02 -07:00
pdxlocations
4fa5148664 Merge pull request #193 from rfschmid/fix-backspace
Fix enter not clearing input
2025-06-10 11:57:06 -07:00
Russell Schmidt
d62ec09eea Fix enter not clearing input
Similar to 981d72e, pressing enter wasn't clearing the input field.
2025-06-10 12:19:03 -05:00
Russell Schmidt
61026dcc73 Fix crash when wrapping with wide characters
Update contact_ui.py to use already-existing custom wrap function
implemented in nav_utils instead of textwrap library. Update custom
wrap_text function to use east_asian_width to determine characters that
can use two columns of width.
2025-06-10 12:17:23 -05:00
pdxlocations
1362d3a219 bump version 2025-06-10 10:02:04 -07:00
pdxlocations
981d72e688 fix backspace 2025-06-10 10:01:44 -07:00
pdxlocations
0b5ec0b3d7 Merge pull request #191 from pdxlocations:refactor-ui-functions
Refactor keypress handling
2025-06-09 23:20:42 -07:00
pdxlocations
cbb4ef9e34 break out key functions 2025-06-09 23:19:28 -07:00
pdxlocations
fecd71f4b7 refactor window sizes 2025-06-09 22:37:51 -07:00
pdxlocations
59edfab451 add notif sound prefs (#190) 2025-06-09 22:15:53 -07:00
pdxlocations
39159099e1 change prints to logging 2025-06-09 19:01:40 -07:00
pdxlocations
02e5368c61 waits in configio 2025-06-09 07:40:07 -07:00
pdxlocations
9d234a75d8 change default configs order 2025-06-06 22:45:06 -07:00
pdxlocations
c7edd602ec Make widths configurable (#189) 2025-06-06 22:37:10 -07:00
pdxlocations
00226c5b4d don't use white in green config (#188) 2025-06-06 22:19:58 -07:00
pdxlocations
243079f8eb Error Handling for play_sound (#187)
* add sound for mac and linux

* add error handling for sounds

* use subprocess
2025-06-06 22:04:18 -07:00
pdxlocations
1e0432642c add sound for mac and linux (#183) 2025-05-29 10:08:12 -07:00
pdxlocations
71f37065bf bump version 2025-05-29 10:03:23 -07:00
pdxlocations
ee6aad5d0a allow blank key (#178) 2025-05-18 16:57:49 -07:00
pdxlocations
478f017de1 bump version 2025-05-18 14:43:47 -07:00
pdxlocations
c96c4edb01 Add Arrows to Main UI (#177)
* init

* convert globals to dataclass

* move lock to app state

* Almost working changes

* more almost working changes

* so close

* mostly working changes

* closer changes

* I think it works!

* working changes

* hack fix

* Merge branch 'main' into refactor-chat-ui

* clean-up
2025-05-18 12:28:03 -07:00
pdxlocations
cc416476f5 Merge pull request #174 from rfschmid/patch-1 2025-04-25 21:20:28 -07:00
Russell Schmidt
7fc1cbc3a9 Fix error "No module named 'ui.ui_state'"
Was unable to run locally at tips due to an import not including the package name.
2025-04-23 17:02:47 -05:00
pdxlocations
78f0775ad5 Convert Globals to Class (#173)
* init

* convert globals to dataclass

* move lock to app state
2025-04-19 21:37:54 -07:00
15 changed files with 834 additions and 515 deletions

View File

@@ -24,7 +24,6 @@ import traceback
from pubsub import pub
# Local application
import contact.globals as globals
import contact.ui.default_config as config
from contact.message_handlers.rx_handler import on_receive
from contact.settings import set_region
@@ -36,7 +35,7 @@ from contact.utilities.db_handler import init_nodedb, load_messages_from_db
from contact.utilities.input_handlers import get_list_input
from contact.utilities.interfaces import initialize_interface
from contact.utilities.utils import get_channels, get_nodeNum, get_node_list
from contact.utilities.singleton import ui_state, interface_state, app_state
# ------------------------------------------------------------------------------
# Environment & Logging Setup
@@ -52,28 +51,31 @@ logging.basicConfig(
filename=config.log_file_path, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
globals.lock = threading.Lock()
app_state.lock = threading.Lock()
# ------------------------------------------------------------------------------
# Main Program Logic
# ------------------------------------------------------------------------------
def prompt_region_if_unset(args: object) -> None:
"""Prompt user to set region if it is unset."""
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
def initialize_globals(args) -> None:
def initialize_globals(args: object) -> None:
"""Initializes interface and shared globals."""
globals.interface = initialize_interface(args)
interface_state.interface = initialize_interface(args)
# Prompt for region if unset
if globals.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(globals.interface)
globals.interface.close()
globals.interface = initialize_interface(args)
if interface_state.interface.localNode.localConfig.lora.region == 0:
prompt_region_if_unset(args)
globals.myNodeNum = get_nodeNum()
globals.channel_list = get_channels()
globals.node_list = get_node_list()
interface_state.myNodeNum = get_nodeNum()
ui_state.channel_list = get_channels()
ui_state.node_list = get_node_list()
pub.subscribe(on_receive, "meshtastic.receive")
init_nodedb()
@@ -96,7 +98,7 @@ def main(stdscr: curses.window) -> None:
return
logging.info("Initializing interface...")
with globals.lock:
with app_state.lock:
initialize_globals(args)
logging.info("Starting main UI")

View File

@@ -1,13 +0,0 @@
interface = None
lock = None
display_log = False
all_messages = {}
channel_list = []
notifications = []
packet_buffer = []
node_list = []
myNodeNum = 0
selected_channel = 0
selected_message = 0
selected_node = 0
current_window = 0

View File

@@ -1,4 +1,8 @@
import logging
import os
import platform
import shutil
import subprocess
import time
from datetime import datetime
from typing import Any, Dict
@@ -18,7 +22,45 @@ from contact.utilities.db_handler import (
update_node_info_in_db,
)
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state, app_state
def play_sound():
try:
system = platform.system()
sound_path = ""
executable = ""
if system == "Darwin": #macOS
sound_path = "/System/Library/Sounds/Ping.aiff"
executable = "afplay"
elif system == "Linux":
sound_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
if(shutil.which("paplay")):
executable = "paplay"
else:
executable = "aplay"
if executable != "" and sound_path != "":
if os.path.exists(sound_path):
if shutil.which(executable):
subprocess.run([executable, sound_path], check=True,
stdout=subprocess.DEVNULL, stderr = subprocess.DEVNULL)
return
else:
logging.warning("No sound player found (afplay/paplay/aplay)")
else:
logging.warning(f"Sound file not found: {sound_path}")
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
# Final fallback: terminal beep
print("\a")
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
@@ -29,14 +71,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
packet: The received Meshtastic packet as a dictionary.
interface: The Meshtastic interface instance that received the packet.
"""
with globals.lock:
with app_state.lock:
# Update packet log
globals.packet_buffer.append(packet)
if len(globals.packet_buffer) > 20:
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
# Trim buffer to 20 packets
globals.packet_buffer = globals.packet_buffer[-20:]
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if globals.display_log:
if ui_state.display_log:
draw_packetlog_win()
try:
if "decoded" not in packet:
@@ -52,6 +94,10 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
maybe_store_nodeinfo_in_db(packet)
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
if config.notification_sound == "True":
play_sound()
message_bytes = packet["decoded"]["payload"]
message_string = message_bytes.decode("utf-8")
@@ -63,19 +109,19 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
else:
channel_number = 0
if packet["to"] == globals.myNodeNum:
if packet["from"] in globals.channel_list:
if packet["to"] == interface_state.myNodeNum:
if packet["from"] in ui_state.channel_list:
pass
else:
globals.channel_list.append(packet["from"])
if packet["from"] not in globals.all_messages:
globals.all_messages[packet["from"]] = []
ui_state.channel_list.append(packet["from"])
if packet["from"] not in ui_state.all_messages:
ui_state.all_messages[packet["from"]] = []
update_node_info_in_db(packet["from"], chat_archived=False)
refresh_channels = True
channel_number = globals.channel_list.index(packet["from"])
channel_number = ui_state.channel_list.index(packet["from"])
if globals.channel_list[channel_number] != globals.channel_list[globals.selected_channel]:
if ui_state.channel_list[channel_number] != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
@@ -85,15 +131,15 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = globals.all_messages[globals.channel_list[channel_number]]
channel_messages = ui_state.all_messages[ui_state.channel_list[channel_number]]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
@@ -107,9 +153,9 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
globals.all_messages[globals.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
ui_state.all_messages[ui_state.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
globals.all_messages[globals.channel_list[channel_number]].append(
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string} ", message_string)
)
@@ -118,7 +164,7 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
if refresh_messages:
draw_messages_window(True)
save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string)
save_message_to_db(ui_state.channel_list[channel_number], message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")

View File

@@ -13,7 +13,8 @@ from contact.utilities.db_handler import (
update_node_info_in_db,
)
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
@@ -31,12 +32,12 @@ def onAckNak(packet: Dict[str, Any]) -> None:
return
acknak = ack_naks.pop(request)
message = globals.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
message = ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
confirm_string = " "
ack_type = None
if packet["decoded"]["routing"]["errorReason"] == "NONE":
if packet["from"] == globals.myNodeNum: # Ack "from" ourself means implicit ACK
if packet["from"] == interface_state.myNodeNum: # Ack "from" ourself means implicit ACK
confirm_string = config.ack_implicit_str
ack_type = "Implicit"
else:
@@ -46,15 +47,15 @@ def onAckNak(packet: Dict[str, Any]) -> None:
confirm_string = config.nak_str
ack_type = "Nak"
globals.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
config.sent_message_prefix + confirm_string + ": ",
message,
)
update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type)
channel_number = globals.channel_list.index(acknak["channel"])
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
channel_number = ui_state.channel_list.index(acknak["channel"])
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
draw_messages_window()
@@ -137,16 +138,16 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
msg_str += route_str + "\n" # Print the route back to us
if packet["from"] not in globals.channel_list:
globals.channel_list.append(packet["from"])
if packet["from"] not in ui_state.channel_list:
ui_state.channel_list.append(packet["from"])
refresh_channels = True
if is_chat_archived(packet["from"]):
update_node_info_in_db(packet["from"], chat_archived=False)
channel_number = globals.channel_list.index(packet["from"])
channel_number = ui_state.channel_list.index(packet["from"])
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
refresh_messages = True
else:
add_notification(channel_number)
@@ -154,9 +155,9 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
globals.all_messages[globals.channel_list[channel_number]].append(
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string}", msg_str)
)
@@ -164,23 +165,23 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(globals.channel_list[channel_number], packet["from"], msg_str)
save_message_to_db(ui_state.channel_list[channel_number], packet["from"], msg_str)
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
"""
Sends a chat message using the selected channel.
"""
myid = globals.myNodeNum
myid = interface_state.myNodeNum
send_on_channel = 0
channel_id = globals.channel_list[channel]
channel_id = ui_state.channel_list[channel]
if isinstance(channel_id, int):
send_on_channel = 0
destination = channel_id
elif isinstance(channel_id, str):
send_on_channel = channel
sent_message_data = globals.interface.sendText(
sent_message_data = interface_state.interface.sendText(
text=message,
destinationId=destination,
wantAck=True,
@@ -190,15 +191,15 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
)
# Add sent message to the messages dictionary
if channel_id not in globals.all_messages:
globals.all_messages[channel_id] = []
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Handle timestamp logic
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = globals.all_messages[channel_id]
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
@@ -212,15 +213,15 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
globals.all_messages[channel_id].append((f"-- {current_hour} --", ""))
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
globals.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
ui_state.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
timestamp = save_message_to_db(channel_id, myid, message)
ack_naks[sent_message_data.id] = {
"channel": channel_id,
"messageIndex": len(globals.all_messages[channel_id]) - 1,
"messageIndex": len(ui_state.all_messages[channel_id]) - 1,
"timestamp": timestamp,
}
@@ -230,9 +231,9 @@ def send_traceroute() -> None:
Sends a RouteDiscovery protobuf to the selected node.
"""
r = mesh_pb2.RouteDiscovery()
globals.interface.sendData(
interface_state.interface.sendData(
r,
destinationId=globals.node_list[globals.selected_node],
destinationId=ui_state.node_list[ui_state.selected_node],
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=on_response_traceroute,

File diff suppressed because it is too large Load Diff

View File

@@ -123,15 +123,18 @@ def initialize_config() -> Dict[str, object]:
"settings_breadcrumbs": ["green", "black"],
"settings_warning": ["green", "black"],
"settings_note": ["green", "black"],
"node_favorite": ["cyan", "white"],
"node_ignored": ["red", "white"],
"node_favorite": ["cyan", "green"],
"node_ignored": ["red", "black"],
}
default_config_variables = {
"channel_list_16ths": "3",
"node_list_16ths": "5",
"db_file_path": db_file_path,
"log_file_path": log_file_path,
"message_prefix": ">>",
"sent_message_prefix": ">> Sent",
"notification_symbol": "*",
"notification_sound": "True",
"ack_implicit_str": "[◌]",
"ack_str": "[✓]",
"nak_str": "[x]",
@@ -170,18 +173,23 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
global db_file_path, log_file_path, message_prefix, sent_message_prefix
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
global node_list_16ths, channel_list_16ths
global theme, COLOR_CONFIG
global node_sort
global node_sort, notification_sound
channel_list_16ths = loaded_config["channel_list_16ths"]
node_list_16ths = loaded_config["node_list_16ths"]
db_file_path = loaded_config["db_file_path"]
log_file_path = loaded_config["log_file_path"]
message_prefix = loaded_config["message_prefix"]
sent_message_prefix = loaded_config["sent_message_prefix"]
notification_symbol = loaded_config["notification_symbol"]
notification_sound = loaded_config["notification_sound"]
ack_implicit_str = loaded_config["ack_implicit_str"]
ack_str = loaded_config["ack_str"]
nak_str = loaded_config["nak_str"]
ack_unknown_str = loaded_config["ack_unknown_str"]
node_sort = loaded_config["node_sort"]
theme = loaded_config["theme"]
if theme == "dark":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"]
@@ -189,7 +197,6 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"]
elif theme == "green":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"]
node_sort = loaded_config["node_sort"]
# Call the function when the script is imported

View File

@@ -1,8 +1,22 @@
import curses
import re
from unicodedata import east_asian_width
from contact.ui.colors import get_color
from contact.utilities.control_utils import transform_menu_path
from typing import Any, Optional, List, Dict
from contact.utilities.singleton import interface_state, ui_state
def get_node_color(node_index: int, reverse: bool = False):
node_num = ui_state.node_list[node_index]
node = interface_state.interface.nodesByNum.get(node_num, {})
if node.get("isFavorite"):
return get_color("node_favorite", reverse=reverse)
elif node.get("isIgnored"):
return get_color("node_ignored", reverse=reverse)
return get_color("settings_default", reverse=reverse)
# Aliases
Segment = tuple[str, str, bool, bool]
@@ -128,7 +142,6 @@ def draw_arrows(
win: object, visible_height: int, max_index: int, start_index: List[int], show_save_option: bool
) -> None:
# vh = visible_height + (1 if show_save_option else 0)
mi = max_index - (2 if show_save_option else 0)
if visible_height < mi:
@@ -282,9 +295,16 @@ def get_wrapped_help_text(
return wrapped_help
def text_width(text: str) -> int:
return sum(2 if east_asian_width(c) in "FW" else 1 for c in text)
def wrap_text(text: str, wrap_width: int) -> List[str]:
"""Wraps text while preserving spaces and breaking long words."""
whitespace = '\t\n\x0b\x0c\r '
whitespace_trans = dict.fromkeys(map(ord, whitespace), ord(' '))
text = text.translate(whitespace_trans)
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
wrapped_lines = []
line_buffer = ""
@@ -293,11 +313,11 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
wrap_width -= margin
for word in words:
word_length = len(word)
word_length = text_width(word)
if word_length > wrap_width: # Break long words
if line_buffer:
wrapped_lines.append(line_buffer)
wrapped_lines.append(line_buffer.strip())
line_buffer = ""
line_length = 0
for i in range(0, word_length, wrap_width):
@@ -305,7 +325,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
continue
if line_length + word_length > wrap_width and word.strip():
wrapped_lines.append(line_buffer)
wrapped_lines.append(line_buffer.strip())
line_buffer = ""
line_length = 0
@@ -313,6 +333,94 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
line_length += word_length
if line_buffer:
wrapped_lines.append(line_buffer)
wrapped_lines.append(line_buffer.strip())
return wrapped_lines
def move_main_highlight(
old_idx: int, new_idx, options: List[str], menu_win: curses.window, menu_pad: curses.window, ui_state: object
) -> None:
if old_idx == new_idx: # No-op
return
max_index = len(options) - 1
visible_height = menu_win.getmaxyx()[0] - 2
if new_idx < ui_state.start_index[ui_state.current_window]: # Moving above the visible area
ui_state.start_index[ui_state.current_window] = new_idx
elif new_idx >= ui_state.start_index[ui_state.current_window] + visible_height: # Moving below the visible area
ui_state.start_index[ui_state.current_window] = new_idx - visible_height + 1
# Ensure start_index is within bounds
ui_state.start_index[ui_state.current_window] = max(
0, min(ui_state.start_index[ui_state.current_window], max_index - visible_height + 1)
)
highlight_line(menu_win, menu_pad, old_idx, new_idx, visible_height)
if ui_state.current_window == 0: # hack to fix max_index
max_index += 1
draw_main_arrows(menu_win, max_index, window=ui_state.current_window)
menu_win.refresh()
def highlight_line(
menu_win: curses.window, menu_pad: curses.window, old_idx: int, new_idx: int, visible_height: int
) -> None:
if ui_state.current_window == 0:
color_old = (
get_color("channel_selected") if old_idx == ui_state.selected_channel else get_color("channel_list")
)
color_new = get_color("channel_list", reverse=True) if True else get_color("channel_list", reverse=True)
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, color_old)
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, color_new)
elif ui_state.current_window == 2:
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(old_idx))
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(new_idx, reverse=True))
menu_win.refresh()
# Refresh pad only if scrolling is needed
menu_pad.refresh(
ui_state.start_index[ui_state.current_window],
0,
menu_win.getbegyx()[0] + 1,
menu_win.getbegyx()[1] + 1,
menu_win.getbegyx()[0] + visible_height,
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 3,
)
def draw_main_arrows(win: object, max_index: int, window: int, **kwargs) -> None:
height, width = win.getmaxyx()
usable_height = height - 2
usable_width = width - 2
if window == 1 and ui_state.display_log:
if log_height := kwargs.get("log_height"):
usable_height -= log_height - 1
if usable_height < max_index:
if ui_state.start_index[window] > 0:
win.addstr(1, usable_width, "", get_color("settings_default"))
else:
win.addstr(1, usable_width, " ", get_color("settings_default"))
if max_index - ui_state.start_index[window] - 1 >= usable_height:
win.addstr(usable_height, usable_width, "", get_color("settings_default"))
else:
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
else:
win.addstr(1, usable_width, " ", get_color("settings_default"))
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
def get_msg_window_lines(messages_win, packetlog_win) -> None:
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if ui_state.display_log else 0
return messages_win.getmaxyx()[0] - 2 - packetlog_height

View File

@@ -1,11 +1,42 @@
from typing import Any, Union, List, Dict
from dataclasses import dataclass, field
@dataclass
class MenuState:
def __init__(self):
self.menu_index: List[int] = [] # Row we left the previous menus
self.start_index: List[int] = [0] # Row to start the menu if it doesn't all fit
self.selected_index: int = 0 # Selected Row
self.current_menu: Union[Dict[str, Any], List[Any], str, int] = {} # Contents of the current menu
self.menu_path: List[str] = [] # Menu Path
self.show_save_option: bool = False # Display 'Save'
menu_index: List[int] = field(default_factory=list)
start_index: List[int] = field(default_factory=lambda: [0])
selected_index: int = 0
current_menu: Union[Dict[str, Any], List[Any], str, int] = field(default_factory=dict)
menu_path: List[str] = field(default_factory=list)
show_save_option: bool = False
@dataclass
class ChatUIState:
display_log: bool = False
channel_list: List[str] = field(default_factory=list)
all_messages: Dict[str, List[str]] = field(default_factory=dict)
notifications: List[str] = field(default_factory=list)
packet_buffer: List[str] = field(default_factory=list)
node_list: List[str] = field(default_factory=list)
selected_channel: int = 0
selected_message: int = 0
selected_node: int = 0
current_window: int = 0
selected_index: int = 0
start_index: List[int] = field(default_factory=lambda: [0, 0, 0])
show_save_option: bool = False
menu_path: List[str] = field(default_factory=list)
@dataclass
class InterfaceState:
interface: Any = None
myNodeNum: int = 0
@dataclass
class AppState:
lock: Any = None

View File

@@ -55,10 +55,15 @@ def edit_value(key: str, current_value: str) -> str:
# Load theme names dynamically from the JSON
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
return get_list_input("Select Theme", current_value, theme_options)
elif key == "node_sort":
sort_options = ["lastHeard", "name", "hops"]
return get_list_input("Sort By", current_value, sort_options)
elif key == "notification_sound":
sound_options = ["True", "False"]
return get_list_input("Notification Sound", current_value, sound_options)
# Standard Input Mode (Scrollable)
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
curses.curs_set(1)

View File

@@ -1,5 +1,6 @@
import yaml
import logging
import time
from typing import List
from google.protobuf.json_format import MessageToDict
from meshtastic import mt_config
@@ -133,24 +134,29 @@ def config_import(interface, filename):
logging.info(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(configuration["owner"])
time.sleep(0.5)
if "owner_short" in configuration:
logging.info(f"Setting device owner short to {configuration['owner_short']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"])
time.sleep(0.5)
if "ownerShort" in configuration:
logging.info(f"Setting device owner short to {configuration['ownerShort']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"])
time.sleep(0.5)
if "channel_url" in configuration:
logging.info(f"Setting channel url to {configuration['channel_url']}")
interface.getNode("^local").setURL(configuration["channel_url"])
time.sleep(0.5)
if "channelUrl" in configuration:
logging.info(f"Setting channel url to {configuration['channelUrl']}")
interface.getNode("^local").setURL(configuration["channelUrl"])
time.sleep(0.5)
if "location" in configuration:
alt = 0
@@ -169,12 +175,14 @@ def config_import(interface, filename):
logging.info(f"Fixing longitude at {lon} degrees")
logging.info("Setting device position")
interface.localNode.setFixedPosition(lat, lon, alt)
time.sleep(0.5)
if "config" in configuration:
localConfig = interface.getNode("^local").localConfig
for section in configuration["config"]:
traverseConfig(section, configuration["config"][section], localConfig)
interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
if "module_config" in configuration:
moduleConfig = interface.getNode("^local").moduleConfig
@@ -185,6 +193,7 @@ def config_import(interface, filename):
moduleConfig,
)
interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
interface.getNode("^local", False).commitSettingsTransaction()
logging.info("Writing modified configuration to device")

View File

@@ -6,12 +6,14 @@ from typing import Optional, Union, Dict
from contact.utilities.utils import decimal_to_hex
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state
def get_table_name(channel: str) -> str:
# Construct the table name
table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
table_name = f"{str(interface_state.myNodeNum)}_{channel}_messages"
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
return quoted_table_name
@@ -61,7 +63,7 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
message_text = ?
"""
db_cursor.execute(update_query, (ack, str(globals.myNodeNum), timestamp, message))
db_cursor.execute(update_query, (ack, str(interface_state.myNodeNum), timestamp, message))
db_connection.commit()
except sqlite3.Error as e:
@@ -72,13 +74,13 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
def load_messages_from_db() -> None:
"""Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
"""Load messages from the database for all channels and update ui_state.all_messages and ui_state.channel_list."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",))
db_cursor.execute(query, (f"{str(interface_state.myNodeNum)}_%_messages",))
tables = [row[0] for row in db_cursor.fetchall()]
# Iterate through each table and fetch its messages
@@ -104,15 +106,15 @@ def load_messages_from_db() -> None:
# Convert the channel to an integer if it's numeric, otherwise keep it as a string (nodenum vs channel name)
channel = int(channel) if channel.isdigit() else channel
# Add the channel to globals.channel_list if not already present
if channel not in globals.channel_list and not is_chat_archived(channel):
globals.channel_list.append(channel)
# Add the channel to ui_state.channel_list if not already present
if channel not in ui_state.channel_list and not is_chat_archived(channel):
ui_state.channel_list.append(channel)
# Ensure the channel exists in globals.all_messages
if channel not in globals.all_messages:
globals.all_messages[channel] = []
# Ensure the channel exists in ui_state.all_messages
if channel not in ui_state.all_messages:
ui_state.all_messages[channel] = []
# Add messages to globals.all_messages grouped by hourly timestamp
# Add messages to ui_state.all_messages grouped by hourly timestamp
hourly_messages = {}
for user_id, message, timestamp, ack_type in db_messages:
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
@@ -127,7 +129,7 @@ def load_messages_from_db() -> None:
elif ack_type == "Nak":
ack_str = config.nak_str
if user_id == str(globals.myNodeNum):
if user_id == str(interface_state.myNodeNum):
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
else:
formatted_message = (
@@ -137,10 +139,10 @@ def load_messages_from_db() -> None:
hourly_messages[hour].append(formatted_message)
# Flatten the hourly messages into globals.all_messages[channel]
# Flatten the hourly messages into ui_state.all_messages[channel]
for hour, messages in sorted(hourly_messages.items()):
globals.all_messages[channel].append((f"-- {hour} --", ""))
globals.all_messages[channel].extend(messages)
ui_state.all_messages[channel].append((f"-- {hour} --", ""))
ui_state.all_messages[channel].extend(messages)
except sqlite3.Error as e:
logging.error(f"SQLite error while loading messages from table '{table_name}': {e}")
@@ -153,11 +155,11 @@ def init_nodedb() -> None:
"""Initialize the node database and update it with nodes from the interface."""
try:
if not globals.interface.nodes:
if not interface_state.interface.nodes:
return # No nodes to initialize
ensure_node_table_exists() # Ensure the table exists before insertion
nodes_snapshot = list(globals.interface.nodes.values())
nodes_snapshot = list(interface_state.interface.nodes.values())
# Insert or update all nodes
for node in nodes_snapshot:
@@ -214,7 +216,7 @@ def update_node_info_in_db(
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote in case of numeric names
table_columns = [i[1] for i in db_cursor.execute(f"PRAGMA table_info({table_name})")]
if "chat_archived" not in table_columns:
@@ -278,7 +280,7 @@ def update_node_info_in_db(
def ensure_node_table_exists() -> None:
"""Ensure the node database table exists."""
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote for safety
schema = """
user_id TEXT PRIMARY KEY,
long_name TEXT,
@@ -319,7 +321,7 @@ def get_name_from_database(user_id: int, type: str = "long") -> str:
db_cursor = db_connection.cursor()
# Construct table name
table_name = f"{str(globals.myNodeNum)}_nodedb"
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
# Determine the correct column to fetch
@@ -345,7 +347,7 @@ def is_chat_archived(user_id: int) -> int:
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f"{str(globals.myNodeNum)}_nodedb"
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"'
query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,))

View File

@@ -104,11 +104,13 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
return [base64.b64encode(b).decode() for b in byte_strings]
def is_valid_base64(s):
"""Check if a string is valid Base64."""
"""Check if a string is valid Base64 or blank."""
if s == "":
return True
try:
decoded = base64.b64decode(s, validate=True)
return len(decoded) == 32 # Ensure it's exactly 32 bytes
except binascii.Error:
except (binascii.Error, ValueError):
return False
cvalue = to_base64(current_value) # Convert current values to Base64

View File

@@ -0,0 +1,5 @@
from contact.ui.ui_state import ChatUIState, InterfaceState, AppState
ui_state = ChatUIState()
interface_state = InterfaceState()
app_state = AppState()

View File

@@ -1,16 +1,17 @@
import contact.globals as globals
import datetime
from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
def get_channels():
"""Retrieve channels from the node and update globals.channel_list and globals.all_messages."""
node = globals.interface.getNode("^local")
"""Retrieve channels from the node and update ui_state.channel_list and ui_state.all_messages."""
node = interface_state.interface.getNode("^local")
device_channels = node.channels
# Clear and rebuild channel list
# globals.channel_list = []
# ui_state.channel_list = []
for device_channel in device_channels:
if device_channel.role:
@@ -26,20 +27,20 @@ def get_channels():
].name
channel_name = convert_to_camel_case(modem_preset_string)
# Add channel to globals.channel_list if not already present
if channel_name not in globals.channel_list:
globals.channel_list.append(channel_name)
# Add channel to ui_state.channel_list if not already present
if channel_name not in ui_state.channel_list:
ui_state.channel_list.append(channel_name)
# Initialize globals.all_messages[channel_name] if it doesn't exist
if channel_name not in globals.all_messages:
globals.all_messages[channel_name] = []
# Initialize ui_state.all_messages[channel_name] if it doesn't exist
if channel_name not in ui_state.all_messages:
ui_state.all_messages[channel_name] = []
return globals.channel_list
return ui_state.channel_list
def get_node_list():
if globals.interface.nodes:
my_node_num = globals.myNodeNum
if interface_state.interface.nodes:
my_node_num = interface_state.myNodeNum
def node_sort(node):
if config.node_sort == "lastHeard":
@@ -51,7 +52,7 @@ def get_node_list():
else:
return node
sorted_nodes = sorted(globals.interface.nodes.values(), key=node_sort)
sorted_nodes = sorted(interface_state.interface.nodes.values(), key=node_sort)
# Move favorite nodes to the beginning
sorted_nodes = sorted(
@@ -68,14 +69,14 @@ def get_node_list():
def refresh_node_list():
new_node_list = get_node_list()
if new_node_list != globals.node_list:
globals.node_list = new_node_list
if new_node_list != ui_state.node_list:
ui_state.node_list = new_node_list
return True
return False
def get_nodeNum():
myinfo = globals.interface.getMyNodeInfo()
myinfo = interface_state.interface.getMyNodeInfo()
myNodeNum = myinfo["num"]
return myNodeNum

View File

@@ -1,6 +1,6 @@
[project]
name = "contact"
version = "1.3.8"
version = "1.3.14"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}