Skip to content

Commit

Permalink
Reconnect policy (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
taskbit authored Jul 13, 2022
1 parent 6f2d7fa commit 10dcbef
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 28 deletions.
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import Participant, { ConnectionQuality } from './room/participant/Participant';
import { ParticipantTrackPermission } from './room/participant/ParticipantTrackPermission';
import RemoteParticipant from './room/participant/RemoteParticipant';
import Room, { ConnectionState, RoomState } from './room/Room';
import DefaultReconnectPolicy from './room/DefaultReconnectPolicy';
import LocalAudioTrack from './room/track/LocalAudioTrack';
import LocalTrack from './room/track/LocalTrack';
import LocalTrackPublication from './room/track/LocalTrackPublication';
Expand Down Expand Up @@ -51,4 +52,5 @@ export {
VideoQuality,
ConnectionQuality,
ElementInfo,
DefaultReconnectPolicy,
};
6 changes: 6 additions & 0 deletions src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
VideoCaptureOptions,
} from './room/track/options';
import { AdaptiveStreamSettings } from './room/track/types';
import { ReconnectPolicy } from './room/ReconnectPolicy';

/**
* Options for when creating a new room
Expand Down Expand Up @@ -53,6 +54,11 @@ export interface RoomOptions {
* experimental flag, introduce a delay before sending signaling messages
*/
expSignalLatency?: number;

/**
* policy to use when attempting to reconnect
*/
reconnectPolicy?: ReconnectPolicy;
}

