mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
325 lines
12 KiB
Python
325 lines
12 KiB
Python
import asyncio
|
|
import logging
|
|
import time
|
|
from typing import TYPE_CHECKING
|
|
|
|
from meshcore import EventType
|
|
|
|
from app.models import CONTACT_TYPE_REPEATER, Contact, Message, MessagePath
|
|
from app.packet_processor import process_raw_packet
|
|
from app.repository import (
|
|
AmbiguousPublicKeyPrefixError,
|
|
ContactNameHistoryRepository,
|
|
ContactRepository,
|
|
MessageRepository,
|
|
)
|
|
from app.websocket import broadcast_event
|
|
|
|
if TYPE_CHECKING:
|
|
from meshcore.events import Event, Subscription
|
|
|
|
logger = logging.getLogger(__name__)


# Subscriptions created by the most recent register_event_handlers() call.
# They are tracked so they can be unsubscribed before re-registering, which
# prevents handler duplication after reconnects.
_active_subscriptions: list["Subscription"] = []


# Pending ACKs: expected_ack_code -> (message_id, timestamp, timeout_ms).
# The timestamp is the time.time() value captured when tracking started.
_pending_acks: dict[str, tuple[int, float, int]] = {}
|
|
|
|
|
|
def track_pending_ack(expected_ack: str, message_id: int, timeout_ms: int) -> None:
    """Remember that a direct message is waiting for an ACK.

    Args:
        expected_ack: ACK code used as the lookup key when an ACK arrives.
        message_id: ID of the message the ACK belongs to.
        timeout_ms: ACK timeout in milliseconds; used later to expire the entry.
    """
    # Stored shape: (message_id, time tracking started, timeout in ms).
    entry = (message_id, time.time(), timeout_ms)
    _pending_acks[expected_ack] = entry
    logger.debug(
        "Tracking pending ACK %s for message %d (timeout %dms)",
        expected_ack,
        message_id,
        timeout_ms,
    )
|
|
|
|
|
|
def cleanup_expired_acks() -> None:
    """Drop pending ACK entries that are older than twice their timeout."""
    current = time.time()

    # Collect first, delete afterwards: the dict must not change size while
    # it is being iterated.
    stale_codes = [
        ack_code
        for ack_code, (_message_id, tracked_at, timeout_ms) in _pending_acks.items()
        if current - tracked_at > (timeout_ms / 1000) * 2  # 2x timeout as buffer
    ]

    for ack_code in stale_codes:
        _pending_acks.pop(ack_code)
        logger.debug("Expired pending ACK %s", ack_code)
|
|
|
|
|
|
# Strong references to in-flight bot tasks. The event loop only keeps weak
# references to tasks, so a task whose result is discarded could be
# garbage-collected before it finishes. Entries remove themselves when done.
_bot_tasks: set[asyncio.Task] = set()


