Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notify status changes [11835] #2030

Merged
merged 9 commits into from
Jul 5, 2021
9 changes: 1 addition & 8 deletions include/fastdds/dds/core/Entity.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class Entity
RTPS_DllAPI Entity(
const StatusMask& mask = StatusMask::all())
: status_mask_(mask)
, status_changes_(StatusMask::none())
, status_condition_(this)
, enable_(false)
{
Expand Down Expand Up @@ -92,10 +91,7 @@ class Entity
*
* @return const reference to the StatusMask with the triggered statuses set to 1
*/
RTPS_DllAPI const StatusMask& get_status_changes() const
{
return status_changes_;
}
RTPS_DllAPI const StatusMask& get_status_changes() const;

/**
* @brief Retrieves the instance handler that represents the Entity
Expand Down Expand Up @@ -145,9 +141,6 @@ class Entity
//! StatusMask with relevant statuses set to 1
StatusMask status_mask_;

//! StatusMask with triggered statuses set to 1
StatusMask status_changes_;

//! Condition associated to the Entity
StatusCondition status_condition_;

Expand Down
2 changes: 1 addition & 1 deletion include/fastdds/dds/core/condition/StatusCondition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class StatusCondition : public Condition
*/
RTPS_DllAPI Entity* get_entity() const;

detail::StatusConditionImpl* get_impl()
detail::StatusConditionImpl* get_impl() const
{
return impl_.get();
}
Expand Down
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ set(${PROJECT_NAME}_source_files
dynamic-types/DynamicDataHelper.cpp

fastrtps_deprecated/attributes/TopicAttributes.cpp
fastdds/core/Entity.cpp
fastdds/core/condition/Condition.cpp
fastdds/core/condition/ConditionNotifier.cpp
fastdds/core/condition/GuardCondition.cpp
Expand Down
35 changes: 35 additions & 0 deletions src/cpp/fastdds/core/Entity.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file Entity.cpp
*
*/

#include <fastdds/dds/core/Entity.hpp>

#include <fastdds/core/condition/StatusConditionImpl.hpp>

namespace eprosima {
namespace fastdds {
namespace dds {

const StatusMask& Entity::get_status_changes() const
{
return status_condition_.get_impl()->get_raw_status();
}

} // namespace dds
} // namespace fastdds
} // namespace eprosima
9 changes: 9 additions & 0 deletions src/cpp/fastdds/core/condition/StatusConditionImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ struct StatusConditionImpl
*/
const StatusMask& get_enabled_statuses() const;

/**
* @brief Retrieves the list of communication statuses that are currently triggered.
* @return Triggered status.
*/
const StatusMask& get_raw_status() const
{
return status_;
}

/**
* @brief Set the trigger value of a specific status
* @param status The status for which to change the trigger value
Expand Down
6 changes: 1 addition & 5 deletions src/cpp/fastdds/publisher/DataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,7 @@ ReturnCode_t DataWriter::get_offered_incompatible_qos_status(
ReturnCode_t DataWriter::get_publication_matched_status(
PublicationMatchedStatus& status) const
{
static_cast<void> (status);
return ReturnCode_t::RETCODE_UNSUPPORTED;
/*
return impl_->get_publication_matched_status(status);
*/
return impl_->get_publication_matched_status(status);
}

ReturnCode_t DataWriter::get_liveliness_lost_status(
Expand Down
125 changes: 92 additions & 33 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,37 @@
#include <fastrtps/config.h>

#include <fastdds/publisher/DataWriterImpl.hpp>

#include <functional>
#include <iostream>

#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastrtps/attributes/TopicAttributes.h>
#include <fastdds/publisher/PublisherImpl.hpp>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/publisher/PublisherListener.hpp>

#include <fastdds/rtps/RTPSDomain.h>
#include <fastdds/rtps/builtin/liveliness/WLP.h>
#include <fastdds/rtps/participant/RTPSParticipant.h>
#include <fastdds/rtps/resources/ResourceEvent.h>
#include <fastdds/rtps/resources/TimedEvent.h>
#include <fastdds/rtps/writer/RTPSWriter.h>
#include <fastdds/rtps/writer/StatefulWriter.h>

#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/rtps/participant/RTPSParticipant.h>
#include <fastdds/rtps/RTPSDomain.h>

#include <fastdds/dds/log/Log.hpp>
#include <fastdds/publisher/PublisherImpl.hpp>
#include <fastrtps/attributes/TopicAttributes.h>
#include <fastrtps/utils/TimeConversion.h>
#include <fastdds/rtps/resources/ResourceEvent.h>
#include <fastdds/rtps/resources/TimedEvent.h>
#include <fastdds/rtps/builtin/liveliness/WLP.h>

#include <fastdds/core/condition/StatusConditionImpl.hpp>
#include <fastdds/core/policy/ParameterSerializer.hpp>
#include <fastdds/core/policy/QosPolicyUtils.hpp>

#include <rtps/history/TopicPayloadPoolRegistry.hpp>
#include <rtps/DataSharing/DataSharingPayloadPool.hpp>
#include <rtps/participant/RTPSParticipantImpl.h>

#include <functional>
#include <iostream>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace std::chrono;
Expand Down Expand Up @@ -949,19 +951,16 @@ void DataWriterImpl::InnerDataWriterListener::onWriterMatched(
RTPSWriter* /*writer*/,
const PublicationMatchedStatus& info)
{
DataWriterListener* listener = data_writer_->get_listener_for(StatusMask::publication_matched());
if (listener != nullptr)
{
listener->on_publication_matched(data_writer_->user_datawriter_, info);
}
data_writer_->update_publication_matched_status(info);
}

void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos(
RTPSWriter* /*writer*/,
fastdds::dds::PolicyMask qos)
{
data_writer_->update_offered_incompatible_qos(qos);
DataWriterListener* listener = data_writer_->get_listener_for(StatusMask::offered_incompatible_qos());
StatusMask notify_status = StatusMask::offered_incompatible_qos();
DataWriterListener* listener = data_writer_->get_listener_for(notify_status);
if (listener != nullptr)
{
OfferedIncompatibleQosStatus callback_status;
Expand All @@ -970,6 +969,7 @@ void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos(
listener->on_offered_incompatible_qos(data_writer_->user_datawriter_, callback_status);
}
}
data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
}

void DataWriterImpl::InnerDataWriterListener::onWriterChangeReceivedByAll(
Expand All @@ -992,12 +992,14 @@ void DataWriterImpl::InnerDataWriterListener::on_liveliness_lost(
fastrtps::rtps::RTPSWriter* /*writer*/,
const fastrtps::LivelinessLostStatus& status)
{
DataWriterListener* listener = data_writer_->get_listener_for(StatusMask::liveliness_lost());
StatusMask notify_status = StatusMask::liveliness_lost();
DataWriterListener* listener = data_writer_->get_listener_for(notify_status);
if (listener != nullptr)
{
listener->on_liveliness_lost(
data_writer_->user_datawriter_, status);
}
data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
}

ReturnCode_t DataWriterImpl::wait_for_acknowledgments(
Expand All @@ -1015,6 +1017,50 @@ ReturnCode_t DataWriterImpl::wait_for_acknowledgments(
return ReturnCode_t::RETCODE_ERROR;
}

void DataWriterImpl::update_publication_matched_status(
const PublicationMatchedStatus& status)
{
auto count_change = status.current_count_change;
publication_matched_status_.current_count += count_change;
publication_matched_status_.current_count_change += count_change;
if (count_change > 0)
{
publication_matched_status_.total_count += count_change;
publication_matched_status_.total_count_change += count_change;
publication_matched_status_.last_subscription_handle = status.last_subscription_handle;
}

StatusMask notify_status = StatusMask::publication_matched();
DataWriterListener* listener = get_listener_for(notify_status);
if (listener != nullptr)
{
listener->on_publication_matched(user_datawriter_, publication_matched_status_);
publication_matched_status_.current_count_change = 0;
publication_matched_status_.total_count_change = 0;
}
user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
}

ReturnCode_t DataWriterImpl::get_publication_matched_status(
PublicationMatchedStatus& status)
{
if (writer_ == nullptr)
{
return ReturnCode_t::RETCODE_NOT_ENABLED;
}

{
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());

status = publication_matched_status_;
publication_matched_status_.current_count_change = 0;
publication_matched_status_.total_count_change = 0;
}

user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::publication_matched(), false);
return ReturnCode_t::RETCODE_OK;
}

bool DataWriterImpl::deadline_timer_reschedule()
{
assert(qos_.deadline().period != c_TimeInfinite);
Expand Down Expand Up @@ -1042,12 +1088,14 @@ bool DataWriterImpl::deadline_missed()
deadline_missed_status_.total_count++;
deadline_missed_status_.total_count_change++;
deadline_missed_status_.last_instance_handle = timer_owner_;
if (listener_ != nullptr)
StatusMask notify_status = StatusMask::offered_deadline_missed();
auto listener = get_listener_for(notify_status);
if (nullptr != listener)
{
listener_->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_);
deadline_missed_status_.total_count_change = 0;
}
publisher_->publisher_listener_.on_offered_deadline_missed(user_datawriter_, deadline_missed_status_);
deadline_missed_status_.total_count_change = 0;
user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);

if (!history_.set_next_deadline(
timer_owner_,
Expand All @@ -1067,10 +1115,14 @@ ReturnCode_t DataWriterImpl::get_offered_deadline_missed_status(
return ReturnCode_t::RETCODE_NOT_ENABLED;
}

std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
{
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());

status = deadline_missed_status_;
deadline_missed_status_.total_count_change = 0;
}

status = deadline_missed_status_;
deadline_missed_status_.total_count_change = 0;
user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::offered_deadline_missed(), false);
return ReturnCode_t::RETCODE_OK;
}

