From 557af55ee82f75497a35f3813924eb3a41c06e9f Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Mon, 9 Mar 2026 16:56:23 -0700 Subject: [PATCH] extract backend message lifecycle service --- app/event_handlers.py | 42 +--- app/packet_processor.py | 280 +++--------------------- app/routers/messages.py | 106 +++------- app/services/__init__.py | 1 + app/services/messages.py | 447 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 511 insertions(+), 365 deletions(-) create mode 100644 app/services/__init__.py create mode 100644 app/services/messages.py diff --git a/app/event_handlers.py b/app/event_handlers.py index a2dd087..0e1db4b 100644 --- a/app/event_handlers.py +++ b/app/event_handlers.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING from meshcore import EventType -from app.models import CONTACT_TYPE_REPEATER, Contact, Message, MessagePath +from app.models import CONTACT_TYPE_REPEATER, Contact from app.packet_processor import process_raw_packet from app.repository import ( AmbiguousPublicKeyPrefixError, @@ -12,6 +12,7 @@ from app.repository import ( ContactRepository, MessageRepository, ) +from app.services.messages import create_fallback_direct_message, increment_ack_and_broadcast from app.websocket import broadcast_event if TYPE_CHECKING: @@ -108,21 +109,21 @@ async def on_contact_message(event: "Event") -> None: sender_name = contact.name if contact else None path = payload.get("path") path_len = payload.get("path_len") - msg_id = await MessageRepository.create( - msg_type="PRIV", - text=payload.get("text", ""), + message = await create_fallback_direct_message( conversation_key=sender_pubkey, + text=payload.get("text", ""), sender_timestamp=sender_timestamp, received_at=received_at, path=path, path_len=path_len, txt_type=txt_type, signature=payload.get("signature"), - sender_key=sender_pubkey, sender_name=sender_name, + sender_key=sender_pubkey, + broadcast_fn=broadcast_event, ) - if msg_id is None: + if message is None: # Already handled by packet processor (or exact duplicate) - nothing more to do logger.debug("DM from %s already processed by packet processor", sender_pubkey[:12]) return @@ -131,31 +132,6 @@ async def on_contact_message(event: "Event") -> None: # (likely because private key export is not available) logger.debug("DM from %s handled by event handler (fallback path)", sender_pubkey[:12]) - # Build paths array for broadcast - paths = ( - [MessagePath(path=path or "", received_at=received_at, path_len=path_len)] - if path is not None - else None - ) - - # Broadcast the new message - broadcast_event( - "message", - Message( - id=msg_id, - type="PRIV", - conversation_key=sender_pubkey, - text=payload.get("text", ""), - sender_timestamp=sender_timestamp, - received_at=received_at, - paths=paths, - txt_type=txt_type, - signature=payload.get("signature"), - sender_key=sender_pubkey, - sender_name=sender_name, - ).model_dump(), - ) - # Update contact last_contacted (contact was already fetched above) if contact: await ContactRepository.update_last_contacted(sender_pubkey, received_at) @@ -307,12 +283,10 @@ async def on_ack(event: "Event") -> None: if ack_code in _pending_acks: message_id, _, _ = _pending_acks.pop(ack_code) logger.info("ACK received for message %d", message_id) - - ack_count = await MessageRepository.increment_ack_count(message_id) # DM ACKs don't carry path data, so paths is intentionally omitted. # The frontend's mergePendingAck handles the missing field correctly, # preserving any previously known paths. - broadcast_event("message_acked", {"message_id": message_id, "ack_count": ack_count}) + await increment_ack_and_broadcast(message_id=message_id, broadcast_fn=broadcast_event) else: logger.debug("ACK code %s does not match any pending messages", ack_code) diff --git a/app/packet_processor.py b/app/packet_processor.py index d9e5e38..d676290 100644 --- a/app/packet_processor.py +++ b/app/packet_processor.py @@ -30,8 +30,6 @@ from app.decoder import ( from app.keystore import get_private_key, get_public_key, has_private_key from app.models import ( CONTACT_TYPE_REPEATER, - Message, - MessagePath, RawPacketBroadcast, RawPacketDecryptedInfo, ) @@ -43,6 +41,12 @@ from app.repository import ( MessageRepository, RawPacketRepository, ) +from app.services.messages import ( + create_dm_message_from_decrypted as _create_dm_message_from_decrypted, +) +from app.services.messages import ( + create_message_from_decrypted as _create_message_from_decrypted, +) from app.websocket import broadcast_error, broadcast_event logger = logging.getLogger(__name__) @@ -50,77 +54,6 @@ logger = logging.getLogger(__name__) _raw_observation_counter = count(1) -async def _handle_duplicate_message( - packet_id: int, - msg_type: str, - conversation_key: str, - text: str, - sender_timestamp: int, - path: str | None, - received: int, - path_len: int | None = None, -) -> None: - """Handle a duplicate message by updating paths/acks on the existing record. - - Called when MessageRepository.create returns None (INSERT OR IGNORE hit a duplicate). - Looks up the existing message, adds the new path, increments ack count for outgoing - messages, and broadcasts the update to clients. - """ - existing_msg = await MessageRepository.get_by_content( - msg_type=msg_type, - conversation_key=conversation_key, - text=text, - sender_timestamp=sender_timestamp, - ) - if not existing_msg: - label = "message" if msg_type == "CHAN" else "DM" - logger.warning( - "Duplicate %s for %s but couldn't find existing", - label, - conversation_key[:12], - ) - return - - logger.debug( - "Duplicate %s for %s (msg_id=%d, outgoing=%s) - adding path", - msg_type, - conversation_key[:12], - existing_msg.id, - existing_msg.outgoing, - ) - - # Add path if provided - if path is not None: - paths = await MessageRepository.add_path(existing_msg.id, path, received, path_len) - else: - # Get current paths for broadcast - paths = existing_msg.paths or [] - - # Increment ack count for outgoing messages (echo confirmation) - if existing_msg.outgoing: - ack_count = await MessageRepository.increment_ack_count(existing_msg.id) - else: - ack_count = existing_msg.acked - - # Only broadcast when something actually changed: - # - outgoing: ack count was incremented - # - path provided: a new path entry was appended - # The path=None case happens for direct-delivery DMs (0-hop, no routing bytes). - # A non-outgoing duplicate with no new path changes nothing in the DB, so skip. - if existing_msg.outgoing or path is not None: - broadcast_event( - "message_acked", - { - "message_id": existing_msg.id, - "ack_count": ack_count, - "paths": [p.model_dump() for p in paths] if paths else [], - }, - ) - - # Mark this packet as decrypted - await RawPacketRepository.mark_decrypted(packet_id, existing_msg.id) - - async def create_message_from_decrypted( packet_id: int, channel_key: str, @@ -133,95 +66,21 @@ async def create_message_from_decrypted( channel_name: str | None = None, realtime: bool = True, ) -> int | None: - """Create a message record from decrypted channel packet content. - - This is the shared logic for storing decrypted channel messages, - used by both real-time packet processing and historical decryption. - - Args: - packet_id: ID of the raw packet being processed - channel_key: Hex string channel key - channel_name: Channel name (e.g. "#general"), for bot context - sender: Sender name (will be prefixed to message) or None - message_text: The decrypted message content - timestamp: Sender timestamp from the packet - received_at: When the packet was received (defaults to now) - path: Hex-encoded routing path - realtime: If False, skip fanout dispatch (used for historical decryption) - - Returns the message ID if created, None if duplicate. - """ - received = received_at or int(time.time()) - - # Format the message text with sender prefix if present - text = f"{sender}: {message_text}" if sender else message_text - - # Normalize channel key to uppercase for consistency - channel_key_normalized = channel_key.upper() - - # Resolve sender_key: look up contact by exact name match - resolved_sender_key: str | None = None - if sender: - candidates = await ContactRepository.get_by_name(sender) - if len(candidates) == 1: - resolved_sender_key = candidates[0].public_key - - # Try to create message - INSERT OR IGNORE handles duplicates atomically - msg_id = await MessageRepository.create( - msg_type="CHAN", - text=text, - conversation_key=channel_key_normalized, - sender_timestamp=timestamp, - received_at=received, + """Store a decrypted channel message via the shared message service.""" + return await _create_message_from_decrypted( + packet_id=packet_id, + channel_key=channel_key, + sender=sender, + message_text=message_text, + timestamp=timestamp, + received_at=received_at, path=path, path_len=path_len, - sender_name=sender, - sender_key=resolved_sender_key, - ) - - if msg_id is None: - # Duplicate message detected - this happens when: - # 1. Our own outgoing message echoes back (flood routing) - # 2. Same message arrives via multiple paths before first is committed - # In either case, add the path to the existing message. - await _handle_duplicate_message( - packet_id, "CHAN", channel_key_normalized, text, timestamp, path, received, path_len - ) - return None - - logger.info("Stored channel message %d for channel %s", msg_id, channel_key_normalized[:8]) - - # Mark the raw packet as decrypted - await RawPacketRepository.mark_decrypted(packet_id, msg_id) - - # Build paths array for broadcast - # Use "is not None" to include empty string (direct/0-hop messages) - paths = ( - [MessagePath(path=path or "", received_at=received, path_len=path_len)] - if path is not None - else None - ) - - # Broadcast new message to connected clients (and fanout modules when realtime) - broadcast_event( - "message", - Message( - id=msg_id, - type="CHAN", - conversation_key=channel_key_normalized, - text=text, - sender_timestamp=timestamp, - received_at=received, - paths=paths, - sender_name=sender, - sender_key=resolved_sender_key, - channel_name=channel_name, - ).model_dump(), + channel_name=channel_name, realtime=realtime, + broadcast_fn=broadcast_event, ) - return msg_id - async def create_dm_message_from_decrypted( packet_id: int, @@ -234,111 +93,20 @@ async def create_dm_message_from_decrypted( outgoing: bool = False, realtime: bool = True, ) -> int | None: - """Create a message record from decrypted direct message packet content. - - This is the shared logic for storing decrypted direct messages, - used by both real-time packet processing and historical decryption. - - Args: - packet_id: ID of the raw packet being processed - decrypted: DecryptedDirectMessage from decoder - their_public_key: The contact's full 64-char public key (conversation_key) - our_public_key: Our public key (to determine direction), or None - received_at: When the packet was received (defaults to now) - path: Hex-encoded routing path - outgoing: Whether this is an outgoing message (we sent it) - realtime: If False, skip fanout dispatch (used for historical decryption) - - Returns the message ID if created, None if duplicate. - """ - # Check if sender is a repeater - repeaters only send CLI responses, not chat messages. - # CLI responses are handled by the command endpoint, not stored in chat history. - contact = await ContactRepository.get_by_key(their_public_key) - if contact and contact.type == CONTACT_TYPE_REPEATER: - logger.debug( - "Skipping message from repeater %s (CLI responses not stored): %s", - their_public_key[:12], - (decrypted.message or "")[:50], - ) - return None - - received = received_at or int(time.time()) - - # conversation_key is always the other party's public key - conversation_key = their_public_key.lower() - - # Resolve sender name for incoming messages (used for name-based blocking) - sender_name = contact.name if contact and not outgoing else None - - # Try to create message - INSERT OR IGNORE handles duplicates atomically - msg_id = await MessageRepository.create( - msg_type="PRIV", - text=decrypted.message, - conversation_key=conversation_key, - sender_timestamp=decrypted.timestamp, - received_at=received, + """Store a decrypted direct message via the shared message service.""" + return await _create_dm_message_from_decrypted( + packet_id=packet_id, + decrypted=decrypted, + their_public_key=their_public_key, + our_public_key=our_public_key, + received_at=received_at, path=path, path_len=path_len, outgoing=outgoing, - sender_key=conversation_key if not outgoing else None, - sender_name=sender_name, - ) - - if msg_id is None: - # Duplicate message detected - await _handle_duplicate_message( - packet_id, - "PRIV", - conversation_key, - decrypted.message, - decrypted.timestamp, - path, - received, - path_len, - ) - return None - - logger.info( - "Stored direct message %d for contact %s (outgoing=%s)", - msg_id, - conversation_key[:12], - outgoing, - ) - - # Mark the raw packet as decrypted - await RawPacketRepository.mark_decrypted(packet_id, msg_id) - - # Build paths array for broadcast - paths = ( - [MessagePath(path=path or "", received_at=received, path_len=path_len)] - if path is not None - else None - ) - - # Broadcast new message to connected clients (and fanout modules when realtime) - sender_name = contact.name if contact and not outgoing else None - broadcast_event( - "message", - Message( - id=msg_id, - type="PRIV", - conversation_key=conversation_key, - text=decrypted.message, - sender_timestamp=decrypted.timestamp, - received_at=received, - paths=paths, - outgoing=outgoing, - sender_name=sender_name, - sender_key=conversation_key if not outgoing else None, - ).model_dump(), realtime=realtime, + broadcast_fn=broadcast_event, ) - # Update contact's last_contacted timestamp (for sorting) - await ContactRepository.update_last_contacted(conversation_key, received) - - return msg_id - async def run_historical_dm_decryption( private_key_bytes: bytes, diff --git a/app/routers/messages.py b/app/routers/messages.py index b15c960..b66d612 100644 --- a/app/routers/messages.py +++ b/app/routers/messages.py @@ -16,6 +16,11 @@ from app.models import ( from app.radio import radio_manager from app.region_scope import normalize_region_scope from app.repository import AmbiguousPublicKeyPrefixError, AppSettingsRepository, MessageRepository +from app.services.messages import ( + build_message_model, + create_outgoing_channel_message, + create_outgoing_direct_message, +) from app.websocket import broadcast_error, broadcast_event logger = logging.getLogger(__name__) @@ -239,15 +244,15 @@ async def send_direct_message(request: SendDirectMessageRequest) -> Message: raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}") # Store outgoing message - message_id = await MessageRepository.create( - msg_type="PRIV", - text=request.text, + message = await create_outgoing_direct_message( conversation_key=db_contact.public_key.lower(), + text=request.text, sender_timestamp=now, received_at=now, - outgoing=True, + broadcast_fn=broadcast_event, + message_repository=MessageRepository, ) - if message_id is None: + if message is None: raise HTTPException( status_code=500, detail="Failed to store outgoing message - unexpected duplicate", @@ -261,23 +266,8 @@ async def send_direct_message(request: SendDirectMessageRequest) -> Message: suggested_timeout: int = result.payload.get("suggested_timeout", 10000) # default 10s if expected_ack: ack_code = expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack - track_pending_ack(ack_code, message_id, suggested_timeout) - logger.debug("Tracking ACK %s for message %d", ack_code, message_id) - - message = Message( - id=message_id, - type="PRIV", - conversation_key=db_contact.public_key.lower(), - text=request.text, - sender_timestamp=now, - received_at=now, - outgoing=True, - acked=0, - ) - - # Broadcast so all connected clients (not just sender) see the outgoing message immediately. - # Fanout modules (including bots) are triggered via broadcast_event's realtime dispatch. - broadcast_event("message", message.model_dump()) + track_pending_ack(ack_code, message.id, suggested_timeout) + logger.debug("Tracking ACK %s for message %d", ack_code, message.id) return message @@ -351,57 +341,39 @@ async def send_channel_message(request: SendChannelMessageRequest) -> Message: # Store outgoing immediately after send to avoid a race where # our own echo lands before persistence. - message_id = await MessageRepository.create( - msg_type="CHAN", - text=text_with_sender, + outgoing_message = await create_outgoing_channel_message( conversation_key=channel_key_upper, + text=text_with_sender, sender_timestamp=now, received_at=now, - outgoing=True, sender_name=radio_name or None, sender_key=our_public_key, + channel_name=db_channel.name, + broadcast_fn=broadcast_event, + message_repository=MessageRepository, ) - if message_id is None: + if outgoing_message is None: raise HTTPException( status_code=500, detail="Failed to store outgoing message - unexpected duplicate", ) - - # Broadcast immediately so all connected clients see the message promptly. - # This ensures the message exists in frontend state when echo-driven - # `message_acked` events arrive. - broadcast_event( - "message", - Message( - id=message_id, - type="CHAN", - conversation_key=channel_key_upper, - text=text_with_sender, - sender_timestamp=now, - received_at=now, - outgoing=True, - acked=0, - sender_name=radio_name or None, - sender_key=our_public_key, - channel_name=db_channel.name, - ).model_dump(), - ) + message_id = outgoing_message.id if message_id is None or now is None: raise HTTPException(status_code=500, detail="Failed to store outgoing message") acked_count, paths = await MessageRepository.get_ack_and_paths(message_id) - message = Message( - id=message_id, - type="CHAN", + message = build_message_model( + message_id=message_id, + msg_type="CHAN", conversation_key=channel_key_upper, text=text_with_sender, sender_timestamp=now, received_at=now, + paths=paths, outgoing=True, acked=acked_count, - paths=paths, sender_name=radio_name or None, sender_key=our_public_key, channel_name=db_channel.name, @@ -492,17 +464,18 @@ async def resend_channel_message( # For new-timestamp resend, create a new message row and broadcast it if new_timestamp: - new_msg_id = await MessageRepository.create( - msg_type="CHAN", - text=msg.text, + new_message = await create_outgoing_channel_message( conversation_key=msg.conversation_key, + text=msg.text, sender_timestamp=now, received_at=now, - outgoing=True, sender_name=radio_name or None, sender_key=resend_public_key, + channel_name=db_channel.name, + broadcast_fn=broadcast_event, + message_repository=MessageRepository, ) - if new_msg_id is None: + if new_message is None: # Timestamp-second collision (same text+channel within the same second). # The radio already transmitted, so log and return the original ID rather # than surfacing a 500 for a message that was successfully sent over the air. @@ -512,30 +485,13 @@ async def resend_channel_message( ) return {"status": "ok", "message_id": message_id} - broadcast_event( - "message", - Message( - id=new_msg_id, - type="CHAN", - conversation_key=msg.conversation_key, - text=msg.text, - sender_timestamp=now, - received_at=now, - outgoing=True, - acked=0, - sender_name=radio_name or None, - sender_key=resend_public_key, - channel_name=db_channel.name, - ).model_dump(), - ) - logger.info( "Resent channel message %d as new message %d to %s", message_id, - new_msg_id, + new_message.id, db_channel.name, ) - return {"status": "ok", "message_id": new_msg_id} + return {"status": "ok", "message_id": new_message.id} logger.info("Resent channel message %d to %s", message_id, db_channel.name) return {"status": "ok", "message_id": message_id} diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..c43f9f8 --- /dev/null +++ b/app/services/__init__.py @@ -0,0 +1 @@ +"""Backend service-layer helpers.""" diff --git a/app/services/messages.py b/app/services/messages.py new file mode 100644 index 0000000..f96ee11 --- /dev/null +++ b/app/services/messages.py @@ -0,0 +1,447 @@ +import logging +import time +from collections.abc import Callable +from typing import TYPE_CHECKING, Any + +from app.models import CONTACT_TYPE_REPEATER, Message, MessagePath +from app.repository import ContactRepository, MessageRepository, RawPacketRepository + +if TYPE_CHECKING: + from app.decoder import DecryptedDirectMessage + +logger = logging.getLogger(__name__) + +BroadcastFn = Callable[..., Any] + + +def build_message_paths( + path: str | None, + received_at: int, + path_len: int | None = None, +) -> list[MessagePath] | None: + """Build the single-path list used by message payloads.""" + return ( + [MessagePath(path=path or "", received_at=received_at, path_len=path_len)] + if path is not None + else None + ) + + +def build_message_model( + *, + message_id: int, + msg_type: str, + conversation_key: str, + text: str, + sender_timestamp: int | None, + received_at: int, + paths: list[MessagePath] | None = None, + txt_type: int = 0, + signature: str | None = None, + sender_key: str | None = None, + outgoing: bool = False, + acked: int = 0, + sender_name: str | None = None, + channel_name: str | None = None, +) -> Message: + """Build a Message model with the canonical backend payload shape.""" + return Message( + id=message_id, + type=msg_type, + conversation_key=conversation_key, + text=text, + sender_timestamp=sender_timestamp, + received_at=received_at, + paths=paths, + txt_type=txt_type, + signature=signature, + sender_key=sender_key, + outgoing=outgoing, + acked=acked, + sender_name=sender_name, + channel_name=channel_name, + ) + + +def broadcast_message( + *, + message: Message, + broadcast_fn: BroadcastFn, + realtime: bool | None = None, +) -> None: + """Broadcast a message payload, preserving the caller's broadcast signature.""" + payload = message.model_dump() + if realtime is None: + broadcast_fn("message", payload) + else: + broadcast_fn("message", payload, realtime=realtime) + + +def broadcast_message_acked( + *, + message_id: int, + ack_count: int, + paths: list[MessagePath] | None, + broadcast_fn: BroadcastFn, +) -> None: + """Broadcast a message_acked payload.""" + broadcast_fn( + "message_acked", + { + "message_id": message_id, + "ack_count": ack_count, + "paths": [path.model_dump() for path in paths] if paths else [], + }, + ) + + +async def increment_ack_and_broadcast( + *, + message_id: int, + broadcast_fn: BroadcastFn, +) -> int: + """Increment a message's ACK count and broadcast the update.""" + ack_count = await MessageRepository.increment_ack_count(message_id) + broadcast_fn("message_acked", {"message_id": message_id, "ack_count": ack_count}) + return ack_count + + +async def handle_duplicate_message( + *, + packet_id: int, + msg_type: str, + conversation_key: str, + text: str, + sender_timestamp: int, + path: str | None, + received_at: int, + path_len: int | None = None, + broadcast_fn: BroadcastFn, +) -> None: + """Handle a duplicate message by updating paths/acks on the existing record.""" + existing_msg = await MessageRepository.get_by_content( + msg_type=msg_type, + conversation_key=conversation_key, + text=text, + sender_timestamp=sender_timestamp, + ) + if not existing_msg: + label = "message" if msg_type == "CHAN" else "DM" + logger.warning( + "Duplicate %s for %s but couldn't find existing", + label, + conversation_key[:12], + ) + return + + logger.debug( + "Duplicate %s for %s (msg_id=%d, outgoing=%s) - adding path", + msg_type, + conversation_key[:12], + existing_msg.id, + existing_msg.outgoing, + ) + + if path is not None: + paths = await MessageRepository.add_path(existing_msg.id, path, received_at, path_len) + else: + paths = existing_msg.paths or [] + + if existing_msg.outgoing: + ack_count = await MessageRepository.increment_ack_count(existing_msg.id) + else: + ack_count = existing_msg.acked + + if existing_msg.outgoing or path is not None: + broadcast_message_acked( + message_id=existing_msg.id, + ack_count=ack_count, + paths=paths, + broadcast_fn=broadcast_fn, + ) + + await RawPacketRepository.mark_decrypted(packet_id, existing_msg.id) + + +async def create_message_from_decrypted( + *, + packet_id: int, + channel_key: str, + sender: str | None, + message_text: str, + timestamp: int, + received_at: int | None = None, + path: str | None = None, + path_len: int | None = None, + channel_name: str | None = None, + realtime: bool = True, + broadcast_fn: BroadcastFn, +) -> int | None: + """Store and broadcast a decrypted channel message.""" + received = received_at or int(time.time()) + text = f"{sender}: {message_text}" if sender else message_text + channel_key_normalized = channel_key.upper() + + resolved_sender_key: str | None = None + if sender: + candidates = await ContactRepository.get_by_name(sender) + if len(candidates) == 1: + resolved_sender_key = candidates[0].public_key + + msg_id = await MessageRepository.create( + msg_type="CHAN", + text=text, + conversation_key=channel_key_normalized, + sender_timestamp=timestamp, + received_at=received, + path=path, + path_len=path_len, + sender_name=sender, + sender_key=resolved_sender_key, + ) + + if msg_id is None: + await handle_duplicate_message( + packet_id=packet_id, + msg_type="CHAN", + conversation_key=channel_key_normalized, + text=text, + sender_timestamp=timestamp, + path=path, + received_at=received, + path_len=path_len, + broadcast_fn=broadcast_fn, + ) + return None + + logger.info("Stored channel message %d for channel %s", msg_id, channel_key_normalized[:8]) + await RawPacketRepository.mark_decrypted(packet_id, msg_id) + + broadcast_message( + message=build_message_model( + message_id=msg_id, + msg_type="CHAN", + conversation_key=channel_key_normalized, + text=text, + sender_timestamp=timestamp, + received_at=received, + paths=build_message_paths(path, received, path_len), + sender_name=sender, + sender_key=resolved_sender_key, + channel_name=channel_name, + ), + broadcast_fn=broadcast_fn, + realtime=realtime, + ) + + return msg_id + + +async def create_dm_message_from_decrypted( + *, + packet_id: int, + decrypted: "DecryptedDirectMessage", + their_public_key: str, + our_public_key: str | None, + received_at: int | None = None, + path: str | None = None, + path_len: int | None = None, + outgoing: bool = False, + realtime: bool = True, + broadcast_fn: BroadcastFn, +) -> int | None: + """Store and broadcast a decrypted direct message.""" + contact = await ContactRepository.get_by_key(their_public_key) + if contact and contact.type == CONTACT_TYPE_REPEATER: + logger.debug( + "Skipping message from repeater %s (CLI responses not stored): %s", + their_public_key[:12], + (decrypted.message or "")[:50], + ) + return None + + received = received_at or int(time.time()) + conversation_key = their_public_key.lower() + sender_name = contact.name if contact and not outgoing else None + + msg_id = await MessageRepository.create( + msg_type="PRIV", + text=decrypted.message, + conversation_key=conversation_key, + sender_timestamp=decrypted.timestamp, + received_at=received, + path=path, + path_len=path_len, + outgoing=outgoing, + sender_key=conversation_key if not outgoing else None, + sender_name=sender_name, + ) + + if msg_id is None: + await handle_duplicate_message( + packet_id=packet_id, + msg_type="PRIV", + conversation_key=conversation_key, + text=decrypted.message, + sender_timestamp=decrypted.timestamp, + path=path, + received_at=received, + path_len=path_len, + broadcast_fn=broadcast_fn, + ) + return None + + logger.info( + "Stored direct message %d for contact %s (outgoing=%s)", + msg_id, + conversation_key[:12], + outgoing, + ) + await RawPacketRepository.mark_decrypted(packet_id, msg_id) + + broadcast_message( + message=build_message_model( + message_id=msg_id, + msg_type="PRIV", + conversation_key=conversation_key, + text=decrypted.message, + sender_timestamp=decrypted.timestamp, + received_at=received, + paths=build_message_paths(path, received, path_len), + outgoing=outgoing, + sender_name=sender_name, + sender_key=conversation_key if not outgoing else None, + ), + broadcast_fn=broadcast_fn, + realtime=realtime, + ) + + await ContactRepository.update_last_contacted(conversation_key, received) + return msg_id + + +async def create_fallback_direct_message( + *, + conversation_key: str, + text: str, + sender_timestamp: int, + received_at: int, + path: str | None, + path_len: int | None, + txt_type: int, + signature: str | None, + sender_name: str | None, + sender_key: str | None, + broadcast_fn: BroadcastFn, + message_repository=MessageRepository, +) -> Message | None: + """Store and broadcast a CONTACT_MSG_RECV fallback direct message.""" + msg_id = await message_repository.create( + msg_type="PRIV", + text=text, + conversation_key=conversation_key, + sender_timestamp=sender_timestamp, + received_at=received_at, + path=path, + path_len=path_len, + txt_type=txt_type, + signature=signature, + sender_key=sender_key, + sender_name=sender_name, + ) + if msg_id is None: + return None + + message = build_message_model( + message_id=msg_id, + msg_type="PRIV", + conversation_key=conversation_key, + text=text, + sender_timestamp=sender_timestamp, + received_at=received_at, + paths=build_message_paths(path, received_at, path_len), + txt_type=txt_type, + signature=signature, + sender_key=sender_key, + sender_name=sender_name, + ) + broadcast_message(message=message, broadcast_fn=broadcast_fn) + return message + + +async def create_outgoing_direct_message( + *, + conversation_key: str, + text: str, + sender_timestamp: int, + received_at: int, + broadcast_fn: BroadcastFn, + message_repository=MessageRepository, +) -> Message | None: + """Store and broadcast an outgoing direct message.""" + msg_id = await message_repository.create( + msg_type="PRIV", + text=text, + conversation_key=conversation_key, + sender_timestamp=sender_timestamp, + received_at=received_at, + outgoing=True, + ) + if msg_id is None: + return None + + message = build_message_model( + message_id=msg_id, + msg_type="PRIV", + conversation_key=conversation_key, + text=text, + sender_timestamp=sender_timestamp, + received_at=received_at, + outgoing=True, + acked=0, + ) + broadcast_message(message=message, broadcast_fn=broadcast_fn) + return message + + +async def create_outgoing_channel_message( + *, + conversation_key: str, + text: str, + sender_timestamp: int, + received_at: int, + sender_name: str | None, + sender_key: str | None, + channel_name: str | None, + broadcast_fn: BroadcastFn, + message_repository=MessageRepository, +) -> Message | None: + """Store and broadcast an outgoing channel message.""" + msg_id = await message_repository.create( + msg_type="CHAN", + text=text, + conversation_key=conversation_key, + sender_timestamp=sender_timestamp, + received_at=received_at, + outgoing=True, + sender_name=sender_name, + sender_key=sender_key, + ) + if msg_id is None: + return None + + message = build_message_model( + message_id=msg_id, + msg_type="CHAN", + conversation_key=conversation_key, + text=text, + sender_timestamp=sender_timestamp, + received_at=received_at, + outgoing=True, + acked=0, + sender_name=sender_name, + sender_key=sender_key, + channel_name=channel_name, + ) + broadcast_message(message=message, broadcast_fn=broadcast_fn) + return message