Skip to content

Commit

Permalink
Fix RR handling for SenderBWE (#1507)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcague authored Nov 29, 2019
1 parent 188d23d commit 32a1658
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 35 deletions.
2 changes: 1 addition & 1 deletion erizo/src/erizo/rtp/RtcpAggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void RtcpAggregator::analyzeSr(RtcpHeader* chead) {
uint32_t ntp;
uint64_t theNTP = chead->getNtpTimestamp();
ntp = (theNTP & (0xFFFFFFFF0000)) >> 16;
theData->senderReports.push_back(boost::shared_ptr<SrDelayData>( new SrDelayData(ntp, now)));
theData->senderReports.push_back(boost::shared_ptr<SrDelayData>( new SrDelayData(recvSSRC, ntp, now)));
// We only store the last 20 sr
if (theData->senderReports.size() > 20) {
theData->senderReports.pop_front();
Expand Down
2 changes: 1 addition & 1 deletion erizo/src/erizo/rtp/RtcpForwarder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void RtcpForwarder::analyzeSr(RtcpHeader* chead) {
uint32_t ntp;
uint64_t theNTP = chead->getNtpTimestamp();
ntp = (theNTP & (0xFFFFFFFF0000)) >> 16;
theData->senderReports.push_back(boost::shared_ptr<SrDelayData>( new SrDelayData(ntp, now)));
theData->senderReports.push_back(boost::shared_ptr<SrDelayData>( new SrDelayData(recvSSRC, ntp, now)));
// We only store the last 20 sr
if (theData->senderReports.size() > 20) {
theData->senderReports.pop_front();
Expand Down
19 changes: 17 additions & 2 deletions erizo/src/erizo/rtp/RtcpProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,30 @@ namespace erizo {

class SrDelayData {
public:
uint32_t ssrc;
uint32_t sr_ntp;
uint64_t sr_send_time;

SrDelayData() : sr_ntp{0}, sr_send_time{0} {}
SrDelayData() : ssrc{0}, sr_ntp{0}, sr_send_time{0} {}

SrDelayData(uint32_t ntp, uint64_t send_time) : sr_ntp{ntp},
SrDelayData(uint32_t source_ssrc, uint32_t ntp, uint64_t send_time) : ssrc{source_ssrc},
sr_ntp{ntp},
sr_send_time{send_time} {}
};

class RrDelayData {
public:
uint32_t ssrc;
uint32_t delay;
uint64_t fraction_lost;

RrDelayData() : ssrc{0}, delay{0}, fraction_lost{0} {}

RrDelayData(uint32_t rr_ssrc, uint32_t rr_delay, uint64_t rr_fraction_lost) : ssrc{rr_ssrc},
delay{rr_delay},
fraction_lost{rr_fraction_lost} {}
};

class RtcpData {
// lost packets - list and length
public:
Expand Down
119 changes: 89 additions & 30 deletions erizo/src/erizo/rtp/SenderBandwidthEstimantionHandler.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#include "./MediaDefinitions.h"
#include "rtp/SenderBandwidthEstimationHandler.h"

#include <utility>

#include "./MediaDefinitions.h"
#include "rtp/RtpUtils.h"
#include "./MediaStream.h"

namespace erizo {

Expand All @@ -11,10 +14,11 @@ constexpr duration SenderBandwidthEstimationHandler::kMinUpdateEstimateInterval;

SenderBandwidthEstimationHandler::SenderBandwidthEstimationHandler(std::shared_ptr<Clock> the_clock) :
connection_{nullptr}, bwe_listener_{nullptr}, clock_{the_clock}, initialized_{false}, enabled_{true},
received_remb_{false}, period_packets_sent_{0}, estimated_bitrate_{0}, estimated_loss_{0},
estimated_rtt_{0}, last_estimate_update_{clock::now()}, sender_bwe_{new SendSideBandwidthEstimation()} {
received_remb_{false}, estimated_bitrate_{0}, estimated_loss_{0},
estimated_rtt_{0}, last_estimate_update_{clock::now()}, sender_bwe_{new SendSideBandwidthEstimation()},
max_rr_delay_data_size_{0}, max_sr_delay_data_size_{0} {
sender_bwe_->SetBitrates(kStartSendBitrate, kMinSendBitrate, kMaxSendBitrate);
};
}

SenderBandwidthEstimationHandler::SenderBandwidthEstimationHandler(const SenderBandwidthEstimationHandler&& handler) : // NOLINT
connection_{handler.connection_},
Expand All @@ -28,7 +32,10 @@ SenderBandwidthEstimationHandler::SenderBandwidthEstimationHandler(const SenderB
estimated_loss_{handler.estimated_loss_},
estimated_rtt_{handler.estimated_rtt_},
sender_bwe_{handler.sender_bwe_},
sr_delay_data_{std::move(handler.sr_delay_data_)} {}
sr_delay_data_{std::move(handler.sr_delay_data_)},
rr_delay_data_{std::move(handler.rr_delay_data_)},
max_rr_delay_data_size_{handler.max_sr_delay_data_size_},
max_sr_delay_data_size_{handler.max_rr_delay_data_size_} {}


void SenderBandwidthEstimationHandler::enable() {
Expand All @@ -41,6 +48,7 @@ void SenderBandwidthEstimationHandler::disable() {

void SenderBandwidthEstimationHandler::notifyUpdate() {
if (initialized_) {
updateMaxListSizes();
return;
}
auto pipeline = getContext()->getPipelineShared();
Expand All @@ -50,33 +58,43 @@ void SenderBandwidthEstimationHandler::notifyUpdate() {
if (!connection_) {
return;
}
updateMaxListSizes();
stats_ = pipeline->getService<Stats>();
if (!stats_) {
return;
}
initialized_ = true;
}

void SenderBandwidthEstimationHandler::updateMaxListSizes() {
size_t num_streams = 0;
connection_->forEachMediaStream([&num_streams] (std::shared_ptr<MediaStream> media_stream) {
if (!media_stream->isPublisher()) {
num_streams++;
}
});
max_sr_delay_data_size_ = num_streams * kMaxSrListSize;
max_rr_delay_data_size_ = num_streams;
updateReceiverBlockFromList();
}

void SenderBandwidthEstimationHandler::read(Context *ctx, std::shared_ptr<DataPacket> packet) {
RtpUtils::forEachRtcpBlock(packet, [this](RtcpHeader *chead) {
ELOG_DEBUG("%s ssrc %u, sourceSSRC %u, PacketType %u", connection_->toLog(),
chead->getSSRC(),
chead->getSourceSSRC(),
chead->getPacketType());
switch (chead->packettype) {
case RTCP_Receiver_PT:
{
// calculate RTT + Update receiver block
uint32_t delay_since_last_ms = (chead->getDelaySinceLastSr() * 1000) / 65536;
int64_t now_ms = ClockUtils::timePointToMs(clock_->now());
uint32_t last_sr = chead->getLastSr();
uint32_t ssrc = chead->getSourceSSRC();

auto value = std::find_if(sr_delay_data_.begin(), sr_delay_data_.end(),
[last_sr](const std::shared_ptr<SrDelayData> sr_info) {
return sr_info->sr_ntp == last_sr;
[ssrc, last_sr](const std::shared_ptr<SrDelayData> sr_info) {
return sr_info->ssrc == ssrc && sr_info->sr_ntp == last_sr;
});
ELOG_DEBUG("%s, Analyzing Video RR: PacketLost %u, Ratio %u, blocks %d"
", sourceSSRC %u, ssrc %u, last_sr %u, remb_received %d, found %d",
", sourceSSRC %u, ssrc %u, last_sr %u, remb_received %d, found %d, max_size: %d, size: %d",
connection_->toLog(),
chead->getLostPackets(),
chead->getFractionLost(),
Expand All @@ -85,17 +103,14 @@ void SenderBandwidthEstimationHandler::read(Context *ctx, std::shared_ptr<DataPa
chead->getSSRC(),
chead->getLastSr(),
received_remb_,
value != sr_delay_data_.end());
// TODO(pedro) Implement alternative when there are no REMBs
value != sr_delay_data_.end(),
max_rr_delay_data_size_,
rr_delay_data_.size());
if (received_remb_ && value != sr_delay_data_.end()) {
uint32_t delay = now_ms - (*value)->sr_send_time - delay_since_last_ms;
ELOG_DEBUG("%s message: Updating Estimate with RR, fraction_lost: %u, "
"delay: %u, period_packets_sent_: %u",
connection_->toLog(), chead->getFractionLost(), delay, period_packets_sent_);
sender_bwe_->UpdateReceiverBlock(chead->getFractionLost(),
delay, period_packets_sent_, now_ms);
period_packets_sent_ = 0;
updateEstimate();
rr_delay_data_.push_back(
std::make_shared<RrDelayData>(chead->getSourceSSRC(), delay, chead->getFractionLost()));
updateReceiverBlockFromList();
}
}
break;
Expand Down Expand Up @@ -128,15 +143,58 @@ void SenderBandwidthEstimationHandler::read(Context *ctx, std::shared_ptr<DataPa
ctx->fireRead(std::move(packet));
}

void SenderBandwidthEstimationHandler::updateReceiverBlockFromList() {
if (rr_delay_data_.size() < max_rr_delay_data_size_) {
return;
}
// TODO(pedro) Implement alternative when there are no REMBs
if (received_remb_) {
uint32_t total_packets_lost = 0;
uint32_t total_packets_sent = 0;
uint64_t avg_delay = 0;
uint32_t rr_delay_data_size = rr_delay_data_.size();
int64_t now_ms = ClockUtils::timePointToMs(clock_->now());
std::for_each(rr_delay_data_.begin(), rr_delay_data_.end(),
[&avg_delay, &total_packets_lost, rr_delay_data_size, &total_packets_sent, this]
(const std::shared_ptr<RrDelayData> &rr_info) {
auto packets_sent_ssrc = period_packets_sent_.find(rr_info->ssrc);
if (packets_sent_ssrc != period_packets_sent_.end()) {
total_packets_lost += rr_info->fraction_lost * packets_sent_ssrc->second / 255;
total_packets_sent += packets_sent_ssrc->second;
avg_delay += rr_info->delay / rr_delay_data_size;
}
});
if (total_packets_sent > 0) {
uint32_t fraction_lost = total_packets_lost * 255 / total_packets_sent;
ELOG_DEBUG("%s message: Updating Estimate with RR, fraction_lost: %u, "
"delay: %u, period_packets_sent_: %u",
connection_->toLog(), fraction_lost, avg_delay, total_packets_sent);
sender_bwe_->UpdateReceiverBlock(fraction_lost, avg_delay, total_packets_sent, now_ms);
updateEstimate();
}
}
period_packets_sent_.clear();
rr_delay_data_.clear();
}

void SenderBandwidthEstimationHandler::write(Context *ctx, std::shared_ptr<DataPacket> packet) {
RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(packet->data);
if (!chead->isRtcp() && packet->type == VIDEO_PACKET) {
period_packets_sent_++;
time_point now = clock_->now();
if (received_remb_ && now - last_estimate_update_ > kMinUpdateEstimateInterval) {
sender_bwe_->UpdateEstimate(ClockUtils::timePointToMs(now));
updateEstimate();
last_estimate_update_ = now;
if (!chead->isRtcp()) {
RtpHeader *rtp_header = reinterpret_cast<RtpHeader*>(packet->data);
uint32_t ssrc = rtp_header->getSSRC();
auto packets_sent_for_ssrc = period_packets_sent_.find(ssrc);
if (packets_sent_for_ssrc != period_packets_sent_.end()) {
packets_sent_for_ssrc->second++;
} else {
period_packets_sent_.emplace(ssrc, 1);
}
if (packet->type == VIDEO_PACKET) {
time_point now = clock_->now();
if (received_remb_ && now - last_estimate_update_ > kMinUpdateEstimateInterval) {
sender_bwe_->UpdateEstimate(ClockUtils::timePointToMs(now));
updateEstimate();
last_estimate_update_ = now;
}
}
} else if (chead->getPacketType() == RTCP_Sender_PT) {
analyzeSr(chead);
Expand All @@ -147,10 +205,11 @@ void SenderBandwidthEstimationHandler::write(Context *ctx, std::shared_ptr<DataP
void SenderBandwidthEstimationHandler::analyzeSr(RtcpHeader* chead) {
uint64_t now = ClockUtils::timePointToMs(clock_->now());
uint32_t ntp;
uint32_t ssrc = chead->getSSRC();
ntp = chead->get32MiddleNtp();
ELOG_DEBUG("%s message: adding incoming SR to list, ntp: %u", connection_->toLog(), ntp);
sr_delay_data_.push_back(std::shared_ptr<SrDelayData>( new SrDelayData(ntp, now)));
if (sr_delay_data_.size() >= kMaxSrListSize) {
sr_delay_data_.push_back(std::make_shared<SrDelayData>(ssrc, ntp, now));
if (sr_delay_data_.size() >= max_sr_delay_data_size_) {
sr_delay_data_.pop_front();
}
}
Expand Down
10 changes: 9 additions & 1 deletion erizo/src/erizo/rtp/SenderBandwidthEstimationHandler.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#ifndef ERIZO_SRC_ERIZO_RTP_SENDERBANDWIDTHESTIMATIONHANDLER_H_
#define ERIZO_SRC_ERIZO_RTP_SENDERBANDWIDTHESTIMATIONHANDLER_H_
#include <map>

#include "pipeline/Handler.h"
#include "./logger.h"
#include "./WebRtcConnection.h"
Expand Down Expand Up @@ -50,6 +52,9 @@ class SenderBandwidthEstimationHandler : public Handler,
void setListener(SenderBandwidthEstimationListener* listener) {
bwe_listener_ = listener;
}
private:
void updateMaxListSizes();
void updateReceiverBlockFromList();

private:
WebRtcConnection* connection_;
Expand All @@ -58,14 +63,17 @@ class SenderBandwidthEstimationHandler : public Handler,
bool initialized_;
bool enabled_;
bool received_remb_;
uint32_t period_packets_sent_;
std::map<uint32_t, uint32_t> period_packets_sent_;
int estimated_bitrate_;
uint8_t estimated_loss_;
int64_t estimated_rtt_;
time_point last_estimate_update_;
std::shared_ptr<SendSideBandwidthEstimation> sender_bwe_;
std::list<std::shared_ptr<SrDelayData>> sr_delay_data_;
std::list<std::shared_ptr<RrDelayData>> rr_delay_data_;
std::shared_ptr<Stats> stats_;
uint32_t max_rr_delay_data_size_;
uint32_t max_sr_delay_data_size_;

void updateEstimate();
};
Expand Down

0 comments on commit 32a1658

Please sign in to comment.