Compare commits

...

54 Commits

Author SHA1 Message Date
pdxlocations
c100539ff9 Merge remote-tracking branch 'origin/main' into rm-node-from-db 2025-02-03 18:10:05 -08:00
pdxlocations
5e1ede0bea init 2025-02-03 18:01:38 -08:00
pdxlocations
d911176603 Merge pull request #114 from rfschmid/fix-string-comparison-issue
Fix string comparison
2025-02-03 17:32:54 -08:00
Russell Schmidt
586724662d Fix string comparison 2025-02-03 18:55:02 -06:00
pdxlocations
313c13a96a Merge pull request #113 from rfschmid:fix-node-details-after-resize
Fix node details after resize or settings
2025-02-03 15:18:47 -08:00
Russell Schmidt
1dc0fc1f2e Fix node details after resize or settings 2025-02-03 17:14:16 -06:00
pdxlocations
84dd99fc40 Merge pull request #111 from rfschmid/show-different-node-details-for-ourself 2025-02-03 14:08:09 -08:00
pdxlocations
03328e4115 Merge remote-tracking branch 'origin/main' into pr/rfschmid/111 2025-02-03 14:06:20 -08:00
pdxlocations
2d03f2c60c Merge pull request #112 from rfschmid/fix-help-window-drawing-over-outline 2025-02-03 14:03:28 -08:00
pdxlocations
e462530930 Merge pull request #110 from rfschmid/allow-archiving-chats 2025-02-03 11:38:17 -08:00
Russell Schmidt
7560b0805a Fix help/node details drawing over outline
Leave buffer for box
2025-02-03 12:56:34 -06:00
Russell Schmidt
b5a841d7d2 Show different node details for ourself
User is probably more interested in their own device's eg battery level
vs how long ago the node DB thinks we saw ourselves.
2025-02-03 12:48:04 -06:00
Russell Schmidt
fe625efd5f Merge 'upstream/main' into un-archive-channel-on-msg-receive 2025-02-03 07:43:02 -06:00
pdxlocations
25b3fc427b Merge pull request #109 from pdxlocations:test-compatibilty-settings
Test Compatibility Settings
2025-02-02 19:12:56 -08:00
pdxlocations
21e7e01703 init 2025-02-02 18:10:42 -08:00
pdxlocations
07ce9dfbac Refactor Input Handlers (#108)
* bool is just a list

* working changes

* enum is a list too

* spacing
2025-02-02 16:52:16 -08:00
Russell Schmidt
bae197eeca Add new options to help window 2025-02-02 17:02:09 -06:00
Russell Schmidt
d0c67f0864 Fix display glitch
When deleting a channel made the newly selected channel one that had a
notification, we didn't clear the notification symbol
2025-02-02 16:59:44 -06:00
Russell Schmidt
6e96023906 Un-archive channel on message receive 2025-02-02 16:30:27 -06:00
Russell Schmidt
f5b9db6d7a Allow archivinig chats
^d will remove a conversation from the channels list, but preserve it in
the database in case we start a conversation with the same node again.
2025-02-02 16:22:19 -06:00
pdxlocations
40c2ef62b4 reorder configs 2025-02-02 09:47:34 -08:00
Russell Schmidt
d019c6371c Add node sort preferences (#102)
* Add node sort preferences

Now supports 'lastHeard', 'name', and 'hops'. There's probably a way to
make this a multi-select type input instead of requiring the user to
type exactly the right string but it wasn't immediately obvious how to
do that.

* Select node sort from list, refresh on change

---------

Co-authored-by: pdxlocations <benlipsey@gmail.com>
2025-02-02 09:43:04 -08:00
pdxlocations
c62965a94f Revert "Display Connection Status (#104)" (#105)
This reverts commit a4a15e57b4.
2025-02-02 08:51:51 -08:00
pdxlocations
a4a15e57b4 Display Connection Status (#104)
* init

* working changes
2025-02-02 07:24:17 -08:00
pdxlocations
9621bb09b3 add space in node info bar 2025-02-01 22:01:29 -08:00
pdxlocations
3cb265ca13 Database Refactor (#103)
* update sql db if nodedb is different

* db refactor

* don't insert dummy row

* more refactoring

* cleanup
2025-02-01 21:40:25 -08:00
pdxlocations
ba03f7af7e db commit tab 2025-02-01 18:35:03 -08:00
pdxlocations
3d3d628483 Merge pull request #101 from rfschmid:use-nodes-by-num
Use nodesByNum instead of iterating over nodes
2025-02-01 18:01:46 -08:00
Russell Schmidt
466f385c31 Use nodesByNum instead of iterating over nodes
Removes no longer used function that also iterated over nodes
2025-02-01 11:51:04 -06:00
pdxlocations
aa2d3bded4 keep me on top (#99) 2025-01-31 22:14:04 -08:00
pdxlocations
5dea39ae50 refactor special menu items 2025-01-31 22:01:15 -08:00
pdxlocations
0464e44e0d Update README.md 2025-01-31 18:15:55 -08:00
pdxlocations
e273b3325d bump version 2025-01-31 16:54:25 -08:00
pdxlocations
ad7c7a148f Add Config Import and Export (#98)
* init

* working changes

* working changes

* remove unused code
2025-01-31 16:42:55 -08:00
Russell Schmidt
df7d9b0e2e Enable resizing app (#95)
* Enable resizing app

* Crash less with narrow windows

If the window gets too narrow we'll still crash, but this lets us get
narrower than before.

* Fix resize re-drawing

* Fix crash when resizing too fast

* Fix crash after resize with dialog open

* Enable resizing settings
2025-01-31 16:37:12 -08:00
pdxlocations
b9d8c9ad44 Add Lat/Lon/Alt to Position Settings (#96)
* add lat/lon/alt

* fix types and conditions
2025-01-30 22:33:42 -08:00
pdxlocations
e27504f215 fix reseting nested modified_settings 2025-01-30 22:12:33 -08:00
pdxlocations
648993607d fix packet log height 2025-01-30 14:24:13 -08:00
pdxlocations
cf8ee248de adjust packet log message window offset 2025-01-30 13:40:13 -08:00
pdxlocations
03f7fd81a7 Merge pull request #94 from pdxlocations/get-name-from-db
Get long/short name from sql db, not nodedb
2025-01-30 12:53:22 -08:00
pdxlocations
5730beafa9 change get name to sql db from nodedb 2025-01-30 12:51:31 -08:00
pdxlocations
2a6a1ff798 rm globlas that i think we don't need 2025-01-30 12:06:56 -08:00
pdxlocations
09d832a203 colors on traceroute window 2025-01-29 21:25:53 -08:00
pdxlocations
bea051a69f Merge pull request #93 from pdxlocations:psk-from-bytes
Convert bytes PSK to base 64
2025-01-29 21:16:42 -08:00
pdxlocations
aa1b7d43a8 Convert bytes PSK to base 64 2025-01-29 21:12:26 -08:00
pdxlocations
59187a3838 rm depreciated field (#92) 2025-01-29 18:36:00 -08:00
pdxlocations
47d6212b3a Minor Refactor and Dynamic Color List (#91)
* refactor init

* dynamic color list
2025-01-29 17:45:48 -08:00
pdxlocations
f6e7a09c7e refactor select_from_list 2025-01-29 16:28:08 -08:00
pdxlocations
1d9d055a4d import cleanup 2025-01-29 16:16:38 -08:00
pdxlocations
dae8c46b7b allow emojis in node name 2025-01-29 11:58:20 -08:00
pdxlocations
d7a9112918 restore .clear for dictionary 2025-01-29 11:42:42 -08:00
pdxlocations
5084eca388 enforce 4-letter short name 2025-01-29 11:40:23 -08:00
pdxlocations
5e4b28d47a typo 2025-01-29 08:47:34 -08:00
pdxlocations
c8a5ad3a95 Adding some startup logging 2025-01-29 08:45:42 -08:00
18 changed files with 1035 additions and 585 deletions

3
.gitignore vendored
View File

@@ -1,9 +1,10 @@
venv/
.venv/
__pycache__/
node-configs/
client.db
.DS_Store
client.log
settings.log
config.json
default_config.log
default_config.log

View File

@@ -1,5 +1,9 @@
## Contact - A Console UI for Meshtastic
Powered by Meshtastic.org
### (Formerly Curses Client for Meshtastic)
#### Powered by Meshtastic.org
This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores.
<img width="846" alt="Screenshot_2024-03-29_at_4 00 29_PM" src="https://github.com/pdxlocations/meshtastic-curses-client/assets/117498748/e99533b7-5c0c-463d-8d5f-6e3cccaeced7">

View File

@@ -1,11 +1,11 @@
import sqlite3
import time
from datetime import datetime
import logging
from datetime import datetime
import globals
from utilities.utils import decimal_to_hex
import default_config as config
from utilities.utils import get_name_from_number
import globals
def get_table_name(channel):
# Construct the table name
@@ -13,26 +13,22 @@ def get_table_name(channel):
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
return quoted_table_name
def save_message_to_db(channel, user_id, message_text):
"""Save messages to the database, ensuring the table exists."""
try:
quoted_table_name = get_table_name(channel)
schema = '''
user_id TEXT,
message_text TEXT,
timestamp INTEGER,
ack_type TEXT
'''
ensure_table_exists(quoted_table_name, schema)
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
quoted_table_name = get_table_name(channel)
# Ensure the table exists
create_table_query = f'''
CREATE TABLE IF NOT EXISTS {quoted_table_name} (
user_id TEXT,
message_text TEXT,
timestamp INTEGER,
ack_type TEXT
)
'''
db_cursor.execute(create_table_query)
timestamp = int(time.time())
# Insert the message
@@ -47,10 +43,10 @@ def save_message_to_db(channel, user_id, message_text):
except sqlite3.Error as e:
logging.error(f"SQLite error in save_message_to_db: {e}")
except Exception as e:
logging.error(f"Unexpected error in save_message_to_db: {e}")
def update_ack_nak(channel, timestamp, message, ack):
try:
with sqlite3.connect(config.db_file_path) as db_connection:
@@ -73,15 +69,12 @@ def update_ack_nak(channel, timestamp, message, ack):
logging.error(f"Unexpected error in update_ack_nak: {e}")
from datetime import datetime
def load_messages_from_db():
"""Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
# Retrieve all table names that match the pattern
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",))
tables = [row[0] for row in db_cursor.fetchall()]
@@ -104,11 +97,11 @@ def load_messages_from_db():
# Extract the channel name from the table name
channel = table_name.split("_")[1]
# Convert the channel to an integer if it's numeric, otherwise keep it as a string
# Convert the channel to an integer if it's numeric, otherwise keep it as a string (nodenum vs channel name)
channel = int(channel) if channel.isdigit() else channel
# Add the channel to globals.channel_list if not already present
if channel not in globals.channel_list:
if channel not in globals.channel_list and not is_chat_archived(channel):
globals.channel_list.append(channel)
# Ensure the channel exists in globals.all_messages
@@ -133,7 +126,7 @@ def load_messages_from_db():
if user_id == str(globals.myNodeNum):
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
else:
formatted_message = (f"{config.message_prefix} {get_name_from_number(int(user_id), 'short')}: ", message)
formatted_message = (f"{config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ", message)
hourly_messages[hour].append(formatted_message)
@@ -152,136 +145,183 @@ def load_messages_from_db():
def init_nodedb():
"""Initialize the node database and update it with nodes from the interface."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
if not globals.interface.nodes:
return # No nodes to initialize
# Table name construction
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote the table name because it might begin with numerics
ensure_node_table_exists() # Ensure the table exists before insertion
# Create the table if it doesn't exist
create_table_query = f'''
CREATE TABLE IF NOT EXISTS {nodeinfo_table} (
user_id TEXT PRIMARY KEY,
long_name TEXT,
short_name TEXT,
hw_model TEXT,
is_licensed TEXT,
role TEXT,
public_key TEXT
)
'''
db_cursor.execute(create_table_query)
# Insert or update all nodes
for node in globals.interface.nodes.values():
update_node_info_in_db(
user_id=node['num'],
long_name=node['user'].get('longName', ''),
short_name=node['user'].get('shortName', ''),
hw_model=node['user'].get('hwModel', ''),
is_licensed=node['user'].get('isLicensed', '0'),
role=node['user'].get('role', 'CLIENT'),
public_key=node['user'].get('publicKey', '')
)
# Iterate over nodes and insert them into the database
if globals.interface.nodes:
for node in globals.interface.nodes.values():
role = node['user'].get('role', 'CLIENT')
is_licensed = node['user'].get('isLicensed', '0')
public_key = node['user'].get('publicKey', '')
insert_query = f'''
INSERT OR IGNORE INTO {nodeinfo_table} (user_id, long_name, short_name, hw_model, is_licensed, role, public_key)
VALUES (?, ?, ?, ?, ?, ?, ?)
'''
db_cursor.execute(insert_query, (
node['num'],
node['user']['longName'],
node['user']['shortName'],
node['user']['hwModel'],
is_licensed,
role,
public_key
))
db_connection.commit()
logging.info("Node database initialized successfully.")
except sqlite3.Error as e:
logging.error(f"SQLite error in init_nodedb: {e}")
except Exception as e:
logging.error(f"Unexpected error in init_nodedb: {e}")
def maybe_store_nodeinfo_in_db(packet):
"""Save nodeinfo unless that record is already there."""
"""Save nodeinfo unless that record is already there, updating if necessary."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote the table name becuase we might begin with numerics
db_cursor = db_connection.cursor()
# Check if a record with the same user_id already exists
existing_record = db_cursor.execute(f'SELECT * FROM {nodeinfo_table} WHERE user_id=?', (packet['from'],)).fetchone()
if existing_record is None:
role = packet['decoded']['user'].get('role', 'CLIENT')
is_licensed = packet['decoded']['user'].get('isLicensed', '0')
public_key = packet['decoded']['user'].get('publicKey', '')
# No existing record, insert the new record
insert_query = f'''
INSERT INTO {nodeinfo_table} (user_id, long_name, short_name, hw_model, is_licensed, role, public_key)
VALUES (?, ?, ?, ?, ?, ?, ?)
'''
db_cursor.execute(insert_query, (
packet['from'],
packet['decoded']['user']['longName'],
packet['decoded']['user']['shortName'],
packet['decoded']['user']['hwModel'],
is_licensed,
role,
public_key
))
db_connection.commit()
else:
# Check if values are different, update if necessary
# Extract existing values
existing_long_name = existing_record[1]
existing_short_name = existing_record[2]
existing_is_licensed = existing_record[4]
existing_role = existing_record[5]
existing_public_key = existing_record[6]
# Extract new values from the packet
new_long_name = packet['decoded']['user']['longName']
new_short_name = packet['decoded']['user']['shortName']
new_is_licensed = packet['decoded']['user'].get('isLicensed', '0')
new_role = packet['decoded']['user'].get('role', 'CLIENT')
new_public_key = packet['decoded']['user'].get('publicKey', '')
# Check for any differences
if (
existing_long_name != new_long_name or
existing_short_name != new_short_name or
existing_is_licensed != new_is_licensed or
existing_role != new_role or
existing_public_key != new_public_key
):
# Perform necessary updates
update_query = f'''
UPDATE {nodeinfo_table}
SET long_name = ?, short_name = ?, is_licensed = ?, role = ?, public_key = ?
WHERE user_id = ?
'''
db_cursor.execute(update_query, (
new_long_name,
new_short_name,
new_is_licensed,
new_role,
new_public_key,
packet['from']
))
db_connection.commit()
# TODO display new node name in nodelist
user_id = packet['from']
long_name = packet['decoded']['user']['longName']
short_name = packet['decoded']['user']['shortName']
hw_model = packet['decoded']['user']['hwModel']
is_licensed = packet['decoded']['user'].get('isLicensed', '0')
role = packet['decoded']['user'].get('role', 'CLIENT')
public_key = packet['decoded']['user'].get('publicKey', '')
update_node_info_in_db(user_id, long_name, short_name, hw_model, is_licensed, role, public_key)
except sqlite3.Error as e:
logging.error(f"SQLite error in maybe_store_nodeinfo_in_db: {e}")
finally:
db_connection.close()
except Exception as e:
logging.error(f"Unexpected error in maybe_store_nodeinfo_in_db: {e}")
def update_node_info_in_db(user_id, long_name=None, short_name=None, hw_model=None, is_licensed=None, role=None, public_key=None, chat_archived=None):
"""Update or insert node information into the database, preserving unchanged fields."""
try:
ensure_node_table_exists() # Ensure the table exists before any operation
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
table_columns = [i[1] for i in db_cursor.execute(f'PRAGMA table_info({table_name})')]
if "chat_archived" not in table_columns:
update_table_query = f"ALTER TABLE {table_name} ADD COLUMN chat_archived INTEGER"
db_cursor.execute(update_table_query)
# Fetch existing values to preserve unchanged fields
db_cursor.execute(f'SELECT * FROM {table_name} WHERE user_id = ?', (user_id,))
existing_record = db_cursor.fetchone()
if existing_record:
existing_long_name, existing_short_name, existing_hw_model, existing_is_licensed, existing_role, existing_public_key, existing_chat_archived = existing_record[1:]
long_name = long_name if long_name is not None else existing_long_name
short_name = short_name if short_name is not None else existing_short_name
hw_model = hw_model if hw_model is not None else existing_hw_model
is_licensed = is_licensed if is_licensed is not None else existing_is_licensed
role = role if role is not None else existing_role
public_key = public_key if public_key is not None else existing_public_key
chat_archived = chat_archived if chat_archived is not None else existing_chat_archived
# Upsert logic
upsert_query = f'''
INSERT INTO {table_name} (user_id, long_name, short_name, hw_model, is_licensed, role, public_key, chat_archived)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(user_id) DO UPDATE SET
long_name = excluded.long_name,
short_name = excluded.short_name,
hw_model = excluded.hw_model,
is_licensed = excluded.is_licensed,
role = excluded.role,
public_key = excluded.public_key,
chat_archived = excluded.chat_archived
'''
db_cursor.execute(upsert_query, (user_id, long_name, short_name, hw_model, is_licensed, role, public_key, chat_archived))
db_connection.commit()
except sqlite3.Error as e:
logging.error(f"SQLite error in update_node_info_in_db: {e}")
except Exception as e:
logging.error(f"Unexpected error in update_node_info_in_db: {e}")
def ensure_node_table_exists():
"""Ensure the node database table exists."""
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety
schema = '''
user_id TEXT PRIMARY KEY,
long_name TEXT,
short_name TEXT,
hw_model TEXT,
is_licensed TEXT,
role TEXT,
public_key TEXT,
chat_archived INTEGER
'''
ensure_table_exists(table_name, schema)
def ensure_table_exists(table_name, schema):
"""Ensure the given table exists in the database."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ({schema})"
db_cursor.execute(create_table_query)
db_connection.commit()
except sqlite3.Error as e:
logging.error(f"SQLite error in ensure_table_exists({table_name}): {e}")
except Exception as e:
logging.error(f"Unexpected error in ensure_table_exists({table_name}): {e}")
def get_name_from_database(user_id, type="long"):
"""
Retrieve a user's name (long or short) from the node database.
:param user_id: The user ID to look up.
:param type: "long" for long name, "short" for short name.
:return: The retrieved name or the hex of the user id
"""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
# Construct table name
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
# Determine the correct column to fetch
column_name = "long_name" if type == "long" else "short_name"
# Query the database
query = f"SELECT {column_name} FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,))
result = db_cursor.fetchone()
return result[0] if result else decimal_to_hex(user_id)
except sqlite3.Error as e:
logging.error(f"SQLite error in get_name_from_database: {e}")
return "Unknown"
except Exception as e:
logging.error(f"Unexpected error in get_name_from_database: {e}")
return "Unknown"
def is_chat_archived(user_id):
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"'
query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,))
result = db_cursor.fetchone()
return result[0] if result else 0
except sqlite3.Error as e:
logging.error(f"SQLite error in is_chat_archived: {e}")
return "Unknown"
except Exception as e:
logging.error(f"Unexpected error in is_chat_archived: {e}")
return "Unknown"

View File

@@ -1,6 +1,6 @@
import os
import json
import logging
import json
import os
def format_json_single_line_arrays(data, indent=4):
"""
@@ -115,6 +115,7 @@ def initialize_config():
"ack_str": "[✓]",
"nak_str": "[x]",
"ack_unknown_str": "[…]",
"node_sort": "lastHeard",
"theme": "dark",
"COLOR_CONFIG_DARK": COLOR_CONFIG_DARK,
"COLOR_CONFIG_LIGHT": COLOR_CONFIG_LIGHT,
@@ -148,6 +149,7 @@ def assign_config_variables(loaded_config):
global db_file_path, log_file_path, message_prefix, sent_message_prefix
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
global theme, COLOR_CONFIG
global node_sort
db_file_path = loaded_config["db_file_path"]
log_file_path = loaded_config["log_file_path"]
@@ -165,6 +167,7 @@ def assign_config_variables(loaded_config):
COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"]
elif theme == "green":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"]
node_sort = loaded_config["node_sort"]
# Call the function when the script is imported

View File

@@ -2,7 +2,7 @@ interface = None
display_log = False
all_messages = {}
channel_list = []
notifications = set()
notifications = []
packet_buffer = []
node_list = []
myNodeNum = 0

View File

@@ -2,7 +2,7 @@ import curses
import ipaddress
from ui.colors import get_color
def get_user_input(prompt):
def get_text_input(prompt):
# Calculate the dynamic height and width for the input window
height = 7 # Fixed height for input prompt
width = 60
@@ -20,22 +20,31 @@ def get_user_input(prompt):
input_win.addstr(3, 2, "Enter value: ", get_color("settings_default"))
input_win.refresh()
# Check if "shortName" is in the prompt, and set max length accordingly
max_length = 4 if "shortName" in prompt else None
curses.curs_set(1)
user_input = ""
input_position = (3, 15) # Tuple for row and column
row, col = input_position # Unpack tuple
while True:
key = input_win.getch(3, 15 + len(user_input)) # Adjust cursor position dynamically
if key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
key = input_win.get_wch(row, col + len(user_input)) # Adjust cursor position dynamically
if key == chr(27) or key == curses.KEY_LEFT: # ESC or Left Arrow
curses.curs_set(0)
return None # Exit without returning a value
elif key == ord('\n'): # Enter key
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
break
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace
elif key in (curses.KEY_BACKSPACE, chr(127)): # Backspace
user_input = user_input[:-1]
input_win.addstr(3, 15, " " * (len(user_input) + 1), get_color("settings_default")) # Clear the line
input_win.addstr(3, 15, user_input, get_color("settings_default"))
else:
user_input += chr(key)
input_win.addstr(row, col, " " * (len(user_input) + 1), get_color("settings_default")) # Clear the line
input_win.addstr(row, col, user_input, get_color("settings_default"))
elif max_length is None or len(user_input) < max_length: # Enforce max length if applicable
# Append typed character to input text
if(isinstance(key, str)):
user_input += key
else:
user_input += chr(key)
input_win.addstr(3, 15, user_input, get_color("settings_default"))
curses.curs_set(0)
@@ -45,51 +54,6 @@ def get_user_input(prompt):
input_win.refresh()
return user_input
def get_bool_selection(message, current_value):
message = "Select True or False:" if None else message
cvalue = current_value
options = ["True", "False"]
selected_index = 0 if current_value == "True" else 1
height = 7
width = 60
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
bool_win = curses.newwin(height, width, start_y, start_x)
bool_win.bkgd(get_color("background"))
bool_win.attrset(get_color("window_frame"))
bool_win.keypad(True)
bool_win.erase()
bool_win.border()
bool_win.addstr(1, 2, message, get_color("settings_default", bold=True))
for idx, option in enumerate(options):
if idx == selected_index:
bool_win.addstr(idx + 3, 4, option, get_color("settings_default", reverse=True))
else:
bool_win.addstr(idx + 3, 4, option, get_color("settings_default"))
bool_win.refresh()
while True:
key = bool_win.getch()
if key == curses.KEY_UP:
if(selected_index > 0):
selected_index = selected_index - 1
bool_win.chgat(1 + 3, 4, len(options[1]), get_color("settings_default"))
bool_win.chgat(0 + 3, 4, len(options[0]), get_color("settings_default", reverse = True))
elif key == curses.KEY_DOWN:
if(selected_index < len(options) - 1):
selected_index = selected_index + 1
bool_win.chgat(0 + 3, 4, len(options[0]), get_color("settings_default"))
bool_win.chgat(1 + 3, 4, len(options[1]), get_color("settings_default", reverse = True))
elif key == ord('\n'): # Enter key
return options[selected_index]
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
return cvalue
def get_repeated_input(current_value):
cvalue = current_value
@@ -133,69 +97,6 @@ def get_repeated_input(current_value):
except ValueError:
pass # Ignore invalid character inputs
def move_highlight(old_idx, new_idx, options, enum_win, enum_pad):
if old_idx == new_idx:
return # no-op
enum_pad.chgat(old_idx, 0, enum_pad.getmaxyx()[1], get_color("settings_default"))
enum_pad.chgat(new_idx, 0, enum_pad.getmaxyx()[1], get_color("settings_default", reverse = True))
enum_win.refresh()
start_index = max(0, new_idx - (enum_win.getmaxyx()[0] - 4))
enum_win.refresh()
enum_pad.refresh(start_index, 0,
enum_win.getbegyx()[0] + 2, enum_win.getbegyx()[1] + 4,
enum_win.getbegyx()[0] + enum_win.getmaxyx()[0] - 2, enum_win.getbegyx()[1] + 4 + enum_win.getmaxyx()[1] - 4)
def get_enum_input(options, current_value):
selected_index = options.index(current_value) if current_value in options else 0
height = min(len(options) + 4, curses.LINES - 2)
width = 60
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
enum_win = curses.newwin(height, width, start_y, start_x)
enum_win.bkgd(get_color("background"))
enum_win.attrset(get_color("window_frame"))
enum_win.keypad(True)
enum_pad = curses.newpad(len(options) + 1, width - 8)
enum_pad.bkgd(get_color("background"))
enum_win.erase()
enum_win.border()
enum_win.addstr(1, 2, "Select an option:", get_color("settings_default", bold=True))
for idx, option in enumerate(options):
if idx == selected_index:
enum_pad.addstr(idx, 0, option.ljust(width - 8), get_color("settings_default", reverse=True))
else:
enum_pad.addstr(idx, 0, option.ljust(width - 8), get_color("settings_default"))
enum_win.refresh()
enum_pad.refresh(0, 0,
enum_win.getbegyx()[0] + 2, enum_win.getbegyx()[1] + 4,
enum_win.getbegyx()[0] + enum_win.getmaxyx()[0] - 2, enum_win.getbegyx()[1] + enum_win.getmaxyx()[1] - 4)
while True:
key = enum_win.getch()
if key == curses.KEY_UP:
old_selected_index = selected_index
selected_index = max(0, selected_index - 1)
move_highlight(old_selected_index, selected_index, options, enum_win, enum_pad)
elif key == curses.KEY_DOWN:
old_selected_index = selected_index
selected_index = min(len(options) - 1, selected_index + 1)
move_highlight(old_selected_index, selected_index, options, enum_win, enum_pad)
elif key == ord('\n'): # Enter key
return options[selected_index]
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
return current_value
def get_fixed32_input(current_value):
cvalue = current_value
@@ -249,4 +150,76 @@ def get_fixed32_input(current_value):
if char.isdigit() or char == ".":
user_input += char # Append only valid characters (digits or dots)
except ValueError:
pass # Ignore invalid inputs
pass # Ignore invalid inputs
def get_list_input(prompt, current_option, list_options):
"""
Displays a scrollable list of list_options for the user to choose from.
"""
selected_index = list_options.index(current_option) if current_option in list_options else 0
height = min(len(list_options) + 5, curses.LINES - 2)
width = 60
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
list_win = curses.newwin(height, width, start_y, start_x)
list_win.bkgd(get_color("background"))
list_win.attrset(get_color("window_frame"))
list_win.keypad(True)
list_pad = curses.newpad(len(list_options) + 1, width - 8)
list_pad.bkgd(get_color("background"))
# Render header
list_win.erase()
list_win.border()
list_win.addstr(1, 2, prompt, get_color("settings_default", bold=True))
# Render options on the pad
for idx, color in enumerate(list_options):
if idx == selected_index:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
else:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
# Initial refresh
list_win.refresh()
list_pad.refresh(0, 0,
list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2, list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4)
while True:
key = list_win.getch()
if key == curses.KEY_UP:
old_selected_index = selected_index
selected_index = max(0, selected_index - 1)
move_highlight(old_selected_index, selected_index, list_options, list_win, list_pad)
elif key == curses.KEY_DOWN:
old_selected_index = selected_index
selected_index = min(len(list_options) - 1, selected_index + 1)
move_highlight(old_selected_index, selected_index, list_options, list_win, list_pad)
elif key == ord('\n'): # Enter key
return list_options[selected_index]
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
return current_option
def move_highlight(old_idx, new_idx, options, enum_win, enum_pad):
if old_idx == new_idx:
return # no-op
enum_pad.chgat(old_idx, 0, enum_pad.getmaxyx()[1], get_color("settings_default"))
enum_pad.chgat(new_idx, 0, enum_pad.getmaxyx()[1], get_color("settings_default", reverse = True))
enum_win.refresh()
start_index = max(0, new_idx - (enum_win.getmaxyx()[0] - 4))
enum_win.refresh()
enum_pad.refresh(start_index, 0,
enum_win.getbegyx()[0] + 3, enum_win.getbegyx()[1] + 4,
enum_win.getbegyx()[0] + enum_win.getmaxyx()[0] - 2, enum_win.getbegyx()[1] + 4 + enum_win.getmaxyx()[1] - 4)

16
main.py
View File

@@ -3,7 +3,7 @@
'''
Contact - A Console UI for Meshtastic by http://github.com/pdxlocations
Powered by Meshtastic.org
V 1.1.3
V 1.2.0
'''
import curses
@@ -18,13 +18,17 @@ from message_handlers.rx_handler import on_receive
from ui.curses_ui import main_ui, draw_splash
from utilities.utils import get_channels, get_node_list, get_nodeNum
from db_handler import init_nodedb, load_messages_from_db
import globals
import default_config as config
import globals
# Set environment variables for ncurses compatibility
import os
# Set ncurses compatibility settings
os.environ["NCURSES_NO_UTF8_ACS"] = "1"
os.environ["TERM"] = "screen"
os.environ["LANG"] = "C.UTF-8"
os.environ.setdefault("TERM", "xterm-256color")
if os.environ.get("COLORTERM") == "gnome-terminal":
os.environ["TERM"] = "xterm-256color"
# Configure logging
# Run `tail -f client.log` in another terminal to view live
@@ -39,13 +43,17 @@ def main(stdscr):
draw_splash(stdscr)
parser = setup_parser()
args = parser.parse_args()
logging.info("Initializing interface %s", args)
globals.interface = initialize_interface(args)
logging.info("Interface initialized")
globals.myNodeNum = get_nodeNum()
globals.channel_list = get_channels()
globals.node_list = get_node_list()
pub.subscribe(on_receive, 'meshtastic.receive')
init_nodedb()
load_messages_from_db()
logging.info("Starting main UI")
main_ui(stdscr)
except Exception as e:
logging.error("An error occurred: %s", e)

View File

@@ -1,17 +1,16 @@
import logging
import time
from meshtastic import BROADCAST_NUM
from utilities.utils import get_node_list, decimal_to_hex, get_name_from_number
import globals
from utilities.utils import refresh_node_list
from datetime import datetime
from ui.curses_ui import draw_packetlog_win, draw_node_list, draw_messages_window, draw_channel_list, add_notification
from db_handler import save_message_to_db, maybe_store_nodeinfo_in_db
from db_handler import save_message_to_db, maybe_store_nodeinfo_in_db, get_name_from_database, update_node_info_in_db
import default_config as config
import globals
from datetime import datetime
def on_receive(packet, interface):
global nodes_win
# Update packet log
globals.packet_buffer.append(packet)
@@ -26,9 +25,8 @@ def on_receive(packet, interface):
return
# Assume any incoming packet could update the last seen time for a node
new_node_list = get_node_list()
if new_node_list != globals.node_list:
globals.node_list = new_node_list
changed = refresh_node_list()
if(changed):
draw_node_list()
if packet['decoded']['portnum'] == 'NODEINFO_APP':
@@ -52,7 +50,9 @@ def on_receive(packet, interface):
pass
else:
globals.channel_list.append(packet['from'])
globals.all_messages[packet['from']] = []
if(packet['from'] not in globals.all_messages):
globals.all_messages[packet['from']] = []
update_node_info_in_db(packet['from'], chat_archived=False)
refresh_channels = True
channel_number = globals.channel_list.index(packet['from'])
@@ -65,7 +65,7 @@ def on_receive(packet, interface):
# Add received message to the messages list
message_from_id = packet['from']
message_from_string = get_name_from_number(message_from_id, type='short') + ":"
message_from_string = get_name_from_database(message_from_id, type='short') + ":"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
@@ -101,4 +101,4 @@ def on_receive(packet, interface):
save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")
logging.error(f"Error processing packet: {e}")

View File

@@ -1,11 +1,11 @@
from datetime import datetime
from meshtastic import BROADCAST_NUM
from db_handler import save_message_to_db, update_ack_nak
from meshtastic.protobuf import mesh_pb2, portnums_pb2
from utilities.utils import get_name_from_number
import globals
import google.protobuf.json_format
from meshtastic import BROADCAST_NUM
from meshtastic.protobuf import mesh_pb2, portnums_pb2
from db_handler import save_message_to_db, update_ack_nak, get_name_from_database
import default_config as config
import globals
ack_naks = {}
@@ -56,18 +56,18 @@ def on_response_traceroute(packet):
msg_str = "Traceroute to:\n"
route_str = get_name_from_number(packet["to"], 'short') or f"{packet['to']:08x}" # Start with destination of response
route_str = get_name_from_database(packet["to"], 'short') or f"{packet['to']:08x}" # Start with destination of response
# SNR list should have one more entry than the route, as the final destination adds its SNR also
lenTowards = 0 if "route" not in msg_dict else len(msg_dict["route"])
snrTowardsValid = "snrTowards" in msg_dict and len(msg_dict["snrTowards"]) == lenTowards + 1
if lenTowards > 0: # Loop through hops in route and add SNR if available
for idx, node_num in enumerate(msg_dict["route"]):
route_str += " --> " + (get_name_from_number(node_num, 'short') or f"{node_num:08x}") \
route_str += " --> " + (get_name_from_database(node_num, 'short') or f"{node_num:08x}") \
+ " (" + (str(msg_dict["snrTowards"][idx] / 4) if snrTowardsValid and msg_dict["snrTowards"][idx] != UNK_SNR else "?") + "dB)"
# End with origin of response
route_str += " --> " + (get_name_from_number(packet["from"], 'short') or f"{packet['from']:08x}") \
route_str += " --> " + (get_name_from_database(packet["from"], 'short') or f"{packet['from']:08x}") \
+ " (" + (str(msg_dict["snrTowards"][-1] / 4) if snrTowardsValid and msg_dict["snrTowards"][-1] != UNK_SNR else "?") + "dB)"
msg_str += route_str + "\n" # Print the route towards destination
@@ -77,15 +77,15 @@ def on_response_traceroute(packet):
backValid = "hopStart" in packet and "snrBack" in msg_dict and len(msg_dict["snrBack"]) == lenBack + 1
if backValid:
msg_str += "Back:\n"
route_str = get_name_from_number(packet["from"], 'short') or f"{packet['from']:08x}" # Start with origin of response
route_str = get_name_from_database(packet["from"], 'short') or f"{packet['from']:08x}" # Start with origin of response
if lenBack > 0: # Loop through hops in routeBack and add SNR if available
for idx, node_num in enumerate(msg_dict["routeBack"]):
route_str += " --> " + (get_name_from_number(node_num, 'short') or f"{node_num:08x}") \
route_str += " --> " + (get_name_from_database(node_num, 'short') or f"{node_num:08x}") \
+ " (" + (str(msg_dict["snrBack"][idx] / 4) if msg_dict["snrBack"][idx] != UNK_SNR else "?") + "dB)"
# End with destination of response (us)
route_str += " --> " + (get_name_from_number(packet["to"], 'short') or f"{packet['to']:08x}") \
route_str += " --> " + (get_name_from_database(packet["to"], 'short') or f"{packet['to']:08x}") \
+ " (" + (str(msg_dict["snrBack"][-1] / 4) if msg_dict["snrBack"][-1] != UNK_SNR else "?") + "dB)"
msg_str += route_str + "\n" # Print the route back to us
@@ -102,7 +102,7 @@ def on_response_traceroute(packet):
add_notification(channel_number)
refresh_channels = True
message_from_string = get_name_from_number(packet['from'], type='short') + ":\n"
message_from_string = get_name_from_database(packet['from'], type='short') + ":\n"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []

View File

@@ -2,24 +2,8 @@ from meshtastic.protobuf import channel_pb2
from google.protobuf.message import Message
import logging
import base64
def settings_reboot(interface):
interface.localNode.reboot()
def settings_reset_nodedb(interface):
interface.localNode.resetNodeDb()
def settings_shutdown(interface):
interface.localNode.shutdown()
def settings_factory_reset(interface):
interface.localNode.factoryReset()
def settings_set_owner(interface, long_name=None, short_name=None, is_licensed=False):
if isinstance(is_licensed, str):
is_licensed = is_licensed.lower() == 'true'
interface.localNode.setOwner(long_name, short_name, is_licensed)
from db_handler import update_node_info_in_db
import globals
def save_changes(interface, menu_path, modified_settings):
"""
@@ -38,14 +22,29 @@ def save_changes(interface, menu_path, modified_settings):
if menu_path[1] == "Radio Settings" or menu_path[1] == "Module Settings":
config_category = menu_path[2].lower() # for radio and module configs
elif menu_path[1] == "User Settings": # for user configs
config_category = "User Settings"
if {'latitude', 'longitude', 'altitude'} & modified_settings.keys():
lat = float(modified_settings.get('latitude', 0.0))
lon = float(modified_settings.get('longitude', 0.0))
alt = int(modified_settings.get('altitude', 0))
interface.localNode.setFixedPosition(lat, lon, alt)
logging.info(f"Updated {config_category} with Latitude: {lat} and Longitude {lon} and Altitude {alt}")
return
elif menu_path[1] == "User Settings": # for user configs
config_category = "User Settings"
long_name = modified_settings.get("longName")
short_name = modified_settings.get("shortName")
is_licensed = modified_settings.get("isLicensed")
is_licensed = is_licensed == "True" or is_licensed is True
is_licensed = is_licensed == "True" or is_licensed is True # Normalize boolean
node.setOwner(long_name, short_name, is_licensed)
logging.info(f"Updated {config_category} with Long Name: {long_name} and Short Name {short_name} and Licensed Mode {is_licensed}")
# Update only the changed fields and preserve others
update_node_info_in_db(globals.myNodeNum, long_name=long_name, short_name=short_name, is_licensed=is_licensed)
logging.info(f"Updated {config_category} with Long Name: {long_name}, Short Name: {short_name}, Licensed Mode: {is_licensed}")
return
elif menu_path[1] == "Channels": # for channel configs

View File

@@ -1,9 +1,11 @@
import curses
import logging
import os
from save_to_radio import settings_factory_reset, settings_reboot, settings_reset_nodedb, settings_shutdown, save_changes
from save_to_radio import save_changes
from utilities.config_io import config_export, config_import
from input_handlers import get_repeated_input, get_text_input, get_fixed32_input, get_list_input
from ui.menus import generate_menu_from_protobuf
from input_handlers import get_bool_selection, get_repeated_input, get_user_input, get_enum_input, get_fixed32_input
from ui.colors import setup_colors, get_color
from utilities.arg_parser import setup_parser
from utilities.interfaces import initialize_interface
@@ -15,7 +17,6 @@ save_option = "Save Changes"
sensitive_settings = ["Reboot", "Reset Node DB", "Shutdown", "Factory Reset"]
def display_menu(current_menu, menu_path, selected_index, show_save_option):
global menu_win, menu_pad
# Calculate the dynamic height based on the number of menu items
num_items = len(current_menu) + (1 if show_save_option else 0) # Add 1 for the "Save Changes" option if applicable
@@ -63,8 +64,11 @@ def display_menu(current_menu, menu_path, selected_index, show_save_option):
menu_pad.refresh(0, 0,
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + 3 + menu_win.getmaxyx()[0] - 5 - (2 if show_save_option else 0), menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 8)
return menu_win, menu_pad
def move_highlight(old_idx, new_idx, options, show_save_option, menu_win):
def move_highlight(old_idx, new_idx, options, show_save_option, menu_win, menu_pad):
if(old_idx == new_idx): # no-op
return
@@ -89,6 +93,7 @@ def move_highlight(old_idx, new_idx, options, show_save_option, menu_win):
menu_win.getbegyx()[0] + 3 + menu_win.getmaxyx()[0] - 5 - (2 if show_save_option else 0), menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 8)
def settings_menu(stdscr, interface):
curses.update_lines_cols()
menu = generate_menu_from_protobuf(interface)
current_menu = menu["Main Menu"]
@@ -113,7 +118,7 @@ def settings_menu(stdscr, interface):
)
# Display the menu
display_menu(current_menu, menu_path, selected_index, show_save_option)
menu_win, menu_pad = display_menu(current_menu, menu_path, selected_index, show_save_option)
need_redraw = False
@@ -125,17 +130,21 @@ def settings_menu(stdscr, interface):
if key == curses.KEY_UP:
old_selected_index = selected_index
selected_index = max_index if selected_index == 0 else selected_index - 1
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win)
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win, menu_pad)
elif key == curses.KEY_DOWN:
old_selected_index = selected_index
selected_index = 0 if selected_index == max_index else selected_index + 1
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win)
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win, menu_pad)
elif key == curses.KEY_RESIZE:
need_redraw = True
curses.update_lines_cols()
elif key == ord("\t") and show_save_option:
old_selected_index = selected_index
selected_index = max_index
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win)
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win, menu_pad)
elif key == curses.KEY_RIGHT or key == ord('\n'):
need_redraw = True
@@ -143,7 +152,7 @@ def settings_menu(stdscr, interface):
menu_win.refresh()
if show_save_option and selected_index == len(options):
save_changes(interface, menu_path, modified_settings)
modified_settings.erase()
modified_settings.clear()
logging.info("Changes Saved")
if len(menu_path) > 1:
@@ -159,31 +168,81 @@ def settings_menu(stdscr, interface):
if selected_option == "Exit":
break
elif selected_option == "Export Config":
filename = get_text_input("Enter a filename for the config file")
if not filename:
logging.warning("Export aborted: No filename provided.")
continue # Go back to the menu
if not filename.lower().endswith(".yaml"):
filename += ".yaml"
try:
config_text = config_export(globals.interface)
app_directory = os.path.dirname(os.path.abspath(__file__))
config_folder = "node-configs"
yaml_file_path = os.path.join(app_directory, config_folder, filename)
if os.path.exists(yaml_file_path):
overwrite = get_list_input(f"{filename} already exists. Overwrite?", None, ["Yes", "No"])
if overwrite == "Yes":
logging.info("Export cancelled: User chose not to overwrite.")
continue # Return to menu
os.makedirs(os.path.dirname(yaml_file_path), exist_ok=True)
with open(yaml_file_path, "w", encoding="utf-8") as file:
file.write(config_text)
logging.info(f"Config file saved to {yaml_file_path}")
break
except PermissionError:
logging.error(f"Permission denied: Unable to write to {yaml_file_path}")
except OSError as e:
logging.error(f"OS error while saving config: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
continue
elif selected_option == "Load Config":
app_directory = os.path.dirname(os.path.abspath(__file__))
config_folder = "node-configs"
folder_path = os.path.join(app_directory, config_folder)
file_list = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]
filename = get_list_input("Choose a config file", None, file_list)
if filename:
file_path = os.path.join(app_directory, config_folder, filename)
overwrite = get_list_input(f"Are you sure you want to load {filename}?", None, ["Yes", "No"])
if overwrite == "Yes":
config_import(globals.interface, file_path)
break
continue
elif selected_option == "Reboot":
confirmation = get_bool_selection("Are you sure you want to Reboot?", 0)
if confirmation == "True":
settings_reboot(interface)
confirmation = get_list_input("Are you sure you want to Reboot?", None, ["Yes", "No"])
if confirmation == "Yes":
interface.localNode.reboot()
logging.info(f"Node Reboot Requested by menu")
break
continue
elif selected_option == "Reset Node DB":
confirmation = get_bool_selection("Are you sure you want to Reset Node DB?", 0)
if confirmation == "True":
settings_reset_nodedb(interface)
confirmation = get_list_input("Are you sure you want to Reset Node DB?", None, ["Yes", "No"])
if confirmation == "Yes":
interface.localNode.resetNodeDb()
logging.info(f"Node DB Reset Requested by menu")
break
continue
elif selected_option == "Shutdown":
confirmation = get_bool_selection("Are you sure you want to Shutdown?", 0)
if confirmation == "True":
settings_shutdown(interface)
confirmation = get_list_input("Are you sure you want to Shutdown?", None, ["Yes", "No"])
if confirmation == "Yes":
interface.localNode.shutdown()
logging.info(f"Node Shutdown Requested by menu")
break
continue
elif selected_option == "Factory Reset":
confirmation = get_bool_selection("Are you sure you want to Factory Reset?", 0)
if confirmation == "True":
settings_factory_reset(interface)
confirmation = get_list_input("Are you sure you want to Factory Reset?", None, ["Yes", "No"])
if confirmation == "Yes":
interface.localNode.factoryReset()
logging.info(f"Factory Reset Requested by menu")
break
continue
@@ -194,27 +253,35 @@ def settings_menu(stdscr, interface):
continue
# need_redraw = True
field_info = current_menu.get(selected_option)
if isinstance(field_info, tuple):
field, current_value = field_info
if selected_option in ['longName', 'shortName', 'isLicensed']:
if selected_option in ['longName', 'shortName']:
new_value = get_user_input(f"Current value for {selected_option}: {current_value}")
new_value = get_text_input(f"Current value for {selected_option}: {current_value}")
new_value = current_value if new_value is None else new_value
current_menu[selected_option] = (field, new_value)
elif selected_option == 'isLicensed':
new_value = get_bool_selection(f"Current value for {selected_option}: {current_value}", str(current_value))
new_value = get_list_input(f"Current value for {selected_option}: {current_value}", str(current_value), ["True", "False"])
new_value = new_value == "True"
current_menu[selected_option] = (field, new_value)
for option, (field, value) in current_menu.items():
modified_settings[option] = value
elif selected_option in ['latitude', 'longitude', 'altitude']:
new_value = get_text_input(f"Current value for {selected_option}: {current_value}")
new_value = current_value if new_value is None else new_value
current_menu[selected_option] = (field, new_value)
for option in ['latitude', 'longitude', 'altitude']:
if option in current_menu:
modified_settings[option] = current_menu[option][1]
elif field.type == 8: # Handle boolean type
new_value = get_bool_selection(selected_option, str(current_value))
new_value = get_list_input(selected_option, str(current_value), ["True", "False"])
new_value = new_value == "True" or new_value is True
elif field.label == field.LABEL_REPEATED: # Handle repeated field
@@ -223,22 +290,22 @@ def settings_menu(stdscr, interface):
elif field.enum_type: # Enum field
enum_options = {v.name: v.number for v in field.enum_type.values}
new_value_name = get_enum_input(list(enum_options.keys()), current_value)
new_value_name = get_list_input(selected_option, current_value, list(enum_options.keys()))
new_value = enum_options.get(new_value_name, current_value)
elif field.type == 7: # Field type 7 corresponds to FIXED32
new_value = get_fixed32_input(current_value)
elif field.type == 13: # Field type 13 corresponds to UINT32
new_value = get_user_input(f"Current value for {selected_option}: {current_value}")
new_value = get_text_input(f"Current value for {selected_option}: {current_value}")
new_value = current_value if new_value is None else int(new_value)
elif field.type == 2: # Field type 13 corresponds to INT64
new_value = get_user_input(f"Current value for {selected_option}: {current_value}")
new_value = get_text_input(f"Current value for {selected_option}: {current_value}")
new_value = current_value if new_value is None else float(new_value)
else: # Handle other field types
new_value = get_user_input(f"Current value for {selected_option}: {current_value}")
new_value = get_text_input(f"Current value for {selected_option}: {current_value}")
new_value = current_value if new_value is None else new_value
for key in menu_path[3:]: # Skip "Main Menu"
@@ -265,7 +332,8 @@ def settings_menu(stdscr, interface):
menu_win.erase()
menu_win.refresh()
modified_settings.clear()
if len(menu_path) < 2:
modified_settings.clear()
# Navigate back to the previous menu
if len(menu_path) > 1:

View File

@@ -1,59 +1,79 @@
import curses
import textwrap
import globals
from utilities.utils import get_name_from_number, get_channels, get_time_ago
import time
from utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
from settings import settings_menu
from input_handlers import get_list_input
from message_handlers.tx_handler import send_message, send_traceroute
import ui.dialog
from ui.colors import setup_colors, get_color
from db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived
import default_config as config
def refresh_all():
for i, box in enumerate([channel_box, messages_box, nodes_box]):
box.attrset(get_color("window_frame_selected") if globals.current_window == i else get_color("window_frame"))
box.box()
box.refresh()
refresh_pad(i)
import ui.dialog
import globals
def draw_node_details():
nodes_snapshot = list(globals.interface.nodes.values())
node = None
for node in nodes_snapshot:
if globals.node_list[globals.selected_node] == node['num']:
break
try:
node = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
except KeyError:
return
function_win.erase()
function_win.box()
nodestr = ""
width = function_win.getmaxyx()[1]
node_details_list = [f"{node['user']['longName']}"
node_details_list = [f"{node['user']['longName']} "
if 'user' in node and 'longName' in node['user'] else "",
f"({node['user']['shortName']})"
if 'user' in node and 'shortName' in node['user'] else "",
f" | {node['user']['hwModel']}"
if 'user' in node and 'hwModel' in node['user'] else "",
f" | {get_time_ago(node['lastHeard'])}" if ('lastHeard' in node and node['lastHeard']) else "",
f" | Hops: {node['hopsAway']}" if 'hopsAway' in node else "",
f" | SNR: {node['snr']}dB"
if ('snr' in node and 'hopsAway' in node and node['hopsAway'] == 0)
else "",
]
if 'user' in node and 'hwModel' in node['user'] else ""]
if globals.node_list[globals.selected_node] == globals.myNodeNum:
node_details_list.extend([f" | Bat: {node['deviceMetrics']['batteryLevel']}% ({node['deviceMetrics']['voltage']}v)"
if 'deviceMetrics' in node
and 'batteryLevel' in node['deviceMetrics']
and 'voltage' in node['deviceMetrics'] else "",
f" | Up: {get_readable_duration(node['deviceMetrics']['uptimeSeconds'])}" if 'deviceMetrics' in node
and 'uptimeSeconds' in node['deviceMetrics'] else "",
f" | ChUtil: {node['deviceMetrics']['channelUtilization']:.2f}%" if 'deviceMetrics' in node
and 'channelUtilization' in node['deviceMetrics'] else "",
f" | AirUtilTX: {node['deviceMetrics']['airUtilTx']:.2f}%" if 'deviceMetrics' in node
and 'airUtilTx' in node['deviceMetrics'] else "",
])
else:
node_details_list.extend([f" | {get_time_ago(node['lastHeard'])}" if ('lastHeard' in node and node['lastHeard']) else "",
f" | Hops: {node['hopsAway']}" if 'hopsAway' in node else "",
f" | SNR: {node['snr']}dB"
if ('snr' in node and 'hopsAway' in node and node['hopsAway'] == 0)
else "",
])
for s in node_details_list:
if len(nodestr) + len(s) < width:
if len(nodestr) + len(s) < width - 2:
nodestr = nodestr + s
draw_centered_text_field(function_win, nodestr, 0, get_color("commands"))
def draw_help():
cmds = ["↑→↓← = Select", " ENTER = Send", " ` = Settings", " ^P = Packet Log", " ESC = Quit", " ^t = Traceroute", " ^d = Archive Chat"]
function_str = ""
for s in cmds:
if(len(function_str) + len(s) < function_win.getmaxyx()[1] - 2):
function_str += s
draw_centered_text_field(function_win, function_str, 0, get_color("commands"))
def draw_function_win():
draw_centered_text_field(function_win,
f"↑→↓← = Select ENTER = Send ` = Settings ^P = Packet Log ESC = Quit",
0, get_color("commands"))
if(globals.current_window == 2):
draw_node_details()
else:
draw_help()
def get_msg_window_lines():
packetlog_height = packetlog_win.getmaxyx()[0] if globals.display_log else 0
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if globals.display_log else 0
return messages_box.getmaxyx()[0] - 2 - packetlog_height
def refresh_pad(window):
@@ -96,16 +116,16 @@ def highlight_line(highlight, window, line):
if(window == 2):
pad = nodes_pad
select_len = len(get_name_from_number(globals.node_list[line], "long"))
select_len = len(get_name_from_database(globals.node_list[line], "long"))
pad.chgat(line, 1, select_len, nd_color | curses.A_REVERSE if highlight else nd_color)
if(window == 0):
channel = list(globals.all_messages.keys())[line]
channel = globals.channel_list[line]
win_width = channel_box.getmaxyx()[1]
if(isinstance(channel, int)):
channel = get_name_from_number(channel, type="long")
channel = get_name_from_database(channel, type="long")
select_len = min(len(channel), win_width - 4)
if line == globals.selected_channel and highlight == False:
@@ -114,10 +134,12 @@ def highlight_line(highlight, window, line):
pad.chgat(line, 1, select_len, ch_color | curses.A_REVERSE if highlight else ch_color)
def add_notification(channel_number):
globals.notifications.add(channel_number)
if channel_number not in globals.notifications:
globals.notifications.append(channel_number)
def remove_notification(channel_number):
globals.notifications.discard(channel_number)
if channel_number in globals.notifications:
globals.notifications.remove(channel_number)
def draw_text_field(win, text, color):
win.border()
@@ -168,24 +190,28 @@ def draw_channel_list():
channel_pad.resize(len(globals.all_messages), channel_box.getmaxyx()[1])
for i, channel in enumerate(list(globals.all_messages.keys())):
idx = 0
for channel in globals.channel_list:
# Convert node number to long name if it's an integer
if isinstance(channel, int):
channel = get_name_from_number(channel, type='long')
if is_chat_archived(channel):
continue
channel = get_name_from_database(channel, type='long')
# Determine whether to add the notification
notification = " " + config.notification_symbol if i in globals.notifications else ""
notification = " " + config.notification_symbol if idx in globals.notifications else ""
# Truncate the channel name if it's too long to fit in the window
truncated_channel = channel[:win_width - 5] + '-' if len(channel) > win_width - 5 else channel
if i == globals.selected_channel:
if idx == globals.selected_channel:
if globals.current_window == 0:
channel_pad.addstr(i, 1, truncated_channel + notification, get_color("channel_list", reverse=True))
channel_pad.addstr(idx, 1, truncated_channel + notification, get_color("channel_list", reverse=True))
remove_notification(globals.selected_channel)
else:
channel_pad.addstr(i, 1, truncated_channel + notification, get_color("channel_selected"))
channel_pad.addstr(idx, 1, truncated_channel + notification, get_color("channel_selected"))
else:
channel_pad.addstr(i, 1, truncated_channel + notification, get_color("channel_list"))
channel_pad.addstr(idx, 1, truncated_channel + notification, get_color("channel_list"))
idx += 1
channel_box.attrset(get_color("window_frame_selected") if globals.current_window == 0 else get_color("window_frame"))
channel_box.box()
@@ -239,16 +265,13 @@ def draw_messages_window(scroll_to_bottom = False):
def draw_node_list():
nodes_pad.erase()
win_height = nodes_box.getmaxyx()[0]
start_index = max(0, globals.selected_node - (win_height - 3)) # Calculate starting index based on selected node and window height
nodes_pad.resize(len(globals.node_list), nodes_box.getmaxyx()[1])
for i, node in enumerate(globals.node_list):
if globals.selected_node == i and globals.current_window == 2:
nodes_pad.addstr(i, 1, get_name_from_number(node, "long"), get_color("node_list", reverse=True))
nodes_pad.addstr(i, 1, get_name_from_database(node, "long"), get_color("node_list", reverse=True))
else:
nodes_pad.addstr(i, 1, get_name_from_number(node, "long"), get_color("node_list"))
nodes_pad.addstr(i, 1, get_name_from_database(node, "long"), get_color("node_list"))
nodes_box.attrset(get_color("window_frame_selected") if globals.current_window == 2 else get_color("window_frame"))
nodes_box.box()
@@ -297,7 +320,7 @@ def select_node(idx):
highlight_line(True, 2, globals.selected_node)
refresh_pad(2)
draw_node_details()
draw_function_win()
def scroll_nodes(direction):
new_selected_node = globals.selected_node + direction
@@ -330,10 +353,10 @@ def draw_packetlog_win():
break
# Format each field
from_id = get_name_from_number(packet['from'], 'short').ljust(columns[0])
from_id = get_name_from_database(packet['from'], 'short').ljust(columns[0])
to_id = (
"BROADCAST".ljust(columns[1]) if str(packet['to']) == "4294967295"
else get_name_from_number(packet['to'], 'short').ljust(columns[1])
else get_name_from_database(packet['to'], 'short').ljust(columns[1])
)
if 'decoded' in packet:
port = packet['decoded']['portnum'].ljust(columns[2])
@@ -353,84 +376,107 @@ def draw_packetlog_win():
packetlog_win.box()
packetlog_win.refresh()
def main_ui(stdscr):
global messages_pad, messages_box, nodes_pad, nodes_box, channel_pad, channel_box, function_win, packetlog_win
stdscr.keypad(True)
get_channels()
def handle_resize(stdscr, firstrun):
global messages_pad, messages_box, nodes_pad, nodes_box, channel_pad, channel_box, function_win, packetlog_win, entry_win
# Calculate window max dimensions
height, width = stdscr.getmaxyx()
# Define window dimensions and positions
entry_win = curses.newwin(3, width, 0, 0)
channel_width = 3 * (width // 16)
nodes_width = 5 * (width // 16)
messages_width = width - channel_width - nodes_width
channel_box = curses.newwin(height - 6, channel_width, 3, 0)
messages_box = curses.newwin(height - 6, messages_width, 3, channel_width)
nodes_box = curses.newwin(height - 6, nodes_width, 3, channel_width + messages_width)
if firstrun:
entry_win = curses.newwin(3, width, 0, 0)
channel_box = curses.newwin(height - 6, channel_width, 3, 0)
messages_box = curses.newwin(height - 6, messages_width, 3, channel_width)
nodes_box = curses.newwin(height - 6, nodes_width, 3, channel_width + messages_width)
function_win = curses.newwin(3, width, height - 3, 0)
packetlog_win = curses.newwin(int(height / 3), messages_width, height - int(height / 3) - 3, channel_width)
entry_win.bkgd(get_color("background"))
channel_box.bkgd(get_color("background"))
messages_box.bkgd(get_color("background"))
nodes_box.bkgd(get_color("background"))
# Will be resized to what we need when drawn
messages_pad = curses.newpad(1, 1)
nodes_pad = curses.newpad(1,1)
channel_pad = curses.newpad(1,1)
# Will be resized to what we need when drawn
messages_pad = curses.newpad(1, 1)
nodes_pad = curses.newpad(1,1)
channel_pad = curses.newpad(1,1)
entry_win.bkgd(get_color("background"))
channel_box.bkgd(get_color("background"))
messages_box.bkgd(get_color("background"))
nodes_box.bkgd(get_color("background"))
messages_pad.bkgd(get_color("background"))
nodes_pad.bkgd(get_color("background"))
channel_pad.bkgd(get_color("background"))
messages_pad.bkgd(get_color("background"))
nodes_pad.bkgd(get_color("background"))
channel_pad.bkgd(get_color("background"))
function_win = curses.newwin(3, width, height - 3, 0)
packetlog_win = curses.newwin(int(height / 3), messages_width, height - int(height / 3) - 3, channel_width)
function_win.bkgd(get_color("background"))
packetlog_win.bkgd(get_color("background"))
function_win.bkgd(get_color("background"))
packetlog_win.bkgd(get_color("background"))
channel_box.attrset(get_color("window_frame"))
entry_win.attrset(get_color("window_frame"))
nodes_box.attrset(get_color("window_frame"))
messages_box.attrset(get_color("window_frame"))
function_win.attrset(get_color("window_frame"))
draw_function_win()
else:
entry_win.erase()
channel_box.erase()
messages_box.erase()
nodes_box.erase()
function_win.erase()
packetlog_win.erase()
entry_win.resize(3, width)
channel_box.resize(height - 6, channel_width)
messages_box.resize(height - 6, messages_width)
messages_box.mvwin(3, channel_width)
nodes_box.resize(height - 6, nodes_width)
nodes_box.mvwin(3, channel_width + messages_width)
function_win.resize(3, width)
function_win.mvwin(height - 3, 0)
packetlog_win.resize(int(height / 3), messages_width)
packetlog_win.mvwin(height - int(height / 3) - 3, channel_width)
# Draw boxes around windows
# Set the normal frame color for the channel box
channel_box.attrset(get_color("window_frame"))
channel_box.box()
# Draw boxes for other windows
entry_win.attrset(get_color("window_frame"))
entry_win.box()
nodes_box.attrset(get_color("window_frame"))
nodes_box.box()
messages_box.attrset(get_color("window_frame"))
messages_box.box()
function_win.attrset(get_color("window_frame"))
function_win.box()
# Refresh all windows
entry_win.refresh()
channel_box.refresh()
function_win.refresh()
nodes_box.refresh()
messages_box.refresh()
input_text = ""
entry_win.keypad(True)
curses.curs_set(1)
draw_channel_list()
draw_node_list()
draw_messages_window(True)
try:
draw_function_win()
draw_channel_list()
draw_messages_window(True)
draw_node_list()
except:
# Resize events can come faster than we can re-draw, which can cause a curses error.
# In this case we'll see another curses.KEY_RESIZE in our key handler and draw again later.
pass
def main_ui(stdscr):
global messages_pad, messages_box, nodes_pad, nodes_box, channel_pad, channel_box, function_win, packetlog_win, entry_win
messages_pad = messages_box = nodes_pad = nodes_box = channel_pad = channel_box = function_win = packetlog_win = entry_win = None
stdscr.keypad(True)
get_channels()
input_text = ""
handle_resize(stdscr, True)
while True:
draw_text_field(entry_win, f"Input: {input_text[-(width - 10):]}", get_color("input"))
draw_text_field(entry_win, f"Input: {input_text[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window
char = entry_win.get_wch()
@@ -530,7 +576,7 @@ def main_ui(stdscr):
messages_box.refresh()
refresh_pad(1)
elif globals.current_window == 2:
draw_node_details()
draw_function_win()
nodes_box.attrset(get_color("window_frame_selected"))
nodes_box.box()
nodes_box.attrset(get_color("window_frame"))
@@ -548,16 +594,22 @@ def main_ui(stdscr):
curses.curs_set(0) # Hide cursor
ui.dialog.dialog(stdscr, "Traceroute Sent", "Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.")
curses.curs_set(1) # Show cursor again
refresh_all()
handle_resize(stdscr, False)
elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)):
if globals.current_window == 2:
node_list = globals.node_list
if node_list[globals.selected_node] not in globals.channel_list:
globals.channel_list.append(node_list[globals.selected_node])
if(node_list[globals.selected_node] not in globals.all_messages):
globals.all_messages[node_list[globals.selected_node]] = []
globals.selected_channel = globals.channel_list.index(node_list[globals.selected_node])
if(is_chat_archived(globals.channel_list[globals.selected_channel])):
update_node_info_in_db(globals.channel_list[globals.selected_channel], chat_archived=False)
globals.selected_node = 0
globals.current_window = 0
@@ -587,7 +639,8 @@ def main_ui(stdscr):
curses.curs_set(0)
settings_menu(stdscr, globals.interface)
curses.curs_set(1)
refresh_all()
refresh_node_list()
handle_resize(stdscr, False)
elif char == chr(16):
# Display packet log
@@ -598,6 +651,42 @@ def main_ui(stdscr):
globals.display_log = False
packetlog_win.erase()
draw_messages_window(True)
elif char == curses.KEY_RESIZE:
input_text = ""
handle_resize(stdscr, False)
# ^D
elif char == chr(4):
if(globals.current_window == 0):
if(isinstance(globals.channel_list[globals.selected_channel], int)):
update_node_info_in_db(globals.channel_list[globals.selected_channel], chat_archived=True)
# Shift notifications up to account for deleted item
for i in range(len(globals.notifications)):
if globals.notifications[i] > globals.selected_channel:
globals.notifications[i] -= 1
del globals.channel_list[globals.selected_channel]
globals.selected_channel = min(globals.selected_channel, len(globals.channel_list) - 1)
select_channel(globals.selected_channel)
draw_channel_list()
draw_messages_window()
if(globals.current_window == 2):
curses.curs_set(0)
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from nodedb?", "no", ["yes", "no"])
if confirmation == "yes":
globals.interface.localNode.removeNode(globals.node_list[globals.selected_node])
globals.node_list.pop(globals.selected_node)
draw_messages_window()
draw_node_list()
else:
draw_messages_window()
curses.curs_set(1)
continue
else:
# Append typed character to input text
if(isinstance(char, str)):

View File

@@ -1,4 +1,5 @@
import curses
from ui.colors import get_color
def dialog(stdscr, title, message):
height, width = stdscr.getmaxyx()
@@ -15,17 +16,19 @@ def dialog(stdscr, title, message):
# Create dialog window
win = curses.newwin(dialog_height, dialog_width, y, x)
win.bkgd(get_color("background"))
win.attrset(get_color("window_frame"))
win.border(0)
# Add title
win.addstr(0, 2, title)
win.addstr(0, 2, title, get_color("settings_default"))
# Add message
for i, l in enumerate(message_lines):
win.addstr(2 + i, 2, l)
win.addstr(2 + i, 2, l, get_color("settings_default"))
# Add button
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", curses.color_pair(1) | curses.A_REVERSE)
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", get_color("settings_default", reverse=True))
# Refresh dialog window
win.refresh()

View File

@@ -1,6 +1,8 @@
from collections import OrderedDict
from meshtastic.protobuf import config_pb2, module_config_pb2, channel_pb2
from save_to_radio import settings_reboot, settings_factory_reset, settings_reset_nodedb, settings_shutdown
import logging, traceback
import logging
import base64
def extract_fields(message_instance, current_config=None):
if isinstance(current_config, dict): # Handle dictionaries
@@ -12,9 +14,9 @@ def extract_fields(message_instance, current_config=None):
menu = {}
fields = message_instance.DESCRIPTOR.fields
for field in fields:
if field.name in {"sessionkey", "channel_num", "id"}: # Skip certain fields
if field.name in {"sessionkey", "channel_num", "id", "ignore_incoming"}: # Skip certain fields
continue
if field.message_type: # Nested message
nested_instance = getattr(message_instance, field.name)
nested_config = getattr(current_config, field.name, None) if current_config else None
@@ -33,10 +35,8 @@ def extract_fields(message_instance, current_config=None):
else: # Handle other field types
current_value = getattr(current_config, field.name, "Not Set") if current_config else "Not Set"
menu[field.name] = (field, current_value)
return menu
def generate_menu_from_protobuf(interface):
# Function to generate the menu structure from protobuf messages
menu_structure = {"Main Menu": {}}
@@ -54,7 +54,6 @@ def generate_menu_from_protobuf(interface):
"shortName": (None, current_user_config.get("shortName", "Not Set")),
"isLicensed": (None, current_user_config.get("isLicensed", "False"))
}
else:
logging.info("User settings not found in Node Info")
menu_structure["Main Menu"]["User Settings"] = "No user settings available"
@@ -70,6 +69,8 @@ def generate_menu_from_protobuf(interface):
current_channel = interface.localNode.getChannelByChannelIndex(i)
if current_channel:
channel_config = extract_fields(channel, current_channel.settings)
# Convert 'psk' field to Base64
channel_config["psk"] = (channel_config["psk"][0], base64.b64encode(channel_config["psk"][1]).decode('utf-8'))
menu_structure["Main Menu"]["Channels"][f"Channel {i + 1}"] = channel_config
# Add Radio Settings
@@ -77,6 +78,30 @@ def generate_menu_from_protobuf(interface):
current_radio_config = interface.localNode.localConfig if interface else None
menu_structure["Main Menu"]["Radio Settings"] = extract_fields(radio, current_radio_config)
# Add Lat/Lon/Alt
position_data = {
"latitude": (None, current_node_info["position"].get("latitude", 0.0)),
"longitude": (None, current_node_info["position"].get("longitude", 0.0)),
"altitude": (None, current_node_info["position"].get("altitude", 0))
}
# Get existing position menu items
existing_position_menu = menu_structure["Main Menu"]["Radio Settings"].get("position", {})
# Create an ordered position menu with Lat/Lon/Alt inserted in the middle
ordered_position_menu = OrderedDict()
for key, value in existing_position_menu.items():
if key == "fixed_position": # Insert before or after a specific key
ordered_position_menu[key] = value
ordered_position_menu.update(position_data) # Insert Lat/Lon/Alt **right here**
else:
ordered_position_menu[key] = value
# Update the menu with the new order
menu_structure["Main Menu"]["Radio Settings"]["position"] = ordered_position_menu
# Add Module Settings
module = module_config_pb2.ModuleConfig()
current_module_config = interface.localNode.moduleConfig if interface else None
@@ -86,10 +111,12 @@ def generate_menu_from_protobuf(interface):
menu_structure["Main Menu"]["App Settings"] = {"Open": "app_settings"}
# Add additional settings options
menu_structure["Main Menu"]["Reboot"] = settings_reboot
menu_structure["Main Menu"]["Reset Node DB"] = settings_reset_nodedb
menu_structure["Main Menu"]["Shutdown"] = settings_shutdown
menu_structure["Main Menu"]["Factory Reset"] = settings_factory_reset
menu_structure["Main Menu"]["Export Config"] = None
menu_structure["Main Menu"]["Load Config"] = None
menu_structure["Main Menu"]["Reboot"] = None
menu_structure["Main Menu"]["Reset Node DB"] = None
menu_structure["Main Menu"]["Shutdown"] = None
menu_structure["Main Menu"]["Factory Reset"] = None
# Add Exit option
menu_structure["Main Menu"]["Exit"] = None

View File

@@ -1,8 +1,9 @@
import curses
import json
import os
from ui.colors import get_color, setup_colors
import json
import curses
from ui.colors import get_color, setup_colors, COLOR_MAP
from default_config import format_json_single_line_arrays, loaded_config
from input_handlers import get_list_input
width = 60
save_option_text = "Save Changes"
@@ -12,81 +13,12 @@ def edit_color_pair(key, current_value):
"""
Allows the user to select a foreground and background color for a key.
"""
colors = [" ", "black", "red", "green", "yellow", "blue", "magenta", "cyan", "white"]
fg_color = select_color_from_list(f"Select Foreground Color for {key}", current_value[0], colors)
bg_color = select_color_from_list(f"Select Background Color for {key}", current_value[1], colors)
color_list = [" "] + list(COLOR_MAP.keys())
fg_color = get_list_input(f"Select Foreground Color for {key}", current_value[0], color_list)
bg_color = get_list_input(f"Select Background Color for {key}", current_value[1], color_list)
return [fg_color, bg_color]
def select_color_from_list(prompt, current_color, colors):
"""
Displays a scrollable list of colors for the user to choose from using a pad.
"""
selected_index = colors.index(current_color) if current_color in colors else 0
height = min(len(colors) + 5, curses.LINES - 2)
width = 60
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
color_win = curses.newwin(height, width, start_y, start_x)
color_win.bkgd(get_color("background"))
color_win.attrset(get_color("window_frame"))
color_win.keypad(True)
color_pad = curses.newpad(len(colors) + 1, width - 8)
color_pad.bkgd(get_color("background"))
# Render header
color_win.clear()
color_win.border()
color_win.addstr(1, 2, prompt, get_color("settings_default", bold=True))
# Render color options on the pad
for idx, color in enumerate(colors):
if idx == selected_index:
color_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
else:
color_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
# Initial refresh
color_win.refresh()
color_pad.refresh(0, 0,
color_win.getbegyx()[0] + 3, color_win.getbegyx()[1] + 4,
color_win.getbegyx()[0] + color_win.getmaxyx()[0] - 2, color_win.getbegyx()[1] + color_win.getmaxyx()[1] - 4)
while True:
key = color_win.getch()
if key == curses.KEY_UP:
if selected_index > 0:
selected_index -= 1
elif key == curses.KEY_DOWN:
if selected_index < len(colors) - 1:
selected_index += 1
elif key == curses.KEY_RIGHT or key == ord('\n'):
return colors[selected_index]
elif key == curses.KEY_LEFT or key == 27: # ESC key
return current_color
# Refresh the pad with updated selection and scroll offset
for idx, color in enumerate(colors):
if idx == selected_index:
color_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
else:
color_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
color_win.refresh()
color_pad.refresh(0, 0,
color_win.getbegyx()[0] + 3, color_win.getbegyx()[1] + 4,
color_win.getbegyx()[0] + color_win.getmaxyx()[0] - 2, color_win.getbegyx()[1] + color_win.getmaxyx()[1] - 4)
def edit_value(key, current_value):
width = 60
height = 10
@@ -116,22 +48,26 @@ def edit_value(key, current_value):
if key == "theme":
# Load theme names dynamically from the JSON
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
return select_color_from_list("Select Theme", current_value, theme_options)
return get_list_input("Select Theme", current_value, theme_options)
elif key == "node_sort":
sort_options = ['lastHeard', 'name', 'hops']
return get_list_input("Sort By", current_value, sort_options)
# Standard Input Mode (Scrollable)
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
curses.curs_set(1)
user_input = ""
scroll_offset = 0 # Determines which part of the text is visible
user_input = ""
input_position = (7, 13) # Tuple for row and column
row, col = input_position # Unpack tuple
while True:
visible_text = user_input[scroll_offset:scroll_offset + input_width] # Only show what fits
edit_win.addstr(7, 13, " " * input_width, get_color("settings_default")) # Clear previous text
edit_win.addstr(7, 13, visible_text, get_color("settings_default")) # Display text
edit_win.addstr(row, col, " " * input_width, get_color("settings_default")) # Clear previous text
edit_win.addstr(row, col, visible_text, get_color("settings_default")) # Display text
edit_win.refresh()
edit_win.move(7, 13 + min(len(user_input) - scroll_offset, input_width)) # Adjust cursor position
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width)) # Adjust cursor position
key = edit_win.get_wch()
if key in (chr(27), curses.KEY_LEFT): # ESC or Left Arrow

