Files
2026-03-09 22:30:41 +00:00

429 lines
16 KiB
Python

"""
SQLAlchemy Database Models
Defines the data schema for users, workflows, executions, node types,
audit logs, workspaces, projects, and project canvas items
"""
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
import json
from typing import Optional, List, Dict, Any
# Shared Flask-SQLAlchemy extension object; every model class in this module
# subclasses db.Model and declares its columns through db.Column.
db = SQLAlchemy()
class User(db.Model):
    """Account record used for authentication.

    Rows live in the ``users`` table and hold the login e-mail, the value of
    ``password_hash`` and a display name, together with creation and
    modification timestamps defaulted from ``datetime.utcnow``.
    """

    __tablename__ = 'users'

    id = db.Column(db.String(255), primary_key=True)

    # NOTE(review): email is covered by unique=True, index=True and the
    # explicit 'idx_email' index below, so it ends up indexed more than once;
    # consider consolidating in a schema migration.
    email = db.Column(db.String(255), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(255), nullable=False)
    name = db.Column(db.String(255), nullable=False)

    # Timestamps: both default to "now"; updated_at is refreshed on UPDATE.
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
        nullable=False,
    )

    __table_args__ = (db.Index('idx_email', 'email'),)
class Workflow(db.Model):
    """Workflow model representing a complete DAG workflow.

    Nodes, connections and tags are persisted as JSON-encoded text columns;
    :meth:`to_dict` decodes them and :meth:`from_dict` encodes them.
    """

    __tablename__ = 'workflows'

    id = db.Column(db.String(255), primary_key=True)
    name = db.Column(db.String(255), nullable=False)
    description = db.Column(db.Text, default='')
    version = db.Column(db.String(50), default='1.0.0')
    # NOTE(review): tenant_id is indexed both via index=True here and via
    # 'idx_workflow_tenant_id' below; one of the two looks redundant.
    tenant_id = db.Column(db.String(255), nullable=False, index=True)

    # JSON fields for workflow structure
    nodes_json = db.Column(db.Text, default='[]')  # Array of node objects
    connections_json = db.Column(db.Text, default='[]')  # Array of edge objects
    tags_json = db.Column(db.Text, default='[]')  # Array of tag strings

    # Project organization (NEW)
    project_id = db.Column(db.String(255), db.ForeignKey('projects.id'), nullable=True)
    workspace_id = db.Column(db.String(255), db.ForeignKey('workspaces.id'), nullable=True)
    starred = db.Column(db.Boolean, default=False)

    # Metadata
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)

    # Relationships
    executions = db.relationship('Execution', backref='workflow', cascade='all, delete-orphan', lazy=True)

    # Indexes for efficient querying
    __table_args__ = (
        db.Index('idx_workflow_tenant_id', 'tenant_id'),
        db.Index('idx_workflow_tenant_name', 'tenant_id', 'name'),
        db.Index('idx_workflow_project_id', 'project_id'),
        db.Index('idx_workflow_workspace_id', 'workspace_id'),
        db.Index('idx_workflow_tenant_project', 'tenant_id', 'project_id'),
    )

    @staticmethod
    def _epoch_ms(value: Optional[datetime]) -> Optional[int]:
        """Return *value* as whole milliseconds since the Unix epoch.

        Returns None when *value* is None (e.g. the column default has not
        been applied yet because the instance was never flushed).

        Naive datetimes are interpreted as UTC because the column defaults
        are ``datetime.utcnow``; calling ``.timestamp()`` on them directly
        would treat them as local time and skew the result by the host's
        UTC offset. Assumes every naive value stored here is UTC.
        """
        if value is None:
            return None
        if value.utcoffset() is None:
            seconds = (value - datetime(1970, 1, 1)).total_seconds()
        else:
            seconds = value.timestamp()
        return int(seconds * 1000)

    def to_dict(self) -> Dict[str, Any]:
        """Convert model to a camelCase dictionary.

        JSON columns are decoded; a NULL/unset JSON column yields an empty
        list. Timestamps are epoch milliseconds (or None when unset).
        """
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'version': self.version,
            'tenantId': self.tenant_id,
            'projectId': self.project_id,
            'workspaceId': self.workspace_id,
            'starred': self.starred,
            # Guard against None so an unflushed instance or a NULL row does
            # not make json.loads raise TypeError.
            'nodes': json.loads(self.nodes_json) if self.nodes_json else [],
            'connections': json.loads(self.connections_json) if self.connections_json else [],
            'tags': json.loads(self.tags_json) if self.tags_json else [],
            'createdAt': self._epoch_ms(self.created_at),
            'updatedAt': self._epoch_ms(self.updated_at),
        }

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> 'Workflow':
        """Create a (not yet persisted) model from a camelCase dictionary.

        Missing keys fall back to: name 'Untitled', empty description,
        version '1.0.0', tenant 'default', starred False and empty
        node/connection/tag lists.
        """
        return Workflow(
            id=data.get('id'),
            name=data.get('name', 'Untitled'),
            description=data.get('description', ''),
            version=data.get('version', '1.0.0'),
            tenant_id=data.get('tenantId', 'default'),
            project_id=data.get('projectId'),
            workspace_id=data.get('workspaceId'),
            starred=data.get('starred', False),
            nodes_json=json.dumps(data.get('nodes', [])),
            connections_json=json.dumps(data.get('connections', [])),
            tags_json=json.dumps(data.get('tags', [])),
        )
