From f38a596692f4965fdd20ce15ce5d93f3eabbd786 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Cervi=C3=B1o?= Date: Fri, 3 Mar 2017 14:25:29 +0100 Subject: [PATCH] Add a handler to control PLI sending rate (#788) --- erizo/src/erizo/WebRtcConnection.cpp | 2 + erizo/src/erizo/rtp/PliPacerHandler.cpp | 87 +++++++++++ erizo/src/erizo/rtp/PliPacerHandler.h | 54 +++++++ erizo/src/erizo/rtp/RtpHeaders.h | 14 ++ erizo/src/erizo/rtp/RtpUtils.cpp | 39 ++++- erizo/src/erizo/rtp/RtpUtils.h | 6 + erizo/src/test/rtp/PliPacerHandlerTest.cpp | 143 ++++++++++++++++++ erizo/src/test/utils/Matchers.h | 12 ++ erizo/src/test/utils/Tools.h | 27 +++- .../erizoAgent/log4cxx.properties | 3 +- 10 files changed, 377 insertions(+), 10 deletions(-) create mode 100644 erizo/src/erizo/rtp/PliPacerHandler.cpp create mode 100644 erizo/src/erizo/rtp/PliPacerHandler.h create mode 100644 erizo/src/test/rtp/PliPacerHandlerTest.cpp diff --git a/erizo/src/erizo/WebRtcConnection.cpp b/erizo/src/erizo/WebRtcConnection.cpp index 7b160bd98a..e8855044ba 100644 --- a/erizo/src/erizo/WebRtcConnection.cpp +++ b/erizo/src/erizo/WebRtcConnection.cpp @@ -29,6 +29,7 @@ #include "rtp/LayerDetectorHandler.h" #include "rtp/QualityFilterHandler.h" #include "rtp/QualityManager.h" +#include "rtp/PliPacerHandler.h" namespace erizo { DEFINE_LOGGER(WebRtcConnection, "WebRtcConnection"); @@ -256,6 +257,7 @@ void WebRtcConnection::initializePipeline() { pipeline_->addFront(QualityFilterHandler()); pipeline_->addFront(RtpAudioMuteHandler()); pipeline_->addFront(RtpSlideShowHandler()); + pipeline_->addFront(PliPacerHandler()); pipeline_->addFront(BandwidthEstimationHandler()); pipeline_->addFront(RtcpFeedbackGenerationHandler()); pipeline_->addFront(RtpRetransmissionHandler()); diff --git a/erizo/src/erizo/rtp/PliPacerHandler.cpp b/erizo/src/erizo/rtp/PliPacerHandler.cpp new file mode 100644 index 0000000000..7c4296f010 --- /dev/null +++ b/erizo/src/erizo/rtp/PliPacerHandler.cpp @@ -0,0 +1,87 @@ +#include "rtp/PliPacerHandler.h" + +#include "rtp/RtpUtils.h" +#include "./MediaDefinitions.h" +#include "./WebRtcConnection.h" + +namespace erizo { + +DEFINE_LOGGER(PliPacerHandler, "rtp.PliPacerHandler"); + +constexpr duration PliPacerHandler::kMinPLIPeriod; +constexpr duration PliPacerHandler::kKeyframeTimeout; + +PliPacerHandler::PliPacerHandler(std::shared_ptr the_clock) + : enabled_{true}, connection_{nullptr}, clock_{the_clock}, time_last_keyframe_{clock_->now()}, + waiting_for_keyframe_{false}, scheduled_pli_{-1}, + video_sink_ssrc_{0}, video_source_ssrc_{0}, fir_seq_number_{0} {} + +void PliPacerHandler::enable() { + enabled_ = true; +} + +void PliPacerHandler::disable() { + enabled_ = false; +} + +void PliPacerHandler::notifyUpdate() { + auto pipeline = getContext()->getPipelineShared(); + if (pipeline && !connection_) { + connection_ = pipeline->getService().get(); + video_sink_ssrc_ = connection_->getVideoSinkSSRC(); + video_source_ssrc_ = connection_->getVideoSourceSSRC(); + } +} + +void PliPacerHandler::read(Context *ctx, std::shared_ptr packet) { + if (enabled_ && packet->is_keyframe) { + time_last_keyframe_ = clock_->now(); + waiting_for_keyframe_ = false; + connection_->getWorker()->unschedule(scheduled_pli_); + scheduled_pli_ = -1; + } + ctx->fireRead(packet); +} + +void PliPacerHandler::sendPLI() { + getContext()->fireWrite(RtpUtils::createPLI(video_source_ssrc_, video_sink_ssrc_)); + scheduleNextPLI(); +} + +void PliPacerHandler::sendFIR() { + ELOG_WARN("%s message: Timed out waiting for a keyframe", connection_->toLog()); + getContext()->fireWrite(RtpUtils::createFIR(video_source_ssrc_, video_sink_ssrc_, fir_seq_number_++)); + getContext()->fireWrite(RtpUtils::createFIR(video_source_ssrc_, video_sink_ssrc_, fir_seq_number_++)); + getContext()->fireWrite(RtpUtils::createFIR(video_source_ssrc_, video_sink_ssrc_, fir_seq_number_++)); + waiting_for_keyframe_ = false; + scheduled_pli_ = -1; +} + +void PliPacerHandler::scheduleNextPLI() { + if (!waiting_for_keyframe_ || !enabled_) { + return; + } + std::weak_ptr weak_this = shared_from_this(); + scheduled_pli_ = connection_->getWorker()->scheduleFromNow([weak_this] { + if (auto this_ptr = weak_this.lock()) { + if (this_ptr->clock_->now() - this_ptr->time_last_keyframe_ >= kKeyframeTimeout) { + this_ptr->sendFIR(); + return; + } + this_ptr->sendPLI(); + } + }, kMinPLIPeriod); +} + +void PliPacerHandler::write(Context *ctx, std::shared_ptr packet) { + if (enabled_ && RtpUtils::isPLI(packet)) { + if (waiting_for_keyframe_) { + return; + } + waiting_for_keyframe_ = true; + scheduleNextPLI(); + } + ctx->fireWrite(packet); +} + +} // namespace erizo diff --git a/erizo/src/erizo/rtp/PliPacerHandler.h b/erizo/src/erizo/rtp/PliPacerHandler.h new file mode 100644 index 0000000000..bccf2b9528 --- /dev/null +++ b/erizo/src/erizo/rtp/PliPacerHandler.h @@ -0,0 +1,54 @@ +#ifndef ERIZO_SRC_ERIZO_RTP_PLIPACERHANDLER_H_ +#define ERIZO_SRC_ERIZO_RTP_PLIPACERHANDLER_H_ + +#include + +#include "./logger.h" +#include "pipeline/Handler.h" +#include "lib/Clock.h" + +namespace erizo { + +class WebRtcConnection; + +class PliPacerHandler: public Handler, public std::enable_shared_from_this { + DECLARE_LOGGER(); + + public: + static constexpr duration kMinPLIPeriod = std::chrono::milliseconds(200); + static constexpr duration kKeyframeTimeout = std::chrono::seconds(4); + + public: + explicit PliPacerHandler(std::shared_ptr the_clock = std::make_shared()); + + void enable() override; + void disable() override; + + std::string getName() override { + return "pli-pacer"; + } + + void read(Context *ctx, std::shared_ptr packet) override; + void write(Context *ctx, std::shared_ptr packet) override; + void notifyUpdate() override; + + private: + void scheduleNextPLI(); + void sendPLI(); + void sendFIR(); + + private: + bool enabled_; + WebRtcConnection* connection_; + std::shared_ptr clock_; + time_point time_last_keyframe_; + bool waiting_for_keyframe_; + int scheduled_pli_; + uint32_t video_sink_ssrc_; + uint32_t video_source_ssrc_; + uint8_t fir_seq_number_; +}; + +} // namespace erizo + +#endif // ERIZO_SRC_ERIZO_RTP_PLIPACERHANDLER_H_ diff --git a/erizo/src/erizo/rtp/RtpHeaders.h b/erizo/src/erizo/rtp/RtpHeaders.h index 43490d7b28..da03cbd299 100644 --- a/erizo/src/erizo/rtp/RtpHeaders.h +++ b/erizo/src/erizo/rtp/RtpHeaders.h @@ -1,3 +1,4 @@ + /* * RtpHeaders.h */ @@ -350,6 +351,13 @@ class RtcpHeader { uint32_t ssrcsource; uint32_t fci; } pli; + + struct fir_t { + uint32_t ssrcsource; + uint32_t mediasource; + uint32_t seqnumber:8; + uint32_t reserved:24; + } fir; } report; inline RtcpHeader() : blockcount(0), padding(0), version(2), packettype(0), length(0), ssrc(0) { @@ -511,6 +519,12 @@ class RtcpHeader { inline void setFCI(uint32_t fci) { report.pli.fci = htonl(fci); } + inline void setFIRSourceSSRC(uint32_t ssrc) { + report.fir.mediasource = htonl(ssrc); + } + inline void setFIRSequenceNumber(uint8_t seq_number) { + report.fir.seqnumber = seq_number; + } }; diff --git a/erizo/src/erizo/rtp/RtpUtils.cpp b/erizo/src/erizo/rtp/RtpUtils.cpp index ce3186c4aa..65e4562178 100644 --- a/erizo/src/erizo/rtp/RtpUtils.cpp +++ b/erizo/src/erizo/rtp/RtpUtils.cpp @@ -35,10 +35,32 @@ void RtpUtils::forEachNack(RtcpHeader *chead, std::function packet) { + bool is_pli = false; + forEachRRBlock(packet, [&is_pli] (RtcpHeader *header) { + if (header->getPacketType() == RTCP_PS_Feedback_PT && + header->getBlockCount() == RTCP_PLI_FMT) { + is_pli = true; + } + }); + return is_pli; +} + +bool RtpUtils::isFIR(std::shared_ptr packet) { + bool is_fir = false; + forEachRRBlock(packet, [&is_fir] (RtcpHeader *header) { + if (header->getPacketType() == RTCP_PS_Feedback_PT && + header->getBlockCount() == RTCP_FIR_FMT) { + is_fir = true; + } + }); + return is_fir; +} + std::shared_ptr RtpUtils::createPLI(uint32_t source_ssrc, uint32_t sink_ssrc) { RtcpHeader pli; pli.setPacketType(RTCP_PS_Feedback_PT); - pli.setBlockCount(1); + pli.setBlockCount(RTCP_PLI_FMT); pli.setSSRC(sink_ssrc); pli.setSourceSSRC(source_ssrc); pli.setLength(2); @@ -47,6 +69,21 @@ std::shared_ptr RtpUtils::createPLI(uint32_t source_ssrc, uint32_t s return std::make_shared(0, buf, len, VIDEO_PACKET); } +std::shared_ptr RtpUtils::createFIR(uint32_t source_ssrc, uint32_t sink_ssrc, uint8_t seq_number) { + RtcpHeader fir; + fir.setPacketType(RTCP_PS_Feedback_PT); + fir.setBlockCount(RTCP_FIR_FMT); + fir.setSSRC(sink_ssrc); + fir.setSourceSSRC(source_ssrc); + fir.setLength(4); + fir.setFIRSourceSSRC(source_ssrc); + fir.setFIRSequenceNumber(seq_number); + char *buf = reinterpret_cast(&fir); + int len = (fir.getLength() + 1) * 4; + return std::make_shared(0, buf, len, VIDEO_PACKET); +} + + int RtpUtils::getPaddingLength(std::shared_ptr packet) { RtpHeader *rtp_header = reinterpret_cast(packet->data); if (rtp_header->hasPadding()) { diff --git a/erizo/src/erizo/rtp/RtpUtils.h b/erizo/src/erizo/rtp/RtpUtils.h index 10afa4ac19..df1ea21a30 100644 --- a/erizo/src/erizo/rtp/RtpUtils.h +++ b/erizo/src/erizo/rtp/RtpUtils.h @@ -19,10 +19,16 @@ class RtpUtils { static void updateREMB(RtcpHeader *chead, uint bitrate); + static bool isPLI(std::shared_ptr packet); + + static bool isFIR(std::shared_ptr packet); + static void forEachNack(RtcpHeader *chead, std::function f); static std::shared_ptr createPLI(uint32_t source_ssrc, uint32_t sink_ssrc); + static std::shared_ptr createFIR(uint32_t source_ssrc, uint32_t sink_ssrc, uint8_t seq_number); + static int getPaddingLength(std::shared_ptr packet); }; diff --git a/erizo/src/test/rtp/PliPacerHandlerTest.cpp b/erizo/src/test/rtp/PliPacerHandlerTest.cpp new file mode 100644 index 0000000000..0c2ae6e0de --- /dev/null +++ b/erizo/src/test/rtp/PliPacerHandlerTest.cpp @@ -0,0 +1,143 @@ +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +#include "../utils/Mocks.h" +#include "../utils/Tools.h" +#include "../utils/Matchers.h" + +using ::testing::_; +using ::testing::IsNull; +using ::testing::Args; +using ::testing::Return; +using erizo::dataPacket; +using erizo::packetType; +using erizo::AUDIO_PACKET; +using erizo::VIDEO_PACKET; +using erizo::IceConfig; +using erizo::RtpMap; +using erizo::PliPacerHandler; +using erizo::SimulatedClock; +using erizo::WebRtcConnection; +using erizo::Pipeline; +using erizo::InboundHandler; +using erizo::OutboundHandler; +using erizo::Worker; +using std::queue; + +constexpr int kShortPeriodMs = + std::chrono::duration_cast(PliPacerHandler::kMinPLIPeriod).count() / 5; + +constexpr int kPLIPeriodMs = + std::chrono::duration_cast(PliPacerHandler::kMinPLIPeriod).count(); + +constexpr int kKeyframeTimeoutMs = + std::chrono::duration_cast(PliPacerHandler::kKeyframeTimeout).count(); + + +class PliPacerHandlerTest : public erizo::HandlerTest { + public: + PliPacerHandlerTest() {} + + protected: + void setHandler() { + pli_pacer_handler = std::make_shared(simulated_clock); + pipeline->addBack(pli_pacer_handler); + } + + std::shared_ptr pli_pacer_handler; +}; + +TEST_F(PliPacerHandlerTest, basicBehaviourShouldReadPackets) { + auto packet = erizo::PacketTools::createDataPacket(erizo::kArbitrarySeqNumber, AUDIO_PACKET); + + EXPECT_CALL(*reader.get(), read(_, _)). + With(Args<1>(erizo::RtpHasSequenceNumber(erizo::kArbitrarySeqNumber))).Times(1); + pipeline->read(packet); +} + +TEST_F(PliPacerHandlerTest, basicBehaviourShouldWritePackets) { + auto packet = erizo::PacketTools::createDataPacket(erizo::kArbitrarySeqNumber, AUDIO_PACKET); + + EXPECT_CALL(*writer.get(), write(_, _)). + With(Args<1>(erizo::RtpHasSequenceNumber(erizo::kArbitrarySeqNumber))).Times(1); + pipeline->write(packet); +} + +TEST_F(PliPacerHandlerTest, shouldSendASinglePLIWhenReceivingSeveralInAShortPeriod) { + auto pli1 = erizo::PacketTools::createPLI(); + auto pli2 = erizo::PacketTools::createPLI(); + auto pli3 = erizo::PacketTools::createPLI(); + + EXPECT_CALL(*writer.get(), write(_, _)).Times(1); + pipeline->write(pli1); + executeTasksInNextMs(kShortPeriodMs); + pipeline->write(pli2); + executeTasksInNextMs(kShortPeriodMs); + pipeline->write(pli3); + executeTasksInNextMs(kShortPeriodMs); +} + +TEST_F(PliPacerHandlerTest, shouldResetPeriodWhenKeyframeIsReceived) { + auto pli1 = erizo::PacketTools::createPLI(); + auto keyframe = erizo::PacketTools::createVP8Packet(erizo::kArbitrarySeqNumber, true, true); + auto pli2 = erizo::PacketTools::createPLI(); + + EXPECT_CALL(*writer.get(), write(_, _)).With(Args<1>(erizo::IsPLI())).Times(2); + EXPECT_CALL(*reader.get(), read(_, _)). + With(Args<1>(erizo::RtpHasSequenceNumber(erizo::kArbitrarySeqNumber))).Times(1); + + pipeline->write(pli1); + executeTasksInNextMs(kShortPeriodMs); + pipeline->read(keyframe); + executeTasksInNextMs(kShortPeriodMs); + pipeline->write(pli2); + executeTasksInNextMs(kShortPeriodMs); +} + +TEST_F(PliPacerHandlerTest, shouldSendMultiplePLIsWhenPeriodIsExpiredWithNoKeyframes) { + auto pli1 = erizo::PacketTools::createPLI(); + auto keyframe = erizo::PacketTools::createVP8Packet(erizo::kArbitrarySeqNumber, true, true); + + EXPECT_CALL(*writer.get(), write(_, _)).With(Args<1>(erizo::IsPLI())).Times(2); + EXPECT_CALL(*reader.get(), read(_, _)). + With(Args<1>(erizo::RtpHasSequenceNumber(erizo::kArbitrarySeqNumber))).Times(1); + + pipeline->read(keyframe); + pipeline->write(pli1); + + executeTasksInNextMs(kPLIPeriodMs + 1); +} + +TEST_F(PliPacerHandlerTest, shouldNotSendMultiplePLIsWhenDisabled) { + auto pli1 = erizo::PacketTools::createPLI(); + auto keyframe = erizo::PacketTools::createVP8Packet(erizo::kArbitrarySeqNumber, true, true); + pli_pacer_handler->disable(); + + EXPECT_CALL(*writer.get(), write(_, _)).With(Args<1>(erizo::IsPLI())).Times(1); + EXPECT_CALL(*reader.get(), read(_, _)). + With(Args<1>(erizo::RtpHasSequenceNumber(erizo::kArbitrarySeqNumber))).Times(1); + + pipeline->read(keyframe); + pipeline->write(pli1); + + executeTasksInNextMs(kPLIPeriodMs + 1); +} + +TEST_F(PliPacerHandlerTest, shouldSendFIRWhenKeyframesAreNotReceivedInALongPeriod) { + auto pli1 = erizo::PacketTools::createPLI(); + + EXPECT_CALL(*writer.get(), write(_, _)).With(Args<1>(erizo::IsPLI())).Times(20); + EXPECT_CALL(*writer.get(), write(_, _)).With(Args<1>(erizo::IsFIR())).Times(3); + pipeline->write(pli1); + + executeTasksInNextMs(kKeyframeTimeoutMs + 10); +} diff --git a/erizo/src/test/utils/Matchers.h b/erizo/src/test/utils/Matchers.h index df9a75bcd7..108db4b82d 100644 --- a/erizo/src/test/utils/Matchers.h +++ b/erizo/src/test/utils/Matchers.h @@ -2,6 +2,7 @@ #define ERIZO_SRC_TEST_UTILS_MATCHERS_H_ #include +#include #include namespace erizo { @@ -49,6 +50,17 @@ MATCHER_P(SenderReportHasOctetsSentValue, octets_sent, "") { MATCHER_P(RembHasBitrateValue, bitrate, "") { return (reinterpret_cast(std::get<0>(arg)->data))->getREMBBitRate() == bitrate; } + +MATCHER(IsPLI, "") { + auto packet = std::get<0>(arg); + return RtpUtils::isPLI(packet); +} + +MATCHER(IsFIR, "") { + auto packet = std::get<0>(arg); + return RtpUtils::isFIR(packet); +} + MATCHER_P(PacketBelongsToSpatialLayer, spatial_layer_id, "") { return std::get<0>(arg)->belongsToSpatialLayer(spatial_layer_id); } diff --git a/erizo/src/test/utils/Tools.h b/erizo/src/test/utils/Tools.h index d8bb283d50..0bb332664f 100644 --- a/erizo/src/test/utils/Tools.h +++ b/erizo/src/test/utils/Tools.h @@ -96,7 +96,9 @@ class PacketTools { parsing_pointer++; *parsing_pointer = is_keyframe? 0x00: 0x01; - return std::make_shared(0, packet_buffer, 200, VIDEO_PACKET); + auto packet = std::make_shared(0, packet_buffer, 200, VIDEO_PACKET); + packet->is_keyframe = is_keyframe; + return packet; } static std::shared_ptr createVP9Packet(uint16_t seq_number, bool is_keyframe, bool is_marker) { @@ -115,7 +117,9 @@ class PacketTools { *parsing_pointer = is_keyframe? 0x00: 0x40; - return std::make_shared(0, packet_buffer, 200, VIDEO_PACKET); + auto packet = std::make_shared(0, packet_buffer, 200, VIDEO_PACKET); + packet->is_keyframe = is_keyframe; + return packet; } static std::shared_ptr createRembPacket(uint32_t bitrate) { @@ -159,10 +163,10 @@ class BaseHandlerTest { virtual void setHandler() = 0; virtual void internalSetUp() { - scheduler = std::make_shared(1); - worker = std::make_shared(scheduler); - worker->start(); - connection = std::make_shared(worker, ice_config, rtp_maps); + simulated_clock = std::make_shared(); + simulated_worker = std::make_shared(simulated_clock); + simulated_worker->start(); + connection = std::make_shared(simulated_worker, ice_config, rtp_maps); processor = std::make_shared(); stats = std::make_shared(); connection->setVideoSinkSSRC(erizo::kVideoSsrc); @@ -188,6 +192,13 @@ class BaseHandlerTest { pipeline->finalize(); } + virtual void executeTasksInNextMs(int time) { + for (int step = 0; step < time + 1; step++) { + simulated_worker->executePastScheduledTasks(); + simulated_clock->advanceTime(std::chrono::milliseconds(1)); + } + } + virtual void internalTearDown() { } @@ -199,8 +210,8 @@ class BaseHandlerTest { Pipeline::Ptr pipeline; std::shared_ptr reader; std::shared_ptr writer; - std::shared_ptr worker; - std::shared_ptr scheduler; + std::shared_ptr simulated_clock; + std::shared_ptr simulated_worker; std::queue> packet_queue; }; diff --git a/erizo_controller/erizoAgent/log4cxx.properties b/erizo_controller/erizoAgent/log4cxx.properties index 3bd66a44cc..d50d8ced10 100644 --- a/erizo_controller/erizoAgent/log4cxx.properties +++ b/erizo_controller/erizoAgent/log4cxx.properties @@ -46,7 +46,7 @@ log4j.logger.rtp.RtpPacketQueue=WARN log4j.logger.rtp.RtpRetransmissionHandler=WARN log4j.logger.rtp.RtpVP8Fragmenter=WARN log4j.logger.rtp.RtpVP8Parser=ERROR -log4j.logger.rtp.RtpVP8SlideShowHandler=WARN +log4j.logger.rtp.RtpSlideShowHandler=WARN log4j.logger.rtp.RtpAudioMuteHandler=WARN log4j.logger.rtp.RtpSink=WARN log4j.logger.rtp.RtpSource=WARN @@ -58,3 +58,4 @@ log4j.logger.rtp.BandwidthEstimationHandler=WARN log4j.logger.rtp.SenderBandwidthEstimationHandler=WARN log4j.logger.rtp.StatsCalculator=WARN log4j.logger.rtp.LayerDetectorHandler=WARN +log4j.logger.rtp.PliPacerHandler=WARN