281
utilities/config_io.py Normal file
View File

@@ -0,0 +1,281 @@
import yaml
import logging
from typing import List
from google.protobuf.json_format import MessageToDict
from meshtastic import BROADCAST_ADDR, mt_config
from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
# defs are from meshtastic/python/main
def traverseConfig(config_root, config, interface_config) -> bool:
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
snake_name = camel_to_snake(config_root)
for pref in config:
pref_name = f"{snake_name}.{pref}"
if isinstance(config[pref], dict):
traverseConfig(pref_name, config[pref], interface_config)
else:
setPref(interface_config, pref_name, config[pref])
return True
def splitCompoundName(comp_name: str) -> List[str]:
"""Split compound (dot separated) preference name into parts"""
name: List[str] = comp_name.split(".")
if len(name) < 2:
name[0] = comp_name
name.append(comp_name)
return name
def setPref(config, comp_name, raw_val) -> bool:
"""Set a channel or preferences value"""
name = splitCompoundName(comp_name)
snake_name = camel_to_snake(name[-1])
camel_name = snake_to_camel(name[-1])
uni_name = camel_name if mt_config.camel_case else snake_name
logging.debug(f"snake_name:{snake_name}")
logging.debug(f"camel_name:{camel_name}")
objDesc = config.DESCRIPTOR
config_part = config
config_type = objDesc.fields_by_name.get(name[0])
if config_type and config_type.message_type is not None:
for name_part in name[1:-1]:
part_snake_name = camel_to_snake((name_part))
config_part = getattr(config, config_type.name)
config_type = config_type.message_type.fields_by_name.get(part_snake_name)
pref = None
if config_type and config_type.message_type is not None:
pref = config_type.message_type.fields_by_name.get(snake_name)
# Others like ChannelSettings are standalone
elif config_type:
pref = config_type
if (not pref) or (not config_type):
return False
if isinstance(raw_val, str):
val = fromStr(raw_val)
else:
val = raw_val
logging.debug(f"valStr:{raw_val} val:{val}")
if snake_name == "wifi_psk" and len(str(raw_val)) < 8:
logging.info(f"Warning: network.wifi_psk must be 8 or more characters.")
return False
enumType = pref.enum_type
# pylint: disable=C0123
if enumType and type(val) == str:
# We've failed so far to convert this string into an enum, try to find it by reflection
e = enumType.values_by_name.get(val)
if e:
val = e.number
else:
logging.info(
f"{name[0]}.{uni_name} does not have an enum called {val}, so you can not set it."
)
logging.info(f"Choices in sorted order are:")
names = []
for f in enumType.values:
# Note: We must use the value of the enum (regardless if camel or snake case)
names.append(f"{f.name}")
for temp_name in sorted(names):
logging.info(f" {temp_name}")
return False
# repeating fields need to be handled with append, not setattr
if pref.label != pref.LABEL_REPEATED:
try:
if config_type.message_type is not None:
config_values = getattr(config_part, config_type.name)
setattr(config_values, pref.name, val)
else:
setattr(config_part, snake_name, val)
except TypeError:
# The setter didn't like our arg type guess try again as a string
config_values = getattr(config_part, config_type.name)
setattr(config_values, pref.name, str(val))
elif type(val) == list:
new_vals = [fromStr(x) for x in val]
config_values = getattr(config, config_type.name)
getattr(config_values, pref.name)[:] = new_vals
else:
config_values = getattr(config, config_type.name)
if val == 0:
# clear values
logging.info(f"Clearing {pref.name} list")
del getattr(config_values, pref.name)[:]
else:
logging.info(f"Adding '{raw_val}' to the {pref.name} list")
cur_vals = [x for x in getattr(config_values, pref.name) if x not in [0, "", b""]]
cur_vals.append(val)
getattr(config_values, pref.name)[:] = cur_vals
return True
prefix = f"{'.'.join(name[0:-1])}." if config_type.message_type is not None else ""
logging.info(f"Set {prefix}{uni_name} to {raw_val}")
return True
def config_import(interface, filename):
with open(filename, encoding="utf8") as file:
configuration = yaml.safe_load(file)
closeNow = True
interface.getNode('^local', False).beginSettingsTransaction()
if "owner" in configuration:
logging.info(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True
interface.getNode('^local', False).setOwner(configuration["owner"])
if "owner_short" in configuration:
logging.info(
f"Setting device owner short to {configuration['owner_short']}"
)
waitForAckNak = True
interface.getNode('^local', False).setOwner(
long_name=None, short_name=configuration["owner_short"]
)
if "ownerShort" in configuration:
logging.info(
f"Setting device owner short to {configuration['ownerShort']}"
)
waitForAckNak = True
interface.getNode('^local', False).setOwner(
long_name=None, short_name=configuration["ownerShort"]
)
if "channel_url" in configuration:
logging.info(f"Setting channel url to {configuration['channel_url']}")
interface.getNode('^local').setURL(configuration["channel_url"])
if "channelUrl" in configuration:
logging.info(f"Setting channel url to {configuration['channelUrl']}")
interface.getNode('^local').setURL(configuration["channelUrl"])
if "location" in configuration:
alt = 0
lat = 0.0
lon = 0.0
localConfig = interface.localNode.localConfig
if "alt" in configuration["location"]:
alt = int(configuration["location"]["alt"] or 0)
logging.info(f"Fixing altitude at {alt} meters")
if "lat" in configuration["location"]:
lat = float(configuration["location"]["lat"] or 0)
logging.info(f"Fixing latitude at {lat} degrees")
if "lon" in configuration["location"]:
lon = float(configuration["location"]["lon"] or 0)
logging.info(f"Fixing longitude at {lon} degrees")
logging.info("Setting device position")
interface.localNode.setFixedPosition(lat, lon, alt)
if "config" in configuration:
localConfig = interface.getNode('^local').localConfig
for section in configuration["config"]:
traverseConfig(
section, configuration["config"][section], localConfig
)
interface.getNode('^local').writeConfig(
camel_to_snake(section)
)
if "module_config" in configuration:
moduleConfig = interface.getNode('^local').moduleConfig
for section in configuration["module_config"]:
traverseConfig(
section,
configuration["module_config"][section],
moduleConfig,
)
interface.getNode('^local').writeConfig(
camel_to_snake(section)
)
interface.getNode('^local', False).commitSettingsTransaction()
logging.info("Writing modified configuration to device")
def config_export(interface) -> str:
"""used in --export-config"""
configObj = {}
owner = interface.getLongName()
owner_short = interface.getShortName()
channel_url = interface.localNode.getURL()
myinfo = interface.getMyNodeInfo()
pos = myinfo.get("position")
lat = None
lon = None
alt = None
if pos:
lat = pos.get("latitude")
lon = pos.get("longitude")
alt = pos.get("altitude")
if owner:
configObj["owner"] = owner
if owner_short:
configObj["owner_short"] = owner_short
if channel_url:
if mt_config.camel_case:
configObj["channelUrl"] = channel_url
else:
configObj["channel_url"] = channel_url
# lat and lon don't make much sense without the other (so fill with 0s), and alt isn't meaningful without both
if lat or lon:
configObj["location"] = {"lat": lat or float(0), "lon": lon or float(0)}
if alt:
configObj["location"]["alt"] = alt
config = MessageToDict(interface.localNode.localConfig) #checkme - Used as a dictionary here and a string below
if config:
# Convert inner keys to correct snake/camelCase
prefs = {}
for pref in config:
if mt_config.camel_case:
prefs[snake_to_camel(pref)] = config[pref]
else:
prefs[pref] = config[pref]
# mark base64 encoded fields as such
if pref == "security":
if 'privateKey' in prefs[pref]:
prefs[pref]['privateKey'] = 'base64:' + prefs[pref]['privateKey']
if 'publicKey' in prefs[pref]:
prefs[pref]['publicKey'] = 'base64:' + prefs[pref]['publicKey']
if 'adminKey' in prefs[pref]:
for i in range(len(prefs[pref]['adminKey'])):
prefs[pref]['adminKey'][i] = 'base64:' + prefs[pref]['adminKey'][i]
if mt_config.camel_case:
configObj["config"] = config #Identical command here and 2 lines below?
else:
configObj["config"] = config
module_config = MessageToDict(interface.localNode.moduleConfig)
if module_config:
# Convert inner keys to correct snake/camelCase
prefs = {}
for pref in module_config:
if len(module_config[pref]) > 0:
prefs[pref] = module_config[pref]
if mt_config.camel_case:
configObj["module_config"] = prefs
else:
configObj["module_config"] = prefs
config_txt = "# start of Meshtastic configure yaml\n" #checkme - "config" (now changed to config_out)
#was used as a string here and a Dictionary above
config_txt += yaml.dump(configObj)
# logging.info(config_txt)
return config_txt

View File

@@ -3,14 +3,21 @@ import meshtastic.serial_interface, meshtastic.tcp_interface, meshtastic.ble_int
import globals
def initialize_interface(args):
if args.ble:
return meshtastic.ble_interface.BLEInterface(args.ble if args.ble != "any" else None)
elif args.host:
return meshtastic.tcp_interface.TCPInterface(args.host)
else:
try:
return meshtastic.serial_interface.SerialInterface(args.port)
except PermissionError as ex:
logging.error("You probably need to add yourself to the `dialout` group to use a serial connection.")
if globals.interface.devPath is None:
return meshtastic.tcp_interface.TCPInterface("meshtastic.local")
try:
if args.ble:
return meshtastic.ble_interface.BLEInterface(args.ble if args.ble != "any" else None)
elif args.host:
return meshtastic.tcp_interface.TCPInterface(args.host)
else:
try:
return meshtastic.serial_interface.SerialInterface(args.port)
except PermissionError as ex:
logging.error(f"You probably need to add yourself to the `dialout` group to use a serial connection. {ex}")
except Exception as ex:
logging.error(f"Unexpected error initializing interface: {ex}")
if globals.interface.devPath is None:
return meshtastic.tcp_interface.TCPInterface("meshtastic.local")
except Exception as ex:
logging.critical(f"Fatal error initializing interface: {ex}")

View File

@@ -1,7 +1,7 @@
import globals
from datetime import datetime
import datetime
from meshtastic.protobuf import config_pb2
import re
import default_config as config
def get_channels():
"""Retrieve channels from the node and update globals.channel_list and globals.all_messages."""
@@ -36,13 +36,29 @@ def get_channels():
def get_node_list():
if globals.interface.nodes:
sorted_nodes = sorted(
globals.interface.nodes.values(),
key = lambda node: (node['lastHeard'] if ('lastHeard' in node and isinstance(node['lastHeard'], int)) else 0),
reverse = True)
return [node['num'] for node in sorted_nodes]
my_node_num = globals.myNodeNum
def node_sort(node):
if(config.node_sort == 'lastHeard'):
return -node['lastHeard'] if ('lastHeard' in node and isinstance(node['lastHeard'], int)) else 0
elif(config.node_sort == "name"):
return node['user']['longName']
elif(config.node_sort == "hops"):
return node['hopsAway'] if 'hopsAway' in node else 100
else:
return node
sorted_nodes = sorted(globals.interface.nodes.values(), key = node_sort)
node_list = [node['num'] for node in sorted_nodes if node['num'] != my_node_num]
return [my_node_num] + node_list # Ensuring your node is always first
return []
def refresh_node_list():
new_node_list = get_node_list()
if new_node_list != globals.node_list:
globals.node_list = new_node_list
return True
return False
def get_nodeNum():
myinfo = globals.interface.getMyNodeInfo()
myNodeNum = myinfo['num']
@@ -56,50 +72,45 @@ def convert_to_camel_case(string):
camel_case_string = ''.join(word.capitalize() for word in words)
return camel_case_string
def get_name_from_number(number, type='long'):
name = ""
nodes_snapshot = list(globals.interface.nodes.values())
for node in nodes_snapshot:
if number == node['num']:
if type == 'long':
return node['user']['longName']
elif type == 'short':
return node['user']['shortName']
else:
pass
# If no match is found, use the ID as a string
return str(decimal_to_hex(number))
def get_time_ago(timestamp):
now = datetime.now()
dt = datetime.fromtimestamp(timestamp)
delta = now - dt
def get_time_val_units(time_delta):
value = 0
unit = ""
if delta.days > 365:
value = delta.days // 365
if time_delta.days > 365:
value = time_delta.days // 365
unit = "y"
elif delta.days > 30:
value = delta.days // 30
elif time_delta.days > 30:
value = time_delta.days // 30
unit = "mon"
elif delta.days > 7:
value = delta.days // 7
elif time_delta.days > 7:
value = time_delta.days // 7
unit = "w"
elif delta.days > 0:
value = delta.days
elif time_delta.days > 0:
value = time_delta.days
unit = "d"
elif delta.seconds > 3600:
value = delta.seconds // 3600
elif time_delta.seconds > 3600:
value = time_delta.seconds // 3600
unit = "h"
elif delta.seconds > 60:
value = delta.seconds // 60
elif time_delta.seconds > 60:
value = time_delta.seconds // 60
unit = "min"
else:
value = time_delta.seconds
unit = "s"
return (value, unit)
if len(unit) > 0:
def get_readable_duration(seconds):
delta = datetime.timedelta(seconds = seconds)
val, units = get_time_val_units(delta)
return f"{val} {units}"
def get_time_ago(timestamp):
now = datetime.datetime.now()
dt = datetime.datetime.fromtimestamp(timestamp)
delta = now - dt
value, unit = get_time_val_units(delta)
if unit != "s":
return f"{value} {unit} ago"
return "now"