mirror of
https://github.com/johndoe6345789/metabuilder.git
synced 2026-04-25 14:25:02 +00:00
Complete implementation of enterprise-grade authentication middleware for email service: Features: - JWT token creation/validation with configurable expiration - Bearer token extraction and validation - Multi-tenant isolation enforced at middleware level - Role-based access control (RBAC) with user/admin roles - Row-level security (RLS) for resource access - Automatic request logging with user context and audit trail - CORS configuration for email client frontend - Rate limiting (50 req/min per user with Redis backend) - Comprehensive error handling with proper HTTP status codes Implementation: - Enhanced src/middleware/auth.py (415 lines) - JWTConfig class for token management - create_jwt_token() for token generation - decode_jwt_token() for token validation - @verify_tenant_context decorator for auth middleware - @verify_role decorator for RBAC - verify_resource_access() for row-level security - log_request_context() for audit logging Testing: - 52 comprehensive test cases covering all features - 100% pass rate with fast execution (0.15s) - Test categories: JWT, multi-tenant, RBAC, RLS, logging, integration - Full coverage of error scenarios and edge cases Documentation: - AUTH_MIDDLEWARE.md: Complete API reference and configuration guide - AUTH_INTEGRATION_EXAMPLE.py: Real-world usage examples for 5+ scenarios - PHASE_7_SUMMARY.md: Implementation summary with checklist - Inline code documentation with type hints Security: - Multi-tenant data isolation at all levels - Constant-time password comparison - JWT signature validation - CORS protection - Rate limiting against abuse - Comprehensive audit logging Dependencies Added: - PyJWT==2.8.1 Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
13 KiB
13 KiB
Message Threading Node Executor - Phase 6
Professional email message threading plugin for MetaBuilder workflow engine. Groups messages by conversation thread using RFC 5256 IMAP THREAD semantics with advanced features for handling complex email hierarchies.
Overview
The Message Threading Executor constructs hierarchical conversation threads from raw email messages by analyzing RFC 5322 headers (Message-ID, References, In-Reply-To). Supports handling of orphaned messages, unread tracking per thread, and high-performance processing of 1000+ message conversations.
Features
Core Threading
- RFC 5322 Compliance: Proper parsing of Message-ID, References, and In-Reply-To headers
- Hierarchical Trees: Builds complete parent-child relationships with arbitrary depth
- Thread Roots: Automatically identifies conversation roots (original messages)
- Multiple Threads: Handles multiple independent conversations in single batch
Advanced Features
- Unread Tracking: Counts unread messages at thread and subtree levels
- Participant Extraction: Collects all unique senders/recipients in conversation
- Date Range: Tracks earliest and latest message in each thread
- Thread State: Manages collapsed/expanded view state per node
- Performance: Efficiently threads 1000+ messages in <500ms
Orphan Handling
- Orphan Detection: Identifies messages without parents
- Orphan Resolution: Optional linking using date/subject heuristics
- Orphan Linking Strategies:
date: Links orphans to closest message by timestampsubject: Fuzzy-matches subjects (with configurable similarity threshold)none: Treats orphans as separate conversations
Installation
npm install @metabuilder/workflow-plugin-message-threading
Configuration
Node Parameters
interface MessageThreadingConfig {
// Required
messages: EmailMessage[]; // Array of email messages to thread
tenantId: string; // Multi-tenant context identifier
// Optional
expandAll?: boolean; // Expand all threads (default: false, only root expanded)
maxDepth?: number; // Maximum tree depth (default: 100)
resolveOrphans?: boolean; // Enable orphan resolution (default: false)
orphanLinkingStrategy?: 'date' | 'subject' | 'none'; // How to link orphans
subjectSimilarityThreshold?: number; // Threshold 0.0-1.0 for subject matching (default: 0.6)
}
Input Message Format
interface EmailMessage {
messageId: string; // RFC 5322 Message-ID (required)
subject: string; // Email subject
from: string; // Sender email
to: string[]; // Recipient emails
date: string; // ISO 8601 timestamp
uid: string; // Message UID for retrieval
isRead: boolean; // Read status
// Optional RFC 5322 headers
references?: string; // Space-separated Message-IDs from RFC 5322
inReplyTo?: string; // Parent Message-ID from In-Reply-To header
flags?: string[]; // User labels/keywords
size?: number; // Message size in bytes
}
Output Format
ThreadingResult
interface ThreadingResult {
threads: ThreadGroup[]; // Array of conversation threads
messageCount: number; // Total input messages
threadedCount: number; // Messages successfully threaded
orphanCount: number; // Messages without parents
executionDuration: number; // Processing time (ms)
warnings: string[]; // Non-fatal issues
errors: ThreadingError[]; // Critical errors
metrics: {
avgThreadSize: number; // Average messages per thread
maxThreadSize: number; // Largest thread size
minThreadSize: number; // Smallest thread size
totalUnread: number; // Total unread across all threads
maxDepth: number; // Deepest nesting level
};
}
ThreadGroup
interface ThreadGroup {
threadId: string; // Root message ID
root: ThreadNode; // Root message node with tree structure
messages: EmailMessage[]; // Flat array of all messages
unreadCount: number; // Total unread in thread
participants: string[]; // All unique email addresses
startDate: string; // Earliest message (ISO 8601)
endDate: string; // Latest message (ISO 8601)
messageCount: number; // Total messages in thread
orphanedMessages: EmailMessage[]; // Messages without parents
threadState: {
expandedNodeIds: Set<string>; // Message IDs in expanded state
collapsedNodeIds: Set<string>; // Message IDs in collapsed state
};
metrics: {
threadingDurationMs: number; // Time to build this thread
orphanCount: number; // Orphans in this thread
maxDepth: number; // Tree depth
avgMessagesPerLevel: number; // Avg messages per nesting level
};
}
ThreadNode
interface ThreadNode {
message: EmailMessage; // The email message
children: ThreadNode[]; // Direct replies
parentId: string | null; // Parent message ID
depth: number; // 0 = root
isExpanded: boolean; // UI collapse/expand state
unreadCount: number; // Unread in this subtree
participants: Set<string>; // Senders/recipients in subtree
}
Usage Examples
Basic Threading
import { messageThreadingExecutor } from '@metabuilder/workflow-plugin-message-threading';
const executor = messageThreadingExecutor();
const config: MessageThreadingConfig = {
messages: [
{
messageId: 'msg1@company.com',
subject: 'Project Update',
from: 'alice@company.com',
to: ['bob@company.com'],
date: '2026-01-20T10:00:00Z',
uid: 'uid-001',
isRead: true
},
{
messageId: 'msg2@company.com',
subject: 'Re: Project Update',
from: 'bob@company.com',
to: ['alice@company.com'],
date: '2026-01-20T11:00:00Z',
inReplyTo: '<msg1@company.com>',
uid: 'uid-002',
isRead: false
}
],
tenantId: 'company-123'
};
const result = executor.execute({
node: { id: 'thread-001', name: 'Thread Messages', nodeType: 'message-threading', parameters: config },
context: {
executionId: 'exec-001',
tenantId: 'company-123',
userId: 'user-001',
triggerData: {},
variables: {}
},
state: {}
});
console.log(result.output);
// Result includes:
// - result.threads: Array of ThreadGroup objects
// - result.statistics: Thread counts and metrics
// - result.metrics: Detailed performance metrics
Handling Orphans with Subject Matching
const config: MessageThreadingConfig = {
messages: [/* ... */],
tenantId: 'company-123',
resolveOrphans: true,
orphanLinkingStrategy: 'subject',
subjectSimilarityThreshold: 0.75 // 75% subject similarity required
};
const result = executor.execute({
node: { /* ... */, parameters: config },
context: { /* ... */ },
state: {}
});
// Orphans will be linked to similar conversations
// Check result.output.threads[n].orphanedMessages for unresolved orphans
High-Performance Threading (1000+ messages)
const largeMessageSet: EmailMessage[] = generateLargeEmailSet(5000);
const config: MessageThreadingConfig = {
messages: largeMessageSet,
tenantId: 'company-123',
maxDepth: 50, // Limit tree depth
expandAll: false // Don't expand all (UI optimization)
};
const startTime = Date.now();
const result = executor.execute({
node: { /* ... */, parameters: config },
context: { /* ... */ },
state: {}
});
const duration = Date.now() - startTime;
console.log(`Threaded ${result.output.statistics.totalMessages} messages in ${duration}ms`);
console.log(`Created ${result.output.statistics.threadCount} threads`);
console.log(`Max thread depth: ${result.output.statistics.maxDepth}`);
Algorithm Details
Threading Algorithm (RFC 5256)
- Message Indexing: Build map of messageId → message for O(1) lookup
- Parent Extraction: Parse In-Reply-To and References headers
- In-Reply-To takes precedence if present
- Otherwise use last Message-ID from References
- Relationship Building: Construct parent-child mappings
- Root Finding: Identify messages with no parents
- Tree Construction: Recursively build thread hierarchies
- Metrics Calculation: Count unread, extract participants, find date range
- Orphan Resolution (optional): Link orphaned messages using heuristics
Subject Similarity (Levenshtein Distance)
similarity = (longer.length - editDistance) / longer.length
Subject normalization:
- Remove "Re: " prefixes
- Case-insensitive comparison
- Configurable threshold (default: 0.6 = 60% match)
Performance Characteristics
| Input Size | Typical Duration | Memory |
|---|---|---|
| 100 msgs | <10ms | ~1MB |
| 1,000 msgs | <100ms | ~10MB |
| 5,000 msgs | <300ms | ~50MB |
| 10,000 msgs | <600ms | ~100MB |
Tested on Node.js with typical email patterns (avg 3-5 msgs/thread).
Error Handling
Recoverable Errors
- Invalid Message-IDs: Treated as separate conversations
- Missing parent references: Message becomes root
- Malformed dates: Uses epoch if unparseable
- Circular references: Breaks cycles (not traversed)
Non-Recoverable Errors
- Empty message array: Throws error
- Missing tenantId: Throws error
- Invalid configuration values: Throws error
Use Cases
Email Client Threading
// Display conversation threads in email UI
const threads = result.output.threads;
// Build UI tree from ThreadNode structure
threads.forEach(group => {
renderThreadUI(group.root, {
collapsed: !group.root.isExpanded,
unreadBadge: group.root.unreadCount > 0 ? group.root.unreadCount : null
});
});
Conversation Analytics
// Analyze conversation patterns
const stats = result.output.statistics;
console.log(`Average thread size: ${stats.avgThreadSize}`);
console.log(`Total unread: ${stats.totalUnread}`);
console.log(`Max nesting depth: ${stats.maxDepth}`);
// Per-thread metrics
result.output.threads.forEach(thread => {
console.log(`Thread "${thread.messages[0].subject}"`);
console.log(` Messages: ${thread.messageCount}`);
console.log(` Participants: ${thread.participants.length}`);
console.log(` Duration: ${new Date(thread.endDate).getTime() - new Date(thread.startDate).getTime()}ms`);
});
Unread Management
// Get all unread threads
const unreadThreads = result.output.threads.filter(t => t.unreadCount > 0);
// Get deeply nested unread messages
const findUnreadAtDepth = (node: ThreadNode, depth: number): ThreadNode[] => {
if (node.depth === depth && !node.message.isRead) {
return [node];
}
return node.children.flatMap(c => findUnreadAtDepth(c, depth));
};
Testing
Comprehensive test suite with 40+ test cases:
npm test # Run all tests
npm run test:watch # Watch mode
npm run test:coverage # Generate coverage report
Test Categories
- Basic threading (simple chains)
- Multi-level hierarchies (deep threads)
- Multiple branches (parallel replies)
- Unread tracking (at all levels)
- Orphan handling (detection & resolution)
- Participant extraction (unique addresses)
- Thread state (collapsed/expanded)
- Subject matching (Levenshtein distance)
- Date range tracking (earliest/latest)
- References header parsing (complex chains)
- Performance (1000+ messages)
- Edge cases (circular refs, malformed IDs)
- Configuration validation (all parameters)
Coverage: >80% branches, functions, lines, statements
Workflow Integration
Workflow JSON Example
{
"version": "2.2.0",
"nodes": [
{
"id": "fetch-messages",
"type": "operation",
"op": "imap-sync",
"parameters": {
"imapId": "{{ $json.accountId }}",
"folderId": "{{ $json.folderId }}"
}
},
{
"id": "thread-messages",
"type": "operation",
"op": "message-threading",
"parameters": {
"messages": "{{ $json.messages }}",
"tenantId": "{{ context.tenantId }}",
"expandAll": false,
"resolveOrphans": true,
"orphanLinkingStrategy": "date"
},
"dependsOn": ["fetch-messages"]
},
{
"id": "render-ui",
"type": "operation",
"op": "render-component",
"parameters": {
"component": "ThreadListView",
"props": {
"threads": "{{ $json.threads }}"
}
},
"dependsOn": ["thread-messages"]
}
]
}
References
- RFC 5322: Internet Message Format
- RFC 5256: IMAP4 THREAD Extension (conversation threading algorithm)
- RFC 5322 Obsoletes: RFC 2822, RFC 822
Compatibility
- Node.js: 18+
- TypeScript: 5.0+
- MetaBuilder Workflow: ^3.0.0
License
MIT - See LICENSE in repository root