refactor(workflow): convert all plugins to class/struct + factory pattern

- Python: class extending NodeExecutor + factory.py (80+ plugins)
- TypeScript: class implements NodeExecutor + factory.ts (7 groups, 116 classes)
- Go: struct with methods + factory.go (36 plugins)
- Rust: struct impl NodeExecutor trait + factory.rs (54 plugins)
- Mojo: struct + factory.mojo (11 plugins)

All package.json files now include:
- files array listing source files
- metadata.class/struct field
- metadata.entrypoint field

This enables a unified plugin loading system across all languages
with no import side effects (Spring-style DI pattern).

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-22 14:53:04 +00:00
parent 2562c2f55b
commit 7ce8b4ae8a
653 changed files with 13243 additions and 3034 deletions

View File

@@ -1,18 +1,27 @@
"""Workflow plugin: build tool map for function dispatch."""
from ...base import NodeExecutor
def run(runtime, inputs):
"""Build a map from tool names to their handlers.
This reads plugins from context and builds a dispatch map.
"""
plugins = runtime.context.get("plugins", {})
class BuildToolMap(NodeExecutor):
"""Build a map from tool names to their handlers."""
tool_map = {}
for plugin_type, plugin_info in plugins.items():
# Map plugin_type (e.g., "math.add") to handler info
tool_map[plugin_type] = plugin_info
node_type = "backend.build_tool_map"
category = "backend"
description = "Build tool map for function dispatch"
runtime.context["tool_map"] = tool_map
def execute(self, inputs, runtime=None):
"""Build a map from tool names to their handlers.
return {"success": True, "tool_count": len(tool_map)}
This reads plugins from context and builds a dispatch map.
"""
plugins = runtime.context.get("plugins", {})
tool_map = {}
for plugin_type, plugin_info in plugins.items():
# Map plugin_type (e.g., "math.add") to handler info
tool_map[plugin_type] = plugin_info
runtime.context["tool_map"] = tool_map
return {"success": True, "tool_count": len(tool_map)}

View File

@@ -0,0 +1,7 @@
"""Factory for BuildToolMap plugin."""
from .backend_build_tool_map import BuildToolMap
def create():
return BuildToolMap()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "tools"],
"main": "backend_build_tool_map.py",
"files": ["backend_build_tool_map.py", "factory.py"],
"metadata": {
"plugin_type": "backend.build_tool_map",
"category": "backend"
"category": "backend",
"class": "BuildToolMap",
"entrypoint": "execute"
}
}

View File

@@ -1,35 +1,45 @@
"""Workflow plugin: configure logging."""
import logging
import sys
from ...base import NodeExecutor
def run(runtime, inputs):
"""Configure logging for the workflow runtime.
Inputs:
level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
format: Log format string
file: Optional file path for log output
"""
level_str = inputs.get("level", "INFO").upper()
log_format = inputs.get("format", "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
log_file = inputs.get("file")
class ConfigureLogging(NodeExecutor):
"""Configure logging for the workflow runtime."""
level = getattr(logging, level_str, logging.INFO)
node_type = "backend.configure_logging"
category = "backend"
description = "Configure logging for workflow runtime"
handlers = [logging.StreamHandler(sys.stdout)]
if log_file:
handlers.append(logging.FileHandler(log_file))
def execute(self, inputs, runtime=None):
"""Configure logging for the workflow runtime.
logging.basicConfig(
level=level,
format=log_format,
handlers=handlers
)
Inputs:
level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
format: Log format string
file: Optional file path for log output
"""
level_str = inputs.get("level", "INFO").upper()
log_format = inputs.get("format", "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
log_file = inputs.get("file")
logger = logging.getLogger("metabuilder")
logger.setLevel(level)
level = getattr(logging, level_str, logging.INFO)
runtime.context["logger"] = logger
handlers = [logging.StreamHandler(sys.stdout)]
if log_file:
handlers.append(logging.FileHandler(log_file))
return {"success": True, "level": level_str}
logging.basicConfig(
level=level,
format=log_format,
handlers=handlers
)
logger = logging.getLogger("metabuilder")
logger.setLevel(level)
runtime.context["logger"] = logger
return {"success": True, "level": level_str}

View File

@@ -0,0 +1,7 @@
"""Factory for ConfigureLogging plugin."""
from .backend_configure_logging import ConfigureLogging
def create():
return ConfigureLogging()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "logging"],
"main": "backend_configure_logging.py",
"files": ["backend_configure_logging.py", "factory.py"],
"metadata": {
"plugin_type": "backend.configure_logging",
"category": "backend"
"category": "backend",
"class": "ConfigureLogging",
"entrypoint": "execute"
}
}

View File

@@ -1,18 +1,28 @@
"""Workflow plugin: create Discord client."""
import os
from ...base import NodeExecutor
def run(runtime, inputs):
"""Create a Discord webhook client and store in runtime context.
Inputs:
webhook_url: Discord webhook URL (defaults to DISCORD_WEBHOOK_URL env var)
"""
webhook_url = inputs.get("webhook_url") or os.getenv("DISCORD_WEBHOOK_URL")
class CreateDiscord(NodeExecutor):
"""Create a Discord webhook client and store in runtime context."""
if not webhook_url:
return {"success": False, "error": "No webhook URL provided"}
node_type = "backend.create_discord"
category = "backend"
description = "Create Discord webhook client for notifications"
runtime.context["discord_webhook"] = webhook_url
def execute(self, inputs, runtime=None):
"""Create a Discord webhook client and store in runtime context.
return {"success": True}
Inputs:
webhook_url: Discord webhook URL (defaults to DISCORD_WEBHOOK_URL env var)
"""
webhook_url = inputs.get("webhook_url") or os.getenv("DISCORD_WEBHOOK_URL")
if not webhook_url:
return {"success": False, "error": "No webhook URL provided"}
runtime.context["discord_webhook"] = webhook_url
return {"success": True}

View File

@@ -0,0 +1,7 @@
"""Factory for CreateDiscord plugin."""
from .backend_create_discord import CreateDiscord
def create():
return CreateDiscord()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "discord"],
"main": "backend_create_discord.py",
"files": ["backend_create_discord.py", "factory.py"],
"metadata": {
"plugin_type": "backend.create_discord",
"category": "backend"
"category": "backend",
"class": "CreateDiscord",
"entrypoint": "execute"
}
}

View File

@@ -1,25 +1,35 @@
"""Workflow plugin: create GitHub client."""
import os
from ...base import NodeExecutor
def run(runtime, inputs):
"""Create a GitHub client and store in runtime context.
Inputs:
token: GitHub token (defaults to GITHUB_TOKEN env var)
"""
try:
from github import Github
except ImportError:
return {"success": False, "error": "PyGithub package not installed"}
class CreateGitHub(NodeExecutor):
"""Create a GitHub client and store in runtime context."""
token = inputs.get("token") or os.getenv("GITHUB_TOKEN")
node_type = "backend.create_github"
category = "backend"
description = "Create GitHub client for repository operations"
if not token:
return {"success": False, "error": "No token provided"}
def execute(self, inputs, runtime=None):
"""Create a GitHub client and store in runtime context.
client = Github(token)
Inputs:
token: GitHub token (defaults to GITHUB_TOKEN env var)
"""
try:
from github import Github
except ImportError:
return {"success": False, "error": "PyGithub package not installed"}
runtime.context["github"] = client
token = inputs.get("token") or os.getenv("GITHUB_TOKEN")
return {"success": True}
if not token:
return {"success": False, "error": "No token provided"}
client = Github(token)
runtime.context["github"] = client
return {"success": True}

View File

@@ -0,0 +1,7 @@
"""Factory for CreateGitHub plugin."""
from .backend_create_github import CreateGitHub
def create():
return CreateGitHub()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "github"],
"main": "backend_create_github.py",
"files": ["backend_create_github.py", "factory.py"],
"metadata": {
"plugin_type": "backend.create_github",
"category": "backend"
"category": "backend",
"class": "CreateGitHub",
"entrypoint": "execute"
}
}

View File

