From 3e14c1ad19fd03019d80c4371c88656cffc8ef91 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Fri, 19 Jun 2026 16:50:22 -0700 Subject: [PATCH] stream: fix merge abort for pending sources Wake the multi-source merge loop when its abort signal fires so a pending read rejects instead of waiting for another source to settle. When the primary error is the abort reason, do not await iterator cleanup because sources may already be stuck in pending next() calls. Fixes: https://github.com/nodejs/node/issues/64012 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/iter/consumers.js | 40 +++++++++++++++++-- .../test-stream-iter-consumers-merge.js | 20 ++++++++++ 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/iter/consumers.js b/lib/internal/streams/iter/consumers.js index 1162439bf88c3a..2c21a076dfc9d8 100644 --- a/lib/internal/streams/iter/consumers.js +++ b/lib/internal/streams/iter/consumers.js @@ -39,6 +39,10 @@ const { validateObject, } = require('internal/validators'); +const { + markPromiseAsHandled, +} = internalBinding('util'); + const { from, fromSync, @@ -434,6 +438,20 @@ function merge(...args) { const ready = []; let activeCount = normalized.length; let waitResolve = null; + let onAbort; + + if (signal) { + onAbort = () => { + if (waitResolve) { + waitResolve(); + waitResolve = null; + } + }; + signal.addEventListener('abort', onAbort, { + __proto__: null, + once: true, + }); + } // Called when a source's .next() settles. Pushes the result into // the ready queue and wakes the consumer if it's waiting. @@ -498,27 +516,43 @@ function merge(...args) { if (activeCount > 0) { await new Promise((resolve) => { waitResolve = resolve; + if (signal?.aborted) { + waitResolve = null; + resolve(); + } }); } } } catch (err) { primaryError = err; } finally { + if (onAbort !== undefined) { + signal.removeEventListener('abort', onAbort); + } // Clean up: return all iterators. Cleanup errors are not // swallowed - a broken iterator.return() (e.g., failing to // release a resource) should be visible to the caller. - await cleanupIterators(iterators, primaryError); + await cleanupIterators( + iterators, + primaryError, + signal?.aborted && primaryError === signal.reason, + ); } }, }; } -async function cleanupIterators(iterators, primaryError) { +async function cleanupIterators(iterators, primaryError, skipAwaitCleanup) { let cleanupError; await SafePromiseAllReturnVoid(iterators, async (iterator) => { if (iterator.return) { try { - await iterator.return(); + const result = iterator.return(); + if (skipAwaitCleanup) { + markPromiseAsHandled(result); + } else { + await result; + } } catch (err) { // Keep the first cleanup error encountered. cleanupError ??= err; diff --git a/test/parallel/test-stream-iter-consumers-merge.js b/test/parallel/test-stream-iter-consumers-merge.js index 97cc8b9ac89477..b551599f731462 100644 --- a/test/parallel/test-stream-iter-consumers-merge.js +++ b/test/parallel/test-stream-iter-consumers-merge.js @@ -151,6 +151,25 @@ async function testMergeSignalMidIteration() { await assert.rejects(() => iter.next(), { name: 'AbortError' }); } +async function testMergeSignalDuringPendingMultiSourceRead() { + const ac = new AbortController(); + + async function* pending() { + await new Promise(() => {}); + yield []; + } + + const iter = merge(pending(), pending(), { + __proto__: null, + signal: ac.signal, + })[Symbol.asyncIterator](); + + const next = iter.next(); + ac.abort(); + + await assert.rejects(next, { name: 'AbortError' }); +} + // merge() accepts string sources (normalized via from()) async function testMergeStringSources() { const batches = []; @@ -286,6 +305,7 @@ Promise.all([ testMergeSourceError(), testMergeConsumerBreak(), testMergeSignalMidIteration(), + testMergeSignalDuringPendingMultiSourceRead(), testMergeStringSources(), testMergeObjectLikeSources(), testMergeCleanupErrorOnly(),