From db0878087b7ac622781274a3cb7d21817f3118a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Fri, 23 Jul 2021 11:52:05 +0200 Subject: [PATCH 1/6] Refs #11743. Add regression test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ricardo González Moreno --- test/blackbox/api/dds-pim/PubSubReader.hpp | 10 +- .../api/fastrtps_deprecated/PubSubReader.hpp | 10 +- .../common/BlackboxTestsPubSubHistory.cpp | 93 +++++++++++++++++++ 3 files changed, 103 insertions(+), 10 deletions(-) diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index a0c46b2ee51..8ea3a87f6a6 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -471,7 +471,7 @@ class PubSubReader template - void wait_for_all_received( + bool wait_for_all_received( const std::chrono::duration<_Rep, _Period>& max_wait, size_t num_messages = 0) { @@ -480,10 +480,10 @@ class PubSubReader num_messages = number_samples_expected_; } std::unique_lock lock(message_receive_mutex_); - message_receive_cv_.wait_for(lock, max_wait, [this, num_messages]() -> bool - { - return num_messages == message_receive_count_; - }); + return message_receive_cv_.wait_for(lock, max_wait, [this, num_messages]() -> bool + { + return num_messages == message_receive_count_; + }); } void block_for_all() diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp index 5e216212d4d..80242e409b0 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp @@ -368,7 +368,7 @@ class PubSubReader template - void wait_for_all_received( + bool wait_for_all_received( const std::chrono::duration<_Rep, _Period>& max_wait, size_t num_messages = 0) { @@ -377,10 +377,10 @@ class PubSubReader num_messages = number_samples_expected_; } std::unique_lock lock(message_receive_mutex_); - message_receive_cv_.wait_for(lock, max_wait, [this, num_messages]() -> bool - { - return num_messages == message_receive_count_; - }); + return message_receive_cv_.wait_for(lock, max_wait, [this, num_messages]() -> bool + { + return num_messages == message_receive_count_; + }); } void block_for_all() diff --git a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp index 77987f8175f..43f0b7ef8b5 100644 --- a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp @@ -56,6 +56,18 @@ class PubSubHistory : public testing::TestWithParam } mem_policy_ = std::get<1>(GetParam()); + + switch (mem_policy_) + { + case rtps::PREALLOCATED_MEMORY_MODE: + will_use_datasharing = true; + break; + case rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE: + will_use_datasharing = true; + break; + default: + break; + } } void TearDown() override @@ -74,11 +86,14 @@ class PubSubHistory : public testing::TestWithParam default: break; } + will_use_datasharing = false; } protected: rtps::MemoryManagementPolicy mem_policy_; + + bool will_use_datasharing = false; }; // Test created to check bug #1568 (Github #34) @@ -1144,6 +1159,84 @@ TEST_P(PubSubHistory, WriterUnmatchClearsHistory) reader.block_for_all(); } +// Regression test for #11743 +/*! + * @fn TEST(PubSubHistory, KeepAllWriterContinueSendingAfterReaderMatched) + * @brief This test checks that the writer doesn't block writing samples when meet a Datasharing Volatile reader. + * + * The test creates a Reliable, Transient Local Writer with a Keep All history, and its resources limited to + * 1 samples. + * Then it creates a Reliable, Volatile Reader. + * Writer will be the first discovering and then sends a sample. + * Reader could discover the writer when the writer already put the sample in the Datasharing history for the reader. + * The Volatile reader should be able to acks these kind of samples. + * + * Writer will be able then to send a second sample. + */ +TEST_P(PubSubHistory, KeepAllWriterContinueSendingAfterReaderMatched) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + reader.reliability(RELIABLE_RELIABILITY_QOS); + + writer.reliability(RELIABLE_RELIABILITY_QOS) + .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) + .resource_limits_allocated_samples(1) + .resource_limits_max_samples(1); + + writer.mem_policy(mem_policy_).init(); + ASSERT_TRUE(writer.isInitialized()); + + reader.mem_policy(mem_policy_).init(); + ASSERT_TRUE(reader.isInitialized()); + + writer.wait_discovery(); + + HelloWorld data; + data.message("Hello world!"); + data.index(1u); + ASSERT_TRUE(writer.send_sample(data)); + + reader.wait_discovery(); + + // Second writer sends one sample (reader should discard previous one) + data.index(2u); + uint32_t expected_value = data.index(); + + if (enable_datasharing && will_use_datasharing) + { + if (reader.wait_for_all_received(std::chrono::seconds(3), 1)) + { + ASSERT_FALSE(writer.send_sample(data)); + expected_value = 1; + } + else + { + ASSERT_TRUE(writer.send_sample(data)); + } + } + else + { + ASSERT_TRUE(writer.send_sample(data)); + } + + if (enable_datasharing && will_use_datasharing) + { + reader.wait_for_all_received(std::chrono::seconds(3), expected_value); + } + else + { + writer.waitForAllAcked(std::chrono::seconds(3)); + } + + // Only one sample should be present + HelloWorld received; + ASSERT_TRUE(reader.takeNextData(&received)); + ASSERT_EQ(received.index(), expected_value); + ASSERT_TRUE(writer.waitForAllAcked(std::chrono::seconds(3))); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else From 9a9725f39b000b8cd69f744466af1999bd60f60e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Fri, 23 Jul 2021 11:59:15 +0200 Subject: [PATCH 2/6] Refs #11743. Volatire reader know acks samples in the writer pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ricardo González Moreno --- src/cpp/rtps/DataSharing/ReaderPool.hpp | 14 +++++++++----- src/cpp/rtps/reader/StatefulReader.cpp | 12 +++++++++++- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/cpp/rtps/DataSharing/ReaderPool.hpp b/src/cpp/rtps/DataSharing/ReaderPool.hpp index f779c565c37..f2c9fa750a3 100644 --- a/src/cpp/rtps/DataSharing/ReaderPool.hpp +++ b/src/cpp/rtps/DataSharing/ReaderPool.hpp @@ -125,13 +125,17 @@ class ReaderPool : public DataSharingPayloadPool } // Set the reading pointer + next_payload_ = begin(); if (is_volatile_) { - next_payload_ = end(); - } - else - { - next_payload_ = begin(); + CacheChange_t ch; + SequenceNumber_t last_sequence = c_SequenceNumber_Unknown; + get_next_unread_payload(ch, last_sequence); + while (ch.sequenceNumber != c_SequenceNumber_Unknown) + { + advance(next_payload_); + get_next_unread_payload(ch, last_sequence); + } } segment_ = std::move(local_segment); diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index 196a44f0618..e948649d184 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -233,7 +233,17 @@ bool StatefulReader::matched_writer_add( } // Intraprocess manages durability itself - if (!is_same_process && m_att.durabilityKind != VOLATILE) + if (VOLATILE == m_att.durabilityKind) + { + std::shared_ptr pool = datasharing_listener_->get_pool_for_writer(wp->guid()); + SequenceNumber_t last_seq = pool->get_last_read_sequence_number(); + if (SequenceNumber_t::unknown() != last_seq) + { + SequenceNumberSet_t sns(last_seq + 1); + send_acknack(wp, sns, *wp, false); + } + } + else if (!is_same_process) { // simulate a notification to force reading of transient changes datasharing_listener_->notify(false); From 2e2193e761a7adcaffa0fca9fd17a0e9c00221c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez?= Date: Fri, 23 Jul 2021 14:21:08 +0200 Subject: [PATCH 3/6] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Miguel Company Signed-off-by: Ricardo González Moreno --- test/blackbox/common/BlackboxTestsPubSubHistory.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp index 43f0b7ef8b5..6e83330efd2 100644 --- a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp @@ -60,10 +60,8 @@ class PubSubHistory : public testing::TestWithParam switch (mem_policy_) { case rtps::PREALLOCATED_MEMORY_MODE: - will_use_datasharing = true; - break; case rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE: - will_use_datasharing = true; + will_use_datasharing = enable_datasharing; break; default: break; @@ -1204,7 +1202,7 @@ TEST_P(PubSubHistory, KeepAllWriterContinueSendingAfterReaderMatched) data.index(2u); uint32_t expected_value = data.index(); - if (enable_datasharing && will_use_datasharing) + if (will_use_datasharing) { if (reader.wait_for_all_received(std::chrono::seconds(3), 1)) { @@ -1221,7 +1219,7 @@ TEST_P(PubSubHistory, KeepAllWriterContinueSendingAfterReaderMatched) ASSERT_TRUE(writer.send_sample(data)); } - if (enable_datasharing && will_use_datasharing) + if (will_use_datasharing) { reader.wait_for_all_received(std::chrono::seconds(3), expected_value); } From 111a7b0c66453e4a103d95108d9f439279bfeca9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Fri, 23 Jul 2021 15:57:00 +0200 Subject: [PATCH 4/6] Refs #11743. Fix error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ricardo González Moreno --- src/cpp/rtps/DataSharing/ReaderPool.hpp | 3 ++- src/cpp/rtps/reader/StatefulReader.cpp | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cpp/rtps/DataSharing/ReaderPool.hpp b/src/cpp/rtps/DataSharing/ReaderPool.hpp index f2c9fa750a3..0f99f24ef63 100644 --- a/src/cpp/rtps/DataSharing/ReaderPool.hpp +++ b/src/cpp/rtps/DataSharing/ReaderPool.hpp @@ -131,11 +131,12 @@ class ReaderPool : public DataSharingPayloadPool CacheChange_t ch; SequenceNumber_t last_sequence = c_SequenceNumber_Unknown; get_next_unread_payload(ch, last_sequence); - while (ch.sequenceNumber != c_SequenceNumber_Unknown) + while (ch.sequenceNumber != SequenceNumber_t::unknown()) { advance(next_payload_); get_next_unread_payload(ch, last_sequence); } + assert(next_payload_ == end()); } segment_ = std::move(local_segment); diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index e948649d184..74f4c7dcbf1 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -241,6 +241,7 @@ bool StatefulReader::matched_writer_add( { SequenceNumberSet_t sns(last_seq + 1); send_acknack(wp, sns, *wp, false); + wp->lost_changes_update(last_seq + 1); } } else if (!is_same_process) From b38d18fc93cfb8c996e2681aa52520e07a976acf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Thu, 2 Sep 2021 10:36:41 +0200 Subject: [PATCH 5/6] Refs #11743. Fix compilation error with new flow controllers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ricardo González Moreno --- src/cpp/rtps/reader/StatefulReader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index 74f4c7dcbf1..b345c756737 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -240,7 +240,7 @@ bool StatefulReader::matched_writer_add( if (SequenceNumber_t::unknown() != last_seq) { SequenceNumberSet_t sns(last_seq + 1); - send_acknack(wp, sns, *wp, false); + send_acknack(wp, sns, wp, false); wp->lost_changes_update(last_seq + 1); } } From a6476eb899c6a6fe98d54fad8d0db86b5372da96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Mon, 6 Sep 2021 09:09:03 +0200 Subject: [PATCH 6/6] Refs #11743. Fix segmentation fault MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ricardo González Moreno --- src/cpp/rtps/DataSharing/ReaderPool.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cpp/rtps/DataSharing/ReaderPool.hpp b/src/cpp/rtps/DataSharing/ReaderPool.hpp index 0f99f24ef63..ba69610d135 100644 --- a/src/cpp/rtps/DataSharing/ReaderPool.hpp +++ b/src/cpp/rtps/DataSharing/ReaderPool.hpp @@ -126,6 +126,7 @@ class ReaderPool : public DataSharingPayloadPool // Set the reading pointer next_payload_ = begin(); + segment_ = std::move(local_segment); if (is_volatile_) { CacheChange_t ch; @@ -139,7 +140,6 @@ class ReaderPool : public DataSharingPayloadPool assert(next_payload_ == end()); } - segment_ = std::move(local_segment); return true; }