diff --git a/repeater/sensors/shtc3.py b/repeater/sensors/shtc3.py new file mode 100644 index 0000000..7ec3099 --- /dev/null +++ b/repeater/sensors/shtc3.py @@ -0,0 +1,110 @@ +""" +SHTC3 temperature and humidity sensor plug-in (RAK1901 WisBlock sensor). + +Requires: pip install smbus2 + +The SHTC3 uses 16-bit command words and requires a raw I2C read (no register +byte prefix), so smbus2.i2c_rdwr is used instead of the standard SMBus API. + +Config example: + - type: shtc3 + name: "ambient" + enabled: true + auto_install_packages: false + settings: + i2c_address: 0x70 # SHTC3 fixed I2C address + bus_number: 1 # I2C bus number (1 for Raspberry Pi default) +""" + +from __future__ import annotations + +import time +from typing import Any, Dict, Optional + +from .base import SensorBase +from .registry import SensorRegistry + +# SHTC3 two-byte command words +_CMD_WAKE = [0x35, 0x17] +_CMD_MEAS = [0x7C, 0xA2] # T-first, normal power mode +_CMD_SLEEP = [0xB0, 0x98] + + +@SensorRegistry.register("shtc3") +class SHTC3Sensor(SensorBase): + sensor_type = "shtc3" + + def __init__(self, name: str, config: Optional[Dict[str, Any]] = None, log=None): + super().__init__(name=name, config=config, log=log) + + self.i2c_address = int(self.settings.get("i2c_address", 0x70)) + self.bus_number = int(self.settings.get("bus_number", 1)) + + self.available = False + + if not self.ensure_python_modules([("smbus2", "smbus2")]): + return + + try: + import smbus2 # type: ignore[import-not-found] + self._smbus2 = smbus2 + + # Verify sensor is reachable: wake then immediately sleep + bus = smbus2.SMBus(self.bus_number) + try: + bus.i2c_rdwr(smbus2.i2c_msg.write(self.i2c_address, _CMD_WAKE)) + time.sleep(0.002) + bus.i2c_rdwr(smbus2.i2c_msg.write(self.i2c_address, _CMD_SLEEP)) + finally: + bus.close() + + self.available = True + self.log.info( + "SHTC3 initialized (addr=0x%02X, bus=%d)", + self.i2c_address, + self.bus_number, + ) + except Exception as exc: + self.log.warning( + "SHTC3 init failed (addr=0x%02X, bus=%d): %s", + self.i2c_address, + self.bus_number, + exc, + ) + self.available = False + + def _read(self) -> Dict[str, Any]: + """Read temperature and humidity from SHTC3.""" + if not self.available: + raise RuntimeError("SHTC3 device not available") + + smbus2 = self._smbus2 + bus = smbus2.SMBus(self.bus_number) + try: + bus.i2c_rdwr(smbus2.i2c_msg.write(self.i2c_address, _CMD_WAKE)) + time.sleep(0.002) + bus.i2c_rdwr(smbus2.i2c_msg.write(self.i2c_address, _CMD_MEAS)) + time.sleep(0.02) # measurement takes ~12 ms in normal power mode + + r = smbus2.i2c_msg.read(self.i2c_address, 6) + bus.i2c_rdwr(r) + data = list(r) + + bus.i2c_rdwr(smbus2.i2c_msg.write(self.i2c_address, _CMD_SLEEP)) + + # Bytes: T_MSB, T_LSB, T_CRC, RH_MSB, RH_LSB, RH_CRC + t_raw = (data[0] << 8) | data[1] + rh_raw = (data[3] << 8) | data[4] + temp_c = round(-45.0 + 175.0 * t_raw / 65536.0, 2) + temp_f = round(temp_c * 9.0 / 5.0 + 32.0, 2) + rh = round(100.0 * rh_raw / 65536.0, 2) + + return { + "temperature_c": temp_c, + "temperature_f": temp_f, + "humidity_pct": rh, + } + except Exception as exc: + raise RuntimeError(f"SHTC3 read failed: {exc}") from exc + finally: + bus.close() diff --git a/repeater/sensors/waveshare_ups_d.py b/repeater/sensors/waveshare_ups_d.py new file mode 100644 index 0000000..5b62232 --- /dev/null +++ b/repeater/sensors/waveshare_ups_d.py @@ -0,0 +1,148 @@ +""" +Waveshare UPS HAT (D) battery monitor plug-in — INA219 at I2C 0x43. + +Reads a single 21700 Li-ion cell via INA219. Reports voltage, current, +power, battery percent, and charge state. + +Requires: pip install smbus2 + +Config example: + - type: waveshare_ups_d + name: "battery" + enabled: true + auto_install_packages: false + settings: + i2c_address: 0x43 # Waveshare UPS HAT (D) fixed address + bus_number: 1 # I2C bus number (1 for Raspberry Pi default) +""" + +from __future__ import annotations + +import time +from typing import Any, Dict, Optional + +from .base import SensorBase +from .registry import SensorRegistry + +# INA219 register addresses +_REG_CONFIG = 0x00 +_REG_SHUNT = 0x01 +_REG_BUS = 0x02 +_REG_POWER = 0x03 +_REG_CURRENT = 0x04 +_REG_CAL = 0x05 + +# 32V range, ±320mV gain, 128-sample averaging, continuous shunt+bus +_CONFIG_VALUE = 0x3FFF + +# Waveshare UPS HAT (D) calibration — 0.01Ω shunt (per Waveshare sample code) +# current_lsb ≈ 0.1524 mA/LSB, power_lsb = current_lsb × 20 +_CAL_VALUE = 26868 +_CURRENT_LSB = 0.0001524 # A per LSB +_POWER_LSB = _CURRENT_LSB * 20.0 # W per LSB + + +def _voltage_to_percent(v: float) -> int: + """Piecewise linear SoC estimate for a single 21700 Li-ion cell (3.0–4.2 V).""" + if v >= 4.20: return 100 + if v >= 4.00: return int(85 + (v - 4.00) / 0.20 * 15) + if v >= 3.80: return int(60 + (v - 3.80) / 0.20 * 25) + if v >= 3.70: return int(40 + (v - 3.70) / 0.10 * 20) + if v >= 3.50: return int(15 + (v - 3.50) / 0.20 * 25) + if v >= 3.00: return int( (v - 3.00) / 0.50 * 15) + return 0 + + +@SensorRegistry.register("waveshare_ups_d") +class WaveshareUpsDSensor(SensorBase): + sensor_type = "waveshare_ups_d" + + def __init__(self, name: str, config: Optional[Dict[str, Any]] = None, log=None): + super().__init__(name=name, config=config, log=log) + + self.i2c_address = int(self.settings.get("i2c_address", 0x43)) + self.bus_number = int(self.settings.get("bus_number", 1)) + + self.available = False + + if not self.ensure_python_modules([("smbus2", "smbus2")]): + return + + try: + import smbus2 # type: ignore[import-not-found] + self._smbus2 = smbus2 + + bus = smbus2.SMBus(self.bus_number) + try: + self._write(bus, _REG_CONFIG, _CONFIG_VALUE) + self._write(bus, _REG_CAL, _CAL_VALUE) + # 128-sample averaging takes ~68 ms; wait for first conversion + time.sleep(0.15) + finally: + bus.close() + + self.available = True + self.log.info( + "Waveshare UPS HAT (D) INA219 initialized (addr=0x%02X, bus=%d)", + self.i2c_address, + self.bus_number, + ) + except Exception as exc: + self.log.warning( + "Waveshare UPS HAT (D) init failed (addr=0x%02X, bus=%d): %s", + self.i2c_address, + self.bus_number, + exc, + ) + + def _write(self, bus, reg: int, val: int) -> None: + bus.write_i2c_block_data( + self.i2c_address, reg, [(val >> 8) & 0xFF, val & 0xFF] + ) + + def _read_u(self, bus, reg: int) -> int: + d = bus.read_i2c_block_data(self.i2c_address, reg, 2) + return (d[0] << 8) | d[1] + + def _read_s(self, bus, reg: int) -> int: + v = self._read_u(bus, reg) + return v - 0x10000 if v & 0x8000 else v + + def _read(self) -> Dict[str, Any]: + """Read voltage, current, power, and derived battery state.""" + if not self.available: + raise RuntimeError("Waveshare UPS HAT (D) not available") + + try: + bus = self._smbus2.SMBus(self.bus_number) + try: + # Re-apply calibration in case of external reset + self._write(bus, _REG_CAL, _CAL_VALUE) + + bus_v = (self._read_u(bus, _REG_BUS) >> 3) * 4 / 1000.0 + shunt_mv = self._read_s(bus, _REG_SHUNT) * 0.01 + current_ma = self._read_s(bus, _REG_CURRENT) * _CURRENT_LSB * 1000.0 + power_mw = self._read_u(bus, _REG_POWER) * _POWER_LSB * 1000.0 + finally: + bus.close() + + pct = _voltage_to_percent(bus_v) + + # HAT (D) sign convention: negative = charging, positive = discharging + if current_ma < -50: + state = "charging" + elif current_ma > 50: + state = "discharging" + else: + state = "idle" + + return { + "bus_voltage_v": round(bus_v, 3), + "shunt_voltage_mv": round(shunt_mv, 2), + "current_ma": round(current_ma, 1), + "power_mw": round(power_mw, 1), + "battery_percent": pct, + "charge_state": state, + } + except Exception as exc: + raise RuntimeError(f"Waveshare UPS HAT (D) read failed: {exc}") from exc