Files
pyMC_Repeater/tests/test_handler_helpers_acl_advert.py
Lloyd 45a44eb47b Refactor test cases and base code for consistency and readability
- Updated byte representations in tests to use lowercase hex format for consistency.
- Reformatted code for better readability, including line breaks and indentation adjustments.
- Consolidated multiple lines into single lines where appropriate to enhance clarity.
- Ensured that all test cases maintain consistent formatting and style across the test suite.
2026-05-27 20:15:10 +01:00

299 lines
9.5 KiB
Python

import time
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from repeater.handler_helpers.acl import ACL, PERM_ACL_ADMIN, PERM_ACL_GUEST
from repeater.handler_helpers.advert import AdvertHelper, MeshActivityTier
class _FakeIdentity:
def __init__(self, pubkey: bytes):
self._pubkey = pubkey
def get_public_key(self):
return self._pubkey
class _FakePacket:
def __init__(self, *, header=0x00, path=None, pkt_hash=b"\xaa" * 16):
self.header = header
self.path = path if path is not None else bytearray()
self._pkt_hash = pkt_hash
self.mark_do_not_retransmit = MagicMock()
self.drop_reason = None
def calculate_packet_hash(self):
return self._pkt_hash
def test_acl_blank_password_guest_rules_and_room_server_password_requirements():
    """Check blank-password logins and a room-server login with empty settings."""
    identity = _FakeIdentity(b"A" * 32)
    # The same blank-password attempt is replayed against two differently
    # configured ACL instances.
    blank_login = dict(
        client_identity=identity,
        shared_secret=b"secret",
        password="",
        timestamp=10,
    )

    # Read-only access allowed: the blank password is accepted as a guest.
    acl = ACL(allow_read_only=True)
    guest_ok, guest_perms = acl.authenticate_client(**blank_login)
    assert guest_ok is True
    assert guest_perms == PERM_ACL_GUEST

    # Read-only access disabled: the identical attempt is rejected.
    acl_ro_disabled = ACL(allow_read_only=False)
    denied_ok, denied_perms = acl_ro_disabled.authenticate_client(**blank_login)
    assert denied_ok is False
    assert denied_perms == 0

    # Room-server target with empty settings: the "admin" password is rejected.
    room_cfg = {"type": "room_server", "settings": {}}
    room_ok, room_perms = acl.authenticate_client(
        client_identity=identity,
        shared_secret=b"secret",
        password="admin",
        timestamp=11,
        target_identity_name="room-a",
        target_identity_config=room_cfg,
    )
    assert room_ok is False
    assert room_perms == 0
def test_acl_admin_login_sets_client_state_and_replay_protection():
    """Admin login records the client's state; reusing the timestamp is rejected."""
    identity = _FakeIdentity(b"B" * 32)
    shared_secret = b"k" * 32
    acl = ACL(max_clients=5, admin_password="top-secret", guest_password="guest")

    ok, perms = acl.authenticate_client(
        client_identity=identity,
        shared_secret=shared_secret,
        password="top-secret",
        timestamp=100,
        sync_since=77,
    )
    assert ok is True
    assert perms == PERM_ACL_ADMIN

    # Look the client up by the exact key it authenticated with. The earlier
    # literal (b"B" * 40) was longer than the identity's 32-byte key, so the
    # lookup only matched if get_client() happened to truncate its argument.
    client = acl.get_client(identity.get_public_key())
    assert client is not None
    assert client.shared_secret == shared_secret
    assert client.last_timestamp == 100
    assert client.sync_since == 77
    assert client.is_admin() is True

    # Same credentials with the same timestamp must be treated as a replay.
    replay_ok, replay_perms = acl.authenticate_client(
        client_identity=identity,
        shared_secret=shared_secret,
        password="top-secret",
        timestamp=100,
    )
    assert replay_ok is False
    assert replay_perms == 0
