From 9c1661f097d6e2e6eb9956b9e96c3ec1ed613f1f Mon Sep 17 00:00:00 2001 From: Zack Carlson Date: Mon, 18 May 2026 22:32:20 -0700 Subject: [PATCH 1/3] Add SHTC3 and Waveshare UPS HAT (D) sensor plug-ins MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - shtc3.py: SHTC3 temperature/humidity sensor (RAK1901 WisBlock module, I2C 0x70). Uses smbus2 i2c_rdwr for raw I2C reads since SHTC3 requires 16-bit command words with no register-byte prefix. Returns temperature_c, temperature_f, humidity_pct. - waveshare_ups_d.py: Waveshare UPS HAT (D) battery monitor via INA219 at I2C 0x43. Uses the HAT's actual shunt (0.01 Ω, CAL=26868) rather than the generic INA219 defaults. Returns bus_voltage_v, shunt_voltage_mv, current_ma, power_mw, battery_percent (piecewise-linear SoC for 21700 cell), and charge_state (charging/discharging/idle). Sign convention matches Waveshare sample code: negative current = charging. Both plug-ins tested on Raspberry Pi 3B+ (DietPi) with RAK1901 WisBlock sensor and Waveshare UPS HAT (D). Co-Authored-By: Claude Sonnet 4.6 --- repeater/sensors/shtc3.py | 110 +++++++++++++++++++++ repeater/sensors/waveshare_ups_d.py | 148 ++++++++++++++++++++++++++++ 2 files changed, 258 insertions(+) create mode 100644 repeater/sensors/shtc3.py create mode 100644 repeater/sensors/waveshare_ups_d.py diff --git a/repeater/sensors/shtc3.py b/repeater/sensors/shtc3.py new file mode 100644 index 0000000..7ec3099 --- /dev/null +++ b/repeater/sensors/shtc3.py @@ -0,0 +1,110 @@ +""" +SHTC3 temperature and humidity sensor plug-in (RAK1901 WisBlock sensor). + +Requires: pip install smbus2 + +The SHTC3 uses 16-bit command words and requires a raw I2C read (no register +byte prefix), so smbus2.i2c_rdwr is used instead of the standard SMBus API. + +Config example: + - type: shtc3 + name: "ambient" + enabled: true + auto_install_packages: false + settings: + i2c_address: 0x70 # SHTC3 fixed I2C address + bus_number: 1 # I2C bus number (1 for Raspberry Pi default) +""" + +from __future__ import annotations + +import time +from typing import Any, Dict, Optional + +from .base import SensorBase +from .registry import SensorRegistry + +# SHTC3 two-byte command words +_CMD_WAKE = [0x35, 0x17] +_CMD_MEAS = [0x7C, 0xA2] # T-first, normal power mode +_CMD_SLEEP = [0xB0, 0x98] + + +@SensorRegistry.register("shtc3") +class SHTC3Sensor(SensorBase): + sensor_type = "shtc3" + + def __init__(self, name: str, config: Optional[Dict[str, Any]] = None, log=None): + super().__init__(name=name, config=config, log=log) + + self.i2c_address = int(self.settings.get("i2c_address", 0x70)) + self.bus_number = int(self.settings.get("bus_number", 1)) + + self.available = False + + if not self.ensure_python_modules([("smbus2", "smbus2")]): + return + + try: + import smbus2 # type: ignore[import-not-found] + self._smbus2 = smbus2 + + # Verify sensor is reachable: wake then immediately sleep + bus = smbus2.SMBus(self.bus_number) + try: + bus.i2c_rdwr(smbus2.i2c_msg.write(self.i2c_address, _CMD_WAKE)) + time.sleep(0.002) + bus.i2c_rdwr(smbus2.i2c_msg.write(self.i2c_address, _CMD_SLEEP)) + finally: + bus.close() + + self.available = True + self.log.info( + "SHTC3 initialized (addr=0x%02X, bus=%d)", + self.i2c_address, + self.bus_number, + ) + except Exception as exc: + self.log.warning( + "SHTC3 init failed (addr=0x%02X, bus=%d): %s", + self.i2c_address, + self.bus_number, + exc, + ) + self.available = False + + def _read(self) -> Dict[str, Any]: + """Read temperature and humidity from SHTC3.""" + if not self.available: + raise RuntimeError("SHTC3 device not available") + + smbus2 = self._smbus2 + bus = smbus2.SMBus(self.bus_number) + try: + bus.i2c_rdwr(smbus2.i2c_msg.write(self.i2c_address, _CMD_WAKE)) + time.sleep(0.002) + bus.i2c_rdwr(smbus2.i2c_msg.write(self.i2c_address, _CMD_MEAS)) + time.sleep(0.02) # measurement takes ~12 ms in normal power mode + + r = smbus2.i2c_msg.read(self.i2c_address, 6) + bus.i2c_rdwr(r) + data = list(r) + + bus.i2c_rdwr(smbus2.i2c_msg.write(self.i2c_address, _CMD_SLEEP)) + + # Bytes: T_MSB, T_LSB, T_CRC, RH_MSB, RH_LSB, RH_CRC + t_raw = (data[0] << 8) | data[1] + rh_raw = (data[3] << 8) | data[4] + temp_c = round(-45.0 + 175.0 * t_raw / 65536.0, 2) + temp_f = round(temp_c * 9.0 / 5.0 + 32.0, 2) + rh = round(100.0 * rh_raw / 65536.0, 2) + + return { + "temperature_c": temp_c, + "temperature_f": temp_f, + "humidity_pct": rh, + } + except Exception as exc: + raise RuntimeError(f"SHTC3 read failed: {exc}") from exc + finally: + bus.close() diff --git a/repeater/sensors/waveshare_ups_d.py b/repeater/sensors/waveshare_ups_d.py new file mode 100644 index 0000000..5b62232 --- /dev/null +++ b/repeater/sensors/waveshare_ups_d.py @@ -0,0 +1,148 @@ +""" +Waveshare UPS HAT (D) battery monitor plug-in — INA219 at I2C 0x43. + +Reads a single 21700 Li-ion cell via INA219. Reports voltage, current, +power, battery percent, and charge state. + +Requires: pip install smbus2 + +Config example: + - type: waveshare_ups_d + name: "battery" + enabled: true + auto_install_packages: false + settings: + i2c_address: 0x43 # Waveshare UPS HAT (D) fixed address + bus_number: 1 # I2C bus number (1 for Raspberry Pi default) +""" + +from __future__ import annotations + +import time +from typing import Any, Dict, Optional + +from .base import SensorBase +from .registry import SensorRegistry + +# INA219 register addresses +_REG_CONFIG = 0x00 +_REG_SHUNT = 0x01 +_REG_BUS = 0x02 +_REG_POWER = 0x03 +_REG_CURRENT = 0x04 +_REG_CAL = 0x05 + +# 32V range, ±320mV gain, 128-sample averaging, continuous shunt+bus +_CONFIG_VALUE = 0x3FFF + +# Waveshare UPS HAT (D) calibration — 0.01Ω shunt (per Waveshare sample code) +# current_lsb ≈ 0.1524 mA/LSB, power_lsb = current_lsb × 20 +_CAL_VALUE = 26868 +_CURRENT_LSB = 0.0001524 # A per LSB +_POWER_LSB = _CURRENT_LSB * 20.0 # W per LSB + + +def _voltage_to_percent(v: float) -> int: + """Piecewise linear SoC estimate for a single 21700 Li-ion cell (3.0–4.2 V).""" + if v >= 4.20: return 100 + if v >= 4.00: return int(85 + (v - 4.00) / 0.20 * 15) + if v >= 3.80: return int(60 + (v - 3.80) / 0.20 * 25) + if v >= 3.70: return int(40 + (v - 3.70) / 0.10 * 20) + if v >= 3.50: return int(15 + (v - 3.50) / 0.20 * 25) + if v >= 3.00: return int( (v - 3.00) / 0.50 * 15) + return 0 + + +@SensorRegistry.register("waveshare_ups_d") +class WaveshareUpsDSensor(SensorBase): + sensor_type = "waveshare_ups_d" + + def __init__(self, name: str, config: Optional[Dict[str, Any]] = None, log=None): + super().__init__(name=name, config=config, log=log) + + self.i2c_address = int(self.settings.get("i2c_address", 0x43)) + self.bus_number = int(self.settings.get("bus_number", 1)) + + self.available = False + + if not self.ensure_python_modules([("smbus2", "smbus2")]): + return + + try: + import smbus2 # type: ignore[import-not-found] + self._smbus2 = smbus2 + + bus = smbus2.SMBus(self.bus_number) + try: + self._write(bus, _REG_CONFIG, _CONFIG_VALUE) + self._write(bus, _REG_CAL, _CAL_VALUE) + # 128-sample averaging takes ~68 ms; wait for first conversion + time.sleep(0.15) + finally: + bus.close() + + self.available = True + self.log.info( + "Waveshare UPS HAT (D) INA219 initialized (addr=0x%02X, bus=%d)", + self.i2c_address, + self.bus_number, + ) + except Exception as exc: + self.log.warning( + "Waveshare UPS HAT (D) init failed (addr=0x%02X, bus=%d): %s", + self.i2c_address, + self.bus_number, + exc, + ) + + def _write(self, bus, reg: int, val: int) -> None: + bus.write_i2c_block_data( + self.i2c_address, reg, [(val >> 8) & 0xFF, val & 0xFF] + ) + + def _read_u(self, bus, reg: int) -> int: + d = bus.read_i2c_block_data(self.i2c_address, reg, 2) + return (d[0] << 8) | d[1] + + def _read_s(self, bus, reg: int) -> int: + v = self._read_u(bus, reg) + return v - 0x10000 if v & 0x8000 else v + + def _read(self) -> Dict[str, Any]: + """Read voltage, current, power, and derived battery state.""" + if not self.available: + raise RuntimeError("Waveshare UPS HAT (D) not available") + + try: + bus = self._smbus2.SMBus(self.bus_number) + try: + # Re-apply calibration in case of external reset + self._write(bus, _REG_CAL, _CAL_VALUE) + + bus_v = (self._read_u(bus, _REG_BUS) >> 3) * 4 / 1000.0 + shunt_mv = self._read_s(bus, _REG_SHUNT) * 0.01 + current_ma = self._read_s(bus, _REG_CURRENT) * _CURRENT_LSB * 1000.0 + power_mw = self._read_u(bus, _REG_POWER) * _POWER_LSB * 1000.0 + finally: + bus.close() + + pct = _voltage_to_percent(bus_v) + + # HAT (D) sign convention: negative = charging, positive = discharging + if current_ma < -50: + state = "charging" + elif current_ma > 50: + state = "discharging" + else: + state = "idle" + + return { + "bus_voltage_v": round(bus_v, 3), + "shunt_voltage_mv": round(shunt_mv, 2), + "current_ma": round(current_ma, 1), + "power_mw": round(power_mw, 1), + "battery_percent": pct, + "charge_state": state, + } + except Exception as exc: + raise RuntimeError(f"Waveshare UPS HAT (D) read failed: {exc}") from exc From 8b0607aa1cd08bef0e419008448f7c58f170705b Mon Sep 17 00:00:00 2001 From: Zack Carlson Date: Mon, 18 May 2026 22:50:17 -0700 Subject: [PATCH 2/3] Add NMEA GPS sensor plug-in MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit nmea_gps.py reads NMEA 0183 sentences directly from a serial GPS receiver (/dev/serial0 by default) and exposes fix status, position, motion, accuracy, and satellite fields as sensor readings. Parses GGA, RMC, and GSA sentence types using stdlib only (no pynmea2 dependency) — pyserial is already required by the repeater. Designed for use when the repeater's built-in GPS service is disabled (gps.enabled: false). Both cannot share the serial port simultaneously. With gps.api_fallback_to_config_location: true the repeater continues advertising the manually-configured location while the sensor plugin handles raw GPS data. Returns: fix_valid, fix_quality, fix_type, latitude, longitude, altitude_m, speed_kmh, course_degrees, hdop, pdop, vdop, satellites_used, utc_datetime. Position and motion fields are null when fix_valid is false to avoid reporting config-fallback coordinates as real GPS data. Tested on Raspberry Pi 3B+ (DietPi) with a u-blox GPS module on /dev/serial0. Co-Authored-By: Claude Sonnet 4.6 --- repeater/sensors/nmea_gps.py | 264 +++++++++++++++++++++++++++++++++++ 1 file changed, 264 insertions(+) create mode 100644 repeater/sensors/nmea_gps.py diff --git a/repeater/sensors/nmea_gps.py b/repeater/sensors/nmea_gps.py new file mode 100644 index 0000000..98f8c1c --- /dev/null +++ b/repeater/sensors/nmea_gps.py @@ -0,0 +1,264 @@ +""" +NMEA GPS sensor plug-in. + +Reads NMEA 0183 sentences directly from a serial GPS receiver and exposes +fix status, position, motion, accuracy, and satellite fields as sensor +readings. + +The repeater's built-in GPS service must be disabled (gps.enabled: false in +config.yaml) when using this plug-in. Both cannot share the serial port +simultaneously. Set gps.api_fallback_to_config_location: true so the +repeater continues advertising the manually-configured lat/lon. + +Requires: pyserial (already installed with pyMC_Repeater) + +Config example: + - type: nmea_gps + name: "gps" + enabled: true + auto_install_packages: false + settings: + device: /dev/serial0 # Serial device path + baud_rate: 9600 # GPS baud rate (usually 9600 or 115200) + read_timeout_seconds: 3.0 # Max time to wait for a GGA+RMC sentence pair +""" + +from __future__ import annotations + +import time +from typing import Any, Dict, Optional + +from .base import SensorBase +from .registry import SensorRegistry + +_FIX_QUALITY = { + "0": "no fix", "1": "GPS", "2": "DGPS", + "4": "RTK fixed", "5": "RTK float", + "6": "estimated", "7": "manual", "8": "simulation", +} +_GSA_FIX_TYPE = {"1": "no fix", "2": "2D fix", "3": "3D fix"} + + +def _checksum_valid(sentence: str) -> bool: + if "*" not in sentence: + return True # no checksum present — accept + try: + payload, cs_str = sentence[1:].rsplit("*", 1) + expected = 0 + for ch in payload: + expected ^= ord(ch) + return expected == int(cs_str[:2], 16) + except (ValueError, IndexError): + return False + + +def _nmea_coord(value: str, hemisphere: str) -> Optional[float]: + """Convert NMEA DDDMM.MMMMM + hemisphere to signed decimal degrees.""" + if not value: + return None + try: + # Latitude is DDMM, longitude is DDDMM + dot = value.index(".") + degrees = float(value[: dot - 2]) + minutes = float(value[dot - 2 :]) + decimal = degrees + minutes / 60.0 + if hemisphere.upper() in ("S", "W"): + decimal *= -1 + return round(decimal, 8) + except (ValueError, IndexError): + return None + + +def _to_float(value: str) -> Optional[float]: + try: + return float(value) if value else None + except ValueError: + return None + + +def _to_int(value: str) -> Optional[int]: + try: + return int(value) if value else None + except ValueError: + return None + + +def _parse_gga(fields: list) -> dict: + """Parse $xxGGA sentence fields into a dict.""" + # $xxGGA,time,lat,N/S,lon,E/W,quality,numSV,HDOP,alt,M,sep,M,... + try: + return { + "latitude": _nmea_coord(fields[2], fields[3]) if len(fields) > 3 else None, + "longitude": _nmea_coord(fields[4], fields[5]) if len(fields) > 5 else None, + "fix_quality": fields[6] if len(fields) > 6 else "0", + "satellites_used": _to_int(fields[7]) if len(fields) > 7 else None, + "hdop": _to_float(fields[8]) if len(fields) > 8 else None, + "altitude_m": _to_float(fields[9]) if len(fields) > 9 else None, + } + except Exception: + return {} + + +def _parse_rmc(fields: list) -> dict: + """Parse $xxRMC sentence fields into a dict.""" + # $xxRMC,time,status,lat,N/S,lon,E/W,speed_kn,course,date,... + try: + status = fields[2] if len(fields) > 2 else "V" + date_str = fields[9] if len(fields) > 9 else "" + time_str = fields[1] if len(fields) > 1 else "" + utc_dt = None + if len(date_str) == 6 and len(time_str) >= 6: + d, m, y = date_str[0:2], date_str[2:4], date_str[4:6] + h, mi, s = time_str[0:2], time_str[2:4], time_str[4:6] + year = 2000 + int(y) if int(y) < 80 else 1900 + int(y) + utc_dt = f"{year}-{m}-{d}T{h}:{mi}:{s}Z" + speed_kn = _to_float(fields[7]) if len(fields) > 7 else None + return { + "fix_valid": status == "A", + "speed_kmh": round(speed_kn * 1.852, 2) if speed_kn is not None else None, + "course_degrees": _to_float(fields[8]) if len(fields) > 8 else None, + "utc_datetime": utc_dt, + } + except Exception: + return {} + + +def _parse_gsa(fields: list) -> dict: + """Parse $xxGSA sentence fields into a dict.""" + # $xxGSA,mode,fixType,sv...,PDOP,HDOP,VDOP[,systemId] + try: + fix_type = fields[2] if len(fields) > 2 else "1" + # PDOP/HDOP/VDOP are at indices 15/16/17 (after 12 SV slots at 3-14) + pdop = _to_float(fields[15]) if len(fields) > 15 else None + hdop = _to_float(fields[16]) if len(fields) > 16 else None + # VDOP field may contain trailing checksum — strip it + vdop_raw = fields[17].split("*")[0] if len(fields) > 17 else "" + vdop = _to_float(vdop_raw) + return { + "fix_type": fix_type, + "pdop": pdop, + "hdop": hdop, + "vdop": vdop, + } + except Exception: + return {} + + +@SensorRegistry.register("nmea_gps") +class NmeaGpsSensor(SensorBase): + sensor_type = "nmea_gps" + + def __init__(self, name: str, config: Optional[Dict[str, Any]] = None, log=None): + super().__init__(name=name, config=config, log=log) + + self.device = self.settings.get("device", "/dev/serial0") + self.baud_rate = int(self.settings.get("baud_rate", 9600)) + self.read_timeout = float(self.settings.get("read_timeout_seconds", 3.0)) + + self.available = False + + if not self.ensure_python_modules([("serial", "pyserial")]): + return + + try: + import serial # type: ignore[import-not-found] + self._serial = serial + + # Verify port is accessible + port = serial.Serial(self.device, self.baud_rate, timeout=1.0) + port.close() + + self.available = True + self.log.info( + "NMEA GPS initialized (device=%s, baud=%d)", + self.device, + self.baud_rate, + ) + except Exception as exc: + self.log.warning("NMEA GPS init failed (%s): %s", self.device, exc) + + def _read(self) -> Dict[str, Any]: + """Read one sentence cycle from the GPS receiver and return parsed fields.""" + if not self.available: + raise RuntimeError("NMEA GPS not available") + + gga: Optional[list] = None + rmc: Optional[list] = None + gsa: Optional[list] = None + + deadline = time.monotonic() + self.read_timeout + + try: + port = self._serial.Serial(self.device, self.baud_rate, timeout=0.5) + except Exception as exc: + raise RuntimeError(f"NMEA GPS serial open failed: {exc}") from exc + + try: + while time.monotonic() < deadline: + try: + raw = port.readline() + except Exception: + continue + + try: + line = raw.decode("ascii", errors="replace").strip() + except Exception: + continue + + if not line.startswith("$") or not _checksum_valid(line): + continue + + # Sentence type is chars 3-5 (strip 2-char talker prefix, e.g. GN/GP/GL) + fields = line.split(",") + if len(fields[0]) < 6: + continue + sentence_type = fields[0][3:] # GGA, RMC, GSA, … + + if sentence_type == "GGA" and gga is None: + gga = fields + elif sentence_type == "RMC" and rmc is None: + rmc = fields + elif sentence_type == "GSA" and gsa is None: + gsa = fields + + if gga is not None and rmc is not None: + break # GSA is optional; stop as soon as we have the essentials + finally: + port.close() + + if gga is None and rmc is None: + raise RuntimeError( + f"NMEA GPS: no sentences received within {self.read_timeout}s" + ) + + gga_data = _parse_gga(gga) if gga else {} + rmc_data = _parse_rmc(rmc) if rmc else {} + gsa_data = _parse_gsa(gsa) if gsa else {} + + fix_valid = rmc_data.get("fix_valid", False) + fix_quality = _FIX_QUALITY.get(gga_data.get("fix_quality", "0"), "no fix") + fix_type = _GSA_FIX_TYPE.get(gsa_data.get("fix_type", "1"), "no fix") + + # Only report position/motion when fix is valid + latitude = gga_data.get("latitude") if fix_valid else None + longitude = gga_data.get("longitude") if fix_valid else None + altitude_m = gga_data.get("altitude_m") if fix_valid else None + + # HDOP: prefer GSA (averaged across all GNSS systems); fall back to GGA + hdop = gsa_data.get("hdop") or gga_data.get("hdop") + + return { + "fix_valid": fix_valid, + "fix_quality": fix_quality, + "fix_type": fix_type, + "latitude": latitude, + "longitude": longitude, + "altitude_m": altitude_m, + "speed_kmh": rmc_data.get("speed_kmh") if fix_valid else None, + "course_degrees": rmc_data.get("course_degrees") if fix_valid else None, + "hdop": hdop, + "pdop": gsa_data.get("pdop"), + "vdop": gsa_data.get("vdop"), + "satellites_used": gga_data.get("satellites_used"), + "utc_datetime": rmc_data.get("utc_datetime"), + } From f88d3c52bef75c2bdede1b1541f86c9635dde79c Mon Sep 17 00:00:00 2001 From: Zack Carlson Date: Mon, 18 May 2026 22:55:05 -0700 Subject: [PATCH 3/3] Revert "Add NMEA GPS sensor plug-in" This reverts commit 8b0607aa1cd08bef0e419008448f7c58f170705b. --- repeater/sensors/nmea_gps.py | 264 ----------------------------------- 1 file changed, 264 deletions(-) delete mode 100644 repeater/sensors/nmea_gps.py diff --git a/repeater/sensors/nmea_gps.py b/repeater/sensors/nmea_gps.py deleted file mode 100644 index 98f8c1c..0000000 --- a/repeater/sensors/nmea_gps.py +++ /dev/null @@ -1,264 +0,0 @@ -""" -NMEA GPS sensor plug-in. - -Reads NMEA 0183 sentences directly from a serial GPS receiver and exposes -fix status, position, motion, accuracy, and satellite fields as sensor -readings. - -The repeater's built-in GPS service must be disabled (gps.enabled: false in -config.yaml) when using this plug-in. Both cannot share the serial port -simultaneously. Set gps.api_fallback_to_config_location: true so the -repeater continues advertising the manually-configured lat/lon. - -Requires: pyserial (already installed with pyMC_Repeater) - -Config example: - - type: nmea_gps - name: "gps" - enabled: true - auto_install_packages: false - settings: - device: /dev/serial0 # Serial device path - baud_rate: 9600 # GPS baud rate (usually 9600 or 115200) - read_timeout_seconds: 3.0 # Max time to wait for a GGA+RMC sentence pair -""" - -from __future__ import annotations - -import time -from typing import Any, Dict, Optional - -from .base import SensorBase -from .registry import SensorRegistry - -_FIX_QUALITY = { - "0": "no fix", "1": "GPS", "2": "DGPS", - "4": "RTK fixed", "5": "RTK float", - "6": "estimated", "7": "manual", "8": "simulation", -} -_GSA_FIX_TYPE = {"1": "no fix", "2": "2D fix", "3": "3D fix"} - - -def _checksum_valid(sentence: str) -> bool: - if "*" not in sentence: - return True # no checksum present — accept - try: - payload, cs_str = sentence[1:].rsplit("*", 1) - expected = 0 - for ch in payload: - expected ^= ord(ch) - return expected == int(cs_str[:2], 16) - except (ValueError, IndexError): - return False - - -def _nmea_coord(value: str, hemisphere: str) -> Optional[float]: - """Convert NMEA DDDMM.MMMMM + hemisphere to signed decimal degrees.""" - if not value: - return None - try: - # Latitude is DDMM, longitude is DDDMM - dot = value.index(".") - degrees = float(value[: dot - 2]) - minutes = float(value[dot - 2 :]) - decimal = degrees + minutes / 60.0 - if hemisphere.upper() in ("S", "W"): - decimal *= -1 - return round(decimal, 8) - except (ValueError, IndexError): - return None - - -def _to_float(value: str) -> Optional[float]: - try: - return float(value) if value else None - except ValueError: - return None - - -def _to_int(value: str) -> Optional[int]: - try: - return int(value) if value else None - except ValueError: - return None - - -def _parse_gga(fields: list) -> dict: - """Parse $xxGGA sentence fields into a dict.""" - # $xxGGA,time,lat,N/S,lon,E/W,quality,numSV,HDOP,alt,M,sep,M,... - try: - return { - "latitude": _nmea_coord(fields[2], fields[3]) if len(fields) > 3 else None, - "longitude": _nmea_coord(fields[4], fields[5]) if len(fields) > 5 else None, - "fix_quality": fields[6] if len(fields) > 6 else "0", - "satellites_used": _to_int(fields[7]) if len(fields) > 7 else None, - "hdop": _to_float(fields[8]) if len(fields) > 8 else None, - "altitude_m": _to_float(fields[9]) if len(fields) > 9 else None, - } - except Exception: - return {} - - -def _parse_rmc(fields: list) -> dict: - """Parse $xxRMC sentence fields into a dict.""" - # $xxRMC,time,status,lat,N/S,lon,E/W,speed_kn,course,date,... - try: - status = fields[2] if len(fields) > 2 else "V" - date_str = fields[9] if len(fields) > 9 else "" - time_str = fields[1] if len(fields) > 1 else "" - utc_dt = None - if len(date_str) == 6 and len(time_str) >= 6: - d, m, y = date_str[0:2], date_str[2:4], date_str[4:6] - h, mi, s = time_str[0:2], time_str[2:4], time_str[4:6] - year = 2000 + int(y) if int(y) < 80 else 1900 + int(y) - utc_dt = f"{year}-{m}-{d}T{h}:{mi}:{s}Z" - speed_kn = _to_float(fields[7]) if len(fields) > 7 else None - return { - "fix_valid": status == "A", - "speed_kmh": round(speed_kn * 1.852, 2) if speed_kn is not None else None, - "course_degrees": _to_float(fields[8]) if len(fields) > 8 else None, - "utc_datetime": utc_dt, - } - except Exception: - return {} - - -def _parse_gsa(fields: list) -> dict: - """Parse $xxGSA sentence fields into a dict.""" - # $xxGSA,mode,fixType,sv...,PDOP,HDOP,VDOP[,systemId] - try: - fix_type = fields[2] if len(fields) > 2 else "1" - # PDOP/HDOP/VDOP are at indices 15/16/17 (after 12 SV slots at 3-14) - pdop = _to_float(fields[15]) if len(fields) > 15 else None - hdop = _to_float(fields[16]) if len(fields) > 16 else None - # VDOP field may contain trailing checksum — strip it - vdop_raw = fields[17].split("*")[0] if len(fields) > 17 else "" - vdop = _to_float(vdop_raw) - return { - "fix_type": fix_type, - "pdop": pdop, - "hdop": hdop, - "vdop": vdop, - } - except Exception: - return {} - - -@SensorRegistry.register("nmea_gps") -class NmeaGpsSensor(SensorBase): - sensor_type = "nmea_gps" - - def __init__(self, name: str, config: Optional[Dict[str, Any]] = None, log=None): - super().__init__(name=name, config=config, log=log) - - self.device = self.settings.get("device", "/dev/serial0") - self.baud_rate = int(self.settings.get("baud_rate", 9600)) - self.read_timeout = float(self.settings.get("read_timeout_seconds", 3.0)) - - self.available = False - - if not self.ensure_python_modules([("serial", "pyserial")]): - return - - try: - import serial # type: ignore[import-not-found] - self._serial = serial - - # Verify port is accessible - port = serial.Serial(self.device, self.baud_rate, timeout=1.0) - port.close() - - self.available = True - self.log.info( - "NMEA GPS initialized (device=%s, baud=%d)", - self.device, - self.baud_rate, - ) - except Exception as exc: - self.log.warning("NMEA GPS init failed (%s): %s", self.device, exc) - - def _read(self) -> Dict[str, Any]: - """Read one sentence cycle from the GPS receiver and return parsed fields.""" - if not self.available: - raise RuntimeError("NMEA GPS not available") - - gga: Optional[list] = None - rmc: Optional[list] = None - gsa: Optional[list] = None - - deadline = time.monotonic() + self.read_timeout - - try: - port = self._serial.Serial(self.device, self.baud_rate, timeout=0.5) - except Exception as exc: - raise RuntimeError(f"NMEA GPS serial open failed: {exc}") from exc - - try: - while time.monotonic() < deadline: - try: - raw = port.readline() - except Exception: - continue - - try: - line = raw.decode("ascii", errors="replace").strip() - except Exception: - continue - - if not line.startswith("$") or not _checksum_valid(line): - continue - - # Sentence type is chars 3-5 (strip 2-char talker prefix, e.g. GN/GP/GL) - fields = line.split(",") - if len(fields[0]) < 6: - continue - sentence_type = fields[0][3:] # GGA, RMC, GSA, … - - if sentence_type == "GGA" and gga is None: - gga = fields - elif sentence_type == "RMC" and rmc is None: - rmc = fields - elif sentence_type == "GSA" and gsa is None: - gsa = fields - - if gga is not None and rmc is not None: - break # GSA is optional; stop as soon as we have the essentials - finally: - port.close() - - if gga is None and rmc is None: - raise RuntimeError( - f"NMEA GPS: no sentences received within {self.read_timeout}s" - ) - - gga_data = _parse_gga(gga) if gga else {} - rmc_data = _parse_rmc(rmc) if rmc else {} - gsa_data = _parse_gsa(gsa) if gsa else {} - - fix_valid = rmc_data.get("fix_valid", False) - fix_quality = _FIX_QUALITY.get(gga_data.get("fix_quality", "0"), "no fix") - fix_type = _GSA_FIX_TYPE.get(gsa_data.get("fix_type", "1"), "no fix") - - # Only report position/motion when fix is valid - latitude = gga_data.get("latitude") if fix_valid else None - longitude = gga_data.get("longitude") if fix_valid else None - altitude_m = gga_data.get("altitude_m") if fix_valid else None - - # HDOP: prefer GSA (averaged across all GNSS systems); fall back to GGA - hdop = gsa_data.get("hdop") or gga_data.get("hdop") - - return { - "fix_valid": fix_valid, - "fix_quality": fix_quality, - "fix_type": fix_type, - "latitude": latitude, - "longitude": longitude, - "altitude_m": altitude_m, - "speed_kmh": rmc_data.get("speed_kmh") if fix_valid else None, - "course_degrees": rmc_data.get("course_degrees") if fix_valid else None, - "hdop": hdop, - "pdop": gsa_data.get("pdop"), - "vdop": gsa_data.get("vdop"), - "satellites_used": gga_data.get("satellites_used"), - "utc_datetime": rmc_data.get("utc_datetime"), - }