Compare commits

..

2 Commits

Author SHA1 Message Date
pdxlocations
b889711f63 ignore 2025-03-08 22:32:23 -08:00
pdxlocations
361da1c078 db optimizations 2025-03-08 22:31:28 -08:00
27 changed files with 172 additions and 483 deletions

View File

@@ -1,143 +0,0 @@
name: release
on:
push:
tags:
- "[0-9]+.[0-9]+.[0-9]+"
- "[0-9]+.[0-9]+.[0-9]+a[0-9]+"
- "[0-9]+.[0-9]+.[0-9]+b[0-9]+"
- "[0-9]+.[0-9]+.[0-9]+rc[0-9]+"
env:
PACKAGE_NAME: "contact"
OWNER: "pdxlocations"
jobs:
details:
runs-on: ubuntu-latest
outputs:
new_version: ${{ steps.release.outputs.new_version }}
suffix: ${{ steps.release.outputs.suffix }}
tag_name: ${{ steps.release.outputs.tag_name }}
steps:
- uses: actions/checkout@v2
- name: Extract tag and Details
id: release
run: |
if [ "${{ github.ref_type }}" = "tag" ]; then
TAG_NAME=${GITHUB_REF#refs/tags/}
NEW_VERSION=$(echo $TAG_NAME | awk -F'-' '{print $1}')
SUFFIX=$(echo $TAG_NAME | grep -oP '[a-z]+[0-9]+' || echo "")
echo "new_version=$NEW_VERSION" >> "$GITHUB_OUTPUT"
echo "suffix=$SUFFIX" >> "$GITHUB_OUTPUT"
echo "tag_name=$TAG_NAME" >> "$GITHUB_OUTPUT"
echo "Version is $NEW_VERSION"
echo "Suffix is $SUFFIX"
echo "Tag name is $TAG_NAME"
else
echo "No tag found"
exit 1
fi
check_pypi:
needs: details
runs-on: ubuntu-latest
steps:
- name: Fetch information from PyPI
run: |
response=$(curl -s https://pypi.org/pypi/${{ env.PACKAGE_NAME }}/json || echo "{}")
latest_previous_version=$(echo $response | jq --raw-output "select(.releases != null) | .releases | keys_unsorted | last")
if [ -z "$latest_previous_version" ]; then
echo "Package not found on PyPI."
latest_previous_version="0.0.0"
fi
echo "Latest version on PyPI: $latest_previous_version"
echo "latest_previous_version=$latest_previous_version" >> $GITHUB_ENV
- name: Compare versions and exit if not newer
run: |
NEW_VERSION=${{ needs.details.outputs.new_version }}
LATEST_VERSION=$latest_previous_version
if [ "$(printf '%s\n' "$LATEST_VERSION" "$NEW_VERSION" | sort -rV | head -n 1)" != "$NEW_VERSION" ] || [ "$NEW_VERSION" == "$LATEST_VERSION" ]; then
echo "The new version $NEW_VERSION is not greater than the latest version $LATEST_VERSION on PyPI."
exit 1
else
echo "The new version $NEW_VERSION is greater than the latest version $LATEST_VERSION on PyPI."
fi
setup_and_build:
needs: [details, check_pypi]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.12"
- name: Install Poetry
run: |
curl -sSL https://install.python-poetry.org | python3 -
echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Set project version with Poetry
run: |
poetry version ${{ needs.details.outputs.new_version }}
- name: Install dependencies
run: poetry install --sync --no-interaction
- name: Build source and wheel distribution
run: |
poetry build
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
name: dist
path: dist/
pypi_publish:
name: Upload release to PyPI
needs: [setup_and_build, details]
runs-on: ubuntu-latest
environment:
name: release
permissions:
id-token: write
steps:
- name: Download artifacts
uses: actions/download-artifact@v4
with:
name: dist
path: dist/
- name: Publish distribution to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
github_release:
name: Create GitHub Release
needs: [setup_and_build, details]
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Checkout Code
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Download artifacts
uses: actions/download-artifact@v4
with:
name: dist
path: dist/
- name: Create GitHub Release
id: create_release
env:
GH_TOKEN: ${{ github.token }}
run: |
gh release create ${{ needs.details.outputs.tag_name }} dist/* --title ${{ needs.details.outputs.tag_name }} --generate-notes

4
.gitignore vendored
View File

@@ -8,4 +8,6 @@ client.log
settings.log
config.json
default_config.log
dist/
client.db-shm
client.db-wal
client.db.bk

13
.vscode/launch.json vendored
View File

@@ -1,13 +0,0 @@
{
"version": "0.1.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"cwd": "${workspaceFolder}",
"module": "contact.__main__",
"args": ["-c"]
}
]
}

View File

@@ -54,11 +54,11 @@ If no connection arguments are specified, the client will attempt a serial conne
### Example Usage
```sh
contact --port /dev/ttyUSB0
contact --host 192.168.1.1
contact --ble BlAddressOfDevice
python main.py --port /dev/ttyUSB0
python main.py --host 192.168.1.1
python main.py --ble BlAddressOfDevice
```
To quickly connect to localhost, use:
```sh
contact -t
python main.py -t
```

View File

@@ -24,9 +24,9 @@ serial_enabled, "Enable serial console", ""
button_gpio, "Button GPIO", "GPIO pin for user button."
buzzer_gpio, "Buzzer GPIO", "GPIO pin for user buzzer."
rebroadcast_mode, "Rebroadcast mode", "This setting defines the device's behavior for how messages are rebroadcast."
node_info_broadcast_secs, "Nodeinfo broadcast interval", "This is the number of seconds between NodeInfo message broadcasts. Will also send a nodeinfo in response to new nodes on the mesh."
node_info_broadcast_secs, "Nodeinfo broadcast interval", "This is the number of seconds between NodeInfo message broadcasts. Femtofox will still send nodeinfo in response to new nodes on the mesh."
double_tap_as_button_press, "Double tap as button press", "This option will enable a double tap, when a supported accelerometer is attached to the device, to be treated as a button press."
is_managed, "Enable managed mode", "Enabling Managed Mode blocks smartphone apps and web UI from changing configuration. [note]This setting is not required for remote node administration.[/note] Before enabling, verify that node can be controlled via Remote Admin to [warning]prevent being locked out.[/warning]"
is_managed, "Enable managed mode", "Enabling Managed Mode blocks smartphone apps and web UI from changing configuration. [note]This setting is not required for remote node administration.[/note]Before enabling, verify that node can be controlled via Remote Admin to [warning]prevent being locked out.[/warning]"
disable_triple_click, "Disable triple button press", ""
tzdef, "Timezone", "Uses the TZ Database format to display the correct local time on the device display and in its logs."
led_heartbeat_disabled, "Disable LED heartbeat", "On certain hardware models, this disables the blinking heartbeat LED."
@@ -115,7 +115,7 @@ spread_factor, "Spread factor", "Indicates the number of chirps per symbol. Only
coding_rate, "Coding rate", "The proportion of each LoRa transmission that contains actual data - the rest is used for error correction."
frequency_offset, "Frequency offset", "This parameter is for advanced users with advanced test equipment."
region, "Region", "Sets the region for your node. As long as this is not set, the node will display a message and not transmit any packets."
hop_limit, "Hop limit", "The maximum number of intermediate nodes between our node and a node it is sending to. Does not impact received messages.\n[warning]Excessive hop limit increases congestion![/warning]\nMust be between 0-7."
hop_limit, "Hop limit", "The maximum number of intermediate nodes between Femtofox and a node it is sending to. Does not impact received messages.\n[warning]Excessive hop limit increases congestion![/warning]\nMust be between 0-7."
tx_enabled, "Enable TX", "Enables/disables the radio chip. Useful for hot-swapping antennas."
tx_power, "TX power in dBm", "[warning]Setting a 33db radio above 8db will permanently damage it. ERP above 27db violates EU law. ERP above 36db violates US (unlicensed) law.[/warning] If 0, will use the max continuous power legal in region. Must be 0-30 (0=automatic)."
channel_num, "Frequency slot", "Determines the exact frequency the radio transmits and receives. If unset or set to 0, determined automatically by the primary channel name."
@@ -139,7 +139,7 @@ private_key, "Private key", "The private key of the device, used to create a sha
is_managed, "Enable managed mode", "Enabling Managed Mode blocks smartphone apps and web UI from changing configuration. [note]This setting is not required for remote node administration.[/note]Before enabling, verify that node can be controlled via Remote Admin to [warning]prevent being locked out.[/warning]"
serial_enabled, "Enable serial console", ""
debug_log_api_enabled, "Enable debug log", "Set this to true to continue outputting live debug logs over serial or Bluetooth when the API is active."
admin_channel_enabled, "Enable legacy admin channel", "If the node you need to administer or be administered by is running 2.4.x or earlier, you should set this to enabled. Requires a secondary channel named 'admin' be present on both nodes."
admin_channel_enabled, "Enable legacy admin channel", "If the node you Femtofox needs to administer or be administered by is running 2.4.x or earlier, you should set this to enabled. Requires a secondary channel named 'admin' be present on both nodes."
admin_key, "Admin keys", "The public key(s) authorized to send administrative messages to this node. Only messages signed by these keys will be accepted for administrative control. Up to 3."
[module.mqtt]

View File

@@ -3,8 +3,7 @@
'''
Contact - A Console UI for Meshtastic by http://github.com/pdxlocations
Powered by Meshtastic.org
Meshtastic® is a registered trademark of Meshtastic LLC. Meshtastic software components are released under various licenses, see GitHub for details. No warranty is provided - use at your own risk.
V 1.2.2
'''
import contextlib
@@ -14,22 +13,21 @@ from pubsub import pub
import sys
import io
import logging
import subprocess
import traceback
import threading
from contact.utilities.db_handler import init_nodedb, load_messages_from_db
from contact.message_handlers.rx_handler import on_receive
from contact.settings import set_region
from contact.ui.curses_ui import main_ui
from contact.ui.colors import setup_colors
from contact.ui.splash import draw_splash
import contact.ui.default_config as config
from contact.utilities.arg_parser import setup_parser
from contact.utilities.interfaces import initialize_interface
from contact.utilities.input_handlers import get_list_input
from contact.utilities.utils import get_channels, get_node_list, get_nodeNum
import contact.globals as globals
from utilities.db_handler import init_nodedb, load_messages_from_db
from message_handlers.rx_handler import on_receive
from settings import set_region
from ui.curses_ui import main_ui
from ui.colors import setup_colors
from ui.splash import draw_splash
import ui.default_config as config
from utilities.arg_parser import setup_parser
from utilities.interfaces import initialize_interface
from utilities.input_handlers import get_list_input
from utilities.utils import get_channels, get_node_list, get_nodeNum
import globals
# Set ncurses compatibility settings
os.environ["NCURSES_NO_UTF8_ACS"] = "1"
@@ -42,7 +40,7 @@ if os.environ.get("COLORTERM") == "gnome-terminal":
# Run `tail -f client.log` in another terminal to view live
logging.basicConfig(
filename=config.log_file_path,
level=logging.DEBUG, # DEBUG, INFO, WARNING, ERROR, CRITICAL)
level=logging.INFO, # DEBUG, INFO, WARNING, ERROR, CRITICAL)
format="%(asctime)s - %(levelname)s - %(message)s"
)
@@ -58,11 +56,6 @@ def main(stdscr):
parser = setup_parser()
args = parser.parse_args()
# Check if --settings was passed and run settings.py as a subprocess
if getattr(args, 'settings', False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return
logging.info("Initializing interface %s", args)
with globals.lock:
globals.interface = initialize_interface(args)
@@ -92,7 +85,7 @@ def main(stdscr):
logging.error("Console output before crash:\n%s", console_output)
raise # Re-raise only unexpected errors
def start():
if __name__ == "__main__":
log_file = config.log_file_path
log_f = open(log_file, "a", buffering=1) # Enable line-buffering for immediate log writes
@@ -108,7 +101,4 @@ def start():
except Exception as e:
logging.error("Fatal error in curses wrapper: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
sys.exit(1) # Exit with an error code
if __name__ == "__main__":
start()
sys.exit(1) # Exit with an error code

View File

@@ -1,11 +1,11 @@
import logging
import time
from contact.utilities.utils import refresh_node_list
from utilities.utils import refresh_node_list
from datetime import datetime
from contact.ui.curses_ui import draw_packetlog_win, draw_node_list, draw_messages_window, draw_channel_list, add_notification
from contact.utilities.db_handler import save_message_to_db, maybe_store_nodeinfo_in_db, get_name_from_database, update_node_info_in_db
import contact.ui.default_config as config
import contact.globals as globals
from ui.curses_ui import draw_packetlog_win, draw_node_list, draw_messages_window, draw_channel_list, add_notification
from utilities.db_handler import save_message_to_db, maybe_store_nodeinfo_in_db, get_name_from_database, update_node_info_in_db
import ui.default_config as config
import globals
from datetime import datetime

View File

@@ -3,16 +3,16 @@ import google.protobuf.json_format
from meshtastic import BROADCAST_NUM
from meshtastic.protobuf import mesh_pb2, portnums_pb2
from contact.utilities.db_handler import save_message_to_db, update_ack_nak, get_name_from_database, is_chat_archived, update_node_info_in_db
import contact.ui.default_config as config
import contact.globals as globals
from utilities.db_handler import save_message_to_db, update_ack_nak, get_name_from_database, is_chat_archived, update_node_info_in_db
import ui.default_config as config
import globals
ack_naks = {}
# Note "onAckNak" has special meaning to the API, thus the nonstandard naming convention
# See https://github.com/meshtastic/python/blob/master/meshtastic/mesh_interface.py#L462
def onAckNak(packet):
from contact.ui.curses_ui import draw_messages_window
from ui.curses_ui import draw_messages_window
request = packet['decoded']['requestId']
if(request not in ack_naks):
return
@@ -43,7 +43,7 @@ def onAckNak(packet):
def on_response_traceroute(packet):
"""on response for trace route"""
from contact.ui.curses_ui import draw_channel_list, draw_messages_window, add_notification
from ui.curses_ui import draw_channel_list, draw_messages_window, add_notification
refresh_channels = False
refresh_messages = False

View File

@@ -1,30 +0,0 @@
[project]
name = "contact"
version = "1.3.2"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"},
{name = "Russell Schmidt"},
{name = "noon92"},
{name = "vidplace7"},
{name = "SpudGunMan"},
{name = "Ian McEwen"},
{name = "Nick Maloney"}
]
license = "GPL-3.0-only"
readme = "README.md"
requires-python = ">=3.9,<3.14"
dependencies = [
"meshtastic (>=2.6.0,<3.0.0)"
]
[project.urls]
Homepage = "https://github.com/pdxlocations/contact"
Issues = "https://github.com/pdxlocations/contact/issues"
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
contact = "contact.__main__:start"

View File

@@ -5,13 +5,13 @@ import logging
import sys
import traceback
import contact.ui.default_config as config
from contact.utilities.input_handlers import get_list_input
from contact.ui.colors import setup_colors
from contact.ui.splash import draw_splash
from contact.ui.control_ui import set_region, settings_menu
from contact.utilities.arg_parser import setup_parser
from contact.utilities.interfaces import initialize_interface
import ui.default_config as config
from utilities.input_handlers import get_list_input
from ui.colors import setup_colors
from ui.splash import draw_splash
from ui.control_ui import set_region, settings_menu
from utilities.arg_parser import setup_parser
from utilities.interfaces import initialize_interface
def main(stdscr):

View File

@@ -1,5 +1,5 @@
import curses
import contact.ui.default_config as config
import ui.default_config as config
COLOR_MAP = {
"black": curses.COLOR_BLACK,

View File

@@ -5,16 +5,14 @@ import os
import re
import sys
from contact.utilities.save_to_radio import save_changes
from contact.utilities.config_io import config_export, config_import
from contact.utilities.input_handlers import get_repeated_input, get_text_input, get_fixed32_input, get_list_input, get_admin_key_input
from contact.ui.menus import generate_menu_from_protobuf
from contact.ui.colors import get_color
from contact.ui.dialog import dialog
from contact.utilities.control_utils import parse_ini_file, transform_menu_path
from contact.ui.user_config import json_editor
import contact.localisations
from utilities.save_to_radio import save_changes
from utilities.config_io import config_export, config_import
from utilities.input_handlers import get_repeated_input, get_text_input, get_fixed32_input, get_list_input, get_admin_key_input
from ui.menus import generate_menu_from_protobuf
from ui.colors import get_color
from ui.dialog import dialog
from utilities.control_utils import parse_ini_file, transform_menu_path
from ui.user_config import json_editor
# Constants
width = 80
@@ -29,9 +27,8 @@ parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
# Paths
locals_dir = os.path.dirname(os.path.abspath(sys.argv[0])) # Current script directory
translation_file = os.path.join(parent_dir, "localisations", "en.ini")
config_folder = os.path.join(locals_dir, "node-configs")
translation_file = os.path.join(locals_dir, "localisations", "en.ini")
config_folder = os.path.join(parent_dir, "node-configs")
# Load translations
field_mapping, help_text = parse_ini_file(translation_file)
@@ -103,14 +100,8 @@ def display_menu(current_menu, menu_path, selected_index, show_save_option, help
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 8
)
max_index = num_items + (1 if show_save_option else 0) - 1
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if show_save_option else 0)
draw_arrows(menu_win, visible_height, max_index, start_index, show_save_option)
return menu_win, menu_pad
def draw_help_window(menu_start_y, menu_start_x, menu_height, max_help_lines, current_menu, selected_index, transformed_path):
global help_win
@@ -160,6 +151,7 @@ def update_help_window(help_win, help_text, transformed_path, selected_option, m
return help_win
def get_wrapped_help_text(help_text, transformed_path, selected_option, width, max_lines):
"""Fetches and formats help text for display, ensuring it fits within the allowed lines."""
@@ -260,13 +252,10 @@ def move_highlight(old_idx, new_idx, options, show_save_option, menu_win, menu_p
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if show_save_option else 0)
# Adjust start_index only when moving out of visible range
if new_idx == max_index and show_save_option:
pass
elif new_idx < start_index[-1]: # Moving above the visible area
if new_idx < start_index[-1]: # Moving above the visible area
start_index[-1] = new_idx
elif new_idx >= start_index[-1] + visible_height: # Moving below the visible area
start_index[-1] = new_idx - visible_height
pass
start_index[-1] = new_idx - visible_height
# Ensure start_index is within bounds
start_index[-1] = max(0, min(start_index[-1], max_index - visible_height + 1))
@@ -289,7 +278,7 @@ def move_highlight(old_idx, new_idx, options, show_save_option, menu_win, menu_p
menu_pad.refresh(start_index[-1], 0,
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + 3 + visible_height,
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4)
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 8)
# Update help window
transformed_path = transform_menu_path(menu_path)
@@ -297,25 +286,6 @@ def move_highlight(old_idx, new_idx, options, show_save_option, menu_win, menu_p
help_y = menu_win.getbegyx()[0] + menu_win.getmaxyx()[0]
help_win = update_help_window(help_win, help_text, transformed_path, selected_option, max_help_lines, width, help_y, menu_win.getbegyx()[1])
draw_arrows(menu_win, visible_height, max_index, start_index, show_save_option)
def draw_arrows(win, visible_height, max_index, start_index, show_save_option):
# vh = visible_height + (1 if show_save_option else 0)
mi = max_index - (2 if show_save_option else 0)
if visible_height < mi:
if start_index[-1] > 0:
win.addstr(3, 2, "", get_color("settings_default"))
else:
win.addstr(3, 2, " ", get_color("settings_default"))
if mi - start_index[-1] >= visible_height + (0 if show_save_option else 1) :
win.addstr(visible_height + 3, 2, "", get_color("settings_default"))
else:
win.addstr(visible_height + 3, 2, " ", get_color("settings_default"))
def settings_menu(stdscr, interface):
curses.update_lines_cols()
@@ -389,6 +359,10 @@ def settings_menu(stdscr, interface):
menu_win.refresh()
help_win.refresh()
# Get the parent directory of the script
app_directory = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
config_folder = "node-configs"
if show_save_option and selected_index == len(options):
save_changes(interface, menu_path, modified_settings)
modified_settings.clear()
@@ -400,6 +374,7 @@ def settings_menu(stdscr, interface):
for step in menu_path[1:]:
current_menu = current_menu.get(step, {})
selected_index = 0
continue
selected_option = options[selected_index]
@@ -418,7 +393,7 @@ def settings_menu(stdscr, interface):
try:
config_text = config_export(interface)
yaml_file_path = os.path.join(config_folder, filename)
yaml_file_path = os.path.join(app_directory, config_folder, filename)
if os.path.exists(yaml_file_path):
overwrite = get_list_input(f"{filename} already exists. Overwrite?", None, ["Yes", "No"])
@@ -443,13 +418,14 @@ def settings_menu(stdscr, interface):
continue
elif selected_option == "Load Config File":
folder_path = os.path.join(app_directory, config_folder)
# Check if folder exists and is not empty
if not os.path.exists(config_folder) or not any(os.listdir(config_folder)):
if not os.path.exists(folder_path) or not any(os.listdir(folder_path)):
dialog(stdscr, "", " No config files found. Export a config first.")
continue # Return to menu
file_list = [f for f in os.listdir(config_folder) if os.path.isfile(os.path.join(config_folder, f))]
file_list = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]
# Ensure file_list is not empty before proceeding
if not file_list:
@@ -458,7 +434,7 @@ def settings_menu(stdscr, interface):
filename = get_list_input("Choose a config file", None, file_list)
if filename:
file_path = os.path.join(config_folder, filename)
file_path = os.path.join(app_directory, config_folder, filename)
overwrite = get_list_input(f"Are you sure you want to load {filename}?", None, ["Yes", "No"])
if overwrite == "Yes":
config_import(interface, file_path)

View File

@@ -2,15 +2,14 @@ import curses
import textwrap
import logging
import traceback
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
from contact.settings import settings_menu
from contact.message_handlers.tx_handler import send_message, send_traceroute
from contact.ui.colors import setup_colors, get_color
from contact.utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived
from contact.utilities.input_handlers import get_list_input
import contact.ui.default_config as config
import contact.ui.dialog
import contact.globals as globals
from utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
from settings import settings_menu
from message_handlers.tx_handler import send_message, send_traceroute
from ui.colors import setup_colors, get_color
from utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived
import ui.default_config as config
import ui.dialog
import globals
def handle_resize(stdscr, firstrun):
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, function_win, packetlog_win, entry_win
@@ -213,7 +212,7 @@ def main_ui(stdscr):
elif char == chr(20):
send_traceroute()
curses.curs_set(0) # Hide cursor
contact.ui.dialog.dialog(stdscr, "Traceroute Sent", "Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.")
ui.dialog.dialog(stdscr, "Traceroute Sent", "Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.")
curses.curs_set(1) # Show cursor again
handle_resize(stdscr, False)
@@ -294,57 +293,10 @@ def main_ui(stdscr):
draw_channel_list()
draw_messages_window()
# ^/
elif char == chr(31):
if(globals.current_window == 2 or globals.current_window == 0):
search(globals.current_window)
# ^F
elif char == chr(6):
if globals.current_window == 2:
selectedNode = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
curses.curs_set(0)
if 'isFavorite' not in selectedNode or selectedNode['isFavorite'] == False:
confirmation = get_list_input(f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Favorite?", "no", ["yes", "no"])
if confirmation == "yes":
globals.interface.localNode.setFavorite(globals.node_list[globals.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isFavorite'] = True
refresh_node_list()
else:
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Favorites?", "no", ["yes", "no"])
if confirmation == "yes":
globals.interface.localNode.removeFavorite(globals.node_list[globals.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isFavorite'] = False
refresh_node_list()
handle_resize(stdscr, False)
elif char == chr(7):
if globals.current_window == 2:
selectedNode = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
curses.curs_set(0)
if 'isIgnored' not in selectedNode or selectedNode['isIgnored'] == False:
confirmation = get_list_input(f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Ignored?", "no", ["yes", "no"])
if confirmation == "yes":
globals.interface.localNode.setIgnored(globals.node_list[globals.selected_node])
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isIgnored'] = True
else:
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Ignored?", "no", ["yes", "no"])
if confirmation == "yes":
globals.interface.localNode.removeIgnored(globals.node_list[globals.selected_node])
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isIgnored'] = False
handle_resize(stdscr, False)
else:
# Append typed character to input text
if(isinstance(char, str)):
@@ -459,12 +411,7 @@ def draw_node_list():
node = globals.interface.nodesByNum[node_num]
secure = 'user' in node and 'publicKey' in node['user'] and node['user']['publicKey']
node_str = f"{'🔐' if secure else '🔓'} {get_name_from_database(node_num, 'long')}".ljust(box_width - 2)[:box_width - 2]
color = "node_list"
if 'isFavorite' in node and node['isFavorite']:
color = "node_favorite"
if 'isIgnored' in node and node['isIgnored']:
color = "node_ignored"
nodes_pad.addstr(i, 1, node_str, get_color(color, reverse=globals.selected_node == i and globals.current_window == 2))
nodes_pad.addstr(i, 1, node_str, get_color("node_list", reverse=globals.selected_node == i and globals.current_window == 2))
nodes_win.attrset(get_color("window_frame_selected") if globals.current_window == 2 else get_color("window_frame"))
nodes_win.box()
@@ -474,9 +421,9 @@ def draw_node_list():
refresh_pad(2)
# Restore cursor to input field
entry_win.keypad(True)
curses.curs_set(1)
entry_win.move(1, len("Input: ") + len(input_text)+1)
entry_win.refresh()
curses.curs_set(1)
def select_channel(idx):
old_selected_channel = globals.selected_channel
@@ -575,9 +522,9 @@ def draw_packetlog_win():
packetlog_win.refresh()
# Restore cursor to input field
entry_win.keypad(True)
curses.curs_set(1)
entry_win.move(1, len("Input: ") + len(input_text)+1)
entry_win.refresh()
curses.curs_set(1)
def search(win):
start_idx = globals.selected_node
@@ -671,7 +618,7 @@ def draw_node_details():
draw_centered_text_field(function_win, nodestr, 0, get_color("commands"))
def draw_help():
cmds = ["↑→↓← = Select", " ENTER = Send", " ` = Settings", " ^P = Packet Log", " ESC = Quit", " ^t = Traceroute", " ^d = Archive Chat", " ^f = Favorite", " ^g = Ignore"]
cmds = ["↑→↓← = Select", " ENTER = Send", " ` = Settings", " ^P = Packet Log", " ESC = Quit", " ^t = Traceroute", " ^d = Archive Chat"]
function_str = ""
for s in cmds:
if(len(function_str) + len(s) < function_win.getmaxyx()[1] - 2):
@@ -725,18 +672,9 @@ def refresh_pad(window):
def highlight_line(highlight, window, line):
pad = nodes_pad
color = get_color("node_list")
select_len = nodes_win.getmaxyx()[1] - 2
if window == 2:
node_num = globals.node_list[line]
node = globals.interface.nodesByNum[node_num]
if 'isFavorite' in node and node['isFavorite']:
color = get_color("node_favorite")
if 'isIgnored' in node and node['isIgnored']:
color = get_color("node_ignored")
if(window == 0):
pad = channel_pad
color = get_color("channel_selected" if (line == globals.selected_channel and highlight == False) else "channel_list")
@@ -765,4 +703,4 @@ def draw_centered_text_field(win, text, y_offset, color):
def draw_debug(value):
function_win.addstr(1, 1, f"debug: {value} ")
function_win.refresh()
function_win.refresh()

View File

@@ -65,9 +65,7 @@ def initialize_config():
"settings_save": ["green", "black"],
"settings_breadcrumbs": ["white", "black"],
"settings_warning": ["red", "black"],
"settings_note": ["green", "black"],
"node_favorite": ["green", "black"],
"node_ignored": ["red", "black"]
"settings_note": ["green", "black"]
}
COLOR_CONFIG_LIGHT = {
"default": ["black", "white"],
@@ -91,9 +89,7 @@ def initialize_config():
"settings_save": ["green", "white"],
"settings_breadcrumbs": ["black", "white"],
"settings_warning": ["red", "white"],
"settings_note": ["green", "white"],
"node_favorite": ["green", "white"],
"node_ignored": ["red", "white"]
"settings_note": ["green", "white"]
}
COLOR_CONFIG_GREEN = {
"default": ["green", "black"],
@@ -119,9 +115,7 @@ def initialize_config():
"settings_save": ["green", "black"],
"settings_breadcrumbs": ["green", "black"],
"settings_warning": ["green", "black"],
"settings_note": ["green", "black"],
"node_favorite": ["cyan", "white"],
"node_ignored": ["red", "white"]
"settings_note": ["green", "black"]
}
default_config_variables = {
"db_file_path": db_file_path,

View File

@@ -1,5 +1,5 @@
import curses
from contact.ui.colors import get_color
from ui.colors import get_color
def dialog(stdscr, title, message):
height, width = stdscr.getmaxyx()

View File

@@ -1,5 +1,5 @@
import curses
from contact.ui.colors import get_color
from ui.colors import get_color
def draw_splash(stdscr):
curses.curs_set(0)

View File

@@ -1,9 +1,9 @@
import os
import json
import curses
from contact.ui.colors import get_color, setup_colors, COLOR_MAP
from contact.ui.default_config import format_json_single_line_arrays, loaded_config
from contact.utilities.input_handlers import get_list_input
from ui.colors import get_color, setup_colors, COLOR_MAP
from ui.default_config import format_json_single_line_arrays, loaded_config
from utilities.input_handlers import get_list_input
width = 60
save_option_text = "Save Changes"
@@ -196,10 +196,7 @@ def json_editor(stdscr):
menu_path = ["App Settings"]
selected_index = 0 # Track the selected option
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
file_path = os.path.join(parent_dir, "config.json")
# file_path = "config.json"
file_path = "config.json"
show_save_option = True # Always show the Save button
# Ensure the file exists

View File

@@ -33,14 +33,5 @@ def setup_parser():
default=None,
const="any"
)
parser.add_argument(
"--settings",
"--set",
"--control",
"-c",
help="Launch directly into the settings",
action="store_true"
)
return parser

View File

@@ -1,17 +1,42 @@
import sqlite3
import time
import logging
import re
from datetime import datetime
from contact.utilities.utils import decimal_to_hex
import contact.ui.default_config as config
import contact.globals as globals
from utilities.utils import decimal_to_hex
import ui.default_config as config
import globals
def get_db_connection():
"""Get a SQLite connection with optimized PRAGMA settings."""
db_connection = sqlite3.connect(config.db_file_path, check_same_thread=False)
db_cursor = db_connection.cursor()
# Check if journal_mode is already set to WAL
db_cursor.execute("PRAGMA journal_mode;")
current_journal_mode = db_cursor.fetchone()[0]
if current_journal_mode != "wal":
db_cursor.execute("PRAGMA journal_mode=WAL;")
# Apply remaining PRAGMA settings (these are fine to execute every time)
db_cursor.executescript("""
PRAGMA synchronous=NORMAL;
PRAGMA cache_size=-64000;
PRAGMA temp_store=MEMORY;
PRAGMA foreign_keys=ON;
""")
return db_connection
def get_table_name(channel):
# Construct the table name
table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
return quoted_table_name
"""Returns a properly formatted and safe table name."""
safe_channel = re.sub(r'[^a-zA-Z0-9_]', '', str(channel))
table_name = f"{globals.myNodeNum}_{safe_channel}_messages"
return f'"{table_name}"'
def save_message_to_db(channel, user_id, message_text):
@@ -27,7 +52,7 @@ def save_message_to_db(channel, user_id, message_text):
'''
ensure_table_exists(quoted_table_name, schema)
with sqlite3.connect(config.db_file_path) as db_connection:
with get_db_connection() as db_connection:
db_cursor = db_connection.cursor()
timestamp = int(time.time())
@@ -49,7 +74,7 @@ def save_message_to_db(channel, user_id, message_text):
def update_ack_nak(channel, timestamp, message, ack):
try:
with sqlite3.connect(config.db_file_path) as db_connection:
with get_db_connection() as db_connection:
db_cursor = db_connection.cursor()
update_query = f"""
UPDATE {get_table_name(channel)}
@@ -72,7 +97,7 @@ def update_ack_nak(channel, timestamp, message, ack):
def load_messages_from_db():
"""Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
with get_db_connection() as db_connection:
db_cursor = db_connection.cursor()
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
@@ -190,15 +215,17 @@ def maybe_store_nodeinfo_in_db(packet):
except Exception as e:
logging.error(f"Unexpected error in maybe_store_nodeinfo_in_db: {e}")
def update_node_info_in_db(user_id, long_name=None, short_name=None, hw_model=None, is_licensed=None, role=None, public_key=None, chat_archived=None):
"""Update or insert node information into the database, preserving unchanged fields."""
try:
ensure_node_table_exists() # Ensure the table exists before any operation
with sqlite3.connect(config.db_file_path) as db_connection:
with get_db_connection() as db_connection:
db_cursor = db_connection.cursor()
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
table_columns = [i[1] for i in db_cursor.execute(f'PRAGMA table_info({table_name})')]
if "chat_archived" not in table_columns:
update_table_query = f"ALTER TABLE {table_name} ADD COLUMN chat_archived INTEGER"
@@ -219,26 +246,18 @@ def update_node_info_in_db(user_id, long_name=None, short_name=None, hw_model=No
public_key = public_key if public_key is not None else existing_public_key
chat_archived = chat_archived if chat_archived is not None else existing_chat_archived
long_name = long_name if long_name is not None else "Meshtastic " + str(decimal_to_hex(user_id)[-4:])
short_name = short_name if short_name is not None else str(decimal_to_hex(user_id)[-4:])
hw_model = hw_model if hw_model is not None else "UNSET"
is_licensed = is_licensed if is_licensed is not None else 0
role = role if role is not None else "CLIENT"
public_key = public_key if public_key is not None else ""
chat_archived = chat_archived if chat_archived is not None else 0
# Upsert logic
upsert_query = f'''
INSERT INTO {table_name} (user_id, long_name, short_name, hw_model, is_licensed, role, public_key, chat_archived)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(user_id) DO UPDATE SET
long_name = excluded.long_name,
short_name = excluded.short_name,
hw_model = excluded.hw_model,
is_licensed = excluded.is_licensed,
role = excluded.role,
public_key = excluded.public_key,
chat_archived = excluded.chat_archived
INSERT INTO {table_name} (user_id, long_name, short_name, hw_model, is_licensed, role, public_key, chat_archived)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(user_id) DO UPDATE SET
long_name = excluded.long_name,
short_name = excluded.short_name,
hw_model = excluded.hw_model,
is_licensed = excluded.is_licensed,
role = excluded.role,
public_key = excluded.public_key,
chat_archived = COALESCE(excluded.chat_archived, chat_archived);
'''
db_cursor.execute(upsert_query, (user_id, long_name, short_name, hw_model, is_licensed, role, public_key, chat_archived))
db_connection.commit()
@@ -268,7 +287,7 @@ def ensure_node_table_exists():
def ensure_table_exists(table_name, schema):
"""Ensure the given table exists in the database."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
with get_db_connection() as db_connection:
db_cursor = db_connection.cursor()
create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ({schema})"
db_cursor.execute(create_table_query)
@@ -279,31 +298,32 @@ def ensure_table_exists(table_name, schema):
logging.error(f"Unexpected error in ensure_table_exists({table_name}): {e}")
name_cache = {}
def get_name_from_database(user_id, type="long"):
"""
Retrieve a user's name (long or short) from the node database.
:param user_id: The user ID to look up.
:param type: "long" for long name, "short" for short name.
:return: The retrieved name or the hex of the user id
"""
"""Retrieve a user's name from the node database with caching."""
# Check if we already cached both long and short names
if user_id in name_cache and type in name_cache[user_id]:
return name_cache[user_id][type]
try:
with sqlite3.connect(config.db_file_path) as db_connection:
with get_db_connection() as db_connection:
db_cursor = db_connection.cursor()
# Construct table name
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
# Determine the correct column to fetch
column_name = "long_name" if type == "long" else "short_name"
nodeinfo_table = f'"{table_name}"'
# Query the database
query = f"SELECT {column_name} FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,))
# Fetch both long and short names in one query
db_cursor.execute(f"SELECT long_name, short_name FROM {nodeinfo_table} WHERE user_id = ?", (user_id,))
result = db_cursor.fetchone()
return result[0] if result else decimal_to_hex(user_id)
if result:
long_name, short_name = result or ("Unknown", "Unknown") # Handle empty result
name_cache[user_id] = {"long": long_name, "short": short_name}
return name_cache[user_id][type]
# If no result, store a fallback value in the cache to avoid future DB queries
name_cache[user_id] = {"long": decimal_to_hex(user_id), "short": decimal_to_hex(user_id)}
return name_cache[user_id][type]
except sqlite3.Error as e:
logging.error(f"SQLite error in get_name_from_database: {e}")
@@ -312,10 +332,12 @@ def get_name_from_database(user_id, type="long"):
except Exception as e:
logging.error(f"Unexpected error in get_name_from_database: {e}")
return "Unknown"
def is_chat_archived(user_id):
"""Check if a chat is archived, returning 0 (False) if not found."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
with get_db_connection() as db_connection:
db_cursor = db_connection.cursor()
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"'
@@ -327,9 +349,8 @@ def is_chat_archived(user_id):
except sqlite3.Error as e:
logging.error(f"SQLite error in is_chat_archived: {e}")
return "Unknown"
return 0
except Exception as e:
logging.error(f"Unexpected error in is_chat_archived: {e}")
return "Unknown"
return 0

View File

@@ -3,7 +3,7 @@ import binascii
import curses
import ipaddress
import re
from contact.ui.colors import get_color
from ui.colors import get_color
def wrap_text(text, wrap_width):
"""Wraps text while preserving spaces and breaking long words."""
@@ -372,11 +372,6 @@ def get_list_input(prompt, current_option, list_options):
list_pad.refresh(0, 0,
list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2, list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4)
max_index = len(list_options) - 1
visible_height = list_win.getmaxyx()[0] - 5
draw_arrows(list_win, visible_height, max_index, 0)
while True:
key = list_win.getch()
@@ -390,12 +385,8 @@ def get_list_input(prompt, current_option, list_options):
selected_index = min(len(list_options) - 1, selected_index + 1)
move_highlight(old_selected_index, selected_index, list_options, list_win, list_pad)
elif key == ord('\n'): # Enter key
list_win.clear()
list_win.refresh()
return list_options[selected_index]
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
list_win.clear()
list_win.refresh()
return current_option
@@ -433,22 +424,5 @@ def move_highlight(old_idx, new_idx, options, list_win, list_pad):
list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + 3 + visible_height,
list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4)
draw_arrows(list_win, visible_height, max_index, scroll_offset)
return scroll_offset # Return updated scroll_offset to be stored externally
def draw_arrows(win, visible_height, max_index, start_index):
if visible_height < max_index:
if start_index > 0:
win.addstr(3, 2, "", get_color("settings_default"))
else:
win.addstr(3, 2, " ", get_color("settings_default"))
if max_index - start_index > visible_height:
win.addstr(visible_height + 3, 2, "", get_color("settings_default"))
else:
win.addstr(visible_height + 3, 2, " ", get_color("settings_default"))
return scroll_offset # Return updated scroll_offset to be stored externally

View File

@@ -1,6 +1,6 @@
import logging
import meshtastic.serial_interface, meshtastic.tcp_interface, meshtastic.ble_interface
import contact.globals as globals
import globals
def initialize_interface(args):
try:

View File

@@ -1,7 +1,7 @@
import contact.globals as globals
import globals
import datetime
from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config
import ui.default_config as config
def get_channels():
"""Retrieve channels from the node and update globals.channel_list and globals.all_messages."""
@@ -47,15 +47,7 @@ def get_node_list():
return node['hopsAway'] if 'hopsAway' in node else 100
else:
return node
sorted_nodes = sorted(globals.interface.nodes.values(), key = node_sort)
# Move favorite nodes to the beginning
sorted_nodes = sorted(sorted_nodes, key = lambda node: node['isFavorite'] if 'isFavorite' in node else False, reverse = True)
# Move ignored nodes to the end
sorted_nodes = sorted(sorted_nodes, key = lambda node: node['isIgnored'] if 'isIgnored' in node else False)
node_list = [node['num'] for node in sorted_nodes if node['num'] != my_node_num]
return [my_node_num] + node_list # Ensuring your node is always first
return []