Initial release: MeshCore Stats monitoring system

A Python-based monitoring system for MeshCore LoRa mesh networks.
Collects metrics from companion and repeater nodes, stores them in
a SQLite database, and generates a static website with interactive
SVG charts and statistics.

Features:
- Data collection from local companion and remote repeater nodes
- SQLite database with EAV schema for flexible metric storage
- Interactive SVG chart generation with matplotlib
- Static HTML site with day/week/month/year views
- Monthly and yearly statistics reports (HTML, TXT, JSON)
- Light and dark theme support
- Circuit breaker for unreliable LoRa connections
- Battery percentage calculation from 18650 discharge curves
- Automated releases via release-please

Live demo: https://meshcore.jorijn.com
This commit is contained in:
Jorijn Schrijvershof
2026-01-04 19:37:57 +01:00
commit 0f8b0a3492
45 changed files with 9252 additions and 0 deletions

213
scripts/collect_companion.py Executable file
View File

@@ -0,0 +1,213 @@
#!/usr/bin/env python3
"""
Phase 1: Collect data from companion node.
Connects to the local companion node via serial and collects:
- Device info
- Battery status
- Time
- Self telemetry
- Custom vars
- Contacts list
Outputs:
- Concise summary to stdout
- Metrics written to SQLite database (EAV schema)
"""
import asyncio
import sys
import time
from pathlib import Path
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from meshmon.env import get_config
from meshmon import log
from meshmon.meshcore_client import connect_from_env, run_command
from meshmon.db import init_db, insert_metrics
async def collect_companion() -> int:
"""
Collect data from companion node.
Returns:
Exit code (0 = success, 1 = connection failed)
"""
cfg = get_config()
ts = int(time.time())
log.debug("Connecting to companion node...")
mc = await connect_from_env()
if mc is None:
log.error("Failed to connect to companion node")
return 1
# Metrics to insert (firmware field names)
metrics: dict[str, float] = {}
commands_succeeded = 0
# Commands are accessed via mc.commands
cmd = mc.commands
try:
# send_appstart (already called during connect, but call again to get self_info)
ok, evt_type, payload, err = await run_command(
mc, cmd.send_appstart(), "send_appstart"
)
if ok:
commands_succeeded += 1
log.debug(f"appstart: {evt_type}")
else:
log.error(f"appstart failed: {err}")
# send_device_query
ok, evt_type, payload, err = await run_command(
mc, cmd.send_device_query(), "send_device_query"
)
if ok:
commands_succeeded += 1
log.debug(f"device_query: {payload}")
else:
log.error(f"device_query failed: {err}")
# get_bat
ok, evt_type, payload, err = await run_command(
mc, cmd.get_bat(), "get_bat"
)
if ok:
commands_succeeded += 1
log.debug(f"get_bat: {payload}")
else:
log.error(f"get_bat failed: {err}")
# get_time
ok, evt_type, payload, err = await run_command(
mc, cmd.get_time(), "get_time"
)
if ok:
commands_succeeded += 1
log.debug(f"get_time: {payload}")
else:
log.error(f"get_time failed: {err}")
# get_self_telemetry
ok, evt_type, payload, err = await run_command(
mc, cmd.get_self_telemetry(), "get_self_telemetry"
)
if ok:
commands_succeeded += 1
log.debug(f"get_self_telemetry: {payload}")
else:
log.error(f"get_self_telemetry failed: {err}")
# get_custom_vars
ok, evt_type, payload, err = await run_command(
mc, cmd.get_custom_vars(), "get_custom_vars"
)
if ok:
commands_succeeded += 1
log.debug(f"get_custom_vars: {payload}")
else:
log.debug(f"get_custom_vars failed: {err}")
# get_contacts - count contacts
ok, evt_type, payload, err = await run_command(
mc, cmd.get_contacts(), "get_contacts"
)
if ok:
commands_succeeded += 1
contacts_count = len(payload) if payload else 0
metrics["contacts"] = float(contacts_count)
log.debug(f"get_contacts: found {contacts_count} contacts")
else:
log.error(f"get_contacts failed: {err}")
# Get statistics - these contain the main metrics
# Core stats (battery_mv, uptime_secs, errors, queue_len)
ok, evt_type, payload, err = await run_command(
mc, cmd.get_stats_core(), "get_stats_core"
)
if ok and payload and isinstance(payload, dict):
commands_succeeded += 1
# Insert all numeric fields from stats_core
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"stats_core: {payload}")
# Radio stats (noise_floor, last_rssi, last_snr, tx_air_secs, rx_air_secs)
ok, evt_type, payload, err = await run_command(
mc, cmd.get_stats_radio(), "get_stats_radio"
)
if ok and payload and isinstance(payload, dict):
commands_succeeded += 1
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"stats_radio: {payload}")
# Packet stats (recv, sent, flood_tx, direct_tx, flood_rx, direct_rx)
ok, evt_type, payload, err = await run_command(
mc, cmd.get_stats_packets(), "get_stats_packets"
)
if ok and payload and isinstance(payload, dict):
commands_succeeded += 1
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"stats_packets: {payload}")
except Exception as e:
log.error(f"Error during collection: {e}")
finally:
# Close connection
if hasattr(mc, "disconnect"):
try:
await mc.disconnect()
except Exception:
pass
# Print summary
summary_parts = [f"ts={ts}"]
if "battery_mv" in metrics:
bat_v = metrics["battery_mv"] / 1000.0
summary_parts.append(f"bat={bat_v:.2f}V")
if "contacts" in metrics:
summary_parts.append(f"contacts={int(metrics['contacts'])}")
if "recv" in metrics:
summary_parts.append(f"rx={int(metrics['recv'])}")
if "sent" in metrics:
summary_parts.append(f"tx={int(metrics['sent'])}")
log.info(f"Companion: {', '.join(summary_parts)}")
# Write metrics to database
if commands_succeeded > 0 and metrics:
try:
inserted = insert_metrics(ts=ts, role="companion", metrics=metrics)
log.debug(f"Inserted {inserted} metrics to database (ts={ts})")
except Exception as e:
log.error(f"Failed to write metrics to database: {e}")
return 1
return 0
else:
log.error("No commands succeeded or no metrics collected")
return 1
def main():
"""Entry point."""
# Ensure database is initialized
init_db()
exit_code = asyncio.run(collect_companion())
sys.exit(exit_code)
if __name__ == "__main__":
main()

