Compare commits

..

5 Commits

Author SHA1 Message Date
pdxlocations
478f017de1 bump version 2025-05-18 14:43:47 -07:00
pdxlocations
c96c4edb01 Add Arrows to Main UI (#177)
* init

* convert globals to dataclass

* move lock to app state

* Almost working changes

* more almost working changes

* so close

* mostly working changes

* closer changes

* I think it works!

* working changes

* hack fix

* Merge branch 'main' into refactor-chat-ui

* clean-up
2025-05-18 12:28:03 -07:00
pdxlocations
cc416476f5 Merge pull request #174 from rfschmid/patch-1 2025-04-25 21:20:28 -07:00
Russell Schmidt
7fc1cbc3a9 Fix error "No module named 'ui.ui_state'"
Was unable to run locally at tips due to an import not including the package name.
2025-04-23 17:02:47 -05:00
pdxlocations
78f0775ad5 Convert Globals to Class (#173)
* init

* convert globals to dataclass

* move lock to app state
2025-04-19 21:37:54 -07:00
11 changed files with 453 additions and 340 deletions

View File

@@ -24,7 +24,6 @@ import traceback
from pubsub import pub
# Local application
import contact.globals as globals
import contact.ui.default_config as config
from contact.message_handlers.rx_handler import on_receive
from contact.settings import set_region
@@ -36,7 +35,7 @@ from contact.utilities.db_handler import init_nodedb, load_messages_from_db
from contact.utilities.input_handlers import get_list_input
from contact.utilities.interfaces import initialize_interface
from contact.utilities.utils import get_channels, get_nodeNum, get_node_list
from contact.utilities.singleton import ui_state, interface_state, app_state
# ------------------------------------------------------------------------------
# Environment & Logging Setup
@@ -52,7 +51,7 @@ logging.basicConfig(
filename=config.log_file_path, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
globals.lock = threading.Lock()
app_state.lock = threading.Lock()
# ------------------------------------------------------------------------------
# Main Program Logic
@@ -61,19 +60,19 @@ globals.lock = threading.Lock()
def initialize_globals(args) -> None:
"""Initializes interface and shared globals."""
globals.interface = initialize_interface(args)
interface_state.interface = initialize_interface(args)
# Prompt for region if unset
if globals.interface.localNode.localConfig.lora.region == 0:
if interface_state.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(globals.interface)
globals.interface.close()
globals.interface = initialize_interface(args)
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
globals.myNodeNum = get_nodeNum()
globals.channel_list = get_channels()
globals.node_list = get_node_list()
interface_state.myNodeNum = get_nodeNum()
ui_state.channel_list = get_channels()
ui_state.node_list = get_node_list()
pub.subscribe(on_receive, "meshtastic.receive")
init_nodedb()
@@ -96,7 +95,7 @@ def main(stdscr: curses.window) -> None:
return
logging.info("Initializing interface...")
with globals.lock:
with app_state.lock:
initialize_globals(args)
logging.info("Starting main UI")

View File

@@ -1,13 +0,0 @@
interface = None
lock = None
display_log = False
all_messages = {}
channel_list = []
notifications = []
packet_buffer = []
node_list = []
myNodeNum = 0
selected_channel = 0
selected_message = 0
selected_node = 0
current_window = 0

View File

@@ -18,7 +18,8 @@ from contact.utilities.db_handler import (
update_node_info_in_db,
)
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state, app_state
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
@@ -29,14 +30,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
packet: The received Meshtastic packet as a dictionary.
interface: The Meshtastic interface instance that received the packet.
"""
with globals.lock:
with app_state.lock:
# Update packet log
globals.packet_buffer.append(packet)
if len(globals.packet_buffer) > 20:
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
# Trim buffer to 20 packets
globals.packet_buffer = globals.packet_buffer[-20:]
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if globals.display_log:
if ui_state.display_log:
draw_packetlog_win()
try:
if "decoded" not in packet:
@@ -63,19 +64,19 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
else:
channel_number = 0
if packet["to"] == globals.myNodeNum:
if packet["from"] in globals.channel_list:
if packet["to"] == interface_state.myNodeNum:
if packet["from"] in ui_state.channel_list:
pass
else:
globals.channel_list.append(packet["from"])
if packet["from"] not in globals.all_messages:
globals.all_messages[packet["from"]] = []
ui_state.channel_list.append(packet["from"])
if packet["from"] not in ui_state.all_messages:
ui_state.all_messages[packet["from"]] = []
update_node_info_in_db(packet["from"], chat_archived=False)
refresh_channels = True
channel_number = globals.channel_list.index(packet["from"])
channel_number = ui_state.channel_list.index(packet["from"])
if globals.channel_list[channel_number] != globals.channel_list[globals.selected_channel]:
if ui_state.channel_list[channel_number] != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
@@ -85,15 +86,15 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = globals.all_messages[globals.channel_list[channel_number]]
channel_messages = ui_state.all_messages[ui_state.channel_list[channel_number]]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
@@ -107,9 +108,9 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
globals.all_messages[globals.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
ui_state.all_messages[ui_state.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
globals.all_messages[globals.channel_list[channel_number]].append(
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string} ", message_string)
)
@@ -118,7 +119,7 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
if refresh_messages:
draw_messages_window(True)
save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string)
save_message_to_db(ui_state.channel_list[channel_number], message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")

View File

@@ -13,7 +13,8 @@ from contact.utilities.db_handler import (
update_node_info_in_db,
)
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
@@ -31,12 +32,12 @@ def onAckNak(packet: Dict[str, Any]) -> None:
return
acknak = ack_naks.pop(request)
message = globals.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
message = ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
confirm_string = " "
ack_type = None
if packet["decoded"]["routing"]["errorReason"] == "NONE":
if packet["from"] == globals.myNodeNum: # Ack "from" ourself means implicit ACK
if packet["from"] == interface_state.myNodeNum: # Ack "from" ourself means implicit ACK
confirm_string = config.ack_implicit_str
ack_type = "Implicit"
else:
@@ -46,15 +47,15 @@ def onAckNak(packet: Dict[str, Any]) -> None:
confirm_string = config.nak_str
ack_type = "Nak"
globals.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
config.sent_message_prefix + confirm_string + ": ",
message,
)
update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type)
channel_number = globals.channel_list.index(acknak["channel"])
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
channel_number = ui_state.channel_list.index(acknak["channel"])
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
draw_messages_window()
@@ -137,16 +138,16 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
msg_str += route_str + "\n" # Print the route back to us
if packet["from"] not in globals.channel_list:
globals.channel_list.append(packet["from"])
if packet["from"] not in ui_state.channel_list:
ui_state.channel_list.append(packet["from"])
refresh_channels = True
if is_chat_archived(packet["from"]):
update_node_info_in_db(packet["from"], chat_archived=False)
channel_number = globals.channel_list.index(packet["from"])
channel_number = ui_state.channel_list.index(packet["from"])
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
refresh_messages = True
else:
add_notification(channel_number)
@@ -154,9 +155,9 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
globals.all_messages[globals.channel_list[channel_number]].append(
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string}", msg_str)
)
@@ -164,23 +165,23 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(globals.channel_list[channel_number], packet["from"], msg_str)
save_message_to_db(ui_state.channel_list[channel_number], packet["from"], msg_str)
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
"""
Sends a chat message using the selected channel.
"""
myid = globals.myNodeNum
myid = interface_state.myNodeNum
send_on_channel = 0
channel_id = globals.channel_list[channel]
channel_id = ui_state.channel_list[channel]
if isinstance(channel_id, int):
send_on_channel = 0
destination = channel_id
elif isinstance(channel_id, str):
send_on_channel = channel
sent_message_data = globals.interface.sendText(
sent_message_data = interface_state.interface.sendText(
text=message,
destinationId=destination,
wantAck=True,
@@ -190,15 +191,15 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
)
# Add sent message to the messages dictionary
if channel_id not in globals.all_messages:
globals.all_messages[channel_id] = []
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Handle timestamp logic
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = globals.all_messages[channel_id]
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
@@ -212,15 +213,15 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
globals.all_messages[channel_id].append((f"-- {current_hour} --", ""))
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
globals.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
ui_state.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
timestamp = save_message_to_db(channel_id, myid, message)
ack_naks[sent_message_data.id] = {
"channel": channel_id,
"messageIndex": len(globals.all_messages[channel_id]) - 1,
"messageIndex": len(ui_state.all_messages[channel_id]) - 1,
"timestamp": timestamp,
}
@@ -230,9 +231,9 @@ def send_traceroute() -> None:
Sends a RouteDiscovery protobuf to the selected node.
"""
r = mesh_pb2.RouteDiscovery()
globals.interface.sendData(
interface_state.interface.sendData(
r,
destinationId=globals.node_list[globals.selected_node],
destinationId=ui_state.node_list[ui_state.selected_node],
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=on_response_traceroute,

View File

@@ -2,7 +2,7 @@ import curses
import textwrap
import logging
import traceback
from typing import Union, List
from typing import Union
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
from contact.settings import settings_menu
@@ -12,7 +12,8 @@ from contact.utilities.db_handler import get_name_from_database, update_node_inf
from contact.utilities.input_handlers import get_list_input
import contact.ui.default_config as config
import contact.ui.dialog
import contact.globals as globals
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines
from contact.utilities.singleton import ui_state, interface_state
def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
@@ -84,6 +85,7 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
draw_channel_list()
draw_messages_window(True)
draw_node_list()
except:
# Resize events can come faster than we can re-draw, which can cause a curses error.
# In this case we'll see another curses.KEY_RESIZE in our key handler and draw again later.
@@ -106,129 +108,114 @@ def main_ui(stdscr: curses.window) -> None:
# draw_debug(f"Keypress: {char}")
if char == curses.KEY_UP:
if globals.current_window == 0:
if ui_state.current_window == 0:
scroll_channels(-1)
elif globals.current_window == 1:
elif ui_state.current_window == 1:
scroll_messages(-1)
elif globals.current_window == 2:
elif ui_state.current_window == 2:
scroll_nodes(-1)
elif char == curses.KEY_DOWN:
if globals.current_window == 0:
if ui_state.current_window == 0:
scroll_channels(1)
elif globals.current_window == 1:
elif ui_state.current_window == 1:
scroll_messages(1)
elif globals.current_window == 2:
elif ui_state.current_window == 2:
scroll_nodes(1)
elif char == curses.KEY_HOME:
if globals.current_window == 0:
if ui_state.current_window == 0:
select_channel(0)
elif globals.current_window == 1:
globals.selected_message = 0
elif ui_state.current_window == 1:
ui_state.selected_message = 0
refresh_pad(1)
elif globals.current_window == 2:
elif ui_state.current_window == 2:
select_node(0)
elif char == curses.KEY_END:
if globals.current_window == 0:
select_channel(len(globals.channel_list) - 1)
elif globals.current_window == 1:
if ui_state.current_window == 0:
select_channel(len(ui_state.channel_list) - 1)
elif ui_state.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
globals.selected_message = max(msg_line_count - get_msg_window_lines(), 0)
ui_state.selected_message = max(msg_line_count - get_msg_window_lines(messages_win, packetlog_win), 0)
refresh_pad(1)
elif globals.current_window == 2:
select_node(len(globals.node_list) - 1)
elif ui_state.current_window == 2:
select_node(len(ui_state.node_list) - 1)
elif char == curses.KEY_PPAGE:
if globals.current_window == 0:
if ui_state.current_window == 0:
select_channel(
globals.selected_channel - (channel_win.getmaxyx()[0] - 2)
ui_state.selected_channel - (channel_win.getmaxyx()[0] - 2)
) # select_channel will bounds check for us
elif globals.current_window == 1:
globals.selected_message = max(globals.selected_message - get_msg_window_lines(), 0)
elif ui_state.current_window == 1:
ui_state.selected_message = max(
ui_state.selected_message - get_msg_window_lines(messages_win, packetlog_win), 0
)
refresh_pad(1)
elif globals.current_window == 2:
elif ui_state.current_window == 2:
select_node(
globals.selected_node - (nodes_win.getmaxyx()[0] - 2)
ui_state.selected_node - (nodes_win.getmaxyx()[0] - 2)
) # select_node will bounds check for us
elif char == curses.KEY_NPAGE:
if globals.current_window == 0:
if ui_state.current_window == 0:
select_channel(
globals.selected_channel + (channel_win.getmaxyx()[0] - 2)
ui_state.selected_channel + (channel_win.getmaxyx()[0] - 2)
) # select_channel will bounds check for us
elif globals.current_window == 1:
elif ui_state.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
globals.selected_message = min(
globals.selected_message + get_msg_window_lines(), msg_line_count - get_msg_window_lines()
ui_state.selected_message = min(
ui_state.selected_message + get_msg_window_lines(messages_win, packetlog_win),
msg_line_count - get_msg_window_lines(messages_win, packetlog_win),
)
refresh_pad(1)
elif globals.current_window == 2:
elif ui_state.current_window == 2:
select_node(
globals.selected_node + (nodes_win.getmaxyx()[0] - 2)
ui_state.selected_node + (nodes_win.getmaxyx()[0] - 2)
) # select_node will bounds check for us
elif char == curses.KEY_LEFT or char == curses.KEY_RIGHT:
delta = -1 if char == curses.KEY_LEFT else 1
old_window = globals.current_window
globals.current_window = (globals.current_window + delta) % 3
old_window = ui_state.current_window
ui_state.current_window = (ui_state.current_window + delta) % 3
if old_window == 0:
channel_win.attrset(get_color("window_frame"))
channel_win.box()
channel_win.refresh()
highlight_line(False, 0, globals.selected_channel)
refresh_pad(0)
setup_arrows(channel_win, channel_pad)
channel_win.refresh()
if old_window == 1:
messages_win.attrset(get_color("window_frame"))
messages_win.box()
messages_win.refresh()
refresh_pad(1)
setup_arrows(messages_win, messages_pad)
messages_win.refresh()
elif old_window == 2:
draw_function_win()
nodes_win.attrset(get_color("window_frame"))
nodes_win.box()
nodes_win.refresh()
highlight_line(False, 2, globals.selected_node)
refresh_pad(2)
setup_arrows(nodes_win, nodes_pad)
nodes_win.refresh()
if globals.current_window == 0:
if ui_state.current_window == 0:
channel_win.attrset(get_color("window_frame_selected"))
channel_win.box()
channel_win.attrset(get_color("window_frame"))
channel_win.refresh()
highlight_line(True, 0, globals.selected_channel)
refresh_pad(0)
setup_arrows(channel_win, channel_pad)
channel_win.refresh()
elif globals.current_window == 1:
elif ui_state.current_window == 1:
messages_win.attrset(get_color("window_frame_selected"))
messages_win.box()
messages_win.attrset(get_color("window_frame"))
messages_win.refresh()
refresh_pad(1)
setup_arrows(messages_win, messages_pad)
messages_win.refresh()
elif globals.current_window == 2:
elif ui_state.current_window == 2:
draw_function_win()
nodes_win.attrset(get_color("window_frame_selected"))
nodes_win.box()
nodes_win.attrset(get_color("window_frame"))
nodes_win.refresh()
highlight_line(True, 2, globals.selected_node)
refresh_pad(2)
setup_arrows(nodes_win, nodes_pad)
nodes_win.refresh()
# Check for Esc
elif char == chr(27):
@@ -247,20 +234,20 @@ def main_ui(stdscr: curses.window) -> None:
handle_resize(stdscr, False)
elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)):
if globals.current_window == 2:
node_list = globals.node_list
if node_list[globals.selected_node] not in globals.channel_list:
globals.channel_list.append(node_list[globals.selected_node])
if node_list[globals.selected_node] not in globals.all_messages:
globals.all_messages[node_list[globals.selected_node]] = []
if ui_state.current_window == 2:
node_list = ui_state.node_list
if node_list[ui_state.selected_node] not in ui_state.channel_list:
ui_state.channel_list.append(node_list[ui_state.selected_node])
if node_list[ui_state.selected_node] not in ui_state.all_messages:
ui_state.all_messages[node_list[ui_state.selected_node]] = []
globals.selected_channel = globals.channel_list.index(node_list[globals.selected_node])
ui_state.selected_channel = ui_state.channel_list.index(node_list[ui_state.selected_node])
if is_chat_archived(globals.channel_list[globals.selected_channel]):
update_node_info_in_db(globals.channel_list[globals.selected_channel], chat_archived=False)
if is_chat_archived(ui_state.channel_list[ui_state.selected_channel]):
update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=False)
globals.selected_node = 0
globals.current_window = 0
ui_state.selected_node = 0
ui_state.current_window = 0
draw_node_list()
draw_channel_list()
@@ -268,7 +255,7 @@ def main_ui(stdscr: curses.window) -> None:
elif len(input_text) > 0:
# Enter key pressed, send user input as message
send_message(input_text, channel=globals.selected_channel)
send_message(input_text, channel=ui_state.selected_channel)
draw_messages_window(True)
# Clear entry window and reset input text
@@ -286,18 +273,18 @@ def main_ui(stdscr: curses.window) -> None:
elif char == "`": # ` Launch the settings interface
curses.curs_set(0)
settings_menu(stdscr, globals.interface)
settings_menu(stdscr, interface_state.interface)
curses.curs_set(1)
refresh_node_list()
handle_resize(stdscr, False)
elif char == chr(16):
# Display packet log
if globals.display_log is False:
globals.display_log = True
if ui_state.display_log is False:
ui_state.display_log = True
draw_messages_window(True)
else:
globals.display_log = False
ui_state.display_log = False
packetlog_win.erase()
draw_messages_window(True)
@@ -307,39 +294,39 @@ def main_ui(stdscr: curses.window) -> None:
# ^D
elif char == chr(4):
if globals.current_window == 0:
if isinstance(globals.channel_list[globals.selected_channel], int):
update_node_info_in_db(globals.channel_list[globals.selected_channel], chat_archived=True)
if ui_state.current_window == 0:
if isinstance(ui_state.channel_list[ui_state.selected_channel], int):
update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=True)
# Shift notifications up to account for deleted item
for i in range(len(globals.notifications)):
if globals.notifications[i] > globals.selected_channel:
globals.notifications[i] -= 1
for i in range(len(ui_state.notifications)):
if ui_state.notifications[i] > ui_state.selected_channel:
ui_state.notifications[i] -= 1
del globals.channel_list[globals.selected_channel]
globals.selected_channel = min(globals.selected_channel, len(globals.channel_list) - 1)
select_channel(globals.selected_channel)
del ui_state.channel_list[ui_state.selected_channel]
ui_state.selected_channel = min(ui_state.selected_channel, len(ui_state.channel_list) - 1)
select_channel(ui_state.selected_channel)
draw_channel_list()
draw_messages_window()
if globals.current_window == 2:
if ui_state.current_window == 2:
curses.curs_set(0)
confirmation = get_list_input(
f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from nodedb?",
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from nodedb?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
globals.interface.localNode.removeNode(globals.node_list[globals.selected_node])
interface_state.interface.localNode.removeNode(ui_state.node_list[ui_state.selected_node])
# Directly modifying the interface from client code - good? Bad? If it's stupid but it works, it's not supid?
del globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
del interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
# Convert to "!hex" representation that interface.nodes uses
hexid = f"!{hex(globals.node_list[globals.selected_node])[2:]}"
del globals.interface.nodes[hexid]
hexid = f"!{hex(ui_state.node_list[ui_state.selected_node])[2:]}"
del interface_state.interface.nodes[hexid]
globals.node_list.pop(globals.selected_node)
ui_state.node_list.pop(ui_state.selected_node)
draw_messages_window()
draw_node_list()
@@ -350,68 +337,76 @@ def main_ui(stdscr: curses.window) -> None:
# ^/
elif char == chr(31):
if globals.current_window == 2 or globals.current_window == 0:
search(globals.current_window)
if ui_state.current_window == 2 or ui_state.current_window == 0:
search(ui_state.current_window)
# ^F
elif char == chr(6):
if globals.current_window == 2:
selectedNode = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
if ui_state.current_window == 2:
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
curses.curs_set(0)
if "isFavorite" not in selectedNode or selectedNode["isFavorite"] == False:
confirmation = get_list_input(
f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Favorite?",
f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Favorite?",
None,
["Yes", "No"],
)
if confirmation == "Yes":
globals.interface.localNode.setFavorite(globals.node_list[globals.selected_node])
interface_state.interface.localNode.setFavorite(ui_state.node_list[ui_state.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]["isFavorite"] = True
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isFavorite"
] = True
refresh_node_list()
else:
confirmation = get_list_input(
f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Favorites?",
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Favorites?",
None,
["Yes", "No"],
)
if confirmation == "Yes":
globals.interface.localNode.removeFavorite(globals.node_list[globals.selected_node])
interface_state.interface.localNode.removeFavorite(ui_state.node_list[ui_state.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]["isFavorite"] = False
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isFavorite"
] = False
refresh_node_list()
handle_resize(stdscr, False)
elif char == chr(7):
if globals.current_window == 2:
selectedNode = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
if ui_state.current_window == 2:
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
curses.curs_set(0)
if "isIgnored" not in selectedNode or selectedNode["isIgnored"] == False:
confirmation = get_list_input(
f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Ignored?",
f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Ignored?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
globals.interface.localNode.setIgnored(globals.node_list[globals.selected_node])
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]["isIgnored"] = True
interface_state.interface.localNode.setIgnored(ui_state.node_list[ui_state.selected_node])
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isIgnored"
] = True
else:
confirmation = get_list_input(
f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Ignored?",
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Ignored?",
"No",
["Yes", "No"],
)
if confirmation == "Yes":
globals.interface.localNode.removeIgnored(globals.node_list[globals.selected_node])
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]["isIgnored"] = False
interface_state.interface.localNode.removeIgnored(ui_state.node_list[ui_state.selected_node])
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isIgnored"
] = False
handle_resize(stdscr, False)
@@ -425,13 +420,12 @@ def main_ui(stdscr: curses.window) -> None:
def draw_channel_list() -> None:
channel_pad.erase()
win_height, win_width = channel_win.getmaxyx()
start_index = max(0, globals.selected_channel - (win_height - 3)) # Leave room for borders
win_width = channel_win.getmaxyx()[1]
channel_pad.resize(len(globals.all_messages), channel_win.getmaxyx()[1])
channel_pad.resize(len(ui_state.all_messages), channel_win.getmaxyx()[1])
idx = 0
for channel in globals.channel_list:
for channel in ui_state.channel_list:
# Convert node number to long name if it's an integer
if isinstance(channel, int):
if is_chat_archived(channel):
@@ -442,7 +436,7 @@ def draw_channel_list() -> None:
channel = channel_name
# Determine whether to add the notification
notification = " " + config.notification_symbol if idx in globals.notifications else ""
notification = " " + config.notification_symbol if idx in ui_state.notifications else ""
# Truncate the channel name if it's too long to fit in the window
truncated_channel = (
@@ -450,20 +444,22 @@ def draw_channel_list() -> None:
).ljust(win_width - 3)
color = get_color("channel_list")
if idx == globals.selected_channel:
if globals.current_window == 0:
if idx == ui_state.selected_channel:
if ui_state.current_window == 0:
color = get_color("channel_list", reverse=True)
remove_notification(globals.selected_channel)
remove_notification(ui_state.selected_channel)
else:
color = get_color("channel_selected")
channel_pad.addstr(idx, 1, truncated_channel, color)
idx += 1
channel_win.attrset(
get_color("window_frame_selected") if globals.current_window == 0 else get_color("window_frame")
get_color("window_frame_selected") if ui_state.current_window == 0 else get_color("window_frame")
)
channel_win.box()
channel_win.attrset((get_color("window_frame")))
draw_main_arrows(channel_win, len(ui_state.channel_list), window=0)
channel_win.refresh()
refresh_pad(0)
@@ -473,10 +469,10 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
"""Update the messages window based on the selected channel and scroll position."""
messages_pad.erase()
channel = globals.channel_list[globals.selected_channel]
channel = ui_state.channel_list[ui_state.selected_channel]
if channel in globals.all_messages:
messages = globals.all_messages[channel]
if channel in ui_state.all_messages:
messages = ui_state.all_messages[channel]
msg_line_count = 0
@@ -499,16 +495,28 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
row += 1
messages_win.attrset(
get_color("window_frame_selected") if globals.current_window == 1 else get_color("window_frame")
get_color("window_frame_selected") if ui_state.current_window == 1 else get_color("window_frame")
)
messages_win.box()
messages_win.attrset(get_color("window_frame"))
messages_win.refresh()
visible_lines = get_msg_window_lines(messages_win, packetlog_win)
if scroll_to_bottom:
globals.selected_message = max(msg_line_count - get_msg_window_lines(), 0)
ui_state.selected_message = max(msg_line_count - visible_lines, 0)
ui_state.start_index[1] = max(msg_line_count - visible_lines, 0)
pass
else:
globals.selected_message = max(min(globals.selected_message, msg_line_count - get_msg_window_lines()), 0)
ui_state.selected_message = max(min(ui_state.selected_message, msg_line_count - visible_lines), 0)
draw_main_arrows(
messages_win,
msg_line_count,
window=1,
log_height=packetlog_win.getmaxyx()[0],
)
messages_win.refresh()
refresh_pad(1)
@@ -526,13 +534,13 @@ def draw_node_list() -> None:
try:
nodes_pad.erase()
box_width = nodes_win.getmaxyx()[1]
nodes_pad.resize(len(globals.node_list) + 1, box_width)
nodes_pad.resize(len(ui_state.node_list) + 1, box_width)
except Exception as e:
logging.error(f"Error Drawing Nodes List: {e}")
logging.error("Traceback: %s", traceback.format_exc())
for i, node_num in enumerate(globals.node_list):
node = globals.interface.nodesByNum[node_num]
for i, node_num in enumerate(ui_state.node_list):
node = interface_state.interface.nodesByNum[node_num]
secure = "user" in node and "publicKey" in node["user"] and node["user"]["publicKey"]
node_str = f"{'🔐' if secure else '🔓'} {get_name_from_database(node_num, 'long')}".ljust(box_width - 2)[
: box_width - 2
@@ -543,12 +551,16 @@ def draw_node_list() -> None:
if "isIgnored" in node and node["isIgnored"]:
color = "node_ignored"
nodes_pad.addstr(
i, 1, node_str, get_color(color, reverse=globals.selected_node == i and globals.current_window == 2)
i, 1, node_str, get_color(color, reverse=ui_state.selected_node == i and ui_state.current_window == 2)
)
nodes_win.attrset(get_color("window_frame_selected") if globals.current_window == 2 else get_color("window_frame"))
nodes_win.attrset(
get_color("window_frame_selected") if ui_state.current_window == 2 else get_color("window_frame")
)
nodes_win.box()
nodes_win.attrset(get_color("window_frame"))
draw_main_arrows(nodes_win, len(ui_state.node_list), window=2)
nodes_win.refresh()
refresh_pad(2)
@@ -560,57 +572,91 @@ def draw_node_list() -> None:
def select_channel(idx: int) -> None:
old_selected_channel = globals.selected_channel
globals.selected_channel = max(0, min(idx, len(globals.channel_list) - 1))
old_selected_channel = ui_state.selected_channel
ui_state.selected_channel = max(0, min(idx, len(ui_state.channel_list) - 1))
draw_messages_window(True)
# For now just re-draw channel list when clearing notifications, we can probably make this more efficient
if globals.selected_channel in globals.notifications:
remove_notification(globals.selected_channel)
if ui_state.selected_channel in ui_state.notifications:
remove_notification(ui_state.selected_channel)
draw_channel_list()
return
highlight_line(False, 0, old_selected_channel)
highlight_line(True, 0, globals.selected_channel)
refresh_pad(0)
move_main_highlight(
old_idx=old_selected_channel,
new_idx=ui_state.selected_channel,
options=ui_state.channel_list,
menu_win=channel_win,
menu_pad=channel_pad,
ui_state=ui_state,
)
def scroll_channels(direction: int) -> None:
new_selected_channel = globals.selected_channel + direction
new_selected_channel = ui_state.selected_channel + direction
if new_selected_channel < 0:
new_selected_channel = len(globals.channel_list) - 1
elif new_selected_channel >= len(globals.channel_list):
new_selected_channel = len(ui_state.channel_list) - 1
elif new_selected_channel >= len(ui_state.channel_list):
new_selected_channel = 0
select_channel(new_selected_channel)
def scroll_messages(direction: int) -> None:
globals.selected_message += direction
ui_state.selected_message += direction
msg_line_count = messages_pad.getmaxyx()[0]
globals.selected_message = max(0, min(globals.selected_message, msg_line_count - get_msg_window_lines()))
ui_state.selected_message = max(
0, min(ui_state.selected_message, msg_line_count - get_msg_window_lines(messages_win, packetlog_win))
)
max_index = msg_line_count - 1
visible_height = get_msg_window_lines(messages_win, packetlog_win)
if ui_state.selected_message < ui_state.start_index[ui_state.current_window]: # Moving above the visible area
ui_state.start_index[ui_state.current_window] = ui_state.selected_message
elif ui_state.selected_message >= ui_state.start_index[ui_state.current_window]: # Moving below the visible area
ui_state.start_index[ui_state.current_window] = ui_state.selected_message
# Ensure start_index is within bounds
ui_state.start_index[ui_state.current_window] = max(
0, min(ui_state.start_index[ui_state.current_window], max_index - visible_height + 1)
)
draw_main_arrows(
messages_win,
msg_line_count,
ui_state.current_window,
log_height=packetlog_win.getmaxyx()[0],
)
messages_win.refresh()
refresh_pad(1)
def select_node(idx: int) -> None:
old_selected_node = globals.selected_node
globals.selected_node = max(0, min(idx, len(globals.node_list) - 1))
old_selected_node = ui_state.selected_node
ui_state.selected_node = max(0, min(idx, len(ui_state.node_list) - 1))
highlight_line(False, 2, old_selected_node)
highlight_line(True, 2, globals.selected_node)
refresh_pad(2)
move_main_highlight(
old_idx=old_selected_node,
new_idx=ui_state.selected_node,
options=ui_state.node_list,
menu_win=nodes_win,
menu_pad=nodes_pad,
ui_state=ui_state,
)
draw_function_win()
def scroll_nodes(direction: int) -> None:
new_selected_node = globals.selected_node + direction
new_selected_node = ui_state.selected_node + direction
if new_selected_node < 0:
new_selected_node = len(globals.node_list) - 1
elif new_selected_node >= len(globals.node_list):
new_selected_node = len(ui_state.node_list) - 1
elif new_selected_node >= len(ui_state.node_list):
new_selected_node = 0
select_node(new_selected_node)
@@ -621,7 +667,7 @@ def draw_packetlog_win() -> None:
columns = [10, 10, 15, 30]
span = 0
if globals.display_log:
if ui_state.display_log:
packetlog_win.erase()
height, width = packetlog_win.getmaxyx()
@@ -634,7 +680,7 @@ def draw_packetlog_win() -> None:
1, 1, headers[: width - 2], get_color("log_header", underline=True)
) # Truncate headers if they exceed window width
for i, packet in enumerate(reversed(globals.packet_buffer)):
for i, packet in enumerate(reversed(ui_state.packet_buffer)):
if i >= height - 3: # Skip if exceeds the window height
break
@@ -670,11 +716,11 @@ def draw_packetlog_win() -> None:
def search(win: int) -> None:
start_idx = globals.selected_node
start_idx = ui_state.selected_node
select_func = select_node
if win == 0:
start_idx = globals.selected_channel
start_idx = ui_state.selected_channel
select_func = select_channel
search_text = ""
@@ -687,7 +733,7 @@ def search(win: int) -> None:
if char in (chr(27), chr(curses.KEY_ENTER), chr(10), chr(13)):
break
elif char == "\t":
start_idx = globals.selected_node + 1 if win == 2 else globals.selected_channel + 1
start_idx = ui_state.selected_node + 1 if win == 2 else ui_state.selected_channel + 1
elif char in (curses.KEY_BACKSPACE, chr(127)):
if search_text:
search_text = search_text[:-1]
@@ -702,7 +748,7 @@ def search(win: int) -> None:
search_text_caseless = search_text.casefold()
l = globals.node_list if win == 2 else globals.channel_list
l = ui_state.node_list if win == 2 else ui_state.channel_list
for i, n in enumerate(l[start_idx:] + l[:start_idx]):
if (
isinstance(n, int)
@@ -720,7 +766,7 @@ def search(win: int) -> None:
def draw_node_details() -> None:
node = None
try:
node = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
except KeyError:
return
@@ -737,7 +783,7 @@ def draw_node_details() -> None:
f" | {node['user']['role']}" if "user" in node and "role" in node["user"] else "",
]
if globals.node_list[globals.selected_node] == globals.myNodeNum:
if ui_state.node_list[ui_state.selected_node] == interface_state.myNodeNum:
node_details_list.extend(
[
(
@@ -801,17 +847,12 @@ def draw_help() -> None:
def draw_function_win() -> None:
if globals.current_window == 2:
if ui_state.current_window == 2:
draw_node_details()
else:
draw_help()
def get_msg_window_lines() -> None:
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if globals.display_log else 0
return messages_win.getmaxyx()[0] - 2 - packetlog_height
def refresh_pad(window: int) -> None:
win_height = channel_win.getmaxyx()[0]
@@ -819,11 +860,11 @@ def refresh_pad(window: int) -> None:
if window == 1:
pad = messages_pad
box = messages_win
lines = get_msg_window_lines()
selected_item = globals.selected_message
start_index = globals.selected_message
lines = get_msg_window_lines(messages_win, packetlog_win)
selected_item = ui_state.selected_message
start_index = ui_state.selected_message
if globals.display_log:
if ui_state.display_log:
packetlog_win.box()
packetlog_win.refresh()
@@ -831,14 +872,14 @@ def refresh_pad(window: int) -> None:
pad = nodes_pad
box = nodes_win
lines = box.getmaxyx()[0] - 2
selected_item = globals.selected_node
selected_item = ui_state.selected_node
start_index = max(0, selected_item - (win_height - 3)) # Leave room for borders
else:
pad = channel_pad
box = channel_win
lines = box.getmaxyx()[0] - 2
selected_item = globals.selected_channel
selected_item = ui_state.selected_channel
start_index = max(0, selected_item - (win_height - 3)) # Leave room for borders
pad.refresh(
@@ -847,72 +888,18 @@ def refresh_pad(window: int) -> None:
box.getbegyx()[0] + 1,
box.getbegyx()[1] + 1,
box.getbegyx()[0] + lines,
box.getbegyx()[1] + box.getmaxyx()[1] - 2,
box.getbegyx()[1] + box.getmaxyx()[1] - 3,
)
def highlight_line(highlight: bool, window: int, line: int) -> None:
pad = nodes_pad
color = get_color("node_list")
select_len = nodes_win.getmaxyx()[1] - 2
if window == 2:
node_num = globals.node_list[line]
node = globals.interface.nodesByNum[node_num]
if "isFavorite" in node and node["isFavorite"]:
color = get_color("node_favorite")
if "isIgnored" in node and node["isIgnored"]:
color = get_color("node_ignored")
if window == 0:
pad = channel_pad
color = get_color(
"channel_selected" if (line == globals.selected_channel and highlight == False) else "channel_list"
)
select_len = channel_win.getmaxyx()[1] - 2
pad.chgat(line, 1, select_len, color | curses.A_REVERSE if highlight else color)
def setup_arrows(win: curses.window, pad: curses.window) -> None:
margin = 8
height, width = win.getmaxyx()
start_index = [0]
pad_height = pad.getmaxyx()[0]
draw_chat_arrows(win=win, visible_height=height - 3, max_index=pad_height, start_index=start_index)
def draw_chat_arrows(win: object, visible_height: int, max_index: int, start_index: List[int]) -> None:
mi = max_index
height, width = win.getmaxyx()
win.addstr(1, width - 2, "", get_color("settings_default"))
win.addstr(visible_height + 1, width - 2, "", get_color("settings_default"))
# if visible_height < mi:
# if start_index[-1] > 0:
# win.addstr(1, width - 2, "▲", get_color("settings_default"))
# else:
# win.addstr(1, 1, " ", get_color("settings_default"))
# if mi - start_index[-1] >= visible_height + 1:
# win.addstr(visible_height + 1, 1, "▼", get_color("settings_default"))
# else:
# win.addstr(visible_height + 1, 1, " ", get_color("settings_default"))
def add_notification(channel_number: int) -> None:
if channel_number not in globals.notifications:
globals.notifications.append(channel_number)
if channel_number not in ui_state.notifications:
ui_state.notifications.append(channel_number)
def remove_notification(channel_number: int) -> None:
if channel_number in globals.notifications:
globals.notifications.remove(channel_number)
if channel_number in ui_state.notifications:
ui_state.notifications.remove(channel_number)
def draw_text_field(win: curses.window, text: str, color: int) -> None:

View File

@@ -3,6 +3,18 @@ import re
from contact.ui.colors import get_color
from contact.utilities.control_utils import transform_menu_path
from typing import Any, Optional, List, Dict
from contact.utilities.singleton import interface_state, ui_state
def get_node_color(node_index: int, reverse: bool = False):
node_num = ui_state.node_list[node_index]
node = interface_state.interface.nodesByNum.get(node_num, {})
if node.get("isFavorite"):
return get_color("node_favorite", reverse=reverse)
elif node.get("isIgnored"):
return get_color("node_ignored", reverse=reverse)
return get_color("settings_default", reverse=reverse)
# Aliases
Segment = tuple[str, str, bool, bool]
@@ -128,7 +140,6 @@ def draw_arrows(
win: object, visible_height: int, max_index: int, start_index: List[int], show_save_option: bool
) -> None:
# vh = visible_height + (1 if show_save_option else 0)
mi = max_index - (2 if show_save_option else 0)
if visible_height < mi:
@@ -316,3 +327,91 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
wrapped_lines.append(line_buffer)
return wrapped_lines
def move_main_highlight(
old_idx: int, new_idx, options: List[str], menu_win: curses.window, menu_pad: curses.window, ui_state: object
) -> None:
if old_idx == new_idx: # No-op
return
max_index = len(options) - 1
visible_height = menu_win.getmaxyx()[0] - 2
if new_idx < ui_state.start_index[ui_state.current_window]: # Moving above the visible area
ui_state.start_index[ui_state.current_window] = new_idx
elif new_idx >= ui_state.start_index[ui_state.current_window] + visible_height: # Moving below the visible area
ui_state.start_index[ui_state.current_window] = new_idx - visible_height + 1
# Ensure start_index is within bounds
ui_state.start_index[ui_state.current_window] = max(
0, min(ui_state.start_index[ui_state.current_window], max_index - visible_height + 1)
)
highlight_line(menu_win, menu_pad, old_idx, new_idx, visible_height)
if ui_state.current_window == 0: # hack to fix max_index
max_index += 1
draw_main_arrows(menu_win, max_index, window=ui_state.current_window)
menu_win.refresh()
def highlight_line(
menu_win: curses.window, menu_pad: curses.window, old_idx: int, new_idx: int, visible_height: int
) -> None:
if ui_state.current_window == 0:
color_old = (
get_color("channel_selected") if old_idx == ui_state.selected_channel else get_color("channel_list")
)
color_new = get_color("channel_list", reverse=True) if True else get_color("channel_list", reverse=True)
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, color_old)
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, color_new)
elif ui_state.current_window == 2:
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(old_idx))
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(new_idx, reverse=True))
menu_win.refresh()
# Refresh pad only if scrolling is needed
menu_pad.refresh(
ui_state.start_index[ui_state.current_window],
0,
menu_win.getbegyx()[0] + 1,
menu_win.getbegyx()[1] + 1,
menu_win.getbegyx()[0] + visible_height,
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 3,
)
def draw_main_arrows(win: object, max_index: int, window: int, **kwargs) -> None:
height, width = win.getmaxyx()
usable_height = height - 2
usable_width = width - 2
if window == 1 and ui_state.display_log:
if log_height := kwargs.get("log_height"):
usable_height -= log_height - 1
if usable_height < max_index:
if ui_state.start_index[window] > 0:
win.addstr(1, usable_width, "", get_color("settings_default"))
else:
win.addstr(1, usable_width, " ", get_color("settings_default"))
if max_index - ui_state.start_index[window] - 1 >= usable_height:
win.addstr(usable_height, usable_width, "", get_color("settings_default"))
else:
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
else:
win.addstr(1, usable_width, " ", get_color("settings_default"))
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
def get_msg_window_lines(messages_win, packetlog_win) -> None:
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if ui_state.display_log else 0
return messages_win.getmaxyx()[0] - 2 - packetlog_height