async def on_contact_message(event: "Event") -> None:
    """Handle incoming direct messages from MeshCore library.

    NOTE: DMs are primarily handled by the packet processor via RX_LOG_DATA,
    which decrypts using our exported private key. This handler exists as a
    fallback for cases where:
    1. The private key couldn't be exported (firmware without ENABLE_PRIVATE_KEY_EXPORT)
    2. The packet processor couldn't match the sender to a known contact

    The packet processor handles: decryption, storage, broadcast, bot trigger.
    This handler only stores if the packet processor didn't already handle it
    (detected via INSERT OR IGNORE returning None for duplicates).

    Args:
        event: MeshCore event whose ``payload`` is a dict. Keys read here:
            ``txt_type``, ``public_key``, ``pubkey_prefix``, ``text``,
            ``sender_timestamp``, ``path`` and ``signature``.
    """
    payload = event.payload

    # Skip CLI command responses (txt_type=1) - these are handled by the command endpoint
    txt_type = payload.get("txt_type", 0)
    if txt_type == 1:
        logger.debug("Skipping CLI response from %s (txt_type=1)", payload.get("pubkey_prefix"))
        return

    # Get full public key if available, otherwise use prefix
    sender_pubkey = payload.get("public_key") or payload.get("pubkey_prefix", "")
    received_at = int(time.time())

    # Look up contact from database - use prefix lookup only if needed
    # (get_by_key_or_prefix does exact match first, then prefix fallback)
    try:
        contact = await ContactRepository.get_by_key_or_prefix(sender_pubkey)
    except AmbiguousPublicKeyPrefixError:
        logger.warning(
            "DM sender prefix '%s' is ambiguous; storing under prefix until full key is known",
            sender_pubkey,
        )
        contact = None

    if contact:
        sender_pubkey = contact.public_key.lower()

        # Promote any prefix-stored messages to this full key
        await MessageRepository.claim_prefix_messages(sender_pubkey)

        # Skip messages from repeaters - they only send CLI responses, not chat messages.
        # CLI responses are handled by the command endpoint and txt_type filter above.
        if contact.type == CONTACT_TYPE_REPEATER:
            logger.debug(
                "Skipping message from repeater %s (not stored in chat history)",
                sender_pubkey[:12],
            )
            return

    # Read the message fields once so storage, broadcast and the bot all see
    # the same values.
    text = payload.get("text", "")
    path = payload.get("path")
    signature = payload.get("signature")
    raw_sender_timestamp = payload.get("sender_timestamp")
    # Fall back to the receive time when the sender supplied no timestamp.
    sender_timestamp = raw_sender_timestamp or received_at

    # Try to create message - INSERT OR IGNORE handles duplicates atomically
    # If the packet processor already stored this message, this returns None
    msg_id = await MessageRepository.create(
        msg_type="PRIV",
        text=text,
        conversation_key=sender_pubkey,
        sender_timestamp=sender_timestamp,
        received_at=received_at,
        path=path,
        txt_type=txt_type,
        signature=signature,
    )

    if msg_id is None:
        # Already handled by packet processor (or exact duplicate) - nothing more to do
        logger.debug("DM from %s already processed by packet processor", sender_pubkey[:12])
        return

    # If we get here, the packet processor didn't handle this message
    # (likely because private key export is not available)
    logger.debug("DM from %s handled by event handler (fallback path)", sender_pubkey[:12])

    # Build paths array for broadcast
    paths = [MessagePath(path=path or "", received_at=received_at)] if path is not None else None

    # Broadcast the new message
    broadcast_event(
        "message",
        Message(
            id=msg_id,
            type="PRIV",
            conversation_key=sender_pubkey,
            text=text,
            sender_timestamp=sender_timestamp,
            received_at=received_at,
            paths=paths,
            txt_type=txt_type,
            signature=signature,
        ).model_dump(),
    )

    # Update contact last_contacted (contact was already fetched above)
    if contact:
        await ContactRepository.update_last_contacted(sender_pubkey, received_at)

    # Run bot if enabled. Imported here rather than at module level, as in
    # the original code.
    from app.bot import run_bot_for_message

    bot_task = asyncio.create_task(
        run_bot_for_message(
            sender_name=contact.name if contact else None,
            sender_key=sender_pubkey,
            message_text=text,
            is_dm=True,
            channel_key=None,
            channel_name=None,
            # The bot receives the raw sender timestamp (possibly None),
            # not the received_at fallback.
            sender_timestamp=raw_sender_timestamp,
            path=path,
            is_outgoing=False,
        )
    )
    # Keep the task alive until it completes, then drop the reference.
    _bot_tasks.add(bot_task)
    bot_task.add_done_callback(_bot_tasks.discard)
|
|
|
|
|
|
async def on_rx_log_data(event: "Event") -> None:
    """Store raw RF packet data and process via centralized packet processor.

    This is the unified entry point for all RF packets. The packet processor
    handles channel messages (GROUP_TEXT) and advertisements (ADVERT).

    Args:
        event: MeshCore event whose ``payload`` dict carries the packet as a
            hex string under ``"payload"``, plus optional ``"snr"`` and
            ``"rssi"`` values that are forwarded unchanged.
    """
    payload = event.payload
    logger.debug("Received RX log data packet")

    if "payload" not in payload:
        logger.warning("RX_LOG_DATA event missing 'payload' field")
        return

    raw_hex = payload["payload"]
    try:
        raw_bytes = bytes.fromhex(raw_hex)
    except (TypeError, ValueError):
        # fromhex raises ValueError for non-hex text and TypeError for a
        # non-string value. Drop the packet instead of letting the exception
        # escape into the event dispatcher.
        logger.warning("RX_LOG_DATA event has malformed hex payload: %r", raw_hex)
        return

    await process_raw_packet(
        raw_bytes=raw_bytes,
        snr=payload.get("snr"),
        rssi=payload.get("rssi"),
    )
|
|
|
|
|
|
async def on_path_update(event: "Event") -> None:
    """Handle path update events.

    Resolves the contact named by the event (by full public key, or by key
    prefix for legacy payloads) and, when the payload carries both ``path``
    and ``path_len``, stores them on that contact.

    Args:
        event: MeshCore event whose ``payload`` dict may contain
            ``public_key``, ``pubkey_prefix``, ``path`` and ``path_len``.
    """
    payload = event.payload
    # `or ""` matters: a key that is present but None would otherwise be
    # stringified to the truthy text "none" and used for the lookup.
    public_key = str(payload.get("public_key") or "").lower()
    pubkey_prefix = str(payload.get("pubkey_prefix") or "").lower()

    contact: Contact | None = None
    if public_key:
        logger.debug("Path update for %s", public_key[:12])
        contact = await ContactRepository.get_by_key(public_key)
    elif pubkey_prefix:
        # Legacy compatibility: older payloads may only include a prefix.
        logger.debug("Path update for prefix %s", pubkey_prefix)
        contact = await ContactRepository.get_by_key_prefix(pubkey_prefix)
    else:
        logger.debug("PATH_UPDATE missing public_key/pubkey_prefix, skipping")
        return

    if not contact:
        return

    # PATH_UPDATE is a serial control push event from firmware (not an RF packet).
    # Current meshcore payloads only include public_key for this event.
    # RF route/path bytes are handled via RX_LOG_DATA -> process_raw_packet,
    # so if path fields are absent here we treat this as informational only.
    path = payload.get("path")
    path_len = payload.get("path_len")
    if path is None or path_len is None:
        logger.debug(
            "PATH_UPDATE for %s has no path payload, skipping DB update", contact.public_key[:12]
        )
        return

    try:
        normalized_path_len = int(path_len)
    except (TypeError, ValueError):
        logger.warning(
            "Invalid path_len in PATH_UPDATE for %s: %r", contact.public_key[:12], path_len
        )
        return

    await ContactRepository.update_path(contact.public_key, str(path), normalized_path_len)
