From 6cbd6ead90ed61b842f01ed9bca83da60fe34731 Mon Sep 17 00:00:00 2001 From: anshul23102 Date: Tue, 2 Jun 2026 14:48:04 +0530 Subject: [PATCH 1/3] Fix resource exhaustion in consumer export worker by limiting collection streaming Issue #249: The export worker streams entire collections without any document count cap, causing memory exhaustion and long-running operations on large collections. Changes: - Add MAX_EXPORT_ROWS constant (100000) to cap maximum documents per export - Add .limit(MAX_EXPORT_ROWS) to MongoDB cursor in both storage providers (Supabase and S3/Cloudflare R2) - Add exported document counter to track progress - Log truncation warning when export hits the limit This prevents unbounded collection iteration and protects against resource exhaustion while allowing exports of reasonably-sized datasets up to 100k documents. --- apps/consumer/src/workers/export.worker.js | 43 +++++++++++++++------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/apps/consumer/src/workers/export.worker.js b/apps/consumer/src/workers/export.worker.js index 046def7c..eadf8820 100644 --- a/apps/consumer/src/workers/export.worker.js +++ b/apps/consumer/src/workers/export.worker.js @@ -6,12 +6,12 @@ const path = require('path'); const { GetObjectCommand } = require('@aws-sdk/client-s3'); const { getSignedUrl } = require('@aws-sdk/s3-request-presigner'); -const { - redis, - exportQueue, +const { + redis, + exportQueue, emailQueue, - Project, - getConnection, + Project, + getConnection, getCompiledModel, getS3CompatibleStorage, getStorage, @@ -19,6 +19,9 @@ const { getBucket } = require('@urbackend/common'); +// Maximum documents per export to prevent resource exhaustion and unbounded collection streaming +const MAX_EXPORT_ROWS = 100000; + const initExportWorker = () => { const worker = new Worker(exportQueue.name, async (job) => { const { projectId, collectionName, userId, email } = job.data; @@ -60,16 +63,22 @@ const initExportWorker = () => { const Model = getCompiledModel(connection, col, projectId, project.resources.db.isExternal); writeStream.write(` "${col.name}": [\n`); - - const cursor = Model.find().lean().cursor(); + + const cursor = Model.find().lean().limit(MAX_EXPORT_ROWS).cursor(); let first = true; - + let exportedCount = 0; + for await (const doc of cursor) { + exportedCount++; if (!first) writeStream.write(',\n'); writeStream.write(` ${JSON.stringify(doc)}`); first = false; } - + + if (exportedCount >= MAX_EXPORT_ROWS) { + console.warn(`[ExportWorker] Export truncated: reached limit of ${MAX_EXPORT_ROWS} documents`); + } + writeStream.write('\n ]\n'); writeStream.write('}\n'); writeStream.end(); @@ -107,18 +116,24 @@ const initExportWorker = () => { const Model = getCompiledModel(connection, col, projectId, project.resources.db.isExternal); passThrough.write(` "${col.name}": [\n`); - - const cursor = Model.find().lean().cursor(); + + const cursor = Model.find().lean().limit(MAX_EXPORT_ROWS).cursor(); let first = true; - + let exportedCount = 0; + for await (const doc of cursor) { + exportedCount++; if (!first) passThrough.write(',\n'); passThrough.write(` ${JSON.stringify(doc)}`); first = false; } - + + if (exportedCount >= MAX_EXPORT_ROWS) { + console.warn(`[ExportWorker] Export truncated: reached limit of ${MAX_EXPORT_ROWS} documents`); + } + passThrough.write('\n ]\n'); - + passThrough.write('}\n'); passThrough.end(); From 5a8fd0f73585d4525f9d4a80d343bb9052a058e2 Mon Sep 17 00:00:00 2001 From: anshul23102 Date: Tue, 2 Jun 2026 23:01:34 +0530 Subject: [PATCH 2/3] fix(export): address CodeRabbit review feedback Three issues raised in code review: 1. Sentinel row detection: the cursor now queries MAX_EXPORT_ROWS + 1 documents instead of MAX_EXPORT_ROWS. Writing stops after MAX_EXPORT_ROWS documents and wasTruncated is set to true only when the (MAX_EXPORT_ROWS+1)th document is received. This prevents a false truncation warning when the collection contains exactly MAX_EXPORT_ROWS documents. 2. Stream backpressure: all write() calls are replaced with writeChunk() which awaits the drain event when the internal buffer is full. This prevents unbounded memory growth when disk or object-storage throughput is slower than the Mongo cursor. 3. Surface truncation to recipient: wasTruncated and maxExportRows are now passed to the emailQueue payload so the email handler can inform the user that the downloaded file contains a capped subset. --- apps/consumer/src/workers/export.worker.js | 102 +++++++++++++++------ 1 file changed, 72 insertions(+), 30 deletions(-) diff --git a/apps/consumer/src/workers/export.worker.js b/apps/consumer/src/workers/export.worker.js index eadf8820..7e8a3295 100644 --- a/apps/consumer/src/workers/export.worker.js +++ b/apps/consumer/src/workers/export.worker.js @@ -19,9 +19,29 @@ const { getBucket } = require('@urbackend/common'); -// Maximum documents per export to prevent resource exhaustion and unbounded collection streaming +// Maximum documents written per export. +// The cursor is queried for MAX_EXPORT_ROWS + 1 documents so we can +// distinguish a truncated result (collection has more documents than the cap) +// from an exact match (collection has exactly MAX_EXPORT_ROWS documents). const MAX_EXPORT_ROWS = 100000; +/** + * Write a chunk to a Writable stream and wait for the drain event if the + * internal buffer is full. Honoring backpressure prevents unbounded memory + * growth when disk or object-storage throughput is slower than Mongo cursor + * throughput. + * + * @param {import('stream').Writable} stream + * @param {string} chunk + * @returns {Promise} + */ +async function writeChunk(stream, chunk) { + const canContinue = stream.write(chunk); + if (!canContinue) { + await new Promise((resolve) => stream.once('drain', resolve)); + } +} + const initExportWorker = () => { const worker = new Worker(exportQueue.name, async (job) => { const { projectId, collectionName, userId, email } = job.data; @@ -31,7 +51,7 @@ const initExportWorker = () => { "name collections resources.db.isExternal resources.storage.isExternal +resources.storage.config.encrypted +resources.storage.config.iv +resources.storage.config.tag" ); if (!project) throw new Error('Project not found'); - + const col = project.collections.find(c => c.name === collectionName); if (!col) throw new Error(`Collection ${collectionName} not found`); @@ -54,33 +74,44 @@ const initExportWorker = () => { console.log(`[ExportWorker] Preparing upload to storage (Provider: ${provider})...`); + // wasTruncated is set when the collection contains more than MAX_EXPORT_ROWS + // documents. The cursor is limited to MAX_EXPORT_ROWS + 1 so we can detect + // truncation without a separate count query. + let wasTruncated = false; + if (provider === 'supabase') { const tempFilePath = path.join(os.tmpdir(), `export_${projectId}_${collectionName}_${Date.now()}.json`); const writeStream = fs.createWriteStream(tempFilePath); - + try { - writeStream.write('{\n'); + await writeChunk(writeStream, '{\n'); const Model = getCompiledModel(connection, col, projectId, project.resources.db.isExternal); - - writeStream.write(` "${col.name}": [\n`); - const cursor = Model.find().lean().limit(MAX_EXPORT_ROWS).cursor(); + await writeChunk(writeStream, ` "${col.name}": [\n`); + + // Query one extra row to detect whether the result was capped. + const cursor = Model.find().lean().limit(MAX_EXPORT_ROWS + 1).cursor(); let first = true; let exportedCount = 0; for await (const doc of cursor) { + if (exportedCount >= MAX_EXPORT_ROWS) { + // We received the sentinel row; mark truncated and stop writing. + wasTruncated = true; + break; + } exportedCount++; - if (!first) writeStream.write(',\n'); - writeStream.write(` ${JSON.stringify(doc)}`); + if (!first) await writeChunk(writeStream, ',\n'); + await writeChunk(writeStream, ` ${JSON.stringify(doc)}`); first = false; } - if (exportedCount >= MAX_EXPORT_ROWS) { - console.warn(`[ExportWorker] Export truncated: reached limit of ${MAX_EXPORT_ROWS} documents`); + if (wasTruncated) { + console.warn(`[ExportWorker] Export truncated: collection exceeds the ${MAX_EXPORT_ROWS}-document cap`); } - writeStream.write('\n ]\n'); - writeStream.write('}\n'); + await writeChunk(writeStream, '\n ]\n'); + await writeChunk(writeStream, '}\n'); writeStream.end(); await new Promise((resolve, reject) => { @@ -90,11 +121,11 @@ const initExportWorker = () => { console.log(`[ExportWorker] Temp file created, uploading...`); const fileBuffer = fs.readFileSync(tempFilePath); - + const { error } = await client.storage.from(bucket).upload(storagePath, fileBuffer, { contentType: 'application/json' }); - + if (error) throw error; } finally { if (fs.existsSync(tempFilePath)) { @@ -111,30 +142,34 @@ const initExportWorker = () => { }); try { - passThrough.write('{\n'); - + await writeChunk(passThrough, '{\n'); + const Model = getCompiledModel(connection, col, projectId, project.resources.db.isExternal); - - passThrough.write(` "${col.name}": [\n`); - const cursor = Model.find().lean().limit(MAX_EXPORT_ROWS).cursor(); + await writeChunk(passThrough, ` "${col.name}": [\n`); + + // Query one extra row to detect whether the result was capped. + const cursor = Model.find().lean().limit(MAX_EXPORT_ROWS + 1).cursor(); let first = true; let exportedCount = 0; for await (const doc of cursor) { + if (exportedCount >= MAX_EXPORT_ROWS) { + wasTruncated = true; + break; + } exportedCount++; - if (!first) passThrough.write(',\n'); - passThrough.write(` ${JSON.stringify(doc)}`); + if (!first) await writeChunk(passThrough, ',\n'); + await writeChunk(passThrough, ` ${JSON.stringify(doc)}`); first = false; } - if (exportedCount >= MAX_EXPORT_ROWS) { - console.warn(`[ExportWorker] Export truncated: reached limit of ${MAX_EXPORT_ROWS} documents`); + if (wasTruncated) { + console.warn(`[ExportWorker] Export truncated: collection exceeds the ${MAX_EXPORT_ROWS}-document cap`); } - passThrough.write('\n ]\n'); - - passThrough.write('}\n'); + await writeChunk(passThrough, '\n ]\n'); + await writeChunk(passThrough, '}\n'); passThrough.end(); console.log(`[ExportWorker] Database stream ended. Awaiting final storage upload...`); @@ -159,9 +194,16 @@ const initExportWorker = () => { downloadUrl = await getSignedUrl(s3Client, command, { expiresIn: 86400 }); } - // queue the email to be sent to the user - await emailQueue.add('send-export-email', { email, downloadUrl, projectName: project.name }); - console.log(`[ExportWorker] Export completed! Email queued for ${email}`); + // Pass wasTruncated to the email handler so the recipient knows + // the file contains a capped subset of the collection. + await emailQueue.add('send-export-email', { + email, + downloadUrl, + projectName: project.name, + wasTruncated, + maxExportRows: MAX_EXPORT_ROWS + }); + console.log(`[ExportWorker] Export completed! Email queued for ${email}${wasTruncated ? ' (truncated)' : ''}`); }, { connection: redis, concurrency: 2 }); worker.on('completed', (job) => { From 7d5d87567d22ff925339685c6554314f94343f74 Mon Sep 17 00:00:00 2001 From: anshul23102 Date: Mon, 8 Jun 2026 08:16:39 +0530 Subject: [PATCH 3/3] fix: address CodeRabbit review comments in export worker (#249) Three issues fixed as requested by maintainer and CodeRabbit: 1. Replace readFileSync with createReadStream (Supabase path) readFileSync loaded the entire export into a single Node.js Buffer, potentially hundreds of MB for a 100k-document export. Now streams the temp file directly to Supabase storage, keeping memory usage flat regardless of export size. 2. Explicitly close cursor with try/finally (both paths) When the for-await loop exits via break (truncation case), the Mongoose cursor was left open on the MongoDB server indefinitely. Wrapping both cursor loops in try/finally ensures cursor.close() is always called, even on early exit. 3. Register finish listener before calling writeStream.end() (Supabase path) If the stream flushed and emitted 'finish' between the .end() call and the new Promise() constructor, the promise would never resolve. The listener is now registered before .end() is called. --- apps/consumer/src/workers/export.worker.js | 62 ++++++++++++++-------- 1 file changed, 41 insertions(+), 21 deletions(-) diff --git a/apps/consumer/src/workers/export.worker.js b/apps/consumer/src/workers/export.worker.js index 7e8a3295..727e87e7 100644 --- a/apps/consumer/src/workers/export.worker.js +++ b/apps/consumer/src/workers/export.worker.js @@ -94,16 +94,23 @@ const initExportWorker = () => { let first = true; let exportedCount = 0; - for await (const doc of cursor) { - if (exportedCount >= MAX_EXPORT_ROWS) { - // We received the sentinel row; mark truncated and stop writing. - wasTruncated = true; - break; + // Wrap cursor iteration in try/finally to guarantee the cursor is + // closed even when the loop exits early via break (truncation case). + // An unclosed cursor holds a MongoDB server-side cursor open indefinitely. + try { + for await (const doc of cursor) { + if (exportedCount >= MAX_EXPORT_ROWS) { + // We received the sentinel row; mark truncated and stop writing. + wasTruncated = true; + break; + } + exportedCount++; + if (!first) await writeChunk(writeStream, ',\n'); + await writeChunk(writeStream, ` ${JSON.stringify(doc)}`); + first = false; } - exportedCount++; - if (!first) await writeChunk(writeStream, ',\n'); - await writeChunk(writeStream, ` ${JSON.stringify(doc)}`); - first = false; + } finally { + await cursor.close(); } if (wasTruncated) { @@ -112,17 +119,24 @@ const initExportWorker = () => { await writeChunk(writeStream, '\n ]\n'); await writeChunk(writeStream, '}\n'); - writeStream.end(); - await new Promise((resolve, reject) => { + // Register the finish listener BEFORE calling end() to avoid a + // race condition where the finish event fires before the promise + // listener is attached, causing the promise to never resolve. + const finishPromise = new Promise((resolve, reject) => { writeStream.on('finish', resolve); writeStream.on('error', reject); }); + writeStream.end(); + await finishPromise; console.log(`[ExportWorker] Temp file created, uploading...`); - const fileBuffer = fs.readFileSync(tempFilePath); - const { error } = await client.storage.from(bucket).upload(storagePath, fileBuffer, { + // Stream the file directly instead of reading it all into a Buffer. + // readFileSync would load the entire export (potentially hundreds of MB) + // into memory, negating the purpose of the temp-file strategy. + const readStream = fs.createReadStream(tempFilePath); + const { error } = await client.storage.from(bucket).upload(storagePath, readStream, { contentType: 'application/json' }); @@ -153,15 +167,21 @@ const initExportWorker = () => { let first = true; let exportedCount = 0; - for await (const doc of cursor) { - if (exportedCount >= MAX_EXPORT_ROWS) { - wasTruncated = true; - break; + // Wrap cursor iteration in try/finally to guarantee the cursor is + // closed even when the loop exits early via break (truncation case). + try { + for await (const doc of cursor) { + if (exportedCount >= MAX_EXPORT_ROWS) { + wasTruncated = true; + break; + } + exportedCount++; + if (!first) await writeChunk(passThrough, ',\n'); + await writeChunk(passThrough, ` ${JSON.stringify(doc)}`); + first = false; } - exportedCount++; - if (!first) await writeChunk(passThrough, ',\n'); - await writeChunk(passThrough, ` ${JSON.stringify(doc)}`); - first = false; + } finally { + await cursor.close(); } if (wasTruncated) {