mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-05-08 06:15:02 +02:00
Improve frontend response time
This commit is contained in:
@@ -227,6 +227,20 @@ class BotConfig(BaseModel):
|
||||
code: str = Field(default="", description="Python code for this bot")
|
||||
|
||||
|
||||
class UnreadCounts(BaseModel):
|
||||
"""Aggregated unread counts, mention flags, and last message times for all conversations."""
|
||||
|
||||
counts: dict[str, int] = Field(
|
||||
default_factory=dict, description="Map of stateKey -> unread count"
|
||||
)
|
||||
mentions: dict[str, bool] = Field(
|
||||
default_factory=dict, description="Map of stateKey -> has mention"
|
||||
)
|
||||
last_message_times: dict[str, int] = Field(
|
||||
default_factory=dict, description="Map of stateKey -> last message timestamp"
|
||||
)
|
||||
|
||||
|
||||
class AppSettings(BaseModel):
|
||||
"""Application settings stored in the database."""
|
||||
|
||||
|
||||
@@ -550,6 +550,86 @@ class MessageRepository:
|
||||
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
async def get_unread_counts(name: str | None = None) -> dict:
|
||||
"""Get unread message counts, mention flags, and last message times for all conversations.
|
||||
|
||||
Args:
|
||||
name: User's display name for @[name] mention detection. If None, mentions are skipped.
|
||||
|
||||
Returns:
|
||||
Dict with 'counts', 'mentions', and 'last_message_times' keys.
|
||||
"""
|
||||
counts: dict[str, int] = {}
|
||||
mention_flags: dict[str, bool] = {}
|
||||
last_message_times: dict[str, int] = {}
|
||||
|
||||
mention_pattern = f"%@[{name}]%" if name else None
|
||||
|
||||
# Channel unreads
|
||||
cursor = await db.conn.execute(
|
||||
"""
|
||||
SELECT m.conversation_key,
|
||||
COUNT(*) as unread_count,
|
||||
MAX(m.received_at) as last_message_time,
|
||||
SUM(CASE WHEN m.text LIKE ? THEN 1 ELSE 0 END) > 0 as has_mention
|
||||
FROM messages m
|
||||
JOIN channels c ON m.conversation_key = c.key
|
||||
WHERE m.type = 'CHAN' AND m.outgoing = 0
|
||||
AND m.received_at > COALESCE(c.last_read_at, 0)
|
||||
GROUP BY m.conversation_key
|
||||
""",
|
||||
(mention_pattern or "",),
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
for row in rows:
|
||||
state_key = f"channel-{row['conversation_key']}"
|
||||
counts[state_key] = row["unread_count"]
|
||||
if mention_pattern and row["has_mention"]:
|
||||
mention_flags[state_key] = True
|
||||
|
||||
# Contact unreads
|
||||
cursor = await db.conn.execute(
|
||||
"""
|
||||
SELECT m.conversation_key,
|
||||
COUNT(*) as unread_count,
|
||||
MAX(m.received_at) as last_message_time,
|
||||
SUM(CASE WHEN m.text LIKE ? THEN 1 ELSE 0 END) > 0 as has_mention
|
||||
FROM messages m
|
||||
JOIN contacts ct ON m.conversation_key = ct.public_key
|
||||
WHERE m.type = 'PRIV' AND m.outgoing = 0
|
||||
AND m.received_at > COALESCE(ct.last_read_at, 0)
|
||||
GROUP BY m.conversation_key
|
||||
""",
|
||||
(mention_pattern or "",),
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
for row in rows:
|
||||
state_key = f"contact-{row['conversation_key']}"
|
||||
counts[state_key] = row["unread_count"]
|
||||
if mention_pattern and row["has_mention"]:
|
||||
mention_flags[state_key] = True
|
||||
|
||||
# Last message times for all conversations (including read ones)
|
||||
cursor = await db.conn.execute(
|
||||
"""
|
||||
SELECT type, conversation_key, MAX(received_at) as last_message_time
|
||||
FROM messages
|
||||
GROUP BY type, conversation_key
|
||||
"""
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
for row in rows:
|
||||
prefix = "channel" if row["type"] == "CHAN" else "contact"
|
||||
state_key = f"{prefix}-{row['conversation_key']}"
|
||||
last_message_times[state_key] = row["last_message_time"]
|
||||
|
||||
return {
|
||||
"counts": counts,
|
||||
"mentions": mention_flags,
|
||||
"last_message_times": last_message_times,
|
||||
}
|
||||
|
||||
|
||||
class RawPacketRepository:
|
||||
@staticmethod
|
||||
|
||||
@@ -3,14 +3,29 @@
|
||||
import logging
|
||||
import time
|
||||
|
||||
from fastapi import APIRouter
|
||||
from fastapi import APIRouter, Query
|
||||
|
||||
from app.database import db
|
||||
from app.models import UnreadCounts
|
||||
from app.repository import MessageRepository
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/read-state", tags=["read-state"])
|
||||
|
||||
|
||||
@router.get("/unreads", response_model=UnreadCounts)
|
||||
async def get_unreads(
|
||||
name: str | None = Query(default=None, description="User's name for @mention detection"),
|
||||
) -> UnreadCounts:
|
||||
"""Get unread counts, mention flags, and last message times for all conversations.
|
||||
|
||||
Computes unread counts server-side using last_read_at timestamps on
|
||||
channels and contacts, avoiding the need to fetch bulk messages.
|
||||
"""
|
||||
data = await MessageRepository.get_unread_counts(name)
|
||||
return UnreadCounts(**data)
|
||||
|
||||
|
||||
@router.post("/mark-all-read")
|
||||
async def mark_all_read() -> dict:
|
||||
"""Mark all contacts and channels as read.
|
||||
|
||||
+7
-29
@@ -7,7 +7,7 @@ from fastapi import APIRouter, WebSocket, WebSocketDisconnect
|
||||
|
||||
from app.config import settings
|
||||
from app.radio import radio_manager
|
||||
from app.repository import ChannelRepository, ContactRepository, RawPacketRepository
|
||||
from app.repository import RawPacketRepository
|
||||
from app.websocket import ws_manager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -16,12 +16,15 @@ router = APIRouter()
|
||||
|
||||
@router.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket) -> None:
|
||||
"""WebSocket endpoint for real-time updates."""
|
||||
"""WebSocket endpoint for real-time updates.
|
||||
|
||||
Only sends health status on initial connect. Contacts and channels
|
||||
are fetched via REST endpoints for faster parallel loading.
|
||||
"""
|
||||
await ws_manager.connect(websocket)
|
||||
|
||||
# Send initial state
|
||||
# Send initial health status
|
||||
try:
|
||||
# Health status
|
||||
db_size_mb = 0.0
|
||||
try:
|
||||
db_size_bytes = os.path.getsize(settings.database_path)
|
||||
@@ -45,31 +48,6 @@ async def websocket_endpoint(websocket: WebSocket) -> None:
|
||||
}
|
||||
await ws_manager.send_personal(websocket, "health", health_data)
|
||||
|
||||
# Contacts - fetch all by paginating until exhausted
|
||||
all_contacts = []
|
||||
chunk_size = 500
|
||||
offset = 0
|
||||
while True:
|
||||
chunk = await ContactRepository.get_all(limit=chunk_size, offset=offset)
|
||||
all_contacts.extend(chunk)
|
||||
if len(chunk) < chunk_size:
|
||||
break
|
||||
offset += chunk_size
|
||||
|
||||
await ws_manager.send_personal(
|
||||
websocket,
|
||||
"contacts",
|
||||
[c.model_dump() for c in all_contacts],
|
||||
)
|
||||
|
||||
# Channels
|
||||
channels = await ChannelRepository.get_all()
|
||||
await ws_manager.send_personal(
|
||||
websocket,
|
||||
"channels",
|
||||
[c.model_dump() for c in channels],
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error sending initial state: %s", e)
|
||||
|
||||
|
||||
Vendored
+2
-2
File diff suppressed because one or more lines are too long
+1
-1
File diff suppressed because one or more lines are too long
+572
File diff suppressed because one or more lines are too long
+1
File diff suppressed because one or more lines are too long
-572
File diff suppressed because one or more lines are too long
-1
File diff suppressed because one or more lines are too long
Vendored
+1
-1
@@ -13,7 +13,7 @@
|
||||
<link rel="shortcut icon" href="/favicon.ico" />
|
||||
<link rel="apple-touch-icon" sizes="180x180" href="/apple-touch-icon.png" />
|
||||
<link rel="manifest" href="/site.webmanifest" />
|
||||
<script type="module" crossorigin src="/assets/index-CL0FbnNJ.js"></script>
|
||||
<script type="module" crossorigin src="/assets/index-BrthzT77.js"></script>
|
||||
<link rel="stylesheet" crossorigin href="/assets/index-DJA5wYVF.css">
|
||||
</head>
|
||||
<body>
|
||||
|
||||
+94
-47
@@ -282,12 +282,32 @@ export function App() {
|
||||
}
|
||||
}, []);
|
||||
|
||||
// Initial fetch for config and settings
|
||||
// Fetch all contacts, paginating if >1000
|
||||
const fetchAllContacts = useCallback(async (): Promise<Contact[]> => {
|
||||
const pageSize = 1000;
|
||||
const first = await api.getContacts(pageSize, 0);
|
||||
if (first.length < pageSize) return first;
|
||||
let all = [...first];
|
||||
let offset = pageSize;
|
||||
while (true) {
|
||||
const page = await api.getContacts(pageSize, offset);
|
||||
all = all.concat(page);
|
||||
if (page.length < pageSize) break;
|
||||
offset += pageSize;
|
||||
}
|
||||
return all;
|
||||
}, []);
|
||||
|
||||
// Initial fetch for config, settings, and data
|
||||
useEffect(() => {
|
||||
fetchConfig();
|
||||
fetchAppSettings();
|
||||
fetchUndecryptedCount();
|
||||
}, [fetchConfig, fetchAppSettings, fetchUndecryptedCount]);
|
||||
|
||||
// Fetch contacts and channels via REST (parallel, faster than WS serial push)
|
||||
api.getChannels().then(setChannels).catch(console.error);
|
||||
fetchAllContacts().then(setContacts).catch(console.error);
|
||||
}, [fetchConfig, fetchAppSettings, fetchUndecryptedCount, fetchAllContacts]);
|
||||
|
||||
// One-time migration of localStorage preferences to server
|
||||
const hasMigratedRef = useRef(false);
|
||||
@@ -355,61 +375,53 @@ export function App() {
|
||||
migratePreferences();
|
||||
}, [appSettings]);
|
||||
|
||||
// Resolve URL hash to a conversation
|
||||
const resolveHashToConversation = useCallback((): Conversation | null => {
|
||||
const hashConv = parseHashConversation();
|
||||
if (!hashConv) return null;
|
||||
// Phase 1: Set initial conversation from URL hash or default to Public channel
|
||||
// Only needs channels (fast path) - doesn't wait for contacts
|
||||
const hasSetDefaultConversation = useRef(false);
|
||||
useEffect(() => {
|
||||
if (hasSetDefaultConversation.current || activeConversation) return;
|
||||
if (channels.length === 0) return;
|
||||
|
||||
if (hashConv.type === 'raw') {
|
||||
return { type: 'raw', id: 'raw', name: 'Raw Packet Feed' };
|
||||
const hashConv = parseHashConversation();
|
||||
|
||||
// Handle non-data views immediately
|
||||
if (hashConv?.type === 'raw') {
|
||||
setActiveConversation({ type: 'raw', id: 'raw', name: 'Raw Packet Feed' });
|
||||
hasSetDefaultConversation.current = true;
|
||||
return;
|
||||
}
|
||||
if (hashConv.type === 'map') {
|
||||
return {
|
||||
if (hashConv?.type === 'map') {
|
||||
setActiveConversation({
|
||||
type: 'map',
|
||||
id: 'map',
|
||||
name: 'Node Map',
|
||||
mapFocusKey: hashConv.mapFocusKey,
|
||||
};
|
||||
});
|
||||
hasSetDefaultConversation.current = true;
|
||||
return;
|
||||
}
|
||||
if (hashConv.type === 'visualizer') {
|
||||
return { type: 'visualizer', id: 'visualizer', name: 'Mesh Visualizer' };
|
||||
}
|
||||
if (hashConv.type === 'channel') {
|
||||
const channel = channels.find(
|
||||
(c) => c.name === hashConv.name || c.name === `#${hashConv.name}`
|
||||
);
|
||||
if (channel) {
|
||||
return { type: 'channel', id: channel.key, name: channel.name };
|
||||
}
|
||||
}
|
||||
if (hashConv.type === 'contact') {
|
||||
const contact = contacts.find(
|
||||
(c) => getContactDisplayName(c.name, c.public_key) === hashConv.name
|
||||
);
|
||||
if (contact) {
|
||||
return {
|
||||
type: 'contact',
|
||||
id: contact.public_key,
|
||||
name: getContactDisplayName(contact.name, contact.public_key),
|
||||
};
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}, [channels, contacts]);
|
||||
|
||||
// Set initial conversation from URL hash or default to Public channel
|
||||
const hasSetDefaultConversation = useRef(false);
|
||||
useEffect(() => {
|
||||
if (hasSetDefaultConversation.current || activeConversation) return;
|
||||
if (channels.length === 0 && contacts.length === 0) return;
|
||||
|
||||
const conv = resolveHashToConversation();
|
||||
if (conv) {
|
||||
setActiveConversation(conv);
|
||||
if (hashConv?.type === 'visualizer') {
|
||||
setActiveConversation({ type: 'visualizer', id: 'visualizer', name: 'Mesh Visualizer' });
|
||||
hasSetDefaultConversation.current = true;
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle channel hash
|
||||
if (hashConv?.type === 'channel') {
|
||||
const channel = channels.find(
|
||||
(c) => c.name === hashConv.name || c.name === `#${hashConv.name}`
|
||||
);
|
||||
if (channel) {
|
||||
setActiveConversation({ type: 'channel', id: channel.key, name: channel.name });
|
||||
hasSetDefaultConversation.current = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Contact hash — wait for phase 2
|
||||
if (hashConv?.type === 'contact') return;
|
||||
|
||||
// No hash or unresolvable — default to Public
|
||||
const publicChannel = channels.find((c) => c.name === 'Public');
|
||||
if (publicChannel) {
|
||||
setActiveConversation({
|
||||
@@ -419,7 +431,42 @@ export function App() {
|
||||
});
|
||||
hasSetDefaultConversation.current = true;
|
||||
}
|
||||
}, [channels, contacts, activeConversation, resolveHashToConversation]);
|
||||
}, [channels, activeConversation]);
|
||||
|
||||
// Phase 2: Resolve contact hash (only if phase 1 didn't set a conversation)
|
||||
useEffect(() => {
|
||||
if (hasSetDefaultConversation.current || activeConversation) return;
|
||||
if (contacts.length === 0) return;
|
||||
|
||||
const hashConv = parseHashConversation();
|
||||
if (hashConv?.type === 'contact') {
|
||||
const contact = contacts.find(
|
||||
(c) => getContactDisplayName(c.name, c.public_key) === hashConv.name
|
||||
);
|
||||
if (contact) {
|
||||
setActiveConversation({
|
||||
type: 'contact',
|
||||
id: contact.public_key,
|
||||
name: getContactDisplayName(contact.name, contact.public_key),
|
||||
});
|
||||
hasSetDefaultConversation.current = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Contact hash didn't match — fall back to Public if channels loaded
|
||||
if (channels.length > 0) {
|
||||
const publicChannel = channels.find((c) => c.name === 'Public');
|
||||
if (publicChannel) {
|
||||
setActiveConversation({
|
||||
type: 'channel',
|
||||
id: publicChannel.key,
|
||||
name: publicChannel.name,
|
||||
});
|
||||
hasSetDefaultConversation.current = true;
|
||||
}
|
||||
}
|
||||
}, [contacts, channels, activeConversation]);
|
||||
|
||||
// Keep ref in sync and update URL hash
|
||||
useEffect(() => {
|
||||
|
||||
+5
-14
@@ -13,13 +13,11 @@ import type {
|
||||
RadioConfig,
|
||||
RadioConfigUpdate,
|
||||
TelemetryResponse,
|
||||
UnreadCounts,
|
||||
} from './types';
|
||||
|
||||
const API_BASE = '/api';
|
||||
|
||||
/** Max messages fetched per conversation for unread counting. If count equals this, there may be more. */
|
||||
export const UNREAD_FETCH_LIMIT = 100;
|
||||
|
||||
async function fetchJson<T>(url: string, options?: RequestInit): Promise<T> {
|
||||
const res = await fetch(`${API_BASE}${url}`, {
|
||||
...options,
|
||||
@@ -153,17 +151,6 @@ export const api = {
|
||||
const query = searchParams.toString();
|
||||
return fetchJson<Message[]>(`/messages${query ? `?${query}` : ''}`, { signal });
|
||||
},
|
||||
getMessagesBulk: (
|
||||
conversations: Array<{ type: 'PRIV' | 'CHAN'; conversation_key: string }>,
|
||||
limitPerConversation: number = UNREAD_FETCH_LIMIT
|
||||
) =>
|
||||
fetchJson<Record<string, Message[]>>(
|
||||
`/messages/bulk?limit_per_conversation=${limitPerConversation}`,
|
||||
{
|
||||
method: 'POST',
|
||||
body: JSON.stringify(conversations),
|
||||
}
|
||||
),
|
||||
sendDirectMessage: (destination: string, text: string) =>
|
||||
fetchJson<Message>('/messages/direct', {
|
||||
method: 'POST',
|
||||
@@ -193,6 +180,10 @@ export const api = {
|
||||
}),
|
||||
|
||||
// Read State
|
||||
getUnreads: (name?: string) => {
|
||||
const params = name ? `?name=${encodeURIComponent(name)}` : '';
|
||||
return fetchJson<UnreadCounts>(`/read-state/unreads${params}`);
|
||||
},
|
||||
markAllRead: () =>
|
||||
fetchJson<{ status: string; timestamp: number }>('/read-state/mark-all-read', {
|
||||
method: 'POST',
|
||||
|
||||
@@ -5,7 +5,6 @@ import { getContactDisplayName } from '../utils/pubkey';
|
||||
import { ContactAvatar } from './ContactAvatar';
|
||||
import { CONTACT_TYPE_REPEATER } from '../utils/contactAvatar';
|
||||
import { isFavorite } from '../utils/favorites';
|
||||
import { UNREAD_FETCH_LIMIT } from '../api';
|
||||
import { Input } from './ui/input';
|
||||
import { Button } from './ui/button';
|
||||
import { cn } from '@/lib/utils';
|
||||
@@ -33,9 +32,9 @@ interface SidebarProps {
|
||||
onSortOrderChange?: (order: SortOrder) => void;
|
||||
}
|
||||
|
||||
/** Format unread count, showing "X+" if at the fetch limit (indicating there may be more) */
|
||||
/** Format unread count for display */
|
||||
function formatUnreadCount(count: number): string {
|
||||
return count >= UNREAD_FETCH_LIMIT ? `${count}+` : `${count}`;
|
||||
return `${count}`;
|
||||
}
|
||||
|
||||
export function Sidebar({
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { useState, useCallback, useEffect, useRef } from 'react';
|
||||
import { api, UNREAD_FETCH_LIMIT } from '../api';
|
||||
import { api } from '../api';
|
||||
import {
|
||||
getLastMessageTimes,
|
||||
setLastMessageTime,
|
||||
@@ -19,15 +19,6 @@ export interface UseUnreadCountsResult {
|
||||
trackNewMessage: (msg: Message) => void;
|
||||
}
|
||||
|
||||
/** Check if a message text contains a mention of the given name in @[name] format */
|
||||
function messageContainsMention(text: string, name: string | null): boolean {
|
||||
if (!name) return false;
|
||||
// Escape special regex characters in the name
|
||||
const escaped = name.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
|
||||
const mentionPattern = new RegExp(`@\\[${escaped}\\]`, 'i');
|
||||
return mentionPattern.test(text);
|
||||
}
|
||||
|
||||
export function useUnreadCounts(
|
||||
channels: Channel[],
|
||||
contacts: Contact[],
|
||||
@@ -44,106 +35,32 @@ export function useUnreadCounts(
|
||||
myNameRef.current = myName;
|
||||
}, [myName]);
|
||||
|
||||
// Track which channels/contacts we've already fetched unreads for
|
||||
const fetchedChannels = useRef<Set<string>>(new Set());
|
||||
const fetchedContacts = useRef<Set<string>>(new Set());
|
||||
// Fetch unreads from the server-side endpoint
|
||||
const fetchUnreads = useCallback(async () => {
|
||||
try {
|
||||
const data = await api.getUnreads(myNameRef.current ?? undefined);
|
||||
|
||||
// Fetch messages and count unreads for new channels/contacts
|
||||
// Uses server-side last_read_at for consistent read state across devices
|
||||
useEffect(() => {
|
||||
const newChannels = channels.filter((c) => !fetchedChannels.current.has(c.key));
|
||||
const newContacts = contacts.filter(
|
||||
(c) => c.public_key && !fetchedContacts.current.has(c.public_key)
|
||||
);
|
||||
// Replace (not merge) — server counts are authoritative
|
||||
setUnreadCounts(data.counts);
|
||||
setMentions(data.mentions);
|
||||
|
||||
if (newChannels.length === 0 && newContacts.length === 0) return;
|
||||
|
||||
// Mark as fetched before starting (to avoid duplicate fetches if effect re-runs)
|
||||
newChannels.forEach((c) => fetchedChannels.current.add(c.key));
|
||||
newContacts.forEach((c) => fetchedContacts.current.add(c.public_key));
|
||||
|
||||
const fetchAndCountUnreads = async () => {
|
||||
const conversations: Array<{ type: 'PRIV' | 'CHAN'; conversation_key: string }> = [
|
||||
...newChannels.map((c) => ({ type: 'CHAN' as const, conversation_key: c.key })),
|
||||
...newContacts.map((c) => ({ type: 'PRIV' as const, conversation_key: c.public_key })),
|
||||
];
|
||||
|
||||
if (conversations.length === 0) return;
|
||||
|
||||
try {
|
||||
// Fetch messages in chunks to avoid huge single requests
|
||||
const chunkSize = 200;
|
||||
const bulkMessages: Record<string, Message[]> = {};
|
||||
|
||||
for (let i = 0; i < conversations.length; i += chunkSize) {
|
||||
const chunk = conversations.slice(i, i + chunkSize);
|
||||
const chunkResult = await api.getMessagesBulk(chunk, UNREAD_FETCH_LIMIT);
|
||||
Object.assign(bulkMessages, chunkResult);
|
||||
}
|
||||
const newUnreadCounts: Record<string, number> = {};
|
||||
const newMentions: Record<string, boolean> = {};
|
||||
const newLastMessageTimes: Record<string, number> = {};
|
||||
|
||||
// Process channel messages - use server-side last_read_at
|
||||
for (const channel of newChannels) {
|
||||
const msgs = bulkMessages[`CHAN:${channel.key}`] || [];
|
||||
if (msgs.length > 0) {
|
||||
const key = getStateKey('channel', channel.key);
|
||||
// Use server-side last_read_at, fallback to 0 if never read
|
||||
const lastRead = channel.last_read_at || 0;
|
||||
|
||||
const unreadMsgs = msgs.filter((m) => !m.outgoing && m.received_at > lastRead);
|
||||
if (unreadMsgs.length > 0) {
|
||||
newUnreadCounts[key] = unreadMsgs.length;
|
||||
// Check if any unread message mentions the user
|
||||
if (unreadMsgs.some((m) => messageContainsMention(m.text, myNameRef.current))) {
|
||||
newMentions[key] = true;
|
||||
}
|
||||
}
|
||||
|
||||
const latestTime = Math.max(...msgs.map((m) => m.received_at));
|
||||
newLastMessageTimes[key] = latestTime;
|
||||
setLastMessageTime(key, latestTime);
|
||||
}
|
||||
}
|
||||
|
||||
// Process contact messages - use server-side last_read_at
|
||||
for (const contact of newContacts) {
|
||||
const msgs = bulkMessages[`PRIV:${contact.public_key}`] || [];
|
||||
if (msgs.length > 0) {
|
||||
const key = getStateKey('contact', contact.public_key);
|
||||
// Use server-side last_read_at, fallback to 0 if never read
|
||||
const lastRead = contact.last_read_at || 0;
|
||||
|
||||
const unreadMsgs = msgs.filter((m) => !m.outgoing && m.received_at > lastRead);
|
||||
if (unreadMsgs.length > 0) {
|
||||
newUnreadCounts[key] = unreadMsgs.length;
|
||||
// Check if any unread message mentions the user
|
||||
if (unreadMsgs.some((m) => messageContainsMention(m.text, myNameRef.current))) {
|
||||
newMentions[key] = true;
|
||||
}
|
||||
}
|
||||
|
||||
const latestTime = Math.max(...msgs.map((m) => m.received_at));
|
||||
newLastMessageTimes[key] = latestTime;
|
||||
setLastMessageTime(key, latestTime);
|
||||
}
|
||||
}
|
||||
|
||||
if (Object.keys(newUnreadCounts).length > 0) {
|
||||
setUnreadCounts((prev) => ({ ...prev, ...newUnreadCounts }));
|
||||
}
|
||||
if (Object.keys(newMentions).length > 0) {
|
||||
setMentions((prev) => ({ ...prev, ...newMentions }));
|
||||
if (Object.keys(data.last_message_times).length > 0) {
|
||||
// Update in-memory cache and state
|
||||
for (const [key, ts] of Object.entries(data.last_message_times)) {
|
||||
setLastMessageTime(key, ts);
|
||||
}
|
||||
setLastMessageTimes(getLastMessageTimes());
|
||||
} catch (err) {
|
||||
console.error('Failed to fetch messages bulk:', err);
|
||||
}
|
||||
};
|
||||
} catch (err) {
|
||||
console.error('Failed to fetch unreads:', err);
|
||||
}
|
||||
}, []);
|
||||
|
||||
fetchAndCountUnreads();
|
||||
}, [channels, contacts]);
|
||||
// Fetch when channels or contacts arrive/change
|
||||
useEffect(() => {
|
||||
if (channels.length === 0 && contacts.length === 0) return;
|
||||
fetchUnreads();
|
||||
}, [channels, contacts, fetchUnreads]);
|
||||
|
||||
// Mark conversation as read when user views it
|
||||
// Calls server API to persist read state across devices
|
||||
@@ -151,7 +68,8 @@ export function useUnreadCounts(
|
||||
if (
|
||||
activeConversation &&
|
||||
activeConversation.type !== 'raw' &&
|
||||
activeConversation.type !== 'map'
|
||||
activeConversation.type !== 'map' &&
|
||||
activeConversation.type !== 'visualizer'
|
||||
) {
|
||||
const key = getStateKey(
|
||||
activeConversation.type as 'channel' | 'contact',
|
||||
@@ -221,7 +139,7 @@ export function useUnreadCounts(
|
||||
// Mark a specific conversation as read
|
||||
// Calls server API to persist read state across devices
|
||||
const markConversationRead = useCallback((conv: Conversation) => {
|
||||
if (conv.type === 'raw' || conv.type === 'map') return;
|
||||
if (conv.type === 'raw' || conv.type === 'map' || conv.type === 'visualizer') return;
|
||||
|
||||
const key = getStateKey(conv.type as 'channel' | 'contact', conv.id);
|
||||
|
||||
|
||||
@@ -209,3 +209,9 @@ export interface CommandResponse {
|
||||
response: string;
|
||||
sender_timestamp: number | null;
|
||||
}
|
||||
|
||||
export interface UnreadCounts {
|
||||
counts: Record<string, number>;
|
||||
mentions: Record<string, boolean>;
|
||||
last_message_times: Record<string, number>;
|
||||
}
|
||||
|
||||
@@ -480,6 +480,196 @@ class TestReadStateEndpoints:
|
||||
assert response.status_code == 404
|
||||
assert "not found" in response.json()["detail"].lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_unreads_returns_counts_and_mentions(self):
|
||||
"""GET /unreads returns unread counts, mentions, and last message times."""
|
||||
import aiosqlite
|
||||
|
||||
from app.database import db
|
||||
from app.repository import MessageRepository
|
||||
|
||||
conn = await aiosqlite.connect(":memory:")
|
||||
conn.row_factory = aiosqlite.Row
|
||||
|
||||
# Create tables
|
||||
await conn.execute("""
|
||||
CREATE TABLE contacts (
|
||||
public_key TEXT PRIMARY KEY,
|
||||
name TEXT,
|
||||
type INTEGER DEFAULT 0,
|
||||
flags INTEGER DEFAULT 0,
|
||||
last_path TEXT,
|
||||
last_path_len INTEGER DEFAULT -1,
|
||||
last_advert INTEGER,
|
||||
lat REAL,
|
||||
lon REAL,
|
||||
last_seen INTEGER,
|
||||
on_radio INTEGER DEFAULT 0,
|
||||
last_contacted INTEGER,
|
||||
last_read_at INTEGER
|
||||
)
|
||||
""")
|
||||
await conn.execute("""
|
||||
CREATE TABLE channels (
|
||||
key TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
is_hashtag INTEGER DEFAULT 0,
|
||||
on_radio INTEGER DEFAULT 0,
|
||||
last_read_at INTEGER
|
||||
)
|
||||
""")
|
||||
await conn.execute("""
|
||||
CREATE TABLE messages (
|
||||
id INTEGER PRIMARY KEY,
|
||||
type TEXT NOT NULL,
|
||||
conversation_key TEXT NOT NULL,
|
||||
text TEXT NOT NULL,
|
||||
sender_timestamp INTEGER,
|
||||
received_at INTEGER NOT NULL,
|
||||
paths TEXT,
|
||||
txt_type INTEGER DEFAULT 0,
|
||||
signature TEXT,
|
||||
outgoing INTEGER DEFAULT 0,
|
||||
acked INTEGER DEFAULT 0,
|
||||
UNIQUE(type, conversation_key, text, sender_timestamp)
|
||||
)
|
||||
""")
|
||||
|
||||
# Insert channel and contact
|
||||
await conn.execute(
|
||||
"INSERT INTO channels (key, name, last_read_at) VALUES (?, ?, ?)",
|
||||
("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1", "Public", 1000),
|
||||
)
|
||||
await conn.execute(
|
||||
"INSERT INTO contacts (public_key, name, last_read_at) VALUES (?, ?, ?)",
|
||||
("abcd" * 16, "Alice", 1000),
|
||||
)
|
||||
|
||||
# Insert messages: 2 unread channel msgs (after last_read_at=1000),
|
||||
# 1 read (before), 1 outgoing (should not count)
|
||||
await conn.execute(
|
||||
"INSERT INTO messages (type, conversation_key, text, received_at, outgoing) VALUES (?, ?, ?, ?, ?)",
|
||||
("CHAN", "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1", "Bob: hello", 1001, 0),
|
||||
)
|
||||
await conn.execute(
|
||||
"INSERT INTO messages (type, conversation_key, text, received_at, outgoing) VALUES (?, ?, ?, ?, ?)",
|
||||
("CHAN", "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1", "Bob: @[TestUser] hey", 1002, 0),
|
||||
)
|
||||
await conn.execute(
|
||||
"INSERT INTO messages (type, conversation_key, text, received_at, outgoing) VALUES (?, ?, ?, ?, ?)",
|
||||
("CHAN", "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1", "Bob: old msg", 999, 0),
|
||||
)
|
||||
await conn.execute(
|
||||
"INSERT INTO messages (type, conversation_key, text, received_at, outgoing) VALUES (?, ?, ?, ?, ?)",
|
||||
("CHAN", "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1", "Me: outgoing", 1003, 1),
|
||||
)
|
||||
|
||||
# Insert 1 unread DM
|
||||
await conn.execute(
|
||||
"INSERT INTO messages (type, conversation_key, text, received_at, outgoing) VALUES (?, ?, ?, ?, ?)",
|
||||
("PRIV", "abcd" * 16, "hi there", 1005, 0),
|
||||
)
|
||||
await conn.commit()
|
||||
|
||||
original_conn = db._connection
|
||||
db._connection = conn
|
||||
|
||||
try:
|
||||
result = await MessageRepository.get_unread_counts("TestUser")
|
||||
|
||||
# Channel: 2 unread (1001 and 1002), one has mention
|
||||
assert result["counts"]["channel-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1"] == 2
|
||||
assert result["mentions"]["channel-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1"] is True
|
||||
|
||||
# Contact: 1 unread
|
||||
assert result["counts"][f"contact-{'abcd' * 16}"] == 1
|
||||
|
||||
# Last message times should include all conversations
|
||||
assert "channel-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1" in result["last_message_times"]
|
||||
assert result["last_message_times"]["channel-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1"] == 1003
|
||||
assert f"contact-{'abcd' * 16}" in result["last_message_times"]
|
||||
assert result["last_message_times"][f"contact-{'abcd' * 16}"] == 1005
|
||||
finally:
|
||||
db._connection = original_conn
|
||||
await conn.close()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_unreads_no_name_skips_mentions(self):
|
||||
"""GET /unreads without name param returns counts but no mention flags."""
|
||||
import aiosqlite
|
||||
|
||||
from app.database import db
|
||||
from app.repository import MessageRepository
|
||||
|
||||
conn = await aiosqlite.connect(":memory:")
|
||||
conn.row_factory = aiosqlite.Row
|
||||
|
||||
await conn.execute("""
|
||||
CREATE TABLE channels (
|
||||
key TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
is_hashtag INTEGER DEFAULT 0,
|
||||
on_radio INTEGER DEFAULT 0,
|
||||
last_read_at INTEGER
|
||||
)
|
||||
""")
|
||||
await conn.execute("""
|
||||
CREATE TABLE contacts (
|
||||
public_key TEXT PRIMARY KEY,
|
||||
name TEXT,
|
||||
type INTEGER DEFAULT 0,
|
||||
flags INTEGER DEFAULT 0,
|
||||
last_path TEXT,
|
||||
last_path_len INTEGER DEFAULT -1,
|
||||
last_advert INTEGER,
|
||||
lat REAL,
|
||||
lon REAL,
|
||||
last_seen INTEGER,
|
||||
on_radio INTEGER DEFAULT 0,
|
||||
last_contacted INTEGER,
|
||||
last_read_at INTEGER
|
||||
)
|
||||
""")
|
||||
await conn.execute("""
|
||||
CREATE TABLE messages (
|
||||
id INTEGER PRIMARY KEY,
|
||||
type TEXT NOT NULL,
|
||||
conversation_key TEXT NOT NULL,
|
||||
text TEXT NOT NULL,
|
||||
sender_timestamp INTEGER,
|
||||
received_at INTEGER NOT NULL,
|
||||
paths TEXT,
|
||||
txt_type INTEGER DEFAULT 0,
|
||||
signature TEXT,
|
||||
outgoing INTEGER DEFAULT 0,
|
||||
acked INTEGER DEFAULT 0,
|
||||
UNIQUE(type, conversation_key, text, sender_timestamp)
|
||||
)
|
||||
""")
|
||||
|
||||
await conn.execute(
|
||||
"INSERT INTO channels (key, name, last_read_at) VALUES (?, ?, ?)",
|
||||
("CHAN1KEY1CHAN1KEY1CHAN1KEY1CHAN1KEY1", "Public", 0),
|
||||
)
|
||||
await conn.execute(
|
||||
"INSERT INTO messages (type, conversation_key, text, received_at, outgoing) VALUES (?, ?, ?, ?, ?)",
|
||||
("CHAN", "CHAN1KEY1CHAN1KEY1CHAN1KEY1CHAN1KEY1", "Bob: @[Alice] hey", 1001, 0),
|
||||
)
|
||||
await conn.commit()
|
||||
|
||||
original_conn = db._connection
|
||||
db._connection = conn
|
||||
|
||||
try:
|
||||
result = await MessageRepository.get_unread_counts(None)
|
||||
|
||||
assert result["counts"]["channel-CHAN1KEY1CHAN1KEY1CHAN1KEY1CHAN1KEY1"] == 1
|
||||
# No mentions since name was None
|
||||
assert len(result["mentions"]) == 0
|
||||
finally:
|
||||
db._connection = original_conn
|
||||
await conn.close()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mark_all_read_updates_all_conversations(self):
|
||||
"""Bulk mark-all-read updates all contacts and channels."""
|
||||
|
||||
Reference in New Issue
Block a user