forked from iarv/pyMC_Repeater
- Implemented JWT authentication with auto-generated secret if not provided. - Added API token management functionality. - Created authentication endpoints for login, token refresh, verification, and password change. - Introduced API documentation endpoints for Swagger UI and OpenAPI spec. - Enhanced CORS support for API and documentation endpoints. - Updated OpenAPI specification to include new authentication and system endpoints.
39 lines
1.1 KiB
Python
39 lines
1.1 KiB
Python
import jwt
|
|
import time
|
|
from typing import Dict, Optional
|
|
import logging
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
class JWTHandler:
    """Issue and validate HS256-signed JSON Web Tokens.

    Tokens carry the claims ``sub`` (username), ``iat`` (issue time),
    ``exp`` (expiry time) and ``client_id``. Both timestamps are integer
    seconds taken from ``time.time()``.
    """

    def __init__(self, secret: str, expiry_minutes: int = 15):
        """Store the signing secret and the token lifetime.

        Args:
            secret: Shared secret used to sign and verify tokens.
            expiry_minutes: Lifetime of newly created tokens, in minutes.

        Raises:
            ValueError: If ``secret`` is empty or ``None``.
        """
        # An empty HMAC key would let anyone forge a valid signature, so
        # refuse to construct a handler that could only mint insecure tokens.
        if not secret:
            raise ValueError("JWT secret must be a non-empty string")
        self.secret = secret
        self.expiry_minutes = expiry_minutes

    def create_jwt(self, username: str, client_id: str) -> str:
        """Create a signed token for ``username`` bound to ``client_id``.

        Args:
            username: Value stored in the ``sub`` claim.
            client_id: Value stored in the ``client_id`` claim.

        Returns:
            The encoded token as a string.
        """
        now = int(time.time())
        expiry = now + (self.expiry_minutes * 60)

        payload = {
            'sub': username,
            'exp': expiry,
            'iat': now,
            'client_id': client_id,
        }

        token = jwt.encode(payload, self.secret, algorithm='HS256')
        # PyJWT 1.x returns bytes while 2.x returns str; normalise so the
        # declared return type holds on either version.
        if isinstance(token, bytes):
            token = token.decode('ascii')

        # Only the first 8 characters of client_id are written to the log.
        logger.info(
            "Created JWT for user '%s' with client_id '%s...'",
            username,
            client_id[:8],
        )
        return token

    def verify_jwt(self, token: str) -> Optional[Dict]:
        """Decode ``token`` and return its claims.

        Args:
            token: Encoded token to check against this handler's secret.

        Returns:
            The decoded payload, or ``None`` if the token has expired or is
            otherwise rejected by ``jwt.decode``. Either failure is logged
            at WARNING level.
        """
        try:
            # Restricting ``algorithms`` to HS256 prevents a token from
            # choosing its own verification algorithm.
            return jwt.decode(token, self.secret, algorithms=['HS256'])
        except jwt.ExpiredSignatureError:
            # Must precede InvalidTokenError: in PyJWT the expired error is
            # a subclass of it, and expiry gets its own log message.
            logger.warning("JWT token expired")
            return None
        except jwt.InvalidTokenError as e:
            logger.warning("Invalid JWT token: %s", e)
            return None