Files
metabuilder/workflow/executor/python/workflow_graph.py
johndoe6345789 3d6ae4cbf7 feat: Add complete Python workflow executor from AutoMetabuilder
Add full Python workflow execution engine with:

Core Executor:
- engine.py: WorkflowEngine for running n8n configs
- n8n_executor.py: N8N-style workflow execution with connections
- node_executor.py: Individual node execution with plugin dispatch
- loop_executor.py: Loop node execution with iteration control
- execution_order.py: Topological sort for node ordering

Schema & Validation:
- n8n_schema.py: N8N workflow schema types and validation
- n8n_converter.py: Legacy to n8n schema conversion

Plugin System:
- plugin_loader.py: Dynamic plugin loading
- plugin_registry.py: Plugin discovery and registration
- plugin_map.json: 116 plugin type mappings

Runtime & Context:
- runtime.py: Workflow runtime container
- input_resolver.py: Binding and coercion resolution
- value_helpers.py: Value normalization helpers
- workflow_context_builder.py: Runtime context assembly
- workflow_config_loader.py: Configuration loading
- workflow_engine_builder.py: Engine assembly with dependencies

Utilities:
- tool_calls_handler.py: LLM tool call handling
- tool_runner.py: Tool execution with logging
- notification_helpers.py: Slack/Discord notifications
- workflow_adapter.py: N8N format handling
- workflow_graph.py: Node/edge graph for visualization

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 16:42:30 +00:00

90 lines
3.0 KiB
Python

"""Build a node/edge view of n8n workflows for visualization."""
from __future__ import annotations
import json
import logging
from typing import Any, Dict, Iterable, List
from autometabuilder.data import get_workflow_content, load_metadata
logger = logging.getLogger(__name__)
def _parse_workflow_definition() -> Dict[str, Any]:
payload = get_workflow_content()
if not payload:
return {"name": "Empty", "nodes": [], "connections": {}}
try:
parsed = json.loads(payload)
except json.JSONDecodeError as exc:
logger.warning("Invalid workflow JSON: %s", exc)
return {"name": "Invalid", "nodes": [], "connections": {}}
return parsed if isinstance(parsed, dict) else {"name": "Invalid", "nodes": [], "connections": {}}
def _gather_n8n_nodes(
nodes: Iterable[Dict[str, Any]],
plugin_map: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Extract nodes from n8n format."""
collected = []
for node in nodes:
node_id = node.get("id", node.get("name", f"node-{len(collected)}"))
node_type = node.get("type", "unknown")
metadata = plugin_map.get(node_type, {})
collected.append({
"id": node_id,
"name": node.get("name", node_id),
"type": node_type,
"label_key": metadata.get("label"),
"parent": None,
"position": node.get("position", [0, 0]),
})
return collected
def _build_n8n_edges(
connections: Dict[str, Any],
nodes: List[Dict[str, Any]]
) -> List[Dict[str, str]]:
"""Build edges from n8n connections."""
# Build name to ID mapping
name_to_id = {node["name"]: node["id"] for node in nodes}
edges = []
for source_name, outputs in connections.items():
source_id = name_to_id.get(source_name, source_name)
for output_type, indices in outputs.items():
for index, targets in indices.items():
for target in targets:
target_name = target["node"]
target_id = name_to_id.get(target_name, target_name)
edges.append({
"from": source_id,
"to": target_id,
"type": target.get("type", "main"),
"output_index": index,
"input_index": target.get("index", 0),
})
return edges
def build_workflow_graph() -> Dict[str, Any]:
"""Build workflow graph from n8n format (breaking change: legacy format removed)."""
definition = _parse_workflow_definition()
plugin_map = load_metadata().get("workflow_plugins", {})
# Only support n8n format now
nodes = _gather_n8n_nodes(definition.get("nodes", []), plugin_map)
edges = _build_n8n_edges(definition.get("connections", {}), nodes)
logger.debug("Built workflow graph with %d nodes and %d edges", len(nodes), len(edges))
return {
"nodes": nodes,
"edges": edges,
"count": {"nodes": len(nodes), "edges": len(edges)},
}