/**
Expand Down
33 changes: 33 additions & 0 deletions src/room/DefaultReconnectPolicy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { ReconnectContext, ReconnectPolicy } from './ReconnectPolicy';

const DEFAULT_RETRY_DELAYS_IN_MS = [
0,
300,
2 * 2 * 300,
3 * 3 * 300,
4 * 4 * 300,
5 * 5 * 300,
6 * 6 * 300,
7 * 7 * 300,
8 * 8 * 300,
9 * 9 * 300,
];

class DefaultReconnectPolicy implements ReconnectPolicy {
private readonly _retryDelays: number[];

constructor(retryDelays?: number[]) {
this._retryDelays = retryDelays !== undefined ? [...retryDelays] : DEFAULT_RETRY_DELAYS_IN_MS;
}

public nextRetryDelayInMs(context: ReconnectContext): number | null {
if (context.retryCount === this._retryDelays.length) return null;

const retryDelay = this._retryDelays[context.retryCount];
if (context.retryCount <= 1) return retryDelay;

return retryDelay + Math.random() * 1_000;
}
}

export default DefaultReconnectPolicy;
81 changes: 55 additions & 26 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import { ConnectionError, TrackInvalidError, UnexpectedConnectionState } from '.
import { EngineEvent } from './events';
import PCTransport from './PCTransport';
import { isFireFox, isWeb, sleep } from './utils';
import { RoomOptions } from '../options';
import { ReconnectContext, ReconnectPolicy } from './ReconnectPolicy';
import DefaultReconnectPolicy from './DefaultReconnectPolicy';

const lossyDataChannel = '_lossy';
const reliableDataChannel = '_reliable';
const maxReconnectRetries = 10;
const minReconnectWait = 2 * 1000;
const maxReconnectDuration = 60 * 1000;
export const maxICEConnectTimeout = 15 * 1000;

enum PCState {
Expand Down Expand Up @@ -96,9 +97,13 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit

private attemptingReconnect: boolean = false;

constructor() {
private reconnectPolicy: ReconnectPolicy;

constructor(private options: RoomOptions) {
super();
this.client = new SignalClient();
this.client.signalLatency = this.options.expSignalLatency;
this.reconnectPolicy = this.options.reconnectPolicy ?? new DefaultReconnectPolicy();
}

async join(
Expand Down Expand Up @@ -437,17 +442,38 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
// websocket reconnect behavior. if websocket is interrupted, and the PeerConnection
// continues to work, we can reconnect to websocket to continue the session
// after a number of retries, we'll close and give up permanently
private handleDisconnect = (connection: string) => {
private handleDisconnect = (connection: string, signalEvents: boolean = false) => {
if (this._isClosed) {
return;
}

log.debug(`${connection} disconnected`);
if (this.reconnectAttempts === 0) {
// only reset start time on the first try
this.reconnectStart = Date.now();
}

const delay = this.reconnectAttempts * this.reconnectAttempts * 300;
const disconnect = (duration: number) => {
log.info(
`could not recover connection after ${this.reconnectAttempts} attempts, ${duration}ms. giving up`,
);
this.emit(EngineEvent.Disconnected);
this.close();
};

const duration = Date.now() - this.reconnectStart;
const delay = this.getNextRetryDelay({
elapsedMs: duration,
retryCount: this.reconnectAttempts,
});

if (delay === null) {
disconnect(duration);
return;
}

log.debug(`reconnecting in ${delay}ms`);

setTimeout(async () => {
if (this._isClosed) {
return;
Expand All @@ -469,16 +495,17 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
try {
this.attemptingReconnect = true;
if (this.fullReconnectOnNext) {
await this.restartConnection();
await this.restartConnection(signalEvents);
} else {
await this.resumeConnection();
await this.resumeConnection(signalEvents);
}
this.reconnectAttempts = 0;
this.fullReconnectOnNext = false;
} catch (e) {
this.reconnectAttempts += 1;
let reconnectRequired = false;
let recoverable = true;
let requireSignalEvents = false;
if (e instanceof UnexpectedConnectionState) {
log.debug('received unrecoverable error', { error: e });
// unrecoverable
Expand All @@ -488,41 +515,43 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
reconnectRequired = true;
}

// when we flip from resume to reconnect, we need to reset reconnectAttempts
// this is needed to fire the right reconnecting events
// when we flip from resume to reconnect
// we need to fire the right reconnecting events
if (reconnectRequired && !this.fullReconnectOnNext) {
this.fullReconnectOnNext = true;
this.reconnectAttempts = 0;
}

const duration = Date.now() - this.reconnectStart;
if (this.reconnectAttempts >= maxReconnectRetries || duration > maxReconnectDuration) {
recoverable = false;
requireSignalEvents = true;
}

if (recoverable) {
this.handleDisconnect('reconnect');
this.handleDisconnect('reconnect', requireSignalEvents);
} else {
log.info(
`could not recover connection after ${maxReconnectRetries} attempts, ${duration}ms. giving up`,
);
this.emit(EngineEvent.Disconnected);
this.close();
disconnect(Date.now() - this.reconnectStart);
}
} finally {
this.attemptingReconnect = false;
}
}, delay);
};

private async restartConnection() {
private getNextRetryDelay(context: ReconnectContext) {
try {
return this.reconnectPolicy.nextRetryDelayInMs(context);
} catch (e) {
log.warn('encountered error in reconnect policy', { error: e });
}

// error in user code with provided reconnect policy, stop reconnecting
return null;
}

private async restartConnection(emitRestarting: boolean = false) {
if (!this.url || !this.token) {
// permanent failure, don't attempt reconnection
throw new UnexpectedConnectionState('could not reconnect, url or token not saved');
}

log.info(`reconnecting, attempt: ${this.reconnectAttempts}`);
if (this.reconnectAttempts === 0) {
if (emitRestarting || this.reconnectAttempts === 0) {
this.emit(EngineEvent.Restarting);
}

Expand Down Expand Up @@ -550,7 +579,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
this.emit(EngineEvent.Restarted, joinResponse);
}

private async resumeConnection(): Promise<void> {
private async resumeConnection(emitResuming: boolean = false): Promise<void> {
if (!this.url || !this.token) {
// permanent failure, don't attempt reconnection
throw new UnexpectedConnectionState('could not reconnect, url or token not saved');
Expand All @@ -561,14 +590,14 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}

log.info(`resuming signal connection, attempt ${this.reconnectAttempts}`);
if (this.reconnectAttempts === 0) {
if (emitResuming || this.reconnectAttempts === 0) {
this.emit(EngineEvent.Resuming);
}

try {
await this.client.reconnect(this.url, this.token);
} catch (e) {
throw new SignalReconnectError();
throw new SignalReconnectError(e);
}
this.emit(EngineEvent.SignalResumed);

Expand Down
25 changes: 25 additions & 0 deletions src/room/ReconnectPolicy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/** Controls reconnecting of the client */
export interface ReconnectPolicy {
/** Called after disconnect was detected
*
* @returns {number | null} Amount of time in milliseconds to delay the next reconnect attempt, `null` signals to stop retrying.
*/
nextRetryDelayInMs(context: ReconnectContext): number | null;
}

export interface ReconnectContext {
/**
* Number of failed reconnect attempts
*/
readonly retryCount: number;

/**
* Elapsed amount of time in milliseconds since the disconnect.
*/
readonly elapsedMs: number;

/**
* Reason for retrying
*/
readonly retryReason?: Error;
}
3 changes: 1 addition & 2 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,8 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
return;
}

this.engine = new RTCEngine();
this.engine = new RTCEngine(this.options);

this.engine.client.signalLatency = this.options.expSignalLatency;
this.engine.client.onParticipantUpdate = this.handleParticipantUpdates;
this.engine.client.onRoomUpdate = this.handleRoomUpdate;
this.engine.client.onSpeakersChanged = this.handleSpeakersChanged;
Expand Down

0 comments on commit 10dcbef

Please sign in to comment.