
DAG Executor Quick Start Guide

For: Frontend executor developers
Time: 10 minutes
Goal: Understand how to integrate and use the DAG executor


5-Minute Overview

The MetaBuilder DAG executor is a production-grade workflow engine that:

  • Executes workflows as Directed Acyclic Graphs (DAG)
  • Manages dependency resolution automatically
  • Handles errors, retries, and conditional routing
  • Supports multi-tenant execution with secure isolation
  • Tracks metrics (execution time, node count, success rate)

Installation

The executor is built into the MetaBuilder monorepo. No installation needed.

```typescript
// Initialize at startup (once per app)
import { initializeWorkflowEngine } from '@metabuilder/workflow'
initializeWorkflowEngine()
```

Quick Example (30 seconds)

```typescript
import { DAGExecutor, WorkflowValidator } from '@metabuilder/workflow'

// 1. Load workflow (from database/JSON)
const workflow = loadWorkflowFromDatabase('wf_123')

// 2. Validate
const validator = new WorkflowValidator()
const result = validator.validate(workflow)
if (!result.valid) throw new Error('Invalid workflow')

// 3. Create context
const context = {
  executionId: 'exec_1',
  tenantId: 'acme',
  userId: 'user_1',
  user: { id: 'user_1', email: 'user@acme.com', level: 2 },
  trigger: workflow.triggers[0],
  triggerData: { message: 'hello' },
  variables: {},
  secrets: {}
}

// 4. Create executor
const executor = new DAGExecutor(
  context.executionId,
  workflow,
  context,
  nodeExecutor  // Your callback function
)

// 5. Execute
const state = await executor.execute()
const metrics = executor.getMetrics()

console.log(`Executed ${metrics.nodesExecuted} nodes in ${metrics.duration}ms`)
```

Key Concepts

1. Workflow Structure

A workflow is a DAG (no cycles allowed) with:

```
{
  nodes: [           // Individual tasks
    { id, name, type, parameters, ... }
  ],
  connections: {     // Edges between nodes
    "nodeA": {
      "main": {
        "0": [{ node: "nodeB", type: "main" }]
      }
    }
  }
}
```

2. Node Execution

Nodes execute sequentially (one at a time) based on:

  1. Dependency resolution - Can't run until inputs are ready
  2. Priority queue - Higher priority nodes dequeued first
  3. Availability - Trigger nodes start automatically
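The dependency check in step 1 can be sketched as follows. This is an illustrative model only, not the DAGExecutor's actual internals; `dependenciesOf` and `readyNodes` are names invented for this example, and the `Connections` shape mirrors the structure shown earlier.

```typescript
// Illustrative sketch of dependency resolution (not the engine's real code).
type Connections = Record<string, { main: Record<string, { node: string }[]> }>

// Invert the connection map: for each node, which nodes feed into it?
function dependenciesOf(connections: Connections): Map<string, Set<string>> {
  const deps = new Map<string, Set<string>>()
  for (const [source, ports] of Object.entries(connections)) {
    for (const targets of Object.values(ports.main)) {
      for (const { node } of targets) {
        if (!deps.has(node)) deps.set(node, new Set())
        deps.get(node)!.add(source)
      }
    }
  }
  return deps
}

// A node may be queued once every upstream dependency has completed.
function readyNodes(
  allNodes: string[],
  deps: Map<string, Set<string>>,
  completed: Set<string>
): string[] {
  return allNodes.filter(id =>
    !completed.has(id) &&
    [...(deps.get(id) ?? [])].every(d => completed.has(d))
  )
}
```

With a chain `a → b → c`, only `a` is ready at the start; completing `a` makes `b` ready, and so on.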

3. Error Handling

Each node has an onError policy:

```typescript
onError: 'stopWorkflow'          // Abort everything (default)
onError: 'continueErrorOutput'   // Route to error port
onError: 'continueRegularOutput' // Route to main port with empty output
onError: 'skipNode'              // Skip dependent nodes
```

4. Retries

Nodes can retry on failure:

```typescript
{
  maxTries: 3,
  retryPolicy: {
    backoffType: 'exponential',
    initialDelay: 1000,
    maxDelay: 30000,
    retryableErrors: ['TIMEOUT', 'TEMPORARY_FAILURE'],
    retryableStatusCodes: [408, 429, 500, 502, 503, 504]
  }
}
```
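Given those fields, the delay before each retry could be computed as below. This is a hedged sketch of the assumed semantics (the initial delay doubles per attempt, capped at `maxDelay`), not the engine's exact implementation.

```typescript
// Assumed backoff semantics for the retryPolicy fields above (illustrative).
interface RetryPolicy {
  backoffType: 'exponential' | 'fixed'
  initialDelay: number   // ms before the first retry
  maxDelay: number       // ceiling in ms
}

function retryDelay(policy: RetryPolicy, attempt: number): number {
  // attempt is 1 for the first retry, 2 for the second, ...
  const base = policy.backoffType === 'exponential'
    ? policy.initialDelay * 2 ** (attempt - 1)
    : policy.initialDelay
  return Math.min(base, policy.maxDelay)
}
```

With `initialDelay: 1000` and `maxDelay: 30000`, retries wait 1 s, 2 s, 4 s, and so on, flattening out at 30 s.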

5. Variables & Templates

Use {{ }} syntax to reference data:

```json
{
  "to": "{{ $context.user.email }}",
  "subject": "{{ $json.title }}",
  "body": "{{ $steps.transform.output.content }}"
}
```
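A minimal sketch of how such references could resolve against dotted paths follows. The real implementation lives in template-engine.ts; this toy version handles only simple `$name.path` lookups and is purely illustrative.

```typescript
// Toy {{ }} resolver: walks a dotted path through a scope object.
// Not the actual template engine; unresolved paths become empty strings.
function interpolate(template: string, scope: Record<string, unknown>): string {
  return template.replace(/\{\{\s*\$?([\w.]+)\s*\}\}/g, (_, path: string) => {
    const value = path.split('.').reduce<unknown>(
      (obj, key) => (obj as Record<string, unknown> | undefined)?.[key],
      scope
    )
    return value === undefined ? '' : String(value)
  })
}
```

For example, `interpolate('{{ $context.user.email }}', scope)` walks `scope.context.user.email`.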

The Node Executor Callback

The executor calls your callback for each node:

```typescript
async function nodeExecutor(nodeId, workflow, context, state) {
  const node = workflow.nodes.find(n => n.id === nodeId)

  // Your implementation:
  // 1. Get the executor for this node type
  // 2. Execute the node
  // 3. Return result
  // 4. Handle errors

  try {
    const executor = registry.get(node.type)
    if (!executor) {
      return { status: 'error', error: 'No executor', timestamp: Date.now() }
    }

    return await executor.execute(node, context, state)
  } catch (error) {
    return {
      status: 'error',
      error: error.message,
      timestamp: Date.now()
    }
  }
}
```

Common Tasks

Task 1: Execute a Workflow

```typescript
const executor = new DAGExecutor(execId, workflow, context, nodeExecutor)
const state = await executor.execute()
```

Task 2: Check Execution Status

```typescript
const metrics = executor.getMetrics()
console.log(`${metrics.successNodes} of ${metrics.nodesExecuted} nodes succeeded`)
console.log(`Duration: ${metrics.duration}ms`)
```

Task 3: Stop Execution

```typescript
executor.abort()  // Stops at next checkpoint
```

Task 4: Validate Before Executing

```typescript
const validator = new WorkflowValidator()
const result = validator.validate(workflow)

if (!result.valid) {
  result.errors.forEach(error => {
    console.error(`${error.path}: ${error.message}`)
  })
  return
}
```

Task 5: Interpolate Variables

```typescript
import { interpolateTemplate } from '@metabuilder/workflow'

const interpolated = interpolateTemplate(node.parameters, {
  context,
  json: triggerData,
  steps: state
})
```

Task 6: Register a Custom Node Type

```typescript
import { getNodeExecutorRegistry } from '@metabuilder/workflow'

const registry = getNodeExecutorRegistry()
registry.register('custom.node.type', {
  nodeType: 'custom.node.type',
  async execute(node, context, state) {
    return {
      status: 'success',
      output: { message: 'Executed' },
      timestamp: Date.now()
    }
  },
  validate(node) {
    return {
      valid: true,
      errors: [],
      warnings: []
    }
  }
})
```

Execution Flow (Visual)

```
Start
  │
  ▼
Initialize Triggers
  │   (find nodes with no inputs)
  ▼
Queue Start Nodes
  │
  ▼
Main Execution Loop ◄────────────────┐
  │                                  │
  ├─ Dequeue Node                    │
  ├─ Check Dependencies              │
  │    (skip if a dependency failed) │
  ├─ Execute Node (with retries)     │
  ├─ Handle Error (apply policy)     │
  ├─ Route Output (queue dependents) │
  │                                  │
  └─ Queue Empty? ── No ─────────────┘
       │
      Yes
       │
       ▼
Finalize
  ├─ Calculate metrics
  └─ Return state
  │
  ▼
End
```
Validation Rules (Must Know)

The validator checks these rules before execution:

| Check | Description |
| --- | --- |
| Unique node IDs | Each node must have a unique ID |
| Unique node names | Each node must have a unique name |
| Valid node types | Every node type must be registered |
| Connection targets exist | All connection targets must be valid nodes |
| No circular connections | The workflow must be a DAG (no cycles) |
| Parameter structure | No "[object Object]" serialization bugs |
| Valid variable names | Must match `[a-zA-Z_][a-zA-Z0-9_]*` |
| Multi-tenant safety | `tenantId` must be present |
| No dangling references | All connection targets exist |

Check before executing:

```typescript
const validator = new WorkflowValidator()
const result = validator.validate(workflow)
if (!result.valid) {
  throw new Error(`Validation failed: ${result.errors.map(e => e.message).join(', ')}`)
}
```
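The "no circular connections" rule can be illustrated with a small depth-first search. This is not WorkflowValidator's actual code; the edge map is a simplified shape and `hasCycle` is a name invented for this sketch.

```typescript
// Illustrative DFS cycle check over a simplified adjacency map.
// A node seen again while still "visiting" means a back edge, i.e. a cycle.
function hasCycle(edges: Record<string, string[]>): boolean {
  const state = new Map<string, 'visiting' | 'done'>()
  const visit = (node: string): boolean => {
    if (state.get(node) === 'visiting') return true   // back edge -> cycle
    if (state.get(node) === 'done') return false      // already cleared
    state.set(node, 'visiting')
    for (const next of edges[node] ?? []) {
      if (visit(next)) return true
    }
    state.set(node, 'done')
    return false
  }
  return Object.keys(edges).some(node => visit(node))
}
```

A chain `a → b → c` passes, while `a → b → a` is rejected.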

Error Handling Patterns

Pattern 1: Fail Fast (Stop Immediately)

```json
{
  "onError": "stopWorkflow"
}
```

Use when: Critical operations that can't continue

Pattern 2: Route to Error Handler

```json
{
  "onError": "continueErrorOutput"
}
```

With connections:

```json
{
  "connections": {
    "risky_node": {
      "error": {
        "0": [{ "node": "error_handler" }]
      }
    }
  }
}
```

Use when: Want to handle errors gracefully

Pattern 3: Continue with Default

```json
{
  "onError": "continueRegularOutput"
}
```

Use when: Node failure is non-critical, continue with empty output

Pattern 4: Skip Dependents

```json
{
  "onError": "skipNode"
}
```

Use when: Node failure invalidates all subsequent nodes


Performance Tips

  1. Cache template regexes - Don't compile same regex repeatedly
  2. Index connections by target - O(n) lookup → O(1) lookup
  3. Pre-build dependency graph - Calculate once, reuse
  4. Stream large state - Don't keep everything in memory
  5. Limit node count - The executor scales to ~1,000 nodes
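Tip 2 can be sketched as follows: build the target index once, then each "who feeds node X?" question is a Map lookup instead of a scan. The `Conn` shape and `indexByTarget` name are invented for this example; the engine's real connection types differ.

```typescript
// Illustrative connection index: target nodeId -> source nodeIds.
type Conn = { source: string; target: string }

function indexByTarget(conns: Conn[]): Map<string, string[]> {
  const index = new Map<string, string[]>()
  for (const { source, target } of conns) {
    const sources = index.get(target) ?? []
    sources.push(source)
    index.set(target, sources)   // O(1) lookup per target afterwards
  }
  return index
}
```

Building the index is a single O(n) pass; every subsequent lookup avoids re-scanning the connection list.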

Common Issues

| Problem | Cause | Fix |
| --- | --- | --- |
| Workflow hangs | Circular connection or an unsatisfiable dependency | Check the DAG property |
| Wrong node order | Priority queue issue | Verify dependency resolution |
| Template not interpolating | Missing context or a typo | Check the context object |
| Error not routing | Wrong error policy | Set `onError: 'continueErrorOutput'` |
| Out of memory | Large state accumulation | Limit state size |
| Slow execution | Too many retries | Reduce `maxTries` or adjust the backoff |

Next Steps

  1. Read full architecture: DAG_EXECUTOR_N8N_INTEGRATION_ANALYSIS.md
  2. Check code examples: DAG_EXECUTOR_TECHNICAL_REFERENCE.md
  3. Implement integration: Follow Phase 3 Week 2 plan
  4. Write tests: Use provided test patterns
  5. Deploy: Monitor metrics in production

Key Files

| File | Purpose | Location |
| --- | --- | --- |
| `dag-executor.ts` | Main engine | `workflow/executor/ts/executor/` |
| `types.ts` | Type definitions | `workflow/executor/ts/` |
| `workflow-validator.ts` | Validation logic | `workflow/executor/ts/utils/` |
| `node-executor-registry.ts` | Plugin system | `workflow/executor/ts/registry/` |
| `priority-queue.ts` | Execution scheduling | `workflow/executor/ts/utils/` |
| `template-engine.ts` | Variable interpolation | `workflow/executor/ts/utils/` |

Support


Ready to integrate? Start with the full integration analysis.