feat(console): add device/channel management commands (Etap 3)

Add device management: get/set params, clock/clock sync, time,
reboot, ver, scope, self_telemetry, node_discover.
Add channel management: get_channel, set_channel, add_channel,
remove_channel. Update help text with all command categories.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MarekWo
2026-03-19 08:10:46 +01:00
parent d80f9a7b3a
commit 4f64cc92e5
2 changed files with 392 additions and 0 deletions

View File

@@ -1888,3 +1888,212 @@ class DeviceManager:
except Exception as e:
logger.error(f"change_flags failed: {e}")
return {'success': False, 'error': str(e)}
# ── Device Management ────────────────────────────────────────
def get_clock(self) -> Dict:
"""Get device clock time."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
event = self.execute(self.mc.commands.get_time(), timeout=5)
if event and hasattr(event, 'payload'):
return {'success': True, 'data': event.payload}
return {'success': False, 'error': 'No time response'}
except Exception as e:
logger.error(f"get_clock failed: {e}")
return {'success': False, 'error': str(e)}
def set_clock(self, epoch: int) -> Dict:
"""Set device clock to given epoch timestamp."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
self.execute(self.mc.commands.set_time(epoch), timeout=5)
return {'success': True, 'message': f'Clock set to {epoch}'}
except Exception as e:
logger.error(f"set_clock failed: {e}")
return {'success': False, 'error': str(e)}
def reboot_device(self) -> Dict:
"""Reboot the device."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
self.execute(self.mc.commands.reboot(), timeout=5)
return {'success': True, 'message': 'Device rebooting...'}
except Exception as e:
logger.error(f"reboot failed: {e}")
return {'success': False, 'error': str(e)}
def set_flood_scope(self, scope: str) -> Dict:
"""Set flood message scope."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
self.execute(self.mc.commands.set_flood_scope(scope), timeout=5)
return {'success': True, 'message': f'Scope set to: {scope}'}
except Exception as e:
logger.error(f"set_flood_scope failed: {e}")
return {'success': False, 'error': str(e)}
def get_self_telemetry(self) -> Dict:
"""Get own telemetry data."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
event = self.execute(self.mc.commands.get_self_telemetry(), timeout=5)
if event and hasattr(event, 'payload'):
return {'success': True, 'data': event.payload}
return {'success': False, 'error': 'No telemetry response'}
except Exception as e:
logger.error(f"get_self_telemetry failed: {e}")
return {'success': False, 'error': str(e)}
def get_param(self, param: str) -> Dict:
"""Get a device parameter."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
info = self.get_device_info()
if param == 'name':
return {'success': True, 'data': {'name': info.get('name', info.get('adv_name', '?'))}}
elif param == 'tx':
return {'success': True, 'data': {'tx': info.get('tx_power', '?')}}
elif param in ('coords', 'lat', 'lon'):
return {'success': True, 'data': {'lat': info.get('lat', 0), 'lon': info.get('lon', 0)}}
elif param == 'bat':
bat = self.get_battery()
return {'success': True, 'data': bat or {}}
elif param == 'radio':
return {'success': True, 'data': {
'freq': info.get('freq', '?'),
'bw': info.get('bw', '?'),
'sf': info.get('sf', '?'),
'cr': info.get('cr', '?'),
}}
elif param == 'stats':
stats = self.get_device_stats()
return {'success': True, 'data': stats}
elif param == 'custom':
event = self.execute(self.mc.commands.get_custom_vars(), timeout=5)
if event and hasattr(event, 'payload'):
return {'success': True, 'data': event.payload}
return {'success': False, 'error': 'No custom vars response'}
elif param == 'path_hash_mode':
event = self.execute(self.mc.commands.get_path_hash_mode(), timeout=5)
if event and hasattr(event, 'payload'):
return {'success': True, 'data': event.payload}
return {'success': False, 'error': 'No response'}
elif param == 'help':
return {'success': True, 'data': {
'Available params': 'name, tx, coords, lat, lon, bat, radio, stats, custom, path_hash_mode'
}}
else:
return {'success': False, 'error': f"Unknown param: {param}. Type 'get help' for list."}
except Exception as e:
logger.error(f"get_param failed: {e}")
return {'success': False, 'error': str(e)}
def set_param(self, param: str, value: str) -> Dict:
"""Set a device parameter."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
if param == 'name':
self.execute(self.mc.commands.set_name(value), timeout=5)
return {'success': True, 'message': f'Name set to: {value}'}
elif param == 'tx':
self.execute(self.mc.commands.set_tx_power(value), timeout=5)
return {'success': True, 'message': f'TX power set to: {value}'}
elif param == 'coords':
parts = value.split(',')
if len(parts) != 2:
return {'success': False, 'error': 'Format: set coords <lat>,<lon>'}
lat, lon = float(parts[0].strip()), float(parts[1].strip())
self.execute(self.mc.commands.set_coords(lat, lon), timeout=5)
return {'success': True, 'message': f'Coords set to: {lat}, {lon}'}
elif param == 'lat':
info = self.get_device_info()
lon = info.get('lon', 0)
self.execute(self.mc.commands.set_coords(float(value), lon), timeout=5)
return {'success': True, 'message': f'Lat set to: {value}'}
elif param == 'lon':
info = self.get_device_info()
lat = info.get('lat', 0)
self.execute(self.mc.commands.set_coords(lat, float(value)), timeout=5)
return {'success': True, 'message': f'Lon set to: {value}'}
elif param == 'pin':
self.execute(self.mc.commands.set_devicepin(value), timeout=5)
return {'success': True, 'message': 'PIN set'}
elif param == 'telemetry_mode_base':
self.execute(self.mc.commands.set_telemetry_mode_base(int(value)), timeout=5)
return {'success': True, 'message': f'Telemetry mode base set to: {value}'}
elif param == 'telemetry_mode_loc':
self.execute(self.mc.commands.set_telemetry_mode_loc(int(value)), timeout=5)
return {'success': True, 'message': f'Telemetry mode loc set to: {value}'}
elif param == 'telemetry_mode_env':
self.execute(self.mc.commands.set_telemetry_mode_env(int(value)), timeout=5)
return {'success': True, 'message': f'Telemetry mode env set to: {value}'}
elif param == 'advert_loc_policy':
self.execute(self.mc.commands.set_advert_loc_policy(int(value)), timeout=5)
return {'success': True, 'message': f'Advert loc policy set to: {value}'}
elif param == 'manual_add_contacts':
enabled = value.lower() in ('true', '1', 'yes', 'on')
self.execute(self.mc.commands.set_manual_add_contacts(enabled), timeout=5)
return {'success': True, 'message': f'Manual add contacts: {enabled}'}
elif param == 'multi_acks':
enabled = value.lower() in ('true', '1', 'yes', 'on')
self.execute(self.mc.commands.set_multi_acks(enabled), timeout=5)
return {'success': True, 'message': f'Multi acks: {enabled}'}
elif param == 'path_hash_mode':
self.execute(self.mc.commands.set_path_hash_mode(int(value)), timeout=5)
return {'success': True, 'message': f'Path hash mode set to: {value}'}
elif param == 'help':
return {'success': True, 'data': {
'Available params': 'name, tx, coords, lat, lon, pin, telemetry_mode_base, '
'telemetry_mode_loc, telemetry_mode_env, advert_loc_policy, '
'manual_add_contacts, multi_acks, path_hash_mode, <custom_var>'
}}
else:
# Try as custom variable
self.execute(self.mc.commands.set_custom_var(param, value), timeout=5)
return {'success': True, 'message': f'Custom var {param} set to: {value}'}
except Exception as e:
logger.error(f"set_param failed: {e}")
return {'success': False, 'error': str(e)}
def node_discover(self, type_filter: str = None) -> Dict:
"""Discover nodes on the mesh."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
from meshcore.events import EventType
types = 0xFF # all types
if type_filter:
type_map = {'cli': 1, 'rep': 2, 'room': 3, 'sensor': 4, 'sens': 4}
t = type_map.get(type_filter.lower())
if t:
types = t
res = self.execute(
self.mc.commands.send_node_discover_req(types),
timeout=10
)
# Collect responses with timeout
results = []
try:
while True:
ev = self.execute(
self.mc.wait_for_event(EventType.DISCOVER_RESPONSE, timeout=5),
timeout=10
)
if ev and hasattr(ev, 'payload'):
results.append(ev.payload)
else:
break
except Exception:
pass # timeout = no more responses
return {'success': True, 'data': results}
except Exception as e:
logger.error(f"node_discover failed: {e}")
return {'success': False, 'error': str(e)}

