Skip to content

Commit

Permalink
Keep changes inside instances sorted by source timestamp (#2182)
Browse files Browse the repository at this point in the history
* Refs 12419. Regression test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12419. Added sorted_vector_insert utility method.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12419. ReaderHistory uses sorted_vector_insert.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12419. SubscriberHistory uses sorted_vector_insert.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12421. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12421. Apply suggestions

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12421. Moved new template method to correct namespace.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
(cherry picked from commit 38e8d0f)

# Conflicts:
#	include/fastdds/rtps/history/ReaderHistory.h
#	src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp
#	src/cpp/rtps/history/ReaderHistory.cpp
#	test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h
  • Loading branch information
MiguelCompany authored and mergify-bot committed Sep 6, 2021
1 parent 2cbde93 commit e86155b
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 7 deletions.
10 changes: 10 additions & 0 deletions include/fastdds/rtps/history/ReaderHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ class ReaderHistory : public History

protected:

<<<<<<< HEAD
=======
RTPS_DllAPI bool do_reserve_cache(
CacheChange_t** change,
uint32_t size) override;

RTPS_DllAPI void do_release_cache(
CacheChange_t* ch) override;

>>>>>>> 38e8d0fb7 (Keep changes inside instances sorted by source timestamp (#2182))
//!Pointer to the reader
RTPSReader* mp_reader;
};
Expand Down
22 changes: 15 additions & 7 deletions src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@
*/

#include <fastrtps/subscriber/SubscriberHistory.h>
#include <fastrtps_deprecated/subscriber/SubscriberImpl.h>

#include <fastdds/rtps/reader/RTPSReader.h>
#include <rtps/reader/WriterProxy.h>
#include <limits>
#include <mutex>

#include <fastdds/dds/topic/TopicDataType.hpp>
#include <fastdds/dds/log/Log.hpp>

<<<<<<< HEAD
#include <mutex>
=======
#include <fastdds/rtps/reader/RTPSReader.h>

#include <fastrtps_deprecated/subscriber/SubscriberImpl.h>
#include <rtps/reader/WriterProxy.h>
#include <utils/collections/sorted_vector_insert.hpp>
>>>>>>> 38e8d0fb7 (Keep changes inside instances sorted by source timestamp (#2182))

namespace eprosima {
namespace fastrtps {
Expand Down Expand Up @@ -265,10 +272,11 @@ bool SubscriberHistory::add_received_change_with_key(
}

//ADD TO KEY VECTOR

// As the instance should be ordered following the presentation QoS, and
// we only support ordering by reception timestamp, we can always add at the end.
instance_changes.push_back(a_change);
eprosima::utilities::collections::sorted_vector_insert(instance_changes, a_change,
[](const CacheChange_t* lhs, const CacheChange_t* rhs)
{
return lhs->sourceTimestamp < rhs->sourceTimestamp;
});

logInfo(SUBSCRIBER, mp_reader->getGuid().entityId
<< ": Change " << a_change->sequenceNumber << " added from: "
Expand Down
26 changes: 26 additions & 0 deletions src/cpp/rtps/history/ReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <fastdds/rtps/reader/RTPSReader.h>
#include <fastdds/rtps/reader/ReaderListener.h>

#include <utils/collections/sorted_vector_insert.hpp>

#include <mutex>

namespace eprosima {
Expand Down Expand Up @@ -71,6 +73,7 @@ bool ReaderHistory::add_change(
logError(RTPS_READER_HISTORY, "The Writer GUID_t must be defined");
}

<<<<<<< HEAD
if (!m_changes.empty() && a_change->sourceTimestamp < (*m_changes.rbegin())->sourceTimestamp)
{
auto it = std::lower_bound(m_changes.begin(), m_changes.end(), a_change,
Expand All @@ -85,6 +88,13 @@ bool ReaderHistory::add_change(
m_changes.push_back(a_change);
}

=======
eprosima::utilities::collections::sorted_vector_insert(m_changes, a_change,
[](const CacheChange_t* lhs, const CacheChange_t* rhs)
{
return lhs->sourceTimestamp < rhs->sourceTimestamp;
});
>>>>>>> 38e8d0fb7 (Keep changes inside instances sorted by source timestamp (#2182))
logInfo(RTPS_READER_HISTORY,
"Change " << a_change->sequenceNumber << " added with " << a_change->serializedPayload.length << " bytes");

Expand Down Expand Up @@ -226,6 +236,22 @@ bool ReaderHistory::get_min_change_from(
return ret;
}

<<<<<<< HEAD
=======
bool ReaderHistory::do_reserve_cache(
CacheChange_t** change,
uint32_t size)
{
return mp_reader->reserveCache(change, size);
}

void ReaderHistory::do_release_cache(
CacheChange_t* ch)
{
mp_reader->releaseCache(ch);
}

>>>>>>> 38e8d0fb7 (Keep changes inside instances sorted by source timestamp (#2182))
} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
64 changes: 64 additions & 0 deletions src/cpp/utils/collections/sorted_vector_insert.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file sorted_vector_insert.hpp
*/

#ifndef SRC_CPP_UTILS_COLLECTIONS_SORTED_VECTOR_INSERT_HPP_
#define SRC_CPP_UTILS_COLLECTIONS_SORTED_VECTOR_INSERT_HPP_

#include <algorithm>
#include <functional>

namespace eprosima {
namespace utilities {
namespace collections {

/**
* @brief Insert item into sorted vector-like collection
*
* @tparam CollectionType Type of the collection to be modified.
* @tparam ValueType Type of the item to insert. The collection should support to insert a value of this type.
* @tparam LessThanPredicate Predicate that performs ValueType < CollectionType::value_type comparison.
*
* @param[in,out] collection The collection to be modified.
* @param[in] item The item to be inserted.
* @param[in] pred The predicate to use for comparisons.
*/
template<
typename CollectionType,
typename ValueType,
typename LessThanPredicate = std::less<ValueType>>
void sorted_vector_insert(
CollectionType& collection,
const ValueType& item,
const LessThanPredicate& pred = LessThanPredicate())
{
// Insert at the end by default
auto it = collection.end();

// Find insertion position when item is less than last element in collection
if (!collection.empty() && pred(item, *collection.rbegin()))
{
it = std::lower_bound(collection.begin(), collection.end(), item, pred);
}
collection.insert(it, item);
}

} // namespace collections
} // namespace utilities
} // namespace eprosima

#endif // SRC_CPP_UTILS_COLLECTIONS_SORTED_VECTOR_INSERT_HPP_
44 changes: 44 additions & 0 deletions test/blackbox/BlackboxTestsPubSubHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,50 @@ TEST(BlackBox, PubSubAsReliableKeepLastReaderSmallDepthWithKey)
}
}

// Regression test for redmine bug #12419
// It uses a test transport to drop some DATA messages, in order to force unordered reception.
TEST_P(PubSubHistory, PubSubAsReliableKeepLastWithKeyUnorderedReception)
{
PubSubReader<KeyedHelloWorldType> reader(TEST_TOPIC_NAME);
PubSubWriter<KeyedHelloWorldType> writer(TEST_TOPIC_NAME);

uint32_t keys = 2;
uint32_t depth = 10;

reader.resource_limits_max_instances(keys).
reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).
history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS).
history_depth(depth).mem_policy(mem_policy_).init();

ASSERT_TRUE(reader.isInitialized());

auto testTransport = std::make_shared<test_UDPv4TransportDescriptor>();
testTransport->dropDataMessagesPercentage = 25;

writer.resource_limits_max_instances(keys).
reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).
history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS).
history_depth(depth).mem_policy(mem_policy_).
disable_builtin_transport().add_user_transport_to_pparams(testTransport).
init();

ASSERT_TRUE(writer.isInitialized());

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto data = default_keyedhelloworld_data_generator(keys * depth);
reader.startReception(data);

// Send data
writer.send(data);
ASSERT_TRUE(data.empty());

reader.block_for_all();
reader.stopReception();
}

bool comparator(
HelloWorld first,
HelloWorld second)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,15 @@ class ReaderHistory

RTPSReader* mp_reader;
RecursiveTimedMutex* mp_mutex;
<<<<<<< HEAD
HistoryAttributes m_att;
=======
std::vector<CacheChange_t*> m_changes;
bool m_isHistoryFull;
std::mutex samples_number_mutex_;
unsigned int samples_number_;
SequenceNumber_t last_sequence_number_;
>>>>>>> 38e8d0fb7 (Keep changes inside instances sorted by source timestamp (#2182))
};

} // namespace rtps
Expand Down

0 comments on commit e86155b

Please sign in to comment.