Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[19576] Updatable disable_positive_acks period (backport #3879) #3897

Merged
merged 6 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fastdds/dds/core/policy/QosPolicies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2258,6 +2258,7 @@ class DisablePositiveACKsQosPolicy : public Parameter_t, public QosPolicy
const DisablePositiveACKsQosPolicy& b) const
{
return enabled == b.enabled &&
duration == b.duration &&
Parameter_t::operator ==(b) &&
QosPolicy::operator ==(b);
}
Expand Down
3 changes: 2 additions & 1 deletion include/fastdds/dds/publisher/qos/DataWriterQos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class RTPSReliableWriterQos
const RTPSReliableWriterQos& b) const
{
return (this->times == b.times) &&
(this->disable_positive_acks == b.disable_positive_acks);
(this->disable_positive_acks == b.disable_positive_acks) &&
(this->disable_heartbeat_piggyback == b.disable_heartbeat_piggyback);
}

//!Writer Timing Attributes
Expand Down
7 changes: 7 additions & 0 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,13 @@ class StatefulWriter : public RTPSWriter
void updateTimes(
const WriterTimes& times);

/**
* Update the period of the disable positive ACKs policy.
* @param att WriterAttributes parameter.
*/
void updatePositiveAcks(
const WriterAttributes& att);

SequenceNumber_t next_sequence_number() const;

