# Mirror of https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
# Synced 2026-05-01 19:12:57 +02:00 · 190 lines · 6.1 KiB · Python
"""Tests for repeater telemetry history: repository CRUD and embedded status response."""
|
|
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from app.models import CONTACT_TYPE_REPEATER
|
|
from app.repository import (
|
|
ContactRepository,
|
|
RepeaterTelemetryRepository,
|
|
)
|
|
|
|
# Stand-in contact public keys used throughout these tests: two distinct
# 64-character hex strings, so per-key isolation can be checked.
KEY_A = "aa" * 32
KEY_B = "bb" * 32
|
|
|
|
# Representative telemetry payload passed as the ``data`` argument to
# ``RepeaterTelemetryRepository.record``. Tests that need a distinguishable
# sample override individual keys via ``{**SAMPLE_STATUS, ...}`` rather than
# mutating this shared dict.
SAMPLE_STATUS = {
    "battery_volts": 4.15,
    "tx_queue_len": 0,
    "noise_floor_dbm": -100,
    "last_rssi_dbm": -80,
    "last_snr_db": 5.0,
    "packets_received": 100,
    "packets_sent": 50,
    "airtime_seconds": 300,
    "rx_airtime_seconds": 200,
    "uptime_seconds": 1000,
    "sent_flood": 10,
    "sent_direct": 40,
    "recv_flood": 60,
    "recv_direct": 40,
    "flood_dups": 5,
    "direct_dups": 2,
    "full_events": 0,
}
|
|
|
|
|
|
async def _insert_repeater(public_key: str, name: str = "Repeater") -> None:
    """Insert a repeater contact into the test database.

    Upserts a contact row of type ``CONTACT_TYPE_REPEATER`` through
    ``ContactRepository.upsert``. Apart from the key and name, every field is
    filled with a fixed placeholder (``None``, ``-1``, ``0`` or ``False``).

    Args:
        public_key: Public key stored on the contact.
        name: Display name stored on the contact.
    """
    await ContactRepository.upsert(
        {
            "public_key": public_key,
            "name": name,
            "type": CONTACT_TYPE_REPEATER,
            "flags": 0,
            "direct_path": None,
            "direct_path_len": -1,
            "direct_path_hash_mode": -1,
            "last_advert": None,
            "lat": None,
            "lon": None,
            "last_seen": None,
            "on_radio": False,
            "last_contacted": None,
            "first_seen": None,
        }
    )
|
|
|
|
|
|
@pytest.fixture
async def _db(test_db):
    """Set up test DB and patch the repeater_telemetry module's db reference.

    Rebinds ``app.repository.repeater_telemetry.db`` to the ``test_db``
    fixture value for the duration of a test, yields that same object, and
    restores the original binding afterwards.
    """
    # NOTE(review): an async generator under plain ``@pytest.fixture`` relies
    # on the async test plugin's configuration to drive it — confirm the
    # project's pytest-asyncio mode.
    # Imported here so the module attribute is looked up when the fixture
    # runs rather than when this test file is imported.
    from app.repository import repeater_telemetry

    original = repeater_telemetry.db
    repeater_telemetry.db = test_db
    try:
        yield test_db
    finally:
        # Always undo the patch, even if the test body raised.
        repeater_telemetry.db = original
