test: add comprehensive pytest test suite with 95% coverage (#29)

* test: add comprehensive pytest test suite with 95% coverage

Add full unit and integration test coverage for the meshcore-stats project:

- 1020 tests covering all modules (db, charts, html, reports, client, etc.)
- 95.95% code coverage with pytest-cov (95% threshold enforced)
- GitHub Actions CI workflow for automated testing on push/PR
- Proper mocking of external dependencies (meshcore, serial, filesystem)
- SVG snapshot infrastructure for chart regression testing
- Integration tests for collection and rendering pipelines

Test organization:
- tests/charts/: Chart rendering and statistics
- tests/client/: MeshCore client and connection handling
- tests/config/: Environment and configuration parsing
- tests/database/: SQLite operations and migrations
- tests/html/: HTML generation and Jinja templates
- tests/reports/: Report generation and formatting
- tests/retry/: Circuit breaker and retry logic
- tests/unit/: Pure unit tests for utilities
- tests/integration/: End-to-end pipeline tests

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* chore: add test-engineer agent configuration

Add project-local test-engineer agent for pytest test development,
coverage analysis, and test review tasks.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* docs: comprehensive test suite review with 956 tests analyzed

Conducted thorough review of all 956 test cases across 47 test files:

- Unit Tests: 338 tests (battery, metrics, log, telemetry, env, charts, html, reports, formatters)
- Config Tests: 53 tests (env loading, config file parsing)
- Database Tests: 115 tests (init, insert, queries, migrations, maintenance, validation)
- Retry Tests: 59 tests (circuit breaker, async retries, factory)
- Charts Tests: 76 tests (transforms, statistics, timeseries, rendering, I/O)
- HTML Tests: 81 tests (site generation, Jinja2, metrics builders, reports index)
- Reports Tests: 149 tests (location, JSON/TXT formatting, aggregation, counter totals)
- Client Tests: 63 tests (contacts, connection, meshcore availability, commands)
- Integration Tests: 22 tests (reports, collection, rendering pipelines)

Results:
- Overall Pass Rate: 99.7% (953/956)
- 3 tests marked for improvement (empty test bodies in client tests)
- 0 tests requiring fixes

Key findings documented in test_review/tests.md including quality
observations, F.I.R.S.T. principle adherence, and recommendations.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test: implement snapshot testing for charts and reports

Add comprehensive snapshot testing infrastructure:

SVG Chart Snapshots:
- Deterministic fixtures with fixed timestamps (2024-01-15 12:00:00)
- Tests for gauge/counter metrics in light/dark themes
- Empty chart and single-point edge cases
- Extended normalize_svg_for_snapshot_full() for reproducible comparisons

TXT Report Snapshots:
- Monthly/yearly report snapshots for repeater and companion
- Empty report handling tests
- Tests in tests/reports/test_snapshots.py

Infrastructure:
- tests/snapshots/conftest.py with shared fixtures
- UPDATE_SNAPSHOTS=1 environment variable for regeneration
- scripts/generate_snapshots.py for batch snapshot generation

Run `UPDATE_SNAPSHOTS=1 pytest tests/charts/test_chart_render.py::TestSvgSnapshots`
to generate initial snapshots.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test: fix SVG normalization and generate initial snapshots

Fix normalize_svg_for_snapshot() to handle:
- clipPath IDs like id="p47c77a2a6e"
- url(#p...) references
- xlink:href="#p..." references
- <dc:date> timestamps

Generated initial snapshot files:
- 7 SVG chart snapshots (gauge, counter, empty, single-point in light/dark)
- 6 TXT report snapshots (monthly/yearly for repeater/companion + empty)

All 13 snapshot tests now pass.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test: fix SVG normalization to preserve axis rendering

The SVG normalization was replacing all matplotlib-generated IDs with
the same value, causing duplicate IDs that broke SVG rendering:
- Font glyphs, clipPaths, and tick marks all got id="normalized"
- References couldn't resolve to the correct elements
- X and Y axes failed to render in normalized snapshots

Fix uses type-specific prefixes with sequential numbering:
- glyph_N for font glyphs (DejaVuSans-XX patterns)
- clip_N for clipPath definitions (p[0-9a-f]{8,} patterns)
- tick_N for tick marks (m[0-9a-f]{8,} patterns)

This ensures all IDs remain unique while still being deterministic
for snapshot comparison.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* chore: add coverage and pytest artifacts to gitignore

Add .coverage, .coverage.*, htmlcov/, and .pytest_cache/ to prevent
test artifacts from being committed.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* style: fix all ruff lint errors across codebase

- Sort and organize imports (I001)
- Use modern type annotations (X | Y instead of Union, collections.abc)
- Remove unused imports (F401)
- Combine nested if statements (SIM102)
- Use ternary operators where appropriate (SIM108)
- Combine nested with statements (SIM117)
- Use contextlib.suppress instead of try-except-pass (SIM105)
- Add noqa comments for intentional SIM115 violations (file locks)
- Add TYPE_CHECKING import for forward references
- Fix exception chaining (B904)

All 1033 tests pass.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* docs: add TDD workflow and pre-commit requirements to CLAUDE.md

- Add mandatory test-driven development workflow (write tests first)
- Add pre-commit requirements (must run lint and tests before committing)
- Document test organization and running commands
- Document 95% coverage requirement

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: resolve mypy type checking errors with proper structural fixes

- charts.py: Create PeriodConfig dataclass for type-safe period configuration,
  use mdates.date2num() for matplotlib datetime handling, fix x-axis limits
  for single-point charts
- db.py: Add explicit int() conversion with None handling for SQLite returns
- env.py: Add class-level type annotations to Config class
- html.py: Add MetricDisplay TypedDict, fix import order, add proper type
  annotations for table data functions
- meshcore_client.py: Add return type annotation

Update tests to use new dataclass attribute access and regenerate SVG
snapshots. Add mypy step to CLAUDE.md pre-commit requirements.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: cast Jinja2 template.render() to str for mypy

Jinja2's type stubs declare render() as returning Any, but it actually
returns str. Wrap with str() to satisfy mypy's no-any-return check.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* ci: improve workflow security and reliability

- test.yml: Pin all actions by SHA, add concurrency control to cancel
  in-progress runs on rapid pushes
- release-please.yml: Pin action by SHA, add 10-minute timeout
- conftest.py: Fix snapshot_base_time to use explicit UTC timezone for
  consistent behavior across CI and local environments

Regenerate SVG snapshots with UTC-aware timestamps.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: add mypy command to permissions in settings.local.json

* test: add comprehensive script tests with coroutine warning fixes

- Add tests/scripts/ with tests for collect_companion, collect_repeater,
  and render scripts (1135 tests total, 96% coverage)
- Fix unawaited coroutine warnings by using AsyncMock properly for async
  functions and async_context_manager_factory fixture for context managers
- Add --cov=scripts to CI workflow and pyproject.toml coverage config
- Omit scripts/generate_snapshots.py from coverage (dev utility)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* docs: migrate claude setup to codex skills

* feat: migrate dependencies to uv (#31)

* fix: run tests through uv

* test: fix ruff lint issues in tests

Consolidate patch context managers and clean unused imports/variables

Use datetime.UTC in snapshot fixtures

* test: avoid unawaited async mocks in entrypoint tests

* ci: replace codecov with github coverage artifacts

Add junit XML output and coverage summary in job output

Upload HTML and XML coverage artifacts (3.12 only) on every run

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Jorijn Schrijvershof
2026-01-08 17:16:53 +01:00
committed by GitHub
parent 45bdf5d6d4
commit a9f6926104
125 changed files with 28005 additions and 448 deletions

View File

@@ -0,0 +1 @@
# Tests for executable scripts

189
tests/scripts/conftest.py Normal file
View File

@@ -0,0 +1,189 @@
"""Script-specific test fixtures."""
import importlib.util
import sys
from contextlib import contextmanager
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
# Ensure scripts can import from src
SCRIPTS_DIR = Path(__file__).parent.parent.parent / "scripts"
SRC_DIR = Path(__file__).parent.parent.parent / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
# Track dynamically loaded script modules for cleanup
_loaded_script_modules: set[str] = set()
def load_script_module(script_name: str):
"""Load a script as a module and track it for cleanup.
Args:
script_name: Name of script file (e.g., "collect_companion.py")
Returns:
Loaded module object
"""
script_path = SCRIPTS_DIR / script_name
module_name = script_name.replace(".py", "")
spec = importlib.util.spec_from_file_location(module_name, script_path)
assert spec is not None, f"Could not load spec for {script_path}"
assert spec.loader is not None, f"No loader for {script_path}"
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
_loaded_script_modules.add(module_name)
spec.loader.exec_module(module)
return module
@pytest.fixture(autouse=True)
def cleanup_script_modules():
"""Clean up dynamically loaded script modules after each test.
This prevents test pollution where module-level state persists
between tests, potentially causing false positives or flaky tests.
"""
# Clear tracking before test
_loaded_script_modules.clear()
yield
# Clean up after test
for module_name in _loaded_script_modules:
if module_name in sys.modules:
del sys.modules[module_name]
_loaded_script_modules.clear()
@pytest.fixture
def scripts_dir():
"""Path to the scripts directory."""
return SCRIPTS_DIR
@contextmanager
def mock_async_context_manager(return_value=None):
"""Create a mock that works as an async context manager.
Usage:
with patch.object(module, "connect_with_lock") as mock_connect:
mock_connect.return_value = mock_async_context_manager(mc)
# or for None return:
mock_connect.return_value = mock_async_context_manager(None)
Args:
return_value: Value to return from __aenter__
Returns:
A mock configured as an async context manager
"""
mock = MagicMock()
mock.__aenter__ = AsyncMock(return_value=return_value)
mock.__aexit__ = AsyncMock(return_value=None)
yield mock
class AsyncContextManagerMock:
"""A class-based async context manager mock for more complex scenarios.
Can be configured with enter/exit callbacks and exception handling.
"""
def __init__(self, return_value=None, exit_exception=None):
"""Initialize the mock.
Args:
return_value: Value to return from __aenter__
exit_exception: Exception to raise in __aexit__ (for testing cleanup)
"""
self.return_value = return_value
self.exit_exception = exit_exception
self.entered = False
self.exited = False
self.exit_args = None
async def __aenter__(self):
self.entered = True
return self.return_value
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.exited = True
self.exit_args = (exc_type, exc_val, exc_tb)
if self.exit_exception:
raise self.exit_exception
return None
@pytest.fixture
def async_context_manager_factory():
"""Factory fixture to create async context manager mocks.
Usage:
def test_something(async_context_manager_factory):
mc = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with patch.object(module, "connect_with_lock", return_value=ctx_mock):
...
"""
def factory(return_value=None, exit_exception=None):
return AsyncContextManagerMock(return_value, exit_exception)
return factory
@pytest.fixture
def mock_repeater_contact():
"""Mock repeater contact for testing."""
return {
"adv_name": "TestRepeater",
"public_key": "abc123def456",
"last_seen": 1234567890,
}
@pytest.fixture
def mock_repeater_status(sample_repeater_metrics):
"""Mock repeater status response."""
return sample_repeater_metrics.copy()
@pytest.fixture
def mock_run_command_factory():
"""Factory to create mock run_command functions with configurable responses.
Usage:
def test_something(mock_run_command_factory):
responses = {
"send_appstart": (True, "SELF_INFO", {}, None),
"get_stats_core": (True, "STATS_CORE", {"battery_mv": 3850}, None),
}
mock_run = mock_run_command_factory(responses)
with patch.object(module, "run_command", side_effect=mock_run):
...
"""
def factory(responses: dict, default_response=None):
"""Create a mock run_command function.
Args:
responses: Dict mapping command names to (ok, evt_type, payload, err) tuples
default_response: Response for commands not in responses dict.
If None, returns (False, None, None, "Unknown command")
"""
if default_response is None:
default_response = (False, None, None, "Unknown command")
async def mock_run_command(mc, coro, name):
return responses.get(name, default_response)
return mock_run_command
return factory

View File

@@ -0,0 +1,641 @@
"""Tests for collect_companion.py script entry point.
These tests verify the actual script behavior, not just the library code.
The script is the entry point that users run - if it breaks, everything breaks.
"""
import inspect
from unittest.mock import MagicMock, patch
import pytest
from tests.scripts.conftest import load_script_module
def load_collect_companion():
"""Load collect_companion.py as a module."""
return load_script_module("collect_companion.py")
class TestCollectCompanionImport:
"""Verify script can be imported without errors."""
def test_imports_successfully(self, configured_env):
"""Script should import without errors."""
module = load_collect_companion()
assert hasattr(module, "main")
assert hasattr(module, "collect_companion")
assert callable(module.main)
def test_collect_companion_is_async(self, configured_env):
"""collect_companion() should be an async function."""
module = load_collect_companion()
assert inspect.iscoroutinefunction(module.collect_companion)
class TestCollectCompanionExitCodes:
"""Test exit code behavior - critical for monitoring."""
@pytest.mark.asyncio
async def test_returns_zero_on_successful_collection(
self, configured_env, async_context_manager_factory, mock_run_command_factory
):
"""Successful collection should return exit code 0."""
module = load_collect_companion()
responses = {
"send_appstart": (True, "SELF_INFO", {}, None),
"send_device_query": (True, "DEVICE_INFO", {}, None),
"get_time": (True, "TIME", {"time": 1234567890}, None),
"get_self_telemetry": (True, "TELEMETRY", {}, None),
"get_custom_vars": (True, "CUSTOM_VARS", {}, None),
"get_contacts": (True, "CONTACTS", {"c1": {}, "c2": {}}, None),
"get_stats_core": (
True,
"STATS_CORE",
{"battery_mv": 3850, "uptime_secs": 86400},
None,
),
"get_stats_radio": (True, "STATS_RADIO", {"noise_floor": -115}, None),
"get_stats_packets": (True, "STATS_PACKETS", {"recv": 100, "sent": 50}, None),
}
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(
module, "run_command", side_effect=mock_run_command_factory(responses)
),
patch.object(module, "insert_metrics", return_value=5),
):
exit_code = await module.collect_companion()
assert exit_code == 0
@pytest.mark.asyncio
async def test_returns_one_on_connection_failure(
self, configured_env, async_context_manager_factory
):
"""Failed connection should return exit code 1."""
module = load_collect_companion()
# Connection returns None (failed)
ctx_mock = async_context_manager_factory(None)
with patch.object(module, "connect_with_lock", return_value=ctx_mock):
exit_code = await module.collect_companion()
assert exit_code == 1
@pytest.mark.asyncio
async def test_returns_one_when_no_commands_succeed(
self, configured_env, async_context_manager_factory
):
"""No successful commands should return exit code 1."""
module = load_collect_companion()
# All commands fail
async def mock_run_command_fail(mc, coro, name):
return (False, None, None, "Command failed")
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command", side_effect=mock_run_command_fail),
):
exit_code = await module.collect_companion()
assert exit_code == 1
@pytest.mark.asyncio
async def test_returns_one_on_database_error(
self, configured_env, async_context_manager_factory, mock_run_command_factory
):
"""Database write failure should return exit code 1."""
module = load_collect_companion()
responses = {
"get_stats_core": (True, "STATS_CORE", {"battery_mv": 3850}, None),
}
# Default to success for other commands
default = (True, "OK", {}, None)
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(
module, "run_command", side_effect=mock_run_command_factory(responses, default)
),
patch.object(module, "insert_metrics", side_effect=Exception("DB error")),
):
exit_code = await module.collect_companion()
assert exit_code == 1
class TestCollectCompanionMetrics:
"""Test metric collection behavior."""
@pytest.mark.asyncio
async def test_collects_all_numeric_fields_from_stats(
self, configured_env, async_context_manager_factory, mock_run_command_factory
):
"""Should insert all numeric fields from stats responses."""
module = load_collect_companion()
collected_metrics = {}
responses = {
"send_appstart": (True, "SELF_INFO", {}, None),
"send_device_query": (True, "DEVICE_INFO", {}, None),
"get_time": (True, "TIME", {}, None),
"get_self_telemetry": (True, "TELEMETRY", {}, None),
"get_custom_vars": (True, "CUSTOM_VARS", {}, None),
"get_contacts": (True, "CONTACTS", {"c1": {}, "c2": {}, "c3": {}}, None),
"get_stats_core": (
True,
"STATS_CORE",
{"battery_mv": 3850, "uptime_secs": 86400, "errors": 0},
None,
),
"get_stats_radio": (
True,
"STATS_RADIO",
{"noise_floor": -115, "last_rssi": -85, "last_snr": 7.5},
None,
),
"get_stats_packets": (True, "STATS_PACKETS", {"recv": 100, "sent": 50}, None),
}
def capture_metrics(ts, role, metrics, conn=None):
collected_metrics.update(metrics)
return len(metrics)
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(
module, "run_command", side_effect=mock_run_command_factory(responses)
),
patch.object(module, "insert_metrics", side_effect=capture_metrics),
):
await module.collect_companion()
# Verify all expected metrics were collected
assert collected_metrics["battery_mv"] == 3850
assert collected_metrics["uptime_secs"] == 86400
assert collected_metrics["contacts"] == 3 # From get_contacts count
assert collected_metrics["recv"] == 100
assert collected_metrics["sent"] == 50
assert collected_metrics["noise_floor"] == -115
@pytest.mark.asyncio
async def test_telemetry_not_extracted_when_disabled(
self, configured_env, async_context_manager_factory, monkeypatch
):
"""Telemetry metrics should NOT be extracted when TELEMETRY_ENABLED=0 (default)."""
module = load_collect_companion()
collected_metrics = {}
async def mock_run_command(mc, coro, name):
if name == "get_self_telemetry":
# Return telemetry payload with LPP data
return (True, "TELEMETRY", {"lpp": b"\x00\x67\x01\x00"}, None)
if name == "get_stats_core":
return (True, "STATS_CORE", {"battery_mv": 3850}, None)
return (True, "OK", {}, None)
def capture_metrics(ts, role, metrics, conn=None):
collected_metrics.update(metrics)
return len(metrics)
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command", side_effect=mock_run_command),
patch.object(module, "insert_metrics", side_effect=capture_metrics),
):
await module.collect_companion()
# No telemetry.* keys should be present
telemetry_keys = [k for k in collected_metrics if k.startswith("telemetry.")]
assert len(telemetry_keys) == 0
@pytest.mark.asyncio
async def test_telemetry_extracted_when_enabled(
self, configured_env, async_context_manager_factory, monkeypatch
):
"""Telemetry metrics SHOULD be extracted when TELEMETRY_ENABLED=1."""
# Enable telemetry BEFORE loading the module
monkeypatch.setenv("TELEMETRY_ENABLED", "1")
import meshmon.env
meshmon.env._config = None
module = load_collect_companion()
collected_metrics = {}
# LPP data format: list of dictionaries with type, channel, value
# This matches the format from MeshCore API
lpp_data = [
{"type": "temperature", "channel": 0, "value": 25.5},
]
async def mock_run_command(mc, coro, name):
if name == "get_self_telemetry":
return (True, "TELEMETRY", {"lpp": lpp_data}, None)
if name == "get_stats_core":
return (True, "STATS_CORE", {"battery_mv": 3850}, None)
return (True, "OK", {}, None)
def capture_metrics(ts, role, metrics, conn=None):
collected_metrics.update(metrics)
return len(metrics)
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command", side_effect=mock_run_command),
patch.object(module, "insert_metrics", side_effect=capture_metrics),
):
exit_code = await module.collect_companion()
assert exit_code == 0
# Telemetry keys should be present
telemetry_keys = [k for k in collected_metrics if k.startswith("telemetry.")]
assert len(telemetry_keys) > 0, f"Expected telemetry keys, got: {collected_metrics.keys()}"
assert "telemetry.temperature.0" in collected_metrics
assert collected_metrics["telemetry.temperature.0"] == 25.5
@pytest.mark.asyncio
async def test_telemetry_extraction_handles_invalid_lpp(
self, configured_env, async_context_manager_factory, monkeypatch
):
"""Telemetry extraction should handle invalid LPP data gracefully."""
monkeypatch.setenv("TELEMETRY_ENABLED", "1")
import meshmon.env
meshmon.env._config = None
module = load_collect_companion()
collected_metrics = {}
async def mock_run_command(mc, coro, name):
if name == "get_self_telemetry":
# Invalid LPP data (too short)
return (True, "TELEMETRY", {"lpp": b"\x00"}, None)
if name == "get_stats_core":
return (True, "STATS_CORE", {"battery_mv": 3850}, None)
return (True, "OK", {}, None)
def capture_metrics(ts, role, metrics, conn=None):
collected_metrics.update(metrics)
return len(metrics)
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command", side_effect=mock_run_command),
patch.object(module, "insert_metrics", side_effect=capture_metrics),
):
exit_code = await module.collect_companion()
# Should still succeed - just no telemetry extracted
assert exit_code == 0
# No telemetry keys because LPP was invalid
telemetry_keys = [k for k in collected_metrics if k.startswith("telemetry.")]
assert len(telemetry_keys) == 0
class TestPartialSuccessScenarios:
"""Test behavior when only some commands succeed."""
@pytest.mark.asyncio
async def test_succeeds_with_only_stats_core(
self, configured_env, async_context_manager_factory
):
"""Should succeed if only stats_core returns metrics."""
module = load_collect_companion()
collected_metrics = {}
async def mock_run_command(mc, coro, name):
if name == "get_stats_core":
return (True, "STATS_CORE", {"battery_mv": 3850, "uptime_secs": 1000}, None)
# All other commands fail
return (False, None, None, "Timeout")
def capture_metrics(ts, role, metrics, conn=None):
collected_metrics.update(metrics)
return len(metrics)
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command", side_effect=mock_run_command),
patch.object(module, "insert_metrics", side_effect=capture_metrics),
):
exit_code = await module.collect_companion()
# Should succeed because stats_core succeeded and had metrics
assert exit_code == 0
assert collected_metrics["battery_mv"] == 3850
@pytest.mark.asyncio
async def test_succeeds_with_only_contacts(
self, configured_env, async_context_manager_factory
):
"""Should succeed if only contacts command returns data."""
module = load_collect_companion()
collected_metrics = {}
async def mock_run_command(mc, coro, name):
if name == "get_contacts":
return (True, "CONTACTS", {"c1": {}, "c2": {}}, None)
# Stats commands succeed but return no numeric data
if name.startswith("get_stats"):
return (True, "OK", {}, None)
# Other commands succeed
return (True, "OK", {}, None)
def capture_metrics(ts, role, metrics, conn=None):
collected_metrics.update(metrics)
return len(metrics)
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command", side_effect=mock_run_command),
patch.object(module, "insert_metrics", side_effect=capture_metrics),
):
exit_code = await module.collect_companion()
assert exit_code == 0
assert collected_metrics["contacts"] == 2
@pytest.mark.asyncio
async def test_fails_when_metrics_empty_despite_success(
self, configured_env, async_context_manager_factory
):
"""Should fail if commands succeed but no metrics collected."""
module = load_collect_companion()
async def mock_run_command(mc, coro, name):
# Commands succeed but return empty/non-dict payloads
if name == "get_stats_core":
return (True, "STATS_CORE", None, None) # No payload
if name == "get_stats_radio":
return (True, "STATS_RADIO", "not a dict", None) # Invalid payload
if name == "get_stats_packets":
return (True, "STATS_PACKETS", {}, None) # Empty payload
if name == "get_contacts":
return (False, None, None, "Failed") # Fails
return (True, "OK", {}, None)
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command", side_effect=mock_run_command),
):
exit_code = await module.collect_companion()
# Should fail because no metrics were collected
assert exit_code == 1
class TestExceptionHandling:
"""Test exception handling in the command loop (lines 165-166)."""
@pytest.mark.asyncio
async def test_handles_exception_in_command_loop(
self, configured_env, async_context_manager_factory
):
"""Should catch and log exceptions during command execution."""
module = load_collect_companion()
call_count = 0
async def mock_run_command_with_exception(mc, coro, name):
nonlocal call_count
call_count += 1
if call_count == 3: # Fail on third command
raise RuntimeError("Unexpected network error")
return (True, "OK", {}, None)
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command", side_effect=mock_run_command_with_exception),
patch.object(module, "log") as mock_log,
):
exit_code = await module.collect_companion()
# Should have logged the error
error_calls = [c for c in mock_log.error.call_args_list if "Error during collection" in str(c)]
assert len(error_calls) > 0
# Should return 1 because exception interrupted collection
assert exit_code == 1
@pytest.mark.asyncio
async def test_exception_closes_connection_properly(
self, configured_env, async_context_manager_factory
):
"""Context manager should still exit properly after exception."""
module = load_collect_companion()
async def mock_run_command_raise(mc, coro, name):
raise RuntimeError("Connection lost")
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command", side_effect=mock_run_command_raise),
):
await module.collect_companion()
# Verify context manager was properly exited
assert ctx_mock.exited is True
class TestMainEntryPoint:
"""Test the main() entry point behavior."""
def test_main_calls_init_db(self, configured_env):
"""main() should initialize database before collection."""
module = load_collect_companion()
with (
patch.object(module, "init_db") as mock_init,
patch.object(module, "collect_companion", new=MagicMock(return_value=0)),
patch.object(module, "asyncio") as mock_asyncio,
patch.object(module, "sys"),
):
# Patch collect_companion to return a non-coroutine to avoid unawaited coroutine warning
mock_asyncio.run.return_value = 0
module.main()
mock_init.assert_called_once()
def test_main_exits_with_collection_result(self, configured_env):
"""main() should exit with the collection exit code."""
module = load_collect_companion()
with (
patch.object(module, "init_db"),
patch.object(module, "collect_companion", new=MagicMock(return_value=1)),
patch.object(module, "asyncio") as mock_asyncio,
patch.object(module, "sys") as mock_sys,
):
# Patch collect_companion to return a non-coroutine to avoid unawaited coroutine warning
mock_asyncio.run.return_value = 1 # Collection failed
module.main()
mock_sys.exit.assert_called_once_with(1)
def test_main_runs_collect_companion_async(self, configured_env):
"""main() should run collect_companion() with asyncio.run()."""
module = load_collect_companion()
with (
patch.object(module, "init_db"),
patch.object(module, "collect_companion", new=MagicMock(return_value=0)),
patch.object(module, "asyncio") as mock_asyncio,
patch.object(module, "sys"),
):
# Patch collect_companion to return a non-coroutine to avoid unawaited coroutine warning
mock_asyncio.run.return_value = 0
module.main()
# asyncio.run should be called with the return value
mock_asyncio.run.assert_called_once()
class TestDatabaseIntegration:
"""Test that collection actually writes to database."""
@pytest.mark.asyncio
async def test_writes_metrics_to_database(
self, configured_env, initialized_db, async_context_manager_factory, mock_run_command_factory
):
"""Collection should write metrics to database."""
from meshmon.db import get_latest_metrics
module = load_collect_companion()
responses = {
"send_appstart": (True, "SELF_INFO", {}, None),
"send_device_query": (True, "DEVICE_INFO", {}, None),
"get_time": (True, "TIME", {}, None),
"get_self_telemetry": (True, "TELEMETRY", {}, None),
"get_custom_vars": (True, "CUSTOM_VARS", {}, None),
"get_contacts": (True, "CONTACTS", {"c1": {}}, None),
"get_stats_core": (
True,
"STATS_CORE",
{"battery_mv": 3777, "uptime_secs": 12345},
None,
),
"get_stats_radio": (True, "STATS_RADIO", {}, None),
"get_stats_packets": (True, "STATS_PACKETS", {"recv": 999, "sent": 888}, None),
}
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(
module, "run_command", side_effect=mock_run_command_factory(responses)
),
):
exit_code = await module.collect_companion()
assert exit_code == 0
# Verify data was written to database
latest = get_latest_metrics("companion")
assert latest is not None
assert latest["battery_mv"] == 3777
assert latest["recv"] == 999
assert latest["sent"] == 888
@pytest.mark.asyncio
async def test_writes_telemetry_to_database_when_enabled(
self, configured_env, initialized_db, async_context_manager_factory, monkeypatch
):
"""Telemetry should be written to database when enabled."""
monkeypatch.setenv("TELEMETRY_ENABLED", "1")
import meshmon.env
meshmon.env._config = None
from meshmon.db import get_latest_metrics
module = load_collect_companion()
# LPP data format: list of dictionaries with type, channel, value
lpp_data = [
{"type": "temperature", "channel": 0, "value": 25.5},
]
async def mock_run_command(mc, coro, name):
if name == "get_self_telemetry":
return (True, "TELEMETRY", {"lpp": lpp_data}, None)
if name == "get_stats_core":
return (True, "STATS_CORE", {"battery_mv": 3850}, None)
return (True, "OK", {}, None)
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command", side_effect=mock_run_command),
):
exit_code = await module.collect_companion()
assert exit_code == 0
# Verify telemetry was written to database
latest = get_latest_metrics("companion")
assert latest is not None
assert "telemetry.temperature.0" in latest
assert latest["telemetry.temperature.0"] == 25.5

