Files
metabuilder/frontends/workflowui/backend/auth.py
2026-03-09 22:30:41 +00:00

290 lines
8.8 KiB
Python

"""
Authentication Module
JWT-based user authentication for WorkflowUI
"""
from flask import request, jsonify
from functools import wraps
from datetime import datetime, timedelta
import jwt
import hashlib
import os
from typing import Dict, Tuple, Optional
# Get secret key from environment or use default (change in production!)
SECRET_KEY = os.getenv('JWT_SECRET_KEY', 'your-secret-key-change-in-production')
JWT_EXPIRATION_HOURS = 24
class AuthError(Exception):
"""Custom exception for auth errors"""
def __init__(self, message: str, status_code: int = 401):
self.message = message
self.status_code = status_code
def hash_password(password: str) -> str:
"""Hash password using SHA-512"""
return hashlib.sha512(password.encode()).hexdigest()
def verify_password(password: str, password_hash: str) -> bool:
"""Verify password against hash"""
return hash_password(password) == password_hash
def generate_token(user_id: str, email: str) -> str:
"""Generate JWT token for user"""
payload = {
'user_id': user_id,
'email': email,
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(hours=JWT_EXPIRATION_HOURS)
}
return jwt.encode(payload, SECRET_KEY, algorithm='HS256')
def verify_token(token: str) -> Optional[Dict]:
"""Verify JWT token and return payload"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
return payload
except jwt.ExpiredSignatureError:
raise AuthError('Token has expired', 401)
except jwt.InvalidTokenError:
raise AuthError('Invalid token', 401)
def token_required(f):
"""Decorator to require valid JWT token"""
@wraps(f)
def decorated(*args, **kwargs):
token = None
# Check for token in headers
if 'Authorization' in request.headers:
auth_header = request.headers['Authorization']
try:
token = auth_header.split(" ")[1]
except IndexError:
raise AuthError('Invalid authorization header', 401)
if not token:
raise AuthError('Token is missing', 401)
try:
payload = verify_token(token)
request.user_id = payload['user_id']
request.email = payload['email']
except AuthError as e:
raise e
return f(*args, **kwargs)
return decorated
def validate_email(email: str) -> bool:
"""Simple email validation"""
import re
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def validate_password(password: str) -> Tuple[bool, Optional[str]]:
"""Validate password strength"""
if len(password) < 8:
return False, 'Password must be at least 8 characters'
if not any(c.isupper() for c in password):
return False, 'Password must contain uppercase letters'
if not any(c.islower() for c in password):
return False, 'Password must contain lowercase letters'
if not any(c.isdigit() for c in password):
return False, 'Password must contain numbers'
return True, None
def register_auth_routes(app):
"""Register authentication routes with Flask app"""
@app.route('/api/auth/register', methods=['POST'])
def register():
"""Register new user"""
try:
data = request.get_json()
# Validate input
if not data:
return jsonify({'error': 'No data provided'}), 400
email = data.get('email', '').strip()
password = data.get('password', '')
name = data.get('name', '').strip()
# Validate email
if not email or not validate_email(email):
return jsonify({'error': 'Invalid email address'}), 400
# Validate name
if not name or len(name) < 2:
return jsonify({'error': 'Name must be at least 2 characters'}), 400
# Validate password
is_valid, error_msg = validate_password(password)
if not is_valid:
return jsonify({'error': error_msg}), 400
# Check if user already exists
from models import User, db
existing_user = User.query.filter_by(email=email).first()
if existing_user:
return jsonify({'error': 'Email already registered'}), 409
# Create new user
user_id = f'user_{datetime.utcnow().timestamp()}'
password_hash = hash_password(password)
user = User(
id=user_id,
email=email,
password_hash=password_hash,
name=name,
created_at=datetime.utcnow()
)
db.session.add(user)
db.session.commit()
# Generate token
token = generate_token(user_id, email)
return jsonify({
'success': True,
'user': {
'id': user.id,
'email': user.email,
'name': user.name
},
'token': token
}), 201
except Exception as e:
db.session.rollback()
return jsonify({'error': str(e)}), 500
@app.route('/api/auth/login', methods=['POST'])
def login():
"""Login user"""
try:
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
email = data.get('email', '').strip()
password = data.get('password', '')
if not email or not password:
return jsonify({'error': 'Email and password required'}), 400
from models import User
# Find user
user = User.query.filter_by(email=email).first()
if not user or not verify_password(password, user.password_hash):
return jsonify({'error': 'Invalid email or password'}), 401
# Generate token
token = generate_token(user.id, user.email)
return jsonify({
'success': True,
'user': {
'id': user.id,
'email': user.email,
'name': user.name
},
'token': token
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/auth/me', methods=['GET'])
@token_required
def get_current_user():
"""Get current user info"""
try:
from models import User
user = User.query.filter_by(id=request.user_id).first()
if not user:
return jsonify({'error': 'User not found'}), 404
return jsonify({
'id': user.id,
'email': user.email,
'name': user.name,
'created_at': user.created_at.isoformat()
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/auth/logout', methods=['POST'])
@token_required
def logout():
"""Logout user (token invalidation on client side)"""
# JWT tokens are stateless, so logout is client-side only
# In production, you might maintain a token blacklist
return jsonify({'success': True, 'message': 'Logged out successfully'}), 200
@app.route('/api/auth/change-password', methods=['POST'])
@token_required
def change_password():
"""Change user password"""
try:
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
current_password = data.get('current_password', '')
new_password = data.get('new_password', '')
if not current_password or not new_password:
return jsonify({'error': 'Current and new password required'}), 400
from models import User
user = User.query.filter_by(id=request.user_id).first()
if not user:
return jsonify({'error': 'User not found'}), 404
# Verify current password
if not verify_password(current_password, user.password_hash):
return jsonify({'error': 'Current password is incorrect'}), 401
# Validate new password
is_valid, error_msg = validate_password(new_password)
if not is_valid:
return jsonify({'error': error_msg}), 400
# Update password
user.password_hash = hash_password(new_password)
user.updated_at = datetime.utcnow()
db.session.commit()
return jsonify({'success': True, 'message': 'Password changed successfully'}), 200
except Exception as e:
db.session.rollback()
return jsonify({'error': str(e)}), 500