Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
"cmd:rename-qb-accounts": "tsx src/cmd/renameQbAccount/index.ts",
"patch-assembly-node-sdk": "cp ./lib-patches/assembly-js-node-sdk.js ./node_modules/@assembly-js/node-sdk/dist/api/init.js",
"cmd:backfill-product-info": "tsx src/cmd/backfillProductInfo/index.ts",
"cmd:sync-missed-invoices": "tsx src/cmd/syncMissedInvoices/index.ts"
"cmd:sync-missed-invoices": "tsx src/cmd/syncMissedInvoices/index.ts",
"cmd:sync-missed-products": "tsx src/cmd/syncMissedProducts/index.ts"
},
"dependencies": {
"@sentry/nextjs": "^9.13.0",
Expand Down
76 changes: 76 additions & 0 deletions src/cmd/syncMissedProducts/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import APIError from '@/app/api/core/exceptions/api'
import User from '@/app/api/core/models/User.model'
import { SyncMissedProductsService } from '@/cmd/syncMissedProducts/syncMissedProducts.service'
import { copilotAPIKey } from '@/config'
import { PortalConnectionWithSettingType } from '@/db/schema/qbPortalConnections'
import { getAllActivePortalConnections } from '@/db/service/token.service'
import { CopilotAPI } from '@/utils/copilotAPI'
import { encodePayload } from '@/utils/crypto'
import CustomLogger from '@/utils/logger'

/**
* This script updates mapped products in QBO whose names were changed in Assembly
* during a specific date window but were missed during regular sync.
*/

// command to run the script: `yarn run cmd:sync-missed-products`
;(async function run() {
try {
console.info('SyncMissedProducts#run | Starting sync missed products')
const activeConnections = await getAllActivePortalConnections()

if (!activeConnections.length) {
console.info('No active connection found')
process.exit(0)
}

for (const connection of activeConnections) {
if (!connection.setting?.syncFlag || !connection.setting?.isEnabled) {
console.info(
'Skipping connection: ' + JSON.stringify(connection.portalId),
)
continue
}

if (!connection.setting.createNewProductFlag) {
console.info(
`Skipping connection: ${connection.portalId}. Create new product flag is false`,
)
continue
}

console.info(
`\n\n\n ########### Processing for PORTAL: ${connection.portalId} #############`,
)

await initiateProcess(connection)
}

console.info('\n Sync missed products completed successfully')
process.exit(0)
} catch (error) {
console.error(error)
process.exit(1)
}
})()

async function initiateProcess(connection: PortalConnectionWithSettingType) {
console.info('Generating token for the portal')
const payload = {
workspaceId: connection.portalId,
}
const token = encodePayload(copilotAPIKey, payload)

const copilot = new CopilotAPI(token)
const tokenPayload = await copilot.getTokenPayload()
CustomLogger.info({
obj: { copilotApiCronToken: token, tokenPayload },
message:
'syncMissedProducts#initiateProcess | Copilot API token and payload',
})
if (!tokenPayload) throw new APIError(500, 'Encoded token is not valid')

const user = new User(token, tokenPayload)
const syncMissedService = new SyncMissedProductsService(user)
await syncMissedService.syncMissedProductsForPortal()
}
171 changes: 171 additions & 0 deletions src/cmd/syncMissedProducts/syncMissedProducts.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import APIError from '@/app/api/core/exceptions/api'
import { BaseService } from '@/app/api/core/services/base.service'
import { withRetry } from '@/app/api/core/utils/withRetry'
import { AuthService } from '@/app/api/quickbooks/auth/auth.service'
import { StatusableError } from '@/type/CopilotApiError'
import { CopilotAPI } from '@/utils/copilotAPI'
import { and, eq } from 'drizzle-orm'
import httpStatus from 'http-status'
import { MAX_PRODUCT_LIST_LIMIT } from '@/app/api/core/constants/limit'
import IntuitAPI from '@/utils/intuitAPI'
import { bottleneck } from '@/utils/bottleneck'
import { QBItemFullUpdatePayloadType } from '@/type/dto/intuitAPI.dto'
import { QBProductSync } from '@/db/schema/qbProductSync'
import CustomLogger from '@/utils/logger'

// Products updated during this window were missed during regular sync
const SYNC_WINDOW_START = new Date('2026-02-17T00:00:00.000Z')
const SYNC_WINDOW_END = new Date('2026-02-20T00:00:00.000Z') // exclusive

export class SyncMissedProductsService extends BaseService {
async _syncMissedProductsForPortal() {
try {
console.info(
`syncMissedProducts#syncMissedProductsForPortal :: Processing portal: ${this.user.workspaceId}`,
)

// 1. Get all the products for the portal
const copilotApi = new CopilotAPI(this.user.token)
const allProducts = await copilotApi.getProducts(
undefined,
undefined,
MAX_PRODUCT_LIST_LIMIT,
)

const filteredProducts = allProducts?.data?.filter(
(product) =>
product.updatedAt &&
new Date(product.updatedAt) >= SYNC_WINDOW_START &&
new Date(product.updatedAt) < SYNC_WINDOW_END,
)

if (!filteredProducts?.length) {
console.info(
`No missed products found for portal ${this.user.workspaceId}`,
)
return
}

const filteredProductIds = filteredProducts.map((product) => product.id)

console.info(
`Found ${filteredProductIds.length} missed products for portal ${this.user.workspaceId}`,
)

const mappedProducts = await this.db.query.QBProductSync.findMany({
where: (QBProductSync, { eq, inArray, isNotNull }) =>
and(
eq(QBProductSync.portalId, this.user.workspaceId),
isNotNull(QBProductSync.qbItemId),
inArray(QBProductSync.productId, filteredProductIds),
),
})

if (!mappedProducts.length) {
console.info(
`No missed products found in Quickbooks for portal ${this.user.workspaceId}`,
)
return
}

// 2. update all the products in QBO
const authService = new AuthService(this.user)
const qbTokenInfo = await authService.getQBPortalConnection(
this.user.workspaceId,
)

if (!qbTokenInfo.accessToken || !qbTokenInfo.refreshToken) {
console.info(
`No access token found for portal: ${this.user.workspaceId}`,
)
return
}

const intuitApi = new IntuitAPI(qbTokenInfo)
const updatePromises = []
for (const mappedProduct of mappedProducts) {
const assemblyProduct = filteredProducts.find(
(product) => product.id === mappedProduct.productId,
)
if (!assemblyProduct) {
console.info(
`Product not found in assembly for product id: ${mappedProduct.productId}`,
)
continue
}
if (!mappedProduct.qbItemId || !mappedProduct.qbSyncToken) {
console.info(
`Product qbItemId or qbSyncToken not found for product id: ${mappedProduct.productId}. QbItemId: ${mappedProduct.qbItemId}, QbSyncToken: ${mappedProduct.qbSyncToken}. Skipping...`,
)
continue
}

const payload = {
Id: mappedProduct.qbItemId,
Name: assemblyProduct.name,
SyncToken: mappedProduct.qbSyncToken,
sparse: true,
Active: true,
}
updatePromises.push(
bottleneck.schedule(() => {
return this.updateQbProduct(intuitApi, payload, mappedProduct.id)
}),
)
}
await Promise.all(updatePromises)
} catch (error: unknown) {
if (error instanceof APIError) {
throw error
}
const assemblyError = error as StatusableError
const status = assemblyError.status || httpStatus.BAD_REQUEST
if (status === httpStatus.FORBIDDEN) {
console.info(
`Assembly sdk returns forbidden for the portal ${this.user.workspaceId}`,
)
return
}
throw error
}
}

private async updateQbProduct(
intuitApi: IntuitAPI,
payload: QBItemFullUpdatePayloadType,
recordId: string,
) {
try {
const response = await intuitApi.itemFullUpdate(payload)
CustomLogger.info({
obj: { response },
message: `SyncMissedProductsService#updateQbProduct | Product updated in Quickbooks for product id: ${recordId}`,
})

// update the product map in mapping table
const updatePayload = {
name: response.Item.Name,
qbSyncToken: response.Item.SyncToken,
}
await this.db
.update(QBProductSync)
.set(updatePayload)
.where(eq(QBProductSync.id, recordId))
} catch (error) {
CustomLogger.error({
message: `SyncMissedProductsService#updateQbProduct | Failed to update product: ${recordId}`,
obj: { error, recordId },
})
}
}

private wrapWithRetry<Args extends unknown[], R>(
fn: (...args: Args) => Promise<R>,
): (...args: Args) => Promise<R> {
return (...args: Args): Promise<R> => withRetry(fn.bind(this), args)
}

syncMissedProductsForPortal = this.wrapWithRetry(
this._syncMissedProductsForPortal,
)
}
1 change: 1 addition & 0 deletions src/type/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ export const ProductResponseSchema = z.object({
status: z.nativeEnum(ProductStatus),
object: z.string(),
createdAt: z.string().datetime(),
updatedAt: z.string().datetime().optional(),
})
export type ProductResponse = z.infer<typeof ProductResponseSchema>

Expand Down
Loading