Files
meshcore-hub/tests/test_api/conftest.py
T
Louis King c48db03afb feat(spam): score messages at ingest and hide likely spam
Add an optional, off-by-default spam-detection feature that scores each
message's spam likelihood at ingest, stores the score on the row, and lets
the display layer hide likely-spam by default behind a "show potential spam"
toggle. Nothing is ever dropped at ingest, so the threshold can be retuned
without reprocessing.

Scoring (collector/spam.py): windowed COUNT(*) over new
(path_prefix, received_at) and (sender_normalized, received_at) indexes —
joint path+sender signal plus a sender-name signal (trailing-digit suffix
stripped so bob1/bob2 collapse to bob). When the path is short/zero-hop or
absent, the name signal stands alone at full weight so local spam is still
flaggable. A background sweep re-scores recent rows with hindsight to catch
the leading edge of bursts. The collector logs each score (WARNING at/above
the threshold).

Display: the messages API gains include_spam and a master-switch-aware
hide-filter; the SPA shows the toggle + a badge only when the feature is on.

Config: FEATURE_SPAM_DETECTION is the single operator switch, bridged in
Compose to the backend SPAM_DETECTION_ENABLED for collector + api (mirrors
the FEATURE_PACKETS / RAW_PACKET_CAPTURE_ENABLED pattern). Both default off.

Works on SQLite and Postgres: DB-agnostic queries, an Alembic batch migration
for the three new columns + two indexes, and backend-aware collector test
fixtures (lifted db_backend/db_url into the shared conftest).

Also: move the meshcore-hub image pull_policy out of the base compose file.
It lived in docker-compose.yml as pull_policy: daily and made `make up` pull
the published image over a freshly built local one. Base is now policy-neutral
(default missing); dev sets pull_policy: build on the hub services so it only
ever uses local builds. Prod refreshes images via a manual `docker compose
... pull`.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 00:11:39 +01:00

573 lines
16 KiB
Python

