# goodpackagerepo/backend/operations.py  (569 lines, 22 KiB, Python)
"""
Operation Executor - Implements the closed-world operation vocabulary.
This module provides the actual implementation for all operations defined
in the schema.json operations vocabulary. Each operation is a function that
can be executed as part of a pipeline.
"""
import hashlib
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests
class ExecutionContext:
    """Context for pipeline execution with variable storage.

    Holds the incoming request data, the authenticated principal, and a
    mutable variable namespace that pipeline operations read and write as
    execution proceeds.
    """

    def __init__(self, request_data: Dict[str, Any], principal: Optional[Dict[str, Any]] = None):
        """Create a fresh context for a single request.

        Args:
            request_data: Parsed request fields (path params, query, body).
            principal: Authenticated identity attributes; defaults to ``{}``.
        """
        self.variables: Dict[str, Any] = {}
        self.request_data = request_data
        self.principal = principal or {}
        self.transaction_active = False
        # Set by a respond.* operation; pipeline execution stops once non-None.
        self.response: Optional[Dict[str, Any]] = None

    def set_var(self, name: str, value: Any) -> None:
        """Set a variable in the execution context."""
        self.variables[name] = value

    def get_var(self, name: str) -> Any:
        """Get a variable; accepts either ``name`` or ``$name`` references.

        Returns None when the variable is not set.
        """
        if name.startswith('$'):
            return self.variables.get(name[1:])
        return self.variables.get(name)

    def interpolate(self, template: str) -> str:
        """Interpolate placeholders in a template string.

        Three placeholder forms are substituted, in this order:
        ``{field}`` from request data, ``$variable`` from context
        variables, and ``{principal.field}`` from the principal.
        """
        result = template
        for key, value in self.request_data.items():
            result = result.replace(f"{{{key}}}", str(value))
        # Substitute longer variable names first: because $refs are not
        # brace-delimited, replacing $a before $ab would corrupt $ab
        # (prefix collision). Sorting by length descending prevents that.
        for key in sorted(self.variables, key=len, reverse=True):
            result = result.replace(f"${key}", str(self.variables[key]))
        for key, value in self.principal.items():
            result = result.replace(f"{{principal.{key}}}", str(value))
        return result