307
scripts/collect_repeater.py Executable file
View File

@@ -0,0 +1,307 @@
#!/usr/bin/env python3
"""
Phase 1: Collect data from remote repeater node.
Connects to the local companion node, finds the repeater contact,
and queries it over LoRa using binary protocol.
Features:
- Circuit breaker to avoid spamming LoRa
- Retry with backoff
- Timeout handling
Outputs:
- Concise summary to stdout
- Metrics written to SQLite database (EAV schema)
"""
import asyncio
import sys
import time
from pathlib import Path
from typing import Any, Callable, Coroutine, Optional
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from meshmon.env import get_config
from meshmon import log
from meshmon.meshcore_client import (
connect_from_env,
run_command,
get_contact_by_name,
get_contact_by_key_prefix,
extract_contact_info,
list_contacts_summary,
)
from meshmon.db import init_db, insert_metrics
from meshmon.retry import get_repeater_circuit_breaker, with_retries
async def find_repeater_contact(mc: Any) -> Optional[Any]:
"""
Find the repeater contact by name or key prefix.
Returns:
Contact dict or None
"""
cfg = get_config()
# Get all contacts first (this populates mc.contacts)
ok, evt_type, payload, err = await run_command(
mc, mc.commands.get_contacts(), "get_contacts"
)
if not ok:
log.error(f"Failed to get contacts: {err}")
return None
# payload is a dict keyed by public key, mc.contacts is also populated
# The get_contact_by_name method searches mc._contacts
contacts_dict = mc.contacts if hasattr(mc, "contacts") else {}
if isinstance(payload, dict):
contacts_dict = payload
# Try by name first using the helper (searches mc._contacts)
if cfg.repeater_name:
log.debug(f"Looking for repeater by name: {cfg.repeater_name}")
contact = get_contact_by_name(mc, cfg.repeater_name)
if contact:
return contact
# Manual search in payload dict
for pk, c in contacts_dict.items():
if isinstance(c, dict):
name = c.get("adv_name", "")
if name and name.lower() == cfg.repeater_name.lower():
return c
# Try by key prefix
if cfg.repeater_key_prefix:
log.debug(f"Looking for repeater by key prefix: {cfg.repeater_key_prefix}")
contact = get_contact_by_key_prefix(mc, cfg.repeater_key_prefix)
if contact:
return contact
# Manual search
prefix = cfg.repeater_key_prefix.lower()
for pk, c in contacts_dict.items():
if pk.lower().startswith(prefix):
return c
# Not found - print available contacts
log.error("Repeater contact not found")
log.info("Available contacts:")
for pk, c in contacts_dict.items():
if isinstance(c, dict):
name = c.get("adv_name", c.get("name", "unnamed"))
key = pk[:12] if pk else ""
log.info(f" - {name} (key: {key}...)")
return None
async def query_repeater_with_retry(
mc: Any,
contact: Any,
command_name: str,
command_coro_fn: Callable[[], Coroutine[Any, Any, Any]],
) -> tuple[bool, Optional[dict], Optional[str]]:
"""
Query repeater with retry logic.
The binary req_*_sync methods return the payload directly (or None on failure),
not an Event object.
Args:
mc: MeshCore instance
contact: Contact object
command_name: Name for logging
command_coro_fn: Function that returns command coroutine
Returns:
(success, payload, error_message)
"""
cfg = get_config()
async def do_query():
result = await command_coro_fn()
if result is None:
raise Exception("No response received")
return result
success, result, exc = await with_retries(
do_query,
attempts=cfg.remote_retry_attempts,
backoff_s=cfg.remote_retry_backoff_s,
name=command_name,
)
if success:
return (True, result, None)
else:
return (False, None, str(exc) if exc else "Failed")
async def collect_repeater() -> int:
"""
Collect data from remote repeater node.
Returns:
Exit code (0 = success, 1 = error)
"""
cfg = get_config()
ts = int(time.time())
# Check circuit breaker first
cb = get_repeater_circuit_breaker()
if cb.is_open():
remaining = cb.cooldown_remaining()
log.warn(f"Circuit breaker open, cooldown active ({remaining}s remaining)")
# Skip collection - no metrics to write
return 0
# Connect to companion
log.debug("Connecting to companion node...")
mc = await connect_from_env()
if mc is None:
log.error("Failed to connect to companion node")
return 1
# Metrics to insert (firmware field names from req_status_sync)
metrics: dict[str, float] = {}
node_name = "unknown"
status_ok = False
# Commands are accessed via mc.commands
cmd = mc.commands
try:
# Initialize (appstart already called during connect)
ok, evt_type, payload, err = await run_command(
mc, cmd.send_appstart(), "send_appstart"
)
if not ok:
log.error(f"appstart failed: {err}")
# Find repeater contact
contact = await find_repeater_contact(mc)
if contact is None:
log.error("Cannot find repeater contact")
return 1
# Store contact info
contact_info = extract_contact_info(contact)
node_name = contact_info.get("adv_name", "unknown")
log.debug(f"Found repeater: {node_name}")
# Optional login (if command exists)
if cfg.repeater_password and hasattr(cmd, "send_login"):
log.debug("Attempting login...")
try:
ok, evt_type, payload, err = await run_command(
mc,
cmd.send_login(contact, cfg.repeater_password),
"send_login",
)
if ok:
log.debug("Login successful")
else:
log.debug(f"Login failed or not supported: {err}")
except Exception as e:
log.debug(f"Login not supported: {e}")
# Query status (using _sync version which returns payload directly)
# Use timeout=0 to let the device suggest timeout, with min_timeout as floor
log.debug("Querying repeater status...")
success, payload, err = await query_repeater_with_retry(
mc,
contact,
"req_status_sync",
lambda: cmd.req_status_sync(contact, timeout=0, min_timeout=cfg.remote_timeout_s),
)
if success and payload and isinstance(payload, dict):
status_ok = True
# Insert all numeric fields from status response
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"req_status_sync: {payload}")
else:
log.warn(f"req_status_sync failed: {err}")
# Optional ACL query (using _sync version)
if cfg.repeater_fetch_acl:
log.debug("Querying repeater ACL...")
success, payload, err = await query_repeater_with_retry(
mc,
contact,
"req_acl_sync",
lambda: cmd.req_acl_sync(contact, timeout=0, min_timeout=cfg.remote_timeout_s),
)
if success:
log.debug(f"req_acl_sync: {payload}")
else:
log.debug(f"req_acl_sync failed: {err}")
# Update circuit breaker
if status_ok:
cb.record_success()
log.debug("Circuit breaker: recorded success")
else:
cb.record_failure(cfg.remote_cb_fails, cfg.remote_cb_cooldown_s)
log.debug(f"Circuit breaker: recorded failure ({cb.consecutive_failures}/{cfg.remote_cb_fails})")
except Exception as e:
log.error(f"Error during collection: {e}")
cb.record_failure(cfg.remote_cb_fails, cfg.remote_cb_cooldown_s)
finally:
# Close connection
if hasattr(mc, "disconnect"):
try:
await mc.disconnect()
except Exception:
pass
# Print summary
summary_parts = [f"ts={ts}"]
if "bat" in metrics:
bat_v = metrics["bat"] / 1000.0
summary_parts.append(f"bat={bat_v:.2f}V")
if "uptime" in metrics:
uptime_days = metrics["uptime"] // 86400
summary_parts.append(f"uptime={int(uptime_days)}d")
if "nb_recv" in metrics:
summary_parts.append(f"rx={int(metrics['nb_recv'])}")
if "nb_sent" in metrics:
summary_parts.append(f"tx={int(metrics['nb_sent'])}")
log.info(f"Repeater ({node_name}): {', '.join(summary_parts)}")
# Write metrics to database
if status_ok and metrics:
try:
inserted = insert_metrics(ts=ts, role="repeater", metrics=metrics)
log.debug(f"Inserted {inserted} metrics to database (ts={ts})")
except Exception as e:
log.error(f"Failed to write metrics to database: {e}")
return 1
return 0 if status_ok else 1
def main():
"""Entry point."""
# Ensure database is initialized
init_db()
exit_code = asyncio.run(collect_repeater())
sys.exit(exit_code)
if __name__ == "__main__":
main()

