mirror of
https://github.com/johndoe6345789/metabuilder.git
synced 2026-04-26 14:54:55 +00:00
feat: Add complete Python workflow executor from AutoMetabuilder
Add full Python workflow execution engine with: Core Executor: - engine.py: WorkflowEngine for running n8n configs - n8n_executor.py: N8N-style workflow execution with connections - node_executor.py: Individual node execution with plugin dispatch - loop_executor.py: Loop node execution with iteration control - execution_order.py: Topological sort for node ordering Schema & Validation: - n8n_schema.py: N8N workflow schema types and validation - n8n_converter.py: Legacy to n8n schema conversion Plugin System: - plugin_loader.py: Dynamic plugin loading - plugin_registry.py: Plugin discovery and registration - plugin_map.json: 116 plugin type mappings Runtime & Context: - runtime.py: Workflow runtime container - input_resolver.py: Binding and coercion resolution - value_helpers.py: Value normalization helpers - workflow_context_builder.py: Runtime context assembly - workflow_config_loader.py: Configuration loading - workflow_engine_builder.py: Engine assembly with dependencies Utilities: - tool_calls_handler.py: LLM tool call handling - tool_runner.py: Tool execution with logging - notification_helpers.py: Slack/Discord notifications - workflow_adapter.py: N8N format handling - workflow_graph.py: Node/edge graph for visualization Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
50
workflow/executor/python/loop_executor.py
Normal file
50
workflow/executor/python/loop_executor.py
Normal file
@@ -0,0 +1,50 @@
|
||||
"""Execute workflow loops."""
|
||||
|
||||
|
||||
class LoopExecutor:
|
||||
"""Execute loop nodes."""
|
||||
def __init__(self, runtime, input_resolver):
|
||||
self.runtime = runtime
|
||||
self.input_resolver = input_resolver
|
||||
self.node_executor = None
|
||||
|
||||
def set_node_executor(self, node_executor) -> None:
|
||||
"""Inject node executor dependency."""
|
||||
self.node_executor = node_executor
|
||||
|
||||
def execute(self, node):
|
||||
"""Run loop body until stop condition."""
|
||||
inputs = node.get("inputs", {})
|
||||
max_iterations = self.input_resolver.resolve_binding(inputs.get("max_iterations", 1))
|
||||
stop_when_raw = inputs.get("stop_when")
|
||||
stop_on_raw = inputs.get("stop_on", True)
|
||||
|
||||
try:
|
||||
max_iterations = int(max_iterations)
|
||||
except (TypeError, ValueError):
|
||||
max_iterations = 1
|
||||
|
||||
if self.runtime.context["args"].once:
|
||||
max_iterations = min(max_iterations, 1)
|
||||
|
||||
stop_on = self.input_resolver.coerce_bool(self.input_resolver.resolve_binding(stop_on_raw))
|
||||
body = node.get("body", [])
|
||||
if not isinstance(body, list):
|
||||
self.runtime.logger.error("Loop body must be a list of nodes.")
|
||||
return None
|
||||
|
||||
iteration = 0
|
||||
while iteration < max_iterations:
|
||||
iteration += 1
|
||||
self.runtime.logger.info("--- Loop iteration %s ---", iteration)
|
||||
if not self.node_executor:
|
||||
self.runtime.logger.error("Loop executor missing node executor.")
|
||||
return None
|
||||
self.node_executor.execute_nodes(body)
|
||||
|
||||
if stop_when_raw is not None:
|
||||
stop_value = self.input_resolver.resolve_binding(stop_when_raw)
|
||||
if self.input_resolver.coerce_bool(stop_value) == stop_on:
|
||||
break
|
||||
|
||||
return None
|
||||
Reference in New Issue
Block a user