diff --git a/src/autometabuilder/__init__.py b/src/autometabuilder/__init__.py index 9774f66..d0d8fd0 100644 --- a/src/autometabuilder/__init__.py +++ b/src/autometabuilder/__init__.py @@ -6,15 +6,25 @@ import json def load_messages(): - """Load messages based on APP_LANG environment variable.""" + """Load messages based on APP_LANG environment variable and metadata.""" + metadata_path = os.path.join(os.path.dirname(__file__), "metadata.json") + if os.path.exists(metadata_path): + with open(metadata_path, "r", encoding="utf-8") as f: + metadata = json.load(f) + messages_map = metadata.get("messages", {}) + else: + messages_map = {} + lang = os.environ.get("APP_LANG", "en") - messages_path = os.path.join( - os.path.dirname(__file__), f"messages_{lang}.json" - ) + + # Get filename from metadata or fallback to default pattern + messages_file = messages_map.get(lang, f"messages_{lang}.json") + messages_path = os.path.join(os.path.dirname(__file__), messages_file) + if not os.path.exists(messages_path): - # Fallback to English if the requested language file doesn't exist - messages_path = os.path.join( - os.path.dirname(__file__), "messages_en.json" - ) + # Fallback to English from metadata or default messages_en.json + en_file = messages_map.get("en", "messages_en.json") + messages_path = os.path.join(os.path.dirname(__file__), en_file) + with open(messages_path, "r", encoding="utf-8") as f: return json.load(f) diff --git a/src/autometabuilder/main.py b/src/autometabuilder/main.py index c89ab66..8bc3e80 100644 --- a/src/autometabuilder/main.py +++ b/src/autometabuilder/main.py @@ -270,6 +270,139 @@ def handle_tool_calls(resp_msg, tool_map: dict, gh: GitHubIntegration, msgs: dic return tool_results +class WorkflowEngine: + """Interpret and execute a JSON-defined workflow.""" + + def __init__(self, workflow_config, context): + self.workflow_config = workflow_config + self.context = context + self.state = {} + + def execute(self): + """Execute the workflow sequence.""" + for phase in self.workflow_config: + if phase.get("type") == "loop": + self._execute_loop(phase) + else: + self._execute_phase(phase) + + def _execute_phase(self, phase): + """Execute a phase which contains steps.""" + logger.info(f"--- Executing phase: {phase.get('name', 'unnamed')} ---") + for step in phase.get("steps", []): + self._execute_step(step) + + def _execute_loop(self, phase): + """Execute a loop of steps.""" + max_iterations = phase.get("max_iterations", 10) + if self.context["args"].once: + max_iterations = 2 # At most 2 passes for --once + + iteration = 0 + while iteration < max_iterations: + iteration += 1 + logger.info(f"--- {phase.get('name', 'loop')} Iteration {iteration} ---") + should_stop = False + for step in phase.get("steps", []): + result = self._execute_step(step) + if step.get("stop_if_no_tools") and result is True: + should_stop = True + break + + if should_stop or (self.context["args"].once and iteration >= 1 and not self.state.get("llm_response").tool_calls): + break + + if self.context["args"].once and iteration == 2: + break + + def _execute_step(self, step): + """Execute a single workflow step.""" + step_type = step.get("type") + output_key = step.get("output_key") + + try: + if step_type == "load_context": + sdlc_context = get_sdlc_context(self.context["gh"], self.context["msgs"]) + if output_key: + self.state[output_key] = sdlc_context + return sdlc_context + + elif step_type == "prepare_messages": + prompt = self.context["prompt"] + msgs = self.context["msgs"] + sdlc_context_val = self.state.get(step.get("input_context")) + messages = list(prompt["messages"]) # Copy to avoid mutating original prompt + if sdlc_context_val: + messages.append( + { + "role": "system", + "content": f"{msgs['sdlc_context_label']}{sdlc_context_val}", + } + ) + messages.append({"role": "user", "content": msgs["user_next_step"]}) + if output_key: + self.state[output_key] = messages + return messages + + elif step_type == "llm_gen": + messages = self.state.get(step.get("input_messages")) + response = get_completion( + self.context["client"], + self.context["model_name"], + messages, + self.context["tools"] + ) + resp_msg = response.choices[0].message + logger.info( + resp_msg.content + if resp_msg.content + else self.context["msgs"]["info_tool_call_requested"] + ) + messages.append(resp_msg) # Append AI response to messages + if output_key: + self.state[output_key] = resp_msg + return resp_msg + + elif step_type == "process_response": + resp_msg = self.state.get(step.get("input_response")) + tool_results = handle_tool_calls( + resp_msg, + self.context["tool_map"], + self.context["gh"], + self.context["msgs"], + dry_run=self.context["args"].dry_run, + yolo=self.context["args"].yolo + ) + if output_key: + self.state[output_key] = tool_results + + if step.get("stop_if_no_tools") and not resp_msg.tool_calls: + notify_all(f"AutoMetabuilder task complete: {resp_msg.content[:100]}...") + return True # Signal to stop loop + return False + + elif step_type == "update_messages": + tool_results = self.state.get(step.get("input_results")) + target_messages = self.state.get(step.get("target_messages")) + if tool_results and target_messages is not None: + target_messages.extend(tool_results) + + # Check for MVP if yolo + if self.context["args"].yolo and is_mvp_reached(): + logger.info("MVP reached. Stopping YOLO loop.") + notify_all("AutoMetabuilder YOLO loop stopped: MVP reached.") + return True + + else: + logger.error(f"Unknown step type: {step_type}") + + except Exception as e: + logger.error(f"Error executing step {step_type}: {e}") + raise + + return None + + def main(): """Main function to run AutoMetabuilder.""" parser = argparse.ArgumentParser(description="AutoMetabuilder: AI-driven SDLC assistant.") @@ -306,8 +439,13 @@ def main(): prompt = load_prompt_yaml() + # Load Metadata + metadata_path = os.path.join(os.path.dirname(__file__), "metadata.json") + with open(metadata_path, "r", encoding="utf-8") as f: + metadata = json.load(f) + # Load tools for SDLC operations from JSON file - tools_path = os.path.join(os.path.dirname(__file__), "tools.json") + tools_path = os.path.join(os.path.dirname(__file__), metadata.get("tools_path", "tools.json")) with open(tools_path, "r", encoding="utf-8") as f: tools = json.load(f) @@ -329,71 +467,27 @@ def main(): # Load plugins and update tool_map and tools list load_plugins(tool_map, tools) - # Add SDLC Context if available - sdlc_context_val = get_sdlc_context(gh, msgs) - - messages = prompt["messages"] - if sdlc_context_val: - messages.append( - { - "role": "system", - "content": f"{msgs['sdlc_context_label']}{sdlc_context_val}", - } - ) - - # Add runtime request - messages.append({"role": "user", "content": msgs["user_next_step"]}) - model_name = os.environ.get("LLM_MODEL", prompt.get("model", DEFAULT_MODEL)) - # Multi-iteration loop - iteration = 0 - max_iterations = 10 - - while iteration < max_iterations: - iteration += 1 - logger.info(f"--- Iteration {iteration} ---") - - response = get_completion(client, model_name, messages, tools) - resp_msg = response.choices[0].message - - logger.info( - resp_msg.content - if resp_msg.content - else msgs["info_tool_call_requested"] - ) - - messages.append(resp_msg) - - if not resp_msg.tool_calls: - # If no more tools requested, we are done - notify_all(f"AutoMetabuilder task complete: {resp_msg.content[:100]}...") - break - - # Handle tool calls - tool_results = handle_tool_calls(resp_msg, tool_map, gh, msgs, dry_run=args.dry_run, yolo=args.yolo) - messages.extend(tool_results) + # Load Workflow + workflow_path = os.path.join(os.path.dirname(__file__), metadata.get("workflow_path", "workflow.json")) + with open(workflow_path, "r", encoding="utf-8") as f: + workflow_config = json.load(f) - if args.yolo and is_mvp_reached(): - logger.info("MVP reached. Stopping YOLO loop.") - notify_all("AutoMetabuilder YOLO loop stopped: MVP reached.") - break - - if args.once: - # If --once is set, we do one more pass to show the final result - logger.info(msgs.get("info_second_pass", "Performing second pass with tool results...")) - response = get_completion(client, model_name, messages, tools) - final_msg = response.choices[0].message - logger.info(final_msg.content if final_msg.content else msgs["info_tool_call_requested"]) - notify_all(f"AutoMetabuilder task complete: {final_msg.content[:100]}...") - - # For --once, we still handle tool calls if any in the second pass, but then stop. - if final_msg.tool_calls: - handle_tool_calls(final_msg, tool_map, gh, msgs, dry_run=args.dry_run, yolo=args.yolo) - break - else: - logger.warning(f"Reached maximum iterations ({max_iterations}). Stopping.") - notify_all(f"AutoMetabuilder stopped: Reached {max_iterations} iterations.") + # Initialize Context for Workflow Engine + workflow_context = { + "gh": gh, + "msgs": msgs, + "client": client, + "prompt": prompt, + "tools": tools, + "tool_map": tool_map, + "model_name": model_name, + "args": args + } + + engine = WorkflowEngine(workflow_config, workflow_context) + engine.execute() if __name__ == "__main__": diff --git a/src/autometabuilder/metadata.json b/src/autometabuilder/metadata.json new file mode 100644 index 0000000..c1f4741 --- /dev/null +++ b/src/autometabuilder/metadata.json @@ -0,0 +1,13 @@ +{ + "tools_path": "tools.json", + "workflow_path": "workflow.json", + "messages": { + "en": "messages_en.json", + "es": "messages_es.json", + "fr": "messages_fr.json", + "nl": "messages_nl.json", + "pirate": "messages_pirate.json" + }, + "project_name": "AutoMetabuilder", + "version": "1.0.0" +} diff --git a/src/autometabuilder/web/server.py b/src/autometabuilder/web/server.py index a7de809..22dcce8 100644 --- a/src/autometabuilder/web/server.py +++ b/src/autometabuilder/web/server.py @@ -70,7 +70,21 @@ def get_env_vars(): env_vars[key] = value return env_vars +def get_metadata(): + pkg_dir = os.path.dirname(os.path.dirname(__file__)) + metadata_path = os.path.join(pkg_dir, "metadata.json") + if not os.path.exists(metadata_path): + return {} + with open(metadata_path, "r", encoding="utf-8") as f: + return json.load(f) + def get_translations(): + metadata = get_metadata() + messages_map = metadata.get("messages", {}) + if messages_map: + return messages_map + + # Fallback to scanning if metadata is empty pkg_dir = os.path.dirname(os.path.dirname(__file__)) files = [f for f in os.listdir(pkg_dir) if f.startswith("messages_") and f.endswith(".json")] translations = {} @@ -86,12 +100,23 @@ def get_prompt_content(): with open(prompt_path, "r", encoding="utf-8") as f: return f.read() +def get_workflow_content(): + pkg_dir = os.path.dirname(os.path.dirname(__file__)) + metadata = get_metadata() + workflow_file = metadata.get("workflow_path", "workflow.json") + workflow_path = os.path.join(pkg_dir, workflow_file) + if not os.path.exists(workflow_path): + return "" + with open(workflow_path, "r", encoding="utf-8") as f: + return f.read() + @app.get("/", response_class=HTMLResponse) async def read_item(request: Request, username: str = Depends(get_current_user)): logs = get_recent_logs() env_vars = get_env_vars() translations = get_translations() prompt_content = get_prompt_content() + workflow_content = get_workflow_content() is_running = bot_process is not None mvp_status = is_mvp_reached() return templates.TemplateResponse("index.html", { @@ -100,6 +125,7 @@ async def read_item(request: Request, username: str = Depends(get_current_user)) "env_vars": env_vars, "translations": translations, "prompt_content": prompt_content, + "workflow_content": workflow_content, "is_running": is_running, "mvp_reached": mvp_status, "username": username @@ -119,6 +145,16 @@ async def update_prompt(content: str = Form(...), username: str = Depends(get_cu f.write(content) return RedirectResponse(url="/", status_code=303) +@app.post("/workflow") +async def update_workflow(content: str = Form(...), username: str = Depends(get_current_user)): + pkg_dir = os.path.dirname(os.path.dirname(__file__)) + metadata = get_metadata() + workflow_file = metadata.get("workflow_path", "workflow.json") + workflow_path = os.path.join(pkg_dir, workflow_file) + with open(workflow_path, "w", encoding="utf-8") as f: + f.write(content) + return RedirectResponse(url="/", status_code=303) + @app.post("/settings") async def update_settings(request: Request, username: str = Depends(get_current_user)): form_data = await request.form() @@ -150,14 +186,27 @@ async def get_logs(username: str = Depends(get_current_user)): @app.post("/translations") async def create_translation(lang: str = Form(...), username: str = Depends(get_current_user)): pkg_dir = os.path.dirname(os.path.dirname(__file__)) - en_path = os.path.join(pkg_dir, "messages_en.json") - new_path = os.path.join(pkg_dir, f"messages_{lang}.json") + metadata = get_metadata() + messages_map = metadata.get("messages", {}) + + en_file = messages_map.get("en", "messages_en.json") + en_path = os.path.join(pkg_dir, en_file) + + new_file = f"messages_{lang}.json" + new_path = os.path.join(pkg_dir, new_file) if not os.path.exists(new_path): with open(en_path, "r", encoding="utf-8") as f: content = json.load(f) with open(new_path, "w", encoding="utf-8") as f: json.dump(content, f, indent=2) + + # Update metadata.json + messages_map[lang] = new_file + metadata["messages"] = messages_map + metadata_path = os.path.join(pkg_dir, "metadata.json") + with open(metadata_path, "w", encoding="utf-8") as f: + json.dump(metadata, f, indent=2) return RedirectResponse(url="/", status_code=303) diff --git a/src/autometabuilder/web/templates/index.html b/src/autometabuilder/web/templates/index.html index ec04ed9..82decd0 100644 --- a/src/autometabuilder/web/templates/index.html +++ b/src/autometabuilder/web/templates/index.html @@ -92,6 +92,16 @@ + +
+
+

Workflow (workflow.json)

+
+ + +
+
+