mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-07-02 07:51:09 +02:00
HomeAssistant MQTT fanout
This commit is contained in:
@@ -31,12 +31,14 @@ def _register_module_types() -> None:
|
||||
from app.fanout.bot import BotModule
|
||||
from app.fanout.map_upload import MapUploadModule
|
||||
from app.fanout.mqtt_community import MqttCommunityModule
|
||||
from app.fanout.mqtt_ha import MqttHaModule
|
||||
from app.fanout.mqtt_private import MqttPrivateModule
|
||||
from app.fanout.sqs import SqsModule
|
||||
from app.fanout.webhook import WebhookModule
|
||||
|
||||
_MODULE_TYPES["mqtt_private"] = MqttPrivateModule
|
||||
_MODULE_TYPES["mqtt_community"] = MqttCommunityModule
|
||||
_MODULE_TYPES["mqtt_ha"] = MqttHaModule
|
||||
_MODULE_TYPES["bot"] = BotModule
|
||||
_MODULE_TYPES["webhook"] = WebhookModule
|
||||
_MODULE_TYPES["apprise"] = AppriseModule
|
||||
|
||||
@@ -0,0 +1,599 @@
|
||||
"""Home Assistant MQTT Discovery fanout module.
|
||||
|
||||
Publishes HA-compatible discovery configs and state updates so that mesh
|
||||
network devices appear natively in Home Assistant via its built-in MQTT
|
||||
integration. No custom HA component is needed.
|
||||
|
||||
Entity types created:
|
||||
- Local radio: binary_sensor (connectivity) + sensors (noise floor, battery,
|
||||
uptime, RSSI, SNR, airtime, packet counts)
|
||||
- Per tracked repeater: sensor entities for telemetry fields
|
||||
- Per tracked contact: device_tracker for GPS position
|
||||
- Messages: event entity for scope-matched messages
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import ssl
|
||||
from types import SimpleNamespace
|
||||
from typing import Any
|
||||
|
||||
from app.fanout.base import FanoutModule, get_fanout_message_text
|
||||
from app.fanout.mqtt_base import BaseMqttPublisher
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ── Repeater telemetry sensor definitions ─────────────────────────────────
|
||||
|
||||
_REPEATER_SENSORS: list[dict[str, str | None]] = [
|
||||
{
|
||||
"field": "battery_volts",
|
||||
"name": "Battery Voltage",
|
||||
"object_id": "battery_voltage",
|
||||
"device_class": "voltage",
|
||||
"state_class": "measurement",
|
||||
"unit": "V",
|
||||
},
|
||||
{
|
||||
"field": "noise_floor_dbm",
|
||||
"name": "Noise Floor",
|
||||
"object_id": "noise_floor",
|
||||
"device_class": "signal_strength",
|
||||
"state_class": "measurement",
|
||||
"unit": "dBm",
|
||||
},
|
||||
{
|
||||
"field": "last_rssi_dbm",
|
||||
"name": "Last RSSI",
|
||||
"object_id": "last_rssi",
|
||||
"device_class": "signal_strength",
|
||||
"state_class": "measurement",
|
||||
"unit": "dBm",
|
||||
},
|
||||
{
|
||||
"field": "last_snr_db",
|
||||
"name": "Last SNR",
|
||||
"object_id": "last_snr",
|
||||
"device_class": None,
|
||||
"state_class": "measurement",
|
||||
"unit": "dB",
|
||||
},
|
||||
{
|
||||
"field": "packets_received",
|
||||
"name": "Packets Received",
|
||||
"object_id": "packets_received",
|
||||
"device_class": None,
|
||||
"state_class": "total_increasing",
|
||||
"unit": None,
|
||||
},
|
||||
{
|
||||
"field": "packets_sent",
|
||||
"name": "Packets Sent",
|
||||
"object_id": "packets_sent",
|
||||
"device_class": None,
|
||||
"state_class": "total_increasing",
|
||||
"unit": None,
|
||||
},
|
||||
{
|
||||
"field": "uptime_seconds",
|
||||
"name": "Uptime",
|
||||
"object_id": "uptime",
|
||||
"device_class": "duration",
|
||||
"state_class": None,
|
||||
"unit": "s",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# ── Local radio sensor definitions ────────────────────────────────────────
|
||||
|
||||
_RADIO_SENSORS: list[dict[str, str | None]] = [
|
||||
{
|
||||
"field": "noise_floor_dbm",
|
||||
"name": "Noise Floor",
|
||||
"object_id": "noise_floor",
|
||||
"device_class": "signal_strength",
|
||||
"state_class": "measurement",
|
||||
"unit": "dBm",
|
||||
},
|
||||
{
|
||||
"field": "battery_mv",
|
||||
"name": "Battery",
|
||||
"object_id": "battery",
|
||||
"device_class": "voltage",
|
||||
"state_class": "measurement",
|
||||
"unit": "mV",
|
||||
},
|
||||
{
|
||||
"field": "uptime_secs",
|
||||
"name": "Uptime",
|
||||
"object_id": "uptime",
|
||||
"device_class": "duration",
|
||||
"state_class": None,
|
||||
"unit": "s",
|
||||
},
|
||||
{
|
||||
"field": "last_rssi",
|
||||
"name": "Last RSSI",
|
||||
"object_id": "last_rssi",
|
||||
"device_class": "signal_strength",
|
||||
"state_class": "measurement",
|
||||
"unit": "dBm",
|
||||
},
|
||||
{
|
||||
"field": "last_snr",
|
||||
"name": "Last SNR",
|
||||
"object_id": "last_snr",
|
||||
"device_class": None,
|
||||
"state_class": "measurement",
|
||||
"unit": "dB",
|
||||
},
|
||||
{
|
||||
"field": "tx_air_secs",
|
||||
"name": "TX Airtime",
|
||||
"object_id": "tx_airtime",
|
||||
"device_class": "duration",
|
||||
"state_class": "total_increasing",
|
||||
"unit": "s",
|
||||
},
|
||||
{
|
||||
"field": "rx_air_secs",
|
||||
"name": "RX Airtime",
|
||||
"object_id": "rx_airtime",
|
||||
"device_class": "duration",
|
||||
"state_class": "total_increasing",
|
||||
"unit": "s",
|
||||
},
|
||||
{
|
||||
"field": "packets_recv",
|
||||
"name": "Packets Received",
|
||||
"object_id": "packets_received",
|
||||
"device_class": None,
|
||||
"state_class": "total_increasing",
|
||||
"unit": None,
|
||||
},
|
||||
{
|
||||
"field": "packets_sent",
|
||||
"name": "Packets Sent",
|
||||
"object_id": "packets_sent",
|
||||
"device_class": None,
|
||||
"state_class": "total_increasing",
|
||||
"unit": None,
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def _node_id(public_key: str) -> str:
|
||||
"""Derive a stable, MQTT-safe node identifier from a public key."""
|
||||
return public_key[:12].lower()
|
||||
|
||||
|
||||
def _device_payload(
|
||||
public_key: str,
|
||||
name: str,
|
||||
model: str,
|
||||
*,
|
||||
via_device_key: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Build an HA device registry fragment."""
|
||||
dev: dict[str, Any] = {
|
||||
"identifiers": [f"meshcore_{_node_id(public_key)}"],
|
||||
"name": name or public_key[:12],
|
||||
"manufacturer": "MeshCore",
|
||||
"model": model,
|
||||
}
|
||||
if via_device_key:
|
||||
dev["via_device"] = f"meshcore_{_node_id(via_device_key)}"
|
||||
return dev
|
||||
|
||||
|
||||
# ── MQTT publisher subclass ───────────────────────────────────────────────
|
||||
|
||||
|
||||
class _HaMqttPublisher(BaseMqttPublisher):
|
||||
"""Thin MQTT lifecycle wrapper for the HA discovery module."""
|
||||
|
||||
_backoff_max = 30
|
||||
_log_prefix = "HA-MQTT"
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._on_connected_callback: Any = None
|
||||
|
||||
def _is_configured(self) -> bool:
|
||||
s = self._settings
|
||||
return bool(s and s.broker_host)
|
||||
|
||||
def _build_client_kwargs(self, settings: object) -> dict[str, Any]:
|
||||
s: Any = settings
|
||||
kw: dict[str, Any] = {
|
||||
"hostname": s.broker_host,
|
||||
"port": s.broker_port,
|
||||
"username": s.username or None,
|
||||
"password": s.password or None,
|
||||
}
|
||||
if s.use_tls:
|
||||
ctx = ssl.create_default_context()
|
||||
if s.tls_insecure:
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
kw["tls_context"] = ctx
|
||||
return kw
|
||||
|
||||
def _on_connected(self, settings: object) -> tuple[str, str]:
|
||||
s: Any = settings
|
||||
return ("HA MQTT connected", f"{s.broker_host}:{s.broker_port}")
|
||||
|
||||
def _on_error(self) -> tuple[str, str]:
|
||||
return ("HA MQTT connection failure", "Please correct the settings or disable.")
|
||||
|
||||
async def _on_connected_async(self, settings: object) -> None:
|
||||
if self._on_connected_callback:
|
||||
await self._on_connected_callback()
|
||||
|
||||
|
||||
# ── Discovery config builders ─────────────────────────────────────────────
|
||||
|
||||
|
||||
def _radio_discovery_configs(
|
||||
prefix: str,
|
||||
radio_key: str,
|
||||
radio_name: str,
|
||||
) -> list[tuple[str, dict]]:
|
||||
"""Build HA discovery config payloads for the local radio device."""
|
||||
nid = _node_id(radio_key)
|
||||
device = _device_payload(radio_key, radio_name, "Radio")
|
||||
state_topic = f"{prefix}/{nid}/health"
|
||||
configs: list[tuple[str, dict]] = []
|
||||
|
||||
# binary_sensor: connected
|
||||
configs.append(
|
||||
(
|
||||
f"homeassistant/binary_sensor/meshcore_{nid}/connected/config",
|
||||
{
|
||||
"name": "Connected",
|
||||
"unique_id": f"meshcore_{nid}_connected",
|
||||
"device": device,
|
||||
"state_topic": state_topic,
|
||||
"value_template": "{{ 'ON' if value_json.connected else 'OFF' }}",
|
||||
"device_class": "connectivity",
|
||||
"payload_on": "ON",
|
||||
"payload_off": "OFF",
|
||||
"expire_after": 120,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
# sensors from _RADIO_SENSORS (noise floor, battery, uptime, RSSI, etc.)
|
||||
for sensor in _RADIO_SENSORS:
|
||||
cfg: dict[str, Any] = {
|
||||
"name": sensor["name"],
|
||||
"unique_id": f"meshcore_{nid}_{sensor['object_id']}",
|
||||
"device": device,
|
||||
"state_topic": state_topic,
|
||||
"value_template": "{{ value_json." + sensor["field"] + " }}", # type: ignore[operator]
|
||||
"expire_after": 120,
|
||||
}
|
||||
if sensor["device_class"]:
|
||||
cfg["device_class"] = sensor["device_class"]
|
||||
if sensor["state_class"]:
|
||||
cfg["state_class"] = sensor["state_class"]
|
||||
if sensor["unit"]:
|
||||
cfg["unit_of_measurement"] = sensor["unit"]
|
||||
|
||||
topic = f"homeassistant/sensor/meshcore_{nid}/{sensor['object_id']}/config"
|
||||
configs.append((topic, cfg))
|
||||
|
||||
return configs
|
||||
|
||||
|
||||
def _repeater_discovery_configs(
|
||||
prefix: str,
|
||||
pub_key: str,
|
||||
name: str,
|
||||
radio_key: str | None,
|
||||
) -> list[tuple[str, dict]]:
|
||||
"""Build HA discovery config payloads for a tracked repeater."""
|
||||
nid = _node_id(pub_key)
|
||||
device = _device_payload(pub_key, name, "Repeater", via_device_key=radio_key)
|
||||
state_topic = f"{prefix}/{nid}/telemetry"
|
||||
configs: list[tuple[str, dict]] = []
|
||||
|
||||
for sensor in _REPEATER_SENSORS:
|
||||
cfg: dict[str, Any] = {
|
||||
"name": sensor["name"],
|
||||
"unique_id": f"meshcore_{nid}_{sensor['object_id']}",
|
||||
"device": device,
|
||||
"state_topic": state_topic,
|
||||
"value_template": "{{ value_json." + sensor["field"] + " }}", # type: ignore[operator]
|
||||
}
|
||||
if sensor["device_class"]:
|
||||
cfg["device_class"] = sensor["device_class"]
|
||||
if sensor["state_class"]:
|
||||
cfg["state_class"] = sensor["state_class"]
|
||||
if sensor["unit"]:
|
||||
cfg["unit_of_measurement"] = sensor["unit"]
|
||||
# 10 hours — margin over the 8-hour auto-collect cycle
|
||||
cfg["expire_after"] = 36000
|
||||
|
||||
topic = f"homeassistant/sensor/meshcore_{nid}/{sensor['object_id']}/config"
|
||||
configs.append((topic, cfg))
|
||||
|
||||
return configs
|
||||
|
||||
|
||||
def _contact_tracker_discovery_config(
|
||||
prefix: str,
|
||||
pub_key: str,
|
||||
name: str,
|
||||
radio_key: str | None,
|
||||
) -> tuple[str, dict]:
|
||||
"""Build HA discovery config for a tracked contact's device_tracker."""
|
||||
nid = _node_id(pub_key)
|
||||
device = _device_payload(pub_key, name, "Node", via_device_key=radio_key)
|
||||
topic = f"homeassistant/device_tracker/meshcore_{nid}/config"
|
||||
cfg: dict[str, Any] = {
|
||||
"name": name or pub_key[:12],
|
||||
"unique_id": f"meshcore_{nid}_tracker",
|
||||
"device": device,
|
||||
"json_attributes_topic": f"{prefix}/{nid}/gps",
|
||||
"source_type": "gps",
|
||||
}
|
||||
return topic, cfg
|
||||
|
||||
|
||||
def _message_event_discovery_config(
|
||||
prefix: str, radio_key: str, radio_name: str
|
||||
) -> tuple[str, dict]:
|
||||
"""Build HA discovery config for the message event entity."""
|
||||
nid = _node_id(radio_key)
|
||||
device = _device_payload(radio_key, radio_name, "Radio")
|
||||
topic = f"homeassistant/event/meshcore_{nid}/messages/config"
|
||||
cfg: dict[str, Any] = {
|
||||
"name": "MeshCore Messages",
|
||||
"unique_id": f"meshcore_{nid}_messages",
|
||||
"device": device,
|
||||
"state_topic": f"{prefix}/{nid}/events/message",
|
||||
"event_types": ["message_received"],
|
||||
}
|
||||
return topic, cfg
|
||||
|
||||
|
||||
# ── Module class ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _config_to_settings(config: dict) -> SimpleNamespace:
|
||||
return SimpleNamespace(
|
||||
broker_host=config.get("broker_host", ""),
|
||||
broker_port=config.get("broker_port", 1883),
|
||||
username=config.get("username", ""),
|
||||
password=config.get("password", ""),
|
||||
use_tls=config.get("use_tls", False),
|
||||
tls_insecure=config.get("tls_insecure", False),
|
||||
)
|
||||
|
||||
|
||||
class MqttHaModule(FanoutModule):
|
||||
"""Home Assistant MQTT Discovery fanout module."""
|
||||
|
||||
def __init__(self, config_id: str, config: dict, *, name: str = "") -> None:
|
||||
super().__init__(config_id, config, name=name)
|
||||
self._publisher = _HaMqttPublisher()
|
||||
self._publisher.set_integration_name(name or config_id)
|
||||
self._publisher._on_connected_callback = self._publish_discovery
|
||||
self._discovery_topics: list[str] = []
|
||||
self._radio_key: str | None = None
|
||||
self._radio_name: str | None = None
|
||||
|
||||
@property
|
||||
def _prefix(self) -> str:
|
||||
return self.config.get("topic_prefix", "meshcore")
|
||||
|
||||
@property
|
||||
def _tracked_contacts(self) -> list[str]:
|
||||
return self.config.get("tracked_contacts") or []
|
||||
|
||||
@property
|
||||
def _tracked_repeaters(self) -> list[str]:
|
||||
return self.config.get("tracked_repeaters") or []
|
||||
|
||||
# ── Lifecycle ──────────────────────────────────────────────────────
|
||||
|
||||
async def start(self) -> None:
|
||||
settings = _config_to_settings(self.config)
|
||||
await self._publisher.start(settings)
|
||||
|
||||
async def stop(self) -> None:
|
||||
await self._remove_discovery()
|
||||
await self._publisher.stop()
|
||||
self._discovery_topics.clear()
|
||||
|
||||
# ── Discovery publishing ──────────────────────────────────────────
|
||||
|
||||
async def _publish_discovery(self) -> None:
|
||||
"""Publish all HA discovery configs with retain=True."""
|
||||
if not self._radio_key:
|
||||
# Don't publish discovery until we know the radio identity —
|
||||
# the first health heartbeat will provide it and trigger this.
|
||||
return
|
||||
|
||||
configs: list[tuple[str, dict]] = []
|
||||
|
||||
radio_name = self._radio_name or "MeshCore Radio"
|
||||
configs.extend(_radio_discovery_configs(self._prefix, self._radio_key, radio_name))
|
||||
|
||||
# Tracked repeaters — resolve names from DB best-effort
|
||||
for pub_key in self._tracked_repeaters:
|
||||
rname = await self._resolve_contact_name(pub_key)
|
||||
configs.extend(
|
||||
_repeater_discovery_configs(self._prefix, pub_key, rname, self._radio_key)
|
||||
)
|
||||
|
||||
# Tracked contacts — resolve names from DB best-effort
|
||||
for pub_key in self._tracked_contacts:
|
||||
cname = await self._resolve_contact_name(pub_key)
|
||||
configs.append(
|
||||
_contact_tracker_discovery_config(self._prefix, pub_key, cname, self._radio_key)
|
||||
)
|
||||
|
||||
# Message event entity (namespaced to this radio)
|
||||
configs.append(_message_event_discovery_config(self._prefix, self._radio_key, radio_name))
|
||||
|
||||
self._discovery_topics = [topic for topic, _ in configs]
|
||||
|
||||
for topic, payload in configs:
|
||||
await self._publisher.publish(topic, payload, retain=True)
|
||||
|
||||
logger.info(
|
||||
"HA MQTT: published %d discovery configs (%d repeaters, %d contacts)",
|
||||
len(configs),
|
||||
len(self._tracked_repeaters),
|
||||
len(self._tracked_contacts),
|
||||
)
|
||||
|
||||
async def _clear_retained_topics(self, topics: list[str]) -> None:
|
||||
"""Publish empty retained payloads to remove entries from broker."""
|
||||
for topic in topics:
|
||||
try:
|
||||
if self._publisher._client:
|
||||
await self._publisher._client.publish(topic, b"", retain=True)
|
||||
except Exception:
|
||||
pass # best-effort cleanup
|
||||
|
||||
async def _remove_discovery(self) -> None:
|
||||
"""Publish empty retained payloads to remove all HA entities."""
|
||||
if not self._publisher.connected or not self._discovery_topics:
|
||||
return
|
||||
await self._clear_retained_topics(self._discovery_topics)
|
||||
|
||||
@staticmethod
|
||||
async def _resolve_contact_name(pub_key: str) -> str:
|
||||
"""Look up a contact's display name, falling back to 12-char prefix."""
|
||||
try:
|
||||
from app.repository.contacts import ContactRepository
|
||||
|
||||
contact = await ContactRepository.get_by_key(pub_key)
|
||||
if contact and contact.name:
|
||||
return contact.name
|
||||
except Exception:
|
||||
pass
|
||||
return pub_key[:12]
|
||||
|
||||
# ── Event handlers ────────────────────────────────────────────────
|
||||
|
||||
async def on_health(self, data: dict) -> None:
|
||||
if not self._publisher.connected:
|
||||
return
|
||||
|
||||
# Cache radio identity for discovery config generation
|
||||
pub_key = data.get("public_key")
|
||||
if pub_key:
|
||||
new_name = data.get("name")
|
||||
key_changed = pub_key != self._radio_key
|
||||
name_changed = new_name and new_name != self._radio_name
|
||||
|
||||
if key_changed:
|
||||
old_key = self._radio_key
|
||||
self._radio_key = pub_key
|
||||
self._radio_name = new_name
|
||||
# Remove stale discovery entries from the old identity (e.g.
|
||||
# "unknown" placeholder from before the radio key was known),
|
||||
# then re-publish with the real identity.
|
||||
if old_key is not None:
|
||||
await self._clear_retained_topics(
|
||||
[t for t, _ in _radio_discovery_configs(self._prefix, old_key, "")]
|
||||
)
|
||||
await self._publish_discovery()
|
||||
elif name_changed:
|
||||
self._radio_name = new_name
|
||||
await self._publish_discovery()
|
||||
|
||||
# Don't publish health state until we know the radio identity —
|
||||
# otherwise we create a stale "unknown" device in HA.
|
||||
if not self._radio_key:
|
||||
return
|
||||
|
||||
nid = _node_id(self._radio_key)
|
||||
payload: dict[str, Any] = {"connected": data.get("connected", False)}
|
||||
for sensor in _RADIO_SENSORS:
|
||||
field = sensor["field"]
|
||||
if field is not None:
|
||||
payload[field] = data.get(field)
|
||||
await self._publisher.publish(f"{self._prefix}/{nid}/health", payload)
|
||||
|
||||
async def on_contact(self, data: dict) -> None:
|
||||
if not self._publisher.connected:
|
||||
return
|
||||
|
||||
pub_key = data.get("public_key", "")
|
||||
if pub_key not in self._tracked_contacts:
|
||||
return
|
||||
|
||||
lat = data.get("lat")
|
||||
lon = data.get("lon")
|
||||
if lat is None or lon is None or (lat == 0.0 and lon == 0.0):
|
||||
return
|
||||
|
||||
nid = _node_id(pub_key)
|
||||
await self._publisher.publish(
|
||||
f"{self._prefix}/{nid}/gps",
|
||||
{
|
||||
"latitude": lat,
|
||||
"longitude": lon,
|
||||
"gps_accuracy": 0,
|
||||
"source_type": "gps",
|
||||
},
|
||||
)
|
||||
|
||||
async def on_telemetry(self, data: dict) -> None:
|
||||
if not self._publisher.connected:
|
||||
return
|
||||
|
||||
pub_key = data.get("public_key", "")
|
||||
if pub_key not in self._tracked_repeaters:
|
||||
return
|
||||
|
||||
nid = _node_id(pub_key)
|
||||
# Publish the full telemetry dict — HA sensors use value_template
|
||||
# to extract individual fields
|
||||
payload: dict[str, Any] = {}
|
||||
for s in _REPEATER_SENSORS:
|
||||
field = s["field"]
|
||||
if field is not None:
|
||||
payload[field] = data.get(field)
|
||||
await self._publisher.publish(f"{self._prefix}/{nid}/telemetry", payload)
|
||||
|
||||
async def on_message(self, data: dict) -> None:
|
||||
if not self._publisher.connected or not self._radio_key:
|
||||
return
|
||||
|
||||
text = get_fanout_message_text(data)
|
||||
nid = _node_id(self._radio_key)
|
||||
await self._publisher.publish(
|
||||
f"{self._prefix}/{nid}/events/message",
|
||||
{
|
||||
"event_type": "message_received",
|
||||
"sender_name": data.get("sender_name", ""),
|
||||
"sender_key": data.get("sender_key", ""),
|
||||
"text": text,
|
||||
"conversation_key": data.get("conversation_key", ""),
|
||||
"message_type": data.get("type", ""),
|
||||
"channel_name": data.get("channel_name"),
|
||||
"outgoing": data.get("outgoing", False),
|
||||
},
|
||||
)
|
||||
|
||||
# ── Status ────────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def status(self) -> str:
|
||||
if not self.config.get("broker_host"):
|
||||
return "disconnected"
|
||||
if self._publisher.last_error:
|
||||
return "error"
|
||||
return "connected" if self._publisher.connected else "disconnected"
|
||||
|
||||
@property
|
||||
def last_error(self) -> str | None:
|
||||
return self._publisher.last_error
|
||||
+26
-2
@@ -16,7 +16,16 @@ from app.repository.fanout import FanoutConfigRepository
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/fanout", tags=["fanout"])
|
||||
|
||||
_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise", "sqs", "map_upload"}
|
||||
_VALID_TYPES = {
|
||||
"mqtt_private",
|
||||
"mqtt_community",
|
||||
"mqtt_ha",
|
||||
"bot",
|
||||
"webhook",
|
||||
"apprise",
|
||||
"sqs",
|
||||
"map_upload",
|
||||
}
|
||||
|
||||
_IATA_RE = re.compile(r"^[A-Z]{3}$")
|
||||
_DEFAULT_COMMUNITY_MQTT_TOPIC_TEMPLATE = "meshcore/{IATA}/{PUBLIC_KEY}/packets"
|
||||
@@ -96,6 +105,8 @@ def _validate_and_normalize_config(config_type: str, config: dict) -> dict:
|
||||
_validate_sqs_config(normalized)
|
||||
elif config_type == "map_upload":
|
||||
_validate_map_upload_config(normalized)
|
||||
elif config_type == "mqtt_ha":
|
||||
_validate_mqtt_ha_config(normalized)
|
||||
|
||||
return normalized
|
||||
|
||||
@@ -318,6 +329,19 @@ def _validate_map_upload_config(config: dict) -> None:
|
||||
config["geofence_radius_km"] = radius
|
||||
|
||||
|
||||
def _validate_mqtt_ha_config(config: dict) -> None:
|
||||
"""Validate mqtt_ha config blob."""
|
||||
if not config.get("broker_host"):
|
||||
raise HTTPException(status_code=400, detail="broker_host is required for mqtt_ha")
|
||||
port = config.get("broker_port", 1883)
|
||||
if not isinstance(port, int) or port < 1 or port > 65535:
|
||||
raise HTTPException(status_code=400, detail="broker_port must be between 1 and 65535")
|
||||
for field in ("tracked_contacts", "tracked_repeaters"):
|
||||
value = config.get(field)
|
||||
if value is not None and not isinstance(value, list):
|
||||
raise HTTPException(status_code=400, detail=f"{field} must be a list of public keys")
|
||||
|
||||
|
||||
def _enforce_scope(config_type: str, scope: dict) -> dict:
|
||||
"""Enforce type-specific scope constraints. Returns normalized scope."""
|
||||
if config_type == "mqtt_community":
|
||||
@@ -326,7 +350,7 @@ def _enforce_scope(config_type: str, scope: dict) -> dict:
|
||||
return {"messages": "none", "raw_packets": "all"}
|
||||
if config_type == "bot":
|
||||
return {"messages": "all", "raw_packets": "none"}
|
||||
if config_type in ("webhook", "apprise"):
|
||||
if config_type in ("webhook", "apprise", "mqtt_ha"):
|
||||
messages = scope.get("messages", "all")
|
||||
if messages not in ("all", "none") and not isinstance(messages, dict):
|
||||
raise HTTPException(
|
||||
|
||||
Reference in New Issue
Block a user