Skip to content

Commit

Permalink
Add sdp version control to start sending bytes when negotiation has f…
Browse files Browse the repository at this point in the history
…inished (#1518)
  • Loading branch information
jcague authored Feb 17, 2020
1 parent 2a842f7 commit 3f4faad
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 42 deletions.
14 changes: 11 additions & 3 deletions erizo/src/erizo/MediaStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ MediaStream::MediaStream(std::shared_ptr<Worker> worker,
std::shared_ptr<WebRtcConnection> connection,
const std::string& media_stream_id,
const std::string& media_stream_label,
bool is_publisher) :
bool is_publisher,
int session_version) :
audio_enabled_{false}, video_enabled_{false},
media_stream_event_listener_{nullptr},
connection_{std::move(connection)},
Expand All @@ -70,7 +71,8 @@ MediaStream::MediaStream(std::shared_ptr<Worker> worker,
random_generator_{random_device_()},
target_padding_bitrate_{0},
periodic_keyframes_requested_{false},
periodic_keyframe_interval_{0} {
periodic_keyframe_interval_{0},
session_version_{session_version} {
if (is_publisher) {
setVideoSinkSSRC(kDefaultVideoSinkSSRC);
setAudioSinkSSRC(kDefaultAudioSinkSSRC);
Expand Down Expand Up @@ -165,7 +167,7 @@ bool MediaStream::isSinkSSRC(uint32_t ssrc) {
return isVideoSinkSSRC(ssrc) || isAudioSinkSSRC(ssrc);
}

bool MediaStream::setRemoteSdp(std::shared_ptr<SdpInfo> sdp) {
bool MediaStream::setRemoteSdp(std::shared_ptr<SdpInfo> sdp, int session_version_negotiated = -1) {
ELOG_DEBUG("%s message: setting remote SDP to Stream, sending: %d, initialized: %d",
toLog(), sending_, pipeline_initialized_);
if (!sending_) {
Expand All @@ -189,6 +191,12 @@ bool MediaStream::setRemoteSdp(std::shared_ptr<SdpInfo> sdp) {
}
}

if (!isPublisher() && session_version_negotiated >= 0 && session_version_ > session_version_negotiated) {
ELOG_WARN("%s message: too old session version, session_version_: %d, negotiated_session_version: %d",
toLog(), session_version_, session_version_negotiated);
return true;
}

remote_sdp_ = remote_sdp;

if (remote_sdp_->videoBandwidth != 0) {
Expand Down
5 changes: 3 additions & 2 deletions erizo/src/erizo/MediaStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
*/
MediaStream(std::shared_ptr<Worker> worker, std::shared_ptr<WebRtcConnection> connection,
const std::string& media_stream_id, const std::string& media_stream_label,
bool is_publisher);
bool is_publisher, int session_version);
/**
* Destructor.
*/
Expand All @@ -77,7 +77,7 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
void setVideoBitrate(uint32_t bitrate) { video_bitrate_ = bitrate; }
void setMaxVideoBW(uint32_t max_video_bw);
void syncClose();
bool setRemoteSdp(std::shared_ptr<SdpInfo> sdp);
bool setRemoteSdp(std::shared_ptr<SdpInfo> sdp, int session_version_negotiated);

/**
* Sends a PLI Packet
Expand Down Expand Up @@ -236,6 +236,7 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
uint64_t target_padding_bitrate_;
bool periodic_keyframes_requested_;
uint32_t periodic_keyframe_interval_;
int session_version_;

protected:
std::shared_ptr<SdpInfo> remote_sdp_;
Expand Down
25 changes: 13 additions & 12 deletions erizo/src/erizo/WebRtcConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,10 @@ void WebRtcConnection::forEachMediaStreamAsyncNoPromise(
}

boost::future<void> WebRtcConnection::setRemoteSdpInfo(
std::shared_ptr<SdpInfo> sdp) {
std::shared_ptr<SdpInfo> sdp, int received_session_version) {
std::weak_ptr<WebRtcConnection> weak_this = shared_from_this();
auto task_promise = std::make_shared<boost::promise<void>>();
worker_->task([weak_this, sdp, task_promise] {
worker_->task([weak_this, sdp, task_promise, received_session_version] {
if (auto connection = weak_this.lock()) {
ELOG_DEBUG("%s message: setting remote SDPInfo", connection->toLog());
if (!connection->sending_) {
Expand All @@ -293,7 +293,7 @@ boost::future<void> WebRtcConnection::setRemoteSdpInfo(
}
connection->remote_sdp_ = sdp;
connection->notifyUpdateToHandlers();
boost::future<void> future = connection->processRemoteSdp().then(
boost::future<void> future = connection->processRemoteSdp(received_session_version).then(
[task_promise] (boost::future<void>) {
task_promise->set_value();
});
Expand Down Expand Up @@ -395,39 +395,40 @@ std::shared_ptr<SdpInfo> WebRtcConnection::getLocalSdpInfoSync() {
return local_sdp_copy;
}

boost::future<void> WebRtcConnection::setRemoteSdp(const std::string &sdp) {
boost::future<void> WebRtcConnection::setRemoteSdp(const std::string &sdp, int received_session_version) {
std::shared_ptr<boost::promise<void>> p = std::make_shared<boost::promise<void>>();
boost::future<void> f = p->get_future();
asyncTask([sdp, p] (std::shared_ptr<WebRtcConnection> connection) {
asyncTask([sdp, p, received_session_version] (std::shared_ptr<WebRtcConnection> connection) {
ELOG_DEBUG("%s message: setting remote SDP", connection->toLog());
if (!connection->sending_) {
p->set_value();
return;
}

connection->remote_sdp_->initWithSdp(sdp, "");
boost::future<void> f = connection->processRemoteSdp();
boost::future<void> f = connection->processRemoteSdp(received_session_version);
f.then([p](boost::future<void> future) {
p->set_value();
});
});
return f;
}

boost::future<void> WebRtcConnection::setRemoteSdpsToMediaStreams() {
boost::future<void> WebRtcConnection::setRemoteSdpsToMediaStreams(int received_session_version) {
ELOG_DEBUG("%s message: setting remote SDP, streams: %d", toLog(), media_streams_.size());
std::weak_ptr<WebRtcConnection> weak_this = shared_from_this();
std::shared_ptr<SdpInfo> remote_sdp = std::make_shared<SdpInfo>(*remote_sdp_.get());
return forEachMediaStreamAsync([weak_this, remote_sdp](std::shared_ptr<MediaStream> media_stream) {
return forEachMediaStreamAsync([weak_this, remote_sdp, received_session_version]
(std::shared_ptr<MediaStream> media_stream) {
if (auto connection = weak_this.lock()) {
media_stream->setRemoteSdp(remote_sdp);
media_stream->setRemoteSdp(remote_sdp, received_session_version);
ELOG_DEBUG("%s message: setting remote SDP to stream, stream: %s",
connection->toLog(), media_stream->getId());
}
});
}

boost::future<void> WebRtcConnection::processRemoteSdp() {
boost::future<void> WebRtcConnection::processRemoteSdp(int received_session_version = -1) {
ELOG_DEBUG("%s message: processing remote SDP", toLog());
if (!first_remote_sdp_processed_ && local_sdp_->internal_dtls_role == ACTPASS) {
local_sdp_->internal_dtls_role = ACTIVE;
Expand All @@ -436,7 +437,7 @@ boost::future<void> WebRtcConnection::processRemoteSdp() {
ELOG_DEBUG("%s message: process remote sdp, setup: %d", toLog(), local_sdp_->internal_dtls_role);

if (first_remote_sdp_processed_) {
return setRemoteSdpsToMediaStreams();
return setRemoteSdpsToMediaStreams(received_session_version);
}

bundle_ = remote_sdp_->isBundle;
Expand Down Expand Up @@ -500,7 +501,7 @@ boost::future<void> WebRtcConnection::processRemoteSdp() {
}

first_remote_sdp_processed_ = true;
return setRemoteSdpsToMediaStreams();
return setRemoteSdpsToMediaStreams(received_session_version);
}

boost::future<void> WebRtcConnection::addRemoteCandidate(std::string mid, int mLineIndex, std::string sdp) {
Expand Down
8 changes: 4 additions & 4 deletions erizo/src/erizo/WebRtcConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ class WebRtcConnection: public TransportListener, public LogContext, public Hand
void close();
void syncClose();

boost::future<void> setRemoteSdpInfo(std::shared_ptr<SdpInfo> sdp);
boost::future<void> setRemoteSdpInfo(std::shared_ptr<SdpInfo> sdp, int received_session_version);
/**
* Sets the SDP of the remote peer.
* @param sdp The SDP.
* @return true if the SDP was received correctly.
*/
boost::future<void> setRemoteSdp(const std::string &sdp);
boost::future<void> setRemoteSdp(const std::string &sdp, int received_session_version);

boost::future<void> createOffer(bool video_enabled, bool audio_enabled, bool bundle);

Expand Down Expand Up @@ -176,8 +176,8 @@ class WebRtcConnection: public TransportListener, public LogContext, public Hand

private:
bool createOfferSync(bool video_enabled, bool audio_enabled, bool bundle);
boost::future<void> processRemoteSdp();
boost::future<void> setRemoteSdpsToMediaStreams();
boost::future<void> processRemoteSdp(int received_session_version);
boost::future<void> setRemoteSdpsToMediaStreams(int received_session_version);
std::string getJSONCandidate(const std::string& mid, const std::string& sdp);
void trackTransportInfo();
void onRtcpFromTransport(std::shared_ptr<DataPacket> packet, Transport *transport);
Expand Down
4 changes: 2 additions & 2 deletions erizo/src/test/utils/Mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ class MockMediaStream: public MediaStream {
public:
MockMediaStream(std::shared_ptr<Worker> worker, std::shared_ptr<WebRtcConnection> connection,
const std::string& media_stream_id, const std::string& media_stream_label,
std::vector<RtpMap> rtp_mappings, bool is_publisher = true) :
MediaStream(worker, connection, media_stream_id, media_stream_label, is_publisher) {
std::vector<RtpMap> rtp_mappings, bool is_publisher = true, int session_version = -1) :
MediaStream(worker, connection, media_stream_id, media_stream_label, is_publisher, session_version) {
remote_sdp_ = std::make_shared<SdpInfo>(rtp_mappings);
}

Expand Down
4 changes: 2 additions & 2 deletions erizoAPI/MediaStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,11 @@ NAN_METHOD(MediaStream::New) {
std::string stream_label = std::string(*paramLabel);

bool is_publisher = Nan::To<bool>(info[5]).FromJust();

int session_version = Nan::To<int>(info[6]).FromJust();
std::shared_ptr<erizo::Worker> worker = thread_pool->me->getLessUsedWorker();

MediaStream* obj = new MediaStream();
obj->me = std::make_shared<erizo::MediaStream>(worker, wrtc, wrtc_id, stream_label, is_publisher);
obj->me = std::make_shared<erizo::MediaStream>(worker, wrtc, wrtc_id, stream_label, is_publisher, session_version);
obj->msink = obj->me.get();
obj->id_ = wrtc_id;
obj->label_ = stream_label;
Expand Down
6 changes: 4 additions & 2 deletions erizoAPI/WebRtcConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,12 @@ NAN_METHOD(WebRtcConnection::setRemoteSdp) {

Nan::Utf8String param(Nan::To<v8::String>(info[0]).ToLocalChecked());
std::string sdp = std::string(*param);
int received_session_version = Nan::To<int>(info[1]).FromJust();

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
obj->Ref();
me->setRemoteSdp(sdp).then(
me->setRemoteSdp(sdp, received_session_version).then(
[persistent, obj] (boost::future<void>) {
obj->notifyFuture(persistent);
});
Expand All @@ -359,13 +360,14 @@ NAN_METHOD(WebRtcConnection::setRemoteDescription) {

ConnectionDescription* param =
Nan::ObjectWrap::Unwrap<ConnectionDescription>(Nan::To<v8::Object>(info[0]).ToLocalChecked());
int received_session_version = Nan::To<int>(info[1]).FromJust();
auto sdp = std::make_shared<erizo::SdpInfo>(*param->me.get());

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);

obj->Ref();
me->setRemoteSdpInfo(sdp).then(
me->setRemoteSdpInfo(sdp, received_session_version).then(
[persistent, obj] (boost::future<void>) {
obj->notifyFuture(persistent);
});
Expand Down
6 changes: 4 additions & 2 deletions erizo_controller/erizoClient/src/webrtc-stacks/BaseStack.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ const BaseStack = (specInput) => {
specBase.callback({
type: localDesc.type,
sdp: localDesc.sdp,
receivedSessionVersion: latestSessionVersion,
config: { maxVideoBW: specBase.maxVideoBW },
});
};
Expand All @@ -138,6 +139,7 @@ const BaseStack = (specInput) => {
specBase.callback({
type: localDesc.type,
sdp: localDesc.sdp,
receivedSessionVersion: latestSessionVersion,
config: { maxVideoBW: specBase.maxVideoBW },
});
Logger.info('Setting local description', localDesc);
Expand Down Expand Up @@ -454,7 +456,7 @@ const BaseStack = (specInput) => {
new RTCSessionDescription(remoteDesc));
}).then(() => {
specBase.remoteDescriptionSet = true;
specBase.callback({ type: 'offer-noanswer', sdp: localDesc.sdp });
specBase.callback({ type: 'offer-noanswer', sdp: localDesc.sdp, receivedSessionVersion: latestSessionVersion });
}).catch((error) => {
callback('error', 'updateSpec');
rejectMessages.push(`in: protectedNegotiateMaxBW error: ${error}`);
Expand Down Expand Up @@ -522,7 +524,7 @@ const BaseStack = (specInput) => {
}

if (specBase.remoteDescriptionSet) {
specBase.callback({ type: 'candidate', candidate: candidateObject });
specBase.callback({ type: 'candidate', candidate: candidateObject, receivedSessionVersion: latestSessionVersion });
} else {
specBase.localCandidates.push(candidateObject);
Logger.info('Storing candidate: ', specBase.localCandidates.length, candidateObject);
Expand Down
28 changes: 17 additions & 11 deletions erizo_controller/erizoJS/models/Connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,16 @@ class Connection extends events.EventEmitter {
return wrtc;
}

_createMediaStream(id, options = {}, isPublisher = true) {
_createMediaStream(id, options = {}, isPublisher = true, offerFromErizo = false) {
log.debug(`message: _createMediaStream, connectionId: ${this.id}, ` +
`mediaStreamId: ${id}, isPublisher: ${isPublisher}`);
const mediaStream = new addon.MediaStream(this.threadPool, this.wrtc, id,
options.label, Connection._getMediaConfiguration(this.mediaConfiguration), isPublisher);
const sessionVersion = offerFromErizo ? this.sessionVersion : -1;
const mediaStream = new addon.MediaStream(this.threadPool,
this.wrtc, id,
options.label,
Connection._getMediaConfiguration(this.mediaConfiguration),
isPublisher,
sessionVersion);
mediaStream.id = id;
mediaStream.label = options.label;
if (options.metadata) {
Expand Down Expand Up @@ -266,11 +271,11 @@ class Connection extends events.EventEmitter {
return true;
}

addMediaStream(id, options, isPublisher) {
addMediaStream(id, options, isPublisher, offerFromErizo) {
let promise = Promise.resolve();
log.info(`message: addMediaStream, connectionId: ${this.id}, mediaStreamId: ${id}`);
if (this.mediaStreams.get(id) === undefined) {
const mediaStream = this._createMediaStream(id, options, isPublisher);
const mediaStream = this._createMediaStream(id, options, isPublisher, offerFromErizo);
promise = this.wrtc.addMediaStream(mediaStream);
this.mediaStreams.set(id, mediaStream);
}
Expand All @@ -294,11 +299,12 @@ class Connection extends events.EventEmitter {
return promise;
}

setRemoteDescription(sdp) {
setRemoteDescription(sdp, receivedSessionVersion = -1) {
const sdpInfo = SemanticSdp.SDPInfo.processString(sdp);
this.remoteDescription = new SessionDescription(sdpInfo, this.mediaConfiguration);
this._logSdp('setRemoteDescription');
return this.wrtc.setRemoteDescription(this.remoteDescription.connectionDescription);
return this.wrtc.setRemoteDescription(this.remoteDescription.connectionDescription,
receivedSessionVersion);
}

addRemoteCandidate(candidate) {
Expand Down Expand Up @@ -376,26 +382,26 @@ class Connection extends events.EventEmitter {
} else {
onEvent = this.onGathered;
}
return this.setRemoteDescription(msg.sdp)
return this.setRemoteDescription(msg.sdp, msg.receivedSessionVersion)
.then(() => onEvent)
.then(() => this.sendAnswer())
.catch(() => {
log.error('message: Error processing offer/answer in connection, connectionId:', this.id);
});
} else if (msg.type === 'offer-noanswer') {
return this.setRemoteDescription(msg.sdp).catch(() => {
return this.setRemoteDescription(msg.sdp, msg.receivedSessionVersion).catch(() => {
log.error('message: Error processing offer/noanswer in connection, connectionId:', this.id);
});
} else if (msg.type === 'answer') {
return this.setRemoteDescription(msg.sdp).catch(() => {
return this.setRemoteDescription(msg.sdp, msg.receivedSessionVersion).catch(() => {
log.error('message: Error processing answer in connection, connectionId:', this.id);
});
} else if (msg.type === 'candidate') {
this.addRemoteCandidate(msg.candidate);
return Promise.resolve();
} else if (msg.type === 'updatestream') {
if (msg.sdp) {
return this.setRemoteDescription(msg.sdp).catch(() => {
return this.setRemoteDescription(msg.sdp, msg.receivedSessionVersion).catch(() => {
log.error('message: Error processing updatestream in connection, connectionId:', this.id);
});
}
Expand Down
2 changes: 1 addition & 1 deletion erizo_controller/erizoJS/models/Publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ class Publisher extends Source {
this.connection = connection;

this.connection.mediaConfiguration = options.mediaConfiguration;
this.promise = this.connection.addMediaStream(streamId, options, true);
this.promise = this.connection.addMediaStream(streamId, options, true, false);
this.mediaStream = this.connection.getMediaStream(streamId);

this.minVideoBW = options.minVideoBW;
Expand Down
3 changes: 2 additions & 1 deletion erizo_controller/erizoJS/models/Subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ class Subscriber extends NodeClass {
super(clientId, streamId, options);
this.connection = connection;
this.connection.mediaConfiguration = options.mediaConfiguration;
this.promise = this.connection.addMediaStream(this.erizoStreamId, options, false);
this.promise = this.connection.addMediaStream(this.erizoStreamId, options, false,
options.offerFromErizo);
this._mediaStreamListener = this._onMediaStreamEvent.bind(this);
connection.on('media_stream_event', this._mediaStreamListener);
connection.onReady.then(() => {
Expand Down

0 comments on commit 3f4faad

Please sign in to comment.