Skip to content

Commit

Permalink
Add warning about Datasharing limitation with Datareader's history si…
Browse files Browse the repository at this point in the history
…ze [11926] (#2038)

* Refs #11676. Add warning for Datareader's HistoryQoS when using Datasharing.

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11676. Fix segment faults with Datasharing

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11926. Fix long sentence

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Apply suggestion
  • Loading branch information
richiware authored Jul 5, 2021
1 parent b8436f3 commit 163380d
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 29 deletions.
22 changes: 17 additions & 5 deletions src/cpp/rtps/DataSharing/DataSharingListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ void DataSharingListener::process_new_data ()

bool DataSharingListener::add_datasharing_writer(
const GUID_t& writer_guid,
bool is_volatile)
bool is_volatile,
int32_t reader_history_max_samples)
{
std::lock_guard<std::mutex> lock(mutex_);

Expand All @@ -217,11 +218,22 @@ bool DataSharingListener::add_datasharing_writer(

std::shared_ptr<ReaderPool> pool =
std::static_pointer_cast<ReaderPool>(DataSharingPayloadPool::get_reader_pool(is_volatile));
pool->init_shared_memory(writer_guid, datasharing_pools_directory_);
writer_pools_.emplace_back(pool, pool->last_liveliness_sequence());
writer_pools_changed_.store(true);
if (pool->init_shared_memory(writer_guid, datasharing_pools_directory_))
{
if (0 >= reader_history_max_samples ||
reader_history_max_samples >= static_cast<int32_t>(pool->history_size()))
{
logWarning(RTPS_READER,
"Reader " << reader_->getGuid() << " was configured to have a large history (" <<
reader_history_max_samples << " max samples), but the history size used with writer " <<
writer_guid << " will be " << pool->history_size() << " max samples.");
}
writer_pools_.emplace_back(pool, pool->last_liveliness_sequence());
writer_pools_changed_.store(true);
return true;
}

return true;
return false;
}

bool DataSharingListener::remove_datasharing_writer(
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/rtps/DataSharing/DataSharingListener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class DataSharingListener : public IDataSharingListener

bool add_datasharing_writer(
const GUID_t& writer_guid,
bool is_volatile) override;
bool is_volatile,
int32_t reader_history_max_samples) override;

bool remove_datasharing_writer(
const GUID_t& writer_guid) override;
Expand Down
5 changes: 5 additions & 0 deletions src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ class DataSharingPayloadPool : public IPayloadPool
return "history";
}

uint32_t history_size() const
{
return descriptor_->history_size;
}

/**
* Advances an index to the history to the next position
*/
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/rtps/DataSharing/IDataSharingListener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class IDataSharingListener
*/
virtual bool add_datasharing_writer(
const GUID_t& writer_guid,
bool is_reader_volatile) = 0;
bool is_reader_volatile,
int32_t reader_history_max_samples) = 0;

