1
0
forked from iarv/contact

Compare commits

..

22 Commits

Author SHA1 Message Date
pdxlocations
056db12911 maybe fix aplay 2025-07-16 18:46:51 -07:00
pdxlocations
685a2d4bf8 bump version 2025-07-14 08:15:16 -07:00
pdxlocations
6ed0cc8c9f Merge pull request #199 from rfschmid/add-traceroute-sent-message-to-history
Add traceroute sent message to history
2025-07-03 10:46:07 -07:00
Russell Schmidt
fc208a9258 Add "Traceroute Sent" to message history 2025-07-03 12:43:18 -05:00
Russell Schmidt
eaf9381bca Refactor message saving
Add common function for saving a message to history, removing some
duplicate code and making traceroutes add timestamps like other messages
do.
2025-07-03 12:43:18 -05:00
pdxlocations
367af9044c Merge pull request #198 from rfschmid/add-node-name-to-traceroute-confirm-dialog
Add node name to traceroute confirm dialog
2025-07-03 10:32:41 -07:00
Russell Schmidt
d8183d9009 Make capitalization consistent 2025-07-03 12:19:24 -05:00
Russell Schmidt
3fb1335be3 Add node name to traceroute confirm dialog 2025-07-03 12:10:56 -05:00
pdxlocations
8b05072786 bump version 2025-06-13 15:33:57 -07:00
pdxlocations
4455781e6c fix types and returns 2025-06-12 16:38:05 -07:00
pdxlocations
0c8aaee415 Merge pull request #197 from rfschmid/redirect-sound-player-output-to-dev-null
Redirect sound player output to dev null
2025-06-12 16:10:26 -07:00
pdxlocations
b97d9f4649 Merge pull request #196 from rfschmid/only-clear-input-text-on-enter-if-sending-message
Only clear input on enter when sending message
2025-06-12 16:09:44 -07:00
Russell Schmidt
4152fb6a21 Redirect sound player output to dev null
On my linux system, the sound playing code goes to aplay. When called,
aplay outputs a message about the file it is playing to stderr, which
causes it to be printed on the input line, which can't be easily
cleared. Redirect output from the audio player executable to dev/null.
Deduplicate sound playing code a bit so we only need one call to
subprocess.run, so I don't have to make this change in three places.
2025-06-12 17:28:30 -05:00
Russell Schmidt
384e36dac2 Only clear input on enter when sending message
We should only clear the input field when the user presses enter if the
user actually sent the message. If selecting a different node to send
to, don't clear input.
2025-06-12 17:23:03 -05:00
pdxlocations
65bca84fe6 minor refactor 2025-06-10 23:24:11 -07:00
pdxlocations
16fa2830fd bump version 2025-06-10 22:29:31 -07:00
pdxlocations
c8f1da99e3 Merge pull request #194 from rfschmid:fix-crash-with-newlines
Fix crash with newlines, message spacing
2025-06-10 22:28:41 -07:00
Russell Schmidt
702250c329 Fix crash with newlines, message spacing 2025-06-10 17:42:14 -05:00
pdxlocations
6291082405 Merge pull request #192 from rfschmid/fix-wrapping-with-wide-chars
Fix crash when wrapping with wide characters
2025-06-10 12:19:02 -07:00
pdxlocations
4fa5148664 Merge pull request #193 from rfschmid/fix-backspace
Fix enter not clearing input
2025-06-10 11:57:06 -07:00
Russell Schmidt
d62ec09eea Fix enter not clearing input
Similar to 981d72e, pressing enter wasn't clearing the input field.
2025-06-10 12:19:03 -05:00
Russell Schmidt
61026dcc73 Fix crash when wrapping with wide characters
Update contact_ui.py to use already-existing custom wrap function
implemented in nav_utils instead of textwrap library. Update custom
wrap_text function to use east_asian_width to determine characters that
can use two columns of width.
2025-06-10 12:17:23 -05:00
7 changed files with 108 additions and 107 deletions

View File

