diff --git a/package.json b/package.json index 42a65ebf..b2a32b1a 100644 --- a/package.json +++ b/package.json @@ -19,7 +19,8 @@ "cmd:rename-qb-accounts": "tsx src/cmd/renameQbAccount/index.ts", "patch-assembly-node-sdk": "cp ./lib-patches/assembly-js-node-sdk.js ./node_modules/@assembly-js/node-sdk/dist/api/init.js", "cmd:backfill-product-info": "tsx src/cmd/backfillProductInfo/index.ts", - "cmd:sync-missed-invoices": "tsx src/cmd/syncMissedInvoices/index.ts" + "cmd:sync-missed-invoices": "tsx src/cmd/syncMissedInvoices/index.ts", + "cmd:sync-missed-products": "tsx src/cmd/syncMissedProducts/index.ts" }, "dependencies": { "@sentry/nextjs": "^9.13.0", diff --git a/src/cmd/syncMissedProducts/index.ts b/src/cmd/syncMissedProducts/index.ts new file mode 100644 index 00000000..610903ad --- /dev/null +++ b/src/cmd/syncMissedProducts/index.ts @@ -0,0 +1,76 @@ +import APIError from '@/app/api/core/exceptions/api' +import User from '@/app/api/core/models/User.model' +import { SyncMissedProductsService } from '@/cmd/syncMissedProducts/syncMissedProducts.service' +import { copilotAPIKey } from '@/config' +import { PortalConnectionWithSettingType } from '@/db/schema/qbPortalConnections' +import { getAllActivePortalConnections } from '@/db/service/token.service' +import { CopilotAPI } from '@/utils/copilotAPI' +import { encodePayload } from '@/utils/crypto' +import CustomLogger from '@/utils/logger' + +/** + * This script updates mapped products in QBO whose names were changed in Assembly + * during a specific date window but were missed during regular sync. + */ + +// command to run the script: `yarn run cmd:sync-missed-products` +;(async function run() { + try { + console.info('SyncMissedProducts#run | Starting sync missed products') + const activeConnections = await getAllActivePortalConnections() + + if (!activeConnections.length) { + console.info('No active connection found') + process.exit(0) + } + + for (const connection of activeConnections) { + if (!connection.setting?.syncFlag || !connection.setting?.isEnabled) { + console.info( + 'Skipping connection: ' + JSON.stringify(connection.portalId), + ) + continue + } + + if (!connection.setting.createNewProductFlag) { + console.info( + `Skipping connection: ${connection.portalId}. Create new product flag is false`, + ) + continue + } + + console.info( + `\n\n\n ########### Processing for PORTAL: ${connection.portalId} #############`, + ) + + await initiateProcess(connection) + } + + console.info('\n Sync missed products completed successfully') + process.exit(0) + } catch (error) { + console.error(error) + process.exit(1) + } +})() + +async function initiateProcess(connection: PortalConnectionWithSettingType) { + console.info('Generating token for the portal') + const payload = { + workspaceId: connection.portalId, + } + const token = encodePayload(copilotAPIKey, payload) + + const copilot = new CopilotAPI(token) + const tokenPayload = await copilot.getTokenPayload() + CustomLogger.info({ + obj: { copilotApiCronToken: token, tokenPayload }, + message: + 'syncMissedProducts#initiateProcess | Copilot API token and payload', + }) + if (!tokenPayload) throw new APIError(500, 'Encoded token is not valid') + + const user = new User(token, tokenPayload) + const syncMissedService = new SyncMissedProductsService(user) + await syncMissedService.syncMissedProductsForPortal() +} diff --git a/src/cmd/syncMissedProducts/syncMissedProducts.service.ts b/src/cmd/syncMissedProducts/syncMissedProducts.service.ts new file mode 100644 index 00000000..d9ac48f0 --- /dev/null +++ b/src/cmd/syncMissedProducts/syncMissedProducts.service.ts @@ -0,0 +1,171 @@ +import APIError from '@/app/api/core/exceptions/api' +import { BaseService } from '@/app/api/core/services/base.service' +import { withRetry } from '@/app/api/core/utils/withRetry' +import { AuthService } from '@/app/api/quickbooks/auth/auth.service' +import { StatusableError } from '@/type/CopilotApiError' +import { CopilotAPI } from '@/utils/copilotAPI' +import { and, eq } from 'drizzle-orm' +import httpStatus from 'http-status' +import { MAX_PRODUCT_LIST_LIMIT } from '@/app/api/core/constants/limit' +import IntuitAPI from '@/utils/intuitAPI' +import { bottleneck } from '@/utils/bottleneck' +import { QBItemFullUpdatePayloadType } from '@/type/dto/intuitAPI.dto' +import { QBProductSync } from '@/db/schema/qbProductSync' +import CustomLogger from '@/utils/logger' + +// Products updated during this window were missed during regular sync +const SYNC_WINDOW_START = new Date('2026-02-17T00:00:00.000Z') +const SYNC_WINDOW_END = new Date('2026-02-20T00:00:00.000Z') // exclusive + +export class SyncMissedProductsService extends BaseService { + async _syncMissedProductsForPortal() { + try { + console.info( + `syncMissedProducts#syncMissedProductsForPortal :: Processing portal: ${this.user.workspaceId}`, + ) + + // 1. Get all the products for the portal + const copilotApi = new CopilotAPI(this.user.token) + const allProducts = await copilotApi.getProducts( + undefined, + undefined, + MAX_PRODUCT_LIST_LIMIT, + ) + + const filteredProducts = allProducts?.data?.filter( + (product) => + product.updatedAt && + new Date(product.updatedAt) >= SYNC_WINDOW_START && + new Date(product.updatedAt) < SYNC_WINDOW_END, + ) + + if (!filteredProducts?.length) { + console.info( + `No missed products found for portal ${this.user.workspaceId}`, + ) + return + } + + const filteredProductIds = filteredProducts.map((product) => product.id) + + console.info( + `Found ${filteredProductIds.length} missed products for portal ${this.user.workspaceId}`, + ) + + const mappedProducts = await this.db.query.QBProductSync.findMany({ + where: (QBProductSync, { eq, inArray, isNotNull }) => + and( + eq(QBProductSync.portalId, this.user.workspaceId), + isNotNull(QBProductSync.qbItemId), + inArray(QBProductSync.productId, filteredProductIds), + ), + }) + + if (!mappedProducts.length) { + console.info( + `No missed products found in Quickbooks for portal ${this.user.workspaceId}`, + ) + return + } + + // 2. update all the products in QBO + const authService = new AuthService(this.user) + const qbTokenInfo = await authService.getQBPortalConnection( + this.user.workspaceId, + ) + + if (!qbTokenInfo.accessToken || !qbTokenInfo.refreshToken) { + console.info( + `No access token found for portal: ${this.user.workspaceId}`, + ) + return + } + + const intuitApi = new IntuitAPI(qbTokenInfo) + const updatePromises = [] + for (const mappedProduct of mappedProducts) { + const assemblyProduct = filteredProducts.find( + (product) => product.id === mappedProduct.productId, + ) + if (!assemblyProduct) { + console.info( + `Product not found in assembly for product id: ${mappedProduct.productId}`, + ) + continue + } + if (!mappedProduct.qbItemId || !mappedProduct.qbSyncToken) { + console.info( + `Product qbItemId or qbSyncToken not found for product id: ${mappedProduct.productId}. QbItemId: ${mappedProduct.qbItemId}, QbSyncToken: ${mappedProduct.qbSyncToken}. Skipping...`, + ) + continue + } + + const payload = { + Id: mappedProduct.qbItemId, + Name: assemblyProduct.name, + SyncToken: mappedProduct.qbSyncToken, + sparse: true, + Active: true, + } + updatePromises.push( + bottleneck.schedule(() => { + return this.updateQbProduct(intuitApi, payload, mappedProduct.id) + }), + ) + } + await Promise.all(updatePromises) + } catch (error: unknown) { + if (error instanceof APIError) { + throw error + } + const assemblyError = error as StatusableError + const status = assemblyError.status || httpStatus.BAD_REQUEST + if (status === httpStatus.FORBIDDEN) { + console.info( + `Assembly sdk returns forbidden for the portal ${this.user.workspaceId}`, + ) + return + } + throw error + } + } + + private async updateQbProduct( + intuitApi: IntuitAPI, + payload: QBItemFullUpdatePayloadType, + recordId: string, + ) { + try { + const response = await intuitApi.itemFullUpdate(payload) + CustomLogger.info({ + obj: { response }, + message: `SyncMissedProductsService#updateQbProduct | Product updated in Quickbooks for product id: ${recordId}`, + }) + + // update the product map in mapping table + const updatePayload = { + name: response.Item.Name, + qbSyncToken: response.Item.SyncToken, + } + await this.db + .update(QBProductSync) + .set(updatePayload) + .where(eq(QBProductSync.id, recordId)) + } catch (error) { + CustomLogger.error({ + message: `SyncMissedProductsService#updateQbProduct | Failed to update product: ${recordId}`, + obj: { error, recordId }, + }) + } + } + + private wrapWithRetry( + fn: (...args: Args) => Promise, + ): (...args: Args) => Promise { + return (...args: Args): Promise => withRetry(fn.bind(this), args) + } + + syncMissedProductsForPortal = this.wrapWithRetry( + this._syncMissedProductsForPortal, + ) +} diff --git a/src/type/common.ts b/src/type/common.ts index b8240402..434a1a06 100644 --- a/src/type/common.ts +++ b/src/type/common.ts @@ -230,6 +230,7 @@ export const ProductResponseSchema = z.object({ status: z.nativeEnum(ProductStatus), object: z.string(), createdAt: z.string().datetime(), + updatedAt: z.string().datetime().optional(), }) export type ProductResponse = z.infer