Introduce AutoMetabuilder core components and workflow packages:

- Implement core components: CLI argument parsing, environment loading, GitHub service creation, and logging configuration.
- Add support for OpenAI client setup and model resolution.
- Develop SDLC context loader from GitHub and repository files.
- Implement workflow context and engine builders.
- Introduce major workflow packages: `game_tick_loop` and `contextual_iterative_loop`.
- Update localization files with new package descriptions and labels.
- Streamline web navigation by loading items from a dedicated JSON file.
This commit is contained in:
2026-01-09 21:41:39 +00:00
parent e425cfc413
commit ac8479581f
71 changed files with 1264 additions and 742 deletions

Binary file not shown.

View File

@@ -0,0 +1,65 @@
"""Application runner."""
import logging
import os
from . import load_messages
from .cli_args import parse_args
from .env_loader import load_env
from .github_service import create_github_integration
from .logging_config import configure_logging
from .metadata_loader import load_metadata
from .openai_factory import create_openai_client
from .plugin_loader import load_plugins
from .prompt_loader import load_prompt_yaml
from .tool_map_builder import build_tool_map
from .tool_policy_loader import load_tool_policies
from .tool_registry_loader import load_tool_registry
from .tools_loader import load_tools
from .web.server import start_web_ui
from .workflow_config_loader import load_workflow_config
from .workflow_context_builder import build_workflow_context
from .workflow_engine_builder import build_workflow_engine
def run_app() -> None:
    """Bootstrap AutoMetabuilder: load configuration and services, then run the workflow."""
    load_env()
    configure_logging()
    logger = logging.getLogger("autometabuilder")
    args = parse_args()
    if args.web:
        # Web mode serves the UI instead of running the workflow engine.
        logger.info("Starting Web UI...")
        start_web_ui()
        return
    messages = load_messages()
    token = os.environ.get("GITHUB_TOKEN")
    if not token:
        # Without a token neither GitHub nor the model endpoint is usable.
        logger.error(messages["error_github_token_missing"])
        return
    github = create_github_integration(token, messages)
    client = create_openai_client(token)
    prompt = load_prompt_yaml()
    metadata = load_metadata()
    tool_defs = load_tools(metadata)
    registry_entries = load_tool_registry()
    tools_by_name = build_tool_map(github, registry_entries)
    # Plugins mutate both the tool map and the tool definition list in place.
    load_plugins(tools_by_name, tool_defs)
    policies = load_tool_policies()
    config = load_workflow_config(metadata)
    context = build_workflow_context(
        args,
        github,
        messages,
        client,
        tool_defs,
        tools_by_name,
        prompt,
        policies
    )
    logger.debug("Workflow context ready with %s tools", len(tools_by_name))
    engine = build_workflow_engine(config, context, logger)
    engine.execute()

View File

@@ -0,0 +1,8 @@
"""Load a callable by dotted path."""
import importlib
def load_callable(path: str):
    """Import and return the object named by a dotted *path* (e.g. ``pkg.mod.func``)."""
    module_name, _, attr_name = path.rpartition(".")
    target_module = importlib.import_module(module_name)
    return getattr(target_module, attr_name)

View File

@@ -0,0 +1,11 @@
"""CLI argument parsing."""
import argparse
def parse_args():
    """Parse AutoMetabuilder command-line flags into a namespace."""
    parser = argparse.ArgumentParser(
        description="AutoMetabuilder: AI-driven SDLC assistant."
    )
    # All options are simple boolean switches.
    switches = (
        ("--dry-run", "Do not execute state-modifying tools."),
        ("--yolo", "Execute tools without confirmation."),
        ("--once", "Run a single full iteration (AI -> Tool -> AI)."),
        ("--web", "Start the Web UI."),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)
    return parser.parse_args()

View File

@@ -0,0 +1,37 @@
"""Load SDLC context from repo and GitHub."""
import os
import logging
from .github_integration import GitHubIntegration
logger = logging.getLogger("autometabuilder")
def get_sdlc_context(gh: GitHubIntegration, msgs: dict) -> str:
    """Assemble the SDLC context string: roadmap content plus open issues/PRs."""
    parts = []
    # Roadmap: include its content when present, otherwise instruct the AI to create it.
    if os.path.exists("ROADMAP.md"):
        with open("ROADMAP.md", "r", encoding="utf-8") as f:
            roadmap_content = f.read()
        label = msgs.get("roadmap_label", "ROADMAP.md Content:")
        parts.append(f"\n{label}\n{roadmap_content}\n")
    else:
        msg = msgs.get(
            "missing_roadmap_msg",
            "ROADMAP.md is missing. Please analyze the repository and create it."
        )
        parts.append(f"\n{msg}\n")
    if gh:
        try:
            # Cap both listings at five entries to keep the prompt small.
            issue_lines = "\n".join(f"- #{i.number}: {i.title}" for i in gh.get_open_issues()[:5])
            if issue_lines:
                parts.append(f"\n{msgs['open_issues_label']}\n{issue_lines}")
            pr_lines = "\n".join(f"- #{p.number}: {p.title}" for p in gh.get_pull_requests()[:5])
            if pr_lines:
                parts.append(f"\n{msgs['open_prs_label']}\n{pr_lines}")
        except Exception as error:  # pylint: disable=broad-exception-caught
            logger.error(msgs["error_sdlc_context"].format(error=error))
    return "".join(parts)

View File

@@ -0,0 +1,6 @@
"""Load environment variables from .env."""
from dotenv import load_dotenv
def load_env() -> None:
    """Load environment variables from the project's ``.env`` file via python-dotenv."""
    load_dotenv()

View File

@@ -0,0 +1,18 @@
"""GitHub integration builder."""
import logging
from .github_integration import GitHubIntegration, get_repo_name_from_env
logger = logging.getLogger("autometabuilder")
def create_github_integration(token: str, msgs: dict):
    """Build a GitHubIntegration for the repo named in the environment.

    Returns None when no token is provided or initialization fails; failures
    are logged as warnings rather than raised, so callers can run offline.
    """
    if not token:
        return None
    try:
        repository = get_repo_name_from_env()
        integration = GitHubIntegration(token, repository)
        logger.info(msgs["info_integrated_repo"].format(repo_name=repository))
        return integration
    except Exception as error:  # pylint: disable=broad-exception-caught
        logger.warning(msgs["warn_github_init_failed"].format(error=error))
        return None

View File

@@ -0,0 +1,27 @@
"""Logging configuration with TRACE support."""
import logging
import os
# Custom level below DEBUG (10) for very verbose diagnostics.
TRACE_LEVEL = 5

def configure_logging() -> None:
    """Register a TRACE level/method and configure root logging handlers."""
    logging.addLevelName(TRACE_LEVEL, "TRACE")

    def trace(self, message, *args, **kwargs):
        # Mirror the stdlib level methods: emit only when TRACE is enabled.
        if self.isEnabledFor(TRACE_LEVEL):
            self.log(TRACE_LEVEL, message, *args, **kwargs)

    logging.Logger.trace = trace  # type: ignore[attr-defined]
    # LOG_LEVEL env var picks the level; unknown names fall back to INFO.
    configured_level = getattr(
        logging,
        os.environ.get("LOG_LEVEL", "INFO").upper(),
        logging.INFO,
    )
    logging.basicConfig(
        level=configured_level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[logging.FileHandler("autometabuilder.log"), logging.StreamHandler()],
    )

View File

@@ -1,708 +1,7 @@
"""
Main entry point for AutoMetabuilder.
"""
import os
import json
import subprocess
import argparse
import yaml
import logging
import importlib
import inspect
import re
from tenacity import retry, stop_after_attempt, wait_exponential
from dotenv import load_dotenv
from openai import OpenAI
from . import load_messages
from .github_integration import GitHubIntegration, get_repo_name_from_env
from .docker_utils import run_command_in_docker
from .web.server import start_web_ui
from .integrations.notifications import notify_all
from .roadmap_utils import update_roadmap, is_mvp_reached
# Pull configuration from a local .env file before anything reads os.environ.
load_dotenv()
# Set up logging: INFO level, mirrored to autometabuilder.log and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("autometabuilder.log"),
        logging.StreamHandler()
    ]
)
# Module-wide logger shared by every helper below.
logger = logging.getLogger("autometabuilder")
# Fallback locations/identifiers used when the environment does not override them.
DEFAULT_PROMPT_PATH = "prompt.yml"
DEFAULT_ENDPOINT = "https://models.github.ai/inference"
DEFAULT_MODEL = "openai/gpt-4o"
def load_prompt_yaml() -> dict:
    """Load the prompt configuration YAML from PROMPT_PATH (or the default)."""
    prompt_path = os.environ.get("PROMPT_PATH", DEFAULT_PROMPT_PATH)
    if not os.path.exists(prompt_path):
        raise FileNotFoundError(f"Prompt file not found at {prompt_path}")
    with open(prompt_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)
def get_sdlc_context(gh: GitHubIntegration, msgs: dict) -> str:
    """Retrieve SDLC context (issues, PRs, and Roadmap) from GitHub/Local."""
    chunks = []
    # Roadmap: inline its content when present, otherwise ask the AI to create it.
    if not os.path.exists("ROADMAP.md"):
        missing = msgs.get('missing_roadmap_msg', 'ROADMAP.md is missing. Please analyze the repository and create it.')
        chunks.append(f"\n{missing}\n")
    else:
        with open("ROADMAP.md", "r", encoding="utf-8") as f:
            roadmap = f.read()
        chunks.append(f"\n{msgs.get('roadmap_label', 'ROADMAP.md Content:')}\n{roadmap}\n")
    if gh:
        try:
            # Only the first five issues/PRs are surfaced to keep the prompt short.
            issue_lines = "\n".join(f"- #{i.number}: {i.title}" for i in gh.get_open_issues()[:5])
            if issue_lines:
                chunks.append(f"\n{msgs['open_issues_label']}\n{issue_lines}")
            pr_lines = "\n".join(f"- #{p.number}: {p.title}" for p in gh.get_pull_requests()[:5])
            if pr_lines:
                chunks.append(f"\n{msgs['open_prs_label']}\n{pr_lines}")
        except Exception as e:  # pylint: disable=broad-exception-caught
            logger.error(msgs["error_sdlc_context"].format(error=e))
    return "".join(chunks)
