Files
meshcore-hub/tests/test_api/test_raw_packets.py
T
Louis King b5b6872060 test: raise patch coverage for raw packets
Add tests covering the previously-uncovered new lines flagged by Codecov:
- /packets route: search, packet_type, channel_idx, route_type, observed_by,
  decryptable (both), max_snr, path-len ranges, since/until, observer-tag
  hydration, detail 404, ascending sort, and the role-aware cache key builder.
- store_raw_packet: existing-observer update branch, senderPublicKey source
  fallback, and the no-source-prefix case.
- normalizer: advertisement payload carries the wire packet_hash.
- CollectorSettings: raw-packet retention default/override and capture default.

Also stub the test decoder via setattr so mypy's method-assign analysis is
deterministic across incremental-cache states.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-12 23:07:43 +01:00

398 lines
15 KiB
Python

"""Tests for raw packet API routes."""
from datetime import datetime, timezone
import pytest
from meshcore_hub.common.models import Channel, Node, NodeTag, RawPacket
class TestListRawPackets:
    """Exercise listing, filtering and pagination on GET /api/v1/packets."""

    def test_list_empty(self, client_no_auth):
        """With nothing stored, the listing has no items and a zero total."""
        resp = client_no_auth.get("/api/v1/packets")
        assert resp.status_code == 200

        body = resp.json()
        assert body["items"] == []
        assert body["total"] == 0

    def test_list_with_data(self, client_no_auth, api_db_session):
        """A single stored packet is listed with its hex and is not redacted."""
        stored = RawPacket(
            raw_hex="0011",
            packet_hash="h1",
            event_type="letsmesh_packet",
            received_at=datetime.now(timezone.utc),
        )
        api_db_session.add(stored)
        api_db_session.commit()

        resp = client_no_auth.get("/api/v1/packets")
        assert resp.status_code == 200

        body = resp.json()
        assert body["total"] == 1
        first = body["items"][0]
        assert first["raw_hex"] == "0011"
        assert first["redacted"] is False

    def test_filter_event_type(self, client_no_auth, api_db_session):
        """The event_type query parameter keeps only rows of that event type."""
        stamp = datetime.now(timezone.utc)
        advert = RawPacket(raw_hex="00", event_type="advertisement", received_at=stamp)
        channel_msg = RawPacket(
            raw_hex="11", event_type="channel_msg_recv", received_at=stamp
        )
        api_db_session.add_all([advert, channel_msg])
        api_db_session.commit()

        resp = client_no_auth.get("/api/v1/packets?event_type=advertisement")
        assert resp.status_code == 200

        body = resp.json()
        assert body["total"] == 1
        assert body["items"][0]["event_type"] == "advertisement"

    def test_filter_source_prefix_and_snr(self, client_no_auth, api_db_session):
        """pubkey_prefix combined with min_snr selects the one matching row."""
        stamp = datetime.now(timezone.utc)
        strong = RawPacket(
            raw_hex="00",
            source_pubkey_prefix="AABBCCDDEEFF",
            snr=10.0,
            received_at=stamp,
        )
        weak = RawPacket(
            raw_hex="11",
            source_pubkey_prefix="112233445566",
            snr=2.0,
            received_at=stamp,
        )
        api_db_session.add_all([strong, weak])
        api_db_session.commit()

        resp = client_no_auth.get(
            "/api/v1/packets?pubkey_prefix=AABBCCDDEEFF&min_snr=5"
        )
        assert resp.status_code == 200

        body = resp.json()
        assert body["total"] == 1
        assert body["items"][0]["source_pubkey_prefix"] == "AABBCCDDEEFF"

    def test_pagination_params_echoed(self, client_no_auth):
        """The response echoes back the requested limit and offset."""
        resp = client_no_auth.get("/api/v1/packets?limit=25&offset=5")
        assert resp.status_code == 200

        body = resp.json()
        assert body["limit"] == 25
        assert body["offset"] == 5

    def test_filter_by_exact_packet_hash(self, client_no_auth, api_db_session):
        """packet_hash returns every row sharing that hash and no others."""
        stamp = datetime.now(timezone.utc)
        rows = [
            RawPacket(raw_hex="00", packet_hash="HASH_AAA", received_at=stamp),
            RawPacket(raw_hex="11", packet_hash="HASH_AAA", received_at=stamp),
            RawPacket(raw_hex="22", packet_hash="HASH_BBB", received_at=stamp),
        ]
        api_db_session.add_all(rows)
        api_db_session.commit()

        resp = client_no_auth.get("/api/v1/packets?packet_hash=HASH_AAA")
        assert resp.status_code == 200

        body = resp.json()
        assert body["total"] == 2
        returned_hashes = {row["packet_hash"] for row in body["items"]}
        assert returned_hashes == {"HASH_AAA"}
