mirror of
https://github.com/pdxlocations/contact.git
synced 2026-03-28 17:12:35 +01:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d2e559ed04 |
@@ -15,6 +15,7 @@ import curses
|
|||||||
import io
|
import io
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import queue
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
@@ -32,7 +33,6 @@ from contact.ui.contact_ui import main_ui
|
|||||||
from contact.ui.splash import draw_splash
|
from contact.ui.splash import draw_splash
|
||||||
from contact.utilities.arg_parser import setup_parser
|
from contact.utilities.arg_parser import setup_parser
|
||||||
from contact.utilities.db_handler import init_nodedb, load_messages_from_db
|
from contact.utilities.db_handler import init_nodedb, load_messages_from_db
|
||||||
from contact.utilities.demo_data import build_demo_interface, configure_demo_database, seed_demo_messages
|
|
||||||
from contact.utilities.input_handlers import get_list_input
|
from contact.utilities.input_handlers import get_list_input
|
||||||
from contact.utilities.i18n import t
|
from contact.utilities.i18n import t
|
||||||
from contact.ui.dialog import dialog
|
from contact.ui.dialog import dialog
|
||||||
@@ -55,6 +55,8 @@ logging.basicConfig(
|
|||||||
)
|
)
|
||||||
|
|
||||||
app_state.lock = threading.Lock()
|
app_state.lock = threading.Lock()
|
||||||
|
app_state.rx_queue = queue.SimpleQueue()
|
||||||
|
app_state.ui_shutdown = False
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
@@ -69,18 +71,9 @@ def prompt_region_if_unset(args: object) -> None:
|
|||||||
interface_state.interface = initialize_interface(args)
|
interface_state.interface = initialize_interface(args)
|
||||||
|
|
||||||
|
|
||||||
def initialize_globals(seed_demo: bool = False) -> None:
|
def initialize_globals() -> None:
|
||||||
"""Initializes interface and shared globals."""
|
"""Initializes interface and shared globals."""
|
||||||
|
|
||||||
ui_state.channel_list = []
|
|
||||||
ui_state.all_messages = {}
|
|
||||||
ui_state.notifications = []
|
|
||||||
ui_state.packet_buffer = []
|
|
||||||
ui_state.node_list = []
|
|
||||||
ui_state.selected_channel = 0
|
|
||||||
ui_state.selected_message = 0
|
|
||||||
ui_state.selected_node = 0
|
|
||||||
ui_state.start_index = [0, 0, 0]
|
|
||||||
interface_state.myNodeNum = get_nodeNum()
|
interface_state.myNodeNum = get_nodeNum()
|
||||||
ui_state.channel_list = get_channels()
|
ui_state.channel_list = get_channels()
|
||||||
ui_state.node_list = get_node_list()
|
ui_state.node_list = get_node_list()
|
||||||
@@ -88,18 +81,9 @@ def initialize_globals(seed_demo: bool = False) -> None:
|
|||||||
pub.subscribe(on_receive, "meshtastic.receive")
|
pub.subscribe(on_receive, "meshtastic.receive")
|
||||||
|
|
||||||
init_nodedb()
|
init_nodedb()
|
||||||
if seed_demo:
|
|
||||||
seed_demo_messages()
|
|
||||||
load_messages_from_db()
|
load_messages_from_db()
|
||||||
|
|
||||||
|
|
||||||
def initialize_runtime_interface(args: object):
|
|
||||||
if getattr(args, "demo_screenshot", False):
|
|
||||||
configure_demo_database()
|
|
||||||
return build_demo_interface()
|
|
||||||
return initialize_interface(args)
|
|
||||||
|
|
||||||
|
|
||||||
def main(stdscr: curses.window) -> None:
|
def main(stdscr: curses.window) -> None:
|
||||||
"""Main entry point for the curses UI."""
|
"""Main entry point for the curses UI."""
|
||||||
|
|
||||||
@@ -117,12 +101,12 @@ def main(stdscr: curses.window) -> None:
|
|||||||
|
|
||||||
logging.info("Initializing interface...")
|
logging.info("Initializing interface...")
|
||||||
with app_state.lock:
|
with app_state.lock:
|
||||||
interface_state.interface = initialize_runtime_interface(args)
|
interface_state.interface = initialize_interface(args)
|
||||||
|
|
||||||
if not getattr(args, "demo_screenshot", False) and interface_state.interface.localNode.localConfig.lora.region == 0:
|
if interface_state.interface.localNode.localConfig.lora.region == 0:
|
||||||
prompt_region_if_unset(args)
|
prompt_region_if_unset(args)
|
||||||
|
|
||||||
initialize_globals(seed_demo=getattr(args, "demo_screenshot", False))
|
initialize_globals()
|
||||||
logging.info("Starting main UI")
|
logging.info("Starting main UI")
|
||||||
|
|
||||||
stdscr.clear()
|
stdscr.clear()
|
||||||
@@ -168,11 +152,10 @@ def start() -> None:
|
|||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
app_state.ui_shutdown = False
|
||||||
curses.wrapper(main)
|
curses.wrapper(main)
|
||||||
interface_state.interface.close()
|
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
logging.info("User exited with Ctrl+C")
|
logging.info("User exited with Ctrl+C")
|
||||||
interface_state.interface.close()
|
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.critical("Fatal error", exc_info=True)
|
logging.critical("Fatal error", exc_info=True)
|
||||||
@@ -183,6 +166,13 @@ def start() -> None:
|
|||||||
print("Fatal error:", e)
|
print("Fatal error:", e)
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
finally:
|
||||||
|
app_state.ui_shutdown = True
|
||||||
|
try:
|
||||||
|
if interface_state.interface is not None:
|
||||||
|
interface_state.interface.close()
|
||||||
|
except Exception:
|
||||||
|
logging.exception("Error while closing interface")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -77,7 +77,7 @@ help.node_info, "F5 = Full node info", ""
|
|||||||
help.archive_chat, "Ctrl+D = Archive chat / remove node", ""
|
help.archive_chat, "Ctrl+D = Archive chat / remove node", ""
|
||||||
help.favorite, "Ctrl+F = Favorite", ""
|
help.favorite, "Ctrl+F = Favorite", ""
|
||||||
help.ignore, "Ctrl+G = Ignore", ""
|
help.ignore, "Ctrl+G = Ignore", ""
|
||||||
help.search, "Ctrl+/ or / = Search", ""
|
help.search, "Ctrl+/ = Search", ""
|
||||||
help.help, "Ctrl+K = Help", ""
|
help.help, "Ctrl+K = Help", ""
|
||||||
help.no_help, "No help available.", ""
|
help.no_help, "No help available.", ""
|
||||||
confirm.remove_from_nodedb, "Remove {name} from nodedb?", ""
|
confirm.remove_from_nodedb, "Remove {name} from nodedb?", ""
|
||||||
|
|||||||
@@ -77,7 +77,7 @@ help.node_info, "F5 = Полная информация об узле", ""
|
|||||||
help.archive_chat, "Ctrl+D = Архив чата / удалить узел", ""
|
help.archive_chat, "Ctrl+D = Архив чата / удалить узел", ""
|
||||||
help.favorite, "Ctrl+F = Избранное", ""
|
help.favorite, "Ctrl+F = Избранное", ""
|
||||||
help.ignore, "Ctrl+G = Игнорировать", ""
|
help.ignore, "Ctrl+G = Игнорировать", ""
|
||||||
help.search, "Ctrl+/ или / = Поиск", ""
|
help.search, "Ctrl+/ = Поиск", ""
|
||||||
help.help, "Ctrl+K = Справка", ""
|
help.help, "Ctrl+K = Справка", ""
|
||||||
help.no_help, "Нет справки.", ""
|
help.no_help, "Нет справки.", ""
|
||||||
confirm.remove_from_nodedb, "Удалить {name} из базы узлов?", ""
|
confirm.remove_from_nodedb, "Удалить {name} из базы узлов?", ""
|
||||||
|
|||||||
@@ -2,30 +2,36 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
import shutil
|
import shutil
|
||||||
import time
|
|
||||||
import subprocess
|
import subprocess
|
||||||
import threading
|
import threading
|
||||||
from typing import Any, Dict, Optional
|
import time
|
||||||
# Debounce notification sounds so a burst of queued messages only plays once.
|
from typing import Any, Dict
|
||||||
|
|
||||||
|
import contact.ui.default_config as config
|
||||||
|
from contact.utilities.db_handler import (
|
||||||
|
get_name_from_database,
|
||||||
|
maybe_store_nodeinfo_in_db,
|
||||||
|
save_message_to_db,
|
||||||
|
update_node_info_in_db,
|
||||||
|
)
|
||||||
|
from contact.utilities.singleton import app_state, interface_state, menu_state, ui_state
|
||||||
|
from contact.utilities.utils import add_new_message, refresh_node_list
|
||||||
|
|
||||||
|
# Debounce notification sounds so a burst of queued messages only plays once.
|
||||||
_SOUND_DEBOUNCE_SECONDS = 0.8
|
_SOUND_DEBOUNCE_SECONDS = 0.8
|
||||||
_sound_timer: Optional[threading.Timer] = None
|
_sound_timer: threading.Timer | None = None
|
||||||
_sound_timer_lock = threading.Lock()
|
_sound_timer_lock = threading.Lock()
|
||||||
_last_sound_request = 0.0
|
_last_sound_request = 0.0
|
||||||
|
|
||||||
|
|
||||||
def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
|
def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
|
||||||
"""Schedule a notification sound after a short quiet period.
|
"""Schedule a notification sound after a short quiet period."""
|
||||||
|
|
||||||
If more messages arrive before the delay elapses, the timer is reset.
|
|
||||||
This prevents playing a sound for each message when a backlog flushes.
|
|
||||||
"""
|
|
||||||
global _sound_timer, _last_sound_request
|
global _sound_timer, _last_sound_request
|
||||||
|
|
||||||
now = time.monotonic()
|
now = time.monotonic()
|
||||||
with _sound_timer_lock:
|
with _sound_timer_lock:
|
||||||
_last_sound_request = now
|
_last_sound_request = now
|
||||||
|
|
||||||
# Cancel any previously scheduled sound.
|
|
||||||
if _sound_timer is not None:
|
if _sound_timer is not None:
|
||||||
try:
|
try:
|
||||||
_sound_timer.cancel()
|
_sound_timer.cancel()
|
||||||
@@ -34,7 +40,6 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
|
|||||||
_sound_timer = None
|
_sound_timer = None
|
||||||
|
|
||||||
def _fire(expected_request_time: float) -> None:
|
def _fire(expected_request_time: float) -> None:
|
||||||
# Only play if nothing newer has been scheduled.
|
|
||||||
with _sound_timer_lock:
|
with _sound_timer_lock:
|
||||||
if expected_request_time != _last_sound_request:
|
if expected_request_time != _last_sound_request:
|
||||||
return
|
return
|
||||||
@@ -43,42 +48,20 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
|
|||||||
_sound_timer = threading.Timer(delay, _fire, args=(now,))
|
_sound_timer = threading.Timer(delay, _fire, args=(now,))
|
||||||
_sound_timer.daemon = True
|
_sound_timer.daemon = True
|
||||||
_sound_timer.start()
|
_sound_timer.start()
|
||||||
from contact.utilities.utils import (
|
|
||||||
refresh_node_list,
|
|
||||||
add_new_message,
|
|
||||||
)
|
|
||||||
from contact.ui.contact_ui import (
|
|
||||||
draw_packetlog_win,
|
|
||||||
draw_node_list,
|
|
||||||
draw_messages_window,
|
|
||||||
draw_channel_list,
|
|
||||||
add_notification,
|
|
||||||
)
|
|
||||||
from contact.utilities.db_handler import (
|
|
||||||
save_message_to_db,
|
|
||||||
maybe_store_nodeinfo_in_db,
|
|
||||||
get_name_from_database,
|
|
||||||
update_node_info_in_db,
|
|
||||||
)
|
|
||||||
import contact.ui.default_config as config
|
|
||||||
|
|
||||||
from contact.utilities.singleton import ui_state, interface_state, app_state, menu_state
|
|
||||||
|
|
||||||
|
|
||||||
def play_sound():
|
def play_sound() -> None:
|
||||||
try:
|
try:
|
||||||
system = platform.system()
|
system = platform.system()
|
||||||
sound_path = None
|
sound_path = None
|
||||||
executable = None
|
executable = None
|
||||||
|
|
||||||
if system == "Darwin": # macOS
|
if system == "Darwin":
|
||||||
sound_path = "/System/Library/Sounds/Ping.aiff"
|
sound_path = "/System/Library/Sounds/Ping.aiff"
|
||||||
executable = "afplay"
|
executable = "afplay"
|
||||||
|
|
||||||
elif system == "Linux":
|
elif system == "Linux":
|
||||||
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
|
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
|
||||||
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
|
wav_path = "/usr/share/sounds/alsa/Front_Center.wav"
|
||||||
|
|
||||||
if shutil.which("paplay") and os.path.exists(ogg_path):
|
if shutil.which("paplay") and os.path.exists(ogg_path):
|
||||||
executable = "paplay"
|
executable = "paplay"
|
||||||
sound_path = ogg_path
|
sound_path = ogg_path
|
||||||
@@ -95,102 +78,127 @@ def play_sound():
|
|||||||
cmd = [executable, sound_path]
|
cmd = [executable, sound_path]
|
||||||
if executable == "ffplay":
|
if executable == "ffplay":
|
||||||
cmd = [executable, "-nodisp", "-autoexit", sound_path]
|
cmd = [executable, "-nodisp", "-autoexit", sound_path]
|
||||||
|
|
||||||
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||||
return
|
except subprocess.CalledProcessError as exc:
|
||||||
|
logging.error("Sound playback failed: %s", exc)
|
||||||
|
except Exception as exc:
|
||||||
|
logging.error("Unexpected error while playing sound: %s", exc)
|
||||||
|
|
||||||
except subprocess.CalledProcessError as e:
|
|
||||||
logging.error(f"Sound playback failed: {e}")
|
def _decode_message_payload(payload: Any) -> str:
|
||||||
except Exception as e:
|
if isinstance(payload, bytes):
|
||||||
logging.error(f"Unexpected error: {e}")
|
return payload.decode("utf-8", errors="replace")
|
||||||
|
if isinstance(payload, str):
|
||||||
|
return payload
|
||||||
|
return str(payload)
|
||||||
|
|
||||||
|
|
||||||
|
def process_receive_event(packet: Dict[str, Any]) -> None:
|
||||||
|
"""Process a queued packet on the UI thread and perform all UI updates."""
|
||||||
|
# Local import prevents module-level circular import.
|
||||||
|
from contact.ui.contact_ui import (
|
||||||
|
add_notification,
|
||||||
|
draw_channel_list,
|
||||||
|
draw_messages_window,
|
||||||
|
draw_node_list,
|
||||||
|
draw_packetlog_win,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Update packet log
|
||||||
|
ui_state.packet_buffer.append(packet)
|
||||||
|
if len(ui_state.packet_buffer) > 20:
|
||||||
|
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
|
||||||
|
|
||||||
|
if ui_state.display_log:
|
||||||
|
draw_packetlog_win()
|
||||||
|
if ui_state.current_window == 4:
|
||||||
|
menu_state.need_redraw = True
|
||||||
|
|
||||||
|
decoded = packet.get("decoded")
|
||||||
|
if not isinstance(decoded, dict):
|
||||||
|
return
|
||||||
|
|
||||||
|
changed = refresh_node_list()
|
||||||
|
if changed:
|
||||||
|
draw_node_list()
|
||||||
|
|
||||||
|
portnum = decoded.get("portnum")
|
||||||
|
if portnum == "NODEINFO_APP":
|
||||||
|
user = decoded.get("user")
|
||||||
|
if isinstance(user, dict) and "longName" in user:
|
||||||
|
maybe_store_nodeinfo_in_db(packet)
|
||||||
|
return
|
||||||
|
|
||||||
|
if portnum != "TEXT_MESSAGE_APP":
|
||||||
|
return
|
||||||
|
|
||||||
|
hop_start = packet.get("hopStart", 0)
|
||||||
|
hop_limit = packet.get("hopLimit", 0)
|
||||||
|
hops = hop_start - hop_limit
|
||||||
|
|
||||||
|
if config.notification_sound == "True":
|
||||||
|
schedule_notification_sound()
|
||||||
|
|
||||||
|
message_string = _decode_message_payload(decoded.get("payload"))
|
||||||
|
|
||||||
|
if not ui_state.channel_list:
|
||||||
|
return
|
||||||
|
|
||||||
|
refresh_channels = False
|
||||||
|
refresh_messages = False
|
||||||
|
|
||||||
|
channel_number = packet.get("channel", 0)
|
||||||
|
if not isinstance(channel_number, int):
|
||||||
|
channel_number = 0
|
||||||
|
if channel_number < 0:
|
||||||
|
channel_number = 0
|
||||||
|
|
||||||
|
packet_from = packet.get("from")
|
||||||
|
if packet.get("to") == interface_state.myNodeNum and packet_from is not None:
|
||||||
|
if packet_from not in ui_state.channel_list:
|
||||||
|
ui_state.channel_list.append(packet_from)
|
||||||
|
if packet_from not in ui_state.all_messages:
|
||||||
|
ui_state.all_messages[packet_from] = []
|
||||||
|
update_node_info_in_db(packet_from, chat_archived=False)
|
||||||
|
refresh_channels = True
|
||||||
|
channel_number = ui_state.channel_list.index(packet_from)
|
||||||
|
|
||||||
|
if channel_number >= len(ui_state.channel_list):
|
||||||
|
channel_number = 0
|
||||||
|
|
||||||
|
channel_id = ui_state.channel_list[channel_number]
|
||||||
|
|
||||||
|
if ui_state.selected_channel >= len(ui_state.channel_list):
|
||||||
|
ui_state.selected_channel = 0
|
||||||
|
|
||||||
|
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
|
||||||
|
add_notification(channel_number)
|
||||||
|
refresh_channels = True
|
||||||
|
else:
|
||||||
|
refresh_messages = True
|
||||||
|
|
||||||
|
if packet_from is None:
|
||||||
|
logging.debug("Skipping TEXT_MESSAGE_APP packet with missing 'from' field")
|
||||||
|
return
|
||||||
|
|
||||||
|
message_from_string = get_name_from_database(packet_from, type="short") + ":"
|
||||||
|
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
|
||||||
|
|
||||||
|
if refresh_channels:
|
||||||
|
draw_channel_list()
|
||||||
|
if refresh_messages:
|
||||||
|
draw_messages_window(True)
|
||||||
|
|
||||||
|
save_message_to_db(channel_id, packet_from, message_string)
|
||||||
|
|
||||||
|
|
||||||
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||||
"""
|
"""Enqueue packet to be processed on the main curses thread."""
|
||||||
Handles an incoming packet from a Meshtastic interface.
|
if app_state.ui_shutdown:
|
||||||
|
return
|
||||||
Args:
|
if not isinstance(packet, dict):
|
||||||
packet: The received Meshtastic packet as a dictionary.
|
return
|
||||||
interface: The Meshtastic interface instance that received the packet.
|
try:
|
||||||
"""
|
app_state.rx_queue.put(packet)
|
||||||
with app_state.lock:
|
except Exception:
|
||||||
# Update packet log
|
logging.exception("Failed to enqueue packet for UI processing")
|
||||||
ui_state.packet_buffer.append(packet)
|
|
||||||
if len(ui_state.packet_buffer) > 20:
|
|
||||||
# Trim buffer to 20 packets
|
|
||||||
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
|
|
||||||
|
|
||||||
if ui_state.display_log:
|
|
||||||
draw_packetlog_win()
|
|
||||||
|
|
||||||
if ui_state.current_window == 4:
|
|
||||||
menu_state.need_redraw = True
|
|
||||||
try:
|
|
||||||
if "decoded" not in packet:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Assume any incoming packet could update the last seen time for a node
|
|
||||||
changed = refresh_node_list()
|
|
||||||
if changed:
|
|
||||||
draw_node_list()
|
|
||||||
|
|
||||||
if packet["decoded"]["portnum"] == "NODEINFO_APP":
|
|
||||||
if "user" in packet["decoded"] and "longName" in packet["decoded"]["user"]:
|
|
||||||
maybe_store_nodeinfo_in_db(packet)
|
|
||||||
|
|
||||||
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
|
|
||||||
hop_start = packet.get('hopStart', 0)
|
|
||||||
hop_limit = packet.get('hopLimit', 0)
|
|
||||||
|
|
||||||
hops = hop_start - hop_limit
|
|
||||||
|
|
||||||
|
|
||||||
if config.notification_sound == "True":
|
|
||||||
schedule_notification_sound()
|
|
||||||
|
|
||||||
message_bytes = packet["decoded"]["payload"]
|
|
||||||
message_string = message_bytes.decode("utf-8")
|
|
||||||
|
|
||||||
refresh_channels = False
|
|
||||||
refresh_messages = False
|
|
||||||
|
|
||||||
if packet.get("channel"):
|
|
||||||
channel_number = packet["channel"]
|
|
||||||
else:
|
|
||||||
channel_number = 0
|
|
||||||
|
|
||||||
if packet["to"] == interface_state.myNodeNum:
|
|
||||||
if packet["from"] in ui_state.channel_list:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
ui_state.channel_list.append(packet["from"])
|
|
||||||
if packet["from"] not in ui_state.all_messages:
|
|
||||||
ui_state.all_messages[packet["from"]] = []
|
|
||||||
update_node_info_in_db(packet["from"], chat_archived=False)
|
|
||||||
refresh_channels = True
|
|
||||||
|
|
||||||
channel_number = ui_state.channel_list.index(packet["from"])
|
|
||||||
|
|
||||||
channel_id = ui_state.channel_list[channel_number]
|
|
||||||
|
|
||||||
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
|
|
||||||
add_notification(channel_number)
|
|
||||||
refresh_channels = True
|
|
||||||
else:
|
|
||||||
refresh_messages = True
|
|
||||||
|
|
||||||
# Add received message to the messages list
|
|
||||||
message_from_id = packet["from"]
|
|
||||||
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
|
|
||||||
|
|
||||||
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
|
|
||||||
|
|
||||||
if refresh_channels:
|
|
||||||
draw_channel_list()
|
|
||||||
if refresh_messages:
|
|
||||||
draw_messages_window(True)
|
|
||||||
|
|
||||||
save_message_to_db(channel_id, message_from_id, message_string)
|
|
||||||
|
|
||||||
except KeyError as e:
|
|
||||||
logging.error(f"Error processing packet: {e}")
|
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
import curses
|
import curses
|
||||||
import logging
|
import logging
|
||||||
|
from queue import Empty
|
||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
from typing import Union
|
from typing import Union
|
||||||
|
|
||||||
|
from contact.message_handlers.rx_handler import process_receive_event
|
||||||
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
|
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
|
||||||
from contact.settings import settings_menu
|
from contact.settings import settings_menu
|
||||||
from contact.message_handlers.tx_handler import send_message, send_traceroute
|
from contact.message_handlers.tx_handler import send_message, send_traceroute
|
||||||
@@ -12,17 +14,14 @@ from contact.ui.colors import get_color
|
|||||||
from contact.utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived
|
from contact.utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived
|
||||||
from contact.utilities.input_handlers import get_list_input
|
from contact.utilities.input_handlers import get_list_input
|
||||||
from contact.utilities.i18n import t
|
from contact.utilities.i18n import t
|
||||||
from contact.utilities.emoji_utils import normalize_message_text
|
|
||||||
import contact.ui.default_config as config
|
import contact.ui.default_config as config
|
||||||
import contact.ui.dialog
|
import contact.ui.dialog
|
||||||
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
|
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
|
||||||
from contact.utilities.singleton import ui_state, interface_state, menu_state
|
from contact.utilities.singleton import ui_state, interface_state, menu_state, app_state
|
||||||
|
|
||||||
|
|
||||||
MIN_COL = 1 # "effectively zero" without breaking curses
|
MIN_COL = 1 # "effectively zero" without breaking curses
|
||||||
RESIZE_DEBOUNCE_MS = 250
|
|
||||||
root_win = None
|
root_win = None
|
||||||
nodes_pad = None
|
|
||||||
|
|
||||||
|
|
||||||
# Draw arrows for a specific window id (0=channel,1=messages,2=nodes).
|
# Draw arrows for a specific window id (0=channel,1=messages,2=nodes).
|
||||||
@@ -65,72 +64,6 @@ def paint_frame(win, selected: bool) -> None:
|
|||||||
win.refresh()
|
win.refresh()
|
||||||
|
|
||||||
|
|
||||||
def get_channel_row_color(index: int) -> int:
|
|
||||||
if index == ui_state.selected_channel:
|
|
||||||
if ui_state.current_window == 0:
|
|
||||||
return get_color("channel_list", reverse=True)
|
|
||||||
return get_color("channel_selected")
|
|
||||||
return get_color("channel_list")
|
|
||||||
|
|
||||||
|
|
||||||
def get_node_row_color(index: int, highlight: bool = False) -> int:
|
|
||||||
node_num = ui_state.node_list[index]
|
|
||||||
node = interface_state.interface.nodesByNum.get(node_num, {})
|
|
||||||
color = "node_list"
|
|
||||||
if node.get("isFavorite"):
|
|
||||||
color = "node_favorite"
|
|
||||||
if node.get("isIgnored"):
|
|
||||||
color = "node_ignored"
|
|
||||||
reverse = index == ui_state.selected_node and (ui_state.current_window == 2 or highlight)
|
|
||||||
return get_color(color, reverse=reverse)
|
|
||||||
|
|
||||||
|
|
||||||
def refresh_node_selection(old_index: int = -1, highlight: bool = False) -> None:
|
|
||||||
if nodes_pad is None or not ui_state.node_list:
|
|
||||||
return
|
|
||||||
|
|
||||||
width = max(0, nodes_pad.getmaxyx()[1] - 4)
|
|
||||||
|
|
||||||
if 0 <= old_index < len(ui_state.node_list):
|
|
||||||
try:
|
|
||||||
nodes_pad.chgat(old_index, 1, width, get_node_row_color(old_index, highlight=highlight))
|
|
||||||
except curses.error:
|
|
||||||
pass
|
|
||||||
|
|
||||||
if 0 <= ui_state.selected_node < len(ui_state.node_list):
|
|
||||||
try:
|
|
||||||
nodes_pad.chgat(ui_state.selected_node, 1, width, get_node_row_color(ui_state.selected_node, highlight=highlight))
|
|
||||||
except curses.error:
|
|
||||||
pass
|
|
||||||
|
|
||||||
ui_state.start_index[2] = max(0, ui_state.selected_node - (nodes_win.getmaxyx()[0] - 3))
|
|
||||||
refresh_pad(2)
|
|
||||||
draw_window_arrows(2)
|
|
||||||
|
|
||||||
|
|
||||||
def refresh_main_window(window_id: int, selected: bool) -> None:
|
|
||||||
if window_id == 0:
|
|
||||||
paint_frame(channel_win, selected=selected)
|
|
||||||
if ui_state.channel_list:
|
|
||||||
width = max(0, channel_pad.getmaxyx()[1] - 4)
|
|
||||||
channel_pad.chgat(ui_state.selected_channel, 1, width, get_channel_row_color(ui_state.selected_channel))
|
|
||||||
refresh_pad(0)
|
|
||||||
elif window_id == 1:
|
|
||||||
paint_frame(messages_win, selected=selected)
|
|
||||||
refresh_pad(1)
|
|
||||||
elif window_id == 2:
|
|
||||||
paint_frame(nodes_win, selected=selected)
|
|
||||||
if ui_state.node_list and nodes_pad is not None:
|
|
||||||
width = max(0, nodes_pad.getmaxyx()[1] - 4)
|
|
||||||
nodes_pad.chgat(ui_state.selected_node, 1, width, get_node_row_color(ui_state.selected_node))
|
|
||||||
refresh_pad(2)
|
|
||||||
|
|
||||||
|
|
||||||
def get_node_display_name(node_num: int, node: dict) -> str:
|
|
||||||
user = node.get("user") or {}
|
|
||||||
return user.get("longName") or get_name_from_database(node_num, "long")
|
|
||||||
|
|
||||||
|
|
||||||
def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
|
def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
|
||||||
"""Handle terminal resize events and redraw the UI accordingly."""
|
"""Handle terminal resize events and redraw the UI accordingly."""
|
||||||
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, packetlog_win, entry_win
|
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, packetlog_win, entry_win
|
||||||
@@ -230,22 +163,22 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def drain_resize_events(input_win: curses.window) -> Union[str, int, None]:
|
def drain_receive_queue(max_events: int = 200) -> None:
|
||||||
"""Wait for resize events to settle and preserve one queued non-resize key."""
|
processed = 0
|
||||||
input_win.timeout(RESIZE_DEBOUNCE_MS)
|
while processed < max_events:
|
||||||
try:
|
try:
|
||||||
while True:
|
packet = app_state.rx_queue.get(block=False)
|
||||||
try:
|
except Empty:
|
||||||
next_char = input_win.get_wch()
|
return
|
||||||
except curses.error:
|
except Exception:
|
||||||
return None
|
logging.exception("Error while draining receive queue")
|
||||||
|
return
|
||||||
|
|
||||||
if next_char == curses.KEY_RESIZE:
|
try:
|
||||||
continue
|
process_receive_event(packet)
|
||||||
|
except Exception:
|
||||||
return next_char
|
logging.exception("Error while processing receive event")
|
||||||
finally:
|
processed += 1
|
||||||
input_win.timeout(-1)
|
|
||||||
|
|
||||||
|
|
||||||
def main_ui(stdscr: curses.window) -> None:
|
def main_ui(stdscr: curses.window) -> None:
|
||||||
@@ -255,20 +188,20 @@ def main_ui(stdscr: curses.window) -> None:
|
|||||||
|
|
||||||
root_win = stdscr
|
root_win = stdscr
|
||||||
input_text = ""
|
input_text = ""
|
||||||
queued_char = None
|
|
||||||
stdscr.keypad(True)
|
stdscr.keypad(True)
|
||||||
get_channels()
|
get_channels()
|
||||||
handle_resize(stdscr, True)
|
handle_resize(stdscr, True)
|
||||||
|
entry_win.timeout(75)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
drain_receive_queue()
|
||||||
draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
|
draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
|
||||||
|
|
||||||
# Get user input from entry window
|
# Get user input from entry window
|
||||||
if queued_char is None:
|
try:
|
||||||
char = entry_win.get_wch()
|
char = entry_win.get_wch()
|
||||||
else:
|
except curses.error:
|
||||||
char = queued_char
|
continue
|
||||||
queued_char = None
|
|
||||||
|
|
||||||
# draw_debug(f"Keypress: {char}")
|
# draw_debug(f"Keypress: {char}")
|
||||||
|
|
||||||
@@ -316,16 +249,13 @@ def main_ui(stdscr: curses.window) -> None:
|
|||||||
|
|
||||||
elif char == curses.KEY_RESIZE:
|
elif char == curses.KEY_RESIZE:
|
||||||
input_text = ""
|
input_text = ""
|
||||||
queued_char = drain_resize_events(entry_win)
|
|
||||||
handle_resize(stdscr, False)
|
handle_resize(stdscr, False)
|
||||||
continue
|
entry_win.timeout(75)
|
||||||
|
|
||||||
elif char == chr(4): # Ctrl + D to delete current channel or node
|
elif char == chr(4): # Ctrl + D to delete current channel or node
|
||||||
handle_ctrl_d()
|
handle_ctrl_d()
|
||||||
|
|
||||||
elif char == chr(31) or (
|
elif char == chr(31): # Ctrl + / to search
|
||||||
char == "/" and not input_text and ui_state.current_window in (0, 2)
|
|
||||||
): # Ctrl + / or / to search in channel/node lists
|
|
||||||
handle_ctrl_fslash()
|
handle_ctrl_fslash()
|
||||||
|
|
||||||
elif char == chr(11): # Ctrl + K for Help
|
elif char == chr(11): # Ctrl + K for Help
|
||||||
@@ -429,16 +359,31 @@ def handle_leftright(char: int) -> None:
|
|||||||
delta = -1 if char == curses.KEY_LEFT else 1
|
delta = -1 if char == curses.KEY_LEFT else 1
|
||||||
old_window = ui_state.current_window
|
old_window = ui_state.current_window
|
||||||
ui_state.current_window = (ui_state.current_window + delta) % 3
|
ui_state.current_window = (ui_state.current_window + delta) % 3
|
||||||
if ui_state.single_pane_mode:
|
handle_resize(root_win, False)
|
||||||
handle_resize(root_win, False)
|
|
||||||
return
|
|
||||||
|
|
||||||
refresh_main_window(old_window, selected=False)
|
if old_window == 0:
|
||||||
|
paint_frame(channel_win, selected=False)
|
||||||
|
refresh_pad(0)
|
||||||
|
if old_window == 1:
|
||||||
|
paint_frame(messages_win, selected=False)
|
||||||
|
refresh_pad(1)
|
||||||
|
elif old_window == 2:
|
||||||
|
paint_frame(nodes_win, selected=False)
|
||||||
|
refresh_pad(2)
|
||||||
|
|
||||||
if not ui_state.single_pane_mode:
|
if not ui_state.single_pane_mode:
|
||||||
draw_window_arrows(old_window)
|
draw_window_arrows(old_window)
|
||||||
|
|
||||||
refresh_main_window(ui_state.current_window, selected=True)
|
if ui_state.current_window == 0:
|
||||||
|
paint_frame(channel_win, selected=True)
|
||||||
|
refresh_pad(0)
|
||||||
|
elif ui_state.current_window == 1:
|
||||||
|
paint_frame(messages_win, selected=True)
|
||||||
|
refresh_pad(1)
|
||||||
|
elif ui_state.current_window == 2:
|
||||||
|
paint_frame(nodes_win, selected=True)
|
||||||
|
refresh_pad(2)
|
||||||
|
|
||||||
draw_window_arrows(ui_state.current_window)
|
draw_window_arrows(ui_state.current_window)
|
||||||
|
|
||||||
|
|
||||||
@@ -459,16 +404,31 @@ def handle_function_keys(char: int) -> None:
|
|||||||
return
|
return
|
||||||
|
|
||||||
ui_state.current_window = target
|
ui_state.current_window = target
|
||||||
if ui_state.single_pane_mode:
|
handle_resize(root_win, False)
|
||||||
handle_resize(root_win, False)
|
|
||||||
return
|
|
||||||
|
|
||||||
refresh_main_window(old_window, selected=False)
|
if old_window == 0:
|
||||||
|
paint_frame(channel_win, selected=False)
|
||||||
|
refresh_pad(0)
|
||||||
|
elif old_window == 1:
|
||||||
|
paint_frame(messages_win, selected=False)
|
||||||
|
refresh_pad(1)
|
||||||
|
elif old_window == 2:
|
||||||
|
paint_frame(nodes_win, selected=False)
|
||||||
|
refresh_pad(2)
|
||||||
|
|
||||||
if not ui_state.single_pane_mode:
|
if not ui_state.single_pane_mode:
|
||||||
draw_window_arrows(old_window)
|
draw_window_arrows(old_window)
|
||||||
|
|
||||||
refresh_main_window(ui_state.current_window, selected=True)
|
if ui_state.current_window == 0:
|
||||||
|
paint_frame(channel_win, selected=True)
|
||||||
|
refresh_pad(0)
|
||||||
|
elif ui_state.current_window == 1:
|
||||||
|
paint_frame(messages_win, selected=True)
|
||||||
|
refresh_pad(1)
|
||||||
|
elif ui_state.current_window == 2:
|
||||||
|
paint_frame(nodes_win, selected=True)
|
||||||
|
refresh_pad(2)
|
||||||
|
|
||||||
draw_window_arrows(ui_state.current_window)
|
draw_window_arrows(ui_state.current_window)
|
||||||
|
|
||||||
|
|
||||||
@@ -520,34 +480,34 @@ def handle_enter(input_text: str) -> str:
|
|||||||
|
|
||||||
|
|
||||||
def handle_f5_key(stdscr: curses.window) -> None:
|
def handle_f5_key(stdscr: curses.window) -> None:
|
||||||
if not ui_state.node_list:
|
node = None
|
||||||
return
|
try:
|
||||||
|
|
||||||
def build_node_details() -> tuple[str, list[str]]:
|
|
||||||
node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
|
node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
|
||||||
|
|
||||||
message_parts = []
|
message_parts = []
|
||||||
|
|
||||||
message_parts.append("**📋 Basic Information:**")
|
message_parts.append("**📋 Basic Information:**")
|
||||||
message_parts.append(f"• Device: {node.get('user', {}).get('longName', 'Unknown')}")
|
message_parts.append(f"• Device: {node.get('user', {}).get('longName', 'Unknown')}")
|
||||||
message_parts.append(f"• Short name: {node.get('user', {}).get('shortName', 'Unknown')}")
|
message_parts.append(f"• Short name: {node.get('user', {}).get('shortName', 'Unknown')}")
|
||||||
message_parts.append(f"• Hardware: {node.get('user', {}).get('hwModel', 'Unknown')}")
|
message_parts.append(f"• Hardware: {node.get('user', {}).get('hwModel', 'Unknown')}")
|
||||||
message_parts.append(f"• Role: {node.get('user', {}).get('role', 'Unknown')}")
|
|
||||||
message_parts.append(f"Public key: {node.get('user', {}).get('publicKey')}")
|
|
||||||
message_parts.append(f"• Node ID: {node.get('num', 'Unknown')}")
|
|
||||||
|
|
||||||
|
role = f"{node.get('user', {}).get('role', 'Unknown')}"
|
||||||
|
message_parts.append(f"• Role: {role}")
|
||||||
|
|
||||||
|
pk = f"{node.get('user', {}).get('publicKey')}"
|
||||||
|
message_parts.append(f"Public key: {pk}")
|
||||||
|
|
||||||
|
message_parts.append(f"• Node ID: {node.get('num', 'Unknown')}")
|
||||||
if "position" in node:
|
if "position" in node:
|
||||||
pos = node["position"]
|
pos = node["position"]
|
||||||
has_coords = pos.get("latitude") and pos.get("longitude")
|
if pos.get("latitude") and pos.get("longitude"):
|
||||||
if has_coords:
|
|
||||||
message_parts.append(f"• Position: {pos['latitude']:.4f}, {pos['longitude']:.4f}")
|
message_parts.append(f"• Position: {pos['latitude']:.4f}, {pos['longitude']:.4f}")
|
||||||
if pos.get("altitude"):
|
if pos.get("altitude"):
|
||||||
message_parts.append(f"• Altitude: {pos['altitude']}m")
|
message_parts.append(f"• Altitude: {pos['altitude']}m")
|
||||||
if has_coords:
|
message_parts.append(f"https://maps.google.com/?q={pos['latitude']:.4f},{pos['longitude']:.4f}")
|
||||||
message_parts.append(f"https://maps.google.com/?q={pos['latitude']:.4f},{pos['longitude']:.4f}")
|
|
||||||
|
|
||||||
if any(key in node for key in ["snr", "hopsAway", "lastHeard"]):
|
if any(key in node for key in ["snr", "hopsAway", "lastHeard"]):
|
||||||
message_parts.append("")
|
message_parts.append("\n**🌐 Network Metrics:**")
|
||||||
message_parts.append("**🌐 Network Metrics:**")
|
|
||||||
|
|
||||||
if "snr" in node:
|
if "snr" in node:
|
||||||
snr = node["snr"]
|
snr = node["snr"]
|
||||||
@@ -567,13 +527,12 @@ def handle_f5_key(stdscr: curses.window) -> None:
|
|||||||
hop_emoji = "📡" if hops == 0 else "🔄" if hops == 1 else "⏩"
|
hop_emoji = "📡" if hops == 0 else "🔄" if hops == 1 else "⏩"
|
||||||
message_parts.append(f"• Hops away: {hop_emoji} {hops}")
|
message_parts.append(f"• Hops away: {hop_emoji} {hops}")
|
||||||
|
|
||||||
if node.get("lastHeard"):
|
if "lastHeard" in node and node["lastHeard"]:
|
||||||
message_parts.append(f"• Last heard: 🕐 {get_time_ago(node['lastHeard'])}")
|
message_parts.append(f"• Last heard: 🕐 {get_time_ago(node['lastHeard'])}")
|
||||||
|
|
||||||
if node.get("deviceMetrics"):
|
if node.get("deviceMetrics"):
|
||||||
metrics = node["deviceMetrics"]
|
metrics = node["deviceMetrics"]
|
||||||
message_parts.append("")
|
message_parts.append("\n**📊 Device Metrics:**")
|
||||||
message_parts.append("**📊 Device Metrics:**")
|
|
||||||
|
|
||||||
if "batteryLevel" in metrics:
|
if "batteryLevel" in metrics:
|
||||||
battery = metrics["batteryLevel"]
|
battery = metrics["batteryLevel"]
|
||||||
@@ -594,128 +553,21 @@ def handle_f5_key(stdscr: curses.window) -> None:
|
|||||||
air_emoji = "🔴" if air_util > 80 else "🟡" if air_util > 50 else "🟢"
|
air_emoji = "🔴" if air_util > 80 else "🟡" if air_util > 50 else "🟢"
|
||||||
message_parts.append(f"• Air utilization TX: {air_emoji} {air_util:.2f}%")
|
message_parts.append(f"• Air utilization TX: {air_emoji} {air_util:.2f}%")
|
||||||
|
|
||||||
title = t(
|
message = "\n".join(message_parts)
|
||||||
"ui.dialog.node_details_title",
|
|
||||||
default="📡 Node Details: {name}",
|
contact.ui.dialog.dialog(
|
||||||
name=node.get("user", {}).get("shortName", "Unknown"),
|
t(
|
||||||
|
"ui.dialog.node_details_title",
|
||||||
|
default="📡 Node Details: {name}",
|
||||||
|
name=node.get("user", {}).get("shortName", "Unknown"),
|
||||||
|
),
|
||||||
|
message,
|
||||||
)
|
)
|
||||||
return title, message_parts
|
curses.curs_set(1) # Show cursor again
|
||||||
|
handle_resize(stdscr, False)
|
||||||
previous_window = ui_state.current_window
|
|
||||||
ui_state.current_window = 4
|
|
||||||
scroll_offset = 0
|
|
||||||
dialog_win = None
|
|
||||||
|
|
||||||
curses.curs_set(0)
|
|
||||||
refresh_node_selection(highlight=True)
|
|
||||||
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
curses.update_lines_cols()
|
|
||||||
height, width = curses.LINES, curses.COLS
|
|
||||||
title, message_lines = build_node_details()
|
|
||||||
|
|
||||||
max_line_length = max(len(title), *(len(line) for line in message_lines))
|
|
||||||
dialog_width = min(max(max_line_length + 4, 20), max(10, width - 2))
|
|
||||||
dialog_height = min(max(len(message_lines) + 4, 6), max(6, height - 2))
|
|
||||||
x = max(0, (width - dialog_width) // 2)
|
|
||||||
y = max(0, (height - dialog_height) // 2)
|
|
||||||
viewport_h = max(1, dialog_height - 4)
|
|
||||||
max_scroll = max(0, len(message_lines) - viewport_h)
|
|
||||||
scroll_offset = max(0, min(scroll_offset, max_scroll))
|
|
||||||
|
|
||||||
if dialog_win is None:
|
|
||||||
dialog_win = curses.newwin(dialog_height, dialog_width, y, x)
|
|
||||||
else:
|
|
||||||
dialog_win.erase()
|
|
||||||
dialog_win.refresh()
|
|
||||||
dialog_win.resize(dialog_height, dialog_width)
|
|
||||||
dialog_win.mvwin(y, x)
|
|
||||||
|
|
||||||
dialog_win.keypad(True)
|
|
||||||
dialog_win.bkgd(get_color("background"))
|
|
||||||
dialog_win.attrset(get_color("window_frame"))
|
|
||||||
dialog_win.border(0)
|
|
||||||
|
|
||||||
try:
|
|
||||||
dialog_win.addstr(0, 2, title[: max(0, dialog_width - 4)], get_color("settings_default"))
|
|
||||||
hint = f" {ui_state.selected_node + 1}/{len(ui_state.node_list)} "
|
|
||||||
dialog_win.addstr(0, max(2, dialog_width - len(hint) - 2), hint, get_color("commands"))
|
|
||||||
except curses.error:
|
|
||||||
pass
|
|
||||||
|
|
||||||
msg_win = dialog_win.derwin(viewport_h + 2, dialog_width - 2, 1, 1)
|
|
||||||
msg_win.erase()
|
|
||||||
|
|
||||||
for row, line in enumerate(message_lines[scroll_offset : scroll_offset + viewport_h], start=1):
|
|
||||||
trimmed = line[: max(0, dialog_width - 6)]
|
|
||||||
try:
|
|
||||||
msg_win.addstr(row, 1, trimmed, get_color("settings_default"))
|
|
||||||
except curses.error:
|
|
||||||
pass
|
|
||||||
|
|
||||||
if len(message_lines) > viewport_h:
|
|
||||||
old_index = ui_state.start_index[4] if len(ui_state.start_index) > 4 else 0
|
|
||||||
while len(ui_state.start_index) <= 4:
|
|
||||||
ui_state.start_index.append(0)
|
|
||||||
ui_state.start_index[4] = scroll_offset
|
|
||||||
draw_main_arrows(msg_win, len(message_lines) - 1, window=4)
|
|
||||||
ui_state.start_index[4] = old_index
|
|
||||||
|
|
||||||
try:
|
|
||||||
ok_text = " Up/Down: Nodes PgUp/PgDn: Scroll Esc: Close "
|
|
||||||
dialog_win.addstr(
|
|
||||||
dialog_height - 2,
|
|
||||||
max(1, (dialog_width - len(ok_text)) // 2),
|
|
||||||
ok_text[: max(0, dialog_width - 2)],
|
|
||||||
get_color("settings_default", reverse=True),
|
|
||||||
)
|
|
||||||
except curses.error:
|
|
||||||
pass
|
|
||||||
|
|
||||||
dialog_win.refresh()
|
|
||||||
msg_win.noutrefresh()
|
|
||||||
curses.doupdate()
|
|
||||||
|
|
||||||
dialog_win.timeout(200)
|
|
||||||
char = dialog_win.getch()
|
|
||||||
|
|
||||||
if menu_state.need_redraw:
|
|
||||||
menu_state.need_redraw = False
|
|
||||||
continue
|
|
||||||
|
|
||||||
if char in (27, curses.KEY_LEFT, curses.KEY_ENTER, 10, 13, 32):
|
|
||||||
break
|
|
||||||
if char == curses.KEY_UP:
|
|
||||||
old_selected_node = ui_state.selected_node
|
|
||||||
ui_state.selected_node = (ui_state.selected_node - 1) % len(ui_state.node_list)
|
|
||||||
scroll_offset = 0
|
|
||||||
refresh_node_selection(old_selected_node, highlight=True)
|
|
||||||
elif char == curses.KEY_DOWN:
|
|
||||||
old_selected_node = ui_state.selected_node
|
|
||||||
ui_state.selected_node = (ui_state.selected_node + 1) % len(ui_state.node_list)
|
|
||||||
scroll_offset = 0
|
|
||||||
refresh_node_selection(old_selected_node, highlight=True)
|
|
||||||
elif char == curses.KEY_PPAGE:
|
|
||||||
scroll_offset = max(0, scroll_offset - viewport_h)
|
|
||||||
elif char == curses.KEY_NPAGE:
|
|
||||||
scroll_offset = min(max_scroll, scroll_offset + viewport_h)
|
|
||||||
elif char == curses.KEY_HOME:
|
|
||||||
scroll_offset = 0
|
|
||||||
elif char == curses.KEY_END:
|
|
||||||
scroll_offset = max_scroll
|
|
||||||
elif char == curses.KEY_RESIZE:
|
|
||||||
continue
|
|
||||||
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
return
|
return
|
||||||
finally:
|
|
||||||
if dialog_win is not None:
|
|
||||||
dialog_win.erase()
|
|
||||||
dialog_win.refresh()
|
|
||||||
ui_state.current_window = previous_window
|
|
||||||
curses.curs_set(1)
|
|
||||||
handle_resize(stdscr, False)
|
|
||||||
|
|
||||||
|
|
||||||
def handle_ctrl_t(stdscr: curses.window) -> None:
|
def handle_ctrl_t(stdscr: curses.window) -> None:
|
||||||
@@ -772,7 +624,6 @@ def handle_backtick(stdscr: curses.window) -> None:
|
|||||||
ui_state.current_window = 4
|
ui_state.current_window = 4
|
||||||
settings_menu(stdscr, interface_state.interface)
|
settings_menu(stdscr, interface_state.interface)
|
||||||
ui_state.current_window = previous_window
|
ui_state.current_window = previous_window
|
||||||
ui_state.single_pane_mode = config.single_pane_mode.lower() == "true"
|
|
||||||
curses.curs_set(1)
|
curses.curs_set(1)
|
||||||
refresh_node_list()
|
refresh_node_list()
|
||||||
handle_resize(stdscr, False)
|
handle_resize(stdscr, False)
|
||||||
@@ -1014,7 +865,7 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
|
|||||||
|
|
||||||
row = 0
|
row = 0
|
||||||
for prefix, message in messages:
|
for prefix, message in messages:
|
||||||
full_message = normalize_message_text(f"{prefix}{message}")
|
full_message = f"{prefix}{message}"
|
||||||
wrapped_lines = wrap_text(full_message, messages_win.getmaxyx()[1] - 2)
|
wrapped_lines = wrap_text(full_message, messages_win.getmaxyx()[1] - 2)
|
||||||
msg_line_count += len(wrapped_lines)
|
msg_line_count += len(wrapped_lines)
|
||||||
messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1])
|
messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1])
|
||||||
@@ -1056,8 +907,10 @@ def draw_node_list() -> None:
|
|||||||
if ui_state.current_window != 2 and ui_state.single_pane_mode:
|
if ui_state.current_window != 2 and ui_state.single_pane_mode:
|
||||||
return
|
return
|
||||||
|
|
||||||
if nodes_pad is None:
|
# This didn't work, for some reason an error is thown on startup, so we just create the pad every time
|
||||||
nodes_pad = curses.newpad(1, 1)
|
# if nodes_pad is None:
|
||||||
|
# nodes_pad = curses.newpad(1, 1)
|
||||||
|
nodes_pad = curses.newpad(1, 1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
nodes_pad.erase()
|
nodes_pad.erase()
|
||||||
@@ -1071,12 +924,28 @@ def draw_node_list() -> None:
|
|||||||
node = interface_state.interface.nodesByNum[node_num]
|
node = interface_state.interface.nodesByNum[node_num]
|
||||||
secure = "user" in node and "publicKey" in node["user"] and node["user"]["publicKey"]
|
secure = "user" in node and "publicKey" in node["user"] and node["user"]["publicKey"]
|
||||||
status_icon = "🔐" if secure else "🔓"
|
status_icon = "🔐" if secure else "🔓"
|
||||||
node_name = get_node_display_name(node_num, node)
|
node_name = get_name_from_database(node_num, "long")
|
||||||
|
user_name = node["user"]["shortName"]
|
||||||
|
|
||||||
|
uptime_str = ""
|
||||||
|
if "deviceMetrics" in node and "uptimeSeconds" in node["deviceMetrics"]:
|
||||||
|
uptime_str = f" / Up: {get_readable_duration(node['deviceMetrics']['uptimeSeconds'])}"
|
||||||
|
|
||||||
|
last_heard_str = f" ■ {get_time_ago(node['lastHeard'])}" if node.get("lastHeard") else ""
|
||||||
|
hops_str = f" ■ Hops: {node['hopsAway']}" if "hopsAway" in node else ""
|
||||||
|
snr_str = f" ■ SNR: {node['snr']}dB" if node.get("hopsAway") == 0 and "snr" in node else ""
|
||||||
|
|
||||||
# Future node name custom formatting possible
|
# Future node name custom formatting possible
|
||||||
node_str = f"{status_icon} {node_name}"
|
node_str = f"{status_icon} {node_name}"
|
||||||
node_str = node_str.ljust(box_width - 4)[: box_width - 2]
|
node_str = node_str.ljust(box_width - 4)[: box_width - 2]
|
||||||
nodes_pad.addstr(i, 1, node_str, get_node_row_color(i))
|
color = "node_list"
|
||||||
|
if "isFavorite" in node and node["isFavorite"]:
|
||||||
|
color = "node_favorite"
|
||||||
|
if "isIgnored" in node and node["isIgnored"]:
|
||||||
|
color = "node_ignored"
|
||||||
|
nodes_pad.addstr(
|
||||||
|
i, 1, node_str, get_color(color, reverse=ui_state.selected_node == i and ui_state.current_window == 2)
|
||||||
|
)
|
||||||
|
|
||||||
paint_frame(nodes_win, selected=(ui_state.current_window == 2))
|
paint_frame(nodes_win, selected=(ui_state.current_window == 2))
|
||||||
nodes_win.refresh()
|
nodes_win.refresh()
|
||||||
|
|||||||
@@ -56,13 +56,6 @@ config_folder = os.path.abspath(config.node_configs_file_path)
|
|||||||
# Load translations
|
# Load translations
|
||||||
field_mapping, help_text = parse_ini_file(translation_file)
|
field_mapping, help_text = parse_ini_file(translation_file)
|
||||||
|
|
||||||
def _is_repeated_field(field_desc) -> bool:
|
|
||||||
"""Return True if the protobuf field is repeated.
|
|
||||||
Protobuf 6.31.0 and later use an is_repeated property, while older versions compare against the label field.
|
|
||||||
"""
|
|
||||||
if hasattr(field_desc, "is_repeated"):
|
|
||||||
return bool(field_desc.is_repeated)
|
|
||||||
return field_desc.label == field_desc.LABEL_REPEATED
|
|
||||||
|
|
||||||
def reload_translations() -> None:
|
def reload_translations() -> None:
|
||||||
global translation_file, field_mapping, help_text
|
global translation_file, field_mapping, help_text
|
||||||
@@ -571,7 +564,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
|||||||
new_value = new_value == "True" or new_value is True
|
new_value = new_value == "True" or new_value is True
|
||||||
menu_state.start_index.pop()
|
menu_state.start_index.pop()
|
||||||
|
|
||||||
elif _is_repeated_field(field): # Handle repeated field - Not currently used
|
elif field.label == field.LABEL_REPEATED: # Handle repeated field - Not currently used
|
||||||
new_value = get_repeated_input(current_value)
|
new_value = get_repeated_input(current_value)
|
||||||
new_value = current_value if new_value is None else new_value.split(", ")
|
new_value = current_value if new_value is None else new_value.split(", ")
|
||||||
menu_state.start_index.pop()
|
menu_state.start_index.pop()
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
from queue import SimpleQueue
|
||||||
from typing import Any, Union, List, Dict
|
from typing import Any, Union, List, Dict
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
@@ -44,3 +45,5 @@ class InterfaceState:
|
|||||||
@dataclass
|
@dataclass
|
||||||
class AppState:
|
class AppState:
|
||||||
lock: Any = None
|
lock: Any = None
|
||||||
|
rx_queue: SimpleQueue = field(default_factory=SimpleQueue)
|
||||||
|
ui_shutdown: bool = False
|
||||||
|
|||||||
@@ -566,7 +566,7 @@ def save_json(file_path: str, data: Dict[str, Any]) -> None:
|
|||||||
formatted_json = config.format_json_single_line_arrays(data)
|
formatted_json = config.format_json_single_line_arrays(data)
|
||||||
with open(file_path, "w", encoding="utf-8") as f:
|
with open(file_path, "w", encoding="utf-8") as f:
|
||||||
f.write(formatted_json)
|
f.write(formatted_json)
|
||||||
config.reload_config()
|
setup_colors(reinit=True)
|
||||||
reload_translations(data.get("language"))
|
reload_translations(data.get("language"))
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -35,10 +35,5 @@ def setup_parser() -> ArgumentParser:
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--settings", "--set", "--control", "-c", help="Launch directly into the settings", action="store_true"
|
"--settings", "--set", "--control", "-c", help="Launch directly into the settings", action="store_true"
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
|
||||||
"--demo-screenshot",
|
|
||||||
help="Launch with a fake interface and seeded demo data for screenshots/testing.",
|
|
||||||
action="store_true",
|
|
||||||
)
|
|
||||||
|
|
||||||
return parser
|
return parser
|
||||||
|
|||||||
@@ -9,17 +9,6 @@ from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
|
|||||||
# defs are from meshtastic/python/main
|
# defs are from meshtastic/python/main
|
||||||
|
|
||||||
|
|
||||||
def _is_repeated_field(field_desc) -> bool:
|
|
||||||
"""Return True if the protobuf field is repeated.
|
|
||||||
|
|
||||||
Protobuf 6.31.0+ exposes `is_repeated`, while older versions require
|
|
||||||
checking `label == LABEL_REPEATED`.
|
|
||||||
"""
|
|
||||||
if hasattr(field_desc, "is_repeated"):
|
|
||||||
return bool(field_desc.is_repeated)
|
|
||||||
return field_desc.label == field_desc.LABEL_REPEATED
|
|
||||||
|
|
||||||
|
|
||||||
def traverseConfig(config_root, config, interface_config) -> bool:
|
def traverseConfig(config_root, config, interface_config) -> bool:
|
||||||
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
|
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
|
||||||
snake_name = camel_to_snake(config_root)
|
snake_name = camel_to_snake(config_root)
|
||||||
@@ -100,7 +89,7 @@ def setPref(config, comp_name, raw_val) -> bool:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
# repeating fields need to be handled with append, not setattr
|
# repeating fields need to be handled with append, not setattr
|
||||||
if not _is_repeated_field(pref):
|
if pref.label != pref.LABEL_REPEATED:
|
||||||
try:
|
try:
|
||||||
if config_type.message_type is not None:
|
if config_type.message_type is not None:
|
||||||
config_values = getattr(config_part, config_type.name)
|
config_values = getattr(config_part, config_type.name)
|
||||||
|
|||||||
@@ -1,226 +0,0 @@
|
|||||||
import os
|
|
||||||
import sqlite3
|
|
||||||
import tempfile
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from typing import Dict, List, Tuple, Union
|
|
||||||
|
|
||||||
import contact.ui.default_config as config
|
|
||||||
from contact.utilities.db_handler import get_table_name
|
|
||||||
from contact.utilities.singleton import interface_state
|
|
||||||
|
|
||||||
|
|
||||||
DEMO_DB_FILENAME = "contact_demo_client.db"
|
|
||||||
DEMO_LOCAL_NODE_NUM = 0xC0DEC0DE
|
|
||||||
DEMO_BASE_TIMESTAMP = 1738717200 # 2025-02-04 17:00:00 UTC
|
|
||||||
DEMO_CHANNELS = ["MediumFast", "Another Channel"]
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class DemoChannelSettings:
|
|
||||||
name: str
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class DemoChannel:
|
|
||||||
role: bool
|
|
||||||
settings: DemoChannelSettings
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class DemoLoRaConfig:
|
|
||||||
region: int = 1
|
|
||||||
modem_preset: int = 0
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class DemoLocalConfig:
|
|
||||||
lora: DemoLoRaConfig
|
|
||||||
|
|
||||||
|
|
||||||
class DemoLocalNode:
|
|
||||||
def __init__(self, interface: "DemoInterface", channels: List[DemoChannel]) -> None:
|
|
||||||
self._interface = interface
|
|
||||||
self.channels = channels
|
|
||||||
self.localConfig = DemoLocalConfig(lora=DemoLoRaConfig())
|
|
||||||
|
|
||||||
def setFavorite(self, node_num: int) -> None:
|
|
||||||
self._interface.nodesByNum[node_num]["isFavorite"] = True
|
|
||||||
|
|
||||||
def removeFavorite(self, node_num: int) -> None:
|
|
||||||
self._interface.nodesByNum[node_num]["isFavorite"] = False
|
|
||||||
|
|
||||||
def setIgnored(self, node_num: int) -> None:
|
|
||||||
self._interface.nodesByNum[node_num]["isIgnored"] = True
|
|
||||||
|
|
||||||
def removeIgnored(self, node_num: int) -> None:
|
|
||||||
self._interface.nodesByNum[node_num]["isIgnored"] = False
|
|
||||||
|
|
||||||
def removeNode(self, node_num: int) -> None:
|
|
||||||
self._interface.nodesByNum.pop(node_num, None)
|
|
||||||
|
|
||||||
|
|
||||||
class DemoInterface:
|
|
||||||
def __init__(self, nodes: Dict[int, Dict[str, object]], channels: List[DemoChannel]) -> None:
|
|
||||||
self.nodesByNum = nodes
|
|
||||||
self.nodes = self.nodesByNum
|
|
||||||
self.localNode = DemoLocalNode(self, channels)
|
|
||||||
|
|
||||||
def getMyNodeInfo(self) -> Dict[str, int]:
|
|
||||||
return {"num": DEMO_LOCAL_NODE_NUM}
|
|
||||||
|
|
||||||
def getNode(self, selector: str) -> DemoLocalNode:
|
|
||||||
if selector != "^local":
|
|
||||||
raise KeyError(selector)
|
|
||||||
return self.localNode
|
|
||||||
|
|
||||||
def close(self) -> None:
|
|
||||||
return
|
|
||||||
|
|
||||||
|
|
||||||
def build_demo_interface() -> DemoInterface:
|
|
||||||
channels = [DemoChannel(role=True, settings=DemoChannelSettings(name=name)) for name in DEMO_CHANNELS]
|
|
||||||
|
|
||||||
nodes = {
|
|
||||||
DEMO_LOCAL_NODE_NUM: _build_node(
|
|
||||||
DEMO_LOCAL_NODE_NUM,
|
|
||||||
"Meshtastic fb3c",
|
|
||||||
"fb3c",
|
|
||||||
hops=0,
|
|
||||||
snr=13.7,
|
|
||||||
last_heard_offset=5,
|
|
||||||
battery=88,
|
|
||||||
voltage=4.1,
|
|
||||||
favorite=True,
|
|
||||||
),
|
|
||||||
0xA1000001: _build_node(0xA1000001, "KG7NDX-N2", "N2", hops=1, last_heard_offset=18, battery=79, voltage=4.0),
|
|
||||||
0xA1000002: _build_node(0xA1000002, "Satellite II Repeater", "SAT2", hops=2, last_heard_offset=31),
|
|
||||||
0xA1000003: _build_node(0xA1000003, "Search for Discord/Meshtastic", "DISC", hops=1, last_heard_offset=46),
|
|
||||||
0xA1000004: _build_node(0xA1000004, "K7EOK Mobile", "MOBL", hops=1, last_heard_offset=63, battery=52),
|
|
||||||
0xA1000005: _build_node(0xA1000005, "Turtle", "TRTL", hops=3, last_heard_offset=87),
|
|
||||||
0xA1000006: _build_node(0xA1000006, "CARS Trewvilliger Plaza", "CARS", hops=2, last_heard_offset=121),
|
|
||||||
0xA1000007: _build_node(0xA1000007, "No Hands!", "NHDS", hops=1, last_heard_offset=155),
|
|
||||||
0xA1000008: _build_node(0xA1000008, "McCutie", "MCCU", hops=2, last_heard_offset=211, ignored=True),
|
|
||||||
0xA1000009: _build_node(0xA1000009, "K1PDX", "K1PX", hops=2, last_heard_offset=267),
|
|
||||||
0xA100000A: _build_node(0xA100000A, "Arnold Creek", "ARND", hops=1, last_heard_offset=301),
|
|
||||||
0xA100000B: _build_node(0xA100000B, "Nansen", "NANS", hops=1, last_heard_offset=355),
|
|
||||||
0xA100000C: _build_node(0xA100000C, "Kodin 1", "KOD1", hops=2, last_heard_offset=402),
|
|
||||||
0xA100000D: _build_node(0xA100000D, "PH1", "PH1", hops=3, last_heard_offset=470),
|
|
||||||
0xA100000E: _build_node(0xA100000E, "Luna", "LUNA", hops=1, last_heard_offset=501),
|
|
||||||
0xA100000F: _build_node(0xA100000F, "sputnik1", "SPUT", hops=1, last_heard_offset=550),
|
|
||||||
0xA1000010: _build_node(0xA1000010, "K7EOK Maplewood West", "MAPL", hops=2, last_heard_offset=602),
|
|
||||||
0xA1000011: _build_node(0xA1000011, "KE7YVU 2", "YVU2", hops=2, last_heard_offset=655),
|
|
||||||
0xA1000012: _build_node(0xA1000012, "DNET", "DNET", hops=1, last_heard_offset=702),
|
|
||||||
0xA1000013: _build_node(0xA1000013, "Green Bluff", "GBLF", hops=1, last_heard_offset=780),
|
|
||||||
0xA1000014: _build_node(0xA1000014, "Council Crest Solar", "CCST", hops=2, last_heard_offset=830),
|
|
||||||
0xA1000015: _build_node(0xA1000015, "Meshtastic 61c7", "61c7", hops=1, last_heard_offset=901),
|
|
||||||
0xA1000016: _build_node(0xA1000016, "Bella", "BELA", hops=2, last_heard_offset=950),
|
|
||||||
0xA1000017: _build_node(0xA1000017, "Mojo Solar Base 4f12", "MOJO", hops=1, last_heard_offset=1010, favorite=True),
|
|
||||||
}
|
|
||||||
|
|
||||||
return DemoInterface(nodes=nodes, channels=channels)
|
|
||||||
|
|
||||||
|
|
||||||
def configure_demo_database(base_dir: str = "") -> str:
|
|
||||||
if not base_dir:
|
|
||||||
base_dir = tempfile.mkdtemp(prefix="contact_demo_")
|
|
||||||
os.makedirs(base_dir, exist_ok=True)
|
|
||||||
|
|
||||||
db_path = os.path.join(base_dir, DEMO_DB_FILENAME)
|
|
||||||
if os.path.exists(db_path):
|
|
||||||
os.remove(db_path)
|
|
||||||
|
|
||||||
config.db_file_path = db_path
|
|
||||||
return db_path
|
|
||||||
|
|
||||||
|
|
||||||
def seed_demo_messages() -> None:
|
|
||||||
schema = """
|
|
||||||
user_id TEXT,
|
|
||||||
message_text TEXT,
|
|
||||||
timestamp INTEGER,
|
|
||||||
ack_type TEXT
|
|
||||||
"""
|
|
||||||
|
|
||||||
with sqlite3.connect(config.db_file_path) as db_connection:
|
|
||||||
cursor = db_connection.cursor()
|
|
||||||
|
|
||||||
for channel_name, rows in _demo_messages().items():
|
|
||||||
table_name = get_table_name(channel_name)
|
|
||||||
cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({schema})")
|
|
||||||
cursor.executemany(
|
|
||||||
f"""
|
|
||||||
INSERT INTO {table_name} (user_id, message_text, timestamp, ack_type)
|
|
||||||
VALUES (?, ?, ?, ?)
|
|
||||||
""",
|
|
||||||
rows,
|
|
||||||
)
|
|
||||||
|
|
||||||
db_connection.commit()
|
|
||||||
|
|
||||||
|
|
||||||
def _build_node(
|
|
||||||
node_num: int,
|
|
||||||
long_name: str,
|
|
||||||
short_name: str,
|
|
||||||
*,
|
|
||||||
hops: int,
|
|
||||||
last_heard_offset: int,
|
|
||||||
snr: float = 0.0,
|
|
||||||
battery: int = 0,
|
|
||||||
voltage: float = 0.0,
|
|
||||||
favorite: bool = False,
|
|
||||||
ignored: bool = False,
|
|
||||||
) -> Dict[str, object]:
|
|
||||||
node = {
|
|
||||||
"num": node_num,
|
|
||||||
"user": {
|
|
||||||
"longName": long_name,
|
|
||||||
"shortName": short_name,
|
|
||||||
"hwModel": "TBEAM",
|
|
||||||
"role": "CLIENT",
|
|
||||||
"publicKey": f"pk-{node_num:08x}",
|
|
||||||
"isLicensed": True,
|
|
||||||
},
|
|
||||||
"lastHeard": DEMO_BASE_TIMESTAMP + 3600 - last_heard_offset,
|
|
||||||
"hopsAway": hops,
|
|
||||||
"isFavorite": favorite,
|
|
||||||
"isIgnored": ignored,
|
|
||||||
}
|
|
||||||
|
|
||||||
if snr:
|
|
||||||
node["snr"] = snr
|
|
||||||
if battery:
|
|
||||||
node["deviceMetrics"] = {
|
|
||||||
"batteryLevel": battery,
|
|
||||||
"voltage": voltage or 4.0,
|
|
||||||
"uptimeSeconds": 86400 + node_num % 10000,
|
|
||||||
"channelUtilization": 12.5 + (node_num % 7),
|
|
||||||
"airUtilTx": 4.5 + (node_num % 5),
|
|
||||||
}
|
|
||||||
|
|
||||||
if node_num % 3 == 0:
|
|
||||||
node["position"] = {
|
|
||||||
"latitude": 45.5231 + ((node_num % 50) * 0.0001),
|
|
||||||
"longitude": -122.6765 - ((node_num % 50) * 0.0001),
|
|
||||||
"altitude": 85 + (node_num % 20),
|
|
||||||
}
|
|
||||||
|
|
||||||
return node
|
|
||||||
|
|
||||||
|
|
||||||
def _demo_messages() -> Dict[Union[str, int], List[Tuple[str, str, int, Union[str, None]]]]:
|
|
||||||
return {
|
|
||||||
"MediumFast": [
|
|
||||||
(str(DEMO_LOCAL_NODE_NUM), "Help, I'm stuck in a ditch!", DEMO_BASE_TIMESTAMP + 45, "Ack"),
|
|
||||||
("2701131778", "Do you require a alpinist?", DEMO_BASE_TIMESTAMP + 80, None),
|
|
||||||
(str(DEMO_LOCAL_NODE_NUM), "I don't know what that is.", DEMO_BASE_TIMESTAMP + 104, "Implicit"),
|
|
||||||
],
|
|
||||||
"Another Channel": [
|
|
||||||
("2701131788", "Weather is holding for the summit push.", DEMO_BASE_TIMESTAMP + 220, None),
|
|
||||||
(str(DEMO_LOCAL_NODE_NUM), "Copy that. Keep me posted.", DEMO_BASE_TIMESTAMP + 260, "Ack"),
|
|
||||||
],
|
|
||||||
2701131788: [
|
|
||||||
("2701131788", "Ping me when you are back at the trailhead.", DEMO_BASE_TIMESTAMP + 330, None),
|
|
||||||
(str(DEMO_LOCAL_NODE_NUM), "Will do.", DEMO_BASE_TIMESTAMP + 350, "Ack"),
|
|
||||||
],
|
|
||||||
}
|
|
||||||
@@ -1,54 +0,0 @@
|
|||||||
"""Helpers for normalizing emoji sequences in width-sensitive message rendering."""
|
|
||||||
|
|
||||||
# Strip zero-width and presentation modifiers that make terminal cell width inconsistent.
|
|
||||||
EMOJI_MODIFIER_REPLACEMENTS = {
|
|
||||||
"\u200d": "",
|
|
||||||
"\u20e3": "",
|
|
||||||
"\ufe0e": "",
|
|
||||||
"\ufe0f": "",
|
|
||||||
"\U0001F3FB": "",
|
|
||||||
"\U0001F3FC": "",
|
|
||||||
"\U0001F3FD": "",
|
|
||||||
"\U0001F3FE": "",
|
|
||||||
"\U0001F3FF": "",
|
|
||||||
}
|
|
||||||
|
|
||||||
_EMOJI_MODIFIER_TRANSLATION = str.maketrans(EMOJI_MODIFIER_REPLACEMENTS)
|
|
||||||
_REGIONAL_INDICATOR_START = ord("\U0001F1E6")
|
|
||||||
_REGIONAL_INDICATOR_END = ord("\U0001F1FF")
|
|
||||||
|
|
||||||
|
|
||||||
def _regional_indicator_to_letter(char: str) -> str:
|
|
||||||
return chr(ord("A") + ord(char) - _REGIONAL_INDICATOR_START)
|
|
||||||
|
|
||||||
|
|
||||||
def _normalize_flag_emoji(text: str) -> str:
|
|
||||||
"""Convert flag emoji built from regional indicators into ASCII country codes."""
|
|
||||||
normalized = []
|
|
||||||
index = 0
|
|
||||||
|
|
||||||
while index < len(text):
|
|
||||||
current = text[index]
|
|
||||||
current_ord = ord(current)
|
|
||||||
|
|
||||||
if _REGIONAL_INDICATOR_START <= current_ord <= _REGIONAL_INDICATOR_END and index + 1 < len(text):
|
|
||||||
next_char = text[index + 1]
|
|
||||||
next_ord = ord(next_char)
|
|
||||||
if _REGIONAL_INDICATOR_START <= next_ord <= _REGIONAL_INDICATOR_END:
|
|
||||||
normalized.append(_regional_indicator_to_letter(current))
|
|
||||||
normalized.append(_regional_indicator_to_letter(next_char))
|
|
||||||
index += 2
|
|
||||||
continue
|
|
||||||
|
|
||||||
normalized.append(current)
|
|
||||||
index += 1
|
|
||||||
|
|
||||||
return "".join(normalized)
|
|
||||||
|
|
||||||
|
|
||||||
def normalize_message_text(text: str) -> str:
|
|
||||||
"""Strip modifiers and rewrite flag emoji into stable terminal-friendly text."""
|
|
||||||
if not text:
|
|
||||||
return text
|
|
||||||
|
|
||||||
return _normalize_flag_emoji(text.translate(_EMOJI_MODIFIER_TRANSLATION))
|
|
||||||
@@ -68,19 +68,19 @@ def get_chunks(data):
|
|||||||
# Leave it string as last resort
|
# Leave it string as last resort
|
||||||
value = value
|
value = value
|
||||||
|
|
||||||
# Python 3.9-compatible alternative to match/case.
|
match key:
|
||||||
if key == "uptime_seconds":
|
|
||||||
# convert seconds to hours, for our sanity
|
# convert seconds to hours, for our sanity
|
||||||
value = round(value / 60 / 60, 1)
|
case "uptime_seconds":
|
||||||
elif key in ("longitude_i", "latitude_i"):
|
value = round(value / 60 / 60, 1)
|
||||||
# Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry
|
# Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry
|
||||||
# truncate to 6th digit after floating point, which would be still accurate
|
# truncate to 6th digit after floating point, which would be still accurate
|
||||||
value = round(value * 1e-7, 6)
|
case "longitude_i" | "latitude_i":
|
||||||
elif key == "wind_direction":
|
value = round(value * 1e-7, 6)
|
||||||
# Convert wind direction from degrees to abbreviation
|
# Convert wind direction from degrees to abbreviation
|
||||||
value = humanize_wind_direction(value)
|
case "wind_direction":
|
||||||
elif key == "time":
|
value = humanize_wind_direction(value)
|
||||||
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
|
case "time":
|
||||||
|
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
|
||||||
|
|
||||||
if key in sensors:
|
if key in sensors:
|
||||||
parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} "
|
parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} "
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[project]
|
[project]
|
||||||
name = "contact"
|
name = "contact"
|
||||||
version = "1.5.0"
|
version = "1.4.14"
|
||||||
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
|
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
|
||||||
authors = [
|
authors = [
|
||||||
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
|
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
|
||||||
|
|||||||
@@ -1 +0,0 @@
|
|||||||
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
import unittest
|
|
||||||
|
|
||||||
from contact.utilities.arg_parser import setup_parser
|
|
||||||
|
|
||||||
|
|
||||||
class ArgParserTests(unittest.TestCase):
|
|
||||||
def test_demo_screenshot_flag_is_supported(self) -> None:
|
|
||||||
args = setup_parser().parse_args(["--demo-screenshot"])
|
|
||||||
self.assertTrue(args.demo_screenshot)
|
|
||||||
|
|
||||||
def test_demo_screenshot_defaults_to_false(self) -> None:
|
|
||||||
args = setup_parser().parse_args([])
|
|
||||||
self.assertFalse(args.demo_screenshot)
|
|
||||||
@@ -1,21 +0,0 @@
|
|||||||
import unittest
|
|
||||||
|
|
||||||
from contact.utilities.config_io import _is_repeated_field, splitCompoundName
|
|
||||||
|
|
||||||
|
|
||||||
class ConfigIoTests(unittest.TestCase):
|
|
||||||
def test_split_compound_name_preserves_multi_part_values(self) -> None:
|
|
||||||
self.assertEqual(splitCompoundName("config.device.role"), ["config", "device", "role"])
|
|
||||||
|
|
||||||
def test_split_compound_name_duplicates_single_part_values(self) -> None:
|
|
||||||
self.assertEqual(splitCompoundName("owner"), ["owner", "owner"])
|
|
||||||
|
|
||||||
def test_is_repeated_field_prefers_new_style_attribute(self) -> None:
|
|
||||||
field = type("Field", (), {"is_repeated": True})()
|
|
||||||
|
|
||||||
self.assertTrue(_is_repeated_field(field))
|
|
||||||
|
|
||||||
def test_is_repeated_field_falls_back_to_label_comparison(self) -> None:
|
|
||||||
field_type = type("Field", (), {"label": 3, "LABEL_REPEATED": 3})
|
|
||||||
|
|
||||||
self.assertTrue(_is_repeated_field(field_type()))
|
|
||||||
@@ -1,15 +0,0 @@
|
|||||||
import unittest
|
|
||||||
|
|
||||||
from contact.utilities.control_utils import transform_menu_path
|
|
||||||
|
|
||||||
|
|
||||||
class ControlUtilsTests(unittest.TestCase):
|
|
||||||
def test_transform_menu_path_applies_replacements_and_normalization(self) -> None:
|
|
||||||
transformed = transform_menu_path(["Main Menu", "Radio Settings", "Channel 2", "Detail"])
|
|
||||||
|
|
||||||
self.assertEqual(transformed, ["config", "channel", "Detail"])
|
|
||||||
|
|
||||||
def test_transform_menu_path_preserves_unmatched_entries(self) -> None:
|
|
||||||
transformed = transform_menu_path(["Main Menu", "Module Settings", "WiFi"])
|
|
||||||
|
|
||||||
self.assertEqual(transformed, ["module", "WiFi"])
|
|
||||||
@@ -1,121 +0,0 @@
|
|||||||
import os
|
|
||||||
import sqlite3
|
|
||||||
import tempfile
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
import contact.ui.default_config as config
|
|
||||||
from contact.utilities import db_handler
|
|
||||||
from contact.utilities.demo_data import DEMO_LOCAL_NODE_NUM, build_demo_interface
|
|
||||||
from contact.utilities.singleton import interface_state, ui_state
|
|
||||||
from contact.utilities.utils import decimal_to_hex
|
|
||||||
|
|
||||||
from tests.test_support import reset_singletons, restore_config, snapshot_config
|
|
||||||
|
|
||||||
|
|
||||||
class DbHandlerTests(unittest.TestCase):
|
|
||||||
def setUp(self) -> None:
|
|
||||||
reset_singletons()
|
|
||||||
self.saved_config = snapshot_config(
|
|
||||||
"db_file_path",
|
|
||||||
"message_prefix",
|
|
||||||
"sent_message_prefix",
|
|
||||||
"ack_str",
|
|
||||||
"ack_implicit_str",
|
|
||||||
"ack_unknown_str",
|
|
||||||
"nak_str",
|
|
||||||
)
|
|
||||||
self.tempdir = tempfile.TemporaryDirectory()
|
|
||||||
config.db_file_path = os.path.join(self.tempdir.name, "client.db")
|
|
||||||
interface_state.myNodeNum = 123
|
|
||||||
|
|
||||||
def tearDown(self) -> None:
|
|
||||||
self.tempdir.cleanup()
|
|
||||||
restore_config(self.saved_config)
|
|
||||||
reset_singletons()
|
|
||||||
|
|
||||||
def test_save_message_to_db_and_update_ack_roundtrip(self) -> None:
|
|
||||||
timestamp = db_handler.save_message_to_db("Primary", "123", "hello")
|
|
||||||
|
|
||||||
self.assertIsInstance(timestamp, int)
|
|
||||||
|
|
||||||
db_handler.update_ack_nak("Primary", timestamp, "hello", "Ack")
|
|
||||||
|
|
||||||
with sqlite3.connect(config.db_file_path) as conn:
|
|
||||||
row = conn.execute('SELECT user_id, message_text, ack_type FROM "123_Primary_messages"').fetchone()
|
|
||||||
|
|
||||||
self.assertEqual(row, ("123", "hello", "Ack"))
|
|
||||||
|
|
||||||
def test_update_node_info_in_db_fills_defaults_and_preserves_existing_values(self) -> None:
|
|
||||||
db_handler.update_node_info_in_db(999, short_name="ABCD")
|
|
||||||
|
|
||||||
original_long_name = db_handler.get_name_from_database(999, "long")
|
|
||||||
self.assertTrue(original_long_name.startswith("Meshtastic "))
|
|
||||||
self.assertEqual(db_handler.get_name_from_database(999, "short"), "ABCD")
|
|
||||||
self.assertEqual(db_handler.is_chat_archived(999), 0)
|
|
||||||
|
|
||||||
db_handler.update_node_info_in_db(999, chat_archived=1)
|
|
||||||
|
|
||||||
self.assertEqual(db_handler.get_name_from_database(999, "long"), original_long_name)
|
|
||||||
self.assertEqual(db_handler.get_name_from_database(999, "short"), "ABCD")
|
|
||||||
self.assertEqual(db_handler.is_chat_archived(999), 1)
|
|
||||||
|
|
||||||
def test_get_name_from_database_returns_hex_when_user_is_missing(self) -> None:
|
|
||||||
user_id = 0x1234ABCD
|
|
||||||
db_handler.ensure_node_table_exists()
|
|
||||||
|
|
||||||
self.assertEqual(db_handler.get_name_from_database(user_id, "short"), decimal_to_hex(user_id))
|
|
||||||
self.assertEqual(db_handler.is_chat_archived(user_id), 0)
|
|
||||||
|
|
||||||
def test_load_messages_from_db_populates_channels_and_messages(self) -> None:
|
|
||||||
db_handler.update_node_info_in_db(123, long_name="Local Node", short_name="ME")
|
|
||||||
db_handler.update_node_info_in_db(456, long_name="Remote Node", short_name="RM")
|
|
||||||
db_handler.update_node_info_in_db(789, long_name="Archived", short_name="AR", chat_archived=1)
|
|
||||||
|
|
||||||
db_handler.ensure_table_exists(
|
|
||||||
'"123_Primary_messages"',
|
|
||||||
"""
|
|
||||||
user_id TEXT,
|
|
||||||
message_text TEXT,
|
|
||||||
timestamp INTEGER,
|
|
||||||
ack_type TEXT
|
|
||||||
""",
|
|
||||||
)
|
|
||||||
db_handler.ensure_table_exists(
|
|
||||||
'"123_789_messages"',
|
|
||||||
"""
|
|
||||||
user_id TEXT,
|
|
||||||
message_text TEXT,
|
|
||||||
timestamp INTEGER,
|
|
||||||
ack_type TEXT
|
|
||||||
""",
|
|
||||||
)
|
|
||||||
|
|
||||||
with sqlite3.connect(config.db_file_path) as conn:
|
|
||||||
conn.execute('INSERT INTO "123_Primary_messages" VALUES (?, ?, ?, ?)', ("123", "sent", 1700000000, "Ack"))
|
|
||||||
conn.execute('INSERT INTO "123_Primary_messages" VALUES (?, ?, ?, ?)', ("456", "reply", 1700000001, None))
|
|
||||||
conn.execute('INSERT INTO "123_789_messages" VALUES (?, ?, ?, ?)', ("789", "hidden", 1700000002, None))
|
|
||||||
conn.commit()
|
|
||||||
|
|
||||||
ui_state.channel_list = []
|
|
||||||
ui_state.all_messages = {}
|
|
||||||
|
|
||||||
db_handler.load_messages_from_db()
|
|
||||||
|
|
||||||
self.assertIn("Primary", ui_state.channel_list)
|
|
||||||
self.assertNotIn(789, ui_state.channel_list)
|
|
||||||
self.assertIn("Primary", ui_state.all_messages)
|
|
||||||
self.assertIn(789, ui_state.all_messages)
|
|
||||||
|
|
||||||
messages = ui_state.all_messages["Primary"]
|
|
||||||
self.assertTrue(messages[0][0].startswith("-- "))
|
|
||||||
self.assertTrue(any(config.sent_message_prefix in prefix and config.ack_str in prefix for prefix, _ in messages))
|
|
||||||
self.assertTrue(any("RM:" in prefix for prefix, _ in messages))
|
|
||||||
self.assertEqual(ui_state.all_messages[789][-1][1], "hidden")
|
|
||||||
|
|
||||||
def test_init_nodedb_inserts_nodes_from_interface(self) -> None:
|
|
||||||
interface_state.interface = build_demo_interface()
|
|
||||||
interface_state.myNodeNum = DEMO_LOCAL_NODE_NUM
|
|
||||||
|
|
||||||
db_handler.init_nodedb()
|
|
||||||
|
|
||||||
self.assertEqual(db_handler.get_name_from_database(2701131778, "short"), "SAT2")
|
|
||||||
@@ -1,38 +0,0 @@
|
|||||||
import tempfile
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
from contact.ui import default_config
|
|
||||||
|
|
||||||
|
|
||||||
class DefaultConfigTests(unittest.TestCase):
|
|
||||||
def test_get_localisation_options_filters_hidden_and_non_ini_files(self) -> None:
|
|
||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
|
||||||
for filename in ("en.ini", "ru.ini", ".hidden.ini", "notes.txt"):
|
|
||||||
with open(f"{tmpdir}/{filename}", "w", encoding="utf-8") as handle:
|
|
||||||
handle.write("")
|
|
||||||
|
|
||||||
self.assertEqual(default_config.get_localisation_options(tmpdir), ["en", "ru"])
|
|
||||||
|
|
||||||
def test_get_localisation_file_normalizes_extensions_and_falls_back_to_english(self) -> None:
|
|
||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
|
||||||
for filename in ("en.ini", "ru.ini"):
|
|
||||||
with open(f"{tmpdir}/{filename}", "w", encoding="utf-8") as handle:
|
|
||||||
handle.write("")
|
|
||||||
|
|
||||||
self.assertTrue(default_config.get_localisation_file("RU.ini", tmpdir).endswith("/ru.ini"))
|
|
||||||
self.assertTrue(default_config.get_localisation_file("missing", tmpdir).endswith("/en.ini"))
|
|
||||||
|
|
||||||
def test_update_dict_only_adds_missing_values(self) -> None:
|
|
||||||
default = {"theme": "dark", "nested": {"language": "en", "sound": True}}
|
|
||||||
actual = {"nested": {"language": "ru"}}
|
|
||||||
|
|
||||||
updated = default_config.update_dict(default, actual)
|
|
||||||
|
|
||||||
self.assertTrue(updated)
|
|
||||||
self.assertEqual(actual, {"theme": "dark", "nested": {"language": "ru", "sound": True}})
|
|
||||||
|
|
||||||
def test_format_json_single_line_arrays_keeps_arrays_inline(self) -> None:
|
|
||||||
rendered = default_config.format_json_single_line_arrays({"items": [1, 2], "nested": {"flags": ["a", "b"]}})
|
|
||||||
|
|
||||||
self.assertIn('"items": [1, 2]', rendered)
|
|
||||||
self.assertIn('"flags": ["a", "b"]', rendered)
|
|
||||||
@@ -1,51 +0,0 @@
|
|||||||
import tempfile
|
|
||||||
import unittest
|
|
||||||
from unittest import mock
|
|
||||||
|
|
||||||
import contact.__main__ as entrypoint
|
|
||||||
import contact.ui.default_config as config
|
|
||||||
from contact.utilities.db_handler import get_name_from_database
|
|
||||||
from contact.utilities.demo_data import DEMO_CHANNELS, DEMO_LOCAL_NODE_NUM, build_demo_interface, configure_demo_database
|
|
||||||
from contact.utilities.singleton import interface_state, ui_state
|
|
||||||
|
|
||||||
from tests.test_support import reset_singletons, restore_config, snapshot_config
|
|
||||||
|
|
||||||
|
|
||||||
class DemoDataTests(unittest.TestCase):
|
|
||||||
def setUp(self) -> None:
|
|
||||||
reset_singletons()
|
|
||||||
self.saved_config = snapshot_config("db_file_path", "node_sort", "single_pane_mode")
|
|
||||||
|
|
||||||
def tearDown(self) -> None:
|
|
||||||
restore_config(self.saved_config)
|
|
||||||
reset_singletons()
|
|
||||||
|
|
||||||
def test_build_demo_interface_exposes_expected_shape(self) -> None:
|
|
||||||
interface = build_demo_interface()
|
|
||||||
|
|
||||||
self.assertEqual(interface.getMyNodeInfo()["num"], DEMO_LOCAL_NODE_NUM)
|
|
||||||
self.assertEqual([channel.settings.name for channel in interface.getNode("^local").channels], DEMO_CHANNELS)
|
|
||||||
self.assertIn(DEMO_LOCAL_NODE_NUM, interface.nodesByNum)
|
|
||||||
|
|
||||||
def test_initialize_globals_seed_demo_populates_ui_state_and_db(self) -> None:
|
|
||||||
interface_state.interface = build_demo_interface()
|
|
||||||
|
|
||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
|
||||||
demo_db_path = configure_demo_database(tmpdir)
|
|
||||||
with mock.patch.object(entrypoint.pub, "subscribe"):
|
|
||||||
entrypoint.initialize_globals(seed_demo=True)
|
|
||||||
|
|
||||||
self.assertEqual(config.db_file_path, demo_db_path)
|
|
||||||
self.assertIn("MediumFast", ui_state.channel_list)
|
|
||||||
self.assertIn("Another Channel", ui_state.channel_list)
|
|
||||||
self.assertIn(2701131788, ui_state.channel_list)
|
|
||||||
self.assertEqual(ui_state.node_list[0], DEMO_LOCAL_NODE_NUM)
|
|
||||||
self.assertEqual(get_name_from_database(2701131778, "short"), "SAT2")
|
|
||||||
|
|
||||||
medium_fast = ui_state.all_messages["MediumFast"]
|
|
||||||
self.assertTrue(medium_fast[0][0].startswith("-- "))
|
|
||||||
self.assertTrue(any(config.sent_message_prefix in prefix and config.ack_str in prefix for prefix, _ in medium_fast))
|
|
||||||
self.assertTrue(any("SAT2:" in prefix for prefix, _ in medium_fast))
|
|
||||||
|
|
||||||
direct_messages = ui_state.all_messages[2701131788]
|
|
||||||
self.assertEqual(len(direct_messages), 3)
|
|
||||||
@@ -1,11 +0,0 @@
|
|||||||
import unittest
|
|
||||||
|
|
||||||
from contact.utilities.emoji_utils import normalize_message_text
|
|
||||||
|
|
||||||
|
|
||||||
class EmojiUtilsTests(unittest.TestCase):
|
|
||||||
def test_strips_modifiers_from_keycaps_and_skin_tones(self) -> None:
|
|
||||||
self.assertEqual(normalize_message_text("👍🏽 7️⃣"), "👍 7")
|
|
||||||
|
|
||||||
def test_rewrites_flag_emoji_to_country_codes(self) -> None:
|
|
||||||
self.assertEqual(normalize_message_text("🇺🇸 hello 🇩🇪"), "US hello DE")
|
|
||||||
@@ -1,57 +0,0 @@
|
|||||||
import os
|
|
||||||
import tempfile
|
|
||||||
import unittest
|
|
||||||
from unittest import mock
|
|
||||||
|
|
||||||
import contact.ui.default_config as config
|
|
||||||
from contact.utilities import i18n
|
|
||||||
|
|
||||||
from tests.test_support import restore_config, snapshot_config
|
|
||||||
|
|
||||||
|
|
||||||
class I18nTests(unittest.TestCase):
|
|
||||||
def setUp(self) -> None:
|
|
||||||
self.saved_config = snapshot_config("language")
|
|
||||||
i18n._translations = {}
|
|
||||||
i18n._language = None
|
|
||||||
|
|
||||||
def tearDown(self) -> None:
|
|
||||||
restore_config(self.saved_config)
|
|
||||||
i18n._translations = {}
|
|
||||||
i18n._language = None
|
|
||||||
|
|
||||||
def test_t_loads_translation_file_and_formats_placeholders(self) -> None:
|
|
||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
|
||||||
translation_file = os.path.join(tmpdir, "xx.ini")
|
|
||||||
with open(translation_file, "w", encoding="utf-8") as handle:
|
|
||||||
handle.write('[ui]\n')
|
|
||||||
handle.write('greeting,"Hello {name}"\n')
|
|
||||||
|
|
||||||
config.language = "xx"
|
|
||||||
with mock.patch.object(config, "get_localisation_file", return_value=translation_file):
|
|
||||||
self.assertEqual(i18n.t("ui.greeting", name="Ben"), "Hello Ben")
|
|
||||||
|
|
||||||
def test_t_falls_back_to_default_and_returns_unformatted_text_on_error(self) -> None:
|
|
||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
|
||||||
translation_file = os.path.join(tmpdir, "xx.ini")
|
|
||||||
with open(translation_file, "w", encoding="utf-8") as handle:
|
|
||||||
handle.write('[ui]\n')
|
|
||||||
handle.write('greeting,"Hello {name}"\n')
|
|
||||||
|
|
||||||
config.language = "xx"
|
|
||||||
with mock.patch.object(config, "get_localisation_file", return_value=translation_file):
|
|
||||||
self.assertEqual(i18n.t("ui.greeting"), "Hello {name}")
|
|
||||||
self.assertEqual(i18n.t("ui.missing", default="Fallback"), "Fallback")
|
|
||||||
self.assertEqual(i18n.t_text("Literal {value}", value=7), "Literal 7")
|
|
||||||
|
|
||||||
def test_loader_cache_is_reused_until_language_changes(self) -> None:
|
|
||||||
config.language = "en"
|
|
||||||
|
|
||||||
with mock.patch.object(i18n, "parse_ini_file", return_value=({"key": "value"}, {})) as parse_ini_file:
|
|
||||||
self.assertEqual(i18n.t("key"), "value")
|
|
||||||
self.assertEqual(i18n.t("key"), "value")
|
|
||||||
self.assertEqual(parse_ini_file.call_count, 1)
|
|
||||||
|
|
||||||
config.language = "ru"
|
|
||||||
self.assertEqual(i18n.t("missing", default="fallback"), "fallback")
|
|
||||||
self.assertEqual(parse_ini_file.call_count, 2)
|
|
||||||
@@ -1,40 +0,0 @@
|
|||||||
import os
|
|
||||||
import tempfile
|
|
||||||
import unittest
|
|
||||||
from unittest import mock
|
|
||||||
|
|
||||||
from contact.utilities.ini_utils import parse_ini_file
|
|
||||||
|
|
||||||
|
|
||||||
class IniUtilsTests(unittest.TestCase):
|
|
||||||
def test_parse_ini_file_reads_sections_fields_and_help_text(self) -> None:
|
|
||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
|
||||||
ini_path = os.path.join(tmpdir, "settings.ini")
|
|
||||||
with open(ini_path, "w", encoding="utf-8") as handle:
|
|
||||||
handle.write('; comment\n')
|
|
||||||
handle.write('[config.device]\n')
|
|
||||||
handle.write('title,"Device","Device help"\n')
|
|
||||||
handle.write('name,"Node Name","Node help"\n')
|
|
||||||
handle.write('empty_help,"Fallback",""\n')
|
|
||||||
|
|
||||||
with mock.patch("contact.utilities.ini_utils.i18n.t", return_value="No help available."):
|
|
||||||
mapping, help_text = parse_ini_file(ini_path)
|
|
||||||
|
|
||||||
self.assertEqual(mapping["config.device"], "Device")
|
|
||||||
self.assertEqual(help_text["config.device"], "Device help")
|
|
||||||
self.assertEqual(mapping["config.device.name"], "Node Name")
|
|
||||||
self.assertEqual(help_text["config.device.name"], "Node help")
|
|
||||||
self.assertEqual(help_text["config.device.empty_help"], "No help available.")
|
|
||||||
|
|
||||||
def test_parse_ini_file_uses_builtin_help_fallback_when_i18n_fails(self) -> None:
|
|
||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
|
||||||
ini_path = os.path.join(tmpdir, "settings.ini")
|
|
||||||
with open(ini_path, "w", encoding="utf-8") as handle:
|
|
||||||
handle.write('[section]\n')
|
|
||||||
handle.write('name,"Name"\n')
|
|
||||||
|
|
||||||
with mock.patch("contact.utilities.ini_utils.i18n.t", side_effect=RuntimeError("boom")):
|
|
||||||
mapping, help_text = parse_ini_file(ini_path)
|
|
||||||
|
|
||||||
self.assertEqual(mapping["section.name"], "Name")
|
|
||||||
self.assertEqual(help_text["section.name"], "No help available.")
|
|
||||||
@@ -1,178 +0,0 @@
|
|||||||
from argparse import Namespace
|
|
||||||
import unittest
|
|
||||||
from unittest import mock
|
|
||||||
|
|
||||||
import contact.__main__ as entrypoint
|
|
||||||
import contact.ui.default_config as config
|
|
||||||
from contact.utilities.singleton import interface_state, ui_state
|
|
||||||
|
|
||||||
from tests.test_support import reset_singletons, restore_config, snapshot_config
|
|
||||||
|
|
||||||
|
|
||||||
class MainRuntimeTests(unittest.TestCase):
|
|
||||||
def setUp(self) -> None:
|
|
||||||
reset_singletons()
|
|
||||||
self.saved_config = snapshot_config("single_pane_mode")
|
|
||||||
|
|
||||||
def tearDown(self) -> None:
|
|
||||||
restore_config(self.saved_config)
|
|
||||||
reset_singletons()
|
|
||||||
|
|
||||||
def test_initialize_runtime_interface_uses_demo_branch(self) -> None:
|
|
||||||
args = Namespace(demo_screenshot=True)
|
|
||||||
|
|
||||||
with mock.patch.object(entrypoint, "configure_demo_database") as configure_demo_database:
|
|
||||||
with mock.patch.object(entrypoint, "build_demo_interface", return_value="demo-interface") as build_demo:
|
|
||||||
with mock.patch.object(entrypoint, "initialize_interface") as initialize_interface:
|
|
||||||
result = entrypoint.initialize_runtime_interface(args)
|
|
||||||
|
|
||||||
self.assertEqual(result, "demo-interface")
|
|
||||||
configure_demo_database.assert_called_once_with()
|
|
||||||
build_demo.assert_called_once_with()
|
|
||||||
initialize_interface.assert_not_called()
|
|
||||||
|
|
||||||
def test_initialize_runtime_interface_uses_live_branch_without_demo_flag(self) -> None:
|
|
||||||
args = Namespace(demo_screenshot=False)
|
|
||||||
|
|
||||||
with mock.patch.object(entrypoint, "initialize_interface", return_value="live-interface") as initialize_interface:
|
|
||||||
result = entrypoint.initialize_runtime_interface(args)
|
|
||||||
|
|
||||||
self.assertEqual(result, "live-interface")
|
|
||||||
initialize_interface.assert_called_once_with(args)
|
|
||||||
|
|
||||||
def test_prompt_region_if_unset_reinitializes_interface_after_confirmation(self) -> None:
|
|
||||||
args = Namespace()
|
|
||||||
old_interface = mock.Mock()
|
|
||||||
new_interface = mock.Mock()
|
|
||||||
interface_state.interface = old_interface
|
|
||||||
|
|
||||||
with mock.patch.object(entrypoint, "get_list_input", return_value="Yes"):
|
|
||||||
with mock.patch.object(entrypoint, "set_region") as set_region:
|
|
||||||
with mock.patch.object(entrypoint, "initialize_interface", return_value=new_interface) as initialize:
|
|
||||||
entrypoint.prompt_region_if_unset(args)
|
|
||||||
|
|
||||||
set_region.assert_called_once_with(old_interface)
|
|
||||||
old_interface.close.assert_called_once_with()
|
|
||||||
initialize.assert_called_once_with(args)
|
|
||||||
self.assertIs(interface_state.interface, new_interface)
|
|
||||||
|
|
||||||
def test_prompt_region_if_unset_leaves_interface_unchanged_when_declined(self) -> None:
|
|
||||||
args = Namespace()
|
|
||||||
interface = mock.Mock()
|
|
||||||
interface_state.interface = interface
|
|
||||||
|
|
||||||
with mock.patch.object(entrypoint, "get_list_input", return_value="No"):
|
|
||||||
with mock.patch.object(entrypoint, "set_region") as set_region:
|
|
||||||
with mock.patch.object(entrypoint, "initialize_interface") as initialize:
|
|
||||||
entrypoint.prompt_region_if_unset(args)
|
|
||||||
|
|
||||||
set_region.assert_not_called()
|
|
||||||
initialize.assert_not_called()
|
|
||||||
interface.close.assert_not_called()
|
|
||||||
self.assertIs(interface_state.interface, interface)
|
|
||||||
|
|
||||||
def test_initialize_globals_resets_and_populates_runtime_state(self) -> None:
|
|
||||||
ui_state.channel_list = ["stale"]
|
|
||||||
ui_state.all_messages = {"stale": [("old", "message")]}
|
|
||||||
ui_state.notifications = [1]
|
|
||||||
ui_state.packet_buffer = ["packet"]
|
|
||||||
ui_state.node_list = [99]
|
|
||||||
ui_state.selected_channel = 3
|
|
||||||
ui_state.selected_message = 4
|
|
||||||
ui_state.selected_node = 5
|
|
||||||
ui_state.start_index = [9, 9, 9]
|
|
||||||
config.single_pane_mode = "True"
|
|
||||||
|
|
||||||
with mock.patch.object(entrypoint, "get_nodeNum", return_value=123):
|
|
||||||
with mock.patch.object(entrypoint, "get_channels", return_value=["Primary"]) as get_channels:
|
|
||||||
with mock.patch.object(entrypoint, "get_node_list", return_value=[123, 456]) as get_node_list:
|
|
||||||
with mock.patch.object(entrypoint.pub, "subscribe") as subscribe:
|
|
||||||
with mock.patch.object(entrypoint, "init_nodedb") as init_nodedb:
|
|
||||||
with mock.patch.object(entrypoint, "seed_demo_messages") as seed_demo_messages:
|
|
||||||
with mock.patch.object(entrypoint, "load_messages_from_db") as load_messages:
|
|
||||||
entrypoint.initialize_globals(seed_demo=True)
|
|
||||||
|
|
||||||
self.assertEqual(ui_state.channel_list, ["Primary"])
|
|
||||||
self.assertEqual(ui_state.all_messages, {})
|
|
||||||
self.assertEqual(ui_state.notifications, [])
|
|
||||||
self.assertEqual(ui_state.packet_buffer, [])
|
|
||||||
self.assertEqual(ui_state.node_list, [123, 456])
|
|
||||||
self.assertEqual(ui_state.selected_channel, 0)
|
|
||||||
self.assertEqual(ui_state.selected_message, 0)
|
|
||||||
self.assertEqual(ui_state.selected_node, 0)
|
|
||||||
self.assertEqual(ui_state.start_index, [0, 0, 0])
|
|
||||||
self.assertTrue(ui_state.single_pane_mode)
|
|
||||||
self.assertEqual(interface_state.myNodeNum, 123)
|
|
||||||
get_channels.assert_called_once_with()
|
|
||||||
get_node_list.assert_called_once_with()
|
|
||||||
subscribe.assert_called_once_with(entrypoint.on_receive, "meshtastic.receive")
|
|
||||||
init_nodedb.assert_called_once_with()
|
|
||||||
seed_demo_messages.assert_called_once_with()
|
|
||||||
load_messages.assert_called_once_with()
|
|
||||||
|
|
||||||
def test_ensure_min_rows_retries_until_terminal_is_large_enough(self) -> None:
|
|
||||||
stdscr = mock.Mock()
|
|
||||||
stdscr.getmaxyx.side_effect = [(10, 80), (11, 80)]
|
|
||||||
|
|
||||||
with mock.patch.object(entrypoint, "dialog") as dialog:
|
|
||||||
with mock.patch.object(entrypoint.curses, "update_lines_cols") as update_lines_cols:
|
|
||||||
entrypoint.ensure_min_rows(stdscr, min_rows=11)
|
|
||||||
|
|
||||||
dialog.assert_called_once()
|
|
||||||
update_lines_cols.assert_called_once_with()
|
|
||||||
stdscr.clear.assert_called_once_with()
|
|
||||||
stdscr.refresh.assert_called_once_with()
|
|
||||||
|
|
||||||
def test_start_prints_help_and_exits_zero(self) -> None:
|
|
||||||
parser = mock.Mock()
|
|
||||||
|
|
||||||
with mock.patch.object(entrypoint.sys, "argv", ["contact", "--help"]):
|
|
||||||
with mock.patch.object(entrypoint, "setup_parser", return_value=parser):
|
|
||||||
with mock.patch.object(entrypoint.sys, "exit", side_effect=SystemExit(0)) as exit_mock:
|
|
||||||
with self.assertRaises(SystemExit) as raised:
|
|
||||||
entrypoint.start()
|
|
||||||
|
|
||||||
self.assertEqual(raised.exception.code, 0)
|
|
||||||
parser.print_help.assert_called_once_with()
|
|
||||||
exit_mock.assert_called_once_with(0)
|
|
||||||
|
|
||||||
def test_start_runs_curses_wrapper_and_closes_interface(self) -> None:
|
|
||||||
interface = mock.Mock()
|
|
||||||
interface_state.interface = interface
|
|
||||||
|
|
||||||
with mock.patch.object(entrypoint.sys, "argv", ["contact"]):
|
|
||||||
with mock.patch.object(entrypoint.curses, "wrapper") as wrapper:
|
|
||||||
entrypoint.start()
|
|
||||||
|
|
||||||
wrapper.assert_called_once_with(entrypoint.main)
|
|
||||||
interface.close.assert_called_once_with()
|
|
||||||
|
|
||||||
def test_start_handles_keyboard_interrupt(self) -> None:
|
|
||||||
interface = mock.Mock()
|
|
||||||
interface_state.interface = interface
|
|
||||||
|
|
||||||
with mock.patch.object(entrypoint.sys, "argv", ["contact"]):
|
|
||||||
with mock.patch.object(entrypoint.curses, "wrapper", side_effect=KeyboardInterrupt):
|
|
||||||
with mock.patch.object(entrypoint.sys, "exit", side_effect=SystemExit(0)) as exit_mock:
|
|
||||||
with self.assertRaises(SystemExit) as raised:
|
|
||||||
entrypoint.start()
|
|
||||||
|
|
||||||
self.assertEqual(raised.exception.code, 0)
|
|
||||||
interface.close.assert_called_once_with()
|
|
||||||
exit_mock.assert_called_once_with(0)
|
|
||||||
|
|
||||||
def test_start_handles_fatal_exception_and_exits_one(self) -> None:
|
|
||||||
with mock.patch.object(entrypoint.sys, "argv", ["contact"]):
|
|
||||||
with mock.patch.object(entrypoint.curses, "wrapper", side_effect=RuntimeError("boom")):
|
|
||||||
with mock.patch.object(entrypoint.curses, "endwin") as endwin:
|
|
||||||
with mock.patch.object(entrypoint.traceback, "print_exc") as print_exc:
|
|
||||||
with mock.patch("builtins.print") as print_mock:
|
|
||||||
with mock.patch.object(entrypoint.sys, "exit", side_effect=SystemExit(1)) as exit_mock:
|
|
||||||
with self.assertRaises(SystemExit) as raised:
|
|
||||||
entrypoint.start()
|
|
||||||
|
|
||||||
self.assertEqual(raised.exception.code, 1)
|
|
||||||
endwin.assert_called_once_with()
|
|
||||||
print_exc.assert_called_once_with()
|
|
||||||
print_mock.assert_any_call("Fatal error:", mock.ANY)
|
|
||||||
exit_mock.assert_called_once_with(1)
|
|
||||||
@@ -1,96 +0,0 @@
|
|||||||
import unittest
|
|
||||||
from unittest import mock
|
|
||||||
|
|
||||||
import contact.ui.default_config as config
|
|
||||||
from contact.message_handlers import rx_handler
|
|
||||||
from contact.utilities.singleton import interface_state, menu_state, ui_state
|
|
||||||
|
|
||||||
from tests.test_support import reset_singletons, restore_config, snapshot_config
|
|
||||||
|
|
||||||
|
|
||||||
class RxHandlerTests(unittest.TestCase):
|
|
||||||
def setUp(self) -> None:
|
|
||||||
reset_singletons()
|
|
||||||
self.saved_config = snapshot_config("notification_sound", "message_prefix")
|
|
||||||
config.notification_sound = "False"
|
|
||||||
|
|
||||||
def tearDown(self) -> None:
|
|
||||||
restore_config(self.saved_config)
|
|
||||||
reset_singletons()
|
|
||||||
|
|
||||||
def test_on_receive_text_message_refreshes_selected_channel(self) -> None:
|
|
||||||
interface_state.myNodeNum = 111
|
|
||||||
ui_state.channel_list = ["Primary"]
|
|
||||||
ui_state.all_messages = {"Primary": []}
|
|
||||||
ui_state.selected_channel = 0
|
|
||||||
|
|
||||||
packet = {
|
|
||||||
"from": 222,
|
|
||||||
"to": 999,
|
|
||||||
"channel": 0,
|
|
||||||
"hopStart": 3,
|
|
||||||
"hopLimit": 1,
|
|
||||||
"decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": b"hello"},
|
|
||||||
}
|
|
||||||
|
|
||||||
with mock.patch.object(rx_handler, "refresh_node_list", return_value=True):
|
|
||||||
with mock.patch.object(rx_handler, "draw_node_list") as draw_node_list:
|
|
||||||
with mock.patch.object(rx_handler, "draw_messages_window") as draw_messages_window:
|
|
||||||
with mock.patch.object(rx_handler, "draw_channel_list") as draw_channel_list:
|
|
||||||
with mock.patch.object(rx_handler, "add_notification") as add_notification:
|
|
||||||
with mock.patch.object(rx_handler, "save_message_to_db") as save_message_to_db:
|
|
||||||
with mock.patch.object(rx_handler, "get_name_from_database", return_value="SAT2"):
|
|
||||||
rx_handler.on_receive(packet, interface=None)
|
|
||||||
|
|
||||||
draw_node_list.assert_called_once_with()
|
|
||||||
draw_messages_window.assert_called_once_with(True)
|
|
||||||
draw_channel_list.assert_not_called()
|
|
||||||
add_notification.assert_not_called()
|
|
||||||
save_message_to_db.assert_called_once_with("Primary", 222, "hello")
|
|
||||||
self.assertEqual(ui_state.all_messages["Primary"][-1][1], "hello")
|
|
||||||
self.assertIn("SAT2:", ui_state.all_messages["Primary"][-1][0])
|
|
||||||
self.assertIn("[2]", ui_state.all_messages["Primary"][-1][0])
|
|
||||||
|
|
||||||
def test_on_receive_direct_message_adds_channel_and_notification(self) -> None:
|
|
||||||
interface_state.myNodeNum = 111
|
|
||||||
ui_state.channel_list = ["Primary"]
|
|
||||||
ui_state.all_messages = {"Primary": []}
|
|
||||||
ui_state.selected_channel = 0
|
|
||||||
|
|
||||||
packet = {
|
|
||||||
"from": 222,
|
|
||||||
"to": 111,
|
|
||||||
"hopStart": 1,
|
|
||||||
"hopLimit": 1,
|
|
||||||
"decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": b"dm"},
|
|
||||||
}
|
|
||||||
|
|
||||||
with mock.patch.object(rx_handler, "refresh_node_list", return_value=False):
|
|
||||||
with mock.patch.object(rx_handler, "draw_messages_window") as draw_messages_window:
|
|
||||||
with mock.patch.object(rx_handler, "draw_channel_list") as draw_channel_list:
|
|
||||||
with mock.patch.object(rx_handler, "add_notification") as add_notification:
|
|
||||||
with mock.patch.object(rx_handler, "update_node_info_in_db") as update_node_info_in_db:
|
|
||||||
with mock.patch.object(rx_handler, "save_message_to_db") as save_message_to_db:
|
|
||||||
with mock.patch.object(rx_handler, "get_name_from_database", return_value="SAT2"):
|
|
||||||
rx_handler.on_receive(packet, interface=None)
|
|
||||||
|
|
||||||
self.assertIn(222, ui_state.channel_list)
|
|
||||||
self.assertIn(222, ui_state.all_messages)
|
|
||||||
draw_messages_window.assert_not_called()
|
|
||||||
draw_channel_list.assert_called_once_with()
|
|
||||||
add_notification.assert_called_once_with(1)
|
|
||||||
update_node_info_in_db.assert_called_once_with(222, chat_archived=False)
|
|
||||||
save_message_to_db.assert_called_once_with(222, 222, "dm")
|
|
||||||
|
|
||||||
def test_on_receive_trims_packet_buffer_even_when_packet_is_undecoded(self) -> None:
|
|
||||||
ui_state.packet_buffer = list(range(25))
|
|
||||||
ui_state.display_log = True
|
|
||||||
ui_state.current_window = 4
|
|
||||||
|
|
||||||
with mock.patch.object(rx_handler, "draw_packetlog_win") as draw_packetlog_win:
|
|
||||||
rx_handler.on_receive({"id": "new"}, interface=None)
|
|
||||||
|
|
||||||
draw_packetlog_win.assert_called_once_with()
|
|
||||||
self.assertEqual(len(ui_state.packet_buffer), 20)
|
|
||||||
self.assertEqual(ui_state.packet_buffer[-1], {"id": "new"})
|
|
||||||
self.assertTrue(menu_state.need_redraw)
|
|
||||||
@@ -1,27 +0,0 @@
|
|||||||
import threading
|
|
||||||
|
|
||||||
import contact.ui.default_config as config
|
|
||||||
from contact.ui.ui_state import AppState, ChatUIState, InterfaceState, MenuState
|
|
||||||
from contact.utilities.singleton import app_state, interface_state, menu_state, ui_state
|
|
||||||
|
|
||||||
|
|
||||||
def reset_singletons() -> None:
|
|
||||||
_reset_instance(ui_state, ChatUIState())
|
|
||||||
_reset_instance(interface_state, InterfaceState())
|
|
||||||
_reset_instance(menu_state, MenuState())
|
|
||||||
_reset_instance(app_state, AppState())
|
|
||||||
app_state.lock = threading.Lock()
|
|
||||||
|
|
||||||
|
|
||||||
def restore_config(saved: dict) -> None:
|
|
||||||
for key, value in saved.items():
|
|
||||||
setattr(config, key, value)
|
|
||||||
|
|
||||||
|
|
||||||
def snapshot_config(*keys: str) -> dict:
|
|
||||||
return {key: getattr(config, key) for key in keys}
|
|
||||||
|
|
||||||
|
|
||||||
def _reset_instance(target: object, replacement: object) -> None:
|
|
||||||
target.__dict__.clear()
|
|
||||||
target.__dict__.update(replacement.__dict__)
|
|
||||||
@@ -1,27 +0,0 @@
|
|||||||
import unittest
|
|
||||||
from unittest import mock
|
|
||||||
|
|
||||||
from contact.utilities.telemetry_beautifier import get_chunks, humanize_wind_direction
|
|
||||||
|
|
||||||
|
|
||||||
class TelemetryBeautifierTests(unittest.TestCase):
|
|
||||||
def test_humanize_wind_direction_handles_boundaries(self) -> None:
|
|
||||||
self.assertEqual(humanize_wind_direction(0), "N")
|
|
||||||
self.assertEqual(humanize_wind_direction(90), "E")
|
|
||||||
self.assertEqual(humanize_wind_direction(225), "SW")
|
|
||||||
self.assertIsNone(humanize_wind_direction(-1))
|
|
||||||
|
|
||||||
def test_get_chunks_formats_known_and_unknown_values(self) -> None:
|
|
||||||
rendered = get_chunks("uptime_seconds:7200\nwind_direction:90\nlatitude_i:123456789\nunknown:abc\n")
|
|
||||||
|
|
||||||
self.assertIn("🆙 2.0h", rendered)
|
|
||||||
self.assertIn("⮆ E", rendered)
|
|
||||||
self.assertIn("🌍 12.345679", rendered)
|
|
||||||
self.assertIn("unknown:abc", rendered)
|
|
||||||
|
|
||||||
def test_get_chunks_formats_time_values(self) -> None:
|
|
||||||
with mock.patch("contact.utilities.telemetry_beautifier.datetime.datetime") as mocked_datetime:
|
|
||||||
mocked_datetime.fromtimestamp.return_value.strftime.return_value = "01.01.1970 00:00"
|
|
||||||
rendered = get_chunks("time:0\n")
|
|
||||||
|
|
||||||
self.assertIn("🕔 01.01.1970 00:00", rendered)
|
|
||||||
@@ -1,107 +0,0 @@
|
|||||||
from types import SimpleNamespace
|
|
||||||
import unittest
|
|
||||||
from unittest import mock
|
|
||||||
|
|
||||||
from meshtastic import BROADCAST_NUM
|
|
||||||
|
|
||||||
import contact.ui.default_config as config
|
|
||||||
from contact.message_handlers import tx_handler
|
|
||||||
from contact.utilities.singleton import interface_state, ui_state
|
|
||||||
|
|
||||||
from tests.test_support import reset_singletons, restore_config, snapshot_config
|
|
||||||
|
|
||||||
|
|
||||||
class TxHandlerTests(unittest.TestCase):
|
|
||||||
def setUp(self) -> None:
|
|
||||||
reset_singletons()
|
|
||||||
tx_handler.ack_naks.clear()
|
|
||||||
self.saved_config = snapshot_config("sent_message_prefix", "ack_str", "ack_implicit_str", "nak_str", "ack_unknown_str")
|
|
||||||
|
|
||||||
def tearDown(self) -> None:
|
|
||||||
tx_handler.ack_naks.clear()
|
|
||||||
restore_config(self.saved_config)
|
|
||||||
reset_singletons()
|
|
||||||
|
|
||||||
def test_send_message_on_named_channel_tracks_ack_request(self) -> None:
|
|
||||||
interface = mock.Mock()
|
|
||||||
interface.sendText.return_value = SimpleNamespace(id="req-1")
|
|
||||||
interface_state.interface = interface
|
|
||||||
interface_state.myNodeNum = 111
|
|
||||||
ui_state.channel_list = ["Primary"]
|
|
||||||
ui_state.all_messages = {"Primary": []}
|
|
||||||
|
|
||||||
with mock.patch.object(tx_handler, "save_message_to_db", return_value=999) as save_message_to_db:
|
|
||||||
with mock.patch("contact.message_handlers.tx_handler.time.strftime", return_value="[00:00:00] "):
|
|
||||||
tx_handler.send_message("hello", channel=0)
|
|
||||||
|
|
||||||
interface.sendText.assert_called_once_with(
|
|
||||||
text="hello",
|
|
||||||
destinationId=BROADCAST_NUM,
|
|
||||||
wantAck=True,
|
|
||||||
wantResponse=False,
|
|
||||||
onResponse=tx_handler.onAckNak,
|
|
||||||
channelIndex=0,
|
|
||||||
)
|
|
||||||
save_message_to_db.assert_called_once_with("Primary", 111, "hello")
|
|
||||||
self.assertEqual(tx_handler.ack_naks["req-1"]["channel"], "Primary")
|
|
||||||
self.assertEqual(tx_handler.ack_naks["req-1"]["messageIndex"], 1)
|
|
||||||
self.assertEqual(tx_handler.ack_naks["req-1"]["timestamp"], 999)
|
|
||||||
self.assertEqual(ui_state.all_messages["Primary"][-1][1], "hello")
|
|
||||||
|
|
||||||
def test_send_message_to_direct_node_uses_node_as_destination(self) -> None:
|
|
||||||
interface = mock.Mock()
|
|
||||||
interface.sendText.return_value = SimpleNamespace(id="req-2")
|
|
||||||
interface_state.interface = interface
|
|
||||||
interface_state.myNodeNum = 111
|
|
||||||
ui_state.channel_list = [222]
|
|
||||||
ui_state.all_messages = {222: []}
|
|
||||||
|
|
||||||
with mock.patch.object(tx_handler, "save_message_to_db", return_value=123):
|
|
||||||
with mock.patch("contact.message_handlers.tx_handler.time.strftime", return_value="[00:00:00] "):
|
|
||||||
tx_handler.send_message("dm", channel=0)
|
|
||||||
|
|
||||||
interface.sendText.assert_called_once_with(
|
|
||||||
text="dm",
|
|
||||||
destinationId=222,
|
|
||||||
wantAck=True,
|
|
||||||
wantResponse=False,
|
|
||||||
onResponse=tx_handler.onAckNak,
|
|
||||||
channelIndex=0,
|
|
||||||
)
|
|
||||||
self.assertEqual(tx_handler.ack_naks["req-2"]["channel"], 222)
|
|
||||||
|
|
||||||
def test_on_ack_nak_updates_message_for_explicit_ack(self) -> None:
|
|
||||||
interface_state.myNodeNum = 111
|
|
||||||
ui_state.channel_list = ["Primary"]
|
|
||||||
ui_state.selected_channel = 0
|
|
||||||
ui_state.all_messages = {"Primary": [("pending", "hello")]}
|
|
||||||
tx_handler.ack_naks["req"] = {"channel": "Primary", "messageIndex": 0, "timestamp": 55}
|
|
||||||
|
|
||||||
packet = {"from": 222, "decoded": {"requestId": "req", "routing": {"errorReason": "NONE"}}}
|
|
||||||
|
|
||||||
with mock.patch.object(tx_handler, "update_ack_nak") as update_ack_nak:
|
|
||||||
with mock.patch("contact.message_handlers.tx_handler.time.strftime", return_value="[01:02:03] "):
|
|
||||||
with mock.patch("contact.ui.contact_ui.draw_messages_window") as draw_messages_window:
|
|
||||||
tx_handler.onAckNak(packet)
|
|
||||||
|
|
||||||
update_ack_nak.assert_called_once_with("Primary", 55, "hello", "Ack")
|
|
||||||
draw_messages_window.assert_called_once_with()
|
|
||||||
self.assertIn(config.sent_message_prefix, ui_state.all_messages["Primary"][0][0])
|
|
||||||
self.assertIn(config.ack_str, ui_state.all_messages["Primary"][0][0])
|
|
||||||
|
|
||||||
def test_on_ack_nak_uses_implicit_marker_for_self_ack(self) -> None:
|
|
||||||
interface_state.myNodeNum = 111
|
|
||||||
ui_state.channel_list = ["Primary"]
|
|
||||||
ui_state.selected_channel = 0
|
|
||||||
ui_state.all_messages = {"Primary": [("pending", "hello")]}
|
|
||||||
tx_handler.ack_naks["req"] = {"channel": "Primary", "messageIndex": 0, "timestamp": 55}
|
|
||||||
|
|
||||||
packet = {"from": 111, "decoded": {"requestId": "req", "routing": {"errorReason": "NONE"}}}
|
|
||||||
|
|
||||||
with mock.patch.object(tx_handler, "update_ack_nak") as update_ack_nak:
|
|
||||||
with mock.patch("contact.message_handlers.tx_handler.time.strftime", return_value="[01:02:03] "):
|
|
||||||
with mock.patch("contact.ui.contact_ui.draw_messages_window"):
|
|
||||||
tx_handler.onAckNak(packet)
|
|
||||||
|
|
||||||
update_ack_nak.assert_called_once_with("Primary", 55, "hello", "Implicit")
|
|
||||||
self.assertIn(config.ack_implicit_str, ui_state.all_messages["Primary"][0][0])
|
|
||||||
@@ -1,71 +0,0 @@
|
|||||||
import unittest
|
|
||||||
from unittest import mock
|
|
||||||
|
|
||||||
import contact.ui.default_config as config
|
|
||||||
from contact.utilities.demo_data import DEMO_LOCAL_NODE_NUM, build_demo_interface
|
|
||||||
from contact.utilities.singleton import interface_state, ui_state
|
|
||||||
from contact.utilities.utils import add_new_message, get_channels, get_node_list, parse_protobuf
|
|
||||||
|
|
||||||
from tests.test_support import reset_singletons, restore_config, snapshot_config
|
|
||||||
|
|
||||||
|
|
||||||
class UtilsTests(unittest.TestCase):
|
|
||||||
def setUp(self) -> None:
|
|
||||||
reset_singletons()
|
|
||||||
self.saved_config = snapshot_config("node_sort")
|
|
||||||
|
|
||||||
def tearDown(self) -> None:
|
|
||||||
restore_config(self.saved_config)
|
|
||||||
reset_singletons()
|
|
||||||
|
|
||||||
def test_get_node_list_keeps_local_first_and_ignored_last(self) -> None:
|
|
||||||
config.node_sort = "lastHeard"
|
|
||||||
interface = build_demo_interface()
|
|
||||||
interface_state.interface = interface
|
|
||||||
interface_state.myNodeNum = DEMO_LOCAL_NODE_NUM
|
|
||||||
|
|
||||||
node_list = get_node_list()
|
|
||||||
|
|
||||||
self.assertEqual(node_list[0], DEMO_LOCAL_NODE_NUM)
|
|
||||||
self.assertEqual(node_list[-1], 0xA1000008)
|
|
||||||
|
|
||||||
def test_add_new_message_groups_messages_by_hour(self) -> None:
|
|
||||||
ui_state.all_messages = {"MediumFast": []}
|
|
||||||
|
|
||||||
with mock.patch("contact.utilities.utils.time.time", side_effect=[1000, 1000]):
|
|
||||||
with mock.patch("contact.utilities.utils.time.strftime", return_value="[00:16:40] "):
|
|
||||||
with mock.patch("contact.utilities.utils.datetime.datetime") as mocked_datetime:
|
|
||||||
mocked_datetime.fromtimestamp.return_value.strftime.return_value = "2025-02-04 17:00"
|
|
||||||
add_new_message("MediumFast", ">> Test: ", "First")
|
|
||||||
add_new_message("MediumFast", ">> Test: ", "Second")
|
|
||||||
|
|
||||||
self.assertEqual(
|
|
||||||
ui_state.all_messages["MediumFast"],
|
|
||||||
[
|
|
||||||
("-- 2025-02-04 17:00 --", ""),
|
|
||||||
("[00:16:40] >> Test: ", "First"),
|
|
||||||
("[00:16:40] >> Test: ", "Second"),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_get_channels_populates_message_buckets_for_device_channels(self) -> None:
|
|
||||||
interface_state.interface = build_demo_interface()
|
|
||||||
ui_state.channel_list = []
|
|
||||||
ui_state.all_messages = {}
|
|
||||||
|
|
||||||
channels = get_channels()
|
|
||||||
|
|
||||||
self.assertIn("MediumFast", channels)
|
|
||||||
self.assertIn("Another Channel", channels)
|
|
||||||
self.assertIn("MediumFast", ui_state.all_messages)
|
|
||||||
self.assertIn("Another Channel", ui_state.all_messages)
|
|
||||||
|
|
||||||
def test_parse_protobuf_returns_string_payload_unchanged(self) -> None:
|
|
||||||
packet = {"decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": "hello"}}
|
|
||||||
|
|
||||||
self.assertEqual(parse_protobuf(packet), "hello")
|
|
||||||
|
|
||||||
def test_parse_protobuf_returns_placeholder_for_text_messages(self) -> None:
|
|
||||||
packet = {"decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": b"hello"}}
|
|
||||||
|
|
||||||
self.assertEqual(parse_protobuf(packet), "✉️")
|
|
||||||
@@ -1,14 +0,0 @@
|
|||||||
import unittest
|
|
||||||
|
|
||||||
from contact.utilities.validation_rules import get_validation_for
|
|
||||||
|
|
||||||
|
|
||||||
class ValidationRulesTests(unittest.TestCase):
|
|
||||||
def test_get_validation_for_matches_exact_keys(self) -> None:
|
|
||||||
self.assertEqual(get_validation_for("shortName"), {"max_length": 4})
|
|
||||||
|
|
||||||
def test_get_validation_for_matches_substrings(self) -> None:
|
|
||||||
self.assertEqual(get_validation_for("config.position.latitude"), {"min_value": -90, "max_value": 90})
|
|
||||||
|
|
||||||
def test_get_validation_for_returns_empty_dict_for_unknown_key(self) -> None:
|
|
||||||
self.assertEqual(get_validation_for("totally_unknown"), {})
|
|
||||||
Reference in New Issue
Block a user