Skip to content

Commit

Permalink
Integrated Kafka: implemented message production and consumption for …
Browse files Browse the repository at this point in the history
…specific topics
  • Loading branch information
sebastiansj70 committed Oct 9, 2024
1 parent 35948c5 commit 867a318
Show file tree
Hide file tree
Showing 14 changed files with 179 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import { TransactionRepository } from '../../domain/repositories/transaction.rep
import { CreateTransactionUseCase } from '../../application/createTransaction.usecase';
import { CreateTransactionDTO } from '../../domain/dtos/createTransaction.dto';
import { validateCreateTransactionDto, validateUpdateTransactionBody } from '../validation/transaction.validator';
import { sendTransactionMessage } from '../../infrastructure/messageBroker/kafkaProducer';
import { mapTransactionToKafkaMessage } from '../../infrastructure/mappers/transaction.mapper';
import { UpdatedData } from '../../domain/dtos/updatedTransaction.dto';


export class TransactionController {
Expand Down Expand Up @@ -33,33 +36,30 @@ export class TransactionController {
const createTransactionUseCase = new CreateTransactionUseCase(this.transactionRepository)
const savedTransaction = await createTransactionUseCase.execute(createTransactionDto);

const message = mapTransactionToKafkaMessage(savedTransaction)
await sendTransactionMessage(message);

return res.status(200).json(savedTransaction);
} catch (error) {
console.error('Error creating transaction:', error);
return res.status(500).json({ message: 'Error creating transaction', error });
}
};

public updateTransaction = async (req: Request, res: Response): Promise<Response | void> => {
public updateTransaction = async (data: UpdatedData): Promise<void> => {
try {

if (!validateUpdateTransactionBody(req.body)) {
return res.status(400).json({ message: 'incorrect body' });
if (!validateUpdateTransactionBody(data)) {
throw new Error('incorrect body')
}

const { id } = req.params;
const updatedData = req.body;
const id = data.transactionExternalId

const updateTransactionUseCase = new UpdateTransactionUseCase(this.transactionRepository);
const updatedTransaction = await updateTransactionUseCase.execute(id, updatedData);
await updateTransactionUseCase.execute(id, data);

return res.status(200).json(updatedTransaction);
} catch (error: any) {
if (error?.message === 'Transaction not found') {
return res.status(404).json({ message: 'Transaction not found' });
}
console.error('Error updating transaction:', error);
return res.status(500).json({ message: 'Error updating transaction', error });
throw error;
}
};

Expand Down
3 changes: 0 additions & 3 deletions apps/transactions/src/adapters/routes/transaction.routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,5 @@ router.post('/create', async (req: Request, res: Response) => {
await transactionController.createTransaction(req, res)
});

router.put('/:id', async (req: Request, res: Response) => {
await transactionController.updateTransaction(req, res);
});

export default router;
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ export const validateCreateTransactionDto = (data: any): data is CreateTransacti
};

export const validateUpdateTransactionBody = (data: any): data is UpdatedData => {
const { transactionType, transactionStatus, value, createdAt } = data;
const { transactionExternalId, transactionType, transactionStatus, value, createdAt } = data;

if (
typeof transactionExternalId !== 'string' ||
typeof transactionType !== 'object' ||
typeof transactionType.name !== 'number' ||
typeof transactionStatus !== 'object' ||
typeof transactionStatus.name !== 'string' ||
typeof value !== 'number' ||
value <= 0 ||
!(createdAt instanceof Date)
typeof createdAt !== 'string'
) {
return false;
}
Expand Down
11 changes: 11 additions & 0 deletions apps/transactions/src/domain/dtos/kafkaTransactionMessage.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export interface KafkaTransactionMessage {
transactionExternalId: string;
transactionType: {
name: number;
};
transactionStatus: {
name: string;
};
value: number;
createdAt: Date;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ export interface UpdatedData {
name: string;
};
value: number;
createdAt: Date;
createdAt: string;
}
34 changes: 34 additions & 0 deletions apps/transactions/src/infrastructure/kafka/kafkaConsumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { Kafka } from 'kafkajs';
import dotenv from 'dotenv';
import { TransactionRepositoryImpl } from '../../domain/repositories/transaction.repository';
import { TransactionController } from '../../adapters/controllers/transaction.controller';

dotenv.config();

const kafka = new Kafka({
clientId: process.env.KAFKA_CLIENT_ID,
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
});

const consumer = kafka.consumer({ groupId: process.env.KAFKA_GROUP_ID as string });
const transactionRepository = new TransactionRepositoryImpl();
const transactionController = new TransactionController(transactionRepository)

export const consumeTransactionMessages = async () => {
await consumer.connect();
await consumer.subscribe({ topic: process.env.KAFKA_TOPIC_UPDATE || 'antifraud-transactions-status', fromBeginning: true });

await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const transactionData = JSON.parse(message.value?.toString() || '{}');
console.log(`Mensaje recibido en el tópico ${topic} - Partición ${partition}:`, transactionData);

try {
await transactionController.updateTransaction(transactionData);
console.log("mensaje procesado exitosamente");
} catch (error) {
console.error(`Error al procesar la transacción en el tópico ${topic} - message ${message}: `, error);
}
},
});
};
16 changes: 16 additions & 0 deletions apps/transactions/src/infrastructure/mappers/transaction.mapper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Transaction } from "../../domain/entities/transaction.entity";
import { KafkaTransactionMessage } from "../../domain/dtos/kafkaTransactionMessage.dto";

