diff --git a/frontend/src/hooks/useConversationMessages.ts b/frontend/src/hooks/useConversationMessages.ts index 2280f2b..53ba4e0 100644 --- a/frontend/src/hooks/useConversationMessages.ts +++ b/frontend/src/hooks/useConversationMessages.ts @@ -5,6 +5,48 @@ import * as messageCache from '../messageCache'; import type { Conversation, Message, MessagePath } from '../types'; const MESSAGE_PAGE_SIZE = 200; +const MAX_PENDING_ACKS = 500; + +interface PendingAckUpdate { + ackCount: number; + paths?: MessagePath[]; +} + +function mergePendingAck( + existing: PendingAckUpdate | undefined, + ackCount: number, + paths?: MessagePath[] +): PendingAckUpdate { + if (!existing) { + return { + ackCount, + ...(paths !== undefined && { paths }), + }; + } + + if (ackCount > existing.ackCount) { + return { + ackCount, + ...(paths !== undefined && { paths }), + ...(paths === undefined && existing.paths !== undefined && { paths: existing.paths }), + }; + } + + if (ackCount < existing.ackCount) { + return existing; + } + + if (paths === undefined) { + return existing; + } + + const existingPathCount = existing.paths?.length ?? -1; + if (paths.length >= existingPathCount) { + return { ackCount, paths }; + } + + return existing; +} // Generate a key for deduplicating messages by content export function getMessageContentKey(msg: Message): string { @@ -34,6 +76,10 @@ export function useConversationMessages( // Track seen message content for deduplication const seenMessageContent = useRef>(new Set()); + // ACK events can arrive before the corresponding message event/response. + // Buffer latest ACK state by message_id and apply when the message arrives. + const pendingAcksRef = useRef>(new Map()); + // AbortController for cancelling in-flight requests on conversation change const abortControllerRef = useRef(null); @@ -54,6 +100,35 @@ export function useConversationMessages( hasOlderMessagesRef.current = hasOlderMessages; }, [hasOlderMessages]); + const setPendingAck = useCallback((messageId: number, ackCount: number, paths?: MessagePath[]) => { + const existing = pendingAcksRef.current.get(messageId); + const merged = mergePendingAck(existing, ackCount, paths); + + // Update insertion order so most recent updates remain in the buffer longest. + pendingAcksRef.current.delete(messageId); + pendingAcksRef.current.set(messageId, merged); + + if (pendingAcksRef.current.size > MAX_PENDING_ACKS) { + const oldestMessageId = pendingAcksRef.current.keys().next().value as number | undefined; + if (oldestMessageId !== undefined) { + pendingAcksRef.current.delete(oldestMessageId); + } + } + }, []); + + const applyPendingAck = useCallback((msg: Message): Message => { + const pending = pendingAcksRef.current.get(msg.id); + if (!pending) return msg; + + pendingAcksRef.current.delete(msg.id); + + return { + ...msg, + acked: Math.max(msg.acked, pending.ackCount), + ...(pending.paths !== undefined && { paths: pending.paths }), + }; + }, []); + // Fetch messages for active conversation // Note: This is called manually and from the useEffect. The useEffect handles // cancellation via AbortController; manual calls (e.g., after sending a message) @@ -91,14 +166,15 @@ export function useConversationMessages( return; } - setMessages(data); + const messagesWithPendingAck = data.map((msg) => applyPendingAck(msg)); + setMessages(messagesWithPendingAck); // Track seen content for new messages seenMessageContent.current.clear(); - for (const msg of data) { + for (const msg of messagesWithPendingAck) { seenMessageContent.current.add(getMessageContentKey(msg)); } // If we got a full page, there might be more - setHasOlderMessages(data.length >= MESSAGE_PAGE_SIZE); + setHasOlderMessages(messagesWithPendingAck.length >= MESSAGE_PAGE_SIZE); } catch (err) { // Don't show error toast for aborted requests (user switched conversations) if (isAbortError(err)) { @@ -114,7 +190,7 @@ export function useConversationMessages( } } }, - [activeConversation] + [activeConversation, applyPendingAck] ); // Fetch older messages (cursor-based pagination) @@ -148,17 +224,18 @@ export function useConversationMessages( before: oldestMessage.received_at, before_id: oldestMessage.id, }); + const dataWithPendingAck = data.map((msg) => applyPendingAck(msg)); - if (data.length > 0) { + if (dataWithPendingAck.length > 0) { // Prepend older messages (they come sorted DESC, so older are at the end) - setMessages((prev) => [...prev, ...data]); + setMessages((prev) => [...prev, ...dataWithPendingAck]); // Track seen content - for (const msg of data) { + for (const msg of dataWithPendingAck) { seenMessageContent.current.add(getMessageContentKey(msg)); } } // If we got less than a full page, no more messages - setHasOlderMessages(data.length >= MESSAGE_PAGE_SIZE); + setHasOlderMessages(dataWithPendingAck.length >= MESSAGE_PAGE_SIZE); } catch (err) { console.error('Failed to fetch older messages:', err); toast.error('Failed to load older messages', { @@ -167,7 +244,7 @@ export function useConversationMessages( } finally { setLoadingOlder(false); } - }, [activeConversation, loadingOlder, hasOlderMessages, messages]); + }, [activeConversation, loadingOlder, hasOlderMessages, messages, applyPendingAck]); // Background reconciliation: silently fetch from backend after a cache restore // and only update state if something differs (missed WS message, stale ack, etc.). @@ -187,7 +264,8 @@ export function useConversationMessages( // Stale check — conversation may have changed while awaiting if (fetchingConversationIdRef.current !== conversationId) return; - const merged = messageCache.reconcile(messagesRef.current, data); + const dataWithPendingAck = data.map((msg) => applyPendingAck(msg)); + const merged = messageCache.reconcile(messagesRef.current, dataWithPendingAck); if (!merged) return; // Cache was consistent — no rerender setMessages(merged); @@ -195,7 +273,7 @@ export function useConversationMessages( for (const msg of merged) { seenMessageContent.current.add(getMessageContentKey(msg)); } - if (data.length >= MESSAGE_PAGE_SIZE) { + if (dataWithPendingAck.length >= MESSAGE_PAGE_SIZE) { setHasOlderMessages(true); } }) @@ -268,7 +346,8 @@ export function useConversationMessages( // Add a message if it's new (deduplication) // Returns true if the message was added, false if it was a duplicate const addMessageIfNew = useCallback((msg: Message): boolean => { - const contentKey = getMessageContentKey(msg); + const msgWithPendingAck = applyPendingAck(msg); + const contentKey = getMessageContentKey(msgWithPendingAck); if (seenMessageContent.current.has(contentKey)) { console.debug('Duplicate message content ignored:', contentKey.slice(0, 50)); return false; @@ -282,33 +361,48 @@ export function useConversationMessages( } setMessages((prev) => { - if (prev.some((m) => m.id === msg.id)) { + if (prev.some((m) => m.id === msgWithPendingAck.id)) { return prev; } - return [...prev, msg]; + return [...prev, msgWithPendingAck]; }); return true; - }, []); + }, [applyPendingAck]); // Update a message's ack count and paths const updateMessageAck = useCallback( (messageId: number, ackCount: number, paths?: MessagePath[]) => { + const hasMessageLoaded = messagesRef.current.some((m) => m.id === messageId); + if (!hasMessageLoaded) { + setPendingAck(messageId, ackCount, paths); + return; + } + + // Message is loaded now, so any prior pending ACK for it is stale. + pendingAcksRef.current.delete(messageId); + setMessages((prev) => { const idx = prev.findIndex((m) => m.id === messageId); if (idx >= 0) { + const current = prev[idx]; + const nextAck = Math.max(current.acked, ackCount); + const nextPaths = + paths !== undefined && paths.length >= (current.paths?.length ?? 0) ? paths : current.paths; + const updated = [...prev]; updated[idx] = { - ...prev[idx], - acked: ackCount, - ...(paths !== undefined && { paths }), + ...current, + acked: nextAck, + ...(paths !== undefined && { paths: nextPaths }), }; return updated; } + setPendingAck(messageId, ackCount, paths); return prev; }); }, - [] + [setPendingAck] ); return { diff --git a/frontend/src/test/useConversationMessages.race.test.ts b/frontend/src/test/useConversationMessages.race.test.ts new file mode 100644 index 0000000..8947b96 --- /dev/null +++ b/frontend/src/test/useConversationMessages.race.test.ts @@ -0,0 +1,124 @@ +import { act, renderHook, waitFor } from '@testing-library/react'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import * as messageCache from '../messageCache'; +import { useConversationMessages } from '../hooks/useConversationMessages'; +import type { Conversation, Message } from '../types'; + +const mockGetMessages = vi.fn<(...args: unknown[]) => Promise>(); + +vi.mock('../api', () => ({ + api: { + getMessages: (...args: unknown[]) => mockGetMessages(...args), + }, + isAbortError: (err: unknown) => err instanceof DOMException && err.name === 'AbortError', +})); + +function createConversation(): Conversation { + return { + type: 'contact', + id: 'abc123', + name: 'Test Contact', + }; +} + +function createMessage(overrides: Partial = {}): Message { + return { + id: 42, + type: 'PRIV', + conversation_key: 'abc123', + text: 'hello', + sender_timestamp: 1700000000, + received_at: 1700000001, + paths: null, + txt_type: 0, + signature: null, + outgoing: true, + acked: 0, + ...overrides, + }; +} + +function createDeferred() { + let resolve: (value: T | PromiseLike) => void = () => {}; + const promise = new Promise((res) => { + resolve = res; + }); + return { promise, resolve }; +} + +describe('useConversationMessages ACK ordering', () => { + beforeEach(() => { + mockGetMessages.mockReset(); + messageCache.clear(); + }); + + it('applies buffered ACK when message is added after ACK event', async () => { + mockGetMessages.mockResolvedValueOnce([]); + + const { result } = renderHook(() => useConversationMessages(createConversation())); + + await waitFor(() => expect(mockGetMessages).toHaveBeenCalledTimes(1)); + await waitFor(() => expect(result.current.messagesLoading).toBe(false)); + + const paths = [{ path: 'A1B2', received_at: 1700000010 }]; + act(() => { + result.current.updateMessageAck(42, 2, paths); + }); + + act(() => { + const added = result.current.addMessageIfNew(createMessage({ id: 42, acked: 0, paths: null })); + expect(added).toBe(true); + }); + + expect(result.current.messages).toHaveLength(1); + expect(result.current.messages[0].acked).toBe(2); + expect(result.current.messages[0].paths).toEqual(paths); + }); + + it('applies buffered ACK to message returned by in-flight fetch', async () => { + const deferred = createDeferred(); + mockGetMessages.mockReturnValueOnce(deferred.promise); + + const { result } = renderHook(() => useConversationMessages(createConversation())); + await waitFor(() => expect(mockGetMessages).toHaveBeenCalledTimes(1)); + + const paths = [{ path: 'C3D4', received_at: 1700000011 }]; + act(() => { + result.current.updateMessageAck(42, 1, paths); + }); + + deferred.resolve([createMessage({ id: 42, acked: 0, paths: null })]); + + await waitFor(() => expect(result.current.messages).toHaveLength(1)); + expect(result.current.messages[0].acked).toBe(1); + expect(result.current.messages[0].paths).toEqual(paths); + }); + + it('keeps highest ACK state when out-of-order ACK updates arrive', async () => { + mockGetMessages.mockResolvedValueOnce([]); + + const { result } = renderHook(() => useConversationMessages(createConversation())); + + await waitFor(() => expect(mockGetMessages).toHaveBeenCalledTimes(1)); + await waitFor(() => expect(result.current.messagesLoading).toBe(false)); + + act(() => { + result.current.addMessageIfNew(createMessage({ id: 42, acked: 0, paths: null })); + }); + + const highAckPaths = [ + { path: 'A1B2', received_at: 1700000010 }, + { path: 'A1C3', received_at: 1700000011 }, + ]; + const staleAckPaths = [{ path: 'A1B2', received_at: 1700000010 }]; + + act(() => { + result.current.updateMessageAck(42, 3, highAckPaths); + result.current.updateMessageAck(42, 2, staleAckPaths); + }); + + expect(result.current.messages[0].acked).toBe(3); + expect(result.current.messages[0].paths).toEqual(highAckPaths); + }); +});