Skip to content

Commit

Permalink
Keep changes inside instances sorted by source timestamp (#2182)
Browse files Browse the repository at this point in the history
* Refs 12419. Regression test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12419. Added sorted_vector_insert utility method.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12419. ReaderHistory uses sorted_vector_insert.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12419. SubscriberHistory uses sorted_vector_insert.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12421. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12421. Apply suggestions

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12421. Moved new template method to correct namespace.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
(cherry picked from commit 38e8d0f)
  • Loading branch information
MiguelCompany authored and mergify-bot committed Sep 6, 2021
1 parent e614897 commit 9625b5b
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 38 deletions.
3 changes: 0 additions & 3 deletions include/fastdds/rtps/history/ReaderHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,6 @@ class ReaderHistory : public History
RTPS_DllAPI void do_release_cache(
CacheChange_t* ch) override;

iterator get_first_change_with_minimum_ts(
const Time_t timestamp);

//!Pointer to the reader
RTPSReader* mp_reader;

Expand Down
21 changes: 12 additions & 9 deletions src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
*/

#include <fastrtps/subscriber/SubscriberHistory.h>
#include <fastrtps_deprecated/subscriber/SubscriberImpl.h>

#include <fastdds/rtps/reader/RTPSReader.h>
#include <rtps/reader/WriterProxy.h>
#include <limits>
#include <mutex>

#include <fastdds/dds/topic/TopicDataType.hpp>
#include <fastdds/dds/log/Log.hpp>

#include <limits>
#include <mutex>
#include <fastdds/rtps/reader/RTPSReader.h>

#include <fastrtps_deprecated/subscriber/SubscriberImpl.h>
#include <rtps/reader/WriterProxy.h>
#include <utils/collections/sorted_vector_insert.hpp>

namespace eprosima {
namespace fastrtps {
Expand Down Expand Up @@ -281,10 +283,11 @@ bool SubscriberHistory::add_received_change_with_key(
}

//ADD TO KEY VECTOR

// As the instance should be ordered following the presentation QoS, and
// we only support ordering by reception timestamp, we can always add at the end.
instance_changes.push_back(a_change);
eprosima::utilities::collections::sorted_vector_insert(instance_changes, a_change,
[](const CacheChange_t* lhs, const CacheChange_t* rhs)
{
return lhs->sourceTimestamp < rhs->sourceTimestamp;
});

logInfo(SUBSCRIBER, mp_reader->getGuid().entityId
<< ": Change " << a_change->sequenceNumber << " added from: "
Expand Down
26 changes: 7 additions & 19 deletions src/cpp/rtps/history/ReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <fastdds/rtps/reader/RTPSReader.h>
#include <fastdds/rtps/reader/ReaderListener.h>

#include <utils/collections/sorted_vector_insert.hpp>

#include <mutex>

namespace eprosima {
Expand Down Expand Up @@ -71,9 +73,11 @@ bool ReaderHistory::add_change(
logError(RTPS_READER_HISTORY, "The Writer GUID_t must be defined");
}

auto it = get_first_change_with_minimum_ts(a_change->sourceTimestamp);
m_changes.insert(it, a_change);

eprosima::utilities::collections::sorted_vector_insert(m_changes, a_change,
[](const CacheChange_t* lhs, const CacheChange_t* rhs)
{
return lhs->sourceTimestamp < rhs->sourceTimestamp;
});
logInfo(RTPS_READER_HISTORY,
"Change " << a_change->sequenceNumber << " added with " << a_change->serializedPayload.length << " bytes");

Expand Down Expand Up @@ -231,22 +235,6 @@ void ReaderHistory::do_release_cache(
mp_reader->releaseCache(ch);
}

History::iterator ReaderHistory::get_first_change_with_minimum_ts(
const Time_t timestamp)
{
if (!m_changes.empty() && timestamp < (*m_changes.rbegin())->sourceTimestamp)
{
iterator it = std::lower_bound(m_changes.begin(), m_changes.end(), timestamp,
[](const CacheChange_t* c1, const Time_t& ts) -> bool
{
return c1->sourceTimestamp < ts;
});
return it;
}

return m_changes.end();
}

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
64 changes: 64 additions & 0 deletions src/cpp/utils/collections/sorted_vector_insert.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file sorted_vector_insert.hpp
*/

#ifndef SRC_CPP_UTILS_COLLECTIONS_SORTED_VECTOR_INSERT_HPP_
#define SRC_CPP_UTILS_COLLECTIONS_SORTED_VECTOR_INSERT_HPP_

#include <algorithm>
#include <functional>

namespace eprosima {
namespace utilities {
namespace collections {

/**
* @brief Insert item into sorted vector-like collection
*
* @tparam CollectionType Type of the collection to be modified.
* @tparam ValueType Type of the item to insert. The collection should support to insert a value of this type.
* @tparam LessThanPredicate Predicate that performs ValueType < CollectionType::value_type comparison.
*
* @param[in,out] collection The collection to be modified.
* @param[in] item The item to be inserted.
* @param[in] pred The predicate to use for comparisons.
*/
template<
typename CollectionType,
typename ValueType,
typename LessThanPredicate = std::less<ValueType>>
void sorted_vector_insert(
CollectionType& collection,
const ValueType& item,
const LessThanPredicate& pred = LessThanPredicate())
{
// Insert at the end by default
auto it = collection.end();

// Find insertion position when item is less than last element in collection
if (!collection.empty() && pred(item, *collection.rbegin()))
{
it = std::lower_bound(collection.begin(), collection.end(), item, pred);
}
collection.insert(it, item);
}

} // namespace collections
} // namespace utilities
} // namespace eprosima

#endif // SRC_CPP_UTILS_COLLECTIONS_SORTED_VECTOR_INSERT_HPP_
44 changes: 44 additions & 0 deletions test/blackbox/common/BlackboxTestsPubSubHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,50 @@ TEST_P(PubSubHistory, PubSubAsReliableKeepLastReaderSmallDepthWithKey)
}
}

// Regression test for redmine bug #12419
// It uses a test transport to drop some DATA messages, in order to force unordered reception.
TEST_P(PubSubHistory, PubSubAsReliableKeepLastWithKeyUnorderedReception)
{
PubSubReader<KeyedHelloWorldType> reader(TEST_TOPIC_NAME);
PubSubWriter<KeyedHelloWorldType> writer(TEST_TOPIC_NAME);

uint32_t keys = 2;
uint32_t depth = 10;

reader.resource_limits_max_instances(keys).
reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).
history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS).
history_depth(depth).mem_policy(mem_policy_).init();

ASSERT_TRUE(reader.isInitialized());

auto testTransport = std::make_shared<test_UDPv4TransportDescriptor>();
testTransport->dropDataMessagesPercentage = 25;

writer.resource_limits_max_instances(keys).
reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).
history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS).
history_depth(depth).mem_policy(mem_policy_).
disable_builtin_transport().add_user_transport_to_pparams(testTransport).
init();

ASSERT_TRUE(writer.isInitialized());

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto data = default_keyedhelloworld_data_generator(keys * depth);
reader.startReception(data);

// Send data
writer.send(data);
ASSERT_TRUE(data.empty());

reader.block_for_all();
reader.stopReception();
}

bool comparator(
HelloWorld first,
HelloWorld second)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,6 @@ class ReaderHistory
std::mutex samples_number_mutex_;
unsigned int samples_number_;
SequenceNumber_t last_sequence_number_;

iterator get_first_change_with_minimum_ts(
const Time_t& /* timestamp */)
{
return m_changes.end();
}

};

} // namespace rtps
Expand Down

0 comments on commit 9625b5b

Please sign in to comment.