@@ -53,22 +53,25 @@ logging.basicConfig(
app_state.lock = threading.Lock()
# ------------------------------------------------------------------------------
# Main Program Logic
# ------------------------------------------------------------------------------
def prompt_region_if_unset(args: object) -> None:
"""Prompt user to set region if it is unset."""
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
def initialize_globals(args) -> None:
def initialize_globals(args: object) -> None:
"""Initializes interface and shared globals."""
interface_state.interface = initialize_interface(args)
# Prompt for region if unset
if interface_state.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
prompt_region_if_unset(args)
interface_state.myNodeNum = get_nodeNum()
ui_state.channel_list = get_channels()

View File

@@ -3,11 +3,12 @@ import os
import platform
import shutil
import subprocess
import time
from datetime import datetime
from typing import Any, Dict
from contact.utilities.utils import refresh_node_list
from contact.utilities.utils import (
refresh_node_list,
add_new_message,
)
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
@@ -29,37 +30,42 @@ from contact.utilities.singleton import ui_state, interface_state, app_state
def play_sound():
try:
system = platform.system()
sound_path = None
executable = None
if system == "Darwin": # macOS
sound_path = "/System/Library/Sounds/Ping.aiff"
if os.path.exists(sound_path):
subprocess.run(["afplay", sound_path], check=True)
return
else:
logging.warning(f"macOS sound file not found: {sound_path}")
executable = "afplay"
elif system == "Linux":
sound_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
if os.path.exists(sound_path):
if shutil.which("paplay"):
subprocess.run(["paplay", sound_path], check=True)
return
elif shutil.which("aplay"):
subprocess.run(["aplay", sound_path], check=True)
return
else:
logging.warning("No sound player found (paplay/aplay)")
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay"
sound_path = ogg_path
elif shutil.which("ffplay") and os.path.exists(ogg_path):
executable = "ffplay"
sound_path = ogg_path
elif shutil.which("aplay") and os.path.exists(wav_path):
executable = "aplay"
sound_path = wav_path
else:
logging.warning(f"Linux sound file not found: {sound_path}")
logging.warning("No suitable sound player or sound file found on Linux")
if executable and sound_path:
cmd = [executable, sound_path]
if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
# Final fallback: terminal beep
print("\a")
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
"""
@@ -119,7 +125,9 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
channel_number = ui_state.channel_list.index(packet["from"])
if ui_state.channel_list[channel_number] != ui_state.channel_list[ui_state.selected_channel]:
channel_id = ui_state.channel_list[channel_number]
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
@@ -129,40 +137,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[ui_state.channel_list[channel_number]]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[ui_state.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string} ", message_string)
)
add_new_message(channel_id, f"{config.message_prefix} {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(ui_state.channel_list[channel_number], message_from_id, message_string)
save_message_to_db(channel_id, message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")

View File

@@ -1,4 +1,3 @@
from datetime import datetime
from typing import Any, Dict
import google.protobuf.json_format
@@ -16,6 +15,8 @@ import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
from contact.utilities.utils import add_new_message
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
@@ -146,8 +147,9 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
update_node_info_in_db(packet["from"], chat_archived=False)
channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
if channel_id == ui_state.channel_list[ui_state.selected_channel]:
refresh_messages = True
else:
add_notification(channel_number)
@@ -155,18 +157,14 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string}", msg_str)
)
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(ui_state.channel_list[channel_number], packet["from"], msg_str)
save_message_to_db(channel_id, packet["from"], msg_str)
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
"""
@@ -190,32 +188,7 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
channelIndex=send_on_channel,
)
# Add sent message to the messages dictionary
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Handle timestamp logic
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
ui_state.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
add_new_message(channel_id, config.sent_message_prefix + config.ack_unknown_str + ": ", message)
timestamp = save_message_to_db(channel_id, myid, message)
@@ -230,10 +203,14 @@ def send_traceroute() -> None:
"""
Sends a RouteDiscovery protobuf to the selected node.
"""
channel_id = ui_state.node_list[ui_state.selected_node]
add_new_message(channel_id, f"{config.message_prefix} Sent Traceroute", "")
r = mesh_pb2.RouteDiscovery()
interface_state.interface.sendData(
r,
destinationId=ui_state.node_list[ui_state.selected_node],
destinationId=channel_id,
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=on_response_traceroute,

View File

@@ -1,5 +1,4 @@
import curses
import textwrap
import logging
import traceback
from typing import Union
@@ -12,7 +11,7 @@ from contact.utilities.db_handler import get_name_from_database, update_node_inf
from contact.utilities.input_handlers import get_list_input
import contact.ui.default_config as config
import contact.ui.dialog
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
from contact.utilities.singleton import ui_state, interface_state
@@ -109,7 +108,7 @@ def main_ui(stdscr: curses.window) -> None:
handle_resize(stdscr, True)
while True:
draw_text_field(entry_win, f"Input: {input_text[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
draw_text_field(entry_win, f"Input: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window
char = entry_win.get_wch()
@@ -138,7 +137,7 @@ def main_ui(stdscr: curses.window) -> None:
handle_leftright(char)
elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)):
handle_enter(input_text)
input_text = handle_enter(input_text)
elif char == chr(20): # Ctrl + t for Traceroute
handle_ctrl_t(stdscr)
@@ -298,7 +297,7 @@ def handle_leftright(char: int) -> None:
refresh_pad(2)
def handle_enter(input_text: str) -> None:
def handle_enter(input_text: str) -> str:
"""Handle Enter key events to send messages or select channels."""
if ui_state.current_window == 2:
node_list = ui_state.node_list
@@ -318,6 +317,7 @@ def handle_enter(input_text: str) -> None:
draw_node_list()
draw_channel_list()
draw_messages_window(True)
return input_text
elif len(input_text) > 0:
# Enter key pressed, send user input as message
@@ -325,8 +325,9 @@ def handle_enter(input_text: str) -> None:
draw_messages_window(True)
# Clear entry window and reset input text
input_text = ""
entry_win.erase()
return ""
return input_text
def handle_ctrl_t(stdscr: curses.window) -> None:
@@ -335,14 +336,14 @@ def handle_ctrl_t(stdscr: curses.window) -> None:
curses.curs_set(0) # Hide cursor
contact.ui.dialog.dialog(
stdscr,
"Traceroute Sent",
f"Traceroute Sent To: {get_name_from_database(ui_state.node_list[ui_state.selected_node])}",
"Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.",
)
curses.curs_set(1) # Show cursor again
handle_resize(stdscr, False)
def handle_backspace(entry_win: curses.window, input_text: str) -> None:
def handle_backspace(entry_win: curses.window, input_text: str) -> str:
"""Handle backspace key events to remove the last character from input text."""
if input_text:
input_text = input_text[:-1]
@@ -550,7 +551,7 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
row = 0
for prefix, message in messages:
full_message = f"{prefix}{message}"
wrapped_lines = textwrap.wrap(full_message, messages_win.getmaxyx()[1] - 2)
wrapped_lines = wrap_text(full_message, messages_win.getmaxyx()[1] - 2)
msg_line_count += len(wrapped_lines)
messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1])

