mirror of
https://github.com/ipnet-mesh/meshcore-hub.git
synced 2026-03-28 17:42:56 +01:00
Updates
This commit is contained in:
63
alembic/versions/20241206_0002_004_event_receivers.py
Normal file
63
alembic/versions/20241206_0002_004_event_receivers.py
Normal file
@@ -0,0 +1,63 @@
|
||||
"""Add event_receivers junction table for multi-receiver tracking
|
||||
|
||||
Revision ID: 004
|
||||
Revises: 003
|
||||
Create Date: 2024-12-06
|
||||
|
||||
"""
|
||||
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "004"
|
||||
down_revision: Union[str, None] = "003"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Create the event_receivers junction table and its lookup indexes.

    One row per (event_hash, receiver_node_id) pair; the unique constraint
    enforces that a receiver is recorded at most once per event.
    """
    op.create_table(
        "event_receivers",
        sa.Column("id", sa.String(36), primary_key=True),
        sa.Column("event_type", sa.String(20), nullable=False),
        sa.Column("event_hash", sa.String(32), nullable=False),
        sa.Column(
            "receiver_node_id",
            sa.String(36),
            sa.ForeignKey("nodes.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("snr", sa.Float, nullable=True),
        sa.Column("received_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
        sa.UniqueConstraint(
            "event_hash", "receiver_node_id", name="uq_event_receivers_hash_node"
        ),
    )

    # Lookup indexes: by event hash, by receiver node, and by (type, hash).
    for index_name, columns in (
        ("ix_event_receivers_event_hash", ["event_hash"]),
        ("ix_event_receivers_receiver_node_id", ["receiver_node_id"]),
        ("ix_event_receivers_type_hash", ["event_type", "event_hash"]),
    ):
        op.create_index(index_name, "event_receivers", columns)
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Drop the event_receivers table, removing its indexes first."""
    # Drop indexes in reverse creation order before dropping the table.
    for index_name in (
        "ix_event_receivers_type_hash",
        "ix_event_receivers_receiver_node_id",
        "ix_event_receivers_event_hash",
    ):
        op.drop_index(index_name, table_name="event_receivers")
    op.drop_table("event_receivers")
|
||||
@@ -9,8 +9,12 @@ from sqlalchemy.orm import aliased, selectinload
|
||||
|
||||
from meshcore_hub.api.auth import RequireRead
|
||||
from meshcore_hub.api.dependencies import DbSession
|
||||
from meshcore_hub.common.models import Advertisement, Node
|
||||
from meshcore_hub.common.schemas.messages import AdvertisementList, AdvertisementRead
|
||||
from meshcore_hub.common.models import Advertisement, EventReceiver, Node, NodeTag
|
||||
from meshcore_hub.common.schemas.messages import (
|
||||
AdvertisementList,
|
||||
AdvertisementRead,
|
||||
ReceiverInfo,
|
||||
)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@@ -25,6 +29,62 @@ def _get_friendly_name(node: Optional[Node]) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
def _fetch_receivers_for_events(
    session: DbSession,
    event_type: str,
    event_hashes: list[str],
) -> dict[str, list[ReceiverInfo]]:
    """Fetch receiver info for a list of events by their hashes.

    Returns a mapping of event_hash -> receivers ordered by received_at.
    """
    if not event_hashes:
        return {}

    # Join event_receivers with the receiving node's details in one query,
    # ordered so that the earliest receiver appears first per event.
    rows = session.execute(
        select(
            EventReceiver.event_hash,
            EventReceiver.snr,
            EventReceiver.received_at,
            Node.id.label("node_id"),
            Node.public_key,
            Node.name,
        )
        .join(Node, EventReceiver.receiver_node_id == Node.id)
        .where(EventReceiver.event_type == event_type)
        .where(EventReceiver.event_hash.in_(event_hashes))
        .order_by(EventReceiver.received_at)
    ).all()

    # Resolve "friendly_name" tags for all receiver nodes in a single query.
    friendly_names: dict[str, str] = {}
    receiver_ids = [row.node_id for row in rows]
    if receiver_ids:
        tag_rows = session.execute(
            select(NodeTag.node_id, NodeTag.value)
            .where(NodeTag.node_id.in_(receiver_ids))
            .where(NodeTag.key == "friendly_name")
        ).all()
        friendly_names = dict(tag_rows)

    # Group the joined rows by event hash.
    grouped: dict[str, list[ReceiverInfo]] = {}
    for row in rows:
        grouped.setdefault(row.event_hash, []).append(
            ReceiverInfo(
                node_id=row.node_id,
                public_key=row.public_key,
                name=row.name,
                friendly_name=friendly_names.get(row.node_id),
                snr=row.snr,
                received_at=row.received_at,
            )
        )

    return grouped
