Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

React on network online/offline events #204

Merged
merged 2 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 93 additions & 1 deletion src/centrifuge.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Centrifuge } from './centrifuge'
import { DisconnectedContext, Error as CentrifugeError, PublicationContext, TransportName, UnsubscribedContext } from './types';
import { disconnectedCodes, unsubscribedCodes } from './codes';
import { disconnectedCodes, unsubscribedCodes, connectingCodes } from './codes';

import WebSocket from 'ws';
import EventSource from 'eventsource';
Expand Down Expand Up @@ -156,3 +156,95 @@ test.each(transportCases)("%s: rpc buffered till connected", async (transport, e
c.disconnect();
expect(rpcErr.code).toStrictEqual(108);
});

test.each(transportCases)("%s: handles offline/online events", async (transport, endpoint) => {
const networkEventTarget = new EventTarget();

const c = new Centrifuge([{
transport: transport as TransportName,
endpoint: endpoint,
}], {
websocket: WebSocket,
fetch: fetch,
eventsource: EventSource,
readableStream: ReadableStream,
emulationEndpoint: 'http://localhost:8000/emulation',
networkEventTarget: networkEventTarget,
});

let connectingCalled: any;
const p = new Promise<DisconnectedContext>((resolve, _) => {
connectingCalled = resolve;
})

c.on('connecting', (ctx) => {
if (ctx.code == connectingCodes.transportClosed) {
connectingCalled(ctx);
}
})

c.connect();
await c.ready(5000);
expect(c.state).toBe(Centrifuge.State.Connected);

const offlineEvent = new Event('offline', { bubbles: true });
networkEventTarget.dispatchEvent(offlineEvent);

const ctx = await p;
expect(c.state).toBe(Centrifuge.State.Connecting);
expect(ctx.code).toBe(connectingCodes.transportClosed);

const onlineEvent = new Event('online', { bubbles: true });
networkEventTarget.dispatchEvent(onlineEvent);

let disconnectCalled: any;
const disconnectedPromise = new Promise<DisconnectedContext>((resolve, _) => {
disconnectCalled = resolve;
})
c.on('disconnected', (ctx) => {
disconnectCalled(ctx);
})

await c.ready(5000);
expect(c.state).toBe(Centrifuge.State.Connected);

c.disconnect();
await disconnectedPromise;
expect(c.state).toBe(Centrifuge.State.Disconnected);
});

test.each(transportCases.slice(0, 1))("%s: not connecting on online in disconnected state", async (transport, endpoint) => {
const networkEventTarget = new EventTarget();

const c = new Centrifuge([{
transport: transport as TransportName,
endpoint: endpoint,
}], {
websocket: WebSocket,
fetch: fetch,
eventsource: EventSource,
readableStream: ReadableStream,
emulationEndpoint: 'http://localhost:8000/emulation',
networkEventTarget: networkEventTarget,
});

c.connect();
await c.ready(5000);
expect(c.state).toBe(Centrifuge.State.Connected);

let disconnectCalled: any;
const disconnectedPromise = new Promise<DisconnectedContext>((resolve, _) => {
disconnectCalled = resolve;
})
c.on('disconnected', (ctx) => {
disconnectCalled(ctx);
})

c.disconnect();
await disconnectedPromise;
expect(c.state).toBe(Centrifuge.State.Disconnected);

const onlineEvent = new Event('online', { bubbles: true });
networkEventTarget.dispatchEvent(onlineEvent);
expect(c.state).toBe(Centrifuge.State.Disconnected);
});
36 changes: 35 additions & 1 deletion src/centrifuge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const defaults: Options = {
maxReconnectDelay: 20000,
timeout: 5000,
maxServerPingDelay: 10000,
networkEventTarget: null,
}

