fix: update transport key generation to use 16-byte length and add corresponding test

This commit is contained in:
Lloyd
2026-05-27 14:27:59 +01:00
parent 8eaf24ab35
commit 62f35c4b45
2 changed files with 38 additions and 6 deletions
+5 -6
View File
@@ -1641,13 +1641,13 @@ class SQLiteHandler:
logger.error(f"Failed to get adverts count for contact_type '{contact_type}': {e}")
return 0
def generate_transport_key(self, name: str, key_length_bytes: int = 32) -> str:
def generate_transport_key(self, name: str, key_length_bytes: int = 16) -> str:
"""
Generate a transport key using the proper MeshCore key derivation.
Generate a transport key using MeshCore-compatible key derivation.
Args:
name: The key name to derive the key from
key_length_bytes: Length of the key in bytes (default: 32 bytes = 256 bits)
key_length_bytes: Fallback random key length in bytes (default: 16)
Returns:
A base64-encoded transport key derived from the name
@@ -1655,7 +1655,6 @@ class SQLiteHandler:
try:
from pymc_core.protocol.transport_keys import get_auto_key_for
# Use the proper MeshCore key derivation function
key_bytes = get_auto_key_for(name)
# Encode to base64 for safe storage and transmission
@@ -1668,9 +1667,9 @@ class SQLiteHandler:
except Exception as e:
logger.error(f"Failed to generate transport key using get_auto_key_for: {e}")
# Fallback to secure random if MeshCore function fails
# Fallback to a transport-compatible random 16-byte key if derivation fails.
try:
random_bytes = secrets.token_bytes(key_length_bytes)
random_bytes = secrets.token_bytes(16)
key = base64.b64encode(random_bytes).decode("utf-8")
logger.warning(f"Using fallback random key generation for '{name}'")
return key
+33
View File
@@ -1,4 +1,7 @@
import base64
from pathlib import Path
import sys
import types
import pytest
@@ -61,6 +64,36 @@ def test_transport_key_crud_cycle(tmp_path):
assert h.delete_transport_key(key_id) is False
def test_generate_transport_key_uses_implicit_hashtag_region(tmp_path, monkeypatch):
h = _make_handler(tmp_path)
captured = {}
fake_transport_keys = types.ModuleType("pymc_core.protocol.transport_keys")
def _fake_get_auto_key_for(name: str) -> bytes:
captured["name"] = name
return b"0123456789abcdef"
fake_transport_keys.get_auto_key_for = _fake_get_auto_key_for
fake_protocol = types.ModuleType("pymc_core.protocol")
fake_protocol.transport_keys = fake_transport_keys
fake_core = types.ModuleType("pymc_core")
fake_core.protocol = fake_protocol
monkeypatch.setitem(sys.modules, "pymc_core", fake_core)
monkeypatch.setitem(sys.modules, "pymc_core.protocol", fake_protocol)
monkeypatch.setitem(sys.modules, "pymc_core.protocol.transport_keys", fake_transport_keys)
generated = h.generate_transport_key("eu")
generated_bytes = base64.b64decode(generated)
assert captured["name"] == "eu"
assert generated_bytes == b"0123456789abcdef"
assert len(generated_bytes) == 16
def test_room_messages_and_sync_flow(tmp_path):
h = _make_handler(tmp_path)