def list_files(directory: str = "."):
    """Recursively list file paths under *directory* for repository indexing.

    Returns a newline-joined string of paths. VCS and cache directories are
    skipped entirely.
    """
    skip_dirs = {".git", "__pycache__", ".venv"}
    files_list = []
    for root, dirs, files in os.walk(directory):
        # Prune skipped directories in place so os.walk never descends into
        # them. The previous substring test (".git" in root) could also skip
        # unrelated paths whose names merely contained the text.
        dirs[:] = [d for d in dirs if d not in skip_dirs]
        for file in files:
            files_list.append(os.path.join(root, file))
    result = "\n".join(files_list)
    # Lazy %-formatting avoids building the message when INFO is disabled.
    logger.info("Indexing repository files in %s...", directory)
    return result
def run_tests(path: str = "tests"):
    """Run pytest against *path* and return its captured stdout."""
    logger.info(f"Running tests in {path}...")
    completed = subprocess.run(
        ["pytest", path], capture_output=True, text=True, check=False
    )
    logger.info(completed.stdout)
    if completed.stderr:
        logger.error(completed.stderr)
    return completed.stdout
def run_lint(path: str = "src"):
    """Run pylint against *path* and return its captured stdout."""
    logger.info(f"Running linting in {path}...")
    completed = subprocess.run(
        ["pylint", path], capture_output=True, text=True, check=False
    )
    logger.info(completed.stdout)
    if completed.stderr:
        logger.error(completed.stderr)
    return completed.stdout
def run_docker_task(image: str, command: str, workdir: str = "/workspace"):
    """Run *command* inside a Docker container, mounting the CWD at /workspace."""
    return run_command_in_docker(
        image,
        command,
        volumes={os.getcwd(): "/workspace"},
        workdir=workdir,
    )
def read_file(path: str) -> str:
    """Return the text content of *path*, or an error string on failure."""
    try:
        with open(path, 'r', encoding='utf-8') as handle:
            return handle.read()
    except Exception as exc:
        # Tool results must be strings for the conversation, never exceptions.
        return f"Error reading file {path}: {exc}"
def write_file(path: str, content: str) -> str:
    """Write *content* to *path*, returning a status string either way."""
    try:
        with open(path, 'w', encoding='utf-8') as handle:
            handle.write(content)
    except Exception as exc:
        # Report the failure as text so the AI can react to it.
        return f"Error writing to file {path}: {exc}"
    return f"Successfully wrote to {path}"
def edit_file(path: str, search: str, replace: str) -> str:
    """Apply a search/replace edit to *path*, returning a status string."""
    try:
        with open(path, 'r', encoding='utf-8') as handle:
            original = handle.read()
        if search not in original:
            # Refuse to write when the anchor text is absent.
            return f"Error: '{search}' not found in {path}"
        with open(path, 'w', encoding='utf-8') as handle:
            handle.write(original.replace(search, replace))
        return f"Successfully edited {path}"
    except Exception as exc:
        return f"Error editing file {path}: {exc}"
def load_plugins(tool_map: dict, tools: list):
    """Discover plugin tools in the package's ``plugins`` directory.

    Any module-level function carrying a ``tool_metadata`` attribute is
    registered into *tool_map* (name -> callable) and its metadata appended
    to *tools*. Both arguments are mutated in place.
    """
    plugins_dir = os.path.join(os.path.dirname(__file__), "plugins")
    if not os.path.exists(plugins_dir):
        return
    for filename in os.listdir(plugins_dir):
        if filename.endswith(".py") and filename != "__init__.py":
            module_name = f".plugins.{filename[:-3]}"
            try:
                module = importlib.import_module(module_name, package="autometabuilder")
                for name, obj in inspect.getmembers(module):
                    if inspect.isfunction(obj) and hasattr(obj, "tool_metadata"):
                        tool_map[name] = obj
                        tools.append(obj.tool_metadata)
                        logger.info("Loaded plugin tool: %s", name)
            except Exception as e:  # pylint: disable=broad-exception-caught
                # Bug fix: the previous message logged "(unknown)" instead of
                # the failing plugin's filename, hiding which plugin broke.
                logger.error("Failed to load plugin %s: %s", filename, e)
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def get_completion(client, model, messages, tools):
    """Get completion from OpenAI with retry logic.

    Retries up to 3 times with exponential backoff (2s..10s waits) on any
    exception raised by the API call.
    """
    return client.chat.completions.create(
        model=model,
        messages=messages,
        tools=tools,
        tool_choice="auto",  # the model decides whether to emit tool calls
        temperature=1.0,
        top_p=1.0,
    )
def handle_tool_calls(resp_msg, tool_map: dict, gh: GitHubIntegration, msgs: dict, dry_run: bool = False, yolo: bool = False) -> list:
    """Process tool calls from the AI response and return results for the assistant.

    Args:
        resp_msg: Assistant message that may carry ``tool_calls``.
        tool_map: Mapping of tool name -> callable handler.
        gh: GitHub integration (not referenced in this function's body).
        msgs: Localized message templates; ``dict.get`` fallbacks are English.
        dry_run: When True, state-modifying tools are skipped.
        yolo: When True, tools execute without interactive confirmation.

    Returns:
        A list of ``role="tool"`` message dicts (one per tool call), suitable
        for extending the conversation.
    """
    if not resp_msg.tool_calls:
        return []
    # Tools that modify state and should be skipped in dry-run
    modifying_tools = {"create_branch", "create_pull_request", "update_roadmap", "write_file", "edit_file"}
    tool_results = []
    for tool_call in resp_msg.tool_calls:
        function_name = tool_call.function.name
        args = json.loads(tool_call.function.arguments)
        call_id = tool_call.id
        handler = tool_map.get(function_name)
        if handler:
            if not yolo:
                # Interactive gate: anything other than 'y' skips this call.
                confirm = input(msgs.get("confirm_tool_execution", "Do you want to execute {name} with {args}? [y/N]: ").format(name=function_name, args=args))
                if confirm.lower() != 'y':
                    logger.info(msgs.get("info_tool_skipped", "Skipping tool: {name}").format(name=function_name))
                    tool_results.append({
                        "tool_call_id": call_id,
                        "role": "tool",
                        "name": function_name,
                        "content": "Skipped by user.",
                    })
                    continue
            if dry_run and function_name in modifying_tools:
                logger.info(msgs.get("info_dry_run_skipping", "DRY RUN: Skipping state-modifying tool {name}").format(name=function_name))
                tool_results.append({
                    "tool_call_id": call_id,
                    "role": "tool",
                    "name": function_name,
                    "content": "Skipped due to dry-run.",
                })
                continue
            logger.info(msgs.get("info_executing_tool", "Executing tool: {name}").format(name=function_name))
            try:
                result = handler(**args)
                content = str(result) if result is not None else "Success"
                if hasattr(result, "__iter__") and not isinstance(result, str):
                    # Handle iterables (like PyGithub PaginatedList): summarize
                    # only the first five items for the conversation.
                    items = list(result)[:5]
                    content = "\n".join([f"- {item}" for item in items])
                    logger.info(content)
                elif result is not None:
                    logger.info(result)
                tool_results.append({
                    "tool_call_id": call_id,
                    "role": "tool",
                    "name": function_name,
                    "content": content,
                })
            except Exception as e:
                # Errors become tool-result text so the AI can see and react.
                error_msg = f"Error executing {function_name}: {e}"
                logger.error(error_msg)
                tool_results.append({
                    "tool_call_id": call_id,
                    "role": "tool",
                    "name": function_name,
                    "content": error_msg,
                })
        else:
            msg = msgs.get("error_tool_not_found", "Tool {name} not found or unavailable.").format(name=function_name)
            logger.error(msg)
            tool_results.append({
                "tool_call_id": call_id,
                "role": "tool",
                "name": function_name,
                "content": msg,
            })
    return tool_results
