Skip to content

Commit

Permalink
feat: configureable amqp queue name
Browse files Browse the repository at this point in the history
  • Loading branch information
KagChi committed Jul 4, 2022
1 parent 446a649 commit bdb0a30
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
4 changes: 3 additions & 1 deletion .env_example
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ TOTAL_CLUSTERS = 6
QUEUE_NAME = "scheduled-tasks"
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_PASSWORD =
REDIS_PASSWORD =
AMQP_HOST = "amqp://guest:guest@localhost"
AMQP_QUEUE_NAME = "scheduled-tasks"
8 changes: 4 additions & 4 deletions src/Structures/TaskManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export class TaskManager extends EventEmitter {
public amqpSender!: RoutingPublisher<string, Record<string, any>>;
public amqpReceiver!: RpcSubscriber<string, Record<string, any>>;
public amqpReceiverCluster!: RpcSubscriber<string, Record<string, any>>;
public bull = new Bull(`${process.env.QUEUE_NAME!}-cluster-${this.clusterId}`, {
public bull = new Bull(`${process.env.QUEUE_NAME ?? "scheduled-tasks"}-cluster-${this.clusterId}`, {
redis: {
host: process.env.REDIS_HOST!,
port: parseInt(process.env.REDIS_PORT!),
Expand Down Expand Up @@ -65,20 +65,20 @@ export class TaskManager extends EventEmitter {
this.amqpReceiver = new RpcSubscriber(channel);
this.amqpReceiverCluster = new RpcSubscriber(channel);
await this.amqpReceiver.init({
name: "scheduled-tasks.send",
name: `${process.env.AMQP_QUEUE_NAME ?? "scheduled-tasks"}.send`,
cb: async message => {
const isJobReady = await this.bull.isReady();
return handleJob(message, isJobReady, this.clusterId, this);
}
});
await this.amqpReceiverCluster.init({
name: `scheduled-tasks.send-cluster-${this.clusterId}`,
name: `${process.env.AMQP_QUEUE_NAME ?? "scheduled-tasks"}.send-cluster-${this.clusterId}`,
cb: async message => {
const isJobReady = await this.bull.isReady();
return handleJob(message, isJobReady, this.clusterId, this);
}
});
await this.amqpSender.init({ name: "scheduled-tasks.recv", durable: true, exchangeType: "topic", useExchangeBinding: true });
await this.amqpSender.init({ name: `${process.env.AMQP_QUEUE_NAME ?? "scheduled-tasks"}.recv`, durable: true, exchangeType: "topic", useExchangeBinding: true });
this.stores.register(new ListenerStore());
await Promise.all([...this.stores.values()].map((store: Store<Piece>) => store.loadAll()));
this.emit("ready", this);
Expand Down

0 comments on commit bdb0a30

Please sign in to comment.