"""
Authentication endpoints for login and token management
"""

import hmac
import json
import logging

import cherrypy

from .auth.middleware import require_auth

logger = logging.getLogger(__name__)


def _read_json_object():
    """Read the current request body and decode it as a JSON object.

    Returns:
        ``{}`` when the body is empty, the decoded ``dict`` when the body is
        a JSON object, or ``None`` when the body is not valid UTF-8 JSON or
        decodes to something other than an object.
    """
    raw = cherrypy.request.body.read()
    if not raw:
        return {}
    try:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueError subclasses.
        data = json.loads(raw.decode('utf-8'))
    except ValueError:
        return None
    return data if isinstance(data, dict) else None


def _stripped(value):
    """Return ``value.strip()`` for strings and ``''`` for any other value."""
    return value.strip() if isinstance(value, str) else ''


def _secrets_equal(supplied, expected):
    """Compare two secrets, in constant time when both are strings.

    Non-string operands fall back to plain equality so that the result is the
    same as the ``==`` comparison this helper replaces.
    """
    if isinstance(supplied, str) and isinstance(expected, str):
        # Compare bytes: compare_digest() rejects non-ASCII str arguments.
        # 'surrogatepass' keeps lone surrogates (possible in JSON) encodable.
        return hmac.compare_digest(
            supplied.encode('utf-8', 'surrogatepass'),
            expected.encode('utf-8', 'surrogatepass'),
        )
    return supplied == expected


def _json_bytes(payload, status=None):
    """Serialize ``payload`` to UTF-8 JSON bytes, optionally setting the HTTP status."""
    if status is not None:
        cherrypy.response.status = status
    return json.dumps(payload).encode('utf-8')


class AuthAPIEndpoints:
    """Nested endpoint for /api/auth/* RESTful routes"""

    def __init__(self):
        # Create tokens nested endpoint for /api/auth/tokens
        self.tokens = TokensAPIEndpoint()


class TokensAPIEndpoint:
    """RESTful token management endpoints for /api/auth/tokens"""

    @cherrypy.expose
    @cherrypy.tools.json_out()
    @require_auth
    def index(self):
        """List API tokens (GET) or create a new one (POST).

        POST expects a JSON object with a non-empty string ``name``.

        Returns:
            A dict serialized by ``json_out``. Failures carry
            ``success: False`` and set the response status to 400 or 500.

        Raises:
            cherrypy.HTTPError: 405 for methods other than GET, POST, OPTIONS.
        """
        # Handle CORS preflight
        if cherrypy.request.method == 'OPTIONS':
            return {}

        # Get token manager from cherrypy config
        token_manager = cherrypy.config.get('token_manager')
        if not token_manager:
            cherrypy.response.status = 500
            return {'success': False, 'error': 'Token manager not available'}

        method = cherrypy.request.method

        if method == 'GET':
            try:
                tokens = token_manager.list_tokens()
                return {
                    'success': True,
                    'tokens': tokens
                }
            except Exception as e:
                logger.error("Token list error: %s", e)
                cherrypy.response.status = 500
                return {
                    'success': False,
                    'error': 'Failed to list tokens'
                }

        if method == 'POST':
            try:
                data = _read_json_object()
                if data is None:
                    # Malformed input is the client's fault, not a server error.
                    cherrypy.response.status = 400
                    return {
                        'success': False,
                        'error': 'Invalid JSON body'
                    }

                # A missing, null or non-string name is treated as absent.
                name = _stripped(data.get('name'))
                if not name:
                    cherrypy.response.status = 400
                    return {
                        'success': False,
                        'error': 'Token name is required'
                    }

                # Create the token
                token_id, plaintext_token = token_manager.create_token(name)

                logger.info(
                    "Generated API token '%s' (ID: %s) by user %s",
                    name, token_id, cherrypy.request.user['username'],
                )

                return {
                    'success': True,
                    'token': plaintext_token,
                    'token_id': token_id,
                    'name': name,
                    'warning': 'Save this token securely - it will not be shown again'
                }
            except Exception as e:
                logger.error("Token generation error: %s", e)
                cherrypy.response.status = 500
                return {
                    'success': False,
                    'error': 'Failed to generate token'
                }

        raise cherrypy.HTTPError(405, "Method not allowed")

    @cherrypy.expose
    @cherrypy.tools.json_out()
    @require_auth
    def default(self, token_id=None):
        """Revoke the API token identified by ``token_id`` (DELETE).

        Args:
            token_id: Token identifier taken from the URL path; it must be
                convertible to ``int``.

        Returns:
            A dict serialized by ``json_out``. Failures carry
            ``success: False`` and set the status to 400, 404 or 500.

        Raises:
            cherrypy.HTTPError: 405 for methods other than DELETE and OPTIONS.
        """
        # Handle CORS preflight
        if cherrypy.request.method == 'OPTIONS':
            return {}

        # Get token manager from cherrypy config
        token_manager = cherrypy.config.get('token_manager')
        if not token_manager:
            cherrypy.response.status = 500
            return {'success': False, 'error': 'Token manager not available'}

        if cherrypy.request.method != 'DELETE':
            raise cherrypy.HTTPError(405, "Method not allowed")

        try:
            if not token_id:
                cherrypy.response.status = 400
                return {
                    'success': False,
                    'error': 'Token ID is required'
                }

            # Convert to int
            try:
                token_id_int = int(token_id)
            except ValueError:
                cherrypy.response.status = 400
                return {
                    'success': False,
                    'error': 'Invalid token ID'
                }

            # Revoke the token
            if token_manager.revoke_token(token_id_int):
                logger.info(
                    "Revoked API token ID %s by user %s",
                    token_id_int, cherrypy.request.user['username'],
                )
                return {
                    'success': True,
                    'message': 'Token revoked successfully'
                }

            cherrypy.response.status = 404
            return {
                'success': False,
                'error': 'Token not found'
            }
        except Exception as e:
            logger.error("Token revocation error: %s", e)
            cherrypy.response.status = 500
            return {
                'success': False,
                'error': 'Failed to revoke token'
            }