class WorkflowEngine:
    """Interpret and execute a node-based workflow.

    The workflow config is a dict with a top-level ``nodes`` list. Each node
    declares a ``type`` (resolved via the plugin table from
    ``_build_plugins``), optional ``inputs`` (literal values or ``$key``
    bindings into the shared store), optional ``outputs`` (result-key ->
    store-key mapping), and an optional ``when`` gate.
    """
    def __init__(self, workflow_config, context):
        # `context` supplies shared services; the methods below read the
        # keys gh, msgs, client, prompt, tools, tool_map, model_name, args.
        self.workflow_config = workflow_config or {}
        self.context = context
        # Values published by node outputs; read back via "$name" bindings.
        self.store = {}
        self.plugins = self._build_plugins()
    def execute(self):
        """Run the workflow's top-level node list, if present and well-formed."""
        nodes = self.workflow_config.get("nodes")
        if not isinstance(nodes, list):
            logger.error("Workflow config missing nodes list.")
            return
        self._execute_nodes(nodes)
    def _execute_nodes(self, nodes):
        """Execute each node in sequence."""
        for node in nodes:
            self._execute_node(node)
    def _execute_node(self, node):
        """Execute one node: resolve inputs, run its plugin, publish outputs."""
        node_type = node.get("type")
        if not node_type:
            logger.error("Workflow node missing type.")
            return None
        when_value = node.get("when")
        if when_value is not None:
            # "when" gate: skip this node if the bound value is falsy.
            if not self._coerce_bool(self._resolve_binding(when_value)):
                return None
        if node_type == "control.loop":
            return self._execute_loop(node)
        plugin = self.plugins.get(node_type)
        if not plugin:
            logger.error(f"Unknown node type: {node_type}")
            return None
        inputs = self._resolve_inputs(node.get("inputs", {}))
        result = plugin(inputs)
        if not isinstance(result, dict):
            result = {"result": result}
        outputs = node.get("outputs", {})
        if outputs:
            # Explicit mapping: copy selected result keys to chosen store keys.
            for output_name, store_key in outputs.items():
                if output_name in result:
                    self.store[store_key] = result[output_name]
        else:
            # No mapping declared: publish every result key under its own name.
            for output_name, value in result.items():
                self.store[output_name] = value
        return result
    def _execute_loop(self, node):
        """Run a ``control.loop`` node's body up to ``max_iterations`` times."""
        inputs = node.get("inputs", {})
        max_iterations = self._resolve_binding(inputs.get("max_iterations", 1))
        stop_when_raw = inputs.get("stop_when")
        stop_on_raw = inputs.get("stop_on", True)
        try:
            max_iterations = int(max_iterations)
        except (TypeError, ValueError):
            # Non-numeric binding: fall back to a single pass.
            max_iterations = 1
        if self.context["args"].once:
            # --once caps the loop at one iteration.
            max_iterations = min(max_iterations, 1)
        stop_on = self._coerce_bool(self._resolve_binding(stop_on_raw))
        body = node.get("body", [])
        if not isinstance(body, list):
            logger.error("Loop body must be a list of nodes.")
            return None
        iteration = 0
        while iteration < max_iterations:
            iteration += 1
            logger.info(f"--- Loop iteration {iteration} ---")
            self._execute_nodes(body)
            if stop_when_raw is not None:
                # Re-resolve after each pass; break once it matches stop_on.
                stop_value = self._resolve_binding(stop_when_raw)
                if self._coerce_bool(stop_value) == stop_on:
                    break
        return None
    def _build_plugins(self):
        """Map node ``type`` identifiers to their bound handler methods."""
        return {
            "core.load_context": self._plugin_load_context,
            "core.seed_messages": self._plugin_seed_messages,
            "core.append_context_message": self._plugin_append_context_message,
            "core.append_user_instruction": self._plugin_append_user_instruction,
            "core.ai_request": self._plugin_ai_request,
            "core.run_tool_calls": self._plugin_run_tool_calls,
            "core.append_tool_results": self._plugin_append_tool_results,
            "tools.list_files": self._plugin_list_files,
            "tools.read_file": self._plugin_read_file,
            "tools.run_tests": self._plugin_run_tests,
            "tools.run_lint": self._plugin_run_lint,
            "tools.create_branch": self._plugin_create_branch,
            "tools.create_pull_request": self._plugin_create_pull_request,
            "utils.filter_list": self._plugin_filter_list,
            "utils.map_list": self._plugin_map_list,
            "utils.reduce_list": self._plugin_reduce_list,
            "utils.branch_condition": self._plugin_branch_condition,
            "utils.not": self._plugin_not,
        }
    def _plugin_load_context(self, inputs):
        """Fetch the SDLC context string (roadmap, issues, PRs)."""
        return {"context": get_sdlc_context(self.context["gh"], self.context["msgs"])}
    def _plugin_seed_messages(self, inputs):
        """Start the conversation from the prompt config's message list."""
        prompt = self.context["prompt"]
        return {"messages": list(prompt["messages"])}
    def _plugin_append_context_message(self, inputs):
        """Append the SDLC context as a system message, when present."""
        messages = list(inputs.get("messages") or [])
        context_val = inputs.get("context")
        if context_val:
            messages.append(
                {
                    "role": "system",
                    "content": f"{self.context['msgs']['sdlc_context_label']}{context_val}",
                }
            )
        return {"messages": messages}
    def _plugin_append_user_instruction(self, inputs):
        """Append the localized 'next step' instruction as a user message."""
        messages = list(inputs.get("messages") or [])
        messages.append({"role": "user", "content": self.context["msgs"]["user_next_step"]})
        return {"messages": messages}
    def _plugin_ai_request(self, inputs):
        """Send the conversation to the model and return its reply + call stats."""
        messages = list(inputs.get("messages") or [])
        response = get_completion(
            self.context["client"],
            self.context["model_name"],
            messages,
            self.context["tools"]
        )
        resp_msg = response.choices[0].message
        logger.info(
            resp_msg.content
            if resp_msg.content
            else self.context["msgs"]["info_tool_call_requested"]
        )
        # NOTE(review): this appends to the local copy only; the reply reaches
        # later nodes via the returned "response" key, not via "messages".
        messages.append(resp_msg)
        tool_calls = getattr(resp_msg, "tool_calls", None) or []
        return {
            "response": resp_msg,
            "has_tool_calls": bool(tool_calls),
            "tool_calls_count": len(tool_calls)
        }
    def _plugin_run_tool_calls(self, inputs):
        """Execute requested tool calls; flag when the AI made none."""
        resp_msg = inputs.get("response")
        tool_calls = getattr(resp_msg, "tool_calls", None) or []
        if not resp_msg:
            return {"tool_results": [], "no_tool_calls": True}
        tool_results = handle_tool_calls(
            resp_msg,
            self.context["tool_map"],
            self.context["gh"],
            self.context["msgs"],
            dry_run=self.context["args"].dry_run,
            yolo=self.context["args"].yolo
        )
        if not tool_calls and resp_msg.content:
            # A plain text reply with no tool calls is announced as completion.
            notify_all(f"AutoMetabuilder task complete: {resp_msg.content[:100]}...")
        return {
            "tool_results": tool_results,
            "no_tool_calls": not bool(tool_calls)
        }
    def _plugin_append_tool_results(self, inputs):
        """Append tool result messages; announce MVP completion in YOLO mode."""
        messages = list(inputs.get("messages") or [])
        tool_results = inputs.get("tool_results") or []
        if tool_results:
            messages.extend(tool_results)
        if self.context["args"].yolo and is_mvp_reached():
            # NOTE(review): this only logs/notifies — it does not break any
            # loop here; confirm the workflow config handles stopping.
            logger.info("MVP reached. Stopping YOLO loop.")
            notify_all("AutoMetabuilder YOLO loop stopped: MVP reached.")
        return {"messages": messages}
    def _plugin_list_files(self, inputs):
        """Tool wrapper: list repository files under the given path."""
        result = self._call_tool("list_files", directory=inputs.get("path", "."))
        return {"files": result}
    def _plugin_read_file(self, inputs):
        """Tool wrapper: read one file."""
        result = self._call_tool("read_file", path=inputs.get("path"))
        return {"content": result}
    def _plugin_run_tests(self, inputs):
        """Tool wrapper: run the test suite."""
        result = self._call_tool("run_tests", path=inputs.get("path", "tests"))
        return {"results": result}
    def _plugin_run_lint(self, inputs):
        """Tool wrapper: run the linter."""
        result = self._call_tool("run_lint", path=inputs.get("path", "src"))
        return {"results": result}
    def _plugin_create_branch(self, inputs):
        """Tool wrapper: create a branch off base_branch (default: main)."""
        result = self._call_tool(
            "create_branch",
            branch_name=inputs.get("branch_name"),
            base_branch=inputs.get("base_branch", "main")
        )
        return {"result": result}
    def _plugin_create_pull_request(self, inputs):
        """Tool wrapper: open a pull request."""
        result = self._call_tool(
            "create_pull_request",
            title=inputs.get("title"),
            body=inputs.get("body"),
            head_branch=inputs.get("head_branch"),
            base_branch=inputs.get("base_branch", "main")
        )
        return {"result": result}
    def _plugin_filter_list(self, inputs):
        """Keep items whose string form matches ``pattern`` under ``mode``."""
        items = self._ensure_list(inputs.get("items"))
        mode = inputs.get("mode", "contains")
        pattern = inputs.get("pattern", "")
        filtered = []
        for item in items:
            candidate = str(item)
            matched = False
            if mode == "contains":
                matched = pattern in candidate
            elif mode == "regex":
                matched = bool(re.search(pattern, candidate))
            elif mode == "equals":
                matched = candidate == pattern
            elif mode == "not_equals":
                matched = candidate != pattern
            elif mode == "starts_with":
                matched = candidate.startswith(pattern)
            elif mode == "ends_with":
                matched = candidate.endswith(pattern)
            if matched:
                filtered.append(item)
        return {"items": filtered}
    def _plugin_map_list(self, inputs):
        """Format each item through ``template``; fall back to str(item)."""
        items = self._ensure_list(inputs.get("items"))
        template = inputs.get("template", "{item}")
        mapped = []
        for item in items:
            try:
                mapped.append(template.format(item=item))
            except Exception:
                mapped.append(str(item))
        return {"items": mapped}
    def _plugin_reduce_list(self, inputs):
        """Join items' string forms with the (unescaped) separator."""
        items = self._ensure_list(inputs.get("items"))
        separator = self._normalize_separator(inputs.get("separator", ""))
        reduced = separator.join([str(item) for item in items])
        return {"result": reduced}
    def _plugin_branch_condition(self, inputs):
        """Evaluate ``value`` against ``mode``/``compare`` to a boolean result."""
        value = inputs.get("value")
        mode = inputs.get("mode", "is_truthy")
        compare = inputs.get("compare", "")
        decision = False
        if mode == "is_empty":
            decision = not self._ensure_list(value)
        elif mode == "is_truthy":
            decision = bool(value)
        elif mode == "equals":
            decision = str(value) == compare
        elif mode == "not_equals":
            decision = str(value) != compare
        elif mode == "contains":
            decision = compare in str(value)
        elif mode == "regex":
            decision = bool(re.search(compare, str(value)))
        return {"result": decision}
    def _plugin_not(self, inputs):
        """Boolean negation of the coerced input value."""
        return {"result": not self._coerce_bool(inputs.get("value"))}
    def _resolve_inputs(self, inputs):
        """Resolve every input value, expanding ``$key`` store bindings."""
        return {key: self._resolve_binding(value) for key, value in (inputs or {}).items()}
    def _resolve_binding(self, value):
        """Strings starting with '$' are store lookups; anything else is literal."""
        if isinstance(value, str) and value.startswith("$"):
            return self.store.get(value[1:])
        return value
    def _coerce_bool(self, value):
        """Coerce to bool; recognizes true/yes/1 and false/no/0/'' strings."""
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            lowered = value.strip().lower()
            if lowered in ("true", "yes", "1"):
                return True
            if lowered in ("false", "no", "0", ""):
                return False
        return bool(value)
    def _ensure_list(self, value):
        """Normalize a value to a list; strings split into non-blank lines."""
        if value is None:
            return []
        if isinstance(value, list):
            return value
        if isinstance(value, (tuple, set)):
            return list(value)
        if isinstance(value, str):
            return [line for line in value.splitlines() if line.strip()]
        return [value]
    def _normalize_separator(self, text):
        """Turn escaped backslash-n / backslash-t sequences into real characters."""
        if text is None:
            return ""
        if isinstance(text, str):
            return text.replace("\\n", "\n").replace("\\t", "\t")
        return str(text)
    def _call_tool(self, tool_name, **kwargs):
        """Invoke a named tool; errors are returned as strings, not raised."""
        tool = self.context["tool_map"].get(tool_name)
        if not tool:
            msg = self.context["msgs"].get(
                "error_tool_not_found",
                "Tool {name} not found or unavailable."
            ).format(name=tool_name)
            logger.error(msg)
            return msg
        # Drop None kwargs so the tool's own defaults apply.
        filtered_kwargs = {k: v for k, v in kwargs.items() if v is not None}
        try:
            result = tool(**filtered_kwargs)
            return result if result is not None else "Success"
        except Exception as e:
            error_msg = f"Error executing {tool_name}: {e}"
            logger.error(error_msg)
            return error_msg
