Merge pull request #89 from ipnet-mesh/claude/fix-admin-auth-bypass-atTWJ

Enforce authentication for admin API proxy mutations
This commit is contained in:
JingleManSweep
2026-02-10 08:51:40 +00:00
committed by GitHub
6 changed files with 168 additions and 0 deletions

View File

@@ -254,6 +254,20 @@ def create_app(
if h in request.headers:
headers[h] = request.headers[h]
# Block mutating requests from unauthenticated users when admin is
# enabled. OAuth2Proxy is expected to set X-Forwarded-User for
# authenticated sessions; without it, write operations must be
# rejected server-side to prevent auth bypass.
if (
request.method in ("POST", "PUT", "DELETE", "PATCH")
and request.app.state.admin_enabled
and not request.headers.get("x-forwarded-user")
):
return JSONResponse(
{"detail": "Authentication required"},
status_code=401,
)
try:
response = await client.request(
method=request.method,

View File

@@ -17,6 +17,17 @@ export async function render(container, params, router) {
return;
}
if (!config.is_authenticated) {
litRender(html`
<div class="flex flex-col items-center justify-center py-20">
${iconLock('h-16 w-16 opacity-30 mb-4')}
<h1 class="text-3xl font-bold mb-2">Authentication Required</h1>
<p class="opacity-70">You must sign in to access the admin interface.</p>
<a href="/oauth2/start?rd=${encodeURIComponent(window.location.pathname)}" class="btn btn-primary mt-6">Sign In</a>
</div>`, container);
return;
}
litRender(html`
<div class="flex items-center justify-between mb-4">
<div>

View File

@@ -20,6 +20,17 @@ export async function render(container, params, router) {
return;
}
if (!config.is_authenticated) {
litRender(html`
<div class="flex flex-col items-center justify-center py-20">
${iconLock('h-16 w-16 opacity-30 mb-4')}
<h1 class="text-3xl font-bold mb-2">Authentication Required</h1>
<p class="opacity-70">You must sign in to access the admin interface.</p>
<a href="/oauth2/start?rd=${encodeURIComponent(window.location.pathname)}" class="btn btn-primary mt-6">Sign In</a>
</div>`, container);
return;
}
const flashMessage = (params.query && params.query.message) || '';
const flashError = (params.query && params.query.error) || '';

View File

@@ -21,6 +21,17 @@ export async function render(container, params, router) {
return;
}
if (!config.is_authenticated) {
litRender(html`
<div class="flex flex-col items-center justify-center py-20">
${iconLock('h-16 w-16 opacity-30 mb-4')}
<h1 class="text-3xl font-bold mb-2">Authentication Required</h1>
<p class="opacity-70">You must sign in to access the admin interface.</p>
<a href="/oauth2/start?rd=${encodeURIComponent(window.location.pathname)}" class="btn btn-primary mt-6">Sign In</a>
</div>`, container);
return;
}
const selectedPublicKey = (params.query && params.query.public_key) || '';
const flashMessage = (params.query && params.query.message) || '';
const flashError = (params.query && params.query.error) || '';

View File

@@ -225,19 +225,42 @@ class MockHttpClient:
def _create_response(self, key: str) -> Response:
"""Create a mock response for a given key."""
import json as _json
response_data = self._responses.get(key)
if response_data is None:
# Return 404 for unknown endpoints
response = MagicMock(spec=Response)
response.status_code = 404
response.json.return_value = {"detail": "Not found"}
response.content = b'{"detail": "Not found"}'
response.headers = {"content-type": "application/json"}
return response
response = MagicMock(spec=Response)
response.status_code = response_data["status_code"]
response.json.return_value = response_data["json"]
response.content = _json.dumps(response_data["json"]).encode()
response.headers = {"content-type": "application/json"}
return response
async def request(
self,
method: str,
url: str,
params: dict | None = None,
content: bytes | None = None,
headers: dict | None = None,
) -> Response:
"""Mock generic request (used by API proxy)."""
key = f"{method.upper()}:{url}"
if key in self._responses:
return self._create_response(key)
# Try base path without query params
base_path = url.split("?")[0]
key = f"{method.upper()}:{base_path}"
return self._create_response(key)
async def get(self, path: str, params: dict | None = None) -> Response:
"""Mock GET request."""
# Try exact match first

View File

@@ -193,6 +193,104 @@ class TestAdminNodeTags:
assert "window.__APP_CONFIG__" in response.text
class TestAdminApiProxyAuth:
"""Tests for admin API proxy authentication enforcement.
When admin is enabled, mutating requests (POST/PUT/DELETE/PATCH) through
the API proxy must require authentication via X-Forwarded-User header.
This prevents unauthenticated users from performing admin operations
even though the web app's HTTP client has a service-level API key.
"""
def test_proxy_post_blocked_without_auth(self, admin_client, mock_http_client):
"""POST to API proxy returns 401 without auth headers."""
mock_http_client.set_response("POST", "/api/v1/members", 201, {"id": "new"})
response = admin_client.post(
"/api/v1/members",
json={"name": "Test", "member_id": "test"},
)
assert response.status_code == 401
assert "Authentication required" in response.json()["detail"]
def test_proxy_put_blocked_without_auth(self, admin_client, mock_http_client):
"""PUT to API proxy returns 401 without auth headers."""
mock_http_client.set_response("PUT", "/api/v1/members/1", 200, {"id": "1"})
response = admin_client.put(
"/api/v1/members/1",
json={"name": "Updated"},
)
assert response.status_code == 401
def test_proxy_delete_blocked_without_auth(self, admin_client, mock_http_client):
"""DELETE to API proxy returns 401 without auth headers."""
mock_http_client.set_response("DELETE", "/api/v1/members/1", 204, None)
response = admin_client.delete("/api/v1/members/1")
assert response.status_code == 401
def test_proxy_patch_blocked_without_auth(self, admin_client, mock_http_client):
"""PATCH to API proxy returns 401 without auth headers."""
mock_http_client.set_response("PATCH", "/api/v1/members/1", 200, {"id": "1"})
response = admin_client.patch(
"/api/v1/members/1",
json={"name": "Patched"},
)
assert response.status_code == 401
def test_proxy_post_allowed_with_auth(
self, admin_client, auth_headers, mock_http_client
):
"""POST to API proxy succeeds with auth headers."""
mock_http_client.set_response("POST", "/api/v1/members", 201, {"id": "new"})
response = admin_client.post(
"/api/v1/members",
json={"name": "Test", "member_id": "test"},
headers=auth_headers,
)
assert response.status_code == 201
def test_proxy_put_allowed_with_auth(
self, admin_client, auth_headers, mock_http_client
):
"""PUT to API proxy succeeds with auth headers."""
mock_http_client.set_response("PUT", "/api/v1/members/1", 200, {"id": "1"})
response = admin_client.put(
"/api/v1/members/1",
json={"name": "Updated"},
headers=auth_headers,
)
assert response.status_code == 200
def test_proxy_delete_allowed_with_auth(
self, admin_client, auth_headers, mock_http_client
):
"""DELETE to API proxy succeeds with auth headers."""
mock_http_client.set_response("DELETE", "/api/v1/members/1", 204, None)
response = admin_client.delete(
"/api/v1/members/1",
headers=auth_headers,
)
# 204 from the mock API
assert response.status_code == 204
def test_proxy_get_allowed_without_auth(self, admin_client, mock_http_client):
"""GET to API proxy is allowed without auth (read-only)."""
response = admin_client.get("/api/v1/nodes")
assert response.status_code == 200
def test_proxy_post_allowed_when_admin_disabled(
self, admin_client_disabled, mock_http_client
):
"""POST to API proxy allowed when admin is disabled (no proxy auth)."""
mock_http_client.set_response("POST", "/api/v1/members", 201, {"id": "new"})
response = admin_client_disabled.post(
"/api/v1/members",
json={"name": "Test", "member_id": "test"},
)
# Should reach the API (which may return its own auth error, but
# the proxy itself should not block it)
assert response.status_code == 201
class TestAdminFooterLink:
"""Tests for admin link in footer."""