Move implementations from data module into workflow plugins

Co-authored-by: johndoe6345789 <224850594+johndoe6345789@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-01-10 20:58:33 +00:00
parent d19ec5fa16
commit 2d1b80cb16
20 changed files with 521 additions and 51 deletions

View File

@@ -1,5 +1,4 @@
"""Workflow plugin: build prompt YAML."""
from ....data.prompt import build_prompt_yaml
def run(_runtime, inputs):
    """Build a prompt YAML document from system/user content and a model.

    Args:
        inputs: dict with optional keys "system_content", "user_content",
            and "model" (falls back to "openai/gpt-4o" when falsy).

    Returns:
        dict: {"result": <YAML string>} with the two messages rendered as
        folded (">-") block scalars.
    """
    system_content = inputs.get("system_content")
    user_content = inputs.get("user_content")
    model = inputs.get("model")

    def indent_block(text):
        # Re-indent continuation lines so they align under the ">-" scalar.
        if not text:
            return ""
        return "\n      ".join(line.rstrip() for line in text.splitlines())

    model_value = model or "openai/gpt-4o"
    system_block = indent_block(system_content)
    user_block = indent_block(user_content)
    yaml_content = f"""messages:
  - role: system
    content: >-
      {system_block}
  - role: user
    content: >-
      {user_block}
model: {model_value}
"""
    return {"result": yaml_content}

View File

