Compare commits

...

80 Commits

Author SHA1 Message Date
pdxlocations
e273b3325d bump version 2025-01-31 16:54:25 -08:00
pdxlocations
ad7c7a148f Add Config Import and Export (#98)
* init

* working changes

* working changes

* remove unused code
2025-01-31 16:42:55 -08:00
Russell Schmidt
df7d9b0e2e Enable resizing app (#95)
* Enable resizing app

* Crash less with narrow windows

If the window gets too narrow we'll still crash, but this lets us get
narrower than before.

* Fix resize re-drawing

* Fix crash when resizing too fast

* Fix crash after resize with dialog open

* Enable resizing settings
2025-01-31 16:37:12 -08:00
pdxlocations
b9d8c9ad44 Add Lat/Lon/Alt to Position Settings (#96)
* add lat/lon/alt

* fix types and conditions
2025-01-30 22:33:42 -08:00
pdxlocations
e27504f215 fix reseting nested modified_settings 2025-01-30 22:12:33 -08:00
pdxlocations
648993607d fix packet log height 2025-01-30 14:24:13 -08:00
pdxlocations
cf8ee248de adjust packet log message window offset 2025-01-30 13:40:13 -08:00
pdxlocations
03f7fd81a7 Merge pull request #94 from pdxlocations/get-name-from-db
Get long/short name from sql db, not nodedb
2025-01-30 12:53:22 -08:00
pdxlocations
5730beafa9 change get name to sql db from nodedb 2025-01-30 12:51:31 -08:00
pdxlocations
2a6a1ff798 rm globlas that i think we don't need 2025-01-30 12:06:56 -08:00
pdxlocations
09d832a203 colors on traceroute window 2025-01-29 21:25:53 -08:00
pdxlocations
bea051a69f Merge pull request #93 from pdxlocations:psk-from-bytes
Convert bytes PSK to base 64
2025-01-29 21:16:42 -08:00
pdxlocations
aa1b7d43a8 Convert bytes PSK to base 64 2025-01-29 21:12:26 -08:00
pdxlocations
59187a3838 rm depreciated field (#92) 2025-01-29 18:36:00 -08:00
pdxlocations
47d6212b3a Minor Refactor and Dynamic Color List (#91)
* refactor init

* dynamic color list
2025-01-29 17:45:48 -08:00
pdxlocations
f6e7a09c7e refactor select_from_list 2025-01-29 16:28:08 -08:00
pdxlocations
1d9d055a4d import cleanup 2025-01-29 16:16:38 -08:00
pdxlocations
dae8c46b7b allow emojis in node name 2025-01-29 11:58:20 -08:00
pdxlocations
d7a9112918 restore .clear for dictionary 2025-01-29 11:42:42 -08:00
pdxlocations
5084eca388 enforce 4-letter short name 2025-01-29 11:40:23 -08:00
pdxlocations
5e4b28d47a typo 2025-01-29 08:47:34 -08:00
pdxlocations
c8a5ad3a95 Adding some startup logging 2025-01-29 08:45:42 -08:00
pdxlocations
852a912072 bump version 2025-01-28 18:09:18 -08:00
pdxlocations
af5fe53658 Add App Settings Menu (#89)
* init

* working changes

* working changes

* working changes

* working changes

* not working changes

* almost working changes

* working changes

* working changes

* broke save and nested menus

* working better

* changes

* working changes

* scrolling text input

* allow wide char input

* set pad bg colors

* add empty color for bg

* reload colors on save

* tab to save changes

* cleanup on isle edit_value

* dynamically create theme options
2025-01-28 16:52:20 -08:00
pdxlocations
f21269ba62 set a few pad bg colors 2025-01-28 13:18:46 -08:00
pdxlocations
3f94b9e276 Merge pull request #88 from rfschmid/fix-crash-in-node-details
Fix crash in get node details
2025-01-28 10:46:04 -08:00
Russell Schmidt
c865d6a942 Fix crash in get node details
The last heard timestamp apparently sometimes gets populated in the dict
as None, instead of just not being present, which caused a crash. Check
for None before tryint to use it.
2025-01-28 12:42:59 -06:00
pdxlocations
0bbabba77b Merge pull request #85 from rfschmid:add-node-details
Add node details
2025-01-27 16:28:54 -08:00
pdxlocations
bf43799a7d Merge pull request #86 from rfschmid/replace-clear-with-erase 2025-01-27 16:26:37 -08:00
pdxlocations
dd0ce4f098 Merge pull request #87 from rfschmid/fix-packet-log-disappearing 2025-01-27 16:24:31 -08:00
Russell Schmidt
5bd9b45753 Fix packet log disappearing 2025-01-27 17:16:56 -06:00
Russell Schmidt
4aaef5381e Replace window.clear() calls with window.erase()
https://lists.gnu.org/archive/html/bug-ncurses/2014-01/msg00007.html
2025-01-27 12:58:01 -06:00
Russell Schmidt
51dcfb5aa2 Add node details 2025-01-27 12:46:33 -06:00
pdxlocations
77b995f00f Merge pull request #81 from rfschmid/make-tab-jump-to-save-in-settings 2025-01-26 17:54:13 -08:00
pdxlocations
62cc2089db Merge pull request #82 from rfschmid:refresh-display-when-closing-dialogs
Refresh display when closing dialogs
2025-01-26 17:52:55 -08:00
pdxlocations
2eb8a17094 Merge pull request #80 from rfschmid/reduce-refresh-on-bool-input 2025-01-26 17:51:09 -08:00
pdxlocations
abe400648f Merge pull request #79 from rfschmid:fix-cancelling-input-crash
Fix crash on cancelling settings input
2025-01-26 17:49:29 -08:00
Russell Schmidt
22b2a9a50e Make refresh more efficient 2025-01-26 18:33:46 -06:00
Russell Schmidt
9a306f1553 Refresh display when closing dialogs 2025-01-26 18:12:10 -06:00
Russell Schmidt
92db3f4a30 Make tab jump to save in settings 2025-01-26 16:41:13 -06:00
Russell Schmidt
a32526e650 Reduce screen refresh on bool input 2025-01-26 16:29:48 -06:00
Russell Schmidt
1ebf1c4988 Fix crash on cancelling settings input
When backing out of entering user short name or long name, the app would
crash. Once it didn't crash, backing out would set these fields to None,
rather than cancelling the change.
2025-01-26 16:07:04 -06:00
pdxlocations
7901f00c49 Merge pull request #78 from rfschmid:fix-crash-on-long-enum-settings
Allow enum settings entry to scroll
2025-01-26 13:25:57 -08:00
Russell Schmidt
4ce279ab0d Allow enum settings entry to scroll 2025-01-26 15:01:38 -06:00
pdxlocations
e8e91f893e Merge pull request #76 from rfschmid/allow-settings-wraparound-scroll 2025-01-26 12:37:51 -08:00
pdxlocations
16c81f059d Merge pull request #75 from rfschmid:fix-settings-crash-when-height-smol
Enable scrolling settings options
2025-01-26 12:36:14 -08:00
pdxlocations
5588c6c6d9 Merge pull request #77 from rfschmid:remember-settings-menu-selected-idx
Remember settings menu stack selected index
2025-01-26 12:35:12 -08:00
Russell Schmidt
73111a46bb Fix exception adding last item in settings 2025-01-26 13:19:56 -06:00
Russell Schmidt
2d762515b4 Maybe fix crash with settings scroll 2025-01-26 11:22:59 -06:00
Russell Schmidt
6ce9707232 Remember settings menu stack selected index 2025-01-26 11:19:29 -06:00
Russell Schmidt
c33b903825 Allow settings wraparound scrolling 2025-01-26 11:12:38 -06:00
Russell Schmidt
c5327d8644 Enable scrolling settings options
Fixes crash in settings when the window height is too small to
accommodate the full list of options.
2025-01-26 10:33:56 -06:00
pdxlocations
ed1e9a3055 reorder config menu 2025-01-25 20:45:23 -08:00
pdxlocations
f0554ec1f6 bump version 2025-01-25 20:42:16 -08:00
Russell Schmidt
bb623d149c Fix crash when cancelling shutdown/reboot/etc (#73) 2025-01-25 20:39:04 -08:00
pdxlocations
4a92ad49ce missed some coloring 2025-01-25 18:57:04 -08:00
pdxlocations
9d0470d55b just call it green 2025-01-25 18:27:48 -08:00
pdxlocations
e96ea7ffef Green Theme for Josh (#72)
* add themes

* change tty defaults
2025-01-25 18:23:30 -08:00
pdxlocations
44b1a3071b fix packet log border color 2025-01-25 18:02:37 -08:00
pdxlocations
702d20a011 fix sesitive settings highlight 2025-01-25 16:25:47 -08:00
pdxlocations
fba4642ff8 fix settings crash 2025-01-25 16:02:40 -08:00
pdxlocations
916b0cfe53 fix splash border 2025-01-25 15:36:24 -08:00
pdxlocations
e086814b83 fix color fixes 2025-01-25 15:30:06 -08:00
pdxlocations
92f08d020e cleanup comments 2025-01-25 12:29:05 -08:00
pdxlocations
86463f6f84 maybe fix setting enum (#69) 2025-01-25 11:30:53 -08:00
pdxlocations
e58340fa65 extra bool check 2025-01-25 09:10:35 -08:00
pdxlocations
c243daf253 Fix channels saving to wrong index 2025-01-25 08:43:17 -08:00
pdxlocations
70f1f5d4bf late version bump 2025-01-25 07:48:41 -08:00
pdxlocations
67f1bde217 rm incorrect comment 2025-01-24 22:34:50 -08:00
pdxlocations
ca17bbee31 color fixes 2025-01-24 22:12:25 -08:00
pdxlocations
659dad493c check for new default colors 2025-01-24 21:38:51 -08:00
pdxlocations
4359d37979 add settings box color 2025-01-24 21:25:22 -08:00
pdxlocations
96612b3e1b rm print statements 2025-01-24 21:14:17 -08:00
pdxlocations
84246aefd9 Move user config into JSON file (#68)
* working changes

* Reduce redraws in settings (#66)

* Reduce redraws in settings

* Fix drawing sensitive settings in red

* Update other settings to use new color API

* Update reduce redraw changes to use new color API

* Fix highlight/unhighlight red settings

* add settings colors

* format json file

---------

Co-authored-by: Russell Schmidt <russ@sumonkey.com>
2025-01-24 21:00:00 -08:00
Russell Schmidt
7cd98a39f8 Reduce redraws in settings (#66)
* Reduce redraws in settings

* Fix drawing sensitive settings in red

* Update other settings to use new color API

* Update reduce redraw changes to use new color API

* Fix highlight/unhighlight red settings
2025-01-24 18:12:57 -08:00
pdxlocations
978e2942cb fix setting is_licensed bool 2025-01-24 09:55:42 -08:00
pdxlocations
92790ddca6 use local time for timestamp 2025-01-24 08:35:39 -08:00
pdxlocations
d5a6a0462f Color Customization (#64)
* init

* customize colors

* fix splash frame

* cleanup
2025-01-23 23:08:01 -08:00
pdxlocations
3816a3f166 Merge pull request #62 from rfschmid/stop-sending-blank
Don't allow sending empty string
2025-01-23 18:21:09 -08:00
Russell Schmidt
c61dc19319 Don't allow sending empty string 2025-01-23 19:15:35 -06:00
18 changed files with 1599 additions and 278 deletions

3
.gitignore vendored
View File

@@ -1,7 +1,10 @@
venv/
.venv/
__pycache__/
node-configs/
client.db
.DS_Store
client.log
settings.log
config.json
default_config.log

View File

@@ -1,9 +1,12 @@
import sqlite3
import globals
import time
from datetime import datetime
import logging
from utilities.utils import get_name_from_number
from utilities.utils import decimal_to_hex
import default_config as config
import globals
def get_table_name(channel):
# Construct the table name
@@ -14,7 +17,7 @@ def get_table_name(channel):
def save_message_to_db(channel, user_id, message_text):
"""Save messages to the database, ensuring the table exists."""
try:
with sqlite3.connect(globals.db_file_path) as db_connection:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
quoted_table_name = get_table_name(channel)
@@ -44,14 +47,14 @@ def save_message_to_db(channel, user_id, message_text):
return timestamp
except sqlite3.Error as e:
print(f"SQLite error in save_message_to_db: {e}")
logging.error(f"SQLite error in save_message_to_db: {e}")
except Exception as e:
print(f"Unexpected error in save_message_to_db: {e}")
logging.error(f"Unexpected error in save_message_to_db: {e}")
def update_ack_nak(channel, timestamp, message, ack):
try:
with sqlite3.connect(globals.db_file_path) as db_connection:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
update_query = f"""
UPDATE {get_table_name(channel)}
@@ -65,10 +68,10 @@ def update_ack_nak(channel, timestamp, message, ack):
db_connection.commit()
except sqlite3.Error as e:
print(f"SQLite error in update_ack_nak: {e}")
logging.error(f"SQLite error in update_ack_nak: {e}")
except Exception as e:
print(f"Unexpected error in update_ack_nak: {e}")
logging.error(f"Unexpected error in update_ack_nak: {e}")
from datetime import datetime
@@ -76,7 +79,7 @@ from datetime import datetime
def load_messages_from_db():
"""Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
try:
with sqlite3.connect(globals.db_file_path) as db_connection:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
# Retrieve all table names that match the pattern
@@ -120,18 +123,18 @@ def load_messages_from_db():
if hour not in hourly_messages:
hourly_messages[hour] = []
ack_str = globals.ack_unknown_str
ack_str = config.ack_unknown_str
if ack_type == "Implicit":
ack_str = globals.ack_implicit_str
ack_str = config.ack_implicit_str
elif ack_type == "Ack":
ack_str = globals.ack_str
ack_str = config.ack_str
elif ack_type == "Nak":
ack_str = globals.nak_str
ack_str = config.nak_str
if user_id == str(globals.myNodeNum):
formatted_message = (f"{globals.sent_message_prefix}{ack_str}: ", message)
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
else:
formatted_message = (f"{globals.message_prefix} {get_name_from_number(int(user_id), 'short')}: ", message)
formatted_message = (f"{config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ", message)
hourly_messages[hour].append(formatted_message)
@@ -141,16 +144,16 @@ def load_messages_from_db():
globals.all_messages[channel].extend(messages)
except sqlite3.Error as e:
print(f"SQLite error while loading messages from table '{table_name}': {e}")
logging.error(f"SQLite error while loading messages from table '{table_name}': {e}")
except sqlite3.Error as e:
print(f"SQLite error in load_messages_from_db: {e}")
logging.error(f"SQLite error in load_messages_from_db: {e}")
def init_nodedb():
"""Initialize the node database and update it with nodes from the interface."""
try:
with sqlite3.connect(globals.db_file_path) as db_connection:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
# Table name construction
@@ -196,14 +199,14 @@ def init_nodedb():
db_connection.commit()
except sqlite3.Error as e:
print(f"SQLite error in init_nodedb: {e}")
logging.error(f"SQLite error in init_nodedb: {e}")
except Exception as e:
print(f"Unexpected error in init_nodedb: {e}")
logging.error(f"Unexpected error in init_nodedb: {e}")
def maybe_store_nodeinfo_in_db(packet):
"""Save nodeinfo unless that record is already there."""
try:
with sqlite3.connect(globals.db_file_path) as db_connection:
with sqlite3.connect(config.db_file_path) as db_connection:
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote the table name becuase we might begin with numerics
@@ -280,6 +283,41 @@ def maybe_store_nodeinfo_in_db(packet):
except sqlite3.Error as e:
print(f"SQLite error in maybe_store_nodeinfo_in_db: {e}")
logging.error(f"SQLite error in maybe_store_nodeinfo_in_db: {e}")
finally:
db_connection.close()
def get_name_from_database(user_id, type="long"):
"""
Retrieve a user's name (long or short) from the node database.
:param user_id: The user ID to look up.
:param type: "long" for long name, "short" for short name.
:return: The retrieved name or the hex of the user id
"""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
# Construct table name
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
# Determine the correct column to fetch
column_name = "long_name" if type == "long" else "short_name"
# Query the database
query = f"SELECT {column_name} FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,))
result = db_cursor.fetchone()
return result[0] if result else decimal_to_hex(user_id)
except sqlite3.Error as e:
logging.error(f"SQLite error in get_name_from_database: {e}")
return "Unknown"
except Exception as e:
logging.error(f"Unexpected error in get_name_from_database: {e}")
return "Unknown"

190
default_config.py Normal file
View File

@@ -0,0 +1,190 @@
import logging
import json
import os
def format_json_single_line_arrays(data, indent=4):
"""
Formats JSON with arrays on a single line while keeping other elements properly indented.
"""
def format_value(value, current_indent):
if isinstance(value, dict):
items = []
for key, val in value.items():
items.append(
f'{" " * current_indent}"{key}": {format_value(val, current_indent + indent)}'
)
return "{\n" + ",\n".join(items) + f"\n{' ' * (current_indent - indent)}}}"
elif isinstance(value, list):
return f"[{', '.join(json.dumps(el, ensure_ascii=False) for el in value)}]"
else:
return json.dumps(value, ensure_ascii=False)
return format_value(data, indent)
# Recursive function to check and update nested dictionaries
def update_dict(default, actual):
updated = False
for key, value in default.items():
if key not in actual:
actual[key] = value
updated = True
elif isinstance(value, dict):
# Recursively check nested dictionaries
updated = update_dict(value, actual[key]) or updated
return updated
def initialize_config():
app_directory = os.path.dirname(os.path.abspath(__file__))
json_file_path = os.path.join(app_directory, "config.json")
COLOR_CONFIG_DARK = {
"default": ["white", "black"],
"background": [" ", "black"],
"splash_logo": ["green", "black"],
"splash_text": ["white", "black"],
"input": ["white", "black"],
"node_list": ["white", "black"],
"channel_list": ["white", "black"],
"channel_selected": ["green", "black"],
"rx_messages": ["cyan", "black"],
"tx_messages": ["green", "black"],
"timestamps": ["white", "black"],
"commands": ["white", "black"],
"window_frame": ["white", "black"],
"window_frame_selected": ["green", "black"],
"log_header": ["blue", "black"],
"log": ["green", "black"],
"settings_default": ["white", "black"],
"settings_sensitive": ["red", "black"],
"settings_save": ["green", "black"],
"settings_breadcrumbs": ["white", "black"]
}
COLOR_CONFIG_LIGHT = {
"default": ["black", "white"],
"background": [" ", "white"],
"splash_logo": ["green", "white"],
"splash_text": ["black", "white"],
"input": ["black", "white"],
"node_list": ["black", "white"],
"channel_list": ["black", "white"],
"channel_selected": ["green", "white"],
"rx_messages": ["cyan", "white"],
"tx_messages": ["green", "white"],
"timestamps": ["black", "white"],
"commands": ["black", "white"],
"window_frame": ["black", "white"],
"window_frame_selected": ["green", "white"],
"log_header": ["black", "white"],
"log": ["blue", "white"],
"settings_default": ["black", "white"],
"settings_sensitive": ["red", "white"],
"settings_save": ["green", "white"],
"settings_breadcrumbs": ["black", "white"]
}
COLOR_CONFIG_GREEN = {
"default": ["green", "black"],
"background": [" ", "black"],
"splash_logo": ["green", "black"],
"splash_text": ["green", "black"],
"input": ["green", "black"],
"node_list": ["green", "black"],
"channel_list": ["green", "black"],
"channel_selected": ["cyan", "black"],
"rx_messages": ["green", "black"],
"tx_messages": ["green", "black"],
"timestamps": ["green", "black"],
"commands": ["green", "black"],
"window_frame": ["green", "black"],
"window_frame_selected": ["cyan", "black"],
"log_header": ["green", "black"],
"log": ["green", "black"],
"settings_default": ["green", "black"],
"settings_sensitive": ["green", "black"],
"settings_save": ["green", "black"],
"settings_breadcrumbs": ["green", "black"]
}
default_config_variables = {
"db_file_path": os.path.join(app_directory, "client.db"),
"log_file_path": os.path.join(app_directory, "client.log"),
"message_prefix": ">>",
"sent_message_prefix": ">> Sent",
"notification_symbol": "*",
"ack_implicit_str": "[◌]",
"ack_str": "[✓]",
"nak_str": "[x]",
"ack_unknown_str": "[…]",
"theme": "dark",
"COLOR_CONFIG_DARK": COLOR_CONFIG_DARK,
"COLOR_CONFIG_LIGHT": COLOR_CONFIG_LIGHT,
"COLOR_CONFIG_GREEN": COLOR_CONFIG_GREEN
}
if not os.path.exists(json_file_path):
with open(json_file_path, "w", encoding="utf-8") as json_file:
formatted_json = format_json_single_line_arrays(default_config_variables)
json_file.write(formatted_json)
# Ensure all default variables exist in the JSON file
with open(json_file_path, "r", encoding="utf-8") as json_file:
loaded_config = json.load(json_file)
# Check and add missing variables
updated = update_dict(default_config_variables, loaded_config)
# Update the JSON file if any variables were missing
if updated:
formatted_json = format_json_single_line_arrays(loaded_config)
with open(json_file_path, "w", encoding="utf-8") as json_file:
json_file.write(formatted_json)
logging.info(f"JSON file updated with missing default variables and COLOR_CONFIG items.")
return loaded_config
def assign_config_variables(loaded_config):
# Assign values to local variables
global db_file_path, log_file_path, message_prefix, sent_message_prefix
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
global theme, COLOR_CONFIG
db_file_path = loaded_config["db_file_path"]
log_file_path = loaded_config["log_file_path"]
message_prefix = loaded_config["message_prefix"]
sent_message_prefix = loaded_config["sent_message_prefix"]
notification_symbol = loaded_config["notification_symbol"]
ack_implicit_str = loaded_config["ack_implicit_str"]
ack_str = loaded_config["ack_str"]
nak_str = loaded_config["nak_str"]
ack_unknown_str = loaded_config["ack_unknown_str"]
theme = loaded_config["theme"]
if theme == "dark":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"]
elif theme == "light":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"]
elif theme == "green":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"]
# Call the function when the script is imported
loaded_config = initialize_config()
assign_config_variables(loaded_config)
if __name__ == "__main__":
logging.basicConfig(
filename="default_config.log",
level=logging.INFO, # DEBUG, INFO, WARNING, ERROR, CRITICAL)
format="%(asctime)s - %(levelname)s - %(message)s"
)
print("\nLoaded Configuration:")
print(f"Database File Path: {db_file_path}")
print(f"Log File Path: {log_file_path}")
print(f"Message Prefix: {message_prefix}")
print(f"Sent Message Prefix: {sent_message_prefix}")
print(f"Notification Symbol: {notification_symbol}")
print(f"ACK Implicit String: {ack_implicit_str}")
print(f"ACK String: {ack_str}")
print(f"NAK String: {nak_str}")
print(f"ACK Unknown String: {ack_unknown_str}")
print(f"Color Config: {COLOR_CONFIG}")

View File

@@ -1,7 +1,3 @@
import os
# App Variables
app_directory = os.path.dirname(os.path.abspath(__file__))
interface = None
display_log = False
all_messages = {}
@@ -13,15 +9,4 @@ myNodeNum = 0
selected_channel = 0
selected_message = 0
selected_node = 0
current_window = 0
# User Configurable
db_file_path = os.path.join(app_directory, "client.db")
log_file_path = os.path.join(app_directory, "client.log")
message_prefix = ">>"
sent_message_prefix = message_prefix + " Sent"
notification_symbol = "*"
ack_implicit_str = "[◌]"
ack_str = "[✓]"
nak_str = "[x]"
ack_unknown_str = "[…]"
current_window = 0

View File

@@ -1,5 +1,6 @@
import curses
import ipaddress
from ui.colors import get_color
def get_user_input(prompt):
# Calculate the dynamic height and width for the input window
@@ -10,35 +11,46 @@ def get_user_input(prompt):
# Create a new window for user input
input_win = curses.newwin(height, width, start_y, start_x)
input_win.bkgd(get_color("background"))
input_win.attrset(get_color("window_frame"))
input_win.border()
# Display the prompt
input_win.addstr(1, 2, prompt, curses.A_BOLD)
input_win.addstr(3, 2, "Enter value: ")
input_win.addstr(1, 2, prompt, get_color("settings_default", bold=True))
input_win.addstr(3, 2, "Enter value: ", get_color("settings_default"))
input_win.refresh()
# Check if "shortName" is in the prompt, and set max length accordingly
max_length = 4 if "shortName" in prompt else None
curses.curs_set(1)
user_input = ""
input_position = (3, 15) # Tuple for row and column
row, col = input_position # Unpack tuple
while True:
key = input_win.getch(3, 15 + len(user_input)) # Adjust cursor position dynamically
if key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
key = input_win.get_wch(row, col + len(user_input)) # Adjust cursor position dynamically
if key == chr(27) or key == curses.KEY_LEFT: # ESC or Left Arrow
curses.curs_set(0)
return None # Exit without returning a value
elif key == ord('\n'): # Enter key
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
break
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace
elif key in (curses.KEY_BACKSPACE, chr(127)): # Backspace
user_input = user_input[:-1]
input_win.addstr(3, 15, " " * (len(user_input) + 1)) # Clear the line
input_win.addstr(3, 15, user_input)
else:
user_input += chr(key)
input_win.addstr(3, 15, user_input)
input_win.addstr(row, col, " " * (len(user_input) + 1), get_color("settings_default")) # Clear the line
input_win.addstr(row, col, user_input, get_color("settings_default"))
elif max_length is None or len(user_input) < max_length: # Enforce max length if applicable
# Append typed character to input text
if(isinstance(key, str)):
user_input += key
else:
user_input += chr(key)
input_win.addstr(3, 15, user_input, get_color("settings_default"))
curses.curs_set(0)
# Clear the input window
input_win.clear()
input_win.erase()
input_win.refresh()
return user_input
@@ -54,26 +66,35 @@ def get_bool_selection(message, current_value):
start_x = (curses.COLS - width) // 2
bool_win = curses.newwin(height, width, start_y, start_x)
bool_win.bkgd(get_color("background"))
bool_win.attrset(get_color("window_frame"))
bool_win.keypad(True)
bool_win.erase()
bool_win.border()
bool_win.addstr(1, 2, message, get_color("settings_default", bold=True))
for idx, option in enumerate(options):
if idx == selected_index:
bool_win.addstr(idx + 3, 4, option, get_color("settings_default", reverse=True))
else:
bool_win.addstr(idx + 3, 4, option, get_color("settings_default"))
bool_win.refresh()
while True:
bool_win.clear()
bool_win.border()
bool_win.addstr(1, 2, message, curses.A_BOLD)
for idx, option in enumerate(options):
if idx == selected_index:
bool_win.addstr(idx + 3, 4, option, curses.A_REVERSE)
else:
bool_win.addstr(idx + 3, 4, option)
bool_win.refresh()
key = bool_win.getch()
if key == curses.KEY_UP:
selected_index = max(0, selected_index - 1)
if(selected_index > 0):
selected_index = selected_index - 1
bool_win.chgat(1 + 3, 4, len(options[1]), get_color("settings_default"))
bool_win.chgat(0 + 3, 4, len(options[0]), get_color("settings_default", reverse = True))
elif key == curses.KEY_DOWN:
selected_index = min(len(options) - 1, selected_index + 1)
if(selected_index < len(options) - 1):
selected_index = selected_index + 1
bool_win.chgat(0 + 3, 4, len(options[0]), get_color("settings_default"))
bool_win.chgat(1 + 3, 4, len(options[1]), get_color("settings_default", reverse = True))
elif key == ord('\n'): # Enter key
return options[selected_index]
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
@@ -87,6 +108,8 @@ def get_repeated_input(current_value):
start_x = (curses.COLS - width) // 2
repeated_win = curses.newwin(height, width, start_y, start_x)
repeated_win.bkgd(get_color("background"))
repeated_win.attrset(get_color("window_frame"))
repeated_win.keypad(True) # Enable keypad for special keys
curses.echo()
@@ -94,11 +117,11 @@ def get_repeated_input(current_value):
user_input = ""
while True:
repeated_win.clear()
repeated_win.erase()
repeated_win.border()
repeated_win.addstr(1, 2, "Enter comma-separated values:", curses.A_BOLD)
repeated_win.addstr(3, 2, f"Current: {', '.join(map(str, current_value))}")
repeated_win.addstr(5, 2, f"New value: {user_input}")
repeated_win.addstr(1, 2, "Enter comma-separated values:", get_color("settings_default", bold=True))
repeated_win.addstr(3, 2, f"Current: {', '.join(map(str, current_value))}", get_color("settings_default"))
repeated_win.addstr(5, 2, f"New value: {user_input}", get_color("settings_default"))
repeated_win.refresh()
key = repeated_win.getch()
@@ -119,6 +142,22 @@ def get_repeated_input(current_value):
except ValueError:
pass # Ignore invalid character inputs
def move_highlight(old_idx, new_idx, options, enum_win, enum_pad):
if old_idx == new_idx:
return # no-op
enum_pad.chgat(old_idx, 0, enum_pad.getmaxyx()[1], get_color("settings_default"))
enum_pad.chgat(new_idx, 0, enum_pad.getmaxyx()[1], get_color("settings_default", reverse = True))
enum_win.refresh()
start_index = max(0, new_idx - (enum_win.getmaxyx()[0] - 4))
enum_win.refresh()
enum_pad.refresh(start_index, 0,
enum_win.getbegyx()[0] + 2, enum_win.getbegyx()[1] + 4,
enum_win.getbegyx()[0] + enum_win.getmaxyx()[0] - 2, enum_win.getbegyx()[1] + 4 + enum_win.getmaxyx()[1] - 4)
def get_enum_input(options, current_value):
selected_index = options.index(current_value) if current_value in options else 0
@@ -128,26 +167,39 @@ def get_enum_input(options, current_value):
start_x = (curses.COLS - width) // 2
enum_win = curses.newwin(height, width, start_y, start_x)
enum_win.bkgd(get_color("background"))
enum_win.attrset(get_color("window_frame"))
enum_win.keypad(True)
enum_pad = curses.newpad(len(options) + 1, width - 8)
enum_pad.bkgd(get_color("background"))
enum_win.erase()
enum_win.border()
enum_win.addstr(1, 2, "Select an option:", get_color("settings_default", bold=True))
for idx, option in enumerate(options):
if idx == selected_index:
enum_pad.addstr(idx, 0, option.ljust(width - 8), get_color("settings_default", reverse=True))
else:
enum_pad.addstr(idx, 0, option.ljust(width - 8), get_color("settings_default"))
enum_win.refresh()
enum_pad.refresh(0, 0,
enum_win.getbegyx()[0] + 2, enum_win.getbegyx()[1] + 4,
enum_win.getbegyx()[0] + enum_win.getmaxyx()[0] - 2, enum_win.getbegyx()[1] + enum_win.getmaxyx()[1] - 4)
while True:
enum_win.clear()
enum_win.border()
enum_win.addstr(1, 2, "Select an option:", curses.A_BOLD)
for idx, option in enumerate(options):
if idx == selected_index:
enum_win.addstr(idx + 2, 4, option, curses.A_REVERSE)
else:
enum_win.addstr(idx + 2, 4, option)
enum_win.refresh()
key = enum_win.getch()
if key == curses.KEY_UP:
old_selected_index = selected_index
selected_index = max(0, selected_index - 1)
move_highlight(old_selected_index, selected_index, options, enum_win, enum_pad)
elif key == curses.KEY_DOWN:
old_selected_index = selected_index
selected_index = min(len(options) - 1, selected_index + 1)
move_highlight(old_selected_index, selected_index, options, enum_win, enum_pad)
elif key == ord('\n'): # Enter key
return options[selected_index]
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
@@ -163,6 +215,8 @@ def get_fixed32_input(current_value):
start_x = (curses.COLS - width) // 2
fixed32_win = curses.newwin(height, width, start_y, start_x)
fixed32_win.bkgd(get_color("background"))
fixed32_win.attrset(get_color("window_frame"))
fixed32_win.keypad(True)
curses.echo()
@@ -170,7 +224,7 @@ def get_fixed32_input(current_value):
user_input = ""
while True:
fixed32_win.clear()
fixed32_win.erase()
fixed32_win.border()
fixed32_win.addstr(1, 2, "Enter an IP address (xxx.xxx.xxx.xxx):", curses.A_BOLD)
fixed32_win.addstr(3, 2, f"Current: {current_value}")
@@ -204,4 +258,73 @@ def get_fixed32_input(current_value):
if char.isdigit() or char == ".":
user_input += char # Append only valid characters (digits or dots)
except ValueError:
pass # Ignore invalid inputs
pass # Ignore invalid inputs
def select_from_list(prompt, current_option, list_options):
"""
Displays a scrollable list of list_options for the user to choose from using a pad.
"""
selected_index = list_options.index(current_option) if current_option in list_options else 0
height = min(len(list_options) + 5, curses.LINES - 2)
width = 60
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
list_win = curses.newwin(height, width, start_y, start_x)
list_win.bkgd(get_color("background"))
list_win.attrset(get_color("window_frame"))
list_win.keypad(True)
list_pad = curses.newpad(len(list_options) + 1, width - 8)
list_pad.bkgd(get_color("background"))
# Render header
list_win.clear()
list_win.border()
list_win.addstr(1, 2, prompt, get_color("settings_default", bold=True))
# Render options on the pad
for idx, color in enumerate(list_options):
if idx == selected_index:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
else:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
# Initial refresh
list_win.refresh()
list_pad.refresh(0, 0,
list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2, list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4)
while True:
key = list_win.getch()
if key == curses.KEY_UP:
if selected_index > 0:
selected_index -= 1
elif key == curses.KEY_DOWN:
if selected_index < len(list_options) - 1:
selected_index += 1
elif key == curses.KEY_RIGHT or key == ord('\n'):
return list_options[selected_index]
elif key == curses.KEY_LEFT or key == 27: # ESC key
return current_option
# Refresh the pad with updated selection and scroll offset
for idx, color in enumerate(list_options):
if idx == selected_index:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
else:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
list_win.refresh()
list_pad.refresh(0, 0,
list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2, list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4)

View File

@@ -3,7 +3,7 @@
'''
Contact - A Console UI for Meshtastic by http://github.com/pdxlocations
Powered by Meshtastic.org
V 1.0.3
V 1.2.0
'''
import curses
@@ -18,6 +18,7 @@ from message_handlers.rx_handler import on_receive
from ui.curses_ui import main_ui, draw_splash
from utilities.utils import get_channels, get_node_list, get_nodeNum
from db_handler import init_nodedb, load_messages_from_db
import default_config as config
import globals
# Set environment variables for ncurses compatibility
@@ -28,7 +29,7 @@ os.environ["LANG"] = "C.UTF-8"
# Configure logging
# Run `tail -f client.log` in another terminal to view live
logging.basicConfig(
filename=globals.log_file_path,
filename=config.log_file_path,
level=logging.INFO, # DEBUG, INFO, WARNING, ERROR, CRITICAL)
format="%(asctime)s - %(levelname)s - %(message)s"
)
@@ -38,13 +39,17 @@ def main(stdscr):
draw_splash(stdscr)
parser = setup_parser()
args = parser.parse_args()
logging.info("Initializing interface %s", args)
globals.interface = initialize_interface(args)
logging.info("Interface initialized")
globals.myNodeNum = get_nodeNum()
globals.channel_list = get_channels()
globals.node_list = get_node_list()
pub.subscribe(on_receive, 'meshtastic.receive')
init_nodedb()
load_messages_from_db()
logging.info("Starting main UI")
main_ui(stdscr)
except Exception as e:
logging.error("An error occurred: %s", e)

View File

@@ -1,14 +1,15 @@
from meshtastic import BROADCAST_NUM
from utilities.utils import get_node_list, decimal_to_hex, get_name_from_number
import globals
import logging
import time
from utilities.utils import get_node_list
from ui.curses_ui import draw_packetlog_win, draw_node_list, draw_messages_window, draw_channel_list, add_notification
from db_handler import save_message_to_db, maybe_store_nodeinfo_in_db
from db_handler import save_message_to_db, maybe_store_nodeinfo_in_db, get_name_from_database
import default_config as config
import globals
from datetime import datetime
def on_receive(packet, interface):
global nodes_win
# Update packet log
globals.packet_buffer.append(packet)
@@ -62,13 +63,13 @@ def on_receive(packet, interface):
# Add received message to the messages list
message_from_id = packet['from']
message_from_string = get_name_from_number(message_from_id, type='short') + ":"
message_from_string = get_name_from_database(message_from_id, type='short') + ":"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
# Timestamp handling
current_timestamp = int(packet['rxTime']) # Use the packet's rxTime for timestamp
current_timestamp = time.time()
current_hour = datetime.fromtimestamp(current_timestamp).strftime('%Y-%m-%d %H:00')
# Retrieve the last timestamp if available
@@ -88,7 +89,7 @@ def on_receive(packet, interface):
if last_hour != current_hour:
globals.all_messages[globals.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
globals.all_messages[globals.channel_list[channel_number]].append((f"{globals.message_prefix} {message_from_string} ", message_string))
globals.all_messages[globals.channel_list[channel_number]].append((f"{config.message_prefix} {message_from_string} ", message_string))
if refresh_channels:
draw_channel_list()
@@ -98,4 +99,4 @@ def on_receive(packet, interface):
save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string)
except KeyError as e:
print(f"Error processing packet: {e}")
logging.error(f"Error processing packet: {e}")

View File

@@ -1,10 +1,11 @@
from datetime import datetime
from meshtastic import BROADCAST_NUM
from db_handler import save_message_to_db, update_ack_nak
from meshtastic.protobuf import mesh_pb2, portnums_pb2
from utilities.utils import get_name_from_number
import globals
import google.protobuf.json_format
from meshtastic import BROADCAST_NUM
from meshtastic.protobuf import mesh_pb2, portnums_pb2
from db_handler import save_message_to_db, update_ack_nak, get_name_from_database
import default_config as config
import globals
ack_naks = {}
@@ -23,16 +24,16 @@ def onAckNak(packet):
ack_type = None
if(packet['decoded']['routing']['errorReason'] == "NONE"):
if(packet['from'] == globals.myNodeNum): # Ack "from" ourself means implicit ACK
confirm_string = globals.ack_implicit_str
confirm_string = config.ack_implicit_str
ack_type = "Implicit"
else:
confirm_string = globals.ack_str
confirm_string = config.ack_str
ack_type = "Ack"
else:
confirm_string = globals.nak_str
confirm_string = config.nak_str
ack_type = "Nak"
globals.all_messages[acknak['channel']][acknak['messageIndex']] = (globals.sent_message_prefix + confirm_string + ": ", message)
globals.all_messages[acknak['channel']][acknak['messageIndex']] = (config.sent_message_prefix + confirm_string + ": ", message)
update_ack_nak(acknak['channel'], acknak['timestamp'], message, ack_type)
@@ -55,18 +56,18 @@ def on_response_traceroute(packet):
msg_str = "Traceroute to:\n"
route_str = get_name_from_number(packet["to"], 'short') or f"{packet['to']:08x}" # Start with destination of response
route_str = get_name_from_database(packet["to"], 'short') or f"{packet['to']:08x}" # Start with destination of response
# SNR list should have one more entry than the route, as the final destination adds its SNR also
lenTowards = 0 if "route" not in msg_dict else len(msg_dict["route"])
snrTowardsValid = "snrTowards" in msg_dict and len(msg_dict["snrTowards"]) == lenTowards + 1
if lenTowards > 0: # Loop through hops in route and add SNR if available
for idx, node_num in enumerate(msg_dict["route"]):
route_str += " --> " + (get_name_from_number(node_num, 'short') or f"{node_num:08x}") \
route_str += " --> " + (get_name_from_database(node_num, 'short') or f"{node_num:08x}") \
+ " (" + (str(msg_dict["snrTowards"][idx] / 4) if snrTowardsValid and msg_dict["snrTowards"][idx] != UNK_SNR else "?") + "dB)"
# End with origin of response
route_str += " --> " + (get_name_from_number(packet["from"], 'short') or f"{packet['from']:08x}") \
route_str += " --> " + (get_name_from_database(packet["from"], 'short') or f"{packet['from']:08x}") \
+ " (" + (str(msg_dict["snrTowards"][-1] / 4) if snrTowardsValid and msg_dict["snrTowards"][-1] != UNK_SNR else "?") + "dB)"
msg_str += route_str + "\n" # Print the route towards destination
@@ -76,15 +77,15 @@ def on_response_traceroute(packet):
backValid = "hopStart" in packet and "snrBack" in msg_dict and len(msg_dict["snrBack"]) == lenBack + 1
if backValid:
msg_str += "Back:\n"
route_str = get_name_from_number(packet["from"], 'short') or f"{packet['from']:08x}" # Start with origin of response
route_str = get_name_from_database(packet["from"], 'short') or f"{packet['from']:08x}" # Start with origin of response
if lenBack > 0: # Loop through hops in routeBack and add SNR if available
for idx, node_num in enumerate(msg_dict["routeBack"]):
route_str += " --> " + (get_name_from_number(node_num, 'short') or f"{node_num:08x}") \
route_str += " --> " + (get_name_from_database(node_num, 'short') or f"{node_num:08x}") \
+ " (" + (str(msg_dict["snrBack"][idx] / 4) if msg_dict["snrBack"][idx] != UNK_SNR else "?") + "dB)"
# End with destination of response (us)
route_str += " --> " + (get_name_from_number(packet["to"], 'short') or f"{packet['to']:08x}") \
route_str += " --> " + (get_name_from_database(packet["to"], 'short') or f"{packet['to']:08x}") \
+ " (" + (str(msg_dict["snrBack"][-1] / 4) if msg_dict["snrBack"][-1] != UNK_SNR else "?") + "dB)"
msg_str += route_str + "\n" # Print the route back to us
@@ -101,11 +102,11 @@ def on_response_traceroute(packet):
add_notification(channel_number)
refresh_channels = True
message_from_string = get_name_from_number(packet['from'], type='short') + ":\n"
message_from_string = get_name_from_database(packet['from'], type='short') + ":\n"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
globals.all_messages[globals.channel_list[channel_number]].append((f"{globals.message_prefix} {message_from_string}", msg_str))
globals.all_messages[globals.channel_list[channel_number]].append((f"{config.message_prefix} {message_from_string}", msg_str))
if refresh_channels:
draw_channel_list()
@@ -158,7 +159,7 @@ def send_message(message, destination=BROADCAST_NUM, channel=0):
if last_hour != current_hour:
globals.all_messages[channel_id].append((f"-- {current_hour} --", ""))
globals.all_messages[channel_id].append((globals.sent_message_prefix + globals.ack_unknown_str + ": ", message))
globals.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
timestamp = save_message_to_db(channel_id, myid, message)

View File

@@ -15,10 +15,10 @@ def settings_shutdown(interface):
def settings_factory_reset(interface):
interface.localNode.factoryReset()
def settings_set_owner(interface, long_name=None, short_name=None, is_licensed=False):
if isinstance(is_licensed, str):
is_licensed = is_licensed.lower() == 'true'
interface.localNode.setOwner(long_name, short_name, is_licensed)
# def settings_set_owner(interface, long_name=None, short_name=None, is_licensed=False):
# if isinstance(is_licensed, str):
# is_licensed = is_licensed.lower() == 'true'
# interface.localNode.setOwner(long_name, short_name, is_licensed)
def save_changes(interface, menu_path, modified_settings):
@@ -38,12 +38,24 @@ def save_changes(interface, menu_path, modified_settings):
if menu_path[1] == "Radio Settings" or menu_path[1] == "Module Settings":
config_category = menu_path[2].lower() # for radio and module configs
if {'latitude', 'longitude', 'altitude'} & modified_settings.keys():
lat = float(modified_settings.get('latitude', 0.0))
lon = float(modified_settings.get('longitude', 0.0))
alt = int(modified_settings.get('altitude', 0))
interface.localNode.setFixedPosition(lat, lon, alt)
logging.info(f"Updated {config_category} with Latitude: {lat} and Longitude {lon} and Altitude {alt}")
return
elif menu_path[1] == "User Settings": # for user configs
config_category = "User Settings"
long_name = modified_settings.get("longName")
short_name = modified_settings.get("shortName")
is_licensed = modified_settings.get("isLicensed")
is_licensed = is_licensed == "True" or is_licensed is True
node.setOwner(long_name, short_name, is_licensed)
logging.info(f"Updated {config_category} with Long Name: {long_name} and Short Name {short_name} and Licensed Mode {is_licensed}")
return
@@ -52,7 +64,7 @@ def save_changes(interface, menu_path, modified_settings):
try:
channel = menu_path[-1]
channel_num = int(channel.split()[-1])
channel_num = int(channel.split()[-1]) - 1
except (IndexError, ValueError) as e:
channel_num = None

View File

@@ -1,35 +1,45 @@
import curses
import logging
import os
from save_to_radio import settings_factory_reset, settings_reboot, settings_reset_nodedb, settings_shutdown, save_changes
from utilities.config_io import config_export, config_import
from input_handlers import get_bool_selection, get_repeated_input, get_user_input, get_enum_input, get_fixed32_input, select_from_list
from ui.menus import generate_menu_from_protobuf
from input_handlers import get_bool_selection, get_repeated_input, get_user_input, get_enum_input, get_fixed32_input
from ui.colors import setup_colors
from ui.colors import setup_colors, get_color
from utilities.arg_parser import setup_parser
from utilities.interfaces import initialize_interface
from user_config import json_editor
import globals
width = 60
save_option = "Save Changes"
sensitive_settings = ["Reboot", "Reset Node DB", "Shutdown", "Factory Reset"]
def display_menu(current_menu, menu_path, selected_index, show_save_option):
global menu_win
# Calculate the dynamic height based on the number of menu items
num_items = len(current_menu) + (1 if show_save_option else 0) # Add 1 for the "Save Changes" option if applicable
height = min(curses.LINES - 2, num_items + 5) # Ensure the menu fits within the terminal height
width = 60
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
# Create a new curses window with dynamic dimensions
menu_win = curses.newwin(height, width, start_y, start_x)
menu_win.clear()
menu_win.erase()
menu_win.bkgd(get_color("background"))
menu_win.attrset(get_color("window_frame"))
menu_win.border()
menu_win.keypad(True)
menu_pad = curses.newpad(len(current_menu) + 1, width - 8)
menu_pad.bkgd(get_color("background"))
# Display the current menu path as a header
header = " > ".join(word.title() for word in menu_path)
if len(header) > width - 4:
header = header[:width - 7] + "..."
menu_win.addstr(1, 2, header, curses.A_BOLD)
menu_win.addstr(1, 2, header, get_color("settings_breadcrumbs", bold=True))
# Display the menu options
for idx, option in enumerate(current_menu):
@@ -40,60 +50,105 @@ def display_menu(current_menu, menu_path, selected_index, show_save_option):
try:
# Use red color for "Reboot" or "Shutdown"
color = curses.color_pair(5) if option in ["Reboot", "Reset Node DB", "Shutdown", "Factory Reset"] else curses.color_pair(1)
if idx == selected_index:
menu_win.addstr(idx + 3, 4, f"{display_option:<{width // 2 - 2}} {display_value}", curses.A_REVERSE | color)
else:
menu_win.addstr(idx + 3, 4, f"{display_option:<{width // 2 - 2}} {display_value}", color)
color = get_color("settings_sensitive" if option in sensitive_settings else "settings_default", reverse = (idx == selected_index))
menu_pad.addstr(idx, 0, f"{display_option:<{width // 2 - 2}} {display_value}".ljust(width - 8), color)
except curses.error:
pass
# Show save option if applicable
if show_save_option:
save_option = "Save Changes"
save_position = height - 2
if selected_index == len(current_menu):
menu_win.addstr(save_position, (width - len(save_option)) // 2, save_option, curses.color_pair(2) | curses.A_REVERSE)
else:
menu_win.addstr(save_position, (width - len(save_option)) // 2, save_option, curses.color_pair(2))
menu_win.addstr(save_position, (width - len(save_option)) // 2, save_option, get_color("settings_save", reverse = (selected_index == len(current_menu))))
menu_win.refresh()
menu_pad.refresh(0, 0,
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + 3 + menu_win.getmaxyx()[0] - 5 - (2 if show_save_option else 0), menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 8)
return menu_win, menu_pad
def move_highlight(old_idx, new_idx, options, show_save_option, menu_win, menu_pad):
if(old_idx == new_idx): # no-op
return
max_index = len(options) + (1 if show_save_option else 0) - 1
if show_save_option and old_idx == max_index: # special case un-highlight "Save" option
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save"))
else:
menu_pad.chgat(old_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive") if options[old_idx] in sensitive_settings else get_color("settings_default"))
if show_save_option and new_idx == max_index: # special case highlight "Save" option
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save", reverse = True))
else:
menu_pad.chgat(new_idx, 0,menu_pad.getmaxyx()[1], get_color("settings_sensitive", reverse=True) if options[new_idx] in sensitive_settings else get_color("settings_default", reverse = True))
menu_win.refresh()
start_index = max(0, new_idx - (menu_win.getmaxyx()[0] - 5 - (2 if show_save_option else 0)) - (1 if show_save_option and new_idx == max_index else 0)) # Leave room for borders
menu_pad.refresh(start_index, 0,
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + 3 + menu_win.getmaxyx()[0] - 5 - (2 if show_save_option else 0), menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 8)
def settings_menu(stdscr, interface):
curses.update_lines_cols()
menu = generate_menu_from_protobuf(interface)
current_menu = menu["Main Menu"]
menu_path = ["Main Menu"]
menu_index = []
selected_index = 0
modified_settings = {}
need_redraw = True
show_save_option = False
while True:
options = list(current_menu.keys())
if(need_redraw):
options = list(current_menu.keys())
show_save_option = (
len(menu_path) > 2 and ("Radio Settings" in menu_path or "Module Settings" in menu_path)
) or (
len(menu_path) == 2 and "User Settings" in menu_path
) or (
len(menu_path) == 3 and "Channels" in menu_path
)
show_save_option = (
len(menu_path) > 2 and ("Radio Settings" in menu_path or "Module Settings" in menu_path)
) or (
len(menu_path) == 2 and "User Settings" in menu_path
) or (
len(menu_path) == 3 and "Channels" in menu_path
)
# Display the menu
display_menu(current_menu, menu_path, selected_index, show_save_option)
# Display the menu
menu_win, menu_pad = display_menu(current_menu, menu_path, selected_index, show_save_option)
need_redraw = False
# Capture user input
key = menu_win.getch()
max_index = len(options) + (1 if show_save_option else 0) - 1
if key == curses.KEY_UP:
selected_index = max(0, selected_index - 1)
old_selected_index = selected_index
selected_index = max_index if selected_index == 0 else selected_index - 1
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win, menu_pad)
elif key == curses.KEY_DOWN:
max_index = len(options) + (1 if show_save_option else 0) - 1
selected_index = min(max_index, selected_index + 1)
old_selected_index = selected_index
selected_index = 0 if selected_index == max_index else selected_index + 1
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win, menu_pad)
elif key == curses.KEY_RESIZE:
need_redraw = True
curses.update_lines_cols()
elif key == ord("\t") and show_save_option:
old_selected_index = selected_index
selected_index = max_index
move_highlight(old_selected_index, selected_index, options, show_save_option, menu_win, menu_pad)
elif key == curses.KEY_RIGHT or key == ord('\n'):
menu_win.clear()
need_redraw = True
menu_win.erase()
menu_win.refresh()
if show_save_option and selected_index == len(options):
save_changes(interface, menu_path, modified_settings)
@@ -113,30 +168,97 @@ def settings_menu(stdscr, interface):
if selected_option == "Exit":
break
elif selected_option == "Export Config":
filename = get_user_input("Enter a filename for the config file")
if not filename:
logging.warning("Export aborted: No filename provided.")
continue # Go back to the menu
if not filename.lower().endswith(".yaml"):
filename += ".yaml"
try:
config_text = config_export(globals.interface)
app_directory = os.path.dirname(os.path.abspath(__file__))
config_folder = "node-configs"
yaml_file_path = os.path.join(app_directory, config_folder, filename)
if os.path.exists(yaml_file_path):
overwrite = get_bool_selection(f"{filename} already exists. Overwrite?", None)
if overwrite == "False":
logging.info("Export cancelled: User chose not to overwrite.")
continue # Return to menu
os.makedirs(os.path.dirname(yaml_file_path), exist_ok=True)
with open(yaml_file_path, "w", encoding="utf-8") as file:
file.write(config_text)
logging.info(f"Config file saved to {yaml_file_path}")
break
except PermissionError:
logging.error(f"Permission denied: Unable to write to {yaml_file_path}")
except OSError as e:
logging.error(f"OS error while saving config: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
continue
elif selected_option == "Load Config":
app_directory = os.path.dirname(os.path.abspath(__file__))
config_folder = "node-configs"
folder_path = os.path.join(app_directory, config_folder)
file_list = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]
filename = select_from_list("Choose a config file", None, file_list)
if filename:
file_path = os.path.join(app_directory, config_folder, filename)
overwrite = get_bool_selection(f"Are you sure you want to load {filename}?", None)
if overwrite == "True":
config_import(globals.interface, file_path)
break
continue
elif selected_option == "Reboot":
confirmation = get_bool_selection("Are you sure you want to Reboot?", 0)
if confirmation == "True":
settings_reboot(interface)
logging.info(f"Node Reboot Requested by menu")
break
continue
elif selected_option == "Reset Node DB":
confirmation = get_bool_selection("Are you sure you want to Reset Node DB?", 0)
if confirmation == "True":
settings_reset_nodedb(interface)
logging.info(f"Node DB Reset Requested by menu")
break
continue
elif selected_option == "Shutdown":
confirmation = get_bool_selection("Are you sure you want to Shutdown?", 0)
if confirmation == "True":
settings_shutdown(interface)
logging.info(f"Node Shutdown Requested by menu")
break
continue
elif selected_option == "Factory Reset":
confirmation = get_bool_selection("Are you sure you want to Factory Reset?", 0)
if confirmation == "True":
settings_factory_reset(interface)
logging.info(f"Factory Reset Requested by menu")
break
continue
elif selected_option == "App Settings":
menu_win.clear()
menu_win.refresh()
json_editor(stdscr) # Open the App Settings menu
continue
# need_redraw = True
field_info = current_menu.get(selected_option)
if isinstance(field_info, tuple):
@@ -145,6 +267,7 @@ def settings_menu(stdscr, interface):
if selected_option in ['longName', 'shortName', 'isLicensed']:
if selected_option in ['longName', 'shortName']:
new_value = get_user_input(f"Current value for {selected_option}: {current_value}")
new_value = current_value if new_value is None else new_value
current_menu[selected_option] = (field, new_value)
elif selected_option == 'isLicensed':
@@ -155,17 +278,27 @@ def settings_menu(stdscr, interface):
for option, (field, value) in current_menu.items():
modified_settings[option] = value
elif selected_option in ['latitude', 'longitude', 'altitude']:
new_value = get_user_input(f"Current value for {selected_option}: {current_value}")
new_value = current_value if new_value is None else new_value
current_menu[selected_option] = (field, new_value)
for option in ['latitude', 'longitude', 'altitude']:
if option in current_menu:
modified_settings[option] = current_menu[option][1]
elif field.type == 8: # Handle boolean type
new_value = get_bool_selection(selected_option, str(current_value))
new_value = new_value == "True"
new_value = new_value == "True" or new_value is True
elif field.label == field.LABEL_REPEATED: # Handle repeated field
new_value = get_repeated_input(current_value)
new_value = current_value if new_value is None else [int(item) for item in new_value]
elif field.enum_type: # Enum field
enum_options = [v.name for v in field.enum_type.values]
new_value = get_enum_input(enum_options, current_value)
enum_options = {v.name: v.number for v in field.enum_type.values}
new_value_name = get_enum_input(list(enum_options.keys()), current_value)
new_value = enum_options.get(new_value_name, current_value)
elif field.type == 7: # Field type 7 corresponds to FIXED32
new_value = get_fixed32_input(current_value)
@@ -188,18 +321,26 @@ def settings_menu(stdscr, interface):
# Add the new value to the appropriate level
modified_settings[selected_option] = new_value
# Convert enum string to int
if field and field.enum_type:
enum_value_descriptor = field.enum_type.values_by_number.get(new_value)
new_value = enum_value_descriptor.name if enum_value_descriptor else new_value
current_menu[selected_option] = (field, new_value)
else:
current_menu = current_menu[selected_option]
menu_path.append(selected_option)
menu_index.append(selected_index)
selected_index = 0
elif key == curses.KEY_LEFT:
need_redraw = True
menu_win.clear()
menu_win.erase()
menu_win.refresh()
modified_settings.clear()
if len(menu_path) < 2:
modified_settings.clear()
# Navigate back to the previous menu
if len(menu_path) > 1:
@@ -207,10 +348,10 @@ def settings_menu(stdscr, interface):
current_menu = menu["Main Menu"]
for step in menu_path[1:]:
current_menu = current_menu.get(step, {})
selected_index = 0
selected_index = menu_index.pop()
elif key == 27: # Escape key
menu_win.clear()
menu_win.erase()
menu_win.refresh()
break
@@ -232,4 +373,4 @@ def main(stdscr):
settings_menu(stdscr, globals.interface)
if __name__ == "__main__":
curses.wrapper(main)
curses.wrapper(main)

View File

@@ -1,9 +1,43 @@
import curses
import default_config as config
def setup_colors():
COLOR_MAP = {
"black": curses.COLOR_BLACK,
"red": curses.COLOR_RED,
"green": curses.COLOR_GREEN,
"yellow": curses.COLOR_YELLOW,
"blue": curses.COLOR_BLUE,
"magenta": curses.COLOR_MAGENTA,
"cyan": curses.COLOR_CYAN,
"white": curses.COLOR_WHITE
}
def setup_colors(reinit=False):
"""
Initialize curses color pairs based on the COLOR_CONFIG.
"""
curses.start_color()
curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_CYAN, curses.COLOR_BLACK)
curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_BLACK)
curses.init_pair(5, curses.COLOR_RED, curses.COLOR_BLACK)
if reinit:
conf = config.initialize_config()
config.assign_config_variables(conf)
for idx, (category, (fg_name, bg_name)) in enumerate(config.COLOR_CONFIG.items(), start=1):
fg = COLOR_MAP.get(fg_name.lower(), curses.COLOR_WHITE)
bg = COLOR_MAP.get(bg_name.lower(), curses.COLOR_BLACK)
curses.init_pair(idx, fg, bg)
config.COLOR_CONFIG[category] = idx
print()
def get_color(category, bold=False, reverse=False, underline=False):
"""
Retrieve a curses color pair with optional attributes.
"""
color = curses.color_pair(config.COLOR_CONFIG[category])
if bold:
color |= curses.A_BOLD
if reverse:
color |= curses.A_REVERSE
if underline:
color |= curses.A_UNDERLINE
return color

View File

@@ -1,14 +1,57 @@
import curses
import textwrap
import globals
from utilities.utils import get_name_from_number, get_channels
from utilities.utils import get_channels, get_time_ago
from settings import settings_menu
from message_handlers.tx_handler import send_message, send_traceroute
from ui.colors import setup_colors, get_color
from db_handler import get_name_from_database
import default_config as config
import ui.dialog
from ui.colors import setup_colors
import globals
def draw_node_details():
nodes_snapshot = list(globals.interface.nodes.values())
node = None
for node in nodes_snapshot:
if globals.node_list[globals.selected_node] == node['num']:
break
function_win.erase()
function_win.box()
nodestr = ""
width = function_win.getmaxyx()[1]
node_details_list = [f"{node['user']['longName']}"
if 'user' in node and 'longName' in node['user'] else "",
f"({node['user']['shortName']})"
if 'user' in node and 'shortName' in node['user'] else "",
f" | {node['user']['hwModel']}"
if 'user' in node and 'hwModel' in node['user'] else "",
f" | {get_time_ago(node['lastHeard'])}" if ('lastHeard' in node and node['lastHeard']) else "",
f" | Hops: {node['hopsAway']}" if 'hopsAway' in node else "",
f" | SNR: {node['snr']}dB"
if ('snr' in node and 'hopsAway' in node and node['hopsAway'] == 0)
else "",
]
for s in node_details_list:
if len(nodestr) + len(s) < width:
nodestr = nodestr + s
draw_centered_text_field(function_win, nodestr, 0, get_color("commands"))
def draw_function_win():
cmds = ["↑→↓← = Select", " ENTER = Send", " ` = Settings", " ^P = Packet Log", " ESC = Quit"]
function_str = ""
for s in cmds:
if(len(function_str) + len(s) < function_win.getmaxyx()[1]):
function_str += s
draw_centered_text_field(function_win, function_str, 0, get_color("commands"))
def get_msg_window_lines():
packetlog_height = packetlog_win.getmaxyx()[0] if globals.display_log else 0
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if globals.display_log else 0
return messages_box.getmaxyx()[0] - 2 - packetlog_height
def refresh_pad(window):
@@ -27,6 +70,10 @@ def refresh_pad(window):
selected_item = globals.selected_message
start_index = globals.selected_message
if globals.display_log:
packetlog_win.box()
packetlog_win.refresh()
if(window == 2):
pad = nodes_pad
box = nodes_box
@@ -42,24 +89,27 @@ def refresh_pad(window):
def highlight_line(highlight, window, line):
pad = channel_pad
select_len = 0
color = curses.color_pair(1)
ch_color = get_color("channel_list")
nd_color = get_color("node_list")
if(window == 2):
pad = nodes_pad
select_len = len(get_name_from_number(globals.node_list[line], "long"))
select_len = len(get_name_from_database(globals.node_list[line], "long"))
pad.chgat(line, 1, select_len, nd_color | curses.A_REVERSE if highlight else nd_color)
if(window == 0):
channel = list(globals.all_messages.keys())[line]
win_width = channel_box.getmaxyx()[1]
if(isinstance(channel, int)):
channel = get_name_from_number(channel, type="long")
channel = get_name_from_database(channel, type="long")
select_len = min(len(channel), win_width - 4)
if line == globals.selected_channel and highlight == False:
color = curses.color_pair(2)
ch_color = get_color("channel_selected")
pad.chgat(line, 1, select_len, color | curses.A_REVERSE if highlight else color)
pad.chgat(line, 1, select_len, ch_color | curses.A_REVERSE if highlight else ch_color)
def add_notification(channel_number):
globals.notifications.add(channel_number)
@@ -67,15 +117,15 @@ def add_notification(channel_number):
def remove_notification(channel_number):
globals.notifications.discard(channel_number)
def draw_text_field(win, text):
def draw_text_field(win, text, color):
win.border()
win.addstr(1, 1, text)
win.addstr(1, 1, text, color)
def draw_centered_text_field(win, text, y_offset = 0):
def draw_centered_text_field(win, text, y_offset, color):
height, width = win.getmaxyx()
x = (width - len(text)) // 2
y = (height // 2) + y_offset
win.addstr(y, x, text)
win.addstr(y, x, text, color)
win.refresh()
def draw_debug(value):
@@ -87,6 +137,8 @@ def draw_splash(stdscr):
curses.curs_set(0)
stdscr.clear()
stdscr.bkgd(get_color("background"))
height, width = stdscr.getmaxyx()
message_1 = "/ Λ"
message_2 = "/ / \\"
@@ -96,17 +148,19 @@ def draw_splash(stdscr):
start_x = width // 2 - len(message_1) // 2
start_x2 = width // 2 - len(message_4) // 2
start_y = height // 2 - 1
stdscr.addstr(start_y, start_x, message_1, curses.color_pair(2) | curses.A_BOLD)
stdscr.addstr(start_y+1, start_x-1, message_2, curses.color_pair(2) | curses.A_BOLD)
stdscr.addstr(start_y+2, start_x-2, message_3, curses.color_pair(2) | curses.A_BOLD)
stdscr.addstr(start_y+4, start_x2, message_4)
stdscr.addstr(start_y, start_x, message_1, get_color("splash_logo", bold=True))
stdscr.addstr(start_y+1, start_x-1, message_2, get_color("splash_logo", bold=True))
stdscr.addstr(start_y+2, start_x-2, message_3, get_color("splash_logo", bold=True))
stdscr.addstr(start_y+4, start_x2, message_4, get_color("splash_text"))
stdscr.attrset(get_color("window_frame"))
stdscr.box()
stdscr.refresh()
curses.napms(500)
def draw_channel_list():
channel_pad.clear()
channel_pad.erase()
win_height, win_width = channel_box.getmaxyx()
start_index = max(0, globals.selected_channel - (win_height - 3)) # Leave room for borders
@@ -115,32 +169,32 @@ def draw_channel_list():
for i, channel in enumerate(list(globals.all_messages.keys())):
# Convert node number to long name if it's an integer
if isinstance(channel, int):
channel = get_name_from_number(channel, type='long')
channel = get_name_from_database(channel, type='long')
# Determine whether to add the notification
notification = " " + globals.notification_symbol if i in globals.notifications else ""
notification = " " + config.notification_symbol if i in globals.notifications else ""
# Truncate the channel name if it's too long to fit in the window
truncated_channel = channel[:win_width - 5] + '-' if len(channel) > win_width - 5 else channel
if i == globals.selected_channel:
if globals.current_window == 0:
channel_pad.addstr(i, 1, truncated_channel + notification, curses.color_pair(1) | curses.A_REVERSE)
channel_pad.addstr(i, 1, truncated_channel + notification, get_color("channel_list", reverse=True))
remove_notification(globals.selected_channel)
else:
channel_pad.addstr(i, 1, truncated_channel + notification, curses.color_pair(2))
channel_pad.addstr(i, 1, truncated_channel + notification, get_color("channel_selected"))
else:
channel_pad.addstr(i, 1, truncated_channel + notification, curses.color_pair(1))
channel_pad.addstr(i, 1, truncated_channel + notification, get_color("channel_list"))
channel_box.attrset(curses.color_pair(2) if globals.current_window == 0 else curses.color_pair(0))
channel_box.attrset(get_color("window_frame_selected") if globals.current_window == 0 else get_color("window_frame"))
channel_box.box()
channel_box.attrset(curses.color_pair(0))
channel_box.attrset((get_color("window_frame")))
channel_box.refresh()
refresh_pad(0)
def draw_messages_window(scroll_to_bottom = False):
"""Update the messages window based on the selected channel and scroll position."""
messages_pad.clear()
messages_pad.erase()
channel = globals.channel_list[globals.selected_channel]
@@ -157,13 +211,19 @@ def draw_messages_window(scroll_to_bottom = False):
messages_pad.resize(msg_line_count, messages_box.getmaxyx()[1])
for line in wrapped_lines:
color = curses.color_pair(1) if prefix.startswith("--") else (curses.color_pair(3) if prefix.startswith(globals.sent_message_prefix) else curses.color_pair(2))
if prefix.startswith("--"):
color = get_color("timestamps")
elif prefix.startswith(config.sent_message_prefix):
color = get_color("tx_messages")
else:
color = get_color("rx_messages")
messages_pad.addstr(row, 1, line, color)
row += 1
messages_box.attrset(curses.color_pair(2) if globals.current_window == 1 else curses.color_pair(0))
messages_box.attrset(get_color("window_frame_selected") if globals.current_window == 1 else get_color("window_frame"))
messages_box.box()
messages_box.attrset(curses.color_pair(0))
messages_box.attrset(get_color("window_frame"))
messages_box.refresh()
if(scroll_to_bottom):
@@ -176,21 +236,18 @@ def draw_messages_window(scroll_to_bottom = False):
draw_packetlog_win()
def draw_node_list():
nodes_pad.clear()
win_height = nodes_box.getmaxyx()[0]
start_index = max(0, globals.selected_node - (win_height - 3)) # Calculate starting index based on selected node and window height
nodes_pad.erase()
nodes_pad.resize(len(globals.node_list), nodes_box.getmaxyx()[1])
for i, node in enumerate(globals.node_list):
if globals.selected_node == i and globals.current_window == 2:
nodes_pad.addstr(i, 1, get_name_from_number(node, "long"), curses.color_pair(1) | curses.A_REVERSE)
nodes_pad.addstr(i, 1, get_name_from_database(node, "long"), get_color("node_list", reverse=True))
else:
nodes_pad.addstr(i, 1, get_name_from_number(node, "long"), curses.color_pair(1))
nodes_pad.addstr(i, 1, get_name_from_database(node, "long"), get_color("node_list"))
nodes_box.attrset(curses.color_pair(2) if globals.current_window == 2 else curses.color_pair(0))
nodes_box.attrset(get_color("window_frame_selected") if globals.current_window == 2 else get_color("window_frame"))
nodes_box.box()
nodes_box.attrset(curses.color_pair(0))
nodes_box.attrset(get_color("window_frame"))
nodes_box.refresh()
refresh_pad(2)
@@ -235,6 +292,8 @@ def select_node(idx):
highlight_line(True, 2, globals.selected_node)
refresh_pad(2)
draw_node_details()
def scroll_nodes(direction):
new_selected_node = globals.selected_node + direction
@@ -251,7 +310,7 @@ def draw_packetlog_win():
span = 0
if globals.display_log:
packetlog_win.clear()
packetlog_win.erase()
height, width = packetlog_win.getmaxyx()
for column in columns[:-1]:
@@ -259,17 +318,17 @@ def draw_packetlog_win():
# Add headers
headers = f"{'From':<{columns[0]}} {'To':<{columns[1]}} {'Port':<{columns[2]}} {'Payload':<{width-span}}"
packetlog_win.addstr(1, 1, headers[:width - 2],curses.A_UNDERLINE) # Truncate headers if they exceed window width
packetlog_win.addstr(1, 1, headers[:width - 2],get_color("log_header", underline=True)) # Truncate headers if they exceed window width
for i, packet in enumerate(reversed(globals.packet_buffer)):
if i >= height - 3: # Skip if exceeds the window height
break
# Format each field
from_id = get_name_from_number(packet['from'], 'short').ljust(columns[0])
from_id = get_name_from_database(packet['from'], 'short').ljust(columns[0])
to_id = (
"BROADCAST".ljust(columns[1]) if str(packet['to']) == "4294967295"
else get_name_from_number(packet['to'], 'short').ljust(columns[1])
else get_name_from_database(packet['to'], 'short').ljust(columns[1])
)
if 'decoded' in packet:
port = packet['decoded']['portnum'].ljust(columns[2])
@@ -283,67 +342,113 @@ def draw_packetlog_win():
logString = logString[:width - 3]
# Add to the window
packetlog_win.addstr(i + 2, 1, logString)
packetlog_win.addstr(i + 2, 1, logString, get_color("log"))
packetlog_win.attrset(get_color("window_frame"))
packetlog_win.box()
packetlog_win.refresh()
def main_ui(stdscr):
global messages_pad, messages_box, nodes_pad, nodes_box, channel_pad, channel_box, function_win, packetlog_win
stdscr.keypad(True)
get_channels()
def handle_resize(stdscr, firstrun):
global messages_pad, messages_box, nodes_pad, nodes_box, channel_pad, channel_box, function_win, packetlog_win, entry_win
# Calculate window max dimensions
height, width = stdscr.getmaxyx()
# Define window dimensions and positions
entry_win = curses.newwin(3, width, 0, 0)
channel_width = 3 * (width // 16)
nodes_width = 5 * (width // 16)
messages_width = width - channel_width - nodes_width
channel_box = curses.newwin(height - 6, channel_width, 3, 0)
messages_box = curses.newwin(height - 6, messages_width, 3, channel_width)
nodes_box = curses.newwin(height - 6, nodes_width, 3, channel_width + messages_width)
if firstrun:
entry_win = curses.newwin(3, width, 0, 0)
channel_box = curses.newwin(height - 6, channel_width, 3, 0)
messages_box = curses.newwin(height - 6, messages_width, 3, channel_width)
nodes_box = curses.newwin(height - 6, nodes_width, 3, channel_width + messages_width)
function_win = curses.newwin(3, width, height - 3, 0)
packetlog_win = curses.newwin(int(height / 3), messages_width, height - int(height / 3) - 3, channel_width)
# Will be resized to what we need when drawn
messages_pad = curses.newpad(1, 1)
nodes_pad = curses.newpad(1,1)
channel_pad = curses.newpad(1,1)
# Will be resized to what we need when drawn
messages_pad = curses.newpad(1, 1)
nodes_pad = curses.newpad(1,1)
channel_pad = curses.newpad(1,1)
function_win = curses.newwin(3, width, height - 3, 0)
packetlog_win = curses.newwin(int(height / 3), messages_width, height - int(height / 3) - 3, channel_width)
entry_win.bkgd(get_color("background"))
channel_box.bkgd(get_color("background"))
messages_box.bkgd(get_color("background"))
nodes_box.bkgd(get_color("background"))
messages_pad.bkgd(get_color("background"))
nodes_pad.bkgd(get_color("background"))
channel_pad.bkgd(get_color("background"))
function_win.bkgd(get_color("background"))
packetlog_win.bkgd(get_color("background"))
channel_box.attrset(get_color("window_frame"))
entry_win.attrset(get_color("window_frame"))
nodes_box.attrset(get_color("window_frame"))
messages_box.attrset(get_color("window_frame"))
function_win.attrset(get_color("window_frame"))
else:
entry_win.erase()
channel_box.erase()
messages_box.erase()
nodes_box.erase()
function_win.erase()
packetlog_win.erase()
entry_win.resize(3, width)
channel_box.resize(height - 6, channel_width)
messages_box.resize(height - 6, messages_width)
messages_box.mvwin(3, channel_width)
nodes_box.resize(height - 6, nodes_width)
nodes_box.mvwin(3, channel_width + messages_width)
function_win.resize(3, width)
function_win.mvwin(height - 3, 0)
packetlog_win.resize(int(height / 3), messages_width)
packetlog_win.mvwin(height - int(height / 3) - 3, channel_width)
draw_centered_text_field(function_win, f"↑→↓← = Select ENTER = Send ` = Settings ^P = Packet Log ESC = Quit")
# Draw boxes around windows
channel_box.attrset(curses.color_pair(2))
channel_box.box()
channel_box.attrset(curses.color_pair(0))
entry_win.box()
nodes_box.box()
messages_box.box()
function_win.box()
function_win.box()
# Refresh all windows
entry_win.refresh()
channel_box.refresh()
function_win.refresh()
nodes_box.refresh()
messages_box.refresh()
input_text = ""
entry_win.keypad(True)
curses.curs_set(1)
draw_channel_list()
draw_node_list()
draw_messages_window(True)
try:
draw_function_win()
draw_channel_list()
draw_messages_window(True)
draw_node_list()
except:
# Resize events can come faster than we can re-draw, which can cause a curses error.
# In this case we'll see another curses.KEY_RESIZE in our key handler and draw again later.
pass
def main_ui(stdscr):
global messages_pad, messages_box, nodes_pad, nodes_box, channel_pad, channel_box, function_win, packetlog_win, entry_win
messages_pad = messages_box = nodes_pad = nodes_box = channel_pad = channel_box = function_win = packetlog_win = entry_win = None
stdscr.keypad(True)
get_channels()
input_text = ""
handle_resize(stdscr, True)
while True:
draw_text_field(entry_win, f"Input: {input_text[-(width - 10):]}")
draw_text_field(entry_win, f"Input: {input_text[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window
char = entry_win.get_wch()
@@ -358,6 +463,10 @@ def main_ui(stdscr):
elif globals.current_window == 2:
scroll_nodes(-1)
elif char == curses.KEY_RESIZE:
input_text = ""
handle_resize(stdscr, False)
elif char == curses.KEY_DOWN:
if globals.current_window == 0:
scroll_channels(1)
@@ -411,40 +520,42 @@ def main_ui(stdscr):
globals.current_window = (globals.current_window + delta) % 3
if old_window == 0:
channel_box.attrset(curses.color_pair(0))
channel_box.attrset(get_color("window_frame"))
channel_box.box()
channel_box.refresh()
highlight_line(False, 0, globals.selected_channel)
refresh_pad(0)
if old_window == 1:
messages_box.attrset(curses.color_pair(0))
messages_box.attrset(get_color("window_frame"))
messages_box.box()
messages_box.refresh()
refresh_pad(1)
elif old_window == 2:
nodes_box.attrset(curses.color_pair(0))
draw_function_win()
nodes_box.attrset(get_color("window_frame"))
nodes_box.box()
nodes_box.refresh()
highlight_line(False, 2, globals.selected_node)
refresh_pad(2)
if globals.current_window == 0:
channel_box.attrset(curses.color_pair(2))
channel_box.attrset(get_color("window_frame_selected"))
channel_box.box()
channel_box.attrset(curses.color_pair(0))
channel_box.attrset(get_color("window_frame"))
channel_box.refresh()
highlight_line(True, 0, globals.selected_channel)
refresh_pad(0)
elif globals.current_window == 1:
messages_box.attrset(curses.color_pair(2))
messages_box.attrset(get_color("window_frame_selected"))
messages_box.box()
messages_box.attrset(curses.color_pair(0))
messages_box.attrset(get_color("window_frame"))
messages_box.refresh()
refresh_pad(1)
elif globals.current_window == 2:
nodes_box.attrset(curses.color_pair(2))
draw_node_details()
nodes_box.attrset(get_color("window_frame_selected"))
nodes_box.box()
nodes_box.attrset(curses.color_pair(0))
nodes_box.attrset(get_color("window_frame"))
nodes_box.refresh()
highlight_line(True, 2, globals.selected_node)
refresh_pad(2)
@@ -459,6 +570,7 @@ def main_ui(stdscr):
curses.curs_set(0) # Hide cursor
ui.dialog.dialog(stdscr, "Traceroute Sent", "Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.")
curses.curs_set(1) # Show cursor again
handle_resize(stdscr, False)
elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)):
if globals.current_window == 2:
@@ -475,15 +587,14 @@ def main_ui(stdscr):
draw_channel_list()
draw_messages_window(True)
else:
elif len(input_text) > 0:
# Enter key pressed, send user input as message
send_message(input_text, channel=globals.selected_channel)
draw_messages_window(True)
# Clear entry window and reset input text
input_text = ""
entry_win.clear()
# entry_win.refresh()
entry_win.erase()
elif char in (curses.KEY_BACKSPACE, chr(127)):
if input_text:
@@ -498,6 +609,7 @@ def main_ui(stdscr):
curses.curs_set(0)
settings_menu(stdscr, globals.interface)
curses.curs_set(1)
handle_resize(stdscr, False)
elif char == chr(16):
# Display packet log
@@ -506,7 +618,7 @@ def main_ui(stdscr):
draw_messages_window(True)
else:
globals.display_log = False
packetlog_win.clear()
packetlog_win.erase()
draw_messages_window(True)
else:
# Append typed character to input text

View File

@@ -1,4 +1,5 @@
import curses
from ui.colors import get_color
def dialog(stdscr, title, message):
height, width = stdscr.getmaxyx()
@@ -15,17 +16,19 @@ def dialog(stdscr, title, message):
# Create dialog window
win = curses.newwin(dialog_height, dialog_width, y, x)
win.bkgd(get_color("background"))
win.attrset(get_color("window_frame"))
win.border(0)
# Add title
win.addstr(0, 2, title)
win.addstr(0, 2, title, get_color("settings_default"))
# Add message
for i, l in enumerate(message_lines):
win.addstr(2 + i, 2, l)
win.addstr(2 + i, 2, l, get_color("settings_default"))
# Add button
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", curses.color_pair(1) | curses.A_REVERSE)
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", get_color("settings_default", reverse=True))
# Refresh dialog window
win.refresh()
@@ -35,6 +38,6 @@ def dialog(stdscr, title, message):
char = win.getch()
# Close dialog with enter, space, or esc
if char in(curses.KEY_ENTER, 10, 13, 32, 27):
win.clear()
win.erase()
win.refresh()
return

View File

@@ -1,6 +1,8 @@
from collections import OrderedDict
from meshtastic.protobuf import config_pb2, module_config_pb2, channel_pb2
from save_to_radio import settings_reboot, settings_factory_reset, settings_reset_nodedb, settings_shutdown
import logging, traceback
import base64
def extract_fields(message_instance, current_config=None):
if isinstance(current_config, dict): # Handle dictionaries
@@ -12,9 +14,11 @@ def extract_fields(message_instance, current_config=None):
menu = {}
fields = message_instance.DESCRIPTOR.fields
for field in fields:
if field.name in {"sessionkey", "channel_num", "id"}: # Skip certain fields
if field.name in {"sessionkey", "channel_num", "id", "ignore_incoming"}: # Skip certain fields
continue
if field.message_type: # Nested message
nested_instance = getattr(message_instance, field.name)
nested_config = getattr(current_config, field.name, None) if current_config else None
@@ -41,16 +45,6 @@ def generate_menu_from_protobuf(interface):
# Function to generate the menu structure from protobuf messages
menu_structure = {"Main Menu": {}}
# Add Radio Settings
radio = config_pb2.Config()
current_radio_config = interface.localNode.localConfig if interface else None
menu_structure["Main Menu"]["Radio Settings"] = extract_fields(radio, current_radio_config)
# Add Module Settings
module = module_config_pb2.ModuleConfig()
current_module_config = interface.localNode.moduleConfig if interface else None
menu_structure["Main Menu"]["Module Settings"] = extract_fields(module, current_module_config)
# Add User Settings
current_node_info = interface.getMyNodeInfo() if interface else None
@@ -80,13 +74,54 @@ def generate_menu_from_protobuf(interface):
current_channel = interface.localNode.getChannelByChannelIndex(i)
if current_channel:
channel_config = extract_fields(channel, current_channel.settings)
# Convert 'psk' field to Base64
channel_config["psk"] = (channel_config["psk"][0], base64.b64encode(channel_config["psk"][1]).decode('utf-8'))
menu_structure["Main Menu"]["Channels"][f"Channel {i + 1}"] = channel_config
# Add Radio Settings
radio = config_pb2.Config()
current_radio_config = interface.localNode.localConfig if interface else None
menu_structure["Main Menu"]["Radio Settings"] = extract_fields(radio, current_radio_config)
# Add Lat/Lon/Alt
position_data = {
"latitude": (None, current_node_info["position"].get("latitude", 0.0)),
"longitude": (None, current_node_info["position"].get("longitude", 0.0)),
"altitude": (None, current_node_info["position"].get("altitude", 0))
}
# Get existing position menu items
existing_position_menu = menu_structure["Main Menu"]["Radio Settings"].get("position", {})
# Create an ordered position menu with Lat/Lon/Alt inserted in the middle
ordered_position_menu = OrderedDict()
for key, value in existing_position_menu.items():
if key == "fixed_position": # Insert before or after a specific key
ordered_position_menu[key] = value
ordered_position_menu.update(position_data) # Insert Lat/Lon/Alt **right here**
else:
ordered_position_menu[key] = value
# Update the menu with the new order
menu_structure["Main Menu"]["Radio Settings"]["position"] = ordered_position_menu
# Add Module Settings
module = module_config_pb2.ModuleConfig()
current_module_config = interface.localNode.moduleConfig if interface else None
menu_structure["Main Menu"]["Module Settings"] = extract_fields(module, current_module_config)
# Add App Settings
menu_structure["Main Menu"]["App Settings"] = {"Open": "app_settings"}
# Add additional settings options
menu_structure["Main Menu"]["Reboot"] = settings_reboot
menu_structure["Main Menu"]["Reset Node DB"] = settings_reset_nodedb
menu_structure["Main Menu"]["Shutdown"] = settings_shutdown
menu_structure["Main Menu"]["Factory Reset"] = settings_factory_reset
menu_structure["Main Menu"]["Export Config"] = None
menu_structure["Main Menu"]["Load Config"] = None
menu_structure["Main Menu"]["Reboot"] = None
menu_structure["Main Menu"]["Reset Node DB"] = None
menu_structure["Main Menu"]["Shutdown"] = None
menu_structure["Main Menu"]["Factory Reset"] = None
# Add Exit option
menu_structure["Main Menu"]["Exit"] = None

318
user_config.py Normal file
View File

@@ -0,0 +1,318 @@
import os
import json
import curses
from ui.colors import get_color, setup_colors, COLOR_MAP
from default_config import format_json_single_line_arrays, loaded_config
from input_handlers import select_from_list
width = 60
save_option_text = "Save Changes"
def edit_color_pair(key, current_value):
"""
Allows the user to select a foreground and background color for a key.
"""
color_list = [" "] + list(COLOR_MAP.keys())
fg_color = select_from_list(f"Select Foreground Color for {key}", current_value[0], color_list)
bg_color = select_from_list(f"Select Background Color for {key}", current_value[1], color_list)
return [fg_color, bg_color]
def edit_value(key, current_value):
width = 60
height = 10
input_width = width - 16 # Allow space for "New Value: "
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
# Create a centered window
edit_win = curses.newwin(height, width, start_y, start_x)
edit_win.bkgd(get_color("background"))
edit_win.attrset(get_color("window_frame"))
edit_win.border()
# Display instructions
edit_win.addstr(1, 2, f"Editing {key}", get_color("settings_default", bold=True))
edit_win.addstr(3, 2, "Current Value:", get_color("settings_default"))
wrap_width = width - 4 # Account for border and padding
wrapped_lines = [current_value[i:i+wrap_width] for i in range(0, len(current_value), wrap_width)]
for i, line in enumerate(wrapped_lines[:4]): # Limit display to fit the window height
edit_win.addstr(4 + i, 2, line, get_color("settings_default"))
edit_win.refresh()
# Handle theme selection dynamically
if key == "theme":
# Load theme names dynamically from the JSON
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
return select_from_list("Select Theme", current_value, theme_options)
# Standard Input Mode (Scrollable)
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
curses.curs_set(1)
scroll_offset = 0 # Determines which part of the text is visible
user_input = ""
input_position = (7, 13) # Tuple for row and column
row, col = input_position # Unpack tuple
while True:
visible_text = user_input[scroll_offset:scroll_offset + input_width] # Only show what fits
edit_win.addstr(row, col, " " * input_width, get_color("settings_default")) # Clear previous text
edit_win.addstr(row, col, visible_text, get_color("settings_default")) # Display text
edit_win.refresh()
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width)) # Adjust cursor position
key = edit_win.get_wch()
if key in (chr(27), curses.KEY_LEFT): # ESC or Left Arrow
curses.curs_set(0)
return current_value # Exit without returning a value
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
break
elif key in (curses.KEY_BACKSPACE, chr(127)): # Backspace
if user_input: # Only process if there's something to delete
user_input = user_input[:-1]
if scroll_offset > 0 and len(user_input) < scroll_offset + input_width:
scroll_offset -= 1 # Move back if text is shorter than scrolled area
else:
if isinstance(key, str):
user_input += key
else:
user_input += chr(key)
if len(user_input) > input_width: # Scroll if input exceeds visible area
scroll_offset += 1
curses.curs_set(0)
return user_input if user_input else current_value
def render_menu(current_data, menu_path, selected_index):
"""
Render the configuration menu with a Save button directly added to the window.
"""
# Determine menu items based on the type of current_data
if isinstance(current_data, dict):
options = list(current_data.keys())
elif isinstance(current_data, list):
options = [f"[{i}]" for i in range(len(current_data))]
else:
options = [] # Fallback in case of unexpected data types
# Calculate dynamic dimensions for the menu
num_items = len(options)
height = min(curses.LINES - 2, num_items + 6) # Include space for borders and Save button
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
# Create the window
menu_win = curses.newwin(height, width, start_y, start_x)
menu_win.clear()
menu_win.bkgd(get_color("background"))
menu_win.attrset(get_color("window_frame"))
menu_win.border()
menu_win.keypad(True)
# Display the menu path
header = " > ".join(menu_path)
if len(header) > width - 4:
header = header[:width - 7] + "..."
menu_win.addstr(1, 2, header, get_color("settings_breadcrumbs", bold=True))
# Create the pad for scrolling
menu_pad = curses.newpad(num_items + 1, width - 8)
menu_pad.bkgd(get_color("background"))
# Populate the pad with menu options
for idx, key in enumerate(options):
value = current_data[key] if isinstance(current_data, dict) else current_data[int(key.strip("[]"))]
display_key = f"{key}"[:width // 2 - 2]
display_value = (
f"{value}"[:width // 2 - 8]
)
color = get_color("settings_default", reverse=(idx == selected_index))
menu_pad.addstr(idx, 0, f"{display_key:<{width // 2 - 2}} {display_value}".ljust(width - 8), color)
# Add Save button to the main window
save_button_position = height - 2
menu_win.addstr(
save_button_position,
(width - len(save_option_text)) // 2,
save_option_text,
get_color("settings_save", reverse=(selected_index == len(options))),
)
# Refresh menu and pad
menu_win.refresh()
menu_pad.refresh(
0,
0,
menu_win.getbegyx()[0] + 3,
menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + menu_win.getmaxyx()[0] - 3,
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4,
)
return menu_win, menu_pad, options
def move_highlight(old_idx, new_idx, options, menu_win, menu_pad):
if old_idx == new_idx:
return # no-op
show_save_option = True
max_index = len(options) + (1 if show_save_option else 0) - 1
if show_save_option and old_idx == max_index: # special case un-highlight "Save" option
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option_text)) // 2, len(save_option_text), get_color("settings_save"))
else:
menu_pad.chgat(old_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_default"))
if show_save_option and new_idx == max_index: # special case highlight "Save" option
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option_text)) // 2, len(save_option_text), get_color("settings_save", reverse = True))
else:
menu_pad.chgat(new_idx, 0,menu_pad.getmaxyx()[1], get_color("settings_default", reverse = True))
start_index = max(0, new_idx - (menu_win.getmaxyx()[0] - 6))
menu_win.refresh()
menu_pad.refresh(start_index, 0,
menu_win.getbegyx()[0] + 3,
menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + menu_win.getmaxyx()[0] - 3,
menu_win.getbegyx()[1] + 4 + menu_win.getmaxyx()[1] - 4)
def json_editor(stdscr):
menu_path = ["App Settings"]
selected_index = 0 # Track the selected option
file_path = "config.json"
show_save_option = True # Always show the Save button
# Ensure the file exists
if not os.path.exists(file_path):
with open(file_path, "w") as f:
json.dump({}, f)
# Load JSON data
with open(file_path, "r") as f:
original_data = json.load(f)
data = original_data # Reference to the original data
current_data = data # Track the current level of the menu
# Render the menu
menu_win, menu_pad, options = render_menu(current_data, menu_path, selected_index)
need_redraw = True
while True:
if(need_redraw):
menu_win, menu_pad, options = render_menu(current_data, menu_path, selected_index)
menu_win.refresh()
need_redraw = False
max_index = len(options) + (1 if show_save_option else 0) - 1
key = menu_win.getch()
if key == curses.KEY_UP:
old_selected_index = selected_index
selected_index = max_index if selected_index == 0 else selected_index - 1
move_highlight(old_selected_index, selected_index, options, menu_win, menu_pad)
elif key == curses.KEY_DOWN:
old_selected_index = selected_index
selected_index = 0 if selected_index == max_index else selected_index + 1
move_highlight(old_selected_index, selected_index, options, menu_win, menu_pad)
elif key == ord("\t") and show_save_option:
old_selected_index = selected_index
selected_index = max_index
move_highlight(old_selected_index, selected_index, options, menu_win, menu_pad)
elif key in (curses.KEY_RIGHT, ord("\n")):
need_redraw = True
menu_win.erase()
menu_win.refresh()
if selected_index < len(options): # Handle selection of a menu item
selected_key = options[selected_index]
# Handle nested data
if isinstance(current_data, dict):
if selected_key in current_data:
selected_data = current_data[selected_key]
else:
continue # Skip invalid key
elif isinstance(current_data, list):
selected_data = current_data[int(selected_key.strip("[]"))]
if isinstance(selected_data, list) and len(selected_data) == 2:
# Edit color pair
new_value = edit_color_pair(
selected_key, selected_data)
current_data[selected_key] = new_value
elif isinstance(selected_data, (dict, list)):
# Navigate into nested data
menu_path.append(str(selected_key))
current_data = selected_data
selected_index = 0 # Reset the selected index
else:
# General value editing
new_value = edit_value(selected_key, selected_data)
current_data[selected_key] = new_value
need_redraw = True
else:
# Save button selected
save_json(file_path, data)
stdscr.refresh()
continue
elif key in (27, curses.KEY_LEFT): # Escape or Left Arrow
need_redraw = True
menu_win.erase()
menu_win.refresh()
# Navigate back in the menu
if len(menu_path) > 1:
menu_path.pop()
current_data = data
for path in menu_path[1:]:
current_data = current_data[path] if isinstance(current_data, dict) else current_data[int(path.strip("[]"))]
selected_index = 0
else:
# Exit the editor
menu_win.clear()
menu_win.refresh()
break
def save_json(file_path, data):
formatted_json = format_json_single_line_arrays(data)
with open(file_path, "w", encoding="utf-8") as f:
f.write(formatted_json)
setup_colors(reinit=True)
def main(stdscr):
curses.curs_set(0)
stdscr.keypad(True)
setup_colors()
json_editor(stdscr)
if __name__ == "__main__":
curses.wrapper(main)

281
utilities/config_io.py Normal file
View File

@@ -0,0 +1,281 @@
import yaml
import logging
from typing import List
from google.protobuf.json_format import MessageToDict
from meshtastic import BROADCAST_ADDR, mt_config
from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
# defs are from meshtastic/python/main
def traverseConfig(config_root, config, interface_config) -> bool:
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
snake_name = camel_to_snake(config_root)
for pref in config:
pref_name = f"{snake_name}.{pref}"
if isinstance(config[pref], dict):
traverseConfig(pref_name, config[pref], interface_config)
else:
setPref(interface_config, pref_name, config[pref])
return True
def splitCompoundName(comp_name: str) -> List[str]:
"""Split compound (dot separated) preference name into parts"""
name: List[str] = comp_name.split(".")
if len(name) < 2:
name[0] = comp_name
name.append(comp_name)
return name
def setPref(config, comp_name, raw_val) -> bool:
"""Set a channel or preferences value"""
name = splitCompoundName(comp_name)
snake_name = camel_to_snake(name[-1])
camel_name = snake_to_camel(name[-1])
uni_name = camel_name if mt_config.camel_case else snake_name
logging.debug(f"snake_name:{snake_name}")
logging.debug(f"camel_name:{camel_name}")
objDesc = config.DESCRIPTOR
config_part = config
config_type = objDesc.fields_by_name.get(name[0])
if config_type and config_type.message_type is not None:
for name_part in name[1:-1]:
part_snake_name = camel_to_snake((name_part))
config_part = getattr(config, config_type.name)
config_type = config_type.message_type.fields_by_name.get(part_snake_name)
pref = None
if config_type and config_type.message_type is not None:
pref = config_type.message_type.fields_by_name.get(snake_name)
# Others like ChannelSettings are standalone
elif config_type:
pref = config_type
if (not pref) or (not config_type):
return False
if isinstance(raw_val, str):
val = fromStr(raw_val)
else:
val = raw_val
logging.debug(f"valStr:{raw_val} val:{val}")
if snake_name == "wifi_psk" and len(str(raw_val)) < 8:
logging.info(f"Warning: network.wifi_psk must be 8 or more characters.")
return False
enumType = pref.enum_type
# pylint: disable=C0123
if enumType and type(val) == str:
# We've failed so far to convert this string into an enum, try to find it by reflection
e = enumType.values_by_name.get(val)
if e:
val = e.number
else:
logging.info(
f"{name[0]}.{uni_name} does not have an enum called {val}, so you can not set it."
)
logging.info(f"Choices in sorted order are:")
names = []
for f in enumType.values:
# Note: We must use the value of the enum (regardless if camel or snake case)
names.append(f"{f.name}")
for temp_name in sorted(names):
logging.info(f" {temp_name}")
return False
# repeating fields need to be handled with append, not setattr
if pref.label != pref.LABEL_REPEATED:
try:
if config_type.message_type is not None:
config_values = getattr(config_part, config_type.name)
setattr(config_values, pref.name, val)
else:
setattr(config_part, snake_name, val)
except TypeError:
# The setter didn't like our arg type guess try again as a string
config_values = getattr(config_part, config_type.name)
setattr(config_values, pref.name, str(val))
elif type(val) == list:
new_vals = [fromStr(x) for x in val]
config_values = getattr(config, config_type.name)
getattr(config_values, pref.name)[:] = new_vals
else:
config_values = getattr(config, config_type.name)
if val == 0:
# clear values
logging.info(f"Clearing {pref.name} list")
del getattr(config_values, pref.name)[:]
else:
logging.info(f"Adding '{raw_val}' to the {pref.name} list")
cur_vals = [x for x in getattr(config_values, pref.name) if x not in [0, "", b""]]
cur_vals.append(val)
getattr(config_values, pref.name)[:] = cur_vals
return True
prefix = f"{'.'.join(name[0:-1])}." if config_type.message_type is not None else ""
logging.info(f"Set {prefix}{uni_name} to {raw_val}")
return True
def config_import(interface, filename):
with open(filename, encoding="utf8") as file:
configuration = yaml.safe_load(file)
closeNow = True
interface.getNode('^local', False).beginSettingsTransaction()
if "owner" in configuration:
logging.info(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True
interface.getNode('^local', False).setOwner(configuration["owner"])
if "owner_short" in configuration:
logging.info(
f"Setting device owner short to {configuration['owner_short']}"
)
waitForAckNak = True
interface.getNode('^local', False).setOwner(
long_name=None, short_name=configuration["owner_short"]
)
if "ownerShort" in configuration:
logging.info(
f"Setting device owner short to {configuration['ownerShort']}"
)
waitForAckNak = True
interface.getNode('^local', False).setOwner(
long_name=None, short_name=configuration["ownerShort"]
)
if "channel_url" in configuration:
logging.info(f"Setting channel url to {configuration['channel_url']}")
interface.getNode('^local').setURL(configuration["channel_url"])
if "channelUrl" in configuration:
logging.info(f"Setting channel url to {configuration['channelUrl']}")
interface.getNode('^local').setURL(configuration["channelUrl"])
if "location" in configuration:
alt = 0
lat = 0.0
lon = 0.0
localConfig = interface.localNode.localConfig
if "alt" in configuration["location"]:
alt = int(configuration["location"]["alt"] or 0)
logging.info(f"Fixing altitude at {alt} meters")
if "lat" in configuration["location"]:
lat = float(configuration["location"]["lat"] or 0)
logging.info(f"Fixing latitude at {lat} degrees")
if "lon" in configuration["location"]:
lon = float(configuration["location"]["lon"] or 0)
logging.info(f"Fixing longitude at {lon} degrees")
logging.info("Setting device position")
interface.localNode.setFixedPosition(lat, lon, alt)
if "config" in configuration:
localConfig = interface.getNode('^local').localConfig
for section in configuration["config"]:
traverseConfig(
section, configuration["config"][section], localConfig
)
interface.getNode('^local').writeConfig(
camel_to_snake(section)
)
if "module_config" in configuration:
moduleConfig = interface.getNode('^local').moduleConfig
for section in configuration["module_config"]:
traverseConfig(
section,
configuration["module_config"][section],
moduleConfig,
)
interface.getNode('^local').writeConfig(
camel_to_snake(section)
)
interface.getNode('^local', False).commitSettingsTransaction()
logging.info("Writing modified configuration to device")
def config_export(interface) -> str:
"""used in --export-config"""
configObj = {}
owner = interface.getLongName()
owner_short = interface.getShortName()
channel_url = interface.localNode.getURL()
myinfo = interface.getMyNodeInfo()
pos = myinfo.get("position")
lat = None
lon = None
alt = None
if pos:
lat = pos.get("latitude")
lon = pos.get("longitude")
alt = pos.get("altitude")
if owner:
configObj["owner"] = owner
if owner_short:
configObj["owner_short"] = owner_short
if channel_url:
if mt_config.camel_case:
configObj["channelUrl"] = channel_url
else:
configObj["channel_url"] = channel_url
# lat and lon don't make much sense without the other (so fill with 0s), and alt isn't meaningful without both
if lat or lon:
configObj["location"] = {"lat": lat or float(0), "lon": lon or float(0)}
if alt:
configObj["location"]["alt"] = alt
config = MessageToDict(interface.localNode.localConfig) #checkme - Used as a dictionary here and a string below
if config:
# Convert inner keys to correct snake/camelCase
prefs = {}
for pref in config:
if mt_config.camel_case:
prefs[snake_to_camel(pref)] = config[pref]
else:
prefs[pref] = config[pref]
# mark base64 encoded fields as such
if pref == "security":
if 'privateKey' in prefs[pref]:
prefs[pref]['privateKey'] = 'base64:' + prefs[pref]['privateKey']
if 'publicKey' in prefs[pref]:
prefs[pref]['publicKey'] = 'base64:' + prefs[pref]['publicKey']
if 'adminKey' in prefs[pref]:
for i in range(len(prefs[pref]['adminKey'])):
prefs[pref]['adminKey'][i] = 'base64:' + prefs[pref]['adminKey'][i]
if mt_config.camel_case:
configObj["config"] = config #Identical command here and 2 lines below?
else:
configObj["config"] = config
module_config = MessageToDict(interface.localNode.moduleConfig)
if module_config:
# Convert inner keys to correct snake/camelCase
prefs = {}
for pref in module_config:
if len(module_config[pref]) > 0:
prefs[pref] = module_config[pref]
if mt_config.camel_case:
configObj["module_config"] = prefs
else:
configObj["module_config"] = prefs
config_txt = "# start of Meshtastic configure yaml\n" #checkme - "config" (now changed to config_out)
#was used as a string here and a Dictionary above
config_txt += yaml.dump(configObj)
# logging.info(config_txt)
return config_txt

View File

@@ -1,17 +1,23 @@
import meshtastic.serial_interface
import meshtastic.tcp_interface
import meshtastic.ble_interface
import logging
import meshtastic.serial_interface, meshtastic.tcp_interface, meshtastic.ble_interface
import globals
def initialize_interface(args):
if args.ble:
return meshtastic.ble_interface.BLEInterface(args.ble if args.ble != "any" else None)
elif args.host:
return meshtastic.tcp_interface.TCPInterface(args.host)
else:
try:
return meshtastic.serial_interface.SerialInterface(args.port)
except PermissionError as ex:
print("You probably need to add yourself to the `dialout` group to use a serial connection.")
if globals.interface.devPath is None:
return meshtastic.tcp_interface.TCPInterface("meshtastic.local")
try:
if args.ble:
return meshtastic.ble_interface.BLEInterface(args.ble if args.ble != "any" else None)
elif args.host:
return meshtastic.tcp_interface.TCPInterface(args.host)
else:
try:
return meshtastic.serial_interface.SerialInterface(args.port)
except PermissionError as ex:
logging.error(f"You probably need to add yourself to the `dialout` group to use a serial connection. {ex}")
except Exception as ex:
logging.error(f"Unexpected error initializing interface: {ex}")
if globals.interface.devPath is None:
return meshtastic.tcp_interface.TCPInterface("meshtastic.local")
except Exception as ex:
logging.critical(f"Fatal error initializing interface: {ex}")

View File

@@ -1,6 +1,6 @@
import globals
from datetime import datetime
from meshtastic.protobuf import config_pb2
import re
def get_channels():
"""Retrieve channels from the node and update globals.channel_list and globals.all_messages."""
@@ -68,4 +68,37 @@ def get_name_from_number(number, type='long'):
else:
pass
# If no match is found, use the ID as a string
return str(decimal_to_hex(number))
return str(decimal_to_hex(number))
def get_time_ago(timestamp):
now = datetime.now()
dt = datetime.fromtimestamp(timestamp)
delta = now - dt
value = 0
unit = ""
if delta.days > 365:
value = delta.days // 365
unit = "y"
elif delta.days > 30:
value = delta.days // 30
unit = "mon"
elif delta.days > 7:
value = delta.days // 7
unit = "w"
elif delta.days > 0:
value = delta.days
unit = "d"
elif delta.seconds > 3600:
value = delta.seconds // 3600
unit = "h"
elif delta.seconds > 60:
value = delta.seconds // 60
unit = "min"
if len(unit) > 0:
return f"{value} {unit} ago"
return "now"