|
||||
|
||||
|
||||
@router.get("", response_model=AdvertisementList)
|
||||
async def list_advertisements(
|
||||
_: RequireRead,
|
||||
@@ -97,6 +157,12 @@ async def list_advertisements(
|
||||
nodes = session.execute(nodes_query).scalars().all()
|
||||
nodes_by_id = {n.id: n for n in nodes}
|
||||
|
||||
# Fetch all receivers for these advertisements
|
||||
event_hashes = [r[0].event_hash for r in results if r[0].event_hash]
|
||||
receivers_by_hash = _fetch_receivers_for_events(
|
||||
session, "advertisement", event_hashes
|
||||
)
|
||||
|
||||
# Build response with node details
|
||||
items = []
|
||||
for row in results:
|
||||
@@ -116,6 +182,9 @@ async def list_advertisements(
|
||||
"flags": adv.flags,
|
||||
"received_at": adv.received_at,
|
||||
"created_at": adv.created_at,
|
||||
"receivers": (
|
||||
receivers_by_hash.get(adv.event_hash, []) if adv.event_hash else []
|
||||
),
|
||||
}
|
||||
items.append(AdvertisementRead(**data))
|
||||
|
||||
@@ -175,6 +244,14 @@ async def get_advertisement(
|
||||
receiver_node = nodes_by_id.get(result.receiver_id) if result.receiver_id else None
|
||||
source_node = nodes_by_id.get(result.source_id) if result.source_id else None
|
||||
|
||||
# Fetch receivers for this advertisement
|
||||
receivers = []
|
||||
if adv.event_hash:
|
||||
receivers_by_hash = _fetch_receivers_for_events(
|
||||
session, "advertisement", [adv.event_hash]
|
||||
)
|
||||
receivers = receivers_by_hash.get(adv.event_hash, [])
|
||||
|
||||
data = {
|
||||
"received_by": result.receiver_pk,
|
||||
"receiver_name": result.receiver_name,
|
||||
@@ -187,5 +264,6 @@ async def get_advertisement(
|
||||
"flags": adv.flags,
|
||||
"received_at": adv.received_at,
|
||||
"created_at": adv.created_at,
|
||||
"receivers": receivers,
|
||||
}
|
||||
return AdvertisementRead(**data)
|
||||
|
||||
@@ -9,8 +9,8 @@ from sqlalchemy.orm import aliased, selectinload
|
||||
|
||||
from meshcore_hub.api.auth import RequireRead
|
||||
from meshcore_hub.api.dependencies import DbSession
|
||||
from meshcore_hub.common.models import Message, Node, NodeTag
|
||||
from meshcore_hub.common.schemas.messages import MessageList, MessageRead
|
||||
from meshcore_hub.common.models import EventReceiver, Message, Node, NodeTag
|
||||
from meshcore_hub.common.schemas.messages import MessageList, MessageRead, ReceiverInfo
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@@ -25,6 +25,75 @@ def _get_friendly_name(node: Optional[Node]) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
def _fetch_receivers_for_events(
    session: DbSession,
    event_type: str,
    event_hashes: list[str],
) -> dict[str, list[ReceiverInfo]]:
    """Fetch receiver info for a list of events by their hashes.

    Args:
        session: Database session
        event_type: Type of event ('message', 'advertisement', etc.)
        event_hashes: List of event hashes to fetch receivers for

    Returns:
        Dict mapping event_hash to list of ReceiverInfo objects, each list
        ordered by received_at (earliest receiver first).
    """
    if not event_hashes:
        return {}

    # Single joined query: receiver entries plus their node's identity.
    rows = session.execute(
        select(
            EventReceiver.event_hash,
            EventReceiver.snr,
            EventReceiver.received_at,
            Node.id.label("node_id"),
            Node.public_key,
            Node.name,
        )
        .join(Node, EventReceiver.receiver_node_id == Node.id)
        .where(EventReceiver.event_type == event_type)
        .where(EventReceiver.event_hash.in_(event_hashes))
        .order_by(EventReceiver.received_at)
    ).all()

    # Batch-resolve "friendly_name" tags for every receiver node seen above.
    friendly_names: dict[str, str] = {}
    receiver_ids = [row.node_id for row in rows]
    if receiver_ids:
        tag_rows = session.execute(
            select(NodeTag.node_id, NodeTag.value)
            .where(NodeTag.node_id.in_(receiver_ids))
            .where(NodeTag.key == "friendly_name")
        ).all()
        friendly_names = dict(tag_rows)

    # Group rows by event hash, preserving the query's received_at ordering.
    grouped: dict[str, list[ReceiverInfo]] = {}
    for row in rows:
        grouped.setdefault(row.event_hash, []).append(
            ReceiverInfo(
                node_id=row.node_id,
                public_key=row.public_key,
                name=row.name,
                friendly_name=friendly_names.get(row.node_id),
                snr=row.snr,
                received_at=row.received_at,
            )
        )

    return grouped
|
||||
|
||||
|
||||
@router.get("", response_model=MessageList)
|
||||
async def list_messages(
|
||||
_: RequireRead,
|
||||
@@ -126,6 +195,10 @@ async def list_messages(
|
||||
receivers = session.execute(receivers_query).scalars().all()
|
||||
receivers_by_id = {n.id: n for n in receivers}
|
||||
|
||||
# Fetch all receivers for these messages
|
||||
event_hashes = [r[0].event_hash for r in results if r[0].event_hash]
|
||||
receivers_by_hash = _fetch_receivers_for_events(session, "message", event_hashes)
|
||||
|
||||
# Build response with sender info and received_by
|
||||
items = []
|
||||
for row in results:
|
||||
@@ -159,6 +232,9 @@ async def list_messages(
|
||||
"sender_timestamp": m.sender_timestamp,
|
||||
"received_at": m.received_at,
|
||||
"created_at": m.created_at,
|
||||
"receivers": (
|
||||
receivers_by_hash.get(m.event_hash, []) if m.event_hash else []
|
||||
),
|
||||
}
|
||||
items.append(MessageRead(**msg_dict))
|
||||
|
||||
@@ -189,6 +265,15 @@ async def get_message(
|
||||
raise HTTPException(status_code=404, detail="Message not found")
|
||||
|
||||
message, receiver_pk = result
|
||||
|
||||
# Fetch receivers for this message
|
||||
receivers = []
|
||||
if message.event_hash:
|
||||
receivers_by_hash = _fetch_receivers_for_events(
|
||||
session, "message", [message.event_hash]
|
||||
)
|
||||
receivers = receivers_by_hash.get(message.event_hash, [])
|
||||
|
||||
data = {
|
||||
"id": message.id,
|
||||
"receiver_node_id": message.receiver_node_id,
|
||||
@@ -204,5 +289,6 @@ async def get_message(
|
||||
"sender_timestamp": message.sender_timestamp,
|
||||
"received_at": message.received_at,
|
||||
"created_at": message.created_at,
|
||||
"receivers": receivers,
|
||||
}
|
||||
return MessageRead(**data)
|
||||
|
||||
@@ -8,7 +8,7 @@ from sqlalchemy import select
|
||||
|
||||
from meshcore_hub.common.database import DatabaseManager
|
||||
from meshcore_hub.common.hash_utils import compute_advertisement_hash
|
||||
from meshcore_hub.common.models import Advertisement, Node
|
||||
from meshcore_hub.common.models import Advertisement, Node, add_event_receiver
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -51,21 +51,7 @@ def handle_advertisement(
|
||||
)
|
||||
|
||||
with db.session_scope() as session:
|
||||
# Check if advertisement with same hash already exists
|
||||
existing = session.execute(
|
||||
select(Advertisement.id).where(Advertisement.event_hash == event_hash)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if existing:
|
||||
logger.debug(f"Duplicate advertisement skipped (hash={event_hash[:8]}...)")
|
||||
# Still update node last_seen even for duplicate advertisements
|
||||
node_query = select(Node).where(Node.public_key == adv_public_key)
|
||||
node = session.execute(node_query).scalar_one_or_none()
|
||||
if node:
|
||||
node.last_seen = now
|
||||
return
|
||||
|
||||
# Find or create receiver node
|
||||
# Find or create receiver node first (needed for both new and duplicate events)
|
||||
receiver_node = None
|
||||
if public_key:
|
||||
receiver_query = select(Node).where(Node.public_key == public_key)
|
||||
@@ -79,6 +65,37 @@ def handle_advertisement(
|
||||
)
|
||||
session.add(receiver_node)
|
||||
session.flush()
|
||||
else:
|
||||
receiver_node.last_seen = now
|
||||
|
||||
# Check if advertisement with same hash already exists
|
||||
existing = session.execute(
|
||||
select(Advertisement.id).where(Advertisement.event_hash == event_hash)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if existing:
|
||||
# Still update advertised node's last_seen even for duplicate advertisements
|
||||
node_query = select(Node).where(Node.public_key == adv_public_key)
|
||||
node = session.execute(node_query).scalar_one_or_none()
|
||||
if node:
|
||||
node.last_seen = now
|
||||
|
||||
# Add this receiver to the junction table
|
||||
if receiver_node:
|
||||
added = add_event_receiver(
|
||||
session=session,
|
||||
event_type="advertisement",
|
||||
event_hash=event_hash,
|
||||
receiver_node_id=receiver_node.id,
|
||||
snr=None, # Advertisements don't have SNR
|
||||
received_at=now,
|
||||
)
|
||||
if added:
|
||||
logger.debug(
|
||||
f"Added receiver {public_key[:12]}... to advertisement "
|
||||
f"(hash={event_hash[:8]}...)"
|
||||
)
|
||||
return
|
||||
|
||||
# Find or create advertised node
|
||||
node_query = select(Node).where(Node.public_key == adv_public_key)
|
||||
@@ -119,6 +136,17 @@ def handle_advertisement(
|
||||
)
|
||||
session.add(advertisement)
|
||||
|
||||
# Add first receiver to junction table
|
||||
if receiver_node:
|
||||
add_event_receiver(
|
||||
session=session,
|
||||
event_type="advertisement",
|
||||
event_hash=event_hash,
|
||||
receiver_node_id=receiver_node.id,
|
||||
snr=None,
|
||||
received_at=now,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Stored advertisement from {name or adv_public_key[:12]!r} "
|
||||
f"(type={adv_type})"
|
||||
|
||||
@@ -8,7 +8,7 @@ from sqlalchemy import select
|
||||
|
||||
from meshcore_hub.common.database import DatabaseManager
|
||||
from meshcore_hub.common.hash_utils import compute_message_hash
|
||||
from meshcore_hub.common.models import Message, Node
|
||||
from meshcore_hub.common.models import Message, Node, add_event_receiver
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -95,16 +95,7 @@ def _handle_message(
|
||||
)
|
||||
|
||||
with db.session_scope() as session:
|
||||
# Check if message with same hash already exists
|
||||
existing = session.execute(
|
||||
select(Message.id).where(Message.event_hash == event_hash)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if existing:
|
||||
logger.debug(f"Duplicate message skipped (hash={event_hash[:8]}...)")
|
||||
return
|
||||
|
||||
# Find receiver node
|
||||
# Find or create receiver node first (needed for both new and duplicate events)
|
||||
receiver_node = None
|
||||
if public_key:
|
||||
receiver_query = select(Node).where(Node.public_key == public_key)
|
||||
@@ -121,6 +112,29 @@ def _handle_message(
|
||||
else:
|
||||
receiver_node.last_seen = now
|
||||
|
||||
# Check if message with same hash already exists
|
||||
existing = session.execute(
|
||||
select(Message.id).where(Message.event_hash == event_hash)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if existing:
|
||||
# Event already exists - just add this receiver to the junction table
|
||||
if receiver_node:
|
||||
added = add_event_receiver(
|
||||
session=session,
|
||||
event_type="message",
|
||||
event_hash=event_hash,
|
||||
receiver_node_id=receiver_node.id,
|
||||
snr=snr,
|
||||
received_at=now,
|
||||
)
|
||||
if added:
|
||||
logger.debug(
|
||||
f"Added receiver {public_key[:12]}... to message "
|
||||
f"(hash={event_hash[:8]}...)"
|
||||
)
|
||||
return
|
||||
|
||||
# Create message record
|
||||
message = Message(
|
||||
receiver_node_id=receiver_node.id if receiver_node else None,
|
||||
@@ -138,6 +152,17 @@ def _handle_message(
|
||||
)
|
||||
session.add(message)
|
||||
|
||||
# Add first receiver to junction table
|
||||
if receiver_node:
|
||||
add_event_receiver(
|
||||
session=session,
|
||||
event_type="message",
|
||||
event_hash=event_hash,
|
||||
receiver_node_id=receiver_node.id,
|
||||
snr=snr,
|
||||
received_at=now,
|
||||
)
|
||||
|
||||
if message_type == "contact":
|
||||
logger.info(
|
||||
f"Stored contact message from {pubkey_prefix!r}: "
|
||||
|
||||
@@ -8,7 +8,7 @@ from sqlalchemy import select
|
||||
|
||||
from meshcore_hub.common.database import DatabaseManager
|
||||
from meshcore_hub.common.hash_utils import compute_telemetry_hash
|
||||
from meshcore_hub.common.models import Node, Telemetry
|
||||
from meshcore_hub.common.models import Node, Telemetry, add_event_receiver
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -58,18 +58,7 @@ def handle_telemetry(
|
||||
)
|
||||
|
||||
with db.session_scope() as session:
|
||||
# Check if telemetry with same hash already exists
|
||||
existing = session.execute(
|
||||
select(Telemetry.id).where(Telemetry.event_hash == event_hash)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if existing:
|
||||
logger.debug(
|
||||
f"Duplicate telemetry skipped (node={node_public_key[:12]}...)"
|
||||
)
|
||||
return
|
||||
|
||||
# Find receiver node
|
||||
# Find or create receiver node first (needed for both new and duplicate events)
|
||||
receiver_node = None
|
||||
if public_key:
|
||||
receiver_query = select(Node).where(Node.public_key == public_key)
|
||||
@@ -86,6 +75,29 @@ def handle_telemetry(
|
||||
else:
|
||||
receiver_node.last_seen = now
|
||||
|
||||
# Check if telemetry with same hash already exists
|
||||
existing = session.execute(
|
||||
select(Telemetry.id).where(Telemetry.event_hash == event_hash)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if existing:
|
||||
# Event already exists - just add this receiver to the junction table
|
||||
if receiver_node:
|
||||
added = add_event_receiver(
|
||||
session=session,
|
||||
event_type="telemetry",
|
||||
event_hash=event_hash,
|
||||
receiver_node_id=receiver_node.id,
|
||||
snr=None,
|
||||
received_at=now,
|
||||
)
|
||||
if added:
|
||||
logger.debug(
|
||||
f"Added receiver {public_key[:12]}... to telemetry "
|
||||
f"(node={node_public_key[:12]}...)"
|
||||
)
|
||||
return
|
||||
|
||||
# Find or create reporting node
|
||||
reporting_node = None
|
||||
if node_public_key:
|
||||
@@ -115,6 +127,17 @@ def handle_telemetry(
|
||||
)
|
||||
session.add(telemetry)
|
||||
|
||||
# Add first receiver to junction table
|
||||
if receiver_node:
|
||||
add_event_receiver(
|
||||
session=session,
|
||||
event_type="telemetry",
|
||||
event_hash=event_hash,
|
||||
receiver_node_id=receiver_node.id,
|
||||
snr=None,
|
||||
received_at=now,
|
||||
)
|
||||
|
||||
# Log telemetry values
|
||||
if parsed_data:
|
||||
values = ", ".join(f"{k}={v}" for k, v in parsed_data.items())
|
||||
|
||||
@@ -8,7 +8,7 @@ from sqlalchemy import select
|
||||
|
||||
from meshcore_hub.common.database import DatabaseManager
|
||||
from meshcore_hub.common.hash_utils import compute_trace_hash
|
||||
from meshcore_hub.common.models import Node, TracePath
|
||||
from meshcore_hub.common.models import Node, TracePath, add_event_receiver
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -45,16 +45,7 @@ def handle_trace_data(
|
||||
event_hash = compute_trace_hash(initiator_tag=initiator_tag)
|
||||
|
||||
with db.session_scope() as session:
|
||||
# Check if trace with same hash already exists
|
||||
existing = session.execute(
|
||||
select(TracePath.id).where(TracePath.event_hash == event_hash)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if existing:
|
||||
logger.debug(f"Duplicate trace skipped (tag={initiator_tag})")
|
||||
return
|
||||
|
||||
# Find receiver node
|
||||
# Find or create receiver node first (needed for both new and duplicate events)
|
||||
receiver_node = None
|
||||
if public_key:
|
||||
receiver_query = select(Node).where(Node.public_key == public_key)
|
||||
@@ -71,6 +62,29 @@ def handle_trace_data(
|
||||
else:
|
||||
receiver_node.last_seen = now
|
||||
|
||||
# Check if trace with same hash already exists
|
||||
existing = session.execute(
|
||||
select(TracePath.id).where(TracePath.event_hash == event_hash)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if existing:
|
||||
# Event already exists - just add this receiver to the junction table
|
||||
if receiver_node:
|
||||
added = add_event_receiver(
|
||||
session=session,
|
||||
event_type="trace",
|
||||
event_hash=event_hash,
|
||||
receiver_node_id=receiver_node.id,
|
||||
snr=None, # Trace events don't have a single SNR value
|
||||
received_at=now,
|
||||
)
|
||||
if added:
|
||||
logger.debug(
|
||||
f"Added receiver {public_key[:12]}... to trace "
|
||||
f"(tag={initiator_tag})"
|
||||
)
|
||||
return
|
||||
|
||||
# Create trace path record
|
||||
trace_path = TracePath(
|
||||
receiver_node_id=receiver_node.id if receiver_node else None,
|
||||
@@ -86,4 +100,15 @@ def handle_trace_data(
|
||||
)
|
||||
session.add(trace_path)
|
||||
|
||||
# Add first receiver to junction table
|
||||
if receiver_node:
|
||||
add_event_receiver(
|
||||
session=session,
|
||||
event_type="trace",
|
||||
event_hash=event_hash,
|
||||
receiver_node_id=receiver_node.id,
|
||||
snr=None,
|
||||
received_at=now,
|
||||
)
|
||||
|
||||
logger.info(f"Stored trace data: tag={initiator_tag}, hops={hop_count}")
|
||||
|
||||
@@ -10,6 +10,7 @@ from meshcore_hub.common.models.telemetry import Telemetry
|
||||
from meshcore_hub.common.models.event_log import EventLog
|
||||
from meshcore_hub.common.models.member import Member
|
||||
from meshcore_hub.common.models.member_node import MemberNode
|
||||
from meshcore_hub.common.models.event_receiver import EventReceiver, add_event_receiver
|
||||
|
||||
__all__ = [
|
||||
"Base",
|
||||
@@ -23,4 +24,6 @@ __all__ = [
|
||||
"EventLog",
|
||||
"Member",
|
||||
"MemberNode",
|
||||
"EventReceiver",
|
||||
"add_event_receiver",
|
||||
]
|
||||
|
||||
127
src/meshcore_hub/common/models/event_receiver.py
Normal file
127
src/meshcore_hub/common/models/event_receiver.py
Normal file
@@ -0,0 +1,127 @@
|
||||
"""EventReceiver model for tracking which nodes received each event."""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from uuid import uuid4
|
||||
|
||||
from sqlalchemy import DateTime, Float, ForeignKey, Index, String, UniqueConstraint
|
||||
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
|
||||
from sqlalchemy.orm import Mapped, Session, mapped_column, relationship
|
||||
|
||||
from meshcore_hub.common.models.base import Base, TimestampMixin, UUIDMixin, utc_now
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from meshcore_hub.common.models.node import Node
|
||||
|
||||
|
||||
class EventReceiver(Base, UUIDMixin, TimestampMixin):
    """Junction model tracking which receivers observed each event.

    This table enables multi-receiver tracking for deduplicated events.
    When multiple receiver nodes observe the same mesh event, each receiver
    gets an entry in this table linked by the event_hash.

    Attributes:
        id: UUID primary key
        event_type: Type of event ('message', 'advertisement', 'trace', 'telemetry')
        event_hash: Hash identifying the unique event (links to event tables)
        receiver_node_id: FK to the node that received this event
        snr: Signal-to-noise ratio at this receiver (if available)
        received_at: When this specific receiver saw the event
        created_at: Record creation timestamp
        updated_at: Record update timestamp
    """

    __tablename__ = "event_receivers"

    event_type: Mapped[str] = mapped_column(
        String(20),
        nullable=False,
    )
    event_hash: Mapped[str] = mapped_column(
        String(32),
        nullable=False,
        index=True,
    )
    # Deleting a node cascades to its event_receivers rows.
    receiver_node_id: Mapped[str] = mapped_column(
        String(36),
        ForeignKey("nodes.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    snr: Mapped[Optional[float]] = mapped_column(
        Float,
        nullable=True,
    )
    received_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=utc_now,
        nullable=False,
    )

    # Relationship to receiver node
    receiver_node: Mapped["Node"] = relationship(
        "Node",
        foreign_keys=[receiver_node_id],
    )

    # At most one row per (event, receiver); the composite index backs
    # lookups filtered by event_type + event_hash.
    __table_args__ = (
        UniqueConstraint(
            "event_hash", "receiver_node_id", name="uq_event_receivers_hash_node"
        ),
        Index("ix_event_receivers_type_hash", "event_type", "event_hash"),
    )

    def __repr__(self) -> str:
        # Truncate hash and node id so log lines stay readable.
        return (
            f"<EventReceiver(type={self.event_type}, "
            f"hash={self.event_hash[:8]}..., "
            f"node={self.receiver_node_id[:8]}...)>"
        )
|
||||
|
||||
|
||||
def add_event_receiver(
    session: Session,
    event_type: str,
    event_hash: str,
    receiver_node_id: str,
    snr: Optional[float] = None,
    received_at: Optional[datetime] = None,
) -> bool:
    """Add a receiver to an event, handling duplicates gracefully.

    Uses INSERT OR IGNORE semantics against the unique constraint on
    (event_hash, receiver_node_id), so re-reporting the same receiver
    for the same event is a no-op.

    Args:
        session: SQLAlchemy session
        event_type: Type of event ('message', 'advertisement', 'trace', 'telemetry')
        event_hash: Hash identifying the unique event
        receiver_node_id: UUID of the receiver node
        snr: Signal-to-noise ratio at this receiver (optional)
        received_at: When this receiver saw the event (defaults to now)

    Returns:
        True if a new receiver entry was added, False if it already existed.
    """
    from datetime import timezone

    timestamp = received_at if received_at is not None else datetime.now(timezone.utc)

    insert_stmt = (
        sqlite_insert(EventReceiver)
        .values(
            id=str(uuid4()),
            event_type=event_type,
            event_hash=event_hash,
            receiver_node_id=receiver_node_id,
            snr=snr,
            received_at=timestamp,
            created_at=timestamp,
            updated_at=timestamp,
        )
        .on_conflict_do_nothing(index_elements=["event_hash", "receiver_node_id"])
    )

    outcome = session.execute(insert_stmt)
    # CursorResult exposes rowcount; fall back to 0 defensively if absent.
    inserted_rows = getattr(outcome, "rowcount", 0)
    return bool(inserted_rows and inserted_rows > 0)
|
||||
@@ -20,6 +20,7 @@ from meshcore_hub.common.schemas.nodes import (
|
||||
NodeTagRead,
|
||||
)
|
||||
from meshcore_hub.common.schemas.messages import (
|
||||
ReceiverInfo,
|
||||
MessageRead,
|
||||
MessageList,
|
||||
MessageFilters,
|
||||
@@ -57,7 +58,8 @@ __all__ = [
|
||||
"NodeTagCreate",
|
||||
"NodeTagUpdate",
|
||||
"NodeTagRead",
|
||||
# Messages
|
||||
# Messages & Events
|
||||
"ReceiverInfo",
|
||||
"MessageRead",
|
||||
"MessageList",
|
||||
"MessageFilters",
|
||||
|
||||
@@ -6,6 +6,24 @@ from typing import Literal, Optional
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class ReceiverInfo(BaseModel):
    """Information about a receiver that observed an event.

    Embedded as the ``receivers`` list on the Message/Advertisement/Trace/
    Telemetry read schemas to expose every node that reported the same
    deduplicated event.
    """

    node_id: str = Field(..., description="Receiver node UUID")
    public_key: str = Field(..., description="Receiver node public key")
    name: Optional[str] = Field(default=None, description="Receiver node name")
    friendly_name: Optional[str] = Field(
        default=None, description="Receiver friendly name from tags"
    )
    snr: Optional[float] = Field(
        default=None, description="Signal-to-noise ratio at this receiver"
    )
    received_at: datetime = Field(..., description="When this receiver saw the event")

    # Allow construction directly from ORM objects / attribute access.
    class Config:
        from_attributes = True
|
||||
|
||||
|
||||
class MessageRead(BaseModel):
|
||||
"""Schema for reading a message."""
|
||||
|
||||
@@ -37,6 +55,9 @@ class MessageRead(BaseModel):
|
||||
)
|
||||
received_at: datetime = Field(..., description="When received by interface")
|
||||
created_at: datetime = Field(..., description="Record creation timestamp")
|
||||
receivers: list[ReceiverInfo] = Field(
|
||||
default_factory=list, description="All receivers that observed this message"
|
||||
)
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
@@ -104,6 +125,10 @@ class AdvertisementRead(BaseModel):
|
||||
flags: Optional[int] = Field(default=None, description="Capability flags")
|
||||
received_at: datetime = Field(..., description="When received")
|
||||
created_at: datetime = Field(..., description="Record creation timestamp")
|
||||
receivers: list[ReceiverInfo] = Field(
|
||||
default_factory=list,
|
||||
description="All receivers that observed this advertisement",
|
||||
)
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
@@ -137,6 +162,10 @@ class TracePathRead(BaseModel):
|
||||
hop_count: Optional[int] = Field(default=None, description="Total hops")
|
||||
received_at: datetime = Field(..., description="When received")
|
||||
created_at: datetime = Field(..., description="Record creation timestamp")
|
||||
receivers: list[ReceiverInfo] = Field(
|
||||
default_factory=list,
|
||||
description="All receivers that observed this trace",
|
||||
)
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
@@ -163,6 +192,10 @@ class TelemetryRead(BaseModel):
|
||||
)
|
||||
received_at: datetime = Field(..., description="When received")
|
||||
created_at: datetime = Field(..., description="Record creation timestamp")
|
||||
receivers: list[ReceiverInfo] = Field(
|
||||
default_factory=list,
|
||||
description="All receivers that observed this telemetry",
|
||||
)
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
@@ -71,7 +71,31 @@
|
||||
{% endif %}
|
||||
</td>
|
||||
<td>
|
||||
{% if ad.received_by %}
|
||||
{% if ad.receivers and ad.receivers|length > 1 %}
|
||||
<div class="dropdown dropdown-hover dropdown-end">
|
||||
<label tabindex="0" class="badge badge-outline badge-sm cursor-pointer">
|
||||
{{ ad.receivers|length }} receivers
|
||||
</label>
|
||||
<ul tabindex="0" class="dropdown-content z-[1] menu p-2 shadow bg-base-100 rounded-box w-56">
|
||||
{% for recv in ad.receivers %}
|
||||
<li>
|
||||
<a href="/nodes/{{ recv.public_key }}" class="text-sm">
|
||||
{{ recv.friendly_name or recv.name or recv.public_key[:12] + '...' }}
|
||||
</a>
|
||||
</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</div>
|
||||
{% elif ad.receivers and ad.receivers|length == 1 %}
|
||||
<a href="/nodes/{{ ad.receivers[0].public_key }}" class="link link-hover">
|
||||
{% if ad.receivers[0].friendly_name or ad.receivers[0].name %}
|
||||
<div class="font-medium">{{ ad.receivers[0].friendly_name or ad.receivers[0].name }}</div>
|
||||
<div class="text-xs font-mono opacity-70">{{ ad.receivers[0].public_key[:16] }}...</div>
|
||||
{% else %}
|
||||
<span class="font-mono text-sm">{{ ad.receivers[0].public_key[:16] }}...</span>
|
||||
{% endif %}
|
||||
</a>
|
||||
{% elif ad.received_by %}
|
||||
<a href="/nodes/{{ ad.received_by }}" class="link link-hover">
|
||||
{% if ad.receiver_friendly_name or ad.receiver_name %}
|
||||
<div class="font-medium">{{ ad.receiver_friendly_name or ad.receiver_name }}</div>
|
||||
|
||||
@@ -87,7 +87,34 @@
|
||||
</td>
|
||||
<td class="break-words max-w-md" style="white-space: pre-wrap;">{{ msg.text or '-' }}</td>
|
||||
<td>
|
||||
{% if msg.received_by %}
|
||||
{% if msg.receivers and msg.receivers|length > 1 %}
|
||||
<div class="dropdown dropdown-hover dropdown-end">
|
||||
<label tabindex="0" class="badge badge-outline badge-sm cursor-pointer">
|
||||
{{ msg.receivers|length }} receivers
|
||||
</label>
|
||||
<ul tabindex="0" class="dropdown-content z-[1] menu p-2 shadow bg-base-100 rounded-box w-56">
|
||||
{% for recv in msg.receivers %}
|
||||
<li>
|
||||
<a href="/nodes/{{ recv.public_key }}" class="text-sm">
|
||||
<span class="flex-1">{{ recv.friendly_name or recv.name or recv.public_key[:12] + '...' }}</span>
|
||||
{% if recv.snr is not none %}
|
||||
<span class="badge badge-ghost badge-xs">{{ "%.1f"|format(recv.snr) }}</span>
|
||||
{% endif %}
|
||||
</a>
|
||||
</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</div>
|
||||
{% elif msg.receivers and msg.receivers|length == 1 %}
|
||||
<a href="/nodes/{{ msg.receivers[0].public_key }}" class="link link-hover">
|
||||
{% if msg.receivers[0].friendly_name or msg.receivers[0].name %}
|
||||
<div class="font-medium">{{ msg.receivers[0].friendly_name or msg.receivers[0].name }}</div>
|
||||
<div class="text-xs font-mono opacity-70">{{ msg.receivers[0].public_key[:16] }}...</div>
|
||||
{% else %}
|
||||
<span class="font-mono text-sm">{{ msg.receivers[0].public_key[:16] }}...</span>
|
||||
{% endif %}
|
||||
</a>
|
||||
{% elif msg.received_by %}
|
||||
<a href="/nodes/{{ msg.received_by }}" class="link link-hover">
|
||||
{% if msg.receiver_friendly_name or msg.receiver_name %}
|
||||
<div class="font-medium">{{ msg.receiver_friendly_name or msg.receiver_name }}</div>
|
||||
|
||||
@@ -94,7 +94,7 @@ class TestComputeAdvertisementHash:
|
||||
|
||||
def test_same_content_same_bucket_produces_same_hash(self) -> None:
|
||||
"""Advertisements within the same time bucket should match."""
|
||||
# Two times within the same 5-minute bucket
|
||||
# Two times within the same 5-minute (300 second) bucket
|
||||
time1 = datetime(2024, 1, 15, 10, 31, 0, tzinfo=timezone.utc)
|
||||
time2 = datetime(2024, 1, 15, 10, 33, 0, tzinfo=timezone.utc)
|
||||
|
||||
@@ -104,7 +104,7 @@ class TestComputeAdvertisementHash:
|
||||
adv_type="chat",
|
||||
flags=128,
|
||||
received_at=time1,
|
||||
bucket_minutes=5,
|
||||
bucket_seconds=300, # 5 minutes
|
||||
)
|
||||
hash2 = compute_advertisement_hash(
|
||||
public_key="a" * 64,
|
||||
@@ -112,14 +112,14 @@ class TestComputeAdvertisementHash:
|
||||
adv_type="chat",
|
||||
flags=128,
|
||||
received_at=time2,
|
||||
bucket_minutes=5,
|
||||
bucket_seconds=300, # 5 minutes
|
||||
)
|
||||
|
||||
assert hash1 == hash2
|
||||
|
||||
def test_different_bucket_produces_different_hash(self) -> None:
|
||||
"""Advertisements in different time buckets should not match."""
|
||||
# Two times in different 5-minute buckets
|
||||
# Two times in different 5-minute (300 second) buckets
|
||||
time1 = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc)
|
||||
time2 = datetime(2024, 1, 15, 10, 36, 0, tzinfo=timezone.utc)
|
||||
|
||||
@@ -127,13 +127,13 @@ class TestComputeAdvertisementHash:
|
||||
public_key="a" * 64,
|
||||
name="Node1",
|
||||
received_at=time1,
|
||||
bucket_minutes=5,
|
||||
bucket_seconds=300, # 5 minutes
|
||||
)
|
||||
hash2 = compute_advertisement_hash(
|
||||
public_key="a" * 64,
|
||||
name="Node1",
|
||||
received_at=time2,
|
||||
bucket_minutes=5,
|
||||
bucket_seconds=300, # 5 minutes
|
||||
)
|
||||
|
||||
assert hash1 != hash2
|
||||
@@ -158,29 +158,29 @@ class TestComputeAdvertisementHash:
|
||||
time1 = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc)
|
||||
time2 = datetime(2024, 1, 15, 10, 35, 0, tzinfo=timezone.utc)
|
||||
|
||||
# With 5-minute bucket, these should be in different buckets
|
||||
# With 5-minute (300s) bucket, these should be in different buckets
|
||||
hash1_5min = compute_advertisement_hash(
|
||||
public_key="a" * 64,
|
||||
received_at=time1,
|
||||
bucket_minutes=5,
|
||||
bucket_seconds=300, # 5 minutes
|
||||
)
|
||||
hash2_5min = compute_advertisement_hash(
|
||||
public_key="a" * 64,
|
||||
received_at=time2,
|
||||
bucket_minutes=5,
|
||||
bucket_seconds=300, # 5 minutes
|
||||
)
|
||||
assert hash1_5min != hash2_5min
|
||||
|
||||
# With 10-minute bucket, these should be in the same bucket
|
||||
# With 10-minute (600s) bucket, these should be in the same bucket
|
||||
hash1_10min = compute_advertisement_hash(
|
||||
public_key="a" * 64,
|
||||
received_at=time1,
|
||||
bucket_minutes=10,
|
||||
bucket_seconds=600, # 10 minutes
|
||||
)
|
||||
hash2_10min = compute_advertisement_hash(
|
||||
public_key="a" * 64,
|
||||
received_at=time2,
|
||||
bucket_minutes=10,
|
||||
bucket_seconds=600, # 10 minutes
|
||||
)
|
||||
assert hash1_10min == hash2_10min
|
||||
|
||||
@@ -216,13 +216,13 @@ class TestComputeTelemetryHash:
|
||||
node_public_key="a" * 64,
|
||||
parsed_data=data,
|
||||
received_at=time1,
|
||||
bucket_minutes=5,
|
||||
bucket_seconds=300, # 5 minutes
|
||||
)
|
||||
hash2 = compute_telemetry_hash(
|
||||
node_public_key="a" * 64,
|
||||
parsed_data=data,
|
||||
received_at=time2,
|
||||
bucket_minutes=5,
|
||||
bucket_seconds=300, # 5 minutes
|
||||
)
|
||||
|
||||
assert hash1 == hash2
|
||||
|
||||
Reference in New Issue
Block a user