Files
pyMC_Repeater/repeater/web/auth_endpoints.py
agessaman c2f8a2e3cd refactor: companion FrameServer and related (substantive only, no Black)
Reapply refactor from ce8381a (replace monolithic FrameServer with thin
pymc_core subclass, re-export constants, SQLite persistence hooks) while
preserving pre-refactor whitespace where patch applied cleanly. Remaining
files match refactor commit exactly. Diff vs ce8381a is whitespace-only.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-21 15:35:47 -08:00

451 lines
16 KiB
Python

"""
Authentication endpoints for login and token management
"""
import logging
import cherrypy
from .auth.middleware import require_auth
logger = logging.getLogger(__name__)
class AuthAPIEndpoints:
"""Nested endpoint for /api/auth/* RESTful routes"""
def __init__(self):
# Create tokens nested endpoint for /api/auth/tokens
self.tokens = TokensAPIEndpoint()
class TokensAPIEndpoint:
"""RESTful token management endpoints for /api/auth/tokens"""
@cherrypy.expose
@cherrypy.tools.json_out()
@require_auth
def index(self):
# Handle CORS preflight
if cherrypy.request.method == "OPTIONS":
return {}
# Get token manager from cherrypy config
token_manager = cherrypy.config.get("token_manager")
if not token_manager:
cherrypy.response.status = 500
return {"success": False, "error": "Token manager not available"}
if cherrypy.request.method == "GET":
try:
tokens = token_manager.list_tokens()
return {"success": True, "tokens": tokens}
except Exception as e:
logger.error(f"Token list error: {e}")
cherrypy.response.status = 500
return {"success": False, "error": "Failed to list tokens"}
elif cherrypy.request.method == "POST":
try:
import json
body = cherrypy.request.body.read().decode("utf-8")
data = json.loads(body) if body else {}
name = data.get("name", "").strip()
if not name:
cherrypy.response.status = 400
return {"success": False, "error": "Token name is required"}
# Create the token
token_id, plaintext_token = token_manager.create_token(name)
logger.info(
f"Generated API token '{name}' (ID: {token_id}) by user {cherrypy.request.user['username']}"
)
return {
"success": True,
"token": plaintext_token,
"token_id": token_id,
"name": name,
"warning": "Save this token securely - it will not be shown again",
}
except Exception as e:
logger.error(f"Token generation error: {e}")
cherrypy.response.status = 500
return {"success": False, "error": "Failed to generate token"}
else:
raise cherrypy.HTTPError(405, "Method not allowed")
@cherrypy.expose
@cherrypy.tools.json_out()
@require_auth
def default(self, token_id=None):
# Handle CORS preflight
if cherrypy.request.method == "OPTIONS":
return {}
# Get token manager from cherrypy config
token_manager = cherrypy.config.get("token_manager")
if not token_manager:
cherrypy.response.status = 500
return {"success": False, "error": "Token manager not available"}
if cherrypy.request.method == "DELETE":
try:
if not token_id:
cherrypy.response.status = 400
return {"success": False, "error": "Token ID is required"}
# Convert to int
try:
token_id_int = int(token_id)
except ValueError:
cherrypy.response.status = 400
return {"success": False, "error": "Invalid token ID"}
# Revoke the token
success = token_manager.revoke_token(token_id_int)
if success:
logger.info(
f"Revoked API token ID {token_id_int} by user {cherrypy.request.user['username']}"
)
return {"success": True, "message": "Token revoked successfully"}
else:
cherrypy.response.status = 404
return {"success": False, "error": "Token not found"}
except Exception as e:
logger.error(f"Token revocation error: {e}")
cherrypy.response.status = 500
return {"success": False, "error": "Failed to revoke token"}
else:
raise cherrypy.HTTPError(405, "Method not allowed")
class AuthEndpoints:
def __init__(self, config, jwt_handler, token_manager, config_manager=None):
self.config = config
self.jwt_handler = jwt_handler
self.token_manager = token_manager
self.config_manager = config_manager
@cherrypy.expose
def login(self, **kwargs):
cherrypy.response.headers["Content-Type"] = "application/json"
# Handle CORS preflight
if cherrypy.request.method == "OPTIONS":
cherrypy.response.headers["Access-Control-Allow-Methods"] = "POST, OPTIONS"
cherrypy.response.headers["Access-Control-Allow-Headers"] = (
"Content-Type, Authorization, X-API-Key"
)
return b""
if cherrypy.request.method != "POST":
raise cherrypy.HTTPError(405, "Method not allowed")
try:
# Parse JSON body manually since we can't use json_in decorator with OPTIONS
import json
body = cherrypy.request.body.read().decode("utf-8")
data = json.loads(body) if body else {}
username = data.get("username", "").strip()
password = data.get("password", "")
client_id = data.get("client_id", "").strip()
if not username or not password or not client_id:
return json.dumps(
{
"success": False,
"error": "Missing required fields: username, password, client_id",
}
).encode("utf-8")
# Validate credentials against config
# Check if username is 'admin' and password matches config
repeater_config = self.config.get("repeater", {})
security_config = repeater_config.get("security", {})
config_password = security_config.get("admin_password", "")
# Don't allow login with empty or unconfigured password
if not config_password:
logger.warning(f"Login attempt rejected - password not configured")
return json.dumps(
{
"success": False,
"error": "System not configured. Please complete setup wizard.",
}
).encode("utf-8")
if username == "admin" and password == config_password:
# Create JWT token
token = self.jwt_handler.create_jwt(username, client_id)
logger.info(
f"Successful login for user '{username}' from client '{client_id[:8]}...'"
)
return json.dumps(
{
"success": True,
"token": token,
"expires_in": self.jwt_handler.expiry_minutes * 60,
"username": username,
}
).encode("utf-8")
else:
logger.warning(f"Failed login attempt for user '{username}'")
# Don't reveal which part was wrong
return json.dumps(
{"success": False, "error": "Invalid username or password"}
).encode("utf-8")
except Exception as e:
logger.error(f"Login error: {e}")
return json.dumps({"success": False, "error": "Internal server error"}).encode("utf-8")
@cherrypy.expose
@cherrypy.tools.json_out()
@require_auth
def verify(self):
if cherrypy.request.method != "GET":
raise cherrypy.HTTPError(405, "Method not allowed")
return {"success": True, "authenticated": True, "user": cherrypy.request.user}
@cherrypy.expose
def refresh(self, **kwargs):
cherrypy.response.headers["Content-Type"] = "application/json"
# Handle CORS preflight
if cherrypy.request.method == "OPTIONS":
cherrypy.response.headers["Access-Control-Allow-Methods"] = "POST, OPTIONS"
cherrypy.response.headers["Access-Control-Allow-Headers"] = (
"Content-Type, Authorization, X-API-Key"
)
return b""
if cherrypy.request.method != "POST":
raise cherrypy.HTTPError(405, "Method not allowed")
try:
import json
# Manual authentication check (can't use @require_auth since we need to handle OPTIONS)
auth_header = cherrypy.request.headers.get("Authorization", "")
api_key = cherrypy.request.headers.get("X-API-Key", "")
jwt_handler = cherrypy.config.get("jwt_handler")
token_manager = cherrypy.config.get("token_manager")
user_info = None
# Check JWT first
if auth_header.startswith("Bearer "):
token = auth_header[7:]
payload = jwt_handler.verify_jwt(token)
if payload:
user_info = {
"username": payload["sub"],
"client_id": payload.get("client_id"),
"auth_method": "jwt",
}
# Check API token
if not user_info and api_key:
token_data = token_manager.verify_token(api_key)
if token_data:
user_info = {
"username": "admin",
"token_id": token_data["id"],
"auth_method": "api_token",
}
if not user_info:
return json.dumps(
{"success": False, "error": "Unauthorized - Valid JWT or API token required"}
).encode("utf-8")
# Parse request body
body = cherrypy.request.body.read().decode("utf-8")
data = json.loads(body) if body else {}
client_id = data.get("client_id", user_info.get("client_id", "")).strip()
if not client_id:
return json.dumps({"success": False, "error": "Client ID is required"}).encode(
"utf-8"
)
# Create new JWT token (refreshes expiry time)
new_token = self.jwt_handler.create_jwt(user_info["username"], client_id)
logger.info(
f"Token refreshed for user '{user_info['username']}' from client '{client_id[:8]}...'"
)
return json.dumps(
{
"success": True,
"token": new_token,
"expires_in": self.jwt_handler.expiry_minutes * 60,
"username": user_info["username"],
}
).encode("utf-8")
except Exception as e:
logger.error(f"Token refresh error: {e}")
return json.dumps({"success": False, "error": "Failed to refresh token"}).encode(
"utf-8"
)
@cherrypy.expose
def change_password(self):
import json
cherrypy.response.headers["Content-Type"] = "application/json"
# Handle CORS preflight
if cherrypy.request.method == "OPTIONS":
cherrypy.response.headers["Access-Control-Allow-Methods"] = "POST, OPTIONS"
cherrypy.response.headers["Access-Control-Allow-Headers"] = (
"Content-Type, Authorization, X-API-Key"
)
return b""
if cherrypy.request.method != "POST":
raise cherrypy.HTTPError(405, "Method not allowed")
# Require authentication for POST
# Get auth handlers from global cherrypy config
jwt_handler = cherrypy.config.get("jwt_handler")
token_manager = cherrypy.config.get("token_manager")
if not jwt_handler or not token_manager:
logger.error("Auth handlers not configured")
raise cherrypy.HTTPError(500, "Authentication not configured")
# Try JWT authentication first
auth_header = cherrypy.request.headers.get("Authorization", "")
user = None
if auth_header.startswith("Bearer "):
token = auth_header[7:] # Remove 'Bearer ' prefix
payload = jwt_handler.verify_jwt(token)
if payload:
user = {
"username": payload["sub"],
"client_id": payload["client_id"],
"auth_type": "jwt",
}
# Try API token authentication if JWT failed
if not user:
api_key = cherrypy.request.headers.get("X-API-Key", "")
if api_key:
token_info = token_manager.verify_token(api_key)
if token_info:
user = {
"username": "api_token",
"token_name": token_info["name"],
"token_id": token_info["id"],
"auth_type": "api_token",
}
if not user:
cherrypy.response.status = 401
return json.dumps(
{"success": False, "error": "Unauthorized - Valid JWT or API token required"}
).encode("utf-8")
try:
# Parse JSON body manually
body = cherrypy.request.body.read().decode("utf-8")
data = json.loads(body) if body else {}
current_password = data.get("current_password", "")
new_password = data.get("new_password", "")
if not current_password or not new_password:
cherrypy.response.status = 400
return json.dumps(
{
"success": False,
"error": "Both current_password and new_password are required",
}
).encode("utf-8")
# Validate new password strength
if len(new_password) < 8:
cherrypy.response.status = 400
return json.dumps(
{"success": False, "error": "New password must be at least 8 characters long"}
).encode("utf-8")
# Verify current password
repeater_config = self.config.get("repeater", {})
security_config = repeater_config.get("security", {})
config_password = security_config.get("admin_password", "")
if not config_password:
cherrypy.response.status = 500
return json.dumps({"success": False, "error": "System configuration error"}).encode(
"utf-8"
)
if current_password != config_password:
cherrypy.response.status = 401
return json.dumps(
{"success": False, "error": "Current password is incorrect"}
).encode("utf-8")
# Update password in config
if "repeater" not in self.config:
self.config["repeater"] = {}
if "security" not in self.config["repeater"]:
self.config["repeater"]["security"] = {}
self.config["repeater"]["security"]["admin_password"] = new_password
# Save to config file using ConfigManager
if self.config_manager:
saved, _ = self.config_manager.save_to_file()
if saved:
logger.info(f"Admin password changed successfully by user {user['username']}")
return json.dumps(
{
"success": True,
"message": "Password changed successfully. Please log in again with your new password.",
}
).encode("utf-8")
else:
cherrypy.response.status = 500
return json.dumps(
{"success": False, "error": "Failed to save password to config file"}
).encode("utf-8")
else:
cherrypy.response.status = 500
return json.dumps(
{"success": False, "error": "Config manager not available"}
).encode("utf-8")
except Exception as e:
logger.error(f"Password change error: {e}")
cherrypy.response.status = 500
return json.dumps({"success": False, "error": "Failed to change password"}).encode(
"utf-8"
)