From 273f5ed3a133c57438efc0d2ac82426c84882cd7 Mon Sep 17 00:00:00 2001 From: Pedro Rodriguez Date: Wed, 27 Oct 2021 11:58:12 +0200 Subject: [PATCH] Fix WebRTCConnections not properly waiting for IceConnections to close. (#1765) --- erizo/src/erizo/DtlsTransport.cpp | 24 ++++++----- erizo/src/erizo/DtlsTransport.h | 2 +- erizo/src/erizo/IceConnection.h | 3 +- erizo/src/erizo/LibNiceConnection.cpp | 30 ++++++++++---- erizo/src/erizo/LibNiceConnection.h | 3 +- erizo/src/erizo/NicerConnection.cpp | 26 ++++++++++-- erizo/src/erizo/NicerConnection.h | 3 +- erizo/src/erizo/Transport.h | 2 +- erizo/src/erizo/UnencryptedTransport.cpp | 13 +++--- erizo/src/erizo/UnencryptedTransport.h | 2 +- erizo/src/erizo/WebRtcConnection.cpp | 51 ++++++++++++++++-------- erizo/src/erizo/WebRtcConnection.h | 2 +- erizo/src/test/utils/Mocks.h | 6 ++- 13 files changed, 115 insertions(+), 52 deletions(-) diff --git a/erizo/src/erizo/DtlsTransport.cpp b/erizo/src/erizo/DtlsTransport.cpp index 8e0d7d1bcb..5b50967bf7 100644 --- a/erizo/src/erizo/DtlsTransport.cpp +++ b/erizo/src/erizo/DtlsTransport.cpp @@ -122,7 +122,7 @@ DtlsTransport::DtlsTransport(MediaType med, const std::string &transport_name, c DtlsTransport::~DtlsTransport() { if (this->state_ != TRANSPORT_FINISHED) { - this->close(); + ELOG_WARN("%s message: Destructor called but transport has not been properly closed", toLog()); } } @@ -133,7 +133,7 @@ void DtlsTransport::start() { ice_->start(); } -void DtlsTransport::close() { +boost::future DtlsTransport::close() { ELOG_DEBUG("%s message: closing", toLog()); running_ = false; if (rtp_timeout_checker_) { @@ -142,15 +142,17 @@ void DtlsTransport::close() { if (rtcp_timeout_checker_) { rtcp_timeout_checker_->cancel(); } - ice_->close(); - if (dtlsRtp) { - dtlsRtp->close(); - } - if (dtlsRtcp) { - dtlsRtcp->close(); - } - this->state_ = TRANSPORT_FINISHED; - ELOG_DEBUG("%s message: closed", toLog()); + std::shared_ptr shared_this = std::dynamic_pointer_cast(shared_from_this()); + return ice_->close().then([shared_this] (boost::future) { + if (shared_this->dtlsRtp) { + shared_this->dtlsRtp->close(); + } + if (shared_this->dtlsRtcp) { + shared_this->dtlsRtcp->close(); + } + shared_this->state_ = TRANSPORT_FINISHED; + ELOG_DEBUG("%s message: closed", shared_this->toLog()); + }); } void DtlsTransport::maybeRestartIce(std::string username, std::string password) { diff --git a/erizo/src/erizo/DtlsTransport.h b/erizo/src/erizo/DtlsTransport.h index abbe746df1..265825ec50 100644 --- a/erizo/src/erizo/DtlsTransport.h +++ b/erizo/src/erizo/DtlsTransport.h @@ -29,7 +29,7 @@ class DtlsTransport : dtls::DtlsReceiver, public Transport { std::string getMyFingerprint() const; static bool isDtlsPacket(const char* buf, int len); void start() override; - void close() override; + boost::future close() override; void maybeRestartIce(std::string username, std::string password) override; void onIceData(packetPtr packet) override; void onCandidate(const CandidateInfo &candidate, IceConnection *conn) override; diff --git a/erizo/src/erizo/IceConnection.h b/erizo/src/erizo/IceConnection.h index 88a1c4d0f7..d27fea57ce 100644 --- a/erizo/src/erizo/IceConnection.h +++ b/erizo/src/erizo/IceConnection.h @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -102,7 +103,7 @@ class IceConnection : public LogContext { virtual void onData(unsigned int component_id, char* buf, int len) {} virtual void onData(unsigned int component_id, packetPtr) {} virtual CandidatePair getSelectedPair() = 0; - virtual void close() = 0; + virtual boost::future close() = 0; virtual void maybeRestartIce(std::string remote_ufrag, std::string remote_pass) = 0; virtual void updateIceState(IceState state); diff --git a/erizo/src/erizo/LibNiceConnection.cpp b/erizo/src/erizo/LibNiceConnection.cpp index e2a2b8f300..e912d8b00d 100644 --- a/erizo/src/erizo/LibNiceConnection.cpp +++ b/erizo/src/erizo/LibNiceConnection.cpp @@ -107,14 +107,17 @@ LibNiceConnection::~LibNiceConnection() { } } -void LibNiceConnection::close() { - if (closed_) { - return; +boost::future LibNiceConnection::close() { + if (!closed_) { + auto shared_this = shared_from_this(); + return asyncWithFuture([shared_this] (std::shared_ptr this_ptr) { + shared_this->closeSync(); + }); + } else { + auto task_promise = std::make_shared>(); + task_promise->set_value(); + return task_promise->get_future(); } - auto shared_this = shared_from_this(); - async([shared_this] (std::shared_ptr this_ptr) { - shared_this->closeSync(); - }); } void LibNiceConnection::closeSync() { @@ -182,6 +185,19 @@ void LibNiceConnection::async(std::function LibNiceConnection::asyncWithFuture( + std::function)> f) { + auto task_promise = std::make_shared>(); + std::weak_ptr weak_this = shared_from_this(); + io_worker_->task([weak_this, f, task_promise] { + if (auto this_ptr = weak_this.lock()) { + f(this_ptr); + } + task_promise->set_value(); + }); + return task_promise->get_future(); +} + void LibNiceConnection::start() { async([] (std::shared_ptr this_ptr) { this_ptr->startSync(); diff --git a/erizo/src/erizo/LibNiceConnection.h b/erizo/src/erizo/LibNiceConnection.h index 3b6916e276..045ec8173b 100644 --- a/erizo/src/erizo/LibNiceConnection.h +++ b/erizo/src/erizo/LibNiceConnection.h @@ -60,8 +60,9 @@ class LibNiceConnection : public IceConnection, public std::enable_shared_from_t CandidatePair getSelectedPair() override; void maybeRestartIce(std::string remote_ufrag, std::string remote_pass) override; void restartIceSync(std::string remote_ufrag, std::string remote_pass); - void close() override; + boost::future close() override; void async(std::function)> f); + boost::future asyncWithFuture(std::function)> f); static std::shared_ptr create(std::shared_ptr io_worker, const IceConfig& ice_config); diff --git a/erizo/src/erizo/NicerConnection.cpp b/erizo/src/erizo/NicerConnection.cpp index d56918ce56..b5168e4eb4 100644 --- a/erizo/src/erizo/NicerConnection.cpp +++ b/erizo/src/erizo/NicerConnection.cpp @@ -154,9 +154,12 @@ NicerConnection::NicerConnection(std::shared_ptr io_worker, std::share } NicerConnection::~NicerConnection() { + if (!closed_) { + ELOG_WARN("%s message: Destructor without a previous close", toLog()); + } } -void NicerConnection::async(function)> f) { +void NicerConnection::async(std::function)> f) { std::weak_ptr weak_this = shared_from_this(); io_worker_->task([weak_this, f] { if (auto this_ptr = weak_this.lock()) { @@ -165,6 +168,19 @@ void NicerConnection::async(function)> f) }); } +boost::future NicerConnection::asyncWithFuture( + std::function)> f) { + auto task_promise = std::make_shared>(); + std::weak_ptr weak_this = shared_from_this(); + io_worker_->task([weak_this, f, task_promise] { + if (auto this_ptr = weak_this.lock()) { + f(this_ptr); + } + task_promise->set_value(); + }); + return task_promise->get_future(); +} + void NicerConnection::start() { ufrag_ = getNewUfrag(); upass_ = getNewPwd(); @@ -604,12 +620,16 @@ void NicerConnection::closeSync() { closed_ = true; } -void NicerConnection::close() { +boost::future NicerConnection::close() { if (!closed_) { auto shared_this = shared_from_this(); - async([shared_this] (std::shared_ptr this_ptr) { + return asyncWithFuture([shared_this] (std::shared_ptr this_ptr) { shared_this->closeSync(); }); + } else { + auto task_promise = std::make_shared>(); + task_promise->set_value(); + return task_promise->get_future(); } } diff --git a/erizo/src/erizo/NicerConnection.h b/erizo/src/erizo/NicerConnection.h index 2f78852d1d..96e4609d60 100644 --- a/erizo/src/erizo/NicerConnection.h +++ b/erizo/src/erizo/NicerConnection.h @@ -56,7 +56,7 @@ class NicerConnection : public IceConnection, public std::enable_shared_from_thi void onData(unsigned int component_id, char* buf, int len) override; CandidatePair getSelectedPair() override; - void close() override; + boost::future close() override; bool isClosed() { return closed_; } void maybeRestartIce(std::string remote_ufrag, std::string remote_pass) override; @@ -76,6 +76,7 @@ class NicerConnection : public IceConnection, public std::enable_shared_from_thi void startSync(); void closeSync(); void async(function)> f); + boost::future asyncWithFuture(std::function)> f); void setRemoteCredentialsSync(const std::string& username, const std::string& password); void addStreamSync(std::string remote_ufrag, std::string remote_pass); diff --git a/erizo/src/erizo/Transport.h b/erizo/src/erizo/Transport.h index fb78e72028..576b48ab7b 100644 --- a/erizo/src/erizo/Transport.h +++ b/erizo/src/erizo/Transport.h @@ -45,7 +45,7 @@ class Transport : public std::enable_shared_from_this, public IceConn virtual void write(char* data, int len) = 0; virtual void processLocalSdp(SdpInfo *localSdp_) = 0; virtual void start() = 0; - virtual void close() = 0; + virtual boost::future close() = 0; virtual std::shared_ptr getIceConnection() { return ice_; } void setTransportListener(std::weak_ptr listener) { transport_listener_ = listener; diff --git a/erizo/src/erizo/UnencryptedTransport.cpp b/erizo/src/erizo/UnencryptedTransport.cpp index 3d013b6cab..32b871ec07 100644 --- a/erizo/src/erizo/UnencryptedTransport.cpp +++ b/erizo/src/erizo/UnencryptedTransport.cpp @@ -40,7 +40,7 @@ UnencryptedTransport::UnencryptedTransport(MediaType med, const std::string &tra UnencryptedTransport::~UnencryptedTransport() { if (this->state_ != TRANSPORT_FINISHED) { - this->close(); + ELOG_WARN("%s message: Destructor called but transport has not been properly closed", toLog()); } } @@ -51,12 +51,15 @@ void UnencryptedTransport::start() { ice_->start(); } -void UnencryptedTransport::close() { +boost::future UnencryptedTransport::close() { + std::shared_ptr shared_this = + std::dynamic_pointer_cast(shared_from_this()); ELOG_DEBUG("%s message: closing", toLog()); running_ = false; - ice_->close(); - this->state_ = TRANSPORT_FINISHED; - ELOG_DEBUG("%s message: closed", toLog()); + return ice_->close().then([shared_this] (boost::future) { + shared_this->state_ = TRANSPORT_FINISHED; + ELOG_DEBUG("%s message: closed", shared_this->toLog()); + }); } void UnencryptedTransport::maybeRestartIce(std::string username, std::string password) { diff --git a/erizo/src/erizo/UnencryptedTransport.h b/erizo/src/erizo/UnencryptedTransport.h index 12ea6a750c..880d7d5895 100644 --- a/erizo/src/erizo/UnencryptedTransport.h +++ b/erizo/src/erizo/UnencryptedTransport.h @@ -21,7 +21,7 @@ class UnencryptedTransport : public Transport { virtual ~UnencryptedTransport(); void connectionStateChanged(IceState newState); void start() override; - void close() override; + boost::future close() override; void maybeRestartIce(std::string username, std::string password) override; void onIceData(packetPtr packet) override; void onCandidate(const CandidateInfo &candidate, IceConnection *conn) override; diff --git a/erizo/src/erizo/WebRtcConnection.cpp b/erizo/src/erizo/WebRtcConnection.cpp index a36ae7518e..3261888ec0 100644 --- a/erizo/src/erizo/WebRtcConnection.cpp +++ b/erizo/src/erizo/WebRtcConnection.cpp @@ -64,37 +64,54 @@ WebRtcConnection::~WebRtcConnection() { ELOG_DEBUG("%s message: Destructor ended", toLog()); } -void WebRtcConnection::syncClose() { +boost::future WebRtcConnection::syncClose() { ELOG_DEBUG("%s message: Close called", toLog()); + auto shared_this = shared_from_this(); + auto close_promise = std::make_shared>(); if (!sending_) { - return; + ELOG_DEBUG("%s message: Already closed, returning", toLog()); + close_promise->set_value(); + return close_promise->get_future(); } sending_ = false; transceivers_.clear(); streams_.clear(); + auto close_transport_futures = std::make_shared>>(); if (video_transport_.get()) { - video_transport_->close(); + close_transport_futures->push_back(video_transport_->close()); } if (audio_transport_.get()) { - audio_transport_->close(); - } - global_state_ = CONN_FINISHED; - if (conn_event_listener_ != nullptr) { - conn_event_listener_ = nullptr; + close_transport_futures->push_back(audio_transport_->close()); } - pipeline_initialized_ = false; - pipeline_->close(); - pipeline_.reset(); - - ELOG_DEBUG("%s message: Close ended", toLog()); + auto future_when_all = boost::when_all(close_transport_futures->begin(), close_transport_futures->end()); + future_when_all.then([shared_this, close_promise](decltype(future_when_all)) { + shared_this->global_state_ = CONN_FINISHED; + if (shared_this->conn_event_listener_ != nullptr) { + shared_this->conn_event_listener_ = nullptr; + } + shared_this->pipeline_initialized_ = false; + shared_this->pipeline_->close(); + shared_this->pipeline_.reset(); + ELOG_DEBUG("%s message: Close ended", shared_this->toLog()); + close_promise->set_value(); + }); + return close_promise->get_future(); } boost::future WebRtcConnection::close() { ELOG_DEBUG("%s message: Async close called", toLog()); - std::shared_ptr shared_this = shared_from_this(); - return asyncTask([shared_this] (std::shared_ptr connection) { - shared_this->syncClose(); + std::weak_ptr weak_this = shared_from_this(); + auto task_promise = std::make_shared>(); + worker_->task([weak_this, task_promise] { + if (auto this_ptr = weak_this.lock()) { + this_ptr->syncClose().then([task_promise] (boost::future) { + task_promise->set_value(); + }); + } else { + task_promise->set_value(); + } }); + return task_promise->get_future(); } bool WebRtcConnection::init() { @@ -1154,7 +1171,7 @@ void WebRtcConnection::updateState(TransportState state, Transport * transport) break; case TRANSPORT_FAILED: temp = CONN_FAILED; - sending_ = false; + close(); msg = ""; ELOG_ERROR("%s message: Transport Failed, transportType: %s", toLog(), transport->transport_name.c_str() ); break; diff --git a/erizo/src/erizo/WebRtcConnection.h b/erizo/src/erizo/WebRtcConnection.h index d977d85cd0..71e1e3af1f 100644 --- a/erizo/src/erizo/WebRtcConnection.h +++ b/erizo/src/erizo/WebRtcConnection.h @@ -87,7 +87,7 @@ class WebRtcConnection: public TransportListener, public LogContext, public Hand */ bool init(); boost::future close(); - void syncClose(); + boost::future syncClose(); boost::future setRemoteSdpInfo(std::shared_ptr sdp); diff --git a/erizo/src/test/utils/Mocks.h b/erizo/src/test/utils/Mocks.h index 8dcfb7c0e8..9b8a0343f3 100644 --- a/erizo/src/test/utils/Mocks.h +++ b/erizo/src/test/utils/Mocks.h @@ -74,7 +74,6 @@ class MockTransport: public Transport { void updateIceState(IceState state, IceConnection *conn) override { } void maybeRestartIce(std::string username, std::string password) override { - } void onIceData(packetPtr packet) override { } @@ -86,7 +85,10 @@ class MockTransport: public Transport { } void start() override { } - void close() override { + boost::future close() override { + boost::promise promise; + promise.set_value(); + return promise.get_future(); } };