mirror of
https://github.com/johndoe6345789/metabuilder.git
synced 2026-04-25 14:25:02 +00:00
Add full Python workflow execution engine with: Core Executor: - engine.py: WorkflowEngine for running n8n configs - n8n_executor.py: N8N-style workflow execution with connections - node_executor.py: Individual node execution with plugin dispatch - loop_executor.py: Loop node execution with iteration control - execution_order.py: Topological sort for node ordering Schema & Validation: - n8n_schema.py: N8N workflow schema types and validation - n8n_converter.py: Legacy to n8n schema conversion Plugin System: - plugin_loader.py: Dynamic plugin loading - plugin_registry.py: Plugin discovery and registration - plugin_map.json: 116 plugin type mappings Runtime & Context: - runtime.py: Workflow runtime container - input_resolver.py: Binding and coercion resolution - value_helpers.py: Value normalization helpers - workflow_context_builder.py: Runtime context assembly - workflow_config_loader.py: Configuration loading - workflow_engine_builder.py: Engine assembly with dependencies Utilities: - tool_calls_handler.py: LLM tool call handling - tool_runner.py: Tool execution with logging - notification_helpers.py: Slack/Discord notifications - workflow_adapter.py: N8N format handling - workflow_graph.py: Node/edge graph for visualization Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
75 lines
2.2 KiB
Python
75 lines
2.2 KiB
Python
"""Build execution order for n8n workflows."""
|
|
from __future__ import annotations
|
|
|
|
from typing import Any, Dict, List, Set
|
|
|
|
|
|
def build_execution_order(
|
|
nodes: List[Dict[str, Any]],
|
|
connections: Dict[str, Any],
|
|
start_node_id: str | None = None
|
|
) -> List[str]:
|
|
"""Build topological execution order from connections.
|
|
|
|
Args:
|
|
nodes: List of workflow nodes
|
|
connections: Node connections map
|
|
start_node_id: Optional node ID to start execution from (from trigger)
|
|
|
|
Returns:
|
|
List of node names in execution order
|
|
"""
|
|
node_names = {node["name"] for node in nodes}
|
|
has_inputs = _find_nodes_with_inputs(connections)
|
|
|
|
# If a start node is specified (from trigger), use it
|
|
if start_node_id:
|
|
start_node_name = _find_node_name_by_id(nodes, start_node_id)
|
|
if start_node_name:
|
|
# Start with the trigger node
|
|
order = [start_node_name]
|
|
# Add remaining nodes
|
|
remaining = node_names - {start_node_name}
|
|
order.extend(_add_remaining_nodes(remaining))
|
|
return order
|
|
|
|
# Default: Start with nodes that have no inputs
|
|
order = [name for name in node_names if name not in has_inputs]
|
|
|
|
# Add remaining nodes (simplified BFS)
|
|
remaining = node_names - set(order)
|
|
order.extend(_add_remaining_nodes(remaining))
|
|
|
|
return order
|
|
|
|
|
|
def _find_nodes_with_inputs(connections: Dict[str, Any]) -> Set[str]:
|
|
"""Find all nodes that have incoming connections."""
|
|
has_inputs = set()
|
|
|
|
for source_name, outputs in connections.items():
|
|
for output_type, indices in outputs.items():
|
|
for targets in indices.values():
|
|
for target in targets:
|
|
has_inputs.add(target["node"])
|
|
|
|
return has_inputs
|
|
|
|
|
|
def _find_node_name_by_id(nodes: List[Dict[str, Any]], node_id: str) -> str | None:
|
|
"""Find node name by node ID."""
|
|
for node in nodes:
|
|
if node.get("id") == node_id:
|
|
return node.get("name")
|
|
return None
|
|
|
|
|
|
def _add_remaining_nodes(remaining: Set[str]) -> List[str]:
|
|
"""Add remaining nodes in order."""
|
|
order = []
|
|
while remaining:
|
|
name = next(iter(remaining))
|
|
order.append(name)
|
|
remaining.remove(name)
|
|
return order
|