Skip to content

Commit

Permalink
Add full simulcast support to recordings (lynckia#1127)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcague authored Jan 22, 2018
1 parent a3ea764 commit fd9a321
Show file tree
Hide file tree
Showing 11 changed files with 266 additions and 99 deletions.
2 changes: 2 additions & 0 deletions erizo/src/erizo/MediaStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ bool MediaStream::setLocalSdp(std::shared_ptr<SdpInfo> sdp) {
}

void MediaStream::initializePipeline() {
handler_manager_ = std::make_shared<HandlerManager>(shared_from_this());
pipeline_->addService(shared_from_this());
pipeline_->addService(handler_manager_);
pipeline_->addService(rtcp_processor_);
pipeline_->addService(stats_);
pipeline_->addService(quality_manager_);
Expand Down
6 changes: 4 additions & 2 deletions erizo/src/erizo/MediaStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "rtp/RtpExtensionProcessor.h"
#include "lib/Clock.h"
#include "pipeline/Handler.h"
#include "pipeline/HandlerManager.h"
#include "pipeline/Service.h"
#include "rtp/QualityManager.h"
#include "rtp/PacketBufferService.h"
Expand All @@ -37,7 +38,7 @@ class MediaStreamStatsListener {
* A MediaStream. This class represents a Media Stream that can be established with other peers via a SDP negotiation
*/
class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
public FeedbackSource, public LogContext,
public FeedbackSource, public LogContext, public HandlerManagerListener,
public std::enable_shared_from_this<MediaStream>, public Service {
DECLARE_LOGGER();

Expand Down Expand Up @@ -99,7 +100,7 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,

void enableHandler(const std::string &name);
void disableHandler(const std::string &name);
void notifyUpdateToHandlers();
void notifyUpdateToHandlers() override;

void notifyToEventSink(MediaEventPtr event);

Expand Down Expand Up @@ -157,6 +158,7 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
std::shared_ptr<Stats> stats_;
std::shared_ptr<QualityManager> quality_manager_;
std::shared_ptr<PacketBufferService> packet_buffer_;
std::shared_ptr<HandlerManager> handler_manager_;

Pipeline::Ptr pipeline_;

Expand Down
99 changes: 84 additions & 15 deletions erizo/src/erizo/media/ExternalOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,23 @@
#include "rtp/RtpHeaders.h"
#include "rtp/RtpVP8Parser.h"

#include "rtp/QualityFilterHandler.h"
#include "rtp/LayerBitrateCalculationHandler.h"

using std::memcpy;

namespace erizo {

DEFINE_LOGGER(ExternalOutput, "media.ExternalOutput");
ExternalOutput::ExternalOutput(const std::string& output_url, const std::vector<RtpMap> rtp_mappings)
: audio_queue_{5.0, 10.0}, video_queue_{5.0, 10.0}, inited_{false}, video_stream_{nullptr},
ExternalOutput::ExternalOutput(std::shared_ptr<Worker> worker, const std::string& output_url,
const std::vector<RtpMap> rtp_mappings)
: worker_{worker}, pipeline_{Pipeline::create()}, audio_queue_{5.0, 10.0}, video_queue_{5.0, 10.0},
inited_{false}, video_stream_{nullptr},
audio_stream_{nullptr}, video_source_ssrc_{0},
first_video_timestamp_{-1}, first_audio_timestamp_{-1},
first_data_received_{}, video_offset_ms_{-1}, audio_offset_ms_{-1},
need_to_send_fir_{true}, rtp_mappings_{rtp_mappings}, video_codec_{AV_CODEC_ID_NONE},
audio_codec_{AV_CODEC_ID_NONE} {
audio_codec_{AV_CODEC_ID_NONE}, pipeline_initialized_{false} {
ELOG_DEBUG("Creating output to %s", output_url.c_str());

fb_sink_ = nullptr;
Expand All @@ -33,6 +38,8 @@ ExternalOutput::ExternalOutput(const std::string& output_url, const std::vector<
avcodec_register_all();

fec_receiver_.reset(webrtc::UlpfecReceiver::Create(this));
stats_ = std::make_shared<Stats>();
quality_manager_ = std::make_shared<QualityManager>();

for (auto rtp_map : rtp_mappings_) {
switch (rtp_map.media_type) {
Expand Down Expand Up @@ -64,25 +71,33 @@ bool ExternalOutput::init() {
MediaInfo m;
m.hasVideo = false;
m.hasAudio = false;
thread_ = boost::thread(&ExternalOutput::sendLoop, this);
recording_ = true;
asyncTask([] (std::shared_ptr<ExternalOutput> output) {
output->initializePipeline();
});
thread_ = boost::thread(&ExternalOutput::sendLoop, this);
ELOG_DEBUG("Initialized successfully");
return true;
}


ExternalOutput::~ExternalOutput() {
ELOG_DEBUG("Destructing");
close();
}

void ExternalOutput::close() {
std::shared_ptr<ExternalOutput> shared_this = shared_from_this();
asyncTask([shared_this] (std::shared_ptr<ExternalOutput> connection) {
shared_this->syncClose();
});
}

void ExternalOutput::syncClose() {
if (!recording_) {
return;
}
// Stop our thread so we can safely nuke libav stuff and close our
// our file.
recording_ = false;
cond_.notify_one();
thread_.join();

Expand All @@ -104,9 +119,21 @@ void ExternalOutput::close() {
context_ = nullptr;
}

pipeline_initialized_ = false;
recording_ = false;

ELOG_DEBUG("Closed Successfully");
}

void ExternalOutput::asyncTask(std::function<void(std::shared_ptr<ExternalOutput>)> f) {
std::weak_ptr<ExternalOutput> weak_this = shared_from_this();
worker_->task([weak_this, f] {
if (auto this_ptr = weak_this.lock()) {
f(this_ptr);
}
});
}

void ExternalOutput::receiveRawData(const RawDataPacket& /*packet*/) {
return;
}
Expand Down Expand Up @@ -265,28 +292,70 @@ void ExternalOutput::maybeWriteVideoPacket(char* buf, int len) {
}
}

void ExternalOutput::notifyUpdateToHandlers() {
asyncTask([] (std::shared_ptr<ExternalOutput> output) {
output->pipeline_->notifyUpdate();
});
}

void ExternalOutput::initializePipeline() {
stats_->getNode()["total"].insertStat("senderBitrateEstimation",
CumulativeStat{static_cast<uint64_t>(kExternalOutputMaxBitrate)});

handler_manager_ = std::make_shared<HandlerManager>(shared_from_this());
pipeline_->addService(handler_manager_);
pipeline_->addService(quality_manager_);
pipeline_->addService(stats_);

pipeline_->addFront(LayerBitrateCalculationHandler());
pipeline_->addFront(QualityFilterHandler());

pipeline_->addFront(ExternalOuputWriter(shared_from_this()));
pipeline_->finalize();
pipeline_initialized_ = true;
}

void ExternalOutput::write(std::shared_ptr<DataPacket> packet) {
queueData(packet->data, packet->length, packet->type);
}

void ExternalOutput::queueDataAsync(std::shared_ptr<DataPacket> copied_packet) {
asyncTask([copied_packet] (std::shared_ptr<ExternalOutput> this_ptr) {
if (!this_ptr->pipeline_initialized_) {
return;
}
this_ptr->pipeline_->write(std::move(copied_packet));
});
}

int ExternalOutput::deliverAudioData_(std::shared_ptr<DataPacket> audio_packet) {
std::shared_ptr<DataPacket> copied_packet = std::make_shared<DataPacket>(*audio_packet);
queueData(copied_packet->data, copied_packet->length, AUDIO_PACKET);
copied_packet->type = AUDIO_PACKET;
queueDataAsync(copied_packet);
return 0;
}

int ExternalOutput::deliverVideoData_(std::shared_ptr<DataPacket> video_packet) {
std::shared_ptr<DataPacket> copied_packet = std::make_shared<DataPacket>(*video_packet);
// TODO(javierc): We should support higher layers, but it requires having an entire pipeline at this point
if (!video_packet->belongsToSpatialLayer(0)) {
return 0;
}
if (video_source_ssrc_ == 0) {
RtpHeader* h = reinterpret_cast<RtpHeader*>(copied_packet->data);
RtpHeader* h = reinterpret_cast<RtpHeader*>(video_packet->data);
video_source_ssrc_ = h->getSSRC();
}
queueData(copied_packet->data, copied_packet->length, VIDEO_PACKET);

std::shared_ptr<DataPacket> copied_packet = std::make_shared<DataPacket>(*video_packet);
copied_packet->type = VIDEO_PACKET;
queueDataAsync(copied_packet);
return 0;
}

int ExternalOutput::deliverEvent_(MediaEventPtr event) {
return 0;
auto output_ptr = shared_from_this();
worker_->task([output_ptr, event]{
if (!output_ptr->pipeline_initialized_) {
return;
}
output_ptr->pipeline_->notifyEvent(event);
});
return 1;
}

bool ExternalOutput::initContext() {
Expand Down
64 changes: 53 additions & 11 deletions erizo/src/erizo/media/ExternalOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,32 @@ extern "C" {
#include <string>

#include "./MediaDefinitions.h"
#include "thread/Worker.h"
#include "rtp/RtpPacketQueue.h"
#include "webrtc/modules/rtp_rtcp/source/ulpfec_receiver_impl.h"
#include "media/MediaProcessor.h"
#include "media/Depacketizer.h"
#include "./Stats.h"
#include "lib/Clock.h"
#include "SdpInfo.h"
#include "rtp/QualityManager.h"
#include "pipeline/Handler.h"
#include "pipeline/HandlerManager.h"

#include "./logger.h"

namespace erizo {

class MediaStream;
static constexpr uint64_t kExternalOutputMaxBitrate = 1000000000;

// Our search state for VP8 frames.
enum vp8SearchState {
kLookingForStart,
kLookingForEnd
};

class ExternalOutput : public MediaSink, public RawDataReceiver, public FeedbackSource, public webrtc::RtpData {
class ExternalOutput : public MediaSink, public RawDataReceiver, public FeedbackSource,
public webrtc::RtpData, public HandlerManagerListener,
public std::enable_shared_from_this<ExternalOutput> {
DECLARE_LOGGER();

public:
explicit ExternalOutput(const std::string& output_url, const std::vector<RtpMap> rtp_mappings);
explicit ExternalOutput(std::shared_ptr<Worker> worker, const std::string& output_url,
const std::vector<RtpMap> rtp_mappings);
virtual ~ExternalOutput();
bool init();
void receiveRawData(const RawDataPacket& packet) override;
Expand All @@ -47,10 +49,18 @@ class ExternalOutput : public MediaSink, public RawDataReceiver, public Feedback

void close() override;

void write(std::shared_ptr<DataPacket> packet);

void notifyUpdateToHandlers() override;

bool isRecording() { return recording_; }

private:
std::shared_ptr<Worker> worker_;
Pipeline::Ptr pipeline_;
std::unique_ptr<webrtc::UlpfecReceiver> fec_receiver_;
RtpPacketQueue audio_queue_, video_queue_;
bool recording_, inited_;
std::atomic<bool> recording_, inited_;
boost::mutex mtx_; // a mutex we use to signal our writer thread that data is waiting.
boost::thread thread_;
boost::condition_variable cond_;
Expand Down Expand Up @@ -93,7 +103,6 @@ class ExternalOutput : public MediaSink, public RawDataReceiver, public Feedback
// Note: VP8 purportedly has two packetization schemes; per-frame and per-partition. A frame is
// composed of one or more partitions. However, we don't seem to be sent anything but partition 0
// so the second scheme seems not applicable. Too bad.
vp8SearchState video_search_state_;
bool need_to_send_fir_;
std::vector<RtpMap> rtp_mappings_;
enum AVCodecID video_codec_;
Expand All @@ -102,10 +111,16 @@ class ExternalOutput : public MediaSink, public RawDataReceiver, public Feedback
std::map<uint, RtpMap> audio_maps_;
RtpMap video_map_;
RtpMap audio_map_;
bool pipeline_initialized_;
std::shared_ptr<Stats> stats_;
std::shared_ptr<QualityManager> quality_manager_;
std::shared_ptr<HandlerManager> handler_manager_;

bool initContext();
int sendFirPacket();
void asyncTask(std::function<void(std::shared_ptr<ExternalOutput>)> f);
void queueData(char* buffer, int length, packetType type);
void queueDataAsync(std::shared_ptr<DataPacket> copied_packet);
void sendLoop();
int deliverAudioData_(std::shared_ptr<DataPacket> audio_packet) override;
int deliverVideoData_(std::shared_ptr<DataPacket> video_packet) override;
Expand All @@ -115,6 +130,33 @@ class ExternalOutput : public MediaSink, public RawDataReceiver, public Feedback
void updateVideoCodec(RtpMap map);
void updateAudioCodec(RtpMap map);
void maybeWriteVideoPacket(char* buf, int len);
void initializePipeline();
void syncClose();
};

class ExternalOuputWriter : public OutboundHandler {
public:
explicit ExternalOuputWriter(std::shared_ptr<ExternalOutput> output) : output_{output} {}

void enable() override {}
void disable() override {}

std::string getName() override {
return "writer";
}

void write(Context *ctx, std::shared_ptr<DataPacket> packet) override {
if (auto output = output_.lock()) {
output->write(std::move(packet));
}
}

void notifyUpdate() override {
}

private:
std::weak_ptr<ExternalOutput> output_;
};

} // namespace erizo
#endif // ERIZO_SRC_ERIZO_MEDIA_EXTERNALOUTPUT_H_
30 changes: 30 additions & 0 deletions erizo/src/erizo/pipeline/HandlerManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#ifndef ERIZO_SRC_ERIZO_PIPELINE_HANDLERMANAGER_H_
#define ERIZO_SRC_ERIZO_PIPELINE_HANDLERMANAGER_H_

#include "pipeline/Service.h"

namespace erizo {

class HandlerManagerListener {
public:
virtual ~HandlerManagerListener() = default;

virtual void notifyUpdateToHandlers() = 0;
};

class HandlerManager : public Service {
public:
explicit HandlerManager(std::weak_ptr<HandlerManagerListener> listener) : listener_{listener} {}
virtual ~HandlerManager() = default;

void notifyUpdateToHandlers() {
if (auto listener = listener_.lock()) {
listener->notifyUpdateToHandlers();
}
}
private:
std::weak_ptr<HandlerManagerListener> listener_;
};
} // namespace erizo

#endif // ERIZO_SRC_ERIZO_PIPELINE_HANDLERMANAGER_H_
11 changes: 5 additions & 6 deletions erizo/src/erizo/rtp/QualityFilterHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,18 @@ void QualityFilterHandler::notifyUpdate() {
max_video_bw_ = processor->getMaxVideoBW();
}

if (initialized_) {
return;
stream_ = pipeline->getService<MediaStream>().get();
if (stream_) {
video_sink_ssrc_ = stream_->getVideoSinkSSRC();
video_source_ssrc_ = stream_->getVideoSourceSSRC();
}

stream_ = pipeline->getService<MediaStream>().get();
if (!stream_) {
if (initialized_) {
return;
}

quality_manager_ = pipeline->getService<QualityManager>();

video_sink_ssrc_ = stream_->getVideoSinkSSRC();
video_source_ssrc_ = stream_->getVideoSourceSSRC();
initialized_ = true;
}
} // namespace erizo
Loading

0 comments on commit fd9a321

Please sign in to comment.