Files
meshcore-hub/tests/test_common/test_models.py
T
Louis King 76f3dfa7eb feat: raw packet capture, browse, and classification (v0.13.0)
Add a first-class Raw Packets feature that captures every inbound MeshCore
packet from the LetsMesh `packets` feed exactly as received, independent of
how the collector later classifies it.

Capture & storage
- New `RawPacket` model + migration (raw_packets table) with single and
  composite indexes for the dominant filter-then-sort-by-newest queries.
- Collector-side `RAW_PACKET_CAPTURE_ENABLED` flag (default off); capture hook
  reuses the decoder's per-hex cache (no second decode), one row per observer
  reception, never blocks event dispatch.
- Separate `RAW_PACKET_RETENTION_DAYS` (falls back to DATA_RETENTION_DAYS);
  cleanup runs regardless of capture so disabling drains the table. Raw-packet
  observers retained in the is_observer recompute union.

API
- `GET /packets` and `/packets/{id}` with rich filtering, role-aware Redis
  cache key, and channel-visibility redaction (restricted-channel packets are
  returned metadata-only, not hidden, so pagination counts stay stable).

Web
- `FEATURE_PACKETS` flag (default off). Responsive Packets page (table desktop,
  cards mobile) plus a Packet Detail page (breadcrumb nav, raw hex + decoded).
- Nav entry after Messages on all three surfaces; home.js reordered so Map
  precedes Members; new packets icon + colour.

Finer-grained classification
- Replace the single `letsmesh_packet` catch-all with per-payload-type event
  types (req, ack, encrypted_direct, encrypted_channel, grp_data, multipart,
  control, raw_custom, ...); letsmesh_packet kept only as the unresolved-type
  safety net.

Link from structured tables
- Add `packet_hash` to advertisements and messages (populated at ingest);
  exact `packet_hash` filter on /packets; cube-icon link on the Adverts and
  Messages lists -> /packets?packet_hash=<hash>, shown only when the feature is
  on and the row has a stored hash.

