mirror of
https://github.com/johndoe6345789/metabuilder.git
synced 2026-04-25 06:14:59 +00:00
Add full Python workflow execution engine with: Core Executor: - engine.py: WorkflowEngine for running n8n configs - n8n_executor.py: N8N-style workflow execution with connections - node_executor.py: Individual node execution with plugin dispatch - loop_executor.py: Loop node execution with iteration control - execution_order.py: Topological sort for node ordering Schema & Validation: - n8n_schema.py: N8N workflow schema types and validation - n8n_converter.py: Legacy to n8n schema conversion Plugin System: - plugin_loader.py: Dynamic plugin loading - plugin_registry.py: Plugin discovery and registration - plugin_map.json: 116 plugin type mappings Runtime & Context: - runtime.py: Workflow runtime container - input_resolver.py: Binding and coercion resolution - value_helpers.py: Value normalization helpers - workflow_context_builder.py: Runtime context assembly - workflow_config_loader.py: Configuration loading - workflow_engine_builder.py: Engine assembly with dependencies Utilities: - tool_calls_handler.py: LLM tool call handling - tool_runner.py: Tool execution with logging - notification_helpers.py: Slack/Discord notifications - workflow_adapter.py: N8N format handling - workflow_graph.py: Node/edge graph for visualization Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
32 lines
1.4 KiB
Python
32 lines
1.4 KiB
Python
"""Workflow engine runner for n8n format."""
|
|
from .workflow_adapter import WorkflowAdapter, is_n8n_workflow
|
|
|
|
|
|
class WorkflowEngine:
|
|
"""Run n8n workflow configs (breaking change: legacy format removed)."""
|
|
def __init__(self, workflow_config, node_executor, logger, runtime=None, plugin_registry=None):
|
|
self.workflow_config = workflow_config or {}
|
|
self.node_executor = node_executor
|
|
self.logger = logger
|
|
self.runtime = runtime
|
|
self.plugin_registry = plugin_registry
|
|
|
|
# Create adapter if we have runtime and plugin registry
|
|
if runtime and plugin_registry:
|
|
self.adapter = WorkflowAdapter(node_executor, runtime, plugin_registry)
|
|
else:
|
|
self.adapter = None
|
|
|
|
def execute(self):
|
|
"""Execute the n8n workflow config."""
|
|
# Enforce n8n format only
|
|
if not is_n8n_workflow(self.workflow_config):
|
|
self.logger.error("Legacy workflow format is no longer supported. Please migrate to n8n schema.")
|
|
raise ValueError("Only n8n workflow format is supported")
|
|
|
|
if self.adapter:
|
|
self.adapter.execute(self.workflow_config)
|
|
else:
|
|
self.logger.error("Workflow engine requires runtime and plugin_registry for n8n execution")
|
|
raise RuntimeError("Cannot execute n8n workflow without runtime and plugin_registry")
|