Skip to content
Merged

1.10.0 #1163

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8e53ca4
feat: Added backpressure handling when the db buffer overflows
thisismayuresh Dec 30, 2025
42a089a
feat: Added Logging for broker disconnection
thisismayuresh Dec 30, 2025
a4fd762
feat: Added streaming-based JSON processing of imported data
thisismayuresh Dec 30, 2025
eb0c9bc
feat: Processing a batch of 50 pages in one publishToQueue event
thisismayuresh Dec 30, 2025
c3e0066
feat: Add `getRecordsStream` method to DalService for streaming recor…
thisismayuresh Dec 30, 2025
ba943ef
feat: Added error logging for import processing failures
thisismayuresh Dec 30, 2025
0a0a556
feat: Implemented Sentry error capturing in end import consumer
thisismayuresh Dec 30, 2025
986cbad
Feat/added streaming based record processing (#1146)
chavda-bhavik Dec 30, 2025
17345dd
feat: add comprehensive logging to all cron jobs for CPU debugging
sunilbhadu99 Jan 30, 2026
8b6f8aa
feat: update logo display and auto-import enhancements
sunilbhadu99 Jan 30, 2026
4d5fb77
Feat/logo display auto import enhancements (#1149)
chavda-bhavik Jan 30, 2026
03f327f
chore: add documentation comment to dashboard
SunilBhadu Feb 4, 2026
382bd5f
chore: add documentation comment to dashboard (#1153)
jenishpaghadal Feb 5, 2026
ac632a0
feat: implement iframe upload status communication
SunilBhadu Feb 9, 2026
0650e03
feat: implement iframe upload status communication (#1154)
jenishpaghadal Feb 9, 2026
95593d4
refactor: Simplify file heading renaming to only apply when columnHea…
SunilBhadu Feb 9, 2026
47dfe00
refactor: Simplify file heading renaming to only apply when columnHea…
jenishpaghadal Feb 9, 2026
e7a6e7c
feat: Add upload status events and dedicated error handlers
SunilBhadu Feb 12, 2026
4a847d3
feat: Add upload status events and dedicated error handlers (#1158)
jenishpaghadal Feb 12, 2026
198e5b2
style: Update 'Change Card' menu item text to 'Manage Billing' in act…
SunilBhadu Feb 16, 2026
e285018
refactor: move manage billing to direct button and bump version
SunilBhadu Feb 16, 2026
db5a33b
v1.10.0
SunilBhadu Feb 16, 2026
c478578
chore: bump docker image versions for api, queue-manager, widget, emb…
SunilBhadu Feb 16, 2026
50509ec
style: Update 'Change Card' menu item text to 'Manage Billing' in act…
chavda-bhavik Feb 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/api/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@impler/api",
"version": "1.9.3",
"version": "1.10.0",
"author": "implerhq",
"license": "MIT",
"private": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,94 @@ export class AutoImportJobsSchedular {

@Cron(CronExpression.EVERY_MINUTE)
async handleCronSchedular() {
  // Runs every minute: executes all due auto-import jobs while recording
  // wall-clock duration, memory and CPU diagnostics (instrumentation added
  // for CPU-usage debugging).
  // NOTE(review): removed the stale pre-diff lines that called
  // fetchAndExecuteScheduledJobs() a second time outside the try block.
  const startTime = new Date();
  const memUsageStart = process.memoryUsage();
  const cpuUsageStart = process.cpuUsage();

  console.log('========================================');
  console.log(`[AUTO-IMPORT-JOBS-SCHEDULER] Cron Started at ${startTime.toISOString()}`);
  console.log(`[AUTO-IMPORT-JOBS-SCHEDULER] Memory Usage (Start): RSS=${(memUsageStart.rss / 1024 / 1024).toFixed(2)}MB, Heap=${(memUsageStart.heapUsed / 1024 / 1024).toFixed(2)}MB`);
  console.log('========================================');

  try {
    await this.fetchAndExecuteScheduledJobs();

    const endTime = new Date();
    const duration = endTime.getTime() - startTime.getTime();
    const memUsageEnd = process.memoryUsage();
    // Passing the start sample yields the delta since the cron tick began.
    const cpuUsageEnd = process.cpuUsage(cpuUsageStart);

    console.log('========================================');
    console.log(`[AUTO-IMPORT-JOBS-SCHEDULER] Cron Completed at ${endTime.toISOString()}`);
    console.log(`[AUTO-IMPORT-JOBS-SCHEDULER] Duration: ${duration}ms`);
    console.log(`[AUTO-IMPORT-JOBS-SCHEDULER] Memory Usage (End): RSS=${(memUsageEnd.rss / 1024 / 1024).toFixed(2)}MB, Heap=${(memUsageEnd.heapUsed / 1024 / 1024).toFixed(2)}MB`);
    console.log(`[AUTO-IMPORT-JOBS-SCHEDULER] Memory Delta: RSS=${((memUsageEnd.rss - memUsageStart.rss) / 1024 / 1024).toFixed(2)}MB, Heap=${((memUsageEnd.heapUsed - memUsageStart.heapUsed) / 1024 / 1024).toFixed(2)}MB`);
    console.log(`[AUTO-IMPORT-JOBS-SCHEDULER] CPU Usage: User=${(cpuUsageEnd.user / 1000).toFixed(2)}ms, System=${(cpuUsageEnd.system / 1000).toFixed(2)}ms`);

    if (duration > 5000) {
      console.warn(`[AUTO-IMPORT-JOBS-SCHEDULER] ⚠️ WARNING: Cron execution took ${duration}ms (>5s threshold)`);
    }
    console.log('========================================');
  } catch (error) {
    // Log and swallow: a failed tick must not surface an unhandled
    // rejection — the next minute's tick runs regardless.
    const endTime = new Date();
    const duration = endTime.getTime() - startTime.getTime();

    console.error('========================================');
    console.error(`[AUTO-IMPORT-JOBS-SCHEDULER] ❌ ERROR at ${endTime.toISOString()}`);
    console.error(`[AUTO-IMPORT-JOBS-SCHEDULER] Duration before error: ${duration}ms`);
    console.error('[AUTO-IMPORT-JOBS-SCHEDULER] Error details:', error);
    console.error('========================================');
  }
}

private async fetchAndExecuteScheduledJobs() {
  // Scans every user job and triggers the ones whose nextRun is due now.
  // NOTE(review): removed the stale pre-diff residue lines (a duplicated
  // shouldCroneRun check and an empty `catch (error) {}` swallow) left over
  // from the diff view.
  const now = dayjs();
  const userJobs = await this.userJobRepository.find({});

  console.log(`[AUTO-IMPORT-JOBS-SCHEDULER] Total jobs found: ${userJobs.length}`);

  let jobsProcessed = 0;
  let jobsSkipped = 0;
  let jobsExecuted = 0;

  for (const userJob of userJobs) {
    const jobStartTime = Date.now();
    try {
      if (await this.shouldCroneRun({ userJob })) {
        if (this.isJobDueNow(userJob.nextRun, now)) {
          console.log(`[AUTO-IMPORT-JOBS-SCHEDULER] Job is due now - JobID: ${userJob._id}, Time: ${new Date().toISOString()}`);

          // nextRun is advanced before executing — presumably so a crash
          // mid-execution cannot re-trigger the same run on the next tick;
          // TODO confirm that ordering is intentional.
          const nextScheduledTime = this.calculateNextRun(userJob.cron, userJob.nextRun);
          await this.scheduleUpdateNextRun(userJob._id, nextScheduledTime, dayjs(userJob.endsOn));

          const executeStartTime = Date.now();
          await this.userJobTriggerService.execute(userJob._id);
          const executeDuration = Date.now() - executeStartTime;

          jobsExecuted++;
          console.log(`[AUTO-IMPORT-JOBS-SCHEDULER] Job executed - JobID: ${userJob._id}, Duration: ${executeDuration}ms`);

          if (executeDuration > 5000) {
            console.warn(`[AUTO-IMPORT-JOBS-SCHEDULER] ⚠️ WARNING: Job execution took ${executeDuration}ms (>5s) - JobID: ${userJob._id}`);
          }
        } else {
          jobsSkipped++;
        }
      } else {
        jobsSkipped++;
      }

      const jobDuration = Date.now() - jobStartTime;
      if (jobDuration > 1000) {
        console.log(`[AUTO-IMPORT-JOBS-SCHEDULER] Job processing took ${jobDuration}ms - JobID: ${userJob._id}`);
      }
    } catch (error) {
      // One failing job must not abort the loop for the remaining jobs.
      console.error(`[AUTO-IMPORT-JOBS-SCHEDULER] ❌ Error processing job ${userJob._id} at ${new Date().toISOString()}`, error);
    } finally {
      // Count errored jobs too, so the summary line adds up
      // (matches the finally-based counting used by the upload-cleanup cron).
      jobsProcessed++;
    }
  }

  console.log(`[AUTO-IMPORT-JOBS-SCHEDULER] Summary - Processed: ${jobsProcessed}, Executed: ${jobsExecuted}, Skipped: ${jobsSkipped}`);
}

calculateNextRun(cronExpression: string, currentNextRun: Date): dayjs.Dayjs {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,82 @@ export class FailedWebhookRetry {

@Cron(CronExpression.EVERY_5_MINUTES)
async processWebhookRetries() {
  // Every 5 minutes: finds failed webhooks whose nextRequestTime has passed
  // and re-queues them, logging timing/memory/CPU diagnostics.
  // NOTE(review): removed the stale pre-diff duplicate of the
  // Promise.allSettled line left over from the diff view.
  const startTime = new Date();
  const memUsageStart = process.memoryUsage();
  const cpuUsageStart = process.cpuUsage();

  console.log('========================================');
  console.log(`[FAILED-WEBHOOK-RETRY] Cron Started at ${startTime.toISOString()}`);
  console.log(`[FAILED-WEBHOOK-RETRY] Memory Usage (Start): RSS=${(memUsageStart.rss / 1024 / 1024).toFixed(2)}MB, Heap=${(memUsageStart.heapUsed / 1024 / 1024).toFixed(2)}MB`);
  console.log('========================================');

  try {
    const failedWebhooks: FailedWebhookRetryRequestsEntity[] = await this.failedWebhookRetryRequestsRepository.find({
      nextRequestTime: { $lt: new Date() },
    });

    console.log(`[FAILED-WEBHOOK-RETRY] Found ${failedWebhooks.length} failed webhooks to retry`);

    if (!failedWebhooks.length) {
      console.log(`[FAILED-WEBHOOK-RETRY] No webhooks to process, exiting`);

      return;
    }

    // allSettled: one failed webhook must not prevent the others from
    // being re-queued; outcomes are tallied below.
    const processStartTime = Date.now();
    const results = await Promise.allSettled(failedWebhooks.map((wbh) => this.processWebhook(wbh)));
    const processDuration = Date.now() - processStartTime;

    const successful = results.filter((r) => r.status === 'fulfilled').length;
    const failed = results.filter((r) => r.status === 'rejected').length;

    const endTime = new Date();
    const duration = endTime.getTime() - startTime.getTime();
    const memUsageEnd = process.memoryUsage();
    const cpuUsageEnd = process.cpuUsage(cpuUsageStart);

    console.log('========================================');
    console.log(`[FAILED-WEBHOOK-RETRY] Cron Completed at ${endTime.toISOString()}`);
    console.log(`[FAILED-WEBHOOK-RETRY] Results - Successful: ${successful}, Failed: ${failed}, Total: ${failedWebhooks.length}`);
    console.log(`[FAILED-WEBHOOK-RETRY] Processing Duration: ${processDuration}ms`);
    console.log(`[FAILED-WEBHOOK-RETRY] Total Duration: ${duration}ms`);
    console.log(`[FAILED-WEBHOOK-RETRY] Memory Usage (End): RSS=${(memUsageEnd.rss / 1024 / 1024).toFixed(2)}MB, Heap=${(memUsageEnd.heapUsed / 1024 / 1024).toFixed(2)}MB`);
    console.log(`[FAILED-WEBHOOK-RETRY] Memory Delta: RSS=${((memUsageEnd.rss - memUsageStart.rss) / 1024 / 1024).toFixed(2)}MB, Heap=${((memUsageEnd.heapUsed - memUsageStart.heapUsed) / 1024 / 1024).toFixed(2)}MB`);
    console.log(`[FAILED-WEBHOOK-RETRY] CPU Usage: User=${(cpuUsageEnd.user / 1000).toFixed(2)}ms, System=${(cpuUsageEnd.system / 1000).toFixed(2)}ms`);

    if (duration > 5000) {
      console.warn(`[FAILED-WEBHOOK-RETRY] ⚠️ WARNING: Cron execution took ${duration}ms (>5s threshold)`);
    }

    if (failedWebhooks.length > 100) {
      console.warn(`[FAILED-WEBHOOK-RETRY] ⚠️ WARNING: Processing large batch of ${failedWebhooks.length} webhooks`);
    }

    console.log('========================================');
  } catch (error) {
    const endTime = new Date();
    const duration = endTime.getTime() - startTime.getTime();

    console.error('========================================');
    console.error(`[FAILED-WEBHOOK-RETRY] ❌ ERROR at ${endTime.toISOString()}`);
    console.error(`[FAILED-WEBHOOK-RETRY] Duration before error: ${duration}ms`);
    console.error('[FAILED-WEBHOOK-RETRY] Error details:', error);
    console.error('========================================');
    // NOTE(review): rethrowing from a cron handler propagates to the
    // scheduler — confirm that is the intended behavior rather than swallow.
    throw error;
  }
}

private async processWebhook(webhook: FailedWebhookRetryRequestsEntity) {
  // Re-queues one failed webhook onto SEND_FAILED_WEBHOOK_DATA and logs how
  // long the enqueue took. Throws on failure so the caller's
  // Promise.allSettled tally counts it as rejected.
  const webhookStartTime = Date.now();
  try {
    console.log(`[FAILED-WEBHOOK-RETRY] Processing webhook - ID: ${webhook._id}, Time: ${new Date().toISOString()}`);

    // Await the publish: the original floating call meant that if
    // publishToQueue returns a promise, this try/catch could never observe
    // its failure and the duration below measured nothing. Awaiting a
    // non-promise value is a harmless no-op, so this is safe either way.
    // TODO confirm publishToQueue's return type against queue.service.ts.
    await this.queueService.publishToQueue(QueuesEnum.SEND_FAILED_WEBHOOK_DATA, webhook._id as string);

    const webhookDuration = Date.now() - webhookStartTime;
    console.log(`[FAILED-WEBHOOK-RETRY] Webhook queued - ID: ${webhook._id}, Duration: ${webhookDuration}ms`);
  } catch (error) {
    const webhookDuration = Date.now() - webhookStartTime;
    console.error(`[FAILED-WEBHOOK-RETRY] ❌ Error processing webhook - ID: ${webhook._id}, Duration: ${webhookDuration}ms, Time: ${new Date().toISOString()}`, error);
    throw error;
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ export class ReanameFileHeadings {

const newHeadings = [...uploadInfo.headings];
templateColumnItems.forEach((mapping) => {
if (!mapping.columnHeading) {
const headingIndex = newHeadings.findIndex((heading) => heading === mapping.key);
if (headingIndex > -1) newHeadings[headingIndex] = '_';
} else {
if (mapping.columnHeading) {
const columnHeadingIndex = newHeadings.findIndex((heading) => heading === mapping.columnHeading);
const keyHeadingIndex = newHeadings.findIndex((keyHeading) => keyHeading === mapping.key);
if (keyHeadingIndex > -1 && columnHeadingIndex > -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ export class BaseReview {
Papa.parse(csvFileStream, {
dynamicTyping: false,
skipEmptyLines: 'greedy',
step: (results: Papa.ParseStepResult<any>) => {
step: (results: Papa.ParseStepResult<any>, parser: any) => {
totalRecords++;
const record = results.data;

Expand All @@ -477,7 +477,13 @@ export class BaseReview {
} else {
invalidRecords++;
}
dataStream.write(validationResultItem);
const canContinue = dataStream.write(validationResultItem);
if (!canContinue) {
parser.pause();
dataStream.once('drain', () => {
parser.resume();
});
}
}
},
complete: async () => {
Expand Down
4 changes: 2 additions & 2 deletions apps/api/src/app/shared/services/queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ export class QueueService {
this.connection.on('error', (error: any) => {
Logger.error('QueueService RabbitMQ::Error!', error);
});
this.connection.on('disconnect', () => {
Logger.log('QueueService RabbitMQ::Disconnected!');
this.connection.on('disconnect', ({ error }: { error: any }) => {
Logger.log('QueueService RabbitMQ::Disconnected!', error);
this.isQueueConnected = false;
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,71 @@ export class UploadCleanupSchedulerService {

@Cron(CRON_SCHEDULE.UPLOAD_CLEANUP_DEFAULT_CRON_TIME)
async handleCleanupCronSchedular(cleanupDays: number = CRON_SCHEDULE.UPLOAD_CLEANUP_DAYS) {
const startTime = new Date();
const memUsageStart = process.memoryUsage();
const cpuUsageStart = process.cpuUsage();

console.log('========================================');
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Cron Started at ${startTime.toISOString()}`);
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Cleanup days: ${cleanupDays}`);
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Memory Usage (Start): RSS=${(memUsageStart.rss / 1024 / 1024).toFixed(2)}MB, Heap=${(memUsageStart.heapUsed / 1024 / 1024).toFixed(2)}MB`);
console.log('========================================');

const cleanupDaysAgo = dayjs().subtract(cleanupDays, 'day').toDate();
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Finding uploads older than: ${cleanupDaysAgo.toISOString()}`);

const uploads = await Upload.find({
uploadedDate: { $lt: cleanupDaysAgo },
_uploadedFileId: { $exists: true, $ne: '' },
});

console.log(`[UPLOAD-CLEANUP-SCHEDULER] Found ${uploads.length} uploads to clean up`);

if (uploads.length === 0) {
console.log(`[UPLOAD-CLEANUP-SCHEDULER] No uploads to clean up, exiting`);
return;
}

if (uploads.length > 100) {
console.warn(`[UPLOAD-CLEANUP-SCHEDULER] ⚠️ WARNING: Processing large batch of ${uploads.length} uploads - potential CPU intensive operation`);
}

let uploadsProcessed = 0;
let uploadsSucceeded = 0;
let uploadsFailed = 0;
let totalFilesDeleted = 0;
let totalCollectionsDropped = 0;

for (const upload of uploads) {
const uploadStartTime = Date.now();
try {
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Processing upload - ID: ${upload.id}, UploadedFileID: ${upload._uploadedFileId}`);

const files = await this.fileRepository.find({ _id: upload._uploadedFileId });
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Found ${files.length} files for upload ${upload.id}`);

const storagDeleteStart = Date.now();
await this.storageService.deleteFolder(upload.id);
const storagDeleteDuration = Date.now() - storagDeleteStart;
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Storage folder deleted - Upload: ${upload.id}, Duration: ${storagDeleteDuration}ms`);

if (storagDeleteDuration > 2000) {
console.warn(`[UPLOAD-CLEANUP-SCHEDULER] ⚠️ WARNING: Storage deletion took ${storagDeleteDuration}ms (>2s) - Upload: ${upload.id}`);
}

await Promise.allSettled(
const fileResults = await Promise.allSettled(
files.map(async (file) => {
try {
await Upload.updateOne({ _uploadedFileId: file._id }, { $set: { _uploadedFileId: '' } });

// Delete file from storage and db
try {
await this.fileRepository.delete({ _id: file._id });
} catch (error) {}
totalFilesDeleted++;
console.log(`[UPLOAD-CLEANUP-SCHEDULER] File deleted - FileID: ${file._id}`);
} catch (error) {
console.error(`[UPLOAD-CLEANUP-SCHEDULER] ❌ Error deleting file - FileID: ${file._id}`, error);
}

const collectionName = `${upload._id}-records`;
try {
Expand All @@ -51,12 +91,59 @@ export class UploadCleanupSchedulerService {
if (collections.length > 0) {
const collection = this.dalService.connection.collection(collectionName);
await collection.drop();
totalCollectionsDropped++;
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Collection dropped - Name: ${collectionName}`);
}
} catch (error) {}
} catch (error) {}
} catch (error) {
console.error(`[UPLOAD-CLEANUP-SCHEDULER] ❌ Error dropping collection - Name: ${collectionName}`, error);
}
} catch (error) {
console.error(`[UPLOAD-CLEANUP-SCHEDULER] ❌ Error processing file - FileID: ${file._id}`, error);
throw error;
}
})
);
} catch (error) {}

const fileSuccesses = fileResults.filter(r => r.status === 'fulfilled').length;
const fileFailures = fileResults.filter(r => r.status === 'rejected').length;

const uploadDuration = Date.now() - uploadStartTime;
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Upload processed - ID: ${upload.id}, Files: ${files.length}, Success: ${fileSuccesses}, Failed: ${fileFailures}, Duration: ${uploadDuration}ms`);

if (uploadDuration > 5000) {
console.warn(`[UPLOAD-CLEANUP-SCHEDULER] ⚠️ WARNING: Upload processing took ${uploadDuration}ms (>5s) - Upload: ${upload.id}`);
}

uploadsSucceeded++;
} catch (error) {
const uploadDuration = Date.now() - uploadStartTime;
console.error(`[UPLOAD-CLEANUP-SCHEDULER] ❌ Error processing upload - ID: ${upload.id}, Duration: ${uploadDuration}ms`, error);
uploadsFailed++;
} finally {
uploadsProcessed++;
}
}

const endTime = new Date();
const duration = endTime.getTime() - startTime.getTime();
const memUsageEnd = process.memoryUsage();
const cpuUsageEnd = process.cpuUsage(cpuUsageStart);

console.log('========================================');
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Cron Completed at ${endTime.toISOString()}`);
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Summary - Total: ${uploadsProcessed}, Success: ${uploadsSucceeded}, Failed: ${uploadsFailed}`);
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Files Deleted: ${totalFilesDeleted}, Collections Dropped: ${totalCollectionsDropped}`);
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Total Duration: ${duration}ms`);
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Memory Usage (End): RSS=${(memUsageEnd.rss / 1024 / 1024).toFixed(2)}MB, Heap=${(memUsageEnd.heapUsed / 1024 / 1024).toFixed(2)}MB`);
console.log(`[UPLOAD-CLEANUP-SCHEDULER] Memory Delta: RSS=${((memUsageEnd.rss - memUsageStart.rss) / 1024 / 1024).toFixed(2)}MB, Heap=${((memUsageEnd.heapUsed - memUsageStart.heapUsed) / 1024 / 1024).toFixed(2)}MB`);
console.log(`[UPLOAD-CLEANUP-SCHEDULER] CPU Usage: User=${(cpuUsageEnd.user / 1000).toFixed(2)}ms, System=${(cpuUsageEnd.system / 1000).toFixed(2)}ms`);

if (duration > 30000) {
console.error(`[UPLOAD-CLEANUP-SCHEDULER] 🚨 CRITICAL: Cron execution took ${duration}ms (>30s threshold) - HIGH CPU RISK!`);
} else if (duration > 10000) {
console.warn(`[UPLOAD-CLEANUP-SCHEDULER] ⚠️ WARNING: Cron execution took ${duration}ms (>10s threshold)`);
}

console.log('========================================');
}
}
2 changes: 1 addition & 1 deletion apps/queue-manager/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@impler/queue-manager",
"version": "1.9.3",
"version": "1.10.0",
"author": "implerhq",
"license": "MIT",
"private": true,
Expand Down
Loading
Loading