metabuilder/workflow/plugins/python/notifications/notifications_slack.py
johndoe6345789 5ac579a2ed feat: Add Python plugins from AutoMetabuilder + restructure workflow folder
Restructure workflow/ for multi-language plugin support:
- Rename src/ to core/ (engine code: DAG executor, registry, types)
- Create executor/{cpp,python,ts}/ for language-specific runtimes
- Consolidate plugins to plugins/{ts,python}/ by language then category

Add 80+ Python plugins from AutoMetabuilder in 14 categories:
- control: bot control, switch logic, state management
- convert: type conversions (json, boolean, dict, list, number, string)
- core: AI requests, context management, tool calls
- dict: dictionary operations (get, set, keys, values, merge)
- list: list operations (concat, find, sort, slice, filter)
- logic: boolean logic (and, or, xor, equals, comparisons)
- math: arithmetic operations (add, subtract, multiply, power, etc.)
- string: string manipulation (concat, split, replace, format)
- notifications: Slack, Discord integrations
- test: assertion helpers and test suite runner
- tools: file operations, git, docker, testing utilities
- utils: filtering, mapping, reducing, condition branching
- var: variable store operations (get, set, delete, exists)
- web: Flask server, environment variables, JSON handling

Add language executor runtimes:
- TypeScript: direct import execution (default, fast startup)
- Python: child process with JSON stdin/stdout communication
- C++: placeholder for native FFI bindings (Phase 3)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 16:08:08 +00:00

47 lines
1.4 KiB
Python

"""Workflow plugin: send Slack notification."""
import logging
import os

logger = logging.getLogger("metabuilder.notifications")


def run(runtime, inputs):
    """Send a notification to Slack.

    Inputs:
        message: The message to send
        channel: Optional channel (defaults to SLACK_CHANNEL env var)

    Returns:
        dict: Contains success status and any error message
    """
    message = inputs.get("message", "")
    # An explicit "channel" input wins; otherwise fall back to the environment.
    channel = inputs.get("channel") or os.environ.get("SLACK_CHANNEL")

    client = runtime.context.get("slack_client")
    if not client:
        logger.warning("Slack notification skipped: Slack client not initialized.")
        return {
            "success": False,
            "skipped": True,
            "error": "Slack client not initialized",
        }

    if not channel:
        logger.warning("Slack notification skipped: SLACK_CHANNEL missing.")
        return {
            "success": False,
            "skipped": True,
            "error": "SLACK_CHANNEL missing",
        }

    # Import lazily, but outside the try block: if the import itself failed
    # inside the try, the except clause would reference an unbound name.
    from slack_sdk.errors import SlackApiError

    try:
        client.chat_postMessage(channel=channel, text=message)
        logger.info("Slack notification sent successfully.")
        return {"success": True, "message": "Slack notification sent"}
    except SlackApiError as e:
        logger.error("Error sending Slack notification: %s", e)
        return {"success": False, "error": str(e)}
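
Usage sketch (not part of the committed file): one way the plugin's three outcomes could be exercised without a Slack workspace. Everything below is an assumption made for illustration. StubSlackClient and make_runtime are hypothetical helpers, the runtime is assumed to need only a dict-like context attribute, and run is repeated in condensed form (without logging and without the slack_sdk error handling) so that the snippet runs on its own.

```python
import os
from types import SimpleNamespace


def run(runtime, inputs):
    """Condensed stand-in for the plugin above (no logging, no slack_sdk)."""
    message = inputs.get("message", "")
    channel = inputs.get("channel") or os.environ.get("SLACK_CHANNEL")
    client = runtime.context.get("slack_client")
    if not client:
        return {"success": False, "skipped": True,
                "error": "Slack client not initialized"}
    if not channel:
        return {"success": False, "skipped": True,
                "error": "SLACK_CHANNEL missing"}
    client.chat_postMessage(channel=channel, text=message)
    return {"success": True, "message": "Slack notification sent"}


class StubSlackClient:
    """Hypothetical test double: records calls instead of contacting Slack."""

    def __init__(self):
        self.calls = []

    def chat_postMessage(self, channel, text):
        self.calls.append({"channel": channel, "text": text})


def make_runtime(client=None):
    # Assumption: the plugin only reads runtime.context, a dict-like object.
    return SimpleNamespace(context={"slack_client": client})


# Keep the demo independent of whatever the host environment defines.
os.environ.pop("SLACK_CHANNEL", None)

# 1. No client in the runtime context: the notification is skipped.
no_client = run(make_runtime(), {"message": "hi", "channel": "#builds"})

# 2. Client present, but no channel input and no SLACK_CHANNEL: skipped.
no_channel = run(make_runtime(StubSlackClient()), {"message": "hi"})

# 3. Client and channel present: the message is handed to the client.
stub = StubSlackClient()
sent = run(make_runtime(stub), {"message": "Build finished", "channel": "#builds"})

print(no_client["error"], "|", no_channel["error"], "|", sent["success"])
```

Both skip paths return "skipped": True alongside "success": False, so a caller can tell a missing configuration apart from a Slack API failure, which returns only "success": False and the error string.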