diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp index 5800bdc8cd4..d7ca53d4d25 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp @@ -140,39 +140,38 @@ struct ReadTakeCommand // If the change is in the future we can skip the remaining changes in the history, as they will be // in the future also - if (is_future_change) + if (!is_future_change) { - break; - } - - // Add sample and info to collections - ReturnCode_t previous_return_value = return_value_; - bool added = add_sample(change, remove_change); - reader_->end_sample_access_nts(change, wp, added); - // Check if the payload is dirty - if (added && !check_datasharing_validity(change, data_values_.has_ownership(), wp)) - { - // Decrement length of collections - --current_slot_; - ++remaining_samples_; - data_values_.length(current_slot_); - sample_infos_.length(current_slot_); - - return_value_ = previous_return_value; - finished_ = false; - - remove_change = true; - added = false; - } - - if (remove_change || (added && take_samples)) - { - // Remove from history - history_.remove_change_sub(change, it); - - // Current iterator will point to change next to the one removed. Avoid incrementing. - continue; + // Add sample and info to collections + ReturnCode_t previous_return_value = return_value_; + bool added = add_sample(change, remove_change); + reader_->end_sample_access_nts(change, wp, added); + + // Check if the payload is dirty + if (added && !check_datasharing_validity(change, data_values_.has_ownership(), wp)) + { + // Decrement length of collections + --current_slot_; + ++remaining_samples_; + data_values_.length(current_slot_); + sample_infos_.length(current_slot_); + + return_value_ = previous_return_value; + finished_ = false; + + remove_change = true; + added = false; + } + + if (remove_change || (added && take_samples)) + { + // Remove from history + history_.remove_change_sub(change, it); + + // Current iterator will point to change next to the one removed. Avoid incrementing. + continue; + } } } diff --git a/test/unittest/dds/subscriber/CMakeLists.txt b/test/unittest/dds/subscriber/CMakeLists.txt index b24e4e81b6d..573a8d5bbf0 100644 --- a/test/unittest/dds/subscriber/CMakeLists.txt +++ b/test/unittest/dds/subscriber/CMakeLists.txt @@ -22,7 +22,7 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER)) set(SUBSCRIBERTESTS_SOURCE SubscriberTests.cpp) set(DATAREADERTESTS_SOURCE DataReaderTests.cpp) - + if(WIN32) add_definitions(-D_WIN32_WINNT=0x0601) endif() diff --git a/test/unittest/dds/subscriber/DataReaderTests.cpp b/test/unittest/dds/subscriber/DataReaderTests.cpp index 0213f0a1d44..893417d833c 100644 --- a/test/unittest/dds/subscriber/DataReaderTests.cpp +++ b/test/unittest/dds/subscriber/DataReaderTests.cpp @@ -57,6 +57,9 @@ #include "../../logging/mock/MockConsumer.h" +#include +#include + namespace eprosima { namespace fastdds { namespace dds { @@ -141,7 +144,7 @@ class DataReaderTests : public ::testing::Test ASSERT_NE(data_reader_, nullptr); data_writer_ = publisher_->create_datawriter(topic_, wqos); - ASSERT_NE(data_reader_, nullptr); + ASSERT_NE(data_writer_, nullptr); } void create_instance_handles() @@ -1915,6 +1918,95 @@ TEST_F(DataReaderUnsupportedTests, UnsupportedDataReaderMethods) ASSERT_EQ(DomainParticipantFactory::get_instance()->delete_participant(participant), ReturnCode_t::RETCODE_OK); } +// Regression test for #12133. +TEST_F(DataReaderTests, read_samples_with_future_changes) +{ + fastrtps::LibrarySettingsAttributes att; + att.intraprocess_delivery = fastrtps::INTRAPROCESS_OFF; + eprosima::fastrtps::xmlparser::XMLProfileManager::library_settings(att); + static constexpr int32_t num_samples = 8; + static constexpr int32_t expected_samples = 4; + const ReturnCode_t& ok_code = ReturnCode_t::RETCODE_OK; + bool start_dropping_acks = false; + bool start_dropping_datas = false; + static const Duration_t time_to_wait(0, 100 * 1000 * 1000); + std::shared_ptr test_descriptor = + std::make_shared(); + test_descriptor->drop_ack_nack_messages_filter_ = [&](fastrtps::rtps::CDRMessage_t&) -> bool + { + return start_dropping_acks; + }; + test_descriptor->drop_data_messages_filter_ = [&](fastrtps::rtps::CDRMessage_t&) -> bool + { + return start_dropping_datas; + }; + + DomainParticipantQos participant_qos = PARTICIPANT_QOS_DEFAULT; + participant_qos.transport().use_builtin_transports = false; + participant_qos.transport().user_transports.push_back(test_descriptor); + + DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT; + reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS; + reader_qos.history().kind = KEEP_ALL_HISTORY_QOS; + + DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT; + writer_qos.history().kind = KEEP_ALL_HISTORY_QOS; + + create_entities( + nullptr, + reader_qos, + SUBSCRIBER_QOS_DEFAULT, + writer_qos, + PUBLISHER_QOS_DEFAULT, + TOPIC_QOS_DEFAULT, + participant_qos); + + DataWriter* data_writer2 = publisher_->create_datawriter(topic_, writer_qos); + + create_instance_handles(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait discovery + + FooType data; + data.index(1); + data.message()[0] = '\0'; + data.message()[1] = '\0'; + + for (int i = 0; i < 2; ++i) + { + data_writer_->write(&data, handle_ok_); + } + + start_dropping_datas = true; + start_dropping_acks = true; + + for (int i = 0; i < 2; ++i) + { + data_writer2->write(&data, handle_ok_); + } + + start_dropping_datas = false; + + for (int i = 0; i < 2; ++i) + { + data_writer2->write(&data, handle_ok_); + } + + for (int i = 0; i < 2; ++i) + { + data_writer_->write(&data, handle_ok_); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait all received + + FooSeq data_seq(num_samples); + SampleInfoSeq info_seq(num_samples); + + EXPECT_EQ(ok_code, data_reader_->take(data_seq, info_seq, num_samples, NOT_READ_SAMPLE_STATE)); + check_collection(data_seq, true, num_samples, expected_samples); + + ASSERT_EQ(publisher_->delete_datawriter(data_writer2), ReturnCode_t::RETCODE_OK); +} + } // namespace dds