mirror of
https://github.com/pdxlocations/contact.git
synced 2026-03-28 17:12:35 +01:00
281 lines
11 KiB
Python
281 lines
11 KiB
Python
|
|
import yaml
|
|
import logging
|
|
from typing import List
|
|
from google.protobuf.json_format import MessageToDict
|
|
from meshtastic import BROADCAST_ADDR, mt_config
|
|
from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
|
|
|
|
# defs are from meshtastic/python/main
|
|
|
|
def traverseConfig(config_root, config, interface_config) -> bool:
|
|
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
|
|
snake_name = camel_to_snake(config_root)
|
|
for pref in config:
|
|
pref_name = f"{snake_name}.{pref}"
|
|
if isinstance(config[pref], dict):
|
|
traverseConfig(pref_name, config[pref], interface_config)
|
|
else:
|
|
setPref(interface_config, pref_name, config[pref])
|
|
|
|
return True
|
|
|
|
def splitCompoundName(comp_name: str) -> List[str]:
|
|
"""Split compound (dot separated) preference name into parts"""
|
|
name: List[str] = comp_name.split(".")
|
|
if len(name) < 2:
|
|
name[0] = comp_name
|
|
name.append(comp_name)
|
|
return name
|
|
|
|
def setPref(config, comp_name, raw_val) -> bool:
|
|
"""Set a channel or preferences value"""
|
|
|
|
name = splitCompoundName(comp_name)
|
|
|
|
snake_name = camel_to_snake(name[-1])
|
|
camel_name = snake_to_camel(name[-1])
|
|
uni_name = camel_name if mt_config.camel_case else snake_name
|
|
logging.debug(f"snake_name:{snake_name}")
|
|
logging.debug(f"camel_name:{camel_name}")
|
|
|
|
objDesc = config.DESCRIPTOR
|
|
config_part = config
|
|
config_type = objDesc.fields_by_name.get(name[0])
|
|
if config_type and config_type.message_type is not None:
|
|
for name_part in name[1:-1]:
|
|
part_snake_name = camel_to_snake((name_part))
|
|
config_part = getattr(config, config_type.name)
|
|
config_type = config_type.message_type.fields_by_name.get(part_snake_name)
|
|
pref = None
|
|
if config_type and config_type.message_type is not None:
|
|
pref = config_type.message_type.fields_by_name.get(snake_name)
|
|
# Others like ChannelSettings are standalone
|
|
elif config_type:
|
|
pref = config_type
|
|
|
|
if (not pref) or (not config_type):
|
|
return False
|
|
|
|
if isinstance(raw_val, str):
|
|
val = fromStr(raw_val)
|
|
else:
|
|
val = raw_val
|
|
logging.debug(f"valStr:{raw_val} val:{val}")
|
|
|
|
if snake_name == "wifi_psk" and len(str(raw_val)) < 8:
|
|
logging.info(f"Warning: network.wifi_psk must be 8 or more characters.")
|
|
return False
|
|
|
|
enumType = pref.enum_type
|
|
# pylint: disable=C0123
|
|
if enumType and type(val) == str:
|
|
# We've failed so far to convert this string into an enum, try to find it by reflection
|
|
e = enumType.values_by_name.get(val)
|
|
if e:
|
|
val = e.number
|
|
else:
|
|
logging.info(
|
|
f"{name[0]}.{uni_name} does not have an enum called {val}, so you can not set it."
|
|
)
|
|
logging.info(f"Choices in sorted order are:")
|
|
names = []
|
|
for f in enumType.values:
|
|
# Note: We must use the value of the enum (regardless if camel or snake case)
|
|
names.append(f"{f.name}")
|
|
for temp_name in sorted(names):
|
|
logging.info(f" {temp_name}")
|
|
return False
|
|
|
|
# repeating fields need to be handled with append, not setattr
|
|
if pref.label != pref.LABEL_REPEATED:
|
|
try:
|
|
if config_type.message_type is not None:
|
|
config_values = getattr(config_part, config_type.name)
|
|
setattr(config_values, pref.name, val)
|
|
else:
|
|
setattr(config_part, snake_name, val)
|
|
except TypeError:
|
|
# The setter didn't like our arg type guess try again as a string
|
|
config_values = getattr(config_part, config_type.name)
|
|
setattr(config_values, pref.name, str(val))
|
|
elif type(val) == list:
|
|
new_vals = [fromStr(x) for x in val]
|
|
config_values = getattr(config, config_type.name)
|
|
getattr(config_values, pref.name)[:] = new_vals
|
|
else:
|
|
config_values = getattr(config, config_type.name)
|
|
if val == 0:
|
|
# clear values
|
|
logging.info(f"Clearing {pref.name} list")
|
|
del getattr(config_values, pref.name)[:]
|
|
else:
|
|
logging.info(f"Adding '{raw_val}' to the {pref.name} list")
|
|
cur_vals = [x for x in getattr(config_values, pref.name) if x not in [0, "", b""]]
|
|
cur_vals.append(val)
|
|
getattr(config_values, pref.name)[:] = cur_vals
|
|
return True
|
|
|
|
prefix = f"{'.'.join(name[0:-1])}." if config_type.message_type is not None else ""
|
|
logging.info(f"Set {prefix}{uni_name} to {raw_val}")
|
|
|
|
return True
|
|
|
|
|
|
|
|
def config_import(interface, filename):
|
|
with open(filename, encoding="utf8") as file:
|
|
configuration = yaml.safe_load(file)
|
|
closeNow = True
|
|
|
|
interface.getNode('^local', False).beginSettingsTransaction()
|
|
|
|
if "owner" in configuration:
|
|
logging.info(f"Setting device owner to {configuration['owner']}")
|
|
waitForAckNak = True
|
|
interface.getNode('^local', False).setOwner(configuration["owner"])
|
|
|
|
if "owner_short" in configuration:
|
|
logging.info(
|
|
f"Setting device owner short to {configuration['owner_short']}"
|
|
)
|
|
waitForAckNak = True
|
|
interface.getNode('^local', False).setOwner(
|
|
long_name=None, short_name=configuration["owner_short"]
|
|
)
|
|
|
|
if "ownerShort" in configuration:
|
|
logging.info(
|
|
f"Setting device owner short to {configuration['ownerShort']}"
|
|
)
|
|
waitForAckNak = True
|
|
interface.getNode('^local', False).setOwner(
|
|
long_name=None, short_name=configuration["ownerShort"]
|
|
)
|
|
|
|
if "channel_url" in configuration:
|
|
logging.info(f"Setting channel url to {configuration['channel_url']}")
|
|
interface.getNode('^local').setURL(configuration["channel_url"])
|
|
|
|
if "channelUrl" in configuration:
|
|
logging.info(f"Setting channel url to {configuration['channelUrl']}")
|
|
interface.getNode('^local').setURL(configuration["channelUrl"])
|
|
|
|
if "location" in configuration:
|
|
alt = 0
|
|
lat = 0.0
|
|
lon = 0.0
|
|
localConfig = interface.localNode.localConfig
|
|
|
|
if "alt" in configuration["location"]:
|
|
alt = int(configuration["location"]["alt"] or 0)
|
|
logging.info(f"Fixing altitude at {alt} meters")
|
|
if "lat" in configuration["location"]:
|
|
lat = float(configuration["location"]["lat"] or 0)
|
|
logging.info(f"Fixing latitude at {lat} degrees")
|
|
if "lon" in configuration["location"]:
|
|
lon = float(configuration["location"]["lon"] or 0)
|
|
logging.info(f"Fixing longitude at {lon} degrees")
|
|
logging.info("Setting device position")
|
|
interface.localNode.setFixedPosition(lat, lon, alt)
|
|
|
|
if "config" in configuration:
|
|
localConfig = interface.getNode('^local').localConfig
|
|
for section in configuration["config"]:
|
|
traverseConfig(
|
|
section, configuration["config"][section], localConfig
|
|
)
|
|
interface.getNode('^local').writeConfig(
|
|
camel_to_snake(section)
|
|
)
|
|
|
|
if "module_config" in configuration:
|
|
moduleConfig = interface.getNode('^local').moduleConfig
|
|
for section in configuration["module_config"]:
|
|
traverseConfig(
|
|
section,
|
|
configuration["module_config"][section],
|
|
moduleConfig,
|
|
)
|
|
interface.getNode('^local').writeConfig(
|
|
camel_to_snake(section)
|
|
)
|
|
|
|
interface.getNode('^local', False).commitSettingsTransaction()
|
|
logging.info("Writing modified configuration to device")
|
|
|
|
|
|
|
|
def config_export(interface) -> str:
|
|
"""used in --export-config"""
|
|
configObj = {}
|
|
|
|
owner = interface.getLongName()
|
|
owner_short = interface.getShortName()
|
|
channel_url = interface.localNode.getURL()
|
|
myinfo = interface.getMyNodeInfo()
|
|
pos = myinfo.get("position")
|
|
lat = None
|
|
lon = None
|
|
alt = None
|
|
if pos:
|
|
lat = pos.get("latitude")
|
|
lon = pos.get("longitude")
|
|
alt = pos.get("altitude")
|
|
|
|
if owner:
|
|
configObj["owner"] = owner
|
|
if owner_short:
|
|
configObj["owner_short"] = owner_short
|
|
if channel_url:
|
|
if mt_config.camel_case:
|
|
configObj["channelUrl"] = channel_url
|
|
else:
|
|
configObj["channel_url"] = channel_url
|
|
# lat and lon don't make much sense without the other (so fill with 0s), and alt isn't meaningful without both
|
|
if lat or lon:
|
|
configObj["location"] = {"lat": lat or float(0), "lon": lon or float(0)}
|
|
if alt:
|
|
configObj["location"]["alt"] = alt
|
|
|
|
config = MessageToDict(interface.localNode.localConfig) #checkme - Used as a dictionary here and a string below
|
|
if config:
|
|
# Convert inner keys to correct snake/camelCase
|
|
prefs = {}
|
|
for pref in config:
|
|
if mt_config.camel_case:
|
|
prefs[snake_to_camel(pref)] = config[pref]
|
|
else:
|
|
prefs[pref] = config[pref]
|
|
# mark base64 encoded fields as such
|
|
if pref == "security":
|
|
if 'privateKey' in prefs[pref]:
|
|
prefs[pref]['privateKey'] = 'base64:' + prefs[pref]['privateKey']
|
|
if 'publicKey' in prefs[pref]:
|
|
prefs[pref]['publicKey'] = 'base64:' + prefs[pref]['publicKey']
|
|
if 'adminKey' in prefs[pref]:
|
|
for i in range(len(prefs[pref]['adminKey'])):
|
|
prefs[pref]['adminKey'][i] = 'base64:' + prefs[pref]['adminKey'][i]
|
|
if mt_config.camel_case:
|
|
configObj["config"] = config #Identical command here and 2 lines below?
|
|
else:
|
|
configObj["config"] = config
|
|
|
|
module_config = MessageToDict(interface.localNode.moduleConfig)
|
|
if module_config:
|
|
# Convert inner keys to correct snake/camelCase
|
|
prefs = {}
|
|
for pref in module_config:
|
|
if len(module_config[pref]) > 0:
|
|
prefs[pref] = module_config[pref]
|
|
if mt_config.camel_case:
|
|
configObj["module_config"] = prefs
|
|
else:
|
|
configObj["module_config"] = prefs
|
|
|
|
config_txt = "# start of Meshtastic configure yaml\n" #checkme - "config" (now changed to config_out)
|
|
#was used as a string here and a Dictionary above
|
|
config_txt += yaml.dump(configObj)
|
|
|
|
# logging.info(config_txt)
|
|
return config_txt |