mirror of
https://github.com/johndoe6345789/metabuilder.git
synced 2026-04-25 06:14:59 +00:00
- Python: class extending NodeExecutor + factory.py (80+ plugins)
- TypeScript: class implements NodeExecutor + factory.ts (7 groups, 116 classes)
- Go: struct with methods + factory.go (36 plugins)
- Rust: struct impl NodeExecutor trait + factory.rs (54 plugins)
- Mojo: struct + factory.mojo (11 plugins)

All package.json files now include:
- files array listing source files
- metadata.class/struct field
- metadata.entrypoint field

This enables a unified plugin loading system across all languages with no import side effects (Spring-style DI pattern).

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
203 lines
5.7 KiB
Python
203 lines
5.7 KiB
Python
"""
|
|
Base classes and types for workflow plugins.

This module provides the class-based plugin architecture that mirrors
the TypeScript implementation for consistency across languages.
|
|
"""
|
|
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Dict, List, Optional, Callable
|
|
from enum import Enum
|
|
import time
|
|
|
|
|
|
class NodeStatus(Enum):
    """Closed set of states a node execution can report.

    Every member's value is a lowercase string, so ``member.value`` can be
    placed directly into serialized output.
    """

    SUCCESS = "success"
    ERROR = "error"
    SKIPPED = "skipped"
    PENDING = "pending"
