From 9eb02bbbaa620eda7a01e351af48501e60d28699 Mon Sep 17 00:00:00 2001 From: Jibles Date: Tue, 19 Aug 2025 12:19:29 +1000 Subject: [PATCH 1/5] feat: POC proxy for coincap --- node/proxy/api/src/app.ts | 24 ++- node/proxy/api/src/marketData.ts | 291 +++++++++++++++++++++++++++++++ 2 files changed, 314 insertions(+), 1 deletion(-) create mode 100644 node/proxy/api/src/marketData.ts diff --git a/node/proxy/api/src/app.ts b/node/proxy/api/src/app.ts index 500471136..b88145f03 100644 --- a/node/proxy/api/src/app.ts +++ b/node/proxy/api/src/app.ts @@ -7,6 +7,7 @@ import { RegisterRoutes } from './routes' import { CoinGecko } from './coingecko' import { Zerion } from './zerion' import { Zrx } from './zrx' +import { MarketDataWebSocket } from './marketData' const PORT = process.env.PORT ?? 3000 @@ -50,4 +51,25 @@ app.get('/', async (_, res) => { app.use(middleware.errorHandler, middleware.notFoundHandler) -app.listen(PORT, () => logger.info('Server started')) +const server = app.listen(PORT, () => logger.info('Server started')) + +const marketDataWS = new MarketDataWebSocket(logger, process.env.MARKET_DATA_PROVIDER, process.env.MARKET_DATA_API_KEY) +marketDataWS.setupWebSocketServer(server) + +process.on('SIGTERM', () => { + logger.info('Received SIGTERM, shutting down gracefully') + marketDataWS.disconnect() + server.close(() => { + logger.info('Server closed') + process.exit(0) + }) +}) + +process.on('SIGINT', () => { + logger.info('Received SIGINT, shutting down gracefully') + marketDataWS.disconnect() + server.close(() => { + logger.info('Server closed') + process.exit(0) + }) +}) diff --git a/node/proxy/api/src/marketData.ts b/node/proxy/api/src/marketData.ts new file mode 100644 index 000000000..a4adc15ba --- /dev/null +++ b/node/proxy/api/src/marketData.ts @@ -0,0 +1,291 @@ +import { Logger } from '@shapeshiftoss/logger' +import WebSocket, { Server } from 'ws' +import { v4 } from 'uuid' +import { Server as HttpServer } from 'http' +import { IncomingMessage } from 'http' +import { URL } from 'url' + +export interface MarketDataProviderConfig { + name: string + wsBaseUrl: string + apiKey?: string + // How to construct the WebSocket URL for this provider + buildUrl: (baseUrl: string, apiKey?: string) => string + parseMessage: (data: unknown) => Record | null +} + +export interface MarketDataMessage { + type: 'price_update' + source: string + data: Record + timestamp: number +} + +export interface MarketDataClient { + id: string + websocket: WebSocket + assets: string[] +} + +const PROVIDERS: Record = { + coincap: { + name: 'CoinCap', + wsBaseUrl: 'wss://wss.coincap.io/prices', + buildUrl: (baseUrl: string, apiKey?: string) => { + const params = new URLSearchParams({ assets: 'ALL' }) + if (apiKey) params.set('apiKey', apiKey) + return `${baseUrl}?${params.toString()}` + }, + parseMessage: (data: unknown) => { + // CoinCap sends data directly as asset price object + return typeof data === 'object' && data !== null ? (data as Record) : null + }, + }, +} + +export class MarketDataWebSocket { + private clients = new Map() + private socket?: WebSocket + private reconnectTimeout?: NodeJS.Timeout + private readonly logger: Logger + private readonly provider: MarketDataProviderConfig + private readonly wsUrl: string + private readonly reconnectDelayMs = 5000 + + constructor(logger: Logger, providerName = 'coincap', apiKey?: string) { + this.logger = logger.child({ namespace: ['proxy', 'marketData'] }) + + this.provider = PROVIDERS[providerName] + if (!this.provider) { + throw new Error(`Unknown market data provider: ${providerName}. Available: ${Object.keys(PROVIDERS).join(', ')}`) + } + + this.wsUrl = this.provider.buildUrl(this.provider.wsBaseUrl, apiKey) + this.logger.info({ provider: this.provider.name, url: this.wsUrl }, 'Market data provider configured') + } + + private connect(): void { + if (this.socket?.readyState === WebSocket.OPEN) { + return + } + + this.logger.info({ provider: this.provider.name, url: this.wsUrl }, 'Connecting to market data provider') + this.socket = new WebSocket(this.wsUrl) + + this.socket.onopen = () => { + this.logger.info({ provider: this.provider.name }, 'Connected to market data provider') + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout) + this.reconnectTimeout = undefined + } + } + + this.socket.onmessage = (event) => { + this.handleMessage(event) + } + + this.socket.onclose = (event) => { + this.logger.warn( + { + provider: this.provider.name, + code: event.code, + reason: event.reason, + }, + 'Market data provider WebSocket closed' + ) + this.scheduleReconnect() + } + + this.socket.onerror = (error) => { + this.logger.error({ provider: this.provider.name, error }, 'Market data provider WebSocket error') + } + } + + private handleMessage(event: WebSocket.MessageEvent): void { + try { + const rawData = JSON.parse(event.data.toString()) + const parsedData = this.provider.parseMessage(rawData) + + if (!parsedData) { + return + } + + const message: MarketDataMessage = { + type: 'price_update', + source: this.provider.name.toLowerCase(), + data: parsedData, + timestamp: Date.now(), + } + + this.broadcastToClients(message) + } catch (error) { + this.logger.error( + { + provider: this.provider.name, + error, + data: event.data, + }, + 'Error processing provider message' + ) + } + } + + private broadcastToClients(message: MarketDataMessage): void { + for (const [clientId, client] of this.clients) { + try { + if (client.websocket.readyState === WebSocket.OPEN) { + // Filter data to only include assets this client requested + const filteredData: Record = {} + for (const asset of client.assets) { + if (message.data[asset] !== undefined) { + filteredData[asset] = message.data[asset] + } + } + + // Only send if there's relevant data for this client + if (Object.keys(filteredData).length > 0) { + const filteredMessage: MarketDataMessage = { + ...message, + data: filteredData, + } + client.websocket.send(JSON.stringify(filteredMessage)) + } + } else { + this.removeClient(clientId) + } + } catch (error) { + this.logger.error({ clientId, error }, 'Error sending message to client') + this.removeClient(clientId) + } + } + } + + private scheduleReconnect(): void { + if (this.reconnectTimeout) { + return + } + + // Only reconnect if we still have clients + this.reconnectTimeout = setTimeout(() => { + if (this.clients.size > 0) { + this.logger.info({ provider: this.provider.name }, 'Attempting to reconnect to market data provider') + this.connect() + } else { + this.logger.info('No clients connected, skipping provider reconnect') + } + }, this.reconnectDelayMs) + } + + private disconnectFromProvider(): void { + this.logger.info({ provider: this.provider.name }, 'Disconnecting from market data provider - no more clients') + + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout) + this.reconnectTimeout = undefined + } + + if (this.socket) { + this.socket.close() + this.socket = undefined + } + } + + addClient(websocket: WebSocket, clientId: string, requestedAssets: string[]): void { + const client: MarketDataClient = { + id: clientId, + websocket, + assets: requestedAssets, + } + + const wasEmpty = this.clients.size === 0 + this.clients.set(clientId, client) + + // Connect to provider if this is our first client + if (wasEmpty) { + this.logger.info({ provider: this.provider.name }, 'First client connected, establishing provider connection') + this.connect() + } + + this.logger.debug( + { + clientId, + requestedAssets, + totalClients: this.clients.size, + }, + 'Client connected to market data' + ) + + websocket.on('close', () => { + this.removeClient(clientId) + }) + + websocket.on('error', (error) => { + this.logger.error({ clientId, error }, 'Client WebSocket error') + this.removeClient(clientId) + }) + } + + private removeClient(clientId: string): void { + if (this.clients.has(clientId)) { + this.clients.delete(clientId) + this.logger.debug({ clientId, totalClients: this.clients.size }, 'Client disconnected from market data') + + // Disconnect from provider if no more clients + if (this.clients.size === 0) { + this.disconnectFromProvider() + } + } + } + + /** + * Parse assets from WebSocket URL query parameters + * Handles both full URLs and relative paths from different environments + */ + private parseAssetsFromUrl(request: IncomingMessage): string[] { + try { + const rawUrl = request.url || '' + // Use dummy base to handle relative URLs like "/ws/market-data?assets=bitcoin" + const url = new URL(rawUrl, 'ws://dummy.com') + const assetsParam = url.searchParams.get('assets') + + if (!assetsParam) { + return [] + } + + return assetsParam + .split(',') + .map((asset) => asset.trim().toLowerCase()) + .filter((asset) => asset.length > 0) + } catch (error) { + this.logger.error({ error, url: request.url }, 'Error parsing assets from URL') + return [] + } + } + + setupWebSocketServer(server: HttpServer, path = '/market-data'): Server { + const wsServer = new Server({ server, path }) + + wsServer.on('connection', (ws, request) => { + const clientId = v4() + const requestedAssets = this.parseAssetsFromUrl(request) + this.addClient(ws, clientId, requestedAssets) + }) + + this.logger.info({ path }, 'WebSocket server setup complete') + return wsServer + } + + disconnect(): void { + this.logger.info('Shutting down market data WebSocket') + + // Disconnect from provider + this.disconnectFromProvider() + + // Close all client connections + for (const [, client] of this.clients) { + client.websocket.close() + } + + this.clients.clear() + } +} From efe8cbdcd1adbed9272311309b8802925440b23b Mon Sep 17 00:00:00 2001 From: kaladinlight <35275952+kaladinlight@users.noreply.github.com> Date: Mon, 25 Aug 2025 12:02:48 -0600 Subject: [PATCH 2/5] suggestion: market data websocket proxy --- .../coinstacks/arbitrum-nova/api/package.json | 3 +- node/coinstacks/arbitrum-nova/api/src/app.ts | 2 +- node/coinstacks/arbitrum/api/package.json | 3 +- node/coinstacks/arbitrum/api/src/app.ts | 2 +- node/coinstacks/avalanche/api/package.json | 3 +- node/coinstacks/avalanche/api/src/app.ts | 2 +- node/coinstacks/base/api/package.json | 3 +- node/coinstacks/base/api/src/app.ts | 2 +- node/coinstacks/bitcoin/api/package.json | 1 + node/coinstacks/bitcoin/api/src/app.ts | 10 +- node/coinstacks/bitcoincash/api/package.json | 1 + node/coinstacks/bitcoincash/api/src/app.ts | 10 +- .../coinstacks/bnbsmartchain/api/package.json | 3 +- node/coinstacks/bnbsmartchain/api/src/app.ts | 2 +- node/coinstacks/common/api/package.json | 1 + node/coinstacks/common/api/src/index.ts | 1 - node/coinstacks/common/api/src/middleware.ts | 2 +- node/coinstacks/common/api/src/registry.ts | 2 +- node/coinstacks/common/api/src/websocket.ts | 140 ++------ node/coinstacks/dogecoin/api/package.json | 1 + node/coinstacks/dogecoin/api/src/app.ts | 10 +- node/coinstacks/ethereum/api/package.json | 3 +- node/coinstacks/ethereum/api/src/app.ts | 2 +- node/coinstacks/gnosis/api/package.json | 3 +- node/coinstacks/gnosis/api/src/app.ts | 2 +- node/coinstacks/litecoin/api/package.json | 1 + node/coinstacks/litecoin/api/src/app.ts | 10 +- node/coinstacks/optimism/api/package.json | 3 +- node/coinstacks/optimism/api/src/app.ts | 2 +- node/coinstacks/polygon/api/package.json | 3 +- node/coinstacks/polygon/api/src/app.ts | 2 +- node/coinstacks/solana/api/package.json | 1 + node/coinstacks/solana/api/src/app.ts | 3 +- node/coinstacks/solana/api/src/websocket.ts | 10 +- node/packages/blockbook/src/websocket.ts | 12 +- node/packages/prometheus/package.json | 20 ++ node/packages/prometheus/src/index.ts | 1 + .../prometheus}/src/prometheus.ts | 0 node/packages/prometheus/tsconfig.json | 9 + node/packages/websocket/package.json | 1 + .../websocket/src/connectionHandler.ts | 109 +++++++ node/packages/websocket/src/index.ts | 1 + node/packages/websocket/src/websocket.ts | 41 +-- node/proxy/api/src/app.ts | 31 +- node/proxy/api/src/coincap.ts | 58 ++++ node/proxy/api/src/marketData.ts | 300 +++--------------- node/proxy/sample.env | 1 + package.json | 1 - 48 files changed, 370 insertions(+), 464 deletions(-) create mode 100644 node/packages/prometheus/package.json create mode 100644 node/packages/prometheus/src/index.ts rename node/{coinstacks/common/api => packages/prometheus}/src/prometheus.ts (100%) create mode 100644 node/packages/prometheus/tsconfig.json create mode 100644 node/packages/websocket/src/connectionHandler.ts create mode 100644 node/proxy/api/src/coincap.ts diff --git a/node/coinstacks/arbitrum-nova/api/package.json b/node/coinstacks/arbitrum-nova/api/package.json index 823c79070..7a15b6f1b 100644 --- a/node/coinstacks/arbitrum-nova/api/package.json +++ b/node/coinstacks/arbitrum-nova/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/arbitrum-nova/api/src/app.ts b/node/coinstacks/arbitrum-nova/api/src/app.ts index 07b238073..f30ece82f 100644 --- a/node/coinstacks/arbitrum-nova/api/src/app.ts +++ b/node/coinstacks/arbitrum-nova/api/src/app.ts @@ -9,10 +9,10 @@ import { Registry, BlockHandler, TransactionHandler, - Prometheus, } from '@shapeshiftoss/common-api' import { Tx as BlockbookTx, WebsocketClient, getAddresses, NewBlock } from '@shapeshiftoss/blockbook' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { gasOracle, service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/arbitrum/api/package.json b/node/coinstacks/arbitrum/api/package.json index 5f4f80346..631c4dbd4 100644 --- a/node/coinstacks/arbitrum/api/package.json +++ b/node/coinstacks/arbitrum/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/arbitrum/api/src/app.ts b/node/coinstacks/arbitrum/api/src/app.ts index abc56561a..a0e9af6a9 100644 --- a/node/coinstacks/arbitrum/api/src/app.ts +++ b/node/coinstacks/arbitrum/api/src/app.ts @@ -9,10 +9,10 @@ import { Registry, BlockHandler, TransactionHandler, - Prometheus, } from '@shapeshiftoss/common-api' import { Tx as BlockbookTx, WebsocketClient, getAddresses, NewBlock } from '@shapeshiftoss/blockbook' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { gasOracle, service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/avalanche/api/package.json b/node/coinstacks/avalanche/api/package.json index 47aa55671..2fd17614a 100644 --- a/node/coinstacks/avalanche/api/package.json +++ b/node/coinstacks/avalanche/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/avalanche/api/src/app.ts b/node/coinstacks/avalanche/api/src/app.ts index 9ac1c7c85..100a5c112 100644 --- a/node/coinstacks/avalanche/api/src/app.ts +++ b/node/coinstacks/avalanche/api/src/app.ts @@ -9,10 +9,10 @@ import { Registry, BlockHandler, TransactionHandler, - Prometheus, } from '@shapeshiftoss/common-api' import { Tx as BlockbookTx, WebsocketClient, getAddresses, NewBlock } from '@shapeshiftoss/blockbook' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { gasOracle, service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/base/api/package.json b/node/coinstacks/base/api/package.json index 912865df9..e7b186a78 100644 --- a/node/coinstacks/base/api/package.json +++ b/node/coinstacks/base/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/base/api/src/app.ts b/node/coinstacks/base/api/src/app.ts index aedba6abb..dfd322b0b 100644 --- a/node/coinstacks/base/api/src/app.ts +++ b/node/coinstacks/base/api/src/app.ts @@ -9,10 +9,10 @@ import { Registry, AddressFormatter, TransactionHandler, - Prometheus, } from '@shapeshiftoss/common-api' import { Tx as BlockbookTx, WebsocketClient, getAddresses } from '@shapeshiftoss/blockbook' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { gasOracle, service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/bitcoin/api/package.json b/node/coinstacks/bitcoin/api/package.json index beedb0969..e3e171caa 100644 --- a/node/coinstacks/bitcoin/api/package.json +++ b/node/coinstacks/bitcoin/api/package.json @@ -14,6 +14,7 @@ "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0", "bech32": "^2.0.0" } } diff --git a/node/coinstacks/bitcoin/api/src/app.ts b/node/coinstacks/bitcoin/api/src/app.ts index 3a3fa086d..a20c48082 100644 --- a/node/coinstacks/bitcoin/api/src/app.ts +++ b/node/coinstacks/bitcoin/api/src/app.ts @@ -3,16 +3,10 @@ import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' import { Logger } from '@shapeshiftoss/logger' -import { - middleware, - ConnectionHandler, - Registry, - BlockHandler, - TransactionHandler, - Prometheus, -} from '@shapeshiftoss/common-api' +import { middleware, ConnectionHandler, Registry, BlockHandler, TransactionHandler } from '@shapeshiftoss/common-api' import { getAddresses, NewBlock, Tx as BlockbookTx, WebsocketClient } from '@shapeshiftoss/blockbook' import { utxo } from '@shapeshiftoss/common-api' +import { Prometheus } from '@shapeshiftoss/prometheus' import { service, formatAddress } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/bitcoincash/api/package.json b/node/coinstacks/bitcoincash/api/package.json index 8417484eb..51e55c3b8 100644 --- a/node/coinstacks/bitcoincash/api/package.json +++ b/node/coinstacks/bitcoincash/api/package.json @@ -14,6 +14,7 @@ "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0", "bech32": "^2.0.0" } } diff --git a/node/coinstacks/bitcoincash/api/src/app.ts b/node/coinstacks/bitcoincash/api/src/app.ts index f73750b43..ca06a8d30 100644 --- a/node/coinstacks/bitcoincash/api/src/app.ts +++ b/node/coinstacks/bitcoincash/api/src/app.ts @@ -3,16 +3,10 @@ import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' import { Logger } from '@shapeshiftoss/logger' -import { - middleware, - ConnectionHandler, - Registry, - BlockHandler, - TransactionHandler, - Prometheus, -} from '@shapeshiftoss/common-api' +import { middleware, ConnectionHandler, Registry, BlockHandler, TransactionHandler } from '@shapeshiftoss/common-api' import { getAddresses, NewBlock, Tx as BlockbookTx, WebsocketClient } from '@shapeshiftoss/blockbook' import { utxo } from '@shapeshiftoss/common-api' +import { Prometheus } from '@shapeshiftoss/prometheus' import { service, formatAddress } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/bnbsmartchain/api/package.json b/node/coinstacks/bnbsmartchain/api/package.json index 92024710e..9e30b0342 100644 --- a/node/coinstacks/bnbsmartchain/api/package.json +++ b/node/coinstacks/bnbsmartchain/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/bnbsmartchain/api/src/app.ts b/node/coinstacks/bnbsmartchain/api/src/app.ts index 5c70b4bb8..cc89de26b 100644 --- a/node/coinstacks/bnbsmartchain/api/src/app.ts +++ b/node/coinstacks/bnbsmartchain/api/src/app.ts @@ -9,10 +9,10 @@ import { Registry, BlockHandler, TransactionHandler, - Prometheus, } from '@shapeshiftoss/common-api' import { Tx as BlockbookTx, WebsocketClient, getAddresses, NewBlock } from '@shapeshiftoss/blockbook' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { gasOracle, service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/common/api/package.json b/node/coinstacks/common/api/package.json index ef52a3bab..523a268a8 100644 --- a/node/coinstacks/common/api/package.json +++ b/node/coinstacks/common/api/package.json @@ -15,6 +15,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0", "@shapeshiftoss/websocket": "^10.0.0", "bignumber.js": "^9.1.2", "uuid": "^9.0.1", diff --git a/node/coinstacks/common/api/src/index.ts b/node/coinstacks/common/api/src/index.ts index 1f0816534..5a44186df 100644 --- a/node/coinstacks/common/api/src/index.ts +++ b/node/coinstacks/common/api/src/index.ts @@ -4,7 +4,6 @@ export * from './models' export * as middleware from './middleware' export * from './websocket' export * from './registry' -export * from './prometheus' export * from './utils' export * as evm from './evm' diff --git a/node/coinstacks/common/api/src/middleware.ts b/node/coinstacks/common/api/src/middleware.ts index 3ab57a506..97ff584b3 100644 --- a/node/coinstacks/common/api/src/middleware.ts +++ b/node/coinstacks/common/api/src/middleware.ts @@ -1,10 +1,10 @@ +import { Prometheus } from '@shapeshiftoss/prometheus' import { json, urlencoded, NextFunction, Request, Response } from 'express' import compression from 'compression' import morgan from 'morgan' import cors from 'cors' import { ValidateError } from 'tsoa' import { ApiError, NotFoundError } from '.' -import { Prometheus } from './prometheus' export function errorHandler(err: Error, req: Request, res: Response, next: NextFunction): Response | void { if (err instanceof ValidateError) { diff --git a/node/coinstacks/common/api/src/registry.ts b/node/coinstacks/common/api/src/registry.ts index 50bc90108..142f51804 100644 --- a/node/coinstacks/common/api/src/registry.ts +++ b/node/coinstacks/common/api/src/registry.ts @@ -154,7 +154,7 @@ export class Registry { for (const [id, connection] of this.addresses[address].entries()) { const { subscriptionId } = Registry.fromId(id) - connection.publish(subscriptionId, address, tx) + connection.publish(subscriptionId, { address, data: tx }) } }) } diff --git a/node/coinstacks/common/api/src/websocket.ts b/node/coinstacks/common/api/src/websocket.ts index 99a85d812..915e4950c 100644 --- a/node/coinstacks/common/api/src/websocket.ts +++ b/node/coinstacks/common/api/src/websocket.ts @@ -1,22 +1,9 @@ import { Logger } from '@shapeshiftoss/logger' -import { WebsocketClient } from '@shapeshiftoss/websocket' -import { v4 } from 'uuid' +import { Prometheus } from '@shapeshiftoss/prometheus' +import { AddressSubscriptionWebsocketClient, BaseConnectionHandler } from '@shapeshiftoss/websocket' import WebSocket from 'ws' -import { Prometheus } from './prometheus' import { Registry } from './registry' -export interface RequestPayload { - subscriptionId: string - method: 'subscribe' | 'unsubscribe' | 'ping' - data?: TxsTopicData -} - -export interface ErrorResponse { - subscriptionId: string - type: 'error' - message: string -} - export type Topics = 'txs' export interface TxsTopicData { @@ -24,12 +11,6 @@ export interface TxsTopicData { addresses: Array } -export interface MessageResponse { - address: string - data: unknown - subscriptionId: string -} - export interface Methods { // eslint-disable-next-line @typescript-eslint/no-explicit-any subscribe: (subscriptionId: string, data?: any) => void @@ -37,128 +18,78 @@ export interface Methods { unsubscribe: (subscriptionId: string, data?: any) => void } -export class ConnectionHandler { - public readonly clientId: string +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function isTxsTopicData(data: any): data is TxsTopicData { + return data && 'topic' in data +} - private readonly websocket: WebSocket +export class ConnectionHandler extends BaseConnectionHandler { private readonly registry: Registry - private readonly client: WebsocketClient - private readonly prometheus: Prometheus - private readonly logger: Logger + private readonly client: AddressSubscriptionWebsocketClient private readonly routes: Record - private readonly pingIntervalMs = 10000 - - private pingTimeout?: NodeJS.Timeout - private subscriptionIds = new Set() private constructor( websocket: WebSocket, registry: Registry, - client: WebsocketClient, + client: AddressSubscriptionWebsocketClient, prometheus: Prometheus, logger: Logger ) { - this.clientId = v4() + super(websocket, prometheus, logger) + this.registry = registry this.client = client - this.prometheus = prometheus - this.logger = logger.child({ namespace: ['websocket'] }) this.routes = { txs: { subscribe: (subscriptionId: string, data?: TxsTopicData) => this.handleSubscribeTxs(subscriptionId, data), unsubscribe: (subscriptionId: string, data?: TxsTopicData) => this.handleUnsubscribeTxs(subscriptionId, data), }, } - - this.pingTimeout = undefined - this.prometheus.metrics.websocketCount.inc() - this.websocket = websocket - this.websocket.ping() - - const pingInterval = setInterval(() => { - this.websocket.ping() - }, this.pingIntervalMs) - - this.websocket.onerror = (error) => { - this.logger.error({ clientId: this.clientId, error, fn: 'ws.onerror' }, 'websocket error') - this.close(pingInterval) - } - this.websocket.onclose = ({ code, reason }) => { - this.prometheus.metrics.websocketCount.dec() - this.logger.debug({ clientId: this.clientId, code, reason, fn: 'ws.close' }, 'websocket closed') - this.close(pingInterval) - } - this.websocket.on('pong', () => this.heartbeat()) - this.websocket.on('ping', () => this.websocket.pong()) - this.websocket.onmessage = (event) => this.onMessage(event) } static start( websocket: WebSocket, registry: Registry, - client: WebsocketClient, + client: AddressSubscriptionWebsocketClient, prometheus: Prometheus, logger: Logger ): void { new ConnectionHandler(websocket, registry, client, prometheus, logger) } - private heartbeat(): void { - if (this.pingTimeout) { - clearTimeout(this.pingTimeout) + onSubscribe(subscriptionId: string, data?: unknown): void { + if (!isTxsTopicData(data)) { + this.sendError(`no topic specified for subscribe`, subscriptionId) + return } - this.pingTimeout = setTimeout(() => { - this.logger.debug({ clientId: this.clientId, fn: 'pingTimeout' }, 'heartbeat failed') - this.websocket.terminate() - }, this.pingIntervalMs + 1000) + const callback = this.routes[data.topic].subscribe + if (callback) { + callback(subscriptionId, data) + } else { + this.sendError(`subscribe method not implemented for topic: ${data.topic}`, subscriptionId) + } } - private sendError(message: string, subscriptionId: string): void { - this.websocket.send(JSON.stringify({ subscriptionId, type: 'error', message } as ErrorResponse)) - } + onUnsubscribe(subscriptionId: string, data?: unknown): void { + if (!isTxsTopicData(data)) { + this.sendError(`no topic specified for unsubscribe`, subscriptionId) + return + } - private onMessage(event: WebSocket.MessageEvent): void { - try { - const payload: RequestPayload = JSON.parse(event.data.toString()) - - switch (payload.method) { - // browsers do not support ping/pong frame, handle message instead - case 'ping': { - this.websocket.send('pong') - break - } - case 'subscribe': - case 'unsubscribe': { - const topic = payload.data?.topic - - if (!topic) { - this.sendError(`no topic specified for method: ${payload.method}`, payload.subscriptionId) - break - } - - const callback = this.routes[topic][payload.method] - if (callback) { - callback(payload.subscriptionId, payload.data) - } else { - this.sendError(`${payload.method} method not implemented for topic: ${topic}`, payload.subscriptionId) - } - } - } - } catch (err) { - this.logger.error(err, { clientId: this.clientId, fn: 'onMessage', event }, 'Error processing message') + const callback = this.routes[data.topic].unsubscribe + if (callback) { + callback(subscriptionId, data) + } else { + this.sendError(`unsubscribe method not implemented for topic: ${data.topic}`, subscriptionId) } } - private close(interval: NodeJS.Timeout): void { - this.pingTimeout && clearTimeout(this.pingTimeout) - clearInterval(interval) - + onClose(): void { for (const subscriptionId of this.subscriptionIds) { this.registry.unsubscribe(this.clientId, subscriptionId, []) } - this.subscriptionIds.clear() this.client.subscribeAddresses(this.registry.getAddresses()) } @@ -192,9 +123,4 @@ export class ConnectionHandler { this.client.subscribeAddresses(this.registry.getAddresses()) } - - publish(subscriptionId: string, address: string, data: unknown): void { - const message: MessageResponse = { address, data, subscriptionId } - this.websocket.send(JSON.stringify(message)) - } } diff --git a/node/coinstacks/dogecoin/api/package.json b/node/coinstacks/dogecoin/api/package.json index 803415a6f..10116fe1d 100644 --- a/node/coinstacks/dogecoin/api/package.json +++ b/node/coinstacks/dogecoin/api/package.json @@ -14,6 +14,7 @@ "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0", "bech32": "^2.0.0" } } diff --git a/node/coinstacks/dogecoin/api/src/app.ts b/node/coinstacks/dogecoin/api/src/app.ts index 179fc63aa..546f22fe1 100644 --- a/node/coinstacks/dogecoin/api/src/app.ts +++ b/node/coinstacks/dogecoin/api/src/app.ts @@ -3,16 +3,10 @@ import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' import { Logger } from '@shapeshiftoss/logger' -import { - middleware, - ConnectionHandler, - Registry, - BlockHandler, - TransactionHandler, - Prometheus, -} from '@shapeshiftoss/common-api' +import { middleware, ConnectionHandler, Registry, BlockHandler, TransactionHandler } from '@shapeshiftoss/common-api' import { getAddresses, NewBlock, Tx as BlockbookTx, WebsocketClient } from '@shapeshiftoss/blockbook' import { utxo } from '@shapeshiftoss/common-api' +import { Prometheus } from '@shapeshiftoss/prometheus' import { formatAddress, service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/ethereum/api/package.json b/node/coinstacks/ethereum/api/package.json index 3efad55d6..5da64d738 100644 --- a/node/coinstacks/ethereum/api/package.json +++ b/node/coinstacks/ethereum/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/ethereum/api/src/app.ts b/node/coinstacks/ethereum/api/src/app.ts index c16731d8a..0c7a7ef4d 100644 --- a/node/coinstacks/ethereum/api/src/app.ts +++ b/node/coinstacks/ethereum/api/src/app.ts @@ -9,10 +9,10 @@ import { Registry, BlockHandler, TransactionHandler, - Prometheus, } from '@shapeshiftoss/common-api' import { Tx as BlockbookTx, WebsocketClient, getAddresses, NewBlock } from '@shapeshiftoss/blockbook' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { gasOracle, service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/gnosis/api/package.json b/node/coinstacks/gnosis/api/package.json index c5c73bda4..7bb49eb8a 100644 --- a/node/coinstacks/gnosis/api/package.json +++ b/node/coinstacks/gnosis/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/gnosis/api/src/app.ts b/node/coinstacks/gnosis/api/src/app.ts index 4a9d1968f..3a54033fd 100644 --- a/node/coinstacks/gnosis/api/src/app.ts +++ b/node/coinstacks/gnosis/api/src/app.ts @@ -9,10 +9,10 @@ import { Registry, BlockHandler, TransactionHandler, - Prometheus, } from '@shapeshiftoss/common-api' import { Tx as BlockbookTx, WebsocketClient, getAddresses, NewBlock } from '@shapeshiftoss/blockbook' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { gasOracle, service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/litecoin/api/package.json b/node/coinstacks/litecoin/api/package.json index 0b31533a8..d5b49e9aa 100644 --- a/node/coinstacks/litecoin/api/package.json +++ b/node/coinstacks/litecoin/api/package.json @@ -14,6 +14,7 @@ "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0", "bech32": "^2.0.0" } } diff --git a/node/coinstacks/litecoin/api/src/app.ts b/node/coinstacks/litecoin/api/src/app.ts index 9d95708f9..c1b4bdb7b 100644 --- a/node/coinstacks/litecoin/api/src/app.ts +++ b/node/coinstacks/litecoin/api/src/app.ts @@ -3,16 +3,10 @@ import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' import { Logger } from '@shapeshiftoss/logger' -import { - middleware, - ConnectionHandler, - Registry, - BlockHandler, - TransactionHandler, - Prometheus, -} from '@shapeshiftoss/common-api' +import { middleware, ConnectionHandler, Registry, BlockHandler, TransactionHandler } from '@shapeshiftoss/common-api' import { getAddresses, NewBlock, Tx as BlockbookTx, WebsocketClient } from '@shapeshiftoss/blockbook' import { utxo } from '@shapeshiftoss/common-api' +import { Prometheus } from '@shapeshiftoss/prometheus' import { service, formatAddress } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/optimism/api/package.json b/node/coinstacks/optimism/api/package.json index a6848d36d..48f817d56 100644 --- a/node/coinstacks/optimism/api/package.json +++ b/node/coinstacks/optimism/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/optimism/api/src/app.ts b/node/coinstacks/optimism/api/src/app.ts index 1ab69038c..16baf8a8e 100644 --- a/node/coinstacks/optimism/api/src/app.ts +++ b/node/coinstacks/optimism/api/src/app.ts @@ -10,10 +10,10 @@ import { AddressFormatter, BlockHandler, TransactionHandler, - Prometheus, } from '@shapeshiftoss/common-api' import { Tx as BlockbookTx, WebsocketClient, getAddresses, NewBlock } from '@shapeshiftoss/blockbook' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { gasOracle, service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/polygon/api/package.json b/node/coinstacks/polygon/api/package.json index 5187af28c..ca00824fd 100644 --- a/node/coinstacks/polygon/api/package.json +++ b/node/coinstacks/polygon/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/polygon/api/src/app.ts b/node/coinstacks/polygon/api/src/app.ts index c30cf0aa7..98ad848a9 100644 --- a/node/coinstacks/polygon/api/src/app.ts +++ b/node/coinstacks/polygon/api/src/app.ts @@ -9,10 +9,10 @@ import { Registry, BlockHandler, TransactionHandler, - Prometheus, } from '@shapeshiftoss/common-api' import { Tx as BlockbookTx, WebsocketClient, getAddresses, NewBlock } from '@shapeshiftoss/blockbook' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { gasOracle, service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/solana/api/package.json b/node/coinstacks/solana/api/package.json index 08c705f95..4e7a3961c 100644 --- a/node/coinstacks/solana/api/package.json +++ b/node/coinstacks/solana/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0", "@shapeshiftoss/websocket": "^10.0.0", "@solana/web3.js": "^1.95.3", "helius-sdk": "^1.4.0" diff --git a/node/coinstacks/solana/api/src/app.ts b/node/coinstacks/solana/api/src/app.ts index 1f3c7ba7a..d9c3f2cab 100644 --- a/node/coinstacks/solana/api/src/app.ts +++ b/node/coinstacks/solana/api/src/app.ts @@ -1,5 +1,6 @@ -import { ConnectionHandler, middleware, Prometheus, Registry, TransactionHandler } from '@shapeshiftoss/common-api' +import { ConnectionHandler, middleware, Registry, TransactionHandler } from '@shapeshiftoss/common-api' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { Logs } from '@solana/web3.js' import express from 'express' import { join } from 'path' diff --git a/node/coinstacks/solana/api/src/websocket.ts b/node/coinstacks/solana/api/src/websocket.ts index 598b40f4d..6826edc96 100644 --- a/node/coinstacks/solana/api/src/websocket.ts +++ b/node/coinstacks/solana/api/src/websocket.ts @@ -1,5 +1,5 @@ import { Logger } from '@shapeshiftoss/logger' -import { WebsocketClient as BaseWebsocketClient, Args, Options, Subscription } from '@shapeshiftoss/websocket' +import { Args, Options, Subscription, AddressSubscriptionWebsocketClient } from '@shapeshiftoss/websocket' import WebSocket from 'ws' import { isWebsocketResponse, @@ -17,7 +17,7 @@ interface WebsocketArgs extends Omit { type TransactionHandler = (data: Logs) => Promise -export class WebsocketClient extends BaseWebsocketClient { +export class WebsocketClient extends AddressSubscriptionWebsocketClient { private handleTransaction: TransactionHandler | Array private addresses: Array = [] private subscriptionIds: Array = [] @@ -29,7 +29,7 @@ export class WebsocketClient extends BaseWebsocketClient { this.handleTransaction = args.transactionHandler - this.initialize() + super.connect() } protected onOpen(): void { @@ -95,7 +95,7 @@ export class WebsocketClient extends BaseWebsocketClient { } try { - this.socket.send(JSON.stringify(subscription)) + this.socket?.send(JSON.stringify(subscription)) } catch (err) { this.logger.debug(err, `failed to subscribe address: ${JSON.stringify(subscription)}`) } @@ -103,7 +103,7 @@ export class WebsocketClient extends BaseWebsocketClient { } private unsubscribe(subscriptionId: number): void { - this.socket.send( + this.socket?.send( JSON.stringify({ jsonrpc: '2.0', id: 'unsubscribe', diff --git a/node/packages/blockbook/src/websocket.ts b/node/packages/blockbook/src/websocket.ts index a151b14ad..8bcf65ab6 100644 --- a/node/packages/blockbook/src/websocket.ts +++ b/node/packages/blockbook/src/websocket.ts @@ -1,5 +1,5 @@ import { Logger } from '@shapeshiftoss/logger' -import { WebsocketClient as BaseWebsocketClient, Args, Options, Subscription } from '@shapeshiftoss/websocket' +import { Args, Options, Subscription, AddressSubscriptionWebsocketClient } from '@shapeshiftoss/websocket' import WebSocket from 'ws' import { Tx } from './models' import { NewBlock, WebsocketRepsonse } from '.' @@ -14,7 +14,7 @@ interface WebsocketArgs extends Omit { blockHandler: BlockHandler | Array } -export class WebsocketClient extends BaseWebsocketClient { +export class WebsocketClient extends AddressSubscriptionWebsocketClient { private handleTransaction: TransactionHandler | Array private handleBlock: BlockHandler | Array @@ -27,16 +27,16 @@ export class WebsocketClient extends BaseWebsocketClient { this.handleTransaction = args.transactionHandler this.handleBlock = args.blockHandler - super.initialize() + super.connect() } protected onOpen(): void { const subscribeNewBlock: Subscription = { jsonrpc: '2.0', id: 'newBlock', method: 'subscribeNewBlock', params: {} } - this.socket.send(JSON.stringify(subscribeNewBlock)) + this.socket?.send(JSON.stringify(subscribeNewBlock)) if (this.addresses.length) { const subscribeAddresses = this.getAddressesSubscription() - this.socket.send(JSON.stringify(subscribeAddresses)) + this.socket?.send(JSON.stringify(subscribeAddresses)) } } @@ -86,7 +86,7 @@ export class WebsocketClient extends BaseWebsocketClient { const subscribeAddresses = this.getAddressesSubscription() try { - this.socket.send(JSON.stringify(subscribeAddresses)) + this.socket?.send(JSON.stringify(subscribeAddresses)) } catch (err) { this.logger.debug(err, `failed to subscribe addresses: ${JSON.stringify(subscribeAddresses)}`) } diff --git a/node/packages/prometheus/package.json b/node/packages/prometheus/package.json new file mode 100644 index 000000000..5a9d4d107 --- /dev/null +++ b/node/packages/prometheus/package.json @@ -0,0 +1,20 @@ +{ + "name": "@shapeshiftoss/prometheus", + "version": "10.0.0", + "license": "MIT", + "main": "dist/index.js", + "source": "src/index.ts", + "types": "dist/index.d.ts", + "files": [ + "dist" + ], + "scripts": { + "build": "tsc", + "clean": "rm -rf dist node_modules", + "dev": "nodemon -e ts --watch src -x yarn build", + "watch": "nodemon -e ts --watch src -x yarn build" + }, + "dependencies": { + "prom-client": "^15.0.0" + } +} diff --git a/node/packages/prometheus/src/index.ts b/node/packages/prometheus/src/index.ts new file mode 100644 index 000000000..d0fa32f11 --- /dev/null +++ b/node/packages/prometheus/src/index.ts @@ -0,0 +1 @@ +export * from './prometheus' diff --git a/node/coinstacks/common/api/src/prometheus.ts b/node/packages/prometheus/src/prometheus.ts similarity index 100% rename from node/coinstacks/common/api/src/prometheus.ts rename to node/packages/prometheus/src/prometheus.ts diff --git a/node/packages/prometheus/tsconfig.json b/node/packages/prometheus/tsconfig.json new file mode 100644 index 000000000..f0beb6444 --- /dev/null +++ b/node/packages/prometheus/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../../tsconfig.json", + "compilerOptions": { + "outDir": "dist", + "incremental": true + }, + "include": ["src"], + "exclude": ["node_modules", "**/__tests__", "**/__mocks__"] +} diff --git a/node/packages/websocket/package.json b/node/packages/websocket/package.json index 28cabc58a..6692cd929 100644 --- a/node/packages/websocket/package.json +++ b/node/packages/websocket/package.json @@ -15,6 +15,7 @@ "watch": "nodemon -e ts --watch src -x yarn build" }, "dependencies": { + "@shapeshiftoss/prometheus": "^10.0.0", "ws": "^8.15.0" }, "devDependencies": { diff --git a/node/packages/websocket/src/connectionHandler.ts b/node/packages/websocket/src/connectionHandler.ts new file mode 100644 index 000000000..8b858f95f --- /dev/null +++ b/node/packages/websocket/src/connectionHandler.ts @@ -0,0 +1,109 @@ +import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' +import { v4 } from 'uuid' +import WebSocket from 'ws' + +export interface RequestPayload { + subscriptionId: string + method: 'subscribe' | 'unsubscribe' | 'ping' + data?: unknown +} + +export interface ErrorResponse { + subscriptionId: string + type: 'error' + message: string +} + +export interface MessageResponse extends Record { + data: unknown +} + +export abstract class BaseConnectionHandler { + public readonly clientId: string + + protected readonly websocket: WebSocket + private readonly prometheus?: Prometheus + private readonly logger: Logger + private readonly pingIntervalMs = 10000 + + private pingTimeout?: NodeJS.Timeout + protected subscriptionIds = new Set() + + abstract onSubscribe(subscriptionId: string, data?: unknown): void + abstract onUnsubscribe(subscriptionId: string, data?: unknown): void + abstract onClose(): void + + constructor(websocket: WebSocket, prometheus: Prometheus, logger: Logger) { + this.clientId = v4() + this.prometheus = prometheus + this.logger = logger.child({ namespace: ['websocket'] }) + + this.pingTimeout = undefined + this.prometheus?.metrics.websocketCount.inc() + this.websocket = websocket + this.websocket.ping() + + const pingInterval = setInterval(() => { + this.websocket.ping() + }, this.pingIntervalMs) + + this.websocket.onerror = (error) => { + this.logger.error({ clientId: this.clientId, error, fn: 'ws.onerror' }, 'websocket error') + this.close(pingInterval) + } + this.websocket.onclose = ({ code, reason }) => { + this.prometheus?.metrics.websocketCount.dec() + this.logger.debug({ clientId: this.clientId, code, reason, fn: 'ws.close' }, 'websocket closed') + this.close(pingInterval) + } + this.websocket.on('pong', () => this.heartbeat()) + this.websocket.on('ping', () => this.websocket.pong()) + this.websocket.onmessage = (event) => this.onMessage(event) + } + + private heartbeat(): void { + if (this.pingTimeout) { + clearTimeout(this.pingTimeout) + } + + this.pingTimeout = setTimeout(() => { + this.logger.debug({ clientId: this.clientId, fn: 'pingTimeout' }, 'heartbeat failed') + this.websocket.terminate() + }, this.pingIntervalMs + 1000) + } + + protected sendError(message: string, subscriptionId: string): void { + this.websocket.send(JSON.stringify({ subscriptionId, type: 'error', message } as ErrorResponse)) + } + + private onMessage(event: WebSocket.MessageEvent): void { + try { + const payload: RequestPayload = JSON.parse(event.data.toString()) + + switch (payload.method) { + // browsers do not support ping/pong frame, handle message instead + case 'ping': { + return this.websocket.send('pong') + } + case 'subscribe': + return this.onSubscribe(payload.subscriptionId, payload.data) + case 'unsubscribe': + return this.onUnsubscribe(payload.subscriptionId, payload.data) + } + } catch (err) { + this.logger.error(err, { clientId: this.clientId, fn: 'onMessage', event }, 'Error processing message') + } + } + + private close(interval: NodeJS.Timeout): void { + this.pingTimeout && clearTimeout(this.pingTimeout) + clearInterval(interval) + this.onClose() + this.subscriptionIds.clear() + } + + publish(subscriptionId: string, payload: { data: unknown } & Record): void { + this.websocket.send(JSON.stringify({ ...payload, subscriptionId })) + } +} diff --git a/node/packages/websocket/src/index.ts b/node/packages/websocket/src/index.ts index b2196badb..f1d958ca5 100644 --- a/node/packages/websocket/src/index.ts +++ b/node/packages/websocket/src/index.ts @@ -1 +1,2 @@ +export * from './connectionHandler' export * from './websocket' diff --git a/node/packages/websocket/src/websocket.ts b/node/packages/websocket/src/websocket.ts index 61a3884a4..415997d2e 100644 --- a/node/packages/websocket/src/websocket.ts +++ b/node/packages/websocket/src/websocket.ts @@ -24,8 +24,8 @@ export interface Options { resetInterval?: number } -export abstract class WebsocketClient { - protected socket: WebSocket +export abstract class BaseWebsocketClient { + protected socket?: WebSocket protected url: string protected pingTimeout?: NodeJS.Timeout protected interval?: NodeJS.Timeout @@ -40,11 +40,8 @@ export abstract class WebsocketClient { protected abstract onOpen(): void protected abstract onMessage(message: WebSocket.MessageEvent): Promise - abstract subscribeAddresses(addresses: string[]): void - constructor(url: string, args: Args, opts?: Options) { this.url = url - this.socket = new WebSocket(this.url, { handshakeTimeout: 5000 }) this.logger = args.logger this.pingInterval = opts?.pingInterval ?? 10000 @@ -52,48 +49,50 @@ export abstract class WebsocketClient { this.resetInterval = opts?.resetInterval ?? RESET_INTERVAL } - protected initialize(): void { - this.socket.on('ping', () => this.socket.pong()) + protected connect(): void { + if (this.socket?.readyState === WebSocket.OPEN) return + + this.socket = new WebSocket(this.url, { handshakeTimeout: 5000 }) + + this.socket.on('ping', () => this.socket?.pong()) this.socket.on('pong', () => this.heartbeat()) this.socket.onerror = (error) => { this.logger.error({ error, fn: 'ws.onerror' }, 'websocket error') } this.socket.onclose = ({ code, reason }) => { this.logger.error({ code, reason, fn: 'ws.close' }, 'websocket closed') - this.close() + this.close(code) } this.socket.onmessage = (msg) => this.onMessage(msg) this.socket.onopen = () => { this.logger.debug({ fn: 'ws.onopen' }, 'websocket opened') this.retryCount = 0 - this.interval = setInterval(() => this.socket.ping(), this.pingInterval) + this.interval = setInterval(() => this.socket?.ping(), this.pingInterval) this.heartbeat() this.onOpen() this.reset() } } - private close(): void { + private close(code: number): void { this.interval && clearInterval(this.interval) + this.pingTimeout && clearTimeout(this.pingTimeout) + this.resetTimeout && clearTimeout(this.resetTimeout) if (++this.retryCount >= this.retryAttempts && this.retryAttempts !== 0) { throw new Error('failed to reconnect') } - setTimeout( - () => { - this.socket = new WebSocket(this.url, { handshakeTimeout: 5000 }) - this.initialize() - }, - Math.min(Math.random() * (BASE_DELAY * this.retryCount ** 2), MAX_DELAY) - ) + if (code === 1000) return + + setTimeout(() => this.connect(), Math.min(Math.random() * (BASE_DELAY * this.retryCount ** 2), MAX_DELAY)) } private heartbeat(): void { this.pingTimeout && clearTimeout(this.pingTimeout) this.pingTimeout = setTimeout(() => { this.logger.debug({ fn: 'pingTimeout' }, 'heartbeat failed') - this.socket.terminate() + this.socket?.terminate() }, this.pingInterval + 1000) } @@ -103,7 +102,11 @@ export abstract class WebsocketClient { this.resetTimeout && clearTimeout(this.resetTimeout) this.resetTimeout = setTimeout(() => { this.logger.debug({ fn: 'reset' }, 'reset websocket') - this.socket.terminate() + this.socket?.terminate() }, this.resetInterval) } } + +export abstract class AddressSubscriptionWebsocketClient extends BaseWebsocketClient { + abstract subscribeAddresses(addresses: string[]): void +} diff --git a/node/proxy/api/src/app.ts b/node/proxy/api/src/app.ts index b88145f03..6e898ef9f 100644 --- a/node/proxy/api/src/app.ts +++ b/node/proxy/api/src/app.ts @@ -3,19 +3,25 @@ import { join } from 'path' import swaggerUi from 'swagger-ui-express' import { middleware } from '@shapeshiftoss/common-api' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' +import { Server } from 'ws' import { RegisterRoutes } from './routes' import { CoinGecko } from './coingecko' import { Zerion } from './zerion' import { Zrx } from './zrx' -import { MarketDataWebSocket } from './marketData' +import { MarketDataConnectionHandler } from './marketData' +import { CoincapWebsocketClient } from './coincap' const PORT = process.env.PORT ?? 3000 +const COINCAP_API_KEY = process.env.COINCAP_API_KEY export const logger = new Logger({ namespace: ['unchained', 'proxy', 'api'], level: process.env.LOG_LEVEL, }) +const prometheus = new Prometheus({ coinstack: 'proxy' }) + const app = express() app.use(...middleware.common()) @@ -53,23 +59,12 @@ app.use(middleware.errorHandler, middleware.notFoundHandler) const server = app.listen(PORT, () => logger.info('Server started')) -const marketDataWS = new MarketDataWebSocket(logger, process.env.MARKET_DATA_PROVIDER, process.env.MARKET_DATA_API_KEY) -marketDataWS.setupWebSocketServer(server) - -process.on('SIGTERM', () => { - logger.info('Received SIGTERM, shutting down gracefully') - marketDataWS.disconnect() - server.close(() => { - logger.info('Server closed') - process.exit(0) - }) +const coincap = new CoincapWebsocketClient(`wss://wss.coincap.io/prices?assets=ALL&apiKey=${COINCAP_API_KEY}`, { + logger, }) -process.on('SIGINT', () => { - logger.info('Received SIGINT, shutting down gracefully') - marketDataWS.disconnect() - server.close(() => { - logger.info('Server closed') - process.exit(0) - }) +const wsServer = new Server({ server }) + +wsServer.on('connection', (connection) => { + MarketDataConnectionHandler.start(connection, coincap, prometheus, logger) }) diff --git a/node/proxy/api/src/coincap.ts b/node/proxy/api/src/coincap.ts new file mode 100644 index 000000000..724267153 --- /dev/null +++ b/node/proxy/api/src/coincap.ts @@ -0,0 +1,58 @@ +import WebSocket from 'ws' + +import { BaseWebsocketClient, Args, Options, BaseConnectionHandler } from '@shapeshiftoss/websocket' +import { MarketDataClient, MarketDataConnectionHandler, MarketDataMessage } from './marketData' + +// TODO: track assets and subscriptionId for clients +export class CoincapWebsocketClient extends BaseWebsocketClient implements MarketDataClient { + clients = new Map() + + constructor(url: string, args: Args, opts?: Options) { + super(url, { logger: args.logger }, opts) + } + + protected onOpen(): void {} + + protected async onMessage(message: WebSocket.MessageEvent): Promise { + try { + const res = JSON.parse(message.data.toString()) as Record + + if (!res) return + + super.reset() + this.handleMessage(res) + } catch (err) { + this.logger.error(err, `failed to handle message: ${JSON.stringify(message)}`) + } + } + + subscribe(clientId: string, subscriptionId: string, connection: MarketDataConnectionHandler, assets: Array) { + console.log(this.clients.size, { subscriptionId, assets }) + if (!this.clients.size) this.connect() + this.clients.set(clientId, connection) + } + + unsubscribe(clientId: string, subscriptionId: string, assets: Array) { + console.log({ subscriptionId, assets }) + if (!this.clients.has(clientId)) return + this.clients.delete(clientId) + if (!this.clients.size) this.socket?.close(1000) + } + + private handleMessage(message: Record): void { + for (const [clientId, client] of this.clients) { + try { + const payload: MarketDataMessage = { + type: 'price_update', + source: 'coincap', + data: message, + timestamp: Date.now(), + } + + client.publish(clientId, payload) + } catch (error) { + this.logger.error({ clientId, error }, 'failed to handle message') + } + } + } +} diff --git a/node/proxy/api/src/marketData.ts b/node/proxy/api/src/marketData.ts index a4adc15ba..c9093879e 100644 --- a/node/proxy/api/src/marketData.ts +++ b/node/proxy/api/src/marketData.ts @@ -1,291 +1,83 @@ import { Logger } from '@shapeshiftoss/logger' -import WebSocket, { Server } from 'ws' -import { v4 } from 'uuid' -import { Server as HttpServer } from 'http' -import { IncomingMessage } from 'http' -import { URL } from 'url' +import WebSocket from 'ws' -export interface MarketDataProviderConfig { - name: string - wsBaseUrl: string - apiKey?: string - // How to construct the WebSocket URL for this provider - buildUrl: (baseUrl: string, apiKey?: string) => string - parseMessage: (data: unknown) => Record | null -} +import { BaseConnectionHandler, MessageResponse } from '@shapeshiftoss/websocket' +import { Prometheus } from '@shapeshiftoss/prometheus' -export interface MarketDataMessage { +export interface MarketDataMessage extends MessageResponse { type: 'price_update' source: string data: Record timestamp: number } -export interface MarketDataClient { - id: string - websocket: WebSocket - assets: string[] +export interface SubscribePayload { + assets: Array } -const PROVIDERS: Record = { - coincap: { - name: 'CoinCap', - wsBaseUrl: 'wss://wss.coincap.io/prices', - buildUrl: (baseUrl: string, apiKey?: string) => { - const params = new URLSearchParams({ assets: 'ALL' }) - if (apiKey) params.set('apiKey', apiKey) - return `${baseUrl}?${params.toString()}` - }, - parseMessage: (data: unknown) => { - // CoinCap sends data directly as asset price object - return typeof data === 'object' && data !== null ? (data as Record) : null - }, - }, +function isSubscribePayload(data: unknown): data is SubscribePayload { + return data !== null && typeof data === 'object' && 'assets' in data } -export class MarketDataWebSocket { - private clients = new Map() - private socket?: WebSocket - private reconnectTimeout?: NodeJS.Timeout - private readonly logger: Logger - private readonly provider: MarketDataProviderConfig - private readonly wsUrl: string - private readonly reconnectDelayMs = 5000 - - constructor(logger: Logger, providerName = 'coincap', apiKey?: string) { - this.logger = logger.child({ namespace: ['proxy', 'marketData'] }) - - this.provider = PROVIDERS[providerName] - if (!this.provider) { - throw new Error(`Unknown market data provider: ${providerName}. Available: ${Object.keys(PROVIDERS).join(', ')}`) - } - - this.wsUrl = this.provider.buildUrl(this.provider.wsBaseUrl, apiKey) - this.logger.info({ provider: this.provider.name, url: this.wsUrl }, 'Market data provider configured') - } - - private connect(): void { - if (this.socket?.readyState === WebSocket.OPEN) { - return - } - - this.logger.info({ provider: this.provider.name, url: this.wsUrl }, 'Connecting to market data provider') - this.socket = new WebSocket(this.wsUrl) - - this.socket.onopen = () => { - this.logger.info({ provider: this.provider.name }, 'Connected to market data provider') - if (this.reconnectTimeout) { - clearTimeout(this.reconnectTimeout) - this.reconnectTimeout = undefined - } - } +export interface MarketDataClient { + subscribe( + clientId: string, + subscriptionId: string, + connection: MarketDataConnectionHandler, + assets: Array + ): void + unsubscribe(clientId: string, subscriptionId: string, assets: Array): void +} - this.socket.onmessage = (event) => { - this.handleMessage(event) - } +export class MarketDataConnectionHandler extends BaseConnectionHandler { + private readonly client: MarketDataClient - this.socket.onclose = (event) => { - this.logger.warn( - { - provider: this.provider.name, - code: event.code, - reason: event.reason, - }, - 'Market data provider WebSocket closed' - ) - this.scheduleReconnect() - } + private constructor(websocket: WebSocket, client: MarketDataClient, prometheus: Prometheus, logger: Logger) { + super(websocket, prometheus, logger) - this.socket.onerror = (error) => { - this.logger.error({ provider: this.provider.name, error }, 'Market data provider WebSocket error') - } + this.client = client } - private handleMessage(event: WebSocket.MessageEvent): void { - try { - const rawData = JSON.parse(event.data.toString()) - const parsedData = this.provider.parseMessage(rawData) - - if (!parsedData) { - return - } - - const message: MarketDataMessage = { - type: 'price_update', - source: this.provider.name.toLowerCase(), - data: parsedData, - timestamp: Date.now(), - } - - this.broadcastToClients(message) - } catch (error) { - this.logger.error( - { - provider: this.provider.name, - error, - data: event.data, - }, - 'Error processing provider message' - ) - } + static start(websocket: WebSocket, client: MarketDataClient, prometheus: Prometheus, logger: Logger): void { + new MarketDataConnectionHandler(websocket, client, prometheus, logger) } - private broadcastToClients(message: MarketDataMessage): void { - for (const [clientId, client] of this.clients) { - try { - if (client.websocket.readyState === WebSocket.OPEN) { - // Filter data to only include assets this client requested - const filteredData: Record = {} - for (const asset of client.assets) { - if (message.data[asset] !== undefined) { - filteredData[asset] = message.data[asset] - } - } - - // Only send if there's relevant data for this client - if (Object.keys(filteredData).length > 0) { - const filteredMessage: MarketDataMessage = { - ...message, - data: filteredData, - } - client.websocket.send(JSON.stringify(filteredMessage)) - } - } else { - this.removeClient(clientId) - } - } catch (error) { - this.logger.error({ clientId, error }, 'Error sending message to client') - this.removeClient(clientId) - } - } - } - - private scheduleReconnect(): void { - if (this.reconnectTimeout) { + onSubscribe(subscriptionId: string, data?: unknown): void { + if (!subscriptionId) { + this.sendError('subscriptionId required', subscriptionId) return } - // Only reconnect if we still have clients - this.reconnectTimeout = setTimeout(() => { - if (this.clients.size > 0) { - this.logger.info({ provider: this.provider.name }, 'Attempting to reconnect to market data provider') - this.connect() - } else { - this.logger.info('No clients connected, skipping provider reconnect') - } - }, this.reconnectDelayMs) - } - - private disconnectFromProvider(): void { - this.logger.info({ provider: this.provider.name }, 'Disconnecting from market data provider - no more clients') - - if (this.reconnectTimeout) { - clearTimeout(this.reconnectTimeout) - this.reconnectTimeout = undefined - } - - if (this.socket) { - this.socket.close() - this.socket = undefined - } - } - - addClient(websocket: WebSocket, clientId: string, requestedAssets: string[]): void { - const client: MarketDataClient = { - id: clientId, - websocket, - assets: requestedAssets, - } - - const wasEmpty = this.clients.size === 0 - this.clients.set(clientId, client) - - // Connect to provider if this is our first client - if (wasEmpty) { - this.logger.info({ provider: this.provider.name }, 'First client connected, establishing provider connection') - this.connect() + if (!isSubscribePayload(data)) { + this.sendError(`invalid subscription payload, no assets provided`, subscriptionId) + return } - this.logger.debug( - { - clientId, - requestedAssets, - totalClients: this.clients.size, - }, - 'Client connected to market data' - ) - - websocket.on('close', () => { - this.removeClient(clientId) - }) - - websocket.on('error', (error) => { - this.logger.error({ clientId, error }, 'Client WebSocket error') - this.removeClient(clientId) - }) + this.subscriptionIds.add(subscriptionId) + this.client.subscribe(this.clientId, subscriptionId, this, data.assets) } - private removeClient(clientId: string): void { - if (this.clients.has(clientId)) { - this.clients.delete(clientId) - this.logger.debug({ clientId, totalClients: this.clients.size }, 'Client disconnected from market data') - - // Disconnect from provider if no more clients - if (this.clients.size === 0) { - this.disconnectFromProvider() - } + onUnsubscribe(subscriptionId: string, data?: unknown): void { + if (!isSubscribePayload(data)) { + this.sendError(`invalid subscription payload, no assets provided`, subscriptionId) + return } - } - - /** - * Parse assets from WebSocket URL query parameters - * Handles both full URLs and relative paths from different environments - */ - private parseAssetsFromUrl(request: IncomingMessage): string[] { - try { - const rawUrl = request.url || '' - // Use dummy base to handle relative URLs like "/ws/market-data?assets=bitcoin" - const url = new URL(rawUrl, 'ws://dummy.com') - const assetsParam = url.searchParams.get('assets') - if (!assetsParam) { - return [] + if (subscriptionId) { + this.subscriptionIds.delete(subscriptionId) + this.client.unsubscribe(this.clientId, subscriptionId, data.assets) + } else { + for (const subscriptionId of this.subscriptionIds) { + this.client.unsubscribe(this.clientId, subscriptionId, []) } - return assetsParam - .split(',') - .map((asset) => asset.trim().toLowerCase()) - .filter((asset) => asset.length > 0) - } catch (error) { - this.logger.error({ error, url: request.url }, 'Error parsing assets from URL') - return [] + this.subscriptionIds.clear() } } - setupWebSocketServer(server: HttpServer, path = '/market-data'): Server { - const wsServer = new Server({ server, path }) - - wsServer.on('connection', (ws, request) => { - const clientId = v4() - const requestedAssets = this.parseAssetsFromUrl(request) - this.addClient(ws, clientId, requestedAssets) - }) - - this.logger.info({ path }, 'WebSocket server setup complete') - return wsServer - } - - disconnect(): void { - this.logger.info('Shutting down market data WebSocket') - - // Disconnect from provider - this.disconnectFromProvider() - - // Close all client connections - for (const [, client] of this.clients) { - client.websocket.close() + onClose(): void { + for (const subscriptionId of this.subscriptionIds) { + this.client.unsubscribe(this.clientId, subscriptionId, []) } - - this.clients.clear() } } diff --git a/node/proxy/sample.env b/node/proxy/sample.env index bb8c38262..6f43cb680 100644 --- a/node/proxy/sample.env +++ b/node/proxy/sample.env @@ -1,6 +1,7 @@ # SECRET ENVIRONMENT VARIABLES ELLIPTIC_API_KEY= ELLIPTIC_API_SECRET= +COINCAP_API_KEY= COINGECKO_API_KEY= ZERION_API_KEY= ZRX_API_KEY= \ No newline at end of file diff --git a/package.json b/package.json index 29d7ad43d..c2c9fa070 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,6 @@ "lerna": "^8.0.0", "morgan": "^1.10.0", "object-hash": "^3.0.0", - "prom-client": "^15.0.0", "swagger-ui-express": "^5.0.0", "tsoa": "^5.1.1", "viem": "^2.33.2" From bbf2cb0e9c4cd121b462df3f62820c7e7e2d44ad Mon Sep 17 00:00:00 2001 From: Jibles Date: Sat, 30 Aug 2025 11:10:37 +1000 Subject: [PATCH 3/5] implement client subscription id and asset selection --- node/proxy/api/src/coincap.ts | 69 ++++++++++++++++++++++++++------ node/proxy/api/src/marketData.ts | 13 ++---- 2 files changed, 60 insertions(+), 22 deletions(-) diff --git a/node/proxy/api/src/coincap.ts b/node/proxy/api/src/coincap.ts index 724267153..407cac736 100644 --- a/node/proxy/api/src/coincap.ts +++ b/node/proxy/api/src/coincap.ts @@ -3,9 +3,10 @@ import WebSocket from 'ws' import { BaseWebsocketClient, Args, Options, BaseConnectionHandler } from '@shapeshiftoss/websocket' import { MarketDataClient, MarketDataConnectionHandler, MarketDataMessage } from './marketData' -// TODO: track assets and subscriptionId for clients export class CoincapWebsocketClient extends BaseWebsocketClient implements MarketDataClient { clients = new Map() + // clientId -> subscriptionId -> assets + subscriptions = new Map>() constructor(url: string, args: Args, opts?: Options) { super(url, { logger: args.logger }, opts) @@ -27,29 +28,71 @@ export class CoincapWebsocketClient extends BaseWebsocketClient implements Marke } subscribe(clientId: string, subscriptionId: string, connection: MarketDataConnectionHandler, assets: Array) { - console.log(this.clients.size, { subscriptionId, assets }) if (!this.clients.size) this.connect() this.clients.set(clientId, connection) + + // Get or create the client's subscription map + let clientSubscriptions = this.subscriptions.get(clientId) + if (!clientSubscriptions) { + clientSubscriptions = new Map() + this.subscriptions.set(clientId, clientSubscriptions) + } + + // Add this subscription + clientSubscriptions.set(subscriptionId, assets) } - unsubscribe(clientId: string, subscriptionId: string, assets: Array) { - console.log({ subscriptionId, assets }) - if (!this.clients.has(clientId)) return - this.clients.delete(clientId) + unsubscribe(clientId: string, subscriptionId?: string) { + const clientSubscriptions = this.subscriptions.get(clientId) + if (!clientSubscriptions) return + + if (subscriptionId) { + // Remove specific subscription + clientSubscriptions.delete(subscriptionId) + + // If client has no more subscriptions, remove them entirely + if (clientSubscriptions.size === 0) { + this.clients.delete(clientId) + this.subscriptions.delete(clientId) + } + } else { + // Remove all subscriptions for this client + this.clients.delete(clientId) + this.subscriptions.delete(clientId) + } + + // Close connection if no more clients if (!this.clients.size) this.socket?.close(1000) } private handleMessage(message: Record): void { for (const [clientId, client] of this.clients) { try { - const payload: MarketDataMessage = { - type: 'price_update', - source: 'coincap', - data: message, - timestamp: Date.now(), - } + const clientSubscriptions = this.subscriptions.get(clientId) + if (!clientSubscriptions) continue - client.publish(clientId, payload) + // Send updates for each subscription + for (const [subscriptionId, assets] of clientSubscriptions) { + // Filter data to only include assets this subscription requested + const filteredData: Record = {} + for (const asset of assets) { + if (message[asset] !== undefined) { + filteredData[asset] = message[asset] + } + } + + // Only send if there's relevant data for this subscription + if (Object.keys(filteredData).length > 0) { + const payload: MarketDataMessage = { + type: 'price_update', + source: 'coincap', + data: filteredData, + timestamp: Date.now(), + } + + client.publish(subscriptionId, payload) + } + } } catch (error) { this.logger.error({ clientId, error }, 'failed to handle message') } diff --git a/node/proxy/api/src/marketData.ts b/node/proxy/api/src/marketData.ts index c9093879e..df960672c 100644 --- a/node/proxy/api/src/marketData.ts +++ b/node/proxy/api/src/marketData.ts @@ -26,7 +26,7 @@ export interface MarketDataClient { connection: MarketDataConnectionHandler, assets: Array ): void - unsubscribe(clientId: string, subscriptionId: string, assets: Array): void + unsubscribe(clientId: string, subscriptionId?: string): void } export class MarketDataConnectionHandler extends BaseConnectionHandler { @@ -65,19 +65,14 @@ export class MarketDataConnectionHandler extends BaseConnectionHandler { if (subscriptionId) { this.subscriptionIds.delete(subscriptionId) - this.client.unsubscribe(this.clientId, subscriptionId, data.assets) + this.client.unsubscribe(this.clientId, subscriptionId) } else { - for (const subscriptionId of this.subscriptionIds) { - this.client.unsubscribe(this.clientId, subscriptionId, []) - } - this.subscriptionIds.clear() + this.client.unsubscribe(this.clientId) } } onClose(): void { - for (const subscriptionId of this.subscriptionIds) { - this.client.unsubscribe(this.clientId, subscriptionId, []) - } + this.client.unsubscribe(this.clientId) } } From 9cbf0e1735c738f551884043b9dd99541c483425 Mon Sep 17 00:00:00 2001 From: Jibles Date: Sat, 30 Aug 2025 11:14:07 +1000 Subject: [PATCH 4/5] Remove assets requirement for unsub --- node/packages/websocket/src/connectionHandler.ts | 4 ++-- node/proxy/api/src/marketData.ts | 7 +------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/node/packages/websocket/src/connectionHandler.ts b/node/packages/websocket/src/connectionHandler.ts index 8b858f95f..e36e16bc9 100644 --- a/node/packages/websocket/src/connectionHandler.ts +++ b/node/packages/websocket/src/connectionHandler.ts @@ -31,7 +31,7 @@ export abstract class BaseConnectionHandler { protected subscriptionIds = new Set() abstract onSubscribe(subscriptionId: string, data?: unknown): void - abstract onUnsubscribe(subscriptionId: string, data?: unknown): void + abstract onUnsubscribe(subscriptionId: string): void abstract onClose(): void constructor(websocket: WebSocket, prometheus: Prometheus, logger: Logger) { @@ -89,7 +89,7 @@ export abstract class BaseConnectionHandler { case 'subscribe': return this.onSubscribe(payload.subscriptionId, payload.data) case 'unsubscribe': - return this.onUnsubscribe(payload.subscriptionId, payload.data) + return this.onUnsubscribe(payload.subscriptionId) } } catch (err) { this.logger.error(err, { clientId: this.clientId, fn: 'onMessage', event }, 'Error processing message') diff --git a/node/proxy/api/src/marketData.ts b/node/proxy/api/src/marketData.ts index df960672c..ef8df3adc 100644 --- a/node/proxy/api/src/marketData.ts +++ b/node/proxy/api/src/marketData.ts @@ -57,12 +57,7 @@ export class MarketDataConnectionHandler extends BaseConnectionHandler { this.client.subscribe(this.clientId, subscriptionId, this, data.assets) } - onUnsubscribe(subscriptionId: string, data?: unknown): void { - if (!isSubscribePayload(data)) { - this.sendError(`invalid subscription payload, no assets provided`, subscriptionId) - return - } - + onUnsubscribe(subscriptionId: string): void { if (subscriptionId) { this.subscriptionIds.delete(subscriptionId) this.client.unsubscribe(this.clientId, subscriptionId) From 166f11751921658faf480f5f7390511a4376bbda Mon Sep 17 00:00:00 2001 From: kaladinlight <35275952+kaladinlight@users.noreply.github.com> Date: Mon, 5 Jan 2026 14:24:19 -0700 Subject: [PATCH 5/5] claude suggestions --- node/proxy/api/src/app.ts | 7 ++++++- node/proxy/api/src/coincap.ts | 15 ++++----------- node/proxy/api/src/marketData.ts | 7 ++++++- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/node/proxy/api/src/app.ts b/node/proxy/api/src/app.ts index 2239280c0..a1c263a69 100644 --- a/node/proxy/api/src/app.ts +++ b/node/proxy/api/src/app.ts @@ -25,10 +25,15 @@ const prometheus = new Prometheus({ coinstack: 'proxy' }) const app = express() -app.use(...middleware.common()) +app.use(...middleware.common(prometheus)) app.get('/health', async (_, res) => res.json({ status: 'ok' })) +app.get('/metrics', async (_, res) => { + res.setHeader('Content-Type', prometheus.register.contentType) + res.send(await prometheus.register.metrics()) +}) + const options: swaggerUi.SwaggerUiOptions = { customCss: '.swagger-ui .topbar { display: none }', customSiteTitle: 'ShapeShift Proxy API Docs', diff --git a/node/proxy/api/src/coincap.ts b/node/proxy/api/src/coincap.ts index 407cac736..ef89677ad 100644 --- a/node/proxy/api/src/coincap.ts +++ b/node/proxy/api/src/coincap.ts @@ -18,8 +18,6 @@ export class CoincapWebsocketClient extends BaseWebsocketClient implements Marke try { const res = JSON.parse(message.data.toString()) as Record - if (!res) return - super.reset() this.handleMessage(res) } catch (err) { @@ -31,22 +29,17 @@ export class CoincapWebsocketClient extends BaseWebsocketClient implements Marke if (!this.clients.size) this.connect() this.clients.set(clientId, connection) - // Get or create the client's subscription map - let clientSubscriptions = this.subscriptions.get(clientId) - if (!clientSubscriptions) { - clientSubscriptions = new Map() - this.subscriptions.set(clientId, clientSubscriptions) + if (!this.subscriptions.has(clientId)) { + this.subscriptions.set(clientId, new Map()) } - // Add this subscription - clientSubscriptions.set(subscriptionId, assets) + this.subscriptions.get(clientId)!.set(subscriptionId, assets) } unsubscribe(clientId: string, subscriptionId?: string) { const clientSubscriptions = this.subscriptions.get(clientId) - if (!clientSubscriptions) return - if (subscriptionId) { + if (clientSubscriptions && subscriptionId) { // Remove specific subscription clientSubscriptions.delete(subscriptionId) diff --git a/node/proxy/api/src/marketData.ts b/node/proxy/api/src/marketData.ts index ef8df3adc..8565f4142 100644 --- a/node/proxy/api/src/marketData.ts +++ b/node/proxy/api/src/marketData.ts @@ -49,7 +49,12 @@ export class MarketDataConnectionHandler extends BaseConnectionHandler { } if (!isSubscribePayload(data)) { - this.sendError(`invalid subscription payload, no assets provided`, subscriptionId) + this.sendError(`invalid subscription payload`, subscriptionId) + return + } + + if (!data.assets.length) { + this.sendError(`assets required`, subscriptionId) return }