class Execution(db.Model):
    """Execution model representing a workflow execution run.

    Per-node results, the error object, the input parameters and the final
    output are persisted as JSON-encoded text columns.
    """

    __tablename__ = 'executions'

    id = db.Column(db.String(255), primary_key=True)
    # NOTE(review): workflow_id is indexed both via index=True here and via
    # 'idx_execution_workflow_id' below; one of the two looks redundant.
    workflow_id = db.Column(db.String(255), db.ForeignKey('workflows.id'), nullable=False, index=True)
    workflow_name = db.Column(db.String(255), nullable=False)
    tenant_id = db.Column(db.String(255), nullable=False, index=True)

    # Execution state
    status = db.Column(db.String(50), nullable=False, default='pending')  # pending, running, success, error, stopped
    start_time = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    end_time = db.Column(db.DateTime, nullable=True)
    duration = db.Column(db.Integer, nullable=True)  # milliseconds

    # Results and errors
    nodes_json = db.Column(db.Text, default='[]')  # Array of node execution results
    error_json = db.Column(db.Text, nullable=True)  # JSON error object
    input_json = db.Column(db.Text, nullable=True)  # Input parameters
    output_json = db.Column(db.Text, nullable=True)  # Final output

    # Metadata
    triggered_by = db.Column(db.String(255), nullable=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)

    # Indexes for efficient querying
    __table_args__ = (
        db.Index('idx_execution_workflow_id', 'workflow_id'),
        db.Index('idx_execution_tenant_workflow', 'tenant_id', 'workflow_id'),
        db.Index('idx_execution_status', 'status'),
    )

    @staticmethod
    def _epoch_ms(value: Optional[datetime]) -> Optional[int]:
        """Return *value* as whole milliseconds since the Unix epoch.

        Returns None when *value* is None (unset end time, or a default not
        yet applied because the instance was never flushed).

        Naive datetimes are interpreted as UTC because the column defaults
        are ``datetime.utcnow``; calling ``.timestamp()`` on them directly
        would treat them as local time and skew the result by the host's
        UTC offset. Assumes every naive value stored here is UTC.
        """
        if value is None:
            return None
        if value.utcoffset() is None:
            seconds = (value - datetime(1970, 1, 1)).total_seconds()
        else:
            seconds = value.timestamp()
        return int(seconds * 1000)

    def to_dict(self) -> Dict[str, Any]:
        """Convert model to a camelCase dictionary.

        JSON columns are decoded (``nodes`` defaults to an empty list, the
        other JSON fields to None). Timestamps are epoch milliseconds, or
        None when the underlying column is unset.
        """
        return {
            'id': self.id,
            'workflowId': self.workflow_id,
            'workflowName': self.workflow_name,
            'tenantId': self.tenant_id,
            'status': self.status,
            'startTime': self._epoch_ms(self.start_time),
            'endTime': self._epoch_ms(self.end_time),
            'duration': self.duration,
            'nodes': json.loads(self.nodes_json) if self.nodes_json else [],
            'error': json.loads(self.error_json) if self.error_json else None,
            'input': json.loads(self.input_json) if self.input_json else None,
            'output': json.loads(self.output_json) if self.output_json else None,
            'triggeredBy': self.triggered_by,
            'createdAt': self._epoch_ms(self.created_at),
        }

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> 'Execution':
        """Create a (not yet persisted) model from a camelCase dictionary.

        Missing keys fall back to tenant 'default', status 'pending' and an
        empty node-result list. ``error``, ``input`` and ``output`` are
        stored as NULL only when absent or None.
        """
        def _dump_optional(key: str) -> Optional[str]:
            # Compare against None rather than truthiness so that legitimate
            # falsy payloads (0, False, '', {}, []) are not silently dropped.
            value = data.get(key)
            return json.dumps(value) if value is not None else None

        return Execution(
            id=data.get('id'),
            workflow_id=data.get('workflowId'),
            workflow_name=data.get('workflowName'),
            tenant_id=data.get('tenantId', 'default'),
            status=data.get('status', 'pending'),
            nodes_json=json.dumps(data.get('nodes', [])),
            error_json=_dump_optional('error'),
            input_json=_dump_optional('input'),
            output_json=_dump_optional('output'),
            triggered_by=data.get('triggeredBy'),
        )
