Skip to content

Commit

Permalink
Merge pull request #72 from Nathan-Schwartz/amqp-transporter
Browse files Browse the repository at this point in the history
AMQP Transporter
  • Loading branch information
icebob authored Aug 16, 2017
2 parents ec9d8ec + 4dfbce5 commit 5c58752
Show file tree
Hide file tree
Showing 21 changed files with 1,441 additions and 20 deletions.
2 changes: 2 additions & 0 deletions src/service-broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ class ServiceBroker {
TransporterClass = Transporters.MQTT;
else if (opt.startsWith("redis://"))
TransporterClass = Transporters.Redis;
else if (opt.startsWith("amqp://"))
TransporterClass = Transporters.AMQP;

if (TransporterClass)
return new TransporterClass(opt);
Expand Down
48 changes: 29 additions & 19 deletions src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Transit {
};

this.connected = false;
this.disconnecting = true;
this.disconnecting = false;

if (this.tx)
this.tx.init(this, this.messageHandler.bind(this), this.afterConnect.bind(this));
Expand Down Expand Up @@ -175,30 +175,33 @@ class Transit {
* @memberOf Transit
*/
makeSubscriptions() {
this.subscribing = Promise.all([
// Subscribe to broadcast events
this.subscribe(P.PACKET_EVENT),

// Subscribe to broadcast events
this.subscribe(P.PACKET_EVENT);

// Subscribe to requests
this.subscribe(P.PACKET_REQUEST, this.nodeID);
// Subscribe to requests
this.subscribe(P.PACKET_REQUEST, this.nodeID),

// Subscribe to node responses of requests
this.subscribe(P.PACKET_RESPONSE, this.nodeID);
// Subscribe to node responses of requests
this.subscribe(P.PACKET_RESPONSE, this.nodeID),

// Discover handler
this.subscribe(P.PACKET_DISCOVER);
// Discover handler
this.subscribe(P.PACKET_DISCOVER),

// NodeInfo handler
this.subscribe(P.PACKET_INFO); // Broadcasted INFO. If a new node connected
this.subscribe(P.PACKET_INFO, this.nodeID); // Response INFO to DISCOVER packet
// NodeInfo handler
this.subscribe(P.PACKET_INFO), // Broadcasted INFO. If a new node connected
this.subscribe(P.PACKET_INFO, this.nodeID), // Response INFO to DISCOVER packet

// Disconnect handler
this.subscribe(P.PACKET_DISCONNECT);
// Disconnect handler
this.subscribe(P.PACKET_DISCONNECT),

// Heart-beat handler
this.subscribe(P.PACKET_HEARTBEAT);

return Promise.resolve();
// Heart-beat handler
this.subscribe(P.PACKET_HEARTBEAT),
])
.then(() => {
this.subscribing = null;
});
return this.subscribing;
}

/**
Expand Down Expand Up @@ -499,6 +502,13 @@ class Transit {
* @memberOf Transit
*/
publish(packet) {
if (this.subscribing) {
return this.subscribing
.then(() => {
this.stat.packets.sent = this.stat.packets.sent + 1;
return this.tx.publish(packet);
});
}
this.stat.packets.sent = this.stat.packets.sent + 1;
return this.tx.publish(packet);
}
Expand Down
Loading

0 comments on commit 5c58752

Please sign in to comment.