diff --git a/apps/consumer/src/workers/export.worker.js b/apps/consumer/src/workers/export.worker.js index 046def7c..727e87e7 100644 --- a/apps/consumer/src/workers/export.worker.js +++ b/apps/consumer/src/workers/export.worker.js @@ -6,12 +6,12 @@ const path = require('path'); const { GetObjectCommand } = require('@aws-sdk/client-s3'); const { getSignedUrl } = require('@aws-sdk/s3-request-presigner'); -const { - redis, - exportQueue, +const { + redis, + exportQueue, emailQueue, - Project, - getConnection, + Project, + getConnection, getCompiledModel, getS3CompatibleStorage, getStorage, @@ -19,6 +19,29 @@ const { getBucket } = require('@urbackend/common'); +// Maximum documents written per export. +// The cursor is queried for MAX_EXPORT_ROWS + 1 documents so we can +// distinguish a truncated result (collection has more documents than the cap) +// from an exact match (collection has exactly MAX_EXPORT_ROWS documents). +const MAX_EXPORT_ROWS = 100000; + +/** + * Write a chunk to a Writable stream and wait for the drain event if the + * internal buffer is full. Honoring backpressure prevents unbounded memory + * growth when disk or object-storage throughput is slower than Mongo cursor + * throughput. + * + * @param {import('stream').Writable} stream + * @param {string} chunk + * @returns {Promise} + */ +async function writeChunk(stream, chunk) { + const canContinue = stream.write(chunk); + if (!canContinue) { + await new Promise((resolve) => stream.once('drain', resolve)); + } +} + const initExportWorker = () => { const worker = new Worker(exportQueue.name, async (job) => { const { projectId, collectionName, userId, email } = job.data; @@ -28,7 +51,7 @@ const initExportWorker = () => { "name collections resources.db.isExternal resources.storage.isExternal +resources.storage.config.encrypted +resources.storage.config.iv +resources.storage.config.tag" ); if (!project) throw new Error('Project not found'); - + const col = project.collections.find(c => c.name === collectionName); if (!col) throw new Error(`Collection ${collectionName} not found`); @@ -51,41 +74,72 @@ const initExportWorker = () => { console.log(`[ExportWorker] Preparing upload to storage (Provider: ${provider})...`); + // wasTruncated is set when the collection contains more than MAX_EXPORT_ROWS + // documents. The cursor is limited to MAX_EXPORT_ROWS + 1 so we can detect + // truncation without a separate count query. + let wasTruncated = false; + if (provider === 'supabase') { const tempFilePath = path.join(os.tmpdir(), `export_${projectId}_${collectionName}_${Date.now()}.json`); const writeStream = fs.createWriteStream(tempFilePath); - + try { - writeStream.write('{\n'); + await writeChunk(writeStream, '{\n'); const Model = getCompiledModel(connection, col, projectId, project.resources.db.isExternal); - - writeStream.write(` "${col.name}": [\n`); - - const cursor = Model.find().lean().cursor(); + + await writeChunk(writeStream, ` "${col.name}": [\n`); + + // Query one extra row to detect whether the result was capped. + const cursor = Model.find().lean().limit(MAX_EXPORT_ROWS + 1).cursor(); let first = true; - - for await (const doc of cursor) { - if (!first) writeStream.write(',\n'); - writeStream.write(` ${JSON.stringify(doc)}`); - first = false; + let exportedCount = 0; + + // Wrap cursor iteration in try/finally to guarantee the cursor is + // closed even when the loop exits early via break (truncation case). + // An unclosed cursor holds a MongoDB server-side cursor open indefinitely. + try { + for await (const doc of cursor) { + if (exportedCount >= MAX_EXPORT_ROWS) { + // We received the sentinel row; mark truncated and stop writing. + wasTruncated = true; + break; + } + exportedCount++; + if (!first) await writeChunk(writeStream, ',\n'); + await writeChunk(writeStream, ` ${JSON.stringify(doc)}`); + first = false; + } + } finally { + await cursor.close(); } - - writeStream.write('\n ]\n'); - writeStream.write('}\n'); - writeStream.end(); - await new Promise((resolve, reject) => { + if (wasTruncated) { + console.warn(`[ExportWorker] Export truncated: collection exceeds the ${MAX_EXPORT_ROWS}-document cap`); + } + + await writeChunk(writeStream, '\n ]\n'); + await writeChunk(writeStream, '}\n'); + + // Register the finish listener BEFORE calling end() to avoid a + // race condition where the finish event fires before the promise + // listener is attached, causing the promise to never resolve. + const finishPromise = new Promise((resolve, reject) => { writeStream.on('finish', resolve); writeStream.on('error', reject); }); + writeStream.end(); + await finishPromise; console.log(`[ExportWorker] Temp file created, uploading...`); - const fileBuffer = fs.readFileSync(tempFilePath); - - const { error } = await client.storage.from(bucket).upload(storagePath, fileBuffer, { + + // Stream the file directly instead of reading it all into a Buffer. + // readFileSync would load the entire export (potentially hundreds of MB) + // into memory, negating the purpose of the temp-file strategy. + const readStream = fs.createReadStream(tempFilePath); + const { error } = await client.storage.from(bucket).upload(storagePath, readStream, { contentType: 'application/json' }); - + if (error) throw error; } finally { if (fs.existsSync(tempFilePath)) { @@ -102,24 +156,40 @@ const initExportWorker = () => { }); try { - passThrough.write('{\n'); - + await writeChunk(passThrough, '{\n'); + const Model = getCompiledModel(connection, col, projectId, project.resources.db.isExternal); - - passThrough.write(` "${col.name}": [\n`); - - const cursor = Model.find().lean().cursor(); + + await writeChunk(passThrough, ` "${col.name}": [\n`); + + // Query one extra row to detect whether the result was capped. + const cursor = Model.find().lean().limit(MAX_EXPORT_ROWS + 1).cursor(); let first = true; - - for await (const doc of cursor) { - if (!first) passThrough.write(',\n'); - passThrough.write(` ${JSON.stringify(doc)}`); - first = false; + let exportedCount = 0; + + // Wrap cursor iteration in try/finally to guarantee the cursor is + // closed even when the loop exits early via break (truncation case). + try { + for await (const doc of cursor) { + if (exportedCount >= MAX_EXPORT_ROWS) { + wasTruncated = true; + break; + } + exportedCount++; + if (!first) await writeChunk(passThrough, ',\n'); + await writeChunk(passThrough, ` ${JSON.stringify(doc)}`); + first = false; + } + } finally { + await cursor.close(); + } + + if (wasTruncated) { + console.warn(`[ExportWorker] Export truncated: collection exceeds the ${MAX_EXPORT_ROWS}-document cap`); } - - passThrough.write('\n ]\n'); - - passThrough.write('}\n'); + + await writeChunk(passThrough, '\n ]\n'); + await writeChunk(passThrough, '}\n'); passThrough.end(); console.log(`[ExportWorker] Database stream ended. Awaiting final storage upload...`); @@ -144,9 +214,16 @@ const initExportWorker = () => { downloadUrl = await getSignedUrl(s3Client, command, { expiresIn: 86400 }); } - // queue the email to be sent to the user - await emailQueue.add('send-export-email', { email, downloadUrl, projectName: project.name }); - console.log(`[ExportWorker] Export completed! Email queued for ${email}`); + // Pass wasTruncated to the email handler so the recipient knows + // the file contains a capped subset of the collection. + await emailQueue.add('send-export-email', { + email, + downloadUrl, + projectName: project.name, + wasTruncated, + maxExportRows: MAX_EXPORT_ROWS + }); + console.log(`[ExportWorker] Export completed! Email queued for ${email}${wasTruncated ? ' (truncated)' : ''}`); }, { connection: redis, concurrency: 2 }); worker.on('completed', (job) => {