Remove old data module files and create plugin wrapper layer

Co-authored-by: johndoe6345789 <224850594+johndoe6345789@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-01-10 20:59:57 +00:00
parent 2d1b80cb16
commit f2d6152e4e
12 changed files with 140 additions and 438 deletions

View File

@@ -1,30 +1,142 @@
"""Web module: Flask HTTP server and REST API backend.
"""Data access layer that delegates to workflow plugins.
This module provides the HTTP/REST API backend for the AutoMetabuilder frontend.
It serves the Next.js web UI by handling HTTP requests and managing web application state.
Key Components:
- server.py: Flask application setup and entry point
- routes/: HTTP endpoint handlers (6 blueprints, ~20 endpoints)
- data/: Data access functions shared with workflow plugins
- run_state.py: Bot execution state management
- workflow_graph.py: Workflow visualization for UI
Relationship with Workflow Plugins:
The web module and workflow plugins in workflow/plugins/web/ serve different purposes:
- Web module: External HTTP interface (frontend <-> backend)
- Workflow plugins: Internal workflow operations (workflow automation)
Both systems coexist and complement each other:
- Flask routes call data functions to serve HTTP responses
- Workflow plugins call the same data functions for workflow operations
- Data functions in web/data/ provide shared business logic
This module CANNOT be replaced by workflow plugins because:
1. Workflow plugins cannot run HTTP servers
2. Workflow plugins cannot handle web requests
3. Workflow plugins cannot serve as REST API backends
4. The frontend requires HTTP endpoints to function
See WEB_MODULE_ANALYSIS.md for detailed architecture documentation.
This module provides a simple API for data access by wrapping workflow plugins.
Routes and other code can import from here to access data functions.
"""
from autometabuilder.workflow.plugin_registry import PluginRegistry, load_plugin_map
from autometabuilder.workflow.runtime import WorkflowRuntime
import logging
# Create a minimal runtime for plugin execution
_logger = logging.getLogger(__name__)
class _SimpleLogger:
"""Minimal logger for plugin execution."""
def info(self, *args, **kwargs):
_logger.info(*args, **kwargs)
def debug(self, *args, **kwargs):
_logger.debug(*args, **kwargs)
def error(self, *args, **kwargs):
_logger.error(*args, **kwargs)
def _run_plugin(plugin_name, inputs=None):
"""Execute a workflow plugin and return its result."""
plugin_map = load_plugin_map()
registry = PluginRegistry(plugin_map)
runtime = WorkflowRuntime(
context={},
store={},
tool_runner=None,
logger=_SimpleLogger()
)
plugin = registry.get(plugin_name)
if not plugin:
raise RuntimeError(f"Plugin {plugin_name} not found")
result = plugin(runtime, inputs or {})
return result.get("result")
# Environment functions
def get_env_vars():
"""Get environment variables from .env file."""
return _run_plugin("web.get_env_vars")
def persist_env_vars(updates):
"""Persist environment variables to .env file."""
return _run_plugin("web.persist_env_vars", {"updates": updates})
# Log functions
def get_recent_logs(lines=50):
"""Get recent log entries."""
return _run_plugin("web.get_recent_logs", {"lines": lines})
# Navigation functions
def get_navigation_items():
"""Get navigation menu items."""
return _run_plugin("web.get_navigation_items")
# Prompt functions
def get_prompt_content():
"""Get prompt content from prompt file."""
return _run_plugin("web.get_prompt_content")
def write_prompt(content):
"""Write prompt content to file."""
return _run_plugin("web.write_prompt", {"content": content})
def build_prompt_yaml(system_content, user_content, model):
"""Build prompt YAML from components."""
return _run_plugin("web.build_prompt_yaml", {
"system_content": system_content,
"user_content": user_content,
"model": model
})
# Workflow functions
def get_workflow_content():
"""Get workflow content from workflow file."""
return _run_plugin("web.get_workflow_content")
def write_workflow(content):
"""Write workflow content to file."""
return _run_plugin("web.write_workflow", {"content": content})
def load_workflow_packages():
"""Load all workflow packages."""
return _run_plugin("web.load_workflow_packages")
def summarize_workflow_packages(packages):
"""Summarize workflow packages."""
return _run_plugin("web.summarize_workflow_packages", {"packages": packages})
# Translation functions
def list_translations():
"""List all available translations."""
return _run_plugin("web.list_translations")
def load_translation(lang):
"""Load translation for a specific language."""
return _run_plugin("web.load_translation", {"lang": lang})
def create_translation(lang):
"""Create a new translation."""
return _run_plugin("web.create_translation", {"lang": lang})
def update_translation(lang, payload):
"""Update an existing translation."""
return _run_plugin("web.update_translation", {"lang": lang, "payload": payload})
def delete_translation(lang):
"""Delete a translation."""
return _run_plugin("web.delete_translation", {"lang": lang})
def get_ui_messages(lang):
"""Get UI messages for a specific language with fallback."""
return _run_plugin("web.get_ui_messages", {"lang": lang})
# Metadata - still using loaders directly
from autometabuilder.loaders.metadata_loader import load_metadata

View File

@@ -1,29 +0,0 @@
from __future__ import annotations
from pathlib import Path
def get_env_vars() -> dict[str, str]:
env_path = Path(".env")
if not env_path.exists():
return {}
result: dict[str, str] = {}
for raw in env_path.read_text(encoding="utf-8").splitlines():
line = raw.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, value = line.split("=", 1)
value = value.strip().strip("'\"")
result[key.strip()] = value
return result
def persist_env_vars(updates: dict[str, str]) -> None:
from dotenv import set_key
env_path = Path(".env")
env_path.touch(exist_ok=True)
for key, value in updates.items():
set_key(env_path, key, value)

View File

@@ -1,14 +0,0 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
def read_json(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
return json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return {}

View File

@@ -1,11 +0,0 @@
from __future__ import annotations
from .paths import LOG_FILE
def get_recent_logs(lines: int = 50) -> str:
if not LOG_FILE.exists():
return ""
with LOG_FILE.open("r", encoding="utf-8") as handle:
content = handle.readlines()
return "".join(content[-lines:])

View File

@@ -1,46 +0,0 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from .json_utils import read_json
from .paths import PACKAGE_ROOT
def load_messages(path: Path) -> dict[str, Any]:
if path.is_dir():
merged: dict[str, Any] = {}
for file_path in sorted(path.glob("*.json")):
merged.update(read_json(file_path))
return merged
return read_json(path)
def group_messages(payload_content: dict[str, Any]) -> dict[str, dict[str, Any]]:
grouped: dict[str, dict[str, Any]] = {}
for key, value in payload_content.items():
parts = key.split(".")
group = ".".join(parts[:2]) if len(parts) >= 2 else "root"
grouped.setdefault(group, {})[key] = value
return grouped
def write_messages_dir(base_dir: Path, payload_content: dict[str, Any]) -> None:
base_dir.mkdir(parents=True, exist_ok=True)
grouped = group_messages(payload_content)
existing = {path.stem for path in base_dir.glob("*.json")}
desired = set(grouped.keys())
for name in existing - desired:
(base_dir / f"{name}.json").unlink()
for name, entries in grouped.items():
target_path = base_dir / f"{name}.json"
target_path.write_text(json.dumps(entries, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
def resolve_messages_target(messages_map: dict[str, str], lang: str) -> str:
if lang in messages_map:
return messages_map[lang]
if (PACKAGE_ROOT / "messages" / lang).exists():
return f"messages/{lang}"
return f"messages_{lang}.json"

View File

@@ -1,27 +0,0 @@
from __future__ import annotations
import json
from typing import Any
from autometabuilder.loaders.metadata_loader import load_metadata as load_metadata_full
from .json_utils import read_json
from .paths import PACKAGE_ROOT
def load_metadata() -> dict[str, Any]:
return load_metadata_full()
def load_metadata_base() -> dict[str, Any]:
metadata_path = PACKAGE_ROOT / "metadata.json"
return read_json(metadata_path)
def write_metadata(metadata: dict[str, Any]) -> None:
path = PACKAGE_ROOT / "metadata.json"
path.write_text(json.dumps(metadata, indent=2, ensure_ascii=False), encoding="utf-8")
def get_messages_map(metadata: dict[str, Any] | None = None) -> dict[str, str]:
metadata = metadata or load_metadata_base()
return metadata.get("messages", {})

View File

@@ -1,14 +0,0 @@
from __future__ import annotations
from typing import Any
from .json_utils import read_json
from .paths import PACKAGE_ROOT
def get_navigation_items() -> list[dict[str, Any]]:
nav_path = PACKAGE_ROOT / "web" / "navigation_items.json"
nav = read_json(nav_path)
if isinstance(nav, list):
return nav
return []

View File

@@ -1,74 +0,0 @@
"""Load workflow packages from npm-style package directories."""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any, Dict, List
from .json_utils import read_json
logger = logging.getLogger(__name__)
def load_package(package_dir: Path) -> Dict[str, Any] | None:
"""Load a single workflow package."""
package_json = package_dir / "package.json"
if not package_json.exists():
logger.warning("Package %s missing package.json", package_dir.name)
return None
# Read package.json
pkg_data = read_json(package_json)
if not isinstance(pkg_data, dict):
logger.warning("Invalid package.json in %s", package_dir.name)
return None
# Read workflow file
workflow_file = pkg_data.get("main", "workflow.json")
workflow_path = package_dir / workflow_file
if not workflow_path.exists():
logger.warning("Workflow file %s not found in %s", workflow_file, package_dir.name)
return None
workflow_data = read_json(workflow_path)
if not isinstance(workflow_data, dict):
logger.warning("Invalid workflow in %s", package_dir.name)
return None
# Combine package metadata with workflow
metadata = pkg_data.get("metadata", {})
return {
"id": pkg_data.get("name", package_dir.name),
"name": pkg_data.get("name", package_dir.name),
"version": pkg_data.get("version", "1.0.0"),
"description": pkg_data.get("description", ""),
"author": pkg_data.get("author", ""),
"license": pkg_data.get("license", ""),
"keywords": pkg_data.get("keywords", []),
"label": metadata.get("label", package_dir.name),
"tags": metadata.get("tags", []),
"icon": metadata.get("icon", "workflow"),
"category": metadata.get("category", "templates"),
"workflow": workflow_data,
}
def load_all_packages(packages_dir: Path) -> List[Dict[str, Any]]:
"""Load all workflow packages from directory."""
if not packages_dir.exists():
logger.warning("Packages directory not found: %s", packages_dir)
return []
packages = []
for item in sorted(packages_dir.iterdir()):
if not item.is_dir():
continue
package = load_package(item)
if package:
packages.append(package)
logger.debug("Loaded %d workflow packages", len(packages))
return packages

View File

@@ -1,7 +0,0 @@
from __future__ import annotations
from pathlib import Path
PACKAGE_ROOT = Path(__file__).resolve().parents[2]
REPO_ROOT = PACKAGE_ROOT.parent.parent
LOG_FILE = REPO_ROOT / "autometabuilder.log"

View File

@@ -1,36 +0,0 @@
from __future__ import annotations
import os
from pathlib import Path
def build_prompt_yaml(system_content: str | None, user_content: str | None, model: str | None) -> str:
def indent_block(text: str | None) -> str:
if not text:
return ""
return "\n ".join(line.rstrip() for line in text.splitlines())
model_value = model or "openai/gpt-4o"
system_block = indent_block(system_content)
user_block = indent_block(user_content)
return f"""messages:
- role: system
content: >-
{system_block}
- role: user
content: >-
{user_block}
model: {model_value}
"""
def get_prompt_content() -> str:
path = Path(os.environ.get("PROMPT_PATH", "prompt.yml"))
if path.is_file():
return path.read_text(encoding="utf-8")
return ""
def write_prompt(content: str) -> None:
path = Path(os.environ.get("PROMPT_PATH", "prompt.yml"))
path.write_text(content or "", encoding="utf-8")

View File

@@ -1,99 +0,0 @@
from __future__ import annotations
import json
import shutil
from typing import Any
from .messages_io import load_messages, resolve_messages_target, write_messages_dir
from .metadata import get_messages_map, load_metadata_base, write_metadata
from .paths import PACKAGE_ROOT
def load_translation(lang: str) -> dict[str, Any]:
messages_map = get_messages_map()
target = resolve_messages_target(messages_map, lang)
if not target:
return {}
return load_messages(PACKAGE_ROOT / target)
def list_translations() -> dict[str, str]:
messages_map = get_messages_map()
if messages_map:
return messages_map
fallback = {}
for candidate in PACKAGE_ROOT.glob("messages_*.json"):
name = candidate.name
language = name.removeprefix("messages_").removesuffix(".json")
fallback[language] = name
messages_dir = PACKAGE_ROOT / "messages"
if messages_dir.exists():
for candidate in messages_dir.iterdir():
if candidate.is_dir():
fallback[candidate.name] = f"messages/{candidate.name}"
return fallback
def get_ui_messages(lang: str) -> dict[str, Any]:
messages_map = get_messages_map()
base_name = resolve_messages_target(messages_map, "en")
base = load_messages(PACKAGE_ROOT / base_name)
localized = load_messages(PACKAGE_ROOT / resolve_messages_target(messages_map, lang))
merged = dict(base)
merged.update(localized)
merged["__lang"] = lang
return merged
def create_translation(lang: str) -> bool:
messages_map = get_messages_map()
if lang in messages_map:
return False
base = resolve_messages_target(messages_map, "en")
base_file = PACKAGE_ROOT / base
if not base_file.exists():
return False
if base_file.is_dir():
target_name = f"messages/{lang}"
target_path = PACKAGE_ROOT / target_name
shutil.copytree(base_file, target_path)
else:
target_name = f"messages_{lang}.json"
target_path = PACKAGE_ROOT / target_name
shutil.copy(base_file, target_path)
messages_map[lang] = target_name
metadata = load_metadata_base()
metadata["messages"] = messages_map
write_metadata(metadata)
return True
def delete_translation(lang: str) -> bool:
if lang == "en":
return False
messages_map = get_messages_map()
if lang not in messages_map:
return False
target = PACKAGE_ROOT / messages_map[lang]
if target.exists():
if target.is_dir():
shutil.rmtree(target)
else:
target.unlink()
del messages_map[lang]
metadata = load_metadata_base()
metadata["messages"] = messages_map
write_metadata(metadata)
return True
def update_translation(lang: str, payload: dict[str, Any]) -> bool:
messages_map = get_messages_map()
if lang not in messages_map:
return False
payload_content = payload.get("content", {})
target_path = PACKAGE_ROOT / messages_map[lang]
if target_path.is_dir():
write_messages_dir(target_path, payload_content)
else:
target_path.write_text(json.dumps(payload_content, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
return True

View File

@@ -1,53 +0,0 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Iterable
from .json_utils import read_json
from .metadata import load_metadata
from .package_loader import load_all_packages
from .paths import PACKAGE_ROOT
def get_workflow_content() -> str:
metadata = load_metadata()
workflow_name = metadata.get("workflow_path", "workflow.json")
workflow_path = PACKAGE_ROOT / workflow_name
if workflow_path.exists():
return workflow_path.read_text(encoding="utf-8")
return ""
def write_workflow(content: str) -> None:
metadata = load_metadata()
workflow_name = metadata.get("workflow_path", "workflow.json")
workflow_path = PACKAGE_ROOT / workflow_name
workflow_path.write_text(content or "", encoding="utf-8")
def get_workflow_packages_dir() -> Path:
metadata = load_metadata()
packages_name = metadata.get("workflow_packages_path", "packages")
return PACKAGE_ROOT / packages_name
def load_workflow_packages() -> list[dict[str, Any]]:
packages_dir = get_workflow_packages_dir()
return load_all_packages(packages_dir)
def summarize_workflow_packages(packages: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
summary = []
for pkg in packages:
summary.append(
{
"id": pkg["id"],
"name": pkg.get("name", pkg["id"]),
"label": pkg.get("label") or pkg["id"],
"description": pkg.get("description", ""),
"tags": pkg.get("tags", []),
"version": pkg.get("version", "1.0.0"),
"category": pkg.get("category", "templates"),
}
)
return summary