|
|
|
|
|
|
class TestRepeaterTelemetryRepository:
    """Tests for RepeaterTelemetryRepository CRUD operations with JSON blob storage."""

    @pytest.mark.asyncio
    async def test_record_and_get_history(self, _db):
        """Two samples for one key come back oldest-first with their data intact."""
        await _insert_repeater(KEY_A)
        now = int(time.time())

        await RepeaterTelemetryRepository.record(
            public_key=KEY_A,
            timestamp=now - 3600,
            data={**SAMPLE_STATUS, "battery_volts": 4.15},
        )
        await RepeaterTelemetryRepository.record(
            public_key=KEY_A,
            timestamp=now,
            data={**SAMPLE_STATUS, "battery_volts": 4.10},
        )

        history = await RepeaterTelemetryRepository.get_history(KEY_A, now - 7200)
        assert len(history) == 2
        assert history[0]["data"]["battery_volts"] == 4.15
        assert history[1]["data"]["battery_volts"] == 4.10
        assert history[0]["timestamp"] < history[1]["timestamp"]

    @pytest.mark.asyncio
    async def test_get_history_filters_by_time(self, _db):
        """Samples older than the requested cutoff are left out of the result."""
        await _insert_repeater(KEY_A)
        now = int(time.time())

        await RepeaterTelemetryRepository.record(KEY_A, now - 7200, SAMPLE_STATUS)
        await RepeaterTelemetryRepository.record(KEY_A, now - 3600, SAMPLE_STATUS)
        await RepeaterTelemetryRepository.record(KEY_A, now, SAMPLE_STATUS)

        # Cutoff sits one second before the middle sample, so the expected
        # result does not depend on whether the bound is inclusive.
        cutoff = now - 3601
        history = await RepeaterTelemetryRepository.get_history(KEY_A, cutoff)
        assert len(history) == 2
        # The count alone would also pass if the wrong sample were dropped.
        assert all(row["timestamp"] >= cutoff for row in history)

    @pytest.mark.asyncio
    async def test_get_history_isolates_by_key(self, _db):
        """History recorded for one public key never appears under another."""
        await _insert_repeater(KEY_A)
        await _insert_repeater(KEY_B)
        now = int(time.time())

        await RepeaterTelemetryRepository.record(
            KEY_A, now, {**SAMPLE_STATUS, "battery_volts": 4.1}
        )
        await RepeaterTelemetryRepository.record(
            KEY_B, now, {**SAMPLE_STATUS, "battery_volts": 3.9}
        )

        history_a = await RepeaterTelemetryRepository.get_history(KEY_A, 0)
        history_b = await RepeaterTelemetryRepository.get_history(KEY_B, 0)
        assert len(history_a) == 1
        assert len(history_b) == 1
        assert history_a[0]["data"]["battery_volts"] == 4.1
        # Check both directions so swapped rows cannot slip through.
        assert history_b[0]["data"]["battery_volts"] == 3.9

    @pytest.mark.asyncio
    async def test_data_stored_as_json(self, _db):
        """Verify the data column stores valid JSON that round-trips correctly."""
        await _insert_repeater(KEY_A)
        now = int(time.time())

        await RepeaterTelemetryRepository.record(KEY_A, now, SAMPLE_STATUS)
        history = await RepeaterTelemetryRepository.get_history(KEY_A, 0)
        assert len(history) == 1
        assert history[0]["data"] == SAMPLE_STATUS
|
|
|
|
|
|
class TestTelemetryHistoryEndpoint:
    """Tests for the read-only GET telemetry-history endpoint.

    Every test issues ``GET /api/contacts/{key}/repeater/telemetry-history``
    through the ``client`` fixture, which is not defined in this file
    (presumably supplied by the suite's conftest — verify).
    """

    @pytest.mark.asyncio
    async def test_returns_history_for_repeater(self, _db, client):
        """A repeater with one recorded sample yields a one-element JSON list."""
        await _insert_repeater(KEY_A)
        now = int(time.time())
        await RepeaterTelemetryRepository.record(KEY_A, now, SAMPLE_STATUS)

        resp = await client.get(f"/api/contacts/{KEY_A}/repeater/telemetry-history")
        assert resp.status_code == 200
        data = resp.json()
        assert len(data) == 1
        # Compare against the recorded payload rather than repeating the literal.
        assert data[0]["data"]["battery_volts"] == SAMPLE_STATUS["battery_volts"]

    @pytest.mark.asyncio
    async def test_returns_empty_list_when_no_history(self, _db, client):
        """A known repeater with no samples gets 200 and an empty list."""
        await _insert_repeater(KEY_A)

        resp = await client.get(f"/api/contacts/{KEY_A}/repeater/telemetry-history")
        assert resp.status_code == 200
        assert resp.json() == []

    @pytest.mark.asyncio
    async def test_rejects_non_repeater(self, _db, client):
        """A contact whose type is not a repeater is refused with 400."""
        await ContactRepository.upsert(
            {
                "public_key": KEY_A,
                "name": "Node",
                # Type 0 stands for a non-repeater contact here; presumably it
                # differs from CONTACT_TYPE_REPEATER — verify in app.models.
                "type": 0,
                "flags": 0,
                "direct_path": None,
                "direct_path_len": -1,
                "direct_path_hash_mode": -1,
                "last_advert": None,
                "lat": None,
                "lon": None,
                "last_seen": None,
                "on_radio": False,
                "last_contacted": None,
                "first_seen": None,
            }
        )
        resp = await client.get(f"/api/contacts/{KEY_A}/repeater/telemetry-history")
        assert resp.status_code == 400

    @pytest.mark.asyncio
    async def test_returns_404_for_unknown_contact(self, _db, client):
        """Requesting history for a key with no contact row returns 404."""
        resp = await client.get(f"/api/contacts/{KEY_A}/repeater/telemetry-history")
        assert resp.status_code == 404
|