diff --git a/src/meshcore_hub/web/routes/admin.py b/src/meshcore_hub/web/routes/admin.py index cb34146..0583c0c 100644 --- a/src/meshcore_hub/web/routes/admin.py +++ b/src/meshcore_hub/web/routes/admin.py @@ -55,6 +55,20 @@ def _get_auth_context(request: Request) -> dict: } +def _is_authenticated(request: Request) -> bool: + """Check if user is authenticated via OAuth2Proxy headers.""" + return bool( + request.headers.get("X-Forwarded-User") + or request.headers.get("X-Forwarded-Email") + ) + + +def _require_auth(request: Request) -> None: + """Require authentication, raise 403 if not authenticated.""" + if not _is_authenticated(request): + raise HTTPException(status_code=403, detail="Access denied") + + @router.get("/", response_class=HTMLResponse) async def admin_home(request: Request) -> HTMLResponse: """Render the admin page with OAuth2Proxy user info.""" @@ -65,6 +79,12 @@ async def admin_home(request: Request) -> HTMLResponse: context["request"] = request context.update(_get_auth_context(request)) + # Check if user is authenticated + if not _is_authenticated(request): + return templates.TemplateResponse( + "admin/access_denied.html", context, status_code=403 + ) + return templates.TemplateResponse("admin/index.html", context) @@ -83,6 +103,12 @@ async def admin_node_tags( context["request"] = request context.update(_get_auth_context(request)) + # Check if user is authenticated + if not _is_authenticated(request): + return templates.TemplateResponse( + "admin/access_denied.html", context, status_code=403 + ) + # Flash messages from redirects context["message"] = message context["error"] = error @@ -146,6 +172,7 @@ async def admin_create_node_tag( ) -> RedirectResponse: """Create a new node tag.""" _check_admin_enabled(request) + _require_auth(request) try: response = await request.app.state.http_client.post( @@ -187,6 +214,7 @@ async def admin_update_node_tag( ) -> RedirectResponse: """Update an existing node tag.""" _check_admin_enabled(request) + _require_auth(request) try: response = await request.app.state.http_client.put( @@ -224,6 +252,7 @@ async def admin_move_node_tag( ) -> RedirectResponse: """Move a node tag to a different node.""" _check_admin_enabled(request) + _require_auth(request) try: response = await request.app.state.http_client.put( @@ -263,6 +292,7 @@ async def admin_delete_node_tag( ) -> RedirectResponse: """Delete a node tag.""" _check_admin_enabled(request) + _require_auth(request) try: response = await request.app.state.http_client.delete( diff --git a/src/meshcore_hub/web/templates/admin/access_denied.html b/src/meshcore_hub/web/templates/admin/access_denied.html new file mode 100644 index 0000000..0f42a4a --- /dev/null +++ b/src/meshcore_hub/web/templates/admin/access_denied.html @@ -0,0 +1,20 @@ +{% extends "base.html" %} + +{% block title %}{{ network_name }} - Access Denied{% endblock %} + +{% block content %} +
+
+ + + +

Access Denied

+

You don't have permission to access the admin area.

+

Please contact the network administrator if you believe this is an error.

