Add some additional tests around radio and contact management

This commit is contained in:
Jack Kingsman
2026-01-30 21:45:50 -08:00
parent f004d80cc8
commit ea5283dd43
4 changed files with 960 additions and 4 deletions

View File

@@ -0,0 +1,526 @@
"""Tests for the contacts router.
Verifies the contact CRUD endpoints, sync, mark-read, delete,
and add/remove from radio operations.
Uses FastAPI TestClient with mocked dependencies, consistent
with the test_api.py pattern.
"""
from unittest.mock import AsyncMock, MagicMock, patch
from meshcore import EventType
# Sample 64-char hex public keys for testing
KEY_A = "aa" * 32 # aaaa...aa
KEY_B = "bb" * 32 # bbbb...bb
KEY_C = "cc" * 32 # cccc...cc
def _make_contact(public_key=KEY_A, name="Alice", **overrides):
"""Create a mock Contact model instance."""
from app.models import Contact
defaults = {
"public_key": public_key,
"name": name,
"type": 0,
"flags": 0,
"last_path": None,
"last_path_len": -1,
"last_advert": None,
"lat": None,
"lon": None,
"last_seen": None,
"on_radio": False,
"last_contacted": None,
"last_read_at": None,
}
defaults.update(overrides)
return Contact(**defaults)
class TestListContacts:
"""Test GET /api/contacts."""
def test_list_returns_contacts(self):
from fastapi.testclient import TestClient
contacts = [_make_contact(KEY_A, "Alice"), _make_contact(KEY_B, "Bob")]
with patch(
"app.routers.contacts.ContactRepository.get_all",
new_callable=AsyncMock,
return_value=contacts,
):
from app.main import app
client = TestClient(app)
response = client.get("/api/contacts")
assert response.status_code == 200
data = response.json()
assert len(data) == 2
assert data[0]["public_key"] == KEY_A
assert data[1]["public_key"] == KEY_B
def test_list_pagination_params(self):
"""Pagination parameters are forwarded to repository."""
from fastapi.testclient import TestClient
with patch(
"app.routers.contacts.ContactRepository.get_all",
new_callable=AsyncMock,
return_value=[],
) as mock_get_all:
from app.main import app
client = TestClient(app)
response = client.get("/api/contacts?limit=5&offset=10")
assert response.status_code == 200
mock_get_all.assert_called_once_with(limit=5, offset=10)
class TestCreateContact:
"""Test POST /api/contacts."""
def test_create_new_contact(self):
from fastapi.testclient import TestClient
with (
patch(
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
new_callable=AsyncMock,
return_value=None,
),
patch(
"app.routers.contacts.ContactRepository.upsert",
new_callable=AsyncMock,
) as mock_upsert,
):
from app.main import app
client = TestClient(app)
response = client.post(
"/api/contacts",
json={"public_key": KEY_A, "name": "NewContact"},
)
assert response.status_code == 200
data = response.json()
assert data["public_key"] == KEY_A
assert data["name"] == "NewContact"
mock_upsert.assert_called_once()
def test_create_invalid_hex(self):
"""Non-hex public key returns 400."""
from fastapi.testclient import TestClient
with patch(
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
new_callable=AsyncMock,
return_value=None,
):
from app.main import app
client = TestClient(app)
response = client.post(
"/api/contacts",
json={"public_key": "zz" * 32, "name": "Bad"},
)
assert response.status_code == 400
assert "hex" in response.json()["detail"].lower()
def test_create_short_key_rejected(self):
"""Key shorter than 64 chars is rejected by pydantic validation."""
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
response = client.post(
"/api/contacts",
json={"public_key": "aa" * 16, "name": "Short"},
)
assert response.status_code == 422
def test_create_existing_updates_name(self):
"""Creating a contact that exists updates the name."""
from fastapi.testclient import TestClient
existing = _make_contact(KEY_A, "OldName")
with (
patch(
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
new_callable=AsyncMock,
return_value=existing,
),
patch(
"app.routers.contacts.ContactRepository.upsert",
new_callable=AsyncMock,
) as mock_upsert,
):
from app.main import app
client = TestClient(app)
response = client.post(
"/api/contacts",
json={"public_key": KEY_A, "name": "NewName"},
)
assert response.status_code == 200
# Upsert called with new name
mock_upsert.assert_called_once()
upsert_data = mock_upsert.call_args[0][0]
assert upsert_data["name"] == "NewName"
class TestGetContact:
"""Test GET /api/contacts/{public_key}."""
def test_get_existing(self):
from fastapi.testclient import TestClient
contact = _make_contact(KEY_A, "Alice")
with patch(
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
new_callable=AsyncMock,
return_value=contact,
):
from app.main import app
client = TestClient(app)
response = client.get(f"/api/contacts/{KEY_A}")
assert response.status_code == 200
assert response.json()["name"] == "Alice"
def test_get_not_found(self):
from fastapi.testclient import TestClient
with patch(
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
new_callable=AsyncMock,
return_value=None,
):
from app.main import app
client = TestClient(app)
response = client.get(f"/api/contacts/{KEY_A}")
assert response.status_code == 404
class TestMarkRead:
"""Test POST /api/contacts/{public_key}/mark-read."""
def test_mark_read_updates_timestamp(self):
from fastapi.testclient import TestClient
contact = _make_contact(KEY_A)
with (
patch(
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
new_callable=AsyncMock,
return_value=contact,
),
patch(
"app.routers.contacts.ContactRepository.update_last_read_at",
new_callable=AsyncMock,
return_value=True,
),
):
from app.main import app
client = TestClient(app)
response = client.post(f"/api/contacts/{KEY_A}/mark-read")
assert response.status_code == 200
assert response.json()["status"] == "ok"
def test_mark_read_not_found(self):
from fastapi.testclient import TestClient
with patch(
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
new_callable=AsyncMock,
return_value=None,
):
from app.main import app
client = TestClient(app)
response = client.post(f"/api/contacts/{KEY_A}/mark-read")
assert response.status_code == 404
class TestDeleteContact:
"""Test DELETE /api/contacts/{public_key}."""
def test_delete_existing(self):
from fastapi.testclient import TestClient
contact = _make_contact(KEY_A)
with (
patch(
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
new_callable=AsyncMock,
return_value=contact,
),
patch(
"app.routers.contacts.ContactRepository.delete",
new_callable=AsyncMock,
),
patch("app.routers.contacts.radio_manager") as mock_rm,
):
mock_rm.is_connected = False
mock_rm.meshcore = None
from app.main import app
client = TestClient(app)
response = client.delete(f"/api/contacts/{KEY_A}")
assert response.status_code == 200
assert response.json()["status"] == "ok"
def test_delete_not_found(self):
from fastapi.testclient import TestClient
with patch(
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
new_callable=AsyncMock,
return_value=None,
):
from app.main import app
client = TestClient(app)
response = client.delete(f"/api/contacts/{KEY_A}")
assert response.status_code == 404
def test_delete_removes_from_radio_if_connected(self):
"""When radio is connected and contact is on radio, remove it first."""
from fastapi.testclient import TestClient
contact = _make_contact(KEY_A, on_radio=True)
mock_radio_contact = MagicMock()
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=mock_radio_contact)
mock_mc.commands.remove_contact = AsyncMock()
with (
patch(
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
new_callable=AsyncMock,
return_value=contact,
),
patch(
"app.routers.contacts.ContactRepository.delete",
new_callable=AsyncMock,
),
patch("app.routers.contacts.radio_manager") as mock_rm,
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
from app.main import app
client = TestClient(app)
response = client.delete(f"/api/contacts/{KEY_A}")
assert response.status_code == 200
mock_mc.commands.remove_contact.assert_called_once_with(mock_radio_contact)
class TestSyncContacts:
"""Test POST /api/contacts/sync."""
def test_sync_from_radio(self):
from fastapi.testclient import TestClient
mock_mc = MagicMock()
mock_result = MagicMock()
mock_result.type = EventType.OK
mock_result.payload = {
KEY_A: {"adv_name": "Alice", "type": 1, "flags": 0},
KEY_B: {"adv_name": "Bob", "type": 1, "flags": 0},
}
mock_mc.commands.get_contacts = AsyncMock(return_value=mock_result)
with (
patch("app.dependencies.radio_manager") as mock_dep_rm,
patch(
"app.routers.contacts.ContactRepository.upsert", new_callable=AsyncMock
) as mock_upsert,
):
mock_dep_rm.is_connected = True
mock_dep_rm.meshcore = mock_mc
from app.main import app
client = TestClient(app)
response = client.post("/api/contacts/sync")
assert response.status_code == 200
assert response.json()["synced"] == 2
assert mock_upsert.call_count == 2
def test_sync_requires_connection(self):
from fastapi.testclient import TestClient
with patch("app.dependencies.radio_manager") as mock_rm:
mock_rm.is_connected = False
mock_rm.meshcore = None
from app.main import app
client = TestClient(app)
response = client.post("/api/contacts/sync")
assert response.status_code == 503
class TestAddRemoveRadio:
"""Test add-to-radio and remove-from-radio endpoints."""
def test_add_to_radio(self):
from fastapi.testclient import TestClient
contact = _make_contact(KEY_A)
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None) # Not on radio
mock_result = MagicMock()
mock_result.type = EventType.OK
mock_mc.commands.add_contact = AsyncMock(return_value=mock_result)
with (
patch("app.dependencies.radio_manager") as mock_dep_rm,
patch(
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
new_callable=AsyncMock,
return_value=contact,
),
patch(
"app.routers.contacts.ContactRepository.set_on_radio",
new_callable=AsyncMock,
) as mock_set_on_radio,
):
mock_dep_rm.is_connected = True
mock_dep_rm.meshcore = mock_mc
from app.main import app
client = TestClient(app)
response = client.post(f"/api/contacts/{KEY_A}/add-to-radio")
assert response.status_code == 200
mock_mc.commands.add_contact.assert_called_once()
mock_set_on_radio.assert_called_once_with(KEY_A, True)
def test_add_already_on_radio(self):
"""Adding a contact already on radio returns ok without calling add_contact."""
from fastapi.testclient import TestClient
contact = _make_contact(KEY_A, on_radio=True)
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=MagicMock()) # On radio
with (
patch("app.dependencies.radio_manager") as mock_dep_rm,
patch(
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
new_callable=AsyncMock,
return_value=contact,
),
):
mock_dep_rm.is_connected = True
mock_dep_rm.meshcore = mock_mc
from app.main import app
client = TestClient(app)
response = client.post(f"/api/contacts/{KEY_A}/add-to-radio")
assert response.status_code == 200
assert "already" in response.json()["message"].lower()
def test_remove_from_radio(self):
from fastapi.testclient import TestClient
contact = _make_contact(KEY_A, on_radio=True)
mock_radio_contact = MagicMock()
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=mock_radio_contact)
mock_result = MagicMock()
mock_result.type = EventType.OK
mock_mc.commands.remove_contact = AsyncMock(return_value=mock_result)
with (
patch("app.dependencies.radio_manager") as mock_dep_rm,
patch(
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
new_callable=AsyncMock,
return_value=contact,
),
patch(
"app.routers.contacts.ContactRepository.set_on_radio",
new_callable=AsyncMock,
) as mock_set_on_radio,
):
mock_dep_rm.is_connected = True
mock_dep_rm.meshcore = mock_mc
from app.main import app
client = TestClient(app)
response = client.post(f"/api/contacts/{KEY_A}/remove-from-radio")
assert response.status_code == 200
mock_mc.commands.remove_contact.assert_called_once_with(mock_radio_contact)
mock_set_on_radio.assert_called_once_with(KEY_A, False)
def test_add_requires_connection(self):
from fastapi.testclient import TestClient
with patch("app.dependencies.radio_manager") as mock_rm:
mock_rm.is_connected = False
mock_rm.meshcore = None
from app.main import app
client = TestClient(app)
response = client.post(f"/api/contacts/{KEY_A}/add-to-radio")
assert response.status_code == 503
def test_remove_not_found(self):
from fastapi.testclient import TestClient
mock_mc = MagicMock()
with (
patch("app.dependencies.radio_manager") as mock_dep_rm,
patch(
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
new_callable=AsyncMock,
return_value=None,
),
):
mock_dep_rm.is_connected = True
mock_dep_rm.meshcore = mock_mc
from app.main import app
client = TestClient(app)
response = client.post(f"/api/contacts/{KEY_A}/remove-from-radio")
assert response.status_code == 404