interface serverSubscription {
Expand All @@ -65,6 +66,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
private _transportWasOpen: boolean;
private _transport?: any;
private _transportClosed: boolean;
private _reconnecting: boolean;
private _reconnectTimeout?: null | ReturnType<typeof setTimeout> = null;
private _reconnectAttempts: number;
private _client: null;
Expand Down Expand Up @@ -108,6 +110,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
this._transportClosed = true;
this._encoder = null;
this._decoder = null;
this._reconnecting = false;
this._reconnectTimeout = null;
this._reconnectAttempts = 0;
this._client = null;
Expand Down Expand Up @@ -448,6 +451,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli

private _setState(newState: State) {
if (this.state !== newState) {
this._reconnecting = false;
const oldState = this.state;
this.state = newState;
this.emit('state', { newState, oldState });
Expand All @@ -472,6 +476,31 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
return ++this._commandId;
}

private _setNetworkEvents() {
let eventTarget: EventTarget | null = null;
if (this._config.networkEventTarget !== null) {
eventTarget = this._config.networkEventTarget;
} else if (typeof globalThis.addEventListener !== 'undefined') {
eventTarget = globalThis as EventTarget;
}
if (eventTarget) {
eventTarget.addEventListener('offline', () => {
this._debug('offline event triggered');
if (this.state === State.Connected && this._transport && !this._transportClosed) {
this._transportClosed = true;
this._transport.close();
}
});
eventTarget.addEventListener('online', () => {
this._debug('online event triggered');
if (this.state === State.Connecting) {
this._clearReconnectTimeout();
this._startReconnecting();
}
});
}
}

private _getReconnectDelay() {
const delay = backoff(this._reconnectAttempts, this._config.minReconnectDelay, this._config.maxReconnectDelay);
this._reconnectAttempts += 1;
Expand Down Expand Up @@ -812,6 +841,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
delay = 0;
}
self._debug('reconnect after ' + delay + ' milliseconds');
self._reconnecting = false;
self._reconnectTimeout = setTimeout(() => {
self._startReconnecting();
}, delay);
Expand Down Expand Up @@ -845,10 +875,12 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
}

private _startReconnecting() {
if (!this._isConnecting()) {
if (!this._isConnecting() || this._reconnecting) {
return;
}

this._reconnecting = true;

const needTokenRefresh = this._refreshRequired || (!this._token && this._config.getToken !== null);
if (!needTokenRefresh) {
this._initializeTransport();
Expand Down Expand Up @@ -881,6 +913,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli
});
const delay = self._getReconnectDelay();
self._debug('error on connection token refresh, reconnect after ' + delay + ' milliseconds', e);
self._reconnecting = false;
self._reconnectTimeout = setTimeout(() => {
self._startReconnecting();
}, delay);
Expand Down Expand Up @@ -1296,6 +1329,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli

this._client = result.client;
this._setState(State.Connected);
this._setNetworkEvents();

if (this._refreshTimeout) {
clearTimeout(this._refreshTimeout);
Expand Down
13 changes: 10 additions & 3 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ export interface TransportEndpoint {

/** Options for Centrifuge client. */
export interface Options {
/** select protocol to use. Note that to use Protobuf protocol you need to use CentrifugeProtobuf class. */
/** select protocol to use. Note that to use Protobuf protocol you need to use
* CentrifugeProtobuf class. */
protocol: 'json' | 'protobuf';
/** allows enabling debug mode */
debug: boolean;
Expand All @@ -90,7 +91,8 @@ export interface Options {
getToken: null | ((ctx: ConnectionTokenContext) => Promise<string>);
/** data to send to a server with connect command */
data: any | null;
/** name of client - it's not a unique name of each connection, it's sth to identify from where client connected */
/** name of client - it's not a unique name of each connection, it's sth to identify
* from where client connected */
name: string;
/** version of client */
version: string;
Expand All @@ -102,7 +104,8 @@ export interface Options {
timeout: number;
/** maximum delay of server pings to detect broken connection in milliseconds */
maxServerPingDelay: number;
/** provide custom WebSocket constructor, useful for NodeJS env where WebSocket is not available globally */
/** provide custom WebSocket constructor, useful for NodeJS env where WebSocket is not
* available globally */
websocket: any | null;
/** provide shim for fetch implementation */
fetch: any | null;
Expand All @@ -116,6 +119,10 @@ export interface Options {
sockjsOptions: SockjsOptions;
/** which emulation endpoint to use */
emulationEndpoint: string;
/** EventTarget for network online/offline events, in browser environment
* Centrifuge uses global window online/offline events automatically
* by default. */
networkEventTarget: EventTarget | null;
}

export interface SockjsOptions {
Expand Down