mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-05-03 20:13:00 +02:00
Misc. bug bash
This commit is contained in:
@@ -30,8 +30,6 @@ logger = logging.getLogger(__name__)
|
||||
# Track active subscriptions so we can unsubscribe before re-registering
|
||||
# This prevents handler duplication after reconnects
|
||||
_active_subscriptions: list["Subscription"] = []
|
||||
_pending_acks = dm_ack_tracker._pending_acks
|
||||
_buffered_acks = dm_ack_tracker._buffered_acks
|
||||
|
||||
|
||||
def track_pending_ack(expected_ack: str, message_id: int, timeout_ms: int) -> bool:
|
||||
|
||||
@@ -20,7 +20,7 @@ from meshcore import EventType, MeshCore
|
||||
|
||||
from app.channel_constants import PUBLIC_CHANNEL_KEY, PUBLIC_CHANNEL_NAME
|
||||
from app.config import settings
|
||||
from app.event_handlers import cleanup_expired_acks
|
||||
from app.event_handlers import cleanup_expired_acks, on_contact_message
|
||||
from app.models import Contact, ContactUpsert
|
||||
from app.radio import RadioOperationBusyError
|
||||
from app.repository import (
|
||||
@@ -379,6 +379,14 @@ async def _resolve_channel_for_pending_message(
|
||||
return cached_key, channel.name if channel else None
|
||||
|
||||
|
||||
async def _store_pending_direct_message(event) -> None:
|
||||
"""Route a CONTACT_MSG_RECV event pulled via get_msg() through the DM ingest path."""
|
||||
try:
|
||||
await on_contact_message(event)
|
||||
except Exception:
|
||||
logger.warning("Failed to store pending direct message", exc_info=True)
|
||||
|
||||
|
||||
async def _store_pending_channel_message(mc: MeshCore, payload: dict) -> None:
|
||||
"""Persist a CHANNEL_MSG_RECV event pulled via get_msg()."""
|
||||
channel_idx = payload.get("channel_idx")
|
||||
@@ -403,7 +411,8 @@ async def _store_pending_channel_message(mc: MeshCore, payload: dict) -> None:
|
||||
return
|
||||
|
||||
received_at = int(time.time())
|
||||
sender_timestamp = payload.get("sender_timestamp") or received_at
|
||||
ts = payload.get("sender_timestamp")
|
||||
sender_timestamp = ts if ts is not None else received_at
|
||||
sender_name, message_text = _split_channel_sender_and_text(payload.get("text", ""))
|
||||
|
||||
await create_fallback_channel_message(
|
||||
@@ -488,6 +497,8 @@ async def drain_pending_messages(mc: MeshCore) -> int:
|
||||
elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV):
|
||||
if result.type == EventType.CHANNEL_MSG_RECV:
|
||||
await _store_pending_channel_message(mc, result.payload)
|
||||
elif result.type == EventType.CONTACT_MSG_RECV:
|
||||
await _store_pending_direct_message(result)
|
||||
count += 1
|
||||
|
||||
# Small delay between fetches
|
||||
@@ -525,6 +536,8 @@ async def poll_for_messages(mc: MeshCore) -> int:
|
||||
elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV):
|
||||
if result.type == EventType.CHANNEL_MSG_RECV:
|
||||
await _store_pending_channel_message(mc, result.payload)
|
||||
elif result.type == EventType.CONTACT_MSG_RECV:
|
||||
await _store_pending_direct_message(result)
|
||||
count += 1
|
||||
# If we got a message, there might be more - drain them
|
||||
count += await drain_pending_messages(mc)
|
||||
@@ -1018,6 +1031,7 @@ _last_contact_sync: float = 0.0
|
||||
CONTACT_SYNC_THROTTLE_SECONDS = 30 # Don't sync more than once per 30 seconds
|
||||
CONTACT_RECONCILE_BATCH_SIZE = 2
|
||||
CONTACT_RECONCILE_YIELD_SECONDS = 0.05
|
||||
CONTACT_RECONCILE_BUSY_BACKOFF_SECONDS = 2.0
|
||||
|
||||
|
||||
def _evict_removed_contact_from_library_cache(mc: MeshCore, public_key: str) -> None:
|
||||
@@ -1227,6 +1241,8 @@ async def _reconcile_radio_contacts_in_background(
|
||||
)
|
||||
except RadioOperationBusyError:
|
||||
logger.debug("Background contact reconcile yielding: radio busy")
|
||||
await asyncio.sleep(CONTACT_RECONCILE_BUSY_BACKOFF_SECONDS)
|
||||
continue
|
||||
|
||||
await asyncio.sleep(CONTACT_RECONCILE_YIELD_SECONDS)
|
||||
if not progressed:
|
||||
|
||||
@@ -43,9 +43,6 @@ class WebSocketManager:
|
||||
3. Send to all clients concurrently with timeout
|
||||
4. Re-acquire lock to clean up disconnected clients
|
||||
"""
|
||||
if not self.active_connections:
|
||||
return
|
||||
|
||||
message = dump_ws_event(event_type, data)
|
||||
|
||||
# Copy connection list under lock to avoid holding lock during I/O
|
||||
|
||||
@@ -274,6 +274,7 @@ export function App() {
|
||||
unreadLastReadAts,
|
||||
recordMessageEvent,
|
||||
renameConversationState,
|
||||
removeConversationState,
|
||||
markAllRead,
|
||||
refreshUnreads,
|
||||
} = useUnreadCounts(channels, contacts, activeConversation);
|
||||
@@ -349,6 +350,7 @@ export function App() {
|
||||
observeMessage,
|
||||
recordMessageEvent,
|
||||
renameConversationState,
|
||||
removeConversationState,
|
||||
checkMention,
|
||||
pendingDeleteFallbackRef,
|
||||
setActiveConversation,
|
||||
|
||||
@@ -174,7 +174,11 @@ export function SearchView({
|
||||
api
|
||||
.getMessages({ q: debouncedQuery, limit: SEARCH_PAGE_SIZE, offset }, controller.signal)
|
||||
.then((data) => {
|
||||
setResults((prev) => [...prev, ...(data as SearchResult[])]);
|
||||
setResults((prev) => {
|
||||
const existingIds = new Set(prev.map((r) => r.id));
|
||||
const unique = (data as SearchResult[]).filter((r) => !existingIds.has(r.id));
|
||||
return [...prev, ...unique];
|
||||
});
|
||||
setHasMore(data.length >= SEARCH_PAGE_SIZE);
|
||||
setOffset((prev) => prev + data.length);
|
||||
})
|
||||
|
||||
@@ -43,6 +43,7 @@ interface UseRealtimeAppStateArgs {
|
||||
hasMention?: boolean;
|
||||
}) => void;
|
||||
renameConversationState: (oldStateKey: string, newStateKey: string) => void;
|
||||
removeConversationState: (stateKey: string) => void;
|
||||
checkMention: (text: string) => boolean;
|
||||
pendingDeleteFallbackRef: MutableRefObject<boolean>;
|
||||
setActiveConversation: (conv: Conversation | null) => void;
|
||||
@@ -96,6 +97,7 @@ export function useRealtimeAppState({
|
||||
observeMessage,
|
||||
recordMessageEvent,
|
||||
renameConversationState,
|
||||
removeConversationState,
|
||||
checkMention,
|
||||
pendingDeleteFallbackRef,
|
||||
setActiveConversation,
|
||||
@@ -232,6 +234,7 @@ export function useRealtimeAppState({
|
||||
onContactDeleted: (publicKey: string) => {
|
||||
setContacts((prev) => prev.filter((c) => c.public_key !== publicKey));
|
||||
removeConversationMessages(publicKey);
|
||||
removeConversationState(getStateKey('contact', publicKey));
|
||||
const active = activeConversationRef.current;
|
||||
if (active?.type === 'contact' && active.id === publicKey) {
|
||||
pendingDeleteFallbackRef.current = true;
|
||||
@@ -241,6 +244,7 @@ export function useRealtimeAppState({
|
||||
onChannelDeleted: (key: string) => {
|
||||
setChannels((prev) => prev.filter((c) => c.key !== key));
|
||||
removeConversationMessages(key);
|
||||
removeConversationState(getStateKey('channel', key));
|
||||
const active = activeConversationRef.current;
|
||||
if (active?.type === 'channel' && active.id === key) {
|
||||
pendingDeleteFallbackRef.current = true;
|
||||
@@ -267,6 +271,7 @@ export function useRealtimeAppState({
|
||||
checkMention,
|
||||
fetchAllContacts,
|
||||
fetchConfig,
|
||||
removeConversationState,
|
||||
renameConversationState,
|
||||
renameConversationMessages,
|
||||
maxRawPackets,
|
||||
|
||||
@@ -23,6 +23,7 @@ interface UseUnreadCountsResult {
|
||||
hasMention?: boolean;
|
||||
}) => void;
|
||||
renameConversationState: (oldStateKey: string, newStateKey: string) => void;
|
||||
removeConversationState: (stateKey: string) => void;
|
||||
markAllRead: () => void;
|
||||
refreshUnreads: () => Promise<void>;
|
||||
}
|
||||
@@ -235,6 +236,27 @@ export function useUnreadCounts(
|
||||
setLastMessageTimes(renameConversationTimeKey(oldStateKey, newStateKey));
|
||||
}, []);
|
||||
|
||||
const removeConversationState = useCallback((stateKey: string) => {
|
||||
setUnreadCounts((prev) => {
|
||||
if (!(stateKey in prev)) return prev;
|
||||
const next = { ...prev };
|
||||
delete next[stateKey];
|
||||
return next;
|
||||
});
|
||||
setMentions((prev) => {
|
||||
if (!(stateKey in prev)) return prev;
|
||||
const next = { ...prev };
|
||||
delete next[stateKey];
|
||||
return next;
|
||||
});
|
||||
setUnreadLastReadAts((prev) => {
|
||||
if (!(stateKey in prev)) return prev;
|
||||
const next = { ...prev };
|
||||
delete next[stateKey];
|
||||
return next;
|
||||
});
|
||||
}, []);
|
||||
|
||||
// Mark all conversations as read
|
||||
// Calls single bulk API endpoint to persist read state
|
||||
const markAllRead = useCallback(() => {
|
||||
@@ -256,6 +278,7 @@ export function useUnreadCounts(
|
||||
unreadLastReadAts,
|
||||
recordMessageEvent,
|
||||
renameConversationState,
|
||||
removeConversationState,
|
||||
markAllRead,
|
||||
refreshUnreads: fetchUnreads,
|
||||
};
|
||||
|
||||
@@ -69,6 +69,7 @@ function createRealtimeArgs(overrides: Partial<Parameters<typeof useRealtimeAppS
|
||||
observeMessage: vi.fn(() => ({ added: false, activeConversation: false })),
|
||||
recordMessageEvent: vi.fn(),
|
||||
renameConversationState: vi.fn(),
|
||||
removeConversationState: vi.fn(),
|
||||
checkMention: vi.fn(() => false),
|
||||
pendingDeleteFallbackRef: { current: false },
|
||||
setActiveConversation: vi.fn(),
|
||||
|
||||
@@ -11,8 +11,6 @@ import pytest
|
||||
|
||||
from app.event_handlers import (
|
||||
_active_subscriptions,
|
||||
_buffered_acks,
|
||||
_pending_acks,
|
||||
cleanup_expired_acks,
|
||||
register_event_handlers,
|
||||
track_pending_ack,
|
||||
@@ -23,6 +21,7 @@ from app.repository import (
|
||||
ContactRepository,
|
||||
MessageRepository,
|
||||
)
|
||||
from app.services.dm_ack_tracker import _buffered_acks, _pending_acks
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
|
||||
Reference in New Issue
Block a user