Files
metabuilder/workflow/executor/python/workflow_adapter.py
johndoe6345789 3d6ae4cbf7 feat: Add complete Python workflow executor from AutoMetabuilder
Add full Python workflow execution engine with:

Core Executor:
- engine.py: WorkflowEngine for running n8n configs
- n8n_executor.py: N8N-style workflow execution with connections
- node_executor.py: Individual node execution with plugin dispatch
- loop_executor.py: Loop node execution with iteration control
- execution_order.py: Topological sort for node ordering

Schema & Validation:
- n8n_schema.py: N8N workflow schema types and validation
- n8n_converter.py: Legacy to n8n schema conversion

Plugin System:
- plugin_loader.py: Dynamic plugin loading
- plugin_registry.py: Plugin discovery and registration
- plugin_map.json: 116 plugin type mappings

Runtime & Context:
- runtime.py: Workflow runtime container
- input_resolver.py: Binding and coercion resolution
- value_helpers.py: Value normalization helpers
- workflow_context_builder.py: Runtime context assembly
- workflow_config_loader.py: Configuration loading
- workflow_engine_builder.py: Engine assembly with dependencies

Utilities:
- tool_calls_handler.py: LLM tool call handling
- tool_runner.py: Tool execution with logging
- notification_helpers.py: Slack/Discord notifications
- workflow_adapter.py: N8N format handling
- workflow_graph.py: Node/edge graph for visualization

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 16:42:30 +00:00

49 lines
1.6 KiB
Python

"""N8N workflow format handler."""
from __future__ import annotations
import logging
from typing import Any, Dict
from .n8n_executor import N8NExecutor
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def is_n8n_workflow(workflow: Dict[str, Any]) -> bool:
    """Check whether *workflow* uses the n8n schema.

    A workflow is treated as n8n when it is a dict with a ``"connections"``
    key and, if it has any nodes, its first node is a dict carrying at least
    one of the keys ``"position"``, ``"typeVersion"`` or ``"name"``.

    Args:
        workflow: Candidate workflow definition. Any non-dict value is
            accepted and simply reported as not being an n8n workflow.

    Returns:
        True when the structure matches the heuristic above, else False.
        The function never raises for malformed input.
    """
    if not isinstance(workflow, dict):
        return False

    # N8N workflows must have explicit connections.
    if "connections" not in workflow:
        return False

    nodes = workflow.get("nodes", [])
    if not nodes:
        # No nodes to inspect: the presence of connections alone decides.
        return True

    # Only the first node is sampled. It must be a dict: probing a str with
    # `in` would do a substring match, and probing None or an int would raise
    # TypeError, so anything else is classified as "not n8n".
    first_node = nodes[0] if isinstance(nodes, list) else None
    if not isinstance(first_node, dict):
        return False

    return any(key in first_node for key in ("position", "typeVersion", "name"))
class WorkflowAdapter:
    """Entry point for running n8n-schema workflows.

    Breaking change: the legacy workflow format is no longer supported; such
    input is rejected with ``ValueError`` instead of being executed.
    """

    def __init__(self, node_executor, runtime, plugin_registry):
        """Keep the runtime and plugin registry and build the N8NExecutor.

        ``node_executor`` is accepted in the signature but is not used here.
        """
        self.runtime, self.plugin_registry = runtime, plugin_registry
        self.n8n_executor = N8NExecutor(runtime, plugin_registry)

    def execute(self, workflow: Dict[str, Any]) -> None:
        """Run *workflow* through the n8n executor.

        Raises:
            ValueError: If ``is_n8n_workflow`` does not recognise *workflow*.
        """
        if is_n8n_workflow(workflow):
            logger.debug("Executing n8n workflow")
            self.n8n_executor.execute(workflow)
            return

        # Anything else is treated as the unsupported legacy format.
        logger.error("Legacy workflow format is no longer supported. Please migrate to n8n schema.")
        raise ValueError("Only n8n workflow format is supported")