mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
extract message send service
This commit is contained in:
@@ -1,9 +1,7 @@
|
||||
import logging
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Query
|
||||
from meshcore import EventType
|
||||
|
||||
from app.dependencies import require_connected
|
||||
from app.event_handlers import track_pending_ack
|
||||
@@ -14,12 +12,11 @@ from app.models import (
|
||||
SendDirectMessageRequest,
|
||||
)
|
||||
from app.radio import radio_manager
|
||||
from app.region_scope import normalize_region_scope
|
||||
from app.repository import AmbiguousPublicKeyPrefixError, AppSettingsRepository, MessageRepository
|
||||
from app.services.messages import (
|
||||
build_message_model,
|
||||
create_outgoing_channel_message,
|
||||
create_outgoing_direct_message,
|
||||
from app.services.message_send import (
|
||||
resend_channel_message_record,
|
||||
send_channel_message_to_channel,
|
||||
send_direct_message_to_contact,
|
||||
)
|
||||
from app.websocket import broadcast_error, broadcast_event
|
||||
|
||||
@@ -27,105 +24,6 @@ logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/messages", tags=["messages"])
|
||||
|
||||
|
||||
async def _send_channel_message_with_effective_scope(
|
||||
*,
|
||||
mc,
|
||||
channel,
|
||||
key_bytes: bytes,
|
||||
text: str,
|
||||
timestamp_bytes: bytes,
|
||||
action_label: str,
|
||||
) -> Any:
|
||||
"""Send a channel message, temporarily overriding flood scope when configured."""
|
||||
override_scope = normalize_region_scope(channel.flood_scope_override)
|
||||
baseline_scope = ""
|
||||
|
||||
if override_scope:
|
||||
settings = await AppSettingsRepository.get()
|
||||
baseline_scope = normalize_region_scope(settings.flood_scope)
|
||||
|
||||
if override_scope and override_scope != baseline_scope:
|
||||
logger.info(
|
||||
"Temporarily applying channel flood_scope override for %s: %r",
|
||||
channel.name,
|
||||
override_scope,
|
||||
)
|
||||
override_result = await mc.commands.set_flood_scope(override_scope)
|
||||
if override_result is not None and override_result.type == EventType.ERROR:
|
||||
logger.warning(
|
||||
"Failed to apply channel flood_scope override for %s: %s",
|
||||
channel.name,
|
||||
override_result.payload,
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=(
|
||||
f"Failed to apply regional override {override_scope!r} before {action_label}: "
|
||||
f"{override_result.payload}"
|
||||
),
|
||||
)
|
||||
|
||||
try:
|
||||
set_result = await mc.commands.set_channel(
|
||||
channel_idx=TEMP_RADIO_SLOT,
|
||||
channel_name=channel.name,
|
||||
channel_secret=key_bytes,
|
||||
)
|
||||
if set_result.type == EventType.ERROR:
|
||||
logger.warning(
|
||||
"Failed to set channel on radio slot %d before %s: %s",
|
||||
TEMP_RADIO_SLOT,
|
||||
action_label,
|
||||
set_result.payload,
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Failed to configure channel on radio before {action_label}",
|
||||
)
|
||||
|
||||
return await mc.commands.send_chan_msg(
|
||||
chan=TEMP_RADIO_SLOT,
|
||||
msg=text,
|
||||
timestamp=timestamp_bytes,
|
||||
)
|
||||
finally:
|
||||
if override_scope and override_scope != baseline_scope:
|
||||
try:
|
||||
restore_result = await mc.commands.set_flood_scope(
|
||||
baseline_scope if baseline_scope else ""
|
||||
)
|
||||
if restore_result is not None and restore_result.type == EventType.ERROR:
|
||||
logger.error(
|
||||
"Failed to restore baseline flood_scope after sending to %s: %s",
|
||||
channel.name,
|
||||
restore_result.payload,
|
||||
)
|
||||
broadcast_error(
|
||||
"Regional override restore failed",
|
||||
(
|
||||
f"Sent to {channel.name}, but restoring flood scope failed. "
|
||||
"The radio may still be region-scoped. Consider rebooting the radio."
|
||||
),
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
"Restored baseline flood_scope after channel send: %r",
|
||||
baseline_scope or "(disabled)",
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed to restore baseline flood_scope after sending to %s",
|
||||
channel.name,
|
||||
)
|
||||
broadcast_error(
|
||||
"Regional override restore failed",
|
||||
(
|
||||
f"Sent to {channel.name}, but restoring flood scope failed. "
|
||||
"The radio may still be region-scoped. Consider rebooting the radio."
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@router.get("/around/{message_id}", response_model=MessagesAroundResponse)
|
||||
async def get_messages_around(
|
||||
message_id: int,
|
||||
@@ -211,65 +109,16 @@ async def send_direct_message(request: SendDirectMessageRequest) -> Message:
|
||||
status_code=404, detail=f"Contact not found in database: {request.destination}"
|
||||
)
|
||||
|
||||
# Always add/update the contact on radio before sending.
|
||||
# The library cache (get_contact_by_key_prefix) can be stale after radio reboot,
|
||||
# so we can't rely on it to know if the firmware has the contact.
|
||||
# add_contact is idempotent - updates if exists, adds if not.
|
||||
contact_data = db_contact.to_radio_dict()
|
||||
async with radio_manager.radio_operation("send_direct_message") as mc:
|
||||
logger.debug("Ensuring contact %s is on radio before sending", db_contact.public_key[:12])
|
||||
add_result = await mc.commands.add_contact(contact_data)
|
||||
if add_result.type == EventType.ERROR:
|
||||
logger.warning("Failed to add contact to radio: %s", add_result.payload)
|
||||
# Continue anyway - might still work if contact exists
|
||||
|
||||
# Get the contact from the library cache (may have updated info like path)
|
||||
contact = mc.get_contact_by_key_prefix(db_contact.public_key[:12])
|
||||
if not contact:
|
||||
contact = contact_data
|
||||
|
||||
logger.info("Sending direct message to %s", db_contact.public_key[:12])
|
||||
|
||||
# Capture timestamp BEFORE sending so we can pass the same value to both the radio
|
||||
# and the database. This ensures consistency for deduplication.
|
||||
now = int(time.time())
|
||||
|
||||
result = await mc.commands.send_msg(
|
||||
dst=contact,
|
||||
msg=request.text,
|
||||
timestamp=now,
|
||||
)
|
||||
|
||||
if result.type == EventType.ERROR:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}")
|
||||
|
||||
# Store outgoing message
|
||||
message = await create_outgoing_direct_message(
|
||||
conversation_key=db_contact.public_key.lower(),
|
||||
return await send_direct_message_to_contact(
|
||||
contact=db_contact,
|
||||
text=request.text,
|
||||
sender_timestamp=now,
|
||||
received_at=now,
|
||||
radio_manager=radio_manager,
|
||||
broadcast_fn=broadcast_event,
|
||||
track_pending_ack_fn=track_pending_ack,
|
||||
now_fn=time.time,
|
||||
message_repository=MessageRepository,
|
||||
contact_repository=ContactRepository,
|
||||
)
|
||||
if message is None:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="Failed to store outgoing message - unexpected duplicate",
|
||||
)
|
||||
|
||||
# Update last_contacted for the contact
|
||||
await ContactRepository.update_last_contacted(db_contact.public_key.lower(), now)
|
||||
|
||||
# Track the expected ACK for this message
|
||||
expected_ack = result.payload.get("expected_ack")
|
||||
suggested_timeout: int = result.payload.get("suggested_timeout", 10000) # default 10s
|
||||
if expected_ack:
|
||||
ack_code = expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack
|
||||
track_pending_ack(ack_code, message.id, suggested_timeout)
|
||||
logger.debug("Tracking ACK %s for message %d", ack_code, message.id)
|
||||
|
||||
return message
|
||||
|
||||
|
||||
# Temporary radio slot used for sending channel messages
|
||||
@@ -307,80 +156,19 @@ async def send_channel_message(request: SendChannelMessageRequest) -> Message:
|
||||
TEMP_RADIO_SLOT,
|
||||
expected_hash,
|
||||
)
|
||||
channel_key_upper = request.channel_key.upper()
|
||||
message_id: int | None = None
|
||||
now: int | None = None
|
||||
radio_name: str = ""
|
||||
text_with_sender: str = request.text
|
||||
|
||||
our_public_key: str | None = None
|
||||
|
||||
async with radio_manager.radio_operation("send_channel_message") as mc:
|
||||
radio_name = mc.self_info.get("name", "") if mc.self_info else ""
|
||||
our_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None
|
||||
text_with_sender = f"{radio_name}: {request.text}" if radio_name else request.text
|
||||
logger.info("Sending channel message to %s: %s", db_channel.name, request.text[:50])
|
||||
|
||||
# Capture timestamp BEFORE sending so we can pass the same value to both the radio
|
||||
# and the database. This ensures the echo's timestamp matches our stored message
|
||||
# for proper deduplication.
|
||||
now = int(time.time())
|
||||
timestamp_bytes = now.to_bytes(4, "little")
|
||||
|
||||
result = await _send_channel_message_with_effective_scope(
|
||||
mc=mc,
|
||||
channel=db_channel,
|
||||
key_bytes=key_bytes,
|
||||
text=request.text,
|
||||
timestamp_bytes=timestamp_bytes,
|
||||
action_label="sending message",
|
||||
)
|
||||
|
||||
if result.type == EventType.ERROR:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}")
|
||||
|
||||
# Store outgoing immediately after send to avoid a race where
|
||||
# our own echo lands before persistence.
|
||||
outgoing_message = await create_outgoing_channel_message(
|
||||
conversation_key=channel_key_upper,
|
||||
text=text_with_sender,
|
||||
sender_timestamp=now,
|
||||
received_at=now,
|
||||
sender_name=radio_name or None,
|
||||
sender_key=our_public_key,
|
||||
channel_name=db_channel.name,
|
||||
broadcast_fn=broadcast_event,
|
||||
message_repository=MessageRepository,
|
||||
)
|
||||
if outgoing_message is None:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="Failed to store outgoing message - unexpected duplicate",
|
||||
)
|
||||
message_id = outgoing_message.id
|
||||
|
||||
if message_id is None or now is None:
|
||||
raise HTTPException(status_code=500, detail="Failed to store outgoing message")
|
||||
|
||||
acked_count, paths = await MessageRepository.get_ack_and_paths(message_id)
|
||||
|
||||
message = build_message_model(
|
||||
message_id=message_id,
|
||||
msg_type="CHAN",
|
||||
conversation_key=channel_key_upper,
|
||||
text=text_with_sender,
|
||||
sender_timestamp=now,
|
||||
received_at=now,
|
||||
paths=paths,
|
||||
outgoing=True,
|
||||
acked=acked_count,
|
||||
sender_name=radio_name or None,
|
||||
sender_key=our_public_key,
|
||||
channel_name=db_channel.name,
|
||||
return await send_channel_message_to_channel(
|
||||
channel=db_channel,
|
||||
channel_key_upper=request.channel_key.upper(),
|
||||
key_bytes=key_bytes,
|
||||
text=request.text,
|
||||
radio_manager=radio_manager,
|
||||
broadcast_fn=broadcast_event,
|
||||
error_broadcast_fn=broadcast_error,
|
||||
now_fn=time.time,
|
||||
temp_radio_slot=TEMP_RADIO_SLOT,
|
||||
message_repository=MessageRepository,
|
||||
)
|
||||
|
||||
return message
|
||||
|
||||
|
||||
RESEND_WINDOW_SECONDS = 30
|
||||
|
||||
@@ -425,73 +213,14 @@ async def resend_channel_message(
|
||||
if not db_channel:
|
||||
raise HTTPException(status_code=404, detail=f"Channel {msg.conversation_key} not found")
|
||||
|
||||
# Choose timestamp: original for byte-perfect, fresh for new-timestamp
|
||||
if new_timestamp:
|
||||
now = int(time.time())
|
||||
timestamp_bytes = now.to_bytes(4, "little")
|
||||
else:
|
||||
timestamp_bytes = msg.sender_timestamp.to_bytes(4, "little")
|
||||
|
||||
try:
|
||||
key_bytes = bytes.fromhex(msg.conversation_key)
|
||||
except ValueError:
|
||||
raise HTTPException(
|
||||
status_code=400, detail=f"Invalid channel key format: {msg.conversation_key}"
|
||||
) from None
|
||||
|
||||
resend_public_key: str | None = None
|
||||
|
||||
async with radio_manager.radio_operation("resend_channel_message") as mc:
|
||||
# Strip sender prefix: DB stores "RadioName: message" but radio needs "message"
|
||||
radio_name = mc.self_info.get("name", "") if mc.self_info else ""
|
||||
resend_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None
|
||||
text_to_send = msg.text
|
||||
if radio_name and text_to_send.startswith(f"{radio_name}: "):
|
||||
text_to_send = text_to_send[len(f"{radio_name}: ") :]
|
||||
|
||||
result = await _send_channel_message_with_effective_scope(
|
||||
mc=mc,
|
||||
channel=db_channel,
|
||||
key_bytes=key_bytes,
|
||||
text=text_to_send,
|
||||
timestamp_bytes=timestamp_bytes,
|
||||
action_label="resending message",
|
||||
)
|
||||
if result.type == EventType.ERROR:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to resend message: {result.payload}"
|
||||
)
|
||||
|
||||
# For new-timestamp resend, create a new message row and broadcast it
|
||||
if new_timestamp:
|
||||
new_message = await create_outgoing_channel_message(
|
||||
conversation_key=msg.conversation_key,
|
||||
text=msg.text,
|
||||
sender_timestamp=now,
|
||||
received_at=now,
|
||||
sender_name=radio_name or None,
|
||||
sender_key=resend_public_key,
|
||||
channel_name=db_channel.name,
|
||||
broadcast_fn=broadcast_event,
|
||||
message_repository=MessageRepository,
|
||||
)
|
||||
if new_message is None:
|
||||
# Timestamp-second collision (same text+channel within the same second).
|
||||
# The radio already transmitted, so log and return the original ID rather
|
||||
# than surfacing a 500 for a message that was successfully sent over the air.
|
||||
logger.warning(
|
||||
"Duplicate timestamp collision resending message %d — radio sent but DB row not created",
|
||||
message_id,
|
||||
)
|
||||
return {"status": "ok", "message_id": message_id}
|
||||
|
||||
logger.info(
|
||||
"Resent channel message %d as new message %d to %s",
|
||||
message_id,
|
||||
new_message.id,
|
||||
db_channel.name,
|
||||
)
|
||||
return {"status": "ok", "message_id": new_message.id}
|
||||
|
||||
logger.info("Resent channel message %d to %s", message_id, db_channel.name)
|
||||
return {"status": "ok", "message_id": message_id}
|
||||
return await resend_channel_message_record(
|
||||
message=msg,
|
||||
channel=db_channel,
|
||||
new_timestamp=new_timestamp,
|
||||
radio_manager=radio_manager,
|
||||
broadcast_fn=broadcast_event,
|
||||
error_broadcast_fn=broadcast_error,
|
||||
now_fn=time.time,
|
||||
temp_radio_slot=TEMP_RADIO_SLOT,
|
||||
message_repository=MessageRepository,
|
||||
)
|
||||
|
||||
352
app/services/message_send.py
Normal file
352
app/services/message_send.py
Normal file
@@ -0,0 +1,352 @@
|
||||
"""Shared send/resend orchestration for outgoing messages."""
|
||||
|
||||
import logging
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
|
||||
from fastapi import HTTPException
|
||||
from meshcore import EventType
|
||||
|
||||
from app.region_scope import normalize_region_scope
|
||||
from app.repository import AppSettingsRepository, ContactRepository, MessageRepository
|
||||
from app.services.messages import (
|
||||
build_message_model,
|
||||
create_outgoing_channel_message,
|
||||
create_outgoing_direct_message,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
BroadcastFn = Callable[..., Any]
|
||||
TrackAckFn = Callable[[str, int, int], None]
|
||||
NowFn = Callable[[], float]
|
||||
|
||||
|
||||
async def send_channel_message_with_effective_scope(
|
||||
*,
|
||||
mc,
|
||||
channel,
|
||||
key_bytes: bytes,
|
||||
text: str,
|
||||
timestamp_bytes: bytes,
|
||||
action_label: str,
|
||||
temp_radio_slot: int,
|
||||
error_broadcast_fn: BroadcastFn,
|
||||
app_settings_repository=AppSettingsRepository,
|
||||
) -> Any:
|
||||
"""Send a channel message, temporarily overriding flood scope when configured."""
|
||||
override_scope = normalize_region_scope(channel.flood_scope_override)
|
||||
baseline_scope = ""
|
||||
|
||||
if override_scope:
|
||||
settings = await app_settings_repository.get()
|
||||
baseline_scope = normalize_region_scope(settings.flood_scope)
|
||||
|
||||
if override_scope and override_scope != baseline_scope:
|
||||
logger.info(
|
||||
"Temporarily applying channel flood_scope override for %s: %r",
|
||||
channel.name,
|
||||
override_scope,
|
||||
)
|
||||
override_result = await mc.commands.set_flood_scope(override_scope)
|
||||
if override_result is not None and override_result.type == EventType.ERROR:
|
||||
logger.warning(
|
||||
"Failed to apply channel flood_scope override for %s: %s",
|
||||
channel.name,
|
||||
override_result.payload,
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=(
|
||||
f"Failed to apply regional override {override_scope!r} before {action_label}: "
|
||||
f"{override_result.payload}"
|
||||
),
|
||||
)
|
||||
|
||||
try:
|
||||
set_result = await mc.commands.set_channel(
|
||||
channel_idx=temp_radio_slot,
|
||||
channel_name=channel.name,
|
||||
channel_secret=key_bytes,
|
||||
)
|
||||
if set_result.type == EventType.ERROR:
|
||||
logger.warning(
|
||||
"Failed to set channel on radio slot %d before %s: %s",
|
||||
temp_radio_slot,
|
||||
action_label,
|
||||
set_result.payload,
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Failed to configure channel on radio before {action_label}",
|
||||
)
|
||||
|
||||
return await mc.commands.send_chan_msg(
|
||||
chan=temp_radio_slot,
|
||||
msg=text,
|
||||
timestamp=timestamp_bytes,
|
||||
)
|
||||
finally:
|
||||
if override_scope and override_scope != baseline_scope:
|
||||
try:
|
||||
restore_result = await mc.commands.set_flood_scope(
|
||||
baseline_scope if baseline_scope else ""
|
||||
)
|
||||
if restore_result is not None and restore_result.type == EventType.ERROR:
|
||||
logger.error(
|
||||
"Failed to restore baseline flood_scope after sending to %s: %s",
|
||||
channel.name,
|
||||
restore_result.payload,
|
||||
)
|
||||
error_broadcast_fn(
|
||||
"Regional override restore failed",
|
||||
(
|
||||
f"Sent to {channel.name}, but restoring flood scope failed. "
|
||||
"The radio may still be region-scoped. Consider rebooting the radio."
|
||||
),
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
"Restored baseline flood_scope after channel send: %r",
|
||||
baseline_scope or "(disabled)",
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed to restore baseline flood_scope after sending to %s",
|
||||
channel.name,
|
||||
)
|
||||
error_broadcast_fn(
|
||||
"Regional override restore failed",
|
||||
(
|
||||
f"Sent to {channel.name}, but restoring flood scope failed. "
|
||||
"The radio may still be region-scoped. Consider rebooting the radio."
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
async def send_direct_message_to_contact(
|
||||
*,
|
||||
contact,
|
||||
text: str,
|
||||
radio_manager,
|
||||
broadcast_fn: BroadcastFn,
|
||||
track_pending_ack_fn: TrackAckFn,
|
||||
now_fn: NowFn,
|
||||
message_repository=MessageRepository,
|
||||
contact_repository=ContactRepository,
|
||||
) -> Any:
|
||||
"""Send a direct message and persist/broadcast the outgoing row."""
|
||||
contact_data = contact.to_radio_dict()
|
||||
async with radio_manager.radio_operation("send_direct_message") as mc:
|
||||
logger.debug("Ensuring contact %s is on radio before sending", contact.public_key[:12])
|
||||
add_result = await mc.commands.add_contact(contact_data)
|
||||
if add_result.type == EventType.ERROR:
|
||||
logger.warning("Failed to add contact to radio: %s", add_result.payload)
|
||||
|
||||
cached_contact = mc.get_contact_by_key_prefix(contact.public_key[:12])
|
||||
if not cached_contact:
|
||||
cached_contact = contact_data
|
||||
|
||||
logger.info("Sending direct message to %s", contact.public_key[:12])
|
||||
now = int(now_fn())
|
||||
result = await mc.commands.send_msg(
|
||||
dst=cached_contact,
|
||||
msg=text,
|
||||
timestamp=now,
|
||||
)
|
||||
|
||||
if result.type == EventType.ERROR:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}")
|
||||
|
||||
message = await create_outgoing_direct_message(
|
||||
conversation_key=contact.public_key.lower(),
|
||||
text=text,
|
||||
sender_timestamp=now,
|
||||
received_at=now,
|
||||
broadcast_fn=broadcast_fn,
|
||||
message_repository=message_repository,
|
||||
)
|
||||
if message is None:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="Failed to store outgoing message - unexpected duplicate",
|
||||
)
|
||||
|
||||
await contact_repository.update_last_contacted(contact.public_key.lower(), now)
|
||||
|
||||
expected_ack = result.payload.get("expected_ack")
|
||||
suggested_timeout: int = result.payload.get("suggested_timeout", 10000)
|
||||
if expected_ack:
|
||||
ack_code = expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack
|
||||
track_pending_ack_fn(ack_code, message.id, suggested_timeout)
|
||||
logger.debug("Tracking ACK %s for message %d", ack_code, message.id)
|
||||
|
||||
return message
|
||||
|
||||
|
||||
async def send_channel_message_to_channel(
|
||||
*,
|
||||
channel,
|
||||
channel_key_upper: str,
|
||||
key_bytes: bytes,
|
||||
text: str,
|
||||
radio_manager,
|
||||
broadcast_fn: BroadcastFn,
|
||||
error_broadcast_fn: BroadcastFn,
|
||||
now_fn: NowFn,
|
||||
temp_radio_slot: int,
|
||||
message_repository=MessageRepository,
|
||||
) -> Any:
|
||||
"""Send a channel message and persist/broadcast the outgoing row."""
|
||||
message_id: int | None = None
|
||||
now: int | None = None
|
||||
radio_name = ""
|
||||
our_public_key: str | None = None
|
||||
text_with_sender = text
|
||||
|
||||
async with radio_manager.radio_operation("send_channel_message") as mc:
|
||||
radio_name = mc.self_info.get("name", "") if mc.self_info else ""
|
||||
our_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None
|
||||
text_with_sender = f"{radio_name}: {text}" if radio_name else text
|
||||
logger.info("Sending channel message to %s: %s", channel.name, text[:50])
|
||||
|
||||
now = int(now_fn())
|
||||
timestamp_bytes = now.to_bytes(4, "little")
|
||||
|
||||
result = await send_channel_message_with_effective_scope(
|
||||
mc=mc,
|
||||
channel=channel,
|
||||
key_bytes=key_bytes,
|
||||
text=text,
|
||||
timestamp_bytes=timestamp_bytes,
|
||||
action_label="sending message",
|
||||
temp_radio_slot=temp_radio_slot,
|
||||
error_broadcast_fn=error_broadcast_fn,
|
||||
)
|
||||
|
||||
if result.type == EventType.ERROR:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}")
|
||||
|
||||
outgoing_message = await create_outgoing_channel_message(
|
||||
conversation_key=channel_key_upper,
|
||||
text=text_with_sender,
|
||||
sender_timestamp=now,
|
||||
received_at=now,
|
||||
sender_name=radio_name or None,
|
||||
sender_key=our_public_key,
|
||||
channel_name=channel.name,
|
||||
broadcast_fn=broadcast_fn,
|
||||
message_repository=message_repository,
|
||||
)
|
||||
if outgoing_message is None:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="Failed to store outgoing message - unexpected duplicate",
|
||||
)
|
||||
message_id = outgoing_message.id
|
||||
|
||||
if message_id is None or now is None:
|
||||
raise HTTPException(status_code=500, detail="Failed to store outgoing message")
|
||||
|
||||
acked_count, paths = await message_repository.get_ack_and_paths(message_id)
|
||||
return build_message_model(
|
||||
message_id=message_id,
|
||||
msg_type="CHAN",
|
||||
conversation_key=channel_key_upper,
|
||||
text=text_with_sender,
|
||||
sender_timestamp=now,
|
||||
received_at=now,
|
||||
paths=paths,
|
||||
outgoing=True,
|
||||
acked=acked_count,
|
||||
sender_name=radio_name or None,
|
||||
sender_key=our_public_key,
|
||||
channel_name=channel.name,
|
||||
)
|
||||
|
||||
|
||||
async def resend_channel_message_record(
|
||||
*,
|
||||
message,
|
||||
channel,
|
||||
new_timestamp: bool,
|
||||
radio_manager,
|
||||
broadcast_fn: BroadcastFn,
|
||||
error_broadcast_fn: BroadcastFn,
|
||||
now_fn: NowFn,
|
||||
temp_radio_slot: int,
|
||||
message_repository=MessageRepository,
|
||||
) -> dict[str, Any]:
|
||||
"""Resend a stored outgoing channel message."""
|
||||
try:
|
||||
key_bytes = bytes.fromhex(message.conversation_key)
|
||||
except ValueError:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Invalid channel key format: {message.conversation_key}",
|
||||
) from None
|
||||
|
||||
now: int | None = None
|
||||
if new_timestamp:
|
||||
now = int(now_fn())
|
||||
timestamp_bytes = now.to_bytes(4, "little")
|
||||
else:
|
||||
timestamp_bytes = message.sender_timestamp.to_bytes(4, "little")
|
||||
|
||||
resend_public_key: str | None = None
|
||||
radio_name = ""
|
||||
|
||||
async with radio_manager.radio_operation("resend_channel_message") as mc:
|
||||
radio_name = mc.self_info.get("name", "") if mc.self_info else ""
|
||||
resend_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None
|
||||
text_to_send = message.text
|
||||
if radio_name and text_to_send.startswith(f"{radio_name}: "):
|
||||
text_to_send = text_to_send[len(f"{radio_name}: ") :]
|
||||
|
||||
result = await send_channel_message_with_effective_scope(
|
||||
mc=mc,
|
||||
channel=channel,
|
||||
key_bytes=key_bytes,
|
||||
text=text_to_send,
|
||||
timestamp_bytes=timestamp_bytes,
|
||||
action_label="resending message",
|
||||
temp_radio_slot=temp_radio_slot,
|
||||
error_broadcast_fn=error_broadcast_fn,
|
||||
)
|
||||
if result.type == EventType.ERROR:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Failed to resend message: {result.payload}",
|
||||
)
|
||||
|
||||
if new_timestamp:
|
||||
if now is None:
|
||||
raise HTTPException(status_code=500, detail="Failed to assign resend timestamp")
|
||||
new_message = await create_outgoing_channel_message(
|
||||
conversation_key=message.conversation_key,
|
||||
text=message.text,
|
||||
sender_timestamp=now,
|
||||
received_at=now,
|
||||
sender_name=radio_name or None,
|
||||
sender_key=resend_public_key,
|
||||
channel_name=channel.name,
|
||||
broadcast_fn=broadcast_fn,
|
||||
message_repository=message_repository,
|
||||
)
|
||||
if new_message is None:
|
||||
logger.warning(
|
||||
"Duplicate timestamp collision resending message %d — radio sent but DB row not created",
|
||||
message.id,
|
||||
)
|
||||
return {"status": "ok", "message_id": message.id}
|
||||
|
||||
logger.info(
|
||||
"Resent channel message %d as new message %d to %s",
|
||||
message.id,
|
||||
new_message.id,
|
||||
channel.name,
|
||||
)
|
||||
return {"status": "ok", "message_id": new_message.id}
|
||||
|
||||
logger.info("Resent channel message %d to %s", message.id, channel.name)
|
||||
return {"status": "ok", "message_id": message.id}
|
||||
Reference in New Issue
Block a user