export const mapTransactionToKafkaMessage = (transaction: Transaction): KafkaTransactionMessage => {
return {
transactionExternalId: transaction.id,
transactionType: {
name: transaction.transferTypeId,
},
transactionStatus: {
name: transaction.status,
},
value: transaction.value,
createdAt: transaction.createdAt,
};
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Kafka } from 'kafkajs';
import dotenv from 'dotenv';

dotenv.config();

const kafka = new Kafka({
clientId: 'transactions-service',
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
});

const producer = kafka.producer();


export const sendTransactionMessage = async (transactionData: any) => {
await producer.connect();

try {
await producer.send({
topic: process.env.KAFKA_TOPIC || 'transactions',
messages: [
{
value: JSON.stringify(transactionData),
},
],
});
console.log('Transaction message sent:', transactionData);
} catch (error) {
console.error('Error sending transaction message:', error);
} finally {
await producer.disconnect();
}
};

18 changes: 18 additions & 0 deletions apps/transactions/src/infrastructure/startup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import AppDataSource from "./database/transaction.ormconfig";
import { consumeTransactionMessages } from "./kafka/kafkaConsumer";


export const initializeApp = async () => {
try {
AppDataSource.initialize()
.then(() => {
console.log('Conexión a la base de datos establecida');
})
.catch((error) => console.log('Error al conectar a la base de datos', error));

await consumeTransactionMessages();
} catch (error) {
console.error('Error al inicializar la aplicación:', error);
process.exit(1);
}
}
20 changes: 11 additions & 9 deletions apps/transactions/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import express from 'express';
import transactionRoutes from './adapters/routes/transaction.routes';
import 'reflect-metadata';
import AppDataSource from './infrastructure/database/transaction.ormconfig';
import { initializeApp } from './infrastructure/startup';

const app = express();
const startServer = async () => {
await initializeApp()
}

app.use(express.json());

app.use('/api/transactions', transactionRoutes);

AppDataSource.initialize()
.then(() => {
console.log('Conexión a la base de datos establecida');
startServer().catch((err) => {
console.error('Error al iniciar el servidor:', err);
process.exit(1);
});

app.listen(3000, () => {
console.log('Servidor ejecutándose en http://localhost:3000');
});
})
.catch((error) => console.log('Error al conectar a la base de datos', error));
app.listen(3000, () => {
console.log('Servidor ejecutándose en http://localhost:3000');
});