@@ -1,28 +1,38 @@
"""Workflow plugin: create OpenAI client."""
import os
from ...base import NodeExecutor
def run(runtime, inputs):
"""Create an OpenAI client and store in runtime context.
Inputs:
api_key: OpenAI API key (defaults to OPENAI_API_KEY env var)
model: Model name (default: gpt-4)
"""
try:
from openai import OpenAI
except ImportError:
return {"success": False, "error": "openai package not installed"}
class CreateOpenAI(NodeExecutor):
"""Create an OpenAI client and store in runtime context."""
api_key = inputs.get("api_key") or os.getenv("OPENAI_API_KEY")
model = inputs.get("model", "gpt-4")
node_type = "backend.create_openai"
category = "backend"
description = "Create OpenAI client for AI operations"
if not api_key:
return {"success": False, "error": "No API key provided"}
def execute(self, inputs, runtime=None):
"""Create an OpenAI client and store in runtime context.
client = OpenAI(api_key=api_key)
Inputs:
api_key: OpenAI API key (defaults to OPENAI_API_KEY env var)
model: Model name (default: gpt-4)
"""
try:
from openai import OpenAI
except ImportError:
return {"success": False, "error": "openai package not installed"}
runtime.context["client"] = client
runtime.context["model_name"] = model
api_key = inputs.get("api_key") or os.getenv("OPENAI_API_KEY")
model = inputs.get("model", "gpt-4")
return {"success": True, "model": model}
if not api_key:
return {"success": False, "error": "No API key provided"}
client = OpenAI(api_key=api_key)
runtime.context["client"] = client
runtime.context["model_name"] = model
return {"success": True, "model": model}

View File

@@ -0,0 +1,7 @@
"""Factory for CreateOpenAI plugin."""
from .backend_create_openai import CreateOpenAI
def create():
return CreateOpenAI()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "openai", "ai"],
"main": "backend_create_openai.py",
"files": ["backend_create_openai.py", "factory.py"],
"metadata": {
"plugin_type": "backend.create_openai",
"category": "backend"
"category": "backend",
"class": "CreateOpenAI",
"entrypoint": "execute"
}
}

View File

@@ -1,25 +1,35 @@
"""Workflow plugin: create Slack client."""
import os
from ...base import NodeExecutor
def run(runtime, inputs):
"""Create a Slack client and store in runtime context.
Inputs:
token: Slack bot token (defaults to SLACK_BOT_TOKEN env var)
"""
try:
from slack_sdk import WebClient
except ImportError:
return {"success": False, "error": "slack_sdk package not installed"}
class CreateSlack(NodeExecutor):
"""Create a Slack client and store in runtime context."""
token = inputs.get("token") or os.getenv("SLACK_BOT_TOKEN")
node_type = "backend.create_slack"
category = "backend"
description = "Create Slack client for messaging"
if not token:
return {"success": False, "error": "No token provided"}
def execute(self, inputs, runtime=None):
"""Create a Slack client and store in runtime context.
client = WebClient(token=token)
Inputs:
token: Slack bot token (defaults to SLACK_BOT_TOKEN env var)
"""
try:
from slack_sdk import WebClient
except ImportError:
return {"success": False, "error": "slack_sdk package not installed"}
runtime.context["slack"] = client
token = inputs.get("token") or os.getenv("SLACK_BOT_TOKEN")
return {"success": True}
if not token:
return {"success": False, "error": "No token provided"}
client = WebClient(token=token)
runtime.context["slack"] = client
return {"success": True}

View File

@@ -0,0 +1,7 @@
"""Factory for CreateSlack plugin."""
from .backend_create_slack import CreateSlack
def create():
return CreateSlack()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "slack"],
"main": "backend_create_slack.py",
"files": ["backend_create_slack.py", "factory.py"],
"metadata": {
"plugin_type": "backend.create_slack",
"category": "backend"
"category": "backend",
"class": "CreateSlack",
"entrypoint": "execute"
}
}

View File

@@ -1,20 +1,31 @@
"""Workflow plugin: load environment variables."""
import os
from dotenv import load_dotenv
from ...base import NodeExecutor
def run(_runtime, inputs):
"""Load environment variables from .env file.
Inputs:
path: Optional path to .env file (default: .env)
override: Whether to override existing env vars (default: False)
"""
path = inputs.get("path", ".env")
override = inputs.get("override", False)
class LoadEnv(NodeExecutor):
"""Load environment variables from .env file."""
if os.path.exists(path):
load_dotenv(path, override=override)
return {"success": True, "path": path}
node_type = "backend.load_env"
category = "backend"
description = "Load environment variables from .env file"
return {"success": False, "error": f"File not found: {path}"}
def execute(self, inputs, runtime=None):
"""Load environment variables from .env file.
Inputs:
path: Optional path to .env file (default: .env)
override: Whether to override existing env vars (default: False)
"""
path = inputs.get("path", ".env")
override = inputs.get("override", False)
if os.path.exists(path):
load_dotenv(path, override=override)
return {"success": True, "path": path}
return {"success": False, "error": f"File not found: {path}"}

View File

@@ -0,0 +1,7 @@
"""Factory for LoadEnv plugin."""
from .backend_load_env import LoadEnv
def create():
return LoadEnv()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "env"],
"main": "backend_load_env.py",
"files": ["backend_load_env.py", "factory.py"],
"metadata": {
"plugin_type": "backend.load_env",
"category": "backend"
"category": "backend",
"class": "LoadEnv",
"entrypoint": "execute"
}
}

View File

@@ -1,29 +1,39 @@
"""Workflow plugin: load UI/CLI messages."""
import os
import json
from ...base import NodeExecutor
def run(runtime, inputs):
"""Load UI/CLI messages for localization.
Inputs:
path: Path to messages file
locale: Locale code (default: en)
"""
path = inputs.get("path", "config/messages")
locale = inputs.get("locale", "en")
class LoadMessages(NodeExecutor):
"""Load UI/CLI messages for localization."""
messages_file = os.path.join(path, f"{locale}.json")
node_type = "backend.load_messages"
category = "backend"
description = "Load UI/CLI messages for localization"
if not os.path.exists(messages_file):
messages_file = os.path.join(path, "en.json") # Fallback
def execute(self, inputs, runtime=None):
"""Load UI/CLI messages for localization.
if not os.path.exists(messages_file):
return {"success": False, "error": "No messages file found"}
Inputs:
path: Path to messages file
locale: Locale code (default: en)
"""
path = inputs.get("path", "config/messages")
locale = inputs.get("locale", "en")
with open(messages_file) as f:
messages = json.load(f)
messages_file = os.path.join(path, f"{locale}.json")
runtime.context["msgs"] = messages
if not os.path.exists(messages_file):
messages_file = os.path.join(path, "en.json") # Fallback
return {"success": True, "locale": locale, "message_count": len(messages)}
if not os.path.exists(messages_file):
return {"success": False, "error": "No messages file found"}
with open(messages_file) as f:
messages = json.load(f)
runtime.context["msgs"] = messages
return {"success": True, "locale": locale, "message_count": len(messages)}

View File

@@ -0,0 +1,7 @@
"""Factory for LoadMessages plugin."""
from .backend_load_messages import LoadMessages
def create():
return LoadMessages()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "messages", "i18n"],
"main": "backend_load_messages.py",
"files": ["backend_load_messages.py", "factory.py"],
"metadata": {
"plugin_type": "backend.load_messages",
"category": "backend"
"category": "backend",
"class": "LoadMessages",
"entrypoint": "execute"
}
}

View File

@@ -1,26 +1,36 @@
"""Workflow plugin: load workflow metadata."""
import os
import json
from ...base import NodeExecutor
def run(runtime, inputs):
"""Load workflow metadata from package.json or config.
Inputs:
path: Path to metadata file
"""
path = inputs.get("path", "package.json")
class LoadMetadata(NodeExecutor):
"""Load workflow metadata from package.json or config."""
if not os.path.exists(path):
return {"success": False, "error": f"File not found: {path}"}
node_type = "backend.load_metadata"
category = "backend"
description = "Load workflow metadata from config"
with open(path) as f:
metadata = json.load(f)
def execute(self, inputs, runtime=None):
"""Load workflow metadata from package.json or config.
runtime.context["metadata"] = metadata
Inputs:
path: Path to metadata file
"""
path = inputs.get("path", "package.json")
return {
"success": True,
"name": metadata.get("name"),
"version": metadata.get("version")
}
if not os.path.exists(path):
return {"success": False, "error": f"File not found: {path}"}
with open(path) as f:
metadata = json.load(f)
runtime.context["metadata"] = metadata
return {
"success": True,
"name": metadata.get("name"),
"version": metadata.get("version")
}

