mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
252 lines
9.4 KiB
Python
252 lines
9.4 KiB
Python
"""Tests for the channels router endpoints."""
|
|
|
|
import time
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from app.channel_constants import PUBLIC_CHANNEL_KEY, PUBLIC_CHANNEL_NAME
|
|
from app.repository import ChannelRepository, MessageRepository
|
|
|
|
|
|
class TestChannelFloodScopeOverride:
    """Exercise POST /api/channels/{key}/flood-scope-override."""

    @pytest.mark.asyncio
    async def test_sets_channel_flood_scope_override(self, test_db, client):
        """Posting "Esperance" yields "#Esperance" in the response and the DB.

        Also checks that exactly one "channel" event is broadcast.
        """
        channel_key = "AA" * 16
        await ChannelRepository.upsert(key=channel_key, name="#flightless", is_hashtag=True)

        endpoint = f"/api/channels/{channel_key}/flood-scope-override"
        payload = {"flood_scope_override": "Esperance"}
        with patch("app.routers.channels.broadcast_event") as broadcast_spy:
            resp = await client.post(endpoint, json=payload)

        # HTTP-level result.
        assert resp.status_code == 200
        body = resp.json()
        assert body["flood_scope_override"] == "#Esperance"

        # Persisted state must match what the endpoint reported.
        stored = await ChannelRepository.get_by_key(channel_key)
        assert stored is not None
        assert stored.flood_scope_override == "#Esperance"

        # The first positional argument of the broadcast is the event type.
        broadcast_spy.assert_called_once()
        positional = broadcast_spy.call_args.args
        assert positional[0] == "channel"
|
|
|
|
|
|
class TestCreateChannel:
    """Channel creation broadcast test plus flood-scope-override edge cases.

    NOTE(review): the last two tests hit the flood-scope-override endpoint
    rather than channel creation; they may belong in a different test class.
    """

    @pytest.mark.asyncio
    async def test_create_broadcasts_channel_update(self, test_db):
        """create_channel emits one "channel" event carrying the new key."""
        from app.routers.channels import CreateChannelRequest, create_channel

        with patch("app.routers.channels.broadcast_event") as broadcast_spy:
            request = CreateChannelRequest(name="#mychannel")
            created = await create_channel(request)

        broadcast_spy.assert_called_once()
        positional = broadcast_spy.call_args.args
        assert positional[0] == "channel"
        assert positional[1]["key"] == created.key

    @pytest.mark.asyncio
    async def test_existing_hash_is_not_doubled(self, test_db, client):
        """An override that already starts with '#' comes back unchanged."""
        channel_key = "CC" * 16
        await ChannelRepository.upsert(key=channel_key, name="#flightless", is_hashtag=True)

        endpoint = f"/api/channels/{channel_key}/flood-scope-override"
        resp = await client.post(endpoint, json={"flood_scope_override": "#Esperance"})

        assert resp.status_code == 200
        body = resp.json()
        assert body["flood_scope_override"] == "#Esperance"

    @pytest.mark.asyncio
    async def test_blank_override_clears_channel_flood_scope_override(self, test_db, client):
        """A whitespace-only override resets the stored value to None."""
        channel_key = "BB" * 16
        await ChannelRepository.upsert(key=channel_key, name="#flightless", is_hashtag=True)
        # Start from a non-empty override so that clearing is observable.
        await ChannelRepository.update_flood_scope_override(channel_key, "#Esperance")

        endpoint = f"/api/channels/{channel_key}/flood-scope-override"
        resp = await client.post(endpoint, json={"flood_scope_override": " "})

        assert resp.status_code == 200
        body = resp.json()
        assert body["flood_scope_override"] is None

        stored = await ChannelRepository.get_by_key(channel_key)
        assert stored is not None
        assert stored.flood_scope_override is None
|
|
|
|
|
|
class TestPublicChannelProtection:
    """Checks on how the Public channel name and key may be created or deleted."""

    @pytest.mark.asyncio
    async def test_create_public_uses_canonical_key(self, test_db):
        """Creating "Public" without a key yields the canonical key and name."""
        from app.routers.channels import CreateChannelRequest, create_channel

        request = CreateChannelRequest(name="Public")
        created = await create_channel(request)

        assert created.key == PUBLIC_CHANNEL_KEY
        assert created.name == PUBLIC_CHANNEL_NAME
        assert created.is_hashtag is False

    @pytest.mark.asyncio
    async def test_create_public_rejects_conflicting_key(self, test_db, client):
        """Creating "Public" with a different key returns 400 and stores nothing."""
        payload = {"name": "Public", "key": "AA" * 16}
        resp = await client.post("/api/channels", json=payload)

        assert resp.status_code == 400
        assert "canonical Public key" in response_detail(resp) if False else "canonical Public key" in resp.json()["detail"]
        # No channel row may exist under the rejected key.
        assert await ChannelRepository.get_by_key("AA" * 16) is None

    @pytest.mark.asyncio
    async def test_create_non_public_rejects_public_key(self, test_db, client):
        """Another channel name cannot be created with the Public key."""
        payload = {"name": "Ops", "key": PUBLIC_CHANNEL_KEY}
        resp = await client.post("/api/channels", json=payload)

        assert resp.status_code == 400
        assert PUBLIC_CHANNEL_NAME in resp.json()["detail"]

    @pytest.mark.asyncio
    async def test_delete_public_channel_is_rejected(self, test_db, client):
        """DELETE on the Public channel returns 400 and the row survives."""
        public_row = {
            "key": PUBLIC_CHANNEL_KEY,
            "name": PUBLIC_CHANNEL_NAME,
            "is_hashtag": False,
            "on_radio": False,
        }
        await ChannelRepository.upsert(**public_row)

        resp = await client.delete(f"/api/channels/{PUBLIC_CHANNEL_KEY}")

        assert resp.status_code == 400
        assert "cannot be deleted" in resp.json()["detail"]
        surviving = await ChannelRepository.get_by_key(PUBLIC_CHANNEL_KEY)
        assert surviving is not None
