Move to a single source of truth (SSoT) for message dedup to prevent phantom unreads

This commit is contained in:
Jack Kingsman
2026-02-23 19:52:42 -08:00
parent 1bd31d68d9
commit 7306627ac7
4 changed files with 144 additions and 44 deletions

View File

@@ -57,8 +57,6 @@ const MAX_RAW_PACKETS = 500;
export function App() {
const messageInputRef = useRef<MessageInputHandle>(null);
// Track seen message content to prevent duplicate unread increments
const seenMessageContentRef = useRef<Set<string>>(new Set());
const [rawPackets, setRawPackets] = useState<RawPacket[]>([]);
const [showNewMessage, setShowNewMessage] = useState(false);
const [showSettings, setShowSettings] = useState(false);
@@ -248,25 +246,13 @@ export function App() {
const contentKey = getMessageContentKey(msg);
// Dedup: check if we've already seen this content BEFORE marking it seen.
const alreadySeen = !msg.outgoing && seenMessageContentRef.current.has(contentKey);
if (!msg.outgoing) {
seenMessageContentRef.current.add(contentKey);
// Limit set size to prevent memory issues
if (seenMessageContentRef.current.size > 1000) {
const keys = Array.from(seenMessageContentRef.current);
seenMessageContentRef.current = new Set(keys.slice(-500));
}
}
// For non-active conversations: update cache and count unreads
if (!isForActiveConversation) {
// Update message cache (instant restore on switch)
messageCache.addMessage(msg.conversation_key, msg, contentKey);
// Update message cache (instant restore on switch) — returns true if new
const isNew = messageCache.addMessage(msg.conversation_key, msg, contentKey);
// Count unread for incoming messages (skip duplicates from multiple mesh paths)
if (!msg.outgoing && !alreadySeen) {
if (!msg.outgoing && isNew) {
let stateKey: string | null = null;
if (msg.type === 'CHAN' && msg.conversation_key) {
stateKey = getStateKey('channel', msg.conversation_key);

View File

@@ -49,12 +49,25 @@ export function set(id: string, entry: CacheEntry): void {
}
}
/** Add a message to a cached (non-active) conversation with dedup. */
export function addMessage(id: string, msg: Message, contentKey: string): void {
/** Add a message to a cached conversation with dedup. Returns true if new, false if duplicate. */
export function addMessage(id: string, msg: Message, contentKey: string): boolean {
const entry = cache.get(id);
if (!entry) return;
if (entry.seenContent.has(contentKey)) return;
if (entry.messages.some((m) => m.id === msg.id)) return;
if (!entry) {
// Auto-create a minimal entry for never-visited conversations
cache.set(id, {
messages: [msg],
seenContent: new Set([contentKey]),
hasOlderMessages: true,
});
// Evict LRU if over capacity
if (cache.size > MAX_CACHED_CONVERSATIONS) {
const lruKey = cache.keys().next().value as string;
cache.delete(lruKey);
}
return true;
}
if (entry.seenContent.has(contentKey)) return false;
if (entry.messages.some((m) => m.id === msg.id)) return false;
entry.seenContent.add(contentKey);
entry.messages = [...entry.messages, msg];
// Trim if over limit (drop oldest by received_at)
@@ -63,6 +76,7 @@ export function addMessage(id: string, msg: Message, contentKey: string): void {
.sort((a, b) => b.received_at - a.received_at)
.slice(0, MAX_MESSAGES_PER_ENTRY);
}
return true;
}
/** Scan all cached entries for a message ID and update its ack/paths. */

View File

@@ -8,10 +8,11 @@
* between backend and frontend - both sides test against the same data.
*/
import { describe, it, expect } from 'vitest';
import { describe, it, expect, beforeEach } from 'vitest';
import fixtures from './fixtures/websocket_events.json';
import { getMessageContentKey } from '../hooks/useConversationMessages';
import { getStateKey } from '../utils/conversationState';
import * as messageCache from '../messageCache';
import type { Message, Contact, Channel } from '../types';
/**
@@ -24,7 +25,8 @@ interface MockState {
channels: Channel[];
unreadCounts: Record<string, number>;
lastMessageTimes: Record<string, number>;
seenMessageContent: Set<string>;
/** Active-conversation dedup (mirrors useConversationMessages internal set) */
seenActiveContent: Set<string>;
}
function createMockState(): MockState {
@@ -34,13 +36,16 @@ function createMockState(): MockState {
channels: [],
unreadCounts: {},
lastMessageTimes: {},
seenMessageContent: new Set(),
seenActiveContent: new Set(),
};
}
/**
* Simulate handling a message WebSocket event.
* Mirrors the logic in App.tsx onMessage handler.
*
* Non-active conversation dedup uses messageCache.addMessage (single source of truth).
* Active conversation dedup uses seenActiveContent (mirrors useConversationMessages).
*/
function handleMessageEvent(
state: MockState,
@@ -57,8 +62,8 @@ function handleMessageEvent(
// Add to messages if for active conversation (with deduplication)
if (isForActiveConversation) {
if (!state.seenMessageContent.has(contentKey)) {
state.seenMessageContent.add(contentKey);
if (!state.seenActiveContent.has(contentKey)) {
state.seenActiveContent.add(contentKey);
state.messages.push(msg);
added = true;
}
@@ -73,10 +78,10 @@ function handleMessageEvent(
state.lastMessageTimes[stateKey] = msg.received_at;
// Increment unread if not for active conversation and not outgoing
if (!msg.outgoing && !isForActiveConversation) {
// Deduplicate by content
if (!state.seenMessageContent.has(contentKey)) {
state.seenMessageContent.add(contentKey);
// Uses messageCache.addMessage as single source of truth for dedup
if (!isForActiveConversation) {
const isNew = messageCache.addMessage(msg.conversation_key, msg, contentKey);
if (!msg.outgoing && isNew) {
state.unreadCounts[stateKey] = (state.unreadCounts[stateKey] || 0) + 1;
unreadIncremented = true;
}
@@ -111,6 +116,11 @@ function handleMessageAckedEvent(state: MockState, messageId: number, ackCount:
return false;
}
// Clear messageCache between tests to avoid cross-test contamination
beforeEach(() => {
messageCache.clear();
});
describe('Integration: Channel Message Events', () => {
const fixture = fixtures.channel_message;
@@ -317,6 +327,61 @@ describe('Integration: ACK Events', () => {
});
});
describe('Integration: No phantom unreads from mesh echoes (hitlist #8 regression)', () => {
it('does not increment unread when a mesh echo arrives after many unique messages', () => {
const state = createMockState();
const convKey = 'channel_busy';
// Deliver 1001 unique messages — exceeding the old global
// seenMessageContentRef prune threshold (1000→500). Under the old
// dual-set design the global set would drop msg-0's key during pruning,
// so a later mesh echo of msg-0 would pass the global check and
// phantom-increment unread. With the fix, messageCache's per-conversation
// seenContent is the single source of truth and is never pruned.
const MESSAGE_COUNT = 1001;
for (let i = 0; i < MESSAGE_COUNT; i++) {
const msg: Message = {
id: i,
type: 'CHAN',
conversation_key: convKey,
text: `msg-${i}`,
sender_timestamp: 1700000000 + i,
received_at: 1700000000 + i,
paths: null,
txt_type: 0,
signature: null,
outgoing: false,
acked: 0,
};
handleMessageEvent(state, msg, 'other_active_conv');
}
const stateKey = getStateKey('channel', convKey);
expect(state.unreadCounts[stateKey]).toBe(MESSAGE_COUNT);
// Now a mesh echo of msg-0 arrives (same content, different id).
// msg-0's key would have been evicted by the old 1000→500 prune.
const echo: Message = {
id: 9999,
type: 'CHAN',
conversation_key: convKey,
text: 'msg-0',
sender_timestamp: 1700000000, // same sender_timestamp as original
received_at: 1700001000,
paths: null,
txt_type: 0,
signature: null,
outgoing: false,
acked: 0,
};
const result = handleMessageEvent(state, echo, 'other_active_conv');
// Must NOT increment unread — the echo is a duplicate
expect(result.unreadIncremented).toBe(false);
expect(state.unreadCounts[stateKey]).toBe(MESSAGE_COUNT);
});
});
describe('Integration: Message Content Key Contract', () => {
it('generates consistent keys for deduplication', () => {
const msg = fixtures.channel_message.expected_ws_event.data as unknown as Message;

View File

@@ -150,38 +150,45 @@ describe('messageCache', () => {
});
describe('addMessage', () => {
it('adds message to existing cached conversation', () => {
it('adds message to existing cached conversation and returns true', () => {
messageCache.set('conv1', createEntry([]));
const msg = createMessage({ id: 10, text: 'New message' });
messageCache.addMessage('conv1', msg, 'CHAN-channel123-New message-1700000000');
const result = messageCache.addMessage(
'conv1',
msg,
'CHAN-channel123-New message-1700000000'
);
expect(result).toBe(true);
const entry = messageCache.get('conv1');
expect(entry!.messages).toHaveLength(1);
expect(entry!.messages[0].text).toBe('New message');
});
it('deduplicates by content key', () => {
it('deduplicates by content key and returns false', () => {
messageCache.set('conv1', createEntry([]));
const msg1 = createMessage({ id: 10, text: 'Hello' });
const contentKey = 'CHAN-channel123-Hello-1700000000';
messageCache.addMessage('conv1', msg1, contentKey);
expect(messageCache.addMessage('conv1', msg1, contentKey)).toBe(true);
// Same content key, different message id
const msg2 = createMessage({ id: 11, text: 'Hello' });
messageCache.addMessage('conv1', msg2, contentKey);
expect(messageCache.addMessage('conv1', msg2, contentKey)).toBe(false);
const entry = messageCache.get('conv1');
expect(entry!.messages).toHaveLength(1);
});
it('deduplicates by message id', () => {
it('deduplicates by message id and returns false', () => {
messageCache.set('conv1', createEntry([createMessage({ id: 10, text: 'Original' })]));
// Same id, different content key
const msg = createMessage({ id: 10, text: 'Different' });
messageCache.addMessage('conv1', msg, 'CHAN-channel123-Different-1700000000');
expect(messageCache.addMessage('conv1', msg, 'CHAN-channel123-Different-1700000000')).toBe(
false
);
const entry = messageCache.get('conv1');
expect(entry!.messages).toHaveLength(1);
@@ -200,8 +207,13 @@ describe('messageCache', () => {
text: 'newest',
received_at: 1700000000 + MAX_MESSAGES_PER_ENTRY,
});
messageCache.addMessage('conv1', newMsg, `CHAN-channel123-newest-${newMsg.sender_timestamp}`);
const result = messageCache.addMessage(
'conv1',
newMsg,
`CHAN-channel123-newest-${newMsg.sender_timestamp}`
);
expect(result).toBe(true);
const entry = messageCache.get('conv1');
expect(entry!.messages).toHaveLength(MAX_MESSAGES_PER_ENTRY);
// Newest message should be kept
@@ -210,11 +222,34 @@ describe('messageCache', () => {
expect(entry!.messages.some((m) => m.id === 0)).toBe(false);
});
it('ignores messages for non-cached conversations', () => {
const msg = createMessage({ id: 10 });
// Should not throw
messageCache.addMessage('nonexistent', msg, 'key');
expect(messageCache.size()).toBe(0);
it('auto-creates a minimal entry for never-visited conversations and returns true', () => {
const msg = createMessage({ id: 10, text: 'First contact' });
const result = messageCache.addMessage(
'new_conv',
msg,
'CHAN-channel123-First contact-1700000000'
);
expect(result).toBe(true);
expect(messageCache.size()).toBe(1);
const entry = messageCache.get('new_conv');
expect(entry).toBeDefined();
expect(entry!.messages).toHaveLength(1);
expect(entry!.messages[0].text).toBe('First contact');
expect(entry!.hasOlderMessages).toBe(true);
expect(entry!.seenContent.has('CHAN-channel123-First contact-1700000000')).toBe(true);
});
it('returns false for duplicate delivery to auto-created entry', () => {
const msg = createMessage({ id: 10, text: 'Echo' });
const contentKey = 'CHAN-channel123-Echo-1700000000';
expect(messageCache.addMessage('new_conv', msg, contentKey)).toBe(true);
// Duplicate via mesh echo
expect(messageCache.addMessage('new_conv', msg, contentKey)).toBe(false);
const entry = messageCache.get('new_conv');
expect(entry!.messages).toHaveLength(1);
});
});