feat: add ENS210 temperature/humidity sensor plug-in

Adds support for the ENS210 relative humidity and temperature sensor
as a new plug-in under repeater/sensors/ens210.py. Also adds a
commented configuration example to config.yaml.example and a
contributor guide at docs/adding_sensors.md explaining how to add
further sensor plug-ins.

## Implementation notes

### Why smbus2 instead of an Adafruit/CircuitPython library

The ENS210 has no maintained Adafruit CircuitPython driver. The
available third-party options are either unmaintained or bring in the
full Blinka/CircuitPython hardware-abstraction stack as a dependency.
smbus2 is a thin, widely-packaged wrapper around the Linux i2c-dev
kernel interface that is already present on Raspberry Pi OS and most
Debian-based systems. It has no transitive dependencies and adds no
abstraction cost.

The ENS210 protocol is simple enough that direct register access is
preferable: two writes to start a measurement (REG_SENS_RUN + REG_SENS_START)
and two three-byte block reads to retrieve temperature and humidity.
The status/validity bit is checked inline rather than relying on a
library to surface it. There is no value a higher-level driver would
add here.

### Read strategy

A fixed post-trigger delay is unreliable — the sensor datasheet quotes
~130 ms typical conversion time but the actual ready time varies. The
implementation instead polls the data-valid status bit (bit 1 of the
third byte in each register block) every 50 ms for up to
read_timeout_seconds (default 1.0 s), breaking as soon as both T and
H report valid data. This is the same approach used in the validated
reference script.