class NodeType(db.Model):
    """NodeType model caching available node types.

    The node's parameter configuration is persisted as a JSON-encoded text
    column (``parameters_json``).
    """

    __tablename__ = 'node_types'

    id = db.Column(db.String(255), primary_key=True)
    name = db.Column(db.String(255), nullable=False)
    version = db.Column(db.String(50), default='1.0.0')
    # NOTE(review): category is indexed both via index=True here and via
    # 'idx_category' below; one of the two looks redundant.
    category = db.Column(db.String(100), nullable=False, index=True)
    description = db.Column(db.Text, default='')
    icon = db.Column(db.String(100), nullable=True)

    # JSON field for node configuration
    parameters_json = db.Column(db.Text, default='{}')

    # Metadata
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)

    __table_args__ = (
        db.Index('idx_category', 'category'),
    )

    def to_dict(self) -> Dict[str, Any]:
        """Convert model to dictionary.

        ``parameters`` is the decoded ``parameters_json`` column; a NULL or
        unset column yields an empty dict instead of raising.
        """
        return {
            'id': self.id,
            'name': self.name,
            'version': self.version,
            'category': self.category,
            'description': self.description,
            'icon': self.icon,
            # The column is nullable and its default is only applied on
            # flush, so guard against None before decoding.
            'parameters': json.loads(self.parameters_json) if self.parameters_json else {},
        }

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> 'NodeType':
        """Create a (not yet persisted) model from a dictionary.

        Missing keys fall back to version '1.0.0', an empty description and
        an empty parameter object.
        """
        return NodeType(
            id=data.get('id'),
            name=data.get('name'),
            version=data.get('version', '1.0.0'),
            category=data.get('category'),
            description=data.get('description', ''),
            icon=data.get('icon'),
            parameters_json=json.dumps(data.get('parameters', {})),
        )
class AuditLog(db.Model):
    """AuditLog model for tracking workflow changes.

    Each row records one action against an entity, with an optional
    JSON-encoded description of the changes.
    """

    __tablename__ = 'audit_logs'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # NOTE(review): workflow_id and tenant_id are each indexed twice (via
    # index=True and via the explicit indexes below); one of each pair looks
    # redundant.
    workflow_id = db.Column(db.String(255), nullable=False, index=True)
    tenant_id = db.Column(db.String(255), nullable=False, index=True)
    action = db.Column(db.String(50), nullable=False)  # create, update, delete, execute
    entity_type = db.Column(db.String(100), nullable=False)  # workflow, execution
    changes_json = db.Column(db.Text, nullable=True)  # JSON of changes
    user_id = db.Column(db.String(255), nullable=True)
    ip_address = db.Column(db.String(100), nullable=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)

    __table_args__ = (
        db.Index('idx_audit_workflow_id', 'workflow_id'),
        db.Index('idx_audit_tenant_id', 'tenant_id'),
        db.Index('idx_audit_action', 'action'),
    )

    @staticmethod
    def _epoch_ms(value: Optional[datetime]) -> Optional[int]:
        """Return *value* as whole milliseconds since the Unix epoch.

        Returns None when *value* is None (e.g. the column default has not
        been applied yet because the instance was never flushed).

        Naive datetimes are interpreted as UTC because the column default
        is ``datetime.utcnow``; calling ``.timestamp()`` on them directly
        would treat them as local time and skew the result by the host's
        UTC offset. Assumes every naive value stored here is UTC.
        """
        if value is None:
            return None
        if value.utcoffset() is None:
            seconds = (value - datetime(1970, 1, 1)).total_seconds()
        else:
            seconds = value.timestamp()
        return int(seconds * 1000)

    def to_dict(self) -> Dict[str, Any]:
        """Convert model to a camelCase dictionary.

        ``changes`` is the decoded ``changes_json`` column (None when unset);
        ``createdAt`` is epoch milliseconds (None when unset).
        """
        return {
            'id': self.id,
            'workflowId': self.workflow_id,
            'tenantId': self.tenant_id,
            'action': self.action,
            'entityType': self.entity_type,
            'changes': json.loads(self.changes_json) if self.changes_json else None,
            'userId': self.user_id,
            'ipAddress': self.ip_address,
            'createdAt': self._epoch_ms(self.created_at),
        }