View File

@@ -0,0 +1,7 @@
"""Factory for LoadMetadata plugin."""
from .backend_load_metadata import LoadMetadata
def create():
return LoadMetadata()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "metadata"],
"main": "backend_load_metadata.py",
"files": ["backend_load_metadata.py", "factory.py"],
"metadata": {
"plugin_type": "backend.load_metadata",
"category": "backend"
"category": "backend",
"class": "LoadMetadata",
"entrypoint": "execute"
}
}

View File

@@ -1,50 +1,59 @@
"""Workflow plugin: load workflow plugins."""
import os
import json
import importlib.util
from ...base import NodeExecutor
def run(runtime, inputs):
"""Load workflow plugins from directory.
class LoadPlugins(NodeExecutor):
"""Load workflow plugins from directory."""
Inputs:
path: Path to plugins directory
"""
path = inputs.get("path", "workflow/plugins/python")
node_type = "backend.load_plugins"
category = "backend"
description = "Load workflow plugins from directory"
if not os.path.exists(path):
return {"success": False, "error": f"Path not found: {path}"}
def execute(self, inputs, runtime=None):
"""Load workflow plugins from directory.
plugins = {}
categories = []
Inputs:
path: Path to plugins directory
"""
path = inputs.get("path", "workflow/plugins/python")
for category in os.listdir(path):
category_path = os.path.join(path, category)
if not os.path.isdir(category_path) or category.startswith("_"):
continue
if not os.path.exists(path):
return {"success": False, "error": f"Path not found: {path}"}
categories.append(category)
plugins = {}
categories = []
for plugin_name in os.listdir(category_path):
plugin_path = os.path.join(category_path, plugin_name)
if not os.path.isdir(plugin_path):
for category in os.listdir(path):
category_path = os.path.join(path, category)
if not os.path.isdir(category_path) or category.startswith("_"):
continue
package_json = os.path.join(plugin_path, "package.json")
if os.path.exists(package_json):
with open(package_json) as f:
metadata = json.load(f)
plugin_type = metadata.get("metadata", {}).get("plugin_type")
if plugin_type:
plugins[plugin_type] = {
"path": plugin_path,
"metadata": metadata
}
categories.append(category)
runtime.context["plugins"] = plugins
for plugin_name in os.listdir(category_path):
plugin_path = os.path.join(category_path, plugin_name)
if not os.path.isdir(plugin_path):
continue
return {
"success": True,
"categories": categories,
"plugin_count": len(plugins)
}
package_json = os.path.join(plugin_path, "package.json")
if os.path.exists(package_json):
with open(package_json) as f:
metadata = json.load(f)
plugin_type = metadata.get("metadata", {}).get("plugin_type")
if plugin_type:
plugins[plugin_type] = {
"path": plugin_path,
"metadata": metadata
}
runtime.context["plugins"] = plugins
return {
"success": True,
"categories": categories,
"plugin_count": len(plugins)
}

View File

@@ -0,0 +1,7 @@
"""Factory for LoadPlugins plugin."""
from .backend_load_plugins import LoadPlugins
def create():
return LoadPlugins()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "loader"],
"main": "backend_load_plugins.py",
"files": ["backend_load_plugins.py", "factory.py"],
"metadata": {
"plugin_type": "backend.load_plugins",
"category": "backend"
"category": "backend",
"class": "LoadPlugins",
"entrypoint": "execute"
}
}

View File

@@ -1,21 +1,31 @@
"""Workflow plugin: load system prompt."""
import os
from ...base import NodeExecutor
def run(runtime, inputs):
"""Load system prompt from file.
Inputs:
path: Path to prompt file
"""
path = inputs.get("path", "config/system_prompt.txt")
class LoadPrompt(NodeExecutor):
"""Load system prompt from file."""
if not os.path.exists(path):
return {"success": False, "error": f"File not found: {path}"}
node_type = "backend.load_prompt"
category = "backend"
description = "Load system prompt from file"
with open(path) as f:
prompt = f.read()
def execute(self, inputs, runtime=None):
"""Load system prompt from file.
runtime.context["system_prompt"] = prompt
Inputs:
path: Path to prompt file
"""
path = inputs.get("path", "config/system_prompt.txt")
return {"success": True, "length": len(prompt)}
if not os.path.exists(path):
return {"success": False, "error": f"File not found: {path}"}
with open(path) as f:
prompt = f.read()
runtime.context["system_prompt"] = prompt
return {"success": True, "length": len(prompt)}

View File

@@ -0,0 +1,7 @@
"""Factory for LoadPrompt plugin."""
from .backend_load_prompt import LoadPrompt
def create():
return LoadPrompt()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "prompt", "ai"],
"main": "backend_load_prompt.py",
"files": ["backend_load_prompt.py", "factory.py"],
"metadata": {
"plugin_type": "backend.load_prompt",
"category": "backend"
"category": "backend",
"class": "LoadPrompt",
"entrypoint": "execute"
}
}

View File

@@ -1,24 +1,34 @@
"""Workflow plugin: load tool policies."""
import os
import json
from ...base import NodeExecutor
def run(runtime, inputs):
"""Load tool policies for access control.
Inputs:
path: Path to tool policies file
"""
path = inputs.get("path", "config/tool_policies.json")
class LoadToolPolicies(NodeExecutor):
"""Load tool policies for access control."""
if not os.path.exists(path):
# Default to permissive if no policies file
runtime.context["tool_policies"] = {}
return {"success": True, "policy_count": 0}
node_type = "backend.load_tool_policies"
category = "backend"
description = "Load tool policies for access control"
with open(path) as f:
policies = json.load(f)
def execute(self, inputs, runtime=None):
"""Load tool policies for access control.
runtime.context["tool_policies"] = policies
Inputs:
path: Path to tool policies file
"""
path = inputs.get("path", "config/tool_policies.json")
return {"success": True, "policy_count": len(policies)}
if not os.path.exists(path):
# Default to permissive if no policies file
runtime.context["tool_policies"] = {}
return {"success": True, "policy_count": 0}
with open(path) as f:
policies = json.load(f)
runtime.context["tool_policies"] = policies
return {"success": True, "policy_count": len(policies)}

View File

@@ -0,0 +1,7 @@
"""Factory for LoadToolPolicies plugin."""
from .backend_load_tool_policies import LoadToolPolicies
def create():
return LoadToolPolicies()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "tools", "policy"],
"main": "backend_load_tool_policies.py",
"files": ["backend_load_tool_policies.py", "factory.py"],
"metadata": {
"plugin_type": "backend.load_tool_policies",
"category": "backend"
"category": "backend",
"class": "LoadToolPolicies",
"entrypoint": "execute"
}
}

View File

@@ -1,22 +1,32 @@
"""Workflow plugin: load tool registry."""
import os
import json
from ...base import NodeExecutor
def run(runtime, inputs):
"""Load tool registry defining available AI tools.
Inputs:
path: Path to tool registry file
"""
path = inputs.get("path", "config/tool_registry.json")
class LoadToolRegistry(NodeExecutor):
"""Load tool registry defining available AI tools."""
if not os.path.exists(path):
return {"success": False, "error": f"File not found: {path}"}
node_type = "backend.load_tool_registry"
category = "backend"
description = "Load tool registry for AI function calling"
with open(path) as f:
registry = json.load(f)
def execute(self, inputs, runtime=None):
"""Load tool registry defining available AI tools.
runtime.context["tool_registry"] = registry
Inputs:
path: Path to tool registry file
"""
path = inputs.get("path", "config/tool_registry.json")
return {"success": True, "tool_count": len(registry)}
if not os.path.exists(path):
return {"success": False, "error": f"File not found: {path}"}
with open(path) as f:
registry = json.load(f)
runtime.context["tool_registry"] = registry
return {"success": True, "tool_count": len(registry)}

View File

@@ -0,0 +1,7 @@
"""Factory for LoadToolRegistry plugin."""
from .backend_load_tool_registry import LoadToolRegistry
def create():
return LoadToolRegistry()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "tools", "registry"],
"main": "backend_load_tool_registry.py",
"files": ["backend_load_tool_registry.py", "factory.py"],
"metadata": {
"plugin_type": "backend.load_tool_registry",
"category": "backend"
"category": "backend",
"class": "LoadToolRegistry",
"entrypoint": "execute"
}
}

