diff --git a/ddspipe_core/src/cpp/communication/dds/Track.cpp b/ddspipe_core/src/cpp/communication/dds/Track.cpp index 4b769ca4..4d65c0a2 100644 --- a/ddspipe_core/src/cpp/communication/dds/Track.cpp +++ b/ddspipe_core/src/cpp/communication/dds/Track.cpp @@ -222,7 +222,7 @@ void Track::transmit_() noexcept if (ret == utils::ReturnCode::RETCODE_NO_DATA) { - // There is no more data, so reduce in 1 the status + // There is no more data; reduce the status by 1 unsigned int previous_status = data_available_status_.fetch_sub(DataAvailableStatus::transmitting_data); if (previous_status == DataAvailableStatus::transmitting_data) { @@ -271,7 +271,7 @@ void Track::transmit_() noexcept } } - // Let the data to be removed by itself + // Let the data be removed by itself } } diff --git a/ddspipe_core/src/cpp/types/data/RtpsPayloadData.cpp b/ddspipe_core/src/cpp/types/data/RtpsPayloadData.cpp index fc43547f..0050aca8 100644 --- a/ddspipe_core/src/cpp/types/data/RtpsPayloadData.cpp +++ b/ddspipe_core/src/cpp/types/data/RtpsPayloadData.cpp @@ -29,6 +29,7 @@ RtpsPayloadData::RtpsPayloadData() RtpsPayloadData::~RtpsPayloadData() { logDebug(DDSPIPE_PAYLOAD, "Deleting Payload " << this << ": " << *this << "."); + // If payload owner exists and payload has size, release it correctly in pool if (payload_owner && payload.length > 0) { diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dds/XmlParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dds/XmlParticipant.hpp index ff80d30e..3c687cac 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dds/XmlParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dds/XmlParticipant.hpp @@ -64,10 +64,6 @@ class XmlParticipant fastdds::dds::DomainParticipantQos reckon_participant_qos_() const override; - virtual - fastdds::dds::DomainParticipant* - create_dds_participant_() override; - ///////////////////////// // INTERNAL VARIABLES ///////////////////////// diff --git a/ddspipe_participants/src/cpp/participant/dds/XmlParticipant.cpp b/ddspipe_participants/src/cpp/participant/dds/XmlParticipant.cpp index ec796294..a4b08208 100644 --- a/ddspipe_participants/src/cpp/participant/dds/XmlParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dds/XmlParticipant.cpp @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -30,6 +31,10 @@ namespace ddspipe { namespace participants { namespace dds { + +using namespace eprosima::fastrtps::xmlparser; + + XmlParticipant::XmlParticipant( const std::shared_ptr& participant_configuration, const std::shared_ptr& payload_pool, @@ -37,7 +42,15 @@ XmlParticipant::XmlParticipant( : CommonParticipant(participant_configuration, payload_pool, discovery_database) , xml_specific_configuration_(*reinterpret_cast(configuration_.get())) { - // Do nothing + // Replace the configuration's domain with the XML's domainId + eprosima::fastrtps::ParticipantAttributes attr; + + if (xml_specific_configuration_.participant_profile.is_set() && + XMLProfileManager::fillParticipantAttributes(xml_specific_configuration_.participant_profile + .get_value(), attr) == XMLP_ret::XML_OK) + { + configuration_->domain = attr.domainId; + } } std::shared_ptr XmlParticipant::create_writer( @@ -76,11 +89,9 @@ std::shared_ptr XmlParticipant::create_reader( fastdds::dds::DomainParticipantQos XmlParticipant::reckon_participant_qos_() const { - // NOTE: Due to the creation of the participant using overriden create_dds_participant_ - // this method is never called. However we keep it for the possible future. fastdds::dds::DomainParticipantQos qos = CommonParticipant::reckon_participant_qos_(); - // If participant profile have been set, use it + // Use the participant's profile if it has been set if (xml_specific_configuration_.participant_profile.is_set()) { auto res = fastdds::dds::DomainParticipantFactory::get_instance()->get_participant_qos_from_profile( @@ -96,27 +107,12 @@ fastdds::dds::DomainParticipantQos XmlParticipant::reckon_participant_qos_() con } } - return qos; -} + // Enforce ignore local endpoints on XML participants + qos.properties().properties().emplace_back( + "fastdds.ignore_local_endpoints", + "true"); -fastdds::dds::DomainParticipant* XmlParticipant::create_dds_participant_() -{ - // Set listener mask so reader read its own messages - fastdds::dds::StatusMask mask; - mask << fastdds::dds::StatusMask::publication_matched(); - mask << fastdds::dds::StatusMask::subscription_matched(); - - if (xml_specific_configuration_.participant_profile.is_set()) - { - return eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant_with_profile( - xml_specific_configuration_.participant_profile.get_value(), - this, - mask); - } - else - { - return CommonParticipant::create_dds_participant_(); - } + return qos; } } /* namespace dds */ diff --git a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp index c59e4e4c..ed0a3809 100644 --- a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp @@ -98,8 +98,8 @@ void CommonReader::on_data_available( } CommonReader::CommonReader( - const core::types::ParticipantId& participant_id, - const core::types::DdsTopic& topic, + const ParticipantId& participant_id, + const DdsTopic& topic, const std::shared_ptr& payload_pool, fastdds::dds::DomainParticipant* participant, fastdds::dds::Topic* topic_entity) @@ -117,8 +117,8 @@ CommonReader::CommonReader( utils::ReturnCode CommonReader::take_nts_( std::unique_ptr& data) noexcept { - // NOTE: we assume this function is called always from same thread - // NOTE: we assume this function is called always with nullptr data + // NOTE: we assume this function is always called from the same thread + // NOTE: we assume this function is always called with nullptr data logInfo(DDSPIPE_DDS_READER, "Taking data in " << participant_id_ << " for topic " << topic_ << "."); @@ -128,41 +128,39 @@ utils::ReturnCode CommonReader::take_nts_( return utils::ReturnCode::RETCODE_NO_DATA; } - RtpsPayloadData* rtps_data; + std::unique_ptr rtps_data; fastdds::dds::SampleInfo info; - // Loop for read messages until we receive one that does not come from same participant - // NOTE: not reading local messages must be done in this loop because: - // 1. there is no way in DDS to ignore this reader from writer as there were in RTPS - // 2. ignore_local_endpoints would be override by xml configuration - while (true) + do { - // Ensure that the previous Payload gets destroyed to avoid memory leaks. - rtps_data = new core::types::RtpsPayloadData(); - data.reset(rtps_data); + rtps_data.reset(new RtpsPayloadData()); - auto ret = reader_->take_next_sample(rtps_data, &info); + auto ret = reader_->take_next_sample(rtps_data.get(), &info); - // Save the payload owner so that the memory is freed correctly. + // If the payload owner is not set, rtps_data won't release the payload on destruction rtps_data->payload_owner = payload_pool_.get(); - // If error reading data if (!ret) { + // There has been an error taking the data. Exit. return ret; } - - // Check if the sample is acceptable - if (should_accept_sample_(info)) - { - break; - } - } + } while (!should_accept_sample_(info)); logInfo(DDSPIPE_DDS_READER, "Data taken in " << participant_id_ << " for topic " << topic_ << "."); + // Verify that the rtps_data object is valid + if (!rtps_data) + { + logError(DDSPIPE_DDS_READER, "The data taken by the reader is not valid."); + return utils::ReturnCode::RETCODE_ERROR; + } + fill_received_data_(info, *rtps_data); + // data is a unique_ptr; the memory will be handled correctly. + data.reset(rtps_data.release()); + return utils::ReturnCode::RETCODE_OK; } @@ -245,7 +243,7 @@ bool CommonReader::should_accept_sample_( void CommonReader::fill_received_data_( const fastdds::dds::SampleInfo& info, - core::types::RtpsPayloadData& data_to_fill) const noexcept + RtpsPayloadData& data_to_fill) const noexcept { // Store the new data that has arrived in the Track data // Get the writer guid