diff --git a/src/statement/stream/serverSideStream.ts b/src/statement/stream/serverSideStream.ts index c51eacc6..8cdf196c 100644 --- a/src/statement/stream/serverSideStream.ts +++ b/src/statement/stream/serverSideStream.ts @@ -13,7 +13,6 @@ export class ServerSideStream extends Readable { private readonly pendingRows: Row[] = []; private finished = false; private processingData = false; - private inputPaused = false; private readonly bufferGrowthThreshold = 10; // Stop adding to buffer when over this many rows are already in private lineBuffer = ""; private sourceStream: NodeJS.ReadableStream | null = null; @@ -70,11 +69,10 @@ export class ServerSideStream extends Readable { if ( this.pendingRows.length > this.bufferGrowthThreshold && this.sourceStream && - !this.inputPaused && + !this.sourceStream.isPaused() && !this.processingData ) { this.sourceStream.pause(); - this.inputPaused = true; } } @@ -95,27 +93,31 @@ export class ServerSideStream extends Readable { try { const parsed = JSONbig.parse(line); if (parsed) { - if (parsed.message_type === "DATA") { - this.handleDataMessage(parsed); - } else if (parsed.message_type === "START") { - this.meta = getNormalizedMeta(parsed.result_columns); - this.emit("meta", this.meta); - } else if (parsed.message_type === "FINISH_SUCCESSFULLY") { - this.finished = true; - this.tryPushPendingData(); - } else if (parsed.message_type === "FINISH_WITH_ERRORS") { - // Ensure source stream is resumed before destroying to prevent hanging - if (this.sourceStream && this.inputPaused) { - this.sourceStream.resume(); - this.inputPaused = false; - } - this.destroy( - new Error( - `Result encountered an error: ${parsed.errors - .map((error: { description: string }) => error.description) - .join("\n")}` - ) - ); + switch (parsed.message_type) { + case "DATA": + this.handleDataMessage(parsed); + break; + case "START": + this.meta = getNormalizedMeta(parsed.result_columns); + this.emit("meta", this.meta); + break; + case "FINISH_SUCCESSFULLY": + this.finished = true; + this.tryPushPendingData(); + break; + case "FINISH_WITH_ERRORS": + // Ensure source stream is resumed before destroying to prevent hanging + if (this.sourceStream && this.sourceStream.isPaused()) { + this.sourceStream.resume(); + } + this.destroy( + new Error( + `Result encountered an error: ${parsed.errors + .map((error: { description: string }) => error.description) + .join("\n")}` + ) + ); + break; } } else { this.destroy(new Error(`Result row could not be parsed: ${line}`)); @@ -137,10 +139,8 @@ export class ServerSideStream extends Readable { // Add to pending rows buffer this.pendingRows.push(...normalizedData); - // Try to push data immediately if not already processing - if (!this.processingData) { - this.tryPushPendingData(); - } + // Try to push pending data immediately + this.tryPushPendingData(); } } @@ -157,8 +157,7 @@ export class ServerSideStream extends Readable { // If push returns false, stop pushing and wait for _read to be called if (!canContinue) { - this.processingData = false; - return; + break; } } @@ -172,29 +171,26 @@ export class ServerSideStream extends Readable { this.processingData = false; } + // Called when the stream is ready for more data. _read() { - // Called when the stream is ready for more data - if (!this.processingData && this.pendingRows.length > 0) { - this.tryPushPendingData(); - } + // If there is pending data, push it first + this.tryPushPendingData(); - // Also resume source stream if it was paused and we have capacity + // Resume source stream if it was paused and we have capacity if ( this.sourceStream && - this.inputPaused && + this.sourceStream.isPaused() && this.pendingRows.length < this.bufferGrowthThreshold ) { this.sourceStream.resume(); - this.inputPaused = false; } } _destroy(err: Error | null, callback: (error?: Error | null) => void) { if (this.sourceStream) { // Resume stream if paused to ensure proper cleanup - if (this.inputPaused) { + if (this.sourceStream.isPaused()) { this.sourceStream.resume(); - this.inputPaused = false; } // Only call destroy if it exists (for Node.js streams) diff --git a/test/unit/v2/serverSideStream.test.ts b/test/unit/v2/serverSideStream.test.ts index 91da0897..1d8217c1 100644 --- a/test/unit/v2/serverSideStream.test.ts +++ b/test/unit/v2/serverSideStream.test.ts @@ -81,20 +81,27 @@ describe("ServerSideStream", () => { executeQueryOptions ); + let isPaused = false; let pauseCalled = false; let resumeCalled = false; // Set up mocks AFTER ServerSideStream is created to avoid initialization issues sourceStream.pause = jest.fn(() => { pauseCalled = true; + isPaused = true; return sourceStream; }); sourceStream.resume = jest.fn(() => { resumeCalled = true; + isPaused = false; return sourceStream; }); + sourceStream.isPaused = jest.fn(() => { + return isPaused; + }); + serverSideStream.on("error", done); // Send START message first @@ -185,7 +192,7 @@ describe("ServerSideStream", () => { const dataMessage = JSON.stringify({ message_type: "DATA", - data: [[i * 2 + 0], [i * 2 + 1]] + data: [[i * 2], [i * 2 + 1]] }) + "\n"; sourceStream.write(dataMessage); } @@ -201,6 +208,56 @@ describe("ServerSideStream", () => { }, 50); }); + it("should end stream when rows exceed highWaterMark and push returns false on last pending row", async () => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + const totalRows = 20; + + // Send START message + sourceStream.write( + JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n" + ); + + // Send all rows in a single DATA message so pendingRows accumulates > highWaterMark (16) + sourceStream.write( + JSON.stringify({ + message_type: "DATA", + data: Array.from({ length: totalRows }, (_, idx) => [idx]) + }) + "\n" + ); + + // Send FINISH message + sourceStream.write( + JSON.stringify({ + message_type: "FINISH_SUCCESSFULLY" + }) + "\n" + ); + + sourceStream.end(); + + // Wait for handleInputEnd to fire (sourceStream 'end' event). + // This ensures tryPushPendingData runs while the buffer is still full, + // hitting backpressure and returning without calling push(null). + await new Promise(r => setTimeout(r, 50)); + + // Consume the stream + const dataEvents: unknown[] = []; + for await (const row of serverSideStream) { + dataEvents.push(row); + } + + expect(dataEvents).toHaveLength(totalRows); + }); + it("should handle error messages and cleanup properly", done => { const sourceStream = new PassThrough(); mockResponse.body = sourceStream;