1
0
forked from iarv/contact

Compare commits

..

1 Commits

Author SHA1 Message Date
pdxlocations
3d1a489fd1 maybe dead end until refactor scrolling 2025-04-19 18:57:47 -07:00
15 changed files with 640 additions and 905 deletions

View File

@@ -24,6 +24,7 @@ import traceback
from pubsub import pub from pubsub import pub
# Local application # Local application
import contact.globals as globals
import contact.ui.default_config as config import contact.ui.default_config as config
from contact.message_handlers.rx_handler import on_receive from contact.message_handlers.rx_handler import on_receive
from contact.settings import set_region from contact.settings import set_region
@@ -35,7 +36,7 @@ from contact.utilities.db_handler import init_nodedb, load_messages_from_db
from contact.utilities.input_handlers import get_list_input from contact.utilities.input_handlers import get_list_input
from contact.utilities.interfaces import initialize_interface from contact.utilities.interfaces import initialize_interface
from contact.utilities.utils import get_channels, get_nodeNum, get_node_list from contact.utilities.utils import get_channels, get_nodeNum, get_node_list
from contact.utilities.singleton import ui_state, interface_state, app_state
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Environment & Logging Setup # Environment & Logging Setup
@@ -51,27 +52,28 @@ logging.basicConfig(
filename=config.log_file_path, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" filename=config.log_file_path, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
) )
app_state.lock = threading.Lock() globals.lock = threading.Lock()
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Main Program Logic # Main Program Logic
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
def prompt_region_if_unset(args: object) -> None:
"""Prompt user to set region if it is unset."""
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
def initialize_globals(args: object) -> None: def initialize_globals(args) -> None:
"""Initializes interface and shared globals.""" """Initializes interface and shared globals."""
globals.interface = initialize_interface(args)
interface_state.myNodeNum = get_nodeNum() # Prompt for region if unset
ui_state.channel_list = get_channels() if globals.interface.localNode.localConfig.lora.region == 0:
ui_state.node_list = get_node_list() confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(globals.interface)
globals.interface.close()
globals.interface = initialize_interface(args)
globals.myNodeNum = get_nodeNum()
globals.channel_list = get_channels()
globals.node_list = get_node_list()
pub.subscribe(on_receive, "meshtastic.receive") pub.subscribe(on_receive, "meshtastic.receive")
init_nodedb() init_nodedb()
@@ -80,63 +82,55 @@ def initialize_globals(args: object) -> None:
def main(stdscr: curses.window) -> None: def main(stdscr: curses.window) -> None:
"""Main entry point for the curses UI.""" """Main entry point for the curses UI."""
output_capture = io.StringIO() output_capture = io.StringIO()
try: try:
setup_colors() with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
draw_splash(stdscr) setup_colors()
draw_splash(stdscr)
args = setup_parser().parse_args() args = setup_parser().parse_args()
if getattr(args, "settings", False): if getattr(args, "settings", False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True) subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return return
logging.info("Initializing interface...") logging.info("Initializing interface...")
with app_state.lock: with globals.lock:
interface_state.interface = initialize_interface(args) initialize_globals(args)
logging.info("Starting main UI")
if interface_state.interface.localNode.localConfig.lora.region == 0: main_ui(stdscr)
prompt_region_if_unset(args)
initialize_globals(args) except Exception as e:
logging.info("Starting main UI") console_output = output_capture.getvalue()
logging.error("Uncaught exception: %s", e)
try: logging.error("Traceback: %s", traceback.format_exc())
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture): logging.error("Console output:\n%s", console_output)
main_ui(stdscr)
except Exception:
console_output = output_capture.getvalue()
logging.error("Uncaught exception inside main_ui")
logging.error("Traceback:\n%s", traceback.format_exc())
logging.error("Console output:\n%s", console_output)
return
except Exception:
raise raise
def start() -> None: def start() -> None:
"""Entry point for the application.""" """Launch curses wrapper and redirect logs to file."""
if "--help" in sys.argv or "-h" in sys.argv: if "--help" in sys.argv or "-h" in sys.argv:
setup_parser().print_help() setup_parser().print_help()
sys.exit(0) sys.exit(0)
try: with open(config.log_file_path, "a", buffering=1) as log_f:
curses.wrapper(main) sys.stdout = log_f
except KeyboardInterrupt: sys.stderr = log_f
logging.info("User exited with Ctrl+C")
sys.exit(0) with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
except Exception as e: try:
logging.critical("Fatal error", exc_info=True) curses.wrapper(main)
try: except KeyboardInterrupt:
curses.endwin() logging.info("User exited with Ctrl+C")
except Exception: sys.exit(0)
pass except Exception as e:
print("Fatal error:", e) logging.error("Fatal error: %s", e)
traceback.print_exc() logging.error("Traceback: %s", traceback.format_exc())
sys.exit(1) sys.exit(1)
if __name__ == "__main__": if __name__ == "__main__":

13
contact/globals.py Normal file
View File

@@ -0,0 +1,13 @@
interface = None
lock = None
display_log = False
all_messages = {}
channel_list = []
notifications = []
packet_buffer = []
node_list = []
myNodeNum = 0
selected_channel = 0
selected_message = 0
selected_node = 0
current_window = 0

