Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix RR handling for SenderBWE #1507

Merged
merged 3 commits into from
Nov 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion erizo/src/erizo/rtp/RtcpAggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void RtcpAggregator::analyzeSr(RtcpHeader* chead) {
uint32_t ntp;
uint64_t theNTP = chead->getNtpTimestamp();
ntp = (theNTP & (0xFFFFFFFF0000)) >> 16;
theData->senderReports.push_back(boost::shared_ptr<SrDelayData>( new SrDelayData(ntp, now)));
theData->senderReports.push_back(boost::shared_ptr<SrDelayData>( new SrDelayData(recvSSRC, ntp, now)));
// We only store the last 20 sr
if (theData->senderReports.size() > 20) {
theData->senderReports.pop_front();
Expand Down
2 changes: 1 addition & 1 deletion erizo/src/erizo/rtp/RtcpForwarder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void RtcpForwarder::analyzeSr(RtcpHeader* chead) {
uint32_t ntp;
uint64_t theNTP = chead->getNtpTimestamp();
ntp = (theNTP & (0xFFFFFFFF0000)) >> 16;
theData->senderReports.push_back(boost::shared_ptr<SrDelayData>( new SrDelayData(ntp, now)));
theData->senderReports.push_back(boost::shared_ptr<SrDelayData>( new SrDelayData(recvSSRC, ntp, now)));
// We only store the last 20 sr
if (theData->senderReports.size() > 20) {
theData->senderReports.pop_front();
Expand Down
19 changes: 17 additions & 2 deletions erizo/src/erizo/rtp/RtcpProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,30 @@ namespace erizo {

class SrDelayData {
public:
uint32_t ssrc;
uint32_t sr_ntp;
uint64_t sr_send_time;

SrDelayData() : sr_ntp{0}, sr_send_time{0} {}
SrDelayData() : ssrc{0}, sr_ntp{0}, sr_send_time{0} {}

SrDelayData(uint32_t ntp, uint64_t send_time) : sr_ntp{ntp},
SrDelayData(uint32_t source_ssrc, uint32_t ntp, uint64_t send_time) : ssrc{source_ssrc},
sr_ntp{ntp},
sr_send_time{send_time} {}
};

class RrDelayData {
public:
uint32_t ssrc;
uint32_t delay;
uint64_t fraction_lost;

RrDelayData() : ssrc{0}, delay{0}, fraction_lost{0} {}

RrDelayData(uint32_t rr_ssrc, uint32_t rr_delay, uint64_t rr_fraction_lost) : ssrc{rr_ssrc},
delay{rr_delay},
fraction_lost{rr_fraction_lost} {}
};

class RtcpData {
// lost packets - list and length
public:
Expand Down
119 changes: 89 additions & 30 deletions erizo/src/erizo/rtp/SenderBandwidthEstimantionHandler.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#include "./MediaDefinitions.h"
#include "rtp/SenderBandwidthEstimationHandler.h"

#include <utility>

#include "./MediaDefinitions.h"
#include "rtp/RtpUtils.h"
#include "./MediaStream.h"

namespace erizo {

Expand All @@ -11,10 +14,11 @@ constexpr duration SenderBandwidthEstimationHandler::kMinUpdateEstimateInterval;

SenderBandwidthEstimationHandler::SenderBandwidthEstimationHandler(std::shared_ptr<Clock> the_clock) :
connection_{nullptr}, bwe_listener_{nullptr}, clock_{the_clock}, initialized_{false}, enabled_{true},
received_remb_{false}, period_packets_sent_{0}, estimated_bitrate_{0}, estimated_loss_{0},
estimated_rtt_{0}, last_estimate_update_{clock::now()}, sender_bwe_{new SendSideBandwidthEstimation()} {
received_remb_{false}, estimated_bitrate_{0}, estimated_loss_{0},
estimated_rtt_{0}, last_estimate_update_{clock::now()}, sender_bwe_{new SendSideBandwidthEstimation()},
max_rr_delay_data_size_{0}, max_sr_delay_data_size_{0} {
sender_bwe_->SetBitrates(kStartSendBitrate, kMinSendBitrate, kMaxSendBitrate);
};
}

SenderBandwidthEstimationHandler::SenderBandwidthEstimationHandler(const SenderBandwidthEstimationHandler&& handler) : // NOLINT
connection_{handler.connection_},
Expand All @@ -28,7 +32,10 @@ SenderBandwidthEstimationHandler::SenderBandwidthEstimationHandler(const SenderB
estimated_loss_{handler.estimated_loss_},
estimated_rtt_{handler.estimated_rtt_},
sender_bwe_{handler.sender_bwe_},
sr_delay_data_{std::move(handler.sr_delay_data_)} {}
sr_delay_data_{std::move(handler.sr_delay_data_)},
rr_delay_data_{std::move(handler.rr_delay_data_)},
max_rr_delay_data_size_{handler.max_sr_delay_data_size_},
max_sr_delay_data_size_{handler.max_rr_delay_data_size_} {}


void SenderBandwidthEstimationHandler::enable() {
Expand All @@ -41,6 +48,7 @@ void SenderBandwidthEstimationHandler::disable() {

void SenderBandwidthEstimationHandler::notifyUpdate() {
if (initialized_) {
updateMaxListSizes();
return;
}
auto pipeline = getContext()->getPipelineShared();
Expand All @@ -50,33 +58,43 @@ void SenderBandwidthEstimationHandler::notifyUpdate() {
if (!connection_) {
return;
}
updateMaxListSizes();
stats_ = pipeline->getService<Stats>();
if (!stats_) {
return;
}
initialized_ = true;
}

void SenderBandwidthEstimationHandler::updateMaxListSizes() {
size_t num_streams = 0;
connection_->forEachMediaStream([&num_streams] (std::shared_ptr<MediaStream> media_stream) {
lodoyun marked this conversation as resolved.
Show resolved Hide resolved
if (!media_stream->isPublisher()) {
num_streams++;
}
});
max_sr_delay_data_size_ = num_streams * kMaxSrListSize;
max_rr_delay_data_size_ = num_streams;
updateReceiverBlockFromList();
}

void SenderBandwidthEstimationHandler::read(Context *ctx, std::shared_ptr<DataPacket> packet) {
RtpUtils::forEachRtcpBlock(packet, [this](RtcpHeader *chead) {
ELOG_DEBUG("%s ssrc %u, sourceSSRC %u, PacketType %u", connection_->toLog(),
chead->getSSRC(),
chead->getSourceSSRC(),
chead->getPacketType());
switch (chead->packettype) {
case RTCP_Receiver_PT:
{
// calculate RTT + Update receiver block
uint32_t delay_since_last_ms = (chead->getDelaySinceLastSr() * 1000) / 65536;
int64_t now_ms = ClockUtils::timePointToMs(clock_->now());
uint32_t last_sr = chead->getLastSr();
uint32_t ssrc = chead->getSourceSSRC();

auto value = std::find_if(sr_delay_data_.begin(), sr_delay_data_.end(),
[last_sr](const std::shared_ptr<SrDelayData> sr_info) {
return sr_info->sr_ntp == last_sr;
[ssrc, last_sr](const std::shared_ptr<SrDelayData> sr_info) {
return sr_info->ssrc == ssrc && sr_info->sr_ntp == last_sr;
});
ELOG_DEBUG("%s, Analyzing Video RR: PacketLost %u, Ratio %u, blocks %d"
", sourceSSRC %u, ssrc %u, last_sr %u, remb_received %d, found %d",
", sourceSSRC %u, ssrc %u, last_sr %u, remb_received %d, found %d, max_size: %d, size: %d",
connection_->toLog(),
chead->getLostPackets(),
chead->getFractionLost(),
Expand All @@ -85,17 +103,14 @@ void SenderBandwidthEstimationHandler::read(Context *ctx, std::shared_ptr<DataPa
chead->getSSRC(),
chead->getLastSr(),
received_remb_,
value != sr_delay_data_.end());
// TODO(pedro) Implement alternative when there are no REMBs
value != sr_delay_data_.end(),
max_rr_delay_data_size_,
rr_delay_data_.size());
if (received_remb_ && value != sr_delay_data_.end()) {
uint32_t delay = now_ms - (*value)->sr_send_time - delay_since_last_ms;
ELOG_DEBUG("%s message: Updating Estimate with RR, fraction_lost: %u, "
"delay: %u, period_packets_sent_: %u",
connection_->toLog(), chead->getFractionLost(), delay, period_packets_sent_);
sender_bwe_->UpdateReceiverBlock(chead->getFractionLost(),
delay, period_packets_sent_, now_ms);
period_packets_sent_ = 0;
updateEstimate();
rr_delay_data_.push_back(
std::make_shared<RrDelayData>(chead->getSourceSSRC(), delay, chead->getFractionLost()));
updateReceiverBlockFromList();
}
}
break;
Expand Down Expand Up @@ -128,15 +143,58 @@ void SenderBandwidthEstimationHandler::read(Context *ctx, std::shared_ptr<DataPa
ctx->fireRead(std::move(packet));
}

void SenderBandwidthEstimationHandler::updateReceiverBlockFromList() {
if (rr_delay_data_.size() < max_rr_delay_data_size_) {
return;
}
// TODO(pedro) Implement alternative when there are no REMBs
if (received_remb_) {
uint32_t total_packets_lost = 0;
uint32_t total_packets_sent = 0;
uint64_t avg_delay = 0;
uint32_t rr_delay_data_size = rr_delay_data_.size();
int64_t now_ms = ClockUtils::timePointToMs(clock_->now());
std::for_each(rr_delay_data_.begin(), rr_delay_data_.end(),
[&avg_delay, &total_packets_lost, rr_delay_data_size, &total_packets_sent, this]
(const std::shared_ptr<RrDelayData> &rr_info) {
auto packets_sent_ssrc = period_packets_sent_.find(rr_info->ssrc);
if (packets_sent_ssrc != period_packets_sent_.end()) {
total_packets_lost += rr_info->fraction_lost * packets_sent_ssrc->second / 255;
total_packets_sent += packets_sent_ssrc->second;
avg_delay += rr_info->delay / rr_delay_data_size;
}
});
if (total_packets_sent > 0) {
uint32_t fraction_lost = total_packets_lost * 255 / total_packets_sent;
ELOG_DEBUG("%s message: Updating Estimate with RR, fraction_lost: %u, "
"delay: %u, period_packets_sent_: %u",
connection_->toLog(), fraction_lost, avg_delay, total_packets_sent);
sender_bwe_->UpdateReceiverBlock(fraction_lost, avg_delay, total_packets_sent, now_ms);
updateEstimate();
}
}
period_packets_sent_.clear();
rr_delay_data_.clear();
}

void SenderBandwidthEstimationHandler::write(Context *ctx, std::shared_ptr<DataPacket> packet) {
RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(packet->data);
if (!chead->isRtcp() && packet->type == VIDEO_PACKET) {
period_packets_sent_++;
time_point now = clock_->now();
if (received_remb_ && now - last_estimate_update_ > kMinUpdateEstimateInterval) {
sender_bwe_->UpdateEstimate(ClockUtils::timePointToMs(now));
updateEstimate();
last_estimate_update_ = now;
if (!chead->isRtcp()) {
RtpHeader *rtp_header = reinterpret_cast<RtpHeader*>(packet->data);
uint32_t ssrc = rtp_header->getSSRC();
auto packets_sent_for_ssrc = period_packets_sent_.find(ssrc);
if (packets_sent_for_ssrc != period_packets_sent_.end()) {
packets_sent_for_ssrc->second++;
} else {
period_packets_sent_.emplace(ssrc, 1);
}
if (packet->type == VIDEO_PACKET) {
time_point now = clock_->now();
if (received_remb_ && now - last_estimate_update_ > kMinUpdateEstimateInterval) {
sender_bwe_->UpdateEstimate(ClockUtils::timePointToMs(now));
updateEstimate();
last_estimate_update_ = now;
}
}
} else if (chead->getPacketType() == RTCP_Sender_PT) {
analyzeSr(chead);
Expand All @@ -147,10 +205,11 @@ void SenderBandwidthEstimationHandler::write(Context *ctx, std::shared_ptr<DataP
void SenderBandwidthEstimationHandler::analyzeSr(RtcpHeader* chead) {
uint64_t now = ClockUtils::timePointToMs(clock_->now());
uint32_t ntp;
uint32_t ssrc = chead->getSSRC();
ntp = chead->get32MiddleNtp();
ELOG_DEBUG("%s message: adding incoming SR to list, ntp: %u", connection_->toLog(), ntp);
sr_delay_data_.push_back(std::shared_ptr<SrDelayData>( new SrDelayData(ntp, now)));
if (sr_delay_data_.size() >= kMaxSrListSize) {
sr_delay_data_.push_back(std::make_shared<SrDelayData>(ssrc, ntp, now));
if (sr_delay_data_.size() >= max_sr_delay_data_size_) {
sr_delay_data_.pop_front();
}
}
Expand Down
10 changes: 9 additions & 1 deletion erizo/src/erizo/rtp/SenderBandwidthEstimationHandler.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#ifndef ERIZO_SRC_ERIZO_RTP_SENDERBANDWIDTHESTIMATIONHANDLER_H_
#define ERIZO_SRC_ERIZO_RTP_SENDERBANDWIDTHESTIMATIONHANDLER_H_
#include <map>

#include "pipeline/Handler.h"
#include "./logger.h"
#include "./WebRtcConnection.h"
Expand Down Expand Up @@ -50,6 +52,9 @@ class SenderBandwidthEstimationHandler : public Handler,
void setListener(SenderBandwidthEstimationListener* listener) {
bwe_listener_ = listener;
}
private:
void updateMaxListSizes();
void updateReceiverBlockFromList();

private:
WebRtcConnection* connection_;
Expand All @@ -58,14 +63,17 @@ class SenderBandwidthEstimationHandler : public Handler,
bool initialized_;
bool enabled_;
bool received_remb_;
uint32_t period_packets_sent_;
std::map<uint32_t, uint32_t> period_packets_sent_;
int estimated_bitrate_;
uint8_t estimated_loss_;
int64_t estimated_rtt_;
time_point last_estimate_update_;
std::shared_ptr<SendSideBandwidthEstimation> sender_bwe_;
std::list<std::shared_ptr<SrDelayData>> sr_delay_data_;
std::list<std::shared_ptr<RrDelayData>> rr_delay_data_;
std::shared_ptr<Stats> stats_;
uint32_t max_rr_delay_data_size_;
uint32_t max_sr_delay_data_size_;

void updateEstimate();
};
Expand Down