mirror of
https://github.com/pyMC-dev/pyMC_Repeater.git
synced 2026-06-12 17:24:48 +02:00
45a44eb47b
- Updated byte representations in tests to use lowercase hex format for consistency. - Reformatted code for better readability, including line breaks and indentation adjustments. - Consolidated multiple lines into single lines where appropriate to enhance clarity. - Ensured that all test cases maintain consistent formatting and style across the test suite.
177 lines
6.4 KiB
Python
177 lines
6.4 KiB
Python
from __future__ import annotations
|
|
|
|
import importlib
|
|
import logging
|
|
import threading
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from .registry import SensorRegistry
|
|
|
|
|
|
class SensorManager:
|
|
"""Load and read sensor plug-ins declared in config with background polling."""
|
|
|
|
def __init__(
|
|
self,
|
|
config: Dict[str, Any],
|
|
*,
|
|
log: Optional[logging.Logger] = None,
|
|
registry: type[SensorRegistry] = SensorRegistry,
|
|
):
|
|
self.config = config if isinstance(config, dict) else {}
|
|
self.log = log or logging.getLogger(self.__class__.__name__)
|
|
self.registry = registry
|
|
self.sensors = []
|
|
|
|
# Background polling
|
|
self._poll_thread: Optional[threading.Thread] = None
|
|
self._stop_event = threading.Event()
|
|
self._latest_readings: List[Dict[str, Any]] = []
|
|
self._readings_lock = threading.RLock()
|
|
self._running = False
|
|
|
|
self.reload()
|
|
|
|
def _get_sensor_definitions(self) -> List[Dict[str, Any]]:
|
|
"""Extract sensor definitions from config."""
|
|
section = self.config.get("sensors", {})
|
|
if not isinstance(section, dict):
|
|
return []
|
|
global_auto_install = bool(section.get("auto_install_packages", False))
|
|
definitions = section.get("definitions") or section.get("sensors")
|
|
if not isinstance(definitions, list):
|
|
return []
|
|
out: List[Dict[str, Any]] = []
|
|
for d in definitions:
|
|
if not isinstance(d, dict):
|
|
continue
|
|
entry = dict(d)
|
|
entry.setdefault("auto_install_packages", global_auto_install)
|
|
out.append(entry)
|
|
return out
|
|
|
|
def _load_sensor_module(self, sensor_type: str) -> None:
|
|
"""Import sensor module so @SensorRegistry.register decorators execute."""
|
|
module_name = str(sensor_type).strip().lower().replace("-", "_")
|
|
package_name = __name__.rsplit(".", 1)[0]
|
|
importlib.import_module(f"{package_name}.{module_name}")
|
|
|
|
def reload(self) -> None:
|
|
self.sensors = []
|
|
for definition in self._get_sensor_definitions():
|
|
if not definition.get("enabled", True):
|
|
continue
|
|
|
|
sensor_type = definition.get("type")
|
|
name = definition.get("name") or str(sensor_type or "sensor")
|
|
if not sensor_type:
|
|
self.log.warning("Skipping sensor definition %r: missing type", name)
|
|
continue
|
|
|
|
try:
|
|
self._load_sensor_module(sensor_type)
|
|
sensor = self.registry.create(sensor_type, name=name, config=definition)
|
|
except Exception as exc:
|
|
self.log.warning("Skipping sensor %r of type %r: %s", name, sensor_type, exc)
|
|
continue
|
|
|
|
self.sensors.append(sensor)
|
|
|
|
def start(self) -> None:
|
|
if self._running:
|
|
return
|
|
self.reload()
|
|
|
|
# Start background polling thread if enabled and sensors exist
|
|
section = self.config.get("sensors", {})
|
|
if not isinstance(section, dict) or not section.get("enabled", False):
|
|
self.log.debug("Sensor manager disabled in config")
|
|
return
|
|
|
|
if not self.sensors:
|
|
self.log.debug("No sensors loaded; skipping background polling")
|
|
return
|
|
|
|
self._stop_event.clear()
|
|
self._poll_thread = threading.Thread(
|
|
target=self._poll_loop, name="sensor-manager", daemon=True
|
|
)
|
|
self._poll_thread.start()
|
|
self._running = True
|
|
self.log.info("Sensor manager polling started (%d sensors)", len(self.sensors))
|
|
|
|
def stop(self) -> None:
|
|
if not self._running:
|
|
return
|
|
self._stop_event.set()
|
|
if self._poll_thread and self._poll_thread.is_alive():
|
|
self._poll_thread.join(timeout=2.0)
|
|
self._running = False
|
|
self.log.info("Sensor manager polling stopped")
|
|
|
|
def read_all(self) -> List[Dict[str, Any]]:
|
|
readings: List[Dict[str, Any]] = []
|
|
for sensor in self.sensors:
|
|
try:
|
|
readings.append(sensor.read())
|
|
except Exception as exc:
|
|
self.log.warning("Sensor manager caught read error for %s: %s", sensor.name, exc)
|
|
readings.append(
|
|
{
|
|
"name": sensor.name,
|
|
"type": getattr(sensor, "sensor_type", "sensor"),
|
|
"ok": False,
|
|
"timestamp": None,
|
|
"data": {},
|
|
"error": f"{type(exc).__name__}: {exc}",
|
|
}
|
|
)
|
|
return readings
|
|
|
|
def _poll_loop(self) -> None:
|
|
"""Background thread: poll sensors at configured interval and cache readings."""
|
|
section = self.config.get("sensors", {})
|
|
poll_interval = 30.0
|
|
if isinstance(section, dict):
|
|
try:
|
|
poll_interval = float(section.get("poll_interval_seconds", 30.0))
|
|
except (TypeError, ValueError):
|
|
pass
|
|
|
|
self.log.debug("Sensor polling loop started (interval=%.1f sec)", poll_interval)
|
|
|
|
while not self._stop_event.is_set():
|
|
try:
|
|
readings = self.read_all()
|
|
with self._readings_lock:
|
|
self._latest_readings = readings
|
|
except Exception as exc:
|
|
self.log.warning("Sensor poll cycle failed: %s", exc)
|
|
|
|
# Wait for next poll or stop signal
|
|
self._stop_event.wait(poll_interval)
|
|
|
|
self.log.debug("Sensor polling loop stopped")
|
|
|
|
def get_summary(self) -> Dict[str, Any]:
|
|
section = self.config.get("sensors", {})
|
|
poll_interval = 30.0
|
|
if isinstance(section, dict):
|
|
try:
|
|
poll_interval = float(section.get("poll_interval_seconds", 30.0))
|
|
except (TypeError, ValueError):
|
|
pass
|
|
|
|
# Get cached readings (or empty list if not running)
|
|
with self._readings_lock:
|
|
readings = list(self._latest_readings) if self._latest_readings else []
|
|
|
|
return {
|
|
"enabled": bool(isinstance(section, dict) and section.get("enabled", False)),
|
|
"poll_interval_seconds": poll_interval,
|
|
"configured": len(self._get_sensor_definitions()),
|
|
"loaded": len(self.sensors),
|
|
"running": self._running,
|
|
"readings": readings,
|
|
}
|