Skip to content

Commit

Permalink
Enforce ignore_local_endpoints on XML participants (#81)
Browse files Browse the repository at this point in the history
* Enforce ignore_local_endpoints on XML participants

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Improve the readability and safety of the DDS CommonReader

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Uncrustify

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Apply suggestions

Signed-off-by: tempate <danieldiaz@eprosima.com>

---------

Signed-off-by: tempate <danieldiaz@eprosima.com>
  • Loading branch information
Tempate authored Jan 9, 2024
1 parent 20606d1 commit 16b1acb
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 54 deletions.
4 changes: 2 additions & 2 deletions ddspipe_core/src/cpp/communication/dds/Track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ void Track::transmit_() noexcept

if (ret == utils::ReturnCode::RETCODE_NO_DATA)
{
// There is no more data, so reduce in 1 the status
// There is no more data; reduce the status by 1
unsigned int previous_status = data_available_status_.fetch_sub(DataAvailableStatus::transmitting_data);
if (previous_status == DataAvailableStatus::transmitting_data)
{
Expand Down Expand Up @@ -271,7 +271,7 @@ void Track::transmit_() noexcept
}
}

// Let the data to be removed by itself
// Let the data be removed by itself
}
}

Expand Down
1 change: 1 addition & 0 deletions ddspipe_core/src/cpp/types/data/RtpsPayloadData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ RtpsPayloadData::RtpsPayloadData()
RtpsPayloadData::~RtpsPayloadData()
{
logDebug(DDSPIPE_PAYLOAD, "Deleting Payload " << this << ": " << *this << ".");

// If payload owner exists and payload has size, release it correctly in pool
if (payload_owner && payload.length > 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ class XmlParticipant
fastdds::dds::DomainParticipantQos
reckon_participant_qos_() const override;

virtual
fastdds::dds::DomainParticipant*
create_dds_participant_() override;

/////////////////////////
// INTERNAL VARIABLES
/////////////////////////
Expand Down
44 changes: 20 additions & 24 deletions ddspipe_participants/src/cpp/participant/dds/XmlParticipant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <cpp_utils/exception/InitializationException.hpp>

#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastrtps/xmlparser/XMLProfileManager.h>

#include <cpp_utils/Log.hpp>
#include <cpp_utils/exception/ConfigurationException.hpp>
Expand All @@ -30,14 +31,26 @@ namespace ddspipe {
namespace participants {
namespace dds {


using namespace eprosima::fastrtps::xmlparser;


XmlParticipant::XmlParticipant(
const std::shared_ptr<XmlParticipantConfiguration>& participant_configuration,
const std::shared_ptr<core::PayloadPool>& payload_pool,
const std::shared_ptr<core::DiscoveryDatabase>& discovery_database)
: CommonParticipant(participant_configuration, payload_pool, discovery_database)
, xml_specific_configuration_(*reinterpret_cast<XmlParticipantConfiguration*>(configuration_.get()))
{
// Do nothing
// Replace the configuration's domain with the XML's domainId
eprosima::fastrtps::ParticipantAttributes attr;

if (xml_specific_configuration_.participant_profile.is_set() &&
XMLProfileManager::fillParticipantAttributes(xml_specific_configuration_.participant_profile
.get_value(), attr) == XMLP_ret::XML_OK)
{
configuration_->domain = attr.domainId;
}
}

std::shared_ptr<core::IWriter> XmlParticipant::create_writer(
Expand Down Expand Up @@ -76,11 +89,9 @@ std::shared_ptr<core::IReader> XmlParticipant::create_reader(

fastdds::dds::DomainParticipantQos XmlParticipant::reckon_participant_qos_() const
{
// NOTE: Due to the creation of the participant using overriden create_dds_participant_
// this method is never called. However we keep it for the possible future.
fastdds::dds::DomainParticipantQos qos = CommonParticipant::reckon_participant_qos_();

// If participant profile have been set, use it
// Use the participant's profile if it has been set
if (xml_specific_configuration_.participant_profile.is_set())
{
auto res = fastdds::dds::DomainParticipantFactory::get_instance()->get_participant_qos_from_profile(
Expand All @@ -96,27 +107,12 @@ fastdds::dds::DomainParticipantQos XmlParticipant::reckon_participant_qos_() con
}
}

return qos;
}
// Enforce ignore local endpoints on XML participants
qos.properties().properties().emplace_back(
"fastdds.ignore_local_endpoints",
"true");

fastdds::dds::DomainParticipant* XmlParticipant::create_dds_participant_()
{
// Set listener mask so reader read its own messages
fastdds::dds::StatusMask mask;
mask << fastdds::dds::StatusMask::publication_matched();
mask << fastdds::dds::StatusMask::subscription_matched();

if (xml_specific_configuration_.participant_profile.is_set())
{
return eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant_with_profile(
xml_specific_configuration_.participant_profile.get_value(),
this,
mask);
}
else
{
return CommonParticipant::create_dds_participant_();
}
return qos;
}

} /* namespace dds */
Expand Down
46 changes: 22 additions & 24 deletions ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ void CommonReader::on_data_available(
}

CommonReader::CommonReader(
const core::types::ParticipantId& participant_id,
const core::types::DdsTopic& topic,
const ParticipantId& participant_id,
const DdsTopic& topic,
const std::shared_ptr<core::PayloadPool>& payload_pool,
fastdds::dds::DomainParticipant* participant,
fastdds::dds::Topic* topic_entity)
Expand All @@ -117,8 +117,8 @@ CommonReader::CommonReader(
utils::ReturnCode CommonReader::take_nts_(
std::unique_ptr<core::IRoutingData>& data) noexcept
{
// NOTE: we assume this function is called always from same thread
// NOTE: we assume this function is called always with nullptr data
// NOTE: we assume this function is always called from the same thread
// NOTE: we assume this function is always called with nullptr data

logInfo(DDSPIPE_DDS_READER, "Taking data in " << participant_id_ << " for topic " << topic_ << ".");

Expand All @@ -128,41 +128,39 @@ utils::ReturnCode CommonReader::take_nts_(
return utils::ReturnCode::RETCODE_NO_DATA;
}

RtpsPayloadData* rtps_data;
std::unique_ptr<RtpsPayloadData> rtps_data;
fastdds::dds::SampleInfo info;

// Loop for read messages until we receive one that does not come from same participant
// NOTE: not reading local messages must be done in this loop because:
// 1. there is no way in DDS to ignore this reader from writer as there were in RTPS
// 2. ignore_local_endpoints would be override by xml configuration
while (true)
do
{
// Ensure that the previous Payload gets destroyed to avoid memory leaks.
rtps_data = new core::types::RtpsPayloadData();
data.reset(rtps_data);
rtps_data.reset(new RtpsPayloadData());

auto ret = reader_->take_next_sample(rtps_data, &info);
auto ret = reader_->take_next_sample(rtps_data.get(), &info);

// Save the payload owner so that the memory is freed correctly.
// If the payload owner is not set, rtps_data won't release the payload on destruction
rtps_data->payload_owner = payload_pool_.get();

// If error reading data
if (!ret)
{
// There has been an error taking the data. Exit.
return ret;
}

// Check if the sample is acceptable
if (should_accept_sample_(info))
{
break;
}
}
} while (!should_accept_sample_(info));

logInfo(DDSPIPE_DDS_READER, "Data taken in " << participant_id_ << " for topic " << topic_ << ".");

// Verify that the rtps_data object is valid
if (!rtps_data)
{
logError(DDSPIPE_DDS_READER, "The data taken by the reader is not valid.");
return utils::ReturnCode::RETCODE_ERROR;
}

fill_received_data_(info, *rtps_data);

// data is a unique_ptr; the memory will be handled correctly.
data.reset(rtps_data.release());

return utils::ReturnCode::RETCODE_OK;
}

Expand Down Expand Up @@ -245,7 +243,7 @@ bool CommonReader::should_accept_sample_(

void CommonReader::fill_received_data_(
const fastdds::dds::SampleInfo& info,
core::types::RtpsPayloadData& data_to_fill) const noexcept
RtpsPayloadData& data_to_fill) const noexcept
{
// Store the new data that has arrived in the Track data
// Get the writer guid
Expand Down

0 comments on commit 16b1acb

Please sign in to comment.