"""Shared send/resend orchestration for outgoing messages.""" import asyncio import logging from collections.abc import Callable from typing import Any from fastapi import HTTPException from meshcore import EventType from app.models import ResendChannelMessageResponse from app.region_scope import normalize_region_scope from app.repository import AppSettingsRepository, ContactRepository, MessageRepository from app.services import dm_ack_tracker from app.services.messages import ( broadcast_message, build_stored_outgoing_channel_message, create_outgoing_channel_message, create_outgoing_direct_message, increment_ack_and_broadcast, ) logger = logging.getLogger(__name__) NO_RADIO_RESPONSE_AFTER_SEND_DETAIL = ( "Send command was issued to the radio, but no response was heard back. " "The message may or may not have sent successfully." ) BroadcastFn = Callable[..., Any] TrackAckFn = Callable[[str, int, int], bool] NowFn = Callable[[], float] OutgoingReservationKey = tuple[str, str, str] RetryTaskScheduler = Callable[[Any], Any] _pending_outgoing_timestamp_reservations: dict[OutgoingReservationKey, set[int]] = {} _outgoing_timestamp_reservations_lock = asyncio.Lock() DM_SEND_MAX_ATTEMPTS = 3 DEFAULT_DM_ACK_TIMEOUT_MS = 10000 DM_RETRY_WAIT_MARGIN = 1.2 async def allocate_outgoing_sender_timestamp( *, message_repository, msg_type: str, conversation_key: str, text: str, requested_timestamp: int, ) -> int: """Pick a sender timestamp that will not collide with an existing stored message.""" reservation_key = (msg_type, conversation_key, text) candidate = requested_timestamp while True: async with _outgoing_timestamp_reservations_lock: reserved = _pending_outgoing_timestamp_reservations.get(reservation_key, set()) is_reserved = candidate in reserved if is_reserved: candidate += 1 continue existing = await message_repository.get_by_content( msg_type=msg_type, conversation_key=conversation_key, text=text, sender_timestamp=candidate, ) if existing is not None: candidate += 1 continue async with _outgoing_timestamp_reservations_lock: reserved = _pending_outgoing_timestamp_reservations.setdefault(reservation_key, set()) if candidate in reserved: candidate += 1 continue reserved.add(candidate) break if candidate != requested_timestamp: logger.info( "Bumped outgoing %s timestamp for %s from %d to %d to avoid same-content collision", msg_type, conversation_key[:12], requested_timestamp, candidate, ) return candidate async def release_outgoing_sender_timestamp( *, msg_type: str, conversation_key: str, text: str, sender_timestamp: int, ) -> None: reservation_key = (msg_type, conversation_key, text) async with _outgoing_timestamp_reservations_lock: reserved = _pending_outgoing_timestamp_reservations.get(reservation_key) if not reserved: return reserved.discard(sender_timestamp) if not reserved: _pending_outgoing_timestamp_reservations.pop(reservation_key, None) async def send_channel_message_with_effective_scope( *, mc, channel, channel_key: str, key_bytes: bytes, text: str, timestamp_bytes: bytes, action_label: str, radio_manager, temp_radio_slot: int, error_broadcast_fn: BroadcastFn, app_settings_repository=AppSettingsRepository, ) -> Any: """Send a channel message, temporarily overriding flood scope when configured.""" override_scope = normalize_region_scope(channel.flood_scope_override) baseline_scope = "" if override_scope: settings = await app_settings_repository.get() baseline_scope = normalize_region_scope(settings.flood_scope) if override_scope and override_scope != baseline_scope: logger.info( "Temporarily applying channel flood_scope override for %s: %r", channel.name, override_scope, ) override_result = await mc.commands.set_flood_scope(override_scope) if override_result is not None and override_result.type == EventType.ERROR: logger.warning( "Failed to apply channel flood_scope override for %s: %s", channel.name, override_result.payload, ) raise HTTPException( status_code=500, detail=( f"Failed to apply regional override {override_scope!r} before {action_label}: " f"{override_result.payload}" ), ) try: channel_slot, needs_configure, evicted_channel_key = radio_manager.plan_channel_send_slot( channel_key, preferred_slot=temp_radio_slot, ) if needs_configure: logger.debug( "Loading channel %s into radio slot %d before %s%s", channel.name, channel_slot, action_label, ( f" (evicting cached {evicted_channel_key[:8]})" if evicted_channel_key is not None else "" ), ) try: set_result = await mc.commands.set_channel( channel_idx=channel_slot, channel_name=channel.name, channel_secret=key_bytes, ) except Exception: if evicted_channel_key is not None: radio_manager.invalidate_cached_channel_slot(evicted_channel_key) raise if set_result.type == EventType.ERROR: if evicted_channel_key is not None: radio_manager.invalidate_cached_channel_slot(evicted_channel_key) logger.warning( "Failed to set channel on radio slot %d before %s: %s", channel_slot, action_label, set_result.payload, ) raise HTTPException( status_code=500, detail=f"Failed to configure channel on radio before {action_label}", ) radio_manager.note_channel_slot_loaded(channel_key, channel_slot) else: logger.debug( "Reusing cached radio slot %d for channel %s before %s", channel_slot, channel.name, action_label, ) send_result = await mc.commands.send_chan_msg( chan=channel_slot, msg=text, timestamp=timestamp_bytes, ) if send_result is None: logger.warning( "No response from radio after %s for channel %s; send outcome is unknown", action_label, channel.name, ) raise HTTPException(status_code=504, detail=NO_RADIO_RESPONSE_AFTER_SEND_DETAIL) if send_result.type == EventType.ERROR: radio_manager.invalidate_cached_channel_slot(channel_key) else: radio_manager.note_channel_slot_used(channel_key) return send_result finally: if override_scope and override_scope != baseline_scope: try: restore_result = await mc.commands.set_flood_scope( baseline_scope if baseline_scope else "" ) if restore_result is not None and restore_result.type == EventType.ERROR: logger.error( "Failed to restore baseline flood_scope after sending to %s: %s", channel.name, restore_result.payload, ) error_broadcast_fn( "Regional override restore failed", ( f"Sent to {channel.name}, but restoring flood scope failed. " "The radio may still be region-scoped. Consider rebooting the radio." ), ) else: logger.debug( "Restored baseline flood_scope after channel send: %r", baseline_scope or "(disabled)", ) except Exception: logger.exception( "Failed to restore baseline flood_scope after sending to %s", channel.name, ) error_broadcast_fn( "Regional override restore failed", ( f"Sent to {channel.name}, but restoring flood scope failed. " "The radio may still be region-scoped. Consider rebooting the radio." ), ) def _extract_expected_ack_code(result: Any) -> str | None: if result is None or result.type == EventType.ERROR: return None payload = result.payload or {} expected_ack = payload.get("expected_ack") if not expected_ack: return None return expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack def _get_ack_tracking_timeout_ms(result: Any) -> int: if result is None or result.type == EventType.ERROR: return DEFAULT_DM_ACK_TIMEOUT_MS payload = result.payload or {} suggested_timeout = payload.get("suggested_timeout") if suggested_timeout is None: return DEFAULT_DM_ACK_TIMEOUT_MS try: return max(1, int(suggested_timeout)) except (TypeError, ValueError): return DEFAULT_DM_ACK_TIMEOUT_MS def _get_direct_message_retry_timeout_ms(result: Any) -> int: """Return the ACK window to wait before retrying a DM. The MeshCore firmware already computes and returns `suggested_timeout` in `PACKET_MSG_SENT`, derived from estimated packet airtime and route mode. We use that firmware-supplied window directly so retries do not fire before the radio's own ACK timeout expires. Sources: - https://github.com/meshcore-dev/MeshCore/blob/main/src/helpers/BaseChatMesh.cpp - https://github.com/meshcore-dev/MeshCore/blob/main/examples/companion_radio/MyMesh.cpp - https://github.com/meshcore-dev/MeshCore/blob/main/docs/companion_protocol.md """ return _get_ack_tracking_timeout_ms(result) async def _apply_direct_message_ack_tracking( *, result: Any, message_id: int, track_pending_ack_fn: TrackAckFn, broadcast_fn: BroadcastFn, ) -> int: ack_code = _extract_expected_ack_code(result) if not ack_code: return 0 timeout_ms = _get_ack_tracking_timeout_ms(result) matched_immediately = track_pending_ack_fn(ack_code, message_id, timeout_ms) is True logger.debug("Tracking ACK %s for message %d", ack_code, message_id) if matched_immediately: dm_ack_tracker.clear_pending_acks_for_message(message_id) return await increment_ack_and_broadcast( message_id=message_id, broadcast_fn=broadcast_fn, ) return 0 async def _is_message_acked(*, message_id: int, message_repository) -> bool: acked_count, _paths = await message_repository.get_ack_and_paths(message_id) return acked_count > 0 async def _retry_direct_message_until_acked( *, contact, text: str, message_id: int, sender_timestamp: int, radio_manager, track_pending_ack_fn: TrackAckFn, broadcast_fn: BroadcastFn, wait_timeout_ms: int, sleep_fn, message_repository, ) -> None: next_wait_timeout_ms = wait_timeout_ms for attempt in range(1, DM_SEND_MAX_ATTEMPTS): await sleep_fn((next_wait_timeout_ms / 1000) * DM_RETRY_WAIT_MARGIN) if await _is_message_acked(message_id=message_id, message_repository=message_repository): return try: async with radio_manager.radio_operation("retry_direct_message") as mc: contact_data = contact.to_radio_dict() add_result = await mc.commands.add_contact(contact_data) if add_result.type == EventType.ERROR: logger.warning( "Failed to reload contact %s on radio before DM retry: %s", contact.public_key[:12], add_result.payload, ) cached_contact = mc.get_contact_by_key_prefix(contact.public_key[:12]) if not cached_contact: cached_contact = contact_data if attempt == DM_SEND_MAX_ATTEMPTS - 1: reset_result = await mc.commands.reset_path(contact.public_key) if reset_result is None: logger.warning( "No response from radio for reset_path to %s before final DM retry", contact.public_key[:12], ) elif reset_result.type == EventType.ERROR: logger.warning( "Failed to reset path before final DM retry to %s: %s", contact.public_key[:12], reset_result.payload, ) refreshed_contact = mc.get_contact_by_key_prefix(contact.public_key[:12]) if refreshed_contact: cached_contact = refreshed_contact result = await mc.commands.send_msg( dst=cached_contact, msg=text, timestamp=sender_timestamp, attempt=attempt, ) except Exception: logger.exception( "Background DM retry attempt %d/%d failed for %s", attempt + 1, DM_SEND_MAX_ATTEMPTS, contact.public_key[:12], ) continue if result is None: logger.warning( "No response from radio after background DM retry attempt %d/%d to %s", attempt + 1, DM_SEND_MAX_ATTEMPTS, contact.public_key[:12], ) continue if result.type == EventType.ERROR: logger.warning( "Background DM retry attempt %d/%d failed for %s: %s", attempt + 1, DM_SEND_MAX_ATTEMPTS, contact.public_key[:12], result.payload, ) continue if await _is_message_acked(message_id=message_id, message_repository=message_repository): return ack_code = _extract_expected_ack_code(result) if not ack_code: logger.warning( "Background DM retry attempt %d/%d for %s returned no expected_ack; " "stopping retries to avoid duplicate sends", attempt + 1, DM_SEND_MAX_ATTEMPTS, contact.public_key[:12], ) return next_wait_timeout_ms = _get_direct_message_retry_timeout_ms(result) ack_count = await _apply_direct_message_ack_tracking( result=result, message_id=message_id, track_pending_ack_fn=track_pending_ack_fn, broadcast_fn=broadcast_fn, ) if ack_count > 0: return async def send_direct_message_to_contact( *, contact, text: str, radio_manager, broadcast_fn: BroadcastFn, track_pending_ack_fn: TrackAckFn, now_fn: NowFn, retry_task_scheduler: RetryTaskScheduler | None = None, retry_sleep_fn=None, message_repository=MessageRepository, contact_repository=ContactRepository, ) -> Any: """Send a direct message and persist/broadcast the outgoing row.""" if retry_task_scheduler is None: retry_task_scheduler = asyncio.create_task if retry_sleep_fn is None: retry_sleep_fn = asyncio.sleep contact_data = contact.to_radio_dict() sent_at: int | None = None sender_timestamp: int | None = None message = None result = None try: async with radio_manager.radio_operation("send_direct_message") as mc: logger.debug("Ensuring contact %s is on radio before sending", contact.public_key[:12]) add_result = await mc.commands.add_contact(contact_data) if add_result.type == EventType.ERROR: logger.warning("Failed to add contact to radio: %s", add_result.payload) cached_contact = mc.get_contact_by_key_prefix(contact.public_key[:12]) if not cached_contact: cached_contact = contact_data logger.info("Sending direct message to %s", contact.public_key[:12]) sent_at = int(now_fn()) sender_timestamp = await allocate_outgoing_sender_timestamp( message_repository=message_repository, msg_type="PRIV", conversation_key=contact.public_key.lower(), text=text, requested_timestamp=sent_at, ) result = await mc.commands.send_msg( dst=cached_contact, msg=text, timestamp=sender_timestamp, ) if result is None: logger.warning( "No response from radio after direct send to %s; send outcome is unknown", contact.public_key[:12], ) raise HTTPException(status_code=504, detail=NO_RADIO_RESPONSE_AFTER_SEND_DETAIL) if result.type == EventType.ERROR: raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}") message = await create_outgoing_direct_message( conversation_key=contact.public_key.lower(), text=text, sender_timestamp=sender_timestamp, received_at=sent_at, broadcast_fn=broadcast_fn, message_repository=message_repository, ) if message is None: raise HTTPException( status_code=500, detail="Failed to store outgoing message - unexpected duplicate", ) finally: if sender_timestamp is not None: await release_outgoing_sender_timestamp( msg_type="PRIV", conversation_key=contact.public_key.lower(), text=text, sender_timestamp=sender_timestamp, ) if sent_at is None or sender_timestamp is None or message is None or result is None: raise HTTPException(status_code=500, detail="Failed to store outgoing message") await contact_repository.update_last_contacted(contact.public_key.lower(), sent_at) ack_code = _extract_expected_ack_code(result) retry_timeout_ms = _get_direct_message_retry_timeout_ms(result) ack_count = await _apply_direct_message_ack_tracking( result=result, message_id=message.id, track_pending_ack_fn=track_pending_ack_fn, broadcast_fn=broadcast_fn, ) if ack_count > 0: message.acked = ack_count return message if DM_SEND_MAX_ATTEMPTS > 1 and ack_code: retry_task_scheduler( _retry_direct_message_until_acked( contact=contact, text=text, message_id=message.id, sender_timestamp=sender_timestamp, radio_manager=radio_manager, track_pending_ack_fn=track_pending_ack_fn, broadcast_fn=broadcast_fn, wait_timeout_ms=retry_timeout_ms, sleep_fn=retry_sleep_fn, message_repository=message_repository, ) ) return message async def send_channel_message_to_channel( *, channel, channel_key_upper: str, key_bytes: bytes, text: str, radio_manager, broadcast_fn: BroadcastFn, error_broadcast_fn: BroadcastFn, now_fn: NowFn, temp_radio_slot: int, message_repository=MessageRepository, ) -> Any: """Send a channel message and persist/broadcast the outgoing row.""" sent_at: int | None = None sender_timestamp: int | None = None radio_name = "" our_public_key: str | None = None text_with_sender = text outgoing_message = None try: async with radio_manager.radio_operation("send_channel_message") as mc: radio_name = mc.self_info.get("name", "") if mc.self_info else "" our_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None text_with_sender = f"{radio_name}: {text}" if radio_name else text logger.info("Sending channel message to %s: %s", channel.name, text[:50]) sent_at = int(now_fn()) sender_timestamp = await allocate_outgoing_sender_timestamp( message_repository=message_repository, msg_type="CHAN", conversation_key=channel_key_upper, text=text_with_sender, requested_timestamp=sent_at, ) timestamp_bytes = sender_timestamp.to_bytes(4, "little") outgoing_message = await create_outgoing_channel_message( conversation_key=channel_key_upper, text=text_with_sender, sender_timestamp=sender_timestamp, received_at=sent_at, sender_name=radio_name or None, sender_key=our_public_key, channel_name=channel.name, broadcast_fn=broadcast_fn, broadcast=False, message_repository=message_repository, ) if outgoing_message is None: raise HTTPException( status_code=500, detail="Failed to store outgoing message - unexpected duplicate", ) result = await send_channel_message_with_effective_scope( mc=mc, channel=channel, channel_key=channel_key_upper, key_bytes=key_bytes, text=text, timestamp_bytes=timestamp_bytes, action_label="sending message", radio_manager=radio_manager, temp_radio_slot=temp_radio_slot, error_broadcast_fn=error_broadcast_fn, ) if result is None: logger.warning( "No response from radio after channel send to %s; send outcome is unknown", channel.name, ) raise HTTPException(status_code=504, detail=NO_RADIO_RESPONSE_AFTER_SEND_DETAIL) if result.type == EventType.ERROR: raise HTTPException( status_code=500, detail=f"Failed to send message: {result.payload}" ) except Exception: if outgoing_message is not None: await message_repository.delete_by_id(outgoing_message.id) outgoing_message = None raise finally: if sender_timestamp is not None: await release_outgoing_sender_timestamp( msg_type="CHAN", conversation_key=channel_key_upper, text=text_with_sender, sender_timestamp=sender_timestamp, ) if sent_at is None or sender_timestamp is None or outgoing_message is None: raise HTTPException(status_code=500, detail="Failed to store outgoing message") outgoing_message = await build_stored_outgoing_channel_message( message_id=outgoing_message.id, conversation_key=channel_key_upper, text=text_with_sender, sender_timestamp=sender_timestamp, received_at=sent_at, sender_name=radio_name or None, sender_key=our_public_key, channel_name=channel.name, message_repository=message_repository, ) broadcast_message(message=outgoing_message, broadcast_fn=broadcast_fn) return outgoing_message async def resend_channel_message_record( *, message, channel, new_timestamp: bool, radio_manager, broadcast_fn: BroadcastFn, error_broadcast_fn: BroadcastFn, now_fn: NowFn, temp_radio_slot: int, message_repository=MessageRepository, ) -> ResendChannelMessageResponse: """Resend a stored outgoing channel message.""" try: key_bytes = bytes.fromhex(message.conversation_key) except ValueError: raise HTTPException( status_code=400, detail=f"Invalid channel key format: {message.conversation_key}", ) from None sent_at: int | None = None sender_timestamp = message.sender_timestamp timestamp_bytes = message.sender_timestamp.to_bytes(4, "little") resend_public_key: str | None = None radio_name = "" new_message = None stored_text = message.text try: async with radio_manager.radio_operation("resend_channel_message") as mc: radio_name = mc.self_info.get("name", "") if mc.self_info else "" resend_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None text_to_send = message.text if radio_name and text_to_send.startswith(f"{radio_name}: "): text_to_send = text_to_send[len(f"{radio_name}: ") :] if new_timestamp: sent_at = int(now_fn()) sender_timestamp = await allocate_outgoing_sender_timestamp( message_repository=message_repository, msg_type="CHAN", conversation_key=message.conversation_key, text=stored_text, requested_timestamp=sent_at, ) timestamp_bytes = sender_timestamp.to_bytes(4, "little") new_message = await create_outgoing_channel_message( conversation_key=message.conversation_key, text=message.text, sender_timestamp=sender_timestamp, received_at=sent_at, sender_name=radio_name or None, sender_key=resend_public_key, channel_name=channel.name, broadcast_fn=broadcast_fn, broadcast=False, message_repository=message_repository, ) if new_message is None: raise HTTPException( status_code=500, detail="Failed to store resent message - unexpected duplicate", ) result = await send_channel_message_with_effective_scope( mc=mc, channel=channel, channel_key=message.conversation_key, key_bytes=key_bytes, text=text_to_send, timestamp_bytes=timestamp_bytes, action_label="resending message", radio_manager=radio_manager, temp_radio_slot=temp_radio_slot, error_broadcast_fn=error_broadcast_fn, ) if result is None: logger.warning( "No response from radio after channel resend to %s; send outcome is unknown", channel.name, ) raise HTTPException(status_code=504, detail=NO_RADIO_RESPONSE_AFTER_SEND_DETAIL) if result.type == EventType.ERROR: raise HTTPException( status_code=500, detail=f"Failed to resend message: {result.payload}", ) except Exception: if new_message is not None: await message_repository.delete_by_id(new_message.id) new_message = None raise finally: if new_timestamp and sent_at is not None: await release_outgoing_sender_timestamp( msg_type="CHAN", conversation_key=message.conversation_key, text=stored_text, sender_timestamp=sender_timestamp, ) if new_timestamp: if sent_at is None or new_message is None: raise HTTPException(status_code=500, detail="Failed to assign resend timestamp") new_message = await build_stored_outgoing_channel_message( message_id=new_message.id, conversation_key=message.conversation_key, text=message.text, sender_timestamp=sender_timestamp, received_at=sent_at, sender_name=radio_name or None, sender_key=resend_public_key, channel_name=channel.name, message_repository=message_repository, ) broadcast_message(message=new_message, broadcast_fn=broadcast_fn) logger.info( "Resent channel message %d as new message %d to %s", message.id, new_message.id, channel.name, ) return ResendChannelMessageResponse( status="ok", message_id=new_message.id, message=new_message, ) logger.info("Resent channel message %d to %s", message.id, channel.name) return ResendChannelMessageResponse(status="ok", message_id=message.id)