/**
Expand Down
176 changes: 109 additions & 67 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -998,10 +998,22 @@ ReturnCode_t DataWriterImpl::set_qos(
{
return ReturnCode_t::RETCODE_IMMUTABLE_POLICY;
}

set_qos(qos_, qos_to_set, !enabled);

if (enabled)
{
if (qos_.reliability().kind == eprosima::fastrtps::RELIABLE_RELIABILITY_QOS &&
qos_.reliable_writer_qos() == qos_to_set.reliable_writer_qos())
{
// Update times and positive_acks attributes on RTPS Layer
WriterAttributes w_att;
w_att.times = qos_.reliable_writer_qos().times;
w_att.disable_positive_acks = qos_.reliable_writer_qos().disable_positive_acks.enabled;
w_att.keep_duration = qos_.reliable_writer_qos().disable_positive_acks.duration;
writer_->updateAttributes(w_att);
}

//Notify the participant that a Writer has changed its QOS
fastrtps::TopicAttributes topic_att = get_topic_attributes(qos_, *topic_, type_);
WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
Expand Down Expand Up @@ -1504,114 +1516,137 @@ LivelinessLostStatus& DataWriterImpl::update_liveliness_lost_status(
void DataWriterImpl::set_qos(
DataWriterQos& to,
const DataWriterQos& from,
bool is_default)
bool update_immutable)
{
if (is_default && !(to.durability() == from.durability()))
{
to.durability() = from.durability();
to.durability().hasChanged = true;
}
if (is_default && !(to.durability_service() == from.durability_service()))
// Check immutable policies
if (update_immutable)
{
to.durability_service() = from.durability_service();
to.durability_service().hasChanged = true;
if (!(to.durability() == from.durability()))
{
to.durability() = from.durability();
to.durability().hasChanged = true;
}

if (!(to.durability_service() == from.durability_service()))
{
to.durability_service() = from.durability_service();
to.durability_service().hasChanged = true;
}

if (!(to.liveliness() == from.liveliness()))
{
to.liveliness() = from.liveliness();
to.liveliness().hasChanged = true;
}

if (!(to.reliability().kind == from.reliability().kind))
{
to.reliability().kind = from.reliability().kind;
to.reliability().hasChanged = true;
}

if (!(to.destination_order() == from.destination_order()))
{
to.destination_order() = from.destination_order();
to.destination_order().hasChanged = true;
}

if (!(to.history() == from.history()))
{
to.history() = from.history();
to.history().hasChanged = true;
}

if (!(to.resource_limits() == from.resource_limits()))
{
to.resource_limits() = from.resource_limits();
to.resource_limits().hasChanged = true;
}

if (!(to.ownership() == from.ownership()))
{
to.ownership() = from.ownership();
to.ownership().hasChanged = true;
}

to.publish_mode() = from.publish_mode();

if (!(to.representation() == from.representation()))
{
to.representation() = from.representation();
to.representation().hasChanged = true;
}

to.properties() = from.properties();

if (!(to.reliable_writer_qos() == from.reliable_writer_qos()))
{
RTPSReliableWriterQos& rel_to = to.reliable_writer_qos();
rel_to.disable_heartbeat_piggyback = from.reliable_writer_qos().disable_heartbeat_piggyback;
rel_to.disable_positive_acks.enabled = from.reliable_writer_qos().disable_positive_acks.enabled;
}

to.endpoint() = from.endpoint();

to.writer_resource_limits() = from.writer_resource_limits();

to.data_sharing() = from.data_sharing();

to.throughput_controller() = from.throughput_controller();
}

if (!(to.deadline() == from.deadline()))
{
to.deadline() = from.deadline();
to.deadline().hasChanged = true;
}

if (!(to.latency_budget() == from.latency_budget()))
{
to.latency_budget() = from.latency_budget();
to.latency_budget().hasChanged = true;
}
if (is_default && !(to.liveliness() == from.liveliness()))
{
to.liveliness() = from.liveliness();
to.liveliness().hasChanged = true;
}
if (is_default && !(to.reliability() == from.reliability()))

if (!(to.reliability().max_blocking_time == from.reliability().max_blocking_time))
{
to.reliability() = from.reliability();
to.reliability().max_blocking_time = from.reliability().max_blocking_time;
to.reliability().hasChanged = true;
}
if (is_default && !(to.destination_order() == from.destination_order()))
{
to.destination_order() = from.destination_order();
to.destination_order().hasChanged = true;
}
if (is_default && !(to.history() == from.history()))
{
to.history() = from.history();
to.history().hasChanged = true;
}
if (is_default && !(to.resource_limits() == from.resource_limits()))
{
to.resource_limits() = from.resource_limits();
to.resource_limits().hasChanged = true;
}

if (!(to.transport_priority() == from.transport_priority()))
{
to.transport_priority() = from.transport_priority();
to.transport_priority().hasChanged = true;
}

if (!(to.lifespan() == from.lifespan()))
{
to.lifespan() = from.lifespan();
to.lifespan().hasChanged = true;
}

if (!(to.user_data() == from.user_data()))
{
to.user_data() = from.user_data();
to.user_data().hasChanged = true;
}
if (is_default && !(to.ownership() == from.ownership()))
{
to.ownership() = from.ownership();
to.ownership().hasChanged = true;
}

if (!(to.ownership_strength() == from.ownership_strength()))
{
to.ownership_strength() = from.ownership_strength();
to.ownership_strength().hasChanged = true;
}

if (!(to.writer_data_lifecycle() == from.writer_data_lifecycle()))
{
to.writer_data_lifecycle() = from.writer_data_lifecycle();
}
if (is_default && !(to.publish_mode() == from.publish_mode()))
{
to.publish_mode() = from.publish_mode();
}
if (!(to.representation() == from.representation()))
{
to.representation() = from.representation();
to.representation().hasChanged = true;
}
if (is_default && !(to.properties() == from.properties()))
{
to.properties() = from.properties();
}
if (is_default && !(to.reliable_writer_qos() == from.reliable_writer_qos()))
{
to.reliable_writer_qos() = from.reliable_writer_qos();
}
if (is_default && !(to.endpoint() == from.endpoint()))
{
to.endpoint() = from.endpoint();
}
if (is_default && !(to.writer_resource_limits() == from.writer_resource_limits()))
{
to.writer_resource_limits() = from.writer_resource_limits();
}
if (is_default && !(to.throughput_controller() == from.throughput_controller()))
{
to.throughput_controller() = from.throughput_controller();
}
if (is_default && !(to.data_sharing() == from.data_sharing()))

if (!(to.reliable_writer_qos() == from.reliable_writer_qos()))
{
to.data_sharing() = from.data_sharing();
RTPSReliableWriterQos& rel_to = to.reliable_writer_qos();
rel_to.times = from.reliable_writer_qos().times;
rel_to.disable_positive_acks.duration = from.reliable_writer_qos().disable_positive_acks.duration;
}
}

Expand Down Expand Up @@ -1731,6 +1766,13 @@ bool DataWriterImpl::can_qos_be_updated(
updatable = false;
logWarning(RTPS_QOS_CHECK, "Data sharing configuration cannot be changed after the creation of a DataWriter.");
}
if (to.reliable_writer_qos().disable_positive_acks.enabled !=
from.reliable_writer_qos().disable_positive_acks.enabled)
{
updatable = false;
logWarning(RTPS_QOS_CHECK,
"Only the period of Positive ACKs can be changed after the creation of a DataWriter.");
}
return updatable;
}

Expand Down
2 changes: 1 addition & 1 deletion src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
static void set_qos(
DataWriterQos& to,
const DataWriterQos& from,
bool is_default);
bool update_immutable);

static ReturnCode_t check_qos(
const DataWriterQos& qos);
Expand Down
7 changes: 7 additions & 0 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1394,6 +1394,13 @@ bool DataReaderImpl::can_qos_be_updated(
logWarning(RTPS_QOS_CHECK,
"Unique network flows request cannot be changed after the creation of a DataReader.");
}
if (to.reliable_reader_qos().disable_positive_ACKs.enabled !=
from.reliable_reader_qos().disable_positive_ACKs.enabled)
{
updatable = false;
logWarning(RTPS_QOS_CHECK,
"Positive ACKs QoS cannot be changed after the creation of a DataReader.");
}
return updatable;
}

Expand Down
51 changes: 47 additions & 4 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ namespace rtps {
/**
* Loops over all the readers in the vector, applying the given routine.
* The loop continues until the result of the routine is true for any reader
* or all readers have been processes.
* or all readers have been processed.
* The returned value is true if the routine returned true at any point,
* or false otherwise.
*/
Expand Down Expand Up @@ -944,6 +944,17 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
if (disable_positive_acks_ && last_sequence_number_ == SequenceNumber_t())
{
last_sequence_number_ = change->sequenceNumber;
if ( !(ack_event_->getRemainingTimeMilliSec() > 0))
{
// Restart ack_timer
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_event_->restart_timer(max_blocking_time);
}
}

