mirror of
https://github.com/johndoe6345789/metabuilder.git
synced 2026-05-02 09:45:00 +00:00
Merge pull request #235 from johndoe6345789/codex/add-websocket-bridge-lifecycle-and-routing
Refactor websocket bridge lifecycle and routing
This commit is contained in:
@@ -0,0 +1,90 @@
|
||||
import { DBALError } from '../../core/foundation/errors'
|
||||
import type { RPCMessage } from '../utils/rpc-types'
|
||||
import type { BridgeState } from './state'
|
||||
import type { MessageRouter } from './message-router'
|
||||
|
||||
export interface ConnectionManager {
|
||||
ensureConnection: () => Promise<void>
|
||||
send: (message: RPCMessage) => Promise<void>
|
||||
close: () => Promise<void>
|
||||
}
|
||||
|
||||
export const createConnectionManager = (
|
||||
state: BridgeState,
|
||||
messageRouter: MessageRouter,
|
||||
): ConnectionManager => {
|
||||
let connectionPromise: Promise<void> | null = null
|
||||
|
||||
const resetConnection = () => {
|
||||
connectionPromise = null
|
||||
state.ws = null
|
||||
}
|
||||
|
||||
const rejectPendingRequests = (error: DBALError) => {
|
||||
state.pendingRequests.forEach(({ reject }) => reject(error))
|
||||
state.pendingRequests.clear()
|
||||
}
|
||||
|
||||
const ensureConnection = async (): Promise<void> => {
|
||||
if (state.ws?.readyState === WebSocket.OPEN) {
|
||||
return
|
||||
}
|
||||
|
||||
if (connectionPromise) {
|
||||
return connectionPromise
|
||||
}
|
||||
|
||||
connectionPromise = new Promise((resolve, reject) => {
|
||||
try {
|
||||
const ws = new WebSocket(state.endpoint)
|
||||
state.ws = ws
|
||||
|
||||
ws.onopen = () => resolve()
|
||||
ws.onerror = error => {
|
||||
const connectionError = DBALError.internal(`WebSocket connection failed: ${error}`)
|
||||
rejectPendingRequests(connectionError)
|
||||
resetConnection()
|
||||
reject(connectionError)
|
||||
}
|
||||
ws.onclose = () => {
|
||||
rejectPendingRequests(DBALError.internal('WebSocket connection closed'))
|
||||
resetConnection()
|
||||
}
|
||||
ws.onmessage = event => messageRouter.handle(event.data)
|
||||
} catch (error) {
|
||||
resetConnection()
|
||||
const connectionError =
|
||||
error instanceof DBALError ? error : DBALError.internal('Failed to establish WebSocket connection')
|
||||
reject(connectionError)
|
||||
}
|
||||
})
|
||||
|
||||
return connectionPromise
|
||||
}
|
||||
|
||||
const send = async (message: RPCMessage): Promise<void> => {
|
||||
await ensureConnection()
|
||||
|
||||
if (!state.ws || state.ws.readyState !== WebSocket.OPEN) {
|
||||
throw DBALError.internal('WebSocket connection not open')
|
||||
}
|
||||
|
||||
state.ws.send(JSON.stringify(message))
|
||||
}
|
||||
|
||||
const close = async (): Promise<void> => {
|
||||
rejectPendingRequests(DBALError.internal('WebSocket connection closed'))
|
||||
|
||||
if (state.ws) {
|
||||
state.ws.close()
|
||||
}
|
||||
|
||||
resetConnection()
|
||||
}
|
||||
|
||||
return {
|
||||
ensureConnection,
|
||||
send,
|
||||
close,
|
||||
}
|
||||
}
|
||||
@@ -1,28 +0,0 @@
|
||||
import { DBALError } from '../../core/foundation/errors'
|
||||
import { handleMessage } from './message-handler'
|
||||
import type { BridgeState } from './state'
|
||||
|
||||
export const connect = async (state: BridgeState): Promise<void> => {
|
||||
if (state.ws?.readyState === WebSocket.OPEN) {
|
||||
return
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
state.ws = new WebSocket(state.endpoint)
|
||||
|
||||
state.ws.onopen = () => resolve()
|
||||
state.ws.onerror = error => reject(DBALError.internal(`WebSocket connection failed: ${error}`))
|
||||
state.ws.onmessage = event => handleMessage(state, event.data)
|
||||
state.ws.onclose = () => {
|
||||
state.ws = null
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
export const closeConnection = async (state: BridgeState): Promise<void> => {
|
||||
if (state.ws) {
|
||||
state.ws.close()
|
||||
state.ws = null
|
||||
}
|
||||
state.pendingRequests.clear()
|
||||
}
|
||||
@@ -1,16 +1,20 @@
|
||||
import type { DBALAdapter, AdapterCapabilities } from '../../adapters/adapter'
|
||||
import type { ListOptions, ListResult } from '../../core/types'
|
||||
import { closeConnection } from './connection'
|
||||
import { createConnectionManager } from './connection-manager'
|
||||
import { createMessageRouter } from './message-router'
|
||||
import { createOperations } from './operations'
|
||||
import { createBridgeState } from './state'
|
||||
|
||||
export class WebSocketBridge implements DBALAdapter {
|
||||
private readonly state: ReturnType<typeof createBridgeState>
|
||||
private readonly connectionManager: ReturnType<typeof createConnectionManager>
|
||||
private readonly operations: ReturnType<typeof createOperations>
|
||||
|
||||
constructor(endpoint: string, auth?: { user: unknown; session: unknown }) {
|
||||
this.state = createBridgeState(endpoint, auth)
|
||||
this.operations = createOperations(this.state)
|
||||
const messageRouter = createMessageRouter(this.state)
|
||||
this.connectionManager = createConnectionManager(this.state, messageRouter)
|
||||
this.operations = createOperations(this.state, this.connectionManager)
|
||||
}
|
||||
|
||||
create(entity: string, data: Record<string, unknown>): Promise<unknown> {
|
||||
@@ -75,6 +79,6 @@ export class WebSocketBridge implements DBALAdapter {
|
||||
}
|
||||
|
||||
async close(): Promise<void> {
|
||||
await closeConnection(this.state)
|
||||
await this.connectionManager.close()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,25 +0,0 @@
|
||||
import type { RPCResponse } from '../utils/rpc-types'
|
||||
import type { BridgeState } from './state'
|
||||
import { DBALError } from '../../core/foundation/errors'
|
||||
|
||||
export const handleMessage = (state: BridgeState, data: string): void => {
|
||||
try {
|
||||
const response: RPCResponse = JSON.parse(data)
|
||||
const pending = state.pendingRequests.get(response.id)
|
||||
|
||||
if (!pending) {
|
||||
return
|
||||
}
|
||||
|
||||
state.pendingRequests.delete(response.id)
|
||||
|
||||
if (response.error) {
|
||||
const error = new DBALError(response.error.message, response.error.code, response.error.details)
|
||||
pending.reject(error)
|
||||
} else {
|
||||
pending.resolve(response.result)
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Failed to parse WebSocket message:', error)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
import { DBALError } from '../../core/foundation/errors'
|
||||
import type { RPCResponse } from '../utils/rpc-types'
|
||||
import type { BridgeState } from './state'
|
||||
|
||||
export interface MessageRouter {
|
||||
handle: (rawMessage: unknown) => void
|
||||
}
|
||||
|
||||
const isRecord = (value: unknown): value is Record<string, unknown> =>
|
||||
typeof value === 'object' && value !== null && !Array.isArray(value)
|
||||
|
||||
const isRPCError = (value: unknown): value is NonNullable<RPCResponse['error']> =>
|
||||
isRecord(value) &&
|
||||
typeof value.code === 'number' &&
|
||||
typeof value.message === 'string' &&
|
||||
(value.details === undefined || isRecord(value.details))
|
||||
|
||||
const isRPCResponse = (value: unknown): value is RPCResponse => {
|
||||
if (!isRecord(value)) {
|
||||
return false
|
||||
}
|
||||
|
||||
const hasId = typeof value.id === 'string'
|
||||
const hasResult = Object.prototype.hasOwnProperty.call(value, 'result')
|
||||
const hasError = isRPCError(value.error) || value.error === undefined
|
||||
|
||||
return hasId && (hasResult || isRPCError(value.error)) && hasError
|
||||
}
|
||||
|
||||
const parseResponse = (rawMessage: string): RPCResponse => {
|
||||
const parsed = JSON.parse(rawMessage) as unknown
|
||||
|
||||
if (!isRPCResponse(parsed)) {
|
||||
throw new Error('Invalid RPC response shape')
|
||||
}
|
||||
|
||||
return parsed
|
||||
}
|
||||
|
||||
export const createMessageRouter = (state: BridgeState): MessageRouter => ({
|
||||
handle: (rawMessage: unknown) => {
|
||||
if (typeof rawMessage !== 'string') {
|
||||
console.warn('Ignoring non-string WebSocket message')
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const response = parseResponse(rawMessage)
|
||||
const pending = state.pendingRequests.get(response.id)
|
||||
|
||||
if (!pending) {
|
||||
console.warn(`No pending request for response ${response.id}`)
|
||||
return
|
||||
}
|
||||
|
||||
state.pendingRequests.delete(response.id)
|
||||
|
||||
if (response.error) {
|
||||
const error = new DBALError(response.error.message, response.error.code, response.error.details)
|
||||
pending.reject(error)
|
||||
} else {
|
||||
pending.resolve(response.result)
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Failed to process WebSocket message', error)
|
||||
}
|
||||
},
|
||||
})
|
||||
@@ -1,31 +1,36 @@
|
||||
import type { AdapterCapabilities } from '../../adapters/adapter'
|
||||
import type { ListOptions, ListResult } from '../../core/types'
|
||||
import type { ConnectionManager } from './connection-manager'
|
||||
import type { BridgeState } from './state'
|
||||
import { rpcCall } from './rpc'
|
||||
|
||||
export const createOperations = (state: BridgeState) => ({
|
||||
create: (entity: string, data: Record<string, unknown>) => rpcCall(state, 'create', entity, data),
|
||||
read: (entity: string, id: string) => rpcCall(state, 'read', entity, id),
|
||||
update: (entity: string, id: string, data: Record<string, unknown>) => rpcCall(state, 'update', entity, id, data),
|
||||
delete: (entity: string, id: string) => rpcCall(state, 'delete', entity, id) as Promise<boolean>,
|
||||
list: (entity: string, options?: ListOptions) => rpcCall(state, 'list', entity, options) as Promise<ListResult<unknown>>,
|
||||
findFirst: (entity: string, filter?: Record<string, unknown>) => rpcCall(state, 'findFirst', entity, filter),
|
||||
findByField: (entity: string, field: string, value: unknown) => rpcCall(state, 'findByField', entity, field, value),
|
||||
export const createOperations = (state: BridgeState, connectionManager: ConnectionManager) => ({
|
||||
create: (entity: string, data: Record<string, unknown>) => rpcCall(state, connectionManager, 'create', entity, data),
|
||||
read: (entity: string, id: string) => rpcCall(state, connectionManager, 'read', entity, id),
|
||||
update: (entity: string, id: string, data: Record<string, unknown>) =>
|
||||
rpcCall(state, connectionManager, 'update', entity, id, data),
|
||||
delete: (entity: string, id: string) => rpcCall(state, connectionManager, 'delete', entity, id) as Promise<boolean>,
|
||||
list: (entity: string, options?: ListOptions) =>
|
||||
rpcCall(state, connectionManager, 'list', entity, options) as Promise<ListResult<unknown>>,
|
||||
findFirst: (entity: string, filter?: Record<string, unknown>) =>
|
||||
rpcCall(state, connectionManager, 'findFirst', entity, filter),
|
||||
findByField: (entity: string, field: string, value: unknown) =>
|
||||
rpcCall(state, connectionManager, 'findByField', entity, field, value),
|
||||
upsert: (
|
||||
entity: string,
|
||||
filter: Record<string, unknown>,
|
||||
createData: Record<string, unknown>,
|
||||
updateData: Record<string, unknown>,
|
||||
) => rpcCall(state, 'upsert', entity, filter, createData, updateData),
|
||||
) => rpcCall(state, connectionManager, 'upsert', entity, filter, createData, updateData),
|
||||
updateByField: (entity: string, field: string, value: unknown, data: Record<string, unknown>) =>
|
||||
rpcCall(state, 'updateByField', entity, field, value, data),
|
||||
rpcCall(state, connectionManager, 'updateByField', entity, field, value, data),
|
||||
deleteByField: (entity: string, field: string, value: unknown) =>
|
||||
rpcCall(state, 'deleteByField', entity, field, value) as Promise<boolean>,
|
||||
rpcCall(state, connectionManager, 'deleteByField', entity, field, value) as Promise<boolean>,
|
||||
deleteMany: (entity: string, filter?: Record<string, unknown>) =>
|
||||
rpcCall(state, 'deleteMany', entity, filter) as Promise<number>,
|
||||
rpcCall(state, connectionManager, 'deleteMany', entity, filter) as Promise<number>,
|
||||
createMany: (entity: string, data: Record<string, unknown>[]) =>
|
||||
rpcCall(state, 'createMany', entity, data) as Promise<number>,
|
||||
rpcCall(state, connectionManager, 'createMany', entity, data) as Promise<number>,
|
||||
updateMany: (entity: string, filter: Record<string, unknown>, data: Record<string, unknown>) =>
|
||||
rpcCall(state, 'updateMany', entity, filter, data) as Promise<number>,
|
||||
getCapabilities: () => rpcCall(state, 'getCapabilities') as Promise<AdapterCapabilities>,
|
||||
rpcCall(state, connectionManager, 'updateMany', entity, filter, data) as Promise<number>,
|
||||
getCapabilities: () => rpcCall(state, connectionManager, 'getCapabilities') as Promise<AdapterCapabilities>,
|
||||
})
|
||||
|
||||
@@ -1,25 +1,28 @@
|
||||
import { DBALError } from '../../core/foundation/errors'
|
||||
import { generateRequestId } from '../utils/generate-request-id'
|
||||
import type { RPCMessage } from '../utils/rpc-types'
|
||||
import { connect } from './connection'
|
||||
import type { ConnectionManager } from './connection-manager'
|
||||
import type { BridgeState } from './state'
|
||||
|
||||
export const rpcCall = async (state: BridgeState, method: string, ...params: unknown[]): Promise<unknown> => {
|
||||
await connect(state)
|
||||
|
||||
export const rpcCall = async (
|
||||
state: BridgeState,
|
||||
connectionManager: ConnectionManager,
|
||||
method: string,
|
||||
...params: unknown[]
|
||||
): Promise<unknown> => {
|
||||
const id = generateRequestId()
|
||||
const message: RPCMessage = { id, method, params }
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
state.pendingRequests.set(id, { resolve, reject })
|
||||
|
||||
if (state.ws?.readyState === WebSocket.OPEN) {
|
||||
state.ws.send(JSON.stringify(message))
|
||||
} else {
|
||||
state.pendingRequests.delete(id)
|
||||
reject(DBALError.internal('WebSocket connection not open'))
|
||||
return
|
||||
}
|
||||
connectionManager
|
||||
.send(message)
|
||||
.catch(error => {
|
||||
state.pendingRequests.delete(id)
|
||||
reject(error)
|
||||
return
|
||||
})
|
||||
|
||||
setTimeout(() => {
|
||||
if (state.pendingRequests.has(id)) {
|
||||
|
||||
Reference in New Issue
Block a user