View File

@@ -1,27 +1,37 @@
"""Workflow plugin: load tools for AI function calling."""
import os
import json
from ...base import NodeExecutor
def run(runtime, inputs):
"""Load tool definitions for AI function calling.
Inputs:
path: Path to tools definition file or directory
"""
path = inputs.get("path", "config/tools.json")
class LoadTools(NodeExecutor):
"""Load tool definitions for AI function calling."""
tools = []
node_type = "backend.load_tools"
category = "backend"
description = "Load tool definitions for AI function calling"
if os.path.isfile(path):
with open(path) as f:
tools = json.load(f)
elif os.path.isdir(path):
for filename in os.listdir(path):
if filename.endswith(".json"):
with open(os.path.join(path, filename)) as f:
tools.extend(json.load(f))
def execute(self, inputs, runtime=None):
"""Load tool definitions for AI function calling.
runtime.context["tools"] = tools
Inputs:
path: Path to tools definition file or directory
"""
path = inputs.get("path", "config/tools.json")
return {"success": True, "tool_count": len(tools)}
tools = []
if os.path.isfile(path):
with open(path) as f:
tools = json.load(f)
elif os.path.isdir(path):
for filename in os.listdir(path):
if filename.endswith(".json"):
with open(os.path.join(path, filename)) as f:
tools.extend(json.load(f))
runtime.context["tools"] = tools
return {"success": True, "tool_count": len(tools)}

View File

@@ -0,0 +1,7 @@
"""Factory for LoadTools plugin."""
from .backend_load_tools import LoadTools
def create():
return LoadTools()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "tools", "ai"],
"main": "backend_load_tools.py",
"files": ["backend_load_tools.py", "factory.py"],
"metadata": {
"plugin_type": "backend.load_tools",
"category": "backend"
"category": "backend",
"class": "LoadTools",
"entrypoint": "execute"
}
}

View File

@@ -1,27 +1,37 @@
"""Workflow plugin: parse CLI arguments."""
import argparse
from ...base import NodeExecutor
def run(runtime, inputs):
"""Parse command line arguments.
Inputs:
args: Optional list of arguments (defaults to sys.argv)
"""
parser = argparse.ArgumentParser(description="MetaBuilder Workflow")
class ParseCliArgs(NodeExecutor):
"""Parse command line arguments."""
parser.add_argument("--config", "-c", default="config.json",
help="Path to configuration file")
parser.add_argument("--workflow", "-w",
help="Path to workflow file")
parser.add_argument("--verbose", "-v", action="store_true",
help="Enable verbose output")
parser.add_argument("--dry-run", action="store_true",
help="Simulate workflow execution")
node_type = "backend.parse_cli_args"
category = "backend"
description = "Parse command line arguments"
args_list = inputs.get("args")
parsed = parser.parse_args(args_list)
def execute(self, inputs, runtime=None):
"""Parse command line arguments.
runtime.context["cli_args"] = vars(parsed)
Inputs:
args: Optional list of arguments (defaults to sys.argv)
"""
parser = argparse.ArgumentParser(description="MetaBuilder Workflow")
return {"success": True, "args": vars(parsed)}
parser.add_argument("--config", "-c", default="config.json",
help="Path to configuration file")
parser.add_argument("--workflow", "-w",
help="Path to workflow file")
parser.add_argument("--verbose", "-v", action="store_true",
help="Enable verbose output")
parser.add_argument("--dry-run", action="store_true",
help="Simulate workflow execution")
args_list = inputs.get("args")
parsed = parser.parse_args(args_list)
runtime.context["cli_args"] = vars(parsed)
return {"success": True, "args": vars(parsed)}

View File

@@ -0,0 +1,7 @@
"""Factory for ParseCliArgs plugin."""
from .backend_parse_cli_args import ParseCliArgs
def create():
return ParseCliArgs()

View File

@@ -6,8 +6,11 @@
"license": "MIT",
"keywords": ["backend", "workflow", "plugin", "cli", "args"],
"main": "backend_parse_cli_args.py",
"files": ["backend_parse_cli_args.py", "factory.py"],
"metadata": {
"plugin_type": "backend.parse_cli_args",
"category": "backend"
"category": "backend",
"class": "ParseCliArgs",
"entrypoint": "execute"
}
}

View File

@@ -0,0 +1,202 @@
"""
Base classes and types for workflow plugins.
This module provides the class-based plugin architecture that mirrors
the TypeScript implementation for consistency across languages.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Callable
from enum import Enum
import time
class NodeStatus(Enum):
"""Status of node execution."""
SUCCESS = "success"
ERROR = "error"
SKIPPED = "skipped"
PENDING = "pending"
@dataclass
class NodeResult:
"""Result of a node execution."""
status: NodeStatus
output: Optional[Any] = None
error: Optional[str] = None
error_code: Optional[str] = None
timestamp: int = field(default_factory=lambda: int(time.time() * 1000))
duration: Optional[int] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
result = {
"status": self.status.value,
"timestamp": self.timestamp,
}
if self.output is not None:
result["output"] = self.output
if self.error is not None:
result["error"] = self.error
if self.error_code is not None:
result["errorCode"] = self.error_code
if self.duration is not None:
result["duration"] = self.duration
return result
@dataclass
class ValidationResult:
"""Result of node validation."""
valid: bool
errors: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
@dataclass
class WorkflowNode:
"""Workflow node definition."""
id: str
name: str
node_type: str
parameters: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "WorkflowNode":
"""Create from dictionary."""
return cls(
id=data.get("id", ""),
name=data.get("name", ""),
node_type=data.get("nodeType", data.get("node_type", "")),
parameters=data.get("parameters", {}),
metadata=data.get("metadata", {}),
)
@dataclass
class WorkflowContext:
"""Context for workflow execution."""
execution_id: str
tenant_id: str
user_id: str
trigger_data: Dict[str, Any] = field(default_factory=dict)
variables: Dict[str, Any] = field(default_factory=dict)
secrets: Dict[str, str] = field(default_factory=dict)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "WorkflowContext":
"""Create from dictionary."""
return cls(
execution_id=data.get("executionId", data.get("execution_id", "")),
tenant_id=data.get("tenantId", data.get("tenant_id", "")),
user_id=data.get("userId", data.get("user_id", "")),
trigger_data=data.get("triggerData", data.get("trigger_data", {})),
variables=data.get("variables", {}),
secrets=data.get("secrets", {}),
)
class ExecutionState(dict):
"""State dictionary for workflow execution with variable storage."""
@property
def variables(self) -> Dict[str, Any]:
"""Get or create variables dict."""
if "variables" not in self:
self["variables"] = {}
return self["variables"]
@variables.setter
def variables(self, value: Dict[str, Any]) -> None:
self["variables"] = value
class NodeExecutor(ABC):
"""
Abstract base class for node executors.
All workflow plugins should inherit from this class and implement
the execute() method. The run() method provides legacy compatibility.
"""
node_type: str = ""
category: str = ""
description: str = ""
@abstractmethod
def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
"""
Execute the node logic.
Args:
inputs: Input parameters dictionary
runtime: Optional runtime context (for advanced use)
Returns:
Dict with 'result' key on success, or 'error' key on failure
"""
pass
def validate(self, inputs: Dict[str, Any]) -> ValidationResult:
"""
Validate inputs.
Override this method to add custom validation logic.
Default implementation returns valid=True.
Args:
inputs: Input parameters to validate
Returns:
ValidationResult with valid flag and any errors/warnings
"""
return ValidationResult(valid=True)
def run(self, runtime: Any, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""
Legacy interface - calls execute().
Args:
runtime: Runtime context (passed through)
inputs: Input parameters
Returns:
Dict with result or error
"""
return self.execute(inputs, runtime)
def _resolve(self, value: Any, inputs: Dict[str, Any]) -> Any:
"""
Resolve template expressions in values.
Handles {{variable}} syntax for dynamic values.
"""
if isinstance(value, str) and value.startswith("{{") and value.endswith("}}"):
expr = value[2:-2].strip()
return self._get_nested(inputs, expr)
return value
def _get_nested(self, data: Dict[str, Any], path: str) -> Any:
"""Get nested value from dict using dot notation."""
parts = path.split(".")
current = data
for part in parts:
if isinstance(current, dict):
current = current.get(part)
elif hasattr(current, part):
current = getattr(current, part)
else:
return None
if current is None:
return None
return current

