extract backend message lifecycle service

This commit is contained in:
Jack Kingsman
2026-03-09 16:56:23 -07:00
parent 9421c10e8f
commit 557af55ee8
5 changed files with 511 additions and 365 deletions

View File

@@ -4,7 +4,7 @@ from typing import TYPE_CHECKING
from meshcore import EventType
from app.models import CONTACT_TYPE_REPEATER, Contact, Message, MessagePath
from app.models import CONTACT_TYPE_REPEATER, Contact
from app.packet_processor import process_raw_packet
from app.repository import (
AmbiguousPublicKeyPrefixError,
@@ -12,6 +12,7 @@ from app.repository import (
ContactRepository,
MessageRepository,
)
from app.services.messages import create_fallback_direct_message, increment_ack_and_broadcast
from app.websocket import broadcast_event
if TYPE_CHECKING:
@@ -108,21 +109,21 @@ async def on_contact_message(event: "Event") -> None:
sender_name = contact.name if contact else None
path = payload.get("path")
path_len = payload.get("path_len")
msg_id = await MessageRepository.create(
msg_type="PRIV",
text=payload.get("text", ""),
message = await create_fallback_direct_message(
conversation_key=sender_pubkey,
text=payload.get("text", ""),
sender_timestamp=sender_timestamp,
received_at=received_at,
path=path,
path_len=path_len,
txt_type=txt_type,
signature=payload.get("signature"),
sender_key=sender_pubkey,
sender_name=sender_name,
sender_key=sender_pubkey,
broadcast_fn=broadcast_event,
)
if msg_id is None:
if message is None:
# Already handled by packet processor (or exact duplicate) - nothing more to do
logger.debug("DM from %s already processed by packet processor", sender_pubkey[:12])
return
@@ -131,31 +132,6 @@ async def on_contact_message(event: "Event") -> None:
# (likely because private key export is not available)
logger.debug("DM from %s handled by event handler (fallback path)", sender_pubkey[:12])
# Build paths array for broadcast
paths = (
[MessagePath(path=path or "", received_at=received_at, path_len=path_len)]
if path is not None
else None
)
# Broadcast the new message
broadcast_event(
"message",
Message(
id=msg_id,
type="PRIV",
conversation_key=sender_pubkey,
text=payload.get("text", ""),
sender_timestamp=sender_timestamp,
received_at=received_at,
paths=paths,
txt_type=txt_type,
signature=payload.get("signature"),
sender_key=sender_pubkey,
sender_name=sender_name,
).model_dump(),
)
# Update contact last_contacted (contact was already fetched above)
if contact:
await ContactRepository.update_last_contacted(sender_pubkey, received_at)
@@ -307,12 +283,10 @@ async def on_ack(event: "Event") -> None:
if ack_code in _pending_acks:
message_id, _, _ = _pending_acks.pop(ack_code)
logger.info("ACK received for message %d", message_id)
ack_count = await MessageRepository.increment_ack_count(message_id)
# DM ACKs don't carry path data, so paths is intentionally omitted.
# The frontend's mergePendingAck handles the missing field correctly,
# preserving any previously known paths.
broadcast_event("message_acked", {"message_id": message_id, "ack_count": ack_count})
await increment_ack_and_broadcast(message_id=message_id, broadcast_fn=broadcast_event)
else:
logger.debug("ACK code %s does not match any pending messages", ack_code)

View File

@@ -30,8 +30,6 @@ from app.decoder import (
from app.keystore import get_private_key, get_public_key, has_private_key
from app.models import (
CONTACT_TYPE_REPEATER,
Message,
MessagePath,
RawPacketBroadcast,
RawPacketDecryptedInfo,
)
@@ -43,6 +41,12 @@ from app.repository import (
MessageRepository,
RawPacketRepository,
)
from app.services.messages import (
create_dm_message_from_decrypted as _create_dm_message_from_decrypted,
)
from app.services.messages import (
create_message_from_decrypted as _create_message_from_decrypted,
)
from app.websocket import broadcast_error, broadcast_event
logger = logging.getLogger(__name__)
@@ -50,77 +54,6 @@ logger = logging.getLogger(__name__)
_raw_observation_counter = count(1)
async def _handle_duplicate_message(
packet_id: int,
msg_type: str,
conversation_key: str,
text: str,
sender_timestamp: int,
path: str | None,
received: int,
path_len: int | None = None,
) -> None:
"""Handle a duplicate message by updating paths/acks on the existing record.
Called when MessageRepository.create returns None (INSERT OR IGNORE hit a duplicate).
Looks up the existing message, adds the new path, increments ack count for outgoing
messages, and broadcasts the update to clients.
"""
existing_msg = await MessageRepository.get_by_content(
msg_type=msg_type,
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
)
if not existing_msg:
label = "message" if msg_type == "CHAN" else "DM"
logger.warning(
"Duplicate %s for %s but couldn't find existing",
label,
conversation_key[:12],
)
return
logger.debug(
"Duplicate %s for %s (msg_id=%d, outgoing=%s) - adding path",
msg_type,
conversation_key[:12],
existing_msg.id,
existing_msg.outgoing,
)
# Add path if provided
if path is not None:
paths = await MessageRepository.add_path(existing_msg.id, path, received, path_len)
else:
# Get current paths for broadcast
paths = existing_msg.paths or []
# Increment ack count for outgoing messages (echo confirmation)
if existing_msg.outgoing:
ack_count = await MessageRepository.increment_ack_count(existing_msg.id)
else:
ack_count = existing_msg.acked
# Only broadcast when something actually changed:
# - outgoing: ack count was incremented
# - path provided: a new path entry was appended
# The path=None case happens for direct-delivery DMs (0-hop, no routing bytes).
# A non-outgoing duplicate with no new path changes nothing in the DB, so skip.
if existing_msg.outgoing or path is not None:
broadcast_event(
"message_acked",
{
"message_id": existing_msg.id,
"ack_count": ack_count,
"paths": [p.model_dump() for p in paths] if paths else [],
},
)
# Mark this packet as decrypted
await RawPacketRepository.mark_decrypted(packet_id, existing_msg.id)
async def create_message_from_decrypted(
packet_id: int,
channel_key: str,
@@ -133,95 +66,21 @@ async def create_message_from_decrypted(
channel_name: str | None = None,
realtime: bool = True,
) -> int | None:
"""Create a message record from decrypted channel packet content.
This is the shared logic for storing decrypted channel messages,
used by both real-time packet processing and historical decryption.
Args:
packet_id: ID of the raw packet being processed
channel_key: Hex string channel key
channel_name: Channel name (e.g. "#general"), for bot context
sender: Sender name (will be prefixed to message) or None
message_text: The decrypted message content
timestamp: Sender timestamp from the packet
received_at: When the packet was received (defaults to now)
path: Hex-encoded routing path
realtime: If False, skip fanout dispatch (used for historical decryption)
Returns the message ID if created, None if duplicate.
"""
received = received_at or int(time.time())
# Format the message text with sender prefix if present
text = f"{sender}: {message_text}" if sender else message_text
# Normalize channel key to uppercase for consistency
channel_key_normalized = channel_key.upper()
# Resolve sender_key: look up contact by exact name match
resolved_sender_key: str | None = None
if sender:
candidates = await ContactRepository.get_by_name(sender)
if len(candidates) == 1:
resolved_sender_key = candidates[0].public_key
# Try to create message - INSERT OR IGNORE handles duplicates atomically
msg_id = await MessageRepository.create(
msg_type="CHAN",
text=text,
conversation_key=channel_key_normalized,
sender_timestamp=timestamp,
received_at=received,
"""Store a decrypted channel message via the shared message service."""
return await _create_message_from_decrypted(
packet_id=packet_id,
channel_key=channel_key,
sender=sender,
message_text=message_text,
timestamp=timestamp,
received_at=received_at,
path=path,
path_len=path_len,
sender_name=sender,
sender_key=resolved_sender_key,
)
if msg_id is None:
# Duplicate message detected - this happens when:
# 1. Our own outgoing message echoes back (flood routing)
# 2. Same message arrives via multiple paths before first is committed
# In either case, add the path to the existing message.
await _handle_duplicate_message(
packet_id, "CHAN", channel_key_normalized, text, timestamp, path, received, path_len
)
return None
logger.info("Stored channel message %d for channel %s", msg_id, channel_key_normalized[:8])
# Mark the raw packet as decrypted
await RawPacketRepository.mark_decrypted(packet_id, msg_id)
# Build paths array for broadcast
# Use "is not None" to include empty string (direct/0-hop messages)
paths = (
[MessagePath(path=path or "", received_at=received, path_len=path_len)]
if path is not None
else None
)
# Broadcast new message to connected clients (and fanout modules when realtime)
broadcast_event(
"message",
Message(
id=msg_id,
type="CHAN",
conversation_key=channel_key_normalized,
text=text,
sender_timestamp=timestamp,
received_at=received,
paths=paths,
sender_name=sender,
sender_key=resolved_sender_key,
channel_name=channel_name,
).model_dump(),
channel_name=channel_name,
realtime=realtime,
broadcast_fn=broadcast_event,
)
return msg_id
async def create_dm_message_from_decrypted(
packet_id: int,
@@ -234,111 +93,20 @@ async def create_dm_message_from_decrypted(
outgoing: bool = False,
realtime: bool = True,
) -> int | None:
"""Create a message record from decrypted direct message packet content.
This is the shared logic for storing decrypted direct messages,
used by both real-time packet processing and historical decryption.
Args:
packet_id: ID of the raw packet being processed
decrypted: DecryptedDirectMessage from decoder
their_public_key: The contact's full 64-char public key (conversation_key)
our_public_key: Our public key (to determine direction), or None
received_at: When the packet was received (defaults to now)
path: Hex-encoded routing path
outgoing: Whether this is an outgoing message (we sent it)
realtime: If False, skip fanout dispatch (used for historical decryption)
Returns the message ID if created, None if duplicate.
"""
# Check if sender is a repeater - repeaters only send CLI responses, not chat messages.
# CLI responses are handled by the command endpoint, not stored in chat history.
contact = await ContactRepository.get_by_key(their_public_key)
if contact and contact.type == CONTACT_TYPE_REPEATER:
logger.debug(
"Skipping message from repeater %s (CLI responses not stored): %s",
their_public_key[:12],
(decrypted.message or "")[:50],
)
return None
received = received_at or int(time.time())
# conversation_key is always the other party's public key
conversation_key = their_public_key.lower()
# Resolve sender name for incoming messages (used for name-based blocking)
sender_name = contact.name if contact and not outgoing else None
# Try to create message - INSERT OR IGNORE handles duplicates atomically
msg_id = await MessageRepository.create(
msg_type="PRIV",
text=decrypted.message,
conversation_key=conversation_key,
sender_timestamp=decrypted.timestamp,
received_at=received,
"""Store a decrypted direct message via the shared message service."""
return await _create_dm_message_from_decrypted(
packet_id=packet_id,
decrypted=decrypted,
their_public_key=their_public_key,
our_public_key=our_public_key,
received_at=received_at,
path=path,
path_len=path_len,
outgoing=outgoing,
sender_key=conversation_key if not outgoing else None,
sender_name=sender_name,
)
if msg_id is None:
# Duplicate message detected
await _handle_duplicate_message(
packet_id,
"PRIV",
conversation_key,
decrypted.message,
decrypted.timestamp,
path,
received,
path_len,
)
return None
logger.info(
"Stored direct message %d for contact %s (outgoing=%s)",
msg_id,
conversation_key[:12],
outgoing,
)
# Mark the raw packet as decrypted
await RawPacketRepository.mark_decrypted(packet_id, msg_id)
# Build paths array for broadcast
paths = (
[MessagePath(path=path or "", received_at=received, path_len=path_len)]
if path is not None
else None
)
# Broadcast new message to connected clients (and fanout modules when realtime)
sender_name = contact.name if contact and not outgoing else None
broadcast_event(
"message",
Message(
id=msg_id,
type="PRIV",
conversation_key=conversation_key,
text=decrypted.message,
sender_timestamp=decrypted.timestamp,
received_at=received,
paths=paths,
outgoing=outgoing,
sender_name=sender_name,
sender_key=conversation_key if not outgoing else None,
).model_dump(),
realtime=realtime,
broadcast_fn=broadcast_event,
)
# Update contact's last_contacted timestamp (for sorting)
await ContactRepository.update_last_contacted(conversation_key, received)
return msg_id
async def run_historical_dm_decryption(
private_key_bytes: bytes,

View File

@@ -16,6 +16,11 @@ from app.models import (
from app.radio import radio_manager
from app.region_scope import normalize_region_scope
from app.repository import AmbiguousPublicKeyPrefixError, AppSettingsRepository, MessageRepository
from app.services.messages import (
build_message_model,
create_outgoing_channel_message,
create_outgoing_direct_message,
)
from app.websocket import broadcast_error, broadcast_event
logger = logging.getLogger(__name__)
@@ -239,15 +244,15 @@ async def send_direct_message(request: SendDirectMessageRequest) -> Message:
raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}")
# Store outgoing message
message_id = await MessageRepository.create(
msg_type="PRIV",
text=request.text,
message = await create_outgoing_direct_message(
conversation_key=db_contact.public_key.lower(),
text=request.text,
sender_timestamp=now,
received_at=now,
outgoing=True,
broadcast_fn=broadcast_event,
message_repository=MessageRepository,
)
if message_id is None:
if message is None:
raise HTTPException(
status_code=500,
detail="Failed to store outgoing message - unexpected duplicate",
@@ -261,23 +266,8 @@ async def send_direct_message(request: SendDirectMessageRequest) -> Message:
suggested_timeout: int = result.payload.get("suggested_timeout", 10000) # default 10s
if expected_ack:
ack_code = expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack
track_pending_ack(ack_code, message_id, suggested_timeout)
logger.debug("Tracking ACK %s for message %d", ack_code, message_id)
message = Message(
id=message_id,
type="PRIV",
conversation_key=db_contact.public_key.lower(),
text=request.text,
sender_timestamp=now,
received_at=now,
outgoing=True,
acked=0,
)
# Broadcast so all connected clients (not just sender) see the outgoing message immediately.
# Fanout modules (including bots) are triggered via broadcast_event's realtime dispatch.
broadcast_event("message", message.model_dump())
track_pending_ack(ack_code, message.id, suggested_timeout)
logger.debug("Tracking ACK %s for message %d", ack_code, message.id)
return message
@@ -351,57 +341,39 @@ async def send_channel_message(request: SendChannelMessageRequest) -> Message:
# Store outgoing immediately after send to avoid a race where
# our own echo lands before persistence.
message_id = await MessageRepository.create(
msg_type="CHAN",
text=text_with_sender,
outgoing_message = await create_outgoing_channel_message(
conversation_key=channel_key_upper,
text=text_with_sender,
sender_timestamp=now,
received_at=now,
outgoing=True,
sender_name=radio_name or None,
sender_key=our_public_key,
channel_name=db_channel.name,
broadcast_fn=broadcast_event,
message_repository=MessageRepository,
)
if message_id is None:
if outgoing_message is None:
raise HTTPException(
status_code=500,
detail="Failed to store outgoing message - unexpected duplicate",
)
# Broadcast immediately so all connected clients see the message promptly.
# This ensures the message exists in frontend state when echo-driven
# `message_acked` events arrive.
broadcast_event(
"message",
Message(
id=message_id,
type="CHAN",
conversation_key=channel_key_upper,
text=text_with_sender,
sender_timestamp=now,
received_at=now,
outgoing=True,
acked=0,
sender_name=radio_name or None,
sender_key=our_public_key,
channel_name=db_channel.name,
).model_dump(),
)
message_id = outgoing_message.id
if message_id is None or now is None:
raise HTTPException(status_code=500, detail="Failed to store outgoing message")
acked_count, paths = await MessageRepository.get_ack_and_paths(message_id)
message = Message(
id=message_id,
type="CHAN",
message = build_message_model(
message_id=message_id,
msg_type="CHAN",
conversation_key=channel_key_upper,
text=text_with_sender,
sender_timestamp=now,
received_at=now,
paths=paths,
outgoing=True,
acked=acked_count,
paths=paths,
sender_name=radio_name or None,
sender_key=our_public_key,
channel_name=db_channel.name,
@@ -492,17 +464,18 @@ async def resend_channel_message(
# For new-timestamp resend, create a new message row and broadcast it
if new_timestamp:
new_msg_id = await MessageRepository.create(
msg_type="CHAN",
text=msg.text,
new_message = await create_outgoing_channel_message(
conversation_key=msg.conversation_key,
text=msg.text,
sender_timestamp=now,
received_at=now,
outgoing=True,
sender_name=radio_name or None,
sender_key=resend_public_key,
channel_name=db_channel.name,
broadcast_fn=broadcast_event,
message_repository=MessageRepository,
)
if new_msg_id is None:
if new_message is None:
# Timestamp-second collision (same text+channel within the same second).
# The radio already transmitted, so log and return the original ID rather
# than surfacing a 500 for a message that was successfully sent over the air.
@@ -512,30 +485,13 @@ async def resend_channel_message(
)
return {"status": "ok", "message_id": message_id}
broadcast_event(
"message",
Message(
id=new_msg_id,
type="CHAN",
conversation_key=msg.conversation_key,
text=msg.text,
sender_timestamp=now,
received_at=now,
outgoing=True,
acked=0,
sender_name=radio_name or None,
sender_key=resend_public_key,
channel_name=db_channel.name,
).model_dump(),
)
logger.info(
"Resent channel message %d as new message %d to %s",
message_id,
new_msg_id,
new_message.id,
db_channel.name,
)
return {"status": "ok", "message_id": new_msg_id}
return {"status": "ok", "message_id": new_message.id}
logger.info("Resent channel message %d to %s", message_id, db_channel.name)
return {"status": "ok", "message_id": message_id}

1
app/services/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Backend service-layer helpers."""

447
app/services/messages.py Normal file
View File

@@ -0,0 +1,447 @@
import logging
import time
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
from app.models import CONTACT_TYPE_REPEATER, Message, MessagePath
from app.repository import ContactRepository, MessageRepository, RawPacketRepository
if TYPE_CHECKING:
from app.decoder import DecryptedDirectMessage
logger = logging.getLogger(__name__)
BroadcastFn = Callable[..., Any]
def build_message_paths(
path: str | None,
received_at: int,
path_len: int | None = None,
) -> list[MessagePath] | None:
"""Build the single-path list used by message payloads."""
return (
[MessagePath(path=path or "", received_at=received_at, path_len=path_len)]
if path is not None
else None
)
def build_message_model(
*,
message_id: int,
msg_type: str,
conversation_key: str,
text: str,
sender_timestamp: int | None,
received_at: int,
paths: list[MessagePath] | None = None,
txt_type: int = 0,
signature: str | None = None,
sender_key: str | None = None,
outgoing: bool = False,
acked: int = 0,
sender_name: str | None = None,
channel_name: str | None = None,
) -> Message:
"""Build a Message model with the canonical backend payload shape."""
return Message(
id=message_id,
type=msg_type,
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
received_at=received_at,
paths=paths,
txt_type=txt_type,
signature=signature,
sender_key=sender_key,
outgoing=outgoing,
acked=acked,
sender_name=sender_name,
channel_name=channel_name,
)
def broadcast_message(
*,
message: Message,
broadcast_fn: BroadcastFn,
realtime: bool | None = None,
) -> None:
"""Broadcast a message payload, preserving the caller's broadcast signature."""
payload = message.model_dump()
if realtime is None:
broadcast_fn("message", payload)
else:
broadcast_fn("message", payload, realtime=realtime)
def broadcast_message_acked(
*,
message_id: int,
ack_count: int,
paths: list[MessagePath] | None,
broadcast_fn: BroadcastFn,
) -> None:
"""Broadcast a message_acked payload."""
broadcast_fn(
"message_acked",
{
"message_id": message_id,
"ack_count": ack_count,
"paths": [path.model_dump() for path in paths] if paths else [],
},
)
async def increment_ack_and_broadcast(
*,
message_id: int,
broadcast_fn: BroadcastFn,
) -> int:
"""Increment a message's ACK count and broadcast the update."""
ack_count = await MessageRepository.increment_ack_count(message_id)
broadcast_fn("message_acked", {"message_id": message_id, "ack_count": ack_count})
return ack_count
async def handle_duplicate_message(
*,
packet_id: int,
msg_type: str,
conversation_key: str,
text: str,
sender_timestamp: int,
path: str | None,
received_at: int,
path_len: int | None = None,
broadcast_fn: BroadcastFn,
) -> None:
"""Handle a duplicate message by updating paths/acks on the existing record."""
existing_msg = await MessageRepository.get_by_content(
msg_type=msg_type,
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
)
if not existing_msg:
label = "message" if msg_type == "CHAN" else "DM"
logger.warning(
"Duplicate %s for %s but couldn't find existing",
label,
conversation_key[:12],
)
return
logger.debug(
"Duplicate %s for %s (msg_id=%d, outgoing=%s) - adding path",
msg_type,
conversation_key[:12],
existing_msg.id,
existing_msg.outgoing,
)
if path is not None:
paths = await MessageRepository.add_path(existing_msg.id, path, received_at, path_len)
else:
paths = existing_msg.paths or []
if existing_msg.outgoing:
ack_count = await MessageRepository.increment_ack_count(existing_msg.id)
else:
ack_count = existing_msg.acked
if existing_msg.outgoing or path is not None:
broadcast_message_acked(
message_id=existing_msg.id,
ack_count=ack_count,
paths=paths,
broadcast_fn=broadcast_fn,
)
await RawPacketRepository.mark_decrypted(packet_id, existing_msg.id)
async def create_message_from_decrypted(
*,
packet_id: int,
channel_key: str,
sender: str | None,
message_text: str,
timestamp: int,
received_at: int | None = None,
path: str | None = None,
path_len: int | None = None,
channel_name: str | None = None,
realtime: bool = True,
broadcast_fn: BroadcastFn,
) -> int | None:
"""Store and broadcast a decrypted channel message."""
received = received_at or int(time.time())
text = f"{sender}: {message_text}" if sender else message_text
channel_key_normalized = channel_key.upper()
resolved_sender_key: str | None = None
if sender:
candidates = await ContactRepository.get_by_name(sender)
if len(candidates) == 1:
resolved_sender_key = candidates[0].public_key
msg_id = await MessageRepository.create(
msg_type="CHAN",
text=text,
conversation_key=channel_key_normalized,
sender_timestamp=timestamp,
received_at=received,
path=path,
path_len=path_len,
sender_name=sender,
sender_key=resolved_sender_key,
)
if msg_id is None:
await handle_duplicate_message(
packet_id=packet_id,
msg_type="CHAN",
conversation_key=channel_key_normalized,
text=text,
sender_timestamp=timestamp,
path=path,
received_at=received,
path_len=path_len,
broadcast_fn=broadcast_fn,
)
return None
logger.info("Stored channel message %d for channel %s", msg_id, channel_key_normalized[:8])
await RawPacketRepository.mark_decrypted(packet_id, msg_id)
broadcast_message(
message=build_message_model(
message_id=msg_id,
msg_type="CHAN",
conversation_key=channel_key_normalized,
text=text,
sender_timestamp=timestamp,
received_at=received,
paths=build_message_paths(path, received, path_len),
sender_name=sender,
sender_key=resolved_sender_key,
channel_name=channel_name,
),
broadcast_fn=broadcast_fn,
realtime=realtime,
)
return msg_id
async def create_dm_message_from_decrypted(
*,
packet_id: int,
decrypted: "DecryptedDirectMessage",
their_public_key: str,
our_public_key: str | None,
received_at: int | None = None,
path: str | None = None,
path_len: int | None = None,
outgoing: bool = False,
realtime: bool = True,
broadcast_fn: BroadcastFn,
) -> int | None:
"""Store and broadcast a decrypted direct message."""
contact = await ContactRepository.get_by_key(their_public_key)
if contact and contact.type == CONTACT_TYPE_REPEATER:
logger.debug(
"Skipping message from repeater %s (CLI responses not stored): %s",
their_public_key[:12],
(decrypted.message or "")[:50],
)
return None
received = received_at or int(time.time())
conversation_key = their_public_key.lower()
sender_name = contact.name if contact and not outgoing else None
msg_id = await MessageRepository.create(
msg_type="PRIV",
text=decrypted.message,
conversation_key=conversation_key,
sender_timestamp=decrypted.timestamp,
received_at=received,
path=path,
path_len=path_len,
outgoing=outgoing,
sender_key=conversation_key if not outgoing else None,
sender_name=sender_name,
)
if msg_id is None:
await handle_duplicate_message(
packet_id=packet_id,
msg_type="PRIV",
conversation_key=conversation_key,
text=decrypted.message,
sender_timestamp=decrypted.timestamp,
path=path,
received_at=received,
path_len=path_len,
broadcast_fn=broadcast_fn,
)
return None
logger.info(
"Stored direct message %d for contact %s (outgoing=%s)",
msg_id,
conversation_key[:12],
outgoing,
)
await RawPacketRepository.mark_decrypted(packet_id, msg_id)
broadcast_message(
message=build_message_model(
message_id=msg_id,
msg_type="PRIV",
conversation_key=conversation_key,
text=decrypted.message,
sender_timestamp=decrypted.timestamp,
received_at=received,
paths=build_message_paths(path, received, path_len),
outgoing=outgoing,
sender_name=sender_name,
sender_key=conversation_key if not outgoing else None,
),
broadcast_fn=broadcast_fn,
realtime=realtime,
)
await ContactRepository.update_last_contacted(conversation_key, received)
return msg_id
async def create_fallback_direct_message(
*,
conversation_key: str,
text: str,
sender_timestamp: int,
received_at: int,
path: str | None,
path_len: int | None,
txt_type: int,
signature: str | None,
sender_name: str | None,
sender_key: str | None,
broadcast_fn: BroadcastFn,
message_repository=MessageRepository,
) -> Message | None:
"""Store and broadcast a CONTACT_MSG_RECV fallback direct message."""
msg_id = await message_repository.create(
msg_type="PRIV",
text=text,
conversation_key=conversation_key,
sender_timestamp=sender_timestamp,
received_at=received_at,
path=path,
path_len=path_len,
txt_type=txt_type,
signature=signature,
sender_key=sender_key,
sender_name=sender_name,
)
if msg_id is None:
return None
message = build_message_model(
message_id=msg_id,
msg_type="PRIV",
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
received_at=received_at,
paths=build_message_paths(path, received_at, path_len),
txt_type=txt_type,
signature=signature,
sender_key=sender_key,
sender_name=sender_name,
)
broadcast_message(message=message, broadcast_fn=broadcast_fn)
return message
async def create_outgoing_direct_message(
*,
conversation_key: str,
text: str,
sender_timestamp: int,
received_at: int,
broadcast_fn: BroadcastFn,
message_repository=MessageRepository,
) -> Message | None:
"""Store and broadcast an outgoing direct message."""
msg_id = await message_repository.create(
msg_type="PRIV",
text=text,
conversation_key=conversation_key,
sender_timestamp=sender_timestamp,
received_at=received_at,
outgoing=True,
)
if msg_id is None:
return None
message = build_message_model(
message_id=msg_id,
msg_type="PRIV",
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
received_at=received_at,
outgoing=True,
acked=0,
)
broadcast_message(message=message, broadcast_fn=broadcast_fn)
return message
async def create_outgoing_channel_message(
*,
conversation_key: str,
text: str,
sender_timestamp: int,
received_at: int,
sender_name: str | None,
sender_key: str | None,
channel_name: str | None,
broadcast_fn: BroadcastFn,
message_repository=MessageRepository,
) -> Message | None:
"""Store and broadcast an outgoing channel message."""
msg_id = await message_repository.create(
msg_type="CHAN",
text=text,
conversation_key=conversation_key,
sender_timestamp=sender_timestamp,
received_at=received_at,
outgoing=True,
sender_name=sender_name,
sender_key=sender_key,
)
if msg_id is None:
return None
message = build_message_model(
message_id=msg_id,
msg_type="CHAN",
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
received_at=received_at,
outgoing=True,
acked=0,
sender_name=sender_name,
sender_key=sender_key,
channel_name=channel_name,
)
broadcast_message(message=message, broadcast_fn=broadcast_fn)
return message