mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
Prevent same-second outgoing collision now that we can send faster.
Also add pending ack tracking
This commit is contained in:
@@ -51,9 +51,8 @@ CREATE TABLE IF NOT EXISTS messages (
|
||||
sender_name TEXT,
|
||||
sender_key TEXT
|
||||
-- Deduplication: identical text + timestamp in the same conversation is treated as a
|
||||
-- mesh echo/repeat. Second-precision timestamps mean two intentional identical messages
|
||||
-- within the same second would collide, but this is not feasible in practice — LoRa
|
||||
-- transmission takes several seconds per message, and the UI clears the input on send.
|
||||
-- mesh echo/repeat. Outgoing sends allocate a collision-free sender_timestamp before
|
||||
-- transmit so legitimate repeat sends do not collide with this index.
|
||||
-- Enforced via idx_messages_dedup_null_safe (unique index) rather than a table constraint
|
||||
-- to avoid the storage overhead of SQLite's autoindex duplicating every message text.
|
||||
);
|
||||
|
||||
@@ -28,11 +28,12 @@ logger = logging.getLogger(__name__)
|
||||
# This prevents handler duplication after reconnects
|
||||
_active_subscriptions: list["Subscription"] = []
|
||||
_pending_acks = dm_ack_tracker._pending_acks
|
||||
_buffered_acks = dm_ack_tracker._buffered_acks
|
||||
|
||||
|
||||
def track_pending_ack(expected_ack: str, message_id: int, timeout_ms: int) -> None:
|
||||
def track_pending_ack(expected_ack: str, message_id: int, timeout_ms: int) -> bool:
|
||||
"""Compatibility wrapper for pending DM ACK tracking."""
|
||||
dm_ack_tracker.track_pending_ack(expected_ack, message_id, timeout_ms)
|
||||
return dm_ack_tracker.track_pending_ack(expected_ack, message_id, timeout_ms)
|
||||
|
||||
|
||||
def cleanup_expired_acks() -> None:
|
||||
@@ -302,6 +303,7 @@ async def on_ack(event: "Event") -> None:
|
||||
# preserving any previously known paths.
|
||||
await increment_ack_and_broadcast(message_id=message_id, broadcast_fn=broadcast_event)
|
||||
else:
|
||||
dm_ack_tracker.buffer_unmatched_ack(ack_code)
|
||||
logger.debug("ACK code %s does not match any pending messages", ack_code)
|
||||
|
||||
|
||||
|
||||
@@ -6,12 +6,26 @@ import time
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PendingAck = tuple[int, float, int]
|
||||
BUFFERED_ACK_TTL_SECONDS = 30.0
|
||||
|
||||
_pending_acks: dict[str, PendingAck] = {}
|
||||
_buffered_acks: dict[str, float] = {}
|
||||
|
||||
|
||||
def track_pending_ack(expected_ack: str, message_id: int, timeout_ms: int) -> None:
|
||||
"""Track an expected ACK code for an outgoing direct message."""
|
||||
def track_pending_ack(expected_ack: str, message_id: int, timeout_ms: int) -> bool:
|
||||
"""Track an expected ACK code for an outgoing direct message.
|
||||
|
||||
Returns True when the ACK was already observed and buffered before registration.
|
||||
"""
|
||||
buffered_at = _buffered_acks.pop(expected_ack, None)
|
||||
if buffered_at is not None:
|
||||
logger.debug(
|
||||
"Matched buffered ACK %s immediately for message %d",
|
||||
expected_ack,
|
||||
message_id,
|
||||
)
|
||||
return True
|
||||
|
||||
_pending_acks[expected_ack] = (message_id, time.time(), timeout_ms)
|
||||
logger.debug(
|
||||
"Tracking pending ACK %s for message %d (timeout %dms)",
|
||||
@@ -19,6 +33,13 @@ def track_pending_ack(expected_ack: str, message_id: int, timeout_ms: int) -> No
|
||||
message_id,
|
||||
timeout_ms,
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
def buffer_unmatched_ack(ack_code: str) -> None:
|
||||
"""Remember an ACK that arrived before its message registration."""
|
||||
_buffered_acks[ack_code] = time.time()
|
||||
logger.debug("Buffered unmatched ACK %s for late registration", ack_code)
|
||||
|
||||
|
||||
def cleanup_expired_acks() -> None:
|
||||
@@ -33,6 +54,15 @@ def cleanup_expired_acks() -> None:
|
||||
del _pending_acks[code]
|
||||
logger.debug("Expired pending ACK %s", code)
|
||||
|
||||
expired_buffered_codes = [
|
||||
code
|
||||
for code, buffered_at in _buffered_acks.items()
|
||||
if now - buffered_at > BUFFERED_ACK_TTL_SECONDS
|
||||
]
|
||||
for code in expired_buffered_codes:
|
||||
del _buffered_acks[code]
|
||||
logger.debug("Expired buffered ACK %s", code)
|
||||
|
||||
|
||||
def pop_pending_ack(ack_code: str) -> int | None:
|
||||
"""Claim the tracked message ID for an ACK code if present."""
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Shared send/resend orchestration for outgoing messages."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
@@ -13,13 +14,85 @@ from app.services.messages import (
|
||||
build_message_model,
|
||||
create_outgoing_channel_message,
|
||||
create_outgoing_direct_message,
|
||||
increment_ack_and_broadcast,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
BroadcastFn = Callable[..., Any]
|
||||
TrackAckFn = Callable[[str, int, int], None]
|
||||
TrackAckFn = Callable[[str, int, int], bool]
|
||||
NowFn = Callable[[], float]
|
||||
OutgoingReservationKey = tuple[str, str, str]
|
||||
|
||||
_pending_outgoing_timestamp_reservations: dict[OutgoingReservationKey, set[int]] = {}
|
||||
_outgoing_timestamp_reservations_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def allocate_outgoing_sender_timestamp(
|
||||
*,
|
||||
message_repository,
|
||||
msg_type: str,
|
||||
conversation_key: str,
|
||||
text: str,
|
||||
requested_timestamp: int,
|
||||
) -> int:
|
||||
"""Pick a sender timestamp that will not collide with an existing stored message."""
|
||||
reservation_key = (msg_type, conversation_key, text)
|
||||
candidate = requested_timestamp
|
||||
while True:
|
||||
async with _outgoing_timestamp_reservations_lock:
|
||||
reserved = _pending_outgoing_timestamp_reservations.get(reservation_key, set())
|
||||
is_reserved = candidate in reserved
|
||||
|
||||
if is_reserved:
|
||||
candidate += 1
|
||||
continue
|
||||
|
||||
existing = await message_repository.get_by_content(
|
||||
msg_type=msg_type,
|
||||
conversation_key=conversation_key,
|
||||
text=text,
|
||||
sender_timestamp=candidate,
|
||||
)
|
||||
if existing is not None:
|
||||
candidate += 1
|
||||
continue
|
||||
|
||||
async with _outgoing_timestamp_reservations_lock:
|
||||
reserved = _pending_outgoing_timestamp_reservations.setdefault(reservation_key, set())
|
||||
if candidate in reserved:
|
||||
candidate += 1
|
||||
continue
|
||||
reserved.add(candidate)
|
||||
break
|
||||
|
||||
if candidate != requested_timestamp:
|
||||
logger.info(
|
||||
"Bumped outgoing %s timestamp for %s from %d to %d to avoid same-content collision",
|
||||
msg_type,
|
||||
conversation_key[:12],
|
||||
requested_timestamp,
|
||||
candidate,
|
||||
)
|
||||
|
||||
return candidate
|
||||
|
||||
|
||||
async def release_outgoing_sender_timestamp(
|
||||
*,
|
||||
msg_type: str,
|
||||
conversation_key: str,
|
||||
text: str,
|
||||
sender_timestamp: int,
|
||||
) -> None:
|
||||
reservation_key = (msg_type, conversation_key, text)
|
||||
async with _outgoing_timestamp_reservations_lock:
|
||||
reserved = _pending_outgoing_timestamp_reservations.get(reservation_key)
|
||||
if not reserved:
|
||||
return
|
||||
reserved.discard(sender_timestamp)
|
||||
if not reserved:
|
||||
_pending_outgoing_timestamp_reservations.pop(reservation_key, None)
|
||||
|
||||
|
||||
async def send_channel_message_with_effective_scope(
|
||||
@@ -175,49 +248,78 @@ async def send_direct_message_to_contact(
|
||||
) -> Any:
|
||||
"""Send a direct message and persist/broadcast the outgoing row."""
|
||||
contact_data = contact.to_radio_dict()
|
||||
async with radio_manager.radio_operation("send_direct_message") as mc:
|
||||
logger.debug("Ensuring contact %s is on radio before sending", contact.public_key[:12])
|
||||
add_result = await mc.commands.add_contact(contact_data)
|
||||
if add_result.type == EventType.ERROR:
|
||||
logger.warning("Failed to add contact to radio: %s", add_result.payload)
|
||||
sent_at: int | None = None
|
||||
sender_timestamp: int | None = None
|
||||
message = None
|
||||
result = None
|
||||
try:
|
||||
async with radio_manager.radio_operation("send_direct_message") as mc:
|
||||
logger.debug("Ensuring contact %s is on radio before sending", contact.public_key[:12])
|
||||
add_result = await mc.commands.add_contact(contact_data)
|
||||
if add_result.type == EventType.ERROR:
|
||||
logger.warning("Failed to add contact to radio: %s", add_result.payload)
|
||||
|
||||
cached_contact = mc.get_contact_by_key_prefix(contact.public_key[:12])
|
||||
if not cached_contact:
|
||||
cached_contact = contact_data
|
||||
cached_contact = mc.get_contact_by_key_prefix(contact.public_key[:12])
|
||||
if not cached_contact:
|
||||
cached_contact = contact_data
|
||||
|
||||
logger.info("Sending direct message to %s", contact.public_key[:12])
|
||||
now = int(now_fn())
|
||||
result = await mc.commands.send_msg(
|
||||
dst=cached_contact,
|
||||
msg=text,
|
||||
timestamp=now,
|
||||
logger.info("Sending direct message to %s", contact.public_key[:12])
|
||||
sent_at = int(now_fn())
|
||||
sender_timestamp = await allocate_outgoing_sender_timestamp(
|
||||
message_repository=message_repository,
|
||||
msg_type="PRIV",
|
||||
conversation_key=contact.public_key.lower(),
|
||||
text=text,
|
||||
requested_timestamp=sent_at,
|
||||
)
|
||||
result = await mc.commands.send_msg(
|
||||
dst=cached_contact,
|
||||
msg=text,
|
||||
timestamp=sender_timestamp,
|
||||
)
|
||||
|
||||
if result is None or result.type == EventType.ERROR:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}")
|
||||
|
||||
message = await create_outgoing_direct_message(
|
||||
conversation_key=contact.public_key.lower(),
|
||||
text=text,
|
||||
sender_timestamp=sender_timestamp,
|
||||
received_at=sent_at,
|
||||
broadcast_fn=broadcast_fn,
|
||||
message_repository=message_repository,
|
||||
)
|
||||
if message is None:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="Failed to store outgoing message - unexpected duplicate",
|
||||
)
|
||||
finally:
|
||||
if sender_timestamp is not None:
|
||||
await release_outgoing_sender_timestamp(
|
||||
msg_type="PRIV",
|
||||
conversation_key=contact.public_key.lower(),
|
||||
text=text,
|
||||
sender_timestamp=sender_timestamp,
|
||||
)
|
||||
|
||||
if result.type == EventType.ERROR:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}")
|
||||
if sent_at is None or sender_timestamp is None or message is None or result is None:
|
||||
raise HTTPException(status_code=500, detail="Failed to store outgoing message")
|
||||
|
||||
message = await create_outgoing_direct_message(
|
||||
conversation_key=contact.public_key.lower(),
|
||||
text=text,
|
||||
sender_timestamp=now,
|
||||
received_at=now,
|
||||
broadcast_fn=broadcast_fn,
|
||||
message_repository=message_repository,
|
||||
)
|
||||
if message is None:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="Failed to store outgoing message - unexpected duplicate",
|
||||
)
|
||||
|
||||
await contact_repository.update_last_contacted(contact.public_key.lower(), now)
|
||||
await contact_repository.update_last_contacted(contact.public_key.lower(), sent_at)
|
||||
|
||||
expected_ack = result.payload.get("expected_ack")
|
||||
suggested_timeout: int = result.payload.get("suggested_timeout", 10000)
|
||||
if expected_ack:
|
||||
ack_code = expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack
|
||||
track_pending_ack_fn(ack_code, message.id, suggested_timeout)
|
||||
matched_immediately = track_pending_ack_fn(ack_code, message.id, suggested_timeout) is True
|
||||
logger.debug("Tracking ACK %s for message %d", ack_code, message.id)
|
||||
if matched_immediately:
|
||||
ack_count = await increment_ack_and_broadcast(
|
||||
message_id=message.id,
|
||||
broadcast_fn=broadcast_fn,
|
||||
)
|
||||
message.acked = ack_count
|
||||
|
||||
return message
|
||||
|
||||
@@ -236,56 +338,76 @@ async def send_channel_message_to_channel(
|
||||
message_repository=MessageRepository,
|
||||
) -> Any:
|
||||
"""Send a channel message and persist/broadcast the outgoing row."""
|
||||
now: int | None = None
|
||||
sent_at: int | None = None
|
||||
sender_timestamp: int | None = None
|
||||
radio_name = ""
|
||||
our_public_key: str | None = None
|
||||
text_with_sender = text
|
||||
outgoing_message = None
|
||||
|
||||
async with radio_manager.radio_operation("send_channel_message") as mc:
|
||||
radio_name = mc.self_info.get("name", "") if mc.self_info else ""
|
||||
our_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None
|
||||
text_with_sender = f"{radio_name}: {text}" if radio_name else text
|
||||
logger.info("Sending channel message to %s: %s", channel.name, text[:50])
|
||||
try:
|
||||
async with radio_manager.radio_operation("send_channel_message") as mc:
|
||||
radio_name = mc.self_info.get("name", "") if mc.self_info else ""
|
||||
our_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None
|
||||
text_with_sender = f"{radio_name}: {text}" if radio_name else text
|
||||
logger.info("Sending channel message to %s: %s", channel.name, text[:50])
|
||||
|
||||
now = int(now_fn())
|
||||
timestamp_bytes = now.to_bytes(4, "little")
|
||||
sent_at = int(now_fn())
|
||||
sender_timestamp = await allocate_outgoing_sender_timestamp(
|
||||
message_repository=message_repository,
|
||||
msg_type="CHAN",
|
||||
conversation_key=channel_key_upper,
|
||||
text=text_with_sender,
|
||||
requested_timestamp=sent_at,
|
||||
)
|
||||
timestamp_bytes = sender_timestamp.to_bytes(4, "little")
|
||||
|
||||
result = await send_channel_message_with_effective_scope(
|
||||
mc=mc,
|
||||
channel=channel,
|
||||
channel_key=channel_key_upper,
|
||||
key_bytes=key_bytes,
|
||||
text=text,
|
||||
timestamp_bytes=timestamp_bytes,
|
||||
action_label="sending message",
|
||||
radio_manager=radio_manager,
|
||||
temp_radio_slot=temp_radio_slot,
|
||||
error_broadcast_fn=error_broadcast_fn,
|
||||
result = await send_channel_message_with_effective_scope(
|
||||
mc=mc,
|
||||
channel=channel,
|
||||
channel_key=channel_key_upper,
|
||||
key_bytes=key_bytes,
|
||||
text=text,
|
||||
timestamp_bytes=timestamp_bytes,
|
||||
action_label="sending message",
|
||||
radio_manager=radio_manager,
|
||||
temp_radio_slot=temp_radio_slot,
|
||||
error_broadcast_fn=error_broadcast_fn,
|
||||
)
|
||||
|
||||
if result.type == EventType.ERROR:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to send message: {result.payload}"
|
||||
)
|
||||
|
||||
outgoing_message = await create_outgoing_channel_message(
|
||||
conversation_key=channel_key_upper,
|
||||
text=text_with_sender,
|
||||
sender_timestamp=sender_timestamp,
|
||||
received_at=sent_at,
|
||||
sender_name=radio_name or None,
|
||||
sender_key=our_public_key,
|
||||
channel_name=channel.name,
|
||||
broadcast_fn=broadcast_fn,
|
||||
message_repository=message_repository,
|
||||
)
|
||||
if outgoing_message is None:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="Failed to store outgoing message - unexpected duplicate",
|
||||
)
|
||||
finally:
|
||||
if sender_timestamp is not None:
|
||||
await release_outgoing_sender_timestamp(
|
||||
msg_type="CHAN",
|
||||
conversation_key=channel_key_upper,
|
||||
text=text_with_sender,
|
||||
sender_timestamp=sender_timestamp,
|
||||
)
|
||||
|
||||
if result.type == EventType.ERROR:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}")
|
||||
|
||||
if now is None:
|
||||
if sent_at is None or sender_timestamp is None or outgoing_message is None:
|
||||
raise HTTPException(status_code=500, detail="Failed to store outgoing message")
|
||||
|
||||
outgoing_message = await create_outgoing_channel_message(
|
||||
conversation_key=channel_key_upper,
|
||||
text=text_with_sender,
|
||||
sender_timestamp=now,
|
||||
received_at=now,
|
||||
sender_name=radio_name or None,
|
||||
sender_key=our_public_key,
|
||||
channel_name=channel.name,
|
||||
broadcast_fn=broadcast_fn,
|
||||
message_repository=message_repository,
|
||||
)
|
||||
if outgoing_message is None:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="Failed to store outgoing message - unexpected duplicate",
|
||||
)
|
||||
|
||||
message_id = outgoing_message.id
|
||||
acked_count, paths = await message_repository.get_ack_and_paths(message_id)
|
||||
return build_message_model(
|
||||
@@ -293,8 +415,8 @@ async def send_channel_message_to_channel(
|
||||
msg_type="CHAN",
|
||||
conversation_key=channel_key_upper,
|
||||
text=text_with_sender,
|
||||
sender_timestamp=now,
|
||||
received_at=now,
|
||||
sender_timestamp=sender_timestamp,
|
||||
received_at=sent_at,
|
||||
paths=paths,
|
||||
outgoing=True,
|
||||
acked=acked_count,
|
||||
@@ -325,61 +447,82 @@ async def resend_channel_message_record(
|
||||
detail=f"Invalid channel key format: {message.conversation_key}",
|
||||
) from None
|
||||
|
||||
now: int | None = None
|
||||
if new_timestamp:
|
||||
now = int(now_fn())
|
||||
timestamp_bytes = now.to_bytes(4, "little")
|
||||
else:
|
||||
timestamp_bytes = message.sender_timestamp.to_bytes(4, "little")
|
||||
sent_at: int | None = None
|
||||
sender_timestamp = message.sender_timestamp
|
||||
timestamp_bytes = message.sender_timestamp.to_bytes(4, "little")
|
||||
|
||||
resend_public_key: str | None = None
|
||||
radio_name = ""
|
||||
new_message = None
|
||||
stored_text = message.text
|
||||
|
||||
async with radio_manager.radio_operation("resend_channel_message") as mc:
|
||||
radio_name = mc.self_info.get("name", "") if mc.self_info else ""
|
||||
resend_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None
|
||||
text_to_send = message.text
|
||||
if radio_name and text_to_send.startswith(f"{radio_name}: "):
|
||||
text_to_send = text_to_send[len(f"{radio_name}: ") :]
|
||||
try:
|
||||
async with radio_manager.radio_operation("resend_channel_message") as mc:
|
||||
radio_name = mc.self_info.get("name", "") if mc.self_info else ""
|
||||
resend_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None
|
||||
text_to_send = message.text
|
||||
if radio_name and text_to_send.startswith(f"{radio_name}: "):
|
||||
text_to_send = text_to_send[len(f"{radio_name}: ") :]
|
||||
if new_timestamp:
|
||||
sent_at = int(now_fn())
|
||||
sender_timestamp = await allocate_outgoing_sender_timestamp(
|
||||
message_repository=message_repository,
|
||||
msg_type="CHAN",
|
||||
conversation_key=message.conversation_key,
|
||||
text=stored_text,
|
||||
requested_timestamp=sent_at,
|
||||
)
|
||||
timestamp_bytes = sender_timestamp.to_bytes(4, "little")
|
||||
|
||||
result = await send_channel_message_with_effective_scope(
|
||||
mc=mc,
|
||||
channel=channel,
|
||||
channel_key=message.conversation_key,
|
||||
key_bytes=key_bytes,
|
||||
text=text_to_send,
|
||||
timestamp_bytes=timestamp_bytes,
|
||||
action_label="resending message",
|
||||
radio_manager=radio_manager,
|
||||
temp_radio_slot=temp_radio_slot,
|
||||
error_broadcast_fn=error_broadcast_fn,
|
||||
)
|
||||
if result.type == EventType.ERROR:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Failed to resend message: {result.payload}",
|
||||
result = await send_channel_message_with_effective_scope(
|
||||
mc=mc,
|
||||
channel=channel,
|
||||
channel_key=message.conversation_key,
|
||||
key_bytes=key_bytes,
|
||||
text=text_to_send,
|
||||
timestamp_bytes=timestamp_bytes,
|
||||
action_label="resending message",
|
||||
radio_manager=radio_manager,
|
||||
temp_radio_slot=temp_radio_slot,
|
||||
error_broadcast_fn=error_broadcast_fn,
|
||||
)
|
||||
if result.type == EventType.ERROR:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Failed to resend message: {result.payload}",
|
||||
)
|
||||
|
||||
if new_timestamp:
|
||||
if sent_at is None:
|
||||
raise HTTPException(status_code=500, detail="Failed to assign resend timestamp")
|
||||
new_message = await create_outgoing_channel_message(
|
||||
conversation_key=message.conversation_key,
|
||||
text=message.text,
|
||||
sender_timestamp=sender_timestamp,
|
||||
received_at=sent_at,
|
||||
sender_name=radio_name or None,
|
||||
sender_key=resend_public_key,
|
||||
channel_name=channel.name,
|
||||
broadcast_fn=broadcast_fn,
|
||||
message_repository=message_repository,
|
||||
)
|
||||
if new_message is None:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="Failed to store resent message - unexpected duplicate",
|
||||
)
|
||||
finally:
|
||||
if new_timestamp and sent_at is not None:
|
||||
await release_outgoing_sender_timestamp(
|
||||
msg_type="CHAN",
|
||||
conversation_key=message.conversation_key,
|
||||
text=stored_text,
|
||||
sender_timestamp=sender_timestamp,
|
||||
)
|
||||
|
||||
if new_timestamp:
|
||||
if now is None:
|
||||
if sent_at is None or new_message is None:
|
||||
raise HTTPException(status_code=500, detail="Failed to assign resend timestamp")
|
||||
new_message = await create_outgoing_channel_message(
|
||||
conversation_key=message.conversation_key,
|
||||
text=message.text,
|
||||
sender_timestamp=now,
|
||||
received_at=now,
|
||||
sender_name=radio_name or None,
|
||||
sender_key=resend_public_key,
|
||||
channel_name=channel.name,
|
||||
broadcast_fn=broadcast_fn,
|
||||
message_repository=message_repository,
|
||||
)
|
||||
if new_message is None:
|
||||
logger.warning(
|
||||
"Duplicate timestamp collision resending message %d — radio sent but DB row not created",
|
||||
message.id,
|
||||
)
|
||||
return {"status": "ok", "message_id": message.id}
|
||||
|
||||
logger.info(
|
||||
"Resent channel message %d as new message %d to %s",
|
||||
|
||||
@@ -387,6 +387,7 @@ class TestMessagesEndpoint:
|
||||
_patch_require_connected(mock_mc),
|
||||
patch("app.routers.messages.MessageRepository") as mock_msg_repo,
|
||||
):
|
||||
mock_msg_repo.get_by_content = AsyncMock(return_value=None)
|
||||
# Simulate duplicate - create returns None
|
||||
mock_msg_repo.create = AsyncMock(return_value=None)
|
||||
|
||||
@@ -422,6 +423,7 @@ class TestMessagesEndpoint:
|
||||
_patch_require_connected(mock_mc),
|
||||
patch("app.routers.messages.MessageRepository") as mock_msg_repo,
|
||||
):
|
||||
mock_msg_repo.get_by_content = AsyncMock(return_value=None)
|
||||
# Simulate duplicate - create returns None
|
||||
mock_msg_repo.create = AsyncMock(return_value=None)
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ import pytest
|
||||
|
||||
from app.event_handlers import (
|
||||
_active_subscriptions,
|
||||
_buffered_acks,
|
||||
_pending_acks,
|
||||
cleanup_expired_acks,
|
||||
register_event_handlers,
|
||||
@@ -26,9 +27,11 @@ from app.repository import (
|
||||
def clear_test_state():
|
||||
"""Clear pending ACKs and subscriptions before each test."""
|
||||
_pending_acks.clear()
|
||||
_buffered_acks.clear()
|
||||
_active_subscriptions.clear()
|
||||
yield
|
||||
_pending_acks.clear()
|
||||
_buffered_acks.clear()
|
||||
_active_subscriptions.clear()
|
||||
|
||||
|
||||
@@ -107,6 +110,28 @@ class TestAckTracking:
|
||||
assert len(_pending_acks) == 50
|
||||
assert all(k.startswith("valid_") for k in _pending_acks)
|
||||
|
||||
def test_track_pending_ack_consumes_buffered_ack(self):
|
||||
"""A buffered ACK should be matched immediately when the send registers later."""
|
||||
_buffered_acks["early"] = time.time()
|
||||
|
||||
matched = track_pending_ack("early", message_id=42, timeout_ms=5000)
|
||||
|
||||
assert matched is True
|
||||
assert "early" not in _buffered_acks
|
||||
assert "early" not in _pending_acks
|
||||
|
||||
def test_cleanup_removes_expired_buffered_acks(self):
|
||||
"""Buffered ACKs should expire so unmatched early ACKs do not leak forever."""
|
||||
from app.services.dm_ack_tracker import BUFFERED_ACK_TTL_SECONDS
|
||||
|
||||
_buffered_acks["stale"] = time.time() - (BUFFERED_ACK_TTL_SECONDS + 1)
|
||||
_buffered_acks["fresh"] = time.time()
|
||||
|
||||
cleanup_expired_acks()
|
||||
|
||||
assert "stale" not in _buffered_acks
|
||||
assert "fresh" in _buffered_acks
|
||||
|
||||
|
||||
class TestAckEventHandler:
|
||||
"""Test the on_ack event handler."""
|
||||
@@ -174,6 +199,7 @@ class TestAckEventHandler:
|
||||
|
||||
mock_broadcast.assert_not_called()
|
||||
assert "expected" in _pending_acks
|
||||
assert "different" in _buffered_acks
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ack_empty_code_ignored(self, test_db):
|
||||
@@ -189,6 +215,35 @@ class TestAckEventHandler:
|
||||
|
||||
mock_broadcast.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_buffered_ack_can_be_claimed_after_early_arrival(self, test_db):
|
||||
"""An ACK that arrives before registration should be recoverable."""
|
||||
from app.event_handlers import on_ack
|
||||
|
||||
msg_id = await MessageRepository.create(
|
||||
msg_type="PRIV",
|
||||
text="Hello",
|
||||
received_at=1700000000,
|
||||
conversation_key="aa" * 32,
|
||||
sender_timestamp=1700000000,
|
||||
outgoing=True,
|
||||
)
|
||||
|
||||
with patch("app.event_handlers.broadcast_event") as mock_broadcast:
|
||||
|
||||
class MockEvent:
|
||||
payload = {"code": "earlyack"}
|
||||
|
||||
await on_ack(MockEvent())
|
||||
|
||||
assert "earlyack" in _buffered_acks
|
||||
assert track_pending_ack("earlyack", message_id=msg_id, timeout_ms=10000) is True
|
||||
assert "earlyack" not in _buffered_acks
|
||||
assert "earlyack" not in _pending_acks
|
||||
ack_count, _ = await MessageRepository.get_ack_and_paths(msg_id)
|
||||
assert ack_count == 0
|
||||
mock_broadcast.assert_not_called()
|
||||
|
||||
|
||||
class TestContactMessageCLIFiltering:
|
||||
"""Test that CLI responses (txt_type=1) are filtered out."""
|
||||
|
||||
@@ -24,6 +24,7 @@ from app.routers.messages import (
|
||||
send_channel_message,
|
||||
send_direct_message,
|
||||
)
|
||||
from app.services import dm_ack_tracker
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@@ -35,6 +36,8 @@ def _reset_radio_state():
|
||||
prev_connection_info = radio_manager._connection_info
|
||||
prev_slot_by_key = radio_manager._channel_slot_by_key.copy()
|
||||
prev_key_by_slot = radio_manager._channel_key_by_slot.copy()
|
||||
prev_pending_acks = dm_ack_tracker._pending_acks.copy()
|
||||
prev_buffered_acks = dm_ack_tracker._buffered_acks.copy()
|
||||
yield
|
||||
radio_manager._meshcore = prev
|
||||
radio_manager._operation_lock = prev_lock
|
||||
@@ -42,6 +45,10 @@ def _reset_radio_state():
|
||||
radio_manager._connection_info = prev_connection_info
|
||||
radio_manager._channel_slot_by_key = prev_slot_by_key
|
||||
radio_manager._channel_key_by_slot = prev_key_by_slot
|
||||
dm_ack_tracker._pending_acks.clear()
|
||||
dm_ack_tracker._pending_acks.update(prev_pending_acks)
|
||||
dm_ack_tracker._buffered_acks.clear()
|
||||
dm_ack_tracker._buffered_acks.update(prev_buffered_acks)
|
||||
|
||||
|
||||
def _make_radio_result(payload=None):
|
||||
@@ -190,6 +197,80 @@ class TestOutgoingDMBroadcast:
|
||||
assert contact_payload["out_path_len"] == 2
|
||||
assert contact_payload["out_path_hash_mode"] == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_dm_same_second_duplicate_bumps_timestamp(self, test_db):
|
||||
mc = _make_mc()
|
||||
pub_key = "fa" * 32
|
||||
await _insert_contact(pub_key, "Alice")
|
||||
|
||||
now = int(time.time())
|
||||
original_id = await MessageRepository.create(
|
||||
msg_type="PRIV",
|
||||
text="hello",
|
||||
conversation_key=pub_key,
|
||||
sender_timestamp=now,
|
||||
received_at=now,
|
||||
outgoing=True,
|
||||
)
|
||||
assert original_id is not None
|
||||
|
||||
with (
|
||||
patch("app.routers.messages.require_connected", return_value=mc),
|
||||
patch.object(radio_manager, "_meshcore", mc),
|
||||
patch("app.routers.messages.broadcast_event"),
|
||||
patch("app.routers.messages.time") as mock_time,
|
||||
):
|
||||
mock_time.time.return_value = float(now)
|
||||
result = await send_direct_message(
|
||||
SendDirectMessageRequest(destination=pub_key, text="hello")
|
||||
)
|
||||
|
||||
assert result.id != original_id
|
||||
assert result.sender_timestamp == now + 1
|
||||
assert result.received_at == now
|
||||
assert mc.commands.send_msg.await_args.kwargs["timestamp"] == now + 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_dm_applies_buffered_ack_from_early_arrival(self, test_db):
|
||||
from app.event_handlers import on_ack
|
||||
|
||||
mc = _make_mc()
|
||||
ack_bytes = b"\xde\xad\xbe\xef"
|
||||
result = MagicMock()
|
||||
result.type = EventType.MSG_SENT
|
||||
result.payload = {
|
||||
"expected_ack": ack_bytes,
|
||||
"suggested_timeout": 8000,
|
||||
}
|
||||
mc.commands.send_msg = AsyncMock(return_value=result)
|
||||
|
||||
pub_key = "fb" * 32
|
||||
await _insert_contact(pub_key, "Alice")
|
||||
|
||||
class MockAckEvent:
|
||||
payload = {"code": "deadbeef"}
|
||||
|
||||
broadcasts = []
|
||||
|
||||
def capture_broadcast(event_type, data):
|
||||
broadcasts.append((event_type, data))
|
||||
|
||||
with (
|
||||
patch("app.event_handlers.broadcast_event", side_effect=capture_broadcast),
|
||||
patch("app.routers.messages.require_connected", return_value=mc),
|
||||
patch.object(radio_manager, "_meshcore", mc),
|
||||
patch("app.routers.messages.broadcast_event", side_effect=capture_broadcast),
|
||||
):
|
||||
await on_ack(MockAckEvent())
|
||||
message = await send_direct_message(
|
||||
SendDirectMessageRequest(destination=pub_key, text="Hello")
|
||||
)
|
||||
|
||||
ack_count, _ = await MessageRepository.get_ack_and_paths(message.id)
|
||||
assert ack_count == 1
|
||||
assert message.acked == 1
|
||||
assert any(event_type == "message_acked" for event_type, _data in broadcasts)
|
||||
|
||||
|
||||
class TestOutgoingChannelBroadcast:
|
||||
"""Test that outgoing channel messages are broadcast via broadcast_event for fanout dispatch."""
|
||||
@@ -223,6 +304,42 @@ class TestOutgoingChannelBroadcast:
|
||||
assert data["sender_name"] == "MyNode"
|
||||
assert data["channel_name"] == "#general"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_channel_same_second_duplicate_bumps_timestamp(self, test_db):
|
||||
mc = _make_mc(name="MyNode")
|
||||
chan_key = "ac" * 16
|
||||
await ChannelRepository.upsert(key=chan_key, name="#general")
|
||||
|
||||
now = int(time.time())
|
||||
original_id = await MessageRepository.create(
|
||||
msg_type="CHAN",
|
||||
text="MyNode: hello",
|
||||
conversation_key=chan_key.upper(),
|
||||
sender_timestamp=now,
|
||||
received_at=now,
|
||||
outgoing=True,
|
||||
)
|
||||
assert original_id is not None
|
||||
|
||||
with (
|
||||
patch("app.routers.messages.require_connected", return_value=mc),
|
||||
patch.object(radio_manager, "_meshcore", mc),
|
||||
patch("app.routers.messages.broadcast_event"),
|
||||
patch("app.routers.messages.time") as mock_time,
|
||||
):
|
||||
mock_time.time.return_value = float(now)
|
||||
result = await send_channel_message(
|
||||
SendChannelMessageRequest(channel_key=chan_key, text="hello")
|
||||
)
|
||||
|
||||
assert result.id != original_id
|
||||
assert result.sender_timestamp == now + 1
|
||||
assert result.received_at == now
|
||||
sent_timestamp = int.from_bytes(
|
||||
mc.commands.send_chan_msg.await_args.kwargs["timestamp"], "little"
|
||||
)
|
||||
assert sent_timestamp == now + 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_channel_msg_response_includes_current_ack_count(self, test_db):
|
||||
"""Send response reflects latest DB ack count at response time."""
|
||||
@@ -619,8 +736,8 @@ class TestResendChannelMessage:
|
||||
assert "restore failed" in mock_broadcast_error.call_args.args[0].lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_resend_new_timestamp_collision_returns_original_id(self, test_db):
|
||||
"""When new-timestamp resend collides (same second), return original ID gracefully."""
|
||||
async def test_resend_new_timestamp_collision_bumps_timestamp(self, test_db):
|
||||
"""New-timestamp resend should bump the transmit timestamp instead of reusing the row."""
|
||||
mc = _make_mc(name="MyNode")
|
||||
chan_key = "dd" * 16
|
||||
await ChannelRepository.upsert(key=chan_key, name="#collision")
|
||||
@@ -642,13 +759,19 @@ class TestResendChannelMessage:
|
||||
patch("app.routers.messages.broadcast_event"),
|
||||
patch("app.routers.messages.time") as mock_time,
|
||||
):
|
||||
# Force the same second so MessageRepository.create returns None (duplicate)
|
||||
mock_time.time.return_value = float(now)
|
||||
result = await resend_channel_message(msg_id, new_timestamp=True)
|
||||
|
||||
# Should succeed gracefully, returning the original message ID
|
||||
assert result["status"] == "ok"
|
||||
assert result["message_id"] == msg_id
|
||||
assert result["message_id"] != msg_id
|
||||
resent = await MessageRepository.get_by_id(result["message_id"])
|
||||
assert resent is not None
|
||||
assert resent.sender_timestamp == now + 1
|
||||
assert resent.received_at == now
|
||||
sent_timestamp = int.from_bytes(
|
||||
mc.commands.send_chan_msg.await_args.kwargs["timestamp"], "little"
|
||||
)
|
||||
assert sent_timestamp == now + 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_resend_non_outgoing_returns_400(self, test_db):
|
||||
|
||||
Reference in New Issue
Block a user