Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix negotiations by better protecting access to PeerConnection #1371

Merged
merged 5 commits into from
Mar 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions erizo_controller/erizoClient/src/Room.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,20 @@ const Room = (altIo, altConnectionHelpers, altConnectionManager, specInput) => {
that.dispatchEvent(evt2);
};

const dispatchStreamUnsubscribed = (streamInput) => {
const maybeDispatchStreamUnsubscribed = (streamInput) => {
const stream = streamInput;
Logger.info('Stream unsubscribed');
const evt2 = StreamEvent({ type: 'stream-unsubscribed', stream });
that.dispatchEvent(evt2);
Logger.debug(`maybeDispatchStreamUnsubscribed - unsubscribe id ${stream.getID()}`, stream.unsubscribing);
if (stream && stream.unsubscribing.callbackReceived && stream.unsubscribing.pcEventReceived) {
Logger.info(`Dispatching Stream unsubscribed ${stream.getID()}`);
removeStream(stream);
delete stream.failed;
const evt2 = StreamEvent({ type: 'stream-unsubscribed', stream });
stream.unsubscribing.callbackReceived = false;
stream.unsubscribing.pcEventReceived = false;
that.dispatchEvent(evt2);
} else {
Logger.debug(`Not dispatching stream unsubscribed yet ${stream.getID()}`);
}
};

const getP2PConnectionOptions = (stream, peerSocket) => {
Expand Down Expand Up @@ -176,7 +185,9 @@ const Room = (altIo, altConnectionHelpers, altConnectionManager, specInput) => {
const onRemoteStreamRemovedListener = (label) => {
that.remoteStreams.forEach((stream) => {
if (!stream.local && stream.getLabel() === label) {
dispatchStreamUnsubscribed(stream);
const streamToRemove = stream;
streamToRemove.unsubscribing.pcEventReceived = true;
maybeDispatchStreamUnsubscribed(streamToRemove);
}
});
};
Expand Down Expand Up @@ -271,6 +282,8 @@ const Room = (altIo, altConnectionHelpers, altConnectionManager, specInput) => {

if (stream && stream.pc && !stream.failed) {
stream.pc.processSignalingMessage(arg.mess);
} else {
Logger.debug('Failed applying a signaling message, stream is no longer present');
}
};

Expand Down Expand Up @@ -852,9 +865,9 @@ const Room = (altIo, altConnectionHelpers, altConnectionManager, specInput) => {
callback(undefined, error);
return;
}
removeStream(stream);
delete stream.failed;
callback(true);
stream.unsubscribing.callbackReceived = true;
maybeDispatchStreamUnsubscribed(stream);
}, () => {
Logger.error('Error calling unsubscribe.');
});
Expand Down
4 changes: 4 additions & 0 deletions erizo_controller/erizoClient/src/Stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const Stream = (altConnectionHelpers, specInput) => {
that.desktopStreamId = spec.desktopStreamId;
that.audioMuted = false;
that.videoMuted = false;
that.unsubscribing = {
callbackReceived: false,
pcEventReceived: false,
};
that.p2p = false;
that.ConnectionHelpers =
altConnectionHelpers === undefined ? ConnectionHelpers : altConnectionHelpers;
Expand Down
51 changes: 51 additions & 0 deletions erizo_controller/erizoClient/src/utils/FunctionQueue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
class FunctionQueue {
constructor() {
this._enqueuing = false;
this._queuedArgs = [];
}

protectFunction(protectedFunction) {
return this._protectedFunction.bind(this, protectedFunction);
}

isEnqueueing() {
return this._enqueuing;
}

startEnqueuing() {
this._enqueuing = true;
}

stopEnqueuing() {
this._enqueuing = false;
}

nextInQueue() {
if (this._queuedArgs.length > 0) {
const { protectedFunction, args } = this._queuedArgs.shift();
protectedFunction(...args);
}
}

dequeueAll() {
const queuedArgs = this._queuedArgs;
this._queuedArgs = [];
queuedArgs.forEach(({ protectedFunction, args }) => {
protectedFunction(...args);
});
}

_protectedFunction(protectedFunction, ...args) {
if (this.isEnqueueing()) {
this._enqueue(protectedFunction, ...args);
return;
}
protectedFunction(...args);
}

_enqueue(protectedFunction, ...args) {
this._queuedArgs.push({ protectedFunction, args });
}
}

export default FunctionQueue;
136 changes: 82 additions & 54 deletions erizo_controller/erizoClient/src/webrtc-stacks/BaseStack.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
/* global RTCSessionDescription, RTCIceCandidate, RTCPeerConnection */
// eslint-disable-next-line
import SemanticSdp from '../../../common/semanticSdp/SemanticSdp';
import Setup from '../../../common/semanticSdp/Setup';

import SdpHelpers from '../utils/SdpHelpers';
import Logger from '../utils/Logger';
import FunctionQueue from '../utils/FunctionQueue';

const BaseStack = (specInput) => {
const that = {};
const specBase = specInput;
const offerQueue = [];
const negotiationQueue = new FunctionQueue();
const firstOfferQueue = new FunctionQueue();
let firstOfferCreated = false;
let localDesc;
let remoteDesc;
let localSdp;
let remoteSdp;
let processOffer;
let isNegotiating = false;
let latestSessionVersion = -1;

Logger.info('Starting Base stack', specBase);
Expand Down Expand Up @@ -92,17 +94,6 @@ const BaseStack = (specInput) => {
}
};

const checkOfferQueue = () => {
if (!isNegotiating && offerQueue.length > 0) {
const args = offerQueue.shift();
if (args[0] === 'local') {
that.createOffer(args[1], args[2], args[3]);
} else {
processOffer(args[1]);
}
}
};

const setLocalDescForOffer = (isSubscribe, streamId, sessionDescription) => {
localDesc = sessionDescription;
if (!isSubscribe) {
Expand All @@ -122,6 +113,7 @@ const BaseStack = (specInput) => {

const setLocalDescForAnswer = (sessionDescription) => {
localDesc = sessionDescription;
localDesc.type = 'answer';
localSdp = SemanticSdp.SDPInfo.processString(localDesc.sdp);
SdpHelpers.setMaxBW(localSdp, specBase);
localDesc.sdp = localSdp.toString();
Expand All @@ -132,41 +124,56 @@ const BaseStack = (specInput) => {
config: { maxVideoBW: specBase.maxVideoBW },
});
Logger.info('Setting local description', localDesc);
that.peerConnection.setLocalDescription(localDesc).then(() => {
isNegotiating = false;
checkOfferQueue();
successCallback();
}).catch(errorCallback);
Logger.debug('processOffer - Local Description', localDesc.type, localDesc.sdp);
return that.peerConnection.setLocalDescription(localDesc);
};

const configureLocalSdpAsOffer = () => {
localDesc.type = 'offer';
localSdp = SemanticSdp.SDPInfo.processString(localDesc.sdp);
SdpHelpers.setMaxBW(localSdp, specBase);

localSdp.medias.forEach((media) => {
if (media.getSetup() !== Setup.ACTPASS) {
media.setSetup(Setup.ACTPASS);
}
});
localDesc.sdp = localSdp.toString();
that.localSdp = localSdp;
};

processOffer = (message) => {
const processOffer = negotiationQueue.protectFunction((message) => {
const msg = message;
if (isNegotiating) {
offerQueue.push(['remote', message]);
return;
}
remoteSdp = SemanticSdp.SDPInfo.processString(msg.sdp);

const sessionVersion = remoteSdp && remoteSdp.origin && remoteSdp.origin.sessionVersion;
if (latestSessionVersion >= sessionVersion) {
Logger.warning(`message: processOffer discarding old sdp sessionVersion: ${sessionVersion}, latestSessionVersion: ${latestSessionVersion}`);
return;
}
isNegotiating = true;
negotiationQueue.startEnqueuing();
latestSessionVersion = sessionVersion;

SdpHelpers.setMaxBW(remoteSdp, specBase);
msg.sdp = remoteSdp.toString();
that.remoteSdp = remoteSdp;
that.peerConnection.setRemoteDescription(msg).then(() => {
that.peerConnection.createAnswer(that.mediaConstraints)
.then(setLocalDescForAnswer)
.catch(errorCallback.bind(null, 'createAnswer', undefined));
specBase.remoteDescriptionSet = true;
}).catch(errorCallback.bind(null, 'process Offer', undefined));
};
Logger.debug('processOffer - Remote Description', msg.type, msg.sdp);

const processAnswer = (message) => {
that.peerConnection.setRemoteDescription(msg)
.then(() => {
specBase.remoteDescriptionSet = true;
return that.peerConnection.createAnswer(that.mediaConstraints);
})
.then(setLocalDescForAnswer.bind(this))
.then(successCallback.bind(this))
.catch(errorCallback.bind(this, 'createAnswer'))
.then(() => {
negotiationQueue.stopEnqueuing();
negotiationQueue.nextInQueue();
});
});

const processAnswer = negotiationQueue.protectFunction((message) => {
const msg = message;

remoteSdp = SemanticSdp.SDPInfo.processString(msg.sdp);
Expand All @@ -175,21 +182,26 @@ const BaseStack = (specInput) => {
Logger.warning(`processAnswer discarding old sdp, sessionVersion: ${sessionVersion}, latestSessionVersion: ${latestSessionVersion}`);
return;
}
Logger.info('Set remote and local description');
negotiationQueue.startEnqueuing();
latestSessionVersion = sessionVersion;
Logger.info('Set remote and local description');

SdpHelpers.setMaxBW(remoteSdp, specBase);
that.setStartVideoBW(remoteSdp);
that.setHardMinVideoBW(remoteSdp);

msg.sdp = remoteSdp.toString();
Logger.debug('Remote Description', msg.sdp);
Logger.debug('Local Description', localDesc.sdp);

configureLocalSdpAsOffer();

Logger.debug('processAnswer - Remote Description', msg.type, msg.sdp);
Logger.debug('processAnswer - Local Description', msg.type, localDesc.sdp);
that.remoteSdp = remoteSdp;

remoteDesc = msg;
that.peerConnection.setLocalDescription(localDesc).then(() => {
that.peerConnection.setRemoteDescription(new RTCSessionDescription(msg)).then(() => {
that.peerConnection.setLocalDescription(localDesc)
.then(() => that.peerConnection.setRemoteDescription(new RTCSessionDescription(msg)))
.then(() => {
specBase.remoteDescriptionSet = true;
Logger.info('Candidates to be added: ', specBase.remoteCandidates.length,
specBase.remoteCandidates);
Expand All @@ -202,11 +214,16 @@ const BaseStack = (specInput) => {
// IMPORTANT: preserve ordering of candidates
specBase.callback({ type: 'candidate', candidate: specBase.localCandidates.shift() });
}
isNegotiating = false;
checkOfferQueue();
}).catch(errorCallback.bind(null, 'processAnswer', undefined));
}).catch(errorCallback.bind(null, 'processAnswer', undefined));
};
})
.catch(errorCallback.bind(null, 'processAnswer', undefined))
.then(() => {
firstOfferCreated = true;
firstOfferQueue.stopEnqueuing();
firstOfferQueue.dequeueAll();
negotiationQueue.stopEnqueuing();
negotiationQueue.nextInQueue();
});
});

const processNewCandidate = (message) => {
const msg = message;
Expand Down Expand Up @@ -334,23 +351,34 @@ const BaseStack = (specInput) => {
}
};

that.createOffer = (isSubscribe = false, forceOfferToReceive = false, streamId = '') => {
const _createOfferOnPeerConnection = negotiationQueue.protectFunction((isSubscribe = false, streamId = '') => {
negotiationQueue.startEnqueuing();
Logger.debug('Creating offer', that.mediaConstraints, streamId);
that.peerConnection.createOffer(that.mediaConstraints)
.then(setLocalDescForOffer.bind(null, isSubscribe, streamId))
.catch(errorCallback.bind(null, 'Create Offer', undefined))
.then(() => {
negotiationQueue.stopEnqueuing();
negotiationQueue.nextInQueue();
});
});

// We need to protect it against calling multiple times to createOffer.
// Otherwise it could change the ICE credentials before calling setLocalDescription
// the first time in Chrome.
that.createOffer = firstOfferQueue.protectFunction((isSubscribe = false, forceOfferToReceive = false, streamId = '') => {
if (!firstOfferCreated) {
firstOfferQueue.startEnqueuing();
}

if (!isSubscribe && !forceOfferToReceive) {
that.mediaConstraints = {
offerToReceiveVideo: false,
offerToReceiveAudio: false,
};
}
if (isNegotiating) {
offerQueue.push(['local', isSubscribe, forceOfferToReceive, streamId]);
return;
}
isNegotiating = true;
Logger.debug('Creating offer', that.mediaConstraints, streamId);
that.peerConnection.createOffer(that.mediaConstraints)
.then(setLocalDescForOffer.bind(null, isSubscribe, streamId))
.catch(errorCallback.bind(null, 'Create Offer', undefined));
};
_createOfferOnPeerConnection(isSubscribe, streamId);
});

that.addStream = (stream) => {
that.peerConnection.addStream(stream);
Expand Down
14 changes: 10 additions & 4 deletions erizo_controller/erizoJS/erizoJSController.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,21 @@ exports.ErizoJSController = (threadPool, ioThreadPool) => {
const connection = node.connection;
log.debug(`message: closeNode, clientId: ${node.clientId}, streamId: ${node.streamId}`);

node.close();
const closePromise = node.close();

const client = clients.get(clientId);
if (client === undefined) {
log.debug('message: trying to close node with no associated client,' +
`clientId: ${clientId}, streamId: ${node.streamId}`);
return;
return Promise.resolve();
}

const remainingConnections = client.maybeCloseConnection(connection.id);
if (remainingConnections === 0) {
log.debug(`message: Removing empty client from list, clientId: ${client.id}`);
clients.delete(client.id);
}
return closePromise;
};

that.rovMessage = (args, callback) => {
Expand Down Expand Up @@ -307,10 +308,15 @@ exports.ErizoJSController = (threadPool, ioThreadPool) => {
const subscriber = publisher.getSubscriber(clientId);
log.info(`message: removing subscriber, streamId: ${subscriber.streamId}, ` +
`clientId: ${clientId}`);
closeNode(subscriber);
publisher.removeSubscriber(clientId);
return closeNode(subscriber).then(() => {
publisher.removeSubscriber(clientId);
log.info(`message: subscriber node Closed, streamId: ${subscriber.streamId}`);
callback('callback', true);
});
}
log.warn(`message: removeSubscriber no publisher has this subscriber, clientId: ${clientId}, streamId: ${streamId}`);
callback('callback', true);
return Promise.resolve();
};

/*
Expand Down
Loading