fix(console): trace path support, stats field names, self_telemetry format

- trace: accepts comma-separated hex path (e.g. "trace 5e,d1,e7"),
  waits for TRACE_DATA response with proper timeout from device
- stats: fix field names (uptime_secs, queue_len, battery_mv, etc.),
  show all radio/packet stats with detail breakdown
- self_telemetry: format LPP sensor data nicely instead of raw dict

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MarekWo
2026-03-19 17:03:35 +01:00
parent 019d351ab7
commit 20924d134d
2 changed files with 69 additions and 29 deletions
+24 -9
View File
@@ -1533,20 +1533,35 @@ class DeviceManager:
logger.error(f"Neighbors request failed: {e}")
return {'error': str(e)}
def send_trace(self, tag: int = 0) -> Optional[Dict]:
"""Send a trace packet to discover mesh topology."""
def send_trace(self, path: str) -> Dict:
"""Send a trace packet and wait for trace data response."""
if not self.is_connected:
return None
return {'success': False, 'error': 'Device not connected'}
try:
event = self.execute(
self.mc.commands.send_trace(tag=tag),
timeout=5
)
return {'success': True, 'message': f'Trace sent (tag={tag})'}
async def _trace():
from meshcore.events import EventType
res = await self.mc.commands.send_trace(path=path)
if res is None or res.type == EventType.ERROR:
return None
tag = int.from_bytes(res.payload['expected_ack'], byteorder="little")
timeout = res.payload["suggested_timeout"] / 1000 * 1.2
ev = await self.mc.wait_for_event(
EventType.TRACE_DATA,
attribute_filters={"tag": tag},
timeout=timeout
)
if ev is None or ev.type == EventType.ERROR:
return None
return ev.payload
result = self.execute(_trace(), timeout=120)
if result is not None:
return {'success': True, 'data': result}
return {'success': False, 'error': f'Timeout waiting trace for path {path}'}
except Exception as e:
logger.error(f"Trace failed: {e}")
return {'error': str(e)}
return {'success': False, 'error': str(e)}
def resolve_contact(self, name_or_key: str) -> Optional[Dict]:
"""Resolve a contact by name or public key prefix."""
+45 -20
View File
@@ -334,27 +334,27 @@ def _execute_console_command(args: list) -> str:
lines = ["Device Statistics:"]
if 'core' in stats:
core = stats['core']
uptime_s = core.get('uptime', 0)
uptime_s = core.get('uptime_secs', 0)
days, rem = divmod(uptime_s, 86400)
hours, rem = divmod(rem, 3600)
mins = rem // 60
lines.append(f" Uptime: {int(days)}d {int(hours)}h {int(mins)}m")
if 'queue_length' in core:
lines.append(f" Queue: {core['queue_length']}")
if 'errors' in core:
lines.append(f" Errors: {core['errors']}")
lines.append(f" Battery: {core.get('battery_mv', '?')} mV")
lines.append(f" Queue: {core.get('queue_len', 0)}")
lines.append(f" Errors: {core.get('errors', 0)}")
if 'radio' in stats:
radio = stats['radio']
if 'tx_air_time' in radio:
lines.append(f" TX air time: {radio['tx_air_time']:.1f} min")
if 'rx_air_time' in radio:
lines.append(f" RX air time: {radio['rx_air_time']:.1f} min")
lines.append(f" Noise floor: {radio.get('noise_floor', '?')} dBm")
lines.append(f" Last RSSI: {radio.get('last_rssi', '?')} dBm")
lines.append(f" Last SNR: {radio.get('last_snr', '?')} dB")
tx_s = radio.get('tx_air_secs', 0)
rx_s = radio.get('rx_air_secs', 0)
lines.append(f" TX air time: {tx_s / 60:.1f} min")
lines.append(f" RX air time: {rx_s / 60:.1f} min")
if 'packets' in stats:
pkts = stats['packets']
if 'sent' in pkts:
lines.append(f" Packets TX: {pkts['sent']}")
if 'received' in pkts:
lines.append(f" Packets RX: {pkts['received']}")
lines.append(f" Packets TX: {pkts.get('sent', 0)} (flood: {pkts.get('flood_tx', 0)}, direct: {pkts.get('direct_tx', 0)})")
lines.append(f" Packets RX: {pkts.get('recv', 0)} (flood: {pkts.get('flood_rx', 0)}, direct: {pkts.get('direct_rx', 0)})")
return "\n".join(lines)
elif cmd == 'telemetry' and len(args) >= 2:
@@ -388,12 +388,22 @@ def _execute_console_command(args: list) -> str:
lines.append(f" {k}: {v}")
return "\n".join(lines)
elif cmd == 'trace' and len(args) >= 2:
path = args[1]
result = device_manager.send_trace(path)
if result.get('success'):
data = result['data']
# Format like meshcore-cli: snr [hash] > snr [hash] > ... snr
parts = []
for t in data.get('path', []):
snr = f"{t['snr']:.2f}"
h = f"[{t['hash']}]" if 'hash' in t else ''
parts.append(f"{snr} {h}")
return " > ".join(parts) if parts else "(empty trace)"
return f"Error: {result.get('error')}"
elif cmd == 'trace':
tag = int(args[1]) if len(args) >= 2 else 0
result = device_manager.send_trace(tag)
if not result:
return "Trace unavailable"
return result.get('message', result.get('error', 'Unknown'))
return "Usage: trace <path>\n Path: comma-separated hex hashes (e.g. 5e,d1,e7)"
# ── Repeater commands ────────────────────────────────────────
@@ -827,8 +837,23 @@ def _execute_console_command(args: list) -> str:
if result.get('success'):
data = result['data']
lines = ["Self telemetry:"]
for k, v in data.items():
lines.append(f" {k}: {v}")
lpp = data.get('lpp', [])
for sensor in lpp:
ch = sensor.get('channel', '?')
stype = sensor.get('type', '?')
val = sensor.get('value', '?')
unit = ''
if stype == 'voltage':
unit = ' V'
elif stype == 'temperature':
unit = ' C'
elif stype == 'humidity':
unit = ' %'
elif stype == 'pressure':
unit = ' hPa'
lines.append(f" Ch {ch}: {stype} = {val}{unit}")
if not lpp:
lines.append(" (no sensor data)")
return "\n".join(lines)
return f"Error: {result.get('error')}"