View File

@@ -1,14 +1,9 @@
import logging import logging
import os import time
import platform from datetime import datetime
import shutil
import subprocess
from typing import Any, Dict from typing import Any, Dict
from contact.utilities.utils import ( from contact.utilities.utils import refresh_node_list
refresh_node_list,
add_new_message,
)
from contact.ui.contact_ui import ( from contact.ui.contact_ui import (
draw_packetlog_win, draw_packetlog_win,
draw_node_list, draw_node_list,
@@ -23,48 +18,7 @@ from contact.utilities.db_handler import (
update_node_info_in_db, update_node_info_in_db,
) )
import contact.ui.default_config as config import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state, app_state
def play_sound():
try:
system = platform.system()
sound_path = None
executable = None
if system == "Darwin": # macOS
sound_path = "/System/Library/Sounds/Ping.aiff"
executable = "afplay"
elif system == "Linux":
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay"
sound_path = ogg_path
elif shutil.which("ffplay") and os.path.exists(ogg_path):
executable = "ffplay"
sound_path = ogg_path
elif shutil.which("aplay") and os.path.exists(wav_path):
executable = "aplay"
sound_path = wav_path
else:
logging.warning("No suitable sound player or sound file found on Linux")
if executable and sound_path:
cmd = [executable, sound_path]
if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
def on_receive(packet: Dict[str, Any], interface: Any) -> None: def on_receive(packet: Dict[str, Any], interface: Any) -> None:
@@ -75,14 +29,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
packet: The received Meshtastic packet as a dictionary. packet: The received Meshtastic packet as a dictionary.
interface: The Meshtastic interface instance that received the packet. interface: The Meshtastic interface instance that received the packet.
""" """
with app_state.lock: with globals.lock:
# Update packet log # Update packet log
ui_state.packet_buffer.append(packet) globals.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20: if len(globals.packet_buffer) > 20:
# Trim buffer to 20 packets # Trim buffer to 20 packets
ui_state.packet_buffer = ui_state.packet_buffer[-20:] globals.packet_buffer = globals.packet_buffer[-20:]
if ui_state.display_log: if globals.display_log:
draw_packetlog_win() draw_packetlog_win()
try: try:
if "decoded" not in packet: if "decoded" not in packet:
@@ -98,10 +52,6 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
maybe_store_nodeinfo_in_db(packet) maybe_store_nodeinfo_in_db(packet)
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP": elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
if config.notification_sound == "True":
play_sound()
message_bytes = packet["decoded"]["payload"] message_bytes = packet["decoded"]["payload"]
message_string = message_bytes.decode("utf-8") message_string = message_bytes.decode("utf-8")
@@ -113,21 +63,19 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
else: else:
channel_number = 0 channel_number = 0
if packet["to"] == interface_state.myNodeNum: if packet["to"] == globals.myNodeNum:
if packet["from"] in ui_state.channel_list: if packet["from"] in globals.channel_list:
pass pass
else: else:
ui_state.channel_list.append(packet["from"]) globals.channel_list.append(packet["from"])
if packet["from"] not in ui_state.all_messages: if packet["from"] not in globals.all_messages:
ui_state.all_messages[packet["from"]] = [] globals.all_messages[packet["from"]] = []
update_node_info_in_db(packet["from"], chat_archived=False) update_node_info_in_db(packet["from"], chat_archived=False)
refresh_channels = True refresh_channels = True
channel_number = ui_state.channel_list.index(packet["from"]) channel_number = globals.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number] if globals.channel_list[channel_number] != globals.channel_list[globals.selected_channel]:
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number) add_notification(channel_number)
refresh_channels = True refresh_channels = True
else: else:
@@ -137,14 +85,40 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
message_from_id = packet["from"] message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":" message_from_string = get_name_from_database(message_from_id, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} {message_from_string} ", message_string) if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = globals.all_messages[globals.channel_list[channel_number]]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
globals.all_messages[globals.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
globals.all_messages[globals.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string} ", message_string)
)
if refresh_channels: if refresh_channels:
draw_channel_list() draw_channel_list()
if refresh_messages: if refresh_messages:
draw_messages_window(True) draw_messages_window(True)
save_message_to_db(channel_id, message_from_id, message_string) save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string)
except KeyError as e: except KeyError as e:
logging.error(f"Error processing packet: {e}") logging.error(f"Error processing packet: {e}")

View File

