mirror of
https://github.com/johndoe6345789/metabuilder.git
synced 2026-04-25 06:14:59 +00:00
Add full Python workflow execution engine with:

Core Executor:
- engine.py: WorkflowEngine for running n8n configs
- n8n_executor.py: N8N-style workflow execution with connections
- node_executor.py: Individual node execution with plugin dispatch
- loop_executor.py: Loop node execution with iteration control
- execution_order.py: Topological sort for node ordering

Schema & Validation:
- n8n_schema.py: N8N workflow schema types and validation
- n8n_converter.py: Legacy to n8n schema conversion

Plugin System:
- plugin_loader.py: Dynamic plugin loading
- plugin_registry.py: Plugin discovery and registration
- plugin_map.json: 116 plugin type mappings

Runtime & Context:
- runtime.py: Workflow runtime container
- input_resolver.py: Binding and coercion resolution
- value_helpers.py: Value normalization helpers
- workflow_context_builder.py: Runtime context assembly
- workflow_config_loader.py: Configuration loading
- workflow_engine_builder.py: Engine assembly with dependencies

Utilities:
- tool_calls_handler.py: LLM tool call handling
- tool_runner.py: Tool execution with logging
- notification_helpers.py: Slack/Discord notifications
- workflow_adapter.py: N8N format handling
- workflow_graph.py: Node/edge graph for visualization

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
54 lines
1.9 KiB
Python
54 lines
1.9 KiB
Python
"""Execute workflow nodes."""
|
|
|
|
|
|
class NodeExecutor:
    """Execute workflow nodes by dispatching each one to its plugin.

    All collaborators are injected; this class only orchestrates them:

    * ``runtime`` -- exposes ``logger`` and the ``store`` mapping that node
      results are written into; it is also handed to every plugin call.
    * ``plugin_registry`` -- ``get(node_type)`` yields the plugin callable;
      a falsy result is treated as "unknown node type".
    * ``input_resolver`` -- supplies ``resolve_binding``, ``coerce_bool``
      and ``resolve_inputs``.
    * ``loop_executor`` -- ``execute(node)`` handles ``control.loop`` nodes.
    """

    def __init__(self, runtime, plugin_registry, input_resolver, loop_executor):
        """Keep references to the collaborators used during execution."""
        self.runtime = runtime
        self.plugin_registry = plugin_registry
        self.input_resolver = input_resolver
        self.loop_executor = loop_executor

    def execute_nodes(self, nodes):
        """Execute every node in ``nodes`` in iteration order.

        Per-node results are not collected; they are only observable
        through whatever ``execute_node`` writes to ``runtime.store``.
        """
        for node in nodes:
            self.execute_node(node)

    def execute_node(self, node):
        """Execute a single node and return its result.

        Args:
            node: Mapping describing the node. Keys read here are ``type``
                (required), ``when``, ``id``, ``inputs`` and ``outputs``.

        Returns:
            ``None`` when the node has no type, is skipped by its ``when``
            condition, or has no registered plugin. For ``control.loop``
            nodes, whatever ``loop_executor.execute`` returns. Otherwise
            the plugin result as a dict (a non-dict plugin result is
            wrapped as ``{"result": value}``).
        """
        node_type = node.get("type")
        if not node_type:
            self.runtime.logger.error("Workflow node missing type.")
            return None

        # An absent (or null) ``when`` means the node is unconditional.
        when_value = node.get("when")
        if when_value is not None:
            condition = self.input_resolver.resolve_binding(when_value)
            if not self.input_resolver.coerce_bool(condition):
                self.runtime.logger.trace(
                    "Node %s skipped by condition", node.get("id")
                )
                return None

        # Loops are delegated wholesale; no plugin lookup or output
        # storage happens here for them.
        if node_type == "control.loop":
            return self.loop_executor.execute(node)

        plugin = self.plugin_registry.get(node_type)
        if not plugin:
            self.runtime.logger.error("Unknown node type: %s", node_type)
            return None

        # ``dict.get(key, default)`` only applies the default when the key
        # is absent, so an explicit ``"inputs": null`` would otherwise hand
        # ``None`` to the resolver. Treat null the same as "no inputs".
        raw_inputs = node.get("inputs")
        if raw_inputs is None:
            raw_inputs = {}
        inputs = self.input_resolver.resolve_inputs(raw_inputs)

        self.runtime.logger.debug("Executing node %s", node_type)
        result = plugin(self.runtime, inputs)
        if not isinstance(result, dict):
            result = {"result": result}

        self._store_outputs(node.get("outputs"), result)
        return result

    def _store_outputs(self, outputs, result):
        """Copy ``result`` values into ``runtime.store``.

        With a non-empty ``outputs`` mapping (result key -> store key) only
        the listed result keys that are actually present get stored, under
        their mapped store key. Without one, every result entry is stored
        under its own name.
        """
        if outputs:
            for output_name, store_key in outputs.items():
                if output_name in result:
                    self.runtime.store[store_key] = result[output_name]
        else:
            for output_name, value in result.items():
                self.runtime.store[output_name] = value
|