View File

@@ -1,5 +1,7 @@
"""Workflow plugin: get current bot execution status."""
from ...base import NodeExecutor
# Global state for bot process
_bot_process = None
_mock_running = False
@@ -27,13 +29,20 @@ def reset_bot_state():
_mock_running = False
def run(_runtime, _inputs):
"""Get current bot execution status.
class ControlGetBotStatus(NodeExecutor):
"""Get current bot execution status."""
Returns:
Dictionary with:
- is_running: bool - Whether the bot is currently running
- config: dict - Current run configuration (empty if not running)
- process: object - Bot process object (or None if not running)
"""
return get_bot_state()
node_type = "control.get_bot_status"
category = "control"
description = "Get current bot execution status"
def execute(self, inputs, runtime=None):
"""Get current bot execution status.
Returns:
Dictionary with:
- is_running: bool - Whether the bot is currently running
- config: dict - Current run configuration (empty if not running)
- process: object - Bot process object (or None if not running)
"""
return get_bot_state()

View File

@@ -0,0 +1,7 @@
"""Factory for ControlGetBotStatus plugin."""
from .control_get_bot_status import ControlGetBotStatus
def create():
return ControlGetBotStatus()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/control_get_bot_status",
"version": "1.0.0",
"description": "control_get_bot_status plugin",
"description": "Get current bot execution status",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["control", "workflow", "plugin"],
"main": "control_get_bot_status.py",
"files": ["control_get_bot_status.py", "factory.py"],
"metadata": {
"plugin_type": "control.get_bot_status",
"category": "control"
"category": "control",
"class": "ControlGetBotStatus",
"entrypoint": "execute"
}
}

View File

@@ -1,13 +1,22 @@
"""Workflow plugin: reset bot execution state."""
from .control_get_bot_status import reset_bot_state
from ...base import NodeExecutor
from ..control_get_bot_status.control_get_bot_status import reset_bot_state
def run(_runtime, _inputs):
"""Reset bot execution state.
class ControlResetBotState(NodeExecutor):
"""Reset bot execution state."""
Returns:
Dictionary with:
- reset: bool - Always True to indicate state was reset
"""
reset_bot_state()
return {"reset": True}
node_type = "control.reset_bot_state"
category = "control"
description = "Reset bot execution state"
def execute(self, inputs, runtime=None):
"""Reset bot execution state.
Returns:
Dictionary with:
- reset: bool - Always True to indicate state was reset
"""
reset_bot_state()
return {"reset": True}

View File

@@ -0,0 +1,7 @@
"""Factory for ControlResetBotState plugin."""
from .control_reset_bot_state import ControlResetBotState
def create():
return ControlResetBotState()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/control_reset_bot_state",
"version": "1.0.0",
"description": "control_reset_bot_state plugin",
"description": "Reset bot execution state",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["control", "workflow", "plugin"],
"main": "control_reset_bot_state.py",
"files": ["control_reset_bot_state.py", "factory.py"],
"metadata": {
"plugin_type": "control.reset_bot_state",
"category": "control"
"category": "control",
"class": "ControlResetBotState",
"entrypoint": "execute"
}
}

View File

@@ -1,20 +1,19 @@
"""Workflow plugin: start bot execution in background thread."""
import os
import subprocess
import sys
import threading
import time
from .control_get_bot_status import (
from ...base import NodeExecutor
from ..control_get_bot_status.control_get_bot_status import (
get_bot_state,
reset_bot_state,
_bot_process,
_mock_running,
_current_run_config
)
# Import global state
import workflow.plugins.python.control.control_get_bot_status as bot_status
import workflow.plugins.python.control.control_get_bot_status.control_get_bot_status as bot_status
def _run_bot_task(mode: str, iterations: int, yolo: bool, stop_at_mvp: bool) -> None:
@@ -61,37 +60,44 @@ def _run_bot_task(mode: str, iterations: int, yolo: bool, stop_at_mvp: bool) ->
reset_bot_state()
def run(_runtime, inputs):
"""Start bot execution in background thread.
class ControlStartBot(NodeExecutor):
"""Start bot execution in background thread."""
Args:
inputs: Dictionary with keys:
- mode: str (default: "once") - Execution mode ("once", "iterations", etc.)
- iterations: int (default: 1) - Number of iterations for "iterations" mode
- yolo: bool (default: True) - Run in YOLO mode
- stop_at_mvp: bool (default: False) - Stop when MVP is reached
node_type = "control.start_bot"
category = "control"
description = "Start bot execution in background thread"
Returns:
Dictionary with:
- started: bool - Whether the bot was started successfully
- error: str (optional) - Error message if bot is already running
"""
mode = inputs.get("mode", "once")
iterations = inputs.get("iterations", 1)
yolo = inputs.get("yolo", True)
stop_at_mvp = inputs.get("stop_at_mvp", False)
def execute(self, inputs, runtime=None):
"""Start bot execution in background thread.
# Check if bot is already running
state = get_bot_state()
if state["is_running"]:
return {"started": False, "error": "Bot already running"}
Args:
inputs: Dictionary with keys:
- mode: str (default: "once") - Execution mode ("once", "iterations", etc.)
- iterations: int (default: 1) - Number of iterations for "iterations" mode
- yolo: bool (default: True) - Run in YOLO mode
- stop_at_mvp: bool (default: False) - Stop when MVP is reached
# Start bot in background thread
thread = threading.Thread(
target=_run_bot_task,
args=(mode, iterations, yolo, stop_at_mvp),
daemon=True
)
thread.start()
Returns:
Dictionary with:
- started: bool - Whether the bot was started successfully
- error: str (optional) - Error message if bot is already running
"""
mode = inputs.get("mode", "once")
iterations = inputs.get("iterations", 1)
yolo = inputs.get("yolo", True)
stop_at_mvp = inputs.get("stop_at_mvp", False)
return {"started": True}
# Check if bot is already running
state = get_bot_state()
if state["is_running"]:
return {"started": False, "error": "Bot already running"}
# Start bot in background thread
thread = threading.Thread(
target=_run_bot_task,
args=(mode, iterations, yolo, stop_at_mvp),
daemon=True
)
thread.start()
return {"started": True}

View File

@@ -0,0 +1,7 @@
"""Factory for ControlStartBot plugin."""
from .control_start_bot import ControlStartBot
def create():
return ControlStartBot()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/control_start_bot",
"version": "1.0.0",
"description": "control_start_bot plugin",
"description": "Start bot execution in background thread",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["control", "workflow", "plugin"],
"main": "control_start_bot.py",
"files": ["control_start_bot.py", "factory.py"],
"metadata": {
"plugin_type": "control.start_bot",
"category": "control"
"category": "control",
"class": "ControlStartBot",
"entrypoint": "execute"
}
}

View File

@@ -1,11 +1,32 @@
"""Workflow plugin: switch/case control flow."""
from ...base import NodeExecutor
def run(_runtime, inputs):
class ControlSwitch(NodeExecutor):
"""Switch on value and return matching case."""
value = inputs.get("value")
cases = inputs.get("cases", {})
default = inputs.get("default")
result = cases.get(str(value), default)
return {"result": result, "matched": str(value) in cases}
node_type = "control.switch"
category = "control"
description = "Switch/case control flow"
def execute(self, inputs, runtime=None):
"""Switch on value and return matching case.
Args:
inputs: Dictionary with keys:
- value: The value to switch on
- cases: dict - Map of case values to results
- default: Default value if no case matches
Returns:
Dictionary with:
- result: The matched case value or default
- matched: bool - Whether a case was matched
"""
value = inputs.get("value")
cases = inputs.get("cases", {})
default = inputs.get("default")
result = cases.get(str(value), default)
return {"result": result, "matched": str(value) in cases}

View File

@@ -0,0 +1,7 @@
"""Factory for ControlSwitch plugin."""
from .control_switch import ControlSwitch
def create():
return ControlSwitch()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/control_switch",
"version": "1.0.0",
"description": "control_switch plugin",
"description": "Switch/case control flow",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["control", "workflow", "plugin"],
"main": "control_switch.py",
"files": ["control_switch.py", "factory.py"],
"metadata": {
"plugin_type": "control.switch",
"category": "control"
"category": "control",
"class": "ControlSwitch",
"entrypoint": "execute"
}
}

View File

