mirror of
https://github.com/johndoe6345789/metabuilder.git
synced 2026-04-25 22:34:56 +00:00
Created 11 packagerepo-specific workflow plugins: - auth_verify_jwt - JWT token verification - auth_check_scopes - Scope-based authorization - parse_path - URL path parameter extraction (Express-style) - normalize_entity - Field normalization (trim, lower, unique, sort) - validate_entity - JSON schema validation - kv_get/kv_put - RocksDB key-value operations - blob_put - Filesystem blob storage with SHA-256 hashing - index_upsert - Index entry management - respond_json/respond_error - Response formatting Created string.sha256 plugin: - Compute SHA256 hash of strings/bytes - Optional "sha256:" prefix - Used by packagerepo for content-addressed storage All plugins follow standard pattern: - Class extending NodeExecutor - Factory with create() function - package.json with metadata - Access external state via runtime parameter Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
48 lines
1.6 KiB
Python
48 lines
1.6 KiB
Python
"""Workflow plugin: upsert entry in index store."""
|
|
|
|
from typing import Dict, Any
|
|
|
|
from ...base import NodeExecutor
|
|
|
|
|
|
class IndexUpsert(NodeExecutor):
|
|
"""Upsert entry in index store."""
|
|
|
|
node_type = "packagerepo.index_upsert"
|
|
category = "packagerepo"
|
|
description = "Upsert entry in index store"
|
|
|
|
def execute(self, inputs: Dict[str, Any], runtime: Any = None) -> Dict[str, Any]:
|
|
"""Upsert entry in index store."""
|
|
index_name = inputs.get("index_name")
|
|
key = inputs.get("key")
|
|
document = inputs.get("document")
|
|
|
|
if not index_name:
|
|
return {"error": "index_name is required"}
|
|
|
|
if not key:
|
|
return {"error": "key is required"}
|
|
|
|
if not document:
|
|
return {"error": "document is required"}
|
|
|
|
if not isinstance(document, dict):
|
|
return {"error": "document must be a dictionary"}
|
|
|
|
if not runtime or not hasattr(runtime, "index_store"):
|
|
return {"error": "index_store not available in runtime"}
|
|
|
|
try:
|
|
# Upsert entry in index store
|
|
# The index_store should have an upsert method that takes:
|
|
# - index_name: name of the index
|
|
# - key: unique identifier for the document
|
|
# - document: dictionary of fields to index
|
|
runtime.index_store.upsert(index_name, key, document)
|
|
|
|
return {"result": {"success": True, "index": index_name, "key": key}}
|
|
|
|
except Exception as e:
|
|
return {"error": f"failed to upsert index entry: {str(e)}", "error_code": "INDEX_UPSERT_FAILED"}
|