diff --git a/erizo/src/erizo/rtp/RtcpAggregator.cpp b/erizo/src/erizo/rtp/RtcpAggregator.cpp index 4837f440f8..bf6c7c7517 100644 --- a/erizo/src/erizo/rtp/RtcpAggregator.cpp +++ b/erizo/src/erizo/rtp/RtcpAggregator.cpp @@ -59,7 +59,7 @@ void RtcpAggregator::analyzeSr(RtcpHeader* chead) { uint32_t ntp; uint64_t theNTP = chead->getNtpTimestamp(); ntp = (theNTP & (0xFFFFFFFF0000)) >> 16; - theData->senderReports.push_back(boost::shared_ptr( new SrDelayData(ntp, now))); + theData->senderReports.push_back(boost::shared_ptr( new SrDelayData(recvSSRC, ntp, now))); // We only store the last 20 sr if (theData->senderReports.size() > 20) { theData->senderReports.pop_front(); diff --git a/erizo/src/erizo/rtp/RtcpForwarder.cpp b/erizo/src/erizo/rtp/RtcpForwarder.cpp index a8d00bd0eb..622656027c 100644 --- a/erizo/src/erizo/rtp/RtcpForwarder.cpp +++ b/erizo/src/erizo/rtp/RtcpForwarder.cpp @@ -48,7 +48,7 @@ void RtcpForwarder::analyzeSr(RtcpHeader* chead) { uint32_t ntp; uint64_t theNTP = chead->getNtpTimestamp(); ntp = (theNTP & (0xFFFFFFFF0000)) >> 16; - theData->senderReports.push_back(boost::shared_ptr( new SrDelayData(ntp, now))); + theData->senderReports.push_back(boost::shared_ptr( new SrDelayData(recvSSRC, ntp, now))); // We only store the last 20 sr if (theData->senderReports.size() > 20) { theData->senderReports.pop_front(); diff --git a/erizo/src/erizo/rtp/RtcpProcessor.h b/erizo/src/erizo/rtp/RtcpProcessor.h index f0f5fe21b4..814b9f2654 100644 --- a/erizo/src/erizo/rtp/RtcpProcessor.h +++ b/erizo/src/erizo/rtp/RtcpProcessor.h @@ -17,15 +17,30 @@ namespace erizo { class SrDelayData { public: + uint32_t ssrc; uint32_t sr_ntp; uint64_t sr_send_time; - SrDelayData() : sr_ntp{0}, sr_send_time{0} {} + SrDelayData() : ssrc{0}, sr_ntp{0}, sr_send_time{0} {} - SrDelayData(uint32_t ntp, uint64_t send_time) : sr_ntp{ntp}, + SrDelayData(uint32_t source_ssrc, uint32_t ntp, uint64_t send_time) : ssrc{source_ssrc}, + sr_ntp{ntp}, sr_send_time{send_time} {} }; +class RrDelayData { + public: + uint32_t ssrc; + uint32_t delay; + uint64_t fraction_lost; + + RrDelayData() : ssrc{0}, delay{0}, fraction_lost{0} {} + + RrDelayData(uint32_t rr_ssrc, uint32_t rr_delay, uint64_t rr_fraction_lost) : ssrc{rr_ssrc}, + delay{rr_delay}, + fraction_lost{rr_fraction_lost} {} +}; + class RtcpData { // lost packets - list and length public: diff --git a/erizo/src/erizo/rtp/SenderBandwidthEstimantionHandler.cpp b/erizo/src/erizo/rtp/SenderBandwidthEstimantionHandler.cpp index 5a82381d65..686adce1fc 100644 --- a/erizo/src/erizo/rtp/SenderBandwidthEstimantionHandler.cpp +++ b/erizo/src/erizo/rtp/SenderBandwidthEstimantionHandler.cpp @@ -1,7 +1,10 @@ -#include "./MediaDefinitions.h" #include "rtp/SenderBandwidthEstimationHandler.h" +#include + +#include "./MediaDefinitions.h" #include "rtp/RtpUtils.h" +#include "./MediaStream.h" namespace erizo { @@ -11,10 +14,11 @@ constexpr duration SenderBandwidthEstimationHandler::kMinUpdateEstimateInterval; SenderBandwidthEstimationHandler::SenderBandwidthEstimationHandler(std::shared_ptr the_clock) : connection_{nullptr}, bwe_listener_{nullptr}, clock_{the_clock}, initialized_{false}, enabled_{true}, - received_remb_{false}, period_packets_sent_{0}, estimated_bitrate_{0}, estimated_loss_{0}, - estimated_rtt_{0}, last_estimate_update_{clock::now()}, sender_bwe_{new SendSideBandwidthEstimation()} { + received_remb_{false}, estimated_bitrate_{0}, estimated_loss_{0}, + estimated_rtt_{0}, last_estimate_update_{clock::now()}, sender_bwe_{new SendSideBandwidthEstimation()}, + max_rr_delay_data_size_{0}, max_sr_delay_data_size_{0} { sender_bwe_->SetBitrates(kStartSendBitrate, kMinSendBitrate, kMaxSendBitrate); - }; + } SenderBandwidthEstimationHandler::SenderBandwidthEstimationHandler(const SenderBandwidthEstimationHandler&& handler) : // NOLINT connection_{handler.connection_}, @@ -28,7 +32,10 @@ SenderBandwidthEstimationHandler::SenderBandwidthEstimationHandler(const SenderB estimated_loss_{handler.estimated_loss_}, estimated_rtt_{handler.estimated_rtt_}, sender_bwe_{handler.sender_bwe_}, - sr_delay_data_{std::move(handler.sr_delay_data_)} {} + sr_delay_data_{std::move(handler.sr_delay_data_)}, + rr_delay_data_{std::move(handler.rr_delay_data_)}, + max_rr_delay_data_size_{handler.max_sr_delay_data_size_}, + max_sr_delay_data_size_{handler.max_rr_delay_data_size_} {} void SenderBandwidthEstimationHandler::enable() { @@ -41,6 +48,7 @@ void SenderBandwidthEstimationHandler::disable() { void SenderBandwidthEstimationHandler::notifyUpdate() { if (initialized_) { + updateMaxListSizes(); return; } auto pipeline = getContext()->getPipelineShared(); @@ -50,6 +58,7 @@ void SenderBandwidthEstimationHandler::notifyUpdate() { if (!connection_) { return; } + updateMaxListSizes(); stats_ = pipeline->getService(); if (!stats_) { return; @@ -57,12 +66,20 @@ void SenderBandwidthEstimationHandler::notifyUpdate() { initialized_ = true; } +void SenderBandwidthEstimationHandler::updateMaxListSizes() { + size_t num_streams = 0; + connection_->forEachMediaStream([&num_streams] (std::shared_ptr media_stream) { + if (!media_stream->isPublisher()) { + num_streams++; + } + }); + max_sr_delay_data_size_ = num_streams * kMaxSrListSize; + max_rr_delay_data_size_ = num_streams; + updateReceiverBlockFromList(); +} + void SenderBandwidthEstimationHandler::read(Context *ctx, std::shared_ptr packet) { RtpUtils::forEachRtcpBlock(packet, [this](RtcpHeader *chead) { - ELOG_DEBUG("%s ssrc %u, sourceSSRC %u, PacketType %u", connection_->toLog(), - chead->getSSRC(), - chead->getSourceSSRC(), - chead->getPacketType()); switch (chead->packettype) { case RTCP_Receiver_PT: { @@ -70,13 +87,14 @@ void SenderBandwidthEstimationHandler::read(Context *ctx, std::shared_ptrgetDelaySinceLastSr() * 1000) / 65536; int64_t now_ms = ClockUtils::timePointToMs(clock_->now()); uint32_t last_sr = chead->getLastSr(); + uint32_t ssrc = chead->getSourceSSRC(); auto value = std::find_if(sr_delay_data_.begin(), sr_delay_data_.end(), - [last_sr](const std::shared_ptr sr_info) { - return sr_info->sr_ntp == last_sr; + [ssrc, last_sr](const std::shared_ptr sr_info) { + return sr_info->ssrc == ssrc && sr_info->sr_ntp == last_sr; }); ELOG_DEBUG("%s, Analyzing Video RR: PacketLost %u, Ratio %u, blocks %d" - ", sourceSSRC %u, ssrc %u, last_sr %u, remb_received %d, found %d", + ", sourceSSRC %u, ssrc %u, last_sr %u, remb_received %d, found %d, max_size: %d, size: %d", connection_->toLog(), chead->getLostPackets(), chead->getFractionLost(), @@ -85,17 +103,14 @@ void SenderBandwidthEstimationHandler::read(Context *ctx, std::shared_ptrgetSSRC(), chead->getLastSr(), received_remb_, - value != sr_delay_data_.end()); - // TODO(pedro) Implement alternative when there are no REMBs + value != sr_delay_data_.end(), + max_rr_delay_data_size_, + rr_delay_data_.size()); if (received_remb_ && value != sr_delay_data_.end()) { uint32_t delay = now_ms - (*value)->sr_send_time - delay_since_last_ms; - ELOG_DEBUG("%s message: Updating Estimate with RR, fraction_lost: %u, " - "delay: %u, period_packets_sent_: %u", - connection_->toLog(), chead->getFractionLost(), delay, period_packets_sent_); - sender_bwe_->UpdateReceiverBlock(chead->getFractionLost(), - delay, period_packets_sent_, now_ms); - period_packets_sent_ = 0; - updateEstimate(); + rr_delay_data_.push_back( + std::make_shared(chead->getSourceSSRC(), delay, chead->getFractionLost())); + updateReceiverBlockFromList(); } } break; @@ -128,15 +143,58 @@ void SenderBandwidthEstimationHandler::read(Context *ctx, std::shared_ptrfireRead(std::move(packet)); } +void SenderBandwidthEstimationHandler::updateReceiverBlockFromList() { + if (rr_delay_data_.size() < max_rr_delay_data_size_) { + return; + } + // TODO(pedro) Implement alternative when there are no REMBs + if (received_remb_) { + uint32_t total_packets_lost = 0; + uint32_t total_packets_sent = 0; + uint64_t avg_delay = 0; + uint32_t rr_delay_data_size = rr_delay_data_.size(); + int64_t now_ms = ClockUtils::timePointToMs(clock_->now()); + std::for_each(rr_delay_data_.begin(), rr_delay_data_.end(), + [&avg_delay, &total_packets_lost, rr_delay_data_size, &total_packets_sent, this] + (const std::shared_ptr &rr_info) { + auto packets_sent_ssrc = period_packets_sent_.find(rr_info->ssrc); + if (packets_sent_ssrc != period_packets_sent_.end()) { + total_packets_lost += rr_info->fraction_lost * packets_sent_ssrc->second / 255; + total_packets_sent += packets_sent_ssrc->second; + avg_delay += rr_info->delay / rr_delay_data_size; + } + }); + if (total_packets_sent > 0) { + uint32_t fraction_lost = total_packets_lost * 255 / total_packets_sent; + ELOG_DEBUG("%s message: Updating Estimate with RR, fraction_lost: %u, " + "delay: %u, period_packets_sent_: %u", + connection_->toLog(), fraction_lost, avg_delay, total_packets_sent); + sender_bwe_->UpdateReceiverBlock(fraction_lost, avg_delay, total_packets_sent, now_ms); + updateEstimate(); + } + } + period_packets_sent_.clear(); + rr_delay_data_.clear(); +} + void SenderBandwidthEstimationHandler::write(Context *ctx, std::shared_ptr packet) { RtcpHeader *chead = reinterpret_cast(packet->data); - if (!chead->isRtcp() && packet->type == VIDEO_PACKET) { - period_packets_sent_++; - time_point now = clock_->now(); - if (received_remb_ && now - last_estimate_update_ > kMinUpdateEstimateInterval) { - sender_bwe_->UpdateEstimate(ClockUtils::timePointToMs(now)); - updateEstimate(); - last_estimate_update_ = now; + if (!chead->isRtcp()) { + RtpHeader *rtp_header = reinterpret_cast(packet->data); + uint32_t ssrc = rtp_header->getSSRC(); + auto packets_sent_for_ssrc = period_packets_sent_.find(ssrc); + if (packets_sent_for_ssrc != period_packets_sent_.end()) { + packets_sent_for_ssrc->second++; + } else { + period_packets_sent_.emplace(ssrc, 1); + } + if (packet->type == VIDEO_PACKET) { + time_point now = clock_->now(); + if (received_remb_ && now - last_estimate_update_ > kMinUpdateEstimateInterval) { + sender_bwe_->UpdateEstimate(ClockUtils::timePointToMs(now)); + updateEstimate(); + last_estimate_update_ = now; + } } } else if (chead->getPacketType() == RTCP_Sender_PT) { analyzeSr(chead); @@ -147,10 +205,11 @@ void SenderBandwidthEstimationHandler::write(Context *ctx, std::shared_ptrnow()); uint32_t ntp; + uint32_t ssrc = chead->getSSRC(); ntp = chead->get32MiddleNtp(); ELOG_DEBUG("%s message: adding incoming SR to list, ntp: %u", connection_->toLog(), ntp); - sr_delay_data_.push_back(std::shared_ptr( new SrDelayData(ntp, now))); - if (sr_delay_data_.size() >= kMaxSrListSize) { + sr_delay_data_.push_back(std::make_shared(ssrc, ntp, now)); + if (sr_delay_data_.size() >= max_sr_delay_data_size_) { sr_delay_data_.pop_front(); } } diff --git a/erizo/src/erizo/rtp/SenderBandwidthEstimationHandler.h b/erizo/src/erizo/rtp/SenderBandwidthEstimationHandler.h index 3d2a4369b3..1cfb88a3cf 100644 --- a/erizo/src/erizo/rtp/SenderBandwidthEstimationHandler.h +++ b/erizo/src/erizo/rtp/SenderBandwidthEstimationHandler.h @@ -1,5 +1,7 @@ #ifndef ERIZO_SRC_ERIZO_RTP_SENDERBANDWIDTHESTIMATIONHANDLER_H_ #define ERIZO_SRC_ERIZO_RTP_SENDERBANDWIDTHESTIMATIONHANDLER_H_ +#include + #include "pipeline/Handler.h" #include "./logger.h" #include "./WebRtcConnection.h" @@ -50,6 +52,9 @@ class SenderBandwidthEstimationHandler : public Handler, void setListener(SenderBandwidthEstimationListener* listener) { bwe_listener_ = listener; } + private: + void updateMaxListSizes(); + void updateReceiverBlockFromList(); private: WebRtcConnection* connection_; @@ -58,14 +63,17 @@ class SenderBandwidthEstimationHandler : public Handler, bool initialized_; bool enabled_; bool received_remb_; - uint32_t period_packets_sent_; + std::map period_packets_sent_; int estimated_bitrate_; uint8_t estimated_loss_; int64_t estimated_rtt_; time_point last_estimate_update_; std::shared_ptr sender_bwe_; std::list> sr_delay_data_; + std::list> rr_delay_data_; std::shared_ptr stats_; + uint32_t max_rr_delay_data_size_; + uint32_t max_sr_delay_data_size_; void updateEstimate(); };