Files
metabuilder/workflow/plugins/python/base.py
johndoe6345789 7ce8b4ae8a refactor(workflow): convert all plugins to class/struct + factory pattern
- Python: class extending NodeExecutor + factory.py (80+ plugins)
- TypeScript: class implements NodeExecutor + factory.ts (7 groups, 116 classes)
- Go: struct with methods + factory.go (36 plugins)
- Rust: struct impl NodeExecutor trait + factory.rs (54 plugins)
- Mojo: struct + factory.mojo (11 plugins)

All package.json files now include:
- files array listing source files
- metadata.class/struct field
- metadata.entrypoint field

This enables a unified plugin loading system across all languages
with no import side effects (Spring-style DI pattern).

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-22 14:53:04 +00:00

203 lines
5.7 KiB
Python

"""
Base classes and types for workflow plugins.
This module provides the class-based plugin architecture that mirrors
the TypeScript implementation for consistency across languages.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Callable
from enum import Enum
import time
class NodeStatus(Enum):
"""Status of node execution."""
SUCCESS = "success"
ERROR = "error"
SKIPPED = "skipped"
PENDING = "pending"
@dataclass
class NodeResult:
"""Result of a node execution."""
status: NodeStatus
output: Optional[Any] = None
error: Optional[str] = None
error_code: Optional[str] = None
timestamp: int = field(default_factory=lambda: int(time.time() * 1000))
duration: Optional[int] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
result = {
"status": self.status.value,
"timestamp": self.timestamp,
}
if self.output is not None:
result["output"] = self.output
if self.error is not None:
result["error"] = self.error
if self.error_code is not None:
result["errorCode"] = self.error_code
if self.duration is not None:
result["duration"] = self.duration
return result
@dataclass
class ValidationResult:
"""Result of node validation."""
valid: bool
errors: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
@dataclass
class WorkflowNode:
"""Workflow node definition."""
id: str
name: str
node_type: str
parameters: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "WorkflowNode":
"""Create from dictionary."""
return cls(
id=data.get("id", ""),
name=data.get("name", ""),
node_type=data.get("nodeType", data.get("node_type", "")),
parameters=data.get("parameters", {}),
metadata=data.get("metadata", {}),
)
@dataclass
class WorkflowContext:
"""Context for workflow execution."""
execution_id: str
tenant_id: str
user_id: str
trigger_data: Dict[str, Any] = field(default_factory=dict)
variables: Dict[str, Any] = field(default_factory=dict)
secrets: Dict[str, str] = field(default_factory=dict)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "WorkflowContext":
"""Create from dictionary."""
return cls(
execution_id=data.get("executionId", data.get("execution_id", "")),
tenant_id=data.get("tenantId", data.get("tenant_id", "")),
user_id=data.get("userId", data.get("user_id", "")),
trigger_data=data.get("triggerData", data.get("trigger_data", {})),
variables=data.get("variables", {}),
secrets=data.get("secrets", {}),
)
class ExecutionState(dict):
"""State dictionary for workflow execution with variable storage."""
@property
def variables(self) -> Dict[str, Any]:
"""Get or create variables dict."""
if "variables" not in self:
self["variables"] = {}
return self["variables"]
@variables.setter
def variables(self, value: Dict[str, Any]) -> None:
self["variables"] = value
class NodeExecutor(ABC):
"""
Abstract base class for node executors.
All workflow plugins should inherit from this class and implement
the execute() method. The run() method provides legacy compatibility.
"""
node_type: str = ""
category: str = ""
description: str = ""
@abstractmethod
def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
"""
Execute the node logic.
Args:
inputs: Input parameters dictionary
runtime: Optional runtime context (for advanced use)
Returns:
Dict with 'result' key on success, or 'error' key on failure
"""
pass
def validate(self, inputs: Dict[str, Any]) -> ValidationResult:
"""
Validate inputs.
Override this method to add custom validation logic.
Default implementation returns valid=True.
Args:
inputs: Input parameters to validate
Returns:
ValidationResult with valid flag and any errors/warnings
"""
return ValidationResult(valid=True)
def run(self, runtime: Any, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""
Legacy interface - calls execute().
Args:
runtime: Runtime context (passed through)
inputs: Input parameters
Returns:
Dict with result or error
"""
return self.execute(inputs, runtime)
def _resolve(self, value: Any, inputs: Dict[str, Any]) -> Any:
"""
Resolve template expressions in values.
Handles {{variable}} syntax for dynamic values.
"""
if isinstance(value, str) and value.startswith("{{") and value.endswith("}}"):
expr = value[2:-2].strip()
return self._get_nested(inputs, expr)
return value
def _get_nested(self, data: Dict[str, Any], path: str) -> Any:
"""Get nested value from dict using dot notation."""
parts = path.split(".")
current = data
for part in parts:
if isinstance(current, dict):
current = current.get(part)
elif hasattr(current, part):
current = getattr(current, part)
else:
return None
if current is None:
return None
return current