Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 121 additions & 44 deletions apps/consumer/src/workers/export.worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,42 @@ const path = require('path');
const { GetObjectCommand } = require('@aws-sdk/client-s3');
const { getSignedUrl } = require('@aws-sdk/s3-request-presigner');

const {
redis,
exportQueue,
const {
redis,
exportQueue,
emailQueue,
Project,
getConnection,
Project,
getConnection,
getCompiledModel,
getS3CompatibleStorage,
getStorage,
decrypt,
getBucket
} = require('@urbackend/common');

// Maximum documents written per export.
// The cursor is queried for MAX_EXPORT_ROWS + 1 documents so we can
// distinguish a truncated result (collection has more documents than the cap)
// from an exact match (collection has exactly MAX_EXPORT_ROWS documents).
const MAX_EXPORT_ROWS = 100000;

/**
* Write a chunk to a Writable stream and wait for the drain event if the
* internal buffer is full. Honoring backpressure prevents unbounded memory
* growth when disk or object-storage throughput is slower than Mongo cursor
* throughput.
*
* @param {import('stream').Writable} stream
* @param {string} chunk
* @returns {Promise<void>}
*/
async function writeChunk(stream, chunk) {
const canContinue = stream.write(chunk);
if (!canContinue) {
await new Promise((resolve) => stream.once('drain', resolve));
}
}
Comment thread
yash-pouranik marked this conversation as resolved.

const initExportWorker = () => {
const worker = new Worker(exportQueue.name, async (job) => {
const { projectId, collectionName, userId, email } = job.data;
Expand All @@ -28,7 +51,7 @@ const initExportWorker = () => {
"name collections resources.db.isExternal resources.storage.isExternal +resources.storage.config.encrypted +resources.storage.config.iv +resources.storage.config.tag"
);
if (!project) throw new Error('Project not found');

const col = project.collections.find(c => c.name === collectionName);
if (!col) throw new Error(`Collection ${collectionName} not found`);

Expand All @@ -51,41 +74,72 @@ const initExportWorker = () => {

console.log(`[ExportWorker] Preparing upload to storage (Provider: ${provider})...`);

// wasTruncated is set when the collection contains more than MAX_EXPORT_ROWS
// documents. The cursor is limited to MAX_EXPORT_ROWS + 1 so we can detect
// truncation without a separate count query.
let wasTruncated = false;

if (provider === 'supabase') {
const tempFilePath = path.join(os.tmpdir(), `export_${projectId}_${collectionName}_${Date.now()}.json`);
const writeStream = fs.createWriteStream(tempFilePath);

try {
writeStream.write('{\n');
await writeChunk(writeStream, '{\n');
const Model = getCompiledModel(connection, col, projectId, project.resources.db.isExternal);

writeStream.write(` "${col.name}": [\n`);

const cursor = Model.find().lean().cursor();

await writeChunk(writeStream, ` "${col.name}": [\n`);

// Query one extra row to detect whether the result was capped.
const cursor = Model.find().lean().limit(MAX_EXPORT_ROWS + 1).cursor();
let first = true;

for await (const doc of cursor) {
if (!first) writeStream.write(',\n');
writeStream.write(` ${JSON.stringify(doc)}`);
first = false;
let exportedCount = 0;

// Wrap cursor iteration in try/finally to guarantee the cursor is
// closed even when the loop exits early via break (truncation case).
// An unclosed cursor holds a MongoDB server-side cursor open indefinitely.
try {
for await (const doc of cursor) {
if (exportedCount >= MAX_EXPORT_ROWS) {
// We received the sentinel row; mark truncated and stop writing.
wasTruncated = true;
break;
}
exportedCount++;
if (!first) await writeChunk(writeStream, ',\n');
await writeChunk(writeStream, ` ${JSON.stringify(doc)}`);
first = false;
}
} finally {
await cursor.close();
}

writeStream.write('\n ]\n');
writeStream.write('}\n');
writeStream.end();

await new Promise((resolve, reject) => {
if (wasTruncated) {
console.warn(`[ExportWorker] Export truncated: collection exceeds the ${MAX_EXPORT_ROWS}-document cap`);
}

await writeChunk(writeStream, '\n ]\n');
await writeChunk(writeStream, '}\n');

// Register the finish listener BEFORE calling end() to avoid a
// race condition where the finish event fires before the promise
// listener is attached, causing the promise to never resolve.
const finishPromise = new Promise((resolve, reject) => {
writeStream.on('finish', resolve);
writeStream.on('error', reject);
});
writeStream.end();
await finishPromise;

console.log(`[ExportWorker] Temp file created, uploading...`);
const fileBuffer = fs.readFileSync(tempFilePath);

const { error } = await client.storage.from(bucket).upload(storagePath, fileBuffer, {

// Stream the file directly instead of reading it all into a Buffer.
// readFileSync would load the entire export (potentially hundreds of MB)
// into memory, negating the purpose of the temp-file strategy.
const readStream = fs.createReadStream(tempFilePath);
const { error } = await client.storage.from(bucket).upload(storagePath, readStream, {
contentType: 'application/json'
});

if (error) throw error;
} finally {
if (fs.existsSync(tempFilePath)) {
Expand All @@ -102,24 +156,40 @@ const initExportWorker = () => {
});

try {
passThrough.write('{\n');
await writeChunk(passThrough, '{\n');

const Model = getCompiledModel(connection, col, projectId, project.resources.db.isExternal);

passThrough.write(` "${col.name}": [\n`);

const cursor = Model.find().lean().cursor();

await writeChunk(passThrough, ` "${col.name}": [\n`);

// Query one extra row to detect whether the result was capped.
const cursor = Model.find().lean().limit(MAX_EXPORT_ROWS + 1).cursor();
let first = true;

for await (const doc of cursor) {
if (!first) passThrough.write(',\n');
passThrough.write(` ${JSON.stringify(doc)}`);
first = false;
let exportedCount = 0;

// Wrap cursor iteration in try/finally to guarantee the cursor is
// closed even when the loop exits early via break (truncation case).
try {
for await (const doc of cursor) {
if (exportedCount >= MAX_EXPORT_ROWS) {
wasTruncated = true;
break;
}
exportedCount++;
if (!first) await writeChunk(passThrough, ',\n');
await writeChunk(passThrough, ` ${JSON.stringify(doc)}`);
first = false;
}
} finally {
await cursor.close();
}

if (wasTruncated) {
console.warn(`[ExportWorker] Export truncated: collection exceeds the ${MAX_EXPORT_ROWS}-document cap`);
}

passThrough.write('\n ]\n');

passThrough.write('}\n');

await writeChunk(passThrough, '\n ]\n');
await writeChunk(passThrough, '}\n');
passThrough.end();

console.log(`[ExportWorker] Database stream ended. Awaiting final storage upload...`);
Expand All @@ -144,9 +214,16 @@ const initExportWorker = () => {
downloadUrl = await getSignedUrl(s3Client, command, { expiresIn: 86400 });
}

// queue the email to be sent to the user
await emailQueue.add('send-export-email', { email, downloadUrl, projectName: project.name });
console.log(`[ExportWorker] Export completed! Email queued for ${email}`);
// Pass wasTruncated to the email handler so the recipient knows
// the file contains a capped subset of the collection.
await emailQueue.add('send-export-email', {
email,
downloadUrl,
projectName: project.name,
wasTruncated,
maxExportRows: MAX_EXPORT_ROWS
});
console.log(`[ExportWorker] Export completed! Email queued for ${email}${wasTruncated ? ' (truncated)' : ''}`);
}, { connection: redis, concurrency: 2 });

worker.on('completed', (job) => {
Expand Down
Loading