/**
* Removes a writer from the listening. The changes in the writer's history will not be
Expand Down
25 changes: 14 additions & 11 deletions src/cpp/rtps/reader/RTPSReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,24 @@ void RTPSReader::init(

if (att.endpoint.data_sharing_configuration().kind() != OFF)
{
is_datasharing_compatible_ = true;
using std::placeholders::_1;
std::shared_ptr<DataSharingNotification> notification =
DataSharingNotification::create_notification(
getGuid(), att.endpoint.data_sharing_configuration().shm_directory());
datasharing_listener_.reset(new DataSharingListener(
notification,
att.endpoint.data_sharing_configuration().shm_directory(),
att.matched_writers_allocation,
this));

// We can start the listener here, as no writer can be matched already,
// so no notification will occur until the non-virtual instance is constructed.
// But we need to stop the listener in the non-virtual instance destructor.
datasharing_listener_->start();
if (notification)
{
is_datasharing_compatible_ = true;
datasharing_listener_.reset(new DataSharingListener(
notification,
att.endpoint.data_sharing_configuration().shm_directory(),
att.matched_writers_allocation,
this));

// We can start the listener here, as no writer can be matched already,
// so no notification will occur until the non-virtual instance is constructed.
// But we need to stop the listener in the non-virtual instance destructor.
datasharing_listener_->start();
}
}

mp_history->mp_reader = this;
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ bool StatefulReader::matched_writer_add(
if (is_datasharing)
{
if (datasharing_listener_->add_datasharing_writer(wdata.guid(),
m_att.durabilityKind == VOLATILE))
m_att.durabilityKind == VOLATILE,
mp_history->m_att.maximumReservedCaches))
{
matched_writers_.push_back(wp);
logInfo(RTPS_READER, "Writer Proxy " << wdata.guid() << " added to " << this->m_guid.entityId
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ bool StatelessReader::matched_writer_add(
if (is_datasharing)
{
if (datasharing_listener_->add_datasharing_writer(wdata.guid(),
m_att.durabilityKind == VOLATILE))
m_att.durabilityKind == VOLATILE,
mp_history->m_att.maximumReservedCaches))
{
logInfo(RTPS_READER, "Writer Proxy " << wdata.guid() << " added to " << this->m_guid.entityId
<< " with data sharing");
Expand Down
7 changes: 4 additions & 3 deletions test/blackbox/api/dds-pim/PubSubWriterReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,8 @@ class PubSubWriterReader
}

bool create_additional_topics(
size_t num_topics)
size_t num_topics,
const char* suffix)
{
bool ret_val = initialized_;
if (ret_val)
Expand All @@ -422,12 +423,12 @@ class PubSubWriterReader

for (size_t i = 0; i < vector_size; i++)
{
topic_name += "/";
topic_name += suffix;
}

for (size_t i = 0; ret_val && (i < num_topics); i++)
{
topic_name += "/";
topic_name += suffix;
eprosima::fastdds::dds::Topic* topic = participant_->create_topic(topic_name,
type_->getName(), eprosima::fastdds::dds::TOPIC_QOS_DEFAULT);
ret_val &= (nullptr != topic);
Expand Down
7 changes: 4 additions & 3 deletions test/blackbox/api/fastrtps_deprecated/PubSubWriterReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,8 @@ class PubSubWriterReader
}

bool create_additional_topics(
size_t num_topics)
size_t num_topics,
const char* suffix)
{
bool ret_val = initialized_;
if (ret_val)
Expand All @@ -398,7 +399,7 @@ class PubSubWriterReader

for (size_t i = 0; ret_val && (i < num_topics); i++)
{
topic_name += "/";
topic_name += suffix;
publisher_attr_.topic.topicName = topic_name;
ret_val &=
nullptr != eprosima::fastrtps::Domain::createPublisher(participant_, publisher_attr_,
Expand All @@ -409,7 +410,7 @@ class PubSubWriterReader

for (size_t i = 0; ret_val && (i < num_topics); i++)
{
topic_name += "/";
topic_name += suffix;
subscriber_attr_.topic.topicName = topic_name;
ret_val &=
nullptr != eprosima::fastrtps::Domain::createSubscriber(participant_, subscriber_attr_,
Expand Down
6 changes: 3 additions & 3 deletions test/blackbox/common/BlackboxTestsDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ static void discoverParticipantsSeveralEndpointsTest(
std::cout << "\rParticipant " << idx++ << " of " << n_participants << std::flush;
ps->init(avoid_multicast);
ASSERT_EQ(ps->isInitialized(), true);
ASSERT_TRUE(ps->create_additional_topics(n_topics - 1));
ASSERT_TRUE(ps->create_additional_topics(n_topics - 1, "/"));
}

bool all_discovered = false;
Expand Down Expand Up @@ -1063,7 +1063,7 @@ TEST_P(Discovery, EndpointCreationMultithreaded)
while (!stop)
{
std::this_thread::sleep_for(creation_sleep);
EXPECT_NO_THROW(participant_1.create_additional_topics(1));
EXPECT_NO_THROW(participant_1.create_additional_topics(1, "/"));
}
};

Expand All @@ -1086,7 +1086,7 @@ TEST_P(Discovery, EndpointCreationMultithreaded)
// Additional endpoints created just after the second participant.
// This gives the first participant very few time to receive the undiscovery,
// and makes the intraprocess delivery on a deleted builtin reader.
participant_1.create_additional_topics(1);
participant_1.create_additional_topics(1, "_");
};

EXPECT_NO_THROW(second_participant_process());
Expand Down

0 comments on commit 163380d

Please sign in to comment.