Skip to content

Commit

Permalink
DataWriter/Reader get_matched_publication/subscription() Tests & Fe…
Browse files Browse the repository at this point in the history
…ature 3.x (#5312)

* Refs #21808: DataWriter/Reader get_matched_publication/subscription() Test implementation 3.x

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21808: DataWriter/Reader get_matched_publication/subscription() Feature implementation 3.x

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21808: versions.md

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21808: Update UnsupportedData*Methods

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21808: Apply Ricardo's rev

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21808: Apply last suggestion

Signed-off-by: Mario-DL <mariodominguez@eprosima.com>

---------

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
Signed-off-by: Mario-DL <mariodominguez@eprosima.com>
  • Loading branch information
Mario-DL authored Oct 25, 2024
1 parent 2a1cdf6 commit 4b00904
Show file tree
Hide file tree
Showing 44 changed files with 1,592 additions and 57 deletions.
24 changes: 24 additions & 0 deletions include/fastdds/rtps/participant/RTPSParticipant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,30 @@ class FASTDDS_EXPORTED_API RTPSParticipant
*/
std::vector<TransportNetmaskFilterInfo> get_netmask_filter_info() const;

/**
* @brief Fills the provided publication discovery data with the information of the
* writer identified by writer_guid.
*
* @param[out] data publication discovery data to fill.
* @param[in] writer_guid GUID of the writer to get the information from.
* @return True if the writer was found and the data was filled.
*/
bool get_publication_info(
fastdds::rtps::PublicationBuiltinTopicData& data,
const GUID_t& writer_guid) const;

/**
* @brief Fills the provided subscription discovery data with the information of the
* reader identified by reader_guid.
*
* @param[out] data subscription discovery data to fill.
* @param[in] reader_guid GUID of the reader to get the information from.
* @return True if the reader was found and the data was filled.
*/
bool get_subscription_info(
fastdds::rtps::SubscriptionBuiltinTopicData& data,
const GUID_t& reader_guid) const;

#if HAVE_SECURITY

/**
Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/rtps/reader/RTPSReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ class RTPSReader : public Endpoint
FASTDDS_EXPORTED_API virtual void set_content_filter(
eprosima::fastdds::rtps::IReaderDataFilter* filter) = 0;

/**
* @brief Fills the provided vector with the GUIDs of the matched writers.
*
* @param[out] guids Vector to be filled with the GUIDs of the matched writers.
* @return True if the operation was successful.
*/
FASTDDS_EXPORTED_API virtual bool matched_writers_guids(
std::vector<GUID_t>& guids) const = 0;

/**
* @brief Read the next unread CacheChange_t from the history.
*
Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/rtps/writer/RTPSWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ class RTPSWriter : public Endpoint
*/
FASTDDS_EXPORTED_API virtual bool get_disable_positive_acks() const = 0;

/**
* @brief Fills the provided vector with the GUIDs of the matched readers.
*
* @param[out] guids Vector to be filled with the GUIDs of the matched readers.
* @return True if the operation was successful.
*/
FASTDDS_EXPORTED_API virtual bool matched_readers_guids(
std::vector<GUID_t>& guids) const = 0;

#ifdef FASTDDS_STATISTICS

/**
Expand Down
13 changes: 2 additions & 11 deletions src/cpp/fastdds/publisher/DataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,22 +270,13 @@ ReturnCode_t DataWriter::get_matched_subscription_data(
SubscriptionBuiltinTopicData& subscription_data,
const InstanceHandle_t& subscription_handle) const
{
static_cast<void> (subscription_data);
static_cast<void> (subscription_handle);
return RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_subscription_data(subscription_data, subscription_handle);
*/
return impl_->get_matched_subscription_data(subscription_data, subscription_handle);
}

ReturnCode_t DataWriter::get_matched_subscriptions(
std::vector<InstanceHandle_t>& subscription_handles) const
{
static_cast<void> (subscription_handles);
return RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_subscription_data(subscription_handles);
*/
return impl_->get_matched_subscriptions(subscription_handles);
}

ReturnCode_t DataWriter::clear_history(
Expand Down
42 changes: 42 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2335,6 +2335,48 @@ void DataWriterImpl::filter_is_being_removed(
}
}

ReturnCode_t DataWriterImpl::get_matched_subscription_data(
SubscriptionBuiltinTopicData& subscription_data,
const InstanceHandle_t& subscription_handle) const
{
ReturnCode_t ret = RETCODE_BAD_PARAMETER;
fastdds::rtps::GUID_t reader_guid = iHandle2GUID(subscription_handle);

if (writer_ && writer_->matched_reader_is_matched(reader_guid))
{
if (publisher_)
{
RTPSParticipant* rtps_participant = publisher_->rtps_participant();
if (rtps_participant &&
rtps_participant->get_subscription_info(subscription_data, reader_guid))
{
ret = RETCODE_OK;
}
}
}

return ret;
}

