diff --git a/apps/transactions/src/adapters/controllers/transaction.controller.ts b/apps/transactions/src/adapters/controllers/transaction.controller.ts index aee2cb391..3aa71e8f8 100644 --- a/apps/transactions/src/adapters/controllers/transaction.controller.ts +++ b/apps/transactions/src/adapters/controllers/transaction.controller.ts @@ -4,6 +4,9 @@ import { TransactionRepository } from '../../domain/repositories/transaction.rep import { CreateTransactionUseCase } from '../../application/createTransaction.usecase'; import { CreateTransactionDTO } from '../../domain/dtos/createTransaction.dto'; import { validateCreateTransactionDto, validateUpdateTransactionBody } from '../validation/transaction.validator'; +import { sendTransactionMessage } from '../../infrastructure/messageBroker/kafkaProducer'; +import { mapTransactionToKafkaMessage } from '../../infrastructure/mappers/transaction.mapper'; +import { UpdatedData } from '../../domain/dtos/updatedTransaction.dto'; export class TransactionController { @@ -33,6 +36,9 @@ export class TransactionController { const createTransactionUseCase = new CreateTransactionUseCase(this.transactionRepository) const savedTransaction = await createTransactionUseCase.execute(createTransactionDto); + const message = mapTransactionToKafkaMessage(savedTransaction) + await sendTransactionMessage(message); + return res.status(200).json(savedTransaction); } catch (error) { console.error('Error creating transaction:', error); @@ -40,26 +46,20 @@ export class TransactionController { } }; - public updateTransaction = async (req: Request, res: Response): Promise => { + public updateTransaction = async (data: UpdatedData): Promise => { try { - - if (!validateUpdateTransactionBody(req.body)) { - return res.status(400).json({ message: 'incorrect body' }); + if (!validateUpdateTransactionBody(data)) { + throw new Error('incorrect body') } - const { id } = req.params; - const updatedData = req.body; + const id = data.transactionExternalId const updateTransactionUseCase = new UpdateTransactionUseCase(this.transactionRepository); - const updatedTransaction = await updateTransactionUseCase.execute(id, updatedData); + await updateTransactionUseCase.execute(id, data); - return res.status(200).json(updatedTransaction); } catch (error: any) { - if (error?.message === 'Transaction not found') { - return res.status(404).json({ message: 'Transaction not found' }); - } console.error('Error updating transaction:', error); - return res.status(500).json({ message: 'Error updating transaction', error }); + throw error; } }; diff --git a/apps/transactions/src/adapters/routes/transaction.routes.ts b/apps/transactions/src/adapters/routes/transaction.routes.ts index b56280130..06c1aeaad 100644 --- a/apps/transactions/src/adapters/routes/transaction.routes.ts +++ b/apps/transactions/src/adapters/routes/transaction.routes.ts @@ -11,8 +11,5 @@ router.post('/create', async (req: Request, res: Response) => { await transactionController.createTransaction(req, res) }); -router.put('/:id', async (req: Request, res: Response) => { - await transactionController.updateTransaction(req, res); -}); export default router; diff --git a/apps/transactions/src/adapters/validation/transaction.validator.ts b/apps/transactions/src/adapters/validation/transaction.validator.ts index 99f7fa987..8aa47b0d6 100644 --- a/apps/transactions/src/adapters/validation/transaction.validator.ts +++ b/apps/transactions/src/adapters/validation/transaction.validator.ts @@ -19,16 +19,17 @@ export const validateCreateTransactionDto = (data: any): data is CreateTransacti }; export const validateUpdateTransactionBody = (data: any): data is UpdatedData => { - const { transactionType, transactionStatus, value, createdAt } = data; + const { transactionExternalId, transactionType, transactionStatus, value, createdAt } = data; if ( + typeof transactionExternalId !== 'string' || typeof transactionType !== 'object' || typeof transactionType.name !== 'number' || typeof transactionStatus !== 'object' || typeof transactionStatus.name !== 'string' || typeof value !== 'number' || value <= 0 || - !(createdAt instanceof Date) + typeof createdAt !== 'string' ) { return false; } diff --git a/apps/transactions/src/domain/dtos/kafkaTransactionMessage.dto.ts b/apps/transactions/src/domain/dtos/kafkaTransactionMessage.dto.ts new file mode 100644 index 000000000..74afcc0a7 --- /dev/null +++ b/apps/transactions/src/domain/dtos/kafkaTransactionMessage.dto.ts @@ -0,0 +1,11 @@ +export interface KafkaTransactionMessage { + transactionExternalId: string; + transactionType: { + name: number; + }; + transactionStatus: { + name: string; + }; + value: number; + createdAt: Date; +} diff --git a/apps/transactions/src/domain/dtos/updatedTransaction.dto.ts b/apps/transactions/src/domain/dtos/updatedTransaction.dto.ts index c1044da15..5407b0df0 100644 --- a/apps/transactions/src/domain/dtos/updatedTransaction.dto.ts +++ b/apps/transactions/src/domain/dtos/updatedTransaction.dto.ts @@ -7,5 +7,5 @@ export interface UpdatedData { name: string; }; value: number; - createdAt: Date; + createdAt: string; } diff --git a/apps/transactions/src/infrastructure/kafka/kafkaConsumer.ts b/apps/transactions/src/infrastructure/kafka/kafkaConsumer.ts new file mode 100644 index 000000000..851ddc458 --- /dev/null +++ b/apps/transactions/src/infrastructure/kafka/kafkaConsumer.ts @@ -0,0 +1,34 @@ +import { Kafka } from 'kafkajs'; +import dotenv from 'dotenv'; +import { TransactionRepositoryImpl } from '../../domain/repositories/transaction.repository'; +import { TransactionController } from '../../adapters/controllers/transaction.controller'; + +dotenv.config(); + +const kafka = new Kafka({ + clientId: process.env.KAFKA_CLIENT_ID, + brokers: [process.env.KAFKA_BROKER || 'localhost:9092'], +}); + +const consumer = kafka.consumer({ groupId: process.env.KAFKA_GROUP_ID as string }); +const transactionRepository = new TransactionRepositoryImpl(); +const transactionController = new TransactionController(transactionRepository) + +export const consumeTransactionMessages = async () => { + await consumer.connect(); + await consumer.subscribe({ topic: process.env.KAFKA_TOPIC_UPDATE || 'antifraud-transactions-status', fromBeginning: true }); + + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + const transactionData = JSON.parse(message.value?.toString() || '{}'); + console.log(`Mensaje recibido en el tópico ${topic} - Partición ${partition}:`, transactionData); + + try { + await transactionController.updateTransaction(transactionData); + console.log("mensaje procesado exitosamente"); + } catch (error) { + console.error(`Error al procesar la transacción en el tópico ${topic} - message ${message}: `, error); + } + }, + }); +}; \ No newline at end of file diff --git a/apps/transactions/src/infrastructure/mappers/transaction.mapper.ts b/apps/transactions/src/infrastructure/mappers/transaction.mapper.ts new file mode 100644 index 000000000..9a93423d3 --- /dev/null +++ b/apps/transactions/src/infrastructure/mappers/transaction.mapper.ts @@ -0,0 +1,16 @@ +import { Transaction } from "../../domain/entities/transaction.entity"; +import { KafkaTransactionMessage } from "../../domain/dtos/kafkaTransactionMessage.dto"; + +export const mapTransactionToKafkaMessage = (transaction: Transaction): KafkaTransactionMessage => { + return { + transactionExternalId: transaction.id, + transactionType: { + name: transaction.transferTypeId, + }, + transactionStatus: { + name: transaction.status, + }, + value: transaction.value, + createdAt: transaction.createdAt, + }; +}; \ No newline at end of file diff --git a/apps/transactions/src/infrastructure/messageBroker/kafkaProducer.ts b/apps/transactions/src/infrastructure/messageBroker/kafkaProducer.ts new file mode 100644 index 000000000..b91d35586 --- /dev/null +++ b/apps/transactions/src/infrastructure/messageBroker/kafkaProducer.ts @@ -0,0 +1,33 @@ +import { Kafka } from 'kafkajs'; +import dotenv from 'dotenv'; + +dotenv.config(); + +const kafka = new Kafka({ + clientId: 'transactions-service', + brokers: [process.env.KAFKA_BROKER || 'localhost:9092'], +}); + +const producer = kafka.producer(); + + +export const sendTransactionMessage = async (transactionData: any) => { + await producer.connect(); + + try { + await producer.send({ + topic: process.env.KAFKA_TOPIC || 'transactions', + messages: [ + { + value: JSON.stringify(transactionData), + }, + ], + }); + console.log('Transaction message sent:', transactionData); + } catch (error) { + console.error('Error sending transaction message:', error); + } finally { + await producer.disconnect(); + } +}; + diff --git a/apps/transactions/src/infrastructure/startup.ts b/apps/transactions/src/infrastructure/startup.ts new file mode 100644 index 000000000..f99389c13 --- /dev/null +++ b/apps/transactions/src/infrastructure/startup.ts @@ -0,0 +1,18 @@ +import AppDataSource from "./database/transaction.ormconfig"; +import { consumeTransactionMessages } from "./kafka/kafkaConsumer"; + + +export const initializeApp = async () => { + try { + AppDataSource.initialize() + .then(() => { + console.log('Conexión a la base de datos establecida'); + }) + .catch((error) => console.log('Error al conectar a la base de datos', error)); + + await consumeTransactionMessages(); + } catch (error) { + console.error('Error al inicializar la aplicación:', error); + process.exit(1); + } +} diff --git a/apps/transactions/src/server.ts b/apps/transactions/src/server.ts index eb8fc5f3f..a1d640d7b 100644 --- a/apps/transactions/src/server.ts +++ b/apps/transactions/src/server.ts @@ -1,21 +1,23 @@ import express from 'express'; import transactionRoutes from './adapters/routes/transaction.routes'; import 'reflect-metadata'; -import AppDataSource from './infrastructure/database/transaction.ormconfig'; +import { initializeApp } from './infrastructure/startup'; const app = express(); +const startServer = async () => { + await initializeApp() +} app.use(express.json()); app.use('/api/transactions', transactionRoutes); -AppDataSource.initialize() - .then(() => { - console.log('Conexión a la base de datos establecida'); +startServer().catch((err) => { + console.error('Error al iniciar el servidor:', err); + process.exit(1); +}); - app.listen(3000, () => { - console.log('Servidor ejecutándose en http://localhost:3000'); - }); - }) - .catch((error) => console.log('Error al conectar a la base de datos', error)); +app.listen(3000, () => { + console.log('Servidor ejecutándose en http://localhost:3000'); +}); diff --git a/apps/transactions/test/adapters/controllers/transaction.controller.test.ts b/apps/transactions/test/adapters/controllers/transaction.controller.test.ts index 0c2ec0b59..d8ff4dd1d 100644 --- a/apps/transactions/test/adapters/controllers/transaction.controller.test.ts +++ b/apps/transactions/test/adapters/controllers/transaction.controller.test.ts @@ -101,8 +101,8 @@ describe('Transaction Controller', () => { await transactionController.createTransaction(req as Request, res as Response) result = await transactionRepository.findAll() - req.body = { - transactionExternalId: result[0].id, + const transactionData = { + transactionExternalId: result[0].id.toString(), transactionType: { name: 2 }, @@ -110,49 +110,41 @@ describe('Transaction Controller', () => { name: "approved" }, value: 120, - createdAt: new Date() + createdAt: "2024-10-09T00:37:07.504Z" } - req.params = { - id: result[0].id - } + expect(result[0].transferTypeId).toBe(1) + expect(result[0].status).toBe("pending") - await transactionController.updateTransaction(req as Request, res as Response) + await transactionController.updateTransaction(transactionData) result = await transactionRepository.findAll() + expect(result[0].transferTypeId).toBe(2) + expect(result[0].status).toBe("approved") expect(result.length).toBe(1) - expect(res.status).toHaveBeenCalledWith(200); - expect(res.json).toHaveBeenCalledWith(result[0]); - expect(result[0].transferTypeId).toBe(2); }); it('should return error when body is incorrect ', async () => { - req.body = {} - - await transactionController.updateTransaction(req as Request, res as Response) + const transactionData = JSON.parse('{}'); + await expect(transactionController.updateTransaction(transactionData)).rejects.toThrow(new Error('incorrect body')); - const result = await transactionRepository.findAll() - expect(result.length).toBe(0) - expect(res.status).toHaveBeenCalledWith(400); - expect(res.json).toHaveBeenCalledWith({ message: 'incorrect body' }); }); it('should return 404 if trnsaction does not exist', async () => { - req.params = { - id: 'non-existent-id' - } - req.body = { + const transactionData = { transactionExternalId: 'non-existent-id', - transactionType: { name: 2 }, - transactionStatus: { name: 'aprobado' }, + transactionType: { + name: 2 + }, + transactionStatus: { + name: "approved" + }, value: 120, - createdAt: new Date() - }; + createdAt: "2024-10-09T00:37:07.504Z" + } - await transactionController.updateTransaction(req as Request, res as Response) - expect(res.status).toHaveBeenCalledWith(404) - expect(res.json).toHaveBeenCalledWith({ message: 'Transaction not found' }) + await expect(transactionController.updateTransaction(transactionData)).rejects.toThrow(new Error('Transaction not found')); }); it('should return 500 if there is an internal server error', async () => { @@ -167,27 +159,21 @@ describe('Transaction Controller', () => { await transactionController.createTransaction(req as Request, res as Response) result = await transactionRepository.findAll() - req.body = { - "transactionExternalId": result[0].id, - "transactionType": { - "name": 2 + const transactionData = { + transactionExternalId: result[0].id.toString(), + transactionType: { + name: 2 }, - "transactionStatus": { - "name": "approved" + transactionStatus: { + name: "approved" }, - "value": 120, - "createdAt": new Date() - } - - req.params = { - id: result[0].id + value: 120, + createdAt: "2024-10-09T00:37:07.504Z" } jest.spyOn(transactionRepository, 'update').mockRejectedValue(new Error('Error server')) - await transactionController.updateTransaction(req as Request, res as Response); - expect(res.status).toHaveBeenCalledWith(500); - expect(res.json).toHaveBeenCalledWith({ message: 'Error updating transaction', error: new Error('Error server') }); + await expect(transactionController.updateTransaction(transactionData)).rejects.toThrow(new Error('Error server')); }); diff --git a/docker-compose.yml b/docker-compose.yml index f9592e225..0a18116c1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,5 +22,6 @@ services: KAFKA_BROKER_ID: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_JMX_PORT: 9991 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' ports: - 9092:9092 diff --git a/package-lock.json b/package-lock.json index f9d444fcb..b7071f873 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,6 +11,7 @@ "dependencies": { "dotenv": "^16.4.5", "express": "^4.21.0", + "kafkajs": "^2.2.4", "nodemon": "^3.1.7", "pg": "^8.13.0", "reflect-metadata": "^0.2.2", @@ -4580,6 +4581,14 @@ "node": ">=6" } }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/kleur": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/kleur/-/kleur-3.0.3.tgz", diff --git a/package.json b/package.json index d549aafaf..18fa890da 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,7 @@ "dependencies": { "dotenv": "^16.4.5", "express": "^4.21.0", + "kafkajs": "^2.2.4", "nodemon": "^3.1.7", "pg": "^8.13.0", "reflect-metadata": "^0.2.2",