mirror of
https://github.com/johndoe6345789/metabuilder.git
synced 2026-04-25 06:14:59 +00:00
Add full Python workflow execution engine with: Core Executor: - engine.py: WorkflowEngine for running n8n configs - n8n_executor.py: N8N-style workflow execution with connections - node_executor.py: Individual node execution with plugin dispatch - loop_executor.py: Loop node execution with iteration control - execution_order.py: Topological sort for node ordering Schema & Validation: - n8n_schema.py: N8N workflow schema types and validation - n8n_converter.py: Legacy to n8n schema conversion Plugin System: - plugin_loader.py: Dynamic plugin loading - plugin_registry.py: Plugin discovery and registration - plugin_map.json: 116 plugin type mappings Runtime & Context: - runtime.py: Workflow runtime container - input_resolver.py: Binding and coercion resolution - value_helpers.py: Value normalization helpers - workflow_context_builder.py: Runtime context assembly - workflow_config_loader.py: Configuration loading - workflow_engine_builder.py: Engine assembly with dependencies Utilities: - tool_calls_handler.py: LLM tool call handling - tool_runner.py: Tool execution with logging - notification_helpers.py: Slack/Discord notifications - workflow_adapter.py: N8N format handling - workflow_graph.py: Node/edge graph for visualization Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
99 lines
3.1 KiB
Python
99 lines
3.1 KiB
Python
"""Execute n8n-style workflows with explicit connections."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Any, Dict, List
|
|
|
|
from .execution_order import build_execution_order
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class N8NExecutor:
    """Execute n8n-style workflows.

    Nodes are run one at a time in the order computed by
    ``build_execution_order``. Each node is dispatched to the callable that
    ``plugin_registry.get(node_type)`` returns, which is invoked as
    ``plugin(runtime, parameters)``.
    """

    def __init__(self, runtime, plugin_registry):
        """Store the collaborators used while executing nodes.

        Args:
            runtime: Object passed to every plugin as its first argument.
            plugin_registry: Object whose ``get(node_type)`` returns the
                callable for a node type, or a falsy value when unknown.
        """
        self.runtime = runtime
        self.plugin_registry = plugin_registry

    def execute(self, workflow: Dict[str, Any]) -> None:
        """Execute n8n workflow.

        Args:
            workflow: Workflow definition. The ``nodes``, ``connections`` and
                ``triggers`` keys are read; each one is optional.
        """
        nodes = workflow.get("nodes", [])
        connections = workflow.get("connections", {})
        triggers = workflow.get("triggers", [])

        if not nodes:
            logger.warning("No nodes in workflow")
            return

        # Find enabled manual trigger (if any)
        start_node_id = self._get_start_node_from_triggers(triggers)

        # Build execution order from connections (optionally starting from
        # the trigger node)
        execution_order = build_execution_order(nodes, connections, start_node_id)

        # Index the nodes once so each lookup below is O(1) rather than a
        # fresh scan of the whole node list per executed node.
        nodes_by_name = self._index_nodes_by_name(nodes)

        for node_name in execution_order:
            node = nodes_by_name.get(node_name)
            if not node:
                # Surface the mismatch instead of dropping the node silently.
                logger.warning(
                    "Node %s is in the execution order but not in the workflow, skipping",
                    node_name,
                )
                continue
            self._execute_node(node)

    def _get_start_node_from_triggers(self, triggers: List[Dict]) -> str | None:
        """Get start node ID from enabled triggers.

        Enabled ``manual`` triggers take priority; otherwise the first
        enabled trigger of any kind is used. A trigger is treated as enabled
        unless its ``enabled`` value is falsy. Triggers that carry no
        ``nodeId`` are skipped so a later usable trigger is still honoured.

        Args:
            triggers: List of trigger definitions

        Returns:
            Node ID to start from, or None if no suitable trigger found
        """
        if not triggers:
            return None

        # Only triggers that are enabled and actually name a node can be used.
        usable = [
            trigger
            for trigger in triggers
            if trigger.get("enabled", True) and trigger.get("nodeId") is not None
        ]

        # Prefer the first manual trigger ...
        for trigger in usable:
            if trigger.get("kind") == "manual":
                return trigger.get("nodeId")

        # ... otherwise fall back to the first usable trigger of any kind.
        if usable:
            return usable[0].get("nodeId")

        return None

    def _index_nodes_by_name(self, nodes: List[Dict]) -> Dict[Any, Dict]:
        """Map each node name to the first node carrying that name.

        The first occurrence wins, which matches the linear scan performed by
        ``_find_node_by_name``.
        """
        index: Dict[Any, Dict] = {}
        for node in nodes:
            index.setdefault(node.get("name"), node)
        return index

    def _find_node_by_name(self, nodes: List[Dict], name: str) -> Dict | None:
        """Find node by name.

        Args:
            nodes: Node definitions to search.
            name: Value compared against each node's ``name`` key.

        Returns:
            The first matching node, or None if there is no match.
        """
        return next((node for node in nodes if node.get("name") == name), None)

    def _execute_node(self, node: Dict[str, Any]) -> Any:
        """Execute single node.

        Args:
            node: Node definition. ``type`` selects the plugin, ``parameters``
                is passed to it, and ``name`` (falling back to ``id``) is used
                in log messages.

        Returns:
            The plugin's return value, or None when the node is disabled or
            its type is unknown. ``control.loop`` nodes are delegated to
            ``_execute_loop``.
        """
        node_type = node.get("type")
        node_name = node.get("name", node.get("id"))

        if node.get("disabled"):
            logger.debug("Node %s is disabled, skipping", node_name)
            return None

        # Loop nodes are handled by the executor itself, not by a plugin.
        if node_type == "control.loop":
            return self._execute_loop(node)

        plugin = self.plugin_registry.get(node_type)
        if not plugin:
            logger.error("Unknown node type: %s", node_type)
            return None

        inputs = node.get("parameters", {})
        logger.debug("Executing node %s (%s)", node_name, node_type)

        return plugin(self.runtime, inputs)

    def _execute_loop(self, node: Dict[str, Any]) -> Any:
        """Execute loop node (placeholder).

        Loop handling is not implemented in this executor yet; the node is
        logged and skipped.

        Returns:
            Always None.
        """
        logger.debug("Loop execution not yet implemented in n8n executor")
        return None
|