@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Any, Dict from typing import Any, Dict
import google.protobuf.json_format import google.protobuf.json_format
@@ -12,10 +13,7 @@ from contact.utilities.db_handler import (
update_node_info_in_db, update_node_info_in_db,
) )
import contact.ui.default_config as config import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state
from contact.utilities.utils import add_new_message
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp} ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
@@ -33,12 +31,12 @@ def onAckNak(packet: Dict[str, Any]) -> None:
return return
acknak = ack_naks.pop(request) acknak = ack_naks.pop(request)
message = ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]][1] message = globals.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
confirm_string = " " confirm_string = " "
ack_type = None ack_type = None
if packet["decoded"]["routing"]["errorReason"] == "NONE": if packet["decoded"]["routing"]["errorReason"] == "NONE":
if packet["from"] == interface_state.myNodeNum: # Ack "from" ourself means implicit ACK if packet["from"] == globals.myNodeNum: # Ack "from" ourself means implicit ACK
confirm_string = config.ack_implicit_str confirm_string = config.ack_implicit_str
ack_type = "Implicit" ack_type = "Implicit"
else: else:
@@ -48,15 +46,15 @@ def onAckNak(packet: Dict[str, Any]) -> None:
confirm_string = config.nak_str confirm_string = config.nak_str
ack_type = "Nak" ack_type = "Nak"
ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]] = ( globals.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
config.sent_message_prefix + confirm_string + ": ", config.sent_message_prefix + confirm_string + ": ",
message, message,
) )
update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type) update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type)
channel_number = ui_state.channel_list.index(acknak["channel"]) channel_number = globals.channel_list.index(acknak["channel"])
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]: if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
draw_messages_window() draw_messages_window()
@@ -139,17 +137,16 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
msg_str += route_str + "\n" # Print the route back to us msg_str += route_str + "\n" # Print the route back to us
if packet["from"] not in ui_state.channel_list: if packet["from"] not in globals.channel_list:
ui_state.channel_list.append(packet["from"]) globals.channel_list.append(packet["from"])
refresh_channels = True refresh_channels = True
if is_chat_archived(packet["from"]): if is_chat_archived(packet["from"]):
update_node_info_in_db(packet["from"], chat_archived=False) update_node_info_in_db(packet["from"], chat_archived=False)
channel_number = ui_state.channel_list.index(packet["from"]) channel_number = globals.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if channel_id == ui_state.channel_list[ui_state.selected_channel]: if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
refresh_messages = True refresh_messages = True
else: else:
add_notification(channel_number) add_notification(channel_number)
@@ -157,29 +154,33 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n" message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str) if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
globals.all_messages[globals.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string}", msg_str)
)
if refresh_channels: if refresh_channels:
draw_channel_list() draw_channel_list()
if refresh_messages: if refresh_messages:
draw_messages_window(True) draw_messages_window(True)
save_message_to_db(globals.channel_list[channel_number], packet["from"], msg_str)
save_message_to_db(channel_id, packet["from"], msg_str)
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None: def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
""" """
Sends a chat message using the selected channel. Sends a chat message using the selected channel.
""" """
myid = interface_state.myNodeNum myid = globals.myNodeNum
send_on_channel = 0 send_on_channel = 0
channel_id = ui_state.channel_list[channel] channel_id = globals.channel_list[channel]
if isinstance(channel_id, int): if isinstance(channel_id, int):
send_on_channel = 0 send_on_channel = 0
destination = channel_id destination = channel_id
elif isinstance(channel_id, str): elif isinstance(channel_id, str):
send_on_channel = channel send_on_channel = channel
sent_message_data = interface_state.interface.sendText( sent_message_data = globals.interface.sendText(
text=message, text=message,
destinationId=destination, destinationId=destination,
wantAck=True, wantAck=True,
@@ -188,13 +189,38 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
channelIndex=send_on_channel, channelIndex=send_on_channel,
) )
add_new_message(channel_id, config.sent_message_prefix + config.ack_unknown_str + ": ", message) # Add sent message to the messages dictionary
if channel_id not in globals.all_messages:
globals.all_messages[channel_id] = []
# Handle timestamp logic
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = globals.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
globals.all_messages[channel_id].append((f"-- {current_hour} --", ""))
globals.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
timestamp = save_message_to_db(channel_id, myid, message) timestamp = save_message_to_db(channel_id, myid, message)
ack_naks[sent_message_data.id] = { ack_naks[sent_message_data.id] = {
"channel": channel_id, "channel": channel_id,
"messageIndex": len(ui_state.all_messages[channel_id]) - 1, "messageIndex": len(globals.all_messages[channel_id]) - 1,
"timestamp": timestamp, "timestamp": timestamp,
} }
@@ -203,14 +229,10 @@ def send_traceroute() -> None:
""" """
Sends a RouteDiscovery protobuf to the selected node. Sends a RouteDiscovery protobuf to the selected node.
""" """
channel_id = ui_state.node_list[ui_state.selected_node]
add_new_message(channel_id, f"{config.message_prefix} Sent Traceroute", "")
r = mesh_pb2.RouteDiscovery() r = mesh_pb2.RouteDiscovery()
interface_state.interface.sendData( globals.interface.sendData(
r, r,
destinationId=channel_id, destinationId=globals.node_list[globals.selected_node],
portNum=portnums_pb2.PortNum.TRACEROUTE_APP, portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True, wantResponse=True,
onResponse=on_response_traceroute, onResponse=on_response_traceroute,

File diff suppressed because it is too large Load Diff

View File

@@ -123,18 +123,15 @@ def initialize_config() -> Dict[str, object]:
"settings_breadcrumbs": ["green", "black"], "settings_breadcrumbs": ["green", "black"],
"settings_warning": ["green", "black"], "settings_warning": ["green", "black"],
"settings_note": ["green", "black"], "settings_note": ["green", "black"],
"node_favorite": ["cyan", "green"], "node_favorite": ["cyan", "white"],
"node_ignored": ["red", "black"], "node_ignored": ["red", "white"],
} }
default_config_variables = { default_config_variables = {
"channel_list_16ths": "3",
"node_list_16ths": "5",
"db_file_path": db_file_path, "db_file_path": db_file_path,
"log_file_path": log_file_path, "log_file_path": log_file_path,
"message_prefix": ">>", "message_prefix": ">>",
"sent_message_prefix": ">> Sent", "sent_message_prefix": ">> Sent",
"notification_symbol": "*", "notification_symbol": "*",
"notification_sound": "True",
"ack_implicit_str": "[◌]", "ack_implicit_str": "[◌]",
"ack_str": "[✓]", "ack_str": "[✓]",
"nak_str": "[x]", "nak_str": "[x]",
@@ -173,23 +170,18 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
global db_file_path, log_file_path, message_prefix, sent_message_prefix global db_file_path, log_file_path, message_prefix, sent_message_prefix
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
global node_list_16ths, channel_list_16ths
global theme, COLOR_CONFIG global theme, COLOR_CONFIG
global node_sort, notification_sound global node_sort
channel_list_16ths = loaded_config["channel_list_16ths"]
node_list_16ths = loaded_config["node_list_16ths"]
db_file_path = loaded_config["db_file_path"] db_file_path = loaded_config["db_file_path"]
log_file_path = loaded_config["log_file_path"] log_file_path = loaded_config["log_file_path"]
message_prefix = loaded_config["message_prefix"] message_prefix = loaded_config["message_prefix"]
sent_message_prefix = loaded_config["sent_message_prefix"] sent_message_prefix = loaded_config["sent_message_prefix"]
notification_symbol = loaded_config["notification_symbol"] notification_symbol = loaded_config["notification_symbol"]
notification_sound = loaded_config["notification_sound"]
ack_implicit_str = loaded_config["ack_implicit_str"] ack_implicit_str = loaded_config["ack_implicit_str"]
ack_str = loaded_config["ack_str"] ack_str = loaded_config["ack_str"]
nak_str = loaded_config["nak_str"] nak_str = loaded_config["nak_str"]
ack_unknown_str = loaded_config["ack_unknown_str"] ack_unknown_str = loaded_config["ack_unknown_str"]
node_sort = loaded_config["node_sort"]
theme = loaded_config["theme"] theme = loaded_config["theme"]
if theme == "dark": if theme == "dark":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"] COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"]
@@ -197,6 +189,7 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"] COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"]
elif theme == "green": elif theme == "green":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"] COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"]
node_sort = loaded_config["node_sort"]
# Call the function when the script is imported # Call the function when the script is imported

