diff --git a/AGENTS.md b/AGENTS.md index 1244a6c..0dfa427 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -152,7 +152,8 @@ This message-layer echo/path handling is independent of raw-packet storage dedup │ ├── event_handlers.py # Radio events │ ├── decoder.py # Packet decryption │ ├── websocket.py # Real-time broadcasts -│ └── mqtt.py # Optional MQTT publisher +│ ├── mqtt.py # Optional MQTT publisher +│ └── community_mqtt.py # Community MQTT publisher (raw packet sharing) ├── frontend/ # React frontend │ ├── AGENTS.md # Frontend documentation │ ├── src/ @@ -233,6 +234,7 @@ Key test files: - `tests/test_rx_log_data.py` - on_rx_log_data event handler integration - `tests/test_ack_tracking_wiring.py` - DM ACK tracking extraction and wiring - `tests/test_health_mqtt_status.py` - Health endpoint MQTT status field +- `tests/test_community_mqtt.py` - Community MQTT publisher (JWT, packet format, hash, broadcast) ### Frontend (Vitest) @@ -361,6 +363,17 @@ Optional MQTT integration forwards mesh events to an external broker for home au **Security**: MQTT password stored in plaintext in SQLite, consistent with the project's trusted-network design. +### Community MQTT Sharing + +Separate from private MQTT, the community publisher (`app/community_mqtt.py`) shares raw packets with the MeshCore community aggregator for coverage mapping and analysis. Only raw packets are shared — never decrypted messages. + +- Connects to community broker (default `mqtt-us-v1.letsmesh.net:443`) via WebSockets over TLS. +- Authentication via Ed25519 JWT signed with the radio's private key. Tokens auto-renew before 24h expiry. +- Broker address field supports `host:port` format (default port 443 if omitted). +- Topic: `meshcore/{IATA}/{pubkey}/packets` — IATA is a 3-letter region code. +- JWT `email` claim enables node claiming on the community aggregator. +- Config: `community_mqtt_enabled`, `community_mqtt_iata`, `community_mqtt_broker`, `community_mqtt_email` in `app_settings`. + ### Server-Side Decryption The server can decrypt packets using stored keys, both in real-time and for historical packets. @@ -402,7 +415,7 @@ mc.subscribe(EventType.ACK, handler) | `MESHCORE_LOG_LEVEL` | `INFO` | Logging level (`DEBUG`/`INFO`/`WARNING`/`ERROR`) | | `MESHCORE_DATABASE_PATH` | `data/meshcore.db` | SQLite database location | -**Note:** Runtime app settings are stored in the database (`app_settings` table), not environment variables. These include `max_radio_contacts`, `auto_decrypt_dm_on_advert`, `sidebar_sort_order`, `advert_interval`, `last_advert_time`, `favorites`, `last_message_times`, `bots`, and all MQTT configuration (`mqtt_broker_host`, `mqtt_broker_port`, `mqtt_username`, `mqtt_password`, `mqtt_use_tls`, `mqtt_tls_insecure`, `mqtt_topic_prefix`, `mqtt_publish_messages`, `mqtt_publish_raw_packets`). They are configured via `GET/PATCH /api/settings` (and related settings endpoints). +**Note:** Runtime app settings are stored in the database (`app_settings` table), not environment variables. These include `max_radio_contacts`, `auto_decrypt_dm_on_advert`, `sidebar_sort_order`, `advert_interval`, `last_advert_time`, `favorites`, `last_message_times`, `bots`, all MQTT configuration (`mqtt_broker_host`, `mqtt_broker_port`, `mqtt_username`, `mqtt_password`, `mqtt_use_tls`, `mqtt_tls_insecure`, `mqtt_topic_prefix`, `mqtt_publish_messages`, `mqtt_publish_raw_packets`), and community MQTT configuration (`community_mqtt_enabled`, `community_mqtt_iata`, `community_mqtt_broker`, `community_mqtt_email`). They are configured via `GET/PATCH /api/settings` (and related settings endpoints). Byte-perfect channel retries are user-triggered via `POST /api/messages/channel/{message_id}/resend` and are allowed for 30 seconds after the original send. diff --git a/app/AGENTS.md b/app/AGENTS.md index d9ee962..1dd4de5 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -28,6 +28,7 @@ app/ ├── event_handlers.py # MeshCore event subscriptions and ACK tracking ├── websocket.py # WS manager + broadcast helpers ├── mqtt.py # Optional MQTT publisher (fire-and-forget forwarding) +├── community_mqtt.py # Community MQTT publisher (raw packet sharing) ├── bot.py # Bot execution and outbound bot sends ├── dependencies.py # Shared FastAPI dependency providers ├── keystore.py # Ephemeral private/public key storage for DM decryption @@ -114,6 +115,20 @@ app/ - Settings changes trigger `mqtt_publisher.restart()` — no server restart needed. - Topics: `{prefix}/dm:{key}`, `{prefix}/gm:{key}`, `{prefix}/raw/dm:{key}`, `{prefix}/raw/gm:{key}`, `{prefix}/raw/unrouted`. +### Community MQTT + +- Separate publisher (`app/community_mqtt.py`) for sharing raw packets with the MeshCore community aggregator. +- Independent from the private `MqttPublisher` — different broker, authentication, and topic structure. +- Connects to the community broker (default `mqtt-us-v1.letsmesh.net:443`) via WebSockets over TLS. +- Authentication: Ed25519 JWT tokens signed with the radio's expanded "orlp" private key. Tokens expire after 24 hours; proactive renewal at 23 hours. +- Broker address supports `host:port` format; defaults to port 443 if omitted. +- JWT claims include `publicKey`, `owner` (radio pubkey), `client` (app identifier), and optional `email` (for node claiming on the community aggregator). +- Topic: `meshcore/{IATA}/{pubkey}/packets` — IATA is a 3-letter region code (required to enable; no default). +- Only raw packets are published — never decrypted messages. +- Publishes are fire-and-forget. The connection loop detects publish failures via `connected` flag and reconnects within 60 seconds. +- Health endpoint includes `community_mqtt_status` field. +- Settings: `community_mqtt_enabled`, `community_mqtt_iata`, `community_mqtt_broker`, `community_mqtt_email`. + ## API Surface (all under `/api`) ### Health @@ -222,6 +237,7 @@ Main tables: - `bots` - `mqtt_broker_host`, `mqtt_broker_port`, `mqtt_username`, `mqtt_password` - `mqtt_use_tls`, `mqtt_tls_insecure`, `mqtt_topic_prefix`, `mqtt_publish_messages`, `mqtt_publish_raw_packets` +- `community_mqtt_enabled`, `community_mqtt_iata`, `community_mqtt_broker`, `community_mqtt_email` ## Security Posture (intentional) @@ -260,6 +276,7 @@ tests/ ├── test_message_pagination.py # Cursor-based message pagination ├── test_message_prefix_claim.py # Message prefix claim logic ├── test_migrations.py # Schema migration system +├── test_community_mqtt.py # Community MQTT publisher (JWT, packet format, hash, broadcast) ├── test_mqtt.py # MQTT publisher topic routing and lifecycle ├── test_packet_pipeline.py # End-to-end packet processing ├── test_packets_router.py # Packets router endpoints (decrypt, maintenance) diff --git a/app/community_mqtt.py b/app/community_mqtt.py new file mode 100644 index 0000000..358396c --- /dev/null +++ b/app/community_mqtt.py @@ -0,0 +1,492 @@ +"""Community MQTT publisher for sharing raw packets with the MeshCore community. + +Publishes raw packet data to mqtt-us-v1.letsmesh.net using the protocol +defined by meshcore-packet-capture (https://github.com/agessaman/meshcore-packet-capture). + +Authentication uses Ed25519 JWT tokens signed with the radio's private key. +This module is independent from the private MqttPublisher in app/mqtt.py. +""" + +from __future__ import annotations + +import asyncio +import base64 +import hashlib +import json +import logging +import re +import ssl +import time +from datetime import datetime +from typing import Any + +import aiomqtt +import nacl.bindings + +from app.models import AppSettings + +logger = logging.getLogger(__name__) + +_DEFAULT_BROKER = "mqtt-us-v1.letsmesh.net" +_DEFAULT_PORT = 443 # Community protocol uses WSS on port 443 by default +_CLIENT_ID = "RemoteTerm (github.com/jkingsman/Remote-Terminal-for-MeshCore)" + +# Reconnect backoff: start at 5s, cap at 60s +_BACKOFF_MIN = 5 +_BACKOFF_MAX = 60 + +# Proactive JWT renewal: reconnect 1 hour before the 24h token expires +_TOKEN_LIFETIME = 86400 # 24 hours (must match _generate_jwt_token exp) +_TOKEN_RENEWAL_THRESHOLD = _TOKEN_LIFETIME - 3600 # 23 hours + +# Ed25519 group order +_L = 2**252 + 27742317777372353535851937790883648493 +_IATA_RE = re.compile(r"^[A-Z]{3}$") + +# Route type mapping: bottom 2 bits of first byte +_ROUTE_MAP = {0: "F", 1: "F", 2: "D", 3: "T"} + + +def _parse_broker_address(broker_str: str) -> tuple[str, int]: + """Parse 'host' or 'host:port' into (host, port). Defaults to _DEFAULT_PORT.""" + if ":" in broker_str: + host, port_str = broker_str.rsplit(":", 1) + try: + return host, int(port_str) + except ValueError: + pass + return broker_str, _DEFAULT_PORT + + +def _base64url_encode(data: bytes) -> str: + """Base64url encode without padding.""" + return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii") + + +def _ed25519_sign_expanded( + message: bytes, scalar: bytes, prefix: bytes, public_key: bytes +) -> bytes: + """Sign a message using MeshCore's expanded Ed25519 key format. + + MeshCore stores 64-byte "orlp" format keys: scalar(32) || prefix(32). + Standard Ed25519 libraries expect seed format and would re-SHA-512 the key. + This performs the signing manually using the already-expanded key material. + + Port of meshcore-packet-capture's ed25519_sign_with_expanded_key(). + """ + # r = SHA-512(prefix || message) mod L + r = int.from_bytes(hashlib.sha512(prefix + message).digest(), "little") % _L + # R = r * B (base point multiplication) + R = nacl.bindings.crypto_scalarmult_ed25519_base_noclamp(r.to_bytes(32, "little")) + # k = SHA-512(R || public_key || message) mod L + k = int.from_bytes(hashlib.sha512(R + public_key + message).digest(), "little") % _L + # s = (r + k * scalar) mod L + s = (r + k * int.from_bytes(scalar, "little")) % _L + return R + s.to_bytes(32, "little") + + +def _generate_jwt_token( + private_key: bytes, + public_key: bytes, + *, + audience: str = _DEFAULT_BROKER, + email: str = "", +) -> str: + """Generate a JWT token for community MQTT authentication. + + Creates a token with Ed25519 signature using MeshCore's expanded key format. + Token format: header_b64.payload_b64.signature_hex + + Optional ``email`` embeds a node-claiming identity so the community + aggregator can associate this radio with an owner. + """ + header = {"alg": "Ed25519", "typ": "JWT"} + now = int(time.time()) + pubkey_hex = public_key.hex().upper() + payload: dict[str, object] = { + "publicKey": pubkey_hex, + "iat": now, + "exp": now + _TOKEN_LIFETIME, + "aud": audience, + "owner": pubkey_hex, + "client": _CLIENT_ID, + } + if email: + payload["email"] = email + + header_b64 = _base64url_encode(json.dumps(header, separators=(",", ":")).encode()) + payload_b64 = _base64url_encode(json.dumps(payload, separators=(",", ":")).encode()) + + signing_input = f"{header_b64}.{payload_b64}".encode() + + scalar = private_key[:32] + prefix = private_key[32:] + signature = _ed25519_sign_expanded(signing_input, scalar, prefix, public_key) + + return f"{header_b64}.{payload_b64}.{signature.hex()}" + + +def _calculate_packet_hash(raw_bytes: bytes) -> str: + """Calculate packet hash matching MeshCore's Packet::calculatePacketHash(). + + Parses the packet structure to extract payload type and payload data, + then hashes: payload_type(1 byte) [+ path_len(2 bytes LE) for TRACE] + payload_data. + Returns first 16 hex characters (uppercase). + """ + if not raw_bytes: + return "0" * 16 + + try: + header = raw_bytes[0] + payload_type = (header >> 2) & 0x0F + route_type = header & 0x03 + + # Transport codes present for TRANSPORT_FLOOD (0) and TRANSPORT_DIRECT (3) + has_transport = route_type in (0x00, 0x03) + + offset = 1 # Past header + if has_transport: + offset += 4 # Skip 4 bytes of transport codes + + # Read path_len (1 byte on wire). Invalid/truncated packets map to zero hash. + if offset >= len(raw_bytes): + return "0" * 16 + path_len = raw_bytes[offset] + offset += 1 + + # Skip past path to get to payload. Invalid/truncated packets map to zero hash. + if len(raw_bytes) < offset + path_len: + return "0" * 16 + payload_start = offset + path_len + payload_data = raw_bytes[payload_start:] + + # Hash: payload_type(1 byte) [+ path_len as uint16_t LE for TRACE] + payload_data + hash_obj = hashlib.sha256() + hash_obj.update(bytes([payload_type])) + if payload_type == 9: # PAYLOAD_TYPE_TRACE + hash_obj.update(path_len.to_bytes(2, byteorder="little")) + hash_obj.update(payload_data) + + return hash_obj.hexdigest()[:16].upper() + except Exception: + return "0" * 16 + + +def _decode_packet_fields(raw_bytes: bytes) -> tuple[str, str, str, list[str], int | None]: + """Decode packet fields used by the community uploader payload format. + + Returns: + (route_letter, packet_type_str, payload_len_str, path_values, payload_type_int) + """ + # Reference defaults when decode fails + route = "U" + packet_type = "0" + payload_len = "0" + path_values: list[str] = [] + payload_type: int | None = None + + try: + if len(raw_bytes) < 2: + return route, packet_type, payload_len, path_values, payload_type + + header = raw_bytes[0] + payload_version = (header >> 6) & 0x03 + if payload_version != 0: + return route, packet_type, payload_len, path_values, payload_type + + route_type = header & 0x03 + has_transport = route_type in (0x00, 0x03) + + offset = 1 + if has_transport: + offset += 4 + + if len(raw_bytes) <= offset: + return route, packet_type, payload_len, path_values, payload_type + + path_len = raw_bytes[offset] + offset += 1 + + if len(raw_bytes) < offset + path_len: + return route, packet_type, payload_len, path_values, payload_type + + path_bytes = raw_bytes[offset : offset + path_len] + offset += path_len + + payload_type = (header >> 2) & 0x0F + route = _ROUTE_MAP.get(route_type, "U") + packet_type = str(payload_type) + payload_len = str(max(0, len(raw_bytes) - offset)) + path_values = [f"{b:02x}" for b in path_bytes] + + return route, packet_type, payload_len, path_values, payload_type + except Exception: + return route, packet_type, payload_len, path_values, payload_type + + +def _format_raw_packet(data: dict[str, Any], device_name: str, public_key_hex: str) -> dict: + """Convert a RawPacketBroadcast dict to meshcore-packet-capture format.""" + raw_hex = data.get("data", "") + raw_bytes = bytes.fromhex(raw_hex) if raw_hex else b"" + + route, packet_type, payload_len, path_values, _payload_type = _decode_packet_fields(raw_bytes) + + # Reference format uses local "now" timestamp and derived time/date fields. + current_time = datetime.now() + ts_str = current_time.isoformat() + + # SNR/RSSI are always strings in reference output. + snr_val = data.get("snr") + rssi_val = data.get("rssi") + snr = str(snr_val) if snr_val is not None else "Unknown" + rssi = str(rssi_val) if rssi_val is not None else "Unknown" + + packet_hash = _calculate_packet_hash(raw_bytes) + + packet = { + "origin": device_name or "MeshCore Device", + "origin_id": public_key_hex.upper(), + "timestamp": ts_str, + "type": "PACKET", + "direction": "rx", + "time": current_time.strftime("%H:%M:%S"), + "date": current_time.strftime("%d/%m/%Y"), + "len": str(len(raw_bytes)), + "packet_type": packet_type, + "route": route, + "payload_len": payload_len, + "raw": raw_hex.upper(), + "SNR": snr, + "RSSI": rssi, + "hash": packet_hash, + } + + if route == "D": + packet["path"] = ",".join(path_values) + + return packet + + +def _broadcast_health_update() -> None: + """Push a health broadcast so the frontend sees updated MQTT status badges.""" + from app.radio import radio_manager + from app.websocket import broadcast_health + + broadcast_health(radio_manager.is_connected, radio_manager.connection_info) + + +class CommunityMqttPublisher: + """Manages the community MQTT connection and publishes raw packets.""" + + def __init__(self) -> None: + self._client: aiomqtt.Client | None = None + self._task: asyncio.Task[None] | None = None + self._settings: AppSettings | None = None + self._settings_version: int = 0 + self._version_event: asyncio.Event = asyncio.Event() + self.connected: bool = False + + async def start(self, settings: AppSettings) -> None: + """Start the background connection loop.""" + self._settings = settings + self._settings_version += 1 + self._version_event.set() + if self._task is None or self._task.done(): + self._task = asyncio.create_task(self._connection_loop()) + + async def stop(self) -> None: + """Cancel the background task and disconnect.""" + if self._task and not self._task.done(): + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + self._client = None + self.connected = False + + async def restart(self, settings: AppSettings) -> None: + """Called when community MQTT settings change — stop + start.""" + await self.stop() + await self.start(settings) + + async def publish(self, topic: str, payload: dict[str, Any]) -> None: + """Publish a JSON payload. Drops silently if not connected.""" + if self._client is None or not self.connected: + return + try: + await self._client.publish(topic, json.dumps(payload)) + except Exception as e: + logger.warning("Community MQTT publish failed on %s: %s", topic, e) + self.connected = False + + def _is_configured(self) -> bool: + """Check if community MQTT is enabled and keys are available.""" + from app.keystore import has_private_key + + return bool(self._settings and self._settings.community_mqtt_enabled and has_private_key()) + + async def _connection_loop(self) -> None: + """Background loop: connect, wait, reconnect on failure.""" + from app.keystore import get_private_key, get_public_key + from app.websocket import broadcast_error, broadcast_success + + backoff = _BACKOFF_MIN + + while True: + if not self._is_configured(): + self.connected = False + self._client = None + self._version_event.clear() + try: + # Also wake periodically so newly exported keys are detected without a settings change. + await asyncio.wait_for(self._version_event.wait(), timeout=30) + except asyncio.TimeoutError: + continue + except asyncio.CancelledError: + return + continue + + settings = self._settings + assert settings is not None + version_at_connect = self._settings_version + + try: + private_key = get_private_key() + public_key = get_public_key() + if private_key is None or public_key is None: + # Keys not available yet, wait for settings change + self.connected = False + self._version_event.clear() + try: + await asyncio.wait_for(self._version_event.wait(), timeout=30) + except asyncio.TimeoutError: + continue + except asyncio.CancelledError: + return + continue + + pubkey_hex = public_key.hex().upper() + broker_raw = settings.community_mqtt_broker or _DEFAULT_BROKER + broker_host, broker_port = _parse_broker_address(broker_raw) + jwt_token = _generate_jwt_token( + private_key, + public_key, + audience=broker_host, + email=settings.community_mqtt_email or "", + ) + token_created_at = time.monotonic() + + tls_context = ssl.create_default_context() + + async with aiomqtt.Client( + hostname=broker_host, + port=broker_port, + transport="websockets", + tls_context=tls_context, + websocket_path="/", + username=f"v1_{pubkey_hex}", + password=jwt_token, + ) as client: + self._client = client + self.connected = True + backoff = _BACKOFF_MIN + + broadcast_success( + "Community MQTT connected", + f"{broker_host}:{broker_port}", + ) + _broadcast_health_update() + + # Wait until cancelled, settings change, or token nears expiry + while self._settings_version == version_at_connect: + self._version_event.clear() + try: + await asyncio.wait_for(self._version_event.wait(), timeout=60) + except asyncio.TimeoutError: + # Detect publish failure: reconnect so packets resume + if not self.connected: + logger.info("Community MQTT publish failure detected, reconnecting") + break + # Proactive JWT renewal: reconnect before token expires + if time.monotonic() - token_created_at >= _TOKEN_RENEWAL_THRESHOLD: + logger.info("Community MQTT JWT nearing expiry, reconnecting") + break + continue + + # async with exited — client is now closed + self._client = None + self.connected = False + _broadcast_health_update() + + except asyncio.CancelledError: + self.connected = False + self._client = None + return + + except Exception as e: + self.connected = False + self._client = None + + broadcast_error( + "Community MQTT connection failure", + "Check your internet connection or try again later.", + ) + _broadcast_health_update() + logger.warning( + "Community MQTT connection error: %s (reconnecting in %ds)", e, backoff + ) + + try: + await asyncio.sleep(backoff) + except asyncio.CancelledError: + return + backoff = min(backoff * 2, _BACKOFF_MAX) + + +# Module-level singleton +community_publisher = CommunityMqttPublisher() + + +def community_mqtt_broadcast(event_type: str, data: dict[str, Any]) -> None: + """Fire-and-forget community MQTT publish for raw packets only.""" + if event_type != "raw_packet": + return + if not community_publisher.connected or community_publisher._settings is None: + return + asyncio.create_task(_community_maybe_publish(data)) + + +async def _community_maybe_publish(data: dict[str, Any]) -> None: + """Format and publish a raw packet to the community broker.""" + settings = community_publisher._settings + if settings is None or not settings.community_mqtt_enabled: + return + + try: + from app.keystore import get_public_key + from app.radio import radio_manager + + public_key = get_public_key() + if public_key is None: + return + + pubkey_hex = public_key.hex().upper() + + # Get device name from radio + device_name = "" + if radio_manager.meshcore and radio_manager.meshcore.self_info: + device_name = radio_manager.meshcore.self_info.get("name", "") + + packet = _format_raw_packet(data, device_name, pubkey_hex) + iata = settings.community_mqtt_iata.upper().strip() + if not _IATA_RE.fullmatch(iata): + logger.debug("Community MQTT: skipping publish — no valid IATA code configured") + return + topic = f"meshcore/{iata}/{pubkey_hex}/packets" + + await community_publisher.publish(topic, packet) + + except Exception as e: + logger.warning("Community MQTT broadcast error: %s", e) diff --git a/app/main.py b/app/main.py index ee7e4c2..4914e4a 100644 --- a/app/main.py +++ b/app/main.py @@ -56,19 +56,22 @@ async def lifespan(app: FastAPI): # Always start connection monitor (even if initial connection failed) await radio_manager.start_connection_monitor() - # Start MQTT publisher if configured + # Start MQTT publishers if configured + from app.community_mqtt import community_publisher from app.mqtt import mqtt_publisher from app.repository import AppSettingsRepository try: mqtt_settings = await AppSettingsRepository.get() await mqtt_publisher.start(mqtt_settings) + await community_publisher.start(mqtt_settings) except Exception as e: - logger.warning("Failed to start MQTT publisher: %s", e) + logger.warning("Failed to start MQTT publisher(s): %s", e) yield logger.info("Shutting down") + await community_publisher.stop() await mqtt_publisher.stop() await radio_manager.stop_connection_monitor() await stop_message_polling() diff --git a/app/migrations.py b/app/migrations.py index c842033..57e9473 100644 --- a/app/migrations.py +++ b/app/migrations.py @@ -254,6 +254,13 @@ async def run_migrations(conn: aiosqlite.Connection) -> int: await set_version(conn, 31) applied += 1 + # Migration 32: Add community MQTT columns to app_settings + if version < 32: + logger.info("Applying migration 32: add community MQTT columns to app_settings") + await _migrate_032_add_community_mqtt_columns(conn) + await set_version(conn, 32) + applied += 1 + if applied > 0: logger.info( "Applied %d migration(s), schema now at version %d", applied, await get_version(conn) @@ -1891,3 +1898,30 @@ async def _migrate_031_add_mqtt_columns(conn: aiosqlite.Connection) -> None: await conn.execute(f"ALTER TABLE app_settings ADD COLUMN {col_name} {col_def}") await conn.commit() + + +async def _migrate_032_add_community_mqtt_columns(conn: aiosqlite.Connection) -> None: + """Add community MQTT configuration columns to app_settings.""" + # Guard: app_settings may not exist in partial-schema test setups + cursor = await conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='app_settings'" + ) + if not await cursor.fetchone(): + await conn.commit() + return + + cursor = await conn.execute("PRAGMA table_info(app_settings)") + columns = {row[1] for row in await cursor.fetchall()} + + new_columns = [ + ("community_mqtt_enabled", "INTEGER DEFAULT 0"), + ("community_mqtt_iata", "TEXT DEFAULT ''"), + ("community_mqtt_broker", "TEXT DEFAULT 'mqtt-us-v1.letsmesh.net'"), + ("community_mqtt_email", "TEXT DEFAULT ''"), + ] + + for col_name, col_def in new_columns: + if col_name not in columns: + await conn.execute(f"ALTER TABLE app_settings ADD COLUMN {col_name} {col_def}") + + await conn.commit() diff --git a/app/models.py b/app/models.py index a636060..5f1557b 100644 --- a/app/models.py +++ b/app/models.py @@ -463,6 +463,22 @@ class AppSettings(BaseModel): default=False, description="Whether to publish raw packets to MQTT", ) + community_mqtt_enabled: bool = Field( + default=False, + description="Whether to publish raw packets to the community MQTT broker (letsmesh.net)", + ) + community_mqtt_iata: str = Field( + default="", + description="IATA region code for community MQTT topic routing (3 alpha chars)", + ) + community_mqtt_broker: str = Field( + default="mqtt-us-v1.letsmesh.net", + description="Community MQTT broker hostname", + ) + community_mqtt_email: str = Field( + default="", + description="Email address for node claiming on the community aggregator (optional)", + ) class BusyChannel(BaseModel): diff --git a/app/repository/settings.py b/app/repository/settings.py index 49bf450..7cb9540 100644 --- a/app/repository/settings.py +++ b/app/repository/settings.py @@ -29,7 +29,9 @@ class AppSettingsRepository: advert_interval, last_advert_time, bots, mqtt_broker_host, mqtt_broker_port, mqtt_username, mqtt_password, mqtt_use_tls, mqtt_tls_insecure, mqtt_topic_prefix, - mqtt_publish_messages, mqtt_publish_raw_packets + mqtt_publish_messages, mqtt_publish_raw_packets, + community_mqtt_enabled, community_mqtt_iata, + community_mqtt_broker, community_mqtt_email FROM app_settings WHERE id = 1 """ ) @@ -103,6 +105,10 @@ class AppSettingsRepository: mqtt_topic_prefix=row["mqtt_topic_prefix"] or "meshcore", mqtt_publish_messages=bool(row["mqtt_publish_messages"]), mqtt_publish_raw_packets=bool(row["mqtt_publish_raw_packets"]), + community_mqtt_enabled=bool(row["community_mqtt_enabled"]), + community_mqtt_iata=row["community_mqtt_iata"] or "", + community_mqtt_broker=row["community_mqtt_broker"] or "mqtt-us-v1.letsmesh.net", + community_mqtt_email=row["community_mqtt_email"] or "", ) @staticmethod @@ -125,6 +131,10 @@ class AppSettingsRepository: mqtt_topic_prefix: str | None = None, mqtt_publish_messages: bool | None = None, mqtt_publish_raw_packets: bool | None = None, + community_mqtt_enabled: bool | None = None, + community_mqtt_iata: str | None = None, + community_mqtt_broker: str | None = None, + community_mqtt_email: str | None = None, ) -> AppSettings: """Update app settings. Only provided fields are updated.""" updates = [] @@ -204,6 +214,22 @@ class AppSettingsRepository: updates.append("mqtt_publish_raw_packets = ?") params.append(1 if mqtt_publish_raw_packets else 0) + if community_mqtt_enabled is not None: + updates.append("community_mqtt_enabled = ?") + params.append(1 if community_mqtt_enabled else 0) + + if community_mqtt_iata is not None: + updates.append("community_mqtt_iata = ?") + params.append(community_mqtt_iata) + + if community_mqtt_broker is not None: + updates.append("community_mqtt_broker = ?") + params.append(community_mqtt_broker) + + if community_mqtt_email is not None: + updates.append("community_mqtt_email = ?") + params.append(community_mqtt_email) + if updates: query = f"UPDATE app_settings SET {', '.join(updates)} WHERE id = 1" await db.conn.execute(query, params) diff --git a/app/routers/health.py b/app/routers/health.py index ab43c20..632c747 100644 --- a/app/routers/health.py +++ b/app/routers/health.py @@ -17,6 +17,7 @@ class HealthResponse(BaseModel): database_size_mb: float oldest_undecrypted_timestamp: int | None mqtt_status: str | None = None + community_mqtt_status: str | None = None async def build_health_data(radio_connected: bool, connection_info: str | None) -> dict: @@ -46,6 +47,18 @@ async def build_health_data(radio_connected: bool, connection_info: str | None) except Exception: pass + # Community MQTT status + community_mqtt_status: str | None = None + try: + from app.community_mqtt import community_publisher + + if community_publisher._is_configured(): + community_mqtt_status = "connected" if community_publisher.connected else "disconnected" + else: + community_mqtt_status = "disabled" + except Exception: + pass + return { "status": "ok" if radio_connected else "degraded", "radio_connected": radio_connected, @@ -53,6 +66,7 @@ async def build_health_data(radio_connected: bool, connection_info: str | None) "database_size_mb": db_size_mb, "oldest_undecrypted_timestamp": oldest_ts, "mqtt_status": mqtt_status, + "community_mqtt_status": community_mqtt_status, } diff --git a/app/routers/settings.py b/app/routers/settings.py index 4bfc7f1..d6af18f 100644 --- a/app/routers/settings.py +++ b/app/routers/settings.py @@ -1,5 +1,6 @@ import asyncio import logging +import re from typing import Literal from fastapi import APIRouter, HTTPException @@ -97,6 +98,22 @@ class AppSettingsUpdate(BaseModel): default=None, description="Whether to publish raw packets to MQTT", ) + community_mqtt_enabled: bool | None = Field( + default=None, + description="Whether to publish raw packets to the community MQTT broker", + ) + community_mqtt_iata: str | None = Field( + default=None, + description="IATA region code for community MQTT topic routing (3 alpha chars)", + ) + community_mqtt_broker: str | None = Field( + default=None, + description="Community MQTT broker hostname", + ) + community_mqtt_email: str | None = Field( + default=None, + description="Email address for node claiming on the community aggregator", + ) class FavoriteRequest(BaseModel): @@ -181,6 +198,43 @@ async def update_settings(update: AppSettingsUpdate) -> AppSettings: kwargs[field] = value mqtt_changed = True + # Community MQTT fields + community_mqtt_changed = False + if update.community_mqtt_enabled is not None: + kwargs["community_mqtt_enabled"] = update.community_mqtt_enabled + community_mqtt_changed = True + + if update.community_mqtt_iata is not None: + iata = update.community_mqtt_iata.upper().strip() + if iata and not re.fullmatch(r"[A-Z]{3}", iata): + raise HTTPException( + status_code=400, + detail="IATA code must be exactly 3 uppercase alphabetic characters", + ) + kwargs["community_mqtt_iata"] = iata + community_mqtt_changed = True + + if update.community_mqtt_broker is not None: + kwargs["community_mqtt_broker"] = update.community_mqtt_broker + community_mqtt_changed = True + + if update.community_mqtt_email is not None: + kwargs["community_mqtt_email"] = update.community_mqtt_email + community_mqtt_changed = True + + # Require IATA when enabling community MQTT + if kwargs.get("community_mqtt_enabled", False): + # Check the IATA value being set, or fall back to current settings + iata_value = kwargs.get("community_mqtt_iata") + if iata_value is None: + current = await AppSettingsRepository.get() + iata_value = current.community_mqtt_iata + if not iata_value or not re.fullmatch(r"[A-Z]{3}", iata_value): + raise HTTPException( + status_code=400, + detail="A valid IATA region code is required to enable community sharing", + ) + if kwargs: result = await AppSettingsRepository.update(**kwargs) @@ -190,6 +244,12 @@ async def update_settings(update: AppSettingsUpdate) -> AppSettings: await mqtt_publisher.restart(result) + # Restart community MQTT publisher if any community settings changed + if community_mqtt_changed: + from app.community_mqtt import community_publisher + + await community_publisher.restart(result) + return result return await AppSettingsRepository.get() diff --git a/app/websocket.py b/app/websocket.py index 825d4a1..1638035 100644 --- a/app/websocket.py +++ b/app/websocket.py @@ -104,6 +104,10 @@ def broadcast_event(event_type: str, data: dict) -> None: mqtt_broadcast(event_type, data) + from app.community_mqtt import community_mqtt_broadcast + + community_mqtt_broadcast(event_type, data) + def broadcast_error(message: str, details: str | None = None) -> None: """Broadcast an error notification to all connected clients. diff --git a/frontend/src/components/settings/SettingsMqttSection.tsx b/frontend/src/components/settings/SettingsMqttSection.tsx index 174fe1c..85b3f9c 100644 --- a/frontend/src/components/settings/SettingsMqttSection.tsx +++ b/frontend/src/components/settings/SettingsMqttSection.tsx @@ -27,6 +27,12 @@ export function SettingsMqttSection({ const [mqttPublishMessages, setMqttPublishMessages] = useState(false); const [mqttPublishRawPackets, setMqttPublishRawPackets] = useState(false); + // Community MQTT state + const [communityMqttEnabled, setCommunityMqttEnabled] = useState(false); + const [communityMqttIata, setCommunityMqttIata] = useState(''); + const [communityMqttBroker, setCommunityMqttBroker] = useState('mqtt-us-v1.letsmesh.net'); + const [communityMqttEmail, setCommunityMqttEmail] = useState(''); + const [busy, setBusy] = useState(false); const [error, setError] = useState(null); @@ -40,6 +46,10 @@ export function SettingsMqttSection({ setMqttTopicPrefix(appSettings.mqtt_topic_prefix ?? 'meshcore'); setMqttPublishMessages(appSettings.mqtt_publish_messages ?? false); setMqttPublishRawPackets(appSettings.mqtt_publish_raw_packets ?? false); + setCommunityMqttEnabled(appSettings.community_mqtt_enabled ?? false); + setCommunityMqttIata(appSettings.community_mqtt_iata ?? ''); + setCommunityMqttBroker(appSettings.community_mqtt_broker ?? 'mqtt-us-v1.letsmesh.net'); + setCommunityMqttEmail(appSettings.community_mqtt_email ?? ''); }, [appSettings]); const handleSave = async () => { @@ -57,6 +67,10 @@ export function SettingsMqttSection({ mqtt_topic_prefix: mqttTopicPrefix || 'meshcore', mqtt_publish_messages: mqttPublishMessages, mqtt_publish_raw_packets: mqttPublishRawPackets, + community_mqtt_enabled: communityMqttEnabled, + community_mqtt_iata: communityMqttIata, + community_mqtt_broker: communityMqttBroker || 'mqtt-us-v1.letsmesh.net', + community_mqtt_email: communityMqttEmail, }; await onSaveAppSettings(update); toast.success('MQTT settings saved'); @@ -69,6 +83,14 @@ export function SettingsMqttSection({ return (
+
+

