Files
meshcore-hub/tests/test_common/test_models.py
T
Louis King 38a57f4cd4 perf(api): run handlers in threadpool, tune SQLite, precompute is_observer
Three related performance fixes for slow (>500ms) API responses.

1. Stop blocking the event loop. Route handlers were declared `async def`
   but ran synchronous SQLAlchemy queries (and synchronous Redis calls via
   the cache decorator) directly on the event loop, serializing requests.
   Convert all handlers to sync `def` so FastAPI runs them in its
   threadpool, and make the `@cached` decorator dual-mode (sync wrapper for
   sync handlers, async wrapper preserved for a future async/Postgres path).

2. Tune SQLite for concurrency. Enable WAL, busy_timeout and
   synchronous=NORMAL on every connection, and size the pool above the
   threadpool so handlers don't wait on connections. In-memory SQLite is
   guarded (no overflow-pool kwargs).

3. Precompute an indexed `nodes.is_observer` flag. The `observer=true`
   filter scanned ~68k+ event rows to find a handful of observers (the
   advertisements page calls it with limit=500). Replace the 5-way OR of
   subqueries with `WHERE nodes.is_observer = ?`. The collector sets the
   flag on first observation (in add_event_observer); the cleanup job
   clears it once a node's events are all pruned; a migration adds the
   column/index and backfills from the existing union.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 23:35:39 +01:00

281 lines
8.1 KiB
Python