View File

@@ -1,22 +1,8 @@
import curses import curses
import re import re
from unicodedata import east_asian_width
from contact.ui.colors import get_color from contact.ui.colors import get_color
from contact.utilities.control_utils import transform_menu_path from contact.utilities.control_utils import transform_menu_path
from typing import Any, Optional, List, Dict from typing import Any, Optional, List, Dict
from contact.utilities.singleton import interface_state, ui_state
def get_node_color(node_index: int, reverse: bool = False):
node_num = ui_state.node_list[node_index]
node = interface_state.interface.nodesByNum.get(node_num, {})
if node.get("isFavorite"):
return get_color("node_favorite", reverse=reverse)
elif node.get("isIgnored"):
return get_color("node_ignored", reverse=reverse)
return get_color("settings_default", reverse=reverse)
# Aliases # Aliases
Segment = tuple[str, str, bool, bool] Segment = tuple[str, str, bool, bool]
@@ -142,6 +128,7 @@ def draw_arrows(
win: object, visible_height: int, max_index: int, start_index: List[int], show_save_option: bool win: object, visible_height: int, max_index: int, start_index: List[int], show_save_option: bool
) -> None: ) -> None:
# vh = visible_height + (1 if show_save_option else 0)
mi = max_index - (2 if show_save_option else 0) mi = max_index - (2 if show_save_option else 0)
if visible_height < mi: if visible_height < mi:
@@ -295,16 +282,9 @@ def get_wrapped_help_text(
return wrapped_help return wrapped_help
def text_width(text: str) -> int:
return sum(2 if east_asian_width(c) in "FW" else 1 for c in text)
def wrap_text(text: str, wrap_width: int) -> List[str]: def wrap_text(text: str, wrap_width: int) -> List[str]:
"""Wraps text while preserving spaces and breaking long words.""" """Wraps text while preserving spaces and breaking long words."""
whitespace = '\t\n\x0b\x0c\r '
whitespace_trans = dict.fromkeys(map(ord, whitespace), ord(' '))
text = text.translate(whitespace_trans)
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
wrapped_lines = [] wrapped_lines = []
line_buffer = "" line_buffer = ""
@@ -313,11 +293,11 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
wrap_width -= margin wrap_width -= margin
for word in words: for word in words:
word_length = text_width(word) word_length = len(word)
if word_length > wrap_width: # Break long words if word_length > wrap_width: # Break long words
if line_buffer: if line_buffer:
wrapped_lines.append(line_buffer.strip()) wrapped_lines.append(line_buffer)
line_buffer = "" line_buffer = ""
line_length = 0 line_length = 0
for i in range(0, word_length, wrap_width): for i in range(0, word_length, wrap_width):
@@ -325,7 +305,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
continue continue
if line_length + word_length > wrap_width and word.strip(): if line_length + word_length > wrap_width and word.strip():
wrapped_lines.append(line_buffer.strip()) wrapped_lines.append(line_buffer)
line_buffer = "" line_buffer = ""
line_length = 0 line_length = 0
@@ -333,94 +313,6 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
line_length += word_length line_length += word_length
if line_buffer: if line_buffer:
wrapped_lines.append(line_buffer.strip()) wrapped_lines.append(line_buffer)
return wrapped_lines return wrapped_lines
def move_main_highlight(
old_idx: int, new_idx, options: List[str], menu_win: curses.window, menu_pad: curses.window, ui_state: object
) -> None:
if old_idx == new_idx: # No-op
return
max_index = len(options) - 1
visible_height = menu_win.getmaxyx()[0] - 2
if new_idx < ui_state.start_index[ui_state.current_window]: # Moving above the visible area
ui_state.start_index[ui_state.current_window] = new_idx
elif new_idx >= ui_state.start_index[ui_state.current_window] + visible_height: # Moving below the visible area
ui_state.start_index[ui_state.current_window] = new_idx - visible_height + 1
# Ensure start_index is within bounds
ui_state.start_index[ui_state.current_window] = max(
0, min(ui_state.start_index[ui_state.current_window], max_index - visible_height + 1)
)
highlight_line(menu_win, menu_pad, old_idx, new_idx, visible_height)
if ui_state.current_window == 0: # hack to fix max_index
max_index += 1
draw_main_arrows(menu_win, max_index, window=ui_state.current_window)
menu_win.refresh()
def highlight_line(
menu_win: curses.window, menu_pad: curses.window, old_idx: int, new_idx: int, visible_height: int
) -> None:
if ui_state.current_window == 0:
color_old = (
get_color("channel_selected") if old_idx == ui_state.selected_channel else get_color("channel_list")
)
color_new = get_color("channel_list", reverse=True) if True else get_color("channel_list", reverse=True)
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, color_old)
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, color_new)
elif ui_state.current_window == 2:
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(old_idx))
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(new_idx, reverse=True))
menu_win.refresh()
# Refresh pad only if scrolling is needed
menu_pad.refresh(
ui_state.start_index[ui_state.current_window],
0,
menu_win.getbegyx()[0] + 1,
menu_win.getbegyx()[1] + 1,
menu_win.getbegyx()[0] + visible_height,
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 3,
)
def draw_main_arrows(win: object, max_index: int, window: int, **kwargs) -> None:
height, width = win.getmaxyx()
usable_height = height - 2
usable_width = width - 2
if window == 1 and ui_state.display_log:
if log_height := kwargs.get("log_height"):
usable_height -= log_height - 1
if usable_height < max_index:
if ui_state.start_index[window] > 0:
win.addstr(1, usable_width, "", get_color("settings_default"))
else:
win.addstr(1, usable_width, " ", get_color("settings_default"))
if max_index - ui_state.start_index[window] - 1 >= usable_height:
win.addstr(usable_height, usable_width, "", get_color("settings_default"))
else:
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
else:
win.addstr(1, usable_width, " ", get_color("settings_default"))
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
def get_msg_window_lines(messages_win, packetlog_win) -> None:
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if ui_state.display_log else 0
return messages_win.getmaxyx()[0] - 2 - packetlog_height