class OperationExecutor:
    """Executor for pipeline operations.

    Implements the closed-world operation vocabulary over three backing
    stores: a key-value mapping, an index mapping (composite key -> list
    of rows), and a content-addressed blob directory. Every operation
    method has the signature ``(ctx, args)`` and communicates results
    through the :class:`ExecutionContext` variable namespace.
    """

    def __init__(self, kv_store: Dict, index_store: Dict, blob_dir: Path):
        """Bind the executor to its backing stores.

        Args:
            kv_store: Mapping used by the kv.* operations.
            index_store: Mapping of composite index key -> list of rows.
            blob_dir: Root directory for sha256-addressed blob storage.
        """
        self.kv_store = kv_store
        self.index_store = index_store
        self.blob_dir = blob_dir
        self.cache_store: Dict[str, Any] = {}
        self.event_log: List[Dict[str, Any]] = []

    # ========================================================================
    # Authentication Operations
    # ========================================================================
    def auth_require_scopes(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Require that the principal holds at least one of args['scopes'].

        Raises:
            PermissionError: If the principal has none of the required scopes.
        """
        required_scopes = args.get('scopes', [])
        user_scopes = ctx.principal.get('scopes', [])
        # ANY-of semantics: a single matching scope is sufficient.
        if not any(scope in user_scopes for scope in required_scopes):
            raise PermissionError(f"Required scopes: {required_scopes}")

    # ========================================================================
    # Parsing Operations
    # ========================================================================
    def parse_path(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """No-op: path parameters are already in ctx.request_data (Flask routing)."""
        pass

    def parse_query(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Copy query parameters into the context variable args['out']."""
        out = args.get('out', 'query_params')
        ctx.set_var(out, ctx.request_data.get('query_params', {}))

    def parse_json(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Copy the parsed JSON request body into the context variable args['out']."""
        out = args.get('out', 'body')
        ctx.set_var(out, ctx.request_data.get('body', {}))

    # ========================================================================
    # Normalization and Validation Operations
    # ========================================================================
    def normalize_entity(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """No-op placeholder: normalization is handled by normalize_entity in app.py."""
        pass

    def validate_entity(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """No-op placeholder: validation is handled by validate_entity in app.py."""
        pass

    def validate_json_schema(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Validate a resolved value against the JSON schema in args['schema'].

        Raises:
            ValueError: If the value does not conform to the schema.
        """
        import jsonschema  # local import keeps the dependency optional

        schema = args.get('schema', {})
        value = self._resolve_value(ctx, args.get('value'))
        try:
            jsonschema.validate(value, schema)
        except jsonschema.ValidationError as e:
            raise ValueError(f"JSON schema validation failed: {e.message}")

    # ========================================================================
    # Transaction Operations
    # ========================================================================
    def txn_begin(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Begin a transaction (flag only; no real database backing yet)."""
        if ctx.transaction_active:
            raise RuntimeError("Transaction already active")
        ctx.transaction_active = True

    def txn_commit(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Commit the current transaction.

        In production this would commit to the actual database.
        """
        if not ctx.transaction_active:
            raise RuntimeError("No active transaction")
        ctx.transaction_active = False

    def txn_abort(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Abort the current transaction.

        In production this would roll back the database transaction.
        """
        if not ctx.transaction_active:
            raise RuntimeError("No active transaction")
        ctx.transaction_active = False

    # ========================================================================
    # Key-Value Store Operations
    # ========================================================================
    def kv_get(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Look up the interpolated key; store the value (or None) in args['out']."""
        key = ctx.interpolate(args.get('key'))
        ctx.set_var(args.get('out'), self.kv_store.get(key))

    def kv_put(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Store the resolved args['value'] under the interpolated key."""
        value = self._resolve_value(ctx, args.get('value'))
        key = ctx.interpolate(args.get('key'))
        self.kv_store[key] = value

    def kv_cas_put(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Conditional put: with args['if_absent'], fail if the key already exists.

        Raises:
            ValueError: When if_absent is set and the key is present.
        """
        value = self._resolve_value(ctx, args.get('value'))
        if_absent = args.get('if_absent', False)
        key = ctx.interpolate(args.get('key'))
        if if_absent and key in self.kv_store:
            raise ValueError(f"CAS operation failed: Key '{key}' already exists (if_absent constraint violated)")
        self.kv_store[key] = value

    def kv_delete(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Delete the interpolated key from the KV store; absent keys are ignored."""
        key = ctx.interpolate(args.get('key'))
        self.kv_store.pop(key, None)

    # ========================================================================
    # Blob Store Operations
    # ========================================================================
    def _blob_path(self, digest: str) -> Path:
        """Map a 'sha256:<hex>' digest to its two-level fan-out path under blob_dir."""
        clean = digest.replace('sha256:', '')
        return self.blob_dir / clean[:2] / clean[2:4] / clean

    def blob_get(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Read the blob for the interpolated digest into args['out'].

        Raises:
            FileNotFoundError: If no blob exists for the digest.
        """
        digest = ctx.interpolate(args.get('digest'))
        blob_path = self._blob_path(digest)
        if not blob_path.exists():
            raise FileNotFoundError(f"Blob not found: {digest}")
        ctx.set_var(args.get('out'), blob_path.read_bytes())

    def blob_put(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Store a blob; emit its digest and size as context variables.

        The source is either the raw request body ('request.body') or any
        resolvable value. Outputs go to args['out'] (default 'digest') and
        args['out_size'] (default 'blob_size').
        """
        from_source = args.get('from')
        out_digest = args.get('out', 'digest')
        out_size = args.get('out_size', 'blob_size')
        if from_source == 'request.body':
            blob_data = ctx.request_data.get('body_bytes', b'')
        else:
            blob_data = self._resolve_value(ctx, from_source)
        if blob_data is None:
            # A missing source stores an empty blob rather than crashing
            # hashlib with a TypeError.
            blob_data = b''
        if isinstance(blob_data, str):
            blob_data = blob_data.encode('utf-8')
        digest = 'sha256:' + hashlib.sha256(blob_data).hexdigest()
        blob_path = self._blob_path(digest)
        blob_path.parent.mkdir(parents=True, exist_ok=True)
        blob_path.write_bytes(blob_data)
        ctx.set_var(out_digest, digest)
        ctx.set_var(out_size, len(blob_data))

    def blob_verify_digest(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Check the digest's format only.

        A full implementation would verify the digest against the blob's
        actual content.

        Raises:
            ValueError: If the digest lacks the '<algo>:' prefix.
        """
        algo = args.get('algo', 'sha256')
        digest = ctx.interpolate(args.get('digest'))
        if not digest.startswith(algo + ':'):
            raise ValueError(f"Invalid digest format for {algo}")

    # ========================================================================
    # Index Operations
    # ========================================================================
    def _index_key(self, ctx: "ExecutionContext", key: Dict[str, Any]) -> str:
        """Build a composite index key: interpolate string parts, join with '/'."""
        parts = []
        for v in key.values():
            interpolated = ctx.interpolate(v) if isinstance(v, str) else v
            parts.append(str(interpolated))
        return '/'.join(parts)

    def index_query(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Fetch up to args['limit'] (default 100) rows for the key into args['out']."""
        limit = args.get('limit', 100)
        index_key = self._index_key(ctx, args.get('key', {}))
        ctx.set_var(args.get('out'), self.index_store.get(index_key, [])[:limit])

    def index_upsert(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Append the resolved value to the row list for the composite key."""
        value = self._resolve_value(ctx, args.get('value'))
        index_key = self._index_key(ctx, args.get('key', {}))
        self.index_store.setdefault(index_key, []).append(value)

    def index_delete(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Drop all rows for the composite key; absent keys are ignored."""
        index_key = self._index_key(ctx, args.get('key', {}))
        self.index_store.pop(index_key, None)

    # ========================================================================
    # Cache Operations
    # ========================================================================
    def cache_get(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Probe the cache; set hit flag and value into the context.

        Writes args['hit_out'] (default 'cache_hit') and args['value_out']
        (default 'cached_value', None on a miss).
        """
        kind = args.get('kind', 'response')
        hit_out = args.get('hit_out', 'cache_hit')
        value_out = args.get('value_out', 'cached_value')
        key = ctx.interpolate(args.get('key'))
        cache_key = f"{kind}:{key}"
        if cache_key in self.cache_store:
            ctx.set_var(hit_out, True)
            ctx.set_var(value_out, self.cache_store[cache_key])
        else:
            ctx.set_var(hit_out, False)
            ctx.set_var(value_out, None)

    def cache_put(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Store the resolved value under '<kind>:<key>'.

        NOTE: args['ttl_seconds'] (default 300) is accepted but not yet
        enforced; in production the cache entry would carry this TTL.
        """
        kind = args.get('kind', 'response')
        value = self._resolve_value(ctx, args.get('value'))
        key = ctx.interpolate(args.get('key'))
        self.cache_store[f"{kind}:{key}"] = value

    # ========================================================================
    # Proxy Operations
    # ========================================================================
    def proxy_fetch(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Fetch from upstream proxy.

        Note: This is a placeholder implementation. In production, this would:
        1. Look up the upstream configuration from schema
        2. Make an actual HTTP request with proper timeouts and retries
        3. Handle authentication based on upstream.auth settings
        4. Return the actual response
        """
        upstream = args.get('upstream')
        method = args.get('method', 'GET')
        path = ctx.interpolate(args.get('path'))
        # TODO: Implement actual proxy fetch with the requests library:
        # upstream_config = get_upstream_config(upstream)
        # response = requests.request(
        #     method=method,
        #     url=upstream_config['base_url'] + path,
        #     timeout=(upstream_config['timeouts_ms']['connect']/1000,
        #              upstream_config['timeouts_ms']['read']/1000))
        # Placeholder response for now.
        ctx.set_var(args.get('out'), {
            'status': 200,
            'body': None,
            'headers': {}
        })

    # ========================================================================
    # Response Operations
    # ========================================================================
    def respond_json(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Set a JSON response (default status 200) if args['when'] holds."""
        if not self._check_condition(ctx, args.get('when')):
            return
        ctx.response = {
            'type': 'json',
            'status': args.get('status', 200),
            'body': self._resolve_value(ctx, args.get('body'))
        }

    def respond_bytes(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Set a binary response (default status 200) if args['when'] holds."""
        if not self._check_condition(ctx, args.get('when')):
            return
        ctx.response = {
            'type': 'bytes',
            'status': args.get('status', 200),
            'body': self._resolve_value(ctx, args.get('body')),
            'headers': self._resolve_value(ctx, args.get('headers', {}))
        }

    def respond_redirect(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Set a redirect response (default status 307) if args['when'] holds."""
        if not self._check_condition(ctx, args.get('when')):
            return
        ctx.response = {
            'type': 'redirect',
            'status': args.get('status', 307),
            'location': ctx.interpolate(args.get('location'))
        }

    def respond_error(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Set an error response (default 400/'ERROR') if args['when'] holds."""
        if not self._check_condition(ctx, args.get('when')):
            return
        ctx.response = {
            'type': 'error',
            'status': args.get('status', 400),
            'body': {
                'error': {
                    'code': args.get('code', 'ERROR'),
                    'message': args.get('message', 'An error occurred')
                }
            }
        }

    # ========================================================================
    # Event Operations
    # ========================================================================
    def emit_event(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Append a timestamped event (type + resolved payload) to the event log."""
        self.event_log.append({
            'type': args.get('type'),
            'payload': self._resolve_value(ctx, args.get('payload')),
            'timestamp': self._utc_now_iso()
        })

    # ========================================================================
    # Utility Operations
    # ========================================================================
    @staticmethod
    def _utc_now_iso() -> str:
        """Current UTC time as ISO-8601 with a trailing 'Z'.

        Uses the timezone-aware now() (datetime.utcnow() is deprecated)
        and strips tzinfo so the output matches the historical
        '...<microseconds>Z' format exactly.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z'

    def time_now_iso8601(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Store the current UTC timestamp in args['out'] (default 'now')."""
        ctx.set_var(args.get('out', 'now'), self._utc_now_iso())

    def string_format(self, ctx: "ExecutionContext", args: Dict[str, Any]) -> None:
        """Interpolate args['template'] and store it in args['out'] (default 'formatted')."""
        ctx.set_var(args.get('out', 'formatted'), ctx.interpolate(args.get('template', '')))

    # ========================================================================
    # Helper Methods
    # ========================================================================
    def _resolve_value(self, ctx: "ExecutionContext", value: Any) -> Any:
        """Resolve a value: '$name' -> variable, other strings -> interpolated,
        dicts/lists -> resolved recursively, everything else passed through."""
        if isinstance(value, str):
            if value.startswith('$'):
                return ctx.get_var(value[1:])
            return ctx.interpolate(value)
        if isinstance(value, dict):
            return {k: self._resolve_value(ctx, v) for k, v in value.items()}
        if isinstance(value, list):
            return [self._resolve_value(ctx, item) for item in value]
        return value

    def _check_condition(self, ctx: "ExecutionContext", condition: Optional[Dict]) -> bool:
        """Evaluate a condition dict; a missing/unknown condition is True.

        Supported forms: equals [a, b], is_null, is_not_null, is_empty,
        not_in [value, list]. Operands are resolved before comparison.
        """
        if not condition:
            return True
        if 'equals' in condition:
            first, second = condition['equals'][0], condition['equals'][1]
            return self._resolve_value(ctx, first) == self._resolve_value(ctx, second)
        if 'is_null' in condition:
            return self._resolve_value(ctx, condition['is_null']) is None
        if 'is_not_null' in condition:
            return self._resolve_value(ctx, condition['is_not_null']) is not None
        if 'is_empty' in condition:
            return not self._resolve_value(ctx, condition['is_empty'])
        if 'not_in' in condition:
            check_value = self._resolve_value(ctx, condition['not_in'][0])
            check_list = self._resolve_value(ctx, condition['not_in'][1])
            return check_value not in check_list
        return True

    # ========================================================================
    # Pipeline Execution
    # ========================================================================
    def execute_pipeline(self, pipeline: List[Dict[str, Any]], ctx: "ExecutionContext") -> Optional[Dict]:
        """Run pipeline steps in order, stopping once a response is set.

        Each step's 'op' name maps to a method by replacing '.' with '_'.

        Raises:
            NotImplementedError: If a step names an unknown operation.
        """
        for step in pipeline:
            if ctx.response:
                break
            op_name = step.get('op')
            args = step.get('args', {})
            method_name = op_name.replace('.', '_')
            method = getattr(self, method_name, None)
            # Reject private helpers and non-callable attributes (e.g. a
            # malformed op like 'kv.store' would otherwise resolve to the
            # kv_store dict and fail with a confusing TypeError).
            if method_name.startswith('_') or not callable(method):
                raise NotImplementedError(f"Operation {op_name} not implemented")
            method(ctx, args)
        return ctx.response