"""Entry point for AutoMetabuilder."""
from .app_runner import run_app
def main():
    """Main function to run AutoMetabuilder.

    Parses CLI flags, wires up GitHub/OpenAI services, loads prompt, tool,
    and workflow configuration, then hands control to the WorkflowEngine.
    """
    parser = argparse.ArgumentParser(description="AutoMetabuilder: AI-driven SDLC assistant.")
    parser.add_argument("--dry-run", action="store_true", help="Do not execute state-modifying tools.")
    parser.add_argument("--yolo", action="store_true", help="Execute tools without confirmation.")
    parser.add_argument("--once", action="store_true", help="Run a single full iteration (AI -> Tool -> AI).")
    parser.add_argument("--web", action="store_true", help="Start the Web UI.")
    args = parser.parse_args()
    if args.web:
        # Web mode serves the UI instead of running the workflow.
        logger.info("Starting Web UI...")
        start_web_ui()
        return
    msgs = load_messages()
    token = os.environ.get("GITHUB_TOKEN")
    if not token:
        logger.error(msgs["error_github_token_missing"])
        return
    # Initialize GitHub Integration (optional: failures leave gh as None)
    gh = None
    try:
        repo_name = get_repo_name_from_env()
        gh = GitHubIntegration(token, repo_name)
        logger.info(msgs["info_integrated_repo"].format(repo_name=repo_name))
    except Exception as e:  # pylint: disable=broad-exception-caught
        logger.warning(msgs["warn_github_init_failed"].format(error=e))
    # The GitHub token doubles as the API key for the GitHub Models endpoint.
    client = OpenAI(
        base_url=os.environ.get("GITHUB_MODELS_ENDPOINT", DEFAULT_ENDPOINT),
        api_key=token,
    )
    prompt = load_prompt_yaml()
    # Load Metadata
    metadata_path = os.path.join(os.path.dirname(__file__), "metadata.json")
    with open(metadata_path, "r", encoding="utf-8") as f:
        metadata = json.load(f)
    # Load tools for SDLC operations from JSON file
    tools_path = os.path.join(os.path.dirname(__file__), metadata.get("tools_path", "tools.json"))
    with open(tools_path, "r", encoding="utf-8") as f:
        tools = json.load(f)
    # Declarative mapping of tool names to functions. GitHub-backed entries
    # are None when gh is unavailable, which handle_tool_calls reports as
    # "not found or unavailable".
    tool_map = {
        "create_branch": gh.create_branch if gh else None,
        "create_pull_request": gh.create_pull_request if gh else None,
        "get_pull_request_comments": gh.get_pull_request_comments if gh else None,
        "update_roadmap": update_roadmap,
        "list_files": list_files,
        "run_tests": run_tests,
        "run_lint": run_lint,
        "run_docker_task": run_docker_task,
        "read_file": read_file,
        "write_file": write_file,
        "edit_file": edit_file,
    }
    # Load plugins and update tool_map and tools list
    load_plugins(tool_map, tools)
    model_name = os.environ.get("LLM_MODEL", prompt.get("model", DEFAULT_MODEL))
    # Load Workflow
    workflow_path = os.path.join(os.path.dirname(__file__), metadata.get("workflow_path", "workflow.json"))
    with open(workflow_path, "r", encoding="utf-8") as f:
        workflow_config = json.load(f)
    # Initialize Context for Workflow Engine
    workflow_context = {
        "gh": gh,
        "msgs": msgs,
        "client": client,
        "prompt": prompt,
        "tools": tools,
        "tool_map": tool_map,
        "model_name": model_name,
        "args": args
    }
    engine = WorkflowEngine(workflow_config, workflow_context)
    engine.execute()
if __name__ == "__main__":
    main()
"""CLI entrypoint."""
run_app()

View File

@@ -389,5 +389,9 @@
"meta.workflow_packages.testing_triangle.label": "Testing Triangle",
"meta.workflow_packages.testing_triangle.description": "Run lint, unit, then UI tests with conditional gating.",
"meta.workflow_packages.repo_scan_context.label": "Repo Scan Context",
"meta.workflow_packages.repo_scan_context.description": "Map-reduce repo files into context before the AI request."
"meta.workflow_packages.repo_scan_context.description": "Map-reduce repo files into context before the AI request.",
"meta.workflow_packages.contextual_iterative_loop.label": "Contextual Iterative Loop",
"meta.workflow_packages.contextual_iterative_loop.description": "Scan files into context, then loop AI/tool steps until no tool calls remain.",
"meta.workflow_packages.game_tick_loop.label": "Game Tick Loop",
"meta.workflow_packages.game_tick_loop.description": "Seed a tick script and loop AI/tool steps for a few cycles."
}

View File

@@ -385,5 +385,9 @@
"meta.workflow_packages.testing_triangle.label": "Triángulo de pruebas",
"meta.workflow_packages.testing_triangle.description": "Ejecuta lint, pruebas unitarias y UI con control condicional.",
"meta.workflow_packages.repo_scan_context.label": "Contexto del escaneo del repositorio",
"meta.workflow_packages.repo_scan_context.description": "Reduce el repositorio a contexto antes de la solicitud de IA."
"meta.workflow_packages.repo_scan_context.description": "Reduce el repositorio a contexto antes de la solicitud de IA.",
"meta.workflow_packages.contextual_iterative_loop.label": "Bucle iterativo con contexto",
"meta.workflow_packages.contextual_iterative_loop.description": "Escanea archivos y luego itera IA/herramientas hasta no tener llamadas.",
"meta.workflow_packages.game_tick_loop.label": "Bucle de ticks del juego",
"meta.workflow_packages.game_tick_loop.description": "Siembra un guion de ticks y repite IA/herramientas por algunos ciclos."
}

View File

@@ -385,5 +385,9 @@
"meta.workflow_packages.testing_triangle.label": "Triangle de tests",
"meta.workflow_packages.testing_triangle.description": "Exécute lint, tests unitaires puis UI avec contrôle conditionnel.",
"meta.workflow_packages.repo_scan_context.label": "Contexte d'exploration du dépôt",
"meta.workflow_packages.repo_scan_context.description": "Réduit les fichiers du dépôt en contexte avant la requête IA."
"meta.workflow_packages.repo_scan_context.description": "Réduit les fichiers du dépôt en contexte avant la requête IA.",
"meta.workflow_packages.contextual_iterative_loop.label": "Boucle itérative contextualisée",
"meta.workflow_packages.contextual_iterative_loop.description": "Scanne les fichiers puis boucle IA/outils jusqu'à absence d'appels.",
"meta.workflow_packages.game_tick_loop.label": "Boucle de ticks de jeu",
"meta.workflow_packages.game_tick_loop.description": "Sème un script de ticks puis boucle IA/outils sur quelques cycles."
}

View File

@@ -385,5 +385,9 @@
"meta.workflow_packages.testing_triangle.label": "Testdriehoek",
"meta.workflow_packages.testing_triangle.description": "Voer lint, unit- en UI-tests uit met voorwaardelijke gating.",
"meta.workflow_packages.repo_scan_context.label": "Repo-scancontext",
"meta.workflow_packages.repo_scan_context.description": "Reduceer repo-bestanden naar context vóór de AI-aanvraag."
"meta.workflow_packages.repo_scan_context.description": "Reduceer repo-bestanden naar context vóór de AI-aanvraag.",
"meta.workflow_packages.contextual_iterative_loop.label": "Contextuele iteratieve lus",
"meta.workflow_packages.contextual_iterative_loop.description": "Scan bestanden en herhaal AI/tool-stappen tot er geen tool-calls zijn.",
"meta.workflow_packages.game_tick_loop.label": "Game-ticklus",
"meta.workflow_packages.game_tick_loop.description": "Zet een tick-script klaar en herhaal AI/tool-stappen enkele cycli."
}

View File

@@ -385,5 +385,9 @@
"meta.workflow_packages.testing_triangle.label": "Testin' Triangle",
"meta.workflow_packages.testing_triangle.description": "Run lint, unit, then UI tests with conditional gating.",
"meta.workflow_packages.repo_scan_context.label": "Repo Scan Context",
"meta.workflow_packages.repo_scan_context.description": "Map-reduce repo files into context before the AI request."
"meta.workflow_packages.repo_scan_context.description": "Map-reduce repo files into context before the AI request.",
"meta.workflow_packages.contextual_iterative_loop.label": "Contextual Iterative Loop",
"meta.workflow_packages.contextual_iterative_loop.description": "Scan files, then loop AI/tool steps till no tool calls remain.",
"meta.workflow_packages.game_tick_loop.label": "Game Tick Loop",
"meta.workflow_packages.game_tick_loop.description": "Seed a tick script and loop AI/tool steps fer a few cycles."
}

View File

@@ -0,0 +1,9 @@
"""Load metadata.json."""
import json
import os
def load_metadata() -> dict:
    """Parse and return the package's bundled ``metadata.json``."""
    path = os.path.join(os.path.dirname(__file__), "metadata.json")
    with open(path, "r", encoding="utf-8") as handle:
        return json.load(handle)

View File

@@ -0,0 +1,8 @@
"""Resolve the LLM model name."""
import os
# Used when neither the environment nor the prompt config names a model.
DEFAULT_MODEL = "openai/gpt-4o"
def resolve_model_name(prompt: dict) -> str:
    """Pick the LLM model: LLM_MODEL env var, then prompt config, then default."""
    model = os.environ.get("LLM_MODEL")
    if model is None:
        model = prompt.get("model", DEFAULT_MODEL)
    return model

View File

@@ -0,0 +1,14 @@
"""OpenAI client helpers."""
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def get_completion(client, model, messages, tools):
    """Request a chat completion, retrying up to 3 times with 2-10s backoff."""
    return client.chat.completions.create(
        model=model,
        messages=messages,
        tools=tools,
        tool_choice="auto",  # the model decides whether to emit tool calls
        temperature=1.0,
        top_p=1.0,
    )

View File

@@ -0,0 +1,12 @@
"""OpenAI client factory."""
import os
from openai import OpenAI
# Default inference endpoint for GitHub Models.
DEFAULT_ENDPOINT = "https://models.github.ai/inference"


def create_openai_client(token: str):
    """Build an OpenAI client pointed at the GitHub Models endpoint.

    The endpoint can be overridden with GITHUB_MODELS_ENDPOINT; the GitHub
    token doubles as the API key.
    """
    endpoint = os.environ.get("GITHUB_MODELS_ENDPOINT", DEFAULT_ENDPOINT)
    return OpenAI(base_url=endpoint, api_key=token)

View File

@@ -0,0 +1,27 @@
"""Load custom tools from the plugins directory."""
import importlib
import inspect
import logging
import os
logger = logging.getLogger("autometabuilder")


