Skip to content

Commit

Permalink
Custom pools on DDS layer feature (#3755) (#4247)
Browse files Browse the repository at this point in the history
* Custom pools on DDS layer feature (#3755)

* Custom Payload pools test implementation (#3719)

* Refs #19024: Public API implementation

Signed-off-by: JesusPoderoso <jesuspoderoso@eprosima.com>

* Refs #19024: Update versions.md

Signed-off-by: JesusPoderoso <jesuspoderoso@eprosima.com>

* Refs #19023: Fix build issues

Signed-off-by: JesusPoderoso <jesuspoderoso@eprosima.com>

* Refs #19023: Custom Payload pools test implementation

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>

* Refs #19023: Update test to use public API

Signed-off-by: JesusPoderoso <jesuspoderoso@eprosima.com>

* Refs #19023: Please linters

Signed-off-by: JesusPoderoso <jesuspoderoso@eprosima.com>

* Refs #19023: Added delay between writing and checking payload request

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>

---------

Signed-off-by: JesusPoderoso <jesuspoderoso@eprosima.com>
Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>
Co-authored-by: JesusPoderoso <jesuspoderoso@eprosima.com>
Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Include custom pools impl (#3740)

Signed-off-by: JesusPoderoso <jesuspoderoso@eprosima.com>
Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #19024: Modified custom payload pool and datasharing interaction

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>
Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #19024. Correctly set payload owner on test pools.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

---------

Signed-off-by: JesusPoderoso <jesuspoderoso@eprosima.com>
Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>
Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>
Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
Co-authored-by: jsantiago-eProsima <90755661+jsantiago-eProsima@users.noreply.github.com>
Co-authored-by: Javier Santiago <javiersantiago@eprosima.com>
Co-authored-by: Miguel Company <MiguelCompany@eprosima.com>

* Add API to create writers and readers with a custom payload pool

Signed-off-by: Raul Sanchez-Mateos <raul@eprosima.com>

* Use correct API in tests

Signed-off-by: Raul Sanchez-Mateos <raul@eprosima.com>

---------

Signed-off-by: JesusPoderoso <jesuspoderoso@eprosima.com>
Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>
Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>
Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
Signed-off-by: Raul Sanchez-Mateos <raul@eprosima.com>
Co-authored-by: Jesús Poderoso <120394830+JesusPoderoso@users.noreply.github.com>
Co-authored-by: jsantiago-eProsima <90755661+jsantiago-eProsima@users.noreply.github.com>
Co-authored-by: Javier Santiago <javiersantiago@eprosima.com>
Co-authored-by: Miguel Company <MiguelCompany@eprosima.com>
  • Loading branch information
5 people authored Jan 18, 2024
1 parent aa46333 commit 5e4583d
Show file tree
Hide file tree
Showing 22 changed files with 588 additions and 42 deletions.
34 changes: 34 additions & 0 deletions include/fastdds/dds/publisher/Publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,23 @@ class Publisher : public DomainEntity
DataWriterListener* listener = nullptr,
const StatusMask& mask = StatusMask::all());

/**
* This operation creates a DataWriter. The returned DataWriter will be attached and belongs to the Publisher.
*
* @param topic Topic the DataWriter will be listening
* @param qos QoS of the DataWriter.
* @param payload_pool IPayloadPool shared pointer that defines writer payload (default: nullptr).
* @param listener Pointer to the listener (default: nullptr).
* @param mask StatusMask that holds statuses the listener responds to (default: all).
* @return Pointer to the created DataWriter. nullptr if failed.
*/
RTPS_DllAPI DataWriter* create_datawriter_with_payload_pool(
Topic* topic,
const DataWriterQos& qos,
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool,
DataWriterListener* listener = nullptr,
const StatusMask& mask = StatusMask::all());

/**
* This operation creates a DataWriter. The returned DataWriter will be attached and belongs to the Publisher.
*
Expand All @@ -178,6 +195,23 @@ class Publisher : public DomainEntity
DataWriterListener* listener = nullptr,
const StatusMask& mask = StatusMask::all());

/**
* This operation creates a DataWriter. The returned DataWriter will be attached and belongs to the Publisher.
*
* @param topic Topic the DataWriter will be listening
* @param profile_name DataWriter profile name.
* @param payload_pool IPayloadPool shared pointer that defines writer payload (default: nullptr).
* @param listener Pointer to the listener (default: nullptr).
* @param mask StatusMask that holds statuses the listener responds to (default: all).
* @return Pointer to the created DataWriter. nullptr if failed.
*/
RTPS_DllAPI DataWriter* create_datawriter_with_profile_with_payload_pool(
Topic* topic,
const std::string& profile_name,
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool,
DataWriterListener* listener = nullptr,
const StatusMask& mask = StatusMask::all());

/**
* This operation deletes a DataWriter that belongs to the Publisher.
*
Expand Down
34 changes: 34 additions & 0 deletions include/fastdds/dds/subscriber/Subscriber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,23 @@ class Subscriber : public DomainEntity
DataReaderListener* listener = nullptr,
const StatusMask& mask = StatusMask::all());

/**
* This operation creates a DataReader. The returned DataReader will be attached and belong to the Subscriber.
*
* @param topic Topic the DataReader will be listening.
* @param reader_qos QoS of the DataReader.
* @param payload_pool IPayloadPool shared pointer that defines reader payload (default: nullptr).
* @param listener Pointer to the listener (default: nullptr)
* @param mask StatusMask that holds statuses the listener responds to (default: all).
* @return Pointer to the created DataReader. nullptr if failed.
*/
RTPS_DllAPI DataReader* create_datareader_with_payload_pool(
TopicDescription* topic,
const DataReaderQos& reader_qos,
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool,
DataReaderListener* listener = nullptr,
const StatusMask& mask = StatusMask::all());

/**
* This operation creates a DataReader. The returned DataReader will be attached and belongs to the Subscriber.
*
Expand All @@ -185,6 +202,23 @@ class Subscriber : public DomainEntity
DataReaderListener* listener = nullptr,
const StatusMask& mask = StatusMask::all());

/**
* This operation creates a DataReader. The returned DataReader will be attached and belongs to the Subscriber.
*
* @param topic Topic the DataReader will be listening.
* @param profile_name DataReader profile name.
* @param payload_pool IPayloadPool shared pointer that defines reader payload (default: nullptr).
* @param listener Pointer to the listener (default: nullptr)
* @param mask StatusMask that holds statuses the listener responds to (default: all).
* @return Pointer to the created DataReader. nullptr if failed.
*/
RTPS_DllAPI DataReader* create_datareader_with_profile_with_payload_pool(
TopicDescription* topic,
const std::string& profile_name,
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool,
DataReaderListener* listener = nullptr,
const StatusMask& mask = StatusMask::all());

/**
* This operation deletes a DataReader that belongs to the Subscriber.
*
Expand Down
21 changes: 19 additions & 2 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ DataWriterImpl::DataWriterImpl(
TypeSupport type,
Topic* topic,
const DataWriterQos& qos,
DataWriterListener* listen)
DataWriterListener* listen,
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool)
: publisher_(p)
, type_(type)
, topic_(topic)
Expand Down Expand Up @@ -174,6 +175,12 @@ DataWriterImpl::DataWriterImpl(
fastrtps::rtps::RTPSParticipantImpl::preprocess_endpoint_attributes<WRITER, 0x03, 0x02>(
EntityId_t::unknown(), publisher_->get_participant_impl()->id_counter(), endpoint_attributes, guid_.entityId);
guid_.guidPrefix = publisher_->get_participant_impl()->guid().guidPrefix;

if (payload_pool != nullptr)
{
is_custom_payload_pool_ = true;
payload_pool_ = payload_pool;
}
}

DataWriterImpl::DataWriterImpl(
Expand Down Expand Up @@ -1981,7 +1988,7 @@ bool DataWriterImpl::release_payload_pool()

bool result = true;

if (is_data_sharing_compatible_)
if (is_data_sharing_compatible_ || is_custom_payload_pool_)
{
// No-op
}
Expand Down Expand Up @@ -2036,6 +2043,11 @@ ReturnCode_t DataWriterImpl::check_datasharing_compatible(
return ReturnCode_t::RETCODE_OK;
break;
case DataSharingKind::ON:
if (is_custom_payload_pool_)
{
EPROSIMA_LOG_ERROR(DATA_WRITER, "Custom payload pool detected. Cannot force Data sharing usage.");
return ReturnCode_t::RETCODE_INCONSISTENT_POLICY;
}
#if HAVE_SECURITY
if (has_security_enabled)
{
Expand All @@ -2061,6 +2073,11 @@ ReturnCode_t DataWriterImpl::check_datasharing_compatible(
return ReturnCode_t::RETCODE_OK;
break;
case DataSharingKind::AUTO:
if (is_custom_payload_pool_)
{
EPROSIMA_LOG_INFO(DATA_WRITER, "Custom payload pool detected. Data Sharing disabled.");
return ReturnCode_t::RETCODE_OK;
}
#if HAVE_SECURITY
if (has_security_enabled)
{
Expand Down
5 changes: 4 additions & 1 deletion src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
TypeSupport type,
Topic* topic,
const DataWriterQos& qos,
DataWriterListener* listener = nullptr);
DataWriterListener* listener = nullptr,
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool = nullptr);

DataWriterImpl(
PublisherImpl* p,
Expand Down Expand Up @@ -488,6 +489,8 @@ class DataWriterImpl : protected rtps::IReaderDataFilter

std::shared_ptr<IPayloadPool> payload_pool_;

bool is_custom_payload_pool_ = false;

std::unique_ptr<LoanCollection> loans_;

fastrtps::rtps::GUID_t guid_;
Expand Down
24 changes: 22 additions & 2 deletions src/cpp/fastdds/publisher/Publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,17 @@ DataWriter* Publisher::create_datawriter(
DataWriterListener* listener,
const StatusMask& mask)
{
return impl_->create_datawriter(topic, qos, listener, mask);
return impl_->create_datawriter(topic, qos, listener, mask, nullptr);
}

DataWriter* Publisher::create_datawriter_with_payload_pool(
Topic* topic,
const DataWriterQos& qos,
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool,
DataWriterListener* listener,
const StatusMask& mask)
{
return impl_->create_datawriter(topic, qos, listener, mask, payload_pool);
}

DataWriter* Publisher::create_datawriter_with_profile(
Expand All @@ -123,7 +133,17 @@ DataWriter* Publisher::create_datawriter_with_profile(
DataWriterListener* listener,
const StatusMask& mask)
{
return impl_->create_datawriter_with_profile(topic, profile_name, listener, mask);
return impl_->create_datawriter_with_profile(topic, profile_name, listener, mask, nullptr);
}

DataWriter* Publisher::create_datawriter_with_profile_with_payload_pool(
Topic* topic,
const std::string& profile_name,
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool,
DataWriterListener* listener,
const StatusMask& mask)
{
return impl_->create_datawriter_with_profile(topic, profile_name, listener, mask, payload_pool);
}

ReturnCode_t Publisher::delete_datawriter(
Expand Down
15 changes: 9 additions & 6 deletions src/cpp/fastdds/publisher/PublisherImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,18 @@ DataWriterImpl* PublisherImpl::create_datawriter_impl(
const TypeSupport& type,
Topic* topic,
const DataWriterQos& qos,
DataWriterListener* listener)
DataWriterListener* listener,
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool)
{
return new DataWriterImpl(this, type, topic, qos, listener);
return new DataWriterImpl(this, type, topic, qos, listener, payload_pool);
}

DataWriter* PublisherImpl::create_datawriter(
Topic* topic,
const DataWriterQos& qos,
DataWriterListener* listener,
const StatusMask& mask)
const StatusMask& mask,
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool)
{
EPROSIMA_LOG_INFO(PUBLISHER, "CREATING WRITER IN TOPIC: " << topic->get_name());
//Look for the correct type registration
Expand All @@ -234,7 +236,7 @@ DataWriter* PublisherImpl::create_datawriter(
return nullptr;
}

DataWriterImpl* impl = create_datawriter_impl(type_support, topic, qos, listener);
DataWriterImpl* impl = create_datawriter_impl(type_support, topic, qos, listener, payload_pool);
return create_datawriter(topic, impl, mask);
}

Expand Down Expand Up @@ -269,15 +271,16 @@ DataWriter* PublisherImpl::create_datawriter_with_profile(
Topic* topic,
const std::string& profile_name,
DataWriterListener* listener,
const StatusMask& mask)
const StatusMask& mask,
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool)
{
// TODO (ILG): Change when we have full XML support for DDS QoS profiles
PublisherAttributes attr;
if (XMLP_ret::XML_OK == XMLProfileManager::fillPublisherAttributes(profile_name, attr))
{
DataWriterQos qos = default_datawriter_qos_;
utils::set_qos_from_attributes(qos, attr);
return create_datawriter(topic, qos, listener, mask);
return create_datawriter(topic, qos, listener, mask, payload_pool);
}

return nullptr;
Expand Down
9 changes: 6 additions & 3 deletions src/cpp/fastdds/publisher/PublisherImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,15 @@ class PublisherImpl
Topic* topic,
const DataWriterQos& qos,
DataWriterListener* listener,
const StatusMask& mask = StatusMask::all());
const StatusMask& mask = StatusMask::all(),
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool = nullptr);

DataWriter* create_datawriter_with_profile(
Topic* topic,
const std::string& profile_name,
DataWriterListener* listener,
const StatusMask& mask = StatusMask::all());
const StatusMask& mask = StatusMask::all(),
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool = nullptr);

ReturnCode_t delete_datawriter(
const DataWriter* writer);
Expand Down Expand Up @@ -255,7 +257,8 @@ class PublisherImpl
const TypeSupport& type,
Topic* topic,
const DataWriterQos& qos,
DataWriterListener* listener);
DataWriterListener* listener,
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool);

static void set_qos(
PublisherQos& to,
Expand Down
31 changes: 24 additions & 7 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ DataReaderImpl::DataReaderImpl(
const TypeSupport& type,
TopicDescription* topic,
const DataReaderQos& qos,
DataReaderListener* listener)
DataReaderListener* listener,
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool)
: subscriber_(s)
, type_(type)
, topic_(topic)
Expand All @@ -124,6 +125,12 @@ DataReaderImpl::DataReaderImpl(
RTPSParticipantImpl::preprocess_endpoint_attributes<READER, 0x04, 0x07>(
EntityId_t::unknown(), subscriber_->get_participant_impl()->id_counter(), endpoint_attributes, guid_.entityId);
guid_.guidPrefix = subscriber_->get_participant_impl()->guid().guidPrefix;

if (payload_pool != nullptr)
{
is_custom_payload_pool_ = true;
payload_pool_ = payload_pool;
}
}

ReturnCode_t DataReaderImpl::enable()
Expand Down Expand Up @@ -1724,22 +1731,32 @@ std::shared_ptr<IPayloadPool> DataReaderImpl::get_payload_pool()

PoolConfig config = PoolConfig::from_history_attributes(history_.m_att);

if (!payload_pool_)
if (!sample_pool_)
{
payload_pool_ = TopicPayloadPoolRegistry::get(topic_->get_impl()->get_rtps_topic_name(), config);
sample_pool_ = std::make_shared<detail::SampleLoanManager>(config, type_);
}

payload_pool_->reserve_history(config, true);
if (!is_custom_payload_pool_)
{
std::shared_ptr<ITopicPayloadPool> topic_payload_pool = TopicPayloadPoolRegistry::get(
topic_->get_impl()->get_rtps_topic_name(), config);
topic_payload_pool->reserve_history(config, true);
payload_pool_ = topic_payload_pool;
}
return payload_pool_;
}

void DataReaderImpl::release_payload_pool()
{
assert(payload_pool_);

PoolConfig config = PoolConfig::from_history_attributes(history_.m_att);
payload_pool_->release_history(config, true);
if (!is_custom_payload_pool_)
{
PoolConfig config = PoolConfig::from_history_attributes(history_.m_att);
std::shared_ptr<fastrtps::rtps::ITopicPayloadPool> topic_payload_pool =
std::dynamic_pointer_cast<fastrtps::rtps::ITopicPayloadPool>(payload_pool_);
topic_payload_pool->release_history(config, true);
}

payload_pool_.reset();
}

Expand Down
7 changes: 5 additions & 2 deletions src/cpp/fastdds/subscriber/DataReaderImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class DataReaderImpl
const TypeSupport& type,
TopicDescription* topic,
const DataReaderQos& qos,
DataReaderListener* listener = nullptr);
DataReaderListener* listener = nullptr,
std::shared_ptr<fastrtps::rtps::IPayloadPool> payload_pool = nullptr);

public:

Expand Down Expand Up @@ -482,8 +483,10 @@ class DataReaderImpl

DataReader* user_datareader_ = nullptr;

std::shared_ptr<ITopicPayloadPool> payload_pool_;
std::shared_ptr<detail::SampleLoanManager> sample_pool_;
std::shared_ptr<IPayloadPool> payload_pool_;

bool is_custom_payload_pool_ = false;

detail::SampleInfoPool sample_info_pool_;
detail::DataReaderLoanManager loan_manager_;
Expand Down
Loading

0 comments on commit 5e4583d

Please sign in to comment.