Compare commits

...

68 Commits

Author SHA1 Message Date
pdxlocations
640955656f fix no config redraw 2025-08-08 00:38:34 -07:00
pdxlocations
8f248f4b5b add confirmation to app settings 2025-08-08 00:29:11 -07:00
pdxlocations
c10905e954 don't exit dialog with left arrow 2025-08-07 23:58:47 -07:00
pdxlocations
d1b93263fa add confirmation box if settings not saved 2025-08-07 23:34:36 -07:00
pdxlocations
623708c2a1 update README.md 2025-08-07 22:30:18 -07:00
pdxlocations
9b8abdb344 fix admin key window name 2025-07-31 23:48:15 -07:00
pdxlocations
8c3e00b52b redraw other input types 2025-07-31 23:44:14 -07:00
pdxlocations
81ebd1b95f fix get_text_input 2025-07-31 00:55:19 -07:00
pdxlocations
ae75d85741 fix user settings inputs 2025-07-31 00:33:30 -07:00
pdxlocations
b6767f423e Fix settings redraw (#214)
* current window 4

* refresh settings on new message

* redraw dialog and fix traceroute

* formatting and catch

* move continue
2025-07-31 00:08:08 -07:00
pdxlocations
b1252fec6c Update README.md 2025-07-30 22:18:01 -07:00
pdxlocations
43d1152074 Update README.md 2025-07-30 22:14:46 -07:00
pdxlocations
786a7b03c5 Configure Filepath for Export Node Config (#213)
* add node config path to settings

* try reload config but failed
2025-07-29 16:51:26 -07:00
pdxlocations
8d111c5df7 Add Warning for Sending Messages Quickly (#212)
* Warn About 2-Second Message Delay

* add comment

* update lines and cols
2025-07-28 23:04:57 -07:00
pdxlocations
b314a24a0c Input Validation Framework (#211)
* init

* validation framework

* add rules

* automatic types

* changes

* fix positions

* redraw input

* check for selected_config

* tweaks

* refactor
2025-07-26 21:20:15 -07:00
pdxlocations
4378f3045c unused argument 2025-07-24 18:02:44 -07:00
pdxlocations
a451d1d7d6 note to future me 2025-07-21 23:18:11 -07:00
pdxlocations
fe1f027219 Merge pull request #209 from pdxlocations/check-db-fields-for-null
Check for NULLS in DB
2025-07-21 10:57:18 -07:00
pdxlocations
43435cbe04 replace \x00 in messages 2025-07-21 10:54:19 -07:00
pdxlocations
fe98075582 bump version 2025-07-21 00:04:00 -07:00
pdxlocations
8716ea6fe1 dont write to the log before config 2025-07-20 23:46:24 -07:00
pdxlocations
a8bdcbb7e6 Merge pull request #208 from pdxlocations/config
fallback to user if install dir not writable
2025-07-17 23:00:13 -07:00
pdxlocations
02742b27f3 fallback to user if install dir not writable 2025-07-17 22:59:35 -07:00
pdxlocations
ae028032a0 Merge pull request #207 from pdxlocations/errors
show connection errors in console
2025-07-17 00:12:12 -07:00
pdxlocations
30402f4906 show connection errors in console 2025-07-17 00:10:17 -07:00
pdxlocations
324e0b03e7 Merge pull request #206 from pdxlocations/notifications
maybe fix aplay
2025-07-16 19:43:37 -07:00
pdxlocations
056db12911 maybe fix aplay 2025-07-16 18:46:51 -07:00
pdxlocations
685a2d4bf8 bump version 2025-07-14 08:15:16 -07:00
pdxlocations
6ed0cc8c9f Merge pull request #199 from rfschmid/add-traceroute-sent-message-to-history
Add traceroute sent message to history
2025-07-03 10:46:07 -07:00
Russell Schmidt
fc208a9258 Add "Traceroute Sent" to message history 2025-07-03 12:43:18 -05:00
Russell Schmidt
eaf9381bca Refactor message saving
Add common function for saving a message to history, removing some
duplicate code and making traceroutes add timestamps like other messages
do.
2025-07-03 12:43:18 -05:00
pdxlocations
367af9044c Merge pull request #198 from rfschmid/add-node-name-to-traceroute-confirm-dialog
Add node name to traceroute confirm dialog
2025-07-03 10:32:41 -07:00
Russell Schmidt
d8183d9009 Make capitalization consistent 2025-07-03 12:19:24 -05:00
Russell Schmidt
3fb1335be3 Add node name to traceroute confirm dialog 2025-07-03 12:10:56 -05:00
pdxlocations
8b05072786 bump version 2025-06-13 15:33:57 -07:00
pdxlocations
4455781e6c fix types and returns 2025-06-12 16:38:05 -07:00
pdxlocations
0c8aaee415 Merge pull request #197 from rfschmid/redirect-sound-player-output-to-dev-null
Redirect sound player output to dev null
2025-06-12 16:10:26 -07:00
pdxlocations
b97d9f4649 Merge pull request #196 from rfschmid/only-clear-input-text-on-enter-if-sending-message
Only clear input on enter when sending message
2025-06-12 16:09:44 -07:00
Russell Schmidt
4152fb6a21 Redirect sound player output to dev null
On my linux system, the sound playing code goes to aplay. When called,
aplay outputs a message about the file it is playing to stderr, which
causes it to be printed on the input line, which can't be easily
cleared. Redirect output from the audio player executable to dev/null.
Deduplicate sound playing code a bit so we only need one call to
subprocess.run, so I don't have to make this change in three places.
2025-06-12 17:28:30 -05:00
Russell Schmidt
384e36dac2 Only clear input on enter when sending message
We should only clear the input field when the user presses enter if the
user actually sent the message. If selecting a different node to send
to, don't clear input.
2025-06-12 17:23:03 -05:00
pdxlocations
65bca84fe6 minor refactor 2025-06-10 23:24:11 -07:00
pdxlocations
16fa2830fd bump version 2025-06-10 22:29:31 -07:00
pdxlocations
c8f1da99e3 Merge pull request #194 from rfschmid:fix-crash-with-newlines
Fix crash with newlines, message spacing
2025-06-10 22:28:41 -07:00
Russell Schmidt
702250c329 Fix crash with newlines, message spacing 2025-06-10 17:42:14 -05:00
pdxlocations
6291082405 Merge pull request #192 from rfschmid/fix-wrapping-with-wide-chars
Fix crash when wrapping with wide characters
2025-06-10 12:19:02 -07:00
pdxlocations
4fa5148664 Merge pull request #193 from rfschmid/fix-backspace
Fix enter not clearing input
2025-06-10 11:57:06 -07:00
Russell Schmidt
d62ec09eea Fix enter not clearing input
Similar to 981d72e, pressing enter wasn't clearing the input field.
2025-06-10 12:19:03 -05:00
Russell Schmidt
61026dcc73 Fix crash when wrapping with wide characters
Update contact_ui.py to use already-existing custom wrap function
implemented in nav_utils instead of textwrap library. Update custom
wrap_text function to use east_asian_width to determine characters that
can use two columns of width.
2025-06-10 12:17:23 -05:00
pdxlocations
1362d3a219 bump version 2025-06-10 10:02:04 -07:00
pdxlocations
981d72e688 fix backspace 2025-06-10 10:01:44 -07:00
pdxlocations
0b5ec0b3d7 Merge pull request #191 from pdxlocations:refactor-ui-functions
Refactor keypress handling
2025-06-09 23:20:42 -07:00
pdxlocations
cbb4ef9e34 break out key functions 2025-06-09 23:19:28 -07:00
pdxlocations
fecd71f4b7 refactor window sizes 2025-06-09 22:37:51 -07:00
pdxlocations
59edfab451 add notif sound prefs (#190) 2025-06-09 22:15:53 -07:00
pdxlocations
39159099e1 change prints to logging 2025-06-09 19:01:40 -07:00
pdxlocations
02e5368c61 waits in configio 2025-06-09 07:40:07 -07:00
pdxlocations
9d234a75d8 change default configs order 2025-06-06 22:45:06 -07:00
pdxlocations
c7edd602ec Make widths configurable (#189) 2025-06-06 22:37:10 -07:00
pdxlocations
00226c5b4d don't use white in green config (#188) 2025-06-06 22:19:58 -07:00
pdxlocations
243079f8eb Error Handling for play_sound (#187)
* add sound for mac and linux

* add error handling for sounds

* use subprocess
2025-06-06 22:04:18 -07:00
pdxlocations
1e0432642c add sound for mac and linux (#183) 2025-05-29 10:08:12 -07:00
pdxlocations
71f37065bf bump version 2025-05-29 10:03:23 -07:00
pdxlocations
ee6aad5d0a allow blank key (#178) 2025-05-18 16:57:49 -07:00
pdxlocations
478f017de1 bump version 2025-05-18 14:43:47 -07:00
pdxlocations
c96c4edb01 Add Arrows to Main UI (#177)
* init

* convert globals to dataclass

* move lock to app state

* Almost working changes

* more almost working changes

* so close

* mostly working changes

* closer changes

* I think it works!

* working changes

* hack fix

* Merge branch 'main' into refactor-chat-ui

* clean-up
2025-05-18 12:28:03 -07:00
pdxlocations
cc416476f5 Merge pull request #174 from rfschmid/patch-1 2025-04-25 21:20:28 -07:00
Russell Schmidt
7fc1cbc3a9 Fix error "No module named 'ui.ui_state'"
Was unable to run locally at tips due to an import not including the package name.
2025-04-23 17:02:47 -05:00
pdxlocations
78f0775ad5 Convert Globals to Class (#173)
* init

* convert globals to dataclass

* move lock to app state
2025-04-19 21:37:54 -07:00
19 changed files with 1532 additions and 809 deletions

View File

@@ -33,6 +33,7 @@ By navigating to Settings -> App Settings, you may customize your UI's icons, co
- `CTRL` + `p` = Hide/show a log of raw received packets.
- `CTRL` + `t` = With the Node List highlighted, send a traceroute to the selected node
- `CTRL` + `d` = With the Channel List hightlighted, archive a chat to reduce UI clutter. Messages will be saved in the db and repopulate if you send or receive a DM from this user.
- `CTRL` + `d` = With the Note List highlghted, remove a node from your nodedb.
- `ESC` = Exit out of the Settings Dialogue, or Quit the application if settings are not displayed.
### Search
@@ -67,3 +68,11 @@ To quickly connect to localhost, use:
```sh
contact -t
```
## Install in development (editable) mode:
```bash
git clone https://github.com/pdxlocations/contact.git
cd contact
python3 -m venv .venv
source .venv/bin/activate
pip install -e .
```

View File

@@ -24,7 +24,6 @@ import traceback
from pubsub import pub
# Local application
import contact.globals as globals
import contact.ui.default_config as config
from contact.message_handlers.rx_handler import on_receive
from contact.settings import set_region
@@ -36,7 +35,7 @@ from contact.utilities.db_handler import init_nodedb, load_messages_from_db
from contact.utilities.input_handlers import get_list_input
from contact.utilities.interfaces import initialize_interface
from contact.utilities.utils import get_channels, get_nodeNum, get_node_list
from contact.utilities.singleton import ui_state, interface_state, app_state
# ------------------------------------------------------------------------------
# Environment & Logging Setup
@@ -52,28 +51,27 @@ logging.basicConfig(
filename=config.log_file_path, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
globals.lock = threading.Lock()
app_state.lock = threading.Lock()
# ------------------------------------------------------------------------------
# Main Program Logic
# ------------------------------------------------------------------------------
def prompt_region_if_unset(args: object) -> None:
"""Prompt user to set region if it is unset."""
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
def initialize_globals(args) -> None:
def initialize_globals() -> None:
"""Initializes interface and shared globals."""
globals.interface = initialize_interface(args)
# Prompt for region if unset
if globals.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(globals.interface)
globals.interface.close()
globals.interface = initialize_interface(args)
globals.myNodeNum = get_nodeNum()
globals.channel_list = get_channels()
globals.node_list = get_node_list()
interface_state.myNodeNum = get_nodeNum()
ui_state.channel_list = get_channels()
ui_state.node_list = get_node_list()
pub.subscribe(on_receive, "meshtastic.receive")
init_nodedb()
@@ -82,55 +80,63 @@ def initialize_globals(args) -> None:
def main(stdscr: curses.window) -> None:
"""Main entry point for the curses UI."""
output_capture = io.StringIO()
try:
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
setup_colors()
draw_splash(stdscr)
setup_colors()
draw_splash(stdscr)
args = setup_parser().parse_args()
args = setup_parser().parse_args()
if getattr(args, "settings", False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return
if getattr(args, "settings", False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return
logging.info("Initializing interface...")
with globals.lock:
initialize_globals(args)
logging.info("Starting main UI")
logging.info("Initializing interface...")
with app_state.lock:
interface_state.interface = initialize_interface(args)
main_ui(stdscr)
if interface_state.interface.localNode.localConfig.lora.region == 0:
prompt_region_if_unset(args)
except Exception as e:
console_output = output_capture.getvalue()
logging.error("Uncaught exception: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
logging.error("Console output:\n%s", console_output)
initialize_globals()
logging.info("Starting main UI")
try:
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
main_ui(stdscr)
except Exception:
console_output = output_capture.getvalue()
logging.error("Uncaught exception inside main_ui")
logging.error("Traceback:\n%s", traceback.format_exc())
logging.error("Console output:\n%s", console_output)
return
except Exception:
raise
def start() -> None:
"""Launch curses wrapper and redirect logs to file."""
"""Entry point for the application."""
if "--help" in sys.argv or "-h" in sys.argv:
setup_parser().print_help()
sys.exit(0)
with open(config.log_file_path, "a", buffering=1) as log_f:
sys.stdout = log_f
sys.stderr = log_f
with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
try:
curses.wrapper(main)
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C")
sys.exit(0)
except Exception as e:
logging.error("Fatal error: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
sys.exit(1)
try:
curses.wrapper(main)
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C")
sys.exit(0)
except Exception as e:
logging.critical("Fatal error", exc_info=True)
try:
curses.endwin()
except Exception:
pass
print("Fatal error:", e)
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":

View File

@@ -1,13 +0,0 @@
interface = None
lock = None
display_log = False
all_messages = {}
channel_list = []
notifications = []
packet_buffer = []
node_list = []
myNodeNum = 0
selected_channel = 0
selected_message = 0
selected_node = 0
current_window = 0

View File

@@ -1,9 +1,14 @@
import logging
import time
from datetime import datetime
import os
import platform
import shutil
import subprocess
from typing import Any, Dict
from contact.utilities.utils import refresh_node_list
from contact.utilities.utils import (
refresh_node_list,
add_new_message,
)
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
@@ -18,7 +23,48 @@ from contact.utilities.db_handler import (
update_node_info_in_db,
)
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state, app_state
def play_sound():
try:
system = platform.system()
sound_path = None
executable = None
if system == "Darwin": # macOS
sound_path = "/System/Library/Sounds/Ping.aiff"
executable = "afplay"
elif system == "Linux":
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay"
sound_path = ogg_path
elif shutil.which("ffplay") and os.path.exists(ogg_path):
executable = "ffplay"
sound_path = ogg_path
elif shutil.which("aplay") and os.path.exists(wav_path):
executable = "aplay"
sound_path = wav_path
else:
logging.warning("No suitable sound player or sound file found on Linux")
if executable and sound_path:
cmd = [executable, sound_path]
if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
@@ -29,14 +75,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
packet: The received Meshtastic packet as a dictionary.
interface: The Meshtastic interface instance that received the packet.
"""
with globals.lock:
with app_state.lock:
# Update packet log
globals.packet_buffer.append(packet)
if len(globals.packet_buffer) > 20:
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
# Trim buffer to 20 packets
globals.packet_buffer = globals.packet_buffer[-20:]
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if globals.display_log:
if ui_state.display_log:
draw_packetlog_win()
try:
if "decoded" not in packet:
@@ -52,6 +98,10 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
maybe_store_nodeinfo_in_db(packet)
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
if config.notification_sound == "True":
play_sound()
message_bytes = packet["decoded"]["payload"]
message_string = message_bytes.decode("utf-8")
@@ -63,19 +113,21 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
else:
channel_number = 0
if packet["to"] == globals.myNodeNum:
if packet["from"] in globals.channel_list:
if packet["to"] == interface_state.myNodeNum:
if packet["from"] in ui_state.channel_list:
pass
else:
globals.channel_list.append(packet["from"])
if packet["from"] not in globals.all_messages:
globals.all_messages[packet["from"]] = []
ui_state.channel_list.append(packet["from"])
if packet["from"] not in ui_state.all_messages:
ui_state.all_messages[packet["from"]] = []
update_node_info_in_db(packet["from"], chat_archived=False)
refresh_channels = True
channel_number = globals.channel_list.index(packet["from"])
channel_number = ui_state.channel_list.index(packet["from"])
if globals.channel_list[channel_number] != globals.channel_list[globals.selected_channel]:
channel_id = ui_state.channel_list[channel_number]
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
@@ -85,40 +137,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = globals.all_messages[globals.channel_list[channel_number]]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
globals.all_messages[globals.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
globals.all_messages[globals.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string} ", message_string)
)
add_new_message(channel_id, f"{config.message_prefix} {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string)
save_message_to_db(channel_id, message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")

View File

@@ -1,4 +1,3 @@
from datetime import datetime
from typing import Any, Dict
import google.protobuf.json_format
@@ -13,7 +12,10 @@ from contact.utilities.db_handler import (
update_node_info_in_db,
)
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state
from contact.utilities.utils import add_new_message
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
@@ -31,12 +33,12 @@ def onAckNak(packet: Dict[str, Any]) -> None:
return
acknak = ack_naks.pop(request)
message = globals.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
message = ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
confirm_string = " "
ack_type = None
if packet["decoded"]["routing"]["errorReason"] == "NONE":
if packet["from"] == globals.myNodeNum: # Ack "from" ourself means implicit ACK
if packet["from"] == interface_state.myNodeNum: # Ack "from" ourself means implicit ACK
confirm_string = config.ack_implicit_str
ack_type = "Implicit"
else:
@@ -46,15 +48,15 @@ def onAckNak(packet: Dict[str, Any]) -> None:
confirm_string = config.nak_str
ack_type = "Nak"
globals.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
config.sent_message_prefix + confirm_string + ": ",
message,
)
update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type)
channel_number = globals.channel_list.index(acknak["channel"])
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
channel_number = ui_state.channel_list.index(acknak["channel"])
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
draw_messages_window()
@@ -137,16 +139,17 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
msg_str += route_str + "\n" # Print the route back to us
if packet["from"] not in globals.channel_list:
globals.channel_list.append(packet["from"])
if packet["from"] not in ui_state.channel_list:
ui_state.channel_list.append(packet["from"])
refresh_channels = True
if is_chat_archived(packet["from"]):
update_node_info_in_db(packet["from"], chat_archived=False)
channel_number = globals.channel_list.index(packet["from"])
channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
if channel_id == ui_state.channel_list[ui_state.selected_channel]:
refresh_messages = True
else:
add_notification(channel_number)
@@ -154,33 +157,29 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
globals.all_messages[globals.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string}", msg_str)
)
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(globals.channel_list[channel_number], packet["from"], msg_str)
save_message_to_db(channel_id, packet["from"], msg_str)
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
"""
Sends a chat message using the selected channel.
"""
myid = globals.myNodeNum
myid = interface_state.myNodeNum
send_on_channel = 0
channel_id = globals.channel_list[channel]
channel_id = ui_state.channel_list[channel]
if isinstance(channel_id, int):
send_on_channel = 0
destination = channel_id
elif isinstance(channel_id, str):
send_on_channel = channel
sent_message_data = globals.interface.sendText(
sent_message_data = interface_state.interface.sendText(
text=message,
destinationId=destination,
wantAck=True,
@@ -189,38 +188,13 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
channelIndex=send_on_channel,
)
# Add sent message to the messages dictionary
if channel_id not in globals.all_messages:
globals.all_messages[channel_id] = []
# Handle timestamp logic
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = globals.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
globals.all_messages[channel_id].append((f"-- {current_hour} --", ""))
globals.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
add_new_message(channel_id, config.sent_message_prefix + config.ack_unknown_str + ": ", message)
timestamp = save_message_to_db(channel_id, myid, message)
ack_naks[sent_message_data.id] = {
"channel": channel_id,
"messageIndex": len(globals.all_messages[channel_id]) - 1,
"messageIndex": len(ui_state.all_messages[channel_id]) - 1,
"timestamp": timestamp,
}
@@ -229,10 +203,14 @@ def send_traceroute() -> None:
"""
Sends a RouteDiscovery protobuf to the selected node.
"""
channel_id = ui_state.node_list[ui_state.selected_node]
add_new_message(channel_id, f"{config.message_prefix} Sent Traceroute", "")
r = mesh_pb2.RouteDiscovery()
globals.interface.sendData(
interface_state.interface.sendData(
r,
destinationId=globals.node_list[globals.selected_node],
destinationId=channel_id,
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=on_response_traceroute,

File diff suppressed because it is too large Load Diff

View File

@@ -6,6 +6,7 @@ import sys
from typing import List
from contact.utilities.save_to_radio import save_changes
import contact.ui.default_config as config
from contact.utilities.config_io import config_export, config_import
from contact.utilities.control_utils import parse_ini_file, transform_menu_path
from contact.utilities.input_handlers import (
@@ -20,9 +21,7 @@ from contact.ui.dialog import dialog
from contact.ui.menus import generate_menu_from_protobuf
from contact.ui.nav_utils import move_highlight, draw_arrows, update_help_window
from contact.ui.user_config import json_editor
from contact.ui.ui_state import MenuState
menu_state = MenuState()
from contact.utilities.singleton import menu_state
# Constants
width = 80
@@ -36,16 +35,17 @@ script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
# Paths
locals_dir = os.path.dirname(os.path.abspath(sys.argv[0])) # Current script directory
# locals_dir = os.path.dirname(os.path.abspath(sys.argv[0])) # Current script directory
translation_file = os.path.join(parent_dir, "localisations", "en.ini")
config_folder = os.path.join(locals_dir, "node-configs")
# config_folder = os.path.join(locals_dir, "node-configs")
config_folder = os.path.abspath(config.node_configs_file_path)
# Load translations
field_mapping, help_text = parse_ini_file(translation_file)
def display_menu(menu_state: MenuState) -> tuple[object, object]: # curses.window or pad types
def display_menu() -> tuple[object, object]: # curses.window or pad types
min_help_window_height = 6
num_items = len(menu_state.current_menu) + (1 if menu_state.show_save_option else 0)
@@ -106,7 +106,7 @@ def display_menu(menu_state: MenuState) -> tuple[object, object]: # curses.wind
)
# Draw help window with dynamically updated max_help_lines
draw_help_window(start_y, start_x, menu_height, max_help_lines, transformed_path, menu_state)
draw_help_window(start_y, start_x, menu_height, max_help_lines, transformed_path)
menu_win.refresh()
menu_pad.refresh(
@@ -132,7 +132,6 @@ def draw_help_window(
menu_height: int,
max_help_lines: int,
transformed_path: List[str],
menu_state: MenuState,
) -> None:
global help_win
@@ -150,6 +149,15 @@ def draw_help_window(
)
def get_input_type_for_field(field) -> type:
if field.type in (field.TYPE_INT32, field.TYPE_UINT32, field.TYPE_INT64):
return int
elif field.type in (field.TYPE_FLOAT, field.TYPE_DOUBLE):
return float
else:
return str
def settings_menu(stdscr: object, interface: object) -> None:
curses.update_lines_cols()
@@ -159,29 +167,32 @@ def settings_menu(stdscr: object, interface: object) -> None:
modified_settings = {}
need_redraw = True
menu_state.need_redraw = True
menu_state.show_save_option = False
while True:
if need_redraw:
if menu_state.need_redraw:
menu_state.need_redraw = False
options = list(menu_state.current_menu.keys())
# Determine if save option should be shown
path = menu_state.menu_path
menu_state.show_save_option = (
(
len(menu_state.menu_path) > 2
and ("Radio Settings" in menu_state.menu_path or "Module Settings" in menu_state.menu_path)
)
or (len(menu_state.menu_path) == 2 and "User Settings" in menu_state.menu_path)
or (len(menu_state.menu_path) == 3 and "Channels" in menu_state.menu_path)
(len(path) > 2 and ("Radio Settings" in path or "Module Settings" in path))
or (len(path) == 2 and "User Settings" in path)
or (len(path) == 3 and "Channels" in path)
)
# Display the menu
menu_win, menu_pad = display_menu(menu_state)
menu_win, menu_pad = display_menu()
need_redraw = False
if menu_win is None:
continue # Skip if menu_win is not initialized
# Capture user input
menu_win.timeout(200) # wait up to 200 ms for a keypress (or less if key is pressed)
key = menu_win.getch()
if key == -1:
continue
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
# max_help_lines = 4
@@ -215,7 +226,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
)
elif key == curses.KEY_RESIZE:
need_redraw = True
menu_state.need_redraw = True
curses.update_lines_cols()
menu_win.erase()
@@ -239,7 +250,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
)
elif key == curses.KEY_RIGHT or key == ord("\n"):
need_redraw = True
menu_state.need_redraw = True
menu_state.start_index.append(0)
menu_win.erase()
help_win.erase()
@@ -268,7 +279,8 @@ def settings_menu(stdscr: object, interface: object) -> None:
break
elif selected_option == "Export Config File":
filename = get_text_input("Enter a filename for the config file")
filename = get_text_input("Enter a filename for the config file", None, None)
if not filename:
logging.info("Export aborted: No filename provided.")
menu_state.start_index.pop()
@@ -290,7 +302,8 @@ def settings_menu(stdscr: object, interface: object) -> None:
with open(yaml_file_path, "w", encoding="utf-8") as file:
file.write(config_text)
logging.info(f"Config file saved to {yaml_file_path}")
dialog(stdscr, "Config File Saved:", yaml_file_path)
dialog("Config File Saved:", yaml_file_path)
menu_state.need_redraw = True
menu_state.start_index.pop()
continue
except PermissionError:
@@ -306,14 +319,16 @@ def settings_menu(stdscr: object, interface: object) -> None:
# Check if folder exists and is not empty
if not os.path.exists(config_folder) or not any(os.listdir(config_folder)):
dialog(stdscr, "", " No config files found. Export a config first.")
dialog("", " No config files found. Export a config first.")
menu_state.need_redraw = True
continue # Return to menu
file_list = [f for f in os.listdir(config_folder) if os.path.isfile(os.path.join(config_folder, f))]
# Ensure file_list is not empty before proceeding
if not file_list:
dialog(stdscr, "", " No config files found. Export a config first.")
dialog("", " No config files found. Export a config first.")
menu_state.need_redraw = True
continue
filename = get_list_input("Choose a config file", None, file_list)
@@ -327,7 +342,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
elif selected_option == "Config URL":
current_value = interface.localNode.getURL()
new_value = get_text_input(f"Config URL is currently: {current_value}")
new_value = get_text_input(f"Config URL is currently: {current_value}", None, str)
if new_value is not None:
current_value = new_value
overwrite = get_list_input(f"Are you sure you want to load this config?", None, ["Yes", "No"])
@@ -380,7 +395,6 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
menu_state.selected_index = 4
continue
# need_redraw = True
field_info = menu_state.current_menu.get(selected_option)
if isinstance(field_info, tuple):
@@ -395,7 +409,9 @@ def settings_menu(stdscr: object, interface: object) -> None:
if selected_option in ["longName", "shortName", "isLicensed"]:
if selected_option in ["longName", "shortName"]:
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, None
)
new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value)
@@ -414,7 +430,9 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
elif selected_option in ["latitude", "longitude", "altitude"]:
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, float
)
new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value)
@@ -453,17 +471,26 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
elif field.type == 13: # Field type 13 corresponds to UINT32
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = current_value if new_value is None else int(new_value)
menu_state.start_index.pop()
elif field.type == 2: # Field type 13 corresponds to INT64
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = current_value if new_value is None else float(new_value)
menu_state.start_index.pop()
else: # Handle other field types
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = current_value if new_value is None else new_value
menu_state.start_index.pop()
@@ -486,7 +513,28 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.selected_index = 0
elif key == curses.KEY_LEFT:
need_redraw = True
# If we are at the main menu and there are unsaved changes, prompt to save
if len(menu_state.menu_path) == 3 and modified_settings:
current_section = menu_state.menu_path[-1]
save_prompt = get_list_input(
f"You have unsaved changes in {current_section}. Save before exiting?",
None,
["Yes", "No", "Cancel"],
mandatory=True,
)
if save_prompt == "Cancel":
continue # Stay in the menu without doing anything
elif save_prompt == "Yes":
save_changes(interface, modified_settings, menu_state)
logging.info("Changes Saved")
modified_settings.clear()
menu = rebuild_menu_at_current_path(interface, menu_state)
pass
menu_state.need_redraw = True
menu_win.erase()
help_win.erase()
@@ -497,8 +545,8 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_win.refresh()
help_win.refresh()
if len(menu_state.menu_path) < 2:
modified_settings.clear()
# if len(menu_state.menu_path) < 2:
# modified_settings.clear()
# Navigate back to the previous menu
if len(menu_state.menu_path) > 1:
@@ -515,6 +563,16 @@ def settings_menu(stdscr: object, interface: object) -> None:
break
def rebuild_menu_at_current_path(interface, menu_state):
"""Rebuild menus from the device and re-point current_menu to the same path."""
new_menu = generate_menu_from_protobuf(interface)
cur = new_menu["Main Menu"]
for step in menu_state.menu_path[1:]:
cur = cur.get(step, {})
menu_state.current_menu = cur
return new_menu
def set_region(interface: object) -> None:
node = interface.getNode("^local")
device_config = node.localConfig

View File

@@ -2,15 +2,69 @@ import json
import logging
import os
from typing import Dict
from contact.ui.colors import setup_colors
# Get the parent directory of the script
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
# Paths
json_file_path = os.path.join(parent_dir, "config.json")
log_file_path = os.path.join(parent_dir, "client.log")
db_file_path = os.path.join(parent_dir, "client.db")
# To test writting to a non-writable directory, you can uncomment the following lines:
# mkdir /tmp/test_nonwritable
# chmod -w /tmp/test_nonwritable
# parent_dir = "/tmp/test_nonwritable"
def reload_config() -> None:
loaded_config = initialize_config()
assign_config_variables(loaded_config)
setup_colors(reinit=True)
def _is_writable_dir(path: str) -> bool:
"""
Return True if we can create & delete a temp file in `path`.
"""
if not os.path.isdir(path):
return False
test_path = os.path.join(path, ".perm_test_tmp")
try:
with open(test_path, "w", encoding="utf-8") as _tmp:
_tmp.write("ok")
os.remove(test_path)
return True
except OSError:
return False
def _get_config_root(preferred_dir: str, fallback_name: str = ".contact_client") -> str:
"""
Choose a writable directory for config artifacts.
"""
if _is_writable_dir(preferred_dir):
return preferred_dir
home = os.path.expanduser("~")
fallback_dir = os.path.join(home, fallback_name)
# Ensure the fallback exists.
os.makedirs(fallback_dir, exist_ok=True)
# If *that* still isn't writable, last-ditch: use a system temp dir.
if not _is_writable_dir(fallback_dir):
import tempfile
fallback_dir = tempfile.mkdtemp(prefix="contact_client_")
return fallback_dir
# Pick the root now.
config_root = _get_config_root(parent_dir)
# Paths (derived from the chosen root)
json_file_path = os.path.join(config_root, "config.json")
log_file_path = os.path.join(config_root, "client.log")
db_file_path = os.path.join(config_root, "client.db")
node_configs_file_path = os.path.join(config_root, "node-configs/")
def format_json_single_line_arrays(data: Dict[str, object], indent: int = 4) -> str:
@@ -123,15 +177,19 @@ def initialize_config() -> Dict[str, object]:
"settings_breadcrumbs": ["green", "black"],
"settings_warning": ["green", "black"],
"settings_note": ["green", "black"],
"node_favorite": ["cyan", "white"],
"node_ignored": ["red", "white"],
"node_favorite": ["cyan", "green"],
"node_ignored": ["red", "black"],
}
default_config_variables = {
"channel_list_16ths": "3",
"node_list_16ths": "5",
"db_file_path": db_file_path,
"log_file_path": log_file_path,
"node_configs_file_path": node_configs_file_path,
"message_prefix": ">>",
"sent_message_prefix": ">> Sent",
"notification_symbol": "*",
"notification_sound": "True",
"ack_implicit_str": "[◌]",
"ack_str": "[✓]",
"nak_str": "[x]",
@@ -168,20 +226,26 @@ def initialize_config() -> Dict[str, object]:
def assign_config_variables(loaded_config: Dict[str, object]) -> None:
# Assign values to local variables
global db_file_path, log_file_path, message_prefix, sent_message_prefix
global db_file_path, log_file_path, node_configs_file_path, message_prefix, sent_message_prefix
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
global node_list_16ths, channel_list_16ths
global theme, COLOR_CONFIG
global node_sort
global node_sort, notification_sound
channel_list_16ths = loaded_config["channel_list_16ths"]
node_list_16ths = loaded_config["node_list_16ths"]
db_file_path = loaded_config["db_file_path"]
log_file_path = loaded_config["log_file_path"]
node_configs_file_path = loaded_config.get("node_configs_file_path")
message_prefix = loaded_config["message_prefix"]
sent_message_prefix = loaded_config["sent_message_prefix"]
notification_symbol = loaded_config["notification_symbol"]
notification_sound = loaded_config["notification_sound"]
ack_implicit_str = loaded_config["ack_implicit_str"]
ack_str = loaded_config["ack_str"]
nak_str = loaded_config["nak_str"]
ack_unknown_str = loaded_config["ack_unknown_str"]
node_sort = loaded_config["node_sort"]
theme = loaded_config["theme"]
if theme == "dark":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"]
@@ -189,7 +253,6 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"]
elif theme == "green":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"]
node_sort = loaded_config["node_sort"]
# Call the function when the script is imported
@@ -205,6 +268,7 @@ if __name__ == "__main__":
print("\nLoaded Configuration:")
print(f"Database File Path: {db_file_path}")
print(f"Log File Path: {log_file_path}")
print(f"Configs File Path: {node_configs_file_path}")
print(f"Message Prefix: {message_prefix}")
print(f"Sent Message Prefix: {sent_message_prefix}")
print(f"Notification Symbol: {notification_symbol}")

View File

@@ -1,44 +1,63 @@
import curses
from contact.ui.colors import get_color
from contact.utilities.singleton import menu_state, ui_state
def dialog(stdscr: curses.window, title: str, message: str) -> None:
height, width = stdscr.getmaxyx()
def dialog(title: str, message: str) -> None:
"""Display a dialog with a title and message."""
# Calculate dialog dimensions
max_line_lengh = 0
previous_window = ui_state.current_window
ui_state.current_window = 4
curses.update_lines_cols()
height, width = curses.LINES, curses.COLS
# Parse message into lines and calculate dimensions
message_lines = message.splitlines()
for l in message_lines:
max_line_length = max(len(l), max_line_lengh)
max_line_length = max(len(l) for l in message_lines)
dialog_width = max(len(title) + 4, max_line_length + 4)
dialog_height = len(message_lines) + 4
x = (width - dialog_width) // 2
y = (height - dialog_height) // 2
# Create dialog window
def draw_window():
win.erase()
win.bkgd(get_color("background"))
win.attrset(get_color("window_frame"))
win.border(0)
win.addstr(0, 2, title, get_color("settings_default"))
for i, line in enumerate(message_lines):
msg_x = (dialog_width - len(line)) // 2
win.addstr(2 + i, msg_x, line, get_color("settings_default"))
ok_text = " Ok "
win.addstr(
dialog_height - 2,
(dialog_width - len(ok_text)) // 2,
ok_text,
get_color("settings_default", reverse=True),
)
win.refresh()
win = curses.newwin(dialog_height, dialog_width, y, x)
win.bkgd(get_color("background"))
win.attrset(get_color("window_frame"))
win.border(0)
draw_window()
# Add title
win.addstr(0, 2, title, get_color("settings_default"))
# Add message
for i, l in enumerate(message_lines):
win.addstr(2 + i, 2, l, get_color("settings_default"))
# Add button
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", get_color("settings_default", reverse=True))
# Refresh dialog window
win.refresh()
# Get user input
while True:
win.timeout(200)
char = win.getch()
# Close dialog with enter, space, or esc
if char in (curses.KEY_ENTER, 10, 13, 32, 27):
if menu_state.need_redraw:
menu_state.need_redraw = False
draw_window()
if char in (curses.KEY_ENTER, 10, 13, 32, 27): # Enter, space, Esc
win.erase()
win.refresh()
ui_state.current_window = previous_window
return
if char == -1:
continue

View File

@@ -1,8 +1,22 @@
import curses
import re
from unicodedata import east_asian_width
from contact.ui.colors import get_color
from contact.utilities.control_utils import transform_menu_path
from typing import Any, Optional, List, Dict
from contact.utilities.singleton import interface_state, ui_state
def get_node_color(node_index: int, reverse: bool = False):
node_num = ui_state.node_list[node_index]
node = interface_state.interface.nodesByNum.get(node_num, {})
if node.get("isFavorite"):
return get_color("node_favorite", reverse=reverse)
elif node.get("isIgnored"):
return get_color("node_ignored", reverse=reverse)
return get_color("settings_default", reverse=reverse)
# Aliases
Segment = tuple[str, str, bool, bool]
@@ -128,7 +142,6 @@ def draw_arrows(
win: object, visible_height: int, max_index: int, start_index: List[int], show_save_option: bool
) -> None:
# vh = visible_height + (1 if show_save_option else 0)
mi = max_index - (2 if show_save_option else 0)
if visible_height < mi:
@@ -282,9 +295,16 @@ def get_wrapped_help_text(
return wrapped_help
def text_width(text: str) -> int:
return sum(2 if east_asian_width(c) in "FW" else 1 for c in text)
def wrap_text(text: str, wrap_width: int) -> List[str]:
"""Wraps text while preserving spaces and breaking long words."""
whitespace = '\t\n\x0b\x0c\r '
whitespace_trans = dict.fromkeys(map(ord, whitespace), ord(' '))
text = text.translate(whitespace_trans)
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
wrapped_lines = []
line_buffer = ""
@@ -293,11 +313,11 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
wrap_width -= margin
for word in words:
word_length = len(word)
word_length = text_width(word)
if word_length > wrap_width: # Break long words
if line_buffer:
wrapped_lines.append(line_buffer)
wrapped_lines.append(line_buffer.strip())
line_buffer = ""
line_length = 0
for i in range(0, word_length, wrap_width):
@@ -305,7 +325,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
continue
if line_length + word_length > wrap_width and word.strip():
wrapped_lines.append(line_buffer)
wrapped_lines.append(line_buffer.strip())
line_buffer = ""
line_length = 0
@@ -313,6 +333,94 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
line_length += word_length
if line_buffer:
wrapped_lines.append(line_buffer)
wrapped_lines.append(line_buffer.strip())
return wrapped_lines
def move_main_highlight(
old_idx: int, new_idx, options: List[str], menu_win: curses.window, menu_pad: curses.window, ui_state: object
) -> None:
if old_idx == new_idx: # No-op
return
max_index = len(options) - 1
visible_height = menu_win.getmaxyx()[0] - 2
if new_idx < ui_state.start_index[ui_state.current_window]: # Moving above the visible area
ui_state.start_index[ui_state.current_window] = new_idx
elif new_idx >= ui_state.start_index[ui_state.current_window] + visible_height: # Moving below the visible area
ui_state.start_index[ui_state.current_window] = new_idx - visible_height + 1
# Ensure start_index is within bounds
ui_state.start_index[ui_state.current_window] = max(
0, min(ui_state.start_index[ui_state.current_window], max_index - visible_height + 1)
)
highlight_line(menu_win, menu_pad, old_idx, new_idx, visible_height)
if ui_state.current_window == 0: # hack to fix max_index
max_index += 1
draw_main_arrows(menu_win, max_index, window=ui_state.current_window)
menu_win.refresh()
def highlight_line(
menu_win: curses.window, menu_pad: curses.window, old_idx: int, new_idx: int, visible_height: int
) -> None:
if ui_state.current_window == 0:
color_old = (
get_color("channel_selected") if old_idx == ui_state.selected_channel else get_color("channel_list")
)
color_new = get_color("channel_list", reverse=True) if True else get_color("channel_list", reverse=True)
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, color_old)
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, color_new)
elif ui_state.current_window == 2:
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(old_idx))
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(new_idx, reverse=True))
menu_win.refresh()
# Refresh pad only if scrolling is needed
menu_pad.refresh(
ui_state.start_index[ui_state.current_window],
0,
menu_win.getbegyx()[0] + 1,
menu_win.getbegyx()[1] + 1,
menu_win.getbegyx()[0] + visible_height,
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 3,
)
def draw_main_arrows(win: object, max_index: int, window: int, **kwargs) -> None:
height, width = win.getmaxyx()
usable_height = height - 2
usable_width = width - 2
if window == 1 and ui_state.display_log:
if log_height := kwargs.get("log_height"):
usable_height -= log_height - 1
if usable_height < max_index:
if ui_state.start_index[window] > 0:
win.addstr(1, usable_width, "", get_color("settings_default"))
else:
win.addstr(1, usable_width, " ", get_color("settings_default"))
if max_index - ui_state.start_index[window] - 1 >= usable_height:
win.addstr(usable_height, usable_width, "", get_color("settings_default"))
else:
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
else:
win.addstr(1, usable_width, " ", get_color("settings_default"))
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
def get_msg_window_lines(messages_win, packetlog_win) -> None:
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if ui_state.display_log else 0
return messages_win.getmaxyx()[0] - 2 - packetlog_height

View File

@@ -1,11 +1,44 @@
from typing import Any, Union, List, Dict
from dataclasses import dataclass, field
@dataclass
class MenuState:
def __init__(self):
self.menu_index: List[int] = [] # Row we left the previous menus
self.start_index: List[int] = [0] # Row to start the menu if it doesn't all fit
self.selected_index: int = 0 # Selected Row
self.current_menu: Union[Dict[str, Any], List[Any], str, int] = {} # Contents of the current menu
self.menu_path: List[str] = [] # Menu Path
self.show_save_option: bool = False # Display 'Save'
menu_index: List[int] = field(default_factory=list)
start_index: List[int] = field(default_factory=lambda: [0])
selected_index: int = 0
current_menu: Union[Dict[str, Any], List[Any], str, int] = field(default_factory=dict)
menu_path: List[str] = field(default_factory=list)
show_save_option: bool = False
need_redraw: bool = False
@dataclass
class ChatUIState:
display_log: bool = False
channel_list: List[str] = field(default_factory=list)
all_messages: Dict[str, List[str]] = field(default_factory=dict)
notifications: List[str] = field(default_factory=list)
packet_buffer: List[str] = field(default_factory=list)
node_list: List[str] = field(default_factory=list)
selected_channel: int = 0
selected_message: int = 0
selected_node: int = 0
current_window: int = 0
last_sent_time: float = 0.0
selected_index: int = 0
start_index: List[int] = field(default_factory=lambda: [0, 0, 0])
show_save_option: bool = False
menu_path: List[str] = field(default_factory=list)
@dataclass
class InterfaceState:
interface: Any = None
myNodeNum: int = 0
@dataclass
class AppState:
lock: Any = None

View File

@@ -4,9 +4,10 @@ import curses
from typing import Any, List, Dict
from contact.ui.colors import get_color, setup_colors, COLOR_MAP
from contact.ui.default_config import format_json_single_line_arrays, loaded_config
import contact.ui.default_config as config
from contact.ui.nav_utils import move_highlight, draw_arrows
from contact.utilities.input_handlers import get_list_input
from contact.utilities.singleton import menu_state
width = 80
@@ -53,12 +54,19 @@ def edit_value(key: str, current_value: str) -> str:
# Handle theme selection dynamically
if key == "theme":
# Load theme names dynamically from the JSON
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
theme_options = [
k.split("_", 2)[2].lower() for k in config.loaded_config.keys() if k.startswith("COLOR_CONFIG")
]
return get_list_input("Select Theme", current_value, theme_options)
elif key == "node_sort":
sort_options = ["lastHeard", "name", "hops"]
return get_list_input("Sort By", current_value, sort_options)
elif key == "notification_sound":
sound_options = ["True", "False"]
return get_list_input("Notification Sound", current_value, sound_options)
# Standard Input Mode (Scrollable)
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
curses.curs_set(1)
@@ -67,41 +75,64 @@ def edit_value(key: str, current_value: str) -> str:
user_input = ""
input_position = (7, 13) # Tuple for row and column
row, col = input_position # Unpack tuple
while True:
visible_text = user_input[scroll_offset : scroll_offset + input_width] # Only show what fits
edit_win.addstr(row, col, " " * input_width, get_color("settings_default")) # Clear previous text
edit_win.addstr(row, col, visible_text, get_color("settings_default")) # Display text
if menu_state.need_redraw:
curses.update_lines_cols()
menu_state.need_redraw = False
# Re-create the window to fully reset state
edit_win = curses.newwin(height, width, start_y, start_x)
edit_win.timeout(200)
edit_win.bkgd(get_color("background"))
edit_win.attrset(get_color("window_frame"))
edit_win.border()
# Redraw static content
edit_win.addstr(1, 2, f"Editing {key}", get_color("settings_default", bold=True))
edit_win.addstr(3, 2, "Current Value:", get_color("settings_default"))
for i, line in enumerate(wrapped_lines[:4]):
edit_win.addstr(4 + i, 2, line, get_color("settings_default"))
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
visible_text = user_input[scroll_offset : scroll_offset + input_width]
edit_win.addstr(row, col, " " * input_width, get_color("settings_default"))
edit_win.addstr(row, col, visible_text, get_color("settings_default"))
edit_win.refresh()
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width)) # Adjust cursor position
key = edit_win.get_wch()
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width))
if key in (chr(27), curses.KEY_LEFT): # ESC or Left Arrow
try:
key = edit_win.get_wch()
except curses.error:
continue # window not ready — skip this loop
if key in (chr(27), curses.KEY_LEFT):
curses.curs_set(0)
return current_value # Exit without returning a value
return current_value
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
break
elif key in (curses.KEY_BACKSPACE, chr(127)): # Backspace
if user_input: # Only process if there's something to delete
elif key in (curses.KEY_BACKSPACE, chr(127)):
if user_input:
user_input = user_input[:-1]
if scroll_offset > 0 and len(user_input) < scroll_offset + input_width:
scroll_offset -= 1 # Move back if text is shorter than scrolled area
scroll_offset -= 1
else:
if isinstance(key, str):
user_input += key
else:
user_input += chr(key)
if len(user_input) > input_width: # Scroll if input exceeds visible area
if len(user_input) > input_width:
scroll_offset += 1
curses.curs_set(0)
return user_input if user_input else current_value
def display_menu(menu_state: Any) -> tuple[Any, Any, List[str]]:
def display_menu() -> tuple[Any, Any, List[str]]:
"""
Render the configuration menu with a Save button directly added to the window.
"""
@@ -184,6 +215,7 @@ def display_menu(menu_state: Any) -> tuple[Any, Any, List[str]]:
def json_editor(stdscr: curses.window, menu_state: Any) -> None:
menu_state.selected_index = 0 # Track the selected option
made_changes = False # Track if any changes were made
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
@@ -206,16 +238,18 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
menu_state.current_menu = data # Track the current level of the menu
# Render the menu
menu_win, menu_pad, options = display_menu(menu_state)
need_redraw = True
menu_win, menu_pad, options = display_menu()
menu_state.need_redraw = True
while True:
if need_redraw:
menu_win, menu_pad, options = display_menu(menu_state)
if menu_state.need_redraw:
menu_state.need_redraw = False
menu_win, menu_pad, options = display_menu()
menu_win.refresh()
need_redraw = False
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
menu_win.timeout(200)
key = menu_win.getch()
if key == curses.KEY_UP:
@@ -243,7 +277,7 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
elif key in (curses.KEY_RIGHT, 10, 13): # 10 = \n, 13 = carriage return
need_redraw = True
menu_state.need_redraw = True
menu_win.erase()
menu_win.refresh()
@@ -264,11 +298,14 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
if isinstance(selected_data, list) and len(selected_data) == 2:
# Edit color pair
old = selected_data
new_value = edit_color_pair(selected_key, selected_data)
menu_state.menu_path.pop()
menu_state.start_index.pop()
menu_state.menu_index.pop()
menu_state.current_menu[selected_key] = new_value
if new_value != old:
made_changes = True
elif isinstance(selected_data, (dict, list)):
# Navigate into nested data
@@ -277,22 +314,26 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
else:
# General value editing
old = selected_data
new_value = edit_value(selected_key, selected_data)
menu_state.menu_path.pop()
menu_state.menu_index.pop()
menu_state.start_index.pop()
menu_state.current_menu[selected_key] = new_value
need_redraw = True
menu_state.need_redraw = True
if new_value != old:
made_changes = True
else:
# Save button selected
save_json(file_path, data)
stdscr.refresh()
# config.reload() # This isn't refreshing the file paths as expected
continue
elif key in (27, curses.KEY_LEFT): # Escape or Left Arrow
need_redraw = True
menu_state.need_redraw = True
menu_win.erase()
menu_win.refresh()
@@ -313,6 +354,19 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
else:
# Exit the editor
if made_changes:
save_prompt = get_list_input(
"You have unsaved changes. Save before exiting?",
None,
["Yes", "No", "Cancel"],
mandatory=True,
)
if save_prompt == "Cancel":
continue # Stay in the menu without doing anything
elif save_prompt == "Yes":
save_json(file_path, data)
made_changes = False
menu_win.clear()
menu_win.refresh()
@@ -320,7 +374,7 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
def save_json(file_path: str, data: Dict[str, Any]) -> None:
formatted_json = format_json_single_line_arrays(data)
formatted_json = config.format_json_single_line_arrays(data)
with open(file_path, "w", encoding="utf-8") as f:
f.write(formatted_json)
setup_colors(reinit=True)
@@ -329,7 +383,6 @@ def save_json(file_path: str, data: Dict[str, Any]) -> None:
def main(stdscr: curses.window) -> None:
from contact.ui.ui_state import MenuState
menu_state = MenuState()
if len(menu_state.menu_path) == 0:
menu_state.menu_path = ["App Settings"] # Initialize if not set

View File

@@ -1,5 +1,6 @@
import yaml
import logging
import time
from typing import List
from google.protobuf.json_format import MessageToDict
from meshtastic import mt_config
@@ -133,24 +134,29 @@ def config_import(interface, filename):
logging.info(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(configuration["owner"])
time.sleep(0.5)
if "owner_short" in configuration:
logging.info(f"Setting device owner short to {configuration['owner_short']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"])
time.sleep(0.5)
if "ownerShort" in configuration:
logging.info(f"Setting device owner short to {configuration['ownerShort']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"])
time.sleep(0.5)
if "channel_url" in configuration:
logging.info(f"Setting channel url to {configuration['channel_url']}")
interface.getNode("^local").setURL(configuration["channel_url"])
time.sleep(0.5)
if "channelUrl" in configuration:
logging.info(f"Setting channel url to {configuration['channelUrl']}")
interface.getNode("^local").setURL(configuration["channelUrl"])
time.sleep(0.5)
if "location" in configuration:
alt = 0
@@ -169,12 +175,14 @@ def config_import(interface, filename):
logging.info(f"Fixing longitude at {lon} degrees")
logging.info("Setting device position")
interface.localNode.setFixedPosition(lat, lon, alt)
time.sleep(0.5)
if "config" in configuration:
localConfig = interface.getNode("^local").localConfig
for section in configuration["config"]:
traverseConfig(section, configuration["config"][section], localConfig)
interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
if "module_config" in configuration:
moduleConfig = interface.getNode("^local").moduleConfig
@@ -185,6 +193,7 @@ def config_import(interface, filename):
moduleConfig,
)
interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
interface.getNode("^local", False).commitSettingsTransaction()
logging.info("Writing modified configuration to device")

View File

@@ -6,12 +6,14 @@ from typing import Optional, Union, Dict
from contact.utilities.utils import decimal_to_hex
import contact.ui.default_config as config
import contact.globals as globals
from contact.utilities.singleton import ui_state, interface_state
def get_table_name(channel: str) -> str:
# Construct the table name
table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
table_name = f"{str(interface_state.myNodeNum)}_{channel}_messages"
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
return quoted_table_name
@@ -61,7 +63,7 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
message_text = ?
"""
db_cursor.execute(update_query, (ack, str(globals.myNodeNum), timestamp, message))
db_cursor.execute(update_query, (ack, str(interface_state.myNodeNum), timestamp, message))
db_connection.commit()
except sqlite3.Error as e:
@@ -72,13 +74,13 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
def load_messages_from_db() -> None:
"""Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
"""Load messages from the database for all channels and update ui_state.all_messages and ui_state.channel_list."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",))
db_cursor.execute(query, (f"{str(interface_state.myNodeNum)}_%_messages",))
tables = [row[0] for row in db_cursor.fetchall()]
# Iterate through each table and fetch its messages
@@ -104,17 +106,24 @@ def load_messages_from_db() -> None:
# Convert the channel to an integer if it's numeric, otherwise keep it as a string (nodenum vs channel name)
channel = int(channel) if channel.isdigit() else channel
# Add the channel to globals.channel_list if not already present
if channel not in globals.channel_list and not is_chat_archived(channel):
globals.channel_list.append(channel)
# Add the channel to ui_state.channel_list if not already present
if channel not in ui_state.channel_list and not is_chat_archived(channel):
ui_state.channel_list.append(channel)
# Ensure the channel exists in globals.all_messages
if channel not in globals.all_messages:
globals.all_messages[channel] = []
# Ensure the channel exists in ui_state.all_messages
if channel not in ui_state.all_messages:
ui_state.all_messages[channel] = []
# Add messages to globals.all_messages grouped by hourly timestamp
# Add messages to ui_state.all_messages grouped by hourly timestamp
hourly_messages = {}
for user_id, message, timestamp, ack_type in db_messages:
for row in db_messages:
user_id, message, timestamp, ack_type = row
# Only ack_type is allowed to be None
if user_id is None or message is None or timestamp is None:
logging.warning(f"Skipping row with NULL required field(s): {row}")
continue
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
if hour not in hourly_messages:
hourly_messages[hour] = []
@@ -127,20 +136,22 @@ def load_messages_from_db() -> None:
elif ack_type == "Nak":
ack_str = config.nak_str
if user_id == str(globals.myNodeNum):
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
if user_id == str(interface_state.myNodeNum):
sanitized_message = message.replace("\x00", "")
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", sanitized_message)
else:
sanitized_message = message.replace("\x00", "")
formatted_message = (
f"{config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ",
message,
sanitized_message,
)
hourly_messages[hour].append(formatted_message)
# Flatten the hourly messages into globals.all_messages[channel]
# Flatten the hourly messages into ui_state.all_messages[channel]
for hour, messages in sorted(hourly_messages.items()):
globals.all_messages[channel].append((f"-- {hour} --", ""))
globals.all_messages[channel].extend(messages)
ui_state.all_messages[channel].append((f"-- {hour} --", ""))
ui_state.all_messages[channel].extend(messages)
except sqlite3.Error as e:
logging.error(f"SQLite error while loading messages from table '{table_name}': {e}")
@@ -153,11 +164,11 @@ def init_nodedb() -> None:
"""Initialize the node database and update it with nodes from the interface."""
try:
if not globals.interface.nodes:
if not interface_state.interface.nodes:
return # No nodes to initialize
ensure_node_table_exists() # Ensure the table exists before insertion
nodes_snapshot = list(globals.interface.nodes.values())
nodes_snapshot = list(interface_state.interface.nodes.values())
# Insert or update all nodes
for node in nodes_snapshot:
@@ -214,7 +225,7 @@ def update_node_info_in_db(
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote in case of numeric names
table_columns = [i[1] for i in db_cursor.execute(f"PRAGMA table_info({table_name})")]
if "chat_archived" not in table_columns:
@@ -278,7 +289,7 @@ def update_node_info_in_db(
def ensure_node_table_exists() -> None:
"""Ensure the node database table exists."""
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote for safety
schema = """
user_id TEXT PRIMARY KEY,
long_name TEXT,
@@ -319,7 +330,7 @@ def get_name_from_database(user_id: int, type: str = "long") -> str:
db_cursor = db_connection.cursor()
# Construct table name
table_name = f"{str(globals.myNodeNum)}_nodedb"
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
# Determine the correct column to fetch
@@ -345,7 +356,7 @@ def is_chat_archived(user_id: int) -> int:
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f"{str(globals.myNodeNum)}_nodedb"
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"'
query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,))

View File

@@ -6,10 +6,44 @@ from typing import Any, Optional, List
from contact.ui.colors import get_color
from contact.ui.nav_utils import move_highlight, draw_arrows, wrap_text
from contact.ui.dialog import dialog
from contact.utilities.validation_rules import get_validation_for
from contact.utilities.singleton import menu_state
def get_text_input(prompt: str) -> Optional[str]:
def invalid_input(window: curses.window, message: str, redraw_func: Optional[callable] = None) -> None:
"""Displays an invalid input message in the given window and redraws if needed."""
cursor_y, cursor_x = window.getyx()
curses.curs_set(0)
dialog("Invalid Input", message)
if redraw_func:
redraw_func() # Redraw the original window content that got obscured
else:
window.refresh()
window.move(cursor_y, cursor_x)
curses.curs_set(1)
def get_text_input(prompt: str, selected_config: str, input_type: str) -> Optional[str]:
"""Handles user input with wrapped text for long prompts."""
def redraw_input_win():
"""Redraw the input window with the current prompt and user input."""
input_win.erase()
input_win.border()
row = 1
for line in wrapped_prompt:
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
row += 1
if row >= height - 3:
break
input_win.addstr(row + 1, margin, prompt_text, get_color("settings_default"))
input_win.addstr(row + 1, col_start, user_input[:first_line_width], get_color("settings_default"))
for i, line in enumerate(wrap_text(user_input[first_line_width:], wrap_width=input_width)):
if row + 2 + i < height - 1:
input_win.addstr(row + 2 + i, margin, line[:input_width], get_color("settings_default"))
input_win.refresh()
height = 8
width = 80
margin = 2 # Left and right margin
@@ -20,6 +54,7 @@ def get_text_input(prompt: str) -> Optional[str]:
start_x = (curses.COLS - width) // 2
input_win = curses.newwin(height, width, start_y, start_x)
input_win.timeout(200)
input_win.bkgd(get_color("background"))
input_win.attrset(get_color("window_frame"))
input_win.border()
@@ -27,6 +62,7 @@ def get_text_input(prompt: str) -> Optional[str]:
# Wrap the prompt text
wrapped_prompt = wrap_text(prompt, wrap_width=input_width)
row = 1
for line in wrapped_prompt:
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
row += 1
@@ -39,34 +75,125 @@ def get_text_input(prompt: str) -> Optional[str]:
input_win.refresh()
curses.curs_set(1)
max_length = 4 if "shortName" in prompt else None
user_input = ""
min_value = 0
max_value = 4294967295
min_length = 0
max_length = None
# Start user input after the prompt text
if selected_config is not None:
validation = get_validation_for(selected_config) or {}
min_value = validation.get("min_value", 0)
max_value = validation.get("max_value", 4294967295)
min_length = validation.get("min_length", 0)
max_length = validation.get("max_length")
user_input = ""
col_start = margin + len(prompt_text)
first_line_width = input_width - len(prompt_text) # Available space for first line
first_line_width = input_width - len(prompt_text)
while True:
key = input_win.get_wch() # Waits for user input
if menu_state.need_redraw:
menu_state.need_redraw = False
redraw_input_win()
if key == chr(27) or key == curses.KEY_LEFT: # ESC or Left Arrow
try:
key = input_win.get_wch()
except curses.error:
continue
if key == chr(27) or key == curses.KEY_LEFT:
input_win.erase()
input_win.refresh()
curses.curs_set(0)
return None # Exit without saving
menu_state.need_redraw = True
return None
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)): # Enter key
break
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
menu_state.need_redraw = True
if not user_input.strip():
invalid_input(input_win, "Value cannot be empty.", redraw_func=redraw_input_win)
continue
length = len(user_input)
if min_length == max_length and max_length is not None:
if length != min_length:
invalid_input(
input_win, f"Value must be exactly {min_length} characters long.", redraw_func=redraw_input_win
)
continue
else:
if length < min_length:
invalid_input(
input_win,
f"Value must be at least {min_length} characters long.",
redraw_func=redraw_input_win,
)
continue
if max_length is not None and length > max_length:
invalid_input(
input_win,
f"Value must be no more than {max_length} characters long.",
redraw_func=redraw_input_win,
)
continue
if input_type is int:
if not user_input.isdigit():
invalid_input(input_win, "Only numeric digits (09) allowed.", redraw_func=redraw_input_win)
continue
int_val = int(user_input)
if not (min_value <= int_val <= max_value):
invalid_input(
input_win, f"Enter a number between {min_value} and {max_value}.", redraw_func=redraw_input_win
)
continue
curses.curs_set(0)
return int_val
elif input_type is float:
try:
float_val = float(user_input)
if not (min_value <= float_val <= max_value):
invalid_input(
input_win,
f"Enter a number between {min_value} and {max_value}.",
redraw_func=redraw_input_win,
)
continue
except ValueError:
invalid_input(input_win, "Must be a valid floating point number.", redraw_func=redraw_input_win)
continue
else:
curses.curs_set(0)
return float_val
else:
break
elif key in (curses.KEY_BACKSPACE, chr(127)): # Handle Backspace
if user_input:
user_input = user_input[:-1] # Remove last character
elif max_length is None or len(user_input) < max_length: # Enforce max length
if isinstance(key, str):
user_input += key
else:
user_input += chr(key)
elif max_length is None or len(user_input) < max_length:
try:
char = chr(key) if not isinstance(key, str) else key
if input_type is int:
if char.isdigit() or (char == "-" and len(user_input) == 0):
user_input += char
elif input_type is float:
if (
char.isdigit()
or (char == "." and "." not in user_input)
or (char == "-" and len(user_input) == 0)
):
user_input += char
else:
user_input += char
except ValueError:
pass # Ignore invalid input
# First line must be manually handled before using wrap_text()
first_line = user_input[:first_line_width] # Cut to max first line width
@@ -95,20 +222,24 @@ def get_text_input(prompt: str) -> Optional[str]:
curses.curs_set(0)
input_win.erase()
input_win.refresh()
return user_input
return user_input.strip()
def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
"""Handles user input for editing up to 3 Admin Keys in Base64 format."""
def to_base64(byte_strings):
"""Convert byte values to Base64-encoded strings."""
return [base64.b64encode(b).decode() for b in byte_strings]
def is_valid_base64(s):
"""Check if a string is valid Base64."""
"""Check if a string is valid Base64 or blank."""
if s == "":
return True
try:
decoded = base64.b64decode(s, validate=True)
return len(decoded) == 32 # Ensure it's exactly 32 bytes
except binascii.Error:
except (binascii.Error, ValueError):
return False
cvalue = to_base64(current_value) # Convert current values to Base64
@@ -117,10 +248,11 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
repeated_win = curses.newwin(height, width, start_y, start_x)
repeated_win.bkgd(get_color("background"))
repeated_win.attrset(get_color("window_frame"))
repeated_win.keypad(True) # Enable keypad for special keys
admin_key_win = curses.newwin(height, width, start_y, start_x)
admin_key_win.timeout(200)
admin_key_win.bkgd(get_color("background"))
admin_key_win.attrset(get_color("window_frame"))
admin_key_win.keypad(True) # Enable keypad for special keys
curses.echo()
curses.curs_set(1)
@@ -128,46 +260,48 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
# Editable list of values (max 3 values)
user_values = cvalue[:3] + [""] * (3 - len(cvalue)) # Ensure always 3 fields
cursor_pos = 0 # Track which value is being edited
error_message = ""
invalid_input = ""
while True:
repeated_win.erase()
repeated_win.border()
repeated_win.addstr(1, 2, "Edit up to 3 Admin Keys:", get_color("settings_default", bold=True))
admin_key_win.erase()
admin_key_win.border()
admin_key_win.addstr(1, 2, "Edit up to 3 Admin Keys:", get_color("settings_default", bold=True))
# Display current values, allowing editing
for i, line in enumerate(user_values):
prefix = "" if i == cursor_pos else " " # Highlight the current line
repeated_win.addstr(
admin_key_win.addstr(
3 + i, 2, f"{prefix}Admin Key {i + 1}: ", get_color("settings_default", bold=(i == cursor_pos))
)
repeated_win.addstr(3 + i, 18, line) # Align text for easier editing
admin_key_win.addstr(3 + i, 18, line) # Align text for easier editing
# Move cursor to the correct position inside the field
curses.curs_set(1)
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
admin_key_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
# Show error message if needed
if error_message:
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
if invalid_input:
admin_key_win.addstr(7, 2, invalid_input, get_color("settings_default", bold=True))
repeated_win.refresh()
key = repeated_win.getch()
admin_key_win.refresh()
key = admin_key_win.getch()
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow -> Cancel and return original
repeated_win.erase()
repeated_win.refresh()
admin_key_win.erase()
admin_key_win.refresh()
curses.noecho()
curses.curs_set(0)
menu_state.need_redraw = True
return None
elif key == ord("\n"): # Enter key to save and return
menu_state.need_redraw = True
if all(is_valid_base64(val) for val in user_values): # Ensure all values are valid Base64 and 32 bytes
curses.noecho()
curses.curs_set(0)
return user_values # Return the edited Base64 values
else:
error_message = "Error: Each key must be valid Base64 and 32 bytes long!"
invalid_input = "Error: Each key must be valid Base64 and 32 bytes long!"
elif key == curses.KEY_UP: # Move cursor up
cursor_pos = (cursor_pos - 1) % len(user_values)
elif key == curses.KEY_DOWN: # Move cursor down
@@ -178,11 +312,14 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
else:
try:
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
error_message = "" # Clear error if user starts fixing input
invalid_input = "" # Clear error if user starts fixing input
except ValueError:
pass # Ignore invalid character inputs
from contact.utilities.singleton import menu_state # Required if not already imported
def get_repeated_input(current_value: List[str]) -> Optional[str]:
height = 9
width = 80
@@ -190,71 +327,82 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
start_x = (curses.COLS - width) // 2
repeated_win = curses.newwin(height, width, start_y, start_x)
repeated_win.timeout(200)
repeated_win.bkgd(get_color("background"))
repeated_win.attrset(get_color("window_frame"))
repeated_win.keypad(True) # Enable keypad for special keys
repeated_win.keypad(True)
curses.echo()
curses.curs_set(1) # Show the cursor
curses.curs_set(1)
# Editable list of values (max 3 values)
user_values = current_value[:3]
cursor_pos = 0 # Track which value is being edited
error_message = ""
user_values = current_value[:3] + [""] * (3 - len(current_value)) # Always 3 fields
cursor_pos = 0
invalid_input = ""
while True:
def redraw():
repeated_win.erase()
repeated_win.border()
repeated_win.addstr(1, 2, "Edit up to 3 Values:", get_color("settings_default", bold=True))
# Display current values, allowing editing
for i, line in enumerate(user_values):
prefix = "" if i == cursor_pos else " " # Highlight the current line
prefix = "" if i == cursor_pos else " "
repeated_win.addstr(
3 + i, 2, f"{prefix}Value{i + 1}: ", get_color("settings_default", bold=(i == cursor_pos))
)
repeated_win.addstr(3 + i, 18, line)
repeated_win.addstr(3 + i, 18, line[: width - 20]) # Prevent overflow
# Move cursor to the correct position inside the field
curses.curs_set(1)
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
# Show error message if needed
if error_message:
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
if invalid_input:
repeated_win.addstr(7, 2, invalid_input[: width - 4], get_color("settings_default", bold=True))
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos]))
repeated_win.refresh()
key = repeated_win.getch()
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow -> Cancel and return original
while True:
if menu_state.need_redraw:
menu_state.need_redraw = False
redraw()
redraw()
try:
key = repeated_win.get_wch()
except curses.error:
continue # ignore timeout or input issues
if key in (27, curses.KEY_LEFT): # ESC or Left Arrow
repeated_win.erase()
repeated_win.refresh()
curses.noecho()
curses.curs_set(0)
menu_state.need_redraw = True
return None
elif key == ord("\n"): # Enter key to save and return
elif key in ("\n", curses.KEY_ENTER):
curses.noecho()
curses.curs_set(0)
return ", ".join(user_values)
elif key == curses.KEY_UP: # Move cursor up
cursor_pos = (cursor_pos - 1) % len(user_values)
elif key == curses.KEY_DOWN: # Move cursor down
cursor_pos = (cursor_pos + 1) % len(user_values)
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
if len(user_values[cursor_pos]) > 0:
user_values[cursor_pos] = user_values[cursor_pos][:-1] # Remove last character
menu_state.need_redraw = True
return ", ".join(user_values).strip()
elif key == curses.KEY_UP:
cursor_pos = (cursor_pos - 1) % 3
elif key == curses.KEY_DOWN:
cursor_pos = (cursor_pos + 1) % 3
elif key in (curses.KEY_BACKSPACE, 127):
user_values[cursor_pos] = user_values[cursor_pos][:-1]
else:
try:
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
error_message = "" # Clear error if user starts fixing input
except ValueError:
pass # Ignore invalid character inputs
ch = chr(key) if isinstance(key, int) else key
if ch.isprintable():
user_values[cursor_pos] += ch
invalid_input = ""
except Exception:
pass
from contact.utilities.singleton import menu_state # Ensure this is imported
def get_fixed32_input(current_value: int) -> int:
cvalue = current_value
current_value = str(ipaddress.IPv4Address(current_value))
original_value = current_value
ip_string = str(ipaddress.IPv4Address(current_value))
height = 10
width = 80
start_y = (curses.LINES - height) // 2
@@ -264,54 +412,73 @@ def get_fixed32_input(current_value: int) -> int:
fixed32_win.bkgd(get_color("background"))
fixed32_win.attrset(get_color("window_frame"))
fixed32_win.keypad(True)
fixed32_win.timeout(200)
curses.echo()
curses.curs_set(1)
user_input = ""
while True:
def redraw():
fixed32_win.erase()
fixed32_win.border()
fixed32_win.addstr(1, 2, "Enter an IP address (xxx.xxx.xxx.xxx):", curses.A_BOLD)
fixed32_win.addstr(3, 2, f"Current: {current_value}")
fixed32_win.addstr(5, 2, f"New value: {user_input}")
fixed32_win.addstr(1, 2, "Enter an IP address (xxx.xxx.xxx.xxx):", get_color("settings_default", bold=True))
fixed32_win.addstr(3, 2, f"Current: {ip_string}", get_color("settings_default"))
fixed32_win.addstr(5, 2, f"New value: {user_input}", get_color("settings_default"))
fixed32_win.refresh()
key = fixed32_win.getch()
while True:
if menu_state.need_redraw:
menu_state.need_redraw = False
redraw()
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow to cancel
redraw()
try:
key = fixed32_win.get_wch()
except curses.error:
continue # ignore timeout
if key in (27, curses.KEY_LEFT): # ESC or Left Arrow to cancel
fixed32_win.erase()
fixed32_win.refresh()
curses.noecho()
curses.curs_set(0)
return cvalue # Return the current value unchanged
elif key == ord("\n"): # Enter key to validate and save
# Validate IP address
menu_state.need_redraw = True
return original_value
elif key in ("\n", curses.KEY_ENTER):
octets = user_input.split(".")
menu_state.need_redraw = True
if len(octets) == 4 and all(octet.isdigit() and 0 <= int(octet) <= 255 for octet in octets):
curses.noecho()
curses.curs_set(0)
fixed32_address = ipaddress.ip_address(user_input)
return int(fixed32_address) # Return the valid IP address
return int(ipaddress.ip_address(user_input))
else:
fixed32_win.addstr(7, 2, "Invalid IP address. Try again.", curses.A_BOLD | curses.color_pair(5))
fixed32_win.addstr(7, 2, "Invalid IP address. Try again.", get_color("settings_default", bold=True))
fixed32_win.refresh()
curses.napms(1500) # Wait for 1.5 seconds before refreshing
user_input = "" # Clear invalid input
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
curses.napms(1500)
user_input = ""
elif key in (curses.KEY_BACKSPACE, 127):
user_input = user_input[:-1]
else:
try:
char = chr(key)
if char.isdigit() or char == ".":
user_input += char # Append only valid characters (digits or dots)
except ValueError:
pass # Ignore invalid inputs
ch = chr(key) if isinstance(key, int) else key
if ch.isdigit() or ch == ".":
user_input += ch
except Exception:
pass # Ignore unprintable inputs
def get_list_input(prompt: str, current_option: Optional[str], list_options: List[str]) -> Optional[str]:
from typing import List, Optional # ensure Optional is imported
def get_list_input(
prompt: str, current_option: Optional[str], list_options: List[str], mandatory: bool = False
) -> Optional[str]:
"""
Displays a scrollable list of list_options for the user to choose from.
List selector.
"""
selected_index = list_options.index(current_option) if current_option in list_options else 0
@@ -321,6 +488,7 @@ def get_list_input(prompt: str, current_option: Optional[str], list_options: Lis
start_x = (curses.COLS - width) // 2
list_win = curses.newwin(height, width, start_y, start_x)
list_win.timeout(200)
list_win.bkgd(get_color("background"))
list_win.attrset(get_color("window_frame"))
list_win.keypad(True)
@@ -328,50 +496,62 @@ def get_list_input(prompt: str, current_option: Optional[str], list_options: Lis
list_pad = curses.newpad(len(list_options) + 1, width - 8)
list_pad.bkgd(get_color("background"))
# Render header
list_win.erase()
list_win.border()
list_win.addstr(1, 2, prompt, get_color("settings_default", bold=True))
# Render options on the pad
for idx, color in enumerate(list_options):
if idx == selected_index:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
else:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
# Initial refresh
list_win.refresh()
list_pad.refresh(
0,
0,
list_win.getbegyx()[0] + 3,
list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2,
list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4,
)
max_index = len(list_options) - 1
visible_height = list_win.getmaxyx()[0] - 5
draw_arrows(list_win, visible_height, max_index, [0], show_save_option=False) # Initial call to draw arrows
def redraw_list_ui():
list_win.erase()
list_win.border()
list_win.addstr(1, 2, prompt, get_color("settings_default", bold=True))
for idx, item in enumerate(list_options):
color = get_color("settings_default", reverse=(idx == selected_index))
list_pad.addstr(idx, 0, item.ljust(width - 8), color)
list_win.refresh()
list_pad.refresh(
0,
0,
list_win.getbegyx()[0] + 3,
list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2,
list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4,
)
draw_arrows(list_win, visible_height, max_index, [0], show_save_option=False)
# Initial draw
redraw_list_ui()
while True:
key = list_win.getch()
if menu_state.need_redraw:
menu_state.need_redraw = False
redraw_list_ui()
try:
key = list_win.getch()
except curses.error:
continue
if key == curses.KEY_UP:
old_selected_index = selected_index
selected_index = max(0, selected_index - 1)
move_highlight(old_selected_index, list_options, list_win, list_pad, selected_index=selected_index)
elif key == curses.KEY_DOWN:
old_selected_index = selected_index
selected_index = min(len(list_options) - 1, selected_index + 1)
move_highlight(old_selected_index, list_options, list_win, list_pad, selected_index=selected_index)
elif key == ord("\n"): # Enter key
elif key == ord("\n"): # Enter
list_win.clear()
list_win.refresh()
menu_state.need_redraw = True
return list_options[selected_index]
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left
if mandatory:
continue
list_win.clear()
list_win.refresh()
menu_state.need_redraw = True
return current_option

View File

@@ -0,0 +1,6 @@
from contact.ui.ui_state import ChatUIState, InterfaceState, AppState, MenuState
ui_state = ChatUIState()
interface_state = InterfaceState()
app_state = AppState()
menu_state = MenuState()

View File

@@ -1,16 +1,18 @@
import contact.globals as globals
import datetime
import time
from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
def get_channels():
"""Retrieve channels from the node and update globals.channel_list and globals.all_messages."""
node = globals.interface.getNode("^local")
"""Retrieve channels from the node and update ui_state.channel_list and ui_state.all_messages."""
node = interface_state.interface.getNode("^local")
device_channels = node.channels
# Clear and rebuild channel list
# globals.channel_list = []
# ui_state.channel_list = []
for device_channel in device_channels:
if device_channel.role:
@@ -26,20 +28,20 @@ def get_channels():
].name
channel_name = convert_to_camel_case(modem_preset_string)
# Add channel to globals.channel_list if not already present
if channel_name not in globals.channel_list:
globals.channel_list.append(channel_name)
# Add channel to ui_state.channel_list if not already present
if channel_name not in ui_state.channel_list:
ui_state.channel_list.append(channel_name)
# Initialize globals.all_messages[channel_name] if it doesn't exist
if channel_name not in globals.all_messages:
globals.all_messages[channel_name] = []
# Initialize ui_state.all_messages[channel_name] if it doesn't exist
if channel_name not in ui_state.all_messages:
ui_state.all_messages[channel_name] = []
return globals.channel_list
return ui_state.channel_list
def get_node_list():
if globals.interface.nodes:
my_node_num = globals.myNodeNum
if interface_state.interface.nodes:
my_node_num = interface_state.myNodeNum
def node_sort(node):
if config.node_sort == "lastHeard":
@@ -51,7 +53,7 @@ def get_node_list():
else:
return node
sorted_nodes = sorted(globals.interface.nodes.values(), key=node_sort)
sorted_nodes = sorted(interface_state.interface.nodes.values(), key=node_sort)
# Move favorite nodes to the beginning
sorted_nodes = sorted(
@@ -68,14 +70,14 @@ def get_node_list():
def refresh_node_list():
new_node_list = get_node_list()
if new_node_list != globals.node_list:
globals.node_list = new_node_list
if new_node_list != ui_state.node_list:
ui_state.node_list = new_node_list
return True
return False
def get_nodeNum():
myinfo = globals.interface.getMyNodeInfo()
myinfo = interface_state.interface.getMyNodeInfo()
myNodeNum = myinfo["num"]
return myNodeNum
@@ -133,3 +135,31 @@ def get_time_ago(timestamp):
if unit != "s":
return f"{value} {unit} ago"
return "now"
def add_new_message(channel_id, prefix, message):
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
# Add the message
ui_state.all_messages[channel_id].append((prefix,message))

View File

@@ -0,0 +1,23 @@
validation_rules = {
"shortName": {"max_length": 4},
"longName": {"max_length": 32},
"fixed_pin": {"min_length": 6, "max_length": 6},
"position_flags": {"max_length": 3},
"enabled_protocols": {"max_value": 2},
"hop_limit": {"max_value": 7},
"latitude": {"min_value": -90, "max_value": 90},
"longitude": {"min_value": -180, "max_value": 180},
"altitude": {"min_value": -4294967295, "max_value": 4294967295},
"red": {"max_value": 255},
"green": {"max_value": 255},
"blue": {"max_value": 255},
"current": {"max_value": 255},
"position_precision": {"max_value": 32},
}
def get_validation_for(key: str) -> dict:
for rule_key, config in validation_rules.items():
if rule_key in key:
return config
return {}

View File

@@ -1,6 +1,6 @@
[project]
name = "contact"
version = "1.3.8"
version = "1.3.16"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}