4 Commits

Author SHA1 Message Date
Jack Kingsman
cb8a69a11d Finish frontend phase 3 2026-03-16 17:30:15 -07:00
Jack Kingsman
3b63496996 Frontend optimization part 2 2026-03-16 17:12:01 -07:00
Jack Kingsman
a1a749283e Phase 1 of frontend fixup 2026-03-16 16:48:58 -07:00
Jack Kingsman
d373585527 Unify our DM ingest 2026-03-16 16:36:11 -07:00
25 changed files with 1260 additions and 778 deletions

View File

@@ -4,19 +4,21 @@ from typing import TYPE_CHECKING
from meshcore import EventType
from app.models import CONTACT_TYPE_REPEATER, Contact, ContactUpsert
from app.models import Contact, ContactUpsert
from app.packet_processor import process_raw_packet
from app.repository import (
AmbiguousPublicKeyPrefixError,
ContactRepository,
)
from app.services import dm_ack_tracker
from app.services.contact_reconciliation import (
claim_prefix_messages_for_contact,
promote_prefix_contacts_for_contact,
record_contact_name_and_reconcile,
)
from app.services.messages import create_fallback_direct_message, increment_ack_and_broadcast
from app.services.dm_ingest import (
ingest_fallback_direct_message,
resolve_fallback_direct_message_context,
)
from app.services.messages import increment_ack_and_broadcast
from app.websocket import broadcast_event
if TYPE_CHECKING:
@@ -51,8 +53,8 @@ async def on_contact_message(event: "Event") -> None:
2. The packet processor couldn't match the sender to a known contact
The packet processor handles: decryption, storage, broadcast, bot trigger.
This handler only stores if the packet processor didn't already handle it
(detected via INSERT OR IGNORE returning None for duplicates).
This handler adapts CONTACT_MSG_RECV payloads into the shared DM ingest
workflow, which reconciles duplicates against the packet pipeline when possible.
"""
payload = event.payload
@@ -66,54 +68,27 @@ async def on_contact_message(event: "Event") -> None:
sender_pubkey = payload.get("public_key") or payload.get("pubkey_prefix", "")
received_at = int(time.time())
# Look up contact from database - use prefix lookup only if needed
# (get_by_key_or_prefix does exact match first, then prefix fallback)
try:
contact = await ContactRepository.get_by_key_or_prefix(sender_pubkey)
except AmbiguousPublicKeyPrefixError:
logger.warning(
"DM sender prefix '%s' is ambiguous; storing under prefix until full key is known",
sender_pubkey,
context = await resolve_fallback_direct_message_context(
sender_public_key=sender_pubkey,
received_at=received_at,
broadcast_fn=broadcast_event,
contact_repository=ContactRepository,
log=logger,
)
if context.skip_storage:
logger.debug(
"Skipping message from repeater %s (not stored in chat history)",
context.conversation_key[:12],
)
contact = None
if contact:
sender_pubkey = contact.public_key.lower()
return
# Promote any prefix-stored messages to this full key
await claim_prefix_messages_for_contact(public_key=sender_pubkey, log=logger)
# Skip messages from repeaters - they only send CLI responses, not chat messages.
# CLI responses are handled by the command endpoint and txt_type filter above.
if contact.type == CONTACT_TYPE_REPEATER:
logger.debug(
"Skipping message from repeater %s (not stored in chat history)",
sender_pubkey[:12],
)
return
elif sender_pubkey:
placeholder_upsert = ContactUpsert(
public_key=sender_pubkey.lower(),
type=0,
last_seen=received_at,
last_contacted=received_at,
first_seen=received_at,
on_radio=False,
out_path_hash_mode=-1,
)
await ContactRepository.upsert(placeholder_upsert)
contact = await ContactRepository.get_by_key(sender_pubkey.lower())
if contact:
broadcast_event("contact", contact.model_dump())
# Try to create message - INSERT OR IGNORE handles duplicates atomically
# If the packet processor already stored this message, this returns None
# Try to create or reconcile the message via the shared DM ingest service.
ts = payload.get("sender_timestamp")
sender_timestamp = ts if ts is not None else received_at
sender_name = contact.name if contact else None
path = payload.get("path")
path_len = payload.get("path_len")
message = await create_fallback_direct_message(
conversation_key=sender_pubkey,
message = await ingest_fallback_direct_message(
conversation_key=context.conversation_key,
text=payload.get("text", ""),
sender_timestamp=sender_timestamp,
received_at=received_at,
@@ -121,23 +96,24 @@ async def on_contact_message(event: "Event") -> None:
path_len=path_len,
txt_type=txt_type,
signature=payload.get("signature"),
sender_name=sender_name,
sender_key=sender_pubkey,
sender_name=context.sender_name,
sender_key=context.sender_key,
broadcast_fn=broadcast_event,
update_last_contacted_key=context.contact.public_key.lower() if context.contact else None,
)
if message is None:
# Already handled by packet processor (or exact duplicate) - nothing more to do
logger.debug("DM from %s already processed by packet processor", sender_pubkey[:12])
logger.debug(
"DM from %s already processed by packet processor", context.conversation_key[:12]
)
return
# If we get here, the packet processor didn't handle this message
# (likely because private key export is not available)
logger.debug("DM from %s handled by event handler (fallback path)", sender_pubkey[:12])
# Update contact last_contacted (contact was already fetched above)
if contact:
await ContactRepository.update_last_contacted(sender_pubkey, received_at)
logger.debug(
"DM from %s handled by event handler (fallback path)", context.conversation_key[:12]
)
async def on_rx_log_data(event: "Event") -> None:

320
app/services/dm_ingest.py Normal file
View File

@@ -0,0 +1,320 @@
import asyncio
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from app.models import CONTACT_TYPE_REPEATER, Contact, ContactUpsert, Message
from app.repository import (
AmbiguousPublicKeyPrefixError,
ContactRepository,
MessageRepository,
RawPacketRepository,
)
from app.services.contact_reconciliation import claim_prefix_messages_for_contact
from app.services.messages import (
broadcast_message,
build_message_model,
build_message_paths,
format_contact_log_target,
handle_duplicate_message,
reconcile_duplicate_message,
truncate_for_log,
)
if TYPE_CHECKING:
from app.decoder import DecryptedDirectMessage
logger = logging.getLogger(__name__)
BroadcastFn = Callable[..., Any]
_decrypted_dm_store_lock = asyncio.Lock()
@dataclass(frozen=True)
class FallbackDirectMessageContext:
    """Resolved routing context for a CONTACT_MSG_RECV fallback direct message."""

    # Lowercased public key (or bare prefix) the message is stored under.
    conversation_key: str
    # Matched contact record, or None when the sender is unknown or ambiguous.
    contact: Contact | None
    # Display name recorded on the stored message, when a contact was resolved.
    sender_name: str | None
    # Sender key recorded on the stored message; None when no key is known.
    sender_key: str | None
    # True when the message must not be stored (repeater CLI traffic).
    skip_storage: bool = False
async def _prepare_resolved_contact(
    contact: Contact,
    *,
    log: logging.Logger | None = None,
) -> tuple[str, bool]:
    """Normalize a resolved contact before storing a direct message.

    Claims any prefix-stored history under the contact's full key, then
    reports whether storage should be skipped entirely (repeaters only send
    CLI responses, which never belong in chat history).

    Returns:
        Tuple of (lowercased conversation key, skip-storage flag).
    """
    full_key = contact.public_key.lower()
    await claim_prefix_messages_for_contact(public_key=full_key, log=log or logger)
    is_repeater = contact.type == CONTACT_TYPE_REPEATER
    return full_key, is_repeater
async def resolve_fallback_direct_message_context(
    *,
    sender_public_key: str,
    received_at: int,
    broadcast_fn: BroadcastFn,
    contact_repository=ContactRepository,
    log: logging.Logger | None = None,
) -> FallbackDirectMessageContext:
    """Work out who a fallback DM came from and how it should be stored.

    Looks the sender up by exact key or prefix. On a hit, prefix-stored
    history is claimed and repeater traffic is flagged for skipping. On a
    miss (including an ambiguous prefix), a minimal placeholder contact is
    upserted when a key is present so the conversation has a contact row,
    and the new contact is broadcast to connected clients.
    """
    active_log = log or logger
    normalized_sender = sender_public_key.lower()

    try:
        match = await contact_repository.get_by_key_or_prefix(normalized_sender)
    except AmbiguousPublicKeyPrefixError:
        # Multiple contacts share this prefix; keep messages under the prefix.
        active_log.warning(
            "DM sender prefix '%s' is ambiguous; storing under prefix until full key is known",
            sender_public_key,
        )
        match = None

    if match is not None:
        full_key, skip = await _prepare_resolved_contact(match, log=log)
        return FallbackDirectMessageContext(
            conversation_key=full_key,
            contact=match,
            sender_name=match.name,
            sender_key=full_key,
            skip_storage=skip,
        )

    if normalized_sender:
        # Unknown sender with a usable key: create a placeholder contact so
        # the stored message has something to attach to.
        await contact_repository.upsert(
            ContactUpsert(
                public_key=normalized_sender,
                type=0,
                last_seen=received_at,
                last_contacted=received_at,
                first_seen=received_at,
                on_radio=False,
                out_path_hash_mode=-1,
            )
        )
        match = await contact_repository.get_by_key(normalized_sender)
        if match is not None:
            broadcast_fn("contact", match.model_dump())

    return FallbackDirectMessageContext(
        conversation_key=normalized_sender,
        contact=match,
        sender_name=match.name if match else None,
        sender_key=normalized_sender or None,
    )
async def _store_direct_message(
    *,
    packet_id: int | None,
    conversation_key: str,
    text: str,
    sender_timestamp: int,
    received_at: int,
    path: str | None,
    path_len: int | None,
    outgoing: bool,
    txt_type: int,
    signature: str | None,
    sender_name: str | None,
    sender_key: str | None,
    realtime: bool,
    broadcast_fn: BroadcastFn,
    update_last_contacted_key: str | None,
    best_effort_content_dedup: bool,
    linked_packet_dedup: bool,
    message_repository=MessageRepository,
    contact_repository=ContactRepository,
    raw_packet_repository=RawPacketRepository,
) -> Message | None:
    """Store, broadcast, and dedup one direct message; return it, or None on duplicate.

    Shared storage core for both the decrypted-packet path and the
    CONTACT_MSG_RECV fallback path. Dedup layers, applied in order:

    1. ``linked_packet_dedup`` — if the raw packet is already linked to a
       stored message, reconcile path/ack info onto that message instead.
    2. ``best_effort_content_dedup`` — match an existing message by
       (type, conversation, text, sender_timestamp) and reconcile onto it.
    3. ``message_repository.create`` returning None (unique-constraint hit)
       falls through to ``handle_duplicate_message``.

    Repositories are injectable keyword defaults for testing. When
    ``update_last_contacted_key`` is set, the contact's last_contacted is
    bumped after a successful store.
    """
    async def store() -> Message | None:
        # Layer 1: the packet pipeline may already have stored this message.
        if linked_packet_dedup and packet_id is not None:
            linked_message_id = await raw_packet_repository.get_linked_message_id(packet_id)
            if linked_message_id is not None:
                existing_msg = await message_repository.get_by_id(linked_message_id)
                if existing_msg is not None:
                    await reconcile_duplicate_message(
                        existing_msg=existing_msg,
                        packet_id=packet_id,
                        path=path,
                        received_at=received_at,
                        path_len=path_len,
                        broadcast_fn=broadcast_fn,
                    )
                    return None
        # Layer 2: best-effort content match (outgoing echoes, fallback DMs).
        if best_effort_content_dedup:
            existing_msg = await message_repository.get_by_content(
                msg_type="PRIV",
                conversation_key=conversation_key,
                text=text,
                sender_timestamp=sender_timestamp,
            )
            if existing_msg is not None:
                await reconcile_duplicate_message(
                    existing_msg=existing_msg,
                    packet_id=packet_id,
                    path=path,
                    received_at=received_at,
                    path_len=path_len,
                    broadcast_fn=broadcast_fn,
                )
                return None
        msg_id = await message_repository.create(
            msg_type="PRIV",
            text=text,
            conversation_key=conversation_key,
            sender_timestamp=sender_timestamp,
            received_at=received_at,
            path=path,
            path_len=path_len,
            txt_type=txt_type,
            signature=signature,
            outgoing=outgoing,
            sender_key=sender_key,
            sender_name=sender_name,
        )
        if msg_id is None:
            # Layer 3: unique-constraint duplicate — merge into the stored copy.
            await handle_duplicate_message(
                packet_id=packet_id,
                msg_type="PRIV",
                conversation_key=conversation_key,
                text=text,
                sender_timestamp=sender_timestamp,
                path=path,
                received_at=received_at,
                path_len=path_len,
                broadcast_fn=broadcast_fn,
            )
            return None
        if packet_id is not None:
            # Link the raw packet to the stored message so layer 1 catches
            # any re-processing of the same packet.
            await raw_packet_repository.mark_decrypted(packet_id, msg_id)
        message = build_message_model(
            message_id=msg_id,
            msg_type="PRIV",
            conversation_key=conversation_key,
            text=text,
            sender_timestamp=sender_timestamp,
            received_at=received_at,
            paths=build_message_paths(path, received_at, path_len),
            txt_type=txt_type,
            signature=signature,
            sender_key=sender_key,
            outgoing=outgoing,
            sender_name=sender_name,
        )
        broadcast_message(message=message, broadcast_fn=broadcast_fn, realtime=realtime)
        if update_last_contacted_key:
            await contact_repository.update_last_contacted(update_last_contacted_key, received_at)
        return message

    if linked_packet_dedup:
        # Serialize packet-linked stores so concurrent decrypts of the same
        # packet cannot both pass the linked-message check before either links.
        async with _decrypted_dm_store_lock:
            return await store()
    return await store()
async def ingest_decrypted_direct_message(
    *,
    packet_id: int,
    decrypted: "DecryptedDirectMessage",
    their_public_key: str,
    received_at: int | None = None,
    path: str | None = None,
    path_len: int | None = None,
    outgoing: bool = False,
    realtime: bool = True,
    broadcast_fn: BroadcastFn,
    contact_repository=ContactRepository,
) -> Message | None:
    """Store a direct message decrypted from a raw packet.

    Resolves the peer contact (skipping repeater CLI traffic), then runs the
    shared storage core with packet-linked dedup enabled. Content-based dedup
    is applied only to outgoing echoes. Returns the stored Message, or None
    when the message was skipped or absorbed by an existing one.
    """
    convo_key = their_public_key.lower()
    peer = await contact_repository.get_by_key(convo_key)
    display_name: str | None = None
    if peer is not None:
        convo_key, skip = await _prepare_resolved_contact(peer, log=logger)
        if skip:
            # Repeaters only emit CLI responses; those never enter chat history.
            logger.debug(
                "Skipping message from repeater %s (CLI responses not stored): %s",
                convo_key[:12],
                (decrypted.message or "")[:50],
            )
            return None
        if not outgoing:
            display_name = peer.name
    received = received_at or int(time.time())
    stored = await _store_direct_message(
        packet_id=packet_id,
        conversation_key=convo_key,
        text=decrypted.message,
        sender_timestamp=decrypted.timestamp,
        received_at=received,
        path=path,
        path_len=path_len,
        outgoing=outgoing,
        txt_type=0,
        signature=None,
        sender_name=display_name,
        sender_key=convo_key if not outgoing else None,
        realtime=realtime,
        broadcast_fn=broadcast_fn,
        update_last_contacted_key=convo_key,
        best_effort_content_dedup=outgoing,
        linked_packet_dedup=True,
    )
    if stored is not None:
        logger.info(
            'Stored direct message "%s" for %r (msg ID %d in contact ID %s, outgoing=%s)',
            truncate_for_log(decrypted.message),
            format_contact_log_target(peer.name if peer else None, convo_key),
            stored.id,
            convo_key,
            outgoing,
        )
    return stored
async def ingest_fallback_direct_message(
    *,
    conversation_key: str,
    text: str,
    sender_timestamp: int,
    received_at: int,
    path: str | None,
    path_len: int | None,
    txt_type: int,
    signature: str | None,
    sender_name: str | None,
    sender_key: str | None,
    broadcast_fn: BroadcastFn,
    update_last_contacted_key: str | None = None,
) -> Message | None:
    """Store a CONTACT_MSG_RECV fallback direct message via the shared core.

    Fallback DMs never carry a raw packet, so packet-linked dedup is disabled
    and content-based dedup is always on; the message is treated as incoming
    and broadcast in realtime. Returns None when an existing message absorbed
    this one.
    """
    # Fixed knobs for the fallback path first; everything else forwarded verbatim.
    return await _store_direct_message(
        packet_id=None,
        outgoing=False,
        realtime=True,
        best_effort_content_dedup=True,
        linked_packet_dedup=False,
        conversation_key=conversation_key,
        text=text,
        sender_timestamp=sender_timestamp,
        received_at=received_at,
        path=path,
        path_len=path_len,
        txt_type=txt_type,
        signature=signature,
        sender_name=sender_name,
        sender_key=sender_key,
        broadcast_fn=broadcast_fn,
        update_last_contacted_key=update_last_contacted_key,
    )

View File

@@ -1,10 +1,9 @@
import asyncio
import logging
import time
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
from app.models import CONTACT_TYPE_REPEATER, Message, MessagePath
from app.models import Message, MessagePath
from app.repository import ContactRepository, MessageRepository, RawPacketRepository
if TYPE_CHECKING:
@@ -14,10 +13,9 @@ logger = logging.getLogger(__name__)
BroadcastFn = Callable[..., Any]
LOG_MESSAGE_PREVIEW_LEN = 32
_decrypted_dm_store_lock = asyncio.Lock()
def _truncate_for_log(text: str, max_chars: int = LOG_MESSAGE_PREVIEW_LEN) -> str:
def truncate_for_log(text: str, max_chars: int = LOG_MESSAGE_PREVIEW_LEN) -> str:
"""Return a compact single-line message preview for log output."""
normalized = " ".join(text.split())
if len(normalized) <= max_chars:
@@ -30,7 +28,7 @@ def _format_channel_log_target(channel_name: str | None, channel_key: str) -> st
return channel_name or channel_key
def _format_contact_log_target(contact_name: str | None, public_key: str) -> str:
def format_contact_log_target(contact_name: str | None, public_key: str) -> str:
"""Return a human-friendly DM target label for logs."""
return contact_name or public_key[:12]
@@ -127,7 +125,7 @@ async def increment_ack_and_broadcast(
return ack_count
async def _reconcile_duplicate_message(
async def reconcile_duplicate_message(
*,
existing_msg: Message,
packet_id: int | None,
@@ -194,7 +192,7 @@ async def handle_duplicate_message(
)
return
await _reconcile_duplicate_message(
await reconcile_duplicate_message(
existing_msg=existing_msg,
packet_id=packet_id,
path=path,
@@ -257,7 +255,7 @@ async def create_message_from_decrypted(
logger.info(
'Stored channel message "%s" for %r (msg ID %d in chan ID %s)',
_truncate_for_log(text),
truncate_for_log(text),
_format_channel_log_target(channel_name, channel_key_normalized),
msg_id,
channel_key_normalized,
@@ -298,165 +296,20 @@ async def create_dm_message_from_decrypted(
broadcast_fn: BroadcastFn,
) -> int | None:
"""Store and broadcast a decrypted direct message."""
contact = await ContactRepository.get_by_key(their_public_key)
if contact and contact.type == CONTACT_TYPE_REPEATER:
logger.debug(
"Skipping message from repeater %s (CLI responses not stored): %s",
their_public_key[:12],
(decrypted.message or "")[:50],
)
return None
from app.services.dm_ingest import ingest_decrypted_direct_message
received = received_at or int(time.time())
conversation_key = their_public_key.lower()
sender_name = contact.name if contact and not outgoing else None
async with _decrypted_dm_store_lock:
linked_message_id = await RawPacketRepository.get_linked_message_id(packet_id)
if linked_message_id is not None:
existing_msg = await MessageRepository.get_by_id(linked_message_id)
if existing_msg is not None:
await _reconcile_duplicate_message(
existing_msg=existing_msg,
packet_id=packet_id,
path=path,
received_at=received,
path_len=path_len,
broadcast_fn=broadcast_fn,
)
return None
if outgoing:
existing_msg = await MessageRepository.get_by_content(
msg_type="PRIV",
conversation_key=conversation_key,
text=decrypted.message,
sender_timestamp=decrypted.timestamp,
)
if existing_msg is not None:
await _reconcile_duplicate_message(
existing_msg=existing_msg,
packet_id=packet_id,
path=path,
received_at=received,
path_len=path_len,
broadcast_fn=broadcast_fn,
)
return None
msg_id = await MessageRepository.create(
msg_type="PRIV",
text=decrypted.message,
conversation_key=conversation_key,
sender_timestamp=decrypted.timestamp,
received_at=received,
path=path,
path_len=path_len,
outgoing=outgoing,
sender_key=conversation_key if not outgoing else None,
sender_name=sender_name,
)
if msg_id is None:
await handle_duplicate_message(
packet_id=packet_id,
msg_type="PRIV",
conversation_key=conversation_key,
text=decrypted.message,
sender_timestamp=decrypted.timestamp,
path=path,
received_at=received,
path_len=path_len,
broadcast_fn=broadcast_fn,
)
return None
logger.info(
'Stored direct message "%s" for %r (msg ID %d in contact ID %s, outgoing=%s)',
_truncate_for_log(decrypted.message),
_format_contact_log_target(contact.name if contact else None, conversation_key),
msg_id,
conversation_key,
outgoing,
)
await RawPacketRepository.mark_decrypted(packet_id, msg_id)
broadcast_message(
message=build_message_model(
message_id=msg_id,
msg_type="PRIV",
conversation_key=conversation_key,
text=decrypted.message,
sender_timestamp=decrypted.timestamp,
received_at=received,
paths=build_message_paths(path, received, path_len),
outgoing=outgoing,
sender_name=sender_name,
sender_key=conversation_key if not outgoing else None,
),
broadcast_fn=broadcast_fn,
realtime=realtime,
)
await ContactRepository.update_last_contacted(conversation_key, received)
return msg_id
async def create_fallback_direct_message(
*,
conversation_key: str,
text: str,
sender_timestamp: int,
received_at: int,
path: str | None,
path_len: int | None,
txt_type: int,
signature: str | None,
sender_name: str | None,
sender_key: str | None,
broadcast_fn: BroadcastFn,
message_repository=MessageRepository,
) -> Message | None:
"""Store and broadcast a CONTACT_MSG_RECV fallback direct message."""
existing = await message_repository.get_by_content(
msg_type="PRIV",
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
)
if existing is not None:
return None
msg_id = await message_repository.create(
msg_type="PRIV",
text=text,
conversation_key=conversation_key,
sender_timestamp=sender_timestamp,
message = await ingest_decrypted_direct_message(
packet_id=packet_id,
decrypted=decrypted,
their_public_key=their_public_key,
received_at=received_at,
path=path,
path_len=path_len,
txt_type=txt_type,
signature=signature,
sender_key=sender_key,
sender_name=sender_name,
outgoing=outgoing,
realtime=realtime,
broadcast_fn=broadcast_fn,
)
if msg_id is None:
return None
message = build_message_model(
message_id=msg_id,
msg_type="PRIV",
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
received_at=received_at,
paths=build_message_paths(path, received_at, path_len),
txt_type=txt_type,
signature=signature,
sender_key=sender_key,
sender_name=sender_name,
)
broadcast_message(message=message, broadcast_fn=broadcast_fn)
return message
return message.id if message is not None else None
async def create_fallback_channel_message(

View File

@@ -1,6 +1,5 @@
import { useEffect, useCallback, useRef, useState } from 'react';
import { api } from './api';
import * as messageCache from './messageCache';
import { takePrefetchOrFetch } from './prefetch';
import { useWebSocket } from './useWebSocket';
import {
@@ -109,6 +108,7 @@ export function App() {
// useConversationRouter, but useConversationRouter needs channels/contacts from
// useContactsAndChannels. We break the cycle with a ref-based indirection.
const setActiveConversationRef = useRef<(conv: Conversation | null) => void>(() => {});
const removeConversationMessagesRef = useRef<(conversationId: string) => void>(() => {});
// --- Extracted hooks ---
@@ -180,6 +180,8 @@ export function App() {
setActiveConversation: (conv) => setActiveConversationRef.current(conv),
pendingDeleteFallbackRef,
hasSetDefaultConversation,
removeConversationMessages: (conversationId) =>
removeConversationMessagesRef.current(conversationId),
});
// useConversationRouter is called second — it receives channels/contacts as inputs
@@ -228,25 +230,27 @@ export function App() {
hasOlderMessages,
hasNewerMessages,
loadingNewer,
hasNewerMessagesRef,
fetchOlderMessages,
fetchNewerMessages,
jumpToBottom,
reloadCurrentConversation,
addMessageIfNew,
updateMessageAck,
triggerReconcile,
observeMessage,
receiveMessageAck,
reconcileOnReconnect,
renameConversationMessages,
removeConversationMessages,
clearConversationMessages,
} = useConversationMessages(activeConversation, targetMessageId);
removeConversationMessagesRef.current = removeConversationMessages;
const {
unreadCounts,
mentions,
lastMessageTimes,
unreadLastReadAts,
incrementUnread,
recordMessageEvent,
renameConversationState,
markAllRead,
trackNewMessage,
refreshUnreads,
} = useUnreadCounts(channels, contacts, activeConversation);
@@ -308,7 +312,7 @@ export function App() {
setHealth,
fetchConfig,
setRawPackets,
triggerReconcile,
reconcileOnReconnect,
refreshUnreads,
setChannels,
fetchAllContacts,
@@ -316,23 +320,23 @@ export function App() {
blockedKeysRef,
blockedNamesRef,
activeConversationRef,
hasNewerMessagesRef,
addMessageIfNew,
trackNewMessage,
incrementUnread,
observeMessage,
recordMessageEvent,
renameConversationState,
checkMention,
pendingDeleteFallbackRef,
setActiveConversation,
updateMessageAck,
renameConversationMessages,
removeConversationMessages,
receiveMessageAck,
notifyIncomingMessage,
});
const handleVisibilityPolicyChanged = useCallback(() => {
messageCache.clear();
clearConversationMessages();
reloadCurrentConversation();
void refreshUnreads();
setVisibilityVersion((current) => current + 1);
}, [refreshUnreads, reloadCurrentConversation]);
}, [clearConversationMessages, refreshUnreads, reloadCurrentConversation]);
const handleBlockKey = useCallback(
async (key: string) => {
@@ -361,7 +365,7 @@ export function App() {
activeConversationRef,
setContacts,
setChannels,
addMessageIfNew,
observeMessage,
messageInputRef,
});
const handleCreateCrackedChannel = useCallback(

View File

@@ -329,7 +329,7 @@ export function MessageList({
}, [messages, onResendChannelMessage]);
// Sort messages by received_at ascending (oldest first)
// Note: Deduplication is handled by useConversationMessages.addMessageIfNew()
// Note: Deduplication is handled by useConversationMessages.observeMessage()
// and the database UNIQUE constraint on (type, conversation_key, text, sender_timestamp)
const sortedMessages = useMemo(
() => [...messages].sort((a, b) => a.received_at - b.received_at || a.id - b.id),

View File

@@ -1,5 +1,5 @@
export { useUnreadCounts } from './useUnreadCounts';
export { useConversationMessages, getMessageContentKey } from './useConversationMessages';
export { useConversationMessages } from './useConversationMessages';
export { useRadioControl } from './useRadioControl';
export { useRepeaterDashboard } from './useRepeaterDashboard';
export { useAppShell } from './useAppShell';

View File

@@ -2,7 +2,6 @@ import { useState, useCallback, type MutableRefObject } from 'react';
import { api } from '../api';
import { takePrefetchOrFetch } from '../prefetch';
import { toast } from '../components/ui/sonner';
import * as messageCache from '../messageCache';
import { getContactDisplayName } from '../utils/pubkey';
import { findPublicChannel, PUBLIC_CHANNEL_KEY, PUBLIC_CHANNEL_NAME } from '../utils/publicChannel';
import type { Channel, Contact, Conversation } from '../types';
@@ -11,12 +10,14 @@ interface UseContactsAndChannelsArgs {
setActiveConversation: (conv: Conversation | null) => void;
pendingDeleteFallbackRef: MutableRefObject<boolean>;
hasSetDefaultConversation: MutableRefObject<boolean>;
removeConversationMessages: (conversationId: string) => void;
}
export function useContactsAndChannels({
setActiveConversation,
pendingDeleteFallbackRef,
hasSetDefaultConversation,
removeConversationMessages,
}: UseContactsAndChannelsArgs) {
const [contacts, setContacts] = useState<Contact[]>([]);
const [contactsLoaded, setContactsLoaded] = useState(false);
@@ -117,7 +118,7 @@ export function useContactsAndChannels({
try {
pendingDeleteFallbackRef.current = true;
await api.deleteChannel(key);
messageCache.remove(key);
removeConversationMessages(key);
const refreshedChannels = await api.getChannels();
setChannels(refreshedChannels);
const publicChannel = findPublicChannel(refreshedChannels);
@@ -135,7 +136,12 @@ export function useContactsAndChannels({
});
}
},
[setActiveConversation, pendingDeleteFallbackRef, hasSetDefaultConversation]
[
hasSetDefaultConversation,
pendingDeleteFallbackRef,
removeConversationMessages,
setActiveConversation,
]
);
const handleDeleteContact = useCallback(
@@ -144,7 +150,7 @@ export function useContactsAndChannels({
try {
pendingDeleteFallbackRef.current = true;
await api.deleteContact(publicKey);
messageCache.remove(publicKey);
removeConversationMessages(publicKey);
setContacts((prev) => prev.filter((c) => c.public_key !== publicKey));
const refreshedChannels = await api.getChannels();
setChannels(refreshedChannels);
@@ -163,7 +169,12 @@ export function useContactsAndChannels({
});
}
},
[setActiveConversation, pendingDeleteFallbackRef, hasSetDefaultConversation]
[
hasSetDefaultConversation,
pendingDeleteFallbackRef,
removeConversationMessages,
setActiveConversation,
]
);
return {

View File

@@ -10,7 +10,7 @@ interface UseConversationActionsArgs {
activeConversationRef: MutableRefObject<Conversation | null>;
setContacts: React.Dispatch<React.SetStateAction<Contact[]>>;
setChannels: React.Dispatch<React.SetStateAction<Channel[]>>;
addMessageIfNew: (msg: Message) => boolean;
observeMessage: (msg: Message) => { added: boolean; activeConversation: boolean };
messageInputRef: RefObject<MessageInputHandle | null>;
}
@@ -31,7 +31,7 @@ export function useConversationActions({
activeConversationRef,
setContacts,
setChannels,
addMessageIfNew,
observeMessage,
messageInputRef,
}: UseConversationActionsArgs): UseConversationActionsResult {
const mergeChannelIntoList = useCallback(
@@ -60,10 +60,10 @@ export function useConversationActions({
: await api.sendDirectMessage(activeConversation.id, text);
if (activeConversationRef.current?.id === conversationId) {
addMessageIfNew(sent);
observeMessage(sent);
}
},
[activeConversation, activeConversationRef, addMessageIfNew]
[activeConversation, activeConversationRef, observeMessage]
);
const handleResendChannelMessage = useCallback(
@@ -77,7 +77,7 @@ export function useConversationActions({
activeConversationRef.current?.type === 'channel' &&
activeConversationRef.current.id === resentMessage.conversation_key
) {
addMessageIfNew(resentMessage);
observeMessage(resentMessage);
}
toast.success(newTimestamp ? 'Message resent with new timestamp' : 'Message resent');
} catch (err) {
@@ -86,7 +86,7 @@ export function useConversationActions({
});
}
},
[activeConversationRef, addMessageIfNew]
[activeConversationRef, observeMessage]
);
const handleSetChannelFloodScopeOverride = useCallback(

View File

@@ -1,19 +1,174 @@
import {
useCallback,
useEffect,
useRef,
useState,
type Dispatch,
type MutableRefObject,
type SetStateAction,
} from 'react';
import { useCallback, useEffect, useRef, useState } from 'react';
import { toast } from '../components/ui/sonner';
import { api, isAbortError } from '../api';
import * as messageCache from '../messageCache';
import type { Conversation, Message, MessagePath } from '../types';
import { getMessageContentKey } from '../utils/messageIdentity';
// Presumably bounds the pending-ack queue below — confirm against usage.
const MAX_PENDING_ACKS = 500;
// Page size used when fetching message history from the API.
const MESSAGE_PAGE_SIZE = 200;
// LRU capacity: how many conversations keep their messages cached in memory.
export const MAX_CACHED_CONVERSATIONS = 20;
// Per-conversation cap on cached messages; older ones are trimmed away.
export const MAX_MESSAGES_PER_ENTRY = 200;

// Shape handed out to cache consumers.
interface CachedConversationEntry {
  messages: Message[];
  hasOlderMessages: boolean;
}

// Internal shape: adds the content-key set used for duplicate detection.
interface InternalCachedConversationEntry extends CachedConversationEntry {
  contentKeys: Set<string>;
}
/**
 * LRU cache of per-conversation message lists with content-based dedup.
 *
 * Backed by a Map, whose insertion order doubles as recency order: reads and
 * writes move an entry to the back, and overflow evicts from the front.
 * Fix: the eviction snippet and the delete+re-set MRU bump were duplicated
 * across `set`/`addMessage`/`get`; they are now the private `evictIfNeeded`
 * and `touch` helpers. Behavior is unchanged.
 */
export class ConversationMessageCache {
  private readonly cache = new Map<string, InternalCachedConversationEntry>();

  /** Evict the least-recently-used entry once the conversation cap is exceeded. */
  private evictIfNeeded(): void {
    if (this.cache.size > MAX_CACHED_CONVERSATIONS) {
      const lruKey = this.cache.keys().next().value as string;
      this.cache.delete(lruKey);
    }
  }

  /** Re-insert an entry so it becomes the most-recently-used one. */
  private touch(id: string, entry: InternalCachedConversationEntry): void {
    this.cache.delete(id);
    this.cache.set(id, entry);
  }

  /** Return the cached entry (bumping recency), or undefined on a miss. */
  get(id: string): CachedConversationEntry | undefined {
    const entry = this.cache.get(id);
    if (!entry) return undefined;
    this.touch(id, entry);
    // Expose only the public shape; the messages array is shared, not copied.
    return {
      messages: entry.messages,
      hasOlderMessages: entry.hasOlderMessages,
    };
  }

  /** Replace a conversation's entry, trimming to the newest messages. */
  set(id: string, entry: CachedConversationEntry): void {
    // Content keys are derived from the untrimmed list, so messages trimmed
    // below still dedup on re-arrival. NOTE(review): confirm this is intended.
    const contentKeys = new Set(entry.messages.map((message) => getMessageContentKey(message)));
    if (entry.messages.length > MAX_MESSAGES_PER_ENTRY) {
      const trimmed = [...entry.messages]
        .sort((a, b) => b.received_at - a.received_at)
        .slice(0, MAX_MESSAGES_PER_ENTRY);
      entry = { ...entry, messages: trimmed, hasOlderMessages: true };
    }
    this.touch(id, { ...entry, contentKeys });
    this.evictIfNeeded();
  }

  /** Append a message if not already present; returns whether it was added. */
  addMessage(id: string, msg: Message): boolean {
    const contentKey = getMessageContentKey(msg);
    const entry = this.cache.get(id);
    if (!entry) {
      // First message for an uncached conversation: assume older history exists.
      this.cache.set(id, {
        messages: [msg],
        hasOlderMessages: true,
        contentKeys: new Set([contentKey]),
      });
      this.evictIfNeeded();
      return true;
    }
    // Dedup by content key first, then by message id.
    if (entry.contentKeys.has(contentKey)) return false;
    if (entry.messages.some((message) => message.id === msg.id)) return false;
    entry.contentKeys.add(contentKey);
    entry.messages = [...entry.messages, msg];
    if (entry.messages.length > MAX_MESSAGES_PER_ENTRY) {
      entry.messages = [...entry.messages]
        .sort((a, b) => b.received_at - a.received_at)
        .slice(0, MAX_MESSAGES_PER_ENTRY);
    }
    this.touch(id, entry);
    return true;
  }

  /** Apply an ack-count (and optionally paths) update wherever the message lives. */
  updateAck(messageId: number, ackCount: number, paths?: MessagePath[]): void {
    for (const entry of this.cache.values()) {
      const index = entry.messages.findIndex((message) => message.id === messageId);
      if (index < 0) continue;
      const current = entry.messages[index];
      const updated = [...entry.messages];
      updated[index] = {
        ...current,
        // Acks only ever grow; paths are replaced only when at least as complete.
        acked: Math.max(current.acked, ackCount),
        ...(paths !== undefined && paths.length >= (current.paths?.length ?? 0) && { paths }),
      };
      entry.messages = updated;
      return;
    }
  }

  /** Drop a conversation's cached messages entirely. */
  remove(id: string): void {
    this.cache.delete(id);
  }

  /** Move a conversation to a new id, merging into any existing entry by message id. */
  rename(oldId: string, newId: string): void {
    if (oldId === newId) return;
    const oldEntry = this.cache.get(oldId);
    if (!oldEntry) return;
    const newEntry = this.cache.get(newId);
    if (!newEntry) {
      this.cache.delete(oldId);
      this.cache.set(newId, oldEntry);
      return;
    }
    const mergedMessages = [...newEntry.messages];
    const seenIds = new Set(mergedMessages.map((message) => message.id));
    for (const message of oldEntry.messages) {
      if (!seenIds.has(message.id)) {
        mergedMessages.push(message);
        seenIds.add(message.id);
      }
    }
    this.cache.delete(oldId);
    this.cache.set(newId, {
      messages: mergedMessages,
      hasOlderMessages: newEntry.hasOlderMessages || oldEntry.hasOlderMessages,
      contentKeys: new Set([...newEntry.contentKeys, ...oldEntry.contentKeys]),
    });
  }

  /** Empty the whole cache. */
  clear(): void {
    this.cache.clear();
  }
}
/**
 * Merge a freshly fetched page into the in-memory message list.
 *
 * Returns null when every fetched message is already present with matching
 * ack count, path count, and text (no re-render needed). Otherwise returns
 * the fetched page (authoritative) plus any local messages it doesn't cover.
 */
export function reconcileConversationMessages(
  current: Message[],
  fetched: Message[]
): Message[] | null {
  // Index the in-memory messages by id for O(1) comparison.
  const byId = new Map<number, { acked: number; pathsLen: number; text: string }>();
  for (const msg of current) {
    byId.set(msg.id, {
      acked: msg.acked,
      pathsLen: msg.paths?.length ?? 0,
      text: msg.text,
    });
  }

  const changed = fetched.some((msg) => {
    const known = byId.get(msg.id);
    return (
      !known ||
      known.acked !== msg.acked ||
      known.pathsLen !== (msg.paths?.length ?? 0) ||
      known.text !== msg.text
    );
  });
  if (!changed) return null;

  const fetchedIds = new Set(fetched.map((msg) => msg.id));
  return [...fetched, ...current.filter((msg) => !fetchedIds.has(msg.id))];
}
export const conversationMessageCache = new ConversationMessageCache();
interface PendingAckUpdate {
ackCount: number;
@@ -56,15 +211,6 @@ export function mergePendingAck(
return existing;
}
// Build the deduplication key for a message.
export function getMessageContentKey(msg: Message): string {
  // With a sender_timestamp, dedup purely by content so radio-path duplicates
  // (same payload, different ids) collapse onto one key. Without one, fold the
  // message id into the timestamp part so distinct messages that share text
  // and received_at second are still kept apart.
  const timestampPart = msg.sender_timestamp ?? `r${msg.received_at}-${msg.id}`;
  return [msg.type, msg.conversation_key, msg.text, timestampPart].join('-');
}
interface UseConversationMessagesResult {
messages: Message[];
messagesLoading: boolean;
@@ -72,21 +218,36 @@ interface UseConversationMessagesResult {
hasOlderMessages: boolean;
hasNewerMessages: boolean;
loadingNewer: boolean;
hasNewerMessagesRef: MutableRefObject<boolean>;
setMessages: Dispatch<SetStateAction<Message[]>>;
fetchOlderMessages: () => Promise<void>;
fetchNewerMessages: () => Promise<void>;
jumpToBottom: () => void;
reloadCurrentConversation: () => void;
addMessageIfNew: (msg: Message) => boolean;
updateMessageAck: (messageId: number, ackCount: number, paths?: MessagePath[]) => void;
triggerReconcile: () => void;
observeMessage: (msg: Message) => { added: boolean; activeConversation: boolean };
receiveMessageAck: (messageId: number, ackCount: number, paths?: MessagePath[]) => void;
reconcileOnReconnect: () => void;
renameConversationMessages: (oldId: string, newId: string) => void;
removeConversationMessages: (conversationId: string) => void;
clearConversationMessages: () => void;
}
// Type guard: a conversation that actually carries messages, i.e. anything
// other than the special raw/map/visualizer/search panes.
function isMessageConversation(conversation: Conversation | null): conversation is Conversation {
  if (!conversation) return false;
  switch (conversation.type) {
    case 'raw':
    case 'map':
    case 'visualizer':
    case 'search':
      return false;
    default:
      return true;
  }
}
// True when msg belongs to the currently open conversation: CHAN messages
// pair with channel conversations and PRIV messages with contact
// conversations, matched on conversation_key === active id.
function isActiveConversationMessage(
  activeConversation: Conversation | null,
  msg: Message
): boolean {
  if (!activeConversation) return false;
  const typesMatch =
    (msg.type === 'CHAN' && activeConversation.type === 'channel') ||
    (msg.type === 'PRIV' && activeConversation.type === 'contact');
  return typesMatch && msg.conversation_key === activeConversation.id;
}
function appendUniqueMessages(current: Message[], incoming: Message[]): Message[] {
if (incoming.length === 0) return current;
@@ -165,8 +326,10 @@ export function useConversationMessages(
const newerAbortControllerRef = useRef<AbortController | null>(null);
const fetchingConversationIdRef = useRef<string | null>(null);
const latestReconcileRequestIdRef = useRef(0);
const pendingReconnectReconcileRef = useRef(false);
const messagesRef = useRef<Message[]>([]);
const loadingOlderRef = useRef(false);
const loadingNewerRef = useRef(false);
const hasOlderMessagesRef = useRef(false);
const hasNewerMessagesRef = useRef(false);
const prevConversationIdRef = useRef<string | null>(null);
@@ -181,6 +344,10 @@ export function useConversationMessages(
loadingOlderRef.current = loadingOlder;
}, [loadingOlder]);
useEffect(() => {
loadingNewerRef.current = loadingNewer;
}, [loadingNewer]);
useEffect(() => {
hasOlderMessagesRef.current = hasOlderMessages;
}, [hasOlderMessages]);
@@ -208,6 +375,7 @@ export function useConversationMessages(
}
const conversationId = activeConversation.id;
pendingReconnectReconcileRef.current = false;
if (showLoading) {
setMessagesLoading(true);
@@ -229,7 +397,7 @@ export function useConversationMessages(
}
const messagesWithPendingAck = data.map((msg) => applyPendingAck(msg));
const merged = messageCache.reconcile(messagesRef.current, messagesWithPendingAck);
const merged = reconcileConversationMessages(messagesRef.current, messagesWithPendingAck);
const nextMessages = merged ?? messagesRef.current;
if (merged) {
setMessages(merged);
@@ -271,7 +439,7 @@ export function useConversationMessages(
const dataWithPendingAck = data.map((msg) => applyPendingAck(msg));
setHasOlderMessages(dataWithPendingAck.length >= MESSAGE_PAGE_SIZE);
const merged = messageCache.reconcile(messagesRef.current, dataWithPendingAck);
const merged = reconcileConversationMessages(messagesRef.current, dataWithPendingAck);
if (!merged) return;
setMessages(merged);
@@ -295,7 +463,7 @@ export function useConversationMessages(
}
const conversationId = activeConversation.id;
const oldestMessage = messages.reduce(
const oldestMessage = messagesRef.current.reduce(
(oldest, msg) => {
if (!oldest) return msg;
if (msg.received_at < oldest.received_at) return msg;
@@ -356,13 +524,19 @@ export function useConversationMessages(
loadingOlderRef.current = false;
setLoadingOlder(false);
}
}, [activeConversation, applyPendingAck, messages, syncSeenContent]);
}, [activeConversation, applyPendingAck, syncSeenContent]);
const fetchNewerMessages = useCallback(async () => {
if (!isMessageConversation(activeConversation) || loadingNewer || !hasNewerMessages) return;
if (
!isMessageConversation(activeConversation) ||
loadingNewerRef.current ||
!hasNewerMessagesRef.current
) {
return;
}
const conversationId = activeConversation.id;
const newestMessage = messages.reduce(
const newestMessage = messagesRef.current.reduce(
(newest, msg) => {
if (!newest) return msg;
if (msg.received_at > newest.received_at) return msg;
@@ -373,6 +547,7 @@ export function useConversationMessages(
);
if (!newestMessage) return;
loadingNewerRef.current = true;
setLoadingNewer(true);
const controller = new AbortController();
newerAbortControllerRef.current = controller;
@@ -401,7 +576,15 @@ export function useConversationMessages(
seenMessageContent.current.add(getMessageContentKey(msg));
}
}
setHasNewerMessages(dataWithPendingAck.length >= MESSAGE_PAGE_SIZE);
const stillHasNewerMessages = dataWithPendingAck.length >= MESSAGE_PAGE_SIZE;
setHasNewerMessages(stillHasNewerMessages);
if (!stillHasNewerMessages && pendingReconnectReconcileRef.current) {
pendingReconnectReconcileRef.current = false;
const requestId = latestReconcileRequestIdRef.current + 1;
latestReconcileRequestIdRef.current = requestId;
const reconcileController = new AbortController();
reconcileFromBackend(activeConversation, reconcileController.signal, requestId);
}
} catch (err) {
if (isAbortError(err)) {
return;
@@ -414,26 +597,36 @@ export function useConversationMessages(
if (newerAbortControllerRef.current === controller) {
newerAbortControllerRef.current = null;
}
loadingNewerRef.current = false;
setLoadingNewer(false);
}
}, [activeConversation, applyPendingAck, hasNewerMessages, loadingNewer, messages]);
}, [activeConversation, applyPendingAck, reconcileFromBackend]);
const jumpToBottom = useCallback(() => {
if (!activeConversation) return;
setHasNewerMessages(false);
messageCache.remove(activeConversation.id);
conversationMessageCache.remove(activeConversation.id);
void fetchLatestMessages(true);
}, [activeConversation, fetchLatestMessages]);
const reloadCurrentConversation = useCallback(() => {
if (!isMessageConversation(activeConversation)) return;
setHasNewerMessages(false);
messageCache.remove(activeConversation.id);
conversationMessageCache.remove(activeConversation.id);
setReloadVersion((current) => current + 1);
}, [activeConversation]);
const triggerReconcile = useCallback(() => {
if (!isMessageConversation(activeConversation)) return;
const reconcileOnReconnect = useCallback(() => {
if (!isMessageConversation(activeConversation)) {
return;
}
if (hasNewerMessagesRef.current) {
pendingReconnectReconcileRef.current = true;
return;
}
pendingReconnectReconcileRef.current = false;
const controller = new AbortController();
const requestId = latestReconcileRequestIdRef.current + 1;
latestReconcileRequestIdRef.current = requestId;
@@ -461,6 +654,7 @@ export function useConversationMessages(
prevConversationIdRef.current = newId;
prevReloadVersionRef.current = reloadVersion;
latestReconcileRequestIdRef.current = 0;
pendingReconnectReconcileRef.current = false;
// Preserve around-loaded context on the same conversation when search clears targetMessageId.
if (!conversationChanged && !targetMessageId && !reloadRequested) {
@@ -480,9 +674,8 @@ export function useConversationMessages(
messagesRef.current.length > 0 &&
!hasNewerMessagesRef.current
) {
messageCache.set(prevId, {
conversationMessageCache.set(prevId, {
messages: messagesRef.current,
seenContent: new Set(seenMessageContent.current),
hasOlderMessages: hasOlderMessagesRef.current,
});
}
@@ -524,10 +717,12 @@ export function useConversationMessages(
setMessagesLoading(false);
});
} else {
const cached = messageCache.get(activeConversation.id);
const cached = conversationMessageCache.get(activeConversation.id);
if (cached) {
setMessages(cached.messages);
seenMessageContent.current = new Set(cached.seenContent);
seenMessageContent.current = new Set(
cached.messages.map((message) => getMessageContentKey(message))
);
setHasOlderMessages(cached.hasOlderMessages);
setMessagesLoading(false);
const requestId = latestReconcileRequestIdRef.current + 1;
@@ -544,9 +739,8 @@ export function useConversationMessages(
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [activeConversation?.id, activeConversation?.type, targetMessageId, reloadVersion]);
// Add a message if it's new (deduplication)
// Returns true if the message was added, false if it was a duplicate
const addMessageIfNew = useCallback(
// Add a message to the active conversation if it is new.
const appendActiveMessageIfNew = useCallback(
(msg: Message): boolean => {
const msgWithPendingAck = applyPendingAck(msg);
const contentKey = getMessageContentKey(msgWithPendingAck);
@@ -616,6 +810,56 @@ export function useConversationMessages(
[messagesRef, setMessages, setPendingAck]
);
const receiveMessageAck = useCallback(
(messageId: number, ackCount: number, paths?: MessagePath[]) => {
updateMessageAck(messageId, ackCount, paths);
conversationMessageCache.updateAck(messageId, ackCount, paths);
},
[updateMessageAck]
);
const observeMessage = useCallback(
(msg: Message): { added: boolean; activeConversation: boolean } => {
const msgWithPendingAck = applyPendingAck(msg);
const activeConversationMessage = isActiveConversationMessage(
activeConversation,
msgWithPendingAck
);
if (activeConversationMessage) {
if (hasNewerMessagesRef.current) {
return { added: false, activeConversation: true };
}
return {
added: appendActiveMessageIfNew(msgWithPendingAck),
activeConversation: true,
};
}
return {
added: conversationMessageCache.addMessage(
msgWithPendingAck.conversation_key,
msgWithPendingAck
),
activeConversation: false,
};
},
[activeConversation, appendActiveMessageIfNew, applyPendingAck, hasNewerMessagesRef]
);
const renameConversationMessages = useCallback((oldId: string, newId: string) => {
conversationMessageCache.rename(oldId, newId);
}, []);
const removeConversationMessages = useCallback((conversationId: string) => {
conversationMessageCache.remove(conversationId);
}, []);
const clearConversationMessages = useCallback(() => {
conversationMessageCache.clear();
}, []);
return {
messages,
messagesLoading,
@@ -623,14 +867,15 @@ export function useConversationMessages(
hasOlderMessages,
hasNewerMessages,
loadingNewer,
hasNewerMessagesRef,
setMessages,
fetchOlderMessages,
fetchNewerMessages,
jumpToBottom,
reloadCurrentConversation,
addMessageIfNew,
updateMessageAck,
triggerReconcile,
observeMessage,
receiveMessageAck,
reconcileOnReconnect,
renameConversationMessages,
removeConversationMessages,
clearConversationMessages,
};
}

View File

@@ -6,14 +6,12 @@ import {
type SetStateAction,
} from 'react';
import { api } from '../api';
import * as messageCache from '../messageCache';
import type { UseWebSocketOptions } from '../useWebSocket';
import { toast } from '../components/ui/sonner';
import { getStateKey } from '../utils/conversationState';
import { mergeContactIntoList } from '../utils/contactMerge';
import { getContactDisplayName } from '../utils/pubkey';
import { appendRawPacketUnique } from '../utils/rawPacketIdentity';
import { getMessageContentKey } from './useConversationMessages';
import type {
Channel,
Contact,
@@ -29,7 +27,7 @@ interface UseRealtimeAppStateArgs {
setHealth: Dispatch<SetStateAction<HealthStatus | null>>;
fetchConfig: () => void | Promise<void>;
setRawPackets: Dispatch<SetStateAction<RawPacket[]>>;
triggerReconcile: () => void;
reconcileOnReconnect: () => void;
refreshUnreads: () => Promise<void>;
setChannels: Dispatch<SetStateAction<Channel[]>>;
fetchAllContacts: () => Promise<Contact[]>;
@@ -37,15 +35,20 @@ interface UseRealtimeAppStateArgs {
blockedKeysRef: MutableRefObject<string[]>;
blockedNamesRef: MutableRefObject<string[]>;
activeConversationRef: MutableRefObject<Conversation | null>;
hasNewerMessagesRef: MutableRefObject<boolean>;
addMessageIfNew: (msg: Message) => boolean;
trackNewMessage: (msg: Message) => void;
incrementUnread: (stateKey: string, hasMention?: boolean) => void;
observeMessage: (msg: Message) => { added: boolean; activeConversation: boolean };
recordMessageEvent: (args: {
msg: Message;
activeConversation: boolean;
isNewMessage: boolean;
hasMention?: boolean;
}) => void;
renameConversationState: (oldStateKey: string, newStateKey: string) => void;
checkMention: (text: string) => boolean;
pendingDeleteFallbackRef: MutableRefObject<boolean>;
setActiveConversation: (conv: Conversation | null) => void;
updateMessageAck: (messageId: number, ackCount: number, paths?: MessagePath[]) => void;
renameConversationMessages: (oldId: string, newId: string) => void;
removeConversationMessages: (conversationId: string) => void;
receiveMessageAck: (messageId: number, ackCount: number, paths?: MessagePath[]) => void;
notifyIncomingMessage?: (msg: Message) => void;
maxRawPackets?: number;
}
@@ -71,30 +74,12 @@ function isMessageBlocked(msg: Message, blockedKeys: string[], blockedNames: str
return blockedNames.length > 0 && !!msg.sender_name && blockedNames.includes(msg.sender_name);
}
// True when msg belongs to the currently open conversation: CHAN messages
// match channel conversations and PRIV messages match contact conversations,
// keyed on conversation_key === active conversation id. Null active
// conversation never matches.
function isActiveConversationMessage(
  activeConversation: Conversation | null,
  msg: Message
): boolean {
  if (!activeConversation) return false;
  if (msg.type === 'CHAN' && activeConversation.type === 'channel') {
    return msg.conversation_key === activeConversation.id;
  }
  if (msg.type === 'PRIV' && activeConversation.type === 'contact') {
    return msg.conversation_key === activeConversation.id;
  }
  return false;
}
// A conversation that carries messages: only channels and contacts qualify.
function isMessageConversation(conversation: Conversation | null): boolean {
  const kind = conversation?.type;
  return kind === 'channel' || kind === 'contact';
}
export function useRealtimeAppState({
prevHealthRef,
setHealth,
fetchConfig,
setRawPackets,
triggerReconcile,
reconcileOnReconnect,
refreshUnreads,
setChannels,
fetchAllContacts,
@@ -102,15 +87,15 @@ export function useRealtimeAppState({
blockedKeysRef,
blockedNamesRef,
activeConversationRef,
hasNewerMessagesRef,
addMessageIfNew,
trackNewMessage,
incrementUnread,
observeMessage,
recordMessageEvent,
renameConversationState,
checkMention,
pendingDeleteFallbackRef,
setActiveConversation,
updateMessageAck,
renameConversationMessages,
removeConversationMessages,
receiveMessageAck,
notifyIncomingMessage,
maxRawPackets = 500,
}: UseRealtimeAppStateArgs): UseWebSocketOptions {
@@ -184,11 +169,7 @@ export function useRealtimeAppState({
},
onReconnect: () => {
setRawPackets([]);
if (
!(hasNewerMessagesRef.current && isMessageConversation(activeConversationRef.current))
) {
triggerReconcile();
}
reconcileOnReconnect();
refreshUnreads();
api.getChannels().then(setChannels).catch(console.error);
fetchAllContacts()
@@ -200,34 +181,14 @@ export function useRealtimeAppState({
return;
}
const isForActiveConversation = isActiveConversationMessage(
activeConversationRef.current,
msg
);
let isNewMessage = false;
if (isForActiveConversation && !hasNewerMessagesRef.current) {
isNewMessage = addMessageIfNew(msg);
}
trackNewMessage(msg);
const contentKey = getMessageContentKey(msg);
if (!isForActiveConversation) {
isNewMessage = messageCache.addMessage(msg.conversation_key, msg, contentKey);
if (!msg.outgoing && isNewMessage) {
let stateKey: string | null = null;
if (msg.type === 'CHAN' && msg.conversation_key) {
stateKey = getStateKey('channel', msg.conversation_key);
} else if (msg.type === 'PRIV' && msg.conversation_key) {
stateKey = getStateKey('contact', msg.conversation_key);
}
if (stateKey) {
incrementUnread(stateKey, checkMention(msg.text));
}
}
}
const { added: isNewMessage, activeConversation: isForActiveConversation } =
observeMessage(msg);
recordMessageEvent({
msg,
activeConversation: isForActiveConversation,
isNewMessage,
hasMention: checkMention(msg.text),
});
if (!msg.outgoing && isNewMessage) {
notifyIncomingMessage?.(msg);
@@ -243,7 +204,7 @@ export function useRealtimeAppState({
contact
)
);
messageCache.rename(previousPublicKey, contact.public_key);
renameConversationMessages(previousPublicKey, contact.public_key);
renameConversationState(
getStateKey('contact', previousPublicKey),
getStateKey('contact', contact.public_key)
@@ -263,7 +224,7 @@ export function useRealtimeAppState({
},
onContactDeleted: (publicKey: string) => {
setContacts((prev) => prev.filter((c) => c.public_key !== publicKey));
messageCache.remove(publicKey);
removeConversationMessages(publicKey);
const active = activeConversationRef.current;
if (active?.type === 'contact' && active.id === publicKey) {
pendingDeleteFallbackRef.current = true;
@@ -272,7 +233,7 @@ export function useRealtimeAppState({
},
onChannelDeleted: (key: string) => {
setChannels((prev) => prev.filter((c) => c.key !== key));
messageCache.remove(key);
removeConversationMessages(key);
const active = activeConversationRef.current;
if (active?.type === 'channel' && active.id === key) {
pendingDeleteFallbackRef.current = true;
@@ -283,34 +244,33 @@ export function useRealtimeAppState({
setRawPackets((prev) => appendRawPacketUnique(prev, packet, maxRawPackets));
},
onMessageAcked: (messageId: number, ackCount: number, paths?: MessagePath[]) => {
updateMessageAck(messageId, ackCount, paths);
messageCache.updateAck(messageId, ackCount, paths);
receiveMessageAck(messageId, ackCount, paths);
},
}),
[
activeConversationRef,
addMessageIfNew,
blockedKeysRef,
blockedNamesRef,
checkMention,
fetchAllContacts,
fetchConfig,
hasNewerMessagesRef,
incrementUnread,
renameConversationState,
renameConversationMessages,
maxRawPackets,
mergeChannelIntoList,
pendingDeleteFallbackRef,
prevHealthRef,
recordMessageEvent,
receiveMessageAck,
observeMessage,
refreshUnreads,
reconcileOnReconnect,
removeConversationMessages,
setActiveConversation,
setChannels,
setContacts,
setHealth,
setRawPackets,
trackNewMessage,
triggerReconcile,
updateMessageAck,
notifyIncomingMessage,
]
);

View File

@@ -16,10 +16,14 @@ interface UseUnreadCountsResult {
mentions: Record<string, boolean>;
lastMessageTimes: ConversationTimes;
unreadLastReadAts: Record<string, number | null>;
incrementUnread: (stateKey: string, hasMention?: boolean) => void;
recordMessageEvent: (args: {
msg: Message;
activeConversation: boolean;
isNewMessage: boolean;
hasMention?: boolean;
}) => void;
renameConversationState: (oldStateKey: string, newStateKey: string) => void;
markAllRead: () => void;
trackNewMessage: (msg: Message) => void;
refreshUnreads: () => Promise<void>;
}
@@ -162,7 +166,6 @@ export function useUnreadCounts(
}
}, [activeConversation]);
// Increment unread count for a conversation
const incrementUnread = useCallback((stateKey: string, hasMention?: boolean) => {
setUnreadCounts((prev) => ({
...prev,
@@ -176,6 +179,40 @@ export function useUnreadCounts(
}
}, []);
const recordMessageEvent = useCallback(
({
msg,
activeConversation: isActiveConversation,
isNewMessage,
hasMention,
}: {
msg: Message;
activeConversation: boolean;
isNewMessage: boolean;
hasMention?: boolean;
}) => {
let stateKey: string | null = null;
if (msg.type === 'CHAN' && msg.conversation_key) {
stateKey = getStateKey('channel', msg.conversation_key);
} else if (msg.type === 'PRIV' && msg.conversation_key) {
stateKey = getStateKey('contact', msg.conversation_key);
}
if (!stateKey) {
return;
}
const timestamp = msg.received_at || Math.floor(Date.now() / 1000);
const updated = setLastMessageTime(stateKey, timestamp);
setLastMessageTimes(updated);
if (!isActiveConversation && !msg.outgoing && isNewMessage) {
incrementUnread(stateKey, hasMention);
}
},
[incrementUnread]
);
const renameConversationState = useCallback((oldStateKey: string, newStateKey: string) => {
if (oldStateKey === newStateKey) return;
@@ -212,31 +249,14 @@ export function useUnreadCounts(
});
}, []);
// Track a new incoming message for unread counts
const trackNewMessage = useCallback((msg: Message) => {
let conversationKey: string | null = null;
if (msg.type === 'CHAN' && msg.conversation_key) {
conversationKey = getStateKey('channel', msg.conversation_key);
} else if (msg.type === 'PRIV' && msg.conversation_key) {
conversationKey = getStateKey('contact', msg.conversation_key);
}
if (conversationKey) {
const timestamp = msg.received_at || Math.floor(Date.now() / 1000);
const updated = setLastMessageTime(conversationKey, timestamp);
setLastMessageTimes(updated);
}
}, []);
return {
unreadCounts,
mentions,
lastMessageTimes,
unreadLastReadAts,
incrementUnread,
recordMessageEvent,
renameConversationState,
markAllRead,
trackNewMessage,
refreshUnreads: fetchUnreads,
};
}

View File

@@ -1,174 +0,0 @@
/**
* LRU message cache for recently-visited conversations.
*
* Uses Map insertion-order semantics: the most recently used entry
* is always at the end. Eviction removes the first (least-recently-used) entry.
*
* Cache size: 20 conversations, 200 messages each (~2.4MB worst case).
*/
import type { Message, MessagePath } from './types';
export const MAX_CACHED_CONVERSATIONS = 20;
export const MAX_MESSAGES_PER_ENTRY = 200;
interface CacheEntry {
messages: Message[];
seenContent: Set<string>;
hasOlderMessages: boolean;
}
const cache = new Map<string, CacheEntry>();
/** Fetch a cached entry and mark it most-recently-used. */
export function get(id: string): CacheEntry | undefined {
  const entry = cache.get(id);
  if (entry === undefined) return undefined;
  // Deleting and re-inserting moves the key to the end of the Map's
  // insertion order, which is where the MRU entry lives.
  cache.delete(id);
  cache.set(id, entry);
  return entry;
}
/** Store an entry at the MRU position, trimming messages and evicting the LRU entry if over capacity. */
export function set(id: string, entry: CacheEntry): void {
  let stored = entry;
  if (stored.messages.length > MAX_MESSAGES_PER_ENTRY) {
    // Keep only the newest messages to bound memory; older ones still exist
    // server-side, so flag hasOlderMessages.
    const newestFirst = [...stored.messages].sort((a, b) => b.received_at - a.received_at);
    stored = {
      ...stored,
      messages: newestFirst.slice(0, MAX_MESSAGES_PER_ENTRY),
      hasOlderMessages: true,
    };
  }
  // Delete first so re-insert lands at the MRU (end) position.
  cache.delete(id);
  cache.set(id, stored);
  if (cache.size > MAX_CACHED_CONVERSATIONS) {
    // The first key in insertion order is the least-recently-used entry.
    const lruKey = cache.keys().next().value as string;
    cache.delete(lruKey);
  }
}
/** Add a message to a cached conversation with dedup. Returns true if new, false if duplicate. */
export function addMessage(id: string, msg: Message, contentKey: string): boolean {
  const entry = cache.get(id);
  if (!entry) {
    // First message for a conversation never visited: seed a minimal entry.
    cache.set(id, {
      messages: [msg],
      seenContent: new Set([contentKey]),
      hasOlderMessages: true,
    });
    if (cache.size > MAX_CACHED_CONVERSATIONS) {
      const lruKey = cache.keys().next().value as string;
      cache.delete(lruKey);
    }
    return true;
  }
  // Duplicate by content key or by message id — either way, skip it.
  if (entry.seenContent.has(contentKey) || entry.messages.some((m) => m.id === msg.id)) {
    return false;
  }
  entry.seenContent.add(contentKey);
  let updatedMessages = [...entry.messages, msg];
  if (updatedMessages.length > MAX_MESSAGES_PER_ENTRY) {
    // Over budget: keep only the newest messages by received_at.
    updatedMessages = [...updatedMessages]
      .sort((a, b) => b.received_at - a.received_at)
      .slice(0, MAX_MESSAGES_PER_ENTRY);
  }
  entry.messages = updatedMessages;
  // Promote to MRU so actively-messaged conversations survive eviction.
  cache.delete(id);
  cache.set(id, entry);
  return true;
}
/** Scan all cached entries for a message ID and update its ack count and paths. */
export function updateAck(messageId: number, ackCount: number, paths?: MessagePath[]): void {
  for (const entry of cache.values()) {
    const position = entry.messages.findIndex((m) => m.id === messageId);
    if (position < 0) continue;
    const existing = entry.messages[position];
    const replacement = {
      ...existing,
      acked: Math.max(existing.acked, ackCount),
    };
    // Only adopt the new paths when they are at least as detailed as current.
    if (paths !== undefined && paths.length >= (existing.paths?.length ?? 0)) {
      replacement.paths = paths;
    }
    entry.messages = entry.messages.map((m, index) => (index === position ? replacement : m));
    // Message IDs are unique, so the first match is the only match.
    return;
  }
}
/**
 * Compare fetched messages against current state.
 * Returns a merged array when anything differs (new messages, ack/path/text
 * changes), or null when the cache is already consistent so no rerender is
 * needed. Older paginated messages absent from the fetched page are kept.
 */
export function reconcile(current: Message[], fetched: Message[]): Message[] | null {
  const byId = new Map<number, { acked: number; pathsLen: number; text: string }>();
  for (const msg of current) {
    byId.set(msg.id, { acked: msg.acked, pathsLen: msg.paths?.length ?? 0, text: msg.text });
  }
  const dirty = fetched.some((msg) => {
    const previous = byId.get(msg.id);
    return (
      !previous ||
      previous.acked !== msg.acked ||
      previous.pathsLen !== (msg.paths?.length ?? 0) ||
      previous.text !== msg.text
    );
  });
  if (!dirty) return null;
  // Merge: fresh recent page first, then older messages it omitted.
  const fetchedIds = new Set(fetched.map((msg) => msg.id));
  return [...fetched, ...current.filter((msg) => !fetchedIds.has(msg.id))];
}
/** Evict a specific conversation from the cache (no-op when absent). */
export function remove(id: string): void {
  cache.delete(id);
}
/**
 * Move cached conversation state to a new conversation id.
 * When newId already holds an entry, the two are merged: target messages are
 * kept and old messages with unseen ids appended; seenContent sets are
 * unioned and hasOlderMessages is OR-ed together.
 */
export function rename(oldId: string, newId: string): void {
  if (oldId === newId) return;
  const source = cache.get(oldId);
  if (!source) return;
  const target = cache.get(newId);
  cache.delete(oldId);
  if (!target) {
    // Nothing under the new id yet: a plain move suffices.
    cache.set(newId, source);
    return;
  }
  const merged = [...target.messages];
  const knownIds = new Set(merged.map((message) => message.id));
  for (const message of source.messages) {
    if (knownIds.has(message.id)) continue;
    knownIds.add(message.id);
    merged.push(message);
  }
  cache.set(newId, {
    messages: merged,
    seenContent: new Set([...target.seenContent, ...source.seenContent]),
    hasOlderMessages: target.hasOlderMessages || source.hasOlderMessages,
  });
}
/** Clear the entire cache, dropping every conversation entry. */
export function clear(): void {
  cache.clear();
}

View File

@@ -31,15 +31,15 @@ const mocks = vi.hoisted(() => ({
error: vi.fn(),
},
hookFns: {
setMessages: vi.fn(),
fetchMessages: vi.fn(async () => {}),
fetchOlderMessages: vi.fn(async () => {}),
addMessageIfNew: vi.fn(),
updateMessageAck: vi.fn(),
triggerReconcile: vi.fn(),
incrementUnread: vi.fn(),
observeMessage: vi.fn(() => ({ added: false, activeConversation: false })),
receiveMessageAck: vi.fn(),
reconcileOnReconnect: vi.fn(),
renameConversationMessages: vi.fn(),
removeConversationMessages: vi.fn(),
clearConversationMessages: vi.fn(),
recordMessageEvent: vi.fn(),
markAllRead: vi.fn(),
trackNewMessage: vi.fn(),
refreshUnreads: vi.fn(async () => {}),
},
}));
@@ -63,38 +63,30 @@ vi.mock('../hooks', async (importOriginal) => {
hasOlderMessages: false,
hasNewerMessages: false,
loadingNewer: false,
hasNewerMessagesRef: { current: false },
setMessages: mocks.hookFns.setMessages,
fetchMessages: mocks.hookFns.fetchMessages,
fetchOlderMessages: mocks.hookFns.fetchOlderMessages,
fetchNewerMessages: vi.fn(async () => {}),
jumpToBottom: vi.fn(),
reloadCurrentConversation: vi.fn(),
addMessageIfNew: mocks.hookFns.addMessageIfNew,
updateMessageAck: mocks.hookFns.updateMessageAck,
triggerReconcile: mocks.hookFns.triggerReconcile,
observeMessage: mocks.hookFns.observeMessage,
receiveMessageAck: mocks.hookFns.receiveMessageAck,
reconcileOnReconnect: mocks.hookFns.reconcileOnReconnect,
renameConversationMessages: mocks.hookFns.renameConversationMessages,
removeConversationMessages: mocks.hookFns.removeConversationMessages,
clearConversationMessages: mocks.hookFns.clearConversationMessages,
}),
useUnreadCounts: () => ({
unreadCounts: {},
mentions: {},
lastMessageTimes: {},
unreadLastReadAts: {},
incrementUnread: mocks.hookFns.incrementUnread,
recordMessageEvent: mocks.hookFns.recordMessageEvent,
renameConversationState: vi.fn(),
markAllRead: mocks.hookFns.markAllRead,
trackNewMessage: mocks.hookFns.trackNewMessage,
refreshUnreads: mocks.hookFns.refreshUnreads,
}),
getMessageContentKey: () => 'content-key',
};
});
vi.mock('../messageCache', () => ({
addMessage: vi.fn(),
updateAck: vi.fn(),
remove: vi.fn(),
}));
vi.mock('../components/StatusBar', () => ({
StatusBar: ({
settingsMode,
@@ -295,7 +287,7 @@ describe('App favorite toggle flow', () => {
await waitFor(() => {
expect(mocks.api.getChannels).toHaveBeenCalledTimes(2);
});
expect(mocks.hookFns.triggerReconcile).toHaveBeenCalledTimes(1);
expect(mocks.hookFns.reconcileOnReconnect).toHaveBeenCalledTimes(1);
expect(mocks.hookFns.refreshUnreads).toHaveBeenCalledTimes(1);
});

View File

@@ -37,15 +37,16 @@ vi.mock('../hooks', async (importOriginal) => {
hasOlderMessages: false,
hasNewerMessages: false,
loadingNewer: false,
hasNewerMessagesRef: { current: false },
setMessages: vi.fn(),
fetchOlderMessages: vi.fn(async () => {}),
fetchNewerMessages: vi.fn(async () => {}),
jumpToBottom: vi.fn(),
reloadCurrentConversation: vi.fn(),
addMessageIfNew: vi.fn(),
updateMessageAck: vi.fn(),
triggerReconcile: vi.fn(),
observeMessage: vi.fn(() => ({ added: false, activeConversation: false })),
receiveMessageAck: vi.fn(),
reconcileOnReconnect: vi.fn(),
renameConversationMessages: vi.fn(),
removeConversationMessages: vi.fn(),
clearConversationMessages: vi.fn(),
};
},
useUnreadCounts: () => ({
@@ -53,22 +54,14 @@ vi.mock('../hooks', async (importOriginal) => {
mentions: {},
lastMessageTimes: {},
unreadLastReadAts: {},
incrementUnread: vi.fn(),
recordMessageEvent: vi.fn(),
renameConversationState: vi.fn(),
markAllRead: vi.fn(),
trackNewMessage: vi.fn(),
refreshUnreads: vi.fn(),
}),
getMessageContentKey: () => 'content-key',
};
});
vi.mock('../messageCache', () => ({
addMessage: vi.fn(),
updateAck: vi.fn(),
remove: vi.fn(),
}));
vi.mock('../components/StatusBar', () => ({
StatusBar: () => <div data-testid="status-bar" />,
}));

View File

@@ -32,38 +32,30 @@ vi.mock('../hooks', async (importOriginal) => {
hasOlderMessages: false,
hasNewerMessages: false,
loadingNewer: false,
hasNewerMessagesRef: { current: false },
setMessages: vi.fn(),
fetchMessages: vi.fn(async () => {}),
fetchOlderMessages: vi.fn(async () => {}),
fetchNewerMessages: vi.fn(async () => {}),
jumpToBottom: vi.fn(),
reloadCurrentConversation: vi.fn(),
addMessageIfNew: vi.fn(),
updateMessageAck: vi.fn(),
triggerReconcile: vi.fn(),
observeMessage: vi.fn(() => ({ added: false, activeConversation: false })),
receiveMessageAck: vi.fn(),
reconcileOnReconnect: vi.fn(),
renameConversationMessages: vi.fn(),
removeConversationMessages: vi.fn(),
clearConversationMessages: vi.fn(),
}),
useUnreadCounts: () => ({
unreadCounts: {},
mentions: {},
lastMessageTimes: {},
unreadLastReadAts: {},
incrementUnread: vi.fn(),
recordMessageEvent: vi.fn(),
renameConversationState: vi.fn(),
markAllRead: vi.fn(),
trackNewMessage: vi.fn(),
refreshUnreads: vi.fn(async () => {}),
}),
getMessageContentKey: () => 'content-key',
};
});
vi.mock('../messageCache', () => ({
addMessage: vi.fn(),
updateAck: vi.fn(),
remove: vi.fn(),
}));
vi.mock('../components/StatusBar', () => ({
StatusBar: () => <div data-testid="status-bar" />,
}));

View File

@@ -8,12 +8,12 @@
* between backend and frontend - both sides test against the same data.
*/
import { describe, it, expect, beforeEach } from 'vitest';
import { describe, it, expect } from 'vitest';
import fixtures from './fixtures/websocket_events.json';
import { getMessageContentKey } from '../hooks/useConversationMessages';
import { getStateKey } from '../utils/conversationState';
import { mergeContactIntoList } from '../utils/contactMerge';
import * as messageCache from '../messageCache';
import { ConversationMessageCache } from '../hooks/useConversationMessages';
import { getMessageContentKey } from '../utils/messageIdentity';
import type { Contact, Message } from '../types';
/**
@@ -25,6 +25,7 @@ interface MockState {
unreadCounts: Record<string, number>;
lastMessageTimes: Record<string, number>;
seenActiveContent: Set<string>;
messageCache: ConversationMessageCache;
}
function createMockState(): MockState {
@@ -33,6 +34,7 @@ function createMockState(): MockState {
unreadCounts: {},
lastMessageTimes: {},
seenActiveContent: new Set(),
messageCache: new ConversationMessageCache(),
};
}
@@ -68,7 +70,7 @@ function handleMessageEvent(
state.lastMessageTimes[stateKey] = msg.received_at;
if (!isForActiveConversation) {
const isNew = messageCache.addMessage(msg.conversation_key, msg, contentKey);
const isNew = state.messageCache.addMessage(msg.conversation_key, msg);
if (!msg.outgoing && isNew) {
state.unreadCounts[stateKey] = (state.unreadCounts[stateKey] || 0) + 1;
unreadIncremented = true;
@@ -78,11 +80,6 @@ function handleMessageEvent(
return { added, unreadIncremented };
}
// Clear messageCache between tests to avoid cross-test contamination
beforeEach(() => {
messageCache.clear();
});
describe('Integration: Channel Message Events', () => {
const fixture = fixtures.channel_message;
@@ -180,7 +177,7 @@ describe('Integration: No phantom unreads from mesh echoes (hitlist #8 regressio
// dual-set design the global set would drop msg-0's key during pruning,
// so a later mesh echo of msg-0 would pass the global check and
// phantom-increment unread. With the fix, messageCache's per-conversation
// seenContent is the single source of truth and is never pruned.
// Cached messages remain the source of truth for inactive-conversation dedup.
const MESSAGE_COUNT = 1001;
for (let i = 0; i < MESSAGE_COUNT; i++) {
const msg: Message = {
@@ -342,11 +339,8 @@ describe('Integration: Contact Merge', () => {
// --- ACK + messageCache propagation tests ---
describe('Integration: ACK + messageCache propagation', () => {
beforeEach(() => {
messageCache.clear();
});
it('updateAck updates acked count on cached message', () => {
const messageCache = new ConversationMessageCache();
const msg: Message = {
id: 100,
type: 'PRIV',
@@ -362,7 +356,7 @@ describe('Integration: ACK + messageCache propagation', () => {
acked: 0,
sender_name: null,
};
messageCache.addMessage('pk_abc', msg, 'key-100');
messageCache.addMessage('pk_abc', msg);
messageCache.updateAck(100, 1);
@@ -372,6 +366,7 @@ describe('Integration: ACK + messageCache propagation', () => {
});
it('updateAck updates paths when longer', () => {
const messageCache = new ConversationMessageCache();
const msg: Message = {
id: 101,
type: 'PRIV',
@@ -387,7 +382,7 @@ describe('Integration: ACK + messageCache propagation', () => {
acked: 1,
sender_name: null,
};
messageCache.addMessage('pk_abc', msg, 'key-101');
messageCache.addMessage('pk_abc', msg);
const longerPaths = [
{ path: 'aa', received_at: 1700000001 },
@@ -401,6 +396,7 @@ describe('Integration: ACK + messageCache propagation', () => {
});
it('preserves higher existing ack count (max semantics)', () => {
const messageCache = new ConversationMessageCache();
const msg: Message = {
id: 102,
type: 'PRIV',
@@ -416,7 +412,7 @@ describe('Integration: ACK + messageCache propagation', () => {
acked: 5,
sender_name: null,
};
messageCache.addMessage('pk_abc', msg, 'key-102');
messageCache.addMessage('pk_abc', msg);
// Try to update with a lower ack count
messageCache.updateAck(102, 3);
@@ -426,6 +422,7 @@ describe('Integration: ACK + messageCache propagation', () => {
});
it('is a no-op for unknown message ID', () => {
const messageCache = new ConversationMessageCache();
const msg: Message = {
id: 103,
type: 'PRIV',
@@ -441,7 +438,7 @@ describe('Integration: ACK + messageCache propagation', () => {
acked: 0,
sender_name: null,
};
messageCache.addMessage('pk_abc', msg, 'key-103');
messageCache.addMessage('pk_abc', msg);
// Update a non-existent message ID — should not throw or modify anything
messageCache.updateAck(999, 1);

View File

@@ -3,8 +3,12 @@
*/
import { describe, it, expect, beforeEach } from 'vitest';
import * as messageCache from '../messageCache';
import { MAX_CACHED_CONVERSATIONS, MAX_MESSAGES_PER_ENTRY } from '../messageCache';
import {
ConversationMessageCache,
MAX_CACHED_CONVERSATIONS,
MAX_MESSAGES_PER_ENTRY,
reconcileConversationMessages,
} from '../hooks/useConversationMessages';
import type { Message } from '../types';
function createMessage(overrides: Partial<Message> = {}): Message {
@@ -27,16 +31,14 @@ function createMessage(overrides: Partial<Message> = {}): Message {
}
function createEntry(messages: Message[] = [], hasOlderMessages = false) {
const seenContent = new Set<string>();
for (const msg of messages) {
seenContent.add(`${msg.type}-${msg.conversation_key}-${msg.text}-${msg.sender_timestamp}`);
}
return { messages, seenContent, hasOlderMessages };
return { messages, hasOlderMessages };
}
describe('messageCache', () => {
let messageCache: ConversationMessageCache;
beforeEach(() => {
messageCache.clear();
messageCache = new ConversationMessageCache();
});
describe('get/set', () => {
@@ -155,11 +157,7 @@ describe('messageCache', () => {
messageCache.set('conv1', createEntry([]));
const msg = createMessage({ id: 10, text: 'New message' });
const result = messageCache.addMessage(
'conv1',
msg,
'CHAN-channel123-New message-1700000000'
);
const result = messageCache.addMessage('conv1', msg);
expect(result).toBe(true);
const entry = messageCache.get('conv1');
@@ -171,12 +169,11 @@ describe('messageCache', () => {
messageCache.set('conv1', createEntry([]));
const msg1 = createMessage({ id: 10, text: 'Hello' });
const contentKey = 'CHAN-channel123-Hello-1700000000';
expect(messageCache.addMessage('conv1', msg1, contentKey)).toBe(true);
expect(messageCache.addMessage('conv1', msg1)).toBe(true);
// Same content key, different message id
const msg2 = createMessage({ id: 11, text: 'Hello' });
expect(messageCache.addMessage('conv1', msg2, contentKey)).toBe(false);
expect(messageCache.addMessage('conv1', msg2)).toBe(false);
const entry = messageCache.get('conv1');
expect(entry!.messages).toHaveLength(1);
@@ -187,9 +184,7 @@ describe('messageCache', () => {
// Same id, different content key
const msg = createMessage({ id: 10, text: 'Different' });
expect(messageCache.addMessage('conv1', msg, 'CHAN-channel123-Different-1700000000')).toBe(
false
);
expect(messageCache.addMessage('conv1', msg)).toBe(false);
const entry = messageCache.get('conv1');
expect(entry!.messages).toHaveLength(1);
@@ -208,11 +203,7 @@ describe('messageCache', () => {
text: 'newest',
received_at: 1700000000 + MAX_MESSAGES_PER_ENTRY,
});
const result = messageCache.addMessage(
'conv1',
newMsg,
`CHAN-channel123-newest-${newMsg.sender_timestamp}`
);
const result = messageCache.addMessage('conv1', newMsg);
expect(result).toBe(true);
const entry = messageCache.get('conv1');
@@ -225,11 +216,7 @@ describe('messageCache', () => {
it('auto-creates a minimal entry for never-visited conversations and returns true', () => {
const msg = createMessage({ id: 10, text: 'First contact' });
const result = messageCache.addMessage(
'new_conv',
msg,
'CHAN-channel123-First contact-1700000000'
);
const result = messageCache.addMessage('new_conv', msg);
expect(result).toBe(true);
const entry = messageCache.get('new_conv');
@@ -237,7 +224,6 @@ describe('messageCache', () => {
expect(entry!.messages).toHaveLength(1);
expect(entry!.messages[0].text).toBe('First contact');
expect(entry!.hasOlderMessages).toBe(true);
expect(entry!.seenContent.has('CHAN-channel123-First contact-1700000000')).toBe(true);
});
it('promotes entry to MRU on addMessage', () => {
@@ -248,7 +234,7 @@ describe('messageCache', () => {
// addMessage to conv0 (currently LRU) should promote it
const msg = createMessage({ id: 999, text: 'Incoming WS message' });
messageCache.addMessage('conv0', msg, 'CHAN-channel123-Incoming WS message-1700000000');
messageCache.addMessage('conv0', msg);
// Add one more — conv1 should now be LRU and get evicted, not conv0
messageCache.set('conv_new', createEntry());
@@ -259,11 +245,10 @@ describe('messageCache', () => {
it('returns false for duplicate delivery to auto-created entry', () => {
const msg = createMessage({ id: 10, text: 'Echo' });
const contentKey = 'CHAN-channel123-Echo-1700000000';
expect(messageCache.addMessage('new_conv', msg, contentKey)).toBe(true);
expect(messageCache.addMessage('new_conv', msg)).toBe(true);
// Duplicate via mesh echo
expect(messageCache.addMessage('new_conv', msg, contentKey)).toBe(false);
expect(messageCache.addMessage('new_conv', msg)).toBe(false);
const entry = messageCache.get('new_conv');
expect(entry!.messages).toHaveLength(1);
@@ -358,7 +343,7 @@ describe('messageCache', () => {
createMessage({ id: 3, acked: 1 }),
];
expect(messageCache.reconcile(msgs, fetched)).toBeNull();
expect(reconcileConversationMessages(msgs, fetched)).toBeNull();
});
it('detects new messages missing from cache', () => {
@@ -369,7 +354,7 @@ describe('messageCache', () => {
createMessage({ id: 3, text: 'missed via WS' }),
];
const merged = messageCache.reconcile(current, fetched);
const merged = reconcileConversationMessages(current, fetched);
expect(merged).not.toBeNull();
expect(merged!.map((m) => m.id)).toEqual([1, 2, 3]);
});
@@ -378,7 +363,7 @@ describe('messageCache', () => {
const current = [createMessage({ id: 1, acked: 0 })];
const fetched = [createMessage({ id: 1, acked: 3 })];
const merged = messageCache.reconcile(current, fetched);
const merged = reconcileConversationMessages(current, fetched);
expect(merged).not.toBeNull();
expect(merged![0].acked).toBe(3);
});
@@ -397,20 +382,20 @@ describe('messageCache', () => {
createMessage({ id: 2 }),
];
const merged = messageCache.reconcile(current, fetched);
const merged = reconcileConversationMessages(current, fetched);
expect(merged).not.toBeNull();
// Should have fetched page + older paginated message
expect(merged!.map((m) => m.id)).toEqual([4, 3, 2, 1]);
});
it('returns null for empty fetched and empty current', () => {
expect(messageCache.reconcile([], [])).toBeNull();
expect(reconcileConversationMessages([], [])).toBeNull();
});
it('detects difference when current is empty but fetch has messages', () => {
const fetched = [createMessage({ id: 1 })];
const merged = messageCache.reconcile([], fetched);
const merged = reconcileConversationMessages([], fetched);
expect(merged).not.toBeNull();
expect(merged!).toHaveLength(1);
});
@@ -430,7 +415,7 @@ describe('messageCache', () => {
}),
];
const merged = messageCache.reconcile(current, fetched);
const merged = reconcileConversationMessages(current, fetched);
expect(merged).not.toBeNull();
expect(merged![0].paths).toHaveLength(2);
});
@@ -439,7 +424,7 @@ describe('messageCache', () => {
const current = [createMessage({ id: 1, text: '[encrypted]' })];
const fetched = [createMessage({ id: 1, text: 'Hello world' })];
const merged = messageCache.reconcile(current, fetched);
const merged = reconcileConversationMessages(current, fetched);
expect(merged).not.toBeNull();
expect(merged![0].text).toBe('Hello world');
});
@@ -449,7 +434,7 @@ describe('messageCache', () => {
const current = [createMessage({ id: 1, acked: 2, paths, text: 'Hello' })];
const fetched = [createMessage({ id: 1, acked: 2, paths, text: 'Hello' })];
expect(messageCache.reconcile(current, fetched)).toBeNull();
expect(reconcileConversationMessages(current, fetched)).toBeNull();
});
});
});

View File

@@ -35,11 +35,6 @@ vi.mock('../components/ui/sonner', () => ({
toast: { success: vi.fn(), error: vi.fn() },
}));
// Mock messageCache
vi.mock('../messageCache', () => ({
remove: vi.fn(),
}));
function makeContact(suffix: string): Contact {
const key = suffix.padStart(64, '0');
return {
@@ -69,6 +64,7 @@ function makeContacts(count: number, startIndex = 0): Contact[] {
describe('useContactsAndChannels', () => {
const setActiveConversation = vi.fn();
const removeConversationMessages = vi.fn();
const pendingDeleteFallbackRef = { current: false };
const hasSetDefaultConversation = { current: false };
@@ -88,6 +84,7 @@ describe('useContactsAndChannels', () => {
setActiveConversation,
pendingDeleteFallbackRef,
hasSetDefaultConversation,
removeConversationMessages,
})
);
}

View File

@@ -63,7 +63,7 @@ function createArgs(overrides: Partial<Parameters<typeof useConversationActions>
activeConversationRef: { current: activeConversation },
setContacts: vi.fn(),
setChannels: vi.fn(),
addMessageIfNew: vi.fn(() => true),
observeMessage: vi.fn(() => ({ added: true, activeConversation: true })),
messageInputRef: { current: { appendText: vi.fn() } },
...overrides,
};
@@ -85,7 +85,7 @@ describe('useConversationActions', () => {
});
expect(mocks.api.sendChannelMessage).toHaveBeenCalledWith(publicChannel.key, sentMessage.text);
expect(args.addMessageIfNew).toHaveBeenCalledWith(sentMessage);
expect(args.observeMessage).toHaveBeenCalledWith(sentMessage);
});
it('does not append a sent message after the active conversation changes', async () => {
@@ -111,7 +111,7 @@ describe('useConversationActions', () => {
await sendPromise;
});
expect(args.addMessageIfNew).not.toHaveBeenCalled();
expect(args.observeMessage).not.toHaveBeenCalled();
});
it('appends sender mentions into the message input', () => {
@@ -146,7 +146,7 @@ describe('useConversationActions', () => {
});
expect(mocks.api.resendChannelMessage).toHaveBeenCalledWith(sentMessage.id, true);
expect(args.addMessageIfNew).toHaveBeenCalledWith(resentMessage);
expect(args.observeMessage).toHaveBeenCalledWith(resentMessage);
});
it('does not append a byte-perfect resend locally', async () => {
@@ -162,7 +162,7 @@ describe('useConversationActions', () => {
await result.current.handleResendChannelMessage(sentMessage.id, false);
});
expect(args.addMessageIfNew).not.toHaveBeenCalled();
expect(args.observeMessage).not.toHaveBeenCalled();
});
it('does not append a resend if the user has switched conversations', async () => {
@@ -190,7 +190,7 @@ describe('useConversationActions', () => {
await resendPromise;
});
expect(args.addMessageIfNew).not.toHaveBeenCalled();
expect(args.observeMessage).not.toHaveBeenCalled();
});
it('merges returned contact data after path discovery', async () => {

View File

@@ -1,9 +1,11 @@
import { act, renderHook, waitFor } from '@testing-library/react';
import { beforeEach, describe, expect, it, vi, type Mock } from 'vitest';
import * as messageCache from '../messageCache';
import { api } from '../api';
import { useConversationMessages } from '../hooks/useConversationMessages';
import {
conversationMessageCache,
useConversationMessages,
} from '../hooks/useConversationMessages';
import type { Conversation, Message } from '../types';
const mockGetMessages = vi.fn<typeof api.getMessages>();
@@ -62,7 +64,7 @@ function createDeferred<T>() {
describe('useConversationMessages ACK ordering', () => {
beforeEach(() => {
mockGetMessages.mockReset();
messageCache.clear();
conversationMessageCache.clear();
mockToastError.mockReset();
});
@@ -76,11 +78,11 @@ describe('useConversationMessages ACK ordering', () => {
const paths = [{ path: 'A1B2', received_at: 1700000010 }];
act(() => {
result.current.updateMessageAck(42, 2, paths);
result.current.receiveMessageAck(42, 2, paths);
});
act(() => {
const added = result.current.addMessageIfNew(
const { added } = result.current.observeMessage(
createMessage({ id: 42, acked: 0, paths: null })
);
expect(added).toBe(true);
@@ -100,7 +102,7 @@ describe('useConversationMessages ACK ordering', () => {
const paths = [{ path: 'C3D4', received_at: 1700000011 }];
act(() => {
result.current.updateMessageAck(42, 1, paths);
result.current.receiveMessageAck(42, 1, paths);
});
deferred.resolve([createMessage({ id: 42, acked: 0, paths: null })]);
@@ -118,7 +120,7 @@ describe('useConversationMessages ACK ordering', () => {
await waitFor(() => expect(mockGetMessages).toHaveBeenCalledTimes(1));
act(() => {
const added = result.current.addMessageIfNew(
const { added } = result.current.observeMessage(
createMessage({
id: 99,
text: 'ws-arrived',
@@ -153,7 +155,7 @@ describe('useConversationMessages ACK ordering', () => {
await waitFor(() => expect(result.current.messagesLoading).toBe(false));
act(() => {
result.current.addMessageIfNew(createMessage({ id: 42, acked: 0, paths: null }));
result.current.observeMessage(createMessage({ id: 42, acked: 0, paths: null }));
});
const highAckPaths = [
@@ -163,8 +165,8 @@ describe('useConversationMessages ACK ordering', () => {
const staleAckPaths = [{ path: 'A1B2', received_at: 1700000010 }];
act(() => {
result.current.updateMessageAck(42, 3, highAckPaths);
result.current.updateMessageAck(42, 2, staleAckPaths);
result.current.receiveMessageAck(42, 3, highAckPaths);
result.current.receiveMessageAck(42, 2, staleAckPaths);
});
expect(result.current.messages[0].acked).toBe(3);
@@ -175,7 +177,7 @@ describe('useConversationMessages ACK ordering', () => {
describe('useConversationMessages conversation switch', () => {
beforeEach(() => {
mockGetMessages.mockReset();
messageCache.clear();
conversationMessageCache.clear();
});
it('resets loadingOlder when switching conversations mid-fetch', async () => {
@@ -300,7 +302,7 @@ describe('useConversationMessages conversation switch', () => {
describe('useConversationMessages background reconcile ordering', () => {
beforeEach(() => {
mockGetMessages.mockReset();
messageCache.clear();
conversationMessageCache.clear();
});
it('ignores stale reconnect reconcile responses that finish after newer ones', async () => {
@@ -321,8 +323,8 @@ describe('useConversationMessages background reconcile ordering', () => {
.mockReturnValueOnce(secondReconcile.promise);
act(() => {
result.current.triggerReconcile();
result.current.triggerReconcile();
result.current.reconcileOnReconnect();
result.current.reconcileOnReconnect();
});
secondReconcile.resolve([createMessage({ id: 42, text: 'newer snapshot', acked: 2 })]);
@@ -342,11 +344,8 @@ describe('useConversationMessages background reconcile ordering', () => {
const conv = createConversation();
const cachedMessage = createMessage({ id: 42, text: 'cached snapshot' });
messageCache.set(conv.id, {
conversationMessageCache.set(conv.id, {
messages: [cachedMessage],
seenContent: new Set([
`PRIV-${cachedMessage.conversation_key}-${cachedMessage.text}-${cachedMessage.sender_timestamp}`,
]),
hasOlderMessages: true,
});
@@ -365,7 +364,7 @@ describe('useConversationMessages background reconcile ordering', () => {
describe('useConversationMessages older-page dedup and reentry', () => {
beforeEach(() => {
mockGetMessages.mockReset();
messageCache.clear();
conversationMessageCache.clear();
});
it('prevents duplicate overlapping older-page fetches in the same tick', async () => {
@@ -511,7 +510,7 @@ describe('useConversationMessages forward pagination', () => {
beforeEach(() => {
mockGetMessages.mockReset();
mockGetMessagesAround.mockReset();
messageCache.clear();
conversationMessageCache.clear();
mockToastError.mockReset();
});
@@ -596,7 +595,7 @@ describe('useConversationMessages forward pagination', () => {
// Simulate WS adding a message with the same content key
act(() => {
result.current.addMessageIfNew(
result.current.observeMessage(
createMessage({
id: 2,
conversation_key: 'ch1',
@@ -627,6 +626,70 @@ describe('useConversationMessages forward pagination', () => {
expect(dupes).toHaveLength(1);
});
it('defers reconnect reconcile until forward pagination reaches the live tail', async () => {
const conv: Conversation = { type: 'channel', id: 'ch1', name: 'Channel' };
mockGetMessagesAround.mockResolvedValueOnce({
messages: [
createMessage({
id: 1,
conversation_key: 'ch1',
text: 'older-context',
sender_timestamp: 1700000000,
received_at: 1700000000,
}),
],
has_older: false,
has_newer: true,
});
const { result } = renderHook(
({ conv, target }: { conv: Conversation; target: number | null }) =>
useConversationMessages(conv, target),
{ initialProps: { conv, target: 1 } }
);
await waitFor(() => expect(result.current.messagesLoading).toBe(false));
expect(result.current.hasNewerMessages).toBe(true);
act(() => {
result.current.reconcileOnReconnect();
});
expect(mockGetMessages).not.toHaveBeenCalled();
mockGetMessages
.mockResolvedValueOnce([
createMessage({
id: 2,
conversation_key: 'ch1',
text: 'newer-page',
sender_timestamp: 1700000001,
received_at: 1700000001,
}),
])
.mockResolvedValueOnce([
createMessage({
id: 2,
conversation_key: 'ch1',
text: 'newer-page',
sender_timestamp: 1700000001,
received_at: 1700000001,
acked: 3,
}),
]);
await act(async () => {
await result.current.fetchNewerMessages();
});
await waitFor(() => expect(mockGetMessages).toHaveBeenCalledTimes(2));
await waitFor(() =>
expect(result.current.messages.find((message) => message.id === 2)?.acked).toBe(3)
);
expect(result.current.hasNewerMessages).toBe(false);
});
it('jumpToBottom clears hasNewerMessages and refetches latest', async () => {
const conv: Conversation = { type: 'channel', id: 'ch1', name: 'Channel' };
@@ -677,6 +740,55 @@ describe('useConversationMessages forward pagination', () => {
expect(result.current.messages[0].text).toBe('latest-msg');
});
it('jumpToBottom clears deferred reconnect reconcile without an extra reconcile fetch', async () => {
const conv: Conversation = { type: 'channel', id: 'ch1', name: 'Channel' };
mockGetMessagesAround.mockResolvedValueOnce({
messages: [
createMessage({
id: 5,
conversation_key: 'ch1',
text: 'around-msg',
sender_timestamp: 1700000005,
received_at: 1700000005,
}),
],
has_older: true,
has_newer: true,
});
const { result } = renderHook(
({ conv, target }: { conv: Conversation; target: number | null }) =>
useConversationMessages(conv, target),
{ initialProps: { conv, target: 5 } }
);
await waitFor(() => expect(result.current.messagesLoading).toBe(false));
act(() => {
result.current.reconcileOnReconnect();
});
mockGetMessages.mockResolvedValueOnce([
createMessage({
id: 10,
conversation_key: 'ch1',
text: 'latest-msg',
sender_timestamp: 1700000010,
received_at: 1700000010,
}),
]);
act(() => {
result.current.jumpToBottom();
});
await waitFor(() => expect(result.current.messagesLoading).toBe(false));
await waitFor(() => expect(mockGetMessages).toHaveBeenCalledTimes(1));
expect(result.current.messages[0].text).toBe('latest-msg');
expect(result.current.hasNewerMessages).toBe(false);
});
it('aborts stale newer-page requests on conversation switch without toasting', async () => {
const convA: Conversation = { type: 'channel', id: 'ch1', name: 'Channel A' };
const convB: Conversation = { type: 'channel', id: 'ch2', name: 'Channel B' };

View File

@@ -5,7 +5,8 @@
*/
import { describe, it, expect } from 'vitest';
import { getMessageContentKey, mergePendingAck } from '../hooks/useConversationMessages';
import { mergePendingAck } from '../hooks/useConversationMessages';
import { getMessageContentKey } from '../utils/messageIdentity';
import type { Message } from '../types';
function createMessage(overrides: Partial<Message> = {}): Message {

View File

@@ -12,12 +12,6 @@ const mocks = vi.hoisted(() => ({
success: vi.fn(),
error: vi.fn(),
},
messageCache: {
addMessage: vi.fn(),
remove: vi.fn(),
rename: vi.fn(),
updateAck: vi.fn(),
},
}));
vi.mock('../api', () => ({
@@ -28,8 +22,6 @@ vi.mock('../components/ui/sonner', () => ({
toast: mocks.toast,
}));
vi.mock('../messageCache', () => mocks.messageCache);
const publicChannel: Channel = {
key: '8B3387E9C5CDEA6AC9E5EDBAA115CD72',
name: 'Public',
@@ -66,7 +58,7 @@ function createRealtimeArgs(overrides: Partial<Parameters<typeof useRealtimeAppS
setHealth,
fetchConfig: vi.fn(),
setRawPackets,
triggerReconcile: vi.fn(),
reconcileOnReconnect: vi.fn(),
refreshUnreads: vi.fn(async () => {}),
setChannels,
fetchAllContacts: vi.fn(async () => [] as Contact[]),
@@ -74,15 +66,15 @@ function createRealtimeArgs(overrides: Partial<Parameters<typeof useRealtimeAppS
blockedKeysRef: { current: [] as string[] },
blockedNamesRef: { current: [] as string[] },
activeConversationRef: { current: null as Conversation | null },
hasNewerMessagesRef: { current: false },
addMessageIfNew: vi.fn(),
trackNewMessage: vi.fn(),
incrementUnread: vi.fn(),
observeMessage: vi.fn(() => ({ added: false, activeConversation: false })),
recordMessageEvent: vi.fn(),
renameConversationState: vi.fn(),
checkMention: vi.fn(() => false),
pendingDeleteFallbackRef: { current: false },
setActiveConversation: vi.fn(),
updateMessageAck: vi.fn(),
renameConversationMessages: vi.fn(),
removeConversationMessages: vi.fn(),
receiveMessageAck: vi.fn(),
notifyIncomingMessage: vi.fn(),
...overrides,
},
@@ -133,7 +125,7 @@ describe('useRealtimeAppState', () => {
});
await waitFor(() => {
expect(args.triggerReconcile).toHaveBeenCalledTimes(1);
expect(args.reconcileOnReconnect).toHaveBeenCalledTimes(1);
expect(args.refreshUnreads).toHaveBeenCalledTimes(1);
expect(mocks.api.getChannels).toHaveBeenCalledTimes(1);
expect(args.fetchAllContacts).toHaveBeenCalledTimes(1);
@@ -166,14 +158,6 @@ describe('useRealtimeAppState', () => {
const { args, fns } = createRealtimeArgs({
fetchAllContacts: vi.fn(async () => contacts),
activeConversationRef: {
current: {
type: 'channel',
id: publicChannel.key,
name: publicChannel.name,
} satisfies Conversation,
},
hasNewerMessagesRef: { current: true },
});
const { result } = renderHook(() => useRealtimeAppState(args));
@@ -183,7 +167,7 @@ describe('useRealtimeAppState', () => {
});
await waitFor(() => {
expect(args.triggerReconcile).not.toHaveBeenCalled();
expect(args.reconcileOnReconnect).toHaveBeenCalledTimes(1);
expect(args.refreshUnreads).toHaveBeenCalledTimes(1);
expect(mocks.api.getChannels).toHaveBeenCalledTimes(1);
expect(args.fetchAllContacts).toHaveBeenCalledTimes(1);
@@ -194,9 +178,9 @@ describe('useRealtimeAppState', () => {
});
it('tracks unread state for a new non-active incoming message', () => {
mocks.messageCache.addMessage.mockReturnValue(true);
const { args } = createRealtimeArgs({
checkMention: vi.fn(() => true),
observeMessage: vi.fn(() => ({ added: true, activeConversation: false })),
});
const { result } = renderHook(() => useRealtimeAppState(args));
@@ -205,17 +189,13 @@ describe('useRealtimeAppState', () => {
result.current.onMessage?.(incomingDm);
});
expect(args.addMessageIfNew).not.toHaveBeenCalled();
expect(args.trackNewMessage).toHaveBeenCalledWith(incomingDm);
expect(mocks.messageCache.addMessage).toHaveBeenCalledWith(
incomingDm.conversation_key,
incomingDm,
expect.any(String)
);
expect(args.incrementUnread).toHaveBeenCalledWith(
`contact-${incomingDm.conversation_key}`,
true
);
expect(args.observeMessage).toHaveBeenCalledWith(incomingDm);
expect(args.recordMessageEvent).toHaveBeenCalledWith({
msg: incomingDm,
activeConversation: false,
isNewMessage: true,
hasMention: true,
});
expect(args.notifyIncomingMessage).toHaveBeenCalledWith(incomingDm);
});
@@ -240,7 +220,7 @@ describe('useRealtimeAppState', () => {
});
expect(fns.setContacts).toHaveBeenCalledWith(expect.any(Function));
expect(mocks.messageCache.remove).toHaveBeenCalledWith(incomingDm.conversation_key);
expect(args.removeConversationMessages).toHaveBeenCalledWith(incomingDm.conversation_key);
expect(args.setActiveConversation).toHaveBeenCalledWith(null);
expect(pendingDeleteFallbackRef.current).toBe(true);
});
@@ -282,7 +262,7 @@ describe('useRealtimeAppState', () => {
});
expect(fns.setContacts).toHaveBeenCalledWith(expect.any(Function));
expect(mocks.messageCache.rename).toHaveBeenCalledWith(
expect(args.renameConversationMessages).toHaveBeenCalledWith(
previousPublicKey,
resolvedContact.public_key
);

View File

@@ -10,7 +10,8 @@ import { act, renderHook } from '@testing-library/react';
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { useUnreadCounts } from '../hooks/useUnreadCounts';
import type { Channel, Contact, Conversation } from '../types';
import type { Channel, Contact, Conversation, Message } from '../types';
import { getStateKey } from '../utils/conversationState';
// Mock api module
vi.mock('../api', () => ({
@@ -57,6 +58,25 @@ function makeContact(pubkey: string): Contact {
};
}
function makeMessage(overrides: Partial<Message> = {}): Message {
return {
id: 1,
type: 'PRIV',
conversation_key: CONTACT_KEY,
text: 'hello',
sender_timestamp: 1700000000,
received_at: 1700000001,
paths: null,
txt_type: 0,
signature: null,
sender_key: null,
outgoing: false,
acked: 0,
sender_name: null,
...overrides,
};
}
const CHANNEL_KEY = 'AABB00112233445566778899AABBCCDD';
const CONTACT_KEY = '00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff';
@@ -332,4 +352,74 @@ describe('useUnreadCounts', () => {
// Raw view doesn't filter any conversation's unreads
expect(result.current.unreadCounts[`channel-${CHANNEL_KEY}`]).toBe(5);
});
it('recordMessageEvent updates last-message time and unread count for new inactive incoming messages', async () => {
const mocks = await getMockedApi();
const { result } = renderWith({});
await act(async () => {
await vi.waitFor(() => expect(mocks.getUnreads).toHaveBeenCalled());
});
const msg = makeMessage({
id: 5,
type: 'CHAN',
conversation_key: CHANNEL_KEY,
received_at: 1700001234,
});
await act(async () => {
result.current.recordMessageEvent({
msg,
activeConversation: false,
isNewMessage: true,
hasMention: true,
});
});
expect(result.current.unreadCounts[getStateKey('channel', CHANNEL_KEY)]).toBe(1);
expect(result.current.mentions[getStateKey('channel', CHANNEL_KEY)]).toBe(true);
expect(result.current.lastMessageTimes[getStateKey('channel', CHANNEL_KEY)]).toBe(1700001234);
});
it('recordMessageEvent skips unread increment for active or non-new messages but still tracks time', async () => {
const mocks = await getMockedApi();
const { result } = renderWith({});
await act(async () => {
await vi.waitFor(() => expect(mocks.getUnreads).toHaveBeenCalled());
});
const activeMsg = makeMessage({
id: 6,
type: 'PRIV',
conversation_key: CONTACT_KEY,
received_at: 1700002000,
});
await act(async () => {
result.current.recordMessageEvent({
msg: activeMsg,
activeConversation: true,
isNewMessage: true,
hasMention: true,
});
result.current.recordMessageEvent({
msg: makeMessage({
id: 7,
type: 'CHAN',
conversation_key: CHANNEL_KEY,
received_at: 1700002001,
}),
activeConversation: false,
isNewMessage: false,
hasMention: true,
});
});
expect(result.current.unreadCounts[getStateKey('contact', CONTACT_KEY)]).toBeUndefined();
expect(result.current.unreadCounts[getStateKey('channel', CHANNEL_KEY)]).toBeUndefined();
expect(result.current.lastMessageTimes[getStateKey('contact', CONTACT_KEY)]).toBe(1700002000);
expect(result.current.lastMessageTimes[getStateKey('channel', CHANNEL_KEY)]).toBe(1700002001);
});
});

View File

@@ -0,0 +1,10 @@
import type { Message } from '../types';
// Content identity used by the frontend's message-level dedup contract.
export function getMessageContentKey(msg: Message): string {
  // Prefer the sender's own timestamp: radio-path duplicates share it even
  // when their row IDs differ. When it is absent, fold msg.id into the key so
  // two distinct messages with the same text and received_at second are never
  // silently collapsed into one.
  const identity =
    msg.sender_timestamp === null || msg.sender_timestamp === undefined
      ? `r${msg.received_at}-${msg.id}`
      : msg.sender_timestamp;
  return `${msg.type}-${msg.conversation_key}-${msg.text}-${identity}`;
}

View File

@@ -7,7 +7,7 @@ paths (packet_processor + event_handler fallback) don't double-store messages.
"""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import MagicMock, patch
import pytest
@@ -410,7 +410,8 @@ class TestDualPathDedup:
1. Primary: RX_LOG_DATA → packet_processor (decrypts with private key)
2. Fallback: CONTACT_MSG_RECV → on_contact_message (MeshCore library decoded)
The fallback uses INSERT OR IGNORE to avoid double-storage when both fire.
The fallback path should reconcile against the packet path instead of creating
a second row, and should still add new path observations when available.
"""
@pytest.mark.asyncio
@@ -457,19 +458,7 @@ class TestDualPathDedup:
"sender_timestamp": SENDER_TIMESTAMP,
}
# Mock contact lookup to return a contact with the right key
mock_contact = MagicMock()
mock_contact.public_key = CONTACT_PUB
mock_contact.type = 1 # Client, not repeater
mock_contact.name = "TestContact"
with (
patch("app.event_handlers.ContactRepository") as mock_contact_repo,
patch("app.event_handlers.broadcast_event", mock_broadcast),
):
mock_contact_repo.get_by_key_or_prefix = AsyncMock(return_value=mock_contact)
mock_contact_repo.update_last_contacted = AsyncMock()
with patch("app.event_handlers.broadcast_event", mock_broadcast):
await on_contact_message(mock_event)
# No additional message broadcast should have been sent
@@ -538,18 +527,7 @@ class TestDualPathDedup:
"sender_timestamp": SENDER_TIMESTAMP,
}
mock_contact = MagicMock()
mock_contact.public_key = upper_key # Uppercase from DB
mock_contact.type = 1
mock_contact.name = "TestContact"
with (
patch("app.event_handlers.ContactRepository") as mock_contact_repo,
patch("app.event_handlers.broadcast_event", mock_broadcast),
):
mock_contact_repo.get_by_key_or_prefix = AsyncMock(return_value=mock_contact)
mock_contact_repo.update_last_contacted = AsyncMock()
with patch("app.event_handlers.broadcast_event", mock_broadcast):
await on_contact_message(mock_event)
# Should NOT create a second message (dedup catches it thanks to .lower())
@@ -564,6 +542,146 @@ class TestDualPathDedup:
)
assert len(messages) == 1
@pytest.mark.asyncio
async def test_event_handler_duplicate_adds_path_to_existing_dm(
    self, test_db, captured_broadcasts
):
    """Fallback DM duplicates should reconcile path updates onto the stored message."""
    from app.event_handlers import on_contact_message
    from app.packet_processor import create_dm_message_from_decrypted

    # Known contact so the fallback handler can resolve the sender.
    await ContactRepository.upsert(
        {
            "public_key": CONTACT_PUB.lower(),
            "name": "TestContact",
            "type": 1,
            "last_seen": SENDER_TIMESTAMP,
            "last_contacted": SENDER_TIMESTAMP,
            "first_seen": SENDER_TIMESTAMP,
            "on_radio": False,
            "out_path_hash_mode": 0,
        }
    )

    # Primary path first: store the DM via the packet processor (no route info).
    packet_id, _ = await RawPacketRepository.create(b"primary_with_no_path", SENDER_TIMESTAMP)
    plaintext = DecryptedDirectMessage(
        timestamp=SENDER_TIMESTAMP,
        flags=0,
        message="Dual path with route update",
        dest_hash="fa",
        src_hash="a1",
    )
    broadcasts, mock_broadcast = captured_broadcasts
    with patch("app.packet_processor.broadcast_event", mock_broadcast):
        stored_id = await create_dm_message_from_decrypted(
            packet_id=packet_id,
            decrypted=plaintext,
            their_public_key=CONTACT_PUB,
            our_public_key=OUR_PUB,
            received_at=SENDER_TIMESTAMP,
            outgoing=False,
        )
    assert stored_id is not None
    broadcasts.clear()

    # Fallback path second: same message, but now carrying route information.
    dup_event = MagicMock()
    dup_event.payload = {
        "public_key": CONTACT_PUB,
        "text": "Dual path with route update",
        "txt_type": 0,
        "sender_timestamp": SENDER_TIMESTAMP,
        "path": "bbcc",
        "path_len": 2,
    }
    with patch("app.event_handlers.broadcast_event", mock_broadcast):
        await on_contact_message(dup_event)

    # No second "message" broadcast — the duplicate was reconciled, not re-stored.
    assert [b for b in broadcasts if b["type"] == "message"] == []

    # Exactly one ack-style broadcast carrying the merged path observation.
    acks = [b for b in broadcasts if b["type"] == "message_acked"]
    assert len(acks) == 1
    ack_data = acks[0]["data"]
    assert ack_data["message_id"] == stored_id
    assert ack_data["ack_count"] == 0
    assert any(p["path"] == "bbcc" for p in ack_data["paths"])

    # The new route was persisted onto the original row.
    stored = await MessageRepository.get_by_id(stored_id)
    assert stored is not None
    assert stored.paths is not None
    assert any(p.path == "bbcc" for p in stored.paths)
@pytest.mark.asyncio
async def test_fallback_path_duplicate_reconciles_path_without_new_row(
    self, test_db, captured_broadcasts
):
    """Repeated fallback DMs should keep one row and merge path observations."""
    from app.event_handlers import on_contact_message

    # Contact that only ever arrives via the fallback (CONTACT_MSG_RECV) path.
    await ContactRepository.upsert(
        {
            "public_key": CONTACT_PUB.lower(),
            "name": "FallbackOnly",
            "type": 1,
            "last_seen": SENDER_TIMESTAMP,
            "last_contacted": SENDER_TIMESTAMP,
            "first_seen": SENDER_TIMESTAMP,
            "on_radio": False,
            "out_path_hash_mode": 0,
        }
    )
    broadcasts, mock_broadcast = captured_broadcasts

    # First delivery: no route info, creates the one and only row.
    initial_event = MagicMock()
    initial_event.payload = {
        "public_key": CONTACT_PUB,
        "text": "Fallback duplicate route test",
        "txt_type": 0,
        "sender_timestamp": SENDER_TIMESTAMP,
    }
    with patch("app.event_handlers.broadcast_event", mock_broadcast):
        await on_contact_message(initial_event)

    rows = await MessageRepository.get_all(
        msg_type="PRIV", conversation_key=CONTACT_PUB.lower(), limit=10
    )
    assert len(rows) == 1
    stored_id = rows[0].id
    broadcasts.clear()

    # Second delivery: identical content, but now with a route observation.
    replay_event = MagicMock()
    replay_event.payload = {
        "public_key": CONTACT_PUB,
        "text": "Fallback duplicate route test",
        "txt_type": 0,
        "sender_timestamp": SENDER_TIMESTAMP,
        "path": "ddee",
        "path_len": 2,
    }
    with patch("app.event_handlers.broadcast_event", mock_broadcast):
        await on_contact_message(replay_event)

    # Still a single row — the duplicate was merged, not inserted.
    rows = await MessageRepository.get_all(
        msg_type="PRIV", conversation_key=CONTACT_PUB.lower(), limit=10
    )
    assert len(rows) == 1

    # No second "message" broadcast; one ack broadcast carrying the new path.
    assert [b for b in broadcasts if b["type"] == "message"] == []
    acks = [b for b in broadcasts if b["type"] == "message_acked"]
    assert len(acks) == 1
    ack_data = acks[0]["data"]
    assert ack_data["message_id"] == stored_id
    assert ack_data["ack_count"] == 0
    assert any(p["path"] == "ddee" for p in ack_data["paths"])
class TestDirectMessageDirectionDetection:
"""Test src_hash/dest_hash direction detection in _process_direct_message.