Add packet-groups API endpoint for deduplicated packet list

- New GroupedPacketRead/PacketReceptionInfo/GroupedPacketList schemas
- GET /api/v1/packet-groups: two-phase GROUP BY query, 7-day default
  window, lightweight Phase 2 (no raw_hex/decoded), role-aware cache
- GET /api/v1/packet-groups/{hash}: full reception list with path_hashes
  extracted from decoded.payload.decoded.pathHashes
- New (packet_hash, received_at) composite index via Alembic migration
- i18n keys for receptions, path, observer counts

https://claude.ai/code/session_01NH2rZzuHzasJj12SZeRhbJ
This commit is contained in:
Claude
2026-06-13 07:48:40 +00:00
parent d12e209b8a
commit e3e7cb26da
6 changed files with 384 additions and 1 deletions
@@ -0,0 +1,30 @@
"""add packet_hash_received_at composite index
Revision ID: a1b2c3d4e5f6
Revises: e9f0c4079540
Create Date: 2026-06-13 07:30:00.000000+00:00
"""
from typing import Sequence, Union
from alembic import op
revision: str = "a1b2c3d4e5f6"
down_revision: Union[str, None] = "e9f0c4079540"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("raw_packets", schema=None) as batch_op:
batch_op.create_index(
"ix_raw_packets_packet_hash_received_at",
["packet_hash", "received_at"],
unique=False,
)
def downgrade() -> None:
with op.batch_alter_table("raw_packets", schema=None) as batch_op:
batch_op.drop_index("ix_raw_packets_packet_hash_received_at")
+2
View File
@@ -13,6 +13,7 @@ from meshcore_hub.api.routes.user_profiles import router as user_profiles_router
from meshcore_hub.api.routes.adoptions import router as adoptions_router
from meshcore_hub.api.routes.channels import router as channels_router
from meshcore_hub.api.routes.raw_packets import router as raw_packets_router
from meshcore_hub.api.routes.packet_groups import router as packet_groups_router
api_router = APIRouter()
@@ -32,3 +33,4 @@ api_router.include_router(user_profiles_router, prefix="/user", tags=["User"])
api_router.include_router(adoptions_router, prefix="/adoptions", tags=["Adoptions"])
api_router.include_router(channels_router, prefix="/channels", tags=["Channels"])
api_router.include_router(raw_packets_router, prefix="/packets", tags=["Packets"])
api_router.include_router(packet_groups_router, prefix="/packet-groups", tags=["Packet Groups"])
@@ -0,0 +1,292 @@
"""Grouped raw packet routes — one entry per unique packet_hash."""
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
from fastapi import APIRouter, HTTPException, Query, Request
from sqlalchemy import func, or_, select
from sqlalchemy.orm import aliased, selectinload
from meshcore_hub.api.auth import RequireRead
from meshcore_hub.api.cache import cached, sorted_query_string
from meshcore_hub.api.channel_visibility import (
get_max_visibility_level,
get_visible_channel_indices,
resolve_user_role,
)
from meshcore_hub.api.dependencies import DbSession
from meshcore_hub.common.models import Node, RawPacket
from meshcore_hub.common.schemas.raw_packets import (
GroupedPacketList,
GroupedPacketRead,
PacketReceptionInfo,
)
router = APIRouter()
SEARCH_DEFAULT_WINDOW_DAYS = 7
VALID_SORT_COLUMNS = {"time", "event_type", "reception_count"}
def _group_key_builder(request: Request) -> str:
role = resolve_user_role(request) or "anonymous"
return f"packet_groups:role={role}:{sorted_query_string(request)}"
def _extract_path_hashes(decoded: dict[str, Any] | None) -> list[str] | None:
"""Extract pathHashes from decoded.payload.decoded.pathHashes."""
if not decoded:
return None
payload = decoded.get("payload") or {}
inner = payload.get("decoded") or {}
hashes = inner.get("pathHashes")
return hashes if isinstance(hashes, list) else None
def _get_tag_name(node: Optional[Node]) -> Optional[str]:
if not node or not node.tags:
return None
for tag in node.tags:
if tag.key == "name":
return tag.value
return None
@router.get("", response_model=GroupedPacketList)
@cached("packet_groups", key_builder=_group_key_builder)
def list_packet_groups(
_: RequireRead,
session: DbSession,
request: Request,
search: Optional[str] = Query(None, description="Search in packet hash or observer name"),
event_type: Optional[str] = Query(None, description="Filter by event type"),
channel_idx: Optional[int] = Query(None, description="Filter by channel index"),
since: Optional[datetime] = Query(None, description="Start timestamp"),
until: Optional[datetime] = Query(None, description="End timestamp"),
sort: Optional[str] = Query(None, description="Sort column: time, event_type, reception_count"),
order: Optional[str] = Query(None, description="asc or desc"),
limit: int = Query(50, ge=1, le=100),
offset: int = Query(0, ge=0),
) -> GroupedPacketList:
"""List deduplicated packet groups — one row per unique packet_hash.
Rows with no packet_hash are excluded (they cannot be meaningfully grouped).
The 7-day default window applies unless ``since`` is provided.
"""
sort = sort if sort in VALID_SORT_COLUMNS else "time"
order = order if order in ("asc", "desc") else "desc"
role = resolve_user_role(request)
max_level = get_max_visibility_level(role)
visible_indices = get_visible_channel_indices(session, max_level)
# Default time window — keeps GROUP BY bounded when no explicit since/until.
if since is None:
if search:
since = datetime.now(timezone.utc) - timedelta(days=SEARCH_DEFAULT_WINDOW_DAYS)
else:
since = datetime.now(timezone.utc) - timedelta(days=SEARCH_DEFAULT_WINDOW_DAYS)
ObserverNode = aliased(Node)
# ── Phase 1: GROUP BY packet_hash to paginate over distinct groups ────────
group_query = (
select(
RawPacket.packet_hash,
func.count(RawPacket.id).label("reception_count"),
func.count(RawPacket.observer_node_id.distinct()).label("observer_count"),
func.min(RawPacket.received_at).label("first_seen"),
)
.outerjoin(ObserverNode, RawPacket.observer_node_id == ObserverNode.id)
.where(RawPacket.packet_hash.is_not(None))
.where(RawPacket.received_at >= since)
)
if search:
pattern = f"%{search}%"
group_query = group_query.where(
or_(
RawPacket.packet_hash.ilike(pattern),
ObserverNode.name.ilike(pattern),
)
)
if event_type:
group_query = group_query.where(RawPacket.event_type == event_type)
if channel_idx is not None:
group_query = group_query.where(RawPacket.channel_idx == channel_idx)
if until:
group_query = group_query.where(RawPacket.received_at <= until)
group_query = group_query.group_by(RawPacket.packet_hash)
count_query = select(func.count()).select_from(group_query.subquery())
total = session.execute(count_query).scalar() or 0
sort_col = {
"time": func.min(RawPacket.received_at),
"event_type": func.min(RawPacket.event_type),
"reception_count": func.count(RawPacket.id),
}[sort]
group_query = group_query.order_by(
sort_col.desc() if order == "desc" else sort_col.asc()
)
group_query = group_query.offset(offset).limit(limit)
group_rows = session.execute(group_query).all()
hashes = [r.packet_hash for r in group_rows]
if not hashes:
return GroupedPacketList(items=[], total=total, limit=limit, offset=offset)
# ── Phase 2: Lightweight metadata fetch — no raw_hex, no decoded ──────────
meta_query = select(
RawPacket.id,
RawPacket.packet_hash,
RawPacket.event_type,
RawPacket.channel_idx,
RawPacket.packet_type,
RawPacket.payload_type,
RawPacket.route_type,
RawPacket.source_pubkey_prefix,
RawPacket.received_at,
).where(RawPacket.packet_hash.in_(hashes)).order_by(RawPacket.received_at.asc())
meta_rows = session.execute(meta_query).all()
# Pick first (oldest) row per hash as the representative display row
representative: dict[str, Any] = {}
for row in meta_rows:
if row.packet_hash not in representative:
representative[row.packet_hash] = row
group_counts = {r.packet_hash: r for r in group_rows}
items = []
for h in hashes:
rep = representative.get(h)
grp = group_counts[h]
is_redacted = bool(
rep is not None
and rep.channel_idx is not None
and rep.channel_idx not in visible_indices
)
items.append(
GroupedPacketRead(
packet_hash=h,
event_type=rep.event_type if rep else None,
channel_idx=rep.channel_idx if rep else None,
packet_type=rep.packet_type if rep else None,
payload_type=rep.payload_type if rep else None,
route_type=rep.route_type if rep else None,
source_pubkey_prefix=(
None if is_redacted else (rep.source_pubkey_prefix if rep else None)
),
reception_count=grp.reception_count,
observer_count=grp.observer_count,
receptions=[],
first_seen=grp.first_seen,
redacted=is_redacted,
raw_hex=None,
decoded=None,
)
)
return GroupedPacketList(items=items, total=total, limit=limit, offset=offset)
@router.get("/{packet_hash}", response_model=GroupedPacketRead)
def get_packet_group(
_: RequireRead,
session: DbSession,
request: Request,
packet_hash: str,
) -> GroupedPacketRead:
"""Full detail for a packet group, including all (observer, path) receptions."""
ObserverNode = aliased(Node)
rows = session.execute(
select(
RawPacket,
ObserverNode.public_key.label("observer_pk"),
ObserverNode.name.label("observer_name"),
ObserverNode.id.label("observer_id"),
)
.outerjoin(ObserverNode, RawPacket.observer_node_id == ObserverNode.id)
.where(RawPacket.packet_hash == packet_hash)
.order_by(RawPacket.received_at.asc())
).all()
if not rows:
raise HTTPException(status_code=404, detail="Packet group not found")
role = resolve_user_role(request)
max_level = get_max_visibility_level(role)
visible_indices = get_visible_channel_indices(session, max_level)
observer_ids = {row.observer_id for row in rows if row.observer_id}
nodes_by_id: dict[str, Node] = {}
if observer_ids:
nodes = (
session.execute(
select(Node)
.where(Node.id.in_(observer_ids))
.options(selectinload(Node.tags))
)
.scalars()
.all()
)
nodes_by_id = {n.id: n for n in nodes}
receptions: list[PacketReceptionInfo] = []
for row in rows:
packet = row[0]
is_redacted = (
packet.channel_idx is not None
and packet.channel_idx not in visible_indices
)
observer_node = nodes_by_id.get(row.observer_id) if row.observer_id else None
receptions.append(
PacketReceptionInfo(
packet_id=packet.id,
observed_by=row.observer_pk,
observer_name=row.observer_name,
observer_tag_name=_get_tag_name(observer_node),
snr=packet.snr,
path_len=packet.path_len,
path_hashes=None if is_redacted else _extract_path_hashes(packet.decoded),
received_at=packet.received_at,
redacted=is_redacted,
)
)
first_packet = rows[0][0]
all_redacted = all(r.redacted for r in receptions)
unique_observers = len({r.observed_by for r in receptions if r.observed_by})
# Use first non-redacted row for the shared raw_hex/decoded panel
rep_packet = None
for row in rows:
p = row[0]
if p.channel_idx is None or p.channel_idx in visible_indices:
rep_packet = p
break
return GroupedPacketRead(
packet_hash=packet_hash,
event_type=first_packet.event_type,
channel_idx=first_packet.channel_idx,
packet_type=first_packet.packet_type,
payload_type=first_packet.payload_type,
route_type=first_packet.route_type,
source_pubkey_prefix=None if all_redacted else first_packet.source_pubkey_prefix,
reception_count=len(receptions),
observer_count=unique_observers,
receptions=receptions,
first_seen=first_packet.received_at,
redacted=all_redacted,
raw_hex=rep_packet.raw_hex if rep_packet else None,
decoded=rep_packet.decoded if rep_packet else None,
)
@@ -113,6 +113,11 @@ class RawPacket(Base, UUIDMixin, TimestampMixin):
"source_pubkey_prefix",
"received_at",
),
Index(
"ix_raw_packets_packet_hash_received_at",
"packet_hash",
"received_at",
),
)
def __repr__(self) -> str:
@@ -65,3 +65,50 @@ class RawPacketList(BaseModel):
total: int = Field(..., description="Total number of raw packets")
limit: int = Field(..., description="Page size limit")
offset: int = Field(..., description="Page offset")
class PacketReceptionInfo(BaseModel):
"""One raw_packet row — a single (observer, path) reception."""
packet_id: str = Field(..., description="Raw packet UUID")
observed_by: Optional[str] = Field(default=None, description="Observer public key")
observer_name: Optional[str] = Field(default=None, description="Observer node name")
observer_tag_name: Optional[str] = Field(default=None, description="Observer name from tags")
snr: Optional[float] = Field(default=None, description="SNR at this observer")
path_len: Optional[int] = Field(default=None, description="Hop count")
path_hashes: Optional[list[str]] = Field(
default=None, description="Hop node hash sequence from decoded.payload.decoded.pathHashes"
)
received_at: datetime = Field(..., description="When received")
redacted: bool = Field(default=False)
class GroupedPacketRead(BaseModel):
"""One packet hash with aggregated reception info."""
packet_hash: Optional[str] = Field(default=None)
event_type: Optional[str] = Field(default=None)
channel_idx: Optional[int] = Field(default=None)
packet_type: Optional[int] = Field(default=None)
payload_type: Optional[int] = Field(default=None)
route_type: Optional[str] = Field(default=None)
source_pubkey_prefix: Optional[str] = Field(default=None)
reception_count: int = Field(..., description="Total rows (paths × observers)")
observer_count: int = Field(..., description="Distinct observer nodes")
receptions: list[PacketReceptionInfo] = Field(
default_factory=list,
description="Individual receptions (populated for detail, empty for list)",
)
first_seen: datetime = Field(..., description="Earliest received_at across all receptions")
redacted: bool = Field(default=False, description="True when all receptions are redacted")
raw_hex: Optional[str] = Field(default=None, description="Representative raw hex (null for list view)")
decoded: Optional[dict[str, Any]] = Field(default=None, description="Representative decoded JSON (null for list view)")
class GroupedPacketList(BaseModel):
"""Paginated list of grouped packets."""
items: list[GroupedPacketRead] = Field(..., description="List of packet groups")
total: int = Field(..., description="Total distinct packet hashes matching filters")
limit: int
offset: int
+8 -1
View File
@@ -227,12 +227,19 @@
"col_source": "Source",
"col_route_type": "Route Type",
"col_raw": "Raw",
"receptions_title": "Receptions",
"receptions_count": "Receptions",
"reception_singular": "path",
"reception_plural": "paths",
"col_path": "Path",
"col_receptions": "Receptions",
"sort": {
"newest": "Time (newest)",
"oldest": "Time (oldest)",
"event_az": "Event (AZ)",
"snr_high": "SNR (highest)",
"path_high": "Hops (most)"
"path_high": "Hops (most)",
"receptions_high": "Receptions (most)"
}
},
"map": {