Skip to content

Commit

Permalink
Statistics link between DDS and RTPS layers (#1924)
Browse files Browse the repository at this point in the history
* MOCK_METHOD->MOCK_METHOD1

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Added empty DomainParticipantStatisticsListener.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Fixed DataWriter header guard.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Added empty DomainParticipantStatisticsListenerTests.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Added implementation of DomainParticipantStatisticsListenerTests.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Added implementation of DomainParticipantStatisticsListener.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Added simple blackbox test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Join transform and check topic name.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Add listener.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Link / unlink statistics writer to listener.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Proxy methods to create PublisherImpl and SubscriberImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Alias namespace.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Basic statistics PublisherImpl and SubscriberImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Adressed partial review.

* Refs 11273. Proxy method to create DataWriterImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. PublisherImpl creates statistics DataWriterImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Add / remove listener on DataWriterImpl enable / disable.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Blackbox test waits for data.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Extend blackbox test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Proxy method to create DataReaderImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. SubscriberImpl creates statistics DataReaderImpl

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Add / remove listener on DataReaderImpl enable / disable.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Added DomainParticipantImpl::is_statistics_topic_name

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Do not add listeners to builtin topics.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Link listener to participant.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Avoid ABBA on listener

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Avoid ABBA on StatisticsBase.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Improving test

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Add sender_guid to participant sendSync.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Utilities for statistics builtin GUIDs.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Method on_rtps_sent avoids statistics builtin endpoints.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. PublisherImpl method to create DataWriter from DataWriterImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. DomainParticipantImpl keeps builtin_publisher_impl_

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Proxy method to create RTPS writer.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Keep entity_id on DataWriterImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Added API on RTPSDomain to create writer with specific entity id.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Statistics RTPS writers created with specific entity_id.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Enable RTPS_SENT on blackbox test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Fix alignment issue on Entity2LocatorTrafficPubSubType.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Fix unused variable warning.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Fixed build with statistics disabled.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Header suggestions on sources.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Co-authored-by: José Luis Bueno López <69244257+JLBuenoLopez-eProsima@users.noreply.github.com>

* Refs 11273. Protect against double enable.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Fixed warning on Windows.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Custom entity id indicates keyed writer.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #11273: fix DomainParticipantImpl mock class

Signed-off-by: JLBuenoLopez-eProsima <joseluisbueno@eprosima.com>

* Refs 11273. Make RTPSStatisticsTests compliant with old versions of gmock.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11273. Check on delete_statistics_builtin_entities.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #11273: fix mock statistics tests

Signed-off-by: JLBuenoLopez-eProsima <joseluisbueno@eprosima.com>

* Refs #11273: linters

Signed-off-by: JLBuenoLopez-eProsima <joseluisbueno@eprosima.com>

Co-authored-by: José Luis Bueno López <69244257+JLBuenoLopez-eProsima@users.noreply.github.com>
Co-authored-by: JLBuenoLopez-eProsima <joseluisbueno@eprosima.com>
  • Loading branch information
3 people committed Apr 28, 2021
1 parent cc0eb74 commit 153b1ab
Show file tree
Hide file tree
Showing 45 changed files with 1,738 additions and 214 deletions.
6 changes: 3 additions & 3 deletions include/fastdds/dds/publisher/DataWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
* @file DataWriter.hpp
*/

#ifndef _FASTRTPS_DATAWRITER_HPP_
#define _FASTRTPS_DATAWRITER_HPP_
#ifndef _FASTDDS_DDS_PUBLISHER_DATAWRITER_HPP_
#define _FASTDDS_DDS_PUBLISHER_DATAWRITER_HPP_

#include <fastdds/dds/builtin/topic/SubscriptionBuiltinTopicData.hpp>
#include <fastdds/dds/core/Entity.hpp>
Expand Down Expand Up @@ -550,4 +550,4 @@ class DataWriter : public DomainEntity
} /* namespace fastdds */
} /* namespace eprosima */

#endif //_FASTRTPS_DATAWRITER_HPP_
#endif // _FASTDDS_DDS_PUBLISHER_DATAWRITER_HPP_
18 changes: 18 additions & 0 deletions include/fastdds/rtps/RTPSDomain.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,24 @@ class RTPSDomain
WriterHistory* hist,
WriterListener* listen = nullptr);