def load_plugins(tool_map: dict, tools: list) -> None:
    """Discover plugin modules under ./plugins and register decorated tools.

    Any function carrying a `tool_metadata` attribute is added to `tool_map`
    (name -> callable) and its metadata appended to `tools`. Failures to
    import or scan a plugin are logged and skipped.
    """
    base_dir = os.path.join(os.path.dirname(__file__), "plugins")
    if not os.path.exists(base_dir):
        return
    for entry in os.listdir(base_dir):
        if not entry.endswith(".py") or entry == "__init__.py":
            continue
        dotted = f".plugins.{entry[:-3]}"
        try:
            module = importlib.import_module(dotted, package="autometabuilder")
            for name, candidate in inspect.getmembers(module):
                if inspect.isfunction(candidate) and hasattr(candidate, "tool_metadata"):
                    tool_map[name] = candidate
                    tools.append(candidate.tool_metadata)
                    logger.info("Loaded plugin tool: %s", name)
        except Exception as error:  # pylint: disable=broad-exception-caught
            logger.error("Failed to load plugin %s: %s", entry, error)

View File

@@ -0,0 +1,14 @@
"""Load prompt configuration."""
import os
import yaml
# Default prompt file looked up relative to the working directory.
DEFAULT_PROMPT_PATH = "prompt.yml"


def load_prompt_yaml() -> dict:
    """Load prompt YAML from PROMPT_PATH (or ./prompt.yml).

    Raises:
        FileNotFoundError: when the resolved path does not exist.
    """
    prompt_path = os.environ.get("PROMPT_PATH", DEFAULT_PROMPT_PATH)
    if not os.path.exists(prompt_path):
        raise FileNotFoundError(f"Prompt file not found at {prompt_path}")
    with open(prompt_path, "r", encoding="utf-8") as handle:
        return yaml.safe_load(handle)

View File

@@ -0,0 +1,21 @@
"""Build tool map from registry entries."""
from .callable_loader import load_callable
def build_tool_map(gh, registry_entries: list) -> dict:
    """Map tool names to callables based on their registry provider.

    "github" entries bind a method of the GitHub integration object,
    "module" entries resolve a dotted callable path; anything else maps
    to None. Entries without a name are skipped.
    """
    mapping = {}
    for entry in registry_entries:
        tool_name = entry.get("name")
        if not tool_name:
            continue
        provider = entry.get("provider")
        if provider == "github":
            method_name = entry.get("method")
            mapping[tool_name] = getattr(gh, method_name) if gh and method_name else None
        elif provider == "module":
            dotted_path = entry.get("callable")
            mapping[tool_name] = load_callable(dotted_path) if dotted_path else None
        else:
            mapping[tool_name] = None
    return mapping

View File

@@ -0,0 +1,9 @@
{
"modifying_tools": [
"create_branch",
"create_pull_request",
"update_roadmap",
"write_file",
"edit_file"
]
}

View File

@@ -0,0 +1,15 @@
"""Load tool policies from JSON."""
import json
import os
def load_tool_policies() -> dict:
    """Read tool_policies.json next to this module.

    Returns an empty policy ({"modifying_tools": []}) when the file is
    missing, malformed, or not a JSON object.
    """
    policy_path = os.path.join(os.path.dirname(__file__), "tool_policies.json")
    if not os.path.exists(policy_path):
        return {"modifying_tools": []}
    try:
        with open(policy_path, "r", encoding="utf-8") as handle:
            parsed = json.load(handle)
    except json.JSONDecodeError:
        return {"modifying_tools": []}
    if isinstance(parsed, dict):
        return parsed
    return {"modifying_tools": []}

View File

@@ -0,0 +1,13 @@
[
{"name": "create_branch", "provider": "github", "method": "create_branch"},
{"name": "create_pull_request", "provider": "github", "method": "create_pull_request"},
{"name": "get_pull_request_comments", "provider": "github", "method": "get_pull_request_comments"},
{"name": "update_roadmap", "provider": "module", "callable": "autometabuilder.roadmap_utils.update_roadmap"},
{"name": "list_files", "provider": "module", "callable": "autometabuilder.tools.list_files.list_files"},
{"name": "run_tests", "provider": "module", "callable": "autometabuilder.tools.run_tests.run_tests"},
{"name": "run_lint", "provider": "module", "callable": "autometabuilder.tools.run_lint.run_lint"},
{"name": "run_docker_task", "provider": "module", "callable": "autometabuilder.tools.run_docker_task.run_docker_task"},
{"name": "read_file", "provider": "module", "callable": "autometabuilder.tools.read_file.read_file"},
{"name": "write_file", "provider": "module", "callable": "autometabuilder.tools.write_file.write_file"},
{"name": "edit_file", "provider": "module", "callable": "autometabuilder.tools.edit_file.edit_file"}
]

View File

@@ -0,0 +1,15 @@
"""Load tool registry entries."""
import json
import os
def load_tool_registry() -> list:
    """Read tool_registry.json next to this module.

    Returns an empty list when the file is missing, malformed, or not a
    JSON array.
    """
    registry_path = os.path.join(os.path.dirname(__file__), "tool_registry.json")
    if not os.path.exists(registry_path):
        return []
    try:
        with open(registry_path, "r", encoding="utf-8") as handle:
            parsed = json.load(handle)
    except json.JSONDecodeError:
        return []
    return parsed if isinstance(parsed, list) else []

View File

@@ -0,0 +1 @@
"""Tool handlers."""

View File

@@ -0,0 +1,14 @@
"""Edit file content with search/replace."""
def edit_file(path: str, search: str, replace: str) -> str:
    """Replace all occurrences of `search` with `replace` in the file at `path`.

    Returns a human-readable status string; never raises (errors come back
    as "Error ..." strings for the tool-call protocol).
    """
    try:
        with open(path, "r", encoding="utf-8") as handle:
            original = handle.read()
        if search not in original:
            return f"Error: '{search}' not found in {path}"
        with open(path, "w", encoding="utf-8") as handle:
            handle.write(original.replace(search, replace))
        return f"Successfully edited {path}"
    except Exception as error:  # pylint: disable=broad-exception-caught
        return f"Error editing file {path}: {error}"

View File

@@ -0,0 +1,17 @@
"""List files in the repo."""
import os
import logging
logger = logging.getLogger("autometabuilder")

# Directory names whose contents should never be indexed.
_SKIP_DIRS = {".git", "__pycache__", ".venv"}


def list_files(directory: str = ".") -> str:
    """Return a newline-joined list of file paths under `directory`.

    Skips the contents of `.git`, `__pycache__` and `.venv` directories.
    The previous substring check (`".git" in root`) also excluded unrelated
    paths whose name merely contains the text — e.g. `.github` — so
    directories are now matched by exact path component and pruned so that
    os.walk never descends into them.
    """
    logger.info("Indexing repository files in %s...", directory)
    files_list = []
    for root, dirs, files in os.walk(directory):
        # Prune in place: a top-down os.walk honors mutations of `dirs`.
        dirs[:] = [d for d in dirs if d not in _SKIP_DIRS]
        for file in files:
            files_list.append(os.path.join(root, file))
    return "\n".join(files_list)

View File

@@ -0,0 +1,8 @@
"""Read file content."""
def read_file(path: str) -> str:
    """Return the UTF-8 text of the file at `path`.

    Errors are returned as "Error ..." strings rather than raised, matching
    the tool-call protocol.
    """
    try:
        with open(path, "r", encoding="utf-8") as handle:
            content = handle.read()
    except Exception as error:  # pylint: disable=broad-exception-caught
        return f"Error reading file {path}: {error}"
    return content

View File

@@ -0,0 +1,8 @@
"""Run a task inside Docker."""
import os
from ..docker_utils import run_command_in_docker
def run_docker_task(image: str, command: str, workdir: str = "/workspace") -> str:
    """Run `command` inside `image`, mounting the current directory at /workspace."""
    mounts = {os.getcwd(): "/workspace"}
    return run_command_in_docker(image, command, volumes=mounts, workdir=workdir)

View File

@@ -0,0 +1,14 @@
"""Run pylint on a path."""
import logging
import subprocess
logger = logging.getLogger("autometabuilder")


def run_lint(path: str = "src") -> str:
    """Run pylint over `path`, log its output, and return stdout."""
    logger.info("Running linting in %s...", path)
    proc = subprocess.run(
        ["pylint", path], capture_output=True, text=True, check=False
    )
    logger.info(proc.stdout)
    if proc.stderr:
        logger.error(proc.stderr)
    return proc.stdout

View File

@@ -0,0 +1,14 @@
"""Run pytest on a path."""
import logging
import subprocess
logger = logging.getLogger("autometabuilder")


def run_tests(path: str = "tests") -> str:
    """Run pytest over `path`, log its output, and return stdout."""
    logger.info("Running tests in %s...", path)
    proc = subprocess.run(
        ["pytest", path], capture_output=True, text=True, check=False
    )
    logger.info(proc.stdout)
    if proc.stderr:
        logger.error(proc.stderr)
    return proc.stdout

View File

@@ -0,0 +1,9 @@
"""Write file content."""
def write_file(path: str, content: str) -> str:
    """Write `content` to `path` (UTF-8), returning a status string.

    Errors are returned as "Error ..." strings rather than raised.
    """
    try:
        with open(path, "w", encoding="utf-8") as handle:
            handle.write(content)
    except Exception as error:  # pylint: disable=broad-exception-caught
        return f"Error writing to file {path}: {error}"
    return f"Successfully wrote to {path}"

View File

@@ -0,0 +1,9 @@
"""Load tool specs from JSON."""
import json
import os
def load_tools(metadata: dict) -> list:
    """Load the tool spec JSON named by metadata["tools_path"] (default tools.json).

    The path is resolved relative to this module's directory.
    """
    spec_name = metadata.get("tools_path", "tools.json")
    spec_path = os.path.join(os.path.dirname(__file__), spec_name)
    with open(spec_path, "r", encoding="utf-8") as handle:
        return json.load(handle)

View File

@@ -0,0 +1,32 @@
[
{
"section": "dashboard",
"icon": "speedometer2",
"label_key": "ui.nav.dashboard",
"default_label": "Dashboard"
},
{
"section": "workflow",
"icon": "diagram-3",
"label_key": "ui.nav.workflow",
"default_label": "Workflow"
},
{
"section": "prompt",
"icon": "file-text",
"label_key": "ui.nav.prompt",
"default_label": "Prompt"
},
{
"section": "settings",
"icon": "gear",
"label_key": "ui.nav.settings",
"default_label": "Settings"
},
{
"section": "translations",
"icon": "translate",
"label_key": "ui.nav.translations",
"default_label": "Translations"
}
]

View File