View File

@@ -0,0 +1,867 @@
"""Tests for collect_repeater.py script entry point.
These tests verify the actual script behavior, including:
- Finding repeater contact by name or key prefix
- Circuit breaker integration
- Exit codes for monitoring
- Database writes
"""
import inspect
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from tests.scripts.conftest import load_script_module
def load_collect_repeater():
"""Load collect_repeater.py as a module."""
return load_script_module("collect_repeater.py")
class TestCollectRepeaterImport:
"""Verify script can be imported without errors."""
def test_imports_successfully(self, configured_env):
"""Script should import without errors."""
module = load_collect_repeater()
assert hasattr(module, "main")
assert hasattr(module, "collect_repeater")
assert hasattr(module, "find_repeater_contact")
assert hasattr(module, "query_repeater_with_retry")
assert callable(module.main)
def test_collect_repeater_is_async(self, configured_env):
"""collect_repeater() should be an async function."""
module = load_collect_repeater()
assert inspect.iscoroutinefunction(module.collect_repeater)
def test_find_repeater_contact_is_async(self, configured_env):
"""find_repeater_contact() should be an async function."""
module = load_collect_repeater()
assert inspect.iscoroutinefunction(module.find_repeater_contact)
class TestFindRepeaterContact:
"""Test the find_repeater_contact function."""
@pytest.mark.asyncio
async def test_finds_contact_by_name(self, configured_env, monkeypatch):
"""Should find repeater by advertised name."""
monkeypatch.setenv("REPEATER_NAME", "MyRepeater")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mc = MagicMock()
mc.commands = MagicMock()
mc.contacts = {"abc123": {"adv_name": "MyRepeater", "public_key": "abc123"}}
with (
patch.object(module, "run_command") as mock_run,
patch.object(module, "get_contact_by_name") as mock_get,
):
mock_run.return_value = (True, "CONTACTS", mc.contacts, None)
mock_get.return_value = mc.contacts["abc123"]
contact = await module.find_repeater_contact(mc)
assert contact is not None
assert contact["adv_name"] == "MyRepeater"
mock_get.assert_called_once_with(mc, "MyRepeater")
@pytest.mark.asyncio
async def test_finds_contact_by_key_prefix(self, configured_env, monkeypatch):
"""Should find repeater by public key prefix when name not set."""
monkeypatch.setenv("REPEATER_KEY_PREFIX", "abc123")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mc = MagicMock()
mc.commands = MagicMock()
mc.contacts = {"abc123def456": {"adv_name": "SomeNode", "public_key": "abc123def456"}}
with (
patch.object(module, "run_command") as mock_run,
patch.object(module, "get_contact_by_name", return_value=None),
patch.object(module, "get_contact_by_key_prefix") as mock_get,
):
mock_run.return_value = (True, "CONTACTS", mc.contacts, None)
mock_get.return_value = mc.contacts["abc123def456"]
contact = await module.find_repeater_contact(mc)
assert contact is not None
mock_get.assert_called_once()
@pytest.mark.asyncio
async def test_fallback_to_manual_name_search(self, configured_env, monkeypatch):
"""Should fallback to manual name search in payload dict."""
monkeypatch.setenv("REPEATER_NAME", "ManualFind")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mc = MagicMock()
mc.commands = MagicMock()
contacts_dict = {"xyz789": {"adv_name": "ManualFind", "public_key": "xyz789"}}
with (
patch.object(module, "run_command") as mock_run,
patch.object(module, "get_contact_by_name", return_value=None),
):
mock_run.return_value = (True, "CONTACTS", contacts_dict, None)
# get_contact_by_name returns None, forcing manual search
contact = await module.find_repeater_contact(mc)
assert contact is not None
assert contact["adv_name"] == "ManualFind"
@pytest.mark.asyncio
async def test_case_insensitive_name_match(self, configured_env, monkeypatch):
"""Name search should be case-insensitive."""
monkeypatch.setenv("REPEATER_NAME", "myrepeater") # lowercase
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mc = MagicMock()
mc.commands = MagicMock()
contacts_dict = {"key1": {"adv_name": "MyRepeater", "public_key": "key1"}} # Mixed case
with (
patch.object(module, "run_command") as mock_run,
patch.object(module, "get_contact_by_name", return_value=None),
):
mock_run.return_value = (True, "CONTACTS", contacts_dict, None)
contact = await module.find_repeater_contact(mc)
assert contact is not None
@pytest.mark.asyncio
async def test_returns_none_when_not_found(self, configured_env, monkeypatch):
"""Should return None when repeater not in contacts."""
monkeypatch.setenv("REPEATER_NAME", "NonExistent")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mc = MagicMock()
mc.commands = MagicMock()
with (
patch.object(module, "run_command") as mock_run,
patch.object(module, "get_contact_by_name", return_value=None),
):
mock_run.return_value = (True, "CONTACTS", {}, None)
contact = await module.find_repeater_contact(mc)
assert contact is None
@pytest.mark.asyncio
async def test_returns_none_when_get_contacts_fails(self, configured_env, monkeypatch):
"""Should return None when get_contacts command fails."""
monkeypatch.setenv("REPEATER_NAME", "AnyName")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mc = MagicMock()
mc.commands = MagicMock()
with patch.object(module, "run_command") as mock_run:
mock_run.return_value = (False, None, None, "Connection failed")
contact = await module.find_repeater_contact(mc)
assert contact is None
class TestCircuitBreakerIntegration:
"""Test circuit breaker integration in collect_repeater."""
@pytest.mark.asyncio
async def test_skips_collection_when_circuit_open(self, configured_env):
"""Should return 0 and skip collection when circuit breaker is open."""
module = load_collect_repeater()
# Create mock circuit breaker that is open
mock_cb = MagicMock()
mock_cb.is_open.return_value = True
mock_cb.cooldown_remaining.return_value = 1800
with patch.object(module, "get_repeater_circuit_breaker", return_value=mock_cb):
exit_code = await module.collect_repeater()
# Should return 0 (not an error, just skipped)
assert exit_code == 0
# Should not have tried to connect
mock_cb.is_open.assert_called_once()
@pytest.mark.asyncio
async def test_records_success_on_successful_status(
self, configured_env, monkeypatch, async_context_manager_factory
):
"""Should record success when status query succeeds."""
monkeypatch.setenv("REPEATER_NAME", "TestRepeater")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mock_cb = MagicMock()
mock_cb.is_open.return_value = False
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "get_repeater_circuit_breaker", return_value=mock_cb),
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command") as mock_run,
patch.object(module, "find_repeater_contact") as mock_find,
patch.object(module, "query_repeater_with_retry") as mock_query,
patch.object(module, "insert_metrics", return_value=2),
):
mock_run.return_value = (True, "OK", {}, None)
mock_find.return_value = {"adv_name": "TestRepeater"}
mock_query.return_value = (True, {"bat": 3850, "uptime": 86400}, None)
await module.collect_repeater()
mock_cb.record_success.assert_called_once()
@pytest.mark.asyncio
async def test_records_failure_on_status_timeout(
self, configured_env, monkeypatch, async_context_manager_factory
):
"""Should record failure when status query times out."""
monkeypatch.setenv("REPEATER_NAME", "TestRepeater")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mock_cb = MagicMock()
mock_cb.is_open.return_value = False
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "get_repeater_circuit_breaker", return_value=mock_cb),
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command") as mock_run,
patch.object(module, "find_repeater_contact") as mock_find,
patch.object(module, "query_repeater_with_retry") as mock_query,
):
mock_run.return_value = (True, "OK", {}, None)
mock_find.return_value = {"adv_name": "TestRepeater"}
mock_query.return_value = (False, None, "Timeout")
exit_code = await module.collect_repeater()
mock_cb.record_failure.assert_called_once()
assert exit_code == 1
class TestCollectRepeaterExitCodes:
"""Test exit code behavior - critical for monitoring."""
@pytest.mark.asyncio
async def test_returns_zero_on_successful_collection(
self, configured_env, monkeypatch, async_context_manager_factory
):
"""Successful collection should return exit code 0."""
monkeypatch.setenv("REPEATER_NAME", "TestRepeater")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mock_cb = MagicMock()
mock_cb.is_open.return_value = False
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "get_repeater_circuit_breaker", return_value=mock_cb),
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command") as mock_run,
patch.object(module, "find_repeater_contact") as mock_find,
patch.object(module, "query_repeater_with_retry") as mock_query,
patch.object(module, "insert_metrics", return_value=3),
):
mock_run.return_value = (True, "OK", {}, None)
mock_find.return_value = {"adv_name": "TestRepeater"}
mock_query.return_value = (
True,
{"bat": 3850, "uptime": 86400, "nb_recv": 100},
None,
)
exit_code = await module.collect_repeater()
assert exit_code == 0
@pytest.mark.asyncio
async def test_returns_one_on_connection_failure(
self, configured_env, async_context_manager_factory
):
"""Failed connection should return exit code 1."""
module = load_collect_repeater()
mock_cb = MagicMock()
mock_cb.is_open.return_value = False
ctx_mock = async_context_manager_factory(None) # Connection returns None
with (
patch.object(module, "get_repeater_circuit_breaker", return_value=mock_cb),
patch.object(module, "connect_with_lock", return_value=ctx_mock),
):
exit_code = await module.collect_repeater()
assert exit_code == 1
@pytest.mark.asyncio
async def test_returns_one_when_repeater_not_found(
self, configured_env, monkeypatch, async_context_manager_factory
):
"""Should return 1 when repeater contact not found."""
monkeypatch.setenv("REPEATER_NAME", "NonExistent")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mock_cb = MagicMock()
mock_cb.is_open.return_value = False
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "get_repeater_circuit_breaker", return_value=mock_cb),
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command") as mock_run,
patch.object(module, "find_repeater_contact") as mock_find,
):
mock_run.return_value = (True, "OK", {}, None)
mock_find.return_value = None # Not found
exit_code = await module.collect_repeater()
assert exit_code == 1
class TestQueryRepeaterWithRetry:
"""Test the retry wrapper for repeater queries."""
@pytest.mark.asyncio
async def test_returns_success_on_first_try(self, configured_env):
"""Should return success when command succeeds immediately."""
module = load_collect_repeater()
mc = MagicMock()
contact = {"adv_name": "Test"}
async def successful_command():
return {"bat": 3850}
with patch.object(module, "with_retries") as mock_retries:
mock_retries.return_value = (True, {"bat": 3850}, None)
success, payload, err = await module.query_repeater_with_retry(
mc, contact, "test_cmd", successful_command
)
assert success is True
assert payload == {"bat": 3850}
assert err is None
@pytest.mark.asyncio
async def test_returns_failure_after_retries_exhausted(self, configured_env):
"""Should return failure when all retries fail."""
module = load_collect_repeater()
mc = MagicMock()
contact = {"adv_name": "Test"}
async def failing_command():
raise Exception("Timeout")
with patch.object(module, "with_retries") as mock_retries:
mock_retries.return_value = (False, None, Exception("Timeout"))
success, payload, err = await module.query_repeater_with_retry(
mc, contact, "test_cmd", failing_command
)
assert success is False
assert payload is None
assert "Timeout" in err
class TestMainEntryPoint:
"""Test the main() entry point behavior."""
def test_main_calls_init_db(self, configured_env):
"""main() should initialize database before collection."""
module = load_collect_repeater()
with (
patch.object(module, "init_db") as mock_init,
patch.object(module, "collect_repeater", new=MagicMock(return_value=0)),
patch.object(module, "asyncio") as mock_asyncio,
patch.object(module, "sys"),
):
# Patch collect_repeater to return a non-coroutine to avoid unawaited coroutine warning
mock_asyncio.run.return_value = 0
module.main()
mock_init.assert_called_once()
def test_main_exits_with_collection_result(self, configured_env):
"""main() should exit with the collection exit code."""
module = load_collect_repeater()
with (
patch.object(module, "init_db"),
patch.object(module, "collect_repeater", new=MagicMock(return_value=1)),
patch.object(module, "asyncio") as mock_asyncio,
patch.object(module, "sys") as mock_sys,
):
# Patch collect_repeater to return a non-coroutine to avoid unawaited coroutine warning
mock_asyncio.run.return_value = 1 # Collection failed
module.main()
mock_sys.exit.assert_called_once_with(1)
class TestDatabaseIntegration:
"""Test that collection actually writes to database."""
@pytest.mark.asyncio
async def test_writes_metrics_to_database(
self, configured_env, initialized_db, monkeypatch, async_context_manager_factory
):
"""Collection should write metrics to database."""
from meshmon.db import get_latest_metrics
monkeypatch.setenv("REPEATER_NAME", "TestRepeater")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mock_cb = MagicMock()
mock_cb.is_open.return_value = False
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "get_repeater_circuit_breaker", return_value=mock_cb),
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command") as mock_run,
patch.object(module, "find_repeater_contact") as mock_find,
patch.object(module, "query_repeater_with_retry") as mock_query,
):
mock_run.return_value = (True, "OK", {}, None)
mock_find.return_value = {"adv_name": "TestRepeater"}
mock_query.return_value = (
True,
{"bat": 3777, "uptime": 99999, "nb_recv": 1234, "nb_sent": 567},
None,
)
exit_code = await module.collect_repeater()
assert exit_code == 0
# Verify data was written to database
latest = get_latest_metrics("repeater")
assert latest is not None
assert latest["bat"] == 3777
assert latest["nb_recv"] == 1234
class TestFindRepeaterContactEdgeCases:
"""Test edge cases in find_repeater_contact."""
@pytest.mark.asyncio
async def test_finds_contact_in_payload_dict(self, configured_env, monkeypatch):
"""Should find contact in payload dict when mc.contacts is empty."""
monkeypatch.setenv("REPEATER_NAME", "PayloadRepeater")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mc = MagicMock()
mc.commands = MagicMock()
mc.contacts = {} # Empty contacts attribute
payload_dict = {"pk123": {"adv_name": "PayloadRepeater", "public_key": "pk123"}}
with (
patch.object(module, "run_command") as mock_run,
patch.object(module, "get_contact_by_name", return_value=None),
):
# Return contacts in payload
mock_run.return_value = (True, "CONTACTS", payload_dict, None)
# get_contact_by_name returns None, forcing manual search in payload
contact = await module.find_repeater_contact(mc)
assert contact is not None
assert contact["adv_name"] == "PayloadRepeater"
@pytest.mark.asyncio
async def test_finds_contact_by_key_prefix_manual_search(self, configured_env, monkeypatch):
"""Should find contact by key prefix via manual search in payload."""
monkeypatch.setenv("REPEATER_KEY_PREFIX", "abc")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mc = MagicMock()
mc.commands = MagicMock()
contacts_dict = {"abc123xyz": {"adv_name": "KeyPrefixNode"}}
with (
patch.object(module, "run_command") as mock_run,
patch.object(module, "get_contact_by_name", return_value=None),
patch.object(module, "get_contact_by_key_prefix", return_value=None),
):
mock_run.return_value = (True, "CONTACTS", contacts_dict, None)
# Both helper functions return None, forcing manual search
contact = await module.find_repeater_contact(mc)
assert contact is not None
assert contact["adv_name"] == "KeyPrefixNode"
@pytest.mark.asyncio
async def test_prints_available_contacts_when_not_found(self, configured_env, monkeypatch):
"""Should print available contacts when repeater not found."""
monkeypatch.setenv("REPEATER_NAME", "NonExistent")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mc = MagicMock()
mc.commands = MagicMock()
contacts_dict = {
"key1": {"adv_name": "Node1", "name": "alt1"},
"key2": {"adv_name": "Node2"},
"key3": {}, # No name fields
}
with (
patch.object(module, "run_command") as mock_run,
patch.object(module, "get_contact_by_name", return_value=None),
patch.object(module, "log") as mock_log,
):
mock_run.return_value = (True, "CONTACTS", contacts_dict, None)
contact = await module.find_repeater_contact(mc)
assert contact is None
# Should have logged available contacts
mock_log.info.assert_called()
class TestLoginFunctionality:
"""Test optional login functionality."""
@pytest.mark.asyncio
async def test_attempts_login_when_password_set(
self, configured_env, monkeypatch, async_context_manager_factory
):
"""Should attempt login when REPEATER_PASSWORD is set."""
monkeypatch.setenv("REPEATER_NAME", "TestRepeater")
monkeypatch.setenv("REPEATER_PASSWORD", "secret123")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mock_cb = MagicMock()
mock_cb.is_open.return_value = False
mc = MagicMock()
mc.commands = MagicMock()
mc.commands.send_login = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "get_repeater_circuit_breaker", return_value=mock_cb),
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command") as mock_run,
patch.object(module, "find_repeater_contact") as mock_find,
patch.object(module, "extract_contact_info") as mock_extract,
patch.object(module, "query_repeater_with_retry") as mock_query,
patch.object(module, "insert_metrics", return_value=1),
):
# Return success for all commands
mock_run.return_value = (True, "OK", {}, None)
mock_find.return_value = {"adv_name": "TestRepeater"}
mock_extract.return_value = {"adv_name": "TestRepeater"}
mock_query.return_value = (True, {"bat": 3850}, None)
await module.collect_repeater()
# Verify login was attempted (run_command called with send_login)
login_calls = [c for c in mock_run.call_args_list if c[0][2] == "send_login"]
assert len(login_calls) == 1
@pytest.mark.asyncio
async def test_handles_login_exception(
self, configured_env, monkeypatch, async_context_manager_factory
):
"""Should handle exception during login gracefully."""
monkeypatch.setenv("REPEATER_NAME", "TestRepeater")
monkeypatch.setenv("REPEATER_PASSWORD", "secret123")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mock_cb = MagicMock()
mock_cb.is_open.return_value = False
mc = MagicMock()
mc.commands = MagicMock()
mc.commands.send_login = MagicMock(side_effect=Exception("Login not supported"))
ctx_mock = async_context_manager_factory(mc)
call_count = 0
async def mock_run_command(mc, coro, name):
nonlocal call_count
call_count += 1
if name == "send_login":
raise Exception("Login not supported")
return (True, "OK", {}, None)
with (
patch.object(module, "get_repeater_circuit_breaker", return_value=mock_cb),
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command", side_effect=mock_run_command),
patch.object(module, "find_repeater_contact") as mock_find,
patch.object(module, "extract_contact_info") as mock_extract,
patch.object(module, "query_repeater_with_retry") as mock_query,
patch.object(module, "insert_metrics", return_value=1),
):
mock_find.return_value = {"adv_name": "TestRepeater"}
mock_extract.return_value = {"adv_name": "TestRepeater"}
mock_query.return_value = (True, {"bat": 3850}, None)
# Should not raise - login failure should be handled
exit_code = await module.collect_repeater()
assert exit_code == 0
class TestTelemetryCollection:
"""Test telemetry collection when enabled."""
@pytest.mark.asyncio
async def test_collects_telemetry_when_enabled(
self, configured_env, monkeypatch, initialized_db, async_context_manager_factory
):
"""Should collect telemetry when TELEMETRY_ENABLED=1."""
monkeypatch.setenv("REPEATER_NAME", "TestRepeater")
monkeypatch.setenv("TELEMETRY_ENABLED", "1")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mock_cb = MagicMock()
mock_cb.is_open.return_value = False
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "get_repeater_circuit_breaker", return_value=mock_cb),
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command") as mock_run,
patch.object(module, "find_repeater_contact") as mock_find,
patch.object(module, "extract_contact_info") as mock_extract,
patch.object(module, "query_repeater_with_retry") as mock_query,
patch.object(
module,
"with_retries",
new=AsyncMock(return_value=(True, {"lpp": b"\x00\x67\x01\x00"}, None)),
),
patch.object(module, "extract_lpp_from_payload") as mock_lpp,
patch.object(module, "extract_telemetry_metrics") as mock_telem,
):
mock_run.return_value = (True, "OK", {}, None)
mock_find.return_value = {"adv_name": "TestRepeater"}
mock_extract.return_value = {"adv_name": "TestRepeater"}
mock_query.return_value = (True, {"bat": 3850}, None)
mock_lpp.return_value = {"temperature": [(0, 25.5)]}
mock_telem.return_value = {"telemetry.temperature.0": 25.5}
exit_code = await module.collect_repeater()
assert exit_code == 0
# Verify telemetry was processed
mock_lpp.assert_called_once()
mock_telem.assert_called_once()
@pytest.mark.asyncio
async def test_handles_telemetry_failure_gracefully(
self, configured_env, monkeypatch, async_context_manager_factory
):
"""Should continue when telemetry collection fails."""
monkeypatch.setenv("REPEATER_NAME", "TestRepeater")
monkeypatch.setenv("TELEMETRY_ENABLED", "1")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mock_cb = MagicMock()
mock_cb.is_open.return_value = False
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "get_repeater_circuit_breaker", return_value=mock_cb),
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command") as mock_run,
patch.object(module, "find_repeater_contact") as mock_find,
patch.object(module, "extract_contact_info") as mock_extract,
patch.object(module, "query_repeater_with_retry") as mock_query,
patch.object(module, "insert_metrics", return_value=1),
patch.object(
module,
"with_retries",
new=AsyncMock(return_value=(False, None, Exception("Timeout"))),
),
):
mock_run.return_value = (True, "OK", {}, None)
mock_find.return_value = {"adv_name": "TestRepeater"}
mock_extract.return_value = {"adv_name": "TestRepeater"}
mock_query.return_value = (True, {"bat": 3850}, None)
# Should still succeed (status metrics were saved)
exit_code = await module.collect_repeater()
assert exit_code == 0
class TestDatabaseErrorHandling:
"""Test database error handling."""
@pytest.mark.asyncio
async def test_returns_one_on_status_db_error(
self, configured_env, monkeypatch, async_context_manager_factory
):
"""Should return 1 when status metrics DB write fails."""
monkeypatch.setenv("REPEATER_NAME", "TestRepeater")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mock_cb = MagicMock()
mock_cb.is_open.return_value = False
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "get_repeater_circuit_breaker", return_value=mock_cb),
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command") as mock_run,
patch.object(module, "find_repeater_contact") as mock_find,
patch.object(module, "extract_contact_info") as mock_extract,
patch.object(module, "query_repeater_with_retry") as mock_query,
patch.object(module, "insert_metrics", side_effect=Exception("DB error")),
):
mock_run.return_value = (True, "OK", {}, None)
mock_find.return_value = {"adv_name": "TestRepeater"}
mock_extract.return_value = {"adv_name": "TestRepeater"}
mock_query.return_value = (True, {"bat": 3850}, None)
exit_code = await module.collect_repeater()
assert exit_code == 1
class TestExceptionHandling:
"""Test general exception handling."""
@pytest.mark.asyncio
async def test_records_failure_on_exception(
self, configured_env, monkeypatch, async_context_manager_factory
):
"""Should record circuit breaker failure on unexpected exception."""
monkeypatch.setenv("REPEATER_NAME", "TestRepeater")
import meshmon.env
meshmon.env._config = None
module = load_collect_repeater()
mock_cb = MagicMock()
mock_cb.is_open.return_value = False
mc = MagicMock()
mc.commands = MagicMock()
ctx_mock = async_context_manager_factory(mc)
with (
patch.object(module, "get_repeater_circuit_breaker", return_value=mock_cb),
patch.object(module, "connect_with_lock", return_value=ctx_mock),
patch.object(module, "run_command") as mock_run,
patch.object(module, "find_repeater_contact") as mock_find,
patch.object(module, "extract_contact_info") as mock_extract,
):
mock_run.return_value = (True, "OK", {}, None)
mock_find.return_value = {"adv_name": "TestRepeater"}
mock_extract.side_effect = Exception("Unexpected error")
await module.collect_repeater()
# Circuit breaker should record failure
mock_cb.record_failure.assert_called_once()

