Add access denied page for unauthenticated admin access

When users try to access /a/ without valid OAuth2Proxy headers (e.g.,
GitHub account not in org), they now see a friendly 403 page instead
of a 500 error. Added authentication checks to all admin routes.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Louis King
2026-01-11 13:34:03 +00:00
parent 6901bafb02
commit 307f3935e0
3 changed files with 117 additions and 21 deletions

View File

@@ -55,6 +55,20 @@ def _get_auth_context(request: Request) -> dict:
}
def _is_authenticated(request: Request) -> bool:
"""Check if user is authenticated via OAuth2Proxy headers."""
return bool(
request.headers.get("X-Forwarded-User")
or request.headers.get("X-Forwarded-Email")
)
def _require_auth(request: Request) -> None:
"""Require authentication, raise 403 if not authenticated."""
if not _is_authenticated(request):
raise HTTPException(status_code=403, detail="Access denied")
@router.get("/", response_class=HTMLResponse)
async def admin_home(request: Request) -> HTMLResponse:
"""Render the admin page with OAuth2Proxy user info."""
@@ -65,6 +79,12 @@ async def admin_home(request: Request) -> HTMLResponse:
context["request"] = request
context.update(_get_auth_context(request))
# Check if user is authenticated
if not _is_authenticated(request):
return templates.TemplateResponse(
"admin/access_denied.html", context, status_code=403
)
return templates.TemplateResponse("admin/index.html", context)
@@ -83,6 +103,12 @@ async def admin_node_tags(
context["request"] = request
context.update(_get_auth_context(request))
# Check if user is authenticated
if not _is_authenticated(request):
return templates.TemplateResponse(
"admin/access_denied.html", context, status_code=403
)
# Flash messages from redirects
context["message"] = message
context["error"] = error
@@ -146,6 +172,7 @@ async def admin_create_node_tag(
) -> RedirectResponse:
"""Create a new node tag."""
_check_admin_enabled(request)
_require_auth(request)
try:
response = await request.app.state.http_client.post(
@@ -187,6 +214,7 @@ async def admin_update_node_tag(
) -> RedirectResponse:
"""Update an existing node tag."""
_check_admin_enabled(request)
_require_auth(request)
try:
response = await request.app.state.http_client.put(
@@ -224,6 +252,7 @@ async def admin_move_node_tag(
) -> RedirectResponse:
"""Move a node tag to a different node."""
_check_admin_enabled(request)
_require_auth(request)
try:
response = await request.app.state.http_client.put(
@@ -263,6 +292,7 @@ async def admin_delete_node_tag(
) -> RedirectResponse:
"""Delete a node tag."""
_check_admin_enabled(request)
_require_auth(request)
try:
response = await request.app.state.http_client.delete(

View File

@@ -0,0 +1,20 @@
{% extends "base.html" %}
{% block title %}{{ network_name }} - Access Denied{% endblock %}
{% block content %}
<div class="flex flex-col items-center justify-center min-h-[50vh]">
<div class="text-center">
<svg xmlns="http://www.w3.org/2000/svg" class="h-24 w-24 mx-auto text-error opacity-50 mb-6" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 15v2m-6 4h12a2 2 0 002-2v-6a2 2 0 00-2-2H6a2 2 0 00-2 2v6a2 2 0 002 2zm10-10V7a4 4 0 00-8 0v4h8z" />
</svg>
<h1 class="text-3xl font-bold mb-2">Access Denied</h1>
<p class="text-lg opacity-70 mb-6">You don't have permission to access the admin area.</p>
<p class="text-sm opacity-50 mb-8">Please contact the network administrator if you believe this is an error.</p>
<div class="flex gap-4 justify-center">
<a href="/" class="btn btn-primary">Return Home</a>
<a href="/oauth2/sign_out" class="btn btn-outline">Sign Out</a>
</div>
</div>
</div>
{% endblock %}

View File

@@ -163,6 +163,16 @@ def admin_app_disabled(mock_http_client_admin: MockHttpClient) -> Any:
return app
@pytest.fixture
def auth_headers() -> dict:
"""Authentication headers for admin requests."""
return {
"X-Forwarded-User": "test-user-id",
"X-Forwarded-Email": "test@example.com",
"X-Forwarded-Preferred-Username": "testuser",
}
@pytest.fixture
def admin_client(admin_app: Any, mock_http_client_admin: MockHttpClient) -> TestClient:
"""Create a test client with admin enabled."""
@@ -182,25 +192,31 @@ def admin_client_disabled(
class TestAdminHome:
"""Tests for admin home page."""
def test_admin_home_enabled(self, admin_client):
def test_admin_home_enabled(self, admin_client, auth_headers):
"""Test admin home page when enabled."""
response = admin_client.get("/a/")
response = admin_client.get("/a/", headers=auth_headers)
assert response.status_code == 200
assert "Admin" in response.text
assert "Node Tags" in response.text
def test_admin_home_disabled(self, admin_client_disabled):
def test_admin_home_disabled(self, admin_client_disabled, auth_headers):
"""Test admin home page when disabled."""
response = admin_client_disabled.get("/a/")
response = admin_client_disabled.get("/a/", headers=auth_headers)
assert response.status_code == 404
def test_admin_home_unauthenticated(self, admin_client):
"""Test admin home page without authentication."""
response = admin_client.get("/a/")
assert response.status_code == 403
assert "Access Denied" in response.text
class TestAdminNodeTags:
"""Tests for admin node tags page."""
def test_node_tags_page_no_selection(self, admin_client):
def test_node_tags_page_no_selection(self, admin_client, auth_headers):
"""Test node tags page without selecting a node."""
response = admin_client.get("/a/node-tags")
response = admin_client.get("/a/node-tags", headers=auth_headers)
assert response.status_code == 200
assert "Node Tags" in response.text
assert "Select a Node" in response.text
@@ -208,10 +224,11 @@ class TestAdminNodeTags:
assert "Node One" in response.text
assert "Node Two" in response.text
def test_node_tags_page_with_selection(self, admin_client):
def test_node_tags_page_with_selection(self, admin_client, auth_headers):
"""Test node tags page with a node selected."""
response = admin_client.get(
"/a/node-tags?public_key=abc123def456abc123def456abc123de"
"/a/node-tags?public_key=abc123def456abc123def456abc123de",
headers=auth_headers,
)
assert response.status_code == 200
assert "Node Tags" in response.text
@@ -221,34 +238,42 @@ class TestAdminNodeTags:
assert "location" in response.text
assert "building-a" in response.text
def test_node_tags_page_disabled(self, admin_client_disabled):
def test_node_tags_page_disabled(self, admin_client_disabled, auth_headers):
"""Test node tags page when admin is disabled."""
response = admin_client_disabled.get("/a/node-tags")
response = admin_client_disabled.get("/a/node-tags", headers=auth_headers)
assert response.status_code == 404
def test_node_tags_page_with_message(self, admin_client):
def test_node_tags_page_with_message(self, admin_client, auth_headers):
"""Test node tags page displays success message."""
response = admin_client.get(
"/a/node-tags?public_key=abc123def456abc123def456abc123de"
"&message=Tag%20created%20successfully"
"&message=Tag%20created%20successfully",
headers=auth_headers,
)
assert response.status_code == 200
assert "Tag created successfully" in response.text
def test_node_tags_page_with_error(self, admin_client):
def test_node_tags_page_with_error(self, admin_client, auth_headers):
"""Test node tags page displays error message."""
response = admin_client.get(
"/a/node-tags?public_key=abc123def456abc123def456abc123de"
"&error=Tag%20already%20exists"
"&error=Tag%20already%20exists",
headers=auth_headers,
)
assert response.status_code == 200
assert "Tag already exists" in response.text
def test_node_tags_page_unauthenticated(self, admin_client):
"""Test node tags page without authentication."""
response = admin_client.get("/a/node-tags")
assert response.status_code == 403
assert "Access Denied" in response.text
class TestAdminCreateTag:
"""Tests for creating node tags."""
def test_create_tag_success(self, admin_client):
def test_create_tag_success(self, admin_client, auth_headers):
"""Test creating a new tag."""
response = admin_client.post(
"/a/node-tags",
@@ -258,13 +283,14 @@ class TestAdminCreateTag:
"value": "new_value",
"value_type": "string",
},
headers=auth_headers,
follow_redirects=False,
)
assert response.status_code == 303
assert "message=" in response.headers["location"]
assert "created" in response.headers["location"]
def test_create_tag_disabled(self, admin_client_disabled):
def test_create_tag_disabled(self, admin_client_disabled, auth_headers):
"""Test creating tag when admin is disabled."""
response = admin_client_disabled.post(
"/a/node-tags",
@@ -274,15 +300,30 @@ class TestAdminCreateTag:
"value": "new_value",
"value_type": "string",
},
headers=auth_headers,
follow_redirects=False,
)
assert response.status_code == 404
def test_create_tag_unauthenticated(self, admin_client):
"""Test creating tag without authentication."""
response = admin_client.post(
"/a/node-tags",
data={
"public_key": "abc123def456abc123def456abc123de",
"key": "new_tag",
"value": "new_value",
"value_type": "string",
},
follow_redirects=False,
)
assert response.status_code == 403
class TestAdminUpdateTag:
"""Tests for updating node tags."""
def test_update_tag_success(self, admin_client):
def test_update_tag_success(self, admin_client, auth_headers):
"""Test updating a tag."""
response = admin_client.post(
"/a/node-tags/update",
@@ -292,6 +333,7 @@ class TestAdminUpdateTag:
"value": "staging",
"value_type": "string",
},
headers=auth_headers,
follow_redirects=False,
)
assert response.status_code == 303
@@ -299,7 +341,7 @@ class TestAdminUpdateTag:
assert "updated" in response.headers["location"]
def test_update_tag_not_found(
self, admin_app, mock_http_client_admin: MockHttpClient
self, admin_app, mock_http_client_admin: MockHttpClient, auth_headers
):
"""Test updating a non-existent tag returns error."""
# Set up 404 response for this specific tag
@@ -320,13 +362,14 @@ class TestAdminUpdateTag:
"value": "value",
"value_type": "string",
},
headers=auth_headers,
follow_redirects=False,
)
assert response.status_code == 303
assert "error=" in response.headers["location"]
assert "not+found" in response.headers["location"].lower()
def test_update_tag_disabled(self, admin_client_disabled):
def test_update_tag_disabled(self, admin_client_disabled, auth_headers):
"""Test updating tag when admin is disabled."""
response = admin_client_disabled.post(
"/a/node-tags/update",
@@ -336,6 +379,7 @@ class TestAdminUpdateTag:
"value": "staging",
"value_type": "string",
},
headers=auth_headers,
follow_redirects=False,
)
assert response.status_code == 404
@@ -344,7 +388,7 @@ class TestAdminUpdateTag:
class TestAdminMoveTag:
"""Tests for moving node tags."""
def test_move_tag_success(self, admin_client):
def test_move_tag_success(self, admin_client, auth_headers):
"""Test moving a tag to another node."""
response = admin_client.post(
"/a/node-tags/move",
@@ -353,6 +397,7 @@ class TestAdminMoveTag:
"key": "environment",
"new_public_key": "xyz789xyz789xyz789xyz789xyz789xy",
},
headers=auth_headers,
follow_redirects=False,
)
assert response.status_code == 303
@@ -365,7 +410,7 @@ class TestAdminMoveTag:
class TestAdminDeleteTag:
"""Tests for deleting node tags."""
def test_delete_tag_success(self, admin_client):
def test_delete_tag_success(self, admin_client, auth_headers):
"""Test deleting a tag."""
response = admin_client.post(
"/a/node-tags/delete",
@@ -373,6 +418,7 @@ class TestAdminDeleteTag:
"public_key": "abc123def456abc123def456abc123de",
"key": "environment",
},
headers=auth_headers,
follow_redirects=False,
)
assert response.status_code == 303