Introduce Workflow Engine and Metadata Integration: Add tests, update Web UI, refactor main execution flow.

This commit is contained in:
2026-01-09 14:28:43 +00:00
parent 580d89b281
commit a46f4fc5d9
7 changed files with 312 additions and 73 deletions

View File

@@ -6,15 +6,25 @@ import json
def load_messages():
    """Load messages based on APP_LANG environment variable and metadata.

    The per-language message file name is looked up in ``metadata.json``
    (key ``"messages"``) when that file exists next to this module;
    otherwise the conventional ``messages_<lang>.json`` name is used.
    If the selected language file is missing, English is used as fallback.

    Returns:
        dict: The parsed message catalog for the selected language.
    """
    base_dir = os.path.dirname(__file__)
    metadata_path = os.path.join(base_dir, "metadata.json")
    if os.path.exists(metadata_path):
        with open(metadata_path, "r", encoding="utf-8") as f:
            metadata = json.load(f)
        messages_map = metadata.get("messages", {})
    else:
        messages_map = {}
    lang = os.environ.get("APP_LANG", "en")
    # Get filename from metadata or fall back to the default naming pattern.
    messages_file = messages_map.get(lang, f"messages_{lang}.json")
    messages_path = os.path.join(base_dir, messages_file)
    if not os.path.exists(messages_path):
        # Fallback to English, from metadata or the default messages_en.json.
        en_file = messages_map.get("en", "messages_en.json")
        messages_path = os.path.join(base_dir, en_file)
    with open(messages_path, "r", encoding="utf-8") as f:
        return json.load(f)

View File

