test: sensor tests with mock implementations and additional assertions

This commit is contained in:
Lloyd
2026-05-21 14:23:02 +01:00
parent 22b39e5715
commit 4cf04f87d1
+256 -9
View File
@@ -1,4 +1,26 @@
from repeater.sensors import SensorBase, SensorConfigManager, SensorManager, SensorRegistry
import importlib.util
import sys
import types
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from repeater.sensors import SensorBase, SensorManager, SensorRegistry
from repeater.sensors import ens210 as ens210_module
from repeater.sensors import ina219 as ina219_module
from repeater.sensors import shtc3 as shtc3_module
from repeater.sensors import waveshare_ups_d as waveshare_ups_d_module
from repeater.sensors import waveshare_ups_e as waveshare_ups_e_module
from repeater.sensors.ens210 import ENS210Sensor
from repeater.sensors.ina219 import INA219Sensor
from repeater.sensors.shtc3 import SHTC3Sensor
from repeater.sensors.waveshare_ups_d import WaveshareUpsDSensor
from repeater.sensors.waveshare_ups_e import WaveshareUpsESensor
class _TestRegistry(SensorRegistry):
_factories = {}
class _DummySensor(SensorBase):
@@ -15,26 +37,69 @@ class _FailingSensor(SensorBase):
raise RuntimeError("boom")
SensorRegistry.register("dummy", _DummySensor)
SensorRegistry.register("failing", _FailingSensor)
_TestRegistry.register("dummy", _DummySensor)
_TestRegistry.register("failing", _FailingSensor)
def test_sensor_config_manager_reads_sensor_definitions():
def _install_fake_smbus2(monkeypatch, bus_class, *, i2c_msg=None):
module = types.ModuleType("smbus2")
setattr(module, "SMBus", bus_class)
if i2c_msg is not None:
setattr(module, "i2c_msg", i2c_msg)
monkeypatch.setitem(sys.modules, "smbus2", module)
monkeypatch.setattr(SensorBase, "ensure_python_modules", lambda self, modules: True)
return module
def _swap_word(value):
return ((value & 0xFF) << 8) | ((value >> 8) & 0xFF)
def _load_hardware_stats_sensor_module(monkeypatch):
fake_data_acquisition = types.ModuleType("repeater.data_acquisition")
fake_data_acquisition.__path__ = []
fake_hardware_stats = types.ModuleType("repeater.data_acquisition.hardware_stats")
setattr(fake_hardware_stats, "HardwareStatsCollector", MagicMock)
monkeypatch.setitem(sys.modules, "repeater.data_acquisition", fake_data_acquisition)
monkeypatch.setitem(sys.modules, "repeater.data_acquisition.hardware_stats", fake_hardware_stats)
module_name = "repeater.sensors._hardware_stats_test"
module_path = Path(__file__).resolve().parents[1] / "repeater" / "sensors" / "hardware_stats.py"
spec = importlib.util.spec_from_file_location(module_name, module_path)
assert spec is not None and spec.loader is not None
module = importlib.util.module_from_spec(spec)
monkeypatch.setitem(sys.modules, module_name, module)
spec.loader.exec_module(module)
return module
def test_sensor_manager_summary_reflects_sensor_config():
config = {
"sensors": {
"enabled": True,
"poll_interval_seconds": 12,
"auto_install_packages": True,
"definitions": [
{"name": "demo", "type": "dummy", "settings": {"value": 7}},
{"name": "skip-me"},
"not-a-dict",
],
}
}
manager = SensorConfigManager(config)
with patch.object(SensorManager, "_load_sensor_module", return_value=None):
manager = SensorManager(config, registry=_TestRegistry)
assert manager.is_enabled() is True
assert manager.get_poll_interval_seconds() == 12.0
assert manager.get_definitions()[0]["type"] == "dummy"
definitions = manager._get_sensor_definitions()
summary = manager.get_summary()
assert definitions[0]["auto_install_packages"] is True
assert summary["enabled"] is True
assert summary["poll_interval_seconds"] == 12.0
assert summary["configured"] == 2
assert summary["loaded"] == 1
def test_sensor_manager_loads_and_reads_sensors_without_stopping_on_failure():
@@ -49,7 +114,8 @@ def test_sensor_manager_loads_and_reads_sensors_without_stopping_on_failure():
}
}
manager = SensorManager(config)
with patch.object(SensorManager, "_load_sensor_module", return_value=None):
manager = SensorManager(config, registry=_TestRegistry)
summary = manager.get_summary()
assert summary["configured"] == 3
@@ -61,3 +127,184 @@ def test_sensor_manager_loads_and_reads_sensors_without_stopping_on_failure():
assert readings[0]["data"]["value"] == 11
assert readings[1]["ok"] is False
assert readings[1]["error"].startswith("RuntimeError:")
def test_hardware_stats_sensor_reads_from_collector(monkeypatch):
hardware_stats_module = _load_hardware_stats_sensor_module(monkeypatch)
collector = MagicMock()
collector.get_stats.return_value = {"cpu": {"usage_percent": 42.0}}
with patch.object(hardware_stats_module, "HardwareStatsCollector", return_value=collector):
reading = hardware_stats_module.HardwareStatsSensor("host").read()
assert reading["ok"] is True
assert reading["data"] == {"cpu": {"usage_percent": 42.0}}
def test_ina219_sensor_reads_voltage_current_and_power(monkeypatch):
class _Bus:
def __init__(self, bus_number):
self.bus_number = bus_number
def write_word_data(self, addr, register, value):
return None
def read_word_data(self, addr, register):
values = {
ina219_module._REG_BUS_VOLTAGE: 0x5DC0,
ina219_module._REG_SHUNT_VOLTAGE: 1234,
ina219_module._REG_CURRENT: 1000,
ina219_module._REG_POWER: 500,
}
return _swap_word(values[register])
def close(self):
return None
_install_fake_smbus2(monkeypatch, _Bus)
reading = INA219Sensor("power_monitor").read()
assert reading["ok"] is True
assert reading["data"]["bus_voltage_v"] == 12.0
assert reading["data"]["shunt_voltage_v"] == 0.01234
assert reading["data"]["current_ma"] == pytest.approx(61.04, abs=0.01)
assert reading["data"]["power_mw"] == pytest.approx(610.35, abs=0.01)
def test_shtc3_sensor_reads_temperature_and_humidity(monkeypatch):
class _Msg:
def __init__(self, addr, payload, *, is_read=False):
self.addr = addr
self.payload = list(payload)
self.is_read = is_read
def __iter__(self):
return iter(self.payload)
class _I2CMsg:
@staticmethod
def write(addr, payload):
return _Msg(addr, payload)
@staticmethod
def read(addr, length):
return _Msg(addr, [0] * length, is_read=True)
class _Bus:
def __init__(self, bus_number):
self.bus_number = bus_number
def i2c_rdwr(self, msg):
if getattr(msg, "is_read", False):
msg.payload[:] = [0x66, 0x66, 0x00, 0x80, 0x00, 0x00]
def close(self):
return None
_install_fake_smbus2(monkeypatch, _Bus, i2c_msg=_I2CMsg)
monkeypatch.setattr(shtc3_module.time, "sleep", lambda *_args, **_kwargs: None)
reading = SHTC3Sensor("ambient").read()
assert reading["ok"] is True
assert reading["data"] == {
"temperature_c": 25.0,
"temperature_f": 77.0,
"humidity_pct": 50.0,
}
def test_ens210_sensor_reads_temperature_and_humidity(monkeypatch):
class _Bus:
def __init__(self, bus_number):
self.bus_number = bus_number
def write_byte_data(self, addr, register, value):
return None
def read_i2c_block_data(self, addr, register, length):
if register == ens210_module._REG_T_VAL:
return [0x4A, 0x49, 0x01]
if register == ens210_module._REG_H_VAL:
return [0x00, 0x6E, 0x01]
raise AssertionError(f"unexpected register: {register}")
def close(self):
return None
_install_fake_smbus2(monkeypatch, _Bus)
monkeypatch.setattr(ens210_module.time, "sleep", lambda *_args, **_kwargs: None)
reading = ENS210Sensor("ambient").read()
assert reading["ok"] is True
assert reading["data"] == {
"temperature_c": 20.01,
"humidity_pct": 55.0,
}
def test_waveshare_ups_d_sensor_reads_battery_state(monkeypatch):
class _Bus:
def __init__(self, bus_number):
self.bus_number = bus_number
def write_i2c_block_data(self, addr, register, data):
return None
def read_i2c_block_data(self, addr, register, length):
values = {
waveshare_ups_d_module._REG_BUS: [0x1F, 0x40],
waveshare_ups_d_module._REG_SHUNT: [0x00, 0x64],
waveshare_ups_d_module._REG_CURRENT: [0xFC, 0x18],
waveshare_ups_d_module._REG_POWER: [0x00, 0x64],
}
return values[register]
def close(self):
return None
_install_fake_smbus2(monkeypatch, _Bus)
monkeypatch.setattr(waveshare_ups_d_module.time, "sleep", lambda *_args, **_kwargs: None)
reading = WaveshareUpsDSensor("battery").read()
assert reading["ok"] is True
assert reading["data"]["bus_voltage_v"] == 4.0
assert reading["data"]["battery_percent"] == 85
assert reading["data"]["charge_state"] == "charging"
assert reading["data"]["current_ma"] == pytest.approx(-152.4, abs=0.1)
assert reading["data"]["power_mw"] == pytest.approx(304.8, abs=0.1)
def test_waveshare_ups_e_sensor_reads_pack_state(monkeypatch):
class _Bus:
def __init__(self, bus_number):
self.bus_number = bus_number
def read_i2c_block_data(self, addr, register, length):
values = {
waveshare_ups_e_module._REG_STATUS: [waveshare_ups_e_module._FLAG_CHARGING],
waveshare_ups_e_module._REG_VBUS: [0xA0, 0x0F, 0x2C, 0x01, 0x58, 0x1B],
waveshare_ups_e_module._REG_BATT: [0x80, 0x3E, 0xFA, 0x00, 0x4E, 0x00, 0x98, 0x08, 0x2D, 0x00, 0x5A, 0x00],
waveshare_ups_e_module._REG_CELLS: [0x80, 0x0C, 0x6C, 0x0C, 0x1C, 0x0C, 0x76, 0x0C],
}
return values[register]
def close(self):
return None
_install_fake_smbus2(monkeypatch, _Bus)
reading = WaveshareUpsESensor("battery").read()
assert reading["ok"] is True
assert reading["data"]["charge_state"] == "charging"
assert reading["data"]["battery_voltage_mv"] == 16000
assert reading["data"]["battery_percent"] == 78
assert reading["data"]["remaining_capacity_mah"] == 2200
assert reading["data"]["cell_voltages_mv"] == [3200, 3180, 3100, 3190]
assert reading["data"]["low_cell_warning"] is True
assert reading["data"]["time_to_full_min"] == 90
assert "time_to_empty_min" not in reading["data"]