class TestRawPacketRedaction:
    """Tests for channel-visibility redaction of raw packets."""

    @pytest.fixture
    def packets_with_visibility(self, api_db_session):
        """Store community-channel, admin-channel, and non-channel raw packets.

        Two channels are created (visibility "community" and "admin") and one
        packet is attached to each via ``channel_idx``, which is set to the
        integer value of the channel's hex hash. A third packet has no channel.

        Returns:
            The tuple ``(pub_pkt, adm_pkt, plain_pkt)`` after commit.
        """
        pub_key = "AABBCCDDEEFF00112233445566778899"
        adm_key = "FFEEDDCCBBAA99887766554433221100"
        # Compute each hash once and reuse it for both the Channel row and the
        # packet's channel_idx so the two can never disagree.
        pub_hash = Channel.compute_channel_hash(pub_key)
        adm_hash = Channel.compute_channel_hash(adm_key)
        pub_idx = int(pub_hash, 16)
        adm_idx = int(adm_hash, 16)
        api_db_session.add_all(
            [
                Channel(
                    name="CommunityCh",
                    key_hex=pub_key,
                    channel_hash=pub_hash,
                    visibility="community",
                    enabled=True,
                ),
                Channel(
                    name="AdminCh",
                    key_hex=adm_key,
                    channel_hash=adm_hash,
                    visibility="admin",
                    enabled=True,
                ),
            ]
        )
        now = datetime.now(timezone.utc)
        pub_pkt = RawPacket(
            raw_hex="C0FFEE",
            event_type="channel_msg_recv",
            channel_idx=pub_idx,
            decoded={"payload": {"decoded": {"channelHash": "x"}}},
            received_at=now,
        )
        adm_pkt = RawPacket(
            raw_hex="ADADAD",
            event_type="channel_msg_recv",
            channel_idx=adm_idx,
            decoded={"payload": {"decoded": {"channelHash": "y"}}},
            received_at=now,
        )
        plain_pkt = RawPacket(
            raw_hex="DEADBE",
            event_type="advertisement",
            channel_idx=None,
            received_at=now,
        )
        api_db_session.add_all([pub_pkt, adm_pkt, plain_pkt])
        api_db_session.commit()
        return pub_pkt, adm_pkt, plain_pkt

    def test_anonymous_redacts_admin_channel(
        self, client_no_auth, packets_with_visibility
    ):
        """Anonymous sees the admin-channel packet metadata-only (redacted)."""
        response = client_no_auth.get("/api/v1/packets")
        assert response.status_code == 200
        data = response.json()
        # All three rows are returned (count stable), but the admin one is redacted.
        assert data["total"] == 3
        # Collect into a list and assert on its length rather than calling a
        # bare next(): a missing redacted row then fails as a readable
        # assertion instead of an opaque StopIteration.
        stripped = [i for i in data["items"] if i["raw_hex"] is None]
        assert len(stripped) == 1
        adm = stripped[0]
        assert adm["redacted"] is True
        assert adm["decoded"] is None
        # Non-channel and community packets are not redacted.
        assert any(
            i["raw_hex"] == "DEADBE" and not i["redacted"] for i in data["items"]
        )
        assert any(
            i["raw_hex"] == "C0FFEE" and not i["redacted"] for i in data["items"]
        )

    def test_admin_sees_full(self, client_no_auth, packets_with_visibility):
        """Admin role sees the admin-channel packet in full."""
        response = client_no_auth.get(
            "/api/v1/packets", headers={"X-User-Roles": "admin"}
        )
        assert response.status_code == 200
        data = response.json()
        assert data["total"] == 3
        assert all(not i["redacted"] for i in data["items"])
        assert any(i["raw_hex"] == "ADADAD" for i in data["items"])

    def test_redacted_filter(self, client_no_auth, packets_with_visibility):
        """The redacted filter narrows to (only|excluding) redacted rows."""
        only = client_no_auth.get("/api/v1/packets?redacted=true").json()
        assert only["total"] == 1
        assert only["items"][0]["redacted"] is True

        without = client_no_auth.get("/api/v1/packets?redacted=false").json()
        assert without["total"] == 2
        assert all(not i["redacted"] for i in without["items"])

    def test_count_stable_across_roles(self, client_no_auth, packets_with_visibility):
        """Pagination count is the same regardless of role (rows redacted not hidden)."""
        anon = client_no_auth.get("/api/v1/packets").json()
        admin = client_no_auth.get(
            "/api/v1/packets", headers={"X-User-Roles": "admin"}
        ).json()
        assert anon["total"] == admin["total"] == 3

    def test_get_single_redacted(self, client_no_auth, packets_with_visibility):
        """GET /packets/{id} redacts an above-role channel packet (not 404)."""
        _, adm_pkt, _ = packets_with_visibility
        response = client_no_auth.get(f"/api/v1/packets/{adm_pkt.id}")
        assert response.status_code == 200
        data = response.json()
        assert data["redacted"] is True
        assert data["raw_hex"] is None

    def test_get_single_visible_to_admin(self, client_no_auth, packets_with_visibility):
        """GET /packets/{id} with the admin role header returns the packet unredacted."""
        _, adm_pkt, _ = packets_with_visibility
        response = client_no_auth.get(
            f"/api/v1/packets/{adm_pkt.id}", headers={"X-User-Roles": "admin"}
        )
        assert response.status_code == 200
        data = response.json()
        assert data["redacted"] is False
        assert data["raw_hex"] == "ADADAD"
