Add more tests and update AGENTS.md

This commit is contained in:
Jack Kingsman
2026-02-12 00:31:52 -08:00
parent b80093ba94
commit 7e7330eb12
8 changed files with 1916 additions and 7 deletions
+2 -2
View File
@@ -29,6 +29,7 @@ app/
├── websocket.py # WS manager + broadcast helpers
├── bot.py # Bot execution and outbound bot sends
├── dependencies.py # Shared FastAPI dependency providers
├── keystore.py # Ephemeral private/public key storage for DM decryption
├── frontend_static.py # Mount/serve built frontend (production)
└── routers/
├── health.py
@@ -142,8 +143,6 @@ app/
## WebSocket Events
- `health`
- `contacts`
- `channels`
- `contact`
- `message`
- `message_acked`
@@ -152,6 +151,7 @@ app/
- `success`
Initial WS connect sends `health` only. Contacts/channels are loaded by REST.
Note: the frontend WS hook also registers handlers for `contacts` and `channels` events, but the backend never emits them.
## Data Model Notes
+287 -2
View File
@@ -2,8 +2,8 @@
* Tests for API utilities.
*/
import { describe, it, expect } from 'vitest';
import { isAbortError } from '../api';
import { describe, it, expect, vi, afterEach } from 'vitest';
import { isAbortError, api } from '../api';
describe('isAbortError', () => {
it('returns true for AbortError', () => {
@@ -60,3 +60,288 @@ describe('isAbortError', () => {
expect(isAbortError(new CustomError())).toBe(false);
});
});
describe('fetchJson (via api methods)', () => {
const mockFetch = vi.fn();
// Replace global fetch before each test, restore after
afterEach(() => {
vi.restoreAllMocks();
});
function installMockFetch() {
global.fetch = mockFetch;
}
describe('successful responses', () => {
it('returns parsed JSON on a successful response', async () => {
installMockFetch();
const healthData = {
status: 'connected',
radio_connected: true,
connection_info: 'Serial: /dev/ttyUSB0',
database_size_mb: 1.2,
oldest_undecrypted_timestamp: null,
};
mockFetch.mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve(healthData),
});
const result = await api.getHealth();
expect(result).toEqual(healthData);
});
it('calls fetch with /api prefix', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve([]),
});
await api.getContacts();
expect(mockFetch).toHaveBeenCalledTimes(1);
const [url] = mockFetch.mock.calls[0];
expect(url).toBe('/api/contacts?limit=100&offset=0');
});
});
describe('error handling', () => {
it('extracts detail from FastAPI JSON error response', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: false,
status: 503,
statusText: 'Service Unavailable',
text: () => Promise.resolve('{"detail": "Radio not connected"}'),
});
await expect(api.getHealth()).rejects.toThrow('Radio not connected');
});
it('uses raw text when error response is not JSON', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: false,
status: 500,
statusText: 'Internal Server Error',
text: () => Promise.resolve('Something broke on the server'),
});
await expect(api.getHealth()).rejects.toThrow('Something broke on the server');
});
it('uses statusText when error text is empty', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: false,
status: 502,
statusText: 'Bad Gateway',
text: () => Promise.resolve(''),
});
await expect(api.getHealth()).rejects.toThrow('Bad Gateway');
});
it('uses raw text when JSON lacks detail field', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: false,
status: 422,
statusText: 'Unprocessable Entity',
text: () => Promise.resolve('{"error": "validation failed"}'),
});
await expect(api.getHealth()).rejects.toThrow('{"error": "validation failed"}');
});
});
describe('Content-Type header', () => {
it('always sends Content-Type: application/json on GET requests', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve({ status: 'connected' }),
});
await api.getHealth();
const [, options] = mockFetch.mock.calls[0];
expect(options.headers).toEqual(
expect.objectContaining({ 'Content-Type': 'application/json' })
);
});
it('always sends Content-Type: application/json on POST requests', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve({ id: 1, text: 'hello' }),
});
await api.sendDirectMessage('abc123', 'hello');
const [, options] = mockFetch.mock.calls[0];
expect(options.headers).toEqual(
expect.objectContaining({ 'Content-Type': 'application/json' })
);
});
});
describe('HTTP methods and body', () => {
it('sends POST with JSON body for sendDirectMessage', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: true,
json: () =>
Promise.resolve({
id: 1,
type: 'PRIV',
text: 'hello',
destination: 'abc123',
}),
});
await api.sendDirectMessage('abc123', 'hello');
const [url, options] = mockFetch.mock.calls[0];
expect(url).toBe('/api/messages/direct');
expect(options.method).toBe('POST');
expect(JSON.parse(options.body)).toEqual({
destination: 'abc123',
text: 'hello',
});
});
it('sends PATCH with JSON body for updateRadioConfig', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve({ name: 'NewName' }),
});
await api.updateRadioConfig({ name: 'NewName' });
const [url, options] = mockFetch.mock.calls[0];
expect(url).toBe('/api/radio/config');
expect(options.method).toBe('PATCH');
expect(JSON.parse(options.body)).toEqual({ name: 'NewName' });
});
it('sends PUT with JSON body for setPrivateKey', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve({ status: 'ok' }),
});
await api.setPrivateKey('my-secret-key');
const [url, options] = mockFetch.mock.calls[0];
expect(url).toBe('/api/radio/private-key');
expect(options.method).toBe('PUT');
expect(JSON.parse(options.body)).toEqual({ private_key: 'my-secret-key' });
});
it('sends DELETE for deleteContact', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve({ status: 'ok' }),
});
await api.deleteContact('pubkey123');
const [url, options] = mockFetch.mock.calls[0];
expect(url).toBe('/api/contacts/pubkey123');
expect(options.method).toBe('DELETE');
});
it('sends POST without body for sendAdvertisement', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve({ status: 'ok' }),
});
await api.sendAdvertisement();
const [url, options] = mockFetch.mock.calls[0];
expect(url).toBe('/api/radio/advertise');
expect(options.method).toBe('POST');
expect(options.body).toBeUndefined();
});
});
describe('AbortSignal passthrough', () => {
it('passes signal option through to fetch for getMessages', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve([]),
});
const controller = new AbortController();
await api.getMessages({ limit: 10 }, controller.signal);
const [, options] = mockFetch.mock.calls[0];
expect(options.signal).toBe(controller.signal);
});
it('calls fetch without signal when none is provided', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve([]),
});
await api.getMessages({ limit: 10 });
const [, options] = mockFetch.mock.calls[0];
expect(options.signal).toBeUndefined();
});
});
describe('api.getMessages query parameter construction', () => {
it('builds query string with all parameters', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve([]),
});
await api.getMessages({
limit: 50,
offset: 10,
type: 'PRIV',
conversation_key: 'abc123',
before: 1700000000,
before_id: 99,
});
const [url] = mockFetch.mock.calls[0];
expect(url).toContain('/api/messages?');
expect(url).toContain('limit=50');
expect(url).toContain('offset=10');
expect(url).toContain('type=PRIV');
expect(url).toContain('conversation_key=abc123');
expect(url).toContain('before=1700000000');
expect(url).toContain('before_id=99');
});
it('builds URL without query string when no params given', async () => {
installMockFetch();
mockFetch.mockResolvedValueOnce({
ok: true,
json: () => Promise.resolve([]),
});
await api.getMessages();
const [url] = mockFetch.mock.calls[0];
expect(url).toBe('/api/messages');
});
});
});
+1
View File
@@ -43,6 +43,7 @@ const baseSettings: AppSettings = {
last_message_times: {},
preferences_migrated: false,
advert_interval: 0,
last_advert_time: 0,
bots: [],
};
+1
View File
@@ -131,6 +131,7 @@ export interface AppSettings {
last_message_times: Record<string, number>;
preferences_migrated: boolean;
advert_interval: number;
last_advert_time: number;
bots: BotConfig[];
}
+199
View File
@@ -431,3 +431,202 @@ class TestEventHandlerRegistration:
# Should have exactly 5 fresh subscriptions
assert len(_active_subscriptions) == 5
class TestOnPathUpdate:
"""Test the on_path_update event handler."""
@pytest.mark.asyncio
async def test_updates_path_for_existing_contact(self):
"""Path is updated when the contact exists in the database."""
from app.event_handlers import on_path_update
mock_contact = MagicMock()
mock_contact.public_key = "aa" * 32
with patch("app.event_handlers.ContactRepository") as mock_repo:
mock_repo.get_by_key_prefix = AsyncMock(return_value=mock_contact)
mock_repo.update_path = AsyncMock()
class MockEvent:
payload = {
"pubkey_prefix": "aaaaaa",
"path": "0102",
"path_len": 2,
}
await on_path_update(MockEvent())
mock_repo.get_by_key_prefix.assert_called_once_with("aaaaaa")
mock_repo.update_path.assert_called_once_with("aa" * 32, "0102", 2)
@pytest.mark.asyncio
async def test_does_nothing_when_contact_not_found(self):
"""No update is attempted when the contact is not in the database."""
from app.event_handlers import on_path_update
with patch("app.event_handlers.ContactRepository") as mock_repo:
mock_repo.get_by_key_prefix = AsyncMock(return_value=None)
mock_repo.update_path = AsyncMock()
class MockEvent:
payload = {
"pubkey_prefix": "unknown",
"path": "0102",
"path_len": 2,
}
await on_path_update(MockEvent())
mock_repo.get_by_key_prefix.assert_called_once_with("unknown")
mock_repo.update_path.assert_not_called()
@pytest.mark.asyncio
async def test_uses_defaults_for_missing_payload_fields(self):
"""Missing payload fields fall back to defaults (empty path, -1 length)."""
from app.event_handlers import on_path_update
mock_contact = MagicMock()
mock_contact.public_key = "bb" * 32
with patch("app.event_handlers.ContactRepository") as mock_repo:
mock_repo.get_by_key_prefix = AsyncMock(return_value=mock_contact)
mock_repo.update_path = AsyncMock()
class MockEvent:
payload = {}
await on_path_update(MockEvent())
mock_repo.get_by_key_prefix.assert_called_once_with("")
mock_repo.update_path.assert_called_once_with("bb" * 32, "", -1)
class TestOnNewContact:
"""Test the on_new_contact event handler."""
@pytest.mark.asyncio
async def test_creates_contact_and_broadcasts(self):
"""Valid new contact is upserted and broadcast via WebSocket."""
from app.event_handlers import on_new_contact
with (
patch("app.event_handlers.ContactRepository") as mock_repo,
patch("app.event_handlers.broadcast_event") as mock_broadcast,
patch("app.event_handlers.time") as mock_time,
):
mock_time.time.return_value = 1700000000
mock_repo.upsert = AsyncMock()
class MockEvent:
payload = {
"public_key": "cc" * 32,
"adv_name": "Charlie",
"type": 1,
"flags": 0,
}
await on_new_contact(MockEvent())
mock_repo.upsert.assert_called_once()
upserted_data = mock_repo.upsert.call_args[0][0]
assert upserted_data["public_key"] == "cc" * 32
assert upserted_data["name"] == "Charlie"
assert upserted_data["on_radio"] is True
assert upserted_data["last_seen"] == 1700000000
mock_broadcast.assert_called_once()
event_type, contact_data = mock_broadcast.call_args[0]
assert event_type == "contact"
assert contact_data["public_key"] == "cc" * 32
@pytest.mark.asyncio
async def test_returns_early_on_empty_public_key(self):
"""Handler exits without upserting when public_key is empty."""
from app.event_handlers import on_new_contact
with (
patch("app.event_handlers.ContactRepository") as mock_repo,
patch("app.event_handlers.broadcast_event") as mock_broadcast,
):
mock_repo.upsert = AsyncMock()
class MockEvent:
payload = {"public_key": "", "adv_name": "Ghost"}
await on_new_contact(MockEvent())
mock_repo.upsert.assert_not_called()
mock_broadcast.assert_not_called()
@pytest.mark.asyncio
async def test_returns_early_on_missing_public_key(self):
"""Handler exits without upserting when public_key field is absent."""
from app.event_handlers import on_new_contact
with (
patch("app.event_handlers.ContactRepository") as mock_repo,
patch("app.event_handlers.broadcast_event") as mock_broadcast,
):
mock_repo.upsert = AsyncMock()
class MockEvent:
payload = {"adv_name": "NoKey"}
await on_new_contact(MockEvent())
mock_repo.upsert.assert_not_called()
mock_broadcast.assert_not_called()
@pytest.mark.asyncio
async def test_sets_on_radio_true(self):
"""Contact data passed to upsert has on_radio=True."""
from app.event_handlers import on_new_contact
with (
patch("app.event_handlers.ContactRepository") as mock_repo,
patch("app.event_handlers.broadcast_event"),
patch("app.event_handlers.time") as mock_time,
):
mock_time.time.return_value = 1700000000
mock_repo.upsert = AsyncMock()
class MockEvent:
payload = {
"public_key": "dd" * 32,
"adv_name": "Delta",
"type": 0,
"flags": 0,
}
await on_new_contact(MockEvent())
upserted_data = mock_repo.upsert.call_args[0][0]
assert upserted_data["on_radio"] is True
@pytest.mark.asyncio
async def test_sets_last_seen_to_current_timestamp(self):
"""Contact data includes last_seen set to current time."""
from app.event_handlers import on_new_contact
with (
patch("app.event_handlers.ContactRepository") as mock_repo,
patch("app.event_handlers.broadcast_event"),
patch("app.event_handlers.time") as mock_time,
):
mock_time.time.return_value = 1700099999
mock_repo.upsert = AsyncMock()
class MockEvent:
payload = {
"public_key": "ee" * 32,
"adv_name": "Echo",
"type": 0,
"flags": 0,
}
await on_new_contact(MockEvent())
upserted_data = mock_repo.upsert.call_args[0][0]
assert upserted_data["last_seen"] == 1700099999
+792
View File
@@ -14,6 +14,7 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.database import Database
from app.decoder import DecryptedDirectMessage, PacketInfo, PayloadType
from app.repository import (
ChannelRepository,
ContactRepository,
@@ -1193,3 +1194,794 @@ class TestRepeaterMessageFiltering:
msg_type="PRIV", conversation_key=self.CLIENT_PUB.lower(), limit=10
)
assert len(messages) == 1
class TestProcessDirectMessageDispatch:
"""T1: Test _process_direct_message dispatch logic.
This tests the internal function that determines direction (incoming vs outgoing),
looks up candidate contacts by first-byte hash, and attempts DM decryption.
Uses real DB for contacts, mocks keystore and decoder functions.
"""
# Our public key starts with 0xFA
OUR_PUB_HEX = "FACE123334789E2B81519AFDBC39A3C9EB7EA3457AD367D3243597A484847E46"
OUR_PUB = bytes.fromhex(OUR_PUB_HEX)
OUR_PRIV = bytes(64) # Dummy 64-byte private key (mocked, never actually used for crypto)
# Contact whose public key starts with 0xA1
CONTACT_PUB_HEX = "a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7"
CONTACT_PUB = bytes.fromhex(CONTACT_PUB_HEX)
def _make_packet_info(
self, dest_byte: int, src_byte: int, payload_extra: bytes = b"\x00" * 32
) -> PacketInfo:
"""Build a PacketInfo with a TEXT_MESSAGE payload containing given dest/src hash bytes."""
# payload: [dest_hash:1][src_hash:1][mac:2][ciphertext...]
payload = bytes([dest_byte, src_byte]) + payload_extra
return PacketInfo(
route_type=1, # FLOOD
payload_type=PayloadType.TEXT_MESSAGE,
payload_version=0,
path_length=0,
path=b"",
payload=payload,
)
def _keystore_patches(self, has_key=True):
"""Return a context manager stack that patches keystore functions."""
from contextlib import ExitStack
stack = ExitStack()
stack.enter_context(patch("app.packet_processor.has_private_key", return_value=has_key))
stack.enter_context(
patch(
"app.packet_processor.get_private_key",
return_value=self.OUR_PRIV if has_key else None,
)
)
stack.enter_context(
patch(
"app.packet_processor.get_public_key",
return_value=self.OUR_PUB if has_key else None,
)
)
return stack
@pytest.mark.asyncio
async def test_returns_none_when_no_private_key(self, test_db):
"""Without a private key, _process_direct_message returns None immediately."""
from app.packet_processor import _process_direct_message
packet_info = self._make_packet_info(0xFA, 0xA1)
with self._keystore_patches(has_key=False):
result = await _process_direct_message(b"\x09\x00" + b"\x00" * 30, 1, 1000, packet_info)
assert result is None
@pytest.mark.asyncio
async def test_returns_none_when_packet_info_is_none(self, test_db):
"""When packet_info is None and parse_packet also returns None, returns None."""
from app.packet_processor import _process_direct_message
with self._keystore_patches(has_key=True):
with patch("app.packet_processor.parse_packet", return_value=None):
result = await _process_direct_message(b"\x09\x00", 1, 1000, None)
assert result is None
@pytest.mark.asyncio
async def test_returns_none_when_payload_too_short(self, test_db):
"""Payload shorter than 4 bytes causes early return."""
from app.packet_processor import _process_direct_message
short_info = PacketInfo(
route_type=1,
payload_type=PayloadType.TEXT_MESSAGE,
payload_version=0,
path_length=0,
path=b"",
payload=b"\xfa\xa1\x00", # Only 3 bytes
)
with self._keystore_patches(has_key=True):
result = await _process_direct_message(b"\x09\x00", 1, 1000, short_info)
assert result is None
@pytest.mark.asyncio
async def test_returns_none_when_neither_hash_matches_us(self, test_db):
"""If neither dest_hash nor src_hash matches our first byte, returns None."""
from app.packet_processor import _process_direct_message
# Our first byte is 0xFA; use 0xBB and 0xCC instead
packet_info = self._make_packet_info(0xBB, 0xCC)
with self._keystore_patches(has_key=True):
result = await _process_direct_message(b"\x09\x00" + b"\x00" * 30, 1, 1000, packet_info)
assert result is None
@pytest.mark.asyncio
async def test_incoming_direction_dest_is_us(self, test_db, captured_broadcasts):
"""When dest_hash matches us and src_hash does not, direction is incoming."""
from app.decoder import DecryptedDirectMessage
from app.packet_processor import _process_direct_message
# dest=0xFA (us), src=0xA1 (contact) -> incoming
packet_info = self._make_packet_info(0xFA, 0xA1)
# Create contact in DB so lookup succeeds
await ContactRepository.upsert(
{"public_key": self.CONTACT_PUB_HEX, "name": "Alice", "type": 1}
)
mock_decrypted = DecryptedDirectMessage(
timestamp=1000, flags=0, message="Hello", dest_hash="fa", src_hash="a1"
)
broadcasts, mock_broadcast = captured_broadcasts
with self._keystore_patches(has_key=True):
with patch(
"app.packet_processor.try_decrypt_dm", return_value=mock_decrypted
) as mock_try:
with patch("app.packet_processor.broadcast_event", mock_broadcast):
result = await _process_direct_message(
b"\x09\x00" + b"\x00" * 30, 1, 1000, packet_info
)
assert result is not None
assert result["decrypted"] is True
# Verify try_decrypt_dm was called with our_public_key (incoming => filter enabled)
call_kwargs = mock_try.call_args
assert (
call_kwargs[1].get("our_public_key") == self.OUR_PUB
or call_kwargs[0][3] == self.OUR_PUB
)
@pytest.mark.asyncio
async def test_outgoing_direction_src_is_us(self, test_db, captured_broadcasts):
"""When src_hash matches us and dest_hash does not, direction is outgoing."""
from app.decoder import DecryptedDirectMessage
from app.packet_processor import _process_direct_message
# src=0xFA (us), dest=0xA1 (contact) -> outgoing
packet_info = self._make_packet_info(0xA1, 0xFA)
await ContactRepository.upsert(
{"public_key": self.CONTACT_PUB_HEX, "name": "Alice", "type": 1}
)
mock_decrypted = DecryptedDirectMessage(
timestamp=1000, flags=0, message="My outgoing", dest_hash="a1", src_hash="fa"
)
broadcasts, mock_broadcast = captured_broadcasts
with self._keystore_patches(has_key=True):
with patch(
"app.packet_processor.try_decrypt_dm", return_value=mock_decrypted
) as mock_try:
with patch("app.packet_processor.broadcast_event", mock_broadcast):
result = await _process_direct_message(
b"\x09\x00" + b"\x00" * 30, 1, 1000, packet_info
)
assert result is not None
assert result["decrypted"] is True
# Outgoing: our_public_key should be None (skip dest_hash filter)
call_kwargs = mock_try.call_args
assert call_kwargs[1].get("our_public_key") is None or call_kwargs[0][3] is None
@pytest.mark.asyncio
async def test_ambiguous_both_hashes_match_defaults_incoming(
self, test_db, captured_broadcasts
):
"""When both dest_hash and src_hash match our first byte, defaults to incoming."""
from app.decoder import DecryptedDirectMessage
from app.packet_processor import _process_direct_message
# Both 0xFA -> ambiguous, should default to incoming
packet_info = self._make_packet_info(0xFA, 0xFA)
# Contact also starts with 0xFA
fa_contact_pub = "fa" + "bb" * 31
await ContactRepository.upsert(
{"public_key": fa_contact_pub, "name": "FaContact", "type": 1}
)
mock_decrypted = DecryptedDirectMessage(
timestamp=1000, flags=0, message="Ambiguous", dest_hash="fa", src_hash="fa"
)
broadcasts, mock_broadcast = captured_broadcasts
with self._keystore_patches(has_key=True):
with patch(
"app.packet_processor.try_decrypt_dm", return_value=mock_decrypted
) as mock_try:
with patch("app.packet_processor.broadcast_event", mock_broadcast):
result = await _process_direct_message(
b"\x09\x00" + b"\x00" * 30, 1, 1000, packet_info
)
assert result is not None
# For incoming, try_decrypt_dm should be called with our_public_key set
call_kwargs = mock_try.call_args
assert (
call_kwargs[1].get("our_public_key") == self.OUR_PUB
or call_kwargs[0][3] == self.OUR_PUB
)
@pytest.mark.asyncio
async def test_returns_none_when_no_candidate_contacts(self, test_db):
"""If no contacts match the relevant hash byte, returns None."""
from app.packet_processor import _process_direct_message
# Incoming: dest=0xFA (us), src=0xA1 -> looks up contacts starting with "a1"
packet_info = self._make_packet_info(0xFA, 0xA1)
# Don't create any contacts -> no candidates
with self._keystore_patches(has_key=True):
result = await _process_direct_message(b"\x09\x00" + b"\x00" * 30, 1, 1000, packet_info)
assert result is None
@pytest.mark.asyncio
async def test_returns_none_when_decrypt_fails_all_candidates(self, test_db):
"""When try_decrypt_dm returns None for all candidates, returns None."""
from app.packet_processor import _process_direct_message
packet_info = self._make_packet_info(0xFA, 0xA1)
await ContactRepository.upsert(
{"public_key": self.CONTACT_PUB_HEX, "name": "Alice", "type": 1}
)
with self._keystore_patches(has_key=True):
with patch("app.packet_processor.try_decrypt_dm", return_value=None):
result = await _process_direct_message(
b"\x09\x00" + b"\x00" * 30, 1, 1000, packet_info
)
assert result is None
@pytest.mark.asyncio
async def test_creates_message_on_successful_decrypt(self, test_db, captured_broadcasts):
"""Successful decryption creates a DM message via create_dm_message_from_decrypted."""
from app.decoder import DecryptedDirectMessage
from app.packet_processor import _process_direct_message
# First, create a raw packet so create_dm_message_from_decrypted can link it
packet_id, _ = await RawPacketRepository.create(b"\x09\x00" + b"\x00" * 30, 1000)
packet_info = self._make_packet_info(0xFA, 0xA1)
await ContactRepository.upsert(
{"public_key": self.CONTACT_PUB_HEX, "name": "Alice", "type": 1}
)
mock_decrypted = DecryptedDirectMessage(
timestamp=1000, flags=0, message="Real message", dest_hash="fa", src_hash="a1"
)
broadcasts, mock_broadcast = captured_broadcasts
with self._keystore_patches(has_key=True):
with patch("app.packet_processor.try_decrypt_dm", return_value=mock_decrypted):
with patch("app.packet_processor.broadcast_event", mock_broadcast):
result = await _process_direct_message(
b"\x09\x00" + b"\x00" * 30, packet_id, 1000, packet_info
)
assert result is not None
assert result["decrypted"] is True
assert result["message_id"] is not None
# Verify message was stored in DB
messages = await MessageRepository.get_all(
msg_type="PRIV", conversation_key=self.CONTACT_PUB_HEX.lower(), limit=10
)
assert len(messages) == 1
assert messages[0].text == "Real message"
assert messages[0].outgoing is False
class TestProcessRawPacketIntegration:
"""T2: Test process_raw_packet dispatching to sub-processors.
Verifies that the main entry point correctly routes packets by payload type,
always broadcasts raw_packet events, and returns the expected result structure.
Uses real DB for packet storage, mocks parse_packet to control payload type.
"""
@pytest.mark.asyncio
async def test_dispatches_group_text(self, test_db, captured_broadcasts):
"""GROUP_TEXT packets are dispatched to _process_group_text."""
from app.packet_processor import process_raw_packet
broadcasts, mock_broadcast = captured_broadcasts
raw = b"\x15\x00" + b"\xaa" * 30 # Dummy bytes, parsing is mocked
group_info = PacketInfo(
route_type=1,
payload_type=PayloadType.GROUP_TEXT,
payload_version=0,
path_length=0,
path=b"",
payload=b"\xab" + b"\x00" * 20,
)
with patch("app.packet_processor.broadcast_event", mock_broadcast):
with patch("app.packet_processor.parse_packet", return_value=group_info):
with patch(
"app.packet_processor._process_group_text",
new_callable=AsyncMock,
return_value={
"decrypted": True,
"channel_name": "#test",
"sender": "Bob",
"message_id": 99,
},
) as mock_gt:
result = await process_raw_packet(raw, timestamp=2000)
mock_gt.assert_awaited_once()
assert result["decrypted"] is True
assert result["channel_name"] == "#test"
@pytest.mark.asyncio
async def test_dispatches_group_text_even_for_duplicates(self, test_db, captured_broadcasts):
"""GROUP_TEXT always dispatches regardless of is_new_packet flag."""
from app.packet_processor import process_raw_packet
broadcasts, mock_broadcast = captured_broadcasts
group_info = PacketInfo(
route_type=1,
payload_type=PayloadType.GROUP_TEXT,
payload_version=0,
path_length=0,
path=b"",
payload=b"\xab" + b"\x00" * 20,
)
raw = b"\x15\x00" + b"\xbb" * 30
with patch("app.packet_processor.broadcast_event", mock_broadcast):
with patch("app.packet_processor.parse_packet", return_value=group_info):
with patch(
"app.packet_processor._process_group_text",
new_callable=AsyncMock,
return_value=None,
) as mock_gt:
# Process same packet twice
await process_raw_packet(raw, timestamp=2000)
await process_raw_packet(raw, timestamp=2001)
# _process_group_text should be called both times
assert mock_gt.await_count == 2
@pytest.mark.asyncio
async def test_dispatches_advert_only_for_new_packets(self, test_db, captured_broadcasts):
"""ADVERT packets are dispatched only when is_new_packet is True."""
from app.packet_processor import process_raw_packet
broadcasts, mock_broadcast = captured_broadcasts
advert_info = PacketInfo(
route_type=1,
payload_type=PayloadType.ADVERT,
payload_version=0,
path_length=0,
path=b"",
payload=b"\x00" * 101,
)
raw = b"\x11\x00" + b"\xcc" * 30
with patch("app.packet_processor.broadcast_event", mock_broadcast):
with patch("app.packet_processor.parse_packet", return_value=advert_info):
with patch(
"app.packet_processor._process_advertisement",
new_callable=AsyncMock,
) as mock_adv:
# First call: new packet -> should dispatch
await process_raw_packet(raw, timestamp=3000)
# Second call: duplicate -> should NOT dispatch
await process_raw_packet(raw, timestamp=3001)
assert mock_adv.await_count == 1
@pytest.mark.asyncio
async def test_dispatches_text_message(self, test_db, captured_broadcasts):
"""TEXT_MESSAGE packets are dispatched to _process_direct_message."""
from app.packet_processor import process_raw_packet
broadcasts, mock_broadcast = captured_broadcasts
dm_info = PacketInfo(
route_type=1,
payload_type=PayloadType.TEXT_MESSAGE,
payload_version=0,
path_length=0,
path=b"",
payload=b"\xfa\xa1" + b"\x00" * 30,
)
raw = b"\x09\x00" + b"\xdd" * 30
with patch("app.packet_processor.broadcast_event", mock_broadcast):
with patch("app.packet_processor.parse_packet", return_value=dm_info):
with patch(
"app.packet_processor._process_direct_message",
new_callable=AsyncMock,
return_value={"decrypted": True, "sender": "Alice", "message_id": 42},
) as mock_dm:
result = await process_raw_packet(raw, timestamp=4000)
mock_dm.assert_awaited_once()
assert result["decrypted"] is True
@pytest.mark.asyncio
async def test_always_broadcasts_raw_packet(self, test_db, captured_broadcasts):
"""Every call to process_raw_packet broadcasts a raw_packet event."""
from app.packet_processor import process_raw_packet
broadcasts, mock_broadcast = captured_broadcasts
# Use a packet type that does not trigger any sub-processor (e.g. ACK)
ack_info = PacketInfo(
route_type=1,
payload_type=PayloadType.ACK,
payload_version=0,
path_length=0,
path=b"",
payload=b"\x00" * 10,
)
raw = b"\x0d\x00" + b"\xee" * 20
with patch("app.packet_processor.broadcast_event", mock_broadcast):
with patch("app.packet_processor.parse_packet", return_value=ack_info):
await process_raw_packet(raw, timestamp=5000)
raw_broadcasts = [b for b in broadcasts if b["type"] == "raw_packet"]
assert len(raw_broadcasts) == 1
assert raw_broadcasts[0]["data"]["payload_type"] == "ACK"
@pytest.mark.asyncio
async def test_result_structure(self, test_db, captured_broadcasts):
"""process_raw_packet returns a dict with all expected keys."""
from app.packet_processor import process_raw_packet
broadcasts, mock_broadcast = captured_broadcasts
raw = b"\x0d\x00" + b"\xff" * 20
ack_info = PacketInfo(
route_type=1,
payload_type=PayloadType.ACK,
payload_version=0,
path_length=0,
path=b"",
payload=b"\x00" * 10,
)
with patch("app.packet_processor.broadcast_event", mock_broadcast):
with patch("app.packet_processor.parse_packet", return_value=ack_info):
result = await process_raw_packet(raw, timestamp=6000, snr=5.5, rssi=-80)
assert "packet_id" in result
assert result["timestamp"] == 6000
assert result["snr"] == 5.5
assert result["rssi"] == -80
assert result["payload_type"] == "ACK"
assert result["decrypted"] is False
assert result["message_id"] is None
@pytest.mark.asyncio
async def test_raw_packet_stored_in_db(self, test_db, captured_broadcasts):
"""process_raw_packet stores the raw bytes in the database."""
from app.packet_processor import process_raw_packet
broadcasts, mock_broadcast = captured_broadcasts
raw = b"\x0d\x00" + b"\xab\xcd" * 10
ack_info = PacketInfo(
route_type=1,
payload_type=PayloadType.ACK,
payload_version=0,
path_length=0,
path=b"",
payload=b"\x00" * 10,
)
with patch("app.packet_processor.broadcast_event", mock_broadcast):
with patch("app.packet_processor.parse_packet", return_value=ack_info):
result = await process_raw_packet(raw, timestamp=7000)
# Verify packet is in undecrypted list
undecrypted = await RawPacketRepository.get_all_undecrypted()
packet_ids = [p[0] for p in undecrypted]
assert result["packet_id"] in packet_ids
class TestRunHistoricalDmDecryption:
"""T3: Test run_historical_dm_decryption background task.
Verifies iteration over undecrypted packets, decryption attempts,
message creation, and notification broadcasting.
Uses real DB for raw packet storage, mocks try_decrypt_dm and parse_packet.
"""
OUR_PUB_HEX = "FACE123334789E2B81519AFDBC39A3C9EB7EA3457AD367D3243597A484847E46"
OUR_PUB = bytes.fromhex(OUR_PUB_HEX)
OUR_PRIV = bytes.fromhex(
"58BA1940E97099CBB4357C62CE9C7F4B245C94C90D722E67201B989F9FEACF7B"
"77ACADDB84438514022BDB0FC3140C2501859BE1772AC7B8C7E41DC0F40490A1"
)
CONTACT_PUB_HEX = "a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7"
CONTACT_PUB = bytes.fromhex(CONTACT_PUB_HEX)
def _make_text_message_bytes(self, unique_suffix: bytes = b"") -> bytes:
"""Build a minimal raw packet with TEXT_MESSAGE payload type.
Packet header byte: route_type=FLOOD(0x01), payload_type=TEXT_MESSAGE(0x02), version=0
header = (0x02 << 2) | 0x01 = 0x09
Then path_length=0, then payload with dest/src/mac/ciphertext.
"""
header = 0x09
path_length = 0
# Payload: [dest:1][src:1][mac:2][ciphertext:16] = 20 bytes min
payload = bytes([0xFA, 0xA1]) + b"\x00\x00" + b"\xab" * 16 + unique_suffix
return bytes([header, path_length]) + payload
@pytest.mark.asyncio
async def test_returns_early_when_no_undecrypted_packets(self, test_db, captured_broadcasts):
"""With no undecrypted TEXT_MESSAGE packets, returns immediately without broadcasting."""
from app.packet_processor import run_historical_dm_decryption
broadcasts, mock_broadcast = captured_broadcasts
with patch("app.packet_processor.broadcast_event", mock_broadcast):
with patch("app.websocket.ws_manager") as mock_ws:
mock_ws.broadcast = AsyncMock()
await run_historical_dm_decryption(
self.OUR_PRIV, self.CONTACT_PUB, self.CONTACT_PUB_HEX
)
# No success broadcast should have been sent
success_broadcasts = [b for b in broadcasts if b["type"] == "success"]
assert len(success_broadcasts) == 0
@pytest.mark.asyncio
async def test_iterates_and_decrypts_packets(self, test_db, captured_broadcasts):
"""Successfully decrypts packets and creates messages in DB."""
from app.packet_processor import run_historical_dm_decryption
# Store some TEXT_MESSAGE packets in DB
raw1 = self._make_text_message_bytes(b"\x01")
raw2 = self._make_text_message_bytes(b"\x02")
raw3 = self._make_text_message_bytes(b"\x03")
pkt_id1, _ = await RawPacketRepository.create(raw1, 1000)
pkt_id2, _ = await RawPacketRepository.create(raw2, 1001)
pkt_id3, _ = await RawPacketRepository.create(raw3, 1002)
mock_packet_info = PacketInfo(
route_type=1,
payload_type=PayloadType.TEXT_MESSAGE,
payload_version=0,
path_length=2,
path=b"\xaa\xbb",
payload=b"\xfa\xa1" + b"\x00" * 18,
)
call_count = 0
def mock_decrypt(raw, priv, contact_pub, our_public_key=None):
nonlocal call_count
call_count += 1
# Decrypt packets 1 and 3, fail on packet 2
if call_count in (1, 3):
return DecryptedDirectMessage(
timestamp=1000 + call_count,
flags=0,
message=f"Decrypted message {call_count}",
dest_hash="fa",
src_hash="a1",
)
return None
broadcasts, mock_broadcast = captured_broadcasts
with patch("app.packet_processor.broadcast_event", mock_broadcast):
with patch("app.packet_processor.try_decrypt_dm", side_effect=mock_decrypt):
with patch("app.packet_processor.parse_packet", return_value=mock_packet_info):
with patch("app.packet_processor.derive_public_key", return_value=self.OUR_PUB):
with patch("app.websocket.ws_manager") as mock_ws:
mock_ws.broadcast = AsyncMock()
await run_historical_dm_decryption(
self.OUR_PRIV, self.CONTACT_PUB, self.CONTACT_PUB_HEX
)
# 2 of 3 packets should have been decrypted
messages = await MessageRepository.get_all(
msg_type="PRIV", conversation_key=self.CONTACT_PUB_HEX.lower(), limit=10
)
assert len(messages) == 2
@pytest.mark.asyncio
async def test_does_not_create_message_on_decrypt_failure(self, test_db, captured_broadcasts):
"""When try_decrypt_dm returns None for all packets, no messages are created."""
from app.packet_processor import run_historical_dm_decryption
raw = self._make_text_message_bytes(b"\x10")
await RawPacketRepository.create(raw, 2000)
broadcasts, mock_broadcast = captured_broadcasts
with patch("app.packet_processor.broadcast_event", mock_broadcast):
with patch("app.packet_processor.try_decrypt_dm", return_value=None):
with patch("app.packet_processor.derive_public_key", return_value=self.OUR_PUB):
with patch("app.websocket.ws_manager") as mock_ws:
mock_ws.broadcast = AsyncMock()
await run_historical_dm_decryption(
self.OUR_PRIV, self.CONTACT_PUB, self.CONTACT_PUB_HEX
)
messages = await MessageRepository.get_all(
msg_type="PRIV", conversation_key=self.CONTACT_PUB_HEX.lower(), limit=10
)
assert len(messages) == 0
@pytest.mark.asyncio
async def test_sets_trigger_bot_false(self, test_db, captured_broadcasts):
"""Historical decryption calls create_dm_message_from_decrypted with trigger_bot=False."""
from app.packet_processor import run_historical_dm_decryption
raw = self._make_text_message_bytes(b"\x20")
await RawPacketRepository.create(raw, 3000)
mock_packet_info = PacketInfo(
route_type=1,
payload_type=PayloadType.TEXT_MESSAGE,
payload_version=0,
path_length=0,
path=b"",
payload=b"\xfa\xa1" + b"\x00" * 18,
)
mock_decrypted = DecryptedDirectMessage(
timestamp=3000, flags=0, message="Historical msg", dest_hash="fa", src_hash="a1"
)
broadcasts, mock_broadcast = captured_broadcasts
with patch("app.packet_processor.broadcast_event", mock_broadcast):
with patch("app.packet_processor.try_decrypt_dm", return_value=mock_decrypted):
with patch("app.packet_processor.parse_packet", return_value=mock_packet_info):
with patch("app.packet_processor.derive_public_key", return_value=self.OUR_PUB):
with patch(
"app.packet_processor.create_dm_message_from_decrypted",
new_callable=AsyncMock,
return_value=42,
) as mock_create:
with patch("app.websocket.ws_manager") as mock_ws:
mock_ws.broadcast = AsyncMock()
await run_historical_dm_decryption(
self.OUR_PRIV, self.CONTACT_PUB, self.CONTACT_PUB_HEX
)
mock_create.assert_awaited_once()
call_kwargs = mock_create.call_args[1]
assert call_kwargs["trigger_bot"] is False
@pytest.mark.asyncio
async def test_broadcasts_success_when_decrypted(self, test_db, captured_broadcasts):
"""When at least one packet is decrypted, broadcasts a success notification."""
from app.packet_processor import run_historical_dm_decryption
raw = self._make_text_message_bytes(b"\x30")
await RawPacketRepository.create(raw, 4000)
mock_packet_info = PacketInfo(
route_type=1,
payload_type=PayloadType.TEXT_MESSAGE,
payload_version=0,
path_length=0,
path=b"",
payload=b"\xfa\xa1" + b"\x00" * 18,
)
mock_decrypted = DecryptedDirectMessage(
timestamp=4000, flags=0, message="Success msg", dest_hash="fa", src_hash="a1"
)
broadcasts, mock_broadcast = captured_broadcasts
success_calls = []
def mock_success(message, details=None):
success_calls.append({"message": message, "details": details})
with patch("app.packet_processor.broadcast_event", mock_broadcast):
with patch("app.packet_processor.try_decrypt_dm", return_value=mock_decrypted):
with patch("app.packet_processor.parse_packet", return_value=mock_packet_info):
with patch("app.packet_processor.derive_public_key", return_value=self.OUR_PUB):
with patch("app.websocket.broadcast_success", mock_success):
await run_historical_dm_decryption(
self.OUR_PRIV,
self.CONTACT_PUB,
self.CONTACT_PUB_HEX,
display_name="Alice",
)
assert len(success_calls) == 1
assert "Alice" in success_calls[0]["message"]
assert "1 message" in success_calls[0]["details"]
@pytest.mark.asyncio
async def test_no_broadcast_when_zero_decrypted(self, test_db, captured_broadcasts):
"""When no packets are decrypted, no success notification is broadcast."""
from app.packet_processor import run_historical_dm_decryption
raw = self._make_text_message_bytes(b"\x40")
await RawPacketRepository.create(raw, 5000)
broadcasts, mock_broadcast = captured_broadcasts
success_calls = []
def mock_success(message, details=None):
success_calls.append({"message": message, "details": details})
with patch("app.packet_processor.broadcast_event", mock_broadcast):
with patch("app.packet_processor.try_decrypt_dm", return_value=None):
with patch("app.packet_processor.derive_public_key", return_value=self.OUR_PUB):
with patch("app.websocket.broadcast_success", mock_success):
await run_historical_dm_decryption(
self.OUR_PRIV, self.CONTACT_PUB, self.CONTACT_PUB_HEX
)
assert len(success_calls) == 0
@pytest.mark.asyncio
async def test_plural_message_in_success_broadcast(self, test_db, captured_broadcasts):
"""Success notification uses plural 'messages' when count > 1."""
from app.packet_processor import run_historical_dm_decryption
# Create two distinct TEXT_MESSAGE packets
raw1 = self._make_text_message_bytes(b"\x50")
raw2 = self._make_text_message_bytes(b"\x51")
await RawPacketRepository.create(raw1, 6000)
await RawPacketRepository.create(raw2, 6001)
mock_packet_info = PacketInfo(
route_type=1,
payload_type=PayloadType.TEXT_MESSAGE,
payload_version=0,
path_length=0,
path=b"",
payload=b"\xfa\xa1" + b"\x00" * 18,
)
call_idx = 0
def mock_decrypt(raw, priv, contact_pub, our_public_key=None):
nonlocal call_idx
call_idx += 1
return DecryptedDirectMessage(
timestamp=6000 + call_idx,
flags=0,
message=f"Msg {call_idx}",
dest_hash="fa",
src_hash="a1",
)
broadcasts, mock_broadcast = captured_broadcasts
success_calls = []
def mock_success(message, details=None):
success_calls.append({"message": message, "details": details})
with patch("app.packet_processor.broadcast_event", mock_broadcast):
with patch("app.packet_processor.try_decrypt_dm", side_effect=mock_decrypt):
with patch("app.packet_processor.parse_packet", return_value=mock_packet_info):
with patch("app.packet_processor.derive_public_key", return_value=self.OUR_PUB):
with patch("app.websocket.broadcast_success", mock_success):
await run_historical_dm_decryption(
self.OUR_PRIV,
self.CONTACT_PUB,
self.CONTACT_PUB_HEX,
display_name="Bob",
)
assert len(success_calls) == 1
assert "2 messages" in success_calls[0]["details"]
+633
View File
@@ -528,3 +528,636 @@ class TestSyncRecentContactsToRadio:
assert result["loaded"] == 0
assert result["failed"] == 1
class TestSyncAndOffloadContacts:
"""Test sync_and_offload_contacts: pull contacts from radio, save to DB, remove from radio."""
@pytest.mark.asyncio
async def test_returns_error_when_not_connected(self):
"""Returns error dict when radio is not connected."""
from app.radio_sync import sync_and_offload_contacts
with patch("app.radio_sync.radio_manager") as mock_rm:
mock_rm.is_connected = False
mock_rm.meshcore = None
result = await sync_and_offload_contacts()
assert result["synced"] == 0
assert result["removed"] == 0
assert "error" in result
@pytest.mark.asyncio
async def test_syncs_and_removes_contacts(self):
"""Contacts are upserted to DB and removed from radio."""
from app.radio_sync import sync_and_offload_contacts
contact_payload = {
KEY_A: {"adv_name": "Alice", "type": 1, "flags": 0},
KEY_B: {"adv_name": "Bob", "type": 1, "flags": 0},
}
mock_get_result = MagicMock()
mock_get_result.type = EventType.NEW_CONTACT # Not ERROR
mock_get_result.payload = contact_payload
mock_remove_result = MagicMock()
mock_remove_result.type = EventType.OK
mock_mc = MagicMock()
mock_mc.commands.get_contacts = AsyncMock(return_value=mock_get_result)
mock_mc.commands.remove_contact = AsyncMock(return_value=mock_remove_result)
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ContactRepository.upsert",
new_callable=AsyncMock,
) as mock_upsert,
patch(
"app.radio_sync.MessageRepository.claim_prefix_messages",
new_callable=AsyncMock,
return_value=0,
),
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
result = await sync_and_offload_contacts()
assert result["synced"] == 2
assert result["removed"] == 2
assert mock_upsert.call_count == 2
assert mock_mc.commands.remove_contact.call_count == 2
@pytest.mark.asyncio
async def test_claims_prefix_messages_for_each_contact(self):
"""claim_prefix_messages is called for each synced contact."""
from app.radio_sync import sync_and_offload_contacts
contact_payload = {KEY_A: {"adv_name": "Alice", "type": 1, "flags": 0}}
mock_get_result = MagicMock()
mock_get_result.type = EventType.NEW_CONTACT
mock_get_result.payload = contact_payload
mock_remove_result = MagicMock()
mock_remove_result.type = EventType.OK
mock_mc = MagicMock()
mock_mc.commands.get_contacts = AsyncMock(return_value=mock_get_result)
mock_mc.commands.remove_contact = AsyncMock(return_value=mock_remove_result)
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ContactRepository.upsert",
new_callable=AsyncMock,
),
patch(
"app.radio_sync.MessageRepository.claim_prefix_messages",
new_callable=AsyncMock,
return_value=3,
) as mock_claim,
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
await sync_and_offload_contacts()
mock_claim.assert_called_once_with(KEY_A.lower())
@pytest.mark.asyncio
async def test_handles_remove_failure_gracefully(self):
"""Failed remove_contact logs warning but continues to next contact."""
from app.radio_sync import sync_and_offload_contacts
contact_payload = {
KEY_A: {"adv_name": "Alice", "type": 1, "flags": 0},
KEY_B: {"adv_name": "Bob", "type": 1, "flags": 0},
}
mock_get_result = MagicMock()
mock_get_result.type = EventType.NEW_CONTACT
mock_get_result.payload = contact_payload
mock_fail_result = MagicMock()
mock_fail_result.type = EventType.ERROR
mock_fail_result.payload = {"error": "busy"}
mock_ok_result = MagicMock()
mock_ok_result.type = EventType.OK
mock_mc = MagicMock()
mock_mc.commands.get_contacts = AsyncMock(return_value=mock_get_result)
# First remove fails, second succeeds
mock_mc.commands.remove_contact = AsyncMock(side_effect=[mock_fail_result, mock_ok_result])
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ContactRepository.upsert",
new_callable=AsyncMock,
),
patch(
"app.radio_sync.MessageRepository.claim_prefix_messages",
new_callable=AsyncMock,
return_value=0,
),
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
result = await sync_and_offload_contacts()
# Both contacts synced, but only one removed successfully
assert result["synced"] == 2
assert result["removed"] == 1
@pytest.mark.asyncio
async def test_handles_remove_exception_gracefully(self):
"""Exception during remove_contact is caught and processing continues."""
from app.radio_sync import sync_and_offload_contacts
contact_payload = {KEY_A: {"adv_name": "Alice", "type": 1, "flags": 0}}
mock_get_result = MagicMock()
mock_get_result.type = EventType.NEW_CONTACT
mock_get_result.payload = contact_payload
mock_mc = MagicMock()
mock_mc.commands.get_contacts = AsyncMock(return_value=mock_get_result)
mock_mc.commands.remove_contact = AsyncMock(side_effect=Exception("Timeout"))
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ContactRepository.upsert",
new_callable=AsyncMock,
),
patch(
"app.radio_sync.MessageRepository.claim_prefix_messages",
new_callable=AsyncMock,
return_value=0,
),
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
result = await sync_and_offload_contacts()
assert result["synced"] == 1
assert result["removed"] == 0
@pytest.mark.asyncio
async def test_returns_error_when_get_contacts_fails(self):
"""Error result from get_contacts returns error dict."""
from app.radio_sync import sync_and_offload_contacts
mock_error_result = MagicMock()
mock_error_result.type = EventType.ERROR
mock_error_result.payload = {"error": "radio busy"}
mock_mc = MagicMock()
mock_mc.commands.get_contacts = AsyncMock(return_value=mock_error_result)
with patch("app.radio_sync.radio_manager") as mock_rm:
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
result = await sync_and_offload_contacts()
assert result["synced"] == 0
assert result["removed"] == 0
assert "error" in result
@pytest.mark.asyncio
async def test_upserts_with_on_radio_false(self):
"""Contacts are upserted with on_radio=False (being removed from radio)."""
from app.radio_sync import sync_and_offload_contacts
contact_payload = {KEY_A: {"adv_name": "Alice", "type": 1, "flags": 0}}
mock_get_result = MagicMock()
mock_get_result.type = EventType.NEW_CONTACT
mock_get_result.payload = contact_payload
mock_remove_result = MagicMock()
mock_remove_result.type = EventType.OK
mock_mc = MagicMock()
mock_mc.commands.get_contacts = AsyncMock(return_value=mock_get_result)
mock_mc.commands.remove_contact = AsyncMock(return_value=mock_remove_result)
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ContactRepository.upsert",
new_callable=AsyncMock,
) as mock_upsert,
patch(
"app.radio_sync.MessageRepository.claim_prefix_messages",
new_callable=AsyncMock,
return_value=0,
),
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
await sync_and_offload_contacts()
upserted_data = mock_upsert.call_args[0][0]
assert upserted_data["on_radio"] is False
class TestSyncAndOffloadChannels:
"""Test sync_and_offload_channels: pull channels from radio, save to DB, clear from radio."""
@pytest.mark.asyncio
async def test_returns_error_when_not_connected(self):
"""Returns error dict when radio is not connected."""
from app.radio_sync import sync_and_offload_channels
with patch("app.radio_sync.radio_manager") as mock_rm:
mock_rm.is_connected = False
mock_rm.meshcore = None
result = await sync_and_offload_channels()
assert result["synced"] == 0
assert result["cleared"] == 0
assert "error" in result
@pytest.mark.asyncio
async def test_syncs_valid_channel_and_clears(self):
"""Valid channel is upserted to DB and cleared from radio."""
from app.radio_sync import sync_and_offload_channels
channel_result = MagicMock()
channel_result.type = EventType.CHANNEL_INFO
channel_result.payload = {
"channel_name": "#general",
"channel_secret": bytes.fromhex("8B3387E9C5CDEA6AC9E5EDBAA115CD72"),
}
# All other slots return non-CHANNEL_INFO
empty_result = MagicMock()
empty_result.type = EventType.ERROR
mock_mc = MagicMock()
mock_mc.commands.get_channel = AsyncMock(side_effect=[channel_result] + [empty_result] * 39)
clear_result = MagicMock()
clear_result.type = EventType.OK
mock_mc.commands.set_channel = AsyncMock(return_value=clear_result)
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ChannelRepository.upsert",
new_callable=AsyncMock,
) as mock_upsert,
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
result = await sync_and_offload_channels()
assert result["synced"] == 1
assert result["cleared"] == 1
mock_upsert.assert_called_once_with(
key="8B3387E9C5CDEA6AC9E5EDBAA115CD72",
name="#general",
is_hashtag=True,
on_radio=False,
)
@pytest.mark.asyncio
async def test_skips_empty_channel_name(self):
"""Channels with empty names are skipped."""
from app.radio_sync import sync_and_offload_channels
empty_name_result = MagicMock()
empty_name_result.type = EventType.CHANNEL_INFO
empty_name_result.payload = {
"channel_name": "",
"channel_secret": bytes(16),
}
other_result = MagicMock()
other_result.type = EventType.ERROR
mock_mc = MagicMock()
mock_mc.commands.get_channel = AsyncMock(
side_effect=[empty_name_result] + [other_result] * 39
)
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ChannelRepository.upsert",
new_callable=AsyncMock,
) as mock_upsert,
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
result = await sync_and_offload_channels()
assert result["synced"] == 0
assert result["cleared"] == 0
mock_upsert.assert_not_called()
@pytest.mark.asyncio
async def test_skips_channel_with_zero_key(self):
"""Channels with all-zero secret key are skipped."""
from app.radio_sync import sync_and_offload_channels
zero_key_result = MagicMock()
zero_key_result.type = EventType.CHANNEL_INFO
zero_key_result.payload = {
"channel_name": "SomeChannel",
"channel_secret": bytes(16), # All zeros
}
other_result = MagicMock()
other_result.type = EventType.ERROR
mock_mc = MagicMock()
mock_mc.commands.get_channel = AsyncMock(
side_effect=[zero_key_result] + [other_result] * 39
)
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ChannelRepository.upsert",
new_callable=AsyncMock,
) as mock_upsert,
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
result = await sync_and_offload_channels()
assert result["synced"] == 0
mock_upsert.assert_not_called()
@pytest.mark.asyncio
async def test_non_hashtag_channel_detected(self):
"""Channel without '#' prefix has is_hashtag=False."""
from app.radio_sync import sync_and_offload_channels
channel_result = MagicMock()
channel_result.type = EventType.CHANNEL_INFO
channel_result.payload = {
"channel_name": "Public",
"channel_secret": bytes.fromhex("8B3387E9C5CDEA6AC9E5EDBAA115CD72"),
}
other_result = MagicMock()
other_result.type = EventType.ERROR
mock_mc = MagicMock()
mock_mc.commands.get_channel = AsyncMock(side_effect=[channel_result] + [other_result] * 39)
clear_result = MagicMock()
clear_result.type = EventType.OK
mock_mc.commands.set_channel = AsyncMock(return_value=clear_result)
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ChannelRepository.upsert",
new_callable=AsyncMock,
) as mock_upsert,
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
await sync_and_offload_channels()
mock_upsert.assert_called_once()
assert mock_upsert.call_args.kwargs["is_hashtag"] is False
@pytest.mark.asyncio
async def test_clears_channel_with_empty_name_and_zero_key(self):
"""Cleared channels are set with empty name and 16 zero bytes."""
from app.radio_sync import sync_and_offload_channels
channel_result = MagicMock()
channel_result.type = EventType.CHANNEL_INFO
channel_result.payload = {
"channel_name": "#test",
"channel_secret": bytes.fromhex("AABBCCDD" * 4),
}
other_result = MagicMock()
other_result.type = EventType.ERROR
mock_mc = MagicMock()
mock_mc.commands.get_channel = AsyncMock(side_effect=[channel_result] + [other_result] * 39)
clear_result = MagicMock()
clear_result.type = EventType.OK
mock_mc.commands.set_channel = AsyncMock(return_value=clear_result)
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ChannelRepository.upsert",
new_callable=AsyncMock,
),
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
await sync_and_offload_channels()
mock_mc.commands.set_channel.assert_called_once_with(
channel_idx=0,
channel_name="",
channel_secret=bytes(16),
)
@pytest.mark.asyncio
async def test_handles_clear_failure_gracefully(self):
"""Failed set_channel logs warning but continues processing."""
from app.radio_sync import sync_and_offload_channels
channel_results = []
for i in range(2):
r = MagicMock()
r.type = EventType.CHANNEL_INFO
r.payload = {
"channel_name": f"#ch{i}",
"channel_secret": bytes([i + 1] * 16),
}
channel_results.append(r)
other_result = MagicMock()
other_result.type = EventType.ERROR
mock_mc = MagicMock()
mock_mc.commands.get_channel = AsyncMock(side_effect=channel_results + [other_result] * 38)
fail_result = MagicMock()
fail_result.type = EventType.ERROR
fail_result.payload = {"error": "busy"}
ok_result = MagicMock()
ok_result.type = EventType.OK
mock_mc.commands.set_channel = AsyncMock(side_effect=[fail_result, ok_result])
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ChannelRepository.upsert",
new_callable=AsyncMock,
),
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
result = await sync_and_offload_channels()
assert result["synced"] == 2
assert result["cleared"] == 1
@pytest.mark.asyncio
async def test_iterates_all_40_channel_slots(self):
"""All 40 channel slots are checked."""
from app.radio_sync import sync_and_offload_channels
empty_result = MagicMock()
empty_result.type = EventType.ERROR
mock_mc = MagicMock()
mock_mc.commands.get_channel = AsyncMock(return_value=empty_result)
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ChannelRepository.upsert",
new_callable=AsyncMock,
),
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
result = await sync_and_offload_channels()
assert mock_mc.commands.get_channel.call_count == 40
assert result["synced"] == 0
assert result["cleared"] == 0
class TestEnsureDefaultChannels:
"""Test ensure_default_channels: create/fix the Public channel."""
PUBLIC_KEY = "8B3387E9C5CDEA6AC9E5EDBAA115CD72"
@pytest.mark.asyncio
async def test_creates_public_channel_when_missing(self):
"""Public channel is created when it does not exist."""
from app.radio_sync import ensure_default_channels
with (
patch(
"app.radio_sync.ChannelRepository.get_by_key",
new_callable=AsyncMock,
return_value=None,
) as mock_get,
patch(
"app.radio_sync.ChannelRepository.upsert",
new_callable=AsyncMock,
) as mock_upsert,
):
await ensure_default_channels()
mock_get.assert_called_once_with(self.PUBLIC_KEY)
mock_upsert.assert_called_once_with(
key=self.PUBLIC_KEY,
name="Public",
is_hashtag=False,
on_radio=False,
)
@pytest.mark.asyncio
async def test_fixes_public_channel_with_wrong_name(self):
"""Public channel name is corrected when it exists with wrong name."""
from app.radio_sync import ensure_default_channels
existing = MagicMock()
existing.name = "public" # Wrong case
existing.on_radio = True
with (
patch(
"app.radio_sync.ChannelRepository.get_by_key",
new_callable=AsyncMock,
return_value=existing,
),
patch(
"app.radio_sync.ChannelRepository.upsert",
new_callable=AsyncMock,
) as mock_upsert,
):
await ensure_default_channels()
mock_upsert.assert_called_once_with(
key=self.PUBLIC_KEY,
name="Public",
is_hashtag=False,
on_radio=True, # Preserves existing on_radio state
)
@pytest.mark.asyncio
async def test_no_op_when_public_channel_exists_correctly(self):
"""No upsert when Public channel already exists with correct name."""
from app.radio_sync import ensure_default_channels
existing = MagicMock()
existing.name = "Public"
existing.on_radio = False
with (
patch(
"app.radio_sync.ChannelRepository.get_by_key",
new_callable=AsyncMock,
return_value=existing,
),
patch(
"app.radio_sync.ChannelRepository.upsert",
new_callable=AsyncMock,
) as mock_upsert,
):
await ensure_default_channels()
mock_upsert.assert_not_called()
@pytest.mark.asyncio
async def test_preserves_on_radio_state_when_fixing_name(self):
"""existing.on_radio is passed through when fixing the channel name."""
from app.radio_sync import ensure_default_channels
existing = MagicMock()
existing.name = "Pub"
existing.on_radio = True
with (
patch(
"app.radio_sync.ChannelRepository.get_by_key",
new_callable=AsyncMock,
return_value=existing,
),
patch(
"app.radio_sync.ChannelRepository.upsert",
new_callable=AsyncMock,
) as mock_upsert,
):
await ensure_default_channels()
assert mock_upsert.call_args.kwargs["on_radio"] is True
+1 -3
View File
@@ -63,9 +63,7 @@ class TestMessageRepositoryAddPath:
"""Adding a path to a message with existing paths appends to the array."""
msg_id = await _create_message(test_db)
await MessageRepository.add_path(
message_id=msg_id, path="1A", received_at=1699999999
)
await MessageRepository.add_path(message_id=msg_id, path="1A", received_at=1699999999)
result = await MessageRepository.add_path(
message_id=msg_id, path="2B3C", received_at=1700000000
)