Expand All @@ -1082,10 +1134,14 @@ ReturnCode_t DataWriterImpl::get_offered_incompatible_qos_status(
return ReturnCode_t::RETCODE_NOT_ENABLED;
}

std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
{
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());

status = offered_incompatible_qos_status_;
offered_incompatible_qos_status_.total_count_change = 0u;
}

status = offered_incompatible_qos_status_;
offered_incompatible_qos_status_.total_count_change = 0u;
user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::offered_incompatible_qos(), false);
return ReturnCode_t::RETCODE_OK;
}

Expand Down Expand Up @@ -1139,13 +1195,16 @@ ReturnCode_t DataWriterImpl::get_liveliness_lost_status(
return ReturnCode_t::RETCODE_NOT_ENABLED;
}

std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
{
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());

status.total_count = writer_->liveliness_lost_status_.total_count;
status.total_count_change = writer_->liveliness_lost_status_.total_count_change;
status.total_count = writer_->liveliness_lost_status_.total_count;
status.total_count_change = writer_->liveliness_lost_status_.total_count_change;

writer_->liveliness_lost_status_.total_count_change = 0u;
writer_->liveliness_lost_status_.total_count_change = 0u;
}

user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::liveliness_lost(), false);
return ReturnCode_t::RETCODE_OK;
}

