Files
meshcore-hub/tests/test_api/test_auth.py
T
Louis King 27b9ec21f2 feat: replace admin tag page with inline editor on node detail
Replace the dedicated admin tag management page with inline tag editing
on the node detail page. Operators can now edit tags directly on nodes
they've adopted; admins retain unrestricted access.

Key changes:
- Remove admin SPA page (admin/index.js, admin/node-tags.js)
- Add inline tag editor to node-detail.js with add/edit/delete modals
- Replace RequireAdmin with RequireOperatorOrAdmin for tag API routes
- Add ownership check: operators restricted to adopted nodes only
- Add validate_and_coerce_tag_value for number/boolean coercion
- Remove unused bulk endpoints (copy, move, replace all)
- Use AbortController for event listeners to prevent accumulation
  on lit-html DOM reuse across re-renders
- Track Leaflet map instance at module scope for defensive cleanup
- Fix checkAuthResponse to only redirect on 401 (not 403)
- Update tests for new OIDC-based auth model
- Update en.json locale, i18n.md, upgrading.md, AGENTS.md
2026-05-03 21:25:53 +01:00

292 lines
11 KiB
Python

"""Tests for API authentication.
Verifies that constant-time key comparison (hmac.compare_digest) works
correctly with no behavioral regressions from the original == operator.
"""
import base64
def _make_basic_auth(username: str, password: str) -> str:
    """Build an ``Authorization`` header value for HTTP Basic auth.

    The ``username:password`` pair is encoded to bytes, Base64 encoded,
    and prefixed with the ``Basic`` scheme name.
    """
    raw_pair = f"{username}:{password}".encode()
    token = base64.b64encode(raw_pair).decode()
    return "Basic " + token
def _clear_metrics_cache() -> None:
    """Reset the metrics module cache to empty output and a zero expiry."""
    # Imported locally, as in the original helper, rather than at module top.
    from meshcore_hub.api.metrics import _cache as metrics_cache

    for cache_key, blank_value in (("output", b""), ("expires_at", 0.0)):
        metrics_cache[cache_key] = blank_value
class TestReadAuthentication:
    """Read-level authentication checks (require_read)."""

    def test_no_auth_when_keys_not_configured(self, client_no_auth):
        """Without configured keys, read endpoints need no credentials."""
        # Each endpoint must answer an anonymous request.
        for path in ("/api/v1/nodes", "/api/v1/messages"):
            reply = client_no_auth.get(path)
            assert reply.status_code == 200

    def test_read_endpoints_accept_read_key(self, client_with_auth):
        """The read key is accepted on a read endpoint."""
        read_bearer = {"Authorization": "Bearer test-read-key"}
        reply = client_with_auth.get("/api/v1/nodes", headers=read_bearer)
        assert reply.status_code == 200

    def test_read_key_accepted_on_multiple_endpoints(self, client_with_auth):
        """The read key is accepted on each of several read endpoints."""
        read_bearer = {"Authorization": "Bearer test-read-key"}
        for endpoint in ("/api/v1/nodes", "/api/v1/messages"):
            reply = client_with_auth.get(endpoint, headers=read_bearer)
            assert reply.status_code == 200, f"Read key rejected on {endpoint}"

    def test_read_endpoints_accept_admin_key(self, client_with_auth):
        """The admin key also works on a read endpoint."""
        admin_bearer = {"Authorization": "Bearer test-admin-key"}
        reply = client_with_auth.get("/api/v1/nodes", headers=admin_bearer)
        assert reply.status_code == 200

    def test_admin_key_grants_read_on_multiple_endpoints(self, client_with_auth):
        """The admin key works on each of several read endpoints."""
        admin_bearer = {"Authorization": "Bearer test-admin-key"}
        for endpoint in ("/api/v1/nodes", "/api/v1/messages"):
            reply = client_with_auth.get(endpoint, headers=admin_bearer)
            accepted = reply.status_code == 200
            assert accepted, f"Admin key rejected on read endpoint {endpoint}"

    def test_invalid_key_rejected_on_read_endpoint(self, client_with_auth):
        """An unknown bearer key yields 401 on a read endpoint."""
        bogus_bearer = {"Authorization": "Bearer invalid-key"}
        reply = client_with_auth.get("/api/v1/nodes", headers=bogus_bearer)
        assert reply.status_code == 401

    def test_no_auth_header_rejected_on_read_endpoint(self, client_with_auth):
        """A request with no Authorization header yields 401."""
        reply = client_with_auth.get("/api/v1/nodes")
        assert reply.status_code == 401

    def test_missing_bearer_prefix_rejected(self, client_with_auth):
        """A raw key without the ``Bearer`` prefix yields 401."""
        prefixless = {"Authorization": "test-read-key"}
        reply = client_with_auth.get("/api/v1/nodes", headers=prefixless)
        assert reply.status_code == 401

    def test_empty_auth_header_rejected(self, client_with_auth):
        """An empty Authorization header value yields 401."""
        blank_header = {"Authorization": ""}
        reply = client_with_auth.get("/api/v1/nodes", headers=blank_header)
        assert reply.status_code == 401