View File

@@ -0,0 +1,632 @@
"""Tests for render script entry points.
These tests verify the render_charts.py, render_site.py, and render_reports.py
scripts can be imported and their main() functions work correctly.
"""
import json
from pathlib import Path
from unittest.mock import MagicMock, patch
from tests.scripts.conftest import load_script_module
def load_script(script_name: str):
"""Load a script as a module."""
return load_script_module(script_name)
class TestRenderChartsImport:
"""Verify render_charts.py imports correctly."""
def test_imports_successfully(self, configured_env):
"""Script should import without errors."""
module = load_script("render_charts.py")
assert hasattr(module, "main")
assert callable(module.main)
def test_main_calls_init_db(self, configured_env):
"""main() should initialize database."""
module = load_script("render_charts.py")
with (
patch.object(module, "init_db") as mock_init,
patch.object(module, "get_metric_count", return_value=0),
):
module.main()
mock_init.assert_called_once()
def test_main_checks_metric_counts(self, configured_env):
"""main() should check for data before rendering."""
module = load_script("render_charts.py")
with (
patch.object(module, "init_db"),
patch.object(module, "get_metric_count") as mock_count,
):
mock_count.return_value = 0
module.main()
# Should check both companion and repeater
assert mock_count.call_count == 2
def test_main_renders_when_data_exists(self, configured_env):
"""main() should render charts when data exists."""
module = load_script("render_charts.py")
with (
patch.object(module, "init_db"),
patch.object(module, "get_metric_count", return_value=100),
patch.object(module, "render_all_charts") as mock_render,
patch.object(module, "save_chart_stats"),
):
mock_render.return_value = (["chart1.svg"], {"bat": {}})
module.main()
# Should render for both roles
assert mock_render.call_count == 2
class TestRenderSiteImport:
"""Verify render_site.py imports correctly."""
def test_imports_successfully(self, configured_env):
"""Script should import without errors."""
module = load_script("render_site.py")
assert hasattr(module, "main")
assert callable(module.main)
def test_main_calls_init_db(self, configured_env):
"""main() should initialize database."""
module = load_script("render_site.py")
with (
patch.object(module, "init_db") as mock_init,
patch.object(module, "get_latest_metrics", return_value=None),
patch.object(module, "write_site", return_value=[]),
):
module.main()
mock_init.assert_called_once()
def test_main_loads_latest_metrics(self, configured_env):
"""main() should load latest metrics for both roles."""
module = load_script("render_site.py")
with (
patch.object(module, "init_db"),
patch.object(module, "get_latest_metrics") as mock_get,
patch.object(module, "write_site", return_value=[]),
):
mock_get.return_value = {"battery_mv": 3850}
module.main()
# Should get metrics for both companion and repeater
assert mock_get.call_count == 2
def test_main_calls_write_site(self, configured_env):
"""main() should call write_site with metrics."""
module = load_script("render_site.py")
companion_metrics = {"battery_mv": 3850, "ts": 12345}
repeater_metrics = {"bat": 3900, "ts": 12346}
with (
patch.object(module, "init_db"),
patch.object(module, "get_latest_metrics") as mock_get,
patch.object(module, "write_site") as mock_write,
):
mock_get.side_effect = [companion_metrics, repeater_metrics]
mock_write.return_value = ["day.html", "week.html"]
module.main()
mock_write.assert_called_once_with(companion_metrics, repeater_metrics)
def test_creates_html_files_for_all_periods(self, configured_env, initialized_db, tmp_path):
"""Should create HTML files for day/week/month/year periods."""
module = load_script("render_site.py")
out_dir = configured_env["out_dir"]
# Use real write_site but mock the templates to avoid complex setup
with (
patch.object(module, "init_db"),
patch.object(module, "get_latest_metrics") as mock_get,
):
mock_get.return_value = {"battery_mv": 3850, "ts": 12345}
# Let write_site run - it will create the files
module.main()
# Verify HTML files exist and have content
for period in ["day", "week", "month", "year"]:
html_file = out_dir / f"{period}.html"
assert html_file.exists(), f"{period}.html should exist"
content = html_file.read_text()
assert len(content) > 0, f"{period}.html should have content"
assert "<!DOCTYPE html>" in content or "<html" in content, f"{period}.html should be valid HTML"
class TestRenderReportsImport:
"""Verify render_reports.py imports correctly."""
def test_imports_successfully(self, configured_env):
"""Script should import without errors."""
module = load_script("render_reports.py")
assert hasattr(module, "main")
assert hasattr(module, "safe_write")
assert hasattr(module, "get_node_name")
assert hasattr(module, "get_location")
assert hasattr(module, "render_monthly_report")
assert hasattr(module, "render_yearly_report")
assert hasattr(module, "build_reports_index_data")
assert callable(module.main)
def test_main_calls_init_db(self, configured_env):
"""main() should initialize database."""
module = load_script("render_reports.py")
with (
patch.object(module, "init_db") as mock_init,
patch.object(module, "get_available_periods", return_value=[]),
patch.object(module, "build_reports_index_data", return_value=[]),
patch.object(module, "render_reports_index", return_value="<html>"),
patch.object(module, "safe_write", return_value=True),
):
module.main()
mock_init.assert_called_once()
def test_main_processes_both_roles(self, configured_env):
"""main() should process both repeater and companion."""
module = load_script("render_reports.py")
with (
patch.object(module, "init_db"),
patch.object(module, "get_available_periods") as mock_periods,
patch.object(module, "build_reports_index_data", return_value=[]),
patch.object(module, "render_reports_index", return_value="<html>"),
patch.object(module, "safe_write", return_value=True),
):
mock_periods.return_value = []
module.main()
# Should check periods for both roles
assert mock_periods.call_count == 2
class TestRenderReportsHelpers:
"""Test helper functions in render_reports.py."""
def test_safe_write_success(self, configured_env, tmp_path):
"""safe_write should return True on success."""
module = load_script("render_reports.py")
test_file = tmp_path / "test.txt"
result = module.safe_write(test_file, "test content")
assert result is True
assert test_file.read_text() == "test content"
def test_safe_write_fails_for_missing_parent_directories(self, configured_env, tmp_path):
"""safe_write should fail when parent directories don't exist (it doesn't create them)."""
module = load_script("render_reports.py")
# Parent directory doesn't exist
test_file = tmp_path / "nested" / "dir" / "test.txt"
result = module.safe_write(test_file, "nested content")
# safe_write doesn't create parent dirs - it fails
assert result is False
assert not test_file.exists()
def test_safe_write_works_with_existing_parent_directories(self, configured_env, tmp_path):
"""safe_write should work when parent directories exist."""
module = load_script("render_reports.py")
# Create parent directory first
nested_dir = tmp_path / "existing" / "dir"
nested_dir.mkdir(parents=True)
test_file = nested_dir / "test.txt"
result = module.safe_write(test_file, "nested content")
assert result is True
assert test_file.exists()
assert test_file.read_text() == "nested content"
def test_safe_write_failure(self, configured_env):
"""safe_write should return False on failure."""
module = load_script("render_reports.py")
# Try to write to non-existent directory that can't be created
bad_path = Path("/nonexistent/dir/file.txt")
result = module.safe_write(bad_path, "test content")
assert result is False
def test_get_node_name_repeater(self, configured_env, monkeypatch):
"""get_node_name should return display name for repeater."""
monkeypatch.setenv("REPEATER_DISPLAY_NAME", "My Repeater")
import meshmon.env
meshmon.env._config = None
module = load_script("render_reports.py")
name = module.get_node_name("repeater")
assert name == "My Repeater"
def test_get_node_name_companion(self, configured_env, monkeypatch):
"""get_node_name should return display name for companion."""
monkeypatch.setenv("COMPANION_DISPLAY_NAME", "My Companion")
import meshmon.env
meshmon.env._config = None
module = load_script("render_reports.py")
name = module.get_node_name("companion")
assert name == "My Companion"
def test_get_node_name_unknown(self, configured_env):
"""get_node_name should capitalize unknown roles."""
module = load_script("render_reports.py")
name = module.get_node_name("unknown")
assert name == "Unknown"
def test_get_location(self, configured_env, monkeypatch):
"""get_location should return LocationInfo from config."""
monkeypatch.setenv("REPORT_LOCATION_NAME", "Test Location")
monkeypatch.setenv("REPORT_LAT", "52.37")
monkeypatch.setenv("REPORT_LON", "4.89")
monkeypatch.setenv("REPORT_ELEV", "10")
import meshmon.env
meshmon.env._config = None
module = load_script("render_reports.py")
location = module.get_location()
assert location.name == "Test Location"
assert location.lat == 52.37
assert location.lon == 4.89
assert location.elev == 10
def test_build_reports_index_data_empty(self, configured_env):
"""build_reports_index_data should return empty years for no data."""
module = load_script("render_reports.py")
with patch.object(module, "get_available_periods", return_value=[]):
sections = module.build_reports_index_data()
assert len(sections) == 2 # repeater and companion
assert sections[0]["role"] == "repeater"
assert sections[0]["years"] == []
assert sections[1]["role"] == "companion"
assert sections[1]["years"] == []
def test_build_reports_index_data_with_periods(self, configured_env):
"""build_reports_index_data should organize periods by year."""
module = load_script("render_reports.py")
def mock_periods(role):
if role == "repeater":
return [(2024, 11), (2024, 12), (2025, 1)]
return []
with patch.object(module, "get_available_periods", side_effect=mock_periods):
sections = module.build_reports_index_data()
repeater_section = sections[0]
assert repeater_section["role"] == "repeater"
assert len(repeater_section["years"]) == 2 # 2024 and 2025
# Years should be sorted descending
assert repeater_section["years"][0]["year"] == 2025
assert repeater_section["years"][1]["year"] == 2024
class TestRenderMonthlyReport:
"""Test render_monthly_report function."""
def test_skips_empty_aggregation(self, configured_env):
"""Should skip when no data for the period."""
module = load_script("render_reports.py")
mock_agg = MagicMock()
mock_agg.daily = [] # No data
with (
patch.object(module, "aggregate_monthly", return_value=mock_agg),
patch.object(module, "safe_write") as mock_write,
):
module.render_monthly_report("repeater", 2024, 12)
# Should not write any files
mock_write.assert_not_called()
def test_writes_all_formats(self, configured_env, tmp_path, monkeypatch):
"""Should write HTML, TXT, and JSON formats."""
monkeypatch.setenv("OUT_DIR", str(tmp_path))
import meshmon.env
meshmon.env._config = None
module = load_script("render_reports.py")
mock_agg = MagicMock()
mock_agg.daily = [{"day": 1}] # Has data
with (
patch.object(module, "aggregate_monthly", return_value=mock_agg),
patch.object(module, "render_report_page", return_value="<html>"),
patch.object(module, "format_monthly_txt", return_value="TXT"),
patch.object(module, "monthly_to_json", return_value={}),
):
module.render_monthly_report("repeater", 2024, 12)
# Check files were created
report_dir = tmp_path / "reports" / "repeater" / "2024" / "12"
assert (report_dir / "index.html").exists()
assert (report_dir / "report.txt").exists()
assert (report_dir / "report.json").exists()
def test_writes_valid_json(self, configured_env, tmp_path, monkeypatch):
"""JSON output should be valid JSON."""
monkeypatch.setenv("OUT_DIR", str(tmp_path))
import meshmon.env
meshmon.env._config = None
module = load_script("render_reports.py")
mock_agg = MagicMock()
mock_agg.daily = [{"day": 1}]
json_data = {"period": "2024-12", "metrics": {"bat": {"avg": 3850}}}
with (
patch.object(module, "aggregate_monthly", return_value=mock_agg),
patch.object(module, "render_report_page", return_value="<html>"),
patch.object(module, "format_monthly_txt", return_value="TXT"),
patch.object(module, "monthly_to_json", return_value=json_data),
):
module.render_monthly_report("repeater", 2024, 12)
json_file = tmp_path / "reports" / "repeater" / "2024" / "12" / "report.json"
content = json_file.read_text()
parsed = json.loads(content) # Should not raise
assert parsed["period"] == "2024-12"
class TestRenderYearlyReport:
"""Test render_yearly_report function."""
def test_skips_empty_aggregation(self, configured_env):
"""Should skip when no data for the year."""
module = load_script("render_reports.py")
mock_agg = MagicMock()
mock_agg.monthly = [] # No data
with (
patch.object(module, "aggregate_yearly", return_value=mock_agg),
patch.object(module, "safe_write") as mock_write,
):
module.render_yearly_report("repeater", 2024)
# Should not write any files
mock_write.assert_not_called()
def test_writes_all_formats(self, configured_env, tmp_path, monkeypatch):
"""Should write HTML, TXT, and JSON formats."""
monkeypatch.setenv("OUT_DIR", str(tmp_path))
import meshmon.env
meshmon.env._config = None
module = load_script("render_reports.py")
mock_agg = MagicMock()
mock_agg.monthly = [{"month": 1}] # Has data
with (
patch.object(module, "aggregate_yearly", return_value=mock_agg),
patch.object(module, "render_report_page", return_value="<html>"),
patch.object(module, "format_yearly_txt", return_value="TXT"),
patch.object(module, "yearly_to_json", return_value={}),
):
module.render_yearly_report("repeater", 2024)
# Check files were created
report_dir = tmp_path / "reports" / "repeater" / "2024"
assert (report_dir / "index.html").exists()
assert (report_dir / "report.txt").exists()
assert (report_dir / "report.json").exists()
def test_writes_valid_html(self, configured_env, tmp_path, monkeypatch):
"""HTML output should contain valid HTML structure."""
monkeypatch.setenv("OUT_DIR", str(tmp_path))
import meshmon.env
meshmon.env._config = None
module = load_script("render_reports.py")
mock_agg = MagicMock()
mock_agg.monthly = [{"month": 1}]
html_content = "<!DOCTYPE html><html><head><title>Report</title></head><body>Content</body></html>"
with (
patch.object(module, "aggregate_yearly", return_value=mock_agg),
patch.object(module, "render_report_page", return_value=html_content),
patch.object(module, "format_yearly_txt", return_value="TXT"),
patch.object(module, "yearly_to_json", return_value={}),
):
module.render_yearly_report("repeater", 2024)
html_file = tmp_path / "reports" / "repeater" / "2024" / "index.html"
content = html_file.read_text()
assert "<!DOCTYPE html>" in content
assert "<html>" in content
assert "</html>" in content
class TestReportNavigation:
"""Test prev/next navigation in reports."""
def test_monthly_report_with_prev_next(self, configured_env, tmp_path, monkeypatch):
"""Monthly report should build prev/next navigation links."""
monkeypatch.setenv("OUT_DIR", str(tmp_path))
import meshmon.env
meshmon.env._config = None
module = load_script("render_reports.py")
mock_agg = MagicMock()
mock_agg.daily = [{"day": 1}]
prev_report_data = None
next_report_data = None
def capture_render(agg, node_name, report_type, prev_report, next_report):
nonlocal prev_report_data, next_report_data
prev_report_data = prev_report
next_report_data = next_report
return "<html>"
with (
patch.object(module, "aggregate_monthly", return_value=mock_agg),
patch.object(module, "render_report_page", side_effect=capture_render),
patch.object(module, "format_monthly_txt", return_value="TXT"),
patch.object(module, "monthly_to_json", return_value={}),
):
# Call with prev and next periods
module.render_monthly_report(
"repeater", 2024, 6, prev_period=(2024, 5), next_period=(2024, 7)
)
assert prev_report_data is not None
assert prev_report_data["url"] == "/reports/repeater/2024/05/"
assert prev_report_data["label"] == "May 2024"
assert next_report_data is not None
assert next_report_data["url"] == "/reports/repeater/2024/07/"
assert next_report_data["label"] == "Jul 2024"
def test_yearly_report_with_prev_next(self, configured_env, tmp_path, monkeypatch):
"""Yearly report should build prev/next navigation links."""
monkeypatch.setenv("OUT_DIR", str(tmp_path))
import meshmon.env
meshmon.env._config = None
module = load_script("render_reports.py")
mock_agg = MagicMock()
mock_agg.monthly = [{"month": 1}]
prev_report_data = None
next_report_data = None
def capture_render(agg, node_name, report_type, prev_report, next_report):
nonlocal prev_report_data, next_report_data
prev_report_data = prev_report
next_report_data = next_report
return "<html>"
with (
patch.object(module, "aggregate_yearly", return_value=mock_agg),
patch.object(module, "render_report_page", side_effect=capture_render),
patch.object(module, "format_yearly_txt", return_value="TXT"),
patch.object(module, "yearly_to_json", return_value={}),
):
# Call with prev and next years
module.render_yearly_report("repeater", 2024, prev_year=2023, next_year=2025)
assert prev_report_data is not None
assert prev_report_data["url"] == "/reports/repeater/2023/"
assert prev_report_data["label"] == "2023"
assert next_report_data is not None
assert next_report_data["url"] == "/reports/repeater/2025/"
assert next_report_data["label"] == "2025"
class TestMainWithData:
"""Test main() function with actual data periods."""
def test_main_renders_reports_when_data_exists(self, configured_env, tmp_path, monkeypatch):
"""main() should render monthly and yearly reports when data exists."""
monkeypatch.setenv("OUT_DIR", str(tmp_path))
import meshmon.env
meshmon.env._config = None
module = load_script("render_reports.py")
def mock_periods(role):
if role == "repeater":
return [(2024, 11), (2024, 12)]
return []
mock_monthly_agg = MagicMock()
mock_monthly_agg.daily = [{"day": 1}]
mock_yearly_agg = MagicMock()
mock_yearly_agg.monthly = [{"month": 11}]
with (
patch.object(module, "init_db"),
patch.object(module, "get_available_periods", side_effect=mock_periods),
patch.object(module, "aggregate_monthly", return_value=mock_monthly_agg),
patch.object(module, "aggregate_yearly", return_value=mock_yearly_agg),
patch.object(module, "render_report_page", return_value="<html>"),
patch.object(module, "format_monthly_txt", return_value="TXT"),
patch.object(module, "format_yearly_txt", return_value="TXT"),
patch.object(module, "monthly_to_json", return_value={}),
patch.object(module, "yearly_to_json", return_value={}),
patch.object(module, "render_reports_index", return_value="<html>"),
):
module.main()
# Verify reports were created
repeater_dir = tmp_path / "reports" / "repeater"
assert (repeater_dir / "2024" / "11" / "index.html").exists()
assert (repeater_dir / "2024" / "12" / "index.html").exists()
assert (repeater_dir / "2024" / "index.html").exists()
def test_main_creates_index_with_content(self, configured_env, tmp_path, monkeypatch):
"""main() should create reports index with valid content."""
monkeypatch.setenv("OUT_DIR", str(tmp_path))
import meshmon.env
meshmon.env._config = None
module = load_script("render_reports.py")
index_html = """<!DOCTYPE html>
<html>
<head><title>Reports Index</title></head>
<body><h1>Reports</h1></body>
</html>"""
with (
patch.object(module, "init_db"),
patch.object(module, "get_available_periods", return_value=[]),
patch.object(module, "build_reports_index_data", return_value=[]),
patch.object(module, "render_reports_index", return_value=index_html),
):
module.main()
index_file = tmp_path / "reports" / "index.html"
assert index_file.exists()
content = index_file.read_text()
assert "<!DOCTYPE html>" in content
assert "Reports" in content

