mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
Compare commits
4 Commits
room_serve
...
dm-ingest-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cb8a69a11d | ||
|
|
3b63496996 | ||
|
|
a1a749283e | ||
|
|
d373585527 |
@@ -4,19 +4,21 @@ from typing import TYPE_CHECKING
|
||||
|
||||
from meshcore import EventType
|
||||
|
||||
from app.models import CONTACT_TYPE_REPEATER, Contact, ContactUpsert
|
||||
from app.models import Contact, ContactUpsert
|
||||
from app.packet_processor import process_raw_packet
|
||||
from app.repository import (
|
||||
AmbiguousPublicKeyPrefixError,
|
||||
ContactRepository,
|
||||
)
|
||||
from app.services import dm_ack_tracker
|
||||
from app.services.contact_reconciliation import (
|
||||
claim_prefix_messages_for_contact,
|
||||
promote_prefix_contacts_for_contact,
|
||||
record_contact_name_and_reconcile,
|
||||
)
|
||||
from app.services.messages import create_fallback_direct_message, increment_ack_and_broadcast
|
||||
from app.services.dm_ingest import (
|
||||
ingest_fallback_direct_message,
|
||||
resolve_fallback_direct_message_context,
|
||||
)
|
||||
from app.services.messages import increment_ack_and_broadcast
|
||||
from app.websocket import broadcast_event
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -51,8 +53,8 @@ async def on_contact_message(event: "Event") -> None:
|
||||
2. The packet processor couldn't match the sender to a known contact
|
||||
|
||||
The packet processor handles: decryption, storage, broadcast, bot trigger.
|
||||
This handler only stores if the packet processor didn't already handle it
|
||||
(detected via INSERT OR IGNORE returning None for duplicates).
|
||||
This handler adapts CONTACT_MSG_RECV payloads into the shared DM ingest
|
||||
workflow, which reconciles duplicates against the packet pipeline when possible.
|
||||
"""
|
||||
payload = event.payload
|
||||
|
||||
@@ -66,54 +68,27 @@ async def on_contact_message(event: "Event") -> None:
|
||||
sender_pubkey = payload.get("public_key") or payload.get("pubkey_prefix", "")
|
||||
received_at = int(time.time())
|
||||
|
||||
# Look up contact from database - use prefix lookup only if needed
|
||||
# (get_by_key_or_prefix does exact match first, then prefix fallback)
|
||||
try:
|
||||
contact = await ContactRepository.get_by_key_or_prefix(sender_pubkey)
|
||||
except AmbiguousPublicKeyPrefixError:
|
||||
logger.warning(
|
||||
"DM sender prefix '%s' is ambiguous; storing under prefix until full key is known",
|
||||
sender_pubkey,
|
||||
context = await resolve_fallback_direct_message_context(
|
||||
sender_public_key=sender_pubkey,
|
||||
received_at=received_at,
|
||||
broadcast_fn=broadcast_event,
|
||||
contact_repository=ContactRepository,
|
||||
log=logger,
|
||||
)
|
||||
if context.skip_storage:
|
||||
logger.debug(
|
||||
"Skipping message from repeater %s (not stored in chat history)",
|
||||
context.conversation_key[:12],
|
||||
)
|
||||
contact = None
|
||||
if contact:
|
||||
sender_pubkey = contact.public_key.lower()
|
||||
return
|
||||
|
||||
# Promote any prefix-stored messages to this full key
|
||||
await claim_prefix_messages_for_contact(public_key=sender_pubkey, log=logger)
|
||||
|
||||
# Skip messages from repeaters - they only send CLI responses, not chat messages.
|
||||
# CLI responses are handled by the command endpoint and txt_type filter above.
|
||||
if contact.type == CONTACT_TYPE_REPEATER:
|
||||
logger.debug(
|
||||
"Skipping message from repeater %s (not stored in chat history)",
|
||||
sender_pubkey[:12],
|
||||
)
|
||||
return
|
||||
elif sender_pubkey:
|
||||
placeholder_upsert = ContactUpsert(
|
||||
public_key=sender_pubkey.lower(),
|
||||
type=0,
|
||||
last_seen=received_at,
|
||||
last_contacted=received_at,
|
||||
first_seen=received_at,
|
||||
on_radio=False,
|
||||
out_path_hash_mode=-1,
|
||||
)
|
||||
await ContactRepository.upsert(placeholder_upsert)
|
||||
contact = await ContactRepository.get_by_key(sender_pubkey.lower())
|
||||
if contact:
|
||||
broadcast_event("contact", contact.model_dump())
|
||||
|
||||
# Try to create message - INSERT OR IGNORE handles duplicates atomically
|
||||
# If the packet processor already stored this message, this returns None
|
||||
# Try to create or reconcile the message via the shared DM ingest service.
|
||||
ts = payload.get("sender_timestamp")
|
||||
sender_timestamp = ts if ts is not None else received_at
|
||||
sender_name = contact.name if contact else None
|
||||
path = payload.get("path")
|
||||
path_len = payload.get("path_len")
|
||||
message = await create_fallback_direct_message(
|
||||
conversation_key=sender_pubkey,
|
||||
message = await ingest_fallback_direct_message(
|
||||
conversation_key=context.conversation_key,
|
||||
text=payload.get("text", ""),
|
||||
sender_timestamp=sender_timestamp,
|
||||
received_at=received_at,
|
||||
@@ -121,23 +96,24 @@ async def on_contact_message(event: "Event") -> None:
|
||||
path_len=path_len,
|
||||
txt_type=txt_type,
|
||||
signature=payload.get("signature"),
|
||||
sender_name=sender_name,
|
||||
sender_key=sender_pubkey,
|
||||
sender_name=context.sender_name,
|
||||
sender_key=context.sender_key,
|
||||
broadcast_fn=broadcast_event,
|
||||
update_last_contacted_key=context.contact.public_key.lower() if context.contact else None,
|
||||
)
|
||||
|
||||
if message is None:
|
||||
# Already handled by packet processor (or exact duplicate) - nothing more to do
|
||||
logger.debug("DM from %s already processed by packet processor", sender_pubkey[:12])
|
||||
logger.debug(
|
||||
"DM from %s already processed by packet processor", context.conversation_key[:12]
|
||||
)
|
||||
return
|
||||
|
||||
# If we get here, the packet processor didn't handle this message
|
||||
# (likely because private key export is not available)
|
||||
logger.debug("DM from %s handled by event handler (fallback path)", sender_pubkey[:12])
|
||||
|
||||
# Update contact last_contacted (contact was already fetched above)
|
||||
if contact:
|
||||
await ContactRepository.update_last_contacted(sender_pubkey, received_at)
|
||||
logger.debug(
|
||||
"DM from %s handled by event handler (fallback path)", context.conversation_key[:12]
|
||||
)
|
||||
|
||||
|
||||
async def on_rx_log_data(event: "Event") -> None:
|
||||
|
||||
320
app/services/dm_ingest.py
Normal file
320
app/services/dm_ingest.py
Normal file
@@ -0,0 +1,320 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from app.models import CONTACT_TYPE_REPEATER, Contact, ContactUpsert, Message
|
||||
from app.repository import (
|
||||
AmbiguousPublicKeyPrefixError,
|
||||
ContactRepository,
|
||||
MessageRepository,
|
||||
RawPacketRepository,
|
||||
)
|
||||
from app.services.contact_reconciliation import claim_prefix_messages_for_contact
|
||||
from app.services.messages import (
|
||||
broadcast_message,
|
||||
build_message_model,
|
||||
build_message_paths,
|
||||
format_contact_log_target,
|
||||
handle_duplicate_message,
|
||||
reconcile_duplicate_message,
|
||||
truncate_for_log,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from app.decoder import DecryptedDirectMessage
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
BroadcastFn = Callable[..., Any]
|
||||
_decrypted_dm_store_lock = asyncio.Lock()
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class FallbackDirectMessageContext:
|
||||
conversation_key: str
|
||||
contact: Contact | None
|
||||
sender_name: str | None
|
||||
sender_key: str | None
|
||||
skip_storage: bool = False
|
||||
|
||||
|
||||
async def _prepare_resolved_contact(
|
||||
contact: Contact,
|
||||
*,
|
||||
log: logging.Logger | None = None,
|
||||
) -> tuple[str, bool]:
|
||||
conversation_key = contact.public_key.lower()
|
||||
await claim_prefix_messages_for_contact(public_key=conversation_key, log=log or logger)
|
||||
|
||||
if contact.type == CONTACT_TYPE_REPEATER:
|
||||
return conversation_key, True
|
||||
|
||||
return conversation_key, False
|
||||
|
||||
|
||||
async def resolve_fallback_direct_message_context(
|
||||
*,
|
||||
sender_public_key: str,
|
||||
received_at: int,
|
||||
broadcast_fn: BroadcastFn,
|
||||
contact_repository=ContactRepository,
|
||||
log: logging.Logger | None = None,
|
||||
) -> FallbackDirectMessageContext:
|
||||
normalized_sender = sender_public_key.lower()
|
||||
|
||||
try:
|
||||
contact = await contact_repository.get_by_key_or_prefix(normalized_sender)
|
||||
except AmbiguousPublicKeyPrefixError:
|
||||
(log or logger).warning(
|
||||
"DM sender prefix '%s' is ambiguous; storing under prefix until full key is known",
|
||||
sender_public_key,
|
||||
)
|
||||
contact = None
|
||||
|
||||
if contact is not None:
|
||||
conversation_key, skip_storage = await _prepare_resolved_contact(contact, log=log)
|
||||
return FallbackDirectMessageContext(
|
||||
conversation_key=conversation_key,
|
||||
contact=contact,
|
||||
sender_name=contact.name,
|
||||
sender_key=conversation_key,
|
||||
skip_storage=skip_storage,
|
||||
)
|
||||
|
||||
if normalized_sender:
|
||||
placeholder_upsert = ContactUpsert(
|
||||
public_key=normalized_sender,
|
||||
type=0,
|
||||
last_seen=received_at,
|
||||
last_contacted=received_at,
|
||||
first_seen=received_at,
|
||||
on_radio=False,
|
||||
out_path_hash_mode=-1,
|
||||
)
|
||||
await contact_repository.upsert(placeholder_upsert)
|
||||
contact = await contact_repository.get_by_key(normalized_sender)
|
||||
if contact is not None:
|
||||
broadcast_fn("contact", contact.model_dump())
|
||||
|
||||
return FallbackDirectMessageContext(
|
||||
conversation_key=normalized_sender,
|
||||
contact=contact,
|
||||
sender_name=contact.name if contact else None,
|
||||
sender_key=normalized_sender or None,
|
||||
)
|
||||
|
||||
|
||||
async def _store_direct_message(
|
||||
*,
|
||||
packet_id: int | None,
|
||||
conversation_key: str,
|
||||
text: str,
|
||||
sender_timestamp: int,
|
||||
received_at: int,
|
||||
path: str | None,
|
||||
path_len: int | None,
|
||||
outgoing: bool,
|
||||
txt_type: int,
|
||||
signature: str | None,
|
||||
sender_name: str | None,
|
||||
sender_key: str | None,
|
||||
realtime: bool,
|
||||
broadcast_fn: BroadcastFn,
|
||||
update_last_contacted_key: str | None,
|
||||
best_effort_content_dedup: bool,
|
||||
linked_packet_dedup: bool,
|
||||
message_repository=MessageRepository,
|
||||
contact_repository=ContactRepository,
|
||||
raw_packet_repository=RawPacketRepository,
|
||||
) -> Message | None:
|
||||
async def store() -> Message | None:
|
||||
if linked_packet_dedup and packet_id is not None:
|
||||
linked_message_id = await raw_packet_repository.get_linked_message_id(packet_id)
|
||||
if linked_message_id is not None:
|
||||
existing_msg = await message_repository.get_by_id(linked_message_id)
|
||||
if existing_msg is not None:
|
||||
await reconcile_duplicate_message(
|
||||
existing_msg=existing_msg,
|
||||
packet_id=packet_id,
|
||||
path=path,
|
||||
received_at=received_at,
|
||||
path_len=path_len,
|
||||
broadcast_fn=broadcast_fn,
|
||||
)
|
||||
return None
|
||||
|
||||
if best_effort_content_dedup:
|
||||
existing_msg = await message_repository.get_by_content(
|
||||
msg_type="PRIV",
|
||||
conversation_key=conversation_key,
|
||||
text=text,
|
||||
sender_timestamp=sender_timestamp,
|
||||
)
|
||||
if existing_msg is not None:
|
||||
await reconcile_duplicate_message(
|
||||
existing_msg=existing_msg,
|
||||
packet_id=packet_id,
|
||||
path=path,
|
||||
received_at=received_at,
|
||||
path_len=path_len,
|
||||
broadcast_fn=broadcast_fn,
|
||||
)
|
||||
return None
|
||||
|
||||
msg_id = await message_repository.create(
|
||||
msg_type="PRIV",
|
||||
text=text,
|
||||
conversation_key=conversation_key,
|
||||
sender_timestamp=sender_timestamp,
|
||||
received_at=received_at,
|
||||
path=path,
|
||||
path_len=path_len,
|
||||
txt_type=txt_type,
|
||||
signature=signature,
|
||||
outgoing=outgoing,
|
||||
sender_key=sender_key,
|
||||
sender_name=sender_name,
|
||||
)
|
||||
if msg_id is None:
|
||||
await handle_duplicate_message(
|
||||
packet_id=packet_id,
|
||||
msg_type="PRIV",
|
||||
conversation_key=conversation_key,
|
||||
text=text,
|
||||
sender_timestamp=sender_timestamp,
|
||||
path=path,
|
||||
received_at=received_at,
|
||||
path_len=path_len,
|
||||
broadcast_fn=broadcast_fn,
|
||||
)
|
||||
return None
|
||||
|
||||
if packet_id is not None:
|
||||
await raw_packet_repository.mark_decrypted(packet_id, msg_id)
|
||||
|
||||
message = build_message_model(
|
||||
message_id=msg_id,
|
||||
msg_type="PRIV",
|
||||
conversation_key=conversation_key,
|
||||
text=text,
|
||||
sender_timestamp=sender_timestamp,
|
||||
received_at=received_at,
|
||||
paths=build_message_paths(path, received_at, path_len),
|
||||
txt_type=txt_type,
|
||||
signature=signature,
|
||||
sender_key=sender_key,
|
||||
outgoing=outgoing,
|
||||
sender_name=sender_name,
|
||||
)
|
||||
broadcast_message(message=message, broadcast_fn=broadcast_fn, realtime=realtime)
|
||||
|
||||
if update_last_contacted_key:
|
||||
await contact_repository.update_last_contacted(update_last_contacted_key, received_at)
|
||||
|
||||
return message
|
||||
|
||||
if linked_packet_dedup:
|
||||
async with _decrypted_dm_store_lock:
|
||||
return await store()
|
||||
return await store()
|
||||
|
||||
|
||||
async def ingest_decrypted_direct_message(
|
||||
*,
|
||||
packet_id: int,
|
||||
decrypted: "DecryptedDirectMessage",
|
||||
their_public_key: str,
|
||||
received_at: int | None = None,
|
||||
path: str | None = None,
|
||||
path_len: int | None = None,
|
||||
outgoing: bool = False,
|
||||
realtime: bool = True,
|
||||
broadcast_fn: BroadcastFn,
|
||||
contact_repository=ContactRepository,
|
||||
) -> Message | None:
|
||||
conversation_key = their_public_key.lower()
|
||||
contact = await contact_repository.get_by_key(conversation_key)
|
||||
sender_name: str | None = None
|
||||
if contact is not None:
|
||||
conversation_key, skip_storage = await _prepare_resolved_contact(contact, log=logger)
|
||||
if skip_storage:
|
||||
logger.debug(
|
||||
"Skipping message from repeater %s (CLI responses not stored): %s",
|
||||
conversation_key[:12],
|
||||
(decrypted.message or "")[:50],
|
||||
)
|
||||
return None
|
||||
if not outgoing:
|
||||
sender_name = contact.name
|
||||
|
||||
received = received_at or int(time.time())
|
||||
message = await _store_direct_message(
|
||||
packet_id=packet_id,
|
||||
conversation_key=conversation_key,
|
||||
text=decrypted.message,
|
||||
sender_timestamp=decrypted.timestamp,
|
||||
received_at=received,
|
||||
path=path,
|
||||
path_len=path_len,
|
||||
outgoing=outgoing,
|
||||
txt_type=0,
|
||||
signature=None,
|
||||
sender_name=sender_name,
|
||||
sender_key=conversation_key if not outgoing else None,
|
||||
realtime=realtime,
|
||||
broadcast_fn=broadcast_fn,
|
||||
update_last_contacted_key=conversation_key,
|
||||
best_effort_content_dedup=outgoing,
|
||||
linked_packet_dedup=True,
|
||||
)
|
||||
if message is None:
|
||||
return None
|
||||
|
||||
logger.info(
|
||||
'Stored direct message "%s" for %r (msg ID %d in contact ID %s, outgoing=%s)',
|
||||
truncate_for_log(decrypted.message),
|
||||
format_contact_log_target(contact.name if contact else None, conversation_key),
|
||||
message.id,
|
||||
conversation_key,
|
||||
outgoing,
|
||||
)
|
||||
return message
|
||||
|
||||
|
||||
async def ingest_fallback_direct_message(
|
||||
*,
|
||||
conversation_key: str,
|
||||
text: str,
|
||||
sender_timestamp: int,
|
||||
received_at: int,
|
||||
path: str | None,
|
||||
path_len: int | None,
|
||||
txt_type: int,
|
||||
signature: str | None,
|
||||
sender_name: str | None,
|
||||
sender_key: str | None,
|
||||
broadcast_fn: BroadcastFn,
|
||||
update_last_contacted_key: str | None = None,
|
||||
) -> Message | None:
|
||||
return await _store_direct_message(
|
||||
packet_id=None,
|
||||
conversation_key=conversation_key,
|
||||
text=text,
|
||||
sender_timestamp=sender_timestamp,
|
||||
received_at=received_at,
|
||||
path=path,
|
||||
path_len=path_len,
|
||||
outgoing=False,
|
||||
txt_type=txt_type,
|
||||
signature=signature,
|
||||
sender_name=sender_name,
|
||||
sender_key=sender_key,
|
||||
realtime=True,
|
||||
broadcast_fn=broadcast_fn,
|
||||
update_last_contacted_key=update_last_contacted_key,
|
||||
best_effort_content_dedup=True,
|
||||
linked_packet_dedup=False,
|
||||
)
|
||||
@@ -1,10 +1,9 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from app.models import CONTACT_TYPE_REPEATER, Message, MessagePath
|
||||
from app.models import Message, MessagePath
|
||||
from app.repository import ContactRepository, MessageRepository, RawPacketRepository
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -14,10 +13,9 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
BroadcastFn = Callable[..., Any]
|
||||
LOG_MESSAGE_PREVIEW_LEN = 32
|
||||
_decrypted_dm_store_lock = asyncio.Lock()
|
||||
|
||||
|
||||
def _truncate_for_log(text: str, max_chars: int = LOG_MESSAGE_PREVIEW_LEN) -> str:
|
||||
def truncate_for_log(text: str, max_chars: int = LOG_MESSAGE_PREVIEW_LEN) -> str:
|
||||
"""Return a compact single-line message preview for log output."""
|
||||
normalized = " ".join(text.split())
|
||||
if len(normalized) <= max_chars:
|
||||
@@ -30,7 +28,7 @@ def _format_channel_log_target(channel_name: str | None, channel_key: str) -> st
|
||||
return channel_name or channel_key
|
||||
|
||||
|
||||
def _format_contact_log_target(contact_name: str | None, public_key: str) -> str:
|
||||
def format_contact_log_target(contact_name: str | None, public_key: str) -> str:
|
||||
"""Return a human-friendly DM target label for logs."""
|
||||
return contact_name or public_key[:12]
|
||||
|
||||
@@ -127,7 +125,7 @@ async def increment_ack_and_broadcast(
|
||||
return ack_count
|
||||
|
||||
|
||||
async def _reconcile_duplicate_message(
|
||||
async def reconcile_duplicate_message(
|
||||
*,
|
||||
existing_msg: Message,
|
||||
packet_id: int | None,
|
||||
@@ -194,7 +192,7 @@ async def handle_duplicate_message(
|
||||
)
|
||||
return
|
||||
|
||||
await _reconcile_duplicate_message(
|
||||
await reconcile_duplicate_message(
|
||||
existing_msg=existing_msg,
|
||||
packet_id=packet_id,
|
||||
path=path,
|
||||
@@ -257,7 +255,7 @@ async def create_message_from_decrypted(
|
||||
|
||||
logger.info(
|
||||
'Stored channel message "%s" for %r (msg ID %d in chan ID %s)',
|
||||
_truncate_for_log(text),
|
||||
truncate_for_log(text),
|
||||
_format_channel_log_target(channel_name, channel_key_normalized),
|
||||
msg_id,
|
||||
channel_key_normalized,
|
||||
@@ -298,165 +296,20 @@ async def create_dm_message_from_decrypted(
|
||||
broadcast_fn: BroadcastFn,
|
||||
) -> int | None:
|
||||
"""Store and broadcast a decrypted direct message."""
|
||||
contact = await ContactRepository.get_by_key(their_public_key)
|
||||
if contact and contact.type == CONTACT_TYPE_REPEATER:
|
||||
logger.debug(
|
||||
"Skipping message from repeater %s (CLI responses not stored): %s",
|
||||
their_public_key[:12],
|
||||
(decrypted.message or "")[:50],
|
||||
)
|
||||
return None
|
||||
from app.services.dm_ingest import ingest_decrypted_direct_message
|
||||
|
||||
received = received_at or int(time.time())
|
||||
conversation_key = their_public_key.lower()
|
||||
sender_name = contact.name if contact and not outgoing else None
|
||||
|
||||
async with _decrypted_dm_store_lock:
|
||||
linked_message_id = await RawPacketRepository.get_linked_message_id(packet_id)
|
||||
if linked_message_id is not None:
|
||||
existing_msg = await MessageRepository.get_by_id(linked_message_id)
|
||||
if existing_msg is not None:
|
||||
await _reconcile_duplicate_message(
|
||||
existing_msg=existing_msg,
|
||||
packet_id=packet_id,
|
||||
path=path,
|
||||
received_at=received,
|
||||
path_len=path_len,
|
||||
broadcast_fn=broadcast_fn,
|
||||
)
|
||||
return None
|
||||
|
||||
if outgoing:
|
||||
existing_msg = await MessageRepository.get_by_content(
|
||||
msg_type="PRIV",
|
||||
conversation_key=conversation_key,
|
||||
text=decrypted.message,
|
||||
sender_timestamp=decrypted.timestamp,
|
||||
)
|
||||
if existing_msg is not None:
|
||||
await _reconcile_duplicate_message(
|
||||
existing_msg=existing_msg,
|
||||
packet_id=packet_id,
|
||||
path=path,
|
||||
received_at=received,
|
||||
path_len=path_len,
|
||||
broadcast_fn=broadcast_fn,
|
||||
)
|
||||
return None
|
||||
|
||||
msg_id = await MessageRepository.create(
|
||||
msg_type="PRIV",
|
||||
text=decrypted.message,
|
||||
conversation_key=conversation_key,
|
||||
sender_timestamp=decrypted.timestamp,
|
||||
received_at=received,
|
||||
path=path,
|
||||
path_len=path_len,
|
||||
outgoing=outgoing,
|
||||
sender_key=conversation_key if not outgoing else None,
|
||||
sender_name=sender_name,
|
||||
)
|
||||
if msg_id is None:
|
||||
await handle_duplicate_message(
|
||||
packet_id=packet_id,
|
||||
msg_type="PRIV",
|
||||
conversation_key=conversation_key,
|
||||
text=decrypted.message,
|
||||
sender_timestamp=decrypted.timestamp,
|
||||
path=path,
|
||||
received_at=received,
|
||||
path_len=path_len,
|
||||
broadcast_fn=broadcast_fn,
|
||||
)
|
||||
return None
|
||||
|
||||
logger.info(
|
||||
'Stored direct message "%s" for %r (msg ID %d in contact ID %s, outgoing=%s)',
|
||||
_truncate_for_log(decrypted.message),
|
||||
_format_contact_log_target(contact.name if contact else None, conversation_key),
|
||||
msg_id,
|
||||
conversation_key,
|
||||
outgoing,
|
||||
)
|
||||
await RawPacketRepository.mark_decrypted(packet_id, msg_id)
|
||||
|
||||
broadcast_message(
|
||||
message=build_message_model(
|
||||
message_id=msg_id,
|
||||
msg_type="PRIV",
|
||||
conversation_key=conversation_key,
|
||||
text=decrypted.message,
|
||||
sender_timestamp=decrypted.timestamp,
|
||||
received_at=received,
|
||||
paths=build_message_paths(path, received, path_len),
|
||||
outgoing=outgoing,
|
||||
sender_name=sender_name,
|
||||
sender_key=conversation_key if not outgoing else None,
|
||||
),
|
||||
broadcast_fn=broadcast_fn,
|
||||
realtime=realtime,
|
||||
)
|
||||
|
||||
await ContactRepository.update_last_contacted(conversation_key, received)
|
||||
return msg_id
|
||||
|
||||
|
||||
async def create_fallback_direct_message(
|
||||
*,
|
||||
conversation_key: str,
|
||||
text: str,
|
||||
sender_timestamp: int,
|
||||
received_at: int,
|
||||
path: str | None,
|
||||
path_len: int | None,
|
||||
txt_type: int,
|
||||
signature: str | None,
|
||||
sender_name: str | None,
|
||||
sender_key: str | None,
|
||||
broadcast_fn: BroadcastFn,
|
||||
message_repository=MessageRepository,
|
||||
) -> Message | None:
|
||||
"""Store and broadcast a CONTACT_MSG_RECV fallback direct message."""
|
||||
existing = await message_repository.get_by_content(
|
||||
msg_type="PRIV",
|
||||
conversation_key=conversation_key,
|
||||
text=text,
|
||||
sender_timestamp=sender_timestamp,
|
||||
)
|
||||
if existing is not None:
|
||||
return None
|
||||
|
||||
msg_id = await message_repository.create(
|
||||
msg_type="PRIV",
|
||||
text=text,
|
||||
conversation_key=conversation_key,
|
||||
sender_timestamp=sender_timestamp,
|
||||
message = await ingest_decrypted_direct_message(
|
||||
packet_id=packet_id,
|
||||
decrypted=decrypted,
|
||||
their_public_key=their_public_key,
|
||||
received_at=received_at,
|
||||
path=path,
|
||||
path_len=path_len,
|
||||
txt_type=txt_type,
|
||||
signature=signature,
|
||||
sender_key=sender_key,
|
||||
sender_name=sender_name,
|
||||
outgoing=outgoing,
|
||||
realtime=realtime,
|
||||
broadcast_fn=broadcast_fn,
|
||||
)
|
||||
if msg_id is None:
|
||||
return None
|
||||
|
||||
message = build_message_model(
|
||||
message_id=msg_id,
|
||||
msg_type="PRIV",
|
||||
conversation_key=conversation_key,
|
||||
text=text,
|
||||
sender_timestamp=sender_timestamp,
|
||||
received_at=received_at,
|
||||
paths=build_message_paths(path, received_at, path_len),
|
||||
txt_type=txt_type,
|
||||
signature=signature,
|
||||
sender_key=sender_key,
|
||||
sender_name=sender_name,
|
||||
)
|
||||
broadcast_message(message=message, broadcast_fn=broadcast_fn)
|
||||
return message
|
||||
return message.id if message is not None else None
|
||||
|
||||
|
||||
async def create_fallback_channel_message(
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { useEffect, useCallback, useRef, useState } from 'react';
|
||||
import { api } from './api';
|
||||
import * as messageCache from './messageCache';
|
||||
import { takePrefetchOrFetch } from './prefetch';
|
||||
import { useWebSocket } from './useWebSocket';
|
||||
import {
|
||||
@@ -109,6 +108,7 @@ export function App() {
|
||||
// useConversationRouter, but useConversationRouter needs channels/contacts from
|
||||
// useContactsAndChannels. We break the cycle with a ref-based indirection.
|
||||
const setActiveConversationRef = useRef<(conv: Conversation | null) => void>(() => {});
|
||||
const removeConversationMessagesRef = useRef<(conversationId: string) => void>(() => {});
|
||||
|
||||
// --- Extracted hooks ---
|
||||
|
||||
@@ -180,6 +180,8 @@ export function App() {
|
||||
setActiveConversation: (conv) => setActiveConversationRef.current(conv),
|
||||
pendingDeleteFallbackRef,
|
||||
hasSetDefaultConversation,
|
||||
removeConversationMessages: (conversationId) =>
|
||||
removeConversationMessagesRef.current(conversationId),
|
||||
});
|
||||
|
||||
// useConversationRouter is called second — it receives channels/contacts as inputs
|
||||
@@ -228,25 +230,27 @@ export function App() {
|
||||
hasOlderMessages,
|
||||
hasNewerMessages,
|
||||
loadingNewer,
|
||||
hasNewerMessagesRef,
|
||||
fetchOlderMessages,
|
||||
fetchNewerMessages,
|
||||
jumpToBottom,
|
||||
reloadCurrentConversation,
|
||||
addMessageIfNew,
|
||||
updateMessageAck,
|
||||
triggerReconcile,
|
||||
observeMessage,
|
||||
receiveMessageAck,
|
||||
reconcileOnReconnect,
|
||||
renameConversationMessages,
|
||||
removeConversationMessages,
|
||||
clearConversationMessages,
|
||||
} = useConversationMessages(activeConversation, targetMessageId);
|
||||
removeConversationMessagesRef.current = removeConversationMessages;
|
||||
|
||||
const {
|
||||
unreadCounts,
|
||||
mentions,
|
||||
lastMessageTimes,
|
||||
unreadLastReadAts,
|
||||
incrementUnread,
|
||||
recordMessageEvent,
|
||||
renameConversationState,
|
||||
markAllRead,
|
||||
trackNewMessage,
|
||||
refreshUnreads,
|
||||
} = useUnreadCounts(channels, contacts, activeConversation);
|
||||
|
||||
@@ -308,7 +312,7 @@ export function App() {
|
||||
setHealth,
|
||||
fetchConfig,
|
||||
setRawPackets,
|
||||
triggerReconcile,
|
||||
reconcileOnReconnect,
|
||||
refreshUnreads,
|
||||
setChannels,
|
||||
fetchAllContacts,
|
||||
@@ -316,23 +320,23 @@ export function App() {
|
||||
blockedKeysRef,
|
||||
blockedNamesRef,
|
||||
activeConversationRef,
|
||||
hasNewerMessagesRef,
|
||||
addMessageIfNew,
|
||||
trackNewMessage,
|
||||
incrementUnread,
|
||||
observeMessage,
|
||||
recordMessageEvent,
|
||||
renameConversationState,
|
||||
checkMention,
|
||||
pendingDeleteFallbackRef,
|
||||
setActiveConversation,
|
||||
updateMessageAck,
|
||||
renameConversationMessages,
|
||||
removeConversationMessages,
|
||||
receiveMessageAck,
|
||||
notifyIncomingMessage,
|
||||
});
|
||||
const handleVisibilityPolicyChanged = useCallback(() => {
|
||||
messageCache.clear();
|
||||
clearConversationMessages();
|
||||
reloadCurrentConversation();
|
||||
void refreshUnreads();
|
||||
setVisibilityVersion((current) => current + 1);
|
||||
}, [refreshUnreads, reloadCurrentConversation]);
|
||||
}, [clearConversationMessages, refreshUnreads, reloadCurrentConversation]);
|
||||
|
||||
const handleBlockKey = useCallback(
|
||||
async (key: string) => {
|
||||
@@ -361,7 +365,7 @@ export function App() {
|
||||
activeConversationRef,
|
||||
setContacts,
|
||||
setChannels,
|
||||
addMessageIfNew,
|
||||
observeMessage,
|
||||
messageInputRef,
|
||||
});
|
||||
const handleCreateCrackedChannel = useCallback(
|
||||
|
||||
@@ -329,7 +329,7 @@ export function MessageList({
|
||||
}, [messages, onResendChannelMessage]);
|
||||
|
||||
// Sort messages by received_at ascending (oldest first)
|
||||
// Note: Deduplication is handled by useConversationMessages.addMessageIfNew()
|
||||
// Note: Deduplication is handled by useConversationMessages.observeMessage()
|
||||
// and the database UNIQUE constraint on (type, conversation_key, text, sender_timestamp)
|
||||
const sortedMessages = useMemo(
|
||||
() => [...messages].sort((a, b) => a.received_at - b.received_at || a.id - b.id),
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
export { useUnreadCounts } from './useUnreadCounts';
|
||||
export { useConversationMessages, getMessageContentKey } from './useConversationMessages';
|
||||
export { useConversationMessages } from './useConversationMessages';
|
||||
export { useRadioControl } from './useRadioControl';
|
||||
export { useRepeaterDashboard } from './useRepeaterDashboard';
|
||||
export { useAppShell } from './useAppShell';
|
||||
|
||||
@@ -2,7 +2,6 @@ import { useState, useCallback, type MutableRefObject } from 'react';
|
||||
import { api } from '../api';
|
||||
import { takePrefetchOrFetch } from '../prefetch';
|
||||
import { toast } from '../components/ui/sonner';
|
||||
import * as messageCache from '../messageCache';
|
||||
import { getContactDisplayName } from '../utils/pubkey';
|
||||
import { findPublicChannel, PUBLIC_CHANNEL_KEY, PUBLIC_CHANNEL_NAME } from '../utils/publicChannel';
|
||||
import type { Channel, Contact, Conversation } from '../types';
|
||||
@@ -11,12 +10,14 @@ interface UseContactsAndChannelsArgs {
|
||||
setActiveConversation: (conv: Conversation | null) => void;
|
||||
pendingDeleteFallbackRef: MutableRefObject<boolean>;
|
||||
hasSetDefaultConversation: MutableRefObject<boolean>;
|
||||
removeConversationMessages: (conversationId: string) => void;
|
||||
}
|
||||
|
||||
export function useContactsAndChannels({
|
||||
setActiveConversation,
|
||||
pendingDeleteFallbackRef,
|
||||
hasSetDefaultConversation,
|
||||
removeConversationMessages,
|
||||
}: UseContactsAndChannelsArgs) {
|
||||
const [contacts, setContacts] = useState<Contact[]>([]);
|
||||
const [contactsLoaded, setContactsLoaded] = useState(false);
|
||||
@@ -117,7 +118,7 @@ export function useContactsAndChannels({
|
||||
try {
|
||||
pendingDeleteFallbackRef.current = true;
|
||||
await api.deleteChannel(key);
|
||||
messageCache.remove(key);
|
||||
removeConversationMessages(key);
|
||||
const refreshedChannels = await api.getChannels();
|
||||
setChannels(refreshedChannels);
|
||||
const publicChannel = findPublicChannel(refreshedChannels);
|
||||
@@ -135,7 +136,12 @@ export function useContactsAndChannels({
|
||||
});
|
||||
}
|
||||
},
|
||||
[setActiveConversation, pendingDeleteFallbackRef, hasSetDefaultConversation]
|
||||
[
|
||||
hasSetDefaultConversation,
|
||||
pendingDeleteFallbackRef,
|
||||
removeConversationMessages,
|
||||
setActiveConversation,
|
||||
]
|
||||
);
|
||||
|
||||
const handleDeleteContact = useCallback(
|
||||
@@ -144,7 +150,7 @@ export function useContactsAndChannels({
|
||||
try {
|
||||
pendingDeleteFallbackRef.current = true;
|
||||
await api.deleteContact(publicKey);
|
||||
messageCache.remove(publicKey);
|
||||
removeConversationMessages(publicKey);
|
||||
setContacts((prev) => prev.filter((c) => c.public_key !== publicKey));
|
||||
const refreshedChannels = await api.getChannels();
|
||||
setChannels(refreshedChannels);
|
||||
@@ -163,7 +169,12 @@ export function useContactsAndChannels({
|
||||
});
|
||||
}
|
||||
},
|
||||
[setActiveConversation, pendingDeleteFallbackRef, hasSetDefaultConversation]
|
||||
[
|
||||
hasSetDefaultConversation,
|
||||
pendingDeleteFallbackRef,
|
||||
removeConversationMessages,
|
||||
setActiveConversation,
|
||||
]
|
||||
);
|
||||
|
||||
return {
|
||||
|
||||
@@ -10,7 +10,7 @@ interface UseConversationActionsArgs {
|
||||
activeConversationRef: MutableRefObject<Conversation | null>;
|
||||
setContacts: React.Dispatch<React.SetStateAction<Contact[]>>;
|
||||
setChannels: React.Dispatch<React.SetStateAction<Channel[]>>;
|
||||
addMessageIfNew: (msg: Message) => boolean;
|
||||
observeMessage: (msg: Message) => { added: boolean; activeConversation: boolean };
|
||||
messageInputRef: RefObject<MessageInputHandle | null>;
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ export function useConversationActions({
|
||||
activeConversationRef,
|
||||
setContacts,
|
||||
setChannels,
|
||||
addMessageIfNew,
|
||||
observeMessage,
|
||||
messageInputRef,
|
||||
}: UseConversationActionsArgs): UseConversationActionsResult {
|
||||
const mergeChannelIntoList = useCallback(
|
||||
@@ -60,10 +60,10 @@ export function useConversationActions({
|
||||
: await api.sendDirectMessage(activeConversation.id, text);
|
||||
|
||||
if (activeConversationRef.current?.id === conversationId) {
|
||||
addMessageIfNew(sent);
|
||||
observeMessage(sent);
|
||||
}
|
||||
},
|
||||
[activeConversation, activeConversationRef, addMessageIfNew]
|
||||
[activeConversation, activeConversationRef, observeMessage]
|
||||
);
|
||||
|
||||
const handleResendChannelMessage = useCallback(
|
||||
@@ -77,7 +77,7 @@ export function useConversationActions({
|
||||
activeConversationRef.current?.type === 'channel' &&
|
||||
activeConversationRef.current.id === resentMessage.conversation_key
|
||||
) {
|
||||
addMessageIfNew(resentMessage);
|
||||
observeMessage(resentMessage);
|
||||
}
|
||||
toast.success(newTimestamp ? 'Message resent with new timestamp' : 'Message resent');
|
||||
} catch (err) {
|
||||
@@ -86,7 +86,7 @@ export function useConversationActions({
|
||||
});
|
||||
}
|
||||
},
|
||||
[activeConversationRef, addMessageIfNew]
|
||||
[activeConversationRef, observeMessage]
|
||||
);
|
||||
|
||||
const handleSetChannelFloodScopeOverride = useCallback(
|
||||
|
||||
@@ -1,19 +1,174 @@
|
||||
import {
|
||||
useCallback,
|
||||
useEffect,
|
||||
useRef,
|
||||
useState,
|
||||
type Dispatch,
|
||||
type MutableRefObject,
|
||||
type SetStateAction,
|
||||
} from 'react';
|
||||
import { useCallback, useEffect, useRef, useState } from 'react';
|
||||
import { toast } from '../components/ui/sonner';
|
||||
import { api, isAbortError } from '../api';
|
||||
import * as messageCache from '../messageCache';
|
||||
import type { Conversation, Message, MessagePath } from '../types';
|
||||
import { getMessageContentKey } from '../utils/messageIdentity';
|
||||
|
||||
const MAX_PENDING_ACKS = 500;
|
||||
const MESSAGE_PAGE_SIZE = 200;
|
||||
export const MAX_CACHED_CONVERSATIONS = 20;
|
||||
export const MAX_MESSAGES_PER_ENTRY = 200;
|
||||
|
||||
interface CachedConversationEntry {
|
||||
messages: Message[];
|
||||
hasOlderMessages: boolean;
|
||||
}
|
||||
|
||||
interface InternalCachedConversationEntry extends CachedConversationEntry {
|
||||
contentKeys: Set<string>;
|
||||
}
|
||||
|
||||
export class ConversationMessageCache {
|
||||
private readonly cache = new Map<string, InternalCachedConversationEntry>();
|
||||
|
||||
get(id: string): CachedConversationEntry | undefined {
|
||||
const entry = this.cache.get(id);
|
||||
if (!entry) return undefined;
|
||||
this.cache.delete(id);
|
||||
this.cache.set(id, entry);
|
||||
return {
|
||||
messages: entry.messages,
|
||||
hasOlderMessages: entry.hasOlderMessages,
|
||||
};
|
||||
}
|
||||
|
||||
set(id: string, entry: CachedConversationEntry): void {
|
||||
const contentKeys = new Set(entry.messages.map((message) => getMessageContentKey(message)));
|
||||
if (entry.messages.length > MAX_MESSAGES_PER_ENTRY) {
|
||||
const trimmed = [...entry.messages]
|
||||
.sort((a, b) => b.received_at - a.received_at)
|
||||
.slice(0, MAX_MESSAGES_PER_ENTRY);
|
||||
entry = { ...entry, messages: trimmed, hasOlderMessages: true };
|
||||
}
|
||||
const internalEntry: InternalCachedConversationEntry = {
|
||||
...entry,
|
||||
contentKeys,
|
||||
};
|
||||
this.cache.delete(id);
|
||||
this.cache.set(id, internalEntry);
|
||||
if (this.cache.size > MAX_CACHED_CONVERSATIONS) {
|
||||
const lruKey = this.cache.keys().next().value as string;
|
||||
this.cache.delete(lruKey);
|
||||
}
|
||||
}
|
||||
|
||||
addMessage(id: string, msg: Message): boolean {
|
||||
const entry = this.cache.get(id);
|
||||
const contentKey = getMessageContentKey(msg);
|
||||
if (!entry) {
|
||||
this.cache.set(id, {
|
||||
messages: [msg],
|
||||
hasOlderMessages: true,
|
||||
contentKeys: new Set([contentKey]),
|
||||
});
|
||||
if (this.cache.size > MAX_CACHED_CONVERSATIONS) {
|
||||
const lruKey = this.cache.keys().next().value as string;
|
||||
this.cache.delete(lruKey);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
if (entry.contentKeys.has(contentKey)) return false;
|
||||
if (entry.messages.some((message) => message.id === msg.id)) return false;
|
||||
entry.contentKeys.add(contentKey);
|
||||
entry.messages = [...entry.messages, msg];
|
||||
if (entry.messages.length > MAX_MESSAGES_PER_ENTRY) {
|
||||
entry.messages = [...entry.messages]
|
||||
.sort((a, b) => b.received_at - a.received_at)
|
||||
.slice(0, MAX_MESSAGES_PER_ENTRY);
|
||||
}
|
||||
this.cache.delete(id);
|
||||
this.cache.set(id, entry);
|
||||
return true;
|
||||
}
|
||||
|
||||
updateAck(messageId: number, ackCount: number, paths?: MessagePath[]): void {
|
||||
for (const entry of this.cache.values()) {
|
||||
const index = entry.messages.findIndex((message) => message.id === messageId);
|
||||
if (index < 0) continue;
|
||||
const current = entry.messages[index];
|
||||
const updated = [...entry.messages];
|
||||
updated[index] = {
|
||||
...current,
|
||||
acked: Math.max(current.acked, ackCount),
|
||||
...(paths !== undefined && paths.length >= (current.paths?.length ?? 0) && { paths }),
|
||||
};
|
||||
entry.messages = updated;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
remove(id: string): void {
|
||||
this.cache.delete(id);
|
||||
}
|
||||
|
||||
rename(oldId: string, newId: string): void {
|
||||
if (oldId === newId) return;
|
||||
const oldEntry = this.cache.get(oldId);
|
||||
if (!oldEntry) return;
|
||||
|
||||
const newEntry = this.cache.get(newId);
|
||||
if (!newEntry) {
|
||||
this.cache.delete(oldId);
|
||||
this.cache.set(newId, oldEntry);
|
||||
return;
|
||||
}
|
||||
|
||||
const mergedMessages = [...newEntry.messages];
|
||||
const seenIds = new Set(mergedMessages.map((message) => message.id));
|
||||
for (const message of oldEntry.messages) {
|
||||
if (!seenIds.has(message.id)) {
|
||||
mergedMessages.push(message);
|
||||
seenIds.add(message.id);
|
||||
}
|
||||
}
|
||||
|
||||
this.cache.delete(oldId);
|
||||
this.cache.set(newId, {
|
||||
messages: mergedMessages,
|
||||
hasOlderMessages: newEntry.hasOlderMessages || oldEntry.hasOlderMessages,
|
||||
contentKeys: new Set([...newEntry.contentKeys, ...oldEntry.contentKeys]),
|
||||
});
|
||||
}
|
||||
|
||||
clear(): void {
|
||||
this.cache.clear();
|
||||
}
|
||||
}
|
||||
|
||||
export function reconcileConversationMessages(
|
||||
current: Message[],
|
||||
fetched: Message[]
|
||||
): Message[] | null {
|
||||
const currentById = new Map<number, { acked: number; pathsLen: number; text: string }>();
|
||||
for (const message of current) {
|
||||
currentById.set(message.id, {
|
||||
acked: message.acked,
|
||||
pathsLen: message.paths?.length ?? 0,
|
||||
text: message.text,
|
||||
});
|
||||
}
|
||||
|
||||
let needsUpdate = false;
|
||||
for (const message of fetched) {
|
||||
const currentMessage = currentById.get(message.id);
|
||||
if (
|
||||
!currentMessage ||
|
||||
currentMessage.acked !== message.acked ||
|
||||
currentMessage.pathsLen !== (message.paths?.length ?? 0) ||
|
||||
currentMessage.text !== message.text
|
||||
) {
|
||||
needsUpdate = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!needsUpdate) return null;
|
||||
|
||||
const fetchedIds = new Set(fetched.map((message) => message.id));
|
||||
const olderMessages = current.filter((message) => !fetchedIds.has(message.id));
|
||||
return [...fetched, ...olderMessages];
|
||||
}
|
||||
|
||||
export const conversationMessageCache = new ConversationMessageCache();
|
||||
|
||||
interface PendingAckUpdate {
|
||||
ackCount: number;
|
||||
@@ -56,15 +211,6 @@ export function mergePendingAck(
|
||||
return existing;
|
||||
}
|
||||
|
||||
// Generate a key for deduplicating messages by content
|
||||
export function getMessageContentKey(msg: Message): string {
|
||||
// When sender_timestamp exists, dedup by content (catches radio-path duplicates with different IDs).
|
||||
// When null, include msg.id so each message gets a unique key — avoids silently dropping
|
||||
// different messages that share the same text and received_at second.
|
||||
const ts = msg.sender_timestamp ?? `r${msg.received_at}-${msg.id}`;
|
||||
return `${msg.type}-${msg.conversation_key}-${msg.text}-${ts}`;
|
||||
}
|
||||
|
||||
interface UseConversationMessagesResult {
|
||||
messages: Message[];
|
||||
messagesLoading: boolean;
|
||||
@@ -72,21 +218,36 @@ interface UseConversationMessagesResult {
|
||||
hasOlderMessages: boolean;
|
||||
hasNewerMessages: boolean;
|
||||
loadingNewer: boolean;
|
||||
hasNewerMessagesRef: MutableRefObject<boolean>;
|
||||
setMessages: Dispatch<SetStateAction<Message[]>>;
|
||||
fetchOlderMessages: () => Promise<void>;
|
||||
fetchNewerMessages: () => Promise<void>;
|
||||
jumpToBottom: () => void;
|
||||
reloadCurrentConversation: () => void;
|
||||
addMessageIfNew: (msg: Message) => boolean;
|
||||
updateMessageAck: (messageId: number, ackCount: number, paths?: MessagePath[]) => void;
|
||||
triggerReconcile: () => void;
|
||||
observeMessage: (msg: Message) => { added: boolean; activeConversation: boolean };
|
||||
receiveMessageAck: (messageId: number, ackCount: number, paths?: MessagePath[]) => void;
|
||||
reconcileOnReconnect: () => void;
|
||||
renameConversationMessages: (oldId: string, newId: string) => void;
|
||||
removeConversationMessages: (conversationId: string) => void;
|
||||
clearConversationMessages: () => void;
|
||||
}
|
||||
|
||||
function isMessageConversation(conversation: Conversation | null): conversation is Conversation {
|
||||
return !!conversation && !['raw', 'map', 'visualizer', 'search'].includes(conversation.type);
|
||||
}
|
||||
|
||||
function isActiveConversationMessage(
|
||||
activeConversation: Conversation | null,
|
||||
msg: Message
|
||||
): boolean {
|
||||
if (!activeConversation) return false;
|
||||
if (msg.type === 'CHAN' && activeConversation.type === 'channel') {
|
||||
return msg.conversation_key === activeConversation.id;
|
||||
}
|
||||
if (msg.type === 'PRIV' && activeConversation.type === 'contact') {
|
||||
return msg.conversation_key === activeConversation.id;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function appendUniqueMessages(current: Message[], incoming: Message[]): Message[] {
|
||||
if (incoming.length === 0) return current;
|
||||
|
||||
@@ -165,8 +326,10 @@ export function useConversationMessages(
|
||||
const newerAbortControllerRef = useRef<AbortController | null>(null);
|
||||
const fetchingConversationIdRef = useRef<string | null>(null);
|
||||
const latestReconcileRequestIdRef = useRef(0);
|
||||
const pendingReconnectReconcileRef = useRef(false);
|
||||
const messagesRef = useRef<Message[]>([]);
|
||||
const loadingOlderRef = useRef(false);
|
||||
const loadingNewerRef = useRef(false);
|
||||
const hasOlderMessagesRef = useRef(false);
|
||||
const hasNewerMessagesRef = useRef(false);
|
||||
const prevConversationIdRef = useRef<string | null>(null);
|
||||
@@ -181,6 +344,10 @@ export function useConversationMessages(
|
||||
loadingOlderRef.current = loadingOlder;
|
||||
}, [loadingOlder]);
|
||||
|
||||
useEffect(() => {
|
||||
loadingNewerRef.current = loadingNewer;
|
||||
}, [loadingNewer]);
|
||||
|
||||
useEffect(() => {
|
||||
hasOlderMessagesRef.current = hasOlderMessages;
|
||||
}, [hasOlderMessages]);
|
||||
@@ -208,6 +375,7 @@ export function useConversationMessages(
|
||||
}
|
||||
|
||||
const conversationId = activeConversation.id;
|
||||
pendingReconnectReconcileRef.current = false;
|
||||
|
||||
if (showLoading) {
|
||||
setMessagesLoading(true);
|
||||
@@ -229,7 +397,7 @@ export function useConversationMessages(
|
||||
}
|
||||
|
||||
const messagesWithPendingAck = data.map((msg) => applyPendingAck(msg));
|
||||
const merged = messageCache.reconcile(messagesRef.current, messagesWithPendingAck);
|
||||
const merged = reconcileConversationMessages(messagesRef.current, messagesWithPendingAck);
|
||||
const nextMessages = merged ?? messagesRef.current;
|
||||
if (merged) {
|
||||
setMessages(merged);
|
||||
@@ -271,7 +439,7 @@ export function useConversationMessages(
|
||||
|
||||
const dataWithPendingAck = data.map((msg) => applyPendingAck(msg));
|
||||
setHasOlderMessages(dataWithPendingAck.length >= MESSAGE_PAGE_SIZE);
|
||||
const merged = messageCache.reconcile(messagesRef.current, dataWithPendingAck);
|
||||
const merged = reconcileConversationMessages(messagesRef.current, dataWithPendingAck);
|
||||
if (!merged) return;
|
||||
|
||||
setMessages(merged);
|
||||
@@ -295,7 +463,7 @@ export function useConversationMessages(
|
||||
}
|
||||
|
||||
const conversationId = activeConversation.id;
|
||||
const oldestMessage = messages.reduce(
|
||||
const oldestMessage = messagesRef.current.reduce(
|
||||
(oldest, msg) => {
|
||||
if (!oldest) return msg;
|
||||
if (msg.received_at < oldest.received_at) return msg;
|
||||
@@ -356,13 +524,19 @@ export function useConversationMessages(
|
||||
loadingOlderRef.current = false;
|
||||
setLoadingOlder(false);
|
||||
}
|
||||
}, [activeConversation, applyPendingAck, messages, syncSeenContent]);
|
||||
}, [activeConversation, applyPendingAck, syncSeenContent]);
|
||||
|
||||
const fetchNewerMessages = useCallback(async () => {
|
||||
if (!isMessageConversation(activeConversation) || loadingNewer || !hasNewerMessages) return;
|
||||
if (
|
||||
!isMessageConversation(activeConversation) ||
|
||||
loadingNewerRef.current ||
|
||||
!hasNewerMessagesRef.current
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
const conversationId = activeConversation.id;
|
||||
const newestMessage = messages.reduce(
|
||||
const newestMessage = messagesRef.current.reduce(
|
||||
(newest, msg) => {
|
||||
if (!newest) return msg;
|
||||
if (msg.received_at > newest.received_at) return msg;
|
||||
@@ -373,6 +547,7 @@ export function useConversationMessages(
|
||||
);
|
||||
if (!newestMessage) return;
|
||||
|
||||
loadingNewerRef.current = true;
|
||||
setLoadingNewer(true);
|
||||
const controller = new AbortController();
|
||||
newerAbortControllerRef.current = controller;
|
||||
@@ -401,7 +576,15 @@ export function useConversationMessages(
|
||||
seenMessageContent.current.add(getMessageContentKey(msg));
|
||||
}
|
||||
}
|
||||
setHasNewerMessages(dataWithPendingAck.length >= MESSAGE_PAGE_SIZE);
|
||||
const stillHasNewerMessages = dataWithPendingAck.length >= MESSAGE_PAGE_SIZE;
|
||||
setHasNewerMessages(stillHasNewerMessages);
|
||||
if (!stillHasNewerMessages && pendingReconnectReconcileRef.current) {
|
||||
pendingReconnectReconcileRef.current = false;
|
||||
const requestId = latestReconcileRequestIdRef.current + 1;
|
||||
latestReconcileRequestIdRef.current = requestId;
|
||||
const reconcileController = new AbortController();
|
||||
reconcileFromBackend(activeConversation, reconcileController.signal, requestId);
|
||||
}
|
||||
} catch (err) {
|
||||
if (isAbortError(err)) {
|
||||
return;
|
||||
@@ -414,26 +597,36 @@ export function useConversationMessages(
|
||||
if (newerAbortControllerRef.current === controller) {
|
||||
newerAbortControllerRef.current = null;
|
||||
}
|
||||
loadingNewerRef.current = false;
|
||||
setLoadingNewer(false);
|
||||
}
|
||||
}, [activeConversation, applyPendingAck, hasNewerMessages, loadingNewer, messages]);
|
||||
}, [activeConversation, applyPendingAck, reconcileFromBackend]);
|
||||
|
||||
const jumpToBottom = useCallback(() => {
|
||||
if (!activeConversation) return;
|
||||
setHasNewerMessages(false);
|
||||
messageCache.remove(activeConversation.id);
|
||||
conversationMessageCache.remove(activeConversation.id);
|
||||
void fetchLatestMessages(true);
|
||||
}, [activeConversation, fetchLatestMessages]);
|
||||
|
||||
const reloadCurrentConversation = useCallback(() => {
|
||||
if (!isMessageConversation(activeConversation)) return;
|
||||
setHasNewerMessages(false);
|
||||
messageCache.remove(activeConversation.id);
|
||||
conversationMessageCache.remove(activeConversation.id);
|
||||
setReloadVersion((current) => current + 1);
|
||||
}, [activeConversation]);
|
||||
|
||||
const triggerReconcile = useCallback(() => {
|
||||
if (!isMessageConversation(activeConversation)) return;
|
||||
const reconcileOnReconnect = useCallback(() => {
|
||||
if (!isMessageConversation(activeConversation)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (hasNewerMessagesRef.current) {
|
||||
pendingReconnectReconcileRef.current = true;
|
||||
return;
|
||||
}
|
||||
|
||||
pendingReconnectReconcileRef.current = false;
|
||||
const controller = new AbortController();
|
||||
const requestId = latestReconcileRequestIdRef.current + 1;
|
||||
latestReconcileRequestIdRef.current = requestId;
|
||||
@@ -461,6 +654,7 @@ export function useConversationMessages(
|
||||
prevConversationIdRef.current = newId;
|
||||
prevReloadVersionRef.current = reloadVersion;
|
||||
latestReconcileRequestIdRef.current = 0;
|
||||
pendingReconnectReconcileRef.current = false;
|
||||
|
||||
// Preserve around-loaded context on the same conversation when search clears targetMessageId.
|
||||
if (!conversationChanged && !targetMessageId && !reloadRequested) {
|
||||
@@ -480,9 +674,8 @@ export function useConversationMessages(
|
||||
messagesRef.current.length > 0 &&
|
||||
!hasNewerMessagesRef.current
|
||||
) {
|
||||
messageCache.set(prevId, {
|
||||
conversationMessageCache.set(prevId, {
|
||||
messages: messagesRef.current,
|
||||
seenContent: new Set(seenMessageContent.current),
|
||||
hasOlderMessages: hasOlderMessagesRef.current,
|
||||
});
|
||||
}
|
||||
@@ -524,10 +717,12 @@ export function useConversationMessages(
|
||||
setMessagesLoading(false);
|
||||
});
|
||||
} else {
|
||||
const cached = messageCache.get(activeConversation.id);
|
||||
const cached = conversationMessageCache.get(activeConversation.id);
|
||||
if (cached) {
|
||||
setMessages(cached.messages);
|
||||
seenMessageContent.current = new Set(cached.seenContent);
|
||||
seenMessageContent.current = new Set(
|
||||
cached.messages.map((message) => getMessageContentKey(message))
|
||||
);
|
||||
setHasOlderMessages(cached.hasOlderMessages);
|
||||
setMessagesLoading(false);
|
||||
const requestId = latestReconcileRequestIdRef.current + 1;
|
||||
@@ -544,9 +739,8 @@ export function useConversationMessages(
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [activeConversation?.id, activeConversation?.type, targetMessageId, reloadVersion]);
|
||||
|
||||
// Add a message if it's new (deduplication)
|
||||
// Returns true if the message was added, false if it was a duplicate
|
||||
const addMessageIfNew = useCallback(
|
||||
// Add a message to the active conversation if it is new.
|
||||
const appendActiveMessageIfNew = useCallback(
|
||||
(msg: Message): boolean => {
|
||||
const msgWithPendingAck = applyPendingAck(msg);
|
||||
const contentKey = getMessageContentKey(msgWithPendingAck);
|
||||
@@ -616,6 +810,56 @@ export function useConversationMessages(
|
||||
[messagesRef, setMessages, setPendingAck]
|
||||
);
|
||||
|
||||
const receiveMessageAck = useCallback(
|
||||
(messageId: number, ackCount: number, paths?: MessagePath[]) => {
|
||||
updateMessageAck(messageId, ackCount, paths);
|
||||
conversationMessageCache.updateAck(messageId, ackCount, paths);
|
||||
},
|
||||
[updateMessageAck]
|
||||
);
|
||||
|
||||
const observeMessage = useCallback(
|
||||
(msg: Message): { added: boolean; activeConversation: boolean } => {
|
||||
const msgWithPendingAck = applyPendingAck(msg);
|
||||
const activeConversationMessage = isActiveConversationMessage(
|
||||
activeConversation,
|
||||
msgWithPendingAck
|
||||
);
|
||||
|
||||
if (activeConversationMessage) {
|
||||
if (hasNewerMessagesRef.current) {
|
||||
return { added: false, activeConversation: true };
|
||||
}
|
||||
|
||||
return {
|
||||
added: appendActiveMessageIfNew(msgWithPendingAck),
|
||||
activeConversation: true,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
added: conversationMessageCache.addMessage(
|
||||
msgWithPendingAck.conversation_key,
|
||||
msgWithPendingAck
|
||||
),
|
||||
activeConversation: false,
|
||||
};
|
||||
},
|
||||
[activeConversation, appendActiveMessageIfNew, applyPendingAck, hasNewerMessagesRef]
|
||||
);
|
||||
|
||||
const renameConversationMessages = useCallback((oldId: string, newId: string) => {
|
||||
conversationMessageCache.rename(oldId, newId);
|
||||
}, []);
|
||||
|
||||
const removeConversationMessages = useCallback((conversationId: string) => {
|
||||
conversationMessageCache.remove(conversationId);
|
||||
}, []);
|
||||
|
||||
const clearConversationMessages = useCallback(() => {
|
||||
conversationMessageCache.clear();
|
||||
}, []);
|
||||
|
||||
return {
|
||||
messages,
|
||||
messagesLoading,
|
||||
@@ -623,14 +867,15 @@ export function useConversationMessages(
|
||||
hasOlderMessages,
|
||||
hasNewerMessages,
|
||||
loadingNewer,
|
||||
hasNewerMessagesRef,
|
||||
setMessages,
|
||||
fetchOlderMessages,
|
||||
fetchNewerMessages,
|
||||
jumpToBottom,
|
||||
reloadCurrentConversation,
|
||||
addMessageIfNew,
|
||||
updateMessageAck,
|
||||
triggerReconcile,
|
||||
observeMessage,
|
||||
receiveMessageAck,
|
||||
reconcileOnReconnect,
|
||||
renameConversationMessages,
|
||||
removeConversationMessages,
|
||||
clearConversationMessages,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -6,14 +6,12 @@ import {
|
||||
type SetStateAction,
|
||||
} from 'react';
|
||||
import { api } from '../api';
|
||||
import * as messageCache from '../messageCache';
|
||||
import type { UseWebSocketOptions } from '../useWebSocket';
|
||||
import { toast } from '../components/ui/sonner';
|
||||
import { getStateKey } from '../utils/conversationState';
|
||||
import { mergeContactIntoList } from '../utils/contactMerge';
|
||||
import { getContactDisplayName } from '../utils/pubkey';
|
||||
import { appendRawPacketUnique } from '../utils/rawPacketIdentity';
|
||||
import { getMessageContentKey } from './useConversationMessages';
|
||||
import type {
|
||||
Channel,
|
||||
Contact,
|
||||
@@ -29,7 +27,7 @@ interface UseRealtimeAppStateArgs {
|
||||
setHealth: Dispatch<SetStateAction<HealthStatus | null>>;
|
||||
fetchConfig: () => void | Promise<void>;
|
||||
setRawPackets: Dispatch<SetStateAction<RawPacket[]>>;
|
||||
triggerReconcile: () => void;
|
||||
reconcileOnReconnect: () => void;
|
||||
refreshUnreads: () => Promise<void>;
|
||||
setChannels: Dispatch<SetStateAction<Channel[]>>;
|
||||
fetchAllContacts: () => Promise<Contact[]>;
|
||||
@@ -37,15 +35,20 @@ interface UseRealtimeAppStateArgs {
|
||||
blockedKeysRef: MutableRefObject<string[]>;
|
||||
blockedNamesRef: MutableRefObject<string[]>;
|
||||
activeConversationRef: MutableRefObject<Conversation | null>;
|
||||
hasNewerMessagesRef: MutableRefObject<boolean>;
|
||||
addMessageIfNew: (msg: Message) => boolean;
|
||||
trackNewMessage: (msg: Message) => void;
|
||||
incrementUnread: (stateKey: string, hasMention?: boolean) => void;
|
||||
observeMessage: (msg: Message) => { added: boolean; activeConversation: boolean };
|
||||
recordMessageEvent: (args: {
|
||||
msg: Message;
|
||||
activeConversation: boolean;
|
||||
isNewMessage: boolean;
|
||||
hasMention?: boolean;
|
||||
}) => void;
|
||||
renameConversationState: (oldStateKey: string, newStateKey: string) => void;
|
||||
checkMention: (text: string) => boolean;
|
||||
pendingDeleteFallbackRef: MutableRefObject<boolean>;
|
||||
setActiveConversation: (conv: Conversation | null) => void;
|
||||
updateMessageAck: (messageId: number, ackCount: number, paths?: MessagePath[]) => void;
|
||||
renameConversationMessages: (oldId: string, newId: string) => void;
|
||||
removeConversationMessages: (conversationId: string) => void;
|
||||
receiveMessageAck: (messageId: number, ackCount: number, paths?: MessagePath[]) => void;
|
||||
notifyIncomingMessage?: (msg: Message) => void;
|
||||
maxRawPackets?: number;
|
||||
}
|
||||
@@ -71,30 +74,12 @@ function isMessageBlocked(msg: Message, blockedKeys: string[], blockedNames: str
|
||||
return blockedNames.length > 0 && !!msg.sender_name && blockedNames.includes(msg.sender_name);
|
||||
}
|
||||
|
||||
function isActiveConversationMessage(
|
||||
activeConversation: Conversation | null,
|
||||
msg: Message
|
||||
): boolean {
|
||||
if (!activeConversation) return false;
|
||||
if (msg.type === 'CHAN' && activeConversation.type === 'channel') {
|
||||
return msg.conversation_key === activeConversation.id;
|
||||
}
|
||||
if (msg.type === 'PRIV' && activeConversation.type === 'contact') {
|
||||
return msg.conversation_key === activeConversation.id;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function isMessageConversation(conversation: Conversation | null): boolean {
|
||||
return conversation?.type === 'channel' || conversation?.type === 'contact';
|
||||
}
|
||||
|
||||
export function useRealtimeAppState({
|
||||
prevHealthRef,
|
||||
setHealth,
|
||||
fetchConfig,
|
||||
setRawPackets,
|
||||
triggerReconcile,
|
||||
reconcileOnReconnect,
|
||||
refreshUnreads,
|
||||
setChannels,
|
||||
fetchAllContacts,
|
||||
@@ -102,15 +87,15 @@ export function useRealtimeAppState({
|
||||
blockedKeysRef,
|
||||
blockedNamesRef,
|
||||
activeConversationRef,
|
||||
hasNewerMessagesRef,
|
||||
addMessageIfNew,
|
||||
trackNewMessage,
|
||||
incrementUnread,
|
||||
observeMessage,
|
||||
recordMessageEvent,
|
||||
renameConversationState,
|
||||
checkMention,
|
||||
pendingDeleteFallbackRef,
|
||||
setActiveConversation,
|
||||
updateMessageAck,
|
||||
renameConversationMessages,
|
||||
removeConversationMessages,
|
||||
receiveMessageAck,
|
||||
notifyIncomingMessage,
|
||||
maxRawPackets = 500,
|
||||
}: UseRealtimeAppStateArgs): UseWebSocketOptions {
|
||||
@@ -184,11 +169,7 @@ export function useRealtimeAppState({
|
||||
},
|
||||
onReconnect: () => {
|
||||
setRawPackets([]);
|
||||
if (
|
||||
!(hasNewerMessagesRef.current && isMessageConversation(activeConversationRef.current))
|
||||
) {
|
||||
triggerReconcile();
|
||||
}
|
||||
reconcileOnReconnect();
|
||||
refreshUnreads();
|
||||
api.getChannels().then(setChannels).catch(console.error);
|
||||
fetchAllContacts()
|
||||
@@ -200,34 +181,14 @@ export function useRealtimeAppState({
|
||||
return;
|
||||
}
|
||||
|
||||
const isForActiveConversation = isActiveConversationMessage(
|
||||
activeConversationRef.current,
|
||||
msg
|
||||
);
|
||||
let isNewMessage = false;
|
||||
|
||||
if (isForActiveConversation && !hasNewerMessagesRef.current) {
|
||||
isNewMessage = addMessageIfNew(msg);
|
||||
}
|
||||
|
||||
trackNewMessage(msg);
|
||||
|
||||
const contentKey = getMessageContentKey(msg);
|
||||
if (!isForActiveConversation) {
|
||||
isNewMessage = messageCache.addMessage(msg.conversation_key, msg, contentKey);
|
||||
|
||||
if (!msg.outgoing && isNewMessage) {
|
||||
let stateKey: string | null = null;
|
||||
if (msg.type === 'CHAN' && msg.conversation_key) {
|
||||
stateKey = getStateKey('channel', msg.conversation_key);
|
||||
} else if (msg.type === 'PRIV' && msg.conversation_key) {
|
||||
stateKey = getStateKey('contact', msg.conversation_key);
|
||||
}
|
||||
if (stateKey) {
|
||||
incrementUnread(stateKey, checkMention(msg.text));
|
||||
}
|
||||
}
|
||||
}
|
||||
const { added: isNewMessage, activeConversation: isForActiveConversation } =
|
||||
observeMessage(msg);
|
||||
recordMessageEvent({
|
||||
msg,
|
||||
activeConversation: isForActiveConversation,
|
||||
isNewMessage,
|
||||
hasMention: checkMention(msg.text),
|
||||
});
|
||||
|
||||
if (!msg.outgoing && isNewMessage) {
|
||||
notifyIncomingMessage?.(msg);
|
||||
@@ -243,7 +204,7 @@ export function useRealtimeAppState({
|
||||
contact
|
||||
)
|
||||
);
|
||||
messageCache.rename(previousPublicKey, contact.public_key);
|
||||
renameConversationMessages(previousPublicKey, contact.public_key);
|
||||
renameConversationState(
|
||||
getStateKey('contact', previousPublicKey),
|
||||
getStateKey('contact', contact.public_key)
|
||||
@@ -263,7 +224,7 @@ export function useRealtimeAppState({
|
||||
},
|
||||
onContactDeleted: (publicKey: string) => {
|
||||
setContacts((prev) => prev.filter((c) => c.public_key !== publicKey));
|
||||
messageCache.remove(publicKey);
|
||||
removeConversationMessages(publicKey);
|
||||
const active = activeConversationRef.current;
|
||||
if (active?.type === 'contact' && active.id === publicKey) {
|
||||
pendingDeleteFallbackRef.current = true;
|
||||
@@ -272,7 +233,7 @@ export function useRealtimeAppState({
|
||||
},
|
||||
onChannelDeleted: (key: string) => {
|
||||
setChannels((prev) => prev.filter((c) => c.key !== key));
|
||||
messageCache.remove(key);
|
||||
removeConversationMessages(key);
|
||||
const active = activeConversationRef.current;
|
||||
if (active?.type === 'channel' && active.id === key) {
|
||||
pendingDeleteFallbackRef.current = true;
|
||||
@@ -283,34 +244,33 @@ export function useRealtimeAppState({
|
||||
setRawPackets((prev) => appendRawPacketUnique(prev, packet, maxRawPackets));
|
||||
},
|
||||
onMessageAcked: (messageId: number, ackCount: number, paths?: MessagePath[]) => {
|
||||
updateMessageAck(messageId, ackCount, paths);
|
||||
messageCache.updateAck(messageId, ackCount, paths);
|
||||
receiveMessageAck(messageId, ackCount, paths);
|
||||
},
|
||||
}),
|
||||
[
|
||||
activeConversationRef,
|
||||
addMessageIfNew,
|
||||
blockedKeysRef,
|
||||
blockedNamesRef,
|
||||
checkMention,
|
||||
fetchAllContacts,
|
||||
fetchConfig,
|
||||
hasNewerMessagesRef,
|
||||
incrementUnread,
|
||||
renameConversationState,
|
||||
renameConversationMessages,
|
||||
maxRawPackets,
|
||||
mergeChannelIntoList,
|
||||
pendingDeleteFallbackRef,
|
||||
prevHealthRef,
|
||||
recordMessageEvent,
|
||||
receiveMessageAck,
|
||||
observeMessage,
|
||||
refreshUnreads,
|
||||
reconcileOnReconnect,
|
||||
removeConversationMessages,
|
||||
setActiveConversation,
|
||||
setChannels,
|
||||
setContacts,
|
||||
setHealth,
|
||||
setRawPackets,
|
||||
trackNewMessage,
|
||||
triggerReconcile,
|
||||
updateMessageAck,
|
||||
notifyIncomingMessage,
|
||||
]
|
||||
);
|
||||
|
||||
@@ -16,10 +16,14 @@ interface UseUnreadCountsResult {
|
||||
mentions: Record<string, boolean>;
|
||||
lastMessageTimes: ConversationTimes;
|
||||
unreadLastReadAts: Record<string, number | null>;
|
||||
incrementUnread: (stateKey: string, hasMention?: boolean) => void;
|
||||
recordMessageEvent: (args: {
|
||||
msg: Message;
|
||||
activeConversation: boolean;
|
||||
isNewMessage: boolean;
|
||||
hasMention?: boolean;
|
||||
}) => void;
|
||||
renameConversationState: (oldStateKey: string, newStateKey: string) => void;
|
||||
markAllRead: () => void;
|
||||
trackNewMessage: (msg: Message) => void;
|
||||
refreshUnreads: () => Promise<void>;
|
||||
}
|
||||
|
||||
@@ -162,7 +166,6 @@ export function useUnreadCounts(
|
||||
}
|
||||
}, [activeConversation]);
|
||||
|
||||
// Increment unread count for a conversation
|
||||
const incrementUnread = useCallback((stateKey: string, hasMention?: boolean) => {
|
||||
setUnreadCounts((prev) => ({
|
||||
...prev,
|
||||
@@ -176,6 +179,40 @@ export function useUnreadCounts(
|
||||
}
|
||||
}, []);
|
||||
|
||||
const recordMessageEvent = useCallback(
|
||||
({
|
||||
msg,
|
||||
activeConversation: isActiveConversation,
|
||||
isNewMessage,
|
||||
hasMention,
|
||||
}: {
|
||||
msg: Message;
|
||||
activeConversation: boolean;
|
||||
isNewMessage: boolean;
|
||||
hasMention?: boolean;
|
||||
}) => {
|
||||
let stateKey: string | null = null;
|
||||
if (msg.type === 'CHAN' && msg.conversation_key) {
|
||||
stateKey = getStateKey('channel', msg.conversation_key);
|
||||
} else if (msg.type === 'PRIV' && msg.conversation_key) {
|
||||
stateKey = getStateKey('contact', msg.conversation_key);
|
||||
}
|
||||
|
||||
if (!stateKey) {
|
||||
return;
|
||||
}
|
||||
|
||||
const timestamp = msg.received_at || Math.floor(Date.now() / 1000);
|
||||
const updated = setLastMessageTime(stateKey, timestamp);
|
||||
setLastMessageTimes(updated);
|
||||
|
||||
if (!isActiveConversation && !msg.outgoing && isNewMessage) {
|
||||
incrementUnread(stateKey, hasMention);
|
||||
}
|
||||
},
|
||||
[incrementUnread]
|
||||
);
|
||||
|
||||
const renameConversationState = useCallback((oldStateKey: string, newStateKey: string) => {
|
||||
if (oldStateKey === newStateKey) return;
|
||||
|
||||
@@ -212,31 +249,14 @@ export function useUnreadCounts(
|
||||
});
|
||||
}, []);
|
||||
|
||||
// Track a new incoming message for unread counts
|
||||
const trackNewMessage = useCallback((msg: Message) => {
|
||||
let conversationKey: string | null = null;
|
||||
if (msg.type === 'CHAN' && msg.conversation_key) {
|
||||
conversationKey = getStateKey('channel', msg.conversation_key);
|
||||
} else if (msg.type === 'PRIV' && msg.conversation_key) {
|
||||
conversationKey = getStateKey('contact', msg.conversation_key);
|
||||
}
|
||||
|
||||
if (conversationKey) {
|
||||
const timestamp = msg.received_at || Math.floor(Date.now() / 1000);
|
||||
const updated = setLastMessageTime(conversationKey, timestamp);
|
||||
setLastMessageTimes(updated);
|
||||
}
|
||||
}, []);
|
||||
|
||||
return {
|
||||
unreadCounts,
|
||||
mentions,
|
||||
lastMessageTimes,
|
||||
unreadLastReadAts,
|
||||
incrementUnread,
|
||||
recordMessageEvent,
|
||||
renameConversationState,
|
||||
markAllRead,
|
||||
trackNewMessage,
|
||||
refreshUnreads: fetchUnreads,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1,174 +0,0 @@
|
||||
/**
|
||||
* LRU message cache for recently-visited conversations.
|
||||
*
|
||||
* Uses Map insertion-order semantics: the most recently used entry
|
||||
* is always at the end. Eviction removes the first (least-recently-used) entry.
|
||||
*
|
||||
* Cache size: 20 conversations, 200 messages each (~2.4MB worst case).
|
||||
*/
|
||||
|
||||
import type { Message, MessagePath } from './types';
|
||||
|
||||
export const MAX_CACHED_CONVERSATIONS = 20;
|
||||
export const MAX_MESSAGES_PER_ENTRY = 200;
|
||||
|
||||
interface CacheEntry {
|
||||
messages: Message[];
|
||||
seenContent: Set<string>;
|
||||
hasOlderMessages: boolean;
|
||||
}
|
||||
|
||||
const cache = new Map<string, CacheEntry>();
|
||||
|
||||
/** Get a cached entry and promote it to most-recently-used. */
|
||||
export function get(id: string): CacheEntry | undefined {
|
||||
const entry = cache.get(id);
|
||||
if (!entry) return undefined;
|
||||
// Promote to MRU: delete and re-insert
|
||||
cache.delete(id);
|
||||
cache.set(id, entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
/** Insert or update an entry at MRU position, evicting LRU if over capacity. */
|
||||
export function set(id: string, entry: CacheEntry): void {
|
||||
// Trim to most recent messages to bound memory
|
||||
if (entry.messages.length > MAX_MESSAGES_PER_ENTRY) {
|
||||
const trimmed = [...entry.messages]
|
||||
.sort((a, b) => b.received_at - a.received_at)
|
||||
.slice(0, MAX_MESSAGES_PER_ENTRY);
|
||||
entry = { ...entry, messages: trimmed, hasOlderMessages: true };
|
||||
}
|
||||
// Remove first so re-insert moves to end
|
||||
cache.delete(id);
|
||||
cache.set(id, entry);
|
||||
// Evict LRU (first entry) if over capacity
|
||||
if (cache.size > MAX_CACHED_CONVERSATIONS) {
|
||||
const lruKey = cache.keys().next().value as string;
|
||||
cache.delete(lruKey);
|
||||
}
|
||||
}
|
||||
|
||||
/** Add a message to a cached conversation with dedup. Returns true if new, false if duplicate. */
|
||||
export function addMessage(id: string, msg: Message, contentKey: string): boolean {
|
||||
const entry = cache.get(id);
|
||||
if (!entry) {
|
||||
// Auto-create a minimal entry for never-visited conversations
|
||||
cache.set(id, {
|
||||
messages: [msg],
|
||||
seenContent: new Set([contentKey]),
|
||||
hasOlderMessages: true,
|
||||
});
|
||||
// Evict LRU if over capacity
|
||||
if (cache.size > MAX_CACHED_CONVERSATIONS) {
|
||||
const lruKey = cache.keys().next().value as string;
|
||||
cache.delete(lruKey);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
if (entry.seenContent.has(contentKey)) return false;
|
||||
if (entry.messages.some((m) => m.id === msg.id)) return false;
|
||||
entry.seenContent.add(contentKey);
|
||||
entry.messages = [...entry.messages, msg];
|
||||
// Trim if over limit (drop oldest by received_at)
|
||||
if (entry.messages.length > MAX_MESSAGES_PER_ENTRY) {
|
||||
entry.messages = [...entry.messages]
|
||||
.sort((a, b) => b.received_at - a.received_at)
|
||||
.slice(0, MAX_MESSAGES_PER_ENTRY);
|
||||
}
|
||||
// Promote to MRU so actively-messaged conversations aren't evicted
|
||||
cache.delete(id);
|
||||
cache.set(id, entry);
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Scan all cached entries for a message ID and update its ack/paths. */
|
||||
export function updateAck(messageId: number, ackCount: number, paths?: MessagePath[]): void {
|
||||
for (const entry of cache.values()) {
|
||||
const idx = entry.messages.findIndex((m) => m.id === messageId);
|
||||
if (idx >= 0) {
|
||||
const current = entry.messages[idx];
|
||||
const updated = [...entry.messages];
|
||||
updated[idx] = {
|
||||
...current,
|
||||
acked: Math.max(current.acked, ackCount),
|
||||
...(paths !== undefined && paths.length >= (current.paths?.length ?? 0) && { paths }),
|
||||
};
|
||||
entry.messages = updated;
|
||||
return; // Message IDs are unique, stop after first match
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare fetched messages against current state.
|
||||
* Returns merged array if there are differences (new messages or ack changes),
|
||||
* or null if the cache is already consistent (happy path — no rerender needed).
|
||||
* Preserves any older paginated messages not present in the fetched page.
|
||||
*/
|
||||
export function reconcile(current: Message[], fetched: Message[]): Message[] | null {
|
||||
const currentById = new Map<number, { acked: number; pathsLen: number; text: string }>();
|
||||
for (const m of current) {
|
||||
currentById.set(m.id, { acked: m.acked, pathsLen: m.paths?.length ?? 0, text: m.text });
|
||||
}
|
||||
|
||||
let needsUpdate = false;
|
||||
for (const m of fetched) {
|
||||
const cur = currentById.get(m.id);
|
||||
if (
|
||||
!cur ||
|
||||
cur.acked !== m.acked ||
|
||||
cur.pathsLen !== (m.paths?.length ?? 0) ||
|
||||
cur.text !== m.text
|
||||
) {
|
||||
needsUpdate = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!needsUpdate) return null;
|
||||
|
||||
// Merge: fresh recent page + any older paginated messages not in the fetch
|
||||
const fetchedIds = new Set(fetched.map((m) => m.id));
|
||||
const olderMessages = current.filter((m) => !fetchedIds.has(m.id));
|
||||
return [...fetched, ...olderMessages];
|
||||
}
|
||||
|
||||
/** Evict a specific conversation from the cache. */
|
||||
export function remove(id: string): void {
|
||||
cache.delete(id);
|
||||
}
|
||||
|
||||
/** Move cached conversation state to a new conversation id. */
|
||||
export function rename(oldId: string, newId: string): void {
|
||||
if (oldId === newId) return;
|
||||
const oldEntry = cache.get(oldId);
|
||||
if (!oldEntry) return;
|
||||
|
||||
const newEntry = cache.get(newId);
|
||||
if (!newEntry) {
|
||||
cache.delete(oldId);
|
||||
cache.set(newId, oldEntry);
|
||||
return;
|
||||
}
|
||||
|
||||
const mergedMessages = [...newEntry.messages];
|
||||
const seenIds = new Set(mergedMessages.map((message) => message.id));
|
||||
for (const message of oldEntry.messages) {
|
||||
if (!seenIds.has(message.id)) {
|
||||
mergedMessages.push(message);
|
||||
seenIds.add(message.id);
|
||||
}
|
||||
}
|
||||
|
||||
cache.delete(oldId);
|
||||
cache.set(newId, {
|
||||
messages: mergedMessages,
|
||||
seenContent: new Set([...newEntry.seenContent, ...oldEntry.seenContent]),
|
||||
hasOlderMessages: newEntry.hasOlderMessages || oldEntry.hasOlderMessages,
|
||||
});
|
||||
}
|
||||
|
||||
/** Clear the entire cache. */
|
||||
export function clear(): void {
|
||||
cache.clear();
|
||||
}
|
||||
@@ -31,15 +31,15 @@ const mocks = vi.hoisted(() => ({
|
||||
error: vi.fn(),
|
||||
},
|
||||
hookFns: {
|
||||
setMessages: vi.fn(),
|
||||
fetchMessages: vi.fn(async () => {}),
|
||||
fetchOlderMessages: vi.fn(async () => {}),
|
||||
addMessageIfNew: vi.fn(),
|
||||
updateMessageAck: vi.fn(),
|
||||
triggerReconcile: vi.fn(),
|
||||
incrementUnread: vi.fn(),
|
||||
observeMessage: vi.fn(() => ({ added: false, activeConversation: false })),
|
||||
receiveMessageAck: vi.fn(),
|
||||
reconcileOnReconnect: vi.fn(),
|
||||
renameConversationMessages: vi.fn(),
|
||||
removeConversationMessages: vi.fn(),
|
||||
clearConversationMessages: vi.fn(),
|
||||
recordMessageEvent: vi.fn(),
|
||||
markAllRead: vi.fn(),
|
||||
trackNewMessage: vi.fn(),
|
||||
refreshUnreads: vi.fn(async () => {}),
|
||||
},
|
||||
}));
|
||||
@@ -63,38 +63,30 @@ vi.mock('../hooks', async (importOriginal) => {
|
||||
hasOlderMessages: false,
|
||||
hasNewerMessages: false,
|
||||
loadingNewer: false,
|
||||
hasNewerMessagesRef: { current: false },
|
||||
setMessages: mocks.hookFns.setMessages,
|
||||
fetchMessages: mocks.hookFns.fetchMessages,
|
||||
fetchOlderMessages: mocks.hookFns.fetchOlderMessages,
|
||||
fetchNewerMessages: vi.fn(async () => {}),
|
||||
jumpToBottom: vi.fn(),
|
||||
reloadCurrentConversation: vi.fn(),
|
||||
addMessageIfNew: mocks.hookFns.addMessageIfNew,
|
||||
updateMessageAck: mocks.hookFns.updateMessageAck,
|
||||
triggerReconcile: mocks.hookFns.triggerReconcile,
|
||||
observeMessage: mocks.hookFns.observeMessage,
|
||||
receiveMessageAck: mocks.hookFns.receiveMessageAck,
|
||||
reconcileOnReconnect: mocks.hookFns.reconcileOnReconnect,
|
||||
renameConversationMessages: mocks.hookFns.renameConversationMessages,
|
||||
removeConversationMessages: mocks.hookFns.removeConversationMessages,
|
||||
clearConversationMessages: mocks.hookFns.clearConversationMessages,
|
||||
}),
|
||||
useUnreadCounts: () => ({
|
||||
unreadCounts: {},
|
||||
mentions: {},
|
||||
lastMessageTimes: {},
|
||||
unreadLastReadAts: {},
|
||||
incrementUnread: mocks.hookFns.incrementUnread,
|
||||
recordMessageEvent: mocks.hookFns.recordMessageEvent,
|
||||
renameConversationState: vi.fn(),
|
||||
markAllRead: mocks.hookFns.markAllRead,
|
||||
trackNewMessage: mocks.hookFns.trackNewMessage,
|
||||
refreshUnreads: mocks.hookFns.refreshUnreads,
|
||||
}),
|
||||
getMessageContentKey: () => 'content-key',
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock('../messageCache', () => ({
|
||||
addMessage: vi.fn(),
|
||||
updateAck: vi.fn(),
|
||||
remove: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock('../components/StatusBar', () => ({
|
||||
StatusBar: ({
|
||||
settingsMode,
|
||||
@@ -295,7 +287,7 @@ describe('App favorite toggle flow', () => {
|
||||
await waitFor(() => {
|
||||
expect(mocks.api.getChannels).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
expect(mocks.hookFns.triggerReconcile).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.hookFns.reconcileOnReconnect).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.hookFns.refreshUnreads).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
|
||||
@@ -37,15 +37,16 @@ vi.mock('../hooks', async (importOriginal) => {
|
||||
hasOlderMessages: false,
|
||||
hasNewerMessages: false,
|
||||
loadingNewer: false,
|
||||
hasNewerMessagesRef: { current: false },
|
||||
setMessages: vi.fn(),
|
||||
fetchOlderMessages: vi.fn(async () => {}),
|
||||
fetchNewerMessages: vi.fn(async () => {}),
|
||||
jumpToBottom: vi.fn(),
|
||||
reloadCurrentConversation: vi.fn(),
|
||||
addMessageIfNew: vi.fn(),
|
||||
updateMessageAck: vi.fn(),
|
||||
triggerReconcile: vi.fn(),
|
||||
observeMessage: vi.fn(() => ({ added: false, activeConversation: false })),
|
||||
receiveMessageAck: vi.fn(),
|
||||
reconcileOnReconnect: vi.fn(),
|
||||
renameConversationMessages: vi.fn(),
|
||||
removeConversationMessages: vi.fn(),
|
||||
clearConversationMessages: vi.fn(),
|
||||
};
|
||||
},
|
||||
useUnreadCounts: () => ({
|
||||
@@ -53,22 +54,14 @@ vi.mock('../hooks', async (importOriginal) => {
|
||||
mentions: {},
|
||||
lastMessageTimes: {},
|
||||
unreadLastReadAts: {},
|
||||
incrementUnread: vi.fn(),
|
||||
recordMessageEvent: vi.fn(),
|
||||
renameConversationState: vi.fn(),
|
||||
markAllRead: vi.fn(),
|
||||
trackNewMessage: vi.fn(),
|
||||
refreshUnreads: vi.fn(),
|
||||
}),
|
||||
getMessageContentKey: () => 'content-key',
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock('../messageCache', () => ({
|
||||
addMessage: vi.fn(),
|
||||
updateAck: vi.fn(),
|
||||
remove: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock('../components/StatusBar', () => ({
|
||||
StatusBar: () => <div data-testid="status-bar" />,
|
||||
}));
|
||||
|
||||
@@ -32,38 +32,30 @@ vi.mock('../hooks', async (importOriginal) => {
|
||||
hasOlderMessages: false,
|
||||
hasNewerMessages: false,
|
||||
loadingNewer: false,
|
||||
hasNewerMessagesRef: { current: false },
|
||||
setMessages: vi.fn(),
|
||||
fetchMessages: vi.fn(async () => {}),
|
||||
fetchOlderMessages: vi.fn(async () => {}),
|
||||
fetchNewerMessages: vi.fn(async () => {}),
|
||||
jumpToBottom: vi.fn(),
|
||||
reloadCurrentConversation: vi.fn(),
|
||||
addMessageIfNew: vi.fn(),
|
||||
updateMessageAck: vi.fn(),
|
||||
triggerReconcile: vi.fn(),
|
||||
observeMessage: vi.fn(() => ({ added: false, activeConversation: false })),
|
||||
receiveMessageAck: vi.fn(),
|
||||
reconcileOnReconnect: vi.fn(),
|
||||
renameConversationMessages: vi.fn(),
|
||||
removeConversationMessages: vi.fn(),
|
||||
clearConversationMessages: vi.fn(),
|
||||
}),
|
||||
useUnreadCounts: () => ({
|
||||
unreadCounts: {},
|
||||
mentions: {},
|
||||
lastMessageTimes: {},
|
||||
unreadLastReadAts: {},
|
||||
incrementUnread: vi.fn(),
|
||||
recordMessageEvent: vi.fn(),
|
||||
renameConversationState: vi.fn(),
|
||||
markAllRead: vi.fn(),
|
||||
trackNewMessage: vi.fn(),
|
||||
refreshUnreads: vi.fn(async () => {}),
|
||||
}),
|
||||
getMessageContentKey: () => 'content-key',
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock('../messageCache', () => ({
|
||||
addMessage: vi.fn(),
|
||||
updateAck: vi.fn(),
|
||||
remove: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock('../components/StatusBar', () => ({
|
||||
StatusBar: () => <div data-testid="status-bar" />,
|
||||
}));
|
||||
|
||||
@@ -8,12 +8,12 @@
|
||||
* between backend and frontend - both sides test against the same data.
|
||||
*/
|
||||
|
||||
import { describe, it, expect, beforeEach } from 'vitest';
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import fixtures from './fixtures/websocket_events.json';
|
||||
import { getMessageContentKey } from '../hooks/useConversationMessages';
|
||||
import { getStateKey } from '../utils/conversationState';
|
||||
import { mergeContactIntoList } from '../utils/contactMerge';
|
||||
import * as messageCache from '../messageCache';
|
||||
import { ConversationMessageCache } from '../hooks/useConversationMessages';
|
||||
import { getMessageContentKey } from '../utils/messageIdentity';
|
||||
import type { Contact, Message } from '../types';
|
||||
|
||||
/**
|
||||
@@ -25,6 +25,7 @@ interface MockState {
|
||||
unreadCounts: Record<string, number>;
|
||||
lastMessageTimes: Record<string, number>;
|
||||
seenActiveContent: Set<string>;
|
||||
messageCache: ConversationMessageCache;
|
||||
}
|
||||
|
||||
function createMockState(): MockState {
|
||||
@@ -33,6 +34,7 @@ function createMockState(): MockState {
|
||||
unreadCounts: {},
|
||||
lastMessageTimes: {},
|
||||
seenActiveContent: new Set(),
|
||||
messageCache: new ConversationMessageCache(),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -68,7 +70,7 @@ function handleMessageEvent(
|
||||
state.lastMessageTimes[stateKey] = msg.received_at;
|
||||
|
||||
if (!isForActiveConversation) {
|
||||
const isNew = messageCache.addMessage(msg.conversation_key, msg, contentKey);
|
||||
const isNew = state.messageCache.addMessage(msg.conversation_key, msg);
|
||||
if (!msg.outgoing && isNew) {
|
||||
state.unreadCounts[stateKey] = (state.unreadCounts[stateKey] || 0) + 1;
|
||||
unreadIncremented = true;
|
||||
@@ -78,11 +80,6 @@ function handleMessageEvent(
|
||||
return { added, unreadIncremented };
|
||||
}
|
||||
|
||||
// Clear messageCache between tests to avoid cross-test contamination
|
||||
beforeEach(() => {
|
||||
messageCache.clear();
|
||||
});
|
||||
|
||||
describe('Integration: Channel Message Events', () => {
|
||||
const fixture = fixtures.channel_message;
|
||||
|
||||
@@ -180,7 +177,7 @@ describe('Integration: No phantom unreads from mesh echoes (hitlist #8 regressio
|
||||
// dual-set design the global set would drop msg-0's key during pruning,
|
||||
// so a later mesh echo of msg-0 would pass the global check and
|
||||
// phantom-increment unread. With the fix, messageCache's per-conversation
|
||||
// seenContent is the single source of truth and is never pruned.
|
||||
// Cached messages remain the source of truth for inactive-conversation dedup.
|
||||
const MESSAGE_COUNT = 1001;
|
||||
for (let i = 0; i < MESSAGE_COUNT; i++) {
|
||||
const msg: Message = {
|
||||
@@ -342,11 +339,8 @@ describe('Integration: Contact Merge', () => {
|
||||
// --- ACK + messageCache propagation tests ---
|
||||
|
||||
describe('Integration: ACK + messageCache propagation', () => {
|
||||
beforeEach(() => {
|
||||
messageCache.clear();
|
||||
});
|
||||
|
||||
it('updateAck updates acked count on cached message', () => {
|
||||
const messageCache = new ConversationMessageCache();
|
||||
const msg: Message = {
|
||||
id: 100,
|
||||
type: 'PRIV',
|
||||
@@ -362,7 +356,7 @@ describe('Integration: ACK + messageCache propagation', () => {
|
||||
acked: 0,
|
||||
sender_name: null,
|
||||
};
|
||||
messageCache.addMessage('pk_abc', msg, 'key-100');
|
||||
messageCache.addMessage('pk_abc', msg);
|
||||
|
||||
messageCache.updateAck(100, 1);
|
||||
|
||||
@@ -372,6 +366,7 @@ describe('Integration: ACK + messageCache propagation', () => {
|
||||
});
|
||||
|
||||
it('updateAck updates paths when longer', () => {
|
||||
const messageCache = new ConversationMessageCache();
|
||||
const msg: Message = {
|
||||
id: 101,
|
||||
type: 'PRIV',
|
||||
@@ -387,7 +382,7 @@ describe('Integration: ACK + messageCache propagation', () => {
|
||||
acked: 1,
|
||||
sender_name: null,
|
||||
};
|
||||
messageCache.addMessage('pk_abc', msg, 'key-101');
|
||||
messageCache.addMessage('pk_abc', msg);
|
||||
|
||||
const longerPaths = [
|
||||
{ path: 'aa', received_at: 1700000001 },
|
||||
@@ -401,6 +396,7 @@ describe('Integration: ACK + messageCache propagation', () => {
|
||||
});
|
||||
|
||||
it('preserves higher existing ack count (max semantics)', () => {
|
||||
const messageCache = new ConversationMessageCache();
|
||||
const msg: Message = {
|
||||
id: 102,
|
||||
type: 'PRIV',
|
||||
@@ -416,7 +412,7 @@ describe('Integration: ACK + messageCache propagation', () => {
|
||||
acked: 5,
|
||||
sender_name: null,
|
||||
};
|
||||
messageCache.addMessage('pk_abc', msg, 'key-102');
|
||||
messageCache.addMessage('pk_abc', msg);
|
||||
|
||||
// Try to update with a lower ack count
|
||||
messageCache.updateAck(102, 3);
|
||||
@@ -426,6 +422,7 @@ describe('Integration: ACK + messageCache propagation', () => {
|
||||
});
|
||||
|
||||
it('is a no-op for unknown message ID', () => {
|
||||
const messageCache = new ConversationMessageCache();
|
||||
const msg: Message = {
|
||||
id: 103,
|
||||
type: 'PRIV',
|
||||
@@ -441,7 +438,7 @@ describe('Integration: ACK + messageCache propagation', () => {
|
||||
acked: 0,
|
||||
sender_name: null,
|
||||
};
|
||||
messageCache.addMessage('pk_abc', msg, 'key-103');
|
||||
messageCache.addMessage('pk_abc', msg);
|
||||
|
||||
// Update a non-existent message ID — should not throw or modify anything
|
||||
messageCache.updateAck(999, 1);
|
||||
|
||||
@@ -3,8 +3,12 @@
|
||||
*/
|
||||
|
||||
import { describe, it, expect, beforeEach } from 'vitest';
|
||||
import * as messageCache from '../messageCache';
|
||||
import { MAX_CACHED_CONVERSATIONS, MAX_MESSAGES_PER_ENTRY } from '../messageCache';
|
||||
import {
|
||||
ConversationMessageCache,
|
||||
MAX_CACHED_CONVERSATIONS,
|
||||
MAX_MESSAGES_PER_ENTRY,
|
||||
reconcileConversationMessages,
|
||||
} from '../hooks/useConversationMessages';
|
||||
import type { Message } from '../types';
|
||||
|
||||
function createMessage(overrides: Partial<Message> = {}): Message {
|
||||
@@ -27,16 +31,14 @@ function createMessage(overrides: Partial<Message> = {}): Message {
|
||||
}
|
||||
|
||||
function createEntry(messages: Message[] = [], hasOlderMessages = false) {
|
||||
const seenContent = new Set<string>();
|
||||
for (const msg of messages) {
|
||||
seenContent.add(`${msg.type}-${msg.conversation_key}-${msg.text}-${msg.sender_timestamp}`);
|
||||
}
|
||||
return { messages, seenContent, hasOlderMessages };
|
||||
return { messages, hasOlderMessages };
|
||||
}
|
||||
|
||||
describe('messageCache', () => {
|
||||
let messageCache: ConversationMessageCache;
|
||||
|
||||
beforeEach(() => {
|
||||
messageCache.clear();
|
||||
messageCache = new ConversationMessageCache();
|
||||
});
|
||||
|
||||
describe('get/set', () => {
|
||||
@@ -155,11 +157,7 @@ describe('messageCache', () => {
|
||||
messageCache.set('conv1', createEntry([]));
|
||||
|
||||
const msg = createMessage({ id: 10, text: 'New message' });
|
||||
const result = messageCache.addMessage(
|
||||
'conv1',
|
||||
msg,
|
||||
'CHAN-channel123-New message-1700000000'
|
||||
);
|
||||
const result = messageCache.addMessage('conv1', msg);
|
||||
|
||||
expect(result).toBe(true);
|
||||
const entry = messageCache.get('conv1');
|
||||
@@ -171,12 +169,11 @@ describe('messageCache', () => {
|
||||
messageCache.set('conv1', createEntry([]));
|
||||
|
||||
const msg1 = createMessage({ id: 10, text: 'Hello' });
|
||||
const contentKey = 'CHAN-channel123-Hello-1700000000';
|
||||
expect(messageCache.addMessage('conv1', msg1, contentKey)).toBe(true);
|
||||
expect(messageCache.addMessage('conv1', msg1)).toBe(true);
|
||||
|
||||
// Same content key, different message id
|
||||
const msg2 = createMessage({ id: 11, text: 'Hello' });
|
||||
expect(messageCache.addMessage('conv1', msg2, contentKey)).toBe(false);
|
||||
expect(messageCache.addMessage('conv1', msg2)).toBe(false);
|
||||
|
||||
const entry = messageCache.get('conv1');
|
||||
expect(entry!.messages).toHaveLength(1);
|
||||
@@ -187,9 +184,7 @@ describe('messageCache', () => {
|
||||
|
||||
// Same id, different content key
|
||||
const msg = createMessage({ id: 10, text: 'Different' });
|
||||
expect(messageCache.addMessage('conv1', msg, 'CHAN-channel123-Different-1700000000')).toBe(
|
||||
false
|
||||
);
|
||||
expect(messageCache.addMessage('conv1', msg)).toBe(false);
|
||||
|
||||
const entry = messageCache.get('conv1');
|
||||
expect(entry!.messages).toHaveLength(1);
|
||||
@@ -208,11 +203,7 @@ describe('messageCache', () => {
|
||||
text: 'newest',
|
||||
received_at: 1700000000 + MAX_MESSAGES_PER_ENTRY,
|
||||
});
|
||||
const result = messageCache.addMessage(
|
||||
'conv1',
|
||||
newMsg,
|
||||
`CHAN-channel123-newest-${newMsg.sender_timestamp}`
|
||||
);
|
||||
const result = messageCache.addMessage('conv1', newMsg);
|
||||
|
||||
expect(result).toBe(true);
|
||||
const entry = messageCache.get('conv1');
|
||||
@@ -225,11 +216,7 @@ describe('messageCache', () => {
|
||||
|
||||
it('auto-creates a minimal entry for never-visited conversations and returns true', () => {
|
||||
const msg = createMessage({ id: 10, text: 'First contact' });
|
||||
const result = messageCache.addMessage(
|
||||
'new_conv',
|
||||
msg,
|
||||
'CHAN-channel123-First contact-1700000000'
|
||||
);
|
||||
const result = messageCache.addMessage('new_conv', msg);
|
||||
|
||||
expect(result).toBe(true);
|
||||
const entry = messageCache.get('new_conv');
|
||||
@@ -237,7 +224,6 @@ describe('messageCache', () => {
|
||||
expect(entry!.messages).toHaveLength(1);
|
||||
expect(entry!.messages[0].text).toBe('First contact');
|
||||
expect(entry!.hasOlderMessages).toBe(true);
|
||||
expect(entry!.seenContent.has('CHAN-channel123-First contact-1700000000')).toBe(true);
|
||||
});
|
||||
|
||||
it('promotes entry to MRU on addMessage', () => {
|
||||
@@ -248,7 +234,7 @@ describe('messageCache', () => {
|
||||
|
||||
// addMessage to conv0 (currently LRU) should promote it
|
||||
const msg = createMessage({ id: 999, text: 'Incoming WS message' });
|
||||
messageCache.addMessage('conv0', msg, 'CHAN-channel123-Incoming WS message-1700000000');
|
||||
messageCache.addMessage('conv0', msg);
|
||||
|
||||
// Add one more — conv1 should now be LRU and get evicted, not conv0
|
||||
messageCache.set('conv_new', createEntry());
|
||||
@@ -259,11 +245,10 @@ describe('messageCache', () => {
|
||||
|
||||
it('returns false for duplicate delivery to auto-created entry', () => {
|
||||
const msg = createMessage({ id: 10, text: 'Echo' });
|
||||
const contentKey = 'CHAN-channel123-Echo-1700000000';
|
||||
|
||||
expect(messageCache.addMessage('new_conv', msg, contentKey)).toBe(true);
|
||||
expect(messageCache.addMessage('new_conv', msg)).toBe(true);
|
||||
// Duplicate via mesh echo
|
||||
expect(messageCache.addMessage('new_conv', msg, contentKey)).toBe(false);
|
||||
expect(messageCache.addMessage('new_conv', msg)).toBe(false);
|
||||
|
||||
const entry = messageCache.get('new_conv');
|
||||
expect(entry!.messages).toHaveLength(1);
|
||||
@@ -358,7 +343,7 @@ describe('messageCache', () => {
|
||||
createMessage({ id: 3, acked: 1 }),
|
||||
];
|
||||
|
||||
expect(messageCache.reconcile(msgs, fetched)).toBeNull();
|
||||
expect(reconcileConversationMessages(msgs, fetched)).toBeNull();
|
||||
});
|
||||
|
||||
it('detects new messages missing from cache', () => {
|
||||
@@ -369,7 +354,7 @@ describe('messageCache', () => {
|
||||
createMessage({ id: 3, text: 'missed via WS' }),
|
||||
];
|
||||
|
||||
const merged = messageCache.reconcile(current, fetched);
|
||||
const merged = reconcileConversationMessages(current, fetched);
|
||||
expect(merged).not.toBeNull();
|
||||
expect(merged!.map((m) => m.id)).toEqual([1, 2, 3]);
|
||||
});
|
||||
@@ -378,7 +363,7 @@ describe('messageCache', () => {
|
||||
const current = [createMessage({ id: 1, acked: 0 })];
|
||||
const fetched = [createMessage({ id: 1, acked: 3 })];
|
||||
|
||||
const merged = messageCache.reconcile(current, fetched);
|
||||
const merged = reconcileConversationMessages(current, fetched);
|
||||
expect(merged).not.toBeNull();
|
||||
expect(merged![0].acked).toBe(3);
|
||||
});
|
||||
@@ -397,20 +382,20 @@ describe('messageCache', () => {
|
||||
createMessage({ id: 2 }),
|
||||
];
|
||||
|
||||
const merged = messageCache.reconcile(current, fetched);
|
||||
const merged = reconcileConversationMessages(current, fetched);
|
||||
expect(merged).not.toBeNull();
|
||||
// Should have fetched page + older paginated message
|
||||
expect(merged!.map((m) => m.id)).toEqual([4, 3, 2, 1]);
|
||||
});
|
||||
|
||||
it('returns null for empty fetched and empty current', () => {
|
||||
expect(messageCache.reconcile([], [])).toBeNull();
|
||||
expect(reconcileConversationMessages([], [])).toBeNull();
|
||||
});
|
||||
|
||||
it('detects difference when current is empty but fetch has messages', () => {
|
||||
const fetched = [createMessage({ id: 1 })];
|
||||
|
||||
const merged = messageCache.reconcile([], fetched);
|
||||
const merged = reconcileConversationMessages([], fetched);
|
||||
expect(merged).not.toBeNull();
|
||||
expect(merged!).toHaveLength(1);
|
||||
});
|
||||
@@ -430,7 +415,7 @@ describe('messageCache', () => {
|
||||
}),
|
||||
];
|
||||
|
||||
const merged = messageCache.reconcile(current, fetched);
|
||||
const merged = reconcileConversationMessages(current, fetched);
|
||||
expect(merged).not.toBeNull();
|
||||
expect(merged![0].paths).toHaveLength(2);
|
||||
});
|
||||
@@ -439,7 +424,7 @@ describe('messageCache', () => {
|
||||
const current = [createMessage({ id: 1, text: '[encrypted]' })];
|
||||
const fetched = [createMessage({ id: 1, text: 'Hello world' })];
|
||||
|
||||
const merged = messageCache.reconcile(current, fetched);
|
||||
const merged = reconcileConversationMessages(current, fetched);
|
||||
expect(merged).not.toBeNull();
|
||||
expect(merged![0].text).toBe('Hello world');
|
||||
});
|
||||
@@ -449,7 +434,7 @@ describe('messageCache', () => {
|
||||
const current = [createMessage({ id: 1, acked: 2, paths, text: 'Hello' })];
|
||||
const fetched = [createMessage({ id: 1, acked: 2, paths, text: 'Hello' })];
|
||||
|
||||
expect(messageCache.reconcile(current, fetched)).toBeNull();
|
||||
expect(reconcileConversationMessages(current, fetched)).toBeNull();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -35,11 +35,6 @@ vi.mock('../components/ui/sonner', () => ({
|
||||
toast: { success: vi.fn(), error: vi.fn() },
|
||||
}));
|
||||
|
||||
// Mock messageCache
|
||||
vi.mock('../messageCache', () => ({
|
||||
remove: vi.fn(),
|
||||
}));
|
||||
|
||||
function makeContact(suffix: string): Contact {
|
||||
const key = suffix.padStart(64, '0');
|
||||
return {
|
||||
@@ -69,6 +64,7 @@ function makeContacts(count: number, startIndex = 0): Contact[] {
|
||||
|
||||
describe('useContactsAndChannels', () => {
|
||||
const setActiveConversation = vi.fn();
|
||||
const removeConversationMessages = vi.fn();
|
||||
const pendingDeleteFallbackRef = { current: false };
|
||||
const hasSetDefaultConversation = { current: false };
|
||||
|
||||
@@ -88,6 +84,7 @@ describe('useContactsAndChannels', () => {
|
||||
setActiveConversation,
|
||||
pendingDeleteFallbackRef,
|
||||
hasSetDefaultConversation,
|
||||
removeConversationMessages,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ function createArgs(overrides: Partial<Parameters<typeof useConversationActions>
|
||||
activeConversationRef: { current: activeConversation },
|
||||
setContacts: vi.fn(),
|
||||
setChannels: vi.fn(),
|
||||
addMessageIfNew: vi.fn(() => true),
|
||||
observeMessage: vi.fn(() => ({ added: true, activeConversation: true })),
|
||||
messageInputRef: { current: { appendText: vi.fn() } },
|
||||
...overrides,
|
||||
};
|
||||
@@ -85,7 +85,7 @@ describe('useConversationActions', () => {
|
||||
});
|
||||
|
||||
expect(mocks.api.sendChannelMessage).toHaveBeenCalledWith(publicChannel.key, sentMessage.text);
|
||||
expect(args.addMessageIfNew).toHaveBeenCalledWith(sentMessage);
|
||||
expect(args.observeMessage).toHaveBeenCalledWith(sentMessage);
|
||||
});
|
||||
|
||||
it('does not append a sent message after the active conversation changes', async () => {
|
||||
@@ -111,7 +111,7 @@ describe('useConversationActions', () => {
|
||||
await sendPromise;
|
||||
});
|
||||
|
||||
expect(args.addMessageIfNew).not.toHaveBeenCalled();
|
||||
expect(args.observeMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('appends sender mentions into the message input', () => {
|
||||
@@ -146,7 +146,7 @@ describe('useConversationActions', () => {
|
||||
});
|
||||
|
||||
expect(mocks.api.resendChannelMessage).toHaveBeenCalledWith(sentMessage.id, true);
|
||||
expect(args.addMessageIfNew).toHaveBeenCalledWith(resentMessage);
|
||||
expect(args.observeMessage).toHaveBeenCalledWith(resentMessage);
|
||||
});
|
||||
|
||||
it('does not append a byte-perfect resend locally', async () => {
|
||||
@@ -162,7 +162,7 @@ describe('useConversationActions', () => {
|
||||
await result.current.handleResendChannelMessage(sentMessage.id, false);
|
||||
});
|
||||
|
||||
expect(args.addMessageIfNew).not.toHaveBeenCalled();
|
||||
expect(args.observeMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('does not append a resend if the user has switched conversations', async () => {
|
||||
@@ -190,7 +190,7 @@ describe('useConversationActions', () => {
|
||||
await resendPromise;
|
||||
});
|
||||
|
||||
expect(args.addMessageIfNew).not.toHaveBeenCalled();
|
||||
expect(args.observeMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('merges returned contact data after path discovery', async () => {
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import { act, renderHook, waitFor } from '@testing-library/react';
|
||||
import { beforeEach, describe, expect, it, vi, type Mock } from 'vitest';
|
||||
|
||||
import * as messageCache from '../messageCache';
|
||||
import { api } from '../api';
|
||||
import { useConversationMessages } from '../hooks/useConversationMessages';
|
||||
import {
|
||||
conversationMessageCache,
|
||||
useConversationMessages,
|
||||
} from '../hooks/useConversationMessages';
|
||||
import type { Conversation, Message } from '../types';
|
||||
|
||||
const mockGetMessages = vi.fn<typeof api.getMessages>();
|
||||
@@ -62,7 +64,7 @@ function createDeferred<T>() {
|
||||
describe('useConversationMessages ACK ordering', () => {
|
||||
beforeEach(() => {
|
||||
mockGetMessages.mockReset();
|
||||
messageCache.clear();
|
||||
conversationMessageCache.clear();
|
||||
mockToastError.mockReset();
|
||||
});
|
||||
|
||||
@@ -76,11 +78,11 @@ describe('useConversationMessages ACK ordering', () => {
|
||||
|
||||
const paths = [{ path: 'A1B2', received_at: 1700000010 }];
|
||||
act(() => {
|
||||
result.current.updateMessageAck(42, 2, paths);
|
||||
result.current.receiveMessageAck(42, 2, paths);
|
||||
});
|
||||
|
||||
act(() => {
|
||||
const added = result.current.addMessageIfNew(
|
||||
const { added } = result.current.observeMessage(
|
||||
createMessage({ id: 42, acked: 0, paths: null })
|
||||
);
|
||||
expect(added).toBe(true);
|
||||
@@ -100,7 +102,7 @@ describe('useConversationMessages ACK ordering', () => {
|
||||
|
||||
const paths = [{ path: 'C3D4', received_at: 1700000011 }];
|
||||
act(() => {
|
||||
result.current.updateMessageAck(42, 1, paths);
|
||||
result.current.receiveMessageAck(42, 1, paths);
|
||||
});
|
||||
|
||||
deferred.resolve([createMessage({ id: 42, acked: 0, paths: null })]);
|
||||
@@ -118,7 +120,7 @@ describe('useConversationMessages ACK ordering', () => {
|
||||
await waitFor(() => expect(mockGetMessages).toHaveBeenCalledTimes(1));
|
||||
|
||||
act(() => {
|
||||
const added = result.current.addMessageIfNew(
|
||||
const { added } = result.current.observeMessage(
|
||||
createMessage({
|
||||
id: 99,
|
||||
text: 'ws-arrived',
|
||||
@@ -153,7 +155,7 @@ describe('useConversationMessages ACK ordering', () => {
|
||||
await waitFor(() => expect(result.current.messagesLoading).toBe(false));
|
||||
|
||||
act(() => {
|
||||
result.current.addMessageIfNew(createMessage({ id: 42, acked: 0, paths: null }));
|
||||
result.current.observeMessage(createMessage({ id: 42, acked: 0, paths: null }));
|
||||
});
|
||||
|
||||
const highAckPaths = [
|
||||
@@ -163,8 +165,8 @@ describe('useConversationMessages ACK ordering', () => {
|
||||
const staleAckPaths = [{ path: 'A1B2', received_at: 1700000010 }];
|
||||
|
||||
act(() => {
|
||||
result.current.updateMessageAck(42, 3, highAckPaths);
|
||||
result.current.updateMessageAck(42, 2, staleAckPaths);
|
||||
result.current.receiveMessageAck(42, 3, highAckPaths);
|
||||
result.current.receiveMessageAck(42, 2, staleAckPaths);
|
||||
});
|
||||
|
||||
expect(result.current.messages[0].acked).toBe(3);
|
||||
@@ -175,7 +177,7 @@ describe('useConversationMessages ACK ordering', () => {
|
||||
describe('useConversationMessages conversation switch', () => {
|
||||
beforeEach(() => {
|
||||
mockGetMessages.mockReset();
|
||||
messageCache.clear();
|
||||
conversationMessageCache.clear();
|
||||
});
|
||||
|
||||
it('resets loadingOlder when switching conversations mid-fetch', async () => {
|
||||
@@ -300,7 +302,7 @@ describe('useConversationMessages conversation switch', () => {
|
||||
describe('useConversationMessages background reconcile ordering', () => {
|
||||
beforeEach(() => {
|
||||
mockGetMessages.mockReset();
|
||||
messageCache.clear();
|
||||
conversationMessageCache.clear();
|
||||
});
|
||||
|
||||
it('ignores stale reconnect reconcile responses that finish after newer ones', async () => {
|
||||
@@ -321,8 +323,8 @@ describe('useConversationMessages background reconcile ordering', () => {
|
||||
.mockReturnValueOnce(secondReconcile.promise);
|
||||
|
||||
act(() => {
|
||||
result.current.triggerReconcile();
|
||||
result.current.triggerReconcile();
|
||||
result.current.reconcileOnReconnect();
|
||||
result.current.reconcileOnReconnect();
|
||||
});
|
||||
|
||||
secondReconcile.resolve([createMessage({ id: 42, text: 'newer snapshot', acked: 2 })]);
|
||||
@@ -342,11 +344,8 @@ describe('useConversationMessages background reconcile ordering', () => {
|
||||
const conv = createConversation();
|
||||
const cachedMessage = createMessage({ id: 42, text: 'cached snapshot' });
|
||||
|
||||
messageCache.set(conv.id, {
|
||||
conversationMessageCache.set(conv.id, {
|
||||
messages: [cachedMessage],
|
||||
seenContent: new Set([
|
||||
`PRIV-${cachedMessage.conversation_key}-${cachedMessage.text}-${cachedMessage.sender_timestamp}`,
|
||||
]),
|
||||
hasOlderMessages: true,
|
||||
});
|
||||
|
||||
@@ -365,7 +364,7 @@ describe('useConversationMessages background reconcile ordering', () => {
|
||||
describe('useConversationMessages older-page dedup and reentry', () => {
|
||||
beforeEach(() => {
|
||||
mockGetMessages.mockReset();
|
||||
messageCache.clear();
|
||||
conversationMessageCache.clear();
|
||||
});
|
||||
|
||||
it('prevents duplicate overlapping older-page fetches in the same tick', async () => {
|
||||
@@ -511,7 +510,7 @@ describe('useConversationMessages forward pagination', () => {
|
||||
beforeEach(() => {
|
||||
mockGetMessages.mockReset();
|
||||
mockGetMessagesAround.mockReset();
|
||||
messageCache.clear();
|
||||
conversationMessageCache.clear();
|
||||
mockToastError.mockReset();
|
||||
});
|
||||
|
||||
@@ -596,7 +595,7 @@ describe('useConversationMessages forward pagination', () => {
|
||||
|
||||
// Simulate WS adding a message with the same content key
|
||||
act(() => {
|
||||
result.current.addMessageIfNew(
|
||||
result.current.observeMessage(
|
||||
createMessage({
|
||||
id: 2,
|
||||
conversation_key: 'ch1',
|
||||
@@ -627,6 +626,70 @@ describe('useConversationMessages forward pagination', () => {
|
||||
expect(dupes).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('defers reconnect reconcile until forward pagination reaches the live tail', async () => {
|
||||
const conv: Conversation = { type: 'channel', id: 'ch1', name: 'Channel' };
|
||||
|
||||
mockGetMessagesAround.mockResolvedValueOnce({
|
||||
messages: [
|
||||
createMessage({
|
||||
id: 1,
|
||||
conversation_key: 'ch1',
|
||||
text: 'older-context',
|
||||
sender_timestamp: 1700000000,
|
||||
received_at: 1700000000,
|
||||
}),
|
||||
],
|
||||
has_older: false,
|
||||
has_newer: true,
|
||||
});
|
||||
|
||||
const { result } = renderHook(
|
||||
({ conv, target }: { conv: Conversation; target: number | null }) =>
|
||||
useConversationMessages(conv, target),
|
||||
{ initialProps: { conv, target: 1 } }
|
||||
);
|
||||
|
||||
await waitFor(() => expect(result.current.messagesLoading).toBe(false));
|
||||
expect(result.current.hasNewerMessages).toBe(true);
|
||||
|
||||
act(() => {
|
||||
result.current.reconcileOnReconnect();
|
||||
});
|
||||
|
||||
expect(mockGetMessages).not.toHaveBeenCalled();
|
||||
|
||||
mockGetMessages
|
||||
.mockResolvedValueOnce([
|
||||
createMessage({
|
||||
id: 2,
|
||||
conversation_key: 'ch1',
|
||||
text: 'newer-page',
|
||||
sender_timestamp: 1700000001,
|
||||
received_at: 1700000001,
|
||||
}),
|
||||
])
|
||||
.mockResolvedValueOnce([
|
||||
createMessage({
|
||||
id: 2,
|
||||
conversation_key: 'ch1',
|
||||
text: 'newer-page',
|
||||
sender_timestamp: 1700000001,
|
||||
received_at: 1700000001,
|
||||
acked: 3,
|
||||
}),
|
||||
]);
|
||||
|
||||
await act(async () => {
|
||||
await result.current.fetchNewerMessages();
|
||||
});
|
||||
|
||||
await waitFor(() => expect(mockGetMessages).toHaveBeenCalledTimes(2));
|
||||
await waitFor(() =>
|
||||
expect(result.current.messages.find((message) => message.id === 2)?.acked).toBe(3)
|
||||
);
|
||||
expect(result.current.hasNewerMessages).toBe(false);
|
||||
});
|
||||
|
||||
it('jumpToBottom clears hasNewerMessages and refetches latest', async () => {
|
||||
const conv: Conversation = { type: 'channel', id: 'ch1', name: 'Channel' };
|
||||
|
||||
@@ -677,6 +740,55 @@ describe('useConversationMessages forward pagination', () => {
|
||||
expect(result.current.messages[0].text).toBe('latest-msg');
|
||||
});
|
||||
|
||||
it('jumpToBottom clears deferred reconnect reconcile without an extra reconcile fetch', async () => {
|
||||
const conv: Conversation = { type: 'channel', id: 'ch1', name: 'Channel' };
|
||||
|
||||
mockGetMessagesAround.mockResolvedValueOnce({
|
||||
messages: [
|
||||
createMessage({
|
||||
id: 5,
|
||||
conversation_key: 'ch1',
|
||||
text: 'around-msg',
|
||||
sender_timestamp: 1700000005,
|
||||
received_at: 1700000005,
|
||||
}),
|
||||
],
|
||||
has_older: true,
|
||||
has_newer: true,
|
||||
});
|
||||
|
||||
const { result } = renderHook(
|
||||
({ conv, target }: { conv: Conversation; target: number | null }) =>
|
||||
useConversationMessages(conv, target),
|
||||
{ initialProps: { conv, target: 5 } }
|
||||
);
|
||||
|
||||
await waitFor(() => expect(result.current.messagesLoading).toBe(false));
|
||||
|
||||
act(() => {
|
||||
result.current.reconcileOnReconnect();
|
||||
});
|
||||
|
||||
mockGetMessages.mockResolvedValueOnce([
|
||||
createMessage({
|
||||
id: 10,
|
||||
conversation_key: 'ch1',
|
||||
text: 'latest-msg',
|
||||
sender_timestamp: 1700000010,
|
||||
received_at: 1700000010,
|
||||
}),
|
||||
]);
|
||||
|
||||
act(() => {
|
||||
result.current.jumpToBottom();
|
||||
});
|
||||
|
||||
await waitFor(() => expect(result.current.messagesLoading).toBe(false));
|
||||
await waitFor(() => expect(mockGetMessages).toHaveBeenCalledTimes(1));
|
||||
expect(result.current.messages[0].text).toBe('latest-msg');
|
||||
expect(result.current.hasNewerMessages).toBe(false);
|
||||
});
|
||||
|
||||
it('aborts stale newer-page requests on conversation switch without toasting', async () => {
|
||||
const convA: Conversation = { type: 'channel', id: 'ch1', name: 'Channel A' };
|
||||
const convB: Conversation = { type: 'channel', id: 'ch2', name: 'Channel B' };
|
||||
|
||||
@@ -5,7 +5,8 @@
|
||||
*/
|
||||
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import { getMessageContentKey, mergePendingAck } from '../hooks/useConversationMessages';
|
||||
import { mergePendingAck } from '../hooks/useConversationMessages';
|
||||
import { getMessageContentKey } from '../utils/messageIdentity';
|
||||
import type { Message } from '../types';
|
||||
|
||||
function createMessage(overrides: Partial<Message> = {}): Message {
|
||||
|
||||
@@ -12,12 +12,6 @@ const mocks = vi.hoisted(() => ({
|
||||
success: vi.fn(),
|
||||
error: vi.fn(),
|
||||
},
|
||||
messageCache: {
|
||||
addMessage: vi.fn(),
|
||||
remove: vi.fn(),
|
||||
rename: vi.fn(),
|
||||
updateAck: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock('../api', () => ({
|
||||
@@ -28,8 +22,6 @@ vi.mock('../components/ui/sonner', () => ({
|
||||
toast: mocks.toast,
|
||||
}));
|
||||
|
||||
vi.mock('../messageCache', () => mocks.messageCache);
|
||||
|
||||
const publicChannel: Channel = {
|
||||
key: '8B3387E9C5CDEA6AC9E5EDBAA115CD72',
|
||||
name: 'Public',
|
||||
@@ -66,7 +58,7 @@ function createRealtimeArgs(overrides: Partial<Parameters<typeof useRealtimeAppS
|
||||
setHealth,
|
||||
fetchConfig: vi.fn(),
|
||||
setRawPackets,
|
||||
triggerReconcile: vi.fn(),
|
||||
reconcileOnReconnect: vi.fn(),
|
||||
refreshUnreads: vi.fn(async () => {}),
|
||||
setChannels,
|
||||
fetchAllContacts: vi.fn(async () => [] as Contact[]),
|
||||
@@ -74,15 +66,15 @@ function createRealtimeArgs(overrides: Partial<Parameters<typeof useRealtimeAppS
|
||||
blockedKeysRef: { current: [] as string[] },
|
||||
blockedNamesRef: { current: [] as string[] },
|
||||
activeConversationRef: { current: null as Conversation | null },
|
||||
hasNewerMessagesRef: { current: false },
|
||||
addMessageIfNew: vi.fn(),
|
||||
trackNewMessage: vi.fn(),
|
||||
incrementUnread: vi.fn(),
|
||||
observeMessage: vi.fn(() => ({ added: false, activeConversation: false })),
|
||||
recordMessageEvent: vi.fn(),
|
||||
renameConversationState: vi.fn(),
|
||||
checkMention: vi.fn(() => false),
|
||||
pendingDeleteFallbackRef: { current: false },
|
||||
setActiveConversation: vi.fn(),
|
||||
updateMessageAck: vi.fn(),
|
||||
renameConversationMessages: vi.fn(),
|
||||
removeConversationMessages: vi.fn(),
|
||||
receiveMessageAck: vi.fn(),
|
||||
notifyIncomingMessage: vi.fn(),
|
||||
...overrides,
|
||||
},
|
||||
@@ -133,7 +125,7 @@ describe('useRealtimeAppState', () => {
|
||||
});
|
||||
|
||||
await waitFor(() => {
|
||||
expect(args.triggerReconcile).toHaveBeenCalledTimes(1);
|
||||
expect(args.reconcileOnReconnect).toHaveBeenCalledTimes(1);
|
||||
expect(args.refreshUnreads).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.api.getChannels).toHaveBeenCalledTimes(1);
|
||||
expect(args.fetchAllContacts).toHaveBeenCalledTimes(1);
|
||||
@@ -166,14 +158,6 @@ describe('useRealtimeAppState', () => {
|
||||
|
||||
const { args, fns } = createRealtimeArgs({
|
||||
fetchAllContacts: vi.fn(async () => contacts),
|
||||
activeConversationRef: {
|
||||
current: {
|
||||
type: 'channel',
|
||||
id: publicChannel.key,
|
||||
name: publicChannel.name,
|
||||
} satisfies Conversation,
|
||||
},
|
||||
hasNewerMessagesRef: { current: true },
|
||||
});
|
||||
|
||||
const { result } = renderHook(() => useRealtimeAppState(args));
|
||||
@@ -183,7 +167,7 @@ describe('useRealtimeAppState', () => {
|
||||
});
|
||||
|
||||
await waitFor(() => {
|
||||
expect(args.triggerReconcile).not.toHaveBeenCalled();
|
||||
expect(args.reconcileOnReconnect).toHaveBeenCalledTimes(1);
|
||||
expect(args.refreshUnreads).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.api.getChannels).toHaveBeenCalledTimes(1);
|
||||
expect(args.fetchAllContacts).toHaveBeenCalledTimes(1);
|
||||
@@ -194,9 +178,9 @@ describe('useRealtimeAppState', () => {
|
||||
});
|
||||
|
||||
it('tracks unread state for a new non-active incoming message', () => {
|
||||
mocks.messageCache.addMessage.mockReturnValue(true);
|
||||
const { args } = createRealtimeArgs({
|
||||
checkMention: vi.fn(() => true),
|
||||
observeMessage: vi.fn(() => ({ added: true, activeConversation: false })),
|
||||
});
|
||||
|
||||
const { result } = renderHook(() => useRealtimeAppState(args));
|
||||
@@ -205,17 +189,13 @@ describe('useRealtimeAppState', () => {
|
||||
result.current.onMessage?.(incomingDm);
|
||||
});
|
||||
|
||||
expect(args.addMessageIfNew).not.toHaveBeenCalled();
|
||||
expect(args.trackNewMessage).toHaveBeenCalledWith(incomingDm);
|
||||
expect(mocks.messageCache.addMessage).toHaveBeenCalledWith(
|
||||
incomingDm.conversation_key,
|
||||
incomingDm,
|
||||
expect.any(String)
|
||||
);
|
||||
expect(args.incrementUnread).toHaveBeenCalledWith(
|
||||
`contact-${incomingDm.conversation_key}`,
|
||||
true
|
||||
);
|
||||
expect(args.observeMessage).toHaveBeenCalledWith(incomingDm);
|
||||
expect(args.recordMessageEvent).toHaveBeenCalledWith({
|
||||
msg: incomingDm,
|
||||
activeConversation: false,
|
||||
isNewMessage: true,
|
||||
hasMention: true,
|
||||
});
|
||||
expect(args.notifyIncomingMessage).toHaveBeenCalledWith(incomingDm);
|
||||
});
|
||||
|
||||
@@ -240,7 +220,7 @@ describe('useRealtimeAppState', () => {
|
||||
});
|
||||
|
||||
expect(fns.setContacts).toHaveBeenCalledWith(expect.any(Function));
|
||||
expect(mocks.messageCache.remove).toHaveBeenCalledWith(incomingDm.conversation_key);
|
||||
expect(args.removeConversationMessages).toHaveBeenCalledWith(incomingDm.conversation_key);
|
||||
expect(args.setActiveConversation).toHaveBeenCalledWith(null);
|
||||
expect(pendingDeleteFallbackRef.current).toBe(true);
|
||||
});
|
||||
@@ -282,7 +262,7 @@ describe('useRealtimeAppState', () => {
|
||||
});
|
||||
|
||||
expect(fns.setContacts).toHaveBeenCalledWith(expect.any(Function));
|
||||
expect(mocks.messageCache.rename).toHaveBeenCalledWith(
|
||||
expect(args.renameConversationMessages).toHaveBeenCalledWith(
|
||||
previousPublicKey,
|
||||
resolvedContact.public_key
|
||||
);
|
||||
|
||||
@@ -10,7 +10,8 @@ import { act, renderHook } from '@testing-library/react';
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
|
||||
|
||||
import { useUnreadCounts } from '../hooks/useUnreadCounts';
|
||||
import type { Channel, Contact, Conversation } from '../types';
|
||||
import type { Channel, Contact, Conversation, Message } from '../types';
|
||||
import { getStateKey } from '../utils/conversationState';
|
||||
|
||||
// Mock api module
|
||||
vi.mock('../api', () => ({
|
||||
@@ -57,6 +58,25 @@ function makeContact(pubkey: string): Contact {
|
||||
};
|
||||
}
|
||||
|
||||
function makeMessage(overrides: Partial<Message> = {}): Message {
|
||||
return {
|
||||
id: 1,
|
||||
type: 'PRIV',
|
||||
conversation_key: CONTACT_KEY,
|
||||
text: 'hello',
|
||||
sender_timestamp: 1700000000,
|
||||
received_at: 1700000001,
|
||||
paths: null,
|
||||
txt_type: 0,
|
||||
signature: null,
|
||||
sender_key: null,
|
||||
outgoing: false,
|
||||
acked: 0,
|
||||
sender_name: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
const CHANNEL_KEY = 'AABB00112233445566778899AABBCCDD';
|
||||
const CONTACT_KEY = '00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff';
|
||||
|
||||
@@ -332,4 +352,74 @@ describe('useUnreadCounts', () => {
|
||||
// Raw view doesn't filter any conversation's unreads
|
||||
expect(result.current.unreadCounts[`channel-${CHANNEL_KEY}`]).toBe(5);
|
||||
});
|
||||
|
||||
it('recordMessageEvent updates last-message time and unread count for new inactive incoming messages', async () => {
|
||||
const mocks = await getMockedApi();
|
||||
const { result } = renderWith({});
|
||||
|
||||
await act(async () => {
|
||||
await vi.waitFor(() => expect(mocks.getUnreads).toHaveBeenCalled());
|
||||
});
|
||||
|
||||
const msg = makeMessage({
|
||||
id: 5,
|
||||
type: 'CHAN',
|
||||
conversation_key: CHANNEL_KEY,
|
||||
received_at: 1700001234,
|
||||
});
|
||||
|
||||
await act(async () => {
|
||||
result.current.recordMessageEvent({
|
||||
msg,
|
||||
activeConversation: false,
|
||||
isNewMessage: true,
|
||||
hasMention: true,
|
||||
});
|
||||
});
|
||||
|
||||
expect(result.current.unreadCounts[getStateKey('channel', CHANNEL_KEY)]).toBe(1);
|
||||
expect(result.current.mentions[getStateKey('channel', CHANNEL_KEY)]).toBe(true);
|
||||
expect(result.current.lastMessageTimes[getStateKey('channel', CHANNEL_KEY)]).toBe(1700001234);
|
||||
});
|
||||
|
||||
it('recordMessageEvent skips unread increment for active or non-new messages but still tracks time', async () => {
|
||||
const mocks = await getMockedApi();
|
||||
const { result } = renderWith({});
|
||||
|
||||
await act(async () => {
|
||||
await vi.waitFor(() => expect(mocks.getUnreads).toHaveBeenCalled());
|
||||
});
|
||||
|
||||
const activeMsg = makeMessage({
|
||||
id: 6,
|
||||
type: 'PRIV',
|
||||
conversation_key: CONTACT_KEY,
|
||||
received_at: 1700002000,
|
||||
});
|
||||
|
||||
await act(async () => {
|
||||
result.current.recordMessageEvent({
|
||||
msg: activeMsg,
|
||||
activeConversation: true,
|
||||
isNewMessage: true,
|
||||
hasMention: true,
|
||||
});
|
||||
result.current.recordMessageEvent({
|
||||
msg: makeMessage({
|
||||
id: 7,
|
||||
type: 'CHAN',
|
||||
conversation_key: CHANNEL_KEY,
|
||||
received_at: 1700002001,
|
||||
}),
|
||||
activeConversation: false,
|
||||
isNewMessage: false,
|
||||
hasMention: true,
|
||||
});
|
||||
});
|
||||
|
||||
expect(result.current.unreadCounts[getStateKey('contact', CONTACT_KEY)]).toBeUndefined();
|
||||
expect(result.current.unreadCounts[getStateKey('channel', CHANNEL_KEY)]).toBeUndefined();
|
||||
expect(result.current.lastMessageTimes[getStateKey('contact', CONTACT_KEY)]).toBe(1700002000);
|
||||
expect(result.current.lastMessageTimes[getStateKey('channel', CHANNEL_KEY)]).toBe(1700002001);
|
||||
});
|
||||
});
|
||||
|
||||
10
frontend/src/utils/messageIdentity.ts
Normal file
10
frontend/src/utils/messageIdentity.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
import type { Message } from '../types';
|
||||
|
||||
// Content identity matches the frontend's message-level dedup contract.
|
||||
export function getMessageContentKey(msg: Message): string {
|
||||
// When sender_timestamp exists, dedup by content (catches radio-path duplicates with different IDs).
|
||||
// When null, include msg.id so each message gets a unique key — avoids silently dropping
|
||||
// different messages that share the same text and received_at second.
|
||||
const ts = msg.sender_timestamp ?? `r${msg.received_at}-${msg.id}`;
|
||||
return `${msg.type}-${msg.conversation_key}-${msg.text}-${ts}`;
|
||||
}
|
||||
@@ -7,7 +7,7 @@ paths (packet_processor + event_handler fallback) don't double-store messages.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -410,7 +410,8 @@ class TestDualPathDedup:
|
||||
1. Primary: RX_LOG_DATA → packet_processor (decrypts with private key)
|
||||
2. Fallback: CONTACT_MSG_RECV → on_contact_message (MeshCore library decoded)
|
||||
|
||||
The fallback uses INSERT OR IGNORE to avoid double-storage when both fire.
|
||||
The fallback path should reconcile against the packet path instead of creating
|
||||
a second row, and should still add new path observations when available.
|
||||
"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -457,19 +458,7 @@ class TestDualPathDedup:
|
||||
"sender_timestamp": SENDER_TIMESTAMP,
|
||||
}
|
||||
|
||||
# Mock contact lookup to return a contact with the right key
|
||||
mock_contact = MagicMock()
|
||||
mock_contact.public_key = CONTACT_PUB
|
||||
mock_contact.type = 1 # Client, not repeater
|
||||
mock_contact.name = "TestContact"
|
||||
|
||||
with (
|
||||
patch("app.event_handlers.ContactRepository") as mock_contact_repo,
|
||||
patch("app.event_handlers.broadcast_event", mock_broadcast),
|
||||
):
|
||||
mock_contact_repo.get_by_key_or_prefix = AsyncMock(return_value=mock_contact)
|
||||
mock_contact_repo.update_last_contacted = AsyncMock()
|
||||
|
||||
with patch("app.event_handlers.broadcast_event", mock_broadcast):
|
||||
await on_contact_message(mock_event)
|
||||
|
||||
# No additional message broadcast should have been sent
|
||||
@@ -538,18 +527,7 @@ class TestDualPathDedup:
|
||||
"sender_timestamp": SENDER_TIMESTAMP,
|
||||
}
|
||||
|
||||
mock_contact = MagicMock()
|
||||
mock_contact.public_key = upper_key # Uppercase from DB
|
||||
mock_contact.type = 1
|
||||
mock_contact.name = "TestContact"
|
||||
|
||||
with (
|
||||
patch("app.event_handlers.ContactRepository") as mock_contact_repo,
|
||||
patch("app.event_handlers.broadcast_event", mock_broadcast),
|
||||
):
|
||||
mock_contact_repo.get_by_key_or_prefix = AsyncMock(return_value=mock_contact)
|
||||
mock_contact_repo.update_last_contacted = AsyncMock()
|
||||
|
||||
with patch("app.event_handlers.broadcast_event", mock_broadcast):
|
||||
await on_contact_message(mock_event)
|
||||
|
||||
# Should NOT create a second message (dedup catches it thanks to .lower())
|
||||
@@ -564,6 +542,146 @@ class TestDualPathDedup:
|
||||
)
|
||||
assert len(messages) == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_event_handler_duplicate_adds_path_to_existing_dm(
|
||||
self, test_db, captured_broadcasts
|
||||
):
|
||||
"""Fallback DM duplicates should reconcile path updates onto the stored message."""
|
||||
from app.event_handlers import on_contact_message
|
||||
from app.packet_processor import create_dm_message_from_decrypted
|
||||
|
||||
await ContactRepository.upsert(
|
||||
{
|
||||
"public_key": CONTACT_PUB.lower(),
|
||||
"name": "TestContact",
|
||||
"type": 1,
|
||||
"last_seen": SENDER_TIMESTAMP,
|
||||
"last_contacted": SENDER_TIMESTAMP,
|
||||
"first_seen": SENDER_TIMESTAMP,
|
||||
"on_radio": False,
|
||||
"out_path_hash_mode": 0,
|
||||
}
|
||||
)
|
||||
|
||||
pkt_id, _ = await RawPacketRepository.create(b"primary_with_no_path", SENDER_TIMESTAMP)
|
||||
decrypted = DecryptedDirectMessage(
|
||||
timestamp=SENDER_TIMESTAMP,
|
||||
flags=0,
|
||||
message="Dual path with route update",
|
||||
dest_hash="fa",
|
||||
src_hash="a1",
|
||||
)
|
||||
|
||||
broadcasts, mock_broadcast = captured_broadcasts
|
||||
|
||||
with patch("app.packet_processor.broadcast_event", mock_broadcast):
|
||||
msg_id = await create_dm_message_from_decrypted(
|
||||
packet_id=pkt_id,
|
||||
decrypted=decrypted,
|
||||
their_public_key=CONTACT_PUB,
|
||||
our_public_key=OUR_PUB,
|
||||
received_at=SENDER_TIMESTAMP,
|
||||
outgoing=False,
|
||||
)
|
||||
|
||||
assert msg_id is not None
|
||||
broadcasts.clear()
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_event.payload = {
|
||||
"public_key": CONTACT_PUB,
|
||||
"text": "Dual path with route update",
|
||||
"txt_type": 0,
|
||||
"sender_timestamp": SENDER_TIMESTAMP,
|
||||
"path": "bbcc",
|
||||
"path_len": 2,
|
||||
}
|
||||
|
||||
with patch("app.event_handlers.broadcast_event", mock_broadcast):
|
||||
await on_contact_message(mock_event)
|
||||
|
||||
message_broadcasts = [b for b in broadcasts if b["type"] == "message"]
|
||||
assert message_broadcasts == []
|
||||
|
||||
ack_broadcasts = [b for b in broadcasts if b["type"] == "message_acked"]
|
||||
assert len(ack_broadcasts) == 1
|
||||
assert ack_broadcasts[0]["data"]["message_id"] == msg_id
|
||||
assert ack_broadcasts[0]["data"]["ack_count"] == 0
|
||||
assert any(p["path"] == "bbcc" for p in ack_broadcasts[0]["data"]["paths"])
|
||||
|
||||
msg = await MessageRepository.get_by_id(msg_id)
|
||||
assert msg is not None
|
||||
assert msg.paths is not None
|
||||
assert any(p.path == "bbcc" for p in msg.paths)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_path_duplicate_reconciles_path_without_new_row(
|
||||
self, test_db, captured_broadcasts
|
||||
):
|
||||
"""Repeated fallback DMs should keep one row and merge path observations."""
|
||||
from app.event_handlers import on_contact_message
|
||||
|
||||
await ContactRepository.upsert(
|
||||
{
|
||||
"public_key": CONTACT_PUB.lower(),
|
||||
"name": "FallbackOnly",
|
||||
"type": 1,
|
||||
"last_seen": SENDER_TIMESTAMP,
|
||||
"last_contacted": SENDER_TIMESTAMP,
|
||||
"first_seen": SENDER_TIMESTAMP,
|
||||
"on_radio": False,
|
||||
"out_path_hash_mode": 0,
|
||||
}
|
||||
)
|
||||
|
||||
broadcasts, mock_broadcast = captured_broadcasts
|
||||
|
||||
first_event = MagicMock()
|
||||
first_event.payload = {
|
||||
"public_key": CONTACT_PUB,
|
||||
"text": "Fallback duplicate route test",
|
||||
"txt_type": 0,
|
||||
"sender_timestamp": SENDER_TIMESTAMP,
|
||||
}
|
||||
|
||||
with patch("app.event_handlers.broadcast_event", mock_broadcast):
|
||||
await on_contact_message(first_event)
|
||||
|
||||
messages = await MessageRepository.get_all(
|
||||
msg_type="PRIV", conversation_key=CONTACT_PUB.lower(), limit=10
|
||||
)
|
||||
assert len(messages) == 1
|
||||
msg_id = messages[0].id
|
||||
|
||||
broadcasts.clear()
|
||||
|
||||
second_event = MagicMock()
|
||||
second_event.payload = {
|
||||
"public_key": CONTACT_PUB,
|
||||
"text": "Fallback duplicate route test",
|
||||
"txt_type": 0,
|
||||
"sender_timestamp": SENDER_TIMESTAMP,
|
||||
"path": "ddee",
|
||||
"path_len": 2,
|
||||
}
|
||||
|
||||
with patch("app.event_handlers.broadcast_event", mock_broadcast):
|
||||
await on_contact_message(second_event)
|
||||
|
||||
messages = await MessageRepository.get_all(
|
||||
msg_type="PRIV", conversation_key=CONTACT_PUB.lower(), limit=10
|
||||
)
|
||||
assert len(messages) == 1
|
||||
|
||||
message_broadcasts = [b for b in broadcasts if b["type"] == "message"]
|
||||
assert message_broadcasts == []
|
||||
|
||||
ack_broadcasts = [b for b in broadcasts if b["type"] == "message_acked"]
|
||||
assert len(ack_broadcasts) == 1
|
||||
assert ack_broadcasts[0]["data"]["message_id"] == msg_id
|
||||
assert ack_broadcasts[0]["data"]["ack_count"] == 0
|
||||
assert any(p["path"] == "ddee" for p in ack_broadcasts[0]["data"]["paths"])
|
||||
|
||||
|
||||
class TestDirectMessageDirectionDetection:
|
||||
"""Test src_hash/dest_hash direction detection in _process_direct_message.
|
||||
|
||||
Reference in New Issue
Block a user