Expand Down
9 changes: 9 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ class DataWriterImpl
ReturnCode_t wait_for_acknowledgments(
const fastrtps::Duration_t& max_wait);

ReturnCode_t get_publication_matched_status(
PublicationMatchedStatus& status);

ReturnCode_t get_offered_deadline_missed_status(
fastrtps::OfferedDeadlineMissedStatus& status);

Expand Down Expand Up @@ -343,6 +346,9 @@ class DataWriterImpl
//! The current timer owner, i.e. the instance which started the deadline timer
InstanceHandle_t timer_owner_;

//! The publication matched status
PublicationMatchedStatus publication_matched_status_;

//! The offered deadline missed status
fastrtps::OfferedDeadlineMissedStatus deadline_missed_status_;

Expand Down Expand Up @@ -414,6 +420,9 @@ class DataWriterImpl
*/
bool remove_min_seq_change();

void update_publication_matched_status(
const PublicationMatchedStatus& status);

/**
* @brief A method called when an instance misses the deadline
*/
Expand Down
6 changes: 1 addition & 5 deletions src/cpp/fastdds/subscriber/DataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,7 @@ ReturnCode_t DataReader::get_sample_rejected_status(
ReturnCode_t DataReader::get_subscription_matched_status(
SubscriptionMatchedStatus& status) const
{
static_cast<void> (status);
return ReturnCode_t::RETCODE_UNSUPPORTED;
/*
return impl_->get_subscription_matched_status(status);
*/
return impl_->get_subscription_matched_status(status);
}

ReturnCode_t DataReader::get_matched_publication_data(
Expand Down
Loading