Update server-side keystore after key refresh

This commit is contained in:
Jack Kingsman
2026-02-23 19:33:17 -08:00
parent 152eab99db
commit 1bd31d68d9
2 changed files with 77 additions and 2 deletions

View File

@@ -128,9 +128,27 @@ async def set_private_key(update: PrivateKeyUpdate) -> dict:
async with radio_manager.radio_operation("import_private_key") as mc:
result = await mc.commands.import_private_key(key_bytes)
if result.type == EventType.ERROR:
if result.type == EventType.ERROR:
raise HTTPException(
status_code=500, detail=f"Failed to import private key: {result.payload}"
)
# Re-export from radio so the server-side keystore uses the new key
# for DM decryption immediately, rather than waiting for reconnect.
from app.keystore import export_and_store_private_key
keystore_refreshed = await export_and_store_private_key(mc)
if not keystore_refreshed:
logger.warning("Keystore refresh failed after import, retrying once")
keystore_refreshed = await export_and_store_private_key(mc)
if not keystore_refreshed:
raise HTTPException(
status_code=500, detail=f"Failed to import private key: {result.payload}"
status_code=500,
detail=(
"Private key imported on radio, but server-side keystore "
"refresh failed. Reconnect to apply the new key for DM decryption."
),
)
return {"status": "ok"}

View File

@@ -157,6 +157,63 @@ class TestPrivateKeyImport:
assert exc.value.status_code == 500
@pytest.mark.asyncio
async def test_successful_import_refreshes_keystore(self):
mc = _mock_meshcore_with_info()
mc.commands.import_private_key = AsyncMock(return_value=_radio_result())
with (
patch("app.routers.radio.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch(
"app.keystore.export_and_store_private_key",
new_callable=AsyncMock,
return_value=True,
) as mock_export,
):
result = await set_private_key(PrivateKeyUpdate(private_key="aa" * 64))
assert result == {"status": "ok"}
mock_export.assert_awaited_once_with(mc)
@pytest.mark.asyncio
async def test_import_ok_but_keystore_refresh_fails_returns_500(self):
mc = _mock_meshcore_with_info()
mc.commands.import_private_key = AsyncMock(return_value=_radio_result())
with (
patch("app.routers.radio.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch(
"app.keystore.export_and_store_private_key",
new_callable=AsyncMock,
return_value=False,
) as mock_export,
):
with pytest.raises(HTTPException) as exc:
await set_private_key(PrivateKeyUpdate(private_key="aa" * 64))
assert exc.value.status_code == 500
assert "keystore" in exc.value.detail.lower()
# Called twice: initial attempt + one retry
assert mock_export.await_count == 2
@pytest.mark.asyncio
async def test_keystore_refresh_succeeds_on_retry(self):
mc = _mock_meshcore_with_info()
mc.commands.import_private_key = AsyncMock(return_value=_radio_result())
with (
patch("app.routers.radio.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch(
"app.keystore.export_and_store_private_key",
new_callable=AsyncMock,
side_effect=[False, True],
) as mock_export,
):
result = await set_private_key(PrivateKeyUpdate(private_key="aa" * 64))
assert result == {"status": "ok"}
assert mock_export.await_count == 2
class TestAdvertise:
@pytest.mark.asyncio