mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
436 lines
16 KiB
Python
436 lines
16 KiB
Python
import asyncio
|
|
import logging
|
|
import time
|
|
|
|
from fastapi import APIRouter, HTTPException, Query
|
|
from meshcore import EventType
|
|
|
|
from app.dependencies import require_connected
|
|
from app.event_handlers import track_pending_ack
|
|
from app.models import Message, SendChannelMessageRequest, SendDirectMessageRequest
|
|
from app.radio import radio_manager
|
|
from app.repository import AmbiguousPublicKeyPrefixError, MessageRepository
|
|
from app.websocket import broadcast_event
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/messages", tags=["messages"])
|
|
|
|
|
|
@router.get("", response_model=list[Message])
|
|
async def list_messages(
|
|
limit: int = Query(default=100, ge=1, le=1000),
|
|
offset: int = Query(default=0, ge=0),
|
|
type: str | None = Query(default=None, description="Filter by type: PRIV or CHAN"),
|
|
conversation_key: str | None = Query(
|
|
default=None, description="Filter by conversation key (channel key or contact pubkey)"
|
|
),
|
|
before: int | None = Query(
|
|
default=None, description="Cursor: received_at of last seen message"
|
|
),
|
|
before_id: int | None = Query(default=None, description="Cursor: id of last seen message"),
|
|
) -> list[Message]:
|
|
"""List messages from the database."""
|
|
return await MessageRepository.get_all(
|
|
limit=limit,
|
|
offset=offset,
|
|
msg_type=type,
|
|
conversation_key=conversation_key,
|
|
before=before,
|
|
before_id=before_id,
|
|
)
|
|
|
|
|
|
@router.post("/direct", response_model=Message)
|
|
async def send_direct_message(request: SendDirectMessageRequest) -> Message:
|
|
"""Send a direct message to a contact."""
|
|
require_connected()
|
|
|
|
# First check our database for the contact
|
|
from app.repository import ContactRepository
|
|
|
|
try:
|
|
db_contact = await ContactRepository.get_by_key_or_prefix(request.destination)
|
|
except AmbiguousPublicKeyPrefixError as err:
|
|
sample = ", ".join(key[:12] for key in err.matches[:2])
|
|
raise HTTPException(
|
|
status_code=409,
|
|
detail=(
|
|
f"Ambiguous destination key prefix '{err.prefix}'. "
|
|
f"Use a full 64-character public key. Matching contacts: {sample}"
|
|
),
|
|
) from err
|
|
if not db_contact:
|
|
raise HTTPException(
|
|
status_code=404, detail=f"Contact not found in database: {request.destination}"
|
|
)
|
|
|
|
# Always add/update the contact on radio before sending.
|
|
# The library cache (get_contact_by_key_prefix) can be stale after radio reboot,
|
|
# so we can't rely on it to know if the firmware has the contact.
|
|
# add_contact is idempotent - updates if exists, adds if not.
|
|
contact_data = db_contact.to_radio_dict()
|
|
async with radio_manager.radio_operation("send_direct_message") as mc:
|
|
logger.debug("Ensuring contact %s is on radio before sending", db_contact.public_key[:12])
|
|
add_result = await mc.commands.add_contact(contact_data)
|
|
if add_result.type == EventType.ERROR:
|
|
logger.warning("Failed to add contact to radio: %s", add_result.payload)
|
|
# Continue anyway - might still work if contact exists
|
|
|
|
# Get the contact from the library cache (may have updated info like path)
|
|
contact = mc.get_contact_by_key_prefix(db_contact.public_key[:12])
|
|
if not contact:
|
|
contact = contact_data
|
|
|
|
logger.info("Sending direct message to %s", db_contact.public_key[:12])
|
|
|
|
# Capture timestamp BEFORE sending so we can pass the same value to both the radio
|
|
# and the database. This ensures consistency for deduplication.
|
|
now = int(time.time())
|
|
|
|
result = await mc.commands.send_msg(
|
|
dst=contact,
|
|
msg=request.text,
|
|
timestamp=now,
|
|
)
|
|
|
|
if result.type == EventType.ERROR:
|
|
raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}")
|
|
|
|
# Store outgoing message
|
|
message_id = await MessageRepository.create(
|
|
msg_type="PRIV",
|
|
text=request.text,
|
|
conversation_key=db_contact.public_key.lower(),
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
)
|
|
if message_id is None:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail="Failed to store outgoing message - unexpected duplicate",
|
|
)
|
|
|
|
# Update last_contacted for the contact
|
|
await ContactRepository.update_last_contacted(db_contact.public_key.lower(), now)
|
|
|
|
# Track the expected ACK for this message
|
|
expected_ack = result.payload.get("expected_ack")
|
|
suggested_timeout: int = result.payload.get("suggested_timeout", 10000) # default 10s
|
|
if expected_ack:
|
|
ack_code = expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack
|
|
track_pending_ack(ack_code, message_id, suggested_timeout)
|
|
logger.debug("Tracking ACK %s for message %d", ack_code, message_id)
|
|
|
|
message = Message(
|
|
id=message_id,
|
|
type="PRIV",
|
|
conversation_key=db_contact.public_key.lower(),
|
|
text=request.text,
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
acked=0,
|
|
)
|
|
|
|
# Broadcast so all connected clients (not just sender) see the outgoing message immediately.
|
|
broadcast_event("message", message.model_dump())
|
|
|
|
# Trigger bots for outgoing DMs (runs in background, doesn't block response)
|
|
from app.bot import run_bot_for_message
|
|
|
|
asyncio.create_task(
|
|
run_bot_for_message(
|
|
sender_name=None,
|
|
sender_key=db_contact.public_key.lower(),
|
|
message_text=request.text,
|
|
is_dm=True,
|
|
channel_key=None,
|
|
channel_name=None,
|
|
sender_timestamp=now,
|
|
path=None,
|
|
is_outgoing=True,
|
|
)
|
|
)
|
|
|
|
return message
|
|
|
|
|
|
# Temporary radio slot used for sending channel messages
|
|
TEMP_RADIO_SLOT = 0
|
|
|
|
|
|
@router.post("/channel", response_model=Message)
|
|
async def send_channel_message(request: SendChannelMessageRequest) -> Message:
|
|
"""Send a message to a channel."""
|
|
require_connected()
|
|
|
|
# Get channel info from our database
|
|
from app.decoder import calculate_channel_hash
|
|
from app.repository import ChannelRepository
|
|
|
|
db_channel = await ChannelRepository.get_by_key(request.channel_key)
|
|
if not db_channel:
|
|
raise HTTPException(
|
|
status_code=404, detail=f"Channel {request.channel_key} not found in database"
|
|
)
|
|
|
|
# Convert channel key hex to bytes
|
|
try:
|
|
key_bytes = bytes.fromhex(request.channel_key)
|
|
except ValueError:
|
|
raise HTTPException(
|
|
status_code=400, detail=f"Invalid channel key format: {request.channel_key}"
|
|
) from None
|
|
|
|
expected_hash = calculate_channel_hash(key_bytes)
|
|
logger.info(
|
|
"Sending to channel %s (%s) via radio slot %d, key hash: %s",
|
|
request.channel_key,
|
|
db_channel.name,
|
|
TEMP_RADIO_SLOT,
|
|
expected_hash,
|
|
)
|
|
channel_key_upper = request.channel_key.upper()
|
|
message_id: int | None = None
|
|
now: int | None = None
|
|
radio_name: str = ""
|
|
text_with_sender: str = request.text
|
|
|
|
async with radio_manager.radio_operation("send_channel_message") as mc:
|
|
radio_name = mc.self_info.get("name", "") if mc.self_info else ""
|
|
text_with_sender = f"{radio_name}: {request.text}" if radio_name else request.text
|
|
# Load the channel to a temporary radio slot before sending
|
|
set_result = await mc.commands.set_channel(
|
|
channel_idx=TEMP_RADIO_SLOT,
|
|
channel_name=db_channel.name,
|
|
channel_secret=key_bytes,
|
|
)
|
|
if set_result.type == EventType.ERROR:
|
|
logger.warning(
|
|
"Failed to set channel on radio slot %d before sending: %s",
|
|
TEMP_RADIO_SLOT,
|
|
set_result.payload,
|
|
)
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail="Failed to configure channel on radio before sending message",
|
|
)
|
|
|
|
logger.info("Sending channel message to %s: %s", db_channel.name, request.text[:50])
|
|
|
|
# Capture timestamp BEFORE sending so we can pass the same value to both the radio
|
|
# and the database. This ensures the echo's timestamp matches our stored message
|
|
# for proper deduplication.
|
|
now = int(time.time())
|
|
timestamp_bytes = now.to_bytes(4, "little")
|
|
|
|
result = await mc.commands.send_chan_msg(
|
|
chan=TEMP_RADIO_SLOT,
|
|
msg=request.text,
|
|
timestamp=timestamp_bytes,
|
|
)
|
|
|
|
if result.type == EventType.ERROR:
|
|
raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}")
|
|
|
|
# Store outgoing immediately after send to avoid a race where
|
|
# our own echo lands before persistence.
|
|
message_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text=text_with_sender,
|
|
conversation_key=channel_key_upper,
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
)
|
|
if message_id is None:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail="Failed to store outgoing message - unexpected duplicate",
|
|
)
|
|
|
|
# Broadcast immediately so all connected clients see the message promptly.
|
|
# This ensures the message exists in frontend state when echo-driven
|
|
# `message_acked` events arrive.
|
|
broadcast_event(
|
|
"message",
|
|
Message(
|
|
id=message_id,
|
|
type="CHAN",
|
|
conversation_key=channel_key_upper,
|
|
text=text_with_sender,
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
acked=0,
|
|
).model_dump(),
|
|
)
|
|
|
|
if message_id is None or now is None:
|
|
raise HTTPException(status_code=500, detail="Failed to store outgoing message")
|
|
|
|
acked_count, paths = await MessageRepository.get_ack_and_paths(message_id)
|
|
|
|
message = Message(
|
|
id=message_id,
|
|
type="CHAN",
|
|
conversation_key=channel_key_upper,
|
|
text=text_with_sender,
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
acked=acked_count,
|
|
paths=paths,
|
|
)
|
|
|
|
# Trigger bots for outgoing channel messages (runs in background, doesn't block response)
|
|
from app.bot import run_bot_for_message
|
|
|
|
asyncio.create_task(
|
|
run_bot_for_message(
|
|
sender_name=radio_name or None,
|
|
sender_key=None,
|
|
message_text=request.text,
|
|
is_dm=False,
|
|
channel_key=channel_key_upper,
|
|
channel_name=db_channel.name,
|
|
sender_timestamp=now,
|
|
path=None,
|
|
is_outgoing=True,
|
|
)
|
|
)
|
|
|
|
return message
|
|
|
|
|
|
RESEND_WINDOW_SECONDS = 30
|
|
|
|
|
|
@router.post("/channel/{message_id}/resend")
|
|
async def resend_channel_message(
|
|
message_id: int,
|
|
new_timestamp: bool = Query(default=False),
|
|
) -> dict:
|
|
"""Resend a channel message.
|
|
|
|
When new_timestamp=False (default): byte-perfect resend using the original timestamp.
|
|
Only allowed within 30 seconds of the original send.
|
|
|
|
When new_timestamp=True: resend with a fresh timestamp so repeaters treat it as a
|
|
new packet. Creates a new message row in the database. No time window restriction.
|
|
"""
|
|
require_connected()
|
|
|
|
from app.repository import ChannelRepository
|
|
|
|
msg = await MessageRepository.get_by_id(message_id)
|
|
if not msg:
|
|
raise HTTPException(status_code=404, detail="Message not found")
|
|
|
|
if not msg.outgoing:
|
|
raise HTTPException(status_code=400, detail="Can only resend outgoing messages")
|
|
|
|
if msg.type != "CHAN":
|
|
raise HTTPException(status_code=400, detail="Can only resend channel messages")
|
|
|
|
if msg.sender_timestamp is None:
|
|
raise HTTPException(status_code=400, detail="Message has no timestamp")
|
|
|
|
# Byte-perfect resend enforces the 30s window; new-timestamp resend does not
|
|
if not new_timestamp:
|
|
elapsed = int(time.time()) - msg.sender_timestamp
|
|
if elapsed > RESEND_WINDOW_SECONDS:
|
|
raise HTTPException(status_code=400, detail="Resend window has expired (30 seconds)")
|
|
|
|
db_channel = await ChannelRepository.get_by_key(msg.conversation_key)
|
|
if not db_channel:
|
|
raise HTTPException(status_code=404, detail=f"Channel {msg.conversation_key} not found")
|
|
|
|
# Choose timestamp: original for byte-perfect, fresh for new-timestamp
|
|
if new_timestamp:
|
|
now = int(time.time())
|
|
timestamp_bytes = now.to_bytes(4, "little")
|
|
else:
|
|
timestamp_bytes = msg.sender_timestamp.to_bytes(4, "little")
|
|
|
|
try:
|
|
key_bytes = bytes.fromhex(msg.conversation_key)
|
|
except ValueError:
|
|
raise HTTPException(
|
|
status_code=400, detail=f"Invalid channel key format: {msg.conversation_key}"
|
|
) from None
|
|
|
|
async with radio_manager.radio_operation("resend_channel_message") as mc:
|
|
# Strip sender prefix: DB stores "RadioName: message" but radio needs "message"
|
|
radio_name = mc.self_info.get("name", "") if mc.self_info else ""
|
|
text_to_send = msg.text
|
|
if radio_name and text_to_send.startswith(f"{radio_name}: "):
|
|
text_to_send = text_to_send[len(f"{radio_name}: ") :]
|
|
|
|
set_result = await mc.commands.set_channel(
|
|
channel_idx=TEMP_RADIO_SLOT,
|
|
channel_name=db_channel.name,
|
|
channel_secret=key_bytes,
|
|
)
|
|
if set_result.type == EventType.ERROR:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail="Failed to configure channel on radio before resending",
|
|
)
|
|
|
|
result = await mc.commands.send_chan_msg(
|
|
chan=TEMP_RADIO_SLOT,
|
|
msg=text_to_send,
|
|
timestamp=timestamp_bytes,
|
|
)
|
|
if result.type == EventType.ERROR:
|
|
raise HTTPException(
|
|
status_code=500, detail=f"Failed to resend message: {result.payload}"
|
|
)
|
|
|
|
# For new-timestamp resend, create a new message row and broadcast it
|
|
if new_timestamp:
|
|
new_msg_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text=msg.text,
|
|
conversation_key=msg.conversation_key,
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
)
|
|
if new_msg_id is None:
|
|
# Timestamp-second collision (same text+channel within the same second).
|
|
# The radio already transmitted, so log and return the original ID rather
|
|
# than surfacing a 500 for a message that was successfully sent over the air.
|
|
logger.warning(
|
|
"Duplicate timestamp collision resending message %d — radio sent but DB row not created",
|
|
message_id,
|
|
)
|
|
return {"status": "ok", "message_id": message_id}
|
|
|
|
broadcast_event(
|
|
"message",
|
|
Message(
|
|
id=new_msg_id,
|
|
type="CHAN",
|
|
conversation_key=msg.conversation_key,
|
|
text=msg.text,
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
acked=0,
|
|
).model_dump(),
|
|
)
|
|
|
|
logger.info(
|
|
"Resent channel message %d as new message %d to %s",
|
|
message_id,
|
|
new_msg_id,
|
|
db_channel.name,
|
|
)
|
|
return {"status": "ok", "message_id": new_msg_id}
|
|
|
|
logger.info("Resent channel message %d to %s", message_id, db_channel.name)
|
|
return {"status": "ok", "message_id": message_id}
|