Files
Remote-Terminal-for-MeshCore/tests/test_contacts_router.py
2026-02-28 15:33:50 -08:00

771 lines
27 KiB
Python

"""Tests for the contacts router.
Verifies the contact CRUD endpoints, sync, mark-read, delete,
and add/remove from radio operations.
Uses httpx.AsyncClient with real in-memory SQLite database.
"""
from contextlib import asynccontextmanager
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from meshcore import EventType
from app.radio import radio_manager
from app.repository import ContactAdvertPathRepository, ContactRepository, MessageRepository
# Sample 64-char hex public keys for testing
KEY_A = "aa" * 32 # aaaa...aa
KEY_B = "bb" * 32 # bbbb...bb
KEY_C = "cc" * 32 # cccc...cc
def _noop_radio_operation(mc=None):
"""Factory for a no-op radio_operation context manager that yields mc."""
@asynccontextmanager
async def _ctx(*_args, **_kwargs):
yield mc
return _ctx
@pytest.fixture(autouse=True)
def _reset_radio_state():
"""Save/restore radio_manager state so tests don't leak."""
prev = radio_manager._meshcore
prev_lock = radio_manager._operation_lock
yield
radio_manager._meshcore = prev
radio_manager._operation_lock = prev_lock
async def _insert_contact(public_key=KEY_A, name="Alice", on_radio=False, **overrides):
"""Insert a contact into the test database."""
data = {
"public_key": public_key,
"name": name,
"type": 0,
"flags": 0,
"last_path": None,
"last_path_len": -1,
"last_advert": None,
"lat": None,
"lon": None,
"last_seen": None,
"on_radio": on_radio,
"last_contacted": None,
"first_seen": None,
}
data.update(overrides)
await ContactRepository.upsert(data)
class TestListContacts:
"""Test GET /api/contacts."""
@pytest.mark.asyncio
async def test_list_returns_contacts(self, test_db, client):
await _insert_contact(KEY_A, "Alice")
await _insert_contact(KEY_B, "Bob")
response = await client.get("/api/contacts")
assert response.status_code == 200
data = response.json()
assert len(data) == 2
keys = {d["public_key"] for d in data}
assert KEY_A in keys
assert KEY_B in keys
@pytest.mark.asyncio
async def test_list_pagination_params(self, test_db, client):
# Insert 3 contacts
await _insert_contact(KEY_A, "Alice")
await _insert_contact(KEY_B, "Bob")
await _insert_contact(KEY_C, "Carol")
response = await client.get("/api/contacts?limit=2&offset=0")
assert response.status_code == 200
data = response.json()
assert len(data) == 2
class TestCreateContact:
"""Test POST /api/contacts."""
@pytest.mark.asyncio
async def test_create_new_contact(self, test_db, client):
response = await client.post(
"/api/contacts",
json={"public_key": KEY_A, "name": "NewContact"},
)
assert response.status_code == 200
data = response.json()
assert data["public_key"] == KEY_A
assert data["name"] == "NewContact"
# Verify in DB
contact = await ContactRepository.get_by_key(KEY_A)
assert contact is not None
assert contact.name == "NewContact"
@pytest.mark.asyncio
async def test_create_invalid_hex(self, test_db, client):
"""Non-hex public key returns 400."""
response = await client.post(
"/api/contacts",
json={"public_key": "zz" * 32, "name": "Bad"},
)
assert response.status_code == 400
assert "hex" in response.json()["detail"].lower()
@pytest.mark.asyncio
async def test_create_short_key_rejected(self, test_db, client):
"""Key shorter than 64 chars is rejected by pydantic validation."""
response = await client.post(
"/api/contacts",
json={"public_key": "aa" * 16, "name": "Short"},
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_create_existing_updates_name(self, test_db, client):
"""Creating a contact that exists updates the name."""
await _insert_contact(KEY_A, "OldName")
response = await client.post(
"/api/contacts",
json={"public_key": KEY_A, "name": "NewName"},
)
assert response.status_code == 200
# Verify name was updated in DB
contact = await ContactRepository.get_by_key(KEY_A)
assert contact.name == "NewName"
class TestGetContact:
"""Test GET /api/contacts/{public_key}."""
@pytest.mark.asyncio
async def test_get_existing(self, test_db, client):
await _insert_contact(KEY_A, "Alice")
response = await client.get(f"/api/contacts/{KEY_A}")
assert response.status_code == 200
assert response.json()["name"] == "Alice"
@pytest.mark.asyncio
async def test_get_not_found(self, test_db, client):
response = await client.get(f"/api/contacts/{KEY_A}")
assert response.status_code == 404
@pytest.mark.asyncio
async def test_get_ambiguous_prefix_returns_409(self, test_db, client):
# Insert two contacts that share a prefix
await _insert_contact("abcd12" + "00" * 29, "ContactA")
await _insert_contact("abcd12" + "ff" * 29, "ContactB")
response = await client.get("/api/contacts/abcd12")
assert response.status_code == 409
assert "ambiguous" in response.json()["detail"].lower()
class TestAdvertPaths:
"""Test repeater advert path endpoints."""
@pytest.mark.asyncio
async def test_list_repeater_advert_paths(self, test_db, client):
repeater_key = KEY_A
await _insert_contact(repeater_key, "R1", type=2)
await ContactAdvertPathRepository.record_observation(repeater_key, "1122", 1000)
await ContactAdvertPathRepository.record_observation(repeater_key, "3344", 1010)
response = await client.get("/api/contacts/repeaters/advert-paths?limit_per_repeater=1")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["public_key"] == repeater_key
assert len(data[0]["paths"]) == 1
assert data[0]["paths"][0]["path"] == "3344"
assert data[0]["paths"][0]["next_hop"] == "33"
@pytest.mark.asyncio
async def test_get_contact_advert_paths_for_repeater(self, test_db, client):
repeater_key = KEY_A
await _insert_contact(repeater_key, "R1", type=2)
await ContactAdvertPathRepository.record_observation(repeater_key, "", 1000)
response = await client.get(f"/api/contacts/{repeater_key}/advert-paths")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["path"] == ""
assert data[0]["next_hop"] is None
@pytest.mark.asyncio
async def test_get_contact_advert_paths_works_for_non_repeater(self, test_db, client):
await _insert_contact(KEY_A, "Alice", type=1)
response = await client.get(f"/api/contacts/{KEY_A}/advert-paths")
assert response.status_code == 200
assert response.json() == []
class TestContactDetail:
"""Test GET /api/contacts/{public_key}/detail."""
@pytest.mark.asyncio
async def test_detail_returns_full_profile(self, test_db, client):
"""Happy path: contact with DMs, channel messages, name history, advert paths."""
await _insert_contact(KEY_A, "Alice", type=1)
# Add some DMs
await MessageRepository.create(
msg_type="PRIV",
text="hi",
conversation_key=KEY_A,
sender_timestamp=1000,
received_at=1000,
sender_key=KEY_A,
)
await MessageRepository.create(
msg_type="PRIV",
text="hello",
conversation_key=KEY_A,
sender_timestamp=1001,
received_at=1001,
outgoing=True,
)
# Add a channel message attributed to this contact
from app.repository import ContactNameHistoryRepository
await MessageRepository.create(
msg_type="CHAN",
text="Alice: yo",
conversation_key="CHAN_KEY_0" * 2,
sender_timestamp=1002,
received_at=1002,
sender_name="Alice",
sender_key=KEY_A,
)
# Record name history
await ContactNameHistoryRepository.record_name(KEY_A, "Alice", 1000)
await ContactNameHistoryRepository.record_name(KEY_A, "AliceOld", 500)
# Record advert paths
await ContactAdvertPathRepository.record_observation(KEY_A, "1122", 1000)
await ContactAdvertPathRepository.record_observation(KEY_A, "", 900)
response = await client.get(f"/api/contacts/{KEY_A}/detail")
assert response.status_code == 200
data = response.json()
assert data["contact"]["public_key"] == KEY_A
assert data["dm_message_count"] == 2
assert data["channel_message_count"] == 1
assert len(data["name_history"]) == 2
assert data["name_history"][0]["name"] == "Alice" # most recent first
assert len(data["advert_paths"]) == 2
assert len(data["most_active_rooms"]) == 1
@pytest.mark.asyncio
async def test_detail_contact_not_found(self, test_db, client):
response = await client.get(f"/api/contacts/{KEY_A}/detail")
assert response.status_code == 404
@pytest.mark.asyncio
async def test_detail_with_no_activity(self, test_db, client):
"""Contact with no messages or paths returns zero counts and empty lists."""
await _insert_contact(KEY_A, "Alice")
response = await client.get(f"/api/contacts/{KEY_A}/detail")
assert response.status_code == 200
data = response.json()
assert data["dm_message_count"] == 0
assert data["channel_message_count"] == 0
assert data["most_active_rooms"] == []
assert data["advert_paths"] == []
assert data["advert_frequency"] is None
assert data["nearest_repeaters"] == []
@pytest.mark.asyncio
async def test_detail_nearest_repeaters_resolved(self, test_db, client):
"""Nearest repeaters are resolved from first-hop prefixes in advert paths."""
await _insert_contact(KEY_A, "Alice", type=1)
# Create a repeater whose key starts with "bb"
await _insert_contact(KEY_B, "Relay1", type=2)
# Record advert paths that go through KEY_B's prefix
await ContactAdvertPathRepository.record_observation(KEY_A, "bb1122", 1000)
await ContactAdvertPathRepository.record_observation(KEY_A, "bb3344", 1010)
response = await client.get(f"/api/contacts/{KEY_A}/detail")
assert response.status_code == 200
data = response.json()
assert len(data["nearest_repeaters"]) == 1
repeater = data["nearest_repeaters"][0]
assert repeater["public_key"] == KEY_B
assert repeater["name"] == "Relay1"
assert repeater["heard_count"] == 2
@pytest.mark.asyncio
async def test_detail_advert_frequency_computed(self, test_db, client):
"""Advert frequency is computed from path observations over time span."""
await _insert_contact(KEY_A, "Alice")
# 10 observations over 1 hour (3600s)
for i in range(10):
path_hex = f"{i:02x}" * 2 # unique paths to avoid upsert
await ContactAdvertPathRepository.record_observation(KEY_A, path_hex, 1000 + i * 360)
response = await client.get(f"/api/contacts/{KEY_A}/detail")
assert response.status_code == 200
data = response.json()
# 10 observations / (3240s / 3600) ≈ 11.11/hr
assert data["advert_frequency"] is not None
assert data["advert_frequency"] > 0
class TestDeleteContactCascade:
"""Test that contact delete cleans up related tables."""
@pytest.mark.asyncio
async def test_delete_removes_name_history_and_advert_paths(self, test_db, client):
await _insert_contact(KEY_A, "Alice")
from app.repository import ContactNameHistoryRepository
await ContactNameHistoryRepository.record_name(KEY_A, "Alice", 1000)
await ContactAdvertPathRepository.record_observation(KEY_A, "1122", 1000)
# Verify data exists
assert len(await ContactNameHistoryRepository.get_history(KEY_A)) == 1
assert len(await ContactAdvertPathRepository.get_recent_for_contact(KEY_A)) == 1
with patch("app.routers.contacts.radio_manager") as mock_rm:
mock_rm.is_connected = False
mock_rm.meshcore = None
mock_rm.radio_operation = _noop_radio_operation()
response = await client.delete(f"/api/contacts/{KEY_A}")
assert response.status_code == 200
# Verify related data cleaned up
assert len(await ContactNameHistoryRepository.get_history(KEY_A)) == 0
assert len(await ContactAdvertPathRepository.get_recent_for_contact(KEY_A)) == 0
class TestMarkRead:
"""Test POST /api/contacts/{public_key}/mark-read."""
@pytest.mark.asyncio
async def test_mark_read_updates_timestamp(self, test_db, client):
await _insert_contact(KEY_A)
response = await client.post(f"/api/contacts/{KEY_A}/mark-read")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Verify last_read_at was set in DB
contact = await ContactRepository.get_by_key(KEY_A)
assert contact.last_read_at is not None
@pytest.mark.asyncio
async def test_mark_read_not_found(self, test_db, client):
response = await client.post(f"/api/contacts/{KEY_A}/mark-read")
assert response.status_code == 404
class TestDeleteContact:
"""Test DELETE /api/contacts/{public_key}."""
@pytest.mark.asyncio
async def test_delete_existing(self, test_db, client):
await _insert_contact(KEY_A)
with patch("app.routers.contacts.radio_manager") as mock_rm:
mock_rm.is_connected = False
mock_rm.meshcore = None
mock_rm.radio_operation = _noop_radio_operation()
response = await client.delete(f"/api/contacts/{KEY_A}")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Verify deleted from DB
contact = await ContactRepository.get_by_key(KEY_A)
assert contact is None
@pytest.mark.asyncio
async def test_delete_not_found(self, test_db, client):
response = await client.delete(f"/api/contacts/{KEY_A}")
assert response.status_code == 404
@pytest.mark.asyncio
async def test_delete_removes_from_radio_if_connected(self, test_db, client):
"""When radio is connected and contact is on radio, remove it first."""
await _insert_contact(KEY_A, on_radio=True)
mock_radio_contact = MagicMock()
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=mock_radio_contact)
mock_mc.commands.remove_contact = AsyncMock()
with patch("app.routers.contacts.radio_manager") as mock_rm:
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
mock_rm.radio_operation = _noop_radio_operation(mock_mc)
response = await client.delete(f"/api/contacts/{KEY_A}")
assert response.status_code == 200
mock_mc.commands.remove_contact.assert_called_once_with(mock_radio_contact)
class TestSyncContacts:
"""Test POST /api/contacts/sync."""
@pytest.mark.asyncio
async def test_sync_from_radio(self, test_db, client):
mock_mc = MagicMock()
mock_result = MagicMock()
mock_result.type = EventType.OK
mock_result.payload = {
KEY_A: {"adv_name": "Alice", "type": 1, "flags": 0},
KEY_B: {"adv_name": "Bob", "type": 1, "flags": 0},
}
mock_mc.commands.get_contacts = AsyncMock(return_value=mock_result)
radio_manager._meshcore = mock_mc
with patch("app.dependencies.radio_manager") as mock_dep_rm:
mock_dep_rm.is_connected = True
mock_dep_rm.meshcore = mock_mc
response = await client.post("/api/contacts/sync")
assert response.status_code == 200
assert response.json()["synced"] == 2
# Verify contacts are in real DB
alice = await ContactRepository.get_by_key(KEY_A)
assert alice is not None
assert alice.name == "Alice"
@pytest.mark.asyncio
async def test_sync_requires_connection(self, test_db, client):
with patch("app.dependencies.radio_manager") as mock_rm:
mock_rm.is_connected = False
mock_rm.meshcore = None
response = await client.post("/api/contacts/sync")
assert response.status_code == 503
@pytest.mark.asyncio
async def test_sync_claims_prefix_messages(self, test_db, client):
"""Syncing contacts promotes prefix-stored DM messages to the full key."""
await MessageRepository.create(
msg_type="PRIV",
text="hello from prefix",
received_at=1700000000,
conversation_key=KEY_A[:12],
sender_timestamp=1700000000,
)
mock_mc = MagicMock()
mock_result = MagicMock()
mock_result.type = EventType.OK
mock_result.payload = {KEY_A: {"adv_name": "Alice", "type": 1, "flags": 0}}
mock_mc.commands.get_contacts = AsyncMock(return_value=mock_result)
radio_manager._meshcore = mock_mc
with patch("app.dependencies.radio_manager") as mock_dep_rm:
mock_dep_rm.is_connected = True
mock_dep_rm.meshcore = mock_mc
response = await client.post("/api/contacts/sync")
assert response.status_code == 200
assert response.json()["synced"] == 1
messages = await MessageRepository.get_all(conversation_key=KEY_A)
assert len(messages) == 1
assert messages[0].conversation_key == KEY_A.lower()
class TestCreateContactWithHistorical:
"""Test POST /api/contacts with try_historical=true."""
@pytest.mark.asyncio
async def test_new_contact_triggers_historical_decrypt(self, test_db, client):
"""Creating a new contact with try_historical triggers DM decryption."""
with patch(
"app.routers.contacts.start_historical_dm_decryption", new_callable=AsyncMock
) as mock_start:
response = await client.post(
"/api/contacts",
json={"public_key": KEY_A, "name": "Alice", "try_historical": True},
)
assert response.status_code == 200
assert response.json()["public_key"] == KEY_A
mock_start.assert_awaited_once()
# Verify correct args: (background_tasks, public_key, name)
call_args = mock_start.call_args
assert call_args[0][1] == KEY_A # public_key
assert call_args[0][2] == "Alice" # display_name
@pytest.mark.asyncio
async def test_new_contact_without_historical(self, test_db, client):
"""Creating a new contact without try_historical does not trigger decryption."""
with patch(
"app.routers.contacts.start_historical_dm_decryption", new_callable=AsyncMock
) as mock_start:
response = await client.post(
"/api/contacts",
json={"public_key": KEY_A, "name": "Alice", "try_historical": False},
)
assert response.status_code == 200
mock_start.assert_not_awaited()
@pytest.mark.asyncio
async def test_existing_contact_with_historical(self, test_db, client):
"""Existing contact with try_historical still triggers decryption."""
await _insert_contact(KEY_A, "Alice")
with patch(
"app.routers.contacts.start_historical_dm_decryption", new_callable=AsyncMock
) as mock_start:
response = await client.post(
"/api/contacts",
json={"public_key": KEY_A, "name": "Alice", "try_historical": True},
)
assert response.status_code == 200
mock_start.assert_awaited_once()
@pytest.mark.asyncio
async def test_existing_contact_updates_name_and_decrypts(self, test_db, client):
"""Existing contact with try_historical updates name AND triggers decryption."""
await _insert_contact(KEY_A, "OldName")
with patch(
"app.routers.contacts.start_historical_dm_decryption", new_callable=AsyncMock
) as mock_start:
response = await client.post(
"/api/contacts",
json={"public_key": KEY_A, "name": "NewName", "try_historical": True},
)
assert response.status_code == 200
mock_start.assert_awaited_once()
# Verify name was also updated
contact = await ContactRepository.get_by_key(KEY_A)
assert contact.name == "NewName"
@pytest.mark.asyncio
async def test_default_try_historical_is_false(self, test_db, client):
"""try_historical defaults to false when not provided."""
with patch(
"app.routers.contacts.start_historical_dm_decryption", new_callable=AsyncMock
) as mock_start:
response = await client.post(
"/api/contacts",
json={"public_key": KEY_A, "name": "Alice"},
)
assert response.status_code == 200
mock_start.assert_not_awaited()
class TestResetPath:
"""Test POST /api/contacts/{public_key}/reset-path."""
@pytest.mark.asyncio
async def test_reset_path_to_flood(self, test_db, client):
"""Happy path: resets path to flood and returns ok."""
await _insert_contact(KEY_A, last_path="1122", last_path_len=1)
with (
patch("app.routers.contacts.radio_manager") as mock_rm,
patch("app.websocket.broadcast_event"),
):
mock_rm.is_connected = False
response = await client.post(f"/api/contacts/{KEY_A}/reset-path")
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["public_key"] == KEY_A
# Verify path was reset in DB
contact = await ContactRepository.get_by_key(KEY_A)
assert contact.last_path == ""
assert contact.last_path_len == -1
@pytest.mark.asyncio
async def test_reset_path_not_found(self, test_db, client):
response = await client.post(f"/api/contacts/{KEY_A}/reset-path")
assert response.status_code == 404
@pytest.mark.asyncio
async def test_reset_path_pushes_to_radio(self, test_db, client):
"""When radio connected and contact on_radio, pushes updated path."""
await _insert_contact(KEY_A, on_radio=True, last_path="1122", last_path_len=1)
mock_mc = MagicMock()
mock_result = MagicMock()
mock_result.type = EventType.OK
mock_mc.commands.add_contact = AsyncMock(return_value=mock_result)
with (
patch("app.routers.contacts.radio_manager") as mock_rm,
patch("app.websocket.broadcast_event"),
):
mock_rm.is_connected = True
mock_rm.radio_operation = _noop_radio_operation(mock_mc)
response = await client.post(f"/api/contacts/{KEY_A}/reset-path")
assert response.status_code == 200
mock_mc.commands.add_contact.assert_called_once()
@pytest.mark.asyncio
async def test_reset_path_broadcasts_websocket_event(self, test_db, client):
"""After resetting, broadcasts updated contact via WebSocket."""
await _insert_contact(KEY_A, last_path="1122", last_path_len=1)
with (
patch("app.routers.contacts.radio_manager") as mock_rm,
patch("app.websocket.broadcast_event") as mock_broadcast,
):
mock_rm.is_connected = False
response = await client.post(f"/api/contacts/{KEY_A}/reset-path")
assert response.status_code == 200
mock_broadcast.assert_called_once()
event_type, event_data = mock_broadcast.call_args[0]
assert event_type == "contact"
assert event_data["public_key"] == KEY_A
assert event_data["last_path_len"] == -1
class TestAddRemoveRadio:
"""Test add-to-radio and remove-from-radio endpoints."""
@pytest.mark.asyncio
async def test_add_to_radio(self, test_db, client):
await _insert_contact(KEY_A)
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None) # Not on radio
mock_result = MagicMock()
mock_result.type = EventType.OK
mock_mc.commands.add_contact = AsyncMock(return_value=mock_result)
radio_manager._meshcore = mock_mc
with patch("app.dependencies.radio_manager") as mock_dep_rm:
mock_dep_rm.is_connected = True
mock_dep_rm.meshcore = mock_mc
response = await client.post(f"/api/contacts/{KEY_A}/add-to-radio")
assert response.status_code == 200
mock_mc.commands.add_contact.assert_called_once()
# Verify on_radio flag updated in DB
contact = await ContactRepository.get_by_key(KEY_A)
assert contact.on_radio is True
@pytest.mark.asyncio
async def test_add_already_on_radio(self, test_db, client):
"""Adding a contact already on radio returns ok without calling add_contact."""
await _insert_contact(KEY_A, on_radio=True)
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=MagicMock()) # On radio
radio_manager._meshcore = mock_mc
with patch("app.dependencies.radio_manager") as mock_dep_rm:
mock_dep_rm.is_connected = True
mock_dep_rm.meshcore = mock_mc
response = await client.post(f"/api/contacts/{KEY_A}/add-to-radio")
assert response.status_code == 200
assert "already" in response.json()["message"].lower()
@pytest.mark.asyncio
async def test_remove_from_radio(self, test_db, client):
await _insert_contact(KEY_A, on_radio=True)
mock_radio_contact = MagicMock()
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=mock_radio_contact)
mock_result = MagicMock()
mock_result.type = EventType.OK
mock_mc.commands.remove_contact = AsyncMock(return_value=mock_result)
radio_manager._meshcore = mock_mc
with patch("app.dependencies.radio_manager") as mock_dep_rm:
mock_dep_rm.is_connected = True
mock_dep_rm.meshcore = mock_mc
response = await client.post(f"/api/contacts/{KEY_A}/remove-from-radio")
assert response.status_code == 200
mock_mc.commands.remove_contact.assert_called_once_with(mock_radio_contact)
# Verify on_radio flag updated in DB
contact = await ContactRepository.get_by_key(KEY_A)
assert contact.on_radio is False
@pytest.mark.asyncio
async def test_add_requires_connection(self, test_db, client):
with patch("app.dependencies.radio_manager") as mock_rm:
mock_rm.is_connected = False
mock_rm.meshcore = None
response = await client.post(f"/api/contacts/{KEY_A}/add-to-radio")
assert response.status_code == 503
@pytest.mark.asyncio
async def test_remove_not_found(self, test_db, client):
mock_mc = MagicMock()
with patch("app.dependencies.radio_manager") as mock_dep_rm:
mock_dep_rm.is_connected = True
mock_dep_rm.meshcore = mock_mc
response = await client.post(f"/api/contacts/{KEY_A}/remove-from-radio")
assert response.status_code == 404