-
-
Notifications
You must be signed in to change notification settings - Fork 7.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature(@nestjs/microservices) RabbitMQ support #887
Conversation
package.json
Outdated
@@ -35,6 +35,8 @@ | |||
"@nestjs/microservices": "5.0.0", | |||
"@nestjs/testing": "5.0.0", | |||
"@nestjs/websockets": "5.0.0", | |||
"@types/amqplib": "^0.5.7", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, can you move this line to dev dependencies?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, my mistake. Moved it to dev dependencies.
@@ -35,6 +35,8 @@ | |||
"@nestjs/microservices": "5.0.0", | |||
"@nestjs/testing": "5.0.0", | |||
"@nestjs/websockets": "5.0.0", | |||
"@types/amqplib": "^0.5.7", | |||
"amqplib": "^0.5.2", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and also we are forced to download this package if we don't even will not use RMQ !
could you make it as optional
if possible 😃 !
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that if microservices
is required to run, it has to be installed, whether you like it or not. (could lead to can't read property of undefined error at runtime). In the end, I believe that adapters should be in separate packages to decrease installed bundle size, but that was a mistake in structuring from the beginning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, but do you know, sometimes mistakes are useful.
I updated merge request. I moved "@types/amqplib" to dev dependencies and rewrote client and server with correlationId like official documentation suggested: https://www.rabbitmq.com/tutorials/tutorial-six-javascript.html. To match requests and responses I used EventEmitter. So now there is only one temporary queue for each client. No additional queue created on publish(). |
I don't personally use microservices, so I'm reluctant to leave an approval, but otherwise this looks good to me |
Maybe I am missing it, but is it possible to specify rabbitmq options like
Also is there a chance you could offer a generic function for Nice to see more RabbitMQ users here, I asked Kamil several months ago if he would accept a pull request for it as I wanted to commit that feature as well, but I never got a response. I hope he will accept the feature. |
@weeco, thank's for feedback and ideas! I dived into documentation:
As for generic functions, you are right. In |
@AlariCode Sure, my use case is a "request worker" which is consuming a rate limited REST Api. Each worker is allowed to perform x requests every second and I am using a prefetch of 100 there. I always want to use the full rate limit. For this purpose I introduced a second channel which publishes token at the API's rate limit (400 tokens published every second, equally distributed over a second). So what I am doing is:
In general I don't see any issue with providing the user all original amqplib options? Why wouldn't you make all options available to him, it shouldn't be more implementation effort either. |
Thank’s! I’ll add configs to my pull request. |
I updated request and added some options. Everything seams fine when I run client with this.client = ClientProxyFactory.create({
transport: Transport.RMQ,
options: {
url: `amqp://${conf.amqp.login}:${conf.amqp.password}@${conf.amqp.host}`,
queue: 'test',
queueOptions: { durable: false }
}
}); But when I try using @Client({
transport: Transport.RMQ,
options: {
url: `amqp://${conf.amqp.login}:${conf.amqp.password}@${conf.amqp.host}`,
queue: 'test',
queueOptions: { durable: false }
}
}) I got and error after Error: connect ECONNREFUSED 127.0.0.1:3000
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1161:14) Can anyone help me with decorator? It seams like it is trying t use TCP transport. |
Hi @AlariCode, |
Sure, I'll add PR to docs :) |
Another point - we need to cover RabbitMQ in the integration tests |
@AlariCode Did you try increasing the timeouts for tests? Integration tests often have these timing issues - see https://jestjs.io/docs/en/troubleshooting |
I looked deeper into the problem and found some confusing behavior: // Timeout error (no answer)
@Get('a')
a(): Observable<string> {
let msg = 'test ' + Math.random();
console.log('Client sent: ' + msg);
return this.client.send<string, string>({ cmd: 'test' }, msg);
}
// Works fine!
@Get('b')
b(): Promise<string> {
let msg = 'test ' + Math.random();
console.log('Client sent: ' + msg);
return this.client.send<string, string>({ cmd: 'test' }, msg).pipe(take(1)).toPromise();
} Strange difference in Promise and Observable. That's why integration tests failed with RMQ transport. Any suggestions what could be wrong? |
Fixed return Observable. The problem was with 'responseEmitter.once'. It was not waiting for 'isDisposed: true'. Changed it to 'responseEmitter.on' so it will wait for all events. |
It seems that few conflicts have appeared recently. Would you like to take charge of it? |
@kamilmysliwiec merged master into feature/rmq-transport and resolved conflicts. |
I suggest you also look
Without this, it's impossible to work in a large project These are my thoughts on this topic :) Example import * as amqp from 'amqplib';
/** @types/amqp-connection-manager has old types, so for now I can do only this */
import AmqpManager = require('amqp-connection-manager');
export interface LoggerInterface {
log(message: string): any;
info(message: string): any;
warn(message: string): any;
error(message: string, trace?: string): any;
}
/**
* How to work this implementation ?
* @link http://skillachie.com/2014/06/27/rabbitmq-exchange-to-exchange-bindings-ampq/
*
* What is routingKey ?
* @link https://www.cloudamqp.com/blog/2015-09-03-part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html
* @link https://www.rabbitmq.com/tutorials/tutorial-five-javascript.html
*/
export class EventDispatcher {
private readonly connection: AmqpManager.AmqpConnectionManager;
private publisherChannelWrapper: AmqpManager.ChannelWrapper;
/**
* @param {string} currentPlatformName
* @param {string[]} hosts
* @param {LoggerInterface} logger - any logger, for example console
*/
constructor(private readonly currentPlatformName: string, private readonly hosts: string[], private readonly logger: LoggerInterface) {
this.connection = AmqpManager.connect(this.hosts, {
/** @see https://github.com/squaremo/amqp.node/issues/331 */
// servername: parsedURI.hostname,
heartbeatIntervalInSeconds: 5,
});
this.connection.on('connect', () => {
this.logger.info(`amqp connected to the platform "${this.currentPlatformName}"`);
});
this.connection.on('disconnect', params => {
this.logger.error(`amqp disconnected from the "${this.currentPlatformName}"`, params.err.stack);
});
}
/**
* Dispatch event with a message
*
* @param {string} eventName
* @param {object} message
*/
async dispatch<T>(eventName: string, message: T): Promise<void> {
const channelWrapper = await this.getPublisherChannelWrapper();
const fullEventName = `${this.currentPlatformName}/${eventName}`;
try {
await channelWrapper.publish('events_gateway', `${fullEventName}`, message, {
deliveryMode: 2,
contentType: 'application/json',
});
// TODO need to implement
// @link https://github.com/squaremo/amqp.node/issues/103
// await channel.waitForConfirms();
this.logger.info(`amqp event published "${fullEventName}" with payload ${JSON.stringify(message)}`);
} catch (error) {
this.logger.error(`amqp message was rejected for event "${fullEventName}"`, error.stack);
channelWrapper.close();
this.connection.close();
}
}
/**
* TODO add retry configuration https://felipeelias.github.io/rabbitmq/2016/02/22/rabbitmq-exponential-backoff.html
* Subscribe to the events by routingKey
*
* @param {string} toPlatformName - the name of the platform from which to take the routingKey
* @param {string} routingKey - read this https://www.cloudamqp.com/blog/2015-09-03-part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html
* @param {function} callback
*/
async subscribe<T>(
toPlatformName,
routingKey: string,
callback: (content: T, message: amqp.Message, channel: amqp.ConfirmChannel) => Promise<boolean>,
): Promise<void> {
const fullRoutingKey = `${this.currentPlatformName}/${toPlatformName}/${routingKey}`;
const channelWrapper = this.connection.createChannel({
json: true,
setup: async (channel: amqp.ConfirmChannel) => {
await this.commonChannelSetup(channel);
const assertQueue: amqp.Replies.AssertQueue = await channel.assertQueue(fullRoutingKey, { durable: true, autoDelete: false });
return Promise.all([
channel.bindQueue(assertQueue.queue, 'events_distributor', `${toPlatformName}/${routingKey}`),
channel.prefetch(1),
channel.consume(
assertQueue.queue,
async (message: amqp.Message | null) => {
if (!message) {
this.logger.error(`No message`);
return;
}
this.logger.info(`amqp received event "${message.fields.routingKey}" with payload ${message.content.toString()}`);
const content: T = JSON.parse(message.content.toString());
/* const isAcknowledged = */ await callback(content, message, channel);
// channelWrapper.ack(message);
},
{ noAck: true },
),
]);
},
});
await channelWrapper.waitForConnect();
this.logger.info(`Listening for messages from ${fullRoutingKey}`);
}
/**
* General channel configuration
*
* @param {amqp.ConfirmChannel} channel
*/
private async commonChannelSetup(channel: amqp.ConfirmChannel) {
await Promise.all([
channel.assertExchange('events_gateway', 'fanout', { durable: true, autoDelete: false }),
channel.assertExchange('events_distributor', 'topic', { durable: true, autoDelete: false }),
channel.bindExchange('events_distributor', 'events_gateway', '#'),
]);
}
private async getPublisherChannelWrapper(): Promise<AmqpManager.ChannelWrapper> {
if (!this.publisherChannelWrapper) {
this.publisherChannelWrapper = await this.connection.createChannel({
json: true,
setup: async (channel: amqp.ConfirmChannel) => {
await this.commonChannelSetup(channel);
},
});
}
return this.publisherChannelWrapper;
}
} |
good job! |
Thanks for sharing your ideas @ruscon. I believe that |
@kamilmysliwiec ok, I'll add soon in my PR. |
@kamilmysliwiec аdded client = new ClientRMQ({
urls: [`amqp://login:password@host`],
queue: 'test',
queueOptions: { durable: false }
}); Client and Server now reconnects if disconnected. |
Sounds great @AlariCode, i'll review soon |
@kamilmysliwiec Hey, interested in this one as we might use it at work. Do you have any estimate when this will be ready? or even a pre-release to play around with 😉 |
hey @juristr, |
Alright. Do you plan to do prerelease versions? Or what would be the best way to integrate with RabbitMQ. We're currently having a simple node script for doing so, but I'd like to try it with NestJS as it's for sure more future proof. |
@juristr you can play with this package https://github.com/AlariCode/nestjs-rmq. This is the same implementation (even the same code) but as a Strategy. We use it in our production projects before 5.4.0. |
@AlariCode great, gonna try it 👍 |
@AlariCode How about Also why is noAck defaults to true? What if the message is not delivered properly? This is a great contribution! I'm sure lots of community members were waiting for this for a long time. |
@rychkog if you use any persistent message broker (like STAN or Kafka streams), I would recommend to not use it as a microservice strategy, but rather together with the Execution Context feature (+ manually setup all subscriptions) @AlariCode
you can see all diffs here: |
@kamilmysliwiec Could you, please, elaborate on this more - why the persistent message broker in the microservice strategy is not recommended? |
https://docs.nestjs.com/microservices/basics Generally, Nest emits additional messages to provide an efficient request-response cycle, for example, it sends a message to indicate whether stream completes already and "client side" should be notified or not. Eventually, it would create a redundancy in your persistent logs by adding either not really meaningful |
Thanks, I see! |
This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs. |
PR Checklist
Please check if your PR fulfills the following requirements:
PR Type
What kind of change does this PR introduce?
What is the current behavior?
No RabbitMQ transport.
What is the new behavior?
Added RabbitMQ transport.
Does this PR introduce a breaking change?
Other information
I ❤️ NestJS and use it in our company in production. But we use RabbitMQ. It is one of popular message queue engine. It would be great if RQM transport will become part of NestJS. It uses RPC pattern, in which we have one main queue which server (one or more instance) listens. When client sends message to main queue, it creates temporary queue in which it listens for an answer. This allow many clients communicate with server at ones.
You had old RQM example in your docs (as custom Strategy) that uses two queue, but we will run into issues if we have two different clients with different messages.
I covered code with test. Next step I'll write docs if pull request is useful for NestJS project.