Files
Remote-Terminal-for-MeshCore/app/fanout/apprise_mod.py
2026-03-08 13:54:06 -07:00

131 lines
4.4 KiB
Python

"""Fanout module for Apprise push notifications."""
from __future__ import annotations
import asyncio
import logging
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
from app.fanout.base import FanoutModule
from app.path_utils import split_path_hex
logger = logging.getLogger(__name__)
def _parse_urls(raw: str) -> list[str]:
"""Split multi-line URL string into individual URLs."""
return [line.strip() for line in raw.splitlines() if line.strip()]
def _normalize_discord_url(url: str) -> str:
"""Add avatar=no to Discord URLs to suppress identity override."""
parts = urlsplit(url)
scheme = parts.scheme.lower()
host = parts.netloc.lower()
is_discord = scheme in ("discord", "discords") or (
scheme in ("http", "https")
and host in ("discord.com", "discordapp.com")
and parts.path.lower().startswith("/api/webhooks/")
)
if not is_discord:
return url
query = dict(parse_qsl(parts.query, keep_blank_values=True))
query["avatar"] = "no"
return urlunsplit((parts.scheme, parts.netloc, parts.path, urlencode(query), parts.fragment))
def _format_body(data: dict, *, include_path: bool) -> str:
"""Build a human-readable notification body from message data."""
msg_type = data.get("type", "")
text = data.get("text", "")
sender_name = data.get("sender_name") or "Unknown"
via = ""
if include_path:
paths = data.get("paths")
if paths and isinstance(paths, list) and len(paths) > 0:
first_path = paths[0] if isinstance(paths[0], dict) else {}
path_str = first_path.get("path", "")
path_len = first_path.get("path_len")
else:
path_str = None
path_len = None
if msg_type == "PRIV" and path_str is None:
via = " **via:** [`direct`]"
elif path_str is not None:
path_str = path_str.strip().lower()
if path_str == "":
via = " **via:** [`direct`]"
else:
hop_count = path_len if isinstance(path_len, int) else len(path_str) // 2
hops = split_path_hex(path_str, hop_count)
if hops:
hop_list = ", ".join(f"`{h}`" for h in hops)
via = f" **via:** [{hop_list}]"
if msg_type == "PRIV":
return f"**DM:** {sender_name}: {text}{via}"
channel_name = data.get("channel_name") or data.get("conversation_key", "channel")
return f"**{channel_name}:** {sender_name}: {text}{via}"
def _send_sync(urls_raw: str, body: str, *, preserve_identity: bool) -> bool:
"""Send notification synchronously via Apprise. Returns True on success."""
import apprise as apprise_lib
urls = _parse_urls(urls_raw)
if not urls:
return False
notifier = apprise_lib.Apprise()
for url in urls:
if preserve_identity:
url = _normalize_discord_url(url)
notifier.add(url)
return bool(notifier.notify(title="", body=body))
class AppriseModule(FanoutModule):
"""Sends push notifications via Apprise for incoming messages."""
def __init__(self, config_id: str, config: dict, *, name: str = "") -> None:
super().__init__(config_id, config, name=name)
self._last_error: str | None = None
async def on_message(self, data: dict) -> None:
# Skip outgoing messages — only notify on incoming
if data.get("outgoing"):
return
urls = self.config.get("urls", "")
if not urls or not urls.strip():
return
preserve_identity = self.config.get("preserve_identity", True)
include_path = self.config.get("include_path", True)
body = _format_body(data, include_path=include_path)
try:
success = await asyncio.to_thread(
_send_sync, urls, body, preserve_identity=preserve_identity
)
self._last_error = None if success else "Apprise notify returned failure"
if not success:
logger.warning("Apprise notification failed for module %s", self.config_id)
except Exception as exc:
self._last_error = str(exc)
logger.exception("Apprise send error for module %s", self.config_id)
@property
def status(self) -> str:
if not self.config.get("urls", "").strip():
return "disconnected"
if self._last_error:
return "error"
return "connected"