A NestJS module for RabbitMQ integration that provides easy-to-use abstractions for Direct, Fanout, and Topic exchanges.
npm install @iamnd/nest-rmq
- 🚀 Easy integration with NestJS applications
- 🔄 Support for Direct, Fanout, and Topic exchanges
- 🛡️ Built-in connection management and error handling
- 📨 Simple publish/subscribe patterns
- 🔌 Automatic reconnection handling
- 📝 TypeScript support
// app.module.ts
import { Module } from '@nestjs/common';
import { RabbitMQModule } from '@iamnd/nest-rmq';

/**
 * Root application module.
 *
 * Registers the RabbitMQ module with the broker connection URI by passing
 * the result of `RabbitMQModule.forRoot(...)` to `imports`.
 */
@Module({
  imports: [
    RabbitMQModule.forRoot({
      uri: 'amqp://localhost:5672',
    }),
  ],
})
export class AppModule {}
// publisher.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { RabbitMQService, ExchangeType } from '@iamnd/nest-rmq';

/**
 * Publishes order events to the `orders` direct exchange.
 */
@Injectable()
export class PublisherService implements OnModuleInit {
  private readonly exchangeName = 'orders';

  constructor(private readonly rabbitMQService: RabbitMQService) {}

  /**
   * Declares the exchange while the module is initializing.
   *
   * This is done in a lifecycle hook rather than in the constructor because a
   * constructor cannot await: calling an async method from it leaves a
   * floating promise, so a failed `createExchange` would surface as an
   * unhandled rejection and `publishOrder` could run before the exchange
   * has been declared.
   */
  async onModuleInit(): Promise<void> {
    await this.rabbitMQService.createExchange(
      this.exchangeName,
      ExchangeType.DIRECT
    );
  }

  /**
   * Publishes an order to the exchange with the `order.created` routing key.
   *
   * @param order - Payload handed to `RabbitMQService.publish` as the message body.
   */
  async publishOrder(order: any): Promise<void> {
    await this.rabbitMQService.publish(
      this.exchangeName,
      'order.created',
      order
    );
  }
}
// subscriber.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { RabbitMQService, ExchangeType } from '@iamnd/nest-rmq';
/**
 * Consumes messages from the `order-processing` queue, which is bound to the
 * `orders` exchange with the `order.created` routing key.
 */
@Injectable()
export class SubscriberService implements OnModuleInit {
  private readonly exchangeName = 'orders';
  private readonly queueName = 'order-processing';

  constructor(private readonly rabbitMQService: RabbitMQService) {}

  /**
   * Wires up the consumer in three ordered steps: create the queue, bind it
   * to the exchange, then start receiving messages.
   */
  async onModuleInit() {
    const { rabbitMQService: rmq, queueName, exchangeName } = this;

    // 1. Create queue
    await rmq.createQueue(queueName);

    // 2. Bind queue to exchange
    await rmq.bindQueue(queueName, exchangeName, 'order.created');

    // 3. Subscribe to messages; the arrow function keeps `this` pointing at the service
    await rmq.subscribe(queueName, (message: any) => this.handleOrder(message));
  }

  /**
   * Handles a single message delivered from the queue.
   *
   * @param message - The value passed to the subscribe callback.
   */
  private async handleOrder(message: any) {
    console.log('Received order:', message);
    // Process the order
  }
}
// Declare the 'orders' exchange as DIRECT, for point-to-point communication
await rabbitMQService.createExchange(
  'orders',
  ExchangeType.DIRECT
);
// Declare the 'notifications' exchange as FANOUT, to broadcast each message to all bound queues
await rabbitMQService.createExchange(
  'notifications',
  ExchangeType.FANOUT
);
// Topic exchange for pattern-based routing
await rabbitMQService.createExchange('events', ExchangeType.TOPIC);

// The queue must exist before it can be bound to the exchange
await rabbitMQService.createQueue('audit-queue');

// Subscribe to specific patterns
await rabbitMQService.bindQueue(
  'audit-queue',
  'events',
  'order.*.success' // Matches: order.create.success, order.update.success, etc.
);
The module accepts the following configuration options:
// Call forRoot() with the connection settings and pass the result to a module's `imports` array.
RabbitMQModule.forRoot({
uri: 'amqp://localhost:5672', // RabbitMQ connection URI (this example points at localhost, port 5672)
// Additional options can be added here
});
The module includes built-in error handling and connection management:
- Automatic reconnection on connection loss
- Message acknowledgment handling
- Error logging with NestJS Logger
- Connection event handling
Example error handling in subscribers:
await this.rabbitMQService.subscribe(
  queueName,
  async (message) => {
    try {
      await this.processMessage(message);
      // Message is automatically acknowledged on success
    } catch (error) {
      console.error('Error processing message:', error);
      // Rethrow after logging: if the error is swallowed here, the handler
      // resolves normally and the failure is never visible to the library,
      // so the message cannot be automatically nack'd.
      throw error;
    }
  }
);
- Create exchanges in publishers, not subscribers
- Use meaningful exchange and queue names
- Implement proper error handling
- Use TypeScript interfaces for message types
- Follow the single responsibility principle
Contributions are welcome! Please feel free to submit a Pull Request.
MIT
ND
For issues and feature requests, please create an issue on GitHub.
Made with ❤️ for the NestJS community.