Files
copilot-swe-agent[bot] dc59911bdd Implement trigger-based workflow execution
Add support for workflow triggers to determine start node:
- Modified execution_order.py to accept optional start_node_id from triggers
- Updated N8NExecutor to use triggers for determining workflow entry point
- Added helper function to find start node from enabled manual triggers
- Created comprehensive tests for trigger-based execution
- Maintains backward compatibility when no triggers are specified

Co-authored-by: johndoe6345789 <224850594+johndoe6345789@users.noreply.github.com>
2026-01-11 00:12:51 +00:00

99 lines
3.3 KiB
Python

"""Execute n8n-style workflows with explicit connections."""
from __future__ import annotations
import logging
from typing import Any, Dict, List
from .execution_order import build_execution_order
logger = logging.getLogger(__name__)
class N8NExecutor:
"""Execute n8n-style workflows."""
def __init__(self, runtime, plugin_registry):
self.runtime = runtime
self.plugin_registry = plugin_registry
def execute(self, workflow: Dict[str, Any]) -> None:
"""Execute n8n workflow."""
nodes = workflow.get("nodes", [])
connections = workflow.get("connections", {})
triggers = workflow.get("triggers", [])
if not nodes:
logger.warning("No nodes in workflow")
return
# Find enabled manual trigger (if any)
start_node_id = self._get_start_node_from_triggers(triggers)
# Build execution order from connections (optionally starting from trigger node)
execution_order = build_execution_order(nodes, connections, start_node_id)
# Execute nodes in order
for node_name in execution_order:
node = self._find_node_by_name(nodes, node_name)
if node:
self._execute_node(node)
def _get_start_node_from_triggers(self, triggers: List[Dict]) -> str | None:
"""Get start node ID from enabled manual triggers.
Args:
triggers: List of trigger definitions
Returns:
Node ID to start from, or None if no suitable trigger found
"""
if not triggers:
return None
# Find first enabled manual trigger
for trigger in triggers:
if trigger.get("kind") == "manual" and trigger.get("enabled", True):
return trigger.get("nodeId")
# If no manual trigger, use first enabled trigger of any kind
for trigger in triggers:
if trigger.get("enabled", True):
return trigger.get("nodeId")
return None
def _find_node_by_name(self, nodes: List[Dict], name: str) -> Dict | None:
"""Find node by name."""
for node in nodes:
if node.get("name") == name:
return node
return None
def _execute_node(self, node: Dict[str, Any]) -> Any:
"""Execute single node."""
node_type = node.get("type")
node_name = node.get("name", node.get("id"))
if node.get("disabled"):
logger.debug("Node %s is disabled, skipping", node_name)
return None
if node_type == "control.loop":
return self._execute_loop(node)
plugin = self.plugin_registry.get(node_type)
if not plugin:
logger.error("Unknown node type: %s", node_type)
return None
inputs = node.get("parameters", {})
logger.debug("Executing node %s (%s)", node_name, node_type)
result = plugin(self.runtime, inputs)
return result
def _execute_loop(self, node: Dict[str, Any]) -> Any:
"""Execute loop node (placeholder)."""
logger.debug("Loop execution not yet implemented in n8n executor")
return None