Skip to content

Commit

Permalink
Fix read/take behavior when a future change is found (#2074)
Browse files Browse the repository at this point in the history
* Refs #12133. Add regression test

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #12133. Continue the search after found a future change

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #12133. Apply suggestion

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #12133. Apply suggestion.

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
  • Loading branch information
richiware authored Jul 19, 2021
1 parent bb33b86 commit 3de55e3
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 33 deletions.
61 changes: 30 additions & 31 deletions src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,39 +140,38 @@ struct ReadTakeCommand

// If the change is in the future we can skip the remaining changes in the history, as they will be
// in the future also
if (is_future_change)
if (!is_future_change)
{
break;
}

// Add sample and info to collections
ReturnCode_t previous_return_value = return_value_;
bool added = add_sample(change, remove_change);
reader_->end_sample_access_nts(change, wp, added);

// Check if the payload is dirty
if (added && !check_datasharing_validity(change, data_values_.has_ownership(), wp))
{
// Decrement length of collections
--current_slot_;
++remaining_samples_;
data_values_.length(current_slot_);
sample_infos_.length(current_slot_);

return_value_ = previous_return_value;
finished_ = false;

remove_change = true;
added = false;
}

if (remove_change || (added && take_samples))
{
// Remove from history
history_.remove_change_sub(change, it);

// Current iterator will point to change next to the one removed. Avoid incrementing.
continue;
// Add sample and info to collections
ReturnCode_t previous_return_value = return_value_;
bool added = add_sample(change, remove_change);
reader_->end_sample_access_nts(change, wp, added);

// Check if the payload is dirty
if (added && !check_datasharing_validity(change, data_values_.has_ownership(), wp))
{
// Decrement length of collections
--current_slot_;
++remaining_samples_;
data_values_.length(current_slot_);
sample_infos_.length(current_slot_);

return_value_ = previous_return_value;
finished_ = false;

remove_change = true;
added = false;
}

if (remove_change || (added && take_samples))
{
// Remove from history
history_.remove_change_sub(change, it);

// Current iterator will point to change next to the one removed. Avoid incrementing.
continue;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/unittest/dds/subscriber/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER))

set(SUBSCRIBERTESTS_SOURCE SubscriberTests.cpp)
set(DATAREADERTESTS_SOURCE DataReaderTests.cpp)

if(WIN32)
add_definitions(-D_WIN32_WINNT=0x0601)
endif()
Expand Down
94 changes: 93 additions & 1 deletion test/unittest/dds/subscriber/DataReaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@

#include "../../logging/mock/MockConsumer.h"

#include <fastdds/rtps/transport/test_UDPv4TransportDescriptor.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>

namespace eprosima {
namespace fastdds {
namespace dds {
Expand Down Expand Up @@ -141,7 +144,7 @@ class DataReaderTests : public ::testing::Test
ASSERT_NE(data_reader_, nullptr);

data_writer_ = publisher_->create_datawriter(topic_, wqos);
ASSERT_NE(data_reader_, nullptr);
ASSERT_NE(data_writer_, nullptr);
}

void create_instance_handles()
Expand Down Expand Up @@ -1915,6 +1918,95 @@ TEST_F(DataReaderUnsupportedTests, UnsupportedDataReaderMethods)
ASSERT_EQ(DomainParticipantFactory::get_instance()->delete_participant(participant), ReturnCode_t::RETCODE_OK);
}

// Regression test for #12133.
TEST_F(DataReaderTests, read_samples_with_future_changes)
{
fastrtps::LibrarySettingsAttributes att;
att.intraprocess_delivery = fastrtps::INTRAPROCESS_OFF;
eprosima::fastrtps::xmlparser::XMLProfileManager::library_settings(att);
static constexpr int32_t num_samples = 8;
static constexpr int32_t expected_samples = 4;
const ReturnCode_t& ok_code = ReturnCode_t::RETCODE_OK;
bool start_dropping_acks = false;
bool start_dropping_datas = false;
static const Duration_t time_to_wait(0, 100 * 1000 * 1000);
std::shared_ptr<rtps::test_UDPv4TransportDescriptor> test_descriptor =
std::make_shared<rtps::test_UDPv4TransportDescriptor>();
test_descriptor->drop_ack_nack_messages_filter_ = [&](fastrtps::rtps::CDRMessage_t&) -> bool
{
return start_dropping_acks;
};
test_descriptor->drop_data_messages_filter_ = [&](fastrtps::rtps::CDRMessage_t&) -> bool
{
return start_dropping_datas;
};

DomainParticipantQos participant_qos = PARTICIPANT_QOS_DEFAULT;
participant_qos.transport().use_builtin_transports = false;
participant_qos.transport().user_transports.push_back(test_descriptor);

DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT;
reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
reader_qos.history().kind = KEEP_ALL_HISTORY_QOS;

DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT;
writer_qos.history().kind = KEEP_ALL_HISTORY_QOS;

create_entities(
nullptr,
reader_qos,
SUBSCRIBER_QOS_DEFAULT,
writer_qos,
PUBLISHER_QOS_DEFAULT,
TOPIC_QOS_DEFAULT,
participant_qos);

DataWriter* data_writer2 = publisher_->create_datawriter(topic_, writer_qos);

create_instance_handles();
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait discovery

FooType data;
data.index(1);
data.message()[0] = '\0';
data.message()[1] = '\0';

for (int i = 0; i < 2; ++i)
{
data_writer_->write(&data, handle_ok_);
}

start_dropping_datas = true;
start_dropping_acks = true;

for (int i = 0; i < 2; ++i)
{
data_writer2->write(&data, handle_ok_);
}

start_dropping_datas = false;

for (int i = 0; i < 2; ++i)
{
data_writer2->write(&data, handle_ok_);
}

for (int i = 0; i < 2; ++i)
{
data_writer_->write(&data, handle_ok_);
}

std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait all received

FooSeq data_seq(num_samples);
SampleInfoSeq info_seq(num_samples);

EXPECT_EQ(ok_code, data_reader_->take(data_seq, info_seq, num_samples, NOT_READ_SAMPLE_STATE));
check_collection(data_seq, true, num_samples, expected_samples);

ASSERT_EQ(publisher_->delete_datawriter(data_writer2), ReturnCode_t::RETCODE_OK);
}



} // namespace dds
Expand Down

0 comments on commit 3de55e3

Please sign in to comment.