mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
Refactor to combined base for MQTT
This commit is contained in:
@@ -152,7 +152,8 @@ This message-layer echo/path handling is independent of raw-packet storage dedup
|
||||
│ ├── event_handlers.py # Radio events
|
||||
│ ├── decoder.py # Packet decryption
|
||||
│ ├── websocket.py # Real-time broadcasts
|
||||
│ ├── mqtt.py # Optional MQTT publisher
|
||||
│ ├── mqtt_base.py # Shared MQTT publisher base class (lifecycle, reconnect, backoff)
|
||||
│ ├── mqtt.py # Private MQTT publisher
|
||||
│ └── community_mqtt.py # Community MQTT publisher (raw packet sharing)
|
||||
├── frontend/ # React frontend
|
||||
│ ├── AGENTS.md # Frontend documentation
|
||||
|
||||
@@ -27,7 +27,8 @@ app/
|
||||
├── packet_processor.py # Raw packet pipeline, dedup, path handling
|
||||
├── event_handlers.py # MeshCore event subscriptions and ACK tracking
|
||||
├── websocket.py # WS manager + broadcast helpers
|
||||
├── mqtt.py # Optional MQTT publisher (fire-and-forget forwarding)
|
||||
├── mqtt_base.py # Shared MQTT publisher base class (lifecycle, reconnect, backoff)
|
||||
├── mqtt.py # Private MQTT publisher (fire-and-forget forwarding)
|
||||
├── community_mqtt.py # Community MQTT publisher (raw packet sharing)
|
||||
├── bot.py # Bot execution and outbound bot sends
|
||||
├── dependencies.py # Shared FastAPI dependency providers
|
||||
|
||||
@@ -20,10 +20,10 @@ import time
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import aiomqtt
|
||||
import nacl.bindings
|
||||
|
||||
from app.models import AppSettings
|
||||
from app.mqtt_base import BaseMqttPublisher
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -31,10 +31,6 @@ _DEFAULT_BROKER = "mqtt-us-v1.letsmesh.net"
|
||||
_DEFAULT_PORT = 443 # Community protocol uses WSS on port 443 by default
|
||||
_CLIENT_ID = "RemoteTerm (github.com/jkingsman/Remote-Terminal-for-MeshCore)"
|
||||
|
||||
# Reconnect backoff: start at 5s, cap at 60s
|
||||
_BACKOFF_MIN = 5
|
||||
_BACKOFF_MAX = 60
|
||||
|
||||
# Proactive JWT renewal: reconnect 1 hour before the 24h token expires
|
||||
_TOKEN_LIFETIME = 86400 # 24 hours (must match _generate_jwt_token exp)
|
||||
_TOKEN_RENEWAL_THRESHOLD = _TOKEN_LIFETIME - 3600 # 23 hours
|
||||
@@ -267,59 +263,12 @@ def _format_raw_packet(data: dict[str, Any], device_name: str, public_key_hex: s
|
||||
return packet
|
||||
|
||||
|
||||
def _broadcast_health_update() -> None:
|
||||
"""Push a health broadcast so the frontend sees updated MQTT status badges."""
|
||||
from app.radio import radio_manager
|
||||
from app.websocket import broadcast_health
|
||||
|
||||
broadcast_health(radio_manager.is_connected, radio_manager.connection_info)
|
||||
|
||||
|
||||
class CommunityMqttPublisher:
|
||||
class CommunityMqttPublisher(BaseMqttPublisher):
|
||||
"""Manages the community MQTT connection and publishes raw packets."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._client: aiomqtt.Client | None = None
|
||||
self._task: asyncio.Task[None] | None = None
|
||||
self._settings: AppSettings | None = None
|
||||
self._settings_version: int = 0
|
||||
self._version_event: asyncio.Event = asyncio.Event()
|
||||
self.connected: bool = False
|
||||
|
||||
async def start(self, settings: AppSettings) -> None:
|
||||
"""Start the background connection loop."""
|
||||
self._settings = settings
|
||||
self._settings_version += 1
|
||||
self._version_event.set()
|
||||
if self._task is None or self._task.done():
|
||||
self._task = asyncio.create_task(self._connection_loop())
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Cancel the background task and disconnect."""
|
||||
if self._task and not self._task.done():
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._task = None
|
||||
self._client = None
|
||||
self.connected = False
|
||||
|
||||
async def restart(self, settings: AppSettings) -> None:
|
||||
"""Called when community MQTT settings change — stop + start."""
|
||||
await self.stop()
|
||||
await self.start(settings)
|
||||
|
||||
async def publish(self, topic: str, payload: dict[str, Any]) -> None:
|
||||
"""Publish a JSON payload. Drops silently if not connected."""
|
||||
if self._client is None or not self.connected:
|
||||
return
|
||||
try:
|
||||
await self._client.publish(topic, json.dumps(payload))
|
||||
except Exception as e:
|
||||
logger.warning("Community MQTT publish failed on %s: %s", topic, e)
|
||||
self.connected = False
|
||||
_backoff_max = 60
|
||||
_log_prefix = "Community MQTT"
|
||||
_not_configured_timeout: float | None = 30
|
||||
|
||||
def _is_configured(self) -> bool:
|
||||
"""Check if community MQTT is enabled and keys are available."""
|
||||
@@ -327,122 +276,70 @@ class CommunityMqttPublisher:
|
||||
|
||||
return bool(self._settings and self._settings.community_mqtt_enabled and has_private_key())
|
||||
|
||||
async def _connection_loop(self) -> None:
|
||||
"""Background loop: connect, wait, reconnect on failure."""
|
||||
def _build_client_kwargs(self, settings: AppSettings) -> dict[str, Any]:
|
||||
from app.keystore import get_private_key, get_public_key
|
||||
from app.websocket import broadcast_error, broadcast_success
|
||||
|
||||
backoff = _BACKOFF_MIN
|
||||
private_key = get_private_key()
|
||||
public_key = get_public_key()
|
||||
assert private_key is not None and public_key is not None # guaranteed by _pre_connect
|
||||
|
||||
while True:
|
||||
if not self._is_configured():
|
||||
self.connected = False
|
||||
self._client = None
|
||||
self._version_event.clear()
|
||||
try:
|
||||
# Also wake periodically so newly exported keys are detected without a settings change.
|
||||
await asyncio.wait_for(self._version_event.wait(), timeout=30)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
continue
|
||||
pubkey_hex = public_key.hex().upper()
|
||||
broker_raw = settings.community_mqtt_broker or _DEFAULT_BROKER
|
||||
broker_host, broker_port = _parse_broker_address(broker_raw)
|
||||
jwt_token = _generate_jwt_token(
|
||||
private_key,
|
||||
public_key,
|
||||
audience=broker_host,
|
||||
email=settings.community_mqtt_email or "",
|
||||
)
|
||||
|
||||
settings = self._settings
|
||||
assert settings is not None
|
||||
version_at_connect = self._settings_version
|
||||
tls_context = ssl.create_default_context()
|
||||
|
||||
return {
|
||||
"hostname": broker_host,
|
||||
"port": broker_port,
|
||||
"transport": "websockets",
|
||||
"tls_context": tls_context,
|
||||
"websocket_path": "/",
|
||||
"username": f"v1_{pubkey_hex}",
|
||||
"password": jwt_token,
|
||||
}
|
||||
|
||||
def _on_connected(self, settings: AppSettings) -> tuple[str, str]:
|
||||
broker_raw = settings.community_mqtt_broker or _DEFAULT_BROKER
|
||||
broker_host, broker_port = _parse_broker_address(broker_raw)
|
||||
return ("Community MQTT connected", f"{broker_host}:{broker_port}")
|
||||
|
||||
def _on_error(self) -> tuple[str, str]:
|
||||
return (
|
||||
"Community MQTT connection failure",
|
||||
"Check your internet connection or try again later.",
|
||||
)
|
||||
|
||||
def _should_break_wait(self, elapsed: float) -> bool:
|
||||
if not self.connected:
|
||||
logger.info("Community MQTT publish failure detected, reconnecting")
|
||||
return True
|
||||
if elapsed >= _TOKEN_RENEWAL_THRESHOLD:
|
||||
logger.info("Community MQTT JWT nearing expiry, reconnecting")
|
||||
return True
|
||||
return False
|
||||
|
||||
async def _pre_connect(self, settings: AppSettings) -> bool:
|
||||
from app.keystore import get_private_key, get_public_key
|
||||
|
||||
private_key = get_private_key()
|
||||
public_key = get_public_key()
|
||||
if private_key is None or public_key is None:
|
||||
# Keys not available yet, wait for settings change or key export
|
||||
self.connected = False
|
||||
self._version_event.clear()
|
||||
try:
|
||||
private_key = get_private_key()
|
||||
public_key = get_public_key()
|
||||
if private_key is None or public_key is None:
|
||||
# Keys not available yet, wait for settings change
|
||||
self.connected = False
|
||||
self._version_event.clear()
|
||||
try:
|
||||
await asyncio.wait_for(self._version_event.wait(), timeout=30)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
continue
|
||||
|
||||
pubkey_hex = public_key.hex().upper()
|
||||
broker_raw = settings.community_mqtt_broker or _DEFAULT_BROKER
|
||||
broker_host, broker_port = _parse_broker_address(broker_raw)
|
||||
jwt_token = _generate_jwt_token(
|
||||
private_key,
|
||||
public_key,
|
||||
audience=broker_host,
|
||||
email=settings.community_mqtt_email or "",
|
||||
)
|
||||
token_created_at = time.monotonic()
|
||||
|
||||
tls_context = ssl.create_default_context()
|
||||
|
||||
async with aiomqtt.Client(
|
||||
hostname=broker_host,
|
||||
port=broker_port,
|
||||
transport="websockets",
|
||||
tls_context=tls_context,
|
||||
websocket_path="/",
|
||||
username=f"v1_{pubkey_hex}",
|
||||
password=jwt_token,
|
||||
) as client:
|
||||
self._client = client
|
||||
self.connected = True
|
||||
backoff = _BACKOFF_MIN
|
||||
|
||||
broadcast_success(
|
||||
"Community MQTT connected",
|
||||
f"{broker_host}:{broker_port}",
|
||||
)
|
||||
_broadcast_health_update()
|
||||
|
||||
# Wait until cancelled, settings change, or token nears expiry
|
||||
while self._settings_version == version_at_connect:
|
||||
self._version_event.clear()
|
||||
try:
|
||||
await asyncio.wait_for(self._version_event.wait(), timeout=60)
|
||||
except asyncio.TimeoutError:
|
||||
# Detect publish failure: reconnect so packets resume
|
||||
if not self.connected:
|
||||
logger.info("Community MQTT publish failure detected, reconnecting")
|
||||
break
|
||||
# Proactive JWT renewal: reconnect before token expires
|
||||
if time.monotonic() - token_created_at >= _TOKEN_RENEWAL_THRESHOLD:
|
||||
logger.info("Community MQTT JWT nearing expiry, reconnecting")
|
||||
break
|
||||
continue
|
||||
|
||||
# async with exited — client is now closed
|
||||
self._client = None
|
||||
self.connected = False
|
||||
_broadcast_health_update()
|
||||
|
||||
except asyncio.CancelledError:
|
||||
self.connected = False
|
||||
self._client = None
|
||||
return
|
||||
|
||||
except Exception as e:
|
||||
self.connected = False
|
||||
self._client = None
|
||||
|
||||
broadcast_error(
|
||||
"Community MQTT connection failure",
|
||||
"Check your internet connection or try again later.",
|
||||
)
|
||||
_broadcast_health_update()
|
||||
logger.warning(
|
||||
"Community MQTT connection error: %s (reconnecting in %ds)", e, backoff
|
||||
)
|
||||
|
||||
try:
|
||||
await asyncio.sleep(backoff)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
backoff = min(backoff * 2, _BACKOFF_MAX)
|
||||
await asyncio.wait_for(self._version_event.wait(), timeout=30)
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
# Module-level singleton
|
||||
|
||||
150
app/mqtt.py
150
app/mqtt.py
@@ -3,148 +3,40 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import ssl
|
||||
from typing import Any
|
||||
|
||||
import aiomqtt
|
||||
|
||||
from app.models import AppSettings
|
||||
from app.mqtt_base import BaseMqttPublisher
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Reconnect backoff: start at 5s, cap at 30s
|
||||
_BACKOFF_MIN = 5
|
||||
_BACKOFF_MAX = 30
|
||||
|
||||
|
||||
class MqttPublisher:
|
||||
class MqttPublisher(BaseMqttPublisher):
|
||||
"""Manages an MQTT connection and publishes mesh network events."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._client: aiomqtt.Client | None = None
|
||||
self._task: asyncio.Task[None] | None = None
|
||||
self._settings: AppSettings | None = None
|
||||
self._settings_version: int = 0
|
||||
self._version_event: asyncio.Event = asyncio.Event()
|
||||
self.connected: bool = False
|
||||
_backoff_max = 30
|
||||
_log_prefix = "MQTT"
|
||||
|
||||
async def start(self, settings: AppSettings) -> None:
|
||||
"""Start the background connection loop."""
|
||||
self._settings = settings
|
||||
self._settings_version += 1
|
||||
self._version_event.set()
|
||||
if self._task is None or self._task.done():
|
||||
self._task = asyncio.create_task(self._connection_loop())
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Cancel the background task and disconnect."""
|
||||
if self._task and not self._task.done():
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._task = None
|
||||
self._client = None
|
||||
self.connected = False
|
||||
|
||||
async def restart(self, settings: AppSettings) -> None:
|
||||
"""Called when MQTT settings change — stop + start."""
|
||||
await self.stop()
|
||||
await self.start(settings)
|
||||
|
||||
async def publish(self, topic: str, payload: dict[str, Any]) -> None:
|
||||
"""Publish a JSON payload. Drops silently if not connected."""
|
||||
if self._client is None or not self.connected:
|
||||
return
|
||||
try:
|
||||
await self._client.publish(topic, json.dumps(payload))
|
||||
except Exception as e:
|
||||
logger.warning("MQTT publish failed on %s: %s", topic, e)
|
||||
self.connected = False
|
||||
# Wake the connection loop so it exits the wait and reconnects
|
||||
self._settings_version += 1
|
||||
self._version_event.set()
|
||||
|
||||
def _mqtt_configured(self) -> bool:
|
||||
def _is_configured(self) -> bool:
|
||||
"""Check if MQTT is configured (broker host is set)."""
|
||||
return bool(self._settings and self._settings.mqtt_broker_host)
|
||||
|
||||
async def _connection_loop(self) -> None:
|
||||
"""Background loop: connect, wait, reconnect on failure."""
|
||||
from app.websocket import broadcast_error, broadcast_success
|
||||
def _build_client_kwargs(self, settings: AppSettings) -> dict[str, Any]:
|
||||
return {
|
||||
"hostname": settings.mqtt_broker_host,
|
||||
"port": settings.mqtt_broker_port,
|
||||
"username": settings.mqtt_username or None,
|
||||
"password": settings.mqtt_password or None,
|
||||
"tls_context": self._build_tls_context(settings),
|
||||
}
|
||||
|
||||
backoff = _BACKOFF_MIN
|
||||
def _on_connected(self, settings: AppSettings) -> tuple[str, str]:
|
||||
return ("MQTT connected", f"{settings.mqtt_broker_host}:{settings.mqtt_broker_port}")
|
||||
|
||||
while True:
|
||||
if not self._mqtt_configured():
|
||||
self.connected = False
|
||||
self._client = None
|
||||
# Wait until settings change (which might configure MQTT)
|
||||
self._version_event.clear()
|
||||
try:
|
||||
await self._version_event.wait()
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
continue
|
||||
|
||||
settings = self._settings
|
||||
assert settings is not None # guaranteed by _mqtt_configured()
|
||||
version_at_connect = self._settings_version
|
||||
|
||||
try:
|
||||
tls_context = self._build_tls_context(settings)
|
||||
|
||||
async with aiomqtt.Client(
|
||||
hostname=settings.mqtt_broker_host,
|
||||
port=settings.mqtt_broker_port,
|
||||
username=settings.mqtt_username or None,
|
||||
password=settings.mqtt_password or None,
|
||||
tls_context=tls_context,
|
||||
) as client:
|
||||
self._client = client
|
||||
self.connected = True
|
||||
backoff = _BACKOFF_MIN
|
||||
|
||||
broadcast_success(
|
||||
"MQTT connected",
|
||||
f"{settings.mqtt_broker_host}:{settings.mqtt_broker_port}",
|
||||
)
|
||||
_broadcast_mqtt_health()
|
||||
|
||||
# Wait until cancelled or settings version changes.
|
||||
# The 60s timeout is a housekeeping wake-up; actual connection
|
||||
# liveness is handled by paho-mqtt's keepalive mechanism.
|
||||
while self._settings_version == version_at_connect:
|
||||
self._version_event.clear()
|
||||
try:
|
||||
await asyncio.wait_for(self._version_event.wait(), timeout=60)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
|
||||
except asyncio.CancelledError:
|
||||
self.connected = False
|
||||
self._client = None
|
||||
return
|
||||
|
||||
except Exception as e:
|
||||
self.connected = False
|
||||
self._client = None
|
||||
|
||||
broadcast_error(
|
||||
"MQTT connection failure",
|
||||
"Please correct the settings or disable.",
|
||||
)
|
||||
_broadcast_mqtt_health()
|
||||
logger.warning("MQTT connection error: %s (reconnecting in %ds)", e, backoff)
|
||||
|
||||
try:
|
||||
await asyncio.sleep(backoff)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
backoff = min(backoff * 2, _BACKOFF_MAX)
|
||||
def _on_error(self) -> tuple[str, str]:
|
||||
return ("MQTT connection failure", "Please correct the settings or disable.")
|
||||
|
||||
@staticmethod
|
||||
def _build_tls_context(settings: AppSettings) -> ssl.SSLContext | None:
|
||||
@@ -162,14 +54,6 @@ class MqttPublisher:
|
||||
mqtt_publisher = MqttPublisher()
|
||||
|
||||
|
||||
def _broadcast_mqtt_health() -> None:
|
||||
"""Push updated health (including mqtt_status) to all WS clients."""
|
||||
from app.radio import radio_manager
|
||||
from app.websocket import broadcast_health
|
||||
|
||||
broadcast_health(radio_manager.is_connected, radio_manager.connection_info)
|
||||
|
||||
|
||||
def mqtt_broadcast(event_type: str, data: dict[str, Any]) -> None:
|
||||
"""Fire-and-forget MQTT publish, matching broadcast_event's pattern."""
|
||||
if event_type not in ("message", "raw_packet"):
|
||||
|
||||
211
app/mqtt_base.py
Normal file
211
app/mqtt_base.py
Normal file
@@ -0,0 +1,211 @@
|
||||
"""Shared base class for MQTT publisher lifecycle management.
|
||||
|
||||
Both ``MqttPublisher`` (private broker) and ``CommunityMqttPublisher``
|
||||
(community aggregator) inherit from ``BaseMqttPublisher``, which owns
|
||||
the connection-loop skeleton, reconnect/backoff logic, and publish method.
|
||||
Subclasses override a small set of hooks to control configuration checks,
|
||||
client construction, toast messages, and optional wait-loop behavior.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any
|
||||
|
||||
import aiomqtt
|
||||
|
||||
from app.models import AppSettings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_BACKOFF_MIN = 5
|
||||
|
||||
|
||||
def _broadcast_health() -> None:
|
||||
"""Push updated health (including MQTT status) to all WS clients."""
|
||||
from app.radio import radio_manager
|
||||
from app.websocket import broadcast_health
|
||||
|
||||
broadcast_health(radio_manager.is_connected, radio_manager.connection_info)
|
||||
|
||||
|
||||
class BaseMqttPublisher(ABC):
|
||||
"""Base class for MQTT publishers with shared lifecycle management.
|
||||
|
||||
Subclasses implement the abstract hooks to control configuration checks,
|
||||
client construction, toast messages, and optional wait-loop behavior.
|
||||
"""
|
||||
|
||||
_backoff_max: int = 30
|
||||
_log_prefix: str = "MQTT"
|
||||
_not_configured_timeout: float | None = None # None = block forever
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._client: aiomqtt.Client | None = None
|
||||
self._task: asyncio.Task[None] | None = None
|
||||
self._settings: AppSettings | None = None
|
||||
self._settings_version: int = 0
|
||||
self._version_event: asyncio.Event = asyncio.Event()
|
||||
self.connected: bool = False
|
||||
|
||||
# ── Lifecycle ──────────────────────────────────────────────────────
|
||||
|
||||
async def start(self, settings: AppSettings) -> None:
|
||||
"""Start the background connection loop."""
|
||||
self._settings = settings
|
||||
self._settings_version += 1
|
||||
self._version_event.set()
|
||||
if self._task is None or self._task.done():
|
||||
self._task = asyncio.create_task(self._connection_loop())
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Cancel the background task and disconnect."""
|
||||
if self._task and not self._task.done():
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._task = None
|
||||
self._client = None
|
||||
self.connected = False
|
||||
|
||||
async def restart(self, settings: AppSettings) -> None:
|
||||
"""Called when settings change — stop + start."""
|
||||
await self.stop()
|
||||
await self.start(settings)
|
||||
|
||||
async def publish(self, topic: str, payload: dict[str, Any]) -> None:
|
||||
"""Publish a JSON payload. Drops silently if not connected."""
|
||||
if self._client is None or not self.connected:
|
||||
return
|
||||
try:
|
||||
await self._client.publish(topic, json.dumps(payload))
|
||||
except Exception as e:
|
||||
logger.warning("%s publish failed on %s: %s", self._log_prefix, topic, e)
|
||||
self.connected = False
|
||||
# Wake the connection loop so it exits the wait and reconnects
|
||||
self._settings_version += 1
|
||||
self._version_event.set()
|
||||
|
||||
# ── Abstract hooks ─────────────────────────────────────────────────
|
||||
|
||||
@abstractmethod
|
||||
def _is_configured(self) -> bool:
|
||||
"""Return True when this publisher should attempt to connect."""
|
||||
|
||||
@abstractmethod
|
||||
def _build_client_kwargs(self, settings: AppSettings) -> dict[str, Any]:
|
||||
"""Return the keyword arguments for ``aiomqtt.Client(...)``."""
|
||||
|
||||
@abstractmethod
|
||||
def _on_connected(self, settings: AppSettings) -> tuple[str, str]:
|
||||
"""Return ``(title, detail)`` for the success toast on connect."""
|
||||
|
||||
@abstractmethod
|
||||
def _on_error(self) -> tuple[str, str]:
|
||||
"""Return ``(title, detail)`` for the error toast on connect failure."""
|
||||
|
||||
# ── Optional hooks ─────────────────────────────────────────────────
|
||||
|
||||
def _should_break_wait(self, elapsed: float) -> bool:
|
||||
"""Return True to break the inner wait (e.g. token expiry)."""
|
||||
return False
|
||||
|
||||
async def _pre_connect(self, settings: AppSettings) -> bool:
|
||||
"""Called before connecting. Return True to proceed, False to retry."""
|
||||
return True
|
||||
|
||||
# ── Connection loop ────────────────────────────────────────────────
|
||||
|
||||
async def _connection_loop(self) -> None:
|
||||
"""Background loop: connect, wait for version change, reconnect on failure."""
|
||||
from app.websocket import broadcast_error, broadcast_success
|
||||
|
||||
backoff = _BACKOFF_MIN
|
||||
|
||||
while True:
|
||||
if not self._is_configured():
|
||||
self.connected = False
|
||||
self._client = None
|
||||
self._version_event.clear()
|
||||
try:
|
||||
if self._not_configured_timeout is None:
|
||||
await self._version_event.wait()
|
||||
else:
|
||||
await asyncio.wait_for(
|
||||
self._version_event.wait(),
|
||||
timeout=self._not_configured_timeout,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
continue
|
||||
|
||||
settings = self._settings
|
||||
assert settings is not None # guaranteed by _is_configured()
|
||||
version_at_connect = self._settings_version
|
||||
|
||||
try:
|
||||
if not await self._pre_connect(settings):
|
||||
continue
|
||||
|
||||
client_kwargs = self._build_client_kwargs(settings)
|
||||
connect_time = time.monotonic()
|
||||
|
||||
async with aiomqtt.Client(**client_kwargs) as client:
|
||||
self._client = client
|
||||
self.connected = True
|
||||
backoff = _BACKOFF_MIN
|
||||
|
||||
title, detail = self._on_connected(settings)
|
||||
broadcast_success(title, detail)
|
||||
_broadcast_health()
|
||||
|
||||
# Wait until cancelled or settings version changes.
|
||||
# The 60s timeout is a housekeeping wake-up; actual connection
|
||||
# liveness is handled by paho-mqtt's keepalive mechanism.
|
||||
while self._settings_version == version_at_connect:
|
||||
self._version_event.clear()
|
||||
try:
|
||||
await asyncio.wait_for(self._version_event.wait(), timeout=60)
|
||||
except asyncio.TimeoutError:
|
||||
elapsed = time.monotonic() - connect_time
|
||||
if self._should_break_wait(elapsed):
|
||||
break
|
||||
continue
|
||||
|
||||
# async with exited — client is now closed
|
||||
self._client = None
|
||||
self.connected = False
|
||||
_broadcast_health()
|
||||
|
||||
except asyncio.CancelledError:
|
||||
self.connected = False
|
||||
self._client = None
|
||||
return
|
||||
|
||||
except Exception as e:
|
||||
self.connected = False
|
||||
self._client = None
|
||||
|
||||
title, detail = self._on_error()
|
||||
broadcast_error(title, detail)
|
||||
_broadcast_health()
|
||||
logger.warning(
|
||||
"%s connection error: %s (reconnecting in %ds)",
|
||||
self._log_prefix,
|
||||
e,
|
||||
backoff,
|
||||
)
|
||||
|
||||
try:
|
||||
await asyncio.sleep(backoff)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
backoff = min(backoff * 2, self._backoff_max)
|
||||
@@ -40,7 +40,7 @@ async def build_health_data(radio_connected: bool, connection_info: str | None)
|
||||
try:
|
||||
from app.mqtt import mqtt_publisher
|
||||
|
||||
if mqtt_publisher._mqtt_configured():
|
||||
if mqtt_publisher._is_configured():
|
||||
mqtt_status = "connected" if mqtt_publisher.connected else "disconnected"
|
||||
else:
|
||||
mqtt_status = "disabled"
|
||||
|
||||
@@ -81,12 +81,12 @@ class TestMqttPublisher:
|
||||
def test_not_configured_when_host_empty(self):
|
||||
pub = MqttPublisher()
|
||||
pub._settings = _make_settings(mqtt_broker_host="")
|
||||
assert pub._mqtt_configured() is False
|
||||
assert pub._is_configured() is False
|
||||
|
||||
def test_configured_when_host_set(self):
|
||||
pub = MqttPublisher()
|
||||
pub._settings = _make_settings(mqtt_broker_host="broker.local")
|
||||
assert pub._mqtt_configured() is True
|
||||
assert pub._is_configured() is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_publish_drops_silently_when_disconnected(self):
|
||||
@@ -300,8 +300,8 @@ class TestConnectionLoop:
|
||||
mock_client.__aenter__ = AsyncMock(side_effect=side_effect_aenter)
|
||||
|
||||
with (
|
||||
patch("app.mqtt.aiomqtt.Client", return_value=mock_client),
|
||||
patch("app.mqtt._broadcast_mqtt_health"),
|
||||
patch("app.mqtt_base.aiomqtt.Client", return_value=mock_client),
|
||||
patch("app.mqtt_base._broadcast_health"),
|
||||
patch("app.websocket.broadcast_success"),
|
||||
patch("app.websocket.broadcast_health"),
|
||||
):
|
||||
@@ -321,7 +321,7 @@ class TestConnectionLoop:
|
||||
"""Connection loop should retry after a connection error with backoff."""
|
||||
import asyncio
|
||||
|
||||
from app.mqtt import _BACKOFF_MIN
|
||||
from app.mqtt_base import _BACKOFF_MIN
|
||||
|
||||
pub = MqttPublisher()
|
||||
settings = _make_settings()
|
||||
@@ -354,12 +354,12 @@ class TestConnectionLoop:
|
||||
return factory
|
||||
|
||||
with (
|
||||
patch("app.mqtt.aiomqtt.Client", side_effect=make_client_factory()),
|
||||
patch("app.mqtt._broadcast_mqtt_health"),
|
||||
patch("app.mqtt_base.aiomqtt.Client", side_effect=make_client_factory()),
|
||||
patch("app.mqtt_base._broadcast_health"),
|
||||
patch("app.websocket.broadcast_success"),
|
||||
patch("app.websocket.broadcast_error"),
|
||||
patch("app.websocket.broadcast_health"),
|
||||
patch("app.mqtt.asyncio.sleep", new_callable=AsyncMock) as mock_sleep,
|
||||
patch("app.mqtt_base.asyncio.sleep", new_callable=AsyncMock) as mock_sleep,
|
||||
):
|
||||
await pub.start(settings)
|
||||
|
||||
@@ -375,10 +375,10 @@ class TestConnectionLoop:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backoff_increases_on_repeated_failures(self):
|
||||
"""Backoff should double after each failure, capped at _BACKOFF_MAX."""
|
||||
"""Backoff should double after each failure, capped at _backoff_max."""
|
||||
import asyncio
|
||||
|
||||
from app.mqtt import _BACKOFF_MAX, _BACKOFF_MIN
|
||||
from app.mqtt_base import _BACKOFF_MIN
|
||||
|
||||
pub = MqttPublisher()
|
||||
settings = _make_settings()
|
||||
@@ -408,11 +408,11 @@ class TestConnectionLoop:
|
||||
raise asyncio.CancelledError
|
||||
|
||||
with (
|
||||
patch("app.mqtt.aiomqtt.Client", side_effect=factory),
|
||||
patch("app.mqtt._broadcast_mqtt_health"),
|
||||
patch("app.mqtt_base.aiomqtt.Client", side_effect=factory),
|
||||
patch("app.mqtt_base._broadcast_health"),
|
||||
patch("app.websocket.broadcast_error"),
|
||||
patch("app.websocket.broadcast_health"),
|
||||
patch("app.mqtt.asyncio.sleep", side_effect=capture_sleep),
|
||||
patch("app.mqtt_base.asyncio.sleep", side_effect=capture_sleep),
|
||||
):
|
||||
await pub.start(settings)
|
||||
try:
|
||||
@@ -423,8 +423,8 @@ class TestConnectionLoop:
|
||||
assert sleep_args[0] == _BACKOFF_MIN
|
||||
assert sleep_args[1] == _BACKOFF_MIN * 2
|
||||
assert sleep_args[2] == _BACKOFF_MIN * 4
|
||||
# Fourth should be capped at _BACKOFF_MAX (5*8=40 > 30)
|
||||
assert sleep_args[3] == _BACKOFF_MAX
|
||||
# Fourth should be capped at _backoff_max (5*8=40 > 30)
|
||||
assert sleep_args[3] == MqttPublisher._backoff_max
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_waits_for_settings_when_unconfigured(self):
|
||||
@@ -449,8 +449,8 @@ class TestConnectionLoop:
|
||||
return mock
|
||||
|
||||
with (
|
||||
patch("app.mqtt.aiomqtt.Client", side_effect=make_success_client),
|
||||
patch("app.mqtt._broadcast_mqtt_health"),
|
||||
patch("app.mqtt_base.aiomqtt.Client", side_effect=make_success_client),
|
||||
patch("app.mqtt_base._broadcast_health"),
|
||||
patch("app.websocket.broadcast_success"),
|
||||
patch("app.websocket.broadcast_health"),
|
||||
):
|
||||
@@ -472,7 +472,7 @@ class TestConnectionLoop:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_health_broadcast_on_connect_and_failure(self):
|
||||
"""_broadcast_mqtt_health should be called on connect and on failure."""
|
||||
"""_broadcast_health should be called on connect and on failure."""
|
||||
import asyncio
|
||||
|
||||
pub = MqttPublisher()
|
||||
@@ -497,8 +497,8 @@ class TestConnectionLoop:
|
||||
return mock
|
||||
|
||||
with (
|
||||
patch("app.mqtt.aiomqtt.Client", side_effect=make_client),
|
||||
patch("app.mqtt._broadcast_mqtt_health", side_effect=track_health),
|
||||
patch("app.mqtt_base.aiomqtt.Client", side_effect=make_client),
|
||||
patch("app.mqtt_base._broadcast_health", side_effect=track_health),
|
||||
patch("app.websocket.broadcast_success"),
|
||||
patch("app.websocket.broadcast_health"),
|
||||
):
|
||||
@@ -512,7 +512,7 @@ class TestConnectionLoop:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_health_broadcast_on_connection_error(self):
|
||||
"""_broadcast_mqtt_health should be called when connection fails."""
|
||||
"""_broadcast_health should be called when connection fails."""
|
||||
import asyncio
|
||||
|
||||
pub = MqttPublisher()
|
||||
@@ -534,11 +534,11 @@ class TestConnectionLoop:
|
||||
return mock
|
||||
|
||||
with (
|
||||
patch("app.mqtt.aiomqtt.Client", side_effect=make_failing_client),
|
||||
patch("app.mqtt._broadcast_mqtt_health", side_effect=track_health),
|
||||
patch("app.mqtt_base.aiomqtt.Client", side_effect=make_failing_client),
|
||||
patch("app.mqtt_base._broadcast_health", side_effect=track_health),
|
||||
patch("app.websocket.broadcast_error"),
|
||||
patch("app.websocket.broadcast_health"),
|
||||
patch("app.mqtt.asyncio.sleep", side_effect=cancel_on_sleep),
|
||||
patch("app.mqtt_base.asyncio.sleep", side_effect=cancel_on_sleep),
|
||||
):
|
||||
await pub.start(settings)
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user