Files
goodpackagerepo/backend/auth_sqlalchemy.py
2025-12-29 09:04:28 +00:00

93 lines
2.7 KiB
Python

"""
Authentication and user management module using SQLAlchemy.
"""
import bcrypt
import jwt
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from models import User, UsersSession
def init_db():
"""Initialize the database with default admin user if needed."""
session = UsersSession()
try:
# Check if admin user exists
admin = session.query(User).filter_by(username='admin').first()
if not admin:
# Create default admin user (admin/admin)
password_hash = bcrypt.hashpw("admin".encode('utf-8'), bcrypt.gensalt())
now = datetime.utcnow().isoformat() + "Z"
admin = User(
username='admin',
password_hash=password_hash.decode('utf-8'),
scopes='read,write,admin',
created_at=now,
updated_at=now
)
session.add(admin)
session.commit()
finally:
session.close()
def verify_password(username: str, password: str) -> Optional[Dict[str, Any]]:
"""Verify username and password, return user data if valid."""
session = UsersSession()
try:
user = session.query(User).filter_by(username=username).first()
if not user:
return None
# Verify password
if bcrypt.checkpw(password.encode('utf-8'), user.password_hash.encode('utf-8')):
return {
'id': user.id,
'username': user.username,
'scopes': user.scopes.split(',')
}
return None
finally:
session.close()
def change_password(username: str, old_password: str, new_password: str) -> bool:
"""Change user password."""
# Verify old password first
user_data = verify_password(username, old_password)
if not user_data:
return False
# Hash new password
password_hash = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
now = datetime.utcnow().isoformat() + "Z"
session = UsersSession()
try:
user = session.query(User).filter_by(username=username).first()
if user:
user.password_hash = password_hash.decode('utf-8')
user.updated_at = now
session.commit()
return True
return False
finally:
session.close()
def generate_token(user: Dict[str, Any], secret: str, expires_hours: int = 24) -> str:
"""Generate JWT token for user."""
payload = {
'sub': user['username'],
'scopes': user['scopes'],
'exp': datetime.utcnow() + timedelta(hours=expires_hours)
}
return jwt.encode(payload, secret, algorithm='HS256')
# Initialize database on module import
init_db()