Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add websocket fallback #6

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 68 additions & 17 deletions NchanSubscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ var ughbind = (Function.prototype.bind
);

var sharedSubscriberTable={};
var conn = 0; // counter to save total connection made by one type of connector.
var maxConn = 3; // Switch to different protocol if it is reached to conn reached to maxConn.
var priorityIndex = 0; // Counter to switch to another protocol type.
var connError = false; // This helps to differentiate the errors and timeout of connection.
var disconnectError = false;
var time = 5000; // Hold time after iterating through every connection type.
var timeFactor = 1.3; // Increase hold time by timefactor.
var maxWaitTime = 120000; // Put cap on hold time upto 2 mins.

"use strict";
function NchanSubscriber(url, opt) {
Expand All @@ -98,6 +106,18 @@ function NchanSubscriber(url, opt) {
}
this.desiredTransport = opt.subscriber;

if(opt.maxTry) {
maxConn = opt.maxTry;
}
if(opt.holdTime) {
time = opt.holdTime;
}
if(opt.timeFactor) {
timeFactor = opt.timeFactor;
}
if(opt.maxWaitTime) {
maxWaitTime = opt.maxWaitTime;
}
if(opt.shared) {
if (!("localStorage" in global)) {
throw "localStorage unavailable for use in shared NchanSubscriber";
Expand Down Expand Up @@ -420,9 +440,12 @@ function NchanSubscriber(url, opt) {
Emitter(NchanSubscriber.prototype);

NchanSubscriber.prototype.initializeTransport = function(possibleTransports) {
if(possibleTransports) {
this.desiredTransport = possibleTransports;
}
// This is commented out because the original client is not sending any data.
// We are reinitializing our client to unable the websocket fallback.
// this.desiredTransport contains all the mentioned type of connection protocols.
// if(possibleTransports) {
// this.desiredTransport = possibleTransports;
// }
if(this.shared && this.shared.role == "slave") {
this.transport = new this.SubscriberClass["__slave"](ughbind(this.emit, this)); //try it
}
Expand All @@ -438,12 +461,9 @@ NchanSubscriber.prototype.initializeTransport = function(possibleTransports) {
}, this);

var i;
if(this.desiredTransport) {
for(i=0; i<this.desiredTransport.length; i++) {
if(tryInitializeTransport(this.desiredTransport[i])) {
break;
if(possibleTransports) {
if(tryInitializeTransport(possibleTransports)) {
}
}
}
else {
for(i in this.SubscriberClass) {
Expand Down Expand Up @@ -532,16 +552,41 @@ NchanSubscriber.prototype.start = function() {
}
}
else {
if(!this.transport) {
this.initializeTransport();
// First it will select one protocol and try to establish connection upto maxConn type.
// If it fails alll the time then it will switch to next protocol.
// If all the configured protocols fail to establish connection then client will go on hold state.
// It will wait upto holdtime and then it will again try to make connection using configured protocols.
if (!connError && disconnectError) {
conn = 0;
}
connError = false;
disconnectError = false;
if (conn<maxConn){
if(!this.transport) {
this.initializeTransport(this.desiredTransport[priorityIndex]);
}
conn++;
this.transport.listen(this.url, this.lastMessageId);
this.running = true;
delete this.starting;
}else{
conn = 0
priorityIndex++;
this.transport = null
if (priorityIndex<this.desiredTransport.length){
this.initializeTransport(this.desiredTransport[priorityIndex]);
this.transport.listen(this.url, this.lastMessageId);
this.running = true;
delete this.starting;
}else{
priorityIndex = 0;
setTimeout(this.start.bind(this), time);
time = Math.min(time * timeFactor, maxWaitTime);
}
}
this.transport.listen(this.url, this.lastMessageId);
this.running = true;
delete this.starting;
}
return this;
};

NchanSubscriber.prototype.stop = function() {
if(!this.running)
throw "Can't stop NchanSubscriber, it's not running.";
Expand Down Expand Up @@ -636,11 +681,13 @@ NchanSubscriber.prototype.SubscriberClass = {

l.onerror = ughbind(function(evt) {
//console.log("error", evt);
connError = true;
this.emit('error', evt, l);
delete this.listener;
}, this);

l.onclose = ughbind(function(evt) {
disconnectError = true;
this.emit('__disconnect', evt);
delete this.listener;
}, this);
Expand Down Expand Up @@ -744,18 +791,22 @@ NchanSubscriber.prototype.SubscriberClass = {
this.reqStartTime = new Date().getTime();
this.req = nanoajax.ajax({url: this.url, headers: this.headers}, requestCallback);
}
else if((code == 0 && response_text == "Error" && req.readyState == 4) || (code === null && response_text != "Abort")) {
//console.log("abort!!!");
else if(code == 408) {
this.reqStartTime = new Date().getTime();
this.req = nanoajax.ajax({url: this.url, headers: this.headers}, requestCallback);
}
else if((code == 0 && response_text == "Error" && req.readyState == 4) || (code === null && response_text != "Abort")) {
this.emit("__disconnect", code || 0, response_text);
delete this.req;
}
else if(code !== null) {
//HTTP error
connError = true;
this.emit("error", code, response_text);
delete this.req;
}
else {
//don't care about abortions
//don't care about abortions
delete this.req;
this.emit("__disconnect");
//console.log("abort!");
Expand Down