|
|
|
|
|
|
@dataclass
class NodeResult:
    """Outcome record for one node execution.

    Only ``status`` is required. ``timestamp`` defaults to the moment the
    instance is created, expressed as whole milliseconds since the epoch.
    """

    status: NodeStatus
    output: Optional[Any] = None
    error: Optional[str] = None
    error_code: Optional[str] = None
    # Evaluated per instance: time.time() is in seconds, hence the * 1000.
    timestamp: int = field(default_factory=lambda: int(time.time() * 1000))
    duration: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dictionary.

        ``status`` (as the enum's value) and ``timestamp`` are always
        present. ``output``, ``error``, ``error_code`` and ``duration`` are
        included only when they are not None; ``error_code`` is written
        under the camelCase key ``errorCode``.

        Returns:
            A new dict holding the serialized fields.
        """
        # (serialized key, current value) pairs, in emission order.
        optional_fields = (
            ("output", self.output),
            ("error", self.error),
            ("errorCode", self.error_code),
            ("duration", self.duration),
        )
        payload: Dict[str, Any] = {
            "status": self.status.value,
            "timestamp": self.timestamp,
        }
        payload.update(
            (key, value) for key, value in optional_fields if value is not None
        )
        return payload
|
|
|
|
|
|
@dataclass
class ValidationResult:
    """Outcome of validating a node's inputs.

    ``valid`` is required. ``errors`` and ``warnings`` each default to a
    fresh empty list per instance, so instances never share message lists.
    """

    valid: bool
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class WorkflowNode:
    """Definition of a single node inside a workflow."""

    id: str
    name: str
    node_type: str
    parameters: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "WorkflowNode":
        """Build a node from a plain dictionary.

        Missing string fields become ``""`` and missing ``parameters`` /
        ``metadata`` become ``{}``. The node type is read from the
        camelCase key ``nodeType`` when that key is present, otherwise from
        ``node_type``.

        Args:
            data: Mapping holding the node definition.

        Returns:
            A new WorkflowNode populated from ``data``.
        """
        # Looked up first so it can serve as the default for the camelCase key.
        snake_case_type = data.get("node_type", "")
        kwargs = {
            "id": data.get("id", ""),
            "name": data.get("name", ""),
            "node_type": data.get("nodeType", snake_case_type),
            "parameters": data.get("parameters", {}),
            "metadata": data.get("metadata", {}),
        }
        return cls(**kwargs)
|
|
|
|
|
|
@dataclass
class WorkflowContext:
    """Context values that accompany a workflow execution."""

    execution_id: str
    tenant_id: str
    user_id: str
    trigger_data: Dict[str, Any] = field(default_factory=dict)
    variables: Dict[str, Any] = field(default_factory=dict)
    secrets: Dict[str, str] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "WorkflowContext":
        """Build a context from a plain dictionary.

        ``execution_id``, ``tenant_id``, ``user_id`` and ``trigger_data``
        accept either a camelCase key (``executionId``, ...) or the
        snake_case one; the camelCase key is used whenever it is present.
        Missing strings default to ``""`` and missing dicts to ``{}``.

        Args:
            data: Mapping holding the context values.

        Returns:
            A new WorkflowContext populated from ``data``.
        """

        def pick(camel_key: str, snake_key: str, default: Any) -> Any:
            # camelCase key first, then snake_case, then the given default.
            return data.get(camel_key, data.get(snake_key, default))

        return cls(
            execution_id=pick("executionId", "execution_id", ""),
            tenant_id=pick("tenantId", "tenant_id", ""),
            user_id=pick("userId", "user_id", ""),
            trigger_data=pick("triggerData", "trigger_data", {}),
            variables=data.get("variables", {}),
            secrets=data.get("secrets", {}),
        )
|
|
|
|
|
|
class ExecutionState(dict):
    """Dict subclass for execution state that exposes ``variables`` as an attribute."""

    @property
    def variables(self) -> Dict[str, Any]:
        """Return the ``"variables"`` entry, inserting an empty dict if it is absent."""
        return self.setdefault("variables", {})

    @variables.setter
    def variables(self, value: Dict[str, Any]) -> None:
        """Store ``value`` under the ``"variables"`` key, replacing any existing entry."""
        self["variables"] = value
|
|
|
|
|
|
class NodeExecutor(ABC):
    """
    Abstract base class for node executors.

    All workflow plugins should inherit from this class and implement
    the execute() method. The run() method provides legacy compatibility.

    The class attributes ``node_type``, ``category`` and ``description``
    default to empty strings and are meant to be overridden by subclasses.
    """

    node_type: str = ""
    category: str = ""
    description: str = ""

    @abstractmethod
    def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
        """
        Execute the node logic.

        Args:
            inputs: Input parameters dictionary
            runtime: Optional runtime context (for advanced use)

        Returns:
            Dict with 'result' key on success, or 'error' key on failure
        """
        pass

    def validate(self, inputs: Dict[str, Any]) -> ValidationResult:
        """
        Validate inputs.

        Override this method to add custom validation logic.
        Default implementation returns valid=True.

        Args:
            inputs: Input parameters to validate

        Returns:
            ValidationResult with valid flag and any errors/warnings
        """
        return ValidationResult(valid=True)

    def run(self, runtime: Any, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """
        Legacy interface - calls execute().

        Note the argument order is the reverse of execute().

        Args:
            runtime: Runtime context (passed through)
            inputs: Input parameters

        Returns:
            Dict with result or error
        """
        return self.execute(inputs, runtime)

    def _resolve(self, value: Any, inputs: Dict[str, Any]) -> Any:
        """
        Resolve a template expression in a value.

        A string consisting of exactly one ``{{path}}`` template is replaced
        by the value found at ``path`` (dot notation) in ``inputs``, or None
        when the path cannot be followed. Anything else is returned
        unchanged: non-strings, strings without the surrounding delimiters,
        and strings that contain further ``{{`` / ``}}`` delimiters between
        the outer pair (for example ``"{{a}}-{{b}}"``), since those are not
        a single expression.

        Args:
            value: Candidate value, possibly a ``{{path}}`` template
            inputs: Mapping the path is looked up in

        Returns:
            The looked-up value, None for an unresolved path, or ``value``
            itself when it is not a single template
        """
        if not isinstance(value, str):
            return value
        if not (value.startswith("{{") and value.endswith("}}")):
            return value

        inner = value[2:-2]
        # Extra delimiters mean the string holds several templates or literal
        # text; treating the whole span as one path would silently yield None.
        if "{{" in inner or "}}" in inner:
            return value
        return self._get_nested(inputs, inner.strip())

    def _get_nested(self, data: Dict[str, Any], path: str) -> Any:
        """
        Get nested value from dict using dot notation.

        Each path segment is looked up as a key when the current value is a
        dict, otherwise as an attribute. The walk stops and returns None as
        soon as a segment is missing or a None value is reached.

        Args:
            data: Root mapping to start from
            path: Dot-separated path such as ``"a.b.c"``

        Returns:
            The value at ``path``, or None when it cannot be reached
        """
        # NOTE(review): attribute lookup is unrestricted (private/dunder names
        # included); confirm templates only come from trusted definitions.
        current: Any = data
        for part in path.split("."):
            if isinstance(current, dict):
                current = current.get(part)
            else:
                # Single lookup instead of hasattr() followed by getattr().
                try:
                    current = getattr(current, part)
                except AttributeError:
                    return None

            if current is None:
                return None

        return current
|
|
|
|
|
|
|
|
|