159
tests/test_keystore.py Normal file
View File

@@ -0,0 +1,159 @@
"""Tests for the ephemeral keystore module.
Verifies private key storage, validation, public key derivation,
and the export_and_store_private_key flow with various radio responses.
"""
from unittest.mock import AsyncMock, MagicMock
import pytest
from meshcore import EventType
from app.keystore import (
export_and_store_private_key,
get_private_key,
get_public_key,
has_private_key,
set_private_key,
)
@pytest.fixture(autouse=True)
def reset_keystore():
"""Reset keystore state before each test."""
import app.keystore as ks
ks._private_key = None
ks._public_key = None
yield
ks._private_key = None
ks._public_key = None
def _make_valid_private_key() -> bytes:
"""Create a valid 64-byte MeshCore private key for testing.
The first 32 bytes are a clamped Ed25519 scalar,
the last 32 bytes are the signing prefix.
"""
# A clamped scalar: clear bottom 3 bits, set bit 254, clear bit 255
scalar = bytearray(b"\x01" * 32)
scalar[0] &= 0xF8 # Clear bottom 3 bits
scalar[31] &= 0x7F # Clear top bit
scalar[31] |= 0x40 # Set bit 254
prefix = b"\x02" * 32
return bytes(scalar) + prefix
VALID_KEY = _make_valid_private_key()
class TestSetPrivateKey:
"""Test set_private_key validation and storage."""
def test_stores_key_and_derives_public_key(self):
"""Valid 64-byte key is stored and public key is derived."""
set_private_key(VALID_KEY)
assert get_private_key() == VALID_KEY
pub = get_public_key()
assert pub is not None
assert len(pub) == 32
assert has_private_key() is True
def test_rejects_wrong_length(self):
"""Keys that aren't 64 bytes are rejected."""
with pytest.raises(ValueError, match="64 bytes"):
set_private_key(b"\x00" * 32)
def test_rejects_empty_key(self):
"""Empty key is rejected."""
with pytest.raises(ValueError, match="64 bytes"):
set_private_key(b"")
def test_overwrites_previous_key(self):
"""Setting a new key replaces the old one."""
set_private_key(VALID_KEY)
pub1 = get_public_key()
# Create a different valid key
other_key = bytearray(VALID_KEY)
other_key[1] = 0x42 # Change a byte in the scalar
other_key = bytes(other_key)
set_private_key(other_key)
pub2 = get_public_key()
assert get_private_key() == other_key
assert pub1 != pub2
class TestGettersWhenEmpty:
"""Test getter behavior when no key is stored."""
def test_get_private_key_returns_none(self):
assert get_private_key() is None
def test_get_public_key_returns_none(self):
assert get_public_key() is None
def test_has_private_key_false(self):
assert has_private_key() is False
class TestExportAndStorePrivateKey:
"""Test the export_and_store_private_key flow with various radio responses."""
@pytest.mark.asyncio
async def test_success_stores_key(self):
"""Successful export stores the key in the keystore."""
mock_mc = MagicMock()
mock_result = MagicMock()
mock_result.type = EventType.PRIVATE_KEY
mock_result.payload = {"private_key": VALID_KEY}
mock_mc.commands.export_private_key = AsyncMock(return_value=mock_result)
result = await export_and_store_private_key(mock_mc)
assert result is True
assert has_private_key()
assert get_private_key() == VALID_KEY
@pytest.mark.asyncio
async def test_disabled_returns_false(self):
"""DISABLED response returns False without storing."""
mock_mc = MagicMock()
mock_result = MagicMock()
mock_result.type = EventType.DISABLED
mock_result.payload = {}
mock_mc.commands.export_private_key = AsyncMock(return_value=mock_result)
result = await export_and_store_private_key(mock_mc)
assert result is False
assert not has_private_key()
@pytest.mark.asyncio
async def test_error_returns_false(self):
"""ERROR response returns False without storing."""
mock_mc = MagicMock()
mock_result = MagicMock()
mock_result.type = EventType.ERROR
mock_result.payload = {"error": "something went wrong"}
mock_mc.commands.export_private_key = AsyncMock(return_value=mock_result)
result = await export_and_store_private_key(mock_mc)
assert result is False
assert not has_private_key()
@pytest.mark.asyncio
async def test_exception_returns_false(self):
"""Exception during export returns False without storing."""
mock_mc = MagicMock()
mock_mc.commands.export_private_key = AsyncMock(side_effect=Exception("Connection lost"))
result = await export_and_store_private_key(mock_mc)
assert result is False
assert not has_private_key()

