Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add full simulcast support to recordings #1127

Merged
merged 5 commits into from
Jan 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions erizo/src/erizo/MediaStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ bool MediaStream::setLocalSdp(std::shared_ptr<SdpInfo> sdp) {
}

void MediaStream::initializePipeline() {
handler_manager_ = std::make_shared<HandlerManager>(shared_from_this());
pipeline_->addService(shared_from_this());
pipeline_->addService(handler_manager_);
pipeline_->addService(rtcp_processor_);
pipeline_->addService(stats_);
pipeline_->addService(quality_manager_);
Expand Down
6 changes: 4 additions & 2 deletions erizo/src/erizo/MediaStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "rtp/RtpExtensionProcessor.h"
#include "lib/Clock.h"
#include "pipeline/Handler.h"
#include "pipeline/HandlerManager.h"
#include "pipeline/Service.h"
#include "rtp/QualityManager.h"
#include "rtp/PacketBufferService.h"
Expand All @@ -37,7 +38,7 @@ class MediaStreamStatsListener {
* A MediaStream. This class represents a Media Stream that can be established with other peers via a SDP negotiation
*/
class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
public FeedbackSource, public LogContext,
public FeedbackSource, public LogContext, public HandlerManagerListener,
public std::enable_shared_from_this<MediaStream>, public Service {
DECLARE_LOGGER();

Expand Down Expand Up @@ -99,7 +100,7 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,

void enableHandler(const std::string &name);
void disableHandler(const std::string &name);
void notifyUpdateToHandlers();
void notifyUpdateToHandlers() override;

void notifyToEventSink(MediaEventPtr event);

Expand Down Expand Up @@ -157,6 +158,7 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
std::shared_ptr<Stats> stats_;
std::shared_ptr<QualityManager> quality_manager_;
std::shared_ptr<PacketBufferService> packet_buffer_;
std::shared_ptr<HandlerManager> handler_manager_;

Pipeline::Ptr pipeline_;

Expand Down
99 changes: 84 additions & 15 deletions erizo/src/erizo/media/ExternalOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,23 @@
#include "rtp/RtpHeaders.h"
#include "rtp/RtpVP8Parser.h"

#include "rtp/QualityFilterHandler.h"
#include "rtp/LayerBitrateCalculationHandler.h"

using std::memcpy;

namespace erizo {

DEFINE_LOGGER(ExternalOutput, "media.ExternalOutput");
ExternalOutput::ExternalOutput(const std::string& output_url, const std::vector<RtpMap> rtp_mappings)
: audio_queue_{5.0, 10.0}, video_queue_{5.0, 10.0}, inited_{false}, video_stream_{nullptr},
ExternalOutput::ExternalOutput(std::shared_ptr<Worker> worker, const std::string& output_url,
const std::vector<RtpMap> rtp_mappings)
: worker_{worker}, pipeline_{Pipeline::create()}, audio_queue_{5.0, 10.0}, video_queue_{5.0, 10.0},
inited_{false}, video_stream_{nullptr},
audio_stream_{nullptr}, video_source_ssrc_{0},
first_video_timestamp_{-1}, first_audio_timestamp_{-1},
first_data_received_{}, video_offset_ms_{-1}, audio_offset_ms_{-1},
need_to_send_fir_{true}, rtp_mappings_{rtp_mappings}, video_codec_{AV_CODEC_ID_NONE},
audio_codec_{AV_CODEC_ID_NONE} {
audio_codec_{AV_CODEC_ID_NONE}, pipeline_initialized_{false} {
ELOG_DEBUG("Creating output to %s", output_url.c_str());

fb_sink_ = nullptr;
Expand All @@ -33,6 +38,8 @@ ExternalOutput::ExternalOutput(const std::string& output_url, const std::vector<
avcodec_register_all();

fec_receiver_.reset(webrtc::UlpfecReceiver::Create(this));
stats_ = std::make_shared<Stats>();
quality_manager_ = std::make_shared<QualityManager>();

for (auto rtp_map : rtp_mappings_) {
switch (rtp_map.media_type) {
Expand Down Expand Up @@ -64,25 +71,33 @@ bool ExternalOutput::init() {
MediaInfo m;
m.hasVideo = false;
m.hasAudio = false;
thread_ = boost::thread(&ExternalOutput::sendLoop, this);
recording_ = true;
asyncTask([] (std::shared_ptr<ExternalOutput> output) {
output->initializePipeline();
});
thread_ = boost::thread(&ExternalOutput::sendLoop, this);
ELOG_DEBUG("Initialized successfully");
return true;
}


ExternalOutput::~ExternalOutput() {
ELOG_DEBUG("Destructing");
close();
}

void ExternalOutput::close() {
std::shared_ptr<ExternalOutput> shared_this = shared_from_this();
asyncTask([shared_this] (std::shared_ptr<ExternalOutput> connection) {
shared_this->syncClose();
});
}

void ExternalOutput::syncClose() {
if (!recording_) {
return;
}
// Stop our thread so we can safely nuke libav stuff and close our
// our file.
recording_ = false;
cond_.notify_one();
thread_.join();

Expand All @@ -104,9 +119,21 @@ void ExternalOutput::close() {
context_ = nullptr;
}

pipeline_initialized_ = false;
recording_ = false;

ELOG_DEBUG("Closed Successfully");
}

void ExternalOutput::asyncTask(std::function<void(std::shared_ptr<ExternalOutput>)> f) {
std::weak_ptr<ExternalOutput> weak_this = shared_from_this();
worker_->task([weak_this, f] {
if (auto this_ptr = weak_this.lock()) {
f(this_ptr);
}
});
}

void ExternalOutput::receiveRawData(const RawDataPacket& /*packet*/) {
return;
}
Expand Down Expand Up @@ -265,28 +292,70 @@ void ExternalOutput::maybeWriteVideoPacket(char* buf, int len) {
}
}

void ExternalOutput::notifyUpdateToHandlers() {
asyncTask([] (std::shared_ptr<ExternalOutput> output) {
output->pipeline_->notifyUpdate();
});
}

void ExternalOutput::initializePipeline() {
stats_->getNode()["total"].insertStat("senderBitrateEstimation",
CumulativeStat{static_cast<uint64_t>(kExternalOutputMaxBitrate)});

handler_manager_ = std::make_shared<HandlerManager>(shared_from_this());
pipeline_->addService(handler_manager_);
pipeline_->addService(quality_manager_);
pipeline_->addService(stats_);

pipeline_->addFront(LayerBitrateCalculationHandler());
pipeline_->addFront(QualityFilterHandler());

pipeline_->addFront(ExternalOuputWriter(shared_from_this()));
pipeline_->finalize();
pipeline_initialized_ = true;
}

void ExternalOutput::write(std::shared_ptr<DataPacket> packet) {
queueData(packet->data, packet->length, packet->type);
}

void ExternalOutput::queueDataAsync(std::shared_ptr<DataPacket> copied_packet) {
asyncTask([copied_packet] (std::shared_ptr<ExternalOutput> this_ptr) {
if (!this_ptr->pipeline_initialized_) {
return;
}
this_ptr->pipeline_->write(std::move(copied_packet));
});
}

int ExternalOutput::deliverAudioData_(std::shared_ptr<DataPacket> audio_packet) {
std::shared_ptr<DataPacket> copied_packet = std::make_shared<DataPacket>(*audio_packet);
queueData(copied_packet->data, copied_packet->length, AUDIO_PACKET);
copied_packet->type = AUDIO_PACKET;
queueDataAsync(copied_packet);
return 0;
}

int ExternalOutput::deliverVideoData_(std::shared_ptr<DataPacket> video_packet) {
std::shared_ptr<DataPacket> copied_packet = std::make_shared<DataPacket>(*video_packet);
// TODO(javierc): We should support higher layers, but it requires having an entire pipeline at this point
if (!video_packet->belongsToSpatialLayer(0)) {
return 0;
}
if (video_source_ssrc_ == 0) {
RtpHeader* h = reinterpret_cast<RtpHeader*>(copied_packet->data);
RtpHeader* h = reinterpret_cast<RtpHeader*>(video_packet->data);
video_source_ssrc_ = h->getSSRC();
}
queueData(copied_packet->data, copied_packet->length, VIDEO_PACKET);

std::shared_ptr<DataPacket> copied_packet = std::make_shared<DataPacket>(*video_packet);
copied_packet->type = VIDEO_PACKET;
queueDataAsync(copied_packet);
return 0;
}

int ExternalOutput::deliverEvent_(MediaEventPtr event) {
return 0;
auto output_ptr = shared_from_this();
worker_->task([output_ptr, event]{
if (!output_ptr->pipeline_initialized_) {
return;
}
output_ptr->pipeline_->notifyEvent(event);
});
return 1;
}

bool ExternalOutput::initContext() {
Expand Down
64 changes: 53 additions & 11 deletions erizo/src/erizo/media/ExternalOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,32 @@ extern "C" {
#include <string>

#include "./MediaDefinitions.h"
#include "thread/Worker.h"
#include "rtp/RtpPacketQueue.h"
#include "webrtc/modules/rtp_rtcp/source/ulpfec_receiver_impl.h"
#include "media/MediaProcessor.h"
#include "media/Depacketizer.h"
#include "./Stats.h"
#include "lib/Clock.h"
#include "SdpInfo.h"
#include "rtp/QualityManager.h"
#include "pipeline/Handler.h"
#include "pipeline/HandlerManager.h"

#include "./logger.h"

namespace erizo {

class MediaStream;
static constexpr uint64_t kExternalOutputMaxBitrate = 1000000000;

// Our search state for VP8 frames.
enum vp8SearchState {
kLookingForStart,
kLookingForEnd
};

class ExternalOutput : public MediaSink, public RawDataReceiver, public FeedbackSource, public webrtc::RtpData {
class ExternalOutput : public MediaSink, public RawDataReceiver, public FeedbackSource,
public webrtc::RtpData, public HandlerManagerListener,
public std::enable_shared_from_this<ExternalOutput> {
DECLARE_LOGGER();

public:
explicit ExternalOutput(const std::string& output_url, const std::vector<RtpMap> rtp_mappings);
explicit ExternalOutput(std::shared_ptr<Worker> worker, const std::string& output_url,
const std::vector<RtpMap> rtp_mappings);
virtual ~ExternalOutput();
bool init();
void receiveRawData(const RawDataPacket& packet) override;
Expand All @@ -47,10 +49,18 @@ class ExternalOutput : public MediaSink, public RawDataReceiver, public Feedback

void close() override;

void write(std::shared_ptr<DataPacket> packet);

void notifyUpdateToHandlers() override;

bool isRecording() { return recording_; }

private:
std::shared_ptr<Worker> worker_;
Pipeline::Ptr pipeline_;
std::unique_ptr<webrtc::UlpfecReceiver> fec_receiver_;
RtpPacketQueue audio_queue_, video_queue_;
bool recording_, inited_;
std::atomic<bool> recording_, inited_;
boost::mutex mtx_; // a mutex we use to signal our writer thread that data is waiting.
boost::thread thread_;
boost::condition_variable cond_;
Expand Down Expand Up @@ -93,7 +103,6 @@ class ExternalOutput : public MediaSink, public RawDataReceiver, public Feedback
// Note: VP8 purportedly has two packetization schemes; per-frame and per-partition. A frame is
// composed of one or more partitions. However, we don't seem to be sent anything but partition 0
// so the second scheme seems not applicable. Too bad.
vp8SearchState video_search_state_;
bool need_to_send_fir_;
std::vector<RtpMap> rtp_mappings_;
enum AVCodecID video_codec_;
Expand All @@ -102,10 +111,16 @@ class ExternalOutput : public MediaSink, public RawDataReceiver, public Feedback
std::map<uint, RtpMap> audio_maps_;
RtpMap video_map_;
RtpMap audio_map_;
bool pipeline_initialized_;
std::shared_ptr<Stats> stats_;
std::shared_ptr<QualityManager> quality_manager_;
std::shared_ptr<HandlerManager> handler_manager_;

bool initContext();
int sendFirPacket();
void asyncTask(std::function<void(std::shared_ptr<ExternalOutput>)> f);
void queueData(char* buffer, int length, packetType type);
void queueDataAsync(std::shared_ptr<DataPacket> copied_packet);
void sendLoop();
int deliverAudioData_(std::shared_ptr<DataPacket> audio_packet) override;
int deliverVideoData_(std::shared_ptr<DataPacket> video_packet) override;
Expand All @@ -115,6 +130,33 @@ class ExternalOutput : public MediaSink, public RawDataReceiver, public Feedback
void updateVideoCodec(RtpMap map);
void updateAudioCodec(RtpMap map);
void maybeWriteVideoPacket(char* buf, int len);
void initializePipeline();
void syncClose();
};

class ExternalOuputWriter : public OutboundHandler {
public:
explicit ExternalOuputWriter(std::shared_ptr<ExternalOutput> output) : output_{output} {}

void enable() override {}
void disable() override {}

std::string getName() override {
return "writer";
}

void write(Context *ctx, std::shared_ptr<DataPacket> packet) override {
if (auto output = output_.lock()) {
output->write(std::move(packet));
}
}

void notifyUpdate() override {
}

private:
std::weak_ptr<ExternalOutput> output_;
};

} // namespace erizo
#endif // ERIZO_SRC_ERIZO_MEDIA_EXTERNALOUTPUT_H_
30 changes: 30 additions & 0 deletions erizo/src/erizo/pipeline/HandlerManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#ifndef ERIZO_SRC_ERIZO_PIPELINE_HANDLERMANAGER_H_
#define ERIZO_SRC_ERIZO_PIPELINE_HANDLERMANAGER_H_

#include "pipeline/Service.h"

namespace erizo {

class HandlerManagerListener {
public:
virtual ~HandlerManagerListener() = default;

virtual void notifyUpdateToHandlers() = 0;
};

class HandlerManager : public Service {
public:
explicit HandlerManager(std::weak_ptr<HandlerManagerListener> listener) : listener_{listener} {}
virtual ~HandlerManager() = default;

void notifyUpdateToHandlers() {
if (auto listener = listener_.lock()) {
listener->notifyUpdateToHandlers();
}
}
private:
std::weak_ptr<HandlerManagerListener> listener_;
};
} // namespace erizo

#endif // ERIZO_SRC_ERIZO_PIPELINE_HANDLERMANAGER_H_
11 changes: 5 additions & 6 deletions erizo/src/erizo/rtp/QualityFilterHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,18 @@ void QualityFilterHandler::notifyUpdate() {
max_video_bw_ = processor->getMaxVideoBW();
}

if (initialized_) {
return;
stream_ = pipeline->getService<MediaStream>().get();
if (stream_) {
video_sink_ssrc_ = stream_->getVideoSinkSSRC();
video_source_ssrc_ = stream_->getVideoSourceSSRC();
}

stream_ = pipeline->getService<MediaStream>().get();
if (!stream_) {
if (initialized_) {
return;
}

quality_manager_ = pipeline->getService<QualityManager>();

video_sink_ssrc_ = stream_->getVideoSinkSSRC();
video_source_ssrc_ = stream_->getVideoSourceSSRC();
initialized_ = true;
}
} // namespace erizo
Loading