@@ -259,38 +259,15 @@ def load_workflow_packages():
return packages
def get_navigation_items():
return [
{
"section": "dashboard",
"icon": "speedometer2",
"label_key": "ui.nav.dashboard",
"default_label": "Dashboard"
},
{
"section": "workflow",
"icon": "diagram-3",
"label_key": "ui.nav.workflow",
"default_label": "Workflow"
},
{
"section": "prompt",
"icon": "file-text",
"label_key": "ui.nav.prompt",
"default_label": "Prompt"
},
{
"section": "settings",
"icon": "gear",
"label_key": "ui.nav.settings",
"default_label": "Settings"
},
{
"section": "translations",
"icon": "translate",
"label_key": "ui.nav.translations",
"default_label": "Translations"
}
]
items_path = os.path.join(os.path.dirname(__file__), "navigation_items.json")
if not os.path.exists(items_path):
return []
try:
with open(items_path, "r", encoding="utf-8") as f:
data = json.load(f)
except json.JSONDecodeError:
return []
return data if isinstance(data, list) else []
@app.get("/", response_class=HTMLResponse)
async def read_item(request: Request, username: str = Depends(get_current_user)):

View File

@@ -0,0 +1 @@
"""Workflow package."""

View File

@@ -0,0 +1,15 @@
"""Workflow engine runner."""
class WorkflowEngine:
    """Drives a workflow by handing its node list to a node executor."""

    def __init__(self, workflow_config, node_executor, logger):
        # A falsy config degrades to an empty dict so execute() can't crash.
        self.workflow_config = workflow_config or {}
        self.node_executor = node_executor
        self.logger = logger

    def execute(self):
        """Validate the config's "nodes" list and execute it; log and bail if malformed."""
        node_list = self.workflow_config.get("nodes")
        if isinstance(node_list, list):
            self.node_executor.execute_nodes(node_list)
        else:
            self.logger.error("Workflow config missing nodes list.")

View File

@@ -0,0 +1,18 @@
"""Resolve workflow bindings and coercions."""
from .value_helpers import ValueHelpers
class InputResolver:
    """Resolves "$key" bindings against a shared store and coerces values."""

    def __init__(self, store: dict):
        self.store = store

    def resolve_binding(self, value):
        """Return store[name] for a "$name" string; pass anything else through."""
        is_reference = isinstance(value, str) and value.startswith("$")
        return self.store.get(value[1:]) if is_reference else value

    def resolve_inputs(self, inputs: dict) -> dict:
        """Resolve every binding in an inputs mapping (None becomes empty)."""
        resolved = {}
        for key, raw in (inputs or {}).items():
            resolved[key] = self.resolve_binding(raw)
        return resolved

    def coerce_bool(self, value) -> bool:
        """Delegate boolean coercion to the shared ValueHelpers rules."""
        return ValueHelpers.coerce_bool(value)

View File

@@ -0,0 +1,47 @@
"""Execute workflow loops."""
class LoopExecutor:
    """Executes "control.loop" workflow nodes by repeatedly running a body node list."""
    def __init__(self, runtime, input_resolver):
        self.runtime = runtime
        self.input_resolver = input_resolver
        # Set later via set_node_executor() to break the circular dependency
        # between LoopExecutor and NodeExecutor.
        self.node_executor = None
    def set_node_executor(self, node_executor) -> None:
        """Inject the node executor used to run the loop body."""
        self.node_executor = node_executor
    def execute(self, node):
        """Run the loop node's body up to max_iterations times.

        Inputs: "max_iterations" (binding or int; malformed values fall back
        to 1), "stop_when" (optional binding re-evaluated AFTER each body
        pass), and "stop_on" (the boolean the stop value must equal to break,
        default True). Always returns None.
        """
        inputs = node.get("inputs", {})
        max_iterations = self.input_resolver.resolve_binding(inputs.get("max_iterations", 1))
        stop_when_raw = inputs.get("stop_when")
        stop_on_raw = inputs.get("stop_on", True)
        try:
            max_iterations = int(max_iterations)
        except (TypeError, ValueError):
            max_iterations = 1
        # CLI --once caps the loop at a single pass regardless of config.
        if self.runtime.context["args"].once:
            max_iterations = min(max_iterations, 1)
        stop_on = self.input_resolver.coerce_bool(self.input_resolver.resolve_binding(stop_on_raw))
        body = node.get("body", [])
        if not isinstance(body, list):
            self.runtime.logger.error("Loop body must be a list of nodes.")
            return None
        iteration = 0
        while iteration < max_iterations:
            iteration += 1
            self.runtime.logger.info("--- Loop iteration %s ---", iteration)
            # Checked inside the loop so a missing executor aborts cleanly.
            if not self.node_executor:
                self.runtime.logger.error("Loop executor missing node executor.")
                return None
            self.node_executor.execute_nodes(body)
            # stop_when is resolved after the body runs, so bindings written
            # by the body in this iteration are visible to the check.
            if stop_when_raw is not None:
                stop_value = self.input_resolver.resolve_binding(stop_when_raw)
                if self.input_resolver.coerce_bool(stop_value) == stop_on:
                    break
        return None

View File

@@ -0,0 +1,50 @@
"""Execute workflow nodes."""
class NodeExecutor:
    """Executes workflow nodes: resolves inputs, runs plugins, routes outputs."""
    def __init__(self, runtime, plugin_registry, input_resolver, loop_executor):
        self.runtime = runtime
        self.plugin_registry = plugin_registry
        self.input_resolver = input_resolver
        self.loop_executor = loop_executor
    def execute_nodes(self, nodes):
        """Execute a list of nodes in order."""
        for node in nodes:
            self.execute_node(node)
    def execute_node(self, node):
        """Execute a single node; returns the plugin's result dict or None.

        Nodes with an unsatisfied "when" condition are skipped;
        "control.loop" nodes are delegated to the loop executor.
        """
        node_type = node.get("type")
        if not node_type:
            self.runtime.logger.error("Workflow node missing type.")
            return None
        when_value = node.get("when")
        if when_value is not None:
            if not self.input_resolver.coerce_bool(self.input_resolver.resolve_binding(when_value)):
                # NOTE(review): logging.Logger has no trace() method; this
                # assumes a custom TRACE level/method is installed by the
                # project's logging configuration — confirm.
                self.runtime.logger.trace("Node %s skipped by condition", node.get("id"))
                return None
        if node_type == "control.loop":
            return self.loop_executor.execute(node)
        plugin = self.plugin_registry.get(node_type)
        if not plugin:
            self.runtime.logger.error("Unknown node type: %s", node_type)
            return None
        inputs = self.input_resolver.resolve_inputs(node.get("inputs", {}))
        self.runtime.logger.debug("Executing node %s", node_type)
        result = plugin(self.runtime, inputs)
        # Non-dict plugin results are wrapped so output routing always works.
        if not isinstance(result, dict):
            result = {"result": result}
        outputs = node.get("outputs", {})
        if outputs:
            # Explicit mapping: only listed outputs are stored, renamed to store_key.
            for output_name, store_key in outputs.items():
                if output_name in result:
                    self.runtime.store[store_key] = result[output_name]
        else:
            # No mapping: every result entry is stored under its own name.
            for output_name, value in result.items():
                self.runtime.store[output_name] = value
        return result

View File

@@ -0,0 +1,6 @@
"""Load workflow plugins by dotted path."""
from ..callable_loader import load_callable
def load_plugin_callable(path: str):
    """Resolve a dotted-path string to a callable via the shared loader."""
    return load_callable(path)

View File

@@ -0,0 +1,20 @@
{
"core.load_context": "autometabuilder.workflow.plugins.core_load_context.run",
"core.seed_messages": "autometabuilder.workflow.plugins.core_seed_messages.run",
"core.append_context_message": "autometabuilder.workflow.plugins.core_append_context_message.run",
"core.append_user_instruction": "autometabuilder.workflow.plugins.core_append_user_instruction.run",
"core.ai_request": "autometabuilder.workflow.plugins.core_ai_request.run",
"core.run_tool_calls": "autometabuilder.workflow.plugins.core_run_tool_calls.run",
"core.append_tool_results": "autometabuilder.workflow.plugins.core_append_tool_results.run",
"tools.list_files": "autometabuilder.workflow.plugins.tools_list_files.run",
"tools.read_file": "autometabuilder.workflow.plugins.tools_read_file.run",
"tools.run_tests": "autometabuilder.workflow.plugins.tools_run_tests.run",
"tools.run_lint": "autometabuilder.workflow.plugins.tools_run_lint.run",
"tools.create_branch": "autometabuilder.workflow.plugins.tools_create_branch.run",
"tools.create_pull_request": "autometabuilder.workflow.plugins.tools_create_pull_request.run",
"utils.filter_list": "autometabuilder.workflow.plugins.utils_filter_list.run",
"utils.map_list": "autometabuilder.workflow.plugins.utils_map_list.run",
"utils.reduce_list": "autometabuilder.workflow.plugins.utils_reduce_list.run",
"utils.branch_condition": "autometabuilder.workflow.plugins.utils_branch_condition.run",
"utils.not": "autometabuilder.workflow.plugins.utils_not.run"
}

View File

@@ -0,0 +1,34 @@
"""Workflow plugin registry."""
import json
import logging
import os
from .plugin_loader import load_plugin_callable
logger = logging.getLogger("autometabuilder")


def load_plugin_map() -> dict:
    """Read plugin_map.json next to this module.

    Returns {} when the file is missing or not a JSON object; malformed
    JSON is logged and also yields {}.
    """
    mapping_path = os.path.join(os.path.dirname(__file__), "plugin_map.json")
    if not os.path.exists(mapping_path):
        return {}
    try:
        with open(mapping_path, "r", encoding="utf-8") as handle:
            parsed = json.load(handle)
    except json.JSONDecodeError:
        logger.error("Invalid workflow plugin map JSON.")
        return {}
    return parsed if isinstance(parsed, dict) else {}
class PluginRegistry:
    """Caches node-type -> callable mappings resolved from a plugin map."""

    def __init__(self, plugin_map: dict):
        self._plugins = {}
        for node_type, dotted_path in plugin_map.items():
            try:
                self._plugins[node_type] = load_plugin_callable(dotted_path)
                logger.debug("Registered workflow plugin %s -> %s", node_type, dotted_path)
            except Exception as error:  # pylint: disable=broad-exception-caught
                logger.error("Failed to register plugin %s: %s", node_type, error)

    def get(self, node_type: str):
        """Return the registered callable for node_type, or None if unknown."""
        return self._plugins.get(node_type)

View File

@@ -0,0 +1,25 @@
"""Workflow plugin: AI request."""
from ...openai_client import get_completion
def run(runtime, inputs):
    """Send the messages to the LLM, log the reply, and report tool-call info.

    Returns the response message plus whether (and how many) tool calls it
    requested; the reply is also appended to the local messages copy.
    """
    messages = list(inputs.get("messages") or [])
    response = get_completion(
        runtime.context["client"],
        runtime.context["model_name"],
        messages,
        runtime.context["tools"],
    )
    reply = response.choices[0].message
    if reply.content:
        runtime.logger.info(reply.content)
    else:
        runtime.logger.info(runtime.context["msgs"]["info_tool_call_requested"])
    messages.append(reply)
    calls = getattr(reply, "tool_calls", None) or []
    return {
        "response": reply,
        "has_tool_calls": bool(calls),
        "tool_calls_count": len(calls),
    }

