mirror of
https://github.com/jorijn/meshcore-stats.git
synced 2026-03-28 17:42:55 +01:00
Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
81b7c6897a | ||
|
|
a3015e2209 | ||
|
|
5545ce5b28 | ||
|
|
666ed4215f | ||
|
|
3d0d90304c | ||
|
|
6afc14e007 | ||
|
|
4c5a408604 | ||
|
|
3c5eace220 | ||
|
|
7eee23ec40 |
@@ -1,3 +1,3 @@
|
||||
{
|
||||
".": "0.2.4"
|
||||
".": "0.2.7"
|
||||
}
|
||||
|
||||
26
CHANGELOG.md
26
CHANGELOG.md
@@ -4,6 +4,32 @@ All notable changes to this project will be documented in this file.
|
||||
|
||||
This changelog is automatically generated by [release-please](https://github.com/googleapis/release-please) based on [Conventional Commits](https://www.conventionalcommits.org/).
|
||||
|
||||
## [0.2.7](https://github.com/jorijn/meshcore-stats/compare/v0.2.6...v0.2.7) (2026-01-06)
|
||||
|
||||
|
||||
### Features
|
||||
|
||||
* add telemetry collection for companion and repeater nodes ([#24](https://github.com/jorijn/meshcore-stats/issues/24)) ([a3015e2](https://github.com/jorijn/meshcore-stats/commit/a3015e2209781bdd7c317fa992ced6afa19efe61))
|
||||
|
||||
## [0.2.6](https://github.com/jorijn/meshcore-stats/compare/v0.2.5...v0.2.6) (2026-01-05)
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* add tmpfs mount for fontconfig cache to fix read-only filesystem errors ([3d0d903](https://github.com/jorijn/meshcore-stats/commit/3d0d90304cec5ebcdb34935400de31afd62e258d))
|
||||
|
||||
## [0.2.5](https://github.com/jorijn/meshcore-stats/compare/v0.2.4...v0.2.5) (2026-01-05)
|
||||
|
||||
|
||||
### Features
|
||||
|
||||
* add automatic serial port locking to prevent concurrent access ([3c5eace](https://github.com/jorijn/meshcore-stats/commit/3c5eace2207279c55401dd8fa27294d5a94bb682))
|
||||
|
||||
|
||||
### Documentation
|
||||
|
||||
* fix formatting in architecture diagram ([7eee23e](https://github.com/jorijn/meshcore-stats/commit/7eee23ec40ff9441515b4ac18fbb7cd3f87fa4b5))
|
||||
|
||||
## [0.2.4](https://github.com/jorijn/meshcore-stats/compare/v0.2.3...v0.2.4) (2026-01-05)
|
||||
|
||||
|
||||
|
||||
24
CLAUDE.md
24
CLAUDE.md
@@ -16,6 +16,8 @@ Always edit the source templates, then regenerate with `python scripts/render_si
|
||||
|
||||
## Running Commands
|
||||
|
||||
**IMPORTANT: Always activate the virtual environment before running any Python commands.**
|
||||
|
||||
```bash
|
||||
cd /path/to/meshcore-stats
|
||||
source .venv/bin/activate
|
||||
@@ -354,11 +356,17 @@ All configuration via `meshcore.conf` or environment variables. The config file
|
||||
|
||||
### Timeouts & Retry
|
||||
- `REMOTE_TIMEOUT_S`: Minimum timeout for LoRa requests (default: 10)
|
||||
- `REMOTE_RETRY_ATTEMPTS`: Number of retry attempts (default: 5)
|
||||
- `REMOTE_RETRY_ATTEMPTS`: Number of retry attempts (default: 2)
|
||||
- `REMOTE_RETRY_BACKOFF_S`: Seconds between retries (default: 4)
|
||||
- `REMOTE_CB_FAILS`: Failures before circuit breaker opens (default: 6)
|
||||
- `REMOTE_CB_COOLDOWN_S`: Circuit breaker cooldown (default: 3600)
|
||||
|
||||
### Telemetry Collection
|
||||
- `TELEMETRY_ENABLED`: Enable environmental telemetry collection from repeater (0/1, default: 0)
|
||||
- `TELEMETRY_TIMEOUT_S`: Timeout for telemetry requests (default: 10)
|
||||
- `TELEMETRY_RETRY_ATTEMPTS`: Retry attempts for telemetry (default: 2)
|
||||
- `TELEMETRY_RETRY_BACKOFF_S`: Backoff between telemetry retries (default: 4)
|
||||
|
||||
### Intervals
|
||||
- `COMPANION_STEP`: Collection interval for companion (default: 60s)
|
||||
- `REPEATER_STEP`: Collection interval for repeater (default: 900s / 15min)
|
||||
@@ -410,6 +418,12 @@ Metrics are classified as either **gauge** or **counter** in `src/meshmon/metric
|
||||
|
||||
Counter metrics are converted to rates during chart rendering by calculating deltas between consecutive readings.
|
||||
|
||||
- **TELEMETRY**: Environmental sensor data (when `TELEMETRY_ENABLED=1`):
|
||||
- Stored with `telemetry.` prefix: `telemetry.temperature.0`, `telemetry.humidity.0`, `telemetry.barometer.0`
|
||||
- Channel number distinguishes multiple sensors of the same type
|
||||
- Compound values (e.g., GPS) stored as: `telemetry.gps.0.latitude`, `telemetry.gps.0.longitude`
|
||||
- Telemetry collection does NOT affect circuit breaker state
|
||||
|
||||
## Database Schema
|
||||
|
||||
Metrics are stored in a SQLite database at `data/state/metrics.db` with WAL mode enabled for concurrent access.
|
||||
@@ -694,16 +708,14 @@ meshcore-cli -s /dev/ttyACM0 reset_path "repeater name"
|
||||
|
||||
## Cron Setup (Example)
|
||||
|
||||
Use `flock` to prevent USB serial conflicts when companion and repeater collection overlap.
|
||||
|
||||
```cron
|
||||
MESHCORE=/path/to/meshcore-stats
|
||||
|
||||
# Companion: every minute
|
||||
* * * * * cd $MESHCORE && flock -w 60 /tmp/meshcore.lock .venv/bin/python scripts/collect_companion.py
|
||||
* * * * * cd $MESHCORE && .venv/bin/python scripts/collect_companion.py
|
||||
|
||||
# Repeater: every 15 minutes (offset by 1 min for staggering)
|
||||
1,16,31,46 * * * * cd $MESHCORE && flock -w 60 /tmp/meshcore.lock .venv/bin/python scripts/collect_repeater.py
|
||||
1,16,31,46 * * * * cd $MESHCORE && .venv/bin/python scripts/collect_repeater.py
|
||||
|
||||
# Charts: every 5 minutes (generates SVG charts from database)
|
||||
*/5 * * * * cd $MESHCORE && .venv/bin/python scripts/render_charts.py
|
||||
@@ -717,7 +729,7 @@ MESHCORE=/path/to/meshcore-stats
|
||||
|
||||
**Notes:**
|
||||
- `cd $MESHCORE` is required because paths in the config are relative to the project root
|
||||
- `flock -w 60` waits up to 60 seconds for the lock, preventing USB serial conflicts
|
||||
- Serial port locking is handled automatically via `fcntl.flock()` in Python (no external `flock` needed)
|
||||
|
||||
## Adding New Metrics
|
||||
|
||||
|
||||
@@ -184,10 +184,10 @@ Add to your crontab (`crontab -e`):
|
||||
MESHCORE=/path/to/meshcore-stats
|
||||
|
||||
# Companion: every minute
|
||||
* * * * * cd $MESHCORE && flock -w 60 /tmp/meshcore.lock .venv/bin/python scripts/collect_companion.py
|
||||
* * * * * cd $MESHCORE && .venv/bin/python scripts/collect_companion.py
|
||||
|
||||
# Repeater: every 15 minutes
|
||||
1,16,31,46 * * * * cd $MESHCORE && flock -w 60 /tmp/meshcore.lock .venv/bin/python scripts/collect_repeater.py
|
||||
1,16,31,46 * * * * cd $MESHCORE && .venv/bin/python scripts/collect_repeater.py
|
||||
|
||||
# Charts: every 5 minutes
|
||||
*/5 * * * * cd $MESHCORE && .venv/bin/python scripts/render_charts.py
|
||||
@@ -328,7 +328,7 @@ docker compose restart meshcore-stats
|
||||
|
||||
```
|
||||
┌─────────────────┐ LoRa ┌─────────────────┐
|
||||
│ Companion │◄────────────►│ Repeater │
|
||||
│ Companion │◄─────────────►│ Repeater │
|
||||
│ (USB Serial) │ │ (Remote) │
|
||||
└────────┬────────┘ └─────────────────┘
|
||||
│
|
||||
|
||||
@@ -15,7 +15,7 @@ services:
|
||||
# MeshCore Stats - Data collection and rendering
|
||||
# ==========================================================================
|
||||
meshcore-stats:
|
||||
image: ghcr.io/jorijn/meshcore-stats:0.2.4 # x-release-please-version
|
||||
image: ghcr.io/jorijn/meshcore-stats:0.2.7 # x-release-please-version
|
||||
container_name: meshcore-stats
|
||||
restart: unless-stopped
|
||||
|
||||
@@ -47,6 +47,7 @@ services:
|
||||
read_only: true
|
||||
tmpfs:
|
||||
- /tmp:noexec,nosuid,size=64m
|
||||
- /var/cache/fontconfig:noexec,nosuid,size=4m
|
||||
|
||||
# Resource limits
|
||||
deploy:
|
||||
|
||||
@@ -102,6 +102,84 @@ Returns a single dict with all status fields.
|
||||
|
||||
---
|
||||
|
||||
## Telemetry Data
|
||||
|
||||
Environmental telemetry is requested via `req_telemetry_sync(contact)` and returns
|
||||
Cayenne LPP formatted sensor data. This requires `TELEMETRY_ENABLED=1` and a sensor
|
||||
board attached to the repeater.
|
||||
|
||||
### Payload Format
|
||||
|
||||
Both `req_telemetry_sync()` and `get_self_telemetry()` return a dict containing the
|
||||
LPP data list and a public key prefix:
|
||||
|
||||
```python
|
||||
{
|
||||
'pubkey_pre': 'a5c14f5244d6',
|
||||
'lpp': [
|
||||
{'channel': 0, 'type': 'temperature', 'value': 23.5},
|
||||
{'channel': 0, 'type': 'humidity', 'value': 45.2},
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
The `extract_lpp_from_payload()` helper in `src/meshmon/telemetry.py` handles
|
||||
extracting the `lpp` list from this wrapper format.
|
||||
|
||||
### `req_telemetry_sync(contact)`
|
||||
|
||||
Returns sensor readings from a remote node in Cayenne LPP format:
|
||||
|
||||
```python
|
||||
[
|
||||
{'channel': 0, 'type': 'temperature', 'value': 23.5},
|
||||
{'channel': 0, 'type': 'humidity', 'value': 45.2},
|
||||
{'channel': 0, 'type': 'barometer', 'value': 1013.25},
|
||||
{'channel': 1, 'type': 'gps', 'value': {'latitude': 51.5, 'longitude': -0.1, 'altitude': 10}},
|
||||
]
|
||||
```
|
||||
|
||||
**Common sensor types:**
|
||||
|
||||
| Type | Unit | Description |
|
||||
|------|------|-------------|
|
||||
| `temperature` | Celsius | Temperature reading |
|
||||
| `humidity` | % | Relative humidity |
|
||||
| `barometer` | hPa/mbar | Barometric pressure |
|
||||
| `voltage` | V | Voltage reading |
|
||||
| `gps` | compound | GPS with `latitude`, `longitude`, `altitude` |
|
||||
|
||||
**Stored as:**
|
||||
- `telemetry.temperature.0` - Temperature on channel 0
|
||||
- `telemetry.humidity.0` - Humidity on channel 0
|
||||
- `telemetry.gps.1.latitude` - GPS latitude on channel 1
|
||||
|
||||
**Notes:**
|
||||
- Requires environmental sensor board (BME280, BME680, etc.) on repeater
|
||||
- Channel number distinguishes multiple sensors of the same type
|
||||
- Not all repeaters have environmental sensors attached
|
||||
- Telemetry collection does not affect circuit breaker state
|
||||
- Telemetry failures are logged as warnings and do not block status collection
|
||||
|
||||
### `get_self_telemetry()`
|
||||
|
||||
Returns self telemetry from the companion node's attached sensors.
|
||||
Same Cayenne LPP format as `req_telemetry_sync()`.
|
||||
|
||||
```python
|
||||
[
|
||||
{'channel': 0, 'type': 'temperature', 'value': 23.5},
|
||||
{'channel': 0, 'type': 'humidity', 'value': 45.2},
|
||||
]
|
||||
```
|
||||
|
||||
**Notes:**
|
||||
- Requires environmental sensor board attached to companion
|
||||
- Returns empty list if no sensors attached
|
||||
- Uses same format as repeater telemetry
|
||||
|
||||
---
|
||||
|
||||
## Derived Metrics
|
||||
|
||||
These are computed at query time, not stored:
|
||||
|
||||
@@ -113,6 +113,23 @@ RADIO_CODING_RATE=CR8
|
||||
# REMOTE_CB_FAILS=6
|
||||
# REMOTE_CB_COOLDOWN_S=3600
|
||||
|
||||
# =============================================================================
|
||||
# Telemetry Collection (Environmental Sensors)
|
||||
# =============================================================================
|
||||
# Enable telemetry collection from repeater's environmental sensors
|
||||
# (temperature, humidity, barometric pressure, etc.)
|
||||
# Requires sensor board attached to repeater (e.g., BME280, BME680)
|
||||
# Default: 0 (disabled)
|
||||
# TELEMETRY_ENABLED=1
|
||||
|
||||
# Telemetry-specific timeout and retry settings
|
||||
# Defaults match status settings. Separate config allows tuning if telemetry
|
||||
# proves problematic (e.g., firmware doesn't support it, sensor board missing).
|
||||
# You can reduce these if telemetry collection is causing issues.
|
||||
# TELEMETRY_TIMEOUT_S=10
|
||||
# TELEMETRY_RETRY_ATTEMPTS=2
|
||||
# TELEMETRY_RETRY_BACKOFF_S=4
|
||||
|
||||
# =============================================================================
|
||||
# Paths (Native installation only)
|
||||
# =============================================================================
|
||||
|
||||
@@ -25,8 +25,9 @@ sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
|
||||
from meshmon.env import get_config
|
||||
from meshmon import log
|
||||
from meshmon.meshcore_client import connect_from_env, run_command
|
||||
from meshmon.meshcore_client import connect_with_lock, run_command
|
||||
from meshmon.db import init_db, insert_metrics
|
||||
from meshmon.telemetry import extract_lpp_from_payload, extract_telemetry_metrics
|
||||
|
||||
|
||||
async def collect_companion() -> int:
|
||||
@@ -39,138 +40,142 @@ async def collect_companion() -> int:
|
||||
cfg = get_config()
|
||||
ts = int(time.time())
|
||||
|
||||
log.debug("Connecting to companion node...")
|
||||
mc = await connect_from_env()
|
||||
|
||||
if mc is None:
|
||||
log.error("Failed to connect to companion node")
|
||||
return 1
|
||||
|
||||
# Metrics to insert (firmware field names)
|
||||
metrics: dict[str, float] = {}
|
||||
commands_succeeded = 0
|
||||
|
||||
# Commands are accessed via mc.commands
|
||||
cmd = mc.commands
|
||||
log.debug("Connecting to companion node...")
|
||||
async with connect_with_lock() as mc:
|
||||
if mc is None:
|
||||
log.error("Failed to connect to companion node")
|
||||
return 1
|
||||
|
||||
try:
|
||||
# send_appstart (already called during connect, but call again to get self_info)
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.send_appstart(), "send_appstart"
|
||||
)
|
||||
if ok:
|
||||
commands_succeeded += 1
|
||||
log.debug(f"appstart: {evt_type}")
|
||||
else:
|
||||
log.error(f"appstart failed: {err}")
|
||||
# Commands are accessed via mc.commands
|
||||
cmd = mc.commands
|
||||
|
||||
# send_device_query
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.send_device_query(), "send_device_query"
|
||||
)
|
||||
if ok:
|
||||
commands_succeeded += 1
|
||||
log.debug(f"device_query: {payload}")
|
||||
else:
|
||||
log.error(f"device_query failed: {err}")
|
||||
try:
|
||||
# send_appstart (already called during connect, but call again to get self_info)
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.send_appstart(), "send_appstart"
|
||||
)
|
||||
if ok:
|
||||
commands_succeeded += 1
|
||||
log.debug(f"appstart: {evt_type}")
|
||||
else:
|
||||
log.error(f"appstart failed: {err}")
|
||||
|
||||
# get_bat
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_bat(), "get_bat"
|
||||
)
|
||||
if ok:
|
||||
commands_succeeded += 1
|
||||
log.debug(f"get_bat: {payload}")
|
||||
else:
|
||||
log.error(f"get_bat failed: {err}")
|
||||
# send_device_query
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.send_device_query(), "send_device_query"
|
||||
)
|
||||
if ok:
|
||||
commands_succeeded += 1
|
||||
log.debug(f"device_query: {payload}")
|
||||
else:
|
||||
log.error(f"device_query failed: {err}")
|
||||
|
||||
# get_time
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_time(), "get_time"
|
||||
)
|
||||
if ok:
|
||||
commands_succeeded += 1
|
||||
log.debug(f"get_time: {payload}")
|
||||
else:
|
||||
log.error(f"get_time failed: {err}")
|
||||
# get_bat
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_bat(), "get_bat"
|
||||
)
|
||||
if ok:
|
||||
commands_succeeded += 1
|
||||
log.debug(f"get_bat: {payload}")
|
||||
else:
|
||||
log.error(f"get_bat failed: {err}")
|
||||
|
||||
# get_self_telemetry
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_self_telemetry(), "get_self_telemetry"
|
||||
)
|
||||
if ok:
|
||||
commands_succeeded += 1
|
||||
log.debug(f"get_self_telemetry: {payload}")
|
||||
else:
|
||||
log.error(f"get_self_telemetry failed: {err}")
|
||||
# get_time
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_time(), "get_time"
|
||||
)
|
||||
if ok:
|
||||
commands_succeeded += 1
|
||||
log.debug(f"get_time: {payload}")
|
||||
else:
|
||||
log.error(f"get_time failed: {err}")
|
||||
|
||||
# get_custom_vars
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_custom_vars(), "get_custom_vars"
|
||||
)
|
||||
if ok:
|
||||
commands_succeeded += 1
|
||||
log.debug(f"get_custom_vars: {payload}")
|
||||
else:
|
||||
log.debug(f"get_custom_vars failed: {err}")
|
||||
# get_self_telemetry - collect environmental sensor data
|
||||
# Note: The call happens regardless of telemetry_enabled for device query completeness,
|
||||
# but we only extract and store metrics if the feature is enabled.
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_self_telemetry(), "get_self_telemetry"
|
||||
)
|
||||
if ok:
|
||||
commands_succeeded += 1
|
||||
log.debug(f"get_self_telemetry: {payload}")
|
||||
# Extract and store telemetry if enabled
|
||||
if cfg.telemetry_enabled:
|
||||
lpp_data = extract_lpp_from_payload(payload)
|
||||
if lpp_data is not None:
|
||||
telemetry_metrics = extract_telemetry_metrics(lpp_data)
|
||||
if telemetry_metrics:
|
||||
metrics.update(telemetry_metrics)
|
||||
log.debug(f"Extracted {len(telemetry_metrics)} telemetry metrics")
|
||||
else:
|
||||
# Debug level because not all devices have sensors attached - this is expected
|
||||
log.debug(f"get_self_telemetry failed: {err}")
|
||||
|
||||
# get_contacts - count contacts
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_contacts(), "get_contacts"
|
||||
)
|
||||
if ok:
|
||||
commands_succeeded += 1
|
||||
contacts_count = len(payload) if payload else 0
|
||||
metrics["contacts"] = float(contacts_count)
|
||||
log.debug(f"get_contacts: found {contacts_count} contacts")
|
||||
else:
|
||||
log.error(f"get_contacts failed: {err}")
|
||||
# get_custom_vars
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_custom_vars(), "get_custom_vars"
|
||||
)
|
||||
if ok:
|
||||
commands_succeeded += 1
|
||||
log.debug(f"get_custom_vars: {payload}")
|
||||
else:
|
||||
log.debug(f"get_custom_vars failed: {err}")
|
||||
|
||||
# Get statistics - these contain the main metrics
|
||||
# Core stats (battery_mv, uptime_secs, errors, queue_len)
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_stats_core(), "get_stats_core"
|
||||
)
|
||||
if ok and payload and isinstance(payload, dict):
|
||||
commands_succeeded += 1
|
||||
# Insert all numeric fields from stats_core
|
||||
for key, value in payload.items():
|
||||
if isinstance(value, (int, float)):
|
||||
metrics[key] = float(value)
|
||||
log.debug(f"stats_core: {payload}")
|
||||
# get_contacts - count contacts
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_contacts(), "get_contacts"
|
||||
)
|
||||
if ok:
|
||||
commands_succeeded += 1
|
||||
contacts_count = len(payload) if payload else 0
|
||||
metrics["contacts"] = float(contacts_count)
|
||||
log.debug(f"get_contacts: found {contacts_count} contacts")
|
||||
else:
|
||||
log.error(f"get_contacts failed: {err}")
|
||||
|
||||
# Radio stats (noise_floor, last_rssi, last_snr, tx_air_secs, rx_air_secs)
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_stats_radio(), "get_stats_radio"
|
||||
)
|
||||
if ok and payload and isinstance(payload, dict):
|
||||
commands_succeeded += 1
|
||||
for key, value in payload.items():
|
||||
if isinstance(value, (int, float)):
|
||||
metrics[key] = float(value)
|
||||
log.debug(f"stats_radio: {payload}")
|
||||
# Get statistics - these contain the main metrics
|
||||
# Core stats (battery_mv, uptime_secs, errors, queue_len)
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_stats_core(), "get_stats_core"
|
||||
)
|
||||
if ok and payload and isinstance(payload, dict):
|
||||
commands_succeeded += 1
|
||||
# Insert all numeric fields from stats_core
|
||||
for key, value in payload.items():
|
||||
if isinstance(value, (int, float)):
|
||||
metrics[key] = float(value)
|
||||
log.debug(f"stats_core: {payload}")
|
||||
|
||||
# Packet stats (recv, sent, flood_tx, direct_tx, flood_rx, direct_rx)
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_stats_packets(), "get_stats_packets"
|
||||
)
|
||||
if ok and payload and isinstance(payload, dict):
|
||||
commands_succeeded += 1
|
||||
for key, value in payload.items():
|
||||
if isinstance(value, (int, float)):
|
||||
metrics[key] = float(value)
|
||||
log.debug(f"stats_packets: {payload}")
|
||||
# Radio stats (noise_floor, last_rssi, last_snr, tx_air_secs, rx_air_secs)
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_stats_radio(), "get_stats_radio"
|
||||
)
|
||||
if ok and payload and isinstance(payload, dict):
|
||||
commands_succeeded += 1
|
||||
for key, value in payload.items():
|
||||
if isinstance(value, (int, float)):
|
||||
metrics[key] = float(value)
|
||||
log.debug(f"stats_radio: {payload}")
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"Error during collection: {e}")
|
||||
# Packet stats (recv, sent, flood_tx, direct_tx, flood_rx, direct_rx)
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.get_stats_packets(), "get_stats_packets"
|
||||
)
|
||||
if ok and payload and isinstance(payload, dict):
|
||||
commands_succeeded += 1
|
||||
for key, value in payload.items():
|
||||
if isinstance(value, (int, float)):
|
||||
metrics[key] = float(value)
|
||||
log.debug(f"stats_packets: {payload}")
|
||||
|
||||
finally:
|
||||
# Close connection
|
||||
if hasattr(mc, "disconnect"):
|
||||
try:
|
||||
await mc.disconnect()
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
log.error(f"Error during collection: {e}")
|
||||
|
||||
# Connection closed and lock released by context manager
|
||||
|
||||
# Print summary
|
||||
summary_parts = [f"ts={ts}"]
|
||||
@@ -183,6 +188,10 @@ async def collect_companion() -> int:
|
||||
summary_parts.append(f"rx={int(metrics['recv'])}")
|
||||
if "sent" in metrics:
|
||||
summary_parts.append(f"tx={int(metrics['sent'])}")
|
||||
# Add telemetry count to summary if present
|
||||
telemetry_count = sum(1 for k in metrics if k.startswith("telemetry."))
|
||||
if telemetry_count > 0:
|
||||
summary_parts.append(f"telem={telemetry_count}")
|
||||
|
||||
log.info(f"Companion: {', '.join(summary_parts)}")
|
||||
|
||||
|
||||
@@ -27,15 +27,15 @@ sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
from meshmon.env import get_config
|
||||
from meshmon import log
|
||||
from meshmon.meshcore_client import (
|
||||
connect_from_env,
|
||||
connect_with_lock,
|
||||
run_command,
|
||||
get_contact_by_name,
|
||||
get_contact_by_key_prefix,
|
||||
extract_contact_info,
|
||||
list_contacts_summary,
|
||||
)
|
||||
from meshmon.db import init_db, insert_metrics
|
||||
from meshmon.retry import get_repeater_circuit_breaker, with_retries
|
||||
from meshmon.telemetry import extract_lpp_from_payload, extract_telemetry_metrics
|
||||
|
||||
|
||||
async def find_repeater_contact(mc: Any) -> Optional[Any]:
|
||||
@@ -143,8 +143,10 @@ async def query_repeater_with_retry(
|
||||
|
||||
|
||||
async def collect_repeater() -> int:
|
||||
"""
|
||||
Collect data from remote repeater node.
|
||||
"""Collect data from remote repeater node.
|
||||
|
||||
Collects status metrics (battery, uptime, packet counters, etc.) and
|
||||
optionally telemetry data (temperature, humidity, pressure) if enabled.
|
||||
|
||||
Returns:
|
||||
Exit code (0 = success, 1 = error)
|
||||
@@ -161,122 +163,154 @@ async def collect_repeater() -> int:
|
||||
# Skip collection - no metrics to write
|
||||
return 0
|
||||
|
||||
# Connect to companion
|
||||
log.debug("Connecting to companion node...")
|
||||
mc = await connect_from_env()
|
||||
|
||||
if mc is None:
|
||||
log.error("Failed to connect to companion node")
|
||||
return 1
|
||||
|
||||
# Metrics to insert (firmware field names from req_status_sync)
|
||||
metrics: dict[str, float] = {}
|
||||
status_metrics: dict[str, float] = {}
|
||||
telemetry_metrics: dict[str, float] = {}
|
||||
node_name = "unknown"
|
||||
status_ok = False
|
||||
|
||||
# Commands are accessed via mc.commands
|
||||
cmd = mc.commands
|
||||
|
||||
try:
|
||||
# Initialize (appstart already called during connect)
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.send_appstart(), "send_appstart"
|
||||
)
|
||||
if not ok:
|
||||
log.error(f"appstart failed: {err}")
|
||||
|
||||
# Find repeater contact
|
||||
contact = await find_repeater_contact(mc)
|
||||
|
||||
if contact is None:
|
||||
log.error("Cannot find repeater contact")
|
||||
# Connect to companion
|
||||
log.debug("Connecting to companion node...")
|
||||
async with connect_with_lock() as mc:
|
||||
if mc is None:
|
||||
log.error("Failed to connect to companion node")
|
||||
return 1
|
||||
|
||||
# Store contact info
|
||||
contact_info = extract_contact_info(contact)
|
||||
node_name = contact_info.get("adv_name", "unknown")
|
||||
# Commands are accessed via mc.commands
|
||||
cmd = mc.commands
|
||||
|
||||
log.debug(f"Found repeater: {node_name}")
|
||||
try:
|
||||
# Initialize (appstart already called during connect)
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc, cmd.send_appstart(), "send_appstart"
|
||||
)
|
||||
if not ok:
|
||||
log.error(f"appstart failed: {err}")
|
||||
|
||||
# Optional login (if command exists)
|
||||
if cfg.repeater_password and hasattr(cmd, "send_login"):
|
||||
log.debug("Attempting login...")
|
||||
try:
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc,
|
||||
cmd.send_login(contact, cfg.repeater_password),
|
||||
"send_login",
|
||||
)
|
||||
if ok:
|
||||
log.debug("Login successful")
|
||||
else:
|
||||
log.debug(f"Login failed or not supported: {err}")
|
||||
except Exception as e:
|
||||
log.debug(f"Login not supported: {e}")
|
||||
# Find repeater contact
|
||||
contact = await find_repeater_contact(mc)
|
||||
|
||||
# Query status (using _sync version which returns payload directly)
|
||||
# Use timeout=0 to let the device suggest timeout, with min_timeout as floor
|
||||
log.debug("Querying repeater status...")
|
||||
success, payload, err = await query_repeater_with_retry(
|
||||
mc,
|
||||
contact,
|
||||
"req_status_sync",
|
||||
lambda: cmd.req_status_sync(contact, timeout=0, min_timeout=cfg.remote_timeout_s),
|
||||
)
|
||||
if success and payload and isinstance(payload, dict):
|
||||
status_ok = True
|
||||
# Insert all numeric fields from status response
|
||||
for key, value in payload.items():
|
||||
if isinstance(value, (int, float)):
|
||||
metrics[key] = float(value)
|
||||
log.debug(f"req_status_sync: {payload}")
|
||||
else:
|
||||
log.warn(f"req_status_sync failed: {err}")
|
||||
if contact is None:
|
||||
log.error("Cannot find repeater contact")
|
||||
return 1
|
||||
|
||||
# Update circuit breaker
|
||||
if status_ok:
|
||||
cb.record_success()
|
||||
log.debug("Circuit breaker: recorded success")
|
||||
else:
|
||||
# Store contact info
|
||||
contact_info = extract_contact_info(contact)
|
||||
node_name = contact_info.get("adv_name", "unknown")
|
||||
|
||||
log.debug(f"Found repeater: {node_name}")
|
||||
|
||||
# Optional login (if command exists)
|
||||
if cfg.repeater_password and hasattr(cmd, "send_login"):
|
||||
log.debug("Attempting login...")
|
||||
try:
|
||||
ok, evt_type, payload, err = await run_command(
|
||||
mc,
|
||||
cmd.send_login(contact, cfg.repeater_password),
|
||||
"send_login",
|
||||
)
|
||||
if ok:
|
||||
log.debug("Login successful")
|
||||
else:
|
||||
log.debug(f"Login failed or not supported: {err}")
|
||||
except Exception as e:
|
||||
log.debug(f"Login not supported: {e}")
|
||||
|
||||
# Phase 1: Status collection (affects circuit breaker)
|
||||
# Use timeout=0 to let the device suggest timeout, with min_timeout as floor
|
||||
log.debug("Querying repeater status...")
|
||||
success, payload, err = await query_repeater_with_retry(
|
||||
mc,
|
||||
contact,
|
||||
"req_status_sync",
|
||||
lambda: cmd.req_status_sync(contact, timeout=0, min_timeout=cfg.remote_timeout_s),
|
||||
)
|
||||
if success and payload and isinstance(payload, dict):
|
||||
status_ok = True
|
||||
# Insert all numeric fields from status response
|
||||
for key, value in payload.items():
|
||||
if isinstance(value, (int, float)):
|
||||
status_metrics[key] = float(value)
|
||||
log.debug(f"req_status_sync: {payload}")
|
||||
else:
|
||||
log.warn(f"req_status_sync failed: {err}")
|
||||
|
||||
# Update circuit breaker based on status result
|
||||
if status_ok:
|
||||
cb.record_success()
|
||||
log.debug("Circuit breaker: recorded success")
|
||||
else:
|
||||
cb.record_failure(cfg.remote_cb_fails, cfg.remote_cb_cooldown_s)
|
||||
log.debug(f"Circuit breaker: recorded failure ({cb.consecutive_failures}/{cfg.remote_cb_fails})")
|
||||
|
||||
# CRITICAL: Store status metrics immediately before attempting telemetry
|
||||
# This ensures critical data is saved even if telemetry fails
|
||||
if status_ok and status_metrics:
|
||||
try:
|
||||
inserted = insert_metrics(ts=ts, role="repeater", metrics=status_metrics)
|
||||
log.debug(f"Stored {inserted} status metrics (ts={ts})")
|
||||
except Exception as e:
|
||||
log.error(f"Failed to store status metrics: {e}")
|
||||
return 1
|
||||
|
||||
# Phase 2: Telemetry collection (does NOT affect circuit breaker)
|
||||
if cfg.telemetry_enabled and status_ok:
|
||||
log.debug("Querying repeater telemetry...")
|
||||
try:
|
||||
# Note: Telemetry uses its own retry settings and does NOT
|
||||
# affect circuit breaker. Status success proves the link is up;
|
||||
# telemetry failures are likely firmware/capability issues.
|
||||
telem_success, telem_payload, telem_err = await with_retries(
|
||||
lambda: cmd.req_telemetry_sync(
|
||||
contact, timeout=0, min_timeout=cfg.telemetry_timeout_s
|
||||
),
|
||||
attempts=cfg.telemetry_retry_attempts,
|
||||
backoff_s=cfg.telemetry_retry_backoff_s,
|
||||
name="req_telemetry_sync",
|
||||
)
|
||||
|
||||
if telem_success and telem_payload:
|
||||
log.debug(f"req_telemetry_sync: {telem_payload}")
|
||||
lpp_data = extract_lpp_from_payload(telem_payload)
|
||||
if lpp_data is not None:
|
||||
telemetry_metrics = extract_telemetry_metrics(lpp_data)
|
||||
log.debug(f"Extracted {len(telemetry_metrics)} telemetry metrics")
|
||||
|
||||
# Store telemetry metrics
|
||||
if telemetry_metrics:
|
||||
try:
|
||||
inserted = insert_metrics(ts=ts, role="repeater", metrics=telemetry_metrics)
|
||||
log.debug(f"Stored {inserted} telemetry metrics")
|
||||
except Exception as e:
|
||||
log.warn(f"Failed to store telemetry metrics: {e}")
|
||||
else:
|
||||
log.warn(f"req_telemetry_sync failed: {telem_err}")
|
||||
except Exception as e:
|
||||
log.warn(f"Telemetry collection error (continuing): {e}")
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"Error during collection: {e}")
|
||||
cb.record_failure(cfg.remote_cb_fails, cfg.remote_cb_cooldown_s)
|
||||
log.debug(f"Circuit breaker: recorded failure ({cb.consecutive_failures}/{cfg.remote_cb_fails})")
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"Error during collection: {e}")
|
||||
cb.record_failure(cfg.remote_cb_fails, cfg.remote_cb_cooldown_s)
|
||||
|
||||
finally:
|
||||
# Close connection
|
||||
if hasattr(mc, "disconnect"):
|
||||
try:
|
||||
await mc.disconnect()
|
||||
except Exception:
|
||||
pass
|
||||
# Connection closed and lock released by context manager
|
||||
|
||||
# Print summary
|
||||
summary_parts = [f"ts={ts}"]
|
||||
if "bat" in metrics:
|
||||
bat_v = metrics["bat"] / 1000.0
|
||||
if "bat" in status_metrics:
|
||||
bat_v = status_metrics["bat"] / 1000.0
|
||||
summary_parts.append(f"bat={bat_v:.2f}V")
|
||||
if "uptime" in metrics:
|
||||
uptime_days = metrics["uptime"] // 86400
|
||||
if "uptime" in status_metrics:
|
||||
uptime_days = status_metrics["uptime"] // 86400
|
||||
summary_parts.append(f"uptime={int(uptime_days)}d")
|
||||
if "nb_recv" in metrics:
|
||||
summary_parts.append(f"rx={int(metrics['nb_recv'])}")
|
||||
if "nb_sent" in metrics:
|
||||
summary_parts.append(f"tx={int(metrics['nb_sent'])}")
|
||||
if "nb_recv" in status_metrics:
|
||||
summary_parts.append(f"rx={int(status_metrics['nb_recv'])}")
|
||||
if "nb_sent" in status_metrics:
|
||||
summary_parts.append(f"tx={int(status_metrics['nb_sent'])}")
|
||||
if telemetry_metrics:
|
||||
summary_parts.append(f"telem={len(telemetry_metrics)}")
|
||||
|
||||
log.info(f"Repeater ({node_name}): {', '.join(summary_parts)}")
|
||||
|
||||
# Write metrics to database
|
||||
if status_ok and metrics:
|
||||
try:
|
||||
inserted = insert_metrics(ts=ts, role="repeater", metrics=metrics)
|
||||
log.debug(f"Inserted {inserted} metrics to database (ts={ts})")
|
||||
except Exception as e:
|
||||
log.error(f"Failed to write metrics to database: {e}")
|
||||
return 1
|
||||
|
||||
return 0 if status_ok else 1
|
||||
|
||||
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""MeshCore network monitoring library."""
|
||||
|
||||
__version__ = "0.2.4" # x-release-please-version
|
||||
__version__ = "0.2.7" # x-release-please-version
|
||||
|
||||
@@ -155,6 +155,14 @@ class Config:
|
||||
self.remote_cb_fails = get_int("REMOTE_CB_FAILS", 6)
|
||||
self.remote_cb_cooldown_s = get_int("REMOTE_CB_COOLDOWN_S", 3600)
|
||||
|
||||
# Telemetry collection (requires sensor board on repeater)
|
||||
self.telemetry_enabled = get_bool("TELEMETRY_ENABLED", False)
|
||||
# Separate settings allow tuning if telemetry proves problematic
|
||||
# Defaults match status settings - tune down if needed
|
||||
self.telemetry_timeout_s = get_int("TELEMETRY_TIMEOUT_S", 10)
|
||||
self.telemetry_retry_attempts = get_int("TELEMETRY_RETRY_ATTEMPTS", 2)
|
||||
self.telemetry_retry_backoff_s = get_int("TELEMETRY_RETRY_BACKOFF_S", 4)
|
||||
|
||||
# Paths (defaults are Docker container paths; native installs override via config)
|
||||
self.state_dir = get_path("STATE_DIR", "/data/state")
|
||||
self.out_dir = get_path("OUT_DIR", "/out")
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
"""MeshCore client wrapper with safe command execution and contact lookup."""
|
||||
|
||||
import asyncio
|
||||
from typing import Any, Optional, Callable, Coroutine
|
||||
import fcntl
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from typing import Any, AsyncIterator, Callable, Coroutine, Optional
|
||||
|
||||
from .env import get_config
|
||||
from . import log
|
||||
@@ -100,6 +103,92 @@ async def connect_from_env() -> Optional[Any]:
|
||||
return None
|
||||
|
||||
|
||||
async def _acquire_lock_async(
|
||||
lock_file,
|
||||
timeout: float = 60.0,
|
||||
poll_interval: float = 0.1,
|
||||
) -> None:
|
||||
"""Acquire exclusive file lock without blocking the event loop.
|
||||
|
||||
Uses non-blocking LOCK_NB with async polling to avoid freezing the event loop.
|
||||
|
||||
Args:
|
||||
lock_file: Open file handle to lock
|
||||
timeout: Maximum seconds to wait for lock
|
||||
poll_interval: Seconds between lock attempts
|
||||
|
||||
Raises:
|
||||
TimeoutError: If lock cannot be acquired within timeout
|
||||
"""
|
||||
loop = asyncio.get_running_loop()
|
||||
deadline = loop.time() + timeout
|
||||
|
||||
while True:
|
||||
try:
|
||||
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
return
|
||||
except BlockingIOError:
|
||||
if loop.time() >= deadline:
|
||||
raise TimeoutError(
|
||||
f"Could not acquire serial lock within {timeout}s. "
|
||||
"Another process may be using the serial port."
|
||||
)
|
||||
await asyncio.sleep(poll_interval)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def connect_with_lock(
|
||||
lock_timeout: float = 60.0,
|
||||
) -> AsyncIterator[Optional[Any]]:
|
||||
"""Connect to MeshCore with serial port locking to prevent concurrent access.
|
||||
|
||||
For serial transport: Acquires exclusive file lock before connecting.
|
||||
For TCP/BLE: No locking needed (protocol handles multiple connections).
|
||||
|
||||
Args:
|
||||
lock_timeout: Maximum seconds to wait for serial lock
|
||||
|
||||
Yields:
|
||||
MeshCore client instance, or None if connection failed
|
||||
"""
|
||||
cfg = get_config()
|
||||
lock_file = None
|
||||
mc = None
|
||||
needs_lock = cfg.mesh_transport.lower() == "serial"
|
||||
|
||||
try:
|
||||
if needs_lock:
|
||||
lock_path: Path = cfg.state_dir / "serial.lock"
|
||||
lock_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Use 'a' mode: doesn't truncate, creates if missing
|
||||
lock_file = open(lock_path, "a")
|
||||
try:
|
||||
await _acquire_lock_async(lock_file, timeout=lock_timeout)
|
||||
log.debug(f"Acquired serial lock: {lock_path}")
|
||||
except Exception:
|
||||
# If lock acquisition fails, close file before re-raising
|
||||
lock_file.close()
|
||||
lock_file = None
|
||||
raise
|
||||
|
||||
mc = await connect_from_env()
|
||||
yield mc
|
||||
|
||||
finally:
|
||||
# Disconnect first (while we still hold the lock)
|
||||
if mc is not None and hasattr(mc, "disconnect"):
|
||||
try:
|
||||
await mc.disconnect()
|
||||
except Exception as e:
|
||||
log.debug(f"Error during disconnect (ignored): {e}")
|
||||
|
||||
# Release lock by closing the file (close() auto-releases flock)
|
||||
if lock_file is not None:
|
||||
lock_file.close()
|
||||
log.debug("Released serial lock")
|
||||
|
||||
|
||||
async def run_command(
|
||||
mc: Any,
|
||||
cmd_coro: Coroutine,
|
||||
|
||||
102
src/meshmon/telemetry.py
Normal file
102
src/meshmon/telemetry.py
Normal file
@@ -0,0 +1,102 @@
|
||||
"""Telemetry data extraction from Cayenne LPP format."""
|
||||
|
||||
from typing import Any
|
||||
from . import log
|
||||
|
||||
__all__ = ["extract_lpp_from_payload", "extract_telemetry_metrics"]
|
||||
|
||||
|
||||
def extract_lpp_from_payload(payload: Any) -> list | None:
|
||||
"""Extract LPP data list from telemetry payload.
|
||||
|
||||
Handles both formats returned by the MeshCore API:
|
||||
- Dict format: {'pubkey_pre': '...', 'lpp': [...]}
|
||||
- Direct list format: [...]
|
||||
|
||||
Args:
|
||||
payload: Raw telemetry payload from get_self_telemetry() or req_telemetry_sync()
|
||||
|
||||
Returns:
|
||||
The LPP data list, or None if not extractable.
|
||||
"""
|
||||
if payload is None:
|
||||
return None
|
||||
|
||||
if isinstance(payload, dict):
|
||||
lpp = payload.get("lpp")
|
||||
if lpp is None:
|
||||
log.debug("No 'lpp' key in telemetry payload dict")
|
||||
return None
|
||||
if not isinstance(lpp, list):
|
||||
log.debug(f"Unexpected LPP data type in payload: {type(lpp).__name__}")
|
||||
return None
|
||||
return lpp
|
||||
|
||||
if isinstance(payload, list):
|
||||
return payload
|
||||
|
||||
log.debug(f"Unexpected telemetry payload type: {type(payload).__name__}")
|
||||
return None
|
||||
|
||||
|
||||
def extract_telemetry_metrics(lpp_data: Any) -> dict[str, float]:
|
||||
"""Extract numeric telemetry values from Cayenne LPP response.
|
||||
|
||||
Expected format:
|
||||
[
|
||||
{"type": "temperature", "channel": 0, "value": 23.5},
|
||||
{"type": "gps", "channel": 1, "value": {"latitude": 51.5, "longitude": -0.1, "altitude": 10}}
|
||||
]
|
||||
|
||||
Keys are formatted as:
|
||||
- telemetry.{type}.{channel} for scalar values
|
||||
- telemetry.{type}.{channel}.{subkey} for compound values (e.g., GPS)
|
||||
|
||||
Returns:
|
||||
Dict mapping metric keys to float values. Invalid readings are skipped.
|
||||
"""
|
||||
if not isinstance(lpp_data, list):
|
||||
log.warn(f"Expected list for LPP data, got {type(lpp_data).__name__}")
|
||||
return {}
|
||||
|
||||
metrics: dict[str, float] = {}
|
||||
|
||||
for i, reading in enumerate(lpp_data):
|
||||
if not isinstance(reading, dict):
|
||||
log.debug(f"Skipping non-dict LPP reading at index {i}")
|
||||
continue
|
||||
|
||||
sensor_type = reading.get("type")
|
||||
if not isinstance(sensor_type, str) or not sensor_type.strip():
|
||||
log.debug(f"Skipping reading with invalid type at index {i}")
|
||||
continue
|
||||
|
||||
# Normalize sensor type for use as metric key component
|
||||
sensor_type = sensor_type.strip().lower().replace(" ", "_")
|
||||
|
||||
channel = reading.get("channel", 0)
|
||||
if not isinstance(channel, int):
|
||||
channel = 0
|
||||
|
||||
value = reading.get("value")
|
||||
base_key = f"telemetry.{sensor_type}.{channel}"
|
||||
|
||||
# Note: Check bool before int because bool is a subclass of int in Python.
|
||||
# Some sensors may report digital on/off values as booleans.
|
||||
if isinstance(value, bool):
|
||||
metrics[base_key] = float(value)
|
||||
elif isinstance(value, (int, float)):
|
||||
metrics[base_key] = float(value)
|
||||
elif isinstance(value, dict):
|
||||
for subkey, subval in value.items():
|
||||
if not isinstance(subkey, str):
|
||||
continue
|
||||
subkey_clean = subkey.strip().lower().replace(" ", "_")
|
||||
if not subkey_clean:
|
||||
continue
|
||||
if isinstance(subval, bool):
|
||||
metrics[f"{base_key}.{subkey_clean}"] = float(subval)
|
||||
elif isinstance(subval, (int, float)):
|
||||
metrics[f"{base_key}.{subkey_clean}"] = float(subval)
|
||||
|
||||
return metrics
|
||||
Reference in New Issue
Block a user