class TestAdminAuthentication:
    """Operator/admin-level authentication checks (RequireOperatorOrAdmin).

    Tag endpoints expect an OIDC user identity (``X-User-Id`` header) in
    addition to an API key; an API key without user identity headers is
    answered with 401.
    """

    # Tag route for a placeholder node whose public key is all "a" characters.
    _PLACEHOLDER_TAGS_URL = "/api/v1/nodes/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa/tags"

    @staticmethod
    def _post_tag(client, url, headers=None):
        """POST the shared ``name`` / ``test-node`` tag payload to *url*.

        ``headers`` is forwarded only when supplied, so a header-less call
        reaches the client as a plain ``post(url, json=...)``.
        """
        extra = {} if headers is None else {"headers": headers}
        return client.post(
            url,
            json={
                "key": "name",
                "value": "test-node",
            },
            **extra,
        )

    def test_tag_endpoint_rejects_admin_key_without_user_identity(
        self, client_with_auth
    ):
        """Admin key on its own yields 401 because user identity is missing."""
        outcome = self._post_tag(
            client_with_auth,
            self._PLACEHOLDER_TAGS_URL,
            {"Authorization": "Bearer test-admin-key"},
        )
        assert outcome.status_code == 401

    def test_tag_endpoint_rejects_read_key_without_user_identity(
        self, client_with_auth
    ):
        """Read key on its own yields 401 because user identity is missing."""
        outcome = self._post_tag(
            client_with_auth,
            self._PLACEHOLDER_TAGS_URL,
            {"Authorization": "Bearer test-read-key"},
        )
        assert outcome.status_code == 401

    def test_tag_endpoint_rejects_invalid_key(self, client_with_auth):
        """An unknown bearer key yields 401."""
        outcome = self._post_tag(
            client_with_auth,
            self._PLACEHOLDER_TAGS_URL,
            {"Authorization": "Bearer completely-wrong-key"},
        )
        assert outcome.status_code == 401

    def test_tag_endpoint_rejects_no_auth_header(self, client_with_auth):
        """A request with no Authorization header yields 401."""
        outcome = self._post_tag(client_with_auth, self._PLACEHOLDER_TAGS_URL)
        assert outcome.status_code == 401

    def test_tag_endpoint_rejects_admin_key_without_roles(self, client_with_auth):
        """Admin key plus user identity, but no roles header, yields 403."""
        identity_only = {
            "Authorization": "Bearer test-admin-key",
            "X-User-Id": "user-123",
        }
        outcome = self._post_tag(
            client_with_auth, self._PLACEHOLDER_TAGS_URL, identity_only
        )
        assert outcome.status_code == 403

    def test_tag_endpoint_accepts_admin_key_with_admin_role(self, client_with_auth):
        """Admin key plus user identity plus admin role is allowed through."""
        admin_identity = {
            "Authorization": "Bearer test-admin-key",
            "X-User-Id": "user-123",
            "X-User-Roles": "admin",
        }
        outcome = self._post_tag(
            client_with_auth, self._PLACEHOLDER_TAGS_URL, admin_identity
        )
        # NOTE(review): presumably any of these means auth passed and the
        # route itself decided the outcome -- confirm the intended set.
        passed_auth_statuses = (200, 201, 404, 422)
        assert outcome.status_code in passed_auth_statuses

    def test_tag_endpoint_operator_rejected_on_unadopted_node(
        self, client_with_auth, sample_node
    ):
        """Operator role on a non-adopted node yields 403 (ownership check)."""
        operator_identity = {
            "Authorization": "Bearer test-admin-key",
            "X-User-Id": "user-123",
            "X-User-Roles": "operator",
        }
        node_tags_url = f"/api/v1/nodes/{sample_node.public_key}/tags"
        outcome = self._post_tag(client_with_auth, node_tags_url, operator_identity)
        assert outcome.status_code == 403