ReturnCode_t DataWriterImpl::get_matched_subscriptions(
std::vector<InstanceHandle_t>& subscription_handles) const
{
ReturnCode_t ret = RETCODE_ERROR;
std::vector<rtps::GUID_t> matched_reader_guids;
subscription_handles.clear();

if (writer_ && writer_->matched_readers_guids(matched_reader_guids))
{
for (const rtps::GUID_t& guid : matched_reader_guids)
{
subscription_handles.emplace_back(InstanceHandle_t(guid));
}
ret = RETCODE_OK;
}

return ret;
}

bool DataWriterImpl::is_relevant(
const fastdds::rtps::CacheChange_t& change,
const fastdds::rtps::GUID_t& reader_guid) const
Expand Down
25 changes: 25 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,31 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
void filter_is_being_removed(
const char* filter_class_name);

/**
* @brief Retrieves in a subscription associated with the @ref DataWriter
*
* @param[out] subscription_data subscription data struct
* @param subscription_handle @ref InstanceHandle_t of the subscription
* @return @ref RETCODE_BAD_PARAMETER if the DataWriter is not matched with
* the given subscription handle, @ref RETCODE_OK otherwise.
*
*/
ReturnCode_t get_matched_subscription_data(
SubscriptionBuiltinTopicData& subscription_data,
const InstanceHandle_t& subscription_handle) const;

/**
* @brief Fills the given vector with the @ref InstanceHandle_t of matched DataReaders
*
* @param[out] subscription_handles Vector where the @ref InstanceHandle_t are returned
* @return @ref RETCODE_OK if the operation succeeds.
*
* @note Returning an empty list is not an error, it returns @ref RETCODE_OK.
*
*/
ReturnCode_t get_matched_subscriptions(
std::vector<InstanceHandle_t>& subscription_handles) const;

/**
* Retrieve the publication data discovery information.
*
Expand Down
13 changes: 2 additions & 11 deletions src/cpp/fastdds/subscriber/DataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,22 +394,13 @@ ReturnCode_t DataReader::get_matched_publication_data(
PublicationBuiltinTopicData& publication_data,
const fastdds::rtps::InstanceHandle_t& publication_handle) const
{
static_cast<void> (publication_data);
static_cast<void> (publication_handle);
return RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_publication_data(publication_data, publication_handle);
*/
return impl_->get_matched_publication_data(publication_data, publication_handle);
}

ReturnCode_t DataReader::get_matched_publications(
std::vector<InstanceHandle_t>& publication_handles) const
{
static_cast<void> (publication_handles);
return RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_publication_data(publication_handles);
*/
return impl_->get_matched_publications(publication_handles);
}

