Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved nack generator #1096

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions erizo/src/erizo/rtp/RtcpFeedbackGenerationHandler.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#include "rtp/RtcpFeedbackGenerationHandler.h"
#include "./MediaStream.h"
#include "rtp/RtpUtils.h"

namespace erizo {

DEFINE_LOGGER(RtcpFeedbackGenerationHandler, "rtp.RtcpFeedbackGenerationHandler");

RtcpFeedbackGenerationHandler::RtcpFeedbackGenerationHandler(bool nacks_enabled,
RtcpFeedbackGenerationHandler::RtcpFeedbackGenerationHandler(bool nacks_enabled, bool pli_enabled,
std::shared_ptr<Clock> the_clock)
: stream_{nullptr}, enabled_{true}, initialized_{false}, nacks_enabled_{nacks_enabled}, clock_{the_clock} {}
: stream_{nullptr}, enabled_{true}, initialized_{false}, nacks_enabled_{nacks_enabled},
pli_enabled_{pli_enabled}, clock_{the_clock} {}

void RtcpFeedbackGenerationHandler::enable() {
enabled_ = true;
Expand Down Expand Up @@ -39,6 +41,7 @@ void RtcpFeedbackGenerationHandler::read(Context *ctx, std::shared_ptr<DataPacke
}
bool should_send_rr = false;
bool should_send_nack = false;
bool should_send_pli = false;

if (!chead->isRtcp()) {
RtpHeader *head = reinterpret_cast<RtpHeader*>(packet->data);
Expand All @@ -47,7 +50,14 @@ void RtcpFeedbackGenerationHandler::read(Context *ctx, std::shared_ptr<DataPacke
if (generator_it != generators_map_.end()) {
should_send_rr = generator_it->second->rr_generator->handleRtpPacket(packet);
if (nacks_enabled_) {
should_send_nack = generator_it->second->nack_generator->handleRtpPacket(packet);
const auto ret = generator_it->second->nack_generator->handleRtpPacket(packet);
should_send_nack = ret.first;
should_send_pli = ret.second;
if (pli_enabled_ && should_send_pli) {
ELOG_DEBUG("message: should send PLI, ssrc: %u", ssrc);
auto pli = RtpUtils::createPLI(ssrc, packet->type == VIDEO_PACKET ? video_sink_ssrc_ : audio_sink_ssrc_);
ctx->fireWrite(std::move(pli));
}
}
} else {
ELOG_DEBUG("message: no Generator found, ssrc: %u", ssrc);
Expand Down Expand Up @@ -83,6 +93,10 @@ void RtcpFeedbackGenerationHandler::notifyUpdate() {
if (!stream_) {
return;
}

video_sink_ssrc_ = stream_->getVideoSinkSSRC();
audio_sink_ssrc_ = stream_->getAudioSinkSSRC();

// TODO(pedro) detect if nacks are enabled here with the negotiated SDP scanning the rtp_mappings
std::vector<uint32_t> video_ssrc_list = stream_->getVideoSourceSSRCList();
std::for_each(video_ssrc_list.begin(), video_ssrc_list.end(), [this] (uint32_t video_ssrc) {
Expand All @@ -94,7 +108,7 @@ void RtcpFeedbackGenerationHandler::notifyUpdate() {
ELOG_DEBUG("%s, message: Initialized video rrGenerator, ssrc: %u", stream_->toLog(), video_ssrc);
if (nacks_enabled_) {
ELOG_DEBUG("%s, message: Initialized video nack generator, ssrc %u", stream_->toLog(), video_ssrc);
auto video_nack = std::make_shared<RtcpNackGenerator>(video_ssrc, clock_);
auto video_nack = std::make_shared<RtcpNewNackGenerator>(video_ssrc, clock_);
video_generator->nack_generator = video_nack;
}
}
Expand Down
10 changes: 7 additions & 3 deletions erizo/src/erizo/rtp/RtcpFeedbackGenerationHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "./logger.h"
#include "pipeline/Handler.h"
#include "rtp/RtcpRrGenerator.h"
#include "rtp/RtcpNackGenerator.h"
#include "rtp/RtcpNewNackGenerator.h"
#include "lib/ClockUtils.h"

#define MAX_DELAY 450000
Expand All @@ -20,7 +20,7 @@ class MediaStream;
class RtcpGeneratorPair {
public:
std::shared_ptr<RtcpRrGenerator> rr_generator;
std::shared_ptr<RtcpNackGenerator> nack_generator;
std::shared_ptr<RtcpNewNackGenerator> nack_generator;
};


Expand All @@ -29,7 +29,7 @@ class RtcpFeedbackGenerationHandler: public Handler {


public:
explicit RtcpFeedbackGenerationHandler(bool nacks_enabled = true,
explicit RtcpFeedbackGenerationHandler(bool nacks_enabled = true, bool pli_enabled = true,
std::shared_ptr<Clock> the_clock = std::make_shared<SteadyClock>());


Expand All @@ -50,7 +50,11 @@ class RtcpFeedbackGenerationHandler: public Handler {

bool enabled_, initialized_;
bool nacks_enabled_;
bool pli_enabled_;
std::shared_ptr<Clock> clock_;

uint32_t video_sink_ssrc_ = 0;
uint32_t audio_sink_ssrc_ = 0;
};
} // namespace erizo

Expand Down
14 changes: 7 additions & 7 deletions erizo/src/erizo/rtp/RtcpNackGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@ static const int kNackCommonHeaderLengthRtcp = kNackCommonHeaderLengthBytes/4 -
RtcpNackGenerator::RtcpNackGenerator(uint32_t ssrc, std::shared_ptr<Clock> the_clock) :
initialized_{false}, highest_seq_num_{0}, ssrc_{ssrc}, clock_{the_clock} {}

bool RtcpNackGenerator::handleRtpPacket(std::shared_ptr<DataPacket> packet) {
std::pair<bool, bool> RtcpNackGenerator::handleRtpPacket(std::shared_ptr<DataPacket> packet) {
if (packet->type != VIDEO_PACKET) {
return false;
return {false, false};
}
RtpHeader *head = reinterpret_cast<RtpHeader*>(packet->data);
uint16_t seq_num = head->getSeqNumber();
if (head->getSSRC() != ssrc_) {
ELOG_DEBUG("message: handleRtpPacket Unknown SSRC, ssrc: %u", head->getSSRC());
return false;
return {false, false};
}
if (!initialized_) {
highest_seq_num_ = seq_num;
initialized_ = true;
return 0;
return {false, false};
}
if (seq_num == highest_seq_num_) {
return false;
return {false, false};
}
// TODO(pedro) Consider clearing the nack list if this is a keyframe
if (RtpUtils::sequenceNumberLessThan(seq_num, highest_seq_num_)) {
Expand All @@ -45,11 +45,11 @@ bool RtcpNackGenerator::handleRtpPacket(std::shared_ptr<DataPacket> packet) {
ELOG_DEBUG("message: Recovered Packet %u", seq_num);
nack_info_list_.erase(nack_info);
}
return false;
return {false, false};
}
bool available_nacks = addNacks(seq_num);
highest_seq_num_ = seq_num;
return available_nacks;
return {available_nacks, false};
}

bool RtcpNackGenerator::addNacks(uint16_t seq_num) {
Expand Down
5 changes: 4 additions & 1 deletion erizo/src/erizo/rtp/RtcpNackGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ class RtcpNackGenerator{
public:
explicit RtcpNackGenerator(uint32_t ssrc_,
std::shared_ptr<Clock> the_clock = std::make_shared<SteadyClock>());
bool handleRtpPacket(std::shared_ptr<DataPacket> packet);
/**
* @return A pair where first indicates if there's a NACK request and second if there's a PLI request
*/
std::pair<bool, bool> handleRtpPacket(std::shared_ptr<DataPacket> packet);
bool addNackPacketToRr(std::shared_ptr<DataPacket> rr_packet);

private:
Expand Down
189 changes: 189 additions & 0 deletions erizo/src/erizo/rtp/RtcpNewNackGenerator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
#include "rtp/RtcpNewNackGenerator.h"

#include <algorithm>
#include "rtp/RtpUtils.h"

namespace erizo {
namespace {
constexpr int max_nack_blocks = 10;
constexpr int kNackCommonHeaderLengthRtcp = kNackCommonHeaderLengthBytes / 4 - 1;
constexpr int nack_interval = 50;

/**
* Drops from the set all packets older than the given sequence_number
*/
void drop_packets_from_set(Seq_number_set& set, uint16_t sequence_number) { // NOLINT
set.erase(set.begin(), set.upper_bound(sequence_number));
}

void attach_nack_to_rr(uint32_t ssrc,
std::shared_ptr<DataPacket>& rr_packet, // NOLINT
const std::vector<NackBlock>& nack_blocks);

std::vector<NackBlock> build_nack_blocks(const Seq_number_set& set);
} // namespace

DEFINE_LOGGER(RtcpNewNackGenerator, "rtp.RtcpNewNackGenerator");

std::pair<bool, bool> RtcpNewNackGenerator::handleRtpPacket(const std::shared_ptr<DataPacket>& packet) {
if (packet->type != VIDEO_PACKET) {
return {false, false};
}
RtpHeader *head = reinterpret_cast<RtpHeader*>(packet->data);
if (head->getSSRC() != ssrc_) {
ELOG_DEBUG("message: handleRtpPacket Unknown SSRC, ssrc: %u", head->getSSRC());
return {false, false};
}
const uint16_t seq_num = head->getSeqNumber();
// Save when we got a key frame
if (packet->is_keyframe) {
keyframe_sequence_numbers_.insert(keyframe_sequence_numbers_.cend(), seq_num);
}
if (!latest_received_sequence_number_) {
latest_received_sequence_number_ = seq_num;
return {false, false};
}
// Purge old keyframes
const auto newest_received_sequence_number = latest_sequence_number(*latest_received_sequence_number_, seq_num);
drop_packets_from_set(keyframe_sequence_numbers_,
newest_received_sequence_number - constants::max_packet_age_to_nack);
const auto should_ask_pli = !update_nack_list(seq_num);
latest_received_sequence_number_ = newest_received_sequence_number;
return {!missing_sequence_numbers_.empty() && is_time_to_send(clock_->now()), should_ask_pli};
}

bool RtcpNewNackGenerator::missing_too_old_packet(const uint16_t latest_sequence_number) {
if (missing_sequence_numbers_.empty()) {
return false;
}
const uint16_t age_of_oldest_missing_packet = latest_sequence_number - *missing_sequence_numbers_.cbegin();
return age_of_oldest_missing_packet > constants::max_packet_age_to_nack;
}

bool RtcpNewNackGenerator::handle_too_large_nack_list() {
ELOG_DEBUG("NACK list has grown too large");
bool key_frame_found = false;
while (too_large_nack_list()) {
key_frame_found = recycle_frames_until_key_frame();
}
return key_frame_found;
}

bool RtcpNewNackGenerator::handle_too_old_packets(const uint16_t latest_sequence_number) {
ELOG_DEBUG("NACK list contains too old sequence numbers");
bool key_frame_found = false;
while (missing_too_old_packet(latest_sequence_number)) {
key_frame_found = recycle_frames_until_key_frame();
}
return key_frame_found;
}

bool RtcpNewNackGenerator::recycle_frames_until_key_frame() {
// should use a frame buffer strategy, for now approximate by seeking the starting packet of the next key frame
std::pair<bool, int16_t> key_frame = extract_oldest_keyframe();
if (key_frame.first) {
drop_packets_from_set(missing_sequence_numbers_, key_frame.second);
ELOG_DEBUG("recycling frames... found keyframe at seq: %u", key_frame.second);
} else {
ELOG_DEBUG("recycling frames... dropping all");
missing_sequence_numbers_.clear();
}
return key_frame.first;
}

bool RtcpNewNackGenerator::update_nack_list(const uint16_t seq) {
if (is_newer_sequence_number(seq, *latest_received_sequence_number_)) {
for (uint16_t i = *latest_received_sequence_number_ + 1; is_newer_sequence_number(seq, i); ++i) {
missing_sequence_numbers_.insert(missing_sequence_numbers_.cend(), i);
ELOG_DEBUG("added sequence number to NACK list: %u", i);
}
if (too_large_nack_list() && !handle_too_large_nack_list()) {
return false;
}
if (missing_too_old_packet(seq) && !handle_too_old_packets(seq)) {
return false;
}
} else {
missing_sequence_numbers_.erase(seq);
ELOG_DEBUG("recovered packet: %u", seq);
}
return true;
}

std::pair<bool, uint16_t> RtcpNewNackGenerator::extract_oldest_keyframe() {
if (keyframe_sequence_numbers_.empty()) {
return {false, 0};
}
auto seq = *keyframe_sequence_numbers_.cbegin();
keyframe_sequence_numbers_.erase(keyframe_sequence_numbers_.cbegin());
return {true, seq};
}

bool RtcpNewNackGenerator::addNackPacketToRr(std::shared_ptr<DataPacket>& rr_packet) {
const auto now = clock_->now();
if (!is_time_to_send(now)) {
return false;
}
rtcp_send_time_ = now;
const auto nack_blocks = build_nack_blocks(missing_sequence_numbers_);
attach_nack_to_rr(ssrc_, rr_packet, nack_blocks);
return true;
}

std::chrono::milliseconds RtcpNewNackGenerator::rtcp_min_interval() const {
return std::chrono::milliseconds(nack_interval);
}

bool RtcpNewNackGenerator::is_time_to_send(clock::time_point tp) const {
return (rtcp_send_time_ + rtcp_min_interval() <= tp);
}

namespace {
std::vector<NackBlock> build_nack_blocks(const Seq_number_set& set) {
std::vector<NackBlock> nack_blocks;
for (auto it = set.cbegin(); it != set.cend(); ++it) {
if (nack_blocks.size() >= max_nack_blocks) {
break;
}
const uint16_t pid = *it;
ELOG_TRACE2(RtcpNewNackGenerator::logger, "added NACK PID, seq_num %u", pid);
uint16_t blp = 0;
for (++it; it != set.cend(); ++it) {
uint16_t distance = *it - pid;
if (distance > 16) {
break;
}
ELOG_TRACE2(RtcpNewNackGenerator::logger, "added Nack to BLP, seq_num: %u", *it);
blp |= (1 << (distance - 1));
}
--it;
NackBlock block;
block.setNackPid(pid);
block.setNackBlp(blp);
nack_blocks.push_back(block);
}
return nack_blocks;
}

void attach_nack_to_rr(const uint32_t ssrc,
std::shared_ptr<DataPacket>& rr_packet, // NOLINT
const std::vector<NackBlock>& nack_blocks) {
char* buffer = rr_packet->data;
buffer += rr_packet->length;

RtcpHeader nack_packet;
nack_packet.setPacketType(RTCP_RTP_Feedback_PT);
nack_packet.setBlockCount(1);
nack_packet.setSSRC(ssrc);
nack_packet.setSourceSSRC(ssrc);
nack_packet.setLength(kNackCommonHeaderLengthRtcp + nack_blocks.size());
memcpy(buffer, reinterpret_cast<char *>(&nack_packet), kNackCommonHeaderLengthBytes);
buffer += kNackCommonHeaderLengthBytes;

memcpy(buffer, nack_blocks.data(), nack_blocks.size()*4);
int nack_length = (nack_packet.getLength()+1)*4;

rr_packet->length += nack_length;
}
} // namespace
} // namespace erizo
Loading