121
tests/scripts/test_smoke.py Normal file
View File

@@ -0,0 +1,121 @@
"""
Smoke tests for all executable scripts.
These tests verify that scripts can be imported without errors,
ensuring all dependencies and syntax are correct.
"""
from pathlib import Path
import pytest
from tests.scripts.conftest import load_script_module
# Scripts directory
SCRIPTS_DIR = Path(__file__).parent.parent.parent / "scripts"
class TestScriptImports:
"""Smoke tests - verify all scripts can be imported."""
def test_collect_companion_imports(self):
"""collect_companion.py imports without error."""
module = load_script_module("collect_companion.py")
assert hasattr(module, "main")
assert hasattr(module, "collect_companion")
assert callable(module.main)
def test_collect_repeater_imports(self):
"""collect_repeater.py imports without error."""
module = load_script_module("collect_repeater.py")
assert hasattr(module, "main")
assert hasattr(module, "collect_repeater")
assert hasattr(module, "find_repeater_contact")
assert callable(module.main)
def test_render_charts_imports(self):
"""render_charts.py imports without error."""
module = load_script_module("render_charts.py")
assert hasattr(module, "main")
assert callable(module.main)
def test_render_site_imports(self):
"""render_site.py imports without error."""
module = load_script_module("render_site.py")
assert hasattr(module, "main")
assert callable(module.main)
def test_render_reports_imports(self):
"""render_reports.py imports without error."""
module = load_script_module("render_reports.py")
assert hasattr(module, "main")
assert hasattr(module, "safe_write")
assert hasattr(module, "get_node_name")
assert hasattr(module, "get_location")
assert hasattr(module, "render_monthly_report")
assert hasattr(module, "render_yearly_report")
assert hasattr(module, "build_reports_index_data")
assert callable(module.main)
def test_generate_snapshots_imports(self):
"""generate_snapshots.py imports without error."""
module = load_script_module("generate_snapshots.py")
# This utility script uses direct function calls instead of main()
assert hasattr(module, "generate_svg_snapshots")
assert hasattr(module, "generate_txt_snapshots")
assert callable(module.generate_svg_snapshots)
class TestScriptStructure:
"""Verify scripts follow expected patterns."""
@pytest.mark.parametrize(
"script_name",
[
"collect_companion.py",
"collect_repeater.py",
"render_charts.py",
"render_site.py",
"render_reports.py",
],
)
def test_script_has_main_guard(self, script_name: str):
"""Scripts have if __name__ == '__main__' guard."""
script_path = SCRIPTS_DIR / script_name
content = script_path.read_text()
assert 'if __name__ == "__main__":' in content
@pytest.mark.parametrize(
"script_name",
[
"collect_companion.py",
"collect_repeater.py",
"render_charts.py",
"render_site.py",
"render_reports.py",
],
)
def test_script_has_docstring(self, script_name: str):
"""Scripts have module-level docstring."""
script_path = SCRIPTS_DIR / script_name
content = script_path.read_text()
# Should start with shebang, then docstring
lines = content.split("\n")
assert lines[0].startswith("#!/")
assert lines[1] == '"""'
@pytest.mark.parametrize(
"script_name",
[
"collect_companion.py",
"collect_repeater.py",
"render_charts.py",
"render_site.py",
"render_reports.py",
],
)
def test_script_calls_init_db(self, script_name: str):
"""Scripts initialize database before operations."""
script_path = SCRIPTS_DIR / script_name
content = script_path.read_text()
assert "init_db()" in content