Files
goodpackagerepo/tests/validate_schema_compliance.py

451 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Schema Compliance Validator
This script validates that the operation implementation matches
the spirit and intent of the schema.json specification.
"""
import sys
import json
import inspect
from pathlib import Path
# Add backend to path
sys.path.insert(0, str(Path(__file__).parent.parent / 'backend'))
from operations import OperationExecutor
def check_operation_coverage():
    """Check that every operation allowed by the schema is implemented.

    Reads ``schema['ops']['allowed']`` from the ``schema.json`` located one
    level above this script's directory and compares it with the operation
    names derived from the public methods of an ``OperationExecutor``
    instance. Missing and extra operations are printed.

    Returns:
        bool: True when no schema operation lacks an implementation.
        Extra (non-schema) operations are reported but do not fail the check.
    """
    print("=" * 70)
    print("1. Operation Coverage Check")
    print("=" * 70)

    # Load schema. JSON is UTF-8 by specification, so do not depend on the
    # platform's locale default encoding.
    schema_path = Path(__file__).parent.parent / 'schema.json'
    with open(schema_path, encoding='utf-8') as f:
        schema = json.load(f)
    allowed_ops = set(schema['ops']['allowed'])

    # Collect implemented operations from the executor's public methods.
    executor = OperationExecutor({}, {}, Path('/tmp'))
    implemented_ops = set()
    for name, _method in inspect.getmembers(executor, predicate=inspect.ismethod):
        if name.startswith('_') or name == 'execute_pipeline':
            continue
        # Convert the method name back to an operation name, e.g.
        # auth_require_scopes -> auth.require_scopes. Only the first
        # underscore becomes a dot; names without an underscore are skipped.
        category, separator, rest = name.partition('_')
        if separator:
            implemented_ops.add(f"{category}.{rest}")

    print(f"\nSchema defines {len(allowed_ops)} operations")
    print(f"Implementation provides {len(implemented_ops)} operations\n")

    # Operations the schema allows but the executor does not provide.
    missing = allowed_ops - implemented_ops
    if missing:
        print("❌ Missing implementations:")
        for op in sorted(missing):
            print(f" - {op}")
    else:
        print("✅ All schema operations are implemented!")

    # Operations the executor provides that the schema does not list.
    extra = implemented_ops - allowed_ops
    if extra:
        print("\n⚠️ Extra operations not in schema:")
        for op in sorted(extra):
            print(f" - {op}")

    return not missing
def check_route_compatibility():
    """Check that every operation used by a route pipeline is implemented.

    For each entry in ``schema['api']['routes']`` the pipeline steps are
    walked and each step's ``op`` name (dots replaced by underscores) is
    looked up as an attribute on an ``OperationExecutor`` instance.

    Returns:
        bool: True when every pipeline step of every route resolves to an
        attribute of the executor, False otherwise.
    """
    print("\n" + "=" * 70)
    print("2. Route Pipeline Compatibility Check")
    print("=" * 70)

    # JSON is UTF-8 by specification; do not depend on the locale default.
    schema_path = Path(__file__).parent.parent / 'schema.json'
    with open(schema_path, encoding='utf-8') as f:
        schema = json.load(f)

    executor = OperationExecutor(kv_store={}, index_store={}, blob_dir=Path('/tmp/test'))
    routes = schema['api']['routes']
    all_valid = True

    for route in routes:
        route_id = route['id']
        pipeline = route['pipeline']
        print(f"\nRoute: {route_id}")
        print(f" Method: {route['method']} {route['path']}")

        # Count the steps whose operation exists on the executor.
        valid_ops = 0
        for step in pipeline:
            op_name = step['op']
            method_name = op_name.replace('.', '_')
            if hasattr(executor, method_name):
                valid_ops += 1
            else:
                print(f" ❌ Operation not implemented: {op_name}")
                all_valid = False

        if valid_ops == len(pipeline):
            print(f" ✅ All {valid_ops} operations implemented")

    return all_valid
def check_operation_semantics():
    """Exercise selected executor operations and verify their observable effects.

    The checks run in order and the function stops at the first failure:

    1. ``txn_begin`` / ``txn_commit`` toggle ``ctx.transaction_active``.
    2. ``kv_cas_put`` with ``if_absent=True`` raises ``ValueError`` when the
       key is already present in ``executor.kv_store``.
    3. ``cache_get`` for an unknown key leaves the ``hit`` variable False.
    4. ``respond_error`` with an ``is_null`` condition on a None variable
       sets a 404 response on the context.
    5. ``ctx.interpolate`` expands ``{name}`` and ``{principal.sub}``
       placeholders.

    Only the transaction step catches arbitrary exceptions; any other
    unexpected exception propagates to the caller.

    Returns:
        bool: True when all checks pass, False at the first failed check.
    """
    print("\n" + "=" * 70)
    print("3. Operation Semantics Check")
    print("=" * 70)

    from operations import ExecutionContext
    executor = OperationExecutor(kv_store={}, index_store={}, blob_dir=Path('/tmp/test'))

    # Test transaction semantics
    print("\n✓ Checking transaction semantics...")
    ctx = ExecutionContext({}, {})
    try:
        executor.txn_begin(ctx, {'isolation': 'serializable'})
        if not ctx.transaction_active:
            print(" ❌ txn.begin did not set transaction_active")
            return False
        print(" ✅ txn.begin correctly sets transaction state")

        executor.txn_commit(ctx, {})
        if ctx.transaction_active:
            print(" ❌ txn.commit did not clear transaction_active")
            return False
        print(" ✅ txn.commit correctly clears transaction state")
    except Exception as e:
        # Any failure inside the transaction operations is reported as a
        # failed check rather than aborting the whole validation run.
        print(f" ❌ Transaction operations failed: {e}")
        return False

    # Test kv.cas_put semantics (if_absent)
    print("\n✓ Checking kv.cas_put semantics (if_absent behavior)...")
    ctx = ExecutionContext({'key': 'test'}, {})
    # Pre-populate the key so the if_absent constraint must be violated.
    executor.kv_store['data/test'] = 'existing'
    try:
        executor.kv_cas_put(ctx, {
            'doc': 'test',
            'key': 'data/test',
            'if_absent': True,
            'value': 'new'
        })
    except ValueError:
        print(" ✅ kv.cas_put correctly enforces if_absent constraint")
    else:
        print(" ❌ kv.cas_put should fail when if_absent=True and key exists")
        return False

    # Test cache semantics
    print("\n✓ Checking cache hit/miss semantics...")
    ctx = ExecutionContext({'name': 'test'}, {})
    executor.cache_get(ctx, {
        'kind': 'response',
        'key': 'nonexistent',
        'hit_out': 'hit',
        'value_out': 'val'
    })
    # Identity comparison: the miss flag must be the boolean False itself,
    # not merely a value that compares equal to it (such as 0).
    if ctx.get_var('hit') is not False:
        print(" ❌ cache.get should return hit=False for missing keys")
        return False
    print(" ✅ cache.get correctly handles cache misses")

    # Test conditional responses
    print("\n✓ Checking conditional response semantics...")
    ctx = ExecutionContext({}, {})
    ctx.set_var('item', None)
    executor.respond_error(ctx, {
        'when': {'is_null': '$item'},
        'status': 404,
        'code': 'NOT_FOUND',
        'message': 'Not found'
    })
    if not ctx.response or ctx.response['status'] != 404:
        print(" ❌ Conditional response not working correctly")
        return False
    print(" ✅ Conditional responses work correctly")

    # Test variable interpolation
    print("\n✓ Checking variable interpolation...")
    ctx = ExecutionContext({'namespace': 'acme', 'name': 'pkg'}, {'sub': 'user1'})
    interpolated = ctx.interpolate('artifact/{namespace}/{name} by {principal.sub}')
    if interpolated != 'artifact/acme/pkg by user1':
        print(f" ❌ Interpolation failed: {interpolated}")
        return False
    print(" ✅ Variable interpolation works correctly")

    return True
def check_storage_semantics():
    """Check blob storage behaviour and report schema storage definitions.

    The blob part is an active test against a temporary directory:
    ``blob_put`` must yield a ``sha256:``-prefixed digest, the blob must
    exist at ``<dir>/<hex[:2]>/<hex[2:4]>/<hex>``, and ``blob_get`` must
    return the original bytes.

    The document and index parts only read ``schema['storage']['documents']``
    and ``schema['indexes']`` and print their key templates, source
    documents and key names; a missing key raises ``KeyError``.

    Returns:
        bool: False at the first failed blob check, True otherwise.
    """
    print("\n" + "=" * 70)
    print("4. Storage Semantics Check")
    print("=" * 70)

    # JSON is UTF-8 by specification; do not depend on the locale default.
    schema_path = Path(__file__).parent.parent / 'schema.json'
    with open(schema_path, encoding='utf-8') as f:
        schema = json.load(f)

    from operations import ExecutionContext
    import tempfile

    # Check blob store semantics
    print("\n✓ Checking blob store semantics...")
    with tempfile.TemporaryDirectory() as tmpdir:
        executor = OperationExecutor(kv_store={}, index_store={}, blob_dir=Path(tmpdir))
        ctx = ExecutionContext({'body_bytes': b'test data'}, {})

        # Put blob
        executor.blob_put(ctx, {
            'store': 'primary',
            'from': 'request.body',
            'out': 'digest',
            'out_size': 'size'
        })
        digest = ctx.get_var('digest')
        if not digest or not digest.startswith('sha256:'):
            print(" ❌ blob.put should return sha256 digest")
            return False
        print(f" ✅ blob.put returns content-addressed digest: {digest[:20]}...")

        # Verify blob is stored with content-addressing path structure:
        # two levels of two-character fan-out directories, then the digest.
        clean_digest = digest.replace('sha256:', '')
        expected_path = Path(tmpdir) / clean_digest[:2] / clean_digest[2:4] / clean_digest
        if not expected_path.exists():
            print(f" ❌ Blob not stored at expected path: {expected_path}")
            return False
        print(" ✅ Blob stored with content-addressed path structure")

        # Get blob back through a fresh context that only knows the digest.
        ctx2 = ExecutionContext({}, {})
        ctx2.set_var('digest_val', digest)
        executor.blob_get(ctx2, {
            'store': 'primary',
            'digest': '$digest_val',
            'out': 'content'
        })
        content = ctx2.get_var('content')
        if content != b'test data':
            print(" ❌ blob.get returned incorrect content")
            return False
        print(" ✅ blob.get retrieves correct content")

    # Check document store semantics. This part only reads the schema, so
    # no executor is needed here.
    print("\n✓ Checking document store (KV) semantics...")
    # Schema defines document configs like artifact_meta
    doc_configs = schema['storage']['documents']
    for doc_name, doc_config in doc_configs.items():
        key_template = doc_config['key_template']
        print(f" - Document type: {doc_name}")
        print(f" Key template: {key_template}")
    print(" ✅ Document store key templates match schema patterns")

    # Check index semantics
    print("\n✓ Checking index semantics...")
    indexes = schema['indexes']
    for index_name, index_config in indexes.items():
        source = index_config['source_document']
        keys = index_config['keys']
        print(f" - Index: {index_name}")
        print(f" Source: {source}")
        print(f" Keys: {[k['name'] for k in keys]}")
    print(" ✅ Index structures match schema definitions")

    return True
def check_auth_semantics():
    """Check scope definitions and scope enforcement.

    Prints every scope in ``schema['auth']['scopes']`` with up to three of
    its actions, then calls ``auth_require_scopes`` three times:

    * a principal holding ``read`` asking for ``read`` must be accepted;
    * the same principal asking for ``write`` must raise ``PermissionError``;
    * a principal holding ``read``, ``write`` and ``admin`` asking for
      ``admin`` must be accepted.

    Returns:
        bool: True when all three enforcement probes behave as expected,
        False at the first one that does not.
    """
    print("\n" + "=" * 70)
    print("5. Authentication & Authorization Check")
    print("=" * 70)

    # JSON is UTF-8 by specification; do not depend on the locale default.
    schema_path = Path(__file__).parent.parent / 'schema.json'
    with open(schema_path, encoding='utf-8') as f:
        schema = json.load(f)

    from operations import ExecutionContext
    executor = OperationExecutor(kv_store={}, index_store={}, blob_dir=Path('/tmp'))

    # Check scope definitions
    print("\n✓ Checking scope definitions...")
    auth_scopes = schema['auth']['scopes']
    for scope in auth_scopes:
        print(f" - Scope: {scope['name']}")
        # Show at most three actions; mark truncation with an ellipsis.
        actions = scope['actions']
        ellipsis = '...' if len(actions) > 3 else ''
        print(f" Actions: {', '.join(actions[:3])}{ellipsis}")

    # Test scope enforcement
    print("\n✓ Testing scope enforcement...")

    # Test read scope
    ctx = ExecutionContext({}, {'sub': 'user', 'scopes': ['read']})
    try:
        executor.auth_require_scopes(ctx, {'scopes': ['read']})
    except PermissionError:
        print(" ❌ Read scope should be granted")
        return False
    else:
        print(" ✅ Read scope correctly granted")

    # Test write scope denial: here raising is the expected outcome.
    try:
        executor.auth_require_scopes(ctx, {'scopes': ['write']})
    except PermissionError:
        print(" ✅ Write scope correctly denied")
    else:
        print(" ❌ Write scope should be denied")
        return False

    # Test admin scope
    ctx_admin = ExecutionContext({}, {'sub': 'admin', 'scopes': ['read', 'write', 'admin']})
    try:
        executor.auth_require_scopes(ctx_admin, {'scopes': ['admin']})
    except PermissionError:
        print(" ❌ Admin scope should be granted")
        return False
    else:
        print(" ✅ Admin scope correctly granted")

    return True
def check_event_log_semantics():
    """Check event emission and report event/replication schema settings.

    Prints every entry of ``schema['events']['types']``, then emits one
    ``artifact.published`` event through ``emit_event`` and verifies that
    ``executor.event_log`` holds exactly one event with that type and with
    the ``{ns}`` placeholder in its payload resolved to ``'test'``.
    Finally prints the mode, shipping strategy and dedupe flag from
    ``schema['replication']``.

    Returns:
        bool: False at the first failed event check, True otherwise.
    """
    print("\n" + "=" * 70)
    print("6. Event Log & Replication Check")
    print("=" * 70)

    # JSON is UTF-8 by specification; do not depend on the locale default.
    schema_path = Path(__file__).parent.parent / 'schema.json'
    with open(schema_path, encoding='utf-8') as f:
        schema = json.load(f)

    from operations import ExecutionContext
    executor = OperationExecutor(kv_store={}, index_store={}, blob_dir=Path('/tmp'))

    # Check event types
    print("\n✓ Checking event type definitions...")
    event_types = schema['events']['types']
    for event_type in event_types:
        print(f" - Event type: {event_type['name']}")
        # An event type without an explicit 'durable' key is shown as durable.
        print(f" Durable: {event_type.get('durable', True)}")

    # Test event emission
    print("\n✓ Testing event emission...")
    ctx = ExecutionContext({'ns': 'test', 'name': 'pkg'}, {'sub': 'user1'})
    executor.emit_event(ctx, {
        'type': 'artifact.published',
        'payload': {
            'namespace': '{ns}',
            'name': '{name}',
            'by': '{principal.sub}'
        }
    })

    # The executor was freshly created, so exactly one event is expected.
    if len(executor.event_log) != 1:
        print(" ❌ Event not added to log")
        return False
    event = executor.event_log[0]
    if event['type'] != 'artifact.published':
        print(" ❌ Event type incorrect")
        return False
    if event['payload']['namespace'] != 'test':
        print(" ❌ Event payload interpolation failed")
        return False
    print(" ✅ Events correctly emitted with interpolated payloads")

    # Check replication config
    print("\n✓ Checking replication configuration...")
    replication = schema['replication']
    print(f" - Mode: {replication['mode']}")
    print(f" - Strategy: {replication['shipping']['strategy']}")
    print(f" - Dedupe: {replication['shipping']['dedupe']['enabled']}")
    print(" ✅ Replication configuration follows schema")

    return True
def main():
    """Run every compliance check in order and print a pass/fail summary.

    Returns:
        int: 0 when all checks passed, 1 when at least one failed; intended
        to be used as the process exit status.
    """
    banner_rule = "=" * 68
    section_rule = "=" * 70

    print("\n")
    print(banner_rule)
    print(" " * 15 + "Schema Compliance Validation" + " " * 25)
    print(banner_rule)
    print("\nValidating operation implementation against schema.json...")
    print()

    # Label/check pairs, executed strictly in this order.
    checks = (
        ("Operation Coverage", check_operation_coverage),
        ("Route Compatibility", check_route_compatibility),
        ("Operation Semantics", check_operation_semantics),
        ("Storage Semantics", check_storage_semantics),
        ("Auth Semantics", check_auth_semantics),
        ("Event Log Semantics", check_event_log_semantics),
    )
    results = [(label, run_check()) for label, run_check in checks]

    # Summary
    print("\n" + section_rule)
    print("Validation Summary")
    print(section_rule)
    for label, ok in results:
        if ok:
            status = "✅ PASS"
        else:
            status = "❌ FAIL"
        print(f"{status:12} {label}")

    all_passed = all(ok for _label, ok in results)

    print("\n" + section_rule)
    if all_passed:
        print("✅ All checks passed! Implementation matches schema spirit.")
    else:
        print("❌ Some checks failed. Implementation needs adjustments.")
    print(section_rule)
    print()

    if all_passed:
        return 0
    return 1
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())