class TestRawPacketFiltersAndHydration:
    """Cover the remaining filters, observer hydration, detail, and key builder."""

    def test_search_packet_type_channel_route(self, client_no_auth, api_db_session):
        """search, packet_type, channel_idx and route_type each pick one of two rows.

        ``route_type=all`` is expected to return both rows.
        """
        now = datetime.now(timezone.utc)
        api_db_session.add_all(
            [
                RawPacket(
                    raw_hex="AA",
                    packet_hash="findme",
                    packet_type=5,
                    channel_idx=7,
                    route_type="flood",
                    received_at=now,
                ),
                RawPacket(
                    raw_hex="BB",
                    packet_type=2,
                    channel_idx=9,
                    route_type="direct",
                    received_at=now,
                ),
            ]
        )
        api_db_session.commit()

        assert client_no_auth.get("/api/v1/packets?search=findme").json()["total"] == 1
        assert client_no_auth.get("/api/v1/packets?packet_type=5").json()["total"] == 1
        assert client_no_auth.get("/api/v1/packets?channel_idx=7").json()["total"] == 1
        assert (
            client_no_auth.get("/api/v1/packets?route_type=flood").json()["total"] == 1
        )
        assert client_no_auth.get("/api/v1/packets?route_type=all").json()["total"] == 2

    def test_snr_and_path_len_ranges(self, client_no_auth, api_db_session):
        """max_snr, min_path_len and max_path_len each match one of two rows."""
        now = datetime.now(timezone.utc)
        api_db_session.add_all(
            [
                RawPacket(raw_hex="AA", snr=1.0, path_len=1, received_at=now),
                RawPacket(raw_hex="BB", snr=9.0, path_len=5, received_at=now),
            ]
        )
        api_db_session.commit()

        assert client_no_auth.get("/api/v1/packets?max_snr=3").json()["total"] == 1
        assert client_no_auth.get("/api/v1/packets?min_path_len=3").json()["total"] == 1
        assert client_no_auth.get("/api/v1/packets?max_path_len=3").json()["total"] == 1

    def test_since_until_window(self, client_no_auth, api_db_session):
        """since/until split a 2020 packet and a current packet around 2021-01-01."""
        old = datetime(2020, 1, 1, tzinfo=timezone.utc)
        recent = datetime.now(timezone.utc)
        api_db_session.add_all(
            [
                RawPacket(raw_hex="AA", received_at=old),
                RawPacket(raw_hex="BB", received_at=recent),
            ]
        )
        api_db_session.commit()

        # Passed via params= so the "+" in the UTC offset is URL-encoded.
        ts = "2021-01-01T00:00:00+00:00"
        assert (
            client_no_auth.get("/api/v1/packets", params={"since": ts}).json()["total"]
            == 1
        )
        assert (
            client_no_auth.get("/api/v1/packets", params={"until": ts}).json()["total"]
            == 1
        )

    def test_decryptable_filter(self, client_no_auth, api_db_session):
        """decryptable=true/false separates rows with and without a decrypted payload."""
        now = datetime.now(timezone.utc)
        api_db_session.add_all(
            [
                RawPacket(
                    raw_hex="AA",
                    decoded={"payload": {"decoded": {"decrypted": {"message": "hi"}}}},
                    received_at=now,
                ),
                RawPacket(raw_hex="BB", decoded={"payload": {}}, received_at=now),
            ]
        )
        api_db_session.commit()

        yes = client_no_auth.get("/api/v1/packets?decryptable=true").json()
        assert yes["total"] == 1
        assert yes["items"][0]["raw_hex"] == "AA"

        no = client_no_auth.get("/api/v1/packets?decryptable=false").json()
        assert no["total"] == 1
        assert no["items"][0]["raw_hex"] == "BB"

    def test_observer_hydration_and_filter(self, client_no_auth, api_db_session):
        """Observer fields are hydrated and observed_by narrows the listing.

        A second packet with no observer is stored so that the ``observed_by``
        assertion can only pass if the filter actually excludes rows.
        """
        obs_key = "ab" * 32  # Node lowercases its public key
        node = Node(public_key=obs_key, name="ObsNode")
        api_db_session.add(node)
        # Flush first so node.id is available for the rows that reference it.
        api_db_session.flush()
        api_db_session.add(NodeTag(node_id=node.id, key="name", value="TaggedObs"))
        now = datetime.now(timezone.utc)
        api_db_session.add_all(
            [
                RawPacket(raw_hex="AA", observer_node_id=node.id, received_at=now),
                # Control row without an observer; must be filtered out below.
                RawPacket(raw_hex="BB", received_at=now),
            ]
        )
        api_db_session.commit()

        data = client_no_auth.get("/api/v1/packets").json()
        assert data["total"] == 2
        observed = [i for i in data["items"] if i["raw_hex"] == "AA"]
        assert len(observed) == 1
        item = observed[0]
        assert item["observed_by"] == obs_key
        assert item["observer_name"] == "ObsNode"
        assert item["observer_tag_name"] == "TaggedObs"

        # observed_by filter: only the packet seen by the observer remains.
        filtered = client_no_auth.get(
            "/api/v1/packets", params={"observed_by": obs_key}
        ).json()
        assert filtered["total"] == 1
        assert [i["raw_hex"] for i in filtered["items"]] == ["AA"]

    def test_sort_by_snr_ascending(self, client_no_auth, api_db_session):
        """sort=snr with order=asc returns the lower-SNR row first."""
        now = datetime.now(timezone.utc)
        api_db_session.add_all(
            [
                RawPacket(raw_hex="LO", snr=1.0, received_at=now),
                RawPacket(raw_hex="HI", snr=9.0, received_at=now),
            ]
        )
        api_db_session.commit()

        data = client_no_auth.get("/api/v1/packets?sort=snr&order=asc").json()
        assert [i["raw_hex"] for i in data["items"]] == ["LO", "HI"]

    def test_get_detail_with_observer(self, client_no_auth, api_db_session):
        """The detail route includes the observer's "name" tag value."""
        # NOTE(review): "M" is not a hex digit; this presumably relies on Node
        # not validating public_key as hex — confirm.
        node = Node(public_key="M" * 64, name="DetailObs")
        api_db_session.add(node)
        # Flush first so node.id is available for the rows that reference it.
        api_db_session.flush()
        api_db_session.add(NodeTag(node_id=node.id, key="name", value="DetailTag"))
        pkt = RawPacket(
            raw_hex="AA",
            observer_node_id=node.id,
            received_at=datetime.now(timezone.utc),
        )
        api_db_session.add(pkt)
        api_db_session.commit()

        data = client_no_auth.get(f"/api/v1/packets/{pkt.id}").json()
        assert data["observer_tag_name"] == "DetailTag"

    def test_get_detail_404(self, client_no_auth):
        """Requesting an unknown packet id yields HTTP 404."""
        response = client_no_auth.get("/api/v1/packets/does-not-exist")
        assert response.status_code == 404

    def test_packets_key_builder_anonymous(self):
        """A header-less request gets an anonymous-role key containing its query."""
        from starlette.requests import Request

        from meshcore_hub.api.routes.raw_packets import _packets_key_builder

        # Minimal hand-built ASGI scope: no headers, so no role is supplied.
        request = Request(
            {
                "type": "http",
                "method": "GET",
                "headers": [],
                "query_string": b"limit=50&sort=time",
            }
        )
        key = _packets_key_builder(request)
        assert key.startswith("packets:role=anonymous:")
        assert "limit=50" in key