/**
* Create a RTPSWriter in a participant.
* @param p Pointer to the RTPSParticipant.
* @param entity_id Specific entity id to use for the created writer.
* @param watt Writer Attributes.
* @param payload_pool Shared pointer to the IPayloadPool
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
* @return Pointer to the created RTPSWriter.
*/
RTPS_DllAPI static RTPSWriter* createRTPSWriter(
RTPSParticipant* p,
const EntityId_t& entity_id,
WriterAttributes& watt,
const std::shared_ptr<IPayloadPool>& payload_pool,
WriterHistory* hist,
WriterListener* listen = nullptr);

/**
* Remove a RTPSWriter.
* @param writer Pointer to the writer you want to remove.
Expand Down
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ if (FASTDDS_STATISTICS)

set(statistics_sources
statistics/fastdds/domain/DomainParticipantImpl.cpp
statistics/fastdds/domain/DomainParticipantStatisticsListener.cpp
statistics/rtps/StatisticsBase.cpp
statistics/rtps/reader/StatisticsReaderImpl.cpp
statistics/rtps/writer/StatisticsWriterImpl.cpp
Expand Down
18 changes: 16 additions & 2 deletions src/cpp/fastdds/domain/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ Publisher* DomainParticipantImpl::create_publisher(
}

//TODO CONSTRUIR LA IMPLEMENTACION DENTRO DEL OBJETO DEL USUARIO.
PublisherImpl* pubimpl = new PublisherImpl(this, qos, listener);
PublisherImpl* pubimpl = create_publisher_impl(qos, listener);
Publisher* pub = new Publisher(pubimpl, mask);
pubimpl->user_publisher_ = pub;
pubimpl->rtps_participant_ = rtps_participant_;
Expand Down Expand Up @@ -517,6 +517,13 @@ Publisher* DomainParticipantImpl::create_publisher_with_profile(
return nullptr;
}

PublisherImpl* DomainParticipantImpl::create_publisher_impl(
const PublisherQos& qos,
PublisherListener* listener)
{
return new PublisherImpl(this, qos, listener);
}

/* TODO
Subscriber* DomainParticipantImpl::get_builtin_subscriber()
{
Expand Down Expand Up @@ -872,7 +879,7 @@ Subscriber* DomainParticipantImpl::create_subscriber(
}

//TODO CONSTRUIR LA IMPLEMENTACION DENTRO DEL OBJETO DEL USUARIO.
SubscriberImpl* subimpl = new SubscriberImpl(this, qos, listener);
SubscriberImpl* subimpl = create_subscriber_impl(qos, listener);
Subscriber* sub = new Subscriber(subimpl, mask);
subimpl->user_subscriber_ = sub;
subimpl->rtps_participant_ = this->rtps_participant_;
Expand Down Expand Up @@ -918,6 +925,13 @@ Subscriber* DomainParticipantImpl::create_subscriber_with_profile(
return nullptr;
}

SubscriberImpl* DomainParticipantImpl::create_subscriber_impl(
const SubscriberQos& qos,
SubscriberListener* listener)
{
return new SubscriberImpl(this, qos, listener);
}

Topic* DomainParticipantImpl::create_topic(
const std::string& topic_name,
const std::string& type_name,
Expand Down
10 changes: 9 additions & 1 deletion src/cpp/fastdds/domain/DomainParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ class DomainParticipantImpl
std::function<void(const std::string& name, const fastrtps::types::DynamicType_ptr type)>& callback);

//! Remove all listeners in the hierarchy to allow a quiet destruction
void disable();
virtual void disable();

/**
* This method checks if the DomainParticipant has created an entity that has not been
Expand Down Expand Up @@ -531,6 +531,14 @@ class DomainParticipantImpl
const fastrtps::rtps::SampleIdentity& requestId,
const fastrtps::types::TypeIdentifierWithSizeSeq& dependencies);

virtual PublisherImpl* create_publisher_impl(
const PublisherQos& qos,
PublisherListener* listener);

virtual SubscriberImpl* create_subscriber_impl(
const SubscriberQos& qos,
SubscriberListener* listener);

// Always call it with the mutex already taken
void remove_parent_request(
const fastrtps::rtps::SampleIdentity& request);
Expand Down
12 changes: 11 additions & 1 deletion src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ DataWriterImpl::DataWriterImpl(
{
}

fastrtps::rtps::RTPSWriter* DataWriterImpl::create_rtps_writer(
fastrtps::rtps::RTPSParticipant* p,
fastrtps::rtps::WriterAttributes& watt,
const std::shared_ptr<IPayloadPool>& payload_pool,
fastrtps::rtps::WriterHistory* hist,
fastrtps::rtps::WriterListener* listen)
{
return RTPSDomain::createRTPSWriter(p, watt, payload_pool, hist, listen);
}

ReturnCode_t DataWriterImpl::enable()
{
assert(writer_ == nullptr);
Expand Down Expand Up @@ -230,7 +240,7 @@ ReturnCode_t DataWriterImpl::enable()
return ReturnCode_t::RETCODE_ERROR;
}

RTPSWriter* writer = RTPSDomain::createRTPSWriter(
RTPSWriter* writer = create_rtps_writer(
publisher_->rtps_participant(),
w_att, pool,
static_cast<WriterHistory*>(&history_),
Expand Down
11 changes: 9 additions & 2 deletions src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class DataWriterImpl

virtual ~DataWriterImpl();

ReturnCode_t enable();
virtual ReturnCode_t enable();

ReturnCode_t check_delete_preconditions();

Expand Down Expand Up @@ -253,7 +253,7 @@ class DataWriterImpl
ReturnCode_t assert_liveliness();

//! Remove all listeners in the hierarchy to allow a quiet destruction
void disable();
virtual void disable();

/**
* Removes all changes from the History.
Expand Down Expand Up @@ -365,6 +365,13 @@ class DataWriterImpl

std::unique_ptr<LoanCollection> loans_;

virtual fastrtps::rtps::RTPSWriter* create_rtps_writer(
fastrtps::rtps::RTPSParticipant* p,
fastrtps::rtps::WriterAttributes& watt,
const std::shared_ptr<IPayloadPool>& payload_pool,
fastrtps::rtps::WriterHistory* hist,
fastrtps::rtps::WriterListener* listen);

/**
*
* @param kind
Expand Down
25 changes: 18 additions & 7 deletions src/cpp/fastdds/publisher/PublisherImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,15 @@ void PublisherImpl::PublisherWriterListener::on_offered_deadline_missed(
}
}

DataWriterImpl* PublisherImpl::create_datawriter_impl(
const TypeSupport& type,
Topic* topic,
const DataWriterQos& qos,
DataWriterListener* listener)
{
return new DataWriterImpl(this, type, topic, qos, listener);
}

DataWriter* PublisherImpl::create_datawriter(
Topic* topic,
const DataWriterQos& qos,
Expand All @@ -256,14 +265,16 @@ DataWriter* PublisherImpl::create_datawriter(
return nullptr;
}

topic->get_impl()->reference();
DataWriterImpl* impl = create_datawriter_impl(type_support, topic, qos, listener);
return create_datawriter(topic, impl, mask);
}

DataWriterImpl* impl = new DataWriterImpl(
this,
type_support,
topic,
qos,
listener);
DataWriter* PublisherImpl::create_datawriter(
Topic* topic,
DataWriterImpl* impl,
const StatusMask& mask)
{
topic->get_impl()->reference();

DataWriter* writer = new DataWriter(impl, mask);
impl->user_datawriter_ = writer;
Expand Down
11 changes: 11 additions & 0 deletions src/cpp/fastdds/publisher/PublisherImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ class PublisherImpl
ReturnCode_t set_listener(
PublisherListener* listener);

DataWriter* create_datawriter(
Topic* topic,
DataWriterImpl* impl,
const StatusMask& mask);

DataWriter* create_datawriter(
Topic* topic,
const DataWriterQos& qos,
Expand Down Expand Up @@ -240,6 +245,12 @@ class PublisherImpl

fastrtps::rtps::InstanceHandle_t handle_;

virtual DataWriterImpl* create_datawriter_impl(
const TypeSupport& type,
Topic* topic,
const DataWriterQos& qos,
DataWriterListener* listener);

static void set_qos(
PublisherQos& to,
const PublisherQos& from,
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ static bool qos_has_specific_locators(

DataReaderImpl::DataReaderImpl(
SubscriberImpl* s,
TypeSupport& type,
const TypeSupport& type,
TopicDescription* topic,
const DataReaderQos& qos,
DataReaderListener* listener)
Expand Down
6 changes: 3 additions & 3 deletions src/cpp/fastdds/subscriber/DataReaderImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class DataReaderImpl
*/
DataReaderImpl(
SubscriberImpl* s,
TypeSupport& type,
const TypeSupport& type,
TopicDescription* topic,
const DataReaderQos& qos,
DataReaderListener* listener = nullptr);
Expand All @@ -102,7 +102,7 @@ class DataReaderImpl