class TestMetricsAuthentication:
    """Metrics endpoint authentication checks (Basic auth, hmac.compare_digest)."""

    def test_metrics_no_auth_when_no_read_key(self, client_no_auth):
        """With no read key configured, ``/metrics`` needs no credentials."""
        _clear_metrics_cache()
        reply = client_no_auth.get("/metrics")
        assert reply.status_code == 200

    def test_metrics_accepts_valid_basic_auth(self, client_with_auth):
        """Correct Basic credentials are accepted."""
        _clear_metrics_cache()
        good_credentials = _make_basic_auth("metrics", "test-read-key")
        reply = client_with_auth.get(
            "/metrics", headers={"Authorization": good_credentials}
        )
        assert reply.status_code == 200

    def test_metrics_rejects_no_auth_when_key_set(self, client_with_auth):
        """A read key is set but no credentials are sent: expect 401."""
        _clear_metrics_cache()
        reply = client_with_auth.get("/metrics")
        assert reply.status_code == 401
        # The 401 must also carry the WWW-Authenticate header.
        assert "WWW-Authenticate" in reply.headers

    def test_metrics_rejects_wrong_password(self, client_with_auth):
        """A wrong password yields 401."""
        _clear_metrics_cache()
        bad_password = _make_basic_auth("metrics", "wrong-key")
        reply = client_with_auth.get(
            "/metrics", headers={"Authorization": bad_password}
        )
        assert reply.status_code == 401

    def test_metrics_rejects_wrong_username(self, client_with_auth):
        """A wrong username yields 401."""
        _clear_metrics_cache()
        bad_username = _make_basic_auth("admin", "test-read-key")
        reply = client_with_auth.get(
            "/metrics", headers={"Authorization": bad_username}
        )
        assert reply.status_code == 401

    def test_metrics_rejects_bearer_auth(self, client_with_auth):
        """Bearer credentials are not accepted by ``/metrics``."""
        _clear_metrics_cache()
        bearer_header = {"Authorization": "Bearer test-read-key"}
        reply = client_with_auth.get("/metrics", headers=bearer_header)
        assert reply.status_code == 401

    def test_metrics_rejects_admin_key_as_password(self, client_with_auth):
        """The admin key is not accepted as the metrics password.

        Only the read key is valid for Basic auth on metrics, not the
        admin key.
        """
        _clear_metrics_cache()
        admin_as_password = _make_basic_auth("metrics", "test-admin-key")
        reply = client_with_auth.get(
            "/metrics", headers={"Authorization": admin_as_password}
        )
        assert reply.status_code == 401
class TestHealthEndpoint:
    """Health check endpoint checks."""

    def test_health_no_auth(self, client_no_auth):
        """``/health`` answers 200 with a healthy status when auth is off."""
        reply = client_no_auth.get("/health")
        assert reply.status_code == 200
        assert reply.json()["status"] == "healthy"

    def test_health_with_auth_configured(self, client_with_auth):
        """``/health`` answers 200 even when auth is configured."""
        # No credentials are sent: the health route must stay reachable.
        reply = client_with_auth.get("/health")
        assert reply.status_code == 200