View File

@@ -0,0 +1,12 @@
"""Workflow plugin: append context message."""
def run(runtime, inputs):
    """Append the SDLC context as a system message when context is non-empty."""
    messages = list(inputs.get("messages") or [])
    context_val = inputs.get("context")
    if not context_val:
        return {"messages": messages}
    label = runtime.context["msgs"]["sdlc_context_label"]
    messages.append({"role": "system", "content": f"{label}{context_val}"})
    return {"messages": messages}

View File

@@ -0,0 +1,16 @@
"""Workflow plugin: append tool results."""
from ...integrations.notifications import notify_all
from ...roadmap_utils import is_mvp_reached
def run(runtime, inputs):
    """Extend the messages with tool results; notify when a YOLO run hits MVP."""
    messages = list(inputs.get("messages") or [])
    results = inputs.get("tool_results") or []
    # Extending with an empty list is a no-op, so no guard is needed.
    messages.extend(results)
    if runtime.context["args"].yolo and is_mvp_reached():
        runtime.logger.info("MVP reached. Stopping YOLO loop.")
        notify_all("AutoMetabuilder YOLO loop stopped: MVP reached.")
    return {"messages": messages}

View File

@@ -0,0 +1,7 @@
"""Workflow plugin: append user instruction."""
def run(runtime, inputs):
    """Append the localized "next step" user instruction to the messages."""
    messages = list(inputs.get("messages") or [])
    instruction = runtime.context["msgs"]["user_next_step"]
    messages.append({"role": "user", "content": instruction})
    return {"messages": messages}

View File

@@ -0,0 +1,6 @@
"""Workflow plugin: load SDLC context."""
from ...context_loader import get_sdlc_context
def run(runtime, inputs):
    """Fetch the SDLC context via the GitHub integration; expose it as "context"."""
    return {"context": get_sdlc_context(runtime.context["gh"], runtime.context["msgs"])}

View File

@@ -0,0 +1,25 @@
"""Workflow plugin: run tool calls."""
from ...integrations.notifications import notify_all
from ..tool_calls_handler import handle_tool_calls
def run(runtime, inputs):
    """Execute the response's requested tool calls and surface their results.

    Returns the tool-result messages plus a "no_tool_calls" flag; when the
    model produced plain content with no tool calls, a completion
    notification is sent.
    """
    resp_msg = inputs.get("response")
    if not resp_msg:
        return {"tool_results": [], "no_tool_calls": True}
    calls = getattr(resp_msg, "tool_calls", None) or []
    tool_results = handle_tool_calls(
        resp_msg,
        runtime.context["tool_map"],
        runtime.context["msgs"],
        runtime.context["args"],
        runtime.context["tool_policies"],
        runtime.logger,
    )
    if not calls and resp_msg.content:
        notify_all(f"AutoMetabuilder task complete: {resp_msg.content[:100]}...")
    return {"tool_results": tool_results, "no_tool_calls": not calls}

View File

@@ -0,0 +1,6 @@
"""Workflow plugin: seed messages."""
def run(runtime, inputs):
    """Copy the configured prompt messages into a fresh list for the store."""
    seed = runtime.context["prompt"]["messages"]
    return {"messages": list(seed)}

View File

@@ -0,0 +1,10 @@
"""Workflow plugin: create branch."""
def run(runtime, inputs):
    """Create a git branch via the tool runner (base branch defaults to main)."""
    outcome = runtime.tool_runner.call(
        "create_branch",
        branch_name=inputs.get("branch_name"),
        base_branch=inputs.get("base_branch", "main"),
    )
    return {"result": outcome}

View File

@@ -0,0 +1,12 @@
"""Workflow plugin: create pull request."""
def run(runtime, inputs):
    """Open a pull request via the tool runner (base branch defaults to main)."""
    outcome = runtime.tool_runner.call(
        "create_pull_request",
        title=inputs.get("title"),
        body=inputs.get("body"),
        head_branch=inputs.get("head_branch"),
        base_branch=inputs.get("base_branch", "main"),
    )
    return {"result": outcome}

View File

@@ -0,0 +1,6 @@
"""Workflow plugin: list files."""
def run(runtime, inputs):
    """List repository files under inputs["path"] (default ".") via the tool runner."""
    listing = runtime.tool_runner.call("list_files", directory=inputs.get("path", "."))
    return {"files": listing}

View File

@@ -0,0 +1,6 @@
"""Workflow plugin: read file."""
def run(runtime, inputs):
    """Read a file's content via the tool runner."""
    text = runtime.tool_runner.call("read_file", path=inputs.get("path"))
    return {"content": text}

View File

@@ -0,0 +1,6 @@
"""Workflow plugin: run lint."""
def run(runtime, inputs):
    """Run lint over inputs["path"] (default "src") via the tool runner."""
    output = runtime.tool_runner.call("run_lint", path=inputs.get("path", "src"))
    return {"results": output}

View File

@@ -0,0 +1,6 @@
"""Workflow plugin: run tests."""
def run(runtime, inputs):
    """Run tests over inputs["path"] (default "tests") via the tool runner."""
    output = runtime.tool_runner.call("run_tests", path=inputs.get("path", "tests"))
    return {"results": output}

View File

@@ -0,0 +1,25 @@
"""Workflow plugin: branch condition."""
import re
from ..value_helpers import ValueHelpers
def run(runtime, inputs):
    """Evaluate a boolean condition over inputs["value"].

    Supported modes: is_empty, is_truthy (default), equals, not_equals,
    contains, regex. Unknown modes yield False.
    """
    value = inputs.get("value")
    mode = inputs.get("mode", "is_truthy")
    compare = inputs.get("compare", "")
    checks = {
        "is_empty": lambda: not ValueHelpers.ensure_list(value),
        "is_truthy": lambda: bool(value),
        "equals": lambda: str(value) == compare,
        "not_equals": lambda: str(value) != compare,
        "contains": lambda: compare in str(value),
        "regex": lambda: bool(re.search(compare, str(value))),
    }
    check = checks.get(mode)
    return {"result": check() if check else False}

View File

@@ -0,0 +1,28 @@
"""Workflow plugin: filter list."""
import re
from ..value_helpers import ValueHelpers
def run(runtime, inputs):
    """Keep items whose string form matches the pattern under the given mode.

    Supported modes: contains (default), regex, equals, not_equals,
    starts_with, ends_with. Unknown modes match nothing.
    """
    candidates = ValueHelpers.ensure_list(inputs.get("items"))
    mode = inputs.get("mode", "contains")
    pattern = inputs.get("pattern", "")
    matchers = {
        "contains": lambda text: pattern in text,
        "regex": lambda text: bool(re.search(pattern, text)),
        "equals": lambda text: text == pattern,
        "not_equals": lambda text: text != pattern,
        "starts_with": lambda text: text.startswith(pattern),
        "ends_with": lambda text: text.endswith(pattern),
    }
    matcher = matchers.get(mode)
    kept = []
    for item in candidates:
        if matcher is not None and matcher(str(item)):
            kept.append(item)
    return {"items": kept}

View File

@@ -0,0 +1,14 @@
"""Workflow plugin: map list."""
from ..value_helpers import ValueHelpers
def run(runtime, inputs):
    """Format each item through the template; fall back to str(item) on failure."""
    template = inputs.get("template", "{item}")
    rendered = []
    for element in ValueHelpers.ensure_list(inputs.get("items")):
        try:
            rendered.append(template.format(item=element))
        except Exception:  # pylint: disable=broad-exception-caught
            rendered.append(str(element))
    return {"items": rendered}

View File

@@ -0,0 +1,6 @@
"""Workflow plugin: boolean not."""
from ..value_helpers import ValueHelpers
def run(runtime, inputs):
    """Return the boolean negation of the coerced input value."""
    raw = inputs.get("value")
    return {"result": not ValueHelpers.coerce_bool(raw)}

View File

@@ -0,0 +1,9 @@
"""Workflow plugin: reduce list."""
from ..value_helpers import ValueHelpers
def run(runtime, inputs):
    """Join the stringified items with the normalized separator."""
    elements = ValueHelpers.ensure_list(inputs.get("items"))
    sep = ValueHelpers.normalize_separator(inputs.get("separator", ""))
    return {"result": sep.join(str(element) for element in elements)}

View File

@@ -0,0 +1,9 @@
"""Workflow runtime container."""
class WorkflowRuntime:
    """Per-run state bundle: app context dict, value store, tool runner, logger."""

    def __init__(self, context: dict, store: dict, tool_runner, logger):
        self.logger = logger
        self.tool_runner = tool_runner
        self.store = store
        self.context = context

View File

