From d715e4e4f0ffff2d2885b68ed163cc7fde637095 Mon Sep 17 00:00:00 2001 From: Louis King Date: Sat, 6 Dec 2025 13:33:02 +0000 Subject: [PATCH] Updates --- .../20241206_0002_004_event_receivers.py | 63 +++++++++ src/meshcore_hub/api/routes/advertisements.py | 82 ++++++++++- src/meshcore_hub/api/routes/messages.py | 90 ++++++++++++- .../collector/handlers/advertisement.py | 60 ++++++--- .../collector/handlers/message.py | 47 +++++-- .../collector/handlers/telemetry.py | 49 +++++-- src/meshcore_hub/collector/handlers/trace.py | 47 +++++-- src/meshcore_hub/common/models/__init__.py | 3 + .../common/models/event_receiver.py | 127 ++++++++++++++++++ src/meshcore_hub/common/schemas/__init__.py | 4 +- src/meshcore_hub/common/schemas/messages.py | 33 +++++ .../web/templates/advertisements.html | 26 +++- src/meshcore_hub/web/templates/messages.html | 29 +++- tests/test_common/test_hash_utils.py | 28 ++-- 14 files changed, 616 insertions(+), 72 deletions(-) create mode 100644 alembic/versions/20241206_0002_004_event_receivers.py create mode 100644 src/meshcore_hub/common/models/event_receiver.py diff --git a/alembic/versions/20241206_0002_004_event_receivers.py b/alembic/versions/20241206_0002_004_event_receivers.py new file mode 100644 index 0000000..2682803 --- /dev/null +++ b/alembic/versions/20241206_0002_004_event_receivers.py @@ -0,0 +1,63 @@ +"""Add event_receivers junction table for multi-receiver tracking + +Revision ID: 004 +Revises: 003 +Create Date: 2024-12-06 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "004" +down_revision: Union[str, None] = "003" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "event_receivers", + sa.Column("id", sa.String(36), primary_key=True), + sa.Column("event_type", sa.String(20), nullable=False), + sa.Column("event_hash", sa.String(32), nullable=False), + sa.Column( + "receiver_node_id", + sa.String(36), + sa.ForeignKey("nodes.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("snr", sa.Float, nullable=True), + sa.Column("received_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + sa.UniqueConstraint( + "event_hash", "receiver_node_id", name="uq_event_receivers_hash_node" + ), + ) + op.create_index( + "ix_event_receivers_event_hash", + "event_receivers", + ["event_hash"], + ) + op.create_index( + "ix_event_receivers_receiver_node_id", + "event_receivers", + ["receiver_node_id"], + ) + op.create_index( + "ix_event_receivers_type_hash", + "event_receivers", + ["event_type", "event_hash"], + ) + + +def downgrade() -> None: + op.drop_index("ix_event_receivers_type_hash", table_name="event_receivers") + op.drop_index("ix_event_receivers_receiver_node_id", table_name="event_receivers") + op.drop_index("ix_event_receivers_event_hash", table_name="event_receivers") + op.drop_table("event_receivers") diff --git a/src/meshcore_hub/api/routes/advertisements.py b/src/meshcore_hub/api/routes/advertisements.py index 55a57ea..e36974b 100644 --- a/src/meshcore_hub/api/routes/advertisements.py +++ b/src/meshcore_hub/api/routes/advertisements.py @@ -9,8 +9,12 @@ from sqlalchemy.orm import aliased, selectinload from meshcore_hub.api.auth import RequireRead from meshcore_hub.api.dependencies import DbSession -from meshcore_hub.common.models import Advertisement, Node -from meshcore_hub.common.schemas.messages import AdvertisementList, AdvertisementRead +from meshcore_hub.common.models import Advertisement, EventReceiver, Node, NodeTag +from meshcore_hub.common.schemas.messages import ( + AdvertisementList, + AdvertisementRead, + ReceiverInfo, +) router = APIRouter() @@ -25,6 +29,62 @@ def _get_friendly_name(node: Optional[Node]) -> Optional[str]: return None +def _fetch_receivers_for_events( + session: DbSession, + event_type: str, + event_hashes: list[str], +) -> dict[str, list[ReceiverInfo]]: + """Fetch receiver info for a list of events by their hashes.""" + if not event_hashes: + return {} + + query = ( + select( + EventReceiver.event_hash, + EventReceiver.snr, + EventReceiver.received_at, + Node.id.label("node_id"), + Node.public_key, + Node.name, + ) + .join(Node, EventReceiver.receiver_node_id == Node.id) + .where(EventReceiver.event_type == event_type) + .where(EventReceiver.event_hash.in_(event_hashes)) + .order_by(EventReceiver.received_at) + ) + + results = session.execute(query).all() + receivers_by_hash: dict[str, list[ReceiverInfo]] = {} + + node_ids = [r.node_id for r in results] + friendly_names: dict[str, str] = {} + if node_ids: + fn_query = ( + select(NodeTag.node_id, NodeTag.value) + .where(NodeTag.node_id.in_(node_ids)) + .where(NodeTag.key == "friendly_name") + ) + for node_id, value in session.execute(fn_query).all(): + friendly_names[node_id] = value + + for row in results: + if row.event_hash not in receivers_by_hash: + receivers_by_hash[row.event_hash] = [] + + receivers_by_hash[row.event_hash].append( + ReceiverInfo( + node_id=row.node_id, + public_key=row.public_key, + name=row.name, + friendly_name=friendly_names.get(row.node_id), + snr=row.snr, + received_at=row.received_at, + ) + ) + + return receivers_by_hash + + @router.get("", response_model=AdvertisementList) async def list_advertisements( _: RequireRead, @@ -97,6 +157,12 @@ async def list_advertisements( nodes = session.execute(nodes_query).scalars().all() nodes_by_id = {n.id: n for n in nodes} + # Fetch all receivers for these advertisements + event_hashes = [r[0].event_hash for r in results if r[0].event_hash] + receivers_by_hash = _fetch_receivers_for_events( + session, "advertisement", event_hashes + ) + # Build response with node details items = [] for row in results: @@ -116,6 +182,9 @@ async def list_advertisements( "flags": adv.flags, "received_at": adv.received_at, "created_at": adv.created_at, + "receivers": ( + receivers_by_hash.get(adv.event_hash, []) if adv.event_hash else [] + ), } items.append(AdvertisementRead(**data)) @@ -175,6 +244,14 @@ async def get_advertisement( receiver_node = nodes_by_id.get(result.receiver_id) if result.receiver_id else None source_node = nodes_by_id.get(result.source_id) if result.source_id else None + # Fetch receivers for this advertisement + receivers = [] + if adv.event_hash: + receivers_by_hash = _fetch_receivers_for_events( + session, "advertisement", [adv.event_hash] + ) + receivers = receivers_by_hash.get(adv.event_hash, []) + data = { "received_by": result.receiver_pk, "receiver_name": result.receiver_name, @@ -187,5 +264,6 @@ async def get_advertisement( "flags": adv.flags, "received_at": adv.received_at, "created_at": adv.created_at, + "receivers": receivers, } return AdvertisementRead(**data) diff --git a/src/meshcore_hub/api/routes/messages.py b/src/meshcore_hub/api/routes/messages.py index 79a461a..811ad53 100644 --- a/src/meshcore_hub/api/routes/messages.py +++ b/src/meshcore_hub/api/routes/messages.py @@ -9,8 +9,8 @@ from sqlalchemy.orm import aliased, selectinload from meshcore_hub.api.auth import RequireRead from meshcore_hub.api.dependencies import DbSession -from meshcore_hub.common.models import Message, Node, NodeTag -from meshcore_hub.common.schemas.messages import MessageList, MessageRead +from meshcore_hub.common.models import EventReceiver, Message, Node, NodeTag +from meshcore_hub.common.schemas.messages import MessageList, MessageRead, ReceiverInfo router = APIRouter() @@ -25,6 +25,75 @@ def _get_friendly_name(node: Optional[Node]) -> Optional[str]: return None +def _fetch_receivers_for_events( + session: DbSession, + event_type: str, + event_hashes: list[str], +) -> dict[str, list[ReceiverInfo]]: + """Fetch receiver info for a list of events by their hashes. + + Args: + session: Database session + event_type: Type of event ('message', 'advertisement', etc.) + event_hashes: List of event hashes to fetch receivers for + + Returns: + Dict mapping event_hash to list of ReceiverInfo objects + """ + if not event_hashes: + return {} + + # Query event_receivers with receiver node info + query = ( + select( + EventReceiver.event_hash, + EventReceiver.snr, + EventReceiver.received_at, + Node.id.label("node_id"), + Node.public_key, + Node.name, + ) + .join(Node, EventReceiver.receiver_node_id == Node.id) + .where(EventReceiver.event_type == event_type) + .where(EventReceiver.event_hash.in_(event_hashes)) + .order_by(EventReceiver.received_at) + ) + + results = session.execute(query).all() + + # Group by event_hash + receivers_by_hash: dict[str, list[ReceiverInfo]] = {} + + # Get friendly names for receiver nodes + node_ids = [r.node_id for r in results] + friendly_names: dict[str, str] = {} + if node_ids: + fn_query = ( + select(NodeTag.node_id, NodeTag.value) + .where(NodeTag.node_id.in_(node_ids)) + .where(NodeTag.key == "friendly_name") + ) + for node_id, value in session.execute(fn_query).all(): + friendly_names[node_id] = value + + for row in results: + if row.event_hash not in receivers_by_hash: + receivers_by_hash[row.event_hash] = [] + + receivers_by_hash[row.event_hash].append( + ReceiverInfo( + node_id=row.node_id, + public_key=row.public_key, + name=row.name, + friendly_name=friendly_names.get(row.node_id), + snr=row.snr, + received_at=row.received_at, + ) + ) + + return receivers_by_hash + + @router.get("", response_model=MessageList) async def list_messages( _: RequireRead, @@ -126,6 +195,10 @@ async def list_messages( receivers = session.execute(receivers_query).scalars().all() receivers_by_id = {n.id: n for n in receivers} + # Fetch all receivers for these messages + event_hashes = [r[0].event_hash for r in results if r[0].event_hash] + receivers_by_hash = _fetch_receivers_for_events(session, "message", event_hashes) + # Build response with sender info and received_by items = [] for row in results: @@ -159,6 +232,9 @@ async def list_messages( "sender_timestamp": m.sender_timestamp, "received_at": m.received_at, "created_at": m.created_at, + "receivers": ( + receivers_by_hash.get(m.event_hash, []) if m.event_hash else [] + ), } items.append(MessageRead(**msg_dict)) @@ -189,6 +265,15 @@ async def get_message( raise HTTPException(status_code=404, detail="Message not found") message, receiver_pk = result + + # Fetch receivers for this message + receivers = [] + if message.event_hash: + receivers_by_hash = _fetch_receivers_for_events( + session, "message", [message.event_hash] + ) + receivers = receivers_by_hash.get(message.event_hash, []) + data = { "id": message.id, "receiver_node_id": message.receiver_node_id, @@ -204,5 +289,6 @@ async def get_message( "sender_timestamp": message.sender_timestamp, "received_at": message.received_at, "created_at": message.created_at, + "receivers": receivers, } return MessageRead(**data) diff --git a/src/meshcore_hub/collector/handlers/advertisement.py b/src/meshcore_hub/collector/handlers/advertisement.py index 8bb04f9..ccab99e 100644 --- a/src/meshcore_hub/collector/handlers/advertisement.py +++ b/src/meshcore_hub/collector/handlers/advertisement.py @@ -8,7 +8,7 @@ from sqlalchemy import select from meshcore_hub.common.database import DatabaseManager from meshcore_hub.common.hash_utils import compute_advertisement_hash -from meshcore_hub.common.models import Advertisement, Node +from meshcore_hub.common.models import Advertisement, Node, add_event_receiver logger = logging.getLogger(__name__) @@ -51,21 +51,7 @@ def handle_advertisement( ) with db.session_scope() as session: - # Check if advertisement with same hash already exists - existing = session.execute( - select(Advertisement.id).where(Advertisement.event_hash == event_hash) - ).scalar_one_or_none() - - if existing: - logger.debug(f"Duplicate advertisement skipped (hash={event_hash[:8]}...)") - # Still update node last_seen even for duplicate advertisements - node_query = select(Node).where(Node.public_key == adv_public_key) - node = session.execute(node_query).scalar_one_or_none() - if node: - node.last_seen = now - return - - # Find or create receiver node + # Find or create receiver node first (needed for both new and duplicate events) receiver_node = None if public_key: receiver_query = select(Node).where(Node.public_key == public_key) @@ -79,6 +65,37 @@ def handle_advertisement( ) session.add(receiver_node) session.flush() + else: + receiver_node.last_seen = now + + # Check if advertisement with same hash already exists + existing = session.execute( + select(Advertisement.id).where(Advertisement.event_hash == event_hash) + ).scalar_one_or_none() + + if existing: + # Still update advertised node's last_seen even for duplicate advertisements + node_query = select(Node).where(Node.public_key == adv_public_key) + node = session.execute(node_query).scalar_one_or_none() + if node: + node.last_seen = now + + # Add this receiver to the junction table + if receiver_node: + added = add_event_receiver( + session=session, + event_type="advertisement", + event_hash=event_hash, + receiver_node_id=receiver_node.id, + snr=None, # Advertisements don't have SNR + received_at=now, + ) + if added: + logger.debug( + f"Added receiver {public_key[:12]}... to advertisement " + f"(hash={event_hash[:8]}...)" + ) + return # Find or create advertised node node_query = select(Node).where(Node.public_key == adv_public_key) @@ -119,6 +136,17 @@ def handle_advertisement( ) session.add(advertisement) + # Add first receiver to junction table + if receiver_node: + add_event_receiver( + session=session, + event_type="advertisement", + event_hash=event_hash, + receiver_node_id=receiver_node.id, + snr=None, + received_at=now, + ) + logger.info( f"Stored advertisement from {name or adv_public_key[:12]!r} " f"(type={adv_type})" diff --git a/src/meshcore_hub/collector/handlers/message.py b/src/meshcore_hub/collector/handlers/message.py index 47a54f6..1e14343 100644 --- a/src/meshcore_hub/collector/handlers/message.py +++ b/src/meshcore_hub/collector/handlers/message.py @@ -8,7 +8,7 @@ from sqlalchemy import select from meshcore_hub.common.database import DatabaseManager from meshcore_hub.common.hash_utils import compute_message_hash -from meshcore_hub.common.models import Message, Node +from meshcore_hub.common.models import Message, Node, add_event_receiver logger = logging.getLogger(__name__) @@ -95,16 +95,7 @@ def _handle_message( ) with db.session_scope() as session: - # Check if message with same hash already exists - existing = session.execute( - select(Message.id).where(Message.event_hash == event_hash) - ).scalar_one_or_none() - - if existing: - logger.debug(f"Duplicate message skipped (hash={event_hash[:8]}...)") - return - - # Find receiver node + # Find or create receiver node first (needed for both new and duplicate events) receiver_node = None if public_key: receiver_query = select(Node).where(Node.public_key == public_key) @@ -121,6 +112,29 @@ def _handle_message( else: receiver_node.last_seen = now + # Check if message with same hash already exists + existing = session.execute( + select(Message.id).where(Message.event_hash == event_hash) + ).scalar_one_or_none() + + if existing: + # Event already exists - just add this receiver to the junction table + if receiver_node: + added = add_event_receiver( + session=session, + event_type="message", + event_hash=event_hash, + receiver_node_id=receiver_node.id, + snr=snr, + received_at=now, + ) + if added: + logger.debug( + f"Added receiver {public_key[:12]}... to message " + f"(hash={event_hash[:8]}...)" + ) + return + # Create message record message = Message( receiver_node_id=receiver_node.id if receiver_node else None, @@ -138,6 +152,17 @@ def _handle_message( ) session.add(message) + # Add first receiver to junction table + if receiver_node: + add_event_receiver( + session=session, + event_type="message", + event_hash=event_hash, + receiver_node_id=receiver_node.id, + snr=snr, + received_at=now, + ) + if message_type == "contact": logger.info( f"Stored contact message from {pubkey_prefix!r}: " diff --git a/src/meshcore_hub/collector/handlers/telemetry.py b/src/meshcore_hub/collector/handlers/telemetry.py index 61ebbf4..d4e16ca 100644 --- a/src/meshcore_hub/collector/handlers/telemetry.py +++ b/src/meshcore_hub/collector/handlers/telemetry.py @@ -8,7 +8,7 @@ from sqlalchemy import select from meshcore_hub.common.database import DatabaseManager from meshcore_hub.common.hash_utils import compute_telemetry_hash -from meshcore_hub.common.models import Node, Telemetry +from meshcore_hub.common.models import Node, Telemetry, add_event_receiver logger = logging.getLogger(__name__) @@ -58,18 +58,7 @@ def handle_telemetry( ) with db.session_scope() as session: - # Check if telemetry with same hash already exists - existing = session.execute( - select(Telemetry.id).where(Telemetry.event_hash == event_hash) - ).scalar_one_or_none() - - if existing: - logger.debug( - f"Duplicate telemetry skipped (node={node_public_key[:12]}...)" - ) - return - - # Find receiver node + # Find or create receiver node first (needed for both new and duplicate events) receiver_node = None if public_key: receiver_query = select(Node).where(Node.public_key == public_key) @@ -86,6 +75,29 @@ def handle_telemetry( else: receiver_node.last_seen = now + # Check if telemetry with same hash already exists + existing = session.execute( + select(Telemetry.id).where(Telemetry.event_hash == event_hash) + ).scalar_one_or_none() + + if existing: + # Event already exists - just add this receiver to the junction table + if receiver_node: + added = add_event_receiver( + session=session, + event_type="telemetry", + event_hash=event_hash, + receiver_node_id=receiver_node.id, + snr=None, + received_at=now, + ) + if added: + logger.debug( + f"Added receiver {public_key[:12]}... to telemetry " + f"(node={node_public_key[:12]}...)" + ) + return + # Find or create reporting node reporting_node = None if node_public_key: @@ -115,6 +127,17 @@ def handle_telemetry( ) session.add(telemetry) + # Add first receiver to junction table + if receiver_node: + add_event_receiver( + session=session, + event_type="telemetry", + event_hash=event_hash, + receiver_node_id=receiver_node.id, + snr=None, + received_at=now, + ) + # Log telemetry values if parsed_data: values = ", ".join(f"{k}={v}" for k, v in parsed_data.items()) diff --git a/src/meshcore_hub/collector/handlers/trace.py b/src/meshcore_hub/collector/handlers/trace.py index 280f764..3d53d5b 100644 --- a/src/meshcore_hub/collector/handlers/trace.py +++ b/src/meshcore_hub/collector/handlers/trace.py @@ -8,7 +8,7 @@ from sqlalchemy import select from meshcore_hub.common.database import DatabaseManager from meshcore_hub.common.hash_utils import compute_trace_hash -from meshcore_hub.common.models import Node, TracePath +from meshcore_hub.common.models import Node, TracePath, add_event_receiver logger = logging.getLogger(__name__) @@ -45,16 +45,7 @@ def handle_trace_data( event_hash = compute_trace_hash(initiator_tag=initiator_tag) with db.session_scope() as session: - # Check if trace with same hash already exists - existing = session.execute( - select(TracePath.id).where(TracePath.event_hash == event_hash) - ).scalar_one_or_none() - - if existing: - logger.debug(f"Duplicate trace skipped (tag={initiator_tag})") - return - - # Find receiver node + # Find or create receiver node first (needed for both new and duplicate events) receiver_node = None if public_key: receiver_query = select(Node).where(Node.public_key == public_key) @@ -71,6 +62,29 @@ def handle_trace_data( else: receiver_node.last_seen = now + # Check if trace with same hash already exists + existing = session.execute( + select(TracePath.id).where(TracePath.event_hash == event_hash) + ).scalar_one_or_none() + + if existing: + # Event already exists - just add this receiver to the junction table + if receiver_node: + added = add_event_receiver( + session=session, + event_type="trace", + event_hash=event_hash, + receiver_node_id=receiver_node.id, + snr=None, # Trace events don't have a single SNR value + received_at=now, + ) + if added: + logger.debug( + f"Added receiver {public_key[:12]}... to trace " + f"(tag={initiator_tag})" + ) + return + # Create trace path record trace_path = TracePath( receiver_node_id=receiver_node.id if receiver_node else None, @@ -86,4 +100,15 @@ def handle_trace_data( ) session.add(trace_path) + # Add first receiver to junction table + if receiver_node: + add_event_receiver( + session=session, + event_type="trace", + event_hash=event_hash, + receiver_node_id=receiver_node.id, + snr=None, + received_at=now, + ) + logger.info(f"Stored trace data: tag={initiator_tag}, hops={hop_count}") diff --git a/src/meshcore_hub/common/models/__init__.py b/src/meshcore_hub/common/models/__init__.py index d1dd8f0..69eaeb9 100644 --- a/src/meshcore_hub/common/models/__init__.py +++ b/src/meshcore_hub/common/models/__init__.py @@ -10,6 +10,7 @@ from meshcore_hub.common.models.telemetry import Telemetry from meshcore_hub.common.models.event_log import EventLog from meshcore_hub.common.models.member import Member from meshcore_hub.common.models.member_node import MemberNode +from meshcore_hub.common.models.event_receiver import EventReceiver, add_event_receiver __all__ = [ "Base", @@ -23,4 +24,6 @@ __all__ = [ "EventLog", "Member", "MemberNode", + "EventReceiver", + "add_event_receiver", ] diff --git a/src/meshcore_hub/common/models/event_receiver.py b/src/meshcore_hub/common/models/event_receiver.py new file mode 100644 index 0000000..c17f447 --- /dev/null +++ b/src/meshcore_hub/common/models/event_receiver.py @@ -0,0 +1,127 @@ +"""EventReceiver model for tracking which nodes received each event.""" + +from datetime import datetime +from typing import TYPE_CHECKING, Optional +from uuid import uuid4 + +from sqlalchemy import DateTime, Float, ForeignKey, Index, String, UniqueConstraint +from sqlalchemy.dialects.sqlite import insert as sqlite_insert +from sqlalchemy.orm import Mapped, Session, mapped_column, relationship + +from meshcore_hub.common.models.base import Base, TimestampMixin, UUIDMixin, utc_now + +if TYPE_CHECKING: + from meshcore_hub.common.models.node import Node + + +class EventReceiver(Base, UUIDMixin, TimestampMixin): + """Junction model tracking which receivers observed each event. + + This table enables multi-receiver tracking for deduplicated events. + When multiple receiver nodes observe the same mesh event, each receiver + gets an entry in this table linked by the event_hash. + + Attributes: + id: UUID primary key + event_type: Type of event ('message', 'advertisement', 'trace', 'telemetry') + event_hash: Hash identifying the unique event (links to event tables) + receiver_node_id: FK to the node that received this event + snr: Signal-to-noise ratio at this receiver (if available) + received_at: When this specific receiver saw the event + created_at: Record creation timestamp + updated_at: Record update timestamp + """ + + __tablename__ = "event_receivers" + + event_type: Mapped[str] = mapped_column( + String(20), + nullable=False, + ) + event_hash: Mapped[str] = mapped_column( + String(32), + nullable=False, + index=True, + ) + receiver_node_id: Mapped[str] = mapped_column( + String(36), + ForeignKey("nodes.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + snr: Mapped[Optional[float]] = mapped_column( + Float, + nullable=True, + ) + received_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=utc_now, + nullable=False, + ) + + # Relationship to receiver node + receiver_node: Mapped["Node"] = relationship( + "Node", + foreign_keys=[receiver_node_id], + ) + + __table_args__ = ( + UniqueConstraint( + "event_hash", "receiver_node_id", name="uq_event_receivers_hash_node" + ), + Index("ix_event_receivers_type_hash", "event_type", "event_hash"), + ) + + def __repr__(self) -> str: + return ( + f"" + ) + + +def add_event_receiver( + session: Session, + event_type: str, + event_hash: str, + receiver_node_id: str, + snr: Optional[float] = None, + received_at: Optional[datetime] = None, +) -> bool: + """Add a receiver to an event, handling duplicates gracefully. + + Uses INSERT OR IGNORE to handle the unique constraint on (event_hash, receiver_node_id). + + Args: + session: SQLAlchemy session + event_type: Type of event ('message', 'advertisement', 'trace', 'telemetry') + event_hash: Hash identifying the unique event + receiver_node_id: UUID of the receiver node + snr: Signal-to-noise ratio at this receiver (optional) + received_at: When this receiver saw the event (defaults to now) + + Returns: + True if a new receiver entry was added, False if it already existed. + """ + from datetime import timezone + + now = received_at or datetime.now(timezone.utc) + + stmt = ( + sqlite_insert(EventReceiver) + .values( + id=str(uuid4()), + event_type=event_type, + event_hash=event_hash, + receiver_node_id=receiver_node_id, + snr=snr, + received_at=now, + created_at=now, + updated_at=now, + ) + .on_conflict_do_nothing(index_elements=["event_hash", "receiver_node_id"]) + ) + result = session.execute(stmt) + # CursorResult has rowcount attribute + rowcount = getattr(result, "rowcount", 0) + return bool(rowcount and rowcount > 0) diff --git a/src/meshcore_hub/common/schemas/__init__.py b/src/meshcore_hub/common/schemas/__init__.py index f9978e9..f9b2580 100644 --- a/src/meshcore_hub/common/schemas/__init__.py +++ b/src/meshcore_hub/common/schemas/__init__.py @@ -20,6 +20,7 @@ from meshcore_hub.common.schemas.nodes import ( NodeTagRead, ) from meshcore_hub.common.schemas.messages import ( + ReceiverInfo, MessageRead, MessageList, MessageFilters, @@ -57,7 +58,8 @@ __all__ = [ "NodeTagCreate", "NodeTagUpdate", "NodeTagRead", - # Messages + # Messages & Events + "ReceiverInfo", "MessageRead", "MessageList", "MessageFilters", diff --git a/src/meshcore_hub/common/schemas/messages.py b/src/meshcore_hub/common/schemas/messages.py index 6e74076..c19a876 100644 --- a/src/meshcore_hub/common/schemas/messages.py +++ b/src/meshcore_hub/common/schemas/messages.py @@ -6,6 +6,24 @@ from typing import Literal, Optional from pydantic import BaseModel, Field +class ReceiverInfo(BaseModel): + """Information about a receiver that observed an event.""" + + node_id: str = Field(..., description="Receiver node UUID") + public_key: str = Field(..., description="Receiver node public key") + name: Optional[str] = Field(default=None, description="Receiver node name") + friendly_name: Optional[str] = Field( + default=None, description="Receiver friendly name from tags" + ) + snr: Optional[float] = Field( + default=None, description="Signal-to-noise ratio at this receiver" + ) + received_at: datetime = Field(..., description="When this receiver saw the event") + + class Config: + from_attributes = True + + class MessageRead(BaseModel): """Schema for reading a message.""" @@ -37,6 +55,9 @@ class MessageRead(BaseModel): ) received_at: datetime = Field(..., description="When received by interface") created_at: datetime = Field(..., description="Record creation timestamp") + receivers: list[ReceiverInfo] = Field( + default_factory=list, description="All receivers that observed this message" + ) class Config: from_attributes = True @@ -104,6 +125,10 @@ class AdvertisementRead(BaseModel): flags: Optional[int] = Field(default=None, description="Capability flags") received_at: datetime = Field(..., description="When received") created_at: datetime = Field(..., description="Record creation timestamp") + receivers: list[ReceiverInfo] = Field( + default_factory=list, + description="All receivers that observed this advertisement", + ) class Config: from_attributes = True @@ -137,6 +162,10 @@ class TracePathRead(BaseModel): hop_count: Optional[int] = Field(default=None, description="Total hops") received_at: datetime = Field(..., description="When received") created_at: datetime = Field(..., description="Record creation timestamp") + receivers: list[ReceiverInfo] = Field( + default_factory=list, + description="All receivers that observed this trace", + ) class Config: from_attributes = True @@ -163,6 +192,10 @@ class TelemetryRead(BaseModel): ) received_at: datetime = Field(..., description="When received") created_at: datetime = Field(..., description="Record creation timestamp") + receivers: list[ReceiverInfo] = Field( + default_factory=list, + description="All receivers that observed this telemetry", + ) class Config: from_attributes = True diff --git a/src/meshcore_hub/web/templates/advertisements.html b/src/meshcore_hub/web/templates/advertisements.html index d3c5df8..41b6b06 100644 --- a/src/meshcore_hub/web/templates/advertisements.html +++ b/src/meshcore_hub/web/templates/advertisements.html @@ -71,7 +71,31 @@ {% endif %} - {% if ad.received_by %} + {% if ad.receivers and ad.receivers|length > 1 %} + + {% elif ad.receivers and ad.receivers|length == 1 %} + + {% if ad.receivers[0].friendly_name or ad.receivers[0].name %} +
{{ ad.receivers[0].friendly_name or ad.receivers[0].name }}
+
{{ ad.receivers[0].public_key[:16] }}...
+ {% else %} + {{ ad.receivers[0].public_key[:16] }}... + {% endif %} +
+ {% elif ad.received_by %} {% if ad.receiver_friendly_name or ad.receiver_name %}
{{ ad.receiver_friendly_name or ad.receiver_name }}
diff --git a/src/meshcore_hub/web/templates/messages.html b/src/meshcore_hub/web/templates/messages.html index 8fe1425..2c68e29 100644 --- a/src/meshcore_hub/web/templates/messages.html +++ b/src/meshcore_hub/web/templates/messages.html @@ -87,7 +87,34 @@ {{ msg.text or '-' }} - {% if msg.received_by %} + {% if msg.receivers and msg.receivers|length > 1 %} +
+ {% elif msg.receivers and msg.receivers|length == 1 %} + + {% if msg.receivers[0].friendly_name or msg.receivers[0].name %} +
{{ msg.receivers[0].friendly_name or msg.receivers[0].name }}
+
{{ msg.receivers[0].public_key[:16] }}...
+ {% else %} + {{ msg.receivers[0].public_key[:16] }}... + {% endif %} +
+ {% elif msg.received_by %} {% if msg.receiver_friendly_name or msg.receiver_name %}
{{ msg.receiver_friendly_name or msg.receiver_name }}
diff --git a/tests/test_common/test_hash_utils.py b/tests/test_common/test_hash_utils.py index 1ca94c4..62a85c2 100644 --- a/tests/test_common/test_hash_utils.py +++ b/tests/test_common/test_hash_utils.py @@ -94,7 +94,7 @@ class TestComputeAdvertisementHash: def test_same_content_same_bucket_produces_same_hash(self) -> None: """Advertisements within the same time bucket should match.""" - # Two times within the same 5-minute bucket + # Two times within the same 5-minute (300 second) bucket time1 = datetime(2024, 1, 15, 10, 31, 0, tzinfo=timezone.utc) time2 = datetime(2024, 1, 15, 10, 33, 0, tzinfo=timezone.utc) @@ -104,7 +104,7 @@ class TestComputeAdvertisementHash: adv_type="chat", flags=128, received_at=time1, - bucket_minutes=5, + bucket_seconds=300, # 5 minutes ) hash2 = compute_advertisement_hash( public_key="a" * 64, @@ -112,14 +112,14 @@ class TestComputeAdvertisementHash: adv_type="chat", flags=128, received_at=time2, - bucket_minutes=5, + bucket_seconds=300, # 5 minutes ) assert hash1 == hash2 def test_different_bucket_produces_different_hash(self) -> None: """Advertisements in different time buckets should not match.""" - # Two times in different 5-minute buckets + # Two times in different 5-minute (300 second) buckets time1 = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc) time2 = datetime(2024, 1, 15, 10, 36, 0, tzinfo=timezone.utc) @@ -127,13 +127,13 @@ class TestComputeAdvertisementHash: public_key="a" * 64, name="Node1", received_at=time1, - bucket_minutes=5, + bucket_seconds=300, # 5 minutes ) hash2 = compute_advertisement_hash( public_key="a" * 64, name="Node1", received_at=time2, - bucket_minutes=5, + bucket_seconds=300, # 5 minutes ) assert hash1 != hash2 @@ -158,29 +158,29 @@ class TestComputeAdvertisementHash: time1 = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc) time2 = datetime(2024, 1, 15, 10, 35, 0, tzinfo=timezone.utc) - # With 5-minute bucket, these should be in different buckets + # With 5-minute (300s) bucket, these should be in different buckets hash1_5min = compute_advertisement_hash( public_key="a" * 64, received_at=time1, - bucket_minutes=5, + bucket_seconds=300, # 5 minutes ) hash2_5min = compute_advertisement_hash( public_key="a" * 64, received_at=time2, - bucket_minutes=5, + bucket_seconds=300, # 5 minutes ) assert hash1_5min != hash2_5min - # With 10-minute bucket, these should be in the same bucket + # With 10-minute (600s) bucket, these should be in the same bucket hash1_10min = compute_advertisement_hash( public_key="a" * 64, received_at=time1, - bucket_minutes=10, + bucket_seconds=600, # 10 minutes ) hash2_10min = compute_advertisement_hash( public_key="a" * 64, received_at=time2, - bucket_minutes=10, + bucket_seconds=600, # 10 minutes ) assert hash1_10min == hash2_10min @@ -216,13 +216,13 @@ class TestComputeTelemetryHash: node_public_key="a" * 64, parsed_data=data, received_at=time1, - bucket_minutes=5, + bucket_seconds=300, # 5 minutes ) hash2 = compute_telemetry_hash( node_public_key="a" * 64, parsed_data=data, received_at=time2, - bucket_minutes=5, + bucket_seconds=300, # 5 minutes ) assert hash1 == hash2