43
scripts/db_maintenance.sh Executable file
View File

@@ -0,0 +1,43 @@
#!/bin/bash
# Database maintenance script
#
# Runs VACUUM and ANALYZE on the SQLite database to compact it and
# update query statistics.
#
# Recommended: Run monthly via cron
# Example crontab entry:
# 0 3 1 * * cd /home/jorijn/apps/meshcore-stats && ./scripts/db_maintenance.sh
#
# This script will:
# 1. Run VACUUM to compact the database and reclaim space
# 2. Run ANALYZE to update query optimizer statistics
#
# Note: VACUUM acquires an exclusive lock internally. Other processes
# using busy_timeout will wait for it to complete.
set -e
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
PROJECT_DIR="$(dirname "$SCRIPT_DIR")"
DB_PATH="$PROJECT_DIR/data/state/metrics.db"
# Check if database exists
if [ ! -f "$DB_PATH" ]; then
echo "Database not found: $DB_PATH"
exit 1
fi
echo "$(date '+%Y-%m-%d %H:%M:%S') Starting database maintenance..."
echo "Database: $DB_PATH"
echo "Running VACUUM..."
sqlite3 "$DB_PATH" "VACUUM;"
echo "Running ANALYZE..."
sqlite3 "$DB_PATH" "ANALYZE;"
# Get database size
DB_SIZE=$(du -h "$DB_PATH" | cut -f1)
echo "Database size after maintenance: $DB_SIZE"
echo "$(date '+%Y-%m-%d %H:%M:%S') Maintenance complete"