@@ -1,5 +1,8 @@
"""Workflow plugin: create translation."""
from ....data.translations import create_translation
import json
import shutil
from pathlib import Path
from autometabuilder.loaders.metadata_loader import load_metadata
def run(_runtime, inputs):
    """Create a new translation by copying the English base catalog.

    Args:
        inputs: dict with required key "lang" (language code).

    Returns:
        dict: {"result": True} on success, {"result": False} when the
        translation already exists or the English base is missing;
        {"error": ...} when "lang" is absent.
    """
    lang = inputs.get("lang")
    if not lang:
        return {"error": "lang is required"}
    package_root = Path(__file__).resolve().parents[5]  # backend/autometabuilder

    def read_json(path_obj):
        # Best-effort JSON read: missing or malformed files yield {}.
        if not path_obj.exists():
            return {}
        try:
            return json.loads(path_obj.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            return {}

    metadata_base = read_json(package_root / "metadata.json")
    messages_map = metadata_base.get("messages", {})
    # Refuse to overwrite an existing translation.
    if lang in messages_map:
        return {"result": False}

    def resolve_target(language):
        # Prefer the metadata mapping, then a messages/<lang> directory,
        # finally the flat messages_<lang>.json convention.
        if language in messages_map:
            return messages_map[language]
        if (package_root / "messages" / language).exists():
            return f"messages/{language}"
        return f"messages_{language}.json"

    base = resolve_target("en")
    base_file = package_root / base
    if not base_file.exists():
        return {"result": False}
    # Copy the English base (directory or single file) to the new language,
    # mirroring its layout.
    if base_file.is_dir():
        target_name = f"messages/{lang}"
        shutil.copytree(base_file, package_root / target_name)
    else:
        target_name = f"messages_{lang}.json"
        shutil.copy(base_file, package_root / target_name)
    # Register the new translation in metadata.json.
    messages_map[lang] = target_name
    metadata_base["messages"] = messages_map
    (package_root / "metadata.json").write_text(
        json.dumps(metadata_base, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    return {"result": True}

View File

@@ -1,5 +1,8 @@
"""Workflow plugin: delete translation."""
from ....data.translations import delete_translation
import json
import shutil
from pathlib import Path
from autometabuilder.loaders.metadata_loader import load_metadata
def run(_runtime, inputs):
    """Delete a translation's files and its metadata entry.

    Args:
        inputs: dict with required key "lang" (language code).

    Returns:
        dict: {"result": True} on success, {"result": False} when the
        language is "en" or unknown; {"error": ...} when "lang" is absent.
    """
    lang = inputs.get("lang")
    if not lang:
        return {"error": "lang is required"}
    # English is the base catalog and must never be deleted.
    if lang == "en":
        return {"result": False}
    package_root = Path(__file__).resolve().parents[5]  # backend/autometabuilder

    def read_json(path_obj):
        # Best-effort JSON read: missing or malformed files yield {}.
        if not path_obj.exists():
            return {}
        try:
            return json.loads(path_obj.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            return {}

    metadata_base = read_json(package_root / "metadata.json")
    messages_map = metadata_base.get("messages", {})
    # Only translations registered in metadata can be deleted.
    if lang not in messages_map:
        return {"result": False}
    # Remove the catalog, whether it is a directory of fragments or a file.
    target = package_root / messages_map[lang]
    if target.exists():
        if target.is_dir():
            shutil.rmtree(target)
        else:
            target.unlink()
    # Drop the metadata entry and persist.
    del messages_map[lang]
    metadata_base["messages"] = messages_map
    (package_root / "metadata.json").write_text(
        json.dumps(metadata_base, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    return {"result": True}

View File

@@ -1,8 +1,22 @@
"""Workflow plugin: get environment variables."""
from ....data.env import get_env_vars
from pathlib import Path
def run(_runtime, _inputs):
    """Read environment variables from a .env file in the working directory.

    Blank lines, comment lines ("#"), and lines without "=" are skipped;
    values are stripped of surrounding single or double quotes.

    Returns:
        dict: {"result": {KEY: value, ...}} ({} when .env does not exist).
    """
    env_path = Path(".env")
    if not env_path.exists():
        return {"result": {}}
    result = {}
    for raw in env_path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if "=" not in line:
            continue
        key, value = line.split("=", 1)
        value = value.strip().strip("'\"")
        result[key.strip()] = value
    return {"result": result}

View File

@@ -1,8 +1,22 @@
"""Workflow plugin: get navigation items."""
from ....data.navigation import get_navigation_items
import json
from pathlib import Path
def run(_runtime, _inputs):
    """Get navigation items from web/navigation_items.json.

    Returns:
        dict: {"result": [<item>, ...]} — an empty list when the file is
        missing, malformed, or does not contain a JSON list.
    """
    # Walk up from this plugin file to backend/autometabuilder.
    package_root = Path(__file__).resolve().parents[5]
    nav_path = package_root / "web" / "navigation_items.json"
    if not nav_path.exists():
        return {"result": []}
    try:
        nav = json.loads(nav_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return {"result": []}
    # Anything other than a list (e.g. an object) is treated as invalid.
    if isinstance(nav, list):
        return {"result": nav}
    return {"result": []}

View File

@@ -1,8 +1,12 @@
"""Workflow plugin: get prompt content."""
from ....data.prompt import get_prompt_content
import os
from pathlib import Path
def run(_runtime, _inputs):
    """Get the prompt file's text.

    The path comes from the PROMPT_PATH environment variable, defaulting
    to "prompt.yml" in the working directory.

    Returns:
        dict: {"result": <file text>} ("" when the file does not exist).
    """
    path = Path(os.environ.get("PROMPT_PATH", "prompt.yml"))
    if path.is_file():
        return {"result": path.read_text(encoding="utf-8")}
    return {"result": ""}

View File

@@ -1,9 +1,20 @@
"""Workflow plugin: get recent logs."""
from ....data.logs import get_recent_logs
from pathlib import Path
def run(_runtime, inputs):
    """Get the last N lines of the repository-level autometabuilder.log.

    Args:
        inputs: dict with optional "lines" (int, default 50).

    Returns:
        dict: {"result": <joined log lines>} ("" when the log is absent
        or "lines" is not positive).
    """
    lines = inputs.get("lines", 50)
    # Walk up from this plugin file to backend/autometabuilder, then two
    # more levels to the repository root where the log file lives.
    package_root = Path(__file__).resolve().parents[5]
    repo_root = package_root.parent.parent
    log_file = repo_root / "autometabuilder.log"
    if not log_file.exists():
        return {"result": ""}
    with log_file.open("r", encoding="utf-8") as handle:
        content = handle.readlines()
    # Guard lines <= 0: content[-0:] would otherwise return the whole file.
    tail = content[-lines:] if lines > 0 else []
    return {"result": "".join(tail)}

View File

@@ -1,5 +1,7 @@
"""Workflow plugin: get UI messages."""
from ....data.translations import get_ui_messages
import json
from pathlib import Path
from autometabuilder.loaders.metadata_loader import load_metadata
def run(_runtime, inputs):
    """Get merged UI messages for a language.

    The English catalog forms the base; the requested language's catalog
    overrides it key by key.

    Args:
        inputs: dict with optional "lang" (default "en").

    Returns:
        dict: {"result": {<key>: <text>, ..., "__lang": lang}} where
        "__lang" records the requested language.
    """
    lang = inputs.get("lang", "en")
    package_root = Path(__file__).resolve().parents[5]  # backend/autometabuilder

    def read_json(path_obj):
        # Best-effort JSON read: missing or malformed files yield {}.
        if not path_obj.exists():
            return {}
        try:
            return json.loads(path_obj.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            return {}

    def load_messages(path_obj):
        # A directory holds several JSON fragments; merge them in sorted
        # filename order so later files win deterministically.
        if path_obj.is_dir():
            merged = {}
            for file_path in sorted(path_obj.glob("*.json")):
                merged.update(read_json(file_path))
            return merged
        return read_json(path_obj)

    metadata_base = read_json(package_root / "metadata.json")
    messages_map = metadata_base.get("messages", {})

    def resolve_target(language):
        # Prefer the metadata mapping, then a messages/<lang> directory,
        # finally the flat messages_<lang>.json convention.
        if language in messages_map:
            return messages_map[language]
        if (package_root / "messages" / language).exists():
            return f"messages/{language}"
        return f"messages_{language}.json"

    # Load English as the base and layer the localized catalog on top.
    base = load_messages(package_root / resolve_target("en"))
    localized = load_messages(package_root / resolve_target(lang))
    merged = dict(base)
    merged.update(localized)
    merged["__lang"] = lang
    return {"result": merged}

View File

@@ -1,8 +1,17 @@
"""Workflow plugin: get workflow content."""
from ....data.workflow import get_workflow_content
from pathlib import Path
from autometabuilder.loaders.metadata_loader import load_metadata
def run(_runtime, _inputs):
    """Get workflow content from the configured workflow file.

    The file name comes from metadata's "workflow_path" (default
    "workflow.json"), resolved against the package root.

    Returns:
        dict: {"result": <file text>} ("" when the file does not exist).
    """
    package_root = Path(__file__).resolve().parents[5]  # backend/autometabuilder
    metadata = load_metadata()
    workflow_name = metadata.get("workflow_path", "workflow.json")
    workflow_path = package_root / workflow_name
    if workflow_path.exists():
        return {"result": workflow_path.read_text(encoding="utf-8")}
    return {"result": ""}

View File

@@ -1,8 +1,32 @@
"""Workflow plugin: list translations."""
from ....data.translations import list_translations
import json
from pathlib import Path
from autometabuilder.loaders.metadata_loader import load_metadata
def run(_runtime, _inputs):
    """List all available translations as {language: relative path}.

    Prefers the "messages" map in metadata.json; falls back to scanning
    the package for messages_<lang>.json files and messages/<lang>
    directories when the map is empty or unreadable.

    Returns:
        dict: {"result": {<lang>: <path relative to package root>, ...}}
    """
    package_root = Path(__file__).resolve().parents[5]  # backend/autometabuilder
    # Tolerate a missing or malformed metadata.json instead of raising,
    # matching the other translation plugins.
    metadata_path = package_root / "metadata.json"
    messages_map = {}
    if metadata_path.exists():
        try:
            metadata_base = json.loads(metadata_path.read_text(encoding="utf-8"))
            messages_map = metadata_base.get("messages", {})
        except json.JSONDecodeError:
            messages_map = {}
    if messages_map:
        return {"result": messages_map}
    # Fallback: scan for flat catalog files and per-language directories.
    fallback = {}
    for candidate in package_root.glob("messages_*.json"):
        name = candidate.name
        language = name.removeprefix("messages_").removesuffix(".json")
        fallback[language] = name
    messages_dir = package_root / "messages"
    if messages_dir.exists():
        for candidate in messages_dir.iterdir():
            if candidate.is_dir():
                fallback[candidate.name] = f"messages/{candidate.name}"
    return {"result": fallback}

View File

@@ -1,6 +1,6 @@
"""Workflow plugin: load messages."""
import json
from pathlib import Path
from ....data.messages_io import load_messages
def run(_runtime, inputs):
    """Load messages from a JSON file or a directory of JSON fragments.

    Args:
        inputs: dict with required key "path".

    Returns:
        dict: {"result": <merged mapping>}; {"error": ...} when "path"
        is absent. Missing or malformed files contribute {}.
    """
    path = inputs.get("path")
    if not path:
        return {"error": "path is required"}
    path_obj = Path(path)

    def read_json(p):
        # Best-effort JSON read: missing or malformed files yield {}.
        if not p.exists():
            return {}
        try:
            return json.loads(p.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            return {}

    # A directory is merged file by file in sorted name order so later
    # files win deterministically; a plain file is read directly.
    if path_obj.is_dir():
        merged = {}
        for file_path in sorted(path_obj.glob("*.json")):
            merged.update(read_json(file_path))
        return {"result": merged}
    return {"result": read_json(path_obj)}

View File

@@ -1,9 +1,47 @@
"""Workflow plugin: load translation."""
from ....data.translations import load_translation
import json
from pathlib import Path
from autometabuilder.loaders.metadata_loader import load_metadata
def run(_runtime, inputs):
    """Load the message catalog for one language.

    Args:
        inputs: dict with optional "lang" (default "en").

    Returns:
        dict: {"result": <mapping of message keys to text>} — {} when no
        catalog exists for the language.
    """
    lang = inputs.get("lang", "en")
    package_root = Path(__file__).resolve().parents[5]  # backend/autometabuilder

    def read_json(path_obj):
        # Best-effort JSON read: missing or malformed files yield {}.
        if not path_obj.exists():
            return {}
        try:
            return json.loads(path_obj.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            return {}

    def load_messages(path_obj):
        # A directory holds several JSON fragments; merge them in sorted
        # filename order so later files win deterministically.
        if path_obj.is_dir():
            merged = {}
            for file_path in sorted(path_obj.glob("*.json")):
                merged.update(read_json(file_path))
            return merged
        return read_json(path_obj)

    metadata_base = read_json(package_root / "metadata.json")
    messages_map = metadata_base.get("messages", {})
    # Resolve the catalog location: metadata mapping first, then a
    # messages/<lang> directory, finally the flat messages_<lang>.json.
    if lang in messages_map:
        target = messages_map[lang]
    elif (package_root / "messages" / lang).exists():
        target = f"messages/{lang}"
    else:
        target = f"messages_{lang}.json"
    return {"result": load_messages(package_root / target)}

View File

@@ -1,8 +1,80 @@
"""Workflow plugin: load workflow packages."""
from ....data.workflow import load_workflow_packages
import json
import logging
from pathlib import Path
from autometabuilder.loaders.metadata_loader import load_metadata
logger = logging.getLogger(__name__)
def run(_runtime, _inputs):
    """Load all workflow packages from the packages directory.

    Each package is a subdirectory containing a package.json with
    npm-style fields plus an optional "metadata" object, and a workflow
    JSON file named by its "main" field (default "workflow.json").
    Invalid packages are skipped with a warning rather than failing the
    whole load.

    Returns:
        dict: {"result": [<package dict with "workflow" attached>, ...]}
    """
    package_root = Path(__file__).resolve().parents[5]  # backend/autometabuilder
    metadata = load_metadata()
    packages_name = metadata.get("workflow_packages_path", "packages")
    packages_dir = package_root / packages_name
    if not packages_dir.exists():
        logger.warning("Packages directory not found: %s", packages_dir)
        return {"result": []}
    packages = []
    # Sorted iteration keeps the result order deterministic.
    for item in sorted(packages_dir.iterdir()):
        if not item.is_dir():
            continue
        # package.json must exist and parse to a JSON object.
        package_json = item / "package.json"
        if not package_json.exists():
            logger.warning("Package %s missing package.json", item.name)
            continue
        try:
            pkg_data = json.loads(package_json.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            logger.warning("Invalid package.json in %s", item.name)
            continue
        if not isinstance(pkg_data, dict):
            logger.warning("Invalid package.json in %s", item.name)
            continue
        # The workflow file must also exist and parse to a JSON object.
        workflow_file = pkg_data.get("main", "workflow.json")
        workflow_path = item / workflow_file
        if not workflow_path.exists():
            logger.warning("Workflow file %s not found in %s", workflow_file, item.name)
            continue
        try:
            workflow_data = json.loads(workflow_path.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            logger.warning("Invalid workflow in %s", item.name)
            continue
        if not isinstance(workflow_data, dict):
            logger.warning("Invalid workflow in %s", item.name)
            continue
        # Combine package metadata with the workflow payload; directory
        # name is the fallback for id/name/label.
        metadata_info = pkg_data.get("metadata", {})
        packages.append({
            "id": pkg_data.get("name", item.name),
            "name": pkg_data.get("name", item.name),
            "version": pkg_data.get("version", "1.0.0"),
            "description": pkg_data.get("description", ""),
            "author": pkg_data.get("author", ""),
            "license": pkg_data.get("license", ""),
            "keywords": pkg_data.get("keywords", []),
            "label": metadata_info.get("label", item.name),
            "tags": metadata_info.get("tags", []),
            "icon": metadata_info.get("icon", "workflow"),
            "category": metadata_info.get("category", "templates"),
            "workflow": workflow_data,
        })
    logger.debug("Loaded %d workflow packages", len(packages))
    return {"result": packages}

View File

@@ -1,9 +1,15 @@
"""Workflow plugin: persist environment variables."""
from ....data.env import persist_env_vars
from pathlib import Path
def run(_runtime, inputs):
    """Persist environment variable updates to the .env file.

    Args:
        inputs: dict with optional "updates" mapping {KEY: value}.

    Returns:
        dict: {"result": <status message>}
    """
    # python-dotenv is imported lazily so the plugin module can still be
    # loaded when the dependency is absent.
    from dotenv import set_key
    updates = inputs.get("updates", {})
    env_path = Path(".env")
    # set_key requires the file to exist before writing.
    env_path.touch(exist_ok=True)
    for key, value in updates.items():
        set_key(env_path, key, value)
    return {"result": "Environment variables persisted"}

View File

@@ -1,6 +1,6 @@
"""Workflow plugin: read JSON file."""
import json
from pathlib import Path
from ....data.json_utils import read_json
def run(_runtime, inputs):
    """Read a JSON file.

    Args:
        inputs: dict with required key "path".

    Returns:
        dict: {"result": <parsed JSON>} — {} when the file is missing or
        malformed; {"error": ...} when "path" is absent.
    """
    path = inputs.get("path")
    if not path:
        return {"error": "path is required"}
    path_obj = Path(path)
    if not path_obj.exists():
        return {"result": {}}
    try:
        json_data = json.loads(path_obj.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return {"result": {}}
    return {"result": json_data}

View File

@@ -1,9 +1,20 @@
"""Workflow plugin: summarize workflow packages."""
from ....data.workflow import summarize_workflow_packages
def run(_runtime, inputs):
    """Summarize workflow packages, dropping heavy fields.

    Args:
        inputs: dict with optional "packages" — a list of package dicts
            (each must carry an "id"), as built by the load plugin.

    Returns:
        dict: {"result": [<summary dict>, ...]} with id/name/label/
        description/tags/version/category only (no "workflow" payload).
    """
    packages = inputs.get("packages", [])
    summary = []
    for pkg in packages:
        summary.append({
            "id": pkg["id"],
            "name": pkg.get("name", pkg["id"]),
            # "or" also covers an explicit empty/None label.
            "label": pkg.get("label") or pkg["id"],
            "description": pkg.get("description", ""),
            "tags": pkg.get("tags", []),
            "version": pkg.get("version", "1.0.0"),
            "category": pkg.get("category", "templates"),
        })
    return {"result": summary}

View File

@@ -1,5 +1,7 @@
"""Workflow plugin: update translation."""
from ....data.translations import update_translation
import json
from pathlib import Path
from autometabuilder.loaders.metadata_loader import load_metadata
def run(_runtime, inputs):
    """Update an existing translation's message catalog.

    Args:
        inputs: dict with required "lang" and optional "payload" whose
            "content" maps message keys to translated strings.

    Returns:
        dict: {"result": True} on success, {"result": False} when the
        translation is not registered; {"error": ...} when "lang" is
        absent.
    """
    lang = inputs.get("lang")
    payload = inputs.get("payload", {})
    if not lang:
        return {"error": "lang is required"}
    package_root = Path(__file__).resolve().parents[5]  # backend/autometabuilder

    def read_json(path_obj):
        # Best-effort JSON read: missing or malformed files yield {}.
        if not path_obj.exists():
            return {}
        try:
            return json.loads(path_obj.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            return {}

    metadata_base = read_json(package_root / "metadata.json")
    messages_map = metadata_base.get("messages", {})
    # Only translations registered in metadata can be updated.
    if lang not in messages_map:
        return {"result": False}
    payload_content = payload.get("content", {})
    target_path = package_root / messages_map[lang]
    if target_path.is_dir():
        # Directory catalog: one JSON file per group, where a key's group
        # is its first two dotted parts (undotted keys go to "root").
        target_path.mkdir(parents=True, exist_ok=True)
        grouped = {}
        for key, value in payload_content.items():
            parts = key.split(".")
            group = ".".join(parts[:2]) if len(parts) >= 2 else "root"
            grouped.setdefault(group, {})[key] = value
        # Remove files for groups no longer present in the payload.
        existing = {path.stem for path in target_path.glob("*.json")}
        desired = set(grouped.keys())
        for name in existing - desired:
            (target_path / f"{name}.json").unlink()
        # Write each group's entries.
        for name, entries in grouped.items():
            (target_path / f"{name}.json").write_text(
                json.dumps(entries, indent=2, ensure_ascii=False) + "\n",
                encoding="utf-8",
            )
    else:
        # Flat catalog: the whole mapping goes into one JSON file.
        target_path.write_text(
            json.dumps(payload_content, indent=2, ensure_ascii=False) + "\n",
            encoding="utf-8",
        )
    return {"result": True}

View File

@@ -1,6 +1,6 @@
"""Workflow plugin: write messages directory."""
import json
from pathlib import Path
from ....data.messages_io import write_messages_dir
def run(_runtime, inputs):
    """Write a message mapping into a directory of grouped JSON files.

    Args:
        inputs: dict with required "base_dir" and optional
            "payload_content" mapping message keys to text. A key's
            group is its first two dotted parts (undotted keys go to
            "root"); each group becomes <group>.json in base_dir.

    Returns:
        dict: {"result": <status message>}; {"error": ...} when
        "base_dir" is absent. Stale group files are removed.
    """
    base_dir = inputs.get("base_dir")
    payload_content = inputs.get("payload_content", {})
    if not base_dir:
        return {"error": "base_dir is required"}
    base_dir_path = Path(base_dir)
    base_dir_path.mkdir(parents=True, exist_ok=True)
    # Group keys by prefix to decide which file each entry lands in.
    grouped = {}
    for key, value in payload_content.items():
        parts = key.split(".")
        group = ".".join(parts[:2]) if len(parts) >= 2 else "root"
        grouped.setdefault(group, {})[key] = value
    # Remove files for groups no longer present in the payload.
    existing = {path.stem for path in base_dir_path.glob("*.json")}
    desired = set(grouped.keys())
    for name in existing - desired:
        (base_dir_path / f"{name}.json").unlink()
    # Write each group's entries.
    for name, entries in grouped.items():
        target_path = base_dir_path / f"{name}.json"
        target_path.write_text(
            json.dumps(entries, indent=2, ensure_ascii=False) + "\n",
            encoding="utf-8",
        )
    return {"result": "Messages written successfully"}

View File

@@ -1,9 +1,11 @@
"""Workflow plugin: write prompt."""
from ....data.prompt import write_prompt
import os
from pathlib import Path
def run(_runtime, inputs):
    """Write prompt content to the prompt file.

    The path comes from the PROMPT_PATH environment variable, defaulting
    to "prompt.yml" in the working directory.

    Args:
        inputs: dict with optional "content" (None is written as "").

    Returns:
        dict: {"result": <status message>}
    """
    content = inputs.get("content", "")
    path = Path(os.environ.get("PROMPT_PATH", "prompt.yml"))
    # Normalise None to "" so write_text never fails on a null payload.
    path.write_text(content or "", encoding="utf-8")
    return {"result": "Prompt written successfully"}

View File

@@ -1,9 +1,15 @@
"""Workflow plugin: write workflow."""
from ....data.workflow import write_workflow
from pathlib import Path
from autometabuilder.loaders.metadata_loader import load_metadata
def run(_runtime, inputs):
    """Write workflow content to the configured workflow file.

    The file name comes from metadata's "workflow_path" (default
    "workflow.json"), resolved against the package root.

    Args:
        inputs: dict with optional "content" (None is written as "").

    Returns:
        dict: {"result": <status message>}
    """
    package_root = Path(__file__).resolve().parents[5]  # backend/autometabuilder
    content = inputs.get("content", "")
    metadata = load_metadata()
    workflow_name = metadata.get("workflow_path", "workflow.json")
    workflow_path = package_root / workflow_name
    # Normalise None to "" so write_text never fails on a null payload.
    workflow_path.write_text(content or "", encoding="utf-8")
    return {"result": "Workflow written successfully"}