diff --git a/AGENTS.md b/AGENTS.md index 64659a8..f2b7173 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -197,6 +197,7 @@ This message-layer echo/path handling is independent of raw-packet storage dedup │ ├── event_handlers.py # Radio events │ ├── decoder.py # Packet decryption │ ├── websocket.py # Real-time broadcasts +│ ├── push/ # Web Push notification subsystem (VAPID keys, dispatch, send) │ └── fanout/ # Fanout bus: MQTT, bots, webhooks, Apprise, SQS (see fanout/AGENTS_fanout.md) ├── frontend/ # React frontend │ ├── AGENTS.md # Frontend documentation @@ -380,6 +381,12 @@ All endpoints are prefixed with `/api` (e.g., `/api/health`). | DELETE | `/api/fanout/{id}` | Delete fanout config (stops module) | | POST | `/api/fanout/bots/disable-until-restart` | Stop bot fanout modules and keep bots disabled until the process restarts | | GET | `/api/statistics` | Aggregated mesh network statistics | +| GET | `/api/push/vapid-public-key` | VAPID public key for browser push subscription | +| POST | `/api/push/subscribe` | Register/upsert a push subscription | +| GET | `/api/push/subscriptions` | List all push subscriptions | +| PATCH | `/api/push/subscriptions/{id}` | Update subscription label or filter preferences | +| DELETE | `/api/push/subscriptions/{id}` | Delete a push subscription | +| POST | `/api/push/subscriptions/{id}/test` | Send a test push notification | | WS | `/api/ws` | Real-time updates | ## Key Concepts @@ -434,6 +441,17 @@ All external integrations are managed through the fanout bus (`app/fanout/`). Ea Community MQTT forwards raw packets only. Its derived `path` field, when present on direct packets, is a comma-separated list of hop identifiers as reported by the packet format. Token width therefore varies with the packet's path hash mode; it is intentionally not a flat per-byte rendering. +### Web Push Notifications + +Web Push is a standalone subsystem (`app/push/`) that sends browser push notifications for incoming messages even when the browser tab is closed. It is **not** a fanout module — it manages its own per-browser subscriptions, while the set of push-enabled conversations is stored once per server instance. + +- **Requires HTTPS** (self-signed certificates work) and outbound internet from the server to reach browser push services (Google FCM, Mozilla autopush). +- VAPID key pair is auto-generated on first startup and stored in `app_settings`. +- Each browser subscription is stored in `push_subscriptions` with device identity and delivery state. The set of push-enabled conversations is stored globally in `app_settings.push_conversations`, so all subscribed browsers receive the same configured rooms/DMs. +- `broadcast_event()` in `websocket.py` dispatches to `push_manager.dispatch_message()` alongside fanout for `message` events. +- Expired subscriptions (HTTP 404/410 from push service) are auto-deleted. +- Frontend: service worker (`sw.js`) handles push display and notification click navigation. The `BellRing` icon in `ChatHeader` toggles per-conversation push. Device management lives in Settings > Local. + ### Server-Side Decryption The server can decrypt packets using stored keys, both in real-time and for historical packets. diff --git a/app/AGENTS.md b/app/AGENTS.md index 6021c85..ae650cf 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -50,6 +50,10 @@ app/ ├── events.py # Typed WS event payload serialization ├── websocket.py # WS manager + broadcast helpers ├── security.py # Optional app-wide HTTP Basic auth middleware for HTTP + WS +├── push/ # Web Push notification subsystem +│ ├── vapid.py # VAPID key generation, storage, caching +│ ├── send.py # pywebpush wrapper (async via thread executor) +│ └── manager.py # Push dispatch: filter, build payload, concurrent send ├── fanout/ # Fanout bus: MQTT, bots, webhooks, Apprise, SQS (see fanout/AGENTS_fanout.md) ├── dependencies.py # Shared FastAPI dependency providers ├── path_utils.py # Path hex rendering and hop-width helpers @@ -71,6 +75,7 @@ app/ ├── fanout.py ├── repeaters.py ├── statistics.py + ├── push.py └── ws.py ``` @@ -168,6 +173,17 @@ app/ - Community MQTT publishes raw packets only, but its derived `path` field for direct packets is emitted as comma-separated hop identifiers, not flat path bytes. - See `app/fanout/AGENTS_fanout.md` for full architecture details and event payload shapes. +### Web Push notifications + +Web Push is a standalone subsystem in `app/push/`, separate from the fanout module system. It sends browser push notifications for incoming messages even when the tab is closed. + +- **Not a fanout module** — Web Push manages per-browser subscriptions (N browsers, each with its own endpoint and delivery state), unlike fanout which is one-config-to-one-destination. +- **VAPID keys**: auto-generated P-256 key pair on first startup, stored in `app_settings.vapid_private_key` / `vapid_public_key`. Cached in-module by `app/push/vapid.py`. +- **Dispatch**: `broadcast_event()` in `websocket.py` fires `push_manager.dispatch_message(data)` alongside fanout for `message` events. The manager checks the global `app_settings.push_conversations` list, then sends to all currently registered subscriptions via `pywebpush` (run in a thread executor). +- **Stale cleanup**: HTTP 404/410 from the push service triggers immediate subscription deletion. +- **Subscriptions stored** in `push_subscriptions` table with `UNIQUE(endpoint)` for upsert semantics. +- Requires HTTPS (self-signed OK) and outbound internet to reach browser push services. + ## API Surface (all under `/api`) ### Health @@ -258,6 +274,14 @@ app/ ### Statistics - `GET /statistics` — aggregated mesh network stats (entity counts, message/packet splits, activity windows, busiest channels) +### Push +- `GET /push/vapid-public-key` — VAPID public key for browser `PushManager.subscribe()` +- `POST /push/subscribe` — register/upsert push subscription (keyed by endpoint URL) +- `GET /push/subscriptions` — list all push subscriptions +- `PATCH /push/subscriptions/{id}` — update label or filter preferences +- `DELETE /push/subscriptions/{id}` — delete subscription +- `POST /push/subscriptions/{id}/test` — send test notification + ### WebSocket - `WS /ws` @@ -290,7 +314,8 @@ Main tables: - `contact_name_history` (tracks name changes over time) - `repeater_telemetry_history` (time-series telemetry snapshots for tracked repeaters) - `fanout_configs` (MQTT, bot, webhook, Apprise, SQS integration configs) -- `app_settings` +- `push_subscriptions` (Web Push browser subscriptions with delivery metadata; UNIQUE on endpoint) +- `app_settings` (includes `vapid_private_key` and `vapid_public_key` for Web Push VAPID signing) Contact route state is canonicalized on the backend: - stored route inputs: `direct_path`, `direct_path_len`, `direct_path_hash_mode`, `direct_path_updated_at`, plus optional `route_override_*` diff --git a/app/main.py b/app/main.py index 11a6289..40b327c 100644 --- a/app/main.py +++ b/app/main.py @@ -67,6 +67,7 @@ from app.routers import ( health, messages, packets, + push, radio, read_state, repeaters, @@ -102,6 +103,14 @@ async def lifespan(app: FastAPI): await db.connect() logger.info("Database connected") + # Initialize VAPID keys for Web Push (generates on first run) + from app.push.vapid import ensure_vapid_keys + + try: + await ensure_vapid_keys() + except Exception: + logger.warning("Failed to initialize VAPID keys for Web Push", exc_info=True) + # Ensure default channels exist in the database even before the radio # connects. Without this, a fresh or disconnected instance would return # zero channels from GET /channels until the first successful radio sync. @@ -185,6 +194,7 @@ app.include_router(packets.router, prefix="/api") app.include_router(read_state.router, prefix="/api") app.include_router(settings.router, prefix="/api") app.include_router(statistics.router, prefix="/api") +app.include_router(push.router, prefix="/api") app.include_router(ws.router, prefix="/api") # Serve frontend static files in production diff --git a/app/migrations/_058_web_push.py b/app/migrations/_058_web_push.py new file mode 100644 index 0000000..93ca6bf --- /dev/null +++ b/app/migrations/_058_web_push.py @@ -0,0 +1,49 @@ +import logging + +import aiosqlite + +logger = logging.getLogger(__name__) + + +async def migrate(conn: aiosqlite.Connection) -> None: + """Add Web Push support: VAPID keys, push subscriptions table, and global conversation list.""" + + # VAPID key pair + global push conversation list in app_settings + table_check = await conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='app_settings'" + ) + if await table_check.fetchone(): + cursor = await conn.execute("PRAGMA table_info(app_settings)") + columns = {row[1] for row in await cursor.fetchall()} + + if "vapid_private_key" not in columns: + await conn.execute( + "ALTER TABLE app_settings ADD COLUMN vapid_private_key TEXT DEFAULT ''" + ) + if "vapid_public_key" not in columns: + await conn.execute( + "ALTER TABLE app_settings ADD COLUMN vapid_public_key TEXT DEFAULT ''" + ) + if "push_conversations" not in columns: + await conn.execute( + "ALTER TABLE app_settings ADD COLUMN push_conversations TEXT DEFAULT '[]'" + ) + + # Push subscriptions — one row per browser/device + await conn.execute( + """ + CREATE TABLE IF NOT EXISTS push_subscriptions ( + id TEXT PRIMARY KEY, + endpoint TEXT NOT NULL, + p256dh TEXT NOT NULL, + auth TEXT NOT NULL, + label TEXT NOT NULL DEFAULT '', + created_at INTEGER NOT NULL, + last_success_at INTEGER, + failure_count INTEGER DEFAULT 0, + UNIQUE(endpoint) + ) + """ + ) + + await conn.commit() diff --git a/app/push/__init__.py b/app/push/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/push/manager.py b/app/push/manager.py new file mode 100644 index 0000000..b9dad29 --- /dev/null +++ b/app/push/manager.py @@ -0,0 +1,172 @@ +"""Web Push dispatch manager. + +Checks the global push-enabled conversation list (stored in app_settings) +and sends push notifications to ALL registered devices when a matching +incoming message arrives. +""" + +import asyncio +import json +import logging +from dataclasses import dataclass + +from pywebpush import WebPushException + +from app.push.send import send_push +from app.push.vapid import get_vapid_private_key +from app.repository.push_subscriptions import PushSubscriptionRepository +from app.repository.settings import AppSettingsRepository + +logger = logging.getLogger(__name__) + +_SEND_TIMEOUT = 15 # seconds per push send +_VAPID_CLAIMS = {"sub": "mailto:noreply@meshcore.local"} + + +def _state_key_for_message(data: dict) -> str: + """Derive the conversation state key from a message event payload.""" + msg_type = data.get("type", "") + conversation_key = data.get("conversation_key", "") + if msg_type == "PRIV": + return f"contact-{conversation_key}" + return f"channel-{conversation_key}" + + +def _build_payload(data: dict) -> str: + """Build the push notification JSON payload from a message event.""" + msg_type = data.get("type", "") + text = data.get("text", "") + sender_name = data.get("sender_name") or "" + channel_name = data.get("channel_name") or "" + + if msg_type == "PRIV": + title = f"Message from {sender_name}" if sender_name else "New direct message" + body = text + else: + title = channel_name if channel_name else "Channel message" + body = text + + conversation_key = data.get("conversation_key", "") + state_key = _state_key_for_message(data) + if msg_type == "PRIV": + url_hash = f"#contact/{conversation_key}" + else: + url_hash = f"#channel/{conversation_key}" + + return json.dumps( + { + "title": title, + "body": body, + # Tag per conversation so different conversations coexist in the + # notification tray, while repeated messages in the same + # conversation replace each other. + "tag": f"meshcore-{state_key}", + "url_hash": url_hash, + } + ) + + +def _subscription_info(sub: dict) -> dict: + """Build the subscription_info dict that pywebpush expects.""" + return { + "endpoint": sub["endpoint"], + "keys": { + "p256dh": sub["p256dh"], + "auth": sub["auth"], + }, + } + + +@dataclass +class _SendResult: + sub_id: str + success: bool = False + expired: bool = False + + +class PushManager: + async def dispatch_message(self, data: dict) -> None: + """Send push notifications for a message event to all devices.""" + # Don't notify for messages the operator just sent themselves + if data.get("outgoing"): + return + + # Check the global conversation list + state_key = _state_key_for_message(data) + try: + push_conversations = await AppSettingsRepository.get_push_conversations() + except Exception: + logger.debug("Push dispatch: failed to load push_conversations", exc_info=True) + return + + if state_key not in push_conversations: + return + + try: + subs = await PushSubscriptionRepository.get_all() + except Exception: + logger.debug("Push dispatch: failed to load subscriptions", exc_info=True) + return + + if not subs: + return + + payload = _build_payload(data) + vapid_key = get_vapid_private_key() + if not vapid_key: + logger.debug("Push dispatch: no VAPID key configured, skipping") + return + + results = await asyncio.gather( + *(self._send_one(sub, payload, vapid_key) for sub in subs), + return_exceptions=True, + ) + + # Batch-update all delivery outcomes in one transaction. + success_ids: list[str] = [] + failure_ids: list[str] = [] + remove_ids: list[str] = [] + for r in results: + if isinstance(r, _SendResult): + if r.expired: + remove_ids.append(r.sub_id) + elif r.success: + success_ids.append(r.sub_id) + else: + failure_ids.append(r.sub_id) + if success_ids or failure_ids or remove_ids: + try: + await PushSubscriptionRepository.batch_record_outcomes( + success_ids, failure_ids, remove_ids + ) + except Exception: + logger.debug("Push dispatch: failed to record outcomes", exc_info=True) + + async def _send_one(self, sub: dict, payload: str, vapid_key: str) -> _SendResult: + sub_id = sub["id"] + result = _SendResult(sub_id=sub_id) + try: + async with asyncio.timeout(_SEND_TIMEOUT): + await send_push( + subscription_info=_subscription_info(sub), + payload=payload, + vapid_private_key=vapid_key, + vapid_claims=_VAPID_CLAIMS, + ) + result.success = True + except WebPushException as e: + status = getattr(e, "response", None) + status_code = getattr(status, "status_code", 0) if status else 0 + if status_code in (403, 404, 410): + logger.info("Push subscription expired (HTTP %d), removing %s", status_code, sub_id) + result.expired = True + else: + logger.warning("Push send failed for %s: %s", sub_id, e) + except TimeoutError: + logger.warning("Push send timed out for %s", sub_id) + except Exception: + logger.debug("Push send error for %s", sub_id, exc_info=True) + return result + + +push_manager = PushManager() diff --git a/app/push/send.py b/app/push/send.py new file mode 100644 index 0000000..2af8759 --- /dev/null +++ b/app/push/send.py @@ -0,0 +1,231 @@ +"""Thin wrapper around pywebpush for sending push notifications. + +Isolates the pywebpush dependency and runs the synchronous send in +a thread executor to avoid blocking the event loop. +""" + +import asyncio +import logging +import socket +from typing import Any, cast + +import requests +import urllib3.connection +import urllib3.connectionpool +from pywebpush import webpush +from requests.adapters import HTTPAdapter +from requests.exceptions import ConnectionError as RequestsConnectionError +from requests.exceptions import ConnectTimeout as RequestsConnectTimeout +from urllib3.exceptions import ConnectTimeoutError, NameResolutionError, NewConnectionError + +logger = logging.getLogger(__name__) + +DEFAULT_TIMEOUT = object() +DEFAULT_PUSH_CONNECT_TIMEOUT_SECONDS = 3 +IPV4_FALLBACK_CONNECT_TIMEOUT_SECONDS = 10 +DEFAULT_PUSH_READ_TIMEOUT_SECONDS = 10 + + +def _create_ipv4_connection( + address: tuple[str, int], + timeout: float | None | object = DEFAULT_TIMEOUT, + source_address: tuple[str, int] | None = None, + socket_options=None, +) -> socket.socket: + """Create a socket connection using IPv4 only.""" + host, port = address + if host.startswith("["): + host = host.strip("[]") + + err: OSError | None = None + for res in socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM): + af, socktype, proto, _, sa = res + sock = None + try: + sock = socket.socket(af, socktype, proto) + if socket_options: + for opt in socket_options: + sock.setsockopt(*opt) + if timeout is not DEFAULT_TIMEOUT: + sock.settimeout(cast(float | None, timeout)) + if source_address: + sock.bind(source_address) + sock.connect(sa) + return sock + except OSError as exc: + err = exc + if sock is not None: + sock.close() + + if err is not None: + raise err + raise OSError("getaddrinfo returns an empty list") + + +class IPv4HTTPConnection(urllib3.connection.HTTPConnection): + """urllib3 HTTP connection that resolves and connects via IPv4 only.""" + + def _new_conn(self) -> socket.socket: + try: + return _create_ipv4_connection( + (self._dns_host, self.port), + self.timeout, + source_address=self.source_address, + socket_options=self.socket_options, + ) + except socket.gaierror as exc: + raise NameResolutionError(self.host, self, exc) from exc + except TimeoutError as exc: + raise ConnectTimeoutError( + self, + f"Connection to {self.host} timed out. (connect timeout={self.timeout})", + ) from exc + except OSError as exc: + raise NewConnectionError(self, f"Failed to establish a new connection: {exc}") from exc + + +class IPv4HTTPSConnection(urllib3.connection.HTTPSConnection): + """urllib3 HTTPS connection that resolves and connects via IPv4 only.""" + + def _new_conn(self) -> socket.socket: + try: + return _create_ipv4_connection( + (self._dns_host, self.port), + self.timeout, + source_address=self.source_address, + socket_options=self.socket_options, + ) + except socket.gaierror as exc: + raise NameResolutionError(self.host, self, exc) from exc + except TimeoutError as exc: + raise ConnectTimeoutError( + self, + f"Connection to {self.host} timed out. (connect timeout={self.timeout})", + ) from exc + except OSError as exc: + raise NewConnectionError(self, f"Failed to establish a new connection: {exc}") from exc + + +class IPv4HTTPConnectionPool(urllib3.connectionpool.HTTPConnectionPool): + ConnectionCls = cast(Any, IPv4HTTPConnection) + + +class IPv4HTTPSConnectionPool(urllib3.connectionpool.HTTPSConnectionPool): + ConnectionCls = cast(Any, IPv4HTTPSConnection) + + +def _configure_pool_manager_for_ipv4(manager: Any) -> None: + manager.pool_classes_by_scheme = manager.pool_classes_by_scheme.copy() + manager.pool_classes_by_scheme["http"] = IPv4HTTPConnectionPool + manager.pool_classes_by_scheme["https"] = IPv4HTTPSConnectionPool + + +class IPv4HTTPAdapter(HTTPAdapter): + """requests adapter that uses IPv4-only urllib3 connection pools.""" + + def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs): + super().init_poolmanager(connections, maxsize, block=block, **pool_kwargs) + _configure_pool_manager_for_ipv4(self.poolmanager) + + def proxy_manager_for(self, *args, **kwargs): + manager = super().proxy_manager_for(*args, **kwargs) + _configure_pool_manager_for_ipv4(manager) + return manager + + +def _build_default_requests_session() -> requests.Session: + return requests.Session() + + +def _build_ipv4_requests_session() -> requests.Session: + session = requests.Session() + adapter = IPv4HTTPAdapter() + session.mount("http://", adapter) + session.mount("https://", adapter) + return session + + +def _send_push_with_session( + *, + subscription_info: dict, + payload: str, + vapid_private_key: str, + vapid_claims: dict, + session: requests.Session, + connect_timeout_seconds: int, +) -> int: + response = webpush( + subscription_info=subscription_info, + data=payload, + vapid_private_key=vapid_private_key, + vapid_claims=vapid_claims, + content_encoding="aes128gcm", + timeout=cast(Any, (connect_timeout_seconds, DEFAULT_PUSH_READ_TIMEOUT_SECONDS)), + requests_session=session, + ) + return response.status_code # type: ignore[union-attr] + + +def _send_push_with_fallback( + subscription_info: dict, + payload: str, + vapid_private_key: str, + vapid_claims: dict, +) -> int: + """Send using normal dual-stack resolution, then retry with IPv4-only on connect failures.""" + session = _build_default_requests_session() + try: + return _send_push_with_session( + subscription_info=subscription_info, + payload=payload, + vapid_private_key=vapid_private_key, + vapid_claims=vapid_claims, + session=session, + connect_timeout_seconds=DEFAULT_PUSH_CONNECT_TIMEOUT_SECONDS, + ) + except (RequestsConnectTimeout, RequestsConnectionError) as exc: + logger.info("Push delivery retrying via IPv4 after initial network failure: %s", exc) + finally: + session.close() + + session = _build_ipv4_requests_session() + try: + return _send_push_with_session( + subscription_info=subscription_info, + payload=payload, + vapid_private_key=vapid_private_key, + vapid_claims=vapid_claims, + session=session, + connect_timeout_seconds=IPV4_FALLBACK_CONNECT_TIMEOUT_SECONDS, + ) + finally: + session.close() + + +async def send_push( + subscription_info: dict, + payload: str, + vapid_private_key: str, + vapid_claims: dict, +) -> int: + """Send an encrypted push notification. + + Args: + subscription_info: {"endpoint": ..., "keys": {"p256dh": ..., "auth": ...}} + payload: JSON string to encrypt and send + vapid_private_key: base64url-encoded raw EC private key scalar + vapid_claims: {"sub": "mailto:..."} or {"sub": "https://..."} + + Returns: + HTTP status code from the push service. + + Raises: + WebPushException: on push service error (caller handles 404/410 cleanup). + """ + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + None, + lambda: _send_push_with_fallback( + subscription_info, payload, vapid_private_key, vapid_claims + ), + ) diff --git a/app/push/vapid.py b/app/push/vapid.py new file mode 100644 index 0000000..cf0ef9f --- /dev/null +++ b/app/push/vapid.py @@ -0,0 +1,60 @@ +"""VAPID key management for Web Push. + +Generates a P-256 key pair on first use and caches it in app_settings +via ``AppSettingsRepository``. The public key is served to browsers +for ``PushManager.subscribe()``. +""" + +import base64 +import logging + +from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat +from py_vapid import Vapid + +from app.repository.settings import AppSettingsRepository + +logger = logging.getLogger(__name__) + +_cached_private_key: str = "" +_cached_public_key: str = "" + + +async def ensure_vapid_keys() -> tuple[str, str]: + """Read or generate VAPID keys. Call once at startup after DB connect.""" + global _cached_private_key, _cached_public_key + + private, public = await AppSettingsRepository.get_vapid_keys() + if private and public: + _cached_private_key = private + _cached_public_key = public + logger.info("VAPID keys loaded from database") + return _cached_private_key, _cached_public_key + + # Generate new key pair + vapid = Vapid() + vapid.generate_keys() + + # Private key as base64url-encoded raw 32-byte EC scalar — the format + # that pywebpush passes to ``Vapid.from_string()``. + raw_priv = vapid.private_key.private_numbers().private_value.to_bytes(32, "big") # type: ignore[union-attr] + _cached_private_key = base64.urlsafe_b64encode(raw_priv).rstrip(b"=").decode("ascii") + + # Public key as uncompressed P-256 point, base64url-encoded (no padding) + # for the browser Push API's applicationServerKey + raw_pub = vapid.public_key.public_bytes(Encoding.X962, PublicFormat.UncompressedPoint) # type: ignore[union-attr] + _cached_public_key = base64.urlsafe_b64encode(raw_pub).rstrip(b"=").decode("ascii") + + await AppSettingsRepository.set_vapid_keys(_cached_private_key, _cached_public_key) + logger.info("Generated and stored new VAPID key pair") + + return _cached_private_key, _cached_public_key + + +def get_vapid_public_key() -> str: + """Return the cached VAPID public key (base64url). Must call ensure_vapid_keys() first.""" + return _cached_public_key + + +def get_vapid_private_key() -> str: + """Return the cached VAPID private key (base64url). Must call ensure_vapid_keys() first.""" + return _cached_private_key diff --git a/app/repository/push_subscriptions.py b/app/repository/push_subscriptions.py new file mode 100644 index 0000000..b8d1fec --- /dev/null +++ b/app/repository/push_subscriptions.py @@ -0,0 +1,162 @@ +"""Repository for push_subscriptions table.""" + +import logging +import time +import uuid +from typing import Any + +from app.database import db + +logger = logging.getLogger(__name__) + +# Auto-delete subscriptions that have failed this many times consecutively +# without any successful delivery in between. +MAX_CONSECUTIVE_FAILURES = 15 + + +def _row_to_dict(row: Any) -> dict[str, Any]: + return { + "id": row["id"], + "endpoint": row["endpoint"], + "p256dh": row["p256dh"], + "auth": row["auth"], + "label": row["label"] or "", + "created_at": row["created_at"] or 0, + "last_success_at": row["last_success_at"], + "failure_count": row["failure_count"] or 0, + } + + +class PushSubscriptionRepository: + @staticmethod + async def create( + endpoint: str, + p256dh: str, + auth: str, + label: str = "", + ) -> dict[str, Any]: + """Create or upsert a push subscription (keyed by endpoint).""" + sub_id = str(uuid.uuid4()) + now = int(time.time()) + + async with db.tx() as conn: + await conn.execute( + """ + INSERT INTO push_subscriptions + (id, endpoint, p256dh, auth, label, created_at, failure_count) + VALUES (?, ?, ?, ?, ?, ?, 0) + ON CONFLICT(endpoint) DO UPDATE SET + p256dh = excluded.p256dh, + auth = excluded.auth, + label = CASE WHEN excluded.label != '' THEN excluded.label + ELSE push_subscriptions.label END, + failure_count = 0 + """, + (sub_id, endpoint, p256dh, auth, label, now), + ) + async with conn.execute( + "SELECT * FROM push_subscriptions WHERE endpoint = ?", (endpoint,) + ) as cursor: + row = await cursor.fetchone() + + return _row_to_dict(row) if row else {"id": sub_id} # type: ignore[arg-type] + + @staticmethod + async def get(subscription_id: str) -> dict[str, Any] | None: + async with db.readonly() as conn: + async with conn.execute( + "SELECT * FROM push_subscriptions WHERE id = ?", (subscription_id,) + ) as cursor: + row = await cursor.fetchone() + return _row_to_dict(row) if row else None + + @staticmethod + async def get_by_endpoint(endpoint: str) -> dict[str, Any] | None: + async with db.readonly() as conn: + async with conn.execute( + "SELECT * FROM push_subscriptions WHERE endpoint = ?", (endpoint,) + ) as cursor: + row = await cursor.fetchone() + return _row_to_dict(row) if row else None + + @staticmethod + async def get_all() -> list[dict[str, Any]]: + async with db.readonly() as conn: + async with conn.execute( + "SELECT * FROM push_subscriptions ORDER BY created_at DESC" + ) as cursor: + rows = await cursor.fetchall() + return [_row_to_dict(row) for row in rows] + + @staticmethod + async def update(subscription_id: str, **fields: Any) -> dict[str, Any] | None: + updates: list[str] = [] + params: list[Any] = [] + + if "label" in fields: + updates.append("label = ?") + params.append(fields["label"]) + + if not updates: + return await PushSubscriptionRepository.get(subscription_id) + + params.append(subscription_id) + async with db.tx() as conn: + await conn.execute( + f"UPDATE push_subscriptions SET {', '.join(updates)} WHERE id = ?", + params, + ) + async with conn.execute( + "SELECT * FROM push_subscriptions WHERE id = ?", (subscription_id,) + ) as cursor: + row = await cursor.fetchone() + return _row_to_dict(row) if row else None + + @staticmethod + async def delete(subscription_id: str) -> bool: + async with db.tx() as conn: + async with conn.execute( + "DELETE FROM push_subscriptions WHERE id = ?", (subscription_id,) + ) as cursor: + return cursor.rowcount > 0 + + @staticmethod + async def delete_by_endpoint(endpoint: str) -> bool: + async with db.tx() as conn: + async with conn.execute( + "DELETE FROM push_subscriptions WHERE endpoint = ?", (endpoint,) + ) as cursor: + return cursor.rowcount > 0 + + @staticmethod + async def batch_record_outcomes( + success_ids: list[str], failure_ids: list[str], remove_ids: list[str] + ) -> None: + """Batch-update delivery outcomes in a single transaction.""" + now = int(time.time()) + async with db.tx() as conn: + if remove_ids: + placeholders = ",".join("?" for _ in remove_ids) + await conn.execute( + f"DELETE FROM push_subscriptions WHERE id IN ({placeholders})", + remove_ids, + ) + if success_ids: + placeholders = ",".join("?" for _ in success_ids) + await conn.execute( + f"UPDATE push_subscriptions SET last_success_at = ?, failure_count = 0 " + f"WHERE id IN ({placeholders})", + [now, *success_ids], + ) + if failure_ids: + placeholders = ",".join("?" for _ in failure_ids) + await conn.execute( + f"UPDATE push_subscriptions SET failure_count = failure_count + 1 " + f"WHERE id IN ({placeholders})", + failure_ids, + ) + # Evict subscriptions that have exceeded the failure threshold + await conn.execute( + "DELETE FROM push_subscriptions WHERE failure_count >= ?", + (MAX_CONSECUTIVE_FAILURES,), + ) diff --git a/app/repository/settings.py b/app/repository/settings.py index 38bd087..7405eaf 100644 --- a/app/repository/settings.py +++ b/app/repository/settings.py @@ -282,6 +282,85 @@ class AppSettingsRepository: await AppSettingsRepository._apply_updates(conn, blocked_names=new_names) return await AppSettingsRepository._get_in_conn(conn) + @staticmethod + async def get_vapid_keys() -> tuple[str, str]: + """Return (private_key_pem, public_key_b64url) from app_settings. + + These are internal-only columns not exposed via the AppSettings model. + """ + async with db.readonly() as conn: + async with conn.execute( + "SELECT vapid_private_key, vapid_public_key FROM app_settings WHERE id = 1" + ) as cursor: + row = await cursor.fetchone() + if row and row["vapid_private_key"] and row["vapid_public_key"]: + return row["vapid_private_key"], row["vapid_public_key"] + return "", "" + + @staticmethod + async def set_vapid_keys(private_key: str, public_key: str) -> None: + """Persist auto-generated VAPID key pair to app_settings.""" + async with db.tx() as conn: + await conn.execute( + "UPDATE app_settings SET vapid_private_key = ?, vapid_public_key = ? WHERE id = 1", + (private_key, public_key), + ) + + @staticmethod + async def get_push_conversations() -> list[str]: + """Return the global list of push-enabled conversation state keys. + + Internal-only column, not exposed via the AppSettings model. + """ + async with db.readonly() as conn: + async with conn.execute( + "SELECT push_conversations FROM app_settings WHERE id = 1" + ) as cursor: + row = await cursor.fetchone() + if row and row["push_conversations"]: + try: + return json.loads(row["push_conversations"]) + except (json.JSONDecodeError, TypeError): + return [] + return [] + + @staticmethod + async def set_push_conversations(conversations: list[str]) -> list[str]: + """Replace the global push-enabled conversation list.""" + async with db.tx() as conn: + await conn.execute( + "UPDATE app_settings SET push_conversations = ? WHERE id = 1", + (json.dumps(conversations),), + ) + return conversations + + @staticmethod + async def toggle_push_conversation(key: str) -> list[str]: + """Add or remove a conversation state key from the global push list. + + Atomic read-modify-write under a single ``db.tx()`` lock. + """ + async with db.tx() as conn: + async with conn.execute( + "SELECT push_conversations FROM app_settings WHERE id = 1" + ) as cursor: + row = await cursor.fetchone() + current: list[str] = [] + if row and row["push_conversations"]: + try: + current = json.loads(row["push_conversations"]) + except (json.JSONDecodeError, TypeError): + current = [] + if key in current: + current = [k for k in current if k != key] + else: + current.append(key) + await conn.execute( + "UPDATE app_settings SET push_conversations = ? WHERE id = 1", + (json.dumps(current),), + ) + return current + class StatisticsRepository: @staticmethod diff --git a/app/routers/push.py b/app/routers/push.py new file mode 100644 index 0000000..942976c --- /dev/null +++ b/app/routers/push.py @@ -0,0 +1,164 @@ +"""Web Push subscription management endpoints.""" + +import asyncio +import json +import logging + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel, Field +from pywebpush import WebPushException + +from app.push.send import send_push +from app.push.vapid import get_vapid_private_key, get_vapid_public_key +from app.repository.push_subscriptions import PushSubscriptionRepository +from app.repository.settings import AppSettingsRepository + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/push", tags=["push"]) + + +# ── Request/response models ───────────────────────────────────────────── + + +class VapidPublicKeyResponse(BaseModel): + public_key: str + + +class PushSubscribeRequest(BaseModel): + endpoint: str = Field(min_length=1) + p256dh: str = Field(min_length=1) + auth: str = Field(min_length=1) + label: str = "" + + +class PushSubscriptionUpdate(BaseModel): + label: str | None = None + + +class PushConversationToggle(BaseModel): + key: str = Field(min_length=1) + + +# ─��� Endpoints ──────────────────────────────────────────────────────────── + + +@router.get("/vapid-public-key", response_model=VapidPublicKeyResponse) +async def vapid_public_key() -> VapidPublicKeyResponse: + """Return the VAPID public key for browser PushManager.subscribe().""" + key = get_vapid_public_key() + if not key: + raise HTTPException(status_code=503, detail="VAPID keys not initialized") + return VapidPublicKeyResponse(public_key=key) + + +@router.post("/subscribe") +async def subscribe(body: PushSubscribeRequest) -> dict: + """Register or update a push subscription (device). Upserts by endpoint.""" + sub = await PushSubscriptionRepository.create( + endpoint=body.endpoint, + p256dh=body.p256dh, + auth=body.auth, + label=body.label, + ) + return sub + + +@router.get("/subscriptions") +async def list_subscriptions() -> list[dict]: + """List all push subscriptions (devices).""" + return await PushSubscriptionRepository.get_all() + + +@router.patch("/subscriptions/{subscription_id}") +async def update_subscription(subscription_id: str, body: PushSubscriptionUpdate) -> dict: + """Update a subscription's label.""" + existing = await PushSubscriptionRepository.get(subscription_id) + if not existing: + raise HTTPException(status_code=404, detail="Subscription not found") + + updates = {} + if body.label is not None: + updates["label"] = body.label + + result = await PushSubscriptionRepository.update(subscription_id, **updates) + return result or existing + + +@router.delete("/subscriptions/{subscription_id}") +async def unsubscribe(subscription_id: str) -> dict: + """Delete a push subscription (device).""" + deleted = await PushSubscriptionRepository.delete(subscription_id) + if not deleted: + raise HTTPException(status_code=404, detail="Subscription not found") + return {"deleted": True} + + +@router.post("/subscriptions/{subscription_id}/test") +async def test_push(subscription_id: str) -> dict: + """Send a test notification to a subscription.""" + sub = await PushSubscriptionRepository.get(subscription_id) + if not sub: + raise HTTPException(status_code=404, detail="Subscription not found") + + vapid_key = get_vapid_private_key() + if not vapid_key: + raise HTTPException(status_code=503, detail="VAPID keys not initialized") + + payload = json.dumps( + { + "title": "RemoteTerm Test", + "body": "Push notifications are working!", + "tag": "meshcore-test", + "url_hash": "", + } + ) + + try: + async with asyncio.timeout(15): + await send_push( + subscription_info={ + "endpoint": sub["endpoint"], + "keys": {"p256dh": sub["p256dh"], "auth": sub["auth"]}, + }, + payload=payload, + vapid_private_key=vapid_key, + vapid_claims={"sub": "mailto:noreply@meshcore.local"}, + ) + return {"status": "sent"} + except TimeoutError: + raise HTTPException(status_code=504, detail="Push delivery timed out") from None + except WebPushException as e: + status_code = getattr(getattr(e, "response", None), "status_code", 0) + if status_code in (403, 404, 410): + logger.info( + "Test push: subscription stale (HTTP %d), removing %s", + status_code, + subscription_id, + ) + await PushSubscriptionRepository.delete(subscription_id) + raise HTTPException( + status_code=410, + detail="Subscription is stale (VAPID key mismatch or expired). " + "Re-enable push from a conversation header.", + ) from None + logger.warning("Test push failed: %s", e) + raise HTTPException(status_code=502, detail=f"Push delivery failed: {e}") from None + except Exception as e: + logger.warning("Test push failed: %s", e) + raise HTTPException(status_code=502, detail=f"Push delivery failed: {e}") from None + + +# ── Global push conversation management ────────────────────────────────── + + +@router.get("/conversations") +async def get_push_conversations() -> list[str]: + """Return the global list of push-enabled conversation state keys.""" + return await AppSettingsRepository.get_push_conversations() + + +@router.post("/conversations/toggle") +async def toggle_push_conversation(body: PushConversationToggle) -> list[str]: + """Add or remove a conversation from the global push list.""" + return await AppSettingsRepository.toggle_push_conversation(body.key) diff --git a/app/websocket.py b/app/websocket.py index ba08694..1c54235 100644 --- a/app/websocket.py +++ b/app/websocket.py @@ -108,6 +108,10 @@ def broadcast_event(event_type: str, data: dict, *, realtime: bool = True) -> No if event_type == "message": asyncio.create_task(fanout_manager.broadcast_message(data)) + + from app.push.manager import push_manager + + asyncio.create_task(push_manager.dispatch_message(data)) elif event_type == "raw_packet": asyncio.create_task(fanout_manager.broadcast_raw(data)) elif event_type == "contact": diff --git a/frontend/AGENTS.md b/frontend/AGENTS.md index 32ac676..35d8ffd 100644 --- a/frontend/AGENTS.md +++ b/frontend/AGENTS.md @@ -57,6 +57,7 @@ frontend/src/ │ ├── useConversationRouter.ts # URL hash → active conversation routing │ ├── useContactsAndChannels.ts # Contact/channel loading, creation, deletion │ ├── useBrowserNotifications.ts # Per-conversation browser notification preferences + dispatch +│ ├── usePushSubscription.ts # Web Push subscription lifecycle, per-conversation filters │ ├── useFaviconBadge.ts # Browser tab unread badge state │ ├── useRawPacketStatsSession.ts # Session-scoped packet-feed stats history │ └── useRememberedServerPassword.ts # Browser-local repeater/room password persistence @@ -429,6 +430,17 @@ The `SearchView` component (`components/SearchView.tsx`) provides full-text sear - **Bidirectional pagination**: After jumping mid-history, `hasNewerMessages` enables forward pagination via `fetchNewerMessages`. The scroll-to-bottom button calls `jumpToBottom` (re-fetches latest page) instead of just scrolling. - **WS message suppression**: When `hasNewerMessages` is true, incoming WS messages for the active conversation are not added to the message list (the user is viewing historical context, not the latest page). +## Web Push Notifications + +Web Push allows notifications even when the browser tab is closed. Requires HTTPS (self-signed OK). + +- **Service worker**: `frontend/public/sw.js` handles `push` events (show notification) and `notificationclick` (focus/open tab, navigate via `url_hash`). Registered in `main.tsx` on secure contexts only. +- **`usePushSubscription` hook**: manages the full subscription lifecycle — subscribe (register SW → `PushManager.subscribe()` → POST to backend), unsubscribe, global push-conversation toggles, device listing, and deletion. +- **ChatHeader integration**: `BellRing` icon (amber when active) appears next to the existing desktop notification `Bell` on secure contexts. First click subscribes the browser and enables push for that conversation; subsequent clicks toggle the conversation on/off. +- **Settings > Local**: `PushDeviceManagement` component shows subscription status, lists all registered devices with test/delete buttons. Uses `usePushSubscription` hook directly. +- Auto-generates device labels from User-Agent (e.g., "Chrome on macOS"). +- `PushSubscriptionInfo` type in `types.ts`; API methods in `api.ts`. + ## Styling UI styling is mostly utility-class driven (Tailwind-style classes in JSX) plus shared globals in `index.css` and `styles.css`. diff --git a/frontend/index.html b/frontend/index.html index 2a1470f..027c16e 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -15,10 +15,8 @@