Merge pull request #310 from yellowcooln/feat/openhop-glass-integration

feat: integrate openHop Glass policy synchronization and sensor summaries
This commit is contained in:
Lloyd
2026-06-25 09:29:26 +01:00
committed by GitHub
8 changed files with 600 additions and 7 deletions
+331 -1
View File
@@ -3,6 +3,7 @@ import hashlib
import json
import logging
import os
import re
import ssl
import time
from datetime import datetime, timezone
@@ -12,6 +13,7 @@ from urllib import error, request
from urllib.parse import urlparse
import psutil
import yaml
try:
import paho.mqtt.client as mqtt
@@ -19,6 +21,7 @@ except ImportError:
mqtt = None
from repeater import __version__
from repeater.policy_engine import PolicyEngine, SUPPORTED_ACTIONS, default_policy_engine_config
from repeater.service_utils import restart_service
logger = logging.getLogger("GlassHandler")
@@ -289,7 +292,7 @@ class GlassHandler:
settings_snapshot = self._build_settings_snapshot()
location = self._extract_location_from_settings(settings_snapshot)
return {
payload = {
"type": "inform",
"version": 1,
"node_name": node_name,
@@ -321,6 +324,28 @@ class GlassHandler:
"settings": settings_snapshot,
"command_results": command_results,
}
sensors_summary = self._collect_sensor_summary()
if sensors_summary is not None:
payload["sensors"] = sensors_summary
return payload
def _collect_sensor_summary(self) -> Optional[Dict[str, Any]]:
sensor_manager = getattr(self.daemon_instance, "sensor_manager", None)
if sensor_manager is None:
return None
try:
summary = sensor_manager.get_summary()
return summary if isinstance(summary, dict) else None
except Exception as exc:
logger.debug("Failed collecting sensor summary for Glass inform: %s", exc)
return {
"enabled": False,
"configured": 0,
"loaded": 0,
"running": False,
"readings": [],
"error": str(exc),
}
def _build_settings_snapshot(self) -> Dict[str, Any]:
normalized = self._normalize_for_hash(self.config)
@@ -594,6 +619,10 @@ class GlassHandler:
success, message, details = self._apply_transport_keys_sync(params)
return success, message, details
if action == "policy_sync":
success, message, details = self._apply_policy_sync(params)
return success, message, details
if action == "set_radio":
radio_values = params.get("radio", params)
if not isinstance(radio_values, dict):
@@ -717,6 +746,307 @@ class GlassHandler:
details["payload_hash"] = payload_hash
return True, f"Applied transport key sync ({details['applied_nodes']} nodes)", details
def _apply_policy_sync(
self,
params: Dict[str, Any],
) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
if not isinstance(params, dict):
return False, "policy_sync params must be an object", None
incoming_policy = params.get("policy")
if not isinstance(incoming_policy, dict):
return False, "policy_sync payload must include a policy object", None
mode = str(params.get("mode", "replace") or "replace").lower().strip()
if mode not in ("replace", "patch"):
return False, f"Unsupported policy_sync mode: {mode}", None
validate_only = bool(params.get("validate_only", False))
existing_doc, _ = self._load_policy_document()
existing_policy = self._normalize_policy_engine(existing_doc.get("policy_engine", {}))
if mode == "patch":
policy_engine_cfg = dict(existing_policy)
self._deep_merge(policy_engine_cfg, incoming_policy)
else:
policy_engine_cfg = incoming_policy
groups_cfg = params.get("groups", existing_doc.get("groups", {}))
doc_to_apply = {
"policy_engine": self._normalize_policy_engine(policy_engine_cfg),
"groups": self._normalize_policy_groups(groups_cfg),
}
doc_to_apply = self._sync_policy_engine_objects_from_groups(doc_to_apply)
try:
self._validate_policy_engine(doc_to_apply.get("policy_engine", {}))
PolicyEngine(doc_to_apply.get("policy_engine", {}))
except Exception as exc:
return False, f"Invalid policy: {exc}", None
details = self._policy_sync_details(doc_to_apply, mode=mode, validate_only=validate_only)
if validate_only:
return True, "Policy validated", details
try:
self._write_policy_document(doc_to_apply)
self._apply_policy_runtime(doc_to_apply.get("policy_engine", {}))
except Exception as exc:
return False, f"Policy sync failed: {exc}", None
return True, "Policy synchronized", details
def _policy_sync_details(
self,
doc: Dict[str, Any],
*,
mode: str,
validate_only: bool,
) -> Dict[str, Any]:
policy_engine_cfg = doc.get("policy_engine", {}) if isinstance(doc, dict) else {}
rules = policy_engine_cfg.get("rules", []) if isinstance(policy_engine_cfg, dict) else []
return {
"policy_file": self._get_policy_file_path(),
"mode": mode,
"validate_only": validate_only,
"rule_count": len(rules) if isinstance(rules, list) else 0,
"enabled": bool(policy_engine_cfg.get("enabled", False))
if isinstance(policy_engine_cfg, dict)
else False,
"default_action": str(policy_engine_cfg.get("default_action", "allow"))
if isinstance(policy_engine_cfg, dict)
else "allow",
}
def _get_policy_file_path(self) -> str:
policy_cfg = self.config.get("policy", {}) if isinstance(self.config, dict) else {}
policy_file = policy_cfg.get("policy_file", "policy.yaml")
if os.path.isabs(str(policy_file)):
return str(policy_file)
config_path = getattr(self.config_manager, "config_path", None) or self.config.get(
"config_path", "/etc/pymc_repeater/config.yaml"
)
config_dir = os.path.dirname(os.path.abspath(str(config_path)))
return os.path.abspath(os.path.join(config_dir, str(policy_file)))
@staticmethod
def _default_policy_document() -> Dict[str, Any]:
return {
"policy_engine": default_policy_engine_config(),
"groups": {"channel_hashes": [], "pubkeys": []},
}
def _load_policy_document(self) -> Tuple[Dict[str, Any], bool]:
path = self._get_policy_file_path()
if not os.path.exists(path):
return self._default_policy_document(), False
try:
with open(path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
if not isinstance(data, dict):
return self._default_policy_document(), False
if "policy_engine" not in data:
return {
"policy_engine": data,
"groups": self._default_policy_document()["groups"],
}, True
if not isinstance(data.get("policy_engine"), dict):
return self._default_policy_document(), False
if not isinstance(data.get("groups"), dict):
data["groups"] = self._default_policy_document()["groups"]
return data, True
except Exception as exc:
logger.error("Failed to load policy file %s: %s", path, exc)
return self._default_policy_document(), False
def _write_policy_document(self, doc: Dict[str, Any]) -> None:
policy_path = self._get_policy_file_path()
os.makedirs(os.path.dirname(policy_path), exist_ok=True)
with open(policy_path, "w", encoding="utf-8") as f:
yaml.safe_dump(
doc,
f,
default_flow_style=False,
sort_keys=False,
allow_unicode=True,
width=1000000,
)
@staticmethod
def _normalize_policy_engine(engine_cfg: Dict[str, Any]) -> Dict[str, Any]:
if not isinstance(engine_cfg, dict):
engine_cfg = {}
return {
"enabled": bool(engine_cfg.get("enabled", False)),
"default_action": str(engine_cfg.get("default_action", "allow")),
"rules": engine_cfg.get("rules") if isinstance(engine_cfg.get("rules"), list) else [],
"objects": (
engine_cfg.get("objects") if isinstance(engine_cfg.get("objects"), dict) else {}
),
}
def _apply_policy_runtime(self, policy_engine_cfg: Dict[str, Any]) -> None:
self.config["policy_engine"] = policy_engine_cfg
self.config["policy_file_path"] = self._get_policy_file_path()
repeater_handler = getattr(self.daemon_instance, "repeater_handler", None)
if repeater_handler is not None:
repeater_handler.policy_engine = PolicyEngine.from_runtime_config(self.config)
def _sync_policy_engine_objects_from_groups(self, doc: Dict[str, Any]) -> Dict[str, Any]:
policy_engine_cfg = self._normalize_policy_engine(doc.get("policy_engine", {}))
groups_cfg = self._normalize_policy_groups(doc.get("groups", {}))
objects = policy_engine_cfg.get("objects", {})
if not isinstance(objects, dict):
objects = {}
objects.update(self._policy_objects_from_groups(groups_cfg))
policy_engine_cfg["objects"] = objects
doc["policy_engine"] = policy_engine_cfg
doc["groups"] = groups_cfg
return doc
def _normalize_policy_groups(self, groups_cfg: Dict[str, Any]) -> Dict[str, Any]:
normalized = {"channel_hashes": [], "pubkeys": []}
if not isinstance(groups_cfg, dict):
return normalized
for kind in ("channel_hashes", "pubkeys"):
source_groups = groups_cfg.get(kind)
if not isinstance(source_groups, list):
continue
seen_group_ids = set()
for idx, group in enumerate(source_groups):
if not isinstance(group, dict):
continue
group_id = self._slugify_policy_id(
group.get("id") or group.get("name") or group.get("friendly_name"),
f"{kind}_{idx + 1}",
)
if group_id in seen_group_ids:
continue
seen_group_ids.add(group_id)
entries = []
seen_entry_ids = set()
for ent_idx, entry in enumerate(group.get("entries") or []):
if not isinstance(entry, dict):
continue
try:
entry_value = self._normalize_policy_entry_value(kind, entry.get("value"))
except Exception as exc:
logger.warning(
"Skipping invalid policy entry at index %d: %s", ent_idx, exc
)
continue
entry_id = self._slugify_policy_id(
entry.get("id")
or entry.get("name")
or entry.get("friendly_name")
or entry_value,
f"entry_{ent_idx + 1}",
)
if entry_id in seen_entry_ids:
continue
seen_entry_ids.add(entry_id)
entries.append(
{
"id": entry_id,
"friendly_name": str(
entry.get("friendly_name") or entry.get("name") or entry_id
),
"value": entry_value,
}
)
normalized[kind].append(
{
"id": group_id,
"friendly_name": str(
group.get("friendly_name") or group.get("name") or group_id
),
"description": str(group.get("description") or ""),
"entries": entries,
}
)
return normalized
@staticmethod
def _slugify_policy_id(value: str, fallback: str) -> str:
text = str(value or "").strip().lower()
text = re.sub(r"[^a-z0-9]+", "_", text).strip("_")
return text or fallback
def _normalize_policy_entry_value(self, kind: str, value: Any) -> str:
if kind == "pubkeys":
return self._normalize_pubkey_value(value)
if kind == "channel_hashes":
return self._normalize_channel_hash_value(value)
raise ValueError(f"Unsupported group kind: {kind}")
@staticmethod
def _normalize_pubkey_value(value: Any) -> str:
if value is None:
raise ValueError("pubkey value is required")
raw = value.hex() if isinstance(value, bytes) else str(value).strip().lower()
if raw.startswith("0x"):
raw = raw[2:]
raw = raw.replace(" ", "")
if not raw:
raise ValueError("pubkey value is required")
if not re.fullmatch(r"[0-9a-f]+", raw):
raise ValueError("pubkey must be hex")
if len(raw) % 2 != 0:
raise ValueError("pubkey hex length must be even")
return f"0x{raw}"
@staticmethod
def _normalize_channel_hash_value(value: Any) -> str:
if value is None:
raise ValueError("channel hash value is required")
if isinstance(value, int):
parsed = value
else:
raw = str(value).strip()
if not raw:
raise ValueError("channel hash value is required")
normalized_hex = raw[2:] if raw.lower().startswith("0x") else raw
if len(normalized_hex) in (32, 64) and re.fullmatch(r"[0-9a-fA-F]+", normalized_hex):
return f"0x{normalized_hex.upper()}"
if raw.lower().startswith("0x"):
parsed = int(raw, 16)
elif re.fullmatch(r"[0-9]+", raw):
parsed = int(raw, 10)
else:
parsed = int(raw, 16)
if parsed < 0:
raise ValueError("channel hash must be non-negative")
if parsed > 0xFF:
raise ValueError("channel hash must be one byte (0x00-0xFF)")
return f"0x{parsed:02X}"
def _policy_objects_from_groups(self, groups_cfg: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
channel_hash_groups = {}
pubkey_groups = {}
for group in groups_cfg.get("channel_hashes", []):
channel_hash_groups[group["id"]] = [
entry["value"] for entry in group.get("entries", [])
]
for group in groups_cfg.get("pubkeys", []):
pubkey_groups[group["id"]] = [entry["value"] for entry in group.get("entries", [])]
return {"channel_hash_groups": channel_hash_groups, "pubkey_groups": pubkey_groups}
@staticmethod
def _validate_policy_engine(policy_engine_cfg: Dict[str, Any]) -> None:
default_action = str(policy_engine_cfg.get("default_action", "allow"))
if default_action not in SUPPORTED_ACTIONS:
raise ValueError(f"Unsupported default_action: {default_action}")
rules = policy_engine_cfg.get("rules", [])
if not isinstance(rules, list):
raise ValueError("rules must be a list")
for idx, rule in enumerate(rules):
if not isinstance(rule, dict):
raise ValueError(f"rule {idx} must be an object")
then_block = rule.get("then", {})
action = then_block.get("action") if isinstance(then_block, dict) else then_block
if action is None:
action = rule.get("action", "allow")
action = str(action or "allow")
if action not in SUPPORTED_ACTIONS:
raise ValueError(f"Unsupported rule action at index {idx}: {action}")
def _apply_cert_renewal(self, response: Dict[str, Any]) -> Tuple[bool, str]:
client_cert = response.get("client_cert")
client_key = response.get("client_key")
+28 -1
View File
@@ -12,6 +12,7 @@ except ImportError:
psutil = None
import logging
import platform
import time
logger = logging.getLogger("HardwareStats")
@@ -57,6 +58,7 @@ class HardwareStatsCollector:
# System boot time
boot_time = psutil.boot_time()
system_uptime = now - boot_time
system_info = self._get_system_info()
# Temperature (if available)
temperatures = {}
@@ -96,7 +98,13 @@ class HardwareStatsCollector:
"packets_sent": net_io.packets_sent,
"packets_recv": net_io.packets_recv,
},
"system": {"uptime": system_uptime, "boot_time": boot_time},
"system": {
"uptime": system_uptime,
"boot_time": boot_time,
"os": system_info["os"],
"kernel": system_info["kernel"],
"arch": system_info["arch"],
},
}
# Add temperatures if available
@@ -109,6 +117,25 @@ class HardwareStatsCollector:
logger.error(f"Error collecting hardware stats: {e}")
return {"error": str(e)}
@staticmethod
def _get_system_info(os_release_path="/etc/os-release"):
os_name = None
try:
with open(os_release_path, "r", encoding="utf-8") as f:
for line in f:
key, sep, value = line.partition("=")
if sep and key == "PRETTY_NAME":
os_name = value.strip().strip('"')
break
except OSError:
os_name = None
return {
"os": os_name or platform.system(),
"kernel": platform.release(),
"arch": platform.machine(),
}
def get_processes_summary(self, limit=10):
"""
Get top processes by CPU and memory usage.
+4 -1
View File
@@ -1140,7 +1140,10 @@ class RepeaterDaemon:
try:
from openhop_core.protocol import PacketBuilder
from openhop_core.protocol.constants import ADVERT_FLAG_HAS_NAME, ADVERT_FLAG_IS_REPEATER
from openhop_core.protocol.constants import (
ADVERT_FLAG_HAS_NAME,
ADVERT_FLAG_IS_REPEATER,
)
# Get node name and location from config
repeater_config = self.config.get("repeater", {})
+214 -2
View File
@@ -4,6 +4,7 @@ import json
import time
from pathlib import Path
import pytest
import yaml
_MODULE_PATH = (
Path(__file__).resolve().parents[1] / "repeater" / "data_acquisition" / "glass_handler.py"
@@ -22,7 +23,8 @@ class _DummyIdentity:
class _DummyConfigManager:
def __init__(self):
def __init__(self, config_path="/tmp/config.yaml"):
self.config_path = config_path
self.calls = []
def update_and_save(self, updates, live_update=True, live_update_sections=None):
@@ -47,7 +49,10 @@ class _DummyConfigManager:
class _DummyDaemon:
def __init__(self):
self.local_identity = _DummyIdentity()
self.repeater_handler = type("RH", (), {"start_time": time.time() - 60})()
self.repeater_handler = type(
"RH", (), {"start_time": time.time() - 60, "policy_engine": None}
)()
self.sensor_manager = None
@staticmethod
def get_stats():
@@ -69,6 +74,32 @@ class _DummyDaemon:
return True
class _DummySensorManager:
def __init__(self, summary=None, error=None):
self.summary = summary or {
"enabled": True,
"poll_interval_seconds": 30.0,
"configured": 1,
"loaded": 1,
"running": True,
"readings": [
{
"name": "ups-main",
"type": "waveshare_ups_d",
"ok": True,
"timestamp": "2026-06-20T12:00:00+00:00",
"data": {"battery_percent": 87.5, "current_ma": 120.0},
}
],
}
self.error = error
def get_summary(self):
if self.error:
raise self.error
return self.summary
class _DummyMqttClient:
def __init__(self):
self.published = []
@@ -208,6 +239,32 @@ def test_build_inform_payload_contains_expected_fields():
assert payload["command_results"][0]["command_id"] == "cmd-1"
def test_build_inform_payload_includes_sensors_when_manager_exists():
config = _make_config()
daemon = _DummyDaemon()
daemon.sensor_manager = _DummySensorManager()
manager = _DummyConfigManager()
handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager)
payload = asyncio.run(handler._build_inform_payload())
assert payload["sensors"]["enabled"] is True
assert payload["sensors"]["readings"][0]["data"]["battery_percent"] == 87.5
def test_build_inform_payload_reports_sensor_summary_error():
config = _make_config()
daemon = _DummyDaemon()
daemon.sensor_manager = _DummySensorManager(error=RuntimeError("i2c unavailable"))
manager = _DummyConfigManager()
handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager)
payload = asyncio.run(handler._build_inform_payload())
assert payload["sensors"]["running"] is False
assert "i2c unavailable" in payload["sensors"]["error"]
def test_execute_set_mode_command_updates_config():
config = _make_config()
daemon = _DummyDaemon()
@@ -224,6 +281,161 @@ def test_execute_set_mode_command_updates_config():
assert manager.calls[-1]["updates"]["repeater"]["mode"] == "monitor"
def test_execute_policy_sync_validate_only_does_not_write_or_apply_runtime(tmp_path):
config = _make_config()
cfg_path = tmp_path / "config.yaml"
cfg_path.write_text("repeater: {node_name: test}\n", encoding="utf-8")
daemon = _DummyDaemon()
manager = _DummyConfigManager(config_path=str(cfg_path))
handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager)
ok, message, details = asyncio.run(
handler._execute_command_action(
"policy_sync",
{
"policy": {
"enabled": True,
"default_action": "allow",
"rules": [{"id": 1, "if": {"all": []}, "then": {"action": "drop"}}],
},
"validate_only": True,
},
)
)
assert ok is True
assert message == "Policy validated"
assert details["validate_only"] is True
assert details["rule_count"] == 1
assert not (tmp_path / "policy.yaml").exists()
assert "policy_engine" not in config
assert daemon.repeater_handler.policy_engine is None
def test_execute_policy_sync_replace_writes_wrapper_preserves_groups_and_applies_runtime(tmp_path):
config = _make_config()
cfg_path = tmp_path / "config.yaml"
cfg_path.write_text("repeater: {node_name: test}\n", encoding="utf-8")
policy_path = tmp_path / "policy.yaml"
policy_path.write_text(
yaml.safe_dump(
{
"policy_engine": {"enabled": False, "default_action": "allow", "rules": []},
"groups": {
"channel_hashes": [
{
"id": "ops_channels",
"friendly_name": "Ops Channels",
"entries": [{"id": "ops", "value": "0x12"}],
}
],
"pubkeys": [],
},
}
),
encoding="utf-8",
)
daemon = _DummyDaemon()
manager = _DummyConfigManager(config_path=str(cfg_path))
handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager)
ok, message, details = asyncio.run(
handler._execute_command_action(
"policy_sync",
{
"policy": {
"enabled": True,
"default_action": "allow",
"rules": [
{
"id": 7,
"if": {
"all": [
{
"field": "channel_hash",
"op": "in",
"value": "@channel_hash_groups.ops_channels",
}
]
},
"then": {"action": "drop"},
}
],
},
"mode": "replace",
},
)
)
assert ok is True
assert message == "Policy synchronized"
assert details["enabled"] is True
loaded = yaml.safe_load(policy_path.read_text(encoding="utf-8"))
assert loaded["policy_engine"]["enabled"] is True
assert loaded["groups"]["channel_hashes"][0]["id"] == "ops_channels"
assert loaded["policy_engine"]["objects"]["channel_hash_groups"]["ops_channels"] == ["0x12"]
assert config["policy_engine"]["enabled"] is True
assert daemon.repeater_handler.policy_engine is not None
def test_execute_policy_sync_patch_preserves_unspecified_policy_fields(tmp_path):
config = _make_config()
cfg_path = tmp_path / "config.yaml"
cfg_path.write_text("repeater: {node_name: test}\n", encoding="utf-8")
policy_path = tmp_path / "policy.yaml"
policy_path.write_text(
yaml.safe_dump(
{
"policy_engine": {
"enabled": False,
"default_action": "drop",
"rules": [{"id": "keep", "if": {"all": []}, "then": {"action": "allow"}}],
"objects": {"custom": {"values": ["a"]}},
},
"groups": {"channel_hashes": [], "pubkeys": []},
}
),
encoding="utf-8",
)
daemon = _DummyDaemon()
manager = _DummyConfigManager(config_path=str(cfg_path))
handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager)
ok, message, _details = asyncio.run(
handler._execute_command_action(
"policy_sync",
{"policy": {"enabled": True}, "mode": "patch"},
)
)
assert ok is True
assert message == "Policy synchronized"
loaded = yaml.safe_load(policy_path.read_text(encoding="utf-8"))
assert loaded["policy_engine"]["enabled"] is True
assert loaded["policy_engine"]["default_action"] == "drop"
assert loaded["policy_engine"]["rules"][0]["id"] == "keep"
assert loaded["policy_engine"]["objects"]["custom"] == {"values": ["a"]}
def test_execute_policy_sync_rejects_unsupported_mode(tmp_path):
config = _make_config()
cfg_path = tmp_path / "config.yaml"
cfg_path.write_text("repeater: {node_name: test}\n", encoding="utf-8")
daemon = _DummyDaemon()
manager = _DummyConfigManager(config_path=str(cfg_path))
handler = GlassHandler(config=config, daemon_instance=daemon, config_manager=manager)
ok, message, details = asyncio.run(
handler._execute_command_action(
"policy_sync", {"policy": {"enabled": True}, "mode": "merge"}
)
)
assert ok is False
assert "Unsupported policy_sync mode" in message
assert details is None
def test_handle_command_response_queues_result():
config = _make_config()
daemon = _DummyDaemon()
@@ -56,7 +56,8 @@ async def test_path_helper_updates_client_out_path_on_valid_decrypt():
packet = _PathPacket(payload=b"\x11\x22\xaa\xbb\xcc")
with patch(
"openhop_core.protocol.crypto.CryptoUtils.mac_then_decrypt", return_value=b"\x02\x99\x88\x01"
"openhop_core.protocol.crypto.CryptoUtils.mac_then_decrypt",
return_value=b"\x02\x99\x88\x01",
):
handled = await helper.process_path_packet(packet)
+17
View File
@@ -160,6 +160,23 @@ def test_hardware_stats_sensor_reads_from_collector(monkeypatch):
assert reading["data"] == {"cpu": {"usage_percent": 42.0}}
def test_hardware_stats_collector_reads_os_kernel_and_arch(tmp_path, monkeypatch):
import repeater.data_acquisition.hardware_stats as hardware_stats_module
os_release = tmp_path / "os-release"
os_release.write_text('NAME="Debian GNU/Linux"\nPRETTY_NAME="Debian GNU/Linux 12"\n')
monkeypatch.setattr(hardware_stats_module.platform, "release", lambda: "6.8.0-test")
monkeypatch.setattr(hardware_stats_module.platform, "machine", lambda: "aarch64")
info = hardware_stats_module.HardwareStatsCollector._get_system_info(str(os_release))
assert info == {
"os": "Debian GNU/Linux 12",
"kernel": "6.8.0-test",
"arch": "aarch64",
}
def test_pymc_modem_sensor_reads_modem_stats(monkeypatch):
class _Response:
status = 200
@@ -29,7 +29,9 @@ def _make_collector() -> StorageCollector:
patch("repeater.data_acquisition.storage_collector.RRDToolHandler"),
patch("repeater.data_acquisition.hardware_stats.HardwareStatsCollector"),
):
collector = StorageCollector(config={"storage": {"storage_dir": "/tmp/openhop_repeater_test"}})
collector = StorageCollector(
config={"storage": {"storage_dir": "/tmp/openhop_repeater_test"}}
)
# Stop any real stats-broadcast thread started during construction so the tests
# drive the loop deterministically.
+1
View File
@@ -370,6 +370,7 @@ def test_do_install_root_install_command_failure_sets_error(isolated_state, monk
monkeypatch.setattr(ue.os, "geteuid", lambda: 0)
monkeypatch.setattr(ue, "is_buildroot", lambda: False)
monkeypatch.setattr(ue, "_migrate_service_unit", lambda: None)
monkeypatch.setattr(ue, "_disable_legacy_services", lambda: None)
monkeypatch.setattr(ue.os.path, "isfile", lambda p: True)
monkeypatch.setattr(ue.os.path, "isdir", lambda p: False)