Files
meshcore-stats/scripts/collect_companion.py
T
Jorijn Schrijvershof 0f8b0a3492 Initial release: MeshCore Stats monitoring system
A Python-based monitoring system for MeshCore LoRa mesh networks.
Collects metrics from companion and repeater nodes, stores them in
a SQLite database, and generates a static website with interactive
SVG charts and statistics.

Features:
- Data collection from local companion and remote repeater nodes
- SQLite database with EAV schema for flexible metric storage
- Interactive SVG chart generation with matplotlib
- Static HTML site with day/week/month/year views
- Monthly and yearly statistics reports (HTML, TXT, JSON)
- Light and dark theme support
- Circuit breaker for unreliable LoRa connections
- Battery percentage calculation from 18650 discharge curves
- Automated releases via release-please

Live demo: https://meshcore.jorijn.com
2026-01-04 19:37:57 +01:00

214 lines
6.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Phase 1: Collect data from companion node.
Connects to the local companion node via serial and collects:
- Device info
- Battery status
- Time
- Self telemetry
- Custom vars
- Contacts list
Outputs:
- Concise summary to stdout
- Metrics written to SQLite database (EAV schema)
"""
import asyncio
import sys
import time
from pathlib import Path
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from meshmon.env import get_config
from meshmon import log
from meshmon.meshcore_client import connect_from_env, run_command
from meshmon.db import init_db, insert_metrics
async def collect_companion() -> int:
"""
Collect data from companion node.
Returns:
Exit code (0 = success, 1 = connection failed)
"""
cfg = get_config()
ts = int(time.time())
log.debug("Connecting to companion node...")
mc = await connect_from_env()
if mc is None:
log.error("Failed to connect to companion node")
return 1
# Metrics to insert (firmware field names)
metrics: dict[str, float] = {}
commands_succeeded = 0
# Commands are accessed via mc.commands
cmd = mc.commands
try:
# send_appstart (already called during connect, but call again to get self_info)
ok, evt_type, payload, err = await run_command(
mc, cmd.send_appstart(), "send_appstart"
)
if ok:
commands_succeeded += 1
log.debug(f"appstart: {evt_type}")
else:
log.error(f"appstart failed: {err}")
# send_device_query
ok, evt_type, payload, err = await run_command(
mc, cmd.send_device_query(), "send_device_query"
)
if ok:
commands_succeeded += 1
log.debug(f"device_query: {payload}")
else:
log.error(f"device_query failed: {err}")
# get_bat
ok, evt_type, payload, err = await run_command(
mc, cmd.get_bat(), "get_bat"
)
if ok:
commands_succeeded += 1
log.debug(f"get_bat: {payload}")
else:
log.error(f"get_bat failed: {err}")
# get_time
ok, evt_type, payload, err = await run_command(
mc, cmd.get_time(), "get_time"
)
if ok:
commands_succeeded += 1
log.debug(f"get_time: {payload}")
else:
log.error(f"get_time failed: {err}")
# get_self_telemetry
ok, evt_type, payload, err = await run_command(
mc, cmd.get_self_telemetry(), "get_self_telemetry"
)
if ok:
commands_succeeded += 1
log.debug(f"get_self_telemetry: {payload}")
else:
log.error(f"get_self_telemetry failed: {err}")
# get_custom_vars
ok, evt_type, payload, err = await run_command(
mc, cmd.get_custom_vars(), "get_custom_vars"
)
if ok:
commands_succeeded += 1
log.debug(f"get_custom_vars: {payload}")
else:
log.debug(f"get_custom_vars failed: {err}")
# get_contacts - count contacts
ok, evt_type, payload, err = await run_command(
mc, cmd.get_contacts(), "get_contacts"
)
if ok:
commands_succeeded += 1
contacts_count = len(payload) if payload else 0
metrics["contacts"] = float(contacts_count)
log.debug(f"get_contacts: found {contacts_count} contacts")
else:
log.error(f"get_contacts failed: {err}")
# Get statistics - these contain the main metrics
# Core stats (battery_mv, uptime_secs, errors, queue_len)
ok, evt_type, payload, err = await run_command(
mc, cmd.get_stats_core(), "get_stats_core"
)
if ok and payload and isinstance(payload, dict):
commands_succeeded += 1
# Insert all numeric fields from stats_core
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"stats_core: {payload}")
# Radio stats (noise_floor, last_rssi, last_snr, tx_air_secs, rx_air_secs)
ok, evt_type, payload, err = await run_command(
mc, cmd.get_stats_radio(), "get_stats_radio"
)
if ok and payload and isinstance(payload, dict):
commands_succeeded += 1
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"stats_radio: {payload}")
# Packet stats (recv, sent, flood_tx, direct_tx, flood_rx, direct_rx)
ok, evt_type, payload, err = await run_command(
mc, cmd.get_stats_packets(), "get_stats_packets"
)
if ok and payload and isinstance(payload, dict):
commands_succeeded += 1
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"stats_packets: {payload}")
except Exception as e:
log.error(f"Error during collection: {e}")
finally:
# Close connection
if hasattr(mc, "disconnect"):
try:
await mc.disconnect()
except Exception:
pass
# Print summary
summary_parts = [f"ts={ts}"]
if "battery_mv" in metrics:
bat_v = metrics["battery_mv"] / 1000.0
summary_parts.append(f"bat={bat_v:.2f}V")
if "contacts" in metrics:
summary_parts.append(f"contacts={int(metrics['contacts'])}")
if "recv" in metrics:
summary_parts.append(f"rx={int(metrics['recv'])}")
if "sent" in metrics:
summary_parts.append(f"tx={int(metrics['sent'])}")
log.info(f"Companion: {', '.join(summary_parts)}")
# Write metrics to database
if commands_succeeded > 0 and metrics:
try:
inserted = insert_metrics(ts=ts, role="companion", metrics=metrics)
log.debug(f"Inserted {inserted} metrics to database (ts={ts})")
except Exception as e:
log.error(f"Failed to write metrics to database: {e}")
return 1
return 0
else:
log.error("No commands succeeded or no metrics collected")
return 1
def main():
"""Entry point."""
# Ensure database is initialized
init_db()
exit_code = asyncio.run(collect_companion())
sys.exit(exit_code)
if __name__ == "__main__":
main()