From 605dba4cd81410b2619d0d04fa47585ac845dff8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Tue, 22 Jun 2021 11:10:31 +0200 Subject: [PATCH 1/4] Refs #11676. Add warning for Datareader's HistoryQoS when using Datasharing. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ricardo González Moreno --- src/cpp/rtps/DataSharing/DataSharingListener.cpp | 10 +++++++++- src/cpp/rtps/DataSharing/DataSharingListener.hpp | 3 ++- src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp | 5 +++++ src/cpp/rtps/DataSharing/IDataSharingListener.hpp | 3 ++- src/cpp/rtps/reader/StatefulReader.cpp | 3 ++- src/cpp/rtps/reader/StatelessReader.cpp | 3 ++- 6 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.cpp b/src/cpp/rtps/DataSharing/DataSharingListener.cpp index 858178da28b..beb5e2dfd71 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.cpp @@ -205,7 +205,8 @@ void DataSharingListener::process_new_data () bool DataSharingListener::add_datasharing_writer( const GUID_t& writer_guid, - bool is_volatile) + bool is_volatile, + int32_t reader_history_max_samples) { std::lock_guard lock(mutex_); @@ -218,6 +219,13 @@ bool DataSharingListener::add_datasharing_writer( std::shared_ptr pool = std::static_pointer_cast(DataSharingPayloadPool::get_reader_pool(is_volatile)); pool->init_shared_memory(writer_guid, datasharing_pools_directory_); + if (0 == reader_history_max_samples || + reader_history_max_samples > static_cast(pool->history_size() - 1)) + { + logWarning(RTPS_READER, + "Reader " << reader_->getGuid() << " was configured to have a large history (" << reader_history_max_samples << " max samples), but the history size used with writer " << writer_guid << " will be " << pool->history_size() << + " max samples."); + } writer_pools_.emplace_back(pool, pool->last_liveliness_sequence()); writer_pools_changed_.store(true); diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.hpp b/src/cpp/rtps/DataSharing/DataSharingListener.hpp index e76a25d6fb7..41d27b4029c 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.hpp @@ -65,7 +65,8 @@ class DataSharingListener : public IDataSharingListener bool add_datasharing_writer( const GUID_t& writer_guid, - bool is_volatile) override; + bool is_volatile, + int32_t reader_history_max_samples) override; bool remove_datasharing_writer( const GUID_t& writer_guid) override; diff --git a/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp b/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp index bd6d0aa520b..ec912ee8040 100644 --- a/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp @@ -101,6 +101,11 @@ class DataSharingPayloadPool : public IPayloadPool return "history"; } + uint32_t history_size() const + { + return descriptor_->history_size; + } + /** * Advances an index to the history to the next position */ diff --git a/src/cpp/rtps/DataSharing/IDataSharingListener.hpp b/src/cpp/rtps/DataSharing/IDataSharingListener.hpp index ab3d4694fa3..5d1a2f5da67 100644 --- a/src/cpp/rtps/DataSharing/IDataSharingListener.hpp +++ b/src/cpp/rtps/DataSharing/IDataSharingListener.hpp @@ -69,7 +69,8 @@ class IDataSharingListener */ virtual bool add_datasharing_writer( const GUID_t& writer_guid, - bool is_reader_volatile) = 0; + bool is_reader_volatile, + int32_t reader_history_max_samples) = 0; /** * Removes a writer from the listening. The changes in the writer's history will not be diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index 110717b38e3..385b895316a 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -215,7 +215,8 @@ bool StatefulReader::matched_writer_add( if (is_datasharing) { if (datasharing_listener_->add_datasharing_writer(wdata.guid(), - m_att.durabilityKind == VOLATILE)) + m_att.durabilityKind == VOLATILE, + mp_history->m_att.maximumReservedCaches)) { matched_writers_.push_back(wp); logInfo(RTPS_READER, "Writer Proxy " << wdata.guid() << " added to " << this->m_guid.entityId diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index a71b0173d6c..7e742ba30d8 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -113,7 +113,8 @@ bool StatelessReader::matched_writer_add( if (is_datasharing) { if (datasharing_listener_->add_datasharing_writer(wdata.guid(), - m_att.durabilityKind == VOLATILE)) + m_att.durabilityKind == VOLATILE, + mp_history->m_att.maximumReservedCaches)) { logInfo(RTPS_READER, "Writer Proxy " << wdata.guid() << " added to " << this->m_guid.entityId << " with data sharing"); From a1b4d74ab0230dd7a417ecf0a749ecc52dffd140 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Tue, 22 Jun 2021 12:39:18 +0200 Subject: [PATCH 2/4] Refs #11676. Fix segment faults with Datasharing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ricardo González Moreno --- .../rtps/DataSharing/DataSharingListener.cpp | 21 +++++++++------- src/cpp/rtps/reader/RTPSReader.cpp | 25 +++++++++++-------- .../api/dds-pim/PubSubWriterReader.hpp | 7 +++--- .../PubSubWriterReader.hpp | 7 +++--- .../common/BlackboxTestsDiscovery.cpp | 6 ++--- 5 files changed, 37 insertions(+), 29 deletions(-) diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.cpp b/src/cpp/rtps/DataSharing/DataSharingListener.cpp index beb5e2dfd71..2188e5cf8d7 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.cpp @@ -218,18 +218,21 @@ bool DataSharingListener::add_datasharing_writer( std::shared_ptr pool = std::static_pointer_cast(DataSharingPayloadPool::get_reader_pool(is_volatile)); - pool->init_shared_memory(writer_guid, datasharing_pools_directory_); - if (0 == reader_history_max_samples || - reader_history_max_samples > static_cast(pool->history_size() - 1)) + if (pool->init_shared_memory(writer_guid, datasharing_pools_directory_)) { - logWarning(RTPS_READER, - "Reader " << reader_->getGuid() << " was configured to have a large history (" << reader_history_max_samples << " max samples), but the history size used with writer " << writer_guid << " will be " << pool->history_size() << - " max samples."); + if (0 == reader_history_max_samples || + reader_history_max_samples > static_cast(pool->history_size() - 1)) + { + logWarning(RTPS_READER, + "Reader " << reader_->getGuid() << " was configured to have a large history (" << reader_history_max_samples << " max samples), but the history size used with writer " << writer_guid << " will be " << pool->history_size() << + " max samples."); + } + writer_pools_.emplace_back(pool, pool->last_liveliness_sequence()); + writer_pools_changed_.store(true); + return true; } - writer_pools_.emplace_back(pool, pool->last_liveliness_sequence()); - writer_pools_changed_.store(true); - return true; + return false; } bool DataSharingListener::remove_datasharing_writer( diff --git a/src/cpp/rtps/reader/RTPSReader.cpp b/src/cpp/rtps/reader/RTPSReader.cpp index d49406b6680..98e28c998d1 100644 --- a/src/cpp/rtps/reader/RTPSReader.cpp +++ b/src/cpp/rtps/reader/RTPSReader.cpp @@ -120,21 +120,24 @@ void RTPSReader::init( if (att.endpoint.data_sharing_configuration().kind() != OFF) { - is_datasharing_compatible_ = true; using std::placeholders::_1; std::shared_ptr notification = DataSharingNotification::create_notification( getGuid(), att.endpoint.data_sharing_configuration().shm_directory()); - datasharing_listener_.reset(new DataSharingListener( - notification, - att.endpoint.data_sharing_configuration().shm_directory(), - att.matched_writers_allocation, - this)); - - // We can start the listener here, as no writer can be matched already, - // so no notification will occur until the non-virtual instance is constructed. - // But we need to stop the listener in the non-virtual instance destructor. - datasharing_listener_->start(); + if (notification) + { + is_datasharing_compatible_ = true; + datasharing_listener_.reset(new DataSharingListener( + notification, + att.endpoint.data_sharing_configuration().shm_directory(), + att.matched_writers_allocation, + this)); + + // We can start the listener here, as no writer can be matched already, + // so no notification will occur until the non-virtual instance is constructed. + // But we need to stop the listener in the non-virtual instance destructor. + datasharing_listener_->start(); + } } mp_history->mp_reader = this; diff --git a/test/blackbox/api/dds-pim/PubSubWriterReader.hpp b/test/blackbox/api/dds-pim/PubSubWriterReader.hpp index 5924278e105..b002601df6d 100644 --- a/test/blackbox/api/dds-pim/PubSubWriterReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriterReader.hpp @@ -412,7 +412,8 @@ class PubSubWriterReader } bool create_additional_topics( - size_t num_topics) + size_t num_topics, + const char* suffix) { bool ret_val = initialized_; if (ret_val) @@ -421,12 +422,12 @@ class PubSubWriterReader for (size_t i = 0; i < entities_extra_.size(); i++) { - topic_name += "/"; + topic_name += suffix; } for (size_t i = 0; ret_val && (i < num_topics); i++) { - topic_name += "/"; + topic_name += suffix; eprosima::fastdds::dds::Topic* topic = participant_->create_topic(topic_name, type_->getName(), eprosima::fastdds::dds::TOPIC_QOS_DEFAULT); ret_val &= (nullptr != topic); diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubWriterReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubWriterReader.hpp index d3804dac2ee..5f525153e23 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubWriterReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubWriterReader.hpp @@ -389,7 +389,8 @@ class PubSubWriterReader } bool create_additional_topics( - size_t num_topics) + size_t num_topics, + const char* suffix) { bool ret_val = initialized_; if (ret_val) @@ -398,7 +399,7 @@ class PubSubWriterReader for (size_t i = 0; ret_val && (i < num_topics); i++) { - topic_name += "/"; + topic_name += suffix; publisher_attr_.topic.topicName = topic_name; ret_val &= nullptr != eprosima::fastrtps::Domain::createPublisher(participant_, publisher_attr_, @@ -409,7 +410,7 @@ class PubSubWriterReader for (size_t i = 0; ret_val && (i < num_topics); i++) { - topic_name += "/"; + topic_name += suffix; subscriber_attr_.topic.topicName = topic_name; ret_val &= nullptr != eprosima::fastrtps::Domain::createSubscriber(participant_, subscriber_attr_, diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index 71180414aac..5681e77ddbe 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -933,7 +933,7 @@ static void discoverParticipantsSeveralEndpointsTest( std::cout << "\rParticipant " << idx++ << " of " << n_participants << std::flush; ps->init(avoid_multicast); ASSERT_EQ(ps->isInitialized(), true); - ASSERT_TRUE(ps->create_additional_topics(n_topics - 1)); + ASSERT_TRUE(ps->create_additional_topics(n_topics - 1, "/")); } bool all_discovered = false; @@ -1063,7 +1063,7 @@ TEST_P(Discovery, EndpointCreationMultithreaded) while (!stop) { std::this_thread::sleep_for(creation_sleep); - EXPECT_NO_THROW(participant_1.create_additional_topics(1)); + EXPECT_NO_THROW(participant_1.create_additional_topics(1, "/")); } }; @@ -1086,7 +1086,7 @@ TEST_P(Discovery, EndpointCreationMultithreaded) // Additional endpoints created just after the second participant. // This gives the first participant very few time to receive the undiscovery, // and makes the intraprocess delivery on a deleted builtin reader. - participant_1.create_additional_topics(1); + participant_1.create_additional_topics(1, "_"); }; EXPECT_NO_THROW(second_participant_process()); From fd1b7a87b5a83fe869c1f508c01a2d27e3dc51b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Fri, 2 Jul 2021 09:56:56 +0200 Subject: [PATCH 3/4] Refs #11926. Fix long sentence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ricardo González Moreno --- src/cpp/rtps/DataSharing/DataSharingListener.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.cpp b/src/cpp/rtps/DataSharing/DataSharingListener.cpp index 2188e5cf8d7..17d1432144a 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.cpp @@ -224,8 +224,9 @@ bool DataSharingListener::add_datasharing_writer( reader_history_max_samples > static_cast(pool->history_size() - 1)) { logWarning(RTPS_READER, - "Reader " << reader_->getGuid() << " was configured to have a large history (" << reader_history_max_samples << " max samples), but the history size used with writer " << writer_guid << " will be " << pool->history_size() << - " max samples."); + "Reader " << reader_->getGuid() << " was configured to have a large history (" << + reader_history_max_samples << " max samples), but the history size used with writer " << + writer_guid << " will be " << pool->history_size() << " max samples."); } writer_pools_.emplace_back(pool, pool->last_liveliness_sequence()); writer_pools_changed_.store(true); From 9efbe1746dc9a54be44284a531098374fa19711d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez?= Date: Fri, 2 Jul 2021 10:19:52 +0200 Subject: [PATCH 4/4] Apply suggestion --- src/cpp/rtps/DataSharing/DataSharingListener.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.cpp b/src/cpp/rtps/DataSharing/DataSharingListener.cpp index 17d1432144a..e76cb4d1c1e 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.cpp @@ -220,8 +220,8 @@ bool DataSharingListener::add_datasharing_writer( std::static_pointer_cast(DataSharingPayloadPool::get_reader_pool(is_volatile)); if (pool->init_shared_memory(writer_guid, datasharing_pools_directory_)) { - if (0 == reader_history_max_samples || - reader_history_max_samples > static_cast(pool->history_size() - 1)) + if (0 >= reader_history_max_samples || + reader_history_max_samples >= static_cast(pool->history_size())) { logWarning(RTPS_READER, "Reader " << reader_->getGuid() << " was configured to have a large history (" <<