Add LetsMesh compatibility ingest, decoder integration, and admin auth updates

This commit is contained in:
yellowcooln
2026-02-21 21:40:14 -05:00
parent 3eff7f03db
commit 2f40b4a730
36 changed files with 2799 additions and 101 deletions

View File

@@ -51,8 +51,12 @@ def create_app(
admin_key: str | None = None,
mqtt_host: str = "localhost",
mqtt_port: int = 1883,
mqtt_username: str | None = None,
mqtt_password: str | None = None,
mqtt_prefix: str = "meshcore",
mqtt_tls: bool = False,
mqtt_transport: str = "tcp",
mqtt_ws_path: str = "/mqtt",
cors_origins: list[str] | None = None,
metrics_enabled: bool = True,
metrics_cache_ttl: int = 60,
@@ -65,8 +69,12 @@ def create_app(
admin_key: Admin API key
mqtt_host: MQTT broker host
mqtt_port: MQTT broker port
mqtt_username: MQTT username
mqtt_password: MQTT password
mqtt_prefix: MQTT topic prefix
mqtt_tls: Enable TLS/SSL for MQTT connection
mqtt_transport: MQTT transport protocol (tcp or websockets)
mqtt_ws_path: WebSocket path (used when transport=websockets)
cors_origins: Allowed CORS origins
metrics_enabled: Enable Prometheus metrics endpoint at /metrics
metrics_cache_ttl: Seconds to cache metrics output
@@ -90,8 +98,12 @@ def create_app(
app.state.admin_key = admin_key
app.state.mqtt_host = mqtt_host
app.state.mqtt_port = mqtt_port
app.state.mqtt_username = mqtt_username
app.state.mqtt_password = mqtt_password
app.state.mqtt_prefix = mqtt_prefix
app.state.mqtt_tls = mqtt_tls
app.state.mqtt_transport = mqtt_transport
app.state.mqtt_ws_path = mqtt_ws_path
app.state.metrics_cache_ttl = metrics_cache_ttl
# Configure CORS

View File

@@ -60,11 +60,25 @@ import click
envvar="MQTT_PORT",
help="MQTT broker port",
)
@click.option(
"--mqtt-username",
type=str,
default=None,
envvar="MQTT_USERNAME",
help="MQTT username",
)
@click.option(
"--mqtt-password",
type=str,
default=None,
envvar="MQTT_PASSWORD",
help="MQTT password",
)
@click.option(
"--mqtt-prefix",
type=str,
default="meshcore",
envvar="MQTT_TOPIC_PREFIX",
envvar=["MQTT_PREFIX", "MQTT_TOPIC_PREFIX"],
help="MQTT topic prefix",
)
@click.option(
@@ -74,6 +88,20 @@ import click
envvar="MQTT_TLS",
help="Enable TLS/SSL for MQTT connection",
)
@click.option(
"--mqtt-transport",
type=click.Choice(["tcp", "websockets"], case_sensitive=False),
default="tcp",
envvar="MQTT_TRANSPORT",
help="MQTT transport protocol",
)
@click.option(
"--mqtt-ws-path",
type=str,
default="/mqtt",
envvar="MQTT_WS_PATH",
help="MQTT WebSocket path (used when transport=websockets)",
)
@click.option(
"--cors-origins",
type=str,
@@ -111,8 +139,12 @@ def api(
admin_key: str | None,
mqtt_host: str,
mqtt_port: int,
mqtt_username: str | None,
mqtt_password: str | None,
mqtt_prefix: str,
mqtt_tls: bool,
mqtt_transport: str,
mqtt_ws_path: str,
cors_origins: str | None,
metrics_enabled: bool,
metrics_cache_ttl: int,
@@ -161,6 +193,7 @@ def api(
click.echo(f"Data home: {effective_data_home}")
click.echo(f"Database: {effective_db_url}")
click.echo(f"MQTT: {mqtt_host}:{mqtt_port} (prefix: {mqtt_prefix})")
click.echo(f"MQTT transport: {mqtt_transport} (ws_path: {mqtt_ws_path})")
click.echo(f"Read key configured: {read_key is not None}")
click.echo(f"Admin key configured: {admin_key is not None}")
click.echo(f"CORS origins: {cors_origins or 'none'}")
@@ -195,8 +228,12 @@ def api(
admin_key=admin_key,
mqtt_host=mqtt_host,
mqtt_port=mqtt_port,
mqtt_username=mqtt_username,
mqtt_password=mqtt_password,
mqtt_prefix=mqtt_prefix,
mqtt_tls=mqtt_tls,
mqtt_transport=mqtt_transport,
mqtt_ws_path=mqtt_ws_path,
cors_origins=origins_list,
metrics_enabled=metrics_enabled,
metrics_cache_ttl=metrics_cache_ttl,

View File

@@ -56,17 +56,25 @@ def get_mqtt_client(request: Request) -> MQTTClient:
"""
mqtt_host = getattr(request.app.state, "mqtt_host", "localhost")
mqtt_port = getattr(request.app.state, "mqtt_port", 1883)
mqtt_username = getattr(request.app.state, "mqtt_username", None)
mqtt_password = getattr(request.app.state, "mqtt_password", None)
mqtt_prefix = getattr(request.app.state, "mqtt_prefix", "meshcore")
mqtt_tls = getattr(request.app.state, "mqtt_tls", False)
mqtt_transport = getattr(request.app.state, "mqtt_transport", "tcp")
mqtt_ws_path = getattr(request.app.state, "mqtt_ws_path", "/mqtt")
# Use unique client ID to allow multiple API instances
unique_id = uuid.uuid4().hex[:8]
config = MQTTConfig(
host=mqtt_host,
port=mqtt_port,
username=mqtt_username,
password=mqtt_password,
prefix=mqtt_prefix,
client_id=f"meshcore-api-{unique_id}",
tls=mqtt_tls,
transport=mqtt_transport,
ws_path=mqtt_ws_path,
)
client = MQTTClient(config)

View File

@@ -48,7 +48,44 @@ async def list_nodes(
)
if adv_type:
query = query.where(Node.adv_type == adv_type)
normalized_adv_type = adv_type.strip().lower()
if normalized_adv_type == "repeater":
query = query.where(
or_(
Node.adv_type == "repeater",
Node.adv_type.ilike("%repeater%"),
Node.adv_type.ilike("%relay%"),
Node.name.ilike("%repeater%"),
Node.name.ilike("%relay%"),
)
)
elif normalized_adv_type == "companion":
query = query.where(
or_(
Node.adv_type == "companion",
Node.adv_type.ilike("%companion%"),
Node.adv_type.ilike("%observer%"),
Node.name.ilike("%companion%"),
Node.name.ilike("%observer%"),
)
)
elif normalized_adv_type == "room":
query = query.where(
or_(
Node.adv_type == "room",
Node.adv_type.ilike("%room%"),
Node.name.ilike("%room%"),
)
)
elif normalized_adv_type == "chat":
query = query.where(
or_(
Node.adv_type == "chat",
Node.adv_type.ilike("%chat%"),
)
)
else:
query = query.where(Node.adv_type == adv_type)
if member_id:
# Filter nodes that have a member_id tag with the specified value

View File

@@ -54,6 +54,31 @@ if TYPE_CHECKING:
envvar="MQTT_TLS",
help="Enable TLS/SSL for MQTT connection",
)
@click.option(
"--mqtt-transport",
type=click.Choice(["tcp", "websockets"], case_sensitive=False),
default="tcp",
envvar="MQTT_TRANSPORT",
help="MQTT transport protocol",
)
@click.option(
"--mqtt-ws-path",
type=str,
default="/mqtt",
envvar="MQTT_WS_PATH",
help="MQTT WebSocket path (used when transport=websockets)",
)
@click.option(
"--ingest-mode",
"collector_ingest_mode",
type=click.Choice(["native", "letsmesh_upload"], case_sensitive=False),
default="native",
envvar="COLLECTOR_INGEST_MODE",
help=(
"Collector ingest mode: native MeshCore events or LetsMesh upload "
"(packets/status/internal)"
),
)
@click.option(
"--data-home",
type=str,
@@ -90,6 +115,9 @@ def collector(
mqtt_password: str | None,
prefix: str,
mqtt_tls: bool,
mqtt_transport: str,
mqtt_ws_path: str,
collector_ingest_mode: str,
data_home: str | None,
seed_home: str | None,
database_url: str | None,
@@ -134,6 +162,9 @@ def collector(
ctx.obj["mqtt_password"] = mqtt_password
ctx.obj["prefix"] = prefix
ctx.obj["mqtt_tls"] = mqtt_tls
ctx.obj["mqtt_transport"] = mqtt_transport
ctx.obj["mqtt_ws_path"] = mqtt_ws_path
ctx.obj["collector_ingest_mode"] = collector_ingest_mode
ctx.obj["data_home"] = data_home or settings.data_home
ctx.obj["seed_home"] = settings.effective_seed_home
ctx.obj["database_url"] = effective_db_url
@@ -149,6 +180,9 @@ def collector(
mqtt_password=mqtt_password,
prefix=prefix,
mqtt_tls=mqtt_tls,
mqtt_transport=mqtt_transport,
mqtt_ws_path=mqtt_ws_path,
ingest_mode=collector_ingest_mode,
database_url=effective_db_url,
log_level=log_level,
data_home=data_home or settings.data_home,
@@ -163,6 +197,9 @@ def _run_collector_service(
mqtt_password: str | None,
prefix: str,
mqtt_tls: bool,
mqtt_transport: str,
mqtt_ws_path: str,
ingest_mode: str,
database_url: str,
log_level: str,
data_home: str,
@@ -191,6 +228,8 @@ def _run_collector_service(
click.echo(f"Data home: {data_home}")
click.echo(f"Seed home: {seed_home}")
click.echo(f"MQTT: {mqtt_host}:{mqtt_port} (prefix: {prefix})")
click.echo(f"MQTT transport: {mqtt_transport} (ws_path: {mqtt_ws_path})")
click.echo(f"Ingest mode: {ingest_mode}")
click.echo(f"Database: {database_url}")
# Load webhook configuration from settings
@@ -198,6 +237,7 @@ def _run_collector_service(
WebhookDispatcher,
create_webhooks_from_settings,
)
from meshcore_hub.collector.letsmesh_decoder import LetsMeshPacketDecoder
from meshcore_hub.common.config import get_collector_settings
settings = get_collector_settings()
@@ -234,6 +274,24 @@ def _run_collector_service(
if settings.data_retention_enabled or settings.node_cleanup_enabled:
click.echo(f" Interval: {settings.data_retention_interval_hours} hours")
if ingest_mode.lower() == "letsmesh_upload":
click.echo("")
click.echo("LetsMesh decode configuration:")
if settings.collector_letsmesh_decoder_enabled:
builtin_keys = len(LetsMeshPacketDecoder.BUILTIN_CHANNEL_KEYS)
env_keys = len(settings.collector_letsmesh_decoder_keys_list)
click.echo(
" Decoder: Enabled " f"({settings.collector_letsmesh_decoder_command})"
)
click.echo(f" Built-in keys: {builtin_keys}")
click.echo(" Additional keys from .env: " f"{env_keys} configured")
click.echo(
" Timeout: "
f"{settings.collector_letsmesh_decoder_timeout_seconds:.2f}s"
)
else:
click.echo(" Decoder: Disabled")
click.echo("")
click.echo("Starting MQTT subscriber...")
run_collector(
@@ -243,6 +301,9 @@ def _run_collector_service(
mqtt_password=mqtt_password,
mqtt_prefix=prefix,
mqtt_tls=mqtt_tls,
mqtt_transport=mqtt_transport,
mqtt_ws_path=mqtt_ws_path,
ingest_mode=ingest_mode,
database_url=database_url,
webhook_dispatcher=webhook_dispatcher,
cleanup_enabled=settings.data_retention_enabled,
@@ -250,6 +311,12 @@ def _run_collector_service(
cleanup_interval_hours=settings.data_retention_interval_hours,
node_cleanup_enabled=settings.node_cleanup_enabled,
node_cleanup_days=settings.node_cleanup_days,
letsmesh_decoder_enabled=settings.collector_letsmesh_decoder_enabled,
letsmesh_decoder_command=settings.collector_letsmesh_decoder_command,
letsmesh_decoder_channel_keys=settings.collector_letsmesh_decoder_keys_list,
letsmesh_decoder_timeout_seconds=(
settings.collector_letsmesh_decoder_timeout_seconds
),
)
@@ -267,6 +334,9 @@ def run_cmd(ctx: click.Context) -> None:
mqtt_password=ctx.obj["mqtt_password"],
prefix=ctx.obj["prefix"],
mqtt_tls=ctx.obj["mqtt_tls"],
mqtt_transport=ctx.obj["mqtt_transport"],
mqtt_ws_path=ctx.obj["mqtt_ws_path"],
ingest_mode=ctx.obj["collector_ingest_mode"],
database_url=ctx.obj["database_url"],
log_level=ctx.obj["log_level"],
data_home=ctx.obj["data_home"],

View File

@@ -14,6 +14,20 @@ from meshcore_hub.common.models import Advertisement, Node, add_event_receiver
logger = logging.getLogger(__name__)
def _coerce_float(value: Any) -> float | None:
"""Convert int/float/string values to float when possible."""
if value is None:
return None
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
try:
return float(value.strip())
except ValueError:
return None
return None
def handle_advertisement(
public_key: str,
event_type: str,
@@ -40,6 +54,22 @@ def handle_advertisement(
name = payload.get("name")
adv_type = payload.get("adv_type")
flags = payload.get("flags")
lat = payload.get("lat")
lon = payload.get("lon")
if lat is None:
lat = payload.get("adv_lat")
if lon is None:
lon = payload.get("adv_lon")
location = payload.get("location")
if isinstance(location, dict):
if lat is None:
lat = location.get("latitude")
if lon is None:
lon = location.get("longitude")
lat = _coerce_float(lat)
lon = _coerce_float(lon)
now = datetime.now(timezone.utc)
# Compute event hash for deduplication (30-second time bucket)
@@ -79,6 +109,10 @@ def handle_advertisement(
node_query = select(Node).where(Node.public_key == adv_public_key)
node = session.execute(node_query).scalar_one_or_none()
if node:
if lat is not None:
node.lat = lat
if lon is not None:
node.lon = lon
node.last_seen = now
# Add this receiver to the junction table
@@ -110,6 +144,10 @@ def handle_advertisement(
node.adv_type = adv_type
if flags is not None:
node.flags = flags
if lat is not None:
node.lat = lat
if lon is not None:
node.lon = lon
node.last_seen = now
else:
# Create new node
@@ -120,6 +158,8 @@ def handle_advertisement(
flags=flags,
first_seen=now,
last_seen=now,
lat=lat,
lon=lon,
)
session.add(node)
session.flush()

View File

@@ -70,7 +70,7 @@ def _handle_message(
now = datetime.now(timezone.utc)
# Extract fields based on message type
pubkey_prefix = payload.get("pubkey_prefix") if message_type == "contact" else None
pubkey_prefix = payload.get("pubkey_prefix")
channel_idx = payload.get("channel_idx") if message_type == "channel" else None
path_len = payload.get("path_len")
txt_type = payload.get("txt_type")

View File

@@ -0,0 +1,272 @@
"""LetsMesh packet decoder integration.
Provides an optional bridge to the external `meshcore-decoder` CLI so the
collector can turn LetsMesh upload `raw` packet hex into decoded message data.
"""
from __future__ import annotations
import hashlib
import json
import logging
import shlex
import shutil
import string
import subprocess
from typing import Any, NamedTuple
logger = logging.getLogger(__name__)
class LetsMeshPacketDecoder:
"""Decode LetsMesh packet payloads with `meshcore-decoder` CLI."""
class ChannelKey(NamedTuple):
"""Channel key metadata for decryption and channel labeling."""
label: str | None
key_hex: str
channel_hash: str
# Built-in keys required by your deployment.
# - Public channel
# - #test channel
BUILTIN_CHANNEL_KEYS: tuple[tuple[str, str], ...] = (
("Public", "8B3387E9C5CDEA6AC9E5EDBAA115CD72"),
("test", "9CD8FCF22A47333B591D96A2B848B73F"),
)
def __init__(
self,
enabled: bool = True,
command: str = "meshcore-decoder",
channel_keys: list[str] | None = None,
timeout_seconds: float = 2.0,
) -> None:
self._enabled = enabled
self._command_tokens = shlex.split(command.strip()) if command.strip() else []
self._channel_key_infos = self._normalize_channel_keys(channel_keys or [])
self._channel_keys = [info.key_hex for info in self._channel_key_infos]
self._channel_names_by_hash = {
info.channel_hash: info.label
for info in self._channel_key_infos
if info.label
}
self._decode_cache: dict[str, dict[str, Any] | None] = {}
self._decode_cache_maxsize = 2048
self._timeout_seconds = timeout_seconds
self._checked_command = False
self._command_available = False
self._warned_unavailable = False
@classmethod
def _normalize_channel_keys(cls, values: list[str]) -> list[ChannelKey]:
"""Normalize key list (labels + key + channel hash, deduplicated)."""
normalized: list[LetsMeshPacketDecoder.ChannelKey] = []
seen_keys: set[str] = set()
for label, key in cls.BUILTIN_CHANNEL_KEYS:
entry = cls._normalize_channel_entry(f"{label}={key}")
if not entry:
continue
if entry.key_hex in seen_keys:
continue
normalized.append(entry)
seen_keys.add(entry.key_hex)
for value in values:
entry = cls._normalize_channel_entry(value)
if not entry:
continue
if entry.key_hex in seen_keys:
continue
normalized.append(entry)
seen_keys.add(entry.key_hex)
return normalized
@classmethod
def _normalize_channel_entry(cls, value: str | None) -> ChannelKey | None:
"""Normalize one key entry (`label=hex`, `label:hex`, or `hex`)."""
if value is None:
return None
candidate = value.strip()
if not candidate:
return None
label: str | None = None
key_candidate = candidate
for separator in ("=", ":"):
if separator not in candidate:
continue
left, right = candidate.split(separator, 1)
right = right.strip()
right = right.removeprefix("0x").removeprefix("0X").strip()
if right and cls._is_hex(right):
label = left.strip().lstrip("#")
key_candidate = right
break
key_candidate = key_candidate.strip()
key_candidate = key_candidate.removeprefix("0x").removeprefix("0X").strip()
if not key_candidate or not cls._is_hex(key_candidate):
return None
key_hex = key_candidate.upper()
channel_hash = cls._compute_channel_hash(key_hex)
normalized_label = label.strip() if label and label.strip() else None
return cls.ChannelKey(
label=normalized_label,
key_hex=key_hex,
channel_hash=channel_hash,
)
@staticmethod
def _is_hex(value: str) -> bool:
"""Return True if string contains only hex digits."""
return bool(value) and all(char in string.hexdigits for char in value)
@staticmethod
def _compute_channel_hash(key_hex: str) -> str:
"""Compute channel hash (first byte of SHA-256 of channel key)."""
return hashlib.sha256(bytes.fromhex(key_hex)).digest()[:1].hex().upper()
def channel_name_from_decoded(
self,
decoded_packet: dict[str, Any] | None,
) -> str | None:
"""Resolve channel label from decoded payload channel hash."""
if not isinstance(decoded_packet, dict):
return None
payload = decoded_packet.get("payload")
if not isinstance(payload, dict):
return None
decoded = payload.get("decoded")
if not isinstance(decoded, dict):
return None
channel_hash = decoded.get("channelHash")
if not isinstance(channel_hash, str):
return None
return self._channel_names_by_hash.get(channel_hash.upper())
def channel_labels_by_index(self) -> dict[int, str]:
"""Return channel labels keyed by numeric channel index (0-255)."""
labels: dict[int, str] = {}
for info in self._channel_key_infos:
if not info.label:
continue
label = info.label.strip()
if not label:
continue
if label.lower() == "public":
normalized_label = "Public"
else:
normalized_label = label if label.startswith("#") else f"#{label}"
channel_idx = int(info.channel_hash, 16)
labels.setdefault(channel_idx, normalized_label)
return labels
def decode_payload(self, payload: dict[str, Any]) -> dict[str, Any] | None:
"""Decode packet payload `raw` hex and return decoded JSON if available."""
raw_hex = payload.get("raw")
if not isinstance(raw_hex, str):
return None
clean_hex = raw_hex.strip()
if not clean_hex:
return None
cached = self._decode_cache.get(clean_hex)
if clean_hex in self._decode_cache:
return cached
decoded = self._decode_raw(clean_hex)
self._decode_cache[clean_hex] = decoded
if len(self._decode_cache) > self._decode_cache_maxsize:
# Drop oldest cached payload (insertion-order dict).
self._decode_cache.pop(next(iter(self._decode_cache)))
return decoded
def _decode_raw(self, raw_hex: str) -> dict[str, Any] | None:
"""Decode raw packet hex with decoder CLI (cached per packet hex)."""
if not self._enabled:
return None
if not self._is_command_available():
return None
command = [*self._command_tokens, "decode", raw_hex, "--json"]
if self._channel_keys:
command.append("--key")
command.extend(self._channel_keys)
try:
result = subprocess.run(
command,
check=False,
capture_output=True,
text=True,
timeout=self._timeout_seconds,
)
except subprocess.TimeoutExpired:
logger.debug(
"LetsMesh decoder timed out after %.2fs",
self._timeout_seconds,
)
return None
except OSError as exc:
logger.debug("LetsMesh decoder failed to execute: %s", exc)
return None
if result.returncode != 0:
stderr = result.stderr.strip() if result.stderr else ""
logger.debug(
"LetsMesh decoder exited with code %s%s",
result.returncode,
f": {stderr}" if stderr else "",
)
return None
output = result.stdout.strip()
if not output:
return None
try:
decoded = json.loads(output)
except json.JSONDecodeError:
logger.debug("LetsMesh decoder returned non-JSON output")
return None
return decoded if isinstance(decoded, dict) else None
def _is_command_available(self) -> bool:
"""Check decoder command availability once."""
if self._checked_command:
return self._command_available
self._checked_command = True
if not self._command_tokens:
self._command_available = False
else:
command = self._command_tokens[0]
if "/" in command:
self._command_available = shutil.which(command) is not None
else:
self._command_available = shutil.which(command) is not None
if not self._command_available and not self._warned_unavailable:
self._warned_unavailable = True
command_text = " ".join(self._command_tokens) or "<empty>"
logger.warning(
"LetsMesh decoder command not found (%s). "
"Messages will remain encrypted placeholders until decoder is installed.",
command_text,
)
return self._command_available

View File

@@ -21,6 +21,7 @@ from typing import Any, Callable, Optional, TYPE_CHECKING
from meshcore_hub.common.database import DatabaseManager
from meshcore_hub.common.health import HealthReporter
from meshcore_hub.common.mqtt import MQTTClient, MQTTConfig
from meshcore_hub.collector.letsmesh_decoder import LetsMeshPacketDecoder
if TYPE_CHECKING:
from meshcore_hub.collector.webhook import WebhookDispatcher
@@ -35,6 +36,9 @@ EventHandler = Callable[[str, str, dict[str, Any], DatabaseManager], None]
class Subscriber:
"""MQTT Subscriber for collecting and storing MeshCore events."""
INGEST_MODE_NATIVE = "native"
INGEST_MODE_LETSMESH_UPLOAD = "letsmesh_upload"
def __init__(
self,
mqtt_client: MQTTClient,
@@ -45,6 +49,11 @@ class Subscriber:
cleanup_interval_hours: int = 24,
node_cleanup_enabled: bool = False,
node_cleanup_days: int = 90,
ingest_mode: str = INGEST_MODE_NATIVE,
letsmesh_decoder_enabled: bool = True,
letsmesh_decoder_command: str = "meshcore-decoder",
letsmesh_decoder_channel_keys: list[str] | None = None,
letsmesh_decoder_timeout_seconds: float = 2.0,
):
"""Initialize subscriber.
@@ -57,6 +66,11 @@ class Subscriber:
cleanup_interval_hours: Hours between cleanup runs
node_cleanup_enabled: Enable automatic cleanup of inactive nodes
node_cleanup_days: Remove nodes not seen for this many days
ingest_mode: Ingest mode ('native' or 'letsmesh_upload')
letsmesh_decoder_enabled: Enable external LetsMesh packet decoder
letsmesh_decoder_command: Decoder CLI command
letsmesh_decoder_channel_keys: Optional channel keys for decrypting group text
letsmesh_decoder_timeout_seconds: Decoder CLI timeout
"""
self.mqtt = mqtt_client
self.db = db_manager
@@ -79,6 +93,18 @@ class Subscriber:
self._node_cleanup_days = node_cleanup_days
self._cleanup_thread: Optional[threading.Thread] = None
self._last_cleanup: Optional[datetime] = None
self._ingest_mode = ingest_mode.lower()
if self._ingest_mode not in {
self.INGEST_MODE_NATIVE,
self.INGEST_MODE_LETSMESH_UPLOAD,
}:
raise ValueError(f"Unsupported collector ingest mode: {ingest_mode}")
self._letsmesh_decoder = LetsMeshPacketDecoder(
enabled=letsmesh_decoder_enabled,
command=letsmesh_decoder_command,
channel_keys=letsmesh_decoder_channel_keys,
timeout_seconds=letsmesh_decoder_timeout_seconds,
)
@property
def is_healthy(self) -> bool:
@@ -125,14 +151,702 @@ class Subscriber:
pattern: Subscription pattern
payload: Message payload
"""
# Parse event from topic
parsed = self.mqtt.topic_builder.parse_event_topic(topic)
parsed: tuple[str, str, dict[str, Any]] | None
if self._ingest_mode == self.INGEST_MODE_LETSMESH_UPLOAD:
parsed = self._normalize_letsmesh_event(topic, payload)
else:
parsed_event = self.mqtt.topic_builder.parse_event_topic(topic)
parsed = (
(parsed_event[0], parsed_event[1], payload) if parsed_event else None
)
if not parsed:
logger.warning(f"Could not parse event topic: {topic}")
logger.warning(
"Could not parse topic for ingest mode %s: %s",
self._ingest_mode,
topic,
)
return
public_key, event_type = parsed
logger.debug(f"Received event: {event_type} from {public_key[:12]}...")
public_key, event_type, normalized_payload = parsed
logger.debug("Received event: %s from %s...", event_type, public_key[:12])
self._dispatch_event(public_key, event_type, normalized_payload)
def _normalize_letsmesh_event(
self,
topic: str,
payload: dict[str, Any],
) -> tuple[str, str, dict[str, Any]] | None:
"""Normalize LetsMesh upload topics to collector event handlers."""
parsed = self.mqtt.topic_builder.parse_letsmesh_upload_topic(topic)
if not parsed:
return None
observer_public_key, feed_type = parsed
if feed_type == "status":
status_public_key = (
payload.get("origin_id")
or payload.get("public_key")
or observer_public_key
)
normalized_payload = dict(payload)
normalized_payload["public_key"] = status_public_key
status_name = payload.get("origin") or payload.get("name")
if status_name and not normalized_payload.get("name"):
normalized_payload["name"] = status_name
normalized_adv_type = self._normalize_letsmesh_adv_type(normalized_payload)
if normalized_adv_type:
normalized_payload["adv_type"] = normalized_adv_type
else:
normalized_payload.pop("adv_type", None)
stats = payload.get("stats")
if (
isinstance(stats, dict)
and "flags" not in normalized_payload
and "debug_flags" in stats
):
normalized_payload["flags"] = stats["debug_flags"]
return observer_public_key, "advertisement", normalized_payload
if feed_type == "packets":
decoded_packet = self._letsmesh_decoder.decode_payload(payload)
normalized_message = self._build_letsmesh_message_payload(
payload,
decoded_packet=decoded_packet,
)
if normalized_message:
event_type, message_payload = normalized_message
return observer_public_key, event_type, message_payload
normalized_advertisement = self._build_letsmesh_advertisement_payload(
payload,
decoded_packet=decoded_packet,
)
if normalized_advertisement:
return observer_public_key, "advertisement", normalized_advertisement
normalized_packet_payload = dict(payload)
if decoded_packet:
normalized_packet_payload["decoded_packet"] = decoded_packet
decoded_payload_type = self._extract_letsmesh_decoder_payload_type(
decoded_packet
)
if decoded_payload_type is not None:
normalized_packet_payload["decoded_payload_type"] = (
decoded_payload_type
)
return observer_public_key, "letsmesh_packet", normalized_packet_payload
if feed_type == "internal":
return observer_public_key, "letsmesh_internal", payload
return None
def _build_letsmesh_message_payload(
self,
payload: dict[str, Any],
decoded_packet: dict[str, Any] | None = None,
) -> tuple[str, dict[str, Any]] | None:
"""Build a message payload from LetsMesh packet data when possible."""
packet_type = self._resolve_letsmesh_packet_type(payload, decoded_packet)
if packet_type == 5:
event_type = "channel_msg_recv"
elif packet_type in {1, 2, 7}:
event_type = "contact_msg_recv"
else:
return None
normalized_payload = dict(payload)
packet_hash = payload.get("hash")
packet_hash_text = packet_hash if isinstance(packet_hash, str) else None
if decoded_packet is None:
decoded_packet = self._letsmesh_decoder.decode_payload(payload)
# In LetsMesh compatibility mode, only show messages that decrypt.
text = self._extract_letsmesh_decoder_text(decoded_packet)
if not text:
logger.debug(
"Skipping LetsMesh packet %s (type=%s): no decryptable text payload",
packet_hash_text or "unknown",
packet_type,
)
return None
txt_type = self._parse_int(payload.get("txt_type"))
if txt_type is None:
txt_type = self._extract_letsmesh_decoder_txt_type(decoded_packet)
normalized_payload["txt_type"] = (
txt_type if txt_type is not None else packet_type
)
normalized_payload["signature"] = payload.get("signature") or packet_hash
path_len = self._parse_path_length(payload.get("path"))
if path_len is None:
path_len = self._extract_letsmesh_decoder_path_length(decoded_packet)
normalized_payload["path_len"] = path_len
sender_timestamp = self._parse_sender_timestamp(payload)
if sender_timestamp is None:
sender_timestamp = self._extract_letsmesh_decoder_sender_timestamp(
decoded_packet
)
if sender_timestamp is not None:
normalized_payload["sender_timestamp"] = sender_timestamp
snr = self._parse_float(payload.get("SNR"))
if snr is None:
snr = self._parse_float(payload.get("snr"))
if snr is not None:
normalized_payload["SNR"] = snr
decoded_sender = self._extract_letsmesh_decoder_sender(
decoded_packet,
packet_type=packet_type,
)
sender_name = self._normalize_sender_name(decoded_sender)
if sender_name:
normalized_payload["sender_name"] = sender_name
if decoded_sender and not normalized_payload.get("pubkey_prefix"):
normalized_prefix = self._normalize_pubkey_prefix(decoded_sender)
if normalized_prefix:
normalized_payload["pubkey_prefix"] = normalized_prefix
if not normalized_payload.get("pubkey_prefix"):
fallback_sender = self._extract_letsmesh_sender_from_payload(payload)
if fallback_sender:
normalized_payload["pubkey_prefix"] = fallback_sender
sender_prefix = self._normalize_pubkey_prefix(
normalized_payload.get("pubkey_prefix")
)
if sender_prefix:
normalized_payload["pubkey_prefix"] = sender_prefix
else:
normalized_payload.pop("pubkey_prefix", None)
channel_idx = self._parse_int(payload.get("channel_idx"))
channel_hash = self._extract_letsmesh_decoder_channel_hash(decoded_packet)
if channel_idx is None and channel_hash:
channel_idx = self._parse_channel_hash_idx(channel_hash)
if channel_idx is not None:
normalized_payload["channel_idx"] = channel_idx
if event_type == "channel_msg_recv":
channel_name = self._letsmesh_decoder.channel_name_from_decoded(
decoded_packet
)
channel_label = self._format_channel_label(
channel_name=channel_name,
channel_hash=channel_hash,
channel_idx=channel_idx,
)
if channel_label:
normalized_payload["channel_name"] = channel_label
normalized_payload["text"] = self._prefix_sender_name(
text,
normalized_payload.get("sender_name"),
)
else:
normalized_payload["text"] = self._prefix_sender_name(
text,
normalized_payload.get("sender_name"),
)
return event_type, normalized_payload
def _build_letsmesh_advertisement_payload(
self,
payload: dict[str, Any],
decoded_packet: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
"""Map decoded LetsMesh packet payloads to advertisement events."""
if decoded_packet is None:
decoded_packet = self._letsmesh_decoder.decode_payload(payload)
if not isinstance(decoded_packet, dict):
return None
decoded_payload_type = self._extract_letsmesh_decoder_payload_type(
decoded_packet
)
# Primary packet forms that carry node identity/role/location metadata.
if decoded_payload_type not in {4, 11}:
return None
decoded_payload = self._extract_letsmesh_decoder_payload(decoded_packet)
if not decoded_payload:
return None
public_key = self._normalize_full_public_key(
decoded_payload.get("publicKey")
or payload.get("public_key")
or payload.get("origin_id")
)
if not public_key:
return None
normalized_payload: dict[str, Any] = {
"public_key": public_key,
}
app_data = decoded_payload.get("appData")
if isinstance(app_data, dict):
name = app_data.get("name")
if isinstance(name, str) and name.strip():
normalized_payload["name"] = name.strip()
flags = self._parse_int(app_data.get("flags"))
if flags is not None:
normalized_payload["flags"] = flags
device_role = app_data.get("deviceRole")
role_name = self._normalize_letsmesh_node_type(device_role)
if role_name:
normalized_payload["adv_type"] = role_name
location = app_data.get("location")
if isinstance(location, dict):
lat = self._parse_float(location.get("latitude"))
lon = self._parse_float(location.get("longitude"))
if lat is not None:
normalized_payload["lat"] = lat
if lon is not None:
normalized_payload["lon"] = lon
if "name" not in normalized_payload:
status_name = payload.get("origin") or payload.get("name")
if isinstance(status_name, str) and status_name.strip():
normalized_payload["name"] = status_name.strip()
if "flags" not in normalized_payload:
raw_flags = self._parse_int(decoded_payload.get("rawFlags"))
if raw_flags is not None:
normalized_payload["flags"] = raw_flags
if "adv_type" not in normalized_payload:
node_type = self._normalize_letsmesh_node_type(
decoded_payload.get("nodeType")
)
node_type_name = self._normalize_letsmesh_node_type(
decoded_payload.get("nodeTypeName")
)
normalized_adv_type = (
node_type
or node_type_name
or self._normalize_letsmesh_adv_type(normalized_payload)
)
if normalized_adv_type:
normalized_payload["adv_type"] = normalized_adv_type
return normalized_payload
@classmethod
def _extract_letsmesh_text(
cls,
payload: dict[str, Any],
depth: int = 3,
) -> str | None:
"""Extract text from possible LetsMesh packet payload fields."""
if depth < 0:
return None
for key in ("text", "message", "msg", "body", "content"):
value = payload.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
for nested in payload.values():
if not isinstance(nested, dict):
continue
text = cls._extract_letsmesh_text(nested, depth=depth - 1)
if text:
return text
return None
@classmethod
def _extract_letsmesh_decoder_text(
cls,
decoded_packet: dict[str, Any] | None,
) -> str | None:
"""Extract human-readable text from decoder JSON output."""
if not isinstance(decoded_packet, dict):
return None
payload = decoded_packet.get("payload")
if not isinstance(payload, dict):
return None
return cls._extract_letsmesh_text(payload)
@classmethod
def _extract_letsmesh_decoder_sender_timestamp(
cls,
decoded_packet: dict[str, Any] | None,
) -> int | None:
"""Extract sender timestamp from decoder JSON output."""
if not isinstance(decoded_packet, dict):
return None
payload = decoded_packet.get("payload")
if not isinstance(payload, dict):
return None
decoded = payload.get("decoded")
if not isinstance(decoded, dict):
return None
decrypted = decoded.get("decrypted")
if not isinstance(decrypted, dict):
return None
return cls._parse_int(decrypted.get("timestamp"))
@classmethod
def _extract_letsmesh_decoder_sender(
cls,
decoded_packet: dict[str, Any] | None,
packet_type: int | None = None,
) -> str | None:
"""Extract sender identifier from decoder JSON output."""
if not isinstance(decoded_packet, dict):
return None
payload = decoded_packet.get("payload")
if not isinstance(payload, dict):
return None
decoded = payload.get("decoded")
if not isinstance(decoded, dict):
return None
decrypted = decoded.get("decrypted")
if not isinstance(decrypted, dict):
return None
sender = decrypted.get("sender")
if isinstance(sender, str) and sender.strip():
return sender.strip()
source_hash = decoded.get("sourceHash")
if isinstance(source_hash, str) and source_hash.strip():
return source_hash.strip()
return None
@staticmethod
def _extract_letsmesh_decoder_payload(
decoded_packet: dict[str, Any] | None,
) -> dict[str, Any] | None:
"""Extract decoded packet payload object."""
if not isinstance(decoded_packet, dict):
return None
payload = decoded_packet.get("payload")
if not isinstance(payload, dict):
return None
decoded = payload.get("decoded")
return decoded if isinstance(decoded, dict) else None
@classmethod
def _extract_letsmesh_decoder_payload_type(
cls,
decoded_packet: dict[str, Any] | None,
) -> int | None:
"""Extract payload type from decoder output."""
if not isinstance(decoded_packet, dict):
return None
payload_type = cls._parse_int(decoded_packet.get("payloadType"))
if payload_type is not None:
return payload_type
decoded = cls._extract_letsmesh_decoder_payload(decoded_packet)
if not decoded:
return None
return cls._parse_int(decoded.get("type"))
@classmethod
def _resolve_letsmesh_packet_type(
cls,
payload: dict[str, Any],
decoded_packet: dict[str, Any] | None = None,
) -> int | None:
"""Resolve packet type from source payload with decoder fallback."""
packet_type = cls._parse_int(payload.get("packet_type"))
if packet_type is not None:
return packet_type
return cls._extract_letsmesh_decoder_payload_type(decoded_packet)
@staticmethod
def _extract_letsmesh_sender_from_payload(payload: dict[str, Any]) -> str | None:
"""Extract sender-like identifiers from LetsMesh upload payload fields."""
for key in (
"pubkey_prefix",
"sourceHash",
"source_hash",
"source",
"sender",
"from",
"src",
):
value = payload.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
return None
@classmethod
def _extract_letsmesh_decoder_txt_type(
cls,
decoded_packet: dict[str, Any] | None,
) -> int | None:
"""Extract txt_type equivalent from decoder output."""
if not isinstance(decoded_packet, dict):
return None
return cls._parse_int(decoded_packet.get("payloadType"))
@classmethod
def _extract_letsmesh_decoder_path_length(
cls,
decoded_packet: dict[str, Any] | None,
) -> int | None:
"""Extract path length from decoder output."""
if not isinstance(decoded_packet, dict):
return None
return cls._parse_int(decoded_packet.get("pathLength"))
@classmethod
def _extract_letsmesh_decoder_channel_hash(
cls,
decoded_packet: dict[str, Any] | None,
) -> str | None:
"""Extract channel hash (1-byte hex) from decoder output."""
if not isinstance(decoded_packet, dict):
return None
payload = decoded_packet.get("payload")
if not isinstance(payload, dict):
return None
decoded = payload.get("decoded")
if not isinstance(decoded, dict):
return None
channel_hash = decoded.get("channelHash")
if not isinstance(channel_hash, str):
return None
normalized = channel_hash.strip().upper()
if len(normalized) != 2:
return None
if any(ch not in "0123456789ABCDEF" for ch in normalized):
return None
return normalized
@staticmethod
def _normalize_full_public_key(value: Any) -> str | None:
"""Normalize full node public key (64 hex chars)."""
if not isinstance(value, str):
return None
normalized = value.strip().removeprefix("0x").removeprefix("0X").upper()
if len(normalized) != 64:
return None
if any(ch not in "0123456789ABCDEF" for ch in normalized):
return None
return normalized
@staticmethod
def _normalize_pubkey_prefix(value: Any) -> str | None:
"""Normalize sender key/prefix to 12 uppercase hex characters."""
if not isinstance(value, str):
return None
normalized = value.strip().removeprefix("0x").removeprefix("0X").upper()
if not normalized:
return None
if any(ch not in "0123456789ABCDEF" for ch in normalized):
return None
if len(normalized) < 8:
return None
return normalized[:12]
@staticmethod
def _parse_channel_hash_idx(channel_hash: str) -> int | None:
"""Convert 1-byte channel hash hex string into a stable numeric index."""
normalized = channel_hash.strip().upper()
if len(normalized) != 2:
return None
if any(ch not in "0123456789ABCDEF" for ch in normalized):
return None
return int(normalized, 16)
@staticmethod
def _format_channel_label(
channel_name: str | None,
channel_hash: str | None,
channel_idx: int | None,
) -> str | None:
"""Format a display label for channel messages."""
if channel_name and channel_name.strip():
cleaned = channel_name.strip()
if cleaned.lower() == "public":
return "Public"
return cleaned if cleaned.startswith("#") else f"#{cleaned}"
if channel_idx is not None:
return f"Ch {channel_idx}"
if channel_hash:
return f"Ch {channel_hash.upper()}"
return None
@staticmethod
def _prefix_channel_label(text: str, channel_label: str | None) -> str:
"""Prefix channel label to message text for LetsMesh channel feeds."""
if not channel_label:
return text
prefix = f"[{channel_label}] "
if text.startswith(prefix):
return text
return f"{prefix}{text}"
@classmethod
def _normalize_sender_name(cls, value: Any) -> str | None:
"""Normalize human sender names from decoder output."""
if not isinstance(value, str):
return None
normalized = value.strip()
if not normalized:
return None
if cls._normalize_pubkey_prefix(normalized):
return None
return normalized
@staticmethod
def _prefix_sender_name(text: str, sender_name: Any) -> str:
"""Prefix sender name when available and not already present."""
if not isinstance(sender_name, str):
return text
sender = sender_name.strip()
if not sender:
return text
lower_text = text.lstrip().lower()
prefix = f"{sender}:"
if lower_text.startswith(prefix.lower()):
return text
return f"{sender}: {text}"
@staticmethod
def _normalize_letsmesh_adv_type(payload: dict[str, Any]) -> str | None:
"""Map LetsMesh status fields to canonical node types."""
candidates: list[str] = []
for key in ("adv_type", "type", "node_type", "role", "mode", "status"):
value = payload.get(key)
if isinstance(value, str) and value.strip():
candidates.append(value.strip().lower())
for key in ("origin", "name", "model"):
value = payload.get(key)
if isinstance(value, str) and value.strip():
candidates.append(value.strip().lower())
if not candidates:
return None
normalized = " ".join(candidates)
if any(token in normalized for token in ("room server", "roomserver", "room")):
return "room"
if any(token in normalized for token in ("repeater", "relay")):
return "repeater"
if any(token in normalized for token in ("companion", "observer")):
return "companion"
if "chat" in normalized:
return "chat"
# Preserve existing canonical values when they are already set.
for candidate in candidates:
if candidate in {"chat", "repeater", "room", "companion"}:
return candidate
return None
@classmethod
def _normalize_letsmesh_node_type(cls, value: Any) -> str | None:
"""Normalize LetsMesh node-type values to canonical adv_type values."""
if value is None:
return None
if isinstance(value, (int, float)):
numeric = int(value)
if numeric == 0:
return None
if numeric == 1:
return "chat"
if numeric == 2:
return "repeater"
if numeric == 3:
return "room"
if numeric == 4:
return "companion"
return None
if isinstance(value, str):
normalized = value.strip()
if not normalized:
return None
return cls._normalize_letsmesh_adv_type({"type": normalized})
return None
@staticmethod
def _parse_int(value: Any) -> int | None:
"""Parse int-like values safely."""
if value is None:
return None
if isinstance(value, int):
return value
if isinstance(value, float):
return int(value)
if isinstance(value, str):
try:
return int(value)
except ValueError:
return None
return None
@staticmethod
def _parse_float(value: Any) -> float | None:
"""Parse float-like values safely."""
if value is None:
return None
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
try:
return float(value)
except ValueError:
return None
return None
@classmethod
def _parse_path_length(cls, value: Any) -> int | None:
"""Parse path length from list or packed hex string."""
if value is None:
return None
if isinstance(value, list):
return len(value)
if isinstance(value, str):
path = value.strip()
if not path:
return None
return len(path) // 2 if len(path) % 2 == 0 else len(path)
return cls._parse_int(value)
@staticmethod
def _parse_sender_timestamp(payload: dict[str, Any]) -> int | None:
"""Parse sender timestamp from known LetsMesh fields."""
sender_ts = payload.get("sender_timestamp")
if isinstance(sender_ts, (int, float)):
return int(sender_ts)
if isinstance(sender_ts, str):
try:
return int(float(sender_ts))
except ValueError:
return None
return None
def _dispatch_event(
self,
public_key: str,
event_type: str,
payload: dict[str, Any],
) -> None:
"""Route a normalized event to the appropriate handler."""
# Find and call handler
handler = self._handlers.get(event_type)
@@ -358,10 +1072,20 @@ class Subscriber:
logger.error(f"Failed to connect to MQTT broker: {e}")
raise
# Subscribe to all event topics
event_topic = self.mqtt.topic_builder.all_events_topic()
self.mqtt.subscribe(event_topic, self._handle_mqtt_message)
logger.info(f"Subscribed to event topic: {event_topic}")
# Subscribe to topics based on ingest mode
if self._ingest_mode == self.INGEST_MODE_LETSMESH_UPLOAD:
letsmesh_topics = [
f"{self.mqtt.topic_builder.prefix}/+/packets",
f"{self.mqtt.topic_builder.prefix}/+/status",
f"{self.mqtt.topic_builder.prefix}/+/internal",
]
for letsmesh_topic in letsmesh_topics:
self.mqtt.subscribe(letsmesh_topic, self._handle_mqtt_message)
logger.info(f"Subscribed to LetsMesh upload topic: {letsmesh_topic}")
else:
event_topic = self.mqtt.topic_builder.all_events_topic()
self.mqtt.subscribe(event_topic, self._handle_mqtt_message)
logger.info(f"Subscribed to event topic: {event_topic}")
self._running = True
@@ -429,6 +1153,9 @@ def create_subscriber(
mqtt_password: Optional[str] = None,
mqtt_prefix: str = "meshcore",
mqtt_tls: bool = False,
mqtt_transport: str = "tcp",
mqtt_ws_path: str = "/mqtt",
ingest_mode: str = "native",
database_url: str = "sqlite:///./meshcore.db",
webhook_dispatcher: Optional["WebhookDispatcher"] = None,
cleanup_enabled: bool = False,
@@ -436,6 +1163,10 @@ def create_subscriber(
cleanup_interval_hours: int = 24,
node_cleanup_enabled: bool = False,
node_cleanup_days: int = 90,
letsmesh_decoder_enabled: bool = True,
letsmesh_decoder_command: str = "meshcore-decoder",
letsmesh_decoder_channel_keys: list[str] | None = None,
letsmesh_decoder_timeout_seconds: float = 2.0,
) -> Subscriber:
"""Create a configured subscriber instance.
@@ -446,6 +1177,9 @@ def create_subscriber(
mqtt_password: MQTT password
mqtt_prefix: MQTT topic prefix
mqtt_tls: Enable TLS/SSL for MQTT connection
mqtt_transport: MQTT transport protocol (tcp or websockets)
mqtt_ws_path: WebSocket path (used when transport=websockets)
ingest_mode: Ingest mode ('native' or 'letsmesh_upload')
database_url: Database connection URL
webhook_dispatcher: Optional webhook dispatcher for event forwarding
cleanup_enabled: Enable automatic event data cleanup
@@ -453,6 +1187,10 @@ def create_subscriber(
cleanup_interval_hours: Hours between cleanup runs
node_cleanup_enabled: Enable automatic cleanup of inactive nodes
node_cleanup_days: Remove nodes not seen for this many days
letsmesh_decoder_enabled: Enable external LetsMesh packet decoder
letsmesh_decoder_command: Decoder CLI command
letsmesh_decoder_channel_keys: Optional channel keys for decrypting group text
letsmesh_decoder_timeout_seconds: Decoder CLI timeout
Returns:
Configured Subscriber instance
@@ -467,6 +1205,8 @@ def create_subscriber(
prefix=mqtt_prefix,
client_id=f"meshcore-collector-{unique_id}",
tls=mqtt_tls,
transport=mqtt_transport,
ws_path=mqtt_ws_path,
)
mqtt_client = MQTTClient(mqtt_config)
@@ -483,6 +1223,11 @@ def create_subscriber(
cleanup_interval_hours=cleanup_interval_hours,
node_cleanup_enabled=node_cleanup_enabled,
node_cleanup_days=node_cleanup_days,
ingest_mode=ingest_mode,
letsmesh_decoder_enabled=letsmesh_decoder_enabled,
letsmesh_decoder_command=letsmesh_decoder_command,
letsmesh_decoder_channel_keys=letsmesh_decoder_channel_keys,
letsmesh_decoder_timeout_seconds=letsmesh_decoder_timeout_seconds,
)
# Register handlers
@@ -500,6 +1245,9 @@ def run_collector(
mqtt_password: Optional[str] = None,
mqtt_prefix: str = "meshcore",
mqtt_tls: bool = False,
mqtt_transport: str = "tcp",
mqtt_ws_path: str = "/mqtt",
ingest_mode: str = "native",
database_url: str = "sqlite:///./meshcore.db",
webhook_dispatcher: Optional["WebhookDispatcher"] = None,
cleanup_enabled: bool = False,
@@ -507,6 +1255,10 @@ def run_collector(
cleanup_interval_hours: int = 24,
node_cleanup_enabled: bool = False,
node_cleanup_days: int = 90,
letsmesh_decoder_enabled: bool = True,
letsmesh_decoder_command: str = "meshcore-decoder",
letsmesh_decoder_channel_keys: list[str] | None = None,
letsmesh_decoder_timeout_seconds: float = 2.0,
) -> None:
"""Run the collector (blocking).
@@ -517,6 +1269,9 @@ def run_collector(
mqtt_password: MQTT password
mqtt_prefix: MQTT topic prefix
mqtt_tls: Enable TLS/SSL for MQTT connection
mqtt_transport: MQTT transport protocol (tcp or websockets)
mqtt_ws_path: WebSocket path (used when transport=websockets)
ingest_mode: Ingest mode ('native' or 'letsmesh_upload')
database_url: Database connection URL
webhook_dispatcher: Optional webhook dispatcher for event forwarding
cleanup_enabled: Enable automatic event data cleanup
@@ -524,6 +1279,10 @@ def run_collector(
cleanup_interval_hours: Hours between cleanup runs
node_cleanup_enabled: Enable automatic cleanup of inactive nodes
node_cleanup_days: Remove nodes not seen for this many days
letsmesh_decoder_enabled: Enable external LetsMesh packet decoder
letsmesh_decoder_command: Decoder CLI command
letsmesh_decoder_channel_keys: Optional channel keys for decrypting group text
letsmesh_decoder_timeout_seconds: Decoder CLI timeout
"""
subscriber = create_subscriber(
mqtt_host=mqtt_host,
@@ -532,6 +1291,9 @@ def run_collector(
mqtt_password=mqtt_password,
mqtt_prefix=mqtt_prefix,
mqtt_tls=mqtt_tls,
mqtt_transport=mqtt_transport,
mqtt_ws_path=mqtt_ws_path,
ingest_mode=ingest_mode,
database_url=database_url,
webhook_dispatcher=webhook_dispatcher,
cleanup_enabled=cleanup_enabled,
@@ -539,6 +1301,10 @@ def run_collector(
cleanup_interval_hours=cleanup_interval_hours,
node_cleanup_enabled=node_cleanup_enabled,
node_cleanup_days=node_cleanup_days,
letsmesh_decoder_enabled=letsmesh_decoder_enabled,
letsmesh_decoder_command=letsmesh_decoder_command,
letsmesh_decoder_channel_keys=letsmesh_decoder_channel_keys,
letsmesh_decoder_timeout_seconds=letsmesh_decoder_timeout_seconds,
)
# Set up signal handlers

View File

@@ -1,6 +1,7 @@
"""Pydantic Settings for MeshCore Hub configuration."""
from enum import Enum
import re
from typing import Optional
from pydantic import Field, field_validator
@@ -24,6 +25,20 @@ class InterfaceMode(str, Enum):
SENDER = "SENDER"
class MQTTTransport(str, Enum):
"""MQTT transport type."""
TCP = "tcp"
WEBSOCKETS = "websockets"
class CollectorIngestMode(str, Enum):
"""Collector MQTT ingest mode."""
NATIVE = "native"
LETSMESH_UPLOAD = "letsmesh_upload"
class CommonSettings(BaseSettings):
"""Common settings shared by all components."""
@@ -55,6 +70,14 @@ class CommonSettings(BaseSettings):
mqtt_tls: bool = Field(
default=False, description="Enable TLS/SSL for MQTT connection"
)
mqtt_transport: MQTTTransport = Field(
default=MQTTTransport.TCP,
description="MQTT transport protocol (tcp or websockets)",
)
mqtt_ws_path: str = Field(
default="/mqtt",
description="WebSocket path for MQTT transport (used when MQTT_TRANSPORT=websockets)",
)
class InterfaceSettings(CommonSettings):
@@ -162,6 +185,42 @@ class CollectorSettings(CommonSettings):
description="Remove nodes not seen for this many days (last_seen)",
ge=1,
)
collector_ingest_mode: CollectorIngestMode = Field(
default=CollectorIngestMode.NATIVE,
description=(
"Collector MQTT ingest mode. "
"'native' expects <prefix>/<pubkey>/event/<event_name>. "
"'letsmesh_upload' expects LetsMesh observer uploads on "
"<prefix>/<pubkey>/(packets|status|internal)."
),
)
collector_letsmesh_decoder_enabled: bool = Field(
default=True,
description=(
"Enable external LetsMesh packet decoding via meshcore-decoder. "
"Only applies when COLLECTOR_INGEST_MODE=letsmesh_upload."
),
)
collector_letsmesh_decoder_command: str = Field(
default="meshcore-decoder",
description=(
"Command used to run LetsMesh packet decoder CLI "
"(for example: meshcore-decoder, /usr/local/bin/meshcore-decoder, "
"or 'npx meshcore-decoder')."
),
)
collector_letsmesh_decoder_keys: Optional[str] = Field(
default=None,
description=(
"Optional channel secret keys for LetsMesh message decryption. "
"Provide as comma/space separated hex values."
),
)
collector_letsmesh_decoder_timeout_seconds: float = Field(
default=2.0,
description="Timeout in seconds for each decoder invocation.",
ge=0.1,
)
@property
def collector_data_dir(self) -> str:
@@ -201,6 +260,17 @@ class CollectorSettings(CommonSettings):
return str(Path(self.effective_seed_home) / "members.yaml")
@property
def collector_letsmesh_decoder_keys_list(self) -> list[str]:
"""Parse configured LetsMesh decoder keys into a normalized list."""
if not self.collector_letsmesh_decoder_keys:
return []
return [
part.strip()
for part in re.split(r"[,\s]+", self.collector_letsmesh_decoder_keys)
if part.strip()
]
@field_validator("database_url")
@classmethod
def validate_database_url(cls, v: Optional[str]) -> Optional[str]:
@@ -267,6 +337,13 @@ class WebSettings(CommonSettings):
default="en",
description="Locale/language for the web dashboard (e.g. 'en')",
)
web_datetime_locale: str = Field(
default="en-US",
description=(
"Locale used for date/time formatting in the web dashboard "
"(e.g. 'en-US', 'en-GB')."
),
)
# Auto-refresh interval for list pages
web_auto_refresh_seconds: int = Field(

View File

@@ -24,6 +24,8 @@ class MQTTConfig:
keepalive: int = 60
clean_session: bool = True
tls: bool = False
transport: str = "tcp"
ws_path: str = "/mqtt"
class TopicBuilder:
@@ -37,6 +39,10 @@ class TopicBuilder:
"""
self.prefix = prefix
def _prefix_parts(self) -> list[str]:
"""Split configured prefix into path segments."""
return [part for part in self.prefix.strip("/").split("/") if part]
def event_topic(self, public_key: str, event_name: str) -> str:
"""Build an event topic.
@@ -86,10 +92,16 @@ class TopicBuilder:
Returns:
Tuple of (public_key, event_name) or None if invalid
"""
parts = topic.split("/")
if len(parts) >= 4 and parts[0] == self.prefix and parts[2] == "event":
public_key = parts[1]
event_name = "/".join(parts[3:])
parts = [part for part in topic.strip("/").split("/") if part]
prefix_parts = self._prefix_parts()
prefix_len = len(prefix_parts)
if (
len(parts) >= prefix_len + 3
and parts[:prefix_len] == prefix_parts
and parts[prefix_len + 1] == "event"
):
public_key = parts[prefix_len]
event_name = "/".join(parts[prefix_len + 2 :])
return (public_key, event_name)
return None
@@ -102,13 +114,39 @@ class TopicBuilder:
Returns:
Tuple of (public_key, command_name) or None if invalid
"""
parts = topic.split("/")
if len(parts) >= 4 and parts[0] == self.prefix and parts[2] == "command":
public_key = parts[1]
command_name = "/".join(parts[3:])
parts = [part for part in topic.strip("/").split("/") if part]
prefix_parts = self._prefix_parts()
prefix_len = len(prefix_parts)
if (
len(parts) >= prefix_len + 3
and parts[:prefix_len] == prefix_parts
and parts[prefix_len + 1] == "command"
):
public_key = parts[prefix_len]
command_name = "/".join(parts[prefix_len + 2 :])
return (public_key, command_name)
return None
def parse_letsmesh_upload_topic(self, topic: str) -> tuple[str, str] | None:
"""Parse a LetsMesh upload topic to extract public key and feed type.
LetsMesh upload topics are expected in this form:
<prefix>/<public_key>/(packets|status|internal)
"""
parts = [part for part in topic.strip("/").split("/") if part]
prefix_parts = self._prefix_parts()
prefix_len = len(prefix_parts)
if len(parts) != prefix_len + 2 or parts[:prefix_len] != prefix_parts:
return None
public_key = parts[prefix_len]
feed_type = parts[prefix_len + 1]
if feed_type not in {"packets", "status", "internal"}:
return None
return (public_key, feed_type)
MessageHandler = Callable[[str, str, dict[str, Any]], None]
@@ -124,14 +162,24 @@ class MQTTClient:
"""
self.config = config
self.topic_builder = TopicBuilder(config.prefix)
transport = config.transport.lower()
if transport not in {"tcp", "websockets"}:
raise ValueError(f"Unsupported MQTT transport: {config.transport}")
self._client = mqtt.Client(
callback_api_version=CallbackAPIVersion.VERSION2, # type: ignore[call-arg]
client_id=config.client_id,
clean_session=config.clean_session,
transport=transport,
)
self._connected = False
self._message_handlers: dict[str, list[MessageHandler]] = {}
# Set WebSocket path when using MQTT over WebSockets.
if transport == "websockets":
self._client.ws_set_options(path=config.ws_path)
logger.debug("MQTT WebSocket transport enabled (path=%s)", config.ws_path)
# Set up TLS if enabled
if config.tls:
self._client.tls_set()

View File

@@ -28,6 +28,14 @@ class AdvertisementEvent(BaseModel):
default=None,
description="Capability/status flags bitmask",
)
lat: Optional[float] = Field(
default=None,
description="Node latitude when location metadata is available",
)
lon: Optional[float] = Field(
default=None,
description="Node longitude when location metadata is available",
)
class ContactMessageEvent(BaseModel):

View File

@@ -2,6 +2,8 @@
import json
import logging
import os
import re
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
@@ -16,6 +18,7 @@ from fastapi.templating import Jinja2Templates
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
from meshcore_hub import __version__
from meshcore_hub.collector.letsmesh_decoder import LetsMeshPacketDecoder
from meshcore_hub.common.i18n import load_locale, t
from meshcore_hub.common.schemas import RadioConfig
from meshcore_hub.web.middleware import CacheControlMiddleware
@@ -29,6 +32,40 @@ TEMPLATES_DIR = PACKAGE_DIR / "templates"
STATIC_DIR = PACKAGE_DIR / "static"
def _parse_decoder_key_entries(raw: str | None) -> list[str]:
"""Parse COLLECTOR_LETSMESH_DECODER_KEYS into key entries."""
if not raw:
return []
return [part.strip() for part in re.split(r"[,\s]+", raw) if part.strip()]
def _build_channel_labels() -> dict[str, str]:
"""Build UI channel labels from built-in + configured decoder keys."""
raw_keys = os.getenv("COLLECTOR_LETSMESH_DECODER_KEYS")
decoder = LetsMeshPacketDecoder(
enabled=False,
channel_keys=_parse_decoder_key_entries(raw_keys),
)
labels = decoder.channel_labels_by_index()
return {str(idx): label for idx, label in sorted(labels.items())}
def _is_authenticated_proxy_request(request: Request) -> bool:
"""Check whether request is authenticated by an upstream auth proxy.
Supported patterns:
- OAuth2/OIDC proxy headers: X-Forwarded-User, X-Auth-Request-User
- Forwarded Basic auth header: Authorization: Basic ...
"""
if request.headers.get("x-forwarded-user"):
return True
if request.headers.get("x-auth-request-user"):
return True
auth_header = request.headers.get("authorization", "")
return auth_header.lower().startswith("basic ")
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""Application lifespan handler."""
@@ -114,10 +151,12 @@ def _build_config_json(app: FastAPI, request: Request) -> str:
"version": __version__,
"timezone": app.state.timezone_abbr,
"timezone_iana": app.state.timezone,
"is_authenticated": bool(request.headers.get("X-Forwarded-User")),
"is_authenticated": _is_authenticated_proxy_request(request),
"default_theme": app.state.web_theme,
"locale": app.state.web_locale,
"datetime_locale": app.state.web_datetime_locale,
"auto_refresh_seconds": app.state.auto_refresh_seconds,
"channel_labels": app.state.channel_labels,
}
return json.dumps(config)
@@ -183,10 +222,12 @@ def create_app(
# Load i18n translations
app.state.web_locale = settings.web_locale or "en"
app.state.web_datetime_locale = settings.web_datetime_locale or "en-US"
load_locale(app.state.web_locale)
# Auto-refresh interval
app.state.auto_refresh_seconds = settings.web_auto_refresh_seconds
app.state.channel_labels = _build_channel_labels()
# Store configuration in app state (use args if provided, else settings)
app.state.web_theme = (
@@ -310,7 +351,7 @@ def create_app(
if (
request.method in ("POST", "PUT", "DELETE", "PATCH")
and request.app.state.admin_enabled
and not request.headers.get("x-forwarded-user")
and not _is_authenticated_proxy_request(request)
):
return JSONResponse(
{"detail": "Authentication required"},

View File

@@ -22,6 +22,37 @@ export function getConfig() {
return window.__APP_CONFIG__ || {};
}
/**
* Parse API datetime strings reliably.
* MeshCore API often returns UTC timestamps without an explicit timezone suffix.
* In that case, treat them as UTC by appending 'Z' before Date parsing.
*
* @param {string|null} isoString
* @returns {Date|null}
*/
export function parseAppDate(isoString) {
if (!isoString || typeof isoString !== 'string') return null;
let value = isoString.trim();
if (!value) return null;
// Normalize "YYYY-MM-DD HH:MM:SS" to ISO separator.
if (/^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}/.test(value)) {
value = value.replace(/\s+/, 'T');
}
// If no timezone suffix is present, treat as UTC.
const hasTimePart = /T\d{2}:\d{2}/.test(value);
const hasTimezoneSuffix = /(Z|[+-]\d{2}:\d{2}|[+-]\d{4})$/i.test(value);
if (hasTimePart && !hasTimezoneSuffix) {
value += 'Z';
}
const parsed = new Date(value);
if (isNaN(parsed.getTime())) return null;
return parsed;
}
/**
* Page color palette - reads from CSS custom properties (defined in app.css :root).
* Use for inline styles or dynamic coloring in page modules.
@@ -42,10 +73,21 @@ export const pageColors = {
* @param {string|null} advType
* @returns {string} Emoji character
*/
function inferNodeType(value) {
const normalized = (value || '').toLowerCase();
if (!normalized) return null;
if (normalized.includes('room')) return 'room';
if (normalized.includes('repeater') || normalized.includes('relay')) return 'repeater';
if (normalized.includes('companion') || normalized.includes('observer')) return 'companion';
if (normalized.includes('chat')) return 'chat';
return null;
}
export function typeEmoji(advType) {
switch ((advType || '').toLowerCase()) {
switch (inferNodeType(advType) || (advType || '').toLowerCase()) {
case 'chat': return '\u{1F4AC}'; // 💬
case 'repeater': return '\u{1F4E1}'; // 📡
case 'companion': return '\u{1F4F1}'; // 📱
case 'room': return '\u{1FAA7}'; // 🪧
default: return '\u{1F4CD}'; // 📍
}
@@ -74,7 +116,9 @@ export function extractFirstEmoji(str) {
*/
export function getNodeEmoji(nodeName, advType) {
const nameEmoji = extractFirstEmoji(nodeName);
return nameEmoji || typeEmoji(advType);
if (nameEmoji) return nameEmoji;
const inferred = inferNodeType(advType) || inferNodeType(nodeName);
return typeEmoji(inferred || advType);
}
/**
@@ -88,8 +132,9 @@ export function formatDateTime(isoString, options) {
try {
const config = getConfig();
const tz = config.timezone_iana || 'UTC';
const date = new Date(isoString);
if (isNaN(date.getTime())) return '-';
const locale = config.datetime_locale || 'en-US';
const date = parseAppDate(isoString);
if (!date) return '-';
const opts = options || {
timeZone: tz,
year: 'numeric', month: '2-digit', day: '2-digit',
@@ -97,7 +142,7 @@ export function formatDateTime(isoString, options) {
hour12: false,
};
if (!opts.timeZone) opts.timeZone = tz;
return date.toLocaleString('en-GB', opts);
return date.toLocaleString(locale, opts);
} catch {
return isoString ? isoString.slice(0, 19).replace('T', ' ') : '-';
}
@@ -113,9 +158,10 @@ export function formatDateTimeShort(isoString) {
try {
const config = getConfig();
const tz = config.timezone_iana || 'UTC';
const date = new Date(isoString);
if (isNaN(date.getTime())) return '-';
return date.toLocaleString('en-GB', {
const locale = config.datetime_locale || 'en-US';
const date = parseAppDate(isoString);
if (!date) return '-';
return date.toLocaleString(locale, {
timeZone: tz,
year: 'numeric', month: '2-digit', day: '2-digit',
hour: '2-digit', minute: '2-digit',
@@ -133,8 +179,8 @@ export function formatDateTimeShort(isoString) {
*/
export function formatRelativeTime(isoString) {
if (!isoString) return '';
const date = new Date(isoString);
if (isNaN(date.getTime())) return '';
const date = parseAppDate(isoString);
if (!date) return '';
const now = new Date();
const diffMs = now - date;
const diffSec = Math.floor(diffMs / 1000);

View File

@@ -1,44 +1,51 @@
import { apiGet } from '../api.js';
import {
html, litRender, nothing,
getConfig, typeEmoji, errorAlert, pageColors, t,
getConfig, typeEmoji, errorAlert, pageColors, t, formatDateTime,
} from '../components.js';
import {
iconNodes, iconAdvertisements, iconMessages, iconChannel,
} from '../icons.js';
function formatTimeOnly(isoString) {
if (!isoString) return '-';
try {
const config = getConfig();
const tz = config.timezone_iana || 'UTC';
const date = new Date(isoString);
if (isNaN(date.getTime())) return '-';
return date.toLocaleString('en-GB', {
timeZone: tz,
hour: '2-digit', minute: '2-digit', second: '2-digit',
hour12: false,
});
} catch {
return '-';
function knownChannelLabel(channelIdx) {
const config = getConfig();
const configuredChannelLabels = new Map(
Object.entries(config.channel_labels || {})
.map(([idx, label]) => [parseInt(idx, 10), typeof label === 'string' ? label.trim() : ''])
.filter(([idx, label]) => Number.isInteger(idx) && label.length > 0),
);
const builtInChannelLabels = new Map([
[17, 'Public'],
[217, '#test'],
[202, '#bot'],
[184, '#chat'],
[159, '#jokes'],
[221, '#sports'],
[104, '#emergency'],
]);
return configuredChannelLabels.get(channelIdx) || builtInChannelLabels.get(channelIdx) || null;
}
function channelLabel(channel) {
const idx = parseInt(String(channel), 10);
if (Number.isInteger(idx)) {
return knownChannelLabel(idx) || `Ch ${idx}`;
}
return String(channel);
}
function formatTimeOnly(isoString) {
return formatDateTime(isoString, {
hour: '2-digit', minute: '2-digit', second: '2-digit',
hour12: false,
});
}
function formatTimeShort(isoString) {
if (!isoString) return '-';
try {
const config = getConfig();
const tz = config.timezone_iana || 'UTC';
const date = new Date(isoString);
if (isNaN(date.getTime())) return '-';
return date.toLocaleString('en-GB', {
timeZone: tz,
hour: '2-digit', minute: '2-digit',
hour12: false,
});
} catch {
return '-';
}
return formatDateTime(isoString, {
hour: '2-digit', minute: '2-digit',
hour12: false,
});
}
function renderRecentAds(ads) {
@@ -81,6 +88,7 @@ function renderChannelMessages(channelMessages) {
if (!channelMessages || Object.keys(channelMessages).length === 0) return nothing;
const channels = Object.entries(channelMessages).map(([channel, messages]) => {
const label = channelLabel(channel);
const msgLines = messages.map(msg => html`
<div class="text-sm">
<span class="text-xs opacity-50">${formatTimeShort(msg.received_at)}</span>
@@ -89,8 +97,7 @@ function renderChannelMessages(channelMessages) {
return html`<div>
<h3 class="font-semibold text-sm mb-2 flex items-center gap-2">
<span class="badge badge-info badge-sm">CH${String(channel)}</span>
${t('dashboard.channel', { number: String(channel) })}
<span class="badge badge-info badge-sm">${label}</span>
</h3>
<div class="space-y-1 pl-2 border-l-2 border-base-300">
${msgLines}

View File

@@ -19,6 +19,169 @@ export async function render(container, params, router) {
const tz = config.timezone || '';
const tzBadge = tz && tz !== 'UTC' ? html`<span class="text-sm opacity-60">${tz}</span>` : nothing;
const navigate = (url) => router.navigate(url);
const configuredChannelLabels = new Map(
Object.entries(config.channel_labels || {})
.map(([idx, label]) => [parseInt(idx, 10), typeof label === 'string' ? label.trim() : ''])
.filter(([idx, label]) => Number.isInteger(idx) && label.length > 0),
);
const builtInChannelLabels = new Map([
[17, 'Public'],
[217, '#test'],
[202, '#bot'],
[184, '#chat'],
[159, '#jokes'],
[221, '#sports'],
[104, '#emergency'],
]);
function knownChannelLabel(channelIdx) {
return configuredChannelLabels.get(channelIdx) || builtInChannelLabels.get(channelIdx) || null;
}
function channelInfo(msg) {
if (msg.message_type !== 'channel') {
return { label: null, text: msg.text || '-' };
}
const rawText = msg.text || '';
const match = rawText.match(/^\[([^\]]+)\]\s+([\s\S]*)$/);
if (msg.channel_idx !== null && msg.channel_idx !== undefined) {
const knownLabel = knownChannelLabel(msg.channel_idx);
if (knownLabel) {
return {
label: knownLabel,
text: match ? (match[2] || '-') : (rawText || '-'),
};
}
}
if (msg.channel_name) {
return { label: msg.channel_name, text: msg.text || '-' };
}
if (match) {
return {
label: match[1],
text: match[2] || '-',
};
}
if (msg.channel_idx !== null && msg.channel_idx !== undefined) {
const knownLabel = knownChannelLabel(msg.channel_idx);
return { label: knownLabel || `Ch ${msg.channel_idx}`, text: rawText || '-' };
}
return { label: t('messages.type_channel'), text: rawText || '-' };
}
function senderBlock(msg, emphasize = false) {
const senderName = msg.sender_tag_name || msg.sender_name;
if (senderName) {
return emphasize
? html`<span class="font-medium">${senderName}</span>`
: html`${senderName}`;
}
const prefix = (msg.pubkey_prefix || '').slice(0, 12);
if (prefix) {
return html`<span class="font-mono text-xs">${prefix}</span>`;
}
return html`<span class="opacity-50">-</span>`;
}
function parseSenderFromText(text) {
if (!text || typeof text !== 'string') {
return { sender: null, text: text || '-' };
}
const patterns = [
/^\s*ack\s+@\[(.+?)\]\s*:\s*([\s\S]+)$/i,
/^\s*@\[(.+?)\]\s*:\s*([\s\S]+)$/i,
/^\s*ack\s+([^:|\n]{1,80})\s*:\s*([\s\S]+)$/i,
];
for (const pattern of patterns) {
const match = text.match(pattern);
if (!match) continue;
const sender = (match[1] || '').trim();
const remaining = (match[2] || '').trim();
if (!sender) continue;
return {
sender,
text: remaining || text,
};
}
return { sender: null, text };
}
function messageTextWithSender(msg, text) {
const parsed = parseSenderFromText(text || '-');
const explicitSender = msg.sender_tag_name || msg.sender_name || (msg.pubkey_prefix || '').slice(0, 12) || null;
const sender = explicitSender || parsed.sender;
const body = (parsed.text || text || '-').trim() || '-';
if (!sender) {
return body;
}
if (body.toLowerCase().startsWith(`${sender.toLowerCase()}:`)) {
return body;
}
return `${sender}: ${body}`;
}
function dedupeBySignature(items) {
const deduped = [];
const bySignature = new Map();
for (const msg of items) {
const signature = typeof msg.signature === 'string' ? msg.signature.trim().toUpperCase() : '';
const canDedupe = msg.message_type === 'channel' && signature.length >= 8;
if (!canDedupe) {
deduped.push(msg);
continue;
}
const existing = bySignature.get(signature);
if (!existing) {
const clone = {
...msg,
receivers: [...(msg.receivers || [])],
};
bySignature.set(signature, clone);
deduped.push(clone);
continue;
}
const combined = [...(existing.receivers || []), ...(msg.receivers || [])];
const seenReceivers = new Set();
existing.receivers = combined.filter((recv) => {
const key = recv?.public_key || recv?.node_id || `${recv?.received_at || ''}:${recv?.snr || ''}`;
if (seenReceivers.has(key)) return false;
seenReceivers.add(key);
return true;
});
if (!existing.received_by && msg.received_by) existing.received_by = msg.received_by;
if (!existing.receiver_name && msg.receiver_name) existing.receiver_name = msg.receiver_name;
if (!existing.receiver_tag_name && msg.receiver_tag_name) existing.receiver_tag_name = msg.receiver_tag_name;
if (!existing.pubkey_prefix && msg.pubkey_prefix) existing.pubkey_prefix = msg.pubkey_prefix;
if (!existing.sender_name && msg.sender_name) existing.sender_name = msg.sender_name;
if (!existing.sender_tag_name && msg.sender_tag_name) existing.sender_tag_name = msg.sender_tag_name;
if (!existing.channel_name && msg.channel_name) existing.channel_name = msg.channel_name;
if (
existing.channel_name === 'Public'
&& msg.channel_name
&& msg.channel_name !== 'Public'
) {
existing.channel_name = msg.channel_name;
}
if (existing.channel_idx === null || existing.channel_idx === undefined) {
if (msg.channel_idx !== null && msg.channel_idx !== undefined) {
existing.channel_idx = msg.channel_idx;
}
} else if (
existing.channel_idx === 17
&& msg.channel_idx !== null
&& msg.channel_idx !== undefined
&& msg.channel_idx !== 17
) {
existing.channel_idx = msg.channel_idx;
}
}
return deduped;
}
function renderPage(content, { total = null } = {}) {
litRender(html`
@@ -39,7 +202,7 @@ ${content}`, container);
async function fetchAndRenderData() {
try {
const data = await apiGet('/api/v1/messages', { limit, offset, message_type });
const messages = data.items || [];
const messages = dedupeBySignature(data.items || []);
const total = data.total || 0;
const totalPages = Math.ceil(total / limit);
@@ -49,17 +212,12 @@ ${content}`, container);
const isChannel = msg.message_type === 'channel';
const typeIcon = isChannel ? '\u{1F4FB}' : '\u{1F464}';
const typeTitle = isChannel ? t('messages.type_channel') : t('messages.type_contact');
let senderBlock;
if (isChannel) {
senderBlock = html`<span class="opacity-60">${t('messages.type_public')}</span>`;
} else {
const senderName = msg.sender_tag_name || msg.sender_name;
if (senderName) {
senderBlock = senderName;
} else {
senderBlock = html`<span class="font-mono text-xs">${(msg.pubkey_prefix || '-').slice(0, 12)}</span>`;
}
}
const chInfo = channelInfo(msg);
const sender = senderBlock(msg);
const displayMessage = messageTextWithSender(msg, chInfo.text);
const fromPrimary = isChannel
? html`<span class="font-medium">${chInfo.label || t('messages.type_channel')}</span>`
: sender;
let receiversBlock = nothing;
if (msg.receivers && msg.receivers.length >= 1) {
receiversBlock = html`<div class="flex gap-0.5">
@@ -81,7 +239,7 @@ ${content}`, container);
</span>
<div class="min-w-0">
<div class="font-medium text-sm truncate">
${senderBlock}
${fromPrimary}
</div>
<div class="text-xs opacity-60">
${formatDateTimeShort(msg.received_at)}
@@ -92,7 +250,7 @@ ${content}`, container);
${receiversBlock}
</div>
</div>
<p class="text-sm mt-2 break-words whitespace-pre-wrap">${msg.text || '-'}</p>
<p class="text-sm mt-2 break-words whitespace-pre-wrap">${displayMessage}</p>
</div>
</div>`;
});
@@ -103,17 +261,12 @@ ${content}`, container);
const isChannel = msg.message_type === 'channel';
const typeIcon = isChannel ? '\u{1F4FB}' : '\u{1F464}';
const typeTitle = isChannel ? t('messages.type_channel') : t('messages.type_contact');
let senderBlock;
if (isChannel) {
senderBlock = html`<span class="opacity-60">${t('messages.type_public')}</span>`;
} else {
const senderName = msg.sender_tag_name || msg.sender_name;
if (senderName) {
senderBlock = html`<span class="font-medium">${senderName}</span>`;
} else {
senderBlock = html`<span class="font-mono text-xs">${(msg.pubkey_prefix || '-').slice(0, 12)}</span>`;
}
}
const chInfo = channelInfo(msg);
const sender = senderBlock(msg, true);
const displayMessage = messageTextWithSender(msg, chInfo.text);
const fromPrimary = isChannel
? html`<span class="font-medium">${chInfo.label || t('messages.type_channel')}</span>`
: sender;
let receiversBlock;
if (msg.receivers && msg.receivers.length >= 1) {
receiversBlock = html`<div class="flex gap-1">
@@ -131,8 +284,10 @@ ${content}`, container);
return html`<tr class="hover align-top">
<td class="text-lg" title=${typeTitle}>${typeIcon}</td>
<td class="text-sm whitespace-nowrap">${formatDateTime(msg.received_at)}</td>
<td class="text-sm whitespace-nowrap">${senderBlock}</td>
<td class="break-words max-w-md" style="white-space: pre-wrap;">${msg.text || '-'}</td>
<td class="text-sm whitespace-nowrap">
<div>${fromPrimary}</div>
</td>
<td class="break-words max-w-md" style="white-space: pre-wrap;">${displayMessage}</td>
<td>${receiversBlock}</td>
</tr>`;
});

View File

@@ -209,7 +209,7 @@ ${heroHtml}
const initQr = () => {
const qrEl = document.getElementById('qr-code');
if (!qrEl || typeof QRCode === 'undefined') return false;
const typeMap = { chat: 1, repeater: 2, room: 3, sensor: 4 };
const typeMap = { chat: 1, repeater: 2, room: 3, companion: 1, sensor: 4 };
const typeNum = typeMap[(node.adv_type || '').toLowerCase()] || 1;
const url = 'meshcore://contact/add?name=' + encodeURIComponent(displayName) + '&public_key=' + node.public_key + '&type=' + typeNum;
new QRCode(qrEl, {

View File

@@ -159,6 +159,7 @@ ${content}`, container);
<option value="">${t('common.all_types')}</option>
<option value="chat" ?selected=${adv_type === 'chat'}>${t('node_types.chat')}</option>
<option value="repeater" ?selected=${adv_type === 'repeater'}>${t('node_types.repeater')}</option>
<option value="companion" ?selected=${adv_type === 'companion'}>${t('node_types.companion')}</option>
<option value="room" ?selected=${adv_type === 'room'}>${t('node_types.room')}</option>
</select>
</div>

View File

@@ -122,7 +122,8 @@
"node_types": {
"chat": "Chat",
"repeater": "Repeater",
"room": "Room",
"companion": "Companion",
"room": "Room Server",
"unknown": "Unknown"
},
"home": {

View File

@@ -223,7 +223,8 @@ Mesh network node type labels:
|-----|---------|---------|
| `chat` | Chat | Chat node type |
| `repeater` | Repeater | Repeater/relay node type |
| `room` | Room | Room/group node type |
| `companion` | Companion | Companion/observer node type |
| `room` | Room Server | Room server/group node type |
| `unknown` | Unknown | Unknown node type fallback |
### 7. `home`