diff --git a/include/fastdds/dds/core/Entity.hpp b/include/fastdds/dds/core/Entity.hpp index 0cbe744248b..96cc0d4f639 100644 --- a/include/fastdds/dds/core/Entity.hpp +++ b/include/fastdds/dds/core/Entity.hpp @@ -45,7 +45,7 @@ class Entity RTPS_DllAPI Entity( const StatusMask& mask = StatusMask::all()) : status_mask_(mask) - , status_changes_(StatusMask::none()) + , status_condition_(this) , enable_(false) { } @@ -91,10 +91,7 @@ class Entity * * @return const reference to the StatusMask with the triggered statuses set to 1 */ - RTPS_DllAPI const StatusMask& get_status_changes() const - { - return status_changes_; - } + RTPS_DllAPI const StatusMask& get_status_changes() const; /** * @brief Retrieves the instance handler that represents the Entity @@ -124,9 +121,8 @@ class Entity * @brief Allows access to the StatusCondition associated with the Entity * @return Reference to StatusCondition object */ - RTPS_DllAPI const StatusCondition& get_statuscondition() const + RTPS_DllAPI StatusCondition& get_statuscondition() { - logWarning(CONDITION, "get_statuscondition method not implemented"); return status_condition_; } @@ -145,9 +141,6 @@ class Entity //! StatusMask with relevant statuses set to 1 StatusMask status_mask_; - //! StatusMask with triggered statuses set to 1 - StatusMask status_changes_; - //! Condition associated to the Entity StatusCondition status_condition_; diff --git a/include/fastdds/dds/core/condition/Condition.hpp b/include/fastdds/dds/core/condition/Condition.hpp index 552cc0ee0a0..2131b581e98 100644 --- a/include/fastdds/dds/core/condition/Condition.hpp +++ b/include/fastdds/dds/core/condition/Condition.hpp @@ -20,14 +20,21 @@ #ifndef _FASTDDS_CONDITION_HPP_ #define _FASTDDS_CONDITION_HPP_ -#include +#include #include + #include +#include namespace eprosima { namespace fastdds { namespace dds { +// Forward declaration of implementation details +namespace detail { +struct ConditionNotifier; +} // namespace detail + /** * @brief The Condition class is the root base class for all the conditions that may be attached to a WaitSet. */ @@ -35,21 +42,30 @@ class Condition { public: - // Condition class not implemented. - /** * @brief Retrieves the trigger_value of the Condition * @return true if trigger_value is set to 'true', 'false' otherwise */ - RTPS_DllAPI bool get_trigger_value() const + RTPS_DllAPI virtual bool get_trigger_value() const { logWarning(CONDITION, "get_trigger_value public member function not implemented"); return false; // TODO return trigger value } + detail::ConditionNotifier* get_notifier() const + { + return notifier_.get(); + } + +protected: + + Condition(); + virtual ~Condition(); + + std::unique_ptr notifier_; }; -typedef std::vector ConditionSeq; +using ConditionSeq = std::vector; } // namespace dds } // namespace fastdds diff --git a/include/fastdds/dds/core/condition/GuardCondition.hpp b/include/fastdds/dds/core/condition/GuardCondition.hpp index 1f6aed02465..1912fb8c92f 100644 --- a/include/fastdds/dds/core/condition/GuardCondition.hpp +++ b/include/fastdds/dds/core/condition/GuardCondition.hpp @@ -20,6 +20,8 @@ #ifndef _FASTDDS_GUARD_CONDITION_HPP_ #define _FASTDDS_GUARD_CONDITION_HPP_ +#include + #include #include #include @@ -43,7 +45,11 @@ class GuardCondition : public Condition { public: - // GuardCondition not implemented. + RTPS_DllAPI GuardCondition(); + + RTPS_DllAPI ~GuardCondition(); + + RTPS_DllAPI bool get_trigger_value() const override; /** * @brief Set the trigger_value @@ -51,15 +57,13 @@ class GuardCondition : public Condition * @return RETURN_OK */ RTPS_DllAPI ReturnCode_t set_trigger_value( - bool value) - { - static_cast(value); - return ReturnCode_t::RETCODE_UNSUPPORTED; - } + bool value); -}; +private: + + std::atomic trigger_value_; -typedef std::vector ConditionSeq; +}; } // namespace dds } // namespace fastdds diff --git a/include/fastdds/dds/core/condition/StatusCondition.hpp b/include/fastdds/dds/core/condition/StatusCondition.hpp index d877fb12f71..a3f5ecb4bad 100644 --- a/include/fastdds/dds/core/condition/StatusCondition.hpp +++ b/include/fastdds/dds/core/condition/StatusCondition.hpp @@ -31,6 +31,12 @@ namespace eprosima { namespace fastdds { namespace dds { +namespace detail { + +struct StatusConditionImpl; + +} // namespace detail + class Entity; /** @@ -41,7 +47,28 @@ class StatusCondition : public Condition { public: - // StatusCondition not implemented. + StatusCondition( + Entity* parent); + + ~StatusCondition() final; + + // Non-copyable + StatusCondition( + const StatusCondition&) = delete; + StatusCondition& operator =( + const StatusCondition&) = delete; + + // Non-movable + StatusCondition( + StatusCondition&&) = delete; + StatusCondition& operator =( + StatusCondition&&) = delete; + + /** + * @brief Retrieves the trigger_value of the Condition + * @return true if trigger_value is set to 'true', 'false' otherwise + */ + RTPS_DllAPI bool get_trigger_value() const override; /** * @brief Defines the list of communication statuses that are taken into account to determine the trigger_value @@ -63,10 +90,18 @@ class StatusCondition : public Condition */ RTPS_DllAPI Entity* get_entity() const; + detail::StatusConditionImpl* get_impl() const + { + return impl_.get(); + } + protected: - //! StatusMask with relevant statuses set to 1 - StatusMask status_mask; + //! DDS Entity for which this condition is monitoring the status + Entity* entity_ = nullptr; + + //! Class implementation + std::unique_ptr impl_; }; diff --git a/include/fastdds/dds/core/condition/WaitSet.hpp b/include/fastdds/dds/core/condition/WaitSet.hpp index a1ab9454bd6..2066aaede7f 100644 --- a/include/fastdds/dds/core/condition/WaitSet.hpp +++ b/include/fastdds/dds/core/condition/WaitSet.hpp @@ -20,6 +20,8 @@ #ifndef _FASTDDS_WAIT_SET_HPP_ #define _FASTDDS_WAIT_SET_HPP_ +#include + #include #include #include @@ -31,6 +33,11 @@ namespace eprosima { namespace fastdds { namespace dds { +// Forward declaration of implementation details +namespace detail { +struct WaitSetImpl; +} // namespace detail + /** * @brief The WaitSet class allows an application to wait until one or more of the attached Condition objects * has a trigger_value of TRUE or until timeout expires. @@ -40,7 +47,18 @@ class WaitSet { public: - // WaitSet class not implemented. + RTPS_DllAPI WaitSet(); + + RTPS_DllAPI ~WaitSet(); + + WaitSet( + const WaitSet&) = delete; + WaitSet( + WaitSet&&) = delete; + WaitSet& operator = ( + const WaitSet&) = delete; + WaitSet& operator = ( + WaitSet&&) = delete; /** * @brief Attaches a Condition to the Wait Set. @@ -79,6 +97,10 @@ class WaitSet */ RTPS_DllAPI ReturnCode_t get_conditions( ConditionSeq& attached_conditions) const; + +private: + + std::unique_ptr impl_; }; } // namespace dds diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index ebafef49a53..1ff26c41486 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -154,8 +154,14 @@ set(${PROJECT_NAME}_source_files dynamic-types/DynamicDataHelper.cpp fastrtps_deprecated/attributes/TopicAttributes.cpp + fastdds/core/Entity.cpp + fastdds/core/condition/Condition.cpp + fastdds/core/condition/ConditionNotifier.cpp + fastdds/core/condition/GuardCondition.cpp fastdds/core/condition/StatusCondition.cpp + fastdds/core/condition/StatusConditionImpl.cpp fastdds/core/condition/WaitSet.cpp + fastdds/core/condition/WaitSetImpl.cpp fastdds/core/policy/ParameterList.cpp fastdds/core/policy/QosPolicyUtils.cpp fastdds/publisher/qos/WriterQos.cpp diff --git a/src/cpp/fastdds/core/Entity.cpp b/src/cpp/fastdds/core/Entity.cpp new file mode 100644 index 00000000000..f6b8d4adcb6 --- /dev/null +++ b/src/cpp/fastdds/core/Entity.cpp @@ -0,0 +1,35 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file Entity.cpp + * + */ + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { + +const StatusMask& Entity::get_status_changes() const +{ + return status_condition_.get_impl()->get_raw_status(); +} + +} // namespace dds +} // namespace fastdds +} // namespace eprosima diff --git a/src/cpp/fastdds/core/condition/Condition.cpp b/src/cpp/fastdds/core/condition/Condition.cpp new file mode 100644 index 00000000000..89601d59872 --- /dev/null +++ b/src/cpp/fastdds/core/condition/Condition.cpp @@ -0,0 +1,39 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file Condition.cpp + */ + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { + +Condition::Condition() + : notifier_ (new detail::ConditionNotifier()) +{ +} + +Condition::~Condition() +{ + notifier_->will_be_deleted(*this); +} + +} // namespace dds +} // namespace fastdds +} // namespace eprosima diff --git a/src/cpp/fastdds/core/condition/ConditionNotifier.cpp b/src/cpp/fastdds/core/condition/ConditionNotifier.cpp new file mode 100644 index 00000000000..0b7159d539f --- /dev/null +++ b/src/cpp/fastdds/core/condition/ConditionNotifier.cpp @@ -0,0 +1,75 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ConditionNotifier.cpp + */ + +#include "ConditionNotifier.hpp" + +#include + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +void ConditionNotifier::attach_to ( + WaitSetImpl* wait_set) +{ + if (nullptr != wait_set) + { + std::lock_guard guard(mutex_); + entries_.remove(wait_set); + entries_.emplace_back(wait_set); + } +} + +void ConditionNotifier::detach_from ( + WaitSetImpl* wait_set) +{ + if (nullptr != wait_set) + { + std::lock_guard guard(mutex_); + entries_.remove(wait_set); + } +} + +void ConditionNotifier::notify () +{ + std::lock_guard guard(mutex_); + for (WaitSetImpl* wait_set : entries_) + { + wait_set->wake_up(); + } +} + +void ConditionNotifier::will_be_deleted ( + const Condition& condition) +{ + std::lock_guard guard(mutex_); + for (WaitSetImpl* wait_set : entries_) + { + wait_set->will_be_deleted(condition); + } +} + +} // namespace detail +} // namespace dds +} // namespace fastdds +} // namespace eprosima diff --git a/src/cpp/fastdds/core/condition/ConditionNotifier.hpp b/src/cpp/fastdds/core/condition/ConditionNotifier.hpp new file mode 100644 index 00000000000..f188bd8787a --- /dev/null +++ b/src/cpp/fastdds/core/condition/ConditionNotifier.hpp @@ -0,0 +1,78 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ConditionNotifier.hpp + */ + +#ifndef _FASTDDS_CORE_CONDITION_CONDITIONNOTIFIER_HPP_ +#define _FASTDDS_CORE_CONDITION_CONDITIONNOTIFIER_HPP_ + +#include + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +struct WaitSetImpl; + +struct ConditionNotifier +{ + /** + * Add a WaitSet implementation to the list of attached entries. + * Does nothing if wait_set was already attached to this notifier. + * @param wait_set WaitSet implementation to add to the list. + */ + void attach_to ( + WaitSetImpl* wait_set); + + + /** + * Remove a WaitSet implementation from the list of attached entries. + * Does nothing if wait_set was not attached to this notifier. + * @param wait_set WaitSet implementation to remove from the list. + */ + void detach_from ( + WaitSetImpl* wait_set); + + /** + * Wake up all the WaitSet implementations attached to this notifier. + */ + void notify (); + + /** + * Inform all the WaitSet implementations attached to this notifier that + * a condition is going to be deleted. + * @param condition The Condition being deleted. + */ + void will_be_deleted ( + const Condition& condition); + +private: + + std::mutex mutex_; + eprosima::utilities::collections::unordered_vector entries_; +}; + +} // namespace detail +} // namespace dds +} // namespace fastdds +} // namespace eprosima + +#endif // _FASTDDS_CORE_CONDITION_CONDITIONNOTIFIER_HPP_ diff --git a/src/cpp/fastdds/core/condition/GuardCondition.cpp b/src/cpp/fastdds/core/condition/GuardCondition.cpp new file mode 100644 index 00000000000..43eaa8a7313 --- /dev/null +++ b/src/cpp/fastdds/core/condition/GuardCondition.cpp @@ -0,0 +1,55 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file GuardCondition.cpp + */ + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { + +GuardCondition::GuardCondition() + : trigger_value_(false) +{ +} + +GuardCondition::~GuardCondition() +{ +} + +bool GuardCondition::get_trigger_value() const +{ + return trigger_value_.load(); +} + +ReturnCode_t GuardCondition::set_trigger_value( + bool value) +{ + bool old_value = trigger_value_.exchange(value); + if (!old_value && value) + { + notifier_->notify(); + } + + return ReturnCode_t::RETCODE_OK; +} + +} // namespace dds +} // namespace fastdds +} // namespace eprosima diff --git a/src/cpp/fastdds/core/condition/StatusCondition.cpp b/src/cpp/fastdds/core/condition/StatusCondition.cpp index 190fa27fa96..f7615b90aaf 100644 --- a/src/cpp/fastdds/core/condition/StatusCondition.cpp +++ b/src/cpp/fastdds/core/condition/StatusCondition.cpp @@ -20,29 +20,45 @@ #include #include +#include + namespace eprosima { namespace fastdds { namespace dds { using eprosima::fastrtps::types::ReturnCode_t; +StatusCondition::StatusCondition( + Entity* parent) + : Condition() + , entity_(parent) + , impl_(new detail::StatusConditionImpl(notifier_.get())) +{ +} + +StatusCondition::~StatusCondition() +{ +} + +bool StatusCondition::get_trigger_value() const +{ + return impl_->get_trigger_value(); +} + ReturnCode_t StatusCondition::set_enabled_statuses( const StatusMask& mask) { - static_cast(mask); - return ReturnCode_t::RETCODE_UNSUPPORTED; + return impl_->set_enabled_statuses(mask); } const StatusMask& StatusCondition::get_enabled_statuses() const { - logWarning(CONDITION, "get_enabled_statuses public member function not implemented"); - return status_mask; + return impl_->get_enabled_statuses(); } Entity* StatusCondition::get_entity() const { - logWarning(CONDITION, "get_entity public member function not implemented"); - return nullptr; + return entity_; } } // namespace dds diff --git a/src/cpp/fastdds/core/condition/StatusConditionImpl.cpp b/src/cpp/fastdds/core/condition/StatusConditionImpl.cpp new file mode 100644 index 00000000000..61f0e6cde2f --- /dev/null +++ b/src/cpp/fastdds/core/condition/StatusConditionImpl.cpp @@ -0,0 +1,103 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file StatusConditionImpl.cpp + */ + +#include "StatusConditionImpl.hpp" + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +StatusConditionImpl::StatusConditionImpl( + ConditionNotifier* notifier) + : mask_(StatusMask::all()) + , status_(StatusMask::none()) + , notifier_(notifier) +{ +} + +StatusConditionImpl::~StatusConditionImpl() +{ +} + +bool StatusConditionImpl::get_trigger_value() const +{ + std::lock_guard guard(mutex_); + return (mask_ & status_).any(); +} + +ReturnCode_t StatusConditionImpl::set_enabled_statuses( + const StatusMask& mask) +{ + bool notify = false; + { + std::lock_guard guard(mutex_); + bool old_trigger = (mask_ & status_).any(); + mask_ = mask; + bool new_trigger = (mask_ & status_).any(); + notify = !old_trigger && new_trigger; + } + + if (notify) + { + notifier_->notify(); + } + return ReturnCode_t::RETCODE_OK; +} + +const StatusMask& StatusConditionImpl::get_enabled_statuses() const +{ + std::lock_guard guard(mutex_); + return mask_; +} + +void StatusConditionImpl::set_status( + const StatusMask& status, + bool trigger_value) +{ + if (trigger_value) + { + bool notify = false; + { + std::lock_guard guard(mutex_); + bool old_trigger = (mask_ & status_).any(); + status_ |= status; + bool new_trigger = (mask_ & status_).any(); + notify = !old_trigger && new_trigger; + } + + if (notify) + { + notifier_->notify(); + } + } + else + { + std::lock_guard guard(mutex_); + status_ &= ~status; + } +} + +} // namespace detail +} // namespace dds +} // namespace fastdds +} // namespace eprosima diff --git a/src/cpp/fastdds/core/condition/StatusConditionImpl.hpp b/src/cpp/fastdds/core/condition/StatusConditionImpl.hpp new file mode 100644 index 00000000000..a08c52efaf8 --- /dev/null +++ b/src/cpp/fastdds/core/condition/StatusConditionImpl.hpp @@ -0,0 +1,110 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file StatusConditionImpl.hpp + */ + +#ifndef _FASTDDS_CORE_CONDITION_STATUSCONDITIONIMPL_HPP_ +#define _FASTDDS_CORE_CONDITION_STATUSCONDITIONIMPL_HPP_ + +#include + +#include +#include + +#include + +using eprosima::fastrtps::types::ReturnCode_t; + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +struct StatusConditionImpl +{ + /** + * Construct a StatusConditionImpl object. + * @param notifier @ref ConditionNotifier attatched to this object. + */ + StatusConditionImpl( + ConditionNotifier* notifier); + + ~StatusConditionImpl(); + + // Non-copyable + StatusConditionImpl( + const StatusConditionImpl&) = delete; + StatusConditionImpl& operator =( + const StatusConditionImpl&) = delete; + + // Non-movable + StatusConditionImpl( + StatusConditionImpl&&) = delete; + StatusConditionImpl& operator =( + StatusConditionImpl&&) = delete; + + /** + * @brief Retrieves the trigger_value of the Condition + * @return true if trigger_value is set to 'true', 'false' otherwise + */ + bool get_trigger_value() const; + + /** + * @brief Defines the list of communication statuses that are taken into account to determine the trigger_value + * @param mask defines the mask for the status + * @return RETCODE_OK with everything ok, error code otherwise + */ + ReturnCode_t set_enabled_statuses( + const StatusMask& mask); + + /** + * @brief Retrieves the list of communication statuses that are taken into account to determine the trigger_value + * @return Status set or default status if it has not been set + */ + const StatusMask& get_enabled_statuses() const; + + /** + * @brief Retrieves the list of communication statuses that are currently triggered. + * @return Triggered status. + */ + const StatusMask& get_raw_status() const + { + return status_; + } + + /** + * @brief Set the trigger value of a specific status + * @param status The status for which to change the trigger value + * @param trigger_value Whether the specified status should be set as triggered or non-triggered + */ + void set_status( + const StatusMask& status, + bool trigger_value); + +private: + + mutable std::mutex mutex_; + StatusMask mask_{}; + StatusMask status_{}; + ConditionNotifier* notifier_; +}; + +} // namespace detail +} // namespace dds +} // namespace fastdds +} // namespace eprosima + +#endif // _FASTDDS_CORE_CONDITION_STATUSCONDITIONIMPL_HPP_ diff --git a/src/cpp/fastdds/core/condition/WaitSet.cpp b/src/cpp/fastdds/core/condition/WaitSet.cpp index 89b6def1de7..16b339f3a17 100644 --- a/src/cpp/fastdds/core/condition/WaitSet.cpp +++ b/src/cpp/fastdds/core/condition/WaitSet.cpp @@ -20,40 +20,46 @@ #include #include +#include + namespace eprosima { namespace fastdds { namespace dds { using eprosima::fastrtps::types::ReturnCode_t; +WaitSet::WaitSet() + : impl_(new detail::WaitSetImpl()) +{ +} + +WaitSet::~WaitSet() +{ +} + ReturnCode_t WaitSet::attach_condition( const Condition& cond) { - static_cast(cond); - return ReturnCode_t::RETCODE_UNSUPPORTED; + return impl_->attach_condition(cond); } ReturnCode_t WaitSet::detach_condition( const Condition& cond) { - static_cast(cond); - return ReturnCode_t::RETCODE_UNSUPPORTED; + return impl_->detach_condition(cond); } ReturnCode_t WaitSet::wait( ConditionSeq& active_conditions, const fastrtps::Duration_t timeout) const { - static_cast(active_conditions); - static_cast(timeout); - return ReturnCode_t::RETCODE_UNSUPPORTED; + return impl_->wait(active_conditions, timeout); } ReturnCode_t WaitSet::get_conditions( ConditionSeq& attached_conditions) const { - static_cast(attached_conditions); - return ReturnCode_t::RETCODE_UNSUPPORTED; + return impl_->get_conditions(attached_conditions); } } // namespace dds diff --git a/src/cpp/fastdds/core/condition/WaitSetImpl.cpp b/src/cpp/fastdds/core/condition/WaitSetImpl.cpp new file mode 100644 index 00000000000..f1fb18928d2 --- /dev/null +++ b/src/cpp/fastdds/core/condition/WaitSetImpl.cpp @@ -0,0 +1,156 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file WaitSetImpl.cpp + */ + +#include "WaitSetImpl.hpp" + +#include +#include + +#include +#include +#include + +#include + +using eprosima::fastrtps::types::ReturnCode_t; + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +WaitSetImpl::~WaitSetImpl() +{ + std::lock_guard guard(mutex_); + for (const Condition* c : entries_) + { + c->get_notifier()->detach_from(this); + } +} + +ReturnCode_t WaitSetImpl::attach_condition( + const Condition& condition) +{ + std::lock_guard guard(mutex_); + bool was_there = entries_.remove(&condition); + entries_.emplace_back(&condition); + + if (!was_there) + { + // This is a new condition. Inform the notifier of our interest. + condition.get_notifier()->attach_to(this); + + // Should wake_up when adding a new triggered condition + if (is_waiting_ && condition.get_trigger_value()) + { + wake_up(); + } + } + + return ReturnCode_t::RETCODE_OK; +} + +ReturnCode_t WaitSetImpl::detach_condition( + const Condition& condition) +{ + std::lock_guard guard(mutex_); + bool was_there = entries_.remove(&condition); + + if (was_there) + { + // Inform the notifier we are not interested anymore. + condition.get_notifier()->detach_from(this); + return ReturnCode_t::RETCODE_OK; + } + + // Condition not found + return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET; +} + +ReturnCode_t WaitSetImpl::wait( + ConditionSeq& active_conditions, + const fastrtps::Duration_t& timeout) +{ + std::unique_lock lock(mutex_); + + if (is_waiting_) + { + return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET; + } + + auto fill_active_conditions = [&]() + { + bool ret_val = false; + active_conditions.clear(); + for (const Condition* c : entries_) + { + if (c->get_trigger_value()) + { + ret_val = true; + active_conditions.push_back(const_cast(c)); + } + } + return ret_val; + }; + + bool condition_value = false; + is_waiting_ = true; + if (fastrtps::c_TimeInfinite == timeout) + { + cond_.wait(lock, fill_active_conditions); + condition_value = true; + } + else + { + auto ns = timeout.to_ns(); + condition_value = cond_.wait_for(lock, std::chrono::nanoseconds(ns), fill_active_conditions); + } + is_waiting_ = false; + + return condition_value ? ReturnCode_t::RETCODE_OK : ReturnCode_t::RETCODE_TIMEOUT; +} + +ReturnCode_t WaitSetImpl::get_conditions( + ConditionSeq& attached_conditions) const +{ + std::lock_guard guard(mutex_); + attached_conditions.reserve(entries_.size()); + attached_conditions.clear(); + for (const Condition* c : entries_) + { + attached_conditions.push_back(const_cast(c)); + } + return ReturnCode_t::RETCODE_OK; +} + +void WaitSetImpl::wake_up() +{ + cond_.notify_one(); +} + +void WaitSetImpl::will_be_deleted ( + const Condition& condition) +{ + std::lock_guard guard(mutex_); + entries_.remove(&condition); +} + +} // namespace detail +} // namespace dds +} // namespace fastdds +} // namespace eprosima diff --git a/src/cpp/fastdds/core/condition/WaitSetImpl.hpp b/src/cpp/fastdds/core/condition/WaitSetImpl.hpp new file mode 100644 index 00000000000..d3e5fc187b0 --- /dev/null +++ b/src/cpp/fastdds/core/condition/WaitSetImpl.hpp @@ -0,0 +1,123 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file WaitSetImpl.hpp + */ + +#ifndef _FASTDDS_CORE_CONDITION_WAITSETIMPL_HPP_ +#define _FASTDDS_CORE_CONDITION_WAITSETIMPL_HPP_ + +#include +#include + +#include +#include +#include +#include + +using eprosima::fastrtps::types::ReturnCode_t; + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +struct WaitSetImpl +{ + ~WaitSetImpl(); + + WaitSetImpl() = default; + + // Non-copyable + WaitSetImpl( + const WaitSetImpl&) = delete; + WaitSetImpl& operator =( + const WaitSetImpl&) = delete; + + // Non-movable + WaitSetImpl( + WaitSetImpl&&) = delete; + WaitSetImpl& operator =( + WaitSetImpl&&) = delete; + + /** + * @brief Attach a Condition to this WaitSet implementation + * @param condition The Condition to attach to this WaitSet implementation + * @return RETCODE_OK + */ + ReturnCode_t attach_condition( + const Condition& condition); + + /** + * @brief Detach a Condition from this WaitSet implementation + * @param condition The Condition to detach from this WaitSet implementation + * @return RETCODE_OK if detached correctly + * @return PRECONDITION_NOT_MET if condition was not attached + */ + ReturnCode_t detach_condition( + const Condition& condition); + + /** + * @brief Wait for any of the attached conditions to be triggered. + * If none of the conditions attached to this WaitSet implementation have a trigger_value of true, + * this operation will block, suspending the calling thread. + * The list of conditions with a trigger_value of true will be returned on active_conditions. + * It is not possible to call this operation from two different threads at the same time (PRECONDITION_NOT_MET + * will be returned in that case) + * + * @param active_conditions Reference to the collection of conditions that have a trigger_value of true + * @param timeout Maximum time of the wait + * @return RETCODE_OK if everything correct + * @return PRECONDITION_NOT_MET if WaitSet already waiting + * @return TIMEOUT if maximum time expired + */ + ReturnCode_t wait( + ConditionSeq& active_conditions, + const fastrtps::Duration_t& timeout); + + /** + * @brief Retrieve the list of attached conditions + * @param attached_conditions Reference to the collection of attached conditions + * @return RETCODE_OK + */ + ReturnCode_t get_conditions( + ConditionSeq& attached_conditions) const; + + /** + * @brief Wake up this WaitSet implementation if it was waiting + */ + void wake_up(); + + /** + * @brief Called from the destructor of a Condition to inform this WaitSet implementation that the condition + * should be automatically detached. + */ + void will_be_deleted ( + const Condition& condition); + +private: + + mutable std::mutex mutex_; + std::condition_variable cond_; + eprosima::utilities::collections::unordered_vector entries_; + bool is_waiting_ = false; +}; + +} // namespace detail +} // namespace dds +} // namespace fastdds +} // namespace eprosima + +#endif // _FASTDDS_CORE_CONDITION_WAITSETIMPL_HPP_ diff --git a/src/cpp/fastdds/publisher/DataWriter.cpp b/src/cpp/fastdds/publisher/DataWriter.cpp index 0c19d0b7b23..a7b40388f51 100644 --- a/src/cpp/fastdds/publisher/DataWriter.cpp +++ b/src/cpp/fastdds/publisher/DataWriter.cpp @@ -255,11 +255,7 @@ ReturnCode_t DataWriter::get_offered_incompatible_qos_status( ReturnCode_t DataWriter::get_publication_matched_status( PublicationMatchedStatus& status) const { - static_cast (status); - return ReturnCode_t::RETCODE_UNSUPPORTED; - /* - return impl_->get_publication_matched_status(status); - */ + return impl_->get_publication_matched_status(status); } ReturnCode_t DataWriter::get_liveliness_lost_status( diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index 44fd5cc3f4d..fe2ef4b28b3 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -19,25 +19,30 @@ #include #include + +#include +#include + +#include +#include #include #include -#include -#include #include #include +#include +#include +#include +#include +#include #include #include -#include -#include -#include - -#include +#include +#include #include -#include -#include -#include + +#include #include #include @@ -45,9 +50,6 @@ #include #include -#include -#include - using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; using namespace std::chrono; @@ -926,11 +928,7 @@ void DataWriterImpl::InnerDataWriterListener::onWriterMatched( RTPSWriter* /*writer*/, const PublicationMatchedStatus& info) { - DataWriterListener* listener = data_writer_->get_listener_for(StatusMask::publication_matched()); - if (listener != nullptr) - { - listener->on_publication_matched(data_writer_->user_datawriter_, info); - } + data_writer_->update_publication_matched_status(info); } void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos( @@ -938,7 +936,8 @@ void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos( fastdds::dds::PolicyMask qos) { data_writer_->update_offered_incompatible_qos(qos); - DataWriterListener* listener = data_writer_->get_listener_for(StatusMask::offered_incompatible_qos()); + StatusMask notify_status = StatusMask::offered_incompatible_qos(); + DataWriterListener* listener = data_writer_->get_listener_for(notify_status); if (listener != nullptr) { OfferedIncompatibleQosStatus callback_status; @@ -947,6 +946,7 @@ void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos( listener->on_offered_incompatible_qos(data_writer_->user_datawriter_, callback_status); } } + data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); } void DataWriterImpl::InnerDataWriterListener::onWriterChangeReceivedByAll( @@ -969,12 +969,14 @@ void DataWriterImpl::InnerDataWriterListener::on_liveliness_lost( fastrtps::rtps::RTPSWriter* /*writer*/, const fastrtps::LivelinessLostStatus& status) { - DataWriterListener* listener = data_writer_->get_listener_for(StatusMask::liveliness_lost()); + StatusMask notify_status = StatusMask::liveliness_lost(); + DataWriterListener* listener = data_writer_->get_listener_for(notify_status); if (listener != nullptr) { listener->on_liveliness_lost( data_writer_->user_datawriter_, status); } + data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); } ReturnCode_t DataWriterImpl::wait_for_acknowledgments( @@ -992,6 +994,50 @@ ReturnCode_t DataWriterImpl::wait_for_acknowledgments( return ReturnCode_t::RETCODE_ERROR; } +void DataWriterImpl::update_publication_matched_status( + const PublicationMatchedStatus& status) +{ + auto count_change = status.current_count_change; + publication_matched_status_.current_count += count_change; + publication_matched_status_.current_count_change += count_change; + if (count_change > 0) + { + publication_matched_status_.total_count += count_change; + publication_matched_status_.total_count_change += count_change; + publication_matched_status_.last_subscription_handle = status.last_subscription_handle; + } + + StatusMask notify_status = StatusMask::publication_matched(); + DataWriterListener* listener = get_listener_for(notify_status); + if (listener != nullptr) + { + listener->on_publication_matched(user_datawriter_, publication_matched_status_); + publication_matched_status_.current_count_change = 0; + publication_matched_status_.total_count_change = 0; + } + user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); +} + +ReturnCode_t DataWriterImpl::get_publication_matched_status( + PublicationMatchedStatus& status) +{ + if (writer_ == nullptr) + { + return ReturnCode_t::RETCODE_NOT_ENABLED; + } + + { + std::unique_lock lock(writer_->getMutex()); + + status = publication_matched_status_; + publication_matched_status_.current_count_change = 0; + publication_matched_status_.total_count_change = 0; + } + + user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::publication_matched(), false); + return ReturnCode_t::RETCODE_OK; +} + bool DataWriterImpl::deadline_timer_reschedule() { assert(qos_.deadline().period != c_TimeInfinite); @@ -1019,12 +1065,14 @@ bool DataWriterImpl::deadline_missed() deadline_missed_status_.total_count++; deadline_missed_status_.total_count_change++; deadline_missed_status_.last_instance_handle = timer_owner_; - if (listener_ != nullptr) + StatusMask notify_status = StatusMask::offered_deadline_missed(); + auto listener = get_listener_for(notify_status); + if (nullptr != listener) { listener_->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_); + deadline_missed_status_.total_count_change = 0; } - publisher_->publisher_listener_.on_offered_deadline_missed(user_datawriter_, deadline_missed_status_); - deadline_missed_status_.total_count_change = 0; + user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); if (!history_.set_next_deadline( timer_owner_, @@ -1044,10 +1092,14 @@ ReturnCode_t DataWriterImpl::get_offered_deadline_missed_status( return ReturnCode_t::RETCODE_NOT_ENABLED; } - std::unique_lock lock(writer_->getMutex()); + { + std::unique_lock lock(writer_->getMutex()); + + status = deadline_missed_status_; + deadline_missed_status_.total_count_change = 0; + } - status = deadline_missed_status_; - deadline_missed_status_.total_count_change = 0; + user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::offered_deadline_missed(), false); return ReturnCode_t::RETCODE_OK; } @@ -1059,10 +1111,14 @@ ReturnCode_t DataWriterImpl::get_offered_incompatible_qos_status( return ReturnCode_t::RETCODE_NOT_ENABLED; } - std::unique_lock lock(writer_->getMutex()); + { + std::unique_lock lock(writer_->getMutex()); + + status = offered_incompatible_qos_status_; + offered_incompatible_qos_status_.total_count_change = 0u; + } - status = offered_incompatible_qos_status_; - offered_incompatible_qos_status_.total_count_change = 0u; + user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::offered_incompatible_qos(), false); return ReturnCode_t::RETCODE_OK; } @@ -1116,13 +1172,16 @@ ReturnCode_t DataWriterImpl::get_liveliness_lost_status( return ReturnCode_t::RETCODE_NOT_ENABLED; } - std::unique_lock lock(writer_->getMutex()); + { + std::unique_lock lock(writer_->getMutex()); - status.total_count = writer_->liveliness_lost_status_.total_count; - status.total_count_change = writer_->liveliness_lost_status_.total_count_change; + status.total_count = writer_->liveliness_lost_status_.total_count; + status.total_count_change = writer_->liveliness_lost_status_.total_count_change; - writer_->liveliness_lost_status_.total_count_change = 0u; + writer_->liveliness_lost_status_.total_count_change = 0u; + } + user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::liveliness_lost(), false); return ReturnCode_t::RETCODE_OK; } diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.hpp b/src/cpp/fastdds/publisher/DataWriterImpl.hpp index 5a03e1d4f3c..0f48da17aad 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.hpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.hpp @@ -221,6 +221,9 @@ class DataWriterImpl ReturnCode_t wait_for_acknowledgments( const fastrtps::Duration_t& max_wait); + ReturnCode_t get_publication_matched_status( + PublicationMatchedStatus& status); + ReturnCode_t get_offered_deadline_missed_status( fastrtps::OfferedDeadlineMissedStatus& status); @@ -341,6 +344,9 @@ class DataWriterImpl //! The current timer owner, i.e. the instance which started the deadline timer InstanceHandle_t timer_owner_; + //! The publication matched status + PublicationMatchedStatus publication_matched_status_; + //! The offered deadline missed status fastrtps::OfferedDeadlineMissedStatus deadline_missed_status_; @@ -412,6 +418,9 @@ class DataWriterImpl */ bool remove_min_seq_change(); + void update_publication_matched_status( + const PublicationMatchedStatus& status); + /** * @brief A method called when an instance misses the deadline */ diff --git a/src/cpp/fastdds/subscriber/DataReader.cpp b/src/cpp/fastdds/subscriber/DataReader.cpp index ac42b9d35b8..5191715bf6f 100644 --- a/src/cpp/fastdds/subscriber/DataReader.cpp +++ b/src/cpp/fastdds/subscriber/DataReader.cpp @@ -359,11 +359,7 @@ ReturnCode_t DataReader::get_sample_rejected_status( ReturnCode_t DataReader::get_subscription_matched_status( SubscriptionMatchedStatus& status) const { - static_cast (status); - return ReturnCode_t::RETCODE_UNSUPPORTED; - /* - return impl_->get_subscription_matched_status(status); - */ + return impl_->get_subscription_matched_status(status); } ReturnCode_t DataReader::get_matched_publication_data( diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index 8f829673a00..7a7d8194894 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -36,6 +36,8 @@ #include #include #include + +#include #include #include @@ -295,6 +297,16 @@ bool DataReaderImpl::wait_for_unread_message( return reader_ ? reader_->wait_for_unread_cache(timeout) : false; } +void DataReaderImpl::set_read_communication_status( + bool trigger_value) +{ + StatusMask notify_status = StatusMask::data_on_readers(); + subscriber_->user_subscriber_->get_statuscondition().get_impl()->set_status(notify_status, trigger_value); + + notify_status = StatusMask::data_available(); + user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, trigger_value); +} + ReturnCode_t DataReaderImpl::check_collection_preconditions_and_calc_max_samples( LoanableCollection& data_values, SampleInfoSeq& sample_infos, @@ -440,6 +452,8 @@ ReturnCode_t DataReaderImpl::read_or_take( return ReturnCode_t::RETCODE_TIMEOUT; } + set_read_communication_status(false); + auto it = history_.lookup_instance(handle, exact_instance); if (!it.first) { @@ -618,6 +632,8 @@ ReturnCode_t DataReaderImpl::read_or_take_next_sample( return ReturnCode_t::RETCODE_TIMEOUT; } + set_read_communication_status(false); + auto it = history_.lookup_instance(HANDLE_NIL, false); if (!it.first) { @@ -774,6 +790,8 @@ void DataReaderImpl::InnerDataReaderListener::onNewCacheChangeAdded( { if (data_reader_->on_new_cache_change_added(change_in)) { + auto user_reader = data_reader_->user_datareader_; + //First check if we can handle with on_data_on_readers SubscriberListener* subscriber_listener = data_reader_->subscriber_->get_listener_for(StatusMask::data_on_readers()); @@ -787,9 +805,11 @@ void DataReaderImpl::InnerDataReaderListener::onNewCacheChangeAdded( DataReaderListener* listener = data_reader_->get_listener_for(StatusMask::data_available()); if (listener != nullptr) { - listener->on_data_available(data_reader_->user_datareader_); + listener->on_data_available(user_reader); } } + + data_reader_->set_read_communication_status(true); } } @@ -797,11 +817,7 @@ void DataReaderImpl::InnerDataReaderListener::onReaderMatched( RTPSReader* /*reader*/, const SubscriptionMatchedStatus& info) { - DataReaderListener* listener = data_reader_->get_listener_for(StatusMask::subscription_matched()); - if (listener != nullptr) - { - listener->on_subscription_matched(data_reader_->user_datareader_, info); - } + data_reader_->update_subscription_matched_status(info); } void DataReaderImpl::InnerDataReaderListener::on_liveliness_changed( @@ -809,7 +825,8 @@ void DataReaderImpl::InnerDataReaderListener::on_liveliness_changed( const fastrtps::LivelinessChangedStatus& status) { data_reader_->update_liveliness_status(status); - DataReaderListener* listener = data_reader_->get_listener_for(StatusMask::liveliness_changed()); + StatusMask notify_status = StatusMask::liveliness_changed(); + DataReaderListener* listener = data_reader_->get_listener_for(notify_status); if (listener != nullptr) { LivelinessChangedStatus callback_status; @@ -818,6 +835,7 @@ void DataReaderImpl::InnerDataReaderListener::on_liveliness_changed( listener->on_liveliness_changed(data_reader_->user_datareader_, callback_status); } } + data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); } void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos( @@ -825,7 +843,8 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos( fastdds::dds::PolicyMask qos) { data_reader_->update_requested_incompatible_qos(qos); - DataReaderListener* listener = data_reader_->get_listener_for(StatusMask::requested_incompatible_qos()); + StatusMask notify_status = StatusMask::requested_incompatible_qos(); + DataReaderListener* listener = data_reader_->get_listener_for(notify_status); if (listener != nullptr) { RequestedIncompatibleQosStatus callback_status; @@ -834,6 +853,7 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos( listener->on_requested_incompatible_qos(data_reader_->user_datareader_, callback_status); } } + data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); } bool DataReaderImpl::on_new_cache_change_added( @@ -902,6 +922,50 @@ bool DataReaderImpl::on_new_cache_change_added( return true; } +void DataReaderImpl::update_subscription_matched_status( + const SubscriptionMatchedStatus& status) +{ + auto count_change = status.current_count_change; + subscription_matched_status_.current_count += count_change; + subscription_matched_status_.current_count_change += count_change; + if (count_change > 0) + { + subscription_matched_status_.total_count += count_change; + subscription_matched_status_.total_count_change += count_change; + subscription_matched_status_.last_publication_handle = status.last_publication_handle; + } + + StatusMask notify_status = StatusMask::subscription_matched(); + DataReaderListener* listener = get_listener_for(notify_status); + if (listener != nullptr) + { + listener->on_subscription_matched(user_datareader_, subscription_matched_status_); + subscription_matched_status_.current_count_change = 0; + subscription_matched_status_.total_count_change = 0; + } + user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); +} + +ReturnCode_t DataReaderImpl::get_subscription_matched_status( + SubscriptionMatchedStatus& status) +{ + if (reader_ == nullptr) + { + return ReturnCode_t::RETCODE_NOT_ENABLED; + } + + { + std::unique_lock lock(reader_->getMutex()); + + status = subscription_matched_status_; + subscription_matched_status_.current_count_change = 0; + subscription_matched_status_.total_count_change = 0; + } + + user_datareader_->get_statuscondition().get_impl()->set_status(StatusMask::subscription_matched(), false); + return ReturnCode_t::RETCODE_OK; +} + bool DataReaderImpl::deadline_timer_reschedule() { assert(qos_.deadline().period != c_TimeInfinite); @@ -929,9 +993,14 @@ bool DataReaderImpl::deadline_missed() deadline_missed_status_.total_count++; deadline_missed_status_.total_count_change++; deadline_missed_status_.last_instance_handle = timer_owner_; - listener_->on_requested_deadline_missed(user_datareader_, deadline_missed_status_); - subscriber_->subscriber_listener_.on_requested_deadline_missed(user_datareader_, deadline_missed_status_); - deadline_missed_status_.total_count_change = 0; + StatusMask notify_status = StatusMask::requested_deadline_missed(); + auto listener = get_listener_for(notify_status); + if (nullptr != listener) + { + listener->on_requested_deadline_missed(user_datareader_, deadline_missed_status_); + deadline_missed_status_.total_count_change = 0; + } + user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); if (!history_.set_next_deadline( timer_owner_, @@ -951,10 +1020,14 @@ ReturnCode_t DataReaderImpl::get_requested_deadline_missed_status( return ReturnCode_t::RETCODE_NOT_ENABLED; } - std::unique_lock lock(reader_->getMutex()); + { + std::unique_lock lock(reader_->getMutex()); + + status = deadline_missed_status_; + deadline_missed_status_.total_count_change = 0; + } - status = deadline_missed_status_; - deadline_missed_status_.total_count_change = 0; + user_datareader_->get_statuscondition().get_impl()->set_status(StatusMask::requested_deadline_missed(), false); return ReturnCode_t::RETCODE_OK; } @@ -1032,12 +1105,15 @@ ReturnCode_t DataReaderImpl::get_liveliness_changed_status( return ReturnCode_t::RETCODE_NOT_ENABLED; } - std::lock_guard lock(reader_->getMutex()); + { + std::lock_guard lock(reader_->getMutex()); - status = liveliness_changed_status_; - liveliness_changed_status_.alive_count_change = 0u; - liveliness_changed_status_.not_alive_count_change = 0u; + status = liveliness_changed_status_; + liveliness_changed_status_.alive_count_change = 0u; + liveliness_changed_status_.not_alive_count_change = 0u; + } + user_datareader_->get_statuscondition().get_impl()->set_status(StatusMask::liveliness_changed(), false); return ReturnCode_t::RETCODE_OK; } @@ -1049,10 +1125,14 @@ ReturnCode_t DataReaderImpl::get_requested_incompatible_qos_status( return ReturnCode_t::RETCODE_NOT_ENABLED; } - std::unique_lock lock(reader_->getMutex()); + { + std::unique_lock lock(reader_->getMutex()); + + status = requested_incompatible_qos_status_; + requested_incompatible_qos_status_.total_count_change = 0u; + } - status = requested_incompatible_qos_status_; - requested_incompatible_qos_status_.total_count_change = 0u; + user_datareader_->get_statuscondition().get_impl()->set_status(StatusMask::requested_incompatible_qos(), false); return ReturnCode_t::RETCODE_OK; } diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp index ff71cf80d88..4b493417926 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp @@ -218,6 +218,9 @@ class DataReaderImpl */ const TopicDescription* get_topicdescription() const; + ReturnCode_t get_subscription_matched_status( + SubscriptionMatchedStatus& status); + ReturnCode_t get_requested_deadline_missed_status( fastrtps::RequestedDeadlineMissedStatus& status); @@ -373,6 +376,9 @@ class DataReaderImpl //! The current timer owner, i.e. the instance which started the deadline timer fastrtps::rtps::InstanceHandle_t timer_owner_; + //! Subscription matched status + SubscriptionMatchedStatus subscription_matched_status_; + //! Liveliness changed status LivelinessChangedStatus liveliness_changed_status_; @@ -423,6 +429,12 @@ class DataReaderImpl SampleInfo* info, bool should_take); + void set_read_communication_status( + bool trigger_value); + + void update_subscription_matched_status( + const SubscriptionMatchedStatus& status); + /** * @brief A method called when a new cache change is added * @param change The cache change that has been added diff --git a/src/cpp/utils/collections/unordered_vector.hpp b/src/cpp/utils/collections/unordered_vector.hpp new file mode 100644 index 00000000000..f9d96d88a58 --- /dev/null +++ b/src/cpp/utils/collections/unordered_vector.hpp @@ -0,0 +1,38 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file unordered_vector.hpp + */ + +#ifndef SRC_CPP_UTILS_COLLECTIONS_UNORDERED_VECTOR_HPP_ +#define SRC_CPP_UTILS_COLLECTIONS_UNORDERED_VECTOR_HPP_ + +#include + +namespace eprosima { +namespace utilities { +namespace collections { + +template < + typename _Ty, + typename _Alloc = std::allocator<_Ty>> +using unordered_vector = eprosima::fastrtps::ResourceLimitedVector< + _Ty, std::false_type, eprosima::fastrtps::ResourceLimitedContainerConfig, _Alloc>; + +} // namespace collections +} // namespace utilities +} // namespace eprosima + +#endif /* SRC_CPP_UTILS_COLLECTIONS_NODE_SIZE_HELPERS_HPP_ */ diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index a75b8b985b2..53257f0b7d4 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -30,6 +30,9 @@ #if _MSC_VER #include #endif // _MSC_VER +#include +#include +#include #include #include #include @@ -63,7 +66,7 @@ class PubSubReader typedef TypeSupport type_support; typedef typename type_support::type type; -private: +protected: class ParticipantListener : public eprosima::fastdds::dds::DomainParticipantListener { @@ -310,7 +313,7 @@ class PubSubReader datareader_qos_.reliable_reader_qos().times.heartbeatResponseDelay.nanosec = 100000000; } - ~PubSubReader() + virtual ~PubSubReader() { destroy(); } @@ -357,11 +360,6 @@ class PubSubReader // Register type ASSERT_EQ(participant_->register_type(type_), ReturnCode_t::RETCODE_OK); - // Create subscriber - subscriber_ = participant_->create_subscriber(subscriber_qos_); - ASSERT_NE(subscriber_, nullptr); - ASSERT_TRUE(subscriber_->is_enabled()); - // Create topic topic_ = participant_->create_topic(topic_name_, type_->getName(), @@ -369,26 +367,39 @@ class PubSubReader ASSERT_NE(topic_, nullptr); ASSERT_TRUE(topic_->is_enabled()); - if (!xml_file_.empty()) + // Create publisher + createSubscriber(); + } + + virtual void createSubscriber() + { + if (participant_ != nullptr) { - if (!datareader_profile_.empty()) + subscriber_ = participant_->create_subscriber(subscriber_qos_); + ASSERT_NE(subscriber_, nullptr); + ASSERT_TRUE(subscriber_->is_enabled()); + + if (!xml_file_.empty()) { - datareader_ = subscriber_->create_datareader_with_profile(topic_, datareader_profile_, &listener_, - status_mask_); - ASSERT_NE(datareader_, nullptr); - ASSERT_TRUE(datareader_->is_enabled()); + if (!datareader_profile_.empty()) + { + datareader_ = subscriber_->create_datareader_with_profile(topic_, datareader_profile_, &listener_, + status_mask_); + ASSERT_NE(datareader_, nullptr); + ASSERT_TRUE(datareader_->is_enabled()); + } + } + if (datareader_ == nullptr) + { + datareader_ = subscriber_->create_datareader(topic_, datareader_qos_, &listener_, status_mask_); } - } - if (datareader_ == nullptr) - { - datareader_ = subscriber_->create_datareader(topic_, datareader_qos_, &listener_, status_mask_); - } - if (datareader_ != nullptr) - { - std::cout << "Created datareader " << datareader_->guid() << " for topic " << - topic_name_ << std::endl; - initialized_ = true; + if (datareader_ != nullptr) + { + std::cout << "Created datareader " << datareader_->guid() << " for topic " << + topic_name_ << std::endl; + initialized_ = true; + } } } @@ -398,7 +409,7 @@ class PubSubReader return initialized_; } - void destroy() + virtual void destroy() { if (participant_ != nullptr) { @@ -1511,7 +1522,12 @@ class PubSubReader datareader_profile_ = profile; } -private: + eprosima::fastdds::dds::StatusCondition& get_statuscondition() const + { + return datareader_->get_statuscondition(); + } + +protected: const eprosima::fastrtps::rtps::GUID_t& participant_guid() const { @@ -1674,4 +1690,351 @@ class PubSubReader std::atomic message_receive_count_; }; +template +class PubSubReaderWithWaitsets : public PubSubReader +{ +public: + + typedef TypeSupport type_support; + typedef typename type_support::type type; + +protected: + + class WaitsetThread + { + public: + + WaitsetThread( + PubSubReaderWithWaitsets& reader) + : reader_(reader) + { + } + + ~WaitsetThread() + { + stop(); + } + + void start( + const eprosima::fastrtps::Duration_t& timeout) + { + waitset_.attach_condition(reader_.datareader_->get_statuscondition()); + waitset_.attach_condition(reader_.subscriber_->get_statuscondition()); + waitset_.attach_condition(guard_condition_); + + std::unique_lock lock(mutex_); + if (nullptr == thread_) + { + running_ = true; + guard_condition_.set_trigger_value(false); + timeout_ = timeout; + thread_ = new std::thread(&WaitsetThread::run, this); + } + } + + void stop() + { + std::unique_lock lock(mutex_); + running_ = false; + if (nullptr != thread_) + { + lock.unlock(); + + // We need to trigger the wake up + guard_condition_.set_trigger_value(true); + thread_->join(); + lock.lock(); + delete thread_; + thread_ = nullptr; + } + } + + void run() + { + std::unique_lock lock(mutex_); + while (running_) + { + lock.unlock(); + auto wait_result = waitset_.wait(active_conditions_, timeout_); + if (wait_result == ReturnCode_t::RETCODE_TIMEOUT) + { + reader_.on_waitset_timeout(); + } + else + { + if (!guard_condition_.get_trigger_value()) + { + for (auto condition : active_conditions_) + { + process(dynamic_cast(condition)); + } + } + } + lock.lock(); + } + } + + void process( + eprosima::fastdds::dds::StatusCondition* condition) + { + eprosima::fastdds::dds::StatusMask triggered_statuses = reader_.datareader_->get_status_changes(); + triggered_statuses &= condition->get_enabled_statuses(); + + if (triggered_statuses.is_active(eprosima::fastdds::dds::StatusMask::subscription_matched())) + { + eprosima::fastdds::dds::SubscriptionMatchedStatus status; + reader_.datareader_->get_subscription_matched_status(status); + + if (0 < status.current_count_change) + { + std::cout << "Subscriber matched publisher " << status.last_publication_handle << std::endl; + reader_.matched(); + } + else if (0 > status.current_count_change) + { + std::cout << "Subscriber unmatched publisher " << status.last_publication_handle << std::endl; + reader_.unmatched(); + } + } + + if (triggered_statuses.is_active(eprosima::fastdds::dds::StatusMask::requested_deadline_missed())) + { + eprosima::fastdds::dds::RequestedDeadlineMissedStatus status; + reader_.datareader_->get_requested_deadline_missed_status(status); + times_deadline_missed_ = status.total_count; + } + + if (triggered_statuses.is_active(eprosima::fastdds::dds::StatusMask::requested_incompatible_qos())) + { + eprosima::fastdds::dds::RequestedIncompatibleQosStatus status; + reader_.datareader_->get_requested_incompatible_qos_status(status); + reader_.incompatible_qos(status); + } + + if (triggered_statuses.is_active(eprosima::fastdds::dds::StatusMask::liveliness_changed())) + { + eprosima::fastdds::dds::LivelinessChangedStatus status; + reader_.datareader_->get_liveliness_changed_status(status); + + reader_.set_liveliness_changed_status(status); + if (status.alive_count_change == 1) + { + reader_.liveliness_recovered(); + + } + else if (status.not_alive_count_change == 1) + { + reader_.liveliness_lost(); + + } + } + + if (triggered_statuses.is_active(eprosima::fastdds::dds::StatusMask::data_available())) + { + { + std::lock_guard guard(reader_.message_receive_mutex_); + reader_.message_receive_count_.fetch_add(1); + } + reader_.message_receive_cv_.notify_one(); + + if (reader_.receiving_.load()) + { + bool ret = false; + do + { + reader_.receive_one(reader_.datareader_, ret); + } while (ret); + } + } + + // We also have to process the subscriber + triggered_statuses = reader_.subscriber_->get_status_changes(); + triggered_statuses &= condition->get_enabled_statuses(); + + if (triggered_statuses.is_active(eprosima::fastdds::dds::StatusMask::data_on_readers())) + { + { + std::lock_guard guard(reader_.message_receive_mutex_); + reader_.message_receive_count_.fetch_add(1); + } + reader_.message_receive_cv_.notify_one(); + + if (reader_.receiving_.load()) + { + bool ret = false; + do + { + reader_.receive_one(reader_.datareader_, ret); + } while (ret); + } + } + } + + unsigned int missed_deadlines() const + { + return times_deadline_missed_; + } + + protected: + + // The reader this waitset thread serves + PubSubReaderWithWaitsets& reader_; + + // The waitset where the thread will be blocked + eprosima::fastdds::dds::WaitSet waitset_; + + // The active conditions that triggered the wake up + eprosima::fastdds::dds::ConditionSeq active_conditions_; + + // The thread that does the job + std::thread* thread_ = nullptr; + + // Whether the thread is running or not + bool running_ = false; + + // A Mutex to guard the thread start/stop + std::mutex mutex_; + + // A user-triggered condition used to signal the thread to stop + eprosima::fastdds::dds::GuardCondition guard_condition_; + + //! Number of times deadline was missed + unsigned int times_deadline_missed_ = 0; + + //! The timeout for the wait operation + eprosima::fastrtps::Duration_t timeout_; + + } + waitset_thread_; + + friend class WaitsetThread; + +public: + + PubSubReaderWithWaitsets( + const std::string& topic_name, + bool take = true, + bool statistics = false) + : PubSubReader(topic_name, take, statistics) + , waitset_thread_(*this) + , timeout_(eprosima::fastrtps::c_TimeInfinite) + , times_waitset_timeout_(0) + { + } + + ~PubSubReaderWithWaitsets() override + { + } + + void createSubscriber() override + { + if (participant_ != nullptr) + { + // Create subscriber + subscriber_ = participant_->create_subscriber(subscriber_qos_); + ASSERT_NE(subscriber_, nullptr); + ASSERT_TRUE(subscriber_->is_enabled()); + + if (!xml_file_.empty()) + { + if (!datareader_profile_.empty()) + { + datareader_ = subscriber_->create_datareader_with_profile(topic_, datareader_profile_, nullptr); + ASSERT_NE(datareader_, nullptr); + ASSERT_TRUE(datareader_->is_enabled()); + } + } + if (datareader_ == nullptr) + { + datareader_ = subscriber_->create_datareader(topic_, datareader_qos_, nullptr); + } + + if (datareader_ != nullptr) + { + initialized_ = datareader_->is_enabled(); + if (initialized_) + { + std::cout << "Created datareader " << datareader_->guid() << " for topic " << + topic_name_ << std::endl; + } + + // Set the desired status condition mask and start the waitset thread + datareader_->get_statuscondition().set_enabled_statuses(status_mask_); + subscriber_->get_statuscondition().set_enabled_statuses(status_mask_); + waitset_thread_.start(timeout_); + } + } + } + + void destroy() override + { + if (initialized_) + { + waitset_thread_.stop(); + } + + PubSubReader::destroy(); + } + + unsigned int missed_deadlines() const + { + return waitset_thread_.missed_deadlines(); + } + + void wait_waitset_timeout( + unsigned int times = 1) + { + std::unique_lock lock(waitset_timeout_mutex_); + + waitset_timeout_cv_.wait(lock, [&]() + { + return times_waitset_timeout_ >= times; + }); + } + + unsigned int times_waitset_timeout() + { + std::unique_lock lock(waitset_timeout_mutex_); + return times_waitset_timeout_; + } + + PubSubReaderWithWaitsets& waitset_timeout( + const eprosima::fastrtps::Duration_t& timeout) + { + timeout_ = timeout; + return *this; + } + +protected: + + void on_waitset_timeout() + { + std::unique_lock lock(waitset_timeout_mutex_); + ++times_waitset_timeout_; + waitset_timeout_cv_.notify_one(); + } + + //! The timeout for the waitset + eprosima::fastrtps::Duration_t timeout_; + + //! A mutex for waitset timeout + std::mutex waitset_timeout_mutex_; + //! A condition variable to notify when the waitset has timed out + std::condition_variable waitset_timeout_cv_; + //! Number of times the waitset has timed out + unsigned int times_waitset_timeout_; + + using PubSubReader::xml_file_; + using PubSubReader::participant_; + using PubSubReader::topic_name_; + using PubSubReader::topic_; + using PubSubReader::subscriber_; + using PubSubReader::subscriber_qos_; + using PubSubReader::datareader_; + using PubSubReader::datareader_qos_; + using PubSubReader::datareader_profile_; + using PubSubReader::initialized_; + using PubSubReader::status_mask_; +}; + #endif // _TEST_BLACKBOX_PUBSUBREADER_HPP_ diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 51c1b172e2d..0624b0e9b2f 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -32,6 +32,9 @@ #include #endif // _MSC_VER +#include +#include +#include #include #include #include @@ -303,7 +306,7 @@ class PubSubWriter datawriter_qos_.reliability().max_blocking_time.nanosec = 0; } - ~PubSubWriter() + virtual ~PubSubWriter() { destroy(); } @@ -363,7 +366,7 @@ class PubSubWriter return; } - void createPublisher() + virtual void createPublisher() { if (participant_ != nullptr) { @@ -426,7 +429,7 @@ class PubSubWriter return participant_; } - void destroy() + virtual void destroy() { if (participant_) { @@ -1387,7 +1390,7 @@ class PubSubWriter #endif // if HAVE_SQLITE3 -private: +protected: void participant_matched() { @@ -1691,4 +1694,312 @@ class PubSubWriter #endif // if HAVE_SECURITY }; + +template +class PubSubWriterWithWaitsets : public PubSubWriter +{ +public: + + typedef TypeSupport type_support; + typedef typename type_support::type type; + +protected: + + class WaitsetThread + { + public: + + WaitsetThread( + PubSubWriterWithWaitsets& writer) + : writer_(writer) + { + } + + ~WaitsetThread() + { + stop(); + } + + void start( + const eprosima::fastrtps::Duration_t& timeout) + { + waitset_.attach_condition(writer_.datawriter_->get_statuscondition()); + waitset_.attach_condition(guard_condition_); + + std::unique_lock lock(mutex_); + if (nullptr == thread_) + { + running_ = true; + guard_condition_.set_trigger_value(false); + timeout_ = timeout; + thread_ = new std::thread(&WaitsetThread::run, this); + } + } + + void stop() + { + std::unique_lock lock(mutex_); + running_ = false; + if (nullptr != thread_) + { + lock.unlock(); + + // We need to trigger the wake up + guard_condition_.set_trigger_value(true); + thread_->join(); + lock.lock(); + delete thread_; + thread_ = nullptr; + } + } + + void run() + { + std::unique_lock lock(mutex_); + while (running_) + { + lock.unlock(); + auto wait_result = waitset_.wait(active_conditions_, timeout_); + if (wait_result == ReturnCode_t::RETCODE_TIMEOUT) + { + writer_.on_waitset_timeout(); + } + else + { + if (!guard_condition_.get_trigger_value()) + { + ASSERT_FALSE(active_conditions_.empty()); + EXPECT_EQ(active_conditions_[0], &writer_.datawriter_->get_statuscondition()); + process(&writer_.datawriter_->get_statuscondition()); + } + } + lock.lock(); + } + } + + void process( + eprosima::fastdds::dds::StatusCondition* condition) + { + eprosima::fastdds::dds::StatusMask triggered_statuses = writer_.datawriter_->get_status_changes(); + triggered_statuses &= condition->get_enabled_statuses(); + + if (triggered_statuses.is_active(eprosima::fastdds::dds::StatusMask::publication_matched())) + { + eprosima::fastdds::dds::PublicationMatchedStatus status; + writer_.datawriter_->get_publication_matched_status(status); + + if (0 < status.current_count_change) + { + std::cout << "Publisher matched subscriber " << status.last_subscription_handle << std::endl; + writer_.matched(); + } + else if (0 > status.current_count_change) + { + std::cout << "Publisher unmatched subscriber " << status.last_subscription_handle << std::endl; + writer_.unmatched(); + } + } + + if (triggered_statuses.is_active(eprosima::fastdds::dds::StatusMask::offered_deadline_missed())) + { + eprosima::fastdds::dds::OfferedDeadlineMissedStatus status; + writer_.datawriter_->get_offered_deadline_missed_status(status); + times_deadline_missed_ = status.total_count; + } + + if (triggered_statuses.is_active(eprosima::fastdds::dds::StatusMask::offered_incompatible_qos())) + { + eprosima::fastdds::dds::OfferedIncompatibleQosStatus status; + writer_.datawriter_->get_offered_incompatible_qos_status(status); + writer_.incompatible_qos(status); + } + + if (triggered_statuses.is_active(eprosima::fastdds::dds::StatusMask::liveliness_lost())) + { + eprosima::fastdds::dds::LivelinessLostStatus status; + writer_.datawriter_->get_liveliness_lost_status(status); + + times_liveliness_lost_ = status.total_count; + writer_.liveliness_lost(); + } + } + + unsigned int missed_deadlines() const + { + return times_deadline_missed_; + } + + unsigned int times_liveliness_lost() const + { + return times_liveliness_lost_; + } + + protected: + + // The reader this waitset thread serves + PubSubWriterWithWaitsets& writer_; + + // The waitset where the thread will be blocked + eprosima::fastdds::dds::WaitSet waitset_; + + // The active conditions that triggered the wake up + eprosima::fastdds::dds::ConditionSeq active_conditions_; + + // The thread that does the job + std::thread* thread_ = nullptr; + + // Whether the thread is running or not + bool running_ = false; + + // A Mutex to guard the thread start/stop + std::mutex mutex_; + + // A user-triggered condition used to signal the thread to stop + eprosima::fastdds::dds::GuardCondition guard_condition_; + + //! The number of times deadline was missed + unsigned int times_deadline_missed_ = 0; + + //! The number of times liveliness was lost + unsigned int times_liveliness_lost_ = 0; + + //! The timeout for the wait operation + eprosima::fastrtps::Duration_t timeout_; + } + waitset_thread_; + + friend class WaitsetThread; + +public: + + PubSubWriterWithWaitsets( + const std::string& topic_name) + : PubSubWriter(topic_name) + , waitset_thread_(*this) + , timeout_(eprosima::fastrtps::c_TimeInfinite) + , times_waitset_timeout_(0) + { + } + + ~PubSubWriterWithWaitsets() override + { + } + + void createPublisher() override + { + if (participant_ != nullptr) + { + // Create publisher + publisher_ = participant_->create_publisher(publisher_qos_); + ASSERT_NE(publisher_, nullptr); + ASSERT_TRUE(publisher_->is_enabled()); + + if (!xml_file_.empty()) + { + if (!datawriter_profile_.empty()) + { + datawriter_ = publisher_->create_datawriter_with_profile(topic_, datawriter_profile_, nullptr); + ASSERT_NE(datawriter_, nullptr); + ASSERT_TRUE(datawriter_->is_enabled()); + } + } + if (datawriter_ == nullptr) + { + datawriter_ = publisher_->create_datawriter(topic_, datawriter_qos_, nullptr); + } + + if (datawriter_ != nullptr) + { + initialized_ = datawriter_->is_enabled(); + if (initialized_) + { + std::cout << "Created datawriter " << datawriter_->guid() << " for topic " << + topic_name_ << std::endl; + + // Set the desired status condition mask and start the waitset thread + datawriter_->get_statuscondition().set_enabled_statuses(status_mask_); + waitset_thread_.start(timeout_); + } + } + } + return; + } + + void destroy() override + { + if (initialized_) + { + waitset_thread_.stop(); + } + + PubSubWriter::destroy(); + } + + unsigned int missed_deadlines() const + { + return waitset_thread_.missed_deadlines(); + } + + unsigned int times_liveliness_lost() const + { + return waitset_thread_.times_liveliness_lost(); + } + + void wait_waitset_timeout( + unsigned int times = 1) + { + std::unique_lock lock(waitset_timeout_mutex_); + + waitset_timeout_cv_.wait(lock, [&]() + { + return times_waitset_timeout_ >= times; + }); + } + + unsigned int times_waitset_timeout() + { + std::unique_lock lock(waitset_timeout_mutex_); + return times_waitset_timeout_; + } + + PubSubWriterWithWaitsets& waitset_timeout( + const eprosima::fastrtps::Duration_t& timeout) + { + timeout_ = timeout; + return *this; + } + +protected: + + void on_waitset_timeout() + { + std::unique_lock lock(waitset_timeout_mutex_); + ++times_waitset_timeout_; + waitset_timeout_cv_.notify_one(); + } + + //! The timeout for the waitset + eprosima::fastrtps::Duration_t timeout_; + + //! A mutex for waitset timeout + std::mutex waitset_timeout_mutex_; + //! A condition variable to notify when the waitset has timed out + std::condition_variable waitset_timeout_cv_; + //! Number of times the waitset has timed out + unsigned int times_waitset_timeout_; + + using PubSubWriter::xml_file_; + using PubSubWriter::participant_; + using PubSubWriter::topic_name_; + using PubSubWriter::topic_; + using PubSubWriter::publisher_; + using PubSubWriter::publisher_qos_; + using PubSubWriter::datawriter_; + using PubSubWriter::datawriter_qos_; + using PubSubWriter::datawriter_profile_; + using PubSubWriter::initialized_; + using PubSubWriter::status_mask_; +}; + + #endif // _TEST_BLACKBOX_PUBSUBWRITER_HPP_ diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 76fe2a34f77..b228e96a734 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -18,6 +18,10 @@ #include "PubSubWriter.hpp" #include +#include +#include +#include + #include using namespace eprosima::fastrtps; @@ -76,6 +80,114 @@ class DDSStatus : public testing::TestWithParam }; +/* This test also serves as check for the publication_matched and subscription_matched conditions, + * as all the status conditions are processed through waitsets and we are checking if there is a match + */ +TEST_P(DDSStatus, IncompatibleQosConditions) +{ + PubSubWriterWithWaitsets writer(TEST_TOPIC_NAME); + writer.reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) + .durability_kind(eprosima::fastrtps::VOLATILE_DURABILITY_QOS) + .init(); + ASSERT_TRUE(writer.isInitialized()); + + // A Reader on the same Topic but with incompatible QoS + // Should not match and trigger incompatible QoS conditions on both Writer and Reader + PubSubReaderWithWaitsets incompatible_reliability_reader(TEST_TOPIC_NAME); + incompatible_reliability_reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .init(); + ASSERT_TRUE(incompatible_reliability_reader.isInitialized()); + + writer.wait_incompatible_qos(1); + incompatible_reliability_reader.wait_incompatible_qos(1); + + EXPECT_EQ(writer.times_incompatible_qos(), 1u); + EXPECT_EQ(writer.last_incompatible_qos(), eprosima::fastdds::dds::RELIABILITY_QOS_POLICY_ID); + EXPECT_FALSE(writer.is_matched()); + + EXPECT_EQ(incompatible_reliability_reader.times_incompatible_qos(), 1u); + EXPECT_EQ(incompatible_reliability_reader.last_incompatible_qos(), + eprosima::fastdds::dds::RELIABILITY_QOS_POLICY_ID); + EXPECT_FALSE(incompatible_reliability_reader.is_matched()); + + // Another Reader on the same Topic but with incompatible QoS + // Should not match and trigger incompatible QoS condition on both Writer and Reader + // Total count of incompatible QoS occurrences in Writer increments, and updates the latest incompatible QoS ID, + // but old Reader stays the same + PubSubReaderWithWaitsets incompatible_durability_reader(TEST_TOPIC_NAME); + incompatible_durability_reader.reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) + .durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS) + .init(); + ASSERT_TRUE(incompatible_durability_reader.isInitialized()); + + writer.wait_incompatible_qos(2); + incompatible_durability_reader.wait_incompatible_qos(1); + + EXPECT_EQ(writer.times_incompatible_qos(), 2u); + EXPECT_EQ(writer.last_incompatible_qos(), eprosima::fastdds::dds::DURABILITY_QOS_POLICY_ID); + EXPECT_FALSE(writer.is_matched()); + + EXPECT_EQ(incompatible_reliability_reader.times_incompatible_qos(), 1u); + EXPECT_EQ(incompatible_reliability_reader.last_incompatible_qos(), + eprosima::fastdds::dds::RELIABILITY_QOS_POLICY_ID); + EXPECT_FALSE(incompatible_reliability_reader.is_matched()); + + EXPECT_EQ(incompatible_durability_reader.times_incompatible_qos(), 1u); + EXPECT_EQ(incompatible_durability_reader.last_incompatible_qos(), eprosima::fastdds::dds::DURABILITY_QOS_POLICY_ID); + EXPECT_FALSE(incompatible_durability_reader.is_matched()); + + // Create another two writers equal to the first one. + // Incompatible readers increase incompatible QoS occurrences by two + PubSubWriterWithWaitsets writer2(TEST_TOPIC_NAME); + writer2.reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) + .durability_kind(eprosima::fastrtps::VOLATILE_DURABILITY_QOS) + .init(); + ASSERT_TRUE(writer2.isInitialized()); + + PubSubWriterWithWaitsets writer3(TEST_TOPIC_NAME); + writer3.reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) + .durability_kind(eprosima::fastrtps::VOLATILE_DURABILITY_QOS) + .init(); + ASSERT_TRUE(writer3.isInitialized()); + + // A compatible Reader on another Topic + // Should not match but never trigger incompatible QoS conditions + // Total count of incompatible QoS occurrences and latest incompatible QoS ID stay the same + PubSubReaderWithWaitsets compatible_reader(INCOMPATIBLE_TEST_TOPIC_NAME); + compatible_reader.reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) + .durability_kind(eprosima::fastrtps::VOLATILE_DURABILITY_QOS) + .deactivate_status_listener(eprosima::fastdds::dds::StatusMask::requested_incompatible_qos()) + .init(); + ASSERT_TRUE(compatible_reader.isInitialized()); + + writer2.wait_incompatible_qos(2); + writer3.wait_incompatible_qos(2); + incompatible_reliability_reader.wait_incompatible_qos(3); + incompatible_durability_reader.wait_incompatible_qos(3); + + EXPECT_EQ(writer.times_incompatible_qos(), 2u); + EXPECT_EQ(writer.last_incompatible_qos(), eprosima::fastdds::dds::DURABILITY_QOS_POLICY_ID); + EXPECT_FALSE(writer.is_matched()); + + EXPECT_EQ(writer2.times_incompatible_qos(), 2u); + EXPECT_FALSE(writer2.is_matched()); + + EXPECT_EQ(writer3.times_incompatible_qos(), 2u); + EXPECT_FALSE(writer3.is_matched()); + + EXPECT_EQ(incompatible_reliability_reader.times_incompatible_qos(), 3u); + EXPECT_EQ(incompatible_reliability_reader.last_incompatible_qos(), + eprosima::fastdds::dds::RELIABILITY_QOS_POLICY_ID); + EXPECT_FALSE(incompatible_reliability_reader.is_matched()); + + EXPECT_EQ(incompatible_durability_reader.times_incompatible_qos(), 3u); + EXPECT_EQ(incompatible_durability_reader.last_incompatible_qos(), eprosima::fastdds::dds::DURABILITY_QOS_POLICY_ID); + EXPECT_FALSE(incompatible_durability_reader.is_matched()); + + EXPECT_EQ(compatible_reader.times_incompatible_qos(), 0u); + EXPECT_EQ(compatible_reader.last_incompatible_qos(), eprosima::fastdds::dds::INVALID_QOS_POLICY_ID); + EXPECT_FALSE(compatible_reader.is_matched()); +} TEST_P(DDSStatus, IncompatibleQosListeners) { @@ -376,6 +488,187 @@ TEST_P(DDSStatus, IncompatibleQosGetters) EXPECT_EQ(compatible_reader.last_incompatible_qos(), eprosima::fastdds::dds::INVALID_QOS_POLICY_ID); } +TEST_P(DDSStatus, LivelinessConditions) +{ + PubSubReaderWithWaitsets reader(TEST_TOPIC_NAME); + PubSubWriterWithWaitsets writer(TEST_TOPIC_NAME); + + constexpr unsigned int participant_announcement_period_ms = 50000; + + writer.lease_duration( + participant_announcement_period_ms * 3e-3, participant_announcement_period_ms * 1e-3); + reader.lease_duration( + participant_announcement_period_ms * 3e-3, participant_announcement_period_ms * 1e-3); + + // Number of samples to write + unsigned int num_samples = 2; + + // Lease duration, announcement period, and sleep time, in milliseconds + unsigned int sleep_ms = 10; + unsigned int lease_duration_ms = 1000; + unsigned int announcement_period_ms = 1; + + reader.reliability(RELIABLE_RELIABILITY_QOS) + .liveliness_kind(MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) + .liveliness_lease_duration(lease_duration_ms * 1e-3) + .init(); + writer.reliability(RELIABLE_RELIABILITY_QOS) + .liveliness_kind(MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) + .liveliness_announcement_period(announcement_period_ms * 1e-3) + .liveliness_lease_duration(lease_duration_ms * 1e-3) + .init(); + + ASSERT_TRUE(reader.isInitialized()); + ASSERT_TRUE(writer.isInitialized()); + + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_helloworld_data_generator(num_samples); + reader.startReception(data); + + size_t count = 0; + for (auto data_sample : data) + { + ++count; + writer.send_sample(data_sample); + reader.block_for_at_least(count); + reader.wait_liveliness_recovered(); + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + } + for (count = 0; count < num_samples; count++) + { + writer.assert_liveliness(); + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + } + + // Liveliness shouldn't have been lost + EXPECT_EQ(writer.times_liveliness_lost(), 0u); + EXPECT_EQ(reader.times_liveliness_lost(), 0u); + EXPECT_EQ(reader.times_liveliness_recovered(), 1u); + + // Remove and re-create publisher, test liveliness on subscriber and the new publisher. + writer.removePublisher(); + ASSERT_FALSE(writer.isInitialized()); + reader.wait_writer_undiscovery(); + writer.createPublisher(); + ASSERT_TRUE(writer.isInitialized()); + + writer.wait_discovery(); + reader.wait_discovery(); + + for (count = 0; count < num_samples; count++) + { + writer.assert_liveliness(); + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + } + + // Liveliness shouldn't have been lost + EXPECT_EQ(writer.times_liveliness_lost(), 0u); + EXPECT_EQ(reader.times_liveliness_lost(), 0u); + EXPECT_EQ(reader.times_liveliness_recovered(), 2u); +} + +TEST_P(DDSStatus, DeadlineConditions) +{ + // This test sets a short deadline (short compared to the write rate), + // makes the writer send a few samples and checks that the deadline was missed every time + // Uses a topic with no key + + PubSubReaderWithWaitsets reader(TEST_TOPIC_NAME); + PubSubWriterWithWaitsets writer(TEST_TOPIC_NAME); + + // Write rate in milliseconds + uint32_t writer_sleep_ms = 1000; + // Number of samples written by writer + uint32_t writer_samples = 3; + // Deadline period in milliseconds + uint32_t deadline_period_ms = 10; + + reader.deadline_period(deadline_period_ms * 1e-3).init(); + writer.deadline_period(deadline_period_ms * 1e-3).init(); + + ASSERT_TRUE(reader.isInitialized()); + ASSERT_TRUE(writer.isInitialized()); + + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_helloworld_data_generator(writer_samples); + + reader.startReception(data); + + size_t count = 0; + for (auto data_sample : data) + { + // Send data + writer.send_sample(data_sample); + ++count; + reader.block_for_at_least(count); + std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms)); + } + + // All samples should have missed the deadline + EXPECT_GE(writer.missed_deadlines(), writer_samples); + EXPECT_GE(reader.missed_deadlines(), writer_samples); +} + +TEST_P(DDSStatus, DataAvailableConditions) +{ + PubSubReaderWithWaitsets reader(TEST_TOPIC_NAME); + PubSubWriterWithWaitsets writer(TEST_TOPIC_NAME); + PubSubReaderWithWaitsets subscriber_reader(TEST_TOPIC_NAME); + + // Waitset timeout in seconds + uint32_t timeout_s = 2; + + // This reader will receive the data notification on the reader + reader.history_depth(100). + reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS). + deactivate_status_listener(eprosima::fastdds::dds::StatusMask::data_on_readers()); + reader.waitset_timeout(timeout_s).init(); + ASSERT_TRUE(reader.isInitialized()); + + writer.history_depth(100).init(); + ASSERT_TRUE(writer.isInitialized()); + + // This reader will receive the data notification on the subscriber + subscriber_reader.history_depth(100). + reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS). + deactivate_status_listener(eprosima::fastdds::dds::StatusMask::data_available()); + subscriber_reader.waitset_timeout(timeout_s).init(); + ASSERT_TRUE(reader.isInitialized()); + + // Because its volatile the durability + // Wait for discovery. + writer.wait_discovery(2); + reader.wait_discovery(); + subscriber_reader.wait_discovery(); + + auto data = default_helloworld_data_generator(); + + reader.startReception(data); + subscriber_reader.startReception(data); + + // Send data + writer.send(data); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block reader until reception finished or timeout. + reader.block_for_all(); + subscriber_reader.block_for_all(); + + // No timeouts until this point + ASSERT_EQ(0u, reader.times_waitset_timeout()); + ASSERT_EQ(0u, subscriber_reader.times_waitset_timeout()); + + // Now wait until at least one timeout occurs + reader.wait_waitset_timeout(); + subscriber_reader.wait_waitset_timeout(); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else diff --git a/test/unittest/dds/core/condition/CMakeLists.txt b/test/unittest/dds/core/condition/CMakeLists.txt index 1a9b86f20bf..c50694fcde6 100644 --- a/test/unittest/dds/core/condition/CMakeLists.txt +++ b/test/unittest/dds/core/condition/CMakeLists.txt @@ -12,19 +12,27 @@ # See the License for the specific language governing permissions and # limitations under the License. +set(LOG_SOURCES + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/Log.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/OStreamConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutErrConsumer.cpp +) + if(WIN32) add_definitions(-D_WIN32_WINNT=0x0601) endif() set(CONDITION_TESTS_SOURCE - ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypesBase.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/Condition.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/ConditionNotifier.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/GuardCondition.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/StatusCondition.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/StatusConditionImpl.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/WaitSet.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/Log.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/OStreamConsumer.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutConsumer.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutErrConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/WaitSetImpl.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp + ${LOG_SOURCES} ConditionTests.cpp) add_executable(ConditionTests ${CONDITION_TESTS_SOURCE}) @@ -33,6 +41,78 @@ target_compile_definitions(ConditionTests PRIVATE FASTRTPS_NO_LIB $<$:__INTERNALDEBUG> # Internal debug activated. ) target_include_directories(ConditionTests PRIVATE - ${PROJECT_SOURCE_DIR}/include ${PROJECT_BINARY_DIR}/include) -target_link_libraries(ConditionTests GTest::gtest fastcdr) + ${PROJECT_SOURCE_DIR}/include + ${PROJECT_BINARY_DIR}/include + ${PROJECT_SOURCE_DIR}/src/cpp + ) +target_link_libraries(ConditionTests GTest::gtest) add_gtest(ConditionTests SOURCES ${CONDITION_TESTS_SOURCE}) + +### ConditionNotifier ### +set(CONDITION_NOTIFIER_TESTS_SOURCE + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/Condition.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/ConditionNotifier.cpp + ${LOG_SOURCES} + ConditionNotifierTests.cpp) + +add_executable(ConditionNotifierTests ${CONDITION_NOTIFIER_TESTS_SOURCE}) +target_compile_definitions(ConditionNotifierTests PRIVATE FASTRTPS_NO_LIB + $<$>,$>:__DEBUG> + $<$:__INTERNALDEBUG> # Internal debug activated. + ) +target_include_directories(ConditionNotifierTests PRIVATE + mock/WaitSetImpl + ${GTEST_INCLUDE_DIRS} + ${PROJECT_SOURCE_DIR}/include + ${PROJECT_BINARY_DIR}/include + ${PROJECT_SOURCE_DIR}/src/cpp + ) +target_link_libraries(ConditionNotifierTests GTest::gtest GTest::gmock) +add_gtest(ConditionNotifierTests SOURCES ${CONDITION_NOTIFIER_TESTS_SOURCE}) + +### StatusConditionImpl ### +set(STATUS_CONDITION_IMPL_TESTS_SOURCE + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/Condition.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/StatusCondition.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/StatusConditionImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp + ${LOG_SOURCES} + StatusConditionImplTests.cpp) + +add_executable(StatusConditionImplTests ${STATUS_CONDITION_IMPL_TESTS_SOURCE}) +target_compile_definitions(StatusConditionImplTests PRIVATE FASTRTPS_NO_LIB + $<$>,$>:__DEBUG> + $<$:__INTERNALDEBUG> # Internal debug activated. + ) +target_include_directories(StatusConditionImplTests PRIVATE + mock/ConditionNotifier + ${GTEST_INCLUDE_DIRS} + ${PROJECT_SOURCE_DIR}/include + ${PROJECT_BINARY_DIR}/include + ${PROJECT_SOURCE_DIR}/src/cpp + ) +target_link_libraries(StatusConditionImplTests GTest::gtest GTest::gmock) +add_gtest(StatusConditionImplTests SOURCES ${STATUS_CONDITION_IMPL_TESTS_SOURCE}) + +### WaitSetImpl ### +set(WAITSET_IMPL_TESTS_SOURCE + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/Condition.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/WaitSetImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp + ${LOG_SOURCES} + WaitSetImplTests.cpp) + +add_executable(WaitSetImplTests ${WAITSET_IMPL_TESTS_SOURCE}) +target_compile_definitions(WaitSetImplTests PRIVATE FASTRTPS_NO_LIB + $<$>,$>:__DEBUG> + $<$:__INTERNALDEBUG> # Internal debug activated. + ) +target_include_directories(WaitSetImplTests PRIVATE + mock/ConditionNotifier + ${GTEST_INCLUDE_DIRS} + ${PROJECT_SOURCE_DIR}/include + ${PROJECT_BINARY_DIR}/include + ${PROJECT_SOURCE_DIR}/src/cpp + ) +target_link_libraries(WaitSetImplTests GTest::gtest GTest::gmock) +add_gtest(WaitSetImplTests SOURCES ${WAITSET_IMPL_TESTS_SOURCE}) diff --git a/test/unittest/dds/core/condition/ConditionNotifierTests.cpp b/test/unittest/dds/core/condition/ConditionNotifierTests.cpp new file mode 100644 index 00000000000..305737c81f0 --- /dev/null +++ b/test/unittest/dds/core/condition/ConditionNotifierTests.cpp @@ -0,0 +1,98 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +// Include mocks first +#include + +// Include UUT +#include + +// Other includes +#include + +using namespace eprosima::fastdds::dds; +using namespace eprosima::fastdds::dds::detail; +using ::testing::_; + +class TestCondition : public Condition +{ +}; + +TEST(ConditionNotifierTests, basic_test) +{ + WaitSetImpl wait_set; + ConditionNotifier notifier; + TestCondition condition; + + auto test_steps = [&]() + { + // This should not call wake_up, as the wait_set has not been attached to the notifier (ncalls = 0/1) + notifier.notify(); + notifier.will_be_deleted(condition); + + // Waitset should be called after being attached (ncalls = 1/2) + notifier.attach_to(&wait_set); + notifier.notify(); + notifier.will_be_deleted(condition); + + // Attaching nullptr should not fail and the other waitset should be called (ncalls = 2/3) + notifier.attach_to(nullptr); + notifier.notify(); + notifier.will_be_deleted(condition); + + // Attaching same waitset should not duplicate calls (ncalls = 3/4) + notifier.attach_to(&wait_set); + notifier.notify(); + notifier.will_be_deleted(condition); + + // Detaching nullptr should not fail and the other waitset should still be called (ncalls = 4/5) + notifier.detach_from(nullptr); + notifier.notify(); + notifier.will_be_deleted(condition); + + // Waitset should not be called after being detached (ncalls = 4/6) + notifier.detach_from(&wait_set); + notifier.notify(); + notifier.will_be_deleted(condition); + + // Waitset is allowed to be removed twice (ncalls = 4/7) + notifier.detach_from(&wait_set); + notifier.notify(); + notifier.will_be_deleted(condition); + }; + + EXPECT_CALL(wait_set, wake_up()).Times(4); + EXPECT_CALL(wait_set, will_be_deleted(_)).Times(4); + test_steps(); + testing::Mock::VerifyAndClearExpectations(&wait_set); + + WaitSetImpl other_waitset; + notifier.attach_to(&other_waitset); + + EXPECT_CALL(wait_set, wake_up()).Times(4); + EXPECT_CALL(wait_set, will_be_deleted(_)).Times(4); + EXPECT_CALL(other_waitset, wake_up()).Times(7); + EXPECT_CALL(other_waitset, will_be_deleted(_)).Times(7); + test_steps(); +} + +int main( + int argc, + char** argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/unittest/dds/core/condition/ConditionTests.cpp b/test/unittest/dds/core/condition/ConditionTests.cpp index f0f3bad0a57..31a58e7ff37 100644 --- a/test/unittest/dds/core/condition/ConditionTests.cpp +++ b/test/unittest/dds/core/condition/ConditionTests.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -23,6 +24,8 @@ #include #include +#include + using eprosima::fastrtps::types::ReturnCode_t; using namespace eprosima::fastdds::dds; @@ -77,42 +80,296 @@ class ConditionTests : public ::testing::Test TEST_F(ConditionTests, unsupported_condition_methods) { - Condition cond; + class TestCondition : public Condition + { + } + cond; ASSERT_FALSE(cond.get_trigger_value()); HELPER_WaitForEntries(1); } -TEST_F(ConditionTests, unsupported_wait_set_methods) +TEST_F(ConditionTests, waitset_condition_management) { - WaitSet ws; - Condition aux_cond; - ConditionSeq aux_cond_seq; - eprosima::fastrtps::Duration_t timeout(1, 0u); - - ASSERT_EQ(ws.attach_condition(aux_cond), ReturnCode_t::RETCODE_UNSUPPORTED); - ASSERT_EQ(ws.detach_condition(aux_cond), ReturnCode_t::RETCODE_UNSUPPORTED); - ASSERT_EQ(ws.get_conditions(aux_cond_seq), ReturnCode_t::RETCODE_UNSUPPORTED); - ASSERT_EQ(ws.wait(aux_cond_seq, timeout), ReturnCode_t::RETCODE_UNSUPPORTED); + ConditionSeq conditions; + WaitSet wait_set; + + // WaitSet should be created without conditions + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.get_conditions(conditions)); + EXPECT_TRUE(conditions.empty()); + + // This scope allows checking the wait_set behavior when the condition is destroyed + { + GuardCondition condition; + + // Trying to detach without having attached + EXPECT_EQ(ReturnCode_t::RETCODE_PRECONDITION_NOT_MET, wait_set.detach_condition(condition)); + + // Adding the same condition several times should always succeed and keep the list with a single condition + for (int i = 0; i < 2; ++i) + { + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.attach_condition(condition)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.get_conditions(conditions)); + EXPECT_EQ(1u, conditions.size()); + EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &condition)); + } + + // Detaching the condition once should succeed + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.detach_condition(condition)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.get_conditions(conditions)); + EXPECT_TRUE(conditions.empty()); + + // Detaching a second time should fail + EXPECT_EQ(ReturnCode_t::RETCODE_PRECONDITION_NOT_MET, wait_set.detach_condition(condition)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.get_conditions(conditions)); + EXPECT_TRUE(conditions.empty()); + + // Attach the condition again + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.attach_condition(condition)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.get_conditions(conditions)); + EXPECT_EQ(1u, conditions.size()); + EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &condition)); + } + + // After the condition is destroyed, the wait_set should be empty + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.get_conditions(conditions)); + EXPECT_TRUE(conditions.empty()); } -TEST_F(ConditionTests, unsupported_guard_condition_methods) +TEST_F(ConditionTests, waitset_wait) +{ + GuardCondition condition; + ConditionSeq conditions; + WaitSet wait_set; + const eprosima::fastrtps::Duration_t timeout{ 1, 0 }; + + // Waiting on empty wait set should timeout + EXPECT_EQ(ReturnCode_t::RETCODE_TIMEOUT, wait_set.wait(conditions, timeout)); + EXPECT_TRUE(conditions.empty()); + + // Attach condition + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.attach_condition(condition)); + + // Waiting on untriggered condition should timeout + EXPECT_EQ(ReturnCode_t::RETCODE_TIMEOUT, wait_set.wait(conditions, timeout)); + EXPECT_TRUE(conditions.empty()); + + // Waiting on already triggered condition should inmediately return condition + EXPECT_EQ(ReturnCode_t::RETCODE_OK, condition.set_trigger_value(true)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.wait(conditions, timeout)); + EXPECT_EQ(1u, conditions.size()); + EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &condition)); + + // Adding a non-triggered condition while waiting should timeout + EXPECT_EQ(ReturnCode_t::RETCODE_OK, condition.set_trigger_value(false)); + { + GuardCondition non_triggered_condition; + std::thread thr_add_non_triggered([&]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.attach_condition(non_triggered_condition)); + }); + + EXPECT_EQ(ReturnCode_t::RETCODE_TIMEOUT, wait_set.wait(conditions, timeout)); + EXPECT_TRUE(conditions.empty()); + thr_add_non_triggered.join(); + } + + // Setting the trigger while waiting should return the condition + { + std::thread thr_set_trigger([&]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, condition.set_trigger_value(true)); + }); + + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.wait(conditions, timeout)); + EXPECT_EQ(1u, conditions.size()); + EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &condition)); + thr_set_trigger.join(); + } + + // Two threads are not allowed to wait at the same time + EXPECT_EQ(ReturnCode_t::RETCODE_OK, condition.set_trigger_value(false)); + { + std::thread thr_second_wait([&wait_set, &timeout]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + ConditionSeq conds; + EXPECT_EQ(ReturnCode_t::RETCODE_PRECONDITION_NOT_MET, wait_set.wait(conds, timeout)); + EXPECT_TRUE(conds.empty()); + }); + + EXPECT_EQ(ReturnCode_t::RETCODE_TIMEOUT, wait_set.wait(conditions, timeout)); + EXPECT_TRUE(conditions.empty()); + thr_second_wait.join(); + } + + // Waiting forever and adding a triggered condition should wake and only return the added condition + { + GuardCondition triggered_condition; + EXPECT_EQ(ReturnCode_t::RETCODE_OK, triggered_condition.set_trigger_value(true)); + + std::thread thr_add_triggered([&]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + wait_set.attach_condition(triggered_condition); + }); + + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.wait(conditions, eprosima::fastrtps::c_TimeInfinite)); + EXPECT_EQ(1u, conditions.size()); + EXPECT_EQ(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &condition)); + EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &triggered_condition)); + thr_add_triggered.join(); + } +} + +TEST_F(ConditionTests, guard_condition_methods) { GuardCondition cond; - ASSERT_EQ(cond.set_trigger_value(true), ReturnCode_t::RETCODE_UNSUPPORTED); + EXPECT_FALSE(cond.get_trigger_value()); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, cond.set_trigger_value(true)); + EXPECT_TRUE(cond.get_trigger_value()); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, cond.set_trigger_value(false)); + EXPECT_FALSE(cond.get_trigger_value()); } -TEST_F(ConditionTests, unsupported_status_condition_methods) +TEST_F(ConditionTests, status_condition_methods) { - StatusCondition cond; + Entity entity; + StatusCondition& cond = entity.get_statuscondition(); + + EXPECT_EQ(&entity, cond.get_entity()); + EXPECT_FALSE(cond.get_trigger_value()); + + StatusMask mask_none = StatusMask::none(); + StatusMask mask_all = StatusMask::all(); + StatusMask mask_single = StatusMask::inconsistent_topic(); + + // According to the DDS standard, StatusCondition should start with all statuses enabled + EXPECT_EQ(mask_all.to_string(), cond.get_enabled_statuses().to_string()); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, cond.set_enabled_statuses(mask_single)); + EXPECT_EQ(mask_single.to_string(), cond.get_enabled_statuses().to_string()); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, cond.set_enabled_statuses(mask_none)); + EXPECT_EQ(mask_none.to_string(), cond.get_enabled_statuses().to_string()); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, cond.set_enabled_statuses(mask_all)); + EXPECT_EQ(mask_all.to_string(), cond.get_enabled_statuses().to_string()); +} + +TEST_F(ConditionTests, status_condition_trigger) +{ + WaitSet wait_set; + ConditionSeq conditions; + const eprosima::fastrtps::Duration_t timeout{ 1, 0 }; + + Entity entity; + StatusCondition& cond = entity.get_statuscondition(); + + StatusMask mask_all = StatusMask::all(); + StatusMask one_mask = StatusMask::inconsistent_topic(); + StatusMask other_mask = StatusMask::data_on_readers(); + + auto wait_for_trigger = [&]() + { + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.wait(conditions, eprosima::fastrtps::c_TimeInfinite)); + EXPECT_EQ(1u, conditions.size()); + EXPECT_EQ(&cond, conditions[0]); + EXPECT_TRUE(cond.get_trigger_value()); + }; + + auto expect_no_trigger = [&]() + { + EXPECT_FALSE(cond.get_trigger_value()); + EXPECT_EQ(ReturnCode_t::RETCODE_TIMEOUT, wait_set.wait(conditions, timeout)); + EXPECT_TRUE(conditions.empty()); + }; + + ASSERT_NE(nullptr, cond.get_impl()); + + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.attach_condition(cond)); + + // Condition should be untriggered upon creation + EXPECT_EQ(mask_all.to_string(), cond.get_enabled_statuses().to_string()); + expect_no_trigger(); + + // Triggering other_mask should trigger + { + std::thread wait_thr(wait_for_trigger); + cond.get_impl()->set_status(other_mask, true); + EXPECT_TRUE(cond.get_trigger_value()); + wait_thr.join(); + } + + // Setting mask to one_mask should untrigger + EXPECT_EQ(ReturnCode_t::RETCODE_OK, cond.set_enabled_statuses(one_mask)); + EXPECT_EQ(one_mask.to_string(), cond.get_enabled_statuses().to_string()); + expect_no_trigger(); + + // Triggering one_mask should trigger + { + std::thread wait_thr(wait_for_trigger); + cond.get_impl()->set_status(one_mask, true); + EXPECT_TRUE(cond.get_trigger_value()); + wait_thr.join(); + } + + // Triggering twice should not affect trigger + cond.get_impl()->set_status(one_mask, true); + wait_for_trigger(); + + // Untriggering other_mask should not affect trigger + cond.get_impl()->set_status(other_mask, false); + wait_for_trigger(); + + // Triggering other_mask should not affect trigger + cond.get_impl()->set_status(other_mask, true); + wait_for_trigger(); + + // Untriggering one_mask should untrigger + cond.get_impl()->set_status(one_mask, false); + expect_no_trigger(); + + // Untriggering other_mask should not trigger + cond.get_impl()->set_status(other_mask, false); + expect_no_trigger(); + + // Triggering other_mask should not trigger + cond.get_impl()->set_status(other_mask, true); + expect_no_trigger(); + + // Setting mask to other_mask should trigger + { + std::thread wait_thr(wait_for_trigger); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, cond.set_enabled_statuses(other_mask)); + EXPECT_EQ(other_mask.to_string(), cond.get_enabled_statuses().to_string()); + wait_thr.join(); + } + + // Triggering one_mask should not affect trigger + cond.get_impl()->set_status(one_mask, true); + wait_for_trigger(); + + // Setting mask to one_mask should not affect trigger + EXPECT_EQ(ReturnCode_t::RETCODE_OK, cond.set_enabled_statuses(one_mask)); + EXPECT_EQ(one_mask.to_string(), cond.get_enabled_statuses().to_string()); + wait_for_trigger(); + + // Untriggering other_mask should not affect trigger + cond.get_impl()->set_status(other_mask, false); + wait_for_trigger(); - ASSERT_EQ(cond.set_enabled_statuses(StatusMask()), ReturnCode_t::RETCODE_UNSUPPORTED); - ASSERT_EQ(cond.get_enabled_statuses().to_string(), StatusMask().to_string()); - ASSERT_EQ(cond.get_entity(), nullptr); + // Setting mask to other_mask should untrigger + EXPECT_EQ(ReturnCode_t::RETCODE_OK, cond.set_enabled_statuses(other_mask)); + EXPECT_EQ(other_mask.to_string(), cond.get_enabled_statuses().to_string()); + expect_no_trigger(); - HELPER_WaitForEntries(2); + // Setting mask to one_mask should trigger + EXPECT_EQ(ReturnCode_t::RETCODE_OK, cond.set_enabled_statuses(one_mask)); + EXPECT_EQ(one_mask.to_string(), cond.get_enabled_statuses().to_string()); + EXPECT_TRUE(cond.get_trigger_value()); + wait_for_trigger(); } int main( diff --git a/test/unittest/dds/core/condition/StatusConditionImplTests.cpp b/test/unittest/dds/core/condition/StatusConditionImplTests.cpp new file mode 100644 index 00000000000..90b42f2d149 --- /dev/null +++ b/test/unittest/dds/core/condition/StatusConditionImplTests.cpp @@ -0,0 +1,157 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +// Include mocks first +#include + +// Include UUT +#include + +// Other includes +#include + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +TEST(StatusConditionImplTests, enabled_status_management) +{ + ConditionNotifier notifier; + StatusConditionImpl uut(¬ifier); + + StatusMask mask_none = StatusMask::none(); + StatusMask mask_all = StatusMask::all(); + StatusMask mask_single = StatusMask::inconsistent_topic(); + + // According to the DDS standard, StatusCondition should start with all statuses enabled + EXPECT_EQ(mask_all.to_string(), uut.get_enabled_statuses().to_string()); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, uut.set_enabled_statuses(mask_single)); + EXPECT_EQ(mask_single.to_string(), uut.get_enabled_statuses().to_string()); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, uut.set_enabled_statuses(mask_none)); + EXPECT_EQ(mask_none.to_string(), uut.get_enabled_statuses().to_string()); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, uut.set_enabled_statuses(mask_all)); + EXPECT_EQ(mask_all.to_string(), uut.get_enabled_statuses().to_string()); +} + +TEST(StatusConditionImplTests, notify_trigger) +{ + ::testing::StrictMock notifier; + StatusConditionImpl uut(¬ifier); + + StatusMask mask_none = StatusMask::none(); + StatusMask mask_all = StatusMask::all(); + StatusMask one_mask = StatusMask::inconsistent_topic(); + StatusMask other_mask = StatusMask::data_on_readers(); + StatusMask both_mask = one_mask; + both_mask |= other_mask; + + // Condition should be untriggered upon creation + EXPECT_FALSE(uut.get_trigger_value()); + EXPECT_EQ(mask_all.to_string(), uut.get_enabled_statuses().to_string()); + EXPECT_EQ(mask_none.to_string(), uut.get_raw_status().to_string()); + + // Triggering other_mask should trigger + auto& call1 = EXPECT_CALL(notifier, notify()).Times(1); + uut.set_status(other_mask, true); + EXPECT_EQ(other_mask.to_string(), uut.get_raw_status().to_string()); + EXPECT_TRUE(uut.get_trigger_value()); + + // Setting mask to one_mask should untrigger + EXPECT_EQ(ReturnCode_t::RETCODE_OK, uut.set_enabled_statuses(one_mask)); + EXPECT_EQ(one_mask.to_string(), uut.get_enabled_statuses().to_string()); + EXPECT_FALSE(uut.get_trigger_value()); + + // Triggering one_mask should trigger + auto& call2 = EXPECT_CALL(notifier, notify()).Times(1).After(call1); + uut.set_status(one_mask, true); + EXPECT_EQ(both_mask.to_string(), uut.get_raw_status().to_string()); + EXPECT_TRUE(uut.get_trigger_value()); + + // Triggering twice should not affect trigger + uut.set_status(one_mask, true); + EXPECT_EQ(both_mask.to_string(), uut.get_raw_status().to_string()); + EXPECT_TRUE(uut.get_trigger_value()); + + // Untriggering other_mask should not affect trigger + uut.set_status(other_mask, false); + EXPECT_EQ(one_mask.to_string(), uut.get_raw_status().to_string()); + EXPECT_TRUE(uut.get_trigger_value()); + + // Triggering other_mask should not affect trigger + uut.set_status(other_mask, true); + EXPECT_EQ(both_mask.to_string(), uut.get_raw_status().to_string()); + EXPECT_TRUE(uut.get_trigger_value()); + + // Untriggering one_mask should untrigger + uut.set_status(one_mask, false); + EXPECT_EQ(other_mask.to_string(), uut.get_raw_status().to_string()); + EXPECT_FALSE(uut.get_trigger_value()); + + // Untriggering other_mask should not trigger + uut.set_status(other_mask, false); + EXPECT_EQ(mask_none.to_string(), uut.get_raw_status().to_string()); + EXPECT_FALSE(uut.get_trigger_value()); + + // Triggering other_mask should not trigger + uut.set_status(other_mask, true); + EXPECT_EQ(other_mask.to_string(), uut.get_raw_status().to_string()); + EXPECT_FALSE(uut.get_trigger_value()); + + // Setting mask to other_mask should trigger + auto& call3 = EXPECT_CALL(notifier, notify()).Times(1).After(call2); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, uut.set_enabled_statuses(other_mask)); + EXPECT_EQ(other_mask.to_string(), uut.get_enabled_statuses().to_string()); + + // Triggering one_mask should not affect trigger + uut.set_status(one_mask, true); + EXPECT_EQ(both_mask.to_string(), uut.get_raw_status().to_string()); + EXPECT_TRUE(uut.get_trigger_value()); + + // Setting mask to one_mask should not affect trigger + EXPECT_EQ(ReturnCode_t::RETCODE_OK, uut.set_enabled_statuses(one_mask)); + EXPECT_EQ(one_mask.to_string(), uut.get_enabled_statuses().to_string()); + EXPECT_TRUE(uut.get_trigger_value()); + + // Untriggering other_mask should not affect trigger + uut.set_status(other_mask, false); + EXPECT_EQ(one_mask.to_string(), uut.get_raw_status().to_string()); + EXPECT_TRUE(uut.get_trigger_value()); + + // Setting mask to other_mask should untrigger + EXPECT_EQ(ReturnCode_t::RETCODE_OK, uut.set_enabled_statuses(other_mask)); + EXPECT_EQ(other_mask.to_string(), uut.get_enabled_statuses().to_string()); + EXPECT_FALSE(uut.get_trigger_value()); + + // Setting mask to one_mask should trigger + EXPECT_CALL(notifier, notify()).Times(1).After(call3); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, uut.set_enabled_statuses(one_mask)); + EXPECT_EQ(one_mask.to_string(), uut.get_enabled_statuses().to_string()); + EXPECT_TRUE(uut.get_trigger_value()); +} + +} // namespace detail +} // namespace dds +} // namespace fastdds +} // namespace eprosima + +int main( + int argc, + char** argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/unittest/dds/core/condition/WaitSetImplTests.cpp b/test/unittest/dds/core/condition/WaitSetImplTests.cpp new file mode 100644 index 00000000000..2d13a7a3f8d --- /dev/null +++ b/test/unittest/dds/core/condition/WaitSetImplTests.cpp @@ -0,0 +1,206 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +// Include mocks first +#include + +// Include UUT +#include + +// Other includes +#include + +using namespace eprosima::fastdds::dds; +using namespace eprosima::fastdds::dds::detail; +using ::testing::_; + +class TestCondition : public Condition +{ +public: + + bool trigger_value = false; + + bool get_trigger_value() const override + { + return trigger_value; + } + +}; + +TEST(WaitSetImplTests, condition_management) +{ + TestCondition condition; + ConditionSeq conditions; + WaitSetImpl wait_set; + + // The condition is attached, detached, attached again and then deleted. + // The following calls to the notifier are expected + auto notifier = condition.get_notifier(); + EXPECT_CALL(*notifier, attach_to(_)).Times(2); + EXPECT_CALL(*notifier, detach_from(_)).Times(1); + EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1); + + // WaitSetImpl should be created without conditions + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.get_conditions(conditions)); + EXPECT_TRUE(conditions.empty()); + + // Trying to detach without having attached + EXPECT_EQ(ReturnCode_t::RETCODE_PRECONDITION_NOT_MET, wait_set.detach_condition(condition)); + + // Adding the same condition several times should always succeed and keep the list with a single condition + for (int i = 0; i < 2; ++i) + { + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.attach_condition(condition)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.get_conditions(conditions)); + EXPECT_EQ(1u, conditions.size()); + EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &condition)); + } + + // Detaching the condition once should succeed + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.detach_condition(condition)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.get_conditions(conditions)); + EXPECT_TRUE(conditions.empty()); + + // Detaching a second time should fail + EXPECT_EQ(ReturnCode_t::RETCODE_PRECONDITION_NOT_MET, wait_set.detach_condition(condition)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.get_conditions(conditions)); + EXPECT_TRUE(conditions.empty()); + + // Attach the condition again + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.attach_condition(condition)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.get_conditions(conditions)); + EXPECT_EQ(1u, conditions.size()); + EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &condition)); + + // Calling will_be_deleted should detach the condition + wait_set.will_be_deleted(condition); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.get_conditions(conditions)); + EXPECT_TRUE(conditions.empty()); +} + +TEST(WaitSetImplTests, wait) +{ + const eprosima::fastrtps::Duration_t timeout{ 1, 0 }; + + TestCondition condition; + + { + ConditionSeq conditions; + WaitSetImpl wait_set; + + // Expecting calls on the notifier of triggered_condition + auto notifier = condition.get_notifier(); + EXPECT_CALL(*notifier, attach_to(_)).Times(1); + EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1); + + // Waiting on empty wait set should timeout + EXPECT_EQ(ReturnCode_t::RETCODE_TIMEOUT, wait_set.wait(conditions, timeout)); + EXPECT_TRUE(conditions.empty()); + + // Attach condition + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.attach_condition(condition)); + + // Waiting on untriggered condition should timeout + EXPECT_EQ(ReturnCode_t::RETCODE_TIMEOUT, wait_set.wait(conditions, timeout)); + EXPECT_TRUE(conditions.empty()); + + // Waiting on already triggered condition should inmediately return condition + condition.trigger_value = true; + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.wait(conditions, timeout)); + EXPECT_EQ(1u, conditions.size()); + EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &condition)); + + // A wake_up without a trigger should timeout + { + condition.trigger_value = false; + std::thread notify_without_trigger([&]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + wait_set.wake_up(); + }); + EXPECT_EQ(ReturnCode_t::RETCODE_TIMEOUT, wait_set.wait(conditions, timeout)); + EXPECT_TRUE(conditions.empty()); + notify_without_trigger.join(); + } + + // A wake_up with a trigger should return the condition + { + std::thread trigger_and_notify([&]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + condition.trigger_value = true; + wait_set.wake_up(); + }); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.wait(conditions, timeout)); + EXPECT_EQ(1u, conditions.size()); + EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &condition)); + trigger_and_notify.join(); + } + + // Two threads are not allowed to wait at the same time + { + std::thread second_wait_thread([&wait_set, &timeout]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + ConditionSeq conds; + EXPECT_EQ(ReturnCode_t::RETCODE_PRECONDITION_NOT_MET, wait_set.wait(conds, timeout)); + EXPECT_TRUE(conds.empty()); + }); + + condition.trigger_value = false; + EXPECT_EQ(ReturnCode_t::RETCODE_TIMEOUT, wait_set.wait(conditions, timeout)); + EXPECT_TRUE(conditions.empty()); + second_wait_thread.join(); + } + + // Waiting forever and adding a triggered condition should wake and only return the added condition + { + TestCondition triggered_condition; + triggered_condition.trigger_value = true; + + // Expecting calls on the notifier of triggered_condition + notifier = triggered_condition.get_notifier(); + EXPECT_CALL(*notifier, attach_to(_)).Times(1); + EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1); + + std::thread add_triggered_condition([&]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + wait_set.attach_condition(triggered_condition); + }); + + EXPECT_EQ(ReturnCode_t::RETCODE_OK, wait_set.wait(conditions, eprosima::fastrtps::c_TimeInfinite)); + EXPECT_EQ(1u, conditions.size()); + EXPECT_EQ(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &condition)); + EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &triggered_condition)); + add_triggered_condition.join(); + + wait_set.will_be_deleted(triggered_condition); + } + + wait_set.will_be_deleted(condition); + } +} + +int main( + int argc, + char** argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/unittest/dds/core/condition/mock/ConditionNotifier/fastdds/core/condition/ConditionNotifier.hpp b/test/unittest/dds/core/condition/mock/ConditionNotifier/fastdds/core/condition/ConditionNotifier.hpp new file mode 100644 index 00000000000..0ef854e6e59 --- /dev/null +++ b/test/unittest/dds/core/condition/mock/ConditionNotifier/fastdds/core/condition/ConditionNotifier.hpp @@ -0,0 +1,67 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ConditionNotifier.hpp + */ + +#ifndef _FASTDDS_CORE_CONDITION_CONDITIONNOTIFIER_HPP_ +#define _FASTDDS_CORE_CONDITION_CONDITIONNOTIFIER_HPP_ + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +struct WaitSetImpl; + +struct ConditionNotifier +{ + /** + * Add a WaitSet implementation to the list of attached entries. + * Does nothing if wait_set was already attached to this notifier. + * @param wait_set WaitSet implementation to add to the list. + */ + MOCK_METHOD1(attach_to, void(WaitSetImpl * wait_set)); + + /** + * Remove a WaitSet implementation from the list of attached entries. + * Does nothing if wait_set was not attached to this notifier. + * @param wait_set WaitSet implementation to remove from the list. + */ + MOCK_METHOD1(detach_from, void(WaitSetImpl * wait_set)); + + /** + * Wake up all the WaitSet implementations attached to this notifier. + */ + MOCK_METHOD0(notify, void()); + + /** + * Inform all the WaitSet implementations attached to this notifier that + * a condition is going to be deleted. + * @param condition The Condition being deleted. + */ + MOCK_METHOD1(will_be_deleted, void(const Condition& condition)); +}; + +} // namespace detail +} // namespace dds +} // namespace fastdds +} // namespace eprosima + +#endif // _FASTDDS_CORE_CONDITION_CONDITIONNOTIFIER_HPP_ diff --git a/test/unittest/dds/core/condition/mock/WaitSetImpl/fastdds/core/condition/WaitSetImpl.hpp b/test/unittest/dds/core/condition/mock/WaitSetImpl/fastdds/core/condition/WaitSetImpl.hpp new file mode 100644 index 00000000000..d7ee0f82125 --- /dev/null +++ b/test/unittest/dds/core/condition/mock/WaitSetImpl/fastdds/core/condition/WaitSetImpl.hpp @@ -0,0 +1,50 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file WaitSetImpl.hpp + */ + +#ifndef _FASTDDS_CORE_CONDITION_WAITSETIMPL_HPP_ +#define _FASTDDS_CORE_CONDITION_WAITSETIMPL_HPP_ + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +struct WaitSetImpl +{ + /** + * @brief Wake up this WaitSet implementation if it was waiting + */ + MOCK_METHOD0(wake_up, void()); + + /** + * @brief Called from the destructor of a Condition to inform this WaitSet implementation that the condition + * should be automatically detached. + */ + MOCK_METHOD1(will_be_deleted, void(const Condition& condition)); +}; + +} // namespace detail +} // namespace dds +} // namespace fastdds +} // namespace eprosima + +#endif // _FASTDDS_CORE_CONDITION_WAITSETIMPL_HPP_ diff --git a/test/unittest/dds/core/entity/CMakeLists.txt b/test/unittest/dds/core/entity/CMakeLists.txt index b6303594d02..4559ac338e8 100644 --- a/test/unittest/dds/core/entity/CMakeLists.txt +++ b/test/unittest/dds/core/entity/CMakeLists.txt @@ -18,7 +18,12 @@ endif() set(ENTITY_TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypesBase.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/Entity.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/Condition.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/ConditionNotifier.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/StatusCondition.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/StatusConditionImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/WaitSetImpl.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/Log.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/OStreamConsumer.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutConsumer.cpp @@ -32,6 +37,9 @@ target_compile_definitions(EntityTests PRIVATE FASTRTPS_NO_LIB $<$:__INTERNALDEBUG> # Internal debug activated. ) target_include_directories(EntityTests PRIVATE - ${PROJECT_SOURCE_DIR}/include ${PROJECT_BINARY_DIR}/include) + ${PROJECT_SOURCE_DIR}/include + ${PROJECT_BINARY_DIR}/include + ${PROJECT_SOURCE_DIR}/src/cpp +) target_link_libraries(EntityTests GTest::gtest fastcdr) add_gtest(EntityTests SOURCES ${ENTITY_TESTS_SOURCE}) diff --git a/test/unittest/dds/core/entity/EntityTests.cpp b/test/unittest/dds/core/entity/EntityTests.cpp index 6ddd01ed2ee..a708907629d 100644 --- a/test/unittest/dds/core/entity/EntityTests.cpp +++ b/test/unittest/dds/core/entity/EntityTests.cpp @@ -151,15 +151,12 @@ TEST_F(EntityTests, entity_equal_operator) ASSERT_FALSE(entity1 == entity4); } -/* Test unsupported methods behaviour*/ -TEST_F(EntityTests, unsupported_entity_methods) +TEST_F(EntityTests, get_statuscondition) { Entity entity; - // It cannot compare because StatusCondition does not have comparaison methods - entity.get_statuscondition(); - - HELPER_WaitForEntries(1); + StatusCondition& cond = entity.get_statuscondition(); + EXPECT_EQ(&entity, cond.get_entity()); } int main( diff --git a/test/unittest/dds/publisher/DataWriterTests.cpp b/test/unittest/dds/publisher/DataWriterTests.cpp index d75174c9caf..8a8a02e65e6 100644 --- a/test/unittest/dds/publisher/DataWriterTests.cpp +++ b/test/unittest/dds/publisher/DataWriterTests.cpp @@ -866,14 +866,13 @@ class DataWriterUnsupportedTests : public ::testing::Test /* * This test checks that the DataWriter methods defined in the standard not yet implemented in FastDDS return * ReturnCode_t::RETCODE_UNSUPPORTED. The following methods are checked: - * 1. get_publication_matched_status - * 2. get_matched_subscription_data - * 3. write_w_timestamp - * 4. register_instance_w_timestamp - * 5. unregister_instance_w_timestamp - * 6. get_matched_subscriptions - * 7. get_key_value - * 8. lookup_instance + * 1. get_matched_subscription_data + * 2. write_w_timestamp + * 3. register_instance_w_timestamp + * 4. unregister_instance_w_timestamp + * 5. get_matched_subscriptions + * 6. get_key_value + * 7. lookup_instance */ TEST_F(DataWriterUnsupportedTests, UnsupportedDataWriterMethods) { @@ -893,11 +892,6 @@ TEST_F(DataWriterUnsupportedTests, UnsupportedDataWriterMethods) DataWriter* data_writer = publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT); ASSERT_NE(publisher, nullptr); - PublicationMatchedStatus status; - EXPECT_EQ( - ReturnCode_t::RETCODE_UNSUPPORTED, - data_writer->get_publication_matched_status(status)); - builtin::SubscriptionBuiltinTopicData subscription_data; fastrtps::rtps::InstanceHandle_t subscription_handle; EXPECT_EQ( diff --git a/test/unittest/dds/status/CMakeLists.txt b/test/unittest/dds/status/CMakeLists.txt index 16760eace9c..c9c2d024def 100644 --- a/test/unittest/dds/status/CMakeLists.txt +++ b/test/unittest/dds/status/CMakeLists.txt @@ -13,6 +13,12 @@ # limitations under the License. set(LISTENERTESTS_SOURCE ListenerTests.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/Entity.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/Condition.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/ConditionNotifier.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/StatusCondition.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/StatusConditionImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/WaitSetImpl.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/policy/QosPolicyUtils.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/domain/DomainParticipant.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/domain/DomainParticipantFactory.cpp diff --git a/test/unittest/dds/subscriber/DataReaderTests.cpp b/test/unittest/dds/subscriber/DataReaderTests.cpp index 20aea826b7b..d526a0b665f 100644 --- a/test/unittest/dds/subscriber/DataReaderTests.cpp +++ b/test/unittest/dds/subscriber/DataReaderTests.cpp @@ -1863,17 +1863,15 @@ class DataReaderUnsupportedTests : public ::testing::Test * ReturnCode_t::RETCODE_UNSUPPORTED. The following methods are checked: * 1. get_sample_lost_status * 2. get_sample_rejected_status - * 3. get_subscription_matched_status - * 4. get_subscription_matched_status - * 5. get_matched_publication_data - * 6. create_readcondition - * 7. create_querycondition - * 8. delete_readcondition - * 9. delete_contained_entities - * 10. get_matched_publications - * 11. get_key_value - * 12. lookup_instance - * 13. wait_for_historical_data + * 3. get_matched_publication_data + * 4. create_readcondition + * 5. create_querycondition + * 6. delete_readcondition + * 7. delete_contained_entities + * 8. get_matched_publications + * 9. get_key_value + * 10. lookup_instance + * 11. wait_for_historical_data */ TEST_F(DataReaderUnsupportedTests, UnsupportedDataReaderMethods) { @@ -1903,16 +1901,6 @@ TEST_F(DataReaderUnsupportedTests, UnsupportedDataReaderMethods) EXPECT_EQ(ReturnCode_t::RETCODE_UNSUPPORTED, data_reader->get_sample_rejected_status(status)); } - { - SubscriptionMatchedStatus status; - EXPECT_EQ(ReturnCode_t::RETCODE_UNSUPPORTED, data_reader->get_subscription_matched_status(status)); - } - - { - SubscriptionMatchedStatus status; - EXPECT_EQ(ReturnCode_t::RETCODE_UNSUPPORTED, data_reader->get_subscription_matched_status(status)); - } - builtin::PublicationBuiltinTopicData publication_data; fastrtps::rtps::InstanceHandle_t publication_handle; EXPECT_EQ( diff --git a/test/unittest/statistics/dds/CMakeLists.txt b/test/unittest/statistics/dds/CMakeLists.txt index 7313b76751f..b6884f41f39 100644 --- a/test/unittest/statistics/dds/CMakeLists.txt +++ b/test/unittest/statistics/dds/CMakeLists.txt @@ -104,6 +104,13 @@ if (SQLITE3_SUPPORT AND FASTDDS_STATISTICS) ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/builtin/typelookup/TypeLookupReplyListener.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/builtin/typelookup/TypeLookupRequestListener.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/builtin/typelookup/common/TypeLookupTypes.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/Entity.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/Condition.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/ConditionNotifier.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/StatusCondition.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/StatusConditionImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/condition/WaitSetImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/policy/QosPolicyUtils.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/policy/ParameterList.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/policy/QosPolicyUtils.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/domain/DomainParticipant.cpp