// Restore in case a exception was launched by RTPSMessageGroup.
Expand Down Expand Up @@ -1571,6 +1582,24 @@ void StatefulWriter::updateAttributes(
const WriterAttributes& att)
{
this->updateTimes(att.times);
if (this->get_disable_positive_acks())
{
this->updatePositiveAcks(att);
}
}

void StatefulWriter::updatePositiveAcks(
const WriterAttributes& att)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
if (keep_duration_us_.count() != (att.keep_duration.to_ns() * 1e-3))
{
// Implicit conversion to microseconds
keep_duration_us_ = std::chrono::nanoseconds {att.keep_duration.to_ns()};
}
// Restart ack timer with new duration
ack_event_->update_interval_millisec(keep_duration_us_.count() * 1e-3);
ack_event_->restart_timer();
}

void StatefulWriter::updateTimes(
Expand Down Expand Up @@ -1988,26 +2017,40 @@ bool StatefulWriter::ack_timer_expired()

while (interval.count() < 0)
{
bool acks_flag = false;
for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
[this](ReaderProxy* reader)
[this, &acks_flag](ReaderProxy* reader)
{
if (reader->disable_positive_acks())
{
reader->acked_changes_set(last_sequence_number_ + 1);
acks_flag = true;
}
return false;
}
);
last_sequence_number_++;
if (acks_flag)
{
check_acked_status();
}

// Get the next cache change from the history
CacheChange_t* change;

// Skip removed changes until reaching the last change
do
{
last_sequence_number_++;
} while (!mp_history->get_change(
last_sequence_number_,
getGuid(),
&change) && last_sequence_number_ < next_sequence_number());

if (!mp_history->get_change(
last_sequence_number_,
getGuid(),
&change))
{
// Stop ack_timer
return false;
}

Expand Down
Loading