From 2f40b4a73072ee4d37551dc5d67009f71aad9281 Mon Sep 17 00:00:00 2001 From: yellowcooln Date: Sat, 21 Feb 2026 21:40:14 -0500 Subject: [PATCH] Add LetsMesh compatibility ingest, decoder integration, and admin auth updates --- .env.example | 38 + AGENTS.md | 1 + Dockerfile | 69 ++ PLAN.md | 3 + README.md | 40 +- SCHEMAS.md | 33 +- agent.md | 4 + docker-compose.yml | 17 + src/meshcore_hub/api/app.py | 12 + src/meshcore_hub/api/cli.py | 39 +- src/meshcore_hub/api/dependencies.py | 8 + src/meshcore_hub/api/routes/nodes.py | 39 +- src/meshcore_hub/collector/cli.py | 70 ++ .../collector/handlers/advertisement.py | 40 + .../collector/handlers/message.py | 2 +- .../collector/letsmesh_decoder.py | 272 ++++++ src/meshcore_hub/collector/subscriber.py | 784 +++++++++++++++++- src/meshcore_hub/common/config.py | 77 ++ src/meshcore_hub/common/mqtt.py | 64 +- src/meshcore_hub/common/schemas/events.py | 8 + src/meshcore_hub/web/app.py | 45 +- .../web/static/js/spa/components.js | 66 +- .../web/static/js/spa/pages/dashboard.js | 69 +- .../web/static/js/spa/pages/messages.js | 209 ++++- .../web/static/js/spa/pages/node-detail.js | 2 +- .../web/static/js/spa/pages/nodes.js | 1 + src/meshcore_hub/web/static/locales/en.json | 3 +- .../web/static/locales/languages.md | 3 +- tests/test_api/test_nodes.py | 46 + .../test_handlers/test_advertisement.py | 20 + tests/test_collector/test_letsmesh_decoder.py | 125 +++ tests/test_collector/test_subscriber.py | 523 +++++++++++- tests/test_common/test_config.py | 41 + tests/test_common/test_mqtt.py | 57 ++ tests/test_web/test_admin.py | 56 ++ tests/test_web/test_messages.py | 14 + 36 files changed, 2799 insertions(+), 101 deletions(-) create mode 100644 agent.md create mode 100644 src/meshcore_hub/collector/letsmesh_decoder.py create mode 100644 tests/test_collector/test_letsmesh_decoder.py create mode 100644 tests/test_common/test_mqtt.py diff --git a/.env.example b/.env.example index 9942c97..6c7f2ea 100644 --- a/.env.example +++ b/.env.example @@ -80,6 +80,14 @@ MQTT_PREFIX=meshcore # When enabled, uses TLS with system CA certificates (e.g., for Let's Encrypt) MQTT_TLS=false +# MQTT transport protocol +# Options: tcp, websockets +MQTT_TRANSPORT=tcp + +# MQTT WebSocket path (used only when MQTT_TRANSPORT=websockets) +# Common values: /mqtt, / +MQTT_WS_PATH=/mqtt + # External port mappings for local MQTT broker (--profile mqtt only) MQTT_EXTERNAL_PORT=1883 MQTT_WS_PORT=9001 @@ -123,6 +131,30 @@ CONTACT_CLEANUP_DAYS=7 # ============================================================================= # The collector subscribes to MQTT events and stores them in the database +# Collector MQTT ingest mode +# - native: expects //event/ topics +# - letsmesh_upload: expects LetsMesh observer uploads on +# //(packets|status|internal) +COLLECTOR_INGEST_MODE=native + +# LetsMesh decoder support (used only when COLLECTOR_INGEST_MODE=letsmesh_upload) +# Set to false to disable external packet decoding +COLLECTOR_LETSMESH_DECODER_ENABLED=true + +# Decoder command (must be available in container PATH) +# Examples: meshcore-decoder, /usr/local/bin/meshcore-decoder, npx meshcore-decoder +COLLECTOR_LETSMESH_DECODER_COMMAND=meshcore-decoder + +# Optional: channel secret keys (comma or space separated) used to decrypt GroupText +# packets. This supports unlimited keys. +# Note: Public + #test keys are built into the collector code by default. +# To show friendly channel names in the web feed, use label=hex (example: bot=ABCDEF...). +# Without keys, encrypted packets cannot be shown as plaintext. +# COLLECTOR_LETSMESH_DECODER_KEYS= + +# Timeout in seconds per decode invocation +COLLECTOR_LETSMESH_DECODER_TIMEOUT_SECONDS=2.0 + # ------------------- # Webhook Settings # ------------------- @@ -235,6 +267,12 @@ WEB_PORT=8080 # Supported: en (see src/meshcore_hub/web/static/locales/ for available translations) # WEB_LOCALE=en +# Locale used for date/time formatting in the web dashboard +# Controls date ordering only; 24-hour clock is still used by default +# Examples: en-US (MM/DD/YYYY), en-GB (DD/MM/YYYY) +# Default: en-US +# WEB_DATETIME_LOCALE=en-US + # Auto-refresh interval in seconds for list pages (nodes, advertisements, messages) # Set to 0 to disable auto-refresh # Default: 30 diff --git a/AGENTS.md b/AGENTS.md index 0f84468..e7edc97 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -12,6 +12,7 @@ This document provides context and guidelines for AI coding assistants working o - `source .venv/bin/activate` * You MUST install all project dependencies using `pip install -e ".[dev]"` command` * You MUST install `pre-commit` for quality checks +* You MUST keep project documentation in sync with behavior/config/schema changes made in code (at minimum update relevant sections in `README.md`, `SCHEMAS.md`, `PLAN.md`, and/or `TASKS.md` when applicable) * Before commiting: - Run **targeted tests** for the components you changed, not the full suite: - `pytest tests/test_web/` for web-only changes (templates, static JS, web routes) diff --git a/Dockerfile b/Dockerfile index 31eb721..9c083e2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -65,9 +65,78 @@ ENV PYTHONDONTWRITEBYTECODE=1 \ RUN apt-get update && apt-get install -y --no-install-recommends \ # For serial port access udev \ + # LetsMesh decoder runtime + nodejs \ + npm \ && rm -rf /var/lib/apt/lists/* \ && mkdir -p /data +# Install meshcore-decoder CLI and patch ESM compatibility for Node 18 runtime. +RUN mkdir -p /opt/letsmesh-decoder \ + && cd /opt/letsmesh-decoder \ + && npm init -y >/dev/null 2>&1 \ + && npm install --omit=dev @michaelhart/meshcore-decoder@0.2.7 \ + && python - <<'PY' +from pathlib import Path + +path = Path( + "/opt/letsmesh-decoder/node_modules/@michaelhart/meshcore-decoder/" + "dist/crypto/ed25519-verifier.js" +) +content = path.read_text(encoding="utf-8") + +old_import = 'const ed25519 = __importStar(require("@noble/ed25519"));' +new_import = """let _ed25519 = null; +async function getEd25519() { + if (_ed25519) { + return _ed25519; + } + const mod = await import("@noble/ed25519"); + _ed25519 = mod.default ? mod.default : mod; + try { + _ed25519.etc.sha512Async = sha512Hash; + } + catch (error) { + console.debug("Could not set async SHA-512:", error); + } + try { + _ed25519.etc.sha512Sync = sha512HashSync; + } + catch (error) { + console.debug("Could not set up synchronous SHA-512:", error); + } + return _ed25519; +}""" +if old_import not in content: + raise RuntimeError("meshcore-decoder patch failed: import line not found") +content = content.replace(old_import, new_import, 1) + +old_setup = """// Set up SHA-512 for @noble/ed25519 +ed25519.etc.sha512Async = sha512Hash; +// Always set up sync version - @noble/ed25519 requires it +// It will throw in browser environments, which @noble/ed25519 can handle +try { + ed25519.etc.sha512Sync = sha512HashSync; +} +catch (error) { + console.debug('Could not set up synchronous SHA-512:', error); +} +""" +if old_setup not in content: + raise RuntimeError("meshcore-decoder patch failed: sha512 setup block not found") +content = content.replace(old_setup, "", 1) + +old_verify = " return await ed25519.verify(signature, message, publicKey);" +new_verify = """ const ed25519 = await getEd25519(); + return await ed25519.verify(signature, message, publicKey);""" +if old_verify not in content: + raise RuntimeError("meshcore-decoder patch failed: verify line not found") +content = content.replace(old_verify, new_verify, 1) + +path.write_text(content, encoding="utf-8") +PY +RUN ln -s /opt/letsmesh-decoder/node_modules/.bin/meshcore-decoder /usr/local/bin/meshcore-decoder + # Copy virtual environment from builder COPY --from=builder /opt/venv /opt/venv ENV PATH="/opt/venv/bin:$PATH" diff --git a/PLAN.md b/PLAN.md index c34d733..7c823a8 100644 --- a/PLAN.md +++ b/PLAN.md @@ -506,6 +506,9 @@ ${DATA_HOME}/ | WEB_PORT | 8080 | Web bind port | | API_BASE_URL | http://localhost:8000 | API endpoint | | API_KEY | | API key for queries | +| WEB_LOCALE | en | UI translation locale | +| WEB_DATETIME_LOCALE | en-US | Date formatting locale for UI timestamps | +| TZ | UTC | Timezone used for UI timestamp rendering | | NETWORK_DOMAIN | | Network domain | | NETWORK_NAME | MeshCore Network | Network name | | NETWORK_CITY | | City location | diff --git a/README.md b/README.md index f868b4a..1590a27 100644 --- a/README.md +++ b/README.md @@ -278,6 +278,8 @@ All components are configured via environment variables. Create a `.env` file or | `MQTT_PASSWORD` | *(none)* | MQTT password (optional) | | `MQTT_PREFIX` | `meshcore` | Topic prefix for all MQTT messages | | `MQTT_TLS` | `false` | Enable TLS/SSL for MQTT connection | +| `MQTT_TRANSPORT` | `tcp` | MQTT transport (`tcp` or `websockets`) | +| `MQTT_WS_PATH` | `/mqtt` | MQTT WebSocket path (used when `MQTT_TRANSPORT=websockets`) | ### Interface Settings @@ -291,6 +293,38 @@ All components are configured via environment variables. Create a `.env` file or | `CONTACT_CLEANUP_ENABLED` | `true` | Enable automatic removal of stale contacts from companion node | | `CONTACT_CLEANUP_DAYS` | `7` | Remove contacts not advertised for this many days | +### Collector Settings + +| Variable | Default | Description | +|----------|---------|-------------| +| `COLLECTOR_INGEST_MODE` | `native` | Ingest mode (`native` or `letsmesh_upload`) | +| `COLLECTOR_LETSMESH_DECODER_ENABLED` | `true` | Enable external LetsMesh packet decoding | +| `COLLECTOR_LETSMESH_DECODER_COMMAND` | `meshcore-decoder` | Decoder CLI command | +| `COLLECTOR_LETSMESH_DECODER_KEYS` | *(none)* | Additional decoder channel keys (`label=hex`, `label:hex`, or `hex`) | +| `COLLECTOR_LETSMESH_DECODER_TIMEOUT_SECONDS` | `2.0` | Timeout per decoder invocation | + +#### LetsMesh Upload Compatibility Mode + +When `COLLECTOR_INGEST_MODE=letsmesh_upload`, the collector subscribes to: + +- `/+/packets` +- `/+/status` +- `/+/internal` + +Normalization behavior: + +- `status` packets are mapped to `advertisement` events. +- Decoder payload types `4` and `11` are also mapped to `advertisement` events when node identity metadata is present. +- `packet_type=5` packets are mapped to `channel_msg_recv`. +- `packet_type=1`, `2`, and `7` packets are mapped to `contact_msg_recv` when decryptable text is available. +- For channel packets, if a channel key is available, a channel label is attached (for example `Public` or `#test`) for UI display. +- In the messages feed and dashboard channel sections, known channel indexes are preferred for labels (`17 -> Public`, `217 -> #test`) to avoid stale channel-name mismatches. +- Additional channel names are loaded from `COLLECTOR_LETSMESH_DECODER_KEYS` when entries are provided as `label=hex` (for example `bot=`). +- Decoder-advertisement packets with location metadata update node GPS (`lat/lon`) for map display. +- Packets without decryptable message text are kept as informational `letsmesh_packet` events and are not shown in the messages feed; when decode succeeds the decoded JSON is attached to those packet log events. +- When decoder output includes a human sender (`payload.decoded.decrypted.sender`), message text is normalized to `Name: Message` before storage; receiver/observer names are never used as sender fallback. +- The collector keeps built-in keys for `Public` and `#test`, and merges any additional keys from `COLLECTOR_LETSMESH_DECODER_KEYS`. + ### Webhooks The collector can forward certain events to external HTTP endpoints: @@ -351,9 +385,13 @@ The collector automatically cleans up old event data and inactive nodes: | `API_KEY` | *(none)* | API key for web dashboard queries (optional) | | `WEB_THEME` | `dark` | Default theme (`dark` or `light`). Users can override via theme toggle in navbar. | | `WEB_LOCALE` | `en` | Locale/language for the web dashboard (e.g., `en`, `es`, `fr`) | +| `WEB_DATETIME_LOCALE` | `en-US` | Locale used for date formatting in the web dashboard (e.g., `en-US` for MM/DD/YYYY, `en-GB` for DD/MM/YYYY). | | `WEB_AUTO_REFRESH_SECONDS` | `30` | Auto-refresh interval in seconds for list pages (0 to disable) | -| `WEB_ADMIN_ENABLED` | `false` | Enable admin interface at /a/ (requires auth proxy) | +| `WEB_ADMIN_ENABLED` | `false` | Enable admin interface at /a/ (requires auth proxy: `X-Forwarded-User`/`X-Auth-Request-User` or forwarded `Authorization: Basic ...`) | | `TZ` | `UTC` | Timezone for displaying dates/times (e.g., `America/New_York`, `Europe/London`) | + +Timezone handling note: +- API timestamps that omit an explicit timezone suffix are treated as UTC before rendering in the configured `TZ`. | `NETWORK_DOMAIN` | *(none)* | Network domain name (optional) | | `NETWORK_NAME` | `MeshCore Network` | Display name for the network | | `NETWORK_CITY` | *(none)* | City where network is located | diff --git a/SCHEMAS.md b/SCHEMAS.md index 55d6775..616702a 100644 --- a/SCHEMAS.md +++ b/SCHEMAS.md @@ -45,15 +45,19 @@ Node advertisements announcing presence and metadata. "public_key": "string (64 hex chars)", "name": "string (optional)", "adv_type": "string (optional)", - "flags": "integer (optional)" + "flags": "integer (optional)", + "lat": "number (optional)", + "lon": "number (optional)" } ``` **Field Descriptions**: - `public_key`: Node's full 64-character hexadecimal public key (required) - `name`: Node name/alias (e.g., "Gateway-01", "Alice") -- `adv_type`: Node type - one of: `"chat"`, `"repeater"`, `"room"`, `"none"` +- `adv_type`: Node type - common values: `"chat"`, `"repeater"`, `"room"`, `"companion"` (other values may appear from upstream feeds and are normalized by the collector when possible) - `flags`: Node capability/status flags (bitmask) +- `lat`: GPS latitude when provided by decoder metadata +- `lon`: GPS longitude when provided by decoder metadata **Example**: ```json @@ -61,7 +65,9 @@ Node advertisements announcing presence and metadata. "public_key": "4767c2897c256df8d85a5fa090574284bfd15b92d47359741b0abd5098ed30c4", "name": "Gateway-01", "adv_type": "repeater", - "flags": 218 + "flags": 218, + "lat": 42.470001, + "lon": -71.330001 } ``` @@ -90,7 +96,7 @@ Direct/private messages between two nodes. ``` **Field Descriptions**: -- `pubkey_prefix`: First 12 characters of sender's public key +- `pubkey_prefix`: First 12 characters of sender's public key (or source hash prefix in compatibility ingest modes) - `path_len`: Number of hops message traveled - `txt_type`: Message type indicator (0=plain, 2=signed, etc.) - `signature`: Message signature (8 hex chars) when `txt_type=2` @@ -128,7 +134,9 @@ Group/broadcast messages on specific channels. **Payload Schema**: ```json { - "channel_idx": "integer", + "channel_idx": "integer (optional)", + "channel_name": "string (optional)", + "pubkey_prefix": "string (12 chars, optional)", "path_len": "integer (optional)", "txt_type": "integer (optional)", "signature": "string (optional)", @@ -139,7 +147,9 @@ Group/broadcast messages on specific channels. ``` **Field Descriptions**: -- `channel_idx`: Channel number (0-255) +- `channel_idx`: Channel number (0-255) when available +- `channel_name`: Channel display label (e.g., `"Public"`, `"#test"`) when available +- `pubkey_prefix`: First 12 characters of sender's public key when available - `path_len`: Number of hops message traveled - `txt_type`: Message type indicator (0=plain, 2=signed, etc.) - `signature`: Message signature (8 hex chars) when `txt_type=2` @@ -166,6 +176,17 @@ Group/broadcast messages on specific channels. - Send only text: `$.data.text` - Send channel + text: `$.data.[channel_idx,text]` +**Compatibility ingest note**: +- In LetsMesh upload compatibility mode, packet type `5` is normalized to `CHANNEL_MSG_RECV` and packet types `1`, `2`, and `7` are normalized to `CONTACT_MSG_RECV` when decryptable text is available. +- LetsMesh packets without decryptable message text are treated as informational `letsmesh_packet` events instead of message events. +- For UI labels, known channel indexes are mapped (`17 -> Public`, `217 -> #test`) and preferred over ambiguous/stale channel-name hints. +- Additional channel labels can be provided through `COLLECTOR_LETSMESH_DECODER_KEYS` using `label=hex` entries. +- When decoder output includes a human sender (`payload.decoded.decrypted.sender`), message text is normalized to `Name: Message`; sender identity remains unknown when only hash/prefix metadata is available. + +**Compatibility ingest note (advertisements)**: +- In LetsMesh upload compatibility mode, decoded payload types `4` and `11` are normalized to `ADVERTISEMENT` when node identity metadata is present. +- Payload type `4` location metadata (`appData.location.latitude/longitude`) is mapped to node `lat/lon` for map rendering. + --- ## Persisted Events (Non-Webhook) diff --git a/agent.md b/agent.md new file mode 100644 index 0000000..48d1d44 --- /dev/null +++ b/agent.md @@ -0,0 +1,4 @@ +# Agent Notes + +Local workspace notes for MeshCore Hub development. +This file is intentionally untracked. diff --git a/docker-compose.yml b/docker-compose.yml index 0a6602b..21e21ab 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -48,6 +48,8 @@ services: - MQTT_PASSWORD=${MQTT_PASSWORD:-} - MQTT_PREFIX=${MQTT_PREFIX:-meshcore} - MQTT_TLS=${MQTT_TLS:-false} + - MQTT_TRANSPORT=${MQTT_TRANSPORT:-tcp} + - MQTT_WS_PATH=${MQTT_WS_PATH:-/mqtt} - SERIAL_PORT=${SERIAL_PORT:-/dev/ttyUSB0} - SERIAL_BAUD=${SERIAL_BAUD:-115200} - NODE_ADDRESS=${NODE_ADDRESS:-} @@ -83,6 +85,8 @@ services: - MQTT_PASSWORD=${MQTT_PASSWORD:-} - MQTT_PREFIX=${MQTT_PREFIX:-meshcore} - MQTT_TLS=${MQTT_TLS:-false} + - MQTT_TRANSPORT=${MQTT_TRANSPORT:-tcp} + - MQTT_WS_PATH=${MQTT_WS_PATH:-/mqtt} - SERIAL_PORT=${SERIAL_PORT_SENDER:-/dev/ttyUSB1} - SERIAL_BAUD=${SERIAL_BAUD:-115200} - NODE_ADDRESS=${NODE_ADDRESS_SENDER:-} @@ -115,6 +119,8 @@ services: - MQTT_PASSWORD=${MQTT_PASSWORD:-} - MQTT_PREFIX=${MQTT_PREFIX:-meshcore} - MQTT_TLS=${MQTT_TLS:-false} + - MQTT_TRANSPORT=${MQTT_TRANSPORT:-tcp} + - MQTT_WS_PATH=${MQTT_WS_PATH:-/mqtt} - MOCK_DEVICE=true - NODE_ADDRESS=${NODE_ADDRESS:-0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef} command: ["interface", "receiver", "--mock"] @@ -152,6 +158,13 @@ services: - MQTT_PASSWORD=${MQTT_PASSWORD:-} - MQTT_PREFIX=${MQTT_PREFIX:-meshcore} - MQTT_TLS=${MQTT_TLS:-false} + - MQTT_TRANSPORT=${MQTT_TRANSPORT:-tcp} + - MQTT_WS_PATH=${MQTT_WS_PATH:-/mqtt} + - COLLECTOR_INGEST_MODE=${COLLECTOR_INGEST_MODE:-native} + - COLLECTOR_LETSMESH_DECODER_ENABLED=${COLLECTOR_LETSMESH_DECODER_ENABLED:-true} + - COLLECTOR_LETSMESH_DECODER_COMMAND=${COLLECTOR_LETSMESH_DECODER_COMMAND:-meshcore-decoder} + - COLLECTOR_LETSMESH_DECODER_KEYS=${COLLECTOR_LETSMESH_DECODER_KEYS:-} + - COLLECTOR_LETSMESH_DECODER_TIMEOUT_SECONDS=${COLLECTOR_LETSMESH_DECODER_TIMEOUT_SECONDS:-2.0} - DATA_HOME=/data - SEED_HOME=/seed # Webhook configuration @@ -210,6 +223,8 @@ services: - MQTT_PASSWORD=${MQTT_PASSWORD:-} - MQTT_PREFIX=${MQTT_PREFIX:-meshcore} - MQTT_TLS=${MQTT_TLS:-false} + - MQTT_TRANSPORT=${MQTT_TRANSPORT:-tcp} + - MQTT_WS_PATH=${MQTT_WS_PATH:-/mqtt} - DATA_HOME=/data - API_HOST=0.0.0.0 - API_PORT=8000 @@ -255,6 +270,7 @@ services: - WEB_PORT=8080 - WEB_THEME=${WEB_THEME:-dark} - WEB_LOCALE=${WEB_LOCALE:-en} + - WEB_DATETIME_LOCALE=${WEB_DATETIME_LOCALE:-en-US} - WEB_ADMIN_ENABLED=${WEB_ADMIN_ENABLED:-false} - NETWORK_NAME=${NETWORK_NAME:-MeshCore Network} - NETWORK_CITY=${NETWORK_CITY:-} @@ -267,6 +283,7 @@ services: - NETWORK_WELCOME_TEXT=${NETWORK_WELCOME_TEXT:-} - CONTENT_HOME=/content - TZ=${TZ:-UTC} + - COLLECTOR_LETSMESH_DECODER_KEYS=${COLLECTOR_LETSMESH_DECODER_KEYS:-} # Feature flags (set to false to disable specific pages) - FEATURE_DASHBOARD=${FEATURE_DASHBOARD:-true} - FEATURE_NODES=${FEATURE_NODES:-true} diff --git a/src/meshcore_hub/api/app.py b/src/meshcore_hub/api/app.py index 87c220a..fd9b3ee 100644 --- a/src/meshcore_hub/api/app.py +++ b/src/meshcore_hub/api/app.py @@ -51,8 +51,12 @@ def create_app( admin_key: str | None = None, mqtt_host: str = "localhost", mqtt_port: int = 1883, + mqtt_username: str | None = None, + mqtt_password: str | None = None, mqtt_prefix: str = "meshcore", mqtt_tls: bool = False, + mqtt_transport: str = "tcp", + mqtt_ws_path: str = "/mqtt", cors_origins: list[str] | None = None, metrics_enabled: bool = True, metrics_cache_ttl: int = 60, @@ -65,8 +69,12 @@ def create_app( admin_key: Admin API key mqtt_host: MQTT broker host mqtt_port: MQTT broker port + mqtt_username: MQTT username + mqtt_password: MQTT password mqtt_prefix: MQTT topic prefix mqtt_tls: Enable TLS/SSL for MQTT connection + mqtt_transport: MQTT transport protocol (tcp or websockets) + mqtt_ws_path: WebSocket path (used when transport=websockets) cors_origins: Allowed CORS origins metrics_enabled: Enable Prometheus metrics endpoint at /metrics metrics_cache_ttl: Seconds to cache metrics output @@ -90,8 +98,12 @@ def create_app( app.state.admin_key = admin_key app.state.mqtt_host = mqtt_host app.state.mqtt_port = mqtt_port + app.state.mqtt_username = mqtt_username + app.state.mqtt_password = mqtt_password app.state.mqtt_prefix = mqtt_prefix app.state.mqtt_tls = mqtt_tls + app.state.mqtt_transport = mqtt_transport + app.state.mqtt_ws_path = mqtt_ws_path app.state.metrics_cache_ttl = metrics_cache_ttl # Configure CORS diff --git a/src/meshcore_hub/api/cli.py b/src/meshcore_hub/api/cli.py index 4e42d5a..e50eec4 100644 --- a/src/meshcore_hub/api/cli.py +++ b/src/meshcore_hub/api/cli.py @@ -60,11 +60,25 @@ import click envvar="MQTT_PORT", help="MQTT broker port", ) +@click.option( + "--mqtt-username", + type=str, + default=None, + envvar="MQTT_USERNAME", + help="MQTT username", +) +@click.option( + "--mqtt-password", + type=str, + default=None, + envvar="MQTT_PASSWORD", + help="MQTT password", +) @click.option( "--mqtt-prefix", type=str, default="meshcore", - envvar="MQTT_TOPIC_PREFIX", + envvar=["MQTT_PREFIX", "MQTT_TOPIC_PREFIX"], help="MQTT topic prefix", ) @click.option( @@ -74,6 +88,20 @@ import click envvar="MQTT_TLS", help="Enable TLS/SSL for MQTT connection", ) +@click.option( + "--mqtt-transport", + type=click.Choice(["tcp", "websockets"], case_sensitive=False), + default="tcp", + envvar="MQTT_TRANSPORT", + help="MQTT transport protocol", +) +@click.option( + "--mqtt-ws-path", + type=str, + default="/mqtt", + envvar="MQTT_WS_PATH", + help="MQTT WebSocket path (used when transport=websockets)", +) @click.option( "--cors-origins", type=str, @@ -111,8 +139,12 @@ def api( admin_key: str | None, mqtt_host: str, mqtt_port: int, + mqtt_username: str | None, + mqtt_password: str | None, mqtt_prefix: str, mqtt_tls: bool, + mqtt_transport: str, + mqtt_ws_path: str, cors_origins: str | None, metrics_enabled: bool, metrics_cache_ttl: int, @@ -161,6 +193,7 @@ def api( click.echo(f"Data home: {effective_data_home}") click.echo(f"Database: {effective_db_url}") click.echo(f"MQTT: {mqtt_host}:{mqtt_port} (prefix: {mqtt_prefix})") + click.echo(f"MQTT transport: {mqtt_transport} (ws_path: {mqtt_ws_path})") click.echo(f"Read key configured: {read_key is not None}") click.echo(f"Admin key configured: {admin_key is not None}") click.echo(f"CORS origins: {cors_origins or 'none'}") @@ -195,8 +228,12 @@ def api( admin_key=admin_key, mqtt_host=mqtt_host, mqtt_port=mqtt_port, + mqtt_username=mqtt_username, + mqtt_password=mqtt_password, mqtt_prefix=mqtt_prefix, mqtt_tls=mqtt_tls, + mqtt_transport=mqtt_transport, + mqtt_ws_path=mqtt_ws_path, cors_origins=origins_list, metrics_enabled=metrics_enabled, metrics_cache_ttl=metrics_cache_ttl, diff --git a/src/meshcore_hub/api/dependencies.py b/src/meshcore_hub/api/dependencies.py index 696b5e7..1e4dca0 100644 --- a/src/meshcore_hub/api/dependencies.py +++ b/src/meshcore_hub/api/dependencies.py @@ -56,17 +56,25 @@ def get_mqtt_client(request: Request) -> MQTTClient: """ mqtt_host = getattr(request.app.state, "mqtt_host", "localhost") mqtt_port = getattr(request.app.state, "mqtt_port", 1883) + mqtt_username = getattr(request.app.state, "mqtt_username", None) + mqtt_password = getattr(request.app.state, "mqtt_password", None) mqtt_prefix = getattr(request.app.state, "mqtt_prefix", "meshcore") mqtt_tls = getattr(request.app.state, "mqtt_tls", False) + mqtt_transport = getattr(request.app.state, "mqtt_transport", "tcp") + mqtt_ws_path = getattr(request.app.state, "mqtt_ws_path", "/mqtt") # Use unique client ID to allow multiple API instances unique_id = uuid.uuid4().hex[:8] config = MQTTConfig( host=mqtt_host, port=mqtt_port, + username=mqtt_username, + password=mqtt_password, prefix=mqtt_prefix, client_id=f"meshcore-api-{unique_id}", tls=mqtt_tls, + transport=mqtt_transport, + ws_path=mqtt_ws_path, ) client = MQTTClient(config) diff --git a/src/meshcore_hub/api/routes/nodes.py b/src/meshcore_hub/api/routes/nodes.py index 9969336..9cca78e 100644 --- a/src/meshcore_hub/api/routes/nodes.py +++ b/src/meshcore_hub/api/routes/nodes.py @@ -48,7 +48,44 @@ async def list_nodes( ) if adv_type: - query = query.where(Node.adv_type == adv_type) + normalized_adv_type = adv_type.strip().lower() + if normalized_adv_type == "repeater": + query = query.where( + or_( + Node.adv_type == "repeater", + Node.adv_type.ilike("%repeater%"), + Node.adv_type.ilike("%relay%"), + Node.name.ilike("%repeater%"), + Node.name.ilike("%relay%"), + ) + ) + elif normalized_adv_type == "companion": + query = query.where( + or_( + Node.adv_type == "companion", + Node.adv_type.ilike("%companion%"), + Node.adv_type.ilike("%observer%"), + Node.name.ilike("%companion%"), + Node.name.ilike("%observer%"), + ) + ) + elif normalized_adv_type == "room": + query = query.where( + or_( + Node.adv_type == "room", + Node.adv_type.ilike("%room%"), + Node.name.ilike("%room%"), + ) + ) + elif normalized_adv_type == "chat": + query = query.where( + or_( + Node.adv_type == "chat", + Node.adv_type.ilike("%chat%"), + ) + ) + else: + query = query.where(Node.adv_type == adv_type) if member_id: # Filter nodes that have a member_id tag with the specified value diff --git a/src/meshcore_hub/collector/cli.py b/src/meshcore_hub/collector/cli.py index 6353b92..f86f8c2 100644 --- a/src/meshcore_hub/collector/cli.py +++ b/src/meshcore_hub/collector/cli.py @@ -54,6 +54,31 @@ if TYPE_CHECKING: envvar="MQTT_TLS", help="Enable TLS/SSL for MQTT connection", ) +@click.option( + "--mqtt-transport", + type=click.Choice(["tcp", "websockets"], case_sensitive=False), + default="tcp", + envvar="MQTT_TRANSPORT", + help="MQTT transport protocol", +) +@click.option( + "--mqtt-ws-path", + type=str, + default="/mqtt", + envvar="MQTT_WS_PATH", + help="MQTT WebSocket path (used when transport=websockets)", +) +@click.option( + "--ingest-mode", + "collector_ingest_mode", + type=click.Choice(["native", "letsmesh_upload"], case_sensitive=False), + default="native", + envvar="COLLECTOR_INGEST_MODE", + help=( + "Collector ingest mode: native MeshCore events or LetsMesh upload " + "(packets/status/internal)" + ), +) @click.option( "--data-home", type=str, @@ -90,6 +115,9 @@ def collector( mqtt_password: str | None, prefix: str, mqtt_tls: bool, + mqtt_transport: str, + mqtt_ws_path: str, + collector_ingest_mode: str, data_home: str | None, seed_home: str | None, database_url: str | None, @@ -134,6 +162,9 @@ def collector( ctx.obj["mqtt_password"] = mqtt_password ctx.obj["prefix"] = prefix ctx.obj["mqtt_tls"] = mqtt_tls + ctx.obj["mqtt_transport"] = mqtt_transport + ctx.obj["mqtt_ws_path"] = mqtt_ws_path + ctx.obj["collector_ingest_mode"] = collector_ingest_mode ctx.obj["data_home"] = data_home or settings.data_home ctx.obj["seed_home"] = settings.effective_seed_home ctx.obj["database_url"] = effective_db_url @@ -149,6 +180,9 @@ def collector( mqtt_password=mqtt_password, prefix=prefix, mqtt_tls=mqtt_tls, + mqtt_transport=mqtt_transport, + mqtt_ws_path=mqtt_ws_path, + ingest_mode=collector_ingest_mode, database_url=effective_db_url, log_level=log_level, data_home=data_home or settings.data_home, @@ -163,6 +197,9 @@ def _run_collector_service( mqtt_password: str | None, prefix: str, mqtt_tls: bool, + mqtt_transport: str, + mqtt_ws_path: str, + ingest_mode: str, database_url: str, log_level: str, data_home: str, @@ -191,6 +228,8 @@ def _run_collector_service( click.echo(f"Data home: {data_home}") click.echo(f"Seed home: {seed_home}") click.echo(f"MQTT: {mqtt_host}:{mqtt_port} (prefix: {prefix})") + click.echo(f"MQTT transport: {mqtt_transport} (ws_path: {mqtt_ws_path})") + click.echo(f"Ingest mode: {ingest_mode}") click.echo(f"Database: {database_url}") # Load webhook configuration from settings @@ -198,6 +237,7 @@ def _run_collector_service( WebhookDispatcher, create_webhooks_from_settings, ) + from meshcore_hub.collector.letsmesh_decoder import LetsMeshPacketDecoder from meshcore_hub.common.config import get_collector_settings settings = get_collector_settings() @@ -234,6 +274,24 @@ def _run_collector_service( if settings.data_retention_enabled or settings.node_cleanup_enabled: click.echo(f" Interval: {settings.data_retention_interval_hours} hours") + if ingest_mode.lower() == "letsmesh_upload": + click.echo("") + click.echo("LetsMesh decode configuration:") + if settings.collector_letsmesh_decoder_enabled: + builtin_keys = len(LetsMeshPacketDecoder.BUILTIN_CHANNEL_KEYS) + env_keys = len(settings.collector_letsmesh_decoder_keys_list) + click.echo( + " Decoder: Enabled " f"({settings.collector_letsmesh_decoder_command})" + ) + click.echo(f" Built-in keys: {builtin_keys}") + click.echo(" Additional keys from .env: " f"{env_keys} configured") + click.echo( + " Timeout: " + f"{settings.collector_letsmesh_decoder_timeout_seconds:.2f}s" + ) + else: + click.echo(" Decoder: Disabled") + click.echo("") click.echo("Starting MQTT subscriber...") run_collector( @@ -243,6 +301,9 @@ def _run_collector_service( mqtt_password=mqtt_password, mqtt_prefix=prefix, mqtt_tls=mqtt_tls, + mqtt_transport=mqtt_transport, + mqtt_ws_path=mqtt_ws_path, + ingest_mode=ingest_mode, database_url=database_url, webhook_dispatcher=webhook_dispatcher, cleanup_enabled=settings.data_retention_enabled, @@ -250,6 +311,12 @@ def _run_collector_service( cleanup_interval_hours=settings.data_retention_interval_hours, node_cleanup_enabled=settings.node_cleanup_enabled, node_cleanup_days=settings.node_cleanup_days, + letsmesh_decoder_enabled=settings.collector_letsmesh_decoder_enabled, + letsmesh_decoder_command=settings.collector_letsmesh_decoder_command, + letsmesh_decoder_channel_keys=settings.collector_letsmesh_decoder_keys_list, + letsmesh_decoder_timeout_seconds=( + settings.collector_letsmesh_decoder_timeout_seconds + ), ) @@ -267,6 +334,9 @@ def run_cmd(ctx: click.Context) -> None: mqtt_password=ctx.obj["mqtt_password"], prefix=ctx.obj["prefix"], mqtt_tls=ctx.obj["mqtt_tls"], + mqtt_transport=ctx.obj["mqtt_transport"], + mqtt_ws_path=ctx.obj["mqtt_ws_path"], + ingest_mode=ctx.obj["collector_ingest_mode"], database_url=ctx.obj["database_url"], log_level=ctx.obj["log_level"], data_home=ctx.obj["data_home"], diff --git a/src/meshcore_hub/collector/handlers/advertisement.py b/src/meshcore_hub/collector/handlers/advertisement.py index bdf8adb..ef4ec28 100644 --- a/src/meshcore_hub/collector/handlers/advertisement.py +++ b/src/meshcore_hub/collector/handlers/advertisement.py @@ -14,6 +14,20 @@ from meshcore_hub.common.models import Advertisement, Node, add_event_receiver logger = logging.getLogger(__name__) +def _coerce_float(value: Any) -> float | None: + """Convert int/float/string values to float when possible.""" + if value is None: + return None + if isinstance(value, (int, float)): + return float(value) + if isinstance(value, str): + try: + return float(value.strip()) + except ValueError: + return None + return None + + def handle_advertisement( public_key: str, event_type: str, @@ -40,6 +54,22 @@ def handle_advertisement( name = payload.get("name") adv_type = payload.get("adv_type") flags = payload.get("flags") + lat = payload.get("lat") + lon = payload.get("lon") + + if lat is None: + lat = payload.get("adv_lat") + if lon is None: + lon = payload.get("adv_lon") + + location = payload.get("location") + if isinstance(location, dict): + if lat is None: + lat = location.get("latitude") + if lon is None: + lon = location.get("longitude") + lat = _coerce_float(lat) + lon = _coerce_float(lon) now = datetime.now(timezone.utc) # Compute event hash for deduplication (30-second time bucket) @@ -79,6 +109,10 @@ def handle_advertisement( node_query = select(Node).where(Node.public_key == adv_public_key) node = session.execute(node_query).scalar_one_or_none() if node: + if lat is not None: + node.lat = lat + if lon is not None: + node.lon = lon node.last_seen = now # Add this receiver to the junction table @@ -110,6 +144,10 @@ def handle_advertisement( node.adv_type = adv_type if flags is not None: node.flags = flags + if lat is not None: + node.lat = lat + if lon is not None: + node.lon = lon node.last_seen = now else: # Create new node @@ -120,6 +158,8 @@ def handle_advertisement( flags=flags, first_seen=now, last_seen=now, + lat=lat, + lon=lon, ) session.add(node) session.flush() diff --git a/src/meshcore_hub/collector/handlers/message.py b/src/meshcore_hub/collector/handlers/message.py index 95105df..0938599 100644 --- a/src/meshcore_hub/collector/handlers/message.py +++ b/src/meshcore_hub/collector/handlers/message.py @@ -70,7 +70,7 @@ def _handle_message( now = datetime.now(timezone.utc) # Extract fields based on message type - pubkey_prefix = payload.get("pubkey_prefix") if message_type == "contact" else None + pubkey_prefix = payload.get("pubkey_prefix") channel_idx = payload.get("channel_idx") if message_type == "channel" else None path_len = payload.get("path_len") txt_type = payload.get("txt_type") diff --git a/src/meshcore_hub/collector/letsmesh_decoder.py b/src/meshcore_hub/collector/letsmesh_decoder.py new file mode 100644 index 0000000..9310d56 --- /dev/null +++ b/src/meshcore_hub/collector/letsmesh_decoder.py @@ -0,0 +1,272 @@ +"""LetsMesh packet decoder integration. + +Provides an optional bridge to the external `meshcore-decoder` CLI so the +collector can turn LetsMesh upload `raw` packet hex into decoded message data. +""" + +from __future__ import annotations + +import hashlib +import json +import logging +import shlex +import shutil +import string +import subprocess +from typing import Any, NamedTuple + +logger = logging.getLogger(__name__) + + +class LetsMeshPacketDecoder: + """Decode LetsMesh packet payloads with `meshcore-decoder` CLI.""" + + class ChannelKey(NamedTuple): + """Channel key metadata for decryption and channel labeling.""" + + label: str | None + key_hex: str + channel_hash: str + + # Built-in keys required by your deployment. + # - Public channel + # - #test channel + BUILTIN_CHANNEL_KEYS: tuple[tuple[str, str], ...] = ( + ("Public", "8B3387E9C5CDEA6AC9E5EDBAA115CD72"), + ("test", "9CD8FCF22A47333B591D96A2B848B73F"), + ) + + def __init__( + self, + enabled: bool = True, + command: str = "meshcore-decoder", + channel_keys: list[str] | None = None, + timeout_seconds: float = 2.0, + ) -> None: + self._enabled = enabled + self._command_tokens = shlex.split(command.strip()) if command.strip() else [] + self._channel_key_infos = self._normalize_channel_keys(channel_keys or []) + self._channel_keys = [info.key_hex for info in self._channel_key_infos] + self._channel_names_by_hash = { + info.channel_hash: info.label + for info in self._channel_key_infos + if info.label + } + self._decode_cache: dict[str, dict[str, Any] | None] = {} + self._decode_cache_maxsize = 2048 + self._timeout_seconds = timeout_seconds + self._checked_command = False + self._command_available = False + self._warned_unavailable = False + + @classmethod + def _normalize_channel_keys(cls, values: list[str]) -> list[ChannelKey]: + """Normalize key list (labels + key + channel hash, deduplicated).""" + normalized: list[LetsMeshPacketDecoder.ChannelKey] = [] + seen_keys: set[str] = set() + + for label, key in cls.BUILTIN_CHANNEL_KEYS: + entry = cls._normalize_channel_entry(f"{label}={key}") + if not entry: + continue + if entry.key_hex in seen_keys: + continue + normalized.append(entry) + seen_keys.add(entry.key_hex) + + for value in values: + entry = cls._normalize_channel_entry(value) + if not entry: + continue + if entry.key_hex in seen_keys: + continue + normalized.append(entry) + seen_keys.add(entry.key_hex) + + return normalized + + @classmethod + def _normalize_channel_entry(cls, value: str | None) -> ChannelKey | None: + """Normalize one key entry (`label=hex`, `label:hex`, or `hex`).""" + if value is None: + return None + + candidate = value.strip() + if not candidate: + return None + + label: str | None = None + key_candidate = candidate + for separator in ("=", ":"): + if separator not in candidate: + continue + left, right = candidate.split(separator, 1) + right = right.strip() + right = right.removeprefix("0x").removeprefix("0X").strip() + if right and cls._is_hex(right): + label = left.strip().lstrip("#") + key_candidate = right + break + + key_candidate = key_candidate.strip() + key_candidate = key_candidate.removeprefix("0x").removeprefix("0X").strip() + if not key_candidate or not cls._is_hex(key_candidate): + return None + + key_hex = key_candidate.upper() + channel_hash = cls._compute_channel_hash(key_hex) + normalized_label = label.strip() if label and label.strip() else None + return cls.ChannelKey( + label=normalized_label, + key_hex=key_hex, + channel_hash=channel_hash, + ) + + @staticmethod + def _is_hex(value: str) -> bool: + """Return True if string contains only hex digits.""" + return bool(value) and all(char in string.hexdigits for char in value) + + @staticmethod + def _compute_channel_hash(key_hex: str) -> str: + """Compute channel hash (first byte of SHA-256 of channel key).""" + return hashlib.sha256(bytes.fromhex(key_hex)).digest()[:1].hex().upper() + + def channel_name_from_decoded( + self, + decoded_packet: dict[str, Any] | None, + ) -> str | None: + """Resolve channel label from decoded payload channel hash.""" + if not isinstance(decoded_packet, dict): + return None + + payload = decoded_packet.get("payload") + if not isinstance(payload, dict): + return None + + decoded = payload.get("decoded") + if not isinstance(decoded, dict): + return None + + channel_hash = decoded.get("channelHash") + if not isinstance(channel_hash, str): + return None + + return self._channel_names_by_hash.get(channel_hash.upper()) + + def channel_labels_by_index(self) -> dict[int, str]: + """Return channel labels keyed by numeric channel index (0-255).""" + labels: dict[int, str] = {} + for info in self._channel_key_infos: + if not info.label: + continue + + label = info.label.strip() + if not label: + continue + + if label.lower() == "public": + normalized_label = "Public" + else: + normalized_label = label if label.startswith("#") else f"#{label}" + + channel_idx = int(info.channel_hash, 16) + labels.setdefault(channel_idx, normalized_label) + + return labels + + def decode_payload(self, payload: dict[str, Any]) -> dict[str, Any] | None: + """Decode packet payload `raw` hex and return decoded JSON if available.""" + raw_hex = payload.get("raw") + if not isinstance(raw_hex, str): + return None + clean_hex = raw_hex.strip() + if not clean_hex: + return None + cached = self._decode_cache.get(clean_hex) + if clean_hex in self._decode_cache: + return cached + + decoded = self._decode_raw(clean_hex) + self._decode_cache[clean_hex] = decoded + if len(self._decode_cache) > self._decode_cache_maxsize: + # Drop oldest cached payload (insertion-order dict). + self._decode_cache.pop(next(iter(self._decode_cache))) + return decoded + + def _decode_raw(self, raw_hex: str) -> dict[str, Any] | None: + """Decode raw packet hex with decoder CLI (cached per packet hex).""" + if not self._enabled: + return None + if not self._is_command_available(): + return None + + command = [*self._command_tokens, "decode", raw_hex, "--json"] + if self._channel_keys: + command.append("--key") + command.extend(self._channel_keys) + + try: + result = subprocess.run( + command, + check=False, + capture_output=True, + text=True, + timeout=self._timeout_seconds, + ) + except subprocess.TimeoutExpired: + logger.debug( + "LetsMesh decoder timed out after %.2fs", + self._timeout_seconds, + ) + return None + except OSError as exc: + logger.debug("LetsMesh decoder failed to execute: %s", exc) + return None + + if result.returncode != 0: + stderr = result.stderr.strip() if result.stderr else "" + logger.debug( + "LetsMesh decoder exited with code %s%s", + result.returncode, + f": {stderr}" if stderr else "", + ) + return None + + output = result.stdout.strip() + if not output: + return None + + try: + decoded = json.loads(output) + except json.JSONDecodeError: + logger.debug("LetsMesh decoder returned non-JSON output") + return None + + return decoded if isinstance(decoded, dict) else None + + def _is_command_available(self) -> bool: + """Check decoder command availability once.""" + if self._checked_command: + return self._command_available + + self._checked_command = True + if not self._command_tokens: + self._command_available = False + else: + command = self._command_tokens[0] + if "/" in command: + self._command_available = shutil.which(command) is not None + else: + self._command_available = shutil.which(command) is not None + + if not self._command_available and not self._warned_unavailable: + self._warned_unavailable = True + command_text = " ".join(self._command_tokens) or "" + logger.warning( + "LetsMesh decoder command not found (%s). " + "Messages will remain encrypted placeholders until decoder is installed.", + command_text, + ) + + return self._command_available diff --git a/src/meshcore_hub/collector/subscriber.py b/src/meshcore_hub/collector/subscriber.py index 0fed551..9b72582 100644 --- a/src/meshcore_hub/collector/subscriber.py +++ b/src/meshcore_hub/collector/subscriber.py @@ -21,6 +21,7 @@ from typing import Any, Callable, Optional, TYPE_CHECKING from meshcore_hub.common.database import DatabaseManager from meshcore_hub.common.health import HealthReporter from meshcore_hub.common.mqtt import MQTTClient, MQTTConfig +from meshcore_hub.collector.letsmesh_decoder import LetsMeshPacketDecoder if TYPE_CHECKING: from meshcore_hub.collector.webhook import WebhookDispatcher @@ -35,6 +36,9 @@ EventHandler = Callable[[str, str, dict[str, Any], DatabaseManager], None] class Subscriber: """MQTT Subscriber for collecting and storing MeshCore events.""" + INGEST_MODE_NATIVE = "native" + INGEST_MODE_LETSMESH_UPLOAD = "letsmesh_upload" + def __init__( self, mqtt_client: MQTTClient, @@ -45,6 +49,11 @@ class Subscriber: cleanup_interval_hours: int = 24, node_cleanup_enabled: bool = False, node_cleanup_days: int = 90, + ingest_mode: str = INGEST_MODE_NATIVE, + letsmesh_decoder_enabled: bool = True, + letsmesh_decoder_command: str = "meshcore-decoder", + letsmesh_decoder_channel_keys: list[str] | None = None, + letsmesh_decoder_timeout_seconds: float = 2.0, ): """Initialize subscriber. @@ -57,6 +66,11 @@ class Subscriber: cleanup_interval_hours: Hours between cleanup runs node_cleanup_enabled: Enable automatic cleanup of inactive nodes node_cleanup_days: Remove nodes not seen for this many days + ingest_mode: Ingest mode ('native' or 'letsmesh_upload') + letsmesh_decoder_enabled: Enable external LetsMesh packet decoder + letsmesh_decoder_command: Decoder CLI command + letsmesh_decoder_channel_keys: Optional channel keys for decrypting group text + letsmesh_decoder_timeout_seconds: Decoder CLI timeout """ self.mqtt = mqtt_client self.db = db_manager @@ -79,6 +93,18 @@ class Subscriber: self._node_cleanup_days = node_cleanup_days self._cleanup_thread: Optional[threading.Thread] = None self._last_cleanup: Optional[datetime] = None + self._ingest_mode = ingest_mode.lower() + if self._ingest_mode not in { + self.INGEST_MODE_NATIVE, + self.INGEST_MODE_LETSMESH_UPLOAD, + }: + raise ValueError(f"Unsupported collector ingest mode: {ingest_mode}") + self._letsmesh_decoder = LetsMeshPacketDecoder( + enabled=letsmesh_decoder_enabled, + command=letsmesh_decoder_command, + channel_keys=letsmesh_decoder_channel_keys, + timeout_seconds=letsmesh_decoder_timeout_seconds, + ) @property def is_healthy(self) -> bool: @@ -125,14 +151,702 @@ class Subscriber: pattern: Subscription pattern payload: Message payload """ - # Parse event from topic - parsed = self.mqtt.topic_builder.parse_event_topic(topic) + parsed: tuple[str, str, dict[str, Any]] | None + if self._ingest_mode == self.INGEST_MODE_LETSMESH_UPLOAD: + parsed = self._normalize_letsmesh_event(topic, payload) + else: + parsed_event = self.mqtt.topic_builder.parse_event_topic(topic) + parsed = ( + (parsed_event[0], parsed_event[1], payload) if parsed_event else None + ) + if not parsed: - logger.warning(f"Could not parse event topic: {topic}") + logger.warning( + "Could not parse topic for ingest mode %s: %s", + self._ingest_mode, + topic, + ) return - public_key, event_type = parsed - logger.debug(f"Received event: {event_type} from {public_key[:12]}...") + public_key, event_type, normalized_payload = parsed + logger.debug("Received event: %s from %s...", event_type, public_key[:12]) + self._dispatch_event(public_key, event_type, normalized_payload) + + def _normalize_letsmesh_event( + self, + topic: str, + payload: dict[str, Any], + ) -> tuple[str, str, dict[str, Any]] | None: + """Normalize LetsMesh upload topics to collector event handlers.""" + parsed = self.mqtt.topic_builder.parse_letsmesh_upload_topic(topic) + if not parsed: + return None + + observer_public_key, feed_type = parsed + + if feed_type == "status": + status_public_key = ( + payload.get("origin_id") + or payload.get("public_key") + or observer_public_key + ) + normalized_payload = dict(payload) + normalized_payload["public_key"] = status_public_key + + status_name = payload.get("origin") or payload.get("name") + if status_name and not normalized_payload.get("name"): + normalized_payload["name"] = status_name + + normalized_adv_type = self._normalize_letsmesh_adv_type(normalized_payload) + if normalized_adv_type: + normalized_payload["adv_type"] = normalized_adv_type + else: + normalized_payload.pop("adv_type", None) + + stats = payload.get("stats") + if ( + isinstance(stats, dict) + and "flags" not in normalized_payload + and "debug_flags" in stats + ): + normalized_payload["flags"] = stats["debug_flags"] + + return observer_public_key, "advertisement", normalized_payload + + if feed_type == "packets": + decoded_packet = self._letsmesh_decoder.decode_payload(payload) + + normalized_message = self._build_letsmesh_message_payload( + payload, + decoded_packet=decoded_packet, + ) + if normalized_message: + event_type, message_payload = normalized_message + return observer_public_key, event_type, message_payload + + normalized_advertisement = self._build_letsmesh_advertisement_payload( + payload, + decoded_packet=decoded_packet, + ) + if normalized_advertisement: + return observer_public_key, "advertisement", normalized_advertisement + + normalized_packet_payload = dict(payload) + if decoded_packet: + normalized_packet_payload["decoded_packet"] = decoded_packet + decoded_payload_type = self._extract_letsmesh_decoder_payload_type( + decoded_packet + ) + if decoded_payload_type is not None: + normalized_packet_payload["decoded_payload_type"] = ( + decoded_payload_type + ) + return observer_public_key, "letsmesh_packet", normalized_packet_payload + + if feed_type == "internal": + return observer_public_key, "letsmesh_internal", payload + + return None + + def _build_letsmesh_message_payload( + self, + payload: dict[str, Any], + decoded_packet: dict[str, Any] | None = None, + ) -> tuple[str, dict[str, Any]] | None: + """Build a message payload from LetsMesh packet data when possible.""" + packet_type = self._resolve_letsmesh_packet_type(payload, decoded_packet) + if packet_type == 5: + event_type = "channel_msg_recv" + elif packet_type in {1, 2, 7}: + event_type = "contact_msg_recv" + else: + return None + + normalized_payload = dict(payload) + packet_hash = payload.get("hash") + packet_hash_text = packet_hash if isinstance(packet_hash, str) else None + if decoded_packet is None: + decoded_packet = self._letsmesh_decoder.decode_payload(payload) + + # In LetsMesh compatibility mode, only show messages that decrypt. + text = self._extract_letsmesh_decoder_text(decoded_packet) + if not text: + logger.debug( + "Skipping LetsMesh packet %s (type=%s): no decryptable text payload", + packet_hash_text or "unknown", + packet_type, + ) + return None + + txt_type = self._parse_int(payload.get("txt_type")) + if txt_type is None: + txt_type = self._extract_letsmesh_decoder_txt_type(decoded_packet) + normalized_payload["txt_type"] = ( + txt_type if txt_type is not None else packet_type + ) + normalized_payload["signature"] = payload.get("signature") or packet_hash + path_len = self._parse_path_length(payload.get("path")) + if path_len is None: + path_len = self._extract_letsmesh_decoder_path_length(decoded_packet) + normalized_payload["path_len"] = path_len + + sender_timestamp = self._parse_sender_timestamp(payload) + if sender_timestamp is None: + sender_timestamp = self._extract_letsmesh_decoder_sender_timestamp( + decoded_packet + ) + if sender_timestamp is not None: + normalized_payload["sender_timestamp"] = sender_timestamp + + snr = self._parse_float(payload.get("SNR")) + if snr is None: + snr = self._parse_float(payload.get("snr")) + if snr is not None: + normalized_payload["SNR"] = snr + + decoded_sender = self._extract_letsmesh_decoder_sender( + decoded_packet, + packet_type=packet_type, + ) + sender_name = self._normalize_sender_name(decoded_sender) + if sender_name: + normalized_payload["sender_name"] = sender_name + + if decoded_sender and not normalized_payload.get("pubkey_prefix"): + normalized_prefix = self._normalize_pubkey_prefix(decoded_sender) + if normalized_prefix: + normalized_payload["pubkey_prefix"] = normalized_prefix + + if not normalized_payload.get("pubkey_prefix"): + fallback_sender = self._extract_letsmesh_sender_from_payload(payload) + if fallback_sender: + normalized_payload["pubkey_prefix"] = fallback_sender + + sender_prefix = self._normalize_pubkey_prefix( + normalized_payload.get("pubkey_prefix") + ) + if sender_prefix: + normalized_payload["pubkey_prefix"] = sender_prefix + else: + normalized_payload.pop("pubkey_prefix", None) + + channel_idx = self._parse_int(payload.get("channel_idx")) + channel_hash = self._extract_letsmesh_decoder_channel_hash(decoded_packet) + if channel_idx is None and channel_hash: + channel_idx = self._parse_channel_hash_idx(channel_hash) + if channel_idx is not None: + normalized_payload["channel_idx"] = channel_idx + + if event_type == "channel_msg_recv": + channel_name = self._letsmesh_decoder.channel_name_from_decoded( + decoded_packet + ) + channel_label = self._format_channel_label( + channel_name=channel_name, + channel_hash=channel_hash, + channel_idx=channel_idx, + ) + if channel_label: + normalized_payload["channel_name"] = channel_label + normalized_payload["text"] = self._prefix_sender_name( + text, + normalized_payload.get("sender_name"), + ) + else: + normalized_payload["text"] = self._prefix_sender_name( + text, + normalized_payload.get("sender_name"), + ) + + return event_type, normalized_payload + + def _build_letsmesh_advertisement_payload( + self, + payload: dict[str, Any], + decoded_packet: dict[str, Any] | None = None, + ) -> dict[str, Any] | None: + """Map decoded LetsMesh packet payloads to advertisement events.""" + if decoded_packet is None: + decoded_packet = self._letsmesh_decoder.decode_payload(payload) + if not isinstance(decoded_packet, dict): + return None + + decoded_payload_type = self._extract_letsmesh_decoder_payload_type( + decoded_packet + ) + # Primary packet forms that carry node identity/role/location metadata. + if decoded_payload_type not in {4, 11}: + return None + + decoded_payload = self._extract_letsmesh_decoder_payload(decoded_packet) + if not decoded_payload: + return None + + public_key = self._normalize_full_public_key( + decoded_payload.get("publicKey") + or payload.get("public_key") + or payload.get("origin_id") + ) + if not public_key: + return None + + normalized_payload: dict[str, Any] = { + "public_key": public_key, + } + + app_data = decoded_payload.get("appData") + if isinstance(app_data, dict): + name = app_data.get("name") + if isinstance(name, str) and name.strip(): + normalized_payload["name"] = name.strip() + + flags = self._parse_int(app_data.get("flags")) + if flags is not None: + normalized_payload["flags"] = flags + + device_role = app_data.get("deviceRole") + role_name = self._normalize_letsmesh_node_type(device_role) + if role_name: + normalized_payload["adv_type"] = role_name + + location = app_data.get("location") + if isinstance(location, dict): + lat = self._parse_float(location.get("latitude")) + lon = self._parse_float(location.get("longitude")) + if lat is not None: + normalized_payload["lat"] = lat + if lon is not None: + normalized_payload["lon"] = lon + + if "name" not in normalized_payload: + status_name = payload.get("origin") or payload.get("name") + if isinstance(status_name, str) and status_name.strip(): + normalized_payload["name"] = status_name.strip() + + if "flags" not in normalized_payload: + raw_flags = self._parse_int(decoded_payload.get("rawFlags")) + if raw_flags is not None: + normalized_payload["flags"] = raw_flags + + if "adv_type" not in normalized_payload: + node_type = self._normalize_letsmesh_node_type( + decoded_payload.get("nodeType") + ) + node_type_name = self._normalize_letsmesh_node_type( + decoded_payload.get("nodeTypeName") + ) + normalized_adv_type = ( + node_type + or node_type_name + or self._normalize_letsmesh_adv_type(normalized_payload) + ) + if normalized_adv_type: + normalized_payload["adv_type"] = normalized_adv_type + + return normalized_payload + + @classmethod + def _extract_letsmesh_text( + cls, + payload: dict[str, Any], + depth: int = 3, + ) -> str | None: + """Extract text from possible LetsMesh packet payload fields.""" + if depth < 0: + return None + + for key in ("text", "message", "msg", "body", "content"): + value = payload.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + + for nested in payload.values(): + if not isinstance(nested, dict): + continue + text = cls._extract_letsmesh_text(nested, depth=depth - 1) + if text: + return text + + return None + + @classmethod + def _extract_letsmesh_decoder_text( + cls, + decoded_packet: dict[str, Any] | None, + ) -> str | None: + """Extract human-readable text from decoder JSON output.""" + if not isinstance(decoded_packet, dict): + return None + payload = decoded_packet.get("payload") + if not isinstance(payload, dict): + return None + return cls._extract_letsmesh_text(payload) + + @classmethod + def _extract_letsmesh_decoder_sender_timestamp( + cls, + decoded_packet: dict[str, Any] | None, + ) -> int | None: + """Extract sender timestamp from decoder JSON output.""" + if not isinstance(decoded_packet, dict): + return None + payload = decoded_packet.get("payload") + if not isinstance(payload, dict): + return None + decoded = payload.get("decoded") + if not isinstance(decoded, dict): + return None + decrypted = decoded.get("decrypted") + if not isinstance(decrypted, dict): + return None + return cls._parse_int(decrypted.get("timestamp")) + + @classmethod + def _extract_letsmesh_decoder_sender( + cls, + decoded_packet: dict[str, Any] | None, + packet_type: int | None = None, + ) -> str | None: + """Extract sender identifier from decoder JSON output.""" + if not isinstance(decoded_packet, dict): + return None + payload = decoded_packet.get("payload") + if not isinstance(payload, dict): + return None + decoded = payload.get("decoded") + if not isinstance(decoded, dict): + return None + decrypted = decoded.get("decrypted") + if not isinstance(decrypted, dict): + return None + sender = decrypted.get("sender") + if isinstance(sender, str) and sender.strip(): + return sender.strip() + + source_hash = decoded.get("sourceHash") + if isinstance(source_hash, str) and source_hash.strip(): + return source_hash.strip() + return None + + @staticmethod + def _extract_letsmesh_decoder_payload( + decoded_packet: dict[str, Any] | None, + ) -> dict[str, Any] | None: + """Extract decoded packet payload object.""" + if not isinstance(decoded_packet, dict): + return None + payload = decoded_packet.get("payload") + if not isinstance(payload, dict): + return None + decoded = payload.get("decoded") + return decoded if isinstance(decoded, dict) else None + + @classmethod + def _extract_letsmesh_decoder_payload_type( + cls, + decoded_packet: dict[str, Any] | None, + ) -> int | None: + """Extract payload type from decoder output.""" + if not isinstance(decoded_packet, dict): + return None + payload_type = cls._parse_int(decoded_packet.get("payloadType")) + if payload_type is not None: + return payload_type + decoded = cls._extract_letsmesh_decoder_payload(decoded_packet) + if not decoded: + return None + return cls._parse_int(decoded.get("type")) + + @classmethod + def _resolve_letsmesh_packet_type( + cls, + payload: dict[str, Any], + decoded_packet: dict[str, Any] | None = None, + ) -> int | None: + """Resolve packet type from source payload with decoder fallback.""" + packet_type = cls._parse_int(payload.get("packet_type")) + if packet_type is not None: + return packet_type + return cls._extract_letsmesh_decoder_payload_type(decoded_packet) + + @staticmethod + def _extract_letsmesh_sender_from_payload(payload: dict[str, Any]) -> str | None: + """Extract sender-like identifiers from LetsMesh upload payload fields.""" + for key in ( + "pubkey_prefix", + "sourceHash", + "source_hash", + "source", + "sender", + "from", + "src", + ): + value = payload.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + return None + + @classmethod + def _extract_letsmesh_decoder_txt_type( + cls, + decoded_packet: dict[str, Any] | None, + ) -> int | None: + """Extract txt_type equivalent from decoder output.""" + if not isinstance(decoded_packet, dict): + return None + return cls._parse_int(decoded_packet.get("payloadType")) + + @classmethod + def _extract_letsmesh_decoder_path_length( + cls, + decoded_packet: dict[str, Any] | None, + ) -> int | None: + """Extract path length from decoder output.""" + if not isinstance(decoded_packet, dict): + return None + return cls._parse_int(decoded_packet.get("pathLength")) + + @classmethod + def _extract_letsmesh_decoder_channel_hash( + cls, + decoded_packet: dict[str, Any] | None, + ) -> str | None: + """Extract channel hash (1-byte hex) from decoder output.""" + if not isinstance(decoded_packet, dict): + return None + payload = decoded_packet.get("payload") + if not isinstance(payload, dict): + return None + decoded = payload.get("decoded") + if not isinstance(decoded, dict): + return None + channel_hash = decoded.get("channelHash") + if not isinstance(channel_hash, str): + return None + normalized = channel_hash.strip().upper() + if len(normalized) != 2: + return None + if any(ch not in "0123456789ABCDEF" for ch in normalized): + return None + return normalized + + @staticmethod + def _normalize_full_public_key(value: Any) -> str | None: + """Normalize full node public key (64 hex chars).""" + if not isinstance(value, str): + return None + normalized = value.strip().removeprefix("0x").removeprefix("0X").upper() + if len(normalized) != 64: + return None + if any(ch not in "0123456789ABCDEF" for ch in normalized): + return None + return normalized + + @staticmethod + def _normalize_pubkey_prefix(value: Any) -> str | None: + """Normalize sender key/prefix to 12 uppercase hex characters.""" + if not isinstance(value, str): + return None + normalized = value.strip().removeprefix("0x").removeprefix("0X").upper() + if not normalized: + return None + if any(ch not in "0123456789ABCDEF" for ch in normalized): + return None + if len(normalized) < 8: + return None + return normalized[:12] + + @staticmethod + def _parse_channel_hash_idx(channel_hash: str) -> int | None: + """Convert 1-byte channel hash hex string into a stable numeric index.""" + normalized = channel_hash.strip().upper() + if len(normalized) != 2: + return None + if any(ch not in "0123456789ABCDEF" for ch in normalized): + return None + return int(normalized, 16) + + @staticmethod + def _format_channel_label( + channel_name: str | None, + channel_hash: str | None, + channel_idx: int | None, + ) -> str | None: + """Format a display label for channel messages.""" + if channel_name and channel_name.strip(): + cleaned = channel_name.strip() + if cleaned.lower() == "public": + return "Public" + return cleaned if cleaned.startswith("#") else f"#{cleaned}" + if channel_idx is not None: + return f"Ch {channel_idx}" + if channel_hash: + return f"Ch {channel_hash.upper()}" + return None + + @staticmethod + def _prefix_channel_label(text: str, channel_label: str | None) -> str: + """Prefix channel label to message text for LetsMesh channel feeds.""" + if not channel_label: + return text + prefix = f"[{channel_label}] " + if text.startswith(prefix): + return text + return f"{prefix}{text}" + + @classmethod + def _normalize_sender_name(cls, value: Any) -> str | None: + """Normalize human sender names from decoder output.""" + if not isinstance(value, str): + return None + normalized = value.strip() + if not normalized: + return None + if cls._normalize_pubkey_prefix(normalized): + return None + return normalized + + @staticmethod + def _prefix_sender_name(text: str, sender_name: Any) -> str: + """Prefix sender name when available and not already present.""" + if not isinstance(sender_name, str): + return text + sender = sender_name.strip() + if not sender: + return text + lower_text = text.lstrip().lower() + prefix = f"{sender}:" + if lower_text.startswith(prefix.lower()): + return text + return f"{sender}: {text}" + + @staticmethod + def _normalize_letsmesh_adv_type(payload: dict[str, Any]) -> str | None: + """Map LetsMesh status fields to canonical node types.""" + candidates: list[str] = [] + for key in ("adv_type", "type", "node_type", "role", "mode", "status"): + value = payload.get(key) + if isinstance(value, str) and value.strip(): + candidates.append(value.strip().lower()) + + for key in ("origin", "name", "model"): + value = payload.get(key) + if isinstance(value, str) and value.strip(): + candidates.append(value.strip().lower()) + + if not candidates: + return None + + normalized = " ".join(candidates) + if any(token in normalized for token in ("room server", "roomserver", "room")): + return "room" + if any(token in normalized for token in ("repeater", "relay")): + return "repeater" + if any(token in normalized for token in ("companion", "observer")): + return "companion" + if "chat" in normalized: + return "chat" + + # Preserve existing canonical values when they are already set. + for candidate in candidates: + if candidate in {"chat", "repeater", "room", "companion"}: + return candidate + + return None + + @classmethod + def _normalize_letsmesh_node_type(cls, value: Any) -> str | None: + """Normalize LetsMesh node-type values to canonical adv_type values.""" + if value is None: + return None + + if isinstance(value, (int, float)): + numeric = int(value) + if numeric == 0: + return None + if numeric == 1: + return "chat" + if numeric == 2: + return "repeater" + if numeric == 3: + return "room" + if numeric == 4: + return "companion" + return None + + if isinstance(value, str): + normalized = value.strip() + if not normalized: + return None + return cls._normalize_letsmesh_adv_type({"type": normalized}) + + return None + + @staticmethod + def _parse_int(value: Any) -> int | None: + """Parse int-like values safely.""" + if value is None: + return None + if isinstance(value, int): + return value + if isinstance(value, float): + return int(value) + if isinstance(value, str): + try: + return int(value) + except ValueError: + return None + return None + + @staticmethod + def _parse_float(value: Any) -> float | None: + """Parse float-like values safely.""" + if value is None: + return None + if isinstance(value, (int, float)): + return float(value) + if isinstance(value, str): + try: + return float(value) + except ValueError: + return None + return None + + @classmethod + def _parse_path_length(cls, value: Any) -> int | None: + """Parse path length from list or packed hex string.""" + if value is None: + return None + if isinstance(value, list): + return len(value) + if isinstance(value, str): + path = value.strip() + if not path: + return None + return len(path) // 2 if len(path) % 2 == 0 else len(path) + return cls._parse_int(value) + + @staticmethod + def _parse_sender_timestamp(payload: dict[str, Any]) -> int | None: + """Parse sender timestamp from known LetsMesh fields.""" + sender_ts = payload.get("sender_timestamp") + if isinstance(sender_ts, (int, float)): + return int(sender_ts) + if isinstance(sender_ts, str): + try: + return int(float(sender_ts)) + except ValueError: + return None + + return None + + def _dispatch_event( + self, + public_key: str, + event_type: str, + payload: dict[str, Any], + ) -> None: + """Route a normalized event to the appropriate handler.""" # Find and call handler handler = self._handlers.get(event_type) @@ -358,10 +1072,20 @@ class Subscriber: logger.error(f"Failed to connect to MQTT broker: {e}") raise - # Subscribe to all event topics - event_topic = self.mqtt.topic_builder.all_events_topic() - self.mqtt.subscribe(event_topic, self._handle_mqtt_message) - logger.info(f"Subscribed to event topic: {event_topic}") + # Subscribe to topics based on ingest mode + if self._ingest_mode == self.INGEST_MODE_LETSMESH_UPLOAD: + letsmesh_topics = [ + f"{self.mqtt.topic_builder.prefix}/+/packets", + f"{self.mqtt.topic_builder.prefix}/+/status", + f"{self.mqtt.topic_builder.prefix}/+/internal", + ] + for letsmesh_topic in letsmesh_topics: + self.mqtt.subscribe(letsmesh_topic, self._handle_mqtt_message) + logger.info(f"Subscribed to LetsMesh upload topic: {letsmesh_topic}") + else: + event_topic = self.mqtt.topic_builder.all_events_topic() + self.mqtt.subscribe(event_topic, self._handle_mqtt_message) + logger.info(f"Subscribed to event topic: {event_topic}") self._running = True @@ -429,6 +1153,9 @@ def create_subscriber( mqtt_password: Optional[str] = None, mqtt_prefix: str = "meshcore", mqtt_tls: bool = False, + mqtt_transport: str = "tcp", + mqtt_ws_path: str = "/mqtt", + ingest_mode: str = "native", database_url: str = "sqlite:///./meshcore.db", webhook_dispatcher: Optional["WebhookDispatcher"] = None, cleanup_enabled: bool = False, @@ -436,6 +1163,10 @@ def create_subscriber( cleanup_interval_hours: int = 24, node_cleanup_enabled: bool = False, node_cleanup_days: int = 90, + letsmesh_decoder_enabled: bool = True, + letsmesh_decoder_command: str = "meshcore-decoder", + letsmesh_decoder_channel_keys: list[str] | None = None, + letsmesh_decoder_timeout_seconds: float = 2.0, ) -> Subscriber: """Create a configured subscriber instance. @@ -446,6 +1177,9 @@ def create_subscriber( mqtt_password: MQTT password mqtt_prefix: MQTT topic prefix mqtt_tls: Enable TLS/SSL for MQTT connection + mqtt_transport: MQTT transport protocol (tcp or websockets) + mqtt_ws_path: WebSocket path (used when transport=websockets) + ingest_mode: Ingest mode ('native' or 'letsmesh_upload') database_url: Database connection URL webhook_dispatcher: Optional webhook dispatcher for event forwarding cleanup_enabled: Enable automatic event data cleanup @@ -453,6 +1187,10 @@ def create_subscriber( cleanup_interval_hours: Hours between cleanup runs node_cleanup_enabled: Enable automatic cleanup of inactive nodes node_cleanup_days: Remove nodes not seen for this many days + letsmesh_decoder_enabled: Enable external LetsMesh packet decoder + letsmesh_decoder_command: Decoder CLI command + letsmesh_decoder_channel_keys: Optional channel keys for decrypting group text + letsmesh_decoder_timeout_seconds: Decoder CLI timeout Returns: Configured Subscriber instance @@ -467,6 +1205,8 @@ def create_subscriber( prefix=mqtt_prefix, client_id=f"meshcore-collector-{unique_id}", tls=mqtt_tls, + transport=mqtt_transport, + ws_path=mqtt_ws_path, ) mqtt_client = MQTTClient(mqtt_config) @@ -483,6 +1223,11 @@ def create_subscriber( cleanup_interval_hours=cleanup_interval_hours, node_cleanup_enabled=node_cleanup_enabled, node_cleanup_days=node_cleanup_days, + ingest_mode=ingest_mode, + letsmesh_decoder_enabled=letsmesh_decoder_enabled, + letsmesh_decoder_command=letsmesh_decoder_command, + letsmesh_decoder_channel_keys=letsmesh_decoder_channel_keys, + letsmesh_decoder_timeout_seconds=letsmesh_decoder_timeout_seconds, ) # Register handlers @@ -500,6 +1245,9 @@ def run_collector( mqtt_password: Optional[str] = None, mqtt_prefix: str = "meshcore", mqtt_tls: bool = False, + mqtt_transport: str = "tcp", + mqtt_ws_path: str = "/mqtt", + ingest_mode: str = "native", database_url: str = "sqlite:///./meshcore.db", webhook_dispatcher: Optional["WebhookDispatcher"] = None, cleanup_enabled: bool = False, @@ -507,6 +1255,10 @@ def run_collector( cleanup_interval_hours: int = 24, node_cleanup_enabled: bool = False, node_cleanup_days: int = 90, + letsmesh_decoder_enabled: bool = True, + letsmesh_decoder_command: str = "meshcore-decoder", + letsmesh_decoder_channel_keys: list[str] | None = None, + letsmesh_decoder_timeout_seconds: float = 2.0, ) -> None: """Run the collector (blocking). @@ -517,6 +1269,9 @@ def run_collector( mqtt_password: MQTT password mqtt_prefix: MQTT topic prefix mqtt_tls: Enable TLS/SSL for MQTT connection + mqtt_transport: MQTT transport protocol (tcp or websockets) + mqtt_ws_path: WebSocket path (used when transport=websockets) + ingest_mode: Ingest mode ('native' or 'letsmesh_upload') database_url: Database connection URL webhook_dispatcher: Optional webhook dispatcher for event forwarding cleanup_enabled: Enable automatic event data cleanup @@ -524,6 +1279,10 @@ def run_collector( cleanup_interval_hours: Hours between cleanup runs node_cleanup_enabled: Enable automatic cleanup of inactive nodes node_cleanup_days: Remove nodes not seen for this many days + letsmesh_decoder_enabled: Enable external LetsMesh packet decoder + letsmesh_decoder_command: Decoder CLI command + letsmesh_decoder_channel_keys: Optional channel keys for decrypting group text + letsmesh_decoder_timeout_seconds: Decoder CLI timeout """ subscriber = create_subscriber( mqtt_host=mqtt_host, @@ -532,6 +1291,9 @@ def run_collector( mqtt_password=mqtt_password, mqtt_prefix=mqtt_prefix, mqtt_tls=mqtt_tls, + mqtt_transport=mqtt_transport, + mqtt_ws_path=mqtt_ws_path, + ingest_mode=ingest_mode, database_url=database_url, webhook_dispatcher=webhook_dispatcher, cleanup_enabled=cleanup_enabled, @@ -539,6 +1301,10 @@ def run_collector( cleanup_interval_hours=cleanup_interval_hours, node_cleanup_enabled=node_cleanup_enabled, node_cleanup_days=node_cleanup_days, + letsmesh_decoder_enabled=letsmesh_decoder_enabled, + letsmesh_decoder_command=letsmesh_decoder_command, + letsmesh_decoder_channel_keys=letsmesh_decoder_channel_keys, + letsmesh_decoder_timeout_seconds=letsmesh_decoder_timeout_seconds, ) # Set up signal handlers diff --git a/src/meshcore_hub/common/config.py b/src/meshcore_hub/common/config.py index 96e732e..c8ad910 100644 --- a/src/meshcore_hub/common/config.py +++ b/src/meshcore_hub/common/config.py @@ -1,6 +1,7 @@ """Pydantic Settings for MeshCore Hub configuration.""" from enum import Enum +import re from typing import Optional from pydantic import Field, field_validator @@ -24,6 +25,20 @@ class InterfaceMode(str, Enum): SENDER = "SENDER" +class MQTTTransport(str, Enum): + """MQTT transport type.""" + + TCP = "tcp" + WEBSOCKETS = "websockets" + + +class CollectorIngestMode(str, Enum): + """Collector MQTT ingest mode.""" + + NATIVE = "native" + LETSMESH_UPLOAD = "letsmesh_upload" + + class CommonSettings(BaseSettings): """Common settings shared by all components.""" @@ -55,6 +70,14 @@ class CommonSettings(BaseSettings): mqtt_tls: bool = Field( default=False, description="Enable TLS/SSL for MQTT connection" ) + mqtt_transport: MQTTTransport = Field( + default=MQTTTransport.TCP, + description="MQTT transport protocol (tcp or websockets)", + ) + mqtt_ws_path: str = Field( + default="/mqtt", + description="WebSocket path for MQTT transport (used when MQTT_TRANSPORT=websockets)", + ) class InterfaceSettings(CommonSettings): @@ -162,6 +185,42 @@ class CollectorSettings(CommonSettings): description="Remove nodes not seen for this many days (last_seen)", ge=1, ) + collector_ingest_mode: CollectorIngestMode = Field( + default=CollectorIngestMode.NATIVE, + description=( + "Collector MQTT ingest mode. " + "'native' expects //event/. " + "'letsmesh_upload' expects LetsMesh observer uploads on " + "//(packets|status|internal)." + ), + ) + collector_letsmesh_decoder_enabled: bool = Field( + default=True, + description=( + "Enable external LetsMesh packet decoding via meshcore-decoder. " + "Only applies when COLLECTOR_INGEST_MODE=letsmesh_upload." + ), + ) + collector_letsmesh_decoder_command: str = Field( + default="meshcore-decoder", + description=( + "Command used to run LetsMesh packet decoder CLI " + "(for example: meshcore-decoder, /usr/local/bin/meshcore-decoder, " + "or 'npx meshcore-decoder')." + ), + ) + collector_letsmesh_decoder_keys: Optional[str] = Field( + default=None, + description=( + "Optional channel secret keys for LetsMesh message decryption. " + "Provide as comma/space separated hex values." + ), + ) + collector_letsmesh_decoder_timeout_seconds: float = Field( + default=2.0, + description="Timeout in seconds for each decoder invocation.", + ge=0.1, + ) @property def collector_data_dir(self) -> str: @@ -201,6 +260,17 @@ class CollectorSettings(CommonSettings): return str(Path(self.effective_seed_home) / "members.yaml") + @property + def collector_letsmesh_decoder_keys_list(self) -> list[str]: + """Parse configured LetsMesh decoder keys into a normalized list.""" + if not self.collector_letsmesh_decoder_keys: + return [] + return [ + part.strip() + for part in re.split(r"[,\s]+", self.collector_letsmesh_decoder_keys) + if part.strip() + ] + @field_validator("database_url") @classmethod def validate_database_url(cls, v: Optional[str]) -> Optional[str]: @@ -267,6 +337,13 @@ class WebSettings(CommonSettings): default="en", description="Locale/language for the web dashboard (e.g. 'en')", ) + web_datetime_locale: str = Field( + default="en-US", + description=( + "Locale used for date/time formatting in the web dashboard " + "(e.g. 'en-US', 'en-GB')." + ), + ) # Auto-refresh interval for list pages web_auto_refresh_seconds: int = Field( diff --git a/src/meshcore_hub/common/mqtt.py b/src/meshcore_hub/common/mqtt.py index fcc9f90..532df52 100644 --- a/src/meshcore_hub/common/mqtt.py +++ b/src/meshcore_hub/common/mqtt.py @@ -24,6 +24,8 @@ class MQTTConfig: keepalive: int = 60 clean_session: bool = True tls: bool = False + transport: str = "tcp" + ws_path: str = "/mqtt" class TopicBuilder: @@ -37,6 +39,10 @@ class TopicBuilder: """ self.prefix = prefix + def _prefix_parts(self) -> list[str]: + """Split configured prefix into path segments.""" + return [part for part in self.prefix.strip("/").split("/") if part] + def event_topic(self, public_key: str, event_name: str) -> str: """Build an event topic. @@ -86,10 +92,16 @@ class TopicBuilder: Returns: Tuple of (public_key, event_name) or None if invalid """ - parts = topic.split("/") - if len(parts) >= 4 and parts[0] == self.prefix and parts[2] == "event": - public_key = parts[1] - event_name = "/".join(parts[3:]) + parts = [part for part in topic.strip("/").split("/") if part] + prefix_parts = self._prefix_parts() + prefix_len = len(prefix_parts) + if ( + len(parts) >= prefix_len + 3 + and parts[:prefix_len] == prefix_parts + and parts[prefix_len + 1] == "event" + ): + public_key = parts[prefix_len] + event_name = "/".join(parts[prefix_len + 2 :]) return (public_key, event_name) return None @@ -102,13 +114,39 @@ class TopicBuilder: Returns: Tuple of (public_key, command_name) or None if invalid """ - parts = topic.split("/") - if len(parts) >= 4 and parts[0] == self.prefix and parts[2] == "command": - public_key = parts[1] - command_name = "/".join(parts[3:]) + parts = [part for part in topic.strip("/").split("/") if part] + prefix_parts = self._prefix_parts() + prefix_len = len(prefix_parts) + if ( + len(parts) >= prefix_len + 3 + and parts[:prefix_len] == prefix_parts + and parts[prefix_len + 1] == "command" + ): + public_key = parts[prefix_len] + command_name = "/".join(parts[prefix_len + 2 :]) return (public_key, command_name) return None + def parse_letsmesh_upload_topic(self, topic: str) -> tuple[str, str] | None: + """Parse a LetsMesh upload topic to extract public key and feed type. + + LetsMesh upload topics are expected in this form: + //(packets|status|internal) + """ + parts = [part for part in topic.strip("/").split("/") if part] + prefix_parts = self._prefix_parts() + prefix_len = len(prefix_parts) + + if len(parts) != prefix_len + 2 or parts[:prefix_len] != prefix_parts: + return None + + public_key = parts[prefix_len] + feed_type = parts[prefix_len + 1] + if feed_type not in {"packets", "status", "internal"}: + return None + + return (public_key, feed_type) + MessageHandler = Callable[[str, str, dict[str, Any]], None] @@ -124,14 +162,24 @@ class MQTTClient: """ self.config = config self.topic_builder = TopicBuilder(config.prefix) + transport = config.transport.lower() + if transport not in {"tcp", "websockets"}: + raise ValueError(f"Unsupported MQTT transport: {config.transport}") + self._client = mqtt.Client( callback_api_version=CallbackAPIVersion.VERSION2, # type: ignore[call-arg] client_id=config.client_id, clean_session=config.clean_session, + transport=transport, ) self._connected = False self._message_handlers: dict[str, list[MessageHandler]] = {} + # Set WebSocket path when using MQTT over WebSockets. + if transport == "websockets": + self._client.ws_set_options(path=config.ws_path) + logger.debug("MQTT WebSocket transport enabled (path=%s)", config.ws_path) + # Set up TLS if enabled if config.tls: self._client.tls_set() diff --git a/src/meshcore_hub/common/schemas/events.py b/src/meshcore_hub/common/schemas/events.py index dca8806..9a08b91 100644 --- a/src/meshcore_hub/common/schemas/events.py +++ b/src/meshcore_hub/common/schemas/events.py @@ -28,6 +28,14 @@ class AdvertisementEvent(BaseModel): default=None, description="Capability/status flags bitmask", ) + lat: Optional[float] = Field( + default=None, + description="Node latitude when location metadata is available", + ) + lon: Optional[float] = Field( + default=None, + description="Node longitude when location metadata is available", + ) class ContactMessageEvent(BaseModel): diff --git a/src/meshcore_hub/web/app.py b/src/meshcore_hub/web/app.py index bb75ab8..0dd638f 100644 --- a/src/meshcore_hub/web/app.py +++ b/src/meshcore_hub/web/app.py @@ -2,6 +2,8 @@ import json import logging +import os +import re from contextlib import asynccontextmanager from datetime import datetime from pathlib import Path @@ -16,6 +18,7 @@ from fastapi.templating import Jinja2Templates from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware from meshcore_hub import __version__ +from meshcore_hub.collector.letsmesh_decoder import LetsMeshPacketDecoder from meshcore_hub.common.i18n import load_locale, t from meshcore_hub.common.schemas import RadioConfig from meshcore_hub.web.middleware import CacheControlMiddleware @@ -29,6 +32,40 @@ TEMPLATES_DIR = PACKAGE_DIR / "templates" STATIC_DIR = PACKAGE_DIR / "static" +def _parse_decoder_key_entries(raw: str | None) -> list[str]: + """Parse COLLECTOR_LETSMESH_DECODER_KEYS into key entries.""" + if not raw: + return [] + return [part.strip() for part in re.split(r"[,\s]+", raw) if part.strip()] + + +def _build_channel_labels() -> dict[str, str]: + """Build UI channel labels from built-in + configured decoder keys.""" + raw_keys = os.getenv("COLLECTOR_LETSMESH_DECODER_KEYS") + decoder = LetsMeshPacketDecoder( + enabled=False, + channel_keys=_parse_decoder_key_entries(raw_keys), + ) + labels = decoder.channel_labels_by_index() + return {str(idx): label for idx, label in sorted(labels.items())} + + +def _is_authenticated_proxy_request(request: Request) -> bool: + """Check whether request is authenticated by an upstream auth proxy. + + Supported patterns: + - OAuth2/OIDC proxy headers: X-Forwarded-User, X-Auth-Request-User + - Forwarded Basic auth header: Authorization: Basic ... + """ + if request.headers.get("x-forwarded-user"): + return True + if request.headers.get("x-auth-request-user"): + return True + + auth_header = request.headers.get("authorization", "") + return auth_header.lower().startswith("basic ") + + @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: """Application lifespan handler.""" @@ -114,10 +151,12 @@ def _build_config_json(app: FastAPI, request: Request) -> str: "version": __version__, "timezone": app.state.timezone_abbr, "timezone_iana": app.state.timezone, - "is_authenticated": bool(request.headers.get("X-Forwarded-User")), + "is_authenticated": _is_authenticated_proxy_request(request), "default_theme": app.state.web_theme, "locale": app.state.web_locale, + "datetime_locale": app.state.web_datetime_locale, "auto_refresh_seconds": app.state.auto_refresh_seconds, + "channel_labels": app.state.channel_labels, } return json.dumps(config) @@ -183,10 +222,12 @@ def create_app( # Load i18n translations app.state.web_locale = settings.web_locale or "en" + app.state.web_datetime_locale = settings.web_datetime_locale or "en-US" load_locale(app.state.web_locale) # Auto-refresh interval app.state.auto_refresh_seconds = settings.web_auto_refresh_seconds + app.state.channel_labels = _build_channel_labels() # Store configuration in app state (use args if provided, else settings) app.state.web_theme = ( @@ -310,7 +351,7 @@ def create_app( if ( request.method in ("POST", "PUT", "DELETE", "PATCH") and request.app.state.admin_enabled - and not request.headers.get("x-forwarded-user") + and not _is_authenticated_proxy_request(request) ): return JSONResponse( {"detail": "Authentication required"}, diff --git a/src/meshcore_hub/web/static/js/spa/components.js b/src/meshcore_hub/web/static/js/spa/components.js index 841a329..3dbdb8d 100644 --- a/src/meshcore_hub/web/static/js/spa/components.js +++ b/src/meshcore_hub/web/static/js/spa/components.js @@ -22,6 +22,37 @@ export function getConfig() { return window.__APP_CONFIG__ || {}; } +/** + * Parse API datetime strings reliably. + * MeshCore API often returns UTC timestamps without an explicit timezone suffix. + * In that case, treat them as UTC by appending 'Z' before Date parsing. + * + * @param {string|null} isoString + * @returns {Date|null} + */ +export function parseAppDate(isoString) { + if (!isoString || typeof isoString !== 'string') return null; + + let value = isoString.trim(); + if (!value) return null; + + // Normalize "YYYY-MM-DD HH:MM:SS" to ISO separator. + if (/^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}/.test(value)) { + value = value.replace(/\s+/, 'T'); + } + + // If no timezone suffix is present, treat as UTC. + const hasTimePart = /T\d{2}:\d{2}/.test(value); + const hasTimezoneSuffix = /(Z|[+-]\d{2}:\d{2}|[+-]\d{4})$/i.test(value); + if (hasTimePart && !hasTimezoneSuffix) { + value += 'Z'; + } + + const parsed = new Date(value); + if (isNaN(parsed.getTime())) return null; + return parsed; +} + /** * Page color palette - reads from CSS custom properties (defined in app.css :root). * Use for inline styles or dynamic coloring in page modules. @@ -42,10 +73,21 @@ export const pageColors = { * @param {string|null} advType * @returns {string} Emoji character */ +function inferNodeType(value) { + const normalized = (value || '').toLowerCase(); + if (!normalized) return null; + if (normalized.includes('room')) return 'room'; + if (normalized.includes('repeater') || normalized.includes('relay')) return 'repeater'; + if (normalized.includes('companion') || normalized.includes('observer')) return 'companion'; + if (normalized.includes('chat')) return 'chat'; + return null; +} + export function typeEmoji(advType) { - switch ((advType || '').toLowerCase()) { + switch (inferNodeType(advType) || (advType || '').toLowerCase()) { case 'chat': return '\u{1F4AC}'; // 💬 case 'repeater': return '\u{1F4E1}'; // 📡 + case 'companion': return '\u{1F4F1}'; // 📱 case 'room': return '\u{1FAA7}'; // 🪧 default: return '\u{1F4CD}'; // 📍 } @@ -74,7 +116,9 @@ export function extractFirstEmoji(str) { */ export function getNodeEmoji(nodeName, advType) { const nameEmoji = extractFirstEmoji(nodeName); - return nameEmoji || typeEmoji(advType); + if (nameEmoji) return nameEmoji; + const inferred = inferNodeType(advType) || inferNodeType(nodeName); + return typeEmoji(inferred || advType); } /** @@ -88,8 +132,9 @@ export function formatDateTime(isoString, options) { try { const config = getConfig(); const tz = config.timezone_iana || 'UTC'; - const date = new Date(isoString); - if (isNaN(date.getTime())) return '-'; + const locale = config.datetime_locale || 'en-US'; + const date = parseAppDate(isoString); + if (!date) return '-'; const opts = options || { timeZone: tz, year: 'numeric', month: '2-digit', day: '2-digit', @@ -97,7 +142,7 @@ export function formatDateTime(isoString, options) { hour12: false, }; if (!opts.timeZone) opts.timeZone = tz; - return date.toLocaleString('en-GB', opts); + return date.toLocaleString(locale, opts); } catch { return isoString ? isoString.slice(0, 19).replace('T', ' ') : '-'; } @@ -113,9 +158,10 @@ export function formatDateTimeShort(isoString) { try { const config = getConfig(); const tz = config.timezone_iana || 'UTC'; - const date = new Date(isoString); - if (isNaN(date.getTime())) return '-'; - return date.toLocaleString('en-GB', { + const locale = config.datetime_locale || 'en-US'; + const date = parseAppDate(isoString); + if (!date) return '-'; + return date.toLocaleString(locale, { timeZone: tz, year: 'numeric', month: '2-digit', day: '2-digit', hour: '2-digit', minute: '2-digit', @@ -133,8 +179,8 @@ export function formatDateTimeShort(isoString) { */ export function formatRelativeTime(isoString) { if (!isoString) return ''; - const date = new Date(isoString); - if (isNaN(date.getTime())) return ''; + const date = parseAppDate(isoString); + if (!date) return ''; const now = new Date(); const diffMs = now - date; const diffSec = Math.floor(diffMs / 1000); diff --git a/src/meshcore_hub/web/static/js/spa/pages/dashboard.js b/src/meshcore_hub/web/static/js/spa/pages/dashboard.js index c7bb0fd..cad567a 100644 --- a/src/meshcore_hub/web/static/js/spa/pages/dashboard.js +++ b/src/meshcore_hub/web/static/js/spa/pages/dashboard.js @@ -1,44 +1,51 @@ import { apiGet } from '../api.js'; import { html, litRender, nothing, - getConfig, typeEmoji, errorAlert, pageColors, t, + getConfig, typeEmoji, errorAlert, pageColors, t, formatDateTime, } from '../components.js'; import { iconNodes, iconAdvertisements, iconMessages, iconChannel, } from '../icons.js'; -function formatTimeOnly(isoString) { - if (!isoString) return '-'; - try { - const config = getConfig(); - const tz = config.timezone_iana || 'UTC'; - const date = new Date(isoString); - if (isNaN(date.getTime())) return '-'; - return date.toLocaleString('en-GB', { - timeZone: tz, - hour: '2-digit', minute: '2-digit', second: '2-digit', - hour12: false, - }); - } catch { - return '-'; +function knownChannelLabel(channelIdx) { + const config = getConfig(); + const configuredChannelLabels = new Map( + Object.entries(config.channel_labels || {}) + .map(([idx, label]) => [parseInt(idx, 10), typeof label === 'string' ? label.trim() : '']) + .filter(([idx, label]) => Number.isInteger(idx) && label.length > 0), + ); + const builtInChannelLabels = new Map([ + [17, 'Public'], + [217, '#test'], + [202, '#bot'], + [184, '#chat'], + [159, '#jokes'], + [221, '#sports'], + [104, '#emergency'], + ]); + return configuredChannelLabels.get(channelIdx) || builtInChannelLabels.get(channelIdx) || null; +} + +function channelLabel(channel) { + const idx = parseInt(String(channel), 10); + if (Number.isInteger(idx)) { + return knownChannelLabel(idx) || `Ch ${idx}`; } + return String(channel); +} + +function formatTimeOnly(isoString) { + return formatDateTime(isoString, { + hour: '2-digit', minute: '2-digit', second: '2-digit', + hour12: false, + }); } function formatTimeShort(isoString) { - if (!isoString) return '-'; - try { - const config = getConfig(); - const tz = config.timezone_iana || 'UTC'; - const date = new Date(isoString); - if (isNaN(date.getTime())) return '-'; - return date.toLocaleString('en-GB', { - timeZone: tz, - hour: '2-digit', minute: '2-digit', - hour12: false, - }); - } catch { - return '-'; - } + return formatDateTime(isoString, { + hour: '2-digit', minute: '2-digit', + hour12: false, + }); } function renderRecentAds(ads) { @@ -81,6 +88,7 @@ function renderChannelMessages(channelMessages) { if (!channelMessages || Object.keys(channelMessages).length === 0) return nothing; const channels = Object.entries(channelMessages).map(([channel, messages]) => { + const label = channelLabel(channel); const msgLines = messages.map(msg => html`
${formatTimeShort(msg.received_at)} @@ -89,8 +97,7 @@ function renderChannelMessages(channelMessages) { return html`

- CH${String(channel)} - ${t('dashboard.channel', { number: String(channel) })} + ${label}

${msgLines} diff --git a/src/meshcore_hub/web/static/js/spa/pages/messages.js b/src/meshcore_hub/web/static/js/spa/pages/messages.js index 57a95c6..3072078 100644 --- a/src/meshcore_hub/web/static/js/spa/pages/messages.js +++ b/src/meshcore_hub/web/static/js/spa/pages/messages.js @@ -19,6 +19,169 @@ export async function render(container, params, router) { const tz = config.timezone || ''; const tzBadge = tz && tz !== 'UTC' ? html`${tz}` : nothing; const navigate = (url) => router.navigate(url); + const configuredChannelLabels = new Map( + Object.entries(config.channel_labels || {}) + .map(([idx, label]) => [parseInt(idx, 10), typeof label === 'string' ? label.trim() : '']) + .filter(([idx, label]) => Number.isInteger(idx) && label.length > 0), + ); + const builtInChannelLabels = new Map([ + [17, 'Public'], + [217, '#test'], + [202, '#bot'], + [184, '#chat'], + [159, '#jokes'], + [221, '#sports'], + [104, '#emergency'], + ]); + + function knownChannelLabel(channelIdx) { + return configuredChannelLabels.get(channelIdx) || builtInChannelLabels.get(channelIdx) || null; + } + + function channelInfo(msg) { + if (msg.message_type !== 'channel') { + return { label: null, text: msg.text || '-' }; + } + const rawText = msg.text || ''; + const match = rawText.match(/^\[([^\]]+)\]\s+([\s\S]*)$/); + if (msg.channel_idx !== null && msg.channel_idx !== undefined) { + const knownLabel = knownChannelLabel(msg.channel_idx); + if (knownLabel) { + return { + label: knownLabel, + text: match ? (match[2] || '-') : (rawText || '-'), + }; + } + } + if (msg.channel_name) { + return { label: msg.channel_name, text: msg.text || '-' }; + } + if (match) { + return { + label: match[1], + text: match[2] || '-', + }; + } + if (msg.channel_idx !== null && msg.channel_idx !== undefined) { + const knownLabel = knownChannelLabel(msg.channel_idx); + return { label: knownLabel || `Ch ${msg.channel_idx}`, text: rawText || '-' }; + } + return { label: t('messages.type_channel'), text: rawText || '-' }; + } + + function senderBlock(msg, emphasize = false) { + const senderName = msg.sender_tag_name || msg.sender_name; + if (senderName) { + return emphasize + ? html`${senderName}` + : html`${senderName}`; + } + const prefix = (msg.pubkey_prefix || '').slice(0, 12); + if (prefix) { + return html`${prefix}`; + } + return html`-`; + } + + function parseSenderFromText(text) { + if (!text || typeof text !== 'string') { + return { sender: null, text: text || '-' }; + } + const patterns = [ + /^\s*ack\s+@\[(.+?)\]\s*:\s*([\s\S]+)$/i, + /^\s*@\[(.+?)\]\s*:\s*([\s\S]+)$/i, + /^\s*ack\s+([^:|\n]{1,80})\s*:\s*([\s\S]+)$/i, + ]; + for (const pattern of patterns) { + const match = text.match(pattern); + if (!match) continue; + const sender = (match[1] || '').trim(); + const remaining = (match[2] || '').trim(); + if (!sender) continue; + return { + sender, + text: remaining || text, + }; + } + return { sender: null, text }; + } + + function messageTextWithSender(msg, text) { + const parsed = parseSenderFromText(text || '-'); + const explicitSender = msg.sender_tag_name || msg.sender_name || (msg.pubkey_prefix || '').slice(0, 12) || null; + const sender = explicitSender || parsed.sender; + const body = (parsed.text || text || '-').trim() || '-'; + if (!sender) { + return body; + } + if (body.toLowerCase().startsWith(`${sender.toLowerCase()}:`)) { + return body; + } + return `${sender}: ${body}`; + } + + function dedupeBySignature(items) { + const deduped = []; + const bySignature = new Map(); + + for (const msg of items) { + const signature = typeof msg.signature === 'string' ? msg.signature.trim().toUpperCase() : ''; + const canDedupe = msg.message_type === 'channel' && signature.length >= 8; + if (!canDedupe) { + deduped.push(msg); + continue; + } + + const existing = bySignature.get(signature); + if (!existing) { + const clone = { + ...msg, + receivers: [...(msg.receivers || [])], + }; + bySignature.set(signature, clone); + deduped.push(clone); + continue; + } + + const combined = [...(existing.receivers || []), ...(msg.receivers || [])]; + const seenReceivers = new Set(); + existing.receivers = combined.filter((recv) => { + const key = recv?.public_key || recv?.node_id || `${recv?.received_at || ''}:${recv?.snr || ''}`; + if (seenReceivers.has(key)) return false; + seenReceivers.add(key); + return true; + }); + + if (!existing.received_by && msg.received_by) existing.received_by = msg.received_by; + if (!existing.receiver_name && msg.receiver_name) existing.receiver_name = msg.receiver_name; + if (!existing.receiver_tag_name && msg.receiver_tag_name) existing.receiver_tag_name = msg.receiver_tag_name; + if (!existing.pubkey_prefix && msg.pubkey_prefix) existing.pubkey_prefix = msg.pubkey_prefix; + if (!existing.sender_name && msg.sender_name) existing.sender_name = msg.sender_name; + if (!existing.sender_tag_name && msg.sender_tag_name) existing.sender_tag_name = msg.sender_tag_name; + if (!existing.channel_name && msg.channel_name) existing.channel_name = msg.channel_name; + if ( + existing.channel_name === 'Public' + && msg.channel_name + && msg.channel_name !== 'Public' + ) { + existing.channel_name = msg.channel_name; + } + if (existing.channel_idx === null || existing.channel_idx === undefined) { + if (msg.channel_idx !== null && msg.channel_idx !== undefined) { + existing.channel_idx = msg.channel_idx; + } + } else if ( + existing.channel_idx === 17 + && msg.channel_idx !== null + && msg.channel_idx !== undefined + && msg.channel_idx !== 17 + ) { + existing.channel_idx = msg.channel_idx; + } + } + + return deduped; + } function renderPage(content, { total = null } = {}) { litRender(html` @@ -39,7 +202,7 @@ ${content}`, container); async function fetchAndRenderData() { try { const data = await apiGet('/api/v1/messages', { limit, offset, message_type }); - const messages = data.items || []; + const messages = dedupeBySignature(data.items || []); const total = data.total || 0; const totalPages = Math.ceil(total / limit); @@ -49,17 +212,12 @@ ${content}`, container); const isChannel = msg.message_type === 'channel'; const typeIcon = isChannel ? '\u{1F4FB}' : '\u{1F464}'; const typeTitle = isChannel ? t('messages.type_channel') : t('messages.type_contact'); - let senderBlock; - if (isChannel) { - senderBlock = html`${t('messages.type_public')}`; - } else { - const senderName = msg.sender_tag_name || msg.sender_name; - if (senderName) { - senderBlock = senderName; - } else { - senderBlock = html`${(msg.pubkey_prefix || '-').slice(0, 12)}`; - } - } + const chInfo = channelInfo(msg); + const sender = senderBlock(msg); + const displayMessage = messageTextWithSender(msg, chInfo.text); + const fromPrimary = isChannel + ? html`${chInfo.label || t('messages.type_channel')}` + : sender; let receiversBlock = nothing; if (msg.receivers && msg.receivers.length >= 1) { receiversBlock = html`
@@ -81,7 +239,7 @@ ${content}`, container);
- ${senderBlock} + ${fromPrimary}
${formatDateTimeShort(msg.received_at)} @@ -92,7 +250,7 @@ ${content}`, container); ${receiversBlock}
-

${msg.text || '-'}

+

${displayMessage}

`; }); @@ -103,17 +261,12 @@ ${content}`, container); const isChannel = msg.message_type === 'channel'; const typeIcon = isChannel ? '\u{1F4FB}' : '\u{1F464}'; const typeTitle = isChannel ? t('messages.type_channel') : t('messages.type_contact'); - let senderBlock; - if (isChannel) { - senderBlock = html`${t('messages.type_public')}`; - } else { - const senderName = msg.sender_tag_name || msg.sender_name; - if (senderName) { - senderBlock = html`${senderName}`; - } else { - senderBlock = html`${(msg.pubkey_prefix || '-').slice(0, 12)}`; - } - } + const chInfo = channelInfo(msg); + const sender = senderBlock(msg, true); + const displayMessage = messageTextWithSender(msg, chInfo.text); + const fromPrimary = isChannel + ? html`${chInfo.label || t('messages.type_channel')}` + : sender; let receiversBlock; if (msg.receivers && msg.receivers.length >= 1) { receiversBlock = html`
@@ -131,8 +284,10 @@ ${content}`, container); return html` ${typeIcon} ${formatDateTime(msg.received_at)} - ${senderBlock} - ${msg.text || '-'} + +
${fromPrimary}
+ + ${displayMessage} ${receiversBlock} `; }); diff --git a/src/meshcore_hub/web/static/js/spa/pages/node-detail.js b/src/meshcore_hub/web/static/js/spa/pages/node-detail.js index 62f2794..381a3f9 100644 --- a/src/meshcore_hub/web/static/js/spa/pages/node-detail.js +++ b/src/meshcore_hub/web/static/js/spa/pages/node-detail.js @@ -209,7 +209,7 @@ ${heroHtml} const initQr = () => { const qrEl = document.getElementById('qr-code'); if (!qrEl || typeof QRCode === 'undefined') return false; - const typeMap = { chat: 1, repeater: 2, room: 3, sensor: 4 }; + const typeMap = { chat: 1, repeater: 2, room: 3, companion: 1, sensor: 4 }; const typeNum = typeMap[(node.adv_type || '').toLowerCase()] || 1; const url = 'meshcore://contact/add?name=' + encodeURIComponent(displayName) + '&public_key=' + node.public_key + '&type=' + typeNum; new QRCode(qrEl, { diff --git a/src/meshcore_hub/web/static/js/spa/pages/nodes.js b/src/meshcore_hub/web/static/js/spa/pages/nodes.js index 3553cb4..e6a072c 100644 --- a/src/meshcore_hub/web/static/js/spa/pages/nodes.js +++ b/src/meshcore_hub/web/static/js/spa/pages/nodes.js @@ -159,6 +159,7 @@ ${content}`, container); +
diff --git a/src/meshcore_hub/web/static/locales/en.json b/src/meshcore_hub/web/static/locales/en.json index 2a66f94..1d788b1 100644 --- a/src/meshcore_hub/web/static/locales/en.json +++ b/src/meshcore_hub/web/static/locales/en.json @@ -122,7 +122,8 @@ "node_types": { "chat": "Chat", "repeater": "Repeater", - "room": "Room", + "companion": "Companion", + "room": "Room Server", "unknown": "Unknown" }, "home": { diff --git a/src/meshcore_hub/web/static/locales/languages.md b/src/meshcore_hub/web/static/locales/languages.md index 4877a2c..d169e17 100644 --- a/src/meshcore_hub/web/static/locales/languages.md +++ b/src/meshcore_hub/web/static/locales/languages.md @@ -223,7 +223,8 @@ Mesh network node type labels: |-----|---------|---------| | `chat` | Chat | Chat node type | | `repeater` | Repeater | Repeater/relay node type | -| `room` | Room | Room/group node type | +| `companion` | Companion | Companion/observer node type | +| `room` | Room Server | Room server/group node type | | `unknown` | Unknown | Unknown node type fallback | ### 7. `home` diff --git a/tests/test_api/test_nodes.py b/tests/test_api/test_nodes.py index 7395a73..1b9f981 100644 --- a/tests/test_api/test_nodes.py +++ b/tests/test_api/test_nodes.py @@ -102,6 +102,52 @@ class TestListNodesFilters: data = response.json() assert len(data["items"]) == 0 + def test_filter_by_adv_type_matches_legacy_labels( + self, client_no_auth, api_db_session + ): + """Canonical adv_type filters match legacy LetsMesh values and names.""" + from datetime import datetime, timezone + + from meshcore_hub.common.models import Node + + repeater_node = Node( + public_key="ab" * 32, + name="Car Relay", + adv_type="PyMC-Repeater", + first_seen=datetime.now(timezone.utc), + ) + companion_node = Node( + public_key="cd" * 32, + name="YC-Observer", + adv_type="offline", + first_seen=datetime.now(timezone.utc), + ) + room_node = Node( + public_key="ef" * 32, + name="WAL-SE Room Server", + adv_type="unknown", + first_seen=datetime.now(timezone.utc), + ) + api_db_session.add(repeater_node) + api_db_session.add(companion_node) + api_db_session.add(room_node) + api_db_session.commit() + + response = client_no_auth.get("/api/v1/nodes?adv_type=repeater") + assert response.status_code == 200 + repeater_keys = {item["public_key"] for item in response.json()["items"]} + assert repeater_node.public_key in repeater_keys + + response = client_no_auth.get("/api/v1/nodes?adv_type=companion") + assert response.status_code == 200 + companion_keys = {item["public_key"] for item in response.json()["items"]} + assert companion_node.public_key in companion_keys + + response = client_no_auth.get("/api/v1/nodes?adv_type=room") + assert response.status_code == 200 + room_keys = {item["public_key"] for item in response.json()["items"]} + assert room_node.public_key in room_keys + def test_filter_by_member_id(self, client_no_auth, sample_node_with_member_tag): """Test filtering nodes by member_id tag.""" # Match alice diff --git a/tests/test_collector/test_handlers/test_advertisement.py b/tests/test_collector/test_handlers/test_advertisement.py index 6370396..5f6d360 100644 --- a/tests/test_collector/test_handlers/test_advertisement.py +++ b/tests/test_collector/test_handlers/test_advertisement.py @@ -71,6 +71,26 @@ class TestHandleAdvertisement: assert ad.public_key == "a" * 64 assert ad.name == "TestNode" + def test_updates_node_location_fields(self, db_manager, db_session): + """Advertisement payload lat/lon updates node coordinates.""" + payload = { + "public_key": "a" * 64, + "name": "LocNode", + "adv_type": "repeater", + "lat": 42.1234, + "lon": -71.9876, + } + + handle_advertisement("b" * 64, "advertisement", payload, db_manager) + + node = db_session.execute( + select(Node).where(Node.public_key == "a" * 64) + ).scalar_one_or_none() + + assert node is not None + assert node.lat == 42.1234 + assert node.lon == -71.9876 + def test_handles_missing_public_key(self, db_manager, db_session): """Test that missing public_key is handled gracefully.""" payload = { diff --git a/tests/test_collector/test_letsmesh_decoder.py b/tests/test_collector/test_letsmesh_decoder.py new file mode 100644 index 0000000..e624968 --- /dev/null +++ b/tests/test_collector/test_letsmesh_decoder.py @@ -0,0 +1,125 @@ +"""Tests for LetsMesh packet decoder integration.""" + +import subprocess +from unittest.mock import patch + +from meshcore_hub.collector.letsmesh_decoder import LetsMeshPacketDecoder + + +def test_decode_payload_returns_none_without_raw() -> None: + """Decoder returns None when packet has no raw hex.""" + decoder = LetsMeshPacketDecoder(enabled=True) + assert decoder.decode_payload({"packet_type": 5}) is None + + +def test_decode_payload_invokes_decoder_with_keys() -> None: + """Decoder command includes channel keys and returns parsed JSON.""" + decoder = LetsMeshPacketDecoder( + enabled=True, + command="meshcore-decoder", + channel_keys=["0xABCDEF", "name=012345", "abcDEF"], + timeout_seconds=1.5, + ) + completed = subprocess.CompletedProcess( + args=["meshcore-decoder"], + returncode=0, + stdout='{"payload":{"decoded":{"decrypted":{"message":"hello"}}}}', + stderr="", + ) + + with ( + patch("meshcore_hub.collector.letsmesh_decoder.shutil.which", return_value="1"), + patch( + "meshcore_hub.collector.letsmesh_decoder.subprocess.run", + return_value=completed, + ) as mock_run, + ): + decoded = decoder.decode_payload({"raw": "A1B2C3"}) + + assert isinstance(decoded, dict) + payload = decoded.get("payload") + assert isinstance(payload, dict) + decoded_payload = payload.get("decoded") + assert isinstance(decoded_payload, dict) + decrypted = decoded_payload.get("decrypted") + assert isinstance(decrypted, dict) + assert decrypted.get("message") == "hello" + command = mock_run.call_args.args[0] + assert command == [ + "meshcore-decoder", + "decode", + "A1B2C3", + "--json", + "--key", + "8B3387E9C5CDEA6AC9E5EDBAA115CD72", + "9CD8FCF22A47333B591D96A2B848B73F", + "ABCDEF", + "012345", + ] + assert mock_run.call_args.kwargs["timeout"] == 1.5 + + +def test_decode_payload_returns_none_for_decoder_error() -> None: + """Decoder returns None when decoder exits with failure.""" + decoder = LetsMeshPacketDecoder(enabled=True, command="meshcore-decoder") + completed = subprocess.CompletedProcess( + args=["meshcore-decoder"], + returncode=1, + stdout="", + stderr="decode error", + ) + + with ( + patch("meshcore_hub.collector.letsmesh_decoder.shutil.which", return_value="1"), + patch( + "meshcore_hub.collector.letsmesh_decoder.subprocess.run", + return_value=completed, + ), + ): + assert decoder.decode_payload({"raw": "A1B2C3"}) is None + + +def test_builtin_channel_keys_present_by_default() -> None: + """Public and #test keys are always present even without .env keys.""" + decoder = LetsMeshPacketDecoder(enabled=True, command="meshcore-decoder") + assert decoder._channel_keys == [ + "8B3387E9C5CDEA6AC9E5EDBAA115CD72", + "9CD8FCF22A47333B591D96A2B848B73F", + ] + + +def test_channel_name_lookup_from_decoded_hash() -> None: + """Decoder resolves channel names from configured label=key entries.""" + key_hex = "EB50A1BCB3E4E5D7BF69A57C9DADA211" + decoder = LetsMeshPacketDecoder( + enabled=False, + channel_keys=[f"#bot={key_hex}"], + ) + channel_hash = decoder._compute_channel_hash(key_hex) + decoded_packet = { + "payload": { + "decoded": { + "channelHash": channel_hash, + } + } + } + + assert decoder.channel_name_from_decoded(decoded_packet) == "bot" + + +def test_channel_labels_by_index_includes_labeled_entries() -> None: + """Channel labels map includes built-ins and label=key env entries.""" + decoder = LetsMeshPacketDecoder( + enabled=False, + channel_keys=[ + "bot=EB50A1BCB3E4E5D7BF69A57C9DADA211", + "chat=D0BDD6D71538138ED979EEC00D98AD97", + ], + ) + + labels = decoder.channel_labels_by_index() + + assert labels[17] == "Public" + assert labels[217] == "#test" + assert labels[202] == "#bot" + assert labels[184] == "#chat" diff --git a/tests/test_collector/test_subscriber.py b/tests/test_collector/test_subscriber.py index 439bba0..e4ee3de 100644 --- a/tests/test_collector/test_subscriber.py +++ b/tests/test_collector/test_subscriber.py @@ -1,7 +1,7 @@ """Tests for the collector subscriber.""" import pytest -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, call, patch from meshcore_hub.collector.subscriber import Subscriber, create_subscriber @@ -14,11 +14,16 @@ class TestSubscriber: """Create a mock MQTT client.""" client = MagicMock() client.topic_builder = MagicMock() + client.topic_builder.prefix = "meshcore/BOS" client.topic_builder.all_events_topic.return_value = "meshcore/+/event/#" client.topic_builder.parse_event_topic.return_value = ( "a" * 64, "advertisement", ) + client.topic_builder.parse_letsmesh_upload_topic.return_value = ( + "a" * 64, + "status", + ) return client @pytest.fixture @@ -66,6 +71,522 @@ class TestSubscriber: handler.assert_called_once() + def test_start_subscribes_to_letsmesh_topics(self, mock_mqtt_client, db_manager): + """LetsMesh ingest mode subscribes to packets/status/internal feeds.""" + subscriber = Subscriber( + mock_mqtt_client, + db_manager, + ingest_mode="letsmesh_upload", + ) + + subscriber.start() + + expected_calls = [ + call("meshcore/BOS/+/packets", subscriber._handle_mqtt_message), + call("meshcore/BOS/+/status", subscriber._handle_mqtt_message), + call("meshcore/BOS/+/internal", subscriber._handle_mqtt_message), + ] + mock_mqtt_client.subscribe.assert_has_calls(expected_calls, any_order=False) + assert mock_mqtt_client.subscribe.call_count == 3 + + def test_letsmesh_status_maps_to_advertisement( + self, mock_mqtt_client, db_manager + ) -> None: + """LetsMesh status payloads are normalized to advertisement events.""" + subscriber = Subscriber( + mock_mqtt_client, + db_manager, + ingest_mode="letsmesh_upload", + ) + handler = MagicMock() + subscriber.register_handler("advertisement", handler) + subscriber.start() + + subscriber._handle_mqtt_message( + topic=f"meshcore/BOS/{'a' * 64}/status", + pattern="meshcore/BOS/+/status", + payload={ + "origin": "Observer Node", + "origin_id": "b" * 64, + "model": "Heltec V3", + "mode": "repeater", + "stats": {"debug_flags": 7}, + }, + ) + + handler.assert_called_once() + public_key, event_type, payload, _db = handler.call_args.args + assert public_key == "a" * 64 + assert event_type == "advertisement" + assert payload["public_key"] == "b" * 64 + assert payload["name"] == "Observer Node" + assert payload["adv_type"] == "repeater" + assert payload["flags"] == 7 + + def test_invalid_ingest_mode_raises(self, mock_mqtt_client, db_manager) -> None: + """Invalid ingest mode values are rejected.""" + with pytest.raises(ValueError): + Subscriber(mock_mqtt_client, db_manager, ingest_mode="invalid_mode") + + def test_letsmesh_packet_maps_to_channel_message( + self, mock_mqtt_client, db_manager + ) -> None: + """LetsMesh packets are mapped to channel messages when text is available.""" + mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = ( + "a" * 64, + "packets", + ) + subscriber = Subscriber( + mock_mqtt_client, + db_manager, + ingest_mode="letsmesh_upload", + ) + handler = MagicMock() + subscriber.register_handler("channel_msg_recv", handler) + subscriber.start() + + with patch.object( + subscriber._letsmesh_decoder, + "decode_payload", + return_value={ + "payloadType": 5, + "payload": { + "decoded": { + "decrypted": { + "message": "hello channel", + } + } + }, + }, + ): + subscriber._handle_mqtt_message( + topic=f"meshcore/BOS/{'a' * 64}/packets", + pattern="meshcore/BOS/+/packets", + payload={ + "packet_type": "5", + "hash": "ABCDEF1234", + "timestamp": "2026-02-21T17:42:39.897932", + "SNR": "12.5", + "path": "91CBC3", + }, + ) + + handler.assert_called_once() + public_key, event_type, payload, _db = handler.call_args.args + assert public_key == "a" * 64 + assert event_type == "channel_msg_recv" + assert payload["text"] == "hello channel" + assert payload["txt_type"] == 5 + assert "sender_timestamp" not in payload + assert payload["SNR"] == 12.5 + assert payload["path_len"] == 3 + + def test_letsmesh_packet_without_decrypted_text_is_not_shown_as_message( + self, mock_mqtt_client, db_manager + ) -> None: + """Undecodable LetsMesh packets are kept as informational events, not messages.""" + mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = ( + "a" * 64, + "packets", + ) + subscriber = Subscriber( + mock_mqtt_client, + db_manager, + ingest_mode="letsmesh_upload", + ) + letsmesh_packet_handler = MagicMock() + channel_handler = MagicMock() + subscriber.register_handler("letsmesh_packet", letsmesh_packet_handler) + subscriber.register_handler("channel_msg_recv", channel_handler) + subscriber.start() + + with patch.object( + subscriber._letsmesh_decoder, + "decode_payload", + return_value=None, + ): + subscriber._handle_mqtt_message( + topic=f"meshcore/BOS/{'a' * 64}/packets", + pattern="meshcore/BOS/+/packets", + payload={ + "packet_type": "5", + "hash": "ABCDEF1234", + "raw": "15040791959fd9", + }, + ) + + letsmesh_packet_handler.assert_called_once() + channel_handler.assert_not_called() + + def test_letsmesh_packet_uses_decoder_text_when_available( + self, mock_mqtt_client, db_manager + ) -> None: + """LetsMesh packet decoder output is used for message text and timestamp.""" + mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = ( + "a" * 64, + "packets", + ) + subscriber = Subscriber( + mock_mqtt_client, + db_manager, + ingest_mode="letsmesh_upload", + ) + handler = MagicMock() + subscriber.register_handler("channel_msg_recv", handler) + subscriber.start() + + with ( + patch.object( + subscriber._letsmesh_decoder, + "decode_payload", + return_value={ + "payloadType": 5, + "pathLength": 4, + "payload": { + "decoded": { + "channelHash": "AA", + "decrypted": { + "sender": "ABCD1234", + "timestamp": 1771695860, + "message": "decoded hello", + }, + } + }, + }, + ), + patch.object( + subscriber._letsmesh_decoder, + "channel_name_from_decoded", + return_value="test", + ), + ): + subscriber._handle_mqtt_message( + topic=f"meshcore/BOS/{'a' * 64}/packets", + pattern="meshcore/BOS/+/packets", + payload={ + "packet_type": "5", + "hash": "ABCDEF1234", + "raw": "15040791959fd9", + "SNR": "9.0", + }, + ) + + handler.assert_called_once() + public_key, event_type, payload, _db = handler.call_args.args + assert public_key == "a" * 64 + assert event_type == "channel_msg_recv" + assert payload["text"] == "decoded hello" + assert payload["channel_name"] == "#test" + assert payload["sender_timestamp"] == 1771695860 + assert payload["txt_type"] == 5 + assert payload["path_len"] == 4 + assert payload["channel_idx"] == 170 + assert payload["pubkey_prefix"] == "ABCD1234" + + def test_letsmesh_packet_type_1_maps_to_contact_message( + self, mock_mqtt_client, db_manager + ) -> None: + """LetsMesh packet type 1 is treated as direct/contact message traffic.""" + mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = ( + "a" * 64, + "packets", + ) + subscriber = Subscriber( + mock_mqtt_client, + db_manager, + ingest_mode="letsmesh_upload", + ) + handler = MagicMock() + subscriber.register_handler("contact_msg_recv", handler) + subscriber.start() + + with patch.object( + subscriber._letsmesh_decoder, + "decode_payload", + return_value={ + "payloadType": 1, + "payload": { + "decoded": { + "sourceHash": "7CAF1337A58D", + "decrypted": { + "message": "hello dm", + }, + } + }, + }, + ): + subscriber._handle_mqtt_message( + topic=f"meshcore/BOS/{'a' * 64}/packets", + pattern="meshcore/BOS/+/packets", + payload={ + "packet_type": "1", + "hash": "ABABAB1234", + "raw": "010203", + }, + ) + + handler.assert_called_once() + public_key, event_type, payload, _db = handler.call_args.args + assert public_key == "a" * 64 + assert event_type == "contact_msg_recv" + assert payload["text"] == "hello dm" + assert payload["pubkey_prefix"] == "7CAF1337A58D" + + def test_letsmesh_decoder_sender_name_prefixes_message_text( + self, mock_mqtt_client, db_manager + ) -> None: + """Non-hex decoder sender names are rendered as `Name: Message`.""" + mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = ( + "a" * 64, + "packets", + ) + subscriber = Subscriber( + mock_mqtt_client, + db_manager, + ingest_mode="letsmesh_upload", + ) + handler = MagicMock() + subscriber.register_handler("channel_msg_recv", handler) + subscriber.start() + + with patch.object( + subscriber._letsmesh_decoder, + "decode_payload", + return_value={ + "payloadType": 5, + "payload": { + "decoded": { + "channelHash": "D9", + "decrypted": { + "sender": "Stephenbarz", + "message": "hello mesh", + }, + } + }, + }, + ): + subscriber._handle_mqtt_message( + topic=f"meshcore/BOS/{'a' * 64}/packets", + pattern="meshcore/BOS/+/packets", + payload={ + "packet_type": "5", + "hash": "FEEDC0DE", + "raw": "AABBCC", + }, + ) + + handler.assert_called_once() + _public_key, event_type, payload, _db = handler.call_args.args + assert event_type == "channel_msg_recv" + assert payload["text"] == "Stephenbarz: hello mesh" + assert payload["channel_idx"] == 217 + assert "pubkey_prefix" not in payload + + def test_letsmesh_packet_type_4_maps_to_advertisement_with_location( + self, mock_mqtt_client, db_manager + ) -> None: + """Decoder packet type 4 is mapped to advertisement with GPS coordinates.""" + mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = ( + "a" * 64, + "packets", + ) + subscriber = Subscriber( + mock_mqtt_client, + db_manager, + ingest_mode="letsmesh_upload", + ) + handler = MagicMock() + subscriber.register_handler("advertisement", handler) + subscriber.start() + + with patch.object( + subscriber._letsmesh_decoder, + "decode_payload", + return_value={ + "payloadType": 4, + "payload": { + "decoded": { + "type": 4, + "publicKey": "B" * 64, + "appData": { + "flags": 146, + "deviceRole": 2, + "location": { + "latitude": 42.470001, + "longitude": -71.330001, + }, + "name": "Concord Attic G2", + }, + } + }, + }, + ): + subscriber._handle_mqtt_message( + topic=f"meshcore/BOS/{'a' * 64}/packets", + pattern="meshcore/BOS/+/packets", + payload={ + "packet_type": "4", + "hash": "A1B2C3D4", + "raw": "010203", + }, + ) + + handler.assert_called_once() + public_key, event_type, payload, _db = handler.call_args.args + assert public_key == "a" * 64 + assert event_type == "advertisement" + assert payload["public_key"] == "B" * 64 + assert payload["name"] == "Concord Attic G2" + assert payload["adv_type"] == "repeater" + assert payload["flags"] == 146 + assert payload["lat"] == 42.470001 + assert payload["lon"] == -71.330001 + + def test_letsmesh_packet_type_11_maps_to_advertisement( + self, mock_mqtt_client, db_manager + ) -> None: + """Decoder packet type 11 is mapped to advertisement metadata updates.""" + mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = ( + "a" * 64, + "packets", + ) + subscriber = Subscriber( + mock_mqtt_client, + db_manager, + ingest_mode="letsmesh_upload", + ) + handler = MagicMock() + subscriber.register_handler("advertisement", handler) + subscriber.start() + + with patch.object( + subscriber._letsmesh_decoder, + "decode_payload", + return_value={ + "payloadType": 11, + "payload": { + "decoded": { + "type": 11, + "publicKey": "C" * 64, + "nodeType": 2, + "nodeTypeName": "Repeater", + "rawFlags": 146, + } + }, + }, + ): + subscriber._handle_mqtt_message( + topic=f"meshcore/BOS/{'a' * 64}/packets", + pattern="meshcore/BOS/+/packets", + payload={ + "packet_type": "11", + "hash": "E5F6A7B8", + "raw": "040506", + }, + ) + + handler.assert_called_once() + _public_key, event_type, payload, _db = handler.call_args.args + assert event_type == "advertisement" + assert payload["public_key"] == "C" * 64 + assert payload["adv_type"] == "repeater" + assert payload["flags"] == 146 + + def test_letsmesh_packet_fallback_logs_decoded_payload( + self, mock_mqtt_client, db_manager + ) -> None: + """Non-mapped packets include decoder output in letsmesh_packet payload.""" + mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = ( + "a" * 64, + "packets", + ) + subscriber = Subscriber( + mock_mqtt_client, + db_manager, + ingest_mode="letsmesh_upload", + ) + packet_handler = MagicMock() + subscriber.register_handler("letsmesh_packet", packet_handler) + subscriber.start() + + decoded_packet = { + "payloadType": 8, + "payload": { + "decoded": { + "type": 8, + "isValid": True, + "pathHashes": ["AA", "BB", "CC"], + } + }, + } + with patch.object( + subscriber._letsmesh_decoder, + "decode_payload", + return_value=decoded_packet, + ): + subscriber._handle_mqtt_message( + topic=f"meshcore/BOS/{'a' * 64}/packets", + pattern="meshcore/BOS/+/packets", + payload={ + "packet_type": "8", + "hash": "99887766", + "raw": "ABCDEF", + }, + ) + + packet_handler.assert_called_once() + _public_key, event_type, payload, _db = packet_handler.call_args.args + assert event_type == "letsmesh_packet" + assert payload["decoded_payload_type"] == 8 + assert payload["decoded_packet"] == decoded_packet + + def test_letsmesh_packet_sender_fallback_from_payload_fields( + self, mock_mqtt_client, db_manager + ) -> None: + """Sender prefix falls back to payload sourceHash when decoder has no sender.""" + mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = ( + "a" * 64, + "packets", + ) + subscriber = Subscriber( + mock_mqtt_client, + db_manager, + ingest_mode="letsmesh_upload", + ) + handler = MagicMock() + subscriber.register_handler("channel_msg_recv", handler) + subscriber.start() + + with patch.object( + subscriber._letsmesh_decoder, + "decode_payload", + return_value={ + "payloadType": 5, + "payload": { + "decoded": { + "decrypted": { + "message": "hello from payload sender", + }, + } + }, + }, + ): + subscriber._handle_mqtt_message( + topic=f"meshcore/BOS/{'a' * 64}/packets", + pattern="meshcore/BOS/+/packets", + payload={ + "packet_type": "5", + "hash": "ABABAB1234", + "sourceHash": "1A2B3C4D5E6F", + "raw": "010203", + }, + ) + + handler.assert_called_once() + _public_key, _event_type, payload, _db = handler.call_args.args + assert payload["text"] == "hello from payload sender" + assert payload["pubkey_prefix"] == "1A2B3C4D5E6F" + class TestCreateSubscriber: """Tests for create_subscriber factory function.""" diff --git a/tests/test_common/test_config.py b/tests/test_common/test_config.py index a823a2d..c584967 100644 --- a/tests/test_common/test_config.py +++ b/tests/test_common/test_config.py @@ -18,6 +18,17 @@ class TestCommonSettings: assert settings.data_home == "/custom/data" + def test_websocket_transport_settings(self) -> None: + """Test MQTT websocket transport settings.""" + settings = CommonSettings( + _env_file=None, + mqtt_transport="websockets", + mqtt_ws_path="/", + ) + + assert settings.mqtt_transport.value == "websockets" + assert settings.mqtt_ws_path == "/" + class TestInterfaceSettings: """Tests for InterfaceSettings.""" @@ -63,6 +74,28 @@ class TestCollectorSettings: assert settings.node_tags_file == "/seed/data/node_tags.yaml" assert settings.members_file == "/seed/data/members.yaml" + def test_collector_ingest_mode_letsmesh_upload(self) -> None: + """Test collector ingest mode can be set to LetsMesh upload.""" + settings = CollectorSettings( + _env_file=None, + collector_ingest_mode="letsmesh_upload", + ) + + assert settings.collector_ingest_mode.value == "letsmesh_upload" + + def test_collector_letsmesh_decoder_keys_list(self) -> None: + """LetsMesh decoder keys are parsed from comma/space-separated env values.""" + settings = CollectorSettings( + _env_file=None, + collector_letsmesh_decoder_keys="aa11, bb22 cc33", + ) + + assert settings.collector_letsmesh_decoder_keys_list == [ + "aa11", + "bb22", + "cc33", + ] + class TestAPISettings: """Tests for APISettings.""" @@ -92,3 +125,11 @@ class TestWebSettings: settings = WebSettings(_env_file=None, data_home="/custom/data") assert settings.web_data_dir == "/custom/data/web" + + def test_web_datetime_locale_default_and_override(self) -> None: + """Date formatting locale has sensible default and can be overridden.""" + default_settings = WebSettings(_env_file=None) + custom_settings = WebSettings(_env_file=None, web_datetime_locale="en-GB") + + assert default_settings.web_datetime_locale == "en-US" + assert custom_settings.web_datetime_locale == "en-GB" diff --git a/tests/test_common/test_mqtt.py b/tests/test_common/test_mqtt.py new file mode 100644 index 0000000..2a33c69 --- /dev/null +++ b/tests/test_common/test_mqtt.py @@ -0,0 +1,57 @@ +"""Tests for MQTT topic parsing utilities.""" + +from meshcore_hub.common.mqtt import TopicBuilder + + +class TestTopicBuilder: + """Tests for MQTT topic builder parsing helpers.""" + + def test_parse_event_topic_with_single_segment_prefix(self) -> None: + """Event topics are parsed correctly with a simple prefix.""" + builder = TopicBuilder(prefix="meshcore") + + parsed = builder.parse_event_topic( + "meshcore/ABCDEF1234567890/event/advertisement" + ) + + assert parsed == ("ABCDEF1234567890", "advertisement") + + def test_parse_event_topic_with_multi_segment_prefix(self) -> None: + """Event topics are parsed correctly with a slash-delimited prefix.""" + builder = TopicBuilder(prefix="meshcore/BOS") + + parsed = builder.parse_event_topic( + "meshcore/BOS/ABCDEF1234567890/event/channel_msg_recv" + ) + + assert parsed == ("ABCDEF1234567890", "channel_msg_recv") + + def test_parse_command_topic_with_multi_segment_prefix(self) -> None: + """Command topics are parsed correctly with a slash-delimited prefix.""" + builder = TopicBuilder(prefix="meshcore/BOS") + + parsed = builder.parse_command_topic( + "meshcore/BOS/ABCDEF123456/command/send_msg" + ) + + assert parsed == ("ABCDEF123456", "send_msg") + + def test_parse_letsmesh_upload_topic(self) -> None: + """LetsMesh upload topics map to public key and feed type.""" + builder = TopicBuilder(prefix="meshcore/BOS") + + parsed = builder.parse_letsmesh_upload_topic( + "meshcore/BOS/ABCDEF1234567890/status" + ) + + assert parsed == ("ABCDEF1234567890", "status") + + def test_parse_letsmesh_upload_topic_rejects_unknown_feed(self) -> None: + """Unknown LetsMesh feed topics are rejected.""" + builder = TopicBuilder(prefix="meshcore/BOS") + + parsed = builder.parse_letsmesh_upload_topic( + "meshcore/BOS/ABCDEF1234567890/something_else" + ) + + assert parsed is None diff --git a/tests/test_web/test_admin.py b/tests/test_web/test_admin.py index 459653e..ac70df1 100644 --- a/tests/test_web/test_admin.py +++ b/tests/test_web/test_admin.py @@ -59,6 +59,22 @@ def auth_headers() -> dict: } +@pytest.fixture +def auth_headers_basic() -> dict[str, str]: + """Basic auth header forwarded by reverse proxy.""" + return { + "Authorization": "Basic dGVzdDp0ZXN0", + } + + +@pytest.fixture +def auth_headers_auth_request() -> dict[str, str]: + """Auth-request style header from upstream proxy.""" + return { + "X-Auth-Request-User": "test-user-id", + } + + @pytest.fixture def admin_client(admin_app: Any, mock_http_client: MockHttpClient) -> TestClient: """Create a test client with admin enabled.""" @@ -113,6 +129,34 @@ class TestAdminHome: assert config["is_authenticated"] is True + def test_admin_home_config_authenticated_with_basic_auth( + self, admin_client, auth_headers_basic + ): + """Test admin config shows is_authenticated: true with basic auth header.""" + response = admin_client.get("/a/", headers=auth_headers_basic) + text = response.text + config_start = text.find("window.__APP_CONFIG__ = ") + len( + "window.__APP_CONFIG__ = " + ) + config_end = text.find(";", config_start) + config = json.loads(text[config_start:config_end]) + + assert config["is_authenticated"] is True + + def test_admin_home_config_authenticated_with_auth_request_header( + self, admin_client, auth_headers_auth_request + ): + """Test admin config shows is_authenticated with X-Auth-Request-User.""" + response = admin_client.get("/a/", headers=auth_headers_auth_request) + text = response.text + config_start = text.find("window.__APP_CONFIG__ = ") + len( + "window.__APP_CONFIG__ = " + ) + config_end = text.find(";", config_start) + config = json.loads(text[config_start:config_end]) + + assert config["is_authenticated"] is True + def test_admin_home_disabled_returns_spa_shell( self, admin_client_disabled, auth_headers ): @@ -248,6 +292,18 @@ class TestAdminApiProxyAuth: ) assert response.status_code == 201 + def test_proxy_post_allowed_with_basic_auth( + self, admin_client, auth_headers_basic, mock_http_client + ): + """POST to API proxy succeeds with basic auth header.""" + mock_http_client.set_response("POST", "/api/v1/members", 201, {"id": "new"}) + response = admin_client.post( + "/api/v1/members", + json={"name": "Test", "member_id": "test"}, + headers=auth_headers_basic, + ) + assert response.status_code == 201 + def test_proxy_put_allowed_with_auth( self, admin_client, auth_headers, mock_http_client ): diff --git a/tests/test_web/test_messages.py b/tests/test_web/test_messages.py index 4083b16..eb35fc9 100644 --- a/tests/test_web/test_messages.py +++ b/tests/test_web/test_messages.py @@ -88,3 +88,17 @@ class TestMessagesConfig: config = json.loads(text[config_start:config_end]) assert config["network_name"] == "Test Network" + assert config["datetime_locale"] == "en-US" + + def test_messages_config_has_channel_labels(self, client: TestClient) -> None: + """Test that SPA config includes known channel labels.""" + response = client.get("/messages") + text = response.text + config_start = text.find("window.__APP_CONFIG__ = ") + len( + "window.__APP_CONFIG__ = " + ) + config_end = text.find(";", config_start) + config = json.loads(text[config_start:config_end]) + + assert config["channel_labels"]["17"] == "Public" + assert config["channel_labels"]["217"] == "#test"