diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.cpp b/src/cpp/rtps/DataSharing/DataSharingListener.cpp index 858178da28b..e76cb4d1c1e 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.cpp @@ -205,7 +205,8 @@ void DataSharingListener::process_new_data () bool DataSharingListener::add_datasharing_writer( const GUID_t& writer_guid, - bool is_volatile) + bool is_volatile, + int32_t reader_history_max_samples) { std::lock_guard lock(mutex_); @@ -217,11 +218,22 @@ bool DataSharingListener::add_datasharing_writer( std::shared_ptr pool = std::static_pointer_cast(DataSharingPayloadPool::get_reader_pool(is_volatile)); - pool->init_shared_memory(writer_guid, datasharing_pools_directory_); - writer_pools_.emplace_back(pool, pool->last_liveliness_sequence()); - writer_pools_changed_.store(true); + if (pool->init_shared_memory(writer_guid, datasharing_pools_directory_)) + { + if (0 >= reader_history_max_samples || + reader_history_max_samples >= static_cast(pool->history_size())) + { + logWarning(RTPS_READER, + "Reader " << reader_->getGuid() << " was configured to have a large history (" << + reader_history_max_samples << " max samples), but the history size used with writer " << + writer_guid << " will be " << pool->history_size() << " max samples."); + } + writer_pools_.emplace_back(pool, pool->last_liveliness_sequence()); + writer_pools_changed_.store(true); + return true; + } - return true; + return false; } bool DataSharingListener::remove_datasharing_writer( diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.hpp b/src/cpp/rtps/DataSharing/DataSharingListener.hpp index e76a25d6fb7..41d27b4029c 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.hpp @@ -65,7 +65,8 @@ class DataSharingListener : public IDataSharingListener bool add_datasharing_writer( const GUID_t& writer_guid, - bool is_volatile) override; + bool is_volatile, + int32_t reader_history_max_samples) override; bool remove_datasharing_writer( const GUID_t& writer_guid) override; diff --git a/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp b/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp index bd6d0aa520b..ec912ee8040 100644 --- a/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp @@ -101,6 +101,11 @@ class DataSharingPayloadPool : public IPayloadPool return "history"; } + uint32_t history_size() const + { + return descriptor_->history_size; + } + /** * Advances an index to the history to the next position */ diff --git a/src/cpp/rtps/DataSharing/IDataSharingListener.hpp b/src/cpp/rtps/DataSharing/IDataSharingListener.hpp index ab3d4694fa3..5d1a2f5da67 100644 --- a/src/cpp/rtps/DataSharing/IDataSharingListener.hpp +++ b/src/cpp/rtps/DataSharing/IDataSharingListener.hpp @@ -69,7 +69,8 @@ class IDataSharingListener */ virtual bool add_datasharing_writer( const GUID_t& writer_guid, - bool is_reader_volatile) = 0; + bool is_reader_volatile, + int32_t reader_history_max_samples) = 0; /** * Removes a writer from the listening. The changes in the writer's history will not be diff --git a/src/cpp/rtps/reader/RTPSReader.cpp b/src/cpp/rtps/reader/RTPSReader.cpp index d49406b6680..98e28c998d1 100644 --- a/src/cpp/rtps/reader/RTPSReader.cpp +++ b/src/cpp/rtps/reader/RTPSReader.cpp @@ -120,21 +120,24 @@ void RTPSReader::init( if (att.endpoint.data_sharing_configuration().kind() != OFF) { - is_datasharing_compatible_ = true; using std::placeholders::_1; std::shared_ptr notification = DataSharingNotification::create_notification( getGuid(), att.endpoint.data_sharing_configuration().shm_directory()); - datasharing_listener_.reset(new DataSharingListener( - notification, - att.endpoint.data_sharing_configuration().shm_directory(), - att.matched_writers_allocation, - this)); - - // We can start the listener here, as no writer can be matched already, - // so no notification will occur until the non-virtual instance is constructed. - // But we need to stop the listener in the non-virtual instance destructor. - datasharing_listener_->start(); + if (notification) + { + is_datasharing_compatible_ = true; + datasharing_listener_.reset(new DataSharingListener( + notification, + att.endpoint.data_sharing_configuration().shm_directory(), + att.matched_writers_allocation, + this)); + + // We can start the listener here, as no writer can be matched already, + // so no notification will occur until the non-virtual instance is constructed. + // But we need to stop the listener in the non-virtual instance destructor. + datasharing_listener_->start(); + } } mp_history->mp_reader = this; diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index 3f45373fdb4..05fc7387aee 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -215,7 +215,8 @@ bool StatefulReader::matched_writer_add( if (is_datasharing) { if (datasharing_listener_->add_datasharing_writer(wdata.guid(), - m_att.durabilityKind == VOLATILE)) + m_att.durabilityKind == VOLATILE, + mp_history->m_att.maximumReservedCaches)) { matched_writers_.push_back(wp); logInfo(RTPS_READER, "Writer Proxy " << wdata.guid() << " added to " << this->m_guid.entityId diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index e3c4818f401..eaad5f964c4 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -113,7 +113,8 @@ bool StatelessReader::matched_writer_add( if (is_datasharing) { if (datasharing_listener_->add_datasharing_writer(wdata.guid(), - m_att.durabilityKind == VOLATILE)) + m_att.durabilityKind == VOLATILE, + mp_history->m_att.maximumReservedCaches)) { logInfo(RTPS_READER, "Writer Proxy " << wdata.guid() << " added to " << this->m_guid.entityId << " with data sharing"); diff --git a/test/blackbox/api/dds-pim/PubSubWriterReader.hpp b/test/blackbox/api/dds-pim/PubSubWriterReader.hpp index f8515960653..8833c67cf1e 100644 --- a/test/blackbox/api/dds-pim/PubSubWriterReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriterReader.hpp @@ -412,7 +412,8 @@ class PubSubWriterReader } bool create_additional_topics( - size_t num_topics) + size_t num_topics, + const char* suffix) { bool ret_val = initialized_; if (ret_val) @@ -422,12 +423,12 @@ class PubSubWriterReader for (size_t i = 0; i < vector_size; i++) { - topic_name += "/"; + topic_name += suffix; } for (size_t i = 0; ret_val && (i < num_topics); i++) { - topic_name += "/"; + topic_name += suffix; eprosima::fastdds::dds::Topic* topic = participant_->create_topic(topic_name, type_->getName(), eprosima::fastdds::dds::TOPIC_QOS_DEFAULT); ret_val &= (nullptr != topic); diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubWriterReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubWriterReader.hpp index d3804dac2ee..5f525153e23 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubWriterReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubWriterReader.hpp @@ -389,7 +389,8 @@ class PubSubWriterReader } bool create_additional_topics( - size_t num_topics) + size_t num_topics, + const char* suffix) { bool ret_val = initialized_; if (ret_val) @@ -398,7 +399,7 @@ class PubSubWriterReader for (size_t i = 0; ret_val && (i < num_topics); i++) { - topic_name += "/"; + topic_name += suffix; publisher_attr_.topic.topicName = topic_name; ret_val &= nullptr != eprosima::fastrtps::Domain::createPublisher(participant_, publisher_attr_, @@ -409,7 +410,7 @@ class PubSubWriterReader for (size_t i = 0; ret_val && (i < num_topics); i++) { - topic_name += "/"; + topic_name += suffix; subscriber_attr_.topic.topicName = topic_name; ret_val &= nullptr != eprosima::fastrtps::Domain::createSubscriber(participant_, subscriber_attr_, diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index 71180414aac..5681e77ddbe 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -933,7 +933,7 @@ static void discoverParticipantsSeveralEndpointsTest( std::cout << "\rParticipant " << idx++ << " of " << n_participants << std::flush; ps->init(avoid_multicast); ASSERT_EQ(ps->isInitialized(), true); - ASSERT_TRUE(ps->create_additional_topics(n_topics - 1)); + ASSERT_TRUE(ps->create_additional_topics(n_topics - 1, "/")); } bool all_discovered = false; @@ -1063,7 +1063,7 @@ TEST_P(Discovery, EndpointCreationMultithreaded) while (!stop) { std::this_thread::sleep_for(creation_sleep); - EXPECT_NO_THROW(participant_1.create_additional_topics(1)); + EXPECT_NO_THROW(participant_1.create_additional_topics(1, "/")); } }; @@ -1086,7 +1086,7 @@ TEST_P(Discovery, EndpointCreationMultithreaded) // Additional endpoints created just after the second participant. // This gives the first participant very few time to receive the undiscovery, // and makes the intraprocess delivery on a deleted builtin reader. - participant_1.create_additional_topics(1); + participant_1.create_additional_topics(1, "_"); }; EXPECT_NO_THROW(second_participant_process());