mirror of
https://github.com/ipnet-mesh/meshcore-hub.git
synced 2026-06-29 06:21:45 +02:00
Merge pull request #229 from ipnet-mesh/perf/batch-dashboard-sender-queries
perf(api): batch N+1 dashboard and message sender queries
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
"""Shared utilities for fetching event observer data."""
|
||||
|
||||
from sqlalchemy import select
|
||||
from collections.abc import Iterable
|
||||
|
||||
from sqlalchemy import or_, select
|
||||
|
||||
from meshcore_hub.api.dependencies import DbSession
|
||||
from meshcore_hub.common.models import EventObserver, Node, NodeTag
|
||||
@@ -73,3 +75,53 @@ def fetch_observers_for_events(
|
||||
)
|
||||
|
||||
return observers_by_hash
|
||||
|
||||
|
||||
def resolve_sender_names(
    session: DbSession,
    prefixes: Iterable[str],
) -> tuple[dict[str, str], dict[str, str]]:
    """Look up node names and "name" tag values for sender pubkey prefixes.

    Messages store a leading slice of the sender's public key
    (``pubkey_prefix``). Every distinct, non-empty prefix is matched against
    ``Node.public_key`` in one combined filter, so the whole batch costs two
    queries (node names, then "name" tags) instead of two per prefix.

    Note that the returned dicts are keyed by the first 12 characters of each
    *matched node's* public key, not by the prefix that was passed in.

    Args:
        session: Database session
        prefixes: Sender pubkey prefixes to resolve; falsy entries are
            ignored and duplicates are collapsed.

    Returns:
        Tuple of (names_by_prefix, tag_names_by_prefix). Nodes without a
        name are left out of the first dict. Both dicts are empty when there
        is nothing to resolve, in which case no query is issued.
    """
    wanted = {prefix for prefix in prefixes if prefix}
    if not wanted:
        # Nothing to resolve: skip the database entirely.
        return {}, {}

    # One LIKE 'prefix%' term per prefix, ORed together. Unlike a
    # substr(...) IN (...) filter this tolerates prefixes of differing
    # lengths, preserving the per-prefix startswith semantics this replaces.
    matches_any_prefix = or_(
        *(Node.public_key.startswith(prefix) for prefix in wanted)
    )

    node_rows = session.execute(
        select(Node.public_key, Node.name).where(matches_any_prefix)
    ).all()
    names: dict[str, str] = {
        node_key[:12]: node_name for node_key, node_name in node_rows if node_name
    }

    tag_rows = session.execute(
        select(Node.public_key, NodeTag.value)
        .join(NodeTag, Node.id == NodeTag.node_id)
        .where(matches_any_prefix)
        .where(NodeTag.key == "name")
    ).all()
    tag_names: dict[str, str] = {
        node_key[:12]: tag_value for node_key, tag_value in tag_rows
    }

    return names, tag_names
