Message Threading Node Executor - Phase 6

Professional email message threading plugin for the MetaBuilder workflow engine. It groups messages into conversation threads using RFC 5256 IMAP THREAD semantics and adds features for handling complex email hierarchies.

Overview

The Message Threading Executor constructs hierarchical conversation threads from raw email messages by analyzing RFC 5322 headers (Message-ID, References, In-Reply-To). It handles orphaned messages, tracks unread counts per thread, and processes conversations of 1000+ messages efficiently.

Features

Core Threading

  • RFC 5322 Compliance: Proper parsing of Message-ID, References, and In-Reply-To headers
  • Hierarchical Trees: Builds complete parent-child relationships with arbitrary depth
  • Thread Roots: Automatically identifies conversation roots (original messages)
  • Multiple Threads: Handles multiple independent conversations in a single batch

Advanced Features

  • Unread Tracking: Counts unread messages at thread and subtree levels
  • Participant Extraction: Collects all unique senders/recipients in conversation
  • Date Range: Tracks earliest and latest message in each thread
  • Thread State: Manages collapsed/expanded view state per node
  • Performance: Efficiently threads 1000+ messages in <500ms

Orphan Handling

  • Orphan Detection: Identifies messages without parents
  • Orphan Resolution: Optional linking using date/subject heuristics
  • Orphan Linking Strategies:
    • date: Links orphans to closest message by timestamp
    • subject: Fuzzy-matches subjects (with configurable similarity threshold)
    • none: Treats orphans as separate conversations
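
As a rough illustration of the date strategy, the sketch below picks the candidate whose timestamp is nearest to the orphan's. The helper name `findClosestByDate` and the minimal `Dated` shape are assumptions made for this example and are not part of the plugin's API; the actual implementation may apply additional constraints.

```typescript
// Hypothetical minimal shape: only the fields this sketch needs.
interface Dated {
  messageId: string;
  date: string; // ISO 8601 timestamp
}

// Returns the candidate whose timestamp is nearest to the orphan's,
// or null when no candidate has a parseable date.
function findClosestByDate(orphan: Dated, candidates: Dated[]): Dated | null {
  const orphanTime = Date.parse(orphan.date);
  if (Number.isNaN(orphanTime)) return null;

  let best: Dated | null = null;
  let bestGap = Infinity;
  for (const candidate of candidates) {
    if (candidate.messageId === orphan.messageId) continue; // never link a message to itself
    const candidateTime = Date.parse(candidate.date);
    if (Number.isNaN(candidateTime)) continue; // skip unparseable dates
    const gap = Math.abs(candidateTime - orphanTime);
    if (gap < bestGap) {
      best = candidate;
      bestGap = gap;
    }
  }
  return best;
}
```

A caller would then attach the orphan as a child of the returned message, or leave it as its own conversation when the result is null.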

Installation

npm install @metabuilder/workflow-plugin-message-threading

Configuration

Node Parameters

interface MessageThreadingConfig {
  // Required
  messages: EmailMessage[];           // Array of email messages to thread
  tenantId: string;                   // Multi-tenant context identifier

  // Optional
  expandAll?: boolean;                // Expand all threads (default: false, only root expanded)
  maxDepth?: number;                  // Maximum tree depth (default: 100)
  resolveOrphans?: boolean;           // Enable orphan resolution (default: false)
  orphanLinkingStrategy?: 'date' | 'subject' | 'none';  // How to link orphans
  subjectSimilarityThreshold?: number; // Threshold 0.0-1.0 for subject matching (default: 0.6)
}

Input Message Format

interface EmailMessage {
  messageId: string;                  // RFC 5322 Message-ID (required)
  subject: string;                    // Email subject
  from: string;                       // Sender email
  to: string[];                       // Recipient emails
  date: string;                       // ISO 8601 timestamp
  uid: string;                        // Message UID for retrieval
  isRead: boolean;                    // Read status

  // Optional RFC 5322 headers
  references?: string;                // Space-separated Message-IDs from RFC 5322
  inReplyTo?: string;                 // Parent Message-ID from In-Reply-To header
  flags?: string[];                   // User labels/keywords
  size?: number;                      // Message size in bytes
}
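
Because `references` arrives as a single space-separated string, it has to be split before parents can be resolved. The following is a minimal sketch of that step. The function names are hypothetical, and stripping angle brackets is an assumption: the Basic Threading example in this document writes `messageId` without brackets and `inReplyTo` with them, which suggests some normalization takes place, but the source does not say so explicitly.

```typescript
// Trim whitespace and remove one surrounding pair of angle brackets, if present.
function normalizeMessageId(raw: string): string {
  const trimmed = raw.trim();
  return trimmed.startsWith('<') && trimmed.endsWith('>')
    ? trimmed.slice(1, -1)
    : trimmed;
}

// Split a References header value into normalized Message-IDs,
// preserving header order (oldest ancestor first).
function parseReferences(header: string | undefined): string[] {
  if (!header) return [];
  return header
    .split(/\s+/)
    .map(normalizeMessageId)
    .filter(id => id.length > 0);
}
```

With this normalization, `'<msg1@company.com>'` and `'msg1@company.com'` resolve to the same key.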

Output Format

ThreadingResult

interface ThreadingResult {
  threads: ThreadGroup[];             // Array of conversation threads
  messageCount: number;               // Total input messages
  threadedCount: number;              // Messages successfully threaded
  orphanCount: number;                // Messages without parents
  executionDuration: number;          // Processing time (ms)
  warnings: string[];                 // Non-fatal issues
  errors: ThreadingError[];           // Critical errors
  metrics: {
    avgThreadSize: number;            // Average messages per thread
    maxThreadSize: number;            // Largest thread size
    minThreadSize: number;            // Smallest thread size
    totalUnread: number;              // Total unread across all threads
    maxDepth: number;                 // Deepest nesting level
  };
}

ThreadGroup

interface ThreadGroup {
  threadId: string;                   // Root message ID
  root: ThreadNode;                   // Root message node with tree structure
  messages: EmailMessage[];           // Flat array of all messages
  unreadCount: number;                // Total unread in thread
  participants: string[];             // All unique email addresses
  startDate: string;                  // Earliest message (ISO 8601)
  endDate: string;                    // Latest message (ISO 8601)
  messageCount: number;               // Total messages in thread
  orphanedMessages: EmailMessage[];   // Messages without parents
  threadState: {
    expandedNodeIds: Set<string>;     // Message IDs in expanded state
    collapsedNodeIds: Set<string>;    // Message IDs in collapsed state
  };
  metrics: {
    threadingDurationMs: number;      // Time to build this thread
    orphanCount: number;              // Orphans in this thread
    maxDepth: number;                 // Tree depth
    avgMessagesPerLevel: number;      // Avg messages per nesting level
  };
}

ThreadNode

interface ThreadNode {
  message: EmailMessage;              // The email message
  children: ThreadNode[];             // Direct replies
  parentId: string | null;            // Parent message ID
  depth: number;                      // 0 = root
  isExpanded: boolean;                // UI collapse/expand state
  unreadCount: number;                // Unread in this subtree
  participants: Set<string>;          // Senders/recipients in subtree
}
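
Since `unreadCount` on a node covers its whole subtree, it can be derived with a post-order walk. The sketch below shows one way to do that; `SketchNode` is a trimmed-down stand-in for `ThreadNode` and `computeUnread` is a hypothetical helper, not the plugin's own function.

```typescript
// Trimmed-down stand-in for ThreadNode with only the fields used here.
interface SketchNode {
  message: { messageId: string; isRead: boolean };
  children: SketchNode[];
  unreadCount: number;
}

// Post-order walk: a node's count is its own unread flag plus its children's totals.
function computeUnread(node: SketchNode): number {
  let total = node.message.isRead ? 0 : 1;
  for (const child of node.children) {
    total += computeUnread(child);
  }
  node.unreadCount = total; // store the subtree total on the node itself
  return total;
}
```

Calling `computeUnread` on a thread root fills in every node below it, so the root's value corresponds to the thread-level unread total.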

Usage Examples

Basic Threading

import { messageThreadingExecutor } from '@metabuilder/workflow-plugin-message-threading';

const executor = messageThreadingExecutor();

const config: MessageThreadingConfig = {
  messages: [
    {
      messageId: 'msg1@company.com',
      subject: 'Project Update',
      from: 'alice@company.com',
      to: ['bob@company.com'],
      date: '2026-01-20T10:00:00Z',
      uid: 'uid-001',
      isRead: true
    },
    {
      messageId: 'msg2@company.com',
      subject: 'Re: Project Update',
      from: 'bob@company.com',
      to: ['alice@company.com'],
      date: '2026-01-20T11:00:00Z',
      inReplyTo: '<msg1@company.com>',
      uid: 'uid-002',
      isRead: false
    }
  ],
  tenantId: 'company-123'
};

const result = executor.execute({
  node: { id: 'thread-001', name: 'Thread Messages', nodeType: 'message-threading', parameters: config },
  context: {
    executionId: 'exec-001',
    tenantId: 'company-123',
    userId: 'user-001',
    triggerData: {},
    variables: {}
  },
  state: {}
});

console.log(result.output);
// The output includes:
// - result.output.threads: Array of ThreadGroup objects
// - result.output.statistics: Thread counts and metrics
// - result.output.metrics: Detailed performance metrics

Handling Orphans with Subject Matching

const config: MessageThreadingConfig = {
  messages: [/* ... */],
  tenantId: 'company-123',
  resolveOrphans: true,
  orphanLinkingStrategy: 'subject',
  subjectSimilarityThreshold: 0.75  // 75% subject similarity required
};

const result = executor.execute({
  node: { /* ... */, parameters: config },
  context: { /* ... */ },
  state: {}
});

// Orphans will be linked to similar conversations
// Check result.output.threads[n].orphanedMessages for unresolved orphans

High-Performance Threading (1000+ messages)

const largeMessageSet: EmailMessage[] = generateLargeEmailSet(5000);

const config: MessageThreadingConfig = {
  messages: largeMessageSet,
  tenantId: 'company-123',
  maxDepth: 50,           // Limit tree depth
  expandAll: false        // Don't expand all (UI optimization)
};

const startTime = Date.now();
const result = executor.execute({
  node: { /* ... */, parameters: config },
  context: { /* ... */ },
  state: {}
});
const duration = Date.now() - startTime;

console.log(`Threaded ${result.output.statistics.totalMessages} messages in ${duration}ms`);
console.log(`Created ${result.output.statistics.threadCount} threads`);
console.log(`Max thread depth: ${result.output.statistics.maxDepth}`);

Algorithm Details

Threading Algorithm (RFC 5256)

  1. Message Indexing: Build map of messageId → message for O(1) lookup
  2. Parent Extraction: Parse In-Reply-To and References headers
    • In-Reply-To takes precedence if present
    • Otherwise use last Message-ID from References
  3. Relationship Building: Construct parent-child mappings
  4. Root Finding: Identify messages with no parents
  5. Tree Construction: Recursively build thread hierarchies
  6. Metrics Calculation: Count unread, extract participants, find date range
  7. Orphan Resolution (optional): Link orphaned messages using heuristics
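
Steps 1 through 5 can be sketched roughly as follows. This is an illustrative reduction, not the plugin's source: `Msg`, `TreeNode`, `parentOf`, and `buildThreads` are names invented for the example, Message-IDs are assumed to be unique, and cycle handling, metrics, and orphan resolution are left out. The parent lookup follows the precedence stated above (In-Reply-To first); note that RFC 5256 itself consults References first and falls back to In-Reply-To.

```typescript
interface Msg {
  messageId: string;
  inReplyTo?: string;
  references?: string;
}

interface TreeNode {
  message: Msg;
  children: TreeNode[];
  parentId: string | null;
  depth: number;
}

// Assumed normalization: trim and drop surrounding angle brackets.
const stripBrackets = (id: string): string => id.trim().replace(/^<|>$/g, '');

// Step 2: In-Reply-To first, otherwise the last Message-ID in References.
function parentOf(msg: Msg): string | null {
  if (msg.inReplyTo && msg.inReplyTo.trim()) return stripBrackets(msg.inReplyTo);
  const refs = (msg.references ?? '').split(/\s+/).filter(r => r.length > 0);
  return refs.length > 0 ? stripBrackets(refs[refs.length - 1]) : null;
}

function buildThreads(messages: Msg[]): TreeNode[] {
  // Step 1: index messages by normalized Message-ID for O(1) lookup.
  const nodes = new Map<string, TreeNode>();
  for (const message of messages) {
    nodes.set(stripBrackets(message.messageId), { message, children: [], parentId: null, depth: 0 });
  }

  // Steps 3-4: attach each node to its parent; a node whose parent is
  // missing from the batch (or is itself) becomes a root.
  const roots: TreeNode[] = [];
  for (const message of messages) {
    const id = stripBrackets(message.messageId);
    const node = nodes.get(id)!;
    const parentId = parentOf(message);
    const parent = parentId !== null && parentId !== id ? nodes.get(parentId) : undefined;
    if (parent) {
      node.parentId = parentId;
      parent.children.push(node);
    } else {
      roots.push(node);
    }
  }

  // Step 5: assign depths top-down from each root.
  const assignDepth = (node: TreeNode, depth: number): void => {
    node.depth = depth;
    node.children.forEach(child => assignDepth(child, depth + 1));
  };
  roots.forEach(root => assignDepth(root, 0));
  return roots;
}
```

Each returned root corresponds to one conversation; a fuller version would wrap it in a `ThreadGroup` and compute the per-thread metrics from step 6.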

Subject Similarity (Levenshtein Distance)

similarity = (longer.length - editDistance) / longer.length

Subject normalization:

  • Remove "Re: " prefixes
  • Case-insensitive comparison
  • Configurable threshold (default: 0.6 = 60% match)
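
The formula and normalization above can be sketched as below. The function names are hypothetical, and details such as stripping repeated "Re:" prefixes and treating two empty subjects as identical are assumptions that the plugin may handle differently.

```typescript
// Levenshtein edit distance using two rolling rows of the DP table.
function editDistance(a: string, b: string): number {
  let prev: number[] = [];
  for (let j = 0; j <= b.length; j++) prev.push(j);
  for (let i = 1; i <= a.length; i++) {
    const curr: number[] = [i];
    for (let j = 1; j <= b.length; j++) {
      const cost = a[i - 1] === b[j - 1] ? 0 : 1;
      curr.push(Math.min(
        prev[j] + 1,        // deletion
        curr[j - 1] + 1,    // insertion
        prev[j - 1] + cost  // substitution (or match)
      ));
    }
    prev = curr;
  }
  return prev[b.length];
}

// Assumed normalization: strip leading "Re:" prefixes, trim, lowercase.
function normalizeSubject(subject: string): string {
  return subject.replace(/^(\s*re:\s*)+/i, '').trim().toLowerCase();
}

// similarity = (longer.length - editDistance) / longer.length
function subjectSimilarity(a: string, b: string): number {
  const x = normalizeSubject(a);
  const y = normalizeSubject(b);
  const longer = Math.max(x.length, y.length);
  if (longer === 0) return 1; // assumption: two empty subjects count as identical
  return (longer - editDistance(x, y)) / longer;
}
```

Two subjects would be considered a match when `subjectSimilarity` is at or above `subjectSimilarityThreshold`, so with the default of 0.6 at most 40% of the longer subject's characters may differ.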

Performance Characteristics

Input Size     Typical Duration   Memory
100 msgs       <10ms              ~1MB
1,000 msgs     <100ms             ~10MB
5,000 msgs     <300ms             ~50MB
10,000 msgs    <600ms             ~100MB

Tested on Node.js with typical email patterns (avg 3-5 msgs/thread).

Error Handling

Recoverable Errors

  • Invalid Message-IDs: Treated as separate conversations
  • Missing parent references: Message becomes root
  • Malformed dates: Uses epoch if unparseable
  • Circular references: Breaks cycles (not traversed)
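
To illustrate what breaking a cycle can look like, the sketch below accepts parent links one at a time and drops any link that would close a loop. This is one possible approach under stated assumptions, not necessarily how the plugin does it; `breakCycles` and the child-to-parent map are invented for the example.

```typescript
// `parents` maps a child Message-ID to its parent Message-ID.
// Returns a copy without the links that would close a cycle.
function breakCycles(parents: Map<string, string>): Map<string, string> {
  const safe = new Map<string, string>();
  parents.forEach((parentId, childId) => {
    // Walk up from the proposed parent through links already accepted.
    // `safe` is always acyclic, so this walk terminates.
    let cursor: string | undefined = parentId;
    let createsCycle = false;
    while (cursor !== undefined) {
      if (cursor === childId) {
        createsCycle = true; // the child is its own ancestor
        break;
      }
      cursor = safe.get(cursor);
    }
    if (!createsCycle) safe.set(childId, parentId);
  });
  return safe;
}
```

A message whose link is dropped has no parent left in the map, so it would surface as a root, in line with the "missing parent" behavior listed above.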

Non-Recoverable Errors

  • Empty message array: Throws error
  • Missing tenantId: Throws error
  • Invalid configuration values: Throws error
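
A caller can check for these conditions before invoking the executor. The sketch below collects problems instead of throwing; `validateConfig` and `SketchConfig` are hypothetical, the 0.0-1.0 threshold range comes from the configuration section, and the rule that `maxDepth` must be a positive integer is an assumption.

```typescript
// Loosely typed view of the config so that bad input can be inspected safely.
interface SketchConfig {
  messages?: unknown;
  tenantId?: unknown;
  maxDepth?: unknown;
  subjectSimilarityThreshold?: unknown;
}

// Returns a list of problems; an empty list means the config looks usable.
function validateConfig(config: SketchConfig): string[] {
  const problems: string[] = [];
  if (!Array.isArray(config.messages) || config.messages.length === 0) {
    problems.push('messages must be a non-empty array');
  }
  if (typeof config.tenantId !== 'string' || config.tenantId.trim() === '') {
    problems.push('tenantId is required');
  }
  const { maxDepth, subjectSimilarityThreshold: threshold } = config;
  // Assumption: maxDepth, when given, must be a positive integer.
  if (maxDepth !== undefined &&
      (typeof maxDepth !== 'number' || !Number.isInteger(maxDepth) || maxDepth < 1)) {
    problems.push('maxDepth must be a positive integer');
  }
  if (threshold !== undefined &&
      (typeof threshold !== 'number' || !(threshold >= 0 && threshold <= 1))) {
    problems.push('subjectSimilarityThreshold must be between 0.0 and 1.0');
  }
  return problems;
}
```

Checking up front lets a workflow report all configuration problems at once rather than stopping at the first thrown error.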

Use Cases

Email Client Threading

// Display conversation threads in email UI
const threads = result.output.threads;

// Build UI tree from ThreadNode structure
threads.forEach(group => {
  renderThreadUI(group.root, {
    collapsed: !group.root.isExpanded,
    unreadBadge: group.root.unreadCount > 0 ? group.root.unreadCount : null
  });
});

Conversation Analytics

// Analyze conversation patterns
const stats = result.output.statistics;
console.log(`Average thread size: ${stats.avgThreadSize}`);
console.log(`Total unread: ${stats.totalUnread}`);
console.log(`Max nesting depth: ${stats.maxDepth}`);

// Per-thread metrics
result.output.threads.forEach(thread => {
  console.log(`Thread "${thread.messages[0].subject}"`);
  console.log(`  Messages: ${thread.messageCount}`);
  console.log(`  Participants: ${thread.participants.length}`);
  console.log(`  Duration: ${new Date(thread.endDate).getTime() - new Date(thread.startDate).getTime()}ms`);
});

Unread Management

// Get all unread threads
const unreadThreads = result.output.threads.filter(t => t.unreadCount > 0);

// Get deeply nested unread messages
const findUnreadAtDepth = (node: ThreadNode, depth: number): ThreadNode[] => {
  if (node.depth === depth && !node.message.isRead) {
    return [node];
  }
  return node.children.flatMap(c => findUnreadAtDepth(c, depth));
};

Testing

Comprehensive test suite with 40+ test cases:

npm test                  # Run all tests
npm run test:watch        # Watch mode
npm run test:coverage     # Generate coverage report

Test Categories

  • Basic threading (simple chains)
  • Multi-level hierarchies (deep threads)
  • Multiple branches (parallel replies)
  • Unread tracking (at all levels)
  • Orphan handling (detection & resolution)
  • Participant extraction (unique addresses)
  • Thread state (collapsed/expanded)
  • Subject matching (Levenshtein distance)
  • Date range tracking (earliest/latest)
  • References header parsing (complex chains)
  • Performance (1000+ messages)
  • Edge cases (circular refs, malformed IDs)
  • Configuration validation (all parameters)

Coverage: >80% branches, functions, lines, statements

Workflow Integration

Workflow JSON Example

{
  "version": "2.2.0",
  "nodes": [
    {
      "id": "fetch-messages",
      "type": "operation",
      "op": "imap-sync",
      "parameters": {
        "imapId": "{{ $json.accountId }}",
        "folderId": "{{ $json.folderId }}"
      }
    },
    {
      "id": "thread-messages",
      "type": "operation",
      "op": "message-threading",
      "parameters": {
        "messages": "{{ $json.messages }}",
        "tenantId": "{{ context.tenantId }}",
        "expandAll": false,
        "resolveOrphans": true,
        "orphanLinkingStrategy": "date"
      },
      "dependsOn": ["fetch-messages"]
    },
    {
      "id": "render-ui",
      "type": "operation",
      "op": "render-component",
      "parameters": {
        "component": "ThreadListView",
        "props": {
          "threads": "{{ $json.threads }}"
        }
      },
      "dependsOn": ["thread-messages"]
    }
  ]
}

References

  • RFC 5322: Internet Message Format
  • RFC 5256: IMAP4 THREAD Extension (conversation threading algorithm)
  • RFC 5322 obsoletes RFC 2822, which in turn obsoleted RFC 822

Compatibility

  • Node.js: 18+
  • TypeScript: 5.0+
  • MetaBuilder Workflow: ^3.0.0

License

MIT - See LICENSE in repository root