mirror of
https://github.com/ipnet-mesh/meshcore-hub.git
synced 2026-06-16 08:04:44 +02:00
96a78d79f6
- Default-off coverage in pyproject.toml addopts; opt-in via make test-cov - Add pytest-xdist for parallel execution (make test = pytest -nauto --no-cov) - Promote API test fixtures to session/module scope (engine, app, mocks); per-test isolation via table truncation instead of schema rebuild - Remove Makefile include .env/export that leaked config vars into tests; docker-compose reads .env natively - Add _ignore_dotenv autouse fixture: disables env_file, clears leaked env vars from Settings fields and Click CLI envvars - Patch time.sleep in 3 subscriber scheduler tests (~3s -> ~0.03s) - Fix pytest.raises(Exception, match='') warning -> IntegrityError - Add .venv activation to .envrc - Suppress warn_unused_ignores for tests in mypy config (single-file pre-commit checks lack full-project context)
559 lines
16 KiB
Python
559 lines
16 KiB
Python
"""API test fixtures."""
|
|
|
|
import os
|
|
import tempfile
|
|
from contextlib import contextmanager
|
|
from datetime import datetime, timezone
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
from sqlalchemy import create_engine, event as sa_event
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
from meshcore_hub.api.app import create_app
|
|
from meshcore_hub.api.dependencies import (
|
|
get_db_session,
|
|
get_mqtt_client,
|
|
get_db_manager,
|
|
)
|
|
from meshcore_hub.common.database import DatabaseManager
|
|
from meshcore_hub.common.models import (
|
|
Advertisement,
|
|
Base,
|
|
Channel,
|
|
EventObserver,
|
|
Message,
|
|
Node,
|
|
NodeTag,
|
|
Telemetry,
|
|
TracePath,
|
|
UserProfile,
|
|
UserProfileNode,
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def test_db_path():
|
|
"""Session-scoped temporary database file path.
|
|
|
|
One file per pytest session; the engine below builds schema on it once.
|
|
"""
|
|
fd, path = tempfile.mkstemp(suffix=".db")
|
|
os.close(fd)
|
|
yield path
|
|
if os.path.exists(path):
|
|
os.unlink(path)
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def api_db_engine(test_db_path):
|
|
"""Session-scoped SQLite engine. Schema is built once per pytest session.
|
|
|
|
Previously this was function-scoped and rebuilt ~15 tables for every test,
|
|
costing ~0.2s/test. Promoting it to session scope eliminates that. Per-test
|
|
isolation is handled by truncation in ``api_db_session``.
|
|
"""
|
|
db_url = f"sqlite:///{test_db_path}"
|
|
engine = create_engine(
|
|
db_url,
|
|
connect_args={"check_same_thread": False},
|
|
)
|
|
|
|
@sa_event.listens_for(engine, "connect")
|
|
def set_sqlite_pragma(dbapi_connection: object, connection_record: object) -> None:
|
|
cursor = dbapi_connection.cursor() # type: ignore[attr-defined]
|
|
cursor.execute("PRAGMA foreign_keys=ON")
|
|
cursor.close()
|
|
|
|
Base.metadata.create_all(engine)
|
|
yield engine
|
|
Base.metadata.drop_all(engine)
|
|
engine.dispose()
|
|
|
|
|
|
def _truncate_all(engine) -> None:
|
|
"""Delete rows from every table in child-first order (FK-safe)."""
|
|
with engine.begin() as conn:
|
|
for table in reversed(Base.metadata.sorted_tables):
|
|
conn.execute(table.delete())
|
|
|
|
|
|
@pytest.fixture
|
|
def api_db_session(api_db_engine):
|
|
"""Per-test session bound to the shared session-scoped engine.
|
|
|
|
Rows are truncated at teardown so each test starts with empty tables
|
|
without paying schema-build cost. Tests must ``commit()`` their seed
|
|
data before invoking the test client (the sample_* fixtures already do).
|
|
"""
|
|
Session = sessionmaker(bind=api_db_engine)
|
|
session = Session()
|
|
yield session
|
|
session.close()
|
|
_truncate_all(api_db_engine)
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def mock_mqtt():
|
|
"""Session-scoped mock MQTT client (no per-test state)."""
|
|
from unittest.mock import MagicMock
|
|
|
|
mock = MagicMock()
|
|
mock.connect.return_value = None
|
|
mock.start_background.return_value = None
|
|
mock.stop.return_value = None
|
|
mock.disconnect.return_value = None
|
|
mock.publish_command.return_value = None
|
|
return mock
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def mock_db_manager(api_db_engine):
|
|
"""Session-scoped mock database manager backed by the shared engine."""
|
|
from unittest.mock import MagicMock
|
|
|
|
manager = MagicMock(spec=DatabaseManager)
|
|
Session = sessionmaker(bind=api_db_engine)
|
|
manager.get_session = lambda: Session()
|
|
|
|
@contextmanager
|
|
def _session_scope():
|
|
session = Session()
|
|
try:
|
|
yield session
|
|
session.commit()
|
|
except Exception:
|
|
session.rollback()
|
|
raise
|
|
finally:
|
|
session.close()
|
|
|
|
manager.session_scope = _session_scope
|
|
return manager
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _isolate_db_global(monkeypatch: pytest.MonkeyPatch, mock_db_manager) -> None:
|
|
"""Pin ``meshcore_hub.api.app._db_manager`` to the mock for every test.
|
|
|
|
Function-scoped autouse so tests that mutate the global (e.g. the lifespan
|
|
tests in ``test_cache.py``) cannot leak state to siblings. ``monkeypatch``
|
|
restores the prior value on exit.
|
|
"""
|
|
import meshcore_hub.api.app as app_module
|
|
|
|
monkeypatch.setattr(app_module, "_db_manager", mock_db_manager)
|
|
|
|
|
|
def _wire_overrides(app, api_db_engine, mock_mqtt, mock_db_manager) -> None:
|
|
"""Install the standard DB/MQTT dependency overrides on ``app``."""
|
|
Session = sessionmaker(bind=api_db_engine)
|
|
|
|
def override_get_db_manager(request=None):
|
|
return mock_db_manager
|
|
|
|
def override_get_db_session():
|
|
session = Session()
|
|
try:
|
|
yield session
|
|
finally:
|
|
session.close()
|
|
|
|
def override_get_mqtt_client(request=None):
|
|
return mock_mqtt
|
|
|
|
app.dependency_overrides[get_db_manager] = override_get_db_manager
|
|
app.dependency_overrides[get_db_session] = override_get_db_session
|
|
app.dependency_overrides[get_mqtt_client] = override_get_mqtt_client
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def app_no_auth(test_db_path, api_db_engine, mock_mqtt, mock_db_manager):
|
|
"""Module-scoped FastAPI app with no authentication.
|
|
|
|
Built once per test module; ``create_app`` is the second-most expensive
|
|
setup step (~0.3s), so sharing it across a module is a major win. The
|
|
``_isolate_db_global`` autouse fixture handles the global ``_db_manager``
|
|
so this fixture doesn't need a ``with patch(...)`` context.
|
|
"""
|
|
db_url = f"sqlite:///{test_db_path}"
|
|
app = create_app(
|
|
database_url=db_url,
|
|
read_key=None,
|
|
admin_key=None,
|
|
)
|
|
_wire_overrides(app, api_db_engine, mock_mqtt, mock_db_manager)
|
|
yield app
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def app_with_auth(test_db_path, api_db_engine, mock_mqtt, mock_db_manager):
|
|
"""Module-scoped FastAPI app with authentication enabled."""
|
|
db_url = f"sqlite:///{test_db_path}"
|
|
app = create_app(
|
|
database_url=db_url,
|
|
read_key="test-read-key",
|
|
admin_key="test-admin-key",
|
|
)
|
|
_wire_overrides(app, api_db_engine, mock_mqtt, mock_db_manager)
|
|
yield app
|
|
|
|
|
|
@pytest.fixture
|
|
def client_no_auth(app_no_auth) -> TestClient:
|
|
"""Test client with no authentication."""
|
|
return TestClient(app_no_auth, raise_server_exceptions=True)
|
|
|
|
|
|
@pytest.fixture
|
|
def client_with_auth(app_with_auth) -> TestClient:
|
|
"""Test client with authentication enabled."""
|
|
return TestClient(app_with_auth, raise_server_exceptions=True)
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_node(api_db_session):
|
|
"""Create a sample node in the database."""
|
|
node = Node(
|
|
public_key="abc123def456abc123def456abc123de",
|
|
name="Test Node",
|
|
adv_type="REPEATER",
|
|
first_seen=datetime.now(timezone.utc),
|
|
last_seen=datetime.now(timezone.utc),
|
|
)
|
|
api_db_session.add(node)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(node)
|
|
return node
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_node_tag(api_db_session, sample_node):
|
|
"""Create a sample node tag in the database."""
|
|
tag = NodeTag(
|
|
node_id=sample_node.id,
|
|
key="environment",
|
|
value="production",
|
|
)
|
|
api_db_session.add(tag)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(tag)
|
|
return tag
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_message(api_db_session):
|
|
"""Create a sample message in the database."""
|
|
message = Message(
|
|
message_type="direct",
|
|
pubkey_prefix="abc123",
|
|
text="Hello World",
|
|
received_at=datetime.now(timezone.utc),
|
|
)
|
|
api_db_session.add(message)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(message)
|
|
return message
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_advertisement(api_db_session):
|
|
"""Create a sample advertisement in the database."""
|
|
advert = Advertisement(
|
|
public_key="abc123def456abc123def456abc123de",
|
|
name="TestNode",
|
|
adv_type="REPEATER",
|
|
received_at=datetime.now(timezone.utc),
|
|
)
|
|
api_db_session.add(advert)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(advert)
|
|
return advert
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_telemetry(api_db_session):
|
|
"""Create a sample telemetry record in the database."""
|
|
telemetry = Telemetry(
|
|
node_public_key="abc123def456abc123def456abc123de",
|
|
parsed_data={
|
|
"battery_level": 85.5,
|
|
"temperature": 25.3,
|
|
},
|
|
received_at=datetime.now(timezone.utc),
|
|
)
|
|
api_db_session.add(telemetry)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(telemetry)
|
|
return telemetry
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_trace_path(api_db_session):
|
|
"""Create a sample trace path in the database."""
|
|
trace = TracePath(
|
|
initiator_tag=12345,
|
|
path_hashes=["abc123", "def456", "ghi789"],
|
|
hop_count=3,
|
|
received_at=datetime.now(timezone.utc),
|
|
)
|
|
api_db_session.add(trace)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(trace)
|
|
return trace
|
|
|
|
|
|
@pytest.fixture
|
|
def receiver_node(api_db_session):
|
|
"""Create a receiver node in the database."""
|
|
node = Node(
|
|
public_key="receiver123receiver123receiver12",
|
|
name="Receiver Node",
|
|
adv_type="REPEATER",
|
|
first_seen=datetime.now(timezone.utc),
|
|
last_seen=datetime.now(timezone.utc),
|
|
)
|
|
api_db_session.add(node)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(node)
|
|
return node
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_message_with_receiver(api_db_session, receiver_node):
|
|
"""Create a message with a receiver node."""
|
|
event_hash = "deadbeefdeadbeefdeadbeefdeadbe01"
|
|
message = Message(
|
|
message_type="channel",
|
|
channel_idx=17,
|
|
pubkey_prefix="xyz789",
|
|
text="Channel message with receiver",
|
|
received_at=datetime.now(timezone.utc),
|
|
observer_node_id=receiver_node.id,
|
|
event_hash=event_hash,
|
|
)
|
|
api_db_session.add(message)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(message)
|
|
|
|
api_db_session.add(
|
|
EventObserver(
|
|
event_type="message",
|
|
event_hash=event_hash,
|
|
observer_node_id=receiver_node.id,
|
|
observed_at=datetime.now(timezone.utc),
|
|
)
|
|
)
|
|
api_db_session.commit()
|
|
return message
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_advertisement_with_receiver(api_db_session, sample_node, receiver_node):
|
|
"""Create an advertisement with source and receiver nodes."""
|
|
event_hash = "deadbeefdeadbeefdeadbeefdeadbe02"
|
|
advert = Advertisement(
|
|
public_key=sample_node.public_key,
|
|
name="SourceNode",
|
|
adv_type="REPEATER",
|
|
received_at=datetime.now(timezone.utc),
|
|
node_id=sample_node.id,
|
|
observer_node_id=receiver_node.id,
|
|
event_hash=event_hash,
|
|
)
|
|
api_db_session.add(advert)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(advert)
|
|
|
|
api_db_session.add(
|
|
EventObserver(
|
|
event_type="advertisement",
|
|
event_hash=event_hash,
|
|
observer_node_id=receiver_node.id,
|
|
observed_at=datetime.now(timezone.utc),
|
|
)
|
|
)
|
|
api_db_session.commit()
|
|
return advert
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_telemetry_with_receiver(api_db_session, receiver_node):
|
|
"""Create a telemetry record with a receiver node."""
|
|
event_hash = "deadbeefdeadbeefdeadbeefdeadbe03"
|
|
telemetry = Telemetry(
|
|
node_public_key="xyz789xyz789xyz789xyz789xyz789xy",
|
|
parsed_data={"battery_level": 50.0},
|
|
received_at=datetime.now(timezone.utc),
|
|
observer_node_id=receiver_node.id,
|
|
event_hash=event_hash,
|
|
)
|
|
api_db_session.add(telemetry)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(telemetry)
|
|
|
|
api_db_session.add(
|
|
EventObserver(
|
|
event_type="telemetry",
|
|
event_hash=event_hash,
|
|
observer_node_id=receiver_node.id,
|
|
observed_at=datetime.now(timezone.utc),
|
|
)
|
|
)
|
|
api_db_session.commit()
|
|
return telemetry
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_trace_path_with_receiver(api_db_session, receiver_node):
|
|
"""Create a trace path with a receiver node."""
|
|
event_hash = "deadbeefdeadbeefdeadbeefdeadbe04"
|
|
trace = TracePath(
|
|
initiator_tag=99999,
|
|
path_hashes=["aaa111", "bbb222"],
|
|
hop_count=2,
|
|
received_at=datetime.now(timezone.utc),
|
|
observer_node_id=receiver_node.id,
|
|
event_hash=event_hash,
|
|
)
|
|
api_db_session.add(trace)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(trace)
|
|
|
|
api_db_session.add(
|
|
EventObserver(
|
|
event_type="trace",
|
|
event_hash=event_hash,
|
|
observer_node_id=receiver_node.id,
|
|
observed_at=datetime.now(timezone.utc),
|
|
)
|
|
)
|
|
api_db_session.commit()
|
|
return trace
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_node_with_name_tag(api_db_session):
|
|
"""Create a node with a name tag for search testing."""
|
|
node = Node(
|
|
public_key="searchable123searchable123searc",
|
|
name="Original Name",
|
|
adv_type="CLIENT",
|
|
first_seen=datetime.now(timezone.utc),
|
|
)
|
|
api_db_session.add(node)
|
|
api_db_session.commit()
|
|
|
|
tag = NodeTag(
|
|
node_id=node.id,
|
|
key="name",
|
|
value="Friendly Search Name",
|
|
)
|
|
api_db_session.add(tag)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(node)
|
|
return node
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_user_profile(api_db_session):
|
|
"""Create a sample user profile in the database."""
|
|
profile = UserProfile(
|
|
user_id="oidc-user-123",
|
|
name="Test User",
|
|
callsign="W1TEST",
|
|
)
|
|
api_db_session.add(profile)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(profile)
|
|
return profile
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_operator_profile(api_db_session):
|
|
"""Create a sample operator user profile for tag editing tests."""
|
|
profile = UserProfile(
|
|
user_id="operator-123",
|
|
name="Test Operator",
|
|
)
|
|
api_db_session.add(profile)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(profile)
|
|
return profile
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_operator_adoption(api_db_session, sample_operator_profile, sample_node):
|
|
"""Create an adoption record linking operator to the sample node."""
|
|
association = UserProfileNode(
|
|
user_profile_id=sample_operator_profile.id,
|
|
node_id=sample_node.id,
|
|
)
|
|
api_db_session.add(association)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(association)
|
|
return association
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_adopted_node(api_db_session, sample_user_profile, sample_node):
|
|
"""Create a sample adopted node association."""
|
|
association = UserProfileNode(
|
|
user_profile_id=sample_user_profile.id,
|
|
node_id=sample_node.id,
|
|
)
|
|
api_db_session.add(association)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(association)
|
|
return association
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_channel(api_db_session):
|
|
"""Create a sample community channel in the database."""
|
|
channel = Channel(
|
|
name="TestChannel",
|
|
key_hex="AABBCCDDEEFF00112233445566778899",
|
|
channel_hash=Channel.compute_channel_hash("AABBCCDDEEFF00112233445566778899"),
|
|
visibility="community",
|
|
enabled=True,
|
|
)
|
|
api_db_session.add(channel)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(channel)
|
|
return channel
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_member_channel(api_db_session):
|
|
"""Create a sample member-only channel in the database."""
|
|
key = "11223344556677889900AABBCCDDEEFF"
|
|
channel = Channel(
|
|
name="MemberChannel",
|
|
key_hex=key,
|
|
channel_hash=Channel.compute_channel_hash(key),
|
|
visibility="member",
|
|
enabled=True,
|
|
)
|
|
api_db_session.add(channel)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(channel)
|
|
return channel
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_admin_channel(api_db_session):
|
|
"""Create a sample admin-only channel in the database."""
|
|
key = "FFEEDDCCBBAA99887766554433221100"
|
|
channel = Channel(
|
|
name="AdminChannel",
|
|
key_hex=key,
|
|
channel_hash=Channel.compute_channel_hash(key),
|
|
visibility="admin",
|
|
enabled=True,
|
|
)
|
|
api_db_session.add(channel)
|
|
api_db_session.commit()
|
|
api_db_session.refresh(channel)
|
|
return channel
|