Files
pyMC_Repeater/tests/test_policy_engine.py
T
2026-06-09 13:51:18 +01:00

796 lines
24 KiB
Python

from unittest.mock import patch
import yaml
from pymc_core.protocol.constants import PAYLOAD_TYPE_GRP_TXT
from pymc_core.protocol.identity import LocalIdentity
from pymc_core.protocol.packet_builder import PacketBuilder
from repeater.policy_engine import PolicyEngine
class _DummyPacket:
def __init__(
self,
payload=b"\x01\x02",
path_hashes=None,
transport_codes=None,
payload_type=None,
):
self.payload = bytearray(payload)
self.transport_codes = transport_codes or [0, 0]
self._path_hashes = path_hashes or []
self._payload_type = payload_type
def get_path_hashes_hex(self):
return self._path_hashes
def get_payload_type(self):
return self._payload_type
def test_policy_engine_disabled_allows():
engine = PolicyEngine({"enabled": False, "rules": []})
pkt = _DummyPacket()
decision = engine.evaluate(pkt, {"hop_count": 3})
assert decision.action == "allow"
assert decision.matched is False
def test_policy_engine_first_match_wins_drop():
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"rules": [
{
"id": 10,
"enabled": True,
"if": {"all": [{"field": "hop_count", "op": "greater_than", "value": 2}]},
"then": {"action": "drop"},
},
{
"id": 20,
"enabled": True,
"if": {"all": [{"field": "hop_count", "op": "greater_than", "value": 1}]},
"then": {"action": "allow"},
},
],
}
)
pkt = _DummyPacket()
decision = engine.evaluate(pkt, {"hop_count": 3})
assert decision.action == "drop"
assert decision.matched is True
assert decision.rule_id == 10
def test_policy_engine_default_action_applies_when_no_match():
engine = PolicyEngine(
{
"enabled": True,
"default_action": "drop",
"rules": [
{
"id": 1,
"enabled": True,
"if": {"all": [{"field": "route_type", "op": "equals", "value": 1}]},
"then": {"action": "allow"},
}
],
}
)
pkt = _DummyPacket()
decision = engine.evaluate(pkt, {"route_type": 2})
assert decision.action == "drop"
assert decision.matched is False
def test_policy_engine_log_only_action_is_returned_when_rule_matches():
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"rules": [
{
"id": 11,
"enabled": True,
"if": {"all": [{"field": "route_type", "op": "equals", "value": 1}]},
"then": {"action": "log_only"},
}
],
}
)
pkt = _DummyPacket()
decision = engine.evaluate(pkt, {"route_type": 1})
assert decision.matched is True
assert decision.action == "log_only"
def test_load_config_reads_sibling_policy_yaml(tmp_path):
from repeater.config import load_config
config_path = tmp_path / "config.yaml"
policy_path = tmp_path / "policy.yaml"
config_path.write_text(
yaml.safe_dump(
{
"repeater": {
"node_name": "test",
"security": {
"max_clients": 1,
"admin_password": "a",
"guest_password": "g",
"allow_read_only": False,
"jwt_secret": "x",
"jwt_expiry_minutes": 60,
},
},
"radio": {"frequency": 869618000, "bandwidth": 62500},
}
),
encoding="utf-8",
)
policy_path.write_text(
yaml.safe_dump(
{
"policy_engine": {
"enabled": True,
"default_action": "allow",
"rules": [
{
"id": 1,
"enabled": True,
"if": {
"all": [
{
"field": "hop_count",
"op": "greater_than",
"value": 4,
}
]
},
"then": {"action": "drop"},
}
],
}
}
),
encoding="utf-8",
)
with patch("repeater.config._load_or_create_identity_key", return_value=b"k" * 32):
cfg = load_config(str(config_path))
assert cfg["policy_engine"]["enabled"] is True
assert len(cfg["policy_engine"]["rules"]) == 1
assert cfg["policy_file_path"].endswith("policy.yaml")
def test_load_config_missing_policy_yaml_uses_safe_defaults(tmp_path):
from repeater.config import load_config
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"repeater": {
"node_name": "test",
"security": {
"max_clients": 1,
"admin_password": "a",
"guest_password": "g",
"allow_read_only": False,
"jwt_secret": "x",
"jwt_expiry_minutes": 60,
},
},
"radio": {"frequency": 869618000, "bandwidth": 62500},
}
),
encoding="utf-8",
)
with patch("repeater.config._load_or_create_identity_key", return_value=b"k" * 32):
cfg = load_config(str(config_path))
assert cfg["policy_engine"]["enabled"] is False
assert cfg["policy_engine"]["rules"] == []
def test_policy_engine_path_hash_intersects_group_object():
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"objects": {
"channel_hash_groups": {
"ops_channels": ["0x42", "0xAA"],
}
},
"rules": [
{
"id": 101,
"enabled": True,
"if": {
"all": [
{
"field": "path_hashes",
"op": "intersects",
"value": "@channel_hash_groups.ops_channels",
}
]
},
"then": {"action": "drop"},
}
],
}
)
pkt = _DummyPacket(path_hashes=["0x10", "0x42"])
decision = engine.evaluate(pkt, {"hop_count": 2})
assert decision.matched is True
assert decision.action == "drop"
def test_policy_engine_in_operator_for_scalar_vs_group_list():
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"objects": {"allow_modes": {"prod_modes": ["forward", "monitor"]}},
"rules": [
{
"id": 202,
"enabled": True,
"if": {
"all": [
{
"field": "mode",
"op": "in",
"value": "@allow_modes.prod_modes",
}
]
},
"then": {"action": "drop"},
}
],
}
)
pkt = _DummyPacket()
decision = engine.evaluate(pkt, {"mode": "forward"})
assert decision.matched is True
assert decision.action == "drop"
def test_policy_engine_matches_decrypted_channel_message_body_from_policy_objects():
channel_secret = (b"policy-channel-secret" + b"\x00" * 32)[:32].hex()
packet = PacketBuilder.create_group_datagram(
group_name="ops",
local_identity=LocalIdentity(),
message="hello mesh from channel",
sender_name="Alice",
channels_config=[{"name": "ops", "secret": channel_secret}],
)
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"objects": {
"channels": {
"ops": {
"secret": channel_secret,
}
}
},
"rules": [
{
"id": 303,
"enabled": True,
"if": {
"all": [
{
"field": "channel_message_body",
"op": "contains",
"value": "hello mesh",
}
]
},
"then": {"action": "drop"},
}
],
}
)
decision = engine.evaluate(packet, {"payload_type": packet.get_payload_type()})
assert decision.matched is True
assert decision.action == "drop"
def test_policy_engine_matches_decrypted_channel_sender_from_policy_objects():
channel_secret = (b"policy-channel-secret" + b"\x00" * 32)[:32].hex()
packet = PacketBuilder.create_group_datagram(
group_name="ops",
local_identity=LocalIdentity(),
message="hello mesh from channel",
sender_name="Alice",
channels_config=[{"name": "ops", "secret": channel_secret}],
)
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"objects": {
"channels": {
"ops": {
"secret": channel_secret,
}
}
},
"rules": [
{
"id": 304,
"enabled": True,
"if": {
"all": [
{
"field": "channel_sender",
"op": "equals",
"value": "Alice",
}
]
},
"then": {"action": "drop"},
}
],
}
)
decision = engine.evaluate(packet, {"payload_type": packet.get_payload_type()})
assert decision.matched is True
assert decision.action == "drop"
def test_policy_engine_path_hashes_intersects_normalized_literal_list():
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"rules": [
{
"id": 404,
"enabled": True,
"if": {
"all": [
{
"field": "path_hashes",
"op": "intersects",
"value": ["0x002A", "00AA"],
}
]
},
"then": {"action": "drop"},
}
],
}
)
pkt = _DummyPacket(path_hashes=["002A", "00AA"])
decision = engine.evaluate(pkt, {"hop_count": 2})
assert decision.matched is True
assert decision.action == "drop"
def test_policy_engine_path_hashes_do_not_match_different_byte_lengths():
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"rules": [
{
"id": 405,
"enabled": True,
"if": {
"all": [
{
"field": "path_hashes",
"op": "contains",
"value": "0x42",
}
]
},
"then": {"action": "drop"},
}
],
}
)
pkt = _DummyPacket(path_hashes=["0042"])
decision = engine.evaluate(pkt, {"hop_count": 2})
assert decision.matched is False
assert decision.action == "allow"
def test_policy_engine_path_hashes_reject_mixed_literal_byte_lengths():
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"rules": [
{
"id": 406,
"enabled": True,
"if": {
"all": [
{
"field": "path_hashes",
"op": "intersects",
"value": ["42", "0042"],
}
]
},
"then": {"action": "drop"},
}
],
}
)
pkt = _DummyPacket(path_hashes=["42"])
decision = engine.evaluate(pkt, {"hop_count": 2})
assert decision.matched is False
assert decision.action == "allow"
def test_policy_engine_channel_hash_in_group_object():
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"objects": {
"channel_hash_groups": {
"ops_channels": ["0x42", "0x99"],
}
},
"rules": [
{
"id": 407,
"enabled": True,
"if": {
"all": [
{
"field": "channel_hash",
"op": "in",
"value": "@channel_hash_groups.ops_channels",
}
]
},
"then": {"action": "drop"},
}
],
}
)
pkt = _DummyPacket(payload=b"\x42\xaa\xbb\xcc", payload_type=PAYLOAD_TYPE_GRP_TXT)
decision = engine.evaluate(pkt, {"hop_count": 2})
assert decision.matched is True
assert decision.action == "drop"
def test_policy_engine_channel_hash_rejects_oversized_literal():
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"rules": [
{
"id": 408,
"enabled": True,
"if": {
"all": [
{
"field": "channel_hash",
"op": "equals",
"value": "0x8B3387E9C5CDE8000000000000000000",
}
]
},
"then": {"action": "drop"},
}
],
}
)
pkt = _DummyPacket(payload=b"\x11\xaa\xbb\xcc", payload_type=PAYLOAD_TYPE_GRP_TXT)
decision = engine.evaluate(pkt, {"hop_count": 2})
assert decision.matched is False
assert decision.action == "allow"
def test_policy_engine_channel_hash_accepts_full_secret_literal():
public_secret = "8b3387e9c5cdea6ac9e5edbaa115cd72"
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"rules": [
{
"id": 409,
"enabled": True,
"if": {
"all": [
{
"field": "channel_hash",
"op": "equals",
"value": public_secret,
}
]
},
"then": {"action": "drop"},
}
],
}
)
# Derived hash for the secret above is 0x11.
pkt = _DummyPacket(payload=b"\x11\xaa\xbb\xcc", payload_type=PAYLOAD_TYPE_GRP_TXT)
decision = engine.evaluate(pkt, {"hop_count": 2})
assert decision.matched is True
assert decision.action == "drop"
def test_policy_engine_channel_decryptable_true_with_matching_secret():
channel_secret = (b"policy-channel-secret" + b"\x00" * 32)[:32].hex()
packet = PacketBuilder.create_group_datagram(
group_name="ops",
local_identity=LocalIdentity(),
message="decryptable test",
sender_name="Alice",
channels_config=[{"name": "ops", "secret": channel_secret}],
)
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"objects": {
"channels": {
"ops": {
"secret": channel_secret,
}
}
},
"rules": [
{
"id": 410,
"enabled": True,
"if": {
"all": [
{
"field": "channel_decryptable",
"op": "equals",
"value": True,
}
]
},
"then": {"action": "drop"},
}
],
}
)
decision = engine.evaluate(packet, {"payload_type": packet.get_payload_type()})
assert decision.matched is True
assert decision.action == "drop"
def test_policy_engine_channel_decryptable_false_with_non_matching_secret():
packet_secret = (b"packet-channel-secret" + b"\x00" * 32)[:32].hex()
wrong_secret = (b"wrong-channel-secret" + b"\x00" * 32)[:32].hex()
packet = PacketBuilder.create_group_datagram(
group_name="ops",
local_identity=LocalIdentity(),
message="undecryptable test",
sender_name="Alice",
channels_config=[{"name": "ops", "secret": packet_secret}],
)
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"objects": {
"channels": {
"ops": {
"secret": wrong_secret,
}
}
},
"rules": [
{
"id": 411,
"enabled": True,
"if": {
"all": [
{
"field": "channel_decryptable",
"op": "equals",
"value": False,
}
]
},
"then": {"action": "drop"},
}
],
}
)
decision = engine.evaluate(packet, {"payload_type": packet.get_payload_type()})
assert decision.matched is True
assert decision.action == "drop"
def test_policy_engine_channel_decryptable_accepts_channels_list_with_psk():
channel_secret = (b"policy-channel-secret" + b"\x00" * 32)[:32].hex()
packet = PacketBuilder.create_group_datagram(
group_name="ops",
local_identity=LocalIdentity(),
message="list schema decryptable test",
sender_name="Alice",
channels_config=[{"name": "ops", "secret": channel_secret}],
)
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"objects": {
"channels": [
{
"name": "ops",
"psk": channel_secret,
}
]
},
"rules": [
{
"id": 412,
"enabled": True,
"if": {
"all": [
{
"field": "channel_decryptable",
"op": "equals",
"value": True,
}
]
},
"then": {"action": "drop"},
}
],
}
)
decision = engine.evaluate(packet, {"payload_type": packet.get_payload_type()})
assert decision.matched is True
assert decision.action == "drop"
def test_policy_engine_channel_decryptable_uses_inline_channel_hash_secret_literal():
channel_secret = (b"policy-channel-secret" + b"\x00" * 32)[:32].hex()
packet = PacketBuilder.create_group_datagram(
group_name="ops",
local_identity=LocalIdentity(),
message="inline secret decryptable test",
sender_name="Alice",
channels_config=[{"name": "ops", "secret": channel_secret}],
)
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"objects": {"channels": {}},
"rules": [
{
"id": 413,
"enabled": True,
"if": {
"all": [
{
"field": "channel_hash",
"op": "equals",
"value": channel_secret,
},
{
"field": "channel_decryptable",
"op": "equals",
"value": True,
},
]
},
"then": {"action": "drop"},
}
],
}
)
decision = engine.evaluate(packet, {"payload_type": packet.get_payload_type()})
assert decision.matched is True
assert decision.action == "drop"
def test_policy_engine_channel_decryptable_uses_channel_hash_group_secret():
channel_secret = (b"policy-channel-secret" + b"\x00" * 32)[:32].hex()
packet = PacketBuilder.create_group_datagram(
group_name="ops",
local_identity=LocalIdentity(),
message="group secret decryptable test",
sender_name="Alice",
channels_config=[{"name": "ops", "secret": channel_secret}],
)
engine = PolicyEngine(
{
"enabled": True,
"default_action": "allow",
"objects": {
"channels": {},
"channel_hash_groups": {
"ops_channels": [channel_secret],
},
},
"rules": [
{
"id": 414,
"enabled": True,
"if": {
"all": [
{
"field": "channel_hash",
"op": "in",
"value": "@channel_hash_groups.ops_channels",
},
{
"field": "channel_decryptable",
"op": "equals",
"value": True,
},
]
},
"then": {"action": "drop"},
}
],
}
)
decision = engine.evaluate(packet, {"payload_type": packet.get_payload_type()})
assert decision.matched is True
assert decision.action == "drop"