@@ -1,13 +1,21 @@
"""Workflow plugin: parse JSON string."""
import json
from ...base import NodeExecutor
def run(_runtime, inputs):
class ConvertParseJson(NodeExecutor):
"""Parse JSON string to object."""
text = inputs.get("text", "")
try:
result = json.loads(text)
return {"result": result}
except json.JSONDecodeError as e:
return {"result": None, "error": str(e)}
node_type = "convert.parseJson"
category = "convert"
description = "Parse JSON string to object"
def execute(self, inputs, runtime=None):
text = inputs.get("text", "")
try:
result = json.loads(text)
return {"result": result}
except json.JSONDecodeError as e:
return {"result": None, "error": str(e)}

View File

@@ -0,0 +1,7 @@
"""Factory for ConvertParseJson plugin."""
from .convert_parse_json import ConvertParseJson
def create():
return ConvertParseJson()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/convert_parse_json",
"version": "1.0.0",
"description": "convert_parse_json plugin",
"description": "Parse JSON string to object",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["convert", "workflow", "plugin"],
"main": "convert_parse_json.py",
"files": ["convert_parse_json.py", "factory.py"],
"metadata": {
"plugin_type": "convert.parse_json",
"category": "convert"
"plugin_type": "convert.parseJson",
"category": "convert",
"class": "ConvertParseJson",
"entrypoint": "execute"
}
}

View File

@@ -1,11 +1,17 @@
"""Workflow plugin: convert to boolean."""
from ...base import NodeExecutor
def run(_runtime, inputs):
class ConvertToBoolean(NodeExecutor):
"""Convert value to boolean."""
value = inputs.get("value")
if isinstance(value, str):
return {"result": value.lower() not in ("false", "0", "", "none", "null")}
node_type = "convert.toBoolean"
category = "convert"
description = "Convert value to boolean"
return {"result": bool(value)}
def execute(self, inputs, runtime=None):
value = inputs.get("value")
if isinstance(value, str):
return {"result": value.lower() not in ("false", "0", "", "none", "null")}
return {"result": bool(value)}

View File

@@ -0,0 +1,7 @@
"""Factory for ConvertToBoolean plugin."""
from .convert_to_boolean import ConvertToBoolean
def create():
return ConvertToBoolean()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/convert_to_boolean",
"version": "1.0.0",
"description": "convert_to_boolean plugin",
"description": "Convert value to boolean",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["convert", "workflow", "plugin"],
"main": "convert_to_boolean.py",
"files": ["convert_to_boolean.py", "factory.py"],
"metadata": {
"plugin_type": "convert.to_boolean",
"category": "convert"
"plugin_type": "convert.toBoolean",
"category": "convert",
"class": "ConvertToBoolean",
"entrypoint": "execute"
}
}

View File

@@ -1,17 +1,25 @@
"""Workflow plugin: convert to dictionary."""
from ...base import NodeExecutor
def run(_runtime, inputs):
class ConvertToDict(NodeExecutor):
"""Convert value to dictionary."""
value = inputs.get("value")
if isinstance(value, dict):
return {"result": value}
elif isinstance(value, list):
# Convert list of [key, value] pairs to dict
try:
return {"result": dict(value)}
except (TypeError, ValueError):
return {"result": {}, "error": "Cannot convert list to dict"}
else:
return {"result": {}}
node_type = "convert.toDict"
category = "convert"
description = "Convert value to dictionary"
def execute(self, inputs, runtime=None):
value = inputs.get("value")
if isinstance(value, dict):
return {"result": value}
elif isinstance(value, list):
# Convert list of [key, value] pairs to dict
try:
return {"result": dict(value)}
except (TypeError, ValueError):
return {"result": {}, "error": "Cannot convert list to dict"}
else:
return {"result": {}}

View File

@@ -0,0 +1,7 @@
"""Factory for ConvertToDict plugin."""
from .convert_to_dict import ConvertToDict
def create():
return ConvertToDict()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/convert_to_dict",
"version": "1.0.0",
"description": "convert_to_dict plugin",
"description": "Convert value to dictionary",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["convert", "workflow", "plugin"],
"main": "convert_to_dict.py",
"files": ["convert_to_dict.py", "factory.py"],
"metadata": {
"plugin_type": "convert.to_dict",
"category": "convert"
"plugin_type": "convert.toDict",
"category": "convert",
"class": "ConvertToDict",
"entrypoint": "execute"
}
}

View File

@@ -1,14 +1,23 @@
"""Workflow plugin: convert to JSON string."""
import json
from ...base import NodeExecutor
def run(_runtime, inputs):
class ConvertToJson(NodeExecutor):
"""Convert value to JSON string."""
value = inputs.get("value")
indent = inputs.get("indent")
try:
result = json.dumps(value, indent=indent)
return {"result": result}
except (TypeError, ValueError) as e:
return {"result": None, "error": str(e)}
node_type = "convert.toJson"
category = "convert"
description = "Convert value to JSON string"
def execute(self, inputs, runtime=None):
value = inputs.get("value")
indent = inputs.get("indent")
try:
result = json.dumps(value, indent=indent)
return {"result": result}
except (TypeError, ValueError) as e:
return {"result": None, "error": str(e)}

View File

@@ -0,0 +1,7 @@
"""Factory for ConvertToJson plugin."""
from .convert_to_json import ConvertToJson
def create():
return ConvertToJson()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/convert_to_json",
"version": "1.0.0",
"description": "convert_to_json plugin",
"description": "Convert value to JSON string",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["convert", "workflow", "plugin"],
"main": "convert_to_json.py",
"files": ["convert_to_json.py", "factory.py"],
"metadata": {
"plugin_type": "convert.to_json",
"category": "convert"
"plugin_type": "convert.toJson",
"category": "convert",
"class": "ConvertToJson",
"entrypoint": "execute"
}
}

View File

@@ -1,17 +1,25 @@
"""Workflow plugin: convert to list."""
from ...base import NodeExecutor
def run(_runtime, inputs):
class ConvertToList(NodeExecutor):
"""Convert value to list."""
value = inputs.get("value")
if isinstance(value, list):
return {"result": value}
elif isinstance(value, (tuple, set)):
return {"result": list(value)}
elif isinstance(value, dict):
return {"result": list(value.items())}
elif value is None:
return {"result": []}
else:
return {"result": [value]}
node_type = "convert.toList"
category = "convert"
description = "Convert value to list"
def execute(self, inputs, runtime=None):
value = inputs.get("value")
if isinstance(value, list):
return {"result": value}
elif isinstance(value, (tuple, set)):
return {"result": list(value)}
elif isinstance(value, dict):
return {"result": list(value.items())}
elif value is None:
return {"result": []}
else:
return {"result": [value]}

View File

@@ -0,0 +1,7 @@
"""Factory for ConvertToList plugin."""
from .convert_to_list import ConvertToList
def create():
return ConvertToList()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/convert_to_list",
"version": "1.0.0",
"description": "convert_to_list plugin",
"description": "Convert value to list",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["convert", "workflow", "plugin"],
"main": "convert_to_list.py",
"files": ["convert_to_list.py", "factory.py"],
"metadata": {
"plugin_type": "convert.to_list",
"category": "convert"
"plugin_type": "convert.toList",
"category": "convert",
"class": "ConvertToList",
"entrypoint": "execute"
}
}

View File

@@ -1,14 +1,21 @@
"""Workflow plugin: convert to number."""
from ...base import NodeExecutor
def run(_runtime, inputs):
class ConvertToNumber(NodeExecutor):
"""Convert value to number."""
value = inputs.get("value")
default = inputs.get("default", 0)
try:
if isinstance(value, str) and "." in value:
return {"result": float(value)}
return {"result": int(value)}
except (ValueError, TypeError):
return {"result": default, "error": "Cannot convert to number"}
node_type = "convert.toNumber"
category = "convert"
description = "Convert value to number"
def execute(self, inputs, runtime=None):
value = inputs.get("value")
default = inputs.get("default", 0)
try:
if isinstance(value, str) and "." in value:
return {"result": float(value)}
return {"result": int(value)}
except (ValueError, TypeError):
return {"result": default, "error": "Cannot convert to number"}

View File

