mirror of
https://github.com/pdxlocations/contact.git
synced 2026-03-28 17:12:35 +01:00
Compare commits
29 Commits
1.3.14
...
input-vali
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cf4137347f | ||
|
|
12f81f6af6 | ||
|
|
684d298fac | ||
|
|
4179a3f8d0 | ||
|
|
7a61808f47 | ||
|
|
a8680ac0ed | ||
|
|
818575939a | ||
|
|
acaad849b0 | ||
|
|
0b25dda4af | ||
|
|
18329f0128 | ||
|
|
4378f3045c | ||
|
|
a451d1d7d6 | ||
|
|
fe1f027219 | ||
|
|
43435cbe04 | ||
|
|
fe98075582 | ||
|
|
8716ea6fe1 | ||
|
|
a8bdcbb7e6 | ||
|
|
02742b27f3 | ||
|
|
ae028032a0 | ||
|
|
30402f4906 | ||
|
|
324e0b03e7 | ||
|
|
056db12911 | ||
|
|
685a2d4bf8 | ||
|
|
6ed0cc8c9f | ||
|
|
fc208a9258 | ||
|
|
eaf9381bca | ||
|
|
367af9044c | ||
|
|
d8183d9009 | ||
|
|
3fb1335be3 |
@@ -66,12 +66,8 @@ def prompt_region_if_unset(args: object) -> None:
|
||||
interface_state.interface = initialize_interface(args)
|
||||
|
||||
|
||||
def initialize_globals(args: object) -> None:
|
||||
def initialize_globals() -> None:
|
||||
"""Initializes interface and shared globals."""
|
||||
interface_state.interface = initialize_interface(args)
|
||||
|
||||
if interface_state.interface.localNode.localConfig.lora.region == 0:
|
||||
prompt_region_if_unset(args)
|
||||
|
||||
interface_state.myNodeNum = get_nodeNum()
|
||||
ui_state.channel_list = get_channels()
|
||||
@@ -84,55 +80,63 @@ def initialize_globals(args: object) -> None:
|
||||
|
||||
def main(stdscr: curses.window) -> None:
|
||||
"""Main entry point for the curses UI."""
|
||||
|
||||
output_capture = io.StringIO()
|
||||
|
||||
try:
|
||||
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
|
||||
setup_colors()
|
||||
draw_splash(stdscr)
|
||||
setup_colors()
|
||||
draw_splash(stdscr)
|
||||
|
||||
args = setup_parser().parse_args()
|
||||
args = setup_parser().parse_args()
|
||||
|
||||
if getattr(args, "settings", False):
|
||||
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
|
||||
return
|
||||
if getattr(args, "settings", False):
|
||||
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
|
||||
return
|
||||
|
||||
logging.info("Initializing interface...")
|
||||
with app_state.lock:
|
||||
initialize_globals(args)
|
||||
logging.info("Starting main UI")
|
||||
logging.info("Initializing interface...")
|
||||
with app_state.lock:
|
||||
interface_state.interface = initialize_interface(args)
|
||||
|
||||
main_ui(stdscr)
|
||||
if interface_state.interface.localNode.localConfig.lora.region == 0:
|
||||
prompt_region_if_unset(args)
|
||||
|
||||
except Exception as e:
|
||||
console_output = output_capture.getvalue()
|
||||
logging.error("Uncaught exception: %s", e)
|
||||
logging.error("Traceback: %s", traceback.format_exc())
|
||||
logging.error("Console output:\n%s", console_output)
|
||||
initialize_globals()
|
||||
logging.info("Starting main UI")
|
||||
|
||||
try:
|
||||
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
|
||||
main_ui(stdscr)
|
||||
except Exception:
|
||||
console_output = output_capture.getvalue()
|
||||
logging.error("Uncaught exception inside main_ui")
|
||||
logging.error("Traceback:\n%s", traceback.format_exc())
|
||||
logging.error("Console output:\n%s", console_output)
|
||||
return
|
||||
|
||||
except Exception:
|
||||
raise
|
||||
|
||||
|
||||
def start() -> None:
|
||||
"""Launch curses wrapper and redirect logs to file."""
|
||||
"""Entry point for the application."""
|
||||
|
||||
if "--help" in sys.argv or "-h" in sys.argv:
|
||||
setup_parser().print_help()
|
||||
sys.exit(0)
|
||||
|
||||
with open(config.log_file_path, "a", buffering=1) as log_f:
|
||||
sys.stdout = log_f
|
||||
sys.stderr = log_f
|
||||
|
||||
with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
|
||||
try:
|
||||
curses.wrapper(main)
|
||||
except KeyboardInterrupt:
|
||||
logging.info("User exited with Ctrl+C")
|
||||
sys.exit(0)
|
||||
except Exception as e:
|
||||
logging.error("Fatal error: %s", e)
|
||||
logging.error("Traceback: %s", traceback.format_exc())
|
||||
sys.exit(1)
|
||||
try:
|
||||
curses.wrapper(main)
|
||||
except KeyboardInterrupt:
|
||||
logging.info("User exited with Ctrl+C")
|
||||
sys.exit(0)
|
||||
except Exception as e:
|
||||
logging.critical("Fatal error", exc_info=True)
|
||||
try:
|
||||
curses.endwin()
|
||||
except Exception:
|
||||
pass
|
||||
print("Fatal error:", e)
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -3,11 +3,12 @@ import os
|
||||
import platform
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict
|
||||
|
||||
from contact.utilities.utils import refresh_node_list
|
||||
from contact.utilities.utils import (
|
||||
refresh_node_list,
|
||||
add_new_message,
|
||||
)
|
||||
from contact.ui.contact_ui import (
|
||||
draw_packetlog_win,
|
||||
draw_node_list,
|
||||
@@ -29,40 +30,43 @@ from contact.utilities.singleton import ui_state, interface_state, app_state
|
||||
def play_sound():
|
||||
try:
|
||||
system = platform.system()
|
||||
sound_path = ""
|
||||
executable = ""
|
||||
sound_path = None
|
||||
executable = None
|
||||
|
||||
if system == "Darwin": #macOS
|
||||
if system == "Darwin": # macOS
|
||||
sound_path = "/System/Library/Sounds/Ping.aiff"
|
||||
executable = "afplay"
|
||||
elif system == "Linux":
|
||||
sound_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
|
||||
if(shutil.which("paplay")):
|
||||
executable = "paplay"
|
||||
else:
|
||||
executable = "aplay"
|
||||
|
||||
if executable != "" and sound_path != "":
|
||||
if os.path.exists(sound_path):
|
||||
if shutil.which(executable):
|
||||
subprocess.run([executable, sound_path], check=True,
|
||||
stdout=subprocess.DEVNULL, stderr = subprocess.DEVNULL)
|
||||
return
|
||||
else:
|
||||
logging.warning("No sound player found (afplay/paplay/aplay)")
|
||||
elif system == "Linux":
|
||||
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
|
||||
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
|
||||
|
||||
if shutil.which("paplay") and os.path.exists(ogg_path):
|
||||
executable = "paplay"
|
||||
sound_path = ogg_path
|
||||
elif shutil.which("ffplay") and os.path.exists(ogg_path):
|
||||
executable = "ffplay"
|
||||
sound_path = ogg_path
|
||||
elif shutil.which("aplay") and os.path.exists(wav_path):
|
||||
executable = "aplay"
|
||||
sound_path = wav_path
|
||||
else:
|
||||
logging.warning(f"Sound file not found: {sound_path}")
|
||||
logging.warning("No suitable sound player or sound file found on Linux")
|
||||
|
||||
if executable and sound_path:
|
||||
cmd = [executable, sound_path]
|
||||
if executable == "ffplay":
|
||||
cmd = [executable, "-nodisp", "-autoexit", sound_path]
|
||||
|
||||
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||
return
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
logging.error(f"Sound playback failed: {e}")
|
||||
except Exception as e:
|
||||
logging.error(f"Unexpected error: {e}")
|
||||
|
||||
|
||||
|
||||
# Final fallback: terminal beep
|
||||
print("\a")
|
||||
|
||||
|
||||
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
"""
|
||||
Handles an incoming packet from a Meshtastic interface.
|
||||
@@ -121,7 +125,9 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
|
||||
channel_number = ui_state.channel_list.index(packet["from"])
|
||||
|
||||
if ui_state.channel_list[channel_number] != ui_state.channel_list[ui_state.selected_channel]:
|
||||
channel_id = ui_state.channel_list[channel_number]
|
||||
|
||||
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
|
||||
add_notification(channel_number)
|
||||
refresh_channels = True
|
||||
else:
|
||||
@@ -131,40 +137,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
message_from_id = packet["from"]
|
||||
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
|
||||
|
||||
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
|
||||
|
||||
# Timestamp handling
|
||||
current_timestamp = time.time()
|
||||
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
|
||||
|
||||
# Retrieve the last timestamp if available
|
||||
channel_messages = ui_state.all_messages[ui_state.channel_list[channel_number]]
|
||||
if channel_messages:
|
||||
# Check the last entry for a timestamp
|
||||
for entry in reversed(channel_messages):
|
||||
if entry[0].startswith("--"):
|
||||
last_hour = entry[0].strip("- ").strip()
|
||||
break
|
||||
else:
|
||||
last_hour = None
|
||||
else:
|
||||
last_hour = None
|
||||
|
||||
# Add a new timestamp if it's a new hour
|
||||
if last_hour != current_hour:
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
|
||||
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
|
||||
(f"{config.message_prefix} {message_from_string} ", message_string)
|
||||
)
|
||||
add_new_message(channel_id, f"{config.message_prefix} {message_from_string} ", message_string)
|
||||
|
||||
if refresh_channels:
|
||||
draw_channel_list()
|
||||
if refresh_messages:
|
||||
draw_messages_window(True)
|
||||
|
||||
save_message_to_db(ui_state.channel_list[channel_number], message_from_id, message_string)
|
||||
save_message_to_db(channel_id, message_from_id, message_string)
|
||||
|
||||
except KeyError as e:
|
||||
logging.error(f"Error processing packet: {e}")
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict
|
||||
|
||||
import google.protobuf.json_format
|
||||
@@ -16,6 +15,8 @@ import contact.ui.default_config as config
|
||||
|
||||
from contact.utilities.singleton import ui_state, interface_state
|
||||
|
||||
from contact.utilities.utils import add_new_message
|
||||
|
||||
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
|
||||
|
||||
|
||||
@@ -146,8 +147,9 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
|
||||
update_node_info_in_db(packet["from"], chat_archived=False)
|
||||
|
||||
channel_number = ui_state.channel_list.index(packet["from"])
|
||||
channel_id = ui_state.channel_list[channel_number]
|
||||
|
||||
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
|
||||
if channel_id == ui_state.channel_list[ui_state.selected_channel]:
|
||||
refresh_messages = True
|
||||
else:
|
||||
add_notification(channel_number)
|
||||
@@ -155,18 +157,14 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
|
||||
|
||||
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
|
||||
|
||||
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
|
||||
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
|
||||
(f"{config.message_prefix} {message_from_string}", msg_str)
|
||||
)
|
||||
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str)
|
||||
|
||||
if refresh_channels:
|
||||
draw_channel_list()
|
||||
if refresh_messages:
|
||||
draw_messages_window(True)
|
||||
save_message_to_db(ui_state.channel_list[channel_number], packet["from"], msg_str)
|
||||
|
||||
save_message_to_db(channel_id, packet["from"], msg_str)
|
||||
|
||||
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
|
||||
"""
|
||||
@@ -190,32 +188,7 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
|
||||
channelIndex=send_on_channel,
|
||||
)
|
||||
|
||||
# Add sent message to the messages dictionary
|
||||
if channel_id not in ui_state.all_messages:
|
||||
ui_state.all_messages[channel_id] = []
|
||||
|
||||
# Handle timestamp logic
|
||||
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
|
||||
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
|
||||
|
||||
# Retrieve the last timestamp if available
|
||||
channel_messages = ui_state.all_messages[channel_id]
|
||||
if channel_messages:
|
||||
# Check the last entry for a timestamp
|
||||
for entry in reversed(channel_messages):
|
||||
if entry[0].startswith("--"):
|
||||
last_hour = entry[0].strip("- ").strip()
|
||||
break
|
||||
else:
|
||||
last_hour = None
|
||||
else:
|
||||
last_hour = None
|
||||
|
||||
# Add a new timestamp if it's a new hour
|
||||
if last_hour != current_hour:
|
||||
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
|
||||
|
||||
ui_state.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
|
||||
add_new_message(channel_id, config.sent_message_prefix + config.ack_unknown_str + ": ", message)
|
||||
|
||||
timestamp = save_message_to_db(channel_id, myid, message)
|
||||
|
||||
@@ -230,10 +203,14 @@ def send_traceroute() -> None:
|
||||
"""
|
||||
Sends a RouteDiscovery protobuf to the selected node.
|
||||
"""
|
||||
|
||||
channel_id = ui_state.node_list[ui_state.selected_node]
|
||||
add_new_message(channel_id, f"{config.message_prefix} Sent Traceroute", "")
|
||||
|
||||
r = mesh_pb2.RouteDiscovery()
|
||||
interface_state.interface.sendData(
|
||||
r,
|
||||
destinationId=ui_state.node_list[ui_state.selected_node],
|
||||
destinationId=channel_id,
|
||||
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
|
||||
wantResponse=True,
|
||||
onResponse=on_response_traceroute,
|
||||
|
||||
@@ -335,8 +335,7 @@ def handle_ctrl_t(stdscr: curses.window) -> None:
|
||||
send_traceroute()
|
||||
curses.curs_set(0) # Hide cursor
|
||||
contact.ui.dialog.dialog(
|
||||
stdscr,
|
||||
"Traceroute Sent",
|
||||
f"Traceroute Sent To: {get_name_from_database(ui_state.node_list[ui_state.selected_node])}",
|
||||
"Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.",
|
||||
)
|
||||
curses.curs_set(1) # Show cursor again
|
||||
|
||||
@@ -150,6 +150,15 @@ def draw_help_window(
|
||||
)
|
||||
|
||||
|
||||
def get_input_type_for_field(field) -> type:
|
||||
if field.type in (field.TYPE_INT32, field.TYPE_UINT32, field.TYPE_INT64):
|
||||
return int
|
||||
elif field.type in (field.TYPE_FLOAT, field.TYPE_DOUBLE):
|
||||
return float
|
||||
else:
|
||||
return str
|
||||
|
||||
|
||||
def settings_menu(stdscr: object, interface: object) -> None:
|
||||
curses.update_lines_cols()
|
||||
|
||||
@@ -268,7 +277,8 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
break
|
||||
|
||||
elif selected_option == "Export Config File":
|
||||
filename = get_text_input("Enter a filename for the config file")
|
||||
|
||||
filename = get_text_input("Enter a filename for the config file", None, None)
|
||||
if not filename:
|
||||
logging.info("Export aborted: No filename provided.")
|
||||
menu_state.start_index.pop()
|
||||
@@ -290,7 +300,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
with open(yaml_file_path, "w", encoding="utf-8") as file:
|
||||
file.write(config_text)
|
||||
logging.info(f"Config file saved to {yaml_file_path}")
|
||||
dialog(stdscr, "Config File Saved:", yaml_file_path)
|
||||
dialog("Config File Saved:", yaml_file_path)
|
||||
menu_state.start_index.pop()
|
||||
continue
|
||||
except PermissionError:
|
||||
@@ -306,14 +316,14 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
|
||||
# Check if folder exists and is not empty
|
||||
if not os.path.exists(config_folder) or not any(os.listdir(config_folder)):
|
||||
dialog(stdscr, "", " No config files found. Export a config first.")
|
||||
dialog("", " No config files found. Export a config first.")
|
||||
continue # Return to menu
|
||||
|
||||
file_list = [f for f in os.listdir(config_folder) if os.path.isfile(os.path.join(config_folder, f))]
|
||||
|
||||
# Ensure file_list is not empty before proceeding
|
||||
if not file_list:
|
||||
dialog(stdscr, "", " No config files found. Export a config first.")
|
||||
dialog("", " No config files found. Export a config first.")
|
||||
continue
|
||||
|
||||
filename = get_list_input("Choose a config file", None, file_list)
|
||||
@@ -327,7 +337,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
|
||||
elif selected_option == "Config URL":
|
||||
current_value = interface.localNode.getURL()
|
||||
new_value = get_text_input(f"Config URL is currently: {current_value}")
|
||||
new_value = get_text_input(f"Config URL is currently: {current_value}", None, str)
|
||||
if new_value is not None:
|
||||
current_value = new_value
|
||||
overwrite = get_list_input(f"Are you sure you want to load this config?", None, ["Yes", "No"])
|
||||
@@ -395,7 +405,9 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
|
||||
if selected_option in ["longName", "shortName", "isLicensed"]:
|
||||
if selected_option in ["longName", "shortName"]:
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, None
|
||||
)
|
||||
new_value = current_value if new_value is None else new_value
|
||||
menu_state.current_menu[selected_option] = (field, new_value)
|
||||
|
||||
@@ -414,7 +426,9 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif selected_option in ["latitude", "longitude", "altitude"]:
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, float
|
||||
)
|
||||
new_value = current_value if new_value is None else new_value
|
||||
menu_state.current_menu[selected_option] = (field, new_value)
|
||||
|
||||
@@ -453,17 +467,26 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif field.type == 13: # Field type 13 corresponds to UINT32
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
input_type = get_input_type_for_field(field)
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
|
||||
)
|
||||
new_value = current_value if new_value is None else int(new_value)
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif field.type == 2: # Field type 13 corresponds to INT64
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
input_type = get_input_type_for_field(field)
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
|
||||
)
|
||||
new_value = current_value if new_value is None else float(new_value)
|
||||
menu_state.start_index.pop()
|
||||
|
||||
else: # Handle other field types
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
input_type = get_input_type_for_field(field)
|
||||
new_value = get_text_input(
|
||||
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
|
||||
)
|
||||
new_value = current_value if new_value is None else new_value
|
||||
menu_state.start_index.pop()
|
||||
|
||||
|
||||
@@ -7,10 +7,56 @@ from typing import Dict
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
|
||||
|
||||
# Paths
|
||||
json_file_path = os.path.join(parent_dir, "config.json")
|
||||
log_file_path = os.path.join(parent_dir, "client.log")
|
||||
db_file_path = os.path.join(parent_dir, "client.db")
|
||||
# To test writting to a non-writable directory, you can uncomment the following lines:
|
||||
# mkdir /tmp/test_nonwritable
|
||||
# chmod -w /tmp/test_nonwritable
|
||||
# parent_dir = "/tmp/test_nonwritable"
|
||||
|
||||
|
||||
def _is_writable_dir(path: str) -> bool:
|
||||
"""
|
||||
Return True if we can create & delete a temp file in `path`.
|
||||
"""
|
||||
if not os.path.isdir(path):
|
||||
return False
|
||||
test_path = os.path.join(path, ".perm_test_tmp")
|
||||
try:
|
||||
with open(test_path, "w", encoding="utf-8") as _tmp:
|
||||
_tmp.write("ok")
|
||||
os.remove(test_path)
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
def _get_config_root(preferred_dir: str, fallback_name: str = ".contact_client") -> str:
|
||||
"""
|
||||
Choose a writable directory for config artifacts.
|
||||
"""
|
||||
if _is_writable_dir(preferred_dir):
|
||||
return preferred_dir
|
||||
|
||||
home = os.path.expanduser("~")
|
||||
fallback_dir = os.path.join(home, fallback_name)
|
||||
# Ensure the fallback exists.
|
||||
os.makedirs(fallback_dir, exist_ok=True)
|
||||
|
||||
# If *that* still isn't writable, last-ditch: use a system temp dir.
|
||||
if not _is_writable_dir(fallback_dir):
|
||||
import tempfile
|
||||
|
||||
fallback_dir = tempfile.mkdtemp(prefix="contact_client_")
|
||||
|
||||
return fallback_dir
|
||||
|
||||
|
||||
# Pick the root now.
|
||||
config_root = _get_config_root(parent_dir)
|
||||
|
||||
# Paths (derived from the chosen root)
|
||||
json_file_path = os.path.join(config_root, "config.json")
|
||||
log_file_path = os.path.join(config_root, "client.log")
|
||||
db_file_path = os.path.join(config_root, "client.db")
|
||||
|
||||
|
||||
def format_json_single_line_arrays(data: Dict[str, object], indent: int = 4) -> str:
|
||||
|
||||
@@ -2,14 +2,13 @@ import curses
|
||||
from contact.ui.colors import get_color
|
||||
|
||||
|
||||
def dialog(stdscr: curses.window, title: str, message: str) -> None:
|
||||
height, width = stdscr.getmaxyx()
|
||||
def dialog(title: str, message: str) -> None:
|
||||
|
||||
height, width = curses.LINES, curses.COLS
|
||||
|
||||
# Calculate dialog dimensions
|
||||
max_line_lengh = 0
|
||||
message_lines = message.splitlines()
|
||||
for l in message_lines:
|
||||
max_line_length = max(len(l), max_line_lengh)
|
||||
max_line_length = max(len(l) for l in message_lines)
|
||||
dialog_width = max(len(title) + 4, max_line_length + 4)
|
||||
dialog_height = len(message_lines) + 4
|
||||
x = (width - dialog_width) // 2
|
||||
@@ -24,12 +23,19 @@ def dialog(stdscr: curses.window, title: str, message: str) -> None:
|
||||
# Add title
|
||||
win.addstr(0, 2, title, get_color("settings_default"))
|
||||
|
||||
# Add message
|
||||
for i, l in enumerate(message_lines):
|
||||
win.addstr(2 + i, 2, l, get_color("settings_default"))
|
||||
# Add message (centered)
|
||||
for i, line in enumerate(message_lines):
|
||||
msg_x = (dialog_width - len(line)) // 2
|
||||
win.addstr(2 + i, msg_x, line, get_color("settings_default"))
|
||||
|
||||
# Add button
|
||||
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", get_color("settings_default", reverse=True))
|
||||
# Add centered OK button
|
||||
ok_text = " Ok "
|
||||
win.addstr(
|
||||
dialog_height - 2,
|
||||
(dialog_width - len(ok_text)) // 2,
|
||||
ok_text,
|
||||
get_color("settings_default", reverse=True),
|
||||
)
|
||||
|
||||
# Refresh dialog window
|
||||
win.refresh()
|
||||
@@ -37,8 +43,7 @@ def dialog(stdscr: curses.window, title: str, message: str) -> None:
|
||||
# Get user input
|
||||
while True:
|
||||
char = win.getch()
|
||||
# Close dialog with enter, space, or esc
|
||||
if char in (curses.KEY_ENTER, 10, 13, 32, 27):
|
||||
if char in (curses.KEY_ENTER, 10, 13, 32, 27): # Enter, space, or Esc
|
||||
win.erase()
|
||||
win.refresh()
|
||||
return
|
||||
|
||||
@@ -116,7 +116,14 @@ def load_messages_from_db() -> None:
|
||||
|
||||
# Add messages to ui_state.all_messages grouped by hourly timestamp
|
||||
hourly_messages = {}
|
||||
for user_id, message, timestamp, ack_type in db_messages:
|
||||
for row in db_messages:
|
||||
user_id, message, timestamp, ack_type = row
|
||||
|
||||
# Only ack_type is allowed to be None
|
||||
if user_id is None or message is None or timestamp is None:
|
||||
logging.warning(f"Skipping row with NULL required field(s): {row}")
|
||||
continue
|
||||
|
||||
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
|
||||
if hour not in hourly_messages:
|
||||
hourly_messages[hour] = []
|
||||
@@ -130,11 +137,13 @@ def load_messages_from_db() -> None:
|
||||
ack_str = config.nak_str
|
||||
|
||||
if user_id == str(interface_state.myNodeNum):
|
||||
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
|
||||
sanitized_message = message.replace("\x00", "")
|
||||
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", sanitized_message)
|
||||
else:
|
||||
sanitized_message = message.replace("\x00", "")
|
||||
formatted_message = (
|
||||
f"{config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ",
|
||||
message,
|
||||
sanitized_message,
|
||||
)
|
||||
|
||||
hourly_messages[hour].append(formatted_message)
|
||||
|
||||
@@ -6,10 +6,43 @@ from typing import Any, Optional, List
|
||||
|
||||
from contact.ui.colors import get_color
|
||||
from contact.ui.nav_utils import move_highlight, draw_arrows, wrap_text
|
||||
from contact.ui.dialog import dialog
|
||||
from contact.utilities.validation_rules import get_validation_for
|
||||
|
||||
|
||||
def get_text_input(prompt: str) -> Optional[str]:
|
||||
def invalid_input(window: curses.window, message: str, redraw_func: Optional[callable] = None) -> None:
|
||||
"""Displays an invalid input message in the given window and redraws if needed."""
|
||||
cursor_y, cursor_x = window.getyx()
|
||||
curses.curs_set(0)
|
||||
dialog("Invalid Input", message)
|
||||
if redraw_func:
|
||||
redraw_func() # Redraw the original window content that got obscured
|
||||
else:
|
||||
window.refresh()
|
||||
window.move(cursor_y, cursor_x)
|
||||
curses.curs_set(1)
|
||||
|
||||
|
||||
def get_text_input(prompt: str, selected_config: str, input_type: str) -> Optional[str]:
|
||||
"""Handles user input with wrapped text for long prompts."""
|
||||
|
||||
def redraw_input_win():
|
||||
"""Redraw the input window with the current prompt and user input."""
|
||||
input_win.erase()
|
||||
input_win.border()
|
||||
row = 1
|
||||
for line in wrapped_prompt:
|
||||
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
|
||||
row += 1
|
||||
if row >= height - 3:
|
||||
break
|
||||
input_win.addstr(row + 1, margin, prompt_text, get_color("settings_default"))
|
||||
input_win.addstr(row + 1, col_start, user_input[:first_line_width], get_color("settings_default"))
|
||||
for i, line in enumerate(wrap_text(user_input[first_line_width:], wrap_width=input_width)):
|
||||
if row + 2 + i < height - 1:
|
||||
input_win.addstr(row + 2 + i, margin, line[:input_width], get_color("settings_default"))
|
||||
input_win.refresh()
|
||||
|
||||
height = 8
|
||||
width = 80
|
||||
margin = 2 # Left and right margin
|
||||
@@ -27,6 +60,7 @@ def get_text_input(prompt: str) -> Optional[str]:
|
||||
# Wrap the prompt text
|
||||
wrapped_prompt = wrap_text(prompt, wrap_width=input_width)
|
||||
row = 1
|
||||
|
||||
for line in wrapped_prompt:
|
||||
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
|
||||
row += 1
|
||||
@@ -39,34 +73,115 @@ def get_text_input(prompt: str) -> Optional[str]:
|
||||
input_win.refresh()
|
||||
curses.curs_set(1)
|
||||
|
||||
max_length = 4 if "shortName" in prompt else None
|
||||
user_input = ""
|
||||
min_value = 0
|
||||
max_value = 4294967295
|
||||
min_length = 0
|
||||
max_length = None
|
||||
|
||||
# Start user input after the prompt text
|
||||
if selected_config is not None:
|
||||
validation = get_validation_for(selected_config) or {}
|
||||
min_value = validation.get("min_value", 0)
|
||||
max_value = validation.get("max_value", 4294967295)
|
||||
min_length = validation.get("min_length", 0)
|
||||
max_length = validation.get("max_length")
|
||||
|
||||
user_input = ""
|
||||
col_start = margin + len(prompt_text)
|
||||
first_line_width = input_width - len(prompt_text) # Available space for first line
|
||||
first_line_width = input_width - len(prompt_text)
|
||||
|
||||
while True:
|
||||
key = input_win.get_wch() # Waits for user input
|
||||
key = input_win.get_wch()
|
||||
|
||||
if key == chr(27) or key == curses.KEY_LEFT: # ESC or Left Arrow
|
||||
if key == chr(27) or key == curses.KEY_LEFT:
|
||||
input_win.erase()
|
||||
input_win.refresh()
|
||||
curses.curs_set(0)
|
||||
return None # Exit without saving
|
||||
return None
|
||||
|
||||
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)): # Enter key
|
||||
break
|
||||
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
|
||||
if not user_input.strip():
|
||||
invalid_input(input_win, "Value cannot be empty.", redraw_func=redraw_input_win)
|
||||
continue
|
||||
|
||||
length = len(user_input)
|
||||
if min_length == max_length and max_length is not None:
|
||||
if length != min_length:
|
||||
invalid_input(
|
||||
input_win, f"Value must be exactly {min_length} characters long.", redraw_func=redraw_input_win
|
||||
)
|
||||
continue
|
||||
else:
|
||||
if length < min_length:
|
||||
invalid_input(
|
||||
input_win,
|
||||
f"Value must be at least {min_length} characters long.",
|
||||
redraw_func=redraw_input_win,
|
||||
)
|
||||
continue
|
||||
if max_length is not None and length > max_length:
|
||||
invalid_input(
|
||||
input_win,
|
||||
f"Value must be no more than {max_length} characters long.",
|
||||
redraw_func=redraw_input_win,
|
||||
)
|
||||
continue
|
||||
|
||||
if input_type is int:
|
||||
if not user_input.isdigit():
|
||||
invalid_input(input_win, "Only numeric digits (0–9) allowed.", redraw_func=redraw_input_win)
|
||||
continue
|
||||
|
||||
int_val = int(user_input)
|
||||
if not (min_value <= int_val <= max_value):
|
||||
invalid_input(
|
||||
input_win, f"Enter a number between {min_value} and {max_value}.", redraw_func=redraw_input_win
|
||||
)
|
||||
continue
|
||||
|
||||
curses.curs_set(0)
|
||||
return int_val
|
||||
|
||||
elif input_type is float:
|
||||
try:
|
||||
float_val = float(user_input)
|
||||
if not (min_value <= float_val <= max_value):
|
||||
invalid_input(
|
||||
input_win,
|
||||
f"Enter a number between {min_value} and {max_value}.",
|
||||
redraw_func=redraw_input_win,
|
||||
)
|
||||
continue
|
||||
except ValueError:
|
||||
invalid_input(input_win, "Must be a valid floating point number.", redraw_func=redraw_input_win)
|
||||
continue
|
||||
else:
|
||||
curses.curs_set(0)
|
||||
return float_val
|
||||
|
||||
else:
|
||||
break
|
||||
|
||||
elif key in (curses.KEY_BACKSPACE, chr(127)): # Handle Backspace
|
||||
if user_input:
|
||||
user_input = user_input[:-1] # Remove last character
|
||||
|
||||
elif max_length is None or len(user_input) < max_length: # Enforce max length
|
||||
if isinstance(key, str):
|
||||
user_input += key
|
||||
else:
|
||||
user_input += chr(key)
|
||||
elif max_length is None or len(user_input) < max_length:
|
||||
try:
|
||||
char = chr(key) if not isinstance(key, str) else key
|
||||
if input_type is int:
|
||||
if char.isdigit() or (char == "-" and len(user_input) == 0):
|
||||
user_input += char
|
||||
elif input_type is float:
|
||||
if (
|
||||
char.isdigit()
|
||||
or (char == "." and "." not in user_input)
|
||||
or (char == "-" and len(user_input) == 0)
|
||||
):
|
||||
user_input += char
|
||||
else:
|
||||
user_input += char
|
||||
except ValueError:
|
||||
pass # Ignore invalid input
|
||||
|
||||
# First line must be manually handled before using wrap_text()
|
||||
first_line = user_input[:first_line_width] # Cut to max first line width
|
||||
@@ -95,10 +210,12 @@ def get_text_input(prompt: str) -> Optional[str]:
|
||||
curses.curs_set(0)
|
||||
input_win.erase()
|
||||
input_win.refresh()
|
||||
return user_input
|
||||
return user_input.strip()
|
||||
|
||||
|
||||
def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
"""Handles user input for editing up to 3 Admin Keys in Base64 format."""
|
||||
|
||||
def to_base64(byte_strings):
|
||||
"""Convert byte values to Base64-encoded strings."""
|
||||
return [base64.b64encode(b).decode() for b in byte_strings]
|
||||
@@ -130,7 +247,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
# Editable list of values (max 3 values)
|
||||
user_values = cvalue[:3] + [""] * (3 - len(cvalue)) # Ensure always 3 fields
|
||||
cursor_pos = 0 # Track which value is being edited
|
||||
error_message = ""
|
||||
invalid_input = ""
|
||||
|
||||
while True:
|
||||
repeated_win.erase()
|
||||
@@ -150,8 +267,8 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
|
||||
|
||||
# Show error message if needed
|
||||
if error_message:
|
||||
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
|
||||
if invalid_input:
|
||||
repeated_win.addstr(7, 2, invalid_input, get_color("settings_default", bold=True))
|
||||
|
||||
repeated_win.refresh()
|
||||
key = repeated_win.getch()
|
||||
@@ -169,7 +286,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
curses.curs_set(0)
|
||||
return user_values # Return the edited Base64 values
|
||||
else:
|
||||
error_message = "Error: Each key must be valid Base64 and 32 bytes long!"
|
||||
invalid_input = "Error: Each key must be valid Base64 and 32 bytes long!"
|
||||
elif key == curses.KEY_UP: # Move cursor up
|
||||
cursor_pos = (cursor_pos - 1) % len(user_values)
|
||||
elif key == curses.KEY_DOWN: # Move cursor down
|
||||
@@ -180,7 +297,7 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
|
||||
else:
|
||||
try:
|
||||
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
|
||||
error_message = "" # Clear error if user starts fixing input
|
||||
invalid_input = "" # Clear error if user starts fixing input
|
||||
except ValueError:
|
||||
pass # Ignore invalid character inputs
|
||||
|
||||
@@ -202,7 +319,7 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
|
||||
# Editable list of values (max 3 values)
|
||||
user_values = current_value[:3]
|
||||
cursor_pos = 0 # Track which value is being edited
|
||||
error_message = ""
|
||||
invalid_input = ""
|
||||
|
||||
while True:
|
||||
repeated_win.erase()
|
||||
@@ -222,8 +339,8 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
|
||||
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
|
||||
|
||||
# Show error message if needed
|
||||
if error_message:
|
||||
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
|
||||
if invalid_input:
|
||||
repeated_win.addstr(7, 2, invalid_input, get_color("settings_default", bold=True))
|
||||
|
||||
repeated_win.refresh()
|
||||
key = repeated_win.getch()
|
||||
@@ -249,7 +366,7 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
|
||||
else:
|
||||
try:
|
||||
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
|
||||
error_message = "" # Clear error if user starts fixing input
|
||||
invalid_input = "" # Clear error if user starts fixing input
|
||||
except ValueError:
|
||||
pass # Ignore invalid character inputs
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import datetime
|
||||
import time
|
||||
from meshtastic.protobuf import config_pb2
|
||||
import contact.ui.default_config as config
|
||||
|
||||
@@ -134,3 +135,31 @@ def get_time_ago(timestamp):
|
||||
if unit != "s":
|
||||
return f"{value} {unit} ago"
|
||||
return "now"
|
||||
|
||||
def add_new_message(channel_id, prefix, message):
|
||||
if channel_id not in ui_state.all_messages:
|
||||
ui_state.all_messages[channel_id] = []
|
||||
|
||||
# Timestamp handling
|
||||
current_timestamp = time.time()
|
||||
current_hour = datetime.datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
|
||||
|
||||
# Retrieve the last timestamp if available
|
||||
channel_messages = ui_state.all_messages[channel_id]
|
||||
if channel_messages:
|
||||
# Check the last entry for a timestamp
|
||||
for entry in reversed(channel_messages):
|
||||
if entry[0].startswith("--"):
|
||||
last_hour = entry[0].strip("- ").strip()
|
||||
break
|
||||
else:
|
||||
last_hour = None
|
||||
else:
|
||||
last_hour = None
|
||||
|
||||
# Add a new timestamp if it's a new hour
|
||||
if last_hour != current_hour:
|
||||
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
|
||||
|
||||
# Add the message
|
||||
ui_state.all_messages[channel_id].append((prefix,message))
|
||||
|
||||
23
contact/utilities/validation_rules.py
Normal file
23
contact/utilities/validation_rules.py
Normal file
@@ -0,0 +1,23 @@
|
||||
validation_rules = {
|
||||
"shortName": {"max_length": 4},
|
||||
"longName": {"max_length": 32},
|
||||
"fixed_pin": {"min_length": 6, "max_length": 6},
|
||||
"position_flags": {"max_length": 3},
|
||||
"enabled_protocols": {"max_value": 2},
|
||||
"hop_limit": {"max_value": 7},
|
||||
"latitude": {"min_value": -90, "max_value": 90},
|
||||
"longitude": {"min_value": -180, "max_value": 180},
|
||||
"altitude": {"min_value": -4294967295, "max_value": 4294967295},
|
||||
"red": {"max_value": 255},
|
||||
"green": {"max_value": 255},
|
||||
"blue": {"max_value": 255},
|
||||
"current": {"max_value": 255},
|
||||
"position_precision": {"max_value": 32},
|
||||
}
|
||||
|
||||
|
||||
def get_validation_for(key: str) -> dict:
|
||||
for rule_key, config in validation_rules.items():
|
||||
if rule_key in key:
|
||||
return config
|
||||
return {}
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "contact"
|
||||
version = "1.3.14"
|
||||
version = "1.3.16"
|
||||
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
|
||||
authors = [
|
||||
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
|
||||
|
||||
Reference in New Issue
Block a user