Files
AutoMetabuilder/backend/autometabuilder/web/workflow_graph.py
T
git af98717aad Introduce AutoMetabuilder core components and workflow packages:
- Implement core components: CLI argument parsing, environment loading, GitHub service creation, and logging configuration.
- Add support for OpenAI client setup and model resolution.
- Develop SDLC context loader from GitHub and repository files.
- Implement workflow context and engine builders.
- Introduce major workflow packages: `game_tick_loop` and `contextual_iterative_loop`.
- Update localization files with new package descriptions and labels.
- Streamline web navigation by loading items from a dedicated JSON file.
2026-01-10 02:04:19 +00:00

80 lines
3.0 KiB
Python

"""Build a node/edge view of the declarative workflow for visualization."""
from __future__ import annotations
import json
import logging
from typing import Any, Dict, Iterable, List
from .data import get_workflow_content, load_metadata
logger = logging.getLogger(__name__)
def _parse_workflow_definition() -> Dict[str, Any]:
    """Load and decode the stored workflow definition.

    Returns the decoded JSON object, or ``{"nodes": []}`` when the stored
    content is empty, is not valid JSON, or does not decode to a dict.
    """
    raw = get_workflow_content()
    fallback: Dict[str, Any] = {"nodes": []}
    if not raw:
        return fallback
    try:
        document = json.loads(raw)
    except json.JSONDecodeError as err:
        logger.warning("Invalid workflow JSON: %s", err)
        return fallback
    if isinstance(document, dict):
        return document
    return fallback
def _gather_nodes(nodes: Iterable[Dict[str, Any]], plugin_map: Dict[str, Any], parent_id: str | None = None, collected: List[Dict[str, Any]] | None = None) -> List[Dict[str, Any]]:
collected = collected or []
for node in nodes:
node_id = node.get("id") or f"node-{len(collected)}"
node_type = node.get("type", "unknown")
metadata = plugin_map.get(node_type, {})
node_summary: Dict[str, Any] = {
"id": node_id,
"type": node_type,
"label_key": metadata.get("label"),
"parent": parent_id,
"inputs": node.get("inputs", {}),
"outputs": node.get("outputs", {}),
}
collected.append(node_summary)
body = node.get("body")
if isinstance(body, list):
_gather_nodes(body, plugin_map, parent_id=node_id, collected=collected)
return collected
def _build_edges(nodes: Iterable[Dict[str, Any]]) -> List[Dict[str, str]]:
producers: Dict[str, str] = {}
for node in nodes:
outputs = node.get("outputs", {})
for value in outputs.values():
if isinstance(value, str):
if value in producers:
logger.debug("Variable %s already produced by %s; overwriting with %s", value, producers[value], node["id"])
producers[value] = node["id"]
edges: List[Dict[str, str]] = []
for node in nodes:
inputs = node.get("inputs", {})
for port, value in inputs.items():
if isinstance(value, str) and value.startswith("$"):
variable = value[1:]
source = producers.get(variable)
if source:
edges.append({"from": source, "to": node["id"], "var": variable, "port": port})
else:
logger.debug("No producer found for %s referenced by %s.%s", variable, node["id"], port)
return edges
def build_workflow_graph() -> Dict[str, Any]:
    """Assemble the node/edge graph consumed by the visualization frontend.

    Parses the stored workflow definition, flattens its node tree using the
    plugin metadata for labels, derives data-flow edges, and returns a dict
    with ``nodes``, ``edges``, and a ``count`` summary.
    """
    definition = _parse_workflow_definition()
    plugins = load_metadata().get("workflow_plugins", {})
    graph_nodes = _gather_nodes(definition.get("nodes", []), plugins)
    graph_edges = _build_edges(graph_nodes)
    logger.debug(
        "Built workflow graph with %d nodes and %d edges",
        len(graph_nodes),
        len(graph_edges),
    )
    return {
        "nodes": graph_nodes,
        "edges": graph_edges,
        "count": {"nodes": len(graph_nodes), "edges": len(graph_edges)},
    }