mirror of
https://github.com/pdxlocations/contact.git
synced 2026-03-28 17:12:35 +01:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c7b54caf45 | ||
|
|
773f43edd8 | ||
|
|
6af1c46bd3 | ||
|
|
7e3e44df24 | ||
|
|
45626f5e83 | ||
|
|
e9181972b2 | ||
|
|
795ab84ef5 |
@@ -15,7 +15,6 @@ import curses
|
|||||||
import io
|
import io
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import queue
|
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
@@ -55,8 +54,6 @@ logging.basicConfig(
|
|||||||
)
|
)
|
||||||
|
|
||||||
app_state.lock = threading.Lock()
|
app_state.lock = threading.Lock()
|
||||||
app_state.rx_queue = queue.SimpleQueue()
|
|
||||||
app_state.ui_shutdown = False
|
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
@@ -152,10 +149,11 @@ def start() -> None:
|
|||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
app_state.ui_shutdown = False
|
|
||||||
curses.wrapper(main)
|
curses.wrapper(main)
|
||||||
|
interface_state.interface.close()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
logging.info("User exited with Ctrl+C")
|
logging.info("User exited with Ctrl+C")
|
||||||
|
interface_state.interface.close()
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.critical("Fatal error", exc_info=True)
|
logging.critical("Fatal error", exc_info=True)
|
||||||
@@ -166,13 +164,6 @@ def start() -> None:
|
|||||||
print("Fatal error:", e)
|
print("Fatal error:", e)
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
finally:
|
|
||||||
app_state.ui_shutdown = True
|
|
||||||
try:
|
|
||||||
if interface_state.interface is not None:
|
|
||||||
interface_state.interface.close()
|
|
||||||
except Exception:
|
|
||||||
logging.exception("Error while closing interface")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -2,36 +2,30 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
import shutil
|
import shutil
|
||||||
|
import time
|
||||||
import subprocess
|
import subprocess
|
||||||
import threading
|
import threading
|
||||||
import time
|
from typing import Any, Dict, Optional
|
||||||
from typing import Any, Dict
|
# Debounce notification sounds so a burst of queued messages only plays once.
|
||||||
|
|
||||||
import contact.ui.default_config as config
|
|
||||||
from contact.utilities.db_handler import (
|
|
||||||
get_name_from_database,
|
|
||||||
maybe_store_nodeinfo_in_db,
|
|
||||||
save_message_to_db,
|
|
||||||
update_node_info_in_db,
|
|
||||||
)
|
|
||||||
from contact.utilities.singleton import app_state, interface_state, menu_state, ui_state
|
|
||||||
from contact.utilities.utils import add_new_message, refresh_node_list
|
|
||||||
|
|
||||||
# Debounce notification sounds so a burst of queued messages only plays once.
|
|
||||||
_SOUND_DEBOUNCE_SECONDS = 0.8
|
_SOUND_DEBOUNCE_SECONDS = 0.8
|
||||||
_sound_timer: threading.Timer | None = None
|
_sound_timer: Optional[threading.Timer] = None
|
||||||
_sound_timer_lock = threading.Lock()
|
_sound_timer_lock = threading.Lock()
|
||||||
_last_sound_request = 0.0
|
_last_sound_request = 0.0
|
||||||
|
|
||||||
|
|
||||||
def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
|
def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
|
||||||
"""Schedule a notification sound after a short quiet period."""
|
"""Schedule a notification sound after a short quiet period.
|
||||||
|
|
||||||
|
If more messages arrive before the delay elapses, the timer is reset.
|
||||||
|
This prevents playing a sound for each message when a backlog flushes.
|
||||||
|
"""
|
||||||
global _sound_timer, _last_sound_request
|
global _sound_timer, _last_sound_request
|
||||||
|
|
||||||
now = time.monotonic()
|
now = time.monotonic()
|
||||||
with _sound_timer_lock:
|
with _sound_timer_lock:
|
||||||
_last_sound_request = now
|
_last_sound_request = now
|
||||||
|
|
||||||
|
# Cancel any previously scheduled sound.
|
||||||
if _sound_timer is not None:
|
if _sound_timer is not None:
|
||||||
try:
|
try:
|
||||||
_sound_timer.cancel()
|
_sound_timer.cancel()
|
||||||
@@ -40,6 +34,7 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
|
|||||||
_sound_timer = None
|
_sound_timer = None
|
||||||
|
|
||||||
def _fire(expected_request_time: float) -> None:
|
def _fire(expected_request_time: float) -> None:
|
||||||
|
# Only play if nothing newer has been scheduled.
|
||||||
with _sound_timer_lock:
|
with _sound_timer_lock:
|
||||||
if expected_request_time != _last_sound_request:
|
if expected_request_time != _last_sound_request:
|
||||||
return
|
return
|
||||||
@@ -48,20 +43,42 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
|
|||||||
_sound_timer = threading.Timer(delay, _fire, args=(now,))
|
_sound_timer = threading.Timer(delay, _fire, args=(now,))
|
||||||
_sound_timer.daemon = True
|
_sound_timer.daemon = True
|
||||||
_sound_timer.start()
|
_sound_timer.start()
|
||||||
|
from contact.utilities.utils import (
|
||||||
|
refresh_node_list,
|
||||||
|
add_new_message,
|
||||||
|
)
|
||||||
|
from contact.ui.contact_ui import (
|
||||||
|
draw_packetlog_win,
|
||||||
|
draw_node_list,
|
||||||
|
draw_messages_window,
|
||||||
|
draw_channel_list,
|
||||||
|
add_notification,
|
||||||
|
)
|
||||||
|
from contact.utilities.db_handler import (
|
||||||
|
save_message_to_db,
|
||||||
|
maybe_store_nodeinfo_in_db,
|
||||||
|
get_name_from_database,
|
||||||
|
update_node_info_in_db,
|
||||||
|
)
|
||||||
|
import contact.ui.default_config as config
|
||||||
|
|
||||||
|
from contact.utilities.singleton import ui_state, interface_state, app_state, menu_state
|
||||||
|
|
||||||
|
|
||||||
def play_sound() -> None:
|
def play_sound():
|
||||||
try:
|
try:
|
||||||
system = platform.system()
|
system = platform.system()
|
||||||
sound_path = None
|
sound_path = None
|
||||||
executable = None
|
executable = None
|
||||||
|
|
||||||
if system == "Darwin":
|
if system == "Darwin": # macOS
|
||||||
sound_path = "/System/Library/Sounds/Ping.aiff"
|
sound_path = "/System/Library/Sounds/Ping.aiff"
|
||||||
executable = "afplay"
|
executable = "afplay"
|
||||||
|
|
||||||
elif system == "Linux":
|
elif system == "Linux":
|
||||||
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
|
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
|
||||||
wav_path = "/usr/share/sounds/alsa/Front_Center.wav"
|
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
|
||||||
|
|
||||||
if shutil.which("paplay") and os.path.exists(ogg_path):
|
if shutil.which("paplay") and os.path.exists(ogg_path):
|
||||||
executable = "paplay"
|
executable = "paplay"
|
||||||
sound_path = ogg_path
|
sound_path = ogg_path
|
||||||
@@ -78,127 +95,102 @@ def play_sound() -> None:
|
|||||||
cmd = [executable, sound_path]
|
cmd = [executable, sound_path]
|
||||||
if executable == "ffplay":
|
if executable == "ffplay":
|
||||||
cmd = [executable, "-nodisp", "-autoexit", sound_path]
|
cmd = [executable, "-nodisp", "-autoexit", sound_path]
|
||||||
|
|
||||||
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||||
except subprocess.CalledProcessError as exc:
|
return
|
||||||
logging.error("Sound playback failed: %s", exc)
|
|
||||||
except Exception as exc:
|
|
||||||
logging.error("Unexpected error while playing sound: %s", exc)
|
|
||||||
|
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
def _decode_message_payload(payload: Any) -> str:
|
logging.error(f"Sound playback failed: {e}")
|
||||||
if isinstance(payload, bytes):
|
except Exception as e:
|
||||||
return payload.decode("utf-8", errors="replace")
|
logging.error(f"Unexpected error: {e}")
|
||||||
if isinstance(payload, str):
|
|
||||||
return payload
|
|
||||||
return str(payload)
|
|
||||||
|
|
||||||
|
|
||||||
def process_receive_event(packet: Dict[str, Any]) -> None:
|
|
||||||
"""Process a queued packet on the UI thread and perform all UI updates."""
|
|
||||||
# Local import prevents module-level circular import.
|
|
||||||
from contact.ui.contact_ui import (
|
|
||||||
add_notification,
|
|
||||||
draw_channel_list,
|
|
||||||
draw_messages_window,
|
|
||||||
draw_node_list,
|
|
||||||
draw_packetlog_win,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Update packet log
|
|
||||||
ui_state.packet_buffer.append(packet)
|
|
||||||
if len(ui_state.packet_buffer) > 20:
|
|
||||||
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
|
|
||||||
|
|
||||||
if ui_state.display_log:
|
|
||||||
draw_packetlog_win()
|
|
||||||
if ui_state.current_window == 4:
|
|
||||||
menu_state.need_redraw = True
|
|
||||||
|
|
||||||
decoded = packet.get("decoded")
|
|
||||||
if not isinstance(decoded, dict):
|
|
||||||
return
|
|
||||||
|
|
||||||
changed = refresh_node_list()
|
|
||||||
if changed:
|
|
||||||
draw_node_list()
|
|
||||||
|
|
||||||
portnum = decoded.get("portnum")
|
|
||||||
if portnum == "NODEINFO_APP":
|
|
||||||
user = decoded.get("user")
|
|
||||||
if isinstance(user, dict) and "longName" in user:
|
|
||||||
maybe_store_nodeinfo_in_db(packet)
|
|
||||||
return
|
|
||||||
|
|
||||||
if portnum != "TEXT_MESSAGE_APP":
|
|
||||||
return
|
|
||||||
|
|
||||||
hop_start = packet.get("hopStart", 0)
|
|
||||||
hop_limit = packet.get("hopLimit", 0)
|
|
||||||
hops = hop_start - hop_limit
|
|
||||||
|
|
||||||
if config.notification_sound == "True":
|
|
||||||
schedule_notification_sound()
|
|
||||||
|
|
||||||
message_string = _decode_message_payload(decoded.get("payload"))
|
|
||||||
|
|
||||||
if not ui_state.channel_list:
|
|
||||||
return
|
|
||||||
|
|
||||||
refresh_channels = False
|
|
||||||
refresh_messages = False
|
|
||||||
|
|
||||||
channel_number = packet.get("channel", 0)
|
|
||||||
if not isinstance(channel_number, int):
|
|
||||||
channel_number = 0
|
|
||||||
if channel_number < 0:
|
|
||||||
channel_number = 0
|
|
||||||
|
|
||||||
packet_from = packet.get("from")
|
|
||||||
if packet.get("to") == interface_state.myNodeNum and packet_from is not None:
|
|
||||||
if packet_from not in ui_state.channel_list:
|
|
||||||
ui_state.channel_list.append(packet_from)
|
|
||||||
if packet_from not in ui_state.all_messages:
|
|
||||||
ui_state.all_messages[packet_from] = []
|
|
||||||
update_node_info_in_db(packet_from, chat_archived=False)
|
|
||||||
refresh_channels = True
|
|
||||||
channel_number = ui_state.channel_list.index(packet_from)
|
|
||||||
|
|
||||||
if channel_number >= len(ui_state.channel_list):
|
|
||||||
channel_number = 0
|
|
||||||
|
|
||||||
channel_id = ui_state.channel_list[channel_number]
|
|
||||||
|
|
||||||
if ui_state.selected_channel >= len(ui_state.channel_list):
|
|
||||||
ui_state.selected_channel = 0
|
|
||||||
|
|
||||||
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
|
|
||||||
add_notification(channel_number)
|
|
||||||
refresh_channels = True
|
|
||||||
else:
|
|
||||||
refresh_messages = True
|
|
||||||
|
|
||||||
if packet_from is None:
|
|
||||||
logging.debug("Skipping TEXT_MESSAGE_APP packet with missing 'from' field")
|
|
||||||
return
|
|
||||||
|
|
||||||
message_from_string = get_name_from_database(packet_from, type="short") + ":"
|
|
||||||
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
|
|
||||||
|
|
||||||
if refresh_channels:
|
|
||||||
draw_channel_list()
|
|
||||||
if refresh_messages:
|
|
||||||
draw_messages_window(True)
|
|
||||||
|
|
||||||
save_message_to_db(channel_id, packet_from, message_string)
|
|
||||||
|
|
||||||
|
|
||||||
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||||
"""Enqueue packet to be processed on the main curses thread."""
|
"""
|
||||||
if app_state.ui_shutdown:
|
Handles an incoming packet from a Meshtastic interface.
|
||||||
return
|
|
||||||
if not isinstance(packet, dict):
|
Args:
|
||||||
return
|
packet: The received Meshtastic packet as a dictionary.
|
||||||
try:
|
interface: The Meshtastic interface instance that received the packet.
|
||||||
app_state.rx_queue.put(packet)
|
"""
|
||||||
except Exception:
|
with app_state.lock:
|
||||||
logging.exception("Failed to enqueue packet for UI processing")
|
# Update packet log
|
||||||
|
ui_state.packet_buffer.append(packet)
|
||||||
|
if len(ui_state.packet_buffer) > 20:
|
||||||
|
# Trim buffer to 20 packets
|
||||||
|
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
|
||||||
|
|
||||||
|
if ui_state.display_log:
|
||||||
|
draw_packetlog_win()
|
||||||
|
|
||||||
|
if ui_state.current_window == 4:
|
||||||
|
menu_state.need_redraw = True
|
||||||
|
try:
|
||||||
|
if "decoded" not in packet:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Assume any incoming packet could update the last seen time for a node
|
||||||
|
changed = refresh_node_list()
|
||||||
|
if changed:
|
||||||
|
draw_node_list()
|
||||||
|
|
||||||
|
if packet["decoded"]["portnum"] == "NODEINFO_APP":
|
||||||
|
if "user" in packet["decoded"] and "longName" in packet["decoded"]["user"]:
|
||||||
|
maybe_store_nodeinfo_in_db(packet)
|
||||||
|
|
||||||
|
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
|
||||||
|
hop_start = packet.get('hopStart', 0)
|
||||||
|
hop_limit = packet.get('hopLimit', 0)
|
||||||
|
|
||||||
|
hops = hop_start - hop_limit
|
||||||
|
|
||||||
|
|
||||||
|
if config.notification_sound == "True":
|
||||||
|
schedule_notification_sound()
|
||||||
|
|
||||||
|
message_bytes = packet["decoded"]["payload"]
|
||||||
|
message_string = message_bytes.decode("utf-8")
|
||||||
|
|
||||||
|
refresh_channels = False
|
||||||
|
refresh_messages = False
|
||||||
|
|
||||||
|
if packet.get("channel"):
|
||||||
|
channel_number = packet["channel"]
|
||||||
|
else:
|
||||||
|
channel_number = 0
|
||||||
|
|
||||||
|
if packet["to"] == interface_state.myNodeNum:
|
||||||
|
if packet["from"] in ui_state.channel_list:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
ui_state.channel_list.append(packet["from"])
|
||||||
|
if packet["from"] not in ui_state.all_messages:
|
||||||
|
ui_state.all_messages[packet["from"]] = []
|
||||||
|
update_node_info_in_db(packet["from"], chat_archived=False)
|
||||||
|
refresh_channels = True
|
||||||
|
|
||||||
|
channel_number = ui_state.channel_list.index(packet["from"])
|
||||||
|
|
||||||
|
channel_id = ui_state.channel_list[channel_number]
|
||||||
|
|
||||||
|
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
|
||||||
|
add_notification(channel_number)
|
||||||
|
refresh_channels = True
|
||||||
|
else:
|
||||||
|
refresh_messages = True
|
||||||
|
|
||||||
|
# Add received message to the messages list
|
||||||
|
message_from_id = packet["from"]
|
||||||
|
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
|
||||||
|
|
||||||
|
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
|
||||||
|
|
||||||
|
if refresh_channels:
|
||||||
|
draw_channel_list()
|
||||||
|
if refresh_messages:
|
||||||
|
draw_messages_window(True)
|
||||||
|
|
||||||
|
save_message_to_db(channel_id, message_from_id, message_string)
|
||||||
|
|
||||||
|
except KeyError as e:
|
||||||
|
logging.error(f"Error processing packet: {e}")
|
||||||
|
|||||||
@@ -1,11 +1,9 @@
|
|||||||
import curses
|
import curses
|
||||||
import logging
|
import logging
|
||||||
from queue import Empty
|
|
||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
from typing import Union
|
from typing import Union
|
||||||
|
|
||||||
from contact.message_handlers.rx_handler import process_receive_event
|
|
||||||
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
|
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
|
||||||
from contact.settings import settings_menu
|
from contact.settings import settings_menu
|
||||||
from contact.message_handlers.tx_handler import send_message, send_traceroute
|
from contact.message_handlers.tx_handler import send_message, send_traceroute
|
||||||
@@ -17,10 +15,11 @@ from contact.utilities.i18n import t
|
|||||||
import contact.ui.default_config as config
|
import contact.ui.default_config as config
|
||||||
import contact.ui.dialog
|
import contact.ui.dialog
|
||||||
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
|
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
|
||||||
from contact.utilities.singleton import ui_state, interface_state, menu_state, app_state
|
from contact.utilities.singleton import ui_state, interface_state, menu_state
|
||||||
|
|
||||||
|
|
||||||
MIN_COL = 1 # "effectively zero" without breaking curses
|
MIN_COL = 1 # "effectively zero" without breaking curses
|
||||||
|
RESIZE_DEBOUNCE_MS = 250
|
||||||
root_win = None
|
root_win = None
|
||||||
|
|
||||||
|
|
||||||
@@ -163,22 +162,22 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def drain_receive_queue(max_events: int = 200) -> None:
|
def drain_resize_events(input_win: curses.window) -> Union[str, int, None]:
|
||||||
processed = 0
|
"""Wait for resize events to settle and preserve one queued non-resize key."""
|
||||||
while processed < max_events:
|
input_win.timeout(RESIZE_DEBOUNCE_MS)
|
||||||
try:
|
try:
|
||||||
packet = app_state.rx_queue.get(block=False)
|
while True:
|
||||||
except Empty:
|
try:
|
||||||
return
|
next_char = input_win.get_wch()
|
||||||
except Exception:
|
except curses.error:
|
||||||
logging.exception("Error while draining receive queue")
|
return None
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
if next_char == curses.KEY_RESIZE:
|
||||||
process_receive_event(packet)
|
continue
|
||||||
except Exception:
|
|
||||||
logging.exception("Error while processing receive event")
|
return next_char
|
||||||
processed += 1
|
finally:
|
||||||
|
input_win.timeout(-1)
|
||||||
|
|
||||||
|
|
||||||
def main_ui(stdscr: curses.window) -> None:
|
def main_ui(stdscr: curses.window) -> None:
|
||||||
@@ -188,20 +187,20 @@ def main_ui(stdscr: curses.window) -> None:
|
|||||||
|
|
||||||
root_win = stdscr
|
root_win = stdscr
|
||||||
input_text = ""
|
input_text = ""
|
||||||
|
queued_char = None
|
||||||
stdscr.keypad(True)
|
stdscr.keypad(True)
|
||||||
get_channels()
|
get_channels()
|
||||||
handle_resize(stdscr, True)
|
handle_resize(stdscr, True)
|
||||||
entry_win.timeout(75)
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
drain_receive_queue()
|
|
||||||
draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
|
draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
|
||||||
|
|
||||||
# Get user input from entry window
|
# Get user input from entry window
|
||||||
try:
|
if queued_char is None:
|
||||||
char = entry_win.get_wch()
|
char = entry_win.get_wch()
|
||||||
except curses.error:
|
else:
|
||||||
continue
|
char = queued_char
|
||||||
|
queued_char = None
|
||||||
|
|
||||||
# draw_debug(f"Keypress: {char}")
|
# draw_debug(f"Keypress: {char}")
|
||||||
|
|
||||||
@@ -249,8 +248,9 @@ def main_ui(stdscr: curses.window) -> None:
|
|||||||
|
|
||||||
elif char == curses.KEY_RESIZE:
|
elif char == curses.KEY_RESIZE:
|
||||||
input_text = ""
|
input_text = ""
|
||||||
|
queued_char = drain_resize_events(entry_win)
|
||||||
handle_resize(stdscr, False)
|
handle_resize(stdscr, False)
|
||||||
entry_win.timeout(75)
|
continue
|
||||||
|
|
||||||
elif char == chr(4): # Ctrl + D to delete current channel or node
|
elif char == chr(4): # Ctrl + D to delete current channel or node
|
||||||
handle_ctrl_d()
|
handle_ctrl_d()
|
||||||
|
|||||||
@@ -56,6 +56,13 @@ config_folder = os.path.abspath(config.node_configs_file_path)
|
|||||||
# Load translations
|
# Load translations
|
||||||
field_mapping, help_text = parse_ini_file(translation_file)
|
field_mapping, help_text = parse_ini_file(translation_file)
|
||||||
|
|
||||||
|
def _is_repeated_field(field_desc) -> bool:
|
||||||
|
"""Return True if the protobuf field is repeated.
|
||||||
|
Protobuf 6.31.0 and later use an is_repeated property, while older versions compare against the label field.
|
||||||
|
"""
|
||||||
|
if hasattr(field_desc, "is_repeated"):
|
||||||
|
return bool(field_desc.is_repeated)
|
||||||
|
return field_desc.label == field_desc.LABEL_REPEATED
|
||||||
|
|
||||||
def reload_translations() -> None:
|
def reload_translations() -> None:
|
||||||
global translation_file, field_mapping, help_text
|
global translation_file, field_mapping, help_text
|
||||||
@@ -564,7 +571,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
|||||||
new_value = new_value == "True" or new_value is True
|
new_value = new_value == "True" or new_value is True
|
||||||
menu_state.start_index.pop()
|
menu_state.start_index.pop()
|
||||||
|
|
||||||
elif field.label == field.LABEL_REPEATED: # Handle repeated field - Not currently used
|
elif _is_repeated_field(field): # Handle repeated field - Not currently used
|
||||||
new_value = get_repeated_input(current_value)
|
new_value = get_repeated_input(current_value)
|
||||||
new_value = current_value if new_value is None else new_value.split(", ")
|
new_value = current_value if new_value is None else new_value.split(", ")
|
||||||
menu_state.start_index.pop()
|
menu_state.start_index.pop()
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
from queue import SimpleQueue
|
|
||||||
from typing import Any, Union, List, Dict
|
from typing import Any, Union, List, Dict
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
@@ -45,5 +44,3 @@ class InterfaceState:
|
|||||||
@dataclass
|
@dataclass
|
||||||
class AppState:
|
class AppState:
|
||||||
lock: Any = None
|
lock: Any = None
|
||||||
rx_queue: SimpleQueue = field(default_factory=SimpleQueue)
|
|
||||||
ui_shutdown: bool = False
|
|
||||||
|
|||||||
@@ -9,6 +9,17 @@ from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
|
|||||||
# defs are from meshtastic/python/main
|
# defs are from meshtastic/python/main
|
||||||
|
|
||||||
|
|
||||||
|
def _is_repeated_field(field_desc) -> bool:
|
||||||
|
"""Return True if the protobuf field is repeated.
|
||||||
|
|
||||||
|
Protobuf 6.31.0+ exposes `is_repeated`, while older versions require
|
||||||
|
checking `label == LABEL_REPEATED`.
|
||||||
|
"""
|
||||||
|
if hasattr(field_desc, "is_repeated"):
|
||||||
|
return bool(field_desc.is_repeated)
|
||||||
|
return field_desc.label == field_desc.LABEL_REPEATED
|
||||||
|
|
||||||
|
|
||||||
def traverseConfig(config_root, config, interface_config) -> bool:
|
def traverseConfig(config_root, config, interface_config) -> bool:
|
||||||
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
|
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
|
||||||
snake_name = camel_to_snake(config_root)
|
snake_name = camel_to_snake(config_root)
|
||||||
@@ -89,7 +100,7 @@ def setPref(config, comp_name, raw_val) -> bool:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
# repeating fields need to be handled with append, not setattr
|
# repeating fields need to be handled with append, not setattr
|
||||||
if pref.label != pref.LABEL_REPEATED:
|
if not _is_repeated_field(pref):
|
||||||
try:
|
try:
|
||||||
if config_type.message_type is not None:
|
if config_type.message_type is not None:
|
||||||
config_values = getattr(config_part, config_type.name)
|
config_values = getattr(config_part, config_type.name)
|
||||||
|
|||||||
@@ -68,19 +68,19 @@ def get_chunks(data):
|
|||||||
# Leave it string as last resort
|
# Leave it string as last resort
|
||||||
value = value
|
value = value
|
||||||
|
|
||||||
match key:
|
# Python 3.9-compatible alternative to match/case.
|
||||||
|
if key == "uptime_seconds":
|
||||||
# convert seconds to hours, for our sanity
|
# convert seconds to hours, for our sanity
|
||||||
case "uptime_seconds":
|
value = round(value / 60 / 60, 1)
|
||||||
value = round(value / 60 / 60, 1)
|
elif key in ("longitude_i", "latitude_i"):
|
||||||
# Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry
|
# Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry
|
||||||
# truncate to 6th digit after floating point, which would be still accurate
|
# truncate to 6th digit after floating point, which would be still accurate
|
||||||
case "longitude_i" | "latitude_i":
|
value = round(value * 1e-7, 6)
|
||||||
value = round(value * 1e-7, 6)
|
elif key == "wind_direction":
|
||||||
# Convert wind direction from degrees to abbreviation
|
# Convert wind direction from degrees to abbreviation
|
||||||
case "wind_direction":
|
value = humanize_wind_direction(value)
|
||||||
value = humanize_wind_direction(value)
|
elif key == "time":
|
||||||
case "time":
|
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
|
||||||
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
|
|
||||||
|
|
||||||
if key in sensors:
|
if key in sensors:
|
||||||
parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} "
|
parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} "
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[project]
|
[project]
|
||||||
name = "contact"
|
name = "contact"
|
||||||
version = "1.4.14"
|
version = "1.4.18"
|
||||||
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
|
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
|
||||||
authors = [
|
authors = [
|
||||||
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
|
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
|
||||||
|
|||||||
Reference in New Issue
Block a user