# goodpackagerepo/backend/config_db_sqlalchemy.py
# (472 lines, 18 KiB, Python)
"""
Database models for repository configuration using SQLAlchemy.
Stores schema.json configuration in SQLite for dynamic management.
"""
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

from models import (
    ConfigSession, RepositoryConfig, Capability, Entity, EntityField, EntityConstraint,
    BlobStore, KVStore, APIRoute, AuthScope, AuthPolicy, CachingConfig, FeaturesConfig,
    DocumentConfig, StorageSchema, Index, IndexKey, Upstream, EventType, ReplicationConfig,
    GCConfig, OpsLimits, AllowedOp, Invariant, ValidationRule, VersioningConfig,
)
def load_schema_to_db(schema_path: Path) -> None:
    """Load schema.json into the database using SQLAlchemy.

    Parses the JSON schema file and persists every section — repository
    config, capabilities, entities (with fields/constraints), storage
    (blob/KV stores, documents, schemas), API routes, auth scopes and
    policies, caching, features, indexes (with keys), upstreams, events,
    replication, GC, ops limits, invariants, validation rules, and
    versioning — as ORM rows in a single transaction.

    Idempotent: a no-op (with a message) when a RepositoryConfig row
    already exists.

    Args:
        schema_path: Path to the schema.json file.

    Raises:
        Re-raises any exception from parsing or the DB session after
        rolling back; the session is closed in all cases.
    """
    with open(schema_path, encoding="utf-8") as f:
        schema = json.load(f)
    session = ConfigSession()
    try:
        # Idempotency guard: never load a second configuration.
        if session.query(RepositoryConfig).first():
            print("Configuration already exists in database")
            return

        # Timezone-aware UTC timestamp rendered with a trailing "Z".
        # (datetime.utcnow() is deprecated since Python 3.12 and naive.)
        now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

        config = RepositoryConfig(
            schema_version=schema['schema_version'],
            type_id=schema['type_id'],
            description=schema['description'],
            created_at=now,
            updated_at=now,
        )
        session.add(config)
        session.flush()  # populate config.id for all child rows below

        # Capabilities: single child row; list values stored as JSON text.
        capabilities = schema['capabilities']
        session.add(Capability(
            config_id=config.id,
            protocols=json.dumps(capabilities['protocols']),
            storage=json.dumps(capabilities['storage']),
            features=json.dumps(capabilities['features']),
        ))

        # Entities with their fields and constraints. The 'versioning'
        # key lives under 'entities' in schema.json but is a config
        # object, not an entity — it is handled separately at the end.
        for entity_name, entity_data in schema['entities'].items():
            if entity_name == 'versioning':
                continue
            entity = Entity(
                config_id=config.id,
                name=entity_name,
                type='artifact',
                primary_key=json.dumps(entity_data.get('primary_key', [])),
                created_at=now,
            )
            session.add(entity)
            session.flush()  # populate entity.id for field/constraint rows
            for field_name, field_data in entity_data.get('fields', {}).items():
                session.add(EntityField(
                    entity_id=entity.id,
                    name=field_name,
                    type=field_data['type'],
                    optional=field_data.get('optional', False),
                    normalizations=json.dumps(field_data.get('normalize', [])),
                ))
            for constraint in entity_data.get('constraints', []):
                session.add(EntityConstraint(
                    entity_id=entity.id,
                    field=constraint['field'],
                    regex=constraint['regex'],
                    when_present=constraint.get('when_present', False),
                ))

        # Blob stores ('addressing' and 'limits' are required sub-objects).
        for store_name, store_data in schema['storage']['blob_stores'].items():
            addressing = store_data['addressing']
            limits = store_data['limits']
            session.add(BlobStore(
                config_id=config.id,
                name=store_name,
                kind=store_data['kind'],
                root=store_data['root'],
                addressing_mode=addressing.get('mode'),
                addressing_digest=addressing.get('digest'),
                path_template=addressing.get('path_template'),
                max_blob_bytes=limits.get('max_blob_bytes'),
                min_blob_bytes=limits.get('min_blob_bytes'),
            ))

        # KV stores.
        for store_name, store_data in schema['storage']['kv_stores'].items():
            session.add(KVStore(
                config_id=config.id,
                name=store_name,
                kind=store_data['kind'],
                root=store_data['root'],
            ))

        # API routes (tags/pipeline serialized as JSON text).
        for route in schema['api']['routes']:
            session.add(APIRoute(
                config_id=config.id,
                route_id=route['id'],
                method=route['method'],
                path=route['path'],
                tags=json.dumps(route.get('tags', [])),
                pipeline=json.dumps(route['pipeline']),
                created_at=now,
            ))

        # Auth scopes and policies.
        for scope in schema['auth']['scopes']:
            session.add(AuthScope(
                config_id=config.id,
                name=scope['name'],
                actions=json.dumps(scope['actions']),
            ))
        for policy in schema['auth']['policies']:
            session.add(AuthPolicy(
                config_id=config.id,
                name=policy['name'],
                effect=policy['effect'],
                conditions=json.dumps(policy.get('when', {})),
                requirements=json.dumps(policy.get('require', {})),
            ))

        # Caching config (booleans persisted as 0/1 integers).
        caching = schema['caching']
        response_cache = caching['response_cache']
        blob_cache = caching['blob_cache']
        session.add(CachingConfig(
            config_id=config.id,
            response_cache_enabled=1 if response_cache['enabled'] else 0,
            response_cache_ttl=response_cache['default_ttl_seconds'],
            blob_cache_enabled=1 if blob_cache['enabled'] else 0,
            blob_cache_max_bytes=blob_cache['max_bytes'],
        ))

        # Features config. gc_enabled is sourced from the top-level 'gc'
        # section, mirroring GCConfig below.
        features = schema['features']
        session.add(FeaturesConfig(
            config_id=config.id,
            mutable_tags=1 if features['mutable_tags'] else 0,
            allow_overwrite_artifacts=1 if features['allow_overwrite_artifacts'] else 0,
            proxy_enabled=1 if features['proxy_enabled'] else 0,
            gc_enabled=1 if schema['gc']['enabled'] else 0,
        ))

        # Document configs.
        for doc_name, doc_data in schema['storage'].get('documents', {}).items():
            session.add(DocumentConfig(
                config_id=config.id,
                name=doc_name,
                store=doc_data['store'],
                key_template=doc_data['key_template'],
                schema_name=doc_data['schema'],
            ))

        # Storage schemas (whole definition kept as JSON text).
        for schema_name, schema_def in schema['storage'].get('schemas', {}).items():
            session.add(StorageSchema(
                config_id=config.id,
                name=schema_name,
                schema_definition=json.dumps(schema_def),
            ))

        # Indexes and their keys.
        for index_name, index_data in schema.get('indexes', {}).items():
            materialization = index_data['materialization']
            index = Index(
                config_id=config.id,
                name=index_name,
                source_document=index_data['source_document'],
                materialization_mode=materialization.get('mode'),
                materialization_trigger=materialization.get('trigger'),
            )
            session.add(index)
            session.flush()  # populate index.id for key rows
            for key in index_data.get('keys', []):
                session.add(IndexKey(
                    index_id=index.id,
                    name=key['name'],
                    fields=json.dumps(key['fields']),
                    sort=json.dumps(key.get('sort', [])),
                    unique_key=1 if key.get('unique', False) else 0,
                ))

        # Upstreams (auth/timeouts/retry sub-objects are all optional).
        for upstream_name, upstream_data in schema.get('upstreams', {}).items():
            timeouts = upstream_data.get('timeouts_ms', {})
            retry = upstream_data.get('retry', {})
            session.add(Upstream(
                config_id=config.id,
                name=upstream_name,
                base_url=upstream_data['base_url'],
                auth_mode=upstream_data.get('auth', {}).get('mode'),
                connect_timeout_ms=timeouts.get('connect'),
                read_timeout_ms=timeouts.get('read'),
                retry_max_attempts=retry.get('max_attempts'),
                retry_backoff_ms=retry.get('backoff_ms'),
            ))

        # Event types (durable defaults to True when unspecified).
        for event in schema.get('events', {}).get('types', []):
            session.add(EventType(
                config_id=config.id,
                name=event['name'],
                durable=1 if event.get('durable', True) else 0,
                schema_definition=json.dumps(event.get('schema', {})),
            ))

        # Replication config (optional section).
        replication = schema.get('replication', {})
        if replication:
            log = replication.get('log', {})
            shipping = replication.get('shipping', {})
            batch = shipping.get('batch', {})
            session.add(ReplicationConfig(
                config_id=config.id,
                mode=replication.get('mode'),
                log_store=log.get('store'),
                log_key_prefix=log.get('key_prefix'),
                log_ordering=log.get('ordering'),
                log_max_event_bytes=log.get('max_event_bytes'),
                shipping_strategy=shipping.get('strategy'),
                shipping_dedupe_enabled=1 if shipping.get('dedupe', {}).get('enabled', False) else 0,
                shipping_batch_max_events=batch.get('max_events'),
                shipping_batch_max_bytes=batch.get('max_bytes'),
            ))

        # GC config (optional section).
        gc = schema.get('gc', {})
        if gc:
            retention = gc.get('retention', {})
            sweep = gc.get('sweep', {})
            session.add(GCConfig(
                config_id=config.id,
                enabled=1 if gc.get('enabled', True) else 0,
                immutable_after_publish=1 if retention.get('immutable_after_publish', True) else 0,
                keep_last_n_versions=retention.get('keep_last_n_versions'),
                keep_tags_forever=1 if retention.get('keep_tags_forever', True) else 0,
                sweep_schedule_rrule=sweep.get('schedule', {}).get('rrule'),
                # NOTE(review): the doubled 'sweep_' in this key (inside the
                # 'sweep' object) looks suspicious — confirm against schema.json.
                sweep_unreferenced_after_seconds=sweep.get('sweep_unreferenced_after_seconds'),
            ))

        # Ops limits and allowed operations (optional section).
        ops = schema.get('ops', {})
        if ops:
            limits = ops.get('limits', {})
            session.add(OpsLimits(
                config_id=config.id,
                closed_world=1 if ops.get('closed_world', True) else 0,
                max_pipeline_ops=limits.get('max_pipeline_ops'),
                max_request_body_bytes=limits.get('max_request_body_bytes'),
                max_json_bytes=limits.get('max_json_bytes'),
                max_kv_value_bytes=limits.get('max_kv_value_bytes'),
                max_cpu_ms_per_request=limits.get('max_cpu_ms_per_request'),
                max_io_ops_per_request=limits.get('max_io_ops_per_request'),
            ))
        for op in ops.get('allowed', []):
            session.add(AllowedOp(config_id=config.id, operation=op))

        # Global invariants.
        for invariant in schema.get('invariants', {}).get('global', []):
            session.add(Invariant(
                config_id=config.id,
                invariant_id=invariant['id'],
                description=invariant['description'],
                assertion=json.dumps(invariant['assert']),
            ))

        # Validation rules: load-time and runtime checks share one shape,
        # differing only in the schema key and the stored rule_type.
        validation = schema.get('validation', {})
        for rule_type, section_key in (('load_time', 'load_time_checks'),
                                       ('runtime', 'runtime_checks')):
            for rule in validation.get(section_key, []):
                session.add(ValidationRule(
                    config_id=config.id,
                    rule_id=rule['id'],
                    rule_type=rule_type,
                    requirement=json.dumps(rule['require']),
                    on_fail=rule['on_fail'],
                ))

        # Versioning config (stored under entities.versioning; skipped by
        # the entity loop above).
        versioning = schema.get('entities', {}).get('versioning', {})
        if versioning:
            latest_policy = versioning.get('latest_policy', {})
            session.add(VersioningConfig(
                config_id=config.id,
                scheme=versioning.get('scheme'),
                ordering=versioning.get('ordering'),
                allow_prerelease=1 if versioning.get('allow_prerelease', False) else 0,
                latest_policy_enabled=1 if latest_policy.get('enabled', True) else 0,
                latest_policy_monotonic=1 if latest_policy.get('monotonic', True) else 0,
                latest_policy_exclude_prerelease=1 if latest_policy.get('exclude_prerelease', True) else 0,
            ))

        session.commit()
        print("Schema loaded into database successfully")
    except Exception as e:
        session.rollback()
        print(f"Error loading schema: {e}")
        raise
    finally:
        session.close()
