mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
1553 lines
59 KiB
Python
1553 lines
59 KiB
Python
"""Tests for outgoing message sending via the messages router."""
|
|
|
|
import asyncio
|
|
import time
|
|
from unittest.mock import AsyncMock, MagicMock, call, patch
|
|
|
|
import pytest
|
|
from fastapi import HTTPException
|
|
from meshcore import EventType
|
|
|
|
import app.services.message_send as message_send_service
|
|
from app.models import (
|
|
SendChannelMessageRequest,
|
|
SendDirectMessageRequest,
|
|
)
|
|
from app.radio import radio_manager
|
|
from app.repository import (
|
|
AppSettingsRepository,
|
|
ChannelRepository,
|
|
ContactRepository,
|
|
MessageRepository,
|
|
)
|
|
from app.routers.messages import (
|
|
resend_channel_message,
|
|
send_channel_message,
|
|
send_direct_message,
|
|
)
|
|
from app.services import dm_ack_tracker
|
|
from app.services.message_send import NO_RADIO_RESPONSE_AFTER_SEND_DETAIL
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _reset_radio_state():
|
|
"""Save/restore radio_manager state so tests don't leak."""
|
|
prev = radio_manager._meshcore
|
|
prev_lock = radio_manager._operation_lock
|
|
prev_max_channels = radio_manager.max_channels
|
|
prev_connection_info = radio_manager._connection_info
|
|
prev_slot_by_key = radio_manager._channel_slot_by_key.copy()
|
|
prev_key_by_slot = radio_manager._channel_key_by_slot.copy()
|
|
prev_pending_acks = dm_ack_tracker._pending_acks.copy()
|
|
prev_buffered_acks = dm_ack_tracker._buffered_acks.copy()
|
|
yield
|
|
radio_manager._meshcore = prev
|
|
radio_manager._operation_lock = prev_lock
|
|
radio_manager.max_channels = prev_max_channels
|
|
radio_manager._connection_info = prev_connection_info
|
|
radio_manager._channel_slot_by_key = prev_slot_by_key
|
|
radio_manager._channel_key_by_slot = prev_key_by_slot
|
|
dm_ack_tracker._pending_acks.clear()
|
|
dm_ack_tracker._pending_acks.update(prev_pending_acks)
|
|
dm_ack_tracker._buffered_acks.clear()
|
|
dm_ack_tracker._buffered_acks.update(prev_buffered_acks)
|
|
|
|
|
|
def _make_radio_result(payload=None):
|
|
"""Create a mock radio command result."""
|
|
result = MagicMock()
|
|
result.type = EventType.MSG_SENT
|
|
result.payload = payload or {}
|
|
return result
|
|
|
|
|
|
def _make_mc(name="TestNode"):
|
|
"""Create a mock MeshCore connection."""
|
|
mc = MagicMock()
|
|
mc.self_info = {"name": name}
|
|
mc.commands = MagicMock()
|
|
mc.commands.set_flood_scope = AsyncMock(return_value=_make_radio_result())
|
|
mc.commands.send_msg = AsyncMock(return_value=_make_radio_result())
|
|
mc.commands.send_chan_msg = AsyncMock(return_value=_make_radio_result())
|
|
mc.commands.add_contact = AsyncMock(return_value=_make_radio_result())
|
|
mc.commands.reset_path = AsyncMock(return_value=MagicMock(type=EventType.OK, payload={}))
|
|
mc.commands.set_channel = AsyncMock(return_value=_make_radio_result())
|
|
mc.get_contact_by_key_prefix = MagicMock(return_value=None)
|
|
return mc
|
|
|
|
|
|
async def _insert_contact(public_key, name="Alice", **overrides):
|
|
"""Insert a contact into the test database."""
|
|
data = {
|
|
"public_key": public_key,
|
|
"name": name,
|
|
"type": 0,
|
|
"flags": 0,
|
|
"direct_path": None,
|
|
"direct_path_len": -1,
|
|
"direct_path_hash_mode": -1,
|
|
"last_advert": None,
|
|
"lat": None,
|
|
"lon": None,
|
|
"last_seen": None,
|
|
"on_radio": False,
|
|
"last_contacted": None,
|
|
}
|
|
data.update(overrides)
|
|
await ContactRepository.upsert(data)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _disable_background_dm_retries(monkeypatch):
|
|
monkeypatch.setattr(message_send_service, "DM_SEND_MAX_ATTEMPTS", 1)
|
|
yield
|
|
|
|
|
|
class TestOutgoingDMBroadcast:
|
|
"""Test that outgoing DMs are broadcast via broadcast_event for fanout dispatch."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_dm_broadcasts_outgoing(self, test_db):
|
|
"""Sending a DM broadcasts the message with outgoing=True for fanout dispatch."""
|
|
mc = _make_mc()
|
|
pub_key = "ab" * 32
|
|
await _insert_contact(pub_key, "Alice")
|
|
|
|
broadcasts = []
|
|
|
|
def capture_broadcast(event_type, data):
|
|
broadcasts.append({"type": event_type, "data": data})
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event", side_effect=capture_broadcast),
|
|
):
|
|
request = SendDirectMessageRequest(destination=pub_key, text="!lasttime Alice")
|
|
await send_direct_message(request)
|
|
|
|
msg_broadcasts = [b for b in broadcasts if b["type"] == "message"]
|
|
assert len(msg_broadcasts) == 1
|
|
data = msg_broadcasts[0]["data"]
|
|
assert data["text"] == "!lasttime Alice"
|
|
assert data["outgoing"] is True
|
|
assert data["type"] == "PRIV"
|
|
assert data["conversation_key"] == pub_key
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_dm_ambiguous_prefix_returns_409(self, test_db):
|
|
"""Ambiguous destination prefix should fail instead of selecting a random contact."""
|
|
mc = _make_mc()
|
|
|
|
# Insert two contacts that share the prefix "abc123"
|
|
await _insert_contact("abc123" + "00" * 29, "ContactA")
|
|
await _insert_contact("abc123" + "ff" * 29, "ContactB")
|
|
|
|
with patch("app.routers.messages.require_connected", return_value=mc):
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await send_direct_message(
|
|
SendDirectMessageRequest(destination="abc123", text="Hello")
|
|
)
|
|
|
|
assert exc_info.value.status_code == 409
|
|
assert "ambiguous" in exc_info.value.detail.lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_dm_preserves_stored_direct_path_hash_mode(self, test_db):
|
|
"""Direct-message send pushes the persisted path hash mode back to the radio."""
|
|
mc = _make_mc()
|
|
pub_key = "cd" * 32
|
|
await _insert_contact(
|
|
pub_key,
|
|
"Alice",
|
|
direct_path="aa00bb00",
|
|
direct_path_len=2,
|
|
direct_path_hash_mode=1,
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
):
|
|
request = SendDirectMessageRequest(destination=pub_key, text="Hello")
|
|
await send_direct_message(request)
|
|
|
|
contact_payload = mc.commands.add_contact.call_args.args[0]
|
|
assert contact_payload["public_key"] == pub_key
|
|
assert contact_payload["out_path"] == "aa00bb00"
|
|
assert contact_payload["out_path_len"] == 2
|
|
assert contact_payload["out_path_hash_mode"] == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_dm_prefers_route_override_over_learned_path(self, test_db):
|
|
mc = _make_mc()
|
|
pub_key = "ef" * 32
|
|
await _insert_contact(
|
|
pub_key,
|
|
"Alice",
|
|
direct_path="aabb",
|
|
direct_path_len=1,
|
|
direct_path_hash_mode=0,
|
|
route_override_path="cc00dd00",
|
|
route_override_len=2,
|
|
route_override_hash_mode=1,
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
):
|
|
request = SendDirectMessageRequest(destination=pub_key, text="Hello")
|
|
await send_direct_message(request)
|
|
|
|
contact_payload = mc.commands.add_contact.call_args.args[0]
|
|
assert contact_payload["out_path"] == "cc00dd00"
|
|
assert contact_payload["out_path_len"] == 2
|
|
assert contact_payload["out_path_hash_mode"] == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_dm_same_second_duplicate_bumps_timestamp(self, test_db):
|
|
mc = _make_mc()
|
|
pub_key = "fa" * 32
|
|
await _insert_contact(pub_key, "Alice")
|
|
|
|
now = int(time.time())
|
|
original_id = await MessageRepository.create(
|
|
msg_type="PRIV",
|
|
text="hello",
|
|
conversation_key=pub_key,
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
)
|
|
assert original_id is not None
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
patch("app.routers.messages.time") as mock_time,
|
|
):
|
|
mock_time.time.return_value = float(now)
|
|
result = await send_direct_message(
|
|
SendDirectMessageRequest(destination=pub_key, text="hello")
|
|
)
|
|
|
|
assert result.id != original_id
|
|
assert result.sender_timestamp == now + 1
|
|
assert result.received_at == now
|
|
assert mc.commands.send_msg.await_args.kwargs["timestamp"] == now + 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_dm_applies_buffered_ack_from_early_arrival(self, test_db):
|
|
from app.event_handlers import on_ack
|
|
|
|
mc = _make_mc()
|
|
ack_bytes = b"\xde\xad\xbe\xef"
|
|
result = MagicMock()
|
|
result.type = EventType.MSG_SENT
|
|
result.payload = {
|
|
"expected_ack": ack_bytes,
|
|
"suggested_timeout": 8000,
|
|
}
|
|
mc.commands.send_msg = AsyncMock(return_value=result)
|
|
|
|
pub_key = "fb" * 32
|
|
await _insert_contact(pub_key, "Alice")
|
|
|
|
class MockAckEvent:
|
|
payload = {"code": "deadbeef"}
|
|
|
|
broadcasts = []
|
|
|
|
def capture_broadcast(event_type, data):
|
|
broadcasts.append((event_type, data))
|
|
|
|
with (
|
|
patch("app.event_handlers.broadcast_event", side_effect=capture_broadcast),
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event", side_effect=capture_broadcast),
|
|
):
|
|
await on_ack(MockAckEvent())
|
|
message = await send_direct_message(
|
|
SendDirectMessageRequest(destination=pub_key, text="Hello")
|
|
)
|
|
|
|
ack_count, _ = await MessageRepository.get_ack_and_paths(message.id)
|
|
assert ack_count == 1
|
|
assert message.acked == 1
|
|
assert any(event_type == "message_acked" for event_type, _data in broadcasts)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_dm_without_expected_ack_does_not_schedule_retries(self, test_db):
|
|
mc = _make_mc()
|
|
pub_key = "fb" * 32
|
|
await _insert_contact(pub_key, "Alice")
|
|
|
|
mc.commands.send_msg = AsyncMock(return_value=_make_radio_result({}))
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
patch("app.services.message_send.asyncio.create_task") as mock_create_task,
|
|
):
|
|
message = await send_direct_message(
|
|
SendDirectMessageRequest(destination=pub_key, text="Hello")
|
|
)
|
|
|
|
assert message.acked == 0
|
|
mock_create_task.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_dm_background_retries_reset_path_before_final_attempt(self, test_db):
|
|
mc = _make_mc()
|
|
pub_key = "fc" * 32
|
|
await _insert_contact(pub_key, "Alice")
|
|
|
|
mc.commands.send_msg = AsyncMock(
|
|
side_effect=[
|
|
_make_radio_result(
|
|
{"expected_ack": b"\x00\x00\x00\x01", "suggested_timeout": 8000}
|
|
),
|
|
_make_radio_result(
|
|
{"expected_ack": b"\x00\x00\x00\x02", "suggested_timeout": 7000}
|
|
),
|
|
_make_radio_result(
|
|
{"expected_ack": b"\x00\x00\x00\x03", "suggested_timeout": 6000}
|
|
),
|
|
]
|
|
)
|
|
|
|
retry_tasks = []
|
|
loop = asyncio.get_running_loop()
|
|
slept_for = []
|
|
|
|
def schedule_retry(coro):
|
|
task = loop.create_task(coro)
|
|
retry_tasks.append(task)
|
|
return task
|
|
|
|
async def no_wait(seconds):
|
|
slept_for.append(seconds)
|
|
return None
|
|
|
|
with (
|
|
patch.object(message_send_service, "DM_SEND_MAX_ATTEMPTS", 3),
|
|
patch("app.routers.messages.track_pending_ack", return_value=False),
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
patch("app.services.message_send.asyncio.create_task", side_effect=schedule_retry),
|
|
patch("app.services.message_send.asyncio.sleep", side_effect=no_wait),
|
|
):
|
|
await send_direct_message(SendDirectMessageRequest(destination=pub_key, text="Hello"))
|
|
await asyncio.gather(*retry_tasks)
|
|
|
|
assert mc.commands.send_msg.await_count == 3
|
|
assert mc.commands.add_contact.await_count == 3
|
|
assert mc.commands.send_msg.await_args_list[1].kwargs["attempt"] == 1
|
|
assert mc.commands.send_msg.await_args_list[2].kwargs["attempt"] == 2
|
|
mc.commands.reset_path.assert_awaited_once_with(pub_key)
|
|
assert slept_for == pytest.approx([9.6, 8.4])
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_dm_background_retry_stops_after_late_ack(self, test_db):
|
|
from app.event_handlers import on_ack
|
|
|
|
mc = _make_mc()
|
|
pub_key = "fd" * 32
|
|
await _insert_contact(pub_key, "Alice")
|
|
|
|
mc.commands.send_msg = AsyncMock(
|
|
return_value=_make_radio_result(
|
|
{"expected_ack": b"\xde\xad\xbe\xef", "suggested_timeout": 8000}
|
|
)
|
|
)
|
|
|
|
retry_tasks = []
|
|
sleep_gate = asyncio.Event()
|
|
loop = asyncio.get_running_loop()
|
|
|
|
def schedule_retry(coro):
|
|
task = loop.create_task(coro)
|
|
retry_tasks.append(task)
|
|
return task
|
|
|
|
async def gated_sleep(_seconds):
|
|
await sleep_gate.wait()
|
|
|
|
class MockAckEvent:
|
|
payload = {"code": "deadbeef"}
|
|
|
|
with (
|
|
patch.object(message_send_service, "DM_SEND_MAX_ATTEMPTS", 3),
|
|
patch("app.event_handlers.broadcast_event"),
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
patch("app.services.message_send.asyncio.create_task", side_effect=schedule_retry),
|
|
patch("app.services.message_send.asyncio.sleep", side_effect=gated_sleep),
|
|
):
|
|
message = await send_direct_message(
|
|
SendDirectMessageRequest(destination=pub_key, text="Hello")
|
|
)
|
|
await on_ack(MockAckEvent())
|
|
sleep_gate.set()
|
|
await asyncio.gather(*retry_tasks)
|
|
|
|
ack_count, _ = await MessageRepository.get_ack_and_paths(message.id)
|
|
assert ack_count == 1
|
|
assert mc.commands.send_msg.await_count == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_buffered_retry_ack_clears_older_dm_ack_codes(self, test_db):
|
|
from app.event_handlers import on_ack
|
|
|
|
mc = _make_mc()
|
|
pub_key = "fe" * 32
|
|
await _insert_contact(pub_key, "Alice")
|
|
|
|
mc.commands.send_msg = AsyncMock(
|
|
side_effect=[
|
|
_make_radio_result(
|
|
{"expected_ack": b"\xaa\xaa\xaa\x01", "suggested_timeout": 8000}
|
|
),
|
|
_make_radio_result(
|
|
{"expected_ack": b"\xbb\xbb\xbb\x02", "suggested_timeout": 8000}
|
|
),
|
|
]
|
|
)
|
|
|
|
retry_tasks = []
|
|
sleep_gate = asyncio.Event()
|
|
loop = asyncio.get_running_loop()
|
|
|
|
def schedule_retry(coro):
|
|
task = loop.create_task(coro)
|
|
retry_tasks.append(task)
|
|
return task
|
|
|
|
async def gated_sleep(_seconds):
|
|
await sleep_gate.wait()
|
|
|
|
class RetryAckEvent:
|
|
payload = {"code": "bbbbbb02"}
|
|
|
|
class FirstAckEvent:
|
|
payload = {"code": "aaaaaa01"}
|
|
|
|
with (
|
|
patch.object(message_send_service, "DM_SEND_MAX_ATTEMPTS", 3),
|
|
patch("app.event_handlers.broadcast_event"),
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
patch("app.services.message_send.asyncio.create_task", side_effect=schedule_retry),
|
|
patch("app.services.message_send.asyncio.sleep", side_effect=gated_sleep),
|
|
):
|
|
message = await send_direct_message(
|
|
SendDirectMessageRequest(destination=pub_key, text="Hello")
|
|
)
|
|
await on_ack(RetryAckEvent())
|
|
sleep_gate.set()
|
|
await asyncio.gather(*retry_tasks)
|
|
await on_ack(FirstAckEvent())
|
|
|
|
ack_count, _ = await MessageRepository.get_ack_and_paths(message.id)
|
|
assert ack_count == 1
|
|
|
|
|
|
class TestOutgoingChannelBroadcast:
|
|
"""Test that outgoing channel messages are broadcast via broadcast_event for fanout dispatch."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_channel_msg_broadcasts_outgoing(self, test_db):
|
|
"""Sending a channel message broadcasts with outgoing=True for fanout dispatch."""
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "aa" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#general")
|
|
|
|
broadcasts = []
|
|
|
|
def capture_broadcast(event_type, data):
|
|
broadcasts.append({"type": event_type, "data": data})
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event", side_effect=capture_broadcast),
|
|
):
|
|
request = SendChannelMessageRequest(channel_key=chan_key, text="!lasttime5 someone")
|
|
await send_channel_message(request)
|
|
|
|
msg_broadcasts = [b for b in broadcasts if b["type"] == "message"]
|
|
assert len(msg_broadcasts) == 1
|
|
data = msg_broadcasts[0]["data"]
|
|
assert data["outgoing"] is True
|
|
assert data["type"] == "CHAN"
|
|
assert data["conversation_key"] == chan_key.upper()
|
|
assert data["sender_name"] == "MyNode"
|
|
assert data["channel_name"] == "#general"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_channel_same_second_duplicate_bumps_timestamp(self, test_db):
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "ac" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#general")
|
|
|
|
now = int(time.time())
|
|
original_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text="MyNode: hello",
|
|
conversation_key=chan_key.upper(),
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
)
|
|
assert original_id is not None
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
patch("app.routers.messages.time") as mock_time,
|
|
):
|
|
mock_time.time.return_value = float(now)
|
|
result = await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key, text="hello")
|
|
)
|
|
|
|
assert result.id != original_id
|
|
assert result.sender_timestamp == now + 1
|
|
assert result.received_at == now
|
|
sent_timestamp = int.from_bytes(
|
|
mc.commands.send_chan_msg.await_args.kwargs["timestamp"], "little"
|
|
)
|
|
assert sent_timestamp == now + 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_channel_msg_response_includes_current_ack_count(self, test_db):
|
|
"""Send response reflects latest DB ack count at response time."""
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "ff" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#acked")
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
):
|
|
request = SendChannelMessageRequest(channel_key=chan_key, text="acked now")
|
|
message = await send_channel_message(request)
|
|
|
|
# Fresh message has acked=0
|
|
assert message.id is not None
|
|
assert message.acked == 0
|
|
assert message.channel_name == "#acked"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_channel_msg_includes_sender_key(self, test_db):
|
|
"""Outgoing channel message includes our public key as sender_key."""
|
|
our_pubkey = "ab" * 32
|
|
mc = _make_mc(name="MyNode")
|
|
mc.self_info["public_key"] = our_pubkey
|
|
chan_key = "ee" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#test")
|
|
|
|
broadcasts = []
|
|
|
|
def capture_broadcast(event_type, data):
|
|
broadcasts.append({"type": event_type, "data": data})
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event", side_effect=capture_broadcast),
|
|
):
|
|
request = SendChannelMessageRequest(channel_key=chan_key, text="hello")
|
|
message = await send_channel_message(request)
|
|
|
|
# Response message includes sender_key
|
|
assert message.sender_key == our_pubkey
|
|
assert message.sender_name == "MyNode"
|
|
|
|
# Broadcast also includes sender_key
|
|
msg_broadcasts = [b for b in broadcasts if b["type"] == "message"]
|
|
assert len(msg_broadcasts) == 1
|
|
assert msg_broadcasts[0]["data"]["sender_key"] == our_pubkey
|
|
|
|
# DB row also has sender_key
|
|
db_msg = await MessageRepository.get_by_id(message.id)
|
|
assert db_msg is not None
|
|
assert db_msg.sender_key == our_pubkey
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_channel_msg_uses_channel_flood_scope_override(self, test_db):
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "de" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#flightless")
|
|
await ChannelRepository.update_flood_scope_override(chan_key, "Esperance")
|
|
await AppSettingsRepository.update(flood_scope="Baseline")
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
):
|
|
request = SendChannelMessageRequest(channel_key=chan_key, text="hello")
|
|
await send_channel_message(request)
|
|
|
|
assert mc.commands.set_flood_scope.await_args_list == [
|
|
call("#Esperance"),
|
|
call("#Baseline"),
|
|
]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_channel_msg_skips_temporary_scope_when_override_matches_global(
|
|
self, test_db
|
|
):
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "df" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#matching")
|
|
await ChannelRepository.update_flood_scope_override(chan_key, "Esperance")
|
|
await AppSettingsRepository.update(flood_scope="Esperance")
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
):
|
|
request = SendChannelMessageRequest(channel_key=chan_key, text="hello")
|
|
await send_channel_message(request)
|
|
|
|
mc.commands.set_flood_scope.assert_not_awaited()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_channel_msg_aborts_when_override_apply_fails(self, test_db):
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "a1" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#flightless")
|
|
await ChannelRepository.update_flood_scope_override(chan_key, "#Esperance")
|
|
await AppSettingsRepository.update(flood_scope="#Baseline")
|
|
mc.commands.set_flood_scope = AsyncMock(
|
|
return_value=MagicMock(type=EventType.ERROR, payload="unsupported")
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
pytest.raises(HTTPException) as exc_info,
|
|
):
|
|
request = SendChannelMessageRequest(channel_key=chan_key, text="hello")
|
|
await send_channel_message(request)
|
|
|
|
assert exc_info.value.status_code == 500
|
|
assert "regional override" in exc_info.value.detail.lower()
|
|
mc.commands.set_channel.assert_not_awaited()
|
|
mc.commands.send_chan_msg.assert_not_awaited()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_channel_msg_reuses_cached_slot_for_same_channel(self, test_db):
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "b1" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#cached")
|
|
radio_manager.max_channels = 4
|
|
radio_manager._connection_info = "Serial: /dev/ttyUSB0"
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
):
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key, text="first send")
|
|
)
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key, text="second send")
|
|
)
|
|
|
|
assert mc.commands.set_channel.await_count == 1
|
|
assert mc.commands.send_chan_msg.await_count == 2
|
|
assert [call.kwargs["chan"] for call in mc.commands.send_chan_msg.await_args_list] == [0, 0]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_channel_msg_uses_lru_slot_eviction(self, test_db):
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key_a = "c1" * 16
|
|
chan_key_b = "c2" * 16
|
|
chan_key_c = "c3" * 16
|
|
await ChannelRepository.upsert(key=chan_key_a, name="#alpha")
|
|
await ChannelRepository.upsert(key=chan_key_b, name="#bravo")
|
|
await ChannelRepository.upsert(key=chan_key_c, name="#charlie")
|
|
radio_manager.max_channels = 2
|
|
radio_manager._connection_info = "Serial: /dev/ttyUSB0"
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
):
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key_a, text="to alpha")
|
|
)
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key_b, text="to bravo")
|
|
)
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key_a, text="alpha again")
|
|
)
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key_c, text="to charlie")
|
|
)
|
|
|
|
assert [call.kwargs["channel_idx"] for call in mc.commands.set_channel.await_args_list] == [
|
|
0,
|
|
1,
|
|
1,
|
|
]
|
|
assert [call.kwargs["chan"] for call in mc.commands.send_chan_msg.await_args_list] == [
|
|
0,
|
|
1,
|
|
0,
|
|
1,
|
|
]
|
|
assert radio_manager.get_cached_channel_slot(chan_key_a) == 0
|
|
assert radio_manager.get_cached_channel_slot(chan_key_b) is None
|
|
assert radio_manager.get_cached_channel_slot(chan_key_c) == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_channel_msg_tcp_always_reconfigures_slot(self, test_db):
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "d1" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#tcp")
|
|
radio_manager.max_channels = 4
|
|
radio_manager._connection_info = "TCP: 127.0.0.1:4000"
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
):
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key, text="first send")
|
|
)
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key, text="second send")
|
|
)
|
|
|
|
assert mc.commands.set_channel.await_count == 2
|
|
assert mc.commands.send_chan_msg.await_count == 2
|
|
assert radio_manager.get_cached_channel_slot(chan_key) is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_channel_msg_force_reconfigure_env_disables_reuse(self, test_db):
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "e1" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#forced")
|
|
radio_manager.max_channels = 4
|
|
radio_manager._connection_info = "Serial: /dev/ttyUSB0"
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
patch("app.radio.settings.force_channel_slot_reconfigure", True),
|
|
):
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key, text="first send")
|
|
)
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key, text="second send")
|
|
)
|
|
|
|
assert mc.commands.set_channel.await_count == 2
|
|
assert radio_manager.get_cached_channel_slot(chan_key) is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_channel_msg_error_invalidates_cached_slot(self, test_db):
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "f1" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#stale")
|
|
radio_manager.max_channels = 4
|
|
radio_manager._connection_info = "Serial: /dev/ttyUSB0"
|
|
|
|
mc.commands.send_chan_msg = AsyncMock(
|
|
return_value=MagicMock(type=EventType.ERROR, payload="bad slot")
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
pytest.raises(HTTPException) as exc_info,
|
|
):
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key, text="this will fail")
|
|
)
|
|
|
|
assert exc_info.value.status_code == 500
|
|
assert radio_manager.get_cached_channel_slot(chan_key) is None
|
|
|
|
|
|
class TestResendChannelMessage:
|
|
"""Test the user-triggered resend endpoint."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resend_within_window_succeeds(self, test_db):
|
|
"""Resend within 30-second window sends with same timestamp bytes."""
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "aa" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#resend")
|
|
|
|
now = int(time.time()) - 10 # 10 seconds ago
|
|
msg_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text="MyNode: hello",
|
|
conversation_key=chan_key.upper(),
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
)
|
|
assert msg_id is not None
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
result = await resend_channel_message(msg_id, new_timestamp=False)
|
|
|
|
assert result.status == "ok"
|
|
assert result.message_id == msg_id
|
|
|
|
# Verify radio was called with correct timestamp bytes
|
|
mc.commands.send_chan_msg.assert_awaited_once()
|
|
call_kwargs = mc.commands.send_chan_msg.await_args.kwargs
|
|
assert call_kwargs["timestamp"] == now.to_bytes(4, "little")
|
|
assert call_kwargs["msg"] == "hello" # Sender prefix stripped
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resend_outside_window_returns_400(self, test_db):
|
|
"""Resend after 30-second window fails."""
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "bb" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#old")
|
|
|
|
old_ts = int(time.time()) - 60 # 60 seconds ago
|
|
msg_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text="MyNode: old message",
|
|
conversation_key=chan_key.upper(),
|
|
sender_timestamp=old_ts,
|
|
received_at=old_ts,
|
|
outgoing=True,
|
|
)
|
|
assert msg_id is not None
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
pytest.raises(HTTPException) as exc_info,
|
|
):
|
|
await resend_channel_message(msg_id, new_timestamp=False)
|
|
|
|
assert exc_info.value.status_code == 400
|
|
assert "expired" in exc_info.value.detail.lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resend_uses_current_channel_flood_scope_override(self, test_db):
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "be" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#flightless")
|
|
await ChannelRepository.update_flood_scope_override(chan_key, "#CurrentRegion")
|
|
await AppSettingsRepository.update(flood_scope="#Baseline")
|
|
|
|
now = int(time.time()) - 10
|
|
msg_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text="MyNode: hello",
|
|
conversation_key=chan_key.upper(),
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
)
|
|
assert msg_id is not None
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
await resend_channel_message(msg_id, new_timestamp=False)
|
|
|
|
assert mc.commands.set_flood_scope.await_args_list == [
|
|
call("#CurrentRegion"),
|
|
call("#Baseline"),
|
|
]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resend_restore_failure_broadcasts_warning(self, test_db):
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "b1" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#flightless")
|
|
await ChannelRepository.update_flood_scope_override(chan_key, "#CurrentRegion")
|
|
await AppSettingsRepository.update(flood_scope="#Baseline")
|
|
|
|
now = int(time.time()) - 10
|
|
msg_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text="MyNode: hello",
|
|
conversation_key=chan_key.upper(),
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
)
|
|
assert msg_id is not None
|
|
|
|
mc.commands.set_flood_scope = AsyncMock(
|
|
side_effect=[
|
|
_make_radio_result(),
|
|
MagicMock(type=EventType.ERROR, payload="restore failed"),
|
|
]
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_error") as mock_broadcast_error,
|
|
):
|
|
result = await resend_channel_message(msg_id, new_timestamp=False)
|
|
|
|
assert result.status == "ok"
|
|
mock_broadcast_error.assert_called_once()
|
|
assert "restore failed" in mock_broadcast_error.call_args.args[0].lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resend_new_timestamp_collision_bumps_timestamp(self, test_db):
|
|
"""New-timestamp resend should bump the transmit timestamp instead of reusing the row."""
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "dd" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#collision")
|
|
|
|
now = int(time.time())
|
|
msg_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text="MyNode: duplicate",
|
|
conversation_key=chan_key.upper(),
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
)
|
|
assert msg_id is not None
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
patch("app.routers.messages.time") as mock_time,
|
|
):
|
|
mock_time.time.return_value = float(now)
|
|
result = await resend_channel_message(msg_id, new_timestamp=True)
|
|
|
|
assert result.status == "ok"
|
|
assert result.message_id != msg_id
|
|
resent = await MessageRepository.get_by_id(result.message_id)
|
|
assert resent is not None
|
|
assert result.message is not None
|
|
assert result.message.id == resent.id
|
|
assert result.message.conversation_key == resent.conversation_key
|
|
assert result.message.text == resent.text
|
|
assert result.message.sender_timestamp == resent.sender_timestamp
|
|
assert result.message.outgoing is True
|
|
assert resent.sender_timestamp == now + 1
|
|
assert resent.received_at == now
|
|
sent_timestamp = int.from_bytes(
|
|
mc.commands.send_chan_msg.await_args.kwargs["timestamp"], "little"
|
|
)
|
|
assert sent_timestamp == now + 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resend_no_radio_response_returns_504_and_creates_no_new_row(self, test_db):
|
|
"""When resend returns None, report unknown outcome and create no new message row."""
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "c1" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#resend-none")
|
|
|
|
now = int(time.time()) - 5
|
|
msg_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text="MyNode: hello",
|
|
conversation_key=chan_key.upper(),
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
)
|
|
assert msg_id is not None
|
|
|
|
mc.commands.send_chan_msg = AsyncMock(return_value=None)
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
pytest.raises(HTTPException) as exc_info,
|
|
):
|
|
await resend_channel_message(msg_id, new_timestamp=True)
|
|
|
|
assert exc_info.value.status_code == 504
|
|
assert exc_info.value.detail == NO_RADIO_RESPONSE_AFTER_SEND_DETAIL
|
|
|
|
messages = await MessageRepository.get_all(
|
|
msg_type="CHAN", conversation_key=chan_key.upper(), limit=10
|
|
)
|
|
assert len(messages) == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resend_non_outgoing_returns_400(self, test_db):
|
|
"""Resend of incoming message fails."""
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "cc" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#incoming")
|
|
|
|
now = int(time.time())
|
|
msg_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text="SomeUser: incoming",
|
|
conversation_key=chan_key.upper(),
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=False,
|
|
)
|
|
assert msg_id is not None
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
pytest.raises(HTTPException) as exc_info,
|
|
):
|
|
await resend_channel_message(msg_id, new_timestamp=False)
|
|
|
|
assert exc_info.value.status_code == 400
|
|
assert "outgoing" in exc_info.value.detail.lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resend_dm_returns_400(self, test_db):
|
|
"""Resend of DM message fails."""
|
|
mc = _make_mc(name="MyNode")
|
|
pub_key = "dd" * 32
|
|
|
|
now = int(time.time())
|
|
msg_id = await MessageRepository.create(
|
|
msg_type="PRIV",
|
|
text="hello dm",
|
|
conversation_key=pub_key,
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
)
|
|
assert msg_id is not None
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
pytest.raises(HTTPException) as exc_info,
|
|
):
|
|
await resend_channel_message(msg_id, new_timestamp=False)
|
|
|
|
assert exc_info.value.status_code == 400
|
|
assert "channel" in exc_info.value.detail.lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resend_nonexistent_returns_404(self, test_db):
|
|
"""Resend of nonexistent message fails."""
|
|
mc = _make_mc(name="MyNode")
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
pytest.raises(HTTPException) as exc_info,
|
|
):
|
|
await resend_channel_message(999999, new_timestamp=False)
|
|
|
|
assert exc_info.value.status_code == 404
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resend_strips_sender_prefix(self, test_db):
|
|
"""Resend strips the sender prefix before sending to radio."""
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "ee" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#strip")
|
|
|
|
now = int(time.time()) - 5
|
|
msg_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text="MyNode: hello world",
|
|
conversation_key=chan_key.upper(),
|
|
sender_timestamp=now,
|
|
received_at=now,
|
|
outgoing=True,
|
|
)
|
|
assert msg_id is not None
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
await resend_channel_message(msg_id, new_timestamp=False)
|
|
|
|
call_kwargs = mc.commands.send_chan_msg.await_args.kwargs
|
|
assert call_kwargs["msg"] == "hello world"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resend_new_timestamp_skips_window(self, test_db):
|
|
"""new_timestamp=True succeeds even when the 30s window has expired."""
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "dd" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#old")
|
|
|
|
old_ts = int(time.time()) - 60 # 60 seconds ago — outside byte-perfect window
|
|
msg_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text="MyNode: old message",
|
|
conversation_key=chan_key.upper(),
|
|
sender_timestamp=old_ts,
|
|
received_at=old_ts,
|
|
outgoing=True,
|
|
)
|
|
assert msg_id is not None
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
):
|
|
result = await resend_channel_message(msg_id, new_timestamp=True)
|
|
|
|
assert result.status == "ok"
|
|
# Should return a NEW message id, not the original
|
|
assert result.message_id != msg_id
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resend_new_timestamp_creates_new_message(self, test_db):
|
|
"""new_timestamp=True creates a new DB row with a different sender_timestamp."""
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "dd" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#new")
|
|
|
|
old_ts = int(time.time()) - 10
|
|
msg_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text="MyNode: test",
|
|
conversation_key=chan_key.upper(),
|
|
sender_timestamp=old_ts,
|
|
received_at=old_ts,
|
|
outgoing=True,
|
|
)
|
|
assert msg_id is not None
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
):
|
|
result = await resend_channel_message(msg_id, new_timestamp=True)
|
|
|
|
new_msg_id = result.message_id
|
|
new_msg = await MessageRepository.get_by_id(new_msg_id)
|
|
original_msg = await MessageRepository.get_by_id(msg_id)
|
|
|
|
assert new_msg is not None
|
|
assert original_msg is not None
|
|
assert new_msg.sender_timestamp != original_msg.sender_timestamp
|
|
assert new_msg.text == original_msg.text
|
|
assert new_msg.outgoing is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resend_new_timestamp_broadcasts_message(self, test_db):
|
|
"""new_timestamp=True broadcasts the new message via WebSocket."""
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "dd" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#broadcast")
|
|
|
|
old_ts = int(time.time()) - 5
|
|
msg_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text="MyNode: broadcast test",
|
|
conversation_key=chan_key.upper(),
|
|
sender_timestamp=old_ts,
|
|
received_at=old_ts,
|
|
outgoing=True,
|
|
)
|
|
assert msg_id is not None
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event") as mock_broadcast,
|
|
):
|
|
result = await resend_channel_message(msg_id, new_timestamp=True)
|
|
|
|
mock_broadcast.assert_called_once()
|
|
event_type, event_data = mock_broadcast.call_args.args
|
|
assert event_type == "message"
|
|
assert event_data["id"] == result.message_id
|
|
assert event_data["outgoing"] is True
|
|
assert event_data["channel_name"] == "#broadcast"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resend_byte_perfect_still_enforces_window(self, test_db):
|
|
"""Default (byte-perfect) resend still enforces the 30s window."""
|
|
mc = _make_mc(name="MyNode")
|
|
chan_key = "dd" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#window")
|
|
|
|
old_ts = int(time.time()) - 60
|
|
msg_id = await MessageRepository.create(
|
|
msg_type="CHAN",
|
|
text="MyNode: expired",
|
|
conversation_key=chan_key.upper(),
|
|
sender_timestamp=old_ts,
|
|
received_at=old_ts,
|
|
outgoing=True,
|
|
)
|
|
assert msg_id is not None
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
pytest.raises(HTTPException) as exc_info,
|
|
):
|
|
await resend_channel_message(msg_id, new_timestamp=False)
|
|
|
|
assert exc_info.value.status_code == 400
|
|
assert "expired" in exc_info.value.detail.lower()
|
|
|
|
|
|
class TestRadioExceptionMidSend:
|
|
"""Test that radio exceptions during send don't leave orphaned DB state."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dm_send_radio_exception_no_orphan_message(self, test_db):
|
|
"""When mc.commands.send_msg() raises, no message should be stored in DB."""
|
|
mc = _make_mc()
|
|
pub_key = "ab" * 32
|
|
await _insert_contact(pub_key, "Alice")
|
|
|
|
# Make the radio command raise (simulates serial timeout / connection drop)
|
|
mc.commands.send_msg = AsyncMock(side_effect=ConnectionError("Serial port disconnected"))
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(ConnectionError):
|
|
await send_direct_message(
|
|
SendDirectMessageRequest(destination=pub_key, text="This will fail")
|
|
)
|
|
|
|
# No message should be stored — the exception prevented reaching MessageRepository.create
|
|
messages = await MessageRepository.get_all(
|
|
msg_type="PRIV", conversation_key=pub_key, limit=10
|
|
)
|
|
assert len(messages) == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dm_send_no_radio_response_returns_504_without_storing_message(self, test_db):
|
|
"""When mc.commands.send_msg() returns None, report unknown outcome and store nothing."""
|
|
mc = _make_mc()
|
|
pub_key = "ac" * 32
|
|
await _insert_contact(pub_key, "Alice")
|
|
|
|
mc.commands.send_msg = AsyncMock(return_value=None)
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
pytest.raises(HTTPException) as exc_info,
|
|
):
|
|
await send_direct_message(
|
|
SendDirectMessageRequest(destination=pub_key, text="Did this send?")
|
|
)
|
|
|
|
assert exc_info.value.status_code == 504
|
|
assert exc_info.value.detail == NO_RADIO_RESPONSE_AFTER_SEND_DETAIL
|
|
|
|
messages = await MessageRepository.get_all(
|
|
msg_type="PRIV", conversation_key=pub_key, limit=10
|
|
)
|
|
assert len(messages) == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_channel_send_no_radio_response_returns_504_without_storing_message(
|
|
self, test_db
|
|
):
|
|
"""When mc.commands.send_chan_msg() returns None, report unknown outcome and store nothing."""
|
|
mc = _make_mc(name="TestNode")
|
|
chan_key = "ad" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#unknown-outcome")
|
|
|
|
mc.commands.send_chan_msg = AsyncMock(return_value=None)
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
pytest.raises(HTTPException) as exc_info,
|
|
):
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key, text="Did this send?")
|
|
)
|
|
|
|
assert exc_info.value.status_code == 504
|
|
assert exc_info.value.detail == NO_RADIO_RESPONSE_AFTER_SEND_DETAIL
|
|
|
|
messages = await MessageRepository.get_all(
|
|
msg_type="CHAN", conversation_key=chan_key.upper(), limit=10
|
|
)
|
|
assert len(messages) == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_channel_send_radio_exception_no_orphan_message(self, test_db):
|
|
"""When mc.commands.send_chan_msg() raises, no message should be stored in DB."""
|
|
from app.repository import ChannelRepository
|
|
|
|
mc = _make_mc(name="TestNode")
|
|
chan_key = "ab" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#test")
|
|
|
|
mc.commands.send_chan_msg = AsyncMock(
|
|
side_effect=ConnectionError("Serial port disconnected")
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(ConnectionError):
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key, text="This will fail")
|
|
)
|
|
|
|
messages = await MessageRepository.get_all(
|
|
msg_type="CHAN", conversation_key=chan_key.upper(), limit=10
|
|
)
|
|
assert len(messages) == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_channel_send_set_channel_exception_no_orphan(self, test_db):
|
|
"""When mc.commands.set_channel() raises, send is not attempted and no message stored."""
|
|
from app.repository import ChannelRepository
|
|
|
|
mc = _make_mc(name="TestNode")
|
|
chan_key = "cd" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#broken")
|
|
|
|
mc.commands.set_channel = AsyncMock(side_effect=TimeoutError("Radio not responding"))
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(TimeoutError):
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key, text="Never sent")
|
|
)
|
|
|
|
# send_chan_msg should never have been called
|
|
mc.commands.send_chan_msg.assert_not_called()
|
|
|
|
messages = await MessageRepository.get_all(
|
|
msg_type="CHAN", conversation_key=chan_key.upper(), limit=10
|
|
)
|
|
assert len(messages) == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_channel_send_set_channel_exception_invalidates_evicted_cached_slot(
|
|
self, test_db
|
|
):
|
|
"""Eviction-path configure exceptions drop the stale cached slot owner."""
|
|
from app.repository import ChannelRepository
|
|
|
|
mc = _make_mc(name="TestNode")
|
|
chan_key_a = "de" * 16
|
|
chan_key_b = "ef" * 16
|
|
await ChannelRepository.upsert(key=chan_key_a, name="#alpha")
|
|
await ChannelRepository.upsert(key=chan_key_b, name="#bravo")
|
|
|
|
radio_manager.max_channels = 1
|
|
radio_manager._connection_info = "Serial: /dev/ttyUSB0"
|
|
radio_manager.note_channel_slot_loaded(chan_key_a, 0)
|
|
|
|
mc.commands.set_channel = AsyncMock(side_effect=TimeoutError("Radio not responding"))
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(TimeoutError):
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key_b, text="Never sent")
|
|
)
|
|
|
|
assert radio_manager.get_cached_channel_slot(chan_key_a) is None
|
|
assert radio_manager.get_cached_channel_slot(chan_key_b) is None
|
|
mc.commands.send_chan_msg.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_channel_send_set_channel_error_invalidates_evicted_cached_slot(self, test_db):
|
|
"""Eviction-path configure error results also drop the stale cached slot owner."""
|
|
mc = _make_mc(name="TestNode")
|
|
chan_key_a = "fa" * 16
|
|
chan_key_b = "fb" * 16
|
|
await ChannelRepository.upsert(key=chan_key_a, name="#alpha")
|
|
await ChannelRepository.upsert(key=chan_key_b, name="#bravo")
|
|
|
|
radio_manager.max_channels = 1
|
|
radio_manager._connection_info = "Serial: /dev/ttyUSB0"
|
|
radio_manager.note_channel_slot_loaded(chan_key_a, 0)
|
|
|
|
mc.commands.set_channel = AsyncMock(
|
|
return_value=MagicMock(type=EventType.ERROR, payload="radio busy")
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
pytest.raises(HTTPException) as exc_info,
|
|
):
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key_b, text="Never sent")
|
|
)
|
|
|
|
assert exc_info.value.status_code == 500
|
|
assert radio_manager.get_cached_channel_slot(chan_key_a) is None
|
|
assert radio_manager.get_cached_channel_slot(chan_key_b) is None
|
|
mc.commands.send_chan_msg.assert_not_called()
|
|
|
|
|
|
class TestConcurrentChannelSends:
|
|
"""Test that concurrent channel sends are serialized by the radio operation lock.
|
|
|
|
The send_channel_message endpoint uses set_channel (slot 0) then send_chan_msg.
|
|
Concurrent sends must be serialized so two messages don't clobber the same
|
|
temporary radio slot.
|
|
"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_concurrent_sends_to_different_channels_both_succeed(self, test_db):
|
|
"""Two concurrent send_channel_message calls to different channels
|
|
should both succeed — the radio_operation lock serializes them."""
|
|
mc = _make_mc(name="TestNode")
|
|
chan_key_a = "aa" * 16
|
|
chan_key_b = "bb" * 16
|
|
await ChannelRepository.upsert(key=chan_key_a, name="#alpha")
|
|
await ChannelRepository.upsert(key=chan_key_b, name="#bravo")
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
):
|
|
results = await asyncio.gather(
|
|
send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key_a, text="Hello alpha")
|
|
),
|
|
send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key_b, text="Hello bravo")
|
|
),
|
|
)
|
|
|
|
# Both should have returned Message objects with distinct IDs
|
|
assert results[0].id != results[1].id
|
|
assert results[0].conversation_key == chan_key_a.upper()
|
|
assert results[1].conversation_key == chan_key_b.upper()
|
|
|
|
# set_channel should have been called twice (once per send, serialized)
|
|
assert mc.commands.set_channel.await_count == 2
|
|
|
|
# send_chan_msg should have been called twice
|
|
assert mc.commands.send_chan_msg.await_count == 2
|
|
|
|
# Both messages should be in DB
|
|
msgs_a = await MessageRepository.get_all(
|
|
msg_type="CHAN", conversation_key=chan_key_a.upper(), limit=10
|
|
)
|
|
msgs_b = await MessageRepository.get_all(
|
|
msg_type="CHAN", conversation_key=chan_key_b.upper(), limit=10
|
|
)
|
|
assert len(msgs_a) == 1
|
|
assert len(msgs_b) == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_concurrent_sends_to_same_channel_both_succeed(self, test_db):
|
|
"""Two concurrent sends to the same channel should both succeed
|
|
with distinct timestamps (serialized, no slot clobber)."""
|
|
mc = _make_mc(name="TestNode")
|
|
chan_key = "cc" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#charlie")
|
|
|
|
call_count = 0
|
|
|
|
# Mock time to return incrementing seconds so the two messages
|
|
# get distinct sender_timestamps (avoiding same-second collision).
|
|
original_time = time.time
|
|
|
|
def advancing_time():
|
|
nonlocal call_count
|
|
call_count += 1
|
|
return original_time() + call_count
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
patch("app.routers.messages.time") as mock_time,
|
|
):
|
|
mock_time.time = advancing_time
|
|
results = await asyncio.gather(
|
|
send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key, text="Message one")
|
|
),
|
|
send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key, text="Message two")
|
|
),
|
|
)
|
|
|
|
assert results[0].id != results[1].id
|
|
texts = {results[0].text, results[1].text}
|
|
assert "TestNode: Message one" in texts
|
|
assert "TestNode: Message two" in texts
|
|
|
|
msgs = await MessageRepository.get_all(
|
|
msg_type="CHAN", conversation_key=chan_key.upper(), limit=10
|
|
)
|
|
assert len(msgs) == 2
|
|
|
|
|
|
class TestChannelSendLockScope:
|
|
"""Channel send should release the radio lock before DB persistence work."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_channel_message_row_created_after_radio_lock_released(self, test_db):
|
|
mc = _make_mc(name="TestNode")
|
|
chan_key = "de" * 16
|
|
await ChannelRepository.upsert(key=chan_key, name="#lockscope")
|
|
|
|
observed_lock_states: list[bool] = []
|
|
original_create = MessageRepository.create
|
|
|
|
async def _assert_lock_then_create(*args, **kwargs):
|
|
observed_lock_states.append(bool(radio_manager._operation_lock.locked()))
|
|
return await original_create(*args, **kwargs)
|
|
|
|
with (
|
|
patch("app.routers.messages.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.messages.broadcast_event"),
|
|
patch(
|
|
"app.services.message_send.MessageRepository.create",
|
|
side_effect=_assert_lock_then_create,
|
|
),
|
|
):
|
|
await send_channel_message(
|
|
SendChannelMessageRequest(channel_key=chan_key, text="Lock scope test")
|
|
)
|
|
|
|
assert observed_lock_states == [False]
|