View File

@@ -1,11 +1,42 @@
from typing import Any, Union, List, Dict
from dataclasses import dataclass, field
@dataclass
class MenuState:
def __init__(self):
self.menu_index: List[int] = [] # Row we left the previous menus
self.start_index: List[int] = [0] # Row to start the menu if it doesn't all fit
self.selected_index: int = 0 # Selected Row
self.current_menu: Union[Dict[str, Any], List[Any], str, int] = {} # Contents of the current menu
self.menu_path: List[str] = [] # Menu Path
self.show_save_option: bool = False # Display 'Save'
menu_index: List[int] = field(default_factory=list)
start_index: List[int] = field(default_factory=lambda: [0])
selected_index: int = 0
current_menu: Union[Dict[str, Any], List[Any], str, int] = field(default_factory=dict)
menu_path: List[str] = field(default_factory=list)
show_save_option: bool = False
@dataclass
class ChatUIState:
display_log: bool = False
channel_list: List[str] = field(default_factory=list)
all_messages: Dict[str, List[str]] = field(default_factory=dict)
notifications: List[str] = field(default_factory=list)
packet_buffer: List[str] = field(default_factory=list)
node_list: List[str] = field(default_factory=list)
selected_channel: int = 0
selected_message: int = 0
selected_node: int = 0
current_window: int = 0
selected_index: int = 0
start_index: List[int] = field(default_factory=lambda: [0, 0, 0])
show_save_option: bool = False
menu_path: List[str] = field(default_factory=list)
@dataclass
class InterfaceState:
interface: Any = None
myNodeNum: int = 0
@dataclass
class AppState:
lock: Any = None