View File

@@ -1,42 +1,11 @@
from typing import Any, Union, List, Dict from typing import Any, Union, List, Dict
from dataclasses import dataclass, field
@dataclass
class MenuState: class MenuState:
menu_index: List[int] = field(default_factory=list) def __init__(self):
start_index: List[int] = field(default_factory=lambda: [0]) self.menu_index: List[int] = [] # Row we left the previous menus
selected_index: int = 0 self.start_index: List[int] = [0] # Row to start the menu if it doesn't all fit
current_menu: Union[Dict[str, Any], List[Any], str, int] = field(default_factory=dict) self.selected_index: int = 0 # Selected Row
menu_path: List[str] = field(default_factory=list) self.current_menu: Union[Dict[str, Any], List[Any], str, int] = {} # Contents of the current menu
show_save_option: bool = False self.menu_path: List[str] = [] # Menu Path
self.show_save_option: bool = False # Display 'Save'
@dataclass
class ChatUIState:
display_log: bool = False
channel_list: List[str] = field(default_factory=list)
all_messages: Dict[str, List[str]] = field(default_factory=dict)
notifications: List[str] = field(default_factory=list)
packet_buffer: List[str] = field(default_factory=list)
node_list: List[str] = field(default_factory=list)
selected_channel: int = 0
selected_message: int = 0
selected_node: int = 0
current_window: int = 0
selected_index: int = 0
start_index: List[int] = field(default_factory=lambda: [0, 0, 0])
show_save_option: bool = False
menu_path: List[str] = field(default_factory=list)
@dataclass
class InterfaceState:
interface: Any = None
myNodeNum: int = 0
@dataclass
class AppState:
lock: Any = None

