mirror of
https://github.com/johndoe6345789/metabuilder.git
synced 2026-04-24 22:04:56 +00:00
429 lines
16 KiB
Python
429 lines
16 KiB
Python
"""
|
|
SQLAlchemy Database Models
|
|
Defines the data schema for workflows, executions, and node types
|
|
"""
|
|
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from datetime import datetime
|
|
import json
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
db = SQLAlchemy()
|
|
|
|
|
|
class User(db.Model):
|
|
"""User model for authentication"""
|
|
|
|
__tablename__ = 'users'
|
|
|
|
id = db.Column(db.String(255), primary_key=True)
|
|
email = db.Column(db.String(255), unique=True, nullable=False, index=True)
|
|
password_hash = db.Column(db.String(255), nullable=False)
|
|
name = db.Column(db.String(255), nullable=False)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
|
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
|
|
|
|
__table_args__ = (
|
|
db.Index('idx_email', 'email'),
|
|
)
|
|
|
|
|
|
class Workflow(db.Model):
|
|
"""Workflow model representing a complete DAG workflow"""
|
|
|
|
__tablename__ = 'workflows'
|
|
|
|
id = db.Column(db.String(255), primary_key=True)
|
|
name = db.Column(db.String(255), nullable=False)
|
|
description = db.Column(db.Text, default='')
|
|
version = db.Column(db.String(50), default='1.0.0')
|
|
tenant_id = db.Column(db.String(255), nullable=False, index=True)
|
|
|
|
# JSON fields for workflow structure
|
|
nodes_json = db.Column(db.Text, default='[]') # Array of node objects
|
|
connections_json = db.Column(db.Text, default='[]') # Array of edge objects
|
|
tags_json = db.Column(db.Text, default='[]') # Array of tag strings
|
|
|
|
# Project organization (NEW)
|
|
project_id = db.Column(db.String(255), db.ForeignKey('projects.id'), nullable=True)
|
|
workspace_id = db.Column(db.String(255), db.ForeignKey('workspaces.id'), nullable=True)
|
|
starred = db.Column(db.Boolean, default=False)
|
|
|
|
# Metadata
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
|
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
|
|
|
|
# Relationships
|
|
executions = db.relationship('Execution', backref='workflow', cascade='all, delete-orphan', lazy=True)
|
|
|
|
# Indexes for efficient querying
|
|
__table_args__ = (
|
|
db.Index('idx_workflow_tenant_id', 'tenant_id'),
|
|
db.Index('idx_workflow_tenant_name', 'tenant_id', 'name'),
|
|
db.Index('idx_workflow_project_id', 'project_id'),
|
|
db.Index('idx_workflow_workspace_id', 'workspace_id'),
|
|
db.Index('idx_workflow_tenant_project', 'tenant_id', 'project_id'),
|
|
)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert model to dictionary"""
|
|
return {
|
|
'id': self.id,
|
|
'name': self.name,
|
|
'description': self.description,
|
|
'version': self.version,
|
|
'tenantId': self.tenant_id,
|
|
'projectId': self.project_id,
|
|
'workspaceId': self.workspace_id,
|
|
'starred': self.starred,
|
|
'nodes': json.loads(self.nodes_json),
|
|
'connections': json.loads(self.connections_json),
|
|
'tags': json.loads(self.tags_json),
|
|
'createdAt': int(self.created_at.timestamp() * 1000),
|
|
'updatedAt': int(self.updated_at.timestamp() * 1000)
|
|
}
|
|
|
|
@staticmethod
|
|
def from_dict(data: Dict[str, Any]) -> 'Workflow':
|
|
"""Create model from dictionary"""
|
|
workflow = Workflow(
|
|
id=data.get('id'),
|
|
name=data.get('name', 'Untitled'),
|
|
description=data.get('description', ''),
|
|
version=data.get('version', '1.0.0'),
|
|
tenant_id=data.get('tenantId', 'default'),
|
|
project_id=data.get('projectId'),
|
|
workspace_id=data.get('workspaceId'),
|
|
starred=data.get('starred', False),
|
|
nodes_json=json.dumps(data.get('nodes', [])),
|
|
connections_json=json.dumps(data.get('connections', [])),
|
|
tags_json=json.dumps(data.get('tags', []))
|
|
)
|
|
return workflow
|
|
|
|
|
|
class Execution(db.Model):
|
|
"""Execution model representing a workflow execution run"""
|
|
|
|
__tablename__ = 'executions'
|
|
|
|
id = db.Column(db.String(255), primary_key=True)
|
|
workflow_id = db.Column(db.String(255), db.ForeignKey('workflows.id'), nullable=False, index=True)
|
|
workflow_name = db.Column(db.String(255), nullable=False)
|
|
tenant_id = db.Column(db.String(255), nullable=False, index=True)
|
|
|
|
# Execution state
|
|
status = db.Column(db.String(50), nullable=False, default='pending') # pending, running, success, error, stopped
|
|
start_time = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
|
end_time = db.Column(db.DateTime, nullable=True)
|
|
duration = db.Column(db.Integer, nullable=True) # milliseconds
|
|
|
|
# Results and errors
|
|
nodes_json = db.Column(db.Text, default='[]') # Array of node execution results
|
|
error_json = db.Column(db.Text, nullable=True) # JSON error object
|
|
input_json = db.Column(db.Text, nullable=True) # Input parameters
|
|
output_json = db.Column(db.Text, nullable=True) # Final output
|
|
|
|
# Metadata
|
|
triggered_by = db.Column(db.String(255), nullable=True)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
|
|
|
# Indexes for efficient querying
|
|
__table_args__ = (
|
|
db.Index('idx_execution_workflow_id', 'workflow_id'),
|
|
db.Index('idx_execution_tenant_workflow', 'tenant_id', 'workflow_id'),
|
|
db.Index('idx_execution_status', 'status'),
|
|
)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert model to dictionary"""
|
|
return {
|
|
'id': self.id,
|
|
'workflowId': self.workflow_id,
|
|
'workflowName': self.workflow_name,
|
|
'tenantId': self.tenant_id,
|
|
'status': self.status,
|
|
'startTime': int(self.start_time.timestamp() * 1000),
|
|
'endTime': int(self.end_time.timestamp() * 1000) if self.end_time else None,
|
|
'duration': self.duration,
|
|
'nodes': json.loads(self.nodes_json) if self.nodes_json else [],
|
|
'error': json.loads(self.error_json) if self.error_json else None,
|
|
'input': json.loads(self.input_json) if self.input_json else None,
|
|
'output': json.loads(self.output_json) if self.output_json else None,
|
|
'triggeredBy': self.triggered_by,
|
|
'createdAt': int(self.created_at.timestamp() * 1000)
|
|
}
|
|
|
|
@staticmethod
|
|
def from_dict(data: Dict[str, Any]) -> 'Execution':
|
|
"""Create model from dictionary"""
|
|
execution = Execution(
|
|
id=data.get('id'),
|
|
workflow_id=data.get('workflowId'),
|
|
workflow_name=data.get('workflowName'),
|
|
tenant_id=data.get('tenantId', 'default'),
|
|
status=data.get('status', 'pending'),
|
|
nodes_json=json.dumps(data.get('nodes', [])),
|
|
error_json=json.dumps(data.get('error')) if data.get('error') else None,
|
|
input_json=json.dumps(data.get('input')) if data.get('input') else None,
|
|
output_json=json.dumps(data.get('output')) if data.get('output') else None,
|
|
triggered_by=data.get('triggeredBy')
|
|
)
|
|
return execution
|
|
|
|
|
|
class NodeType(db.Model):
|
|
"""NodeType model caching available node types"""
|
|
|
|
__tablename__ = 'node_types'
|
|
|
|
id = db.Column(db.String(255), primary_key=True)
|
|
name = db.Column(db.String(255), nullable=False)
|
|
version = db.Column(db.String(50), default='1.0.0')
|
|
category = db.Column(db.String(100), nullable=False, index=True)
|
|
description = db.Column(db.Text, default='')
|
|
icon = db.Column(db.String(100), nullable=True)
|
|
|
|
# JSON field for node configuration
|
|
parameters_json = db.Column(db.Text, default='{}')
|
|
|
|
# Metadata
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
|
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
|
|
|
|
__table_args__ = (
|
|
db.Index('idx_category', 'category'),
|
|
)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert model to dictionary"""
|
|
return {
|
|
'id': self.id,
|
|
'name': self.name,
|
|
'version': self.version,
|
|
'category': self.category,
|
|
'description': self.description,
|
|
'icon': self.icon,
|
|
'parameters': json.loads(self.parameters_json)
|
|
}
|
|
|
|
@staticmethod
|
|
def from_dict(data: Dict[str, Any]) -> 'NodeType':
|
|
"""Create model from dictionary"""
|
|
node_type = NodeType(
|
|
id=data.get('id'),
|
|
name=data.get('name'),
|
|
version=data.get('version', '1.0.0'),
|
|
category=data.get('category'),
|
|
description=data.get('description', ''),
|
|
icon=data.get('icon'),
|
|
parameters_json=json.dumps(data.get('parameters', {}))
|
|
)
|
|
return node_type
|
|
|
|
|
|
class AuditLog(db.Model):
|
|
"""AuditLog model for tracking workflow changes"""
|
|
|
|
__tablename__ = 'audit_logs'
|
|
|
|
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
|
|
workflow_id = db.Column(db.String(255), nullable=False, index=True)
|
|
tenant_id = db.Column(db.String(255), nullable=False, index=True)
|
|
|
|
action = db.Column(db.String(50), nullable=False) # create, update, delete, execute
|
|
entity_type = db.Column(db.String(100), nullable=False) # workflow, execution
|
|
changes_json = db.Column(db.Text, nullable=True) # JSON of changes
|
|
|
|
user_id = db.Column(db.String(255), nullable=True)
|
|
ip_address = db.Column(db.String(100), nullable=True)
|
|
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
|
|
|
__table_args__ = (
|
|
db.Index('idx_audit_workflow_id', 'workflow_id'),
|
|
db.Index('idx_audit_tenant_id', 'tenant_id'),
|
|
db.Index('idx_audit_action', 'action'),
|
|
)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert model to dictionary"""
|
|
return {
|
|
'id': self.id,
|
|
'workflowId': self.workflow_id,
|
|
'tenantId': self.tenant_id,
|
|
'action': self.action,
|
|
'entityType': self.entity_type,
|
|
'changes': json.loads(self.changes_json) if self.changes_json else None,
|
|
'userId': self.user_id,
|
|
'ipAddress': self.ip_address,
|
|
'createdAt': int(self.created_at.timestamp() * 1000)
|
|
}
|
|
|
|
|
|
class Workspace(db.Model):
|
|
"""Workspace model representing a top-level workspace container"""
|
|
|
|
__tablename__ = 'workspaces'
|
|
|
|
id = db.Column(db.String(255), primary_key=True)
|
|
name = db.Column(db.String(255), nullable=False)
|
|
description = db.Column(db.Text, default='')
|
|
icon = db.Column(db.String(100), nullable=True)
|
|
color = db.Column(db.String(20), default='#1976d2')
|
|
tenant_id = db.Column(db.String(255), nullable=False, index=True)
|
|
|
|
# Metadata
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
|
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
|
|
|
|
# Relationships
|
|
projects = db.relationship('Project', backref='workspace', cascade='all, delete-orphan', lazy=True)
|
|
|
|
__table_args__ = (
|
|
db.Index('idx_workspace_tenant_id', 'tenant_id'),
|
|
)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert model to dictionary"""
|
|
return {
|
|
'id': self.id,
|
|
'name': self.name,
|
|
'description': self.description,
|
|
'icon': self.icon,
|
|
'color': self.color,
|
|
'tenantId': self.tenant_id,
|
|
'createdAt': int(self.created_at.timestamp() * 1000),
|
|
'updatedAt': int(self.updated_at.timestamp() * 1000)
|
|
}
|
|
|
|
@staticmethod
|
|
def from_dict(data: Dict[str, Any]) -> 'Workspace':
|
|
"""Create model from dictionary"""
|
|
workspace = Workspace(
|
|
id=data.get('id'),
|
|
name=data.get('name', 'Untitled'),
|
|
description=data.get('description', ''),
|
|
icon=data.get('icon'),
|
|
color=data.get('color', '#1976d2'),
|
|
tenant_id=data.get('tenantId', 'default')
|
|
)
|
|
return workspace
|
|
|
|
|
|
class Project(db.Model):
|
|
"""Project model representing a project container within a workspace"""
|
|
|
|
__tablename__ = 'projects'
|
|
|
|
id = db.Column(db.String(255), primary_key=True)
|
|
name = db.Column(db.String(255), nullable=False)
|
|
description = db.Column(db.Text, default='')
|
|
workspace_id = db.Column(db.String(255), db.ForeignKey('workspaces.id'), nullable=False)
|
|
tenant_id = db.Column(db.String(255), nullable=False, index=True)
|
|
color = db.Column(db.String(20), default='#1976d2')
|
|
starred = db.Column(db.Boolean, default=False)
|
|
|
|
# Metadata
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
|
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
|
|
|
|
# Relationships
|
|
canvas_items = db.relationship('ProjectCanvasItem', backref='project', cascade='all, delete-orphan', lazy=True)
|
|
|
|
__table_args__ = (
|
|
db.Index('idx_project_workspace_id', 'workspace_id'),
|
|
db.Index('idx_project_tenant_id', 'tenant_id'),
|
|
db.Index('idx_project_starred', 'starred'),
|
|
db.Index('idx_project_tenant_workspace', 'tenant_id', 'workspace_id'),
|
|
)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert model to dictionary"""
|
|
return {
|
|
'id': self.id,
|
|
'name': self.name,
|
|
'description': self.description,
|
|
'workspaceId': self.workspace_id,
|
|
'tenantId': self.tenant_id,
|
|
'color': self.color,
|
|
'starred': self.starred,
|
|
'createdAt': int(self.created_at.timestamp() * 1000),
|
|
'updatedAt': int(self.updated_at.timestamp() * 1000)
|
|
}
|
|
|
|
@staticmethod
|
|
def from_dict(data: Dict[str, Any]) -> 'Project':
|
|
"""Create model from dictionary"""
|
|
project = Project(
|
|
id=data.get('id'),
|
|
name=data.get('name', 'Untitled'),
|
|
description=data.get('description', ''),
|
|
workspace_id=data.get('workspaceId'),
|
|
tenant_id=data.get('tenantId', 'default'),
|
|
color=data.get('color', '#1976d2'),
|
|
starred=data.get('starred', False)
|
|
)
|
|
return project
|
|
|
|
|
|
class ProjectCanvasItem(db.Model):
|
|
"""ProjectCanvasItem model representing a workflow card on the canvas"""
|
|
|
|
__tablename__ = 'project_canvas_items'
|
|
|
|
id = db.Column(db.String(255), primary_key=True)
|
|
project_id = db.Column(db.String(255), db.ForeignKey('projects.id'), nullable=False)
|
|
workflow_id = db.Column(db.String(255), db.ForeignKey('workflows.id'), nullable=False)
|
|
position_x = db.Column(db.Float, default=0)
|
|
position_y = db.Column(db.Float, default=0)
|
|
width = db.Column(db.Float, default=300)
|
|
height = db.Column(db.Float, default=200)
|
|
z_index = db.Column(db.Integer, default=0)
|
|
color = db.Column(db.String(20), nullable=True)
|
|
minimized = db.Column(db.Boolean, default=False)
|
|
|
|
# Metadata
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
|
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
|
|
|
|
__table_args__ = (
|
|
db.Index('idx_canvas_project_id', 'project_id'),
|
|
db.Index('idx_canvas_workflow_id', 'workflow_id'),
|
|
db.Index('idx_canvas_project_workflow', 'project_id', 'workflow_id'),
|
|
)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert model to dictionary"""
|
|
return {
|
|
'id': self.id,
|
|
'projectId': self.project_id,
|
|
'workflowId': self.workflow_id,
|
|
'position': {'x': self.position_x, 'y': self.position_y},
|
|
'size': {'width': self.width, 'height': self.height},
|
|
'zIndex': self.z_index,
|
|
'color': self.color,
|
|
'minimized': self.minimized,
|
|
'createdAt': int(self.created_at.timestamp() * 1000),
|
|
'updatedAt': int(self.updated_at.timestamp() * 1000)
|
|
}
|
|
|
|
@staticmethod
|
|
def from_dict(data: Dict[str, Any]) -> 'ProjectCanvasItem':
|
|
"""Create model from dictionary"""
|
|
position = data.get('position', {'x': 0, 'y': 0})
|
|
size = data.get('size', {'width': 300, 'height': 200})
|
|
canvas_item = ProjectCanvasItem(
|
|
id=data.get('id'),
|
|
project_id=data.get('projectId'),
|
|
workflow_id=data.get('workflowId'),
|
|
position_x=position.get('x', 0),
|
|
position_y=position.get('y', 0),
|
|
width=size.get('width', 300),
|
|
height=size.get('height', 200),
|
|
z_index=data.get('zIndex', 0),
|
|
color=data.get('color'),
|
|
minimized=data.get('minimized', False)
|
|
)
|
|
return canvas_item
|