class Workspace(db.Model):
    """Workspace model representing a top-level workspace container.

    A workspace owns its projects: they are reachable through ``projects``
    and are deleted with the workspace (``cascade='all, delete-orphan'``).
    """

    __tablename__ = 'workspaces'

    id = db.Column(db.String(255), primary_key=True)
    name = db.Column(db.String(255), nullable=False)
    description = db.Column(db.Text, default='')
    icon = db.Column(db.String(100), nullable=True)
    color = db.Column(db.String(20), default='#1976d2')
    # NOTE(review): tenant_id is indexed both via index=True here and via
    # 'idx_workspace_tenant_id' below; one of the two looks redundant.
    tenant_id = db.Column(db.String(255), nullable=False, index=True)

    # Metadata
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)

    # Relationships
    projects = db.relationship('Project', backref='workspace', cascade='all, delete-orphan', lazy=True)

    __table_args__ = (
        db.Index('idx_workspace_tenant_id', 'tenant_id'),
    )

    @staticmethod
    def _epoch_ms(value: Optional[datetime]) -> Optional[int]:
        """Return *value* as whole milliseconds since the Unix epoch.

        Returns None when *value* is None (e.g. the column default has not
        been applied yet because the instance was never flushed).

        Naive datetimes are interpreted as UTC because the column defaults
        are ``datetime.utcnow``; calling ``.timestamp()`` on them directly
        would treat them as local time and skew the result by the host's
        UTC offset. Assumes every naive value stored here is UTC.
        """
        if value is None:
            return None
        if value.utcoffset() is None:
            seconds = (value - datetime(1970, 1, 1)).total_seconds()
        else:
            seconds = value.timestamp()
        return int(seconds * 1000)

    def to_dict(self) -> Dict[str, Any]:
        """Convert model to a camelCase dictionary.

        Timestamps are epoch milliseconds, or None when unset.
        """
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'icon': self.icon,
            'color': self.color,
            'tenantId': self.tenant_id,
            'createdAt': self._epoch_ms(self.created_at),
            'updatedAt': self._epoch_ms(self.updated_at),
        }

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> 'Workspace':
        """Create a (not yet persisted) model from a camelCase dictionary.

        Missing keys fall back to name 'Untitled', an empty description,
        color '#1976d2' and tenant 'default'.
        """
        return Workspace(
            id=data.get('id'),
            name=data.get('name', 'Untitled'),
            description=data.get('description', ''),
            icon=data.get('icon'),
            color=data.get('color', '#1976d2'),
            tenant_id=data.get('tenantId', 'default'),
        )
class Project(db.Model):
    """Project model representing a project container within a workspace.

    A project owns its canvas items: they are reachable through
    ``canvas_items`` and are deleted with the project
    (``cascade='all, delete-orphan'``).
    """

    __tablename__ = 'projects'

    id = db.Column(db.String(255), primary_key=True)
    name = db.Column(db.String(255), nullable=False)
    description = db.Column(db.Text, default='')
    workspace_id = db.Column(db.String(255), db.ForeignKey('workspaces.id'), nullable=False)
    # NOTE(review): tenant_id is indexed both via index=True here and via
    # 'idx_project_tenant_id' below; one of the two looks redundant.
    tenant_id = db.Column(db.String(255), nullable=False, index=True)
    color = db.Column(db.String(20), default='#1976d2')
    starred = db.Column(db.Boolean, default=False)

    # Metadata
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)

    # Relationships
    canvas_items = db.relationship('ProjectCanvasItem', backref='project', cascade='all, delete-orphan', lazy=True)

    __table_args__ = (
        db.Index('idx_project_workspace_id', 'workspace_id'),
        db.Index('idx_project_tenant_id', 'tenant_id'),
        db.Index('idx_project_starred', 'starred'),
        db.Index('idx_project_tenant_workspace', 'tenant_id', 'workspace_id'),
    )

    @staticmethod
    def _epoch_ms(value: Optional[datetime]) -> Optional[int]:
        """Return *value* as whole milliseconds since the Unix epoch.

        Returns None when *value* is None (e.g. the column default has not
        been applied yet because the instance was never flushed).

        Naive datetimes are interpreted as UTC because the column defaults
        are ``datetime.utcnow``; calling ``.timestamp()`` on them directly
        would treat them as local time and skew the result by the host's
        UTC offset. Assumes every naive value stored here is UTC.
        """
        if value is None:
            return None
        if value.utcoffset() is None:
            seconds = (value - datetime(1970, 1, 1)).total_seconds()
        else:
            seconds = value.timestamp()
        return int(seconds * 1000)

    def to_dict(self) -> Dict[str, Any]:
        """Convert model to a camelCase dictionary.

        Timestamps are epoch milliseconds, or None when unset.
        """
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'workspaceId': self.workspace_id,
            'tenantId': self.tenant_id,
            'color': self.color,
            'starred': self.starred,
            'createdAt': self._epoch_ms(self.created_at),
            'updatedAt': self._epoch_ms(self.updated_at),
        }

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> 'Project':
        """Create a (not yet persisted) model from a camelCase dictionary.

        Missing keys fall back to name 'Untitled', an empty description,
        tenant 'default', color '#1976d2' and starred False.
        """
        return Project(
            id=data.get('id'),
            name=data.get('name', 'Untitled'),
            description=data.get('description', ''),
            workspace_id=data.get('workspaceId'),
            tenant_id=data.get('tenantId', 'default'),
            color=data.get('color', '#1976d2'),
            starred=data.get('starred', False),
        )