View File

@@ -1,5 +1,7 @@
import curses
import re
from unicodedata import east_asian_width
from contact.ui.colors import get_color
from contact.utilities.control_utils import transform_menu_path
from typing import Any, Optional, List, Dict
@@ -293,9 +295,16 @@ def get_wrapped_help_text(
return wrapped_help
def text_width(text: str) -> int:
return sum(2 if east_asian_width(c) in "FW" else 1 for c in text)
def wrap_text(text: str, wrap_width: int) -> List[str]:
"""Wraps text while preserving spaces and breaking long words."""
whitespace = '\t\n\x0b\x0c\r '
whitespace_trans = dict.fromkeys(map(ord, whitespace), ord(' '))
text = text.translate(whitespace_trans)
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
wrapped_lines = []
line_buffer = ""
@@ -304,11 +313,11 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
wrap_width -= margin
for word in words:
word_length = len(word)
word_length = text_width(word)
if word_length > wrap_width: # Break long words
if line_buffer:
wrapped_lines.append(line_buffer)
wrapped_lines.append(line_buffer.strip())
line_buffer = ""
line_length = 0
for i in range(0, word_length, wrap_width):
@@ -316,7 +325,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
continue
if line_length + word_length > wrap_width and word.strip():
wrapped_lines.append(line_buffer)
wrapped_lines.append(line_buffer.strip())
line_buffer = ""
line_length = 0
@@ -324,7 +333,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
line_length += word_length
if line_buffer:
wrapped_lines.append(line_buffer)
wrapped_lines.append(line_buffer.strip())
return wrapped_lines

View File

@@ -1,4 +1,5 @@
import datetime
import time
from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config
@@ -134,3 +135,31 @@ def get_time_ago(timestamp):
if unit != "s":
return f"{value} {unit} ago"
return "now"
def add_new_message(channel_id, prefix, message):
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
# Add the message
ui_state.all_messages[channel_id].append((prefix,message))

View File

@@ -1,6 +1,6 @@
[project]
name = "contact"
version = "1.3.12"
version = "1.3.15"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}