Files
AutoMetabuilder/backend/autometabuilder/workflow/execution_order.py
copilot-swe-agent[bot] dc59911bdd Implement trigger-based workflow execution
Add support for workflow triggers to determine start node:
- Modified execution_order.py to accept optional start_node_id from triggers
- Updated N8NExecutor to use triggers for determining workflow entry point
- Added helper function to find start node from enabled manual triggers
- Created comprehensive tests for trigger-based execution
- Maintains backward compatibility when no triggers are specified

Co-authored-by: johndoe6345789 <224850594+johndoe6345789@users.noreply.github.com>
2026-01-11 00:12:51 +00:00

75 lines
2.3 KiB
Python

"""Build execution order for n8n workflows."""
from __future__ import annotations
from typing import Any, Dict, List, Set
def build_execution_order(
    nodes: List[Dict[str, Any]],
    connections: Dict[str, Any],
    start_node_id: str | None = None
) -> List[str]:
    """Build a deterministic topological execution order from connections.

    Every node name is returned exactly once. A node is emitted only after
    all of its known predecessors, except that a trigger start node (when
    given and found) is always forced to the front. Ties are broken by the
    order in which nodes are declared in ``nodes``, so the result does not
    depend on set iteration order or hash randomisation.

    Args:
        nodes: List of workflow nodes; each must have a ``"name"`` key and
            may have an ``"id"`` key.
        connections: Node connections map of the form
            ``{source_name: {output_type: slots}}`` where ``slots`` is either
            a dict (``{"0": [target, ...]}``) or a list of target lists
            (``[[target, ...], ...]``) and each target has a ``"node"`` key.
            Edges whose source or target is not a declared node are ignored.
        start_node_id: Optional node ID to start execution from (from
            trigger). An empty or unknown ID falls back to the default
            behaviour of starting with the nodes that have no inputs.

    Returns:
        List of node names in execution order. Nodes that can never become
        ready (for example members of a cycle) are appended at the end in
        declaration order so that no node is dropped.
    """
    # dict.fromkeys de-duplicates names while keeping declaration order.
    names = list(dict.fromkeys(node["name"] for node in nodes))
    known = set(names)

    # Adjacency list and in-degree table restricted to declared nodes.
    successors: Dict[str, List[str]] = {name: [] for name in names}
    pending_inputs: Dict[str, int] = dict.fromkeys(names, 0)
    for source, outputs in (connections or {}).items():
        if source not in known:
            continue
        for slots in (outputs or {}).values():
            groups = slots.values() if isinstance(slots, dict) else (slots or ())
            for targets in groups:
                # An output slot with nothing attached may be None or empty.
                for target in targets or ():
                    target_name = target["node"]
                    if target_name in known:
                        successors[source].append(target_name)
                        pending_inputs[target_name] += 1

    # Resolve the trigger's node ID to the first matching node name.
    start_name = None
    if start_node_id:
        start_name = next(
            (node.get("name") for node in nodes if node.get("id") == start_node_id),
            None,
        )

    # Seed with the start node (if any) followed by every input-free node.
    roots = [name for name in names if pending_inputs[name] == 0]
    if start_name:
        order = [start_name] + [name for name in roots if name != start_name]
    else:
        order = roots
    placed = set(order)

    # Kahn's algorithm; ``order`` doubles as the FIFO queue via ``cursor``.
    cursor = 0
    while cursor < len(order):
        current = order[cursor]
        cursor += 1
        for follower in successors[current]:
            pending_inputs[follower] -= 1
            # ``placed`` guards a forced start node from being emitted twice.
            if pending_inputs[follower] == 0 and follower not in placed:
                placed.add(follower)
                order.append(follower)

    # Anything left is unreachable through satisfied inputs (e.g. a cycle).
    order.extend(name for name in names if name not in placed)
    return order
def _find_nodes_with_inputs(connections: Dict[str, Any]) -> Set[str]:
"""Find all nodes that have incoming connections."""
has_inputs = set()
for source_name, outputs in connections.items():
for output_type, indices in outputs.items():
for targets in indices.values():
for target in targets:
has_inputs.add(target["node"])
return has_inputs
def _find_node_name_by_id(nodes: List[Dict[str, Any]], node_id: str) -> str | None:
"""Find node name by node ID."""
for node in nodes:
if node.get("id") == node_id:
return node.get("name")
return None
def _add_remaining_nodes(remaining: Set[str]) -> List[str]:
"""Add remaining nodes in order."""
order = []
while remaining:
name = next(iter(remaining))
order.append(name)
remaining.remove(name)
return order