View File

@@ -661,6 +661,173 @@ def _execute_console_command(args: list) -> str:
return result.get('message', 'Pending contacts flushed')
return f"Error: {result.get('error')}"
# ── Device management commands ───────────────────────────────
elif cmd == 'get' and len(args) >= 2:
param = args[1]
result = device_manager.get_param(param)
if result.get('success'):
data = result.get('data', {})
lines = []
for k, v in data.items():
if isinstance(v, dict):
lines.append(f" {k}:")
for k2, v2 in v.items():
lines.append(f" {k2}: {v2}")
else:
lines.append(f" {k}: {v}")
return "\n".join(lines) if lines else "OK"
return f"Error: {result.get('error')}"
elif cmd == 'get':
return "Usage: get <param>\n Type 'get help' for available params"
elif cmd == 'set' and len(args) >= 3:
param = args[1]
value = ' '.join(args[2:])
result = device_manager.set_param(param, value)
if result.get('success'):
if 'data' in result:
data = result['data']
lines = []
for k, v in data.items():
lines.append(f" {k}: {v}")
return "\n".join(lines)
return result.get('message', 'OK')
return f"Error: {result.get('error')}"
elif cmd == 'set':
return "Usage: set <param> <value>\n Type 'set help' for available params"
elif cmd == 'clock':
if len(args) >= 2 and args[1] == 'sync':
import time as _time
epoch = int(_time.time())
result = device_manager.set_clock(epoch)
if result.get('success'):
return f"Clock synced to {epoch}"
return f"Error: {result.get('error')}"
result = device_manager.get_clock()
if result.get('success'):
data = result['data']
lines = ["Device clock:"]
for k, v in data.items():
lines.append(f" {k}: {v}")
return "\n".join(lines)
return f"Error: {result.get('error')}"
elif cmd == 'time' and len(args) >= 2:
try:
epoch = int(args[1])
except ValueError:
return "Usage: time <epoch>"
result = device_manager.set_clock(epoch)
if result.get('success'):
return result.get('message', 'OK')
return f"Error: {result.get('error')}"
elif cmd == 'reboot':
result = device_manager.reboot_device()
if result.get('success'):
return result.get('message', 'OK')
return f"Error: {result.get('error')}"
elif cmd == 'ver':
info = device_manager.get_device_info()
if info:
fw = info.get('firmware', info.get('fw_ver', '?'))
return f"Firmware: {fw}"
return "Version info unavailable"
elif cmd == 'scope' and len(args) >= 2:
scope = ' '.join(args[1:])
result = device_manager.set_flood_scope(scope)
if result.get('success'):
return result.get('message', 'OK')
return f"Error: {result.get('error')}"
elif cmd == 'self_telemetry':
result = device_manager.get_self_telemetry()
if result.get('success'):
data = result['data']
lines = ["Self telemetry:"]
for k, v in data.items():
lines.append(f" {k}: {v}")
return "\n".join(lines)
return f"Error: {result.get('error')}"
elif cmd == 'node_discover':
type_filter = args[1] if len(args) >= 2 else None
result = device_manager.node_discover(type_filter)
if result.get('success'):
data = result['data']
if not data:
return "No nodes discovered"
lines = [f"Discovered nodes ({len(data)}):"]
for node in data:
if isinstance(node, dict):
name = node.get('adv_name', node.get('name', '?'))
pk = node.get('public_key', '')[:12]
lines.append(f" {name} ({pk}...)")
else:
lines.append(f" {node}")
return "\n".join(lines)
return f"Error: {result.get('error')}"
# ── Channel management commands ──────────────────────────────
elif cmd == 'get_channel' and len(args) >= 2:
try:
idx = int(args[1])
except ValueError:
return "Usage: get_channel <index>"
ch = device_manager.get_channel_info(idx)
if ch and ch.get('name'):
lines = [f"Channel [{idx}]:"]
for k, v in ch.items():
lines.append(f" {k}: {v}")
return "\n".join(lines)
return f"Channel {idx} not configured"
elif cmd == 'set_channel' and len(args) >= 3:
try:
idx = int(args[1])
name = args[2]
secret = bytes.fromhex(args[3]) if len(args) >= 4 else None
result = device_manager.set_channel(idx, name, secret)
if result.get('success'):
return f"Channel [{idx}] set to: {name}"
return f"Error: {result.get('error')}"
except (ValueError, IndexError) as e:
return f"Usage: set_channel <index> <name> [key_hex]"
elif cmd == 'add_channel' and len(args) >= 2:
name = args[1]
secret = bytes.fromhex(args[2]) if len(args) >= 3 else None
# Find next available channel slot
idx = None
for i in range(device_manager._max_channels):
ch = device_manager.get_channel_info(i)
if not ch or not ch.get('name'):
idx = i
break
if idx is None:
return "Error: No free channel slots"
result = device_manager.set_channel(idx, name, secret)
if result.get('success'):
return f"Channel [{idx}] added: {name}"
return f"Error: {result.get('error')}"
elif cmd == 'remove_channel' and len(args) >= 2:
try:
idx = int(args[1])
except ValueError:
return "Usage: remove_channel <index>"
result = device_manager.remove_channel(idx)
if result.get('success'):
return f"Channel [{idx}] removed"
return f"Error: {result.get('error')}"
elif cmd == 'help':
return (
"Available commands:\n\n"
@@ -705,6 +872,22 @@ def _execute_console_command(args: list) -> str:
" req_acl <name> — Request access control list\n"
" req_clock <name> — Request repeater clock\n"
" req_mma <n> <f> <t> — Request min/max/avg sensor data\n\n"
" Management\n"
" get <param> — Get device parameter\n"
" set <param> <value> — Set device parameter\n"
" clock — Get device clock\n"
" clock sync — Sync device clock to now\n"
" time <epoch> — Set device time\n"
" reboot — Reboot device\n"
" ver — Firmware version\n"
" scope <scope> — Set flood scope\n"
" self_telemetry — Own telemetry data\n"
" node_discover [type] — Discover mesh nodes\n\n"
" Channels\n"
" get_channel <n> — Channel info\n"
" set_channel <n> <nm> [key] — Set channel\n"
" add_channel <name> [key] — Add channel\n"
" remove_channel <n> — Remove channel\n\n"
" help — Show this help"
)