feat: enhance companion identity management and API documentation

- Added functionality to heal companion registration names with empty values.
- Improved handling of identity keys and public key derivation for companions.
- Updated API endpoints to support companion identity lookups using name, lookup_identity_key, or public_key_prefix.
- Enhanced OpenAPI documentation to clarify requirements for identity creation, updates, and deletions, including trimming whitespace from names.
This commit is contained in:
agessaman
2026-03-21 17:24:14 -07:00
parent ddaa62fa9e
commit 2e25467c5d
4 changed files with 574 additions and 60 deletions

View File

@@ -0,0 +1,190 @@
"""Resolve companion config rows by registration name, identity key, or public key prefix."""
from __future__ import annotations
import logging
from typing import Any, List, Optional, Set, Tuple
from repeater.companion.utils import normalize_companion_identity_key
logger = logging.getLogger(__name__)
# Minimum hex chars for identity_key / public_key prefix disambiguation (4 bytes)
_MIN_PREFIX_HEX_LEN = 8
def _companion_registration_name(entry: dict) -> str:
n = entry.get("name")
if n is None:
return ""
return str(n).strip()
def identity_key_bytes_from_config(identity_key: Any) -> Optional[bytes]:
"""Parse companion identity_key from YAML (str hex or raw bytes)."""
if identity_key is None:
return None
if isinstance(identity_key, (bytes, bytearray, memoryview)):
raw = bytes(identity_key)
return raw if len(raw) in (32, 64) else None
if isinstance(identity_key, str):
try:
raw = bytes.fromhex(normalize_companion_identity_key(identity_key))
except ValueError:
return None
return raw if len(raw) in (32, 64) else None
return None
def identity_key_hex_normalized(identity_key: Any) -> Optional[str]:
"""Lowercase hex string of the raw key bytes (64 or 128 chars), or None."""
raw = identity_key_bytes_from_config(identity_key)
if raw is None:
return None
return raw.hex().lower()
def derive_companion_public_key_hex(identity_key: Any) -> Optional[str]:
"""Return ed25519 public key hex for a companion seed, or None if invalid."""
raw = identity_key_bytes_from_config(identity_key)
if raw is None:
return None
try:
from pymc_core import LocalIdentity
identity = LocalIdentity(seed=raw)
return identity.get_public_key().hex()
except Exception as e:
logger.debug("derive_companion_public_key_hex failed: %s", e)
return None
def suggest_companion_name_from_pubkey(pubkey_hex: str, prefix_len: int = 8) -> str:
"""Stable default registration name: companion_<first prefix_len hex chars of pubkey>."""
p = pubkey_hex.strip().lower()
if p.startswith("0x"):
p = p[2:]
if len(p) < prefix_len:
prefix = p
else:
prefix = p[:prefix_len]
return f"companion_{prefix}"
def unique_suggested_name(
pubkey_hex: str,
existing_names: set,
prefix_len: int = 8,
) -> str:
"""Like suggest_companion_name_from_pubkey but appends -2, -3, ... if name collides."""
base = suggest_companion_name_from_pubkey(pubkey_hex, prefix_len=prefix_len)
if base not in existing_names:
return base
n = 2
while f"{base}-{n}" in existing_names:
n += 1
return f"{base}-{n}"
def find_companion_index(
companions: List[dict],
*,
name: Optional[str] = None,
identity_key: Optional[str] = None,
public_key_prefix: Optional[str] = None,
) -> Tuple[Optional[int], Optional[str]]:
"""
Find a single companion list index.
Lookup priority when multiple fields are set:
1) name (non-empty after strip)
2) identity_key (full hex or unique prefix)
3) public_key_prefix (unique prefix of derived public key hex)
Returns (index, None) on success, or (None, error_message) on failure.
"""
name_s = str(name).strip() if name is not None else ""
idk = str(identity_key).strip() if identity_key is not None else ""
pkp = str(public_key_prefix).strip() if public_key_prefix is not None else ""
if pkp.lower().startswith("0x"):
pkp = pkp[2:].strip()
pkp = pkp.lower()
if idk:
idk = normalize_companion_identity_key(idk).lower()
if name_s:
matches = [i for i, c in enumerate(companions) if _companion_registration_name(c) == name_s]
if len(matches) == 1:
return matches[0], None
if len(matches) == 0:
return None, f"Companion '{name_s}' not found"
return None, f"Multiple companions named '{name_s}'"
if idk:
if len(idk) < _MIN_PREFIX_HEX_LEN:
return None, (
f"identity_key lookup must be at least {_MIN_PREFIX_HEX_LEN} hex characters"
)
exact: List[int] = []
prefix_matches: List[int] = []
for i, c in enumerate(companions):
h = identity_key_hex_normalized(c.get("identity_key"))
if not h:
continue
if h == idk:
exact.append(i)
elif h.startswith(idk):
prefix_matches.append(i)
if len(exact) == 1:
return exact[0], None
if len(exact) > 1:
return None, "Multiple companions match identity_key (ambiguous)"
if len(prefix_matches) == 1:
return prefix_matches[0], None
if len(prefix_matches) == 0:
return None, "No companion matches identity_key"
return None, "Multiple companions match identity_key prefix (ambiguous)"
if pkp:
if len(pkp) < _MIN_PREFIX_HEX_LEN:
return None, (
f"public_key_prefix must be at least {_MIN_PREFIX_HEX_LEN} hex characters"
)
matches: List[int] = []
for i, c in enumerate(companions):
pub = derive_companion_public_key_hex(c.get("identity_key"))
if pub and pub.lower().startswith(pkp):
matches.append(i)
if len(matches) == 1:
return matches[0], None
if len(matches) == 0:
return None, "No companion matches public_key_prefix"
return None, "Multiple companions match public_key_prefix (ambiguous)"
return None, "Missing companion lookup: provide name, identity_key, or public_key_prefix"
def heal_companion_empty_names(companions: List[dict]) -> bool:
"""
Assign companion_<pubkeyPrefix> names to entries with missing/blank registration names.
Mutates companions in place. Returns True if any entry was updated.
"""
names_in_use: Set[str] = set()
for c in companions:
n = _companion_registration_name(c)
if n:
names_in_use.add(n)
changed = False
for entry in companions:
if _companion_registration_name(entry):
continue
pk = derive_companion_public_key_hex(entry.get("identity_key"))
if not pk:
logger.warning("Skipping companion name heal: invalid or missing identity_key")
continue
new_name = unique_suggested_name(pk, names_in_use)
entry["name"] = new_name
names_in_use.add(new_name)
changed = True
return changed