@@ -0,0 +1,91 @@
"""Handle tool calls from LLM responses."""
import json
def handle_tool_calls(resp_msg, tool_map: dict, msgs: dict, args, policies: dict, logger) -> list:
    """Execute each tool call requested by the LLM response.

    For every call: looks up the handler in tool_map, asks the user for
    confirmation unless --yolo, skips state-modifying tools under --dry-run,
    then invokes the handler. Always returns one role="tool" result message
    per call (including skips and errors) so the conversation stays balanced.
    """
    if not resp_msg.tool_calls:
        return []
    # Tools listed in policies["modifying_tools"] are suppressed by --dry-run.
    modifying_tools = set(policies.get("modifying_tools", []))
    tool_results = []
    for tool_call in resp_msg.tool_calls:
        function_name = tool_call.function.name
        call_id = tool_call.id
        # NOTE(review): malformed JSON arguments would raise here, uncaught —
        # confirm whether the caller handles json.JSONDecodeError.
        payload = json.loads(tool_call.function.arguments)
        # NOTE(review): logging.Logger has no trace() method; assumes a custom
        # TRACE level is installed by the logging configuration — confirm.
        logger.trace("Tool call %s payload: %s", function_name, payload)
        handler = tool_map.get(function_name)
        if not handler:
            # Unknown/unavailable tool: report the failure back to the model.
            msg = msgs.get("error_tool_not_found", "Tool {name} not found or unavailable.").format(
                name=function_name
            )
            logger.error(msg)
            tool_results.append({
                "tool_call_id": call_id,
                "role": "tool",
                "name": function_name,
                "content": msg,
            })
            continue
        if not args.yolo:
            # Interactive mode: every tool execution needs explicit approval.
            confirm = input(
                msgs.get(
                    "confirm_tool_execution",
                    "Do you want to execute {name} with {args}? [y/N]: "
                ).format(name=function_name, args=payload)
            )
            if confirm.lower() != "y":
                logger.info(msgs.get("info_tool_skipped", "Skipping tool: {name}").format(name=function_name))
                tool_results.append({
                    "tool_call_id": call_id,
                    "role": "tool",
                    "name": function_name,
                    "content": "Skipped by user.",
                })
                continue
        if args.dry_run and function_name in modifying_tools:
            # Dry-run: read-only tools still execute; modifying ones are skipped.
            logger.info(
                msgs.get(
                    "info_dry_run_skipping",
                    "DRY RUN: Skipping state-modifying tool {name}"
                ).format(name=function_name)
            )
            tool_results.append({
                "tool_call_id": call_id,
                "role": "tool",
                "name": function_name,
                "content": "Skipped due to dry-run.",
            })
            continue
        logger.info(msgs.get("info_executing_tool", "Executing tool: {name}").format(name=function_name))
        try:
            result = handler(**payload)
            content = str(result) if result is not None else "Success"
            # Non-string iterables are summarized as up to five bullet items.
            if hasattr(result, "__iter__") and not isinstance(result, str):
                items = list(result)[:5]
                content = "\n".join([f"- {item}" for item in items])
                logger.info(content)
            elif result is not None:
                logger.info(result)
            tool_results.append({
                "tool_call_id": call_id,
                "role": "tool",
                "name": function_name,
                "content": content,
            })
        except Exception as error:  # pylint: disable=broad-exception-caught
            # Tool failures are fed back to the model instead of aborting the run.
            error_msg = f"Error executing {function_name}: {error}"
            logger.error(error_msg)
            tool_results.append({
                "tool_call_id": call_id,
                "role": "tool",
                "name": function_name,
                "content": error_msg,
            })
    return tool_results

View File

@@ -0,0 +1,27 @@
"""Run tools with logging and filtering."""
class ToolRunner:
    """Looks up tools by name and invokes them, logging any failures."""

    def __init__(self, tool_map: dict, msgs: dict, logger):
        self.tool_map = tool_map
        self.msgs = msgs
        self.logger = logger

    def call(self, tool_name: str, **kwargs):
        """Invoke tool_name with the non-None kwargs.

        Returns the tool's result, "Success" when it returns None, or an
        error string when the tool is missing or raises.
        """
        handler = self.tool_map.get(tool_name)
        if not handler:
            message = self.msgs.get(
                "error_tool_not_found",
                "Tool {name} not found or unavailable."
            ).format(name=tool_name)
            self.logger.error(message)
            return message
        arguments = {key: val for key, val in kwargs.items() if val is not None}
        try:
            outcome = handler(**arguments)
        except Exception as error:  # pylint: disable=broad-exception-caught
            failure = f"Error executing {tool_name}: {error}"
            self.logger.error(failure)
            return failure
        return outcome if outcome is not None else "Success"

View File

@@ -0,0 +1,35 @@
"""Helpers for normalizing workflow values."""
class ValueHelpers:
    """Static coercion helpers shared across workflow plugins."""

    @staticmethod
    def ensure_list(value):
        """Coerce any value to a list; strings split into non-blank lines."""
        if isinstance(value, list):
            return value
        if value is None:
            return []
        if isinstance(value, (tuple, set)):
            return list(value)
        if isinstance(value, str):
            return [line for line in value.splitlines() if line.strip()]
        return [value]

    @staticmethod
    def coerce_bool(value) -> bool:
        """Coerce to bool; recognizes true/yes/1 and false/no/0/"" strings."""
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            token = value.strip().lower()
            if token in ("false", "no", "0", ""):
                return False
            if token in ("true", "yes", "1"):
                return True
        # Unrecognized strings and every other type fall back to truthiness.
        return bool(value)

    @staticmethod
    def normalize_separator(text):
        """Turn escaped "\\n"/"\\t" into real characters; None becomes ""."""
        if text is None:
            return ""
        if not isinstance(text, str):
            return str(text)
        return text.replace("\\n", "\n").replace("\\t", "\t")

View File

@@ -0,0 +1,10 @@
"""Load workflow configuration JSON."""
import json
import os
def load_workflow_config(metadata: dict) -> dict:
    """Load the workflow JSON named by metadata["workflow_path"] (default workflow.json).

    The path is resolved relative to this module's directory.
    """
    config_name = metadata.get("workflow_path", "workflow.json")
    config_path = os.path.join(os.path.dirname(__file__), config_name)
    with open(config_path, "r", encoding="utf-8") as handle:
        return json.load(handle)

View File

@@ -0,0 +1,16 @@
"""Build workflow runtime context."""
from .model_resolver import resolve_model_name
def build_workflow_context(args, gh, msgs, client, tools, tool_map, prompt, tool_policies) -> dict:
    """Assemble the shared context dict handed to the workflow runtime.

    All values pass through unchanged except "model_name", which is derived
    from the prompt config (with env override) via resolve_model_name.
    """
    return {
        "args": args,
        "gh": gh,
        "msgs": msgs,
        "client": client,
        "model_name": resolve_model_name(prompt),
        "tools": tools,
        "tool_map": tool_map,
        "prompt": prompt,
        "tool_policies": tool_policies,
    }

View File

@@ -0,0 +1,22 @@
"""Build workflow engine with dependencies."""
from .workflow.engine import WorkflowEngine
from .workflow.input_resolver import InputResolver
from .workflow.loop_executor import LoopExecutor
from .workflow.node_executor import NodeExecutor
from .workflow.plugin_registry import PluginRegistry, load_plugin_map
from .workflow.runtime import WorkflowRuntime
from .workflow.tool_runner import ToolRunner
def build_workflow_engine(workflow_config: dict, context: dict, logger):
    """Wire runtime, tool runner, plugin registry and executors into an engine.

    The loop executor and node executor reference each other, so the node
    executor is injected into the loop executor after construction.
    """
    shared_store = {}
    runtime = WorkflowRuntime(context=context, store=shared_store, tool_runner=None, logger=logger)
    runtime.tool_runner = ToolRunner(context["tool_map"], context["msgs"], logger)
    registry = PluginRegistry(load_plugin_map())
    resolver = InputResolver(shared_store)
    loops = LoopExecutor(runtime, resolver)
    nodes = NodeExecutor(runtime, registry, resolver, loops)
    loops.set_node_executor(nodes)
    return WorkflowEngine(workflow_config, nodes, logger)

View File

@@ -0,0 +1,80 @@
{
"id": "contextual_iterative_loop",
"label": "meta.workflow_packages.contextual_iterative_loop.label",
"description": "meta.workflow_packages.contextual_iterative_loop.description",
"tags": ["context", "loop", "map-reduce"],
"workflow": {
"nodes": [
{
"id": "list_files",
"type": "tools.list_files",
"inputs": {"path": "."},
"outputs": {"files": "repo_files"}
},
{
"id": "filter_python",
"type": "utils.filter_list",
"inputs": {"items": "$repo_files", "mode": "regex", "pattern": "\\.py$"},
"outputs": {"items": "python_files"}
},
{
"id": "map_python",
"type": "utils.map_list",
"inputs": {"items": "$python_files", "template": "PY: {item}"},
"outputs": {"items": "python_lines"}
},
{
"id": "reduce_python",
"type": "utils.reduce_list",
"inputs": {"items": "$python_lines", "separator": "\\n"},
"outputs": {"result": "python_summary"}
},
{
"id": "seed_messages",
"type": "core.seed_messages",
"outputs": {"messages": "messages"}
},
{
"id": "append_repo_summary",
"type": "core.append_context_message",
"inputs": {"messages": "$messages", "context": "$python_summary"},
"outputs": {"messages": "messages"}
},
{
"id": "append_user_instruction",
"type": "core.append_user_instruction",
"inputs": {"messages": "$messages"},
"outputs": {"messages": "messages"}
},
{
"id": "main_loop",
"type": "control.loop",
"inputs": {"max_iterations": 5, "stop_when": "$no_tool_calls", "stop_on": "true"},
"body": [
{
"id": "ai_request",
"type": "core.ai_request",
"inputs": {"messages": "$messages"},
"outputs": {
"response": "llm_response",
"has_tool_calls": "has_tool_calls",
"tool_calls_count": "tool_calls_count"
}
},
{
"id": "run_tool_calls",
"type": "core.run_tool_calls",
"inputs": {"response": "$llm_response"},
"outputs": {"tool_results": "tool_results", "no_tool_calls": "no_tool_calls"}
},
{
"id": "append_tool_results",
"type": "core.append_tool_results",
"inputs": {"messages": "$messages", "tool_results": "$tool_results"},
"outputs": {"messages": "messages"}
}
]
}
]
}
}

View File

@@ -0,0 +1,65 @@
{
"id": "game_tick_loop",
"label": "meta.workflow_packages.game_tick_loop.label",
"description": "meta.workflow_packages.game_tick_loop.description",
"tags": ["game", "loop", "ticks"],
"workflow": {
"nodes": [
{
"id": "seed_messages",
"type": "core.seed_messages",
"outputs": {"messages": "messages"}
},
{
"id": "map_ticks",
"type": "utils.map_list",
"inputs": {
"items": ["tick_start", "tick_update", "tick_render"],
"template": "Tick: {item}"
},
"outputs": {"items": "tick_lines"}
},
{
"id": "reduce_ticks",
"type": "utils.reduce_list",
"inputs": {"items": "$tick_lines", "separator": "\\n"},
"outputs": {"result": "tick_context"}
},
{
"id": "append_tick_context",
"type": "core.append_context_message",
"inputs": {"messages": "$messages", "context": "$tick_context"},
"outputs": {"messages": "messages"}
},
{
"id": "main_loop",
"type": "control.loop",
"inputs": {"max_iterations": 3, "stop_when": "$no_tool_calls", "stop_on": "true"},
"body": [
{
"id": "ai_request",
"type": "core.ai_request",
"inputs": {"messages": "$messages"},
"outputs": {
"response": "llm_response",
"has_tool_calls": "has_tool_calls",
"tool_calls_count": "tool_calls_count"
}
},
{
"id": "run_tool_calls",
"type": "core.run_tool_calls",
"inputs": {"response": "$llm_response"},
"outputs": {"tool_results": "tool_results", "no_tool_calls": "no_tool_calls"}
},
{
"id": "append_tool_results",
"type": "core.append_tool_results",
"inputs": {"messages": "$messages", "tool_results": "$tool_results"},
"outputs": {"messages": "messages"}
}
]
}
]
}
}