ReadCondition* DataReader::create_readcondition(
Expand Down
42 changes: 42 additions & 0 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,48 @@ ReturnCode_t DataReaderImpl::get_subscription_matched_status(
return RETCODE_OK;
}

ReturnCode_t DataReaderImpl::get_matched_publication_data(
PublicationBuiltinTopicData& publication_data,
const InstanceHandle_t& publication_handle) const
{
ReturnCode_t ret = RETCODE_BAD_PARAMETER;
fastdds::rtps::GUID_t writer_guid = iHandle2GUID(publication_handle);

if (reader_ && reader_->matched_writer_is_matched(writer_guid))
{
if (subscriber_)
{
RTPSParticipant* rtps_participant = subscriber_->rtps_participant();
if (rtps_participant &&
rtps_participant->get_publication_info(publication_data, writer_guid))
{
ret = RETCODE_OK;
}
}
}

return ret;
}

ReturnCode_t DataReaderImpl::get_matched_publications(
std::vector<InstanceHandle_t>& publication_handles) const
{
ReturnCode_t ret = RETCODE_ERROR;
std::vector<rtps::GUID_t> matched_writers_guids;
publication_handles.clear();

if (reader_ && reader_->matched_writers_guids(matched_writers_guids))
{
for (const rtps::GUID_t& guid : matched_writers_guids)
{
publication_handles.emplace_back(InstanceHandle_t(guid));
}
ret = RETCODE_OK;
}

return ret;
}

bool DataReaderImpl::deadline_timer_reschedule()
{
assert(qos_.deadline().period != dds::c_TimeInfinite);
Expand Down
25 changes: 25 additions & 0 deletions src/cpp/fastdds/subscriber/DataReaderImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,31 @@ class DataReaderImpl
ReturnCode_t get_subscription_matched_status(
SubscriptionMatchedStatus& status);

/**
* @brief Retrieves in a publication associated with the DataWriter
*
* @param[out] publication_data publication data struct
* @param publication_handle @ref InstanceHandle_t of the publication
* @return @ref RETCODE_BAD_PARAMETER if the DataReader is not matched with
* the given publication handle, @ref RETCODE_OK otherwise.
*
*/
ReturnCode_t get_matched_publication_data(
rtps::PublicationBuiltinTopicData& publication_data,
const InstanceHandle_t& publication_handle) const;

/**
* @brief Fills the given vector with the @ref InstanceHandle_t of matched DataReaders
*
* @param[out] publication_handles Vector where the @ref InstanceHandle_t are returned
* @return @ref RETCODE_OK if the operation succeeds.
*
* @note Returning an empty list is not an error, it returns @ref RETCODE_OK.
*
*/
ReturnCode_t get_matched_publications(
std::vector<InstanceHandle_t>& publication_handles) const;

ReturnCode_t get_requested_deadline_missed_status(
RequestedDeadlineMissedStatus& status);

Expand Down
14 changes: 14 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,20 @@ std::vector<TransportNetmaskFilterInfo> RTPSParticipant::get_netmask_filter_info
return mp_impl->get_netmask_filter_info();
}

bool RTPSParticipant::get_publication_info(
PublicationBuiltinTopicData& data,
const GUID_t& writer_guid) const
{
return mp_impl->get_publication_info(data, writer_guid);
}

bool RTPSParticipant::get_subscription_info(
SubscriptionBuiltinTopicData& data,
const GUID_t& reader_guid) const
{
return mp_impl->get_subscription_info(data, reader_guid);
}

#if HAVE_SECURITY

bool RTPSParticipant::is_security_enabled_for_writer(
Expand Down
34 changes: 34 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2791,6 +2791,40 @@ std::vector<TransportNetmaskFilterInfo> RTPSParticipantImpl::get_netmask_filter_
return m_network_Factory.netmask_filter_info();
}

bool RTPSParticipantImpl::get_publication_info(
PublicationBuiltinTopicData& data,
const GUID_t& writer_guid) const
{
bool ret = false;
WriterProxyData wproxy_data(m_att.allocation.locators.max_unicast_locators,
m_att.allocation.locators.max_multicast_locators);

if (mp_builtinProtocols->mp_PDP->lookupWriterProxyData(writer_guid, wproxy_data))
{
from_proxy_to_builtin(wproxy_data, data);
ret = true;
}

return ret;
}

bool RTPSParticipantImpl::get_subscription_info(
SubscriptionBuiltinTopicData& data,
const GUID_t& reader_guid) const
{
bool ret = false;
ReaderProxyData rproxy_data(m_att.allocation.locators.max_unicast_locators,
m_att.allocation.locators.max_multicast_locators);

if (mp_builtinProtocols->mp_PDP->lookupReaderProxyData(reader_guid, rproxy_data))
{
from_proxy_to_builtin(rproxy_data, data);
ret = true;
}

return ret;
}

#ifdef FASTDDS_STATISTICS

bool RTPSParticipantImpl::register_in_writer(
Expand Down
24 changes: 24 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,30 @@ class RTPSParticipantImpl
*/
std::vector<TransportNetmaskFilterInfo> get_netmask_filter_info() const;

/**
* @brief Fills the provided @ref PublicationBuiltinTopicData with the information of the
* writer identified by writer_guid.
*
* @param[out] data @ref PublicationBuiltinTopicData to fill.
* @param[in] writer_guid GUID of the writer to get the information from.
* @return True if the writer was found and the data was filled.
*/
bool get_publication_info(
PublicationBuiltinTopicData& data,
const GUID_t& writer_guid) const;

/**
* @brief Fills the provided @ref SubscriptionBuiltinTopicData with the information of the
* reader identified by reader_guid.
*
* @param[out] data @ref SubscriptionBuiltinTopicData to fill.
* @param[in] reader_guid GUID of the reader to get the information from.
* @return True if the reader was found and the data was filled.
*/
bool get_subscription_info(
SubscriptionBuiltinTopicData& data,
const GUID_t& reader_guid) const;

template <EndpointKind_t kind, octet no_key, octet with_key>
static bool preprocess_endpoint_attributes(
const EntityId_t& entity_id,
Expand Down
13 changes: 13 additions & 0 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1492,6 +1492,19 @@ void StatefulReader::end_sample_access_nts(
}
}

bool StatefulReader::matched_writers_guids(
std::vector<GUID_t>& guids) const
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
guids.clear();
guids.reserve(matched_writers_.size());
for (WriterProxy* writer : matched_writers_)
{
guids.emplace_back(writer->guid());
}
return true;
}

#ifdef FASTDDS_STATISTICS

bool StatefulReader::get_connections(
Expand Down
Loading

0 comments on commit 4b00904

Please sign in to comment.