Files
pyMC_Repeater/tests/test_handler_helpers_room_server.py
Lloyd 45a44eb47b Refactor test cases and base code for consistency and readability
- Updated byte representations in tests to use lowercase hex format for consistency.
- Reformatted code for better readability, including line breaks and indentation adjustments.
- Consolidated multiple lines into single lines where appropriate to enhance clarity.
- Ensured that all test cases maintain consistent formatting and style across the test suite.
2026-05-27 20:15:10 +01:00

287 lines
8.9 KiB
Python

import time
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from repeater.handler_helpers.room_server import (
MAX_UNSYNCED_POSTS,
RoomServer,
TXT_TYPE_PLAIN,
)
class _FakeIdentity:
def __init__(self, pubkey: bytes):
self._pubkey = pubkey
def get_public_key(self):
return self._pubkey
class _FakeClient:
def __init__(self, pubkey: bytes, shared_secret=b"s" * 32, out_path=b"", out_path_len=-1):
self.id = _FakeIdentity(pubkey)
self.shared_secret = shared_secret
self.out_path = bytearray(out_path)
self.out_path_len = out_path_len
self.sync_since = 0
class _FakeACL:
def __init__(self, clients=None):
self._clients = list(clients or [])
self.remove_client = MagicMock(return_value=True)
def get_all_clients(self):
return list(self._clients)
class _FakeDB:
def __init__(self):
self.insert_room_message = MagicMock(return_value=1)
self.upsert_client_sync = MagicMock()
self.get_client_sync = MagicMock(return_value=None)
self.get_unsynced_count = MagicMock(return_value=0)
self.get_all_room_clients = MagicMock(return_value=[])
self.get_unsynced_messages = MagicMock(return_value=[])
self.cleanup_old_messages = MagicMock(return_value=0)
def _make_room_server(db=None, acl=None, injector=None, max_posts=8):
return RoomServer(
room_hash=0x34,
room_name="room-alpha",
local_identity=_FakeIdentity(b"R" * 32),
sqlite_handler=db or _FakeDB(),
packet_injector=injector or AsyncMock(return_value=True),
acl=acl or _FakeACL(),
max_posts=max_posts,
)
@pytest.mark.asyncio
async def test_room_server_add_post_stores_message_and_updates_sync_state():
db = _FakeDB()
rs = _make_room_server(db=db, acl=_FakeACL([_FakeClient(b"C" * 32)]))
ok = await rs.add_post(
client_pubkey=b"A" * 32,
message_text="hello room",
sender_timestamp=111,
txt_type=TXT_TYPE_PLAIN,
)
assert ok is True
db.insert_room_message.assert_called_once()
db.upsert_client_sync.assert_called_once()
kwargs = db.upsert_client_sync.call_args.kwargs
assert kwargs["room_hash"] == "0x34"
assert kwargs["client_pubkey"] == (b"A" * 32).hex()
@pytest.mark.asyncio
async def test_room_server_add_post_truncates_and_rate_limits_client():
db = _FakeDB()
rs = _make_room_server(db=db)
client_key = b"B" * 32
long_msg = "x" * 500
first_ok = await rs.add_post(client_key, long_msg, sender_timestamp=5)
assert first_ok is True
args = db.insert_room_message.call_args.kwargs
assert len(args["message_text"]) == 160
# Force client to appear at post-per-minute limit.
rs.client_post_times[client_key.hex()] = [time.time() - 1] * 10
second_ok = await rs.add_post(client_key, "blocked", sender_timestamp=6)
assert second_ok is False
@pytest.mark.asyncio
async def test_room_server_add_post_returns_false_on_db_insert_failure():
db = _FakeDB()
db.insert_room_message.return_value = 0
rs = _make_room_server(db=db)
ok = await rs.add_post(b"D" * 32, "msg", sender_timestamp=9)
assert ok is False
def test_room_server_init_caps_max_posts_to_hard_limit():
rs = _make_room_server(max_posts=MAX_UNSYNCED_POSTS + 50)
assert rs.max_posts == MAX_UNSYNCED_POSTS
@pytest.mark.asyncio
async def test_room_server_push_post_to_client_success_direct_route_sets_path_and_ack():
db = _FakeDB()
db.get_client_sync.return_value = {"push_failures": 0}
injector = AsyncMock(return_value=True)
rs = _make_room_server(db=db, injector=injector)
rs.global_limiter = SimpleNamespace(acquire=AsyncMock(), release=MagicMock())
rs._handle_ack_received = AsyncMock()
client = _FakeClient(pubkey=b"E" * 32, out_path=b"\xaa\xbb", out_path_len=2)
post = {
"author_pubkey": (b"F" * 32).hex(),
"message_text": "payload",
"post_timestamp": 1234.5,
}
packet = SimpleNamespace(path=bytearray(), path_len=0)
with (
patch(
"repeater.handler_helpers.room_server.PacketBuilder._pack_timestamp_data",
return_value=b"pk",
),
patch(
"repeater.handler_helpers.room_server.CryptoUtils.sha256",
return_value=b"\x01\x02\x03\x04abcd",
),
patch(
"repeater.handler_helpers.room_server.PacketBuilder.create_datagram",
return_value=packet,
),
):
ok = await rs.push_post_to_client(client, post)
assert ok is True
assert bytes(packet.path) == b"\xaa\xbb"
assert packet.path_len == 2
injector.assert_awaited_once_with(packet, wait_for_ack=True)
rs._handle_ack_received.assert_awaited_once_with(
client.id.get_public_key(), post["post_timestamp"]
)
rs.global_limiter.release.assert_called_once()
@pytest.mark.asyncio
async def test_room_server_push_post_to_client_backoff_skip_and_timeout_path():
db = _FakeDB()
injector = AsyncMock(return_value=False)
rs = _make_room_server(db=db, injector=injector)
rs.global_limiter = SimpleNamespace(acquire=AsyncMock(), release=MagicMock())
rs._handle_ack_timeout = AsyncMock()
client = _FakeClient(pubkey=b"G" * 32)
post = {
"author_pubkey": (b"H" * 32).hex(),
"message_text": "payload",
"post_timestamp": 88.0,
}
# In backoff window: skip send.
db.get_client_sync.return_value = {"push_failures": 2, "updated_at": time.time()}
skip_ok = await rs.push_post_to_client(client, post)
assert skip_ok is False
injector.assert_not_awaited()
# Out of backoff and send fails -> timeout handler called.
db.get_client_sync.return_value = {"push_failures": 1, "updated_at": time.time() - 9999}
with (
patch(
"repeater.handler_helpers.room_server.PacketBuilder._pack_timestamp_data",
return_value=b"pk",
),
patch(
"repeater.handler_helpers.room_server.CryptoUtils.sha256",
return_value=b"\x01\x02\x03\x04abcd",
),
patch(
"repeater.handler_helpers.room_server.PacketBuilder.create_datagram",
return_value=SimpleNamespace(path=bytearray(), path_len=0),
),
):
fail_ok = await rs.push_post_to_client(client, post)
assert fail_ok is False
rs._handle_ack_timeout.assert_awaited_once_with(client.id.get_public_key())
@pytest.mark.asyncio
async def test_room_server_ack_helpers_and_unsynced_count_fallbacks():
db = _FakeDB()
rs = _make_room_server(db=db)
await rs._handle_ack_received(b"I" * 32, post_timestamp=123.0)
db.upsert_client_sync.assert_called()
db.get_client_sync.return_value = {"push_failures": 2}
await rs._handle_ack_timeout(b"I" * 32)
# last call should have incremented failures and cleared pending ack
timeout_kwargs = db.upsert_client_sync.call_args.kwargs
assert timeout_kwargs["push_failures"] == 3
assert timeout_kwargs["pending_ack_crc"] == 0
db.get_client_sync.side_effect = RuntimeError("db down")
assert rs.get_unsynced_count(b"I" * 32) == 0
@pytest.mark.asyncio
async def test_room_server_evict_failed_clients_and_check_ack_timeouts():
db = _FakeDB()
acl = _FakeACL([_FakeClient(b"J" * 32)])
rs = _make_room_server(db=db, acl=acl)
now = time.time()
db.get_all_room_clients.return_value = [
{
"client_pubkey": (b"J" * 32).hex(),
"push_failures": 3,
"last_activity": now,
"pending_ack_crc": 0,
"ack_timeout_time": 0,
},
{
"client_pubkey": (b"K" * 32).hex(),
"push_failures": 0,
"last_activity": now - 5000,
"pending_ack_crc": 0,
"ack_timeout_time": 0,
},
]
await rs._evict_failed_clients()
assert db.upsert_client_sync.call_count >= 2
assert acl.remove_client.call_count == 2
rs._handle_ack_timeout = AsyncMock()
db.get_all_room_clients.return_value = [
{
"client_pubkey": (b"L" * 32).hex(),
"pending_ack_crc": 123,
"ack_timeout_time": now - 1,
},
{
"client_pubkey": (b"M" * 32).hex(),
"pending_ack_crc": 0,
"ack_timeout_time": now - 1,
},
]
await rs._check_ack_timeouts()
rs._handle_ack_timeout.assert_awaited_once_with(b"L" * 32)
@pytest.mark.asyncio
async def test_room_server_start_and_stop_are_idempotent():
rs = _make_room_server()
await rs.start()
assert rs._running is True
first_task = rs._sync_task
# Second start should not replace task.
await rs.start()
assert rs._sync_task is first_task
await rs.stop()
assert rs._running is False
# Stop again should be safe.
await rs.stop()
# Ensure task is cleaned up.
if first_task:
assert first_task.cancelled() or first_task.done()