virtual ~DataReaderImpl();

ReturnCode_t enable();
virtual ReturnCode_t enable();

bool can_be_deleted() const;

Expand Down Expand Up @@ -261,7 +261,7 @@ class DataReaderImpl
*/

//! Remove all listeners in the hierarchy to allow a quiet destruction
void disable();
virtual void disable();

/* Check whether values in the DataReaderQos are compatible among them or not
* @return True if correct.
Expand Down
17 changes: 10 additions & 7 deletions src/cpp/fastdds/subscriber/SubscriberImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,15 @@ ReturnCode_t SubscriberImpl::set_listener(
return ReturnCode_t::RETCODE_OK;
}

DataReaderImpl* SubscriberImpl::create_datareader_impl(
const TypeSupport& type,
TopicDescription* topic,
const DataReaderQos& qos,
DataReaderListener* listener)
{
return new DataReaderImpl(this, type, topic, qos, listener);
}

DataReader* SubscriberImpl::create_datareader(
TopicDescription* topic,
const DataReaderQos& qos,
Expand All @@ -226,13 +235,7 @@ DataReader* SubscriberImpl::create_datareader(

topic->get_impl()->reference();

DataReaderImpl* impl = new DataReaderImpl(
this,
type_support,
topic,
qos,
listener);

DataReaderImpl* impl = create_datareader_impl(type_support, topic, qos, listener);
DataReader* reader = new DataReader(impl, mask);
impl->user_datareader_ = reader;

Expand Down
5 changes: 5 additions & 0 deletions src/cpp/fastdds/subscriber/SubscriberImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ class SubscriberImpl

fastrtps::rtps::InstanceHandle_t handle_;

virtual DataReaderImpl* create_datareader_impl(
const TypeSupport& type,
TopicDescription* topic,
const DataReaderQos& qos,
DataReaderListener* listener);
};

} /* namespace dds */
Expand Down
21 changes: 21 additions & 0 deletions src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,27 @@ RTPSWriter* RTPSDomain::createRTPSWriter(
return nullptr;
}

