Files
metabuilder/workflow/executor/python/execution_order.py
johndoe6345789 3d6ae4cbf7 feat: Add complete Python workflow executor from AutoMetabuilder
Add full Python workflow execution engine with:

Core Executor:
- engine.py: WorkflowEngine for running n8n configs
- n8n_executor.py: N8N-style workflow execution with connections
- node_executor.py: Individual node execution with plugin dispatch
- loop_executor.py: Loop node execution with iteration control
- execution_order.py: Topological sort for node ordering

Schema & Validation:
- n8n_schema.py: N8N workflow schema types and validation
- n8n_converter.py: Legacy to n8n schema conversion

Plugin System:
- plugin_loader.py: Dynamic plugin loading
- plugin_registry.py: Plugin discovery and registration
- plugin_map.json: 116 plugin type mappings

Runtime & Context:
- runtime.py: Workflow runtime container
- input_resolver.py: Binding and coercion resolution
- value_helpers.py: Value normalization helpers
- workflow_context_builder.py: Runtime context assembly
- workflow_config_loader.py: Configuration loading
- workflow_engine_builder.py: Engine assembly with dependencies

Utilities:
- tool_calls_handler.py: LLM tool call handling
- tool_runner.py: Tool execution with logging
- notification_helpers.py: Slack/Discord notifications
- workflow_adapter.py: N8N format handling
- workflow_graph.py: Node/edge graph for visualization

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 16:42:30 +00:00

75 lines
2.2 KiB
Python

"""Build execution order for n8n workflows."""
from __future__ import annotations
from typing import Any, Dict, List, Set
def build_execution_order(
    nodes: List[Dict[str, Any]],
    connections: Dict[str, Any],
    start_node_id: str | None = None
) -> List[str]:
    """Build a deterministic topological execution order from connections.

    Every node name appears exactly once in the result. A node is placed
    after all of the nodes that feed it, ties are broken by the position of
    the node in ``nodes``, and nodes caught in a cycle are appended last (in
    declaration order) instead of being dropped.

    Args:
        nodes: List of workflow nodes; each must carry a "name" key
        connections: Node connections map, shaped as
            ``{source: {output_type: {index: [{"node": target}, ...]}}}``
        start_node_id: Optional node ID to start execution from (from trigger)

    Returns:
        List of node names in execution order. When ``start_node_id`` matches
        a node, that node comes first and the others follow in topological
        order; an unknown ID falls back to the default ordering.

    Raises:
        KeyError: If a node has no "name" key or a connection target has no
            "node" key.
    """
    # dict.fromkeys de-duplicates names while keeping declaration order, so
    # the result does not depend on set iteration / string hash seeds.
    node_names = list(dict.fromkeys(node["name"] for node in nodes))
    order = _topological_order(node_names, connections)

    # If a start node is specified (from trigger), use it
    if start_node_id:
        start_node_name = next(
            (
                node.get("name")
                for node in nodes
                if node.get("id") == start_node_id
            ),
            None,
        )
        if start_node_name:
            # Start with the trigger node, then everything else in order
            return [start_node_name] + [
                name for name in order if name != start_node_name
            ]

    # Default: nodes without inputs first, then their successors
    return order


def _topological_order(
    node_names: List[str],
    connections: Dict[str, Any],
) -> List[str]:
    """Order node_names with Kahn's algorithm; ties keep declaration order."""
    successors: Dict[str, List[str]] = {name: [] for name in node_names}
    pending_inputs: Dict[str, int] = dict.fromkeys(node_names, 0)

    for source_name, outputs in (connections or {}).items():
        for indices in outputs.values():
            for targets in indices.values():
                for target in targets:
                    target_name = target["node"]
                    # Edges touching unknown nodes, and self-loops, cannot
                    # constrain the ordering of the known nodes.
                    if (
                        source_name == target_name
                        or source_name not in successors
                        or target_name not in successors
                    ):
                        continue
                    successors[source_name].append(target_name)
                    pending_inputs[target_name] += 1

    # ``order`` doubles as the BFS queue: ``cursor`` marks the next node whose
    # outgoing edges still have to be released.
    order = [name for name in node_names if pending_inputs[name] == 0]
    cursor = 0
    while cursor < len(order):
        for target_name in successors[order[cursor]]:
            pending_inputs[target_name] -= 1
            if pending_inputs[target_name] == 0:
                order.append(target_name)
        cursor += 1

    # Anything not emitted sits on (or behind) a cycle; keep it in the result
    # so no node is silently skipped.
    emitted = set(order)
    order.extend(name for name in node_names if name not in emitted)
    return order
def _find_nodes_with_inputs(connections: Dict[str, Any]) -> Set[str]:
"""Find all nodes that have incoming connections."""
has_inputs = set()
for source_name, outputs in connections.items():
for output_type, indices in outputs.items():
for targets in indices.values():
for target in targets:
has_inputs.add(target["node"])
return has_inputs
def _find_node_name_by_id(nodes: List[Dict[str, Any]], node_id: str) -> str | None:
"""Find node name by node ID."""
for node in nodes:
if node.get("id") == node_id:
return node.get("name")
return None
def _add_remaining_nodes(remaining: Set[str]) -> List[str]:
"""Add remaining nodes in order."""
order = []
while remaining:
name = next(iter(remaining))
order.append(name)
remaining.remove(name)
return order