The I2C bus is opened and closed on every read rather than kept open
across poll cycles. Keeping a persistent SMBus handle caused subsequent
reads to time out, consistent with the Linux i2c-dev file descriptor
accumulating state between transactions.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Joshua Mesilane
2026-05-13 11:46:26 +10:00
parent 66532a0647
commit 9bfe1259da
3 changed files with 279 additions and 0 deletions
+10
View File
@@ -236,6 +236,16 @@ sensors:
# max_expected_amps: 2.0
# shunt_ohms: 0.1
# Example ENS210 temperature/humidity sensor (commented out by default)
# - type: ens210
# name: ambient
# enabled: true
# auto_install_packages: true
# settings:
# i2c_address: 67 # 0x43 in decimal (default ENS210 address)
# bus_number: 1 # I2C bus number (1 for Raspberry Pi default)
# read_timeout_seconds: 1.0 # Max seconds to wait for valid data (polls every 50 ms)
# Mesh Network Configuration
mesh:
# Unscoped flood policy - controls whether the repeater allows or denies unscoped flooding
+167
View File
@@ -0,0 +1,167 @@
# Adding a New Sensor Plug-in
Sensors in pyMC_Repeater are self-contained modules that live in `repeater/sensors/`. The subsystem is plug-in based: adding a new sensor requires only one new file. The manager discovers and loads it automatically at runtime by importing the module named after the sensor type.
---
## How the sensor subsystem works
| Component | File | Role |
|-----------|------|------|
| `SensorBase` | `repeater/sensors/base.py` | Abstract base class all sensors inherit from |
| `SensorRegistry` | `repeater/sensors/registry.py` | Maps type strings → sensor classes via `@SensorRegistry.register` |
| `SensorManager` | `repeater/sensors/manager.py` | Reads config, imports sensor modules, polls sensors in background |
When `SensorManager` loads a sensor of type `"foo"`, it calls `importlib.import_module("repeater.sensors.foo")`. That import runs the `@SensorRegistry.register("foo")` decorator on your class, making it available. No changes to `__init__.py` or the manager are needed.
---
## Step-by-step guide
### 1. Create `repeater/sensors/<type>.py`
Name the file after the sensor type string (lowercase, underscores for hyphens). The type string is what operators write in `config.yaml`.
Minimal template:
```python
"""
<SensorName> sensor plug-in.
Requires: pip install <package>
Config example:
- type: <type>
name: "my-sensor"
enabled: true
auto_install_packages: false
settings:
some_option: value
"""
from __future__ import annotations
from typing import Any, Dict, Optional
from .base import SensorBase
from .registry import SensorRegistry
@SensorRegistry.register("<type>")
class MySensor(SensorBase):
sensor_type = "<type>"
def __init__(self, name: str, config: Optional[Dict[str, Any]] = None, log=None):
super().__init__(name=name, config=config, log=log)
# Read settings with safe defaults
self.some_option = self.settings.get("some_option", "default")
self.available = False
if not self.ensure_python_modules([("import_name", "pip-package-name")]):
return # logs a warning; sensor will report unavailable
try:
import import_name # type: ignore[import-not-found]
# Initialise hardware here
self.device = import_name.Device(...)
self.available = True
self.log.info("MySensor initialized")
except Exception as exc:
self.log.warning("MySensor init failed: %s", exc)
self.available = False
def _read(self) -> Dict[str, Any]:
if not self.available:
raise RuntimeError("device not available")
try:
return {
"field_one": ...,
"field_two": ...,
}
except Exception as exc:
raise RuntimeError(f"read failed: {exc}") from exc
```
Key rules:
- **`sensor_type`** class attribute must match the string passed to `@SensorRegistry.register`.
- **`self.settings`** is the `settings:` block from the sensor's config entry (a plain dict).
- **`ensure_python_modules`** handles missing dependencies gracefully. Pass a list of `(import_name, pip_package)` tuples. Returns `False` and logs a warning if any are missing and `auto_install_packages` is `false`; installs them if `true`.
- **`_read`** must return a flat `dict[str, Any]`. The base class wraps it in a standard envelope (`name`, `type`, `ok`, `timestamp`, `data`, optional `error`).
- **`_read`** must raise `RuntimeError` on failure — the base class catches it, marks `ok=False`, and logs it without crashing the polling loop.
- All hardware initialisation belongs in `__init__`, not in `_read`. Keep `_read` fast.
- Lazy-import third-party packages inside `__init__` (after `ensure_python_modules` returns `True`) so the module can be imported on hosts that don't have the package installed.
### 2. Add a commented example to `config.yaml.example`
Find the `sensors.definitions` block and add your sensor alongside the existing examples:
```yaml
# Example MySensor (commented out by default)
# - type: <type>
# name: my-sensor
# enabled: true
# auto_install_packages: true
# settings:
# some_option: value
```
Use decimal for numeric config values that would naturally be written in hex (e.g. I2C addresses: `0x43` = `67`). Add a comment showing both forms if the value is commonly written in hex.
### 3. Test locally
Add a test to `tests/test_sensors.py` that:
1. Registers a lightweight mock of your sensor (or stubs the hardware import).
2. Verifies that `SensorManager` loads it and `read_all()` returns the expected structure.
3. Verifies that a hardware failure in `_read` produces an `ok=False` result rather than raising.
Example pattern from the existing test suite:
```python
class _MockMySensor(SensorBase):
sensor_type = "<type>"
def _read(self):
return {"field_one": 42.0, "field_two": 55.0}
SensorRegistry.register("<type>", _MockMySensor)
def test_my_sensor_loads_and_reads():
config = {
"sensors": {
"enabled": True,
"definitions": [
{"name": "test-sensor", "type": "<type>", "settings": {}},
],
}
}
manager = SensorManager(config)
readings = manager.read_all()
assert readings[0]["ok"] is True
assert readings[0]["data"]["field_one"] == 42.0
```
---
## Checklist
- [ ] `repeater/sensors/<type>.py` created
- [ ] `sensor_type` class attribute matches the `@SensorRegistry.register` key
- [ ] All settings read from `self.settings` with sensible defaults
- [ ] `ensure_python_modules` called before any third-party import
- [ ] Hardware initialised in `__init__`, not `_read`
- [ ] `_read` raises `RuntimeError` on failure (never returns `None` or partial data silently)
- [ ] Commented example added to `config.yaml.example`
- [ ] Unit test added to `tests/test_sensors.py`
---
## Existing sensors
| Type | File | Hardware |
|------|------|----------|
| `hardware_stats` | `repeater/sensors/hardware_stats.py` | Host CPU / memory / disk / network (via `psutil`) |
| `ina219` | `repeater/sensors/ina219.py` | INA219 I²C current/voltage/power monitor |
| `ens210` | `repeater/sensors/ens210.py` | ENS210 I²C relative humidity and temperature sensor |
+102
View File
@@ -0,0 +1,102 @@
"""
ENS210 relative humidity and temperature sensor plug-in.
Requires: pip install smbus2
Config example:
- type: ens210
name: "ambient"
enabled: true
auto_install_packages: false
settings:
i2c_address: 0x43 # Default ENS210 I2C address
bus_number: 1 # I2C bus number (1 for Raspberry Pi default)
read_timeout_seconds: 1.0 # Max time to wait for valid data (polls every 50 ms)
"""
from __future__ import annotations
import time
from typing import Any, Dict, Optional
from .base import SensorBase
from .registry import SensorRegistry
# ENS210 register addresses
_REG_SENS_RUN = 0x21
_REG_SENS_START = 0x22
_REG_T_VAL = 0x30
_REG_H_VAL = 0x33
@SensorRegistry.register("ens210")
class ENS210Sensor(SensorBase):
sensor_type = "ens210"
def __init__(self, name: str, config: Optional[Dict[str, Any]] = None, log=None):
super().__init__(name=name, config=config, log=log)
self.i2c_address = int(self.settings.get("i2c_address", 0x43))
self.bus_number = int(self.settings.get("bus_number", 1))
self._poll_interval = 0.05 # 50 ms between validity checks
self._poll_attempts = max(1, int(float(self.settings.get("read_timeout_seconds", 1.0)) / self._poll_interval))
self.available = False
if not self.ensure_python_modules([("smbus2", "smbus2")]):
return
try:
import smbus2 # type: ignore[import-not-found]
self._smbus2 = smbus2
# Verify the bus is accessible
smbus2.SMBus(self.bus_number).close()
self.available = True
self.log.info(
"ENS210 initialized (addr=0x%02X, bus=%d)",
self.i2c_address,
self.bus_number,
)
except Exception as exc:
self.log.warning(
"ENS210 init failed (addr=0x%02X, bus=%d): %s",
self.i2c_address,
self.bus_number,
exc,
)
self.available = False
def _read(self) -> Dict[str, Any]:
"""Read temperature and humidity from ENS210."""
if not self.available:
raise RuntimeError("ENS210 device not available")
bus = self._smbus2.SMBus(self.bus_number)
try:
bus.write_byte_data(self.i2c_address, _REG_SENS_RUN, 0x03)
bus.write_byte_data(self.i2c_address, _REG_SENS_START, 0x03)
for _ in range(self._poll_attempts):
time.sleep(self._poll_interval)
t_data = bus.read_i2c_block_data(self.i2c_address, _REG_T_VAL, 3)
h_data = bus.read_i2c_block_data(self.i2c_address, _REG_H_VAL, 3)
if ((t_data[2] >> 1) & 0x01) and ((h_data[2] >> 1) & 0x01):
break
else:
raise RuntimeError(
f"ENS210 measurement timed out after {self._poll_attempts * self._poll_interval:.1f}s"
)
t_raw = t_data[0] | (t_data[1] << 8)
h_raw = h_data[0] | (h_data[1] << 8)
return {
"temperature_c": round(t_raw / 64.0 - 273.15, 2),
"humidity_pct": round(h_raw / 512.0, 2),
}
except RuntimeError:
raise
except Exception as exc:
raise RuntimeError(f"ENS210 read failed: {exc}") from exc
finally:
bus.close()