#!/usr/bin/env python3
"""
Schema Compliance Validator

This script validates that the operation implementation matches
the spirit and intent of the schema.json specification.
"""

import sys
import json
import inspect
from pathlib import Path

# Add backend to path
sys.path.insert(0, str(Path(__file__).parent.parent / 'backend'))

from operations import OperationExecutor
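
# Typical invocation (the exact path depends on where this script lives in
# the repository, e.g. a scripts/ directory):
#
#   python3 <path-to-this-script>
#
# main() returns 0 when every check passes and 1 otherwise, so the script can
# serve as a CI gate.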


def check_operation_coverage():
    """Check that all schema operations are implemented."""
    print("=" * 70)
    print("1. Operation Coverage Check")
    print("=" * 70)

    # Load schema
    schema_path = Path(__file__).parent.parent / 'schema.json'
    with open(schema_path) as f:
        schema = json.load(f)

    allowed_ops = set(schema['ops']['allowed'])

    # Get implemented operations
    executor = OperationExecutor({}, {}, Path('/tmp'))
    implemented_ops = set()

    for name, method in inspect.getmembers(executor, predicate=inspect.ismethod):
        if not name.startswith('_') and name != 'execute_pipeline':
            # Convert method name back to operation name
            # e.g., auth_require_scopes -> auth.require_scopes
            # Replace first underscore with dot, keep rest as underscores
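            # Note: this assumes the op category itself never contains an
            # underscore; a hypothetical op like "rate_limit.check" would be
            # mapped back incorrectly by this split.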
            if '_' in name:
                category, rest = name.split('_', 1)
                op_name = f"{category}.{rest}"
                implemented_ops.add(op_name)

    print(f"\nSchema defines {len(allowed_ops)} operations")
    print(f"Implementation provides {len(implemented_ops)} operations\n")

    # Check for missing operations
    missing = allowed_ops - implemented_ops
    if missing:
        print("❌ Missing implementations:")
        for op in sorted(missing):
            print(f"  - {op}")
    else:
        print("✅ All schema operations are implemented!")

    # Check for extra operations
    extra = implemented_ops - allowed_ops
    if extra:
        print("\n⚠️ Extra operations not in schema:")
        for op in sorted(extra):
            print(f"  - {op}")

    return len(missing) == 0


def check_route_compatibility():
    """Check that operations work with real route pipelines."""
    print("\n" + "=" * 70)
    print("2. Route Pipeline Compatibility Check")
    print("=" * 70)

    schema_path = Path(__file__).parent.parent / 'schema.json'
    with open(schema_path) as f:
        schema = json.load(f)

    from operations import ExecutionContext

    executor = OperationExecutor(kv_store={}, index_store={}, blob_dir=Path('/tmp/test'))
    routes = schema['api']['routes']

    all_valid = True

    for route in routes:
        route_id = route['id']
        pipeline = route['pipeline']

        print(f"\nRoute: {route_id}")
        print(f"  Method: {route['method']} {route['path']}")

        # Check each operation in the pipeline
        valid_ops = 0
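        # Schema op names are dotted (e.g. "kv.cas_put") while executor
        # methods use underscores, so map "." -> "_" and check that a method
        # with that name exists on the executor.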
        for step in pipeline:
            op_name = step['op']
            method_name = op_name.replace('.', '_')

            if hasattr(executor, method_name):
                valid_ops += 1
            else:
                print(f"  ❌ Operation not implemented: {op_name}")
                all_valid = False

        if valid_ops == len(pipeline):
            print(f"  ✅ All {valid_ops} operations implemented")

    return all_valid


def check_operation_semantics():
    """Check that operations follow schema semantics."""
    print("\n" + "=" * 70)
    print("3. Operation Semantics Check")
    print("=" * 70)

    schema_path = Path(__file__).parent.parent / 'schema.json'
    with open(schema_path) as f:
        schema = json.load(f)

    from operations import ExecutionContext

    executor = OperationExecutor(kv_store={}, index_store={}, blob_dir=Path('/tmp/test'))

    print("\n✓ Checking transaction semantics...")
    ctx = ExecutionContext({}, {})

    # Test transaction semantics
    try:
        executor.txn_begin(ctx, {'isolation': 'serializable'})
        if not ctx.transaction_active:
            print("  ❌ txn.begin did not set transaction_active")
            return False
        print("  ✅ txn.begin correctly sets transaction state")

        executor.txn_commit(ctx, {})
        if ctx.transaction_active:
            print("  ❌ txn.commit did not clear transaction_active")
            return False
        print("  ✅ txn.commit correctly clears transaction state")
    except Exception as e:
        print(f"  ❌ Transaction operations failed: {e}")
        return False

    # Test kv.cas_put semantics (if_absent)
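    # if_absent=True turns the put into "create only": with 'data/test'
    # already present in the KV store, the call is expected to raise
    # ValueError rather than overwrite the existing value.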
print("\n✓ Checking kv.cas_put semantics (if_absent behavior)...")
|
|
ctx = ExecutionContext({'key': 'test'}, {})
|
|
executor.kv_store['data/test'] = 'existing'
|
|
|
|
try:
|
|
executor.kv_cas_put(ctx, {
|
|
'doc': 'test',
|
|
'key': 'data/test',
|
|
'if_absent': True,
|
|
'value': 'new'
|
|
})
|
|
print(" ❌ kv.cas_put should fail when if_absent=True and key exists")
|
|
return False
|
|
except ValueError:
|
|
print(" ✅ kv.cas_put correctly enforces if_absent constraint")
|
|
|
|
# Test cache semantics
|
|
print("\n✓ Checking cache hit/miss semantics...")
|
|
ctx = ExecutionContext({'name': 'test'}, {})
|
|
executor.cache_get(ctx, {
|
|
'kind': 'response',
|
|
'key': 'nonexistent',
|
|
'hit_out': 'hit',
|
|
'value_out': 'val'
|
|
})
|
|
|
|
if ctx.get_var('hit') != False:
|
|
print(" ❌ cache.get should return hit=False for missing keys")
|
|
return False
|
|
print(" ✅ cache.get correctly handles cache misses")
|
|
|
|
# Test conditional responses
|
|
print("\n✓ Checking conditional response semantics...")
|
|
ctx = ExecutionContext({}, {})
|
|
ctx.set_var('item', None)
|
|
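    # respond.error is conditional here: the 'when' clause ({'is_null': '$item'})
    # is expected to fire only because the 'item' variable was set to None
    # above. The '$' prefix marks a reference to a context variable rather
    # than a literal value.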
    executor.respond_error(ctx, {
        'when': {'is_null': '$item'},
        'status': 404,
        'code': 'NOT_FOUND',
        'message': 'Not found'
    })

    if not ctx.response or ctx.response['status'] != 404:
        print("  ❌ Conditional response not working correctly")
        return False
    print("  ✅ Conditional responses work correctly")

    # Test variable interpolation
    print("\n✓ Checking variable interpolation...")
    ctx = ExecutionContext({'namespace': 'acme', 'name': 'pkg'}, {'sub': 'user1'})
    interpolated = ctx.interpolate('artifact/{namespace}/{name} by {principal.sub}')

    if interpolated != 'artifact/acme/pkg by user1':
        print(f"  ❌ Interpolation failed: {interpolated}")
        return False
    print("  ✅ Variable interpolation works correctly")

    return True


def check_storage_semantics():
    """Check storage operation semantics match schema."""
    print("\n" + "=" * 70)
    print("4. Storage Semantics Check")
    print("=" * 70)

    schema_path = Path(__file__).parent.parent / 'schema.json'
    with open(schema_path) as f:
        schema = json.load(f)

    from operations import ExecutionContext
    import tempfile

    # Check blob store semantics
    print("\n✓ Checking blob store semantics...")
    with tempfile.TemporaryDirectory() as tmpdir:
        executor = OperationExecutor(kv_store={}, index_store={}, blob_dir=Path(tmpdir))
        ctx = ExecutionContext({'body_bytes': b'test data'}, {})

        # Put blob
        executor.blob_put(ctx, {
            'store': 'primary',
            'from': 'request.body',
            'out': 'digest',
            'out_size': 'size'
        })

        digest = ctx.get_var('digest')
        if not digest or not digest.startswith('sha256:'):
            print("  ❌ blob.put should return sha256 digest")
            return False
        print(f"  ✅ blob.put returns content-addressed digest: {digest[:20]}...")

        # Verify blob is stored with content-addressing path structure
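        # Expected on-disk layout shards by the first two hex byte pairs of
        # the digest (<aa>/<bb>/<full digest>), which keeps any single
        # directory from accumulating too many entries.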
        clean_digest = digest.replace('sha256:', '')
        expected_path = Path(tmpdir) / clean_digest[:2] / clean_digest[2:4] / clean_digest

        if not expected_path.exists():
            print(f"  ❌ Blob not stored at expected path: {expected_path}")
            return False
        print("  ✅ Blob stored with content-addressed path structure")

        # Get blob back
        ctx2 = ExecutionContext({}, {})
        ctx2.set_var('digest_val', digest)
        executor.blob_get(ctx2, {
            'store': 'primary',
            'digest': '$digest_val',
            'out': 'content'
        })

        content = ctx2.get_var('content')
        if content != b'test data':
            print("  ❌ blob.get returned incorrect content")
            return False
        print("  ✅ blob.get retrieves correct content")

    # Check document store semantics
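    # The listings below are informational: they echo the document key
    # templates and index definitions from schema.json rather than asserting
    # anything beyond the expected keys being present.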
print("\n✓ Checking document store (KV) semantics...")
|
|
executor = OperationExecutor(kv_store={}, index_store={}, blob_dir=Path('/tmp'))
|
|
|
|
# Schema defines document configs like artifact_meta
|
|
doc_configs = schema['storage']['documents']
|
|
|
|
for doc_name, doc_config in doc_configs.items():
|
|
key_template = doc_config['key_template']
|
|
print(f" - Document type: {doc_name}")
|
|
print(f" Key template: {key_template}")
|
|
|
|
print(" ✅ Document store key templates match schema patterns")
|
|
|
|
# Check index semantics
|
|
print("\n✓ Checking index semantics...")
|
|
indexes = schema['indexes']
|
|
|
|
for index_name, index_config in indexes.items():
|
|
source = index_config['source_document']
|
|
keys = index_config['keys']
|
|
print(f" - Index: {index_name}")
|
|
print(f" Source: {source}")
|
|
print(f" Keys: {[k['name'] for k in keys]}")
|
|
|
|
print(" ✅ Index structures match schema definitions")
|
|
|
|
return True
|
|
|
|
|
|


def check_auth_semantics():
    """Check authentication/authorization semantics."""
    print("\n" + "=" * 70)
    print("5. Authentication & Authorization Check")
    print("=" * 70)

    schema_path = Path(__file__).parent.parent / 'schema.json'
    with open(schema_path) as f:
        schema = json.load(f)

    from operations import ExecutionContext

    executor = OperationExecutor(kv_store={}, index_store={}, blob_dir=Path('/tmp'))

    # Check scope definitions
    print("\n✓ Checking scope definitions...")
    auth_scopes = schema['auth']['scopes']

    for scope in auth_scopes:
        print(f"  - Scope: {scope['name']}")
        print(f"    Actions: {', '.join(scope['actions'][:3])}{'...' if len(scope['actions']) > 3 else ''}")

    # Test scope enforcement
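    # auth.require_scopes is expected to raise PermissionError when the
    # principal's 'scopes' list does not include every requested scope, and
    # to return normally when it does.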
print("\n✓ Testing scope enforcement...")
|
|
|
|
# Test read scope
|
|
ctx = ExecutionContext({}, {'sub': 'user', 'scopes': ['read']})
|
|
try:
|
|
executor.auth_require_scopes(ctx, {'scopes': ['read']})
|
|
print(" ✅ Read scope correctly granted")
|
|
except PermissionError:
|
|
print(" ❌ Read scope should be granted")
|
|
return False
|
|
|
|
# Test write scope denial
|
|
try:
|
|
executor.auth_require_scopes(ctx, {'scopes': ['write']})
|
|
print(" ❌ Write scope should be denied")
|
|
return False
|
|
except PermissionError:
|
|
print(" ✅ Write scope correctly denied")
|
|
|
|
# Test admin scope
|
|
ctx_admin = ExecutionContext({}, {'sub': 'admin', 'scopes': ['read', 'write', 'admin']})
|
|
try:
|
|
executor.auth_require_scopes(ctx_admin, {'scopes': ['admin']})
|
|
print(" ✅ Admin scope correctly granted")
|
|
except PermissionError:
|
|
print(" ❌ Admin scope should be granted")
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|


def check_event_log_semantics():
    """Check event log and replication semantics."""
    print("\n" + "=" * 70)
    print("6. Event Log & Replication Check")
    print("=" * 70)

    schema_path = Path(__file__).parent.parent / 'schema.json'
    with open(schema_path) as f:
        schema = json.load(f)

    from operations import ExecutionContext

    executor = OperationExecutor(kv_store={}, index_store={}, blob_dir=Path('/tmp'))

    # Check event types
    print("\n✓ Checking event type definitions...")
    event_types = schema['events']['types']

    for event_type in event_types:
        print(f"  - Event type: {event_type['name']}")
        print(f"    Durable: {event_type.get('durable', True)}")

    # Test event emission
    print("\n✓ Testing event emission...")
    ctx = ExecutionContext({'ns': 'test', 'name': 'pkg'}, {'sub': 'user1'})
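
    # Payload values are templates: '{ns}' and '{name}' come from the request
    # vars above and '{principal.sub}' from the principal, so the logged event
    # should contain the interpolated values rather than the literal braces.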
    executor.emit_event(ctx, {
        'type': 'artifact.published',
        'payload': {
            'namespace': '{ns}',
            'name': '{name}',
            'by': '{principal.sub}'
        }
    })

    if len(executor.event_log) != 1:
        print("  ❌ Event not added to log")
        return False

    event = executor.event_log[0]
    if event['type'] != 'artifact.published':
        print("  ❌ Event type incorrect")
        return False

    if event['payload']['namespace'] != 'test':
        print("  ❌ Event payload interpolation failed")
        return False

    print("  ✅ Events correctly emitted with interpolated payloads")

    # Check replication config
    print("\n✓ Checking replication configuration...")
    replication = schema['replication']
    print(f"  - Mode: {replication['mode']}")
    print(f"  - Strategy: {replication['shipping']['strategy']}")
    print(f"  - Dedupe: {replication['shipping']['dedupe']['enabled']}")
    print("  ✅ Replication configuration follows schema")

    return True


def main():
    """Run all compliance checks."""
    print("\n")
    print("╔" + "=" * 68 + "╗")
    print("║" + " " * 15 + "Schema Compliance Validation" + " " * 25 + "║")
    print("╚" + "=" * 68 + "╝")
    print("\nValidating operation implementation against schema.json...")
    print()

    results = []
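
    # Each check prints its own detailed report and returns True on success;
    # collect (name, passed) pairs for the summary table below.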
    results.append(("Operation Coverage", check_operation_coverage()))
    results.append(("Route Compatibility", check_route_compatibility()))
    results.append(("Operation Semantics", check_operation_semantics()))
    results.append(("Storage Semantics", check_storage_semantics()))
    results.append(("Auth Semantics", check_auth_semantics()))
    results.append(("Event Log Semantics", check_event_log_semantics()))

    # Summary
    print("\n" + "=" * 70)
    print("Validation Summary")
    print("=" * 70)

    for name, passed in results:
        status = "✅ PASS" if passed else "❌ FAIL"
        print(f"{status:12} {name}")

    all_passed = all(result[1] for result in results)

    print("\n" + "=" * 70)
    if all_passed:
        print("✅ All checks passed! Implementation matches schema spirit.")
    else:
        print("❌ Some checks failed. Implementation needs adjustments.")
    print("=" * 70)
    print()

    return 0 if all_passed else 1


if __name__ == '__main__':
    sys.exit(main())