From e48f2c4c248579dac794d8442ea4053a8a9c5664 Mon Sep 17 00:00:00 2001 From: TaprootFreak <142087526+TaprootFreak@users.noreply.github.com> Date: Fri, 12 Jun 2026 16:15:21 +0200 Subject: [PATCH] fix(liquidity): recover Scrypt trade orders from transient WS errors Scrypt sell/buy orders were marked as failed when the WebSocket connection dropped or timed out after the order was sent, losing the locally generated ClOrdID even though the exchange had executed the trade. - generate the ClOrdID in the adapter and persist it on the order before sending, so the trade stays verifiable - treat transient WS errors and request timeouts during placement and completion checks as recoverable: keep the order in progress and let the completion check resolve the actual state (lost orders still fail after the existing 60 min cutoff) - add a re-execution guard that checks for an existing trade by ClOrdID before placing a new order, preventing duplicate trades on retries - fetch all execution report pages in getOrderStatus instead of only the first page --- .../services/scrypt-websocket-connection.ts | 9 + .../exchange/services/scrypt.service.ts | 17 +- .../actions/__tests__/scrypt.adapter.spec.ts | 232 ++++++++++++++++++ .../adapters/actions/scrypt.adapter.ts | 85 +++++-- 4 files changed, 323 insertions(+), 20 deletions(-) create mode 100644 src/subdomains/core/liquidity-management/adapters/actions/__tests__/scrypt.adapter.spec.ts diff --git a/src/integration/exchange/services/scrypt-websocket-connection.ts b/src/integration/exchange/services/scrypt-websocket-connection.ts index bf273bd07b..6eb8c3d749 100644 --- a/src/integration/exchange/services/scrypt-websocket-connection.ts +++ b/src/integration/exchange/services/scrypt-websocket-connection.ts @@ -49,6 +49,15 @@ export function isTransientWsError(e: Error): boolean { return TRANSIENT_WS_ERROR_MARKERS.some((m) => e.message?.toLowerCase().includes(m.toLowerCase())); } +// Matches the timeout errors thrown by requestWithId and requestAndWaitForUpdate below. +// Kept separate from TRANSIENT_WS_ERROR_MARKERS: a timed-out request may still be processed +// by Scrypt, so it must not trigger the automatic re-send in retryOnTransientWsError. +export const WS_TIMEOUT_ERROR_MARKERS = ['Timeout waiting for', 'Request timeout after']; + +export function isWsTimeoutError(e: Error): boolean { + return WS_TIMEOUT_ERROR_MARKERS.some((m) => e.message?.toLowerCase().includes(m.toLowerCase())); +} + interface ScryptRequest { reqid?: number; type: ScryptRequestType | ScryptMessageType; diff --git a/src/integration/exchange/services/scrypt.service.ts b/src/integration/exchange/services/scrypt.service.ts index 637caac598..3d9ed53d07 100644 --- a/src/integration/exchange/services/scrypt.service.ts +++ b/src/integration/exchange/services/scrypt.service.ts @@ -221,7 +221,8 @@ export class ScryptService extends PricingProvider { const filters: Record = {}; if (since) filters.StartDate = since.toISOString(); - return this.connection.fetch(ScryptMessageType.EXECUTION_REPORT, filters); + // fetchAll: the report may be on a later page, a single page would miss existing orders + return this.connection.fetchAll(ScryptMessageType.EXECUTION_REPORT, filters); } async getTrades(since?: Date): Promise { @@ -247,7 +248,7 @@ export class ScryptService extends PricingProvider { return side === ScryptOrderSide.BUY ? price : 1 / price; } - async sell(from: string, to: string, amount: number): Promise { + async sell(from: string, to: string, amount: number, clOrdId?: string): Promise { const { symbol, side } = await this.getTradePair(from, to); const price = await this.getOrderBookPrice(symbol, side); const sizeIncrement = await this.getSizeIncrement(symbol); @@ -258,10 +259,10 @@ export class ScryptService extends PricingProvider { const rawQty = side === ScryptOrderSide.SELL ? amount : amount / price; const orderQty = Util.floorToValue(rawQty, sizeIncrement); - return this.placeAndReturnId(symbol, side, orderQty, price); + return this.placeAndReturnId(symbol, side, orderQty, price, clOrdId); } - async buy(from: string, to: string, amount: number): Promise { + async buy(from: string, to: string, amount: number, clOrdId?: string): Promise { const { symbol, side } = await this.getTradePair(from, to); const price = await this.getOrderBookPrice(symbol, side); const sizeIncrement = await this.getSizeIncrement(symbol); @@ -272,7 +273,7 @@ export class ScryptService extends PricingProvider { const rawQty = side === ScryptOrderSide.BUY ? amount : amount / price; const orderQty = Util.floorToValue(rawQty, sizeIncrement); - return this.placeAndReturnId(symbol, side, orderQty, price); + return this.placeAndReturnId(symbol, side, orderQty, price, clOrdId); } private async getSizeIncrement(symbol: string): Promise { @@ -285,6 +286,7 @@ export class ScryptService extends PricingProvider { side: ScryptOrderSide, orderQty: number, price: number, + clOrdId?: string, ): Promise { const response = await this.placeOrder( symbol, @@ -293,6 +295,7 @@ export class ScryptService extends PricingProvider { ScryptOrderType.LIMIT, ScryptTimeInForce.GOOD_TILL_CANCEL, price, + clOrdId, ); return response.id; } @@ -438,8 +441,10 @@ export class ScryptService extends PricingProvider { orderType: ScryptOrderType = ScryptOrderType.LIMIT, timeInForce: ScryptTimeInForce = ScryptTimeInForce.GOOD_TILL_CANCEL, price?: number, + presetClOrdId?: string, ): Promise { - const clOrdId = randomUUID(); + // a caller-provided ClOrdID allows recovering the order if the connection drops after sending + const clOrdId = presetClOrdId ?? randomUUID(); // Price is required for LIMIT orders if (orderType === ScryptOrderType.LIMIT && price === undefined) { diff --git a/src/subdomains/core/liquidity-management/adapters/actions/__tests__/scrypt.adapter.spec.ts b/src/subdomains/core/liquidity-management/adapters/actions/__tests__/scrypt.adapter.spec.ts new file mode 100644 index 0000000000..df2987e39c --- /dev/null +++ b/src/subdomains/core/liquidity-management/adapters/actions/__tests__/scrypt.adapter.spec.ts @@ -0,0 +1,232 @@ +import { createMock, DeepMocked } from '@golevelup/ts-jest'; +import { Test, TestingModule } from '@nestjs/testing'; +import { ScryptOrderInfo, ScryptOrderSide, ScryptOrderStatus } from 'src/integration/exchange/dto/scrypt.dto'; +import { ScryptService } from 'src/integration/exchange/services/scrypt.service'; +import { createCustomAsset } from 'src/shared/models/asset/__mocks__/asset.entity.mock'; +import { AssetService } from 'src/shared/models/asset/asset.service'; +import { TestSharedModule } from 'src/shared/utils/test.shared.module'; +import { DexService } from 'src/subdomains/supporting/dex/services/dex.service'; +import { Price } from 'src/subdomains/supporting/pricing/domain/entities/price'; +import { PricingService } from 'src/subdomains/supporting/pricing/services/pricing.service'; +import { LiquidityManagementAction } from '../../../entities/liquidity-management-action.entity'; +import { LiquidityManagementOrder } from '../../../entities/liquidity-management-order.entity'; +import { LiquidityManagementPipeline } from '../../../entities/liquidity-management-pipeline.entity'; +import { LiquidityManagementRule } from '../../../entities/liquidity-management-rule.entity'; +import { LiquidityManagementOrderStatus, LiquidityManagementSystem } from '../../../enums'; +import { OrderFailedException } from '../../../exceptions/order-failed.exception'; +import { OrderNotProcessableException } from '../../../exceptions/order-not-processable.exception'; +import { LiquidityManagementOrderRepository } from '../../../repositories/liquidity-management-order.repository'; +import { ScryptAdapter, ScryptAdapterCommands } from '../scrypt.adapter'; + +describe('ScryptAdapter', () => { + let adapter: ScryptAdapter; + + let scryptService: DeepMocked; + let dexService: DeepMocked; + let orderRepo: DeepMocked; + let pricingService: DeepMocked; + let assetService: DeepMocked; + + beforeEach(async () => { + scryptService = createMock(); + dexService = createMock(); + orderRepo = createMock(); + pricingService = createMock(); + assetService = createMock(); + + const module: TestingModule = await Test.createTestingModule({ + imports: [TestSharedModule], + providers: [ + ScryptAdapter, + { provide: ScryptService, useValue: scryptService }, + { provide: DexService, useValue: dexService }, + { provide: LiquidityManagementOrderRepository, useValue: orderRepo }, + { provide: PricingService, useValue: pricingService }, + { provide: AssetService, useValue: assetService }, + ], + }).compile(); + + adapter = module.get(ScryptAdapter); + + // happy-path defaults for the sell command (USDT -> CHF) + assetService.getAssetByUniqueName.mockResolvedValue(createCustomAsset({ name: 'CHF', dexName: 'CHF' })); + scryptService.getCurrentPrice.mockResolvedValue(1); + pricingService.getPrice.mockResolvedValue(Price.create('USDT', 'CHF', 1)); + scryptService.getAvailableBalance.mockResolvedValue(1000); + scryptService.getOrderStatus.mockResolvedValue(null); + orderRepo.save.mockImplementation(async (order) => order as LiquidityManagementOrder); + }); + + function createOrder( + command: ScryptAdapterCommands, + customValues: Partial = {}, + ): LiquidityManagementOrder { + const rule = Object.assign(new LiquidityManagementRule(), { + targetAsset: createCustomAsset({ name: 'USDT', dexName: 'USDT' }), + }); + const pipeline = Object.assign(new LiquidityManagementPipeline(), { rule }); + const action = Object.assign(new LiquidityManagementAction(), { + system: LiquidityManagementSystem.SCRYPT, + command, + params: JSON.stringify({ tradeAsset: 'CHF' }), + }); + + return Object.assign(new LiquidityManagementOrder(), { + id: 1, + created: new Date(), + status: LiquidityManagementOrderStatus.CREATED, + minAmount: 10, + maxAmount: 100, + pipeline, + action, + ...customValues, + }); + } + + function createSellOrder(customValues: Partial = {}): LiquidityManagementOrder { + return createOrder(ScryptAdapterCommands.SELL, customValues); + } + + function createBuyOrder(customValues: Partial = {}): LiquidityManagementOrder { + return createOrder(ScryptAdapterCommands.BUY, customValues); + } + + function createOrderInfo(id: string): ScryptOrderInfo { + return { + id, + orderId: 'order-1', + symbol: 'USDT-CHF', + side: ScryptOrderSide.SELL, + status: ScryptOrderStatus.FILLED, + quantity: 100, + filledQuantity: 100, + remainingQuantity: 0, + }; + } + + it('should return the persisted ClOrdID on a transient WS error during sell', async () => { + const order = createSellOrder(); + scryptService.sell.mockRejectedValue(new Error('Connection closed')); + + const correlationId = await adapter.executeOrder(order); + + expect(correlationId).toEqual(order.correlationId); + expect(scryptService.sell).toHaveBeenCalledWith('USDT', 'CHF', 100, correlationId); + + // ClOrdID must be persisted before the order is sent + expect(orderRepo.save).toHaveBeenCalledWith(order); + expect(orderRepo.save.mock.invocationCallOrder[0]).toBeLessThan(scryptService.sell.mock.invocationCallOrder[0]); + }); + + it('should fail the order on a Scrypt rejection during sell', async () => { + const order = createSellOrder(); + scryptService.sell.mockRejectedValue(new Error('Scrypt order rejected: invalid order')); + + await expect(adapter.executeOrder(order)).rejects.toThrow(OrderFailedException); + }); + + it('should mark the order as not processable on insufficient funds during sell', async () => { + const order = createSellOrder(); + scryptService.sell.mockRejectedValue(new Error('Insufficient funds')); + + await expect(adapter.executeOrder(order)).rejects.toThrow(OrderNotProcessableException); + }); + + it('should not place a new order, if the existing ClOrdID is found at Scrypt', async () => { + const order = createSellOrder({ correlationId: 'existing-id' }); + scryptService.getOrderStatus.mockResolvedValue(createOrderInfo('existing-id')); + + const correlationId = await adapter.executeOrder(order); + + expect(correlationId).toEqual('existing-id'); + expect(scryptService.getOrderStatus).toHaveBeenCalledWith('existing-id'); + expect(scryptService.sell).not.toHaveBeenCalled(); + }); + + it('should place a new order and chain the ClOrdID, if the existing ClOrdID is not found at Scrypt', async () => { + const order = createSellOrder({ correlationId: 'lost-id' }); + scryptService.getOrderStatus.mockResolvedValue(null); + scryptService.sell.mockImplementation(async (_from, _to, _amount, clOrdId) => clOrdId); + + const correlationId = await adapter.executeOrder(order); + + expect(correlationId).not.toEqual('lost-id'); + expect(correlationId).toEqual(order.correlationId); + expect(order.previousCorrelationIds).toContain('lost-id'); + expect(scryptService.sell).toHaveBeenCalledWith('USDT', 'CHF', 100, correlationId); + }); + + it('should return the existing trade during sell, even if the balance is locked by the open order', async () => { + const order = createSellOrder({ correlationId: 'existing-id' }); + scryptService.getOrderStatus.mockResolvedValue(createOrderInfo('existing-id')); + scryptService.getAvailableBalance.mockResolvedValue(0); + + const correlationId = await adapter.executeOrder(order); + + expect(correlationId).toEqual('existing-id'); + expect(scryptService.sell).not.toHaveBeenCalled(); + }); + + it('should return the persisted ClOrdID on a transient WS error during buy', async () => { + const order = createBuyOrder(); + scryptService.getTradePair.mockResolvedValue({ symbol: 'USDT-CHF', side: ScryptOrderSide.BUY }); + scryptService.sell.mockRejectedValue(new Error('Connection closed')); + + const correlationId = await adapter.executeOrder(order); + + expect(correlationId).toEqual(order.correlationId); + expect(scryptService.sell).toHaveBeenCalledWith('CHF', 'USDT', 100, correlationId); + }); + + it('should return the persisted ClOrdID on a request timeout during sell', async () => { + const order = createSellOrder(); + scryptService.sell.mockRejectedValue(new Error('Timeout waiting for ExecutionReport update after 60000ms')); + + const correlationId = await adapter.executeOrder(order); + + expect(correlationId).toEqual(order.correlationId); + expect(scryptService.sell).toHaveBeenCalledWith('USDT', 'CHF', 100, correlationId); + }); + + it('should return the persisted ClOrdID on a request timeout without update wait during sell', async () => { + const order = createSellOrder(); + scryptService.sell.mockRejectedValue(new Error('Request timeout after 30000ms')); + + const correlationId = await adapter.executeOrder(order); + + expect(correlationId).toEqual(order.correlationId); + expect(scryptService.sell).toHaveBeenCalledWith('USDT', 'CHF', 100, correlationId); + }); + + it('should retry the completion check on a request timeout instead of failing the order', async () => { + const order = createSellOrder({ + status: LiquidityManagementOrderStatus.IN_PROGRESS, + correlationId: 'existing-id', + updated: new Date(), + }); + scryptService.checkTrade.mockRejectedValue(new Error('Request timeout after 30000ms')); + + await expect(adapter.checkCompletion(order)).resolves.toBe(false); + }); + + it('should return the existing ClOrdID, if the re-execution guard fails with a transient error', async () => { + const order = createSellOrder({ correlationId: 'existing-id' }); + scryptService.getOrderStatus.mockRejectedValue(new Error('Connection closed')); + + const correlationId = await adapter.executeOrder(order); + + expect(correlationId).toEqual('existing-id'); + expect(scryptService.sell).not.toHaveBeenCalled(); + }); + + it('should place a new order on a transient guard error, if no ClOrdID exists yet', async () => { + const order = createSellOrder(); + scryptService.getOrderStatus.mockRejectedValue(new Error('Connection closed')); + scryptService.sell.mockImplementation(async (_from, _to, _amount, clOrdId) => clOrdId); + + const correlationId = await adapter.executeOrder(order); + + expect(correlationId).toEqual(order.correlationId); + expect(scryptService.sell).toHaveBeenCalledWith('USDT', 'CHF', 100, correlationId); + }); +}); diff --git a/src/subdomains/core/liquidity-management/adapters/actions/scrypt.adapter.ts b/src/subdomains/core/liquidity-management/adapters/actions/scrypt.adapter.ts index 12032dee0c..226b5bcf9f 100644 --- a/src/subdomains/core/liquidity-management/adapters/actions/scrypt.adapter.ts +++ b/src/subdomains/core/liquidity-management/adapters/actions/scrypt.adapter.ts @@ -1,8 +1,9 @@ import { Injectable } from '@nestjs/common'; +import { randomUUID } from 'crypto'; import { Blockchain } from 'src/integration/blockchain/shared/enums/blockchain.enum'; import { ScryptOrderInfo, ScryptOrderSide, ScryptTransactionStatus } from 'src/integration/exchange/dto/scrypt.dto'; import { TradeChangedException } from 'src/integration/exchange/exceptions/trade-changed.exception'; -import { isTransientWsError } from 'src/integration/exchange/services/scrypt-websocket-connection'; +import { isTransientWsError, isWsTimeoutError } from 'src/integration/exchange/services/scrypt-websocket-connection'; import { ScryptService } from 'src/integration/exchange/services/scrypt.service'; import { Asset } from 'src/shared/models/asset/asset.entity'; import { AssetService } from 'src/shared/models/asset/asset.service'; @@ -108,6 +109,11 @@ export class ScryptAdapter extends LiquidityActionAdapter { } private async sell(order: LiquidityManagementOrder): Promise { + // re-execution guard must run before the price and balance checks: an already placed + // sell order locks the balance, which would fail the balance check before the guard + const existingTradeId = await this.findExistingTrade(order); + if (existingTradeId) return existingTradeId; + const { tradeAsset, maxPriceDeviation } = this.parseTradeParams(order.action.paramMap); // Structural guard: Scrypt BTC/EUR (and other Scrypt BTC pairs) have materially worse spreads @@ -138,6 +144,10 @@ export class ScryptAdapter extends LiquidityActionAdapter { } private async buy(order: LiquidityManagementOrder): Promise { + // re-execution guard: the trade may already have been placed (e.g. crash or transient error after send) + const existingTradeId = await this.findExistingTrade(order); + if (existingTradeId) return existingTradeId; + const { tradeAsset, maxPriceDeviation } = this.parseTradeParams(order.action.paramMap); const targetAssetEntity = order.pipeline.rule.targetAsset; @@ -172,15 +182,9 @@ export class ScryptAdapter extends LiquidityActionAdapter { order.inputAsset = tradeAsset; order.outputAsset = targetAssetEntity.dexName; - try { - return await this.scryptService.sell(tradeAsset, targetAssetEntity.dexName, amount); - } catch (e) { - if (this.isBalanceTooLowError(e)) { - throw new OrderNotProcessableException(e.message); - } - - throw e; - } + return this.placeTrade(order, (clOrdId) => + this.scryptService.sell(tradeAsset, targetAssetEntity.dexName, amount, clOrdId), + ); } // --- COMPLETION CHECKS --- // @@ -223,7 +227,9 @@ export class ScryptAdapter extends LiquidityActionAdapter { private async checkTradeCompletion(order: LiquidityManagementOrder, from: string, to: string): Promise { try { - const isComplete = await this.scryptService.checkTrade(order.correlationId, from, to, order.created); + // use order.updated as the "lost order" anchor: it reflects the placement time + // (updated on the placeTrade save), while order.created may be much older + const isComplete = await this.scryptService.checkTrade(order.correlationId, from, to, order.updated); if (isComplete) { order.outputAmount = await this.aggregateTradeOutput(order); @@ -237,8 +243,8 @@ export class ScryptAdapter extends LiquidityActionAdapter { return false; } - if (isTransientWsError(e)) { - this.logger.warn(`Transient WS error checking order ${order.id}, will retry next tick: ${e.message}`); + if (this.isRecoverableWsError(e)) { + this.logger.warn(`Recoverable WS error checking order ${order.id}, will retry next tick: ${e.message}`); return false; } @@ -345,20 +351,71 @@ export class ScryptAdapter extends LiquidityActionAdapter { fromAsset: string, toAsset: string, ): Promise { + // re-execution guard already ran in sell() order.inputAmount = amount; order.inputAsset = fromAsset; order.outputAsset = toAsset; + return this.placeTrade(order, (clOrdId) => this.scryptService.sell(fromAsset, toAsset, amount, clOrdId)); + } + + private async findExistingTrade(order: LiquidityManagementOrder): Promise { + if (!order.correlationId) return undefined; + + try { + const existingOrder = await this.scryptService.getOrderStatus(order.correlationId); + return existingOrder ? order.correlationId : undefined; + } catch (e) { + if (this.isRecoverableWsError(e)) { + // a live order with this ClOrdID may exist, so do not place a new one — the completion check will verify + this.logger.warn(`Recoverable error checking for existing trade of order ${order.id}: ${e.message}`); + return order.correlationId; + } + + throw e; + } + } + + private async placeTrade( + order: LiquidityManagementOrder, + place: (clOrdId: string) => Promise, + ): Promise { + // persist the ClOrdID before sending, so the trade can be verified even if the connection drops mid-request + const clOrdId = randomUUID(); + if (order.correlationId) { + order.updateCorrelationId(clOrdId); + } else { + order.correlationId = clOrdId; + } + await this.orderRepo.save(order); + try { - return await this.scryptService.sell(fromAsset, toAsset, amount); + return await place(clOrdId); } catch (e) { if (this.isBalanceTooLowError(e)) { throw new OrderNotProcessableException(e.message); } + + if (this.isRecoverableWsError(e)) { + // the order may still have been executed at Scrypt, the completion check will verify via the persisted ClOrdID + this.logger.warn( + `Recoverable error placing trade for order ${order.id}, completion check will verify: ${e.message}`, + ); + return clOrdId; + } + throw e; } } + // timeouts are recoverable here (not in TRANSIENT_WS_ERROR_MARKERS): the order state at Scrypt + // is unknown after a timed-out request, so placement and completion checks must retry instead of + // failing the order, but a global marker would also change the automatic re-send behavior of + // fetch/fetchAll + private isRecoverableWsError(e: Error): boolean { + return isTransientWsError(e) || isWsTimeoutError(e); + } + private isBalanceTooLowError(e: Error): boolean { return ['Insufficient funds', 'insufficient balance', 'Insufficient position', 'not enough balance'].some((m) => e.message?.toLowerCase().includes(m.toLowerCase()),