mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
Fix ack/message race condition where out of sequence acks and messages would cause dropped acks
This commit is contained in:
@@ -5,6 +5,48 @@ import * as messageCache from '../messageCache';
|
||||
import type { Conversation, Message, MessagePath } from '../types';
|
||||
|
||||
const MESSAGE_PAGE_SIZE = 200;
|
||||
const MAX_PENDING_ACKS = 500;
|
||||
|
||||
interface PendingAckUpdate {
|
||||
ackCount: number;
|
||||
paths?: MessagePath[];
|
||||
}
|
||||
|
||||
function mergePendingAck(
|
||||
existing: PendingAckUpdate | undefined,
|
||||
ackCount: number,
|
||||
paths?: MessagePath[]
|
||||
): PendingAckUpdate {
|
||||
if (!existing) {
|
||||
return {
|
||||
ackCount,
|
||||
...(paths !== undefined && { paths }),
|
||||
};
|
||||
}
|
||||
|
||||
if (ackCount > existing.ackCount) {
|
||||
return {
|
||||
ackCount,
|
||||
...(paths !== undefined && { paths }),
|
||||
...(paths === undefined && existing.paths !== undefined && { paths: existing.paths }),
|
||||
};
|
||||
}
|
||||
|
||||
if (ackCount < existing.ackCount) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
if (paths === undefined) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
const existingPathCount = existing.paths?.length ?? -1;
|
||||
if (paths.length >= existingPathCount) {
|
||||
return { ackCount, paths };
|
||||
}
|
||||
|
||||
return existing;
|
||||
}
|
||||
|
||||
// Generate a key for deduplicating messages by content
|
||||
export function getMessageContentKey(msg: Message): string {
|
||||
@@ -34,6 +76,10 @@ export function useConversationMessages(
|
||||
// Track seen message content for deduplication
|
||||
const seenMessageContent = useRef<Set<string>>(new Set());
|
||||
|
||||
// ACK events can arrive before the corresponding message event/response.
|
||||
// Buffer latest ACK state by message_id and apply when the message arrives.
|
||||
const pendingAcksRef = useRef<Map<number, PendingAckUpdate>>(new Map());
|
||||
|
||||
// AbortController for cancelling in-flight requests on conversation change
|
||||
const abortControllerRef = useRef<AbortController | null>(null);
|
||||
|
||||
@@ -54,6 +100,35 @@ export function useConversationMessages(
|
||||
hasOlderMessagesRef.current = hasOlderMessages;
|
||||
}, [hasOlderMessages]);
|
||||
|
||||
const setPendingAck = useCallback((messageId: number, ackCount: number, paths?: MessagePath[]) => {
|
||||
const existing = pendingAcksRef.current.get(messageId);
|
||||
const merged = mergePendingAck(existing, ackCount, paths);
|
||||
|
||||
// Update insertion order so most recent updates remain in the buffer longest.
|
||||
pendingAcksRef.current.delete(messageId);
|
||||
pendingAcksRef.current.set(messageId, merged);
|
||||
|
||||
if (pendingAcksRef.current.size > MAX_PENDING_ACKS) {
|
||||
const oldestMessageId = pendingAcksRef.current.keys().next().value as number | undefined;
|
||||
if (oldestMessageId !== undefined) {
|
||||
pendingAcksRef.current.delete(oldestMessageId);
|
||||
}
|
||||
}
|
||||
}, []);
|
||||
|
||||
const applyPendingAck = useCallback((msg: Message): Message => {
|
||||
const pending = pendingAcksRef.current.get(msg.id);
|
||||
if (!pending) return msg;
|
||||
|
||||
pendingAcksRef.current.delete(msg.id);
|
||||
|
||||
return {
|
||||
...msg,
|
||||
acked: Math.max(msg.acked, pending.ackCount),
|
||||
...(pending.paths !== undefined && { paths: pending.paths }),
|
||||
};
|
||||
}, []);
|
||||
|
||||
// Fetch messages for active conversation
|
||||
// Note: This is called manually and from the useEffect. The useEffect handles
|
||||
// cancellation via AbortController; manual calls (e.g., after sending a message)
|
||||
@@ -91,14 +166,15 @@ export function useConversationMessages(
|
||||
return;
|
||||
}
|
||||
|
||||
setMessages(data);
|
||||
const messagesWithPendingAck = data.map((msg) => applyPendingAck(msg));
|
||||
setMessages(messagesWithPendingAck);
|
||||
// Track seen content for new messages
|
||||
seenMessageContent.current.clear();
|
||||
for (const msg of data) {
|
||||
for (const msg of messagesWithPendingAck) {
|
||||
seenMessageContent.current.add(getMessageContentKey(msg));
|
||||
}
|
||||
// If we got a full page, there might be more
|
||||
setHasOlderMessages(data.length >= MESSAGE_PAGE_SIZE);
|
||||
setHasOlderMessages(messagesWithPendingAck.length >= MESSAGE_PAGE_SIZE);
|
||||
} catch (err) {
|
||||
// Don't show error toast for aborted requests (user switched conversations)
|
||||
if (isAbortError(err)) {
|
||||
@@ -114,7 +190,7 @@ export function useConversationMessages(
|
||||
}
|
||||
}
|
||||
},
|
||||
[activeConversation]
|
||||
[activeConversation, applyPendingAck]
|
||||
);
|
||||
|
||||
// Fetch older messages (cursor-based pagination)
|
||||
@@ -148,17 +224,18 @@ export function useConversationMessages(
|
||||
before: oldestMessage.received_at,
|
||||
before_id: oldestMessage.id,
|
||||
});
|
||||
const dataWithPendingAck = data.map((msg) => applyPendingAck(msg));
|
||||
|
||||
if (data.length > 0) {
|
||||
if (dataWithPendingAck.length > 0) {
|
||||
// Prepend older messages (they come sorted DESC, so older are at the end)
|
||||
setMessages((prev) => [...prev, ...data]);
|
||||
setMessages((prev) => [...prev, ...dataWithPendingAck]);
|
||||
// Track seen content
|
||||
for (const msg of data) {
|
||||
for (const msg of dataWithPendingAck) {
|
||||
seenMessageContent.current.add(getMessageContentKey(msg));
|
||||
}
|
||||
}
|
||||
// If we got less than a full page, no more messages
|
||||
setHasOlderMessages(data.length >= MESSAGE_PAGE_SIZE);
|
||||
setHasOlderMessages(dataWithPendingAck.length >= MESSAGE_PAGE_SIZE);
|
||||
} catch (err) {
|
||||
console.error('Failed to fetch older messages:', err);
|
||||
toast.error('Failed to load older messages', {
|
||||
@@ -167,7 +244,7 @@ export function useConversationMessages(
|
||||
} finally {
|
||||
setLoadingOlder(false);
|
||||
}
|
||||
}, [activeConversation, loadingOlder, hasOlderMessages, messages]);
|
||||
}, [activeConversation, loadingOlder, hasOlderMessages, messages, applyPendingAck]);
|
||||
|
||||
// Background reconciliation: silently fetch from backend after a cache restore
|
||||
// and only update state if something differs (missed WS message, stale ack, etc.).
|
||||
@@ -187,7 +264,8 @@ export function useConversationMessages(
|
||||
// Stale check — conversation may have changed while awaiting
|
||||
if (fetchingConversationIdRef.current !== conversationId) return;
|
||||
|
||||
const merged = messageCache.reconcile(messagesRef.current, data);
|
||||
const dataWithPendingAck = data.map((msg) => applyPendingAck(msg));
|
||||
const merged = messageCache.reconcile(messagesRef.current, dataWithPendingAck);
|
||||
if (!merged) return; // Cache was consistent — no rerender
|
||||
|
||||
setMessages(merged);
|
||||
@@ -195,7 +273,7 @@ export function useConversationMessages(
|
||||
for (const msg of merged) {
|
||||
seenMessageContent.current.add(getMessageContentKey(msg));
|
||||
}
|
||||
if (data.length >= MESSAGE_PAGE_SIZE) {
|
||||
if (dataWithPendingAck.length >= MESSAGE_PAGE_SIZE) {
|
||||
setHasOlderMessages(true);
|
||||
}
|
||||
})
|
||||
@@ -268,7 +346,8 @@ export function useConversationMessages(
|
||||
// Add a message if it's new (deduplication)
|
||||
// Returns true if the message was added, false if it was a duplicate
|
||||
const addMessageIfNew = useCallback((msg: Message): boolean => {
|
||||
const contentKey = getMessageContentKey(msg);
|
||||
const msgWithPendingAck = applyPendingAck(msg);
|
||||
const contentKey = getMessageContentKey(msgWithPendingAck);
|
||||
if (seenMessageContent.current.has(contentKey)) {
|
||||
console.debug('Duplicate message content ignored:', contentKey.slice(0, 50));
|
||||
return false;
|
||||
@@ -282,33 +361,48 @@ export function useConversationMessages(
|
||||
}
|
||||
|
||||
setMessages((prev) => {
|
||||
if (prev.some((m) => m.id === msg.id)) {
|
||||
if (prev.some((m) => m.id === msgWithPendingAck.id)) {
|
||||
return prev;
|
||||
}
|
||||
return [...prev, msg];
|
||||
return [...prev, msgWithPendingAck];
|
||||
});
|
||||
|
||||
return true;
|
||||
}, []);
|
||||
}, [applyPendingAck]);
|
||||
|
||||
// Update a message's ack count and paths
|
||||
const updateMessageAck = useCallback(
|
||||
(messageId: number, ackCount: number, paths?: MessagePath[]) => {
|
||||
const hasMessageLoaded = messagesRef.current.some((m) => m.id === messageId);
|
||||
if (!hasMessageLoaded) {
|
||||
setPendingAck(messageId, ackCount, paths);
|
||||
return;
|
||||
}
|
||||
|
||||
// Message is loaded now, so any prior pending ACK for it is stale.
|
||||
pendingAcksRef.current.delete(messageId);
|
||||
|
||||
setMessages((prev) => {
|
||||
const idx = prev.findIndex((m) => m.id === messageId);
|
||||
if (idx >= 0) {
|
||||
const current = prev[idx];
|
||||
const nextAck = Math.max(current.acked, ackCount);
|
||||
const nextPaths =
|
||||
paths !== undefined && paths.length >= (current.paths?.length ?? 0) ? paths : current.paths;
|
||||
|
||||
const updated = [...prev];
|
||||
updated[idx] = {
|
||||
...prev[idx],
|
||||
acked: ackCount,
|
||||
...(paths !== undefined && { paths }),
|
||||
...current,
|
||||
acked: nextAck,
|
||||
...(paths !== undefined && { paths: nextPaths }),
|
||||
};
|
||||
return updated;
|
||||
}
|
||||
setPendingAck(messageId, ackCount, paths);
|
||||
return prev;
|
||||
});
|
||||
},
|
||||
[]
|
||||
[setPendingAck]
|
||||
);
|
||||
|
||||
return {
|
||||
|
||||
124
frontend/src/test/useConversationMessages.race.test.ts
Normal file
124
frontend/src/test/useConversationMessages.race.test.ts
Normal file
@@ -0,0 +1,124 @@
|
||||
import { act, renderHook, waitFor } from '@testing-library/react';
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import * as messageCache from '../messageCache';
|
||||
import { useConversationMessages } from '../hooks/useConversationMessages';
|
||||
import type { Conversation, Message } from '../types';
|
||||
|
||||
const mockGetMessages = vi.fn<(...args: unknown[]) => Promise<Message[]>>();
|
||||
|
||||
vi.mock('../api', () => ({
|
||||
api: {
|
||||
getMessages: (...args: unknown[]) => mockGetMessages(...args),
|
||||
},
|
||||
isAbortError: (err: unknown) => err instanceof DOMException && err.name === 'AbortError',
|
||||
}));
|
||||
|
||||
function createConversation(): Conversation {
|
||||
return {
|
||||
type: 'contact',
|
||||
id: 'abc123',
|
||||
name: 'Test Contact',
|
||||
};
|
||||
}
|
||||
|
||||
function createMessage(overrides: Partial<Message> = {}): Message {
|
||||
return {
|
||||
id: 42,
|
||||
type: 'PRIV',
|
||||
conversation_key: 'abc123',
|
||||
text: 'hello',
|
||||
sender_timestamp: 1700000000,
|
||||
received_at: 1700000001,
|
||||
paths: null,
|
||||
txt_type: 0,
|
||||
signature: null,
|
||||
outgoing: true,
|
||||
acked: 0,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function createDeferred<T>() {
|
||||
let resolve: (value: T | PromiseLike<T>) => void = () => {};
|
||||
const promise = new Promise<T>((res) => {
|
||||
resolve = res;
|
||||
});
|
||||
return { promise, resolve };
|
||||
}
|
||||
|
||||
describe('useConversationMessages ACK ordering', () => {
|
||||
beforeEach(() => {
|
||||
mockGetMessages.mockReset();
|
||||
messageCache.clear();
|
||||
});
|
||||
|
||||
it('applies buffered ACK when message is added after ACK event', async () => {
|
||||
mockGetMessages.mockResolvedValueOnce([]);
|
||||
|
||||
const { result } = renderHook(() => useConversationMessages(createConversation()));
|
||||
|
||||
await waitFor(() => expect(mockGetMessages).toHaveBeenCalledTimes(1));
|
||||
await waitFor(() => expect(result.current.messagesLoading).toBe(false));
|
||||
|
||||
const paths = [{ path: 'A1B2', received_at: 1700000010 }];
|
||||
act(() => {
|
||||
result.current.updateMessageAck(42, 2, paths);
|
||||
});
|
||||
|
||||
act(() => {
|
||||
const added = result.current.addMessageIfNew(createMessage({ id: 42, acked: 0, paths: null }));
|
||||
expect(added).toBe(true);
|
||||
});
|
||||
|
||||
expect(result.current.messages).toHaveLength(1);
|
||||
expect(result.current.messages[0].acked).toBe(2);
|
||||
expect(result.current.messages[0].paths).toEqual(paths);
|
||||
});
|
||||
|
||||
it('applies buffered ACK to message returned by in-flight fetch', async () => {
|
||||
const deferred = createDeferred<Message[]>();
|
||||
mockGetMessages.mockReturnValueOnce(deferred.promise);
|
||||
|
||||
const { result } = renderHook(() => useConversationMessages(createConversation()));
|
||||
await waitFor(() => expect(mockGetMessages).toHaveBeenCalledTimes(1));
|
||||
|
||||
const paths = [{ path: 'C3D4', received_at: 1700000011 }];
|
||||
act(() => {
|
||||
result.current.updateMessageAck(42, 1, paths);
|
||||
});
|
||||
|
||||
deferred.resolve([createMessage({ id: 42, acked: 0, paths: null })]);
|
||||
|
||||
await waitFor(() => expect(result.current.messages).toHaveLength(1));
|
||||
expect(result.current.messages[0].acked).toBe(1);
|
||||
expect(result.current.messages[0].paths).toEqual(paths);
|
||||
});
|
||||
|
||||
it('keeps highest ACK state when out-of-order ACK updates arrive', async () => {
|
||||
mockGetMessages.mockResolvedValueOnce([]);
|
||||
|
||||
const { result } = renderHook(() => useConversationMessages(createConversation()));
|
||||
|
||||
await waitFor(() => expect(mockGetMessages).toHaveBeenCalledTimes(1));
|
||||
await waitFor(() => expect(result.current.messagesLoading).toBe(false));
|
||||
|
||||
act(() => {
|
||||
result.current.addMessageIfNew(createMessage({ id: 42, acked: 0, paths: null }));
|
||||
});
|
||||
|
||||
const highAckPaths = [
|
||||
{ path: 'A1B2', received_at: 1700000010 },
|
||||
{ path: 'A1C3', received_at: 1700000011 },
|
||||
];
|
||||
const staleAckPaths = [{ path: 'A1B2', received_at: 1700000010 }];
|
||||
|
||||
act(() => {
|
||||
result.current.updateMessageAck(42, 3, highAckPaths);
|
||||
result.current.updateMessageAck(42, 2, staleAckPaths);
|
||||
});
|
||||
|
||||
expect(result.current.messages[0].acked).toBe(3);
|
||||
expect(result.current.messages[0].paths).toEqual(highAckPaths);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user