diff --git a/erizo/src/erizo/WebRtcConnection.cpp b/erizo/src/erizo/WebRtcConnection.cpp index 0619960356..36ce5ba8de 100644 --- a/erizo/src/erizo/WebRtcConnection.cpp +++ b/erizo/src/erizo/WebRtcConnection.cpp @@ -94,10 +94,10 @@ void WebRtcConnection::syncClose() { ELOG_DEBUG("%s message: Close ended", toLog()); } -void WebRtcConnection::close() { +boost::future WebRtcConnection::close() { ELOG_DEBUG("%s message: Async close called", toLog()); std::shared_ptr shared_this = shared_from_this(); - asyncTask([shared_this] (std::shared_ptr connection) { + return asyncTask([shared_this] (std::shared_ptr connection) { shared_this->syncClose(); }); } diff --git a/erizo/src/erizo/WebRtcConnection.h b/erizo/src/erizo/WebRtcConnection.h index c8c32b98a5..5b1920db84 100644 --- a/erizo/src/erizo/WebRtcConnection.h +++ b/erizo/src/erizo/WebRtcConnection.h @@ -82,7 +82,7 @@ class WebRtcConnection: public TransportListener, public LogContext, public Hand * @return True if the candidates are gathered. */ bool init(); - void close(); + boost::future close(); void syncClose(); boost::future setRemoteSdpInfo(std::shared_ptr sdp, int received_session_version); diff --git a/erizo/src/erizo/dtls/DtlsClient.cpp b/erizo/src/erizo/dtls/DtlsClient.cpp index b6b9616455..983bea8e41 100644 --- a/erizo/src/erizo/dtls/DtlsClient.cpp +++ b/erizo/src/erizo/dtls/DtlsClient.cpp @@ -441,6 +441,8 @@ int createCert(const std::string& pAor, int expireDays, int keyLen, X509*& outCe delete[] client_key_buffer; delete[] server_key_buffer; delete keys; + delete client_key; + delete server_key; srtp_profile = mSocket->getSrtpProfile(); diff --git a/erizoAPI/MediaStream.cc b/erizoAPI/MediaStream.cc index 0e8305da17..9c171eaf57 100644 --- a/erizoAPI/MediaStream.cc +++ b/erizoAPI/MediaStream.cc @@ -196,7 +196,7 @@ NAN_METHOD(MediaStream::close) { obj->Ref(); obj->close().then( [persistent, obj] (boost::future) { - ELOG_DEBUG("%s, MediaStream Close is finishied, resolving promise", obj->toLog()); + ELOG_DEBUG("%s, MediaStream Close is finished, resolving promise", obj->toLog()); obj->notifyFuture(persistent); }); info.GetReturnValue().Set(resolver->GetPromise()); diff --git a/erizoAPI/WebRtcConnection.cc b/erizoAPI/WebRtcConnection.cc index 33735243d1..b7ea428b3e 100644 --- a/erizoAPI/WebRtcConnection.cc +++ b/erizoAPI/WebRtcConnection.cc @@ -69,24 +69,11 @@ WebRtcConnection::WebRtcConnection() : closed_{false}, id_{"undefined"} { WebRtcConnection::~WebRtcConnection() { close(); + delete event_callback_; ELOG_DEBUG("%s, message: Destroyed", toLog()); } -void WebRtcConnection::close() { - ELOG_DEBUG("%s, message: Trying to close", toLog()); - if (closed_) { - ELOG_DEBUG("%s, message: Already closed", toLog()); - return; - } - ELOG_DEBUG("%s, message: Closing", toLog()); - if (me) { - me->setWebRtcConnectionEventListener(nullptr); - me->close(); - me.reset(); - } - - boost::mutex::scoped_lock lock(mutex); - +void WebRtcConnection::closeEvents() { if (!uv_is_closing(reinterpret_cast(async_))) { ELOG_DEBUG("%s, message: Closing handle", toLog()); uv_close(reinterpret_cast(async_), destroyWebRtcConnectionAsyncHandle); @@ -98,8 +85,29 @@ void WebRtcConnection::close() { } async_ = nullptr; future_async_ = nullptr; + ELOG_DEBUG("%s, message: Closed Events, pendingRefs: %d", toLog(), refs_); +} + +boost::future WebRtcConnection::close() { + auto close_promise = std::make_shared>(); + ELOG_DEBUG("%s, message: Trying to close", toLog()); + if (closed_) { + ELOG_DEBUG("%s, message: Already closed", toLog()); + close_promise->set_value(""); + return close_promise->get_future(); + } closed_ = true; + + ELOG_DEBUG("%s, message: Closing", toLog()); + if (me) { + me->setWebRtcConnectionEventListener(nullptr); + me->close().then([this, close_promise] (boost::future) { + close_promise->set_value(std::string("webrtcconnection_closed")); + me.reset(); + }); + } ELOG_DEBUG("%s, message: Closed", toLog()); + return close_promise->get_future(); } std::string WebRtcConnection::toLog() { @@ -252,6 +260,7 @@ NAN_METHOD(WebRtcConnection::New) { rtp_mappings, ext_mappings, enable_connection_quality_check, obj); obj->Wrap(info.This()); + obj->Ref(); info.GetReturnValue().Set(info.This()); } else { // TODO(pedro) Check what happens here @@ -260,7 +269,22 @@ NAN_METHOD(WebRtcConnection::New) { NAN_METHOD(WebRtcConnection::close) { WebRtcConnection* obj = Nan::ObjectWrap::Unwrap(info.Holder()); - obj->close(); + std::shared_ptr me = obj->me; + v8::Local resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked(); + if (!me) { + resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing(); + info.GetReturnValue().Set(resolver->GetPromise()); + return; + } + + Nan::Persistent *persistent = new Nan::Persistent(resolver); + obj->close().then( + [persistent, obj] (boost::future fut) { + ELOG_DEBUG("%s, message: WebRTCConnection Close is finished, resolving promise", obj->toLog()); + ResultVariant result = fut.get(); + obj->notifyFuture(persistent, result); + }); + info.GetReturnValue().Set(resolver->GetPromise()); } NAN_METHOD(WebRtcConnection::init) { @@ -279,7 +303,10 @@ NAN_METHOD(WebRtcConnection::init) { NAN_METHOD(WebRtcConnection::createOffer) { WebRtcConnection* obj = Nan::ObjectWrap::Unwrap(info.Holder()); std::shared_ptr me = obj->me; + v8::Local resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked(); if (!me) { + resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing(); + info.GetReturnValue().Set(resolver->GetPromise()); return; } @@ -290,9 +317,8 @@ NAN_METHOD(WebRtcConnection::createOffer) { bool audio_enabled = Nan::To(info[1]).FromJust(); bool bundle = Nan::To(info[2]).FromJust(); - v8::Local resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked(); Nan::Persistent *persistent = new Nan::Persistent(resolver); - obj->Ref(); + me->createOffer(video_enabled, audio_enabled, bundle).then( [persistent, obj] (boost::future) { obj->notifyFuture(persistent); @@ -331,8 +357,11 @@ NAN_METHOD(WebRtcConnection::setMetadata) { NAN_METHOD(WebRtcConnection::setRemoteDescription) { WebRtcConnection* obj = Nan::ObjectWrap::Unwrap(info.Holder()); std::shared_ptr me = obj->me; + + v8::Local resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked(); if (!me) { - info.GetReturnValue().Set(Nan::New(false)); + resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing(); + info.GetReturnValue().Set(resolver->GetPromise()); return; } @@ -341,10 +370,8 @@ NAN_METHOD(WebRtcConnection::setRemoteDescription) { int received_session_version = Nan::To(info[1]).FromJust(); auto sdp = std::make_shared(*param->me.get()); - v8::Local resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked(); Nan::Persistent *persistent = new Nan::Persistent(resolver); - obj->Ref(); me->setRemoteSdpInfo(sdp, received_session_version).then( [persistent, obj] (boost::future) { obj->notifyFuture(persistent); @@ -356,13 +383,16 @@ NAN_METHOD(WebRtcConnection::setRemoteDescription) { NAN_METHOD(WebRtcConnection::getLocalDescription) { WebRtcConnection* obj = Nan::ObjectWrap::Unwrap(info.Holder()); std::shared_ptr me = obj->me; + + v8::Local resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked(); if (!me) { + resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing(); + info.GetReturnValue().Set(resolver->GetPromise()); return; } - v8::Local resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked(); - Nan::Persistent *persistent = new Nan::Persistent(resolver); - obj->Ref(); + Nan::Persistent *persistent = new Nan::Persistent(resolver); + ELOG_DEBUG("%s, message: getLocalDescription", obj->toLog()); me->getLocalSdpInfo().then( [persistent, obj] (boost::future> fut) { std::shared_ptr sdp_info = fut.get(); @@ -474,16 +504,19 @@ NAN_METHOD(WebRtcConnection::getConnectionQualityLevel) { NAN_METHOD(WebRtcConnection::addMediaStream) { WebRtcConnection* obj = Nan::ObjectWrap::Unwrap(info.Holder()); std::shared_ptr me = obj->me; + + v8::Local resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked(); if (!me) { + resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing(); + info.GetReturnValue().Set(resolver->GetPromise()); return; } MediaStream* param = Nan::ObjectWrap::Unwrap(Nan::To(info[0]).ToLocalChecked()); auto ms = std::shared_ptr(param->me); - v8::Local resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked(); Nan::Persistent *persistent = new Nan::Persistent(resolver); - obj->Ref(); + me->addMediaStream(ms).then( [persistent, obj] (boost::future) { obj->notifyFuture(persistent); @@ -495,16 +528,19 @@ NAN_METHOD(WebRtcConnection::addMediaStream) { NAN_METHOD(WebRtcConnection::removeMediaStream) { WebRtcConnection* obj = Nan::ObjectWrap::Unwrap(info.Holder()); std::shared_ptr me = obj->me; + + v8::Local resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked(); if (!me) { + resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing(); + info.GetReturnValue().Set(resolver->GetPromise()); return; } Nan::Utf8String param(Nan::To(info[0]).ToLocalChecked()); std::string stream_id = std::string(*param); - v8::Local resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked(); Nan::Persistent *persistent = new Nan::Persistent(resolver); - obj->Ref(); + me->removeMediaStream(stream_id).then( [persistent, obj] (boost::future) { obj->notifyFuture(persistent); @@ -562,27 +598,36 @@ NAUV_WORK_CB(WebRtcConnection::eventsCallback) { void WebRtcConnection::notifyFuture(Nan::Persistent *persistent, ResultVariant result) { boost::mutex::scoped_lock lock(mutex); if (!future_async_) { + ELOG_DEBUG("%s, message: Future async does not exist anymore", toLog()); return; } + ELOG_DEBUG("%s, message: Added future to async send", toLog()); ResultPair result_pair(persistent, result); futures.push(result_pair); future_async_->data = this; + Ref(); uv_async_send(future_async_); } NAUV_WORK_CB(WebRtcConnection::promiseResolver) { Nan::HandleScope scope; WebRtcConnection* obj = reinterpret_cast(async->data); - if (!obj || !obj->me) { + if (!obj) { + ELOG_DEBUG("message: promiseResolver with null object"); return; } + bool closed = false; boost::mutex::scoped_lock lock(obj->mutex); - ELOG_DEBUG("%s, message: promiseResolver", obj->toLog()); + ELOG_DEBUG("%s, message: promiseResolver, refs: %d", obj->toLog(), obj->futures.size()); while (!obj->futures.empty()) { auto persistent = obj->futures.front().first; v8::Local resolver = Nan::New(*persistent); ResultVariant r = obj->futures.front().second; if (boost::get(&r) != nullptr) { + std::string result = boost::get(r); + if (result == "webrtcconnection_closed") { + closed = true; + } resolver->Resolve(Nan::GetCurrentContext(), Nan::New(boost::get(r).c_str()).ToLocalChecked()) .IsNothing(); } else if (boost::get>(&r) != nullptr) { @@ -598,8 +643,14 @@ NAUV_WORK_CB(WebRtcConnection::promiseResolver) { persistent->Reset(); delete persistent; obj->futures.pop(); - obj->Unref(); v8::Isolate::GetCurrent()->RunMicrotasks(); + obj->Unref(); + } + + ELOG_DEBUG("%s, message: promiseResolver finished, refs: %d, closed: %d", obj->toLog(), + obj->refs_, obj->closed_); + if (closed) { + obj->closeEvents(); + obj->Unref(); } - ELOG_DEBUG("%s, message: promiseResolver finished", obj->toLog()); } diff --git a/erizoAPI/WebRtcConnection.h b/erizoAPI/WebRtcConnection.h index a1864be4bb..60a94adee6 100644 --- a/erizoAPI/WebRtcConnection.h +++ b/erizoAPI/WebRtcConnection.h @@ -53,7 +53,8 @@ class WebRtcConnection : public erizo::WebRtcConnectionEventListener, ~WebRtcConnection(); std::string toLog(); - void close(); + void closeEvents(); + boost::future close(); Nan::Callback *event_callback_; uv_async_t *async_; diff --git a/erizo_controller/erizoJS/erizoJSController.js b/erizo_controller/erizoJS/erizoJSController.js index 1f64368dfd..1a2f67dc0e 100644 --- a/erizo_controller/erizoJS/erizoJSController.js +++ b/erizo_controller/erizoJS/erizoJSController.js @@ -114,6 +114,7 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => { const closePromise = node.close(sendOffer); return closePromise.then(() => { + log.debug(`message: Node Closed, clientId: ${node.clientId}, streamId: ${node.streamId}`); const client = clients.get(clientId); if (client === undefined) { log.debug('message: trying to close node with no associated client,' + diff --git a/erizo_controller/erizoJS/models/Connection.js b/erizo_controller/erizoJS/models/Connection.js index b9dae5f790..da304f3a53 100644 --- a/erizo_controller/erizoJS/models/Connection.js +++ b/erizo_controller/erizoJS/models/Connection.js @@ -163,8 +163,11 @@ class Connection extends events.EventEmitter { } getLocalSdp() { + if (!this.wrtc) { + return Promise.resolve(); + } return this.wrtc.getLocalDescription().then((desc) => { - if (!desc) { + if (!this.wrtc || !desc) { log.error('Cannot get local description,', logger.objectToLog(this.options), logger.objectToLog(this.options.metadata)); return ''; @@ -314,12 +317,12 @@ class Connection extends events.EventEmitter { this.mediaStreams.delete(id); return Promise.all([removePromise, closePromise]).then(() => { if (sendOffer) { - return this.sendOffer(); + this.sendOffer(); } return Promise.resolve(); }); } - log.error(`message: Trying to remove mediaStream not found, id: ${id},`, + log.error(`message: Trying to remove mediaStream not found, clientId: ${this.clientId}, streamId: ${id}`, logger.objectToLog(this.options), logger.objectToLog(this.options.metadata)); return promise; } @@ -488,9 +491,14 @@ class Connection extends events.EventEmitter { promises.push(mediaStream.close()); }); Promise.all(promises).then(() => { - this.wrtc.close(); + log.debug(`message: Closing WRTC, id: ${this.id},`, + logger.objectToLog(this.options), logger.objectToLog(this.options.metadata)); + this.wrtc.close().then(() => { + log.debug(`message: WRTC closed, id: ${this.id},`, + logger.objectToLog(this.options), logger.objectToLog(this.options.metadata)); + delete this.wrtc; + }); this.mediaStreams.clear(); - delete this.wrtc; }); } } diff --git a/erizo_controller/erizoJS/models/Publisher.js b/erizo_controller/erizoJS/models/Publisher.js index 477165c726..df74e7f48f 100644 --- a/erizo_controller/erizoJS/models/Publisher.js +++ b/erizo_controller/erizoJS/models/Publisher.js @@ -416,7 +416,7 @@ class Publisher extends Source { } close() { - const removeMediaStreamPromise = this.connection.removeMediaStream(this.mediaStream.id); + const removeMediaStreamPromise = this.connection.removeMediaStream(this.mediaStream.id, false); if (this.mediaStream.monitorInterval) { clearInterval(this.mediaStream.monitorInterval); } diff --git a/erizo_controller/erizoJS/models/Subscriber.js b/erizo_controller/erizoJS/models/Subscriber.js index b5862dec4a..6fa9535bce 100644 --- a/erizo_controller/erizoJS/models/Subscriber.js +++ b/erizo_controller/erizoJS/models/Subscriber.js @@ -88,11 +88,12 @@ class Subscriber extends NodeClass { } close(sendOffer = true) { - log.debug(`message: Closing subscriber, streamId:${this.streamId}, `, + log.debug(`message: Closing subscriber, clientId: ${this.clientId}, streamId: ${this.streamId}, `, logger.objectToLog(this.options), logger.objectToLog(this.options.metadata)); this.publisher = undefined; let promise = Promise.resolve(); if (this.connection) { + log.debug(`message: Removing Media Stream, clientId: ${this.clientId}, streamId: ${this.streamId}`); promise = this.connection.removeMediaStream(this.mediaStream.id, sendOffer); this.connection.removeListener('media_stream_event', this._mediaStreamListener); } diff --git a/test/negotiation/utils/ClientStream.js b/test/negotiation/utils/ClientStream.js index 83e54befb0..c3d79bfa62 100644 --- a/test/negotiation/utils/ClientStream.js +++ b/test/negotiation/utils/ClientStream.js @@ -1,7 +1,8 @@ +let currentClientStreamId = 0; class ClientStream { constructor(page) { this.page = page; - this.id = parseInt(Math.random() * 10000); + this.id = currentClientStreamId++; this.audio = true; this.video = true; this.data = true; diff --git a/test/negotiation/utils/NegotiationTest.js b/test/negotiation/utils/NegotiationTest.js index 9233ac6181..e1848c3c5b 100644 --- a/test/negotiation/utils/NegotiationTest.js +++ b/test/negotiation/utils/NegotiationTest.js @@ -9,6 +9,7 @@ const ErizoConnection = require('./ErizoConnection'); const SdpChecker = require('./SdpUtils'); let browser; +let currentErizoStreamId = 10; before(async function() { this.timeout(30000); @@ -33,8 +34,9 @@ after(async function() { }); -const describeNegotiationTest = function(title, test) { - describe(title, function() { +const describeNegotiationTest = function(title, test, only = false) { + const describeImpl = only ? describe.only : describe; + describeImpl(title, function() { this.timeout(50000); const ctx = { @@ -78,7 +80,7 @@ const describeNegotiationTest = function(title, test) { }; ctx.createErizoStream = async function() { - const id = parseInt(Math.random() * 1000, 0); + const id = parseInt(currentErizoStreamId++, 0); const stream = { id, label: id.toString(), audio: true, video: true, addedToConnection: false }; ctx.erizoStreams[id] = stream; return stream; @@ -114,6 +116,7 @@ const describeNegotiationTest = function(title, test) { const currentProcessPath = process.cwd(); const htmlPath = path.join(currentProcessPath, '../../extras/basic_example/public/index.html'); page = await browser.newPage(); + // page.on('console', msg => console.log('PAGE LOG:', msg.text())); await page.goto(`file://${htmlPath}?forceStart=1`); }); @@ -358,12 +361,27 @@ const describeNegotiationTest = function(title, test) { before(async function() { clientStream = await ctx.createClientStream(); erizoStream = await ctx.createErizoStream(); + let processOfferPromise, addStreamPromise, waitForSignalingPromise; for (const step of steps) { switch (step) { case 'client-add-stream': await ctx.client.addStream(clientStream); break; + case 'client-add-stream-and-process-erizo-offer': + addStreamPromise = ctx.client.addStream(clientStream); + processOfferPromise = ctx.client.processSignalingMessage(erizoOffer); + await addStreamPromise; + await processOfferPromise; + break; + case 'client-get-offer-and-process-erizo-offer': + processOfferPromise = ctx.client.processSignalingMessage(erizoOffer); + waitForSignalingPromise = ctx.client.waitForSignalingMessage(); + await waitForSignalingPromise; + clientOfferPromise = ctx.client.getSignalingMessage(); + await processOfferPromise; + clientOffer = await clientOfferPromise; + break; case 'erizo-publish-stream': await ctx.erizo.publishStream(clientStream); break;