forked from iarv/contact
Compare commits
1 Commits
errors
...
chat-windo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3d1a489fd1 |
@@ -24,6 +24,7 @@ import traceback
|
|||||||
from pubsub import pub
|
from pubsub import pub
|
||||||
|
|
||||||
# Local application
|
# Local application
|
||||||
|
import contact.globals as globals
|
||||||
import contact.ui.default_config as config
|
import contact.ui.default_config as config
|
||||||
from contact.message_handlers.rx_handler import on_receive
|
from contact.message_handlers.rx_handler import on_receive
|
||||||
from contact.settings import set_region
|
from contact.settings import set_region
|
||||||
@@ -35,7 +36,7 @@ from contact.utilities.db_handler import init_nodedb, load_messages_from_db
|
|||||||
from contact.utilities.input_handlers import get_list_input
|
from contact.utilities.input_handlers import get_list_input
|
||||||
from contact.utilities.interfaces import initialize_interface
|
from contact.utilities.interfaces import initialize_interface
|
||||||
from contact.utilities.utils import get_channels, get_nodeNum, get_node_list
|
from contact.utilities.utils import get_channels, get_nodeNum, get_node_list
|
||||||
from contact.utilities.singleton import ui_state, interface_state, app_state
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
# Environment & Logging Setup
|
# Environment & Logging Setup
|
||||||
@@ -51,27 +52,28 @@ logging.basicConfig(
|
|||||||
filename=config.log_file_path, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
filename=config.log_file_path, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
||||||
)
|
)
|
||||||
|
|
||||||
app_state.lock = threading.Lock()
|
globals.lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
# Main Program Logic
|
# Main Program Logic
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
def prompt_region_if_unset(args: object) -> None:
|
|
||||||
"""Prompt user to set region if it is unset."""
|
|
||||||
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
|
|
||||||
if confirmation == "Yes":
|
|
||||||
set_region(interface_state.interface)
|
|
||||||
interface_state.interface.close()
|
|
||||||
interface_state.interface = initialize_interface(args)
|
|
||||||
|
|
||||||
|
|
||||||
def initialize_globals(args: object) -> None:
|
def initialize_globals(args) -> None:
|
||||||
"""Initializes interface and shared globals."""
|
"""Initializes interface and shared globals."""
|
||||||
|
globals.interface = initialize_interface(args)
|
||||||
|
|
||||||
interface_state.myNodeNum = get_nodeNum()
|
# Prompt for region if unset
|
||||||
ui_state.channel_list = get_channels()
|
if globals.interface.localNode.localConfig.lora.region == 0:
|
||||||
ui_state.node_list = get_node_list()
|
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
|
||||||
|
if confirmation == "Yes":
|
||||||
|
set_region(globals.interface)
|
||||||
|
globals.interface.close()
|
||||||
|
globals.interface = initialize_interface(args)
|
||||||
|
|
||||||
|
globals.myNodeNum = get_nodeNum()
|
||||||
|
globals.channel_list = get_channels()
|
||||||
|
globals.node_list = get_node_list()
|
||||||
pub.subscribe(on_receive, "meshtastic.receive")
|
pub.subscribe(on_receive, "meshtastic.receive")
|
||||||
|
|
||||||
init_nodedb()
|
init_nodedb()
|
||||||
@@ -80,63 +82,55 @@ def initialize_globals(args: object) -> None:
|
|||||||
|
|
||||||
def main(stdscr: curses.window) -> None:
|
def main(stdscr: curses.window) -> None:
|
||||||
"""Main entry point for the curses UI."""
|
"""Main entry point for the curses UI."""
|
||||||
|
|
||||||
output_capture = io.StringIO()
|
output_capture = io.StringIO()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
setup_colors()
|
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
|
||||||
draw_splash(stdscr)
|
setup_colors()
|
||||||
|
draw_splash(stdscr)
|
||||||
|
|
||||||
args = setup_parser().parse_args()
|
args = setup_parser().parse_args()
|
||||||
|
|
||||||
if getattr(args, "settings", False):
|
if getattr(args, "settings", False):
|
||||||
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
|
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
|
||||||
return
|
return
|
||||||
|
|
||||||
logging.info("Initializing interface...")
|
logging.info("Initializing interface...")
|
||||||
with app_state.lock:
|
with globals.lock:
|
||||||
interface_state.interface = initialize_interface(args)
|
initialize_globals(args)
|
||||||
|
logging.info("Starting main UI")
|
||||||
|
|
||||||
if interface_state.interface.localNode.localConfig.lora.region == 0:
|
main_ui(stdscr)
|
||||||
prompt_region_if_unset(args)
|
|
||||||
|
|
||||||
initialize_globals(args)
|
except Exception as e:
|
||||||
logging.info("Starting main UI")
|
console_output = output_capture.getvalue()
|
||||||
|
logging.error("Uncaught exception: %s", e)
|
||||||
try:
|
logging.error("Traceback: %s", traceback.format_exc())
|
||||||
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
|
logging.error("Console output:\n%s", console_output)
|
||||||
main_ui(stdscr)
|
|
||||||
except Exception:
|
|
||||||
console_output = output_capture.getvalue()
|
|
||||||
logging.error("Uncaught exception inside main_ui")
|
|
||||||
logging.error("Traceback:\n%s", traceback.format_exc())
|
|
||||||
logging.error("Console output:\n%s", console_output)
|
|
||||||
return
|
|
||||||
|
|
||||||
except Exception:
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
def start() -> None:
|
def start() -> None:
|
||||||
"""Entry point for the application."""
|
"""Launch curses wrapper and redirect logs to file."""
|
||||||
|
|
||||||
if "--help" in sys.argv or "-h" in sys.argv:
|
if "--help" in sys.argv or "-h" in sys.argv:
|
||||||
setup_parser().print_help()
|
setup_parser().print_help()
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
try:
|
with open(config.log_file_path, "a", buffering=1) as log_f:
|
||||||
curses.wrapper(main)
|
sys.stdout = log_f
|
||||||
except KeyboardInterrupt:
|
sys.stderr = log_f
|
||||||
logging.info("User exited with Ctrl+C")
|
|
||||||
sys.exit(0)
|
with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
|
||||||
except Exception as e:
|
try:
|
||||||
logging.critical("Fatal error", exc_info=True)
|
curses.wrapper(main)
|
||||||
try:
|
except KeyboardInterrupt:
|
||||||
curses.endwin()
|
logging.info("User exited with Ctrl+C")
|
||||||
except Exception:
|
sys.exit(0)
|
||||||
pass
|
except Exception as e:
|
||||||
print("Fatal error:", e)
|
logging.error("Fatal error: %s", e)
|
||||||
traceback.print_exc()
|
logging.error("Traceback: %s", traceback.format_exc())
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
13
contact/globals.py
Normal file
13
contact/globals.py
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
interface = None
|
||||||
|
lock = None
|
||||||
|
display_log = False
|
||||||
|
all_messages = {}
|
||||||
|
channel_list = []
|
||||||
|
notifications = []
|
||||||
|
packet_buffer = []
|
||||||
|
node_list = []
|
||||||
|
myNodeNum = 0
|
||||||
|
selected_channel = 0
|
||||||
|
selected_message = 0
|
||||||
|
selected_node = 0
|
||||||
|
current_window = 0
|
||||||
@@ -1,14 +1,9 @@
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import time
|
||||||
import platform
|
from datetime import datetime
|
||||||
import shutil
|
|
||||||
import subprocess
|
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
from contact.utilities.utils import (
|
from contact.utilities.utils import refresh_node_list
|
||||||
refresh_node_list,
|
|
||||||
add_new_message,
|
|
||||||
)
|
|
||||||
from contact.ui.contact_ui import (
|
from contact.ui.contact_ui import (
|
||||||
draw_packetlog_win,
|
draw_packetlog_win,
|
||||||
draw_node_list,
|
draw_node_list,
|
||||||
@@ -23,48 +18,7 @@ from contact.utilities.db_handler import (
|
|||||||
update_node_info_in_db,
|
update_node_info_in_db,
|
||||||
)
|
)
|
||||||
import contact.ui.default_config as config
|
import contact.ui.default_config as config
|
||||||
|
import contact.globals as globals
|
||||||
from contact.utilities.singleton import ui_state, interface_state, app_state
|
|
||||||
|
|
||||||
|
|
||||||
def play_sound():
|
|
||||||
try:
|
|
||||||
system = platform.system()
|
|
||||||
sound_path = None
|
|
||||||
executable = None
|
|
||||||
|
|
||||||
if system == "Darwin": # macOS
|
|
||||||
sound_path = "/System/Library/Sounds/Ping.aiff"
|
|
||||||
executable = "afplay"
|
|
||||||
|
|
||||||
elif system == "Linux":
|
|
||||||
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
|
|
||||||
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
|
|
||||||
|
|
||||||
if shutil.which("paplay") and os.path.exists(ogg_path):
|
|
||||||
executable = "paplay"
|
|
||||||
sound_path = ogg_path
|
|
||||||
elif shutil.which("ffplay") and os.path.exists(ogg_path):
|
|
||||||
executable = "ffplay"
|
|
||||||
sound_path = ogg_path
|
|
||||||
elif shutil.which("aplay") and os.path.exists(wav_path):
|
|
||||||
executable = "aplay"
|
|
||||||
sound_path = wav_path
|
|
||||||
else:
|
|
||||||
logging.warning("No suitable sound player or sound file found on Linux")
|
|
||||||
|
|
||||||
if executable and sound_path:
|
|
||||||
cmd = [executable, sound_path]
|
|
||||||
if executable == "ffplay":
|
|
||||||
cmd = [executable, "-nodisp", "-autoexit", sound_path]
|
|
||||||
|
|
||||||
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
||||||
return
|
|
||||||
|
|
||||||
except subprocess.CalledProcessError as e:
|
|
||||||
logging.error(f"Sound playback failed: {e}")
|
|
||||||
except Exception as e:
|
|
||||||
logging.error(f"Unexpected error: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||||
@@ -75,14 +29,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
|||||||
packet: The received Meshtastic packet as a dictionary.
|
packet: The received Meshtastic packet as a dictionary.
|
||||||
interface: The Meshtastic interface instance that received the packet.
|
interface: The Meshtastic interface instance that received the packet.
|
||||||
"""
|
"""
|
||||||
with app_state.lock:
|
with globals.lock:
|
||||||
# Update packet log
|
# Update packet log
|
||||||
ui_state.packet_buffer.append(packet)
|
globals.packet_buffer.append(packet)
|
||||||
if len(ui_state.packet_buffer) > 20:
|
if len(globals.packet_buffer) > 20:
|
||||||
# Trim buffer to 20 packets
|
# Trim buffer to 20 packets
|
||||||
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
|
globals.packet_buffer = globals.packet_buffer[-20:]
|
||||||
|
|
||||||
if ui_state.display_log:
|
if globals.display_log:
|
||||||
draw_packetlog_win()
|
draw_packetlog_win()
|
||||||
try:
|
try:
|
||||||
if "decoded" not in packet:
|
if "decoded" not in packet:
|
||||||
@@ -98,10 +52,6 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
|||||||
maybe_store_nodeinfo_in_db(packet)
|
maybe_store_nodeinfo_in_db(packet)
|
||||||
|
|
||||||
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
|
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
|
||||||
|
|
||||||
if config.notification_sound == "True":
|
|
||||||
play_sound()
|
|
||||||
|
|
||||||
message_bytes = packet["decoded"]["payload"]
|
message_bytes = packet["decoded"]["payload"]
|
||||||
message_string = message_bytes.decode("utf-8")
|
message_string = message_bytes.decode("utf-8")
|
||||||
|
|
||||||
@@ -113,21 +63,19 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
|||||||
else:
|
else:
|
||||||
channel_number = 0
|
channel_number = 0
|
||||||
|
|
||||||
if packet["to"] == interface_state.myNodeNum:
|
if packet["to"] == globals.myNodeNum:
|
||||||
if packet["from"] in ui_state.channel_list:
|
if packet["from"] in globals.channel_list:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
ui_state.channel_list.append(packet["from"])
|
globals.channel_list.append(packet["from"])
|
||||||
if packet["from"] not in ui_state.all_messages:
|
if packet["from"] not in globals.all_messages:
|
||||||
ui_state.all_messages[packet["from"]] = []
|
globals.all_messages[packet["from"]] = []
|
||||||
update_node_info_in_db(packet["from"], chat_archived=False)
|
update_node_info_in_db(packet["from"], chat_archived=False)
|
||||||
refresh_channels = True
|
refresh_channels = True
|
||||||
|
|
||||||
channel_number = ui_state.channel_list.index(packet["from"])
|
channel_number = globals.channel_list.index(packet["from"])
|
||||||
|
|
||||||
channel_id = ui_state.channel_list[channel_number]
|
if globals.channel_list[channel_number] != globals.channel_list[globals.selected_channel]:
|
||||||
|
|
||||||
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
|
|
||||||
add_notification(channel_number)
|
add_notification(channel_number)
|
||||||
refresh_channels = True
|
refresh_channels = True
|
||||||
else:
|
else:
|
||||||
@@ -137,14 +85,40 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
|||||||
message_from_id = packet["from"]
|
message_from_id = packet["from"]
|
||||||
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
|
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
|
||||||
|
|
||||||
add_new_message(channel_id, f"{config.message_prefix} {message_from_string} ", message_string)
|
if globals.channel_list[channel_number] not in globals.all_messages:
|
||||||
|
globals.all_messages[globals.channel_list[channel_number]] = []
|
||||||
|
|
||||||
|
# Timestamp handling
|
||||||
|
current_timestamp = time.time()
|
||||||
|
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
|
||||||
|
|
||||||
|
# Retrieve the last timestamp if available
|
||||||
|
channel_messages = globals.all_messages[globals.channel_list[channel_number]]
|
||||||
|
if channel_messages:
|
||||||
|
# Check the last entry for a timestamp
|
||||||
|
for entry in reversed(channel_messages):
|
||||||
|
if entry[0].startswith("--"):
|
||||||
|
last_hour = entry[0].strip("- ").strip()
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
last_hour = None
|
||||||
|
else:
|
||||||
|
last_hour = None
|
||||||
|
|
||||||
|
# Add a new timestamp if it's a new hour
|
||||||
|
if last_hour != current_hour:
|
||||||
|
globals.all_messages[globals.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
|
||||||
|
|
||||||
|
globals.all_messages[globals.channel_list[channel_number]].append(
|
||||||
|
(f"{config.message_prefix} {message_from_string} ", message_string)
|
||||||
|
)
|
||||||
|
|
||||||
if refresh_channels:
|
if refresh_channels:
|
||||||
draw_channel_list()
|
draw_channel_list()
|
||||||
if refresh_messages:
|
if refresh_messages:
|
||||||
draw_messages_window(True)
|
draw_messages_window(True)
|
||||||
|
|
||||||
save_message_to_db(channel_id, message_from_id, message_string)
|
save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string)
|
||||||
|
|
||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
logging.error(f"Error processing packet: {e}")
|
logging.error(f"Error processing packet: {e}")
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
from datetime import datetime
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
import google.protobuf.json_format
|
import google.protobuf.json_format
|
||||||
@@ -12,10 +13,7 @@ from contact.utilities.db_handler import (
|
|||||||
update_node_info_in_db,
|
update_node_info_in_db,
|
||||||
)
|
)
|
||||||
import contact.ui.default_config as config
|
import contact.ui.default_config as config
|
||||||
|
import contact.globals as globals
|
||||||
from contact.utilities.singleton import ui_state, interface_state
|
|
||||||
|
|
||||||
from contact.utilities.utils import add_new_message
|
|
||||||
|
|
||||||
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
|
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
|
||||||
|
|
||||||
@@ -33,12 +31,12 @@ def onAckNak(packet: Dict[str, Any]) -> None:
|
|||||||
return
|
return
|
||||||
|
|
||||||
acknak = ack_naks.pop(request)
|
acknak = ack_naks.pop(request)
|
||||||
message = ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
|
message = globals.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
|
||||||
|
|
||||||
confirm_string = " "
|
confirm_string = " "
|
||||||
ack_type = None
|
ack_type = None
|
||||||
if packet["decoded"]["routing"]["errorReason"] == "NONE":
|
if packet["decoded"]["routing"]["errorReason"] == "NONE":
|
||||||
if packet["from"] == interface_state.myNodeNum: # Ack "from" ourself means implicit ACK
|
if packet["from"] == globals.myNodeNum: # Ack "from" ourself means implicit ACK
|
||||||
confirm_string = config.ack_implicit_str
|
confirm_string = config.ack_implicit_str
|
||||||
ack_type = "Implicit"
|
ack_type = "Implicit"
|
||||||
else:
|
else:
|
||||||
@@ -48,15 +46,15 @@ def onAckNak(packet: Dict[str, Any]) -> None:
|
|||||||
confirm_string = config.nak_str
|
confirm_string = config.nak_str
|
||||||
ack_type = "Nak"
|
ack_type = "Nak"
|
||||||
|
|
||||||
ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
|
globals.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
|
||||||
config.sent_message_prefix + confirm_string + ": ",
|
config.sent_message_prefix + confirm_string + ": ",
|
||||||
message,
|
message,
|
||||||
)
|
)
|
||||||
|
|
||||||
update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type)
|
update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type)
|
||||||
|
|
||||||
channel_number = ui_state.channel_list.index(acknak["channel"])
|
channel_number = globals.channel_list.index(acknak["channel"])
|
||||||
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
|
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
|
||||||
draw_messages_window()
|
draw_messages_window()
|
||||||
|
|
||||||
|
|
||||||
@@ -139,17 +137,16 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
|
|||||||
|
|
||||||
msg_str += route_str + "\n" # Print the route back to us
|
msg_str += route_str + "\n" # Print the route back to us
|
||||||
|
|
||||||
if packet["from"] not in ui_state.channel_list:
|
if packet["from"] not in globals.channel_list:
|
||||||
ui_state.channel_list.append(packet["from"])
|
globals.channel_list.append(packet["from"])
|
||||||
refresh_channels = True
|
refresh_channels = True
|
||||||
|
|
||||||
if is_chat_archived(packet["from"]):
|
if is_chat_archived(packet["from"]):
|
||||||
update_node_info_in_db(packet["from"], chat_archived=False)
|
update_node_info_in_db(packet["from"], chat_archived=False)
|
||||||
|
|
||||||
channel_number = ui_state.channel_list.index(packet["from"])
|
channel_number = globals.channel_list.index(packet["from"])
|
||||||
channel_id = ui_state.channel_list[channel_number]
|
|
||||||
|
|
||||||
if channel_id == ui_state.channel_list[ui_state.selected_channel]:
|
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
|
||||||
refresh_messages = True
|
refresh_messages = True
|
||||||
else:
|
else:
|
||||||
add_notification(channel_number)
|
add_notification(channel_number)
|
||||||
@@ -157,29 +154,33 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
|
|||||||
|
|
||||||
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
|
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
|
||||||
|
|
||||||
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str)
|
if globals.channel_list[channel_number] not in globals.all_messages:
|
||||||
|
globals.all_messages[globals.channel_list[channel_number]] = []
|
||||||
|
globals.all_messages[globals.channel_list[channel_number]].append(
|
||||||
|
(f"{config.message_prefix} {message_from_string}", msg_str)
|
||||||
|
)
|
||||||
|
|
||||||
if refresh_channels:
|
if refresh_channels:
|
||||||
draw_channel_list()
|
draw_channel_list()
|
||||||
if refresh_messages:
|
if refresh_messages:
|
||||||
draw_messages_window(True)
|
draw_messages_window(True)
|
||||||
|
save_message_to_db(globals.channel_list[channel_number], packet["from"], msg_str)
|
||||||
|
|
||||||
save_message_to_db(channel_id, packet["from"], msg_str)
|
|
||||||
|
|
||||||
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
|
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
|
||||||
"""
|
"""
|
||||||
Sends a chat message using the selected channel.
|
Sends a chat message using the selected channel.
|
||||||
"""
|
"""
|
||||||
myid = interface_state.myNodeNum
|
myid = globals.myNodeNum
|
||||||
send_on_channel = 0
|
send_on_channel = 0
|
||||||
channel_id = ui_state.channel_list[channel]
|
channel_id = globals.channel_list[channel]
|
||||||
if isinstance(channel_id, int):
|
if isinstance(channel_id, int):
|
||||||
send_on_channel = 0
|
send_on_channel = 0
|
||||||
destination = channel_id
|
destination = channel_id
|
||||||
elif isinstance(channel_id, str):
|
elif isinstance(channel_id, str):
|
||||||
send_on_channel = channel
|
send_on_channel = channel
|
||||||
|
|
||||||
sent_message_data = interface_state.interface.sendText(
|
sent_message_data = globals.interface.sendText(
|
||||||
text=message,
|
text=message,
|
||||||
destinationId=destination,
|
destinationId=destination,
|
||||||
wantAck=True,
|
wantAck=True,
|
||||||
@@ -188,13 +189,38 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
|
|||||||
channelIndex=send_on_channel,
|
channelIndex=send_on_channel,
|
||||||
)
|
)
|
||||||
|
|
||||||
add_new_message(channel_id, config.sent_message_prefix + config.ack_unknown_str + ": ", message)
|
# Add sent message to the messages dictionary
|
||||||
|
if channel_id not in globals.all_messages:
|
||||||
|
globals.all_messages[channel_id] = []
|
||||||
|
|
||||||
|
# Handle timestamp logic
|
||||||
|
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
|
||||||
|
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
|
||||||
|
|
||||||
|
# Retrieve the last timestamp if available
|
||||||
|
channel_messages = globals.all_messages[channel_id]
|
||||||
|
if channel_messages:
|
||||||
|
# Check the last entry for a timestamp
|
||||||
|
for entry in reversed(channel_messages):
|
||||||
|
if entry[0].startswith("--"):
|
||||||
|
last_hour = entry[0].strip("- ").strip()
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
last_hour = None
|
||||||
|
else:
|
||||||
|
last_hour = None
|
||||||
|
|
||||||
|
# Add a new timestamp if it's a new hour
|
||||||
|
if last_hour != current_hour:
|
||||||
|
globals.all_messages[channel_id].append((f"-- {current_hour} --", ""))
|
||||||
|
|
||||||
|
globals.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
|
||||||
|
|
||||||
timestamp = save_message_to_db(channel_id, myid, message)
|
timestamp = save_message_to_db(channel_id, myid, message)
|
||||||
|
|
||||||
ack_naks[sent_message_data.id] = {
|
ack_naks[sent_message_data.id] = {
|
||||||
"channel": channel_id,
|
"channel": channel_id,
|
||||||
"messageIndex": len(ui_state.all_messages[channel_id]) - 1,
|
"messageIndex": len(globals.all_messages[channel_id]) - 1,
|
||||||
"timestamp": timestamp,
|
"timestamp": timestamp,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -203,14 +229,10 @@ def send_traceroute() -> None:
|
|||||||
"""
|
"""
|
||||||
Sends a RouteDiscovery protobuf to the selected node.
|
Sends a RouteDiscovery protobuf to the selected node.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
channel_id = ui_state.node_list[ui_state.selected_node]
|
|
||||||
add_new_message(channel_id, f"{config.message_prefix} Sent Traceroute", "")
|
|
||||||
|
|
||||||
r = mesh_pb2.RouteDiscovery()
|
r = mesh_pb2.RouteDiscovery()
|
||||||
interface_state.interface.sendData(
|
globals.interface.sendData(
|
||||||
r,
|
r,
|
||||||
destinationId=channel_id,
|
destinationId=globals.node_list[globals.selected_node],
|
||||||
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
|
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
|
||||||
wantResponse=True,
|
wantResponse=True,
|
||||||
onResponse=on_response_traceroute,
|
onResponse=on_response_traceroute,
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -123,18 +123,15 @@ def initialize_config() -> Dict[str, object]:
|
|||||||
"settings_breadcrumbs": ["green", "black"],
|
"settings_breadcrumbs": ["green", "black"],
|
||||||
"settings_warning": ["green", "black"],
|
"settings_warning": ["green", "black"],
|
||||||
"settings_note": ["green", "black"],
|
"settings_note": ["green", "black"],
|
||||||
"node_favorite": ["cyan", "green"],
|
"node_favorite": ["cyan", "white"],
|
||||||
"node_ignored": ["red", "black"],
|
"node_ignored": ["red", "white"],
|
||||||
}
|
}
|
||||||
default_config_variables = {
|
default_config_variables = {
|
||||||
"channel_list_16ths": "3",
|
|
||||||
"node_list_16ths": "5",
|
|
||||||
"db_file_path": db_file_path,
|
"db_file_path": db_file_path,
|
||||||
"log_file_path": log_file_path,
|
"log_file_path": log_file_path,
|
||||||
"message_prefix": ">>",
|
"message_prefix": ">>",
|
||||||
"sent_message_prefix": ">> Sent",
|
"sent_message_prefix": ">> Sent",
|
||||||
"notification_symbol": "*",
|
"notification_symbol": "*",
|
||||||
"notification_sound": "True",
|
|
||||||
"ack_implicit_str": "[◌]",
|
"ack_implicit_str": "[◌]",
|
||||||
"ack_str": "[✓]",
|
"ack_str": "[✓]",
|
||||||
"nak_str": "[x]",
|
"nak_str": "[x]",
|
||||||
@@ -173,23 +170,18 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
|
|||||||
|
|
||||||
global db_file_path, log_file_path, message_prefix, sent_message_prefix
|
global db_file_path, log_file_path, message_prefix, sent_message_prefix
|
||||||
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
|
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
|
||||||
global node_list_16ths, channel_list_16ths
|
|
||||||
global theme, COLOR_CONFIG
|
global theme, COLOR_CONFIG
|
||||||
global node_sort, notification_sound
|
global node_sort
|
||||||
|
|
||||||
channel_list_16ths = loaded_config["channel_list_16ths"]
|
|
||||||
node_list_16ths = loaded_config["node_list_16ths"]
|
|
||||||
db_file_path = loaded_config["db_file_path"]
|
db_file_path = loaded_config["db_file_path"]
|
||||||
log_file_path = loaded_config["log_file_path"]
|
log_file_path = loaded_config["log_file_path"]
|
||||||
message_prefix = loaded_config["message_prefix"]
|
message_prefix = loaded_config["message_prefix"]
|
||||||
sent_message_prefix = loaded_config["sent_message_prefix"]
|
sent_message_prefix = loaded_config["sent_message_prefix"]
|
||||||
notification_symbol = loaded_config["notification_symbol"]
|
notification_symbol = loaded_config["notification_symbol"]
|
||||||
notification_sound = loaded_config["notification_sound"]
|
|
||||||
ack_implicit_str = loaded_config["ack_implicit_str"]
|
ack_implicit_str = loaded_config["ack_implicit_str"]
|
||||||
ack_str = loaded_config["ack_str"]
|
ack_str = loaded_config["ack_str"]
|
||||||
nak_str = loaded_config["nak_str"]
|
nak_str = loaded_config["nak_str"]
|
||||||
ack_unknown_str = loaded_config["ack_unknown_str"]
|
ack_unknown_str = loaded_config["ack_unknown_str"]
|
||||||
node_sort = loaded_config["node_sort"]
|
|
||||||
theme = loaded_config["theme"]
|
theme = loaded_config["theme"]
|
||||||
if theme == "dark":
|
if theme == "dark":
|
||||||
COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"]
|
COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"]
|
||||||
@@ -197,6 +189,7 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
|
|||||||
COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"]
|
COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"]
|
||||||
elif theme == "green":
|
elif theme == "green":
|
||||||
COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"]
|
COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"]
|
||||||
|
node_sort = loaded_config["node_sort"]
|
||||||
|
|
||||||
|
|
||||||
# Call the function when the script is imported
|
# Call the function when the script is imported
|
||||||
|
|||||||
@@ -1,22 +1,8 @@
|
|||||||
import curses
|
import curses
|
||||||
import re
|
import re
|
||||||
from unicodedata import east_asian_width
|
|
||||||
|
|
||||||
from contact.ui.colors import get_color
|
from contact.ui.colors import get_color
|
||||||
from contact.utilities.control_utils import transform_menu_path
|
from contact.utilities.control_utils import transform_menu_path
|
||||||
from typing import Any, Optional, List, Dict
|
from typing import Any, Optional, List, Dict
|
||||||
from contact.utilities.singleton import interface_state, ui_state
|
|
||||||
|
|
||||||
|
|
||||||
def get_node_color(node_index: int, reverse: bool = False):
|
|
||||||
node_num = ui_state.node_list[node_index]
|
|
||||||
node = interface_state.interface.nodesByNum.get(node_num, {})
|
|
||||||
if node.get("isFavorite"):
|
|
||||||
return get_color("node_favorite", reverse=reverse)
|
|
||||||
elif node.get("isIgnored"):
|
|
||||||
return get_color("node_ignored", reverse=reverse)
|
|
||||||
return get_color("settings_default", reverse=reverse)
|
|
||||||
|
|
||||||
|
|
||||||
# Aliases
|
# Aliases
|
||||||
Segment = tuple[str, str, bool, bool]
|
Segment = tuple[str, str, bool, bool]
|
||||||
@@ -142,6 +128,7 @@ def draw_arrows(
|
|||||||
win: object, visible_height: int, max_index: int, start_index: List[int], show_save_option: bool
|
win: object, visible_height: int, max_index: int, start_index: List[int], show_save_option: bool
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
|
# vh = visible_height + (1 if show_save_option else 0)
|
||||||
mi = max_index - (2 if show_save_option else 0)
|
mi = max_index - (2 if show_save_option else 0)
|
||||||
|
|
||||||
if visible_height < mi:
|
if visible_height < mi:
|
||||||
@@ -295,16 +282,9 @@ def get_wrapped_help_text(
|
|||||||
|
|
||||||
return wrapped_help
|
return wrapped_help
|
||||||
|
|
||||||
def text_width(text: str) -> int:
|
|
||||||
return sum(2 if east_asian_width(c) in "FW" else 1 for c in text)
|
|
||||||
|
|
||||||
def wrap_text(text: str, wrap_width: int) -> List[str]:
|
def wrap_text(text: str, wrap_width: int) -> List[str]:
|
||||||
"""Wraps text while preserving spaces and breaking long words."""
|
"""Wraps text while preserving spaces and breaking long words."""
|
||||||
|
|
||||||
whitespace = '\t\n\x0b\x0c\r '
|
|
||||||
whitespace_trans = dict.fromkeys(map(ord, whitespace), ord(' '))
|
|
||||||
text = text.translate(whitespace_trans)
|
|
||||||
|
|
||||||
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
|
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
|
||||||
wrapped_lines = []
|
wrapped_lines = []
|
||||||
line_buffer = ""
|
line_buffer = ""
|
||||||
@@ -313,11 +293,11 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
|
|||||||
wrap_width -= margin
|
wrap_width -= margin
|
||||||
|
|
||||||
for word in words:
|
for word in words:
|
||||||
word_length = text_width(word)
|
word_length = len(word)
|
||||||
|
|
||||||
if word_length > wrap_width: # Break long words
|
if word_length > wrap_width: # Break long words
|
||||||
if line_buffer:
|
if line_buffer:
|
||||||
wrapped_lines.append(line_buffer.strip())
|
wrapped_lines.append(line_buffer)
|
||||||
line_buffer = ""
|
line_buffer = ""
|
||||||
line_length = 0
|
line_length = 0
|
||||||
for i in range(0, word_length, wrap_width):
|
for i in range(0, word_length, wrap_width):
|
||||||
@@ -325,7 +305,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
if line_length + word_length > wrap_width and word.strip():
|
if line_length + word_length > wrap_width and word.strip():
|
||||||
wrapped_lines.append(line_buffer.strip())
|
wrapped_lines.append(line_buffer)
|
||||||
line_buffer = ""
|
line_buffer = ""
|
||||||
line_length = 0
|
line_length = 0
|
||||||
|
|
||||||
@@ -333,94 +313,6 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
|
|||||||
line_length += word_length
|
line_length += word_length
|
||||||
|
|
||||||
if line_buffer:
|
if line_buffer:
|
||||||
wrapped_lines.append(line_buffer.strip())
|
wrapped_lines.append(line_buffer)
|
||||||
|
|
||||||
return wrapped_lines
|
return wrapped_lines
|
||||||
|
|
||||||
|
|
||||||
def move_main_highlight(
|
|
||||||
old_idx: int, new_idx, options: List[str], menu_win: curses.window, menu_pad: curses.window, ui_state: object
|
|
||||||
) -> None:
|
|
||||||
|
|
||||||
if old_idx == new_idx: # No-op
|
|
||||||
return
|
|
||||||
|
|
||||||
max_index = len(options) - 1
|
|
||||||
visible_height = menu_win.getmaxyx()[0] - 2
|
|
||||||
|
|
||||||
if new_idx < ui_state.start_index[ui_state.current_window]: # Moving above the visible area
|
|
||||||
ui_state.start_index[ui_state.current_window] = new_idx
|
|
||||||
elif new_idx >= ui_state.start_index[ui_state.current_window] + visible_height: # Moving below the visible area
|
|
||||||
ui_state.start_index[ui_state.current_window] = new_idx - visible_height + 1
|
|
||||||
|
|
||||||
# Ensure start_index is within bounds
|
|
||||||
ui_state.start_index[ui_state.current_window] = max(
|
|
||||||
0, min(ui_state.start_index[ui_state.current_window], max_index - visible_height + 1)
|
|
||||||
)
|
|
||||||
|
|
||||||
highlight_line(menu_win, menu_pad, old_idx, new_idx, visible_height)
|
|
||||||
|
|
||||||
if ui_state.current_window == 0: # hack to fix max_index
|
|
||||||
max_index += 1
|
|
||||||
|
|
||||||
draw_main_arrows(menu_win, max_index, window=ui_state.current_window)
|
|
||||||
menu_win.refresh()
|
|
||||||
|
|
||||||
|
|
||||||
def highlight_line(
|
|
||||||
menu_win: curses.window, menu_pad: curses.window, old_idx: int, new_idx: int, visible_height: int
|
|
||||||
) -> None:
|
|
||||||
|
|
||||||
if ui_state.current_window == 0:
|
|
||||||
color_old = (
|
|
||||||
get_color("channel_selected") if old_idx == ui_state.selected_channel else get_color("channel_list")
|
|
||||||
)
|
|
||||||
color_new = get_color("channel_list", reverse=True) if True else get_color("channel_list", reverse=True)
|
|
||||||
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, color_old)
|
|
||||||
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, color_new)
|
|
||||||
|
|
||||||
elif ui_state.current_window == 2:
|
|
||||||
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(old_idx))
|
|
||||||
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(new_idx, reverse=True))
|
|
||||||
|
|
||||||
menu_win.refresh()
|
|
||||||
|
|
||||||
# Refresh pad only if scrolling is needed
|
|
||||||
menu_pad.refresh(
|
|
||||||
ui_state.start_index[ui_state.current_window],
|
|
||||||
0,
|
|
||||||
menu_win.getbegyx()[0] + 1,
|
|
||||||
menu_win.getbegyx()[1] + 1,
|
|
||||||
menu_win.getbegyx()[0] + visible_height,
|
|
||||||
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 3,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def draw_main_arrows(win: object, max_index: int, window: int, **kwargs) -> None:
|
|
||||||
|
|
||||||
height, width = win.getmaxyx()
|
|
||||||
usable_height = height - 2
|
|
||||||
usable_width = width - 2
|
|
||||||
|
|
||||||
if window == 1 and ui_state.display_log:
|
|
||||||
if log_height := kwargs.get("log_height"):
|
|
||||||
usable_height -= log_height - 1
|
|
||||||
|
|
||||||
if usable_height < max_index:
|
|
||||||
if ui_state.start_index[window] > 0:
|
|
||||||
win.addstr(1, usable_width, "▲", get_color("settings_default"))
|
|
||||||
else:
|
|
||||||
win.addstr(1, usable_width, " ", get_color("settings_default"))
|
|
||||||
|
|
||||||
if max_index - ui_state.start_index[window] - 1 >= usable_height:
|
|
||||||
win.addstr(usable_height, usable_width, "▼", get_color("settings_default"))
|
|
||||||
else:
|
|
||||||
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
|
|
||||||
else:
|
|
||||||
win.addstr(1, usable_width, " ", get_color("settings_default"))
|
|
||||||
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
|
|
||||||
|
|
||||||
|
|
||||||
def get_msg_window_lines(messages_win, packetlog_win) -> None:
|
|
||||||
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if ui_state.display_log else 0
|
|
||||||
return messages_win.getmaxyx()[0] - 2 - packetlog_height
|
|
||||||
|
|||||||
@@ -1,42 +1,11 @@
|
|||||||
from typing import Any, Union, List, Dict
|
from typing import Any, Union, List, Dict
|
||||||
from dataclasses import dataclass, field
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class MenuState:
|
class MenuState:
|
||||||
menu_index: List[int] = field(default_factory=list)
|
def __init__(self):
|
||||||
start_index: List[int] = field(default_factory=lambda: [0])
|
self.menu_index: List[int] = [] # Row we left the previous menus
|
||||||
selected_index: int = 0
|
self.start_index: List[int] = [0] # Row to start the menu if it doesn't all fit
|
||||||
current_menu: Union[Dict[str, Any], List[Any], str, int] = field(default_factory=dict)
|
self.selected_index: int = 0 # Selected Row
|
||||||
menu_path: List[str] = field(default_factory=list)
|
self.current_menu: Union[Dict[str, Any], List[Any], str, int] = {} # Contents of the current menu
|
||||||
show_save_option: bool = False
|
self.menu_path: List[str] = [] # Menu Path
|
||||||
|
self.show_save_option: bool = False # Display 'Save'
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class ChatUIState:
|
|
||||||
display_log: bool = False
|
|
||||||
channel_list: List[str] = field(default_factory=list)
|
|
||||||
all_messages: Dict[str, List[str]] = field(default_factory=dict)
|
|
||||||
notifications: List[str] = field(default_factory=list)
|
|
||||||
packet_buffer: List[str] = field(default_factory=list)
|
|
||||||
node_list: List[str] = field(default_factory=list)
|
|
||||||
selected_channel: int = 0
|
|
||||||
selected_message: int = 0
|
|
||||||
selected_node: int = 0
|
|
||||||
current_window: int = 0
|
|
||||||
|
|
||||||
selected_index: int = 0
|
|
||||||
start_index: List[int] = field(default_factory=lambda: [0, 0, 0])
|
|
||||||
show_save_option: bool = False
|
|
||||||
menu_path: List[str] = field(default_factory=list)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class InterfaceState:
|
|
||||||
interface: Any = None
|
|
||||||
myNodeNum: int = 0
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class AppState:
|
|
||||||
lock: Any = None
|
|
||||||
|
|||||||
@@ -55,15 +55,10 @@ def edit_value(key: str, current_value: str) -> str:
|
|||||||
# Load theme names dynamically from the JSON
|
# Load theme names dynamically from the JSON
|
||||||
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
|
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
|
||||||
return get_list_input("Select Theme", current_value, theme_options)
|
return get_list_input("Select Theme", current_value, theme_options)
|
||||||
|
|
||||||
elif key == "node_sort":
|
elif key == "node_sort":
|
||||||
sort_options = ["lastHeard", "name", "hops"]
|
sort_options = ["lastHeard", "name", "hops"]
|
||||||
return get_list_input("Sort By", current_value, sort_options)
|
return get_list_input("Sort By", current_value, sort_options)
|
||||||
|
|
||||||
elif key == "notification_sound":
|
|
||||||
sound_options = ["True", "False"]
|
|
||||||
return get_list_input("Notification Sound", current_value, sound_options)
|
|
||||||
|
|
||||||
# Standard Input Mode (Scrollable)
|
# Standard Input Mode (Scrollable)
|
||||||
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
|
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
|
||||||
curses.curs_set(1)
|
curses.curs_set(1)
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
import yaml
|
import yaml
|
||||||
import logging
|
import logging
|
||||||
import time
|
|
||||||
from typing import List
|
from typing import List
|
||||||
from google.protobuf.json_format import MessageToDict
|
from google.protobuf.json_format import MessageToDict
|
||||||
from meshtastic import mt_config
|
from meshtastic import mt_config
|
||||||
@@ -134,29 +133,24 @@ def config_import(interface, filename):
|
|||||||
logging.info(f"Setting device owner to {configuration['owner']}")
|
logging.info(f"Setting device owner to {configuration['owner']}")
|
||||||
waitForAckNak = True
|
waitForAckNak = True
|
||||||
interface.getNode("^local", False).setOwner(configuration["owner"])
|
interface.getNode("^local", False).setOwner(configuration["owner"])
|
||||||
time.sleep(0.5)
|
|
||||||
|
|
||||||
if "owner_short" in configuration:
|
if "owner_short" in configuration:
|
||||||
logging.info(f"Setting device owner short to {configuration['owner_short']}")
|
logging.info(f"Setting device owner short to {configuration['owner_short']}")
|
||||||
waitForAckNak = True
|
waitForAckNak = True
|
||||||
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"])
|
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"])
|
||||||
time.sleep(0.5)
|
|
||||||
|
|
||||||
if "ownerShort" in configuration:
|
if "ownerShort" in configuration:
|
||||||
logging.info(f"Setting device owner short to {configuration['ownerShort']}")
|
logging.info(f"Setting device owner short to {configuration['ownerShort']}")
|
||||||
waitForAckNak = True
|
waitForAckNak = True
|
||||||
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"])
|
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"])
|
||||||
time.sleep(0.5)
|
|
||||||
|
|
||||||
if "channel_url" in configuration:
|
if "channel_url" in configuration:
|
||||||
logging.info(f"Setting channel url to {configuration['channel_url']}")
|
logging.info(f"Setting channel url to {configuration['channel_url']}")
|
||||||
interface.getNode("^local").setURL(configuration["channel_url"])
|
interface.getNode("^local").setURL(configuration["channel_url"])
|
||||||
time.sleep(0.5)
|
|
||||||
|
|
||||||
if "channelUrl" in configuration:
|
if "channelUrl" in configuration:
|
||||||
logging.info(f"Setting channel url to {configuration['channelUrl']}")
|
logging.info(f"Setting channel url to {configuration['channelUrl']}")
|
||||||
interface.getNode("^local").setURL(configuration["channelUrl"])
|
interface.getNode("^local").setURL(configuration["channelUrl"])
|
||||||
time.sleep(0.5)
|
|
||||||
|
|
||||||
if "location" in configuration:
|
if "location" in configuration:
|
||||||
alt = 0
|
alt = 0
|
||||||
@@ -175,14 +169,12 @@ def config_import(interface, filename):
|
|||||||
logging.info(f"Fixing longitude at {lon} degrees")
|
logging.info(f"Fixing longitude at {lon} degrees")
|
||||||
logging.info("Setting device position")
|
logging.info("Setting device position")
|
||||||
interface.localNode.setFixedPosition(lat, lon, alt)
|
interface.localNode.setFixedPosition(lat, lon, alt)
|
||||||
time.sleep(0.5)
|
|
||||||
|
|
||||||
if "config" in configuration:
|
if "config" in configuration:
|
||||||
localConfig = interface.getNode("^local").localConfig
|
localConfig = interface.getNode("^local").localConfig
|
||||||
for section in configuration["config"]:
|
for section in configuration["config"]:
|
||||||
traverseConfig(section, configuration["config"][section], localConfig)
|
traverseConfig(section, configuration["config"][section], localConfig)
|
||||||
interface.getNode("^local").writeConfig(camel_to_snake(section))
|
interface.getNode("^local").writeConfig(camel_to_snake(section))
|
||||||
time.sleep(0.5)
|
|
||||||
|
|
||||||
if "module_config" in configuration:
|
if "module_config" in configuration:
|
||||||
moduleConfig = interface.getNode("^local").moduleConfig
|
moduleConfig = interface.getNode("^local").moduleConfig
|
||||||
@@ -193,7 +185,6 @@ def config_import(interface, filename):
|
|||||||
moduleConfig,
|
moduleConfig,
|
||||||
)
|
)
|
||||||
interface.getNode("^local").writeConfig(camel_to_snake(section))
|
interface.getNode("^local").writeConfig(camel_to_snake(section))
|
||||||
time.sleep(0.5)
|
|
||||||
|
|
||||||
interface.getNode("^local", False).commitSettingsTransaction()
|
interface.getNode("^local", False).commitSettingsTransaction()
|
||||||
logging.info("Writing modified configuration to device")
|
logging.info("Writing modified configuration to device")
|
||||||
|
|||||||
@@ -6,14 +6,12 @@ from typing import Optional, Union, Dict
|
|||||||
|
|
||||||
from contact.utilities.utils import decimal_to_hex
|
from contact.utilities.utils import decimal_to_hex
|
||||||
import contact.ui.default_config as config
|
import contact.ui.default_config as config
|
||||||
|
import contact.globals as globals
|
||||||
|
|
||||||
from contact.utilities.singleton import ui_state, interface_state
|
|
||||||
|
|
||||||
|
|
||||||
def get_table_name(channel: str) -> str:
|
def get_table_name(channel: str) -> str:
|
||||||
# Construct the table name
|
# Construct the table name
|
||||||
table_name = f"{str(interface_state.myNodeNum)}_{channel}_messages"
|
table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
|
||||||
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
|
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
|
||||||
return quoted_table_name
|
return quoted_table_name
|
||||||
|
|
||||||
@@ -63,7 +61,7 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
|
|||||||
message_text = ?
|
message_text = ?
|
||||||
"""
|
"""
|
||||||
|
|
||||||
db_cursor.execute(update_query, (ack, str(interface_state.myNodeNum), timestamp, message))
|
db_cursor.execute(update_query, (ack, str(globals.myNodeNum), timestamp, message))
|
||||||
db_connection.commit()
|
db_connection.commit()
|
||||||
|
|
||||||
except sqlite3.Error as e:
|
except sqlite3.Error as e:
|
||||||
@@ -74,13 +72,13 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
|
|||||||
|
|
||||||
|
|
||||||
def load_messages_from_db() -> None:
|
def load_messages_from_db() -> None:
|
||||||
"""Load messages from the database for all channels and update ui_state.all_messages and ui_state.channel_list."""
|
"""Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
|
||||||
try:
|
try:
|
||||||
with sqlite3.connect(config.db_file_path) as db_connection:
|
with sqlite3.connect(config.db_file_path) as db_connection:
|
||||||
db_cursor = db_connection.cursor()
|
db_cursor = db_connection.cursor()
|
||||||
|
|
||||||
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
|
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
|
||||||
db_cursor.execute(query, (f"{str(interface_state.myNodeNum)}_%_messages",))
|
db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",))
|
||||||
tables = [row[0] for row in db_cursor.fetchall()]
|
tables = [row[0] for row in db_cursor.fetchall()]
|
||||||
|
|
||||||
# Iterate through each table and fetch its messages
|
# Iterate through each table and fetch its messages
|
||||||
@@ -106,15 +104,15 @@ def load_messages_from_db() -> None:
|
|||||||
# Convert the channel to an integer if it's numeric, otherwise keep it as a string (nodenum vs channel name)
|
# Convert the channel to an integer if it's numeric, otherwise keep it as a string (nodenum vs channel name)
|
||||||
channel = int(channel) if channel.isdigit() else channel
|
channel = int(channel) if channel.isdigit() else channel
|
||||||
|
|
||||||
# Add the channel to ui_state.channel_list if not already present
|
# Add the channel to globals.channel_list if not already present
|
||||||
if channel not in ui_state.channel_list and not is_chat_archived(channel):
|
if channel not in globals.channel_list and not is_chat_archived(channel):
|
||||||
ui_state.channel_list.append(channel)
|
globals.channel_list.append(channel)
|
||||||
|
|
||||||
# Ensure the channel exists in ui_state.all_messages
|
# Ensure the channel exists in globals.all_messages
|
||||||
if channel not in ui_state.all_messages:
|
if channel not in globals.all_messages:
|
||||||
ui_state.all_messages[channel] = []
|
globals.all_messages[channel] = []
|
||||||
|
|
||||||
# Add messages to ui_state.all_messages grouped by hourly timestamp
|
# Add messages to globals.all_messages grouped by hourly timestamp
|
||||||
hourly_messages = {}
|
hourly_messages = {}
|
||||||
for user_id, message, timestamp, ack_type in db_messages:
|
for user_id, message, timestamp, ack_type in db_messages:
|
||||||
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
|
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
|
||||||
@@ -129,7 +127,7 @@ def load_messages_from_db() -> None:
|
|||||||
elif ack_type == "Nak":
|
elif ack_type == "Nak":
|
||||||
ack_str = config.nak_str
|
ack_str = config.nak_str
|
||||||
|
|
||||||
if user_id == str(interface_state.myNodeNum):
|
if user_id == str(globals.myNodeNum):
|
||||||
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
|
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
|
||||||
else:
|
else:
|
||||||
formatted_message = (
|
formatted_message = (
|
||||||
@@ -139,10 +137,10 @@ def load_messages_from_db() -> None:
|
|||||||
|
|
||||||
hourly_messages[hour].append(formatted_message)
|
hourly_messages[hour].append(formatted_message)
|
||||||
|
|
||||||
# Flatten the hourly messages into ui_state.all_messages[channel]
|
# Flatten the hourly messages into globals.all_messages[channel]
|
||||||
for hour, messages in sorted(hourly_messages.items()):
|
for hour, messages in sorted(hourly_messages.items()):
|
||||||
ui_state.all_messages[channel].append((f"-- {hour} --", ""))
|
globals.all_messages[channel].append((f"-- {hour} --", ""))
|
||||||
ui_state.all_messages[channel].extend(messages)
|
globals.all_messages[channel].extend(messages)
|
||||||
|
|
||||||
except sqlite3.Error as e:
|
except sqlite3.Error as e:
|
||||||
logging.error(f"SQLite error while loading messages from table '{table_name}': {e}")
|
logging.error(f"SQLite error while loading messages from table '{table_name}': {e}")
|
||||||
@@ -155,11 +153,11 @@ def init_nodedb() -> None:
|
|||||||
"""Initialize the node database and update it with nodes from the interface."""
|
"""Initialize the node database and update it with nodes from the interface."""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if not interface_state.interface.nodes:
|
if not globals.interface.nodes:
|
||||||
return # No nodes to initialize
|
return # No nodes to initialize
|
||||||
|
|
||||||
ensure_node_table_exists() # Ensure the table exists before insertion
|
ensure_node_table_exists() # Ensure the table exists before insertion
|
||||||
nodes_snapshot = list(interface_state.interface.nodes.values())
|
nodes_snapshot = list(globals.interface.nodes.values())
|
||||||
|
|
||||||
# Insert or update all nodes
|
# Insert or update all nodes
|
||||||
for node in nodes_snapshot:
|
for node in nodes_snapshot:
|
||||||
@@ -216,7 +214,7 @@ def update_node_info_in_db(
|
|||||||
|
|
||||||
with sqlite3.connect(config.db_file_path) as db_connection:
|
with sqlite3.connect(config.db_file_path) as db_connection:
|
||||||
db_cursor = db_connection.cursor()
|
db_cursor = db_connection.cursor()
|
||||||
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote in case of numeric names
|
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
|
||||||
|
|
||||||
table_columns = [i[1] for i in db_cursor.execute(f"PRAGMA table_info({table_name})")]
|
table_columns = [i[1] for i in db_cursor.execute(f"PRAGMA table_info({table_name})")]
|
||||||
if "chat_archived" not in table_columns:
|
if "chat_archived" not in table_columns:
|
||||||
@@ -280,7 +278,7 @@ def update_node_info_in_db(
|
|||||||
|
|
||||||
def ensure_node_table_exists() -> None:
|
def ensure_node_table_exists() -> None:
|
||||||
"""Ensure the node database table exists."""
|
"""Ensure the node database table exists."""
|
||||||
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote for safety
|
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety
|
||||||
schema = """
|
schema = """
|
||||||
user_id TEXT PRIMARY KEY,
|
user_id TEXT PRIMARY KEY,
|
||||||
long_name TEXT,
|
long_name TEXT,
|
||||||
@@ -321,7 +319,7 @@ def get_name_from_database(user_id: int, type: str = "long") -> str:
|
|||||||
db_cursor = db_connection.cursor()
|
db_cursor = db_connection.cursor()
|
||||||
|
|
||||||
# Construct table name
|
# Construct table name
|
||||||
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
|
table_name = f"{str(globals.myNodeNum)}_nodedb"
|
||||||
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
|
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
|
||||||
|
|
||||||
# Determine the correct column to fetch
|
# Determine the correct column to fetch
|
||||||
@@ -347,7 +345,7 @@ def is_chat_archived(user_id: int) -> int:
|
|||||||
try:
|
try:
|
||||||
with sqlite3.connect(config.db_file_path) as db_connection:
|
with sqlite3.connect(config.db_file_path) as db_connection:
|
||||||
db_cursor = db_connection.cursor()
|
db_cursor = db_connection.cursor()
|
||||||
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
|
table_name = f"{str(globals.myNodeNum)}_nodedb"
|
||||||
nodeinfo_table = f'"{table_name}"'
|
nodeinfo_table = f'"{table_name}"'
|
||||||
query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?"
|
query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?"
|
||||||
db_cursor.execute(query, (user_id,))
|
db_cursor.execute(query, (user_id,))
|
||||||
|
|||||||
@@ -104,13 +104,11 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
|||||||
return [base64.b64encode(b).decode() for b in byte_strings]
|
return [base64.b64encode(b).decode() for b in byte_strings]
|
||||||
|
|
||||||
def is_valid_base64(s):
|
def is_valid_base64(s):
|
||||||
"""Check if a string is valid Base64 or blank."""
|
"""Check if a string is valid Base64."""
|
||||||
if s == "":
|
|
||||||
return True
|
|
||||||
try:
|
try:
|
||||||
decoded = base64.b64decode(s, validate=True)
|
decoded = base64.b64decode(s, validate=True)
|
||||||
return len(decoded) == 32 # Ensure it's exactly 32 bytes
|
return len(decoded) == 32 # Ensure it's exactly 32 bytes
|
||||||
except (binascii.Error, ValueError):
|
except binascii.Error:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
cvalue = to_base64(current_value) # Convert current values to Base64
|
cvalue = to_base64(current_value) # Convert current values to Base64
|
||||||
|
|||||||
@@ -1,5 +0,0 @@
|
|||||||
from contact.ui.ui_state import ChatUIState, InterfaceState, AppState
|
|
||||||
|
|
||||||
ui_state = ChatUIState()
|
|
||||||
interface_state = InterfaceState()
|
|
||||||
app_state = AppState()
|
|
||||||
@@ -1,18 +1,16 @@
|
|||||||
|
import contact.globals as globals
|
||||||
import datetime
|
import datetime
|
||||||
import time
|
|
||||||
from meshtastic.protobuf import config_pb2
|
from meshtastic.protobuf import config_pb2
|
||||||
import contact.ui.default_config as config
|
import contact.ui.default_config as config
|
||||||
|
|
||||||
from contact.utilities.singleton import ui_state, interface_state
|
|
||||||
|
|
||||||
|
|
||||||
def get_channels():
|
def get_channels():
|
||||||
"""Retrieve channels from the node and update ui_state.channel_list and ui_state.all_messages."""
|
"""Retrieve channels from the node and update globals.channel_list and globals.all_messages."""
|
||||||
node = interface_state.interface.getNode("^local")
|
node = globals.interface.getNode("^local")
|
||||||
device_channels = node.channels
|
device_channels = node.channels
|
||||||
|
|
||||||
# Clear and rebuild channel list
|
# Clear and rebuild channel list
|
||||||
# ui_state.channel_list = []
|
# globals.channel_list = []
|
||||||
|
|
||||||
for device_channel in device_channels:
|
for device_channel in device_channels:
|
||||||
if device_channel.role:
|
if device_channel.role:
|
||||||
@@ -28,20 +26,20 @@ def get_channels():
|
|||||||
].name
|
].name
|
||||||
channel_name = convert_to_camel_case(modem_preset_string)
|
channel_name = convert_to_camel_case(modem_preset_string)
|
||||||
|
|
||||||
# Add channel to ui_state.channel_list if not already present
|
# Add channel to globals.channel_list if not already present
|
||||||
if channel_name not in ui_state.channel_list:
|
if channel_name not in globals.channel_list:
|
||||||
ui_state.channel_list.append(channel_name)
|
globals.channel_list.append(channel_name)
|
||||||
|
|
||||||
# Initialize ui_state.all_messages[channel_name] if it doesn't exist
|
# Initialize globals.all_messages[channel_name] if it doesn't exist
|
||||||
if channel_name not in ui_state.all_messages:
|
if channel_name not in globals.all_messages:
|
||||||
ui_state.all_messages[channel_name] = []
|
globals.all_messages[channel_name] = []
|
||||||
|
|
||||||
return ui_state.channel_list
|
return globals.channel_list
|
||||||
|
|
||||||
|
|
||||||
def get_node_list():
|
def get_node_list():
|
||||||
if interface_state.interface.nodes:
|
if globals.interface.nodes:
|
||||||
my_node_num = interface_state.myNodeNum
|
my_node_num = globals.myNodeNum
|
||||||
|
|
||||||
def node_sort(node):
|
def node_sort(node):
|
||||||
if config.node_sort == "lastHeard":
|
if config.node_sort == "lastHeard":
|
||||||
@@ -53,7 +51,7 @@ def get_node_list():
|
|||||||
else:
|
else:
|
||||||
return node
|
return node
|
||||||
|
|
||||||
sorted_nodes = sorted(interface_state.interface.nodes.values(), key=node_sort)
|
sorted_nodes = sorted(globals.interface.nodes.values(), key=node_sort)
|
||||||
|
|
||||||
# Move favorite nodes to the beginning
|
# Move favorite nodes to the beginning
|
||||||
sorted_nodes = sorted(
|
sorted_nodes = sorted(
|
||||||
@@ -70,14 +68,14 @@ def get_node_list():
|
|||||||
|
|
||||||
def refresh_node_list():
|
def refresh_node_list():
|
||||||
new_node_list = get_node_list()
|
new_node_list = get_node_list()
|
||||||
if new_node_list != ui_state.node_list:
|
if new_node_list != globals.node_list:
|
||||||
ui_state.node_list = new_node_list
|
globals.node_list = new_node_list
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def get_nodeNum():
|
def get_nodeNum():
|
||||||
myinfo = interface_state.interface.getMyNodeInfo()
|
myinfo = globals.interface.getMyNodeInfo()
|
||||||
myNodeNum = myinfo["num"]
|
myNodeNum = myinfo["num"]
|
||||||
return myNodeNum
|
return myNodeNum
|
||||||
|
|
||||||
@@ -135,31 +133,3 @@ def get_time_ago(timestamp):
|
|||||||
if unit != "s":
|
if unit != "s":
|
||||||
return f"{value} {unit} ago"
|
return f"{value} {unit} ago"
|
||||||
return "now"
|
return "now"
|
||||||
|
|
||||||
def add_new_message(channel_id, prefix, message):
|
|
||||||
if channel_id not in ui_state.all_messages:
|
|
||||||
ui_state.all_messages[channel_id] = []
|
|
||||||
|
|
||||||
# Timestamp handling
|
|
||||||
current_timestamp = time.time()
|
|
||||||
current_hour = datetime.datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
|
|
||||||
|
|
||||||
# Retrieve the last timestamp if available
|
|
||||||
channel_messages = ui_state.all_messages[channel_id]
|
|
||||||
if channel_messages:
|
|
||||||
# Check the last entry for a timestamp
|
|
||||||
for entry in reversed(channel_messages):
|
|
||||||
if entry[0].startswith("--"):
|
|
||||||
last_hour = entry[0].strip("- ").strip()
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
last_hour = None
|
|
||||||
else:
|
|
||||||
last_hour = None
|
|
||||||
|
|
||||||
# Add a new timestamp if it's a new hour
|
|
||||||
if last_hour != current_hour:
|
|
||||||
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
|
|
||||||
|
|
||||||
# Add the message
|
|
||||||
ui_state.all_messages[channel_id].append((prefix,message))
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[project]
|
[project]
|
||||||
name = "contact"
|
name = "contact"
|
||||||
version = "1.3.15"
|
version = "1.3.8"
|
||||||
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
|
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
|
||||||
authors = [
|
authors = [
|
||||||
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
|
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
|
||||||
|
|||||||
Reference in New Issue
Block a user