def test_acl_max_clients_invalid_password_and_remove_client_paths():
    """Cover the client cap, a wrong password, and removing a client twice."""
    key_a = b"C" * 32
    id_a = _FakeIdentity(key_a)
    id_b = _FakeIdentity(b"D" * 32)
    acl = ACL(max_clients=1, admin_password="a", guest_password="g")

    # The first client fills the only available slot.
    first_ok, _ = acl.authenticate_client(id_a, b"s", "a", timestamp=1)
    assert first_ok is True
    assert acl.get_num_clients() == 1

    # A second identity with a valid password is refused while the ACL is full.
    second_ok, second_perms = acl.authenticate_client(id_b, b"s", "a", timestamp=2)
    assert second_ok is False
    assert second_perms == 0

    # The known client is refused when it presents a wrong password.
    wrong_ok, wrong_perms = acl.authenticate_client(id_a, b"s", "bad", timestamp=3)
    assert wrong_ok is False
    assert wrong_perms == 0

    # Removal succeeds once and reports False when the client is already gone.
    assert acl.remove_client(key_a) is True
    assert acl.remove_client(key_a) is False
@pytest.mark.asyncio
async def test_advert_process_invalid_packet_marks_drop_and_no_storage():
    """An advert reported as invalid is flagged, given a drop reason and not stored."""
    record_advert = MagicMock()
    storage = SimpleNamespace(get_neighbors=lambda: {}, record_advert=record_advert)
    helper = AdvertHelper(local_identity=None, storage=storage, config={"repeater": {}})
    # Force the handler's verdict so only the helper's reaction is under test.
    helper.advert_handler = AsyncMock(return_value={"valid": False})
    packet = _FakePacket()

    await helper.process_advert_packet(packet, rssi=-80, snr=6.5)

    record_advert.assert_not_called()
    assert packet.drop_reason == "Invalid advert packet"
    packet.mark_do_not_retransmit.assert_called_once()
@pytest.mark.asyncio
async def test_advert_duplicate_reheard_skips_storage_and_tracks_duplicate_stat():
    """Processing the same packet twice stores it once and counts one duplicate."""
    advert = {
        "valid": True,
        "public_key": "11" * 32,
        "name": "node-1",
        "contact_type": "REPEATER",
        "latitude": 1.0,
        "longitude": 2.0,
    }
    record_advert = MagicMock()
    storage = SimpleNamespace(get_neighbors=lambda: {}, record_advert=record_advert)
    helper = AdvertHelper(local_identity=None, storage=storage, config={"repeater": {}})
    helper.advert_handler = AsyncMock(return_value=advert)
    packet = _FakePacket(pkt_hash=b"\x10" * 16)

    # Same packet object, hence the same packet hash, on both passes.
    for _ in range(2):
        await helper.process_advert_packet(packet, rssi=-70, snr=5.0)

    assert record_advert.call_count == 1
    duplicate_count = helper.get_rate_limit_stats()["stats"]["adverts_duplicate_reheard"]
    assert duplicate_count == 1
@pytest.mark.asyncio
async def test_advert_own_advert_is_ignored_after_validation():
    """A valid advert carrying the local identity's own key is not stored."""
    # One hex string feeds both the local identity and the advert payload,
    # so the two keys are guaranteed to match.
    own_key_hex = "22" * 32
    local = _FakeIdentity(bytes.fromhex(own_key_hex))
    record_advert = MagicMock()
    storage = SimpleNamespace(get_neighbors=lambda: {}, record_advert=record_advert)
    helper = AdvertHelper(local_identity=local, storage=storage, config={"repeater": {}})
    own_advert = {
        "valid": True,
        "public_key": own_key_hex,
        "name": "self-node",
        "contact_type": "REPEATER",
        "latitude": 1.0,
        "longitude": 2.0,
    }
    helper.advert_handler = AsyncMock(return_value=own_advert)

    await helper.process_advert_packet(_FakePacket(), rssi=-60, snr=8.0)

    record_advert.assert_not_called()
@pytest.mark.asyncio
async def test_advert_new_neighbor_persists_record_and_flags_new_neighbor():
    """A valid advert from an unknown key is stored and flagged as a new neighbor."""
    captured = []

    def _capture(data):
        # Keep whatever the helper hands to storage for inspection below.
        captured.append(data)

    neighbor_key_hex = "33" * 32
    advert = {
        "valid": True,
        "public_key": neighbor_key_hex,
        "name": "neighbor-a",
        "contact_type": "REPEATER",
        "latitude": 10.0,
        "longitude": 20.0,
    }
    # get_neighbors() returns an empty mapping: no neighbor is known beforehand.
    storage = SimpleNamespace(get_neighbors=lambda: {}, record_advert=_capture)
    helper = AdvertHelper(local_identity=None, storage=storage, config={"repeater": {}})
    helper.advert_handler = AsyncMock(return_value=advert)
    packet = _FakePacket(header=0x01, path=bytearray())

    await helper.process_advert_packet(packet, rssi=-75, snr=4.2)

    assert len(captured) == 1
    stored = captured[0]
    assert stored["pubkey"] == neighbor_key_hex
    assert stored["node_name"] == "neighbor-a"
    assert stored["is_new_neighbor"] is True
    assert stored["zero_hop"] is True
