diff --git a/workflow/executor/python/__init__.py b/workflow/executor/python/__init__.py
new file mode 100644
index 000000000..6774c978f
--- /dev/null
+++ b/workflow/executor/python/__init__.py
@@ -0,0 +1,46 @@
+"""Workflow engine package.
+
+This package provides a declarative workflow engine that executes n8n-style workflows.
+
+Core Modules:
+    engine.py - Main workflow engine and execution coordinator
+    runtime.py - Runtime context and state management
+
+Execution:
+    n8n_executor.py - N8N workflow format executor
+    node_executor.py - Individual node execution
+    execution_order.py - Topological sort for node execution order
+    loop_executor.py - Loop iteration execution
+
+N8N Support:
+    n8n_schema.py - N8N workflow schema definitions
+    n8n_converter.py - Convert legacy workflows to N8N format
+    workflow_adapter.py - Workflow format adapter
+
+Plugin System:
+    plugin_loader.py - Plugin loading utilities
+    plugin_registry.py - Plugin registration and discovery
+    plugin_map.json - Plugin name to module path mapping
+    plugins/ - Organized workflow plugins by category
+
+Utilities:
+    input_resolver.py - Input value resolution and variable binding
+    value_helpers.py - Value type checking and conversion helpers
+    tool_runner.py - Tool execution wrapper
+    tool_calls_handler.py - AI tool calls processing
+
+Workflow Engine Building:
+    workflow_config_loader.py - Load workflow configuration JSON
+    workflow_context_builder.py - Build workflow runtime context
+    workflow_engine_builder.py - Assemble workflow engine with dependencies
+"""
+
+from .workflow_config_loader import load_workflow_config
+from .workflow_context_builder import build_workflow_context
+from .workflow_engine_builder import build_workflow_engine
+
+__all__ = [
+    "load_workflow_config",
+    "build_workflow_context",
+    "build_workflow_engine",
+]
diff --git a/workflow/executor/python/engine.py b/workflow/executor/python/engine.py
new file mode 100644
index 000000000..9223783d0
--- /dev/null
+++
b/workflow/executor/python/engine.py
@@ -0,0 +1,31 @@
+"""Workflow engine runner for n8n format."""
+from .workflow_adapter import WorkflowAdapter, is_n8n_workflow
+
+
+class WorkflowEngine:
+    """Run n8n workflow configs (breaking change: legacy format removed)."""
+    def __init__(self, workflow_config, node_executor, logger, runtime=None, plugin_registry=None):
+        self.workflow_config = workflow_config or {}
+        self.node_executor = node_executor
+        self.logger = logger
+        self.runtime = runtime
+        self.plugin_registry = plugin_registry
+
+        # Create adapter if we have runtime and plugin registry
+        if runtime and plugin_registry:
+            self.adapter = WorkflowAdapter(node_executor, runtime, plugin_registry)
+        else:
+            self.adapter = None
+
+    def execute(self):
+        """Execute the n8n workflow config."""
+        # Enforce n8n format only
+        if not is_n8n_workflow(self.workflow_config):
+            self.logger.error("Legacy workflow format is no longer supported. Please migrate to n8n schema.")
+            raise ValueError("Only n8n workflow format is supported")
+
+        if self.adapter:
+            self.adapter.execute(self.workflow_config)
+        else:
+            self.logger.error("Workflow engine requires runtime and plugin_registry for n8n execution")
+            raise RuntimeError("Cannot execute n8n workflow without runtime and plugin_registry")
diff --git a/workflow/executor/python/execution_order.py b/workflow/executor/python/execution_order.py
new file mode 100644
index 000000000..866938c08
--- /dev/null
+++ b/workflow/executor/python/execution_order.py
@@ -0,0 +1,74 @@
+"""Build execution order for n8n workflows."""
+from __future__ import annotations
+
+from typing import Any, Dict, List, Set
+
+
+def build_execution_order(
+    nodes: List[Dict[str, Any]],
+    connections: Dict[str, Any],
+    start_node_id: str | None = None
+) -> List[str]:
+    """Build topological execution order from connections.
+
+    Args:
+        nodes: List of workflow nodes
+        connections: Node connections map
+        start_node_id: Optional node ID to start execution from (from trigger)
+
+    Returns:
+        List of node names in execution order
+    """
+    node_names = {node["name"] for node in nodes}
+    has_inputs = _find_nodes_with_inputs(connections)
+
+    # If a start node is specified (from trigger), use it
+    if start_node_id:
+        start_node_name = _find_node_name_by_id(nodes, start_node_id)
+        if start_node_name:
+            # Start with the trigger node
+            order = [start_node_name]
+            # Add remaining nodes
+            remaining = node_names - {start_node_name}
+            order.extend(_add_remaining_nodes(remaining))
+            return order
+
+    # Default: Start with nodes that have no inputs
+    order = [name for name in node_names if name not in has_inputs]
+
+    # Append nodes not yet placed (dependency order not enforced here)
+    remaining = node_names - set(order)
+    order.extend(_add_remaining_nodes(remaining))
+
+    return order
+
+
+def _find_nodes_with_inputs(connections: Dict[str, Any]) -> Set[str]:
+    """Find all nodes that have incoming connections."""
+    has_inputs = set()
+
+    for source_name, outputs in connections.items():
+        for output_type, indices in outputs.items():
+            for targets in indices.values():
+                for target in targets:
+                    has_inputs.add(target["node"])
+
+    return has_inputs
+
+
+def _find_node_name_by_id(nodes: List[Dict[str, Any]], node_id: str) -> str | None:
+    """Find node name by node ID."""
+    for node in nodes:
+        if node.get("id") == node_id:
+            return node.get("name")
+    return None
+
+
+def _add_remaining_nodes(remaining: Set[str]) -> List[str]:
+    """Return the remaining nodes in deterministic (sorted) order.
+
+    NOTE(review): iterating a set directly yields an order that depends on
+    string hashing and can vary between interpreter runs; sorting keeps
+    workflow execution reproducible. Connection-based dependencies are
+    still not enforced by this helper.
+    """
+    return sorted(remaining)
diff --git a/workflow/executor/python/input_resolver.py b/workflow/executor/python/input_resolver.py
new file mode 100644
index 000000000..c2b8c31ed
--- /dev/null
+++ b/workflow/executor/python/input_resolver.py
@@ -0,0 +1,22 @@
+"""Resolve workflow bindings
and coercions."""
+from .value_helpers import ValueHelpers
+
+
+class InputResolver:
+    """Resolve bindings in workflow inputs."""
+    def __init__(self, store: dict):
+        self.store = store
+
+    def resolve_inputs(self, inputs: dict) -> dict:
+        """Resolve bindings for every input."""
+        return {key: self.resolve_binding(value) for key, value in (inputs or {}).items()}
+
+    def resolve_binding(self, value):
+        """Resolve a single binding value."""
+        if isinstance(value, str) and value.startswith("$"):
+            return self.store.get(value[1:])
+        return value
+
+    def coerce_bool(self, value) -> bool:
+        """Coerce values into booleans."""
+        return ValueHelpers.coerce_bool(value)
diff --git a/workflow/executor/python/loop_executor.py b/workflow/executor/python/loop_executor.py
new file mode 100644
index 000000000..594a18621
--- /dev/null
+++ b/workflow/executor/python/loop_executor.py
@@ -0,0 +1,50 @@
+"""Execute workflow loops."""
+
+
+class LoopExecutor:
+    """Execute loop nodes."""
+    def __init__(self, runtime, input_resolver):
+        self.runtime = runtime
+        self.input_resolver = input_resolver
+        self.node_executor = None
+
+    def set_node_executor(self, node_executor) -> None:
+        """Inject node executor dependency."""
+        self.node_executor = node_executor
+
+    def execute(self, node):
+        """Run loop body until stop condition."""
+        inputs = node.get("inputs", {})
+        max_iterations = self.input_resolver.resolve_binding(inputs.get("max_iterations", 1))
+        stop_when_raw = inputs.get("stop_when")
+        stop_on_raw = inputs.get("stop_on", True)
+
+        try:
+            max_iterations = int(max_iterations)
+        except (TypeError, ValueError):
+            max_iterations = 1
+
+        if self.runtime.context["args"].once:
+            max_iterations = min(max_iterations, 1)
+
+        stop_on = self.input_resolver.coerce_bool(self.input_resolver.resolve_binding(stop_on_raw))
+        body = node.get("body", [])
+        if not isinstance(body, list):
+            self.runtime.logger.error("Loop body must be a list of nodes.")
+            return None
+
+        iteration = 0
+        while iteration < max_iterations:
+            iteration += 1
+            self.runtime.logger.info("--- Loop iteration %s ---", iteration)
+            if not self.node_executor:
+                self.runtime.logger.error("Loop executor missing node executor.")
+                return None
+            self.node_executor.execute_nodes(body)
+
+            if stop_when_raw is not None:
+                stop_value = self.input_resolver.resolve_binding(stop_when_raw)
+                if self.input_resolver.coerce_bool(stop_value) == stop_on:
+                    break
+
+        return None
diff --git a/workflow/executor/python/n8n_converter.py b/workflow/executor/python/n8n_converter.py
new file mode 100644
index 000000000..a5f91e0cb
--- /dev/null
+++ b/workflow/executor/python/n8n_converter.py
@@ -0,0 +1,110 @@
+"""Convert legacy workflows to n8n schema."""
+from __future__ import annotations
+
+import logging
+from typing import Any, Dict, List
+from uuid import uuid4
+
+logger = logging.getLogger(__name__)
+
+
+def _generate_node_id() -> str:
+    """Generate unique node ID."""
+    return str(uuid4())
+
+
+def _calculate_position(index: int, parent_level: int = 0) -> List[float]:
+    """Calculate node position on canvas."""
+    x = parent_level * 300.0
+    y = index * 100.0
+    return [x, y]
+
+
+def _convert_node(
+    node: Dict[str, Any],
+    index: int,
+    parent_level: int = 0
+) -> Dict[str, Any]:
+    """Convert legacy node to n8n format."""
+    node_id = node.get("id", f"node-{_generate_node_id()}")
+    node_type = node.get("type", "unknown")
+
+    n8n_node: Dict[str, Any] = {
+        "id": node_id,
+        "name": node.get("name", node_id),
+        "type": node_type,
+        "typeVersion": 1,
+        "position": _calculate_position(index, parent_level),
+        "parameters": node.get("inputs", {}),
+    }
+
+    if node.get("disabled"):
+        n8n_node["disabled"] = True
+    if node.get("notes"):
+        n8n_node["notes"] = node["notes"]
+
+    return n8n_node
+
+
+def _build_connections(
+    nodes: List[Dict[str, Any]],
+    legacy_nodes: List[Dict[str, Any]]
+) -> Dict[str, Any]:
+    """Build n8n connections from variable bindings."""
+    connections: Dict[str, Any] = {}
+    producers: Dict[str, str] =
{}
+
+    # Map variable names to producer nodes
+    for i, legacy_node in enumerate(legacy_nodes):
+        outputs = legacy_node.get("outputs", {})
+        node_name = nodes[i]["name"]
+        for var_name in outputs.values():
+            if isinstance(var_name, str):
+                producers[var_name] = node_name
+
+    # Build connections from inputs
+    for i, legacy_node in enumerate(legacy_nodes):
+        inputs = legacy_node.get("inputs", {})
+        target_name = nodes[i]["name"]
+
+        for port, value in inputs.items():
+            if isinstance(value, str) and value.startswith("$"):
+                var_name = value[1:]
+                source_name = producers.get(var_name)
+
+                if source_name:
+                    if source_name not in connections:
+                        connections[source_name] = {"main": {}}
+
+                    if "0" not in connections[source_name]["main"]:
+                        connections[source_name]["main"]["0"] = []
+
+                    connections[source_name]["main"]["0"].append({
+                        "node": target_name,
+                        "type": "main",
+                        "index": 0
+                    })
+
+    return connections
+
+
+def convert_to_n8n(legacy_workflow: Dict[str, Any]) -> Dict[str, Any]:
+    """Convert legacy workflow to n8n schema."""
+    legacy_nodes = legacy_workflow.get("nodes", [])
+
+    n8n_nodes = [
+        _convert_node(node, i)
+        for i, node in enumerate(legacy_nodes)
+    ]
+
+    connections = _build_connections(n8n_nodes, legacy_nodes)
+
+    return {
+        "id": legacy_workflow.get("id", _generate_node_id()),
+        "name": legacy_workflow.get("name", "Workflow"),
+        "active": legacy_workflow.get("active", False),
+        "nodes": n8n_nodes,
+        "connections": connections,
+        "settings": legacy_workflow.get("settings", {}),
+        "tags": legacy_workflow.get("tags", []),
+    }
diff --git a/workflow/executor/python/n8n_executor.py b/workflow/executor/python/n8n_executor.py
new file mode 100644
index 000000000..c6be3e492
--- /dev/null
+++ b/workflow/executor/python/n8n_executor.py
@@ -0,0 +1,98 @@
+"""Execute n8n-style workflows with explicit connections."""
+from __future__ import annotations
+
+import logging
+from typing import Any, Dict, List
+
+from .execution_order import build_execution_order
+
+logger = logging.getLogger(__name__)
+
+
+class N8NExecutor:
+    """Execute n8n-style workflows."""
+
+    def __init__(self, runtime, plugin_registry):
+        self.runtime = runtime
+        self.plugin_registry = plugin_registry
+
+    def execute(self, workflow: Dict[str, Any]) -> None:
+        """Execute n8n workflow."""
+        nodes = workflow.get("nodes", [])
+        connections = workflow.get("connections", {})
+        triggers = workflow.get("triggers", [])
+
+        if not nodes:
+            logger.warning("No nodes in workflow")
+            return
+
+        # Find enabled manual trigger (if any)
+        start_node_id = self._get_start_node_from_triggers(triggers)
+
+        # Build execution order from connections (optionally starting from trigger node)
+        execution_order = build_execution_order(nodes, connections, start_node_id)
+
+        # Execute nodes in order
+        for node_name in execution_order:
+            node = self._find_node_by_name(nodes, node_name)
+            if node:
+                self._execute_node(node)
+
+    def _get_start_node_from_triggers(self, triggers: List[Dict]) -> str | None:
+        """Get start node ID from enabled manual triggers.
+
+        Args:
+            triggers: List of trigger definitions
+
+        Returns:
+            Node ID to start from, or None if no suitable trigger found
+        """
+        if not triggers:
+            return None
+
+        # Find first enabled manual trigger
+        for trigger in triggers:
+            if trigger.get("kind") == "manual" and trigger.get("enabled", True):
+                return trigger.get("nodeId")
+
+        # If no manual trigger, use first enabled trigger of any kind
+        for trigger in triggers:
+            if trigger.get("enabled", True):
+                return trigger.get("nodeId")
+
+        return None
+
+    def _find_node_by_name(self, nodes: List[Dict], name: str) -> Dict | None:
+        """Find node by name."""
+        for node in nodes:
+            if node.get("name") == name:
+                return node
+        return None
+
+    def _execute_node(self, node: Dict[str, Any]) -> Any:
+        """Execute single node."""
+        node_type = node.get("type")
+        node_name = node.get("name", node.get("id"))
+
+        if node.get("disabled"):
+            logger.debug("Node %s is disabled, skipping", node_name)
+            return None
+
+        if node_type == "control.loop":
+            return self._execute_loop(node)
+
+        plugin = self.plugin_registry.get(node_type)
+        if not plugin:
+            logger.error("Unknown node type: %s", node_type)
+            return None
+
+        inputs = node.get("parameters", {})
+        logger.debug("Executing node %s (%s)", node_name, node_type)
+
+        result = plugin(self.runtime, inputs)
+        return result
+
+    def _execute_loop(self, node: Dict[str, Any]) -> Any:
+        """Execute loop node (placeholder)."""
+        logger.debug("Loop execution not yet implemented in n8n executor")
+        return None
diff --git a/workflow/executor/python/n8n_schema.py b/workflow/executor/python/n8n_schema.py
new file mode 100644
index 000000000..096cdf7e5
--- /dev/null
+++ b/workflow/executor/python/n8n_schema.py
@@ -0,0 +1,103 @@
+"""N8N workflow schema types and validation."""
+from __future__ import annotations
+
+from typing import Any, Dict, List, Literal, Optional, Union
+
+
+class N8NPosition:
+    """Canvas position [x, y]."""
+
+    @staticmethod
+    def validate(value: Any) -> bool:
+        return (
isinstance(value, list) and
+            len(value) == 2 and
+            all(isinstance(v, (int, float)) for v in value)
+        )
+
+
+class N8NConnectionTarget:
+    """Connection target specification."""
+
+    @staticmethod
+    def validate(value: Any) -> bool:
+        if not isinstance(value, dict):
+            return False
+        return (
+            "node" in value and isinstance(value["node"], str) and
+            "type" in value and isinstance(value["type"], str) and
+            "index" in value and isinstance(value["index"], int) and value["index"] >= 0
+        )
+
+
+class N8NNode:
+    """N8N workflow node specification."""
+
+    @staticmethod
+    def validate(value: Any) -> bool:
+        if not isinstance(value, dict):
+            return False
+        required = ["id", "name", "type", "typeVersion", "position"]
+        if not all(key in value for key in required):
+            return False
+        if not isinstance(value["id"], str) or not value["id"]:
+            return False
+        if not isinstance(value["name"], str) or not value["name"]:
+            return False
+        if not isinstance(value["type"], str) or not value["type"]:
+            return False
+        if not isinstance(value["typeVersion"], (int, float)) or value["typeVersion"] < 1:
+            return False
+        if not N8NPosition.validate(value["position"]):
+            return False
+        return True
+
+
+class N8NTrigger:
+    """N8N workflow trigger specification."""
+
+    VALID_KINDS = ["webhook", "schedule", "queue", "email", "poll", "manual", "other"]
+
+    @staticmethod
+    def validate(value: Any) -> bool:
+        if not isinstance(value, dict):
+            return False
+        required = ["nodeId", "kind"]
+        if not all(key in value for key in required):
+            return False
+        if not isinstance(value["nodeId"], str) or not value["nodeId"]:
+            return False
+        if not isinstance(value["kind"], str) or value["kind"] not in N8NTrigger.VALID_KINDS:
+            return False
+        if "enabled" in value and not isinstance(value["enabled"], bool):
+            return False
+        if "meta" in value and not isinstance(value["meta"], dict):
+            return False
+        return True
+
+
+class N8NWorkflow:
+    """N8N workflow specification."""
+
+    @staticmethod
+    def validate(value: Any) -> bool:
+        if not isinstance(value, dict):
+            return False
+        required = ["name", "nodes", "connections"]
+        if not all(key in value for key in required):
+            return False
+        if not isinstance(value["name"], str) or not value["name"]:
+            return False
+        if not isinstance(value["nodes"], list) or len(value["nodes"]) < 1:
+            return False
+        if not isinstance(value["connections"], dict):
+            return False
+        if not all(N8NNode.validate(node) for node in value["nodes"]):
+            return False
+        # Validate triggers array if present
+        if "triggers" in value:
+            if not isinstance(value["triggers"], list):
+                return False
+            if not all(N8NTrigger.validate(trigger) for trigger in value["triggers"]):
+                return False
+        return True
diff --git a/workflow/executor/python/node_executor.py b/workflow/executor/python/node_executor.py
new file mode 100644
index 000000000..b2d2ba073
--- /dev/null
+++ b/workflow/executor/python/node_executor.py
@@ -0,0 +1,53 @@
+"""Execute workflow nodes."""
+
+
+class NodeExecutor:
+    """Execute workflow nodes with plugins."""
+    def __init__(self, runtime, plugin_registry, input_resolver, loop_executor):
+        self.runtime = runtime
+        self.plugin_registry = plugin_registry
+        self.input_resolver = input_resolver
+        self.loop_executor = loop_executor
+
+    def execute_nodes(self, nodes):
+        """Execute a list of nodes."""
+        for node in nodes:
+            self.execute_node(node)
+
+    def execute_node(self, node):
+        """Execute a single node."""
+        node_type = node.get("type")
+        if not node_type:
+            self.runtime.logger.error("Workflow node missing type.")
+            return None
+
+        when_value = node.get("when")
+        if when_value is not None:
+            if not self.input_resolver.coerce_bool(self.input_resolver.resolve_binding(when_value)):
+                self.runtime.logger.trace("Node %s skipped by condition", node.get("id"))
+                return None
+
+        if node_type == "control.loop":
+            return self.loop_executor.execute(node)
+
+        plugin = self.plugin_registry.get(node_type)
+        if not plugin:
+            self.runtime.logger.error("Unknown node type: %s",
node_type)
+            return None
+
+        inputs = self.input_resolver.resolve_inputs(node.get("inputs", {}))
+        self.runtime.logger.debug("Executing node %s", node_type)
+        result = plugin(self.runtime, inputs)
+        if not isinstance(result, dict):
+            result = {"result": result}
+
+        outputs = node.get("outputs", {})
+        if outputs:
+            for output_name, store_key in outputs.items():
+                if output_name in result:
+                    self.runtime.store[store_key] = result[output_name]
+        else:
+            for output_name, value in result.items():
+                self.runtime.store[output_name] = value
+
+        return result
diff --git a/workflow/executor/python/notification_helpers.py b/workflow/executor/python/notification_helpers.py
new file mode 100644
index 000000000..5b42a41c0
--- /dev/null
+++ b/workflow/executor/python/notification_helpers.py
@@ -0,0 +1,80 @@
+"""Notification helpers for workflow plugins."""
+import os
+import logging
+import asyncio
+
+logger = logging.getLogger("autometabuilder.notifications")
+
+
+def send_slack_notification(runtime, message: str):
+    """Send a notification to Slack using client from runtime context."""
+    client = runtime.context.get("slack_client") if runtime else None
+    channel = os.environ.get("SLACK_CHANNEL")
+
+    if not client:
+        logger.warning("Slack notification skipped: Slack client not initialized.")
+        return
+
+    if not channel:
+        logger.warning("Slack notification skipped: SLACK_CHANNEL missing.")
+        return
+
+    # Import lazily and guard it so environments without slack_sdk degrade
+    # gracefully (matches the Discord path) instead of raising ImportError.
+    try:
+        from slack_sdk.errors import SlackApiError
+    except ImportError:
+        logger.warning("Slack notification skipped: slack_sdk not installed.")
+        return
+
+    try:
+        client.chat_postMessage(channel=channel, text=message)
+        logger.info("Slack notification sent successfully.")
+    except SlackApiError as e:
+        logger.error(f"Error sending Slack notification: {e}")
+
+
+async def send_discord_notification_async(message: str, token: str, intents, channel_id: str):
+    """Send Discord notification asynchronously."""
+    import discord
+
+    client = discord.Client(intents=intents)
+
+    @client.event
+    async def on_ready():
+        channel = client.get_channel(int(channel_id))
+        if channel:
+            await channel.send(message)
+            logger.info("Discord notification sent successfully.")
+        await client.close()
+
+    try:
+        await client.start(token)
+    except Exception as e:
+        logger.error(f"Error sending Discord notification: {e}")
+
+
+def send_discord_notification(runtime, message: str):
+    """Send a Discord notification using config from runtime context."""
+    token = runtime.context.get("discord_token") if runtime else None
+    intents = runtime.context.get("discord_intents") if runtime else None
+    channel_id = os.environ.get("DISCORD_CHANNEL_ID")
+
+    if not token:
+        logger.warning("Discord notification skipped: Discord client not initialized.")
+        return
+
+    if not channel_id:
+        logger.warning("Discord notification skipped: DISCORD_CHANNEL_ID missing.")
+        return
+
+    try:
+        asyncio.run(send_discord_notification_async(message, token, intents, channel_id))
+    except Exception as e:
+        logger.error(f"Error running Discord notification: {e}")
+
+
+def notify_all(runtime, message: str):
+    """Send notification to all configured channels."""
+    send_slack_notification(runtime, message)
+    send_discord_notification(runtime, message)
diff --git a/workflow/executor/python/plugin_loader.py b/workflow/executor/python/plugin_loader.py
new file mode 100644
index 000000000..f0cfd0ea9
--- /dev/null
+++ b/workflow/executor/python/plugin_loader.py
@@ -0,0 +1,7 @@
+"""Load workflow plugins by dotted path."""
+from ..utils import load_callable
+
+
+def load_plugin_callable(path: str):
+    """Load a workflow plugin callable."""
+    return load_callable(path)
diff --git a/workflow/executor/python/plugin_map.json b/workflow/executor/python/plugin_map.json
new file mode 100644
index 000000000..7b2e293d2
--- /dev/null
+++ b/workflow/executor/python/plugin_map.json
@@ -0,0 +1,132 @@
+{
+  "backend.build_tool_map": "autometabuilder.workflow.plugins.backend.backend_build_tool_map.backend_build_tool_map.run",
+  "backend.configure_logging":
"autometabuilder.workflow.plugins.backend.backend_configure_logging.backend_configure_logging.run", + "backend.create_github": "autometabuilder.workflow.plugins.backend.backend_create_github.backend_create_github.run", + "backend.create_openai": "autometabuilder.workflow.plugins.backend.backend_create_openai.backend_create_openai.run", + "backend.load_env": "autometabuilder.workflow.plugins.backend.backend_load_env.backend_load_env.run", + "backend.load_messages": "autometabuilder.workflow.plugins.backend.backend_load_messages.backend_load_messages.run", + "backend.load_metadata": "autometabuilder.workflow.plugins.backend.backend_load_metadata.backend_load_metadata.run", + "backend.load_plugins": "autometabuilder.workflow.plugins.backend.backend_load_plugins.backend_load_plugins.run", + "backend.load_prompt": "autometabuilder.workflow.plugins.backend.backend_load_prompt.backend_load_prompt.run", + "backend.load_tool_policies": "autometabuilder.workflow.plugins.backend.backend_load_tool_policies.backend_load_tool_policies.run", + "backend.load_tool_registry": "autometabuilder.workflow.plugins.backend.backend_load_tool_registry.backend_load_tool_registry.run", + "backend.load_tools": "autometabuilder.workflow.plugins.backend.backend_load_tools.backend_load_tools.run", + "backend.parse_cli_args": "autometabuilder.workflow.plugins.backend.backend_parse_cli_args.backend_parse_cli_args.run", + "control.get_bot_status": "autometabuilder.workflow.plugins.control.control_get_bot_status.control_get_bot_status.run", + "control.reset_bot_state": "autometabuilder.workflow.plugins.control.control_reset_bot_state.control_reset_bot_state.run", + "control.start_bot": "autometabuilder.workflow.plugins.control.control_start_bot.control_start_bot.run", + "control.switch": "autometabuilder.workflow.plugins.control.control_switch.control_switch.run", + "convert.parse_json": "autometabuilder.workflow.plugins.convert.convert_parse_json.convert_parse_json.run", + "convert.to_boolean": 
"autometabuilder.workflow.plugins.convert.convert_to_boolean.convert_to_boolean.run", + "convert.to_dict": "autometabuilder.workflow.plugins.convert.convert_to_dict.convert_to_dict.run", + "convert.to_json": "autometabuilder.workflow.plugins.convert.convert_to_json.convert_to_json.run", + "convert.to_list": "autometabuilder.workflow.plugins.convert.convert_to_list.convert_to_list.run", + "convert.to_number": "autometabuilder.workflow.plugins.convert.convert_to_number.convert_to_number.run", + "convert.to_string": "autometabuilder.workflow.plugins.convert.convert_to_string.convert_to_string.run", + "core.ai_request": "autometabuilder.workflow.plugins.core.core_ai_request.core_ai_request.run", + "core.append_context_message": "autometabuilder.workflow.plugins.core.core_append_context_message.core_append_context_message.run", + "core.append_tool_results": "autometabuilder.workflow.plugins.core.core_append_tool_results.core_append_tool_results.run", + "core.append_user_instruction": "autometabuilder.workflow.plugins.core.core_append_user_instruction.core_append_user_instruction.run", + "core.load_context": "autometabuilder.workflow.plugins.core.core_load_context.core_load_context.run", + "core.run_tool_calls": "autometabuilder.workflow.plugins.core.core_run_tool_calls.core_run_tool_calls.run", + "core.seed_messages": "autometabuilder.workflow.plugins.core.core_seed_messages.core_seed_messages.run", + "dict.get": "autometabuilder.workflow.plugins.dict.dict_get.dict_get.run", + "dict.items": "autometabuilder.workflow.plugins.dict.dict_items.dict_items.run", + "dict.keys": "autometabuilder.workflow.plugins.dict.dict_keys.dict_keys.run", + "dict.merge": "autometabuilder.workflow.plugins.dict.dict_merge.dict_merge.run", + "dict.set": "autometabuilder.workflow.plugins.dict.dict_set.dict_set.run", + "dict.values": "autometabuilder.workflow.plugins.dict.dict_values.dict_values.run", + "list.concat": "autometabuilder.workflow.plugins.list.list_concat.list_concat.run", + 
"list.every": "autometabuilder.workflow.plugins.list.list_every.list_every.run", + "list.find": "autometabuilder.workflow.plugins.list.list_find.list_find.run", + "list.length": "autometabuilder.workflow.plugins.list.list_length.list_length.run", + "list.slice": "autometabuilder.workflow.plugins.list.list_slice.list_slice.run", + "list.some": "autometabuilder.workflow.plugins.list.list_some.list_some.run", + "list.sort": "autometabuilder.workflow.plugins.list.list_sort.list_sort.run", + "logic.and": "autometabuilder.workflow.plugins.logic.logic_and.logic_and.run", + "logic.equals": "autometabuilder.workflow.plugins.logic.logic_equals.logic_equals.run", + "logic.gt": "autometabuilder.workflow.plugins.logic.logic_gt.logic_gt.run", + "logic.gte": "autometabuilder.workflow.plugins.logic.logic_gte.logic_gte.run", + "logic.in": "autometabuilder.workflow.plugins.logic.logic_in.logic_in.run", + "logic.lt": "autometabuilder.workflow.plugins.logic.logic_lt.logic_lt.run", + "logic.lte": "autometabuilder.workflow.plugins.logic.logic_lte.logic_lte.run", + "logic.or": "autometabuilder.workflow.plugins.logic.logic_or.logic_or.run", + "logic.xor": "autometabuilder.workflow.plugins.logic.logic_xor.logic_xor.run", + "math.abs": "autometabuilder.workflow.plugins.math.math_abs.math_abs.run", + "math.add": "autometabuilder.workflow.plugins.math.math_add.math_add.run", + "math.divide": "autometabuilder.workflow.plugins.math.math_divide.math_divide.run", + "math.max": "autometabuilder.workflow.plugins.math.math_max.math_max.run", + "math.min": "autometabuilder.workflow.plugins.math.math_min.math_min.run", + "math.modulo": "autometabuilder.workflow.plugins.math.math_modulo.math_modulo.run", + "math.multiply": "autometabuilder.workflow.plugins.math.math_multiply.math_multiply.run", + "math.power": "autometabuilder.workflow.plugins.math.math_power.math_power.run", + "math.round": "autometabuilder.workflow.plugins.math.math_round.math_round.run", + "math.subtract": 
"autometabuilder.workflow.plugins.math.math_subtract.math_subtract.run", + "string.concat": "autometabuilder.workflow.plugins.string.string_concat.string_concat.run", + "string.format": "autometabuilder.workflow.plugins.string.string_format.string_format.run", + "string.length": "autometabuilder.workflow.plugins.string.string_length.string_length.run", + "string.lower": "autometabuilder.workflow.plugins.string.string_lower.string_lower.run", + "string.replace": "autometabuilder.workflow.plugins.string.string_replace.string_replace.run", + "string.split": "autometabuilder.workflow.plugins.string.string_split.string_split.run", + "string.trim": "autometabuilder.workflow.plugins.string.string_trim.string_trim.run", + "string.upper": "autometabuilder.workflow.plugins.string.string_upper.string_upper.run", + "test.assert_equals": "autometabuilder.workflow.plugins.test.test_assert_equals.test_assert_equals.run", + "test.assert_exists": "autometabuilder.workflow.plugins.test.test_assert_exists.test_assert_exists.run", + "test.assert_false": "autometabuilder.workflow.plugins.test.test_assert_false.test_assert_false.run", + "test.assert_true": "autometabuilder.workflow.plugins.test.test_assert_true.test_assert_true.run", + "test.run_suite": "autometabuilder.workflow.plugins.test.test_run_suite.test_run_suite.run", + "tools.create_branch": "autometabuilder.workflow.plugins.tools.tools_create_branch.tools_create_branch.run", + "tools.create_pull_request": "autometabuilder.workflow.plugins.tools.tools_create_pull_request.tools_create_pull_request.run", + "tools.list_files": "autometabuilder.workflow.plugins.tools.tools_list_files.tools_list_files.run", + "tools.read_file": "autometabuilder.workflow.plugins.tools.tools_read_file.tools_read_file.run", + "tools.run_docker": "autometabuilder.workflow.plugins.tools.tools_run_docker.tools_run_docker.run", + "tools.run_lint": "autometabuilder.workflow.plugins.tools.tools_run_lint.tools_run_lint.run", + "tools.run_tests": 
"autometabuilder.workflow.plugins.tools.tools_run_tests.tools_run_tests.run", + "utils.branch_condition": "autometabuilder.workflow.plugins.utils.utils_branch_condition.utils_branch_condition.run", + "utils.check_mvp": "autometabuilder.workflow.plugins.utils.utils_check_mvp.utils_check_mvp.run", + "utils.filter_list": "autometabuilder.workflow.plugins.utils.utils_filter_list.utils_filter_list.run", + "utils.map_list": "autometabuilder.workflow.plugins.utils.utils_map_list.utils_map_list.run", + "utils.not": "autometabuilder.workflow.plugins.utils.utils_not.utils_not.run", + "utils.reduce_list": "autometabuilder.workflow.plugins.utils.utils_reduce_list.utils_reduce_list.run", + "utils.update_roadmap": "autometabuilder.workflow.plugins.utils.utils_update_roadmap.utils_update_roadmap.run", + "var.delete": "autometabuilder.workflow.plugins.var.var_delete.var_delete.run", + "var.exists": "autometabuilder.workflow.plugins.var.var_exists.var_exists.run", + "var.get": "autometabuilder.workflow.plugins.var.var_get.var_get.run", + "var.set": "autometabuilder.workflow.plugins.var.var_set.var_set.run", + "web.build_context": "autometabuilder.workflow.plugins.web.web_build_context.web_build_context.run", + "web.build_prompt_yaml": "autometabuilder.workflow.plugins.web.web_build_prompt_yaml.web_build_prompt_yaml.run", + "web.create_flask_app": "autometabuilder.workflow.plugins.web.web_create_flask_app.web_create_flask_app.run", + "web.create_translation": "autometabuilder.workflow.plugins.web.web_create_translation.web_create_translation.run", + "web.delete_translation": "autometabuilder.workflow.plugins.web.web_delete_translation.web_delete_translation.run", + "web.get_env_vars": "autometabuilder.workflow.plugins.web.web_get_env_vars.web_get_env_vars.run", + "web.get_navigation_items": "autometabuilder.workflow.plugins.web.web_get_navigation_items.web_get_navigation_items.run", + "web.get_prompt_content": 
"autometabuilder.workflow.plugins.web.web_get_prompt_content.web_get_prompt_content.run", + "web.get_recent_logs": "autometabuilder.workflow.plugins.web.web_get_recent_logs.web_get_recent_logs.run", + "web.get_ui_messages": "autometabuilder.workflow.plugins.web.web_get_ui_messages.web_get_ui_messages.run", + "web.get_workflow_content": "autometabuilder.workflow.plugins.web.web_get_workflow_content.web_get_workflow_content.run", + "web.list_translations": "autometabuilder.workflow.plugins.web.web_list_translations.web_list_translations.run", + "web.load_messages": "autometabuilder.workflow.plugins.web.web_load_messages.web_load_messages.run", + "web.load_translation": "autometabuilder.workflow.plugins.web.web_load_translation.web_load_translation.run", + "web.load_workflow_packages": "autometabuilder.workflow.plugins.web.web_load_workflow_packages.web_load_workflow_packages.run", + "web.persist_env_vars": "autometabuilder.workflow.plugins.web.web_persist_env_vars.web_persist_env_vars.run", + "web.read_json": "autometabuilder.workflow.plugins.web.web_read_json.web_read_json.run", + "web.register_blueprint": "autometabuilder.workflow.plugins.web.web_register_blueprint.web_register_blueprint.run", + "web.route_context": "autometabuilder.workflow.plugins.web.web_route_context.web_route_context.run", + "web.route_navigation": "autometabuilder.workflow.plugins.web.web_route_navigation.web_route_navigation.run", + "web.route_prompt": "autometabuilder.workflow.plugins.web.web_route_prompt.web_route_prompt.run", + "web.route_run": "autometabuilder.workflow.plugins.web.web_route_run.web_route_run.run", + "web.route_settings": "autometabuilder.workflow.plugins.web.web_route_settings.web_route_settings.run", + "web.route_translations": "autometabuilder.workflow.plugins.web.web_route_translations.web_route_translations.run", + "web.start_server": "autometabuilder.workflow.plugins.web.web_start_server.web_start_server.run", + "web.summarize_workflow_packages": 
def scan_plugins() -> dict:
    """
    Discover workflow plugins by scanning the plugins directory.

    Walks every subdirectory under ``plugins/`` looking for ``package.json``
    manifests. For each manifest, the plugin name is taken from
    ``metadata.plugin_type`` (preferred) or the top-level ``name`` field, and
    the ``main`` field names the Python file whose ``run`` function implements
    the plugin.

    Returns:
        dict: Mapping of plugin_name -> dotted callable path of the form
        ``autometabuilder.workflow.plugins.<subpath>.<module>.run``.
    """
    discovered = {}
    plugins_root = Path(__file__).parent / "plugins"

    if not plugins_root.exists():
        logger.warning("Plugins directory not found: %s", plugins_root)
        return discovered

    for manifest in plugins_root.rglob("package.json"):
        try:
            manifest_data = json.loads(manifest.read_text(encoding="utf-8"))

            # Prefer the explicit metadata.plugin_type; fall back to "name".
            meta = manifest_data.get("metadata", {})
            name = meta.get("plugin_type") or manifest_data.get("name")
            entry_file = manifest_data.get("main")

            if not name or not entry_file:
                logger.debug("Skipping %s: missing 'plugin_type'/'name' or 'main' field", manifest)
                continue

            # The dotted module path mirrors the directory layout under plugins/.
            relative = manifest.parent.relative_to(plugins_root)
            module_parts = (
                ["autometabuilder", "workflow", "plugins"]
                + list(relative.parts)
                + [Path(entry_file).stem, "run"]
            )
            dotted = ".".join(module_parts)

            discovered[name] = dotted
            logger.debug("Discovered plugin %s -> %s", name, dotted)

        except json.JSONDecodeError:
            logger.warning("Invalid JSON in %s", manifest)
        except Exception as error:  # pylint: disable=broad-exception-caught
            logger.debug("Error scanning %s: %s", manifest, error)

    logger.info("Discovered %d plugins via scanning", len(discovered))
    return discovered
class PluginRegistry:
    """Resolve workflow plugin handlers.

    Eagerly imports every plugin callable at construction time so that a
    broken plugin is reported once, up front, rather than at first use.
    """

    def __init__(self, plugin_map: dict):
        # node_type -> imported callable; failed imports are logged and omitted.
        self._plugins = {}
        for node_type, dotted_path in plugin_map.items():
            try:
                handler = load_plugin_callable(dotted_path)
            except Exception as error:  # pylint: disable=broad-exception-caught
                logger.error("Failed to register plugin %s: %s", node_type, error)
            else:
                self._plugins[node_type] = handler
                logger.debug("Registered workflow plugin %s -> %s", node_type, dotted_path)

    def get(self, node_type: str):
        """Return plugin handler for node type (None when unregistered)."""
        return self._plugins.get(node_type)
def handle_tool_calls(resp_msg, tool_map: dict, msgs: dict, args, policies: dict, logger) -> list:
    """Execute tool calls from an LLM response and return tool result messages.

    Args:
        resp_msg: LLM response message; ``resp_msg.tool_calls`` is iterated.
        tool_map: Mapping of tool name -> callable handler.
        msgs: UI message templates (English defaults are used when missing).
        args: CLI-style namespace; ``args.yolo`` skips the confirmation
            prompt, ``args.dry_run`` skips state-modifying tools.
        policies: Policy dict; ``policies["modifying_tools"]`` lists the tool
            names blocked during dry runs.
        logger: Logger providing trace/info/error methods.

    Returns:
        list: One ``{"tool_call_id", "role", "name", "content"}`` dict per
        tool call — including skipped and failed calls — so the conversation
        history stays consistent for the LLM.
    """
    if not resp_msg.tool_calls:
        return []

    modifying_tools = set(policies.get("modifying_tools", []))
    tool_results = []

    def _result(call_id, name, content):
        # Every outcome is reported back to the LLM as a "tool" role message.
        return {
            "tool_call_id": call_id,
            "role": "tool",
            "name": name,
            "content": content,
        }

    for tool_call in resp_msg.tool_calls:
        function_name = tool_call.function.name
        call_id = tool_call.id

        # LLMs occasionally emit malformed JSON arguments; report the failure
        # for this one call instead of crashing the whole tool loop.
        try:
            payload = json.loads(tool_call.function.arguments)
        except json.JSONDecodeError as error:
            error_msg = f"Invalid arguments for {function_name}: {error}"
            logger.error(error_msg)
            tool_results.append(_result(call_id, function_name, error_msg))
            continue
        logger.trace("Tool call %s payload: %s", function_name, payload)

        handler = tool_map.get(function_name)
        if not handler:
            msg_template = msgs.get(
                "error_tool_not_found",
                "Tool {name} not found or unavailable."
            )
            msg = msg_template.format(name=function_name)
            logger.error(msg)
            tool_results.append(_result(call_id, function_name, msg))
            continue

        if not args.yolo:
            confirm = input(
                msgs.get(
                    "confirm_tool_execution",
                    "Do you want to execute {name} with {args}? [y/N]: "
                ).format(name=function_name, args=payload)
            )
            if confirm.lower() != "y":
                skipped_template = msgs.get("info_tool_skipped", "Skipping tool: {name}")
                logger.info(skipped_template.format(name=function_name))
                tool_results.append(_result(call_id, function_name, "Skipped by user."))
                continue

        if args.dry_run and function_name in modifying_tools:
            logger.info(
                msgs.get(
                    "info_dry_run_skipping",
                    "DRY RUN: Skipping state-modifying tool {name}"
                ).format(name=function_name)
            )
            tool_results.append(_result(call_id, function_name, "Skipped due to dry-run."))
            continue

        exec_template = msgs.get("info_executing_tool", "Executing tool: {name}")
        logger.info(exec_template.format(name=function_name))
        try:
            result = handler(**payload)
            content = str(result) if result is not None else "Success"
            if hasattr(result, "__iter__") and not isinstance(result, str):
                # Summarize iterable results as at most the first five items.
                items = list(result)[:5]
                content = "\n".join([f"- {item}" for item in items])
                logger.info(content)
            elif result is not None:
                logger.info(result)

            tool_results.append(_result(call_id, function_name, content))
        except Exception as error:  # pylint: disable=broad-exception-caught
            error_msg = f"Error executing {function_name}: {error}"
            logger.error(error_msg)
            tool_results.append(_result(call_id, function_name, error_msg))

    return tool_results
class ValueHelpers:
    """Normalize values for workflow helpers."""

    @staticmethod
    def ensure_list(value):
        """Return a list for any incoming value.

        None becomes []; strings are split into their non-blank lines;
        tuples/sets are converted; anything else is wrapped in a one-item list.
        """
        if value is None:
            return []
        if isinstance(value, str):
            return [ln for ln in value.splitlines() if ln.strip()]
        if isinstance(value, list):
            return value
        if isinstance(value, (tuple, set)):
            return list(value)
        return [value]

    @staticmethod
    def coerce_bool(value) -> bool:
        """Coerce values into booleans.

        Strings are matched case-insensitively against true/false word sets;
        everything else falls back to Python truthiness.
        """
        if isinstance(value, bool):
            return value
        if not isinstance(value, str):
            return bool(value)
        normalized = value.strip().lower()
        if normalized in ("false", "no", "0", ""):
            return False
        if normalized in ("true", "yes", "1"):
            return True
        return bool(value)

    @staticmethod
    def normalize_separator(text):
        """Normalize escaped separators (``\\n``/``\\t`` literals to real chars)."""
        if text is None:
            return ""
        if not isinstance(text, str):
            return str(text)
        return text.replace("\\n", "\n").replace("\\t", "\t")
def is_n8n_workflow(workflow: Dict[str, Any]) -> bool:
    """Check if workflow uses n8n schema.

    A dict with explicit "connections" counts as n8n; when nodes are present,
    the first node must additionally carry one of the identifying n8n node
    properties (position, typeVersion or name).
    """
    if not isinstance(workflow, dict):
        return False

    has_connections = "connections" in workflow
    nodes = workflow.get("nodes", [])

    # With no nodes, the connections key alone decides the format.
    if not nodes:
        return has_connections

    first = nodes[0] if isinstance(nodes, list) and nodes else {}
    looks_like_n8n_node = any(key in first for key in ("position", "typeVersion", "name"))
    return has_connections and looks_like_n8n_node
DEFAULT_MODEL = "openai/gpt-4o"


def resolve_model_name(prompt: dict) -> str:
    """Resolve model name: LLM_MODEL env var wins, then prompt, then default."""
    fallback = prompt.get("model", DEFAULT_MODEL)
    return os.environ.get("LLM_MODEL", fallback)


def build_workflow_context(parts: dict) -> dict:
    """Build the workflow context dict with a resolved model_name.

    When no prompt is present yet, workflow plugins will load it later;
    resolving against an empty prompt still applies the env var / default.
    """
    context = dict(parts)
    context["model_name"] = resolve_model_name(parts.get("prompt", {}))
    return context
def build_workflow_engine(workflow_config: dict, context: dict, logger):
    """Assemble workflow engine dependencies.

    Wires the runtime, plugin registry, resolvers and executors together
    and returns a ready-to-run WorkflowEngine.
    """
    runtime = WorkflowRuntime(context=context, store={}, tool_runner=None, logger=logger)

    # A ToolRunner is only needed for AI workflows, which supply both a
    # tool map and localized messages in the context.
    if "tool_map" in context and "msgs" in context:
        runtime.tool_runner = ToolRunner(context["tool_map"], context["msgs"], logger)

    registry = PluginRegistry(load_plugin_map())
    resolver = InputResolver(runtime.store)
    loops = LoopExecutor(runtime, resolver)
    nodes = NodeExecutor(runtime, registry, resolver, loops)
    # Break the node-executor <-> loop-executor cycle after construction.
    loops.set_node_executor(nodes)

    return WorkflowEngine(workflow_config, nodes, logger, runtime, registry)
except json.JSONDecodeError as exc: + logger.warning("Invalid workflow JSON: %s", exc) + return {"name": "Invalid", "nodes": [], "connections": {}} + return parsed if isinstance(parsed, dict) else {"name": "Invalid", "nodes": [], "connections": {}} + + +def _gather_n8n_nodes( + nodes: Iterable[Dict[str, Any]], + plugin_map: Dict[str, Any] +) -> List[Dict[str, Any]]: + """Extract nodes from n8n format.""" + collected = [] + for node in nodes: + node_id = node.get("id", node.get("name", f"node-{len(collected)}")) + node_type = node.get("type", "unknown") + metadata = plugin_map.get(node_type, {}) + + collected.append({ + "id": node_id, + "name": node.get("name", node_id), + "type": node_type, + "label_key": metadata.get("label"), + "parent": None, + "position": node.get("position", [0, 0]), + }) + return collected + + +def _build_n8n_edges( + connections: Dict[str, Any], + nodes: List[Dict[str, Any]] +) -> List[Dict[str, str]]: + """Build edges from n8n connections.""" + # Build name to ID mapping + name_to_id = {node["name"]: node["id"] for node in nodes} + + edges = [] + for source_name, outputs in connections.items(): + source_id = name_to_id.get(source_name, source_name) + + for output_type, indices in outputs.items(): + for index, targets in indices.items(): + for target in targets: + target_name = target["node"] + target_id = name_to_id.get(target_name, target_name) + + edges.append({ + "from": source_id, + "to": target_id, + "type": target.get("type", "main"), + "output_index": index, + "input_index": target.get("index", 0), + }) + return edges + + +def build_workflow_graph() -> Dict[str, Any]: + """Build workflow graph from n8n format (breaking change: legacy format removed).""" + definition = _parse_workflow_definition() + plugin_map = load_metadata().get("workflow_plugins", {}) + + # Only support n8n format now + nodes = _gather_n8n_nodes(definition.get("nodes", []), plugin_map) + edges = _build_n8n_edges(definition.get("connections", {}), nodes) + + 
logger.debug("Built workflow graph with %d nodes and %d edges", len(nodes), len(edges)) + return { + "nodes": nodes, + "edges": edges, + "count": {"nodes": len(nodes), "edges": len(edges)}, + }