From 3a1c2d691b763adb1888eb79aa411d9de421a793 Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Mon, 30 Mar 2026 20:49:09 -0700 Subject: [PATCH] Misc. bug bash --- app/event_handlers.py | 2 -- app/radio_sync.py | 20 ++++++++++++++-- app/websocket.py | 3 --- frontend/src/App.tsx | 2 ++ frontend/src/components/SearchView.tsx | 6 ++++- frontend/src/hooks/useRealtimeAppState.ts | 5 ++++ frontend/src/hooks/useUnreadCounts.ts | 23 +++++++++++++++++++ frontend/src/test/useRealtimeAppState.test.ts | 1 + tests/test_event_handlers.py | 3 +-- 9 files changed, 55 insertions(+), 10 deletions(-) diff --git a/app/event_handlers.py b/app/event_handlers.py index 7a96f80..eaf5c43 100644 --- a/app/event_handlers.py +++ b/app/event_handlers.py @@ -30,8 +30,6 @@ logger = logging.getLogger(__name__) # Track active subscriptions so we can unsubscribe before re-registering # This prevents handler duplication after reconnects _active_subscriptions: list["Subscription"] = [] -_pending_acks = dm_ack_tracker._pending_acks -_buffered_acks = dm_ack_tracker._buffered_acks def track_pending_ack(expected_ack: str, message_id: int, timeout_ms: int) -> bool: diff --git a/app/radio_sync.py b/app/radio_sync.py index eb488c7..8f732ee 100644 --- a/app/radio_sync.py +++ b/app/radio_sync.py @@ -20,7 +20,7 @@ from meshcore import EventType, MeshCore from app.channel_constants import PUBLIC_CHANNEL_KEY, PUBLIC_CHANNEL_NAME from app.config import settings -from app.event_handlers import cleanup_expired_acks +from app.event_handlers import cleanup_expired_acks, on_contact_message from app.models import Contact, ContactUpsert from app.radio import RadioOperationBusyError from app.repository import ( @@ -379,6 +379,14 @@ async def _resolve_channel_for_pending_message( return cached_key, channel.name if channel else None +async def _store_pending_direct_message(event) -> None: + """Route a CONTACT_MSG_RECV event pulled via get_msg() through the DM ingest path.""" + try: + await on_contact_message(event) + except Exception: + logger.warning("Failed to store pending direct message", exc_info=True) + + async def _store_pending_channel_message(mc: MeshCore, payload: dict) -> None: """Persist a CHANNEL_MSG_RECV event pulled via get_msg().""" channel_idx = payload.get("channel_idx") @@ -403,7 +411,8 @@ async def _store_pending_channel_message(mc: MeshCore, payload: dict) -> None: return received_at = int(time.time()) - sender_timestamp = payload.get("sender_timestamp") or received_at + ts = payload.get("sender_timestamp") + sender_timestamp = ts if ts is not None else received_at sender_name, message_text = _split_channel_sender_and_text(payload.get("text", "")) await create_fallback_channel_message( @@ -488,6 +497,8 @@ async def drain_pending_messages(mc: MeshCore) -> int: elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV): if result.type == EventType.CHANNEL_MSG_RECV: await _store_pending_channel_message(mc, result.payload) + elif result.type == EventType.CONTACT_MSG_RECV: + await _store_pending_direct_message(result) count += 1 # Small delay between fetches @@ -525,6 +536,8 @@ async def poll_for_messages(mc: MeshCore) -> int: elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV): if result.type == EventType.CHANNEL_MSG_RECV: await _store_pending_channel_message(mc, result.payload) + elif result.type == EventType.CONTACT_MSG_RECV: + await _store_pending_direct_message(result) count += 1 # If we got a message, there might be more - drain them count += await drain_pending_messages(mc) @@ -1018,6 +1031,7 @@ _last_contact_sync: float = 0.0 CONTACT_SYNC_THROTTLE_SECONDS = 30 # Don't sync more than once per 30 seconds CONTACT_RECONCILE_BATCH_SIZE = 2 CONTACT_RECONCILE_YIELD_SECONDS = 0.05 +CONTACT_RECONCILE_BUSY_BACKOFF_SECONDS = 2.0 def _evict_removed_contact_from_library_cache(mc: MeshCore, public_key: str) -> None: @@ -1227,6 +1241,8 @@ async def _reconcile_radio_contacts_in_background( ) except RadioOperationBusyError: logger.debug("Background contact reconcile yielding: radio busy") + await asyncio.sleep(CONTACT_RECONCILE_BUSY_BACKOFF_SECONDS) + continue await asyncio.sleep(CONTACT_RECONCILE_YIELD_SECONDS) if not progressed: diff --git a/app/websocket.py b/app/websocket.py index 27ebdb0..81e67c8 100644 --- a/app/websocket.py +++ b/app/websocket.py @@ -43,9 +43,6 @@ class WebSocketManager: 3. Send to all clients concurrently with timeout 4. Re-acquire lock to clean up disconnected clients """ - if not self.active_connections: - return - message = dump_ws_event(event_type, data) # Copy connection list under lock to avoid holding lock during I/O diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index b11009a..6d50afb 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -274,6 +274,7 @@ export function App() { unreadLastReadAts, recordMessageEvent, renameConversationState, + removeConversationState, markAllRead, refreshUnreads, } = useUnreadCounts(channels, contacts, activeConversation); @@ -349,6 +350,7 @@ export function App() { observeMessage, recordMessageEvent, renameConversationState, + removeConversationState, checkMention, pendingDeleteFallbackRef, setActiveConversation, diff --git a/frontend/src/components/SearchView.tsx b/frontend/src/components/SearchView.tsx index e297a51..7932996 100644 --- a/frontend/src/components/SearchView.tsx +++ b/frontend/src/components/SearchView.tsx @@ -174,7 +174,11 @@ export function SearchView({ api .getMessages({ q: debouncedQuery, limit: SEARCH_PAGE_SIZE, offset }, controller.signal) .then((data) => { - setResults((prev) => [...prev, ...(data as SearchResult[])]); + setResults((prev) => { + const existingIds = new Set(prev.map((r) => r.id)); + const unique = (data as SearchResult[]).filter((r) => !existingIds.has(r.id)); + return [...prev, ...unique]; + }); setHasMore(data.length >= SEARCH_PAGE_SIZE); setOffset((prev) => prev + data.length); }) diff --git a/frontend/src/hooks/useRealtimeAppState.ts b/frontend/src/hooks/useRealtimeAppState.ts index cd1c843..724abb3 100644 --- a/frontend/src/hooks/useRealtimeAppState.ts +++ b/frontend/src/hooks/useRealtimeAppState.ts @@ -43,6 +43,7 @@ interface UseRealtimeAppStateArgs { hasMention?: boolean; }) => void; renameConversationState: (oldStateKey: string, newStateKey: string) => void; + removeConversationState: (stateKey: string) => void; checkMention: (text: string) => boolean; pendingDeleteFallbackRef: MutableRefObject; setActiveConversation: (conv: Conversation | null) => void; @@ -96,6 +97,7 @@ export function useRealtimeAppState({ observeMessage, recordMessageEvent, renameConversationState, + removeConversationState, checkMention, pendingDeleteFallbackRef, setActiveConversation, @@ -232,6 +234,7 @@ export function useRealtimeAppState({ onContactDeleted: (publicKey: string) => { setContacts((prev) => prev.filter((c) => c.public_key !== publicKey)); removeConversationMessages(publicKey); + removeConversationState(getStateKey('contact', publicKey)); const active = activeConversationRef.current; if (active?.type === 'contact' && active.id === publicKey) { pendingDeleteFallbackRef.current = true; @@ -241,6 +244,7 @@ export function useRealtimeAppState({ onChannelDeleted: (key: string) => { setChannels((prev) => prev.filter((c) => c.key !== key)); removeConversationMessages(key); + removeConversationState(getStateKey('channel', key)); const active = activeConversationRef.current; if (active?.type === 'channel' && active.id === key) { pendingDeleteFallbackRef.current = true; @@ -267,6 +271,7 @@ export function useRealtimeAppState({ checkMention, fetchAllContacts, fetchConfig, + removeConversationState, renameConversationState, renameConversationMessages, maxRawPackets, diff --git a/frontend/src/hooks/useUnreadCounts.ts b/frontend/src/hooks/useUnreadCounts.ts index 623deb4..97f3382 100644 --- a/frontend/src/hooks/useUnreadCounts.ts +++ b/frontend/src/hooks/useUnreadCounts.ts @@ -23,6 +23,7 @@ interface UseUnreadCountsResult { hasMention?: boolean; }) => void; renameConversationState: (oldStateKey: string, newStateKey: string) => void; + removeConversationState: (stateKey: string) => void; markAllRead: () => void; refreshUnreads: () => Promise; } @@ -235,6 +236,27 @@ export function useUnreadCounts( setLastMessageTimes(renameConversationTimeKey(oldStateKey, newStateKey)); }, []); + const removeConversationState = useCallback((stateKey: string) => { + setUnreadCounts((prev) => { + if (!(stateKey in prev)) return prev; + const next = { ...prev }; + delete next[stateKey]; + return next; + }); + setMentions((prev) => { + if (!(stateKey in prev)) return prev; + const next = { ...prev }; + delete next[stateKey]; + return next; + }); + setUnreadLastReadAts((prev) => { + if (!(stateKey in prev)) return prev; + const next = { ...prev }; + delete next[stateKey]; + return next; + }); + }, []); + // Mark all conversations as read // Calls single bulk API endpoint to persist read state const markAllRead = useCallback(() => { @@ -256,6 +278,7 @@ export function useUnreadCounts( unreadLastReadAts, recordMessageEvent, renameConversationState, + removeConversationState, markAllRead, refreshUnreads: fetchUnreads, }; diff --git a/frontend/src/test/useRealtimeAppState.test.ts b/frontend/src/test/useRealtimeAppState.test.ts index 877fa89..05267f1 100644 --- a/frontend/src/test/useRealtimeAppState.test.ts +++ b/frontend/src/test/useRealtimeAppState.test.ts @@ -69,6 +69,7 @@ function createRealtimeArgs(overrides: Partial ({ added: false, activeConversation: false })), recordMessageEvent: vi.fn(), renameConversationState: vi.fn(), + removeConversationState: vi.fn(), checkMention: vi.fn(() => false), pendingDeleteFallbackRef: { current: false }, setActiveConversation: vi.fn(), diff --git a/tests/test_event_handlers.py b/tests/test_event_handlers.py index 7360bd6..5de10b0 100644 --- a/tests/test_event_handlers.py +++ b/tests/test_event_handlers.py @@ -11,8 +11,6 @@ import pytest from app.event_handlers import ( _active_subscriptions, - _buffered_acks, - _pending_acks, cleanup_expired_acks, register_event_handlers, track_pending_ack, @@ -23,6 +21,7 @@ from app.repository import ( ContactRepository, MessageRepository, ) +from app.services.dm_ack_tracker import _buffered_acks, _pending_acks @pytest.fixture(autouse=True)