mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
131 lines
4.4 KiB
Python
131 lines
4.4 KiB
Python
"""Fanout module for Apprise push notifications."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
|
|
|
|
from app.fanout.base import FanoutModule
|
|
from app.path_utils import split_path_hex
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _parse_urls(raw: str) -> list[str]:
    """Return the non-blank lines of *raw*, each stripped of surrounding whitespace.

    The input is treated as one URL per line; lines that are empty or
    whitespace-only are dropped.
    """
    stripped_lines = (candidate.strip() for candidate in raw.splitlines())
    return [entry for entry in stripped_lines if entry]
|
|
|
|
|
|
def _normalize_discord_url(url: str) -> str:
    """Add ``avatar=no`` to Discord URLs to suppress identity override.

    A URL counts as a Discord URL when its scheme is ``discord``/``discords``,
    or when it is an ``http(s)`` URL on ``discord.com``/``discordapp.com``
    whose path starts with ``/api/webhooks/``. Any other URL is returned
    unchanged.

    Args:
        url: A single notification URL.

    Returns:
        The URL with its ``avatar`` query parameter forced to ``no`` if it is
        a Discord URL; otherwise the input string as-is. A URL that cannot be
        parsed is also returned as-is rather than raising.
    """
    try:
        parts = urlsplit(url)
    except ValueError:
        # urlsplit rejects some malformed inputs (e.g. unbalanced "[" in the
        # host). Hand the string back untouched so one bad line does not
        # raise out of the caller's loop over all configured URLs.
        return url

    scheme = parts.scheme.lower()
    host = parts.netloc.lower()

    is_discord = scheme in ("discord", "discords") or (
        scheme in ("http", "https")
        and host in ("discord.com", "discordapp.com")
        and parts.path.lower().startswith("/api/webhooks/")
    )
    if not is_discord:
        return url

    # Work on the list of pairs instead of a dict so repeated query keys
    # supplied by the user are preserved. An existing ``avatar`` entry is
    # overwritten in place (extra ``avatar`` entries are dropped); if there
    # is none, ``avatar=no`` is appended.
    pairs: list[tuple[str, str]] = []
    avatar_written = False
    for key, value in parse_qsl(parts.query, keep_blank_values=True):
        if key != "avatar":
            pairs.append((key, value))
        elif not avatar_written:
            pairs.append(("avatar", "no"))
            avatar_written = True
    if not avatar_written:
        pairs.append(("avatar", "no"))

    return urlunsplit((parts.scheme, parts.netloc, parts.path, urlencode(pairs), parts.fragment))
|
|
|
|
|
|
def _route_suffix(data: dict, msg_type: str) -> str:
    """Return the `` **via:** [...]`` suffix for a message, or ``""`` if none applies.

    Only the first entry of ``data["paths"]`` is consulted. A missing path on
    a ``PRIV`` message, or a blank path on any message, is rendered as
    ``direct``.
    """
    direct = " **via:** [`direct`]"

    paths = data.get("paths")
    if paths and isinstance(paths, list) and len(paths) > 0:
        head = paths[0]
        entry = head if isinstance(head, dict) else {}
        raw_path = entry.get("path", "")
        declared_len = entry.get("path_len")
    else:
        raw_path = None
        declared_len = None

    if raw_path is None:
        # No path information at all: only private messages are labelled.
        return direct if msg_type == "PRIV" else ""

    normalized = raw_path.strip().lower()
    if normalized == "":
        return direct

    # Prefer the hop count supplied with the path; otherwise derive it from
    # the string length at two characters per hop.
    if isinstance(declared_len, int):
        hop_count = declared_len
    else:
        hop_count = len(normalized) // 2

    # NOTE(review): split_path_hex is defined in app.path_utils; presumably it
    # returns the per-hop hex identifiers — confirm against that module.
    hops = split_path_hex(normalized, hop_count)
    if not hops:
        return ""
    rendered = ", ".join(f"`{h}`" for h in hops)
    return f" **via:** [{rendered}]"


def _format_body(data: dict, *, include_path: bool) -> str:
    """Render message data as a single-line, Markdown-flavoured notification body.

    Args:
        data: Message payload. Keys read here: ``type``, ``text``,
            ``sender_name``, ``paths``, ``channel_name`` and
            ``conversation_key``.
        include_path: When true, append a ``**via:**`` routing suffix derived
            from the first entry of ``data["paths"]``.

    Returns:
        ``**DM:** <sender>: <text><via>`` for ``PRIV`` messages, otherwise
        ``**<channel>:** <sender>: <text><via>``.
    """
    msg_type = data.get("type", "")
    text = data.get("text", "")
    sender_name = data.get("sender_name") or "Unknown"

    via = _route_suffix(data, msg_type) if include_path else ""

    if msg_type == "PRIV":
        label = "DM"
    else:
        # Fall back from the channel name to the conversation key, then to a
        # generic placeholder.
        label = data.get("channel_name") or data.get("conversation_key", "channel")

    return f"**{label}:** {sender_name}: {text}{via}"
|
|
|
|
|
|
def _send_sync(urls_raw: str, body: str, *, preserve_identity: bool) -> bool:
    """Deliver *body* to every URL in *urls_raw* through Apprise (blocking call).

    Args:
        urls_raw: Newline-separated notification URLs.
        body: Notification text; it is sent with an empty title.
        preserve_identity: When true, each URL is passed through
            ``_normalize_discord_url`` before being registered.

    Returns:
        ``False`` when *urls_raw* contains no URLs; otherwise the truthiness
        of what ``Apprise.notify`` returns.
    """
    # Imported here rather than at module level, so the apprise package is
    # only loaded when a notification is actually sent.
    import apprise as apprise_lib

    targets = _parse_urls(urls_raw)
    if not targets:
        return False

    notifier = apprise_lib.Apprise()
    for target in targets:
        notifier.add(_normalize_discord_url(target) if preserve_identity else target)

    delivered = notifier.notify(title="", body=body)
    return bool(delivered)
|
|
|
|
|
|
class AppriseModule(FanoutModule):
    """Sends push notifications via Apprise for incoming messages.

    Configuration keys read from ``self.config``:
        urls: Newline-separated Apprise URLs. Empty or missing disables sending.
        preserve_identity: Defaults to ``True``; forwarded to ``_send_sync``.
        include_path: Defaults to ``True``; forwarded to ``_format_body``.
    """

    def __init__(self, config_id: str, config: dict, *, name: str = "") -> None:
        super().__init__(config_id, config, name=name)
        # Description of the most recent delivery failure; None once a send
        # succeeds. Drives the "error" state reported by ``status``.
        self._last_error: str | None = None

    async def on_message(self, data: dict) -> None:
        """Send a notification for an incoming message.

        Outgoing messages and modules without configured URLs are ignored.
        Delivery failures are logged and recorded in ``_last_error``; they
        are not raised to the caller.
        """
        # Skip outgoing messages — only notify on incoming
        if data.get("outgoing"):
            return

        urls = self.config.get("urls", "")
        if not urls or not urls.strip():
            return

        preserve_identity = self.config.get("preserve_identity", True)
        include_path = self.config.get("include_path", True)
        body = _format_body(data, include_path=include_path)

        try:
            # _send_sync is a blocking call, so run it in a worker thread to
            # keep the event loop free.
            success = await asyncio.to_thread(
                _send_sync, urls, body, preserve_identity=preserve_identity
            )
            self._last_error = None if success else "Apprise notify returned failure"
            if not success:
                logger.warning("Apprise notification failed for module %s", self.config_id)
        except Exception as exc:
            # Best-effort delivery: record and log the failure, never raise.
            self._last_error = str(exc)
            logger.exception("Apprise send error for module %s", self.config_id)

    @property
    def status(self) -> str:
        """Return ``"disconnected"``, ``"error"`` or ``"connected"``.

        ``"disconnected"`` means no URLs are configured; ``"error"`` means the
        last send attempt failed; otherwise ``"connected"``.
        """
        # ``or ""`` covers a key that is present but set to None, which
        # on_message already treats as "no URLs"; calling .strip() on None
        # would raise AttributeError here.
        if not (self.config.get("urls") or "").strip():
            return "disconnected"
        if self._last_error:
            return "error"
        return "connected"
|