diff --git a/app/routers/messages.py b/app/routers/messages.py index b66d612..588d064 100644 --- a/app/routers/messages.py +++ b/app/routers/messages.py @@ -1,9 +1,7 @@ import logging import time -from typing import Any from fastapi import APIRouter, HTTPException, Query -from meshcore import EventType from app.dependencies import require_connected from app.event_handlers import track_pending_ack @@ -14,12 +12,11 @@ from app.models import ( SendDirectMessageRequest, ) from app.radio import radio_manager -from app.region_scope import normalize_region_scope from app.repository import AmbiguousPublicKeyPrefixError, AppSettingsRepository, MessageRepository -from app.services.messages import ( - build_message_model, - create_outgoing_channel_message, - create_outgoing_direct_message, +from app.services.message_send import ( + resend_channel_message_record, + send_channel_message_to_channel, + send_direct_message_to_contact, ) from app.websocket import broadcast_error, broadcast_event @@ -27,105 +24,6 @@ logger = logging.getLogger(__name__) router = APIRouter(prefix="/messages", tags=["messages"]) -async def _send_channel_message_with_effective_scope( - *, - mc, - channel, - key_bytes: bytes, - text: str, - timestamp_bytes: bytes, - action_label: str, -) -> Any: - """Send a channel message, temporarily overriding flood scope when configured.""" - override_scope = normalize_region_scope(channel.flood_scope_override) - baseline_scope = "" - - if override_scope: - settings = await AppSettingsRepository.get() - baseline_scope = normalize_region_scope(settings.flood_scope) - - if override_scope and override_scope != baseline_scope: - logger.info( - "Temporarily applying channel flood_scope override for %s: %r", - channel.name, - override_scope, - ) - override_result = await mc.commands.set_flood_scope(override_scope) - if override_result is not None and override_result.type == EventType.ERROR: - logger.warning( - "Failed to apply channel flood_scope override for %s: %s", - channel.name, - override_result.payload, - ) - raise HTTPException( - status_code=500, - detail=( - f"Failed to apply regional override {override_scope!r} before {action_label}: " - f"{override_result.payload}" - ), - ) - - try: - set_result = await mc.commands.set_channel( - channel_idx=TEMP_RADIO_SLOT, - channel_name=channel.name, - channel_secret=key_bytes, - ) - if set_result.type == EventType.ERROR: - logger.warning( - "Failed to set channel on radio slot %d before %s: %s", - TEMP_RADIO_SLOT, - action_label, - set_result.payload, - ) - raise HTTPException( - status_code=500, - detail=f"Failed to configure channel on radio before {action_label}", - ) - - return await mc.commands.send_chan_msg( - chan=TEMP_RADIO_SLOT, - msg=text, - timestamp=timestamp_bytes, - ) - finally: - if override_scope and override_scope != baseline_scope: - try: - restore_result = await mc.commands.set_flood_scope( - baseline_scope if baseline_scope else "" - ) - if restore_result is not None and restore_result.type == EventType.ERROR: - logger.error( - "Failed to restore baseline flood_scope after sending to %s: %s", - channel.name, - restore_result.payload, - ) - broadcast_error( - "Regional override restore failed", - ( - f"Sent to {channel.name}, but restoring flood scope failed. " - "The radio may still be region-scoped. Consider rebooting the radio." - ), - ) - else: - logger.debug( - "Restored baseline flood_scope after channel send: %r", - baseline_scope or "(disabled)", - ) - except Exception: - logger.exception( - "Failed to restore baseline flood_scope after sending to %s", - channel.name, - ) - broadcast_error( - "Regional override restore failed", - ( - f"Sent to {channel.name}, but restoring flood scope failed. " - "The radio may still be region-scoped. Consider rebooting the radio." - ), - ) - - @router.get("/around/{message_id}", response_model=MessagesAroundResponse) async def get_messages_around( message_id: int, @@ -211,65 +109,16 @@ async def send_direct_message(request: SendDirectMessageRequest) -> Message: status_code=404, detail=f"Contact not found in database: {request.destination}" ) - # Always add/update the contact on radio before sending. - # The library cache (get_contact_by_key_prefix) can be stale after radio reboot, - # so we can't rely on it to know if the firmware has the contact. - # add_contact is idempotent - updates if exists, adds if not. - contact_data = db_contact.to_radio_dict() - async with radio_manager.radio_operation("send_direct_message") as mc: - logger.debug("Ensuring contact %s is on radio before sending", db_contact.public_key[:12]) - add_result = await mc.commands.add_contact(contact_data) - if add_result.type == EventType.ERROR: - logger.warning("Failed to add contact to radio: %s", add_result.payload) - # Continue anyway - might still work if contact exists - - # Get the contact from the library cache (may have updated info like path) - contact = mc.get_contact_by_key_prefix(db_contact.public_key[:12]) - if not contact: - contact = contact_data - - logger.info("Sending direct message to %s", db_contact.public_key[:12]) - - # Capture timestamp BEFORE sending so we can pass the same value to both the radio - # and the database. This ensures consistency for deduplication. - now = int(time.time()) - - result = await mc.commands.send_msg( - dst=contact, - msg=request.text, - timestamp=now, - ) - - if result.type == EventType.ERROR: - raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}") - - # Store outgoing message - message = await create_outgoing_direct_message( - conversation_key=db_contact.public_key.lower(), + return await send_direct_message_to_contact( + contact=db_contact, text=request.text, - sender_timestamp=now, - received_at=now, + radio_manager=radio_manager, broadcast_fn=broadcast_event, + track_pending_ack_fn=track_pending_ack, + now_fn=time.time, message_repository=MessageRepository, + contact_repository=ContactRepository, ) - if message is None: - raise HTTPException( - status_code=500, - detail="Failed to store outgoing message - unexpected duplicate", - ) - - # Update last_contacted for the contact - await ContactRepository.update_last_contacted(db_contact.public_key.lower(), now) - - # Track the expected ACK for this message - expected_ack = result.payload.get("expected_ack") - suggested_timeout: int = result.payload.get("suggested_timeout", 10000) # default 10s - if expected_ack: - ack_code = expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack - track_pending_ack(ack_code, message.id, suggested_timeout) - logger.debug("Tracking ACK %s for message %d", ack_code, message.id) - - return message # Temporary radio slot used for sending channel messages @@ -307,80 +156,19 @@ async def send_channel_message(request: SendChannelMessageRequest) -> Message: TEMP_RADIO_SLOT, expected_hash, ) - channel_key_upper = request.channel_key.upper() - message_id: int | None = None - now: int | None = None - radio_name: str = "" - text_with_sender: str = request.text - - our_public_key: str | None = None - - async with radio_manager.radio_operation("send_channel_message") as mc: - radio_name = mc.self_info.get("name", "") if mc.self_info else "" - our_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None - text_with_sender = f"{radio_name}: {request.text}" if radio_name else request.text - logger.info("Sending channel message to %s: %s", db_channel.name, request.text[:50]) - - # Capture timestamp BEFORE sending so we can pass the same value to both the radio - # and the database. This ensures the echo's timestamp matches our stored message - # for proper deduplication. - now = int(time.time()) - timestamp_bytes = now.to_bytes(4, "little") - - result = await _send_channel_message_with_effective_scope( - mc=mc, - channel=db_channel, - key_bytes=key_bytes, - text=request.text, - timestamp_bytes=timestamp_bytes, - action_label="sending message", - ) - - if result.type == EventType.ERROR: - raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}") - - # Store outgoing immediately after send to avoid a race where - # our own echo lands before persistence. - outgoing_message = await create_outgoing_channel_message( - conversation_key=channel_key_upper, - text=text_with_sender, - sender_timestamp=now, - received_at=now, - sender_name=radio_name or None, - sender_key=our_public_key, - channel_name=db_channel.name, - broadcast_fn=broadcast_event, - message_repository=MessageRepository, - ) - if outgoing_message is None: - raise HTTPException( - status_code=500, - detail="Failed to store outgoing message - unexpected duplicate", - ) - message_id = outgoing_message.id - - if message_id is None or now is None: - raise HTTPException(status_code=500, detail="Failed to store outgoing message") - - acked_count, paths = await MessageRepository.get_ack_and_paths(message_id) - - message = build_message_model( - message_id=message_id, - msg_type="CHAN", - conversation_key=channel_key_upper, - text=text_with_sender, - sender_timestamp=now, - received_at=now, - paths=paths, - outgoing=True, - acked=acked_count, - sender_name=radio_name or None, - sender_key=our_public_key, - channel_name=db_channel.name, + return await send_channel_message_to_channel( + channel=db_channel, + channel_key_upper=request.channel_key.upper(), + key_bytes=key_bytes, + text=request.text, + radio_manager=radio_manager, + broadcast_fn=broadcast_event, + error_broadcast_fn=broadcast_error, + now_fn=time.time, + temp_radio_slot=TEMP_RADIO_SLOT, + message_repository=MessageRepository, ) - return message - RESEND_WINDOW_SECONDS = 30 @@ -425,73 +213,14 @@ async def resend_channel_message( if not db_channel: raise HTTPException(status_code=404, detail=f"Channel {msg.conversation_key} not found") - # Choose timestamp: original for byte-perfect, fresh for new-timestamp - if new_timestamp: - now = int(time.time()) - timestamp_bytes = now.to_bytes(4, "little") - else: - timestamp_bytes = msg.sender_timestamp.to_bytes(4, "little") - - try: - key_bytes = bytes.fromhex(msg.conversation_key) - except ValueError: - raise HTTPException( - status_code=400, detail=f"Invalid channel key format: {msg.conversation_key}" - ) from None - - resend_public_key: str | None = None - - async with radio_manager.radio_operation("resend_channel_message") as mc: - # Strip sender prefix: DB stores "RadioName: message" but radio needs "message" - radio_name = mc.self_info.get("name", "") if mc.self_info else "" - resend_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None - text_to_send = msg.text - if radio_name and text_to_send.startswith(f"{radio_name}: "): - text_to_send = text_to_send[len(f"{radio_name}: ") :] - - result = await _send_channel_message_with_effective_scope( - mc=mc, - channel=db_channel, - key_bytes=key_bytes, - text=text_to_send, - timestamp_bytes=timestamp_bytes, - action_label="resending message", - ) - if result.type == EventType.ERROR: - raise HTTPException( - status_code=500, detail=f"Failed to resend message: {result.payload}" - ) - - # For new-timestamp resend, create a new message row and broadcast it - if new_timestamp: - new_message = await create_outgoing_channel_message( - conversation_key=msg.conversation_key, - text=msg.text, - sender_timestamp=now, - received_at=now, - sender_name=radio_name or None, - sender_key=resend_public_key, - channel_name=db_channel.name, - broadcast_fn=broadcast_event, - message_repository=MessageRepository, - ) - if new_message is None: - # Timestamp-second collision (same text+channel within the same second). - # The radio already transmitted, so log and return the original ID rather - # than surfacing a 500 for a message that was successfully sent over the air. - logger.warning( - "Duplicate timestamp collision resending message %d — radio sent but DB row not created", - message_id, - ) - return {"status": "ok", "message_id": message_id} - - logger.info( - "Resent channel message %d as new message %d to %s", - message_id, - new_message.id, - db_channel.name, - ) - return {"status": "ok", "message_id": new_message.id} - - logger.info("Resent channel message %d to %s", message_id, db_channel.name) - return {"status": "ok", "message_id": message_id} + return await resend_channel_message_record( + message=msg, + channel=db_channel, + new_timestamp=new_timestamp, + radio_manager=radio_manager, + broadcast_fn=broadcast_event, + error_broadcast_fn=broadcast_error, + now_fn=time.time, + temp_radio_slot=TEMP_RADIO_SLOT, + message_repository=MessageRepository, + ) diff --git a/app/services/message_send.py b/app/services/message_send.py new file mode 100644 index 0000000..2a5dffb --- /dev/null +++ b/app/services/message_send.py @@ -0,0 +1,352 @@ +"""Shared send/resend orchestration for outgoing messages.""" + +import logging +from collections.abc import Callable +from typing import Any + +from fastapi import HTTPException +from meshcore import EventType + +from app.region_scope import normalize_region_scope +from app.repository import AppSettingsRepository, ContactRepository, MessageRepository +from app.services.messages import ( + build_message_model, + create_outgoing_channel_message, + create_outgoing_direct_message, +) + +logger = logging.getLogger(__name__) + +BroadcastFn = Callable[..., Any] +TrackAckFn = Callable[[str, int, int], None] +NowFn = Callable[[], float] + + +async def send_channel_message_with_effective_scope( + *, + mc, + channel, + key_bytes: bytes, + text: str, + timestamp_bytes: bytes, + action_label: str, + temp_radio_slot: int, + error_broadcast_fn: BroadcastFn, + app_settings_repository=AppSettingsRepository, +) -> Any: + """Send a channel message, temporarily overriding flood scope when configured.""" + override_scope = normalize_region_scope(channel.flood_scope_override) + baseline_scope = "" + + if override_scope: + settings = await app_settings_repository.get() + baseline_scope = normalize_region_scope(settings.flood_scope) + + if override_scope and override_scope != baseline_scope: + logger.info( + "Temporarily applying channel flood_scope override for %s: %r", + channel.name, + override_scope, + ) + override_result = await mc.commands.set_flood_scope(override_scope) + if override_result is not None and override_result.type == EventType.ERROR: + logger.warning( + "Failed to apply channel flood_scope override for %s: %s", + channel.name, + override_result.payload, + ) + raise HTTPException( + status_code=500, + detail=( + f"Failed to apply regional override {override_scope!r} before {action_label}: " + f"{override_result.payload}" + ), + ) + + try: + set_result = await mc.commands.set_channel( + channel_idx=temp_radio_slot, + channel_name=channel.name, + channel_secret=key_bytes, + ) + if set_result.type == EventType.ERROR: + logger.warning( + "Failed to set channel on radio slot %d before %s: %s", + temp_radio_slot, + action_label, + set_result.payload, + ) + raise HTTPException( + status_code=500, + detail=f"Failed to configure channel on radio before {action_label}", + ) + + return await mc.commands.send_chan_msg( + chan=temp_radio_slot, + msg=text, + timestamp=timestamp_bytes, + ) + finally: + if override_scope and override_scope != baseline_scope: + try: + restore_result = await mc.commands.set_flood_scope( + baseline_scope if baseline_scope else "" + ) + if restore_result is not None and restore_result.type == EventType.ERROR: + logger.error( + "Failed to restore baseline flood_scope after sending to %s: %s", + channel.name, + restore_result.payload, + ) + error_broadcast_fn( + "Regional override restore failed", + ( + f"Sent to {channel.name}, but restoring flood scope failed. " + "The radio may still be region-scoped. Consider rebooting the radio." + ), + ) + else: + logger.debug( + "Restored baseline flood_scope after channel send: %r", + baseline_scope or "(disabled)", + ) + except Exception: + logger.exception( + "Failed to restore baseline flood_scope after sending to %s", + channel.name, + ) + error_broadcast_fn( + "Regional override restore failed", + ( + f"Sent to {channel.name}, but restoring flood scope failed. " + "The radio may still be region-scoped. Consider rebooting the radio." + ), + ) + + +async def send_direct_message_to_contact( + *, + contact, + text: str, + radio_manager, + broadcast_fn: BroadcastFn, + track_pending_ack_fn: TrackAckFn, + now_fn: NowFn, + message_repository=MessageRepository, + contact_repository=ContactRepository, +) -> Any: + """Send a direct message and persist/broadcast the outgoing row.""" + contact_data = contact.to_radio_dict() + async with radio_manager.radio_operation("send_direct_message") as mc: + logger.debug("Ensuring contact %s is on radio before sending", contact.public_key[:12]) + add_result = await mc.commands.add_contact(contact_data) + if add_result.type == EventType.ERROR: + logger.warning("Failed to add contact to radio: %s", add_result.payload) + + cached_contact = mc.get_contact_by_key_prefix(contact.public_key[:12]) + if not cached_contact: + cached_contact = contact_data + + logger.info("Sending direct message to %s", contact.public_key[:12]) + now = int(now_fn()) + result = await mc.commands.send_msg( + dst=cached_contact, + msg=text, + timestamp=now, + ) + + if result.type == EventType.ERROR: + raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}") + + message = await create_outgoing_direct_message( + conversation_key=contact.public_key.lower(), + text=text, + sender_timestamp=now, + received_at=now, + broadcast_fn=broadcast_fn, + message_repository=message_repository, + ) + if message is None: + raise HTTPException( + status_code=500, + detail="Failed to store outgoing message - unexpected duplicate", + ) + + await contact_repository.update_last_contacted(contact.public_key.lower(), now) + + expected_ack = result.payload.get("expected_ack") + suggested_timeout: int = result.payload.get("suggested_timeout", 10000) + if expected_ack: + ack_code = expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack + track_pending_ack_fn(ack_code, message.id, suggested_timeout) + logger.debug("Tracking ACK %s for message %d", ack_code, message.id) + + return message + + +async def send_channel_message_to_channel( + *, + channel, + channel_key_upper: str, + key_bytes: bytes, + text: str, + radio_manager, + broadcast_fn: BroadcastFn, + error_broadcast_fn: BroadcastFn, + now_fn: NowFn, + temp_radio_slot: int, + message_repository=MessageRepository, +) -> Any: + """Send a channel message and persist/broadcast the outgoing row.""" + message_id: int | None = None + now: int | None = None + radio_name = "" + our_public_key: str | None = None + text_with_sender = text + + async with radio_manager.radio_operation("send_channel_message") as mc: + radio_name = mc.self_info.get("name", "") if mc.self_info else "" + our_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None + text_with_sender = f"{radio_name}: {text}" if radio_name else text + logger.info("Sending channel message to %s: %s", channel.name, text[:50]) + + now = int(now_fn()) + timestamp_bytes = now.to_bytes(4, "little") + + result = await send_channel_message_with_effective_scope( + mc=mc, + channel=channel, + key_bytes=key_bytes, + text=text, + timestamp_bytes=timestamp_bytes, + action_label="sending message", + temp_radio_slot=temp_radio_slot, + error_broadcast_fn=error_broadcast_fn, + ) + + if result.type == EventType.ERROR: + raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}") + + outgoing_message = await create_outgoing_channel_message( + conversation_key=channel_key_upper, + text=text_with_sender, + sender_timestamp=now, + received_at=now, + sender_name=radio_name or None, + sender_key=our_public_key, + channel_name=channel.name, + broadcast_fn=broadcast_fn, + message_repository=message_repository, + ) + if outgoing_message is None: + raise HTTPException( + status_code=500, + detail="Failed to store outgoing message - unexpected duplicate", + ) + message_id = outgoing_message.id + + if message_id is None or now is None: + raise HTTPException(status_code=500, detail="Failed to store outgoing message") + + acked_count, paths = await message_repository.get_ack_and_paths(message_id) + return build_message_model( + message_id=message_id, + msg_type="CHAN", + conversation_key=channel_key_upper, + text=text_with_sender, + sender_timestamp=now, + received_at=now, + paths=paths, + outgoing=True, + acked=acked_count, + sender_name=radio_name or None, + sender_key=our_public_key, + channel_name=channel.name, + ) + + +async def resend_channel_message_record( + *, + message, + channel, + new_timestamp: bool, + radio_manager, + broadcast_fn: BroadcastFn, + error_broadcast_fn: BroadcastFn, + now_fn: NowFn, + temp_radio_slot: int, + message_repository=MessageRepository, +) -> dict[str, Any]: + """Resend a stored outgoing channel message.""" + try: + key_bytes = bytes.fromhex(message.conversation_key) + except ValueError: + raise HTTPException( + status_code=400, + detail=f"Invalid channel key format: {message.conversation_key}", + ) from None + + now: int | None = None + if new_timestamp: + now = int(now_fn()) + timestamp_bytes = now.to_bytes(4, "little") + else: + timestamp_bytes = message.sender_timestamp.to_bytes(4, "little") + + resend_public_key: str | None = None + radio_name = "" + + async with radio_manager.radio_operation("resend_channel_message") as mc: + radio_name = mc.self_info.get("name", "") if mc.self_info else "" + resend_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None + text_to_send = message.text + if radio_name and text_to_send.startswith(f"{radio_name}: "): + text_to_send = text_to_send[len(f"{radio_name}: ") :] + + result = await send_channel_message_with_effective_scope( + mc=mc, + channel=channel, + key_bytes=key_bytes, + text=text_to_send, + timestamp_bytes=timestamp_bytes, + action_label="resending message", + temp_radio_slot=temp_radio_slot, + error_broadcast_fn=error_broadcast_fn, + ) + if result.type == EventType.ERROR: + raise HTTPException( + status_code=500, + detail=f"Failed to resend message: {result.payload}", + ) + + if new_timestamp: + if now is None: + raise HTTPException(status_code=500, detail="Failed to assign resend timestamp") + new_message = await create_outgoing_channel_message( + conversation_key=message.conversation_key, + text=message.text, + sender_timestamp=now, + received_at=now, + sender_name=radio_name or None, + sender_key=resend_public_key, + channel_name=channel.name, + broadcast_fn=broadcast_fn, + message_repository=message_repository, + ) + if new_message is None: + logger.warning( + "Duplicate timestamp collision resending message %d — radio sent but DB row not created", + message.id, + ) + return {"status": "ok", "message_id": message.id} + + logger.info( + "Resent channel message %d as new message %d to %s", + message.id, + new_message.id, + channel.name, + ) + return {"status": "ok", "message_id": new_message.id} + + logger.info("Resent channel message %d to %s", message.id, channel.name) + return {"status": "ok", "message_id": message.id}