Private MQTT Broker

+

+ Forward all mesh data to your own MQTT broker for home automation, logging, or alerting. + Publishes both decrypted messages and raw packets to your broker. +

+
+
{health?.mqtt_status === 'connected' ? ( @@ -218,6 +240,95 @@ export function SettingsMqttSection({

Forward all RF packets

+ + +
+

Community Analytics

+

+ Share raw packet data with the MeshCore community for coverage mapping and network + analysis. Only raw RF packets are shared — never decrypted messages. +

+
+ {health?.community_mqtt_status === 'connected' ? ( + <> +
+ Connected + + ) : health?.community_mqtt_status === 'disconnected' ? ( + <> +
+ Disconnected + + ) : ( + <> +
+ Disabled + + )} +
+ + + {communityMqttEnabled && ( +
+ + setCommunityMqttBroker(e.target.value)} + /> +

host or host:port (default port 443)

+ + setCommunityMqttIata(e.target.value.toUpperCase())} + className="w-32" + /> +

+ Your nearest airport's{' '} + + IATA code + {' '} + (required) +

+ {communityMqttIata && ( +

+ Topic: meshcore/{communityMqttIata}/<pubkey>/packets +

+ )} + + setCommunityMqttEmail(e.target.value)} + /> +

+ Used to claim your node on the community aggregator +

+
+ )} +
+ diff --git a/frontend/src/test/settingsModal.test.tsx b/frontend/src/test/settingsModal.test.tsx index 9c16ebd..c67aadd 100644 --- a/frontend/src/test/settingsModal.test.tsx +++ b/frontend/src/test/settingsModal.test.tsx @@ -39,6 +39,7 @@ const baseHealth: HealthStatus = { database_size_mb: 1.2, oldest_undecrypted_timestamp: null, mqtt_status: null, + community_mqtt_status: null, }; const baseSettings: AppSettings = { @@ -60,6 +61,10 @@ const baseSettings: AppSettings = { mqtt_topic_prefix: 'meshcore', mqtt_publish_messages: false, mqtt_publish_raw_packets: false, + community_mqtt_enabled: false, + community_mqtt_iata: '', + community_mqtt_broker: 'mqtt-us-v1.letsmesh.net', + community_mqtt_email: '', }; function renderModal(overrides?: { @@ -446,7 +451,9 @@ describe('SettingsModal', () => { }); openMqttSection(); - expect(screen.getByText('Disabled')).toBeInTheDocument(); + // Both MQTT and community MQTT show "Disabled" when null status + const disabledElements = screen.getAllByText('Disabled'); + expect(disabledElements.length).toBeGreaterThanOrEqual(1); }); it('shows MQTT connected status badge', () => { @@ -465,6 +472,73 @@ describe('SettingsModal', () => { expect(screen.getByText('Connected')).toBeInTheDocument(); }); + it('renders community sharing section in MQTT tab', () => { + renderModal(); + openMqttSection(); + + expect(screen.getByText('Community Analytics')).toBeInTheDocument(); + expect(screen.getByText('Enable Community Analytics')).toBeInTheDocument(); + }); + + it('shows IATA input only when community sharing is enabled', () => { + renderModal({ + appSettings: { + ...baseSettings, + community_mqtt_enabled: false, + }, + }); + openMqttSection(); + + expect(screen.queryByLabelText('Region Code (IATA)')).not.toBeInTheDocument(); + + // Enable community sharing + fireEvent.click(screen.getByText('Enable Community Analytics')); + expect(screen.getByLabelText('Region Code (IATA)')).toBeInTheDocument(); + }); + + it('includes community MQTT fields in save payload', async () => { + const { onSaveAppSettings } = renderModal({ + appSettings: { + ...baseSettings, + community_mqtt_enabled: true, + community_mqtt_iata: 'DEN', + }, + }); + openMqttSection(); + + fireEvent.click(screen.getByRole('button', { name: 'Save MQTT Settings' })); + + await waitFor(() => { + expect(onSaveAppSettings).toHaveBeenCalledWith( + expect.objectContaining({ + community_mqtt_enabled: true, + community_mqtt_iata: 'DEN', + }) + ); + }); + }); + + it('shows community MQTT connected status badge', () => { + renderModal({ + appSettings: { + ...baseSettings, + community_mqtt_enabled: true, + }, + health: { + ...baseHealth, + community_mqtt_status: 'connected', + }, + }); + openMqttSection(); + + // Community Analytics sub-section should show Connected + const communitySection = screen.getByText('Community Analytics').closest('div'); + expect(communitySection).not.toBeNull(); + // Both MQTT and community could show "Connected" — check count + const connectedElements = screen.getAllByText('Connected'); + expect(connectedElements.length).toBeGreaterThanOrEqual(1); + }); + it('fetches statistics when expanded in mobile external-nav mode', async () => { const mockStats: StatisticsResponse = { busiest_channels_24h: [], diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 7609161..941cc9c 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -30,6 +30,7 @@ export interface HealthStatus { database_size_mb: number; oldest_undecrypted_timestamp: number | null; mqtt_status: string | null; + community_mqtt_status: string | null; } export interface MaintenanceResult { @@ -192,6 +193,10 @@ export interface AppSettings { mqtt_topic_prefix: string; mqtt_publish_messages: boolean; mqtt_publish_raw_packets: boolean; + community_mqtt_enabled: boolean; + community_mqtt_iata: string; + community_mqtt_broker: string; + community_mqtt_email: string; } export interface AppSettingsUpdate { @@ -209,6 +214,10 @@ export interface AppSettingsUpdate { mqtt_topic_prefix?: string; mqtt_publish_messages?: boolean; mqtt_publish_raw_packets?: boolean; + community_mqtt_enabled?: boolean; + community_mqtt_iata?: string; + community_mqtt_broker?: string; + community_mqtt_email?: string; } export interface MigratePreferencesRequest { diff --git a/tests/test_community_mqtt.py b/tests/test_community_mqtt.py new file mode 100644 index 0000000..7878074 --- /dev/null +++ b/tests/test_community_mqtt.py @@ -0,0 +1,463 @@ +"""Tests for community MQTT publisher.""" + +import json +from unittest.mock import MagicMock, patch + +import nacl.bindings +import pytest + +from app.community_mqtt import ( + _CLIENT_ID, + _DEFAULT_BROKER, + _DEFAULT_PORT, + CommunityMqttPublisher, + _base64url_encode, + _calculate_packet_hash, + _ed25519_sign_expanded, + _format_raw_packet, + _generate_jwt_token, + _parse_broker_address, + community_mqtt_broadcast, +) +from app.models import AppSettings + + +def _make_test_keys() -> tuple[bytes, bytes]: + """Generate a test MeshCore-format key pair. + + Returns (private_key_64_bytes, public_key_32_bytes). + MeshCore format: scalar(32) || prefix(32), where scalar is already clamped. + """ + import hashlib + import os + + seed = os.urandom(32) + expanded = hashlib.sha512(seed).digest() + scalar = bytearray(expanded[:32]) + # Clamp scalar (standard Ed25519 clamping) + scalar[0] &= 248 + scalar[31] &= 127 + scalar[31] |= 64 + scalar = bytes(scalar) + prefix = expanded[32:] + + private_key = scalar + prefix + public_key = nacl.bindings.crypto_scalarmult_ed25519_base_noclamp(scalar) + return private_key, public_key + + +class TestBase64UrlEncode: + def test_encodes_without_padding(self): + result = _base64url_encode(b"\x00\x01\x02") + assert "=" not in result + + def test_uses_url_safe_chars(self): + # Bytes that would produce + and / in standard base64 + result = _base64url_encode(b"\xfb\xff\xfe") + assert "+" not in result + assert "/" not in result + + +class TestJwtGeneration: + def test_token_has_three_parts(self): + private_key, public_key = _make_test_keys() + token = _generate_jwt_token(private_key, public_key) + parts = token.split(".") + assert len(parts) == 3 + + def test_header_contains_ed25519_alg(self): + private_key, public_key = _make_test_keys() + token = _generate_jwt_token(private_key, public_key) + header_b64 = token.split(".")[0] + # Add padding for base64 decoding + import base64 + + padded = header_b64 + "=" * (4 - len(header_b64) % 4) + header = json.loads(base64.urlsafe_b64decode(padded)) + assert header["alg"] == "Ed25519" + assert header["typ"] == "JWT" + + def test_payload_contains_required_fields(self): + private_key, public_key = _make_test_keys() + token = _generate_jwt_token(private_key, public_key) + payload_b64 = token.split(".")[1] + import base64 + + padded = payload_b64 + "=" * (4 - len(payload_b64) % 4) + payload = json.loads(base64.urlsafe_b64decode(padded)) + assert payload["publicKey"] == public_key.hex().upper() + assert "iat" in payload + assert "exp" in payload + assert payload["exp"] - payload["iat"] == 86400 + assert payload["aud"] == _DEFAULT_BROKER + assert payload["owner"] == public_key.hex().upper() + assert payload["client"] == _CLIENT_ID + assert "email" not in payload # omitted when empty + + def test_payload_includes_email_when_provided(self): + private_key, public_key = _make_test_keys() + token = _generate_jwt_token(private_key, public_key, email="test@example.com") + payload_b64 = token.split(".")[1] + import base64 + + padded = payload_b64 + "=" * (4 - len(payload_b64) % 4) + payload = json.loads(base64.urlsafe_b64decode(padded)) + assert payload["email"] == "test@example.com" + + def test_payload_uses_custom_audience(self): + private_key, public_key = _make_test_keys() + token = _generate_jwt_token(private_key, public_key, audience="custom.broker.net") + payload_b64 = token.split(".")[1] + import base64 + + padded = payload_b64 + "=" * (4 - len(payload_b64) % 4) + payload = json.loads(base64.urlsafe_b64decode(padded)) + assert payload["aud"] == "custom.broker.net" + + def test_signature_is_valid_hex(self): + private_key, public_key = _make_test_keys() + token = _generate_jwt_token(private_key, public_key) + sig_hex = token.split(".")[2] + sig_bytes = bytes.fromhex(sig_hex) + assert len(sig_bytes) == 64 + + def test_signature_verifies(self): + """Verify the JWT signature using nacl.bindings.crypto_sign_open.""" + private_key, public_key = _make_test_keys() + token = _generate_jwt_token(private_key, public_key) + parts = token.split(".") + signing_input = f"{parts[0]}.{parts[1]}".encode() + signature = bytes.fromhex(parts[2]) + + # crypto_sign_open expects signature + message concatenated + signed_message = signature + signing_input + # This will raise if the signature is invalid + verified = nacl.bindings.crypto_sign_open(signed_message, public_key) + assert verified == signing_input + + +class TestEddsaSignExpanded: + def test_produces_64_byte_signature(self): + private_key, public_key = _make_test_keys() + message = b"test message" + sig = _ed25519_sign_expanded(message, private_key[:32], private_key[32:], public_key) + assert len(sig) == 64 + + def test_signature_verifies_with_nacl(self): + private_key, public_key = _make_test_keys() + message = b"hello world" + sig = _ed25519_sign_expanded(message, private_key[:32], private_key[32:], public_key) + + signed_message = sig + message + verified = nacl.bindings.crypto_sign_open(signed_message, public_key) + assert verified == message + + def test_different_messages_produce_different_signatures(self): + private_key, public_key = _make_test_keys() + sig1 = _ed25519_sign_expanded(b"msg1", private_key[:32], private_key[32:], public_key) + sig2 = _ed25519_sign_expanded(b"msg2", private_key[:32], private_key[32:], public_key) + assert sig1 != sig2 + + +class TestPacketFormatConversion: + def test_basic_field_mapping(self): + data = { + "id": 1, + "observation_id": 100, + "timestamp": 1700000000, + "data": "0a1b2c3d", + "payload_type": "ADVERT", + "snr": 5.5, + "rssi": -90, + "decrypted": False, + "decrypted_info": None, + } + result = _format_raw_packet(data, "TestNode", "AABBCCDD" * 8) + + assert result["origin"] == "TestNode" + assert result["origin_id"] == "AABBCCDD" * 8 + assert result["raw"] == "0A1B2C3D" + assert result["SNR"] == "5.5" + assert result["RSSI"] == "-90" + assert result["type"] == "PACKET" + assert result["direction"] == "rx" + assert result["len"] == "4" + + def test_timestamp_is_iso8601(self): + data = {"timestamp": 1700000000, "data": "00", "snr": None, "rssi": None} + result = _format_raw_packet(data, "Node", "AA" * 32) + assert result["timestamp"] + assert "T" in result["timestamp"] + + def test_snr_rssi_unknown_when_none(self): + data = {"timestamp": 0, "data": "00", "snr": None, "rssi": None} + result = _format_raw_packet(data, "Node", "AA" * 32) + assert result["SNR"] == "Unknown" + assert result["RSSI"] == "Unknown" + + def test_packet_type_extraction(self): + # Header 0x14 = type 5, route 0 (TRANSPORT_FLOOD): header + 4 transport + path_len. + data = {"timestamp": 0, "data": "140102030400", "snr": None, "rssi": None} + result = _format_raw_packet(data, "Node", "AA" * 32) + assert result["packet_type"] == "5" + assert result["route"] == "F" + + def test_route_mapping(self): + # Test all 4 route types (matches meshcore-packet-capture) + # TRANSPORT_FLOOD=0 -> "F", FLOOD=1 -> "F", DIRECT=2 -> "D", TRANSPORT_DIRECT=3 -> "T" + samples = [ + ("000102030400", "F"), # TRANSPORT_FLOOD: header + transport + path_len + ("0100", "F"), # FLOOD: header + path_len + ("0200", "D"), # DIRECT: header + path_len + ("030102030400", "T"), # TRANSPORT_DIRECT: header + transport + path_len + ] + for raw_hex, expected in samples: + data = {"timestamp": 0, "data": raw_hex, "snr": None, "rssi": None} + result = _format_raw_packet(data, "Node", "AA" * 32) + assert result["route"] == expected + + def test_hash_is_16_uppercase_hex_chars(self): + data = {"timestamp": 0, "data": "aabb", "snr": None, "rssi": None} + result = _format_raw_packet(data, "Node", "AA" * 32) + assert len(result["hash"]) == 16 + assert result["hash"] == result["hash"].upper() + + def test_empty_data_handled(self): + data = {"timestamp": 0, "data": "", "snr": None, "rssi": None} + result = _format_raw_packet(data, "Node", "AA" * 32) + assert result["raw"] == "" + assert result["len"] == "0" + assert result["packet_type"] == "0" + assert result["route"] == "U" + + def test_includes_reference_time_fields(self): + data = {"timestamp": 0, "data": "0100aabb", "snr": 1.0, "rssi": -70} + result = _format_raw_packet(data, "Node", "AA" * 32) + assert result["time"] + assert result["date"] + assert result["payload_len"] == "2" + + def test_adds_path_for_direct_route(self): + # route=2 (DIRECT), path_len=2, path=aa bb, payload=cc + data = {"timestamp": 0, "data": "0202AABBCC", "snr": 1.0, "rssi": -70} + result = _format_raw_packet(data, "Node", "AA" * 32) + assert result["route"] == "D" + assert result["path"] == "aa,bb" + + def test_direct_route_includes_empty_path_field(self): + data = {"timestamp": 0, "data": "0200", "snr": 1.0, "rssi": -70} + result = _format_raw_packet(data, "Node", "AA" * 32) + assert result["route"] == "D" + assert "path" in result + assert result["path"] == "" + + def test_unknown_version_uses_defaults(self): + # version=1 in high bits, type=5, route=1 + header = (1 << 6) | (5 << 2) | 1 + data = {"timestamp": 0, "data": f"{header:02x}00", "snr": 1.0, "rssi": -70} + result = _format_raw_packet(data, "Node", "AA" * 32) + assert result["packet_type"] == "0" + assert result["route"] == "U" + assert result["payload_len"] == "0" + + +class TestCalculatePacketHash: + def test_empty_bytes_returns_zeroes(self): + result = _calculate_packet_hash(b"") + assert result == "0" * 16 + + def test_returns_16_uppercase_hex_chars(self): + # Simple flood packet: header(1) + path_len(1) + payload + raw = bytes([0x01, 0x00, 0xAA, 0xBB]) # FLOOD, no path, payload=0xAABB + result = _calculate_packet_hash(raw) + assert len(result) == 16 + assert result == result.upper() + + def test_flood_packet_hash(self): + """FLOOD route (0x01): no transport codes, header + path_len + payload.""" + import hashlib + + # Header 0x11 = route=FLOOD(1), payload_type=4(ADVERT): (4<<2)|1 = 0x11 + payload = b"\xde\xad" + raw = bytes([0x11, 0x00]) + payload # header, path_len=0, payload + result = _calculate_packet_hash(raw) + + # Expected: sha256(payload_type_byte + payload_data)[:16].upper() + expected = hashlib.sha256(bytes([4]) + payload).hexdigest()[:16].upper() + assert result == expected + + def test_transport_flood_skips_transport_codes(self): + """TRANSPORT_FLOOD (0x00): has 4 bytes of transport codes after header.""" + import hashlib + + # Header 0x10 = route=TRANSPORT_FLOOD(0), payload_type=4: (4<<2)|0 = 0x10 + transport_codes = b"\x01\x02\x03\x04" + payload = b"\xca\xfe" + raw = bytes([0x10]) + transport_codes + bytes([0x00]) + payload + result = _calculate_packet_hash(raw) + + expected = hashlib.sha256(bytes([4]) + payload).hexdigest()[:16].upper() + assert result == expected + + def test_transport_direct_skips_transport_codes(self): + """TRANSPORT_DIRECT (0x03): also has 4 bytes of transport codes.""" + import hashlib + + # Header 0x13 = route=TRANSPORT_DIRECT(3), payload_type=4: (4<<2)|3 = 0x13 + transport_codes = b"\x05\x06\x07\x08" + payload = b"\xbe\xef" + raw = bytes([0x13]) + transport_codes + bytes([0x00]) + payload + result = _calculate_packet_hash(raw) + + expected = hashlib.sha256(bytes([4]) + payload).hexdigest()[:16].upper() + assert result == expected + + def test_trace_packet_includes_path_len_in_hash(self): + """TRACE packets (type 9) include path_len as uint16_t LE in the hash.""" + import hashlib + + # Header for TRACE with FLOOD route: (9<<2)|1 = 0x25 + path_len = 3 + path_data = b"\xaa\xbb\xcc" + payload = b"\x01\x02" + raw = bytes([0x25, path_len]) + path_data + payload + result = _calculate_packet_hash(raw) + + expected_hash = ( + hashlib.sha256(bytes([9]) + path_len.to_bytes(2, byteorder="little") + payload) + .hexdigest()[:16] + .upper() + ) + assert result == expected_hash + + def test_with_path_data(self): + """Packet with non-zero path_len should skip path bytes to reach payload.""" + import hashlib + + # FLOOD route, payload_type=2 (TXT_MSG): (2<<2)|1 = 0x09 + path_data = b"\xaa\xbb" # 2 bytes of path + payload = b"\x48\x65\x6c\x6c\x6f" # "Hello" + raw = bytes([0x09, 0x02]) + path_data + payload + result = _calculate_packet_hash(raw) + + expected = hashlib.sha256(bytes([2]) + payload).hexdigest()[:16].upper() + assert result == expected + + def test_truncated_packet_returns_zeroes(self): + # Header says TRANSPORT_FLOOD, but missing path_len at required offset. + raw = bytes([0x10, 0x01, 0x02]) + assert _calculate_packet_hash(raw) == "0" * 16 + + +class TestCommunityMqttPublisher: + def test_initial_state(self): + pub = CommunityMqttPublisher() + assert pub.connected is False + assert pub._client is None + assert pub._task is None + + @pytest.mark.asyncio + async def test_publish_drops_when_disconnected(self): + pub = CommunityMqttPublisher() + # Should not raise + await pub.publish("topic", {"key": "value"}) + + @pytest.mark.asyncio + async def test_stop_resets_state(self): + pub = CommunityMqttPublisher() + pub.connected = True + pub._client = MagicMock() + await pub.stop() + assert pub.connected is False + assert pub._client is None + + def test_is_configured_false_when_disabled(self): + pub = CommunityMqttPublisher() + pub._settings = AppSettings(community_mqtt_enabled=False) + with patch("app.keystore.has_private_key", return_value=True): + assert pub._is_configured() is False + + def test_is_configured_false_when_no_private_key(self): + pub = CommunityMqttPublisher() + pub._settings = AppSettings(community_mqtt_enabled=True) + with patch("app.keystore.has_private_key", return_value=False): + assert pub._is_configured() is False + + def test_is_configured_true_when_enabled_with_key(self): + pub = CommunityMqttPublisher() + pub._settings = AppSettings(community_mqtt_enabled=True) + with patch("app.keystore.has_private_key", return_value=True): + assert pub._is_configured() is True + + +class TestCommunityMqttBroadcast: + def test_filters_non_raw_packet(self): + """Non-raw_packet events should be ignored.""" + with patch("app.community_mqtt.community_publisher") as mock_pub: + mock_pub.connected = True + mock_pub._settings = AppSettings(community_mqtt_enabled=True) + community_mqtt_broadcast("message", {"text": "hello"}) + # No asyncio.create_task should be called for non-raw_packet events + # Since we're filtering, we just verify no exception + + def test_skips_when_disconnected(self): + """Should not publish when disconnected.""" + with ( + patch("app.community_mqtt.community_publisher") as mock_pub, + patch("app.community_mqtt.asyncio.create_task") as mock_task, + ): + mock_pub.connected = False + mock_pub._settings = AppSettings(community_mqtt_enabled=True) + community_mqtt_broadcast("raw_packet", {"data": "00"}) + mock_task.assert_not_called() + + def test_skips_when_settings_none(self): + """Should not publish when settings are None.""" + with ( + patch("app.community_mqtt.community_publisher") as mock_pub, + patch("app.community_mqtt.asyncio.create_task") as mock_task, + ): + mock_pub.connected = True + mock_pub._settings = None + community_mqtt_broadcast("raw_packet", {"data": "00"}) + mock_task.assert_not_called() + + +class TestParseBrokerAddress: + def test_hostname_only_uses_default_port(self): + host, port = _parse_broker_address("mqtt-us-v1.letsmesh.net") + assert host == "mqtt-us-v1.letsmesh.net" + assert port == _DEFAULT_PORT + + def test_hostname_with_port(self): + host, port = _parse_broker_address("mqtt-us-v1.letsmesh.net:8883") + assert host == "mqtt-us-v1.letsmesh.net" + assert port == 8883 + + def test_hostname_with_port_443(self): + host, port = _parse_broker_address("broker.example.com:443") + assert host == "broker.example.com" + assert port == 443 + + def test_invalid_port_uses_default(self): + host, port = _parse_broker_address("broker.example.com:abc") + assert host == "broker.example.com:abc" + assert port == _DEFAULT_PORT + + def test_empty_string(self): + host, port = _parse_broker_address("") + assert host == "" + assert port == _DEFAULT_PORT + + +class TestPublishFailureSetsDisconnected: + @pytest.mark.asyncio + async def test_publish_error_sets_connected_false(self): + """A publish error should set connected=False so the loop can detect it.""" + pub = CommunityMqttPublisher() + pub.connected = True + mock_client = MagicMock() + mock_client.publish = MagicMock(side_effect=Exception("broker gone")) + pub._client = mock_client + await pub.publish("topic", {"data": "test"}) + assert pub.connected is False diff --git a/tests/test_migrations.py b/tests/test_migrations.py index 64c3948..7c418e9 100644 --- a/tests/test_migrations.py +++ b/tests/test_migrations.py @@ -100,8 +100,8 @@ class TestMigration001: # Run migrations applied = await run_migrations(conn) - assert applied == 31 # All migrations run - assert await get_version(conn) == 31 + assert applied == 32 # All migrations run + assert await get_version(conn) == 32 # Verify columns exist by inserting and selecting await conn.execute( @@ -183,9 +183,9 @@ class TestMigration001: applied1 = await run_migrations(conn) applied2 = await run_migrations(conn) - assert applied1 == 31 # All migrations run + assert applied1 == 32 # All migrations run assert applied2 == 0 # No migrations on second run - assert await get_version(conn) == 31 + assert await get_version(conn) == 32 finally: await conn.close() @@ -246,8 +246,8 @@ class TestMigration001: applied = await run_migrations(conn) # All migrations applied (version incremented) but no error - assert applied == 31 - assert await get_version(conn) == 31 + assert applied == 32 + assert await get_version(conn) == 32 finally: await conn.close() @@ -374,10 +374,10 @@ class TestMigration013: ) await conn.commit() - # Run migration 13 (plus 14-27 which also run) + # Run migration 13 (plus 14-33 which also run) applied = await run_migrations(conn) - assert applied == 19 - assert await get_version(conn) == 31 + assert applied == 20 + assert await get_version(conn) == 32 # Verify bots array was created with migrated data cursor = await conn.execute("SELECT bots FROM app_settings WHERE id = 1") @@ -497,7 +497,7 @@ class TestMigration018: assert await cursor.fetchone() is not None await run_migrations(conn) - assert await get_version(conn) == 31 + assert await get_version(conn) == 32 # Verify autoindex is gone cursor = await conn.execute( @@ -575,8 +575,8 @@ class TestMigration018: await conn.commit() applied = await run_migrations(conn) - assert applied == 14 # Migrations 18-31 run (18+19 skip internally) - assert await get_version(conn) == 31 + assert applied == 15 # Migrations 18-32 run (18+19 skip internally) + assert await get_version(conn) == 32 finally: await conn.close() @@ -648,7 +648,7 @@ class TestMigration019: assert await cursor.fetchone() is not None await run_migrations(conn) - assert await get_version(conn) == 31 + assert await get_version(conn) == 32 # Verify autoindex is gone cursor = await conn.execute( @@ -714,8 +714,8 @@ class TestMigration020: assert (await cursor.fetchone())[0] == "delete" applied = await run_migrations(conn) - assert applied == 12 # Migrations 20-31 - assert await get_version(conn) == 31 + assert applied == 13 # Migrations 20-32 + assert await get_version(conn) == 32 # Verify WAL mode cursor = await conn.execute("PRAGMA journal_mode") @@ -745,7 +745,7 @@ class TestMigration020: await set_version(conn, 20) applied = await run_migrations(conn) - assert applied == 11 # Migrations 21-31 still run + assert applied == 12 # Migrations 21-32 still run # Still WAL + INCREMENTAL cursor = await conn.execute("PRAGMA journal_mode") @@ -803,8 +803,8 @@ class TestMigration028: await conn.commit() applied = await run_migrations(conn) - assert applied == 4 - assert await get_version(conn) == 31 + assert applied == 5 + assert await get_version(conn) == 32 # Verify payload_hash column is now BLOB cursor = await conn.execute("PRAGMA table_info(raw_packets)") @@ -873,8 +873,8 @@ class TestMigration028: await conn.commit() applied = await run_migrations(conn) - assert applied == 4 # Version still bumped - assert await get_version(conn) == 31 + assert applied == 5 # Version still bumped + assert await get_version(conn) == 32 # Verify data unchanged cursor = await conn.execute("SELECT payload_hash FROM raw_packets") @@ -882,3 +882,60 @@ class TestMigration028: assert bytes(row["payload_hash"]) == b"\xab" * 32 finally: await conn.close() + + +class TestMigration032: + """Test migration 032: add community MQTT columns to app_settings.""" + + @pytest.mark.asyncio + async def test_migration_adds_all_community_mqtt_columns(self): + """Migration adds enabled, iata, broker, and email columns.""" + conn = await aiosqlite.connect(":memory:") + conn.row_factory = aiosqlite.Row + try: + await set_version(conn, 31) + + # Create app_settings without community columns (pre-migration schema) + await conn.execute(""" + CREATE TABLE app_settings ( + id INTEGER PRIMARY KEY, + max_radio_contacts INTEGER DEFAULT 200, + favorites TEXT DEFAULT '[]', + auto_decrypt_dm_on_advert INTEGER DEFAULT 0, + sidebar_sort_order TEXT DEFAULT 'recent', + last_message_times TEXT DEFAULT '{}', + preferences_migrated INTEGER DEFAULT 0, + advert_interval INTEGER DEFAULT 0, + last_advert_time INTEGER DEFAULT 0, + bots TEXT DEFAULT '[]', + mqtt_broker_host TEXT DEFAULT '', + mqtt_broker_port INTEGER DEFAULT 1883, + mqtt_username TEXT DEFAULT '', + mqtt_password TEXT DEFAULT '', + mqtt_use_tls INTEGER DEFAULT 0, + mqtt_tls_insecure INTEGER DEFAULT 0, + mqtt_topic_prefix TEXT DEFAULT 'meshcore', + mqtt_publish_messages INTEGER DEFAULT 0, + mqtt_publish_raw_packets INTEGER DEFAULT 0 + ) + """) + await conn.execute("INSERT INTO app_settings (id) VALUES (1)") + await conn.commit() + + applied = await run_migrations(conn) + assert applied == 1 + assert await get_version(conn) == 32 + + # Verify all columns exist with correct defaults + cursor = await conn.execute( + """SELECT community_mqtt_enabled, community_mqtt_iata, + community_mqtt_broker, community_mqtt_email + FROM app_settings WHERE id = 1""" + ) + row = await cursor.fetchone() + assert row["community_mqtt_enabled"] == 0 + assert row["community_mqtt_iata"] == "" + assert row["community_mqtt_broker"] == "mqtt-us-v1.letsmesh.net" + assert row["community_mqtt_email"] == "" + finally: + await conn.close() diff --git a/tests/test_repository.py b/tests/test_repository.py index f9daaaf..f5f81b3 100644 --- a/tests/test_repository.py +++ b/tests/test_repository.py @@ -502,6 +502,10 @@ class TestAppSettingsRepository: "mqtt_topic_prefix": "meshcore", "mqtt_publish_messages": 0, "mqtt_publish_raw_packets": 0, + "community_mqtt_enabled": 0, + "community_mqtt_iata": "", + "community_mqtt_broker": "mqtt-us-v1.letsmesh.net", + "community_mqtt_email": "", } ) mock_conn.execute = AsyncMock(return_value=mock_cursor) diff --git a/tests/test_settings_router.py b/tests/test_settings_router.py index 74fdce8..7d99906 100644 --- a/tests/test_settings_router.py +++ b/tests/test_settings_router.py @@ -117,6 +117,78 @@ class TestUpdateSettings: assert settings.mqtt_publish_messages is False assert settings.mqtt_publish_raw_packets is False + @pytest.mark.asyncio + async def test_community_mqtt_fields_round_trip(self, test_db): + """Community MQTT settings should be saved and retrieved correctly.""" + mock_community = type("MockCommunity", (), {"restart": AsyncMock()})() + with patch("app.community_mqtt.community_publisher", mock_community): + result = await update_settings( + AppSettingsUpdate( + community_mqtt_enabled=True, + community_mqtt_iata="DEN", + community_mqtt_broker="custom-broker.example.com", + community_mqtt_email="test@example.com", + ) + ) + + assert result.community_mqtt_enabled is True + assert result.community_mqtt_iata == "DEN" + assert result.community_mqtt_broker == "custom-broker.example.com" + assert result.community_mqtt_email == "test@example.com" + + # Verify persistence + fresh = await AppSettingsRepository.get() + assert fresh.community_mqtt_enabled is True + assert fresh.community_mqtt_iata == "DEN" + assert fresh.community_mqtt_broker == "custom-broker.example.com" + assert fresh.community_mqtt_email == "test@example.com" + + # Verify restart was called + mock_community.restart.assert_called_once() + + @pytest.mark.asyncio + async def test_community_mqtt_iata_validation_rejects_invalid(self, test_db): + """Invalid IATA codes should be rejected.""" + with pytest.raises(HTTPException) as exc: + await update_settings(AppSettingsUpdate(community_mqtt_iata="A")) + assert exc.value.status_code == 400 + + with pytest.raises(HTTPException) as exc: + await update_settings(AppSettingsUpdate(community_mqtt_iata="ABCDE")) + assert exc.value.status_code == 400 + + with pytest.raises(HTTPException) as exc: + await update_settings(AppSettingsUpdate(community_mqtt_iata="12")) + assert exc.value.status_code == 400 + + with pytest.raises(HTTPException) as exc: + await update_settings(AppSettingsUpdate(community_mqtt_iata="ABCD")) + assert exc.value.status_code == 400 + + @pytest.mark.asyncio + async def test_community_mqtt_enable_requires_iata(self, test_db): + """Enabling community MQTT without a valid IATA code should be rejected.""" + with pytest.raises(HTTPException) as exc: + await update_settings(AppSettingsUpdate(community_mqtt_enabled=True)) + assert exc.value.status_code == 400 + assert "IATA" in exc.value.detail + + @pytest.mark.asyncio + async def test_community_mqtt_iata_uppercased(self, test_db): + """IATA codes should be uppercased.""" + mock_community = type("MockCommunity", (), {"restart": AsyncMock()})() + with patch("app.community_mqtt.community_publisher", mock_community): + result = await update_settings(AppSettingsUpdate(community_mqtt_iata="den")) + assert result.community_mqtt_iata == "DEN" + + @pytest.mark.asyncio + async def test_community_mqtt_defaults_on_fresh_db(self, test_db): + """Community MQTT fields should have correct defaults on a fresh database.""" + settings = await AppSettingsRepository.get() + assert settings.community_mqtt_enabled is False + assert settings.community_mqtt_iata == "" + assert settings.community_mqtt_email == "" + class TestToggleFavorite: @pytest.mark.asyncio