mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
extract frontend realtime state hook
This commit is contained in:
32
AGENTS.md
32
AGENTS.md
@@ -28,28 +28,29 @@ Ancillary AGENTS.md files which should generally not be reviewed unless specific
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────┐
|
||||
│ Frontend (React) │
|
||||
│ Frontend (React) │
|
||||
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────────┐ │
|
||||
│ │ StatusBar│ │ Sidebar │ │MessageList│ │ MessageInput │ │
|
||||
│ └──────────┘ └──────────┘ └──────────┘ └──────────────────┘ │
|
||||
│ ┌────────────────────────────────────────────────────────────┐ │
|
||||
│ │ CrackerPanel (global collapsible, WebGPU cracking) │ │
|
||||
│ └────────────────────────────────────────────────────────────┘ │
|
||||
│ │ │
|
||||
│ useWebSocket ←──── Real-time updates │
|
||||
│ │ │
|
||||
│ api.ts ←──── REST API calls │
|
||||
└───────────────────────────┼──────────────────────────────────────┘
|
||||
│ │ │
|
||||
│ useWebSocket ←──── Real-time updates │
|
||||
│ │ │
|
||||
│ api.ts ←──── REST API calls │
|
||||
└───────────────────────────┼─────────────────────────────────────┘
|
||||
│ HTTP + WebSocket (/api/*)
|
||||
┌───────────────────────────┼──────────────────────────────────────┐
|
||||
│ Backend (FastAPI) │
|
||||
│ ┌──────────┐ ┌──────────────┐ ┌────────────┐ ┌───────────┐ │
|
||||
│ │ Routers │→ │ Repositories │→ │ SQLite DB │ │ WebSocket │ │
|
||||
│ └──────────┘ └──────────────┘ └────────────┘ │ Manager │ │
|
||||
│ ↓ └───────────┘ │
|
||||
│ ┌──────────────────────────────────────────────────────────┐ │
|
||||
│ │ RadioManager + Event Handlers │ │
|
||||
│ └──────────────────────────────────────────────────────────┘ │
|
||||
│ ┌──────────┐ ┌──────────┐ ┌──────────────┐ ┌────────────┐ │
|
||||
│ │ Routers │→ │ Services │→ │ Repositories │→ │ SQLite DB │ │
|
||||
│ └──────────┘ └──────────┘ └──────────────┘ └────────────┘ │
|
||||
│ ↓ │ ┌───────────┐ │
|
||||
│ ┌──────────────────────────┐ └──────────────→ │ WebSocket │ │
|
||||
│ │ RadioManager + lifecycle │ │ Manager │ │
|
||||
│ │ / event adapters │ └───────────┘ │
|
||||
│ └──────────────────────────┘ │
|
||||
└───────────────────────────┼──────────────────────────────────────┘
|
||||
│ Serial / TCP / BLE
|
||||
┌──────┴──────┐
|
||||
@@ -142,7 +143,7 @@ MeshCore firmware can encode path hops as 1-byte, 2-byte, or 3-byte identifiers.
|
||||
|
||||
1. User types message → clicks send
|
||||
2. `api.sendChannelMessage()` → POST to backend
|
||||
3. Backend calls `radio_manager.meshcore.commands.send_chan_msg()`
|
||||
3. Backend route delegates to service-layer send orchestration, which acquires the radio lock and calls MeshCore commands
|
||||
4. Message stored in database with `outgoing=true`
|
||||
5. For direct messages: ACK tracked; for channel: repeat detection
|
||||
|
||||
@@ -162,6 +163,7 @@ This message-layer echo/path handling is independent of raw-packet storage dedup
|
||||
│ ├── AGENTS.md # Backend documentation
|
||||
│ ├── main.py # App entry, lifespan
|
||||
│ ├── routers/ # API endpoints
|
||||
│ ├── services/ # Shared backend orchestration/domain services
|
||||
│ ├── packet_processor.py # Raw packet pipeline, dedup, path handling
|
||||
│ ├── repository/ # Database CRUD (contacts, channels, messages, raw_packets, settings, fanout)
|
||||
│ ├── event_handlers.py # Radio events
|
||||
@@ -250,6 +252,8 @@ Key test files:
|
||||
- `tests/test_messages_search.py` - Message search, around endpoint, forward pagination
|
||||
- `tests/test_rx_log_data.py` - on_rx_log_data event handler integration
|
||||
- `tests/test_ack_tracking_wiring.py` - DM ACK tracking extraction and wiring
|
||||
- `tests/test_radio_lifecycle_service.py` - Radio reconnect/setup orchestration helpers
|
||||
- `tests/test_radio_commands_service.py` - Radio config/private-key service workflows
|
||||
- `tests/test_health_mqtt_status.py` - Health endpoint MQTT status field
|
||||
- `tests/test_community_mqtt.py` - Community MQTT publisher (JWT, packet format, hash, broadcast)
|
||||
- `tests/test_radio_sync.py` - Radio sync, periodic tasks, and contact offload back to the radio
|
||||
|
||||
@@ -21,11 +21,19 @@ app/
|
||||
├── migrations.py # Schema migrations (SQLite user_version)
|
||||
├── models.py # Pydantic request/response models
|
||||
├── repository/ # Data access layer (contacts, channels, messages, raw_packets, settings, fanout)
|
||||
├── radio.py # RadioManager + auto-reconnect monitor
|
||||
├── services/ # Shared orchestration/domain services
|
||||
│ ├── messages.py # Shared message creation, dedup, ACK application
|
||||
│ ├── message_send.py # Direct send, channel send, resend workflows
|
||||
│ ├── dm_ack_tracker.py # Pending DM ACK state
|
||||
│ ├── contact_reconciliation.py # Prefix-claim, sender-key backfill, name-history wiring
|
||||
│ ├── radio_lifecycle.py # Post-connect setup and reconnect/setup helpers
|
||||
│ └── radio_commands.py # Radio config/private-key command workflows
|
||||
├── radio.py # RadioManager transport/session state + lock management
|
||||
├── radio_sync.py # Polling, sync, periodic advertisement loop
|
||||
├── decoder.py # Packet parsing/decryption
|
||||
├── packet_processor.py # Raw packet pipeline, dedup, path handling
|
||||
├── event_handlers.py # MeshCore event subscriptions and ACK tracking
|
||||
├── events.py # Typed WS event payload serialization
|
||||
├── websocket.py # WS manager + broadcast helpers
|
||||
├── fanout/ # Fanout bus: MQTT, bots, webhooks, Apprise (see fanout/AGENTS_fanout.md)
|
||||
├── dependencies.py # Shared FastAPI dependency providers
|
||||
@@ -53,13 +61,13 @@ app/
|
||||
|
||||
1. Radio emits events.
|
||||
2. `on_rx_log_data` stores raw packet and tries decrypt/pipeline handling.
|
||||
3. Decrypted messages are inserted into `messages` and broadcast over WS.
|
||||
3. Shared message-domain services create/update `messages` and shape WS payloads.
|
||||
4. `CONTACT_MSG_RECV` is a fallback DM path when packet pipeline cannot decrypt.
|
||||
|
||||
### Outgoing messages
|
||||
|
||||
1. Send endpoints in `routers/messages.py` call MeshCore commands.
|
||||
2. Message is persisted as outgoing.
|
||||
1. Send endpoints in `routers/messages.py` validate requests and delegate to `services/message_send.py`.
|
||||
2. Service-layer send workflows call MeshCore commands, persist outgoing messages, and wire ACK tracking.
|
||||
3. Endpoint broadcasts WS `message` event so all live clients update.
|
||||
4. ACK/repeat updates arrive later as `message_acked` events.
|
||||
5. Channel resend (`POST /messages/channel/{id}/resend`) strips the sender name prefix by exact match against the current radio name. This assumes the radio name hasn't changed between the original send and the resend. Name changes require an explicit radio config update and are rare, but the `new_timestamp=true` resend path has no time window, so a mismatch is possible if the name was changed between the original send and a later resend.
|
||||
@@ -67,9 +75,9 @@ app/
|
||||
### Connection lifecycle
|
||||
|
||||
- `RadioManager.start_connection_monitor()` checks health every 5s.
|
||||
- Monitor reconnect path runs `post_connect_setup()` before broadcasting healthy state.
|
||||
- Manual reconnect/reboot endpoints call `reconnect()` then `post_connect_setup()`.
|
||||
- Setup includes handler registration, key export, time sync, contact/channel sync, polling/advert tasks.
|
||||
- `RadioManager.post_connect_setup()` delegates to `services/radio_lifecycle.py`.
|
||||
- Shared reconnect/setup helpers in `services/radio_lifecycle.py` are used by startup, the monitor, and manual reconnect/reboot flows before broadcasting healthy state.
|
||||
- Setup still includes handler registration, key export, time sync, contact/channel sync, polling/advert tasks.
|
||||
|
||||
## Important Behaviors
|
||||
|
||||
@@ -215,7 +223,7 @@ app/
|
||||
- `error` — toast notification (reconnect failure, missing private key, etc.)
|
||||
- `success` — toast notification (historical decrypt complete, etc.)
|
||||
|
||||
Initial WS connect sends `health` only. Contacts/channels are loaded by REST.
|
||||
Backend WS sends go through typed serialization in `events.py`. Initial WS connect sends `health` only. Contacts/channels are loaded by REST.
|
||||
Client sends `"ping"` text; server replies `{"type":"pong"}`.
|
||||
|
||||
## Data Model Notes
|
||||
@@ -289,6 +297,8 @@ tests/
|
||||
├── test_packet_pipeline.py # End-to-end packet processing
|
||||
├── test_packets_router.py # Packets router endpoints (decrypt, maintenance)
|
||||
├── test_radio.py # RadioManager, serial detection
|
||||
├── test_radio_commands_service.py # Radio config/private-key service workflows
|
||||
├── test_radio_lifecycle_service.py # Reconnect/setup orchestration helpers
|
||||
├── test_real_crypto.py # Real cryptographic operations
|
||||
├── test_radio_operation.py # radio_operation() context manager
|
||||
├── test_radio_router.py # Radio router endpoints
|
||||
|
||||
@@ -25,6 +25,7 @@ frontend/src/
|
||||
├── api.ts # Typed REST client
|
||||
├── types.ts # Shared TS contracts
|
||||
├── useWebSocket.ts # WS lifecycle + event dispatch
|
||||
├── wsEvents.ts # Typed WS event parsing / discriminated union
|
||||
├── messageCache.ts # Conversation-scoped cache
|
||||
├── prefetch.ts # Consumes prefetched API promises started in index.html
|
||||
├── index.css # Global styles/utilities
|
||||
@@ -36,6 +37,7 @@ frontend/src/
|
||||
│ ├── index.ts # Central re-export of all hooks
|
||||
│ ├── useConversationMessages.ts # Fetch, pagination, dedup, ACK buffering
|
||||
│ ├── useUnreadCounts.ts # Unread counters, mentions, recent-sort timestamps
|
||||
│ ├── useRealtimeAppState.ts # WebSocket event application and reconnect recovery
|
||||
│ ├── useRepeaterDashboard.ts # Repeater dashboard state (login, panes, console, retries)
|
||||
│ ├── useRadioControl.ts # Radio health/config state, reconnection
|
||||
│ ├── useAppSettings.ts # Settings, favorites, preferences migration
|
||||
@@ -138,8 +140,11 @@ frontend/src/
|
||||
├── useConversationMessages.race.test.ts
|
||||
├── useRepeaterDashboard.test.ts
|
||||
├── useContactsAndChannels.test.ts
|
||||
├── useRealtimeAppState.test.ts
|
||||
├── useUnreadCounts.test.ts
|
||||
├── useWebSocket.dispatch.test.ts
|
||||
└── useWebSocket.lifecycle.test.ts
|
||||
├── useWebSocket.lifecycle.test.ts
|
||||
└── wsEvents.test.ts
|
||||
|
||||
```
|
||||
|
||||
@@ -154,12 +159,14 @@ frontend/src/
|
||||
- `useConversationRouter`: URL hash → active conversation routing
|
||||
- `useConversationMessages`: fetch, pagination, dedup/update helpers
|
||||
- `useUnreadCounts`: unread counters, mention tracking, recent-sort timestamps
|
||||
- `useRealtimeAppState`: typed WS event application, reconnect recovery, cache/unread coordination
|
||||
- `useRepeaterDashboard`: repeater dashboard state (login, pane data/retries, console, actions)
|
||||
|
||||
### Initial load + realtime
|
||||
|
||||
- Initial data: REST fetches (`api.ts`) for config/settings/channels/contacts/unreads.
|
||||
- WebSocket: realtime deltas/events.
|
||||
- On reconnect, `App.tsx` refetches channels and contacts, refreshes unread counts, and reconciles the active conversation to recover disconnect-window drift.
|
||||
- On WS connect, backend sends `health` only; contacts/channels still come from REST.
|
||||
|
||||
### New Message modal
|
||||
@@ -193,6 +200,7 @@ frontend/src/
|
||||
|
||||
- Auto reconnect (3s) with cleanup guard on unmount.
|
||||
- Heartbeat ping every 30s.
|
||||
- Incoming JSON is parsed through `wsEvents.ts`, which returns a typed discriminated union for known events and a centralized `unknown` fallback.
|
||||
- Event handlers: `health`, `message`, `contact`, `raw_packet`, `message_acked`, `contact_deleted`, `channel_deleted`, `error`, `success`, `pong` (ignored).
|
||||
- For `raw_packet` events, use `observation_id` as event identity; `id` is a storage reference and may repeat.
|
||||
|
||||
|
||||
@@ -14,11 +14,11 @@ import { useWebSocket } from './useWebSocket';
|
||||
import {
|
||||
useUnreadCounts,
|
||||
useConversationMessages,
|
||||
getMessageContentKey,
|
||||
useRadioControl,
|
||||
useAppSettings,
|
||||
useConversationRouter,
|
||||
useContactsAndChannels,
|
||||
useRealtimeAppState,
|
||||
} from './hooks';
|
||||
import * as messageCache from './messageCache';
|
||||
import { StatusBar } from './components/StatusBar';
|
||||
@@ -62,24 +62,11 @@ import {
|
||||
SheetTitle,
|
||||
} from './components/ui/sheet';
|
||||
import { Toaster, toast } from './components/ui/sonner';
|
||||
import { getStateKey } from './utils/conversationState';
|
||||
import { appendRawPacketUnique } from './utils/rawPacketIdentity';
|
||||
import { messageContainsMention } from './utils/messageParser';
|
||||
import { mergeContactIntoList } from './utils/contactMerge';
|
||||
import { getLocalLabel, getContrastTextColor } from './utils/localLabel';
|
||||
import { cn } from '@/lib/utils';
|
||||
import type { SearchNavigateTarget } from './components/SearchView';
|
||||
import type {
|
||||
Channel,
|
||||
Contact,
|
||||
Conversation,
|
||||
HealthStatus,
|
||||
Message,
|
||||
MessagePath,
|
||||
RawPacket,
|
||||
} from './types';
|
||||
|
||||
const MAX_RAW_PACKETS = 500;
|
||||
import type { Channel, Conversation, Message, RawPacket } from './types';
|
||||
|
||||
export function App() {
|
||||
const messageInputRef = useRef<MessageInputHandle>(null);
|
||||
@@ -233,6 +220,29 @@ export function App() {
|
||||
return contact?.type === CONTACT_TYPE_REPEATER;
|
||||
}, [activeConversation, contacts]);
|
||||
|
||||
const wsHandlers = useRealtimeAppState({
|
||||
prevHealthRef,
|
||||
setHealth,
|
||||
fetchConfig,
|
||||
setRawPackets,
|
||||
triggerReconcile,
|
||||
refreshUnreads,
|
||||
setChannels,
|
||||
fetchAllContacts,
|
||||
setContacts,
|
||||
blockedKeysRef,
|
||||
blockedNamesRef,
|
||||
activeConversationRef,
|
||||
hasNewerMessagesRef,
|
||||
addMessageIfNew,
|
||||
trackNewMessage,
|
||||
incrementUnread,
|
||||
checkMention,
|
||||
pendingDeleteFallbackRef,
|
||||
setActiveConversation,
|
||||
updateMessageAck,
|
||||
});
|
||||
|
||||
const mergeChannelIntoList = useCallback(
|
||||
(updated: Channel) => {
|
||||
setChannels((prev) => {
|
||||
@@ -248,185 +258,6 @@ export function App() {
|
||||
[setChannels]
|
||||
);
|
||||
|
||||
// WebSocket handlers - memoized to prevent reconnection loops
|
||||
const wsHandlers = useMemo(
|
||||
() => ({
|
||||
onHealth: (data: HealthStatus) => {
|
||||
const prev = prevHealthRef.current;
|
||||
prevHealthRef.current = data;
|
||||
setHealth(data);
|
||||
const initializationCompleted =
|
||||
prev !== null &&
|
||||
prev.radio_connected &&
|
||||
prev.radio_initializing &&
|
||||
data.radio_connected &&
|
||||
!data.radio_initializing;
|
||||
|
||||
// Show toast on connection status change
|
||||
if (prev !== null && prev.radio_connected !== data.radio_connected) {
|
||||
if (data.radio_connected) {
|
||||
toast.success('Radio connected', {
|
||||
description: data.connection_info
|
||||
? `Connected via ${data.connection_info}`
|
||||
: undefined,
|
||||
});
|
||||
// Refresh config after reconnection (may have changed after reboot)
|
||||
fetchConfig();
|
||||
} else {
|
||||
toast.error('Radio disconnected', {
|
||||
description: 'Check radio connection and power',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (initializationCompleted) {
|
||||
fetchConfig();
|
||||
}
|
||||
},
|
||||
onError: (error: { message: string; details?: string }) => {
|
||||
toast.error(error.message, {
|
||||
description: error.details,
|
||||
});
|
||||
},
|
||||
onSuccess: (success: { message: string; details?: string }) => {
|
||||
toast.success(success.message, {
|
||||
description: success.details,
|
||||
});
|
||||
},
|
||||
onReconnect: () => {
|
||||
// Clear raw packets: observation_id is a process-local counter that resets
|
||||
// on backend restart, so stale packets would cause new ones to be deduped away.
|
||||
setRawPackets([]);
|
||||
// Silently recover any data missed during the disconnect window
|
||||
triggerReconcile();
|
||||
refreshUnreads();
|
||||
api.getChannels().then(setChannels).catch(console.error);
|
||||
fetchAllContacts()
|
||||
.then((data) => setContacts(data))
|
||||
.catch(console.error);
|
||||
},
|
||||
onMessage: (msg: Message) => {
|
||||
// Filter blocked contacts on incoming (non-outgoing) messages
|
||||
if (!msg.outgoing) {
|
||||
const bKeys = blockedKeysRef.current;
|
||||
const bNames = blockedNamesRef.current;
|
||||
// Block DMs by sender key
|
||||
if (
|
||||
bKeys.length > 0 &&
|
||||
msg.type === 'PRIV' &&
|
||||
bKeys.includes(msg.conversation_key.toLowerCase())
|
||||
)
|
||||
return;
|
||||
// Block channel messages by sender key
|
||||
if (
|
||||
bKeys.length > 0 &&
|
||||
msg.type === 'CHAN' &&
|
||||
msg.sender_key &&
|
||||
bKeys.includes(msg.sender_key.toLowerCase())
|
||||
)
|
||||
return;
|
||||
// Block by sender name (works for both DMs and channel messages)
|
||||
if (bNames.length > 0 && msg.sender_name && bNames.includes(msg.sender_name)) return;
|
||||
}
|
||||
|
||||
const activeConv = activeConversationRef.current;
|
||||
|
||||
// Check if message belongs to the active conversation
|
||||
const isForActiveConversation = (() => {
|
||||
if (!activeConv) return false;
|
||||
if (msg.type === 'CHAN' && activeConv.type === 'channel') {
|
||||
return msg.conversation_key === activeConv.id;
|
||||
}
|
||||
if (msg.type === 'PRIV' && activeConv.type === 'contact') {
|
||||
return msg.conversation_key === activeConv.id;
|
||||
}
|
||||
return false;
|
||||
})();
|
||||
|
||||
// Only add to message list if it's for the active conversation
|
||||
// and we're not viewing historical messages (hasNewerMessages means we jumped mid-history)
|
||||
if (isForActiveConversation && !hasNewerMessagesRef.current) {
|
||||
addMessageIfNew(msg);
|
||||
}
|
||||
|
||||
// Track for unread counts and sorting
|
||||
trackNewMessage(msg);
|
||||
|
||||
const contentKey = getMessageContentKey(msg);
|
||||
|
||||
// For non-active conversations: update cache and count unreads
|
||||
if (!isForActiveConversation) {
|
||||
// Update message cache (instant restore on switch) — returns true if new
|
||||
const isNew = messageCache.addMessage(msg.conversation_key, msg, contentKey);
|
||||
|
||||
// Count unread for incoming messages (skip duplicates from multiple mesh paths)
|
||||
if (!msg.outgoing && isNew) {
|
||||
let stateKey: string | null = null;
|
||||
if (msg.type === 'CHAN' && msg.conversation_key) {
|
||||
stateKey = getStateKey('channel', msg.conversation_key);
|
||||
} else if (msg.type === 'PRIV' && msg.conversation_key) {
|
||||
stateKey = getStateKey('contact', msg.conversation_key);
|
||||
}
|
||||
if (stateKey) {
|
||||
const hasMention = checkMention(msg.text);
|
||||
incrementUnread(stateKey, hasMention);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
onContact: (contact: Contact) => {
|
||||
setContacts((prev) => mergeContactIntoList(prev, contact));
|
||||
},
|
||||
onChannel: (channel: Channel) => {
|
||||
mergeChannelIntoList(channel);
|
||||
},
|
||||
onContactDeleted: (publicKey: string) => {
|
||||
setContacts((prev) => prev.filter((c) => c.public_key !== publicKey));
|
||||
messageCache.remove(publicKey);
|
||||
const active = activeConversationRef.current;
|
||||
if (active?.type === 'contact' && active.id === publicKey) {
|
||||
pendingDeleteFallbackRef.current = true;
|
||||
setActiveConversation(null);
|
||||
}
|
||||
},
|
||||
onChannelDeleted: (key: string) => {
|
||||
setChannels((prev) => prev.filter((c) => c.key !== key));
|
||||
messageCache.remove(key);
|
||||
const active = activeConversationRef.current;
|
||||
if (active?.type === 'channel' && active.id === key) {
|
||||
pendingDeleteFallbackRef.current = true;
|
||||
setActiveConversation(null);
|
||||
}
|
||||
},
|
||||
onRawPacket: (packet: RawPacket) => {
|
||||
setRawPackets((prev) => appendRawPacketUnique(prev, packet, MAX_RAW_PACKETS));
|
||||
},
|
||||
onMessageAcked: (messageId: number, ackCount: number, paths?: MessagePath[]) => {
|
||||
updateMessageAck(messageId, ackCount, paths);
|
||||
messageCache.updateAck(messageId, ackCount, paths);
|
||||
},
|
||||
}),
|
||||
[
|
||||
addMessageIfNew,
|
||||
trackNewMessage,
|
||||
incrementUnread,
|
||||
updateMessageAck,
|
||||
checkMention,
|
||||
fetchConfig,
|
||||
mergeChannelIntoList,
|
||||
prevHealthRef,
|
||||
setHealth,
|
||||
activeConversationRef,
|
||||
hasNewerMessagesRef,
|
||||
setActiveConversation,
|
||||
setContacts,
|
||||
setChannels,
|
||||
triggerReconcile,
|
||||
refreshUnreads,
|
||||
fetchAllContacts,
|
||||
]
|
||||
);
|
||||
|
||||
// Connect to WebSocket
|
||||
useWebSocket(wsHandlers);
|
||||
|
||||
|
||||
@@ -5,3 +5,4 @@ export { useRepeaterDashboard } from './useRepeaterDashboard';
|
||||
export { useAppSettings } from './useAppSettings';
|
||||
export { useConversationRouter } from './useConversationRouter';
|
||||
export { useContactsAndChannels } from './useContactsAndChannels';
|
||||
export { useRealtimeAppState } from './useRealtimeAppState';
|
||||
|
||||
264
frontend/src/hooks/useRealtimeAppState.ts
Normal file
264
frontend/src/hooks/useRealtimeAppState.ts
Normal file
@@ -0,0 +1,264 @@
|
||||
import {
|
||||
useCallback,
|
||||
useMemo,
|
||||
type Dispatch,
|
||||
type MutableRefObject,
|
||||
type SetStateAction,
|
||||
} from 'react';
|
||||
import { api } from '../api';
|
||||
import * as messageCache from '../messageCache';
|
||||
import type { UseWebSocketOptions } from '../useWebSocket';
|
||||
import { toast } from '../components/ui/sonner';
|
||||
import { getStateKey } from '../utils/conversationState';
|
||||
import { mergeContactIntoList } from '../utils/contactMerge';
|
||||
import { appendRawPacketUnique } from '../utils/rawPacketIdentity';
|
||||
import { getMessageContentKey } from './useConversationMessages';
|
||||
import type {
|
||||
Channel,
|
||||
Contact,
|
||||
Conversation,
|
||||
HealthStatus,
|
||||
Message,
|
||||
MessagePath,
|
||||
RawPacket,
|
||||
} from '../types';
|
||||
|
||||
interface UseRealtimeAppStateArgs {
|
||||
prevHealthRef: MutableRefObject<HealthStatus | null>;
|
||||
setHealth: Dispatch<SetStateAction<HealthStatus | null>>;
|
||||
fetchConfig: () => void | Promise<void>;
|
||||
setRawPackets: Dispatch<SetStateAction<RawPacket[]>>;
|
||||
triggerReconcile: () => void;
|
||||
refreshUnreads: () => Promise<void>;
|
||||
setChannels: Dispatch<SetStateAction<Channel[]>>;
|
||||
fetchAllContacts: () => Promise<Contact[]>;
|
||||
setContacts: Dispatch<SetStateAction<Contact[]>>;
|
||||
blockedKeysRef: MutableRefObject<string[]>;
|
||||
blockedNamesRef: MutableRefObject<string[]>;
|
||||
activeConversationRef: MutableRefObject<Conversation | null>;
|
||||
hasNewerMessagesRef: MutableRefObject<boolean>;
|
||||
addMessageIfNew: (msg: Message) => boolean;
|
||||
trackNewMessage: (msg: Message) => void;
|
||||
incrementUnread: (stateKey: string, hasMention?: boolean) => void;
|
||||
checkMention: (text: string) => boolean;
|
||||
pendingDeleteFallbackRef: MutableRefObject<boolean>;
|
||||
setActiveConversation: (conv: Conversation | null) => void;
|
||||
updateMessageAck: (messageId: number, ackCount: number, paths?: MessagePath[]) => void;
|
||||
maxRawPackets?: number;
|
||||
}
|
||||
|
||||
function isMessageBlocked(msg: Message, blockedKeys: string[], blockedNames: string[]): boolean {
|
||||
if (msg.outgoing) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (blockedKeys.length > 0) {
|
||||
if (msg.type === 'PRIV' && blockedKeys.includes(msg.conversation_key.toLowerCase())) {
|
||||
return true;
|
||||
}
|
||||
if (
|
||||
msg.type === 'CHAN' &&
|
||||
msg.sender_key &&
|
||||
blockedKeys.includes(msg.sender_key.toLowerCase())
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return blockedNames.length > 0 && !!msg.sender_name && blockedNames.includes(msg.sender_name);
|
||||
}
|
||||
|
||||
function isActiveConversationMessage(
|
||||
activeConversation: Conversation | null,
|
||||
msg: Message
|
||||
): boolean {
|
||||
if (!activeConversation) return false;
|
||||
if (msg.type === 'CHAN' && activeConversation.type === 'channel') {
|
||||
return msg.conversation_key === activeConversation.id;
|
||||
}
|
||||
if (msg.type === 'PRIV' && activeConversation.type === 'contact') {
|
||||
return msg.conversation_key === activeConversation.id;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
export function useRealtimeAppState({
|
||||
prevHealthRef,
|
||||
setHealth,
|
||||
fetchConfig,
|
||||
setRawPackets,
|
||||
triggerReconcile,
|
||||
refreshUnreads,
|
||||
setChannels,
|
||||
fetchAllContacts,
|
||||
setContacts,
|
||||
blockedKeysRef,
|
||||
blockedNamesRef,
|
||||
activeConversationRef,
|
||||
hasNewerMessagesRef,
|
||||
addMessageIfNew,
|
||||
trackNewMessage,
|
||||
incrementUnread,
|
||||
checkMention,
|
||||
pendingDeleteFallbackRef,
|
||||
setActiveConversation,
|
||||
updateMessageAck,
|
||||
maxRawPackets = 500,
|
||||
}: UseRealtimeAppStateArgs): UseWebSocketOptions {
|
||||
const mergeChannelIntoList = useCallback(
|
||||
(updated: Channel) => {
|
||||
setChannels((prev) => {
|
||||
const existingIndex = prev.findIndex((channel) => channel.key === updated.key);
|
||||
if (existingIndex === -1) {
|
||||
return [...prev, updated].sort((a, b) => a.name.localeCompare(b.name));
|
||||
}
|
||||
const next = [...prev];
|
||||
next[existingIndex] = updated;
|
||||
return next;
|
||||
});
|
||||
},
|
||||
[setChannels]
|
||||
);
|
||||
|
||||
return useMemo(
|
||||
() => ({
|
||||
onHealth: (data: HealthStatus) => {
|
||||
const prev = prevHealthRef.current;
|
||||
prevHealthRef.current = data;
|
||||
setHealth(data);
|
||||
const initializationCompleted =
|
||||
prev !== null &&
|
||||
prev.radio_connected &&
|
||||
prev.radio_initializing &&
|
||||
data.radio_connected &&
|
||||
!data.radio_initializing;
|
||||
|
||||
if (prev !== null && prev.radio_connected !== data.radio_connected) {
|
||||
if (data.radio_connected) {
|
||||
toast.success('Radio connected', {
|
||||
description: data.connection_info
|
||||
? `Connected via ${data.connection_info}`
|
||||
: undefined,
|
||||
});
|
||||
fetchConfig();
|
||||
} else {
|
||||
toast.error('Radio disconnected', {
|
||||
description: 'Check radio connection and power',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (initializationCompleted) {
|
||||
fetchConfig();
|
||||
}
|
||||
},
|
||||
onError: (error: { message: string; details?: string }) => {
|
||||
toast.error(error.message, {
|
||||
description: error.details,
|
||||
});
|
||||
},
|
||||
onSuccess: (success: { message: string; details?: string }) => {
|
||||
toast.success(success.message, {
|
||||
description: success.details,
|
||||
});
|
||||
},
|
||||
onReconnect: () => {
|
||||
setRawPackets([]);
|
||||
triggerReconcile();
|
||||
refreshUnreads();
|
||||
api.getChannels().then(setChannels).catch(console.error);
|
||||
fetchAllContacts()
|
||||
.then((data) => setContacts(data))
|
||||
.catch(console.error);
|
||||
},
|
||||
onMessage: (msg: Message) => {
|
||||
if (isMessageBlocked(msg, blockedKeysRef.current, blockedNamesRef.current)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const isForActiveConversation = isActiveConversationMessage(
|
||||
activeConversationRef.current,
|
||||
msg
|
||||
);
|
||||
|
||||
if (isForActiveConversation && !hasNewerMessagesRef.current) {
|
||||
addMessageIfNew(msg);
|
||||
}
|
||||
|
||||
trackNewMessage(msg);
|
||||
|
||||
const contentKey = getMessageContentKey(msg);
|
||||
if (!isForActiveConversation) {
|
||||
const isNew = messageCache.addMessage(msg.conversation_key, msg, contentKey);
|
||||
|
||||
if (!msg.outgoing && isNew) {
|
||||
let stateKey: string | null = null;
|
||||
if (msg.type === 'CHAN' && msg.conversation_key) {
|
||||
stateKey = getStateKey('channel', msg.conversation_key);
|
||||
} else if (msg.type === 'PRIV' && msg.conversation_key) {
|
||||
stateKey = getStateKey('contact', msg.conversation_key);
|
||||
}
|
||||
if (stateKey) {
|
||||
incrementUnread(stateKey, checkMention(msg.text));
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
onContact: (contact: Contact) => {
|
||||
setContacts((prev) => mergeContactIntoList(prev, contact));
|
||||
},
|
||||
onChannel: (channel: Channel) => {
|
||||
mergeChannelIntoList(channel);
|
||||
},
|
||||
onContactDeleted: (publicKey: string) => {
|
||||
setContacts((prev) => prev.filter((c) => c.public_key !== publicKey));
|
||||
messageCache.remove(publicKey);
|
||||
const active = activeConversationRef.current;
|
||||
if (active?.type === 'contact' && active.id === publicKey) {
|
||||
pendingDeleteFallbackRef.current = true;
|
||||
setActiveConversation(null);
|
||||
}
|
||||
},
|
||||
onChannelDeleted: (key: string) => {
|
||||
setChannels((prev) => prev.filter((c) => c.key !== key));
|
||||
messageCache.remove(key);
|
||||
const active = activeConversationRef.current;
|
||||
if (active?.type === 'channel' && active.id === key) {
|
||||
pendingDeleteFallbackRef.current = true;
|
||||
setActiveConversation(null);
|
||||
}
|
||||
},
|
||||
onRawPacket: (packet: RawPacket) => {
|
||||
setRawPackets((prev) => appendRawPacketUnique(prev, packet, maxRawPackets));
|
||||
},
|
||||
onMessageAcked: (messageId: number, ackCount: number, paths?: MessagePath[]) => {
|
||||
updateMessageAck(messageId, ackCount, paths);
|
||||
messageCache.updateAck(messageId, ackCount, paths);
|
||||
},
|
||||
}),
|
||||
[
|
||||
activeConversationRef,
|
||||
addMessageIfNew,
|
||||
blockedKeysRef,
|
||||
blockedNamesRef,
|
||||
checkMention,
|
||||
fetchAllContacts,
|
||||
fetchConfig,
|
||||
hasNewerMessagesRef,
|
||||
incrementUnread,
|
||||
maxRawPackets,
|
||||
mergeChannelIntoList,
|
||||
pendingDeleteFallbackRef,
|
||||
prevHealthRef,
|
||||
refreshUnreads,
|
||||
setActiveConversation,
|
||||
setChannels,
|
||||
setContacts,
|
||||
setHealth,
|
||||
setRawPackets,
|
||||
trackNewMessage,
|
||||
triggerReconcile,
|
||||
updateMessageAck,
|
||||
]
|
||||
);
|
||||
}
|
||||
216
frontend/src/test/useRealtimeAppState.test.ts
Normal file
216
frontend/src/test/useRealtimeAppState.test.ts
Normal file
@@ -0,0 +1,216 @@
|
||||
import { act, renderHook, waitFor } from '@testing-library/react';
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { useRealtimeAppState } from '../hooks/useRealtimeAppState';
|
||||
import type { Channel, Contact, Conversation, HealthStatus, Message, RawPacket } from '../types';
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
api: {
|
||||
getChannels: vi.fn(),
|
||||
},
|
||||
toast: {
|
||||
success: vi.fn(),
|
||||
error: vi.fn(),
|
||||
},
|
||||
messageCache: {
|
||||
addMessage: vi.fn(),
|
||||
remove: vi.fn(),
|
||||
updateAck: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock('../api', () => ({
|
||||
api: mocks.api,
|
||||
}));
|
||||
|
||||
vi.mock('../components/ui/sonner', () => ({
|
||||
toast: mocks.toast,
|
||||
}));
|
||||
|
||||
vi.mock('../messageCache', () => mocks.messageCache);
|
||||
|
||||
const publicChannel: Channel = {
|
||||
key: '8B3387E9C5CDEA6AC9E5EDBAA115CD72',
|
||||
name: 'Public',
|
||||
is_hashtag: false,
|
||||
on_radio: false,
|
||||
last_read_at: null,
|
||||
};
|
||||
|
||||
const incomingDm: Message = {
|
||||
id: 7,
|
||||
type: 'PRIV',
|
||||
conversation_key: 'aa'.repeat(32),
|
||||
text: 'hello',
|
||||
sender_timestamp: 1700000000,
|
||||
received_at: 1700000001,
|
||||
paths: null,
|
||||
txt_type: 0,
|
||||
signature: null,
|
||||
sender_key: 'aa'.repeat(32),
|
||||
outgoing: false,
|
||||
acked: 0,
|
||||
sender_name: 'Alice',
|
||||
};
|
||||
|
||||
function createRealtimeArgs(overrides: Partial<Parameters<typeof useRealtimeAppState>[0]> = {}) {
|
||||
const setHealth = vi.fn();
|
||||
const setRawPackets = vi.fn();
|
||||
const setChannels = vi.fn();
|
||||
const setContacts = vi.fn();
|
||||
|
||||
return {
|
||||
args: {
|
||||
prevHealthRef: { current: null as HealthStatus | null },
|
||||
setHealth,
|
||||
fetchConfig: vi.fn(),
|
||||
setRawPackets,
|
||||
triggerReconcile: vi.fn(),
|
||||
refreshUnreads: vi.fn(async () => {}),
|
||||
setChannels,
|
||||
fetchAllContacts: vi.fn(async () => [] as Contact[]),
|
||||
setContacts,
|
||||
blockedKeysRef: { current: [] as string[] },
|
||||
blockedNamesRef: { current: [] as string[] },
|
||||
activeConversationRef: { current: null as Conversation | null },
|
||||
hasNewerMessagesRef: { current: false },
|
||||
addMessageIfNew: vi.fn(),
|
||||
trackNewMessage: vi.fn(),
|
||||
incrementUnread: vi.fn(),
|
||||
checkMention: vi.fn(() => false),
|
||||
pendingDeleteFallbackRef: { current: false },
|
||||
setActiveConversation: vi.fn(),
|
||||
updateMessageAck: vi.fn(),
|
||||
...overrides,
|
||||
},
|
||||
fns: {
|
||||
setHealth,
|
||||
setRawPackets,
|
||||
setChannels,
|
||||
setContacts,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
describe('useRealtimeAppState', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
mocks.api.getChannels.mockResolvedValue([publicChannel]);
|
||||
});
|
||||
|
||||
it('reconnect clears raw packets and refetches channels/contacts/unreads', async () => {
|
||||
const contacts: Contact[] = [
|
||||
{
|
||||
public_key: 'bb'.repeat(32),
|
||||
name: 'Bob',
|
||||
type: 1,
|
||||
flags: 0,
|
||||
last_path: null,
|
||||
last_path_len: 0,
|
||||
out_path_hash_mode: 0,
|
||||
last_advert: null,
|
||||
lat: null,
|
||||
lon: null,
|
||||
last_seen: null,
|
||||
on_radio: false,
|
||||
last_contacted: null,
|
||||
last_read_at: null,
|
||||
first_seen: null,
|
||||
},
|
||||
];
|
||||
|
||||
const { args, fns } = createRealtimeArgs({
|
||||
fetchAllContacts: vi.fn(async () => contacts),
|
||||
});
|
||||
|
||||
const { result } = renderHook(() => useRealtimeAppState(args));
|
||||
|
||||
act(() => {
|
||||
result.current.onReconnect?.();
|
||||
});
|
||||
|
||||
await waitFor(() => {
|
||||
expect(args.triggerReconcile).toHaveBeenCalledTimes(1);
|
||||
expect(args.refreshUnreads).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.api.getChannels).toHaveBeenCalledTimes(1);
|
||||
expect(args.fetchAllContacts).toHaveBeenCalledTimes(1);
|
||||
expect(fns.setRawPackets).toHaveBeenCalledWith([]);
|
||||
expect(fns.setChannels).toHaveBeenCalledWith([publicChannel]);
|
||||
expect(fns.setContacts).toHaveBeenCalledWith(contacts);
|
||||
});
|
||||
});
|
||||
|
||||
it('tracks unread state for a new non-active incoming message', () => {
|
||||
mocks.messageCache.addMessage.mockReturnValue(true);
|
||||
const { args } = createRealtimeArgs({
|
||||
checkMention: vi.fn(() => true),
|
||||
});
|
||||
|
||||
const { result } = renderHook(() => useRealtimeAppState(args));
|
||||
|
||||
act(() => {
|
||||
result.current.onMessage?.(incomingDm);
|
||||
});
|
||||
|
||||
expect(args.addMessageIfNew).not.toHaveBeenCalled();
|
||||
expect(args.trackNewMessage).toHaveBeenCalledWith(incomingDm);
|
||||
expect(mocks.messageCache.addMessage).toHaveBeenCalledWith(
|
||||
incomingDm.conversation_key,
|
||||
incomingDm,
|
||||
expect.any(String)
|
||||
);
|
||||
expect(args.incrementUnread).toHaveBeenCalledWith(
|
||||
`contact-${incomingDm.conversation_key}`,
|
||||
true
|
||||
);
|
||||
});
|
||||
|
||||
it('deleting the active contact clears it and marks fallback recovery pending', () => {
|
||||
const pendingDeleteFallbackRef = { current: false };
|
||||
const activeConversationRef = {
|
||||
current: {
|
||||
type: 'contact',
|
||||
id: incomingDm.conversation_key,
|
||||
name: 'Alice',
|
||||
} satisfies Conversation,
|
||||
};
|
||||
const { args, fns } = createRealtimeArgs({
|
||||
activeConversationRef,
|
||||
pendingDeleteFallbackRef,
|
||||
});
|
||||
|
||||
const { result } = renderHook(() => useRealtimeAppState(args));
|
||||
|
||||
act(() => {
|
||||
result.current.onContactDeleted?.(incomingDm.conversation_key);
|
||||
});
|
||||
|
||||
expect(fns.setContacts).toHaveBeenCalledWith(expect.any(Function));
|
||||
expect(mocks.messageCache.remove).toHaveBeenCalledWith(incomingDm.conversation_key);
|
||||
expect(args.setActiveConversation).toHaveBeenCalledWith(null);
|
||||
expect(pendingDeleteFallbackRef.current).toBe(true);
|
||||
});
|
||||
|
||||
it('appends raw packets using observation identity dedup', () => {
|
||||
const { args, fns } = createRealtimeArgs();
|
||||
const packet: RawPacket = {
|
||||
id: 1,
|
||||
observation_id: 2,
|
||||
timestamp: 1700000000,
|
||||
data: 'aabb',
|
||||
payload_type: 'GROUP_TEXT',
|
||||
snr: 7.5,
|
||||
rssi: -80,
|
||||
decrypted: false,
|
||||
decrypted_info: null,
|
||||
};
|
||||
|
||||
const { result } = renderHook(() => useRealtimeAppState(args));
|
||||
|
||||
act(() => {
|
||||
result.current.onRawPacket?.(packet);
|
||||
});
|
||||
|
||||
expect(fns.setRawPackets).toHaveBeenCalledWith(expect.any(Function));
|
||||
});
|
||||
});
|
||||
@@ -12,7 +12,7 @@ interface SuccessEvent {
|
||||
details?: string;
|
||||
}
|
||||
|
||||
interface UseWebSocketOptions {
|
||||
export interface UseWebSocketOptions {
|
||||
onHealth?: (health: HealthStatus) => void;
|
||||
onMessage?: (message: Message) => void;
|
||||
onContact?: (contact: Contact) => void;
|
||||
|
||||
Reference in New Issue
Block a user