View File

@@ -6,12 +6,14 @@ from typing import Optional, Union, Dict
from contact.utilities.utils import decimal_to_hex
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state
def get_table_name(channel: str) -> str:
# Construct the table name
table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
table_name = f"{str(interface_state.myNodeNum)}_{channel}_messages"
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
return quoted_table_name
@@ -61,7 +63,7 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
message_text = ?
"""
db_cursor.execute(update_query, (ack, str(globals.myNodeNum), timestamp, message))
db_cursor.execute(update_query, (ack, str(interface_state.myNodeNum), timestamp, message))
db_connection.commit()
except sqlite3.Error as e:
@@ -72,13 +74,13 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
def load_messages_from_db() -> None:
"""Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
"""Load messages from the database for all channels and update ui_state.all_messages and ui_state.channel_list."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",))
db_cursor.execute(query, (f"{str(interface_state.myNodeNum)}_%_messages",))
tables = [row[0] for row in db_cursor.fetchall()]
# Iterate through each table and fetch its messages
@@ -104,15 +106,15 @@ def load_messages_from_db() -> None:
# Convert the channel to an integer if it's numeric, otherwise keep it as a string (nodenum vs channel name)
channel = int(channel) if channel.isdigit() else channel
# Add the channel to globals.channel_list if not already present
if channel not in globals.channel_list and not is_chat_archived(channel):
globals.channel_list.append(channel)
# Add the channel to ui_state.channel_list if not already present
if channel not in ui_state.channel_list and not is_chat_archived(channel):
ui_state.channel_list.append(channel)
# Ensure the channel exists in globals.all_messages
if channel not in globals.all_messages:
globals.all_messages[channel] = []
# Ensure the channel exists in ui_state.all_messages
if channel not in ui_state.all_messages:
ui_state.all_messages[channel] = []
# Add messages to globals.all_messages grouped by hourly timestamp
# Add messages to ui_state.all_messages grouped by hourly timestamp
hourly_messages = {}
for user_id, message, timestamp, ack_type in db_messages:
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
@@ -127,7 +129,7 @@ def load_messages_from_db() -> None:
elif ack_type == "Nak":
ack_str = config.nak_str
if user_id == str(globals.myNodeNum):
if user_id == str(interface_state.myNodeNum):
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
else:
formatted_message = (
@@ -137,10 +139,10 @@ def load_messages_from_db() -> None:
hourly_messages[hour].append(formatted_message)
# Flatten the hourly messages into globals.all_messages[channel]
# Flatten the hourly messages into ui_state.all_messages[channel]
for hour, messages in sorted(hourly_messages.items()):
globals.all_messages[channel].append((f"-- {hour} --", ""))
globals.all_messages[channel].extend(messages)
ui_state.all_messages[channel].append((f"-- {hour} --", ""))
ui_state.all_messages[channel].extend(messages)
except sqlite3.Error as e:
logging.error(f"SQLite error while loading messages from table '{table_name}': {e}")
@@ -153,11 +155,11 @@ def init_nodedb() -> None:
"""Initialize the node database and update it with nodes from the interface."""
try:
if not globals.interface.nodes:
if not interface_state.interface.nodes:
return # No nodes to initialize
ensure_node_table_exists() # Ensure the table exists before insertion
nodes_snapshot = list(globals.interface.nodes.values())
nodes_snapshot = list(interface_state.interface.nodes.values())
# Insert or update all nodes
for node in nodes_snapshot:
@@ -214,7 +216,7 @@ def update_node_info_in_db(
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote in case of numeric names
table_columns = [i[1] for i in db_cursor.execute(f"PRAGMA table_info({table_name})")]
if "chat_archived" not in table_columns:
@@ -278,7 +280,7 @@ def update_node_info_in_db(
def ensure_node_table_exists() -> None:
"""Ensure the node database table exists."""
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote for safety
schema = """
user_id TEXT PRIMARY KEY,
long_name TEXT,
@@ -319,7 +321,7 @@ def get_name_from_database(user_id: int, type: str = "long") -> str:
db_cursor = db_connection.cursor()
# Construct table name
table_name = f"{str(globals.myNodeNum)}_nodedb"
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
# Determine the correct column to fetch
@@ -345,7 +347,7 @@ def is_chat_archived(user_id: int) -> int:
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f"{str(globals.myNodeNum)}_nodedb"
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"'
query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,))

