diff --git a/AGENTS.md b/AGENTS.md index 0dfa427..8ff41df 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -152,7 +152,8 @@ This message-layer echo/path handling is independent of raw-packet storage dedup │ ├── event_handlers.py # Radio events │ ├── decoder.py # Packet decryption │ ├── websocket.py # Real-time broadcasts -│ ├── mqtt.py # Optional MQTT publisher +│ ├── mqtt_base.py # Shared MQTT publisher base class (lifecycle, reconnect, backoff) +│ ├── mqtt.py # Private MQTT publisher │ └── community_mqtt.py # Community MQTT publisher (raw packet sharing) ├── frontend/ # React frontend │ ├── AGENTS.md # Frontend documentation diff --git a/app/AGENTS.md b/app/AGENTS.md index 1dd4de5..5273650 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -27,7 +27,8 @@ app/ ├── packet_processor.py # Raw packet pipeline, dedup, path handling ├── event_handlers.py # MeshCore event subscriptions and ACK tracking ├── websocket.py # WS manager + broadcast helpers -├── mqtt.py # Optional MQTT publisher (fire-and-forget forwarding) +├── mqtt_base.py # Shared MQTT publisher base class (lifecycle, reconnect, backoff) +├── mqtt.py # Private MQTT publisher (fire-and-forget forwarding) ├── community_mqtt.py # Community MQTT publisher (raw packet sharing) ├── bot.py # Bot execution and outbound bot sends ├── dependencies.py # Shared FastAPI dependency providers diff --git a/app/community_mqtt.py b/app/community_mqtt.py index 358396c..2cfdb8e 100644 --- a/app/community_mqtt.py +++ b/app/community_mqtt.py @@ -20,10 +20,10 @@ import time from datetime import datetime from typing import Any -import aiomqtt import nacl.bindings from app.models import AppSettings +from app.mqtt_base import BaseMqttPublisher logger = logging.getLogger(__name__) @@ -31,10 +31,6 @@ _DEFAULT_BROKER = "mqtt-us-v1.letsmesh.net" _DEFAULT_PORT = 443 # Community protocol uses WSS on port 443 by default _CLIENT_ID = "RemoteTerm (github.com/jkingsman/Remote-Terminal-for-MeshCore)" -# Reconnect backoff: start at 5s, cap at 60s -_BACKOFF_MIN = 5 -_BACKOFF_MAX = 60 - # Proactive JWT renewal: reconnect 1 hour before the 24h token expires _TOKEN_LIFETIME = 86400 # 24 hours (must match _generate_jwt_token exp) _TOKEN_RENEWAL_THRESHOLD = _TOKEN_LIFETIME - 3600 # 23 hours @@ -267,59 +263,12 @@ def _format_raw_packet(data: dict[str, Any], device_name: str, public_key_hex: s return packet -def _broadcast_health_update() -> None: - """Push a health broadcast so the frontend sees updated MQTT status badges.""" - from app.radio import radio_manager - from app.websocket import broadcast_health - - broadcast_health(radio_manager.is_connected, radio_manager.connection_info) - - -class CommunityMqttPublisher: +class CommunityMqttPublisher(BaseMqttPublisher): """Manages the community MQTT connection and publishes raw packets.""" - def __init__(self) -> None: - self._client: aiomqtt.Client | None = None - self._task: asyncio.Task[None] | None = None - self._settings: AppSettings | None = None - self._settings_version: int = 0 - self._version_event: asyncio.Event = asyncio.Event() - self.connected: bool = False - - async def start(self, settings: AppSettings) -> None: - """Start the background connection loop.""" - self._settings = settings - self._settings_version += 1 - self._version_event.set() - if self._task is None or self._task.done(): - self._task = asyncio.create_task(self._connection_loop()) - - async def stop(self) -> None: - """Cancel the background task and disconnect.""" - if self._task and not self._task.done(): - self._task.cancel() - try: - await self._task - except asyncio.CancelledError: - pass - self._task = None - self._client = None - self.connected = False - - async def restart(self, settings: AppSettings) -> None: - """Called when community MQTT settings change — stop + start.""" - await self.stop() - await self.start(settings) - - async def publish(self, topic: str, payload: dict[str, Any]) -> None: - """Publish a JSON payload. Drops silently if not connected.""" - if self._client is None or not self.connected: - return - try: - await self._client.publish(topic, json.dumps(payload)) - except Exception as e: - logger.warning("Community MQTT publish failed on %s: %s", topic, e) - self.connected = False + _backoff_max = 60 + _log_prefix = "Community MQTT" + _not_configured_timeout: float | None = 30 def _is_configured(self) -> bool: """Check if community MQTT is enabled and keys are available.""" @@ -327,122 +276,70 @@ class CommunityMqttPublisher: return bool(self._settings and self._settings.community_mqtt_enabled and has_private_key()) - async def _connection_loop(self) -> None: - """Background loop: connect, wait, reconnect on failure.""" + def _build_client_kwargs(self, settings: AppSettings) -> dict[str, Any]: from app.keystore import get_private_key, get_public_key - from app.websocket import broadcast_error, broadcast_success - backoff = _BACKOFF_MIN + private_key = get_private_key() + public_key = get_public_key() + assert private_key is not None and public_key is not None # guaranteed by _pre_connect - while True: - if not self._is_configured(): - self.connected = False - self._client = None - self._version_event.clear() - try: - # Also wake periodically so newly exported keys are detected without a settings change. - await asyncio.wait_for(self._version_event.wait(), timeout=30) - except asyncio.TimeoutError: - continue - except asyncio.CancelledError: - return - continue + pubkey_hex = public_key.hex().upper() + broker_raw = settings.community_mqtt_broker or _DEFAULT_BROKER + broker_host, broker_port = _parse_broker_address(broker_raw) + jwt_token = _generate_jwt_token( + private_key, + public_key, + audience=broker_host, + email=settings.community_mqtt_email or "", + ) - settings = self._settings - assert settings is not None - version_at_connect = self._settings_version + tls_context = ssl.create_default_context() + return { + "hostname": broker_host, + "port": broker_port, + "transport": "websockets", + "tls_context": tls_context, + "websocket_path": "/", + "username": f"v1_{pubkey_hex}", + "password": jwt_token, + } + + def _on_connected(self, settings: AppSettings) -> tuple[str, str]: + broker_raw = settings.community_mqtt_broker or _DEFAULT_BROKER + broker_host, broker_port = _parse_broker_address(broker_raw) + return ("Community MQTT connected", f"{broker_host}:{broker_port}") + + def _on_error(self) -> tuple[str, str]: + return ( + "Community MQTT connection failure", + "Check your internet connection or try again later.", + ) + + def _should_break_wait(self, elapsed: float) -> bool: + if not self.connected: + logger.info("Community MQTT publish failure detected, reconnecting") + return True + if elapsed >= _TOKEN_RENEWAL_THRESHOLD: + logger.info("Community MQTT JWT nearing expiry, reconnecting") + return True + return False + + async def _pre_connect(self, settings: AppSettings) -> bool: + from app.keystore import get_private_key, get_public_key + + private_key = get_private_key() + public_key = get_public_key() + if private_key is None or public_key is None: + # Keys not available yet, wait for settings change or key export + self.connected = False + self._version_event.clear() try: - private_key = get_private_key() - public_key = get_public_key() - if private_key is None or public_key is None: - # Keys not available yet, wait for settings change - self.connected = False - self._version_event.clear() - try: - await asyncio.wait_for(self._version_event.wait(), timeout=30) - except asyncio.TimeoutError: - continue - except asyncio.CancelledError: - return - continue - - pubkey_hex = public_key.hex().upper() - broker_raw = settings.community_mqtt_broker or _DEFAULT_BROKER - broker_host, broker_port = _parse_broker_address(broker_raw) - jwt_token = _generate_jwt_token( - private_key, - public_key, - audience=broker_host, - email=settings.community_mqtt_email or "", - ) - token_created_at = time.monotonic() - - tls_context = ssl.create_default_context() - - async with aiomqtt.Client( - hostname=broker_host, - port=broker_port, - transport="websockets", - tls_context=tls_context, - websocket_path="/", - username=f"v1_{pubkey_hex}", - password=jwt_token, - ) as client: - self._client = client - self.connected = True - backoff = _BACKOFF_MIN - - broadcast_success( - "Community MQTT connected", - f"{broker_host}:{broker_port}", - ) - _broadcast_health_update() - - # Wait until cancelled, settings change, or token nears expiry - while self._settings_version == version_at_connect: - self._version_event.clear() - try: - await asyncio.wait_for(self._version_event.wait(), timeout=60) - except asyncio.TimeoutError: - # Detect publish failure: reconnect so packets resume - if not self.connected: - logger.info("Community MQTT publish failure detected, reconnecting") - break - # Proactive JWT renewal: reconnect before token expires - if time.monotonic() - token_created_at >= _TOKEN_RENEWAL_THRESHOLD: - logger.info("Community MQTT JWT nearing expiry, reconnecting") - break - continue - - # async with exited — client is now closed - self._client = None - self.connected = False - _broadcast_health_update() - - except asyncio.CancelledError: - self.connected = False - self._client = None - return - - except Exception as e: - self.connected = False - self._client = None - - broadcast_error( - "Community MQTT connection failure", - "Check your internet connection or try again later.", - ) - _broadcast_health_update() - logger.warning( - "Community MQTT connection error: %s (reconnecting in %ds)", e, backoff - ) - - try: - await asyncio.sleep(backoff) - except asyncio.CancelledError: - return - backoff = min(backoff * 2, _BACKOFF_MAX) + await asyncio.wait_for(self._version_event.wait(), timeout=30) + except asyncio.TimeoutError: + pass + return False + return True # Module-level singleton diff --git a/app/mqtt.py b/app/mqtt.py index 9c2933c..5323654 100644 --- a/app/mqtt.py +++ b/app/mqtt.py @@ -3,148 +3,40 @@ from __future__ import annotations import asyncio -import json import logging import ssl from typing import Any -import aiomqtt - from app.models import AppSettings +from app.mqtt_base import BaseMqttPublisher logger = logging.getLogger(__name__) -# Reconnect backoff: start at 5s, cap at 30s -_BACKOFF_MIN = 5 -_BACKOFF_MAX = 30 - -class MqttPublisher: +class MqttPublisher(BaseMqttPublisher): """Manages an MQTT connection and publishes mesh network events.""" - def __init__(self) -> None: - self._client: aiomqtt.Client | None = None - self._task: asyncio.Task[None] | None = None - self._settings: AppSettings | None = None - self._settings_version: int = 0 - self._version_event: asyncio.Event = asyncio.Event() - self.connected: bool = False + _backoff_max = 30 + _log_prefix = "MQTT" - async def start(self, settings: AppSettings) -> None: - """Start the background connection loop.""" - self._settings = settings - self._settings_version += 1 - self._version_event.set() - if self._task is None or self._task.done(): - self._task = asyncio.create_task(self._connection_loop()) - - async def stop(self) -> None: - """Cancel the background task and disconnect.""" - if self._task and not self._task.done(): - self._task.cancel() - try: - await self._task - except asyncio.CancelledError: - pass - self._task = None - self._client = None - self.connected = False - - async def restart(self, settings: AppSettings) -> None: - """Called when MQTT settings change — stop + start.""" - await self.stop() - await self.start(settings) - - async def publish(self, topic: str, payload: dict[str, Any]) -> None: - """Publish a JSON payload. Drops silently if not connected.""" - if self._client is None or not self.connected: - return - try: - await self._client.publish(topic, json.dumps(payload)) - except Exception as e: - logger.warning("MQTT publish failed on %s: %s", topic, e) - self.connected = False - # Wake the connection loop so it exits the wait and reconnects - self._settings_version += 1 - self._version_event.set() - - def _mqtt_configured(self) -> bool: + def _is_configured(self) -> bool: """Check if MQTT is configured (broker host is set).""" return bool(self._settings and self._settings.mqtt_broker_host) - async def _connection_loop(self) -> None: - """Background loop: connect, wait, reconnect on failure.""" - from app.websocket import broadcast_error, broadcast_success + def _build_client_kwargs(self, settings: AppSettings) -> dict[str, Any]: + return { + "hostname": settings.mqtt_broker_host, + "port": settings.mqtt_broker_port, + "username": settings.mqtt_username or None, + "password": settings.mqtt_password or None, + "tls_context": self._build_tls_context(settings), + } - backoff = _BACKOFF_MIN + def _on_connected(self, settings: AppSettings) -> tuple[str, str]: + return ("MQTT connected", f"{settings.mqtt_broker_host}:{settings.mqtt_broker_port}") - while True: - if not self._mqtt_configured(): - self.connected = False - self._client = None - # Wait until settings change (which might configure MQTT) - self._version_event.clear() - try: - await self._version_event.wait() - except asyncio.CancelledError: - return - continue - - settings = self._settings - assert settings is not None # guaranteed by _mqtt_configured() - version_at_connect = self._settings_version - - try: - tls_context = self._build_tls_context(settings) - - async with aiomqtt.Client( - hostname=settings.mqtt_broker_host, - port=settings.mqtt_broker_port, - username=settings.mqtt_username or None, - password=settings.mqtt_password or None, - tls_context=tls_context, - ) as client: - self._client = client - self.connected = True - backoff = _BACKOFF_MIN - - broadcast_success( - "MQTT connected", - f"{settings.mqtt_broker_host}:{settings.mqtt_broker_port}", - ) - _broadcast_mqtt_health() - - # Wait until cancelled or settings version changes. - # The 60s timeout is a housekeeping wake-up; actual connection - # liveness is handled by paho-mqtt's keepalive mechanism. - while self._settings_version == version_at_connect: - self._version_event.clear() - try: - await asyncio.wait_for(self._version_event.wait(), timeout=60) - except asyncio.TimeoutError: - continue - - except asyncio.CancelledError: - self.connected = False - self._client = None - return - - except Exception as e: - self.connected = False - self._client = None - - broadcast_error( - "MQTT connection failure", - "Please correct the settings or disable.", - ) - _broadcast_mqtt_health() - logger.warning("MQTT connection error: %s (reconnecting in %ds)", e, backoff) - - try: - await asyncio.sleep(backoff) - except asyncio.CancelledError: - return - backoff = min(backoff * 2, _BACKOFF_MAX) + def _on_error(self) -> tuple[str, str]: + return ("MQTT connection failure", "Please correct the settings or disable.") @staticmethod def _build_tls_context(settings: AppSettings) -> ssl.SSLContext | None: @@ -162,14 +54,6 @@ class MqttPublisher: mqtt_publisher = MqttPublisher() -def _broadcast_mqtt_health() -> None: - """Push updated health (including mqtt_status) to all WS clients.""" - from app.radio import radio_manager - from app.websocket import broadcast_health - - broadcast_health(radio_manager.is_connected, radio_manager.connection_info) - - def mqtt_broadcast(event_type: str, data: dict[str, Any]) -> None: """Fire-and-forget MQTT publish, matching broadcast_event's pattern.""" if event_type not in ("message", "raw_packet"): diff --git a/app/mqtt_base.py b/app/mqtt_base.py new file mode 100644 index 0000000..e0e45ee --- /dev/null +++ b/app/mqtt_base.py @@ -0,0 +1,211 @@ +"""Shared base class for MQTT publisher lifecycle management. + +Both ``MqttPublisher`` (private broker) and ``CommunityMqttPublisher`` +(community aggregator) inherit from ``BaseMqttPublisher``, which owns +the connection-loop skeleton, reconnect/backoff logic, and publish method. +Subclasses override a small set of hooks to control configuration checks, +client construction, toast messages, and optional wait-loop behavior. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import time +from abc import ABC, abstractmethod +from typing import Any + +import aiomqtt + +from app.models import AppSettings + +logger = logging.getLogger(__name__) + +_BACKOFF_MIN = 5 + + +def _broadcast_health() -> None: + """Push updated health (including MQTT status) to all WS clients.""" + from app.radio import radio_manager + from app.websocket import broadcast_health + + broadcast_health(radio_manager.is_connected, radio_manager.connection_info) + + +class BaseMqttPublisher(ABC): + """Base class for MQTT publishers with shared lifecycle management. + + Subclasses implement the abstract hooks to control configuration checks, + client construction, toast messages, and optional wait-loop behavior. + """ + + _backoff_max: int = 30 + _log_prefix: str = "MQTT" + _not_configured_timeout: float | None = None # None = block forever + + def __init__(self) -> None: + self._client: aiomqtt.Client | None = None + self._task: asyncio.Task[None] | None = None + self._settings: AppSettings | None = None + self._settings_version: int = 0 + self._version_event: asyncio.Event = asyncio.Event() + self.connected: bool = False + + # ── Lifecycle ────────────────────────────────────────────────────── + + async def start(self, settings: AppSettings) -> None: + """Start the background connection loop.""" + self._settings = settings + self._settings_version += 1 + self._version_event.set() + if self._task is None or self._task.done(): + self._task = asyncio.create_task(self._connection_loop()) + + async def stop(self) -> None: + """Cancel the background task and disconnect.""" + if self._task and not self._task.done(): + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + self._client = None + self.connected = False + + async def restart(self, settings: AppSettings) -> None: + """Called when settings change — stop + start.""" + await self.stop() + await self.start(settings) + + async def publish(self, topic: str, payload: dict[str, Any]) -> None: + """Publish a JSON payload. Drops silently if not connected.""" + if self._client is None or not self.connected: + return + try: + await self._client.publish(topic, json.dumps(payload)) + except Exception as e: + logger.warning("%s publish failed on %s: %s", self._log_prefix, topic, e) + self.connected = False + # Wake the connection loop so it exits the wait and reconnects + self._settings_version += 1 + self._version_event.set() + + # ── Abstract hooks ───────────────────────────────────────────────── + + @abstractmethod + def _is_configured(self) -> bool: + """Return True when this publisher should attempt to connect.""" + + @abstractmethod + def _build_client_kwargs(self, settings: AppSettings) -> dict[str, Any]: + """Return the keyword arguments for ``aiomqtt.Client(...)``.""" + + @abstractmethod + def _on_connected(self, settings: AppSettings) -> tuple[str, str]: + """Return ``(title, detail)`` for the success toast on connect.""" + + @abstractmethod + def _on_error(self) -> tuple[str, str]: + """Return ``(title, detail)`` for the error toast on connect failure.""" + + # ── Optional hooks ───────────────────────────────────────────────── + + def _should_break_wait(self, elapsed: float) -> bool: + """Return True to break the inner wait (e.g. token expiry).""" + return False + + async def _pre_connect(self, settings: AppSettings) -> bool: + """Called before connecting. Return True to proceed, False to retry.""" + return True + + # ── Connection loop ──────────────────────────────────────────────── + + async def _connection_loop(self) -> None: + """Background loop: connect, wait for version change, reconnect on failure.""" + from app.websocket import broadcast_error, broadcast_success + + backoff = _BACKOFF_MIN + + while True: + if not self._is_configured(): + self.connected = False + self._client = None + self._version_event.clear() + try: + if self._not_configured_timeout is None: + await self._version_event.wait() + else: + await asyncio.wait_for( + self._version_event.wait(), + timeout=self._not_configured_timeout, + ) + except asyncio.TimeoutError: + continue + except asyncio.CancelledError: + return + continue + + settings = self._settings + assert settings is not None # guaranteed by _is_configured() + version_at_connect = self._settings_version + + try: + if not await self._pre_connect(settings): + continue + + client_kwargs = self._build_client_kwargs(settings) + connect_time = time.monotonic() + + async with aiomqtt.Client(**client_kwargs) as client: + self._client = client + self.connected = True + backoff = _BACKOFF_MIN + + title, detail = self._on_connected(settings) + broadcast_success(title, detail) + _broadcast_health() + + # Wait until cancelled or settings version changes. + # The 60s timeout is a housekeeping wake-up; actual connection + # liveness is handled by paho-mqtt's keepalive mechanism. + while self._settings_version == version_at_connect: + self._version_event.clear() + try: + await asyncio.wait_for(self._version_event.wait(), timeout=60) + except asyncio.TimeoutError: + elapsed = time.monotonic() - connect_time + if self._should_break_wait(elapsed): + break + continue + + # async with exited — client is now closed + self._client = None + self.connected = False + _broadcast_health() + + except asyncio.CancelledError: + self.connected = False + self._client = None + return + + except Exception as e: + self.connected = False + self._client = None + + title, detail = self._on_error() + broadcast_error(title, detail) + _broadcast_health() + logger.warning( + "%s connection error: %s (reconnecting in %ds)", + self._log_prefix, + e, + backoff, + ) + + try: + await asyncio.sleep(backoff) + except asyncio.CancelledError: + return + backoff = min(backoff * 2, self._backoff_max) diff --git a/app/routers/health.py b/app/routers/health.py index 632c747..c49717a 100644 --- a/app/routers/health.py +++ b/app/routers/health.py @@ -40,7 +40,7 @@ async def build_health_data(radio_connected: bool, connection_info: str | None) try: from app.mqtt import mqtt_publisher - if mqtt_publisher._mqtt_configured(): + if mqtt_publisher._is_configured(): mqtt_status = "connected" if mqtt_publisher.connected else "disconnected" else: mqtt_status = "disabled" diff --git a/tests/test_mqtt.py b/tests/test_mqtt.py index 64a01e6..eab5250 100644 --- a/tests/test_mqtt.py +++ b/tests/test_mqtt.py @@ -81,12 +81,12 @@ class TestMqttPublisher: def test_not_configured_when_host_empty(self): pub = MqttPublisher() pub._settings = _make_settings(mqtt_broker_host="") - assert pub._mqtt_configured() is False + assert pub._is_configured() is False def test_configured_when_host_set(self): pub = MqttPublisher() pub._settings = _make_settings(mqtt_broker_host="broker.local") - assert pub._mqtt_configured() is True + assert pub._is_configured() is True @pytest.mark.asyncio async def test_publish_drops_silently_when_disconnected(self): @@ -300,8 +300,8 @@ class TestConnectionLoop: mock_client.__aenter__ = AsyncMock(side_effect=side_effect_aenter) with ( - patch("app.mqtt.aiomqtt.Client", return_value=mock_client), - patch("app.mqtt._broadcast_mqtt_health"), + patch("app.mqtt_base.aiomqtt.Client", return_value=mock_client), + patch("app.mqtt_base._broadcast_health"), patch("app.websocket.broadcast_success"), patch("app.websocket.broadcast_health"), ): @@ -321,7 +321,7 @@ class TestConnectionLoop: """Connection loop should retry after a connection error with backoff.""" import asyncio - from app.mqtt import _BACKOFF_MIN + from app.mqtt_base import _BACKOFF_MIN pub = MqttPublisher() settings = _make_settings() @@ -354,12 +354,12 @@ class TestConnectionLoop: return factory with ( - patch("app.mqtt.aiomqtt.Client", side_effect=make_client_factory()), - patch("app.mqtt._broadcast_mqtt_health"), + patch("app.mqtt_base.aiomqtt.Client", side_effect=make_client_factory()), + patch("app.mqtt_base._broadcast_health"), patch("app.websocket.broadcast_success"), patch("app.websocket.broadcast_error"), patch("app.websocket.broadcast_health"), - patch("app.mqtt.asyncio.sleep", new_callable=AsyncMock) as mock_sleep, + patch("app.mqtt_base.asyncio.sleep", new_callable=AsyncMock) as mock_sleep, ): await pub.start(settings) @@ -375,10 +375,10 @@ class TestConnectionLoop: @pytest.mark.asyncio async def test_backoff_increases_on_repeated_failures(self): - """Backoff should double after each failure, capped at _BACKOFF_MAX.""" + """Backoff should double after each failure, capped at _backoff_max.""" import asyncio - from app.mqtt import _BACKOFF_MAX, _BACKOFF_MIN + from app.mqtt_base import _BACKOFF_MIN pub = MqttPublisher() settings = _make_settings() @@ -408,11 +408,11 @@ class TestConnectionLoop: raise asyncio.CancelledError with ( - patch("app.mqtt.aiomqtt.Client", side_effect=factory), - patch("app.mqtt._broadcast_mqtt_health"), + patch("app.mqtt_base.aiomqtt.Client", side_effect=factory), + patch("app.mqtt_base._broadcast_health"), patch("app.websocket.broadcast_error"), patch("app.websocket.broadcast_health"), - patch("app.mqtt.asyncio.sleep", side_effect=capture_sleep), + patch("app.mqtt_base.asyncio.sleep", side_effect=capture_sleep), ): await pub.start(settings) try: @@ -423,8 +423,8 @@ class TestConnectionLoop: assert sleep_args[0] == _BACKOFF_MIN assert sleep_args[1] == _BACKOFF_MIN * 2 assert sleep_args[2] == _BACKOFF_MIN * 4 - # Fourth should be capped at _BACKOFF_MAX (5*8=40 > 30) - assert sleep_args[3] == _BACKOFF_MAX + # Fourth should be capped at _backoff_max (5*8=40 > 30) + assert sleep_args[3] == MqttPublisher._backoff_max @pytest.mark.asyncio async def test_waits_for_settings_when_unconfigured(self): @@ -449,8 +449,8 @@ class TestConnectionLoop: return mock with ( - patch("app.mqtt.aiomqtt.Client", side_effect=make_success_client), - patch("app.mqtt._broadcast_mqtt_health"), + patch("app.mqtt_base.aiomqtt.Client", side_effect=make_success_client), + patch("app.mqtt_base._broadcast_health"), patch("app.websocket.broadcast_success"), patch("app.websocket.broadcast_health"), ): @@ -472,7 +472,7 @@ class TestConnectionLoop: @pytest.mark.asyncio async def test_health_broadcast_on_connect_and_failure(self): - """_broadcast_mqtt_health should be called on connect and on failure.""" + """_broadcast_health should be called on connect and on failure.""" import asyncio pub = MqttPublisher() @@ -497,8 +497,8 @@ class TestConnectionLoop: return mock with ( - patch("app.mqtt.aiomqtt.Client", side_effect=make_client), - patch("app.mqtt._broadcast_mqtt_health", side_effect=track_health), + patch("app.mqtt_base.aiomqtt.Client", side_effect=make_client), + patch("app.mqtt_base._broadcast_health", side_effect=track_health), patch("app.websocket.broadcast_success"), patch("app.websocket.broadcast_health"), ): @@ -512,7 +512,7 @@ class TestConnectionLoop: @pytest.mark.asyncio async def test_health_broadcast_on_connection_error(self): - """_broadcast_mqtt_health should be called when connection fails.""" + """_broadcast_health should be called when connection fails.""" import asyncio pub = MqttPublisher() @@ -534,11 +534,11 @@ class TestConnectionLoop: return mock with ( - patch("app.mqtt.aiomqtt.Client", side_effect=make_failing_client), - patch("app.mqtt._broadcast_mqtt_health", side_effect=track_health), + patch("app.mqtt_base.aiomqtt.Client", side_effect=make_failing_client), + patch("app.mqtt_base._broadcast_health", side_effect=track_health), patch("app.websocket.broadcast_error"), patch("app.websocket.broadcast_health"), - patch("app.mqtt.asyncio.sleep", side_effect=cancel_on_sleep), + patch("app.mqtt_base.asyncio.sleep", side_effect=cancel_on_sleep), ): await pub.start(settings) try: