Merge pull request #264 from SpudGunMan/lab

Lab
This commit is contained in:
Kelly
2025-11-11 16:44:28 -08:00
committed by GitHub
2 changed files with 25 additions and 65 deletions

View File

@@ -118,6 +118,7 @@ if [[ "$program_path" != "/opt/meshing-around" ]]; then
if [[ $(echo "$move" | grep -i "^y") ]]; then
sudo mv "$program_path" /opt/meshing-around
cd /opt/meshing-around
sudo git config --global --add safe.directory /opt/meshing-around
printf "\nProject moved to /opt/meshing-around.\n"
printf "Please re-run the installer from the new location.\n"
exit 0

View File

@@ -4,6 +4,7 @@
import meshtastic.serial_interface #pip install meshtastic or use launch.sh for venv
import meshtastic.tcp_interface
import meshtastic.ble_interface
from meshtastic.util import generate_channel_hash
import time
import asyncio
import random
@@ -331,24 +332,6 @@ if ble_count > 1:
logger.critical(f"System: Multiple BLE interfaces detected. Only one BLE interface is allowed. Exiting")
exit()
def xor_hash(data: bytes) -> int:
"""Compute an XOR hash from bytes."""
result = 0
for char in data:
result ^= char
return result
def generate_hash(name: str, key: str) -> int:
"""generate the channel number by hashing the channel name and psk"""
if key == "AQ==":
key = "1PG7OiApB1nwvP+rz05pAQ=="
replaced_key = key.replace("-", "+").replace("_", "/")
key_bytes = base64.b64decode(replaced_key.encode("utf-8"))
h_name = xor_hash(bytes(name, "utf-8"))
h_key = xor_hash(key_bytes)
result: int = h_name ^ h_key
return result
# Initialize interfaces
logger.debug(f"System: Initializing Interfaces")
interface1 = interface2 = interface3 = interface4 = interface5 = interface6 = interface7 = interface8 = interface9 = None
@@ -408,7 +391,6 @@ _channel_cache = None
def build_channel_cache(force_refresh: bool = False):
"""
Build and cache channel_list from interfaces once (or when forced).
Returns cached list of dicts: [{"interface_id": i, "channels": {name: {number:, hash:}}}, ...]
"""
global _channel_cache
if _channel_cache is not None and not force_refresh:
@@ -419,61 +401,38 @@ def build_channel_cache(force_refresh: bool = False):
if not globals().get(f'interface{i}') or not globals().get(f'interface{i}_enabled'):
continue
try:
# lightweight call to fetch local node and its channels once
node = globals()[f'interface{i}'].getNode('^local')
channels = getattr(node, "channels", []) or []
# try to use the node-provided channel/hash table if available
# Try to use the node-provided channel/hash table if available
try:
ch_hash_table = node.get_channels_with_hash()
ch_hash_table_raw = node.get_channels_with_hash()
#print(f"System: Device{i} Channel Hash Table: {ch_hash_table_raw}")
except Exception:
logger.warning(f"System: update meshtastic API 2.7.4 +")
ch_hash_table = {}
ch_hash_table_raw = []
channel_dict = {}
for channel in channels:
if getattr(channel, "role", False):
channel_name = getattr(channel.settings, "name", "").strip()
channel_number = getattr(channel, "index", 0)
if not channel_name:
continue
ch_hash = None
# ch_hash_table may map by name or by index; try both defensively
if isinstance(ch_hash_table, dict):
# by name
if channel_name in ch_hash_table:
entry = ch_hash_table[channel_name]
if isinstance(entry, dict):
ch_hash = entry.get("hash") or entry.get("pskHash") or entry.get("hashValue")
elif isinstance(entry, (list, tuple)) and len(entry) >= 2:
ch_hash = entry[1]
else:
ch_hash = entry
# by index
elif channel_number in ch_hash_table:
entry = ch_hash_table[channel_number]
if isinstance(entry, dict):
ch_hash = entry.get("hash") or entry.get("pskHash") or entry.get("hashValue")
elif isinstance(entry, (list, tuple)) and len(entry) >= 2:
ch_hash = entry[1]
else:
ch_hash = entry
# fallback to generate_hash with default PSK if no table/hash available
if ch_hash is None:
try:
ch_hash = generate_hash(channel_name, "AQ==")
except Exception:
ch_hash = 0
channel_dict[channel_name] = {"number": channel_number, "hash": ch_hash}
if channel_dict:
cache.append({"interface_id": i, "channels": channel_dict})
# Use the hash table as the source of truth for channels
if isinstance(ch_hash_table_raw, list):
for entry in ch_hash_table_raw:
channel_name = entry.get("name", "").strip()
channel_number = entry.get("index")
ch_hash = entry.get("hash")
role = entry.get("role", "")
# Always add PRIMARY/SECONDARY channels, even if name is empty
if role in ("PRIMARY", "SECONDARY"):
channel_dict[channel_name if channel_name else f"Channel{channel_number}"] = {
"number": channel_number,
"hash": ch_hash
}
elif isinstance(ch_hash_table_raw, dict):
for channel_name, ch_hash in ch_hash_table_raw.items():
channel_dict[channel_name] = {"number": None, "hash": ch_hash}
# Always add the interface, even if no named channels
cache.append({"interface_id": i, "channels": channel_dict})
logger.debug(f"System: Fetched Channel List from Device{i} (cached)")
except Exception as e:
logger.debug(f"System: Error fetching channel list from Device{i}: {e}")
# hashes are attached above using node.get_channels_with_hash() when available
_channel_cache = cache
return _channel_cache