Files
2026-01-10 21:51:38 +00:00

143 lines
3.9 KiB
Python

"""Data access layer that delegates to workflow plugins.
This module provides a simple API for data access by wrapping workflow plugins.
Routes and other code can import from here to access data functions.
"""
from autometabuilder.workflow.plugin_registry import PluginRegistry, load_plugin_map
from autometabuilder.workflow.runtime import WorkflowRuntime
import logging
# Create a minimal runtime for plugin execution
_logger = logging.getLogger(__name__)
class _SimpleLogger:
"""Minimal logger for plugin execution."""
def info(self, *args, **kwargs):
_logger.info(*args, **kwargs)
def debug(self, *args, **kwargs):
_logger.debug(*args, **kwargs)
def error(self, *args, **kwargs):
_logger.error(*args, **kwargs)
def _run_plugin(plugin_name, inputs=None):
"""Execute a workflow plugin and return its result."""
plugin_map = load_plugin_map()
registry = PluginRegistry(plugin_map)
runtime = WorkflowRuntime(
context={},
store={},
tool_runner=None,
logger=_SimpleLogger()
)
plugin = registry.get(plugin_name)
if not plugin:
raise RuntimeError(f"Plugin {plugin_name} not found")
result = plugin(runtime, inputs or {})
return result.get("result")
# Environment functions
def get_env_vars():
"""Get environment variables from .env file."""
return _run_plugin("web.get_env_vars")
def persist_env_vars(updates):
"""Persist environment variables to .env file."""
return _run_plugin("web.persist_env_vars", {"updates": updates})
# Log functions
def get_recent_logs(lines=50):
"""Get recent log entries."""
return _run_plugin("web.get_recent_logs", {"lines": lines})
# Navigation functions
def get_navigation_items():
"""Get navigation menu items."""
return _run_plugin("web.get_navigation_items")
# Prompt functions
def get_prompt_content():
"""Get prompt content from prompt file."""
return _run_plugin("web.get_prompt_content")
def write_prompt(content):
"""Write prompt content to file."""
return _run_plugin("web.write_prompt", {"content": content})
def build_prompt_yaml(system_content, user_content, model):
"""Build prompt YAML from components."""
return _run_plugin("web.build_prompt_yaml", {
"system_content": system_content,
"user_content": user_content,
"model": model
})
# Workflow functions
def get_workflow_content():
"""Get workflow content from workflow file."""
return _run_plugin("web.get_workflow_content")
def write_workflow(content):
"""Write workflow content to file."""
return _run_plugin("web.write_workflow", {"content": content})
def load_workflow_packages():
"""Load all workflow packages."""
return _run_plugin("web.load_workflow_packages")
def summarize_workflow_packages(packages):
"""Summarize workflow packages."""
return _run_plugin("web.summarize_workflow_packages", {"packages": packages})
# Translation functions
def list_translations():
"""List all available translations."""
return _run_plugin("web.list_translations")
def load_translation(lang):
"""Load translation for a specific language."""
return _run_plugin("web.load_translation", {"lang": lang})
def create_translation(lang):
"""Create a new translation."""
return _run_plugin("web.create_translation", {"lang": lang})
def update_translation(lang, payload):
"""Update an existing translation."""
return _run_plugin("web.update_translation", {"lang": lang, "payload": payload})
def delete_translation(lang):
"""Delete a translation."""
return _run_plugin("web.delete_translation", {"lang": lang})
def get_ui_messages(lang):
"""Get UI messages for a specific language with fallback."""
return _run_plugin("web.get_ui_messages", {"lang": lang})
# Metadata - utility function
from autometabuilder.utils import load_metadata