From ddcd193f0e002777ffc9b5fcda0788d672730509 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 6 Sep 2021 14:49:02 +0200 Subject: [PATCH 1/3] Keep changes inside instances sorted by source timestamp (#2182) * Refs 12419. Regression test. Signed-off-by: Miguel Company * Refs 12419. Added sorted_vector_insert utility method. Signed-off-by: Miguel Company * Refs 12419. ReaderHistory uses sorted_vector_insert. Signed-off-by: Miguel Company * Refs 12419. SubscriberHistory uses sorted_vector_insert. Signed-off-by: Miguel Company * Refs 12421. Uncrustify. Signed-off-by: Miguel Company * Refs 12421. Apply suggestions Signed-off-by: Miguel Company * Refs 12421. Moved new template method to correct namespace. Signed-off-by: Miguel Company (cherry picked from commit 38e8d0fb7e1fdf5e90d03d3be33369dec87af8a2) # Conflicts: # src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp # src/cpp/rtps/history/ReaderHistory.cpp --- .../subscriber/SubscriberHistory.cpp | 22 +++++-- src/cpp/rtps/history/ReaderHistory.cpp | 10 +++ .../collections/sorted_vector_insert.hpp | 64 +++++++++++++++++++ .../common/BlackboxTestsPubSubHistory.cpp | 44 +++++++++++++ 4 files changed, 133 insertions(+), 7 deletions(-) create mode 100644 src/cpp/utils/collections/sorted_vector_insert.hpp diff --git a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp index dfc4fee9de0..8f4e9804b8b 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp +++ b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp @@ -18,15 +18,22 @@ */ #include -#include -#include -#include +#include +#include #include #include +<<<<<<< HEAD #include +======= +#include + +#include +#include +#include +>>>>>>> 38e8d0fb7 (Keep changes inside instances sorted by source timestamp (#2182)) namespace eprosima { namespace fastrtps { @@ -265,10 +272,11 @@ bool SubscriberHistory::add_received_change_with_key( } //ADD TO KEY VECTOR - - // As the instance should be ordered following the presentation QoS, and - // we only support ordering by reception timestamp, we can always add at the end. - instance_changes.push_back(a_change); + eprosima::utilities::collections::sorted_vector_insert(instance_changes, a_change, + [](const CacheChange_t* lhs, const CacheChange_t* rhs) + { + return lhs->sourceTimestamp < rhs->sourceTimestamp; + }); logInfo(SUBSCRIBER, mp_reader->getGuid().entityId << ": Change " << a_change->sequenceNumber << " added from: " diff --git a/src/cpp/rtps/history/ReaderHistory.cpp b/src/cpp/rtps/history/ReaderHistory.cpp index 41909ca3908..003c15fb47e 100644 --- a/src/cpp/rtps/history/ReaderHistory.cpp +++ b/src/cpp/rtps/history/ReaderHistory.cpp @@ -24,6 +24,8 @@ #include #include +#include + #include namespace eprosima { @@ -71,6 +73,7 @@ bool ReaderHistory::add_change( logError(RTPS_READER_HISTORY, "The Writer GUID_t must be defined"); } +<<<<<<< HEAD if (!m_changes.empty() && a_change->sourceTimestamp < (*m_changes.rbegin())->sourceTimestamp) { auto it = std::lower_bound(m_changes.begin(), m_changes.end(), a_change, @@ -85,6 +88,13 @@ bool ReaderHistory::add_change( m_changes.push_back(a_change); } +======= + eprosima::utilities::collections::sorted_vector_insert(m_changes, a_change, + [](const CacheChange_t* lhs, const CacheChange_t* rhs) + { + return lhs->sourceTimestamp < rhs->sourceTimestamp; + }); +>>>>>>> 38e8d0fb7 (Keep changes inside instances sorted by source timestamp (#2182)) logInfo(RTPS_READER_HISTORY, "Change " << a_change->sequenceNumber << " added with " << a_change->serializedPayload.length << " bytes"); diff --git a/src/cpp/utils/collections/sorted_vector_insert.hpp b/src/cpp/utils/collections/sorted_vector_insert.hpp new file mode 100644 index 00000000000..6d45cf216c1 --- /dev/null +++ b/src/cpp/utils/collections/sorted_vector_insert.hpp @@ -0,0 +1,64 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file sorted_vector_insert.hpp + */ + +#ifndef SRC_CPP_UTILS_COLLECTIONS_SORTED_VECTOR_INSERT_HPP_ +#define SRC_CPP_UTILS_COLLECTIONS_SORTED_VECTOR_INSERT_HPP_ + +#include +#include + +namespace eprosima { +namespace utilities { +namespace collections { + +/** + * @brief Insert item into sorted vector-like collection + * + * @tparam CollectionType Type of the collection to be modified. + * @tparam ValueType Type of the item to insert. The collection should support to insert a value of this type. + * @tparam LessThanPredicate Predicate that performs ValueType < CollectionType::value_type comparison. + * + * @param[in,out] collection The collection to be modified. + * @param[in] item The item to be inserted. + * @param[in] pred The predicate to use for comparisons. + */ +template< + typename CollectionType, + typename ValueType, + typename LessThanPredicate = std::less> +void sorted_vector_insert( + CollectionType& collection, + const ValueType& item, + const LessThanPredicate& pred = LessThanPredicate()) +{ + // Insert at the end by default + auto it = collection.end(); + + // Find insertion position when item is less than last element in collection + if (!collection.empty() && pred(item, *collection.rbegin())) + { + it = std::lower_bound(collection.begin(), collection.end(), item, pred); + } + collection.insert(it, item); +} + +} // namespace collections +} // namespace utilities +} // namespace eprosima + +#endif // SRC_CPP_UTILS_COLLECTIONS_SORTED_VECTOR_INSERT_HPP_ diff --git a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp index 5371bf12b1f..8da9ec5e96e 100644 --- a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp @@ -563,6 +563,50 @@ TEST(BlackBox, PubSubAsReliableKeepLastReaderSmallDepthWithKey) } } +// Regression test for redmine bug #12419 +// It uses a test transport to drop some DATA messages, in order to force unordered reception. +TEST_P(PubSubHistory, PubSubAsReliableKeepLastWithKeyUnorderedReception) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + uint32_t keys = 2; + uint32_t depth = 10; + + reader.resource_limits_max_instances(keys). + reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS). + history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS). + history_depth(depth).mem_policy(mem_policy_).init(); + + ASSERT_TRUE(reader.isInitialized()); + + auto testTransport = std::make_shared(); + testTransport->dropDataMessagesPercentage = 25; + + writer.resource_limits_max_instances(keys). + reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS). + history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS). + history_depth(depth).mem_policy(mem_policy_). + disable_builtin_transport().add_user_transport_to_pparams(testTransport). + init(); + + ASSERT_TRUE(writer.isInitialized()); + + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_keyedhelloworld_data_generator(keys * depth); + reader.startReception(data); + + // Send data + writer.send(data); + ASSERT_TRUE(data.empty()); + + reader.block_for_all(); + reader.stopReception(); +} + bool comparator( HelloWorld first, HelloWorld second) From 1f9e9cfc90261771cbe34e879853d606bb7a5ed6 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 7 Oct 2021 09:34:32 +0200 Subject: [PATCH 2/3] Refs 12421. Fix conflicts after backport. Signed-off-by: Miguel Company --- .../subscriber/SubscriberHistory.cpp | 4 ---- src/cpp/rtps/history/ReaderHistory.cpp | 17 ----------------- .../common/BlackboxTestsPubSubHistory.cpp | 5 +++-- 3 files changed, 3 insertions(+), 23 deletions(-) diff --git a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp index 8f4e9804b8b..73632f446c4 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp +++ b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp @@ -25,15 +25,11 @@ #include #include -<<<<<<< HEAD -#include -======= #include #include #include #include ->>>>>>> 38e8d0fb7 (Keep changes inside instances sorted by source timestamp (#2182)) namespace eprosima { namespace fastrtps { diff --git a/src/cpp/rtps/history/ReaderHistory.cpp b/src/cpp/rtps/history/ReaderHistory.cpp index 003c15fb47e..d840253a5b3 100644 --- a/src/cpp/rtps/history/ReaderHistory.cpp +++ b/src/cpp/rtps/history/ReaderHistory.cpp @@ -73,28 +73,11 @@ bool ReaderHistory::add_change( logError(RTPS_READER_HISTORY, "The Writer GUID_t must be defined"); } -<<<<<<< HEAD - if (!m_changes.empty() && a_change->sourceTimestamp < (*m_changes.rbegin())->sourceTimestamp) - { - auto it = std::lower_bound(m_changes.begin(), m_changes.end(), a_change, - [](const CacheChange_t* c1, const CacheChange_t* c2) -> bool - { - return c1->sourceTimestamp < c2->sourceTimestamp; - }); - m_changes.insert(it, a_change); - } - else - { - m_changes.push_back(a_change); - } - -======= eprosima::utilities::collections::sorted_vector_insert(m_changes, a_change, [](const CacheChange_t* lhs, const CacheChange_t* rhs) { return lhs->sourceTimestamp < rhs->sourceTimestamp; }); ->>>>>>> 38e8d0fb7 (Keep changes inside instances sorted by source timestamp (#2182)) logInfo(RTPS_READER_HISTORY, "Change " << a_change->sequenceNumber << " added with " << a_change->serializedPayload.length << " bytes"); diff --git a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp index 8da9ec5e96e..9ba7b02fe63 100644 --- a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp @@ -18,6 +18,7 @@ #include "PubSubWriter.hpp" #include +#include #include using namespace eprosima::fastrtps; @@ -576,7 +577,7 @@ TEST_P(PubSubHistory, PubSubAsReliableKeepLastWithKeyUnorderedReception) reader.resource_limits_max_instances(keys). reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS). history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS). - history_depth(depth).mem_policy(mem_policy_).init(); + history_depth(depth).init(); ASSERT_TRUE(reader.isInitialized()); @@ -586,7 +587,7 @@ TEST_P(PubSubHistory, PubSubAsReliableKeepLastWithKeyUnorderedReception) writer.resource_limits_max_instances(keys). reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS). history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS). - history_depth(depth).mem_policy(mem_policy_). + history_depth(depth). disable_builtin_transport().add_user_transport_to_pparams(testTransport). init(); From 18d6e5f07b139e6f91a484b364094f4d6c5bedc3 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 7 Oct 2021 09:40:08 +0200 Subject: [PATCH 3/3] Refs 12421. Uncrustify. Signed-off-by: Miguel Company --- .../fastrtps_deprecated/subscriber/SubscriberHistory.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp index 73632f446c4..2c9861c6090 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp +++ b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp @@ -138,7 +138,7 @@ bool SubscriberHistory::received_change_keep_all_no_key( size_t unknown_missing_changes_up_to) { // TODO(Ricardo) Check - if (m_changes.size() + unknown_missing_changes_up_to < static_cast(resource_limited_qos_.max_samples) ) + if (m_changes.size() + unknown_missing_changes_up_to < static_cast(resource_limited_qos_.max_samples)) { return add_received_change(a_change); } @@ -151,7 +151,7 @@ bool SubscriberHistory::received_change_keep_last_no_key( size_t /* unknown_missing_changes_up_to */ ) { bool add = false; - if (m_changes.size() < static_cast(history_qos_.depth) ) + if (m_changes.size() < static_cast(history_qos_.depth)) { add = true; } @@ -181,7 +181,7 @@ bool SubscriberHistory::received_change_keep_all_with_key( if (find_key_for_change(a_change, vit)) { std::vector& instance_changes = vit->second.cache_changes; - if (instance_changes.size() < static_cast(resource_limited_qos_.max_samples_per_instance) ) + if (instance_changes.size() < static_cast(resource_limited_qos_.max_samples_per_instance)) { return add_received_change_with_key(a_change, vit->second.cache_changes); } @@ -201,7 +201,7 @@ bool SubscriberHistory::received_change_keep_last_with_key( { bool add = false; std::vector& instance_changes = vit->second.cache_changes; - if (instance_changes.size() < static_cast(history_qos_.depth) ) + if (instance_changes.size() < static_cast(history_qos_.depth)) { add = true; }