+ +
+
+{% endblock %} diff --git a/tests/test_web/test_admin.py b/tests/test_web/test_admin.py index 29261aa..f049387 100644 --- a/tests/test_web/test_admin.py +++ b/tests/test_web/test_admin.py @@ -163,6 +163,16 @@ def admin_app_disabled(mock_http_client_admin: MockHttpClient) -> Any: return app +@pytest.fixture +def auth_headers() -> dict: + """Authentication headers for admin requests.""" + return { + "X-Forwarded-User": "test-user-id", + "X-Forwarded-Email": "test@example.com", + "X-Forwarded-Preferred-Username": "testuser", + } + + @pytest.fixture def admin_client(admin_app: Any, mock_http_client_admin: MockHttpClient) -> TestClient: """Create a test client with admin enabled.""" @@ -182,25 +192,31 @@ def admin_client_disabled( class TestAdminHome: """Tests for admin home page.""" - def test_admin_home_enabled(self, admin_client): + def test_admin_home_enabled(self, admin_client, auth_headers): """Test admin home page when enabled.""" - response = admin_client.get("/a/") + response = admin_client.get("/a/", headers=auth_headers) assert response.status_code == 200 assert "Admin" in response.text assert "Node Tags" in response.text - def test_admin_home_disabled(self, admin_client_disabled): + def test_admin_home_disabled(self, admin_client_disabled, auth_headers): """Test admin home page when disabled.""" - response = admin_client_disabled.get("/a/") + response = admin_client_disabled.get("/a/", headers=auth_headers) assert response.status_code == 404 + def test_admin_home_unauthenticated(self, admin_client): + """Test admin home page without authentication.""" + response = admin_client.get("/a/") + assert response.status_code == 403 + assert "Access Denied" in response.text + class TestAdminNodeTags: """Tests for admin node tags page.""" - def test_node_tags_page_no_selection(self, admin_client): + def test_node_tags_page_no_selection(self, admin_client, auth_headers): """Test node tags page without selecting a node.""" - response = admin_client.get("/a/node-tags") + response = admin_client.get("/a/node-tags", headers=auth_headers) assert response.status_code == 200 assert "Node Tags" in response.text assert "Select a Node" in response.text @@ -208,10 +224,11 @@ class TestAdminNodeTags: assert "Node One" in response.text assert "Node Two" in response.text - def test_node_tags_page_with_selection(self, admin_client): + def test_node_tags_page_with_selection(self, admin_client, auth_headers): """Test node tags page with a node selected.""" response = admin_client.get( - "/a/node-tags?public_key=abc123def456abc123def456abc123de" + "/a/node-tags?public_key=abc123def456abc123def456abc123de", + headers=auth_headers, ) assert response.status_code == 200 assert "Node Tags" in response.text @@ -221,34 +238,42 @@ class TestAdminNodeTags: assert "location" in response.text assert "building-a" in response.text - def test_node_tags_page_disabled(self, admin_client_disabled): + def test_node_tags_page_disabled(self, admin_client_disabled, auth_headers): """Test node tags page when admin is disabled.""" - response = admin_client_disabled.get("/a/node-tags") + response = admin_client_disabled.get("/a/node-tags", headers=auth_headers) assert response.status_code == 404 - def test_node_tags_page_with_message(self, admin_client): + def test_node_tags_page_with_message(self, admin_client, auth_headers): """Test node tags page displays success message.""" response = admin_client.get( "/a/node-tags?public_key=abc123def456abc123def456abc123de" - "&message=Tag%20created%20successfully" + "&message=Tag%20created%20successfully", + headers=auth_headers, ) assert response.status_code == 200 assert "Tag created successfully" in response.text - def test_node_tags_page_with_error(self, admin_client): + def test_node_tags_page_with_error(self, admin_client, auth_headers): """Test node tags page displays error message.""" response = admin_client.get( "/a/node-tags?public_key=abc123def456abc123def456abc123de" - "&error=Tag%20already%20exists" + "&error=Tag%20already%20exists", + headers=auth_headers, ) assert response.status_code == 200 assert "Tag already exists" in response.text + def test_node_tags_page_unauthenticated(self, admin_client): + """Test node tags page without authentication.""" + response = admin_client.get("/a/node-tags") + assert response.status_code == 403 + assert "Access Denied" in response.text + class TestAdminCreateTag: """Tests for creating node tags.""" - def test_create_tag_success(self, admin_client): + def test_create_tag_success(self, admin_client, auth_headers): """Test creating a new tag.""" response = admin_client.post( "/a/node-tags", @@ -258,13 +283,14 @@ class TestAdminCreateTag: "value": "new_value", "value_type": "string", }, + headers=auth_headers, follow_redirects=False, ) assert response.status_code == 303 assert "message=" in response.headers["location"] assert "created" in response.headers["location"] - def test_create_tag_disabled(self, admin_client_disabled): + def test_create_tag_disabled(self, admin_client_disabled, auth_headers): """Test creating tag when admin is disabled.""" response = admin_client_disabled.post( "/a/node-tags", @@ -274,15 +300,30 @@ class TestAdminCreateTag: "value": "new_value", "value_type": "string", }, + headers=auth_headers, follow_redirects=False, ) assert response.status_code == 404 + def test_create_tag_unauthenticated(self, admin_client): + """Test creating tag without authentication.""" + response = admin_client.post( + "/a/node-tags", + data={ + "public_key": "abc123def456abc123def456abc123de", + "key": "new_tag", + "value": "new_value", + "value_type": "string", + }, + follow_redirects=False, + ) + assert response.status_code == 403 + class TestAdminUpdateTag: """Tests for updating node tags.""" - def test_update_tag_success(self, admin_client): + def test_update_tag_success(self, admin_client, auth_headers): """Test updating a tag.""" response = admin_client.post( "/a/node-tags/update", @@ -292,6 +333,7 @@ class TestAdminUpdateTag: "value": "staging", "value_type": "string", }, + headers=auth_headers, follow_redirects=False, ) assert response.status_code == 303 @@ -299,7 +341,7 @@ class TestAdminUpdateTag: assert "updated" in response.headers["location"] def test_update_tag_not_found( - self, admin_app, mock_http_client_admin: MockHttpClient + self, admin_app, mock_http_client_admin: MockHttpClient, auth_headers ): """Test updating a non-existent tag returns error.""" # Set up 404 response for this specific tag @@ -320,13 +362,14 @@ class TestAdminUpdateTag: "value": "value", "value_type": "string", }, + headers=auth_headers, follow_redirects=False, ) assert response.status_code == 303 assert "error=" in response.headers["location"] assert "not+found" in response.headers["location"].lower() - def test_update_tag_disabled(self, admin_client_disabled): + def test_update_tag_disabled(self, admin_client_disabled, auth_headers): """Test updating tag when admin is disabled.""" response = admin_client_disabled.post( "/a/node-tags/update", @@ -336,6 +379,7 @@ class TestAdminUpdateTag: "value": "staging", "value_type": "string", }, + headers=auth_headers, follow_redirects=False, ) assert response.status_code == 404 @@ -344,7 +388,7 @@ class TestAdminUpdateTag: class TestAdminMoveTag: """Tests for moving node tags.""" - def test_move_tag_success(self, admin_client): + def test_move_tag_success(self, admin_client, auth_headers): """Test moving a tag to another node.""" response = admin_client.post( "/a/node-tags/move", @@ -353,6 +397,7 @@ class TestAdminMoveTag: "key": "environment", "new_public_key": "xyz789xyz789xyz789xyz789xyz789xy", }, + headers=auth_headers, follow_redirects=False, ) assert response.status_code == 303 @@ -365,7 +410,7 @@ class TestAdminMoveTag: class TestAdminDeleteTag: """Tests for deleting node tags.""" - def test_delete_tag_success(self, admin_client): + def test_delete_tag_success(self, admin_client, auth_headers): """Test deleting a tag.""" response = admin_client.post( "/a/node-tags/delete", @@ -373,6 +418,7 @@ class TestAdminDeleteTag: "public_key": "abc123def456abc123def456abc123de", "key": "environment", }, + headers=auth_headers, follow_redirects=False, ) assert response.status_code == 303