From 4a8402a07cc47b79893eb54bf191f2541cd1361b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Dom=C3=ADnguez=20L=C3=B3pez?= <116071334+Mario-DL@users.noreply.github.com> Date: Sat, 27 Jan 2024 09:16:07 +0100 Subject: [PATCH 1/3] Add a keyed fragmented change to the reader data instance only when its completed (#4261) * Refs #20257: Add regression test Signed-off-by: Mario Dominguez * #Refs #20257: Fix Signed-off-by: Mario Dominguez * Refs #20239: Second rev suggestions Signed-off-by: Mario Dominguez * Refs #20257: Linter Signed-off-by: Mario Dominguez * Refs #20257: Retrieve instance handle condition before for avoid being nullptr Signed-off-by: Mario Dominguez --------- Signed-off-by: Mario Dominguez (cherry picked from commit 9558ce436628bb0ce0bd76d44400efce1eed8378) # Conflicts: # src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp --- .../subscriber/history/DataReaderHistory.cpp | 36 +++++++++-- src/cpp/rtps/reader/StatefulReader.cpp | 7 +++ src/cpp/rtps/reader/StatelessReader.cpp | 8 +++ test/blackbox/api/dds-pim/PubSubReader.hpp | 7 +++ .../api/fastrtps_deprecated/PubSubReader.hpp | 7 +++ .../common/BlackboxTestsPubSubFragments.cpp | 60 +++++++++++++++++++ 6 files changed, 120 insertions(+), 5 deletions(-) diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp index b989db21d44..72d341c5262 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -139,14 +139,14 @@ DataReaderHistory::DataReaderHistory( compute_key_for_change_fn_ = [this](CacheChange_t* a_change) { - if (a_change->instanceHandle.isDefined()) + if (!a_change->is_fully_assembled()) { - return true; + return false; } - if (!a_change->is_fully_assembled()) + if (a_change->instanceHandle.isDefined()) { - return false; + return true; } if (type_ != nullptr) @@ -635,11 +635,26 @@ ReaderHistory::iterator DataReaderHistory::remove_change_nts( bool DataReaderHistory::completed_change( CacheChange_t* change) { +<<<<<<< HEAD bool ret_value = true; +======= + SampleRejectedStatusKind reason; + return completed_change(change, 0, reason); +} - if (!change->instanceHandle.isDefined()) +bool DataReaderHistory::completed_change( + CacheChange_t* change, + size_t unknown_missing_changes_up_to, + SampleRejectedStatusKind& rejection_reason) +{ + bool ret_value = false; + rejection_reason = REJECTED_BY_INSTANCES_LIMIT; +>>>>>>> 9558ce436 (Add a keyed fragmented change to the reader data instance only when its completed (#4261)) + + if (compute_key_for_change_fn_(change)) { InstanceCollection::iterator vit; +<<<<<<< HEAD ret_value = compute_key_for_change_fn_(change) && find_key(change->instanceHandle, vit); if (ret_value) { @@ -658,9 +673,20 @@ bool DataReaderHistory::completed_change( { logError(SUBSCRIBER, "Change should exist but didn't find it"); } +======= + if (find_key(change->instanceHandle, vit)) + { + ret_value = !change->instanceHandle.isDefined() || + complete_fn_(change, *vit->second, unknown_missing_changes_up_to, rejection_reason); +>>>>>>> 9558ce436 (Add a keyed fragmented change to the reader data instance only when its completed (#4261)) } } + if (ret_value) + { + rejection_reason = NOT_REJECTED; + } + return ret_value; } diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index e58b1dd92eb..5f488c89780 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -673,6 +673,7 @@ bool StatefulReader::processDataFragMsg( { work_change->copy_not_memcpy(change_to_add); work_change->serializedPayload.length = sampleSize; + work_change->instanceHandle.clear(); work_change->setFragmentSize(change_to_add->getFragmentSize(), true); change_created = work_change; } @@ -681,6 +682,12 @@ bool StatefulReader::processDataFragMsg( if (work_change != nullptr) { + // Set the instanceHandle only when fragment number 1 is received + if (!work_change->instanceHandle.isDefined() && fragmentStartingNum == 1) + { + work_change->instanceHandle = change_to_add->instanceHandle; + } + work_change->add_fragments(change_to_add->serializedPayload, fragmentStartingNum, fragmentsInSubmessage); } diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index 7b057501987..971f62950fb 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -606,6 +606,7 @@ bool StatelessReader::processDataFragMsg( // Sample fits inside pending change. Reuse it. work_change->copy_not_memcpy(change_to_add); work_change->serializedPayload.length = sampleSize; + work_change->instanceHandle.clear(); work_change->setFragmentSize(change_to_add->getFragmentSize(), true); } else @@ -631,6 +632,7 @@ bool StatelessReader::processDataFragMsg( { work_change->copy_not_memcpy(change_to_add); work_change->serializedPayload.length = sampleSize; + work_change->instanceHandle.clear(); work_change->setFragmentSize(change_to_add->getFragmentSize(), true); } } @@ -640,6 +642,12 @@ bool StatelessReader::processDataFragMsg( CacheChange_t* change_completed = nullptr; if (work_change != nullptr) { + // Set the instanceHandle only when fragment number 1 is received + if (!work_change->instanceHandle.isDefined() && fragmentStartingNum == 1) + { + work_change->instanceHandle = change_to_add->instanceHandle; + } + if (work_change->add_fragments(change_to_add->serializedPayload, fragmentStartingNum, fragmentsInSubmessage)) { diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 2f94b832992..d9a16d75e5c 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -991,6 +991,13 @@ class PubSubReader return *this; } + PubSubReader& expect_inline_qos( + bool expect) + { + datareader_qos_.expects_inline_qos(expect); + return *this; + } + PubSubReader& heartbeatResponseDelay( const int32_t secs, const int32_t frac) diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp index 54ad64aaa1d..19bd9ad35b7 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp @@ -793,6 +793,13 @@ class PubSubReader return *this; } + PubSubReader& expect_inline_qos( + bool expect) + { + subscriber_attr_.expectsInlineQos = expect; + return *this; + } + PubSubReader& heartbeatResponseDelay( const int32_t secs, const int32_t frac) diff --git a/test/blackbox/common/BlackboxTestsPubSubFragments.cpp b/test/blackbox/common/BlackboxTestsPubSubFragments.cpp index 9df48268914..4200871f920 100644 --- a/test/blackbox/common/BlackboxTestsPubSubFragments.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubFragments.cpp @@ -649,6 +649,66 @@ TEST_P(PubSubFragmentsLimited, AsyncPubSubAsReliableKeyedData300kbKeepLast1InLos testTransport->dropLogLength); } +// Regression test for 20257 +// When a non existing change is removed, the change is also removed from the data instance changes sequence +TEST(PubSubFragmentsLimited, + AsyncPubSubAsReliableKeyedData300kbKeepLast1LoosyConditionsSmallFragmentsCorrectlyBehavesWhenInlineQoSAreForced) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + reader.history_depth(2) + .expect_inline_qos(true) + .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .init(); + + ASSERT_TRUE(reader.isInitialized()); + + // To simulate lossy conditions, we are going to remove the default + // builtin transport, and instead use a lossy shim layer variant. + auto testTransport = std::make_shared(); + testTransport->maxMessageSize = 1024; + // We drop 20% of all data frags + testTransport->dropDataFragMessagesPercentage = 20; + testTransport->dropLogLength = 1; + writer.disable_builtin_transport(); + writer.add_user_transport_to_pparams(testTransport); + + // When doing fragmentation, it is necessary to have some degree of + // flow control not to overrun the receive buffer. + uint32_t bytesPerPeriod = 153601; + uint32_t periodInMs = 100; + writer.add_throughput_controller_descriptor_to_pparams( + eprosima::fastdds::rtps::FlowControllerSchedulerPolicy::HIGH_PRIORITY, bytesPerPeriod, periodInMs) + .heartbeat_period_seconds(0) + .heartbeat_period_nanosec(1000000) + .history_depth(1) + .asynchronously(eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE).init(); + + ASSERT_TRUE(writer.isInitialized()); + + // Because its volatile the durability + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_keyeddata300kb_data_generator(5); + + reader.startReception(data); + + // Send data + writer.send(data, 100); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block reader until reception finished or timeout. + reader.block_for_seq({ 0, 5 }); + + // Sanity check. Make sure we have dropped a few packets + ASSERT_EQ( + test_UDPv4Transport::test_UDPv4Transport_DropLog.size(), + testTransport->dropLogLength); +} + TEST_P(PubSubFragmentsLimited, AsyncPubSubAsReliableVolatileData300kbInLossyConditionsSmallFragments) { PubSubReader reader(TEST_TOPIC_NAME); From 29551feeee717d26b50ef6b42d511caef234d3eb Mon Sep 17 00:00:00 2001 From: Mario Dominguez Date: Tue, 30 Jan 2024 11:28:27 +0100 Subject: [PATCH 2/3] Refs #20257: Correct test indentation Signed-off-by: Mario Dominguez --- test/blackbox/common/BlackboxTestsPubSubFragments.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/blackbox/common/BlackboxTestsPubSubFragments.cpp b/test/blackbox/common/BlackboxTestsPubSubFragments.cpp index 4200871f920..1ab90a11750 100644 --- a/test/blackbox/common/BlackboxTestsPubSubFragments.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubFragments.cpp @@ -651,8 +651,9 @@ TEST_P(PubSubFragmentsLimited, AsyncPubSubAsReliableKeyedData300kbKeepLast1InLos // Regression test for 20257 // When a non existing change is removed, the change is also removed from the data instance changes sequence -TEST(PubSubFragmentsLimited, - AsyncPubSubAsReliableKeyedData300kbKeepLast1LoosyConditionsSmallFragmentsCorrectlyBehavesWhenInlineQoSAreForced) +// For uncrustify sake *INDENT-OFF* +TEST(PubSubFragmentsLimited, AsyncPubSubAsReliableKeyedData300kbKeepLast1LoosyConditionsSmallFragmentsCorrectlyBehavesWhenInlineQoSAreForced) +// *INDENT-ON* { PubSubReader reader(TEST_TOPIC_NAME); PubSubWriter writer(TEST_TOPIC_NAME); From 33492f45181a82ab16716e03d0e26a21971a5b14 Mon Sep 17 00:00:00 2001 From: Mario Dominguez Date: Tue, 30 Jan 2024 12:06:59 +0100 Subject: [PATCH 3/3] Refs #20257: Resolve conflicts Signed-off-by: Mario Dominguez --- .../subscriber/history/DataReaderHistory.cpp | 33 ++++--------------- 1 file changed, 6 insertions(+), 27 deletions(-) diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp index 72d341c5262..1e92c5747bb 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -634,31 +634,17 @@ ReaderHistory::iterator DataReaderHistory::remove_change_nts( bool DataReaderHistory::completed_change( CacheChange_t* change) -{ -<<<<<<< HEAD - bool ret_value = true; -======= - SampleRejectedStatusKind reason; - return completed_change(change, 0, reason); -} - -bool DataReaderHistory::completed_change( - CacheChange_t* change, - size_t unknown_missing_changes_up_to, - SampleRejectedStatusKind& rejection_reason) { bool ret_value = false; - rejection_reason = REJECTED_BY_INSTANCES_LIMIT; ->>>>>>> 9558ce436 (Add a keyed fragmented change to the reader data instance only when its completed (#4261)) if (compute_key_for_change_fn_(change)) { InstanceCollection::iterator vit; -<<<<<<< HEAD - ret_value = compute_key_for_change_fn_(change) && find_key(change->instanceHandle, vit); - if (ret_value) + + if (find_key(change->instanceHandle, vit)) { - ret_value = !change->instanceHandle.isDefined() || complete_fn_(change, *vit->second); + ret_value = !change->instanceHandle.isDefined() || + complete_fn_(change, *vit->second); } if (!ret_value) @@ -673,18 +659,11 @@ bool DataReaderHistory::completed_change( { logError(SUBSCRIBER, "Change should exist but didn't find it"); } -======= - if (find_key(change->instanceHandle, vit)) - { - ret_value = !change->instanceHandle.isDefined() || - complete_fn_(change, *vit->second, unknown_missing_changes_up_to, rejection_reason); ->>>>>>> 9558ce436 (Add a keyed fragmented change to the reader data instance only when its completed (#4261)) } } - - if (ret_value) + else { - rejection_reason = NOT_REJECTED; + logError(SUBSCRIBER, "Could not compute key from change"); } return ret_value;