forked from iarv/contact
Compare commits
60 Commits
notificati
...
confirm-un
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
640955656f | ||
|
|
8f248f4b5b | ||
|
|
c10905e954 | ||
|
|
d1b93263fa | ||
|
|
623708c2a1 | ||
|
|
9b8abdb344 | ||
|
|
8c3e00b52b | ||
|
|
81ebd1b95f | ||
|
|
ae75d85741 | ||
|
|
b6767f423e | ||
|
|
b1252fec6c | ||
|
|
43d1152074 | ||
|
|
786a7b03c5 | ||
|
|
8d111c5df7 | ||
|
|
b314a24a0c | ||
|
|
4378f3045c | ||
|
|
a451d1d7d6 | ||
|
|
fe1f027219 | ||
|
|
43435cbe04 | ||
|
|
fe98075582 | ||
|
|
8716ea6fe1 | ||
|
|
a8bdcbb7e6 | ||
|
|
02742b27f3 | ||
|
|
ae028032a0 | ||
|
|
30402f4906 | ||
|
|
324e0b03e7 | ||
|
|
056db12911 | ||
|
|
685a2d4bf8 | ||
|
|
6ed0cc8c9f | ||
|
|
fc208a9258 | ||
|
|
eaf9381bca | ||
|
|
367af9044c | ||
|
|
d8183d9009 | ||
|
|
3fb1335be3 | ||
|
|
8b05072786 | ||
|
|
4455781e6c | ||
|
|
0c8aaee415 | ||
|
|
b97d9f4649 | ||
|
|
4152fb6a21 | ||
|
|
384e36dac2 | ||
|
|
65bca84fe6 | ||
|
|
16fa2830fd | ||
|
|
c8f1da99e3 | ||
|
|
702250c329 | ||
|
|
6291082405 | ||
|
|
4fa5148664 | ||
|
|
d62ec09eea | ||
|
|
61026dcc73 | ||
|
|
1362d3a219 | ||
|
|
981d72e688 | ||
|
|
0b5ec0b3d7 | ||
|
|
cbb4ef9e34 | ||
|
|
fecd71f4b7 | ||
|
|
59edfab451 | ||
|
|
39159099e1 | ||
|
|
02e5368c61 | ||
|
|
9d234a75d8 | ||
|
|
c7edd602ec | ||
|
|
00226c5b4d | ||
|
|
243079f8eb |
@@ -33,6 +33,7 @@ By navigating to Settings -> App Settings, you may customize your UI's icons, co
|
||||
- `CTRL` + `p` = Hide/show a log of raw received packets.
|
||||
- `CTRL` + `t` = With the Node List highlighted, send a traceroute to the selected node
|
||||
- `CTRL` + `d` = With the Channel List hightlighted, archive a chat to reduce UI clutter. Messages will be saved in the db and repopulate if you send or receive a DM from this user.
|
||||
- `CTRL` + `d` = With the Note List highlghted, remove a node from your nodedb.
|
||||
- `ESC` = Exit out of the Settings Dialogue, or Quit the application if settings are not displayed.
|
||||
|
||||
### Search
|
||||
@@ -67,3 +68,11 @@ To quickly connect to localhost, use:
|
||||
```sh
|
||||
contact -t
|
||||
```
|
||||
## Install in development (editable) mode:
|
||||
```bash
|
||||
git clone https://github.com/pdxlocations/contact.git
|
||||
cd contact
|
||||
python3 -m venv .venv
|
||||
source .venv/bin/activate
|
||||
pip install -e .
|
||||
```
|
||||
|
||||
@@ -53,22 +53,21 @@ logging.basicConfig(
|
||||
|
||||
app_state.lock = threading.Lock()
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Main Program Logic
|
||||
# ------------------------------------------------------------------------------
|
||||
def prompt_region_if_unset(args: object) -> None:
|
||||
"""Prompt user to set region if it is unset."""
|
||||
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
|
||||
if confirmation == "Yes":
|
||||
set_region(interface_state.interface)
|
||||
interface_state.interface.close()
|
||||
interface_state.interface = initialize_interface(args)
|
||||
|
||||
|
||||
def initialize_globals(args) -> None:
|
||||
def initialize_globals() -> None:
|
||||
"""Initializes interface and shared globals."""
|
||||
interface_state.interface = initialize_interface(args)
|
||||
|
||||
# Prompt for region if unset
|
||||
if interface_state.interface.localNode.localConfig.lora.region == 0:
|
||||
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
|
||||
if confirmation == "Yes":
|
||||
set_region(interface_state.interface)
|
||||
interface_state.interface.close()
|
||||
interface_state.interface = initialize_interface(args)
|
||||
|
||||
interface_state.myNodeNum = get_nodeNum()
|
||||
ui_state.channel_list = get_channels()
|
||||
@@ -81,55 +80,63 @@ def initialize_globals(args) -> None:
|
||||
|
||||
def main(stdscr: curses.window) -> None:
|
||||
"""Main entry point for the curses UI."""
|
||||
|
||||
output_capture = io.StringIO()
|
||||
|
||||
try:
|
||||
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
|
||||
setup_colors()
|
||||
draw_splash(stdscr)
|
||||
setup_colors()
|
||||
draw_splash(stdscr)
|
||||
|
||||
args = setup_parser().parse_args()
|
||||
args = setup_parser().parse_args()
|
||||
|
||||
if getattr(args, "settings", False):
|
||||
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
|
||||
return
|
||||
if getattr(args, "settings", False):
|
||||
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
|
||||
return
|
||||
|
||||
logging.info("Initializing interface...")
|
||||
with app_state.lock:
|
||||
initialize_globals(args)
|
||||
logging.info("Starting main UI")
|
||||
logging.info("Initializing interface...")
|
||||
with app_state.lock:
|
||||
interface_state.interface = initialize_interface(args)
|
||||
|
||||
main_ui(stdscr)
|
||||
if interface_state.interface.localNode.localConfig.lora.region == 0:
|
||||
prompt_region_if_unset(args)
|
||||
|
||||
except Exception as e:
|
||||
console_output = output_capture.getvalue()
|
||||
logging.error("Uncaught exception: %s", e)
|
||||
logging.error("Traceback: %s", traceback.format_exc())
|
||||
logging.error("Console output:\n%s", console_output)
|
||||
initialize_globals()
|
||||
logging.info("Starting main UI")
|
||||
|
||||
try:
|
||||
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
|
||||
main_ui(stdscr)
|
||||
except Exception:
|
||||
console_output = output_capture.getvalue()
|
||||
logging.error("Uncaught exception inside main_ui")
|
||||
logging.error("Traceback:\n%s", traceback.format_exc())
|
||||
logging.error("Console output:\n%s", console_output)
|
||||
return
|
||||
|
||||
except Exception:
|
||||
raise
|
||||
|
||||
|
||||
def start() -> None:
|
||||
"""Launch curses wrapper and redirect logs to file."""
|
||||
"""Entry point for the application."""
|
||||
|
||||
if "--help" in sys.argv or "-h" in sys.argv:
|
||||
setup_parser().print_help()
|
||||
sys.exit(0)
|
||||
|
||||
with open(config.log_file_path, "a", buffering=1) as log_f:
|
||||
sys.stdout = log_f
|
||||
sys.stderr = log_f
|
||||
|
||||
with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
|
||||
try:
|
||||
curses.wrapper(main)
|
||||
except KeyboardInterrupt:
|
||||
logging.info("User exited with Ctrl+C")
|
||||
sys.exit(0)
|
||||
except Exception as e:
|
||||
logging.error("Fatal error: %s", e)
|
||||
logging.error("Traceback: %s", traceback.format_exc())
|
||||
sys.exit(1)
|
||||
try:
|
||||
curses.wrapper(main)
|
||||
except KeyboardInterrupt:
|
||||
logging.info("User exited with Ctrl+C")
|
||||
sys.exit(0)
|
||||
except Exception as e:
|
||||
logging.critical("Fatal error", exc_info=True)
|
||||
try:
|
||||
curses.endwin()
|
||||
except Exception:
|
||||
pass
|
||||
print("Fatal error:", e)
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import time
|
||||
from datetime import datetime
|
||||
import shutil
|
||||
import subprocess
|
||||
from typing import Any, Dict
|
||||
|
||||
from contact.utilities.utils import refresh_node_list
|
||||
from contact.utilities.utils import (
|
||||
refresh_node_list,
|
||||
add_new_message,
|
||||
)
|
||||
from contact.ui.contact_ui import (
|
||||
draw_packetlog_win,
|
||||
draw_node_list,
|
||||
@@ -25,12 +28,43 @@ from contact.utilities.singleton import ui_state, interface_state, app_state
|
||||
|
||||
|
||||
def play_sound():
|
||||
if platform.system() == "Darwin": # macOS
|
||||
os.system("afplay /System/Library/Sounds/Ping.aiff")
|
||||
elif platform.system() == "Linux":
|
||||
os.system("paplay /usr/share/sounds/freedesktop/stereo/complete.oga")
|
||||
else:
|
||||
print("\a") # fallback
|
||||
try:
|
||||
system = platform.system()
|
||||
sound_path = None
|
||||
executable = None
|
||||
|
||||
if system == "Darwin": # macOS
|
||||
sound_path = "/System/Library/Sounds/Ping.aiff"
|
||||
executable = "afplay"
|
||||
|
||||
elif system == "Linux":
|
||||
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
|
||||
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
|
||||
|
||||
if shutil.which("paplay") and os.path.exists(ogg_path):
|
||||
executable = "paplay"
|
||||
sound_path = ogg_path
|
||||
elif shutil.which("ffplay") and os.path.exists(ogg_path):
|
||||
executable = "ffplay"
|
||||
sound_path = ogg_path
|
||||
elif shutil.which("aplay") and os.path.exists(wav_path):
|
||||
executable = "aplay"
|
||||
sound_path = wav_path
|
||||
else:
|
||||
logging.warning("No suitable sound player or sound file found on Linux")
|
||||
|
||||
if executable and sound_path:
|
||||
cmd = [executable, sound_path]
|
||||
if executable == "ffplay":
|
||||
cmd = [executable, "-nodisp", "-autoexit", sound_path]
|
||||
|
||||
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||
return
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
logging.error(f"Sound playback failed: {e}")
|
||||
except Exception as e:
|
||||
logging.error(f"Unexpected error: {e}")
|
||||
|
||||
|
||||
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
@@ -64,7 +98,9 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
maybe_store_nodeinfo_in_db(packet)
|
||||
|
||||
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
|
||||
play_sound()
|
||||
|
||||
if config.notification_sound == "True":
|
||||
play_sound()
|
||||
|
||||
message_bytes = packet["decoded"]["payload"]
|
||||
message_string = message_bytes.decode("utf-8")
|
||||
@@ -89,7 +125,9 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
|
||||
channel_number = ui_state.channel_list.index(packet["from"])
|
||||
|
||||
if ui_state.channel_list[channel_number] != ui_state.channel_list[ui_state.selected_channel]:
|
||||
channel_id = ui_state.channel_list[channel_number]
|
||||
|
||||
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
|
||||
add_notification(channel_number)
|
||||
refresh_channels = True
|
||||
else:
|
||||
@@ -99,40 +137,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
message_from_id = packet["from"]
|
||||
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
|
||||
|
||||
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
|
||||
|
||||
# Timestamp handling
|
||||
current_timestamp = time.time()
|
||||
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
|
||||
|
||||
# Retrieve the last timestamp if available
|
||||
channel_messages = ui_state.all_messages[ui_state.channel_list[channel_number]]
|
||||
if channel_messages:
|
||||
# Check the last entry for a timestamp
|
||||
for entry in reversed(channel_messages):
|
||||
if entry[0].startswith("--"):
|
||||
last_hour = entry[0].strip("- ").strip()
|
||||
break
|
||||
else:
|
||||
last_hour = None
|
||||
else:
|
||||
last_hour = None
|
||||
|
||||
# Add a new timestamp if it's a new hour
|
||||
if last_hour != current_hour:
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
|
||||
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
|
||||
(f"{config.message_prefix} {message_from_string} ", message_string)
|
||||
)
|
||||
add_new_message(channel_id, f"{config.message_prefix} {message_from_string} ", message_string)
|
||||
|
||||
if refresh_channels:
|
||||
draw_channel_list()
|
||||
if refresh_messages:
|
||||
draw_messages_window(True)
|
||||
|
||||
save_message_to_db(ui_state.channel_list[channel_number], message_from_id, message_string)
|
||||
save_message_to_db(channel_id, message_from_id, message_string)
|
||||
|
||||
except KeyError as e:
|
||||
logging.error(f"Error processing packet: {e}")
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict
|
||||
|
||||
import google.protobuf.json_format
|
||||
@@ -16,6 +15,8 @@ import contact.ui.default_config as config
|
||||
|
||||
from contact.utilities.singleton import ui_state, interface_state
|
||||
|
||||
from contact.utilities.utils import add_new_message
|
||||
|
||||
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
|
||||
|
||||
|
||||
@@ -146,8 +147,9 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
|
||||
update_node_info_in_db(packet["from"], chat_archived=False)
|
||||
|
||||
channel_number = ui_state.channel_list.index(packet["from"])
|
||||
channel_id = ui_state.channel_list[channel_number]
|
||||
|
||||
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
|
||||
if channel_id == ui_state.channel_list[ui_state.selected_channel]:
|
||||
refresh_messages = True
|
||||
else:
|
||||
add_notification(channel_number)
|
||||
@@ -155,18 +157,14 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
|
||||
|
||||
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
|
||||
|
||||
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
|
||||
(f"{config.message_prefix} {message_from_string}", msg_str)
|
||||
)
|
||||
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str)
|
||||
|
||||
if refresh_channels:
|
||||
draw_channel_list()
|
||||
if refresh_messages:
|
||||
draw_messages_window(True)
|
||||
save_message_to_db(ui_state.channel_list[channel_number], packet["from"], msg_str)
|
||||
|
||||
save_message_to_db(channel_id, packet["from"], msg_str)
|
||||
|
||||
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
|
||||
"""
|
||||
@@ -190,32 +188,7 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
|
||||
channelIndex=send_on_channel,
|
||||
)
|
||||
|
||||
# Add sent message to the messages dictionary
|
||||
if channel_id not in ui_state.all_messages:
|
||||
ui_state.all_messages[channel_id] = []
|
||||
|
||||
# Handle timestamp logic
|
||||
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
|
||||
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
|
||||
|
||||
# Retrieve the last timestamp if available
|
||||
channel_messages = ui_state.all_messages[channel_id]
|
||||
if channel_messages:
|
||||
# Check the last entry for a timestamp
|
||||
for entry in reversed(channel_messages):
|
||||
if entry[0].startswith("--"):
|
||||
last_hour = entry[0].strip("- ").strip()
|
||||
break
|
||||
else:
|
||||
last_hour = None
|
||||
else:
|
||||
last_hour = None
|
||||
|
||||
# Add a new timestamp if it's a new hour
|
||||
if last_hour != current_hour:
|
||||
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
|
||||
|
||||
ui_state.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
|
||||
add_new_message(channel_id, config.sent_message_prefix + config.ack_unknown_str + ": ", message)
|
||||
|
||||
timestamp = save_message_to_db(channel_id, myid, message)
|
||||
|
||||
@@ -230,10 +203,14 @@ def send_traceroute() -> None:
|
||||
"""
|
||||
Sends a RouteDiscovery protobuf to the selected node.
|
||||
"""
|
||||
|
||||
channel_id = ui_state.node_list[ui_state.selected_node]
|
||||
add_new_message(channel_id, f"{config.message_prefix} Sent Traceroute", "")
|
||||
|
||||
r = mesh_pb2.RouteDiscovery()
|
||||
interface_state.interface.sendData(
|
||||
r,
|
||||
destinationId=ui_state.node_list[ui_state.selected_node],
|
||||
destinationId=channel_id,
|
||||
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
|
||||
wantResponse=True,
|
||||
onResponse=on_response_traceroute,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import curses
|
||||
import textwrap
|
||||
import logging
|
||||
import time
|
||||
import traceback
|
||||
from typing import Union
|
||||
|
||||
@@ -12,28 +12,36 @@ from contact.utilities.db_handler import get_name_from_database, update_node_inf
|
||||
from contact.utilities.input_handlers import get_list_input
|
||||
import contact.ui.default_config as config
|
||||
import contact.ui.dialog
|
||||
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines
|
||||
from contact.utilities.singleton import ui_state, interface_state
|
||||
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
|
||||
from contact.utilities.singleton import ui_state, interface_state, menu_state
|
||||
|
||||
|
||||
def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
|
||||
"""Handle terminal resize events and redraw the UI accordingly."""
|
||||
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, function_win, packetlog_win, entry_win
|
||||
|
||||
# Calculate window max dimensions
|
||||
height, width = stdscr.getmaxyx()
|
||||
|
||||
# Define window dimensions and positions
|
||||
channel_width = 3 * (width // 16)
|
||||
nodes_width = 5 * (width // 16)
|
||||
channel_width = int(config.channel_list_16ths) * (width // 16)
|
||||
nodes_width = int(config.node_list_16ths) * (width // 16)
|
||||
messages_width = width - channel_width - nodes_width
|
||||
|
||||
entry_height = 3
|
||||
function_height = 3
|
||||
y_pad = entry_height + function_height
|
||||
packet_log_height = int(height / 3)
|
||||
|
||||
if firstrun:
|
||||
entry_win = curses.newwin(3, width, 0, 0)
|
||||
channel_win = curses.newwin(height - 6, channel_width, 3, 0)
|
||||
messages_win = curses.newwin(height - 6, messages_width, 3, channel_width)
|
||||
nodes_win = curses.newwin(height - 6, nodes_width, 3, channel_width + messages_width)
|
||||
function_win = curses.newwin(3, width, height - 3, 0)
|
||||
packetlog_win = curses.newwin(int(height / 3), messages_width, height - int(height / 3) - 3, channel_width)
|
||||
entry_win = curses.newwin(entry_height, width, 0, 0)
|
||||
channel_win = curses.newwin(height - y_pad, channel_width, entry_height, 0)
|
||||
messages_win = curses.newwin(height - y_pad, messages_width, entry_height, channel_width)
|
||||
nodes_win = curses.newwin(height - y_pad, nodes_width, entry_height, channel_width + messages_width)
|
||||
function_win = curses.newwin(function_height, width, height - function_height, 0)
|
||||
packetlog_win = curses.newwin(
|
||||
packet_log_height, messages_width, height - packet_log_height - function_height, channel_width
|
||||
)
|
||||
|
||||
# Will be resized to what we need when drawn
|
||||
messages_pad = curses.newpad(1, 1)
|
||||
@@ -58,19 +66,19 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
|
||||
|
||||
entry_win.resize(3, width)
|
||||
|
||||
channel_win.resize(height - 6, channel_width)
|
||||
channel_win.resize(height - y_pad, channel_width)
|
||||
|
||||
messages_win.resize(height - 6, messages_width)
|
||||
messages_win.resize(height - y_pad, messages_width)
|
||||
messages_win.mvwin(3, channel_width)
|
||||
|
||||
nodes_win.resize(height - 6, nodes_width)
|
||||
nodes_win.mvwin(3, channel_width + messages_width)
|
||||
nodes_win.resize(height - y_pad, nodes_width)
|
||||
nodes_win.mvwin(entry_height, channel_width + messages_width)
|
||||
|
||||
function_win.resize(3, width)
|
||||
function_win.mvwin(height - 3, 0)
|
||||
function_win.mvwin(height - function_height, 0)
|
||||
|
||||
packetlog_win.resize(int(height / 3), messages_width)
|
||||
packetlog_win.mvwin(height - int(height / 3) - 3, channel_width)
|
||||
packetlog_win.resize(packet_log_height, messages_width)
|
||||
packetlog_win.mvwin(height - packet_log_height - function_height, channel_width)
|
||||
|
||||
# Draw window borders
|
||||
for win in [channel_win, entry_win, nodes_win, messages_win, function_win]:
|
||||
@@ -93,6 +101,7 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
|
||||
|
||||
|
||||
def main_ui(stdscr: curses.window) -> None:
|
||||
"""Main UI loop for the curses interface."""
|
||||
global input_text
|
||||
input_text = ""
|
||||
stdscr.keypad(True)
|
||||
@@ -100,7 +109,7 @@ def main_ui(stdscr: curses.window) -> None:
|
||||
handle_resize(stdscr, True)
|
||||
|
||||
while True:
|
||||
draw_text_field(entry_win, f"Input: {input_text[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
|
||||
draw_text_field(entry_win, f"Input: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
|
||||
|
||||
# Get user input from entry window
|
||||
char = entry_win.get_wch()
|
||||
@@ -108,307 +117,59 @@ def main_ui(stdscr: curses.window) -> None:
|
||||
# draw_debug(f"Keypress: {char}")
|
||||
|
||||
if char == curses.KEY_UP:
|
||||
if ui_state.current_window == 0:
|
||||
scroll_channels(-1)
|
||||
elif ui_state.current_window == 1:
|
||||
scroll_messages(-1)
|
||||
elif ui_state.current_window == 2:
|
||||
scroll_nodes(-1)
|
||||
handle_up()
|
||||
|
||||
elif char == curses.KEY_DOWN:
|
||||
if ui_state.current_window == 0:
|
||||
scroll_channels(1)
|
||||
elif ui_state.current_window == 1:
|
||||
scroll_messages(1)
|
||||
elif ui_state.current_window == 2:
|
||||
scroll_nodes(1)
|
||||
handle_down()
|
||||
|
||||
elif char == curses.KEY_HOME:
|
||||
if ui_state.current_window == 0:
|
||||
select_channel(0)
|
||||
elif ui_state.current_window == 1:
|
||||
ui_state.selected_message = 0
|
||||
refresh_pad(1)
|
||||
elif ui_state.current_window == 2:
|
||||
select_node(0)
|
||||
handle_home()
|
||||
|
||||
elif char == curses.KEY_END:
|
||||
if ui_state.current_window == 0:
|
||||
select_channel(len(ui_state.channel_list) - 1)
|
||||
elif ui_state.current_window == 1:
|
||||
msg_line_count = messages_pad.getmaxyx()[0]
|
||||
ui_state.selected_message = max(msg_line_count - get_msg_window_lines(messages_win, packetlog_win), 0)
|
||||
refresh_pad(1)
|
||||
elif ui_state.current_window == 2:
|
||||
select_node(len(ui_state.node_list) - 1)
|
||||
handle_end()
|
||||
|
||||
elif char == curses.KEY_PPAGE:
|
||||
if ui_state.current_window == 0:
|
||||
select_channel(
|
||||
ui_state.selected_channel - (channel_win.getmaxyx()[0] - 2)
|
||||
) # select_channel will bounds check for us
|
||||
elif ui_state.current_window == 1:
|
||||
ui_state.selected_message = max(
|
||||
ui_state.selected_message - get_msg_window_lines(messages_win, packetlog_win), 0
|
||||
)
|
||||
refresh_pad(1)
|
||||
elif ui_state.current_window == 2:
|
||||
select_node(
|
||||
ui_state.selected_node - (nodes_win.getmaxyx()[0] - 2)
|
||||
) # select_node will bounds check for us
|
||||
handle_pageup()
|
||||
|
||||
elif char == curses.KEY_NPAGE:
|
||||
if ui_state.current_window == 0:
|
||||
select_channel(
|
||||
ui_state.selected_channel + (channel_win.getmaxyx()[0] - 2)
|
||||
) # select_channel will bounds check for us
|
||||
elif ui_state.current_window == 1:
|
||||
msg_line_count = messages_pad.getmaxyx()[0]
|
||||
ui_state.selected_message = min(
|
||||
ui_state.selected_message + get_msg_window_lines(messages_win, packetlog_win),
|
||||
msg_line_count - get_msg_window_lines(messages_win, packetlog_win),
|
||||
)
|
||||
refresh_pad(1)
|
||||
elif ui_state.current_window == 2:
|
||||
select_node(
|
||||
ui_state.selected_node + (nodes_win.getmaxyx()[0] - 2)
|
||||
) # select_node will bounds check for us
|
||||
handle_pagedown()
|
||||
|
||||
elif char == curses.KEY_LEFT or char == curses.KEY_RIGHT:
|
||||
delta = -1 if char == curses.KEY_LEFT else 1
|
||||
|
||||
old_window = ui_state.current_window
|
||||
ui_state.current_window = (ui_state.current_window + delta) % 3
|
||||
|
||||
if old_window == 0:
|
||||
channel_win.attrset(get_color("window_frame"))
|
||||
channel_win.box()
|
||||
channel_win.refresh()
|
||||
refresh_pad(0)
|
||||
if old_window == 1:
|
||||
messages_win.attrset(get_color("window_frame"))
|
||||
messages_win.box()
|
||||
messages_win.refresh()
|
||||
refresh_pad(1)
|
||||
elif old_window == 2:
|
||||
draw_function_win()
|
||||
nodes_win.attrset(get_color("window_frame"))
|
||||
nodes_win.box()
|
||||
nodes_win.refresh()
|
||||
refresh_pad(2)
|
||||
|
||||
if ui_state.current_window == 0:
|
||||
channel_win.attrset(get_color("window_frame_selected"))
|
||||
channel_win.box()
|
||||
channel_win.attrset(get_color("window_frame"))
|
||||
channel_win.refresh()
|
||||
refresh_pad(0)
|
||||
elif ui_state.current_window == 1:
|
||||
messages_win.attrset(get_color("window_frame_selected"))
|
||||
messages_win.box()
|
||||
messages_win.attrset(get_color("window_frame"))
|
||||
messages_win.refresh()
|
||||
refresh_pad(1)
|
||||
elif ui_state.current_window == 2:
|
||||
draw_function_win()
|
||||
nodes_win.attrset(get_color("window_frame_selected"))
|
||||
nodes_win.box()
|
||||
nodes_win.attrset(get_color("window_frame"))
|
||||
nodes_win.refresh()
|
||||
refresh_pad(2)
|
||||
|
||||
# Check for Esc
|
||||
elif char == chr(27):
|
||||
break
|
||||
|
||||
# Check for Ctrl + t
|
||||
elif char == chr(20):
|
||||
send_traceroute()
|
||||
curses.curs_set(0) # Hide cursor
|
||||
contact.ui.dialog.dialog(
|
||||
stdscr,
|
||||
"Traceroute Sent",
|
||||
"Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.",
|
||||
)
|
||||
curses.curs_set(1) # Show cursor again
|
||||
handle_resize(stdscr, False)
|
||||
handle_leftright(char)
|
||||
|
||||
elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)):
|
||||
if ui_state.current_window == 2:
|
||||
node_list = ui_state.node_list
|
||||
if node_list[ui_state.selected_node] not in ui_state.channel_list:
|
||||
ui_state.channel_list.append(node_list[ui_state.selected_node])
|
||||
if node_list[ui_state.selected_node] not in ui_state.all_messages:
|
||||
ui_state.all_messages[node_list[ui_state.selected_node]] = []
|
||||
input_text = handle_enter(input_text)
|
||||
|
||||
ui_state.selected_channel = ui_state.channel_list.index(node_list[ui_state.selected_node])
|
||||
|
||||
if is_chat_archived(ui_state.channel_list[ui_state.selected_channel]):
|
||||
update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=False)
|
||||
|
||||
ui_state.selected_node = 0
|
||||
ui_state.current_window = 0
|
||||
|
||||
draw_node_list()
|
||||
draw_channel_list()
|
||||
draw_messages_window(True)
|
||||
|
||||
elif len(input_text) > 0:
|
||||
# Enter key pressed, send user input as message
|
||||
send_message(input_text, channel=ui_state.selected_channel)
|
||||
draw_messages_window(True)
|
||||
|
||||
# Clear entry window and reset input text
|
||||
input_text = ""
|
||||
entry_win.erase()
|
||||
elif char == chr(20): # Ctrl + t for Traceroute
|
||||
handle_ctrl_t(stdscr)
|
||||
|
||||
elif char in (curses.KEY_BACKSPACE, chr(127)):
|
||||
if input_text:
|
||||
input_text = input_text[:-1]
|
||||
y, x = entry_win.getyx()
|
||||
entry_win.move(y, x - 1)
|
||||
entry_win.addch(" ") #
|
||||
entry_win.move(y, x - 1)
|
||||
entry_win.refresh()
|
||||
input_text = handle_backspace(entry_win, input_text)
|
||||
|
||||
elif char == "`": # ` Launch the settings interface
|
||||
curses.curs_set(0)
|
||||
settings_menu(stdscr, interface_state.interface)
|
||||
curses.curs_set(1)
|
||||
refresh_node_list()
|
||||
handle_resize(stdscr, False)
|
||||
handle_backtick(stdscr)
|
||||
|
||||
elif char == chr(16):
|
||||
# Display packet log
|
||||
if ui_state.display_log is False:
|
||||
ui_state.display_log = True
|
||||
draw_messages_window(True)
|
||||
else:
|
||||
ui_state.display_log = False
|
||||
packetlog_win.erase()
|
||||
draw_messages_window(True)
|
||||
elif char == chr(16): # Ctrl + P for Packet Log
|
||||
handle_ctrl_p()
|
||||
|
||||
elif char == curses.KEY_RESIZE:
|
||||
input_text = ""
|
||||
handle_resize(stdscr, False)
|
||||
|
||||
# ^D
|
||||
elif char == chr(4):
|
||||
if ui_state.current_window == 0:
|
||||
if isinstance(ui_state.channel_list[ui_state.selected_channel], int):
|
||||
update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=True)
|
||||
elif char == chr(4): # Ctrl + D to delete current channel or node
|
||||
handle_ctrl_d()
|
||||
|
||||
# Shift notifications up to account for deleted item
|
||||
for i in range(len(ui_state.notifications)):
|
||||
if ui_state.notifications[i] > ui_state.selected_channel:
|
||||
ui_state.notifications[i] -= 1
|
||||
elif char == chr(31): # Ctrl + / to search
|
||||
handle_ctrl_fslash()
|
||||
|
||||
del ui_state.channel_list[ui_state.selected_channel]
|
||||
ui_state.selected_channel = min(ui_state.selected_channel, len(ui_state.channel_list) - 1)
|
||||
select_channel(ui_state.selected_channel)
|
||||
draw_channel_list()
|
||||
draw_messages_window()
|
||||
elif char == chr(6): # Ctrl + F to toggle favorite
|
||||
handle_ctrl_f(stdscr)
|
||||
|
||||
if ui_state.current_window == 2:
|
||||
curses.curs_set(0)
|
||||
confirmation = get_list_input(
|
||||
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from nodedb?",
|
||||
"No",
|
||||
["Yes", "No"],
|
||||
)
|
||||
if confirmation == "Yes":
|
||||
interface_state.interface.localNode.removeNode(ui_state.node_list[ui_state.selected_node])
|
||||
elif char == chr(7): # Ctrl + G to toggle ignored
|
||||
handle_ctlr_g(stdscr)
|
||||
|
||||
# Directly modifying the interface from client code - good? Bad? If it's stupid but it works, it's not supid?
|
||||
del interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
|
||||
|
||||
# Convert to "!hex" representation that interface.nodes uses
|
||||
hexid = f"!{hex(ui_state.node_list[ui_state.selected_node])[2:]}"
|
||||
del interface_state.interface.nodes[hexid]
|
||||
|
||||
ui_state.node_list.pop(ui_state.selected_node)
|
||||
|
||||
draw_messages_window()
|
||||
draw_node_list()
|
||||
else:
|
||||
draw_messages_window()
|
||||
curses.curs_set(1)
|
||||
continue
|
||||
|
||||
# ^/
|
||||
elif char == chr(31):
|
||||
if ui_state.current_window == 2 or ui_state.current_window == 0:
|
||||
search(ui_state.current_window)
|
||||
|
||||
# ^F
|
||||
elif char == chr(6):
|
||||
if ui_state.current_window == 2:
|
||||
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
|
||||
|
||||
curses.curs_set(0)
|
||||
|
||||
if "isFavorite" not in selectedNode or selectedNode["isFavorite"] == False:
|
||||
confirmation = get_list_input(
|
||||
f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Favorite?",
|
||||
None,
|
||||
["Yes", "No"],
|
||||
)
|
||||
if confirmation == "Yes":
|
||||
interface_state.interface.localNode.setFavorite(ui_state.node_list[ui_state.selected_node])
|
||||
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
|
||||
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
|
||||
"isFavorite"
|
||||
] = True
|
||||
|
||||
refresh_node_list()
|
||||
|
||||
else:
|
||||
confirmation = get_list_input(
|
||||
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Favorites?",
|
||||
None,
|
||||
["Yes", "No"],
|
||||
)
|
||||
if confirmation == "Yes":
|
||||
interface_state.interface.localNode.removeFavorite(ui_state.node_list[ui_state.selected_node])
|
||||
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
|
||||
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
|
||||
"isFavorite"
|
||||
] = False
|
||||
|
||||
refresh_node_list()
|
||||
|
||||
handle_resize(stdscr, False)
|
||||
|
||||
elif char == chr(7):
|
||||
if ui_state.current_window == 2:
|
||||
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
|
||||
|
||||
curses.curs_set(0)
|
||||
|
||||
if "isIgnored" not in selectedNode or selectedNode["isIgnored"] == False:
|
||||
confirmation = get_list_input(
|
||||
f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Ignored?",
|
||||
"No",
|
||||
["Yes", "No"],
|
||||
)
|
||||
if confirmation == "Yes":
|
||||
interface_state.interface.localNode.setIgnored(ui_state.node_list[ui_state.selected_node])
|
||||
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
|
||||
"isIgnored"
|
||||
] = True
|
||||
else:
|
||||
confirmation = get_list_input(
|
||||
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Ignored?",
|
||||
"No",
|
||||
["Yes", "No"],
|
||||
)
|
||||
if confirmation == "Yes":
|
||||
interface_state.interface.localNode.removeIgnored(ui_state.node_list[ui_state.selected_node])
|
||||
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]][
|
||||
"isIgnored"
|
||||
] = False
|
||||
|
||||
handle_resize(stdscr, False)
|
||||
elif char == chr(27): # Escape to exit
|
||||
break
|
||||
|
||||
else:
|
||||
# Append typed character to input text
|
||||
@@ -418,7 +179,326 @@ def main_ui(stdscr: curses.window) -> None:
|
||||
input_text += chr(char)
|
||||
|
||||
|
||||
def handle_up() -> None:
|
||||
"""Handle key up events to scroll the current window."""
|
||||
if ui_state.current_window == 0:
|
||||
scroll_channels(-1)
|
||||
elif ui_state.current_window == 1:
|
||||
scroll_messages(-1)
|
||||
elif ui_state.current_window == 2:
|
||||
scroll_nodes(-1)
|
||||
|
||||
|
||||
def handle_down() -> None:
|
||||
"""Handle key down events to scroll the current window."""
|
||||
if ui_state.current_window == 0:
|
||||
scroll_channels(1)
|
||||
elif ui_state.current_window == 1:
|
||||
scroll_messages(1)
|
||||
elif ui_state.current_window == 2:
|
||||
scroll_nodes(1)
|
||||
|
||||
|
||||
def handle_home() -> None:
|
||||
"""Handle home key events to select the first item in the current window."""
|
||||
if ui_state.current_window == 0:
|
||||
select_channel(0)
|
||||
elif ui_state.current_window == 1:
|
||||
ui_state.selected_message = 0
|
||||
refresh_pad(1)
|
||||
elif ui_state.current_window == 2:
|
||||
select_node(0)
|
||||
|
||||
|
||||
def handle_end() -> None:
|
||||
"""Handle end key events to select the last item in the current window."""
|
||||
if ui_state.current_window == 0:
|
||||
select_channel(len(ui_state.channel_list) - 1)
|
||||
elif ui_state.current_window == 1:
|
||||
msg_line_count = messages_pad.getmaxyx()[0]
|
||||
ui_state.selected_message = max(msg_line_count - get_msg_window_lines(messages_win, packetlog_win), 0)
|
||||
refresh_pad(1)
|
||||
elif ui_state.current_window == 2:
|
||||
select_node(len(ui_state.node_list) - 1)
|
||||
|
||||
|
||||
def handle_pageup() -> None:
|
||||
"""Handle page up key events to scroll the current window by a page."""
|
||||
if ui_state.current_window == 0:
|
||||
select_channel(
|
||||
ui_state.selected_channel - (channel_win.getmaxyx()[0] - 2)
|
||||
) # select_channel will bounds check for us
|
||||
elif ui_state.current_window == 1:
|
||||
ui_state.selected_message = max(
|
||||
ui_state.selected_message - get_msg_window_lines(messages_win, packetlog_win), 0
|
||||
)
|
||||
refresh_pad(1)
|
||||
elif ui_state.current_window == 2:
|
||||
select_node(ui_state.selected_node - (nodes_win.getmaxyx()[0] - 2)) # select_node will bounds check for us
|
||||
|
||||
|
||||
def handle_pagedown() -> None:
|
||||
"""Handle page down key events to scroll the current window down."""
|
||||
if ui_state.current_window == 0:
|
||||
select_channel(
|
||||
ui_state.selected_channel + (channel_win.getmaxyx()[0] - 2)
|
||||
) # select_channel will bounds check for us
|
||||
elif ui_state.current_window == 1:
|
||||
msg_line_count = messages_pad.getmaxyx()[0]
|
||||
ui_state.selected_message = min(
|
||||
ui_state.selected_message + get_msg_window_lines(messages_win, packetlog_win),
|
||||
msg_line_count - get_msg_window_lines(messages_win, packetlog_win),
|
||||
)
|
||||
refresh_pad(1)
|
||||
elif ui_state.current_window == 2:
|
||||
select_node(ui_state.selected_node + (nodes_win.getmaxyx()[0] - 2)) # select_node will bounds check for us
|
||||
|
||||
|
||||
def handle_leftright(char: int) -> None:
|
||||
"""Handle left/right key events to switch between windows."""
|
||||
delta = -1 if char == curses.KEY_LEFT else 1
|
||||
old_window = ui_state.current_window
|
||||
ui_state.current_window = (ui_state.current_window + delta) % 3
|
||||
|
||||
if old_window == 0:
|
||||
channel_win.attrset(get_color("window_frame"))
|
||||
channel_win.box()
|
||||
channel_win.refresh()
|
||||
refresh_pad(0)
|
||||
if old_window == 1:
|
||||
messages_win.attrset(get_color("window_frame"))
|
||||
messages_win.box()
|
||||
messages_win.refresh()
|
||||
refresh_pad(1)
|
||||
elif old_window == 2:
|
||||
draw_function_win()
|
||||
nodes_win.attrset(get_color("window_frame"))
|
||||
nodes_win.box()
|
||||
nodes_win.refresh()
|
||||
refresh_pad(2)
|
||||
|
||||
if ui_state.current_window == 0:
|
||||
channel_win.attrset(get_color("window_frame_selected"))
|
||||
channel_win.box()
|
||||
channel_win.attrset(get_color("window_frame"))
|
||||
channel_win.refresh()
|
||||
refresh_pad(0)
|
||||
elif ui_state.current_window == 1:
|
||||
messages_win.attrset(get_color("window_frame_selected"))
|
||||
messages_win.box()
|
||||
messages_win.attrset(get_color("window_frame"))
|
||||
messages_win.refresh()
|
||||
refresh_pad(1)
|
||||
elif ui_state.current_window == 2:
|
||||
draw_function_win()
|
||||
nodes_win.attrset(get_color("window_frame_selected"))
|
||||
nodes_win.box()
|
||||
nodes_win.attrset(get_color("window_frame"))
|
||||
nodes_win.refresh()
|
||||
refresh_pad(2)
|
||||
|
||||
|
||||
def handle_enter(input_text: str) -> str:
|
||||
"""Handle Enter key events to send messages or select channels."""
|
||||
if ui_state.current_window == 2:
|
||||
node_list = ui_state.node_list
|
||||
if node_list[ui_state.selected_node] not in ui_state.channel_list:
|
||||
ui_state.channel_list.append(node_list[ui_state.selected_node])
|
||||
if node_list[ui_state.selected_node] not in ui_state.all_messages:
|
||||
ui_state.all_messages[node_list[ui_state.selected_node]] = []
|
||||
|
||||
ui_state.selected_channel = ui_state.channel_list.index(node_list[ui_state.selected_node])
|
||||
|
||||
if is_chat_archived(ui_state.channel_list[ui_state.selected_channel]):
|
||||
update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=False)
|
||||
|
||||
ui_state.selected_node = 0
|
||||
ui_state.current_window = 0
|
||||
|
||||
draw_node_list()
|
||||
draw_channel_list()
|
||||
draw_messages_window(True)
|
||||
return input_text
|
||||
|
||||
elif len(input_text) > 0:
|
||||
# TODO: This is a hack to prevent sending messages too quickly. Let's get errors from the node.
|
||||
now = time.monotonic()
|
||||
if now - ui_state.last_sent_time < 2.5:
|
||||
contact.ui.dialog.dialog("Slow down", "Please wait 2 seconds between messages.")
|
||||
return input_text
|
||||
# Enter key pressed, send user input as message
|
||||
send_message(input_text, channel=ui_state.selected_channel)
|
||||
draw_messages_window(True)
|
||||
ui_state.last_sent_time = now
|
||||
# Clear entry window and reset input text
|
||||
entry_win.erase()
|
||||
return ""
|
||||
return input_text
|
||||
|
||||
|
||||
def handle_ctrl_t(stdscr: curses.window) -> None:
|
||||
"""Handle Ctrl + T key events to send a traceroute."""
|
||||
send_traceroute()
|
||||
curses.curs_set(0) # Hide cursor
|
||||
contact.ui.dialog.dialog(
|
||||
f"Traceroute Sent To: {get_name_from_database(ui_state.node_list[ui_state.selected_node])}",
|
||||
"Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.",
|
||||
)
|
||||
curses.curs_set(1) # Show cursor again
|
||||
handle_resize(stdscr, False)
|
||||
|
||||
|
||||
def handle_backspace(entry_win: curses.window, input_text: str) -> str:
|
||||
"""Handle backspace key events to remove the last character from input text."""
|
||||
if input_text:
|
||||
input_text = input_text[:-1]
|
||||
y, x = entry_win.getyx()
|
||||
entry_win.move(y, x - 1)
|
||||
entry_win.addch(" ") #
|
||||
entry_win.move(y, x - 1)
|
||||
entry_win.refresh()
|
||||
return input_text
|
||||
|
||||
|
||||
def handle_backtick(stdscr: curses.window) -> None:
|
||||
"""Handle backtick key events to open the settings menu."""
|
||||
curses.curs_set(0)
|
||||
previous_window = ui_state.current_window
|
||||
ui_state.current_window = 4
|
||||
settings_menu(stdscr, interface_state.interface)
|
||||
ui_state.current_window = previous_window
|
||||
curses.curs_set(1)
|
||||
refresh_node_list()
|
||||
handle_resize(stdscr, False)
|
||||
|
||||
|
||||
def handle_ctrl_p() -> None:
|
||||
"""Handle Ctrl + P key events to toggle the packet log display."""
|
||||
# Display packet log
|
||||
if ui_state.display_log is False:
|
||||
ui_state.display_log = True
|
||||
draw_messages_window(True)
|
||||
else:
|
||||
ui_state.display_log = False
|
||||
packetlog_win.erase()
|
||||
draw_messages_window(True)
|
||||
|
||||
|
||||
def handle_ctrl_d() -> None:
|
||||
if ui_state.current_window == 0:
|
||||
if isinstance(ui_state.channel_list[ui_state.selected_channel], int):
|
||||
update_node_info_in_db(ui_state.channel_list[ui_state.selected_channel], chat_archived=True)
|
||||
|
||||
# Shift notifications up to account for deleted item
|
||||
for i in range(len(ui_state.notifications)):
|
||||
if ui_state.notifications[i] > ui_state.selected_channel:
|
||||
ui_state.notifications[i] -= 1
|
||||
|
||||
del ui_state.channel_list[ui_state.selected_channel]
|
||||
ui_state.selected_channel = min(ui_state.selected_channel, len(ui_state.channel_list) - 1)
|
||||
select_channel(ui_state.selected_channel)
|
||||
draw_channel_list()
|
||||
draw_messages_window()
|
||||
|
||||
if ui_state.current_window == 2:
|
||||
curses.curs_set(0)
|
||||
confirmation = get_list_input(
|
||||
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from nodedb?",
|
||||
"No",
|
||||
["Yes", "No"],
|
||||
)
|
||||
if confirmation == "Yes":
|
||||
interface_state.interface.localNode.removeNode(ui_state.node_list[ui_state.selected_node])
|
||||
|
||||
# Directly modifying the interface from client code - good? Bad? If it's stupid but it works, it's not supid?
|
||||
del interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
|
||||
|
||||
# Convert to "!hex" representation that interface.nodes uses
|
||||
hexid = f"!{hex(ui_state.node_list[ui_state.selected_node])[2:]}"
|
||||
del interface_state.interface.nodes[hexid]
|
||||
|
||||
ui_state.node_list.pop(ui_state.selected_node)
|
||||
|
||||
draw_messages_window()
|
||||
draw_node_list()
|
||||
else:
|
||||
draw_messages_window()
|
||||
curses.curs_set(1)
|
||||
|
||||
|
||||
def handle_ctrl_fslash() -> None:
|
||||
"""Handle Ctrl + / key events to search in the current window."""
|
||||
if ui_state.current_window == 2 or ui_state.current_window == 0:
|
||||
search(ui_state.current_window)
|
||||
|
||||
|
||||
def handle_ctrl_f(stdscr: curses.window) -> None:
|
||||
"""Handle Ctrl + F key events to toggle favorite status of the selected node."""
|
||||
if ui_state.current_window == 2:
|
||||
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
|
||||
|
||||
curses.curs_set(0)
|
||||
|
||||
if "isFavorite" not in selectedNode or selectedNode["isFavorite"] == False:
|
||||
confirmation = get_list_input(
|
||||
f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Favorite?",
|
||||
None,
|
||||
["Yes", "No"],
|
||||
)
|
||||
if confirmation == "Yes":
|
||||
interface_state.interface.localNode.setFavorite(ui_state.node_list[ui_state.selected_node])
|
||||
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
|
||||
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isFavorite"] = True
|
||||
|
||||
refresh_node_list()
|
||||
|
||||
else:
|
||||
confirmation = get_list_input(
|
||||
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Favorites?",
|
||||
None,
|
||||
["Yes", "No"],
|
||||
)
|
||||
if confirmation == "Yes":
|
||||
interface_state.interface.localNode.removeFavorite(ui_state.node_list[ui_state.selected_node])
|
||||
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
|
||||
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isFavorite"] = False
|
||||
|
||||
refresh_node_list()
|
||||
|
||||
handle_resize(stdscr, False)
|
||||
|
||||
|
||||
def handle_ctlr_g(stdscr: curses.window) -> None:
|
||||
"""Handle Ctrl + G key events to toggle ignored status of the selected node."""
|
||||
if ui_state.current_window == 2:
|
||||
selectedNode = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
|
||||
|
||||
curses.curs_set(0)
|
||||
|
||||
if "isIgnored" not in selectedNode or selectedNode["isIgnored"] == False:
|
||||
confirmation = get_list_input(
|
||||
f"Set {get_name_from_database(ui_state.node_list[ui_state.selected_node])} as Ignored?",
|
||||
"No",
|
||||
["Yes", "No"],
|
||||
)
|
||||
if confirmation == "Yes":
|
||||
interface_state.interface.localNode.setIgnored(ui_state.node_list[ui_state.selected_node])
|
||||
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isIgnored"] = True
|
||||
else:
|
||||
confirmation = get_list_input(
|
||||
f"Remove {get_name_from_database(ui_state.node_list[ui_state.selected_node])} from Ignored?",
|
||||
"No",
|
||||
["Yes", "No"],
|
||||
)
|
||||
if confirmation == "Yes":
|
||||
interface_state.interface.localNode.removeIgnored(ui_state.node_list[ui_state.selected_node])
|
||||
interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]["isIgnored"] = False
|
||||
|
||||
handle_resize(stdscr, False)
|
||||
|
||||
|
||||
def draw_channel_list() -> None:
|
||||
"""Update the channel list window and pad based on the current state."""
|
||||
channel_pad.erase()
|
||||
win_width = channel_win.getmaxyx()[1]
|
||||
|
||||
@@ -479,7 +559,7 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
|
||||
row = 0
|
||||
for prefix, message in messages:
|
||||
full_message = f"{prefix}{message}"
|
||||
wrapped_lines = textwrap.wrap(full_message, messages_win.getmaxyx()[1] - 2)
|
||||
wrapped_lines = wrap_text(full_message, messages_win.getmaxyx()[1] - 2)
|
||||
msg_line_count += len(wrapped_lines)
|
||||
messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1])
|
||||
|
||||
@@ -521,9 +601,12 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
|
||||
refresh_pad(1)
|
||||
|
||||
draw_packetlog_win()
|
||||
if ui_state.current_window == 4:
|
||||
menu_state.need_redraw = True
|
||||
|
||||
|
||||
def draw_node_list() -> None:
|
||||
"""Update the nodes list window and pad based on the current state."""
|
||||
global nodes_pad
|
||||
|
||||
# This didn't work, for some reason an error is thown on startup, so we just create the pad every time
|
||||
@@ -572,6 +655,7 @@ def draw_node_list() -> None:
|
||||
|
||||
|
||||
def select_channel(idx: int) -> None:
|
||||
"""Select a channel by index and update the UI state accordingly."""
|
||||
old_selected_channel = ui_state.selected_channel
|
||||
ui_state.selected_channel = max(0, min(idx, len(ui_state.channel_list) - 1))
|
||||
draw_messages_window(True)
|
||||
@@ -593,6 +677,7 @@ def select_channel(idx: int) -> None:
|
||||
|
||||
|
||||
def scroll_channels(direction: int) -> None:
|
||||
"""Scroll through the channel list by a given direction."""
|
||||
new_selected_channel = ui_state.selected_channel + direction
|
||||
|
||||
if new_selected_channel < 0:
|
||||
@@ -604,6 +689,7 @@ def scroll_channels(direction: int) -> None:
|
||||
|
||||
|
||||
def scroll_messages(direction: int) -> None:
|
||||
"""Scroll through the messages in the current channel by a given direction."""
|
||||
ui_state.selected_message += direction
|
||||
|
||||
msg_line_count = messages_pad.getmaxyx()[0]
|
||||
@@ -636,6 +722,7 @@ def scroll_messages(direction: int) -> None:
|
||||
|
||||
|
||||
def select_node(idx: int) -> None:
|
||||
"""Select a node by index and update the UI state accordingly."""
|
||||
old_selected_node = ui_state.selected_node
|
||||
ui_state.selected_node = max(0, min(idx, len(ui_state.node_list) - 1))
|
||||
|
||||
@@ -652,6 +739,7 @@ def select_node(idx: int) -> None:
|
||||
|
||||
|
||||
def scroll_nodes(direction: int) -> None:
|
||||
"""Scroll through the node list by a given direction."""
|
||||
new_selected_node = ui_state.selected_node + direction
|
||||
|
||||
if new_selected_node < 0:
|
||||
@@ -663,7 +751,7 @@ def scroll_nodes(direction: int) -> None:
|
||||
|
||||
|
||||
def draw_packetlog_win() -> None:
|
||||
|
||||
"""Draw the packet log window with the latest packets."""
|
||||
columns = [10, 10, 15, 30]
|
||||
span = 0
|
||||
|
||||
@@ -716,6 +804,7 @@ def draw_packetlog_win() -> None:
|
||||
|
||||
|
||||
def search(win: int) -> None:
|
||||
"""Search for a node or channel based on user input."""
|
||||
start_idx = ui_state.selected_node
|
||||
select_func = select_node
|
||||
|
||||
@@ -764,6 +853,7 @@ def search(win: int) -> None:
|
||||
|
||||
|
||||
def draw_node_details() -> None:
|
||||
"""Draw the details of the selected node in the function window."""
|
||||
node = None
|
||||
try:
|
||||
node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
|
||||
@@ -827,16 +917,18 @@ def draw_node_details() -> None:
|
||||
|
||||
|
||||
def draw_help() -> None:
|
||||
"""Draw the help text in the function window."""
|
||||
cmds = [
|
||||
"↑→↓← = Select",
|
||||
" ENTER = Send",
|
||||
" ` = Settings",
|
||||
" ^P = Packet Log",
|
||||
" ESC = Quit",
|
||||
" ^t = Traceroute",
|
||||
" ^d = Archive Chat",
|
||||
" ^f = Favorite",
|
||||
" ^g = Ignore",
|
||||
" ENTER = Send",
|
||||
" ` = Settings",
|
||||
" ESC = Quit",
|
||||
" ^P = Packet Log",
|
||||
" ^t = Traceroute",
|
||||
" ^d = Archive Chat",
|
||||
" ^f = Favorite",
|
||||
" ^g = Ignore",
|
||||
" ^/ = Search",
|
||||
]
|
||||
function_str = ""
|
||||
for s in cmds:
|
||||
|
||||
@@ -6,6 +6,7 @@ import sys
|
||||
from typing import List
|
||||
|
||||
from contact.utilities.save_to_radio import save_changes
|
||||
import contact.ui.default_config as config
|
||||
from contact.utilities.config_io import config_export, config_import
|
||||
from contact.utilities.control_utils import parse_ini_file, transform_menu_path
|
||||
from contact.utilities.input_handlers import (
|
||||
@@ -20,9 +21,7 @@ from contact.ui.dialog import dialog
|
||||
from contact.ui.menus import generate_menu_from_protobuf
|
||||
from contact.ui.nav_utils import move_highlight, draw_arrows, update_help_window
|
||||
from contact.ui.user_config import json_editor
|
||||
from contact.ui.ui_state import MenuState
|
||||
|
||||
menu_state = MenuState()
|
||||
from contact.utilities.singleton import menu_state
|
||||
|
||||
# Constants
|
||||
width = 80
|
||||
@@ -36,16 +35,17 @@ script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
|
||||
|
||||
# Paths
|
||||
locals_dir = os.path.dirname(os.path.abspath(sys.argv[0])) # Current script directory
|
||||
# locals_dir = os.path.dirname(os.path.abspath(sys.argv[0])) # Current script directory
|
||||
translation_file = os.path.join(parent_dir, "localisations", "en.ini")
|
||||
|
||||
config_folder = os.path.join(locals_dir, "node-configs")
|
||||
# config_folder = os.path.join(locals_dir, "node-configs")
|
||||
config_folder = os.path.abspath(config.node_configs_file_path)
|
||||
|
||||
# Load translations
|
||||
field_mapping, help_text = parse_ini_file(translation_file)
|
||||
|
||||
|
||||
def display_menu(menu_state: MenuState) -> tuple[object, object]: # curses.window or pad types
|
||||
def display_menu() -> tuple[object, object]: # curses.window or pad types
|
||||
|
||||
min_help_window_height = 6
|
||||
num_items = len(menu_state.current_menu) + (1 if menu_state.show_save_option else 0)
|
||||
@@ -106,7 +106,7 @@ def display_menu(menu_state: MenuState) -> tuple[object, object]: # curses.wind
|
||||
)
|
||||
|
||||
# Draw help window with dynamically updated max_help_lines
|
||||
draw_help_window(start_y, start_x, menu_height, max_help_lines, transformed_path, menu_state)
|
||||
draw_help_window(start_y, start_x, menu_height, max_help_lines, transformed_path)
|
||||
|
||||
menu_win.refresh()
|
||||
menu_pad.refresh(
|
||||
@@ -132,7 +132,6 @@ def draw_help_window(
|
||||
menu_height: int,
|
||||
max_help_lines: int,
|
||||
transformed_path: List[str],
|
||||
menu_state: MenuState,
|
||||
) -> None:
|
||||
|
||||
global help_win
|
||||
@@ -150,6 +149,15 @@ def draw_help_window(
|
||||
)
|
||||
|
||||
|
||||
def get_input_type_for_field(field) -> type:
|
||||
if field.type in (field.TYPE_INT32, field.TYPE_UINT32, field.TYPE_INT64):
|
||||
return int
|
||||
elif field.type in (field.TYPE_FLOAT, field.TYPE_DOUBLE):
|
||||
return float
|
||||
else:
|
||||
return str
|
||||
|
||||
|
||||
def settings_menu(stdscr: object, interface: object) -> None:
|
||||
curses.update_lines_cols()
|
||||
|
||||
@@ -159,29 +167,32 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
|
||||
modified_settings = {}
|
||||
|
||||
need_redraw = True
|
||||
menu_state.need_redraw = True
|
||||
menu_state.show_save_option = False
|
||||
|
||||
while True:
|
||||
if need_redraw:
|
||||
if menu_state.need_redraw:
|
||||
menu_state.need_redraw = False
|
||||
options = list(menu_state.current_menu.keys())
|
||||
|
||||
# Determine if save option should be shown
|
||||
path = menu_state.menu_path
|
||||
menu_state.show_save_option = (
|
||||
(
|
||||
len(menu_state.menu_path) > 2
|
||||
and ("Radio Settings" in menu_state.menu_path or "Module Settings" in menu_state.menu_path)
|
||||
)
|
||||
or (len(menu_state.menu_path) == 2 and "User Settings" in menu_state.menu_path)
|
||||
or (len(menu_state.menu_path) == 3 and "Channels" in menu_state.menu_path)
|
||||
(len(path) > 2 and ("Radio Settings" in path or "Module Settings" in path))
|
||||
or (len(path) == 2 and "User Settings" in path)
|
||||
or (len(path) == 3 and "Channels" in path)
|
||||
)
|
||||
|
||||
# Display the menu
|
||||
menu_win, menu_pad = display_menu(menu_state)
|
||||
menu_win, menu_pad = display_menu()
|
||||
|
||||
need_redraw = False
|
||||
if menu_win is None:
|
||||
continue # Skip if menu_win is not initialized
|
||||
|
||||
# Capture user input
|
||||
menu_win.timeout(200) # wait up to 200 ms for a keypress (or less if key is pressed)
|
||||
key = menu_win.getch()
|
||||
if key == -1:
|
||||
continue
|
||||
|
||||
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
|
||||
# max_help_lines = 4
|
||||
@@ -215,7 +226,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
)
|
||||
|
||||
elif key == curses.KEY_RESIZE:
|
||||
need_redraw = True
|
||||
menu_state.need_redraw = True
|
||||
curses.update_lines_cols()
|
||||
|
||||
menu_win.erase()
|
||||
@@ -239,7 +250,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
)
|
||||
|
||||
elif key == curses.KEY_RIGHT or key == ord("\n"):
|
||||
need_redraw = True
|
||||
menu_state.need_redraw = True
|
||||
menu_state.start_index.append(0)
|
||||
menu_win.erase()
|
||||
help_win.erase()
|
||||
@@ -268,7 +279,8 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
break
|
||||
|
||||
elif selected_option == "Export Config File":
|
||||
filename = get_text_input("Enter a filename for the config file")
|
||||
|
||||
filename = get_text_input("Enter a filename for the config file", None, None)
|
||||
if not filename:
|
||||
logging.info("Export aborted: No filename provided.")
|
||||
menu_state.start_index.pop()
|
||||
@@ -290,7 +302,8 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
with open(yaml_file_path, "w", encoding="utf-8") as file:
|
||||
file.write(config_text)
|
||||
logging.info(f"Config file saved to {yaml_file_path}")
|
||||
dialog(stdscr, "Config File Saved:", yaml_file_path)
|
||||
dialog("Config File Saved:", yaml_file_path)
|
||||
menu_state.need_redraw = True
|
||||
menu_state.start_index.pop()
|
||||
continue
|
||||
except PermissionError:
|
||||
@@ -306,14 +319,16 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
|
||||
# Check if folder exists and is not empty
|
||||
if not os.path.exists(config_folder) or not any(os.listdir(config_folder)):
|
||||
dialog(stdscr, "", " No config files found. Export a config first.")
|
||||
dialog("", " No config files found. Export a config first.")
|
||||
menu_state.need_redraw = True
|
||||
continue # Return to menu
|
||||
|
||||
file_list = [f for f in os.listdir(config_folder) if os.path.isfile(os.path.join(config_folder, f))]
|
||||
|
||||
# Ensure file_list is not empty before proceeding
|
||||
if not file_list:
|
||||
dialog(stdscr, "", " No config files found. Export a config first.")
|
||||
dialog("", " No config files found. Export a config first.")
|
||||
menu_state.need_redraw = True
|
||||
continue
|
||||
|
||||
filename = get_list_input("Choose a config file", None, file_list)
|
||||
@@ -327,7 +342,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
|
||||
elif selected_option == "Config URL":
|
||||
current_value = interface.localNode.getURL()
|
||||
new_value = get_text_input(f"Config URL is currently: {current_value}")
|
||||
new_value = get_text_input(f"Config URL is currently: {current_value}", None, str)
|
||||
if new_value is not None:
|
||||
current_value = new_value
|
||||
overwrite = get_list_input(f"Are you sure you want to load this config?", None, ["Yes", "No"])
|
||||
@@ -380,7 +395,6 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
menu_state.start_index.pop()
|
||||
menu_state.selected_index = 4
|
||||
continue
|
||||
# need_redraw = True
|
||||
|
||||
field_info = menu_state.current_menu.get(selected_option)
|
||||
if isinstance(field_info, tuple):
|
||||
@@ -395,7 +409,9 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
|
||||
if selected_option in ["longName", "shortName", "isLicensed"]:
|
||||
if selected_option in ["longName", "shortName"]:
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, None
|
||||
)
|
||||
new_value = current_value if new_value is None else new_value
|
||||
menu_state.current_menu[selected_option] = (field, new_value)
|
||||
|
||||
@@ -414,7 +430,9 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif selected_option in ["latitude", "longitude", "altitude"]:
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, float
|
||||
)
|
||||
new_value = current_value if new_value is None else new_value
|
||||
menu_state.current_menu[selected_option] = (field, new_value)
|
||||
|
||||
@@ -453,17 +471,26 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif field.type == 13: # Field type 13 corresponds to UINT32
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
input_type = get_input_type_for_field(field)
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
|
||||
)
|
||||
new_value = current_value if new_value is None else int(new_value)
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif field.type == 2: # Field type 13 corresponds to INT64
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
input_type = get_input_type_for_field(field)
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
|
||||
)
|
||||
new_value = current_value if new_value is None else float(new_value)
|
||||
menu_state.start_index.pop()
|
||||
|
||||
else: # Handle other field types
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
input_type = get_input_type_for_field(field)
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
|
||||
)
|
||||
new_value = current_value if new_value is None else new_value
|
||||
menu_state.start_index.pop()
|
||||
|
||||
@@ -486,7 +513,28 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
menu_state.selected_index = 0
|
||||
|
||||
elif key == curses.KEY_LEFT:
|
||||
need_redraw = True
|
||||
|
||||
# If we are at the main menu and there are unsaved changes, prompt to save
|
||||
if len(menu_state.menu_path) == 3 and modified_settings:
|
||||
|
||||
current_section = menu_state.menu_path[-1]
|
||||
save_prompt = get_list_input(
|
||||
f"You have unsaved changes in {current_section}. Save before exiting?",
|
||||
None,
|
||||
["Yes", "No", "Cancel"],
|
||||
mandatory=True,
|
||||
)
|
||||
if save_prompt == "Cancel":
|
||||
continue # Stay in the menu without doing anything
|
||||
elif save_prompt == "Yes":
|
||||
save_changes(interface, modified_settings, menu_state)
|
||||
logging.info("Changes Saved")
|
||||
|
||||
modified_settings.clear()
|
||||
menu = rebuild_menu_at_current_path(interface, menu_state)
|
||||
pass
|
||||
|
||||
menu_state.need_redraw = True
|
||||
|
||||
menu_win.erase()
|
||||
help_win.erase()
|
||||
@@ -497,8 +545,8 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
menu_win.refresh()
|
||||
help_win.refresh()
|
||||
|
||||
if len(menu_state.menu_path) < 2:
|
||||
modified_settings.clear()
|
||||
# if len(menu_state.menu_path) < 2:
|
||||
# modified_settings.clear()
|
||||
|
||||
# Navigate back to the previous menu
|
||||
if len(menu_state.menu_path) > 1:
|
||||
@@ -515,6 +563,16 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
break
|
||||
|
||||
|
||||
def rebuild_menu_at_current_path(interface, menu_state):
|
||||
"""Rebuild menus from the device and re-point current_menu to the same path."""
|
||||
new_menu = generate_menu_from_protobuf(interface)
|
||||
cur = new_menu["Main Menu"]
|
||||
for step in menu_state.menu_path[1:]:
|
||||
cur = cur.get(step, {})
|
||||
menu_state.current_menu = cur
|
||||
return new_menu
|
||||
|
||||
|
||||
def set_region(interface: object) -> None:
|
||||
node = interface.getNode("^local")
|
||||
device_config = node.localConfig
|
||||
|
||||
@@ -2,15 +2,69 @@ import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Dict
|
||||
from contact.ui.colors import setup_colors
|
||||
|
||||
# Get the parent directory of the script
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
|
||||
|
||||
# Paths
|
||||
json_file_path = os.path.join(parent_dir, "config.json")
|
||||
log_file_path = os.path.join(parent_dir, "client.log")
|
||||
db_file_path = os.path.join(parent_dir, "client.db")
|
||||
# To test writting to a non-writable directory, you can uncomment the following lines:
|
||||
# mkdir /tmp/test_nonwritable
|
||||
# chmod -w /tmp/test_nonwritable
|
||||
# parent_dir = "/tmp/test_nonwritable"
|
||||
|
||||
|
||||
def reload_config() -> None:
|
||||
loaded_config = initialize_config()
|
||||
assign_config_variables(loaded_config)
|
||||
setup_colors(reinit=True)
|
||||
|
||||
|
||||
def _is_writable_dir(path: str) -> bool:
|
||||
"""
|
||||
Return True if we can create & delete a temp file in `path`.
|
||||
"""
|
||||
if not os.path.isdir(path):
|
||||
return False
|
||||
test_path = os.path.join(path, ".perm_test_tmp")
|
||||
try:
|
||||
with open(test_path, "w", encoding="utf-8") as _tmp:
|
||||
_tmp.write("ok")
|
||||
os.remove(test_path)
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
def _get_config_root(preferred_dir: str, fallback_name: str = ".contact_client") -> str:
|
||||
"""
|
||||
Choose a writable directory for config artifacts.
|
||||
"""
|
||||
if _is_writable_dir(preferred_dir):
|
||||
return preferred_dir
|
||||
|
||||
home = os.path.expanduser("~")
|
||||
fallback_dir = os.path.join(home, fallback_name)
|
||||
# Ensure the fallback exists.
|
||||
os.makedirs(fallback_dir, exist_ok=True)
|
||||
|
||||
# If *that* still isn't writable, last-ditch: use a system temp dir.
|
||||
if not _is_writable_dir(fallback_dir):
|
||||
import tempfile
|
||||
|
||||
fallback_dir = tempfile.mkdtemp(prefix="contact_client_")
|
||||
|
||||
return fallback_dir
|
||||
|
||||
|
||||
# Pick the root now.
|
||||
config_root = _get_config_root(parent_dir)
|
||||
|
||||
# Paths (derived from the chosen root)
|
||||
json_file_path = os.path.join(config_root, "config.json")
|
||||
log_file_path = os.path.join(config_root, "client.log")
|
||||
db_file_path = os.path.join(config_root, "client.db")
|
||||
node_configs_file_path = os.path.join(config_root, "node-configs/")
|
||||
|
||||
|
||||
def format_json_single_line_arrays(data: Dict[str, object], indent: int = 4) -> str:
|
||||
@@ -123,15 +177,19 @@ def initialize_config() -> Dict[str, object]:
|
||||
"settings_breadcrumbs": ["green", "black"],
|
||||
"settings_warning": ["green", "black"],
|
||||
"settings_note": ["green", "black"],
|
||||
"node_favorite": ["cyan", "white"],
|
||||
"node_ignored": ["red", "white"],
|
||||
"node_favorite": ["cyan", "green"],
|
||||
"node_ignored": ["red", "black"],
|
||||
}
|
||||
default_config_variables = {
|
||||
"channel_list_16ths": "3",
|
||||
"node_list_16ths": "5",
|
||||
"db_file_path": db_file_path,
|
||||
"log_file_path": log_file_path,
|
||||
"node_configs_file_path": node_configs_file_path,
|
||||
"message_prefix": ">>",
|
||||
"sent_message_prefix": ">> Sent",
|
||||
"notification_symbol": "*",
|
||||
"notification_sound": "True",
|
||||
"ack_implicit_str": "[◌]",
|
||||
"ack_str": "[✓]",
|
||||
"nak_str": "[x]",
|
||||
@@ -168,20 +226,26 @@ def initialize_config() -> Dict[str, object]:
|
||||
def assign_config_variables(loaded_config: Dict[str, object]) -> None:
|
||||
# Assign values to local variables
|
||||
|
||||
global db_file_path, log_file_path, message_prefix, sent_message_prefix
|
||||
global db_file_path, log_file_path, node_configs_file_path, message_prefix, sent_message_prefix
|
||||
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
|
||||
global node_list_16ths, channel_list_16ths
|
||||
global theme, COLOR_CONFIG
|
||||
global node_sort
|
||||
global node_sort, notification_sound
|
||||
|
||||
channel_list_16ths = loaded_config["channel_list_16ths"]
|
||||
node_list_16ths = loaded_config["node_list_16ths"]
|
||||
db_file_path = loaded_config["db_file_path"]
|
||||
log_file_path = loaded_config["log_file_path"]
|
||||
node_configs_file_path = loaded_config.get("node_configs_file_path")
|
||||
message_prefix = loaded_config["message_prefix"]
|
||||
sent_message_prefix = loaded_config["sent_message_prefix"]
|
||||
notification_symbol = loaded_config["notification_symbol"]
|
||||
notification_sound = loaded_config["notification_sound"]
|
||||
ack_implicit_str = loaded_config["ack_implicit_str"]
|
||||
ack_str = loaded_config["ack_str"]
|
||||
nak_str = loaded_config["nak_str"]
|
||||
ack_unknown_str = loaded_config["ack_unknown_str"]
|
||||
node_sort = loaded_config["node_sort"]
|
||||
theme = loaded_config["theme"]
|
||||
if theme == "dark":
|
||||
COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"]
|
||||
@@ -189,7 +253,6 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
|
||||
COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"]
|
||||
elif theme == "green":
|
||||
COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"]
|
||||
node_sort = loaded_config["node_sort"]
|
||||
|
||||
|
||||
# Call the function when the script is imported
|
||||
@@ -205,6 +268,7 @@ if __name__ == "__main__":
|
||||
print("\nLoaded Configuration:")
|
||||
print(f"Database File Path: {db_file_path}")
|
||||
print(f"Log File Path: {log_file_path}")
|
||||
print(f"Configs File Path: {node_configs_file_path}")
|
||||
print(f"Message Prefix: {message_prefix}")
|
||||
print(f"Sent Message Prefix: {sent_message_prefix}")
|
||||
print(f"Notification Symbol: {notification_symbol}")
|
||||
|
||||
@@ -1,44 +1,63 @@
|
||||
import curses
|
||||
from contact.ui.colors import get_color
|
||||
from contact.utilities.singleton import menu_state, ui_state
|
||||
|
||||
|
||||
def dialog(stdscr: curses.window, title: str, message: str) -> None:
|
||||
height, width = stdscr.getmaxyx()
|
||||
def dialog(title: str, message: str) -> None:
|
||||
"""Display a dialog with a title and message."""
|
||||
|
||||
# Calculate dialog dimensions
|
||||
max_line_lengh = 0
|
||||
previous_window = ui_state.current_window
|
||||
ui_state.current_window = 4
|
||||
|
||||
curses.update_lines_cols()
|
||||
height, width = curses.LINES, curses.COLS
|
||||
|
||||
# Parse message into lines and calculate dimensions
|
||||
message_lines = message.splitlines()
|
||||
for l in message_lines:
|
||||
max_line_length = max(len(l), max_line_lengh)
|
||||
max_line_length = max(len(l) for l in message_lines)
|
||||
dialog_width = max(len(title) + 4, max_line_length + 4)
|
||||
dialog_height = len(message_lines) + 4
|
||||
x = (width - dialog_width) // 2
|
||||
y = (height - dialog_height) // 2
|
||||
|
||||
# Create dialog window
|
||||
def draw_window():
|
||||
win.erase()
|
||||
win.bkgd(get_color("background"))
|
||||
win.attrset(get_color("window_frame"))
|
||||
win.border(0)
|
||||
|
||||
win.addstr(0, 2, title, get_color("settings_default"))
|
||||
|
||||
for i, line in enumerate(message_lines):
|
||||
msg_x = (dialog_width - len(line)) // 2
|
||||
win.addstr(2 + i, msg_x, line, get_color("settings_default"))
|
||||
|
||||
ok_text = " Ok "
|
||||
win.addstr(
|
||||
dialog_height - 2,
|
||||
(dialog_width - len(ok_text)) // 2,
|
||||
ok_text,
|
||||
get_color("settings_default", reverse=True),
|
||||
)
|
||||
|
||||
win.refresh()
|
||||
|
||||
win = curses.newwin(dialog_height, dialog_width, y, x)
|
||||
win.bkgd(get_color("background"))
|
||||
win.attrset(get_color("window_frame"))
|
||||
win.border(0)
|
||||
draw_window()
|
||||
|
||||
# Add title
|
||||
win.addstr(0, 2, title, get_color("settings_default"))
|
||||
|
||||
# Add message
|
||||
for i, l in enumerate(message_lines):
|
||||
win.addstr(2 + i, 2, l, get_color("settings_default"))
|
||||
|
||||
# Add button
|
||||
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", get_color("settings_default", reverse=True))
|
||||
|
||||
# Refresh dialog window
|
||||
win.refresh()
|
||||
|
||||
# Get user input
|
||||
while True:
|
||||
win.timeout(200)
|
||||
char = win.getch()
|
||||
# Close dialog with enter, space, or esc
|
||||
if char in (curses.KEY_ENTER, 10, 13, 32, 27):
|
||||
|
||||
if menu_state.need_redraw:
|
||||
menu_state.need_redraw = False
|
||||
draw_window()
|
||||
|
||||
if char in (curses.KEY_ENTER, 10, 13, 32, 27): # Enter, space, Esc
|
||||
win.erase()
|
||||
win.refresh()
|
||||
ui_state.current_window = previous_window
|
||||
return
|
||||
|
||||
if char == -1:
|
||||
continue
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import curses
|
||||
import re
|
||||
from unicodedata import east_asian_width
|
||||
|
||||
from contact.ui.colors import get_color
|
||||
from contact.utilities.control_utils import transform_menu_path
|
||||
from typing import Any, Optional, List, Dict
|
||||
@@ -293,9 +295,16 @@ def get_wrapped_help_text(
|
||||
|
||||
return wrapped_help
|
||||
|
||||
def text_width(text: str) -> int:
|
||||
return sum(2 if east_asian_width(c) in "FW" else 1 for c in text)
|
||||
|
||||
def wrap_text(text: str, wrap_width: int) -> List[str]:
|
||||
"""Wraps text while preserving spaces and breaking long words."""
|
||||
|
||||
whitespace = '\t\n\x0b\x0c\r '
|
||||
whitespace_trans = dict.fromkeys(map(ord, whitespace), ord(' '))
|
||||
text = text.translate(whitespace_trans)
|
||||
|
||||
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
|
||||
wrapped_lines = []
|
||||
line_buffer = ""
|
||||
@@ -304,11 +313,11 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
|
||||
wrap_width -= margin
|
||||
|
||||
for word in words:
|
||||
word_length = len(word)
|
||||
word_length = text_width(word)
|
||||
|
||||
if word_length > wrap_width: # Break long words
|
||||
if line_buffer:
|
||||
wrapped_lines.append(line_buffer)
|
||||
wrapped_lines.append(line_buffer.strip())
|
||||
line_buffer = ""
|
||||
line_length = 0
|
||||
for i in range(0, word_length, wrap_width):
|
||||
@@ -316,7 +325,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
|
||||
continue
|
||||
|
||||
if line_length + word_length > wrap_width and word.strip():
|
||||
wrapped_lines.append(line_buffer)
|
||||
wrapped_lines.append(line_buffer.strip())
|
||||
line_buffer = ""
|
||||
line_length = 0
|
||||
|
||||
@@ -324,7 +333,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
|
||||
line_length += word_length
|
||||
|
||||
if line_buffer:
|
||||
wrapped_lines.append(line_buffer)
|
||||
wrapped_lines.append(line_buffer.strip())
|
||||
|
||||
return wrapped_lines
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ class MenuState:
|
||||
current_menu: Union[Dict[str, Any], List[Any], str, int] = field(default_factory=dict)
|
||||
menu_path: List[str] = field(default_factory=list)
|
||||
show_save_option: bool = False
|
||||
need_redraw: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -24,6 +25,7 @@ class ChatUIState:
|
||||
selected_message: int = 0
|
||||
selected_node: int = 0
|
||||
current_window: int = 0
|
||||
last_sent_time: float = 0.0
|
||||
|
||||
selected_index: int = 0
|
||||
start_index: List[int] = field(default_factory=lambda: [0, 0, 0])
|
||||
|
||||
@@ -4,9 +4,10 @@ import curses
|
||||
from typing import Any, List, Dict
|
||||
|
||||
from contact.ui.colors import get_color, setup_colors, COLOR_MAP
|
||||
from contact.ui.default_config import format_json_single_line_arrays, loaded_config
|
||||
import contact.ui.default_config as config
|
||||
from contact.ui.nav_utils import move_highlight, draw_arrows
|
||||
from contact.utilities.input_handlers import get_list_input
|
||||
from contact.utilities.singleton import menu_state
|
||||
|
||||
|
||||
width = 80
|
||||
@@ -53,12 +54,19 @@ def edit_value(key: str, current_value: str) -> str:
|
||||
# Handle theme selection dynamically
|
||||
if key == "theme":
|
||||
# Load theme names dynamically from the JSON
|
||||
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
|
||||
theme_options = [
|
||||
k.split("_", 2)[2].lower() for k in config.loaded_config.keys() if k.startswith("COLOR_CONFIG")
|
||||
]
|
||||
return get_list_input("Select Theme", current_value, theme_options)
|
||||
|
||||
elif key == "node_sort":
|
||||
sort_options = ["lastHeard", "name", "hops"]
|
||||
return get_list_input("Sort By", current_value, sort_options)
|
||||
|
||||
elif key == "notification_sound":
|
||||
sound_options = ["True", "False"]
|
||||
return get_list_input("Notification Sound", current_value, sound_options)
|
||||
|
||||
# Standard Input Mode (Scrollable)
|
||||
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
|
||||
curses.curs_set(1)
|
||||
@@ -67,41 +75,64 @@ def edit_value(key: str, current_value: str) -> str:
|
||||
user_input = ""
|
||||
input_position = (7, 13) # Tuple for row and column
|
||||
row, col = input_position # Unpack tuple
|
||||
|
||||
while True:
|
||||
visible_text = user_input[scroll_offset : scroll_offset + input_width] # Only show what fits
|
||||
edit_win.addstr(row, col, " " * input_width, get_color("settings_default")) # Clear previous text
|
||||
edit_win.addstr(row, col, visible_text, get_color("settings_default")) # Display text
|
||||
if menu_state.need_redraw:
|
||||
curses.update_lines_cols()
|
||||
menu_state.need_redraw = False
|
||||
|
||||
# Re-create the window to fully reset state
|
||||
edit_win = curses.newwin(height, width, start_y, start_x)
|
||||
edit_win.timeout(200)
|
||||
edit_win.bkgd(get_color("background"))
|
||||
edit_win.attrset(get_color("window_frame"))
|
||||
edit_win.border()
|
||||
|
||||
# Redraw static content
|
||||
edit_win.addstr(1, 2, f"Editing {key}", get_color("settings_default", bold=True))
|
||||
edit_win.addstr(3, 2, "Current Value:", get_color("settings_default"))
|
||||
for i, line in enumerate(wrapped_lines[:4]):
|
||||
edit_win.addstr(4 + i, 2, line, get_color("settings_default"))
|
||||
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
|
||||
|
||||
visible_text = user_input[scroll_offset : scroll_offset + input_width]
|
||||
edit_win.addstr(row, col, " " * input_width, get_color("settings_default"))
|
||||
edit_win.addstr(row, col, visible_text, get_color("settings_default"))
|
||||
edit_win.refresh()
|
||||
|
||||
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width)) # Adjust cursor position
|
||||
key = edit_win.get_wch()
|
||||
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width))
|
||||
|
||||
if key in (chr(27), curses.KEY_LEFT): # ESC or Left Arrow
|
||||
try:
|
||||
key = edit_win.get_wch()
|
||||
except curses.error:
|
||||
continue # window not ready — skip this loop
|
||||
|
||||
if key in (chr(27), curses.KEY_LEFT):
|
||||
curses.curs_set(0)
|
||||
return current_value # Exit without returning a value
|
||||
return current_value
|
||||
|
||||
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
|
||||
break
|
||||
|
||||
elif key in (curses.KEY_BACKSPACE, chr(127)): # Backspace
|
||||
if user_input: # Only process if there's something to delete
|
||||
elif key in (curses.KEY_BACKSPACE, chr(127)):
|
||||
if user_input:
|
||||
user_input = user_input[:-1]
|
||||
if scroll_offset > 0 and len(user_input) < scroll_offset + input_width:
|
||||
scroll_offset -= 1 # Move back if text is shorter than scrolled area
|
||||
scroll_offset -= 1
|
||||
else:
|
||||
if isinstance(key, str):
|
||||
user_input += key
|
||||
else:
|
||||
user_input += chr(key)
|
||||
|
||||
if len(user_input) > input_width: # Scroll if input exceeds visible area
|
||||
if len(user_input) > input_width:
|
||||
scroll_offset += 1
|
||||
|
||||
curses.curs_set(0)
|
||||
return user_input if user_input else current_value
|
||||
|
||||
|
||||
def display_menu(menu_state: Any) -> tuple[Any, Any, List[str]]:
|
||||
def display_menu() -> tuple[Any, Any, List[str]]:
|
||||
"""
|
||||
Render the configuration menu with a Save button directly added to the window.
|
||||
"""
|
||||
@@ -184,6 +215,7 @@ def display_menu(menu_state: Any) -> tuple[Any, Any, List[str]]:
|
||||
def json_editor(stdscr: curses.window, menu_state: Any) -> None:
|
||||
|
||||
menu_state.selected_index = 0 # Track the selected option
|
||||
made_changes = False # Track if any changes were made
|
||||
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
|
||||
@@ -206,16 +238,18 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
|
||||
menu_state.current_menu = data # Track the current level of the menu
|
||||
|
||||
# Render the menu
|
||||
menu_win, menu_pad, options = display_menu(menu_state)
|
||||
need_redraw = True
|
||||
menu_win, menu_pad, options = display_menu()
|
||||
menu_state.need_redraw = True
|
||||
|
||||
while True:
|
||||
if need_redraw:
|
||||
menu_win, menu_pad, options = display_menu(menu_state)
|
||||
if menu_state.need_redraw:
|
||||
menu_state.need_redraw = False
|
||||
menu_win, menu_pad, options = display_menu()
|
||||
menu_win.refresh()
|
||||
need_redraw = False
|
||||
|
||||
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
|
||||
|
||||
menu_win.timeout(200)
|
||||
key = menu_win.getch()
|
||||
|
||||
if key == curses.KEY_UP:
|
||||
@@ -243,7 +277,7 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
|
||||
|
||||
elif key in (curses.KEY_RIGHT, 10, 13): # 10 = \n, 13 = carriage return
|
||||
|
||||
need_redraw = True
|
||||
menu_state.need_redraw = True
|
||||
menu_win.erase()
|
||||
menu_win.refresh()
|
||||
|
||||
@@ -264,11 +298,14 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
|
||||
|
||||
if isinstance(selected_data, list) and len(selected_data) == 2:
|
||||
# Edit color pair
|
||||
old = selected_data
|
||||
new_value = edit_color_pair(selected_key, selected_data)
|
||||
menu_state.menu_path.pop()
|
||||
menu_state.start_index.pop()
|
||||
menu_state.menu_index.pop()
|
||||
menu_state.current_menu[selected_key] = new_value
|
||||
if new_value != old:
|
||||
made_changes = True
|
||||
|
||||
elif isinstance(selected_data, (dict, list)):
|
||||
# Navigate into nested data
|
||||
@@ -277,22 +314,26 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
|
||||
|
||||
else:
|
||||
# General value editing
|
||||
old = selected_data
|
||||
new_value = edit_value(selected_key, selected_data)
|
||||
menu_state.menu_path.pop()
|
||||
menu_state.menu_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
menu_state.current_menu[selected_key] = new_value
|
||||
need_redraw = True
|
||||
menu_state.need_redraw = True
|
||||
if new_value != old:
|
||||
made_changes = True
|
||||
|
||||
else:
|
||||
# Save button selected
|
||||
save_json(file_path, data)
|
||||
stdscr.refresh()
|
||||
# config.reload() # This isn't refreshing the file paths as expected
|
||||
continue
|
||||
|
||||
elif key in (27, curses.KEY_LEFT): # Escape or Left Arrow
|
||||
|
||||
need_redraw = True
|
||||
menu_state.need_redraw = True
|
||||
menu_win.erase()
|
||||
menu_win.refresh()
|
||||
|
||||
@@ -313,6 +354,19 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
|
||||
|
||||
else:
|
||||
# Exit the editor
|
||||
if made_changes:
|
||||
save_prompt = get_list_input(
|
||||
"You have unsaved changes. Save before exiting?",
|
||||
None,
|
||||
["Yes", "No", "Cancel"],
|
||||
mandatory=True,
|
||||
)
|
||||
if save_prompt == "Cancel":
|
||||
continue # Stay in the menu without doing anything
|
||||
elif save_prompt == "Yes":
|
||||
save_json(file_path, data)
|
||||
made_changes = False
|
||||
|
||||
menu_win.clear()
|
||||
menu_win.refresh()
|
||||
|
||||
@@ -320,7 +374,7 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
|
||||
|
||||
|
||||
def save_json(file_path: str, data: Dict[str, Any]) -> None:
|
||||
formatted_json = format_json_single_line_arrays(data)
|
||||
formatted_json = config.format_json_single_line_arrays(data)
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(formatted_json)
|
||||
setup_colors(reinit=True)
|
||||
@@ -329,7 +383,6 @@ def save_json(file_path: str, data: Dict[str, Any]) -> None:
|
||||
def main(stdscr: curses.window) -> None:
|
||||
from contact.ui.ui_state import MenuState
|
||||
|
||||
menu_state = MenuState()
|
||||
if len(menu_state.menu_path) == 0:
|
||||
menu_state.menu_path = ["App Settings"] # Initialize if not set
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import yaml
|
||||
import logging
|
||||
import time
|
||||
from typing import List
|
||||
from google.protobuf.json_format import MessageToDict
|
||||
from meshtastic import mt_config
|
||||
@@ -133,24 +134,29 @@ def config_import(interface, filename):
|
||||
logging.info(f"Setting device owner to {configuration['owner']}")
|
||||
waitForAckNak = True
|
||||
interface.getNode("^local", False).setOwner(configuration["owner"])
|
||||
time.sleep(0.5)
|
||||
|
||||
if "owner_short" in configuration:
|
||||
logging.info(f"Setting device owner short to {configuration['owner_short']}")
|
||||
waitForAckNak = True
|
||||
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"])
|
||||
time.sleep(0.5)
|
||||
|
||||
if "ownerShort" in configuration:
|
||||
logging.info(f"Setting device owner short to {configuration['ownerShort']}")
|
||||
waitForAckNak = True
|
||||
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"])
|
||||
time.sleep(0.5)
|
||||
|
||||
if "channel_url" in configuration:
|
||||
logging.info(f"Setting channel url to {configuration['channel_url']}")
|
||||
interface.getNode("^local").setURL(configuration["channel_url"])
|
||||
time.sleep(0.5)
|
||||
|
||||
if "channelUrl" in configuration:
|
||||
logging.info(f"Setting channel url to {configuration['channelUrl']}")
|
||||
interface.getNode("^local").setURL(configuration["channelUrl"])
|
||||
time.sleep(0.5)
|
||||
|
||||
if "location" in configuration:
|
||||
alt = 0
|
||||
@@ -169,12 +175,14 @@ def config_import(interface, filename):
|
||||
logging.info(f"Fixing longitude at {lon} degrees")
|
||||
logging.info("Setting device position")
|
||||
interface.localNode.setFixedPosition(lat, lon, alt)
|
||||
time.sleep(0.5)
|
||||
|
||||
if "config" in configuration:
|
||||
localConfig = interface.getNode("^local").localConfig
|
||||
for section in configuration["config"]:
|
||||
traverseConfig(section, configuration["config"][section], localConfig)
|
||||
interface.getNode("^local").writeConfig(camel_to_snake(section))
|
||||
time.sleep(0.5)
|
||||
|
||||
if "module_config" in configuration:
|
||||
moduleConfig = interface.getNode("^local").moduleConfig
|
||||
@@ -185,6 +193,7 @@ def config_import(interface, filename):
|
||||
moduleConfig,
|
||||
)
|
||||
interface.getNode("^local").writeConfig(camel_to_snake(section))
|
||||
time.sleep(0.5)
|
||||
|
||||
interface.getNode("^local", False).commitSettingsTransaction()
|
||||
logging.info("Writing modified configuration to device")
|
||||
|
||||
@@ -116,7 +116,14 @@ def load_messages_from_db() -> None:
|
||||
|
||||
# Add messages to ui_state.all_messages grouped by hourly timestamp
|
||||
hourly_messages = {}
|
||||
for user_id, message, timestamp, ack_type in db_messages:
|
||||
for row in db_messages:
|
||||
user_id, message, timestamp, ack_type = row
|
||||
|
||||
# Only ack_type is allowed to be None
|
||||
if user_id is None or message is None or timestamp is None:
|
||||
logging.warning(f"Skipping row with NULL required field(s): {row}")
|
||||
continue
|
||||
|
||||
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
|
||||
if hour not in hourly_messages:
|
||||
hourly_messages[hour] = []
|
||||
@@ -130,11 +137,13 @@ def load_messages_from_db() -> None:
|
||||
ack_str = config.nak_str
|
||||
|
||||
if user_id == str(interface_state.myNodeNum):
|
||||
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
|
||||
sanitized_message = message.replace("\x00", "")
|
||||
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", sanitized_message)
|
||||
else:
|
||||
sanitized_message = message.replace("\x00", "")
|
||||
formatted_message = (
|
||||
f"{config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ",
|
||||
message,
|
||||
sanitized_message,
|
||||
)
|
||||
|
||||
hourly_messages[hour].append(formatted_message)
|
||||
|
||||
@@ -6,10 +6,44 @@ from typing import Any, Optional, List
|
||||
|
||||
from contact.ui.colors import get_color
|
||||
from contact.ui.nav_utils import move_highlight, draw_arrows, wrap_text
|
||||
from contact.ui.dialog import dialog
|
||||
from contact.utilities.validation_rules import get_validation_for
|
||||
from contact.utilities.singleton import menu_state
|
||||
|
||||
|
||||
def get_text_input(prompt: str) -> Optional[str]:
|
||||
def invalid_input(window: curses.window, message: str, redraw_func: Optional[callable] = None) -> None:
|
||||
"""Displays an invalid input message in the given window and redraws if needed."""
|
||||
cursor_y, cursor_x = window.getyx()
|
||||
curses.curs_set(0)
|
||||
dialog("Invalid Input", message)
|
||||
if redraw_func:
|
||||
redraw_func() # Redraw the original window content that got obscured
|
||||
else:
|
||||
window.refresh()
|
||||
window.move(cursor_y, cursor_x)
|
||||
curses.curs_set(1)
|
||||
|
||||
|
||||
def get_text_input(prompt: str, selected_config: str, input_type: str) -> Optional[str]:
|
||||
"""Handles user input with wrapped text for long prompts."""
|
||||
|
||||
def redraw_input_win():
|
||||
"""Redraw the input window with the current prompt and user input."""
|
||||
input_win.erase()
|
||||
input_win.border()
|
||||
row = 1
|
||||
for line in wrapped_prompt:
|
||||
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
|
||||
row += 1
|
||||
if row >= height - 3:
|
||||
break
|
||||
input_win.addstr(row + 1, margin, prompt_text, get_color("settings_default"))
|
||||
input_win.addstr(row + 1, col_start, user_input[:first_line_width], get_color("settings_default"))
|
||||
for i, line in enumerate(wrap_text(user_input[first_line_width:], wrap_width=input_width)):
|
||||
if row + 2 + i < height - 1:
|
||||
input_win.addstr(row + 2 + i, margin, line[:input_width], get_color("settings_default"))
|
||||
input_win.refresh()
|
||||
|
||||
height = 8
|
||||
width = 80
|
||||
margin = 2 # Left and right margin
|
||||
@@ -20,6 +54,7 @@ def get_text_input(prompt: str) -> Optional[str]:
|
||||
start_x = (curses.COLS - width) // 2
|
||||
|
||||
input_win = curses.newwin(height, width, start_y, start_x)
|
||||
input_win.timeout(200)
|
||||
input_win.bkgd(get_color("background"))
|
||||
input_win.attrset(get_color("window_frame"))
|
||||
input_win.border()
|
||||
@@ -27,6 +62,7 @@ def get_text_input(prompt: str) -> Optional[str]:
|
||||
# Wrap the prompt text
|
||||
wrapped_prompt = wrap_text(prompt, wrap_width=input_width)
|
||||
row = 1
|
||||
|
||||
for line in wrapped_prompt:
|
||||
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
|
||||
row += 1
|
||||
@@ -39,34 +75,125 @@ def get_text_input(prompt: str) -> Optional[str]:
|
||||
input_win.refresh()
|
||||
curses.curs_set(1)
|
||||
|
||||
max_length = 4 if "shortName" in prompt else None
|
||||
user_input = ""
|
||||
min_value = 0
|
||||
max_value = 4294967295
|
||||
min_length = 0
|
||||
max_length = None
|
||||
|
||||
# Start user input after the prompt text
|
||||
if selected_config is not None:
|
||||
validation = get_validation_for(selected_config) or {}
|
||||
min_value = validation.get("min_value", 0)
|
||||
max_value = validation.get("max_value", 4294967295)
|
||||
min_length = validation.get("min_length", 0)
|
||||
max_length = validation.get("max_length")
|
||||
|
||||
user_input = ""
|
||||
col_start = margin + len(prompt_text)
|
||||
first_line_width = input_width - len(prompt_text) # Available space for first line
|
||||
first_line_width = input_width - len(prompt_text)
|
||||
|
||||
while True:
|
||||
key = input_win.get_wch() # Waits for user input
|
||||
if menu_state.need_redraw:
|
||||
menu_state.need_redraw = False
|
||||
redraw_input_win()
|
||||
|
||||
if key == chr(27) or key == curses.KEY_LEFT: # ESC or Left Arrow
|
||||
try:
|
||||
key = input_win.get_wch()
|
||||
except curses.error:
|
||||
continue
|
||||
|
||||
if key == chr(27) or key == curses.KEY_LEFT:
|
||||
input_win.erase()
|
||||
input_win.refresh()
|
||||
curses.curs_set(0)
|
||||
return None # Exit without saving
|
||||
menu_state.need_redraw = True
|
||||
return None
|
||||
|
||||
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)): # Enter key
|
||||
break
|
||||
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
|
||||
menu_state.need_redraw = True
|
||||
|
||||
if not user_input.strip():
|
||||
invalid_input(input_win, "Value cannot be empty.", redraw_func=redraw_input_win)
|
||||
continue
|
||||
|
||||
length = len(user_input)
|
||||
if min_length == max_length and max_length is not None:
|
||||
if length != min_length:
|
||||
invalid_input(
|
||||
input_win, f"Value must be exactly {min_length} characters long.", redraw_func=redraw_input_win
|
||||
)
|
||||
continue
|
||||
else:
|
||||
if length < min_length:
|
||||
invalid_input(
|
||||
input_win,
|
||||
f"Value must be at least {min_length} characters long.",
|
||||
redraw_func=redraw_input_win,
|
||||
)
|
||||
continue
|
||||
if max_length is not None and length > max_length:
|
||||
invalid_input(
|
||||
input_win,
|
||||
f"Value must be no more than {max_length} characters long.",
|
||||
redraw_func=redraw_input_win,
|
||||
)
|
||||
continue
|
||||
|
||||
if input_type is int:
|
||||
if not user_input.isdigit():
|
||||
invalid_input(input_win, "Only numeric digits (0–9) allowed.", redraw_func=redraw_input_win)
|
||||
continue
|
||||
|
||||
int_val = int(user_input)
|
||||
if not (min_value <= int_val <= max_value):
|
||||
invalid_input(
|
||||
input_win, f"Enter a number between {min_value} and {max_value}.", redraw_func=redraw_input_win
|
||||
)
|
||||
continue
|
||||
|
||||
curses.curs_set(0)
|
||||
return int_val
|
||||
|
||||
elif input_type is float:
|
||||
try:
|
||||
float_val = float(user_input)
|
||||
if not (min_value <= float_val <= max_value):
|
||||
invalid_input(
|
||||
input_win,
|
||||
f"Enter a number between {min_value} and {max_value}.",
|
||||
redraw_func=redraw_input_win,
|
||||
)
|
||||
continue
|
||||
except ValueError:
|
||||
invalid_input(input_win, "Must be a valid floating point number.", redraw_func=redraw_input_win)
|
||||
continue
|
||||
else:
|
||||
curses.curs_set(0)
|
||||
return float_val
|
||||
|
||||
else:
|
||||
break
|
||||
|
||||
elif key in (curses.KEY_BACKSPACE, chr(127)): # Handle Backspace
|
||||
if user_input:
|
||||
user_input = user_input[:-1] # Remove last character
|
||||
|
||||
elif max_length is None or len(user_input) < max_length: # Enforce max length
|
||||
if isinstance(key, str):
|
||||
user_input += key
|
||||
else:
|
||||
user_input += chr(key)
|
||||
elif max_length is None or len(user_input) < max_length:
|
||||
try:
|
||||
char = chr(key) if not isinstance(key, str) else key
|
||||
if input_type is int:
|
||||
if char.isdigit() or (char == "-" and len(user_input) == 0):
|
||||
user_input += char
|
||||
elif input_type is float:
|
||||
if (
|
||||
char.isdigit()
|
||||
or (char == "." and "." not in user_input)
|
||||
or (char == "-" and len(user_input) == 0)
|
||||
):
|
||||
user_input += char
|
||||
else:
|
||||
user_input += char
|
||||
except ValueError:
|
||||
pass # Ignore invalid input
|
||||
|
||||
# First line must be manually handled before using wrap_text()
|
||||
first_line = user_input[:first_line_width] # Cut to max first line width
|
||||
@@ -95,10 +222,12 @@ def get_text_input(prompt: str) -> Optional[str]:
|
||||
curses.curs_set(0)
|
||||
input_win.erase()
|
||||
input_win.refresh()
|
||||
return user_input
|
||||
return user_input.strip()
|
||||
|
||||
|
||||
def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
"""Handles user input for editing up to 3 Admin Keys in Base64 format."""
|
||||
|
||||
def to_base64(byte_strings):
|
||||
"""Convert byte values to Base64-encoded strings."""
|
||||
return [base64.b64encode(b).decode() for b in byte_strings]
|
||||
@@ -119,10 +248,11 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
start_y = (curses.LINES - height) // 2
|
||||
start_x = (curses.COLS - width) // 2
|
||||
|
||||
repeated_win = curses.newwin(height, width, start_y, start_x)
|
||||
repeated_win.bkgd(get_color("background"))
|
||||
repeated_win.attrset(get_color("window_frame"))
|
||||
repeated_win.keypad(True) # Enable keypad for special keys
|
||||
admin_key_win = curses.newwin(height, width, start_y, start_x)
|
||||
admin_key_win.timeout(200)
|
||||
admin_key_win.bkgd(get_color("background"))
|
||||
admin_key_win.attrset(get_color("window_frame"))
|
||||
admin_key_win.keypad(True) # Enable keypad for special keys
|
||||
|
||||
curses.echo()
|
||||
curses.curs_set(1)
|
||||
@@ -130,46 +260,48 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
# Editable list of values (max 3 values)
|
||||
user_values = cvalue[:3] + [""] * (3 - len(cvalue)) # Ensure always 3 fields
|
||||
cursor_pos = 0 # Track which value is being edited
|
||||
error_message = ""
|
||||
invalid_input = ""
|
||||
|
||||
while True:
|
||||
repeated_win.erase()
|
||||
repeated_win.border()
|
||||
repeated_win.addstr(1, 2, "Edit up to 3 Admin Keys:", get_color("settings_default", bold=True))
|
||||
admin_key_win.erase()
|
||||
admin_key_win.border()
|
||||
admin_key_win.addstr(1, 2, "Edit up to 3 Admin Keys:", get_color("settings_default", bold=True))
|
||||
|
||||
# Display current values, allowing editing
|
||||
for i, line in enumerate(user_values):
|
||||
prefix = "→ " if i == cursor_pos else " " # Highlight the current line
|
||||
repeated_win.addstr(
|
||||
admin_key_win.addstr(
|
||||
3 + i, 2, f"{prefix}Admin Key {i + 1}: ", get_color("settings_default", bold=(i == cursor_pos))
|
||||
)
|
||||
repeated_win.addstr(3 + i, 18, line) # Align text for easier editing
|
||||
admin_key_win.addstr(3 + i, 18, line) # Align text for easier editing
|
||||
|
||||
# Move cursor to the correct position inside the field
|
||||
curses.curs_set(1)
|
||||
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
|
||||
admin_key_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
|
||||
|
||||
# Show error message if needed
|
||||
if error_message:
|
||||
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
|
||||
if invalid_input:
|
||||
admin_key_win.addstr(7, 2, invalid_input, get_color("settings_default", bold=True))
|
||||
|
||||
repeated_win.refresh()
|
||||
key = repeated_win.getch()
|
||||
admin_key_win.refresh()
|
||||
key = admin_key_win.getch()
|
||||
|
||||
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow -> Cancel and return original
|
||||
repeated_win.erase()
|
||||
repeated_win.refresh()
|
||||
admin_key_win.erase()
|
||||
admin_key_win.refresh()
|
||||
curses.noecho()
|
||||
curses.curs_set(0)
|
||||
menu_state.need_redraw = True
|
||||
return None
|
||||
|
||||
elif key == ord("\n"): # Enter key to save and return
|
||||
menu_state.need_redraw = True
|
||||
if all(is_valid_base64(val) for val in user_values): # Ensure all values are valid Base64 and 32 bytes
|
||||
curses.noecho()
|
||||
curses.curs_set(0)
|
||||
return user_values # Return the edited Base64 values
|
||||
else:
|
||||
error_message = "Error: Each key must be valid Base64 and 32 bytes long!"
|
||||
invalid_input = "Error: Each key must be valid Base64 and 32 bytes long!"
|
||||
elif key == curses.KEY_UP: # Move cursor up
|
||||
cursor_pos = (cursor_pos - 1) % len(user_values)
|
||||
elif key == curses.KEY_DOWN: # Move cursor down
|
||||
@@ -180,11 +312,14 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
else:
|
||||
try:
|
||||
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
|
||||
error_message = "" # Clear error if user starts fixing input
|
||||
invalid_input = "" # Clear error if user starts fixing input
|
||||
except ValueError:
|
||||
pass # Ignore invalid character inputs
|
||||
|
||||
|
||||
from contact.utilities.singleton import menu_state # Required if not already imported
|
||||
|
||||
|
||||
def get_repeated_input(current_value: List[str]) -> Optional[str]:
|
||||
height = 9
|
||||
width = 80
|
||||
@@ -192,71 +327,82 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
|
||||
start_x = (curses.COLS - width) // 2
|
||||
|
||||
repeated_win = curses.newwin(height, width, start_y, start_x)
|
||||
repeated_win.timeout(200)
|
||||
repeated_win.bkgd(get_color("background"))
|
||||
repeated_win.attrset(get_color("window_frame"))
|
||||
repeated_win.keypad(True) # Enable keypad for special keys
|
||||
repeated_win.keypad(True)
|
||||
|
||||
curses.echo()
|
||||
curses.curs_set(1) # Show the cursor
|
||||
curses.curs_set(1)
|
||||
|
||||
# Editable list of values (max 3 values)
|
||||
user_values = current_value[:3]
|
||||
cursor_pos = 0 # Track which value is being edited
|
||||
error_message = ""
|
||||
user_values = current_value[:3] + [""] * (3 - len(current_value)) # Always 3 fields
|
||||
cursor_pos = 0
|
||||
invalid_input = ""
|
||||
|
||||
while True:
|
||||
def redraw():
|
||||
repeated_win.erase()
|
||||
repeated_win.border()
|
||||
repeated_win.addstr(1, 2, "Edit up to 3 Values:", get_color("settings_default", bold=True))
|
||||
|
||||
# Display current values, allowing editing
|
||||
for i, line in enumerate(user_values):
|
||||
prefix = "→ " if i == cursor_pos else " " # Highlight the current line
|
||||
prefix = "→ " if i == cursor_pos else " "
|
||||
repeated_win.addstr(
|
||||
3 + i, 2, f"{prefix}Value{i + 1}: ", get_color("settings_default", bold=(i == cursor_pos))
|
||||
)
|
||||
repeated_win.addstr(3 + i, 18, line)
|
||||
repeated_win.addstr(3 + i, 18, line[: width - 20]) # Prevent overflow
|
||||
|
||||
# Move cursor to the correct position inside the field
|
||||
curses.curs_set(1)
|
||||
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
|
||||
|
||||
# Show error message if needed
|
||||
if error_message:
|
||||
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
|
||||
if invalid_input:
|
||||
repeated_win.addstr(7, 2, invalid_input[: width - 4], get_color("settings_default", bold=True))
|
||||
|
||||
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos]))
|
||||
repeated_win.refresh()
|
||||
key = repeated_win.getch()
|
||||
|
||||
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow -> Cancel and return original
|
||||
while True:
|
||||
if menu_state.need_redraw:
|
||||
menu_state.need_redraw = False
|
||||
redraw()
|
||||
|
||||
redraw()
|
||||
|
||||
try:
|
||||
key = repeated_win.get_wch()
|
||||
except curses.error:
|
||||
continue # ignore timeout or input issues
|
||||
|
||||
if key in (27, curses.KEY_LEFT): # ESC or Left Arrow
|
||||
repeated_win.erase()
|
||||
repeated_win.refresh()
|
||||
curses.noecho()
|
||||
curses.curs_set(0)
|
||||
menu_state.need_redraw = True
|
||||
return None
|
||||
|
||||
elif key == ord("\n"): # Enter key to save and return
|
||||
elif key in ("\n", curses.KEY_ENTER):
|
||||
curses.noecho()
|
||||
curses.curs_set(0)
|
||||
return ", ".join(user_values)
|
||||
elif key == curses.KEY_UP: # Move cursor up
|
||||
cursor_pos = (cursor_pos - 1) % len(user_values)
|
||||
elif key == curses.KEY_DOWN: # Move cursor down
|
||||
cursor_pos = (cursor_pos + 1) % len(user_values)
|
||||
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
|
||||
if len(user_values[cursor_pos]) > 0:
|
||||
user_values[cursor_pos] = user_values[cursor_pos][:-1] # Remove last character
|
||||
menu_state.need_redraw = True
|
||||
return ", ".join(user_values).strip()
|
||||
elif key == curses.KEY_UP:
|
||||
cursor_pos = (cursor_pos - 1) % 3
|
||||
elif key == curses.KEY_DOWN:
|
||||
cursor_pos = (cursor_pos + 1) % 3
|
||||
elif key in (curses.KEY_BACKSPACE, 127):
|
||||
user_values[cursor_pos] = user_values[cursor_pos][:-1]
|
||||
else:
|
||||
try:
|
||||
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
|
||||
error_message = "" # Clear error if user starts fixing input
|
||||
except ValueError:
|
||||
pass # Ignore invalid character inputs
|
||||
ch = chr(key) if isinstance(key, int) else key
|
||||
if ch.isprintable():
|
||||
user_values[cursor_pos] += ch
|
||||
invalid_input = ""
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
from contact.utilities.singleton import menu_state # Ensure this is imported
|
||||
|
||||
|
||||
def get_fixed32_input(current_value: int) -> int:
|
||||
cvalue = current_value
|
||||
current_value = str(ipaddress.IPv4Address(current_value))
|
||||
original_value = current_value
|
||||
ip_string = str(ipaddress.IPv4Address(current_value))
|
||||
height = 10
|
||||
width = 80
|
||||
start_y = (curses.LINES - height) // 2
|
||||
@@ -266,54 +412,73 @@ def get_fixed32_input(current_value: int) -> int:
|
||||
fixed32_win.bkgd(get_color("background"))
|
||||
fixed32_win.attrset(get_color("window_frame"))
|
||||
fixed32_win.keypad(True)
|
||||
fixed32_win.timeout(200)
|
||||
|
||||
curses.echo()
|
||||
curses.curs_set(1)
|
||||
user_input = ""
|
||||
|
||||
while True:
|
||||
def redraw():
|
||||
fixed32_win.erase()
|
||||
fixed32_win.border()
|
||||
fixed32_win.addstr(1, 2, "Enter an IP address (xxx.xxx.xxx.xxx):", curses.A_BOLD)
|
||||
fixed32_win.addstr(3, 2, f"Current: {current_value}")
|
||||
fixed32_win.addstr(5, 2, f"New value: {user_input}")
|
||||
fixed32_win.addstr(1, 2, "Enter an IP address (xxx.xxx.xxx.xxx):", get_color("settings_default", bold=True))
|
||||
fixed32_win.addstr(3, 2, f"Current: {ip_string}", get_color("settings_default"))
|
||||
fixed32_win.addstr(5, 2, f"New value: {user_input}", get_color("settings_default"))
|
||||
fixed32_win.refresh()
|
||||
|
||||
key = fixed32_win.getch()
|
||||
while True:
|
||||
if menu_state.need_redraw:
|
||||
menu_state.need_redraw = False
|
||||
redraw()
|
||||
|
||||
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow to cancel
|
||||
redraw()
|
||||
|
||||
try:
|
||||
key = fixed32_win.get_wch()
|
||||
except curses.error:
|
||||
continue # ignore timeout
|
||||
|
||||
if key in (27, curses.KEY_LEFT): # ESC or Left Arrow to cancel
|
||||
fixed32_win.erase()
|
||||
fixed32_win.refresh()
|
||||
curses.noecho()
|
||||
curses.curs_set(0)
|
||||
return cvalue # Return the current value unchanged
|
||||
elif key == ord("\n"): # Enter key to validate and save
|
||||
# Validate IP address
|
||||
menu_state.need_redraw = True
|
||||
return original_value
|
||||
|
||||
elif key in ("\n", curses.KEY_ENTER):
|
||||
octets = user_input.split(".")
|
||||
menu_state.need_redraw = True
|
||||
if len(octets) == 4 and all(octet.isdigit() and 0 <= int(octet) <= 255 for octet in octets):
|
||||
curses.noecho()
|
||||
curses.curs_set(0)
|
||||
fixed32_address = ipaddress.ip_address(user_input)
|
||||
return int(fixed32_address) # Return the valid IP address
|
||||
return int(ipaddress.ip_address(user_input))
|
||||
else:
|
||||
fixed32_win.addstr(7, 2, "Invalid IP address. Try again.", curses.A_BOLD | curses.color_pair(5))
|
||||
fixed32_win.addstr(7, 2, "Invalid IP address. Try again.", get_color("settings_default", bold=True))
|
||||
fixed32_win.refresh()
|
||||
curses.napms(1500) # Wait for 1.5 seconds before refreshing
|
||||
user_input = "" # Clear invalid input
|
||||
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
|
||||
curses.napms(1500)
|
||||
user_input = ""
|
||||
|
||||
elif key in (curses.KEY_BACKSPACE, 127):
|
||||
user_input = user_input[:-1]
|
||||
|
||||
else:
|
||||
try:
|
||||
char = chr(key)
|
||||
if char.isdigit() or char == ".":
|
||||
user_input += char # Append only valid characters (digits or dots)
|
||||
except ValueError:
|
||||
pass # Ignore invalid inputs
|
||||
ch = chr(key) if isinstance(key, int) else key
|
||||
if ch.isdigit() or ch == ".":
|
||||
user_input += ch
|
||||
except Exception:
|
||||
pass # Ignore unprintable inputs
|
||||
|
||||
|
||||
def get_list_input(prompt: str, current_option: Optional[str], list_options: List[str]) -> Optional[str]:
|
||||
from typing import List, Optional # ensure Optional is imported
|
||||
|
||||
|
||||
def get_list_input(
|
||||
prompt: str, current_option: Optional[str], list_options: List[str], mandatory: bool = False
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Displays a scrollable list of list_options for the user to choose from.
|
||||
List selector.
|
||||
"""
|
||||
selected_index = list_options.index(current_option) if current_option in list_options else 0
|
||||
|
||||
@@ -323,6 +488,7 @@ def get_list_input(prompt: str, current_option: Optional[str], list_options: Lis
|
||||
start_x = (curses.COLS - width) // 2
|
||||
|
||||
list_win = curses.newwin(height, width, start_y, start_x)
|
||||
list_win.timeout(200)
|
||||
list_win.bkgd(get_color("background"))
|
||||
list_win.attrset(get_color("window_frame"))
|
||||
list_win.keypad(True)
|
||||
@@ -330,50 +496,62 @@ def get_list_input(prompt: str, current_option: Optional[str], list_options: Lis
|
||||
list_pad = curses.newpad(len(list_options) + 1, width - 8)
|
||||
list_pad.bkgd(get_color("background"))
|
||||
|
||||
# Render header
|
||||
list_win.erase()
|
||||
list_win.border()
|
||||
list_win.addstr(1, 2, prompt, get_color("settings_default", bold=True))
|
||||
|
||||
# Render options on the pad
|
||||
for idx, color in enumerate(list_options):
|
||||
if idx == selected_index:
|
||||
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
|
||||
else:
|
||||
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
|
||||
|
||||
# Initial refresh
|
||||
list_win.refresh()
|
||||
list_pad.refresh(
|
||||
0,
|
||||
0,
|
||||
list_win.getbegyx()[0] + 3,
|
||||
list_win.getbegyx()[1] + 4,
|
||||
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2,
|
||||
list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4,
|
||||
)
|
||||
|
||||
max_index = len(list_options) - 1
|
||||
visible_height = list_win.getmaxyx()[0] - 5
|
||||
|
||||
draw_arrows(list_win, visible_height, max_index, [0], show_save_option=False) # Initial call to draw arrows
|
||||
def redraw_list_ui():
|
||||
list_win.erase()
|
||||
list_win.border()
|
||||
list_win.addstr(1, 2, prompt, get_color("settings_default", bold=True))
|
||||
|
||||
for idx, item in enumerate(list_options):
|
||||
color = get_color("settings_default", reverse=(idx == selected_index))
|
||||
list_pad.addstr(idx, 0, item.ljust(width - 8), color)
|
||||
|
||||
list_win.refresh()
|
||||
list_pad.refresh(
|
||||
0,
|
||||
0,
|
||||
list_win.getbegyx()[0] + 3,
|
||||
list_win.getbegyx()[1] + 4,
|
||||
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2,
|
||||
list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4,
|
||||
)
|
||||
draw_arrows(list_win, visible_height, max_index, [0], show_save_option=False)
|
||||
|
||||
# Initial draw
|
||||
redraw_list_ui()
|
||||
|
||||
while True:
|
||||
key = list_win.getch()
|
||||
if menu_state.need_redraw:
|
||||
menu_state.need_redraw = False
|
||||
redraw_list_ui()
|
||||
|
||||
try:
|
||||
key = list_win.getch()
|
||||
except curses.error:
|
||||
continue
|
||||
|
||||
if key == curses.KEY_UP:
|
||||
old_selected_index = selected_index
|
||||
selected_index = max(0, selected_index - 1)
|
||||
move_highlight(old_selected_index, list_options, list_win, list_pad, selected_index=selected_index)
|
||||
|
||||
elif key == curses.KEY_DOWN:
|
||||
old_selected_index = selected_index
|
||||
selected_index = min(len(list_options) - 1, selected_index + 1)
|
||||
move_highlight(old_selected_index, list_options, list_win, list_pad, selected_index=selected_index)
|
||||
elif key == ord("\n"): # Enter key
|
||||
|
||||
elif key == ord("\n"): # Enter
|
||||
list_win.clear()
|
||||
list_win.refresh()
|
||||
menu_state.need_redraw = True
|
||||
return list_options[selected_index]
|
||||
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
|
||||
|
||||
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left
|
||||
if mandatory:
|
||||
continue
|
||||
list_win.clear()
|
||||
list_win.refresh()
|
||||
menu_state.need_redraw = True
|
||||
return current_option
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from contact.ui.ui_state import ChatUIState, InterfaceState, AppState
|
||||
from contact.ui.ui_state import ChatUIState, InterfaceState, AppState, MenuState
|
||||
|
||||
ui_state = ChatUIState()
|
||||
interface_state = InterfaceState()
|
||||
app_state = AppState()
|
||||
menu_state = MenuState()
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import datetime
|
||||
import time
|
||||
from meshtastic.protobuf import config_pb2
|
||||
import contact.ui.default_config as config
|
||||
|
||||
@@ -134,3 +135,31 @@ def get_time_ago(timestamp):
|
||||
if unit != "s":
|
||||
return f"{value} {unit} ago"
|
||||
return "now"
|
||||
|
||||
def add_new_message(channel_id, prefix, message):
|
||||
if channel_id not in ui_state.all_messages:
|
||||
ui_state.all_messages[channel_id] = []
|
||||
|
||||
# Timestamp handling
|
||||
current_timestamp = time.time()
|
||||
current_hour = datetime.datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
|
||||
|
||||
# Retrieve the last timestamp if available
|
||||
channel_messages = ui_state.all_messages[channel_id]
|
||||
if channel_messages:
|
||||
# Check the last entry for a timestamp
|
||||
for entry in reversed(channel_messages):
|
||||
if entry[0].startswith("--"):
|
||||
last_hour = entry[0].strip("- ").strip()
|
||||
break
|
||||
else:
|
||||
last_hour = None
|
||||
else:
|
||||
last_hour = None
|
||||
|
||||
# Add a new timestamp if it's a new hour
|
||||
if last_hour != current_hour:
|
||||
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
|
||||
|
||||
# Add the message
|
||||
ui_state.all_messages[channel_id].append((prefix,message))
|
||||
|
||||
23
contact/utilities/validation_rules.py
Normal file
23
contact/utilities/validation_rules.py
Normal file
@@ -0,0 +1,23 @@
|
||||
validation_rules = {
|
||||
"shortName": {"max_length": 4},
|
||||
"longName": {"max_length": 32},
|
||||
"fixed_pin": {"min_length": 6, "max_length": 6},
|
||||
"position_flags": {"max_length": 3},
|
||||
"enabled_protocols": {"max_value": 2},
|
||||
"hop_limit": {"max_value": 7},
|
||||
"latitude": {"min_value": -90, "max_value": 90},
|
||||
"longitude": {"min_value": -180, "max_value": 180},
|
||||
"altitude": {"min_value": -4294967295, "max_value": 4294967295},
|
||||
"red": {"max_value": 255},
|
||||
"green": {"max_value": 255},
|
||||
"blue": {"max_value": 255},
|
||||
"current": {"max_value": 255},
|
||||
"position_precision": {"max_value": 32},
|
||||
}
|
||||
|
||||
|
||||
def get_validation_for(key: str) -> dict:
|
||||
for rule_key, config in validation_rules.items():
|
||||
if rule_key in key:
|
||||
return config
|
||||
return {}
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "contact"
|
||||
version = "1.3.10"
|
||||
version = "1.3.16"
|
||||
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
|
||||
authors = [
|
||||
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
|
||||
|
||||
Reference in New Issue
Block a user