View File

@@ -7,22 +7,27 @@ message polling from interfering with repeater CLI operations.
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from meshcore import EventType
from app.models import Contact
from app.radio_sync import (
is_polling_paused,
pause_polling,
sync_radio_time,
sync_recent_contacts_to_radio,
)
@pytest.fixture(autouse=True)
def reset_polling_state():
"""Reset polling pause state before and after each test."""
def reset_sync_state():
"""Reset polling pause state and sync timestamp before and after each test."""
import app.radio_sync as radio_sync
radio_sync._polling_pause_count = 0
radio_sync._last_contact_sync = 0.0
yield
radio_sync._polling_pause_count = 0
radio_sync._last_contact_sync = 0.0
class TestPollingPause:
@@ -158,3 +163,263 @@ class TestSyncRadioTime:
result = await sync_radio_time()
assert result is False
KEY_A = "aa" * 32
KEY_B = "bb" * 32
def _make_contact(public_key=KEY_A, name="Alice", on_radio=False, **overrides):
"""Create a Contact model instance for testing."""
defaults = {
"public_key": public_key,
"name": name,
"type": 0,
"flags": 0,
"last_path": None,
"last_path_len": -1,
"last_advert": None,
"lat": None,
"lon": None,
"last_seen": None,
"on_radio": on_radio,
"last_contacted": None,
"last_read_at": None,
}
defaults.update(overrides)
return Contact(**defaults)
class TestSyncRecentContactsToRadio:
"""Test the sync_recent_contacts_to_radio function."""
@pytest.mark.asyncio
async def test_loads_contacts_not_on_radio(self):
"""Contacts not on radio are added via add_contact."""
contacts = [_make_contact(KEY_A, "Alice"), _make_contact(KEY_B, "Bob")]
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None)
mock_result = MagicMock()
mock_result.type = EventType.OK
mock_mc.commands.add_contact = AsyncMock(return_value=mock_result)
mock_settings = MagicMock()
mock_settings.max_radio_contacts = 200
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ContactRepository.get_recent_non_repeaters",
new_callable=AsyncMock,
return_value=contacts,
),
patch(
"app.radio_sync.ContactRepository.set_on_radio",
new_callable=AsyncMock,
) as mock_set_on_radio,
patch(
"app.radio_sync.AppSettingsRepository.get",
new_callable=AsyncMock,
return_value=mock_settings,
),
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
result = await sync_recent_contacts_to_radio()
assert result["loaded"] == 2
assert mock_set_on_radio.call_count == 2
@pytest.mark.asyncio
async def test_skips_contacts_already_on_radio(self):
"""Contacts already on radio are counted but not re-added."""
contacts = [_make_contact(KEY_A, "Alice", on_radio=True)]
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=MagicMock()) # Found
mock_mc.commands.add_contact = AsyncMock()
mock_settings = MagicMock()
mock_settings.max_radio_contacts = 200
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ContactRepository.get_recent_non_repeaters",
new_callable=AsyncMock,
return_value=contacts,
),
patch(
"app.radio_sync.ContactRepository.set_on_radio",
new_callable=AsyncMock,
),
patch(
"app.radio_sync.AppSettingsRepository.get",
new_callable=AsyncMock,
return_value=mock_settings,
),
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
result = await sync_recent_contacts_to_radio()
assert result["loaded"] == 0
assert result["already_on_radio"] == 1
mock_mc.commands.add_contact.assert_not_called()
@pytest.mark.asyncio
async def test_throttled_when_called_quickly(self):
"""Second call within throttle window returns throttled result."""
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None)
mock_settings = MagicMock()
mock_settings.max_radio_contacts = 200
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ContactRepository.get_recent_non_repeaters",
new_callable=AsyncMock,
return_value=[],
),
patch(
"app.radio_sync.AppSettingsRepository.get",
new_callable=AsyncMock,
return_value=mock_settings,
),
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
# First call succeeds
result1 = await sync_recent_contacts_to_radio()
assert "throttled" not in result1
# Second call is throttled
result2 = await sync_recent_contacts_to_radio()
assert result2["throttled"] is True
assert result2["loaded"] == 0
@pytest.mark.asyncio
async def test_force_bypasses_throttle(self):
"""force=True bypasses the throttle window."""
mock_mc = MagicMock()
mock_settings = MagicMock()
mock_settings.max_radio_contacts = 200
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ContactRepository.get_recent_non_repeaters",
new_callable=AsyncMock,
return_value=[],
),
patch(
"app.radio_sync.AppSettingsRepository.get",
new_callable=AsyncMock,
return_value=mock_settings,
),
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
# First call
await sync_recent_contacts_to_radio()
# Forced second call is not throttled
result = await sync_recent_contacts_to_radio(force=True)
assert "throttled" not in result
@pytest.mark.asyncio
async def test_not_connected_returns_error(self):
"""Returns error when radio is not connected."""
with patch("app.radio_sync.radio_manager") as mock_rm:
mock_rm.is_connected = False
mock_rm.meshcore = None
result = await sync_recent_contacts_to_radio()
assert result["loaded"] == 0
assert "error" in result
@pytest.mark.asyncio
async def test_marks_on_radio_when_found_but_not_flagged(self):
"""Contact found on radio but not flagged gets set_on_radio(True)."""
contact = _make_contact(KEY_A, "Alice", on_radio=False)
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=MagicMock()) # Found
mock_settings = MagicMock()
mock_settings.max_radio_contacts = 200
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ContactRepository.get_recent_non_repeaters",
new_callable=AsyncMock,
return_value=[contact],
),
patch(
"app.radio_sync.ContactRepository.set_on_radio",
new_callable=AsyncMock,
) as mock_set_on_radio,
patch(
"app.radio_sync.AppSettingsRepository.get",
new_callable=AsyncMock,
return_value=mock_settings,
),
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
result = await sync_recent_contacts_to_radio()
assert result["already_on_radio"] == 1
# Should update the flag since contact.on_radio was False
mock_set_on_radio.assert_called_once_with(KEY_A, True)
@pytest.mark.asyncio
async def test_handles_add_failure(self):
"""Failed add_contact increments the failed counter."""
contacts = [_make_contact(KEY_A, "Alice")]
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None)
mock_result = MagicMock()
mock_result.type = EventType.ERROR
mock_result.payload = {"error": "Radio full"}
mock_mc.commands.add_contact = AsyncMock(return_value=mock_result)
mock_settings = MagicMock()
mock_settings.max_radio_contacts = 200
with (
patch("app.radio_sync.radio_manager") as mock_rm,
patch(
"app.radio_sync.ContactRepository.get_recent_non_repeaters",
new_callable=AsyncMock,
return_value=contacts,
),
patch(
"app.radio_sync.ContactRepository.set_on_radio",
new_callable=AsyncMock,
),
patch(
"app.radio_sync.AppSettingsRepository.get",
new_callable=AsyncMock,
return_value=mock_settings,
),
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
result = await sync_recent_contacts_to_radio()
assert result["loaded"] == 0
assert result["failed"] == 1

View File

@@ -73,7 +73,10 @@ class TestOutgoingDMBotTrigger:
db_contact = Contact(public_key="ab" * 32, name="Alice")
# Bot that would take a long time
slow_bot = AsyncMock(side_effect=lambda **kw: asyncio.sleep(10))
async def _slow(**kw):
await asyncio.sleep(10)
slow_bot = AsyncMock(side_effect=_slow)
with (
patch("app.routers.messages.require_connected", return_value=mc),
@@ -179,7 +182,10 @@ class TestOutgoingChannelBotTrigger:
mc = _make_mc(name="MyNode")
db_channel = Channel(key="cc" * 16, name="#slow")
slow_bot = AsyncMock(side_effect=lambda **kw: asyncio.sleep(10))
async def _slow(**kw):
await asyncio.sleep(10)
slow_bot = AsyncMock(side_effect=_slow)
with (
patch("app.routers.messages.require_connected", return_value=mc),