Files
Remote-Terminal-for-MeshCore/app/mqtt_base.py
2026-03-03 09:17:57 -08:00

230 lines
8.9 KiB
Python

"""Shared base class for MQTT publisher lifecycle management.
Both ``MqttPublisher`` (private broker) and ``CommunityMqttPublisher``
(community aggregator) inherit from ``BaseMqttPublisher``, which owns
the connection-loop skeleton, reconnect/backoff logic, and publish method.
Subclasses override a small set of hooks to control configuration checks,
client construction, toast messages, and optional wait-loop behavior.
"""
from __future__ import annotations
import asyncio
import json
import logging
import time
from abc import ABC, abstractmethod
from typing import Any
import aiomqtt
from app.models import AppSettings
logger = logging.getLogger(__name__)
_BACKOFF_MIN = 5
def _broadcast_health() -> None:
"""Push updated health (including MQTT status) to all WS clients."""
from app.radio import radio_manager
from app.websocket import broadcast_health
broadcast_health(radio_manager.is_connected, radio_manager.connection_info)
class BaseMqttPublisher(ABC):
"""Base class for MQTT publishers with shared lifecycle management.
Subclasses implement the abstract hooks to control configuration checks,
client construction, toast messages, and optional wait-loop behavior.
"""
_backoff_max: int = 30
_log_prefix: str = "MQTT"
_not_configured_timeout: float | None = None # None = block forever
def __init__(self) -> None:
self._client: aiomqtt.Client | None = None
self._task: asyncio.Task[None] | None = None
self._settings: AppSettings | None = None
self._settings_version: int = 0
self._version_event: asyncio.Event = asyncio.Event()
self.connected: bool = False
# ── Lifecycle ──────────────────────────────────────────────────────
async def start(self, settings: AppSettings) -> None:
"""Start the background connection loop."""
self._settings = settings
self._settings_version += 1
self._version_event.set()
if self._task is None or self._task.done():
self._task = asyncio.create_task(self._connection_loop())
async def stop(self) -> None:
"""Cancel the background task and disconnect."""
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
self._client = None
self.connected = False
async def restart(self, settings: AppSettings) -> None:
"""Called when settings change — stop + start."""
await self.stop()
await self.start(settings)
async def publish(self, topic: str, payload: dict[str, Any], *, retain: bool = False) -> None:
"""Publish a JSON payload. Drops silently if not connected."""
if self._client is None or not self.connected:
return
try:
await self._client.publish(topic, json.dumps(payload), retain=retain)
except Exception as e:
logger.warning("%s publish failed on %s: %s", self._log_prefix, topic, e)
self.connected = False
# Wake the connection loop so it exits the wait and reconnects
self._settings_version += 1
self._version_event.set()
# ── Abstract hooks ─────────────────────────────────────────────────
@abstractmethod
def _is_configured(self) -> bool:
"""Return True when this publisher should attempt to connect."""
@abstractmethod
def _build_client_kwargs(self, settings: AppSettings) -> dict[str, Any]:
"""Return the keyword arguments for ``aiomqtt.Client(...)``."""
@abstractmethod
def _on_connected(self, settings: AppSettings) -> tuple[str, str]:
"""Return ``(title, detail)`` for the success toast on connect."""
@abstractmethod
def _on_error(self) -> tuple[str, str]:
"""Return ``(title, detail)`` for the error toast on connect failure."""
# ── Optional hooks ─────────────────────────────────────────────────
def _should_break_wait(self, elapsed: float) -> bool:
"""Return True to break the inner wait (e.g. token expiry)."""
return False
async def _pre_connect(self, settings: AppSettings) -> bool:
"""Called before connecting. Return True to proceed, False to retry."""
return True
def _on_not_configured(self) -> None:
"""Called each time the loop finds the publisher not configured."""
return # no-op by default; subclasses may override
async def _on_connected_async(self, settings: AppSettings) -> None:
"""Async hook called after connection succeeds (before health broadcast).
Subclasses can override to publish messages immediately after connecting.
"""
return # no-op by default
async def _on_periodic_wake(self, elapsed: float) -> None:
"""Called every ~60s while connected. Subclasses may override."""
return
# ── Connection loop ────────────────────────────────────────────────
async def _connection_loop(self) -> None:
"""Background loop: connect, wait for version change, reconnect on failure."""
from app.websocket import broadcast_error, broadcast_success
backoff = _BACKOFF_MIN
while True:
if not self._is_configured():
self._on_not_configured()
self.connected = False
self._client = None
self._version_event.clear()
try:
if self._not_configured_timeout is None:
await self._version_event.wait()
else:
await asyncio.wait_for(
self._version_event.wait(),
timeout=self._not_configured_timeout,
)
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
return
continue
settings = self._settings
assert settings is not None # guaranteed by _is_configured()
version_at_connect = self._settings_version
try:
if not await self._pre_connect(settings):
continue
client_kwargs = self._build_client_kwargs(settings)
connect_time = time.monotonic()
async with aiomqtt.Client(**client_kwargs) as client:
self._client = client
self.connected = True
backoff = _BACKOFF_MIN
title, detail = self._on_connected(settings)
broadcast_success(title, detail)
await self._on_connected_async(settings)
_broadcast_health()
# Wait until cancelled or settings version changes.
# The 60s timeout is a housekeeping wake-up; actual connection
# liveness is handled by paho-mqtt's keepalive mechanism.
while self._settings_version == version_at_connect:
self._version_event.clear()
try:
await asyncio.wait_for(self._version_event.wait(), timeout=60)
except asyncio.TimeoutError:
elapsed = time.monotonic() - connect_time
await self._on_periodic_wake(elapsed)
if self._should_break_wait(elapsed):
break
continue
# async with exited — client is now closed
self._client = None
self.connected = False
_broadcast_health()
except asyncio.CancelledError:
self.connected = False
self._client = None
return
except Exception as e:
self.connected = False
self._client = None
title, detail = self._on_error()
broadcast_error(title, detail)
_broadcast_health()
logger.warning(
"%s connection error: %s (reconnecting in %ds)",
self._log_prefix,
e,
backoff,
)
try:
await asyncio.sleep(backoff)
except asyncio.CancelledError:
return
backoff = min(backoff * 2, self._backoff_max)