View File

@@ -55,15 +55,10 @@ def edit_value(key: str, current_value: str) -> str:
# Load theme names dynamically from the JSON # Load theme names dynamically from the JSON
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")] theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
return get_list_input("Select Theme", current_value, theme_options) return get_list_input("Select Theme", current_value, theme_options)
elif key == "node_sort": elif key == "node_sort":
sort_options = ["lastHeard", "name", "hops"] sort_options = ["lastHeard", "name", "hops"]
return get_list_input("Sort By", current_value, sort_options) return get_list_input("Sort By", current_value, sort_options)
elif key == "notification_sound":
sound_options = ["True", "False"]
return get_list_input("Notification Sound", current_value, sound_options)
# Standard Input Mode (Scrollable) # Standard Input Mode (Scrollable)
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default")) edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
curses.curs_set(1) curses.curs_set(1)

View File

@@ -1,6 +1,5 @@
import yaml import yaml
import logging import logging
import time
from typing import List from typing import List
from google.protobuf.json_format import MessageToDict from google.protobuf.json_format import MessageToDict
from meshtastic import mt_config from meshtastic import mt_config
@@ -134,29 +133,24 @@ def config_import(interface, filename):
logging.info(f"Setting device owner to {configuration['owner']}") logging.info(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True waitForAckNak = True
interface.getNode("^local", False).setOwner(configuration["owner"]) interface.getNode("^local", False).setOwner(configuration["owner"])
time.sleep(0.5)
if "owner_short" in configuration: if "owner_short" in configuration:
logging.info(f"Setting device owner short to {configuration['owner_short']}") logging.info(f"Setting device owner short to {configuration['owner_short']}")
waitForAckNak = True waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"]) interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"])
time.sleep(0.5)
if "ownerShort" in configuration: if "ownerShort" in configuration:
logging.info(f"Setting device owner short to {configuration['ownerShort']}") logging.info(f"Setting device owner short to {configuration['ownerShort']}")
waitForAckNak = True waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"]) interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"])
time.sleep(0.5)
if "channel_url" in configuration: if "channel_url" in configuration:
logging.info(f"Setting channel url to {configuration['channel_url']}") logging.info(f"Setting channel url to {configuration['channel_url']}")
interface.getNode("^local").setURL(configuration["channel_url"]) interface.getNode("^local").setURL(configuration["channel_url"])
time.sleep(0.5)
if "channelUrl" in configuration: if "channelUrl" in configuration:
logging.info(f"Setting channel url to {configuration['channelUrl']}") logging.info(f"Setting channel url to {configuration['channelUrl']}")
interface.getNode("^local").setURL(configuration["channelUrl"]) interface.getNode("^local").setURL(configuration["channelUrl"])
time.sleep(0.5)
if "location" in configuration: if "location" in configuration:
alt = 0 alt = 0
@@ -175,14 +169,12 @@ def config_import(interface, filename):
logging.info(f"Fixing longitude at {lon} degrees") logging.info(f"Fixing longitude at {lon} degrees")
logging.info("Setting device position") logging.info("Setting device position")
interface.localNode.setFixedPosition(lat, lon, alt) interface.localNode.setFixedPosition(lat, lon, alt)
time.sleep(0.5)
if "config" in configuration: if "config" in configuration:
localConfig = interface.getNode("^local").localConfig localConfig = interface.getNode("^local").localConfig
for section in configuration["config"]: for section in configuration["config"]:
traverseConfig(section, configuration["config"][section], localConfig) traverseConfig(section, configuration["config"][section], localConfig)
interface.getNode("^local").writeConfig(camel_to_snake(section)) interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
if "module_config" in configuration: if "module_config" in configuration:
moduleConfig = interface.getNode("^local").moduleConfig moduleConfig = interface.getNode("^local").moduleConfig
@@ -193,7 +185,6 @@ def config_import(interface, filename):
moduleConfig, moduleConfig,
) )
interface.getNode("^local").writeConfig(camel_to_snake(section)) interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
interface.getNode("^local", False).commitSettingsTransaction() interface.getNode("^local", False).commitSettingsTransaction()
logging.info("Writing modified configuration to device") logging.info("Writing modified configuration to device")

View File

