Skip to content

Commit

Permalink
Move Qos modification for DataReader
Browse files Browse the repository at this point in the history
In order to be able to modify the dataReader Qos, moved its modification
(in case of a keyed topic) to Participant::create_subscription method.
Otherwise It is impossible to modify a QOS of an already created
dataReader.
Also retreive dataReader if not already created as per my understanding
for keyed topics only one reader is enougth

Issue: [eProsima#56]
Signed-off-by: Manuel Valch <manuelValch@proton.me>
  • Loading branch information
manuelValch committed Sep 8, 2024
1 parent 3ba855b commit 96f3b3e
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 68 deletions.
89 changes: 62 additions & 27 deletions plugins/datastreamer_plugin/fastdds/Participant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,41 +179,64 @@ void Participant::create_subscription(
topic_name,
type_name,
default_topic_qos_());

// Create datareader
eprosima::fastdds::dds::DataReader* datareader = subscriber_->create_datareader(
topic,
default_datareader_qos_(),
this); // Mask not required

// Get Dyn Type for type
// This could not fail, as we know type is registered
eprosima::fastrtps::types::DynamicType_ptr dyn_type = get_type_registered_(type_name);

// In order to find out if the topic is keyed or not, create a dynamicPubSubType and then
std::string topicName = topic->get_type_name();
eprosima::fastrtps::types::DynamicPubSubType* pIsKeyType =
eprosima::fastrtps::xmlparser::XMLProfileManager::CreateDynamicPubSubType(topicName);
bool is_keyed = pIsKeyType->m_isGetKeyDefined;
eprosima::fastrtps::xmlparser::XMLProfileManager::DeleteDynamicPubSubType(pIsKeyType);

// Create Reader Handler with all this information and add it to readers
// Create it with specific deleter for reader and topic
ReaderHandlerReference new_reader(
new ReaderHandler(
// Get Qos of the data reader, which is going to be different if keyed
eprosima::fastdds::dds::DataReaderQos readerQos = default_datareader_qos_();

// Retreive dataReader for a keyed topic only 1 reader is needed
eprosima::fastdds::dds::DataReader* datareader = nullptr;
if (true == is_keyed)
{
keyed_dataReader_qos_(readerQos);
DEBUG("Topic: " << topic->get_name() << " is keyed");
datareader = this->subscriber_->lookup_datareader(topicName);
}

// Create it if not found which also applies when is_keyed is false
if (nullptr == datareader)
{
datareader = subscriber_->create_datareader(
topic,
datareader,
dyn_type,
listener_,
data_type_configuration,
is_keyed),
ReaderHandlerDeleter(participant_, subscriber_)
);

// Insert this new Reader Handler indexed by topic name
readers_.insert(
std::make_pair(
topic_name,
std::move(new_reader)));
readerQos,
this); // Mask not required
}

// Get Dyn Type for type
// This could not fail, as we know type is registered
eprosima::fastrtps::types::DynamicType_ptr dyn_type = get_type_registered_(type_name);

if (nullptr != datareader)
{
// Create Reader Handler with all this information and add it to readers
// Create it with specific deleter for reader and topic
ReaderHandlerReference new_reader(
new ReaderHandler(
topic,
datareader,
dyn_type,
listener_,
data_type_configuration,
is_keyed),
ReaderHandlerDeleter(participant_, subscriber_)
);

// Insert this new Reader Handler indexed by topic name
readers_.insert(
std::make_pair(
topic_name,
std::move(new_reader)));
}
else
{
WARNING("No valid reader has been found or created for: " << topic_name);
}
}

////////////////////////////////////////////////////
Expand Down Expand Up @@ -529,6 +552,18 @@ eprosima::fastdds::dds::DataReaderQos Participant::default_datareader_qos_()
return qos;
}

void Participant::keyed_dataReader_qos_(eprosima::fastdds::dds::DataReaderQos& Qos)
{
Qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::DYNAMIC_RESERVE_MEMORY_MODE;
Qos.history().kind = eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS;
Qos.durability().kind = eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS;
Qos.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS;
Qos.resource_limits().max_samples = 100;
Qos.resource_limits().allocated_samples = 100;
Qos.resource_limits().max_instances = 5;
Qos.resource_limits().max_samples_per_instance = 20;
}

eprosima::fastdds::dds::TopicQos Participant::default_topic_qos_()
{
eprosima::fastdds::dds::TopicQos qos =
Expand Down
5 changes: 5 additions & 0 deletions plugins/datastreamer_plugin/fastdds/Participant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ class Participant : public eprosima::fastdds::dds::DomainParticipantListener

static eprosima::fastdds::dds::DataReaderQos default_datareader_qos_();

/**
* @brief add keyed topic QOS to a default one for a dataReader
*/
static void keyed_dataReader_qos_(eprosima::fastdds::dds::DataReaderQos& Qos);

/**
* @brief Get default mask
*
Expand Down
63 changes: 22 additions & 41 deletions plugins/datastreamer_plugin/fastdds/ReaderHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,51 +56,32 @@ ReaderHandler::ReaderHandler(
data_ = eprosima::fastrtps::types::DynamicDataFactory::get_instance()->create_data(type_);

auto refDesc = type_->get_descriptor();
if (true == is_keyed_)
// Create the static structures to store the data introspection information AND the data itself
utils::get_introspection_type_names(
topic_name(),
type_,
data_type_configuration,
numeric_data_info_,
string_data_info_);

// Create the data structures so they are not copied in the future
for (const auto& info : numeric_data_info_)
{
DEBUG("\tTopic: " << topic->get_name() << " has key: " << std::to_string(is_keyed_));

// get created reader QOS and apply minimal QOS for keyed data reader (try to avoid sample loss)
eprosima::fastdds::dds::DataReaderQos readerQos = reader_->get_qos();
readerQos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::DYNAMIC_RESERVE_MEMORY_MODE;
readerQos.history().kind = eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS;
readerQos.durability().kind = eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS;
readerQos.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS;
readerQos.resource_limits().max_samples = 100;
readerQos.resource_limits().allocated_samples = 100;
readerQos.resource_limits().max_instances = 5;
readerQos.resource_limits().max_samples_per_instance = 20;
reader_->set_qos(readerQos);
numeric_data_.push_back({ std::get<0>(info), 0});
}
else
for (const auto& info : string_data_info_)
{
// Create the static structures to store the data introspection information AND the data itself
utils::get_introspection_type_names(
topic_name(),
type_,
data_type_configuration,
numeric_data_info_,
string_data_info_);

// Create the data structures so they are not copied in the future
for (const auto& info : numeric_data_info_)
{
numeric_data_.push_back({ std::get<0>(info), 0});
}
for (const auto& info : string_data_info_)
{
string_data_.push_back({ std::get<0>(info), "-"});
}
string_data_.push_back({ std::get<0>(info), "-"});
}

DEBUG("Reader created in topic: " << topic_name() << " with types: ");
for (const auto& info : numeric_data_info_)
{
DEBUG("\tNumeric: " << std::get<0>(info));
}
for (const auto& info : string_data_info_)
{
DEBUG("\tString: " << std::get<0>(info));
}
DEBUG("Reader created in topic: " << topic_name() << " with types: ");
for (const auto& info : numeric_data_info_)
{
DEBUG("\tNumeric: " << std::get<0>(info));
}
for (const auto& info : string_data_info_)
{
DEBUG("\tString: " << std::get<0>(info));
}

// Set this object as this reader's listener
Expand Down

0 comments on commit 96f3b3e

Please sign in to comment.