View File

@@ -10,6 +10,11 @@ import cherrypy
from pymc_core.protocol import CryptoUtils
from repeater import __version__
from repeater.companion.identity_resolve import (
derive_companion_public_key_hex,
find_companion_index,
heal_companion_empty_names,
)
from repeater.config import update_global_flood_policy
from .auth.middleware import require_auth
@@ -2257,6 +2262,17 @@ class APIEndpoints:
identities_config = self.config.get("identities", {})
room_servers = identities_config.get("room_servers") or []
companions_cfg = identities_config.get("companions") or []
if heal_companion_empty_names(companions_cfg):
self.config.setdefault("identities", {})["companions"] = companions_cfg
if self.config_manager:
if self.config_manager.save_to_file():
logger.info(
"Healed companion registration name(s): empty name -> companion_<pubkeyPrefix>"
)
else:
logger.warning("Failed to save config after healing companion name(s)")
# Enhance with config data (room servers)
configured = []
for room_config in room_servers:
@@ -2289,7 +2305,11 @@ class APIEndpoints:
configured_companions = []
for comp_config in companions:
name = comp_config.get("name")
identity_key = comp_config.get("identity_key", "")
raw_ik = comp_config.get("identity_key", "")
if isinstance(raw_ik, bytes):
ik_hex = raw_ik.hex()
else:
ik_hex = str(raw_ik)
settings = comp_config.get("settings", {})
matching = next(
@@ -2301,17 +2321,23 @@ class APIEndpoints:
None,
)
pk_display = None
if matching:
pk_display = matching.get("public_key")
else:
pk_display = derive_companion_public_key_hex(comp_config.get("identity_key"))
configured_companions.append(
{
"name": name,
"type": "companion",
"identity_key": (
identity_key[:16] + "..." if len(identity_key) > 16 else identity_key
ik_hex[:16] + "..." if len(ik_hex) > 16 else ik_hex
),
"identity_key_length": len(identity_key),
"identity_key_length": len(ik_hex),
"settings": settings,
"hash": matching["hash"] if matching else None,
"public_key": matching.get("public_key") if matching else None,
"public_key": pk_display,
"registered": matching is not None,
}
)
@@ -2412,7 +2438,8 @@ class APIEndpoints:
self._require_post()
data = cherrypy.request.json or {}
name = data.get("name")
raw_name = data.get("name")
name = str(raw_name).strip() if raw_name is not None else ""
identity_key = data.get("identity_key")
identity_type = data.get("type", "room_server")
settings = data.get("settings", {})
@@ -2463,7 +2490,7 @@ class APIEndpoints:
return self._error("Companion identity_key must be a valid hex string")
companions = identities_config.get("companions") or []
if any(c.get("name") == name for c in companions):
if any(str(c.get("name") or "").strip() == name for c in companions):
return self._error(f"Companion with name '{name}' already exists")
comp_settings = {
@@ -2484,7 +2511,7 @@ class APIEndpoints:
else:
# Room server
room_servers = identities_config.get("room_servers") or []
if any(r.get("name") == name for r in room_servers):
if any(str(r.get("name") or "").strip() == name for r in room_servers):
return self._error(f"Identity with name '{name}' already exists")
new_identity = {
@@ -2625,8 +2652,9 @@ class APIEndpoints:
data = cherrypy.request.json or {}
name = data.get("name")
if not name:
return self._error("Missing required field: name")
name_s = str(name).strip() if name is not None else ""
lookup_identity_key = data.get("lookup_identity_key")
public_key_prefix = data.get("public_key_prefix")
identity_type = data.get("type", "room_server")
if identity_type not in ["room_server", "companion"]:
@@ -2638,17 +2666,26 @@ class APIEndpoints:
if identity_type == "companion":
companions = identities_config.get("companions") or []
identity_index = next(
(i for i, c in enumerate(companions) if c.get("name") == name), None
)
if identity_index is None:
return self._error(f"Companion '{name}' not found")
if name_s:
identity_index, err = find_companion_index(companions, name=name_s)
else:
identity_index, err = find_companion_index(
companions,
identity_key=lookup_identity_key,
public_key_prefix=public_key_prefix,
)
if err:
return self._error(err)
identity = companions[identity_index]
resolved_name = str(identity.get("name") or "").strip()
if "new_name" in data:
new_name = data["new_name"]
new_name = str(new_name).strip() if new_name is not None else ""
if not new_name:
return self._error("new_name cannot be empty")
if any(
c.get("name") == new_name
str(c.get("name") or "").strip() == new_name
for i, c in enumerate(companions)
if i != identity_index
):
@@ -2662,7 +2699,9 @@ class APIEndpoints:
key_bytes = bytes.fromhex(new_key)
if len(key_bytes) in (32, 64):
identity["identity_key"] = new_key
logger.info(f"Updated identity_key for companion '{name}'")
logger.info(
f"Updated identity_key for companion '{resolved_name}'"
)
except ValueError:
pass
@@ -2679,27 +2718,41 @@ class APIEndpoints:
saved = self.config_manager.save_to_file()
if not saved:
return self._error("Failed to save configuration to file")
logger.info(f"Updated companion: {name}")
message = f"Companion '{name}' updated successfully. Restart required to apply changes."
logger.info(f"Updated companion: {resolved_name}")
message = (
f"Companion '{resolved_name}' updated successfully. "
"Restart required to apply changes."
)
return self._success(identity, message=message)
# Room server path
if not name_s:
return self._error("Missing required field: name")
room_servers = identities_config.get("room_servers") or []
identity_index = next(
(i for i, r in enumerate(room_servers) if r.get("name") == name), None
(
i
for i, r in enumerate(room_servers)
if str(r.get("name") or "").strip() == name_s
),
None,
)
if identity_index is None:
return self._error(f"Identity '{name}' not found")
return self._error(f"Identity '{name_s}' not found")
# Update fields
identity = room_servers[identity_index]
if "new_name" in data:
new_name = data["new_name"]
new_name = str(new_name).strip() if new_name is not None else ""
if not new_name:
return self._error("new_name cannot be empty")
# Check if new name conflicts
if any(
r.get("name") == new_name
str(r.get("name") or "").strip() == new_name
for i, r in enumerate(room_servers)
if i != identity_index
):
@@ -2716,7 +2769,7 @@ class APIEndpoints:
# Validate it's proper hex
bytes.fromhex(new_key)
identity["identity_key"] = new_key
logger.info(f"Updated identity_key for '{name}'")
logger.info(f"Updated identity_key for '{name_s}'")
except ValueError:
# Invalid hex, silently ignore
pass
@@ -2741,7 +2794,7 @@ class APIEndpoints:
if not saved:
return self._error("Failed to save configuration to file")
logger.info(f"Updated identity: {name}")
logger.info(f"Updated identity: {name_s}")
# Hot reload - re-register identity if key changed or name changed
registration_success = False
@@ -2795,18 +2848,18 @@ class APIEndpoints:
except Exception as reg_error:
logger.error(
f"Failed to hot reload identity {name}: {reg_error}", exc_info=True
f"Failed to hot reload identity {name_s}: {reg_error}", exc_info=True
)
if needs_reload:
message = (
f"Identity '{name}' updated successfully and changes applied immediately!"
f"Identity '{name_s}' updated successfully and changes applied immediately!"
if registration_success
else f"Identity '{name}' updated successfully. Restart required to apply changes."
else f"Identity '{name_s}' updated successfully. Restart required to apply changes."
)
else:
message = (
f"Identity '{name}' updated successfully (settings only, no reload needed)."
f"Identity '{name_s}' updated successfully (settings only, no reload needed)."
)
return self._success(identity, message=message)
@@ -2819,9 +2872,10 @@ class APIEndpoints:
@cherrypy.expose
@cherrypy.tools.json_out()
def delete_identity(self, name=None, type=None):
def delete_identity(self, name=None, type=None, lookup_identity_key=None, public_key_prefix=None):
"""
DELETE /api/delete_identity?name=<name>&type=<room_server|companion> - Delete an identity
Companions may also be deleted with lookup_identity_key or public_key_prefix when name is empty.
"""
# Enable CORS for this endpoint only if configured
self._set_cors_headers()
@@ -2835,8 +2889,7 @@ class APIEndpoints:
cherrypy.response.headers["Allow"] = "DELETE"
raise cherrypy.HTTPError(405, "Method not allowed. This endpoint requires DELETE.")
if not name:
return self._error("Missing name parameter")
name_s = str(name).strip() if name is not None else ""
identity_type = (type or "room_server").lower()
if identity_type not in ["room_server", "companion"]:
@@ -2847,39 +2900,59 @@ class APIEndpoints:
identities_config = self.config.get("identities", {})
if identity_type == "companion":
if not name_s and not lookup_identity_key and not public_key_prefix:
return self._error(
"Missing name parameter or lookup_identity_key or public_key_prefix"
)
companions = identities_config.get("companions") or []
initial_count = len(companions)
companions = [c for c in companions if c.get("name") != name]
if len(companions) == initial_count:
return self._error(f"Companion '{name}' not found")
if name_s:
idx, err = find_companion_index(companions, name=name_s)
else:
idx, err = find_companion_index(
companions,
identity_key=lookup_identity_key,
public_key_prefix=public_key_prefix,
)
if err:
return self._error(err)
resolved_name = str(companions[idx].get("name") or "").strip()
companions.pop(idx)
self.config["identities"]["companions"] = companions
saved = self.config_manager.save_to_file()
if not saved:
return self._error("Failed to save configuration to file")
logger.info(f"Deleted companion: {name}")
logger.info(f"Deleted companion: {resolved_name}")
unregister_success = False
if self.daemon_instance and hasattr(self.daemon_instance, "identity_manager"):
identity_manager = self.daemon_instance.identity_manager
if name in identity_manager.named_identities:
del identity_manager.named_identities[name]
logger.info(f"Removed companion {name} from named_identities")
if resolved_name and resolved_name in identity_manager.named_identities:
del identity_manager.named_identities[resolved_name]
logger.info(f"Removed companion {resolved_name} from named_identities")
unregister_success = True
message = (
f"Companion '{name}' deleted successfully and deactivated immediately!"
f"Companion '{resolved_name}' deleted successfully and deactivated immediately!"
if unregister_success
else f"Companion '{name}' deleted successfully. Restart required to fully remove."
else (
f"Companion '{resolved_name}' deleted successfully. "
"Restart required to fully remove."
)
)
return self._success({"name": name}, message=message)
return self._success({"name": resolved_name}, message=message)
# Room server path
if not name_s:
return self._error("Missing name parameter")
room_servers = identities_config.get("room_servers") or []
# Find and remove the identity
initial_count = len(room_servers)
room_servers = [r for r in room_servers if r.get("name") != name]
room_servers = [
r for r in room_servers if str(r.get("name") or "").strip() != name_s
]
if len(room_servers) == initial_count:
return self._error(f"Identity '{name}' not found")
return self._error(f"Identity '{name_s}' not found")
# Update config
self.config["identities"]["room_servers"] = room_servers
@@ -2888,7 +2961,7 @@ class APIEndpoints:
if not saved:
return self._error("Failed to save configuration to file")
logger.info(f"Deleted identity: {name}")
logger.info(f"Deleted identity: {name_s}")
unregister_success = False
if self.daemon_instance:
@@ -2897,9 +2970,9 @@ class APIEndpoints:
identity_manager = self.daemon_instance.identity_manager
# Remove from named_identities dict
if name in identity_manager.named_identities:
del identity_manager.named_identities[name]
logger.info(f"Removed identity {name} from named_identities")
if name_s in identity_manager.named_identities:
del identity_manager.named_identities[name_s]
logger.info(f"Removed identity {name_s} from named_identities")
unregister_success = True
# Note: We don't remove from identities dict (keyed by hash)
@@ -2909,16 +2982,16 @@ class APIEndpoints:
except Exception as unreg_error:
logger.error(
f"Failed to unregister identity {name}: {unreg_error}", exc_info=True
f"Failed to unregister identity {name_s}: {unreg_error}", exc_info=True
)
message = (
f"Identity '{name}' deleted successfully and deactivated immediately!"
f"Identity '{name_s}' deleted successfully and deactivated immediately!"
if unregister_success
else f"Identity '{name}' deleted successfully. Restart required to fully remove."
else f"Identity '{name_s}' deleted successfully. Restart required to fully remove."
)
return self._success({"name": name}, message=message)
return self._success({"name": name_s}, message=message)
except cherrypy.HTTPError:
raise

View File

@@ -1304,7 +1304,9 @@ paths:
post:
tags: [Identities]
summary: Create new identity
description: Create a new repeater or room server identity
description: |
Create a new repeater or room server identity.
`name` must be non-empty after trimming leading/trailing whitespace (whitespace-only values are rejected).
requestBody:
required: true
content:
@@ -1318,7 +1320,9 @@ paths:
minLength: 1
maxLength: 64
pattern: '^[a-zA-Z0-9_\-\s]+$'
description: Identity name (alphanumeric, spaces, hyphens, underscores)
description: |
Identity registration name (alphanumeric, spaces, hyphens, underscores).
Trimmed; must not be empty or whitespace-only.
example: "General Chat"
identity_key:
type: string
@@ -1408,6 +1412,10 @@ paths:
get:
tags: [Identities]
summary: List all identities
description: |
Returns configured room servers and companions plus runtime registration info.
Companion entries with missing or blank registration names are assigned a stable
`companion_<pubkeyPrefix>` name derived from the identity key and persisted when possible.
responses:
'200':
description: List of identities
@@ -1455,7 +1463,11 @@ paths:
put:
tags: [Identities]
summary: Update identity
description: Modify an existing identity's configuration
description: |
Modify an existing identity's configuration.
For `room_server`, `name` is required to locate the identity.
For `companion`, provide `name` OR `lookup_identity_key` OR `public_key_prefix`
(at least 8 hex characters for prefix lookups) when the registration name is unknown.
security:
- BearerAuth: []
- ApiKeyAuth: []
@@ -1465,14 +1477,26 @@ paths:
application/json:
schema:
type: object
required: [name]
properties:
type:
type: string
enum: [room_server, companion]
default: room_server
name:
type: string
description: Current identity name
description: Current identity registration name (required for room_server; optional for companion if lookup fields are set)
lookup_identity_key:
type: string
description: |
Companion only: hex private identity key (full or unique prefix, min 8 hex chars)
to locate the companion when `name` is omitted.
public_key_prefix:
type: string
description: |
Companion only: ed25519 public key hex prefix (min 8 hex chars) to locate the companion when `name` is omitted.
new_name:
type: string
description: New identity name (optional)
description: New identity name (optional; must be non-empty if provided)
identity_key:
type: string
description: New identity key (optional)
@@ -1491,17 +1515,42 @@ paths:
delete:
tags: [Identities]
summary: Delete identity
description: Remove an identity from the system
description: |
Remove an identity from the system.
For `room_server`, `name` is required.
For `companion`, provide `name` OR `lookup_identity_key` OR `public_key_prefix`
(at least 8 hex characters for prefix lookups).
security:
- BearerAuth: []
- ApiKeyAuth: []
parameters:
- name: name
in: query
required: true
required: false
schema:
type: string
description: Identity name to delete
description: Identity registration name to delete (required for room_server)
- name: type
in: query
required: false
schema:
type: string
enum: [room_server, companion]
description: Identity kind (default room_server)
- name: lookup_identity_key
in: query
required: false
schema:
type: string
description: |
Companion only: hex identity key (full or unique prefix, min 8 hex chars) when `name` is omitted.
- name: public_key_prefix
in: query
required: false
schema:
type: string
description: |
Companion only: public key hex prefix (min 8 hex chars) when `name` is omitted.
responses:
'200':
description: Identity deleted

View File

@@ -0,0 +1,202 @@
"""Unit tests for companion identity healing and lookup (identity_resolve)."""
import os
import pytest
from repeater.companion.identity_resolve import (
derive_companion_public_key_hex,
find_companion_index,
heal_companion_empty_names,
identity_key_bytes_from_config,
identity_key_hex_normalized,
suggest_companion_name_from_pubkey,
unique_suggested_name,
)
def _seed32() -> bytes:
return os.urandom(32)
@pytest.fixture
def key_a_hex() -> str:
return _seed32().hex()
@pytest.fixture
def key_b_hex() -> str:
return _seed32().hex()
def test_identity_key_bytes_from_config_hex_string():
raw = _seed32()
h = raw.hex()
assert identity_key_bytes_from_config(h) == raw
assert identity_key_bytes_from_config("0x" + h) == raw
def test_identity_key_bytes_from_config_raw_bytes():
raw = _seed32()
assert identity_key_bytes_from_config(raw) == raw
assert identity_key_bytes_from_config(bytearray(raw)) == raw
def test_identity_key_bytes_from_config_invalid():
assert identity_key_bytes_from_config(None) is None
assert identity_key_bytes_from_config("nothex") is None
assert identity_key_bytes_from_config("ab") is None # wrong length
def test_identity_key_hex_normalized_matches_bytes():
raw = _seed32()
assert identity_key_hex_normalized(raw.hex()) == raw.hex().lower()
assert identity_key_hex_normalized(raw) == raw.hex().lower()
def test_derive_companion_public_key_hex_32_byte_seed(key_a_hex: str):
pub = derive_companion_public_key_hex(key_a_hex)
assert pub is not None
assert len(pub) == 64
assert int(pub, 16) >= 0
def test_derive_companion_public_key_hex_invalid():
assert derive_companion_public_key_hex(None) is None
assert derive_companion_public_key_hex("") is None
def test_suggest_companion_name_from_pubkey():
pub = "a" * 64
assert suggest_companion_name_from_pubkey(pub) == "companion_aaaaaaaa"
assert suggest_companion_name_from_pubkey("0x" + "b" * 64) == "companion_bbbbbbbb"
def test_unique_suggested_name_collision_suffix(key_a_hex: str):
pub = derive_companion_public_key_hex(key_a_hex)
assert pub is not None
base = suggest_companion_name_from_pubkey(pub)
assert unique_suggested_name(pub, {base}) == f"{base}-2"
assert unique_suggested_name(pub, {base, f"{base}-2"}) == f"{base}-3"
def test_heal_companion_empty_names_sets_stable_name(key_a_hex: str):
companions = [{"name": "", "identity_key": key_a_hex, "settings": {}}]
assert heal_companion_empty_names(companions) is True
assert companions[0]["name"].startswith("companion_")
assert len(companions[0]["name"]) > len("companion_")
def test_heal_companion_empty_names_whitespace_name_treated_as_empty(key_a_hex: str):
companions = [{"name": " ", "identity_key": key_a_hex, "settings": {}}]
assert heal_companion_empty_names(companions) is True
assert companions[0]["name"].startswith("companion_")
def test_heal_companion_no_op_when_name_present(key_a_hex: str):
companions = [{"name": "MyCompanion", "identity_key": key_a_hex, "settings": {}}]
assert heal_companion_empty_names(companions) is False
assert companions[0]["name"] == "MyCompanion"
def test_heal_skips_row_without_derivable_key():
companions = [{"name": "", "identity_key": "dead", "settings": {}}]
assert heal_companion_empty_names(companions) is False
assert companions[0].get("name") in ("", None) or str(companions[0].get("name")).strip() == ""
def test_heal_two_unnamed_get_distinct_names(key_a_hex: str, key_b_hex: str):
companions = [
{"name": "", "identity_key": key_a_hex, "settings": {}},
{"name": None, "identity_key": key_b_hex, "settings": {}},
]
assert heal_companion_empty_names(companions) is True
assert companions[0]["name"] != companions[1]["name"]
def test_find_companion_index_by_name(key_a_hex: str):
companions = [{"name": "c1", "identity_key": key_a_hex, "settings": {}}]
idx, err = find_companion_index(companions, name="c1")
assert err is None and idx == 0
def test_find_companion_index_name_not_found(key_a_hex: str):
companions = [{"name": "c1", "identity_key": key_a_hex, "settings": {}}]
idx, err = find_companion_index(companions, name="nope")
assert idx is None and "not found" in (err or "")
def test_find_companion_index_duplicate_names_error(key_a_hex: str, key_b_hex: str):
companions = [
{"name": "dup", "identity_key": key_a_hex, "settings": {}},
{"name": "dup", "identity_key": key_b_hex, "settings": {}},
]
idx, err = find_companion_index(companions, name="dup")
assert idx is None and err and "Multiple" in err
def test_find_companion_index_by_full_identity_key(key_a_hex: str):
companions = [{"name": "x", "identity_key": key_a_hex, "settings": {}}]
idx, err = find_companion_index(companions, identity_key=key_a_hex.lower())
assert err is None and idx == 0
def test_find_companion_index_by_identity_key_prefix(key_a_hex: str):
companions = [{"name": "x", "identity_key": key_a_hex, "settings": {}}]
prefix = key_a_hex[:16]
idx, err = find_companion_index(companions, identity_key=prefix)
assert err is None and idx == 0
def test_find_companion_index_identity_key_too_short():
companions = [{"name": "x", "identity_key": "a" * 64, "settings": {}}]
idx, err = find_companion_index(companions, identity_key="abcd")
assert idx is None and err and "at least" in err
def test_find_companion_index_identity_key_ambiguous_prefix():
"""Two keys sharing the same 8-hex prefix should make prefix lookup ambiguous."""
shared = "aabbccdd"
key_a_hex = shared + os.urandom(28).hex()
key_b_hex = shared + os.urandom(28).hex()
assert len(key_a_hex) == 64 and len(key_b_hex) == 64
assert key_a_hex != key_b_hex
companions = [
{"name": "a1", "identity_key": key_a_hex, "settings": {}},
{"name": "a2", "identity_key": key_b_hex, "settings": {}},
]
idx, err = find_companion_index(companions, identity_key=shared)
assert idx is None and err and "ambiguous" in err
def test_find_companion_index_by_public_key_prefix(key_a_hex: str):
pub = derive_companion_public_key_hex(key_a_hex)
assert pub is not None
companions = [{"name": "x", "identity_key": key_a_hex, "settings": {}}]
idx, err = find_companion_index(companions, public_key_prefix=pub[:16])
assert err is None and idx == 0
def test_find_companion_index_public_key_prefix_with_0x(key_a_hex: str):
pub = derive_companion_public_key_hex(key_a_hex)
assert pub is not None
companions = [{"name": "x", "identity_key": key_a_hex, "settings": {}}]
idx, err = find_companion_index(companions, public_key_prefix="0x" + pub[:16])
assert err is None and idx == 0
def test_find_companion_index_missing_lookup_fields():
companions = [{"name": "x", "identity_key": "a" * 64, "settings": {}}]
idx, err = find_companion_index(companions)
assert idx is None and err and "Missing companion lookup" in err
def test_name_lookup_takes_priority_over_identity_key(key_a_hex: str, key_b_hex: str):
"""When `name` is set, identity_key lookup is not used."""
companions = [
{"name": "first", "identity_key": key_a_hex, "settings": {}},
{"name": "second", "identity_key": key_b_hex, "settings": {}},
]
idx, err = find_companion_index(companions, name="first", identity_key=key_b_hex)
assert err is None and idx == 0