mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
373 lines
12 KiB
Python
373 lines
12 KiB
Python
import asyncio
|
|
import logging
|
|
import time
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from app.models import CONTACT_TYPE_REPEATER, CONTACT_TYPE_ROOM, Contact, ContactUpsert, Message
|
|
from app.repository import (
|
|
AmbiguousPublicKeyPrefixError,
|
|
ContactRepository,
|
|
MessageRepository,
|
|
RawPacketRepository,
|
|
)
|
|
from app.services.contact_reconciliation import claim_prefix_messages_for_contact
|
|
from app.services.messages import (
|
|
broadcast_message,
|
|
build_message_model,
|
|
build_message_paths,
|
|
format_contact_log_target,
|
|
handle_duplicate_message,
|
|
reconcile_duplicate_message,
|
|
truncate_for_log,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from app.decoder import DecryptedDirectMessage
|
|
|
|
# Module logger; also the fallback wherever a caller-supplied ``log`` is None.
logger = logging.getLogger(__name__)

# Deliberately loose callback type: in this module it is invoked as
# ``broadcast_fn("contact", payload)`` and otherwise forwarded to helpers.
BroadcastFn = Callable[..., Any]

# Held around the store routine when linked-packet dedup is requested, so the
# "is this packet already linked to a message?" check and the following insert
# are not interleaved with another decrypted-DM store.
_decrypted_dm_store_lock = asyncio.Lock()
|
|
|
|
|
|
@dataclass(frozen=True)
class FallbackDirectMessageContext:
    """Immutable outcome of resolving who sent a fallback direct message.

    Instances are produced by ``resolve_fallback_direct_message_context``.
    """

    # Lower-cased public key (or key prefix) the message is filed under.
    conversation_key: str
    # Contact found for the sender, or the placeholder created for it;
    # None when neither exists.
    contact: Contact | None
    # ``contact.name`` when a contact is available, otherwise None.
    sender_name: str | None
    # Lower-cased sender key; None when the incoming key was empty.
    sender_key: str | None
    # True when the resolved contact is a repeater.
    # NOTE(review): presumably callers skip persisting in that case - confirm.
    skip_storage: bool = False
|
|
|
|
|
|
async def _prepare_resolved_contact(
    contact: Contact,
    *,
    log: logging.Logger | None = None,
) -> tuple[str, bool]:
    """Run prefix-claiming for a known contact and report its storage policy.

    Args:
        contact: Contact that has already been resolved for the sender.
        log: Logger handed to the claim helper; the module logger is used
            when omitted.

    Returns:
        ``(conversation_key, skip_storage)``: the contact's lower-cased public
        key, and True only when the contact is of type
        ``CONTACT_TYPE_REPEATER``.
    """
    key = contact.public_key.lower()

    # NOTE(review): presumably re-homes messages stored under a key prefix
    # onto the full key - confirm against contact_reconciliation.
    await claim_prefix_messages_for_contact(public_key=key, log=log or logger)

    is_repeater = bool(contact.type == CONTACT_TYPE_REPEATER)
    return key, is_repeater
|
|
|
|
|
|
async def resolve_fallback_direct_message_context(
    *,
    sender_public_key: str,
    received_at: int,
    broadcast_fn: BroadcastFn,
    contact_repository=ContactRepository,
    log: logging.Logger | None = None,
) -> FallbackDirectMessageContext:
    """Work out the conversation and sender attribution for a fallback DM.

    The sender key (full key or prefix) is lower-cased and looked up. A match
    yields a context built from that contact. Otherwise, when the key is
    non-empty, a placeholder contact is upserted, re-read, and announced via
    ``broadcast_fn("contact", ...)``.

    Args:
        sender_public_key: Sender key or key prefix as received.
        received_at: Timestamp written to ``last_seen``, ``last_contacted``
            and ``first_seen`` of a placeholder contact.
        broadcast_fn: Callback used to announce a newly created placeholder.
        contact_repository: Repository providing ``get_by_key_or_prefix``,
            ``upsert`` and ``get_by_key``.
        log: Logger for warnings; the module logger is used when omitted.

    Returns:
        The resolved ``FallbackDirectMessageContext``.
    """
    sender = sender_public_key.lower()

    # An ambiguous prefix is treated the same as "no contact found".
    known_contact = None
    try:
        known_contact = await contact_repository.get_by_key_or_prefix(sender)
    except AmbiguousPublicKeyPrefixError:
        (log or logger).warning(
            "DM sender prefix '%s' is ambiguous; storing under prefix until full key is known",
            sender_public_key,
        )

    if known_contact is not None:
        resolved_key, skip = await _prepare_resolved_contact(known_contact, log=log)
        return FallbackDirectMessageContext(
            conversation_key=resolved_key,
            contact=known_contact,
            sender_name=known_contact.name,
            sender_key=resolved_key,
            skip_storage=skip,
        )

    # No usable contact: create a placeholder so the conversation has a row.
    placeholder = None
    if sender:
        await contact_repository.upsert(
            ContactUpsert(
                public_key=sender,
                type=0,
                last_seen=received_at,
                last_contacted=received_at,
                first_seen=received_at,
                on_radio=False,
            )
        )
        placeholder = await contact_repository.get_by_key(sender)
        if placeholder is not None:
            broadcast_fn("contact", placeholder.model_dump())

    return FallbackDirectMessageContext(
        conversation_key=sender,
        contact=placeholder,
        sender_name=placeholder.name if placeholder else None,
        sender_key=sender or None,
    )
|
|
|
|
|
|
async def resolve_direct_message_sender_metadata(
    *,
    sender_public_key: str,
    received_at: int,
    broadcast_fn: BroadcastFn,
    contact_repository=ContactRepository,
    log: logging.Logger | None = None,
) -> tuple[str | None, str | None]:
    """Resolve sender attribution for direct-message variants such as room-server posts.

    Args:
        sender_public_key: Sender key or key prefix as received.
        received_at: Accepted for signature parity; not read in this body.
        broadcast_fn: Accepted for signature parity; not called in this body.
        contact_repository: Repository providing ``get_by_key_or_prefix``.
        log: Logger for warnings; the module logger is used when omitted.

    Returns:
        ``(sender_name, sender_key)``. With a matching contact this is its
        name and lower-cased full public key. Without one (including an
        ambiguous prefix) it is ``None`` and the lower-cased input, or
        ``None`` for both if the input was empty.
    """
    sender = sender_public_key.lower()

    try:
        contact = await contact_repository.get_by_key_or_prefix(sender)
    except AmbiguousPublicKeyPrefixError:
        (log or logger).warning(
            "Sender prefix '%s' is ambiguous; preserving prefix-only attribution",
            sender_public_key,
        )
        return None, sender or None

    if contact is None:
        return None, sender or None

    full_key = contact.public_key.lower()
    await claim_prefix_messages_for_contact(public_key=full_key, log=log or logger)
    return contact.name, full_key
|
|
|
|
|
|
async def _store_direct_message(
    *,
    packet_id: int | None,
    conversation_key: str,
    text: str,
    sender_timestamp: int,
    received_at: int,
    path: str | None,
    path_len: int | None,
    outgoing: bool,
    txt_type: int,
    signature: str | None,
    sender_name: str | None,
    sender_key: str | None,
    realtime: bool,
    broadcast_fn: BroadcastFn,
    update_last_contacted_key: str | None,
    best_effort_content_dedup: bool,
    linked_packet_dedup: bool,
    message_repository=MessageRepository,
    contact_repository=ContactRepository,
    raw_packet_repository=RawPacketRepository,
) -> Message | None:
    """Persist one "PRIV" message, or fold it into an existing duplicate.

    Steps, in order:
      1. If ``linked_packet_dedup`` is set and ``packet_id`` is given, look up
         the message already linked to that packet; if it exists, reconcile
         into it and stop.
      2. If ``best_effort_content_dedup`` is set, look for a message with the
         same conversation/text/timestamp/direction; if found, reconcile into
         it and stop.
      3. Create the message. A ``None`` id from the repository is treated as
         a duplicate and delegated to ``handle_duplicate_message``.
      4. Mark the raw packet decrypted (when ``packet_id`` is given), build
         and broadcast the message model, and update the contact's
         last-contacted time when ``update_last_contacted_key`` is truthy.

    With ``linked_packet_dedup`` set, the whole sequence runs while holding
    ``_decrypted_dm_store_lock``.

    Returns:
        The newly stored ``Message``, or ``None`` when the input was handled
        as a duplicate.
    """

    async def fold_into(existing) -> None:
        # Shared by both dedup paths: merge this arrival into a stored row.
        await reconcile_duplicate_message(
            existing_msg=existing,
            packet_id=packet_id,
            path=path,
            received_at=received_at,
            path_len=path_len,
            broadcast_fn=broadcast_fn,
        )

    async def store() -> Message | None:
        # 1. Packet-link dedup.
        if linked_packet_dedup and packet_id is not None:
            linked_id = await raw_packet_repository.get_linked_message_id(packet_id)
            already_linked = (
                await message_repository.get_by_id(linked_id) if linked_id is not None else None
            )
            if already_linked is not None:
                await fold_into(already_linked)
                return None

        # 2. Content-based dedup.
        if best_effort_content_dedup:
            same_content = await message_repository.get_by_content(
                msg_type="PRIV",
                conversation_key=conversation_key,
                text=text,
                sender_timestamp=sender_timestamp,
                outgoing=outgoing,
            )
            if same_content is not None:
                await fold_into(same_content)
                return None

        # 3. Insert; a missing id is treated as a duplicate.
        new_id = await message_repository.create(
            msg_type="PRIV",
            text=text,
            conversation_key=conversation_key,
            sender_timestamp=sender_timestamp,
            received_at=received_at,
            path=path,
            path_len=path_len,
            txt_type=txt_type,
            signature=signature,
            outgoing=outgoing,
            sender_key=sender_key,
            sender_name=sender_name,
        )
        if new_id is None:
            await handle_duplicate_message(
                packet_id=packet_id,
                msg_type="PRIV",
                conversation_key=conversation_key,
                text=text,
                sender_timestamp=sender_timestamp,
                outgoing=outgoing,
                path=path,
                received_at=received_at,
                path_len=path_len,
                broadcast_fn=broadcast_fn,
            )
            return None

        # 4. Link the packet, publish, and touch the contact.
        if packet_id is not None:
            await raw_packet_repository.mark_decrypted(packet_id, new_id)

        stored = build_message_model(
            message_id=new_id,
            msg_type="PRIV",
            conversation_key=conversation_key,
            text=text,
            sender_timestamp=sender_timestamp,
            received_at=received_at,
            paths=build_message_paths(path, received_at, path_len),
            txt_type=txt_type,
            signature=signature,
            sender_key=sender_key,
            outgoing=outgoing,
            sender_name=sender_name,
            packet_id=packet_id,
        )
        broadcast_message(message=stored, broadcast_fn=broadcast_fn, realtime=realtime)

        if update_last_contacted_key:
            await contact_repository.update_last_contacted(update_last_contacted_key, received_at)

        return stored

    if not linked_packet_dedup:
        return await store()

    async with _decrypted_dm_store_lock:
        return await store()
|
|
|
|
|
|
async def ingest_decrypted_direct_message(
    *,
    packet_id: int,
    decrypted: "DecryptedDirectMessage",
    their_public_key: str,
    received_at: int | None = None,
    path: str | None = None,
    path_len: int | None = None,
    outgoing: bool = False,
    realtime: bool = True,
    broadcast_fn: BroadcastFn,
    contact_repository=ContactRepository,
) -> Message | None:
    """Store a successfully decrypted direct message and return it.

    Flow:
      * Incoming messages with ``txt_type == 1`` are logged and dropped.
      * If a contact exists for ``their_public_key``, prefix messages are
        claimed for it, and messages for repeater contacts are dropped.
      * For incoming messages from a room contact carrying a
        ``signed_sender_prefix``, sender name/key are resolved from that
        prefix and the prefix is recorded as the signature. For other
        incoming messages the sender name is the contact's name.
      * The message is stored through ``_store_direct_message`` with
        linked-packet dedup enabled and content dedup enabled only for
        outgoing messages.

    Args:
        packet_id: Raw packet id the message was decrypted from.
        decrypted: Decrypted payload (``message``, ``timestamp``,
            ``txt_type``, ``signed_sender_prefix`` are read).
        their_public_key: Public key of the remote party; lower-cased to
            form the conversation key.
        received_at: Receive timestamp; a falsy value (``None`` or ``0``)
            is replaced by the current time.
        path: Forwarded unchanged to storage.
        path_len: Forwarded unchanged to storage.
        outgoing: True when the message was sent by this node.
        realtime: Forwarded to the message broadcast.
        broadcast_fn: Broadcast callback.
        contact_repository: Repository providing ``get_by_key``.

    Returns:
        The stored ``Message``, or ``None`` when the message was skipped or
        handled as a duplicate.
    """
    conversation_key = their_public_key.lower()

    if not outgoing and decrypted.txt_type == 1:
        logger.debug(
            "Skipping CLI response from %s (txt_type=1): %s",
            conversation_key[:12],
            (decrypted.message or "")[:50],
        )
        return None

    # Resolve the receive time exactly once. Previously the fallback clock was
    # sampled separately for sender resolution and for storage, so a single
    # message could be handled with two different timestamps.
    received = received_at or int(time.time())

    contact = await contact_repository.get_by_key(conversation_key)
    sender_name: str | None = None
    # Outgoing messages carry no remote sender key.
    sender_key: str | None = conversation_key if not outgoing else None
    signature: str | None = None

    if contact is not None:
        conversation_key, skip_storage = await _prepare_resolved_contact(contact, log=logger)
        if skip_storage:
            logger.debug(
                "Skipping message from repeater %s (CLI responses not stored): %s",
                conversation_key[:12],
                (decrypted.message or "")[:50],
            )
            return None

        if not outgoing:
            if contact.type == CONTACT_TYPE_ROOM and decrypted.signed_sender_prefix:
                # Room posts: attribute to the signed author, not the room.
                sender_name, sender_key = await resolve_direct_message_sender_metadata(
                    sender_public_key=decrypted.signed_sender_prefix,
                    received_at=received,
                    broadcast_fn=broadcast_fn,
                    contact_repository=contact_repository,
                    log=logger,
                )
                signature = decrypted.signed_sender_prefix
            else:
                sender_name = contact.name

    message = await _store_direct_message(
        packet_id=packet_id,
        conversation_key=conversation_key,
        text=decrypted.message,
        sender_timestamp=decrypted.timestamp,
        received_at=received,
        path=path,
        path_len=path_len,
        outgoing=outgoing,
        txt_type=decrypted.txt_type,
        signature=signature,
        sender_name=sender_name,
        sender_key=sender_key,
        realtime=realtime,
        broadcast_fn=broadcast_fn,
        update_last_contacted_key=conversation_key,
        best_effort_content_dedup=outgoing,
        linked_packet_dedup=True,
    )
    if message is None:
        return None

    logger.info(
        'Stored direct message "%s" for %r (msg ID %d in contact ID %s, outgoing=%s)',
        truncate_for_log(decrypted.message),
        format_contact_log_target(contact.name if contact else None, conversation_key),
        message.id,
        conversation_key,
        outgoing,
    )
    return message
|
|
|
|
|
|
async def ingest_fallback_direct_message(
    *,
    conversation_key: str,
    text: str,
    sender_timestamp: int,
    received_at: int,
    path: str | None,
    path_len: int | None,
    txt_type: int,
    signature: str | None,
    sender_name: str | None,
    sender_key: str | None,
    broadcast_fn: BroadcastFn,
    update_last_contacted_key: str | None = None,
) -> Message | None:
    """Store an incoming direct message that has no raw packet to link to.

    Delegates to ``_store_direct_message`` with these fixed choices: no
    ``packet_id``, ``outgoing=False``, ``realtime=True``, content-based dedup
    on, and linked-packet dedup off (so the store lock is not taken).

    Returns:
        The stored ``Message``, or ``None`` when it was handled as a
        duplicate.
    """
    stored = await _store_direct_message(
        # Fixed by this entry point.
        packet_id=None,
        outgoing=False,
        realtime=True,
        best_effort_content_dedup=True,
        linked_packet_dedup=False,
        # Passed through from the caller.
        conversation_key=conversation_key,
        text=text,
        sender_timestamp=sender_timestamp,
        received_at=received_at,
        path=path,
        path_len=path_len,
        txt_type=txt_type,
        signature=signature,
        sender_name=sender_name,
        sender_key=sender_key,
        broadcast_fn=broadcast_fn,
        update_last_contacted_key=update_last_contacted_key,
    )
    return stored
|