Skip to content

Commit

Permalink
Avoid a volatile datasharing reader blocks a writer [12222] (#2090)
Browse files Browse the repository at this point in the history
* Refs #11743. Add regression test

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11743. Volatire reader know acks samples in the writer pool

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Apply suggestions from code review

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11743. Fix error

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11743. Fix compilation error with new flow controllers

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11743. Fix segmentation fault

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
  • Loading branch information
richiware and MiguelCompany authored Sep 6, 2021
1 parent 38e8d0f commit c97053e
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 17 deletions.
17 changes: 11 additions & 6 deletions src/cpp/rtps/DataSharing/ReaderPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,21 @@ class ReaderPool : public DataSharingPayloadPool
}

// Set the reading pointer
next_payload_ = begin();
segment_ = std::move(local_segment);
if (is_volatile_)
{
next_payload_ = end();
}
else
{
next_payload_ = begin();
CacheChange_t ch;
SequenceNumber_t last_sequence = c_SequenceNumber_Unknown;
get_next_unread_payload(ch, last_sequence);
while (ch.sequenceNumber != SequenceNumber_t::unknown())
{
advance(next_payload_);
get_next_unread_payload(ch, last_sequence);
}
assert(next_payload_ == end());
}

segment_ = std::move(local_segment);
return true;
}

Expand Down
13 changes: 12 additions & 1 deletion src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,18 @@ bool StatefulReader::matched_writer_add(
}

// Intraprocess manages durability itself
if (!is_same_process && m_att.durabilityKind != VOLATILE)
if (VOLATILE == m_att.durabilityKind)
{
std::shared_ptr<ReaderPool> pool = datasharing_listener_->get_pool_for_writer(wp->guid());
SequenceNumber_t last_seq = pool->get_last_read_sequence_number();
if (SequenceNumber_t::unknown() != last_seq)
{
SequenceNumberSet_t sns(last_seq + 1);
send_acknack(wp, sns, wp, false);
wp->lost_changes_update(last_seq + 1);
}
}
else if (!is_same_process)
{
// simulate a notification to force reading of transient changes
datasharing_listener_->notify(false);
Expand Down
10 changes: 5 additions & 5 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ class PubSubReader
template<class _Rep,
class _Period
>
void wait_for_all_received(
bool wait_for_all_received(
const std::chrono::duration<_Rep, _Period>& max_wait,
size_t num_messages = 0)
{
Expand All @@ -480,10 +480,10 @@ class PubSubReader
num_messages = number_samples_expected_;
}
std::unique_lock<std::mutex> lock(message_receive_mutex_);
message_receive_cv_.wait_for(lock, max_wait, [this, num_messages]() -> bool
{
return num_messages == message_receive_count_;
});
return message_receive_cv_.wait_for(lock, max_wait, [this, num_messages]() -> bool
{
return num_messages == message_receive_count_;
});
}

void block_for_all()
Expand Down
10 changes: 5 additions & 5 deletions test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ class PubSubReader
template<class _Rep,
class _Period
>
void wait_for_all_received(
bool wait_for_all_received(
const std::chrono::duration<_Rep, _Period>& max_wait,
size_t num_messages = 0)
{
Expand All @@ -377,10 +377,10 @@ class PubSubReader
num_messages = number_samples_expected_;
}
std::unique_lock<std::mutex> lock(message_receive_mutex_);
message_receive_cv_.wait_for(lock, max_wait, [this, num_messages]() -> bool
{
return num_messages == message_receive_count_;
});
return message_receive_cv_.wait_for(lock, max_wait, [this, num_messages]() -> bool
{
return num_messages == message_receive_count_;
});
}

void block_for_all()
Expand Down
91 changes: 91 additions & 0 deletions test/blackbox/common/BlackboxTestsPubSubHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ class PubSubHistory : public testing::TestWithParam<test_params>
}

mem_policy_ = std::get<1>(GetParam());

switch (mem_policy_)
{
case rtps::PREALLOCATED_MEMORY_MODE:
case rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE:
will_use_datasharing = enable_datasharing;
break;
default:
break;
}
}

void TearDown() override
Expand All @@ -74,11 +84,14 @@ class PubSubHistory : public testing::TestWithParam<test_params>
default:
break;
}
will_use_datasharing = false;
}

protected:

rtps::MemoryManagementPolicy mem_policy_;

bool will_use_datasharing = false;
};

// Test created to check bug #1568 (Github #34)
Expand Down Expand Up @@ -1188,6 +1201,84 @@ TEST_P(PubSubHistory, WriterUnmatchClearsHistory)
reader.block_for_all();
}

// Regression test for #11743
/*!
* @fn TEST(PubSubHistory, KeepAllWriterContinueSendingAfterReaderMatched)
* @brief This test checks that the writer doesn't block writing samples when meet a Datasharing Volatile reader.
*
* The test creates a Reliable, Transient Local Writer with a Keep All history, and its resources limited to
* 1 samples.
* Then it creates a Reliable, Volatile Reader.
* Writer will be the first discovering and then sends a sample.
* Reader could discover the writer when the writer already put the sample in the Datasharing history for the reader.
* The Volatile reader should be able to acks these kind of samples.
*
* Writer will be able then to send a second sample.
*/
TEST_P(PubSubHistory, KeepAllWriterContinueSendingAfterReaderMatched)
{
PubSubReader<HelloWorldType> reader(TEST_TOPIC_NAME);
PubSubWriter<HelloWorldType> writer(TEST_TOPIC_NAME);

reader.reliability(RELIABLE_RELIABILITY_QOS);

writer.reliability(RELIABLE_RELIABILITY_QOS)
.history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS)
.resource_limits_allocated_samples(1)
.resource_limits_max_samples(1);

writer.mem_policy(mem_policy_).init();
ASSERT_TRUE(writer.isInitialized());

reader.mem_policy(mem_policy_).init();
ASSERT_TRUE(reader.isInitialized());

writer.wait_discovery();

HelloWorld data;
data.message("Hello world!");
data.index(1u);
ASSERT_TRUE(writer.send_sample(data));

reader.wait_discovery();

// Second writer sends one sample (reader should discard previous one)
data.index(2u);
uint32_t expected_value = data.index();

if (will_use_datasharing)
{
if (reader.wait_for_all_received(std::chrono::seconds(3), 1))
{
ASSERT_FALSE(writer.send_sample(data));
expected_value = 1;
}
else
{
ASSERT_TRUE(writer.send_sample(data));
}
}
else
{
ASSERT_TRUE(writer.send_sample(data));
}

if (will_use_datasharing)
{
reader.wait_for_all_received(std::chrono::seconds(3), expected_value);
}
else
{
writer.waitForAllAcked(std::chrono::seconds(3));
}

// Only one sample should be present
HelloWorld received;
ASSERT_TRUE(reader.takeNextData(&received));
ASSERT_EQ(received.index(), expected_value);
ASSERT_TRUE(writer.waitForAllAcked(std::chrono::seconds(3)));
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down

0 comments on commit c97053e

Please sign in to comment.