class ProjectCanvasItem(db.Model):
    """ProjectCanvasItem model representing a workflow card on the canvas.

    Links one workflow to one project and stores the card's geometry
    (position, size, z-index) and display state (color, minimized).
    """

    __tablename__ = 'project_canvas_items'

    id = db.Column(db.String(255), primary_key=True)
    project_id = db.Column(db.String(255), db.ForeignKey('projects.id'), nullable=False)
    workflow_id = db.Column(db.String(255), db.ForeignKey('workflows.id'), nullable=False)
    position_x = db.Column(db.Float, default=0)
    position_y = db.Column(db.Float, default=0)
    width = db.Column(db.Float, default=300)
    height = db.Column(db.Float, default=200)
    z_index = db.Column(db.Integer, default=0)
    color = db.Column(db.String(20), nullable=True)
    minimized = db.Column(db.Boolean, default=False)

    # Metadata
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)

    __table_args__ = (
        db.Index('idx_canvas_project_id', 'project_id'),
        db.Index('idx_canvas_workflow_id', 'workflow_id'),
        db.Index('idx_canvas_project_workflow', 'project_id', 'workflow_id'),
    )

    @staticmethod
    def _epoch_ms(value: Optional[datetime]) -> Optional[int]:
        """Return *value* as whole milliseconds since the Unix epoch.

        Returns None when *value* is None (e.g. the column default has not
        been applied yet because the instance was never flushed).

        Naive datetimes are interpreted as UTC because the column defaults
        are ``datetime.utcnow``; calling ``.timestamp()`` on them directly
        would treat them as local time and skew the result by the host's
        UTC offset. Assumes every naive value stored here is UTC.
        """
        if value is None:
            return None
        if value.utcoffset() is None:
            seconds = (value - datetime(1970, 1, 1)).total_seconds()
        else:
            seconds = value.timestamp()
        return int(seconds * 1000)

    def to_dict(self) -> Dict[str, Any]:
        """Convert model to a camelCase dictionary.

        Geometry is nested as ``position`` ({'x', 'y'}) and ``size``
        ({'width', 'height'}). Timestamps are epoch milliseconds, or None
        when unset.
        """
        return {
            'id': self.id,
            'projectId': self.project_id,
            'workflowId': self.workflow_id,
            'position': {'x': self.position_x, 'y': self.position_y},
            'size': {'width': self.width, 'height': self.height},
            'zIndex': self.z_index,
            'color': self.color,
            'minimized': self.minimized,
            'createdAt': self._epoch_ms(self.created_at),
            'updatedAt': self._epoch_ms(self.updated_at),
        }

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> 'ProjectCanvasItem':
        """Create a (not yet persisted) model from a camelCase dictionary.

        ``position`` and ``size`` are optional nested objects; when absent
        or None the card defaults to position (0, 0) and size 300 x 200.
        """
        # `or {}` also covers an explicit None, which would otherwise raise
        # AttributeError on the .get() calls below.
        position = data.get('position') or {}
        size = data.get('size') or {}
        return ProjectCanvasItem(
            id=data.get('id'),
            project_id=data.get('projectId'),
            workflow_id=data.get('workflowId'),
            position_x=position.get('x', 0),
            position_y=position.get('y', 0),
            width=size.get('width', 300),
            height=size.get('height', 200),
            z_index=data.get('zIndex', 0),
            color=data.get('color'),
            minimized=data.get('minimized', False),
        )