class AuthEndpoints:
    """Login, verification, token refresh and password change endpoints."""

    def __init__(self, config, jwt_handler, token_manager, config_manager=None):
        """Store the collaborators used by the handlers.

        Args:
            config: Mapping read for ``repeater.security.admin_password`` and
                updated in place by ``change_password``.
            jwt_handler: Object providing ``create_jwt`` and ``expiry_minutes``.
            token_manager: Stored on the instance; the handlers in this class
                read the token manager from ``cherrypy.config`` instead.
            config_manager: Optional object providing ``save_to_file()``;
                required for ``change_password`` to succeed.
        """
        self.config = config
        self.jwt_handler = jwt_handler
        self.token_manager = token_manager
        self.config_manager = config_manager

    @cherrypy.expose
    def login(self, **kwargs):
        """Validate admin credentials and issue a JWT (POST).

        Expects a JSON object with ``username``, ``password`` and
        ``client_id``.

        NOTE(review): error responses from this handler do not change the HTTP
        status (unlike ``change_password``). That is kept as-is so existing
        clients are unaffected; confirm whether 4xx codes are wanted here.

        Returns:
            UTF-8 encoded JSON bytes with ``success`` and either the token
            details or an ``error`` message.

        Raises:
            cherrypy.HTTPError: 405 for methods other than POST and OPTIONS.
        """
        cherrypy.response.headers['Content-Type'] = 'application/json'

        # Handle CORS preflight
        if cherrypy.request.method == 'OPTIONS':
            cherrypy.response.headers['Access-Control-Allow-Methods'] = 'POST, OPTIONS'
            cherrypy.response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization, X-API-Key'
            return b''

        if cherrypy.request.method != 'POST':
            raise cherrypy.HTTPError(405, "Method not allowed")

        try:
            # Parse JSON body manually since we can't use json_in decorator with OPTIONS
            data = _read_json_object()
            if data is None:
                return _json_bytes({
                    'success': False,
                    'error': 'Invalid JSON body'
                })

            # Null or non-string identifiers are treated as missing.
            username = _stripped(data.get('username'))
            password = data.get('password', '')
            client_id = _stripped(data.get('client_id'))

            if not username or not password or not client_id:
                return _json_bytes({
                    'success': False,
                    'error': 'Missing required fields: username, password, client_id'
                })

            # Validate credentials against config
            # Check if username is 'admin' and password matches config
            repeater_config = self.config.get('repeater', {})
            security_config = repeater_config.get('security', {})
            config_password = security_config.get('admin_password', '')

            # Don't allow login with empty or unconfigured password
            if not config_password:
                logger.warning("Login attempt rejected - password not configured")
                return _json_bytes({
                    'success': False,
                    'error': 'System not configured. Please complete setup wizard.'
                })

            if username == 'admin' and _secrets_equal(password, config_password):
                # Create JWT token
                token = self.jwt_handler.create_jwt(username, client_id)

                logger.info(
                    "Successful login for user '%s' from client '%s...'",
                    username, client_id[:8],
                )

                return _json_bytes({
                    'success': True,
                    'token': token,
                    'expires_in': self.jwt_handler.expiry_minutes * 60,
                    'username': username
                })

            logger.warning("Failed login attempt for user '%s'", username)
            # Don't reveal which part was wrong
            return _json_bytes({
                'success': False,
                'error': 'Invalid username or password'
            })

        except Exception as e:
            logger.error("Login error: %s", e)
            return _json_bytes({
                'success': False,
                'error': 'Internal server error'
            })

    @cherrypy.expose
    @cherrypy.tools.json_out()
    @require_auth
    def verify(self):
        """Report the authenticated user for the current request (GET).

        Raises:
            cherrypy.HTTPError: 405 for methods other than GET.
        """
        if cherrypy.request.method != 'GET':
            raise cherrypy.HTTPError(405, "Method not allowed")

        return {
            'success': True,
            'authenticated': True,
            'user': cherrypy.request.user
        }

    @cherrypy.expose
    def refresh(self, **kwargs):
        """Issue a fresh JWT for a caller holding a valid JWT or API token (POST).

        The optional JSON body may carry ``client_id``; when it is absent or
        null, the ``client_id`` claim of the presented JWT is used.

        NOTE(review): as in ``login``, error responses do not change the HTTP
        status; kept for client compatibility.

        Returns:
            UTF-8 encoded JSON bytes with ``success`` and either the new token
            details or an ``error`` message.

        Raises:
            cherrypy.HTTPError: 405 for methods other than POST and OPTIONS.
        """
        cherrypy.response.headers['Content-Type'] = 'application/json'

        # Handle CORS preflight
        if cherrypy.request.method == 'OPTIONS':
            cherrypy.response.headers['Access-Control-Allow-Methods'] = 'POST, OPTIONS'
            cherrypy.response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization, X-API-Key'
            return b''

        if cherrypy.request.method != 'POST':
            raise cherrypy.HTTPError(405, "Method not allowed")

        try:
            # Manual authentication check (can't use @require_auth since we need to handle OPTIONS)
            auth_header = cherrypy.request.headers.get('Authorization', '')
            api_key = cherrypy.request.headers.get('X-API-Key', '')

            jwt_handler = cherrypy.config.get('jwt_handler')
            token_manager = cherrypy.config.get('token_manager')

            user_info = None

            # Check JWT first
            if auth_header.startswith('Bearer '):
                payload = jwt_handler.verify_jwt(auth_header[7:])
                if payload:
                    user_info = {
                        'username': payload['sub'],
                        'client_id': payload.get('client_id'),
                        'auth_method': 'jwt'
                    }

            # Check API token
            if not user_info and api_key:
                token_data = token_manager.verify_token(api_key)
                if token_data:
                    user_info = {
                        'username': 'admin',
                        'token_id': token_data['id'],
                        'auth_method': 'api_token'
                    }

            if not user_info:
                return _json_bytes({
                    'success': False,
                    'error': 'Unauthorized - Valid JWT or API token required'
                })

            # Parse request body
            data = _read_json_object()
            if data is None:
                return _json_bytes({
                    'success': False,
                    'error': 'Invalid JSON body'
                })

            # An explicit value in the body wins (even an empty one); fall back
            # to the JWT claim only when the body has no usable client_id.
            requested_client_id = data.get('client_id')
            if requested_client_id is None:
                requested_client_id = user_info.get('client_id')
            client_id = _stripped(requested_client_id)

            if not client_id:
                return _json_bytes({
                    'success': False,
                    'error': 'Client ID is required'
                })

            # Create new JWT token (refreshes expiry time)
            new_token = self.jwt_handler.create_jwt(user_info['username'], client_id)

            logger.info(
                "Token refreshed for user '%s' from client '%s...'",
                user_info['username'], client_id[:8],
            )

            return _json_bytes({
                'success': True,
                'token': new_token,
                'expires_in': self.jwt_handler.expiry_minutes * 60,
                'username': user_info['username']
            })

        except Exception as e:
            logger.error("Token refresh error: %s", e)
            return _json_bytes({
                'success': False,
                'error': 'Failed to refresh token'
            })

    @cherrypy.expose
    def change_password(self):
        """Change the admin password and persist it via the config manager (POST).

        Requires a valid JWT (``Authorization: Bearer``) or API token
        (``X-API-Key``). Expects a JSON object with ``current_password`` and
        ``new_password`` (a string of at least 8 characters).

        The in-memory configuration is only left modified when
        ``config_manager.save_to_file()`` reports success; otherwise the
        previous password is restored.

        Returns:
            UTF-8 encoded JSON bytes; failures set status 400, 401 or 500.

        Raises:
            cherrypy.HTTPError: 405 for methods other than POST and OPTIONS,
                500 when the auth handlers are missing from ``cherrypy.config``.
        """
        cherrypy.response.headers['Content-Type'] = 'application/json'

        # Handle CORS preflight
        if cherrypy.request.method == 'OPTIONS':
            cherrypy.response.headers['Access-Control-Allow-Methods'] = 'POST, OPTIONS'
            cherrypy.response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization, X-API-Key'
            return b''

        if cherrypy.request.method != 'POST':
            raise cherrypy.HTTPError(405, "Method not allowed")

        # Require authentication for POST
        # Get auth handlers from global cherrypy config
        jwt_handler = cherrypy.config.get('jwt_handler')
        token_manager = cherrypy.config.get('token_manager')

        if not jwt_handler or not token_manager:
            logger.error("Auth handlers not configured")
            raise cherrypy.HTTPError(500, "Authentication not configured")

        # Try JWT authentication first
        auth_header = cherrypy.request.headers.get('Authorization', '')
        user = None

        if auth_header.startswith('Bearer '):
            token = auth_header[7:]  # Remove 'Bearer ' prefix
            payload = jwt_handler.verify_jwt(token)
            if payload:
                user = {
                    'username': payload['sub'],
                    # .get(): a JWT without this claim must not crash the handler.
                    'client_id': payload.get('client_id'),
                    'auth_type': 'jwt'
                }

        # Try API token authentication if JWT failed
        if not user:
            api_key = cherrypy.request.headers.get('X-API-Key', '')
            if api_key:
                token_info = token_manager.verify_token(api_key)
                if token_info:
                    user = {
                        'username': 'api_token',
                        'token_name': token_info['name'],
                        'token_id': token_info['id'],
                        'auth_type': 'api_token'
                    }

        if not user:
            return _json_bytes({
                'success': False,
                'error': 'Unauthorized - Valid JWT or API token required'
            }, status=401)

        try:
            # Parse JSON body manually
            data = _read_json_object()
            if data is None:
                return _json_bytes({
                    'success': False,
                    'error': 'Invalid JSON body'
                }, status=400)

            current_password = data.get('current_password', '')
            new_password = data.get('new_password', '')

            if not current_password or not new_password:
                return _json_bytes({
                    'success': False,
                    'error': 'Both current_password and new_password are required'
                }, status=400)

            # A list or dict with len() >= 8 must not be stored as the password.
            if not isinstance(new_password, str):
                return _json_bytes({
                    'success': False,
                    'error': 'New password must be a string'
                }, status=400)

            # Validate new password strength
            if len(new_password) < 8:
                return _json_bytes({
                    'success': False,
                    'error': 'New password must be at least 8 characters long'
                }, status=400)

            # Verify current password
            repeater_config = self.config.get('repeater', {})
            security_config = repeater_config.get('security', {})
            config_password = security_config.get('admin_password', '')

            if not config_password:
                return _json_bytes({
                    'success': False,
                    'error': 'System configuration error'
                }, status=500)

            if not _secrets_equal(current_password, config_password):
                return _json_bytes({
                    'success': False,
                    'error': 'Current password is incorrect'
                }, status=401)

            # Check this before touching the config: without a config manager
            # the change cannot be persisted, so it must not be applied at all.
            if not self.config_manager:
                return _json_bytes({
                    'success': False,
                    'error': 'Config manager not available'
                }, status=500)

            # Update password in config
            security = self.config.setdefault('repeater', {}).setdefault('security', {})
            security['admin_password'] = new_password

            # Save to config file using ConfigManager
            try:
                saved = self.config_manager.save_to_file()
            except Exception:
                # Keep the in-memory config consistent with what is on disk.
                security['admin_password'] = config_password
                raise

            if not saved:
                security['admin_password'] = config_password
                return _json_bytes({
                    'success': False,
                    'error': 'Failed to save password to config file'
                }, status=500)

            logger.info("Admin password changed successfully by user %s", user['username'])
            return _json_bytes({
                'success': True,
                'message': 'Password changed successfully. Please log in again with your new password.'
            })

        except Exception as e:
            logger.error("Password change error: %s", e)
            return _json_bytes({
                'success': False,
                'error': 'Failed to change password'
            }, status=500)