From d9f68738376441a72b1372ca9058330c620c30b7 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 29 Jun 2021 16:05:18 +0200 Subject: [PATCH 1/9] Refs 11835. Fixed listener calls for deadline statuses. Signed-off-by: Miguel Company --- src/cpp/fastdds/publisher/DataWriterImpl.cpp | 6 +++--- src/cpp/fastdds/subscriber/DataReaderImpl.cpp | 9 ++++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index 52c55cbda04..c2b90f720bd 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -1042,12 +1042,12 @@ bool DataWriterImpl::deadline_missed() deadline_missed_status_.total_count++; deadline_missed_status_.total_count_change++; deadline_missed_status_.last_instance_handle = timer_owner_; - if (listener_ != nullptr) + auto listener = get_listener_for(StatusMask::offered_deadline_missed()); + if (nullptr != listener) { listener_->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_); + deadline_missed_status_.total_count_change = 0; } - publisher_->publisher_listener_.on_offered_deadline_missed(user_datawriter_, deadline_missed_status_); - deadline_missed_status_.total_count_change = 0; if (!history_.set_next_deadline( timer_owner_, diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index 8f829673a00..9a750a98e6e 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -929,9 +929,12 @@ bool DataReaderImpl::deadline_missed() deadline_missed_status_.total_count++; deadline_missed_status_.total_count_change++; deadline_missed_status_.last_instance_handle = timer_owner_; - listener_->on_requested_deadline_missed(user_datareader_, deadline_missed_status_); - subscriber_->subscriber_listener_.on_requested_deadline_missed(user_datareader_, deadline_missed_status_); - deadline_missed_status_.total_count_change = 0; + auto listener = get_listener_for(StatusMask::requested_deadline_missed()); + if (nullptr != listener) + { + listener->on_requested_deadline_missed(user_datareader_, deadline_missed_status_); + deadline_missed_status_.total_count_change = 0; + } if (!history_.set_next_deadline( timer_owner_, From 05d7aaf28893205abb8b4dd95555c09ca8e2cef1 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 29 Jun 2021 16:51:15 +0200 Subject: [PATCH 2/9] Refs 11835. Basic notifications on DataWriterImpl. Signed-off-by: Miguel Company --- src/cpp/fastdds/publisher/DataWriterImpl.cpp | 71 +++++++++++++------- 1 file changed, 45 insertions(+), 26 deletions(-) diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index c2b90f720bd..13a4f3b3394 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -19,25 +19,30 @@ #include #include + +#include +#include + +#include +#include #include #include -#include -#include #include #include +#include +#include +#include +#include +#include #include #include -#include -#include -#include - -#include +#include +#include #include -#include -#include -#include + +#include #include #include @@ -45,9 +50,6 @@ #include #include -#include -#include - using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; using namespace std::chrono; @@ -961,7 +963,8 @@ void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos( fastdds::dds::PolicyMask qos) { data_writer_->update_offered_incompatible_qos(qos); - DataWriterListener* listener = data_writer_->get_listener_for(StatusMask::offered_incompatible_qos()); + StatusMask notify_status = StatusMask::offered_incompatible_qos(); + DataWriterListener* listener = data_writer_->get_listener_for(notify_status); if (listener != nullptr) { OfferedIncompatibleQosStatus callback_status; @@ -970,6 +973,7 @@ void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos( listener->on_offered_incompatible_qos(data_writer_->user_datawriter_, callback_status); } } + data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); } void DataWriterImpl::InnerDataWriterListener::onWriterChangeReceivedByAll( @@ -992,12 +996,14 @@ void DataWriterImpl::InnerDataWriterListener::on_liveliness_lost( fastrtps::rtps::RTPSWriter* /*writer*/, const fastrtps::LivelinessLostStatus& status) { - DataWriterListener* listener = data_writer_->get_listener_for(StatusMask::liveliness_lost()); + StatusMask notify_status = StatusMask::liveliness_lost(); + DataWriterListener* listener = data_writer_->get_listener_for(notify_status); if (listener != nullptr) { listener->on_liveliness_lost( data_writer_->user_datawriter_, status); } + data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); } ReturnCode_t DataWriterImpl::wait_for_acknowledgments( @@ -1042,12 +1048,14 @@ bool DataWriterImpl::deadline_missed() deadline_missed_status_.total_count++; deadline_missed_status_.total_count_change++; deadline_missed_status_.last_instance_handle = timer_owner_; - auto listener = get_listener_for(StatusMask::offered_deadline_missed()); + StatusMask notify_status = StatusMask::offered_deadline_missed(); + auto listener = get_listener_for(notify_status); if (nullptr != listener) { listener_->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_); deadline_missed_status_.total_count_change = 0; } + user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); if (!history_.set_next_deadline( timer_owner_, @@ -1067,10 +1075,14 @@ ReturnCode_t DataWriterImpl::get_offered_deadline_missed_status( return ReturnCode_t::RETCODE_NOT_ENABLED; } - std::unique_lock lock(writer_->getMutex()); + { + std::unique_lock lock(writer_->getMutex()); - status = deadline_missed_status_; - deadline_missed_status_.total_count_change = 0; + status = deadline_missed_status_; + deadline_missed_status_.total_count_change = 0; + } + + user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::offered_deadline_missed(), false); return ReturnCode_t::RETCODE_OK; } @@ -1082,10 +1094,14 @@ ReturnCode_t DataWriterImpl::get_offered_incompatible_qos_status( return ReturnCode_t::RETCODE_NOT_ENABLED; } - std::unique_lock lock(writer_->getMutex()); + { + std::unique_lock lock(writer_->getMutex()); - status = offered_incompatible_qos_status_; - offered_incompatible_qos_status_.total_count_change = 0u; + status = offered_incompatible_qos_status_; + offered_incompatible_qos_status_.total_count_change = 0u; + } + + user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::offered_incompatible_qos(), false); return ReturnCode_t::RETCODE_OK; } @@ -1139,13 +1155,16 @@ ReturnCode_t DataWriterImpl::get_liveliness_lost_status( return ReturnCode_t::RETCODE_NOT_ENABLED; } - std::unique_lock lock(writer_->getMutex()); + { + std::unique_lock lock(writer_->getMutex()); - status.total_count = writer_->liveliness_lost_status_.total_count; - status.total_count_change = writer_->liveliness_lost_status_.total_count_change; + status.total_count = writer_->liveliness_lost_status_.total_count; + status.total_count_change = writer_->liveliness_lost_status_.total_count_change; - writer_->liveliness_lost_status_.total_count_change = 0u; + writer_->liveliness_lost_status_.total_count_change = 0u; + } + user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::liveliness_lost(), false); return ReturnCode_t::RETCODE_OK; } From 9c7cfd1050b2c14625bf69e79cf6116a8f8a16fa Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 29 Jun 2021 16:50:54 +0200 Subject: [PATCH 3/9] Refs 11835. Basic notifications on DataReaderImpl. Signed-off-by: Miguel Company --- src/cpp/fastdds/subscriber/DataReaderImpl.cpp | 65 +++++++++++++++---- src/cpp/fastdds/subscriber/DataReaderImpl.hpp | 3 + 2 files changed, 54 insertions(+), 14 deletions(-) diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index 9a750a98e6e..b5666a988c4 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -36,6 +36,8 @@ #include #include #include + +#include #include #include @@ -295,6 +297,16 @@ bool DataReaderImpl::wait_for_unread_message( return reader_ ? reader_->wait_for_unread_cache(timeout) : false; } +void DataReaderImpl::set_read_communication_status( + bool trigger_value) +{ + StatusMask notify_status = StatusMask::data_on_readers(); + subscriber_->user_subscriber_->get_statuscondition().get_impl()->set_status(notify_status, trigger_value); + + notify_status = StatusMask::data_available(); + user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, trigger_value); +} + ReturnCode_t DataReaderImpl::check_collection_preconditions_and_calc_max_samples( LoanableCollection& data_values, SampleInfoSeq& sample_infos, @@ -440,6 +452,8 @@ ReturnCode_t DataReaderImpl::read_or_take( return ReturnCode_t::RETCODE_TIMEOUT; } + set_read_communication_status(false); + auto it = history_.lookup_instance(handle, exact_instance); if (!it.first) { @@ -618,6 +632,8 @@ ReturnCode_t DataReaderImpl::read_or_take_next_sample( return ReturnCode_t::RETCODE_TIMEOUT; } + set_read_communication_status(false); + auto it = history_.lookup_instance(HANDLE_NIL, false); if (!it.first) { @@ -774,6 +790,8 @@ void DataReaderImpl::InnerDataReaderListener::onNewCacheChangeAdded( { if (data_reader_->on_new_cache_change_added(change_in)) { + auto user_reader = data_reader_->user_datareader_; + //First check if we can handle with on_data_on_readers SubscriberListener* subscriber_listener = data_reader_->subscriber_->get_listener_for(StatusMask::data_on_readers()); @@ -787,9 +805,11 @@ void DataReaderImpl::InnerDataReaderListener::onNewCacheChangeAdded( DataReaderListener* listener = data_reader_->get_listener_for(StatusMask::data_available()); if (listener != nullptr) { - listener->on_data_available(data_reader_->user_datareader_); + listener->on_data_available(user_reader); } } + + data_reader_->set_read_communication_status(true); } } @@ -809,7 +829,8 @@ void DataReaderImpl::InnerDataReaderListener::on_liveliness_changed( const fastrtps::LivelinessChangedStatus& status) { data_reader_->update_liveliness_status(status); - DataReaderListener* listener = data_reader_->get_listener_for(StatusMask::liveliness_changed()); + StatusMask notify_status = StatusMask::liveliness_changed(); + DataReaderListener* listener = data_reader_->get_listener_for(notify_status); if (listener != nullptr) { LivelinessChangedStatus callback_status; @@ -818,6 +839,7 @@ void DataReaderImpl::InnerDataReaderListener::on_liveliness_changed( listener->on_liveliness_changed(data_reader_->user_datareader_, callback_status); } } + data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); } void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos( @@ -825,7 +847,8 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos( fastdds::dds::PolicyMask qos) { data_reader_->update_requested_incompatible_qos(qos); - DataReaderListener* listener = data_reader_->get_listener_for(StatusMask::requested_incompatible_qos()); + StatusMask notify_status = StatusMask::requested_incompatible_qos(); + DataReaderListener* listener = data_reader_->get_listener_for(notify_status); if (listener != nullptr) { RequestedIncompatibleQosStatus callback_status; @@ -834,6 +857,7 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos( listener->on_requested_incompatible_qos(data_reader_->user_datareader_, callback_status); } } + data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); } bool DataReaderImpl::on_new_cache_change_added( @@ -929,12 +953,14 @@ bool DataReaderImpl::deadline_missed() deadline_missed_status_.total_count++; deadline_missed_status_.total_count_change++; deadline_missed_status_.last_instance_handle = timer_owner_; - auto listener = get_listener_for(StatusMask::requested_deadline_missed()); + StatusMask notify_status = StatusMask::requested_deadline_missed(); + auto listener = get_listener_for(notify_status); if (nullptr != listener) { listener->on_requested_deadline_missed(user_datareader_, deadline_missed_status_); deadline_missed_status_.total_count_change = 0; } + user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); if (!history_.set_next_deadline( timer_owner_, @@ -954,10 +980,14 @@ ReturnCode_t DataReaderImpl::get_requested_deadline_missed_status( return ReturnCode_t::RETCODE_NOT_ENABLED; } - std::unique_lock lock(reader_->getMutex()); + { + std::unique_lock lock(reader_->getMutex()); - status = deadline_missed_status_; - deadline_missed_status_.total_count_change = 0; + status = deadline_missed_status_; + deadline_missed_status_.total_count_change = 0; + } + + user_datareader_->get_statuscondition().get_impl()->set_status(StatusMask::requested_deadline_missed(), false); return ReturnCode_t::RETCODE_OK; } @@ -1035,12 +1065,15 @@ ReturnCode_t DataReaderImpl::get_liveliness_changed_status( return ReturnCode_t::RETCODE_NOT_ENABLED; } - std::lock_guard lock(reader_->getMutex()); + { + std::lock_guard lock(reader_->getMutex()); - status = liveliness_changed_status_; - liveliness_changed_status_.alive_count_change = 0u; - liveliness_changed_status_.not_alive_count_change = 0u; + status = liveliness_changed_status_; + liveliness_changed_status_.alive_count_change = 0u; + liveliness_changed_status_.not_alive_count_change = 0u; + } + user_datareader_->get_statuscondition().get_impl()->set_status(StatusMask::liveliness_changed(), false); return ReturnCode_t::RETCODE_OK; } @@ -1052,10 +1085,14 @@ ReturnCode_t DataReaderImpl::get_requested_incompatible_qos_status( return ReturnCode_t::RETCODE_NOT_ENABLED; } - std::unique_lock lock(reader_->getMutex()); + { + std::unique_lock lock(reader_->getMutex()); + + status = requested_incompatible_qos_status_; + requested_incompatible_qos_status_.total_count_change = 0u; + } - status = requested_incompatible_qos_status_; - requested_incompatible_qos_status_.total_count_change = 0u; + user_datareader_->get_statuscondition().get_impl()->set_status(StatusMask::requested_incompatible_qos(), false); return ReturnCode_t::RETCODE_OK; } diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp index ff71cf80d88..c4967fff437 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp @@ -423,6 +423,9 @@ class DataReaderImpl SampleInfo* info, bool should_take); + void set_read_communication_status( + bool trigger_value); + /** * @brief A method called when a new cache change is added * @param change The cache change that has been added From 85b5ade65c7b17351780fc871bb69fb71b016060 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 30 Jun 2021 13:06:58 +0200 Subject: [PATCH 4/9] Refs 11835. Implementing DataReader::get_subscription_matched_status. Signed-off-by: Miguel Company --- src/cpp/fastdds/subscriber/DataReader.cpp | 6 +-- src/cpp/fastdds/subscriber/DataReaderImpl.cpp | 50 +++++++++++++++++-- src/cpp/fastdds/subscriber/DataReaderImpl.hpp | 9 ++++ 3 files changed, 55 insertions(+), 10 deletions(-) diff --git a/src/cpp/fastdds/subscriber/DataReader.cpp b/src/cpp/fastdds/subscriber/DataReader.cpp index ac42b9d35b8..5191715bf6f 100644 --- a/src/cpp/fastdds/subscriber/DataReader.cpp +++ b/src/cpp/fastdds/subscriber/DataReader.cpp @@ -359,11 +359,7 @@ ReturnCode_t DataReader::get_sample_rejected_status( ReturnCode_t DataReader::get_subscription_matched_status( SubscriptionMatchedStatus& status) const { - static_cast (status); - return ReturnCode_t::RETCODE_UNSUPPORTED; - /* - return impl_->get_subscription_matched_status(status); - */ + return impl_->get_subscription_matched_status(status); } ReturnCode_t DataReader::get_matched_publication_data( diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index b5666a988c4..7a7d8194894 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -817,11 +817,7 @@ void DataReaderImpl::InnerDataReaderListener::onReaderMatched( RTPSReader* /*reader*/, const SubscriptionMatchedStatus& info) { - DataReaderListener* listener = data_reader_->get_listener_for(StatusMask::subscription_matched()); - if (listener != nullptr) - { - listener->on_subscription_matched(data_reader_->user_datareader_, info); - } + data_reader_->update_subscription_matched_status(info); } void DataReaderImpl::InnerDataReaderListener::on_liveliness_changed( @@ -926,6 +922,50 @@ bool DataReaderImpl::on_new_cache_change_added( return true; } +void DataReaderImpl::update_subscription_matched_status( + const SubscriptionMatchedStatus& status) +{ + auto count_change = status.current_count_change; + subscription_matched_status_.current_count += count_change; + subscription_matched_status_.current_count_change += count_change; + if (count_change > 0) + { + subscription_matched_status_.total_count += count_change; + subscription_matched_status_.total_count_change += count_change; + subscription_matched_status_.last_publication_handle = status.last_publication_handle; + } + + StatusMask notify_status = StatusMask::subscription_matched(); + DataReaderListener* listener = get_listener_for(notify_status); + if (listener != nullptr) + { + listener->on_subscription_matched(user_datareader_, subscription_matched_status_); + subscription_matched_status_.current_count_change = 0; + subscription_matched_status_.total_count_change = 0; + } + user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); +} + +ReturnCode_t DataReaderImpl::get_subscription_matched_status( + SubscriptionMatchedStatus& status) +{ + if (reader_ == nullptr) + { + return ReturnCode_t::RETCODE_NOT_ENABLED; + } + + { + std::unique_lock lock(reader_->getMutex()); + + status = subscription_matched_status_; + subscription_matched_status_.current_count_change = 0; + subscription_matched_status_.total_count_change = 0; + } + + user_datareader_->get_statuscondition().get_impl()->set_status(StatusMask::subscription_matched(), false); + return ReturnCode_t::RETCODE_OK; +} + bool DataReaderImpl::deadline_timer_reschedule() { assert(qos_.deadline().period != c_TimeInfinite); diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp index c4967fff437..4b493417926 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp @@ -218,6 +218,9 @@ class DataReaderImpl */ const TopicDescription* get_topicdescription() const; + ReturnCode_t get_subscription_matched_status( + SubscriptionMatchedStatus& status); + ReturnCode_t get_requested_deadline_missed_status( fastrtps::RequestedDeadlineMissedStatus& status); @@ -373,6 +376,9 @@ class DataReaderImpl //! The current timer owner, i.e. the instance which started the deadline timer fastrtps::rtps::InstanceHandle_t timer_owner_; + //! Subscription matched status + SubscriptionMatchedStatus subscription_matched_status_; + //! Liveliness changed status LivelinessChangedStatus liveliness_changed_status_; @@ -426,6 +432,9 @@ class DataReaderImpl void set_read_communication_status( bool trigger_value); + void update_subscription_matched_status( + const SubscriptionMatchedStatus& status); + /** * @brief A method called when a new cache change is added * @param change The cache change that has been added From 072f8127323dbf8166edae27d540107dd05f71fb Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 30 Jun 2021 13:14:29 +0200 Subject: [PATCH 5/9] Refs 11835. Fixed unit test. Signed-off-by: Miguel Company --- .../dds/subscriber/DataReaderTests.cpp | 30 ++++++------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/test/unittest/dds/subscriber/DataReaderTests.cpp b/test/unittest/dds/subscriber/DataReaderTests.cpp index 0213f0a1d44..3929edaf947 100644 --- a/test/unittest/dds/subscriber/DataReaderTests.cpp +++ b/test/unittest/dds/subscriber/DataReaderTests.cpp @@ -1759,17 +1759,15 @@ class DataReaderUnsupportedTests : public ::testing::Test * ReturnCode_t::RETCODE_UNSUPPORTED. The following methods are checked: * 1. get_sample_lost_status * 2. get_sample_rejected_status - * 3. get_subscription_matched_status - * 4. get_subscription_matched_status - * 5. get_matched_publication_data - * 6. create_readcondition - * 7. create_querycondition - * 8. delete_readcondition - * 9. delete_contained_entities - * 10. get_matched_publications - * 11. get_key_value - * 12. lookup_instance - * 13. wait_for_historical_data + * 3. get_matched_publication_data + * 4. create_readcondition + * 5. create_querycondition + * 6. delete_readcondition + * 7. delete_contained_entities + * 8. get_matched_publications + * 9. get_key_value + * 10. lookup_instance + * 11. wait_for_historical_data */ TEST_F(DataReaderUnsupportedTests, UnsupportedDataReaderMethods) { @@ -1799,16 +1797,6 @@ TEST_F(DataReaderUnsupportedTests, UnsupportedDataReaderMethods) EXPECT_EQ(ReturnCode_t::RETCODE_UNSUPPORTED, data_reader->get_sample_rejected_status(status)); } - { - SubscriptionMatchedStatus status; - EXPECT_EQ(ReturnCode_t::RETCODE_UNSUPPORTED, data_reader->get_subscription_matched_status(status)); - } - - { - SubscriptionMatchedStatus status; - EXPECT_EQ(ReturnCode_t::RETCODE_UNSUPPORTED, data_reader->get_subscription_matched_status(status)); - } - builtin::PublicationBuiltinTopicData publication_data; fastrtps::rtps::InstanceHandle_t publication_handle; EXPECT_EQ( From b113691afd6a72a0370fb93eb2f16ba2b9ad740d Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 30 Jun 2021 15:01:50 +0200 Subject: [PATCH 6/9] Refs 11835. Implementing DataWriter::get_publication_matched_status. Signed-off-by: Miguel Company --- src/cpp/fastdds/publisher/DataWriter.cpp | 6 +-- src/cpp/fastdds/publisher/DataWriterImpl.cpp | 50 ++++++++++++++++++-- src/cpp/fastdds/publisher/DataWriterImpl.hpp | 9 ++++ 3 files changed, 55 insertions(+), 10 deletions(-) diff --git a/src/cpp/fastdds/publisher/DataWriter.cpp b/src/cpp/fastdds/publisher/DataWriter.cpp index 115bce70da3..8f013aa838d 100644 --- a/src/cpp/fastdds/publisher/DataWriter.cpp +++ b/src/cpp/fastdds/publisher/DataWriter.cpp @@ -250,11 +250,7 @@ ReturnCode_t DataWriter::get_offered_incompatible_qos_status( ReturnCode_t DataWriter::get_publication_matched_status( PublicationMatchedStatus& status) const { - static_cast (status); - return ReturnCode_t::RETCODE_UNSUPPORTED; - /* - return impl_->get_publication_matched_status(status); - */ + return impl_->get_publication_matched_status(status); } ReturnCode_t DataWriter::get_liveliness_lost_status( diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index 13a4f3b3394..f8e24bf3fa5 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -951,11 +951,7 @@ void DataWriterImpl::InnerDataWriterListener::onWriterMatched( RTPSWriter* /*writer*/, const PublicationMatchedStatus& info) { - DataWriterListener* listener = data_writer_->get_listener_for(StatusMask::publication_matched()); - if (listener != nullptr) - { - listener->on_publication_matched(data_writer_->user_datawriter_, info); - } + data_writer_->update_publication_matched_status(info); } void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos( @@ -1021,6 +1017,50 @@ ReturnCode_t DataWriterImpl::wait_for_acknowledgments( return ReturnCode_t::RETCODE_ERROR; } +void DataWriterImpl::update_publication_matched_status( + const PublicationMatchedStatus& status) +{ + auto count_change = status.current_count_change; + publication_matched_status_.current_count += count_change; + publication_matched_status_.current_count_change += count_change; + if (count_change > 0) + { + publication_matched_status_.total_count += count_change; + publication_matched_status_.total_count_change += count_change; + publication_matched_status_.last_subscription_handle = status.last_subscription_handle; + } + + StatusMask notify_status = StatusMask::publication_matched(); + DataWriterListener* listener = get_listener_for(notify_status); + if (listener != nullptr) + { + listener->on_publication_matched(user_datawriter_, publication_matched_status_); + publication_matched_status_.current_count_change = 0; + publication_matched_status_.total_count_change = 0; + } + user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); +} + +ReturnCode_t DataWriterImpl::get_publication_matched_status( + PublicationMatchedStatus& status) +{ + if (writer_ == nullptr) + { + return ReturnCode_t::RETCODE_NOT_ENABLED; + } + + { + std::unique_lock lock(writer_->getMutex()); + + status = publication_matched_status_; + publication_matched_status_.current_count_change = 0; + publication_matched_status_.total_count_change = 0; + } + + user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::publication_matched(), false); + return ReturnCode_t::RETCODE_OK; +} + bool DataWriterImpl::deadline_timer_reschedule() { assert(qos_.deadline().period != c_TimeInfinite); diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.hpp b/src/cpp/fastdds/publisher/DataWriterImpl.hpp index a5362e3bad3..83bda453804 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.hpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.hpp @@ -221,6 +221,9 @@ class DataWriterImpl ReturnCode_t wait_for_acknowledgments( const fastrtps::Duration_t& max_wait); + ReturnCode_t get_publication_matched_status( + PublicationMatchedStatus& status); + ReturnCode_t get_offered_deadline_missed_status( fastrtps::OfferedDeadlineMissedStatus& status); @@ -343,6 +346,9 @@ class DataWriterImpl //! The current timer owner, i.e. the instance which started the deadline timer InstanceHandle_t timer_owner_; + //! The publication matched status + PublicationMatchedStatus publication_matched_status_; + //! The offered deadline missed status fastrtps::OfferedDeadlineMissedStatus deadline_missed_status_; @@ -414,6 +420,9 @@ class DataWriterImpl */ bool remove_min_seq_change(); + void update_publication_matched_status( + const PublicationMatchedStatus& status); + /** * @brief A method called when an instance misses the deadline */ From 5171c018ade67f23f16581dcb675798d140d7409 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 30 Jun 2021 15:02:06 +0200 Subject: [PATCH 7/9] Refs 11835. Fixed unit test. Signed-off-by: Miguel Company --- .../dds/publisher/DataWriterTests.cpp | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/test/unittest/dds/publisher/DataWriterTests.cpp b/test/unittest/dds/publisher/DataWriterTests.cpp index 009b8c3d43f..9538bdf4d54 100644 --- a/test/unittest/dds/publisher/DataWriterTests.cpp +++ b/test/unittest/dds/publisher/DataWriterTests.cpp @@ -845,14 +845,13 @@ class DataWriterUnsupportedTests : public ::testing::Test /* * This test checks that the DataWriter methods defined in the standard not yet implemented in FastDDS return * ReturnCode_t::RETCODE_UNSUPPORTED. The following methods are checked: - * 1. get_publication_matched_status - * 2. get_matched_subscription_data - * 3. write_w_timestamp - * 4. register_instance_w_timestamp - * 5. unregister_instance_w_timestamp - * 6. get_matched_subscriptions - * 7. get_key_value - * 8. lookup_instance + * 1. get_matched_subscription_data + * 2. write_w_timestamp + * 3. register_instance_w_timestamp + * 4. unregister_instance_w_timestamp + * 5. get_matched_subscriptions + * 6. get_key_value + * 7. lookup_instance */ TEST_F(DataWriterUnsupportedTests, UnsupportedDataWriterMethods) { @@ -872,11 +871,6 @@ TEST_F(DataWriterUnsupportedTests, UnsupportedDataWriterMethods) DataWriter* data_writer = publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT); ASSERT_NE(publisher, nullptr); - PublicationMatchedStatus status; - EXPECT_EQ( - ReturnCode_t::RETCODE_UNSUPPORTED, - data_writer->get_publication_matched_status(status)); - builtin::SubscriptionBuiltinTopicData subscription_data; fastrtps::rtps::InstanceHandle_t subscription_handle; EXPECT_EQ( From d4b5f3efe8bcc88f8830bf8055f7c7f207b39a24 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 30 Jun 2021 16:03:57 +0200 Subject: [PATCH 8/9] Refs 11835. Implement Entity::get_status_changes through StatusConditionImpl::get_raw_status. Signed-off-by: Miguel Company --- include/fastdds/dds/core/Entity.hpp | 9 +---- .../dds/core/condition/StatusCondition.hpp | 2 +- src/cpp/CMakeLists.txt | 1 + src/cpp/fastdds/core/Entity.cpp | 35 +++++++++++++++++++ .../core/condition/StatusConditionImpl.hpp | 9 +++++ test/unittest/dds/core/entity/CMakeLists.txt | 1 + test/unittest/dds/status/CMakeLists.txt | 1 + test/unittest/statistics/dds/CMakeLists.txt | 1 + 8 files changed, 50 insertions(+), 9 deletions(-) create mode 100644 src/cpp/fastdds/core/Entity.cpp diff --git a/include/fastdds/dds/core/Entity.hpp b/include/fastdds/dds/core/Entity.hpp index e39dec67b38..96cc0d4f639 100644 --- a/include/fastdds/dds/core/Entity.hpp +++ b/include/fastdds/dds/core/Entity.hpp @@ -45,7 +45,6 @@ class Entity RTPS_DllAPI Entity( const StatusMask& mask = StatusMask::all()) : status_mask_(mask) - , status_changes_(StatusMask::none()) , status_condition_(this) , enable_(false) { @@ -92,10 +91,7 @@ class Entity * * @return const reference to the StatusMask with the triggered statuses set to 1 */ - RTPS_DllAPI const StatusMask& get_status_changes() const - { - return status_changes_; - } + RTPS_DllAPI const StatusMask& get_status_changes() const; /** * @brief Retrieves the instance handler that represents the Entity @@ -145,9 +141,6 @@ class Entity //! StatusMask with relevant statuses set to 1 StatusMask status_mask_; - //! StatusMask with triggered statuses set to 1 - StatusMask status_changes_; - //! Condition associated to the Entity StatusCondition status_condition_; diff --git a/include/fastdds/dds/core/condition/StatusCondition.hpp b/include/fastdds/dds/core/condition/StatusCondition.hpp index 66baed08b6e..a3f5ecb4bad 100644 --- a/include/fastdds/dds/core/condition/StatusCondition.hpp +++ b/include/fastdds/dds/core/condition/StatusCondition.hpp @@ -90,7 +90,7 @@ class StatusCondition : public Condition */ RTPS_DllAPI Entity* get_entity() const; - detail::StatusConditionImpl* get_impl() + detail::StatusConditionImpl* get_impl() const { return impl_.get(); } diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index 5971d79b9f1..c02b583712c 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -158,6 +158,7 @@ set(${PROJECT_NAME}_source_files dynamic-types/DynamicDataHelper.cpp fastrtps_deprecated/attributes/TopicAttributes.cpp + fastdds/core/Entity.cpp fastdds/core/condition/Condition.cpp fastdds/core/condition/ConditionNotifier.cpp fastdds/core/condition/GuardCondition.cpp diff --git a/src/cpp/fastdds/core/Entity.cpp b/src/cpp/fastdds/core/Entity.cpp new file mode 100644 index 00000000000..f6b8d4adcb6 --- /dev/null +++ b/src/cpp/fastdds/core/Entity.cpp @@ -0,0 +1,35 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file Entity.cpp + * + */ + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { + +const StatusMask& Entity::get_status_changes() const +{ + return status_condition_.get_impl()->get_raw_status(); +} + +} // namespace dds +} // namespace fastdds +} // namespace eprosima diff --git a/src/cpp/fastdds/core/condition/StatusConditionImpl.hpp b/src/cpp/fastdds/core/condition/StatusConditionImpl.hpp index 8cb0d346e36..a08c52efaf8 100644 --- a/src/cpp/fastdds/core/condition/StatusConditionImpl.hpp +++ b/src/cpp/fastdds/core/condition/StatusConditionImpl.hpp @@ -76,6 +76,15 @@ struct StatusConditionImpl */ const StatusMask& get_enabled_statuses() const; + /** + * @brief Retrieves the list of communication statuses that are currently triggered. + * @return Triggered status. + */ + const StatusMask& get_raw_status() const + { + return status_; + } + /** * @brief Set the trigger value of a specific status * @param status The status for which to change the trigger value diff --git a/test/unittest/dds/core/entity/CMakeLists.txt b/test/unittest/dds/core/entity/CMakeLists.txt index 3448996906c..54909150b28 100644 --- a/test/unittest/dds/core/entity/CMakeLists.txt +++ b/test/unittest/dds/core/entity/CMakeLists.txt @@ -23,6 +23,7 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER)) set(ENTITY_TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypesBase.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/Entity.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/Condition.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/ConditionNotifier.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/StatusCondition.cpp diff --git a/test/unittest/dds/status/CMakeLists.txt b/test/unittest/dds/status/CMakeLists.txt index b5919771e00..cd16cc51c8a 100644 --- a/test/unittest/dds/status/CMakeLists.txt +++ b/test/unittest/dds/status/CMakeLists.txt @@ -21,6 +21,7 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER)) find_package(Threads REQUIRED) set(LISTENERTESTS_SOURCE ListenerTests.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/Entity.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/Condition.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/ConditionNotifier.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/StatusCondition.cpp diff --git a/test/unittest/statistics/dds/CMakeLists.txt b/test/unittest/statistics/dds/CMakeLists.txt index 59046428dac..a785034fff0 100644 --- a/test/unittest/statistics/dds/CMakeLists.txt +++ b/test/unittest/statistics/dds/CMakeLists.txt @@ -110,6 +110,7 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER)) ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/builtin/typelookup/TypeLookupReplyListener.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/builtin/typelookup/TypeLookupRequestListener.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/builtin/typelookup/common/TypeLookupTypes.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/Entity.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/Condition.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/ConditionNotifier.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/StatusCondition.cpp From 0f441317e88096271ef4eb2aa5f56450005d4155 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 1 Jul 2021 09:38:32 +0200 Subject: [PATCH 9/9] Refs 11835. Added StatusConditionImpl::get_raw_status to unit test. Signed-off-by: Miguel Company --- .../core/condition/StatusConditionImplTests.cpp | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/test/unittest/dds/core/condition/StatusConditionImplTests.cpp b/test/unittest/dds/core/condition/StatusConditionImplTests.cpp index 5bab750d318..748eccf6b22 100644 --- a/test/unittest/dds/core/condition/StatusConditionImplTests.cpp +++ b/test/unittest/dds/core/condition/StatusConditionImplTests.cpp @@ -52,17 +52,22 @@ TEST(StatusConditionImplTests, notify_trigger) ::testing::StrictMock notifier; StatusConditionImpl uut(¬ifier); + StatusMask mask_none = StatusMask::none(); StatusMask mask_all = StatusMask::all(); StatusMask one_mask = StatusMask::inconsistent_topic(); StatusMask other_mask = StatusMask::data_on_readers(); + StatusMask both_mask = one_mask; + both_mask |= other_mask; // Condition should be untriggered upon creation EXPECT_FALSE(uut.get_trigger_value()); EXPECT_EQ(mask_all.to_string(), uut.get_enabled_statuses().to_string()); + EXPECT_EQ(mask_none.to_string(), uut.get_raw_status().to_string()); // Triggering other_mask should trigger auto& call1 = EXPECT_CALL(notifier, notify).Times(1); uut.set_status(other_mask, true); + EXPECT_EQ(other_mask.to_string(), uut.get_raw_status().to_string()); EXPECT_TRUE(uut.get_trigger_value()); // Setting mask to one_mask should untrigger @@ -73,30 +78,37 @@ TEST(StatusConditionImplTests, notify_trigger) // Triggering one_mask should trigger auto& call2 = EXPECT_CALL(notifier, notify).Times(1).After(call1); uut.set_status(one_mask, true); + EXPECT_EQ(both_mask.to_string(), uut.get_raw_status().to_string()); EXPECT_TRUE(uut.get_trigger_value()); // Triggering twice should not affect trigger uut.set_status(one_mask, true); + EXPECT_EQ(both_mask.to_string(), uut.get_raw_status().to_string()); EXPECT_TRUE(uut.get_trigger_value()); // Untriggering other_mask should not affect trigger uut.set_status(other_mask, false); + EXPECT_EQ(one_mask.to_string(), uut.get_raw_status().to_string()); EXPECT_TRUE(uut.get_trigger_value()); // Triggering other_mask should not affect trigger uut.set_status(other_mask, true); + EXPECT_EQ(both_mask.to_string(), uut.get_raw_status().to_string()); EXPECT_TRUE(uut.get_trigger_value()); // Untriggering one_mask should untrigger uut.set_status(one_mask, false); + EXPECT_EQ(other_mask.to_string(), uut.get_raw_status().to_string()); EXPECT_FALSE(uut.get_trigger_value()); // Untriggering other_mask should not trigger uut.set_status(other_mask, false); + EXPECT_EQ(mask_none.to_string(), uut.get_raw_status().to_string()); EXPECT_FALSE(uut.get_trigger_value()); // Triggering other_mask should not trigger uut.set_status(other_mask, true); + EXPECT_EQ(other_mask.to_string(), uut.get_raw_status().to_string()); EXPECT_FALSE(uut.get_trigger_value()); // Setting mask to other_mask should trigger @@ -106,6 +118,7 @@ TEST(StatusConditionImplTests, notify_trigger) // Triggering one_mask should not affect trigger uut.set_status(one_mask, true); + EXPECT_EQ(both_mask.to_string(), uut.get_raw_status().to_string()); EXPECT_TRUE(uut.get_trigger_value()); // Setting mask to one_mask should not affect trigger @@ -115,6 +128,7 @@ TEST(StatusConditionImplTests, notify_trigger) // Untriggering other_mask should not affect trigger uut.set_status(other_mask, false); + EXPECT_EQ(one_mask.to_string(), uut.get_raw_status().to_string()); EXPECT_TRUE(uut.get_trigger_value()); // Setting mask to other_mask should untrigger