@@ -0,0 +1,7 @@
"""Factory for ConvertToNumber plugin."""
from .convert_to_number import ConvertToNumber
def create():
return ConvertToNumber()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/convert_to_number",
"version": "1.0.0",
"description": "convert_to_number plugin",
"description": "Convert value to number",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["convert", "workflow", "plugin"],
"main": "convert_to_number.py",
"files": ["convert_to_number.py", "factory.py"],
"metadata": {
"plugin_type": "convert.to_number",
"category": "convert"
"plugin_type": "convert.toNumber",
"category": "convert",
"class": "ConvertToNumber",
"entrypoint": "execute"
}
}

View File

@@ -1,7 +1,15 @@
"""Workflow plugin: convert to string."""
from ...base import NodeExecutor
def run(_runtime, inputs):
class ConvertToString(NodeExecutor):
"""Convert value to string."""
value = inputs.get("value")
return {"result": str(value) if value is not None else ""}
node_type = "convert.toString"
category = "convert"
description = "Convert value to string"
def execute(self, inputs, runtime=None):
value = inputs.get("value")
return {"result": str(value) if value is not None else ""}

View File

@@ -0,0 +1,7 @@
"""Factory for ConvertToString plugin."""
from .convert_to_string import ConvertToString
def create():
return ConvertToString()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/convert_to_string",
"version": "1.0.0",
"description": "convert_to_string plugin",
"description": "Convert value to string",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["convert", "workflow", "plugin"],
"main": "convert_to_string.py",
"files": ["convert_to_string.py", "factory.py"],
"metadata": {
"plugin_type": "convert.to_string",
"category": "convert"
"plugin_type": "convert.toString",
"category": "convert",
"class": "ConvertToString",
"entrypoint": "execute"
}
}

View File

@@ -1,6 +1,9 @@
"""Workflow plugin: AI request."""
from tenacity import retry, stop_after_attempt, wait_exponential
from ...base import NodeExecutor
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def _get_completion(client, model, messages, tools):
@@ -15,25 +18,32 @@ def _get_completion(client, model, messages, tools):
)
def run(runtime, inputs):
"""Invoke the model with current messages."""
messages = list(inputs.get("messages") or [])
response = _get_completion(
runtime.context["client"],
runtime.context["model_name"],
messages,
runtime.context["tools"]
)
resp_msg = response.choices[0].message
runtime.logger.info(
resp_msg.content
if resp_msg.content
else runtime.context["msgs"]["info_tool_call_requested"]
)
messages.append(resp_msg)
tool_calls = getattr(resp_msg, "tool_calls", None) or []
return {
"response": resp_msg,
"has_tool_calls": bool(tool_calls),
"tool_calls_count": len(tool_calls)
}
class CoreAiRequest(NodeExecutor):
"""Invoke the AI model with current messages."""
node_type = "core.ai_request"
category = "core"
description = "Invoke the AI model with current messages and return the response"
def execute(self, inputs, runtime=None):
"""Invoke the model with current messages."""
messages = list(inputs.get("messages") or [])
response = _get_completion(
runtime.context["client"],
runtime.context["model_name"],
messages,
runtime.context["tools"]
)
resp_msg = response.choices[0].message
runtime.logger.info(
resp_msg.content
if resp_msg.content
else runtime.context["msgs"]["info_tool_call_requested"]
)
messages.append(resp_msg)
tool_calls = getattr(resp_msg, "tool_calls", None) or []
return {
"response": resp_msg,
"has_tool_calls": bool(tool_calls),
"tool_calls_count": len(tool_calls)
}

View File

@@ -0,0 +1,8 @@
"""Factory for CoreAiRequest plugin."""
from .core_ai_request import CoreAiRequest
def create():
"""Create a new CoreAiRequest instance."""
return CoreAiRequest()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/core_ai_request",
"version": "1.0.0",
"description": "core_ai_request plugin",
"description": "Invoke the AI model with current messages and return the response",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["core", "workflow", "plugin"],
"main": "core_ai_request.py",
"files": ["core_ai_request.py", "factory.py"],
"metadata": {
"plugin_type": "core.ai_request",
"category": "core"
"category": "core",
"class": "CoreAiRequest",
"entrypoint": "execute"
}
}

View File

@@ -1,13 +1,22 @@
"""Workflow plugin: append context message."""
from ...base import NodeExecutor
def run(runtime, inputs):
class CoreAppendContextMessage(NodeExecutor):
"""Append context to the message list."""
messages = list(inputs.get("messages") or [])
context_val = inputs.get("context")
if context_val:
messages.append({
"role": "system",
"content": f"{runtime.context['msgs']['sdlc_context_label']}{context_val}",
})
return {"messages": messages}
node_type = "core.append_context_message"
category = "core"
description = "Append context information to the message list as a system message"
def execute(self, inputs, runtime=None):
"""Append context to the message list."""
messages = list(inputs.get("messages") or [])
context_val = inputs.get("context")
if context_val:
messages.append({
"role": "system",
"content": f"{runtime.context['msgs']['sdlc_context_label']}{context_val}",
})
return {"messages": messages}

View File

@@ -0,0 +1,8 @@
"""Factory for CoreAppendContextMessage plugin."""
from .core_append_context_message import CoreAppendContextMessage
def create():
"""Create a new CoreAppendContextMessage instance."""
return CoreAppendContextMessage()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/core_append_context_message",
"version": "1.0.0",
"description": "core_append_context_message plugin",
"description": "Append context information to the message list as a system message",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["core", "workflow", "plugin"],
"main": "core_append_context_message.py",
"files": ["core_append_context_message.py", "factory.py"],
"metadata": {
"plugin_type": "core.append_context_message",
"category": "core"
"category": "core",
"class": "CoreAppendContextMessage",
"entrypoint": "execute"
}
}

View File

@@ -1,7 +1,10 @@
"""Workflow plugin: append tool results."""
import os
import re
from ...base import NodeExecutor
def _is_mvp_reached() -> bool:
"""Check if the MVP section in ROADMAP.md is completed."""
@@ -31,14 +34,21 @@ def _is_mvp_reached() -> bool:
return False
def run(runtime, inputs):
class CoreAppendToolResults(NodeExecutor):
"""Append tool results to the message list."""
messages = list(inputs.get("messages") or [])
tool_results = inputs.get("tool_results") or []
if tool_results:
messages.extend(tool_results)
if runtime.context.get("args", {}).get("yolo") and _is_mvp_reached():
runtime.logger.info("MVP reached. Stopping YOLO loop.")
node_type = "core.append_tool_results"
category = "core"
description = "Append tool execution results to the message list"
return {"messages": messages}
def execute(self, inputs, runtime=None):
"""Append tool results to the message list."""
messages = list(inputs.get("messages") or [])
tool_results = inputs.get("tool_results") or []
if tool_results:
messages.extend(tool_results)
if runtime.context.get("args", {}).get("yolo") and _is_mvp_reached():
runtime.logger.info("MVP reached. Stopping YOLO loop.")
return {"messages": messages}

View File

@@ -0,0 +1,8 @@
"""Factory for CoreAppendToolResults plugin."""
from .core_append_tool_results import CoreAppendToolResults
def create():
"""Create a new CoreAppendToolResults instance."""
return CoreAppendToolResults()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/core_append_tool_results",
"version": "1.0.0",
"description": "core_append_tool_results plugin",
"description": "Append tool execution results to the message list",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["core", "workflow", "plugin"],
"main": "core_append_tool_results.py",
"files": ["core_append_tool_results.py", "factory.py"],
"metadata": {
"plugin_type": "core.append_tool_results",
"category": "core"
"category": "core",
"class": "CoreAppendToolResults",
"entrypoint": "execute"
}
}

View File

@@ -1,8 +1,17 @@
"""Workflow plugin: append user instruction."""
from ...base import NodeExecutor
def run(runtime, inputs):
"""Append the next user instruction."""
messages = list(inputs.get("messages") or [])
messages.append({"role": "user", "content": runtime.context["msgs"]["user_next_step"]})
return {"messages": messages}
class CoreAppendUserInstruction(NodeExecutor):
"""Append the next user instruction to the message list."""
node_type = "core.append_user_instruction"
category = "core"
description = "Append the next user instruction to the message list"
def execute(self, inputs, runtime=None):
"""Append the next user instruction."""
messages = list(inputs.get("messages") or [])
messages.append({"role": "user", "content": runtime.context["msgs"]["user_next_step"]})
return {"messages": messages}

View File

