Files
meshcore-stats/tests/charts/test_transforms.py
2026-01-13 17:09:01 +01:00

255 lines
8.8 KiB
Python

"""Tests for chart data transformations (counter-to-rate, etc.)."""
from datetime import datetime, timedelta
import pytest
from meshmon.charts import (
PERIOD_CONFIG,
load_timeseries_from_db,
)
from meshmon.db import insert_metrics
BASE_TIME = datetime(2024, 1, 1, 0, 0, 0)
class TestCounterToRateConversion:
"""Tests for counter metric rate conversion."""
def test_calculates_rate_from_deltas(self, initialized_db, configured_env):
"""Counter values are converted to rate of change."""
base_ts = 1704067200 # 2024-01-01 00:00:00 UTC
# Insert increasing counter values (15 min apart)
for i in range(5):
ts = base_ts + (i * 900) # 15 minutes
insert_metrics(ts, "repeater", {"nb_recv": float(i * 100)}, initialized_db)
ts = load_timeseries_from_db(
role="repeater",
metric="nb_recv",
end_time=datetime.fromtimestamp(base_ts + 4 * 900),
lookback=timedelta(hours=2),
period="day",
)
# Counter produces N-1 rate points from N values
assert len(ts.points) == 4
# All rates should be positive (counter increasing)
expected_rate = (100.0 / 900.0) * 60.0
for p in ts.points:
assert p.value == pytest.approx(expected_rate)
def test_handles_counter_reset(self, initialized_db, configured_env):
"""Counter resets (negative delta) are skipped."""
base_ts = 1704067200
# Insert values with a reset
insert_metrics(base_ts, "repeater", {"nb_recv": 100.0}, initialized_db)
insert_metrics(base_ts + 900, "repeater", {"nb_recv": 200.0}, initialized_db)
insert_metrics(base_ts + 1800, "repeater", {"nb_recv": 50.0}, initialized_db) # Reset!
insert_metrics(base_ts + 2700, "repeater", {"nb_recv": 150.0}, initialized_db)
ts = load_timeseries_from_db(
role="repeater",
metric="nb_recv",
end_time=datetime.fromtimestamp(base_ts + 2700),
lookback=timedelta(hours=1),
period="day",
)
# Reset point should be skipped, so fewer points
assert len(ts.points) == 2 # Only valid deltas
expected_rate = (100.0 / 900.0) * 60.0
assert ts.points[0].timestamp == datetime.fromtimestamp(base_ts + 900)
assert ts.points[1].timestamp == datetime.fromtimestamp(base_ts + 2700)
assert ts.points[0].value == pytest.approx(expected_rate)
assert ts.points[1].value == pytest.approx(expected_rate)
def test_counter_rate_short_interval_under_step_is_skipped(
self,
initialized_db,
configured_env,
monkeypatch,
):
"""Short sampling intervals are skipped to avoid rate spikes."""
base_ts = 1704067200
monkeypatch.setenv("REPEATER_STEP", "900")
import meshmon.env
meshmon.env._config = None
insert_metrics(base_ts, "repeater", {"nb_recv": 0.0}, initialized_db)
insert_metrics(base_ts + 900, "repeater", {"nb_recv": 100.0}, initialized_db)
insert_metrics(base_ts + 904, "repeater", {"nb_recv": 110.0}, initialized_db)
insert_metrics(base_ts + 1800, "repeater", {"nb_recv": 200.0}, initialized_db)
ts = load_timeseries_from_db(
role="repeater",
metric="nb_recv",
end_time=datetime.fromtimestamp(base_ts + 1800),
lookback=timedelta(hours=2),
period="day",
)
expected_rate = (100.0 / 900.0) * 60.0
assert len(ts.points) == 2
assert ts.points[0].timestamp == datetime.fromtimestamp(base_ts + 900)
assert ts.points[1].timestamp == datetime.fromtimestamp(base_ts + 1800)
for point in ts.points:
assert point.value == pytest.approx(expected_rate)
def test_applies_scale_factor(self, initialized_db, configured_env, monkeypatch):
"""Counter rate is scaled (typically x60 for per-minute)."""
base_ts = 1704067200
monkeypatch.setenv("REPEATER_STEP", "60")
import meshmon.env
meshmon.env._config = None
# Insert values 60 seconds apart for easy math
insert_metrics(base_ts, "repeater", {"nb_recv": 0.0}, initialized_db)
insert_metrics(base_ts + 60, "repeater", {"nb_recv": 60.0}, initialized_db)
ts = load_timeseries_from_db(
role="repeater",
metric="nb_recv",
end_time=datetime.fromtimestamp(base_ts + 60),
lookback=timedelta(hours=1),
period="day",
)
# 60 packets in 60 seconds = 1/sec = 60/min with scale=60
assert len(ts.points) == 1
assert ts.points[0].value == pytest.approx(60.0)
def test_single_value_returns_empty(self, initialized_db, configured_env):
"""Single counter value cannot compute rate."""
base_ts = 1704067200
insert_metrics(base_ts, "repeater", {"nb_recv": 100.0}, initialized_db)
ts = load_timeseries_from_db(
role="repeater",
metric="nb_recv",
end_time=datetime.fromtimestamp(base_ts),
lookback=timedelta(hours=1),
period="day",
)
assert ts.is_empty
class TestGaugeValueTransform:
"""Tests for gauge metric value transformation."""
def test_applies_voltage_transform(self, initialized_db, configured_env):
"""Voltage transform converts mV to V."""
base_ts = 1704067200
# Insert millivolt value
insert_metrics(base_ts, "companion", {"battery_mv": 3850.0}, initialized_db)
ts = load_timeseries_from_db(
role="companion",
metric="battery_mv",
end_time=datetime.fromtimestamp(base_ts),
lookback=timedelta(hours=1),
period="day",
)
# Should be converted to volts
assert len(ts.points) == 1
assert ts.points[0].value == pytest.approx(3.85)
def test_no_transform_for_bat_pct(self, initialized_db, configured_env):
"""Battery percentage has no transform."""
base_ts = 1704067200
insert_metrics(base_ts, "repeater", {"bat_pct": 75.0}, initialized_db)
ts = load_timeseries_from_db(
role="repeater",
metric="bat_pct",
end_time=datetime.fromtimestamp(base_ts),
lookback=timedelta(hours=1),
period="day",
)
assert ts.points[0].value == pytest.approx(75.0)
class TestTimeBinning:
"""Tests for time series aggregation/binning."""
def test_no_binning_for_day(self):
"""Day period uses raw data (no binning)."""
assert PERIOD_CONFIG["day"].bin_seconds is None
def test_30_min_bins_for_week(self):
"""Week period uses 30-minute bins."""
assert PERIOD_CONFIG["week"].bin_seconds == 1800
def test_2_hour_bins_for_month(self):
"""Month period uses 2-hour bins."""
assert PERIOD_CONFIG["month"].bin_seconds == 7200
def test_1_day_bins_for_year(self):
"""Year period uses 1-day bins."""
assert PERIOD_CONFIG["year"].bin_seconds == 86400
def test_binning_reduces_point_count(self, initialized_db, configured_env):
"""Binning aggregates multiple points per bin."""
base_ts = 1704067200
# Insert many points (one per minute for an hour)
for i in range(60):
ts = base_ts + (i * 60)
insert_metrics(ts, "repeater", {"bat": 3850.0 + i}, initialized_db)
ts = load_timeseries_from_db(
role="repeater",
metric="bat",
end_time=datetime.fromtimestamp(base_ts + 3600),
lookback=timedelta(days=7), # Week period has 30-min bins
period="week",
)
# 60 points over 1 hour with 30-min bins = 2-3 bins
assert len(ts.points) <= 3
class TestEmptyData:
"""Tests for handling empty/missing data."""
def test_empty_when_no_metric_data(self, initialized_db, configured_env):
"""Returns empty TimeSeries when metric has no data."""
ts = load_timeseries_from_db(
role="repeater",
metric="nonexistent",
end_time=BASE_TIME,
lookback=timedelta(days=1),
period="day",
)
assert ts.is_empty
assert ts.metric == "nonexistent"
assert ts.role == "repeater"
assert ts.period == "day"
def test_empty_when_no_data_in_range(self, initialized_db, configured_env):
"""Returns empty TimeSeries when no data in time range."""
old_ts = 1000000 # Very old timestamp
insert_metrics(old_ts, "repeater", {"bat": 3850.0}, initialized_db)
ts = load_timeseries_from_db(
role="repeater",
metric="bat",
end_time=BASE_TIME,
lookback=timedelta(hours=1),
period="day",
)
assert ts.is_empty