From 5d7a313c538073bb4d17d44c484dbee222e1f72c Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Mon, 23 Feb 2026 20:26:57 -0800 Subject: [PATCH] Add missing tests and address AGENTS.md gaps --- app/AGENTS.md | 38 +++++-- frontend/AGENTS.md | 56 ++++++--- frontend/src/hooks/useConversationMessages.ts | 4 + .../test/useConversationMessages.race.test.ts | 97 +++++++++++++++- tests/e2e/specs/messaging.spec.ts | 18 ++- tests/e2e/specs/radio-settings.spec.ts | 79 ++++++------- tests/test_echo_dedup.py | 106 ++++++++++++++++++ tests/test_event_handlers.py | 30 +++++ tests/test_send_messages.py | 99 ++++++++++++++++ 9 files changed, 462 insertions(+), 65 deletions(-) diff --git a/app/AGENTS.md b/app/AGENTS.md index 1e95c7f..9c0695a 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -203,13 +203,37 @@ Run backend tests: PYTHONPATH=. uv run pytest tests/ -v ``` -High-signal suites: -- `tests/test_packet_pipeline.py` -- `tests/test_event_handlers.py` -- `tests/test_send_messages.py` -- `tests/test_radio.py` -- `tests/test_api.py` -- `tests/test_migrations.py` +Test suites: + +```text +tests/ +├── conftest.py # Shared fixtures +├── test_api.py # REST endpoint integration tests +├── test_bot.py # Bot execution and sandboxing +├── test_config.py # Configuration validation +├── test_contacts_router.py # Contacts router endpoints +├── test_decoder.py # Packet parsing/decryption +├── test_echo_dedup.py # Echo/repeat deduplication (incl. concurrent) +├── test_event_handlers.py # ACK tracking, event registration, cleanup +├── test_frontend_static.py # Frontend static file serving +├── test_key_normalization.py # Public key normalization +├── test_keystore.py # Ephemeral keystore +├── test_message_pagination.py # Cursor-based message pagination +├── test_message_prefix_claim.py # Message prefix claim logic +├── test_migrations.py # Schema migration system +├── test_packet_pipeline.py # End-to-end packet processing +├── test_radio.py # RadioManager, serial detection +├── test_radio_operation.py # radio_operation() context manager +├── test_radio_router.py # Radio router endpoints +├── test_radio_sync.py # Polling, sync, advertisement +├── test_repeater_routes.py # Repeater command/telemetry/trace +├── test_repository.py # Data access layer +├── test_send_messages.py # Outgoing messages, bot triggers, concurrent sends +├── test_settings_router.py # Settings endpoints, advert validation +├── test_statistics.py # Statistics aggregation +├── test_websocket.py # WS manager broadcast/cleanup +└── test_websocket_route.py # WS endpoint lifecycle +``` ## Editing Checklist diff --git a/frontend/AGENTS.md b/frontend/AGENTS.md index 620aabe..8d3550a 100644 --- a/frontend/AGENTS.md +++ b/frontend/AGENTS.md @@ -18,33 +18,47 @@ Keep it aligned with `frontend/src` source code. ```text frontend/src/ +├── main.tsx # React entry point (StrictMode, root render) ├── App.tsx # App shell and orchestration ├── api.ts # Typed REST client ├── types.ts # Shared TS contracts ├── useWebSocket.ts # WS lifecycle + event dispatch ├── messageCache.ts # Conversation-scoped cache +├── prefetch.ts # Consumes prefetched API promises started in index.html ├── index.css # Global styles/utilities ├── styles.css # Additional global app styles +├── lib/ +│ └── utils.ts # cn() — clsx + tailwind-merge helper ├── hooks/ -│ ├── useConversationMessages.ts -│ ├── useUnreadCounts.ts -│ ├── useRepeaterMode.ts -│ └── useAirtimeTracking.ts +│ ├── index.ts # Central re-export of all hooks +│ ├── useConversationMessages.ts # Fetch, pagination, dedup, ACK buffering +│ ├── useUnreadCounts.ts # Unread counters, mentions, recent-sort timestamps +│ ├── useRepeaterMode.ts # Repeater login/command workflow +│ ├── useAirtimeTracking.ts # Repeater airtime stats polling +│ ├── useRadioControl.ts # Radio health/config state, reconnection +│ ├── useAppSettings.ts # Settings, favorites, preferences migration +│ ├── useConversationRouter.ts # URL hash → active conversation routing +│ └── useContactsAndChannels.ts # Contact/channel loading, creation, deletion ├── utils/ -│ ├── urlHash.ts -│ ├── conversationState.ts -│ ├── favorites.ts -│ ├── messageParser.ts -│ ├── pathUtils.ts -│ ├── pubkey.ts -│ └── contactAvatar.ts +│ ├── urlHash.ts # Hash parsing and encoding +│ ├── conversationState.ts # State keys, in-memory + localStorage helpers +│ ├── favorites.ts # LocalStorage migration for favorites +│ ├── messageParser.ts # Message text → rendered segments +│ ├── pathUtils.ts # Distance/validation helpers for paths + map +│ ├── pubkey.ts # getContactDisplayName (12-char prefix fallback) +│ ├── contactAvatar.ts # Avatar color derivation from public key +│ ├── rawPacketIdentity.ts # observation_id vs id dedup helpers +│ ├── visualizerUtils.ts # 3D visualizer node types, colors, particles +│ └── lastViewedConversation.ts # localStorage for last-viewed conversation ├── components/ │ ├── StatusBar.tsx │ ├── Sidebar.tsx +│ ├── ChatHeader.tsx # Conversation header (trace, favorite, delete) │ ├── MessageList.tsx │ ├── MessageInput.tsx │ ├── NewMessageModal.tsx │ ├── SettingsModal.tsx +│ ├── settingsConstants.ts # Settings section ordering and labels │ ├── RawPacketList.tsx │ ├── MapView.tsx │ ├── VisualizerView.tsx @@ -53,8 +67,12 @@ frontend/src/ │ ├── CrackerPanel.tsx │ ├── BotCodeEditor.tsx │ ├── ContactAvatar.tsx -│ └── ui/ +│ └── ui/ # shadcn/ui primitives +├── types/ +│ └── d3-force-3d.d.ts # Type declarations for d3-force-3d └── test/ + ├── setup.ts + ├── fixtures/websocket_events.json ├── api.test.ts ├── appFavorites.test.tsx ├── appStartupHash.test.tsx @@ -64,26 +82,32 @@ frontend/src/ ├── messageParser.test.ts ├── pathUtils.test.ts ├── radioPresets.test.ts + ├── rawPacketIdentity.test.ts ├── repeaterMode.test.ts ├── settingsModal.test.tsx + ├── sidebar.test.tsx ├── unreadCounts.test.ts ├── urlHash.test.ts ├── useConversationMessages.test.ts + ├── useConversationMessages.race.test.ts ├── useRepeaterMode.test.ts ├── useWebSocket.lifecycle.test.ts - ├── websocket.test.ts - └── setup.ts + └── websocket.test.ts ``` ## Architecture Notes ### State ownership -`App.tsx` orchestrates high-level state (health, config, contacts/channels, active conversation, UI flags). -Specialized logic is delegated to hooks: +`App.tsx` orchestrates high-level state and delegates to hooks: +- `useRadioControl`: radio health/config state, reconnect/reboot polling +- `useAppSettings`: settings CRUD, favorites, preferences migration +- `useContactsAndChannels`: contact/channel lists, creation, deletion +- `useConversationRouter`: URL hash → active conversation routing - `useConversationMessages`: fetch, pagination, dedup/update helpers - `useUnreadCounts`: unread counters, mention tracking, recent-sort timestamps - `useRepeaterMode`: repeater login/command workflow +- `useAirtimeTracking`: repeater airtime stats polling ### Initial load + realtime diff --git a/frontend/src/hooks/useConversationMessages.ts b/frontend/src/hooks/useConversationMessages.ts index 94edf32..598a7db 100644 --- a/frontend/src/hooks/useConversationMessages.ts +++ b/frontend/src/hooks/useConversationMessages.ts @@ -319,6 +319,10 @@ export function useConversationMessages( fetchingConversationIdRef.current = newId; prevConversationIdRef.current = newId; + // Reset loadingOlder — the previous conversation's in-flight older-message + // fetch is irrelevant now (its stale-check will discard the response). + setLoadingOlder(false); + // Clear state for new conversation if (!activeConversation || activeConversation.type === 'raw') { setMessages([]); diff --git a/frontend/src/test/useConversationMessages.race.test.ts b/frontend/src/test/useConversationMessages.race.test.ts index 1ad7393..0c61898 100644 --- a/frontend/src/test/useConversationMessages.race.test.ts +++ b/frontend/src/test/useConversationMessages.race.test.ts @@ -1,5 +1,5 @@ import { act, renderHook, waitFor } from '@testing-library/react'; -import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { beforeEach, describe, expect, it, vi, type Mock } from 'vitest'; import * as messageCache from '../messageCache'; import { useConversationMessages } from '../hooks/useConversationMessages'; @@ -124,3 +124,98 @@ describe('useConversationMessages ACK ordering', () => { expect(result.current.messages[0].paths).toEqual(highAckPaths); }); }); + +describe('useConversationMessages conversation switch', () => { + beforeEach(() => { + mockGetMessages.mockReset(); + messageCache.clear(); + }); + + it('resets loadingOlder when switching conversations mid-fetch', async () => { + const convA: Conversation = { type: 'contact', id: 'conv_a', name: 'Contact A' }; + const convB: Conversation = { type: 'contact', id: 'conv_b', name: 'Contact B' }; + + // Conv A initial fetch: return 200 messages so hasOlderMessages = true + const fullPage = Array.from({ length: 200 }, (_, i) => + createMessage({ + id: i + 1, + conversation_key: 'conv_a', + text: `msg-${i}`, + sender_timestamp: 1700000000 + i, + received_at: 1700000000 + i, + }) + ); + mockGetMessages.mockResolvedValueOnce(fullPage); + + const { result, rerender } = renderHook( + ({ conv }: { conv: Conversation }) => useConversationMessages(conv), + { initialProps: { conv: convA } } + ); + + await waitFor(() => expect(result.current.messagesLoading).toBe(false)); + expect(result.current.hasOlderMessages).toBe(true); + expect(result.current.messages).toHaveLength(200); + + // Start fetching older messages — use a deferred promise so it stays in-flight + const olderDeferred = createDeferred(); + mockGetMessages.mockReturnValueOnce(olderDeferred.promise); + + act(() => { + result.current.fetchOlderMessages(); + }); + + expect(result.current.loadingOlder).toBe(true); + + // Switch to conv B while older-messages fetch is still pending + mockGetMessages.mockResolvedValueOnce([createMessage({ id: 999, conversation_key: 'conv_b' })]); + rerender({ conv: convB }); + + // loadingOlder must reset immediately — no phantom spinner in conv B + await waitFor(() => expect(result.current.loadingOlder).toBe(false)); + await waitFor(() => expect(result.current.messagesLoading).toBe(false)); + expect(result.current.messages).toHaveLength(1); + expect(result.current.messages[0].conversation_key).toBe('conv_b'); + + // Resolve the stale older-messages fetch — should not affect conv B's state + olderDeferred.resolve([ + createMessage({ id: 500, conversation_key: 'conv_a', text: 'stale-old' }), + ]); + + // Give the stale response time to be processed (it should be discarded) + await new Promise((r) => setTimeout(r, 50)); + expect(result.current.messages).toHaveLength(1); + expect(result.current.messages[0].conversation_key).toBe('conv_b'); + }); + + it('aborts in-flight fetch when switching conversations', async () => { + const convA: Conversation = { type: 'contact', id: 'conv_a', name: 'Contact A' }; + const convB: Conversation = { type: 'contact', id: 'conv_b', name: 'Contact B' }; + + // Conv A: never resolves (simulates slow network) + mockGetMessages.mockReturnValueOnce(new Promise(() => {})); + + const { result, rerender } = renderHook( + ({ conv }: { conv: Conversation }) => useConversationMessages(conv), + { initialProps: { conv: convA } } + ); + + // Should be loading + expect(result.current.messagesLoading).toBe(true); + + // Verify the API was called with an AbortSignal + const firstCallSignal = (mockGetMessages as Mock).mock.calls[0]?.[1]; + expect(firstCallSignal).toBeInstanceOf(AbortSignal); + + // Switch to conv B + mockGetMessages.mockResolvedValueOnce([createMessage({ id: 1, conversation_key: 'conv_b' })]); + rerender({ conv: convB }); + + // The signal from conv A's fetch should have been aborted + expect(firstCallSignal.aborted).toBe(true); + + // Conv B should load normally + await waitFor(() => expect(result.current.messagesLoading).toBe(false)); + expect(result.current.messages).toHaveLength(1); + expect(result.current.messages[0].conversation_key).toBe('conv_b'); + }); +}); diff --git a/tests/e2e/specs/messaging.spec.ts b/tests/e2e/specs/messaging.spec.ts index f149a3f..b81d5ec 100644 --- a/tests/e2e/specs/messaging.spec.ts +++ b/tests/e2e/specs/messaging.spec.ts @@ -64,8 +64,22 @@ test.describe('Channel messaging in #flightless', () => { const messageContainer = messageEl.locator( 'xpath=ancestor::div[contains(@class,"break-words")][1]' ); - const resendButton = messageContainer.getByTitle('Resend message'); - await expect(resendButton).toBeVisible({ timeout: 15_000 }); + // Resend actions now live in the outgoing message status/path modal. + // Open it from either pending status (?) or echo-path indicator (✓...). + const statusOrPathTrigger = messageContainer.locator( + '[title="Message status"], [title="View echo paths"]' + ); + await expect(statusOrPathTrigger.first()).toBeVisible({ timeout: 15_000 }); + await statusOrPathTrigger.first().click(); + + const modal = page.getByRole('dialog'); + await expect(modal).toBeVisible({ timeout: 10_000 }); + + // Byte-perfect resend option (within 30s) includes this helper text. + const resendButton = modal.getByRole('button', { + name: /Only repeated by new repeaters/i, + }); + await expect(resendButton).toBeVisible({ timeout: 10_000 }); const resendResponsePromise = page.waitForResponse( (response) => diff --git a/tests/e2e/specs/radio-settings.spec.ts b/tests/e2e/specs/radio-settings.spec.ts index 9ea342b..7362bc6 100644 --- a/tests/e2e/specs/radio-settings.spec.ts +++ b/tests/e2e/specs/radio-settings.spec.ts @@ -2,53 +2,54 @@ import { test, expect } from '@playwright/test'; import { getRadioConfig, updateRadioConfig } from '../helpers/api'; test.describe('Radio settings', () => { - let originalName: string; - - test.beforeAll(async () => { - const config = await getRadioConfig(); - originalName = config.name; - }); - - test.afterAll(async () => { - // Restore original name via API - try { - await updateRadioConfig({ name: originalName }); - } catch { - console.warn('Failed to restore radio name — manual intervention may be needed'); - } - }); - test('change radio name via settings UI and verify persistence', async ({ page }) => { - // Radio names are limited to 8 characters - const testName = 'E2Etest1'; + const originalConfig = await getRadioConfig(); + const originalName = originalConfig.name; - await page.goto('/'); - await expect(page.getByText('Connected')).toBeVisible(); + // Radio names are limited to 8 characters. + // Use a randomized name per run to avoid collisions with stale state. + const randomSuffix = Math.floor(Math.random() * 10000) + .toString() + .padStart(4, '0'); + const testName = `E2E${randomSuffix}`; // 7 chars - // --- Step 1: Change the name via settings UI --- - await page.getByText('Settings').click(); - await page.getByRole('button', { name: /Identity/i }).click(); + try { + await page.goto('/'); + await expect(page.getByText('Connected')).toBeVisible(); - const nameInput = page.locator('#name'); - await nameInput.clear(); - await nameInput.fill(testName); + // --- Step 1: Change the name via settings UI --- + await page.getByText('Settings').click(); + await page.getByRole('button', { name: /Identity/i }).click(); - await page.getByRole('button', { name: 'Save Identity Settings' }).click(); - await expect(page.getByText('Identity settings saved')).toBeVisible({ timeout: 10_000 }); + const nameInput = page.locator('#name'); + await nameInput.clear(); + await nameInput.fill(testName); - // Exit settings page mode - await page.getByRole('button', { name: /Back to Chat/i }).click(); + await page.getByRole('button', { name: 'Save Identity Settings' }).click(); + await expect(page.getByText('Identity settings saved')).toBeVisible({ timeout: 10_000 }); - // --- Step 2: Verify via API (now returns fresh data after send_appstart fix) --- - const config = await getRadioConfig(); - expect(config.name).toBe(testName); + // Exit settings page mode + await page.getByRole('button', { name: /Back to Chat/i }).click(); - // --- Step 3: Verify persistence across page reload --- - await page.reload(); - await expect(page.getByText('Connected')).toBeVisible({ timeout: 15_000 }); + // --- Step 2: Verify via API (now returns fresh data after send_appstart fix) --- + const config = await getRadioConfig(); + expect(config.name).toBe(testName); + + // --- Step 3: Verify persistence across page reload --- + await page.reload(); + await expect(page.getByText('Connected')).toBeVisible({ timeout: 15_000 }); + + await page.getByText('Settings').click(); + await page.getByRole('button', { name: /Identity/i }).click(); + await expect(page.locator('#name')).toHaveValue(testName, { timeout: 10_000 }); + } finally { + // Always restore original name, even when assertions fail. + try { + await updateRadioConfig({ name: originalName }); + } catch { + console.warn('Failed to restore radio name — manual intervention may be needed'); + } + } - await page.getByText('Settings').click(); - await page.getByRole('button', { name: /Identity/i }).click(); - await expect(page.locator('#name')).toHaveValue(testName, { timeout: 10_000 }); }); }); diff --git a/tests/test_echo_dedup.py b/tests/test_echo_dedup.py index 0dcbef4..e7d4232 100644 --- a/tests/test_echo_dedup.py +++ b/tests/test_echo_dedup.py @@ -6,6 +6,7 @@ messages, accumulate multi-path routing info, and ensure the dual DM processing paths (packet_processor + event_handler fallback) don't double-store messages. """ +import asyncio from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -689,3 +690,108 @@ class TestDirectMessageDirectionDetection: # Not our message - should return None without attempting decryption assert result is None + + +class TestConcurrentDMDedup: + """Test that concurrent DM processing deduplicates via atomic INSERT OR IGNORE. + + On a mesh network, the same DM packet can arrive via two RF paths nearly + simultaneously, causing two concurrent calls to create_dm_message_from_decrypted. + SQLite's INSERT OR IGNORE ensures only one message is stored. + """ + + @pytest.mark.asyncio + async def test_concurrent_identical_dms_only_store_once(self, test_db, captured_broadcasts): + """Two concurrent create_dm_message_from_decrypted calls with identical content + should result in exactly one stored message.""" + from app.packet_processor import create_dm_message_from_decrypted + + pkt1, _ = await RawPacketRepository.create(b"concurrent_dm_1", SENDER_TIMESTAMP) + pkt2, _ = await RawPacketRepository.create(b"concurrent_dm_2", SENDER_TIMESTAMP + 1) + + decrypted = DecryptedDirectMessage( + timestamp=SENDER_TIMESTAMP, + flags=0, + message="Concurrent dedup test", + dest_hash="fa", + src_hash="a1", + ) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + results = await asyncio.gather( + create_dm_message_from_decrypted( + packet_id=pkt1, + decrypted=decrypted, + their_public_key=CONTACT_PUB, + our_public_key=OUR_PUB, + received_at=SENDER_TIMESTAMP, + path="aa", + outgoing=False, + ), + create_dm_message_from_decrypted( + packet_id=pkt2, + decrypted=decrypted, + their_public_key=CONTACT_PUB, + our_public_key=OUR_PUB, + received_at=SENDER_TIMESTAMP + 1, + path="bbcc", + outgoing=False, + ), + ) + + # Exactly one should create, the other should return None (duplicate) + created = [r for r in results if r is not None] + duplicates = [r for r in results if r is None] + assert len(created) == 1 + assert len(duplicates) == 1 + + # Only one message in DB + messages = await MessageRepository.get_all( + msg_type="PRIV", conversation_key=CONTACT_PUB.lower(), limit=10 + ) + assert len(messages) == 1 + + @pytest.mark.asyncio + async def test_concurrent_channel_echoes_only_store_once(self, test_db, captured_broadcasts): + """Two concurrent create_message_from_decrypted calls with identical content + should result in exactly one stored message.""" + from app.packet_processor import create_message_from_decrypted + + pkt1, _ = await RawPacketRepository.create(b"concurrent_chan_1", SENDER_TIMESTAMP) + pkt2, _ = await RawPacketRepository.create(b"concurrent_chan_2", SENDER_TIMESTAMP + 1) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + results = await asyncio.gather( + create_message_from_decrypted( + packet_id=pkt1, + channel_key=CHANNEL_KEY, + sender="Alice", + message_text="Concurrent channel test", + timestamp=SENDER_TIMESTAMP, + received_at=SENDER_TIMESTAMP, + path="aa", + ), + create_message_from_decrypted( + packet_id=pkt2, + channel_key=CHANNEL_KEY, + sender="Alice", + message_text="Concurrent channel test", + timestamp=SENDER_TIMESTAMP, + received_at=SENDER_TIMESTAMP + 1, + path="bbcc", + ), + ) + + created = [r for r in results if r is not None] + duplicates = [r for r in results if r is None] + assert len(created) == 1 + assert len(duplicates) == 1 + + messages = await MessageRepository.get_all( + msg_type="CHAN", conversation_key=CHANNEL_KEY, limit=10 + ) + assert len(messages) == 1 diff --git a/tests/test_event_handlers.py b/tests/test_event_handlers.py index ca5e263..a351573 100644 --- a/tests/test_event_handlers.py +++ b/tests/test_event_handlers.py @@ -96,6 +96,36 @@ class TestAckTracking: assert "recent" in _pending_acks + def test_cleanup_handles_many_expired_acks_without_growth(self): + """Many tracked ACKs that all expire should all be cleaned up, + preventing unbounded memory growth when no ACKs ever arrive.""" + now = time.time() + for i in range(100): + _pending_acks[f"ack_{i}"] = (i, now - 300, 5000) # All expired (300s ago, 5s timeout) + + assert len(_pending_acks) == 100 + + cleanup_expired_acks() + + assert len(_pending_acks) == 0 + + def test_cleanup_preserves_valid_acks_among_expired(self): + """Cleanup removes only expired ACKs, preserving valid ones.""" + now = time.time() + # 50 expired + for i in range(50): + _pending_acks[f"expired_{i}"] = (i, now - 300, 5000) + # 50 valid + for i in range(50): + _pending_acks[f"valid_{i}"] = (100 + i, now, 60000) + + assert len(_pending_acks) == 100 + + cleanup_expired_acks() + + assert len(_pending_acks) == 50 + assert all(k.startswith("valid_") for k in _pending_acks) + class TestAckEventHandler: """Test the on_ack event handler.""" diff --git a/tests/test_send_messages.py b/tests/test_send_messages.py index 1d55447..52763bc 100644 --- a/tests/test_send_messages.py +++ b/tests/test_send_messages.py @@ -588,3 +588,102 @@ class TestResendChannelMessage: assert exc_info.value.status_code == 400 assert "expired" in exc_info.value.detail.lower() + + +class TestConcurrentChannelSends: + """Test that concurrent channel sends are serialized by the radio operation lock. + + The send_channel_message endpoint uses set_channel (slot 0) then send_chan_msg. + Concurrent sends must be serialized so two messages don't clobber the same + temporary radio slot. + """ + + @pytest.mark.asyncio + async def test_concurrent_sends_to_different_channels_both_succeed(self, test_db): + """Two concurrent send_channel_message calls to different channels + should both succeed — the radio_operation lock serializes them.""" + mc = _make_mc(name="TestNode") + chan_key_a = "aa" * 16 + chan_key_b = "bb" * 16 + await ChannelRepository.upsert(key=chan_key_a, name="#alpha") + await ChannelRepository.upsert(key=chan_key_b, name="#bravo") + + with ( + patch("app.routers.messages.require_connected", return_value=mc), + patch.object(radio_manager, "_meshcore", mc), + patch("app.routers.messages.broadcast_event"), + ): + results = await asyncio.gather( + send_channel_message( + SendChannelMessageRequest(channel_key=chan_key_a, text="Hello alpha") + ), + send_channel_message( + SendChannelMessageRequest(channel_key=chan_key_b, text="Hello bravo") + ), + ) + + # Both should have returned Message objects with distinct IDs + assert results[0].id != results[1].id + assert results[0].conversation_key == chan_key_a.upper() + assert results[1].conversation_key == chan_key_b.upper() + + # set_channel should have been called twice (once per send, serialized) + assert mc.commands.set_channel.await_count == 2 + + # send_chan_msg should have been called twice + assert mc.commands.send_chan_msg.await_count == 2 + + # Both messages should be in DB + msgs_a = await MessageRepository.get_all( + msg_type="CHAN", conversation_key=chan_key_a.upper(), limit=10 + ) + msgs_b = await MessageRepository.get_all( + msg_type="CHAN", conversation_key=chan_key_b.upper(), limit=10 + ) + assert len(msgs_a) == 1 + assert len(msgs_b) == 1 + + @pytest.mark.asyncio + async def test_concurrent_sends_to_same_channel_both_succeed(self, test_db): + """Two concurrent sends to the same channel should both succeed + with distinct timestamps (serialized, no slot clobber).""" + mc = _make_mc(name="TestNode") + chan_key = "cc" * 16 + await ChannelRepository.upsert(key=chan_key, name="#charlie") + + call_count = 0 + + # Mock time to return incrementing seconds so the two messages + # get distinct sender_timestamps (avoiding same-second collision). + original_time = time.time + + def advancing_time(): + nonlocal call_count + call_count += 1 + return original_time() + call_count + + with ( + patch("app.routers.messages.require_connected", return_value=mc), + patch.object(radio_manager, "_meshcore", mc), + patch("app.routers.messages.broadcast_event"), + patch("app.routers.messages.time") as mock_time, + ): + mock_time.time = advancing_time + results = await asyncio.gather( + send_channel_message( + SendChannelMessageRequest(channel_key=chan_key, text="Message one") + ), + send_channel_message( + SendChannelMessageRequest(channel_key=chan_key, text="Message two") + ), + ) + + assert results[0].id != results[1].id + texts = {results[0].text, results[1].text} + assert "TestNode: Message one" in texts + assert "TestNode: Message two" in texts + + msgs = await MessageRepository.get_all( + msg_type="CHAN", conversation_key=chan_key.upper(), limit=10 + ) + assert len(msgs) == 2