ChannelAwarness

This commit is contained in:
SpudGunMan
2025-11-09 15:00:21 -08:00
parent e6a17d9258
commit 3cce938334

View File

@@ -403,44 +403,141 @@ for i in range(1, 10):
globals()[f'myNodeNum{i}'] = 777
# Fetch channel list from each device
channel_list = []
for i in range(1, 10):
if globals().get(f'interface{i}') and globals().get(f'interface{i}_enabled'):
_channel_cache = None
def build_channel_cache(force_refresh: bool = False):
"""
Build and cache channel_list from interfaces once (or when forced).
Returns cached list of dicts: [{"interface_id": i, "channels": {name: {number:, hash:}}}, ...]
"""
global _channel_cache
if _channel_cache is not None and not force_refresh:
return _channel_cache
cache = []
for i in range(1, 10):
if not globals().get(f'interface{i}') or not globals().get(f'interface{i}_enabled'):
continue
try:
# lightweight call to fetch local node and its channels once
node = globals()[f'interface{i}'].getNode('^local')
channels = node.channels
channels = getattr(node, "channels", []) or []
channel_dict = {}
for channel in channels:
if hasattr(channel, 'role') and channel.role:
channel_name = getattr(channel.settings, 'name', '').strip()
channel_number = getattr(channel, 'index', 0)
# Only add channels with a non-empty name
if getattr(channel, "role", False):
channel_name = getattr(channel.settings, "name", "").strip()
channel_number = getattr(channel, "index", 0)
if channel_name:
channel_dict[channel_name] = channel_number
channel_list.append({
"interface_id": i,
"channels": channel_dict
})
logger.debug(f"System: Fetched Channel List from Device{i}")
channel_dict[channel_name] = {"number": channel_number}
if channel_dict:
cache.append({"interface_id": i, "channels": channel_dict})
logger.debug(f"System: Fetched Channel List from Device{i} (cached)")
except Exception as e:
logger.error(f"System: Error fetching channel list from Device{i}: {e}")
logger.debug(f"System: Error fetching channel list from Device{i}: {e}")
# add channel hash to channel_list
for device in channel_list:
interface_id = device["interface_id"]
interface = globals().get(f'interface{interface_id}')
for channel_name, channel_number in device["channels"].items():
psk_base64 = "AQ==" # default PSK
channel_hash = generate_hash(channel_name, psk_base64)
# add hash to the channel entry in channel_list under key 'hash'
for entry in channel_list:
if entry["interface_id"] == interface_id:
entry["channels"][channel_name] = {
"number": channel_number,
"hash": channel_hash
}
# compute and attach channel hash (PSK default) once
for device in cache:
for channel_name, info in list(device["channels"].items()):
psk_base64 = "AQ=="
try:
channel_hash = generate_hash(channel_name, psk_base64)
except Exception:
channel_hash = 0
device["channels"][channel_name] = {"number": info.get("number", 0), "hash": channel_hash}
_channel_cache = cache
return _channel_cache
def refresh_channel_cache():
"""Force rebuild of channel cache (call only when channel config changes)."""
return build_channel_cache(force_refresh=True)
channel_list = build_channel_cache()
#### FUN-ctions ####
def resolve_channel_name(channel_number, rxNode=1, interface_obj=None, allow_node_lookup: bool = False):
"""
Resolve a channel number/hash to a human name.
Prefers the cached channel_list (build_channel_cache) and only does node API lookups
if allow_node_lookup is True.
Returns (channel_name, matched_index_or_hash)
"""
try:
# ensure cache exists (cheap)
cached = build_channel_cache()
# quick search in cache first (no node calls)
for device in cached:
if device.get("interface_id") == rxNode:
device_channels = device.get("channels", {}) or {}
# info is dict: {name: {'number': X, 'hash': Y}}
for chan_name, info in device_channels.items():
try:
if isinstance(info, dict):
if str(info.get('number')) == str(channel_number) or str(info.get('hash')) == str(channel_number):
return (chan_name, info.get('number') or info.get('hash'))
else:
if str(info) == str(channel_number):
return (chan_name, info)
except Exception:
continue
break
# If caller allows, try heavier node-level lookups as a fallback
if not allow_node_lookup:
return ("unknown", channel_number)
if interface_obj is None:
interface_obj = globals().get(f'interface{rxNode}')
# Try node-level API
node = None
if interface_obj:
if hasattr(interface_obj, "get_node") and callable(interface_obj.get_node):
node = interface_obj.get_node()
elif hasattr(interface_obj, "node"):
node = getattr(interface_obj, "node")
channels = None
if node is not None and hasattr(node, "get_channels_with_hash"):
try:
channels = node.get_channels_with_hash()
except Exception:
channels = None
# Fallback: generate channel list from raw payload
if not channels:
try:
from meshtastic.util import generate_channel_hash
except Exception:
generate_channel_hash = None
channels_raw = {}
if node is not None:
if isinstance(node, dict):
channels_raw = node.get("channels", {}) or {}
else:
channels_raw = getattr(node, "channels", None) or getattr(node, "channels_payload", None) or {}
if channels_raw is None:
channels_raw = {}
if generate_channel_hash and channels_raw:
try:
channels = generate_channel_hash(channels_raw)
except Exception:
channels = None
# If we have a channels sequence, try to match by hash or index
if channels and isinstance(channels, (list, tuple)):
for ch in channels:
try:
if str(ch.get('hash')) == str(channel_number) or str(ch.get('index')) == str(channel_number):
return (ch.get('name', 'unknown'), ch.get('index') or ch.get('hash'))
except Exception:
continue
except Exception:
pass
return ("unknown", channel_number)
def cleanup_memory():
"""Clean up memory by limiting list sizes and removing stale entries"""