From b10bef82a9e461b6480615b3e2680266b86754c9 Mon Sep 17 00:00:00 2001 From: johndoe6345789 Date: Sat, 27 Dec 2025 18:38:04 +0000 Subject: [PATCH] refactor: harden websocket bridge lifecycle --- .../websocket-bridge/connection-manager.ts | 90 +++++++++++++++++++ .../bridges/websocket-bridge/connection.ts | 28 ------ .../src/bridges/websocket-bridge/index.ts | 10 ++- .../websocket-bridge/message-handler.ts | 25 ------ .../websocket-bridge/message-router.ts | 68 ++++++++++++++ .../bridges/websocket-bridge/operations.ts | 35 ++++---- .../src/bridges/websocket-bridge/rpc.ts | 25 +++--- 7 files changed, 199 insertions(+), 82 deletions(-) create mode 100644 dbal/development/src/bridges/websocket-bridge/connection-manager.ts delete mode 100644 dbal/development/src/bridges/websocket-bridge/connection.ts delete mode 100644 dbal/development/src/bridges/websocket-bridge/message-handler.ts create mode 100644 dbal/development/src/bridges/websocket-bridge/message-router.ts diff --git a/dbal/development/src/bridges/websocket-bridge/connection-manager.ts b/dbal/development/src/bridges/websocket-bridge/connection-manager.ts new file mode 100644 index 000000000..2e39d36a4 --- /dev/null +++ b/dbal/development/src/bridges/websocket-bridge/connection-manager.ts @@ -0,0 +1,90 @@ +import { DBALError } from '../../core/foundation/errors' +import type { RPCMessage } from '../utils/rpc-types' +import type { BridgeState } from './state' +import type { MessageRouter } from './message-router' + +export interface ConnectionManager { + ensureConnection: () => Promise + send: (message: RPCMessage) => Promise + close: () => Promise +} + +export const createConnectionManager = ( + state: BridgeState, + messageRouter: MessageRouter, +): ConnectionManager => { + let connectionPromise: Promise | null = null + + const resetConnection = () => { + connectionPromise = null + state.ws = null + } + + const rejectPendingRequests = (error: DBALError) => { + state.pendingRequests.forEach(({ reject }) => reject(error)) + state.pendingRequests.clear() + } + + const ensureConnection = async (): Promise => { + if (state.ws?.readyState === WebSocket.OPEN) { + return + } + + if (connectionPromise) { + return connectionPromise + } + + connectionPromise = new Promise((resolve, reject) => { + try { + const ws = new WebSocket(state.endpoint) + state.ws = ws + + ws.onopen = () => resolve() + ws.onerror = error => { + const connectionError = DBALError.internal(`WebSocket connection failed: ${error}`) + rejectPendingRequests(connectionError) + resetConnection() + reject(connectionError) + } + ws.onclose = () => { + rejectPendingRequests(DBALError.internal('WebSocket connection closed')) + resetConnection() + } + ws.onmessage = event => messageRouter.handle(event.data) + } catch (error) { + resetConnection() + const connectionError = + error instanceof DBALError ? error : DBALError.internal('Failed to establish WebSocket connection') + reject(connectionError) + } + }) + + return connectionPromise + } + + const send = async (message: RPCMessage): Promise => { + await ensureConnection() + + if (!state.ws || state.ws.readyState !== WebSocket.OPEN) { + throw DBALError.internal('WebSocket connection not open') + } + + state.ws.send(JSON.stringify(message)) + } + + const close = async (): Promise => { + rejectPendingRequests(DBALError.internal('WebSocket connection closed')) + + if (state.ws) { + state.ws.close() + } + + resetConnection() + } + + return { + ensureConnection, + send, + close, + } +} diff --git a/dbal/development/src/bridges/websocket-bridge/connection.ts b/dbal/development/src/bridges/websocket-bridge/connection.ts deleted file mode 100644 index 9f348f18a..000000000 --- a/dbal/development/src/bridges/websocket-bridge/connection.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { DBALError } from '../../core/foundation/errors' -import { handleMessage } from './message-handler' -import type { BridgeState } from './state' - -export const connect = async (state: BridgeState): Promise => { - if (state.ws?.readyState === WebSocket.OPEN) { - return - } - - return new Promise((resolve, reject) => { - state.ws = new WebSocket(state.endpoint) - - state.ws.onopen = () => resolve() - state.ws.onerror = error => reject(DBALError.internal(`WebSocket connection failed: ${error}`)) - state.ws.onmessage = event => handleMessage(state, event.data) - state.ws.onclose = () => { - state.ws = null - } - }) -} - -export const closeConnection = async (state: BridgeState): Promise => { - if (state.ws) { - state.ws.close() - state.ws = null - } - state.pendingRequests.clear() -} diff --git a/dbal/development/src/bridges/websocket-bridge/index.ts b/dbal/development/src/bridges/websocket-bridge/index.ts index f4ecedcdd..b6f27cbad 100644 --- a/dbal/development/src/bridges/websocket-bridge/index.ts +++ b/dbal/development/src/bridges/websocket-bridge/index.ts @@ -1,16 +1,20 @@ import type { DBALAdapter, AdapterCapabilities } from '../../adapters/adapter' import type { ListOptions, ListResult } from '../../core/types' -import { closeConnection } from './connection' +import { createConnectionManager } from './connection-manager' +import { createMessageRouter } from './message-router' import { createOperations } from './operations' import { createBridgeState } from './state' export class WebSocketBridge implements DBALAdapter { private readonly state: ReturnType + private readonly connectionManager: ReturnType private readonly operations: ReturnType constructor(endpoint: string, auth?: { user: unknown; session: unknown }) { this.state = createBridgeState(endpoint, auth) - this.operations = createOperations(this.state) + const messageRouter = createMessageRouter(this.state) + this.connectionManager = createConnectionManager(this.state, messageRouter) + this.operations = createOperations(this.state, this.connectionManager) } create(entity: string, data: Record): Promise { @@ -75,6 +79,6 @@ export class WebSocketBridge implements DBALAdapter { } async close(): Promise { - await closeConnection(this.state) + await this.connectionManager.close() } } diff --git a/dbal/development/src/bridges/websocket-bridge/message-handler.ts b/dbal/development/src/bridges/websocket-bridge/message-handler.ts deleted file mode 100644 index 78db23362..000000000 --- a/dbal/development/src/bridges/websocket-bridge/message-handler.ts +++ /dev/null @@ -1,25 +0,0 @@ -import type { RPCResponse } from '../utils/rpc-types' -import type { BridgeState } from './state' -import { DBALError } from '../../core/foundation/errors' - -export const handleMessage = (state: BridgeState, data: string): void => { - try { - const response: RPCResponse = JSON.parse(data) - const pending = state.pendingRequests.get(response.id) - - if (!pending) { - return - } - - state.pendingRequests.delete(response.id) - - if (response.error) { - const error = new DBALError(response.error.message, response.error.code, response.error.details) - pending.reject(error) - } else { - pending.resolve(response.result) - } - } catch (error) { - console.error('Failed to parse WebSocket message:', error) - } -} diff --git a/dbal/development/src/bridges/websocket-bridge/message-router.ts b/dbal/development/src/bridges/websocket-bridge/message-router.ts new file mode 100644 index 000000000..0603f2a2a --- /dev/null +++ b/dbal/development/src/bridges/websocket-bridge/message-router.ts @@ -0,0 +1,68 @@ +import { DBALError } from '../../core/foundation/errors' +import type { RPCResponse } from '../utils/rpc-types' +import type { BridgeState } from './state' + +export interface MessageRouter { + handle: (rawMessage: unknown) => void +} + +const isRecord = (value: unknown): value is Record => + typeof value === 'object' && value !== null && !Array.isArray(value) + +const isRPCError = (value: unknown): value is NonNullable => + isRecord(value) && + typeof value.code === 'number' && + typeof value.message === 'string' && + (value.details === undefined || isRecord(value.details)) + +const isRPCResponse = (value: unknown): value is RPCResponse => { + if (!isRecord(value)) { + return false + } + + const hasId = typeof value.id === 'string' + const hasResult = Object.prototype.hasOwnProperty.call(value, 'result') + const hasError = isRPCError(value.error) || value.error === undefined + + return hasId && (hasResult || isRPCError(value.error)) && hasError +} + +const parseResponse = (rawMessage: string): RPCResponse => { + const parsed = JSON.parse(rawMessage) as unknown + + if (!isRPCResponse(parsed)) { + throw new Error('Invalid RPC response shape') + } + + return parsed +} + +export const createMessageRouter = (state: BridgeState): MessageRouter => ({ + handle: (rawMessage: unknown) => { + if (typeof rawMessage !== 'string') { + console.warn('Ignoring non-string WebSocket message') + return + } + + try { + const response = parseResponse(rawMessage) + const pending = state.pendingRequests.get(response.id) + + if (!pending) { + console.warn(`No pending request for response ${response.id}`) + return + } + + state.pendingRequests.delete(response.id) + + if (response.error) { + const error = new DBALError(response.error.message, response.error.code, response.error.details) + pending.reject(error) + } else { + pending.resolve(response.result) + } + } catch (error) { + console.error('Failed to process WebSocket message', error) + } + }, +}) diff --git a/dbal/development/src/bridges/websocket-bridge/operations.ts b/dbal/development/src/bridges/websocket-bridge/operations.ts index 05c9a866b..8519082fe 100644 --- a/dbal/development/src/bridges/websocket-bridge/operations.ts +++ b/dbal/development/src/bridges/websocket-bridge/operations.ts @@ -1,31 +1,36 @@ import type { AdapterCapabilities } from '../../adapters/adapter' import type { ListOptions, ListResult } from '../../core/types' +import type { ConnectionManager } from './connection-manager' import type { BridgeState } from './state' import { rpcCall } from './rpc' -export const createOperations = (state: BridgeState) => ({ - create: (entity: string, data: Record) => rpcCall(state, 'create', entity, data), - read: (entity: string, id: string) => rpcCall(state, 'read', entity, id), - update: (entity: string, id: string, data: Record) => rpcCall(state, 'update', entity, id, data), - delete: (entity: string, id: string) => rpcCall(state, 'delete', entity, id) as Promise, - list: (entity: string, options?: ListOptions) => rpcCall(state, 'list', entity, options) as Promise>, - findFirst: (entity: string, filter?: Record) => rpcCall(state, 'findFirst', entity, filter), - findByField: (entity: string, field: string, value: unknown) => rpcCall(state, 'findByField', entity, field, value), +export const createOperations = (state: BridgeState, connectionManager: ConnectionManager) => ({ + create: (entity: string, data: Record) => rpcCall(state, connectionManager, 'create', entity, data), + read: (entity: string, id: string) => rpcCall(state, connectionManager, 'read', entity, id), + update: (entity: string, id: string, data: Record) => + rpcCall(state, connectionManager, 'update', entity, id, data), + delete: (entity: string, id: string) => rpcCall(state, connectionManager, 'delete', entity, id) as Promise, + list: (entity: string, options?: ListOptions) => + rpcCall(state, connectionManager, 'list', entity, options) as Promise>, + findFirst: (entity: string, filter?: Record) => + rpcCall(state, connectionManager, 'findFirst', entity, filter), + findByField: (entity: string, field: string, value: unknown) => + rpcCall(state, connectionManager, 'findByField', entity, field, value), upsert: ( entity: string, filter: Record, createData: Record, updateData: Record, - ) => rpcCall(state, 'upsert', entity, filter, createData, updateData), + ) => rpcCall(state, connectionManager, 'upsert', entity, filter, createData, updateData), updateByField: (entity: string, field: string, value: unknown, data: Record) => - rpcCall(state, 'updateByField', entity, field, value, data), + rpcCall(state, connectionManager, 'updateByField', entity, field, value, data), deleteByField: (entity: string, field: string, value: unknown) => - rpcCall(state, 'deleteByField', entity, field, value) as Promise, + rpcCall(state, connectionManager, 'deleteByField', entity, field, value) as Promise, deleteMany: (entity: string, filter?: Record) => - rpcCall(state, 'deleteMany', entity, filter) as Promise, + rpcCall(state, connectionManager, 'deleteMany', entity, filter) as Promise, createMany: (entity: string, data: Record[]) => - rpcCall(state, 'createMany', entity, data) as Promise, + rpcCall(state, connectionManager, 'createMany', entity, data) as Promise, updateMany: (entity: string, filter: Record, data: Record) => - rpcCall(state, 'updateMany', entity, filter, data) as Promise, - getCapabilities: () => rpcCall(state, 'getCapabilities') as Promise, + rpcCall(state, connectionManager, 'updateMany', entity, filter, data) as Promise, + getCapabilities: () => rpcCall(state, connectionManager, 'getCapabilities') as Promise, }) diff --git a/dbal/development/src/bridges/websocket-bridge/rpc.ts b/dbal/development/src/bridges/websocket-bridge/rpc.ts index 2462558b4..3de06a550 100644 --- a/dbal/development/src/bridges/websocket-bridge/rpc.ts +++ b/dbal/development/src/bridges/websocket-bridge/rpc.ts @@ -1,25 +1,28 @@ import { DBALError } from '../../core/foundation/errors' import { generateRequestId } from '../utils/generate-request-id' import type { RPCMessage } from '../utils/rpc-types' -import { connect } from './connection' +import type { ConnectionManager } from './connection-manager' import type { BridgeState } from './state' -export const rpcCall = async (state: BridgeState, method: string, ...params: unknown[]): Promise => { - await connect(state) - +export const rpcCall = async ( + state: BridgeState, + connectionManager: ConnectionManager, + method: string, + ...params: unknown[] +): Promise => { const id = generateRequestId() const message: RPCMessage = { id, method, params } return new Promise((resolve, reject) => { state.pendingRequests.set(id, { resolve, reject }) - if (state.ws?.readyState === WebSocket.OPEN) { - state.ws.send(JSON.stringify(message)) - } else { - state.pendingRequests.delete(id) - reject(DBALError.internal('WebSocket connection not open')) - return - } + connectionManager + .send(message) + .catch(error => { + state.pendingRequests.delete(id) + reject(error) + return + }) setTimeout(() => { if (state.pendingRequests.has(id)) {