def test_advert_allow_advert_rate_limit_penalty_and_quiet_bypass():
    """Walk one key through allow, min-interval reject, penalty box and QUIET bypass."""
    rate_limit_cfg = {
        "enabled": True,
        "bucket_capacity": 1,
        "refill_tokens": 1,
        "refill_interval_seconds": 9999,
        "min_interval_seconds": 100,
    }
    penalty_box_cfg = {
        "enabled": True,
        "violation_threshold": 1,
        "violation_decay_seconds": 1000,
        "base_penalty_seconds": 10,
        "penalty_multiplier": 2,
        "max_penalty_seconds": 60,
    }
    cfg = {
        "repeater": {
            "advert_adaptive": {"enabled": False},
            "advert_rate_limit": rate_limit_cfg,
            "advert_penalty_box": penalty_box_cfg,
        }
    }
    helper = AdvertHelper(local_identity=None, storage=None, config=cfg)
    pubkey = "AA" * 16
    start = time.time()

    # The first advert from this key is allowed with an empty reason.
    first_ok, first_reason = helper._allow_advert(pubkey, start)
    assert first_ok is True
    assert first_reason == ""

    # One second later the advert is rejected with a min-interval reason.
    second_ok, second_reason = helper._allow_advert(pubkey, start + 1)
    assert second_ok is False
    assert "min-interval" in second_reason

    # The next attempt is rejected because the penalty box is active.
    third_ok, third_reason = helper._allow_advert(pubkey, start + 2)
    assert third_ok is False
    assert "penalty box active" in third_reason

    # QUIET tier bypass when adaptive mode is on
    helper._adaptive_enabled = True
    helper._current_tier = MeshActivityTier.QUIET
    quiet_ok, _ = helper._allow_advert(pubkey, start + 3)
    assert quiet_ok is True
def test_advert_reload_config_and_cleanup_old_state_bounds_memory():
    """reload_config() applies new settings and _cleanup_old_state() drops stale entries."""
    adaptive_cfg = {
        "enabled": True,
        "ewma_alpha": 0.2,
        "hysteresis_seconds": 10,
        "thresholds": {"normal": 2, "busy": 7, "congested": 12},
    }
    rate_limit_cfg = {
        "enabled": True,
        "bucket_capacity": 3,
        "refill_tokens": 2,
        "refill_interval_seconds": 30,
        "min_interval_seconds": 5,
    }
    penalty_box_cfg = {
        "enabled": True,
        "violation_threshold": 2,
        "violation_decay_seconds": 20,
        "base_penalty_seconds": 15,
        "penalty_multiplier": 2,
        "max_penalty_seconds": 120,
    }
    dedupe_cfg = {"ttl_seconds": 30, "max_hashes": 100}

    # Start from an empty repeater config, then swap in the full one and reload.
    helper = AdvertHelper(local_identity=None, storage=None, config={"repeater": {}})
    helper.config = {
        "repeater": {
            "advert_adaptive": adaptive_cfg,
            "advert_rate_limit": rate_limit_cfg,
            "advert_penalty_box": penalty_box_cfg,
            "advert_dedupe": dedupe_cfg,
        }
    }
    helper.reload_config()

    assert helper._ewma_alpha == 0.2
    assert helper._base_bucket_capacity == 3.0
    assert helper._advert_dedupe_ttl_seconds == 30.0

    # Seed each tracking table with an entry timestamped in the past.
    now = time.time()
    stale_bucket_seen = now - (helper._bucket_state_retention_seconds + 1)
    helper._recent_advert_hashes["old"] = now - 1
    helper._penalty_until["pk"] = now - 1
    helper._bucket_state["oldpk"] = {"last_seen": stale_bucket_seen}
    helper._violation_state["oldpk"] = {"count": 3, "last_violation": now - 9999}

    helper._cleanup_old_state(now)

    assert "old" not in helper._recent_advert_hashes
    assert "pk" not in helper._penalty_until
    assert "oldpk" not in helper._bucket_state