mirror of
https://github.com/ipnet-mesh/meshcore-hub.git
synced 2026-06-11 08:44:53 +02:00
27b9ec21f2
Replace the dedicated admin tag management page with inline tag editing on the node detail page. Operators can now edit tags directly on nodes they've adopted; admins retain unrestricted access. Key changes: - Remove admin SPA page (admin/index.js, admin/node-tags.js) - Add inline tag editor to node-detail.js with add/edit/delete modals - Replace RequireAdmin with RequireOperatorOrAdmin for tag API routes - Add ownership check: operators restricted to adopted nodes only - Add validate_and_coerce_tag_value for number/boolean coercion - Remove unused bulk endpoints (copy, move, replace all) - Use AbortController for event listeners to prevent accumulation on lit-html DOM reuse across re-renders - Track Leaflet map instance at module scope for defensive cleanup - Fix checkAuthResponse to only redirect on 401 (not 403) - Update tests for new OIDC-based auth model - Update en.json locale, i18n.md, upgrading.md, AGENTS.md
408 lines
16 KiB
Python
408 lines
16 KiB
Python
"""Tests for OIDC authentication web routes."""
|
|
|
|
import json
|
|
from typing import Any
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
from fastapi.testclient import TestClient
|
|
|
|
from meshcore_hub.web.oidc import init_oidc, strip_userinfo
|
|
|
|
|
|
class TestOIDCSettingsValidation:
|
|
"""Test OIDC configuration validation."""
|
|
|
|
def test_oidc_disabled_by_default(self, client: TestClient) -> None:
|
|
"""Test that OIDC is disabled by default."""
|
|
response = client.get("/")
|
|
assert response.status_code == 200
|
|
text = response.text
|
|
config = _extract_config(text)
|
|
assert config["oidc_enabled"] is False
|
|
assert config["user"] is None
|
|
assert config["roles"] == []
|
|
|
|
def test_oidc_enabled_config_injection(self, client_with_oidc: TestClient) -> None:
|
|
"""Test OIDC config injection when enabled (no session)."""
|
|
response = client_with_oidc.get("/")
|
|
assert response.status_code == 200
|
|
config = _extract_config(response.text)
|
|
assert config["oidc_enabled"] is True
|
|
assert config["user"] is None
|
|
assert config["roles"] == []
|
|
|
|
|
|
class TestAuthLogin:
|
|
"""Test /auth/login endpoint."""
|
|
|
|
def test_login_oidc_disabled(self, client: TestClient) -> None:
|
|
"""Test login returns 400 when OIDC disabled."""
|
|
response = client.get("/auth/login", follow_redirects=False)
|
|
assert response.status_code == 400
|
|
|
|
def test_login_oidc_enabled(self, client_with_oidc: TestClient) -> None:
|
|
"""Test login redirects to IdP when OIDC enabled."""
|
|
with patch(
|
|
"meshcore_hub.web.app.oauth.oidc.authorize_redirect",
|
|
new_callable=AsyncMock,
|
|
) as mock_redirect:
|
|
from starlette.responses import RedirectResponse
|
|
|
|
mock_redirect.return_value = RedirectResponse(
|
|
url="https://idp.example.com/authorize?state=abc"
|
|
)
|
|
response = client_with_oidc.get(
|
|
"/auth/login?next=/admin/node-tags", follow_redirects=False
|
|
)
|
|
assert response.status_code == 307
|
|
assert "idp.example.com" in response.headers["location"]
|
|
|
|
|
|
class TestAuthCallback:
|
|
"""Test /auth/callback endpoint."""
|
|
|
|
def test_callback_oidc_disabled(self, client: TestClient) -> None:
|
|
"""Test callback returns 400 when OIDC disabled."""
|
|
response = client.get("/auth/callback", follow_redirects=False)
|
|
assert response.status_code == 400
|
|
|
|
|
|
class TestAuthLogout:
|
|
"""Test /auth/logout endpoint."""
|
|
|
|
def test_logout_oidc_disabled(self, client: TestClient) -> None:
|
|
"""Test logout returns 400 when OIDC disabled."""
|
|
response = client.get("/auth/logout", follow_redirects=False)
|
|
assert response.status_code == 400
|
|
|
|
def test_logout_clears_session(
|
|
self, client_with_oidc_admin_session: TestClient
|
|
) -> None:
|
|
"""Test logout clears session and redirects via IdP."""
|
|
with patch(
|
|
"meshcore_hub.web.app.oauth.oidc.logout_redirect",
|
|
new_callable=AsyncMock,
|
|
) as mock_logout:
|
|
from starlette.responses import RedirectResponse
|
|
|
|
mock_logout.return_value = RedirectResponse(
|
|
url="https://idp.example.com/logout"
|
|
)
|
|
response = client_with_oidc_admin_session.get(
|
|
"/auth/logout", follow_redirects=False
|
|
)
|
|
assert response.status_code == 307
|
|
mock_logout.assert_called_once()
|
|
call_kwargs = mock_logout.call_args[1]
|
|
assert call_kwargs["client_id"] == "test-client-id"
|
|
assert "post_logout_redirect_uri" in call_kwargs
|
|
|
|
def test_logout_falls_back_to_base_url(
|
|
self, client_with_oidc_admin_session: TestClient
|
|
) -> None:
|
|
"""Test logout uses request.base_url when no redirect URI configured."""
|
|
with patch(
|
|
"meshcore_hub.web.app.oauth.oidc.logout_redirect",
|
|
new_callable=AsyncMock,
|
|
) as mock_logout:
|
|
from starlette.responses import RedirectResponse
|
|
|
|
mock_logout.return_value = RedirectResponse(
|
|
url="https://idp.example.com/logout"
|
|
)
|
|
response = client_with_oidc_admin_session.get(
|
|
"/auth/logout", follow_redirects=False
|
|
)
|
|
assert response.status_code == 307
|
|
call_kwargs = mock_logout.call_args[1]
|
|
assert "post_logout_redirect_uri" in call_kwargs
|
|
|
|
|
|
class TestAuthUser:
|
|
"""Test /auth/user endpoint."""
|
|
|
|
def test_user_oidc_disabled(self, client: TestClient) -> None:
|
|
"""Test user endpoint returns 400 when OIDC disabled."""
|
|
response = client.get("/auth/user")
|
|
assert response.status_code == 400
|
|
|
|
def test_user_not_authenticated(self, client_with_oidc: TestClient) -> None:
|
|
"""Test user endpoint returns 401 when not logged in."""
|
|
response = client_with_oidc.get("/auth/user")
|
|
assert response.status_code == 401
|
|
|
|
def test_user_admin_session(
|
|
self, client_with_oidc_admin_session: TestClient
|
|
) -> None:
|
|
"""Test user endpoint returns admin user."""
|
|
response = client_with_oidc_admin_session.get("/auth/user")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["user"]["name"] == "Admin User"
|
|
assert "admin" in data["roles"]
|
|
assert "member" in data["roles"]
|
|
|
|
def test_user_member_session(
|
|
self, client_with_oidc_member_session: TestClient
|
|
) -> None:
|
|
"""Test user endpoint returns member user."""
|
|
response = client_with_oidc_member_session.get("/auth/user")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["user"]["name"] == "Member User"
|
|
assert "admin" not in data["roles"]
|
|
assert "member" in data["roles"]
|
|
|
|
|
|
class TestAdminRouteProtection:
|
|
"""Test admin routes serve SPA shell (admin UI removed)."""
|
|
|
|
def test_no_session_gets_spa_shell(self, client_with_oidc: TestClient) -> None:
|
|
"""Test admin route returns SPA shell (client-side handles 404)."""
|
|
response = client_with_oidc.get("/admin/")
|
|
assert response.status_code == 200
|
|
assert "window.__APP_CONFIG__" in response.text
|
|
|
|
def test_member_session_gets_spa_shell(
|
|
self, client_with_oidc_member_session: TestClient
|
|
) -> None:
|
|
"""Test member session gets SPA shell."""
|
|
response = client_with_oidc_member_session.get("/admin/")
|
|
assert response.status_code == 200
|
|
assert "window.__APP_CONFIG__" in response.text
|
|
config = _extract_config(response.text)
|
|
assert config["oidc_enabled"] is True
|
|
assert "admin" not in config["roles"]
|
|
|
|
def test_admin_session_gets_spa_shell(
|
|
self, client_with_oidc_admin_session: TestClient
|
|
) -> None:
|
|
"""Test admin session gets SPA shell with admin config."""
|
|
response = client_with_oidc_admin_session.get("/admin/")
|
|
assert response.status_code == 200
|
|
config = _extract_config(response.text)
|
|
assert config["oidc_enabled"] is True
|
|
assert "admin" in config["roles"]
|
|
|
|
|
|
class TestAPIProxyWriteGating:
|
|
"""Test API proxy write method gating when OIDC enabled."""
|
|
|
|
def test_get_not_gated(self, client_with_oidc: TestClient) -> None:
|
|
"""Test GET requests to open endpoints are not gated."""
|
|
response = client_with_oidc.get("/api/v1/nodes")
|
|
assert response.status_code != 403
|
|
|
|
def test_post_blocked_for_member(
|
|
self, client_with_oidc_member_session: TestClient
|
|
) -> None:
|
|
"""Test POST to admin endpoint blocked for member session."""
|
|
response = client_with_oidc_member_session.post(
|
|
"/api/v1/nodes/some-node/tags",
|
|
json={"key": "test", "value": "test"},
|
|
)
|
|
assert response.status_code == 403
|
|
|
|
def test_put_allowed_for_admin(
|
|
self, client_with_oidc_admin_session: TestClient
|
|
) -> None:
|
|
"""Test PUT allowed for admin session."""
|
|
response = client_with_oidc_admin_session.put(
|
|
"/api/v1/user/profile/test-user-id",
|
|
json={"name": "test"},
|
|
)
|
|
assert response.status_code != 403
|
|
|
|
def test_write_blocked_when_oidc_disabled(self, client: TestClient) -> None:
|
|
"""Test write methods blocked when OIDC is disabled."""
|
|
response = client.put(
|
|
"/api/v1/user/profile/test-user-id",
|
|
json={"name": "test"},
|
|
)
|
|
assert response.status_code == 403
|
|
|
|
def test_read_open_when_oidc_disabled(self, client: TestClient) -> None:
|
|
"""Test GET to open endpoints allowed when OIDC is disabled."""
|
|
response = client.get("/api/v1/nodes")
|
|
assert response.status_code != 403
|
|
|
|
|
|
class TestBackwardCompatibility:
|
|
"""Test backward compatibility when OIDC is disabled."""
|
|
|
|
def test_oidc_disabled_config(self, client: TestClient) -> None:
|
|
"""Test config has no admin_enabled when OIDC disabled."""
|
|
response = client.get("/")
|
|
config = _extract_config(response.text)
|
|
assert "admin_enabled" not in config
|
|
assert config["oidc_enabled"] is False
|
|
assert config["roles"] == []
|
|
|
|
def test_admin_routes_serve_spa_shell_when_oidc_disabled(
|
|
self, client: TestClient
|
|
) -> None:
|
|
"""Test admin routes return SPA shell when OIDC disabled (client-side 404)."""
|
|
response = client.get("/admin/")
|
|
assert response.status_code == 200
|
|
assert "window.__APP_CONFIG__" in response.text
|
|
|
|
|
|
class TestConfigInjection:
|
|
"""Test config injection values for OIDC state."""
|
|
|
|
def test_admin_session_config(
|
|
self, client_with_oidc_admin_session: TestClient
|
|
) -> None:
|
|
"""Test admin session injects correct config values."""
|
|
response = client_with_oidc_admin_session.get("/")
|
|
config = _extract_config(response.text)
|
|
assert config["oidc_enabled"] is True
|
|
assert config["user"]["name"] == "Admin User"
|
|
assert "admin" in config["roles"]
|
|
assert "member" in config["roles"]
|
|
|
|
def test_member_session_config(
|
|
self, client_with_oidc_member_session: TestClient
|
|
) -> None:
|
|
"""Test member session injects correct config values."""
|
|
response = client_with_oidc_member_session.get("/")
|
|
config = _extract_config(response.text)
|
|
assert config["oidc_enabled"] is True
|
|
assert config["user"]["name"] == "Member User"
|
|
assert "admin" not in config["roles"]
|
|
assert "member" in config["roles"]
|
|
|
|
def test_no_session_config(self, client_with_oidc: TestClient) -> None:
|
|
"""Test no session injects correct config values."""
|
|
response = client_with_oidc.get("/")
|
|
config = _extract_config(response.text)
|
|
assert config["oidc_enabled"] is True
|
|
assert config["user"] is None
|
|
assert config["roles"] == []
|
|
|
|
|
|
class TestStripUserinfo:
|
|
"""Test strip_userinfo helper function."""
|
|
|
|
def test_name_from_name_claim(self) -> None:
|
|
"""Test name extracted from 'name' claim."""
|
|
userinfo = {"sub": "user-1", "name": "John Doe", "email": "john@example.com"}
|
|
result = strip_userinfo(userinfo, "roles")
|
|
assert result["name"] == "John Doe"
|
|
|
|
def test_name_from_preferred_username(self) -> None:
|
|
"""Test name falls back to 'preferred_username'."""
|
|
userinfo = {"sub": "user-1", "preferred_username": "johndoe"}
|
|
result = strip_userinfo(userinfo, "roles")
|
|
assert result["name"] == "johndoe"
|
|
|
|
def test_name_from_username(self) -> None:
|
|
"""Test name falls back to 'username' (LogTo-style)."""
|
|
userinfo = {"sub": "user-1", "username": "johndoe"}
|
|
result = strip_userinfo(userinfo, "roles")
|
|
assert result["name"] == "johndoe"
|
|
|
|
def test_name_from_nickname(self) -> None:
|
|
"""Test name falls back to 'nickname'."""
|
|
userinfo = {"sub": "user-1", "nickname": "johnny"}
|
|
result = strip_userinfo(userinfo, "roles")
|
|
assert result["name"] == "johnny"
|
|
|
|
def test_name_priority_order(self) -> None:
|
|
"""Test name claim priority: name > preferred_username > username > nickname."""
|
|
userinfo = {
|
|
"sub": "user-1",
|
|
"name": "Full Name",
|
|
"preferred_username": "pref",
|
|
"username": "user",
|
|
"nickname": "nick",
|
|
}
|
|
result = strip_userinfo(userinfo, "roles")
|
|
assert result["name"] == "Full Name"
|
|
|
|
def test_name_prefers_username_over_nickname(self) -> None:
|
|
"""Test username is preferred over nickname when name is absent."""
|
|
userinfo = {"sub": "user-1", "username": "logto_user", "nickname": "nick"}
|
|
result = strip_userinfo(userinfo, "roles")
|
|
assert result["name"] == "logto_user"
|
|
|
|
def test_name_none_when_all_missing(self) -> None:
|
|
"""Test name is None when no name-like claims present."""
|
|
userinfo = {"sub": "user-1", "email": "user@example.com"}
|
|
result = strip_userinfo(userinfo, "roles")
|
|
assert result["name"] is None
|
|
|
|
def test_roles_extracted(self) -> None:
|
|
"""Test roles are extracted from configured claim."""
|
|
userinfo = {"sub": "user-1", "custom_roles": ["admin", "member"]}
|
|
result = strip_userinfo(userinfo, "custom_roles")
|
|
assert result["custom_roles"] == ["admin", "member"]
|
|
|
|
def test_preserves_sub_email_picture(self) -> None:
|
|
"""Test sub, email, and picture are preserved."""
|
|
userinfo = {
|
|
"sub": "user-1",
|
|
"email": "user@example.com",
|
|
"picture": "https://example.com/avatar.png",
|
|
}
|
|
result = strip_userinfo(userinfo, "roles")
|
|
assert result["sub"] == "user-1"
|
|
assert result["email"] == "user@example.com"
|
|
assert result["picture"] == "https://example.com/avatar.png"
|
|
|
|
|
|
class TestInitOidcScopeParsing:
|
|
"""Test that init_oidc handles quoted and unquoted scope strings."""
|
|
|
|
def test_plain_scope_string(self) -> None:
|
|
"""Test unquoted scope string is split into list."""
|
|
with patch("meshcore_hub.web.oidc.oauth") as mock_oauth:
|
|
init_oidc(
|
|
"id", "secret", "https://idp.example.com/oidc", "openid email profile"
|
|
)
|
|
call_kwargs = mock_oauth.register.call_args[1]
|
|
assert call_kwargs["client_kwargs"]["scope"] == [
|
|
"openid",
|
|
"email",
|
|
"profile",
|
|
]
|
|
|
|
def test_double_quoted_scope_string(self) -> None:
|
|
"""Test double-quoted scope string (from Docker env) is stripped and split."""
|
|
with patch("meshcore_hub.web.oidc.oauth") as mock_oauth:
|
|
init_oidc(
|
|
"id",
|
|
"secret",
|
|
"https://idp.example.com/oidc",
|
|
'"openid email profile"',
|
|
)
|
|
call_kwargs = mock_oauth.register.call_args[1]
|
|
assert call_kwargs["client_kwargs"]["scope"] == [
|
|
"openid",
|
|
"email",
|
|
"profile",
|
|
]
|
|
|
|
def test_single_quoted_scope_string(self) -> None:
|
|
"""Test single-quoted scope string is stripped and split."""
|
|
with patch("meshcore_hub.web.oidc.oauth") as mock_oauth:
|
|
init_oidc(
|
|
"id",
|
|
"secret",
|
|
"https://idp.example.com/oidc",
|
|
"'openid email profile'",
|
|
)
|
|
call_kwargs = mock_oauth.register.call_args[1]
|
|
assert call_kwargs["client_kwargs"]["scope"] == [
|
|
"openid",
|
|
"email",
|
|
"profile",
|
|
]
|
|
|
|
|
|
def _extract_config(text: str) -> dict[str, Any]:
|
|
"""Extract __APP_CONFIG__ from SPA HTML."""
|
|
start = text.find("window.__APP_CONFIG__ = ") + len("window.__APP_CONFIG__ = ")
|
|
end = text.find(";", start)
|
|
return json.loads(text[start:end]) # type: ignore[no-any-return]
|