Docs/config: .env.example, docker-compose (collector + web), AGENTS.md,
SCHEMAS.md, docs/letsmesh.md, docs/upgrading.md (## v0.13.0), en/nl i18n, and a
plan/tasks doc under docs/plans/.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-12 22:40:31 +01:00

342 lines
10 KiB
Python

"""Tests for database models."""
import pytest
from sqlalchemy import create_engine, select
from sqlalchemy.orm import sessionmaker
from meshcore_hub.common.models import (
Base,
Node,
NodeTag,
Message,
Advertisement,
TracePath,
Telemetry,
EventLog,
EventObserver,
RawPacket,
add_event_observer,
)
@pytest.fixture
def db_session():
"""Create an in-memory SQLite database session."""
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
Base.metadata.drop_all(engine)
engine.dispose()
class TestNodeModel:
"""Tests for Node model."""
def test_create_node(self, db_session) -> None:
"""Test creating a node."""
node = Node(
public_key="a" * 64,
name="Test Node",
adv_type="chat",
flags=218,
)
db_session.add(node)
db_session.commit()
assert node.id is not None
assert node.public_key == "a" * 64
assert node.name == "Test Node"
assert node.adv_type == "chat"
assert node.flags == 218
def test_node_tags_relationship(self, db_session) -> None:
"""Test node-tag relationship."""
node = Node(public_key="b" * 64, name="Tagged Node")
tag = NodeTag(key="altitude", value="150", value_type="number")
node.tags.append(tag)
db_session.add(node)
db_session.commit()
assert len(node.tags) == 1
assert node.tags[0].key == "altitude"
class TestRawPacketModel:
"""Tests for RawPacket model."""
def test_create_raw_packet(self, db_session) -> None:
"""Test creating a raw packet with defaults."""
packet = RawPacket(
packet_hash="abc123",
raw_hex="0011223344",
packet_type=5,
payload_type=4,
event_type="channel_msg_recv",
channel_idx=42,
source_pubkey_prefix="01AB2186C4D5",
route_type="flood",
path_len=2,
snr=12.5,
decoded={"payloadType": 4},
)
db_session.add(packet)
db_session.commit()
assert packet.id is not None
assert packet.received_at is not None
assert packet.created_at is not None
assert packet.updated_at is not None
assert packet.event_type == "channel_msg_recv"
assert packet.channel_idx == 42
assert packet.decoded == {"payloadType": 4}
def test_raw_packet_nullable_columns(self, db_session) -> None:
"""Test that the optional columns accept None."""
packet = RawPacket()
db_session.add(packet)
db_session.commit()
assert packet.id is not None
assert packet.observer_node_id is None
assert packet.packet_hash is None
assert packet.raw_hex is None
assert packet.channel_idx is None
assert packet.source_pubkey_prefix is None
assert packet.decoded is None
def test_raw_packet_indexes(self) -> None:
"""Test that the expected indexes are declared on the table."""
index_names = {idx.name for idx in RawPacket.__table__.indexes} # type: ignore[attr-defined]
for expected in (
"ix_raw_packets_received_at",
"ix_raw_packets_event_type",
"ix_raw_packets_packet_hash",
"ix_raw_packets_channel_idx",
"ix_raw_packets_source_pubkey_prefix",
"ix_raw_packets_observer_node_id",
"ix_raw_packets_event_type_received_at",
"ix_raw_packets_channel_idx_received_at",
"ix_raw_packets_source_pubkey_prefix_received_at",
):
assert expected in index_names
class TestMessageModel:
"""Tests for Message model."""
def test_create_contact_message(self, db_session) -> None:
"""Test creating a contact message."""
message = Message(
message_type="contact",
pubkey_prefix="01ab2186c4d5",
text="Hello World!",
path_len=3,
snr=15.5,
)
db_session.add(message)
db_session.commit()
assert message.id is not None
assert message.message_type == "contact"
assert message.text == "Hello World!"
def test_create_channel_message(self, db_session) -> None:
"""Test creating a channel message."""
message = Message(
message_type="channel",
channel_idx=4,
text="Channel broadcast",
path_len=10,
)
db_session.add(message)
db_session.commit()
assert message.channel_idx == 4
assert message.message_type == "channel"
class TestAdvertisementModel:
"""Tests for Advertisement model."""
def test_create_advertisement(self, db_session) -> None:
"""Test creating an advertisement."""
ad = Advertisement(
public_key="c" * 64,
name="Repeater-01",
adv_type="repeater",
flags=128,
)
db_session.add(ad)
db_session.commit()
assert ad.id is not None
assert ad.public_key == "c" * 64
assert ad.adv_type == "repeater"
class TestTracePathModel:
"""Tests for TracePath model."""
def test_create_trace_path(self, db_session) -> None:
"""Test creating a trace path."""
trace = TracePath(
initiator_tag=123456789,
path_len=3,
path_hashes=["4a", "b3", "fa"],
snr_values=[25.3, 18.7, 12.4],
hop_count=3,
)
db_session.add(trace)
db_session.commit()
assert trace.id is not None
assert trace.initiator_tag == 123456789
assert trace.path_hashes == ["4a", "b3", "fa"]
def test_multibyte_path_hashes_round_trip(self, db_session) -> None:
"""Test that multibyte (4-char) path hashes round-trip correctly."""
path_hashes = ["4a2b", "b3fa", "02cd"]
trace = TracePath(
initiator_tag=987654321,
path_len=3,
path_hashes=path_hashes,
snr_values=[20.0, 15.0, 10.0],
hop_count=3,
)
db_session.add(trace)
db_session.commit()
# Expire cached attributes to force reload from database
db_session.expire(trace)
assert trace.path_hashes == ["4a2b", "b3fa", "02cd"]
assert len(trace.path_hashes) == 3
def test_mixed_length_path_hashes_round_trip(self, db_session) -> None:
"""Test that mixed-length path hashes round-trip correctly."""
path_hashes = ["4a", "b3fa", "02"]
trace = TracePath(
initiator_tag=111222333,
path_len=3,
path_hashes=path_hashes,
snr_values=[22.0, 17.5, 11.0],
hop_count=3,
)
db_session.add(trace)
db_session.commit()
# Expire cached attributes to force reload from database
db_session.expire(trace)
assert trace.path_hashes == ["4a", "b3fa", "02"]
assert len(trace.path_hashes) == 3
class TestTelemetryModel:
"""Tests for Telemetry model."""
def test_create_telemetry(self, db_session) -> None:
"""Test creating a telemetry record."""
telemetry = Telemetry(
node_public_key="d" * 64,
parsed_data={
"temperature": 22.5,
"humidity": 65,
"battery": 3.8,
},
)
db_session.add(telemetry)
db_session.commit()
assert telemetry.id is not None
assert telemetry.parsed_data is not None
assert telemetry.parsed_data["temperature"] == 22.5
class TestEventLogModel:
"""Tests for EventLog model."""
def test_create_event_log(self, db_session) -> None:
"""Test creating an event log entry."""
event = EventLog(
event_type="BATTERY",
payload={
"battery_voltage": 3.8,
"battery_percentage": 75,
},
)
db_session.add(event)
db_session.commit()
assert event.id is not None
assert event.event_type == "BATTERY"
assert event.payload is not None
assert event.payload["battery_percentage"] == 75
class TestEventObserverModel:
"""Tests for EventObserver model and add_event_observer helper."""
def test_add_event_observer_stores_path_len(self, db_session) -> None:
"""add_event_observer accepts and stores path_len."""
node = Node(public_key="a" * 64, name="Observer")
db_session.add(node)
db_session.commit()
result = add_event_observer(
session=db_session,
event_type="message",
event_hash="abcdef12",
observer_node_id=node.id,
snr=10.5,
path_len=3,
)
assert result is True
observer = db_session.execute(select(EventObserver)).scalar_one()
assert observer.snr == 10.5
assert observer.path_len == 3
def test_add_event_observer_path_len_defaults_none(self, db_session) -> None:
"""add_event_observer defaults path_len to None when not provided."""
node = Node(public_key="b" * 64, name="Observer2")
db_session.add(node)
db_session.commit()
result = add_event_observer(
session=db_session,
event_type="trace",
event_hash="12345678",
observer_node_id=node.id,
)
assert result is True
observer = db_session.execute(select(EventObserver)).scalar_one()
assert observer.path_len is None
assert observer.snr is None
def test_add_event_observer_sets_is_observer_flag(self, db_session) -> None:
"""Observing an event marks the observer node with is_observer=True."""
node = Node(public_key="c" * 64, name="Observer3")
db_session.add(node)
db_session.commit()
assert node.is_observer is False
add_event_observer(
session=db_session,
event_type="advertisement",
event_hash="deadbeef",
observer_node_id=node.id,
)
db_session.commit()
db_session.refresh(node)
assert node.is_observer is True