"""API test fixtures.
The ``db_backend`` / ``db_url`` / ``test_db_path`` fixtures that drive the
SQLite-vs-Postgres switch live in the shared ``tests/conftest.py`` so the
collector suite can reuse the same backend; they are inherited here.
"""
from contextlib import contextmanager
from datetime import datetime, timezone
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine, event as sa_event
from sqlalchemy.orm import sessionmaker
from meshcore_hub.api.app import create_app
from meshcore_hub.api.dependencies import (
get_db_session,
get_mqtt_client,
get_db_manager,
)
from meshcore_hub.common.database import DatabaseManager, create_database_engine
from meshcore_hub.common.models import (
Advertisement,
Base,
Channel,
EventObserver,
Message,
Node,
NodeTag,
Telemetry,
TracePath,
UserProfile,
UserProfileNode,
)
@pytest.fixture(scope="session")
def api_db_engine(db_url: str, db_backend: str):
"""Session-scoped database engine. Schema is built once per pytest session.
For Postgres, uses the production ``create_database_engine`` factory so the
test exercises the same ``connect_args`` (including ``-ctimezone=UTC``).
Each xdist worker has its own database (see ``db_url``), so no locking
is needed.
"""
if db_backend == "postgres":
engine = create_database_engine(db_url)
Base.metadata.drop_all(engine)
else:
engine = create_engine(
db_url,
connect_args={"check_same_thread": False},
)
@sa_event.listens_for(engine, "connect")
def set_sqlite_pragma(
dbapi_connection: object, connection_record: object
) -> None:
cursor = dbapi_connection.cursor() # type: ignore[attr-defined]
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
Base.metadata.create_all(engine)
yield engine
Base.metadata.drop_all(engine)
engine.dispose()
def _truncate_all(engine) -> None:
"""Delete rows from every table in child-first order (FK-safe)."""
with engine.begin() as conn:
for table in reversed(Base.metadata.sorted_tables):
conn.execute(table.delete())
@pytest.fixture
def api_db_session(api_db_engine):
"""Per-test session bound to the shared session-scoped engine.
Rows are truncated at teardown so each test starts with empty tables
without paying schema-build cost. Tests must ``commit()`` their seed
data before invoking the test client (the sample_* fixtures already do).
"""
Session = sessionmaker(bind=api_db_engine)
session = Session()
yield session
session.close()
_truncate_all(api_db_engine)
@pytest.fixture(scope="session")
def mock_mqtt():
"""Session-scoped mock MQTT client (no per-test state)."""
from unittest.mock import MagicMock
mock = MagicMock()
mock.connect.return_value = None
mock.start_background.return_value = None
mock.stop.return_value = None
mock.disconnect.return_value = None
mock.publish_command.return_value = None
return mock
@pytest.fixture(scope="session")
def mock_db_manager(api_db_engine):
"""Session-scoped mock database manager backed by the shared engine."""
from unittest.mock import MagicMock
manager = MagicMock(spec=DatabaseManager)
Session = sessionmaker(bind=api_db_engine)
manager.get_session = lambda: Session()
@contextmanager
def _session_scope():
session = Session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
manager.session_scope = _session_scope
return manager
@pytest.fixture(autouse=True)
def _isolate_db_global(monkeypatch: pytest.MonkeyPatch, mock_db_manager) -> None:
"""Pin ``meshcore_hub.api.app._db_manager`` to the mock for every test.
Function-scoped autouse so tests that mutate the global (e.g. the lifespan
tests in ``test_cache.py``) cannot leak state to siblings. ``monkeypatch``
restores the prior value on exit.
"""
import meshcore_hub.api.app as app_module
monkeypatch.setattr(app_module, "_db_manager", mock_db_manager)
def _wire_overrides(app, api_db_engine, mock_mqtt, mock_db_manager) -> None:
"""Install the standard DB/MQTT dependency overrides on ``app``."""
Session = sessionmaker(bind=api_db_engine)
def override_get_db_manager(request=None):
return mock_db_manager
def override_get_db_session():
session = Session()
try:
yield session
finally:
session.close()
def override_get_mqtt_client(request=None):
return mock_mqtt
app.dependency_overrides[get_db_manager] = override_get_db_manager
app.dependency_overrides[get_db_session] = override_get_db_session
app.dependency_overrides[get_mqtt_client] = override_get_mqtt_client
@pytest.fixture(scope="module")
def app_no_auth(db_url, api_db_engine, mock_mqtt, mock_db_manager):
"""Module-scoped FastAPI app with no authentication.
Built once per test module; ``create_app`` is the second-most expensive
setup step (~0.3s), so sharing it across a module is a major win. The
``_isolate_db_global`` autouse fixture handles the global ``_db_manager``
so this fixture doesn't need a ``with patch(...)`` context.
"""
app = create_app(
database_url=db_url,
read_key=None,
admin_key=None,
)
_wire_overrides(app, api_db_engine, mock_mqtt, mock_db_manager)
yield app
@pytest.fixture(scope="module")
def app_with_auth(db_url, api_db_engine, mock_mqtt, mock_db_manager):
"""Module-scoped FastAPI app with authentication enabled."""
app = create_app(
database_url=db_url,
read_key="test-read-key",
admin_key="test-admin-key",
)
_wire_overrides(app, api_db_engine, mock_mqtt, mock_db_manager)
yield app
@pytest.fixture(scope="module")
def app_spam(db_url, api_db_engine, mock_mqtt, mock_db_manager):
"""Module-scoped app with spam detection enabled (no auth)."""
app = create_app(
database_url=db_url,
read_key=None,
admin_key=None,
spam_detection_enabled=True,
spam_score_threshold=0.6,
)
_wire_overrides(app, api_db_engine, mock_mqtt, mock_db_manager)
yield app
@pytest.fixture
def client_spam(app_spam) -> TestClient:
"""Test client with spam detection enabled."""
return TestClient(app_spam, raise_server_exceptions=True)
@pytest.fixture
def client_no_auth(app_no_auth) -> TestClient:
"""Test client with no authentication."""
return TestClient(app_no_auth, raise_server_exceptions=True)
@pytest.fixture
def client_with_auth(app_with_auth) -> TestClient:
"""Test client with authentication enabled."""
return TestClient(app_with_auth, raise_server_exceptions=True)
@pytest.fixture
def sample_node(api_db_session):
"""Create a sample node in the database."""
node = Node(
public_key="abc123def456abc123def456abc123de",
name="Test Node",
adv_type="REPEATER",
first_seen=datetime.now(timezone.utc),
last_seen=datetime.now(timezone.utc),
)
api_db_session.add(node)
api_db_session.commit()
api_db_session.refresh(node)
return node
@pytest.fixture
def sample_node_tag(api_db_session, sample_node):
"""Create a sample node tag in the database."""
tag = NodeTag(
node_id=sample_node.id,
key="environment",
value="production",
)
api_db_session.add(tag)
api_db_session.commit()
api_db_session.refresh(tag)
return tag
@pytest.fixture
def sample_message(api_db_session):
"""Create a sample message in the database."""
message = Message(
message_type="direct",
pubkey_prefix="abc123",
text="Hello World",
received_at=datetime.now(timezone.utc),
)
api_db_session.add(message)
api_db_session.commit()
api_db_session.refresh(message)
return message
@pytest.fixture
def sample_advertisement(api_db_session):
"""Create a sample advertisement in the database."""
advert = Advertisement(
public_key="abc123def456abc123def456abc123de",
name="TestNode",
adv_type="REPEATER",
received_at=datetime.now(timezone.utc),
)
api_db_session.add(advert)
api_db_session.commit()
api_db_session.refresh(advert)
return advert
@pytest.fixture
def sample_telemetry(api_db_session):
"""Create a sample telemetry record in the database."""
telemetry = Telemetry(
node_public_key="abc123def456abc123def456abc123de",
parsed_data={
"battery_level": 85.5,
"temperature": 25.3,
},
received_at=datetime.now(timezone.utc),
)
api_db_session.add(telemetry)
api_db_session.commit()
api_db_session.refresh(telemetry)
return telemetry
@pytest.fixture
def sample_trace_path(api_db_session):
"""Create a sample trace path in the database."""
trace = TracePath(
initiator_tag=12345,
path_hashes=["abc123", "def456", "ghi789"],
hop_count=3,
received_at=datetime.now(timezone.utc),
)
api_db_session.add(trace)
api_db_session.commit()
api_db_session.refresh(trace)
return trace
@pytest.fixture
def receiver_node(api_db_session):
"""Create a receiver node in the database."""
node = Node(
public_key="receiver123receiver123receiver12",
name="Receiver Node",
adv_type="REPEATER",
first_seen=datetime.now(timezone.utc),
last_seen=datetime.now(timezone.utc),
)
api_db_session.add(node)
api_db_session.commit()
api_db_session.refresh(node)
return node
@pytest.fixture
def sample_message_with_receiver(api_db_session, receiver_node):
"""Create a message with a receiver node."""
event_hash = "deadbeefdeadbeefdeadbeefdeadbe01"
message = Message(
message_type="channel",
channel_idx=17,
pubkey_prefix="xyz789",
text="Channel message with receiver",
received_at=datetime.now(timezone.utc),
observer_node_id=receiver_node.id,
event_hash=event_hash,
)
api_db_session.add(message)
api_db_session.commit()
api_db_session.refresh(message)
api_db_session.add(
EventObserver(
event_type="message",
event_hash=event_hash,
observer_node_id=receiver_node.id,
observed_at=datetime.now(timezone.utc),
)
)
api_db_session.commit()
return message
@pytest.fixture
def sample_advertisement_with_receiver(api_db_session, sample_node, receiver_node):
"""Create an advertisement with source and receiver nodes."""
event_hash = "deadbeefdeadbeefdeadbeefdeadbe02"
advert = Advertisement(
public_key=sample_node.public_key,
name="SourceNode",
adv_type="REPEATER",
received_at=datetime.now(timezone.utc),
node_id=sample_node.id,
observer_node_id=receiver_node.id,
event_hash=event_hash,
)
api_db_session.add(advert)
api_db_session.commit()
api_db_session.refresh(advert)
api_db_session.add(
EventObserver(
event_type="advertisement",
event_hash=event_hash,
observer_node_id=receiver_node.id,
observed_at=datetime.now(timezone.utc),
)
)
api_db_session.commit()
return advert
@pytest.fixture
def sample_telemetry_with_receiver(api_db_session, receiver_node):
"""Create a telemetry record with a receiver node."""
event_hash = "deadbeefdeadbeefdeadbeefdeadbe03"
telemetry = Telemetry(
node_public_key="xyz789xyz789xyz789xyz789xyz789xy",
parsed_data={"battery_level": 50.0},
received_at=datetime.now(timezone.utc),
observer_node_id=receiver_node.id,
event_hash=event_hash,
)
api_db_session.add(telemetry)
api_db_session.commit()
api_db_session.refresh(telemetry)
api_db_session.add(
EventObserver(
event_type="telemetry",
event_hash=event_hash,
observer_node_id=receiver_node.id,
observed_at=datetime.now(timezone.utc),
)
)
api_db_session.commit()
return telemetry
@pytest.fixture
def sample_trace_path_with_receiver(api_db_session, receiver_node):
"""Create a trace path with a receiver node."""
event_hash = "deadbeefdeadbeefdeadbeefdeadbe04"
trace = TracePath(
initiator_tag=99999,
path_hashes=["aaa111", "bbb222"],
hop_count=2,
received_at=datetime.now(timezone.utc),
observer_node_id=receiver_node.id,
event_hash=event_hash,
)
api_db_session.add(trace)
api_db_session.commit()
api_db_session.refresh(trace)
api_db_session.add(
EventObserver(
event_type="trace",
event_hash=event_hash,
observer_node_id=receiver_node.id,
observed_at=datetime.now(timezone.utc),
)
)
api_db_session.commit()
return trace
@pytest.fixture
def sample_node_with_name_tag(api_db_session):
"""Create a node with a name tag for search testing."""
node = Node(
public_key="searchable123searchable123searc",
name="Original Name",
adv_type="CLIENT",
first_seen=datetime.now(timezone.utc),
)
api_db_session.add(node)
api_db_session.commit()
tag = NodeTag(
node_id=node.id,
key="name",
value="Friendly Search Name",
)
api_db_session.add(tag)
api_db_session.commit()
api_db_session.refresh(node)
return node
@pytest.fixture
def sample_user_profile(api_db_session):
"""Create a sample user profile in the database."""
profile = UserProfile(
user_id="oidc-user-123",
name="Test User",
callsign="W1TEST",
)
api_db_session.add(profile)
api_db_session.commit()
api_db_session.refresh(profile)
return profile
@pytest.fixture
def sample_operator_profile(api_db_session):
"""Create a sample operator user profile for tag editing tests."""
profile = UserProfile(
user_id="operator-123",
name="Test Operator",
)
api_db_session.add(profile)
api_db_session.commit()
api_db_session.refresh(profile)
return profile
@pytest.fixture
def sample_operator_adoption(api_db_session, sample_operator_profile, sample_node):
"""Create an adoption record linking operator to the sample node."""
association = UserProfileNode(
user_profile_id=sample_operator_profile.id,
node_id=sample_node.id,
)
api_db_session.add(association)
api_db_session.commit()
api_db_session.refresh(association)
return association
@pytest.fixture
def sample_adopted_node(api_db_session, sample_user_profile, sample_node):
"""Create a sample adopted node association."""
association = UserProfileNode(
user_profile_id=sample_user_profile.id,
node_id=sample_node.id,
)
api_db_session.add(association)
api_db_session.commit()
api_db_session.refresh(association)
return association
@pytest.fixture
def sample_channel(api_db_session):
"""Create a sample community channel in the database."""
channel = Channel(
name="TestChannel",
key_hex="AABBCCDDEEFF00112233445566778899",
channel_hash=Channel.compute_channel_hash("AABBCCDDEEFF00112233445566778899"),
visibility="community",
enabled=True,
)
api_db_session.add(channel)
api_db_session.commit()
api_db_session.refresh(channel)
return channel
@pytest.fixture
def sample_member_channel(api_db_session):
"""Create a sample member-only channel in the database."""
key = "11223344556677889900AABBCCDDEEFF"
channel = Channel(
name="MemberChannel",
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
visibility="member",
enabled=True,
)
api_db_session.add(channel)
api_db_session.commit()
api_db_session.refresh(channel)
return channel
@pytest.fixture
def sample_admin_channel(api_db_session):
"""Create a sample admin-only channel in the database."""
key = "FFEEDDCCBBAA99887766554433221100"
channel = Channel(
name="AdminChannel",
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
visibility="admin",
enabled=True,
)
api_db_session.add(channel)
api_db_session.commit()
api_db_session.refresh(channel)
return channel