@@ -0,0 +1,8 @@
"""Factory for CoreAppendUserInstruction plugin."""
from .core_append_user_instruction import CoreAppendUserInstruction
def create():
"""Create a new CoreAppendUserInstruction instance."""
return CoreAppendUserInstruction()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/core_append_user_instruction",
"version": "1.0.0",
"description": "core_append_user_instruction plugin",
"description": "Append the next user instruction to the message list",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["core", "workflow", "plugin"],
"main": "core_append_user_instruction.py",
"files": ["core_append_user_instruction.py", "factory.py"],
"metadata": {
"plugin_type": "core.append_user_instruction",
"category": "core"
"category": "core",
"class": "CoreAppendUserInstruction",
"entrypoint": "execute"
}
}

View File

@@ -1,43 +1,53 @@
"""Workflow plugin: load SDLC context."""
import os
import logging
from ...base import NodeExecutor
logger = logging.getLogger("metabuilder")
def run(runtime, _inputs):
class CoreLoadContext(NodeExecutor):
"""Load SDLC context into the workflow store."""
gh = runtime.context.get("gh")
msgs = runtime.context.get("msgs", {})
sdlc_context = ""
node_type = "core.load_context"
category = "core"
description = "Load SDLC context from ROADMAP.md and GitHub issues/PRs"
# Load ROADMAP.md if it exists
if os.path.exists("ROADMAP.md"):
with open("ROADMAP.md", "r", encoding="utf-8") as f:
roadmap_content = f.read()
label = msgs.get("roadmap_label", "ROADMAP.md Content:")
sdlc_context += f"\n{label}\n{roadmap_content}\n"
else:
msg = msgs.get(
"missing_roadmap_msg",
"ROADMAP.md is missing. Please analyze the repository and create it."
)
sdlc_context += f"\n{msg}\n"
def execute(self, inputs, runtime=None):
"""Load SDLC context into the workflow store."""
gh = runtime.context.get("gh")
msgs = runtime.context.get("msgs", {})
# Load GitHub issues and PRs if integration is available
if gh:
try:
issues = gh.get_open_issues()
issue_list = "\n".join([f"- #{i.number}: {i.title}" for i in issues[:5]])
if issue_list:
sdlc_context += f"\n{msgs['open_issues_label']}\n{issue_list}"
sdlc_context = ""
prs = gh.get_pull_requests()
pr_list = "\n".join([f"- #{p.number}: {p.title}" for p in prs[:5]])
if pr_list:
sdlc_context += f"\n{msgs['open_prs_label']}\n{pr_list}"
except Exception as error:
logger.error(msgs.get("error_sdlc_context", "Error: {error}").format(error=error))
# Load ROADMAP.md if it exists
if os.path.exists("ROADMAP.md"):
with open("ROADMAP.md", "r", encoding="utf-8") as f:
roadmap_content = f.read()
label = msgs.get("roadmap_label", "ROADMAP.md Content:")
sdlc_context += f"\n{label}\n{roadmap_content}\n"
else:
msg = msgs.get(
"missing_roadmap_msg",
"ROADMAP.md is missing. Please analyze the repository and create it."
)
sdlc_context += f"\n{msg}\n"
return {"context": sdlc_context}
# Load GitHub issues and PRs if integration is available
if gh:
try:
issues = gh.get_open_issues()
issue_list = "\n".join([f"- #{i.number}: {i.title}" for i in issues[:5]])
if issue_list:
sdlc_context += f"\n{msgs['open_issues_label']}\n{issue_list}"
prs = gh.get_pull_requests()
pr_list = "\n".join([f"- #{p.number}: {p.title}" for p in prs[:5]])
if pr_list:
sdlc_context += f"\n{msgs['open_prs_label']}\n{pr_list}"
except Exception as error:
logger.error(msgs.get("error_sdlc_context", "Error: {error}").format(error=error))
return {"context": sdlc_context}

View File

@@ -0,0 +1,8 @@
"""Factory for CoreLoadContext plugin."""
from .core_load_context import CoreLoadContext
def create():
"""Create a new CoreLoadContext instance."""
return CoreLoadContext()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/core_load_context",
"version": "1.0.0",
"description": "core_load_context plugin",
"description": "Load SDLC context from ROADMAP.md and GitHub issues/PRs",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["core", "workflow", "plugin"],
"main": "core_load_context.py",
"files": ["core_load_context.py", "factory.py"],
"metadata": {
"plugin_type": "core.load_context",
"category": "core"
"category": "core",
"class": "CoreLoadContext",
"entrypoint": "execute"
}
}

View File

@@ -1,37 +1,47 @@
"""Workflow plugin: run tool calls."""
import json
def run(runtime, inputs):
from ...base import NodeExecutor
class CoreRunToolCalls(NodeExecutor):
"""Execute tool calls from an AI response."""
resp_msg = inputs.get("response")
tool_calls = getattr(resp_msg, "tool_calls", None) or []
if not resp_msg:
return {"tool_results": [], "no_tool_calls": True}
# Handle tool calls using tool map from context
tool_results = []
tool_map = runtime.context.get("tool_map", {})
node_type = "core.run_tool_calls"
category = "core"
description = "Execute tool calls from an AI response and return results"
for tool_call in tool_calls:
func_name = tool_call.function.name
if func_name in tool_map:
try:
import json
args = json.loads(tool_call.function.arguments)
result = tool_map[func_name](**args)
tool_results.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result)
})
except Exception as e:
tool_results.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": f"Error: {str(e)}"
})
def execute(self, inputs, runtime=None):
"""Execute tool calls from an AI response."""
resp_msg = inputs.get("response")
tool_calls = getattr(resp_msg, "tool_calls", None) or []
if not resp_msg:
return {"tool_results": [], "no_tool_calls": True}
return {
"tool_results": tool_results,
"no_tool_calls": not bool(tool_calls)
}
# Handle tool calls using tool map from context
tool_results = []
tool_map = runtime.context.get("tool_map", {})
for tool_call in tool_calls:
func_name = tool_call.function.name
if func_name in tool_map:
try:
args = json.loads(tool_call.function.arguments)
result = tool_map[func_name](**args)
tool_results.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result)
})
except Exception as e:
tool_results.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": f"Error: {str(e)}"
})
return {
"tool_results": tool_results,
"no_tool_calls": not bool(tool_calls)
}

View File

@@ -0,0 +1,8 @@
"""Factory for CoreRunToolCalls plugin."""
from .core_run_tool_calls import CoreRunToolCalls
def create():
"""Create a new CoreRunToolCalls instance."""
return CoreRunToolCalls()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/core_run_tool_calls",
"version": "1.0.0",
"description": "core_run_tool_calls plugin",
"description": "Execute tool calls from an AI response and return results",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["core", "workflow", "plugin"],
"main": "core_run_tool_calls.py",
"files": ["core_run_tool_calls.py", "factory.py"],
"metadata": {
"plugin_type": "core.run_tool_calls",
"category": "core"
"category": "core",
"class": "CoreRunToolCalls",
"entrypoint": "execute"
}
}

View File

@@ -1,7 +1,16 @@
"""Workflow plugin: seed messages."""
from ...base import NodeExecutor
def run(runtime, _inputs):
class CoreSeedMessages(NodeExecutor):
"""Seed messages from the prompt."""
prompt = runtime.context["prompt"]
return {"messages": list(prompt["messages"])}
node_type = "core.seed_messages"
category = "core"
description = "Initialize the message list from the prompt configuration"
def execute(self, inputs, runtime=None):
"""Seed messages from the prompt."""
prompt = runtime.context["prompt"]
return {"messages": list(prompt["messages"])}

View File

@@ -0,0 +1,8 @@
"""Factory for CoreSeedMessages plugin."""
from .core_seed_messages import CoreSeedMessages
def create():
"""Create a new CoreSeedMessages instance."""
return CoreSeedMessages()

View File

@@ -1,13 +1,16 @@
{
"name": "@metabuilder/core_seed_messages",
"version": "1.0.0",
"description": "core_seed_messages plugin",
"description": "Initialize the message list from the prompt configuration",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["core", "workflow", "plugin"],
"main": "core_seed_messages.py",
"files": ["core_seed_messages.py", "factory.py"],
"metadata": {
"plugin_type": "core.seed_messages",
"category": "core"
"category": "core",
"class": "CoreSeedMessages",
"entrypoint": "execute"
}
}

Some files were not shown because too many files have changed in this diff Show More