|
|
|
|
|
|
async def on_new_contact(event: "Event") -> None:
    """Handle new contact from radio's internal contact database.

    This is different from RF advertisements - these are contacts synced
    from the radio's stored contact list.

    The contact is upserted, its advertised name (if any) is recorded in the
    name history, and the stored contact is broadcast as a ``"contact"`` event.

    Args:
        event: MeshCore event whose ``payload`` dict holds the radio's contact
            record; ``public_key`` is required, ``adv_name`` is optional.
    """
    payload = event.payload
    raw_key = payload.get("public_key", "")

    if not raw_key:
        logger.warning("Received new contact event with no public_key, skipping")
        return

    # Normalize once so the upsert, the name history and the read-back all
    # use the same key. Reading back with the un-normalized key could miss
    # the row that was just written under the lowercase key.
    public_key = raw_key.lower()
    logger.debug("New contact: %s", public_key[:12])

    # One timestamp for both writes keeps last_seen and the name-history
    # entry consistent.
    now = int(time.time())

    contact_data = {
        **Contact.from_radio_dict(public_key, payload, on_radio=True),
        "last_seen": now,
    }
    await ContactRepository.upsert(contact_data)

    # Record name history if contact has a name
    adv_name = payload.get("adv_name")
    if adv_name:
        await ContactNameHistoryRepository.record_name(public_key, adv_name, now)

    # Read back from DB so the broadcast includes all fields (last_contacted,
    # last_read_at, etc.) matching the REST Contact shape exactly.
    db_contact = await ContactRepository.get_by_key(public_key)
    broadcast_event("contact", (db_contact.model_dump() if db_contact else contact_data))
|
|
|
|
|
|
async def on_ack(event: "Event") -> None:
    """Match an incoming ACK against the pending direct-message ACKs.

    On a match the message's ACK count is incremented and a
    ``"message_acked"`` event is broadcast; otherwise the ACK is only logged.
    """
    ack_code = event.payload.get("code", "")
    if not ack_code:
        logger.debug("Received ACK with no code")
        return

    logger.debug("Received ACK with code %s", ack_code)

    # Purge stale entries first so an ACK arriving after expiry is not matched.
    cleanup_expired_acks()

    pending = _pending_acks.pop(ack_code, None)
    if pending is None:
        logger.debug("ACK code %s does not match any pending messages", ack_code)
        return

    message_id = pending[0]
    logger.info("ACK received for message %d", message_id)

    ack_count = await MessageRepository.increment_ack_count(message_id)
    # DM ACKs don't carry path data, so paths is intentionally omitted.
    # The frontend's mergePendingAck handles the missing field correctly,
    # preserving any previously known paths.
    broadcast_event("message_acked", {"message_id": message_id, "ack_count": ack_count})
|
|
|
|
|
|
def register_event_handlers(meshcore) -> None:
    """Register event handlers with the MeshCore instance.

    Note: CHANNEL_MSG_RECV and ADVERTISEMENT events are NOT subscribed.
    These are handled by the packet processor via RX_LOG_DATA to avoid
    duplicate processing and ensure consistent handling.

    This function is safe to call multiple times (e.g., after reconnect).
    Existing handlers are unsubscribed before new ones are registered.

    Args:
        meshcore: Object exposing ``subscribe(event_type, handler)``; each
            returned subscription is tracked in ``_active_subscriptions``.
    """
    # Unsubscribe existing handlers to prevent duplication after reconnects.
    # Failures are tolerated because the old dispatcher may be in a bad state
    # (e.g., after reconnect with a new MeshCore instance), but they are
    # logged rather than silently discarded.
    for sub in _active_subscriptions:
        try:
            sub.unsubscribe()
        except Exception:
            logger.debug("Ignoring failure to unsubscribe stale handler", exc_info=True)
    # Cleared in place so the list object itself is never rebound.
    _active_subscriptions.clear()

    # Register handlers and track subscriptions
    handlers = (
        (EventType.CONTACT_MSG_RECV, on_contact_message),
        (EventType.RX_LOG_DATA, on_rx_log_data),
        (EventType.PATH_UPDATE, on_path_update),
        (EventType.NEW_CONTACT, on_new_contact),
        (EventType.ACK, on_ack),
    )
    for event_type, handler in handlers:
        _active_subscriptions.append(meshcore.subscribe(event_type, handler))

    logger.info("Event handlers registered")
|