meshcore-stats/tests/scripts/test_collect_companion.py
Jorijn Schrijvershof a9f6926104 test: add comprehensive pytest test suite with 95% coverage (#29)
* test: add comprehensive pytest test suite with 95% coverage

Add full unit and integration test coverage for the meshcore-stats project:

- 1020 tests covering all modules (db, charts, html, reports, client, etc.)
- 95.95% code coverage with pytest-cov (95% threshold enforced)
- GitHub Actions CI workflow for automated testing on push/PR
- Proper mocking of external dependencies (meshcore, serial, filesystem)
- SVG snapshot infrastructure for chart regression testing
- Integration tests for collection and rendering pipelines

Test organization:
- tests/charts/: Chart rendering and statistics
- tests/client/: MeshCore client and connection handling
- tests/config/: Environment and configuration parsing
- tests/database/: SQLite operations and migrations
- tests/html/: HTML generation and Jinja templates
- tests/reports/: Report generation and formatting
- tests/retry/: Circuit breaker and retry logic
- tests/unit/: Pure unit tests for utilities
- tests/integration/: End-to-end pipeline tests

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* chore: add test-engineer agent configuration

Add project-local test-engineer agent for pytest test development,
coverage analysis, and test review tasks.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* docs: comprehensive test suite review with 956 tests analyzed

Conducted thorough review of all 956 test cases across 47 test files:

- Unit Tests: 338 tests (battery, metrics, log, telemetry, env, charts, html, reports, formatters)
- Config Tests: 53 tests (env loading, config file parsing)
- Database Tests: 115 tests (init, insert, queries, migrations, maintenance, validation)
- Retry Tests: 59 tests (circuit breaker, async retries, factory)
- Charts Tests: 76 tests (transforms, statistics, timeseries, rendering, I/O)
- HTML Tests: 81 tests (site generation, Jinja2, metrics builders, reports index)
- Reports Tests: 149 tests (location, JSON/TXT formatting, aggregation, counter totals)
- Client Tests: 63 tests (contacts, connection, meshcore availability, commands)
- Integration Tests: 22 tests (reports, collection, rendering pipelines)

Results:
- Overall Pass Rate: 99.7% (953/956)
- 3 tests marked for improvement (empty test bodies in client tests)
- 0 tests requiring fixes

Key findings documented in test_review/tests.md including quality
observations, F.I.R.S.T. principle adherence, and recommendations.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test: implement snapshot testing for charts and reports

Add comprehensive snapshot testing infrastructure:

SVG Chart Snapshots:
- Deterministic fixtures with fixed timestamps (2024-01-15 12:00:00)
- Tests for gauge/counter metrics in light/dark themes
- Empty chart and single-point edge cases
- Extended normalize_svg_for_snapshot_full() for reproducible comparisons

TXT Report Snapshots:
- Monthly/yearly report snapshots for repeater and companion
- Empty report handling tests
- Tests in tests/reports/test_snapshots.py

Infrastructure:
- tests/snapshots/conftest.py with shared fixtures
- UPDATE_SNAPSHOTS=1 environment variable for regeneration
- scripts/generate_snapshots.py for batch snapshot generation

Run `UPDATE_SNAPSHOTS=1 pytest tests/charts/test_chart_render.py::TestSvgSnapshots`
to generate initial snapshots.
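The update-or-compare workflow behind `UPDATE_SNAPSHOTS=1` can be sketched as a small helper (illustrative only — `assert_matches_snapshot` is a hypothetical name, not the project's actual fixture):

```python
import os
from pathlib import Path


def assert_matches_snapshot(rendered: str, snapshot_path: Path) -> None:
    """Compare output against a stored snapshot, or (re)write the snapshot
    when UPDATE_SNAPSHOTS=1 is set or no snapshot exists yet."""
    if os.environ.get("UPDATE_SNAPSHOTS") == "1" or not snapshot_path.exists():
        snapshot_path.parent.mkdir(parents=True, exist_ok=True)
        snapshot_path.write_text(rendered)
        return
    expected = snapshot_path.read_text()
    assert rendered == expected, f"Snapshot mismatch: {snapshot_path}"
```

On the first run (or with the environment variable set) the snapshot is written; every later run compares byte-for-byte and fails loudly on drift.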

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test: fix SVG normalization and generate initial snapshots

Fix normalize_svg_for_snapshot() to handle:
- clipPath IDs like id="p47c77a2a6e"
- url(#p...) references
- xlink:href="#p..." references
- <dc:date> timestamps

Generated initial snapshot files:
- 7 SVG chart snapshots (gauge, counter, empty, single-point in light/dark)
- 6 TXT report snapshots (monthly/yearly for repeater/companion + empty)

All 13 snapshot tests now pass.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test: fix SVG normalization to preserve axis rendering

The SVG normalization was replacing all matplotlib-generated IDs with
the same value, causing duplicate IDs that broke SVG rendering:
- Font glyphs, clipPaths, and tick marks all got id="normalized"
- References couldn't resolve to the correct elements
- X and Y axes failed to render in normalized snapshots

Fix uses type-specific prefixes with sequential numbering:
- glyph_N for font glyphs (DejaVuSans-XX patterns)
- clip_N for clipPath definitions (p[0-9a-f]{8,} patterns)
- tick_N for tick marks (m[0-9a-f]{8,} patterns)

This ensures all IDs remain unique while still being deterministic
for snapshot comparison.
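The type-prefixed renumbering can be sketched like this (a minimal illustration built from the patterns listed above; the real `normalize_svg_for_snapshot()` lives in the test helpers and handles more cases):

```python
import re


def normalize_svg_ids(svg: str) -> str:
    """Rewrite matplotlib's hash-based IDs deterministically, one counter per
    type, so every ID stays unique and all references still resolve."""
    mapping: dict[str, str] = {}
    counters = {"glyph": 0, "clip": 0, "tick": 0}

    def renumber(prefix: str):
        def repl(match: re.Match) -> str:
            old = match.group(0)
            if old not in mapping:
                # Same original ID always maps to the same new ID, so
                # id="..." and url(#...)/xlink:href="#..." stay in sync.
                counters[prefix] += 1
                mapping[old] = f"{prefix}_{counters[prefix]}"
            return mapping[old]

        return repl

    svg = re.sub(r"DejaVuSans-[0-9A-Fa-f]+", renumber("glyph"), svg)
    svg = re.sub(r"\bp[0-9a-f]{8,}\b", renumber("clip"), svg)
    svg = re.sub(r"\bm[0-9a-f]{8,}\b", renumber("tick"), svg)
    return svg
```

Because `re.sub` scans left to right, the numbering is reproducible across runs while duplicate IDs can no longer occur.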

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* chore: add coverage and pytest artifacts to gitignore

Add .coverage, .coverage.*, htmlcov/, and .pytest_cache/ to prevent
test artifacts from being committed.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* style: fix all ruff lint errors across codebase

- Sort and organize imports (I001)
- Use modern type annotations (X | Y instead of Union, collections.abc)
- Remove unused imports (F401)
- Combine nested if statements (SIM102)
- Use ternary operators where appropriate (SIM108)
- Combine nested with statements (SIM117)
- Use contextlib.suppress instead of try-except-pass (SIM105)
- Add noqa comments for intentional SIM115 violations (file locks)
- Add TYPE_CHECKING import for forward references
- Fix exception chaining (B904)

All 1033 tests pass.
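As one concrete example of the fixes above, SIM105 replaces a try-except-pass with `contextlib.suppress` (a generic before/after sketch, not code from this repository):

```python
import contextlib
import os


# Before (SIM105): intent is hidden inside boilerplate
def remove_quietly_old(path: str) -> None:
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


# After: contextlib.suppress states the intent directly
def remove_quietly(path: str) -> None:
    with contextlib.suppress(FileNotFoundError):
        os.remove(path)
```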

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* docs: add TDD workflow and pre-commit requirements to CLAUDE.md

- Add mandatory test-driven development workflow (write tests first)
- Add pre-commit requirements (must run lint and tests before committing)
- Document test organization and running commands
- Document 95% coverage requirement

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: resolve mypy type checking errors with proper structural fixes

- charts.py: Create PeriodConfig dataclass for type-safe period configuration,
  use mdates.date2num() for matplotlib datetime handling, fix x-axis limits
  for single-point charts
- db.py: Add explicit int() conversion with None handling for SQLite returns
- env.py: Add class-level type annotations to Config class
- html.py: Add MetricDisplay TypedDict, fix import order, add proper type
  annotations for table data functions
- meshcore_client.py: Add return type annotation

Update tests to use new dataclass attribute access and regenerate SVG
snapshots. Add mypy step to CLAUDE.md pre-commit requirements.
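The db.py change follows the usual pattern for mypy-clean SQLite aggregates: coerce the loosely-typed value a cursor returns and handle NULL explicitly (a sketch with a hypothetical `metrics` table and `ts` column, not the project's actual query):

```python
import sqlite3


def latest_timestamp(conn: sqlite3.Connection) -> int:
    """MAX() yields NULL on an empty table, and fetchone() is typed loosely,
    so convert explicitly before returning a plain int."""
    row = conn.execute("SELECT MAX(ts) FROM metrics").fetchone()
    value = row[0] if row is not None else None
    return int(value) if value is not None else 0
```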

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: cast Jinja2 template.render() to str for mypy

Jinja2's type stubs declare render() as returning Any, but it actually
returns str. Wrap with str() to satisfy mypy's no-any-return check.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* ci: improve workflow security and reliability

- test.yml: Pin all actions by SHA, add concurrency control to cancel
  in-progress runs on rapid pushes
- release-please.yml: Pin action by SHA, add 10-minute timeout
- conftest.py: Fix snapshot_base_time to use explicit UTC timezone for
  consistent behavior across CI and local environments

Regenerate SVG snapshots with UTC-aware timestamps.
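The timezone fix amounts to constructing the fixture's base time with an explicit UTC offset, so rendered timestamps are byte-identical on CI and locally (a sketch using the fixed timestamp mentioned earlier; the real fixture lives in the snapshot conftest):

```python
from datetime import datetime, timezone

# A naive datetime picks up the host's local zone when formatted; pinning
# tzinfo=timezone.utc makes the snapshot deterministic everywhere.
snapshot_base_time = datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
```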

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: add mypy command to permissions in settings.local.json

* test: add comprehensive script tests with coroutine warning fixes

- Add tests/scripts/ with tests for collect_companion, collect_repeater,
  and render scripts (1135 tests total, 96% coverage)
- Fix unawaited coroutine warnings by using AsyncMock properly for async
  functions and async_context_manager_factory fixture for context managers
- Add --cov=scripts to CI workflow and pyproject.toml coverage config
- Omit scripts/generate_snapshots.py from coverage (dev utility)
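The coroutine-warning fix boils down to using `AsyncMock` wherever an async function is mocked: a plain `MagicMock` returns a non-awaitable from async call sites, while `AsyncMock` returns a proper awaitable. A generic sketch (`fetch_status` is a hypothetical stand-in for an awaited command helper):

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def fetch_status(client):
    # Hypothetical async call site, analogous to awaiting a device command.
    return await client.get_status()


def run_example() -> int:
    client = MagicMock()
    # AsyncMock(...) produces an awaitable returning 42; a plain MagicMock
    # here would make `await` fail or leave an unawaited-coroutine warning.
    client.get_status = AsyncMock(return_value=42)
    return asyncio.run(fetch_status(client))
```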

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* docs: migrate claude setup to codex skills

* feat: migrate dependencies to uv (#31)

* fix: run tests through uv

* test: fix ruff lint issues in tests

Consolidate patch context managers and clean unused imports/variables

Use datetime.UTC in snapshot fixtures

* test: avoid unawaited async mocks in entrypoint tests

* ci: replace codecov with github coverage artifacts

Add junit XML output and coverage summary in job output

Upload HTML and XML coverage artifacts (3.12 only) on every run

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-08 17:16:53 +01:00


"""Tests for collect_companion.py script entry point.
These tests verify the actual script behavior, not just the library code.
The script is the entry point that users run - if it breaks, everything breaks.
"""
import inspect
from unittest.mock import MagicMock, patch
import pytest
from tests.scripts.conftest import load_script_module
def load_collect_companion():
"""Load collect_companion.py as a module."""
return load_script_module("collect_companion.py")
class TestCollectCompanionImport:
    """Verify script can be imported without errors."""

    def test_imports_successfully(self, configured_env):
        """Script should import without errors."""
        module = load_collect_companion()
        assert hasattr(module, "main")
        assert hasattr(module, "collect_companion")
        assert callable(module.main)

    def test_collect_companion_is_async(self, configured_env):
        """collect_companion() should be an async function."""
        module = load_collect_companion()
        assert inspect.iscoroutinefunction(module.collect_companion)
class TestCollectCompanionExitCodes:
    """Test exit code behavior - critical for monitoring."""

    @pytest.mark.asyncio
    async def test_returns_zero_on_successful_collection(
        self, configured_env, async_context_manager_factory, mock_run_command_factory
    ):
        """Successful collection should return exit code 0."""
        module = load_collect_companion()
        responses = {
            "send_appstart": (True, "SELF_INFO", {}, None),
            "send_device_query": (True, "DEVICE_INFO", {}, None),
            "get_time": (True, "TIME", {"time": 1234567890}, None),
            "get_self_telemetry": (True, "TELEMETRY", {}, None),
            "get_custom_vars": (True, "CUSTOM_VARS", {}, None),
            "get_contacts": (True, "CONTACTS", {"c1": {}, "c2": {}}, None),
            "get_stats_core": (
                True,
                "STATS_CORE",
                {"battery_mv": 3850, "uptime_secs": 86400},
                None,
            ),
            "get_stats_radio": (True, "STATS_RADIO", {"noise_floor": -115}, None),
            "get_stats_packets": (True, "STATS_PACKETS", {"recv": 100, "sent": 50}, None),
        }
        mc = MagicMock()
        mc.commands = MagicMock()
        ctx_mock = async_context_manager_factory(mc)
        with (
            patch.object(module, "connect_with_lock", return_value=ctx_mock),
            patch.object(
                module, "run_command", side_effect=mock_run_command_factory(responses)
            ),
            patch.object(module, "insert_metrics", return_value=5),
        ):
            exit_code = await module.collect_companion()

        assert exit_code == 0

    @pytest.mark.asyncio
    async def test_returns_one_on_connection_failure(
        self, configured_env, async_context_manager_factory
    ):
        """Failed connection should return exit code 1."""
        module = load_collect_companion()
        # Connection returns None (failed)
        ctx_mock = async_context_manager_factory(None)
        with patch.object(module, "connect_with_lock", return_value=ctx_mock):
            exit_code = await module.collect_companion()

        assert exit_code == 1

    @pytest.mark.asyncio
    async def test_returns_one_when_no_commands_succeed(
        self, configured_env, async_context_manager_factory
    ):
        """No successful commands should return exit code 1."""
        module = load_collect_companion()

        # All commands fail
        async def mock_run_command_fail(mc, coro, name):
            return (False, None, None, "Command failed")

        mc = MagicMock()
        mc.commands = MagicMock()
        ctx_mock = async_context_manager_factory(mc)
        with (
            patch.object(module, "connect_with_lock", return_value=ctx_mock),
            patch.object(module, "run_command", side_effect=mock_run_command_fail),
        ):
            exit_code = await module.collect_companion()

        assert exit_code == 1

    @pytest.mark.asyncio
    async def test_returns_one_on_database_error(
        self, configured_env, async_context_manager_factory, mock_run_command_factory
    ):
        """Database write failure should return exit code 1."""
        module = load_collect_companion()
        responses = {
            "get_stats_core": (True, "STATS_CORE", {"battery_mv": 3850}, None),
        }
        # Default to success for other commands
        default = (True, "OK", {}, None)
        mc = MagicMock()
        mc.commands = MagicMock()
        ctx_mock = async_context_manager_factory(mc)
        with (
            patch.object(module, "connect_with_lock", return_value=ctx_mock),
            patch.object(
                module, "run_command", side_effect=mock_run_command_factory(responses, default)
            ),
            patch.object(module, "insert_metrics", side_effect=Exception("DB error")),
        ):
            exit_code = await module.collect_companion()

        assert exit_code == 1
class TestCollectCompanionMetrics:
    """Test metric collection behavior."""

    @pytest.mark.asyncio
    async def test_collects_all_numeric_fields_from_stats(
        self, configured_env, async_context_manager_factory, mock_run_command_factory
    ):
        """Should insert all numeric fields from stats responses."""
        module = load_collect_companion()
        collected_metrics = {}
        responses = {
            "send_appstart": (True, "SELF_INFO", {}, None),
            "send_device_query": (True, "DEVICE_INFO", {}, None),
            "get_time": (True, "TIME", {}, None),
            "get_self_telemetry": (True, "TELEMETRY", {}, None),
            "get_custom_vars": (True, "CUSTOM_VARS", {}, None),
            "get_contacts": (True, "CONTACTS", {"c1": {}, "c2": {}, "c3": {}}, None),
            "get_stats_core": (
                True,
                "STATS_CORE",
                {"battery_mv": 3850, "uptime_secs": 86400, "errors": 0},
                None,
            ),
            "get_stats_radio": (
                True,
                "STATS_RADIO",
                {"noise_floor": -115, "last_rssi": -85, "last_snr": 7.5},
                None,
            ),
            "get_stats_packets": (True, "STATS_PACKETS", {"recv": 100, "sent": 50}, None),
        }

        def capture_metrics(ts, role, metrics, conn=None):
            collected_metrics.update(metrics)
            return len(metrics)

        mc = MagicMock()
        mc.commands = MagicMock()
        ctx_mock = async_context_manager_factory(mc)
        with (
            patch.object(module, "connect_with_lock", return_value=ctx_mock),
            patch.object(
                module, "run_command", side_effect=mock_run_command_factory(responses)
            ),
            patch.object(module, "insert_metrics", side_effect=capture_metrics),
        ):
            await module.collect_companion()

        # Verify all expected metrics were collected
        assert collected_metrics["battery_mv"] == 3850
        assert collected_metrics["uptime_secs"] == 86400
        assert collected_metrics["contacts"] == 3  # From get_contacts count
        assert collected_metrics["recv"] == 100
        assert collected_metrics["sent"] == 50
        assert collected_metrics["noise_floor"] == -115

    @pytest.mark.asyncio
    async def test_telemetry_not_extracted_when_disabled(
        self, configured_env, async_context_manager_factory, monkeypatch
    ):
        """Telemetry metrics should NOT be extracted when TELEMETRY_ENABLED=0 (default)."""
        module = load_collect_companion()
        collected_metrics = {}

        async def mock_run_command(mc, coro, name):
            if name == "get_self_telemetry":
                # Return telemetry payload with LPP data
                return (True, "TELEMETRY", {"lpp": b"\x00\x67\x01\x00"}, None)
            if name == "get_stats_core":
                return (True, "STATS_CORE", {"battery_mv": 3850}, None)
            return (True, "OK", {}, None)

        def capture_metrics(ts, role, metrics, conn=None):
            collected_metrics.update(metrics)
            return len(metrics)

        mc = MagicMock()
        mc.commands = MagicMock()
        ctx_mock = async_context_manager_factory(mc)
        with (
            patch.object(module, "connect_with_lock", return_value=ctx_mock),
            patch.object(module, "run_command", side_effect=mock_run_command),
            patch.object(module, "insert_metrics", side_effect=capture_metrics),
        ):
            await module.collect_companion()

        # No telemetry.* keys should be present
        telemetry_keys = [k for k in collected_metrics if k.startswith("telemetry.")]
        assert len(telemetry_keys) == 0

    @pytest.mark.asyncio
    async def test_telemetry_extracted_when_enabled(
        self, configured_env, async_context_manager_factory, monkeypatch
    ):
        """Telemetry metrics SHOULD be extracted when TELEMETRY_ENABLED=1."""
        # Enable telemetry BEFORE loading the module
        monkeypatch.setenv("TELEMETRY_ENABLED", "1")
        import meshmon.env

        meshmon.env._config = None
        module = load_collect_companion()
        collected_metrics = {}
        # LPP data format: list of dictionaries with type, channel, value
        # This matches the format from MeshCore API
        lpp_data = [
            {"type": "temperature", "channel": 0, "value": 25.5},
        ]

        async def mock_run_command(mc, coro, name):
            if name == "get_self_telemetry":
                return (True, "TELEMETRY", {"lpp": lpp_data}, None)
            if name == "get_stats_core":
                return (True, "STATS_CORE", {"battery_mv": 3850}, None)
            return (True, "OK", {}, None)

        def capture_metrics(ts, role, metrics, conn=None):
            collected_metrics.update(metrics)
            return len(metrics)

        mc = MagicMock()
        mc.commands = MagicMock()
        ctx_mock = async_context_manager_factory(mc)
        with (
            patch.object(module, "connect_with_lock", return_value=ctx_mock),
            patch.object(module, "run_command", side_effect=mock_run_command),
            patch.object(module, "insert_metrics", side_effect=capture_metrics),
        ):
            exit_code = await module.collect_companion()

        assert exit_code == 0
        # Telemetry keys should be present
        telemetry_keys = [k for k in collected_metrics if k.startswith("telemetry.")]
        assert len(telemetry_keys) > 0, f"Expected telemetry keys, got: {collected_metrics.keys()}"
        assert "telemetry.temperature.0" in collected_metrics
        assert collected_metrics["telemetry.temperature.0"] == 25.5

    @pytest.mark.asyncio
    async def test_telemetry_extraction_handles_invalid_lpp(
        self, configured_env, async_context_manager_factory, monkeypatch
    ):
        """Telemetry extraction should handle invalid LPP data gracefully."""
        monkeypatch.setenv("TELEMETRY_ENABLED", "1")
        import meshmon.env

        meshmon.env._config = None
        module = load_collect_companion()
        collected_metrics = {}

        async def mock_run_command(mc, coro, name):
            if name == "get_self_telemetry":
                # Invalid LPP data (too short)
                return (True, "TELEMETRY", {"lpp": b"\x00"}, None)
            if name == "get_stats_core":
                return (True, "STATS_CORE", {"battery_mv": 3850}, None)
            return (True, "OK", {}, None)

        def capture_metrics(ts, role, metrics, conn=None):
            collected_metrics.update(metrics)
            return len(metrics)

        mc = MagicMock()
        mc.commands = MagicMock()
        ctx_mock = async_context_manager_factory(mc)
        with (
            patch.object(module, "connect_with_lock", return_value=ctx_mock),
            patch.object(module, "run_command", side_effect=mock_run_command),
            patch.object(module, "insert_metrics", side_effect=capture_metrics),
        ):
            exit_code = await module.collect_companion()

        # Should still succeed - just no telemetry extracted
        assert exit_code == 0
        # No telemetry keys because LPP was invalid
        telemetry_keys = [k for k in collected_metrics if k.startswith("telemetry.")]
        assert len(telemetry_keys) == 0
class TestPartialSuccessScenarios:
    """Test behavior when only some commands succeed."""

    @pytest.mark.asyncio
    async def test_succeeds_with_only_stats_core(
        self, configured_env, async_context_manager_factory
    ):
        """Should succeed if only stats_core returns metrics."""
        module = load_collect_companion()
        collected_metrics = {}

        async def mock_run_command(mc, coro, name):
            if name == "get_stats_core":
                return (True, "STATS_CORE", {"battery_mv": 3850, "uptime_secs": 1000}, None)
            # All other commands fail
            return (False, None, None, "Timeout")

        def capture_metrics(ts, role, metrics, conn=None):
            collected_metrics.update(metrics)
            return len(metrics)

        mc = MagicMock()
        mc.commands = MagicMock()
        ctx_mock = async_context_manager_factory(mc)
        with (
            patch.object(module, "connect_with_lock", return_value=ctx_mock),
            patch.object(module, "run_command", side_effect=mock_run_command),
            patch.object(module, "insert_metrics", side_effect=capture_metrics),
        ):
            exit_code = await module.collect_companion()

        # Should succeed because stats_core succeeded and had metrics
        assert exit_code == 0
        assert collected_metrics["battery_mv"] == 3850

    @pytest.mark.asyncio
    async def test_succeeds_with_only_contacts(
        self, configured_env, async_context_manager_factory
    ):
        """Should succeed if only contacts command returns data."""
        module = load_collect_companion()
        collected_metrics = {}

        async def mock_run_command(mc, coro, name):
            if name == "get_contacts":
                return (True, "CONTACTS", {"c1": {}, "c2": {}}, None)
            # Stats commands succeed but return no numeric data
            if name.startswith("get_stats"):
                return (True, "OK", {}, None)
            # Other commands succeed
            return (True, "OK", {}, None)

        def capture_metrics(ts, role, metrics, conn=None):
            collected_metrics.update(metrics)
            return len(metrics)

        mc = MagicMock()
        mc.commands = MagicMock()
        ctx_mock = async_context_manager_factory(mc)
        with (
            patch.object(module, "connect_with_lock", return_value=ctx_mock),
            patch.object(module, "run_command", side_effect=mock_run_command),
            patch.object(module, "insert_metrics", side_effect=capture_metrics),
        ):
            exit_code = await module.collect_companion()

        assert exit_code == 0
        assert collected_metrics["contacts"] == 2

    @pytest.mark.asyncio
    async def test_fails_when_metrics_empty_despite_success(
        self, configured_env, async_context_manager_factory
    ):
        """Should fail if commands succeed but no metrics collected."""
        module = load_collect_companion()

        async def mock_run_command(mc, coro, name):
            # Commands succeed but return empty/non-dict payloads
            if name == "get_stats_core":
                return (True, "STATS_CORE", None, None)  # No payload
            if name == "get_stats_radio":
                return (True, "STATS_RADIO", "not a dict", None)  # Invalid payload
            if name == "get_stats_packets":
                return (True, "STATS_PACKETS", {}, None)  # Empty payload
            if name == "get_contacts":
                return (False, None, None, "Failed")  # Fails
            return (True, "OK", {}, None)

        mc = MagicMock()
        mc.commands = MagicMock()
        ctx_mock = async_context_manager_factory(mc)
        with (
            patch.object(module, "connect_with_lock", return_value=ctx_mock),
            patch.object(module, "run_command", side_effect=mock_run_command),
        ):
            exit_code = await module.collect_companion()

        # Should fail because no metrics were collected
        assert exit_code == 1
class TestExceptionHandling:
    """Test exception handling in the command loop (lines 165-166)."""

    @pytest.mark.asyncio
    async def test_handles_exception_in_command_loop(
        self, configured_env, async_context_manager_factory
    ):
        """Should catch and log exceptions during command execution."""
        module = load_collect_companion()
        call_count = 0

        async def mock_run_command_with_exception(mc, coro, name):
            nonlocal call_count
            call_count += 1
            if call_count == 3:  # Fail on third command
                raise RuntimeError("Unexpected network error")
            return (True, "OK", {}, None)

        mc = MagicMock()
        mc.commands = MagicMock()
        ctx_mock = async_context_manager_factory(mc)
        with (
            patch.object(module, "connect_with_lock", return_value=ctx_mock),
            patch.object(module, "run_command", side_effect=mock_run_command_with_exception),
            patch.object(module, "log") as mock_log,
        ):
            exit_code = await module.collect_companion()

        # Should have logged the error
        error_calls = [
            c for c in mock_log.error.call_args_list if "Error during collection" in str(c)
        ]
        assert len(error_calls) > 0
        # Should return 1 because exception interrupted collection
        assert exit_code == 1

    @pytest.mark.asyncio
    async def test_exception_closes_connection_properly(
        self, configured_env, async_context_manager_factory
    ):
        """Context manager should still exit properly after exception."""
        module = load_collect_companion()

        async def mock_run_command_raise(mc, coro, name):
            raise RuntimeError("Connection lost")

        mc = MagicMock()
        mc.commands = MagicMock()
        ctx_mock = async_context_manager_factory(mc)
        with (
            patch.object(module, "connect_with_lock", return_value=ctx_mock),
            patch.object(module, "run_command", side_effect=mock_run_command_raise),
        ):
            await module.collect_companion()

        # Verify context manager was properly exited
        assert ctx_mock.exited is True
class TestMainEntryPoint:
    """Test the main() entry point behavior."""

    def test_main_calls_init_db(self, configured_env):
        """main() should initialize database before collection."""
        module = load_collect_companion()
        with (
            patch.object(module, "init_db") as mock_init,
            patch.object(module, "collect_companion", new=MagicMock(return_value=0)),
            patch.object(module, "asyncio") as mock_asyncio,
            patch.object(module, "sys"),
        ):
            # Patch collect_companion to return a non-coroutine to avoid unawaited coroutine warning
            mock_asyncio.run.return_value = 0
            module.main()

        mock_init.assert_called_once()

    def test_main_exits_with_collection_result(self, configured_env):
        """main() should exit with the collection exit code."""
        module = load_collect_companion()
        with (
            patch.object(module, "init_db"),
            patch.object(module, "collect_companion", new=MagicMock(return_value=1)),
            patch.object(module, "asyncio") as mock_asyncio,
            patch.object(module, "sys") as mock_sys,
        ):
            # Patch collect_companion to return a non-coroutine to avoid unawaited coroutine warning
            mock_asyncio.run.return_value = 1  # Collection failed
            module.main()

        mock_sys.exit.assert_called_once_with(1)

    def test_main_runs_collect_companion_async(self, configured_env):
        """main() should run collect_companion() with asyncio.run()."""
        module = load_collect_companion()
        with (
            patch.object(module, "init_db"),
            patch.object(module, "collect_companion", new=MagicMock(return_value=0)),
            patch.object(module, "asyncio") as mock_asyncio,
            patch.object(module, "sys"),
        ):
            # Patch collect_companion to return a non-coroutine to avoid unawaited coroutine warning
            mock_asyncio.run.return_value = 0
            module.main()

        # asyncio.run should be called with the return value
        mock_asyncio.run.assert_called_once()
class TestDatabaseIntegration:
    """Test that collection actually writes to database."""

    @pytest.mark.asyncio
    async def test_writes_metrics_to_database(
        self, configured_env, initialized_db, async_context_manager_factory, mock_run_command_factory
    ):
        """Collection should write metrics to database."""
        from meshmon.db import get_latest_metrics

        module = load_collect_companion()
        responses = {
            "send_appstart": (True, "SELF_INFO", {}, None),
            "send_device_query": (True, "DEVICE_INFO", {}, None),
            "get_time": (True, "TIME", {}, None),
            "get_self_telemetry": (True, "TELEMETRY", {}, None),
            "get_custom_vars": (True, "CUSTOM_VARS", {}, None),
            "get_contacts": (True, "CONTACTS", {"c1": {}}, None),
            "get_stats_core": (
                True,
                "STATS_CORE",
                {"battery_mv": 3777, "uptime_secs": 12345},
                None,
            ),
            "get_stats_radio": (True, "STATS_RADIO", {}, None),
            "get_stats_packets": (True, "STATS_PACKETS", {"recv": 999, "sent": 888}, None),
        }
        mc = MagicMock()
        mc.commands = MagicMock()
        ctx_mock = async_context_manager_factory(mc)
        with (
            patch.object(module, "connect_with_lock", return_value=ctx_mock),
            patch.object(
                module, "run_command", side_effect=mock_run_command_factory(responses)
            ),
        ):
            exit_code = await module.collect_companion()

        assert exit_code == 0
        # Verify data was written to database
        latest = get_latest_metrics("companion")
        assert latest is not None
        assert latest["battery_mv"] == 3777
        assert latest["recv"] == 999
        assert latest["sent"] == 888

    @pytest.mark.asyncio
    async def test_writes_telemetry_to_database_when_enabled(
        self, configured_env, initialized_db, async_context_manager_factory, monkeypatch
    ):
        """Telemetry should be written to database when enabled."""
        monkeypatch.setenv("TELEMETRY_ENABLED", "1")
        import meshmon.env

        meshmon.env._config = None
        from meshmon.db import get_latest_metrics

        module = load_collect_companion()
        # LPP data format: list of dictionaries with type, channel, value
        lpp_data = [
            {"type": "temperature", "channel": 0, "value": 25.5},
        ]

        async def mock_run_command(mc, coro, name):
            if name == "get_self_telemetry":
                return (True, "TELEMETRY", {"lpp": lpp_data}, None)
            if name == "get_stats_core":
                return (True, "STATS_CORE", {"battery_mv": 3850}, None)
            return (True, "OK", {}, None)

        mc = MagicMock()
        mc.commands = MagicMock()
        ctx_mock = async_context_manager_factory(mc)
        with (
            patch.object(module, "connect_with_lock", return_value=ctx_mock),
            patch.object(module, "run_command", side_effect=mock_run_command),
        ):
            exit_code = await module.collect_companion()

        assert exit_code == 0
        # Verify telemetry was written to database
        latest = get_latest_metrics("companion")
        assert latest is not None
        assert "telemetry.temperature.0" in latest
        assert latest["telemetry.temperature.0"] == 25.5