mirror of
https://github.com/johndoe6345789/AutoMetabuilder.git
synced 2026-04-25 14:25:05 +00:00
143 lines
3.9 KiB
Python
143 lines
3.9 KiB
Python
"""Data access layer that delegates to workflow plugins.
|
|
|
|
This module provides a simple API for data access by wrapping workflow plugins.
|
|
Routes and other code can import from here to access data functions.
|
|
"""
|
|
|
|
from autometabuilder.workflow.plugin_registry import PluginRegistry, load_plugin_map
|
|
from autometabuilder.workflow.runtime import WorkflowRuntime
|
|
import logging
|
|
|
|
# Create a minimal runtime for plugin execution
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class _SimpleLogger:
|
|
"""Minimal logger for plugin execution."""
|
|
def info(self, *args, **kwargs):
|
|
_logger.info(*args, **kwargs)
|
|
|
|
def debug(self, *args, **kwargs):
|
|
_logger.debug(*args, **kwargs)
|
|
|
|
def error(self, *args, **kwargs):
|
|
_logger.error(*args, **kwargs)
|
|
|
|
|
|
def _run_plugin(plugin_name, inputs=None):
|
|
"""Execute a workflow plugin and return its result."""
|
|
plugin_map = load_plugin_map()
|
|
registry = PluginRegistry(plugin_map)
|
|
runtime = WorkflowRuntime(
|
|
context={},
|
|
store={},
|
|
tool_runner=None,
|
|
logger=_SimpleLogger()
|
|
)
|
|
|
|
plugin = registry.get(plugin_name)
|
|
if not plugin:
|
|
raise RuntimeError(f"Plugin {plugin_name} not found")
|
|
|
|
result = plugin(runtime, inputs or {})
|
|
return result.get("result")
|
|
|
|
|
|
# Environment functions
|
|
def get_env_vars():
|
|
"""Get environment variables from .env file."""
|
|
return _run_plugin("web.get_env_vars")
|
|
|
|
|
|
def persist_env_vars(updates):
|
|
"""Persist environment variables to .env file."""
|
|
return _run_plugin("web.persist_env_vars", {"updates": updates})
|
|
|
|
|
|
# Log functions
|
|
def get_recent_logs(lines=50):
|
|
"""Get recent log entries."""
|
|
return _run_plugin("web.get_recent_logs", {"lines": lines})
|
|
|
|
|
|
# Navigation functions
|
|
def get_navigation_items():
|
|
"""Get navigation menu items."""
|
|
return _run_plugin("web.get_navigation_items")
|
|
|
|
|
|
# Prompt functions
|
|
def get_prompt_content():
|
|
"""Get prompt content from prompt file."""
|
|
return _run_plugin("web.get_prompt_content")
|
|
|
|
|
|
def write_prompt(content):
|
|
"""Write prompt content to file."""
|
|
return _run_plugin("web.write_prompt", {"content": content})
|
|
|
|
|
|
def build_prompt_yaml(system_content, user_content, model):
|
|
"""Build prompt YAML from components."""
|
|
return _run_plugin("web.build_prompt_yaml", {
|
|
"system_content": system_content,
|
|
"user_content": user_content,
|
|
"model": model
|
|
})
|
|
|
|
|
|
# Workflow functions
|
|
def get_workflow_content():
|
|
"""Get workflow content from workflow file."""
|
|
return _run_plugin("web.get_workflow_content")
|
|
|
|
|
|
def write_workflow(content):
|
|
"""Write workflow content to file."""
|
|
return _run_plugin("web.write_workflow", {"content": content})
|
|
|
|
|
|
def load_workflow_packages():
|
|
"""Load all workflow packages."""
|
|
return _run_plugin("web.load_workflow_packages")
|
|
|
|
|
|
def summarize_workflow_packages(packages):
|
|
"""Summarize workflow packages."""
|
|
return _run_plugin("web.summarize_workflow_packages", {"packages": packages})
|
|
|
|
|
|
# Translation functions
|
|
def list_translations():
|
|
"""List all available translations."""
|
|
return _run_plugin("web.list_translations")
|
|
|
|
|
|
def load_translation(lang):
|
|
"""Load translation for a specific language."""
|
|
return _run_plugin("web.load_translation", {"lang": lang})
|
|
|
|
|
|
def create_translation(lang):
|
|
"""Create a new translation."""
|
|
return _run_plugin("web.create_translation", {"lang": lang})
|
|
|
|
|
|
def update_translation(lang, payload):
|
|
"""Update an existing translation."""
|
|
return _run_plugin("web.update_translation", {"lang": lang, "payload": payload})
|
|
|
|
|
|
def delete_translation(lang):
|
|
"""Delete a translation."""
|
|
return _run_plugin("web.delete_translation", {"lang": lang})
|
|
|
|
|
|
def get_ui_messages(lang):
|
|
"""Get UI messages for a specific language with fallback."""
|
|
return _run_plugin("web.get_ui_messages", {"lang": lang})
|
|
|
|
|
|
# Metadata - utility function
|
|
from autometabuilder.utils import load_metadata
|