mirror of
https://github.com/pdxlocations/contact.git
synced 2026-03-28 17:12:35 +01:00
Compare commits
71 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0ada7eb5b | ||
|
|
8d9bbac0be | ||
|
|
613eeb4fab | ||
|
|
f7b2645dcb | ||
|
|
bc5a5951d4 | ||
|
|
d7eec6de6e | ||
|
|
8779297424 | ||
|
|
ccc1399644 | ||
|
|
f52034e61f | ||
|
|
cdd1d89062 | ||
|
|
c3ff85a646 | ||
|
|
9b8cf19a0c | ||
|
|
f2e671da7f | ||
|
|
3fc1293db1 | ||
|
|
4abe9611e3 | ||
|
|
4d20df17fe | ||
|
|
3bb57b9420 | ||
|
|
e305bb4464 | ||
|
|
636b27cf9b | ||
|
|
8e500cb305 | ||
|
|
0878937194 | ||
|
|
ac2016322b | ||
|
|
031d74a290 | ||
|
|
14913ce5ae | ||
|
|
c9e39d89b0 | ||
|
|
dc27e9e02f | ||
|
|
4f64131d2e | ||
|
|
a55d68a828 | ||
|
|
bd41870567 | ||
|
|
5a722cbf7d | ||
|
|
9cbc2d51f8 | ||
|
|
5ce3e62fdb | ||
|
|
5628758de0 | ||
|
|
890a3b6dc4 | ||
|
|
db01d241c7 | ||
|
|
9044d8d380 | ||
|
|
0288a1d190 | ||
|
|
3674afc216 | ||
|
|
da24902bd0 | ||
|
|
f9bc7f9be9 | ||
|
|
ffd28c02a3 | ||
|
|
d22b3abc2f | ||
|
|
3c9b81f391 | ||
|
|
ecc360dba9 | ||
|
|
696370308f | ||
|
|
5999deac1a | ||
|
|
492c1d30d6 | ||
|
|
9e3b684a5f | ||
|
|
25f388ed23 | ||
|
|
07fbdb92e3 | ||
|
|
7c4cc1dd2f | ||
|
|
06ce9f7ac2 | ||
|
|
f4115e48ad | ||
|
|
857d8d0c04 | ||
|
|
ec0554df14 | ||
|
|
372204a684 | ||
|
|
8ff55c3de9 | ||
|
|
d9088ccd68 | ||
|
|
db8496b2e3 | ||
|
|
2b0f6515af | ||
|
|
4cd7c4e24d | ||
|
|
92a30ad8b0 | ||
|
|
2960553fef | ||
|
|
37c5a7dbc3 | ||
|
|
25240f211b | ||
|
|
c236a386a5 | ||
|
|
66a9954149 | ||
|
|
3df012dd69 | ||
|
|
6d204581ce | ||
|
|
c100539ff9 | ||
|
|
5e1ede0bea |
143
.github/workflows/release.yaml
vendored
Normal file
143
.github/workflows/release.yaml
vendored
Normal file
@@ -0,0 +1,143 @@
|
||||
name: release
|
||||
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- "[0-9]+.[0-9]+.[0-9]+"
|
||||
- "[0-9]+.[0-9]+.[0-9]+a[0-9]+"
|
||||
- "[0-9]+.[0-9]+.[0-9]+b[0-9]+"
|
||||
- "[0-9]+.[0-9]+.[0-9]+rc[0-9]+"
|
||||
|
||||
env:
|
||||
PACKAGE_NAME: "contact"
|
||||
OWNER: "pdxlocations"
|
||||
|
||||
jobs:
|
||||
details:
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
new_version: ${{ steps.release.outputs.new_version }}
|
||||
suffix: ${{ steps.release.outputs.suffix }}
|
||||
tag_name: ${{ steps.release.outputs.tag_name }}
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Extract tag and Details
|
||||
id: release
|
||||
run: |
|
||||
if [ "${{ github.ref_type }}" = "tag" ]; then
|
||||
TAG_NAME=${GITHUB_REF#refs/tags/}
|
||||
NEW_VERSION=$(echo $TAG_NAME | awk -F'-' '{print $1}')
|
||||
SUFFIX=$(echo $TAG_NAME | grep -oP '[a-z]+[0-9]+' || echo "")
|
||||
echo "new_version=$NEW_VERSION" >> "$GITHUB_OUTPUT"
|
||||
echo "suffix=$SUFFIX" >> "$GITHUB_OUTPUT"
|
||||
echo "tag_name=$TAG_NAME" >> "$GITHUB_OUTPUT"
|
||||
echo "Version is $NEW_VERSION"
|
||||
echo "Suffix is $SUFFIX"
|
||||
echo "Tag name is $TAG_NAME"
|
||||
else
|
||||
echo "No tag found"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
check_pypi:
|
||||
needs: details
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Fetch information from PyPI
|
||||
run: |
|
||||
response=$(curl -s https://pypi.org/pypi/${{ env.PACKAGE_NAME }}/json || echo "{}")
|
||||
latest_previous_version=$(echo $response | jq --raw-output "select(.releases != null) | .releases | keys_unsorted | last")
|
||||
if [ -z "$latest_previous_version" ]; then
|
||||
echo "Package not found on PyPI."
|
||||
latest_previous_version="0.0.0"
|
||||
fi
|
||||
echo "Latest version on PyPI: $latest_previous_version"
|
||||
echo "latest_previous_version=$latest_previous_version" >> $GITHUB_ENV
|
||||
|
||||
- name: Compare versions and exit if not newer
|
||||
run: |
|
||||
NEW_VERSION=${{ needs.details.outputs.new_version }}
|
||||
LATEST_VERSION=$latest_previous_version
|
||||
if [ "$(printf '%s\n' "$LATEST_VERSION" "$NEW_VERSION" | sort -rV | head -n 1)" != "$NEW_VERSION" ] || [ "$NEW_VERSION" == "$LATEST_VERSION" ]; then
|
||||
echo "The new version $NEW_VERSION is not greater than the latest version $LATEST_VERSION on PyPI."
|
||||
exit 1
|
||||
else
|
||||
echo "The new version $NEW_VERSION is greater than the latest version $LATEST_VERSION on PyPI."
|
||||
fi
|
||||
|
||||
setup_and_build:
|
||||
needs: [details, check_pypi]
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: "3.12"
|
||||
|
||||
- name: Install Poetry
|
||||
run: |
|
||||
curl -sSL https://install.python-poetry.org | python3 -
|
||||
echo "$HOME/.local/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Set project version with Poetry
|
||||
run: |
|
||||
poetry version ${{ needs.details.outputs.new_version }}
|
||||
|
||||
- name: Install dependencies
|
||||
run: poetry install --sync --no-interaction
|
||||
|
||||
- name: Build source and wheel distribution
|
||||
run: |
|
||||
poetry build
|
||||
|
||||
- name: Upload artifacts
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: dist
|
||||
path: dist/
|
||||
|
||||
pypi_publish:
|
||||
name: Upload release to PyPI
|
||||
needs: [setup_and_build, details]
|
||||
runs-on: ubuntu-latest
|
||||
environment:
|
||||
name: release
|
||||
permissions:
|
||||
id-token: write
|
||||
steps:
|
||||
- name: Download artifacts
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
name: dist
|
||||
path: dist/
|
||||
|
||||
- name: Publish distribution to PyPI
|
||||
uses: pypa/gh-action-pypi-publish@release/v1
|
||||
|
||||
github_release:
|
||||
name: Create GitHub Release
|
||||
needs: [setup_and_build, details]
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
steps:
|
||||
- name: Checkout Code
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Download artifacts
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
name: dist
|
||||
path: dist/
|
||||
|
||||
- name: Create GitHub Release
|
||||
id: create_release
|
||||
env:
|
||||
GH_TOKEN: ${{ github.token }}
|
||||
run: |
|
||||
gh release create ${{ needs.details.outputs.tag_name }} dist/* --title ${{ needs.details.outputs.tag_name }} --generate-notes
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -7,4 +7,5 @@ client.db
|
||||
client.log
|
||||
settings.log
|
||||
config.json
|
||||
default_config.log
|
||||
default_config.log
|
||||
dist/
|
||||
13
.vscode/launch.json
vendored
Normal file
13
.vscode/launch.json
vendored
Normal file
@@ -0,0 +1,13 @@
|
||||
{
|
||||
"version": "0.1.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "Python Debugger: Current File",
|
||||
"type": "debugpy",
|
||||
"request": "launch",
|
||||
"cwd": "${workspaceFolder}",
|
||||
"module": "contact.__main__",
|
||||
"args": []
|
||||
}
|
||||
]
|
||||
}
|
||||
19
README.md
19
README.md
@@ -1,17 +1,21 @@
|
||||
## Contact - A Console UI for Meshtastic
|
||||
### (Formerly Curses Client for Meshtastic)
|
||||
|
||||
#### Powered by Meshtastic.org
|
||||
|
||||
### Install with:
|
||||
```bash
|
||||
pip install contact
|
||||
```
|
||||
|
||||
This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores.
|
||||
|
||||
|
||||
<img width="846" alt="Contact - Main UI Screenshot" src="https://github.com/user-attachments/assets/d2996bfb-2c6d-46a8-b820-92a9143375f4">
|
||||
|
||||
<br><br>
|
||||
The settings dialogue can be accessed within the client or may be run standalone to configure your node by launching `settings.py`
|
||||
The settings dialogue can be accessed within the client or may be run standalone to configure your node by launching `contact --settings` or `contact -c`
|
||||
|
||||
<img width="441" alt="Contact - Settings Dialogue" src="https://github.com/user-attachments/assets/dd47f52a-d4d8-4e40-8001-9ea53d87f816" />
|
||||
<img width="696" alt="Screenshot 2025-04-08 at 6 10 06 PM" src="https://github.com/user-attachments/assets/3d5e3964-f009-4772-bd6e-91b907c65a3b" />
|
||||
|
||||
## Message Persistence
|
||||
|
||||
@@ -48,17 +52,18 @@ Optional arguments to specify a device to connect to and how.
|
||||
- `--port`, `--serial`, `-s`: The port to connect to via serial, e.g. `/dev/ttyUSB0`.
|
||||
- `--host`, `--tcp`, `-t`: The hostname or IP address to connect to using TCP, will default to localhost if no host is passed.
|
||||
- `--ble`, `-b`: The BLE device MAC address or name to connect to.
|
||||
- `--settings`, `--set`, `--control`, `-c`: Launch directly into the settings.
|
||||
|
||||
If no connection arguments are specified, the client will attempt a serial connection and then a TCP connection to localhost.
|
||||
|
||||
### Example Usage
|
||||
|
||||
```sh
|
||||
python main.py --port /dev/ttyUSB0
|
||||
python main.py --host 192.168.1.1
|
||||
python main.py --ble BlAddressOfDevice
|
||||
contact --port /dev/ttyUSB0
|
||||
contact --host 192.168.1.1
|
||||
contact --ble BlAddressOfDevice
|
||||
```
|
||||
To quickly connect to localhost, use:
|
||||
```sh
|
||||
python main.py -t
|
||||
contact -t
|
||||
```
|
||||
|
||||
133
contact/__main__.py
Normal file
133
contact/__main__.py
Normal file
@@ -0,0 +1,133 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Contact - A Console UI for Meshtastic by http://github.com/pdxlocations
|
||||
Powered by Meshtastic.org
|
||||
|
||||
Meshtastic® is a registered trademark of Meshtastic LLC.
|
||||
Meshtastic software components are released under various licenses—see GitHub for details.
|
||||
No warranty is provided. Use at your own risk.
|
||||
"""
|
||||
|
||||
# Standard library
|
||||
import contextlib
|
||||
import curses
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import traceback
|
||||
|
||||
# Third-party
|
||||
from pubsub import pub
|
||||
|
||||
# Local application
|
||||
import contact.globals as globals
|
||||
import contact.ui.default_config as config
|
||||
from contact.message_handlers.rx_handler import on_receive
|
||||
from contact.settings import set_region
|
||||
from contact.ui.colors import setup_colors
|
||||
from contact.ui.contact_ui import main_ui
|
||||
from contact.ui.splash import draw_splash
|
||||
from contact.utilities.arg_parser import setup_parser
|
||||
from contact.utilities.db_handler import init_nodedb, load_messages_from_db
|
||||
from contact.utilities.input_handlers import get_list_input
|
||||
from contact.utilities.interfaces import initialize_interface
|
||||
from contact.utilities.utils import get_channels, get_nodeNum, get_node_list
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Environment & Logging Setup
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
os.environ["NCURSES_NO_UTF8_ACS"] = "1"
|
||||
os.environ["LANG"] = "C.UTF-8"
|
||||
os.environ.setdefault("TERM", "xterm-256color")
|
||||
if os.environ.get("COLORTERM") == "gnome-terminal":
|
||||
os.environ["TERM"] = "xterm-256color"
|
||||
|
||||
logging.basicConfig(
|
||||
filename=config.log_file_path,
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
|
||||
globals.lock = threading.Lock()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Main Program Logic
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
def initialize_globals(args) -> None:
|
||||
"""Initializes interface and shared globals."""
|
||||
globals.interface = initialize_interface(args)
|
||||
|
||||
# Prompt for region if unset
|
||||
if globals.interface.localNode.localConfig.lora.region == 0:
|
||||
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
|
||||
if confirmation == "Yes":
|
||||
set_region(globals.interface)
|
||||
globals.interface.close()
|
||||
globals.interface = initialize_interface(args)
|
||||
|
||||
globals.myNodeNum = get_nodeNum()
|
||||
globals.channel_list = get_channels()
|
||||
globals.node_list = get_node_list()
|
||||
pub.subscribe(on_receive, 'meshtastic.receive')
|
||||
|
||||
init_nodedb()
|
||||
load_messages_from_db()
|
||||
|
||||
|
||||
def main(stdscr: curses.window) -> None:
|
||||
"""Main entry point for the curses UI."""
|
||||
output_capture = io.StringIO()
|
||||
|
||||
try:
|
||||
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
|
||||
setup_colors()
|
||||
draw_splash(stdscr)
|
||||
|
||||
args = setup_parser().parse_args()
|
||||
|
||||
if getattr(args, 'settings', False):
|
||||
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
|
||||
return
|
||||
|
||||
logging.info("Initializing interface...")
|
||||
with globals.lock:
|
||||
initialize_globals(args)
|
||||
logging.info("Starting main UI")
|
||||
|
||||
main_ui(stdscr)
|
||||
|
||||
except Exception as e:
|
||||
console_output = output_capture.getvalue()
|
||||
logging.error("Uncaught exception: %s", e)
|
||||
logging.error("Traceback: %s", traceback.format_exc())
|
||||
logging.error("Console output:\n%s", console_output)
|
||||
raise
|
||||
|
||||
|
||||
def start() -> None:
|
||||
"""Launch curses wrapper and redirect logs to file."""
|
||||
with open(config.log_file_path, "a", buffering=1) as log_f:
|
||||
sys.stdout = log_f
|
||||
sys.stderr = log_f
|
||||
|
||||
with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
|
||||
try:
|
||||
curses.wrapper(main)
|
||||
except KeyboardInterrupt:
|
||||
logging.info("User exited with Ctrl+C")
|
||||
sys.exit(0)
|
||||
except Exception as e:
|
||||
logging.error("Fatal error: %s", e)
|
||||
logging.error("Traceback: %s", traceback.format_exc())
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
start()
|
||||
@@ -78,6 +78,13 @@ dns, "IPv4 DNS server", ""
|
||||
rsyslog_server, "RSyslog server", ""
|
||||
enabled_protocols, "Enabled protocols", ""
|
||||
|
||||
[config.network.ipv4_config]
|
||||
title, "IPv4 Config", ""
|
||||
ip, "IP", ""
|
||||
gateway, "Gateway", ""
|
||||
subnet, "Subnet", ""
|
||||
dns, "DNS", ""
|
||||
|
||||
[config.display]
|
||||
title, "Display"
|
||||
screen_on_secs, "Screen on duration", "How long the screen remains on in seconds after the user button is pressed or messages are received."
|
||||
@@ -105,6 +112,35 @@ theme, "Theme", ""
|
||||
alert_enabled, "Alert enabled", ""
|
||||
banner_enabled, "Banner enabled", ""
|
||||
ring_tone_id, "Ring tone ID", ""
|
||||
language, "Language", ""
|
||||
node_filter, "Node Filter", ""
|
||||
node_highlight, "Node Highlight", ""
|
||||
calibration_data, "Calibration Data", ""
|
||||
map_data, "Map Data", ""
|
||||
|
||||
[config.device_ui.node_filter]
|
||||
title, "Node Filter"
|
||||
unknown_switch, "Unknown Switch", ""
|
||||
offline_switch, "Offline Switch", ""
|
||||
public_key_switch, "Public Key Switch", ""
|
||||
hops_away, "Hops Away", ""
|
||||
position_switch, "Position Switch", ""
|
||||
node_name, "Node Name", ""
|
||||
channel, "Channel", ""
|
||||
|
||||
[config.device_ui.node_highlight]
|
||||
title, "Node Highlight"
|
||||
chat_switch, "Chat Switch", ""
|
||||
position_switch, "Position Switch", ""
|
||||
telemetry_switch, "Telemetry Switch", ""
|
||||
iaq_switch, "IAQ Switch", ""
|
||||
node_name, "Node Name", ""
|
||||
|
||||
[config.device_ui.map_data]
|
||||
title, "Map Data"
|
||||
home, "Home", ""
|
||||
style, "Style", ""
|
||||
follow_gps, "Follow GPS", ""
|
||||
|
||||
[config.lora]
|
||||
title, "LoRa"
|
||||
@@ -1,17 +1,34 @@
|
||||
import logging
|
||||
import time
|
||||
from utilities.utils import refresh_node_list
|
||||
from datetime import datetime
|
||||
from ui.curses_ui import draw_packetlog_win, draw_node_list, draw_messages_window, draw_channel_list, add_notification
|
||||
from utilities.db_handler import save_message_to_db, maybe_store_nodeinfo_in_db, get_name_from_database, update_node_info_in_db
|
||||
import ui.default_config as config
|
||||
import globals
|
||||
from typing import Any
|
||||
|
||||
from contact.utilities.utils import refresh_node_list
|
||||
from contact.ui.contact_ui import (
|
||||
draw_packetlog_win,
|
||||
draw_node_list,
|
||||
draw_messages_window,
|
||||
draw_channel_list,
|
||||
add_notification,
|
||||
)
|
||||
from contact.utilities.db_handler import (
|
||||
save_message_to_db,
|
||||
maybe_store_nodeinfo_in_db,
|
||||
get_name_from_database,
|
||||
update_node_info_in_db,
|
||||
)
|
||||
import contact.ui.default_config as config
|
||||
import contact.globals as globals
|
||||
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
def on_receive(packet, interface):
|
||||
def on_receive(packet: dict[str, Any], interface: Any) -> None:
|
||||
"""
|
||||
Handles an incoming packet from a Meshtastic interface.
|
||||
|
||||
Args:
|
||||
packet: The received Meshtastic packet as a dictionary.
|
||||
interface: The Meshtastic interface instance that received the packet.
|
||||
"""
|
||||
with globals.lock:
|
||||
# Update packet log
|
||||
globals.packet_buffer.append(packet)
|
||||
@@ -1,18 +1,30 @@
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import google.protobuf.json_format
|
||||
from meshtastic import BROADCAST_NUM
|
||||
from meshtastic.protobuf import mesh_pb2, portnums_pb2
|
||||
|
||||
from utilities.db_handler import save_message_to_db, update_ack_nak, get_name_from_database, is_chat_archived, update_node_info_in_db
|
||||
import ui.default_config as config
|
||||
import globals
|
||||
from contact.utilities.db_handler import (
|
||||
save_message_to_db,
|
||||
update_ack_nak,
|
||||
get_name_from_database,
|
||||
is_chat_archived,
|
||||
update_node_info_in_db,
|
||||
)
|
||||
import contact.ui.default_config as config
|
||||
import contact.globals as globals
|
||||
|
||||
ack_naks: dict[str, dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
|
||||
|
||||
ack_naks = {}
|
||||
|
||||
# Note "onAckNak" has special meaning to the API, thus the nonstandard naming convention
|
||||
# See https://github.com/meshtastic/python/blob/master/meshtastic/mesh_interface.py#L462
|
||||
def onAckNak(packet):
|
||||
from ui.curses_ui import draw_messages_window
|
||||
def onAckNak(packet: dict[str, Any]) -> None:
|
||||
"""
|
||||
Handles incoming ACK/NAK response packets.
|
||||
"""
|
||||
from contact.ui.contact_ui import draw_messages_window
|
||||
request = packet['decoded']['requestId']
|
||||
if(request not in ack_naks):
|
||||
return
|
||||
@@ -41,9 +53,11 @@ def onAckNak(packet):
|
||||
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
|
||||
draw_messages_window()
|
||||
|
||||
def on_response_traceroute(packet):
|
||||
"""on response for trace route"""
|
||||
from ui.curses_ui import draw_channel_list, draw_messages_window, add_notification
|
||||
def on_response_traceroute(packet: dict[str, Any]) -> None:
|
||||
"""
|
||||
Handle traceroute response packets and render the route visually in the UI.
|
||||
"""
|
||||
from contact.ui.contact_ui import draw_channel_list, draw_messages_window, add_notification
|
||||
|
||||
refresh_channels = False
|
||||
refresh_messages = False
|
||||
@@ -118,7 +132,10 @@ def on_response_traceroute(packet):
|
||||
save_message_to_db(globals.channel_list[channel_number], packet['from'], msg_str)
|
||||
|
||||
|
||||
def send_message(message, destination=BROADCAST_NUM, channel=0):
|
||||
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
|
||||
"""
|
||||
Sends a chat message using the selected channel.
|
||||
"""
|
||||
myid = globals.myNodeNum
|
||||
send_on_channel = 0
|
||||
channel_id = globals.channel_list[channel]
|
||||
@@ -168,7 +185,10 @@ def send_message(message, destination=BROADCAST_NUM, channel=0):
|
||||
|
||||
ack_naks[sent_message_data.id] = {'channel': channel_id, 'messageIndex': len(globals.all_messages[channel_id]) - 1, 'timestamp': timestamp}
|
||||
|
||||
def send_traceroute():
|
||||
def send_traceroute() -> None:
|
||||
"""
|
||||
Sends a RouteDiscovery protobuf to the selected node.
|
||||
"""
|
||||
r = mesh_pb2.RouteDiscovery()
|
||||
globals.interface.sendData(
|
||||
r,
|
||||
@@ -5,16 +5,16 @@ import logging
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
import ui.default_config as config
|
||||
from utilities.input_handlers import get_list_input
|
||||
from ui.colors import setup_colors
|
||||
from ui.splash import draw_splash
|
||||
from ui.control_ui import set_region, settings_menu
|
||||
from utilities.arg_parser import setup_parser
|
||||
from utilities.interfaces import initialize_interface
|
||||
import contact.ui.default_config as config
|
||||
from contact.utilities.input_handlers import get_list_input
|
||||
from contact.ui.colors import setup_colors
|
||||
from contact.ui.splash import draw_splash
|
||||
from contact.ui.control_ui import set_region, settings_menu
|
||||
from contact.utilities.arg_parser import setup_parser
|
||||
from contact.utilities.interfaces import initialize_interface
|
||||
|
||||
|
||||
def main(stdscr):
|
||||
def main(stdscr: curses.window) -> None:
|
||||
output_capture = io.StringIO()
|
||||
try:
|
||||
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
|
||||
@@ -1,5 +1,5 @@
|
||||
import curses
|
||||
import ui.default_config as config
|
||||
import contact.ui.default_config as config
|
||||
|
||||
COLOR_MAP = {
|
||||
"black": curses.COLOR_BLACK,
|
||||
@@ -12,7 +12,7 @@ COLOR_MAP = {
|
||||
"white": curses.COLOR_WHITE
|
||||
}
|
||||
|
||||
def setup_colors(reinit=False):
|
||||
def setup_colors(reinit: bool = False) -> None:
|
||||
"""
|
||||
Initialize curses color pairs based on the COLOR_CONFIG.
|
||||
"""
|
||||
@@ -29,7 +29,7 @@ def setup_colors(reinit=False):
|
||||
print()
|
||||
|
||||
|
||||
def get_color(category, bold=False, reverse=False, underline=False):
|
||||
def get_color(category: str, bold: bool = False, reverse: bool = False, underline: bool = False) -> int:
|
||||
"""
|
||||
Retrieve a curses color pair with optional attributes.
|
||||
"""
|
||||
@@ -1,17 +1,19 @@
|
||||
import curses
|
||||
import textwrap
|
||||
import time
|
||||
import logging
|
||||
import traceback
|
||||
from utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
|
||||
from settings import settings_menu
|
||||
from message_handlers.tx_handler import send_message, send_traceroute
|
||||
from ui.colors import setup_colors, get_color
|
||||
from utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived
|
||||
import ui.default_config as config
|
||||
import ui.dialog
|
||||
import globals
|
||||
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
|
||||
from contact.settings import settings_menu
|
||||
from contact.message_handlers.tx_handler import send_message, send_traceroute
|
||||
from contact.ui.colors import setup_colors, get_color
|
||||
from contact.utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived
|
||||
from contact.utilities.input_handlers import get_list_input
|
||||
import contact.ui.default_config as config
|
||||
import contact.ui.dialog
|
||||
import contact.globals as globals
|
||||
|
||||
def handle_resize(stdscr, firstrun):
|
||||
def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
|
||||
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, function_win, packetlog_win, entry_win
|
||||
|
||||
# Calculate window max dimensions
|
||||
@@ -88,7 +90,7 @@ def handle_resize(stdscr, firstrun):
|
||||
pass
|
||||
|
||||
|
||||
def main_ui(stdscr):
|
||||
def main_ui(stdscr: curses.window) -> None:
|
||||
global input_text
|
||||
input_text = ""
|
||||
stdscr.keypad(True)
|
||||
@@ -212,7 +214,7 @@ def main_ui(stdscr):
|
||||
elif char == chr(20):
|
||||
send_traceroute()
|
||||
curses.curs_set(0) # Hide cursor
|
||||
ui.dialog.dialog(stdscr, "Traceroute Sent", "Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.")
|
||||
contact.ui.dialog.dialog(stdscr, "Traceroute Sent", "Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.")
|
||||
curses.curs_set(1) # Show cursor again
|
||||
handle_resize(stdscr, False)
|
||||
|
||||
@@ -293,10 +295,79 @@ def main_ui(stdscr):
|
||||
draw_channel_list()
|
||||
draw_messages_window()
|
||||
|
||||
if(globals.current_window == 2):
|
||||
curses.curs_set(0)
|
||||
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from nodedb?", "No", ["Yes", "No"])
|
||||
if confirmation == "Yes":
|
||||
globals.interface.localNode.removeNode(globals.node_list[globals.selected_node])
|
||||
|
||||
# Directly modifying the interface from client code - good? Bad? If it's stupid but it works, it's not supid?
|
||||
del(globals.interface.nodesByNum[globals.node_list[globals.selected_node]])
|
||||
|
||||
# Convert to "!hex" representation that interface.nodes uses
|
||||
hexid = f"!{hex(globals.node_list[globals.selected_node])[2:]}"
|
||||
del(globals.interface.nodes[hexid])
|
||||
|
||||
globals.node_list.pop(globals.selected_node)
|
||||
|
||||
draw_messages_window()
|
||||
draw_node_list()
|
||||
else:
|
||||
draw_messages_window()
|
||||
curses.curs_set(1)
|
||||
continue
|
||||
|
||||
# ^/
|
||||
elif char == chr(31):
|
||||
if(globals.current_window == 2 or globals.current_window == 0):
|
||||
search(globals.current_window)
|
||||
|
||||
# ^F
|
||||
elif char == chr(6):
|
||||
if globals.current_window == 2:
|
||||
selectedNode = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
|
||||
|
||||
curses.curs_set(0)
|
||||
|
||||
if 'isFavorite' not in selectedNode or selectedNode['isFavorite'] == False:
|
||||
confirmation = get_list_input(f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Favorite?", None, ["Yes", "No"])
|
||||
if confirmation == "Yes":
|
||||
globals.interface.localNode.setFavorite(globals.node_list[globals.selected_node])
|
||||
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
|
||||
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isFavorite'] = True
|
||||
|
||||
refresh_node_list()
|
||||
|
||||
else:
|
||||
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Favorites?", None, ["Yes", "No"])
|
||||
if confirmation == "Yes":
|
||||
globals.interface.localNode.removeFavorite(globals.node_list[globals.selected_node])
|
||||
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
|
||||
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isFavorite'] = False
|
||||
|
||||
refresh_node_list()
|
||||
|
||||
handle_resize(stdscr, False)
|
||||
|
||||
elif char == chr(7):
|
||||
if globals.current_window == 2:
|
||||
selectedNode = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
|
||||
|
||||
curses.curs_set(0)
|
||||
|
||||
if 'isIgnored' not in selectedNode or selectedNode['isIgnored'] == False:
|
||||
confirmation = get_list_input(f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Ignored?", "No", ["Yes", "No"])
|
||||
if confirmation == "Yes":
|
||||
globals.interface.localNode.setIgnored(globals.node_list[globals.selected_node])
|
||||
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isIgnored'] = True
|
||||
else:
|
||||
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Ignored?", "No", ["Yes", "No"])
|
||||
if confirmation == "Yes":
|
||||
globals.interface.localNode.removeIgnored(globals.node_list[globals.selected_node])
|
||||
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isIgnored'] = False
|
||||
|
||||
handle_resize(stdscr, False)
|
||||
|
||||
else:
|
||||
# Append typed character to input text
|
||||
if(isinstance(char, str)):
|
||||
@@ -306,7 +377,7 @@ def main_ui(stdscr):
|
||||
|
||||
|
||||
|
||||
def draw_channel_list():
|
||||
def draw_channel_list() -> None:
|
||||
channel_pad.erase()
|
||||
win_height, win_width = channel_win.getmaxyx()
|
||||
start_index = max(0, globals.selected_channel - (win_height - 3)) # Leave room for borders
|
||||
@@ -347,7 +418,7 @@ def draw_channel_list():
|
||||
|
||||
refresh_pad(0)
|
||||
|
||||
def draw_messages_window(scroll_to_bottom = False):
|
||||
def draw_messages_window(scroll_to_bottom: bool = False) -> None:
|
||||
"""Update the messages window based on the selected channel and scroll position."""
|
||||
messages_pad.erase()
|
||||
|
||||
@@ -390,7 +461,7 @@ def draw_messages_window(scroll_to_bottom = False):
|
||||
|
||||
draw_packetlog_win()
|
||||
|
||||
def draw_node_list():
|
||||
def draw_node_list() -> None:
|
||||
global nodes_pad
|
||||
|
||||
# This didn't work, for some reason an error is thown on startup, so we just create the pad every time
|
||||
@@ -411,7 +482,12 @@ def draw_node_list():
|
||||
node = globals.interface.nodesByNum[node_num]
|
||||
secure = 'user' in node and 'publicKey' in node['user'] and node['user']['publicKey']
|
||||
node_str = f"{'🔐' if secure else '🔓'} {get_name_from_database(node_num, 'long')}".ljust(box_width - 2)[:box_width - 2]
|
||||
nodes_pad.addstr(i, 1, node_str, get_color("node_list", reverse=globals.selected_node == i and globals.current_window == 2))
|
||||
color = "node_list"
|
||||
if 'isFavorite' in node and node['isFavorite']:
|
||||
color = "node_favorite"
|
||||
if 'isIgnored' in node and node['isIgnored']:
|
||||
color = "node_ignored"
|
||||
nodes_pad.addstr(i, 1, node_str, get_color(color, reverse=globals.selected_node == i and globals.current_window == 2))
|
||||
|
||||
nodes_win.attrset(get_color("window_frame_selected") if globals.current_window == 2 else get_color("window_frame"))
|
||||
nodes_win.box()
|
||||
@@ -425,7 +501,7 @@ def draw_node_list():
|
||||
curses.curs_set(1)
|
||||
entry_win.refresh()
|
||||
|
||||
def select_channel(idx):
|
||||
def select_channel(idx: int) -> None:
|
||||
old_selected_channel = globals.selected_channel
|
||||
globals.selected_channel = max(0, min(idx, len(globals.channel_list) - 1))
|
||||
draw_messages_window(True)
|
||||
@@ -439,7 +515,7 @@ def select_channel(idx):
|
||||
highlight_line(True, 0, globals.selected_channel)
|
||||
refresh_pad(0)
|
||||
|
||||
def scroll_channels(direction):
|
||||
def scroll_channels(direction: int) -> None:
|
||||
new_selected_channel = globals.selected_channel + direction
|
||||
|
||||
if new_selected_channel < 0:
|
||||
@@ -449,7 +525,7 @@ def scroll_channels(direction):
|
||||
|
||||
select_channel(new_selected_channel)
|
||||
|
||||
def scroll_messages(direction):
|
||||
def scroll_messages(direction: int) -> None:
|
||||
globals.selected_message += direction
|
||||
|
||||
msg_line_count = messages_pad.getmaxyx()[0]
|
||||
@@ -457,7 +533,7 @@ def scroll_messages(direction):
|
||||
|
||||
refresh_pad(1)
|
||||
|
||||
def select_node(idx):
|
||||
def select_node(idx: int) -> None:
|
||||
old_selected_node = globals.selected_node
|
||||
globals.selected_node = max(0, min(idx, len(globals.node_list) - 1))
|
||||
|
||||
@@ -467,7 +543,7 @@ def select_node(idx):
|
||||
|
||||
draw_function_win()
|
||||
|
||||
def scroll_nodes(direction):
|
||||
def scroll_nodes(direction: int) -> None:
|
||||
new_selected_node = globals.selected_node + direction
|
||||
|
||||
if new_selected_node < 0:
|
||||
@@ -477,7 +553,7 @@ def scroll_nodes(direction):
|
||||
|
||||
select_node(new_selected_node)
|
||||
|
||||
def draw_packetlog_win():
|
||||
def draw_packetlog_win() -> None:
|
||||
|
||||
columns = [10,10,15,30]
|
||||
span = 0
|
||||
@@ -526,7 +602,7 @@ def draw_packetlog_win():
|
||||
curses.curs_set(1)
|
||||
entry_win.refresh()
|
||||
|
||||
def search(win):
|
||||
def search(win: int) -> None:
|
||||
start_idx = globals.selected_node
|
||||
select_func = select_node
|
||||
|
||||
@@ -569,7 +645,7 @@ def search(win):
|
||||
|
||||
entry_win.erase()
|
||||
|
||||
def draw_node_details():
|
||||
def draw_node_details() -> None:
|
||||
node = None
|
||||
try:
|
||||
node = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
|
||||
@@ -617,8 +693,8 @@ def draw_node_details():
|
||||
|
||||
draw_centered_text_field(function_win, nodestr, 0, get_color("commands"))
|
||||
|
||||
def draw_help():
|
||||
cmds = ["↑→↓← = Select", " ENTER = Send", " ` = Settings", " ^P = Packet Log", " ESC = Quit", " ^t = Traceroute", " ^d = Archive Chat"]
|
||||
def draw_help() -> None:
|
||||
cmds = ["↑→↓← = Select", " ENTER = Send", " ` = Settings", " ^P = Packet Log", " ESC = Quit", " ^t = Traceroute", " ^d = Archive Chat", " ^f = Favorite", " ^g = Ignore"]
|
||||
function_str = ""
|
||||
for s in cmds:
|
||||
if(len(function_str) + len(s) < function_win.getmaxyx()[1] - 2):
|
||||
@@ -626,19 +702,18 @@ def draw_help():
|
||||
|
||||
draw_centered_text_field(function_win, function_str, 0, get_color("commands"))
|
||||
|
||||
def draw_function_win():
|
||||
def draw_function_win() -> None:
|
||||
if(globals.current_window == 2):
|
||||
draw_node_details()
|
||||
else:
|
||||
draw_help()
|
||||
|
||||
def get_msg_window_lines():
|
||||
def get_msg_window_lines() -> None:
|
||||
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if globals.display_log else 0
|
||||
return messages_win.getmaxyx()[0] - 2 - packetlog_height
|
||||
|
||||
def refresh_pad(window):
|
||||
# global messages_pad, nodes_pad, channel_pad
|
||||
|
||||
def refresh_pad(window: int) -> None:
|
||||
|
||||
win_height = channel_win.getmaxyx()[0]
|
||||
|
||||
if(window == 1):
|
||||
@@ -670,11 +745,20 @@ def refresh_pad(window):
|
||||
box.getbegyx()[0] + 1, box.getbegyx()[1] + 1,
|
||||
box.getbegyx()[0] + lines, box.getbegyx()[1] + box.getmaxyx()[1] - 2)
|
||||
|
||||
def highlight_line(highlight, window, line):
|
||||
def highlight_line(highlight: bool, window: int, line: int) -> None:
|
||||
pad = nodes_pad
|
||||
|
||||
color = get_color("node_list")
|
||||
select_len = nodes_win.getmaxyx()[1] - 2
|
||||
|
||||
if window == 2:
|
||||
node_num = globals.node_list[line]
|
||||
node = globals.interface.nodesByNum[node_num]
|
||||
if 'isFavorite' in node and node['isFavorite']:
|
||||
color = get_color("node_favorite")
|
||||
if 'isIgnored' in node and node['isIgnored']:
|
||||
color = get_color("node_ignored")
|
||||
|
||||
if(window == 0):
|
||||
pad = channel_pad
|
||||
color = get_color("channel_selected" if (line == globals.selected_channel and highlight == False) else "channel_list")
|
||||
@@ -682,25 +766,25 @@ def highlight_line(highlight, window, line):
|
||||
|
||||
pad.chgat(line, 1, select_len, color | curses.A_REVERSE if highlight else color)
|
||||
|
||||
def add_notification(channel_number):
|
||||
def add_notification(channel_number: int) -> None:
|
||||
if channel_number not in globals.notifications:
|
||||
globals.notifications.append(channel_number)
|
||||
|
||||
def remove_notification(channel_number):
|
||||
def remove_notification(channel_number: int) -> None:
|
||||
if channel_number in globals.notifications:
|
||||
globals.notifications.remove(channel_number)
|
||||
|
||||
def draw_text_field(win, text, color):
|
||||
def draw_text_field(win: curses.window, text: str, color: int) -> None:
|
||||
win.border()
|
||||
win.addstr(1, 1, text, color)
|
||||
|
||||
def draw_centered_text_field(win, text, y_offset, color):
|
||||
def draw_centered_text_field(win: curses.window, text: str, y_offset: int, color: int) -> None:
|
||||
height, width = win.getmaxyx()
|
||||
x = (width - len(text)) // 2
|
||||
y = (height // 2) + y_offset
|
||||
win.addstr(y, x, text, color)
|
||||
win.refresh()
|
||||
|
||||
def draw_debug(value):
|
||||
def draw_debug(value: str | int) -> None:
|
||||
function_win.addstr(1, 1, f"debug: {value} ")
|
||||
function_win.refresh()
|
||||
function_win.refresh()
|
||||
@@ -5,14 +5,17 @@ import os
|
||||
import re
|
||||
import sys
|
||||
|
||||
from utilities.save_to_radio import save_changes
|
||||
from utilities.config_io import config_export, config_import
|
||||
from utilities.input_handlers import get_repeated_input, get_text_input, get_fixed32_input, get_list_input, get_admin_key_input
|
||||
from ui.menus import generate_menu_from_protobuf
|
||||
from ui.colors import get_color
|
||||
from ui.dialog import dialog
|
||||
from utilities.control_utils import parse_ini_file, transform_menu_path
|
||||
from ui.user_config import json_editor
|
||||
from contact.utilities.save_to_radio import save_changes
|
||||
from contact.utilities.config_io import config_export, config_import
|
||||
from contact.utilities.input_handlers import get_repeated_input, get_text_input, get_fixed32_input, get_list_input, get_admin_key_input
|
||||
from contact.ui.menus import generate_menu_from_protobuf
|
||||
from contact.ui.colors import get_color
|
||||
from contact.ui.dialog import dialog
|
||||
from contact.utilities.control_utils import parse_ini_file, transform_menu_path
|
||||
from contact.ui.user_config import json_editor
|
||||
from contact.ui.ui_state import MenuState
|
||||
|
||||
menu_state = MenuState()
|
||||
|
||||
# Constants
|
||||
width = 80
|
||||
@@ -27,20 +30,21 @@ parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
|
||||
|
||||
# Paths
|
||||
locals_dir = os.path.dirname(os.path.abspath(sys.argv[0])) # Current script directory
|
||||
translation_file = os.path.join(locals_dir, "localisations", "en.ini")
|
||||
config_folder = os.path.join(parent_dir, "node-configs")
|
||||
translation_file = os.path.join(parent_dir, "localisations", "en.ini")
|
||||
|
||||
config_folder = os.path.join(locals_dir, "node-configs")
|
||||
|
||||
# Load translations
|
||||
field_mapping, help_text = parse_ini_file(translation_file)
|
||||
|
||||
# Aliases
|
||||
Segment = tuple[str, str, bool, bool]
|
||||
WrappedLine = list[Segment]
|
||||
|
||||
def display_menu(menu_state: MenuState) -> tuple[object, object]: # curses.window or pad types
|
||||
|
||||
def display_menu(current_menu, menu_path, selected_index, show_save_option, help_text):
|
||||
min_help_window_height = 6
|
||||
num_items = len(current_menu) + (1 if show_save_option else 0)
|
||||
# Track visible range
|
||||
global start_index
|
||||
if 'start_index' not in globals():
|
||||
start_index = [0] # Initialize if not set
|
||||
num_items = len(menu_state.current_menu) + (1 if menu_state.show_save_option else 0)
|
||||
|
||||
# Determine the available height for the menu
|
||||
max_menu_height = curses.LINES
|
||||
@@ -60,18 +64,18 @@ def display_menu(current_menu, menu_path, selected_index, show_save_option, help
|
||||
menu_win.border()
|
||||
menu_win.keypad(True)
|
||||
|
||||
menu_pad = curses.newpad(len(current_menu) + 1, width - 8)
|
||||
menu_pad = curses.newpad(len(menu_state.current_menu) + 1, width - 8)
|
||||
menu_pad.bkgd(get_color("background"))
|
||||
|
||||
header = " > ".join(word.title() for word in menu_path)
|
||||
header = " > ".join(word.title() for word in menu_state.menu_path)
|
||||
if len(header) > width - 4:
|
||||
header = header[:width - 7] + "..."
|
||||
menu_win.addstr(1, 2, header, get_color("settings_breadcrumbs", bold=True))
|
||||
|
||||
transformed_path = transform_menu_path(menu_path)
|
||||
transformed_path = transform_menu_path(menu_state.menu_path)
|
||||
|
||||
for idx, option in enumerate(current_menu):
|
||||
field_info = current_menu[option]
|
||||
for idx, option in enumerate(menu_state.current_menu):
|
||||
field_info = menu_state.current_menu[option]
|
||||
current_value = field_info[1] if isinstance(field_info, tuple) else ""
|
||||
full_key = '.'.join(transformed_path + [option])
|
||||
display_name = field_mapping.get(full_key, option)
|
||||
@@ -80,46 +84,64 @@ def display_menu(current_menu, menu_path, selected_index, show_save_option, help
|
||||
display_value = f"{current_value}"[:width // 2 - 4]
|
||||
|
||||
try:
|
||||
color = get_color("settings_sensitive" if option in sensitive_settings else "settings_default", reverse=(idx == selected_index))
|
||||
color = get_color("settings_sensitive" if option in sensitive_settings else "settings_default", reverse=(idx == menu_state.selected_index))
|
||||
menu_pad.addstr(idx, 0, f"{display_option:<{width // 2 - 2}} {display_value}".ljust(width - 8), color)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
if show_save_option:
|
||||
if menu_state.show_save_option:
|
||||
save_position = menu_height - 2
|
||||
menu_win.addstr(save_position, (width - len(save_option)) // 2, save_option, get_color("settings_save", reverse=(selected_index == len(current_menu))))
|
||||
menu_win.addstr(save_position, (width - len(save_option)) // 2, save_option, get_color("settings_save", reverse=(menu_state.selected_index == len(menu_state.current_menu))))
|
||||
|
||||
# Draw help window with dynamically updated max_help_lines
|
||||
draw_help_window(start_y, start_x, menu_height, max_help_lines, current_menu, selected_index, transformed_path)
|
||||
draw_help_window(start_y, start_x, menu_height, max_help_lines, transformed_path, menu_state)
|
||||
|
||||
menu_win.refresh()
|
||||
menu_pad.refresh(
|
||||
start_index[-1], 0,
|
||||
menu_state.start_index[-1], 0,
|
||||
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
|
||||
menu_win.getbegyx()[0] + 3 + menu_win.getmaxyx()[0] - 5 - (2 if show_save_option else 0),
|
||||
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 8
|
||||
menu_win.getbegyx()[0] + 3 + menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0),
|
||||
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4
|
||||
)
|
||||
|
||||
max_index = num_items + (1 if show_save_option else 0) - 1
|
||||
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if show_save_option else 0)
|
||||
max_index = num_items + (1 if menu_state.show_save_option else 0) - 1
|
||||
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
|
||||
|
||||
draw_arrows(menu_win, visible_height, max_index, start_index, show_save_option)
|
||||
draw_arrows(menu_win, visible_height, max_index, menu_state)
|
||||
|
||||
return menu_win, menu_pad
|
||||
|
||||
|
||||
def draw_help_window(menu_start_y, menu_start_x, menu_height, max_help_lines, current_menu, selected_index, transformed_path):
|
||||
def draw_help_window(
|
||||
menu_start_y: int,
|
||||
menu_start_x: int,
|
||||
menu_height: int,
|
||||
max_help_lines: int,
|
||||
transformed_path: list[str],
|
||||
menu_state: MenuState
|
||||
) -> None:
|
||||
|
||||
global help_win
|
||||
|
||||
if 'help_win' not in globals():
|
||||
help_win = None # Initialize if it does not exist
|
||||
|
||||
selected_option = list(current_menu.keys())[selected_index] if current_menu else None
|
||||
selected_option = list(menu_state.current_menu.keys())[menu_state.selected_index] if menu_state.current_menu else None
|
||||
help_y = menu_start_y + menu_height
|
||||
|
||||
help_win = update_help_window(help_win, help_text, transformed_path, selected_option, max_help_lines, width, help_y, menu_start_x)
|
||||
|
||||
def update_help_window(help_win, help_text, transformed_path, selected_option, max_help_lines, width, help_y, help_x):
|
||||
def update_help_window(
|
||||
help_win: object, # curses window or None
|
||||
help_text: dict[str, str],
|
||||
transformed_path: list[str],
|
||||
selected_option: str | None,
|
||||
max_help_lines: int,
|
||||
width: int,
|
||||
help_y: int,
|
||||
help_x: int
|
||||
) -> object: # returns a curses window
|
||||
|
||||
"""Handles rendering the help window consistently."""
|
||||
wrapped_help = get_wrapped_help_text(help_text, transformed_path, selected_option, width, max_help_lines)
|
||||
|
||||
@@ -157,7 +179,13 @@ def update_help_window(help_win, help_text, transformed_path, selected_option, m
|
||||
return help_win
|
||||
|
||||
|
||||
def get_wrapped_help_text(help_text, transformed_path, selected_option, width, max_lines):
|
||||
def get_wrapped_help_text(
|
||||
help_text: dict[str, str],
|
||||
transformed_path: list[str],
|
||||
selected_option: str | None,
|
||||
width: int,
|
||||
max_lines: int
|
||||
) -> list[WrappedLine]:
|
||||
"""Fetches and formats help text for display, ensuring it fits within the allowed lines."""
|
||||
|
||||
full_help_key = '.'.join(transformed_path + [selected_option]) if selected_option else None
|
||||
@@ -176,7 +204,7 @@ def get_wrapped_help_text(help_text, transformed_path, selected_option, width, m
|
||||
r'\\033\[4m(.*?)\\033\[0m': ('settings_default', False, True) # Underline
|
||||
}
|
||||
|
||||
def extract_ansi_segments(text):
|
||||
def extract_ansi_segments(text: str) -> list[Segment]:
|
||||
"""Extracts and replaces ANSI color codes, ensuring spaces are preserved."""
|
||||
matches = []
|
||||
last_pos = 0
|
||||
@@ -206,7 +234,7 @@ def get_wrapped_help_text(help_text, transformed_path, selected_option, width, m
|
||||
|
||||
return matches
|
||||
|
||||
def wrap_ansi_text(segments, wrap_width):
|
||||
def wrap_ansi_text(segments: list[Segment], wrap_width: int) -> list[WrappedLine]:
|
||||
"""Wraps text while preserving ANSI formatting and spaces."""
|
||||
wrapped_lines = []
|
||||
line_buffer = []
|
||||
@@ -249,116 +277,131 @@ def get_wrapped_help_text(help_text, transformed_path, selected_option, width, m
|
||||
return wrapped_help
|
||||
|
||||
|
||||
def move_highlight(old_idx, new_idx, options, show_save_option, menu_win, menu_pad, help_win, help_text, menu_path, max_help_lines):
|
||||
if old_idx == new_idx: # No-op
|
||||
def move_highlight(
|
||||
old_idx: int,
|
||||
options: list[str],
|
||||
menu_win: object,
|
||||
menu_pad: object,
|
||||
help_win: object,
|
||||
help_text: dict[str, str],
|
||||
max_help_lines: int,
|
||||
menu_state: MenuState
|
||||
) -> None:
|
||||
|
||||
if old_idx == menu_state.selected_index: # No-op
|
||||
return
|
||||
|
||||
max_index = len(options) + (1 if show_save_option else 0) - 1
|
||||
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if show_save_option else 0)
|
||||
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
|
||||
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
|
||||
|
||||
# Adjust start_index only when moving out of visible range
|
||||
if new_idx == max_index and show_save_option:
|
||||
# Adjust menu_state.start_index only when moving out of visible range
|
||||
if menu_state.selected_index == max_index and menu_state.show_save_option:
|
||||
pass
|
||||
elif new_idx < start_index[-1]: # Moving above the visible area
|
||||
start_index[-1] = new_idx
|
||||
elif new_idx >= start_index[-1] + visible_height: # Moving below the visible area
|
||||
start_index[-1] = new_idx - visible_height
|
||||
elif menu_state.selected_index < menu_state.start_index[-1]: # Moving above the visible area
|
||||
menu_state.start_index[-1] = menu_state.selected_index
|
||||
elif menu_state.selected_index >= menu_state.start_index[-1] + visible_height: # Moving below the visible area
|
||||
menu_state.start_index[-1] = menu_state.selected_index - visible_height
|
||||
pass
|
||||
|
||||
# Ensure start_index is within bounds
|
||||
start_index[-1] = max(0, min(start_index[-1], max_index - visible_height + 1))
|
||||
# Ensure menu_state.start_index is within bounds
|
||||
menu_state.start_index[-1] = max(0, min(menu_state.start_index[-1], max_index - visible_height + 1))
|
||||
|
||||
# Clear old selection
|
||||
if show_save_option and old_idx == max_index:
|
||||
if menu_state.show_save_option and old_idx == max_index:
|
||||
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save"))
|
||||
else:
|
||||
menu_pad.chgat(old_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive") if options[old_idx] in sensitive_settings else get_color("settings_default"))
|
||||
|
||||
# Highlight new selection
|
||||
if show_save_option and new_idx == max_index:
|
||||
if menu_state.show_save_option and menu_state.selected_index == max_index:
|
||||
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save", reverse=True))
|
||||
else:
|
||||
menu_pad.chgat(new_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive", reverse=True) if options[new_idx] in sensitive_settings else get_color("settings_default", reverse=True))
|
||||
menu_pad.chgat(menu_state.selected_index, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive", reverse=True) if options[menu_state.selected_index] in sensitive_settings else get_color("settings_default", reverse=True))
|
||||
|
||||
menu_win.refresh()
|
||||
|
||||
# Refresh pad only if scrolling is needed
|
||||
menu_pad.refresh(start_index[-1], 0,
|
||||
menu_pad.refresh(menu_state.start_index[-1], 0,
|
||||
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
|
||||
menu_win.getbegyx()[0] + 3 + visible_height,
|
||||
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4)
|
||||
|
||||
# Update help window
|
||||
transformed_path = transform_menu_path(menu_path)
|
||||
selected_option = options[new_idx] if new_idx < len(options) else None
|
||||
transformed_path = transform_menu_path(menu_state.menu_path)
|
||||
selected_option = options[menu_state.selected_index] if menu_state.selected_index < len(options) else None
|
||||
help_y = menu_win.getbegyx()[0] + menu_win.getmaxyx()[0]
|
||||
help_win = update_help_window(help_win, help_text, transformed_path, selected_option, max_help_lines, width, help_y, menu_win.getbegyx()[1])
|
||||
|
||||
draw_arrows(menu_win, visible_height, max_index, start_index, show_save_option)
|
||||
draw_arrows(menu_win, visible_height, max_index, menu_state)
|
||||
|
||||
|
||||
def draw_arrows(win, visible_height, max_index, start_index, show_save_option):
|
||||
def draw_arrows(
|
||||
win: object,
|
||||
visible_height: int,
|
||||
max_index: int,
|
||||
menu_state: MenuState
|
||||
) -> None:
|
||||
|
||||
# vh = visible_height + (1 if show_save_option else 0)
|
||||
mi = max_index - (2 if show_save_option else 0)
|
||||
mi = max_index - (2 if menu_state.show_save_option else 0)
|
||||
|
||||
if visible_height < mi:
|
||||
if start_index[-1] > 0:
|
||||
if menu_state.start_index[-1] > 0:
|
||||
win.addstr(3, 2, "▲", get_color("settings_default"))
|
||||
else:
|
||||
win.addstr(3, 2, " ", get_color("settings_default"))
|
||||
|
||||
if mi - start_index[-1] >= visible_height + (0 if show_save_option else 1) :
|
||||
if mi - menu_state.start_index[-1] >= visible_height + (0 if menu_state.show_save_option else 1) :
|
||||
win.addstr(visible_height + 3, 2, "▼", get_color("settings_default"))
|
||||
else:
|
||||
win.addstr(visible_height + 3, 2, " ", get_color("settings_default"))
|
||||
|
||||
|
||||
def settings_menu(stdscr, interface):
|
||||
def settings_menu(stdscr: object, interface: object) -> None:
|
||||
curses.update_lines_cols()
|
||||
|
||||
menu = generate_menu_from_protobuf(interface)
|
||||
current_menu = menu["Main Menu"]
|
||||
menu_path = ["Main Menu"]
|
||||
menu_index = []
|
||||
selected_index = 0
|
||||
menu_state.current_menu = menu["Main Menu"]
|
||||
menu_state.menu_path = ["Main Menu"]
|
||||
|
||||
|
||||
modified_settings = {}
|
||||
|
||||
need_redraw = True
|
||||
show_save_option = False
|
||||
menu_state.show_save_option = False
|
||||
|
||||
while True:
|
||||
if(need_redraw):
|
||||
options = list(current_menu.keys())
|
||||
options = list(menu_state.current_menu.keys())
|
||||
|
||||
show_save_option = (
|
||||
len(menu_path) > 2 and ("Radio Settings" in menu_path or "Module Settings" in menu_path)
|
||||
menu_state.show_save_option = (
|
||||
len(menu_state.menu_path) > 2 and ("Radio Settings" in menu_state.menu_path or "Module Settings" in menu_state.menu_path)
|
||||
) or (
|
||||
len(menu_path) == 2 and "User Settings" in menu_path
|
||||
len(menu_state.menu_path) == 2 and "User Settings" in menu_state.menu_path
|
||||
) or (
|
||||
len(menu_path) == 3 and "Channels" in menu_path
|
||||
len(menu_state.menu_path) == 3 and "Channels" in menu_state.menu_path
|
||||
)
|
||||
|
||||
# Display the menu
|
||||
menu_win, menu_pad = display_menu(current_menu, menu_path, selected_index, show_save_option, help_text)
|
||||
menu_win, menu_pad = display_menu(menu_state)
|
||||
|
||||
need_redraw = False
|
||||
|
||||
# Capture user input
|
||||
key = menu_win.getch()
|
||||
|
||||
max_index = len(options) + (1 if show_save_option else 0) - 1
|
||||
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
|
||||
# max_help_lines = 4
|
||||
|
||||
if key == curses.KEY_UP:
|
||||
old_selected_index = selected_index
|
||||
selected_index = max_index if selected_index == 0 else selected_index - 1
|
||||
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win, menu_pad, help_win, help_text, menu_path,max_help_lines)
|
||||
old_selected_index = menu_state.selected_index
|
||||
menu_state.selected_index = max_index if menu_state.selected_index == 0 else menu_state.selected_index - 1
|
||||
move_highlight(old_selected_index, options, menu_win, menu_pad, help_win, help_text, max_help_lines, menu_state)
|
||||
|
||||
elif key == curses.KEY_DOWN:
|
||||
old_selected_index = selected_index
|
||||
selected_index = 0 if selected_index == max_index else selected_index + 1
|
||||
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win, menu_pad, help_win, help_text, menu_path, max_help_lines)
|
||||
old_selected_index = menu_state.selected_index
|
||||
menu_state.selected_index = 0 if menu_state.selected_index == max_index else menu_state.selected_index + 1
|
||||
move_highlight(old_selected_index, options, menu_win, menu_pad, help_win, help_text, max_help_lines, menu_state)
|
||||
|
||||
elif key == curses.KEY_RESIZE:
|
||||
need_redraw = True
|
||||
@@ -370,36 +413,36 @@ def settings_menu(stdscr, interface):
|
||||
menu_win.refresh()
|
||||
help_win.refresh()
|
||||
|
||||
elif key == ord("\t") and show_save_option:
|
||||
old_selected_index = selected_index
|
||||
selected_index = max_index
|
||||
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win, menu_pad, help_win, help_text, menu_path, max_help_lines)
|
||||
elif key == ord("\t") and menu_state.show_save_option:
|
||||
old_selected_index = menu_state.selected_index
|
||||
menu_state.selected_index = max_index
|
||||
move_highlight(old_selected_index, options, menu_win, menu_pad, help_win, help_text, max_help_lines, menu_state)
|
||||
|
||||
elif key == curses.KEY_RIGHT or key == ord('\n'):
|
||||
need_redraw = True
|
||||
start_index.append(0)
|
||||
menu_state.start_index.append(0)
|
||||
menu_win.erase()
|
||||
help_win.erase()
|
||||
|
||||
# draw_help_window(menu_win.getbegyx()[0], menu_win.getbegyx()[1], menu_win.getmaxyx()[0], max_help_lines, current_menu, selected_index, transform_menu_path(menu_path))
|
||||
# draw_help_window(menu_win.getbegyx()[0], menu_win.getbegyx()[1], menu_win.getmaxyx()[0], max_help_lines, menu_state.current_menu, selected_index, transform_menu_path(menu_state.menu_path))
|
||||
|
||||
menu_win.refresh()
|
||||
help_win.refresh()
|
||||
|
||||
if show_save_option and selected_index == len(options):
|
||||
save_changes(interface, menu_path, modified_settings)
|
||||
if menu_state.show_save_option and menu_state.selected_index == len(options):
|
||||
save_changes(interface, modified_settings, menu_state)
|
||||
modified_settings.clear()
|
||||
logging.info("Changes Saved")
|
||||
|
||||
if len(menu_path) > 1:
|
||||
menu_path.pop()
|
||||
current_menu = menu["Main Menu"]
|
||||
for step in menu_path[1:]:
|
||||
current_menu = current_menu.get(step, {})
|
||||
selected_index = 0
|
||||
if len(menu_state.menu_path) > 1:
|
||||
menu_state.menu_path.pop()
|
||||
menu_state.current_menu = menu["Main Menu"]
|
||||
for step in menu_state.menu_path[1:]:
|
||||
menu_state.current_menu = menu_state.current_menu.get(step, {})
|
||||
menu_state.selected_index = 0
|
||||
continue
|
||||
|
||||
selected_option = options[selected_index]
|
||||
selected_option = options[menu_state.selected_index]
|
||||
|
||||
if selected_option == "Exit":
|
||||
break
|
||||
@@ -408,7 +451,7 @@ def settings_menu(stdscr, interface):
|
||||
filename = get_text_input("Enter a filename for the config file")
|
||||
if not filename:
|
||||
logging.info("Export aborted: No filename provided.")
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
continue # Go back to the menu
|
||||
if not filename.lower().endswith(".yaml"):
|
||||
filename += ".yaml"
|
||||
@@ -421,14 +464,14 @@ def settings_menu(stdscr, interface):
|
||||
overwrite = get_list_input(f"{filename} already exists. Overwrite?", None, ["Yes", "No"])
|
||||
if overwrite == "No":
|
||||
logging.info("Export cancelled: User chose not to overwrite.")
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
continue # Return to menu
|
||||
os.makedirs(os.path.dirname(yaml_file_path), exist_ok=True)
|
||||
with open(yaml_file_path, "w", encoding="utf-8") as file:
|
||||
file.write(config_text)
|
||||
logging.info(f"Config file saved to {yaml_file_path}")
|
||||
dialog(stdscr, "Config File Saved:", yaml_file_path)
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
continue
|
||||
except PermissionError:
|
||||
logging.error(f"Permission denied: Unable to write to {yaml_file_path}")
|
||||
@@ -436,7 +479,7 @@ def settings_menu(stdscr, interface):
|
||||
logging.error(f"OS error while saving config: {e}")
|
||||
except Exception as e:
|
||||
logging.error(f"Unexpected error: {e}")
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
continue
|
||||
|
||||
elif selected_option == "Load Config File":
|
||||
@@ -459,7 +502,7 @@ def settings_menu(stdscr, interface):
|
||||
overwrite = get_list_input(f"Are you sure you want to load {filename}?", None, ["Yes", "No"])
|
||||
if overwrite == "Yes":
|
||||
config_import(interface, file_path)
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
continue
|
||||
|
||||
elif selected_option == "Config URL":
|
||||
@@ -471,7 +514,7 @@ def settings_menu(stdscr, interface):
|
||||
if overwrite == "Yes":
|
||||
interface.localNode.setURL(new_value)
|
||||
logging.info(f"New Config URL sent to node")
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
continue
|
||||
|
||||
elif selected_option == "Reboot":
|
||||
@@ -479,7 +522,7 @@ def settings_menu(stdscr, interface):
|
||||
if confirmation == "Yes":
|
||||
interface.localNode.reboot()
|
||||
logging.info(f"Node Reboot Requested by menu")
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
continue
|
||||
|
||||
elif selected_option == "Reset Node DB":
|
||||
@@ -487,7 +530,7 @@ def settings_menu(stdscr, interface):
|
||||
if confirmation == "Yes":
|
||||
interface.localNode.resetNodeDb()
|
||||
logging.info(f"Node DB Reset Requested by menu")
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
continue
|
||||
|
||||
elif selected_option == "Shutdown":
|
||||
@@ -495,7 +538,7 @@ def settings_menu(stdscr, interface):
|
||||
if confirmation == "Yes":
|
||||
interface.localNode.shutdown()
|
||||
logging.info(f"Node Shutdown Requested by menu")
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
continue
|
||||
|
||||
elif selected_option == "Factory Reset":
|
||||
@@ -503,22 +546,28 @@ def settings_menu(stdscr, interface):
|
||||
if confirmation == "Yes":
|
||||
interface.localNode.factoryReset()
|
||||
logging.info(f"Factory Reset Requested by menu")
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
continue
|
||||
|
||||
elif selected_option == "App Settings":
|
||||
menu_win.clear()
|
||||
menu_win.refresh()
|
||||
json_editor(stdscr) # Open the App Settings menu
|
||||
menu_state.menu_path.append("App Settings")
|
||||
menu_state.menu_index.append(menu_state.selected_index)
|
||||
json_editor(stdscr, menu_state) # Open the App Settings menu
|
||||
menu_state.current_menu = menu["Main Menu"]
|
||||
menu_state.menu_path = ["Main Menu"]
|
||||
menu_state.start_index.pop()
|
||||
menu_state.selected_index = 4
|
||||
continue
|
||||
# need_redraw = True
|
||||
|
||||
field_info = current_menu.get(selected_option)
|
||||
field_info = menu_state.current_menu.get(selected_option)
|
||||
if isinstance(field_info, tuple):
|
||||
field, current_value = field_info
|
||||
|
||||
# Transform the menu path to get the full key
|
||||
transformed_path = transform_menu_path(menu_path)
|
||||
transformed_path = transform_menu_path(menu_state.menu_path)
|
||||
full_key = '.'.join(transformed_path + [selected_option])
|
||||
|
||||
# Fetch human-readable name from field_mapping
|
||||
@@ -528,70 +577,73 @@ def settings_menu(stdscr, interface):
|
||||
if selected_option in ['longName', 'shortName']:
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
new_value = current_value if new_value is None else new_value
|
||||
current_menu[selected_option] = (field, new_value)
|
||||
menu_state.current_menu[selected_option] = (field, new_value)
|
||||
|
||||
elif selected_option == 'isLicensed':
|
||||
new_value = get_list_input(f"{human_readable_name} is currently: {current_value}", str(current_value), ["True", "False"])
|
||||
new_value = new_value == "True"
|
||||
current_menu[selected_option] = (field, new_value)
|
||||
menu_state.current_menu[selected_option] = (field, new_value)
|
||||
|
||||
for option, (field, value) in current_menu.items():
|
||||
for option, (field, value) in menu_state.current_menu.items():
|
||||
modified_settings[option] = value
|
||||
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif selected_option in ['latitude', 'longitude', 'altitude']:
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
new_value = current_value if new_value is None else new_value
|
||||
current_menu[selected_option] = (field, new_value)
|
||||
menu_state.current_menu[selected_option] = (field, new_value)
|
||||
|
||||
for option in ['latitude', 'longitude', 'altitude']:
|
||||
if option in current_menu:
|
||||
modified_settings[option] = current_menu[option][1]
|
||||
if option in menu_state.current_menu:
|
||||
modified_settings[option] = menu_state.current_menu[option][1]
|
||||
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif selected_option == "admin_key":
|
||||
new_values = get_admin_key_input(current_value)
|
||||
new_value = current_value if new_values is None else [base64.b64decode(key) for key in new_values]
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif field.type == 8: # Handle boolean type
|
||||
new_value = get_list_input(human_readable_name, str(current_value), ["True", "False"])
|
||||
new_value = new_value == "True" or new_value is True
|
||||
start_index.pop()
|
||||
new_value = get_list_input(human_readable_name, str(current_value), ["True", "False"])
|
||||
if new_value == "Not Set":
|
||||
pass # Leave it as-is
|
||||
else:
|
||||
new_value = new_value == "True" or new_value is True
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif field.label == field.LABEL_REPEATED: # Handle repeated field - Not currently used
|
||||
new_value = get_repeated_input(current_value)
|
||||
new_value = current_value if new_value is None else new_value.split(", ")
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif field.enum_type: # Enum field
|
||||
enum_options = {v.name: v.number for v in field.enum_type.values}
|
||||
new_value_name = get_list_input(human_readable_name, current_value, list(enum_options.keys()))
|
||||
new_value = enum_options.get(new_value_name, current_value)
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif field.type == 7: # Field type 7 corresponds to FIXED32
|
||||
new_value = get_fixed32_input(current_value)
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif field.type == 13: # Field type 13 corresponds to UINT32
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
new_value = current_value if new_value is None else int(new_value)
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif field.type == 2: # Field type 13 corresponds to INT64
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
new_value = current_value if new_value is None else float(new_value)
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
|
||||
else: # Handle other field types
|
||||
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
|
||||
new_value = current_value if new_value is None else new_value
|
||||
start_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
|
||||
for key in menu_path[3:]: # Skip "Main Menu"
|
||||
for key in menu_state.menu_path[3:]: # Skip "Main Menu"
|
||||
modified_settings = modified_settings.setdefault(key, {})
|
||||
|
||||
# Add the new value to the appropriate level
|
||||
@@ -602,12 +654,12 @@ def settings_menu(stdscr, interface):
|
||||
enum_value_descriptor = field.enum_type.values_by_number.get(new_value)
|
||||
new_value = enum_value_descriptor.name if enum_value_descriptor else new_value
|
||||
|
||||
current_menu[selected_option] = (field, new_value)
|
||||
menu_state.current_menu[selected_option] = (field, new_value)
|
||||
else:
|
||||
current_menu = current_menu[selected_option]
|
||||
menu_path.append(selected_option)
|
||||
menu_index.append(selected_index)
|
||||
selected_index = 0
|
||||
menu_state.current_menu = menu_state.current_menu[selected_option]
|
||||
menu_state.menu_path.append(selected_option)
|
||||
menu_state.menu_index.append(menu_state.selected_index)
|
||||
menu_state.selected_index = 0
|
||||
|
||||
|
||||
elif key == curses.KEY_LEFT:
|
||||
@@ -617,29 +669,29 @@ def settings_menu(stdscr, interface):
|
||||
help_win.erase()
|
||||
|
||||
# max_help_lines = 4
|
||||
# draw_help_window(menu_win.getbegyx()[0], menu_win.getbegyx()[1], menu_win.getmaxyx()[0], max_help_lines, current_menu, selected_index, transform_menu_path(menu_path))
|
||||
# draw_help_window(menu_win.getbegyx()[0], menu_win.getbegyx()[1], menu_win.getmaxyx()[0], max_help_lines, menu_state.current_menu, selected_index, transform_menu_path(menu_state.menu_path))
|
||||
|
||||
menu_win.refresh()
|
||||
help_win.refresh()
|
||||
|
||||
if len(menu_path) < 2:
|
||||
if len(menu_state.menu_path) < 2:
|
||||
modified_settings.clear()
|
||||
|
||||
# Navigate back to the previous menu
|
||||
if len(menu_path) > 1:
|
||||
menu_path.pop()
|
||||
current_menu = menu["Main Menu"]
|
||||
for step in menu_path[1:]:
|
||||
current_menu = current_menu.get(step, {})
|
||||
selected_index = menu_index.pop()
|
||||
start_index.pop()
|
||||
if len(menu_state.menu_path) > 1:
|
||||
menu_state.menu_path.pop()
|
||||
menu_state.current_menu = menu["Main Menu"]
|
||||
for step in menu_state.menu_path[1:]:
|
||||
menu_state.current_menu = menu_state.current_menu.get(step, {})
|
||||
menu_state.selected_index = menu_state.menu_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif key == 27: # Escape key
|
||||
menu_win.erase()
|
||||
menu_win.refresh()
|
||||
break
|
||||
|
||||
def set_region(interface):
|
||||
def set_region(interface: object) -> None:
|
||||
node = interface.getNode('^local')
|
||||
device_config = node.localConfig
|
||||
lora_descriptor = device_config.lora.DESCRIPTOR
|
||||
@@ -1,5 +1,5 @@
|
||||
import logging
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
|
||||
# Get the parent directory of the script
|
||||
@@ -11,11 +11,11 @@ json_file_path = os.path.join(parent_dir, "config.json")
|
||||
log_file_path = os.path.join(parent_dir, "client.log")
|
||||
db_file_path = os.path.join(parent_dir, "client.db")
|
||||
|
||||
def format_json_single_line_arrays(data, indent=4):
|
||||
def format_json_single_line_arrays(data: dict[str, object], indent: int = 4) -> str:
|
||||
"""
|
||||
Formats JSON with arrays on a single line while keeping other elements properly indented.
|
||||
"""
|
||||
def format_value(value, current_indent):
|
||||
def format_value(value: object, current_indent: int) -> str:
|
||||
if isinstance(value, dict):
|
||||
items = []
|
||||
for key, val in value.items():
|
||||
@@ -31,7 +31,7 @@ def format_json_single_line_arrays(data, indent=4):
|
||||
return format_value(data, indent)
|
||||
|
||||
# Recursive function to check and update nested dictionaries
|
||||
def update_dict(default, actual):
|
||||
def update_dict(default: dict[str, object], actual: dict[str, object]) -> bool:
|
||||
updated = False
|
||||
for key, value in default.items():
|
||||
if key not in actual:
|
||||
@@ -42,7 +42,7 @@ def update_dict(default, actual):
|
||||
updated = update_dict(value, actual[key]) or updated
|
||||
return updated
|
||||
|
||||
def initialize_config():
|
||||
def initialize_config() -> dict[str, object]:
|
||||
COLOR_CONFIG_DARK = {
|
||||
"default": ["white", "black"],
|
||||
"background": [" ", "black"],
|
||||
@@ -65,7 +65,9 @@ def initialize_config():
|
||||
"settings_save": ["green", "black"],
|
||||
"settings_breadcrumbs": ["white", "black"],
|
||||
"settings_warning": ["red", "black"],
|
||||
"settings_note": ["green", "black"]
|
||||
"settings_note": ["green", "black"],
|
||||
"node_favorite": ["green", "black"],
|
||||
"node_ignored": ["red", "black"]
|
||||
}
|
||||
COLOR_CONFIG_LIGHT = {
|
||||
"default": ["black", "white"],
|
||||
@@ -89,7 +91,9 @@ def initialize_config():
|
||||
"settings_save": ["green", "white"],
|
||||
"settings_breadcrumbs": ["black", "white"],
|
||||
"settings_warning": ["red", "white"],
|
||||
"settings_note": ["green", "white"]
|
||||
"settings_note": ["green", "white"],
|
||||
"node_favorite": ["green", "white"],
|
||||
"node_ignored": ["red", "white"]
|
||||
}
|
||||
COLOR_CONFIG_GREEN = {
|
||||
"default": ["green", "black"],
|
||||
@@ -115,7 +119,9 @@ def initialize_config():
|
||||
"settings_save": ["green", "black"],
|
||||
"settings_breadcrumbs": ["green", "black"],
|
||||
"settings_warning": ["green", "black"],
|
||||
"settings_note": ["green", "black"]
|
||||
"settings_note": ["green", "black"],
|
||||
"node_favorite": ["cyan", "white"],
|
||||
"node_ignored": ["red", "white"]
|
||||
}
|
||||
default_config_variables = {
|
||||
"db_file_path": db_file_path,
|
||||
@@ -155,7 +161,7 @@ def initialize_config():
|
||||
|
||||
return loaded_config
|
||||
|
||||
def assign_config_variables(loaded_config):
|
||||
def assign_config_variables(loaded_config: dict[str, object]) -> None:
|
||||
# Assign values to local variables
|
||||
|
||||
global db_file_path, log_file_path, message_prefix, sent_message_prefix
|
||||
@@ -1,7 +1,7 @@
|
||||
import curses
|
||||
from ui.colors import get_color
|
||||
from contact.ui.colors import get_color
|
||||
|
||||
def dialog(stdscr, title, message):
|
||||
def dialog(stdscr: curses.window, title: str, message: str) -> None:
|
||||
height, width = stdscr.getmaxyx()
|
||||
|
||||
# Calculate dialog dimensions
|
||||
@@ -1,19 +1,27 @@
|
||||
from collections import OrderedDict
|
||||
from meshtastic.protobuf import config_pb2, module_config_pb2, channel_pb2
|
||||
import logging
|
||||
import base64
|
||||
import logging
|
||||
import os
|
||||
from collections import OrderedDict
|
||||
|
||||
from typing import Any
|
||||
|
||||
from google.protobuf.message import Message
|
||||
from meshtastic.protobuf import channel_pb2, config_pb2, module_config_pb2
|
||||
|
||||
|
||||
locals_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
translation_file = os.path.join(locals_dir, "localisations", "en.ini")
|
||||
|
||||
def encode_if_bytes(value):
|
||||
def encode_if_bytes(value: Any) -> str:
|
||||
"""Encode byte values to base64 string."""
|
||||
if isinstance(value, bytes):
|
||||
return base64.b64encode(value).decode('utf-8')
|
||||
return value
|
||||
|
||||
def extract_fields(message_instance, current_config=None):
|
||||
def extract_fields(
|
||||
message_instance: Message,
|
||||
current_config: Message | dict[str, Any] | None = None
|
||||
) -> dict[str, Any]:
|
||||
if isinstance(current_config, dict): # Handle dictionaries
|
||||
return {key: (None, encode_if_bytes(current_config.get(key, "Not Set"))) for key in current_config}
|
||||
|
||||
@@ -47,7 +55,10 @@ def extract_fields(message_instance, current_config=None):
|
||||
menu[field.name] = (field, encode_if_bytes(current_value))
|
||||
return menu
|
||||
|
||||
def generate_menu_from_protobuf(interface):
|
||||
def generate_menu_from_protobuf(interface: object) -> dict[str, Any]:
|
||||
"""
|
||||
Builds the full settings menu structure from the protobuf definitions.
|
||||
"""
|
||||
menu_structure = {"Main Menu": {}}
|
||||
|
||||
# Add User Settings
|
||||
@@ -1,7 +1,8 @@
|
||||
import curses
|
||||
from ui.colors import get_color
|
||||
from contact.ui.colors import get_color
|
||||
|
||||
def draw_splash(stdscr):
|
||||
def draw_splash(stdscr: object) -> None:
|
||||
"""Draw the splash screen with a logo and connecting message."""
|
||||
curses.curs_set(0)
|
||||
|
||||
stdscr.clear()
|
||||
10
contact/ui/ui_state.py
Normal file
10
contact/ui/ui_state.py
Normal file
@@ -0,0 +1,10 @@
|
||||
from typing import Any
|
||||
|
||||
class MenuState:
|
||||
def __init__(self):
|
||||
self.menu_index: list[int]= [] # Row we left the previous menus
|
||||
self.start_index: list[int] = [0] # Row to start the menu if it doesn't all fit
|
||||
self.selected_index: int = 0 # Selected Row
|
||||
self.current_menu: dict[str, Any] | list[Any] | str | int = {} # Contents of the current menu
|
||||
self.menu_path: list[str] = [] # Menu Path
|
||||
self.show_save_option: bool = False # Display 'Save'
|
||||
385
contact/ui/user_config.py
Normal file
385
contact/ui/user_config.py
Normal file
@@ -0,0 +1,385 @@
|
||||
import os
|
||||
import json
|
||||
import curses
|
||||
from typing import Any
|
||||
from contact.ui.colors import get_color, setup_colors, COLOR_MAP
|
||||
from contact.ui.default_config import format_json_single_line_arrays, loaded_config
|
||||
from contact.utilities.input_handlers import get_list_input
|
||||
|
||||
width = 80
|
||||
save_option = "Save Changes"
|
||||
sensitive_settings = []
|
||||
|
||||
def edit_color_pair(key: str, current_value: list[str]) -> list[str]:
|
||||
"""
|
||||
Allows the user to select a foreground and background color for a key.
|
||||
"""
|
||||
color_list = [" "] + list(COLOR_MAP.keys())
|
||||
fg_color = get_list_input(f"Select Foreground Color for {key}", current_value[0], color_list)
|
||||
bg_color = get_list_input(f"Select Background Color for {key}", current_value[1], color_list)
|
||||
|
||||
return [fg_color, bg_color]
|
||||
|
||||
def edit_value(key: str, current_value: str) -> str:
|
||||
|
||||
height = 10
|
||||
input_width = width - 16 # Allow space for "New Value: "
|
||||
start_y = (curses.LINES - height) // 2
|
||||
start_x = (curses.COLS - width) // 2
|
||||
|
||||
# Create a centered window
|
||||
edit_win = curses.newwin(height, width, start_y, start_x)
|
||||
edit_win.bkgd(get_color("background"))
|
||||
edit_win.attrset(get_color("window_frame"))
|
||||
edit_win.border()
|
||||
|
||||
# Display instructions
|
||||
edit_win.addstr(1, 2, f"Editing {key}", get_color("settings_default", bold=True))
|
||||
edit_win.addstr(3, 2, "Current Value:", get_color("settings_default"))
|
||||
|
||||
wrap_width = width - 4 # Account for border and padding
|
||||
wrapped_lines = [current_value[i:i+wrap_width] for i in range(0, len(current_value), wrap_width)]
|
||||
|
||||
for i, line in enumerate(wrapped_lines[:4]): # Limit display to fit the window height
|
||||
edit_win.addstr(4 + i, 2, line, get_color("settings_default"))
|
||||
|
||||
edit_win.refresh()
|
||||
|
||||
# Handle theme selection dynamically
|
||||
if key == "theme":
|
||||
# Load theme names dynamically from the JSON
|
||||
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
|
||||
return get_list_input("Select Theme", current_value, theme_options)
|
||||
elif key == "node_sort":
|
||||
sort_options = ['lastHeard', 'name', 'hops']
|
||||
return get_list_input("Sort By", current_value, sort_options)
|
||||
|
||||
# Standard Input Mode (Scrollable)
|
||||
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
|
||||
curses.curs_set(1)
|
||||
|
||||
scroll_offset = 0 # Determines which part of the text is visible
|
||||
user_input = ""
|
||||
input_position = (7, 13) # Tuple for row and column
|
||||
row, col = input_position # Unpack tuple
|
||||
while True:
|
||||
visible_text = user_input[scroll_offset:scroll_offset + input_width] # Only show what fits
|
||||
edit_win.addstr(row, col, " " * input_width, get_color("settings_default")) # Clear previous text
|
||||
edit_win.addstr(row, col, visible_text, get_color("settings_default")) # Display text
|
||||
edit_win.refresh()
|
||||
|
||||
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width)) # Adjust cursor position
|
||||
key = edit_win.get_wch()
|
||||
|
||||
if key in (chr(27), curses.KEY_LEFT): # ESC or Left Arrow
|
||||
curses.curs_set(0)
|
||||
return current_value # Exit without returning a value
|
||||
|
||||
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
|
||||
break
|
||||
|
||||
elif key in (curses.KEY_BACKSPACE, chr(127)): # Backspace
|
||||
if user_input: # Only process if there's something to delete
|
||||
user_input = user_input[:-1]
|
||||
if scroll_offset > 0 and len(user_input) < scroll_offset + input_width:
|
||||
scroll_offset -= 1 # Move back if text is shorter than scrolled area
|
||||
else:
|
||||
if isinstance(key, str):
|
||||
user_input += key
|
||||
else:
|
||||
user_input += chr(key)
|
||||
|
||||
if len(user_input) > input_width: # Scroll if input exceeds visible area
|
||||
scroll_offset += 1
|
||||
|
||||
curses.curs_set(0)
|
||||
return user_input if user_input else current_value
|
||||
|
||||
|
||||
def display_menu(menu_state: Any) -> tuple[Any, Any, list[str]]:
|
||||
"""
|
||||
Render the configuration menu with a Save button directly added to the window.
|
||||
"""
|
||||
num_items = len(menu_state.current_menu) + (1 if menu_state.show_save_option else 0)
|
||||
|
||||
# Determine menu items based on the type of current_menu
|
||||
if isinstance(menu_state.current_menu, dict):
|
||||
options = list(menu_state.current_menu.keys())
|
||||
elif isinstance(menu_state.current_menu, list):
|
||||
options = [f"[{i}]" for i in range(len(menu_state.current_menu))]
|
||||
else:
|
||||
options = [] # Fallback in case of unexpected data types
|
||||
|
||||
# Calculate dynamic dimensions for the menu
|
||||
max_menu_height = curses.LINES
|
||||
menu_height = min(max_menu_height, num_items + 5)
|
||||
num_items = len(options)
|
||||
start_y = (curses.LINES - menu_height) // 2
|
||||
start_x = (curses.COLS - width) // 2
|
||||
|
||||
# Create the window
|
||||
menu_win = curses.newwin(menu_height, width, start_y, start_x)
|
||||
menu_win.erase()
|
||||
menu_win.bkgd(get_color("background"))
|
||||
menu_win.attrset(get_color("window_frame"))
|
||||
menu_win.border()
|
||||
menu_win.keypad(True)
|
||||
|
||||
# Create the pad for scrolling
|
||||
menu_pad = curses.newpad(num_items + 1, width - 8)
|
||||
menu_pad.bkgd(get_color("background"))
|
||||
|
||||
# Display the menu path
|
||||
header = " > ".join(menu_state.menu_path)
|
||||
if len(header) > width - 4:
|
||||
header = header[:width - 7] + "..."
|
||||
menu_win.addstr(1, 2, header, get_color("settings_breadcrumbs", bold=True))
|
||||
|
||||
# Populate the pad with menu options
|
||||
for idx, key in enumerate(options):
|
||||
value = menu_state.current_menu[key] if isinstance(menu_state.current_menu, dict) else menu_state.current_menu[int(key.strip("[]"))]
|
||||
display_key = f"{key}"[:width // 2 - 2]
|
||||
display_value = (
|
||||
f"{value}"[:width // 2 - 8]
|
||||
)
|
||||
|
||||
color = get_color("settings_default", reverse=(idx == menu_state.selected_index))
|
||||
menu_pad.addstr(idx, 0, f"{display_key:<{width // 2 - 2}} {display_value}".ljust(width - 8), color)
|
||||
|
||||
# Add Save button to the main window
|
||||
if menu_state.show_save_option:
|
||||
save_position = menu_height - 2
|
||||
menu_win.addstr(save_position, (width - len(save_option)) // 2, save_option, get_color("settings_save", reverse=(menu_state.selected_index == len(menu_state.current_menu))))
|
||||
|
||||
menu_win.refresh()
|
||||
menu_pad.refresh(
|
||||
menu_state.start_index[-1], 0,
|
||||
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
|
||||
menu_win.getbegyx()[0] + 3 + menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0),
|
||||
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4
|
||||
)
|
||||
|
||||
max_index = num_items + (1 if menu_state.show_save_option else 0) - 1
|
||||
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
|
||||
|
||||
draw_arrows(menu_win, visible_height, max_index, menu_state)
|
||||
|
||||
return menu_win, menu_pad, options
|
||||
|
||||
|
||||
def move_highlight(
|
||||
old_idx: int,
|
||||
options: list[str],
|
||||
menu_win: curses.window,
|
||||
menu_pad: curses.window,
|
||||
menu_state: Any
|
||||
) -> None:
|
||||
|
||||
if old_idx == menu_state.selected_index: # No-op
|
||||
return
|
||||
|
||||
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
|
||||
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
|
||||
|
||||
# Adjust menu_state.start_index only when moving out of visible range
|
||||
if menu_state.selected_index == max_index and menu_state.show_save_option:
|
||||
pass
|
||||
elif menu_state.selected_index < menu_state.start_index[-1]: # Moving above the visible area
|
||||
menu_state.start_index[-1] = menu_state.selected_index
|
||||
elif menu_state.selected_index >= menu_state.start_index[-1] + visible_height: # Moving below the visible area
|
||||
menu_state.start_index[-1] = menu_state.selected_index - visible_height
|
||||
pass
|
||||
|
||||
# Ensure menu_state.start_index is within bounds
|
||||
menu_state.start_index[-1] = max(0, min(menu_state.start_index[-1], max_index - visible_height + 1))
|
||||
|
||||
# Clear old selection
|
||||
if menu_state.show_save_option and old_idx == max_index:
|
||||
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save"))
|
||||
else:
|
||||
menu_pad.chgat(old_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive") if options[old_idx] in sensitive_settings else get_color("settings_default"))
|
||||
|
||||
# Highlight new selection
|
||||
if menu_state.show_save_option and menu_state.selected_index == max_index:
|
||||
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save", reverse=True))
|
||||
else:
|
||||
menu_pad.chgat(menu_state.selected_index, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive", reverse=True) if options[menu_state.selected_index] in sensitive_settings else get_color("settings_default", reverse=True))
|
||||
|
||||
menu_win.refresh()
|
||||
|
||||
# Refresh pad only if scrolling is needed
|
||||
menu_pad.refresh(menu_state.start_index[-1], 0,
|
||||
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
|
||||
menu_win.getbegyx()[0] + 3 + visible_height,
|
||||
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4)
|
||||
|
||||
draw_arrows(menu_win, visible_height, max_index, menu_state)
|
||||
|
||||
|
||||
def draw_arrows(
|
||||
win: curses.window,
|
||||
visible_height: int,
|
||||
max_index: int,
|
||||
menu_state: any
|
||||
) -> None:
|
||||
|
||||
mi = max_index - (2 if menu_state.show_save_option else 0)
|
||||
|
||||
if visible_height < mi:
|
||||
if menu_state.start_index[-1] > 0:
|
||||
win.addstr(3, 2, "▲", get_color("settings_default"))
|
||||
else:
|
||||
win.addstr(3, 2, " ", get_color("settings_default"))
|
||||
|
||||
if mi - menu_state.start_index[-1] >= visible_height + (0 if menu_state.show_save_option else 1) :
|
||||
win.addstr(visible_height + 3, 2, "▼", get_color("settings_default"))
|
||||
else:
|
||||
win.addstr(visible_height + 3, 2, " ", get_color("settings_default"))
|
||||
|
||||
|
||||
def json_editor(stdscr: curses.window, menu_state: Any) -> None:
|
||||
|
||||
menu_state.selected_index = 0 # Track the selected option
|
||||
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
|
||||
file_path = os.path.join(parent_dir, "config.json")
|
||||
|
||||
menu_state.show_save_option = True # Always show the Save button
|
||||
|
||||
# Ensure the file exists
|
||||
if not os.path.exists(file_path):
|
||||
with open(file_path, "w") as f:
|
||||
json.dump({}, f)
|
||||
|
||||
# Load JSON data
|
||||
with open(file_path, "r", encoding="utf-8") as f:
|
||||
original_data = json.load(f)
|
||||
|
||||
data = original_data # Reference to the original data
|
||||
menu_state.current_menu = data # Track the current level of the menu
|
||||
|
||||
# Render the menu
|
||||
menu_win, menu_pad, options = display_menu(menu_state)
|
||||
need_redraw = True
|
||||
|
||||
while True:
|
||||
if(need_redraw):
|
||||
menu_win, menu_pad, options = display_menu(menu_state)
|
||||
menu_win.refresh()
|
||||
need_redraw = False
|
||||
|
||||
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
|
||||
key = menu_win.getch()
|
||||
|
||||
if key == curses.KEY_UP:
|
||||
|
||||
old_selected_index = menu_state.selected_index
|
||||
menu_state.selected_index = max_index if menu_state.selected_index == 0 else menu_state.selected_index - 1
|
||||
move_highlight(old_selected_index, options, menu_win, menu_pad, menu_state)
|
||||
|
||||
elif key == curses.KEY_DOWN:
|
||||
|
||||
old_selected_index = menu_state.selected_index
|
||||
menu_state.selected_index = 0 if menu_state.selected_index == max_index else menu_state.selected_index + 1
|
||||
move_highlight(old_selected_index, options, menu_win, menu_pad, menu_state)
|
||||
|
||||
elif key == ord("\t") and menu_state.show_save_option:
|
||||
old_selected_index = menu_state.selected_index
|
||||
menu_state.selected_index = max_index
|
||||
move_highlight(old_selected_index, options, menu_win, menu_pad, menu_state)
|
||||
|
||||
elif key in (curses.KEY_RIGHT, 10, 13): # 10 = \n, 13 = carriage return
|
||||
|
||||
need_redraw = True
|
||||
menu_win.erase()
|
||||
menu_win.refresh()
|
||||
|
||||
if menu_state.selected_index < len(options): # Handle selection of a menu item
|
||||
selected_key = options[menu_state.selected_index]
|
||||
menu_state.menu_path.append(str(selected_key))
|
||||
menu_state.start_index.append(0)
|
||||
menu_state.menu_index.append(menu_state.selected_index)
|
||||
|
||||
# Handle nested data
|
||||
if isinstance(menu_state.current_menu, dict):
|
||||
if selected_key in menu_state.current_menu:
|
||||
selected_data = menu_state.current_menu[selected_key]
|
||||
else:
|
||||
continue # Skip invalid key
|
||||
elif isinstance(menu_state.current_menu, list):
|
||||
selected_data = menu_state.current_menu[int(selected_key.strip("[]"))]
|
||||
|
||||
if isinstance(selected_data, list) and len(selected_data) == 2:
|
||||
# Edit color pair
|
||||
new_value = edit_color_pair(selected_key, selected_data)
|
||||
menu_state.menu_path.pop()
|
||||
menu_state.start_index.pop()
|
||||
menu_state.menu_index.pop()
|
||||
menu_state.current_menu[selected_key] = new_value
|
||||
|
||||
elif isinstance(selected_data, (dict, list)):
|
||||
# Navigate into nested data
|
||||
menu_state.current_menu = selected_data
|
||||
menu_state.selected_index = 0 # Reset the selected index
|
||||
|
||||
else:
|
||||
# General value editing
|
||||
new_value = edit_value(selected_key, selected_data)
|
||||
menu_state.menu_path.pop()
|
||||
menu_state.menu_index.pop()
|
||||
menu_state.start_index.pop()
|
||||
menu_state.current_menu[selected_key] = new_value
|
||||
need_redraw = True
|
||||
|
||||
else:
|
||||
# Save button selected
|
||||
save_json(file_path, data)
|
||||
stdscr.refresh()
|
||||
continue
|
||||
|
||||
elif key in (27, curses.KEY_LEFT): # Escape or Left Arrow
|
||||
|
||||
need_redraw = True
|
||||
menu_win.erase()
|
||||
menu_win.refresh()
|
||||
|
||||
# menu_state.selected_index = menu_state.menu_index[-1]
|
||||
|
||||
# Navigate back in the menu
|
||||
if len(menu_state.menu_path) > 2:
|
||||
menu_state.menu_path.pop()
|
||||
menu_state.start_index.pop()
|
||||
menu_state.current_menu = data
|
||||
|
||||
for path in menu_state.menu_path[2:]:
|
||||
menu_state.current_menu = menu_state.current_menu[path] if isinstance(menu_state.current_menu, dict) else menu_state.current_menu[int(path.strip("[]"))]
|
||||
|
||||
else:
|
||||
# Exit the editor
|
||||
menu_win.clear()
|
||||
menu_win.refresh()
|
||||
|
||||
break
|
||||
|
||||
|
||||
def save_json(file_path: str, data: dict[str, Any]) -> None:
|
||||
formatted_json = format_json_single_line_arrays(data)
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(formatted_json)
|
||||
setup_colors(reinit=True)
|
||||
|
||||
def main(stdscr: curses.window) -> None:
|
||||
from contact.ui.ui_state import MenuState
|
||||
|
||||
menu_state = MenuState()
|
||||
if len(menu_state.menu_path) == 0:
|
||||
menu_state.menu_path = ["App Settings"] # Initialize if not set
|
||||
|
||||
curses.curs_set(0)
|
||||
stdscr.keypad(True)
|
||||
setup_colors()
|
||||
json_editor(stdscr, menu_state)
|
||||
|
||||
if __name__ == "__main__":
|
||||
curses.wrapper(main)
|
||||
@@ -1,7 +1,7 @@
|
||||
import argparse
|
||||
from argparse import ArgumentParser
|
||||
|
||||
def setup_parser():
|
||||
parser = argparse.ArgumentParser(
|
||||
def setup_parser() -> ArgumentParser:
|
||||
parser = ArgumentParser(
|
||||
add_help=True,
|
||||
epilog="If no connection arguments are specified, we attempt a serial connection and then a TCP connection to localhost.")
|
||||
|
||||
@@ -33,5 +33,14 @@ def setup_parser():
|
||||
default=None,
|
||||
const="any"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--settings",
|
||||
"--set",
|
||||
"--control",
|
||||
"-c",
|
||||
help="Launch directly into the settings",
|
||||
action="store_true"
|
||||
)
|
||||
|
||||
|
||||
return parser
|
||||
@@ -1,9 +1,8 @@
|
||||
|
||||
import yaml
|
||||
import logging
|
||||
from typing import List
|
||||
from google.protobuf.json_format import MessageToDict
|
||||
from meshtastic import BROADCAST_ADDR, mt_config
|
||||
from meshtastic import mt_config
|
||||
from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
|
||||
|
||||
# defs are from meshtastic/python/main
|
||||
@@ -20,9 +19,9 @@ def traverseConfig(config_root, config, interface_config) -> bool:
|
||||
|
||||
return True
|
||||
|
||||
def splitCompoundName(comp_name: str) -> List[str]:
|
||||
def splitCompoundName(comp_name: str) -> list[str]:
|
||||
"""Split compound (dot separated) preference name into parts"""
|
||||
name: List[str] = comp_name.split(".")
|
||||
name: list[str] = comp_name.split(".")
|
||||
if len(name) < 2:
|
||||
name[0] = comp_name
|
||||
name.append(comp_name)
|
||||
@@ -1,10 +1,11 @@
|
||||
import re
|
||||
|
||||
def parse_ini_file(ini_file_path: str) -> tuple[dict[str, str], dict[str, str]]:
|
||||
"""Parses an INI file and returns a mapping of keys to human-readable names and help text."""
|
||||
|
||||
def parse_ini_file(ini_file_path):
|
||||
field_mapping = {}
|
||||
help_text = {}
|
||||
current_section = None
|
||||
field_mapping: dict[str, str] = {}
|
||||
help_text: dict[str, str] = {}
|
||||
current_section: str | None = None
|
||||
|
||||
with open(ini_file_path, 'r', encoding='utf-8') as f:
|
||||
for line in f:
|
||||
@@ -46,14 +47,14 @@ def parse_ini_file(ini_file_path):
|
||||
|
||||
return field_mapping, help_text
|
||||
|
||||
def transform_menu_path(menu_path):
|
||||
def transform_menu_path(menu_path: list[str]) -> list[str]:
|
||||
"""Applies path replacements and normalizes entries in the menu path."""
|
||||
path_replacements = {
|
||||
"Radio Settings": "config",
|
||||
"Module Settings": "module"
|
||||
}
|
||||
|
||||
transformed_path = []
|
||||
transformed_path: list[str] = []
|
||||
for part in menu_path[1:]: # Skip 'Main Menu'
|
||||
# Apply fixed replacements
|
||||
part = path_replacements.get(part, part)
|
||||
@@ -3,18 +3,19 @@ import time
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
from utilities.utils import decimal_to_hex
|
||||
import ui.default_config as config
|
||||
import globals
|
||||
from contact.utilities.utils import decimal_to_hex
|
||||
import contact.ui.default_config as config
|
||||
import contact.globals as globals
|
||||
|
||||
def get_table_name(channel):
|
||||
def get_table_name(channel: str) -> str:
|
||||
# Construct the table name
|
||||
table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
|
||||
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
|
||||
return quoted_table_name
|
||||
|
||||
|
||||
def save_message_to_db(channel, user_id, message_text):
|
||||
def save_message_to_db(channel: str, user_id: str, message_text: str) -> int | None:
|
||||
|
||||
"""Save messages to the database, ensuring the table exists."""
|
||||
try:
|
||||
quoted_table_name = get_table_name(channel)
|
||||
@@ -47,7 +48,7 @@ def save_message_to_db(channel, user_id, message_text):
|
||||
logging.error(f"Unexpected error in save_message_to_db: {e}")
|
||||
|
||||
|
||||
def update_ack_nak(channel, timestamp, message, ack):
|
||||
def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None:
|
||||
try:
|
||||
with sqlite3.connect(config.db_file_path) as db_connection:
|
||||
db_cursor = db_connection.cursor()
|
||||
@@ -69,7 +70,7 @@ def update_ack_nak(channel, timestamp, message, ack):
|
||||
logging.error(f"Unexpected error in update_ack_nak: {e}")
|
||||
|
||||
|
||||
def load_messages_from_db():
|
||||
def load_messages_from_db() -> None:
|
||||
"""Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
|
||||
try:
|
||||
with sqlite3.connect(config.db_file_path) as db_connection:
|
||||
@@ -142,7 +143,7 @@ def load_messages_from_db():
|
||||
logging.error(f"SQLite error in load_messages_from_db: {e}")
|
||||
|
||||
|
||||
def init_nodedb():
|
||||
def init_nodedb() -> None:
|
||||
"""Initialize the node database and update it with nodes from the interface."""
|
||||
|
||||
try:
|
||||
@@ -172,7 +173,7 @@ def init_nodedb():
|
||||
logging.error(f"Unexpected error in init_nodedb: {e}")
|
||||
|
||||
|
||||
def maybe_store_nodeinfo_in_db(packet):
|
||||
def maybe_store_nodeinfo_in_db(packet: dict[str, object]) -> None:
|
||||
"""Save nodeinfo unless that record is already there, updating if necessary."""
|
||||
try:
|
||||
user_id = packet['from']
|
||||
@@ -190,21 +191,25 @@ def maybe_store_nodeinfo_in_db(packet):
|
||||
except Exception as e:
|
||||
logging.error(f"Unexpected error in maybe_store_nodeinfo_in_db: {e}")
|
||||
|
||||
def update_node_info_in_db(user_id, long_name=None, short_name=None, hw_model="UNSET", is_licensed=0, role="CLIENT", public_key="", chat_archived=0):
|
||||
def update_node_info_in_db(
|
||||
user_id: int | str,
|
||||
long_name: str | None = None,
|
||||
short_name: str | None = None,
|
||||
hw_model: str | None = None,
|
||||
is_licensed: str | int | None = None,
|
||||
role: str | None = None,
|
||||
public_key: str | None = None,
|
||||
chat_archived: int | None = None
|
||||
) -> None:
|
||||
|
||||
"""Update or insert node information into the database, preserving unchanged fields."""
|
||||
try:
|
||||
ensure_node_table_exists() # Ensure the table exists before any operation
|
||||
|
||||
if long_name == None:
|
||||
long_name = "Meshtastic " + str(decimal_to_hex(user_id)[-4:])
|
||||
if short_name == None:
|
||||
short_name = str(decimal_to_hex(user_id)[-4:])
|
||||
|
||||
with sqlite3.connect(config.db_file_path) as db_connection:
|
||||
db_cursor = db_connection.cursor()
|
||||
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
|
||||
|
||||
|
||||
table_columns = [i[1] for i in db_cursor.execute(f'PRAGMA table_info({table_name})')]
|
||||
if "chat_archived" not in table_columns:
|
||||
update_table_query = f"ALTER TABLE {table_name} ADD COLUMN chat_archived INTEGER"
|
||||
@@ -225,6 +230,14 @@ def update_node_info_in_db(user_id, long_name=None, short_name=None, hw_model="U
|
||||
public_key = public_key if public_key is not None else existing_public_key
|
||||
chat_archived = chat_archived if chat_archived is not None else existing_chat_archived
|
||||
|
||||
long_name = long_name if long_name is not None else "Meshtastic " + str(decimal_to_hex(user_id)[-4:])
|
||||
short_name = short_name if short_name is not None else str(decimal_to_hex(user_id)[-4:])
|
||||
hw_model = hw_model if hw_model is not None else "UNSET"
|
||||
is_licensed = is_licensed if is_licensed is not None else 0
|
||||
role = role if role is not None else "CLIENT"
|
||||
public_key = public_key if public_key is not None else ""
|
||||
chat_archived = chat_archived if chat_archived is not None else 0
|
||||
|
||||
# Upsert logic
|
||||
upsert_query = f'''
|
||||
INSERT INTO {table_name} (user_id, long_name, short_name, hw_model, is_licensed, role, public_key, chat_archived)
|
||||
@@ -247,7 +260,7 @@ def update_node_info_in_db(user_id, long_name=None, short_name=None, hw_model="U
|
||||
logging.error(f"Unexpected error in update_node_info_in_db: {e}")
|
||||
|
||||
|
||||
def ensure_node_table_exists():
|
||||
def ensure_node_table_exists() -> None:
|
||||
"""Ensure the node database table exists."""
|
||||
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety
|
||||
schema = '''
|
||||
@@ -263,7 +276,7 @@ def ensure_node_table_exists():
|
||||
ensure_table_exists(table_name, schema)
|
||||
|
||||
|
||||
def ensure_table_exists(table_name, schema):
|
||||
def ensure_table_exists(table_name: str, schema: str) -> None:
|
||||
"""Ensure the given table exists in the database."""
|
||||
try:
|
||||
with sqlite3.connect(config.db_file_path) as db_connection:
|
||||
@@ -277,7 +290,7 @@ def ensure_table_exists(table_name, schema):
|
||||
logging.error(f"Unexpected error in ensure_table_exists({table_name}): {e}")
|
||||
|
||||
|
||||
def get_name_from_database(user_id, type="long"):
|
||||
def get_name_from_database(user_id: int, type: str = "long") -> str:
|
||||
"""
|
||||
Retrieve a user's name (long or short) from the node database.
|
||||
|
||||
@@ -311,7 +324,7 @@ def get_name_from_database(user_id, type="long"):
|
||||
logging.error(f"Unexpected error in get_name_from_database: {e}")
|
||||
return "Unknown"
|
||||
|
||||
def is_chat_archived(user_id):
|
||||
def is_chat_archived(user_id: int) -> int:
|
||||
try:
|
||||
with sqlite3.connect(config.db_file_path) as db_connection:
|
||||
db_cursor = db_connection.cursor()
|
||||
@@ -3,9 +3,10 @@ import binascii
|
||||
import curses
|
||||
import ipaddress
|
||||
import re
|
||||
from ui.colors import get_color
|
||||
from typing import Any, Optional
|
||||
from contact.ui.colors import get_color
|
||||
|
||||
def wrap_text(text, wrap_width):
|
||||
def wrap_text(text: str, wrap_width: int) -> list[str]:
|
||||
"""Wraps text while preserving spaces and breaking long words."""
|
||||
words = re.findall(r'\S+|\s+', text) # Capture words and spaces separately
|
||||
wrapped_lines = []
|
||||
@@ -40,7 +41,7 @@ def wrap_text(text, wrap_width):
|
||||
return wrapped_lines
|
||||
|
||||
|
||||
def get_text_input(prompt):
|
||||
def get_text_input(prompt: str) -> Optional[str]:
|
||||
"""Handles user input with wrapped text for long prompts."""
|
||||
height = 8
|
||||
width = 80
|
||||
@@ -128,7 +129,7 @@ def get_text_input(prompt):
|
||||
return user_input
|
||||
|
||||
|
||||
def get_admin_key_input(current_value):
|
||||
def get_admin_key_input(current_value: list[bytes]) -> Optional[list[str]]:
|
||||
def to_base64(byte_strings):
|
||||
"""Convert byte values to Base64-encoded strings."""
|
||||
return [base64.b64encode(b).decode() for b in byte_strings]
|
||||
@@ -212,7 +213,7 @@ def get_admin_key_input(current_value):
|
||||
|
||||
|
||||
|
||||
def get_repeated_input(current_value):
|
||||
def get_repeated_input(current_value: list[str]) -> Optional[str]:
|
||||
height = 9
|
||||
width = 80
|
||||
start_y = (curses.LINES - height) // 2
|
||||
@@ -279,7 +280,7 @@ def get_repeated_input(current_value):
|
||||
pass # Ignore invalid character inputs
|
||||
|
||||
|
||||
def get_fixed32_input(current_value):
|
||||
def get_fixed32_input(current_value: int) -> int:
|
||||
cvalue = current_value
|
||||
current_value = str(ipaddress.IPv4Address(current_value))
|
||||
height = 10
|
||||
@@ -336,7 +337,7 @@ def get_fixed32_input(current_value):
|
||||
pass # Ignore invalid inputs
|
||||
|
||||
|
||||
def get_list_input(prompt, current_option, list_options):
|
||||
def get_list_input(prompt: str, current_option: Optional[str], list_options: list[str]) -> Optional[str]:
|
||||
"""
|
||||
Displays a scrollable list of list_options for the user to choose from.
|
||||
"""
|
||||
@@ -399,7 +400,13 @@ def get_list_input(prompt, current_option, list_options):
|
||||
return current_option
|
||||
|
||||
|
||||
def move_highlight(old_idx, new_idx, options, list_win, list_pad):
|
||||
def move_highlight(
|
||||
old_idx: int,
|
||||
new_idx: int,
|
||||
options: list[str],
|
||||
list_win: curses.window,
|
||||
list_pad: curses.window
|
||||
) -> int:
|
||||
|
||||
global scroll_offset
|
||||
if 'scroll_offset' not in globals():
|
||||
@@ -439,7 +446,12 @@ def move_highlight(old_idx, new_idx, options, list_win, list_pad):
|
||||
return scroll_offset # Return updated scroll_offset to be stored externally
|
||||
|
||||
|
||||
def draw_arrows(win, visible_height, max_index, start_index):
|
||||
def draw_arrows(
|
||||
win: curses.window,
|
||||
visible_height: int,
|
||||
max_index: int,
|
||||
start_index: int
|
||||
) -> None:
|
||||
|
||||
if visible_height < max_index:
|
||||
if start_index > 0:
|
||||
41
contact/utilities/interfaces.py
Normal file
41
contact/utilities/interfaces.py
Normal file
@@ -0,0 +1,41 @@
|
||||
import logging
|
||||
import meshtastic.serial_interface, meshtastic.tcp_interface, meshtastic.ble_interface
|
||||
|
||||
def initialize_interface(args):
|
||||
try:
|
||||
|
||||
if args.ble:
|
||||
return meshtastic.ble_interface.BLEInterface(args.ble if args.ble != "any" else None)
|
||||
|
||||
elif args.host:
|
||||
try:
|
||||
if ":" in args.host:
|
||||
tcp_hostname, tcp_port = args.host.split(':')
|
||||
else:
|
||||
tcp_hostname = args.host
|
||||
tcp_port = meshtastic.tcp_interface.DEFAULT_TCP_PORT
|
||||
return meshtastic.tcp_interface.TCPInterface(tcp_hostname, portNumber=tcp_port)
|
||||
except Exception as ex:
|
||||
logging.error(f"Error connecting to {args.host}. {ex}")
|
||||
else:
|
||||
try:
|
||||
client = meshtastic.serial_interface.SerialInterface(args.port)
|
||||
except FileNotFoundError as ex:
|
||||
logging.error(f"The serial device at '{args.port}' was not found. {ex}")
|
||||
except PermissionError as ex:
|
||||
logging.error(f"You probably need to add yourself to the `dialout` group to use a serial connection. {ex}")
|
||||
except Exception as ex:
|
||||
logging.error(f"Unexpected error initializing interface: {ex}")
|
||||
except OSError as ex:
|
||||
logging.error(f"The serial device couldn't be opened, it might be in use by another process. {ex}")
|
||||
if client.devPath is None:
|
||||
try:
|
||||
client = meshtastic.tcp_interface.TCPInterface("localhost")
|
||||
except Exception as ex:
|
||||
logging.error(f"Error connecting to localhost:{ex}")
|
||||
|
||||
return client
|
||||
|
||||
except Exception as ex:
|
||||
logging.critical(f"Fatal error initializing interface: {ex}")
|
||||
|
||||
@@ -4,7 +4,7 @@ import logging
|
||||
import base64
|
||||
import time
|
||||
|
||||
def save_changes(interface, menu_path, modified_settings):
|
||||
def save_changes(interface, modified_settings, menu_state):
|
||||
"""
|
||||
Save changes to the device based on modified settings.
|
||||
:param interface: Meshtastic interface instance
|
||||
@@ -52,8 +52,8 @@ def save_changes(interface, menu_path, modified_settings):
|
||||
if not modified_settings:
|
||||
return
|
||||
|
||||
if menu_path[1] == "Radio Settings" or menu_path[1] == "Module Settings":
|
||||
config_category = menu_path[2].lower() # for radio and module configs
|
||||
if menu_state.menu_path[1] == "Radio Settings" or menu_state.menu_path[1] == "Module Settings":
|
||||
config_category = menu_state.menu_path[2].lower() # for radio and module configs
|
||||
|
||||
if {'latitude', 'longitude', 'altitude'} & modified_settings.keys():
|
||||
lat = float(modified_settings.get('latitude', 0.0))
|
||||
@@ -64,7 +64,7 @@ def save_changes(interface, menu_path, modified_settings):
|
||||
logging.info(f"Updated {config_category} with Latitude: {lat} and Longitude {lon} and Altitude {alt}")
|
||||
return
|
||||
|
||||
elif menu_path[1] == "User Settings": # for user configs
|
||||
elif menu_state.menu_path[1] == "User Settings": # for user configs
|
||||
config_category = "User Settings"
|
||||
long_name = modified_settings.get("longName")
|
||||
short_name = modified_settings.get("shortName")
|
||||
@@ -77,11 +77,11 @@ def save_changes(interface, menu_path, modified_settings):
|
||||
|
||||
return
|
||||
|
||||
elif menu_path[1] == "Channels": # for channel configs
|
||||
elif menu_state.menu_path[1] == "Channels": # for channel configs
|
||||
config_category = "Channels"
|
||||
|
||||
try:
|
||||
channel = menu_path[-1]
|
||||
channel = menu_state.menu_path[-1]
|
||||
channel_num = int(channel.split()[-1]) - 1
|
||||
except (IndexError, ValueError) as e:
|
||||
channel_num = None
|
||||
@@ -1,7 +1,7 @@
|
||||
import globals
|
||||
import contact.globals as globals
|
||||
import datetime
|
||||
from meshtastic.protobuf import config_pb2
|
||||
import ui.default_config as config
|
||||
import contact.ui.default_config as config
|
||||
|
||||
def get_channels():
|
||||
"""Retrieve channels from the node and update globals.channel_list and globals.all_messages."""
|
||||
@@ -47,7 +47,15 @@ def get_node_list():
|
||||
return node['hopsAway'] if 'hopsAway' in node else 100
|
||||
else:
|
||||
return node
|
||||
|
||||
sorted_nodes = sorted(globals.interface.nodes.values(), key = node_sort)
|
||||
|
||||
# Move favorite nodes to the beginning
|
||||
sorted_nodes = sorted(sorted_nodes, key = lambda node: node['isFavorite'] if 'isFavorite' in node else False, reverse = True)
|
||||
|
||||
# Move ignored nodes to the end
|
||||
sorted_nodes = sorted(sorted_nodes, key = lambda node: node['isIgnored'] if 'isIgnored' in node else False)
|
||||
|
||||
node_list = [node['num'] for node in sorted_nodes if node['num'] != my_node_num]
|
||||
return [my_node_num] + node_list # Ensuring your node is always first
|
||||
return []
|
||||
106
main.py
106
main.py
@@ -1,106 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
'''
|
||||
Contact - A Console UI for Meshtastic by http://github.com/pdxlocations
|
||||
Powered by Meshtastic.org
|
||||
V 1.2.2
|
||||
|
||||
Meshtastic® is a registered trademark of Meshtastic LLC. Meshtastic software components are released under various licenses, see GitHub for details. No warranty is provided - use at your own risk.
|
||||
'''
|
||||
|
||||
import contextlib
|
||||
import curses
|
||||
import os
|
||||
from pubsub import pub
|
||||
import sys
|
||||
import io
|
||||
import logging
|
||||
import traceback
|
||||
import threading
|
||||
|
||||
from utilities.db_handler import init_nodedb, load_messages_from_db
|
||||
from message_handlers.rx_handler import on_receive
|
||||
from settings import set_region
|
||||
from ui.curses_ui import main_ui
|
||||
from ui.colors import setup_colors
|
||||
from ui.splash import draw_splash
|
||||
import ui.default_config as config
|
||||
from utilities.arg_parser import setup_parser
|
||||
from utilities.interfaces import initialize_interface
|
||||
from utilities.input_handlers import get_list_input
|
||||
from utilities.utils import get_channels, get_node_list, get_nodeNum
|
||||
import globals
|
||||
|
||||
# Set ncurses compatibility settings
|
||||
os.environ["NCURSES_NO_UTF8_ACS"] = "1"
|
||||
os.environ["LANG"] = "C.UTF-8"
|
||||
os.environ.setdefault("TERM", "xterm-256color")
|
||||
if os.environ.get("COLORTERM") == "gnome-terminal":
|
||||
os.environ["TERM"] = "xterm-256color"
|
||||
|
||||
# Configure logging
|
||||
# Run `tail -f client.log` in another terminal to view live
|
||||
logging.basicConfig(
|
||||
filename=config.log_file_path,
|
||||
level=logging.INFO, # DEBUG, INFO, WARNING, ERROR, CRITICAL)
|
||||
format="%(asctime)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
|
||||
globals.lock = threading.Lock()
|
||||
|
||||
def main(stdscr):
|
||||
output_capture = io.StringIO()
|
||||
try:
|
||||
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
|
||||
|
||||
setup_colors()
|
||||
draw_splash(stdscr)
|
||||
parser = setup_parser()
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.info("Initializing interface %s", args)
|
||||
with globals.lock:
|
||||
globals.interface = initialize_interface(args)
|
||||
|
||||
if globals.interface.localNode.localConfig.lora.region == 0:
|
||||
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
|
||||
if confirmation == "Yes":
|
||||
set_region(globals.interface)
|
||||
globals.interface.close()
|
||||
globals.interface = initialize_interface(args)
|
||||
|
||||
logging.info("Interface initialized")
|
||||
globals.myNodeNum = get_nodeNum()
|
||||
globals.channel_list = get_channels()
|
||||
globals.node_list = get_node_list()
|
||||
pub.subscribe(on_receive, 'meshtastic.receive')
|
||||
init_nodedb()
|
||||
load_messages_from_db()
|
||||
logging.info("Starting main UI")
|
||||
|
||||
main_ui(stdscr)
|
||||
|
||||
except Exception as e:
|
||||
console_output = output_capture.getvalue()
|
||||
logging.error("An error occurred: %s", e)
|
||||
logging.error("Traceback: %s", traceback.format_exc())
|
||||
logging.error("Console output before crash:\n%s", console_output)
|
||||
raise # Re-raise only unexpected errors
|
||||
|
||||
if __name__ == "__main__":
|
||||
log_file = config.log_file_path
|
||||
log_f = open(log_file, "a", buffering=1) # Enable line-buffering for immediate log writes
|
||||
|
||||
sys.stdout = log_f
|
||||
sys.stderr = log_f
|
||||
|
||||
with contextlib.redirect_stderr(log_f), contextlib.redirect_stdout(log_f):
|
||||
try:
|
||||
curses.wrapper(main)
|
||||
except KeyboardInterrupt:
|
||||
logging.info("User exited with Ctrl+C or Ctrl+X") # Clean exit logging
|
||||
sys.exit(0) # Ensure a clean exit
|
||||
except Exception as e:
|
||||
logging.error("Fatal error in curses wrapper: %s", e)
|
||||
logging.error("Traceback: %s", traceback.format_exc())
|
||||
sys.exit(1) # Exit with an error code
|
||||
24
pyproject.toml
Normal file
24
pyproject.toml
Normal file
@@ -0,0 +1,24 @@
|
||||
[project]
|
||||
name = "contact"
|
||||
version = "1.3.5"
|
||||
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
|
||||
authors = [
|
||||
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
|
||||
]
|
||||
license = "GPL-3.0-only"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.9,<3.14"
|
||||
dependencies = [
|
||||
"meshtastic (>=2.6.0,<3.0.0)"
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://github.com/pdxlocations/contact"
|
||||
Issues = "https://github.com/pdxlocations/contact/issues"
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core>=2.0.0,<3.0.0"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[tool.poetry.scripts]
|
||||
contact = "contact.__main__:start"
|
||||
@@ -1,321 +0,0 @@
|
||||
import os
|
||||
import json
|
||||
import curses
|
||||
from ui.colors import get_color, setup_colors, COLOR_MAP
|
||||
from ui.default_config import format_json_single_line_arrays, loaded_config
|
||||
from utilities.input_handlers import get_list_input
|
||||
|
||||
width = 60
|
||||
save_option_text = "Save Changes"
|
||||
|
||||
def edit_color_pair(key, current_value):
|
||||
|
||||
"""
|
||||
Allows the user to select a foreground and background color for a key.
|
||||
"""
|
||||
color_list = [" "] + list(COLOR_MAP.keys())
|
||||
fg_color = get_list_input(f"Select Foreground Color for {key}", current_value[0], color_list)
|
||||
bg_color = get_list_input(f"Select Background Color for {key}", current_value[1], color_list)
|
||||
|
||||
return [fg_color, bg_color]
|
||||
|
||||
def edit_value(key, current_value):
|
||||
width = 60
|
||||
height = 10
|
||||
input_width = width - 16 # Allow space for "New Value: "
|
||||
start_y = (curses.LINES - height) // 2
|
||||
start_x = (curses.COLS - width) // 2
|
||||
|
||||
# Create a centered window
|
||||
edit_win = curses.newwin(height, width, start_y, start_x)
|
||||
edit_win.bkgd(get_color("background"))
|
||||
edit_win.attrset(get_color("window_frame"))
|
||||
edit_win.border()
|
||||
|
||||
# Display instructions
|
||||
edit_win.addstr(1, 2, f"Editing {key}", get_color("settings_default", bold=True))
|
||||
edit_win.addstr(3, 2, "Current Value:", get_color("settings_default"))
|
||||
|
||||
wrap_width = width - 4 # Account for border and padding
|
||||
wrapped_lines = [current_value[i:i+wrap_width] for i in range(0, len(current_value), wrap_width)]
|
||||
|
||||
for i, line in enumerate(wrapped_lines[:4]): # Limit display to fit the window height
|
||||
edit_win.addstr(4 + i, 2, line, get_color("settings_default"))
|
||||
|
||||
edit_win.refresh()
|
||||
|
||||
# Handle theme selection dynamically
|
||||
if key == "theme":
|
||||
# Load theme names dynamically from the JSON
|
||||
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
|
||||
return get_list_input("Select Theme", current_value, theme_options)
|
||||
elif key == "node_sort":
|
||||
sort_options = ['lastHeard', 'name', 'hops']
|
||||
return get_list_input("Sort By", current_value, sort_options)
|
||||
|
||||
# Standard Input Mode (Scrollable)
|
||||
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
|
||||
curses.curs_set(1)
|
||||
|
||||
scroll_offset = 0 # Determines which part of the text is visible
|
||||
user_input = ""
|
||||
input_position = (7, 13) # Tuple for row and column
|
||||
row, col = input_position # Unpack tuple
|
||||
while True:
|
||||
visible_text = user_input[scroll_offset:scroll_offset + input_width] # Only show what fits
|
||||
edit_win.addstr(row, col, " " * input_width, get_color("settings_default")) # Clear previous text
|
||||
edit_win.addstr(row, col, visible_text, get_color("settings_default")) # Display text
|
||||
edit_win.refresh()
|
||||
|
||||
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width)) # Adjust cursor position
|
||||
key = edit_win.get_wch()
|
||||
|
||||
if key in (chr(27), curses.KEY_LEFT): # ESC or Left Arrow
|
||||
curses.curs_set(0)
|
||||
return current_value # Exit without returning a value
|
||||
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
|
||||
break
|
||||
elif key in (curses.KEY_BACKSPACE, chr(127)): # Backspace
|
||||
if user_input: # Only process if there's something to delete
|
||||
user_input = user_input[:-1]
|
||||
if scroll_offset > 0 and len(user_input) < scroll_offset + input_width:
|
||||
scroll_offset -= 1 # Move back if text is shorter than scrolled area
|
||||
else:
|
||||
if isinstance(key, str):
|
||||
user_input += key
|
||||
else:
|
||||
user_input += chr(key)
|
||||
|
||||
if len(user_input) > input_width: # Scroll if input exceeds visible area
|
||||
scroll_offset += 1
|
||||
|
||||
curses.curs_set(0)
|
||||
return user_input if user_input else current_value
|
||||
|
||||
|
||||
def render_menu(current_data, menu_path, selected_index):
|
||||
"""
|
||||
Render the configuration menu with a Save button directly added to the window.
|
||||
"""
|
||||
# Determine menu items based on the type of current_data
|
||||
if isinstance(current_data, dict):
|
||||
options = list(current_data.keys())
|
||||
elif isinstance(current_data, list):
|
||||
options = [f"[{i}]" for i in range(len(current_data))]
|
||||
else:
|
||||
options = [] # Fallback in case of unexpected data types
|
||||
|
||||
# Calculate dynamic dimensions for the menu
|
||||
num_items = len(options)
|
||||
height = min(curses.LINES - 2, num_items + 6) # Include space for borders and Save button
|
||||
start_y = (curses.LINES - height) // 2
|
||||
start_x = (curses.COLS - width) // 2
|
||||
|
||||
# Create the window
|
||||
menu_win = curses.newwin(height, width, start_y, start_x)
|
||||
menu_win.clear()
|
||||
menu_win.bkgd(get_color("background"))
|
||||
menu_win.attrset(get_color("window_frame"))
|
||||
menu_win.border()
|
||||
menu_win.keypad(True)
|
||||
|
||||
# Display the menu path
|
||||
header = " > ".join(menu_path)
|
||||
if len(header) > width - 4:
|
||||
header = header[:width - 7] + "..."
|
||||
menu_win.addstr(1, 2, header, get_color("settings_breadcrumbs", bold=True))
|
||||
|
||||
# Create the pad for scrolling
|
||||
menu_pad = curses.newpad(num_items + 1, width - 8)
|
||||
menu_pad.bkgd(get_color("background"))
|
||||
|
||||
# Populate the pad with menu options
|
||||
for idx, key in enumerate(options):
|
||||
value = current_data[key] if isinstance(current_data, dict) else current_data[int(key.strip("[]"))]
|
||||
display_key = f"{key}"[:width // 2 - 2]
|
||||
display_value = (
|
||||
f"{value}"[:width // 2 - 8]
|
||||
)
|
||||
|
||||
color = get_color("settings_default", reverse=(idx == selected_index))
|
||||
menu_pad.addstr(idx, 0, f"{display_key:<{width // 2 - 2}} {display_value}".ljust(width - 8), color)
|
||||
|
||||
# Add Save button to the main window
|
||||
save_button_position = height - 2
|
||||
menu_win.addstr(
|
||||
save_button_position,
|
||||
(width - len(save_option_text)) // 2,
|
||||
save_option_text,
|
||||
get_color("settings_save", reverse=(selected_index == len(options))),
|
||||
)
|
||||
|
||||
# Refresh menu and pad
|
||||
menu_win.refresh()
|
||||
menu_pad.refresh(
|
||||
0,
|
||||
0,
|
||||
menu_win.getbegyx()[0] + 3,
|
||||
menu_win.getbegyx()[1] + 4,
|
||||
|
||||
menu_win.getbegyx()[0] + menu_win.getmaxyx()[0] - 3,
|
||||
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4,
|
||||
)
|
||||
|
||||
return menu_win, menu_pad, options
|
||||
|
||||
|
||||
def move_highlight(old_idx, new_idx, options, menu_win, menu_pad):
|
||||
if old_idx == new_idx:
|
||||
return # no-op
|
||||
|
||||
show_save_option = True
|
||||
|
||||
max_index = len(options) + (1 if show_save_option else 0) - 1
|
||||
|
||||
if show_save_option and old_idx == max_index: # special case un-highlight "Save" option
|
||||
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option_text)) // 2, len(save_option_text), get_color("settings_save"))
|
||||
else:
|
||||
menu_pad.chgat(old_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_default"))
|
||||
|
||||
if show_save_option and new_idx == max_index: # special case highlight "Save" option
|
||||
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option_text)) // 2, len(save_option_text), get_color("settings_save", reverse = True))
|
||||
else:
|
||||
menu_pad.chgat(new_idx, 0,menu_pad.getmaxyx()[1], get_color("settings_default", reverse = True))
|
||||
|
||||
start_index = max(0, new_idx - (menu_win.getmaxyx()[0] - 6))
|
||||
|
||||
menu_win.refresh()
|
||||
menu_pad.refresh(start_index, 0,
|
||||
menu_win.getbegyx()[0] + 3,
|
||||
menu_win.getbegyx()[1] + 4,
|
||||
menu_win.getbegyx()[0] + menu_win.getmaxyx()[0] - 3,
|
||||
menu_win.getbegyx()[1] + 4 + menu_win.getmaxyx()[1] - 4)
|
||||
|
||||
|
||||
def json_editor(stdscr):
|
||||
menu_path = ["App Settings"]
|
||||
selected_index = 0 # Track the selected option
|
||||
|
||||
file_path = "config.json"
|
||||
show_save_option = True # Always show the Save button
|
||||
|
||||
# Ensure the file exists
|
||||
if not os.path.exists(file_path):
|
||||
with open(file_path, "w") as f:
|
||||
json.dump({}, f)
|
||||
|
||||
# Load JSON data
|
||||
with open(file_path, "r") as f:
|
||||
original_data = json.load(f)
|
||||
|
||||
data = original_data # Reference to the original data
|
||||
current_data = data # Track the current level of the menu
|
||||
|
||||
# Render the menu
|
||||
menu_win, menu_pad, options = render_menu(current_data, menu_path, selected_index)
|
||||
need_redraw = True
|
||||
|
||||
while True:
|
||||
if(need_redraw):
|
||||
menu_win, menu_pad, options = render_menu(current_data, menu_path, selected_index)
|
||||
menu_win.refresh()
|
||||
need_redraw = False
|
||||
|
||||
max_index = len(options) + (1 if show_save_option else 0) - 1
|
||||
key = menu_win.getch()
|
||||
|
||||
|
||||
if key == curses.KEY_UP:
|
||||
|
||||
old_selected_index = selected_index
|
||||
selected_index = max_index if selected_index == 0 else selected_index - 1
|
||||
move_highlight(old_selected_index, selected_index, options, menu_win, menu_pad)
|
||||
|
||||
elif key == curses.KEY_DOWN:
|
||||
|
||||
old_selected_index = selected_index
|
||||
selected_index = 0 if selected_index == max_index else selected_index + 1
|
||||
move_highlight(old_selected_index, selected_index, options, menu_win, menu_pad)
|
||||
|
||||
elif key == ord("\t") and show_save_option:
|
||||
old_selected_index = selected_index
|
||||
selected_index = max_index
|
||||
move_highlight(old_selected_index, selected_index, options, menu_win, menu_pad)
|
||||
|
||||
elif key in (curses.KEY_RIGHT, ord("\n")):
|
||||
|
||||
need_redraw = True
|
||||
menu_win.erase()
|
||||
menu_win.refresh()
|
||||
|
||||
|
||||
if selected_index < len(options): # Handle selection of a menu item
|
||||
selected_key = options[selected_index]
|
||||
|
||||
# Handle nested data
|
||||
if isinstance(current_data, dict):
|
||||
if selected_key in current_data:
|
||||
selected_data = current_data[selected_key]
|
||||
else:
|
||||
continue # Skip invalid key
|
||||
elif isinstance(current_data, list):
|
||||
selected_data = current_data[int(selected_key.strip("[]"))]
|
||||
|
||||
if isinstance(selected_data, list) and len(selected_data) == 2:
|
||||
# Edit color pair
|
||||
new_value = edit_color_pair(
|
||||
selected_key, selected_data)
|
||||
current_data[selected_key] = new_value
|
||||
|
||||
elif isinstance(selected_data, (dict, list)):
|
||||
# Navigate into nested data
|
||||
menu_path.append(str(selected_key))
|
||||
current_data = selected_data
|
||||
selected_index = 0 # Reset the selected index
|
||||
|
||||
else:
|
||||
# General value editing
|
||||
new_value = edit_value(selected_key, selected_data)
|
||||
current_data[selected_key] = new_value
|
||||
need_redraw = True
|
||||
|
||||
else:
|
||||
# Save button selected
|
||||
save_json(file_path, data)
|
||||
stdscr.refresh()
|
||||
continue
|
||||
|
||||
elif key in (27, curses.KEY_LEFT): # Escape or Left Arrow
|
||||
|
||||
need_redraw = True
|
||||
menu_win.erase()
|
||||
menu_win.refresh()
|
||||
|
||||
# Navigate back in the menu
|
||||
if len(menu_path) > 1:
|
||||
menu_path.pop()
|
||||
current_data = data
|
||||
for path in menu_path[1:]:
|
||||
current_data = current_data[path] if isinstance(current_data, dict) else current_data[int(path.strip("[]"))]
|
||||
selected_index = 0
|
||||
else:
|
||||
# Exit the editor
|
||||
menu_win.clear()
|
||||
menu_win.refresh()
|
||||
break
|
||||
|
||||
|
||||
def save_json(file_path, data):
|
||||
formatted_json = format_json_single_line_arrays(data)
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(formatted_json)
|
||||
setup_colors(reinit=True)
|
||||
|
||||
def main(stdscr):
|
||||
curses.curs_set(0)
|
||||
stdscr.keypad(True)
|
||||
setup_colors()
|
||||
json_editor(stdscr)
|
||||
|
||||
if __name__ == "__main__":
|
||||
curses.wrapper(main)
|
||||
@@ -1,23 +0,0 @@
|
||||
import logging
|
||||
import meshtastic.serial_interface, meshtastic.tcp_interface, meshtastic.ble_interface
|
||||
import globals
|
||||
|
||||
def initialize_interface(args):
|
||||
try:
|
||||
if args.ble:
|
||||
return meshtastic.ble_interface.BLEInterface(args.ble if args.ble != "any" else None)
|
||||
elif args.host:
|
||||
return meshtastic.tcp_interface.TCPInterface(args.host)
|
||||
else:
|
||||
try:
|
||||
return meshtastic.serial_interface.SerialInterface(args.port)
|
||||
except PermissionError as ex:
|
||||
logging.error(f"You probably need to add yourself to the `dialout` group to use a serial connection. {ex}")
|
||||
except Exception as ex:
|
||||
logging.error(f"Unexpected error initializing interface: {ex}")
|
||||
if globals.interface.devPath is None:
|
||||
return meshtastic.tcp_interface.TCPInterface("meshtastic.local")
|
||||
|
||||
except Exception as ex:
|
||||
logging.critical(f"Fatal error initializing interface: {ex}")
|
||||
|
||||
Reference in New Issue
Block a user