|
||||
|
||||
@@ -14,6 +14,7 @@ from meshcore_hub.api.channel_visibility import (
|
||||
resolve_user_role,
|
||||
)
|
||||
from meshcore_hub.api.dependencies import DbSession
|
||||
from meshcore_hub.api.observer_utils import resolve_sender_names
|
||||
from meshcore_hub.common.models import (
|
||||
Advertisement,
|
||||
Message,
|
||||
@@ -240,25 +241,7 @@ def get_stats(
|
||||
|
||||
# Look up sender names for these messages
|
||||
msg_prefixes = [m.pubkey_prefix for m in channel_msgs if m.pubkey_prefix]
|
||||
msg_sender_names: dict[str, str] = {}
|
||||
msg_tag_names: dict[str, str] = {}
|
||||
if msg_prefixes:
|
||||
for prefix in set(msg_prefixes):
|
||||
sender_node_query = select(Node.public_key, Node.name).where(
|
||||
Node.public_key.startswith(prefix)
|
||||
)
|
||||
for public_key, name in session.execute(sender_node_query).all():
|
||||
if name:
|
||||
msg_sender_names[public_key[:12]] = name
|
||||
|
||||
sender_tag_query = (
|
||||
select(Node.public_key, NodeTag.value)
|
||||
.join(NodeTag, Node.id == NodeTag.node_id)
|
||||
.where(Node.public_key.startswith(prefix))
|
||||
.where(NodeTag.key == "name")
|
||||
)
|
||||
for public_key, value in session.execute(sender_tag_query).all():
|
||||
msg_tag_names[public_key[:12]] = value
|
||||
msg_sender_names, msg_tag_names = resolve_sender_names(session, msg_prefixes)
|
||||
|
||||
channel_messages[int(channel_idx)] = [
|
||||
ChannelMessage(
|
||||
@@ -467,23 +450,35 @@ def get_node_count_history(
|
||||
end_date = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
start_date = end_date - timedelta(days=days)
|
||||
|
||||
# Get all nodes with their creation dates
|
||||
# Count nodes created on or before each date
|
||||
# Cumulative count: seed the running total with every node that already
|
||||
# existed before the window, then add each day's new nodes as we walk it.
|
||||
# This replaces a per-day COUNT(*) loop (up to 90 full scans) with two
|
||||
# queries.
|
||||
baseline = (
|
||||
session.execute(
|
||||
select(func.count()).select_from(Node).where(Node.created_at < start_date)
|
||||
).scalar()
|
||||
or 0
|
||||
)
|
||||
|
||||
# New nodes per calendar day within the window.
|
||||
date_expr = func.date(Node.created_at)
|
||||
per_day_query = (
|
||||
select(date_expr.label("date"), func.count().label("count"))
|
||||
.where(Node.created_at >= start_date)
|
||||
.where(Node.created_at < end_date)
|
||||
.group_by(date_expr)
|
||||
)
|
||||
new_by_date: dict[str, int] = {
|
||||
row[0]: row[1] for row in session.execute(per_day_query).all()
|
||||
}
|
||||
|
||||
data = []
|
||||
running = baseline
|
||||
for i in range(days):
|
||||
date = start_date + timedelta(days=i)
|
||||
end_of_day = date.replace(hour=23, minute=59, second=59, microsecond=999999)
|
||||
date_str = date.strftime("%Y-%m-%d")
|
||||
|
||||
# Count nodes created on or before this date
|
||||
count = (
|
||||
session.execute(
|
||||
select(func.count())
|
||||
.select_from(Node)
|
||||
.where(Node.created_at <= end_of_day)
|
||||
).scalar()
|
||||
or 0
|
||||
)
|
||||
data.append(DailyActivityPoint(date=date_str, count=count))
|
||||
running += new_by_date.get(date_str, 0)
|
||||
data.append(DailyActivityPoint(date=date_str, count=running))
|
||||
|
||||
return NodeCountHistory(days=days, data=data)
|
||||
|
||||
@@ -15,8 +15,11 @@ from meshcore_hub.api.channel_visibility import (
|
||||
resolve_user_role,
|
||||
)
|
||||
from meshcore_hub.api.dependencies import DbSession
|
||||
from meshcore_hub.api.observer_utils import fetch_observers_for_events
|
||||
from meshcore_hub.common.models import Message, Node, NodeTag
|
||||
from meshcore_hub.api.observer_utils import (
|
||||
fetch_observers_for_events,
|
||||
resolve_sender_names,
|
||||
)
|
||||
from meshcore_hub.common.models import Message, Node
|
||||
from meshcore_hub.common.schemas.messages import MessageList, MessageRead
|
||||
|
||||
router = APIRouter()
|
||||
@@ -141,28 +144,7 @@ def list_messages(
|
||||
|
||||
# Look up sender names and tag names for senders with pubkey_prefix
|
||||
pubkey_prefixes = [r[0].pubkey_prefix for r in results if r[0].pubkey_prefix]
|
||||
sender_names: dict[str, str] = {}
|
||||
sender_tag_names: dict[str, str] = {}
|
||||
if pubkey_prefixes:
|
||||
# Find nodes whose public_key starts with any of these prefixes
|
||||
for prefix in set(pubkey_prefixes):
|
||||
# Get node name
|
||||
node_query = select(Node.public_key, Node.name).where(
|
||||
Node.public_key.startswith(prefix)
|
||||
)
|
||||
for public_key, name in session.execute(node_query).all():
|
||||
if name:
|
||||
sender_names[public_key[:12]] = name
|
||||
|
||||
# Get name tag
|
||||
tag_name_query = (
|
||||
select(Node.public_key, NodeTag.value)
|
||||
.join(NodeTag, Node.id == NodeTag.node_id)
|
||||
.where(Node.public_key.startswith(prefix))
|
||||
.where(NodeTag.key == "name")
|
||||
)
|
||||
for public_key, value in session.execute(tag_name_query).all():
|
||||
sender_tag_names[public_key[:12]] = value
|
||||
sender_names, sender_tag_names = resolve_sender_names(session, pubkey_prefixes)
|
||||
|
||||
# Collect receiver node IDs to fetch tags
|
||||
observer_ids = set()
|
||||
|
||||
@@ -291,6 +291,44 @@ class TestNodeCountHistory:
|
||||
# The last day should have count >= 1
|
||||
assert data["data"][-1]["count"] >= 1
|
||||
|
||||
def test_node_count_is_cumulative_with_baseline(
    self, client_no_auth, api_db_session
):
    """The node-count series is a running total seeded with older nodes.

    A node created before the requested window must already be counted on
    the first day, and a node created inside the window must raise the
    total for the remainder of the series.
    """
    reference = datetime.now(timezone.utc)

    # 60 days old: outside a 30-day window, so it belongs to the baseline
    # that every point in the series includes.
    old_node = Node(
        public_key="a" * 64,
        name="Old Node",
        created_at=reference - timedelta(days=60),
    )
    # 5 days old: inside the window, so it bumps the running total on its
    # creation day and stays counted afterwards.
    recent_node = Node(
        public_key="b" * 64,
        name="Recent Node",
        created_at=reference - timedelta(days=5),
    )
    api_db_session.add(old_node)
    api_db_session.add(recent_node)
    api_db_session.commit()

    response = client_no_auth.get("/api/v1/dashboard/node-count?days=30")
    assert response.status_code == 200

    series = response.json()["data"]
    counts = [point["count"] for point in series]

    # The baseline node is present from the very first day.
    assert counts[0] == 1
    # A cumulative series never decreases.
    assert sorted(counts) == counts
    # The recent node lifts the total to 2 by the end of the window.
    assert counts[-1] == 2
    # Both the pre-step and post-step totals appear in the series.
    assert 2 in counts and 1 in counts
|
||||
|
||||
|
||||
class TestDashboardTestUserExclusion:
|
||||
"""Tests for test user exclusion from dashboard stats."""
|
||||
|
||||
@@ -73,6 +73,44 @@ class TestListMessages:
|
||||
assert len(data["items"]) == 1
|
||||
assert data["items"][0]["sender_name"] == "SenderNode"
|
||||
|
||||
def test_list_messages_resolves_multiple_distinct_senders(
    self, client_no_auth, api_db_session
):
    """Two senders with different prefixes each get their own node name.

    Guards the batched sender lookup: one request listing messages from
    both senders must attribute each message to the right node.
    """
    alice_key = "aa" + "0" * 62
    bob_key = "bb" + "1" * 62
    api_db_session.add_all(
        [
            Node(public_key=alice_key, name="Alice"),
            Node(public_key=bob_key, name="Bob"),
        ]
    )
    api_db_session.commit()

    received = datetime.now(timezone.utc)
    # Each message carries only the first 12 characters of its sender's key.
    from_alice = Message(
        message_type="contact",
        pubkey_prefix=alice_key[:12],
        text="from alice",
        received_at=received,
    )
    from_bob = Message(
        message_type="contact",
        pubkey_prefix=bob_key[:12],
        text="from bob",
        received_at=received,
    )
    api_db_session.add_all([from_alice, from_bob])
    api_db_session.commit()

    response = client_no_auth.get("/api/v1/messages")
    assert response.status_code == 200

    # Index the resolved sender names by message text for order-independent
    # assertions.
    sender_by_text = {}
    for item in response.json()["items"]:
        sender_by_text[item["text"]] = item["sender_name"]

    assert sender_by_text["from alice"] == "Alice"
    assert sender_by_text["from bob"] == "Bob"
|
||||
|
||||
def test_list_messages_sender_tag_name_resolution(
|
||||
self, client_no_auth, api_db_session
|
||||
):
|
||||
|
||||
Reference in New Issue
Block a user