diff --git a/src/cpp/rtps/DataSharing/ReaderPool.hpp b/src/cpp/rtps/DataSharing/ReaderPool.hpp index 8c1b9db7bdd..8c34a64bfe5 100644 --- a/src/cpp/rtps/DataSharing/ReaderPool.hpp +++ b/src/cpp/rtps/DataSharing/ReaderPool.hpp @@ -123,13 +123,18 @@ class ReaderPool : public DataSharingPayloadPool } // Set the reading pointer + next_payload_ = begin(); if (is_volatile_) { - next_payload_ = end(); - } - else - { - next_payload_ = begin(); + CacheChange_t ch; + SequenceNumber_t last_sequence = c_SequenceNumber_Unknown; + get_next_unread_payload(ch, last_sequence); + while (ch.sequenceNumber != SequenceNumber_t::unknown()) + { + advance(next_payload_); + get_next_unread_payload(ch, last_sequence); + } + assert(next_payload_ == end()); } return true; diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index f0d9b3bef87..178428bc336 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -233,7 +233,18 @@ bool StatefulReader::matched_writer_add( } // Intraprocess manages durability itself - if (!is_same_process && m_att.durabilityKind != VOLATILE) + if (VOLATILE == m_att.durabilityKind) + { + std::shared_ptr pool = datasharing_listener_->get_pool_for_writer(wp->guid()); + SequenceNumber_t last_seq = pool->get_last_read_sequence_number(); + if (SequenceNumber_t::unknown() != last_seq) + { + SequenceNumberSet_t sns(last_seq + 1); + send_acknack(wp, sns, *wp, false); + wp->lost_changes_update(last_seq + 1); + } + } + else if (!is_same_process) { // simulate a notification to force reading of transient changes datasharing_listener_->notify(false); diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index ea74bfe27fa..309b0c34aca 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -453,7 +453,7 @@ class PubSubReader template - void wait_for_all_received( + bool wait_for_all_received( const std::chrono::duration<_Rep, _Period>& max_wait, size_t num_messages = 0) { @@ -462,10 +462,10 @@ class PubSubReader num_messages = number_samples_expected_; } std::unique_lock lock(message_receive_mutex_); - message_receive_cv_.wait_for(lock, max_wait, [this, num_messages]() -> bool - { - return num_messages == message_receive_count_; - }); + return message_receive_cv_.wait_for(lock, max_wait, [this, num_messages]() -> bool + { + return num_messages == message_receive_count_; + }); } void block_for_all() diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp index 507876a94dc..bce3e402de7 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp @@ -368,7 +368,7 @@ class PubSubReader template - void wait_for_all_received( + bool wait_for_all_received( const std::chrono::duration<_Rep, _Period>& max_wait, size_t num_messages = 0) { @@ -377,10 +377,10 @@ class PubSubReader num_messages = number_samples_expected_; } std::unique_lock lock(message_receive_mutex_); - message_receive_cv_.wait_for(lock, max_wait, [this, num_messages]() -> bool - { - return num_messages == message_receive_count_; - }); + return message_receive_cv_.wait_for(lock, max_wait, [this, num_messages]() -> bool + { + return num_messages == message_receive_count_; + }); } void block_for_all() diff --git a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp index 4a18848ab46..ee76f331835 100644 --- a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp @@ -56,6 +56,16 @@ class PubSubHistory : public testing::TestWithParam } mem_policy_ = std::get<1>(GetParam()); + + switch (mem_policy_) + { + case rtps::PREALLOCATED_MEMORY_MODE: + case rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE: + will_use_datasharing = enable_datasharing; + break; + default: + break; + } } void TearDown() override @@ -74,11 +84,14 @@ class PubSubHistory : public testing::TestWithParam default: break; } + will_use_datasharing = false; } protected: rtps::MemoryManagementPolicy mem_policy_; + + bool will_use_datasharing = false; }; // Test created to check bug #1568 (Github #34) @@ -1144,6 +1157,84 @@ TEST_P(PubSubHistory, WriterUnmatchClearsHistory) reader.block_for_all(); } +// Regression test for #11743 +/*! + * @fn TEST(PubSubHistory, KeepAllWriterContinueSendingAfterReaderMatched) + * @brief This test checks that the writer doesn't block writing samples when meet a Datasharing Volatile reader. + * + * The test creates a Reliable, Transient Local Writer with a Keep All history, and its resources limited to + * 1 samples. + * Then it creates a Reliable, Volatile Reader. + * Writer will be the first discovering and then sends a sample. + * Reader could discover the writer when the writer already put the sample in the Datasharing history for the reader. + * The Volatile reader should be able to acks these kind of samples. + * + * Writer will be able then to send a second sample. + */ +TEST_P(PubSubHistory, KeepAllWriterContinueSendingAfterReaderMatched) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + reader.reliability(RELIABLE_RELIABILITY_QOS); + + writer.reliability(RELIABLE_RELIABILITY_QOS) + .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) + .resource_limits_allocated_samples(1) + .resource_limits_max_samples(1); + + writer.mem_policy(mem_policy_).init(); + ASSERT_TRUE(writer.isInitialized()); + + reader.mem_policy(mem_policy_).init(); + ASSERT_TRUE(reader.isInitialized()); + + writer.wait_discovery(); + + HelloWorld data; + data.message("Hello world!"); + data.index(1u); + ASSERT_TRUE(writer.send_sample(data)); + + reader.wait_discovery(); + + // Second writer sends one sample (reader should discard previous one) + data.index(2u); + uint32_t expected_value = data.index(); + + if (will_use_datasharing) + { + if (reader.wait_for_all_received(std::chrono::seconds(3), 1)) + { + ASSERT_FALSE(writer.send_sample(data)); + expected_value = 1; + } + else + { + ASSERT_TRUE(writer.send_sample(data)); + } + } + else + { + ASSERT_TRUE(writer.send_sample(data)); + } + + if (will_use_datasharing) + { + reader.wait_for_all_received(std::chrono::seconds(3), expected_value); + } + else + { + writer.waitForAllAcked(std::chrono::seconds(3)); + } + + // Only one sample should be present + HelloWorld received; + ASSERT_TRUE(reader.takeNextData(&received)); + ASSERT_EQ(received.index(), expected_value); + ASSERT_TRUE(writer.waitForAllAcked(std::chrono::seconds(3))); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else