"""Tests for database models."""
import pytest
from sqlalchemy import create_engine, select
from sqlalchemy.orm import sessionmaker
from meshcore_hub.common.models import (
Base,
Node,
NodeTag,
Message,
Advertisement,
TracePath,
Telemetry,
EventLog,
EventObserver,
add_event_observer,
)
@pytest.fixture
def db_session():
"""Create an in-memory SQLite database session."""
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
Base.metadata.drop_all(engine)
engine.dispose()
class TestNodeModel:
"""Tests for Node model."""
def test_create_node(self, db_session) -> None:
"""Test creating a node."""
node = Node(
public_key="a" * 64,
name="Test Node",
adv_type="chat",
flags=218,
)
db_session.add(node)
db_session.commit()
assert node.id is not None
assert node.public_key == "a" * 64
assert node.name == "Test Node"
assert node.adv_type == "chat"
assert node.flags == 218
def test_node_tags_relationship(self, db_session) -> None:
"""Test node-tag relationship."""
node = Node(public_key="b" * 64, name="Tagged Node")
tag = NodeTag(key="altitude", value="150", value_type="number")
node.tags.append(tag)
db_session.add(node)
db_session.commit()
assert len(node.tags) == 1
assert node.tags[0].key == "altitude"
class TestMessageModel:
"""Tests for Message model."""
def test_create_contact_message(self, db_session) -> None:
"""Test creating a contact message."""
message = Message(
message_type="contact",
pubkey_prefix="01ab2186c4d5",
text="Hello World!",
path_len=3,
snr=15.5,
)
db_session.add(message)
db_session.commit()
assert message.id is not None
assert message.message_type == "contact"
assert message.text == "Hello World!"
def test_create_channel_message(self, db_session) -> None:
"""Test creating a channel message."""
message = Message(
message_type="channel",
channel_idx=4,
text="Channel broadcast",
path_len=10,
)
db_session.add(message)
db_session.commit()
assert message.channel_idx == 4
assert message.message_type == "channel"
class TestAdvertisementModel:
"""Tests for Advertisement model."""
def test_create_advertisement(self, db_session) -> None:
"""Test creating an advertisement."""
ad = Advertisement(
public_key="c" * 64,
name="Repeater-01",
adv_type="repeater",
flags=128,
)
db_session.add(ad)
db_session.commit()
assert ad.id is not None
assert ad.public_key == "c" * 64
assert ad.adv_type == "repeater"
class TestTracePathModel:
"""Tests for TracePath model."""
def test_create_trace_path(self, db_session) -> None:
"""Test creating a trace path."""
trace = TracePath(
initiator_tag=123456789,
path_len=3,
path_hashes=["4a", "b3", "fa"],
snr_values=[25.3, 18.7, 12.4],
hop_count=3,
)
db_session.add(trace)
db_session.commit()
assert trace.id is not None
assert trace.initiator_tag == 123456789
assert trace.path_hashes == ["4a", "b3", "fa"]
def test_multibyte_path_hashes_round_trip(self, db_session) -> None:
"""Test that multibyte (4-char) path hashes round-trip correctly."""
path_hashes = ["4a2b", "b3fa", "02cd"]
trace = TracePath(
initiator_tag=987654321,
path_len=3,
path_hashes=path_hashes,
snr_values=[20.0, 15.0, 10.0],
hop_count=3,
)
db_session.add(trace)
db_session.commit()
# Expire cached attributes to force reload from database
db_session.expire(trace)
assert trace.path_hashes == ["4a2b", "b3fa", "02cd"]
assert len(trace.path_hashes) == 3
def test_mixed_length_path_hashes_round_trip(self, db_session) -> None:
"""Test that mixed-length path hashes round-trip correctly."""
path_hashes = ["4a", "b3fa", "02"]
trace = TracePath(
initiator_tag=111222333,
path_len=3,
path_hashes=path_hashes,
snr_values=[22.0, 17.5, 11.0],
hop_count=3,
)
db_session.add(trace)
db_session.commit()
# Expire cached attributes to force reload from database
db_session.expire(trace)
assert trace.path_hashes == ["4a", "b3fa", "02"]
assert len(trace.path_hashes) == 3
class TestTelemetryModel:
"""Tests for Telemetry model."""
def test_create_telemetry(self, db_session) -> None:
"""Test creating a telemetry record."""
telemetry = Telemetry(
node_public_key="d" * 64,
parsed_data={
"temperature": 22.5,
"humidity": 65,
"battery": 3.8,
},
)
db_session.add(telemetry)
db_session.commit()
assert telemetry.id is not None
assert telemetry.parsed_data is not None
assert telemetry.parsed_data["temperature"] == 22.5
class TestEventLogModel:
"""Tests for EventLog model."""
def test_create_event_log(self, db_session) -> None:
"""Test creating an event log entry."""
event = EventLog(
event_type="BATTERY",
payload={
"battery_voltage": 3.8,
"battery_percentage": 75,
},
)
db_session.add(event)
db_session.commit()
assert event.id is not None
assert event.event_type == "BATTERY"
assert event.payload is not None
assert event.payload["battery_percentage"] == 75
class TestEventObserverModel:
"""Tests for EventObserver model and add_event_observer helper."""
def test_add_event_observer_stores_path_len(self, db_session) -> None:
"""add_event_observer accepts and stores path_len."""
node = Node(public_key="a" * 64, name="Observer")
db_session.add(node)
db_session.commit()
result = add_event_observer(
session=db_session,
event_type="message",
event_hash="abcdef12",
observer_node_id=node.id,
snr=10.5,
path_len=3,
)
assert result is True
observer = db_session.execute(select(EventObserver)).scalar_one()
assert observer.snr == 10.5
assert observer.path_len == 3
def test_add_event_observer_path_len_defaults_none(self, db_session) -> None:
"""add_event_observer defaults path_len to None when not provided."""
node = Node(public_key="b" * 64, name="Observer2")
db_session.add(node)
db_session.commit()
result = add_event_observer(
session=db_session,
event_type="trace",
event_hash="12345678",
observer_node_id=node.id,
)
assert result is True
observer = db_session.execute(select(EventObserver)).scalar_one()
assert observer.path_len is None
assert observer.snr is None
def test_add_event_observer_sets_is_observer_flag(self, db_session) -> None:
"""Observing an event marks the observer node with is_observer=True."""
node = Node(public_key="c" * 64, name="Observer3")
db_session.add(node)
db_session.commit()
assert node.is_observer is False
add_event_observer(
session=db_session,
event_type="advertisement",
event_hash="deadbeef",
observer_node_id=node.id,
)
db_session.commit()
db_session.refresh(node)
assert node.is_observer is True