Skip to content

Commit

Permalink
Add auto subscriptions API (lynckia#1361)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcague authored Mar 8, 2019
1 parent 6bbdcd7 commit 9f2cc2f
Show file tree
Hide file tree
Showing 35 changed files with 1,372 additions and 214 deletions.
1 change: 1 addition & 0 deletions erizo/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ include_directories(${GLIB_INCLUDE_DIRS})
# BOOST
set (BOOST_LIBS thread regex system)
find_package(Boost COMPONENTS ${BOOST_LIBS} REQUIRED)
set(ERIZO_CMAKE_CXX_FLAGS "${ERIZO_CMAKE_CXX_FLAGS} -DBOOST_THREAD_PROVIDES_FUTURE -DBOOST_THREAD_PROVIDES_FUTURE_CONTINUATION -DBOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY")

# GTHREAD
find_library(GTHREAD gthread-2.0 HINTS "${THIRD_PARTY_LIB}")
Expand Down
45 changes: 38 additions & 7 deletions erizo/src/erizo/MediaStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ MediaStream::MediaStream(std::shared_ptr<Worker> worker,

rate_control_ = 0;
sending_ = true;
ready_ = false;
}

MediaStream::~MediaStream() {
Expand Down Expand Up @@ -125,6 +126,7 @@ void MediaStream::syncClose() {
return;
}
sending_ = false;
ready_ = false;
video_sink_ = nullptr;
audio_sink_ = nullptr;
fb_sink_ = nullptr;
Expand All @@ -142,7 +144,10 @@ void MediaStream::close() {
});
}

