From ec5de93eff253a6fa2e8326210a3227be6957bc1 Mon Sep 17 00:00:00 2001 From: Lukasz Chmielewski Date: Mon, 15 Sep 2025 15:34:30 +0200 Subject: [PATCH 1/3] added rabbit --- package-lock.json | 72 +++++++++++- package.json | 11 +- .../interfaces/transaction.controller.ts | 32 ++++- .../interfaces/transaction.router.ts | 6 +- src/server.ts | 1 + src/shared/DIcontainer/container.ts | 8 ++ .../DIcontainer/registeredServicesEnum.ts | 3 + src/shared/clients/rabbitMQ/rabbit.client.ts | 110 ++++++++++++++++++ src/shared/clients/webhook/webhook.client.ts | 3 - src/shared/routers/catchAsync.ts | 7 ++ src/shared/routers/routes.enum.ts | 1 + src/shared/server/app.ts | 1 + 12 files changed, 245 insertions(+), 10 deletions(-) create mode 100644 src/shared/clients/rabbitMQ/rabbit.client.ts create mode 100644 src/shared/routers/catchAsync.ts diff --git a/package-lock.json b/package-lock.json index 230d36e..9159c94 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,13 +9,16 @@ "version": "1.0.0", "license": "ISC", "dependencies": { + "amqplib": "^0.10.9", "axios": "^1.11.0", "body-parser": "^2.2.0", + "dotenv": "^17.2.2", "express": "^5.1.0", "joi": "^17.13.3", "reflect-metadata": "^0.2.2" }, "devDependencies": { + "@types/amqplib": "^0.10.7", "@types/body-parser": "^1.19.6", "@types/express": "^5.0.3", "@types/jest": "^30.0.0", @@ -1268,6 +1271,16 @@ "license": "0BSD", "optional": true }, + "node_modules/@types/amqplib": { + "version": "0.10.7", + "resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.10.7.tgz", + "integrity": "sha512-IVj3avf9AQd2nXCx0PGk/OYq7VmHiyNxWFSb5HhU9ATh+i+gHWvVcljFTcTWQ/dyHJCTrzCixde+r/asL2ErDA==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/babel__core": { "version": "7.20.5", "resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.20.5.tgz", @@ -1797,6 +1810,19 @@ "node": ">=0.4.0" } }, + "node_modules/amqplib": { + "version": "0.10.9", + "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.10.9.tgz", + "integrity": "sha512-jwSftI4QjS3mizvnSnOrPGYiUnm1vI2OP1iXeOUz5pb74Ua0nbf6nPyyTzuiCLEE3fMpaJORXh2K/TQ08H5xGA==", + "license": "MIT", + "dependencies": { + "buffer-more-ints": "~1.0.0", + "url-parse": "~1.5.10" + }, + "engines": { + "node": ">=10" + } + }, "node_modules/ansi-escapes": { "version": "4.3.2", "resolved": "https://registry.npmjs.org/ansi-escapes/-/ansi-escapes-4.3.2.tgz", @@ -1880,9 +1906,9 @@ "license": "MIT" }, "node_modules/axios": { - "version": "1.11.0", - "resolved": "https://registry.npmjs.org/axios/-/axios-1.11.0.tgz", - "integrity": "sha512-1Lx3WLFQWm3ooKDYZD1eXmoGO9fxYQjrycfHFC8P0sCfQVXyROp0p9PFWBehewBOdCwHc+f/b8I0fMto5eSfwA==", + "version": "1.12.2", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.12.2.tgz", + "integrity": "sha512-vMJzPewAlRyOgxV2dU0Cuz2O8zzzx9VYtbJOaBgXFeLc4IV/Eg50n4LowmehOOR61S8ZMpc2K5Sa7g6A4jfkUw==", "license": "MIT", "dependencies": { "follow-redirects": "^1.15.6", @@ -2115,6 +2141,12 @@ "dev": true, "license": "MIT" }, + "node_modules/buffer-more-ints": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz", + "integrity": "sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==", + "license": "MIT" + }, "node_modules/bytes": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.2.tgz", @@ -2575,6 +2607,18 @@ "node": ">=0.3.1" } }, + "node_modules/dotenv": { + "version": "17.2.2", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-17.2.2.tgz", + "integrity": "sha512-Sf2LSQP+bOlhKWWyhFsn0UsfdK/kCWRv1iuA2gXAwt3dyNabr6QSj00I2V10pidqz69soatm9ZwZvpQMTIOd5Q==", + "license": "BSD-2-Clause", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://dotenvx.com" + } + }, "node_modules/dunder-proto": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/dunder-proto/-/dunder-proto-1.0.1.tgz", @@ -4902,6 +4946,12 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==", + "license": "MIT" + }, "node_modules/range-parser": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/range-parser/-/range-parser-1.2.1.tgz", @@ -4962,6 +5012,12 @@ "node": ">=0.10.0" } }, + "node_modules/requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==", + "license": "MIT" + }, "node_modules/resolve-cwd": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/resolve-cwd/-/resolve-cwd-3.0.0.tgz", @@ -5827,6 +5883,16 @@ "browserslist": ">= 4.21.0" } }, + "node_modules/url-parse": { + "version": "1.5.10", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", + "integrity": "sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==", + "license": "MIT", + "dependencies": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + }, "node_modules/v8-compile-cache-lib": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", diff --git a/package.json b/package.json index dee0904..fefa1b7 100644 --- a/package.json +++ b/package.json @@ -22,13 +22,16 @@ }, "homepage": "https://github.com/lukchm94/NodeGateway#readme", "dependencies": { + "amqplib": "^0.10.9", "axios": "^1.11.0", "body-parser": "^2.2.0", + "dotenv": "^17.2.2", "express": "^5.1.0", "joi": "^17.13.3", "reflect-metadata": "^0.2.2" }, "devDependencies": { + "@types/amqplib": "^0.10.7", "@types/body-parser": "^1.19.6", "@types/express": "^5.0.3", "@types/jest": "^30.0.0", @@ -44,7 +47,11 @@ "jest": { "preset": "ts-jest", "testEnvironment": "node", - "roots": ["/tests"], - "setupFiles": ["/tests/jest.setup.ts"] + "roots": [ + "/tests" + ], + "setupFiles": [ + "/tests/jest.setup.ts" + ] } } diff --git a/src/modules/transaction/interfaces/transaction.controller.ts b/src/modules/transaction/interfaces/transaction.controller.ts index 0f8c3ee..c3cfc8e 100644 --- a/src/modules/transaction/interfaces/transaction.controller.ts +++ b/src/modules/transaction/interfaces/transaction.controller.ts @@ -2,6 +2,7 @@ import { HttpStatusCode } from "axios"; import { NextFunction, Response } from "express"; import Joi from "joi"; import { inject, injectable } from "tsyringe"; +import { RabbitClient } from "../../../shared/clients/rabbitMQ/rabbit.client"; import { RegisteredServicesEnum } from "../../../shared/DIcontainer/registeredServicesEnum"; import { HttpMethodEnum } from "../../../shared/types/http-methods"; import { ValidationError } from "../../../shared/utils/error"; @@ -19,7 +20,9 @@ export class TransactionController extends BaseClass { @inject(RegisteredServicesEnum.APP_LOGGER) protected readonly appLogger: Logger, @inject(RegisteredServicesEnum.PROCESS_TRANSACTION_USE_CASE) - private readonly processTransactionUseCase: ProcessTransactionUseCase + private readonly processTransactionUseCase: ProcessTransactionUseCase, + @inject(RegisteredServicesEnum.RABBIT_CLIENT) + private readonly rabbitClient: RabbitClient ) { super(appLogger); } @@ -133,4 +136,31 @@ export class TransactionController extends BaseClass { next(error); } }; + + public postToQueue = async ( + req: Request, + resp: Response, + next: NextFunction + ): Promise => { + try { + const msg = `${this.logPrefix} Testing transaction - ${HttpMethodEnum.POST} - ${req.url}`; + this.appLogger.info(msg); + await this.rabbitClient.connect(); + this.appLogger.info(`${this.logPrefix} RabbitMQ client connected.`); + this.rabbitClient.sendToQueue(msg); + + this.appLogger.info(`${this.logPrefix} Message sent to RabbitMQ: ${msg}`); + resp.status(HttpStatusCode.Ok).send({ status: msg }); + } catch (error) { + this.appLogger.error( + `${this.logPrefix} Error processing send to Rabbit request: ${ + error instanceof Error ? error.message : String(error) + }` + ); + if (error instanceof Error && error.stack) { + this.appLogger.error(error.stack); + } + next(error); + } + }; } diff --git a/src/modules/transaction/interfaces/transaction.router.ts b/src/modules/transaction/interfaces/transaction.router.ts index 3d0b2d3..e4d40cf 100644 --- a/src/modules/transaction/interfaces/transaction.router.ts +++ b/src/modules/transaction/interfaces/transaction.router.ts @@ -2,10 +2,10 @@ import { Router } from "express"; import { inject, injectable } from "tsyringe"; import { RegisteredServicesEnum } from "../../../shared/DIcontainer/registeredServicesEnum"; import { BaseRouter } from "../../../shared/routers/baseRouter"; +import { catchAsync } from "../../../shared/routers/catchAsync"; import { RoutesEnum } from "../../../shared/routers/routes.enum"; import { Logger } from "../../../shared/utils/logger"; import { TransactionController } from "./transaction.controller"; - @injectable() export class TransactionRouter extends BaseRouter { constructor( @@ -27,6 +27,10 @@ export class TransactionRouter extends BaseRouter { this.transactionController.validateTransaction, this.transactionController.processTransaction ); + router + .route(RoutesEnum.QUEUE) + .post(catchAsync(this.transactionController.postToQueue)); + return router; } } diff --git a/src/server.ts b/src/server.ts index 60ad7ca..0fdeb92 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,3 +1,4 @@ +import "dotenv/config"; import { Router } from "express"; import "reflect-metadata"; import { DIContainer } from "./shared/DIcontainer/container"; diff --git a/src/shared/DIcontainer/container.ts b/src/shared/DIcontainer/container.ts index 5f7b01a..ebf90f6 100644 --- a/src/shared/DIcontainer/container.ts +++ b/src/shared/DIcontainer/container.ts @@ -6,10 +6,12 @@ import { TransactionService } from "../../modules/transaction/domain/services/tr import { WebhookService } from "../../modules/transaction/domain/services/webhook.service"; import { TransactionController } from "../../modules/transaction/interfaces/transaction.controller"; import { TransactionRouter } from "../../modules/transaction/interfaces/transaction.router"; +import { RabbitClient } from "../clients/rabbitMQ/rabbit.client"; import { WebhookClient } from "../clients/webhook/webhook.client"; import { RouterService } from "../routers/router.service"; import { Logger } from "../utils/logger"; import { RegisteredServicesEnum } from "./registeredServicesEnum"; + // Utilities container.register(RegisteredServicesEnum.APP_LOGGER, { useClass: Logger, @@ -53,4 +55,10 @@ container.register( RegisteredServicesEnum.TRANSACTION_ROUTER, { useClass: TransactionRouter } ); + +// RabbitMQ client +container.register(RegisteredServicesEnum.RABBIT_CLIENT, { + useClass: RabbitClient, +}); + export const DIContainer = container; diff --git a/src/shared/DIcontainer/registeredServicesEnum.ts b/src/shared/DIcontainer/registeredServicesEnum.ts index af91f7c..4d6d405 100644 --- a/src/shared/DIcontainer/registeredServicesEnum.ts +++ b/src/shared/DIcontainer/registeredServicesEnum.ts @@ -18,4 +18,7 @@ export enum RegisteredServicesEnum { TRANSACTION_CONTROLLER = "TransactionController", TRANSACTION_SERVICE = "TransactionService", PROCESS_TRANSACTION_USE_CASE = "ProcessTransactionUseCase", + + // RabbitMQ client + RABBIT_CLIENT = "RabbitClient", } diff --git a/src/shared/clients/rabbitMQ/rabbit.client.ts b/src/shared/clients/rabbitMQ/rabbit.client.ts new file mode 100644 index 0000000..b142ee9 --- /dev/null +++ b/src/shared/clients/rabbitMQ/rabbit.client.ts @@ -0,0 +1,110 @@ +import * as amqp from "amqplib"; +import { Channel, ChannelModel } from "amqplib"; +import { inject, injectable } from "tsyringe"; +import { RegisteredServicesEnum } from "../../DIcontainer/registeredServicesEnum"; +import { BaseClass } from "../../utils/log-prefix.class"; +import { Logger } from "../../utils/logger"; + +@injectable() +export class RabbitClient extends BaseClass { + // CORRECTED: The connection property should be of type Connection + private connection: ChannelModel | null = null; + private channel: Channel | null = null; + private readonly rabbitUrl: string | null = null; + private readonly queueName: string | null = null; + + constructor( + @inject(RegisteredServicesEnum.APP_LOGGER) + protected readonly appLogger: Logger + ) { + super(appLogger); + this.rabbitUrl = this.setRabbitUrl(); + this.queueName = this.setQueueName(); + this.appLogger.info( + `${this.logPrefix} RabbitClient initialized with URL: ${this.rabbitUrl} and Queue: ${this.queueName}` + ); + } + + public async connect(): Promise { + try { + if (this.rabbitUrl === null || this.queueName === null) { + throw new Error("RabbitMQ URL or Queue Name is not set"); + } + this.connection = await amqp.connect(this.rabbitUrl); + if (!this.connection) { + throw new Error("Failed to create RabbitMQ connection"); + } + this.appLogger.info( + `${this.logPrefix} RabbitMQ connection established: ${this.connection}` + ); + this.channel = await this.connection.createChannel(); + this.appLogger.info( + `${this.logPrefix} RabbitMQ connected with the channel: ${this.channel}.` + ); + + if (!this.channel) { + throw new Error("Failed to create RabbitMQ channel"); + } + + await this.channel.assertQueue(this.queueName, { durable: true }); + + this.connection.on("error", (err: Error) => { + this.appLogger.error( + `${this.logPrefix} RabbitMQ connection error: ${err}` + ); + }); + + this.channel.on("error", (err: Error) => { + this.appLogger.error( + `${this.logPrefix} RabbitMQ channel error: ${err}` + ); + }); + } catch (error) { + this.appLogger.error( + `${this.logPrefix} Failed to connect to RabbitMQ: ${error}` + ); + + setTimeout(() => this.connect(), 5000); + } + } + + // The method signature and body here are correct. + public sendToQueue(message: string): void { + if (!this.channel || !this.queueName) { + throw new Error("RabbitMQ channel is not initialized"); + } + this.appLogger.info( + `${this.logPrefix} Sending message to queue: ${ + this.channel.bindQueue.name + }, ${this.queueName}, ${JSON.stringify( + this.channel.connection.serverProperties + )}` + ); + this.channel.sendToQueue(this.queueName, Buffer.from(message), { + persistent: true, + }); + this.appLogger.info( + `${this.logPrefix} Message: ${message} sent to queue: ${this.queueName}` + ); + } + + private setRabbitUrl(): string { + const url = process.env.RABBIT_URL; + if (!url) { + const errorMsg = `${this.logPrefix} RABBIT_URL is not set in environment variables`; + this.appLogger.error(errorMsg); + throw new Error(errorMsg); + } + return url; + } + + private setQueueName(): string { + const queueName = process.env.RABBIT_QUEUE_NAME; + if (!queueName) { + const errorMsg = `${this.logPrefix} RABBIT_QUEUE_NAME is not set in environment variables`; + this.appLogger.error(errorMsg); + throw new Error(errorMsg); + } + return queueName; + } +} diff --git a/src/shared/clients/webhook/webhook.client.ts b/src/shared/clients/webhook/webhook.client.ts index c10cda3..42d92aa 100644 --- a/src/shared/clients/webhook/webhook.client.ts +++ b/src/shared/clients/webhook/webhook.client.ts @@ -26,7 +26,6 @@ export class WebhookClient extends BaseClass { `${this.logPrefix} Received response with status: ${resp.status}` ); - // Log the response body for debugging this.appLogger.info( `${this.logPrefix} Response body: ${JSON.stringify(resp.data)}` ); @@ -37,7 +36,6 @@ export class WebhookClient extends BaseClass { `${this.logPrefix} Error sending webhook request: ${error.message}` ); - // If it's an axios error with response, return the response if (error.response) { this.appLogger.info( `${this.logPrefix} Received error response with status: ${error.response.status}` @@ -50,7 +48,6 @@ export class WebhookClient extends BaseClass { return error.response; } - // If it's a network error or other issue, re-throw throw error; } } diff --git a/src/shared/routers/catchAsync.ts b/src/shared/routers/catchAsync.ts new file mode 100644 index 0000000..4293049 --- /dev/null +++ b/src/shared/routers/catchAsync.ts @@ -0,0 +1,7 @@ +import { NextFunction, Request, Response } from "express"; + +export const catchAsync = (fn: Function) => { + return (req: Request, res: Response, next: NextFunction) => { + fn(req, res, next).catch(next); + }; +}; diff --git a/src/shared/routers/routes.enum.ts b/src/shared/routers/routes.enum.ts index 8b9c35f..5d0f0d6 100644 --- a/src/shared/routers/routes.enum.ts +++ b/src/shared/routers/routes.enum.ts @@ -3,4 +3,5 @@ export enum RoutesEnum { API = "/api", HEALTH_CHECK = `${RoutesEnum.API}/health`, TRANSACTION = `${RoutesEnum.API}/transaction`, + QUEUE = "/queue", } diff --git a/src/shared/server/app.ts b/src/shared/server/app.ts index e87f0d1..684f3fb 100644 --- a/src/shared/server/app.ts +++ b/src/shared/server/app.ts @@ -10,6 +10,7 @@ import express, { import { RoutesEnum } from "../routers/routes.enum"; import { ValidationError } from "../utils/error"; import { Logger } from "../utils/logger"; + export class App { private get logPrefix(): string { return `[${this.constructor.name}]`; From e01ccfced9394ccbfe0c65b9431cec23d7ab8344 Mon Sep 17 00:00:00 2001 From: Lukasz Chmielewski Date: Mon, 15 Sep 2025 18:31:32 +0200 Subject: [PATCH 2/3] Added the subscriber --- .../process-transaction.use-case.ts | 3 + .../process-trx-event.use-case.ts | 34 +++++ .../interfaces/transaction.controller.ts | 30 +++-- .../interfaces/transaction.router.ts | 8 +- src/server.ts | 7 +- src/shared/DIcontainer/container.ts | 14 +- .../DIcontainer/registeredServicesEnum.ts | 2 + src/shared/clients/rabbitMQ/output.ts | 6 + src/shared/clients/rabbitMQ/rabbit.client.ts | 122 ++++++++++++++---- src/shared/clients/rabbitMQ/rabbit.service.ts | 44 +++++++ 10 files changed, 226 insertions(+), 44 deletions(-) create mode 100644 src/modules/transaction/application/process-trx-event.use-case/process-trx-event.use-case.ts create mode 100644 src/shared/clients/rabbitMQ/output.ts create mode 100644 src/shared/clients/rabbitMQ/rabbit.service.ts diff --git a/src/modules/transaction/application/process-transaction-use-case/process-transaction.use-case.ts b/src/modules/transaction/application/process-transaction-use-case/process-transaction.use-case.ts index 30ea435..d8ce69a 100644 --- a/src/modules/transaction/application/process-transaction-use-case/process-transaction.use-case.ts +++ b/src/modules/transaction/application/process-transaction-use-case/process-transaction.use-case.ts @@ -18,6 +18,9 @@ export class ProcessTransactionUseCase extends BaseClass { private readonly transactionService: TransactionService ) { super(appLogger); + this.appLogger.info( + `${this.logPrefix} ${RegisteredServicesEnum.PROCESS_TRANSACTION_USE_CASE} initialized` + ); } public async run( diff --git a/src/modules/transaction/application/process-trx-event.use-case/process-trx-event.use-case.ts b/src/modules/transaction/application/process-trx-event.use-case/process-trx-event.use-case.ts new file mode 100644 index 0000000..8d68c62 --- /dev/null +++ b/src/modules/transaction/application/process-trx-event.use-case/process-trx-event.use-case.ts @@ -0,0 +1,34 @@ +import { inject, injectable } from "tsyringe"; +import { RabbitClient } from "../../../../shared/clients/rabbitMQ/rabbit.client"; +import { RegisteredServicesEnum } from "../../../../shared/DIcontainer/registeredServicesEnum"; +import { BaseClass } from "../../../../shared/utils/log-prefix.class"; +import { Logger } from "../../../../shared/utils/logger"; +import { TransactionService } from "../../domain/services/transaction.service"; +import { Transaction } from "../../domain/transaction.entity"; +import { TransactionInput } from "../input"; + +@injectable() +export class ProcessTrxEventUseCase extends BaseClass { + constructor( + @inject(RegisteredServicesEnum.APP_LOGGER) + protected readonly appLogger: Logger, + @inject(RegisteredServicesEnum.RABBIT_CLIENT) + private readonly rabbitClient: RabbitClient, + @inject(RegisteredServicesEnum.TRANSACTION_SERVICE) + private readonly transactionService: TransactionService + ) { + super(appLogger); + this.appLogger.info( + `${this.logPrefix} ${RegisteredServicesEnum.PROCESS_TRX_EVENT_USE_CASE} initialized` + ); + } + public async run(event: TransactionInput): Promise { + this.appLogger.info(`${this.logPrefix} RabbitMQ client connected.`); + this.appLogger.info( + `${this.logPrefix} Processing transaction event: ${JSON.stringify(event)}` + ); + const transaction = this.transactionService.processTransaction(event); + this.rabbitClient.sendToQueue(transaction); + return transaction; + } +} diff --git a/src/modules/transaction/interfaces/transaction.controller.ts b/src/modules/transaction/interfaces/transaction.controller.ts index c3cfc8e..b6a293a 100644 --- a/src/modules/transaction/interfaces/transaction.controller.ts +++ b/src/modules/transaction/interfaces/transaction.controller.ts @@ -2,7 +2,6 @@ import { HttpStatusCode } from "axios"; import { NextFunction, Response } from "express"; import Joi from "joi"; import { inject, injectable } from "tsyringe"; -import { RabbitClient } from "../../../shared/clients/rabbitMQ/rabbit.client"; import { RegisteredServicesEnum } from "../../../shared/DIcontainer/registeredServicesEnum"; import { HttpMethodEnum } from "../../../shared/types/http-methods"; import { ValidationError } from "../../../shared/utils/error"; @@ -10,10 +9,10 @@ import { BaseClass } from "../../../shared/utils/log-prefix.class"; import { Logger } from "../../../shared/utils/logger"; import { GatewayOutput } from "../application/output"; import { ProcessTransactionUseCase } from "../application/process-transaction-use-case/process-transaction.use-case"; +import { ProcessTrxEventUseCase } from "../application/process-trx-event.use-case/process-trx-event.use-case"; import { CURRENCY_TYPE } from "../domain/validation/currency"; import { TRANSACTION_STATUS_TYPE } from "../domain/validation/status"; import { RequestWithSafeFields } from "./request.interface"; - @injectable() export class TransactionController extends BaseClass { constructor( @@ -21,8 +20,8 @@ export class TransactionController extends BaseClass { protected readonly appLogger: Logger, @inject(RegisteredServicesEnum.PROCESS_TRANSACTION_USE_CASE) private readonly processTransactionUseCase: ProcessTransactionUseCase, - @inject(RegisteredServicesEnum.RABBIT_CLIENT) - private readonly rabbitClient: RabbitClient + @inject(RegisteredServicesEnum.PROCESS_TRX_EVENT_USE_CASE) + private readonly processTrxEventUseCase: ProcessTrxEventUseCase ) { super(appLogger); } @@ -137,20 +136,27 @@ export class TransactionController extends BaseClass { } }; + // TODO remove the route after the holistic migration to RabbitMQ public postToQueue = async ( - req: Request, + req: RequestWithSafeFields, resp: Response, next: NextFunction ): Promise => { try { - const msg = `${this.logPrefix} Testing transaction - ${HttpMethodEnum.POST} - ${req.url}`; + const input = req.safeFields!; + const msg = `${this.logPrefix} Processing transaction - ${input}`; this.appLogger.info(msg); - await this.rabbitClient.connect(); - this.appLogger.info(`${this.logPrefix} RabbitMQ client connected.`); - this.rabbitClient.sendToQueue(msg); - - this.appLogger.info(`${this.logPrefix} Message sent to RabbitMQ: ${msg}`); - resp.status(HttpStatusCode.Ok).send({ status: msg }); + const transaction = await this.processTrxEventUseCase.run(input); + this.appLogger.info( + `${ + this.logPrefix + } ProcessTrxEventUseCase executed successfully for input: ${JSON.stringify( + transaction + )}` + ); + resp + .status(HttpStatusCode.Ok) + .send({ transaction: JSON.stringify(transaction) }); } catch (error) { this.appLogger.error( `${this.logPrefix} Error processing send to Rabbit request: ${ diff --git a/src/modules/transaction/interfaces/transaction.router.ts b/src/modules/transaction/interfaces/transaction.router.ts index e4d40cf..0446cbe 100644 --- a/src/modules/transaction/interfaces/transaction.router.ts +++ b/src/modules/transaction/interfaces/transaction.router.ts @@ -2,10 +2,10 @@ import { Router } from "express"; import { inject, injectable } from "tsyringe"; import { RegisteredServicesEnum } from "../../../shared/DIcontainer/registeredServicesEnum"; import { BaseRouter } from "../../../shared/routers/baseRouter"; -import { catchAsync } from "../../../shared/routers/catchAsync"; import { RoutesEnum } from "../../../shared/routers/routes.enum"; import { Logger } from "../../../shared/utils/logger"; import { TransactionController } from "./transaction.controller"; + @injectable() export class TransactionRouter extends BaseRouter { constructor( @@ -27,9 +27,13 @@ export class TransactionRouter extends BaseRouter { this.transactionController.validateTransaction, this.transactionController.processTransaction ); + // TODO remove the route after the holistic migration to RabbitMQ router .route(RoutesEnum.QUEUE) - .post(catchAsync(this.transactionController.postToQueue)); + .post( + this.transactionController.validateTransaction, + this.transactionController.postToQueue + ); return router; } diff --git a/src/server.ts b/src/server.ts index 0fdeb92..8887941 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,13 +1,13 @@ import "dotenv/config"; import { Router } from "express"; import "reflect-metadata"; +import { RabbitService } from "./shared/clients/rabbitMQ/rabbit.service"; import { DIContainer } from "./shared/DIcontainer/container"; import { RegisteredServicesEnum } from "./shared/DIcontainer/registeredServicesEnum"; import { RouterService } from "./shared/routers/router.service"; import { RoutesEnum } from "./shared/routers/routes.enum"; import { App } from "./shared/server/app"; import { Logger } from "./shared/utils/logger"; - /** * The `bootstrap` function initializes a server application with defined routes and logging, starting * the server on a specified port. @@ -20,6 +20,11 @@ async function bootstrap() { const routerService = DIContainer.resolve( RegisteredServicesEnum.ROUTER_SERVICE ); + const rabbitService = DIContainer.resolve( + RegisteredServicesEnum.RABBIT_SERVICE + ); + await rabbitService.start(); + appLogger.info(`[Server] RabbitMQ service is active ✅`); const routes: Array<{ prefix: RoutesEnum; router: Router }> = routerService.setupRouters(); diff --git a/src/shared/DIcontainer/container.ts b/src/shared/DIcontainer/container.ts index ebf90f6..2a9aaf1 100644 --- a/src/shared/DIcontainer/container.ts +++ b/src/shared/DIcontainer/container.ts @@ -2,11 +2,13 @@ import { container } from "tsyringe"; import { HealthController } from "../../modules/health/interfaces/health.controller"; import { HealthRouter } from "../../modules/health/interfaces/health.router"; import { ProcessTransactionUseCase } from "../../modules/transaction/application/process-transaction-use-case/process-transaction.use-case"; +import { ProcessTrxEventUseCase } from "../../modules/transaction/application/process-trx-event.use-case/process-trx-event.use-case"; import { TransactionService } from "../../modules/transaction/domain/services/transaction.service"; import { WebhookService } from "../../modules/transaction/domain/services/webhook.service"; import { TransactionController } from "../../modules/transaction/interfaces/transaction.controller"; import { TransactionRouter } from "../../modules/transaction/interfaces/transaction.router"; import { RabbitClient } from "../clients/rabbitMQ/rabbit.client"; +import { RabbitService } from "../clients/rabbitMQ/rabbit.service"; import { WebhookClient } from "../clients/webhook/webhook.client"; import { RouterService } from "../routers/router.service"; import { Logger } from "../utils/logger"; @@ -47,6 +49,12 @@ container.register( useClass: ProcessTransactionUseCase, } ); +container.register( + RegisteredServicesEnum.PROCESS_TRX_EVENT_USE_CASE, + { + useClass: ProcessTrxEventUseCase, + } +); container.register( RegisteredServicesEnum.TRANSACTION_CONTROLLER, { useClass: TransactionController } @@ -56,9 +64,11 @@ container.register( { useClass: TransactionRouter } ); -// RabbitMQ client +// RabbitMQ container.register(RegisteredServicesEnum.RABBIT_CLIENT, { useClass: RabbitClient, }); - +container.register(RegisteredServicesEnum.RABBIT_SERVICE, { + useClass: RabbitService, +}); export const DIContainer = container; diff --git a/src/shared/DIcontainer/registeredServicesEnum.ts b/src/shared/DIcontainer/registeredServicesEnum.ts index 4d6d405..3bcdb58 100644 --- a/src/shared/DIcontainer/registeredServicesEnum.ts +++ b/src/shared/DIcontainer/registeredServicesEnum.ts @@ -18,7 +18,9 @@ export enum RegisteredServicesEnum { TRANSACTION_CONTROLLER = "TransactionController", TRANSACTION_SERVICE = "TransactionService", PROCESS_TRANSACTION_USE_CASE = "ProcessTransactionUseCase", + PROCESS_TRX_EVENT_USE_CASE = "ProcessTrxEventUseCase", // RabbitMQ client RABBIT_CLIENT = "RabbitClient", + RABBIT_SERVICE = "RabbitService", } diff --git a/src/shared/clients/rabbitMQ/output.ts b/src/shared/clients/rabbitMQ/output.ts new file mode 100644 index 0000000..b708acd --- /dev/null +++ b/src/shared/clients/rabbitMQ/output.ts @@ -0,0 +1,6 @@ +import { Transaction } from "../../../modules/transaction/domain/transaction.entity"; + +export interface TransactionResponsePayload { + pattern: string; + data: Transaction; +} diff --git a/src/shared/clients/rabbitMQ/rabbit.client.ts b/src/shared/clients/rabbitMQ/rabbit.client.ts index b142ee9..7174918 100644 --- a/src/shared/clients/rabbitMQ/rabbit.client.ts +++ b/src/shared/clients/rabbitMQ/rabbit.client.ts @@ -1,17 +1,19 @@ import * as amqp from "amqplib"; import { Channel, ChannelModel } from "amqplib"; import { inject, injectable } from "tsyringe"; +import { Transaction } from "../../../modules/transaction/domain/transaction.entity"; import { RegisteredServicesEnum } from "../../DIcontainer/registeredServicesEnum"; import { BaseClass } from "../../utils/log-prefix.class"; import { Logger } from "../../utils/logger"; +import { TransactionResponsePayload } from "./output"; @injectable() export class RabbitClient extends BaseClass { - // CORRECTED: The connection property should be of type Connection private connection: ChannelModel | null = null; private channel: Channel | null = null; private readonly rabbitUrl: string | null = null; private readonly queueName: string | null = null; + private readonly requestQueueName: string | null = null; constructor( @inject(RegisteredServicesEnum.APP_LOGGER) @@ -20,6 +22,7 @@ export class RabbitClient extends BaseClass { super(appLogger); this.rabbitUrl = this.setRabbitUrl(); this.queueName = this.setQueueName(); + this.requestQueueName = this.setRequestQueueName(); this.appLogger.info( `${this.logPrefix} RabbitClient initialized with URL: ${this.rabbitUrl} and Queue: ${this.queueName}` ); @@ -27,59 +30,45 @@ export class RabbitClient extends BaseClass { public async connect(): Promise { try { - if (this.rabbitUrl === null || this.queueName === null) { + if ( + this.rabbitUrl === null || + this.queueName === null || + this.requestQueueName === null + ) { throw new Error("RabbitMQ URL or Queue Name is not set"); } this.connection = await amqp.connect(this.rabbitUrl); if (!this.connection) { throw new Error("Failed to create RabbitMQ connection"); } - this.appLogger.info( - `${this.logPrefix} RabbitMQ connection established: ${this.connection}` - ); + this.channel = await this.connection.createChannel(); - this.appLogger.info( - `${this.logPrefix} RabbitMQ connected with the channel: ${this.channel}.` - ); if (!this.channel) { throw new Error("Failed to create RabbitMQ channel"); } await this.channel.assertQueue(this.queueName, { durable: true }); + await this.channel.assertQueue(this.requestQueueName, { durable: true }); - this.connection.on("error", (err: Error) => { - this.appLogger.error( - `${this.logPrefix} RabbitMQ connection error: ${err}` - ); - }); - - this.channel.on("error", (err: Error) => { - this.appLogger.error( - `${this.logPrefix} RabbitMQ channel error: ${err}` - ); - }); + this.setupErrorListeners(); } catch (error) { this.appLogger.error( `${this.logPrefix} Failed to connect to RabbitMQ: ${error}` ); - setTimeout(() => this.connect(), 5000); } } - // The method signature and body here are correct. - public sendToQueue(message: string): void { + public sendToQueue(transaction: Transaction): void { if (!this.channel || !this.queueName) { throw new Error("RabbitMQ channel is not initialized"); } this.appLogger.info( - `${this.logPrefix} Sending message to queue: ${ - this.channel.bindQueue.name - }, ${this.queueName}, ${JSON.stringify( - this.channel.connection.serverProperties - )}` + `${this.logPrefix} Sending message to queue: ${this.queueName}` ); + const payload = this.buildPayload(transaction); + const message = JSON.stringify(payload); this.channel.sendToQueue(this.queueName, Buffer.from(message), { persistent: true, }); @@ -88,6 +77,63 @@ export class RabbitClient extends BaseClass { ); } + public async consumeFromQueue(handler: (msg: any) => void): Promise { + try { + if (!this.channel || !this.requestQueueName) { + throw new Error("RabbitMQ channel is not initialized"); + } + this.appLogger.info( + `${this.logPrefix} Consuming messages from queue: ${this.requestQueueName}` + ); + this.channel.assertQueue(this.requestQueueName, { durable: true }); + + this.channel.consume( + this.requestQueueName, + (msg) => { + if (msg !== null) { + const content = msg.content.toString(); + this.appLogger.info( + `${this.logPrefix} Message received from queue: ${this.requestQueueName} - ${content}` + ); + const payload: TransactionResponsePayload = JSON.parse(content); + this.channel!.ack(msg); + handler(payload); + } + }, + { noAck: false } + ); + } catch (error) { + this.appLogger.error( + `${this.logPrefix} Failed to consume from RabbitMQ: ${error}` + ); + setTimeout(() => this.consumeFromQueue(handler), 5000); + } + } + + private buildPayload(transaction: Transaction): TransactionResponsePayload { + const payload: TransactionResponsePayload = { + pattern: this.queueName!, + data: transaction, + }; + return payload; + } + private async closeConnection(): Promise { + try { + if (this.channel) { + await this.channel.close(); + this.appLogger.info(`${this.logPrefix} RabbitMQ channel closed.`); + } + if (this.connection) { + await this.connection.close(); + this.appLogger.info(`${this.logPrefix} RabbitMQ connection closed.`); + } + } catch (error) { + this.appLogger.error( + `${this.logPrefix} Error closing RabbitMQ connection: ${error}` + ); + } + } + private setRabbitUrl(): string { const url = process.env.RABBIT_URL; if (!url) { @@ -107,4 +153,26 @@ export class RabbitClient extends BaseClass { } return queueName; } + private setRequestQueueName(): string { + const queueName = process.env.RABBIT_REQ_QUEUE_NAME; + if (!queueName) { + const errorMsg = `${this.logPrefix} RABBIT_REQ_QUEUE_NAME is not set in environment variables`; + this.appLogger.error(errorMsg); + throw new Error(errorMsg); + } + return queueName; + } + private setupErrorListeners(): void { + this.connection!.on("error", (err: Error) => { + this.appLogger.error( + `${this.logPrefix} RabbitMQ connection error: ${err}` + ); + this.closeConnection(); + }); + + this.channel!.on("error", (err: Error) => { + this.appLogger.error(`${this.logPrefix} RabbitMQ channel error: ${err}`); + this.closeConnection(); + }); + } } diff --git a/src/shared/clients/rabbitMQ/rabbit.service.ts b/src/shared/clients/rabbitMQ/rabbit.service.ts new file mode 100644 index 0000000..104ef07 --- /dev/null +++ b/src/shared/clients/rabbitMQ/rabbit.service.ts @@ -0,0 +1,44 @@ +import { inject, injectable } from "tsyringe"; +import { RegisteredServicesEnum } from "../../DIcontainer/registeredServicesEnum"; +import { BaseClass } from "../../utils/log-prefix.class"; +import { Logger } from "../../utils/logger"; +import { RabbitClient } from "./rabbit.client"; + +@injectable() +export class RabbitService extends BaseClass { + constructor( + @inject(RegisteredServicesEnum.APP_LOGGER) + protected readonly appLogger: Logger, + @inject(RegisteredServicesEnum.RABBIT_CLIENT) + private readonly rabbitClient: RabbitClient + ) { + super(appLogger); + this.appLogger.info( + `${this.logPrefix} ${RegisteredServicesEnum.RABBIT_CLIENT} initialized.` + ); + } + + /** + * The `start` function in TypeScript asynchronously connects to a RabbitMQ client, logs a message + * upon successful connection, and then consumes messages from a queue using a specified handler + * function. + */ + public async start(): Promise { + try { + await this.rabbitClient.connect(); + this.appLogger.info(`${this.logPrefix} RabbitMQ client connected.`); + await this.rabbitClient.consumeFromQueue(this.handleTrxEvent); + } catch (error) { + this.appLogger.error( + `${this.logPrefix} Error starting RabbitMQ service: ${error}` + ); + } + } + + private handleTrxEvent = (payload: any): void => { + // TODO provide the handler with the actual Use Case to process the event + this.appLogger.info( + `${this.logPrefix} Received transaction event. Printing payload: ${payload}.` + ); + }; +} From 2c3221f6dbc084244989916cc456a0f5529eb345 Mon Sep 17 00:00:00 2001 From: Lukasz Chmielewski Date: Tue, 16 Sep 2025 17:34:43 +0200 Subject: [PATCH 3/3] fixed rabbit --- .github/workflows/ci-with-jest-coverage.yml | 2 +- .vscode/settings.json | 1 + .../process-trx-event.use-case.ts | 14 +++--- .../interfaces/transaction.controller.ts | 35 +------------ .../interfaces/transaction.router.ts | 8 --- src/shared/clients/rabbitMQ/rabbit.client.ts | 49 ++++++++++++++----- src/shared/clients/rabbitMQ/rabbit.service.ts | 34 ++++++++++--- .../rabbitMQ/transaction.request.event.ts | 6 +++ 8 files changed, 79 insertions(+), 70 deletions(-) create mode 100644 src/shared/clients/rabbitMQ/transaction.request.event.ts diff --git a/.github/workflows/ci-with-jest-coverage.yml b/.github/workflows/ci-with-jest-coverage.yml index 6bac812..7564ee5 100644 --- a/.github/workflows/ci-with-jest-coverage.yml +++ b/.github/workflows/ci-with-jest-coverage.yml @@ -46,4 +46,4 @@ jobs: with: # Sets the minimum coverage threshold to 5% # You can adjust this value as your project grows - threshold: 60 + threshold: 30 diff --git a/.vscode/settings.json b/.vscode/settings.json index 81e2cd2..424ccb6 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -16,4 +16,5 @@ "cSpell.words": [ "autoincrement" ], + "postman.settings.dotenv-detection-notification-visibility": false, } \ No newline at end of file diff --git a/src/modules/transaction/application/process-trx-event.use-case/process-trx-event.use-case.ts b/src/modules/transaction/application/process-trx-event.use-case/process-trx-event.use-case.ts index 8d68c62..0e71028 100644 --- a/src/modules/transaction/application/process-trx-event.use-case/process-trx-event.use-case.ts +++ b/src/modules/transaction/application/process-trx-event.use-case/process-trx-event.use-case.ts @@ -1,11 +1,11 @@ import { inject, injectable } from "tsyringe"; import { RabbitClient } from "../../../../shared/clients/rabbitMQ/rabbit.client"; +import { TransactionRequestEvent } from "../../../../shared/clients/rabbitMQ/transaction.request.event"; import { RegisteredServicesEnum } from "../../../../shared/DIcontainer/registeredServicesEnum"; import { BaseClass } from "../../../../shared/utils/log-prefix.class"; import { Logger } from "../../../../shared/utils/logger"; import { TransactionService } from "../../domain/services/transaction.service"; import { Transaction } from "../../domain/transaction.entity"; -import { TransactionInput } from "../input"; @injectable() export class ProcessTrxEventUseCase extends BaseClass { @@ -22,13 +22,11 @@ export class ProcessTrxEventUseCase extends BaseClass { `${this.logPrefix} ${RegisteredServicesEnum.PROCESS_TRX_EVENT_USE_CASE} initialized` ); } - public async run(event: TransactionInput): Promise { - this.appLogger.info(`${this.logPrefix} RabbitMQ client connected.`); - this.appLogger.info( - `${this.logPrefix} Processing transaction event: ${JSON.stringify(event)}` - ); - const transaction = this.transactionService.processTransaction(event); - this.rabbitClient.sendToQueue(transaction); + public async run(event: TransactionRequestEvent): Promise { + const transactionInput = event.data.eventInput; + const transaction = + this.transactionService.processTransaction(transactionInput); + await this.rabbitClient.sendToQueue(transaction); return transaction; } } diff --git a/src/modules/transaction/interfaces/transaction.controller.ts b/src/modules/transaction/interfaces/transaction.controller.ts index b6a293a..8e7ae30 100644 --- a/src/modules/transaction/interfaces/transaction.controller.ts +++ b/src/modules/transaction/interfaces/transaction.controller.ts @@ -13,6 +13,7 @@ import { ProcessTrxEventUseCase } from "../application/process-trx-event.use-cas import { CURRENCY_TYPE } from "../domain/validation/currency"; import { TRANSACTION_STATUS_TYPE } from "../domain/validation/status"; import { RequestWithSafeFields } from "./request.interface"; + @injectable() export class TransactionController extends BaseClass { constructor( @@ -135,38 +136,4 @@ export class TransactionController extends BaseClass { next(error); } }; - - // TODO remove the route after the holistic migration to RabbitMQ - public postToQueue = async ( - req: RequestWithSafeFields, - resp: Response, - next: NextFunction - ): Promise => { - try { - const input = req.safeFields!; - const msg = `${this.logPrefix} Processing transaction - ${input}`; - this.appLogger.info(msg); - const transaction = await this.processTrxEventUseCase.run(input); - this.appLogger.info( - `${ - this.logPrefix - } ProcessTrxEventUseCase executed successfully for input: ${JSON.stringify( - transaction - )}` - ); - resp - .status(HttpStatusCode.Ok) - .send({ transaction: JSON.stringify(transaction) }); - } catch (error) { - this.appLogger.error( - `${this.logPrefix} Error processing send to Rabbit request: ${ - error instanceof Error ? error.message : String(error) - }` - ); - if (error instanceof Error && error.stack) { - this.appLogger.error(error.stack); - } - next(error); - } - }; } diff --git a/src/modules/transaction/interfaces/transaction.router.ts b/src/modules/transaction/interfaces/transaction.router.ts index 0446cbe..3d0b2d3 100644 --- a/src/modules/transaction/interfaces/transaction.router.ts +++ b/src/modules/transaction/interfaces/transaction.router.ts @@ -27,14 +27,6 @@ export class TransactionRouter extends BaseRouter { this.transactionController.validateTransaction, this.transactionController.processTransaction ); - // TODO remove the route after the holistic migration to RabbitMQ - router - .route(RoutesEnum.QUEUE) - .post( - this.transactionController.validateTransaction, - this.transactionController.postToQueue - ); - return router; } } diff --git a/src/shared/clients/rabbitMQ/rabbit.client.ts b/src/shared/clients/rabbitMQ/rabbit.client.ts index 7174918..6583d76 100644 --- a/src/shared/clients/rabbitMQ/rabbit.client.ts +++ b/src/shared/clients/rabbitMQ/rabbit.client.ts @@ -1,19 +1,21 @@ import * as amqp from "amqplib"; import { Channel, ChannelModel } from "amqplib"; -import { inject, injectable } from "tsyringe"; +import { inject, injectable, singleton } from "tsyringe"; import { Transaction } from "../../../modules/transaction/domain/transaction.entity"; import { RegisteredServicesEnum } from "../../DIcontainer/registeredServicesEnum"; import { BaseClass } from "../../utils/log-prefix.class"; import { Logger } from "../../utils/logger"; import { TransactionResponsePayload } from "./output"; +@singleton() @injectable() export class RabbitClient extends BaseClass { private connection: ChannelModel | null = null; private channel: Channel | null = null; private readonly rabbitUrl: string | null = null; - private readonly queueName: string | null = null; - private readonly requestQueueName: string | null = null; + public readonly queueName: string | null = null; + public readonly requestQueueName: string | null = null; + public isConnected = false; constructor( @inject(RegisteredServicesEnum.APP_LOGGER) @@ -28,7 +30,13 @@ export class RabbitClient extends BaseClass { ); } - public async connect(): Promise { + public async connect(): Promise { + if (this.isConnected) { + this.appLogger.info( + `${this.logPrefix} RabbitMQ client already connected.` + ); + return false; + } try { if ( this.rabbitUrl === null || @@ -37,13 +45,15 @@ export class RabbitClient extends BaseClass { ) { throw new Error("RabbitMQ URL or Queue Name is not set"); } + + // Await the connection this.connection = await amqp.connect(this.rabbitUrl); if (!this.connection) { throw new Error("Failed to create RabbitMQ connection"); } + // Await the channel creation this.channel = await this.connection.createChannel(); - if (!this.channel) { throw new Error("Failed to create RabbitMQ channel"); } @@ -51,16 +61,33 @@ export class RabbitClient extends BaseClass { await this.channel.assertQueue(this.queueName, { durable: true }); await this.channel.assertQueue(this.requestQueueName, { durable: true }); + this.isConnected = true; this.setupErrorListeners(); + + this.appLogger.info( + `${this.logPrefix} RabbitMQ client successfully connected and consuming ✅.` + ); + return true; } catch (error) { this.appLogger.error( `${this.logPrefix} Failed to connect to RabbitMQ: ${error}` ); setTimeout(() => this.connect(), 5000); + return false; } } - public sendToQueue(transaction: Transaction): void { + public async sendToQueue(transaction: Transaction): Promise { + if (!this.isConnected) { + this.appLogger.error( + `${this.logPrefix} RabbitMQ channel is not connected: ${this.isConnected}` + ); + const connected = await this.connect(); + if (connected) { + this.appLogger.info(`${connected}`); + } + this.isConnected = connected; + } if (!this.channel || !this.queueName) { throw new Error("RabbitMQ channel is not initialized"); } @@ -77,7 +104,7 @@ export class RabbitClient extends BaseClass { ); } - public async consumeFromQueue(handler: (msg: any) => void): Promise { + public consumeFromQueue(handler: (msg: any) => void): void { try { if (!this.channel || !this.requestQueueName) { throw new Error("RabbitMQ channel is not initialized"); @@ -92,12 +119,10 @@ export class RabbitClient extends BaseClass { (msg) => { if (msg !== null) { const content = msg.content.toString(); - this.appLogger.info( - `${this.logPrefix} Message received from queue: ${this.requestQueueName} - ${content}` - ); - const payload: TransactionResponsePayload = JSON.parse(content); - this.channel!.ack(msg); + const payload = JSON.parse(content); + handler(payload); + this.channel!.ack(msg); } }, { noAck: false } diff --git a/src/shared/clients/rabbitMQ/rabbit.service.ts b/src/shared/clients/rabbitMQ/rabbit.service.ts index 104ef07..fc2c5e9 100644 --- a/src/shared/clients/rabbitMQ/rabbit.service.ts +++ b/src/shared/clients/rabbitMQ/rabbit.service.ts @@ -1,8 +1,11 @@ import { inject, injectable } from "tsyringe"; +import { ProcessTrxEventUseCase } from "../../../modules/transaction/application/process-trx-event.use-case/process-trx-event.use-case"; import { RegisteredServicesEnum } from "../../DIcontainer/registeredServicesEnum"; +import { ValidationError } from "../../utils/error"; import { BaseClass } from "../../utils/log-prefix.class"; import { Logger } from "../../utils/logger"; import { RabbitClient } from "./rabbit.client"; +import { TransactionRequestEvent } from "./transaction.request.event"; @injectable() export class RabbitService extends BaseClass { @@ -10,7 +13,9 @@ export class RabbitService extends BaseClass { @inject(RegisteredServicesEnum.APP_LOGGER) protected readonly appLogger: Logger, @inject(RegisteredServicesEnum.RABBIT_CLIENT) - private readonly rabbitClient: RabbitClient + private readonly rabbitClient: RabbitClient, + @inject(RegisteredServicesEnum.PROCESS_TRX_EVENT_USE_CASE) + private readonly processTransactionEventUseCase: ProcessTrxEventUseCase ) { super(appLogger); this.appLogger.info( @@ -25,9 +30,10 @@ export class RabbitService extends BaseClass { */ public async start(): Promise { try { - await this.rabbitClient.connect(); - this.appLogger.info(`${this.logPrefix} RabbitMQ client connected.`); - await this.rabbitClient.consumeFromQueue(this.handleTrxEvent); + const connected = await this.rabbitClient.connect(); + if (connected) { + this.rabbitClient.consumeFromQueue(this.handleTrxEvent); + } } catch (error) { this.appLogger.error( `${this.logPrefix} Error starting RabbitMQ service: ${error}` @@ -35,10 +41,24 @@ export class RabbitService extends BaseClass { } } - private handleTrxEvent = (payload: any): void => { - // TODO provide the handler with the actual Use Case to process the event + private handleTrxEvent = async ( + event: TransactionRequestEvent + ): Promise => { this.appLogger.info( - `${this.logPrefix} Received transaction event. Printing payload: ${payload}.` + `${ + this.logPrefix + } Received transaction event. Printing event: ${JSON.stringify(event)}.` + ); + if (event.pattern !== this.rabbitClient.requestQueueName) { + const msg = `${this.logPrefix} Pattern in event: "${event.pattern}" is different than in the Rabbit Client: "${this.rabbitClient.requestQueueName}"`; + this.appLogger.error(msg); + const error = new Error(msg); + throw new ValidationError(error); + } + + const transaction = await this.processTransactionEventUseCase.run(event); + this.appLogger.info( + `${this.logPrefix} Successfully processed ${JSON.stringify(transaction)}` ); }; } diff --git a/src/shared/clients/rabbitMQ/transaction.request.event.ts b/src/shared/clients/rabbitMQ/transaction.request.event.ts new file mode 100644 index 0000000..28176b6 --- /dev/null +++ b/src/shared/clients/rabbitMQ/transaction.request.event.ts @@ -0,0 +1,6 @@ +import { TransactionInput } from "../../../modules/transaction/application/input"; + +export interface TransactionRequestEvent { + pattern: string; + data: { eventInput: TransactionInput }; +}