51
scripts/render_charts.py Executable file
View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python3
"""
Phase 2: Render charts from SQLite database.
Generates SVG charts for day/week/month/year for both companion and repeater
using matplotlib, reading directly from the SQLite metrics database.
"""
import sys
from pathlib import Path
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from meshmon.db import init_db, get_metric_count
from meshmon import log
from meshmon.charts import render_all_charts, save_chart_stats
def main():
"""Render all charts and save statistics."""
# Ensure database is initialized
init_db()
log.info("Rendering charts from database...")
# Check if data exists before rendering
companion_count = get_metric_count("companion")
repeater_count = get_metric_count("repeater")
# Companion charts
if companion_count > 0:
charts, stats = render_all_charts("companion")
save_chart_stats("companion", stats)
log.info(f"Rendered {len(charts)} companion charts ({companion_count} data points)")
else:
log.warn("No companion metrics in database")
# Repeater charts
if repeater_count > 0:
charts, stats = render_all_charts("repeater")
save_chart_stats("repeater", stats)
log.info(f"Rendered {len(charts)} repeater charts ({repeater_count} data points)")
else:
log.warn("No repeater metrics in database")
log.info("Chart rendering complete")
if __name__ == "__main__":
main()

304
scripts/render_reports.py Executable file
View File

@@ -0,0 +1,304 @@
#!/usr/bin/env python3
"""
Phase 4: Render reports from SQLite database.
Generates monthly and yearly statistics reports in HTML, TXT, and JSON
formats for both repeater and companion nodes.
Output structure:
out/reports/
index.html # Reports listing
repeater/
2025/
index.html # Yearly report (HTML)
report.txt # Yearly report (TXT)
report.json # Yearly report (JSON)
12/
index.html # Monthly report (HTML)
report.txt # Monthly report (TXT)
report.json # Monthly report (JSON)
companion/
... # Same structure
"""
import calendar
import json
import sys
from pathlib import Path
from typing import Optional
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from meshmon.db import init_db
from meshmon.env import get_config
from meshmon import log
def safe_write(path: Path, content: str) -> bool:
"""Write content to file with error handling.
Args:
path: File path to write to
content: Content to write
Returns:
True if write succeeded, False otherwise
"""
try:
path.write_text(content, encoding="utf-8")
return True
except IOError as e:
log.error(f"Failed to write {path}: {e}")
return False
from meshmon.reports import (
LocationInfo,
aggregate_monthly,
aggregate_yearly,
format_monthly_txt,
format_yearly_txt,
get_available_periods,
monthly_to_json,
yearly_to_json,
)
from meshmon.html import render_report_page, render_reports_index
def get_node_name(role: str) -> str:
"""Get display name for a node role from configuration."""
cfg = get_config()
if role == "repeater":
return cfg.repeater_display_name
elif role == "companion":
return cfg.companion_display_name
return role.capitalize()
def get_location() -> LocationInfo:
"""Get location info from config."""
cfg = get_config()
return LocationInfo(
name=cfg.report_location_name,
lat=cfg.report_lat,
lon=cfg.report_lon,
elev=cfg.report_elev,
)
def render_monthly_report(
role: str,
year: int,
month: int,
prev_period: Optional[tuple[int, int]] = None,
next_period: Optional[tuple[int, int]] = None,
) -> None:
"""Render monthly report in all formats.
Args:
role: "companion" or "repeater"
year: Report year
month: Report month (1-12)
prev_period: (year, month) of previous report, or None
next_period: (year, month) of next report, or None
"""
cfg = get_config()
node_name = get_node_name(role)
location = get_location()
log.info(f"Aggregating {role} monthly report for {year}-{month:02d}...")
agg = aggregate_monthly(role, year, month)
if not agg.daily:
log.warn(f"No data for {role} {year}-{month:02d}, skipping")
return
# Create output directory
out_dir = cfg.out_dir / "reports" / role / str(year) / f"{month:02d}"
out_dir.mkdir(parents=True, exist_ok=True)
# Build prev/next navigation
prev_report = None
next_report = None
if prev_period:
py, pm = prev_period
prev_report = {
"url": f"/reports/{role}/{py}/{pm:02d}/",
"label": f"{calendar.month_abbr[pm]} {py}",
}
if next_period:
ny, nm = next_period
next_report = {
"url": f"/reports/{role}/{ny}/{nm:02d}/",
"label": f"{calendar.month_abbr[nm]} {ny}",
}
# HTML
html = render_report_page(agg, node_name, "monthly", prev_report, next_report)
safe_write(out_dir / "index.html", html)
# TXT (WeeWX-style)
txt = format_monthly_txt(agg, node_name, location)
safe_write(out_dir / "report.txt", txt)
# JSON
json_data = monthly_to_json(agg)
safe_write(out_dir / "report.json", json.dumps(json_data, indent=2))
log.debug(f"Wrote monthly report: {out_dir}")
def render_yearly_report(
role: str,
year: int,
prev_year: Optional[int] = None,
next_year: Optional[int] = None,
) -> None:
"""Render yearly report in all formats.
Args:
role: "companion" or "repeater"
year: Report year
prev_year: Previous year with data, or None
next_year: Next year with data, or None
"""
cfg = get_config()
node_name = get_node_name(role)
location = get_location()
log.info(f"Aggregating {role} yearly report for {year}...")
agg = aggregate_yearly(role, year)
if not agg.monthly:
log.warn(f"No data for {role} {year}, skipping")
return
# Create output directory
out_dir = cfg.out_dir / "reports" / role / str(year)
out_dir.mkdir(parents=True, exist_ok=True)
# Build prev/next navigation
prev_report = None
next_report = None
if prev_year:
prev_report = {
"url": f"/reports/{role}/{prev_year}/",
"label": str(prev_year),
}
if next_year:
next_report = {
"url": f"/reports/{role}/{next_year}/",
"label": str(next_year),
}
# HTML
html = render_report_page(agg, node_name, "yearly", prev_report, next_report)
safe_write(out_dir / "index.html", html)
# TXT (WeeWX-style)
txt = format_yearly_txt(agg, node_name, location)
safe_write(out_dir / "report.txt", txt)
# JSON
json_data = yearly_to_json(agg)
safe_write(out_dir / "report.json", json.dumps(json_data, indent=2))
log.debug(f"Wrote yearly report: {out_dir}")
def build_reports_index_data() -> list[dict]:
"""Build data structure for reports index page.
Returns:
List of section dicts with 'role' and 'years' keys
"""
sections = []
for role in ["repeater", "companion"]:
periods = get_available_periods(role)
if not periods:
sections.append({"role": role, "years": []})
continue
# Group by year
years_data = {}
for year, month in periods:
if year not in years_data:
years_data[year] = []
years_data[year].append({
"month": month,
"name": calendar.month_name[month],
})
# Build years list, sorted descending
years = []
for year in sorted(years_data.keys(), reverse=True):
years.append({
"year": year,
"months": sorted(years_data[year], key=lambda m: m["month"]),
})
sections.append({"role": role, "years": years})
return sections
def main():
"""Generate all statistics reports."""
# Ensure database is initialized
init_db()
cfg = get_config()
log.info("Generating reports from database...")
# Ensure base reports directory exists
(cfg.out_dir / "reports").mkdir(parents=True, exist_ok=True)
total_monthly = 0
total_yearly = 0
for role in ["repeater", "companion"]:
periods = get_available_periods(role)
if not periods:
log.info(f"No data found for {role}")
continue
log.info(f"Found {len(periods)} months of data for {role}")
# Sort periods chronologically for prev/next navigation
sorted_periods = sorted(periods)
# Render monthly reports with prev/next links
for i, (year, month) in enumerate(sorted_periods):
prev_period = sorted_periods[i - 1] if i > 0 else None
next_period = sorted_periods[i + 1] if i < len(sorted_periods) - 1 else None
render_monthly_report(role, year, month, prev_period, next_period)
total_monthly += 1
# Get unique years
years = sorted(set(y for y, m in periods))
# Render yearly reports with prev/next links
for i, year in enumerate(years):
prev_year = years[i - 1] if i > 0 else None
next_year = years[i + 1] if i < len(years) - 1 else None
render_yearly_report(role, year, prev_year, next_year)
total_yearly += 1
# Render reports index
log.info("Rendering reports index...")
sections = build_reports_index_data()
index_html = render_reports_index(sections)
safe_write(cfg.out_dir / "reports" / "index.html", index_html)
log.info(
f"Generated {total_monthly} monthly + {total_yearly} yearly reports "
f"to {cfg.out_dir / 'reports'}"
)
log.info("Report generation complete")
if __name__ == "__main__":
main()

52
scripts/render_site.py Executable file
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
"""
Phase 3: Render static HTML site.
Generates static HTML pages using latest metrics from SQLite database
and rendered charts. Creates day/week/month/year pages for both
companion and repeater nodes.
"""
import sys
from pathlib import Path
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from meshmon.db import init_db, get_latest_metrics
from meshmon.env import get_config
from meshmon import log
from meshmon.html import write_site
def main():
"""Render static site."""
# Ensure database is initialized
init_db()
cfg = get_config()
log.info("Rendering static site...")
# Load latest metrics from database
companion_row = get_latest_metrics("companion")
if companion_row:
log.debug(f"Loaded companion metrics (ts={companion_row.get('ts')})")
else:
log.warn("No companion metrics found in database")
repeater_row = get_latest_metrics("repeater")
if repeater_row:
log.debug(f"Loaded repeater metrics (ts={repeater_row.get('ts')})")
else:
log.warn("No repeater metrics found in database")
# Write site
pages = write_site(companion_row, repeater_row)
log.info(f"Wrote {len(pages)} pages to {cfg.out_dir}")
log.info("Site rendering complete")
if __name__ == "__main__":
main()