def get_repository_config() -> Optional[Dict[str, Any]]:
    """Return the stored repository configuration as a plain dict.

    Serializes the first RepositoryConfig row together with its
    capabilities, entities (including fields and constraints), blob
    stores, KV stores, API routes, auth scopes, and auth policies.

    Returns:
        A nested dict of the configuration, or None when no
        configuration row exists yet.
    """
    session = ConfigSession()
    try:
        cfg = session.query(RepositoryConfig).first()
        if not cfg:
            return None

        result: Dict[str, Any] = {
            'id': cfg.id,
            'schema_version': cfg.schema_version,
            'type_id': cfg.type_id,
            'description': cfg.description,
            'created_at': cfg.created_at,
            'updated_at': cfg.updated_at,
        }

        # Capabilities live on a single child row whose columns hold JSON
        # text; only these are decoded back into Python lists here.
        if cfg.capabilities:
            caps = cfg.capabilities[0]
            result['capabilities'] = {
                'protocols': json.loads(caps.protocols),
                'storage': json.loads(caps.storage),
                'features': json.loads(caps.features),
            }

        # NOTE(review): unlike capabilities, JSON-text columns below
        # (primary_key, normalizations, tags, pipeline, actions, ...) are
        # returned as raw strings — presumably callers decode them; verify.
        result['entities'] = [
            {
                'id': ent.id,
                'name': ent.name,
                'type': ent.type,
                'primary_key': ent.primary_key,
                'created_at': ent.created_at,
                'fields': [
                    {
                        'id': fld.id,
                        'name': fld.name,
                        'type': fld.type,
                        'optional': fld.optional,
                        'normalizations': fld.normalizations,
                    }
                    for fld in ent.fields
                ],
                'constraints': [
                    {
                        'id': con.id,
                        'field': con.field,
                        'regex': con.regex,
                        'when_present': con.when_present,
                    }
                    for con in ent.constraints
                ],
            }
            for ent in cfg.entities
        ]

        result['blob_stores'] = [
            {
                'id': store.id,
                'name': store.name,
                'kind': store.kind,
                'root': store.root,
                'addressing_mode': store.addressing_mode,
                'addressing_digest': store.addressing_digest,
                'path_template': store.path_template,
                'max_blob_bytes': store.max_blob_bytes,
                'min_blob_bytes': store.min_blob_bytes,
            }
            for store in cfg.blob_stores
        ]

        result['kv_stores'] = [
            {'id': store.id, 'name': store.name, 'kind': store.kind, 'root': store.root}
            for store in cfg.kv_stores
        ]

        result['api_routes'] = [
            {
                'id': rt.id,
                'route_id': rt.route_id,
                'method': rt.method,
                'path': rt.path,
                'tags': rt.tags,
                'pipeline': rt.pipeline,
                'created_at': rt.created_at,
            }
            for rt in cfg.api_routes
        ]

        result['auth_scopes'] = [
            {'id': sc.id, 'name': sc.name, 'actions': sc.actions}
            for sc in cfg.auth_scopes
        ]

        result['auth_policies'] = [
            {
                'id': pol.id,
                'name': pol.name,
                'effect': pol.effect,
                'conditions': pol.conditions,
                'requirements': pol.requirements,
            }
            for pol in cfg.auth_policies
        ]

        return result
    finally:
        session.close()
# Load schema if database is empty
# NOTE(review): this runs at import time — importing the module populates the
# database from ../schema.json as a side effect when that file exists.
# load_schema_to_db itself is a no-op if a RepositoryConfig row already exists,
# so repeated imports are safe.
schema_path = Path(__file__).parent.parent / "schema.json"
if schema_path.exists():
    load_schema_to_db(schema_path)