RTPSWriter* RTPSDomain::createRTPSWriter(
RTPSParticipant* p,
const EntityId_t& entity_id,
WriterAttributes& watt,
const std::shared_ptr<IPayloadPool>& payload_pool,
WriterHistory* hist,
WriterListener* listen)
{
RTPSParticipantImpl* impl = RTPSDomainImpl::find_local_participant(p->getGuid());
if (impl)
{
RTPSWriter* ret_val = nullptr;
if (impl->createWriter(&ret_val, watt, payload_pool, hist, listen, entity_id))
{
return ret_val;
}
}

return nullptr;
}

bool RTPSDomain::removeRTPSWriter(
RTPSWriter* writer)
{
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/builtin/data/WriterProxyData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -992,11 +992,11 @@ bool WriterProxyData::readFromCDRMessage(
{
if (ParameterList::readParameterListfromCDRMsg(*msg, param_process, true, qos_size))
{
if (m_guid.entityId.value[3] == 0x03)
if (0x03 == (m_guid.entityId.value[3] & 0x0F))
{
m_topicKind = NO_KEY;
}
else if (m_guid.entityId.value[3] == 0x02)
else if (0x02 == (m_guid.entityId.value[3] & 0x0F))
{
m_topicKind = WITH_KEY;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ DirectMessageSender::DirectMessageSender(
for (const GUID_t& guid : *guids)
{
if (std::find(participant_guids_.begin(), participant_guids_.end(), guid.guidPrefix) ==
participant_guids_.end())
participant_guids_.end())
{
participant_guids_.push_back(guid.guidPrefix);
}
Expand Down Expand Up @@ -98,7 +98,8 @@ bool DirectMessageSender::send(
CDRMessage_t* message,
std::chrono::steady_clock::time_point& max_blocking_time_point) const
{
return participant_->sendSync(message, Locators(locators_->begin()), Locators(locators_->end()), max_blocking_time_point);
return participant_->sendSync(message, participant_->getGuid(),
Locators(locators_->begin()), Locators(locators_->end()), max_blocking_time_point);
}

} /* namespace rtps */
Expand Down
Loading

0 comments on commit 153b1ab

Please sign in to comment.