forked from iarv/contact
Compare commits
22 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0b5ec0b3d7 | ||
|
|
cbb4ef9e34 | ||
|
|
fecd71f4b7 | ||
|
|
59edfab451 | ||
|
|
39159099e1 | ||
|
|
02e5368c61 | ||
|
|
9d234a75d8 | ||
|
|
c7edd602ec | ||
|
|
00226c5b4d | ||
|
|
243079f8eb | ||
|
|
1e0432642c | ||
|
|
71f37065bf | ||
|
|
ee6aad5d0a | ||
|
|
478f017de1 | ||
|
|
c96c4edb01 | ||
|
|
cc416476f5 | ||
|
|
7fc1cbc3a9 | ||
|
|
78f0775ad5 | ||
|
|
43f0929247 | ||
|
|
941e081e90 | ||
|
|
2e8af740be | ||
|
|
a95f128d8e |
@@ -24,7 +24,6 @@ import traceback
|
||||
from pubsub import pub
|
||||
|
||||
# Local application
|
||||
import contact.globals as globals
|
||||
import contact.ui.default_config as config
|
||||
from contact.message_handlers.rx_handler import on_receive
|
||||
from contact.settings import set_region
|
||||
@@ -36,7 +35,7 @@ from contact.utilities.db_handler import init_nodedb, load_messages_from_db
|
||||
from contact.utilities.input_handlers import get_list_input
|
||||
from contact.utilities.interfaces import initialize_interface
|
||||
from contact.utilities.utils import get_channels, get_nodeNum, get_node_list
|
||||
|
||||
from contact.utilities.singleton import ui_state, interface_state, app_state
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Environment & Logging Setup
|
||||
@@ -52,7 +51,7 @@ logging.basicConfig(
|
||||
filename=config.log_file_path, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
|
||||
globals.lock = threading.Lock()
|
||||
app_state.lock = threading.Lock()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Main Program Logic
|
||||
@@ -61,19 +60,19 @@ globals.lock = threading.Lock()
|
||||
|
||||
def initialize_globals(args) -> None:
|
||||
"""Initializes interface and shared globals."""
|
||||
globals.interface = initialize_interface(args)
|
||||
interface_state.interface = initialize_interface(args)
|
||||
|
||||
# Prompt for region if unset
|
||||
if globals.interface.localNode.localConfig.lora.region == 0:
|
||||
if interface_state.interface.localNode.localConfig.lora.region == 0:
|
||||
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
|
||||
if confirmation == "Yes":
|
||||
set_region(globals.interface)
|
||||
globals.interface.close()
|
||||
globals.interface = initialize_interface(args)
|
||||
set_region(interface_state.interface)
|
||||
interface_state.interface.close()
|
||||
interface_state.interface = initialize_interface(args)
|
||||
|
||||
globals.myNodeNum = get_nodeNum()
|
||||
globals.channel_list = get_channels()
|
||||
globals.node_list = get_node_list()
|
||||
interface_state.myNodeNum = get_nodeNum()
|
||||
ui_state.channel_list = get_channels()
|
||||
ui_state.node_list = get_node_list()
|
||||
pub.subscribe(on_receive, "meshtastic.receive")
|
||||
|
||||
init_nodedb()
|
||||
@@ -96,7 +95,7 @@ def main(stdscr: curses.window) -> None:
|
||||
return
|
||||
|
||||
logging.info("Initializing interface...")
|
||||
with globals.lock:
|
||||
with app_state.lock:
|
||||
initialize_globals(args)
|
||||
logging.info("Starting main UI")
|
||||
|
||||
|
||||
@@ -1,13 +0,0 @@
|
||||
interface = None
|
||||
lock = None
|
||||
display_log = False
|
||||
all_messages = {}
|
||||
channel_list = []
|
||||
notifications = []
|
||||
packet_buffer = []
|
||||
node_list = []
|
||||
myNodeNum = 0
|
||||
selected_channel = 0
|
||||
selected_message = 0
|
||||
selected_node = 0
|
||||
current_window = 0
|
||||
@@ -1,4 +1,8 @@
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict
|
||||
@@ -18,7 +22,43 @@ from contact.utilities.db_handler import (
|
||||
update_node_info_in_db,
|
||||
)
|
||||
import contact.ui.default_config as config
|
||||
import contact.globals as globals
|
||||
|
||||
from contact.utilities.singleton import ui_state, interface_state, app_state
|
||||
|
||||
|
||||
def play_sound():
|
||||
try:
|
||||
system = platform.system()
|
||||
|
||||
if system == "Darwin": # macOS
|
||||
sound_path = "/System/Library/Sounds/Ping.aiff"
|
||||
if os.path.exists(sound_path):
|
||||
subprocess.run(["afplay", sound_path], check=True)
|
||||
return
|
||||
else:
|
||||
logging.warning(f"macOS sound file not found: {sound_path}")
|
||||
|
||||
elif system == "Linux":
|
||||
sound_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
|
||||
if os.path.exists(sound_path):
|
||||
if shutil.which("paplay"):
|
||||
subprocess.run(["paplay", sound_path], check=True)
|
||||
return
|
||||
elif shutil.which("aplay"):
|
||||
subprocess.run(["aplay", sound_path], check=True)
|
||||
return
|
||||
else:
|
||||
logging.warning("No sound player found (paplay/aplay)")
|
||||
else:
|
||||
logging.warning(f"Linux sound file not found: {sound_path}")
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
logging.error(f"Sound playback failed: {e}")
|
||||
except Exception as e:
|
||||
logging.error(f"Unexpected error: {e}")
|
||||
|
||||
# Final fallback: terminal beep
|
||||
print("\a")
|
||||
|
||||
|
||||
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
@@ -29,14 +69,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
packet: The received Meshtastic packet as a dictionary.
|
||||
interface: The Meshtastic interface instance that received the packet.
|
||||
"""
|
||||
with globals.lock:
|
||||
with app_state.lock:
|
||||
# Update packet log
|
||||
globals.packet_buffer.append(packet)
|
||||
if len(globals.packet_buffer) > 20:
|
||||
ui_state.packet_buffer.append(packet)
|
||||
if len(ui_state.packet_buffer) > 20:
|
||||
# Trim buffer to 20 packets
|
||||
globals.packet_buffer = globals.packet_buffer[-20:]
|
||||
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
|
||||
|
||||
if globals.display_log:
|
||||
if ui_state.display_log:
|
||||
draw_packetlog_win()
|
||||
try:
|
||||
if "decoded" not in packet:
|
||||
@@ -52,6 +92,10 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
maybe_store_nodeinfo_in_db(packet)
|
||||
|
||||
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
|
||||
|
||||
if config.notification_sound == "True":
|
||||
play_sound()
|
||||
|
||||
message_bytes = packet["decoded"]["payload"]
|
||||
message_string = message_bytes.decode("utf-8")
|
||||
|
||||
@@ -63,19 +107,19 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
else:
|
||||
channel_number = 0
|
||||
|
||||
if packet["to"] == globals.myNodeNum:
|
||||
if packet["from"] in globals.channel_list:
|
||||
if packet["to"] == interface_state.myNodeNum:
|
||||
if packet["from"] in ui_state.channel_list:
|
||||
pass
|
||||
else:
|
||||
globals.channel_list.append(packet["from"])
|
||||
if packet["from"] not in globals.all_messages:
|
||||
globals.all_messages[packet["from"]] = []
|
||||
ui_state.channel_list.append(packet["from"])
|
||||
if packet["from"] not in ui_state.all_messages:
|
||||
ui_state.all_messages[packet["from"]] = []
|
||||
update_node_info_in_db(packet["from"], chat_archived=False)
|
||||
refresh_channels = True
|
||||
|
||||
channel_number = globals.channel_list.index(packet["from"])
|
||||
channel_number = ui_state.channel_list.index(packet["from"])
|
||||
|
||||
if globals.channel_list[channel_number] != globals.channel_list[globals.selected_channel]:
|
||||
if ui_state.channel_list[channel_number] != ui_state.channel_list[ui_state.selected_channel]:
|
||||
add_notification(channel_number)
|
||||
refresh_channels = True
|
||||
else:
|
||||
@@ -85,15 +129,15 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
message_from_id = packet["from"]
|
||||
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
|
||||
|
||||
if globals.channel_list[channel_number] not in globals.all_messages:
|
||||
globals.all_messages[globals.channel_list[channel_number]] = []
|
||||
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
|
||||
|
||||
# Timestamp handling
|
||||
current_timestamp = time.time()
|
||||
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
|
||||
|
||||
# Retrieve the last timestamp if available
|
||||
channel_messages = globals.all_messages[globals.channel_list[channel_number]]
|
||||
channel_messages = ui_state.all_messages[ui_state.channel_list[channel_number]]
|
||||
if channel_messages:
|
||||
# Check the last entry for a timestamp
|
||||
for entry in reversed(channel_messages):
|
||||
@@ -107,9 +151,9 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
|
||||
# Add a new timestamp if it's a new hour
|
||||
if last_hour != current_hour:
|
||||
globals.all_messages[globals.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
|
||||
|
||||
globals.all_messages[globals.channel_list[channel_number]].append(
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
|
||||
(f"{config.message_prefix} {message_from_string} ", message_string)
|
||||
)
|
||||
|
||||
@@ -118,7 +162,7 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
if refresh_messages:
|
||||
draw_messages_window(True)
|
||||
|
||||
save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string)
|
||||
save_message_to_db(ui_state.channel_list[channel_number], message_from_id, message_string)
|
||||
|
||||
except KeyError as e:
|
||||
logging.error(f"Error processing packet: {e}")
|
||||
|
||||
@@ -13,7 +13,8 @@ from contact.utilities.db_handler import (
|
||||
update_node_info_in_db,
|
||||
)
|
||||
import contact.ui.default_config as config
|
||||
import contact.globals as globals
|
||||
|
||||
from contact.utilities.singleton import ui_state, interface_state
|
||||
|
||||
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
|
||||
|
||||
@@ -31,12 +32,12 @@ def onAckNak(packet: Dict[str, Any]) -> None:
|
||||
return
|
||||
|
||||
acknak = ack_naks.pop(request)
|
||||
message = globals.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
|
||||
message = ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
|
||||
|
||||
confirm_string = " "
|
||||
ack_type = None
|
||||
if packet["decoded"]["routing"]["errorReason"] == "NONE":
|
||||
if packet["from"] == globals.myNodeNum: # Ack "from" ourself means implicit ACK
|
||||
if packet["from"] == interface_state.myNodeNum: # Ack "from" ourself means implicit ACK
|
||||
confirm_string = config.ack_implicit_str
|
||||
ack_type = "Implicit"
|
||||
else:
|
||||
@@ -46,15 +47,15 @@ def onAckNak(packet: Dict[str, Any]) -> None:
|
||||
confirm_string = config.nak_str
|
||||
ack_type = "Nak"
|
||||
|
||||
globals.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
|
||||
ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
|
||||
config.sent_message_prefix + confirm_string + ": ",
|
||||
message,
|
||||
)
|
||||
|
||||
update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type)
|
||||
|
||||
channel_number = globals.channel_list.index(acknak["channel"])
|
||||
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
|
||||
channel_number = ui_state.channel_list.index(acknak["channel"])
|
||||
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
|
||||
draw_messages_window()
|
||||
|
||||
|
||||
@@ -137,16 +138,16 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
|
||||
|
||||
msg_str += route_str + "\n" # Print the route back to us
|
||||
|
||||
if packet["from"] not in globals.channel_list:
|
||||
globals.channel_list.append(packet["from"])
|
||||
if packet["from"] not in ui_state.channel_list:
|
||||
ui_state.channel_list.append(packet["from"])
|
||||
refresh_channels = True
|
||||
|
||||
if is_chat_archived(packet["from"]):
|
||||
update_node_info_in_db(packet["from"], chat_archived=False)
|
||||
|
||||
channel_number = globals.channel_list.index(packet["from"])
|
||||
channel_number = ui_state.channel_list.index(packet["from"])
|
||||
|
||||
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
|
||||
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
|
||||
refresh_messages = True
|
||||
else:
|
||||
add_notification(channel_number)
|
||||
@@ -154,9 +155,9 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
|
||||
|
||||
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
|
||||
|
||||
if globals.channel_list[channel_number] not in globals.all_messages:
|
||||
globals.all_messages[globals.channel_list[channel_number]] = []
|
||||
globals.all_messages[globals.channel_list[channel_number]].append(
|
||||
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
|
||||
(f"{config.message_prefix} {message_from_string}", msg_str)
|
||||
)
|
||||
|
||||
@@ -164,23 +165,23 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
|
||||
draw_channel_list()
|
||||
if refresh_messages:
|
||||
draw_messages_window(True)
|
||||
save_message_to_db(globals.channel_list[channel_number], packet["from"], msg_str)
|
||||
save_message_to_db(ui_state.channel_list[channel_number], packet["from"], msg_str)
|
||||
|
||||
|
||||
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
|
||||
"""
|
||||
Sends a chat message using the selected channel.
|
||||
"""
|
||||
myid = globals.myNodeNum
|
||||
myid = interface_state.myNodeNum
|
||||
send_on_channel = 0
|
||||
channel_id = globals.channel_list[channel]
|
||||
channel_id = ui_state.channel_list[channel]
|
||||
if isinstance(channel_id, int):
|
||||
send_on_channel = 0
|
||||
destination = channel_id
|
||||
elif isinstance(channel_id, str):
|
||||
send_on_channel = channel
|
||||
|
||||
sent_message_data = globals.interface.sendText(
|
||||
sent_message_data = interface_state.interface.sendText(
|
||||
text=message,
|
||||
destinationId=destination,
|
||||
wantAck=True,
|
||||
@@ -190,15 +191,15 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
|
||||
)
|
||||
|
||||
# Add sent message to the messages dictionary
|
||||
if channel_id not in globals.all_messages:
|
||||
globals.all_messages[channel_id] = []
|
||||
if channel_id not in ui_state.all_messages:
|
||||
ui_state.all_messages[channel_id] = []
|
||||
|
||||
# Handle timestamp logic
|
||||
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
|
||||
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
|
||||
|
||||
# Retrieve the last timestamp if available
|
||||
channel_messages = globals.all_messages[channel_id]
|
||||
channel_messages = ui_state.all_messages[channel_id]
|
||||
if channel_messages:
|
||||
# Check the last entry for a timestamp
|
||||
for entry in reversed(channel_messages):
|
||||
@@ -212,15 +213,15 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
|
||||
|
||||
# Add a new timestamp if it's a new hour
|
||||
if last_hour != current_hour:
|
||||
globals.all_messages[channel_id].append((f"-- {current_hour} --", ""))
|
||||
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
|
||||
|
||||
globals.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
|
||||
ui_state.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
|
||||
|
||||
timestamp = save_message_to_db(channel_id, myid, message)
|
||||
|
||||
ack_naks[sent_message_data.id] = {
|
||||
"channel": channel_id,
|
||||
"messageIndex": len(globals.all_messages[channel_id]) - 1,
|
||||
"messageIndex": len(ui_state.all_messages[channel_id]) - 1,
|
||||
"timestamp": timestamp,
|
||||
}
|
||||
|
||||
@@ -230,9 +231,9 @@ def send_traceroute() -> None:
|
||||
Sends a RouteDiscovery protobuf to the selected node.
|
||||
"""
|
||||
r = mesh_pb2.RouteDiscovery()
|
||||
globals.interface.sendData(
|
||||
interface_state.interface.sendData(
|
||||
r,
|
||||
destinationId=globals.node_list[globals.selected_node],
|
||||
destinationId=ui_state.node_list[ui_state.selected_node],
|
||||
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
|
||||
wantResponse=True,
|
||||
onResponse=on_response_traceroute,
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -121,7 +121,7 @@ def display_menu(menu_state: MenuState) -> tuple[object, object]: # curses.wind
|
||||
max_index = num_items + (1 if menu_state.show_save_option else 0) - 1
|
||||
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
|
||||
|
||||
draw_arrows(menu_win, visible_height, max_index, menu_state.start_index, show_save_option=False)
|
||||
draw_arrows(menu_win, visible_height, max_index, menu_state.start_index, menu_state.show_save_option)
|
||||
|
||||
return menu_win, menu_pad
|
||||
|
||||
|
||||
@@ -123,15 +123,18 @@ def initialize_config() -> Dict[str, object]:
|
||||
"settings_breadcrumbs": ["green", "black"],
|
||||
"settings_warning": ["green", "black"],
|
||||
"settings_note": ["green", "black"],
|
||||
"node_favorite": ["cyan", "white"],
|
||||
"node_ignored": ["red", "white"],
|
||||
"node_favorite": ["cyan", "green"],
|
||||
"node_ignored": ["red", "black"],
|
||||
}
|
||||
default_config_variables = {
|
||||
"channel_list_16ths": "3",
|
||||
"node_list_16ths": "5",
|
||||
"db_file_path": db_file_path,
|
||||
"log_file_path": log_file_path,
|
||||
"message_prefix": ">>",
|
||||
"sent_message_prefix": ">> Sent",
|
||||
"notification_symbol": "*",
|
||||
"notification_sound": "True",
|
||||
"ack_implicit_str": "[◌]",
|
||||
"ack_str": "[✓]",
|
||||
"nak_str": "[x]",
|
||||
@@ -170,18 +173,23 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
|
||||
|
||||
global db_file_path, log_file_path, message_prefix, sent_message_prefix
|
||||
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
|
||||
global node_list_16ths, channel_list_16ths
|
||||
global theme, COLOR_CONFIG
|
||||
global node_sort
|
||||
global node_sort, notification_sound
|
||||
|
||||
channel_list_16ths = loaded_config["channel_list_16ths"]
|
||||
node_list_16ths = loaded_config["node_list_16ths"]
|
||||
db_file_path = loaded_config["db_file_path"]
|
||||
log_file_path = loaded_config["log_file_path"]
|
||||
message_prefix = loaded_config["message_prefix"]
|
||||
sent_message_prefix = loaded_config["sent_message_prefix"]
|
||||
notification_symbol = loaded_config["notification_symbol"]
|
||||
notification_sound = loaded_config["notification_sound"]
|
||||
ack_implicit_str = loaded_config["ack_implicit_str"]
|
||||
ack_str = loaded_config["ack_str"]
|
||||
nak_str = loaded_config["nak_str"]
|
||||
ack_unknown_str = loaded_config["ack_unknown_str"]
|
||||
node_sort = loaded_config["node_sort"]
|
||||
theme = loaded_config["theme"]
|
||||
if theme == "dark":
|
||||
COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"]
|
||||
@@ -189,7 +197,6 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
|
||||
COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"]
|
||||
elif theme == "green":
|
||||
COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"]
|
||||
node_sort = loaded_config["node_sort"]
|
||||
|
||||
|
||||
# Call the function when the script is imported
|
||||
|
||||
@@ -3,6 +3,18 @@ import re
|
||||
from contact.ui.colors import get_color
|
||||
from contact.utilities.control_utils import transform_menu_path
|
||||
from typing import Any, Optional, List, Dict
|
||||
from contact.utilities.singleton import interface_state, ui_state
|
||||
|
||||
|
||||
def get_node_color(node_index: int, reverse: bool = False):
|
||||
node_num = ui_state.node_list[node_index]
|
||||
node = interface_state.interface.nodesByNum.get(node_num, {})
|
||||
if node.get("isFavorite"):
|
||||
return get_color("node_favorite", reverse=reverse)
|
||||
elif node.get("isIgnored"):
|
||||
return get_color("node_ignored", reverse=reverse)
|
||||
return get_color("settings_default", reverse=reverse)
|
||||
|
||||
|
||||
# Aliases
|
||||
Segment = tuple[str, str, bool, bool]
|
||||
@@ -121,14 +133,13 @@ def move_highlight(
|
||||
menu_win.getbegyx()[1],
|
||||
)
|
||||
|
||||
draw_arrows(menu_win, visible_height, max_index, start_index, show_save_option=False)
|
||||
draw_arrows(menu_win, visible_height, max_index, start_index, show_save_option)
|
||||
|
||||
|
||||
def draw_arrows(
|
||||
win: object, visible_height: int, max_index: int, start_index: List[int], show_save_option: bool
|
||||
) -> None:
|
||||
|
||||
# vh = visible_height + (1 if show_save_option else 0)
|
||||
mi = max_index - (2 if show_save_option else 0)
|
||||
|
||||
if visible_height < mi:
|
||||
@@ -316,3 +327,91 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
|
||||
wrapped_lines.append(line_buffer)
|
||||
|
||||
return wrapped_lines
|
||||
|
||||
|
||||
def move_main_highlight(
|
||||
old_idx: int, new_idx, options: List[str], menu_win: curses.window, menu_pad: curses.window, ui_state: object
|
||||
) -> None:
|
||||
|
||||
if old_idx == new_idx: # No-op
|
||||
return
|
||||
|
||||
max_index = len(options) - 1
|
||||
visible_height = menu_win.getmaxyx()[0] - 2
|
||||
|
||||
if new_idx < ui_state.start_index[ui_state.current_window]: # Moving above the visible area
|
||||
ui_state.start_index[ui_state.current_window] = new_idx
|
||||
elif new_idx >= ui_state.start_index[ui_state.current_window] + visible_height: # Moving below the visible area
|
||||
ui_state.start_index[ui_state.current_window] = new_idx - visible_height + 1
|
||||
|
||||
# Ensure start_index is within bounds
|
||||
ui_state.start_index[ui_state.current_window] = max(
|
||||
0, min(ui_state.start_index[ui_state.current_window], max_index - visible_height + 1)
|
||||
)
|
||||
|
||||
highlight_line(menu_win, menu_pad, old_idx, new_idx, visible_height)
|
||||
|
||||
if ui_state.current_window == 0: # hack to fix max_index
|
||||
max_index += 1
|
||||
|
||||
draw_main_arrows(menu_win, max_index, window=ui_state.current_window)
|
||||
menu_win.refresh()
|
||||
|
||||
|
||||
def highlight_line(
|
||||
menu_win: curses.window, menu_pad: curses.window, old_idx: int, new_idx: int, visible_height: int
|
||||
) -> None:
|
||||
|
||||
if ui_state.current_window == 0:
|
||||
color_old = (
|
||||
get_color("channel_selected") if old_idx == ui_state.selected_channel else get_color("channel_list")
|
||||
)
|
||||
color_new = get_color("channel_list", reverse=True) if True else get_color("channel_list", reverse=True)
|
||||
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, color_old)
|
||||
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, color_new)
|
||||
|
||||
elif ui_state.current_window == 2:
|
||||
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(old_idx))
|
||||
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(new_idx, reverse=True))
|
||||
|
||||
menu_win.refresh()
|
||||
|
||||
# Refresh pad only if scrolling is needed
|
||||
menu_pad.refresh(
|
||||
ui_state.start_index[ui_state.current_window],
|
||||
0,
|
||||
menu_win.getbegyx()[0] + 1,
|
||||
menu_win.getbegyx()[1] + 1,
|
||||
menu_win.getbegyx()[0] + visible_height,
|
||||
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 3,
|
||||
)
|
||||
|
||||
|
||||
def draw_main_arrows(win: object, max_index: int, window: int, **kwargs) -> None:
|
||||
|
||||
height, width = win.getmaxyx()
|
||||
usable_height = height - 2
|
||||
usable_width = width - 2
|
||||
|
||||
if window == 1 and ui_state.display_log:
|
||||
if log_height := kwargs.get("log_height"):
|
||||
usable_height -= log_height - 1
|
||||
|
||||
if usable_height < max_index:
|
||||
if ui_state.start_index[window] > 0:
|
||||
win.addstr(1, usable_width, "▲", get_color("settings_default"))
|
||||
else:
|
||||
win.addstr(1, usable_width, " ", get_color("settings_default"))
|
||||
|
||||
if max_index - ui_state.start_index[window] - 1 >= usable_height:
|
||||
win.addstr(usable_height, usable_width, "▼", get_color("settings_default"))
|
||||
else:
|
||||
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
|
||||
else:
|
||||
win.addstr(1, usable_width, " ", get_color("settings_default"))
|
||||
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
|
||||
|
||||
|
||||
def get_msg_window_lines(messages_win, packetlog_win) -> None:
|
||||
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if ui_state.display_log else 0
|
||||
return messages_win.getmaxyx()[0] - 2 - packetlog_height
|
||||
|
||||
@@ -1,11 +1,42 @@
|
||||
from typing import Any, Union, List, Dict
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
|
||||
@dataclass
|
||||
class MenuState:
|
||||
def __init__(self):
|
||||
self.menu_index: List[int] = [] # Row we left the previous menus
|
||||
self.start_index: List[int] = [0] # Row to start the menu if it doesn't all fit
|
||||
self.selected_index: int = 0 # Selected Row
|
||||
self.current_menu: Union[Dict[str, Any], List[Any], str, int] = {} # Contents of the current menu
|
||||
self.menu_path: List[str] = [] # Menu Path
|
||||
self.show_save_option: bool = False # Display 'Save'
|
||||
menu_index: List[int] = field(default_factory=list)
|
||||
start_index: List[int] = field(default_factory=lambda: [0])
|
||||
selected_index: int = 0
|
||||
current_menu: Union[Dict[str, Any], List[Any], str, int] = field(default_factory=dict)
|
||||
menu_path: List[str] = field(default_factory=list)
|
||||
show_save_option: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class ChatUIState:
|
||||
display_log: bool = False
|
||||
channel_list: List[str] = field(default_factory=list)
|
||||
all_messages: Dict[str, List[str]] = field(default_factory=dict)
|
||||
notifications: List[str] = field(default_factory=list)
|
||||
packet_buffer: List[str] = field(default_factory=list)
|
||||
node_list: List[str] = field(default_factory=list)
|
||||
selected_channel: int = 0
|
||||
selected_message: int = 0
|
||||
selected_node: int = 0
|
||||
current_window: int = 0
|
||||
|
||||
selected_index: int = 0
|
||||
start_index: List[int] = field(default_factory=lambda: [0, 0, 0])
|
||||
show_save_option: bool = False
|
||||
menu_path: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class InterfaceState:
|
||||
interface: Any = None
|
||||
myNodeNum: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class AppState:
|
||||
lock: Any = None
|
||||
|
||||
@@ -55,10 +55,15 @@ def edit_value(key: str, current_value: str) -> str:
|
||||
# Load theme names dynamically from the JSON
|
||||
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
|
||||
return get_list_input("Select Theme", current_value, theme_options)
|
||||
|
||||
elif key == "node_sort":
|
||||
sort_options = ["lastHeard", "name", "hops"]
|
||||
return get_list_input("Sort By", current_value, sort_options)
|
||||
|
||||
elif key == "notification_sound":
|
||||
sound_options = ["True", "False"]
|
||||
return get_list_input("Notification Sound", current_value, sound_options)
|
||||
|
||||
# Standard Input Mode (Scrollable)
|
||||
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
|
||||
curses.curs_set(1)
|
||||
@@ -176,7 +181,7 @@ def display_menu(menu_state: Any) -> tuple[Any, Any, List[str]]:
|
||||
max_index = num_items + (1 if menu_state.show_save_option else 0) - 1
|
||||
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
|
||||
|
||||
draw_arrows(menu_win, visible_height, max_index, menu_state.start_index, show_save_option=False)
|
||||
draw_arrows(menu_win, visible_height, max_index, menu_state.start_index, menu_state.show_save_option)
|
||||
|
||||
return menu_win, menu_pad, options
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import yaml
|
||||
import logging
|
||||
import time
|
||||
from typing import List
|
||||
from google.protobuf.json_format import MessageToDict
|
||||
from meshtastic import mt_config
|
||||
@@ -133,24 +134,29 @@ def config_import(interface, filename):
|
||||
logging.info(f"Setting device owner to {configuration['owner']}")
|
||||
waitForAckNak = True
|
||||
interface.getNode("^local", False).setOwner(configuration["owner"])
|
||||
time.sleep(0.5)
|
||||
|
||||
if "owner_short" in configuration:
|
||||
logging.info(f"Setting device owner short to {configuration['owner_short']}")
|
||||
waitForAckNak = True
|
||||
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"])
|
||||
time.sleep(0.5)
|
||||
|
||||
if "ownerShort" in configuration:
|
||||
logging.info(f"Setting device owner short to {configuration['ownerShort']}")
|
||||
waitForAckNak = True
|
||||
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"])
|
||||
time.sleep(0.5)
|
||||
|
||||
if "channel_url" in configuration:
|
||||
logging.info(f"Setting channel url to {configuration['channel_url']}")
|
||||
interface.getNode("^local").setURL(configuration["channel_url"])
|
||||
time.sleep(0.5)
|
||||
|
||||
if "channelUrl" in configuration:
|
||||
logging.info(f"Setting channel url to {configuration['channelUrl']}")
|
||||
interface.getNode("^local").setURL(configuration["channelUrl"])
|
||||
time.sleep(0.5)
|
||||
|
||||
if "location" in configuration:
|
||||
alt = 0
|
||||
@@ -169,12 +175,14 @@ def config_import(interface, filename):
|
||||
logging.info(f"Fixing longitude at {lon} degrees")
|
||||
logging.info("Setting device position")
|
||||
interface.localNode.setFixedPosition(lat, lon, alt)
|
||||
time.sleep(0.5)
|
||||
|
||||
if "config" in configuration:
|
||||
localConfig = interface.getNode("^local").localConfig
|
||||
for section in configuration["config"]:
|
||||
traverseConfig(section, configuration["config"][section], localConfig)
|
||||
interface.getNode("^local").writeConfig(camel_to_snake(section))
|
||||
time.sleep(0.5)
|
||||
|
||||
if "module_config" in configuration:
|
||||
moduleConfig = interface.getNode("^local").moduleConfig
|
||||
@@ -185,6 +193,7 @@ def config_import(interface, filename):
|
||||
moduleConfig,
|
||||
)
|
||||
interface.getNode("^local").writeConfig(camel_to_snake(section))
|
||||
time.sleep(0.5)
|
||||
|
||||
interface.getNode("^local", False).commitSettingsTransaction()
|
||||
logging.info("Writing modified configuration to device")
|
||||
|
||||
@@ -6,12 +6,14 @@ from typing import Optional, Union, Dict
|
||||
|
||||
from contact.utilities.utils import decimal_to_hex
|
||||
import contact.ui.default_config as config
|
||||
import contact.globals as globals
|
||||
|
||||
|
||||
from contact.utilities.singleton import ui_state, interface_state
|
||||
|
||||
|
||||
def get_table_name(channel: str) -> str:
|
||||
# Construct the table name
|
||||
table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
|
||||
table_name = f"{str(interface_state.myNodeNum)}_{channel}_messages"
|
||||
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
|
||||
return quoted_table_name
|
||||
|
||||
@@ -61,7 +63,7 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
|
||||
message_text = ?
|
||||
"""
|
||||
|
||||
db_cursor.execute(update_query, (ack, str(globals.myNodeNum), timestamp, message))
|
||||
db_cursor.execute(update_query, (ack, str(interface_state.myNodeNum), timestamp, message))
|
||||
db_connection.commit()
|
||||
|
||||
except sqlite3.Error as e:
|
||||
@@ -72,13 +74,13 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
|
||||
|
||||
|
||||
def load_messages_from_db() -> None:
|
||||
"""Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
|
||||
"""Load messages from the database for all channels and update ui_state.all_messages and ui_state.channel_list."""
|
||||
try:
|
||||
with sqlite3.connect(config.db_file_path) as db_connection:
|
||||
db_cursor = db_connection.cursor()
|
||||
|
||||
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
|
||||
db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",))
|
||||
db_cursor.execute(query, (f"{str(interface_state.myNodeNum)}_%_messages",))
|
||||
tables = [row[0] for row in db_cursor.fetchall()]
|
||||
|
||||
# Iterate through each table and fetch its messages
|
||||
@@ -104,15 +106,15 @@ def load_messages_from_db() -> None:
|
||||
# Convert the channel to an integer if it's numeric, otherwise keep it as a string (nodenum vs channel name)
|
||||
channel = int(channel) if channel.isdigit() else channel
|
||||
|
||||
# Add the channel to globals.channel_list if not already present
|
||||
if channel not in globals.channel_list and not is_chat_archived(channel):
|
||||
globals.channel_list.append(channel)
|
||||
# Add the channel to ui_state.channel_list if not already present
|
||||
if channel not in ui_state.channel_list and not is_chat_archived(channel):
|
||||
ui_state.channel_list.append(channel)
|
||||
|
||||
# Ensure the channel exists in globals.all_messages
|
||||
if channel not in globals.all_messages:
|
||||
globals.all_messages[channel] = []
|
||||
# Ensure the channel exists in ui_state.all_messages
|
||||
if channel not in ui_state.all_messages:
|
||||
ui_state.all_messages[channel] = []
|
||||
|
||||
# Add messages to globals.all_messages grouped by hourly timestamp
|
||||
# Add messages to ui_state.all_messages grouped by hourly timestamp
|
||||
hourly_messages = {}
|
||||
for user_id, message, timestamp, ack_type in db_messages:
|
||||
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
|
||||
@@ -127,7 +129,7 @@ def load_messages_from_db() -> None:
|
||||
elif ack_type == "Nak":
|
||||
ack_str = config.nak_str
|
||||
|
||||
if user_id == str(globals.myNodeNum):
|
||||
if user_id == str(interface_state.myNodeNum):
|
||||
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
|
||||
else:
|
||||
formatted_message = (
|
||||
@@ -137,10 +139,10 @@ def load_messages_from_db() -> None:
|
||||
|
||||
hourly_messages[hour].append(formatted_message)
|
||||
|
||||
# Flatten the hourly messages into globals.all_messages[channel]
|
||||
# Flatten the hourly messages into ui_state.all_messages[channel]
|
||||
for hour, messages in sorted(hourly_messages.items()):
|
||||
globals.all_messages[channel].append((f"-- {hour} --", ""))
|
||||
globals.all_messages[channel].extend(messages)
|
||||
ui_state.all_messages[channel].append((f"-- {hour} --", ""))
|
||||
ui_state.all_messages[channel].extend(messages)
|
||||
|
||||
except sqlite3.Error as e:
|
||||
logging.error(f"SQLite error while loading messages from table '{table_name}': {e}")
|
||||
@@ -153,11 +155,11 @@ def init_nodedb() -> None:
|
||||
"""Initialize the node database and update it with nodes from the interface."""
|
||||
|
||||
try:
|
||||
if not globals.interface.nodes:
|
||||
if not interface_state.interface.nodes:
|
||||
return # No nodes to initialize
|
||||
|
||||
ensure_node_table_exists() # Ensure the table exists before insertion
|
||||
nodes_snapshot = list(globals.interface.nodes.values())
|
||||
nodes_snapshot = list(interface_state.interface.nodes.values())
|
||||
|
||||
# Insert or update all nodes
|
||||
for node in nodes_snapshot:
|
||||
@@ -214,7 +216,7 @@ def update_node_info_in_db(
|
||||
|
||||
with sqlite3.connect(config.db_file_path) as db_connection:
|
||||
db_cursor = db_connection.cursor()
|
||||
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
|
||||
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote in case of numeric names
|
||||
|
||||
table_columns = [i[1] for i in db_cursor.execute(f"PRAGMA table_info({table_name})")]
|
||||
if "chat_archived" not in table_columns:
|
||||
@@ -278,7 +280,7 @@ def update_node_info_in_db(
|
||||
|
||||
def ensure_node_table_exists() -> None:
|
||||
"""Ensure the node database table exists."""
|
||||
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety
|
||||
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote for safety
|
||||
schema = """
|
||||
user_id TEXT PRIMARY KEY,
|
||||
long_name TEXT,
|
||||
@@ -319,7 +321,7 @@ def get_name_from_database(user_id: int, type: str = "long") -> str:
|
||||
db_cursor = db_connection.cursor()
|
||||
|
||||
# Construct table name
|
||||
table_name = f"{str(globals.myNodeNum)}_nodedb"
|
||||
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
|
||||
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
|
||||
|
||||
# Determine the correct column to fetch
|
||||
@@ -345,7 +347,7 @@ def is_chat_archived(user_id: int) -> int:
|
||||
try:
|
||||
with sqlite3.connect(config.db_file_path) as db_connection:
|
||||
db_cursor = db_connection.cursor()
|
||||
table_name = f"{str(globals.myNodeNum)}_nodedb"
|
||||
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
|
||||
nodeinfo_table = f'"{table_name}"'
|
||||
query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?"
|
||||
db_cursor.execute(query, (user_id,))
|
||||
|
||||
@@ -104,11 +104,13 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
return [base64.b64encode(b).decode() for b in byte_strings]
|
||||
|
||||
def is_valid_base64(s):
|
||||
"""Check if a string is valid Base64."""
|
||||
"""Check if a string is valid Base64 or blank."""
|
||||
if s == "":
|
||||
return True
|
||||
try:
|
||||
decoded = base64.b64decode(s, validate=True)
|
||||
return len(decoded) == 32 # Ensure it's exactly 32 bytes
|
||||
except binascii.Error:
|
||||
except (binascii.Error, ValueError):
|
||||
return False
|
||||
|
||||
cvalue = to_base64(current_value) # Convert current values to Base64
|
||||
|
||||
5
contact/utilities/singleton.py
Normal file
5
contact/utilities/singleton.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from contact.ui.ui_state import ChatUIState, InterfaceState, AppState
|
||||
|
||||
ui_state = ChatUIState()
|
||||
interface_state = InterfaceState()
|
||||
app_state = AppState()
|
||||
@@ -1,16 +1,17 @@
|
||||
import contact.globals as globals
|
||||
import datetime
|
||||
from meshtastic.protobuf import config_pb2
|
||||
import contact.ui.default_config as config
|
||||
|
||||
from contact.utilities.singleton import ui_state, interface_state
|
||||
|
||||
|
||||
def get_channels():
|
||||
"""Retrieve channels from the node and update globals.channel_list and globals.all_messages."""
|
||||
node = globals.interface.getNode("^local")
|
||||
"""Retrieve channels from the node and update ui_state.channel_list and ui_state.all_messages."""
|
||||
node = interface_state.interface.getNode("^local")
|
||||
device_channels = node.channels
|
||||
|
||||
# Clear and rebuild channel list
|
||||
# globals.channel_list = []
|
||||
# ui_state.channel_list = []
|
||||
|
||||
for device_channel in device_channels:
|
||||
if device_channel.role:
|
||||
@@ -26,20 +27,20 @@ def get_channels():
|
||||
].name
|
||||
channel_name = convert_to_camel_case(modem_preset_string)
|
||||
|
||||
# Add channel to globals.channel_list if not already present
|
||||
if channel_name not in globals.channel_list:
|
||||
globals.channel_list.append(channel_name)
|
||||
# Add channel to ui_state.channel_list if not already present
|
||||
if channel_name not in ui_state.channel_list:
|
||||
ui_state.channel_list.append(channel_name)
|
||||
|
||||
# Initialize globals.all_messages[channel_name] if it doesn't exist
|
||||
if channel_name not in globals.all_messages:
|
||||
globals.all_messages[channel_name] = []
|
||||
# Initialize ui_state.all_messages[channel_name] if it doesn't exist
|
||||
if channel_name not in ui_state.all_messages:
|
||||
ui_state.all_messages[channel_name] = []
|
||||
|
||||
return globals.channel_list
|
||||
return ui_state.channel_list
|
||||
|
||||
|
||||
def get_node_list():
|
||||
if globals.interface.nodes:
|
||||
my_node_num = globals.myNodeNum
|
||||
if interface_state.interface.nodes:
|
||||
my_node_num = interface_state.myNodeNum
|
||||
|
||||
def node_sort(node):
|
||||
if config.node_sort == "lastHeard":
|
||||
@@ -51,7 +52,7 @@ def get_node_list():
|
||||
else:
|
||||
return node
|
||||
|
||||
sorted_nodes = sorted(globals.interface.nodes.values(), key=node_sort)
|
||||
sorted_nodes = sorted(interface_state.interface.nodes.values(), key=node_sort)
|
||||
|
||||
# Move favorite nodes to the beginning
|
||||
sorted_nodes = sorted(
|
||||
@@ -68,14 +69,14 @@ def get_node_list():
|
||||
|
||||
def refresh_node_list():
|
||||
new_node_list = get_node_list()
|
||||
if new_node_list != globals.node_list:
|
||||
globals.node_list = new_node_list
|
||||
if new_node_list != ui_state.node_list:
|
||||
ui_state.node_list = new_node_list
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def get_nodeNum():
|
||||
myinfo = globals.interface.getMyNodeInfo()
|
||||
myinfo = interface_state.interface.getMyNodeInfo()
|
||||
myNodeNum = myinfo["num"]
|
||||
return myNodeNum
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "contact"
|
||||
version = "1.3.6"
|
||||
version = "1.3.10"
|
||||
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
|
||||
authors = [
|
||||
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
|
||||
|
||||
Reference in New Issue
Block a user