Original file line number Diff line number Diff line change
Expand Up @@ -101,58 +101,50 @@ describe('Transaction Controller', () => {
await transactionController.createTransaction(req as Request, res as Response)
result = await transactionRepository.findAll()

req.body = {
transactionExternalId: result[0].id,
const transactionData = {
transactionExternalId: result[0].id.toString(),
transactionType: {
name: 2
},
transactionStatus: {
name: "approved"
},
value: 120,
createdAt: new Date()
createdAt: "2024-10-09T00:37:07.504Z"
}

req.params = {
id: result[0].id
}
expect(result[0].transferTypeId).toBe(1)
expect(result[0].status).toBe("pending")

await transactionController.updateTransaction(req as Request, res as Response)
await transactionController.updateTransaction(transactionData)
result = await transactionRepository.findAll()

expect(result[0].transferTypeId).toBe(2)
expect(result[0].status).toBe("approved")
expect(result.length).toBe(1)
expect(res.status).toHaveBeenCalledWith(200);
expect(res.json).toHaveBeenCalledWith(result[0]);
expect(result[0].transferTypeId).toBe(2);
});

it('should return error when body is incorrect ', async () => {
req.body = {}

await transactionController.updateTransaction(req as Request, res as Response)
const transactionData = JSON.parse('{}');
await expect(transactionController.updateTransaction(transactionData)).rejects.toThrow(new Error('incorrect body'));

const result = await transactionRepository.findAll()
expect(result.length).toBe(0)
expect(res.status).toHaveBeenCalledWith(400);
expect(res.json).toHaveBeenCalledWith({ message: 'incorrect body' });
});

it('should return 404 if trnsaction does not exist', async () => {
req.params = {
id: 'non-existent-id'
}

req.body = {
const transactionData = {
transactionExternalId: 'non-existent-id',
transactionType: { name: 2 },
transactionStatus: { name: 'aprobado' },
transactionType: {
name: 2
},
transactionStatus: {
name: "approved"
},
value: 120,
createdAt: new Date()
};
createdAt: "2024-10-09T00:37:07.504Z"
}

await transactionController.updateTransaction(req as Request, res as Response)
expect(res.status).toHaveBeenCalledWith(404)
expect(res.json).toHaveBeenCalledWith({ message: 'Transaction not found' })
await expect(transactionController.updateTransaction(transactionData)).rejects.toThrow(new Error('Transaction not found'));
});

it('should return 500 if there is an internal server error', async () => {
Expand All @@ -167,27 +159,21 @@ describe('Transaction Controller', () => {
await transactionController.createTransaction(req as Request, res as Response)
result = await transactionRepository.findAll()

req.body = {
"transactionExternalId": result[0].id,
"transactionType": {
"name": 2
const transactionData = {
transactionExternalId: result[0].id.toString(),
transactionType: {
name: 2
},
"transactionStatus": {
"name": "approved"
transactionStatus: {
name: "approved"
},
"value": 120,
"createdAt": new Date()
}

req.params = {
id: result[0].id
value: 120,
createdAt: "2024-10-09T00:37:07.504Z"
}

jest.spyOn(transactionRepository, 'update').mockRejectedValue(new Error('Error server'))

await transactionController.updateTransaction(req as Request, res as Response);
expect(res.status).toHaveBeenCalledWith(500);
expect(res.json).toHaveBeenCalledWith({ message: 'Error updating transaction', error: new Error('Error server') });
await expect(transactionController.updateTransaction(transactionData)).rejects.toThrow(new Error('Error server'));

});

Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ services:
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
ports:
- 9092:9092
9 changes: 9 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"dependencies": {
"dotenv": "^16.4.5",
"express": "^4.21.0",
"kafkajs": "^2.2.4",
"nodemon": "^3.1.7",
"pg": "^8.13.0",
"reflect-metadata": "^0.2.2",
Expand Down

0 comments on commit 867a318

Please sign in to comment.