mirror of
https://github.com/pyMC-dev/pyMC_Repeater.git
synced 2026-06-26 13:01:06 +02:00
fix: add generic PyMC modem stats sensor
This commit is contained in:
@@ -139,6 +139,7 @@ gps:
|
||||
# Source type:
|
||||
# serial = read directly from an attached GPS module
|
||||
# file = read NMEA lines from source_path (useful for gpsd/sidecar bridges)
|
||||
# modem_http = poll a modem's generic HTTP /api/stats GPS JSON
|
||||
source: serial
|
||||
|
||||
# Serial source settings (used when source: serial)
|
||||
@@ -153,6 +154,17 @@ gps:
|
||||
source_path: "/var/lib/pymc_repeater/gps_nmea.txt"
|
||||
poll_interval_seconds: 2.0
|
||||
|
||||
# Modem HTTP source settings (used when source: modem_http)
|
||||
# This is intentionally modem-generic. Today it can point at the Photon
|
||||
# modem's HTTP API; later it can point at another board such as Heltec V4
|
||||
# if that modem exposes the same GPS JSON shape.
|
||||
host: ""
|
||||
port: 80
|
||||
endpoint: "/api/stats"
|
||||
scheme: "http"
|
||||
username: "admin"
|
||||
password: null
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Location behaviour
|
||||
# Three independent controls — read the comments carefully, they do
|
||||
@@ -276,6 +288,21 @@ sensors:
|
||||
# bus_number: 1 # I2C bus number (1 for Raspberry Pi default)
|
||||
# low_cell_mv: 3150 # Per-cell warning threshold in mV
|
||||
|
||||
# Example pyMC modem sensor. This reads a networked pyMC modem's
|
||||
# HTTP /api/stats JSON API for modem diagnostics, including GPS fields
|
||||
# when the modem exposes them. If password is set to the same value as
|
||||
# pymc_tcp.token, the repeater can use one shared modem secret.
|
||||
# - type: pymc_modem
|
||||
# name: modem
|
||||
# enabled: true
|
||||
# settings:
|
||||
# host: "pymc-modem.local" # or the modem LAN IP
|
||||
# port: 80
|
||||
# endpoint: "/api/stats"
|
||||
# username: "admin"
|
||||
# password: "" # modem HTTP password; may match pymc_tcp.token
|
||||
# timeout_seconds: 2.0
|
||||
|
||||
|
||||
|
||||
# Mesh Network Configuration
|
||||
|
||||
@@ -214,6 +214,12 @@ def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
|
||||
"baud_rate": 9600,
|
||||
"read_timeout_seconds": 1.0,
|
||||
"reconnect_interval_seconds": 5.0,
|
||||
"host": "",
|
||||
"port": 80,
|
||||
"endpoint": "/api/stats",
|
||||
"scheme": "http",
|
||||
"username": "admin",
|
||||
"password": None,
|
||||
"stale_after_seconds": 10.0,
|
||||
"retain_sentences": 25,
|
||||
"validate_checksum": True,
|
||||
|
||||
@@ -14,6 +14,9 @@ import logging
|
||||
import math
|
||||
import threading
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from base64 import b64encode
|
||||
from collections import Counter, deque
|
||||
from copy import deepcopy
|
||||
from datetime import datetime, timezone
|
||||
@@ -60,6 +63,20 @@ def _to_int(value: Any) -> Optional[int]:
|
||||
return None
|
||||
|
||||
|
||||
def _to_bool(value: Any) -> Optional[bool]:
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, bool):
|
||||
return value
|
||||
if isinstance(value, str):
|
||||
lowered = value.strip().lower()
|
||||
if lowered in {"1", "true", "yes", "on"}:
|
||||
return True
|
||||
if lowered in {"0", "false", "no", "off"}:
|
||||
return False
|
||||
return bool(value)
|
||||
|
||||
|
||||
def _is_valid_latitude(value: Optional[float]) -> bool:
|
||||
return value is not None and -90.0 <= value <= 90.0
|
||||
|
||||
@@ -264,6 +281,87 @@ class NMEAParser:
|
||||
for line in lines:
|
||||
self.ingest_sentence(line)
|
||||
|
||||
def ingest_modem_payload(self, payload: Dict[str, Any]) -> bool:
|
||||
"""Ingest a generic pyMC modem HTTP GPS payload.
|
||||
|
||||
This is for modems that expose parsed GPS JSON instead of raw NMEA.
|
||||
The payload may be the whole /api/stats response or the nested gps dict.
|
||||
"""
|
||||
if not isinstance(payload, dict):
|
||||
return False
|
||||
|
||||
gps = payload.get("gps") if isinstance(payload.get("gps"), dict) else payload
|
||||
position = self._first_dict(
|
||||
gps.get("position"), gps.get("gps_position"), gps.get("location"), payload.get("position")
|
||||
)
|
||||
fix = self._first_dict(gps.get("fix"), payload.get("fix"))
|
||||
satellites = self._first_dict(gps.get("satellites"), payload.get("satellites"))
|
||||
time_data = self._first_dict(gps.get("time"), gps.get("time_data"), payload.get("time_data"))
|
||||
motion = self._first_dict(gps.get("motion"), payload.get("motion"))
|
||||
|
||||
latitude = _to_float(position.get("latitude"))
|
||||
longitude = _to_float(position.get("longitude"))
|
||||
fix_valid = _to_bool(fix.get("valid", gps.get("fix_valid")))
|
||||
if fix_valid is None:
|
||||
fix_valid = _is_valid_latitude(latitude) and _is_valid_longitude(longitude)
|
||||
|
||||
with self._lock:
|
||||
now = time.time()
|
||||
self.last_update = now
|
||||
self.last_error = None
|
||||
self.nmea["last_sentence"] = None
|
||||
self.nmea["last_sentence_type"] = "MODEM_HTTP"
|
||||
self.nmea["last_talker"] = "MODEM"
|
||||
self.nmea["seen_sentence_types"] = ["MODEM_HTTP"]
|
||||
self._sentence_counters["MODEM_HTTP"] += 1
|
||||
self.nmea["sentence_counters"] = dict(self._sentence_counters)
|
||||
|
||||
if latitude is not None:
|
||||
self.position["latitude"] = latitude
|
||||
if longitude is not None:
|
||||
self.position["longitude"] = longitude
|
||||
altitude = _to_float(position.get("altitude_m"))
|
||||
if altitude is not None:
|
||||
self.position["altitude_m"] = altitude
|
||||
|
||||
self.fix["valid"] = bool(fix_valid)
|
||||
quality = _to_int(fix.get("quality", gps.get("fix_quality")))
|
||||
if quality is not None:
|
||||
self.fix["quality"] = quality
|
||||
self.fix["quality_label"] = FIX_QUALITY_LABELS.get(quality, f"quality {quality}")
|
||||
elif fix_valid:
|
||||
self.fix["quality_label"] = "modem valid"
|
||||
|
||||
used = _to_int(satellites.get("used_count", satellites.get("satellites_used")))
|
||||
if used is not None:
|
||||
self.satellites["used_count"] = used
|
||||
in_view = _to_int(satellites.get("in_view_count", satellites.get("satellites_in_view")))
|
||||
if in_view is not None:
|
||||
self.satellites["in_view_count"] = in_view
|
||||
|
||||
for key in ("datetime_utc", "utc_time", "date"):
|
||||
value = time_data.get(key) or payload.get(key)
|
||||
if value:
|
||||
self.time_data[key] = str(value)
|
||||
|
||||
speed_kmh = _to_float(motion.get("speed_kmh", payload.get("speed_kmh")))
|
||||
if speed_kmh is not None:
|
||||
self.motion["speed_kmh"] = speed_kmh
|
||||
self.motion["speed_knots"] = round(speed_kmh / 1.852, 3)
|
||||
course = _to_float(motion.get("course_degrees", payload.get("course_degrees")))
|
||||
if course is not None:
|
||||
self.motion["course_degrees"] = course
|
||||
|
||||
self.raw_attributes["MODEM_HTTP"] = {
|
||||
"gps_enabled": gps.get("enabled"),
|
||||
"gps_seen": gps.get("seen"),
|
||||
"battery_voltage_mv": payload.get("battery_voltage_mv"),
|
||||
"battery_voltage_v": payload.get("battery_voltage_v"),
|
||||
"solar_charge_rate_percent_per_hour": payload.get("solar_charge_rate_percent_per_hour"),
|
||||
}
|
||||
self._refresh_derived_unlocked()
|
||||
return True
|
||||
|
||||
def ingest_sentence(self, sentence: str) -> bool:
|
||||
parsed = self._split_sentence(sentence)
|
||||
with self._lock:
|
||||
@@ -513,6 +611,13 @@ class NMEAParser:
|
||||
def _field(fields: List[str], index: int) -> str:
|
||||
return fields[index] if index < len(fields) else ""
|
||||
|
||||
@staticmethod
|
||||
def _first_dict(*values: Any) -> Dict[str, Any]:
|
||||
for value in values:
|
||||
if isinstance(value, dict):
|
||||
return value
|
||||
return {}
|
||||
|
||||
def _normalize_raw_attributes_unlocked(self):
|
||||
if self._unhandled_sentence_types:
|
||||
self.raw_attributes["unhandled_sentence_types"] = sorted(self._unhandled_sentence_types)
|
||||
@@ -617,6 +722,14 @@ class GPSService:
|
||||
self.reconnect_interval_seconds = float(gps_config.get("reconnect_interval_seconds", 5.0))
|
||||
self.poll_interval_seconds = float(gps_config.get("poll_interval_seconds", 2.0))
|
||||
self.source_path = gps_config.get("source_path") or gps_config.get("snapshot_path")
|
||||
self.modem_http_host = str(gps_config.get("host", "") or "").strip()
|
||||
self.modem_http_port = int(gps_config.get("port", 80))
|
||||
self.modem_http_endpoint = str(gps_config.get("endpoint", "/api/stats") or "/api/stats")
|
||||
if not self.modem_http_endpoint.startswith("/"):
|
||||
self.modem_http_endpoint = "/" + self.modem_http_endpoint
|
||||
self.modem_http_scheme = str(gps_config.get("scheme", "http") or "http")
|
||||
self.modem_http_username = str(gps_config.get("username", "admin") or "admin")
|
||||
self.modem_http_password = gps_config.get("password")
|
||||
self.repeater_config = repeater_config
|
||||
self.time_sync_enabled = bool(gps_config.get("time_sync_enabled", True))
|
||||
self.time_sync_interval_seconds = max(
|
||||
@@ -687,7 +800,12 @@ class GPSService:
|
||||
if self._thread and self._thread.is_alive():
|
||||
return
|
||||
|
||||
target = self._run_file_loop if self.source == "file" else self._run_serial_loop
|
||||
if self.source == "file":
|
||||
target = self._run_file_loop
|
||||
elif self.source in ("modem_http", "pymc_modem", "http"):
|
||||
target = self._run_modem_http_loop
|
||||
else:
|
||||
target = self._run_serial_loop
|
||||
self._stop_event.clear()
|
||||
self._thread = threading.Thread(target=target, name="gps-service", daemon=True)
|
||||
self._thread.start()
|
||||
@@ -800,6 +918,9 @@ class GPSService:
|
||||
"device": self.device if self.source == "serial" else None,
|
||||
"baud_rate": self.baud_rate if self.source == "serial" else None,
|
||||
"source_path": self.source_path if self.source == "file" else None,
|
||||
"host": self.modem_http_host if self.source in ("modem_http", "pymc_modem", "http") else None,
|
||||
"port": self.modem_http_port if self.source in ("modem_http", "pymc_modem", "http") else None,
|
||||
"endpoint": self.modem_http_endpoint if self.source in ("modem_http", "pymc_modem", "http") else None,
|
||||
"read_timeout_seconds": self.read_timeout_seconds,
|
||||
"poll_interval_seconds": self.poll_interval_seconds,
|
||||
"stale_after_seconds": self.parser.stale_after_seconds,
|
||||
@@ -1165,6 +1286,45 @@ class GPSService:
|
||||
|
||||
self._running = False
|
||||
|
||||
def _run_modem_http_loop(self):
|
||||
if not self.modem_http_host:
|
||||
self._set_source_error("gps.host is required for modem_http source")
|
||||
self._running = False
|
||||
return
|
||||
|
||||
url = f"{self.modem_http_scheme}://{self.modem_http_host}:{self.modem_http_port}{self.modem_http_endpoint}"
|
||||
while not self._stop_event.is_set():
|
||||
request = urllib.request.Request(url, headers={"Accept": "application/json"})
|
||||
if self.modem_http_password not in (None, ""):
|
||||
raw_auth = f"{self.modem_http_username}:{self.modem_http_password}".encode("utf-8")
|
||||
request.add_header(
|
||||
"Authorization", "Basic " + b64encode(raw_auth).decode("ascii")
|
||||
)
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(request, timeout=self.read_timeout_seconds) as response:
|
||||
status = int(getattr(response, "status", 200) or 200)
|
||||
body = response.read()
|
||||
if status < 200 or status >= 300:
|
||||
raise RuntimeError(f"modem HTTP {status} reading {url}")
|
||||
payload = json.loads(body.decode("utf-8"))
|
||||
if not isinstance(payload, dict):
|
||||
raise RuntimeError("modem GPS response was not a JSON object")
|
||||
if not self.parser.ingest_modem_payload(payload):
|
||||
raise RuntimeError("modem GPS response did not contain a usable GPS payload")
|
||||
self._set_source_error(None)
|
||||
self._maybe_sync_system_time()
|
||||
self._maybe_update_repeater_location()
|
||||
except urllib.error.HTTPError as exc:
|
||||
self._set_source_error(f"modem HTTP {exc.code} reading {url}")
|
||||
except urllib.error.URLError as exc:
|
||||
self._set_source_error(f"modem GPS request failed: {exc.reason}")
|
||||
except Exception as exc:
|
||||
self._set_source_error(f"{type(exc).__name__}: {exc}")
|
||||
self._stop_event.wait(self.poll_interval_seconds)
|
||||
|
||||
self._running = False
|
||||
|
||||
@staticmethod
|
||||
def _extract_file_sentences(content: str) -> List[str]:
|
||||
stripped = content.strip()
|
||||
|
||||
@@ -0,0 +1,154 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from typing import Any, Dict, Optional
|
||||
from urllib.parse import urljoin
|
||||
|
||||
from .base import SensorBase
|
||||
from .registry import SensorRegistry
|
||||
|
||||
|
||||
@SensorRegistry.register("pymc_modem")
|
||||
class PymcModemSensor(SensorBase):
|
||||
"""Read diagnostics exposed by a pyMC modem HTTP API."""
|
||||
|
||||
sensor_type = "pymc_modem"
|
||||
|
||||
def __init__(self, name: str, config: Optional[Dict[str, Any]] = None, log=None):
|
||||
super().__init__(name=name, config=config, log=log)
|
||||
self.timeout_seconds = float(self.settings.get("timeout_seconds", 2.0))
|
||||
self.endpoint = str(self.settings.get("endpoint", "/api/stats") or "/api/stats")
|
||||
self.url = self._build_url()
|
||||
self.username = str(self.settings.get("username", "admin") or "admin")
|
||||
self.password = self.settings.get("password")
|
||||
|
||||
def _build_url(self) -> str:
|
||||
base_url = self.settings.get("base_url")
|
||||
if base_url:
|
||||
base = str(base_url).rstrip("/") + "/"
|
||||
return urljoin(base, self.endpoint.lstrip("/"))
|
||||
|
||||
host = str(self.settings.get("host", "") or "").strip()
|
||||
if not host:
|
||||
raise ValueError("pymc_modem requires settings.host or settings.base_url")
|
||||
scheme = str(self.settings.get("scheme", "http") or "http")
|
||||
port = self.settings.get("port")
|
||||
netloc = host
|
||||
if port not in (None, ""):
|
||||
netloc = f"{host}:{int(port)}"
|
||||
return f"{scheme}://{netloc}{self.endpoint if self.endpoint.startswith('/') else '/' + self.endpoint}"
|
||||
|
||||
def _read(self) -> Dict[str, Any]:
|
||||
request = urllib.request.Request(self.url, headers={"Accept": "application/json"})
|
||||
if self.password not in (None, ""):
|
||||
raw = f"{self.username}:{self.password}".encode("utf-8")
|
||||
request.add_header("Authorization", "Basic " + base64.b64encode(raw).decode("ascii"))
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(request, timeout=self.timeout_seconds) as response:
|
||||
status = int(getattr(response, "status", 200) or 200)
|
||||
body = response.read()
|
||||
except urllib.error.HTTPError as exc:
|
||||
raise RuntimeError(f"pyMC modem HTTP {exc.code} reading {self.url}") from exc
|
||||
except urllib.error.URLError as exc:
|
||||
raise RuntimeError(f"pyMC modem request failed: {exc.reason}") from exc
|
||||
|
||||
if status < 200 or status >= 300:
|
||||
raise RuntimeError(f"pyMC modem HTTP {status} reading {self.url}")
|
||||
|
||||
try:
|
||||
payload = json.loads(body.decode("utf-8"))
|
||||
except Exception as exc:
|
||||
raise RuntimeError("pyMC modem response was not valid JSON") from exc
|
||||
if not isinstance(payload, dict):
|
||||
raise RuntimeError("pyMC modem response was not a JSON object")
|
||||
|
||||
return self._normalize_payload(payload)
|
||||
|
||||
def _normalize_payload(self, payload: Dict[str, Any]) -> Dict[str, Any]:
|
||||
raw_gps = payload.get("gps")
|
||||
gps: Dict[str, Any] = raw_gps if isinstance(raw_gps, dict) else {}
|
||||
position = self._first_dict(
|
||||
gps.get("position"),
|
||||
gps.get("gps_position"),
|
||||
gps.get("location"),
|
||||
payload.get("gps_position"),
|
||||
payload.get("position"),
|
||||
payload,
|
||||
)
|
||||
fix = self._first_dict(gps.get("fix"), payload.get("fix"))
|
||||
satellites = self._first_dict(gps.get("satellites"), payload.get("satellites"))
|
||||
time_data = self._first_dict(gps.get("time"), gps.get("time_data"), payload.get("time_data"))
|
||||
motion = self._first_dict(gps.get("motion"), payload.get("motion"))
|
||||
|
||||
out: Dict[str, Any] = {
|
||||
"source": "pymc_modem",
|
||||
"url": self.url,
|
||||
"gps_enabled": self._bool_or_none(gps.get("enabled")),
|
||||
"gps_seen": self._bool_or_none(gps.get("seen")),
|
||||
"latitude": self._float(position.get("latitude")),
|
||||
"longitude": self._float(position.get("longitude")),
|
||||
"altitude_m": self._float(position.get("altitude_m")),
|
||||
"fix_valid": self._bool_or_none(fix.get("valid")),
|
||||
"fix_quality": self._int(fix.get("quality")),
|
||||
"satellites_used": self._int(
|
||||
satellites.get("used_count", satellites.get("satellites_used"))
|
||||
),
|
||||
"satellites_in_view": self._int(
|
||||
satellites.get("in_view_count", satellites.get("satellites_in_view"))
|
||||
),
|
||||
"datetime_utc": time_data.get("datetime_utc") or payload.get("datetime_utc"),
|
||||
"speed_kmh": self._float(motion.get("speed_kmh", payload.get("speed_kmh"))),
|
||||
"course_degrees": self._float(
|
||||
motion.get("course_degrees", payload.get("course_degrees"))
|
||||
),
|
||||
}
|
||||
|
||||
for key in ("battery_voltage_mv", "battery_voltage_v", "solar_charge_rate_percent_per_hour"):
|
||||
if key in payload:
|
||||
out[key] = payload[key]
|
||||
|
||||
return {key: value for key, value in out.items() if value is not None}
|
||||
|
||||
@staticmethod
|
||||
def _first_dict(*values: Any) -> Dict[str, Any]:
|
||||
for value in values:
|
||||
if isinstance(value, dict):
|
||||
return value
|
||||
return {}
|
||||
|
||||
@staticmethod
|
||||
def _float(value: Any) -> Optional[float]:
|
||||
if value in (None, ""):
|
||||
return None
|
||||
try:
|
||||
result = float(value)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _int(value: Any) -> Optional[int]:
|
||||
if value in (None, ""):
|
||||
return None
|
||||
try:
|
||||
return int(value)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _bool_or_none(value: Any) -> Optional[bool]:
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, bool):
|
||||
return value
|
||||
if isinstance(value, str):
|
||||
lowered = value.strip().lower()
|
||||
if lowered in {"1", "true", "yes", "on"}:
|
||||
return True
|
||||
if lowered in {"0", "false", "no", "off"}:
|
||||
return False
|
||||
return bool(value)
|
||||
@@ -1,4 +1,5 @@
|
||||
import importlib.util
|
||||
import json
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
@@ -111,6 +112,85 @@ def test_gps_service_file_source_reads_nmea_lines(tmp_path):
|
||||
assert snapshot["satellites"]["used_count"] == 5
|
||||
|
||||
|
||||
def test_gps_service_modem_http_source_reads_generic_modem_gps(monkeypatch):
|
||||
class _Response:
|
||||
status = 200
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *_args):
|
||||
return False
|
||||
|
||||
def read(self):
|
||||
return json.dumps(
|
||||
{
|
||||
"battery_voltage_mv": 4112,
|
||||
"gps": {
|
||||
"fix": {"valid": True, "quality": 1},
|
||||
"position": {
|
||||
"latitude": 42.360082,
|
||||
"longitude": -71.05888,
|
||||
"altitude_m": 12.5,
|
||||
},
|
||||
"satellites": {"used_count": 9, "in_view_count": 14},
|
||||
"time": {"datetime_utc": "2026-06-14T18:25:30+00:00"},
|
||||
},
|
||||
}
|
||||
).encode("utf-8")
|
||||
|
||||
captured = {}
|
||||
|
||||
def _urlopen(request, timeout=None):
|
||||
captured["url"] = request.full_url
|
||||
captured["auth"] = request.headers.get("Authorization")
|
||||
captured["timeout"] = timeout
|
||||
return _Response()
|
||||
|
||||
monkeypatch.setattr(_MODULE.urllib.request, "urlopen", _urlopen)
|
||||
|
||||
service = GPSService(
|
||||
{
|
||||
"gps": {
|
||||
"enabled": True,
|
||||
"source": "modem_http",
|
||||
"host": "192.168.30.114",
|
||||
"port": 80,
|
||||
"endpoint": "/api/stats",
|
||||
"username": "admin",
|
||||
"password": "password",
|
||||
"read_timeout_seconds": 1.0,
|
||||
"poll_interval_seconds": 0.05,
|
||||
"stale_after_seconds": 5.0,
|
||||
"time_sync_enabled": False,
|
||||
}
|
||||
}
|
||||
)
|
||||
service.start()
|
||||
try:
|
||||
deadline = time.time() + 1.0
|
||||
snapshot = service.get_snapshot()
|
||||
while snapshot["status"]["state"] == "no_data" and time.time() < deadline:
|
||||
time.sleep(0.05)
|
||||
snapshot = service.get_snapshot()
|
||||
finally:
|
||||
service.stop()
|
||||
|
||||
assert captured["url"] == "http://192.168.30.114:80/api/stats"
|
||||
assert captured["auth"].startswith("Basic ")
|
||||
assert captured["timeout"] == 1.0
|
||||
assert snapshot["enabled"] is True
|
||||
assert snapshot["source"]["type"] == "modem_http"
|
||||
assert snapshot["status"]["state"] == "valid_fix"
|
||||
assert snapshot["position"]["latitude"] == 42.360082
|
||||
assert snapshot["position"]["longitude"] == -71.05888
|
||||
assert snapshot["position"]["altitude_m"] == 12.5
|
||||
assert snapshot["fix"]["quality"] == 1
|
||||
assert snapshot["satellites"]["used_count"] == 9
|
||||
assert snapshot["satellites"]["in_view_count"] == 14
|
||||
assert snapshot["time"]["datetime_utc"] == "2026-06-14T18:25:30+00:00"
|
||||
|
||||
|
||||
def test_rmc_only_fix_has_non_conflicting_quality_label():
|
||||
parser = NMEAParser()
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
@@ -16,6 +17,7 @@ from repeater.sensors import waveshare_ups_e as waveshare_ups_e_module
|
||||
from repeater.sensors.ens210 import ENS210Sensor
|
||||
from repeater.sensors.ina219 import INA219Sensor
|
||||
from repeater.sensors.lafvin_ups_3s import LafvinUps3sSensor
|
||||
from repeater.sensors.pymc_modem import PymcModemSensor
|
||||
from repeater.sensors.shtc3 import SHTC3Sensor
|
||||
from repeater.sensors.waveshare_ups_d import WaveshareUpsDSensor
|
||||
from repeater.sensors.waveshare_ups_e import WaveshareUpsESensor
|
||||
@@ -145,6 +147,118 @@ def test_hardware_stats_sensor_reads_from_collector(monkeypatch):
|
||||
assert reading["data"] == {"cpu": {"usage_percent": 42.0}}
|
||||
|
||||
|
||||
def test_pymc_modem_sensor_reads_modem_stats(monkeypatch):
|
||||
class _Response:
|
||||
status = 200
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *_args):
|
||||
return False
|
||||
|
||||
def read(self):
|
||||
return json.dumps(
|
||||
{
|
||||
"battery_voltage_mv": 4112,
|
||||
"battery_voltage_v": 4.112,
|
||||
"gps": {
|
||||
"fix": {"valid": True, "quality": 1},
|
||||
"position": {
|
||||
"latitude": 42.360082,
|
||||
"longitude": -71.05888,
|
||||
"altitude_m": 12.5,
|
||||
},
|
||||
"satellites": {"used_count": 9, "in_view_count": 14},
|
||||
"time": {"datetime_utc": "2026-06-14T18:25:30+00:00"},
|
||||
},
|
||||
}
|
||||
).encode()
|
||||
|
||||
captured = {}
|
||||
|
||||
def _urlopen(request, timeout=None):
|
||||
captured["url"] = request.full_url
|
||||
captured["auth"] = request.headers.get("Authorization")
|
||||
captured["timeout"] = timeout
|
||||
return _Response()
|
||||
|
||||
import repeater.sensors.pymc_modem as pymc_modem_module
|
||||
|
||||
monkeypatch.setattr(pymc_modem_module.urllib.request, "urlopen", _urlopen)
|
||||
|
||||
reading = PymcModemSensor(
|
||||
"modem",
|
||||
{
|
||||
"settings": {
|
||||
"host": "192.168.0.205",
|
||||
"password": "secret-token",
|
||||
"timeout_seconds": 3.5,
|
||||
}
|
||||
},
|
||||
).read()
|
||||
|
||||
assert reading["ok"] is True
|
||||
assert captured["url"] == "http://192.168.0.205/api/stats"
|
||||
assert captured["auth"].startswith("Basic ")
|
||||
assert captured["timeout"] == 3.5
|
||||
assert reading["data"]["source"] == "pymc_modem"
|
||||
assert reading["data"]["latitude"] == 42.360082
|
||||
assert reading["data"]["longitude"] == -71.05888
|
||||
assert reading["data"]["altitude_m"] == 12.5
|
||||
assert reading["data"]["fix_valid"] is True
|
||||
assert reading["data"]["fix_quality"] == 1
|
||||
assert reading["data"]["satellites_used"] == 9
|
||||
assert reading["data"]["satellites_in_view"] == 14
|
||||
assert reading["data"]["datetime_utc"] == "2026-06-14T18:25:30+00:00"
|
||||
assert reading["data"]["battery_voltage_mv"] == 4112
|
||||
|
||||
|
||||
def test_pymc_modem_sensor_accepts_stats_without_gps_coordinates(monkeypatch):
|
||||
class _Response:
|
||||
status = 200
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *_args):
|
||||
return False
|
||||
|
||||
def read(self):
|
||||
return json.dumps(
|
||||
{
|
||||
"battery_voltage_mv": 3681,
|
||||
"battery_voltage_v": 3.681,
|
||||
"solar_charge_rate_percent_per_hour": 9.568,
|
||||
"gps": {"enabled": True, "seen": False, "fix": {"valid": False}},
|
||||
}
|
||||
).encode("utf-8")
|
||||
|
||||
import repeater.sensors.pymc_modem as pymc_modem_module
|
||||
|
||||
monkeypatch.setattr(
|
||||
pymc_modem_module.urllib.request,
|
||||
"urlopen",
|
||||
lambda *_args, **_kwargs: _Response(),
|
||||
)
|
||||
|
||||
reading = PymcModemSensor(
|
||||
"modem",
|
||||
{"settings": {"base_url": "http://pymc-modem.local"}},
|
||||
).read()
|
||||
|
||||
assert reading["ok"] is True
|
||||
assert reading["data"]["source"] == "pymc_modem"
|
||||
assert reading["data"]["battery_voltage_mv"] == 3681
|
||||
assert reading["data"]["battery_voltage_v"] == 3.681
|
||||
assert reading["data"]["solar_charge_rate_percent_per_hour"] == 9.568
|
||||
assert reading["data"]["gps_enabled"] is True
|
||||
assert reading["data"]["gps_seen"] is False
|
||||
assert reading["data"]["fix_valid"] is False
|
||||
assert reading["data"].get("latitude") is None
|
||||
assert reading["data"].get("longitude") is None
|
||||
|
||||
|
||||
def test_ina219_sensor_reads_voltage_current_and_power(monkeypatch):
|
||||
class _Bus:
|
||||
def __init__(self, bus_number):
|
||||
|
||||
Reference in New Issue
Block a user