mirror of
https://github.com/jorijn/meshcore-stats.git
synced 2026-03-28 17:42:55 +01:00
* test: add comprehensive pytest test suite with 95% coverage Add full unit and integration test coverage for the meshcore-stats project: - 1020 tests covering all modules (db, charts, html, reports, client, etc.) - 95.95% code coverage with pytest-cov (95% threshold enforced) - GitHub Actions CI workflow for automated testing on push/PR - Proper mocking of external dependencies (meshcore, serial, filesystem) - SVG snapshot infrastructure for chart regression testing - Integration tests for collection and rendering pipelines Test organization: - tests/charts/: Chart rendering and statistics - tests/client/: MeshCore client and connection handling - tests/config/: Environment and configuration parsing - tests/database/: SQLite operations and migrations - tests/html/: HTML generation and Jinja templates - tests/reports/: Report generation and formatting - tests/retry/: Circuit breaker and retry logic - tests/unit/: Pure unit tests for utilities - tests/integration/: End-to-end pipeline tests 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * chore: add test-engineer agent configuration Add project-local test-engineer agent for pytest test development, coverage analysis, and test review tasks. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * docs: comprehensive test suite review with 956 tests analyzed Conducted thorough review of all 956 test cases across 47 test files: - Unit Tests: 338 tests (battery, metrics, log, telemetry, env, charts, html, reports, formatters) - Config Tests: 53 tests (env loading, config file parsing) - Database Tests: 115 tests (init, insert, queries, migrations, maintenance, validation) - Retry Tests: 59 tests (circuit breaker, async retries, factory) - Charts Tests: 76 tests (transforms, statistics, timeseries, rendering, I/O) - HTML Tests: 81 tests (site generation, Jinja2, metrics builders, reports index) - Reports Tests: 149 tests (location, JSON/TXT formatting, aggregation, counter totals) - Client Tests: 63 tests (contacts, connection, meshcore availability, commands) - Integration Tests: 22 tests (reports, collection, rendering pipelines) Results: - Overall Pass Rate: 99.7% (953/956) - 3 tests marked for improvement (empty test bodies in client tests) - 0 tests requiring fixes Key findings documented in test_review/tests.md including quality observations, F.I.R.S.T. principle adherence, and recommendations. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * test: implement snapshot testing for charts and reports Add comprehensive snapshot testing infrastructure: SVG Chart Snapshots: - Deterministic fixtures with fixed timestamps (2024-01-15 12:00:00) - Tests for gauge/counter metrics in light/dark themes - Empty chart and single-point edge cases - Extended normalize_svg_for_snapshot_full() for reproducible comparisons TXT Report Snapshots: - Monthly/yearly report snapshots for repeater and companion - Empty report handling tests - Tests in tests/reports/test_snapshots.py Infrastructure: - tests/snapshots/conftest.py with shared fixtures - UPDATE_SNAPSHOTS=1 environment variable for regeneration - scripts/generate_snapshots.py for batch snapshot generation Run `UPDATE_SNAPSHOTS=1 pytest tests/charts/test_chart_render.py::TestSvgSnapshots` to generate initial snapshots. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * test: fix SVG normalization and generate initial snapshots Fix normalize_svg_for_snapshot() to handle: - clipPath IDs like id="p47c77a2a6e" - url(#p...) references - xlink:href="#p..." references - <dc:date> timestamps Generated initial snapshot files: - 7 SVG chart snapshots (gauge, counter, empty, single-point in light/dark) - 6 TXT report snapshots (monthly/yearly for repeater/companion + empty) All 13 snapshot tests now pass. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * test: fix SVG normalization to preserve axis rendering The SVG normalization was replacing all matplotlib-generated IDs with the same value, causing duplicate IDs that broke SVG rendering: - Font glyphs, clipPaths, and tick marks all got id="normalized" - References couldn't resolve to the correct elements - X and Y axes failed to render in normalized snapshots Fix uses type-specific prefixes with sequential numbering: - glyph_N for font glyphs (DejaVuSans-XX patterns) - clip_N for clipPath definitions (p[0-9a-f]{8,} patterns) - tick_N for tick marks (m[0-9a-f]{8,} patterns) This ensures all IDs remain unique while still being deterministic for snapshot comparison. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * chore: add coverage and pytest artifacts to gitignore Add .coverage, .coverage.*, htmlcov/, and .pytest_cache/ to prevent test artifacts from being committed. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * style: fix all ruff lint errors across codebase - Sort and organize imports (I001) - Use modern type annotations (X | Y instead of Union, collections.abc) - Remove unused imports (F401) - Combine nested if statements (SIM102) - Use ternary operators where appropriate (SIM108) - Combine nested with statements (SIM117) - Use contextlib.suppress instead of try-except-pass (SIM105) - Add noqa comments for intentional SIM115 violations (file locks) - Add TYPE_CHECKING import for forward references - Fix exception chaining (B904) All 1033 tests pass. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * docs: add TDD workflow and pre-commit requirements to CLAUDE.md - Add mandatory test-driven development workflow (write tests first) - Add pre-commit requirements (must run lint and tests before committing) - Document test organization and running commands - Document 95% coverage requirement 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: resolve mypy type checking errors with proper structural fixes - charts.py: Create PeriodConfig dataclass for type-safe period configuration, use mdates.date2num() for matplotlib datetime handling, fix x-axis limits for single-point charts - db.py: Add explicit int() conversion with None handling for SQLite returns - env.py: Add class-level type annotations to Config class - html.py: Add MetricDisplay TypedDict, fix import order, add proper type annotations for table data functions - meshcore_client.py: Add return type annotation Update tests to use new dataclass attribute access and regenerate SVG snapshots. Add mypy step to CLAUDE.md pre-commit requirements. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: cast Jinja2 template.render() to str for mypy Jinja2's type stubs declare render() as returning Any, but it actually returns str. Wrap with str() to satisfy mypy's no-any-return check. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * ci: improve workflow security and reliability - test.yml: Pin all actions by SHA, add concurrency control to cancel in-progress runs on rapid pushes - release-please.yml: Pin action by SHA, add 10-minute timeout - conftest.py: Fix snapshot_base_time to use explicit UTC timezone for consistent behavior across CI and local environments Regenerate SVG snapshots with UTC-aware timestamps. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: add mypy command to permissions in settings.local.json * test: add comprehensive script tests with coroutine warning fixes - Add tests/scripts/ with tests for collect_companion, collect_repeater, and render scripts (1135 tests total, 96% coverage) - Fix unawaited coroutine warnings by using AsyncMock properly for async functions and async_context_manager_factory fixture for context managers - Add --cov=scripts to CI workflow and pyproject.toml coverage config - Omit scripts/generate_snapshots.py from coverage (dev utility) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * docs: migrate claude setup to codex skills * feat: migrate dependencies to uv (#31) * fix: run tests through uv * test: fix ruff lint issues in tests Consolidate patch context managers and clean unused imports/variables Use datetime.UTC in snapshot fixtures * test: avoid unawaited async mocks in entrypoint tests * ci: replace codecov with github coverage artifacts Add junit XML output and coverage summary in job output Upload HTML and XML coverage artifacts (3.12 only) on every run --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
642 lines
24 KiB
Python
642 lines
24 KiB
Python
"""Tests for collect_companion.py script entry point.
|
|
|
|
These tests verify the actual script behavior, not just the library code.
|
|
The script is the entry point that users run - if it breaks, everything breaks.
|
|
"""
|
|
|
|
import inspect
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from tests.scripts.conftest import load_script_module
|
|
|
|
|
|
def load_collect_companion():
|
|
"""Load collect_companion.py as a module."""
|
|
return load_script_module("collect_companion.py")
|
|
|
|
|
|
class TestCollectCompanionImport:
|
|
"""Verify script can be imported without errors."""
|
|
|
|
def test_imports_successfully(self, configured_env):
|
|
"""Script should import without errors."""
|
|
module = load_collect_companion()
|
|
|
|
assert hasattr(module, "main")
|
|
assert hasattr(module, "collect_companion")
|
|
assert callable(module.main)
|
|
|
|
def test_collect_companion_is_async(self, configured_env):
|
|
"""collect_companion() should be an async function."""
|
|
module = load_collect_companion()
|
|
assert inspect.iscoroutinefunction(module.collect_companion)
|
|
|
|
|
|
class TestCollectCompanionExitCodes:
|
|
"""Test exit code behavior - critical for monitoring."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_returns_zero_on_successful_collection(
|
|
self, configured_env, async_context_manager_factory, mock_run_command_factory
|
|
):
|
|
"""Successful collection should return exit code 0."""
|
|
module = load_collect_companion()
|
|
|
|
responses = {
|
|
"send_appstart": (True, "SELF_INFO", {}, None),
|
|
"send_device_query": (True, "DEVICE_INFO", {}, None),
|
|
"get_time": (True, "TIME", {"time": 1234567890}, None),
|
|
"get_self_telemetry": (True, "TELEMETRY", {}, None),
|
|
"get_custom_vars": (True, "CUSTOM_VARS", {}, None),
|
|
"get_contacts": (True, "CONTACTS", {"c1": {}, "c2": {}}, None),
|
|
"get_stats_core": (
|
|
True,
|
|
"STATS_CORE",
|
|
{"battery_mv": 3850, "uptime_secs": 86400},
|
|
None,
|
|
),
|
|
"get_stats_radio": (True, "STATS_RADIO", {"noise_floor": -115}, None),
|
|
"get_stats_packets": (True, "STATS_PACKETS", {"recv": 100, "sent": 50}, None),
|
|
}
|
|
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
ctx_mock = async_context_manager_factory(mc)
|
|
|
|
with (
|
|
patch.object(module, "connect_with_lock", return_value=ctx_mock),
|
|
patch.object(
|
|
module, "run_command", side_effect=mock_run_command_factory(responses)
|
|
),
|
|
patch.object(module, "insert_metrics", return_value=5),
|
|
):
|
|
exit_code = await module.collect_companion()
|
|
|
|
assert exit_code == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_returns_one_on_connection_failure(
|
|
self, configured_env, async_context_manager_factory
|
|
):
|
|
"""Failed connection should return exit code 1."""
|
|
module = load_collect_companion()
|
|
|
|
# Connection returns None (failed)
|
|
ctx_mock = async_context_manager_factory(None)
|
|
|
|
with patch.object(module, "connect_with_lock", return_value=ctx_mock):
|
|
exit_code = await module.collect_companion()
|
|
|
|
assert exit_code == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_returns_one_when_no_commands_succeed(
|
|
self, configured_env, async_context_manager_factory
|
|
):
|
|
"""No successful commands should return exit code 1."""
|
|
module = load_collect_companion()
|
|
|
|
# All commands fail
|
|
async def mock_run_command_fail(mc, coro, name):
|
|
return (False, None, None, "Command failed")
|
|
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
ctx_mock = async_context_manager_factory(mc)
|
|
|
|
with (
|
|
patch.object(module, "connect_with_lock", return_value=ctx_mock),
|
|
patch.object(module, "run_command", side_effect=mock_run_command_fail),
|
|
):
|
|
exit_code = await module.collect_companion()
|
|
|
|
assert exit_code == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_returns_one_on_database_error(
|
|
self, configured_env, async_context_manager_factory, mock_run_command_factory
|
|
):
|
|
"""Database write failure should return exit code 1."""
|
|
module = load_collect_companion()
|
|
|
|
responses = {
|
|
"get_stats_core": (True, "STATS_CORE", {"battery_mv": 3850}, None),
|
|
}
|
|
# Default to success for other commands
|
|
default = (True, "OK", {}, None)
|
|
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
ctx_mock = async_context_manager_factory(mc)
|
|
|
|
with (
|
|
patch.object(module, "connect_with_lock", return_value=ctx_mock),
|
|
patch.object(
|
|
module, "run_command", side_effect=mock_run_command_factory(responses, default)
|
|
),
|
|
patch.object(module, "insert_metrics", side_effect=Exception("DB error")),
|
|
):
|
|
exit_code = await module.collect_companion()
|
|
|
|
assert exit_code == 1
|
|
|
|
|
|
class TestCollectCompanionMetrics:
|
|
"""Test metric collection behavior."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_collects_all_numeric_fields_from_stats(
|
|
self, configured_env, async_context_manager_factory, mock_run_command_factory
|
|
):
|
|
"""Should insert all numeric fields from stats responses."""
|
|
module = load_collect_companion()
|
|
collected_metrics = {}
|
|
|
|
responses = {
|
|
"send_appstart": (True, "SELF_INFO", {}, None),
|
|
"send_device_query": (True, "DEVICE_INFO", {}, None),
|
|
"get_time": (True, "TIME", {}, None),
|
|
"get_self_telemetry": (True, "TELEMETRY", {}, None),
|
|
"get_custom_vars": (True, "CUSTOM_VARS", {}, None),
|
|
"get_contacts": (True, "CONTACTS", {"c1": {}, "c2": {}, "c3": {}}, None),
|
|
"get_stats_core": (
|
|
True,
|
|
"STATS_CORE",
|
|
{"battery_mv": 3850, "uptime_secs": 86400, "errors": 0},
|
|
None,
|
|
),
|
|
"get_stats_radio": (
|
|
True,
|
|
"STATS_RADIO",
|
|
{"noise_floor": -115, "last_rssi": -85, "last_snr": 7.5},
|
|
None,
|
|
),
|
|
"get_stats_packets": (True, "STATS_PACKETS", {"recv": 100, "sent": 50}, None),
|
|
}
|
|
|
|
def capture_metrics(ts, role, metrics, conn=None):
|
|
collected_metrics.update(metrics)
|
|
return len(metrics)
|
|
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
ctx_mock = async_context_manager_factory(mc)
|
|
|
|
with (
|
|
patch.object(module, "connect_with_lock", return_value=ctx_mock),
|
|
patch.object(
|
|
module, "run_command", side_effect=mock_run_command_factory(responses)
|
|
),
|
|
patch.object(module, "insert_metrics", side_effect=capture_metrics),
|
|
):
|
|
await module.collect_companion()
|
|
|
|
# Verify all expected metrics were collected
|
|
assert collected_metrics["battery_mv"] == 3850
|
|
assert collected_metrics["uptime_secs"] == 86400
|
|
assert collected_metrics["contacts"] == 3 # From get_contacts count
|
|
assert collected_metrics["recv"] == 100
|
|
assert collected_metrics["sent"] == 50
|
|
assert collected_metrics["noise_floor"] == -115
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telemetry_not_extracted_when_disabled(
|
|
self, configured_env, async_context_manager_factory, monkeypatch
|
|
):
|
|
"""Telemetry metrics should NOT be extracted when TELEMETRY_ENABLED=0 (default)."""
|
|
module = load_collect_companion()
|
|
collected_metrics = {}
|
|
|
|
async def mock_run_command(mc, coro, name):
|
|
if name == "get_self_telemetry":
|
|
# Return telemetry payload with LPP data
|
|
return (True, "TELEMETRY", {"lpp": b"\x00\x67\x01\x00"}, None)
|
|
if name == "get_stats_core":
|
|
return (True, "STATS_CORE", {"battery_mv": 3850}, None)
|
|
return (True, "OK", {}, None)
|
|
|
|
def capture_metrics(ts, role, metrics, conn=None):
|
|
collected_metrics.update(metrics)
|
|
return len(metrics)
|
|
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
ctx_mock = async_context_manager_factory(mc)
|
|
|
|
with (
|
|
patch.object(module, "connect_with_lock", return_value=ctx_mock),
|
|
patch.object(module, "run_command", side_effect=mock_run_command),
|
|
patch.object(module, "insert_metrics", side_effect=capture_metrics),
|
|
):
|
|
await module.collect_companion()
|
|
|
|
# No telemetry.* keys should be present
|
|
telemetry_keys = [k for k in collected_metrics if k.startswith("telemetry.")]
|
|
assert len(telemetry_keys) == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telemetry_extracted_when_enabled(
|
|
self, configured_env, async_context_manager_factory, monkeypatch
|
|
):
|
|
"""Telemetry metrics SHOULD be extracted when TELEMETRY_ENABLED=1."""
|
|
# Enable telemetry BEFORE loading the module
|
|
monkeypatch.setenv("TELEMETRY_ENABLED", "1")
|
|
import meshmon.env
|
|
|
|
meshmon.env._config = None
|
|
|
|
module = load_collect_companion()
|
|
collected_metrics = {}
|
|
|
|
# LPP data format: list of dictionaries with type, channel, value
|
|
# This matches the format from MeshCore API
|
|
lpp_data = [
|
|
{"type": "temperature", "channel": 0, "value": 25.5},
|
|
]
|
|
|
|
async def mock_run_command(mc, coro, name):
|
|
if name == "get_self_telemetry":
|
|
return (True, "TELEMETRY", {"lpp": lpp_data}, None)
|
|
if name == "get_stats_core":
|
|
return (True, "STATS_CORE", {"battery_mv": 3850}, None)
|
|
return (True, "OK", {}, None)
|
|
|
|
def capture_metrics(ts, role, metrics, conn=None):
|
|
collected_metrics.update(metrics)
|
|
return len(metrics)
|
|
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
ctx_mock = async_context_manager_factory(mc)
|
|
|
|
with (
|
|
patch.object(module, "connect_with_lock", return_value=ctx_mock),
|
|
patch.object(module, "run_command", side_effect=mock_run_command),
|
|
patch.object(module, "insert_metrics", side_effect=capture_metrics),
|
|
):
|
|
exit_code = await module.collect_companion()
|
|
|
|
assert exit_code == 0
|
|
# Telemetry keys should be present
|
|
telemetry_keys = [k for k in collected_metrics if k.startswith("telemetry.")]
|
|
assert len(telemetry_keys) > 0, f"Expected telemetry keys, got: {collected_metrics.keys()}"
|
|
assert "telemetry.temperature.0" in collected_metrics
|
|
assert collected_metrics["telemetry.temperature.0"] == 25.5
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telemetry_extraction_handles_invalid_lpp(
|
|
self, configured_env, async_context_manager_factory, monkeypatch
|
|
):
|
|
"""Telemetry extraction should handle invalid LPP data gracefully."""
|
|
monkeypatch.setenv("TELEMETRY_ENABLED", "1")
|
|
import meshmon.env
|
|
|
|
meshmon.env._config = None
|
|
|
|
module = load_collect_companion()
|
|
collected_metrics = {}
|
|
|
|
async def mock_run_command(mc, coro, name):
|
|
if name == "get_self_telemetry":
|
|
# Invalid LPP data (too short)
|
|
return (True, "TELEMETRY", {"lpp": b"\x00"}, None)
|
|
if name == "get_stats_core":
|
|
return (True, "STATS_CORE", {"battery_mv": 3850}, None)
|
|
return (True, "OK", {}, None)
|
|
|
|
def capture_metrics(ts, role, metrics, conn=None):
|
|
collected_metrics.update(metrics)
|
|
return len(metrics)
|
|
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
ctx_mock = async_context_manager_factory(mc)
|
|
|
|
with (
|
|
patch.object(module, "connect_with_lock", return_value=ctx_mock),
|
|
patch.object(module, "run_command", side_effect=mock_run_command),
|
|
patch.object(module, "insert_metrics", side_effect=capture_metrics),
|
|
):
|
|
exit_code = await module.collect_companion()
|
|
|
|
# Should still succeed - just no telemetry extracted
|
|
assert exit_code == 0
|
|
# No telemetry keys because LPP was invalid
|
|
telemetry_keys = [k for k in collected_metrics if k.startswith("telemetry.")]
|
|
assert len(telemetry_keys) == 0
|
|
|
|
|
|
class TestPartialSuccessScenarios:
|
|
"""Test behavior when only some commands succeed."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_succeeds_with_only_stats_core(
|
|
self, configured_env, async_context_manager_factory
|
|
):
|
|
"""Should succeed if only stats_core returns metrics."""
|
|
module = load_collect_companion()
|
|
collected_metrics = {}
|
|
|
|
async def mock_run_command(mc, coro, name):
|
|
if name == "get_stats_core":
|
|
return (True, "STATS_CORE", {"battery_mv": 3850, "uptime_secs": 1000}, None)
|
|
# All other commands fail
|
|
return (False, None, None, "Timeout")
|
|
|
|
def capture_metrics(ts, role, metrics, conn=None):
|
|
collected_metrics.update(metrics)
|
|
return len(metrics)
|
|
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
ctx_mock = async_context_manager_factory(mc)
|
|
|
|
with (
|
|
patch.object(module, "connect_with_lock", return_value=ctx_mock),
|
|
patch.object(module, "run_command", side_effect=mock_run_command),
|
|
patch.object(module, "insert_metrics", side_effect=capture_metrics),
|
|
):
|
|
exit_code = await module.collect_companion()
|
|
|
|
# Should succeed because stats_core succeeded and had metrics
|
|
assert exit_code == 0
|
|
assert collected_metrics["battery_mv"] == 3850
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_succeeds_with_only_contacts(
|
|
self, configured_env, async_context_manager_factory
|
|
):
|
|
"""Should succeed if only contacts command returns data."""
|
|
module = load_collect_companion()
|
|
collected_metrics = {}
|
|
|
|
async def mock_run_command(mc, coro, name):
|
|
if name == "get_contacts":
|
|
return (True, "CONTACTS", {"c1": {}, "c2": {}}, None)
|
|
# Stats commands succeed but return no numeric data
|
|
if name.startswith("get_stats"):
|
|
return (True, "OK", {}, None)
|
|
# Other commands succeed
|
|
return (True, "OK", {}, None)
|
|
|
|
def capture_metrics(ts, role, metrics, conn=None):
|
|
collected_metrics.update(metrics)
|
|
return len(metrics)
|
|
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
ctx_mock = async_context_manager_factory(mc)
|
|
|
|
with (
|
|
patch.object(module, "connect_with_lock", return_value=ctx_mock),
|
|
patch.object(module, "run_command", side_effect=mock_run_command),
|
|
patch.object(module, "insert_metrics", side_effect=capture_metrics),
|
|
):
|
|
exit_code = await module.collect_companion()
|
|
|
|
assert exit_code == 0
|
|
assert collected_metrics["contacts"] == 2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_fails_when_metrics_empty_despite_success(
|
|
self, configured_env, async_context_manager_factory
|
|
):
|
|
"""Should fail if commands succeed but no metrics collected."""
|
|
module = load_collect_companion()
|
|
|
|
async def mock_run_command(mc, coro, name):
|
|
# Commands succeed but return empty/non-dict payloads
|
|
if name == "get_stats_core":
|
|
return (True, "STATS_CORE", None, None) # No payload
|
|
if name == "get_stats_radio":
|
|
return (True, "STATS_RADIO", "not a dict", None) # Invalid payload
|
|
if name == "get_stats_packets":
|
|
return (True, "STATS_PACKETS", {}, None) # Empty payload
|
|
if name == "get_contacts":
|
|
return (False, None, None, "Failed") # Fails
|
|
return (True, "OK", {}, None)
|
|
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
ctx_mock = async_context_manager_factory(mc)
|
|
|
|
with (
|
|
patch.object(module, "connect_with_lock", return_value=ctx_mock),
|
|
patch.object(module, "run_command", side_effect=mock_run_command),
|
|
):
|
|
exit_code = await module.collect_companion()
|
|
|
|
# Should fail because no metrics were collected
|
|
assert exit_code == 1
|
|
|
|
|
|
class TestExceptionHandling:
|
|
"""Test exception handling in the command loop (lines 165-166)."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handles_exception_in_command_loop(
|
|
self, configured_env, async_context_manager_factory
|
|
):
|
|
"""Should catch and log exceptions during command execution."""
|
|
module = load_collect_companion()
|
|
|
|
call_count = 0
|
|
|
|
async def mock_run_command_with_exception(mc, coro, name):
|
|
nonlocal call_count
|
|
call_count += 1
|
|
if call_count == 3: # Fail on third command
|
|
raise RuntimeError("Unexpected network error")
|
|
return (True, "OK", {}, None)
|
|
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
ctx_mock = async_context_manager_factory(mc)
|
|
|
|
with (
|
|
patch.object(module, "connect_with_lock", return_value=ctx_mock),
|
|
patch.object(module, "run_command", side_effect=mock_run_command_with_exception),
|
|
patch.object(module, "log") as mock_log,
|
|
):
|
|
exit_code = await module.collect_companion()
|
|
|
|
# Should have logged the error
|
|
error_calls = [c for c in mock_log.error.call_args_list if "Error during collection" in str(c)]
|
|
assert len(error_calls) > 0
|
|
|
|
# Should return 1 because exception interrupted collection
|
|
assert exit_code == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exception_closes_connection_properly(
|
|
self, configured_env, async_context_manager_factory
|
|
):
|
|
"""Context manager should still exit properly after exception."""
|
|
module = load_collect_companion()
|
|
|
|
async def mock_run_command_raise(mc, coro, name):
|
|
raise RuntimeError("Connection lost")
|
|
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
ctx_mock = async_context_manager_factory(mc)
|
|
|
|
with (
|
|
patch.object(module, "connect_with_lock", return_value=ctx_mock),
|
|
patch.object(module, "run_command", side_effect=mock_run_command_raise),
|
|
):
|
|
await module.collect_companion()
|
|
|
|
# Verify context manager was properly exited
|
|
assert ctx_mock.exited is True
|
|
|
|
|
|
class TestMainEntryPoint:
|
|
"""Test the main() entry point behavior."""
|
|
|
|
def test_main_calls_init_db(self, configured_env):
|
|
"""main() should initialize database before collection."""
|
|
module = load_collect_companion()
|
|
|
|
with (
|
|
patch.object(module, "init_db") as mock_init,
|
|
patch.object(module, "collect_companion", new=MagicMock(return_value=0)),
|
|
patch.object(module, "asyncio") as mock_asyncio,
|
|
patch.object(module, "sys"),
|
|
):
|
|
# Patch collect_companion to return a non-coroutine to avoid unawaited coroutine warning
|
|
mock_asyncio.run.return_value = 0
|
|
module.main()
|
|
|
|
mock_init.assert_called_once()
|
|
|
|
def test_main_exits_with_collection_result(self, configured_env):
|
|
"""main() should exit with the collection exit code."""
|
|
module = load_collect_companion()
|
|
|
|
with (
|
|
patch.object(module, "init_db"),
|
|
patch.object(module, "collect_companion", new=MagicMock(return_value=1)),
|
|
patch.object(module, "asyncio") as mock_asyncio,
|
|
patch.object(module, "sys") as mock_sys,
|
|
):
|
|
# Patch collect_companion to return a non-coroutine to avoid unawaited coroutine warning
|
|
mock_asyncio.run.return_value = 1 # Collection failed
|
|
module.main()
|
|
|
|
mock_sys.exit.assert_called_once_with(1)
|
|
|
|
def test_main_runs_collect_companion_async(self, configured_env):
|
|
"""main() should run collect_companion() with asyncio.run()."""
|
|
module = load_collect_companion()
|
|
|
|
with (
|
|
patch.object(module, "init_db"),
|
|
patch.object(module, "collect_companion", new=MagicMock(return_value=0)),
|
|
patch.object(module, "asyncio") as mock_asyncio,
|
|
patch.object(module, "sys"),
|
|
):
|
|
# Patch collect_companion to return a non-coroutine to avoid unawaited coroutine warning
|
|
mock_asyncio.run.return_value = 0
|
|
module.main()
|
|
|
|
# asyncio.run should be called with the return value
|
|
mock_asyncio.run.assert_called_once()
|
|
|
|
|
|
class TestDatabaseIntegration:
|
|
"""Test that collection actually writes to database."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_writes_metrics_to_database(
|
|
self, configured_env, initialized_db, async_context_manager_factory, mock_run_command_factory
|
|
):
|
|
"""Collection should write metrics to database."""
|
|
from meshmon.db import get_latest_metrics
|
|
|
|
module = load_collect_companion()
|
|
|
|
responses = {
|
|
"send_appstart": (True, "SELF_INFO", {}, None),
|
|
"send_device_query": (True, "DEVICE_INFO", {}, None),
|
|
"get_time": (True, "TIME", {}, None),
|
|
"get_self_telemetry": (True, "TELEMETRY", {}, None),
|
|
"get_custom_vars": (True, "CUSTOM_VARS", {}, None),
|
|
"get_contacts": (True, "CONTACTS", {"c1": {}}, None),
|
|
"get_stats_core": (
|
|
True,
|
|
"STATS_CORE",
|
|
{"battery_mv": 3777, "uptime_secs": 12345},
|
|
None,
|
|
),
|
|
"get_stats_radio": (True, "STATS_RADIO", {}, None),
|
|
"get_stats_packets": (True, "STATS_PACKETS", {"recv": 999, "sent": 888}, None),
|
|
}
|
|
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
ctx_mock = async_context_manager_factory(mc)
|
|
|
|
with (
|
|
patch.object(module, "connect_with_lock", return_value=ctx_mock),
|
|
patch.object(
|
|
module, "run_command", side_effect=mock_run_command_factory(responses)
|
|
),
|
|
):
|
|
exit_code = await module.collect_companion()
|
|
|
|
assert exit_code == 0
|
|
|
|
# Verify data was written to database
|
|
latest = get_latest_metrics("companion")
|
|
assert latest is not None
|
|
assert latest["battery_mv"] == 3777
|
|
assert latest["recv"] == 999
|
|
assert latest["sent"] == 888
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_writes_telemetry_to_database_when_enabled(
|
|
self, configured_env, initialized_db, async_context_manager_factory, monkeypatch
|
|
):
|
|
"""Telemetry should be written to database when enabled."""
|
|
monkeypatch.setenv("TELEMETRY_ENABLED", "1")
|
|
import meshmon.env
|
|
|
|
meshmon.env._config = None
|
|
|
|
from meshmon.db import get_latest_metrics
|
|
|
|
module = load_collect_companion()
|
|
|
|
# LPP data format: list of dictionaries with type, channel, value
|
|
lpp_data = [
|
|
{"type": "temperature", "channel": 0, "value": 25.5},
|
|
]
|
|
|
|
async def mock_run_command(mc, coro, name):
|
|
if name == "get_self_telemetry":
|
|
return (True, "TELEMETRY", {"lpp": lpp_data}, None)
|
|
if name == "get_stats_core":
|
|
return (True, "STATS_CORE", {"battery_mv": 3850}, None)
|
|
return (True, "OK", {}, None)
|
|
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
ctx_mock = async_context_manager_factory(mc)
|
|
|
|
with (
|
|
patch.object(module, "connect_with_lock", return_value=ctx_mock),
|
|
patch.object(module, "run_command", side_effect=mock_run_command),
|
|
):
|
|
exit_code = await module.collect_companion()
|
|
|
|
assert exit_code == 0
|
|
|
|
# Verify telemetry was written to database
|
|
latest = get_latest_metrics("companion")
|
|
assert latest is not None
|
|
assert "telemetry.temperature.0" in latest
|
|
assert latest["telemetry.temperature.0"] == 25.5
|