mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-06-22 02:55:53 +02:00
199 lines
8.3 KiB
Python
199 lines
8.3 KiB
Python
"""Tests for regional flood-scope (transport code) resolution."""
|
|
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from app.path_utils import parse_packet_envelope
|
|
from app.region_resolver import compute_transport_code, resolve_region
|
|
|
|
FIXTURES_PATH = Path(__file__).parent / "fixtures" / "websocket_events.json"
|
|
with open(FIXTURES_PATH) as f:
|
|
FIXTURES = json.load(f)
|
|
|
|
|
|
def _reference_code(region_name: str, payload_type: int, payload: bytes) -> int:
|
|
"""Independent reimplementation of the firmware algorithm for cross-checking."""
|
|
key = hashlib.sha256(("#" + region_name).encode()).digest()[:16]
|
|
digest = hmac.new(key, bytes([payload_type]) + payload, hashlib.sha256).digest()
|
|
code = int.from_bytes(digest[:2], "little")
|
|
if code == 0:
|
|
return 1
|
|
if code == 0xFFFF:
|
|
return 0xFFFE
|
|
return code
|
|
|
|
|
|
class TestComputeTransportCode:
|
|
def test_matches_reference_algorithm(self):
|
|
payload = bytes.fromhex("0badcafe1234")
|
|
assert compute_transport_code("nl-gr", 0x05, payload) == _reference_code(
|
|
"nl-gr", 0x05, payload
|
|
)
|
|
|
|
def test_hashtag_prefix_is_equivalent(self):
|
|
payload = b"hello"
|
|
assert compute_transport_code("nl-gr", 0x05, payload) == compute_transport_code(
|
|
"#nl-gr", 0x05, payload
|
|
)
|
|
|
|
def test_blank_region_returns_none(self):
|
|
assert compute_transport_code("", 0x05, b"x") is None
|
|
|
|
def test_code_depends_on_payload(self):
|
|
# The code is a keyed MAC over the payload, so different payloads under the
|
|
# same region produce different codes (this is why there is no static map).
|
|
assert compute_transport_code("nl-gr", 0x05, b"a") != compute_transport_code(
|
|
"nl-gr", 0x05, b"b"
|
|
)
|
|
|
|
def test_never_returns_reserved_values(self):
|
|
for i in range(3000):
|
|
code = compute_transport_code(f"region-{i}", 0x05, bytes([i & 0xFF, (i >> 8) & 0xFF]))
|
|
assert code not in (0x0000, 0xFFFF)
|
|
|
|
|
|
class TestResolveRegion:
|
|
def test_resolves_first_matching_region(self):
|
|
payload = bytes.fromhex("c0ffee")
|
|
code = compute_transport_code("nl-gr", 0x05, payload)
|
|
assert resolve_region(0x05, payload, code, ["de-by", "nl-gr", "fr"]) == "nl-gr"
|
|
|
|
def test_no_match_returns_none(self):
|
|
payload = bytes.fromhex("c0ffee")
|
|
code = compute_transport_code("nl-gr", 0x05, payload)
|
|
assert resolve_region(0x05, payload, code, ["de-by", "fr"]) is None
|
|
|
|
def test_empty_candidates_returns_none(self):
|
|
assert resolve_region(0x05, b"x", 0x1234, []) is None
|
|
|
|
def test_blank_candidate_names_skipped(self):
|
|
payload = b"x"
|
|
code = compute_transport_code("nl-gr", 0x05, payload)
|
|
assert resolve_region(0x05, payload, code, ["", "nl-gr"]) == "nl-gr"
|
|
|
|
|
|
class TestEnvelopeTransportCodes:
|
|
def test_flood_packet_has_no_transport_codes(self):
|
|
raw = bytes.fromhex(FIXTURES["channel_message"]["raw_packet_hex"])
|
|
env = parse_packet_envelope(raw)
|
|
assert env is not None
|
|
assert env.transport_codes is None
|
|
|
|
def test_transport_routed_packet_exposes_codes(self):
|
|
# Build a TRANSPORT_FLOOD packet: header | code_1 | code_2 | path_byte | payload
|
|
code_1, code_2 = 0x9164, 0x0000
|
|
header = bytes([0x05 << 2]) # payload_type=GROUP_TEXT, route_type=TRANSPORT_FLOOD(0)
|
|
raw = (
|
|
header
|
|
+ code_1.to_bytes(2, "little")
|
|
+ code_2.to_bytes(2, "little")
|
|
+ bytes([0x00]) # path byte: 0 hops, 1-byte hash
|
|
+ b"payloadbytes"
|
|
)
|
|
env = parse_packet_envelope(raw)
|
|
assert env is not None
|
|
assert env.transport_codes == (code_1, code_2)
|
|
assert env.payload == b"payloadbytes"
|
|
|
|
|
|
def _build_transport_channel_packet(region_name: str | None, code_override: int | None = None):
|
|
"""Rebuild the channel fixture as a TRANSPORT_FLOOD packet, scoped to a region."""
|
|
raw = bytes.fromhex(FIXTURES["channel_message"]["raw_packet_hex"])
|
|
env = parse_packet_envelope(raw)
|
|
assert env is not None and env.hop_count == 0
|
|
payload = env.payload
|
|
if code_override is not None:
|
|
code_1 = code_override
|
|
else:
|
|
code_1 = compute_transport_code(region_name, 0x05, payload)
|
|
header = bytes([0x05 << 2]) # GROUP_TEXT + TRANSPORT_FLOOD
|
|
return (
|
|
header + code_1.to_bytes(2, "little") + (0).to_bytes(2, "little") + bytes([0x00]) + payload
|
|
), code_1
|
|
|
|
|
|
class TestRegionPersistedOnChannelMessage:
|
|
@pytest.mark.asyncio
|
|
async def test_known_region_stored_on_message_and_broadcast(self, test_db, captured_broadcasts):
|
|
from app.packet_processor import process_raw_packet
|
|
from app.repository import AppSettingsRepository, ChannelRepository, MessageRepository
|
|
|
|
fixture = FIXTURES["channel_message"]
|
|
await ChannelRepository.upsert(
|
|
key=fixture["channel_key_hex"].upper(), name=fixture["channel_name"], is_hashtag=True
|
|
)
|
|
await AppSettingsRepository.update(known_regions=["nl-gr"])
|
|
|
|
packet_bytes, code = _build_transport_channel_packet("nl-gr")
|
|
broadcasts, mock_broadcast = captured_broadcasts
|
|
with patch("app.packet_processor.broadcast_event", mock_broadcast):
|
|
await process_raw_packet(packet_bytes, timestamp=1700000000)
|
|
|
|
messages = await MessageRepository.get_all(
|
|
msg_type="CHAN", conversation_key=fixture["channel_key_hex"].upper(), limit=10
|
|
)
|
|
assert len(messages) == 1
|
|
assert messages[0].region == "nl-gr"
|
|
assert messages[0].transport_code == code
|
|
|
|
# WS message + raw_packet broadcasts both carry the region
|
|
msg_b = [b for b in broadcasts if b["type"] == "message"][0]
|
|
assert msg_b["data"]["region"] == "nl-gr"
|
|
assert msg_b["data"]["transport_code"] == code
|
|
raw_b = [b for b in broadcasts if b["type"] == "raw_packet"][0]
|
|
assert raw_b["data"]["region"] == "nl-gr"
|
|
assert raw_b["data"]["transport_code"] == code
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unlisted_region_keeps_code_but_no_name(self, test_db, captured_broadcasts):
|
|
from app.packet_processor import process_raw_packet
|
|
from app.repository import AppSettingsRepository, ChannelRepository, MessageRepository
|
|
|
|
fixture = FIXTURES["channel_message"]
|
|
await ChannelRepository.upsert(
|
|
key=fixture["channel_key_hex"].upper(), name=fixture["channel_name"], is_hashtag=True
|
|
)
|
|
# Region list does NOT include the scope this packet is tagged with.
|
|
await AppSettingsRepository.update(known_regions=["somewhere-else"])
|
|
|
|
packet_bytes, code = _build_transport_channel_packet("nl-gr")
|
|
_, mock_broadcast = captured_broadcasts
|
|
with patch("app.packet_processor.broadcast_event", mock_broadcast):
|
|
await process_raw_packet(packet_bytes, timestamp=1700000000)
|
|
|
|
messages = await MessageRepository.get_all(
|
|
msg_type="CHAN", conversation_key=fixture["channel_key_hex"].upper(), limit=10
|
|
)
|
|
assert len(messages) == 1
|
|
# Scoped (transport_code set) but region unknown → distinguishable from unscoped.
|
|
assert messages[0].transport_code == code
|
|
assert messages[0].region is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plain_flood_message_is_unscoped(self, test_db, captured_broadcasts):
|
|
from app.packet_processor import process_raw_packet
|
|
from app.repository import AppSettingsRepository, ChannelRepository, MessageRepository
|
|
|
|
fixture = FIXTURES["channel_message"]
|
|
await ChannelRepository.upsert(
|
|
key=fixture["channel_key_hex"].upper(), name=fixture["channel_name"], is_hashtag=True
|
|
)
|
|
await AppSettingsRepository.update(known_regions=["nl-gr"])
|
|
|
|
packet_bytes = bytes.fromhex(fixture["raw_packet_hex"]) # original FLOOD packet
|
|
_, mock_broadcast = captured_broadcasts
|
|
with patch("app.packet_processor.broadcast_event", mock_broadcast):
|
|
await process_raw_packet(packet_bytes, timestamp=1700000000)
|
|
|
|
messages = await MessageRepository.get_all(
|
|
msg_type="CHAN", conversation_key=fixture["channel_key_hex"].upper(), limit=10
|
|
)
|
|
assert len(messages) == 1
|
|
assert messages[0].transport_code is None
|
|
assert messages[0].region is None
|