This document explains the complete job import system, including:
- High-level architecture
- Job import workflow
- Cron + Queue + Worker system
- Real-time updates with Socket.IO
- Retry & concurrency logic
- Import logs and monitoring
- Sequence diagrams & flow diagrams (ASCII)
This application imports jobs from multiple RSS feeds, normalizes the data, and stores it in MongoDB. It is built as a scalable background processing pipeline using:
- Cron Jobs → Trigger imports every hour
- Redis + BullMQ Queue → Task scheduling + retries + backoff
- Workers → Process job imports in background
- MongoDB → Store job entries & import logs
- Socket.IO → Real-time status updates for dashboards
- Exponential Backoff → Automatic retries on failure
- Batch Processing & Concurrency → High performance imports
┌────────────────────────────────────────────────────────────┐
│ USER INTERFACE / ADMIN │
│ (Real-time import progress using Socket.IO) │
└────────────────────────────────────────────────────────────┘
▲
│ Socket Events
│
┌──────────────┴──────────────┐
│ Node.js Backend │
│ (API Server + Cron Job) │
└──────────────┬──────────────┘
│ Adds Jobs to Queue
▼
┌────────────────────────────────────────────────────────────┐
│ BullMQ Job Queue (Redis) │
│ Stores scheduled + retrying tasks │
└────────────────────────────────────────────────────────────┘
│ Worker Pulls Jobs
▼
┌────────────────────────────────────────────────────────────┐
│ Worker Processor │
│ Parses RSS → Normalizes → Inserts/Updates into MongoDB │
│ Emits Socket.IO Events → Logs import history │
└────────────────────────────────────────────────────────────┘
│ Saves Logs
▼
┌────────────────────────────────────────────────────────────┐
│ MongoDB Database │
│ jobs collection + import_logs collection │
└────────────────────────────────────────────────────────────┘
A cron job runs every hour:
- Loops through all RSS feed URLs
- Adds each feed as a new job to BullMQ queue
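The enqueue step above can be sketched as follows. This is a minimal illustration, not the project's actual code: the `JobQueue` interface, the `import-feed` job name, the `FEED_URLS` list, and the in-memory queue are all assumptions made so the example runs without Redis. The interface mirrors the `add(name, data)` call shape of a BullMQ `Queue`, so a real queue instance could presumably be passed in its place.

```typescript
// Payload stored with each queued import job (hypothetical shape).
interface FeedJobData {
  feedUrl: string;
}

// Minimal queue shape used by the cron handler. Modeled on the
// add(name, data) call of a BullMQ Queue; this is an assumption, not the
// project's real type.
interface JobQueue {
  add(name: string, data: FeedJobData): Promise<unknown>;
}

// Hypothetical feed list; the real URLs live wherever the project configures them.
const FEED_URLS: string[] = [
  "https://example.com/jobs.rss",
  "https://example.org/careers/feed",
];

// Intended to be called by the hourly cron trigger (for example "0 * * * *"):
// one queue job is added per feed URL.
async function enqueueAllFeeds(queue: JobQueue, feedUrls: string[]): Promise<number> {
  for (const feedUrl of feedUrls) {
    await queue.add("import-feed", { feedUrl });
  }
  return feedUrls.length;
}

// In-memory stand-in so the sketch runs without Redis.
class InMemoryQueue implements JobQueue {
  readonly jobs: { name: string; data: FeedJobData }[] = [];

  async add(name: string, data: FeedJobData): Promise<unknown> {
    this.jobs.push({ name, data });
    return undefined;
  }
}
```

Keeping the cron handler this thin means the scheduler only decides when to import, while retries and processing stay with the queue and the worker.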
BullMQ stores each import job and:
- Handles retries
- Applies exponential backoff
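- Persists queued jobs in Redis, so pending imports survive a worker restart (durability still depends on the Redis persistence settings)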
For each feed, the worker:
- Fetches the RSS XML
- Parses it using fast-xml-parser
- Normalizes the fields
- Upserts into MongoDB
- Tracks:
  - totalFetched
  - newJobs
  - updatedJobs
  - failedJobs
- Saves an import log record
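The normalize, upsert, and counting steps above could look roughly like this. It is a sketch under stated assumptions: the `RawItem` and `NormalizedJob` field names are hypothetical (real feeds differ), the `guid`-or-`link` identity rule is one possible choice, and a `Map` stands in for the MongoDB jobs collection so the example runs on its own. Fetching and XML parsing are left out.

```typescript
// Raw item shape after XML parsing. Field names are assumptions and are
// optional because feeds differ in what they provide.
interface RawItem {
  guid?: string;
  link?: string;
  title?: string;
  description?: string;
  pubDate?: string;
}

// Hypothetical normalized record stored in the jobs collection.
interface NormalizedJob {
  externalId: string;
  title: string;
  url: string;
  description: string;
  postedAt: string | null; // ISO-8601, or null if the feed date is missing or unparseable
}

interface ImportStats {
  totalFetched: number;
  newJobs: number;
  updatedJobs: number;
  failedJobs: { item: RawItem; reason: string }[];
}

function normalizeItem(raw: RawItem): NormalizedJob {
  // Assumed identity rule: prefer guid, fall back to link.
  const externalId = (raw.guid ?? raw.link ?? "").trim();
  if (externalId === "") {
    throw new Error("item has neither guid nor link");
  }
  const parsed = raw.pubDate ? new Date(raw.pubDate) : null;
  const postedAt = parsed !== null && !isNaN(parsed.getTime()) ? parsed.toISOString() : null;
  return {
    externalId,
    title: (raw.title ?? "").trim(),
    url: (raw.link ?? "").trim(),
    description: (raw.description ?? "").trim(),
    postedAt,
  };
}

// `store` is an in-memory stand-in for the jobs collection, keyed by externalId.
function importItems(items: RawItem[], store: Map<string, NormalizedJob>): ImportStats {
  const stats: ImportStats = { totalFetched: items.length, newJobs: 0, updatedJobs: 0, failedJobs: [] };
  for (const item of items) {
    try {
      const job = normalizeItem(item);
      if (store.has(job.externalId)) {
        stats.updatedJobs += 1;
      } else {
        stats.newJobs += 1;
      }
      store.set(job.externalId, job); // upsert: insert or overwrite
    } catch (err) {
      stats.failedJobs.push({ item, reason: err instanceof Error ? err.message : String(err) });
    }
  }
  return stats;
}
```

Against a real database the `has` plus `set` pair would be replaced by a single upsert keyed on the same identifier, with the new or updated count taken from the operation's result.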
The worker emits events through Socket.IO:
- import:started
- import:completed
- import:failed
The frontend admin dashboard listens for these events and updates the UI in real time.
+-------------------------+
| Cron Job (every hour) |
+-----------+-------------+
|
v
+-------------------------+
| Add feed job to Queue |
+-----------+-------------+
|
v
+-------------------------+
| Redis (BullMQ Queue) |
+-----------+-------------+
|
v
+-------------------------+
| Worker picks a job |
+-----------+-------------+
|
v
+-------------------------+
| Fetch RSS XML |
+-----------+-------------+
|
v
+-------------------------+
| Parse XML → JSON |
+-----------+-------------+
|
v
+-------------------------+
| Normalize Item Data |
+-----------+-------------+
|
v
+-------------------------+
| Upsert into MongoDB |
+-----------+-------------+
|
v
+-------------------------+
| Record import_logs |
+-----------+-------------+
|
v
+-------------------------+
| Emit Socket.IO events |
+-------------------------+
BullMQ handles retry logic:
- 5 retry attempts
- Exponential backoff (2s → 4s → 8s → 16s → 32s)
This ensures:
- Temporary feed outages don't break the flow
- No manual intervention required
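The backoff schedule quoted above follows from doubling a 2 second base delay on each retry. The sketch below reproduces that arithmetic and shows a job options object in the shape BullMQ accepts. The function names are illustrative, and the `attempts: 6` value rests on the assumption that BullMQ's `attempts` option counts the first try as well, so five retries correspond to six attempts in total.

```typescript
// Delay before retry number `retry` (1-based): baseDelayMs * 2^(retry - 1).
function backoffDelayMs(retry: number, baseDelayMs: number): number {
  return baseDelayMs * Math.pow(2, retry - 1);
}

// Full list of delays for a given number of retries.
function backoffSchedule(retries: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry <= retries; retry++) {
    delays.push(backoffDelayMs(retry, baseDelayMs));
  }
  return delays;
}

// Per-job options in the shape BullMQ accepts. Assumption: `attempts` is the
// total number of tries, so 1 initial try + 5 retries = 6.
const importJobOptions = {
  attempts: 6,
  backoff: { type: "exponential", delay: 2000 },
};

const schedule = backoffSchedule(5, 2000);
// schedule is [2000, 4000, 8000, 16000, 32000], i.e. 2s, 4s, 8s, 16s, 32s
```

With these values a feed that stays down is given about 62 seconds of cumulative delay before the job is marked as failed.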
To optimize performance:
- Batch size controls the number of items processed at once
- Concurrency controls the number of jobs a worker processes in parallel
Example environment variables:
JOB_BATCH_SIZE=50
JOB_MAX_CONCURRENCY=5
Worker processes multiple items per batch and multiple feeds in parallel.
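As a rough sketch of how these two settings might be consumed: the helper names below are hypothetical, and the fallback defaults of 50 and 5 are simply the example values above. The environment is passed in as a plain object so the code stays self-contained; in Node.js the caller would pass `process.env`.

```typescript
interface Tuning {
  batchSize: number;
  maxConcurrency: number;
}

// Parses a positive integer, falling back when the value is missing or invalid.
function readPositiveInt(value: string | undefined, fallback: number): number {
  const parsed = parseInt(value ?? "", 10);
  return parsed > 0 ? parsed : fallback;
}

// `env` is any string map; pass process.env in a Node.js process.
function readTuning(env: Record<string, string | undefined>): Tuning {
  return {
    batchSize: readPositiveInt(env["JOB_BATCH_SIZE"], 50),
    maxConcurrency: readPositiveInt(env["JOB_MAX_CONCURRENCY"], 5),
  };
}

// Splits items into consecutive batches of at most `size` elements.
function chunk<T>(items: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Hands the batches to `handleBatch` one after another and returns how many were processed.
async function processInBatches<T>(
  items: T[],
  batchSize: number,
  handleBatch: (batch: T[]) => Promise<void>,
): Promise<number> {
  const batches = chunk(items, batchSize);
  for (const batch of batches) {
    await handleBatch(batch);
  }
  return batches.length;
}
```

The `maxConcurrency` value would typically be handed to the BullMQ `Worker` through its `concurrency` option, which sets how many jobs one worker processes at the same time, while `batchSize` bounds the size of each database write inside a single job.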
Worker emits:
- import:started → when a feed begins
- import:completed → when stats are returned
- import:failed → on errors
Admin dashboard listens and updates the UI live.
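The three events and when they fire can be sketched as a thin wrapper around the import itself. The payload shapes, the `EventSink` interface, and the function names are assumptions for illustration. The sink only needs an `emit(event, payload)` method, which is the call shape a Socket.IO server exposes, so the server instance could presumably be passed directly.

```typescript
// Hypothetical summary of one feed import.
interface ImportSummary {
  totalFetched: number;
  newJobs: number;
  updatedJobs: number;
  failedCount: number;
}

// Event names come from the document; the payload shapes are assumptions.
type ImportEvents = {
  "import:started": { feedUrl: string };
  "import:completed": { feedUrl: string; stats: ImportSummary };
  "import:failed": { feedUrl: string; reason: string };
};

// Minimal emitter shape, modeled on emit(event, payload).
interface EventSink {
  emit(event: string, payload: unknown): unknown;
}

// Ties each event name to its payload type at compile time.
function emitImportEvent<K extends keyof ImportEvents>(
  sink: EventSink,
  event: K,
  payload: ImportEvents[K],
): void {
  sink.emit(event, payload);
}

// Wraps an import so the dashboard sees started, then completed or failed.
async function importWithEvents(
  sink: EventSink,
  feedUrl: string,
  runImport: (feedUrl: string) => Promise<ImportSummary>,
): Promise<ImportSummary> {
  emitImportEvent(sink, "import:started", { feedUrl });
  try {
    const stats = await runImport(feedUrl);
    emitImportEvent(sink, "import:completed", { feedUrl, stats });
    return stats;
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    emitImportEvent(sink, "import:failed", { feedUrl, reason });
    throw err; // rethrow so the queue can still retry the job
  }
}
```

On the dashboard side, a matching listener for each of the three event names would update the progress view.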
Saved inside import_logs collection:
Fields:
- feedUrl
- timestamp
- totalFetched
- totalImported
- newJobs
- updatedJobs
- failedJobs[]
Purpose:
- Analytics
- Monitoring feed health
- Debugging issues
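The log fields listed above could be typed along these lines. The field names come from the list. Everything else is an assumption made for illustration: the shape of a `failedJobs` entry, the ISO-8601 timestamp, the rule that `totalImported` equals `newJobs + updatedJobs`, and the `failureRate` helper as one possible feed health measure.

```typescript
// Hypothetical shape of one entry in failedJobs[].
interface FailedJob {
  externalId: string | null;
  reason: string;
}

interface ImportLog {
  feedUrl: string;
  timestamp: string; // ISO-8601 (assumed format)
  totalFetched: number;
  totalImported: number;
  newJobs: number;
  updatedJobs: number;
  failedJobs: FailedJob[];
}

interface ImportCounts {
  totalFetched: number;
  newJobs: number;
  updatedJobs: number;
  failedJobs: FailedJob[];
}

// Builds the record to insert into import_logs.
// Assumption: totalImported is the sum of new and updated jobs.
function buildImportLog(feedUrl: string, counts: ImportCounts, now: Date = new Date()): ImportLog {
  return {
    feedUrl,
    timestamp: now.toISOString(),
    totalFetched: counts.totalFetched,
    totalImported: counts.newJobs + counts.updatedJobs,
    newJobs: counts.newJobs,
    updatedJobs: counts.updatedJobs,
    failedJobs: counts.failedJobs,
  };
}

// One possible feed health signal: the share of fetched items that failed.
function failureRate(log: ImportLog): number {
  return log.totalFetched === 0 ? 0 : log.failedJobs.length / log.totalFetched;
}
```

A monitoring query could then flag feeds whose recent logs show a rising failure rate or a `totalFetched` of zero.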
When a single item fails:
- Logs error
- Adds error to failed jobs list
When an entire feed fails:
- Job moved to retry queue
- Sends alert via Socket.IO
- Logs reason
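The error-handling bullets above read as two cases: a failure of one item, which is logged and recorded while the import continues, and a failure of the whole feed, which is logged, reported, and left to the queue to retry. The sketch below follows that reading, which is an interpretation rather than something the document states outright. All names are hypothetical, and `fetchItems`, `processItem`, and `alert` are injected so the example does not depend on a network, a database, or Socket.IO.

```typescript
interface FailedItem {
  index: number;
  reason: string;
}

interface FeedResult {
  processed: number;
  failedJobs: FailedItem[];
}

// Callback used to notify the dashboard, for example via a Socket.IO emit.
type Alert = (event: "import:failed", payload: { feedUrl: string; reason: string }) => void;

function reasonOf(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

async function runFeedImport<T>(
  feedUrl: string,
  fetchItems: (url: string) => Promise<T[]>,
  processItem: (item: T) => Promise<void>,
  alert: Alert,
): Promise<FeedResult> {
  let items: T[];
  try {
    items = await fetchItems(feedUrl);
  } catch (err) {
    // Feed-level failure: log the reason, alert the dashboard, and rethrow
    // so the queue marks the job as failed and schedules a retry.
    const reason = reasonOf(err);
    console.error("feed import failed for " + feedUrl + ": " + reason);
    alert("import:failed", { feedUrl, reason });
    throw err;
  }

  const result: FeedResult = { processed: 0, failedJobs: [] };
  for (let index = 0; index < items.length; index++) {
    const item = items[index] as T;
    try {
      await processItem(item);
      result.processed += 1;
    } catch (err) {
      // Item-level failure: log it, record it, and continue with the next item.
      const reason = reasonOf(err);
      console.error("item " + index + " of " + feedUrl + " failed: " + reason);
      result.failedJobs.push({ index, reason });
    }
  }
  return result;
}
```

Rethrowing only at the feed level keeps one malformed item from triggering a retry of the entire feed.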
Workers can be scaled out across multiple servers.
Redis queue + retries = stable imports.
Admins see import events instantly.
Cron, imports, workers, and logs are separated.
Batch size, concurrency, and backoff are controlled via environment variables.
Cron → Queue → Worker → MongoDB → Admin UI
Full sequence diagram:
Cron Scheduler Queue (Redis) Worker MongoDB Admin Dashboard
| | | | |
| addJob(feedUrl) | | | |
|-------------------->| | | |
| | store job | | |
| |---------------->| | |
| | | process job | |
| | |---------------------->| |
| | | | insert/update jobs |
| | | |-------------------->|
| | | emit(import:completed)| |
| | |--------------------------------------------->|
This system is:
- scalable
- fast
- reliable
- observable
- real-time
It follows the cron, queue, and worker pipeline pattern common in job aggregators, scrapers, and background ETL systems.