From 78f0775ad55e6d1db6484e02ab8ca5f33f955273 Mon Sep 17 00:00:00 2001 From: pdxlocations <117498748+pdxlocations@users.noreply.github.com> Date: Sat, 19 Apr 2025 21:37:54 -0700 Subject: [PATCH 1/2] Convert Globals to Class (#173) * init * convert globals to dataclass * move lock to app state --- contact/__main__.py | 23 +- contact/globals.py | 13 -- contact/message_handlers/rx_handler.py | 39 ++-- contact/message_handlers/tx_handler.py | 51 +++-- contact/ui/contact_ui.py | 304 +++++++++++++------------ contact/ui/ui_state.py | 40 +++- contact/utilities/db_handler.py | 46 ++-- contact/utilities/singleton.py | 5 + contact/utilities/utils.py | 35 +-- 9 files changed, 295 insertions(+), 261 deletions(-) delete mode 100644 contact/globals.py create mode 100644 contact/utilities/singleton.py diff --git a/contact/__main__.py b/contact/__main__.py index cb8ca17..bf5c940 100644 --- a/contact/__main__.py +++ b/contact/__main__.py @@ -24,7 +24,6 @@ import traceback from pubsub import pub # Local application -import contact.globals as globals import contact.ui.default_config as config from contact.message_handlers.rx_handler import on_receive from contact.settings import set_region @@ -36,7 +35,7 @@ from contact.utilities.db_handler import init_nodedb, load_messages_from_db from contact.utilities.input_handlers import get_list_input from contact.utilities.interfaces import initialize_interface from contact.utilities.utils import get_channels, get_nodeNum, get_node_list - +from contact.utilities.singleton import ui_state, interface_state, app_state # ------------------------------------------------------------------------------ # Environment & Logging Setup @@ -52,7 +51,7 @@ logging.basicConfig( filename=config.log_file_path, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -globals.lock = threading.Lock() +app_state.lock = threading.Lock() # ------------------------------------------------------------------------------ # Main Program Logic @@ -61,19 +60,19 @@ globals.lock = threading.Lock() def initialize_globals(args) -> None: """Initializes interface and shared globals.""" - globals.interface = initialize_interface(args) + interface_state.interface = initialize_interface(args) # Prompt for region if unset - if globals.interface.localNode.localConfig.lora.region == 0: + if interface_state.interface.localNode.localConfig.lora.region == 0: confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"]) if confirmation == "Yes": - set_region(globals.interface) - globals.interface.close() - globals.interface = initialize_interface(args) + set_region(interface_state.interface) + interface_state.interface.close() + interface_state.interface = initialize_interface(args) - globals.myNodeNum = get_nodeNum() - globals.channel_list = get_channels() - globals.node_list = get_node_list() + interface_state.myNodeNum = get_nodeNum() + ui_state.channel_list = get_channels() + ui_state.node_list = get_node_list() pub.subscribe(on_receive, "meshtastic.receive") init_nodedb() @@ -96,7 +95,7 @@ def main(stdscr: curses.window) -> None: return logging.info("Initializing interface...") - with globals.lock: + with app_state.lock: initialize_globals(args) logging.info("Starting main UI") diff --git a/contact/globals.py b/contact/globals.py deleted file mode 100644 index 6eeea66..0000000 --- a/contact/globals.py +++ /dev/null @@ -1,13 +0,0 @@ -interface = None -lock = None -display_log = False -all_messages = {} -channel_list = [] -notifications = [] -packet_buffer = [] -node_list = [] -myNodeNum = 0 -selected_channel = 0 -selected_message = 0 -selected_node = 0 -current_window = 0 diff --git a/contact/message_handlers/rx_handler.py b/contact/message_handlers/rx_handler.py index 9147848..bfba3d6 100644 --- a/contact/message_handlers/rx_handler.py +++ b/contact/message_handlers/rx_handler.py @@ -18,7 +18,8 @@ from contact.utilities.db_handler import ( update_node_info_in_db, ) import contact.ui.default_config as config -import contact.globals as globals + +from contact.utilities.singleton import ui_state, interface_state, app_state def on_receive(packet: Dict[str, Any], interface: Any) -> None: @@ -29,14 +30,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None: packet: The received Meshtastic packet as a dictionary. interface: The Meshtastic interface instance that received the packet. """ - with globals.lock: + with app_state.lock: # Update packet log - globals.packet_buffer.append(packet) - if len(globals.packet_buffer) > 20: + ui_state.packet_buffer.append(packet) + if len(ui_state.packet_buffer) > 20: # Trim buffer to 20 packets - globals.packet_buffer = globals.packet_buffer[-20:] + ui_state.packet_buffer = ui_state.packet_buffer[-20:] - if globals.display_log: + if ui_state.display_log: draw_packetlog_win() try: if "decoded" not in packet: @@ -63,19 +64,19 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None: else: channel_number = 0 - if packet["to"] == globals.myNodeNum: - if packet["from"] in globals.channel_list: + if packet["to"] == interface_state.myNodeNum: + if packet["from"] in ui_state.channel_list: pass else: - globals.channel_list.append(packet["from"]) - if packet["from"] not in globals.all_messages: - globals.all_messages[packet["from"]] = [] + ui_state.channel_list.append(packet["from"]) + if packet["from"] not in ui_state.all_messages: + ui_state.all_messages[packet["from"]] = [] update_node_info_in_db(packet["from"], chat_archived=False) refresh_channels = True - channel_number = globals.channel_list.index(packet["from"]) + channel_number = ui_state.channel_list.index(packet["from"]) - if globals.channel_list[channel_number] != globals.channel_list[globals.selected_channel]: + if ui_state.channel_list[channel_number] != ui_state.channel_list[ui_state.selected_channel]: add_notification(channel_number) refresh_channels = True else: @@ -85,15 +86,15 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None: message_from_id = packet["from"] message_from_string = get_name_from_database(message_from_id, type="short") + ":" - if globals.channel_list[channel_number] not in globals.all_messages: - globals.all_messages[globals.channel_list[channel_number]] = [] + if ui_state.channel_list[channel_number] not in ui_state.all_messages: + ui_state.all_messages[ui_state.channel_list[channel_number]] = [] # Timestamp handling current_timestamp = time.time() current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00") # Retrieve the last timestamp if available - channel_messages = globals.all_messages[globals.channel_list[channel_number]] + channel_messages = ui_state.all_messages[ui_state.channel_list[channel_number]] if channel_messages: # Check the last entry for a timestamp for entry in reversed(channel_messages): @@ -107,9 +108,9 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None: # Add a new timestamp if it's a new hour if last_hour != current_hour: - globals.all_messages[globals.channel_list[channel_number]].append((f"-- {current_hour} --", "")) + ui_state.all_messages[ui_state.channel_list[channel_number]].append((f"-- {current_hour} --", "")) - globals.all_messages[globals.channel_list[channel_number]].append( + ui_state.all_messages[ui_state.channel_list[channel_number]].append( (f"{config.message_prefix} {message_from_string} ", message_string) ) @@ -118,7 +119,7 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None: if refresh_messages: draw_messages_window(True) - save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string) + save_message_to_db(ui_state.channel_list[channel_number], message_from_id, message_string) except KeyError as e: logging.error(f"Error processing packet: {e}") diff --git a/contact/message_handlers/tx_handler.py b/contact/message_handlers/tx_handler.py index dfc9bfd..72b7f36 100644 --- a/contact/message_handlers/tx_handler.py +++ b/contact/message_handlers/tx_handler.py @@ -13,7 +13,8 @@ from contact.utilities.db_handler import ( update_node_info_in_db, ) import contact.ui.default_config as config -import contact.globals as globals + +from contact.utilities.singleton import ui_state, interface_state ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp} @@ -31,12 +32,12 @@ def onAckNak(packet: Dict[str, Any]) -> None: return acknak = ack_naks.pop(request) - message = globals.all_messages[acknak["channel"]][acknak["messageIndex"]][1] + message = ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]][1] confirm_string = " " ack_type = None if packet["decoded"]["routing"]["errorReason"] == "NONE": - if packet["from"] == globals.myNodeNum: # Ack "from" ourself means implicit ACK + if packet["from"] == interface_state.myNodeNum: # Ack "from" ourself means implicit ACK confirm_string = config.ack_implicit_str ack_type = "Implicit" else: @@ -46,15 +47,15 @@ def onAckNak(packet: Dict[str, Any]) -> None: confirm_string = config.nak_str ack_type = "Nak" - globals.all_messages[acknak["channel"]][acknak["messageIndex"]] = ( + ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]] = ( config.sent_message_prefix + confirm_string + ": ", message, ) update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type) - channel_number = globals.channel_list.index(acknak["channel"]) - if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]: + channel_number = ui_state.channel_list.index(acknak["channel"]) + if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]: draw_messages_window() @@ -137,16 +138,16 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None: msg_str += route_str + "\n" # Print the route back to us - if packet["from"] not in globals.channel_list: - globals.channel_list.append(packet["from"]) + if packet["from"] not in ui_state.channel_list: + ui_state.channel_list.append(packet["from"]) refresh_channels = True if is_chat_archived(packet["from"]): update_node_info_in_db(packet["from"], chat_archived=False) - channel_number = globals.channel_list.index(packet["from"]) + channel_number = ui_state.channel_list.index(packet["from"]) - if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]: + if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]: refresh_messages = True else: add_notification(channel_number) @@ -154,9 +155,9 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None: message_from_string = get_name_from_database(packet["from"], type="short") + ":\n" - if globals.channel_list[channel_number] not in globals.all_messages: - globals.all_messages[globals.channel_list[channel_number]] = [] - globals.all_messages[globals.channel_list[channel_number]].append( + if ui_state.channel_list[channel_number] not in ui_state.all_messages: + ui_state.all_messages[ui_state.channel_list[channel_number]] = [] + ui_state.all_messages[ui_state.channel_list[channel_number]].append( (f"{config.message_prefix} {message_from_string}", msg_str) ) @@ -164,23 +165,23 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None: draw_channel_list() if refresh_messages: draw_messages_window(True) - save_message_to_db(globals.channel_list[channel_number], packet["from"], msg_str) + save_message_to_db(ui_state.channel_list[channel_number], packet["from"], msg_str) def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None: """ Sends a chat message using the selected channel. """ - myid = globals.myNodeNum + myid = interface_state.myNodeNum send_on_channel = 0 - channel_id = globals.channel_list[channel] + channel_id = ui_state.channel_list[channel] if isinstance(channel_id, int): send_on_channel = 0 destination = channel_id elif isinstance(channel_id, str): send_on_channel = channel - sent_message_data = globals.interface.sendText( + sent_message_data = interface_state.interface.sendText( text=message, destinationId=destination, wantAck=True, @@ -190,15 +191,15 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = ) # Add sent message to the messages dictionary - if channel_id not in globals.all_messages: - globals.all_messages[channel_id] = [] + if channel_id not in ui_state.all_messages: + ui_state.all_messages[channel_id] = [] # Handle timestamp logic current_timestamp = int(datetime.now().timestamp()) # Get current timestamp current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00") # Retrieve the last timestamp if available - channel_messages = globals.all_messages[channel_id] + channel_messages = ui_state.all_messages[channel_id] if channel_messages: # Check the last entry for a timestamp for entry in reversed(channel_messages): @@ -212,15 +213,15 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = # Add a new timestamp if it's a new hour if last_hour != current_hour: - globals.all_messages[channel_id].append((f"-- {current_hour} --", "")) + ui_state.all_messages[channel_id].append((f"-- {current_hour} --", "")) - globals.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message)) + ui_state.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message)) timestamp = save_message_to_db(channel_id, myid, message) ack_naks[sent_message_data.id] = { "channel": channel_id, - "messageIndex": len(globals.all_messages[channel_id]) - 1, + "messageIndex": len(ui_state.all_messages[channel_id]) - 1, "timestamp": timestamp, } @@ -230,9 +231,9 @@ def send_traceroute() -> None: Sends a RouteDiscovery protobuf to the selected node. """ r = mesh_pb2.RouteDiscovery() - globals.interface.sendData( + interface_state.interface.sendData( r, - destinationId=globals.node_list[globals.selected_node], + destinationId=ui_state.node_list[ui_state.selected_node], portNum=portnums_pb2.PortNum.TRACEROUTE_APP, wantResponse=True, onResponse=on_response_traceroute, diff --git a/contact/ui/contact_ui.py b/contact/ui/contact_ui.py index 31e5b9c..5b2c1e4 100644 --- a/contact/ui/contact_ui.py +++ b/contact/ui/contact_ui.py @@ -12,7 +12,9 @@ from contact.utilities.db_handler import get_name_from_database, update_node_inf from contact.utilities.input_handlers import get_list_input import contact.ui.default_config as config import contact.ui.dialog -import contact.globals as globals + + +from contact.utilities.singleton import ui_state, interface_state def handle_resize(stdscr: curses.window, firstrun: bool) -> None: @@ -106,80 +108,80 @@ def main_ui(stdscr: curses.window) -> None: # draw_debug(f"Keypress: {char}") if char == curses.KEY_UP: - if globals.current_window == 0: + if ui_state.current_window == 0: scroll_channels(-1) - elif globals.current_window == 1: + elif ui_state.current_window == 1: scroll_messages(-1) - elif globals.current_window == 2: + elif ui_state.current_window == 2: scroll_nodes(-1) elif char == curses.KEY_DOWN: - if globals.current_window == 0: + if ui_state.current_window == 0: scroll_channels(1) - elif globals.current_window == 1: + elif ui_state.current_window == 1: scroll_messages(1) - elif globals.current_window == 2: + elif ui_state.current_window == 2: scroll_nodes(1) elif char == curses.KEY_HOME: - if globals.current_window == 0: + if ui_state.current_window == 0: select_channel(0) - elif globals.current_window == 1: - globals.selected_message = 0 + elif ui_state.current_window == 1: + ui_state.selected_message = 0 refresh_pad(1) - elif globals.current_window == 2: + elif ui_state.current_window == 2: select_node(0) elif char == curses.KEY_END: - if globals.current_window == 0: - select_channel(len(globals.channel_list) - 1) - elif globals.current_window == 1: + if ui_state.current_window == 0: + select_channel(len(ui_state.channel_list) - 1) + elif ui_state.current_window == 1: msg_line_count = messages_pad.getmaxyx()[0] - globals.selected_message = max(msg_line_count - get_msg_window_lines(), 0) + ui_state.selected_message = max(msg_line_count - get_msg_window_lines(), 0) refresh_pad(1) - elif globals.current_window == 2: - select_node(len(globals.node_list) - 1) + elif ui_state.current_window == 2: + select_node(len(ui_state.node_list) - 1) elif char == curses.KEY_PPAGE: - if globals.current_window == 0: + if ui_state.current_window == 0: select_channel( - globals.selected_channel - (channel_win.getmaxyx()[0] - 2) + ui_state.selected_channel - (channel_win.getmaxyx()[0] - 2) ) # select_channel will bounds check for us - elif globals.current_window == 1: - globals.selected_message = max(globals.selected_message - get_msg_window_lines(), 0) + elif ui_state.current_window == 1: + ui_state.selected_message = max(ui_state.selected_message - get_msg_window_lines(), 0) refresh_pad(1) - elif globals.current_window == 2: + elif ui_state.current_window == 2: select_node( - globals.selected_node - (nodes_win.getmaxyx()[0] - 2) + ui_state.selected_node - (nodes_win.getmaxyx()[0] - 2) ) # select_node will bounds check for us elif char == curses.KEY_NPAGE: - if globals.current_window == 0: + if ui_state.current_window == 0: select_channel( - globals.selected_channel + (channel_win.getmaxyx()[0] - 2) + ui_state.selected_channel + (channel_win.getmaxyx()[0] - 2) ) # select_channel will bounds check for us - elif globals.current_window == 1: + elif ui_state.current_window == 1: msg_line_count = messages_pad.getmaxyx()[0] - globals.selected_message = min( - globals.selected_message + get_msg_window_lines(), msg_line_count - get_msg_window_lines() + ui_state.selected_message = min( + ui_state.selected_message + get_msg_window_lines(), msg_line_count - get_msg_window_lines() ) refresh_pad(1) - elif globals.current_window == 2: + elif ui_state.current_window == 2: select_node( - globals.selected_node + (nodes_win.getmaxyx()[0] - 2) + ui_state.selected_node + (nodes_win.getmaxyx()[0] - 2) ) # select_node will bounds check for us elif char == curses.KEY_LEFT or char == curses.KEY_RIGHT: delta = -1 if char == curses.KEY_LEFT else 1 - old_window = globals.current_window - globals.current_window = (globals.current_window + delta) % 3 + old_window = ui_state.current_window + ui_state.current_window = (ui_state.current_window + delta) % 3 if old_window == 0: channel_win.attrset(get_color("window_frame")) channel_win.box() channel_win.refresh() - highlight_line(False, 0, globals.selected_channel) + highlight_line(False, 0, ui_state.selected_channel) refresh_pad(0) if old_window == 1: messages_win.attrset(get_color("window_frame")) @@ -191,29 +193,29 @@ def main_ui(stdscr: curses.window) -> None: nodes_win.attrset(get_color("window_frame")) nodes_win.box() nodes_win.refresh() - highlight_line(False, 2, globals.selected_node) + highlight_line(False, 2, ui_state.selected_node) refresh_pad(2) - if globals.current_window == 0: + if ui_state.current_window == 0: channel_win.attrset(get_color("window_frame_selected")) channel_win.box() channel_win.attrset(get_color("window_frame")) channel_win.refresh() - highlight_line(True, 0, globals.selected_channel) + highlight_line(True, 0, ui_state.selected_channel) refresh_pad(0) - elif globals.current_window == 1: + elif ui_state.current_window == 1: messages_win.attrset(get_color("window_frame_selected")) messages_win.box() messages_win.attrset(get_color("window_frame")) messages_win.refresh() refresh_pad(1) - elif globals.current_window == 2: + elif ui_state.current_window == 2: draw_function_win() nodes_win.attrset(get_color("window_frame_selected")) nodes_win.box() nodes_win.attrset(get_color("window_frame")) nodes_win.refresh() - highlight_line(True, 2, globals.selected_node) + highlight_line(True, 2, ui_state.selected_node) refresh_pad(2) # Check for Esc @@ -233,20 +235,20 @@ def main_ui(stdscr: curses.window) -> None: handle_resize(stdscr, False) elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)): - if globals.current_window == 2: - node_list = globals.node_list - if node_list[globals.selected_node] not in globals.channel_list: - globals.channel_list.append(node_list[globals.selected_node]) - if node_list[globals.selected_node] not in globals.all_messages: - globals.all_messages[node_list[globals.selected_node]] = [] + if ui_state.current_window == 2: + node_list = ui_state.node_list + if node_list[ui_state.selected_node] not in ui_state.channel_list: + ui_state.channel_list.append(node_list[ui_state.selected_node]) + if node_list[ui_state.selected_node] not in ui_state.all_messages: + ui_state.all_messages[node_list[ui_state.selected_node]] = [] - globals.selected_channel = globals.channel_list.index(node_list[globals.selected_node]) + ui_state.selected_channel = ui_state.channel_list.index(node_list[ui_state.selected_node]) - if is_chat_archived(globals.channel_list[globals.selected_channel]): - update_node_info_in_db(globals.channel_list[globals.selected_channel], chat_archived=False) + if is_chat_archived(ui_state.channel_list[ui_state.selected_channel]): + update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=False) - globals.selected_node = 0 - globals.current_window = 0 + ui_state.selected_node = 0 + ui_state.current_window = 0 draw_node_list() draw_channel_list() @@ -254,7 +256,7 @@ def main_ui(stdscr: curses.window) -> None: elif len(input_text) > 0: # Enter key pressed, send user input as message - send_message(input_text, channel=globals.selected_channel) + send_message(input_text, channel=ui_state.selected_channel) draw_messages_window(True) # Clear entry window and reset input text @@ -272,18 +274,18 @@ def main_ui(stdscr: curses.window) -> None: elif char == "`": # ` Launch the settings interface curses.curs_set(0) - settings_menu(stdscr, globals.interface) + settings_menu(stdscr, interface_state.interface) curses.curs_set(1) refresh_node_list() handle_resize(stdscr, False) elif char == chr(16): # Display packet log - if globals.display_log is False: - globals.display_log = True + if ui_state.display_log is False: + ui_state.display_log = True draw_messages_window(True) else: - globals.display_log = False + ui_state.display_log = False packetlog_win.erase() draw_messages_window(True) @@ -293,39 +295,39 @@ def main_ui(stdscr: curses.window) -> None: # ^D elif char == chr(4): - if globals.current_window == 0: - if isinstance(globals.channel_list[globals.selected_channel], int): - update_node_info_in_db(globals.channel_list[globals.selected_channel], chat_archived=True) + if ui_state.current_window == 0: + if isinstance(ui_state.channel_list[ui_state.selected_channel], int): + update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=True) # Shift notifications up to account for deleted item - for i in range(len(globals.notifications)): - if globals.notifications[i] > globals.selected_channel: - globals.notifications[i] -= 1 + for i in range(len(ui_state.notifications)): + if ui_state.notifications[i] > ui_state.selected_channel: + ui_state.notifications[i] -= 1 - del globals.channel_list[globals.selected_channel] - globals.selected_channel = min(globals.selected_channel, len(globals.channel_list) - 1) - select_channel(globals.selected_channel) + del ui_state.channel_list[ui_state.selected_channel] + ui_state.selected_channel = min(ui_state.selected_channel, len(ui_state.channel_list) - 1) + select_channel(ui_state.selected_channel) draw_channel_list() draw_messages_window() - if globals.current_window == 2: + if ui_state.current_window == 2: curses.curs_set(0) confirmation = get_list_input( - f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from nodedb?", + f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from nodedb?", "No", ["Yes", "No"], ) if confirmation == "Yes": - globals.interface.localNode.removeNode(globals.node_list[globals.selected_node]) + interface_state.interface.localNode.removeNode(ui_state.node_list[ui_state.selected_node]) # Directly modifying the interface from client code - good? Bad? If it's stupid but it works, it's not supid? - del globals.interface.nodesByNum[globals.node_list[globals.selected_node]] + del interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]] # Convert to "!hex" representation that interface.nodes uses - hexid = f"!{hex(globals.node_list[globals.selected_node])[2:]}" - del globals.interface.nodes[hexid] + hexid = f"!{hex(ui_state.node_list[ui_state.selected_node])[2:]}" + del interface_state.interface.nodes[hexid] - globals.node_list.pop(globals.selected_node) + ui_state.node_list.pop(ui_state.selected_node) draw_messages_window() draw_node_list() @@ -336,68 +338,76 @@ def main_ui(stdscr: curses.window) -> None: # ^/ elif char == chr(31): - if globals.current_window == 2 or globals.current_window == 0: - search(globals.current_window) + if ui_state.current_window == 2 or ui_state.current_window == 0: + search(ui_state.current_window) # ^F elif char == chr(6): - if globals.current_window == 2: - selectedNode = globals.interface.nodesByNum[globals.node_list[globals.selected_node]] + if ui_state.current_window == 2: + selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]] curses.curs_set(0) if "isFavorite" not in selectedNode or selectedNode["isFavorite"] == False: confirmation = get_list_input( - f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Favorite?", + f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Favorite?", None, ["Yes", "No"], ) if confirmation == "Yes": - globals.interface.localNode.setFavorite(globals.node_list[globals.selected_node]) + interface_state.interface.localNode.setFavorite(ui_state.node_list[ui_state.selected_node]) # Maybe we shouldn't be modifying the nodedb, but maybe it should update itself - globals.interface.nodesByNum[globals.node_list[globals.selected_node]]["isFavorite"] = True + interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][ + "isFavorite" + ] = True refresh_node_list() else: confirmation = get_list_input( - f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Favorites?", + f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Favorites?", None, ["Yes", "No"], ) if confirmation == "Yes": - globals.interface.localNode.removeFavorite(globals.node_list[globals.selected_node]) + interface_state.interface.localNode.removeFavorite(ui_state.node_list[ui_state.selected_node]) # Maybe we shouldn't be modifying the nodedb, but maybe it should update itself - globals.interface.nodesByNum[globals.node_list[globals.selected_node]]["isFavorite"] = False + interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][ + "isFavorite" + ] = False refresh_node_list() handle_resize(stdscr, False) elif char == chr(7): - if globals.current_window == 2: - selectedNode = globals.interface.nodesByNum[globals.node_list[globals.selected_node]] + if ui_state.current_window == 2: + selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]] curses.curs_set(0) if "isIgnored" not in selectedNode or selectedNode["isIgnored"] == False: confirmation = get_list_input( - f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Ignored?", + f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Ignored?", "No", ["Yes", "No"], ) if confirmation == "Yes": - globals.interface.localNode.setIgnored(globals.node_list[globals.selected_node]) - globals.interface.nodesByNum[globals.node_list[globals.selected_node]]["isIgnored"] = True + interface_state.interface.localNode.setIgnored(ui_state.node_list[ui_state.selected_node]) + interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][ + "isIgnored" + ] = True else: confirmation = get_list_input( - f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Ignored?", + f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Ignored?", "No", ["Yes", "No"], ) if confirmation == "Yes": - globals.interface.localNode.removeIgnored(globals.node_list[globals.selected_node]) - globals.interface.nodesByNum[globals.node_list[globals.selected_node]]["isIgnored"] = False + interface_state.interface.localNode.removeIgnored(ui_state.node_list[ui_state.selected_node]) + interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][ + "isIgnored" + ] = False handle_resize(stdscr, False) @@ -412,12 +422,12 @@ def main_ui(stdscr: curses.window) -> None: def draw_channel_list() -> None: channel_pad.erase() win_height, win_width = channel_win.getmaxyx() - start_index = max(0, globals.selected_channel - (win_height - 3)) # Leave room for borders + start_index = max(0, ui_state.selected_channel - (win_height - 3)) # Leave room for borders - channel_pad.resize(len(globals.all_messages), channel_win.getmaxyx()[1]) + channel_pad.resize(len(ui_state.all_messages), channel_win.getmaxyx()[1]) idx = 0 - for channel in globals.channel_list: + for channel in ui_state.channel_list: # Convert node number to long name if it's an integer if isinstance(channel, int): if is_chat_archived(channel): @@ -428,7 +438,7 @@ def draw_channel_list() -> None: channel = channel_name # Determine whether to add the notification - notification = " " + config.notification_symbol if idx in globals.notifications else "" + notification = " " + config.notification_symbol if idx in ui_state.notifications else "" # Truncate the channel name if it's too long to fit in the window truncated_channel = ( @@ -436,17 +446,17 @@ def draw_channel_list() -> None: ).ljust(win_width - 3) color = get_color("channel_list") - if idx == globals.selected_channel: - if globals.current_window == 0: + if idx == ui_state.selected_channel: + if ui_state.current_window == 0: color = get_color("channel_list", reverse=True) - remove_notification(globals.selected_channel) + remove_notification(ui_state.selected_channel) else: color = get_color("channel_selected") channel_pad.addstr(idx, 1, truncated_channel, color) idx += 1 channel_win.attrset( - get_color("window_frame_selected") if globals.current_window == 0 else get_color("window_frame") + get_color("window_frame_selected") if ui_state.current_window == 0 else get_color("window_frame") ) channel_win.box() channel_win.attrset((get_color("window_frame"))) @@ -459,10 +469,10 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None: """Update the messages window based on the selected channel and scroll position.""" messages_pad.erase() - channel = globals.channel_list[globals.selected_channel] + channel = ui_state.channel_list[ui_state.selected_channel] - if channel in globals.all_messages: - messages = globals.all_messages[channel] + if channel in ui_state.all_messages: + messages = ui_state.all_messages[channel] msg_line_count = 0 @@ -485,16 +495,16 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None: row += 1 messages_win.attrset( - get_color("window_frame_selected") if globals.current_window == 1 else get_color("window_frame") + get_color("window_frame_selected") if ui_state.current_window == 1 else get_color("window_frame") ) messages_win.box() messages_win.attrset(get_color("window_frame")) messages_win.refresh() if scroll_to_bottom: - globals.selected_message = max(msg_line_count - get_msg_window_lines(), 0) + ui_state.selected_message = max(msg_line_count - get_msg_window_lines(), 0) else: - globals.selected_message = max(min(globals.selected_message, msg_line_count - get_msg_window_lines()), 0) + ui_state.selected_message = max(min(ui_state.selected_message, msg_line_count - get_msg_window_lines()), 0) refresh_pad(1) @@ -512,13 +522,13 @@ def draw_node_list() -> None: try: nodes_pad.erase() box_width = nodes_win.getmaxyx()[1] - nodes_pad.resize(len(globals.node_list) + 1, box_width) + nodes_pad.resize(len(ui_state.node_list) + 1, box_width) except Exception as e: logging.error(f"Error Drawing Nodes List: {e}") logging.error("Traceback: %s", traceback.format_exc()) - for i, node_num in enumerate(globals.node_list): - node = globals.interface.nodesByNum[node_num] + for i, node_num in enumerate(ui_state.node_list): + node = interface_state.interface.nodesByNum[node_num] secure = "user" in node and "publicKey" in node["user"] and node["user"]["publicKey"] node_str = f"{'🔐' if secure else '🔓'} {get_name_from_database(node_num, 'long')}".ljust(box_width - 2)[ : box_width - 2 @@ -529,10 +539,12 @@ def draw_node_list() -> None: if "isIgnored" in node and node["isIgnored"]: color = "node_ignored" nodes_pad.addstr( - i, 1, node_str, get_color(color, reverse=globals.selected_node == i and globals.current_window == 2) + i, 1, node_str, get_color(color, reverse=ui_state.selected_node == i and ui_state.current_window == 2) ) - nodes_win.attrset(get_color("window_frame_selected") if globals.current_window == 2 else get_color("window_frame")) + nodes_win.attrset( + get_color("window_frame_selected") if ui_state.current_window == 2 else get_color("window_frame") + ) nodes_win.box() nodes_win.attrset(get_color("window_frame")) nodes_win.refresh() @@ -546,57 +558,57 @@ def draw_node_list() -> None: def select_channel(idx: int) -> None: - old_selected_channel = globals.selected_channel - globals.selected_channel = max(0, min(idx, len(globals.channel_list) - 1)) + old_selected_channel = ui_state.selected_channel + ui_state.selected_channel = max(0, min(idx, len(ui_state.channel_list) - 1)) draw_messages_window(True) # For now just re-draw channel list when clearing notifications, we can probably make this more efficient - if globals.selected_channel in globals.notifications: - remove_notification(globals.selected_channel) + if ui_state.selected_channel in ui_state.notifications: + remove_notification(ui_state.selected_channel) draw_channel_list() return highlight_line(False, 0, old_selected_channel) - highlight_line(True, 0, globals.selected_channel) + highlight_line(True, 0, ui_state.selected_channel) refresh_pad(0) def scroll_channels(direction: int) -> None: - new_selected_channel = globals.selected_channel + direction + new_selected_channel = ui_state.selected_channel + direction if new_selected_channel < 0: - new_selected_channel = len(globals.channel_list) - 1 - elif new_selected_channel >= len(globals.channel_list): + new_selected_channel = len(ui_state.channel_list) - 1 + elif new_selected_channel >= len(ui_state.channel_list): new_selected_channel = 0 select_channel(new_selected_channel) def scroll_messages(direction: int) -> None: - globals.selected_message += direction + ui_state.selected_message += direction msg_line_count = messages_pad.getmaxyx()[0] - globals.selected_message = max(0, min(globals.selected_message, msg_line_count - get_msg_window_lines())) + ui_state.selected_message = max(0, min(ui_state.selected_message, msg_line_count - get_msg_window_lines())) refresh_pad(1) def select_node(idx: int) -> None: - old_selected_node = globals.selected_node - globals.selected_node = max(0, min(idx, len(globals.node_list) - 1)) + old_selected_node = ui_state.selected_node + ui_state.selected_node = max(0, min(idx, len(ui_state.node_list) - 1)) highlight_line(False, 2, old_selected_node) - highlight_line(True, 2, globals.selected_node) + highlight_line(True, 2, ui_state.selected_node) refresh_pad(2) draw_function_win() def scroll_nodes(direction: int) -> None: - new_selected_node = globals.selected_node + direction + new_selected_node = ui_state.selected_node + direction if new_selected_node < 0: - new_selected_node = len(globals.node_list) - 1 - elif new_selected_node >= len(globals.node_list): + new_selected_node = len(ui_state.node_list) - 1 + elif new_selected_node >= len(ui_state.node_list): new_selected_node = 0 select_node(new_selected_node) @@ -607,7 +619,7 @@ def draw_packetlog_win() -> None: columns = [10, 10, 15, 30] span = 0 - if globals.display_log: + if ui_state.display_log: packetlog_win.erase() height, width = packetlog_win.getmaxyx() @@ -620,7 +632,7 @@ def draw_packetlog_win() -> None: 1, 1, headers[: width - 2], get_color("log_header", underline=True) ) # Truncate headers if they exceed window width - for i, packet in enumerate(reversed(globals.packet_buffer)): + for i, packet in enumerate(reversed(ui_state.packet_buffer)): if i >= height - 3: # Skip if exceeds the window height break @@ -656,11 +668,11 @@ def draw_packetlog_win() -> None: def search(win: int) -> None: - start_idx = globals.selected_node + start_idx = ui_state.selected_node select_func = select_node if win == 0: - start_idx = globals.selected_channel + start_idx = ui_state.selected_channel select_func = select_channel search_text = "" @@ -673,7 +685,7 @@ def search(win: int) -> None: if char in (chr(27), chr(curses.KEY_ENTER), chr(10), chr(13)): break elif char == "\t": - start_idx = globals.selected_node + 1 if win == 2 else globals.selected_channel + 1 + start_idx = ui_state.selected_node + 1 if win == 2 else ui_state.selected_channel + 1 elif char in (curses.KEY_BACKSPACE, chr(127)): if search_text: search_text = search_text[:-1] @@ -688,7 +700,7 @@ def search(win: int) -> None: search_text_caseless = search_text.casefold() - l = globals.node_list if win == 2 else globals.channel_list + l = ui_state.node_list if win == 2 else ui_state.channel_list for i, n in enumerate(l[start_idx:] + l[:start_idx]): if ( isinstance(n, int) @@ -706,7 +718,7 @@ def search(win: int) -> None: def draw_node_details() -> None: node = None try: - node = globals.interface.nodesByNum[globals.node_list[globals.selected_node]] + node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]] except KeyError: return @@ -723,7 +735,7 @@ def draw_node_details() -> None: f" | {node['user']['role']}" if "user" in node and "role" in node["user"] else "", ] - if globals.node_list[globals.selected_node] == globals.myNodeNum: + if ui_state.node_list[ui_state.selected_node] == interface_state.myNodeNum: node_details_list.extend( [ ( @@ -787,14 +799,14 @@ def draw_help() -> None: def draw_function_win() -> None: - if globals.current_window == 2: + if ui_state.current_window == 2: draw_node_details() else: draw_help() def get_msg_window_lines() -> None: - packetlog_height = packetlog_win.getmaxyx()[0] - 1 if globals.display_log else 0 + packetlog_height = packetlog_win.getmaxyx()[0] - 1 if ui_state.display_log else 0 return messages_win.getmaxyx()[0] - 2 - packetlog_height @@ -806,10 +818,10 @@ def refresh_pad(window: int) -> None: pad = messages_pad box = messages_win lines = get_msg_window_lines() - selected_item = globals.selected_message - start_index = globals.selected_message + selected_item = ui_state.selected_message + start_index = ui_state.selected_message - if globals.display_log: + if ui_state.display_log: packetlog_win.box() packetlog_win.refresh() @@ -817,14 +829,14 @@ def refresh_pad(window: int) -> None: pad = nodes_pad box = nodes_win lines = box.getmaxyx()[0] - 2 - selected_item = globals.selected_node + selected_item = ui_state.selected_node start_index = max(0, selected_item - (win_height - 3)) # Leave room for borders else: pad = channel_pad box = channel_win lines = box.getmaxyx()[0] - 2 - selected_item = globals.selected_channel + selected_item = ui_state.selected_channel start_index = max(0, selected_item - (win_height - 3)) # Leave room for borders pad.refresh( @@ -844,8 +856,8 @@ def highlight_line(highlight: bool, window: int, line: int) -> None: select_len = nodes_win.getmaxyx()[1] - 2 if window == 2: - node_num = globals.node_list[line] - node = globals.interface.nodesByNum[node_num] + node_num = ui_state.node_list[line] + node = interface_state.interface.nodesByNum[node_num] if "isFavorite" in node and node["isFavorite"]: color = get_color("node_favorite") if "isIgnored" in node and node["isIgnored"]: @@ -854,7 +866,7 @@ def highlight_line(highlight: bool, window: int, line: int) -> None: if window == 0: pad = channel_pad color = get_color( - "channel_selected" if (line == globals.selected_channel and highlight == False) else "channel_list" + "channel_selected" if (line == ui_state.selected_channel and highlight == False) else "channel_list" ) select_len = channel_win.getmaxyx()[1] - 2 @@ -862,13 +874,13 @@ def highlight_line(highlight: bool, window: int, line: int) -> None: def add_notification(channel_number: int) -> None: - if channel_number not in globals.notifications: - globals.notifications.append(channel_number) + if channel_number not in ui_state.notifications: + ui_state.notifications.append(channel_number) def remove_notification(channel_number: int) -> None: - if channel_number in globals.notifications: - globals.notifications.remove(channel_number) + if channel_number in ui_state.notifications: + ui_state.notifications.remove(channel_number) def draw_text_field(win: curses.window, text: str, color: int) -> None: diff --git a/contact/ui/ui_state.py b/contact/ui/ui_state.py index 0998011..0c09341 100644 --- a/contact/ui/ui_state.py +++ b/contact/ui/ui_state.py @@ -1,11 +1,37 @@ from typing import Any, Union, List, Dict +from dataclasses import dataclass, field +@dataclass class MenuState: - def __init__(self): - self.menu_index: List[int] = [] # Row we left the previous menus - self.start_index: List[int] = [0] # Row to start the menu if it doesn't all fit - self.selected_index: int = 0 # Selected Row - self.current_menu: Union[Dict[str, Any], List[Any], str, int] = {} # Contents of the current menu - self.menu_path: List[str] = [] # Menu Path - self.show_save_option: bool = False # Display 'Save' + menu_index: List[int] = field(default_factory=list) + start_index: List[int] = field(default_factory=lambda: [0]) + selected_index: int = 0 + current_menu: Union[Dict[str, Any], List[Any], str, int] = field(default_factory=dict) + menu_path: List[str] = field(default_factory=list) + show_save_option: bool = False + + +@dataclass +class ChatUIState: + display_log: bool = False + channel_list: List[str] = field(default_factory=list) + all_messages: Dict[str, List[str]] = field(default_factory=dict) + notifications: List[str] = field(default_factory=list) + packet_buffer: List[str] = field(default_factory=list) + node_list: List[str] = field(default_factory=list) + selected_channel: int = 0 + selected_message: int = 0 + selected_node: int = 0 + current_window: int = 0 + + +@dataclass +class InterfaceState: + interface: Any = None + myNodeNum: int = 0 + + +@dataclass +class AppState: + lock: Any = None diff --git a/contact/utilities/db_handler.py b/contact/utilities/db_handler.py index b6e7ba2..fc5512e 100644 --- a/contact/utilities/db_handler.py +++ b/contact/utilities/db_handler.py @@ -6,12 +6,14 @@ from typing import Optional, Union, Dict from contact.utilities.utils import decimal_to_hex import contact.ui.default_config as config -import contact.globals as globals + + +from contact.utilities.singleton import ui_state, interface_state def get_table_name(channel: str) -> str: # Construct the table name - table_name = f"{str(globals.myNodeNum)}_{channel}_messages" + table_name = f"{str(interface_state.myNodeNum)}_{channel}_messages" quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces return quoted_table_name @@ -61,7 +63,7 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None message_text = ? """ - db_cursor.execute(update_query, (ack, str(globals.myNodeNum), timestamp, message)) + db_cursor.execute(update_query, (ack, str(interface_state.myNodeNum), timestamp, message)) db_connection.commit() except sqlite3.Error as e: @@ -72,13 +74,13 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None def load_messages_from_db() -> None: - """Load messages from the database for all channels and update globals.all_messages and globals.channel_list.""" + """Load messages from the database for all channels and update ui_state.all_messages and ui_state.channel_list.""" try: with sqlite3.connect(config.db_file_path) as db_connection: db_cursor = db_connection.cursor() query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?" - db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",)) + db_cursor.execute(query, (f"{str(interface_state.myNodeNum)}_%_messages",)) tables = [row[0] for row in db_cursor.fetchall()] # Iterate through each table and fetch its messages @@ -104,15 +106,15 @@ def load_messages_from_db() -> None: # Convert the channel to an integer if it's numeric, otherwise keep it as a string (nodenum vs channel name) channel = int(channel) if channel.isdigit() else channel - # Add the channel to globals.channel_list if not already present - if channel not in globals.channel_list and not is_chat_archived(channel): - globals.channel_list.append(channel) + # Add the channel to ui_state.channel_list if not already present + if channel not in ui_state.channel_list and not is_chat_archived(channel): + ui_state.channel_list.append(channel) - # Ensure the channel exists in globals.all_messages - if channel not in globals.all_messages: - globals.all_messages[channel] = [] + # Ensure the channel exists in ui_state.all_messages + if channel not in ui_state.all_messages: + ui_state.all_messages[channel] = [] - # Add messages to globals.all_messages grouped by hourly timestamp + # Add messages to ui_state.all_messages grouped by hourly timestamp hourly_messages = {} for user_id, message, timestamp, ack_type in db_messages: hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00") @@ -127,7 +129,7 @@ def load_messages_from_db() -> None: elif ack_type == "Nak": ack_str = config.nak_str - if user_id == str(globals.myNodeNum): + if user_id == str(interface_state.myNodeNum): formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message) else: formatted_message = ( @@ -137,10 +139,10 @@ def load_messages_from_db() -> None: hourly_messages[hour].append(formatted_message) - # Flatten the hourly messages into globals.all_messages[channel] + # Flatten the hourly messages into ui_state.all_messages[channel] for hour, messages in sorted(hourly_messages.items()): - globals.all_messages[channel].append((f"-- {hour} --", "")) - globals.all_messages[channel].extend(messages) + ui_state.all_messages[channel].append((f"-- {hour} --", "")) + ui_state.all_messages[channel].extend(messages) except sqlite3.Error as e: logging.error(f"SQLite error while loading messages from table '{table_name}': {e}") @@ -153,11 +155,11 @@ def init_nodedb() -> None: """Initialize the node database and update it with nodes from the interface.""" try: - if not globals.interface.nodes: + if not interface_state.interface.nodes: return # No nodes to initialize ensure_node_table_exists() # Ensure the table exists before insertion - nodes_snapshot = list(globals.interface.nodes.values()) + nodes_snapshot = list(interface_state.interface.nodes.values()) # Insert or update all nodes for node in nodes_snapshot: @@ -214,7 +216,7 @@ def update_node_info_in_db( with sqlite3.connect(config.db_file_path) as db_connection: db_cursor = db_connection.cursor() - table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names + table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote in case of numeric names table_columns = [i[1] for i in db_cursor.execute(f"PRAGMA table_info({table_name})")] if "chat_archived" not in table_columns: @@ -278,7 +280,7 @@ def update_node_info_in_db( def ensure_node_table_exists() -> None: """Ensure the node database table exists.""" - table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety + table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote for safety schema = """ user_id TEXT PRIMARY KEY, long_name TEXT, @@ -319,7 +321,7 @@ def get_name_from_database(user_id: int, type: str = "long") -> str: db_cursor = db_connection.cursor() # Construct table name - table_name = f"{str(globals.myNodeNum)}_nodedb" + table_name = f"{str(interface_state.myNodeNum)}_nodedb" nodeinfo_table = f'"{table_name}"' # Quote table name for safety # Determine the correct column to fetch @@ -345,7 +347,7 @@ def is_chat_archived(user_id: int) -> int: try: with sqlite3.connect(config.db_file_path) as db_connection: db_cursor = db_connection.cursor() - table_name = f"{str(globals.myNodeNum)}_nodedb" + table_name = f"{str(interface_state.myNodeNum)}_nodedb" nodeinfo_table = f'"{table_name}"' query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?" db_cursor.execute(query, (user_id,)) diff --git a/contact/utilities/singleton.py b/contact/utilities/singleton.py new file mode 100644 index 0000000..94e9171 --- /dev/null +++ b/contact/utilities/singleton.py @@ -0,0 +1,5 @@ +from ui.ui_state import ChatUIState, InterfaceState, AppState + +ui_state = ChatUIState() +interface_state = InterfaceState() +app_state = AppState() diff --git a/contact/utilities/utils.py b/contact/utilities/utils.py index cfbd909..83fab87 100644 --- a/contact/utilities/utils.py +++ b/contact/utilities/utils.py @@ -1,16 +1,17 @@ -import contact.globals as globals import datetime from meshtastic.protobuf import config_pb2 import contact.ui.default_config as config +from contact.utilities.singleton import ui_state, interface_state + def get_channels(): - """Retrieve channels from the node and update globals.channel_list and globals.all_messages.""" - node = globals.interface.getNode("^local") + """Retrieve channels from the node and update ui_state.channel_list and ui_state.all_messages.""" + node = interface_state.interface.getNode("^local") device_channels = node.channels # Clear and rebuild channel list - # globals.channel_list = [] + # ui_state.channel_list = [] for device_channel in device_channels: if device_channel.role: @@ -26,20 +27,20 @@ def get_channels(): ].name channel_name = convert_to_camel_case(modem_preset_string) - # Add channel to globals.channel_list if not already present - if channel_name not in globals.channel_list: - globals.channel_list.append(channel_name) + # Add channel to ui_state.channel_list if not already present + if channel_name not in ui_state.channel_list: + ui_state.channel_list.append(channel_name) - # Initialize globals.all_messages[channel_name] if it doesn't exist - if channel_name not in globals.all_messages: - globals.all_messages[channel_name] = [] + # Initialize ui_state.all_messages[channel_name] if it doesn't exist + if channel_name not in ui_state.all_messages: + ui_state.all_messages[channel_name] = [] - return globals.channel_list + return ui_state.channel_list def get_node_list(): - if globals.interface.nodes: - my_node_num = globals.myNodeNum + if interface_state.interface.nodes: + my_node_num = interface_state.myNodeNum def node_sort(node): if config.node_sort == "lastHeard": @@ -51,7 +52,7 @@ def get_node_list(): else: return node - sorted_nodes = sorted(globals.interface.nodes.values(), key=node_sort) + sorted_nodes = sorted(interface_state.interface.nodes.values(), key=node_sort) # Move favorite nodes to the beginning sorted_nodes = sorted( @@ -68,14 +69,14 @@ def get_node_list(): def refresh_node_list(): new_node_list = get_node_list() - if new_node_list != globals.node_list: - globals.node_list = new_node_list + if new_node_list != ui_state.node_list: + ui_state.node_list = new_node_list return True return False def get_nodeNum(): - myinfo = globals.interface.getMyNodeInfo() + myinfo = interface_state.interface.getMyNodeInfo() myNodeNum = myinfo["num"] return myNodeNum From 7fc1cbc3a91070cf3105f19c61f781d8c1c0beab Mon Sep 17 00:00:00 2001 From: Russell Schmidt Date: Wed, 23 Apr 2025 17:02:47 -0500 Subject: [PATCH 2/2] Fix error "No module named 'ui.ui_state'" Was unable to run locally at tips due to an import not including the package name. --- contact/utilities/singleton.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contact/utilities/singleton.py b/contact/utilities/singleton.py index 94e9171..17fb99b 100644 --- a/contact/utilities/singleton.py +++ b/contact/utilities/singleton.py @@ -1,4 +1,4 @@ -from ui.ui_state import ChatUIState, InterfaceState, AppState +from contact.ui.ui_state import ChatUIState, InterfaceState, AppState ui_state = ChatUIState() interface_state = InterfaceState()