|
|
|
|
|
|
class TestChannelDetail:
    """Test GET /api/channels/{key}/detail."""

    # Key of the channel that _seed_channel() creates and the tests query.
    CHANNEL_KEY = "AABBCCDDAABBCCDDAABBCCDDAABBCCDD"

    async def _seed_channel(self):
        """Create a channel in the DB."""
        await ChannelRepository.upsert(
            key=self.CHANNEL_KEY,
            name="#test-channel",
            is_hashtag=True,
            on_radio=True,
        )

    async def _insert_message(
        self,
        conversation_key: str,
        text: str,
        received_at: int,
        sender_key: str | None = None,
        sender_name: str | None = None,
    ) -> int | None:
        """Insert one "CHAN" message and return MessageRepository.create's result.

        Args:
            conversation_key: Conversation (channel) key the message is filed under.
            text: Message body.
            received_at: Receive timestamp; the tests pass int(time.time()) offsets.
            sender_key: Optional sender key.
            sender_name: Optional sender display name.
        """
        return await MessageRepository.create(
            msg_type="CHAN",
            text=text,
            received_at=received_at,
            conversation_key=conversation_key,
            sender_key=sender_key,
            sender_name=sender_name,
        )

    @pytest.mark.asyncio
    async def test_detail_basic_stats(self, test_db, client):
        """Channel with messages returns correct counts."""
        await self._seed_channel()
        now = int(time.time())
        # Insert messages at different ages
        await self._insert_message(self.CHANNEL_KEY, "recent1", now - 60, "aaa", "Alice")
        await self._insert_message(self.CHANNEL_KEY, "recent2", now - 120, "bbb", "Bob")
        await self._insert_message(self.CHANNEL_KEY, "old", now - 90000, "aaa", "Alice")

        response = await client.get(f"/api/channels/{self.CHANNEL_KEY}/detail")
        assert response.status_code == 200
        data = response.json()

        assert data["channel"]["key"] == self.CHANNEL_KEY
        assert data["channel"]["name"] == "#test-channel"
        assert data["message_counts"]["all_time"] == 3
        assert data["message_counts"]["last_1h"] == 2
        assert data["unique_sender_count"] == 2
        assert data["first_message_at"] == now - 90000

    @pytest.mark.asyncio
    async def test_detail_404_unknown_key(self, test_db, client):
        """Unknown channel key returns 404."""
        response = await client.get("/api/channels/FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF/detail")
        assert response.status_code == 404

    @pytest.mark.asyncio
    async def test_detail_empty_stats(self, test_db, client):
        """Channel with no messages returns zeroed stats."""
        await self._seed_channel()

        response = await client.get(f"/api/channels/{self.CHANNEL_KEY}/detail")
        assert response.status_code == 200
        data = response.json()

        assert data["message_counts"]["all_time"] == 0
        assert data["message_counts"]["last_1h"] == 0
        assert data["unique_sender_count"] == 0
        assert data["first_message_at"] is None
        assert data["top_senders_24h"] == []

    @pytest.mark.asyncio
    async def test_detail_time_window_bucketing(self, test_db, client):
        """Messages at different ages fall into correct time buckets."""
        await self._seed_channel()
        now = int(time.time())

        # 30 min ago → last_1h, last_24h, last_48h, last_7d
        await self._insert_message(self.CHANNEL_KEY, "m1", now - 1800, "aaa")
        # 2 hours ago → last_24h, last_48h, last_7d (not last_1h)
        await self._insert_message(self.CHANNEL_KEY, "m2", now - 7200, "bbb")
        # 30 hours ago → last_48h, last_7d (not last_1h or last_24h)
        await self._insert_message(self.CHANNEL_KEY, "m3", now - 108000, "ccc")
        # 3 days ago → last_7d only
        await self._insert_message(self.CHANNEL_KEY, "m4", now - 259200, "ddd")
        # 10 days ago → all_time only
        await self._insert_message(self.CHANNEL_KEY, "m5", now - 864000, "eee")

        response = await client.get(f"/api/channels/{self.CHANNEL_KEY}/detail")
        # Check the status first so an endpoint failure is reported as such,
        # not as a KeyError on the missing "message_counts" key.
        assert response.status_code == 200
        data = response.json()
        counts = data["message_counts"]

        assert counts["last_1h"] == 1
        assert counts["last_24h"] == 2
        assert counts["last_48h"] == 3
        assert counts["last_7d"] == 4
        assert counts["all_time"] == 5

    @pytest.mark.asyncio
    async def test_detail_top_senders_ordering(self, test_db, client):
        """Top senders are ordered by message count descending."""
        await self._seed_channel()
        now = int(time.time())

        # Alice: 3 messages, Bob: 1 message
        for i in range(3):
            await self._insert_message(
                self.CHANNEL_KEY, f"alice-{i}", now - 60 * (i + 1), "aaa", "Alice"
            )
        await self._insert_message(self.CHANNEL_KEY, "bob-1", now - 300, "bbb", "Bob")

        response = await client.get(f"/api/channels/{self.CHANNEL_KEY}/detail")
        # Same guard as the sibling tests: surface HTTP errors before indexing.
        assert response.status_code == 200
        data = response.json()

        senders = data["top_senders_24h"]
        assert len(senders) == 2
        assert senders[0]["sender_name"] == "Alice"
        assert senders[0]["message_count"] == 3
        assert senders[1]["sender_name"] == "Bob"
        assert senders[1]["message_count"] == 1
|