@@ -6,14 +6,12 @@ from typing import Optional, Union, Dict
from contact.utilities.utils import decimal_to_hex from contact.utilities.utils import decimal_to_hex
import contact.ui.default_config as config import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state
def get_table_name(channel: str) -> str: def get_table_name(channel: str) -> str:
# Construct the table name # Construct the table name
table_name = f"{str(interface_state.myNodeNum)}_{channel}_messages" table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
return quoted_table_name return quoted_table_name
@@ -63,7 +61,7 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
message_text = ? message_text = ?
""" """
db_cursor.execute(update_query, (ack, str(interface_state.myNodeNum), timestamp, message)) db_cursor.execute(update_query, (ack, str(globals.myNodeNum), timestamp, message))
db_connection.commit() db_connection.commit()
except sqlite3.Error as e: except sqlite3.Error as e:
@@ -74,13 +72,13 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
def load_messages_from_db() -> None: def load_messages_from_db() -> None:
"""Load messages from the database for all channels and update ui_state.all_messages and ui_state.channel_list.""" """Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
try: try:
with sqlite3.connect(config.db_file_path) as db_connection: with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor() db_cursor = db_connection.cursor()
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?" query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
db_cursor.execute(query, (f"{str(interface_state.myNodeNum)}_%_messages",)) db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",))
tables = [row[0] for row in db_cursor.fetchall()] tables = [row[0] for row in db_cursor.fetchall()]
# Iterate through each table and fetch its messages # Iterate through each table and fetch its messages
@@ -106,15 +104,15 @@ def load_messages_from_db() -> None:
# Convert the channel to an integer if it's numeric, otherwise keep it as a string (nodenum vs channel name) # Convert the channel to an integer if it's numeric, otherwise keep it as a string (nodenum vs channel name)
channel = int(channel) if channel.isdigit() else channel channel = int(channel) if channel.isdigit() else channel
# Add the channel to ui_state.channel_list if not already present # Add the channel to globals.channel_list if not already present
if channel not in ui_state.channel_list and not is_chat_archived(channel): if channel not in globals.channel_list and not is_chat_archived(channel):
ui_state.channel_list.append(channel) globals.channel_list.append(channel)
# Ensure the channel exists in ui_state.all_messages # Ensure the channel exists in globals.all_messages
if channel not in ui_state.all_messages: if channel not in globals.all_messages:
ui_state.all_messages[channel] = [] globals.all_messages[channel] = []
# Add messages to ui_state.all_messages grouped by hourly timestamp # Add messages to globals.all_messages grouped by hourly timestamp
hourly_messages = {} hourly_messages = {}
for user_id, message, timestamp, ack_type in db_messages: for user_id, message, timestamp, ack_type in db_messages:
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00") hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
@@ -129,7 +127,7 @@ def load_messages_from_db() -> None:
elif ack_type == "Nak": elif ack_type == "Nak":
ack_str = config.nak_str ack_str = config.nak_str
if user_id == str(interface_state.myNodeNum): if user_id == str(globals.myNodeNum):
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message) formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
else: else:
formatted_message = ( formatted_message = (
@@ -139,10 +137,10 @@ def load_messages_from_db() -> None:
hourly_messages[hour].append(formatted_message) hourly_messages[hour].append(formatted_message)
# Flatten the hourly messages into ui_state.all_messages[channel] # Flatten the hourly messages into globals.all_messages[channel]
for hour, messages in sorted(hourly_messages.items()): for hour, messages in sorted(hourly_messages.items()):
ui_state.all_messages[channel].append((f"-- {hour} --", "")) globals.all_messages[channel].append((f"-- {hour} --", ""))
ui_state.all_messages[channel].extend(messages) globals.all_messages[channel].extend(messages)
except sqlite3.Error as e: except sqlite3.Error as e:
logging.error(f"SQLite error while loading messages from table '{table_name}': {e}") logging.error(f"SQLite error while loading messages from table '{table_name}': {e}")
@@ -155,11 +153,11 @@ def init_nodedb() -> None:
"""Initialize the node database and update it with nodes from the interface.""" """Initialize the node database and update it with nodes from the interface."""
try: try:
if not interface_state.interface.nodes: if not globals.interface.nodes:
return # No nodes to initialize return # No nodes to initialize
ensure_node_table_exists() # Ensure the table exists before insertion ensure_node_table_exists() # Ensure the table exists before insertion
nodes_snapshot = list(interface_state.interface.nodes.values()) nodes_snapshot = list(globals.interface.nodes.values())
# Insert or update all nodes # Insert or update all nodes
for node in nodes_snapshot: for node in nodes_snapshot:
@@ -216,7 +214,7 @@ def update_node_info_in_db(
with sqlite3.connect(config.db_file_path) as db_connection: with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor() db_cursor = db_connection.cursor()
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote in case of numeric names table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
table_columns = [i[1] for i in db_cursor.execute(f"PRAGMA table_info({table_name})")] table_columns = [i[1] for i in db_cursor.execute(f"PRAGMA table_info({table_name})")]
if "chat_archived" not in table_columns: if "chat_archived" not in table_columns:
@@ -280,7 +278,7 @@ def update_node_info_in_db(
def ensure_node_table_exists() -> None: def ensure_node_table_exists() -> None:
"""Ensure the node database table exists.""" """Ensure the node database table exists."""
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote for safety table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety
schema = """ schema = """
user_id TEXT PRIMARY KEY, user_id TEXT PRIMARY KEY,
long_name TEXT, long_name TEXT,
@@ -321,7 +319,7 @@ def get_name_from_database(user_id: int, type: str = "long") -> str:
db_cursor = db_connection.cursor() db_cursor = db_connection.cursor()
# Construct table name # Construct table name
table_name = f"{str(interface_state.myNodeNum)}_nodedb" table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote table name for safety nodeinfo_table = f'"{table_name}"' # Quote table name for safety
# Determine the correct column to fetch # Determine the correct column to fetch
@@ -347,7 +345,7 @@ def is_chat_archived(user_id: int) -> int:
try: try:
with sqlite3.connect(config.db_file_path) as db_connection: with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor() db_cursor = db_connection.cursor()
table_name = f"{str(interface_state.myNodeNum)}_nodedb" table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' nodeinfo_table = f'"{table_name}"'
query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?" query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,)) db_cursor.execute(query, (user_id,))

View File

@@ -104,13 +104,11 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
return [base64.b64encode(b).decode() for b in byte_strings] return [base64.b64encode(b).decode() for b in byte_strings]
def is_valid_base64(s): def is_valid_base64(s):
"""Check if a string is valid Base64 or blank.""" """Check if a string is valid Base64."""
if s == "":
return True
try: try:
decoded = base64.b64decode(s, validate=True) decoded = base64.b64decode(s, validate=True)
return len(decoded) == 32 # Ensure it's exactly 32 bytes return len(decoded) == 32 # Ensure it's exactly 32 bytes
except (binascii.Error, ValueError): except binascii.Error:
return False return False
cvalue = to_base64(current_value) # Convert current values to Base64 cvalue = to_base64(current_value) # Convert current values to Base64

View File

@@ -1,5 +0,0 @@
from contact.ui.ui_state import ChatUIState, InterfaceState, AppState
ui_state = ChatUIState()
interface_state = InterfaceState()
app_state = AppState()

View File

@@ -1,18 +1,16 @@
import contact.globals as globals
import datetime import datetime
import time
from meshtastic.protobuf import config_pb2 from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
def get_channels(): def get_channels():
"""Retrieve channels from the node and update ui_state.channel_list and ui_state.all_messages.""" """Retrieve channels from the node and update globals.channel_list and globals.all_messages."""
node = interface_state.interface.getNode("^local") node = globals.interface.getNode("^local")
device_channels = node.channels device_channels = node.channels
# Clear and rebuild channel list # Clear and rebuild channel list
# ui_state.channel_list = [] # globals.channel_list = []
for device_channel in device_channels: for device_channel in device_channels:
if device_channel.role: if device_channel.role:
@@ -28,20 +26,20 @@ def get_channels():
].name ].name
channel_name = convert_to_camel_case(modem_preset_string) channel_name = convert_to_camel_case(modem_preset_string)
# Add channel to ui_state.channel_list if not already present # Add channel to globals.channel_list if not already present
if channel_name not in ui_state.channel_list: if channel_name not in globals.channel_list:
ui_state.channel_list.append(channel_name) globals.channel_list.append(channel_name)
# Initialize ui_state.all_messages[channel_name] if it doesn't exist # Initialize globals.all_messages[channel_name] if it doesn't exist
if channel_name not in ui_state.all_messages: if channel_name not in globals.all_messages:
ui_state.all_messages[channel_name] = [] globals.all_messages[channel_name] = []
return ui_state.channel_list return globals.channel_list
def get_node_list(): def get_node_list():
if interface_state.interface.nodes: if globals.interface.nodes:
my_node_num = interface_state.myNodeNum my_node_num = globals.myNodeNum
def node_sort(node): def node_sort(node):
if config.node_sort == "lastHeard": if config.node_sort == "lastHeard":
@@ -53,7 +51,7 @@ def get_node_list():
else: else:
return node return node
sorted_nodes = sorted(interface_state.interface.nodes.values(), key=node_sort) sorted_nodes = sorted(globals.interface.nodes.values(), key=node_sort)
# Move favorite nodes to the beginning # Move favorite nodes to the beginning
sorted_nodes = sorted( sorted_nodes = sorted(
@@ -70,14 +68,14 @@ def get_node_list():
def refresh_node_list(): def refresh_node_list():
new_node_list = get_node_list() new_node_list = get_node_list()
if new_node_list != ui_state.node_list: if new_node_list != globals.node_list:
ui_state.node_list = new_node_list globals.node_list = new_node_list
return True return True
return False return False
def get_nodeNum(): def get_nodeNum():
myinfo = interface_state.interface.getMyNodeInfo() myinfo = globals.interface.getMyNodeInfo()
myNodeNum = myinfo["num"] myNodeNum = myinfo["num"]
return myNodeNum return myNodeNum
@@ -135,31 +133,3 @@ def get_time_ago(timestamp):
if unit != "s": if unit != "s":
return f"{value} {unit} ago" return f"{value} {unit} ago"
return "now" return "now"
def add_new_message(channel_id, prefix, message):
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
# Add the message
ui_state.all_messages[channel_id].append((prefix,message))

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "contact" name = "contact"
version = "1.3.15" version = "1.3.8"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores." description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [ authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"} {name = "Ben Lipsey",email = "ben@pdxlocations.com"}