bool MediaStream::init() {
bool MediaStream::init(bool doNotWaitForRemoteSdp) {
if (doNotWaitForRemoteSdp) {
ready_ = true;
}
return true;
}

Expand All @@ -155,28 +160,48 @@ bool MediaStream::isSinkSSRC(uint32_t ssrc) {
}

bool MediaStream::setRemoteSdp(std::shared_ptr<SdpInfo> sdp) {
ELOG_DEBUG("%s message: setting remote SDP", toLog());
ELOG_DEBUG("%s message: setting remote SDP to Stream, sending: %d, initialized: %d",
toLog(), sending_, pipeline_initialized_);
if (!sending_) {
return true;
}
remote_sdp_ = std::make_shared<SdpInfo>(*sdp.get());

std::shared_ptr<SdpInfo> remote_sdp = std::make_shared<SdpInfo>(*sdp.get());
auto video_ssrc_list_it = remote_sdp->video_ssrc_map.find(getLabel());
auto audio_ssrc_it = remote_sdp->audio_ssrc_map.find(getLabel());

if (isPublisher() && !ready_) {
bool stream_found = false;

if (video_ssrc_list_it != remote_sdp->video_ssrc_map.end() ||
audio_ssrc_it != remote_sdp->audio_ssrc_map.end()) {
stream_found = true;
}

if (!stream_found) {
return true;
}
}

remote_sdp_ = remote_sdp;

if (remote_sdp_->videoBandwidth != 0) {
ELOG_DEBUG("%s message: Setting remote BW, maxVideoBW: %u", toLog(), remote_sdp_->videoBandwidth);
this->rtcp_processor_->setMaxVideoBW(remote_sdp_->videoBandwidth*1000);
}

ready_ = true;

if (pipeline_initialized_ && pipeline_) {
pipeline_->notifyUpdate();
return true;
}

bundle_ = remote_sdp_->isBundle;
auto video_ssrc_list_it = remote_sdp_->video_ssrc_map.find(getLabel());
if (video_ssrc_list_it != remote_sdp_->video_ssrc_map.end()) {
setVideoSourceSSRCList(video_ssrc_list_it->second);
}

auto audio_ssrc_it = remote_sdp_->audio_ssrc_map.find(getLabel());
if (audio_ssrc_it != remote_sdp_->audio_ssrc_map.end()) {
setAudioSourceSSRC(audio_ssrc_it->second);
}
Expand Down Expand Up @@ -347,6 +372,9 @@ void MediaStream::printStats() {
}

void MediaStream::initializePipeline() {
if (pipeline_initialized_) {
return;
}
handler_manager_ = std::make_shared<HandlerManager>(shared_from_this());
pipeline_->addService(shared_from_this());
pipeline_->addService(handler_manager_);
Expand Down Expand Up @@ -744,13 +772,16 @@ void MediaStream::notifyUpdateToHandlers() {
});
}

void MediaStream::asyncTask(std::function<void(std::shared_ptr<MediaStream>)> f) {
boost::future<void> MediaStream::asyncTask(std::function<void(std::shared_ptr<MediaStream>)> f) {
auto task_promise = std::make_shared<boost::promise<void>>();
std::weak_ptr<MediaStream> weak_this = shared_from_this();
worker_->task([weak_this, f] {
worker_->task([weak_this, f, task_promise] {
if (auto this_ptr = weak_this.lock()) {
f(this_ptr);
}
task_promise->set_value();
});
return task_promise->get_future();
}

void MediaStream::sendPacket(std::shared_ptr<DataPacket> p) {
Expand Down
8 changes: 6 additions & 2 deletions erizo/src/erizo/MediaStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#define ERIZO_SRC_ERIZO_MEDIASTREAM_H_

#include <boost/thread/mutex.hpp>
#include <boost/thread/future.hpp>

#include <atomic>
#include <string>
Expand Down Expand Up @@ -68,7 +69,7 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
* Destructor.
*/
virtual ~MediaStream();
bool init();
bool init(bool doNotWaitForRemoteSdp);
void close() override;
virtual uint32_t getMaxVideoBW();
virtual uint32_t getBitrateFromMaxQualityLayer() { return bitrate_from_max_quality_layer_; }
Expand Down Expand Up @@ -128,7 +129,8 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,

void notifyToEventSink(MediaEventPtr event);

void asyncTask(std::function<void(std::shared_ptr<MediaStream>)> f);

boost::future<void> asyncTask(std::function<void(std::shared_ptr<MediaStream>)> f);

void initializeStats();
void printStats();
Expand All @@ -155,6 +157,7 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,

bool isPipelineInitialized() { return pipeline_initialized_; }
bool isRunning() { return pipeline_initialized_ && sending_; }
bool isReady() { return ready_; }
Pipeline::Ptr getPipeline() { return pipeline_; }
bool isPublisher() { return is_publisher_; }
void setBitrateFromMaxQualityLayer(uint64_t bitrate) { bitrate_from_max_quality_layer_ = bitrate; }
Expand Down Expand Up @@ -186,6 +189,7 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
bool should_send_feedback_;
bool slide_show_mode_;
bool sending_;
bool ready_;
int bundle_;

uint32_t rate_control_; // Target bitrate for hacky rate control in BPS
Expand Down
5 changes: 3 additions & 2 deletions erizo/src/erizo/NicerConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ void NicerConnection::async(function<void(std::shared_ptr<NicerConnection>)> f)
}

void NicerConnection::start() {
ufrag_ = getNewUfrag();
upass_ = getNewPwd();

async([] (std::shared_ptr<NicerConnection> this_ptr) {
this_ptr->startSync();
});
Expand All @@ -170,8 +173,6 @@ void NicerConnection::start() {
void NicerConnection::startSync() {
UINT4 flags = NR_ICE_CTX_FLAGS_AGGRESSIVE_NOMINATION;

ufrag_ = getNewUfrag();
upass_ = getNewPwd();
if (ufrag_.empty() || upass_.empty()) {
start_promise_.set_value();
return;
Expand Down
24 changes: 20 additions & 4 deletions erizo/src/erizo/SdpInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ namespace erizo {
isRtcpMux = false;
isFingerprint = false;
dtlsRole = ACTPASS;
internal_dtls_role = ACTPASS;
hasAudio = false;
hasVideo = false;
profile = SAVPF;
Expand Down Expand Up @@ -472,8 +473,12 @@ namespace erizo {

// TODO(pedro): Should provide hints
void SdpInfo::createOfferSdp(bool videoEnabled, bool audioEnabled, bool bundle) {
ELOG_DEBUG("Creating offerSDP: video %d, audio %d, bundle %d", videoEnabled, audioEnabled, bundle);
this->payloadVector = internalPayloadVector_;
ELOG_DEBUG("Creating offerSDP: video %d, audio %d, bundle %d, payloadVector: %d, extSize: %d",
videoEnabled, audioEnabled, bundle, payloadVector.size(), extMapVector.size());
if (payloadVector.size() == 0) {
payloadVector = internalPayloadVector_;
}

this->isBundle = bundle;
this->profile = SAVPF;
this->isRtcpMux = true;
Expand All @@ -482,8 +487,8 @@ namespace erizo {
if (audioEnabled)
this->audioSdpMLine = 0;

for (unsigned int it = 0; it < internalPayloadVector_.size(); it++) {
RtpMap& rtp = internalPayloadVector_[it];
for (unsigned int it = 0; it < payloadVector.size(); it++) {
RtpMap& rtp = payloadVector[it];
if (rtp.media_type == VIDEO_TYPE) {
videoCodecs++;
} else if (rtp.media_type == AUDIO_TYPE) {
Expand All @@ -498,6 +503,17 @@ namespace erizo {
ELOG_DEBUG("Setting Offer SDP");
}

void SdpInfo::copyInfoFromSdp(std::shared_ptr<SdpInfo> offerSdp) {
payloadVector = offerSdp->payloadVector;
videoCodecs = offerSdp->videoCodecs;
audioCodecs = offerSdp->audioCodecs;
inOutPTMap = offerSdp->inOutPTMap;
outInPTMap = offerSdp->outInPTMap;
extMapVector = offerSdp->extMapVector;
ELOG_DEBUG("Offer SDP successfully copied, extSize: %d, payloadSize: %d, videoCodecs: %d, audioCodecs: %d",
extMapVector.size(), payloadVector.size(), videoCodecs, audioCodecs);
}

void SdpInfo::setOfferSdp(std::shared_ptr<SdpInfo> offerSdp) {
this->videoCodecs = offerSdp->videoCodecs;
this->audioCodecs = offerSdp->audioCodecs;
Expand Down
6 changes: 6 additions & 0 deletions erizo/src/erizo/SdpInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ class SdpInfo {
bool supportPayloadType(const unsigned int payloadType);

void createOfferSdp(bool videoEnabled, bool audioEnabled, bool bundle);

void copyInfoFromSdp(std::shared_ptr<SdpInfo> offerSdp);
/**
* @brief copies relevant information from the offer sdp for which this will be an answer sdp
* @param offerSdp The offer SDP as received via signaling and parsed
Expand Down Expand Up @@ -295,6 +297,10 @@ class SdpInfo {
*/
DtlsRole dtlsRole;
/**
* Internal DTLS Role
*/
DtlsRole internal_dtls_role;
/**
* Mapping from internal PT (key) to external PT (value)
*/
std::map<unsigned int, unsigned int> inOutPTMap;
Expand Down
Loading

0 comments on commit 9f2cc2f

Please sign in to comment.