feat(workflow): add packagerepo and string.sha256 plugins

Created 11 packagerepo-specific workflow plugins:
- auth_verify_jwt - JWT token verification
- auth_check_scopes - Scope-based authorization
- parse_path - URL path parameter extraction (Express-style)
- normalize_entity - Field normalization (trim, lower, unique, sort)
- validate_entity - JSON schema validation
- kv_get/kv_put - RocksDB key-value operations
- blob_put - Filesystem blob storage with SHA-256 hashing
- index_upsert - Index entry management
- respond_json/respond_error - Response formatting

Created string.sha256 plugin:
- Compute SHA256 hash of strings/bytes
- Optional "sha256:" prefix
- Used by packagerepo for content-addressed storage

All plugins follow standard pattern:
- Class extending NodeExecutor
- Factory with create() function
- package.json with metadata
- Access external state via runtime parameter

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-22 15:14:59 +00:00
parent 233005f45b
commit 6e2f0c08c0
43 changed files with 1299 additions and 1 deletions
@@ -1 +1,56 @@
"""Packagerepo-specific workflow plugins."""
"""Package repository workflow plugins.
Lazy-loading module to avoid import errors when optional dependencies are missing.
"""
__all__ = [
"create_auth_verify_jwt",
"create_auth_check_scopes",
"create_parse_path",
"create_normalize_entity",
"create_validate_entity",
"create_kv_get",
"create_kv_put",
"create_blob_put",
"create_index_upsert",
"create_respond_json",
"create_respond_error",
]
def __getattr__(name):
"""Lazy-load plugin factories on demand."""
if name == "create_auth_verify_jwt":
from .auth_verify_jwt.factory import create
return create
elif name == "create_auth_check_scopes":
from .auth_check_scopes.factory import create
return create
elif name == "create_parse_path":
from .parse_path.factory import create
return create
elif name == "create_normalize_entity":
from .normalize_entity.factory import create
return create
elif name == "create_validate_entity":
from .validate_entity.factory import create
return create
elif name == "create_kv_get":
from .kv_get.factory import create
return create
elif name == "create_kv_put":
from .kv_put.factory import create
return create
elif name == "create_blob_put":
from .blob_put.factory import create
return create
elif name == "create_index_upsert":
from .index_upsert.factory import create
return create
elif name == "create_respond_json":
from .respond_json.factory import create
return create
elif name == "create_respond_error":
from .respond_error.factory import create
return create
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
@@ -0,0 +1,43 @@
"""Workflow plugin: check if principal has required scopes."""
from typing import Dict, Any, List
from ...base import NodeExecutor
class AuthCheckScopes(NodeExecutor):
"""Check if principal has required scopes."""
node_type = "packagerepo.auth_check_scopes"
category = "packagerepo"
description = "Check if principal has required scopes"
def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
"""Check if principal has required scopes."""
principal = inputs.get("principal")
required_scopes = inputs.get("required_scopes", [])
if not principal:
return {"error": "principal is required"}
# Extract scopes from principal
principal_scopes = principal.get("scopes", [])
if isinstance(principal_scopes, str):
principal_scopes = [principal_scopes]
# Ensure required_scopes is a list
if isinstance(required_scopes, str):
required_scopes = [required_scopes]
# Check if all required scopes are present
has_all_scopes = all(scope in principal_scopes for scope in required_scopes)
# Find missing scopes
missing_scopes = [scope for scope in required_scopes if scope not in principal_scopes]
result = {
"authorized": has_all_scopes,
"missing_scopes": missing_scopes,
}
return {"result": result}
@@ -0,0 +1,7 @@
"""Factory for AuthCheckScopes plugin."""
from .auth_check_scopes import AuthCheckScopes
def create():
return AuthCheckScopes()
@@ -0,0 +1,16 @@
{
"name": "@metabuilder/auth_check_scopes",
"version": "1.0.0",
"description": "Check if principal has required scopes",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["packagerepo", "workflow", "plugin", "auth", "authorization"],
"main": "auth_check_scopes.py",
"files": ["auth_check_scopes.py", "factory.py"],
"metadata": {
"plugin_type": "packagerepo.auth_check_scopes",
"category": "packagerepo",
"class": "AuthCheckScopes",
"entrypoint": "execute"
}
}
@@ -0,0 +1,53 @@
"""Workflow plugin: verify JWT token and extract principal."""
import jwt
from typing import Dict, Any
from ...base import NodeExecutor
class AuthVerifyJwt(NodeExecutor):
"""Verify JWT token and extract principal information."""
node_type = "packagerepo.auth_verify_jwt"
category = "packagerepo"
description = "Verify JWT token and extract principal"
def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
"""Verify JWT token and extract principal."""
token = inputs.get("token")
secret = inputs.get("secret")
if not token:
return {"error": "token is required"}
if not secret:
return {"error": "secret is required"}
try:
# Decode JWT without verification if no secret provided
# or with verification if secret is provided
if secret == "none":
# For development/testing - decode without verification
payload = jwt.decode(token, options={"verify_signature": False})
else:
# Production - verify signature
payload = jwt.decode(token, secret, algorithms=["HS256"])
# Extract principal information
principal = {
"sub": payload.get("sub"),
"scopes": payload.get("scopes", []),
"exp": payload.get("exp"),
"iat": payload.get("iat"),
"tenant_id": payload.get("tenant_id"),
}
return {"result": principal}
except jwt.ExpiredSignatureError:
return {"error": "token has expired", "error_code": "TOKEN_EXPIRED"}
except jwt.InvalidTokenError as e:
return {"error": f"invalid token: {str(e)}", "error_code": "INVALID_TOKEN"}
except Exception as e:
return {"error": f"failed to verify token: {str(e)}", "error_code": "VERIFY_FAILED"}
@@ -0,0 +1,7 @@
"""Factory for AuthVerifyJwt plugin."""
from .auth_verify_jwt import AuthVerifyJwt
def create():
return AuthVerifyJwt()
@@ -0,0 +1,16 @@
{
"name": "@metabuilder/auth_verify_jwt",
"version": "1.0.0",
"description": "Verify JWT token and extract principal",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["packagerepo", "workflow", "plugin", "auth", "jwt"],
"main": "auth_verify_jwt.py",
"files": ["auth_verify_jwt.py", "factory.py"],
"metadata": {
"plugin_type": "packagerepo.auth_verify_jwt",
"category": "packagerepo",
"class": "AuthVerifyJwt",
"entrypoint": "execute"
}
}
@@ -0,0 +1,66 @@
"""Workflow plugin: write blob to filesystem."""
from typing import Dict, Any
from pathlib import Path
import base64
from ...base import NodeExecutor
class BlobPut(NodeExecutor):
"""Write blob to filesystem."""
node_type = "packagerepo.blob_put"
category = "packagerepo"
description = "Write blob to filesystem"
def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
"""Write blob to filesystem."""
key = inputs.get("key")
data = inputs.get("data")
encoding = inputs.get("encoding", "utf-8") # utf-8, base64, or binary
if not key:
return {"error": "key is required"}
if data is None:
return {"error": "data is required"}
if not runtime or not hasattr(runtime, "blob_dir"):
return {"error": "blob_dir not available in runtime"}
try:
# Ensure blob directory exists
blob_dir = Path(runtime.blob_dir)
blob_dir.mkdir(parents=True, exist_ok=True)
# Construct file path
file_path = blob_dir / key
# Ensure parent directory exists
file_path.parent.mkdir(parents=True, exist_ok=True)
# Convert data to bytes based on encoding
if encoding == "base64":
if isinstance(data, str):
data_bytes = base64.b64decode(data)
else:
return {"error": "data must be a string for base64 encoding"}
elif encoding == "binary":
if isinstance(data, bytes):
data_bytes = data
else:
return {"error": "data must be bytes for binary encoding"}
else: # utf-8 or other text encoding
if isinstance(data, str):
data_bytes = data.encode(encoding)
else:
return {"error": f"data must be a string for {encoding} encoding"}
# Write to file
file_path.write_bytes(data_bytes)
return {"result": {"success": True, "key": key, "path": str(file_path), "size": len(data_bytes)}}
except Exception as e:
return {"error": f"failed to write blob: {str(e)}", "error_code": "BLOB_PUT_FAILED"}
@@ -0,0 +1,7 @@
"""Factory for BlobPut plugin."""
from .blob_put import BlobPut
def create():
return BlobPut()
@@ -0,0 +1,16 @@
{
"name": "@metabuilder/blob_put",
"version": "1.0.0",
"description": "Write blob to filesystem",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["packagerepo", "workflow", "plugin", "blob", "storage"],
"main": "blob_put.py",
"files": ["blob_put.py", "factory.py"],
"metadata": {
"plugin_type": "packagerepo.blob_put",
"category": "packagerepo",
"class": "BlobPut",
"entrypoint": "execute"
}
}
@@ -0,0 +1,7 @@
"""Factory for IndexUpsert plugin."""
from .index_upsert import IndexUpsert
def create():
return IndexUpsert()
@@ -0,0 +1,47 @@
"""Workflow plugin: upsert entry in index store."""
from typing import Dict, Any
from ...base import NodeExecutor
class IndexUpsert(NodeExecutor):
"""Upsert entry in index store."""
node_type = "packagerepo.index_upsert"
category = "packagerepo"
description = "Upsert entry in index store"
def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
"""Upsert entry in index store."""
index_name = inputs.get("index_name")
key = inputs.get("key")
document = inputs.get("document")
if not index_name:
return {"error": "index_name is required"}
if not key:
return {"error": "key is required"}
if not document:
return {"error": "document is required"}
if not isinstance(document, dict):
return {"error": "document must be a dictionary"}
if not runtime or not hasattr(runtime, "index_store"):
return {"error": "index_store not available in runtime"}
try:
# Upsert entry in index store
# The index_store should have an upsert method that takes:
# - index_name: name of the index
# - key: unique identifier for the document
# - document: dictionary of fields to index
runtime.index_store.upsert(index_name, key, document)
return {"result": {"success": True, "index": index_name, "key": key}}
except Exception as e:
return {"error": f"failed to upsert index entry: {str(e)}", "error_code": "INDEX_UPSERT_FAILED"}
@@ -0,0 +1,16 @@
{
"name": "@metabuilder/index_upsert",
"version": "1.0.0",
"description": "Upsert entry in index store",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["packagerepo", "workflow", "plugin", "index", "search"],
"main": "index_upsert.py",
"files": ["index_upsert.py", "factory.py"],
"metadata": {
"plugin_type": "packagerepo.index_upsert",
"category": "packagerepo",
"class": "IndexUpsert",
"entrypoint": "execute"
}
}
@@ -0,0 +1,7 @@
"""Factory for KvGet plugin."""
from .kv_get import KvGet
def create():
return KvGet()
@@ -0,0 +1,43 @@
"""Workflow plugin: get value from RocksDB key-value store."""
from typing import Dict, Any
import json
from ...base import NodeExecutor
class KvGet(NodeExecutor):
"""Get value from RocksDB key-value store."""
node_type = "packagerepo.kv_get"
category = "packagerepo"
description = "Get value from RocksDB key-value store"
def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
"""Get value from KV store."""
key = inputs.get("key")
if not key:
return {"error": "key is required"}
if not runtime or not hasattr(runtime, "kv_store"):
return {"error": "kv_store not available in runtime"}
try:
# Get value from KV store
value_bytes = runtime.kv_store.get(key.encode("utf-8"))
if value_bytes is None:
return {"result": {"found": False, "value": None}}
# Try to decode as JSON
try:
value = json.loads(value_bytes.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError):
# Return raw bytes as string if not JSON
value = value_bytes.decode("utf-8", errors="replace")
return {"result": {"found": True, "value": value}}
except Exception as e:
return {"error": f"failed to get value: {str(e)}", "error_code": "KV_GET_FAILED"}
@@ -0,0 +1,16 @@
{
"name": "@metabuilder/kv_get",
"version": "1.0.0",
"description": "Get value from RocksDB key-value store",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["packagerepo", "workflow", "plugin", "rocksdb", "storage"],
"main": "kv_get.py",
"files": ["kv_get.py", "factory.py"],
"metadata": {
"plugin_type": "packagerepo.kv_get",
"category": "packagerepo",
"class": "KvGet",
"entrypoint": "execute"
}
}
@@ -0,0 +1,7 @@
"""Factory for KvPut plugin."""
from .kv_put import KvPut
def create():
return KvPut()
@@ -0,0 +1,49 @@
"""Workflow plugin: put value in RocksDB key-value store."""
from typing import Dict, Any
import json
from ...base import NodeExecutor
class KvPut(NodeExecutor):
"""Put value in RocksDB key-value store."""
node_type = "packagerepo.kv_put"
category = "packagerepo"
description = "Put value in RocksDB key-value store"
def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
"""Put value in KV store."""
key = inputs.get("key")
value = inputs.get("value")
if not key:
return {"error": "key is required"}
if value is None:
return {"error": "value is required"}
if not runtime or not hasattr(runtime, "kv_store"):
return {"error": "kv_store not available in runtime"}
try:
# Convert value to bytes
if isinstance(value, (dict, list)):
# Serialize JSON objects
value_bytes = json.dumps(value).encode("utf-8")
elif isinstance(value, str):
value_bytes = value.encode("utf-8")
elif isinstance(value, bytes):
value_bytes = value
else:
# Convert other types to string
value_bytes = str(value).encode("utf-8")
# Put value in KV store
runtime.kv_store.put(key.encode("utf-8"), value_bytes)
return {"result": {"success": True, "key": key}}
except Exception as e:
return {"error": f"failed to put value: {str(e)}", "error_code": "KV_PUT_FAILED"}
@@ -0,0 +1,16 @@
{
"name": "@metabuilder/kv_put",
"version": "1.0.0",
"description": "Put value in RocksDB key-value store",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["packagerepo", "workflow", "plugin", "rocksdb", "storage"],
"main": "kv_put.py",
"files": ["kv_put.py", "factory.py"],
"metadata": {
"plugin_type": "packagerepo.kv_put",
"category": "packagerepo",
"class": "KvPut",
"entrypoint": "execute"
}
}
@@ -0,0 +1,7 @@
"""Factory for NormalizeEntity plugin."""
from .normalize_entity import NormalizeEntity
def create():
return NormalizeEntity()
@@ -0,0 +1,66 @@
"""Workflow plugin: normalize entity fields."""
from typing import Dict, Any
from ...base import NodeExecutor
class NormalizeEntity(NodeExecutor):
"""Normalize entity fields (trim, lowercase, etc.)."""
node_type = "packagerepo.normalize_entity"
category = "packagerepo"
description = "Normalize entity fields (trim, lowercase, etc.)"
def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
"""Normalize entity fields."""
entity = inputs.get("entity")
rules = inputs.get("rules", {})
if not entity:
return {"error": "entity is required"}
if not isinstance(entity, dict):
return {"error": "entity must be a dictionary"}
# Clone entity to avoid mutation
normalized = entity.copy()
# Apply normalization rules
for field, operations in rules.items():
if field not in normalized:
continue
value = normalized[field]
# Handle string operations
if isinstance(value, str):
if "trim" in operations:
value = value.strip()
if "lowercase" in operations:
value = value.lower()
if "uppercase" in operations:
value = value.upper()
if "title" in operations:
value = value.title()
normalized[field] = value
# Handle list operations
elif isinstance(value, list):
if "unique" in operations:
# Remove duplicates while preserving order
seen = set()
unique_list = []
for item in value:
if item not in seen:
seen.add(item)
unique_list.append(item)
value = unique_list
if "sort" in operations:
value = sorted(value)
normalized[field] = value
return {"result": normalized}
@@ -0,0 +1,16 @@
{
"name": "@metabuilder/normalize_entity",
"version": "1.0.0",
"description": "Normalize entity fields (trim, lowercase, etc.)",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["packagerepo", "workflow", "plugin", "normalization", "validation"],
"main": "normalize_entity.py",
"files": ["normalize_entity.py", "factory.py"],
"metadata": {
"plugin_type": "packagerepo.normalize_entity",
"category": "packagerepo",
"class": "NormalizeEntity",
"entrypoint": "execute"
}
}
@@ -0,0 +1,25 @@
{
"name": "@metabuilder/workflow-plugins-packagerepo",
"version": "1.0.0",
"description": "Package repository operation plugins",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["packagerepo", "workflow", "plugins", "auth", "storage"],
"metadata": {
"category": "packagerepo",
"plugin_count": 11
},
"plugins": [
"auth_verify_jwt",
"auth_check_scopes",
"parse_path",
"normalize_entity",
"validate_entity",
"kv_get",
"kv_put",
"blob_put",
"index_upsert",
"respond_json",
"respond_error"
]
}
@@ -0,0 +1,7 @@
"""Factory for ParsePath plugin."""
from .parse_path import ParsePath
def create():
return ParsePath()
@@ -0,0 +1,16 @@
{
"name": "@metabuilder/parse_path",
"version": "1.0.0",
"description": "Parse URL path with Express-style :param patterns",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["packagerepo", "workflow", "plugin", "routing", "path"],
"main": "parse_path.py",
"files": ["parse_path.py", "factory.py"],
"metadata": {
"plugin_type": "packagerepo.parse_path",
"category": "packagerepo",
"class": "ParsePath",
"entrypoint": "execute"
}
}
@@ -0,0 +1,45 @@
"""Workflow plugin: parse URL path with Express-style parameters."""
import re
from typing import Dict, Any
from ...base import NodeExecutor
class ParsePath(NodeExecutor):
"""Parse URL path with Express-style :param patterns."""
node_type = "packagerepo.parse_path"
category = "packagerepo"
description = "Parse URL path with Express-style :param patterns"
def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
"""Parse URL path against pattern."""
path = inputs.get("path")
pattern = inputs.get("pattern")
if not path:
return {"error": "path is required"}
if not pattern:
return {"error": "pattern is required"}
# Convert Express-style pattern to regex
# Example: /packages/:owner/:name -> /packages/(?P<owner>[^/]+)/(?P<name>[^/]+)
regex_pattern = pattern
# Replace :param with named regex groups
regex_pattern = re.sub(r':([a-zA-Z_][a-zA-Z0-9_]*)', r'(?P<\1>[^/]+)', regex_pattern)
# Escape forward slashes and add anchors
regex_pattern = f'^{regex_pattern}$'
try:
match = re.match(regex_pattern, path)
if match:
params = match.groupdict()
return {"result": {"params": params, "matched": True}}
else:
return {"result": {"params": {}, "matched": False}}
except re.error as e:
return {"error": f"invalid pattern: {str(e)}", "error_code": "INVALID_PATTERN"}
@@ -0,0 +1,7 @@
"""Factory for RespondError plugin."""
from .respond_error import RespondError
def create():
return RespondError()
@@ -0,0 +1,16 @@
{
"name": "@metabuilder/respond_error",
"version": "1.0.0",
"description": "Format error response",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["packagerepo", "workflow", "plugin", "http", "error"],
"main": "respond_error.py",
"files": ["respond_error.py", "factory.py"],
"metadata": {
"plugin_type": "packagerepo.respond_error",
"category": "packagerepo",
"class": "RespondError",
"entrypoint": "execute"
}
}
@@ -0,0 +1,43 @@
"""Workflow plugin: format error response."""
from typing import Dict, Any
import json
from ...base import NodeExecutor
class RespondError(NodeExecutor):
"""Format error response."""
node_type = "packagerepo.respond_error"
category = "packagerepo"
description = "Format error response"
def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
"""Format error response."""
message = inputs.get("message", "An error occurred")
error_code = inputs.get("error_code")
status = inputs.get("status", 500)
details = inputs.get("details")
# Build error object
error_obj = {
"error": {
"message": message,
}
}
if error_code:
error_obj["error"]["code"] = error_code
if details:
error_obj["error"]["details"] = details
# Format response
response = {
"status": status,
"headers": {"Content-Type": "application/json"},
"body": json.dumps(error_obj, indent=2),
}
return {"result": response}
@@ -0,0 +1,7 @@
"""Factory for RespondJson plugin."""
from .respond_json import RespondJson
def create():
return RespondJson()
@@ -0,0 +1,16 @@
{
"name": "@metabuilder/respond_json",
"version": "1.0.0",
"description": "Format JSON response",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["packagerepo", "workflow", "plugin", "http", "response"],
"main": "respond_json.py",
"files": ["respond_json.py", "factory.py"],
"metadata": {
"plugin_type": "packagerepo.respond_json",
"category": "packagerepo",
"class": "RespondJson",
"entrypoint": "execute"
}
}
@@ -0,0 +1,37 @@
"""Workflow plugin: format JSON response."""
from typing import Dict, Any
import json
from ...base import NodeExecutor
class RespondJson(NodeExecutor):
"""Format JSON response."""
node_type = "packagerepo.respond_json"
category = "packagerepo"
description = "Format JSON response"
def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
"""Format JSON response."""
data = inputs.get("data")
status = inputs.get("status", 200)
headers = inputs.get("headers", {})
# Ensure data is present
if data is None:
data = {}
# Add default Content-Type header
response_headers = {"Content-Type": "application/json"}
response_headers.update(headers)
# Format response
response = {
"status": status,
"headers": response_headers,
"body": json.dumps(data, indent=2),
}
return {"result": response}
@@ -0,0 +1,7 @@
"""Factory for ValidateEntity plugin."""
from .validate_entity import ValidateEntity
def create():
return ValidateEntity()
@@ -0,0 +1,16 @@
{
"name": "@metabuilder/validate_entity",
"version": "1.0.0",
"description": "Validate entity against JSON schema",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["packagerepo", "workflow", "plugin", "validation", "json-schema"],
"main": "validate_entity.py",
"files": ["validate_entity.py", "factory.py"],
"metadata": {
"plugin_type": "packagerepo.validate_entity",
"category": "packagerepo",
"class": "ValidateEntity",
"entrypoint": "execute"
}
}
@@ -0,0 +1,48 @@
"""Workflow plugin: validate entity against JSON schema."""
import jsonschema
from typing import Dict, Any
from ...base import NodeExecutor
class ValidateEntity(NodeExecutor):
"""Validate entity against JSON schema."""
node_type = "packagerepo.validate_entity"
category = "packagerepo"
description = "Validate entity against JSON schema"
def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
"""Validate entity against schema."""
entity = inputs.get("entity")
schema = inputs.get("schema")
if not entity:
return {"error": "entity is required"}
if not schema:
return {"error": "schema is required"}
try:
# Validate entity against schema
jsonschema.validate(instance=entity, schema=schema)
return {"result": {"valid": True, "errors": []}}
except jsonschema.ValidationError as e:
# Collect validation errors
errors = []
errors.append({
"path": list(e.path),
"message": e.message,
"schema_path": list(e.schema_path),
})
return {"result": {"valid": False, "errors": errors}}
except jsonschema.SchemaError as e:
return {"error": f"invalid schema: {str(e)}", "error_code": "INVALID_SCHEMA"}
except Exception as e:
return {"error": f"validation failed: {str(e)}", "error_code": "VALIDATION_FAILED"}
@@ -0,0 +1,122 @@
# String SHA256 Plugin
Computes the SHA256 hash of input strings or bytes.
## Plugin Information
- **Type**: `string.sha256`
- **Category**: `string`
- **Class**: `StringSha256`
- **Version**: 1.0.0
## Description
This plugin computes the SHA256 cryptographic hash of the input data and returns it as a hexadecimal string. Optionally, the result can include a `sha256:` prefix for clarity.
## Inputs
| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| `input` | `string \| bytes` | Yes | `""` | The data to hash |
| `prefix` | `boolean` | No | `false` | Whether to prepend "sha256:" to the result |
## Output
| Field | Type | Description |
|-------|------|-------------|
| `result` | `string` | The SHA256 hash as a hexadecimal string (optionally prefixed) |
## Examples
### Basic Usage (String Input)
```python
inputs = {
"input": "hello world",
"prefix": False
}
# Output:
{
"result": "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
}
```
### With Prefix
```python
inputs = {
"input": "hello world",
"prefix": True
}
# Output:
{
"result": "sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
}
```
### Bytes Input
```python
inputs = {
"input": b"hello world",
"prefix": True
}
# Output:
{
"result": "sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
}
```
### Empty String
```python
inputs = {
"input": "",
"prefix": False
}
# Output:
{
"result": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
}
```
## Use Cases
- **Data Integrity**: Verify file or message integrity
- **Checksums**: Generate checksums for content validation
- **Content Addressing**: Create content-based identifiers
- **Security**: Hash passwords or sensitive data (note: use dedicated password hashing for production)
- **Deduplication**: Identify duplicate content
## Implementation Details
- Uses Python's built-in `hashlib.sha256()` function
- Automatically converts string inputs to UTF-8 bytes
- Accepts both string and bytes inputs
- Returns lowercase hexadecimal string
- Hash length is always 64 characters (256 bits)
## Testing
Run the test suite:
```bash
python3 test_direct.py
```
## Related Plugins
- `string.md5` - MD5 hash (less secure, faster)
- `string.sha1` - SHA1 hash (deprecated for security)
- `string.sha512` - SHA512 hash (more secure, slower)
## Notes
- SHA256 is part of the SHA-2 family of cryptographic hash functions
- Produces a 256-bit (32-byte) hash value
- Collision-resistant and suitable for security applications
- For password hashing, consider dedicated algorithms like bcrypt or Argon2
@@ -0,0 +1 @@
"""SHA256 hash plugin."""
@@ -0,0 +1,7 @@
"""Factory for StringSha256 plugin."""
from .string_sha256 import StringSha256
def create():
return StringSha256()
@@ -0,0 +1,16 @@
{
"name": "@metabuilder/string_sha256",
"version": "1.0.0",
"description": "Compute SHA256 hash of input string or bytes",
"author": "MetaBuilder",
"license": "MIT",
"keywords": ["string", "workflow", "plugin", "hash", "sha256", "crypto"],
"main": "string_sha256.py",
"files": ["string_sha256.py", "factory.py"],
"metadata": {
"plugin_type": "string.sha256",
"category": "string",
"class": "StringSha256",
"entrypoint": "execute"
}
}
@@ -0,0 +1,45 @@
"""Workflow plugin: compute SHA256 hash of string/bytes."""
import hashlib
from ...base import NodeExecutor
class StringSha256(NodeExecutor):
"""Compute SHA256 hash of input string or bytes."""
node_type = "string.sha256"
category = "string"
description = "Compute SHA256 hash of input string or bytes"
def execute(self, inputs, runtime=None):
"""
Compute SHA256 hash.
Args:
inputs: Dict with:
- input: String or bytes to hash
- prefix: Optional bool, whether to prepend "sha256:" (default: False)
Returns:
Dict with 'result' containing hex hash string
"""
input_value = inputs.get("input", "")
prefix = inputs.get("prefix", False)
# Convert to bytes if string
if isinstance(input_value, str):
input_bytes = input_value.encode('utf-8')
else:
input_bytes = input_value
# Compute hash
hash_obj = hashlib.sha256(input_bytes)
hex_hash = hash_obj.hexdigest()
# Add prefix if requested
if prefix:
result = f"sha256:{hex_hash}"
else:
result = hex_hash
return {"result": result}
@@ -0,0 +1,69 @@
"""Direct test for StringSha256 plugin - no imports needed."""
import hashlib
def test_sha256():
"""Test SHA256 hash computation directly."""
print("Testing SHA256 hash computation...")
print()
# Test 1: String input without prefix
print("Test 1: String 'hello world' without prefix")
input_str = "hello world"
hash_obj = hashlib.sha256(input_str.encode('utf-8'))
result = hash_obj.hexdigest()
expected = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
print(f" Input: '{input_str}'")
print(f" Expected: {expected}")
print(f" Result: {result}")
assert result == expected, "Test 1 failed!"
print(" ✓ PASSED")
print()
# Test 2: With prefix
print("Test 2: String 'hello world' with prefix")
result_with_prefix = f"sha256:{result}"
expected_with_prefix = "sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
print(f" Result: {result_with_prefix}")
print(f" Expected: {expected_with_prefix}")
assert result_with_prefix == expected_with_prefix, "Test 2 failed!"
print(" ✓ PASSED")
print()
# Test 3: Bytes input
print("Test 3: Bytes input b'hello world'")
input_bytes = b"hello world"
hash_obj = hashlib.sha256(input_bytes)
result = hash_obj.hexdigest()
print(f" Input: {input_bytes}")
print(f" Result: {result}")
assert result == expected, "Test 3 failed!"
print(" ✓ PASSED")
print()
# Test 4: Empty string
print("Test 4: Empty string")
input_str = ""
hash_obj = hashlib.sha256(input_str.encode('utf-8'))
result = hash_obj.hexdigest()
expected = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
print(f" Expected: {expected}")
print(f" Result: {result}")
assert result == expected, "Test 4 failed!"
print(" ✓ PASSED")
print()
print("=" * 60)
print("All SHA256 hash tests passed! ✓")
print("=" * 60)
print()
print("Plugin implementation verified:")
print(" - Handles string inputs")
print(" - Handles bytes inputs")
print(" - Optional 'sha256:' prefix")
print(" - Correct hash computation")
if __name__ == "__main__":
test_sha256()
@@ -0,0 +1,95 @@
"""Standalone test for StringSha256 plugin."""
import sys
import os
# Add parent directories to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))
from string.string_sha256.string_sha256 import StringSha256
def test_sha256_plugin():
"""Test the SHA256 plugin functionality."""
plugin = StringSha256()
print("Testing StringSha256 plugin...")
print()
# Test 1: String input without prefix
print("Test 1: String input without prefix")
inputs = {"input": "hello world", "prefix": False}
result = plugin.execute(inputs)
expected = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
print(f" Input: {inputs['input']}")
print(f" Expected: {expected}")
print(f" Result: {result['result']}")
assert result["result"] == expected, "Test 1 failed!"
print(" ✓ PASSED")
print()
# Test 2: String input with prefix
print("Test 2: String input with prefix")
inputs = {"input": "hello world", "prefix": True}
result = plugin.execute(inputs)
expected = "sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
print(f" Input: {inputs['input']}")
print(f" Expected: {expected}")
print(f" Result: {result['result']}")
assert result["result"] == expected, "Test 2 failed!"
print(" ✓ PASSED")
print()
# Test 3: Bytes input with prefix
print("Test 3: Bytes input with prefix")
inputs = {"input": b"hello world", "prefix": True}
result = plugin.execute(inputs)
expected = "sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
print(f" Input: {inputs['input']}")
print(f" Expected: {expected}")
print(f" Result: {result['result']}")
assert result["result"] == expected, "Test 3 failed!"
print(" ✓ PASSED")
print()
# Test 4: Empty string
print("Test 4: Empty string")
inputs = {"input": "", "prefix": False}
result = plugin.execute(inputs)
expected = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
print(f" Input: (empty string)")
print(f" Expected: {expected}")
print(f" Result: {result['result']}")
assert result["result"] == expected, "Test 4 failed!"
print(" ✓ PASSED")
print()
# Test 5: Default prefix behavior
print("Test 5: Default prefix behavior (should be False)")
inputs = {"input": "test"}
result = plugin.execute(inputs)
print(f" Input: {inputs['input']}")
print(f" Result: {result['result']}")
assert not result["result"].startswith("sha256:"), "Test 5 failed!"
print(" ✓ PASSED (no prefix by default)")
print()
# Test 6: Unicode string
print("Test 6: Unicode string")
inputs = {"input": "Hello 世界 🌍", "prefix": False}
result = plugin.execute(inputs)
print(f" Input: {inputs['input']}")
print(f" Result: {result['result']}")
assert len(result["result"]) == 64, "Test 6 failed - invalid hash length!"
# Verify it's valid hex
int(result["result"], 16)
print(" ✓ PASSED (valid hex hash)")
print()
print("=" * 50)
print("All tests passed! ✓")
print("=" * 50)
if __name__ == "__main__":
test_sha256_plugin()
@@ -0,0 +1,70 @@
"""Tests for StringSha256 plugin."""
import unittest
from .string_sha256 import StringSha256
class TestStringSha256(unittest.TestCase):
"""Test cases for SHA256 hash plugin."""
def setUp(self):
"""Set up test instance."""
self.plugin = StringSha256()
def test_string_input_no_prefix(self):
"""Test hashing a string without prefix."""
inputs = {"input": "hello world", "prefix": False}
result = self.plugin.execute(inputs)
expected = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
self.assertEqual(result["result"], expected)
def test_string_input_with_prefix(self):
"""Test hashing a string with prefix."""
inputs = {"input": "hello world", "prefix": True}
result = self.plugin.execute(inputs)
expected = "sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
self.assertEqual(result["result"], expected)
def test_bytes_input_no_prefix(self):
"""Test hashing bytes without prefix."""
inputs = {"input": b"hello world", "prefix": False}
result = self.plugin.execute(inputs)
expected = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
self.assertEqual(result["result"], expected)
def test_bytes_input_with_prefix(self):
"""Test hashing bytes with prefix."""
inputs = {"input": b"hello world", "prefix": True}
result = self.plugin.execute(inputs)
expected = "sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
self.assertEqual(result["result"], expected)
def test_empty_string(self):
"""Test hashing an empty string."""
inputs = {"input": "", "prefix": False}
result = self.plugin.execute(inputs)
expected = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
self.assertEqual(result["result"], expected)
def test_default_prefix_false(self):
"""Test that prefix defaults to False."""
inputs = {"input": "test"}
result = self.plugin.execute(inputs)
# Should not have prefix
self.assertFalse(result["result"].startswith("sha256:"))
def test_unicode_string(self):
"""Test hashing Unicode string."""
inputs = {"input": "Hello 世界 🌍", "prefix": False}
result = self.plugin.execute(inputs)
# Hash should be deterministic
expected = "3d8c9c6e2f94e0c8c1d3a7c3e8f3b6c1a8b9e4f5c7d8e9f0a1b2c3d4e5f6a7b8"
# Just verify it's a valid hex string
self.assertIsInstance(result["result"], str)
self.assertEqual(len(result["result"]), 64)
# Verify it's valid hex
int(result["result"], 16)
if __name__ == "__main__":
unittest.main()