convert globals to dataclass

This commit is contained in:
pdxlocations
2025-04-19 21:17:19 -07:00
parent 61e1799199
commit eb79675be0
7 changed files with 65 additions and 72 deletions

View File

@@ -24,7 +24,6 @@ import traceback
from pubsub import pub
# Local application
import contact.globals as globals
import contact.ui.default_config as config
from contact.message_handlers.rx_handler import on_receive
from contact.settings import set_region
@@ -36,7 +35,7 @@ from contact.utilities.db_handler import init_nodedb, load_messages_from_db
from contact.utilities.input_handlers import get_list_input
from contact.utilities.interfaces import initialize_interface
from contact.utilities.utils import get_channels, get_nodeNum, get_node_list
from contact.utilities.singleton import ui_state
from contact.utilities.singleton import ui_state, interface_state
# ------------------------------------------------------------------------------
# Environment & Logging Setup
@@ -52,7 +51,7 @@ logging.basicConfig(
filename=config.log_file_path, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
globals.lock = threading.Lock()
interface_state.lock = threading.Lock()
# ------------------------------------------------------------------------------
# Main Program Logic
@@ -61,17 +60,17 @@ globals.lock = threading.Lock()
def initialize_globals(args) -> None:
"""Initializes interface and shared globals."""
globals.interface = initialize_interface(args)
interface_state.interface = initialize_interface(args)
# Prompt for region if unset
if globals.interface.localNode.localConfig.lora.region == 0:
if interface_state.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(globals.interface)
globals.interface.close()
globals.interface = initialize_interface(args)
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
globals.myNodeNum = get_nodeNum()
interface_state.myNodeNum = get_nodeNum()
ui_state.channel_list = get_channels()
ui_state.node_list = get_node_list()
pub.subscribe(on_receive, "meshtastic.receive")
@@ -96,7 +95,7 @@ def main(stdscr: curses.window) -> None:
return
logging.info("Initializing interface...")
with globals.lock:
with interface_state.lock:
initialize_globals(args)
logging.info("Starting main UI")

View File

@@ -1,13 +0,0 @@
interface = None
lock = None
# display_log = False
# all_messages = {}
# channel_list = []
# notifications = []
# packet_buffer = []
# node_list = []
myNodeNum = 0
# selected_channel = 0
# selected_message = 0
# selected_node = 0
# current_window = 0

View File

@@ -18,8 +18,8 @@ from contact.utilities.db_handler import (
update_node_info_in_db,
)
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state
from contact.utilities.singleton import ui_state, interface_state
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
@@ -30,7 +30,7 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
packet: The received Meshtastic packet as a dictionary.
interface: The Meshtastic interface instance that received the packet.
"""
with globals.lock:
with interface_state.lock:
# Update packet log
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
@@ -64,7 +64,7 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
else:
channel_number = 0
if packet["to"] == globals.myNodeNum:
if packet["to"] == interface_state.myNodeNum:
if packet["from"] in ui_state.channel_list:
pass
else:

View File

@@ -13,8 +13,8 @@ from contact.utilities.db_handler import (
update_node_info_in_db,
)
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state
from contact.utilities.singleton import ui_state, interface_state
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
@@ -37,7 +37,7 @@ def onAckNak(packet: Dict[str, Any]) -> None:
confirm_string = " "
ack_type = None
if packet["decoded"]["routing"]["errorReason"] == "NONE":
if packet["from"] == globals.myNodeNum: # Ack "from" ourself means implicit ACK
if packet["from"] == interface_state.myNodeNum: # Ack "from" ourself means implicit ACK
confirm_string = config.ack_implicit_str
ack_type = "Implicit"
else:
@@ -172,7 +172,7 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
"""
Sends a chat message using the selected channel.
"""
myid = globals.myNodeNum
myid = interface_state.myNodeNum
send_on_channel = 0
channel_id = ui_state.channel_list[channel]
if isinstance(channel_id, int):
@@ -181,7 +181,7 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
elif isinstance(channel_id, str):
send_on_channel = channel
sent_message_data = globals.interface.sendText(
sent_message_data = interface_state.interface.sendText(
text=message,
destinationId=destination,
wantAck=True,
@@ -231,7 +231,7 @@ def send_traceroute() -> None:
Sends a RouteDiscovery protobuf to the selected node.
"""
r = mesh_pb2.RouteDiscovery()
globals.interface.sendData(
interface_state.interface.sendData(
r,
destinationId=ui_state.node_list[ui_state.selected_node],
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,

View File

@@ -12,9 +12,9 @@ from contact.utilities.db_handler import get_name_from_database, update_node_inf
from contact.utilities.input_handlers import get_list_input
import contact.ui.default_config as config
import contact.ui.dialog
import contact.globals as globals
from contact.utilities.singleton import ui_state
from contact.utilities.singleton import ui_state, interface_state
def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
@@ -274,7 +274,7 @@ def main_ui(stdscr: curses.window) -> None:
elif char == "`": # ` Launch the settings interface
curses.curs_set(0)
settings_menu(stdscr, globals.interface)
settings_menu(stdscr, interface_state.interface)
curses.curs_set(1)
refresh_node_list()
handle_resize(stdscr, False)
@@ -318,14 +318,14 @@ def main_ui(stdscr: curses.window) -> None:
["Yes", "No"],
)
if confirmation == "Yes":
globals.interface.localNode.removeNode(ui_state.node_list[ui_state.selected_node])
interface_state.interface.localNode.removeNode(ui_state.node_list[ui_state.selected_node])
# Directly modifying the interface from client code - good? Bad? If it's stupid but it works, it's not supid?
del globals.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
del interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
# Convert to "!hex" representation that interface.nodes uses
hexid = f"!{hex(ui_state.node_list[ui_state.selected_node])[2:]}"
del globals.interface.nodes[hexid]
del interface_state.interface.nodes[hexid]
ui_state.node_list.pop(ui_state.selected_node)
@@ -344,7 +344,7 @@ def main_ui(stdscr: curses.window) -> None:
# ^F
elif char == chr(6):
if ui_state.current_window == 2:
selectedNode = globals.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
curses.curs_set(0)
@@ -355,9 +355,11 @@ def main_ui(stdscr: curses.window) -> None:
["Yes", "No"],
)
if confirmation == "Yes":
globals.interface.localNode.setFavorite(ui_state.node_list[ui_state.selected_node])
interface_state.interface.localNode.setFavorite(ui_state.node_list[ui_state.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
globals.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isFavorite"] = True
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isFavorite"
] = True
refresh_node_list()
@@ -368,9 +370,11 @@ def main_ui(stdscr: curses.window) -> None:
["Yes", "No"],
)
if confirmation == "Yes":
globals.interface.localNode.removeFavorite(ui_state.node_list[ui_state.selected_node])
interface_state.interface.localNode.removeFavorite(ui_state.node_list[ui_state.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
globals.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isFavorite"] = False
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isFavorite"
] = False
refresh_node_list()
@@ -378,7 +382,7 @@ def main_ui(stdscr: curses.window) -> None:
elif char == chr(7):
if ui_state.current_window == 2:
selectedNode = globals.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
curses.curs_set(0)
@@ -389,8 +393,10 @@ def main_ui(stdscr: curses.window) -> None:
["Yes", "No"],
)
if confirmation == "Yes":
globals.interface.localNode.setIgnored(ui_state.node_list[ui_state.selected_node])
globals.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isIgnored"] = True
interface_state.interface.localNode.setIgnored(ui_state.node_list[ui_state.selected_node])
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isIgnored"
] = True
else:
confirmation = get_list_input(
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Ignored?",
@@ -398,8 +404,10 @@ def main_ui(stdscr: curses.window) -> None:
["Yes", "No"],
)
if confirmation == "Yes":
globals.interface.localNode.removeIgnored(ui_state.node_list[ui_state.selected_node])
globals.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isIgnored"] = False
interface_state.interface.localNode.removeIgnored(ui_state.node_list[ui_state.selected_node])
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
"isIgnored"
] = False
handle_resize(stdscr, False)
@@ -520,7 +528,7 @@ def draw_node_list() -> None:
logging.error("Traceback: %s", traceback.format_exc())
for i, node_num in enumerate(ui_state.node_list):
node = globals.interface.nodesByNum[node_num]
node = interface_state.interface.nodesByNum[node_num]
secure = "user" in node and "publicKey" in node["user"] and node["user"]["publicKey"]
node_str = f"{'🔐' if secure else '🔓'} {get_name_from_database(node_num, 'long')}".ljust(box_width - 2)[
: box_width - 2
@@ -710,7 +718,7 @@ def search(win: int) -> None:
def draw_node_details() -> None:
node = None
try:
node = globals.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
except KeyError:
return
@@ -727,7 +735,7 @@ def draw_node_details() -> None:
f" | {node['user']['role']}" if "user" in node and "role" in node["user"] else "",
]
if ui_state.node_list[ui_state.selected_node] == globals.myNodeNum:
if ui_state.node_list[ui_state.selected_node] == interface_state.myNodeNum:
node_details_list.extend(
[
(
@@ -849,7 +857,7 @@ def highlight_line(highlight: bool, window: int, line: int) -> None:
if window == 2:
node_num = ui_state.node_list[line]
node = globals.interface.nodesByNum[node_num]
node = interface_state.interface.nodesByNum[node_num]
if "isFavorite" in node and node["isFavorite"]:
color = get_color("node_favorite")
if "isIgnored" in node and node["isIgnored"]:

View File

@@ -6,14 +6,14 @@ from typing import Optional, Union, Dict
from contact.utilities.utils import decimal_to_hex
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state
from contact.utilities.singleton import ui_state, interface_state
def get_table_name(channel: str) -> str:
# Construct the table name
table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
table_name = f"{str(interface_state.myNodeNum)}_{channel}_messages"
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
return quoted_table_name
@@ -63,7 +63,7 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
message_text = ?
"""
db_cursor.execute(update_query, (ack, str(globals.myNodeNum), timestamp, message))
db_cursor.execute(update_query, (ack, str(interface_state.myNodeNum), timestamp, message))
db_connection.commit()
except sqlite3.Error as e:
@@ -80,7 +80,7 @@ def load_messages_from_db() -> None:
db_cursor = db_connection.cursor()
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",))
db_cursor.execute(query, (f"{str(interface_state.myNodeNum)}_%_messages",))
tables = [row[0] for row in db_cursor.fetchall()]
# Iterate through each table and fetch its messages
@@ -129,7 +129,7 @@ def load_messages_from_db() -> None:
elif ack_type == "Nak":
ack_str = config.nak_str
if user_id == str(globals.myNodeNum):
if user_id == str(interface_state.myNodeNum):
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
else:
formatted_message = (
@@ -155,11 +155,11 @@ def init_nodedb() -> None:
"""Initialize the node database and update it with nodes from the interface."""
try:
if not globals.interface.nodes:
if not interface_state.interface.nodes:
return # No nodes to initialize
ensure_node_table_exists() # Ensure the table exists before insertion
nodes_snapshot = list(globals.interface.nodes.values())
nodes_snapshot = list(interface_state.interface.nodes.values())
# Insert or update all nodes
for node in nodes_snapshot:
@@ -216,7 +216,7 @@ def update_node_info_in_db(
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote in case of numeric names
table_columns = [i[1] for i in db_cursor.execute(f"PRAGMA table_info({table_name})")]
if "chat_archived" not in table_columns:
@@ -280,7 +280,7 @@ def update_node_info_in_db(
def ensure_node_table_exists() -> None:
"""Ensure the node database table exists."""
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote for safety
schema = """
user_id TEXT PRIMARY KEY,
long_name TEXT,
@@ -321,7 +321,7 @@ def get_name_from_database(user_id: int, type: str = "long") -> str:
db_cursor = db_connection.cursor()
# Construct table name
table_name = f"{str(globals.myNodeNum)}_nodedb"
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
# Determine the correct column to fetch
@@ -347,7 +347,7 @@ def is_chat_archived(user_id: int) -> int:
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f"{str(globals.myNodeNum)}_nodedb"
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"'
query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,))

View File

@@ -1,14 +1,13 @@
import contact.globals as globals
import datetime
from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state
from contact.utilities.singleton import ui_state, interface_state
def get_channels():
"""Retrieve channels from the node and update ui_state.channel_list and ui_state.all_messages."""
node = globals.interface.getNode("^local")
node = interface_state.interface.getNode("^local")
device_channels = node.channels
# Clear and rebuild channel list
@@ -40,8 +39,8 @@ def get_channels():
def get_node_list():
if globals.interface.nodes:
my_node_num = globals.myNodeNum
if interface_state.interface.nodes:
my_node_num = interface_state.myNodeNum
def node_sort(node):
if config.node_sort == "lastHeard":
@@ -53,7 +52,7 @@ def get_node_list():
else:
return node
sorted_nodes = sorted(globals.interface.nodes.values(), key=node_sort)
sorted_nodes = sorted(interface_state.interface.nodes.values(), key=node_sort)
# Move favorite nodes to the beginning
sorted_nodes = sorted(
@@ -77,7 +76,7 @@ def refresh_node_list():
def get_nodeNum():
myinfo = globals.interface.getMyNodeInfo()
myinfo = interface_state.interface.getMyNodeInfo()
myNodeNum = myinfo["num"]
return myNodeNum