mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
Add missing tests and address AGENTS.md gaps
This commit is contained in:
@@ -203,13 +203,37 @@ Run backend tests:
|
||||
PYTHONPATH=. uv run pytest tests/ -v
|
||||
```
|
||||
|
||||
High-signal suites:
|
||||
- `tests/test_packet_pipeline.py`
|
||||
- `tests/test_event_handlers.py`
|
||||
- `tests/test_send_messages.py`
|
||||
- `tests/test_radio.py`
|
||||
- `tests/test_api.py`
|
||||
- `tests/test_migrations.py`
|
||||
Test suites:
|
||||
|
||||
```text
|
||||
tests/
|
||||
├── conftest.py # Shared fixtures
|
||||
├── test_api.py # REST endpoint integration tests
|
||||
├── test_bot.py # Bot execution and sandboxing
|
||||
├── test_config.py # Configuration validation
|
||||
├── test_contacts_router.py # Contacts router endpoints
|
||||
├── test_decoder.py # Packet parsing/decryption
|
||||
├── test_echo_dedup.py # Echo/repeat deduplication (incl. concurrent)
|
||||
├── test_event_handlers.py # ACK tracking, event registration, cleanup
|
||||
├── test_frontend_static.py # Frontend static file serving
|
||||
├── test_key_normalization.py # Public key normalization
|
||||
├── test_keystore.py # Ephemeral keystore
|
||||
├── test_message_pagination.py # Cursor-based message pagination
|
||||
├── test_message_prefix_claim.py # Message prefix claim logic
|
||||
├── test_migrations.py # Schema migration system
|
||||
├── test_packet_pipeline.py # End-to-end packet processing
|
||||
├── test_radio.py # RadioManager, serial detection
|
||||
├── test_radio_operation.py # radio_operation() context manager
|
||||
├── test_radio_router.py # Radio router endpoints
|
||||
├── test_radio_sync.py # Polling, sync, advertisement
|
||||
├── test_repeater_routes.py # Repeater command/telemetry/trace
|
||||
├── test_repository.py # Data access layer
|
||||
├── test_send_messages.py # Outgoing messages, bot triggers, concurrent sends
|
||||
├── test_settings_router.py # Settings endpoints, advert validation
|
||||
├── test_statistics.py # Statistics aggregation
|
||||
├── test_websocket.py # WS manager broadcast/cleanup
|
||||
└── test_websocket_route.py # WS endpoint lifecycle
|
||||
```
|
||||
|
||||
## Editing Checklist
|
||||
|
||||
|
||||
@@ -18,33 +18,47 @@ Keep it aligned with `frontend/src` source code.
|
||||
|
||||
```text
|
||||
frontend/src/
|
||||
├── main.tsx # React entry point (StrictMode, root render)
|
||||
├── App.tsx # App shell and orchestration
|
||||
├── api.ts # Typed REST client
|
||||
├── types.ts # Shared TS contracts
|
||||
├── useWebSocket.ts # WS lifecycle + event dispatch
|
||||
├── messageCache.ts # Conversation-scoped cache
|
||||
├── prefetch.ts # Consumes prefetched API promises started in index.html
|
||||
├── index.css # Global styles/utilities
|
||||
├── styles.css # Additional global app styles
|
||||
├── lib/
|
||||
│ └── utils.ts # cn() — clsx + tailwind-merge helper
|
||||
├── hooks/
|
||||
│ ├── useConversationMessages.ts
|
||||
│ ├── useUnreadCounts.ts
|
||||
│ ├── useRepeaterMode.ts
|
||||
│ └── useAirtimeTracking.ts
|
||||
│ ├── index.ts # Central re-export of all hooks
|
||||
│ ├── useConversationMessages.ts # Fetch, pagination, dedup, ACK buffering
|
||||
│ ├── useUnreadCounts.ts # Unread counters, mentions, recent-sort timestamps
|
||||
│ ├── useRepeaterMode.ts # Repeater login/command workflow
|
||||
│ ├── useAirtimeTracking.ts # Repeater airtime stats polling
|
||||
│ ├── useRadioControl.ts # Radio health/config state, reconnection
|
||||
│ ├── useAppSettings.ts # Settings, favorites, preferences migration
|
||||
│ ├── useConversationRouter.ts # URL hash → active conversation routing
|
||||
│ └── useContactsAndChannels.ts # Contact/channel loading, creation, deletion
|
||||
├── utils/
|
||||
│ ├── urlHash.ts
|
||||
│ ├── conversationState.ts
|
||||
│ ├── favorites.ts
|
||||
│ ├── messageParser.ts
|
||||
│ ├── pathUtils.ts
|
||||
│ ├── pubkey.ts
|
||||
│ └── contactAvatar.ts
|
||||
│ ├── urlHash.ts # Hash parsing and encoding
|
||||
│ ├── conversationState.ts # State keys, in-memory + localStorage helpers
|
||||
│ ├── favorites.ts # LocalStorage migration for favorites
|
||||
│ ├── messageParser.ts # Message text → rendered segments
|
||||
│ ├── pathUtils.ts # Distance/validation helpers for paths + map
|
||||
│ ├── pubkey.ts # getContactDisplayName (12-char prefix fallback)
|
||||
│ ├── contactAvatar.ts # Avatar color derivation from public key
|
||||
│ ├── rawPacketIdentity.ts # observation_id vs id dedup helpers
|
||||
│ ├── visualizerUtils.ts # 3D visualizer node types, colors, particles
|
||||
│ └── lastViewedConversation.ts # localStorage for last-viewed conversation
|
||||
├── components/
|
||||
│ ├── StatusBar.tsx
|
||||
│ ├── Sidebar.tsx
|
||||
│ ├── ChatHeader.tsx # Conversation header (trace, favorite, delete)
|
||||
│ ├── MessageList.tsx
|
||||
│ ├── MessageInput.tsx
|
||||
│ ├── NewMessageModal.tsx
|
||||
│ ├── SettingsModal.tsx
|
||||
│ ├── settingsConstants.ts # Settings section ordering and labels
|
||||
│ ├── RawPacketList.tsx
|
||||
│ ├── MapView.tsx
|
||||
│ ├── VisualizerView.tsx
|
||||
@@ -53,8 +67,12 @@ frontend/src/
|
||||
│ ├── CrackerPanel.tsx
|
||||
│ ├── BotCodeEditor.tsx
|
||||
│ ├── ContactAvatar.tsx
|
||||
│ └── ui/
|
||||
│ └── ui/ # shadcn/ui primitives
|
||||
├── types/
|
||||
│ └── d3-force-3d.d.ts # Type declarations for d3-force-3d
|
||||
└── test/
|
||||
├── setup.ts
|
||||
├── fixtures/websocket_events.json
|
||||
├── api.test.ts
|
||||
├── appFavorites.test.tsx
|
||||
├── appStartupHash.test.tsx
|
||||
@@ -64,26 +82,32 @@ frontend/src/
|
||||
├── messageParser.test.ts
|
||||
├── pathUtils.test.ts
|
||||
├── radioPresets.test.ts
|
||||
├── rawPacketIdentity.test.ts
|
||||
├── repeaterMode.test.ts
|
||||
├── settingsModal.test.tsx
|
||||
├── sidebar.test.tsx
|
||||
├── unreadCounts.test.ts
|
||||
├── urlHash.test.ts
|
||||
├── useConversationMessages.test.ts
|
||||
├── useConversationMessages.race.test.ts
|
||||
├── useRepeaterMode.test.ts
|
||||
├── useWebSocket.lifecycle.test.ts
|
||||
├── websocket.test.ts
|
||||
└── setup.ts
|
||||
└── websocket.test.ts
|
||||
```
|
||||
|
||||
## Architecture Notes
|
||||
|
||||
### State ownership
|
||||
|
||||
`App.tsx` orchestrates high-level state (health, config, contacts/channels, active conversation, UI flags).
|
||||
Specialized logic is delegated to hooks:
|
||||
`App.tsx` orchestrates high-level state and delegates to hooks:
|
||||
- `useRadioControl`: radio health/config state, reconnect/reboot polling
|
||||
- `useAppSettings`: settings CRUD, favorites, preferences migration
|
||||
- `useContactsAndChannels`: contact/channel lists, creation, deletion
|
||||
- `useConversationRouter`: URL hash → active conversation routing
|
||||
- `useConversationMessages`: fetch, pagination, dedup/update helpers
|
||||
- `useUnreadCounts`: unread counters, mention tracking, recent-sort timestamps
|
||||
- `useRepeaterMode`: repeater login/command workflow
|
||||
- `useAirtimeTracking`: repeater airtime stats polling
|
||||
|
||||
### Initial load + realtime
|
||||
|
||||
|
||||
@@ -319,6 +319,10 @@ export function useConversationMessages(
|
||||
fetchingConversationIdRef.current = newId;
|
||||
prevConversationIdRef.current = newId;
|
||||
|
||||
// Reset loadingOlder — the previous conversation's in-flight older-message
|
||||
// fetch is irrelevant now (its stale-check will discard the response).
|
||||
setLoadingOlder(false);
|
||||
|
||||
// Clear state for new conversation
|
||||
if (!activeConversation || activeConversation.type === 'raw') {
|
||||
setMessages([]);
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { act, renderHook, waitFor } from '@testing-library/react';
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
import { beforeEach, describe, expect, it, vi, type Mock } from 'vitest';
|
||||
|
||||
import * as messageCache from '../messageCache';
|
||||
import { useConversationMessages } from '../hooks/useConversationMessages';
|
||||
@@ -124,3 +124,98 @@ describe('useConversationMessages ACK ordering', () => {
|
||||
expect(result.current.messages[0].paths).toEqual(highAckPaths);
|
||||
});
|
||||
});
|
||||
|
||||
describe('useConversationMessages conversation switch', () => {
|
||||
beforeEach(() => {
|
||||
mockGetMessages.mockReset();
|
||||
messageCache.clear();
|
||||
});
|
||||
|
||||
it('resets loadingOlder when switching conversations mid-fetch', async () => {
|
||||
const convA: Conversation = { type: 'contact', id: 'conv_a', name: 'Contact A' };
|
||||
const convB: Conversation = { type: 'contact', id: 'conv_b', name: 'Contact B' };
|
||||
|
||||
// Conv A initial fetch: return 200 messages so hasOlderMessages = true
|
||||
const fullPage = Array.from({ length: 200 }, (_, i) =>
|
||||
createMessage({
|
||||
id: i + 1,
|
||||
conversation_key: 'conv_a',
|
||||
text: `msg-${i}`,
|
||||
sender_timestamp: 1700000000 + i,
|
||||
received_at: 1700000000 + i,
|
||||
})
|
||||
);
|
||||
mockGetMessages.mockResolvedValueOnce(fullPage);
|
||||
|
||||
const { result, rerender } = renderHook(
|
||||
({ conv }: { conv: Conversation }) => useConversationMessages(conv),
|
||||
{ initialProps: { conv: convA } }
|
||||
);
|
||||
|
||||
await waitFor(() => expect(result.current.messagesLoading).toBe(false));
|
||||
expect(result.current.hasOlderMessages).toBe(true);
|
||||
expect(result.current.messages).toHaveLength(200);
|
||||
|
||||
// Start fetching older messages — use a deferred promise so it stays in-flight
|
||||
const olderDeferred = createDeferred<Message[]>();
|
||||
mockGetMessages.mockReturnValueOnce(olderDeferred.promise);
|
||||
|
||||
act(() => {
|
||||
result.current.fetchOlderMessages();
|
||||
});
|
||||
|
||||
expect(result.current.loadingOlder).toBe(true);
|
||||
|
||||
// Switch to conv B while older-messages fetch is still pending
|
||||
mockGetMessages.mockResolvedValueOnce([createMessage({ id: 999, conversation_key: 'conv_b' })]);
|
||||
rerender({ conv: convB });
|
||||
|
||||
// loadingOlder must reset immediately — no phantom spinner in conv B
|
||||
await waitFor(() => expect(result.current.loadingOlder).toBe(false));
|
||||
await waitFor(() => expect(result.current.messagesLoading).toBe(false));
|
||||
expect(result.current.messages).toHaveLength(1);
|
||||
expect(result.current.messages[0].conversation_key).toBe('conv_b');
|
||||
|
||||
// Resolve the stale older-messages fetch — should not affect conv B's state
|
||||
olderDeferred.resolve([
|
||||
createMessage({ id: 500, conversation_key: 'conv_a', text: 'stale-old' }),
|
||||
]);
|
||||
|
||||
// Give the stale response time to be processed (it should be discarded)
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(result.current.messages).toHaveLength(1);
|
||||
expect(result.current.messages[0].conversation_key).toBe('conv_b');
|
||||
});
|
||||
|
||||
it('aborts in-flight fetch when switching conversations', async () => {
|
||||
const convA: Conversation = { type: 'contact', id: 'conv_a', name: 'Contact A' };
|
||||
const convB: Conversation = { type: 'contact', id: 'conv_b', name: 'Contact B' };
|
||||
|
||||
// Conv A: never resolves (simulates slow network)
|
||||
mockGetMessages.mockReturnValueOnce(new Promise(() => {}));
|
||||
|
||||
const { result, rerender } = renderHook(
|
||||
({ conv }: { conv: Conversation }) => useConversationMessages(conv),
|
||||
{ initialProps: { conv: convA } }
|
||||
);
|
||||
|
||||
// Should be loading
|
||||
expect(result.current.messagesLoading).toBe(true);
|
||||
|
||||
// Verify the API was called with an AbortSignal
|
||||
const firstCallSignal = (mockGetMessages as Mock).mock.calls[0]?.[1];
|
||||
expect(firstCallSignal).toBeInstanceOf(AbortSignal);
|
||||
|
||||
// Switch to conv B
|
||||
mockGetMessages.mockResolvedValueOnce([createMessage({ id: 1, conversation_key: 'conv_b' })]);
|
||||
rerender({ conv: convB });
|
||||
|
||||
// The signal from conv A's fetch should have been aborted
|
||||
expect(firstCallSignal.aborted).toBe(true);
|
||||
|
||||
// Conv B should load normally
|
||||
await waitFor(() => expect(result.current.messagesLoading).toBe(false));
|
||||
expect(result.current.messages).toHaveLength(1);
|
||||
expect(result.current.messages[0].conversation_key).toBe('conv_b');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -64,8 +64,22 @@ test.describe('Channel messaging in #flightless', () => {
|
||||
const messageContainer = messageEl.locator(
|
||||
'xpath=ancestor::div[contains(@class,"break-words")][1]'
|
||||
);
|
||||
const resendButton = messageContainer.getByTitle('Resend message');
|
||||
await expect(resendButton).toBeVisible({ timeout: 15_000 });
|
||||
// Resend actions now live in the outgoing message status/path modal.
|
||||
// Open it from either pending status (?) or echo-path indicator (✓...).
|
||||
const statusOrPathTrigger = messageContainer.locator(
|
||||
'[title="Message status"], [title="View echo paths"]'
|
||||
);
|
||||
await expect(statusOrPathTrigger.first()).toBeVisible({ timeout: 15_000 });
|
||||
await statusOrPathTrigger.first().click();
|
||||
|
||||
const modal = page.getByRole('dialog');
|
||||
await expect(modal).toBeVisible({ timeout: 10_000 });
|
||||
|
||||
// Byte-perfect resend option (within 30s) includes this helper text.
|
||||
const resendButton = modal.getByRole('button', {
|
||||
name: /Only repeated by new repeaters/i,
|
||||
});
|
||||
await expect(resendButton).toBeVisible({ timeout: 10_000 });
|
||||
|
||||
const resendResponsePromise = page.waitForResponse(
|
||||
(response) =>
|
||||
|
||||
@@ -2,53 +2,54 @@ import { test, expect } from '@playwright/test';
|
||||
import { getRadioConfig, updateRadioConfig } from '../helpers/api';
|
||||
|
||||
test.describe('Radio settings', () => {
|
||||
let originalName: string;
|
||||
|
||||
test.beforeAll(async () => {
|
||||
const config = await getRadioConfig();
|
||||
originalName = config.name;
|
||||
});
|
||||
|
||||
test.afterAll(async () => {
|
||||
// Restore original name via API
|
||||
try {
|
||||
await updateRadioConfig({ name: originalName });
|
||||
} catch {
|
||||
console.warn('Failed to restore radio name — manual intervention may be needed');
|
||||
}
|
||||
});
|
||||
|
||||
test('change radio name via settings UI and verify persistence', async ({ page }) => {
|
||||
// Radio names are limited to 8 characters
|
||||
const testName = 'E2Etest1';
|
||||
const originalConfig = await getRadioConfig();
|
||||
const originalName = originalConfig.name;
|
||||
|
||||
await page.goto('/');
|
||||
await expect(page.getByText('Connected')).toBeVisible();
|
||||
// Radio names are limited to 8 characters.
|
||||
// Use a randomized name per run to avoid collisions with stale state.
|
||||
const randomSuffix = Math.floor(Math.random() * 10000)
|
||||
.toString()
|
||||
.padStart(4, '0');
|
||||
const testName = `E2E${randomSuffix}`; // 7 chars
|
||||
|
||||
// --- Step 1: Change the name via settings UI ---
|
||||
await page.getByText('Settings').click();
|
||||
await page.getByRole('button', { name: /Identity/i }).click();
|
||||
try {
|
||||
await page.goto('/');
|
||||
await expect(page.getByText('Connected')).toBeVisible();
|
||||
|
||||
const nameInput = page.locator('#name');
|
||||
await nameInput.clear();
|
||||
await nameInput.fill(testName);
|
||||
// --- Step 1: Change the name via settings UI ---
|
||||
await page.getByText('Settings').click();
|
||||
await page.getByRole('button', { name: /Identity/i }).click();
|
||||
|
||||
await page.getByRole('button', { name: 'Save Identity Settings' }).click();
|
||||
await expect(page.getByText('Identity settings saved')).toBeVisible({ timeout: 10_000 });
|
||||
const nameInput = page.locator('#name');
|
||||
await nameInput.clear();
|
||||
await nameInput.fill(testName);
|
||||
|
||||
// Exit settings page mode
|
||||
await page.getByRole('button', { name: /Back to Chat/i }).click();
|
||||
await page.getByRole('button', { name: 'Save Identity Settings' }).click();
|
||||
await expect(page.getByText('Identity settings saved')).toBeVisible({ timeout: 10_000 });
|
||||
|
||||
// --- Step 2: Verify via API (now returns fresh data after send_appstart fix) ---
|
||||
const config = await getRadioConfig();
|
||||
expect(config.name).toBe(testName);
|
||||
// Exit settings page mode
|
||||
await page.getByRole('button', { name: /Back to Chat/i }).click();
|
||||
|
||||
// --- Step 3: Verify persistence across page reload ---
|
||||
await page.reload();
|
||||
await expect(page.getByText('Connected')).toBeVisible({ timeout: 15_000 });
|
||||
// --- Step 2: Verify via API (now returns fresh data after send_appstart fix) ---
|
||||
const config = await getRadioConfig();
|
||||
expect(config.name).toBe(testName);
|
||||
|
||||
// --- Step 3: Verify persistence across page reload ---
|
||||
await page.reload();
|
||||
await expect(page.getByText('Connected')).toBeVisible({ timeout: 15_000 });
|
||||
|
||||
await page.getByText('Settings').click();
|
||||
await page.getByRole('button', { name: /Identity/i }).click();
|
||||
await expect(page.locator('#name')).toHaveValue(testName, { timeout: 10_000 });
|
||||
} finally {
|
||||
// Always restore original name, even when assertions fail.
|
||||
try {
|
||||
await updateRadioConfig({ name: originalName });
|
||||
} catch {
|
||||
console.warn('Failed to restore radio name — manual intervention may be needed');
|
||||
}
|
||||
}
|
||||
|
||||
await page.getByText('Settings').click();
|
||||
await page.getByRole('button', { name: /Identity/i }).click();
|
||||
await expect(page.locator('#name')).toHaveValue(testName, { timeout: 10_000 });
|
||||
});
|
||||
});
|
||||
|
||||
@@ -6,6 +6,7 @@ messages, accumulate multi-path routing info, and ensure the dual DM processing
|
||||
paths (packet_processor + event_handler fallback) don't double-store messages.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
@@ -689,3 +690,108 @@ class TestDirectMessageDirectionDetection:
|
||||
|
||||
# Not our message - should return None without attempting decryption
|
||||
assert result is None
|
||||
|
||||
|
||||
class TestConcurrentDMDedup:
|
||||
"""Test that concurrent DM processing deduplicates via atomic INSERT OR IGNORE.
|
||||
|
||||
On a mesh network, the same DM packet can arrive via two RF paths nearly
|
||||
simultaneously, causing two concurrent calls to create_dm_message_from_decrypted.
|
||||
SQLite's INSERT OR IGNORE ensures only one message is stored.
|
||||
"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_concurrent_identical_dms_only_store_once(self, test_db, captured_broadcasts):
|
||||
"""Two concurrent create_dm_message_from_decrypted calls with identical content
|
||||
should result in exactly one stored message."""
|
||||
from app.packet_processor import create_dm_message_from_decrypted
|
||||
|
||||
pkt1, _ = await RawPacketRepository.create(b"concurrent_dm_1", SENDER_TIMESTAMP)
|
||||
pkt2, _ = await RawPacketRepository.create(b"concurrent_dm_2", SENDER_TIMESTAMP + 1)
|
||||
|
||||
decrypted = DecryptedDirectMessage(
|
||||
timestamp=SENDER_TIMESTAMP,
|
||||
flags=0,
|
||||
message="Concurrent dedup test",
|
||||
dest_hash="fa",
|
||||
src_hash="a1",
|
||||
)
|
||||
|
||||
broadcasts, mock_broadcast = captured_broadcasts
|
||||
|
||||
with patch("app.packet_processor.broadcast_event", mock_broadcast):
|
||||
results = await asyncio.gather(
|
||||
create_dm_message_from_decrypted(
|
||||
packet_id=pkt1,
|
||||
decrypted=decrypted,
|
||||
their_public_key=CONTACT_PUB,
|
||||
our_public_key=OUR_PUB,
|
||||
received_at=SENDER_TIMESTAMP,
|
||||
path="aa",
|
||||
outgoing=False,
|
||||
),
|
||||
create_dm_message_from_decrypted(
|
||||
packet_id=pkt2,
|
||||
decrypted=decrypted,
|
||||
their_public_key=CONTACT_PUB,
|
||||
our_public_key=OUR_PUB,
|
||||
received_at=SENDER_TIMESTAMP + 1,
|
||||
path="bbcc",
|
||||
outgoing=False,
|
||||
),
|
||||
)
|
||||
|
||||
# Exactly one should create, the other should return None (duplicate)
|
||||
created = [r for r in results if r is not None]
|
||||
duplicates = [r for r in results if r is None]
|
||||
assert len(created) == 1
|
||||
assert len(duplicates) == 1
|
||||
|
||||
# Only one message in DB
|
||||
messages = await MessageRepository.get_all(
|
||||
msg_type="PRIV", conversation_key=CONTACT_PUB.lower(), limit=10
|
||||
)
|
||||
assert len(messages) == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_concurrent_channel_echoes_only_store_once(self, test_db, captured_broadcasts):
|
||||
"""Two concurrent create_message_from_decrypted calls with identical content
|
||||
should result in exactly one stored message."""
|
||||
from app.packet_processor import create_message_from_decrypted
|
||||
|
||||
pkt1, _ = await RawPacketRepository.create(b"concurrent_chan_1", SENDER_TIMESTAMP)
|
||||
pkt2, _ = await RawPacketRepository.create(b"concurrent_chan_2", SENDER_TIMESTAMP + 1)
|
||||
|
||||
broadcasts, mock_broadcast = captured_broadcasts
|
||||
|
||||
with patch("app.packet_processor.broadcast_event", mock_broadcast):
|
||||
results = await asyncio.gather(
|
||||
create_message_from_decrypted(
|
||||
packet_id=pkt1,
|
||||
channel_key=CHANNEL_KEY,
|
||||
sender="Alice",
|
||||
message_text="Concurrent channel test",
|
||||
timestamp=SENDER_TIMESTAMP,
|
||||
received_at=SENDER_TIMESTAMP,
|
||||
path="aa",
|
||||
),
|
||||
create_message_from_decrypted(
|
||||
packet_id=pkt2,
|
||||
channel_key=CHANNEL_KEY,
|
||||
sender="Alice",
|
||||
message_text="Concurrent channel test",
|
||||
timestamp=SENDER_TIMESTAMP,
|
||||
received_at=SENDER_TIMESTAMP + 1,
|
||||
path="bbcc",
|
||||
),
|
||||
)
|
||||
|
||||
created = [r for r in results if r is not None]
|
||||
duplicates = [r for r in results if r is None]
|
||||
assert len(created) == 1
|
||||
assert len(duplicates) == 1
|
||||
|
||||
messages = await MessageRepository.get_all(
|
||||
msg_type="CHAN", conversation_key=CHANNEL_KEY, limit=10
|
||||
)
|
||||
assert len(messages) == 1
|
||||
|
||||
@@ -96,6 +96,36 @@ class TestAckTracking:
|
||||
|
||||
assert "recent" in _pending_acks
|
||||
|
||||
def test_cleanup_handles_many_expired_acks_without_growth(self):
|
||||
"""Many tracked ACKs that all expire should all be cleaned up,
|
||||
preventing unbounded memory growth when no ACKs ever arrive."""
|
||||
now = time.time()
|
||||
for i in range(100):
|
||||
_pending_acks[f"ack_{i}"] = (i, now - 300, 5000) # All expired (300s ago, 5s timeout)
|
||||
|
||||
assert len(_pending_acks) == 100
|
||||
|
||||
cleanup_expired_acks()
|
||||
|
||||
assert len(_pending_acks) == 0
|
||||
|
||||
def test_cleanup_preserves_valid_acks_among_expired(self):
|
||||
"""Cleanup removes only expired ACKs, preserving valid ones."""
|
||||
now = time.time()
|
||||
# 50 expired
|
||||
for i in range(50):
|
||||
_pending_acks[f"expired_{i}"] = (i, now - 300, 5000)
|
||||
# 50 valid
|
||||
for i in range(50):
|
||||
_pending_acks[f"valid_{i}"] = (100 + i, now, 60000)
|
||||
|
||||
assert len(_pending_acks) == 100
|
||||
|
||||
cleanup_expired_acks()
|
||||
|
||||
assert len(_pending_acks) == 50
|
||||
assert all(k.startswith("valid_") for k in _pending_acks)
|
||||
|
||||
|
||||
class TestAckEventHandler:
|
||||
"""Test the on_ack event handler."""
|
||||
|
||||
@@ -588,3 +588,102 @@ class TestResendChannelMessage:
|
||||
|
||||
assert exc_info.value.status_code == 400
|
||||
assert "expired" in exc_info.value.detail.lower()
|
||||
|
||||
|
||||
class TestConcurrentChannelSends:
|
||||
"""Test that concurrent channel sends are serialized by the radio operation lock.
|
||||
|
||||
The send_channel_message endpoint uses set_channel (slot 0) then send_chan_msg.
|
||||
Concurrent sends must be serialized so two messages don't clobber the same
|
||||
temporary radio slot.
|
||||
"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_concurrent_sends_to_different_channels_both_succeed(self, test_db):
|
||||
"""Two concurrent send_channel_message calls to different channels
|
||||
should both succeed — the radio_operation lock serializes them."""
|
||||
mc = _make_mc(name="TestNode")
|
||||
chan_key_a = "aa" * 16
|
||||
chan_key_b = "bb" * 16
|
||||
await ChannelRepository.upsert(key=chan_key_a, name="#alpha")
|
||||
await ChannelRepository.upsert(key=chan_key_b, name="#bravo")
|
||||
|
||||
with (
|
||||
patch("app.routers.messages.require_connected", return_value=mc),
|
||||
patch.object(radio_manager, "_meshcore", mc),
|
||||
patch("app.routers.messages.broadcast_event"),
|
||||
):
|
||||
results = await asyncio.gather(
|
||||
send_channel_message(
|
||||
SendChannelMessageRequest(channel_key=chan_key_a, text="Hello alpha")
|
||||
),
|
||||
send_channel_message(
|
||||
SendChannelMessageRequest(channel_key=chan_key_b, text="Hello bravo")
|
||||
),
|
||||
)
|
||||
|
||||
# Both should have returned Message objects with distinct IDs
|
||||
assert results[0].id != results[1].id
|
||||
assert results[0].conversation_key == chan_key_a.upper()
|
||||
assert results[1].conversation_key == chan_key_b.upper()
|
||||
|
||||
# set_channel should have been called twice (once per send, serialized)
|
||||
assert mc.commands.set_channel.await_count == 2
|
||||
|
||||
# send_chan_msg should have been called twice
|
||||
assert mc.commands.send_chan_msg.await_count == 2
|
||||
|
||||
# Both messages should be in DB
|
||||
msgs_a = await MessageRepository.get_all(
|
||||
msg_type="CHAN", conversation_key=chan_key_a.upper(), limit=10
|
||||
)
|
||||
msgs_b = await MessageRepository.get_all(
|
||||
msg_type="CHAN", conversation_key=chan_key_b.upper(), limit=10
|
||||
)
|
||||
assert len(msgs_a) == 1
|
||||
assert len(msgs_b) == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_concurrent_sends_to_same_channel_both_succeed(self, test_db):
|
||||
"""Two concurrent sends to the same channel should both succeed
|
||||
with distinct timestamps (serialized, no slot clobber)."""
|
||||
mc = _make_mc(name="TestNode")
|
||||
chan_key = "cc" * 16
|
||||
await ChannelRepository.upsert(key=chan_key, name="#charlie")
|
||||
|
||||
call_count = 0
|
||||
|
||||
# Mock time to return incrementing seconds so the two messages
|
||||
# get distinct sender_timestamps (avoiding same-second collision).
|
||||
original_time = time.time
|
||||
|
||||
def advancing_time():
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
return original_time() + call_count
|
||||
|
||||
with (
|
||||
patch("app.routers.messages.require_connected", return_value=mc),
|
||||
patch.object(radio_manager, "_meshcore", mc),
|
||||
patch("app.routers.messages.broadcast_event"),
|
||||
patch("app.routers.messages.time") as mock_time,
|
||||
):
|
||||
mock_time.time = advancing_time
|
||||
results = await asyncio.gather(
|
||||
send_channel_message(
|
||||
SendChannelMessageRequest(channel_key=chan_key, text="Message one")
|
||||
),
|
||||
send_channel_message(
|
||||
SendChannelMessageRequest(channel_key=chan_key, text="Message two")
|
||||
),
|
||||
)
|
||||
|
||||
assert results[0].id != results[1].id
|
||||
texts = {results[0].text, results[1].text}
|
||||
assert "TestNode: Message one" in texts
|
||||
assert "TestNode: Message two" in texts
|
||||
|
||||
msgs = await MessageRepository.get_all(
|
||||
msg_type="CHAN", conversation_key=chan_key.upper(), limit=10
|
||||
)
|
||||
assert len(msgs) == 2
|
||||
|
||||
Reference in New Issue
Block a user