mirror of
https://github.com/johndoe6345789/goodpackagerepo.git
synced 2026-04-24 13:54:59 +00:00
114 lines
3.1 KiB
Python
114 lines
3.1 KiB
Python
"""
|
|
Authentication and user management module using SQLite.
|
|
"""
|
|
|
|
import sqlite3
|
|
import bcrypt
|
|
import jwt
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any
|
|
|
|
# Database path
|
|
DB_PATH = Path(__file__).parent / "users.db"
|
|
|
|
|
|
def get_db():
|
|
"""Get database connection."""
|
|
conn = sqlite3.connect(str(DB_PATH))
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
|
|
def init_db():
|
|
"""Initialize the database with users table and default admin user."""
|
|
conn = get_db()
|
|
cursor = conn.cursor()
|
|
|
|
# Create users table
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
username TEXT UNIQUE NOT NULL,
|
|
password_hash TEXT NOT NULL,
|
|
scopes TEXT NOT NULL,
|
|
created_at TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL
|
|
)
|
|
""")
|
|
|
|
# Check if admin user exists
|
|
cursor.execute("SELECT id FROM users WHERE username = ?", ("admin",))
|
|
if not cursor.fetchone():
|
|
# Create default admin user (admin/admin)
|
|
password_hash = bcrypt.hashpw("admin".encode('utf-8'), bcrypt.gensalt())
|
|
now = datetime.utcnow().isoformat() + "Z"
|
|
cursor.execute("""
|
|
INSERT INTO users (username, password_hash, scopes, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""", ("admin", password_hash.decode('utf-8'), "read,write,admin", now, now))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def verify_password(username: str, password: str) -> Optional[Dict[str, Any]]:
|
|
"""Verify username and password, return user data if valid."""
|
|
conn = get_db()
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
|
|
user = cursor.fetchone()
|
|
conn.close()
|
|
|
|
if not user:
|
|
return None
|
|
|
|
# Verify password
|
|
if bcrypt.checkpw(password.encode('utf-8'), user['password_hash'].encode('utf-8')):
|
|
return {
|
|
'id': user['id'],
|
|
'username': user['username'],
|
|
'scopes': user['scopes'].split(',')
|
|
}
|
|
|
|
return None
|
|
|
|
|
|
def change_password(username: str, old_password: str, new_password: str) -> bool:
|
|
"""Change user password."""
|
|
# Verify old password first
|
|
user = verify_password(username, old_password)
|
|
if not user:
|
|
return False
|
|
|
|
# Hash new password
|
|
password_hash = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
|
|
now = datetime.utcnow().isoformat() + "Z"
|
|
|
|
conn = get_db()
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
UPDATE users
|
|
SET password_hash = ?, updated_at = ?
|
|
WHERE username = ?
|
|
""", (password_hash.decode('utf-8'), now, username))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return True
|
|
|
|
|
|
def generate_token(user: Dict[str, Any], secret: str, expires_hours: int = 24) -> str:
|
|
"""Generate JWT token for user."""
|
|
payload = {
|
|
'sub': user['username'],
|
|
'scopes': user['scopes'],
|
|
'exp': datetime.utcnow() + timedelta(hours=expires_hours)
|
|
}
|
|
return jwt.encode(payload, secret, algorithm='HS256')
|
|
|
|
|
|
# Initialize database on module import
|
|
init_db()
|