"""Tests for repeater telemetry history: repository CRUD and embedded status response.""" import time import pytest from app.models import CONTACT_TYPE_REPEATER from app.repository import ( ContactRepository, RepeaterTelemetryRepository, ) KEY_A = "aa" * 32 KEY_B = "bb" * 32 SAMPLE_STATUS = { "battery_volts": 4.15, "tx_queue_len": 0, "noise_floor_dbm": -100, "last_rssi_dbm": -80, "last_snr_db": 5.0, "packets_received": 100, "packets_sent": 50, "airtime_seconds": 300, "rx_airtime_seconds": 200, "uptime_seconds": 1000, "sent_flood": 10, "sent_direct": 40, "recv_flood": 60, "recv_direct": 40, "flood_dups": 5, "direct_dups": 2, "full_events": 0, } async def _insert_repeater(public_key: str, name: str = "Repeater"): """Insert a repeater contact into the test database.""" await ContactRepository.upsert( { "public_key": public_key, "name": name, "type": CONTACT_TYPE_REPEATER, "flags": 0, "direct_path": None, "direct_path_len": -1, "direct_path_hash_mode": -1, "last_advert": None, "lat": None, "lon": None, "last_seen": None, "on_radio": False, "last_contacted": None, "first_seen": None, } ) @pytest.fixture async def _db(test_db): """Set up test DB and patch the repeater_telemetry module's db reference.""" from app.repository import repeater_telemetry original = repeater_telemetry.db repeater_telemetry.db = test_db try: yield test_db finally: repeater_telemetry.db = original class TestRepeaterTelemetryRepository: """Tests for RepeaterTelemetryRepository CRUD operations with JSON blob storage.""" @pytest.mark.asyncio async def test_record_and_get_history(self, _db): await _insert_repeater(KEY_A) now = int(time.time()) await RepeaterTelemetryRepository.record( public_key=KEY_A, timestamp=now - 3600, data={**SAMPLE_STATUS, "battery_volts": 4.15}, ) await RepeaterTelemetryRepository.record( public_key=KEY_A, timestamp=now, data={**SAMPLE_STATUS, "battery_volts": 4.10}, ) history = await RepeaterTelemetryRepository.get_history(KEY_A, now - 7200) assert len(history) == 2 assert history[0]["data"]["battery_volts"] == 4.15 assert history[1]["data"]["battery_volts"] == 4.10 assert history[0]["timestamp"] < history[1]["timestamp"] @pytest.mark.asyncio async def test_get_history_filters_by_time(self, _db): await _insert_repeater(KEY_A) now = int(time.time()) await RepeaterTelemetryRepository.record(KEY_A, now - 7200, SAMPLE_STATUS) await RepeaterTelemetryRepository.record(KEY_A, now - 3600, SAMPLE_STATUS) await RepeaterTelemetryRepository.record(KEY_A, now, SAMPLE_STATUS) history = await RepeaterTelemetryRepository.get_history(KEY_A, now - 3601) assert len(history) == 2 @pytest.mark.asyncio async def test_get_history_isolates_by_key(self, _db): await _insert_repeater(KEY_A) await _insert_repeater(KEY_B) now = int(time.time()) await RepeaterTelemetryRepository.record( KEY_A, now, {**SAMPLE_STATUS, "battery_volts": 4.1} ) await RepeaterTelemetryRepository.record( KEY_B, now, {**SAMPLE_STATUS, "battery_volts": 3.9} ) history_a = await RepeaterTelemetryRepository.get_history(KEY_A, 0) history_b = await RepeaterTelemetryRepository.get_history(KEY_B, 0) assert len(history_a) == 1 assert len(history_b) == 1 assert history_a[0]["data"]["battery_volts"] == 4.1 @pytest.mark.asyncio async def test_data_stored_as_json(self, _db): """Verify the data column stores valid JSON that round-trips correctly.""" await _insert_repeater(KEY_A) now = int(time.time()) await RepeaterTelemetryRepository.record(KEY_A, now, SAMPLE_STATUS) history = await RepeaterTelemetryRepository.get_history(KEY_A, 0) assert len(history) == 1 assert history[0]["data"] == SAMPLE_STATUS class TestTelemetryHistoryInStatusResponse: """Tests that history is embedded in the status response (no separate endpoint).""" @pytest.mark.asyncio async def test_history_not_available_as_separate_endpoint(self, _db, client): """The old GET telemetry-history endpoint should be gone.""" await _insert_repeater(KEY_A) resp = await client.get(f"/api/contacts/{KEY_A}/repeater/telemetry-history") assert resp.status_code in (404, 405) @pytest.mark.asyncio async def test_history_endpoint_non_repeater_rejected(self, _db, client): await ContactRepository.upsert( { "public_key": KEY_A, "name": "Node", "type": 0, "flags": 0, "direct_path": None, "direct_path_len": -1, "direct_path_hash_mode": -1, "last_advert": None, "lat": None, "lon": None, "last_seen": None, "on_radio": False, "last_contacted": None, "first_seen": None, } ) resp = await client.get(f"/api/contacts/{KEY_A}/repeater/telemetry-history") # Either 404 (method not found) or 400 (not a repeater) — endpoint is gone assert resp.status_code in (400, 404, 405)