@@ -270,6 +270,139 @@ def handle_tool_calls(resp_msg, tool_map: dict, gh: GitHubIntegration, msgs: dic
return tool_results
class WorkflowEngine:
    """Interpret and execute a JSON-defined workflow.

    The workflow config is a list of phase dicts.  A phase with
    ``"type": "loop"`` repeats its steps up to ``max_iterations`` times;
    any other phase runs its steps once.  Steps communicate through
    ``self.state``: a producer step stores its result under its
    ``output_key`` and consumer steps read it back via ``input_*`` keys.
    """

    def __init__(self, workflow_config, context):
        """
        Args:
            workflow_config: Parsed workflow.json — a list of phase dicts.
            context: Runtime dependencies shared by all steps (keys used:
                "gh", "msgs", "client", "prompt", "tools", "tool_map",
                "model_name", "args").
        """
        self.workflow_config = workflow_config
        self.context = context
        self.state = {}

    def execute(self):
        """Execute the workflow sequence, phase by phase."""
        for phase in self.workflow_config:
            if phase.get("type") == "loop":
                self._execute_loop(phase)
            else:
                self._execute_phase(phase)

    def _execute_phase(self, phase):
        """Execute a non-loop phase: run each of its steps once."""
        logger.info(f"--- Executing phase: {phase.get('name', 'unnamed')} ---")
        for step in phase.get("steps", []):
            self._execute_step(step)

    def _execute_loop(self, phase):
        """Execute a loop phase: repeat its steps until a stop condition.

        Stops when a step flagged ``stop_if_no_tools`` returns True, when
        ``max_iterations`` is exhausted, or early under ``--once``.
        """
        max_iterations = phase.get("max_iterations", 10)
        if self.context["args"].once:
            max_iterations = 2  # At most 2 passes for --once
        iteration = 0
        while iteration < max_iterations:
            iteration += 1
            logger.info(f"--- {phase.get('name', 'loop')} Iteration {iteration} ---")
            should_stop = False
            for step in phase.get("steps", []):
                result = self._execute_step(step)
                if step.get("stop_if_no_tools") and result is True:
                    should_stop = True
                    break
            # BUG FIX: the "llm_response" state entry may be absent (e.g. the
            # llm_gen step has not run or stored under a different key);
            # previously `.tool_calls` was accessed on None -> AttributeError.
            # NOTE(review): the "llm_response" key is hard-coded here and must
            # match the llm_gen step's output_key in workflow.json — confirm.
            llm_response = self.state.get("llm_response")
            no_tools = not getattr(llm_response, "tool_calls", None)
            if should_stop or (self.context["args"].once and iteration >= 1 and no_tools):
                break
            if self.context["args"].once and iteration == 2:
                break

    def _execute_step(self, step):
        """Execute a single workflow step.

        Args:
            step: A step dict with a "type" and optional "output_key" plus
                type-specific input keys.

        Returns:
            The produced value for producer steps, True when the enclosing
            loop should stop, False/None otherwise.

        Raises:
            Re-raises any exception from the underlying operation after
            logging it.
        """
        step_type = step.get("type")
        output_key = step.get("output_key")
        try:
            if step_type == "load_context":
                sdlc_context = get_sdlc_context(self.context["gh"], self.context["msgs"])
                if output_key:
                    self.state[output_key] = sdlc_context
                return sdlc_context
            elif step_type == "prepare_messages":
                prompt = self.context["prompt"]
                msgs = self.context["msgs"]
                sdlc_context_val = self.state.get(step.get("input_context"))
                messages = list(prompt["messages"])  # Copy to avoid mutating original prompt
                if sdlc_context_val:
                    messages.append(
                        {
                            "role": "system",
                            "content": f"{msgs['sdlc_context_label']}{sdlc_context_val}",
                        }
                    )
                messages.append({"role": "user", "content": msgs["user_next_step"]})
                if output_key:
                    self.state[output_key] = messages
                return messages
            elif step_type == "llm_gen":
                messages = self.state.get(step.get("input_messages"))
                response = get_completion(
                    self.context["client"],
                    self.context["model_name"],
                    messages,
                    self.context["tools"]
                )
                resp_msg = response.choices[0].message
                logger.info(
                    resp_msg.content
                    if resp_msg.content
                    else self.context["msgs"]["info_tool_call_requested"]
                )
                messages.append(resp_msg)  # Append AI response to messages
                if output_key:
                    self.state[output_key] = resp_msg
                return resp_msg
            elif step_type == "process_response":
                resp_msg = self.state.get(step.get("input_response"))
                tool_results = handle_tool_calls(
                    resp_msg,
                    self.context["tool_map"],
                    self.context["gh"],
                    self.context["msgs"],
                    dry_run=self.context["args"].dry_run,
                    yolo=self.context["args"].yolo
                )
                if output_key:
                    self.state[output_key] = tool_results
                if step.get("stop_if_no_tools") and not resp_msg.tool_calls:
                    # BUG FIX: content may be None on a tool-less reply;
                    # slicing None raised TypeError before.
                    summary = (resp_msg.content or "")[:100]
                    notify_all(f"AutoMetabuilder task complete: {summary}...")
                    return True  # Signal to stop loop
                return False
            elif step_type == "update_messages":
                tool_results = self.state.get(step.get("input_results"))
                target_messages = self.state.get(step.get("target_messages"))
                if tool_results and target_messages is not None:
                    target_messages.extend(tool_results)
                # Check for MVP if yolo
                if self.context["args"].yolo and is_mvp_reached():
                    logger.info("MVP reached. Stopping YOLO loop.")
                    notify_all("AutoMetabuilder YOLO loop stopped: MVP reached.")
                    return True
            else:
                logger.error(f"Unknown step type: {step_type}")
        except Exception as e:
            logger.error(f"Error executing step {step_type}: {e}")
            raise
        return None
def main():
"""Main function to run AutoMetabuilder."""
parser = argparse.ArgumentParser(description="AutoMetabuilder: AI-driven SDLC assistant.")
@@ -306,8 +439,13 @@ def main():
prompt = load_prompt_yaml()
# Load Metadata
metadata_path = os.path.join(os.path.dirname(__file__), "metadata.json")
with open(metadata_path, "r", encoding="utf-8") as f:
metadata = json.load(f)
# Load tools for SDLC operations from JSON file
tools_path = os.path.join(os.path.dirname(__file__), "tools.json")
tools_path = os.path.join(os.path.dirname(__file__), metadata.get("tools_path", "tools.json"))
with open(tools_path, "r", encoding="utf-8") as f:
tools = json.load(f)
@@ -329,71 +467,27 @@ def main():
# Load plugins and update tool_map and tools list
load_plugins(tool_map, tools)
# Add SDLC Context if available
sdlc_context_val = get_sdlc_context(gh, msgs)
messages = prompt["messages"]
if sdlc_context_val:
messages.append(
{
"role": "system",
"content": f"{msgs['sdlc_context_label']}{sdlc_context_val}",
}
)
# Add runtime request
messages.append({"role": "user", "content": msgs["user_next_step"]})
model_name = os.environ.get("LLM_MODEL", prompt.get("model", DEFAULT_MODEL))
# Multi-iteration loop
iteration = 0
max_iterations = 10
while iteration < max_iterations:
iteration += 1
logger.info(f"--- Iteration {iteration} ---")
response = get_completion(client, model_name, messages, tools)
resp_msg = response.choices[0].message
logger.info(
resp_msg.content
if resp_msg.content
else msgs["info_tool_call_requested"]
)
messages.append(resp_msg)
if not resp_msg.tool_calls:
# If no more tools requested, we are done
notify_all(f"AutoMetabuilder task complete: {resp_msg.content[:100]}...")
break
# Handle tool calls
tool_results = handle_tool_calls(resp_msg, tool_map, gh, msgs, dry_run=args.dry_run, yolo=args.yolo)
messages.extend(tool_results)
# Load Workflow
workflow_path = os.path.join(os.path.dirname(__file__), metadata.get("workflow_path", "workflow.json"))
with open(workflow_path, "r", encoding="utf-8") as f:
workflow_config = json.load(f)
if args.yolo and is_mvp_reached():
logger.info("MVP reached. Stopping YOLO loop.")
notify_all("AutoMetabuilder YOLO loop stopped: MVP reached.")
break
if args.once:
# If --once is set, we do one more pass to show the final result
logger.info(msgs.get("info_second_pass", "Performing second pass with tool results..."))
response = get_completion(client, model_name, messages, tools)
final_msg = response.choices[0].message
logger.info(final_msg.content if final_msg.content else msgs["info_tool_call_requested"])
notify_all(f"AutoMetabuilder task complete: {final_msg.content[:100]}...")
# For --once, we still handle tool calls if any in the second pass, but then stop.
if final_msg.tool_calls:
handle_tool_calls(final_msg, tool_map, gh, msgs, dry_run=args.dry_run, yolo=args.yolo)
break
else:
logger.warning(f"Reached maximum iterations ({max_iterations}). Stopping.")
notify_all(f"AutoMetabuilder stopped: Reached {max_iterations} iterations.")
# Initialize Context for Workflow Engine
workflow_context = {
"gh": gh,
"msgs": msgs,
"client": client,
"prompt": prompt,
"tools": tools,
"tool_map": tool_map,
"model_name": model_name,
"args": args
}
engine = WorkflowEngine(workflow_config, workflow_context)
engine.execute()
if __name__ == "__main__":

View File

@@ -0,0 +1,13 @@
{
"tools_path": "tools.json",
"workflow_path": "workflow.json",
"messages": {
"en": "messages_en.json",
"es": "messages_es.json",
"fr": "messages_fr.json",
"nl": "messages_nl.json",
"pirate": "messages_pirate.json"
},
"project_name": "AutoMetabuilder",
"version": "1.0.0"
}

View File

@@ -70,7 +70,21 @@ def get_env_vars():
env_vars[key] = value
return env_vars
def get_metadata():
    """Read the package-level metadata.json, or return {} when it is missing."""
    metadata_path = os.path.join(
        os.path.dirname(os.path.dirname(__file__)), "metadata.json"
    )
    if os.path.exists(metadata_path):
        with open(metadata_path, "r", encoding="utf-8") as f:
            return json.load(f)
    return {}
def get_translations():
metadata = get_metadata()
messages_map = metadata.get("messages", {})
if messages_map:
return messages_map
# Fallback to scanning if metadata is empty
pkg_dir = os.path.dirname(os.path.dirname(__file__))
files = [f for f in os.listdir(pkg_dir) if f.startswith("messages_") and f.endswith(".json")]
translations = {}
@@ -86,12 +100,23 @@ def get_prompt_content():
with open(prompt_path, "r", encoding="utf-8") as f:
return f.read()
def get_workflow_content():
    """Return the raw text of the configured workflow file, or "" if absent.

    The file name comes from metadata.json's "workflow_path" key, defaulting
    to workflow.json in the package directory.
    """
    pkg_dir = os.path.dirname(os.path.dirname(__file__))
    workflow_file = get_metadata().get("workflow_path", "workflow.json")
    workflow_path = os.path.join(pkg_dir, workflow_file)
    if not os.path.exists(workflow_path):
        return ""
    with open(workflow_path, "r", encoding="utf-8") as f:
        return f.read()
@app.get("/", response_class=HTMLResponse)
async def read_item(request: Request, username: str = Depends(get_current_user)):
logs = get_recent_logs()
env_vars = get_env_vars()
translations = get_translations()
prompt_content = get_prompt_content()
workflow_content = get_workflow_content()
is_running = bot_process is not None
mvp_status = is_mvp_reached()
return templates.TemplateResponse("index.html", {
@@ -100,6 +125,7 @@ async def read_item(request: Request, username: str = Depends(get_current_user))
"env_vars": env_vars,
"translations": translations,
"prompt_content": prompt_content,
"workflow_content": workflow_content,
"is_running": is_running,
"mvp_reached": mvp_status,
"username": username
@@ -119,6 +145,16 @@ async def update_prompt(content: str = Form(...), username: str = Depends(get_cu
f.write(content)
return RedirectResponse(url="/", status_code=303)
@app.post("/workflow")
async def update_workflow(content: str = Form(...), username: str = Depends(get_current_user)):
    """Overwrite the configured workflow file with the submitted content."""
    pkg_dir = os.path.dirname(os.path.dirname(__file__))
    target = get_metadata().get("workflow_path", "workflow.json")
    with open(os.path.join(pkg_dir, target), "w", encoding="utf-8") as f:
        f.write(content)
    return RedirectResponse(url="/", status_code=303)
@app.post("/settings")
async def update_settings(request: Request, username: str = Depends(get_current_user)):
form_data = await request.form()
@@ -150,14 +186,27 @@ async def get_logs(username: str = Depends(get_current_user)):
@app.post("/translations")
async def create_translation(lang: str = Form(...), username: str = Depends(get_current_user)):
    """Create a new translation file seeded from English and register it.

    Copies the English message catalog to ``messages_<lang>.json`` (unless
    that file already exists) and records the mapping in metadata.json.
    """
    # SECURITY FIX: `lang` is an untrusted form field interpolated into a
    # filename — reject anything that could escape the package directory
    # (e.g. "../" path traversal).
    if not lang or not all(c.isalnum() or c in "-_" for c in lang):
        raise ValueError(f"Invalid language code: {lang!r}")
    pkg_dir = os.path.dirname(os.path.dirname(__file__))
    metadata = get_metadata()
    messages_map = metadata.get("messages", {})
    en_file = messages_map.get("en", "messages_en.json")
    en_path = os.path.join(pkg_dir, en_file)
    new_file = f"messages_{lang}.json"
    new_path = os.path.join(pkg_dir, new_file)
    if not os.path.exists(new_path):
        with open(en_path, "r", encoding="utf-8") as f:
            content = json.load(f)
        with open(new_path, "w", encoding="utf-8") as f:
            json.dump(content, f, indent=2)
    # Update metadata.json so the new language is discoverable.  Running this
    # unconditionally is idempotent and also repairs metadata when the file
    # pre-exists but was never registered.
    messages_map[lang] = new_file
    metadata["messages"] = messages_map
    metadata_path = os.path.join(pkg_dir, "metadata.json")
    with open(metadata_path, "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2)
    return RedirectResponse(url="/", status_code=303)

View File

@@ -92,6 +92,16 @@
</form>
</div>
</div>
<div class="row mt-4 mb-5">
<div class="col-12">
<h2>Workflow (workflow.json)</h2>
<form action="/workflow" method="post">
<textarea name="content" class="form-control" rows="15">{{ workflow_content }}</textarea>
<button type="submit" class="btn btn-success mt-2">Save Workflow</button>
</form>
</div>
</div>
<script>
async function refreshLogs() {
try {

View File

@@ -0,0 +1,39 @@
[
{
"name": "initialize_sdlc",
"steps": [
{
"type": "load_context",
"output_key": "sdlc_context"
},
{
"type": "prepare_messages",
"input_context": "sdlc_context",
"output_key": "messages"
}
]
},
{
"name": "main_loop",
"type": "loop",
"max_iterations": 10,
"steps": [
{
"type": "llm_gen",
"input_messages": "messages",
"output_key": "llm_response"
},
{
"type": "process_response",
"input_response": "llm_response",
"output_key": "tool_results",
"stop_if_no_tools": true
},
{
"type": "update_messages",
"input_results": "tool_results",
"target_messages": "messages"
}
]
}
]

24
tests/test_metadata.py Normal file
View File

@@ -0,0 +1,24 @@
import unittest
import os
import json

from autometabuilder import load_messages


class TestMetadata(unittest.TestCase):
    """Sanity checks for metadata.json and metadata-driven message loading."""

    def test_metadata_exists(self):
        """metadata.json exists and declares the expected top-level keys."""
        metadata_path = os.path.join("src", "autometabuilder", "metadata.json")
        self.assertTrue(os.path.exists(metadata_path))
        # FIX: read with an explicit encoding to avoid depending on the
        # platform default (original used open(..., "r") with no encoding).
        with open(metadata_path, "r", encoding="utf-8") as f:
            metadata = json.load(f)
        self.assertIn("tools_path", metadata)
        self.assertIn("workflow_path", metadata)
        self.assertIn("messages", metadata)

    def test_load_messages_with_metadata(self):
        """load_messages() returns the default (en) catalog as a dict."""
        messages = load_messages()
        self.assertIsInstance(messages, dict)
        self.assertIn("sdlc_context_label", messages)


if __name__ == '__main__':
    unittest.main()