"""API test fixtures.""" import os import tempfile from contextlib import contextmanager from datetime import datetime, timezone from typing import Generator import pytest from fastapi.testclient import TestClient from sqlalchemy import create_engine, event as sa_event, text from sqlalchemy.engine.url import make_url from sqlalchemy.orm import sessionmaker from meshcore_hub.api.app import create_app from meshcore_hub.api.dependencies import ( get_db_session, get_mqtt_client, get_db_manager, ) from meshcore_hub.common.database import DatabaseManager, create_database_engine from meshcore_hub.common.models import ( Advertisement, Base, Channel, EventObserver, Message, Node, NodeTag, Telemetry, TracePath, UserProfile, UserProfileNode, ) @pytest.fixture(scope="session") def test_db_path(): """Session-scoped temporary database file path. One file per pytest session; the engine below builds schema on it once. """ fd, path = tempfile.mkstemp(suffix=".db") os.close(fd) yield path if os.path.exists(path): os.unlink(path) @pytest.fixture(scope="session") def db_backend() -> str: """Active test database backend (``sqlite`` or ``postgres``). Controlled by ``TEST_DATABASE_BACKEND`` env var (default: ``sqlite``). When ``postgres``, ``TEST_POSTGRES_URL`` must also be set. """ backend = os.environ.get("TEST_DATABASE_BACKEND", "sqlite").lower() if backend not in ("sqlite", "postgres"): raise ValueError( f"TEST_DATABASE_BACKEND must be 'sqlite' or 'postgres', got: {backend}" ) return backend @pytest.fixture(scope="session") def db_url(db_backend: str, test_db_path: str, request) -> Generator[str, None, None]: """Database URL for the active backend. For Postgres, each pytest-xdist worker gets its own database (e.g. ``test_gw0``) to avoid truncation races between parallel workers. """ if db_backend == "postgres": env_url = os.environ.get("TEST_POSTGRES_URL") if not env_url: pytest.skip( "TEST_DATABASE_BACKEND=postgres but TEST_POSTGRES_URL is not set; " "e.g. TEST_POSTGRES_URL=postgresql+psycopg2://postgres:postgres@localhost:55432/test" ) assert env_url is not None worker_id = "master" if hasattr(request.config, "workerinput"): worker_id = request.config.workerinput["workerid"] base_url = make_url(env_url) worker_db = f"{base_url.database}_{worker_id}" worker_url = base_url.set(database=worker_db).render_as_string( hide_password=False ) admin_url = base_url.set(database="postgres") admin_engine = create_engine( admin_url.render_as_string(hide_password=False), isolation_level="AUTOCOMMIT", ) try: with admin_engine.connect() as conn: exists = conn.execute( text("SELECT 1 FROM pg_database WHERE datname = :name"), {"name": worker_db}, ).scalar() if not exists: conn.execute(text(f'CREATE DATABASE "{worker_db}"')) finally: admin_engine.dispose() yield worker_url admin_engine = create_engine( admin_url.render_as_string(hide_password=False), isolation_level="AUTOCOMMIT", ) try: with admin_engine.connect() as conn: conn.execute( text( "SELECT pg_terminate_backend(pid) " "FROM pg_stat_activity " "WHERE datname = :name AND pid <> pg_backend_pid()" ), {"name": worker_db}, ) conn.execute(text(f'DROP DATABASE IF EXISTS "{worker_db}"')) finally: admin_engine.dispose() else: yield f"sqlite:///{test_db_path}" @pytest.fixture(scope="session") def api_db_engine(db_url: str, db_backend: str): """Session-scoped database engine. Schema is built once per pytest session. For Postgres, uses the production ``create_database_engine`` factory so the test exercises the same ``connect_args`` (including ``-ctimezone=UTC``). Each xdist worker has its own database (see ``db_url``), so no locking is needed. """ if db_backend == "postgres": engine = create_database_engine(db_url) Base.metadata.drop_all(engine) else: engine = create_engine( db_url, connect_args={"check_same_thread": False}, ) @sa_event.listens_for(engine, "connect") def set_sqlite_pragma( dbapi_connection: object, connection_record: object ) -> None: cursor = dbapi_connection.cursor() # type: ignore[attr-defined] cursor.execute("PRAGMA foreign_keys=ON") cursor.close() Base.metadata.create_all(engine) yield engine Base.metadata.drop_all(engine) engine.dispose() def _truncate_all(engine) -> None: """Delete rows from every table in child-first order (FK-safe).""" with engine.begin() as conn: for table in reversed(Base.metadata.sorted_tables): conn.execute(table.delete()) @pytest.fixture def api_db_session(api_db_engine): """Per-test session bound to the shared session-scoped engine. Rows are truncated at teardown so each test starts with empty tables without paying schema-build cost. Tests must ``commit()`` their seed data before invoking the test client (the sample_* fixtures already do). """ Session = sessionmaker(bind=api_db_engine) session = Session() yield session session.close() _truncate_all(api_db_engine) @pytest.fixture(scope="session") def mock_mqtt(): """Session-scoped mock MQTT client (no per-test state).""" from unittest.mock import MagicMock mock = MagicMock() mock.connect.return_value = None mock.start_background.return_value = None mock.stop.return_value = None mock.disconnect.return_value = None mock.publish_command.return_value = None return mock @pytest.fixture(scope="session") def mock_db_manager(api_db_engine): """Session-scoped mock database manager backed by the shared engine.""" from unittest.mock import MagicMock manager = MagicMock(spec=DatabaseManager) Session = sessionmaker(bind=api_db_engine) manager.get_session = lambda: Session() @contextmanager def _session_scope(): session = Session() try: yield session session.commit() except Exception: session.rollback() raise finally: session.close() manager.session_scope = _session_scope return manager @pytest.fixture(autouse=True) def _isolate_db_global(monkeypatch: pytest.MonkeyPatch, mock_db_manager) -> None: """Pin ``meshcore_hub.api.app._db_manager`` to the mock for every test. Function-scoped autouse so tests that mutate the global (e.g. the lifespan tests in ``test_cache.py``) cannot leak state to siblings. ``monkeypatch`` restores the prior value on exit. """ import meshcore_hub.api.app as app_module monkeypatch.setattr(app_module, "_db_manager", mock_db_manager) def _wire_overrides(app, api_db_engine, mock_mqtt, mock_db_manager) -> None: """Install the standard DB/MQTT dependency overrides on ``app``.""" Session = sessionmaker(bind=api_db_engine) def override_get_db_manager(request=None): return mock_db_manager def override_get_db_session(): session = Session() try: yield session finally: session.close() def override_get_mqtt_client(request=None): return mock_mqtt app.dependency_overrides[get_db_manager] = override_get_db_manager app.dependency_overrides[get_db_session] = override_get_db_session app.dependency_overrides[get_mqtt_client] = override_get_mqtt_client @pytest.fixture(scope="module") def app_no_auth(db_url, api_db_engine, mock_mqtt, mock_db_manager): """Module-scoped FastAPI app with no authentication. Built once per test module; ``create_app`` is the second-most expensive setup step (~0.3s), so sharing it across a module is a major win. The ``_isolate_db_global`` autouse fixture handles the global ``_db_manager`` so this fixture doesn't need a ``with patch(...)`` context. """ app = create_app( database_url=db_url, read_key=None, admin_key=None, ) _wire_overrides(app, api_db_engine, mock_mqtt, mock_db_manager) yield app @pytest.fixture(scope="module") def app_with_auth(db_url, api_db_engine, mock_mqtt, mock_db_manager): """Module-scoped FastAPI app with authentication enabled.""" app = create_app( database_url=db_url, read_key="test-read-key", admin_key="test-admin-key", ) _wire_overrides(app, api_db_engine, mock_mqtt, mock_db_manager) yield app @pytest.fixture def client_no_auth(app_no_auth) -> TestClient: """Test client with no authentication.""" return TestClient(app_no_auth, raise_server_exceptions=True) @pytest.fixture def client_with_auth(app_with_auth) -> TestClient: """Test client with authentication enabled.""" return TestClient(app_with_auth, raise_server_exceptions=True) @pytest.fixture def sample_node(api_db_session): """Create a sample node in the database.""" node = Node( public_key="abc123def456abc123def456abc123de", name="Test Node", adv_type="REPEATER", first_seen=datetime.now(timezone.utc), last_seen=datetime.now(timezone.utc), ) api_db_session.add(node) api_db_session.commit() api_db_session.refresh(node) return node @pytest.fixture def sample_node_tag(api_db_session, sample_node): """Create a sample node tag in the database.""" tag = NodeTag( node_id=sample_node.id, key="environment", value="production", ) api_db_session.add(tag) api_db_session.commit() api_db_session.refresh(tag) return tag @pytest.fixture def sample_message(api_db_session): """Create a sample message in the database.""" message = Message( message_type="direct", pubkey_prefix="abc123", text="Hello World", received_at=datetime.now(timezone.utc), ) api_db_session.add(message) api_db_session.commit() api_db_session.refresh(message) return message @pytest.fixture def sample_advertisement(api_db_session): """Create a sample advertisement in the database.""" advert = Advertisement( public_key="abc123def456abc123def456abc123de", name="TestNode", adv_type="REPEATER", received_at=datetime.now(timezone.utc), ) api_db_session.add(advert) api_db_session.commit() api_db_session.refresh(advert) return advert @pytest.fixture def sample_telemetry(api_db_session): """Create a sample telemetry record in the database.""" telemetry = Telemetry( node_public_key="abc123def456abc123def456abc123de", parsed_data={ "battery_level": 85.5, "temperature": 25.3, }, received_at=datetime.now(timezone.utc), ) api_db_session.add(telemetry) api_db_session.commit() api_db_session.refresh(telemetry) return telemetry @pytest.fixture def sample_trace_path(api_db_session): """Create a sample trace path in the database.""" trace = TracePath( initiator_tag=12345, path_hashes=["abc123", "def456", "ghi789"], hop_count=3, received_at=datetime.now(timezone.utc), ) api_db_session.add(trace) api_db_session.commit() api_db_session.refresh(trace) return trace @pytest.fixture def receiver_node(api_db_session): """Create a receiver node in the database.""" node = Node( public_key="receiver123receiver123receiver12", name="Receiver Node", adv_type="REPEATER", first_seen=datetime.now(timezone.utc), last_seen=datetime.now(timezone.utc), ) api_db_session.add(node) api_db_session.commit() api_db_session.refresh(node) return node @pytest.fixture def sample_message_with_receiver(api_db_session, receiver_node): """Create a message with a receiver node.""" event_hash = "deadbeefdeadbeefdeadbeefdeadbe01" message = Message( message_type="channel", channel_idx=17, pubkey_prefix="xyz789", text="Channel message with receiver", received_at=datetime.now(timezone.utc), observer_node_id=receiver_node.id, event_hash=event_hash, ) api_db_session.add(message) api_db_session.commit() api_db_session.refresh(message) api_db_session.add( EventObserver( event_type="message", event_hash=event_hash, observer_node_id=receiver_node.id, observed_at=datetime.now(timezone.utc), ) ) api_db_session.commit() return message @pytest.fixture def sample_advertisement_with_receiver(api_db_session, sample_node, receiver_node): """Create an advertisement with source and receiver nodes.""" event_hash = "deadbeefdeadbeefdeadbeefdeadbe02" advert = Advertisement( public_key=sample_node.public_key, name="SourceNode", adv_type="REPEATER", received_at=datetime.now(timezone.utc), node_id=sample_node.id, observer_node_id=receiver_node.id, event_hash=event_hash, ) api_db_session.add(advert) api_db_session.commit() api_db_session.refresh(advert) api_db_session.add( EventObserver( event_type="advertisement", event_hash=event_hash, observer_node_id=receiver_node.id, observed_at=datetime.now(timezone.utc), ) ) api_db_session.commit() return advert @pytest.fixture def sample_telemetry_with_receiver(api_db_session, receiver_node): """Create a telemetry record with a receiver node.""" event_hash = "deadbeefdeadbeefdeadbeefdeadbe03" telemetry = Telemetry( node_public_key="xyz789xyz789xyz789xyz789xyz789xy", parsed_data={"battery_level": 50.0}, received_at=datetime.now(timezone.utc), observer_node_id=receiver_node.id, event_hash=event_hash, ) api_db_session.add(telemetry) api_db_session.commit() api_db_session.refresh(telemetry) api_db_session.add( EventObserver( event_type="telemetry", event_hash=event_hash, observer_node_id=receiver_node.id, observed_at=datetime.now(timezone.utc), ) ) api_db_session.commit() return telemetry @pytest.fixture def sample_trace_path_with_receiver(api_db_session, receiver_node): """Create a trace path with a receiver node.""" event_hash = "deadbeefdeadbeefdeadbeefdeadbe04" trace = TracePath( initiator_tag=99999, path_hashes=["aaa111", "bbb222"], hop_count=2, received_at=datetime.now(timezone.utc), observer_node_id=receiver_node.id, event_hash=event_hash, ) api_db_session.add(trace) api_db_session.commit() api_db_session.refresh(trace) api_db_session.add( EventObserver( event_type="trace", event_hash=event_hash, observer_node_id=receiver_node.id, observed_at=datetime.now(timezone.utc), ) ) api_db_session.commit() return trace @pytest.fixture def sample_node_with_name_tag(api_db_session): """Create a node with a name tag for search testing.""" node = Node( public_key="searchable123searchable123searc", name="Original Name", adv_type="CLIENT", first_seen=datetime.now(timezone.utc), ) api_db_session.add(node) api_db_session.commit() tag = NodeTag( node_id=node.id, key="name", value="Friendly Search Name", ) api_db_session.add(tag) api_db_session.commit() api_db_session.refresh(node) return node @pytest.fixture def sample_user_profile(api_db_session): """Create a sample user profile in the database.""" profile = UserProfile( user_id="oidc-user-123", name="Test User", callsign="W1TEST", ) api_db_session.add(profile) api_db_session.commit() api_db_session.refresh(profile) return profile @pytest.fixture def sample_operator_profile(api_db_session): """Create a sample operator user profile for tag editing tests.""" profile = UserProfile( user_id="operator-123", name="Test Operator", ) api_db_session.add(profile) api_db_session.commit() api_db_session.refresh(profile) return profile @pytest.fixture def sample_operator_adoption(api_db_session, sample_operator_profile, sample_node): """Create an adoption record linking operator to the sample node.""" association = UserProfileNode( user_profile_id=sample_operator_profile.id, node_id=sample_node.id, ) api_db_session.add(association) api_db_session.commit() api_db_session.refresh(association) return association @pytest.fixture def sample_adopted_node(api_db_session, sample_user_profile, sample_node): """Create a sample adopted node association.""" association = UserProfileNode( user_profile_id=sample_user_profile.id, node_id=sample_node.id, ) api_db_session.add(association) api_db_session.commit() api_db_session.refresh(association) return association @pytest.fixture def sample_channel(api_db_session): """Create a sample community channel in the database.""" channel = Channel( name="TestChannel", key_hex="AABBCCDDEEFF00112233445566778899", channel_hash=Channel.compute_channel_hash("AABBCCDDEEFF00112233445566778899"), visibility="community", enabled=True, ) api_db_session.add(channel) api_db_session.commit() api_db_session.refresh(channel) return channel @pytest.fixture def sample_member_channel(api_db_session): """Create a sample member-only channel in the database.""" key = "11223344556677889900AABBCCDDEEFF" channel = Channel( name="MemberChannel", key_hex=key, channel_hash=Channel.compute_channel_hash(key), visibility="member", enabled=True, ) api_db_session.add(channel) api_db_session.commit() api_db_session.refresh(channel) return channel @pytest.fixture def sample_admin_channel(api_db_session): """Create a sample admin-only channel in the database.""" key = "FFEEDDCCBBAA99887766554433221100" channel = Channel( name="AdminChannel", key_hex=key, channel_hash=Channel.compute_channel_hash(key), visibility="admin", enabled=True, ) api_db_session.add(channel) api_db_session.commit() api_db_session.refresh(channel) return channel