mirror of
https://github.com/johndoe6345789/goodpackagerepo.git
synced 2026-04-24 13:54:59 +00:00
93 lines
2.7 KiB
Python
93 lines
2.7 KiB
Python
"""
|
|
Authentication and user management module using SQLAlchemy.
|
|
"""
|
|
|
|
import bcrypt
|
|
import jwt
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict, Any
|
|
from models import User, UsersSession
|
|
|
|
|
|
def init_db():
|
|
"""Initialize the database with default admin user if needed."""
|
|
session = UsersSession()
|
|
try:
|
|
# Check if admin user exists
|
|
admin = session.query(User).filter_by(username='admin').first()
|
|
if not admin:
|
|
# Create default admin user (admin/admin)
|
|
password_hash = bcrypt.hashpw("admin".encode('utf-8'), bcrypt.gensalt())
|
|
now = datetime.utcnow().isoformat() + "Z"
|
|
admin = User(
|
|
username='admin',
|
|
password_hash=password_hash.decode('utf-8'),
|
|
scopes='read,write,admin',
|
|
created_at=now,
|
|
updated_at=now
|
|
)
|
|
session.add(admin)
|
|
session.commit()
|
|
finally:
|
|
session.close()
|
|
|
|
|
|
def verify_password(username: str, password: str) -> Optional[Dict[str, Any]]:
|
|
"""Verify username and password, return user data if valid."""
|
|
session = UsersSession()
|
|
try:
|
|
user = session.query(User).filter_by(username=username).first()
|
|
|
|
if not user:
|
|
return None
|
|
|
|
# Verify password
|
|
if bcrypt.checkpw(password.encode('utf-8'), user.password_hash.encode('utf-8')):
|
|
return {
|
|
'id': user.id,
|
|
'username': user.username,
|
|
'scopes': user.scopes.split(',')
|
|
}
|
|
|
|
return None
|
|
finally:
|
|
session.close()
|
|
|
|
|
|
def change_password(username: str, old_password: str, new_password: str) -> bool:
|
|
"""Change user password."""
|
|
# Verify old password first
|
|
user_data = verify_password(username, old_password)
|
|
if not user_data:
|
|
return False
|
|
|
|
# Hash new password
|
|
password_hash = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
|
|
now = datetime.utcnow().isoformat() + "Z"
|
|
|
|
session = UsersSession()
|
|
try:
|
|
user = session.query(User).filter_by(username=username).first()
|
|
if user:
|
|
user.password_hash = password_hash.decode('utf-8')
|
|
user.updated_at = now
|
|
session.commit()
|
|
return True
|
|
return False
|
|
finally:
|
|
session.close()
|
|
|
|
|
|
def generate_token(user: Dict[str, Any], secret: str, expires_hours: int = 24) -> str:
|
|
"""Generate JWT token for user."""
|
|
payload = {
|
|
'sub': user['username'],
|
|
'scopes': user['scopes'],
|
|
'exp': datetime.utcnow() + timedelta(hours=expires_hours)
|
|
}
|
|
return jwt.encode(payload, secret, algorithm='HS256')
|
|
|
|
|
|
# Initialize database on module import
|
|
init_db()
|