View File

@@ -0,0 +1,5 @@
from contact.ui.ui_state import ChatUIState, InterfaceState, AppState
ui_state = ChatUIState()
interface_state = InterfaceState()
app_state = AppState()

View File

@@ -1,16 +1,17 @@
import contact.globals as globals
import datetime
from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
def get_channels():
"""Retrieve channels from the node and update globals.channel_list and globals.all_messages."""
node = globals.interface.getNode("^local")
"""Retrieve channels from the node and update ui_state.channel_list and ui_state.all_messages."""
node = interface_state.interface.getNode("^local")
device_channels = node.channels
# Clear and rebuild channel list
# globals.channel_list = []
# ui_state.channel_list = []
for device_channel in device_channels:
if device_channel.role:
@@ -26,20 +27,20 @@ def get_channels():
].name
channel_name = convert_to_camel_case(modem_preset_string)
# Add channel to globals.channel_list if not already present
if channel_name not in globals.channel_list:
globals.channel_list.append(channel_name)
# Add channel to ui_state.channel_list if not already present
if channel_name not in ui_state.channel_list:
ui_state.channel_list.append(channel_name)
# Initialize globals.all_messages[channel_name] if it doesn't exist
if channel_name not in globals.all_messages:
globals.all_messages[channel_name] = []
# Initialize ui_state.all_messages[channel_name] if it doesn't exist
if channel_name not in ui_state.all_messages:
ui_state.all_messages[channel_name] = []
return globals.channel_list
return ui_state.channel_list
def get_node_list():
if globals.interface.nodes:
my_node_num = globals.myNodeNum
if interface_state.interface.nodes:
my_node_num = interface_state.myNodeNum
def node_sort(node):
if config.node_sort == "lastHeard":
@@ -51,7 +52,7 @@ def get_node_list():
else:
return node
sorted_nodes = sorted(globals.interface.nodes.values(), key=node_sort)
sorted_nodes = sorted(interface_state.interface.nodes.values(), key=node_sort)
# Move favorite nodes to the beginning
sorted_nodes = sorted(
@@ -68,14 +69,14 @@ def get_node_list():
def refresh_node_list():
new_node_list = get_node_list()
if new_node_list != globals.node_list:
globals.node_list = new_node_list
if new_node_list != ui_state.node_list:
ui_state.node_list = new_node_list
return True
return False
def get_nodeNum():
myinfo = globals.interface.getMyNodeInfo()
myinfo = interface_state.interface.getMyNodeInfo()
myNodeNum = myinfo["num"]
return myNodeNum

View File

@@ -1,6 +1,6 @@
[project]
name = "contact"
version = "1.3.8"
version = "1.3.9"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}